mirror of
https://github.com/darkicewolf50/RustBrock.git
synced 2025-06-15 04:54:17 -06:00
# Streams: Futures in Sequence

So far we have mostly limited ourselves to individual futures.

The one exception has been the async channel.

The async `recv` method produces a sequence of items over time.

This is an instance of a much more general pattern known as a *stream*.

We saw a sequence of items earlier with the `Iterator` trait, but there are two differences between iterators and the async channel receiver.

The first difference is time: iterators are synchronous, while the channel receiver is asynchronous.

The second is the API.

When working directly with `Iterator`, we call its synchronous `next` method.

With the `trpl::Receiver` stream in particular, we called an asynchronous `recv` method instead.

Otherwise, the APIs feel very similar, and this similarity isn't a coincidence.

A stream is like an asynchronous form of iteration.

Whereas `trpl::Receiver` specifically waits to receive messages, the general-purpose stream API is much broader.

It provides the next item the way `Iterator` does, but asynchronously.

The similarity between iterators and streams in Rust means we can actually create a stream from any iterator.

As with an iterator, we can work with a stream by calling its `next` method and then awaiting the output.

Here is an example of this:
```rust
extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        // Fails to compile: `next` comes from a trait that is not in scope yet.
        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
```
Here we start with an array of numbers, convert it into an iterator, and call `map` on it to double all the values.

We then convert the iterator into a stream using the `trpl::stream_from_iter` function.

Next, we loop over the items in the stream as they arrive with the `while let` loop.

Unfortunately, this does not compile; instead, the compiler reports that there is no `next` method available:
```
error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~
```
As this output explains, the compiler error occurs because we need the right trait in scope to be able to use the `next` method.

You might expect that trait to be `Stream`, but it is actually `StreamExt`.

Short for *extension*, `Ext` is a common pattern in the Rust community for extending one trait with another.

We will explain the `Stream` and `StreamExt` traits in a bit more detail at the end of the chapter.

For now, know that the `Stream` trait defines a low-level interface that effectively combines the `Iterator` and `Future` traits.

`StreamExt` supplies a higher-level set of APIs on top of `Stream`, including the `next` method as well as other utility methods similar to the ones provided by the `Iterator` trait.

`Stream` and `StreamExt` are not yet part of Rust's standard library, but most crates in the ecosystem use the same definition.
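The two-trait split is easier to see in code, so here is a minimal std-only sketch. The `Stream` and `StreamExt` traits below are simplified stand-ins, assumed to follow the shape used by the `futures` crate: a `poll_next` method returning `Poll<Option<Item>>`, plus an extension trait with a blanket impl. `Next`, `IterStream`, `noop_waker`, `block_on`, and `doubled_via_stream` are hypothetical helpers written for illustration. They are not `trpl` APIs.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Simplified stand-in for the low-level `Stream` trait (assumed shape):
// `Iterator::next` combined with `Future::poll`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Simplified stand-in for `StreamExt`: higher-level helpers built on `poll_next`.
trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Unpin,
    {
        Next { stream: self }
    }
}

// Blanket impl: every `Stream` gets these methods, but only where
// `StreamExt` is in scope.
impl<S: Stream + ?Sized> StreamExt for S {}

// Hypothetical future returned by `next`; resolves to the next item.
struct Next<'a, S: ?Sized> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
    type Output = Option<S::Item>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

// Hypothetical stand-in for `trpl::stream_from_iter`: each item is ready at once.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.0.next())
    }
}

// Hypothetical waker that does nothing; fine for futures that never wait.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Hypothetical busy-polling executor standing in for `trpl::run`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

// Hypothetical helper mirroring the listing: double values, then `next().await`.
fn doubled_via_stream(values: Vec<i32>) -> Vec<i32> {
    block_on(async move {
        let mut stream = IterStream(values.into_iter().map(|n| n * 2));
        let mut out = Vec::new();
        while let Some(value) = stream.next().await {
            out.push(value);
        }
        out
    })
}
```

Suppose `StreamExt` lived in another module and was not imported. Then `stream.next()` would fail with the same "items from traits can only be used if the trait is in scope" help shown in the compiler output, because `next` is defined on the extension trait rather than on `Stream` itself.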
The fix is to add a `use` statement for `trpl::StreamExt`:
```rust
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
```
Now, with all of the pieces put together, the code works the way we want.

Now that we have `StreamExt` in scope, we can use all of its utility methods, just as with iterators.

For example, here we use the `filter` method to filter out everything except multiples of three and five:
```rust
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}
```
This isn't very interesting, since we could do the same with normal iterators and without any async at all.
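For comparison, here is the same pipeline written with plain iterators and no async at all. `filtered_multiples` is a hypothetical helper name.

```rust
// Hypothetical helper: same values and predicate as the stream version,
// but fully synchronous.
fn filtered_multiples() -> Vec<u32> {
    (1..101)
        .map(|n| n * 2)
        .filter(|value| value % 3 == 0 || value % 5 == 0)
        .collect()
}
```

The doubled values run from 2 to 200. Of those, 47 are multiples of three or five, starting with 6, 10, 12, 18, 20, 24, 30.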
Now let's look at what *is* unique to streams.
## Composing Streams

Many concepts are naturally represented as streams.

Examples include items becoming available in a queue, chunks of data being pulled incrementally from the filesystem when the full data set is too large for the computer's memory, or data arriving over the network over time.

Because streams are futures, we can combine them with other futures in interesting ways: for example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.

To start, we will build a little stream of messages as a stand-in for a stream of data we might see from a WebSocket or another real-time communication protocol.

Here is an example of this:
```rust
extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
```
Here we first create a function called `get_messages` that returns `impl Stream<Item = String>`.

In its implementation, we create an async channel, loop over the first 10 letters of the alphabet, and send them across the channel.

We also use a new type, `ReceiverStream`.

It converts the `rx` receiver from `trpl::channel` into a `Stream` with a `next` method.

In `main`, we use a `while let` loop to print all the messages from the stream.

When we run this code, we get this output, as expected:
```
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
```
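Nothing in this version depends on async yet. As a point of comparison, here is a synchronous sketch of the same idea using the standard library's `std::sync::mpsc` channel. Its `Receiver` can be turned into a plain `Iterator` the same way `ReceiverStream` turns the `trpl` receiver into a `Stream`. `get_messages_sync` is a hypothetical name, not part of `trpl`.

```rust
use std::sync::mpsc;

// Hypothetical synchronous counterpart of `get_messages`: the receiver becomes
// an `Iterator` (instead of a `Stream`) via `Receiver::into_iter`.
fn get_messages_sync() -> impl Iterator<Item = String> {
    let (tx, rx) = mpsc::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    // `tx` is dropped when this function returns, so the iterator ends after
    // the ten buffered messages instead of waiting forever.
    rx.into_iter()
}
```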
We could have done this with the regular `Receiver` API or even the regular `Iterator` API, so let's add a feature that requires streams.

We will add a timeout that applies to every item in the stream, and a delay on the items we emit.

This is shown here:
```rust
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
```
Here we add a timeout to the stream with the `timeout` method.

This method comes from the `StreamExt` trait.

Next, we update the body of the `while let` loop, because the stream now returns a `Result`.

The `Ok` variant indicates a message arrived in time.

The `Err` variant indicates that the timeout elapsed before any message arrived.

We `match` on this result and either print the message when we receive it successfully or print a notice about the timeout.

Notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream that needs to be pinned to be polled.

Because there are no delays between messages, the timeout does not change the behavior of the program.

Now let's add a variable delay to the messages we send:
```rust
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```
In `get_messages`, we use the `enumerate` iterator method with the `messages` array.

This lets us get the index of each item we are sending along with the item itself.

Next, we apply a 100-millisecond delay to even-index items and a 300-millisecond delay to odd-index items to simulate the different delays we may see from a stream of messages in the real world.

Because our timeout is 200 milliseconds, this should affect half of the messages.
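The "half of the messages" arithmetic can be checked with a small synchronous sketch. `delay_for` and `late_messages` are hypothetical helpers that reuse the same 100/300 ms schedule. The sketch ignores scheduling jitter and simply compares each delay with the timeout.

```rust
// Hypothetical helper: same schedule as `get_messages`, 100 ms for even
// indices and 300 ms for odd ones.
fn delay_for(index: usize) -> u64 {
    if index % 2 == 0 { 100 } else { 300 }
}

// Hypothetical helper: the messages whose delay exceeds the timeout, i.e. the
// ones we expect to be preceded by a timeout error.
fn late_messages(timeout_ms: u64) -> Vec<&'static str> {
    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    messages
        .into_iter()
        .enumerate()
        .filter(|&(index, _)| delay_for(index) > timeout_ms)
        .map(|(_, message)| message)
        .collect()
}
```

With a 200 ms timeout this yields the five odd-index messages `b`, `d`, `f`, `h`, and `j`. Those are the messages that follow a `Problem: Elapsed(())` line in the program output later in this section.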
To sleep between messages in the `get_messages` function without blocking, we need to use async.

However, we can't make `get_messages` itself into an async function, because then we would return a `Future<Output = Stream<Item = String>>` instead of a `Stream<Item = String>`.

The caller would need to await `get_messages` itself to get access to the stream.

Remember: everything in a given future happens linearly; concurrency happens *between* futures.

Awaiting `get_messages` would require it to send all the messages, including the sleep delay between each message, before returning the receiver stream.

This would make the timeout useless.

There would be no delays in the stream itself; they would all happen before the stream was even available.

Instead, we leave `get_messages` as a regular function that returns a stream, and we spawn a task to handle the async `sleep` calls.

Note that calling `spawn_task` in this way works because we already set up our runtime.

If we had not, it would cause a panic.

Other implementations choose different tradeoffs.

They may spawn a new runtime and avoid the panic but end up with a bit of extra overhead, or they may simply not provide a standalone way to spawn tasks without reference to a runtime.

Make sure you know what tradeoff your runtime has chosen and write your code accordingly.

Now our code has a much more interesting result.

Between every other pair of messages, we see a `Problem: Elapsed(())` error:
```
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
```
The timeout doesn't prevent the messages from arriving in the end.

We still get all of the original messages, because our channel is *unbounded*.

It can hold as many messages as we can fit in memory.

If a message doesn't arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.
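The standard library's synchronous `mpsc::Receiver::recv_timeout` shows the same behavior without async. A timeout only reports that nothing has arrived *yet*, and a later call can still receive the late message. This is a thread-based sketch, not how `trpl`'s `timeout` is implemented. `receive_with_timeouts` is a hypothetical helper.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: sends one message per entry in `delays_ms` (sleeping
/// first) and receives them with a per-receive timeout. Returns `Ok(index)`
/// for each message and `Err(())` for each timeout, in observed order.
fn receive_with_timeouts(delays_ms: Vec<u64>, timeout_ms: u64) -> Vec<Result<usize, ()>> {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for (index, delay) in delays_ms.into_iter().enumerate() {
            thread::sleep(Duration::from_millis(delay));
            tx.send(index).unwrap();
        }
        // `tx` is dropped here, which disconnects the channel.
    });

    let mut events = Vec::new();
    loop {
        match rx.recv_timeout(Duration::from_millis(timeout_ms)) {
            Ok(index) => events.push(Ok(index)),
            // Nothing arrived in time; the message may still arrive later.
            Err(RecvTimeoutError::Timeout) => events.push(Err(())),
            // The producer finished and dropped its sender.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    producer.join().unwrap();
    events
}
```

Calling `receive_with_timeouts(vec![0, 250], 100)` delivers both messages, with one or more timeouts reported between them.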
You can get different behavior if needed by using other kinds of channels or other kinds of streams more generally.
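The standard library illustrates the channel side of this. `mpsc::channel` is unbounded like ours, while `mpsc::sync_channel(capacity)` is bounded, so a full buffer pushes back on the sender. The sketch below uses the non-blocking `try_send` to make that visible. `fill_unbounded` and `fill_bounded` are hypothetical helper names.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: sends `count` items into an unbounded channel without
/// receiving any; returns how many sends succeeded.
fn fill_unbounded(count: u32) -> u32 {
    let (tx, _rx) = mpsc::channel();
    let mut sent = 0;
    for n in 0..count {
        // Never blocks: the buffer grows as needed while `_rx` is alive.
        if tx.send(n).is_ok() {
            sent += 1;
        }
    }
    sent
}

/// Hypothetical helper: tries to send `count` items into a bounded channel
/// without receiving any; returns how many were accepted.
fn fill_bounded(capacity: usize, count: u32) -> u32 {
    let (tx, _rx) = mpsc::sync_channel(capacity);
    let mut sent = 0;
    for n in 0..count {
        match tx.try_send(n) {
            Ok(()) => sent += 1,
            // The buffer is full: a blocking `send` would wait here instead.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    sent
}
```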
Let's see this by combining a stream of time intervals with this stream of messages.

## Merging Streams
We will start by creating another stream, which will emit an item every millisecond if we let it run directly.

For simplicity, we will use the `sleep` function to send a message on a delay and combine it with the same approach we used in `get_messages`: creating a stream from a channel.

The difference is that this time we are going to send back the count of intervals that have happened.

This means the return type will be `impl Stream<Item = u32>`, and we can call the function `get_intervals`:
```rust
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```
Here we start by defining a `count` in the task.

We could have defined it outside of the task, but it is clearer to limit the scope of any given variable.

Then we create an infinite loop.

Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel.

Because this is all wrapped in the task created by `spawn_task`, all of it, including the infinite loop, will get cleaned up along with the runtime.

This kind of infinite loop ends only when the whole runtime gets torn down.

This is fairly common in async Rust: many programs need to keep running indefinitely.

With async, this doesn't block anything else, as long as there is at least one await point in each iteration through the loop.
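To see why the await point matters, here is a std-only sketch of a tiny round-robin executor. `YieldNow`, `yield_now`, `ticker`, `worker`, and `run_round_robin` are hypothetical helpers, and a real runtime such as the one behind `trpl` is far more sophisticated. The principle is the same, though: a task can only be paused at an `.await`. An infinite loop that awaits on every iteration still lets other tasks run, and it stops when the executor drops it.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type Log = Rc<RefCell<Vec<String>>>;

// Hypothetical future that is `Pending` once, then `Ready`: an await point.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

// Hypothetical waker that does nothing; the executor below polls in rounds.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: none of the vtable functions touch the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Hypothetical task shaped like `get_intervals`: loops forever, awaiting each time.
async fn ticker(log: Log) {
    let mut count = 0;
    loop {
        count += 1;
        log.borrow_mut().push(format!("tick {count}"));
        yield_now().await;
    }
}

// Hypothetical finite task sharing the executor with the infinite one.
async fn worker(log: Log) {
    for i in 1..=3 {
        log.borrow_mut().push(format!("work {i}"));
        yield_now().await;
    }
}

// Hypothetical executor: polls each task once per round until all finish or
// `max_polls` is reached. Returning drops unfinished tasks, like runtime teardown.
fn run_round_robin(mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>>, max_polls: usize) {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    while !tasks.is_empty() && polls < max_polls {
        tasks.retain_mut(|task| {
            polls += 1;
            task.as_mut().poll(&mut cx).is_pending()
        });
    }
}
```

Without the `yield_now().await` in `ticker`, its very first poll would loop forever, and `worker` would never run.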
Back in our main function's async block, we can attempt to merge the `messages` and `intervals` streams:
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
```
We start by calling `get_intervals`.

We then merge the `messages` and `intervals` streams with the `merge` method. It combines multiple streams into one stream that produces items from any of the source streams as soon as they are available, without imposing any particular ordering.

Finally, we loop over that combined stream instead of over `messages`.

At this point, neither `messages` nor `intervals` needs to be pinned or mutable, because both will be combined into the single `merged` stream.

This call to `merge` doesn't compile.

(Neither does the `next` call in the `while let` loop, but we will come back to that.)

This is because the two streams have different types.

The `messages` stream has the type `Timeout<impl Stream<Item = String>>`, where `Timeout` is the type that implements `Stream` for a `timeout` call.

The `intervals` stream has the type `impl Stream<Item = u32>`.

In order to merge these two streams, we need to transform one of them to match the other.

We will rework the `intervals` stream, because `messages` is already in the basic format we want and has to handle timeout errors:
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
```
Here we use the `map` helper method to transform each count from `intervals` into a string.

Next, we need to match the `Timeout` from `messages`.

Because we don't actually *want* a timeout for `intervals`, we can just create a timeout that is longer than the other durations we are using.

Specifically, we create a 10-second timeout with `Duration::from_secs(10)`.

Lastly, we need to make `stream` mutable, so that the `while let` loop's `next` calls can iterate through the stream, and pin it so that it is safe to do so.
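The idea of first mapping both sources to a common item type and then merging them can be sketched synchronously with threads and a shared `std::sync::mpsc` channel. This is plain fan-in, where each producer gets its own `Sender`; it is not the `merge` combinator itself. `merged_labels` is a hypothetical helper. As with `merge`, items come out in whatever order they arrive.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: merges `&str` messages and `u32` counts into one
// channel of `String`s. The counts are mapped to strings first, because one
// channel (like one merged stream) can carry only a single item type.
fn merged_labels(messages: Vec<&'static str>, intervals: u32) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let interval_tx = tx.clone();

    let message_producer = thread::spawn(move || {
        for message in messages {
            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    let interval_producer = thread::spawn(move || {
        for count in 1..=intervals {
            // Same role as `.map(|count| format!("Interval: {count}"))`.
            interval_tx.send(format!("Interval: {count}")).unwrap();
        }
    });

    // Items arrive in whatever order the producers send them; iteration ends
    // once both producers finish and drop their senders.
    let received: Vec<String> = rx.into_iter().collect();
    message_producer.join().unwrap();
    interval_producer.join().unwrap();
    received
}
```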
This gets us *almost* to where we need to be.

Everything type checks.

If you run this now, however, there will be two problems.

First, it will never stop. (You need to stop it manually with ctrl-c.)

Second, the messages from the English alphabet will be buried in the midst of all the interval counter messages.

Here is a sample of the output:
```
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
Here is one way to solve these last two problems:
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
```
Here we use the `throttle` method on the `intervals` stream so that it doesn't overwhelm the `messages` stream.

*Throttling* is a way of limiting the rate at which a function will be called, or, in this case, how often the stream will be polled.

Once every 100 milliseconds should do, because that is roughly how often our messages arrive.

To limit the number of items we accept from a stream, we apply the `take` method to the `merged` stream, because we want to limit the final output, not just one stream or the other.

Now when we run the program, it stops after pulling 20 items from the stream, and the intervals don't overwhelm the messages.

We also don't get `Interval: 100` or `Interval: 200` and so on.

Instead, we get `Interval: 1`, `Interval: 2`, and so on, even though we have a source stream that *can* produce an event every millisecond.

This is because the `throttle` call produces a new stream that wraps the original stream so that the original stream gets polled only at the throttle rate, not at its own "native" rate.

We also won't have a bunch of unhandled interval messages we are choosing to ignore.

Instead, we never produce those interval messages in the first place.

This is the inherent "laziness" of Rust's futures at work again.

It allows us to choose our performance characteristics.

Here is the output:
```
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
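The laziness behind this output can be sketched synchronously. `Throttle`, `throttle`, and `throttled_counts` below are hypothetical, not `trpl`'s `throttle`. This version blocks the thread with `thread::sleep`, while the real async version waits without blocking. What it does share is the key property: the source is asked for an item only when the consumer asks. Combined with `take`, the source therefore never produces more items than are used.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical synchronous throttle: pulls from `inner` at most once per
// `interval`. Unlike an async throttle, it blocks the thread while waiting.
struct Throttle<I> {
    inner: I,
    interval: Duration,
    last_pull: Option<Instant>,
}

impl<I: Iterator> Iterator for Throttle<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if let Some(last_pull) = self.last_pull {
            let elapsed = last_pull.elapsed();
            if elapsed < self.interval {
                thread::sleep(self.interval - elapsed);
            }
        }
        self.last_pull = Some(Instant::now());
        // The source is asked for an item only now, on demand.
        self.inner.next()
    }
}

// Hypothetical constructor, named after the stream method.
fn throttle<I: Iterator>(inner: I, interval: Duration) -> Throttle<I> {
    Throttle { inner, interval, last_pull: None }
}

// Hypothetical helper: takes `n` throttled items from an infinite counter and
// reports how many items the counter actually produced.
fn throttled_counts(n: usize, interval: Duration) -> (Vec<u32>, u32) {
    let produced = Rc::new(Cell::new(0u32));
    let counter = {
        let produced = Rc::clone(&produced);
        (1u32..).inspect(move |_| produced.set(produced.get() + 1))
    };
    let taken: Vec<u32> = throttle(counter, interval).take(n).collect();
    (taken, produced.get())
}
```

Taking three items with a 5 ms interval yields `[1, 2, 3]` after at least 10 ms, and the infinite counter produces exactly three items.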
The last thing we need to handle is errors.

With both of these channel-based streams, the `send` calls could fail when the other side of the channel closes, and that is just a matter of how the runtime executes the futures that make up the stream.

Up until now, we have ignored this possibility by calling `unwrap`.

In a well-behaved app, we should explicitly handle the error, at a minimum by ending the loop so we don't try to send any more messages.

Here is a simple error-handling strategy: print the issue and then `break` from the loops:
```rust
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}
```
The correct way to handle a message send error will vary.

Just make sure you have a strategy.
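The same "report and stop" strategy can be sketched with a standard-library channel, with a thread standing in for the spawned task. Once the receiving side is dropped, `send` returns an error carrying the undelivered value, and the producer breaks out of its otherwise infinite loop. `interval_producer_stops` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: receives `keep` counts from an infinite producer, then
/// drops the receiver. Returns the received counts and the count the producer
/// failed to send, which shows that its loop ended.
fn interval_producer_stops(keep: usize) -> (Vec<u32>, u32) {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        let mut count = 0;
        loop {
            thread::sleep(Duration::from_millis(1));
            count += 1;
            if let Err(send_error) = tx.send(count) {
                // `send_error.0` is the value that could not be delivered.
                eprintln!("Could not send interval {count}: {send_error}");
                break send_error.0;
            }
        }
    });

    let received: Vec<u32> = rx.iter().take(keep).collect();
    drop(rx); // closing the receiving side makes the next `send` fail

    let failed = producer.join().unwrap();
    (received, failed)
}
```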