Last active
April 12, 2024 02:04
-
-
Save mkatychev/aac26524730238cef151f11d715f2891 to your computer and use it in GitHub Desktop.
Cancellable stream draft
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{future::Future, time::Duration}; | |
use bigerror::{LogError, NetworkError, OptionReport, Report, ReportAs, ResultExt}; | |
use futures::{future, pin_mut}; | |
use stream_cancel::TakeUntilIf; | |
use tokio::{ | |
sync::mpsc::{UnboundedSender, WeakUnboundedSender}, | |
task::JoinHandle, | |
}; | |
use tokio_stream::wrappers::UnboundedReceiverStream; | |
use tokio_util::sync::CancellationToken; | |
// Re-exported so that dependents do not need a direct dependency on `tokio_util`.
pub use tokio_util::sync::DropGuard; | |
use tracing::{debug, error, info, instrument, trace, warn, Instrument}; | |
pub fn unbounded_stream<T>() -> (UnboundedSender<T>, UnboundedReceiverStream<T>) { | |
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); | |
(tx, UnboundedReceiverStream::new(rx)) | |
} | |
/// Convenience type alias for a future's associated `Output` type.
///
/// `FutOut<F>` is shorthand for `<F as Future>::Output`, which makes bounds on
/// the output of a generic future easier to spell:
///
/// ```text
/// F: Future,
/// FutOut<F>: Send + Sync,
/// ```
///
/// The snippet above is a `where`-clause fragment rather than a complete
/// program, so the fence is marked `text` to keep rustdoc from compiling it
/// as a doctest.
pub type FutOut<F> = <F as Future>::Output;
/// Spawn a future that is joined with a [`CancellationToken`] future, | |
/// resolving with the first completed future | |
pub fn spawn_cancellable<F>( | |
future: F, | |
token: CancellationToken, | |
cancellation_out: impl Fn() -> FutOut<F> + Send + 'static, | |
) -> JoinHandle<FutOut<F>> | |
where | |
F: Future + Send + 'static, | |
FutOut<F>: Send, | |
{ | |
let cancel_fut = async move { | |
token.cancelled_owned().await; | |
trace!("token cancelled"); | |
}; | |
tokio::spawn( | |
async move { | |
pin_mut!(cancel_fut); | |
pin_mut!(future); | |
let either = future::select(future, cancel_fut).in_current_span().await; | |
match either { | |
future::Either::Left((fut, _token)) => fut, | |
// if our `CancellationToken` future resolves, use the given closure | |
// to return an object that satisfies the `F::Output` type signature | |
future::Either::Right(((), _)) => cancellation_out(), | |
} | |
} | |
.in_current_span(), | |
) | |
} | |
// spawn a cancellable future that resolves to `()`/unit | |
pub fn spawn_cancellable_unit( | |
future: impl Future<Output = ()> + Send + 'static, | |
token: CancellationToken, | |
) -> JoinHandle<()> { | |
let cancel_fut = async move { | |
token.cancelled_owned().await; | |
trace!("token cancelled"); | |
}; | |
tokio::spawn( | |
async move { | |
pin_mut!(cancel_fut); | |
pin_mut!(future); | |
future::select(future, cancel_fut).in_current_span().await; | |
} | |
.in_current_span(), | |
) | |
} | |
/// Spawn a keepalive cancellabel keepalive task | |
pub fn spawn_keepalive_token<T>( | |
interval: Duration, | |
tx: WeakUnboundedSender<T>, | |
message: T, | |
token: CancellationToken, | |
) where | |
T: Clone + std::fmt::Debug + Send + Sync + 'static, | |
{ | |
let keepalive_fut = async move { | |
loop { | |
tokio::time::sleep(interval).await; | |
let tx = tx.upgrade().expect_or().change_context(NetworkError)?; | |
trace!("sending keepalive"); | |
// Send keep alive packet | |
match tx | |
.send(message.clone()) | |
.report_as::<NetworkError>() | |
.and_log_err() | |
{ | |
Ok(_) => {} | |
Err(e) => return Err::<(), Report<NetworkError>>(e), | |
} | |
} | |
}; | |
spawn_cancellable(keepalive_fut, token, || { | |
debug!("keepalive task closed"); | |
Ok(()) | |
}); | |
} | |
pub trait OptionFuture { | |
fn fut(self) -> futures::future::OptionFuture<impl Future>; | |
} | |
impl<F: Future> OptionFuture for Option<F> { | |
// it's easier to define an opaque return on the trait for now | |
#[allow(refining_impl_trait)] | |
fn fut(self) -> futures::future::OptionFuture<F> { | |
futures::future::OptionFuture::from(self) | |
} | |
} | |
/// Trait meant to extend `stream_cancel::StreamExt::take_until_cancelled` | |
/// to allow a given stream to take in a [`CancellationToken`] and `.await` it, | |
/// immediately breaking the stream once the token's future resolves. | |
pub trait StreamCancel: stream_cancel::StreamExt | |
where | |
Self: Sized, | |
{ | |
fn take_until_cancelled( | |
self, | |
token: CancellationToken, | |
) -> TakeUntilIf<Self, impl Future<Output = bool>>; | |
} | |
impl<S> StreamCancel for S | |
where | |
S: stream_cancel::StreamExt, | |
{ | |
fn take_until_cancelled( | |
self, | |
token: CancellationToken, | |
) -> TakeUntilIf<Self, impl Future<Output = bool>> { | |
self.take_until_if(async move { | |
token.cancelled().await; | |
warn!("token cancelled"); | |
true | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment