diff --git a/Futures in Sequence.md b/Futures in Sequence.md
index ce06385..d0e3a6b 100644
--- a/Futures in Sequence.md
+++ b/Futures in Sequence.md
@@ -1 +1,352 @@
 # Streams: Futures in Sequence
+So far we have limited ourselves to individual futures.
+
+The one exception was the async channel.
+
+Its async `recv` method produces a sequence of items over time.
+
+This is an instance of a much more general pattern known as a *stream*.
+
+We saw a sequence of items earlier with the `Iterator` trait, but there are two differences between iterators and the async channel receiver.
+
+The first difference is time: iterators are synchronous, while the channel receiver is asynchronous.
+
+The second is the API.
+
+When working directly with `Iterator`, we call its synchronous `next` method.
+
+With the `trpl::Receiver` stream in particular, we called an asynchronous `recv` method instead.
+
+Otherwise, the APIs feel very similar, and that similarity isn't a coincidence.
+
+A stream is like an asynchronous form of iteration.
+
+Whereas `trpl::Receiver` specifically waits to receive messages, the general-purpose stream API is much broader.
+
+It provides the next item the way `Iterator` does, but asynchronously.
+
+The similarity between iterators and streams in Rust means we can actually create a stream from any iterator.
+
+Given any iterator, we can build a stream from it and then work with that stream by calling its `next` method and awaiting the output.
+
+Here is an example of this:
+```rust
+    let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+    let iter = values.iter().map(|n| n * 2);
+    let mut stream = trpl::stream_from_iter(iter);
+
+    while let Some(value) = stream.next().await {
+        println!("The value was: {value}");
+    }
+```
+Here we start with an array of numbers, which we convert into an iterator and then call `map` on to double all the values.
+
+We then convert the iterator into a stream using the `trpl::stream_from_iter` function.
+
+Next, we loop over the items in the stream as they arrive with the `while let` loop.
+
+However, this code does not compile; instead, it reports that there is no `next` method available.
+```
+error[E0599]: no method named `next` found for struct `Iter` in the current scope
+  --> src/main.rs:10:40
+   |
+10 |         while let Some(value) = stream.next().await {
+   |                                        ^^^^
+   |
+   = note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
+   = note: consider using `--verbose` to print the full type name to the console
+   = help: items from traits can only be used if the trait is in scope
+help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
+   |
+1  + use crate::trpl::StreamExt;
+   |
+1  + use futures_util::stream::stream::StreamExt;
+   |
+1  + use std::iter::Iterator;
+   |
+1  + use std::str::pattern::Searcher;
+   |
+help: there is a method `try_next` with a similar name
+   |
+10 |         while let Some(value) = stream.try_next().await {
+   |                                        ~~~~~~~~
+```
+As this output explains, the reason for the compiler error is that we need the right trait in scope to be able to use the `next` method.
+
+You might expect that trait to be `Stream`, but it is actually `StreamExt`.
+
+`Ext`, short for *extension*, is a common pattern in the Rust community for extending one trait with another.
+
+We will explain the `Stream` and `StreamExt` traits in a bit more detail at the end of the chapter.
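+
+To make the `Ext` naming pattern concrete, here is a minimal sketch of how an extension trait can layer convenience methods on top of a small base trait by way of a blanket implementation. The `Greeter` and `GreeterExt` names are hypothetical and used only for illustration; they do not come from `trpl` or any other crate.
+```rust
+// A minimal sketch of the extension-trait (`Ext`) pattern.
+// `Greeter` and `GreeterExt` are made-up names for illustration only.
+trait Greeter {
+    fn name(&self) -> String;
+}
+
+// The extension trait adds convenience methods with default bodies.
+trait GreeterExt: Greeter {
+    fn greet(&self) -> String {
+        format!("Hello, {}!", self.name())
+    }
+}
+
+// A blanket implementation gives every `Greeter` the `GreeterExt` methods,
+// as long as `GreeterExt` is in scope.
+impl<T: Greeter + ?Sized> GreeterExt for T {}
+
+struct World;
+
+impl Greeter for World {
+    fn name(&self) -> String {
+        String::from("world")
+    }
+}
+
+fn main() {
+    println!("{}", World.greet()); // prints "Hello, world!"
+}
+```
+This mirrors the relationship between `Stream` and `StreamExt`: the base trait stays small, the extension trait supplies the ergonomic methods, and those methods are only available once the extension trait is in scope.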
+
+For now, know that the `Stream` trait defines a low-level interface that effectively combines the `Iterator` and `Future` traits.
+
+`StreamExt` supplies a higher-level set of APIs on top of `Stream`, including the `next` method as well as other utility methods similar to the ones provided by the `Iterator` trait.
+
+`Stream` and `StreamExt` are not yet part of Rust's standard library, but most crates in the ecosystem use the same definitions.
+
+The fix for the compiler error is to add a `use` statement for `trpl::StreamExt`.
+```rust
+extern crate trpl; // required for mdbook test
+
+use trpl::StreamExt;
+
+fn main() {
+    trpl::run(async {
+        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
+        let iter = values.iter().map(|n| n * 2);
+        let mut stream = trpl::stream_from_iter(iter);
+
+        while let Some(value) = stream.next().await {
+            println!("The value was: {value}");
+        }
+    });
+}
+```
+With all of the pieces put together, the code works the way we want.
+
+Now that we have `StreamExt` in scope, we can use all of its utility methods, just as with iterators.
+
+Here is an example where we use the `filter` method to filter out everything but multiples of three and five.
+```rust
+extern crate trpl; // required for mdbook test
+
+use trpl::StreamExt;
+
+fn main() {
+    trpl::run(async {
+        let values = 1..101;
+        let iter = values.map(|n| n * 2);
+        let stream = trpl::stream_from_iter(iter);
+
+        let mut filtered =
+            stream.filter(|value| value % 3 == 0 || value % 5 == 0);
+
+        while let Some(value) = filtered.next().await {
+            println!("The value was: {value}");
+        }
+    });
+}
+```
+This isn't very interesting, since we could do the same with normal iterators and without any async at all.
+
+Now let's look at what *is* unique to streams.
+
+## Composing Streams
+Many concepts are naturally represented as streams: items becoming available in a queue, chunks of data being pulled incrementally from the filesystem when the full data set is too large for the computer's memory, or data arriving over the network over time.
+
+For example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.
+
+To start, we will build a little stream of messages as a stand-in for a stream of data we might see from a WebSocket or another real-time communication protocol.
+
+Here is an example of this.
+```rust
+extern crate trpl; // required for mdbook test
+
+use trpl::{ReceiverStream, Stream, StreamExt};
+
+fn main() {
+    trpl::run(async {
+        let mut messages = get_messages();
+
+        while let Some(message) = messages.next().await {
+            println!("{message}");
+        }
+    });
+}
+
+fn get_messages() -> impl Stream<Item = String> {
+    let (tx, rx) = trpl::channel();
+
+    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
+    for message in messages {
+        tx.send(format!("Message: '{message}'")).unwrap();
+    }
+
+    ReceiverStream::new(rx)
+}
+```
+Here we first create a function called `get_messages` that returns `impl Stream<Item = String>`.
+
+In its implementation, we create an async channel, loop over the first 10 letters of the alphabet, and send them across the channel.
+
+We also use a new type, `ReceiverStream`.
+
+This converts the `rx` receiver from `trpl::channel` into a `Stream` with a `next` method.
+
+In `main`, we use a `while let` loop to print all the messages from the stream.
+
+When we run this code, we get the expected output.
+```
+Message: 'a'
+Message: 'b'
+Message: 'c'
+Message: 'd'
+Message: 'e'
+Message: 'f'
+Message: 'g'
+Message: 'h'
+Message: 'i'
+Message: 'j'
+```
+We could have done this with the regular `Receiver` API or even the regular `Iterator` API, so let's add a feature that requires streams: a timeout that applies to every item in the stream, and a delay on the items we emit.
+
+This is shown here:
+```rust
+extern crate trpl; // required for mdbook test
+
+use std::{pin::pin, time::Duration};
+use trpl::{ReceiverStream, Stream, StreamExt};
+
+fn main() {
+    trpl::run(async {
+        let mut messages =
+            pin!(get_messages().timeout(Duration::from_millis(200)));
+
+        while let Some(result) = messages.next().await {
+            match result {
+                Ok(message) => println!("{message}"),
+                Err(reason) => eprintln!("Problem: {reason:?}"),
+            }
+        }
+    })
+}
+
+fn get_messages() -> impl Stream<Item = String> {
+    let (tx, rx) = trpl::channel();
+
+    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
+    for message in messages {
+        tx.send(format!("Message: '{message}'")).unwrap();
+    }
+
+    ReceiverStream::new(rx)
+}
+```
+
+Here we add a timeout to the stream with the `timeout` method.
+
+This comes from the `StreamExt` trait.
+
+Next, we update the body of the `while let` loop, because the stream now returns a `Result`.
+
+The `Ok` variant indicates a message arrived in time.
+
+The `Err` variant indicates that the timeout elapsed before any message arrived.
+
+We `match` on this result and either print the message when we receive it successfully or print a notice about the timeout.
+
+Notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream that needs to be pinned to be polled.
+
+Because there are no delays between messages, the timeout does not change the behavior of the program.
+
+Now let's add a variable delay to the messages we send:
+```rust
+extern crate trpl; // required for mdbook test
+
+use std::{pin::pin, time::Duration};
+
+use trpl::{ReceiverStream, Stream, StreamExt};
+
+fn main() {
+    trpl::run(async {
+        let mut messages =
+            pin!(get_messages().timeout(Duration::from_millis(200)));
+
+        while let Some(result) = messages.next().await {
+            match result {
+                Ok(message) => println!("{message}"),
+                Err(reason) => eprintln!("Problem: {reason:?}"),
+            }
+        }
+    })
+}
+
+fn get_messages() -> impl Stream<Item = String> {
+    let (tx, rx) = trpl::channel();
+
+    trpl::spawn_task(async move {
+        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
+        for (index, message) in messages.into_iter().enumerate() {
+            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
+            trpl::sleep(Duration::from_millis(time_to_sleep)).await;
+
+            tx.send(format!("Message: '{message}'")).unwrap();
+        }
+    });
+
+    ReceiverStream::new(rx)
+}
+```
+In `get_messages`, we use the `enumerate` iterator method with the `messages` array.
+
+This lets us get the index of each item we are sending along with the item itself.
+
+Next, we apply a 100-millisecond delay to even-index items and a 300-millisecond delay to odd-index items to simulate the different delays we might see from a stream of messages in the real world.
+
+Because our timeout is for 200 milliseconds, this should affect half of the messages.
+
+To sleep between messages in the `get_messages` function without blocking, we need to use async.
+
+However, we can't make `get_messages` itself into an async function, because then we would return a `Future<Output = Stream<Item = String>>` instead of a `Stream<Item = String>`.
+
+The caller would need to await `get_messages` itself to get access to the stream.
+
+Remember, everything in a given future happens linearly; concurrency happens *between* futures.
+
+Awaiting `get_messages` would require it to send all the messages, including the sleep delay between each message, before returning the receiver stream.
+
+This would make the timeout useless.
+
+There would be no delays in the stream itself; they would all happen before the stream was even available.
+
+Instead, we leave `get_messages` as a regular function that returns a stream, and we spawn a task to handle the async `sleep` calls.
+
+Note that calling `spawn_task` in this way works because we already set up our runtime.
+
+If we had not, it would cause a panic.
+
+Other implementations choose different tradeoffs.
+
+They may spawn a new runtime and avoid the panic but end up with a bit of extra overhead, or they may simply not provide a standalone way to spawn tasks without reference to a runtime.
+
+Make sure you know what tradeoff your runtime has chosen and write your code accordingly.
+
+This gives our code a much more interesting result.
+
+Between every other pair of messages, we see a `Problem: Elapsed(())` error.
+```
+Message: 'a'
+Problem: Elapsed(())
+Message: 'b'
+Message: 'c'
+Problem: Elapsed(())
+Message: 'd'
+Message: 'e'
+Problem: Elapsed(())
+Message: 'f'
+Message: 'g'
+Problem: Elapsed(())
+Message: 'h'
+Message: 'i'
+Problem: Elapsed(())
+Message: 'j'
+```
+The timeout doesn't prevent the messages from arriving in the end.
+
+We still get all of the original messages, because our channel is *unbounded*.
+
+It can hold as many messages as we can fit in memory.
+
+If a message doesn't arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.
+
+You can get different behavior if needed by using other kinds of channels or other kinds of streams more generally.
+
+Let's see this in action by combining a stream of time intervals with this stream of messages.
+
+## Merging Streams
\ No newline at end of file