
Streams: Futures in Sequence

So far we have limited ourselves to individual futures.

There has been one exception: the async channel.

The async recv method produces a sequence of items over time.

This is an instance of a much more general pattern known as a stream.

Earlier we saw another sequence of items with the Iterator trait, but there are two differences between iterators and the async channel receiver.

The first difference is time: iterators are synchronous, while the channel receiver is asynchronous.

The second is the API.

When working directly with Iterator, we call its synchronous next method.

With the trpl::Receiver stream in particular, we called an asynchronous recv method instead.

Otherwise, the APIs feel very similar, and this similarity isn't a coincidence.
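The synchronous half of that comparison is easy to see in isolation. Here is a minimal sketch of calling Iterator's next method directly, with no async involved:

```rust
fn main() {
    let mut iter = [1, 2, 3].iter();

    // Iterator::next is synchronous: each call returns immediately
    // with Some(item) until the sequence is exhausted, then None.
    assert_eq!(iter.next(), Some(&1));
    assert_eq!(iter.next(), Some(&2));
    assert_eq!(iter.next(), Some(&3));
    assert_eq!(iter.next(), None);

    println!("iterator exhausted");
}
```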

A stream is like an asynchronous form of iteration.

While trpl::Receiver specifically waits to receive messages, the general-purpose stream API is much broader.

It provides the next item the way Iterator does, but asynchronously.

The similarity between iterators and streams in Rust means we can actually create a stream from any iterator.

From any iterator, we can work with the resulting stream by calling its next method and then awaiting the output.

Here is an example of this:

        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }

Here we start with an array of numbers, which we convert into an iterator, and then call map on it to double all the values.

We then convert the iterator into a stream using the trpl::stream_from_iter function.

Next, we loop over the items in the stream as they arrive with the while let loop.

This code does not compile; instead, the compiler reports that there is no next method available.

error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

This output explains that the reason for the compiler error is that we need the right trait in scope to be able to use the next method.

You might expect that trait to be Stream, but it is actually StreamExt.

Ext is short for extension; extending one trait with another this way is a common pattern in the Rust community.

We will explain the Stream and StreamExt traits in a bit more detail at the end of the chapter.

For now know that the Stream trait defines a low-level interface that effectively combines the Iterator and Future traits.

StreamExt supplies a higher level set of APIs on top of Stream, including the next method as well as other utility methods similar to the ones provided by the Iterator trait.
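The extension-trait pattern itself can be sketched outside of async. Here is a minimal, hypothetical example (the Greet and GreetExt names are made up for illustration): a base trait stays small, and an Ext trait layers default convenience methods on top via a blanket impl:

```rust
// Base trait: the minimal interface a type must implement.
trait Greet {
    fn name(&self) -> String;
}

// Extension trait: convenience methods with default implementations,
// built only from the base trait's methods.
trait GreetExt: Greet {
    fn greeting(&self) -> String {
        format!("Hello, {}!", self.name())
    }
}

// Blanket impl: every type implementing Greet gets GreetExt for free.
impl<T: Greet> GreetExt for T {}

struct World;

impl Greet for World {
    fn name(&self) -> String {
        String::from("world")
    }
}

fn main() {
    // greeting comes from the extension trait, which must be in scope
    // to call it, just as StreamExt must be in scope to call next.
    println!("{}", World.greeting());
}
```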

Stream and StreamExt are not yet part of Rust's standard library, but most crates in the ecosystem use the same definitions.

The fix for this is to add a use statement for trpl::StreamExt.

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

Now with all of the pieces put together, the code works the way we want.

Now that we have StreamExt in scope, we can use all of its utility methods, just as with iterators.

Here is an example where we use the filter method to filter out everything but multiples of three and five.

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}

This isn't very interesting, since we could do the same with normal iterators and without any async at all.
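For comparison, here is the same filtering done with a plain synchronous iterator; this is a sketch of the equivalence, not code from the chapter:

```rust
// Double the values in 1..101 and keep only multiples of three or five,
// using only the synchronous Iterator API.
fn multiples_of_three_or_five() -> Vec<i32> {
    (1..101)
        .map(|n| n * 2)
        .filter(|value| value % 3 == 0 || value % 5 == 0)
        .collect()
}

fn main() {
    for value in multiples_of_three_or_five() {
        println!("The value was: {value}");
    }
}
```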

Now let's look at what is unique to streams.

Composing Streams

Many concepts are naturally represented as streams.

Items become available in a queue over time, chunks of data are pulled incrementally from the filesystem when the full data set is too large for the computer's memory, or data arrives over the network over time.

For example, we can batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.

To start, we will build a little stream of messages as a stand-in for a stream of data we might see from a WebSocket or another real-time communication protocol.

Here is an example of this.

extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

Here we first create a function called get_messages that returns impl Stream<Item = String>.

In its implementation, we create an async channel, loop over the first 10 letters of the alphabet, and send them across the channel.

Now we use a new type ReceiverStream.

This converts the rx receiver from trpl::channel into a Stream with a next method.

In main, we use a while let loop to print all the messages from the stream.

When we run this code we get this output, as expected.

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

We could have done this with the regular Receiver API or even the regular Iterator API, so let's add a feature that requires streams.

We will add a timeout that applies to every item in the stream, and a delay on the items we emit.

This is shown here

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

Here we add a timeout to the stream with the timeout method.

This comes from the StreamExt trait.

Next we update the body of the while let loop, because the stream now returns a Result.

The Ok variant indicates a message arrived in time.

The Err variant indicates that the timeout elapsed before any message arrived.

We match on this result and either print the message when we receive it successfully or print a notice about the timeout.

Notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream that needs to be pinned to be polled.

Because there are no delays between messages, the timeout does not change the behavior of the program.

Now let's add a variable delay to the messages we send.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

In get_messages we use the enumerate iterator method with the messages array.

This is so that we can get the index of each item we are sending along with the item itself.

Next, we apply a 100-millisecond delay to even-index items and a 300-millisecond delay to odd-index items to simulate the different delays we might see from a stream of messages in the real world.

Because our timeout is for 200 milliseconds, this should affect half of the messages.
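We can check that arithmetic with a small synchronous sketch (the timed_out_indices helper is hypothetical, not part of the chapter's code): even indices sleep 100 milliseconds, which beats the 200-millisecond timeout, while odd indices sleep 300 milliseconds, which exceeds it:

```rust
// Return the indices whose simulated delay exceeds the timeout,
// mirroring the even/odd delay rule from get_messages.
fn timed_out_indices(timeout_ms: u64) -> Vec<usize> {
    (0..10)
        .map(|index| if index % 2 == 0 { 100 } else { 300 })
        .enumerate()
        .filter(|&(_, delay_ms)| delay_ms > timeout_ms)
        .map(|(index, _)| index)
        .collect()
}

fn main() {
    // With a 200 ms timeout, exactly the odd-index messages time out.
    println!("{:?}", timed_out_indices(200));
}
```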

To sleep between messages in the get_messages function without blocking, we need to use async.

However, we can't make get_messages itself into an async function, because then we would return a Future<Output = Stream<Item = String>> instead of a Stream<Item = String>.

The caller would need to await get_messages itself to get access to the stream.

Remember everything in a given future happens linearly; concurrency happens between futures.

Awaiting get_messages would require it to send all the messages, including the sleep delay between each message, before returning the receiver stream.

This would make the timeout useless.

There would be no delays in the stream itself; they would happen before the stream was even available.

Instead, we leave get_messages as a regular function that returns a stream, and we spawn a task to handle the async sleep calls.

Note that calling spawn_task in this way works because we already set up our runtime.

If we had not it would cause a panic.

Other implementations choose different tradeoffs.

They may spawn a new runtime and avoid the panic but end up with a bit of extra overhead, or they may simply not provide a standalone way to spawn tasks without reference to a runtime.

Make sure you know what tradeoff your runtime has chosen and write your code accordingly.

This makes our code produce a much more interesting result.

Between every other pair of messages, we now see a Problem: Elapsed(()) error.

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

The timeout doesn't prevent the messages from arriving in the end.

We still get all of the original messages, because our channel is unbounded.

It can hold as many messages as we can fit in memory.
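The standard library's synchronous channel buffers the same way, which makes for a quick sketch of this behavior (using std::sync::mpsc rather than trpl::channel):

```rust
use std::sync::mpsc;

fn main() {
    // An unbounded channel buffers every message until it is received,
    // so send never blocks and nothing is dropped.
    let (tx, rx) = mpsc::channel();

    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the receiving iterator ends

    // All five messages are still waiting, in order.
    let received: Vec<i32> = rx.iter().collect();
    println!("{received:?}");
}
```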

If a message doesn't arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.

You can get different behavior if needed by using other kinds of channels or other kinds of streams more generally.

Let's see this by combining a stream of time intervals with this stream of messages.

Merging Streams