Last active
July 2, 2021 20:01
-
-
Save ruescasd/3b58f99ad0217d782edf58e64e7e1b33 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "b3" | |
version = "0.1.0" | |
edition = "2018" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
rand = "0.7.3" | |
sha2 = "0.9.1" | |
hex = "0.4.3" | |
r2d2 = "0.8.9" | |
r2d2_postgres = "0.18.0" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
no lock: 896.861 / s | |
lock: 881.834 / s | |
*/ | |
use std::thread; | |
use rand::{thread_rng, Rng}; | |
use rand::distributions::Alphanumeric; | |
use std::sync::{Arc, Mutex}; | |
use sha2::{Digest, Sha512}; | |
use r2d2; | |
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager}; | |
fn main() { | |
let mtx: Arc<Mutex<String>> = Arc::new(Mutex::new(String::from(""))); | |
let manager = PostgresConnectionManager::new( | |
"host=localhost port=5433 user=b3".parse().unwrap(), | |
NoTls, | |
); | |
let pool = r2d2::Pool::new(manager).unwrap(); | |
let mut client = pool.get().unwrap(); | |
client.execute("drop table test", &[]).unwrap(); | |
client.batch_execute(" | |
CREATE TABLE test ( | |
id SERIAL PRIMARY KEY, | |
content TEXT NOT NULL, | |
h1 TEXT NOT NULL, | |
h2 TEXT NOT NULL | |
) | |
").unwrap(); | |
let now = std::time::Instant::now(); | |
let handles: Vec<_> = (0..10).into_iter().map(|_| { | |
let mtx = mtx.clone(); | |
let pool = pool.clone(); | |
thread::spawn(move || { | |
for _ in 0..100 { | |
insert(mtx.clone(), pool.clone(), random_string()); | |
} | |
}) | |
}).collect(); | |
for h in handles { | |
h.join().unwrap(); | |
} | |
let rate = (1000.0 * 1000.0) / (now.elapsed().as_millis() as f32); | |
println!("{:.3} / s", rate); | |
/* | |
for row in client.query("SELECT id, content, h1, h2 FROM test order by id", &[]).unwrap() { | |
let id: i32 = row.get(0); | |
let content: &str = row.get(1); | |
let h1: &str = row.get(2); | |
let h2: &str = row.get(3); | |
println!("found: {} {} {} {}", id, content, h1, h2); | |
}*/ | |
} | |
pub fn insert(mtx: Arc<Mutex<String>>, pool: r2d2::Pool<PostgresConnectionManager<NoTls>>, content: String) { | |
let mut client = pool.get().unwrap(); | |
let mtx = Arc::clone(&mtx); | |
let mut head_mtx = mtx.lock().unwrap(); | |
let head = if head_mtx.len() > 0 { | |
(*head_mtx).clone() | |
} | |
else { | |
content.clone() | |
}; | |
// uncomment for no lock | |
// let head = String::from("fixed"); | |
let (h1, h2): (String, String) = hash(content.clone(), head.clone()); | |
*head_mtx = h2.clone(); | |
drop(head_mtx); | |
let result = client.execute( | |
"INSERT INTO test (content, h1, h2) VALUES ($1, $2, $3)", | |
&[&content, &h1, &h2], | |
); | |
if result.is_err() { | |
let mut head_mtx = mtx.lock().unwrap(); | |
*head_mtx = head; | |
} | |
} | |
fn random_string() -> String { | |
let ret: String = thread_rng() | |
.sample_iter(&Alphanumeric) | |
.take(128) | |
.map(char::from) | |
.collect(); | |
ret | |
} | |
pub fn hash(content: String, head: String) -> (String, String) { | |
let mut hasher = Sha512::new(); | |
hasher.update(content.as_bytes()); | |
let h1 = hex::encode(hasher.finalize()); | |
let mut hasher2 = Sha512::new(); | |
hasher2.update(h1.as_bytes()); | |
hasher2.update(head.as_bytes()); | |
let h2 = hex::encode(hasher2.finalize()); | |
(h1, h2) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
508.388 / s | |
*/ | |
#[macro_use] | |
extern crate quick_error; | |
use std::thread; | |
use std::io::Write; | |
use std::time::Duration; | |
use std::sync::{Arc, Mutex}; | |
use rand::{thread_rng, Rng}; | |
use rand::distributions::Alphanumeric; | |
use std::time::SystemTime; | |
use sha2::{Digest, Sha512}; | |
use r2d2; | |
use r2d2::PooledConnection; | |
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager, postgres::Error}; | |
quick_error! { | |
#[derive(Debug)] | |
pub enum WriteError { | |
Empty{} | |
PsqlError(err: Error) { | |
from() | |
} | |
IOError(err: std::io::Error) { | |
from() | |
} | |
} | |
} | |
struct Entry { | |
content: String, | |
h: String | |
} | |
impl Entry { | |
fn new(content: String, h: String) -> Entry { | |
Entry { | |
content, h | |
} | |
} | |
} | |
struct Entries { | |
head: String, | |
entries: Vec<Entry>, | |
time: SystemTime | |
} | |
impl Entries { | |
fn new(head: String, entries: Vec<Entry>, time: SystemTime) -> Entries { | |
Entries { | |
head, entries, time | |
} | |
} | |
} | |
impl Default for Entries { | |
fn default() -> Entries { | |
Entries::new(String::from("ROOT"), vec![], SystemTime::now()) | |
} | |
} | |
fn main() { | |
let mtx: Arc<Mutex<Entries>> = Arc::new(Mutex::new(Entries::default())); | |
let manager = PostgresConnectionManager::new( | |
"host=localhost port=5433 user=b3".parse().unwrap(), | |
NoTls, | |
); | |
let pool = r2d2::Pool::new(manager).unwrap(); | |
let mut client = pool.get().unwrap(); | |
client.execute("drop table test", &[]).unwrap(); | |
client.batch_execute(" | |
CREATE TABLE test ( | |
id SERIAL PRIMARY KEY, | |
content TEXT NOT NULL, | |
h1 TEXT NOT NULL, | |
h2 TEXT NOT NULL | |
) | |
").unwrap(); | |
let now = std::time::Instant::now(); | |
let handles: Vec<_> = (0..10).into_iter().map(|_| { | |
let mtx = mtx.clone(); | |
thread::spawn(move || { | |
for _ in 0..100 { | |
let mut e = mtx.lock().unwrap(); | |
let entry = Entry::new(random_string(), random_string()); | |
e.entries.push(entry); | |
drop(e); | |
thread::sleep(Duration::from_millis(10)); | |
} | |
}) | |
}).collect(); | |
let mtx = mtx.clone(); | |
let mut hash_conn = pool.get().unwrap(); | |
let hasher = thread::spawn(move || { | |
for _ in 0..20 { | |
let mut e = mtx.lock().unwrap(); | |
let elapsed = e.time.elapsed().unwrap().as_millis(); | |
let count = e.entries.len(); | |
if count == 0 { | |
break; | |
} | |
if count > 50 || (elapsed > 500 && count > 0) { | |
print!("Running write [count={}] [elapsed={}]..", count, elapsed); | |
let now = std::time::Instant::now(); | |
let result = write(&mut hash_conn, &e.entries, e.head.clone()); | |
if result.is_ok() { | |
*e = Entries::default(); | |
} | |
else { | |
println!("result is {:?}", result); | |
} | |
println!("done[{}ms]", now.elapsed().as_millis()); | |
} | |
drop(e); | |
thread::sleep(Duration::from_millis(200)); | |
} | |
}); | |
hasher.join().unwrap(); | |
for h in handles { | |
h.join().unwrap(); | |
} | |
let rate = (1000.0 * 1000.0) / (now.elapsed().as_millis() as f32); | |
println!("{:.3} / s", rate); | |
/*for row in client.query("SELECT id, content, h1, h2 FROM test order by id", &[]).unwrap() { | |
let id: i32 = row.get(0); | |
let content: &str = row.get(1); | |
let h1: &str = row.get(2); | |
let h2: &str = row.get(3); | |
println!("found: {} {} {} {}", id, content, h1, h2); | |
}*/ | |
} | |
fn write(client: &mut PooledConnection<PostgresConnectionManager<NoTls>>, entries: &Vec<Entry>, head: String) -> Result<(), WriteError> { | |
let mut writer = client.copy_in("COPY test(content, h1, h2) FROM stdin")?; | |
let mut head = head; | |
for entry in entries { | |
let (h1, h2) = hash(&entry.content, &head); | |
let row = format!("{}\t{}\t{}\n", entry.content, h1, h2); | |
writer.write_all(row.as_bytes())?; | |
head = h2; | |
} | |
writer.finish()?; | |
Ok(()) | |
} | |
fn random_string() -> String { | |
let ret: String = thread_rng() | |
.sample_iter(&Alphanumeric) | |
.take(128) | |
.map(char::from) | |
.collect(); | |
ret | |
} | |
pub fn hash(content: &String, head: &String) -> (String, String) { | |
let mut hasher = Sha512::new(); | |
hasher.update(content.as_bytes()); | |
let h1 = hex::encode(hasher.finalize()); | |
let mut hasher2 = Sha512::new(); | |
hasher2.update(h1.as_bytes()); | |
hasher2.update(head.as_bytes()); | |
let h2 = hex::encode(hasher2.finalize()); | |
(h1, h2) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment