Skip to content

Instantly share code, notes, and snippets.

@aleics
Created January 8, 2021 06:55
Show Gist options
  • Save aleics/5c9905e14872c8462040f5f524508a5a to your computer and use it in GitHub Desktop.
Save aleics/5c9905e14872c8462040f5f524508a5a to your computer and use it in GitHub Desktop.
Clone streams by using Bus
use bus::{Bus, BusReader};
use futures::task::{Context, Poll};
use futures::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
/// `SourceInner` holds the shared handle to the `Bus`. It is used by the `Source` instance as
/// well as by every connected `SourceNode`. The bus is wrapped in a `Mutex` so that receivers
/// can be added and values broadcast through a shared reference, and in an `Arc` so that the
/// handle is cloneable, potentially from multiple threads.
pub struct SourceInner<T> {
/// Shared, lock-protected bus; every clone of this handle refers to the same `Bus`.
inner: Arc<Mutex<Bus<T>>>,
}
impl<T> SourceInner<T> {
pub fn new(bus: Bus<T>) -> SourceInner<T> {
SourceInner {
inner: Arc::new(Mutex::new(bus)),
}
}
/// Add a new receiver to the source Bus
pub fn add_rx(&self) -> BusReader<T> {
self.inner.lock().unwrap().add_rx()
}
/// Broadcast a value to all the connected nodes
pub fn try_broadcast(&self, value: T) -> Result<(), T> {
self.inner.lock().unwrap().try_broadcast(value)
}
}
impl<T> Clone for SourceInner<T> {
fn clone(&self) -> Self {
SourceInner {
inner: self.inner.clone(),
}
}
}
/// `Source` initializes the `Bus` and acts as the broadcaster to all the connected nodes.
pub struct Source<T> {
/// Shared bus handle. Messages are `Option<T>`: `Some` carries a value, while `None` is
/// broadcast by `send_chunk` to mark the end of a chunk.
inner: SourceInner<Option<T>>,
}
impl<T: Clone> Source<T> {
    /// Builds a `Source` through its `Default` implementation.
    pub fn new() -> Self {
        Self::default()
    }

    /// Connects a new `SourceNode` to this `Source`; the node shares the same underlying bus.
    pub fn link(&self) -> SourceNode<T> {
        let shared = self.inner.clone();
        SourceNode::new(shared)
    }

    /// Broadcasts a chunk of data to all the connected nodes.
    ///
    /// Every element of `chunk` is sent as `Some(value)`, in order, followed by a single `None`
    /// that terminates the chunk. This lets a stream run for one chunk only while the bus
    /// itself stays open for further chunks.
    ///
    /// # Errors
    ///
    /// Stops at the first message the bus does not accept and returns that message. Messages
    /// broadcast before the failure have already been sent.
    pub fn send_chunk(&self, chunk: Vec<T>) -> Result<(), Option<T>> {
        chunk
            .into_iter()
            .map(Some)
            // The trailing `None` is the end-of-chunk marker.
            .chain(std::iter::once(None))
            .try_for_each(|message| self.inner.try_broadcast(message))
    }
}
impl<T> Default for Source<T> {
    /// Creates a `Source` backed by a freshly created `Bus` constructed with a length of 10.
    fn default() -> Self {
        // Length handed to `Bus::new` for the backing bus.
        const BUS_CAPACITY: usize = 10;

        let bus = Bus::new(BUS_CAPACITY);
        Self {
            inner: SourceInner::new(bus),
        }
    }
}
/// `SourceNode` is a receiver connected to the source. It keeps a `SourceInner` handle so that
/// it can add new receivers when the instance is cloned, independently of the original
/// `Source`.
pub struct SourceNode<T> {
/// Handle to the shared bus, used to register a receiver for each clone of this node.
source: SourceInner<Option<T>>,
/// This node's own receiver on the bus.
receiver: BusReader<Option<T>>,
}
impl<T> SourceNode<T> {
/// Creates a new `SourceNode` instance given a `SourceInner`. When generating the instance,
/// a new receiver is attached to the `Bus`.
pub fn new(source: SourceInner<Option<T>>) -> Self {
let receiver = source.add_rx();
SourceNode { source, receiver }
}
}
impl<T: Clone> Clone for SourceNode<T> {
/// Cloning the `SourceNode` instance creates a new reference to the shared `Bus` and it adds the
/// new cloned `SourceNode` as a receiver to the `Bus`.
fn clone(&self) -> Self {
SourceNode::new(self.source.clone())
}
}
/// Exposes a `SourceNode<f32>` as a `Stream` of `f32` values.
///
/// Every message on the bus is an `Option<f32>` and is forwarded as the poll result:
/// `Some(v)` yields `v`, while `None` (the marker `Source::send_chunk` broadcasts after a
/// chunk) ends the stream.
impl Stream for SourceNode<f32> {
    type Item = f32;

    /// Polls the receiver for the next message without blocking the calling thread.
    ///
    /// Returns `Poll::Ready` with the received message when one is available, `Poll::Pending`
    /// when the bus is currently empty, and `Poll::Ready(None)` when the receiver reports that
    /// the bus is disconnected.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.receiver.try_recv() {
            Ok(item) => Poll::Ready(item),
            Err(std::sync::mpsc::TryRecvError::Empty) => {
                // The receiver offers no way to register a waker, so request an immediate
                // re-poll. This yields to the executor instead of parking its thread inside a
                // blocking `recv()`.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            // No further message can arrive: finish the stream rather than retrying forever.
            Err(std::sync::mpsc::TryRecvError::Disconnected) => Poll::Ready(None),
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment