Created
December 6, 2017 17:52
-
-
Save gmbeard/ca70e89e4ec9ed98436b24b79ee02c29 to your computer and use it in GitHub Desktop.
Rough implementation of Rust's Future / Stream traits, to demonstrate their combinators and ownership semantics
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::mem; | |
enum Chain<T, F, U> { | |
First(T, F), | |
Second(U), | |
Done, | |
} | |
impl<T, F, U> Future for Chain<T, F, U> | |
where T: Future, | |
U: Future, | |
U::Error: From<T::Error>, | |
F: FnOnce(T::Item) -> U, | |
{ | |
type Item = U::Item; | |
type Error = U::Error; | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
let result = match *self { | |
Chain::First(ref mut val, _) => match val.poll() { | |
Ok(PollResult::Pending) => return Ok(PollResult::Pending), | |
Ok(PollResult::Complete(r)) => Ok(r), | |
Err(e) => Err(e), | |
}, | |
Chain::Second(ref mut val) => return val.poll(), | |
Chain::Done => panic!("poll called on finished result!"), | |
}; | |
let next = match mem::replace(self, Chain::Done) { | |
Chain::First(_, f) => { | |
let mut next = f(result?); | |
match next.poll()? { | |
PollResult::Complete(r) => return Ok(PollResult::Complete(r)), | |
_ => Chain::Second(next), | |
} | |
} | |
_ => unreachable!(), | |
}; | |
*self = next; | |
Ok(PollResult::Pending) | |
} | |
} | |
/// Outcome of a single successful `poll` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PollResult<T> {
    /// The work has not finished yet; `poll` must be called again.
    Pending,
    /// The work finished and produced this value.
    Complete(T),
}
/// A trait to represent a piece of work that will complete | |
/// after one or more calls to its `poll` function. I.e. work | |
/// that will complete at some point in the future. | |
trait Future { | |
type Item; | |
type Error; | |
/// This function must be called periodically in order | |
/// for the operation to make progress. | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error>; | |
/// A combinator function that *consumes* self and returns | |
/// a type that also implements `Future`. | |
/// | |
/// The important thing to note about this is that `F` must return | |
/// another type implementing `Future`. | |
/// | |
/// The type returned by `and_then` will first `poll` `self` and | |
/// then `poll` the future returned by `F` | |
fn and_then<F, U>(self, f: F) -> Chain<Self, F, U> | |
where F: FnOnce(Self::Item) -> U, | |
Self: Sized, | |
U: Future, | |
{ | |
Chain::First(self, f) | |
} | |
} | |
/// Similar to `Future` this type represents work that will complete | |
/// at some point in the future. It differs in that it will deliver | |
/// items one after another until there are none left. | |
trait Stream { | |
type Item; | |
type Error; | |
/// This function must be called periodically in order | |
/// for the operation to make progress. Once this function | |
/// returns `Ok(PollResult::Complete(None))` then all the | |
/// work is complete. | |
fn poll(&mut self) -> Result<PollResult<Option<Self::Item>>, Self::Error>; | |
/// A combinator that *consumes* `self` and will call `F` | |
/// for every item in the stream. The return type of `for_each` | |
/// implements `Future` and will only complete when the stream | |
/// completes. | |
fn for_each<F>(self, f: F) -> ForEach<Self, F> | |
where F: Fn(Self::Item), | |
Self: Sized, | |
{ | |
ForEach(self, f) | |
} | |
} | |
struct ForEach<T: Stream, F>(T, F); | |
impl<T, F, U> Future for ForEach<T, F> | |
where T: Stream, | |
F: Fn(T::Item) -> U, | |
{ | |
type Item = (); | |
type Error = T::Error; | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
let value = match self.0.poll()? { | |
PollResult::Complete(Some(val)) => val, | |
PollResult::Complete(None) => return Ok(PollResult::Complete(())), | |
PollResult::Pending => return Ok(PollResult::Pending), | |
}; | |
(self.1)(value); | |
Ok(PollResult::Pending) | |
} | |
} | |
/// A `Future` implementing type that will complete on the | |
/// 2nd `poll` attempt. | |
enum PollTwice<T> { | |
First(T), | |
Second(T), | |
Done, | |
} | |
impl<T> Future for PollTwice<T> { | |
type Item = T; | |
type Error = (); | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
eprintln!("PollTwice::poll()"); | |
let next = match mem::replace(self, PollTwice::Done) { | |
PollTwice::First(val) => PollTwice::Second(val), | |
PollTwice::Second(val) => return Ok(PollResult::Complete(val)), | |
PollTwice::Done => panic!("PollTwice can only be polled twice!"), | |
}; | |
*self = next; | |
Ok(PollResult::Pending) | |
} | |
} | |
/// A `Future` implementing type that encapsulates a `Channel`, | |
/// returning ownership once it completes | |
struct MyThing(PollTwice<Channel>); | |
impl MyThing { | |
fn new(channel: Channel) -> MyThing { | |
MyThing(PollTwice::First(channel)) | |
} | |
} | |
impl Future for MyThing { | |
type Item = Channel; | |
type Error = (); | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
self.0.poll() | |
} | |
} | |
/// This type is able to perform the following async operations: | |
/// | |
/// - `do_something` | |
/// - `do_something_else` | |
/// - `do_another_thing` | |
/// | |
/// Each of these will return a `Future` type, *consuming* `self` in | |
/// the process. | |
struct Channel; | |
impl Channel { | |
/// Creates a `Future` that will *do something*. This function | |
/// consumes self so that we can pass it to a downstream `Future`. | |
fn do_something(self) -> ChannelOp { | |
ChannelOp(PollTwice::First(self)) | |
} | |
/// Creates a `Future` that will *do something else*. This function | |
/// consumes self so that we can pass it to a downstream `Future`. | |
fn do_something_else(self) -> ChannelOp { | |
ChannelOp(PollTwice::First(self)) | |
} | |
/// Creates a `Future` that will *do another thing*. This function | |
/// consumes self so that we can pass it to a downstream `Future`. | |
/// | |
/// The result of the future created from this will be a tuple of | |
/// `(Channel, S: Stream)`. | |
fn do_another_thing(self) -> ChannelStreamOp { | |
ChannelStreamOp( | |
PollTwice::First(( | |
self, | |
// A `Stream` that counts to 4, resolving | |
// each value on the second attempt (just to | |
// simulate an asynchronous operation). | |
ChannelStream(vec![ | |
PollTwice::First(1), | |
PollTwice::First(2), | |
PollTwice::First(3), | |
PollTwice::First(4), | |
]) | |
)) | |
) | |
} | |
/// This function doesn't return anything. It will just *do the | |
/// last thing* without consuming `self` | |
fn do_the_last_thing(&self, val: usize) { | |
println!("{}", val); | |
} | |
} | |
/// A `Stream` implementing type that just pops the front of | |
/// its internal `Vec<_>` until there are no values left. | |
struct ChannelStream(Vec<PollTwice<usize>>); | |
impl Stream for ChannelStream { | |
type Item = usize; | |
type Error = (); | |
fn poll(&mut self) -> Result<PollResult<Option<Self::Item>>, Self::Error> { | |
let result = match self.0.get_mut(0) { | |
None => return Ok(PollResult::Complete(None)), | |
Some(item) => match item.poll()? { | |
PollResult::Pending => return Ok(PollResult::Pending), | |
PollResult::Complete(item) => item, | |
}, | |
}; | |
self.0.remove(0); | |
Ok(PollResult::Complete(Some(result))) | |
} | |
} | |
/// A `Future` implementing type that represents async work perfomed | |
/// by a `Channel`. Internally, we just simulate this work with | |
/// a `PollTwice` instance. | |
struct ChannelOp(PollTwice<Channel>); | |
impl Future for ChannelOp { | |
type Item = Channel; | |
type Error = (); | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
self.0.poll() | |
} | |
} | |
/// A `Future` implementing type that represents async work perfomed | |
/// by a `Channel` and results in a `Stream` type that requires further | |
/// work. Internally, we just simulate this work with a `PollTwice` | |
/// instance. | |
struct ChannelStreamOp(PollTwice<(Channel, ChannelStream)>); | |
impl Future for ChannelStreamOp { | |
type Item = (Channel, ChannelStream); | |
type Error = (); | |
fn poll(&mut self) -> Result<PollResult<Self::Item>, Self::Error> { | |
self.0.poll() | |
} | |
} | |
fn main() { | |
let my_thing = MyThing::new(Channel); | |
let mut future = my_thing.and_then(|channel| { | |
// The `do_...` functions (apart from the last) consume | |
// `channel` (take ownership) so it can be passed down | |
// the line of `and_then` calls... | |
channel.do_something() // Returns a `Future` | |
.and_then(|ch| { | |
ch.do_something_else() // Returns a `Future` | |
}) | |
.and_then(|ch| { | |
ch.do_another_thing() // Returns a `Future` | |
}) | |
.and_then(|(ch, stream)| { | |
// The result of `for_each` implements `Future`. It | |
// will complete when all of its `Stream`'s items are | |
// exhausted (or it reports an error). We can `move` | |
// `ch` into the closure because nothing else needs | |
// it at this stage... | |
stream.for_each(move |n| { | |
println!("{}", n); | |
ch.do_the_last_thing(n); | |
}) | |
}) | |
}); | |
// `future` is now just a big *state machine*. We need to poll | |
// it to completion (or error)... | |
loop { | |
if let PollResult::Complete(_) = future.poll().unwrap() { | |
break; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment