Created
August 3, 2018 21:39
-
-
Save bovee/e9c979799ebc6d0c29156c995518eec7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::cell::UnsafeCell; | |
use std::io::Write; | |
// `parking_lot` evidently has better implementations of some of these sync primatives? | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::sync::{Arc, Condvar, Mutex}; | |
use errors::Result; | |
const BUFFER_SIZE: usize = 1000; | |
pub struct OrderedWriter { | |
start_line: AtomicUsize, | |
end_line: AtomicUsize, | |
n_lines: AtomicUsize, | |
flush_lock: Arc<(Mutex<bool>, Condvar)>, | |
buffer: UnsafeCell<Vec<String>>, | |
} | |
unsafe impl Sync for OrderedWriter {} | |
impl OrderedWriter { | |
pub fn new() -> Self { | |
OrderedWriter { | |
start_line: AtomicUsize::new(0), | |
end_line: AtomicUsize::new(BUFFER_SIZE), | |
n_lines: AtomicUsize::new(0), | |
flush_lock: Arc::new((Mutex::new(false), Condvar::new())), | |
buffer: UnsafeCell::new(vec![String::new(); BUFFER_SIZE]), | |
} | |
} | |
pub fn write(&self, line_number: usize, line: String) { | |
let buffer = unsafe { &mut *self.buffer.get() }; | |
buffer[line_number - self.start_line.load(Ordering::Relaxed)] = line; | |
// we add one in to the atomic and add 1 to the value returned to be consistant | |
let n_lines = self.n_lines.fetch_add(1, Ordering::Relaxed) + 1; | |
self.release_lock(n_lines); | |
} | |
/// use this after processing all the lines to make sure the last `synced_flush` | |
/// gets called | |
pub fn last_line(&self, line_number: usize) { | |
self.end_line.store(line_number, Ordering::Relaxed); | |
let n_lines = self.n_lines.load(Ordering::Relaxed); | |
self.release_lock(n_lines); | |
} | |
#[inline] | |
fn release_lock(&self, n_lines: usize) { | |
if n_lines == self.end_line.load(Ordering::Relaxed) { | |
// release the lock for `check_lock` to take | |
let &(ref lock, ref cvar) = &*self.flush_lock; | |
let mut flush = lock.lock().unwrap(); | |
*flush = true; | |
// this could be `notify_all` too, but only the IO thread should be | |
// calling `synced_flush` anyhow | |
cvar.notify_one(); | |
} | |
} | |
pub fn synced_flush(&self, writer: &mut Write, line_number: Option<usize>) -> Result<()> { | |
// if `line_number` is specified, make sure it's at the end of the buffer | |
// otherwise return until we've reached the end | |
if let Some(ln) = line_number { | |
if ln < self.end_line.load(Ordering::Relaxed) { | |
return Ok(()); | |
} | |
} | |
// halt until all the `write`s are done | |
{ | |
let &(ref lock, ref cvar) = &*self.flush_lock; | |
let mut flush = lock.lock().unwrap(); | |
while !*flush { | |
flush = cvar.wait(flush).unwrap(); | |
} | |
} | |
// self.n_lines is the total number of lines "written" so far, but n_lines here is the | |
// number of lines "to write" | |
let n_lines = | |
self.n_lines.load(Ordering::Relaxed) - self.start_line.load(Ordering::Relaxed); | |
// now we write out everything | |
let buffer = unsafe { &mut *self.buffer.get() }; | |
for line in buffer.iter().take(n_lines) { | |
writer.write_all(line.as_bytes())?; | |
} | |
// and bump the start and end | |
let start_line = self.start_line.fetch_add(n_lines, Ordering::Relaxed) + n_lines; | |
self.end_line | |
.store(start_line + BUFFER_SIZE, Ordering::Relaxed); | |
// relock the flush_lock | |
let &(ref lock, ref _cvar) = &*self.flush_lock; | |
let mut flush = lock.lock().unwrap(); | |
*flush = false; | |
Ok(()) | |
} | |
} | |
impl Default for OrderedWriter { | |
fn default() -> Self { | |
OrderedWriter { | |
start_line: AtomicUsize::new(0), | |
end_line: AtomicUsize::new(BUFFER_SIZE), | |
n_lines: AtomicUsize::new(0), | |
flush_lock: Arc::new((Mutex::new(false), Condvar::new())), | |
buffer: UnsafeCell::new(vec![String::new(); BUFFER_SIZE]), | |
} | |
} | |
} | |
#[test] | |
fn test_ordered_writer() { | |
extern crate threadpool; | |
let pool = threadpool::ThreadPool::default(); | |
let master_writer = Arc::new(OrderedWriter::new()); | |
let mut line_no = 0; | |
let mut output: Vec<u8> = Vec::new(); | |
for i in 0..10 { | |
let writer = Arc::clone(&master_writer); | |
pool.execute(move || { | |
writer.write(line_no, format!("{},", i)); | |
}); | |
line_no += 1; | |
let writer = Arc::clone(&master_writer); | |
writer.synced_flush(&mut output, Some(line_no)).unwrap(); | |
} | |
let writer = Arc::clone(&master_writer); | |
writer.last_line(line_no); | |
writer.synced_flush(&mut output, None).unwrap(); | |
assert_eq!(output, b"0,1,2,3,4,5,6,7,8,9,"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment