Skip to content

Instantly share code, notes, and snippets.

@dicej
Last active August 4, 2024 15:42
Show Gist options
  • Save dicej/ec066dc4553bf16f8a78446aa2d0e535 to your computer and use it in GitHub Desktop.
Save dicej/ec066dc4553bf16f8a78446aa2d0e535 to your computer and use it in GitHub Desktop.
use futures::{
future::{Future, IntoFuture},
sync::oneshot,
};
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
};
struct Inner<T> {
item: Option<T>,
wait_list: VecDeque<oneshot::Sender<T>>,
}
// `Resource` is like an asynchronous `Mutex` (and can probably be replaced by the futures 0.3 `Mutex` when we
// upgrade to that version). The `with` method can be used to acquire the wrapped resource asynchronously and pass
// it to a lambda which returns a future that releases the resource so it can be used by any waiting tasks.
//
// Usage:
// let shared_sink = resource::Resource::new(my_sink);
// let shared_sink_clone = shared_sink.clone();
// ...
// let future = shared_sink.with(|sink| sink.send("hello, world!"));
// ...
// let other_future = shared_sink_clone.with(|sink| sink.send("hello from elsewhere!"));
#[derive(Clone)]
pub struct Resource<T> {
inner: Arc<Mutex<Inner<T>>>,
}
fn next<T>(inner: Arc<Mutex<Inner<T>>>, item: T) {
let mut inner = inner.lock().unwrap();
if let Some(tx) = inner.wait_list.pop_front() {
let _ = tx.send(item);
} else {
inner.item = Some(item)
}
}
impl<T: Send + 'static> Resource<T> {
pub fn new(item: T) -> Self {
Resource {
inner: Arc::new(Mutex::new(Inner {
item: Some(item),
wait_list: VecDeque::new(),
})),
}
}
pub fn with<E, B: IntoFuture<Item = T, Error = E>, F: FnOnce(T) -> B + Sync + Send + 'static>(
&self,
fun: F,
) -> Box<dyn Future<Item = (), Error = E> + Send>
where
B::Future: Send + 'static,
{
let mut inner = self.inner.lock().unwrap();
if let Some(item) = inner.item.take() {
drop(inner);
let inner = self.inner.clone();
Box::new(fun(item).into_future().map(|item| next(inner, item)))
as Box<dyn Future<Item = (), Error = E> + Send>
} else {
let (tx, rx) = oneshot::channel();
inner.wait_list.push_back(tx);
drop(inner);
let inner = self.inner.clone();
Box::new(
rx.map_err(|_| unreachable!())
.and_then(move |item| fun(item).into_future().map(|item| next(inner, item))),
)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment