Created
January 8, 2021 06:55
-
-
Save aleics/5c9905e14872c8462040f5f524508a5a to your computer and use it in GitHub Desktop.
Clone streams by using Bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use bus::{Bus, BusReader};
use futures::task::{Context, Poll};
use futures::Stream;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
/// `SourceInner` contains the reference to the Bus. `SourceInner` is available to the | |
/// `Source` instance, as well as its connected `SourceNode`. The `inner` needs to be wrapped | |
/// around a `Mutex` so that it's possible to include new receivers, and around a `Arc` so that | |
/// the instance is cloneable, potentially from multiple threads. | |
pub struct SourceInner<T> { | |
inner: Arc<Mutex<Bus<T>>>, | |
} | |
impl<T> SourceInner<T> { | |
pub fn new(bus: Bus<T>) -> SourceInner<T> { | |
SourceInner { | |
inner: Arc::new(Mutex::new(bus)), | |
} | |
} | |
/// Add a new receiver to the source Bus | |
pub fn add_rx(&self) -> BusReader<T> { | |
self.inner.lock().unwrap().add_rx() | |
} | |
/// Broadcast a value to all the connected nodes | |
pub fn try_broadcast(&self, value: T) -> Result<(), T> { | |
self.inner.lock().unwrap().try_broadcast(value) | |
} | |
} | |
impl<T> Clone for SourceInner<T> { | |
fn clone(&self) -> Self { | |
SourceInner { | |
inner: self.inner.clone(), | |
} | |
} | |
} | |
/// `Source` initializes the `Bus` as well as it plays as the broadcaster to all the connected | |
/// nodes. | |
pub struct Source<T> { | |
inner: SourceInner<Option<T>>, | |
} | |
impl<T: Clone> Source<T> { | |
pub fn new() -> Self { | |
Source::default() | |
} | |
/// Links a new `SourceNode` to the `Source` | |
pub fn link(&self) -> SourceNode<T> { | |
SourceNode::new(self.inner.clone()) | |
} | |
/// Broadcasts a chunk of data to all the connected nodes. A chunk of data would terminate | |
/// by sending a `None` as a value. Therefore, we would allow to run the stream only for a | |
/// chunk of data, keeping the original bus open for sending multiple chunks, if necessary. | |
pub fn send_chunk(&self, chunk: Vec<T>) -> Result<(), Option<T>> { | |
for value in chunk { | |
self.inner.try_broadcast(Some(value))?; | |
} | |
self.inner.try_broadcast(None)?; | |
Ok(()) | |
} | |
} | |
impl<T> Default for Source<T> { | |
fn default() -> Self { | |
Source { | |
inner: SourceInner::new(Bus::new(10)), | |
} | |
} | |
} | |
/// `SourceNode` defines a connected receiver to the source. `SourceNode` keeps a `SourceInner` | |
/// reference so that it can add new receivers when cloning the instance, independently from the | |
/// original `Source`. | |
pub struct SourceNode<T> { | |
source: SourceInner<Option<T>>, | |
receiver: BusReader<Option<T>>, | |
} | |
impl<T> SourceNode<T> { | |
/// Creates a new `SourceNode` instance given a `SourceInner`. When generating the instance, | |
/// a new receiver is attached to the `Bus`. | |
pub fn new(source: SourceInner<Option<T>>) -> Self { | |
let receiver = source.add_rx(); | |
SourceNode { source, receiver } | |
} | |
} | |
impl<T: Clone> Clone for SourceNode<T> { | |
/// Cloning the `SourceNode` instance creates a new reference to the shared `Bus` and it adds the | |
/// new cloned `SourceNode` as a receiver to the `Bus`. | |
fn clone(&self) -> Self { | |
SourceNode::new(self.source.clone()) | |
} | |
} | |
impl Stream for SourceNode<f32> { | |
type Item = f32; | |
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | |
Poll::Ready(loop { | |
if let Some(item) = self.receiver.recv().ok() { | |
break item; | |
} | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment