2025-03-27 14:37:10 -06:00


Streams: Futures in Sequence

So far we have limited ourselves to individual futures.

The one exception was the async channel.

The async recv method produces a sequence of items over time.

This is an instance of a much more general pattern known as a stream.

We saw a sequence of items earlier, with the Iterator trait, but there are two differences between iterators and the async channel receiver.

The first difference is time: iterators are synchronous, while the channel receiver is asynchronous.

The second is the API.

When working directly with Iterator, we call its synchronous next method.

With the trpl::Receiver stream in particular, we called an asynchronous recv method instead.

Otherwise, the APIs feel very similar, and this similarity isn't a coincidence.

A stream is like an asynchronous form of iteration.

Whereas trpl::Receiver specifically waits to receive messages, the general-purpose stream API is much broader.

It provides the next item the way Iterator does, but asynchronously.

The similarity between iterators and streams in Rust means we can actually create a stream from any iterator.

As with an iterator, we can work with a stream by calling its next method and then awaiting the output.

Here is an example of this:

```rust
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);

while let Some(value) = stream.next().await {
    println!("The value was: {value}");
}
```

Here we start with an array of numbers, which we then convert into an iterator and then call map on to double all the values.

We then convert the iterator into a stream using the trpl::stream_from_iter function.

Next, we loop over the items in the stream as they arrive with the while let loop.

Unfortunately, this does not compile. Instead, the compiler reports that there is no next method available:

```text
error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~
```

As this output explains, the compiler error occurs because we need the right trait in scope to be able to use the next method.

You might expect that trait to be Stream, but it is actually StreamExt.

Short for extension, Ext is a common pattern in the Rust community for extending one trait with another.
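To make the Ext naming pattern concrete, here is a small std-only sketch that has nothing to do with streams. The trait SumOfSquaresExt is hypothetical. It extends Iterator<Item = i64> with one extra method, and a blanket impl gives that method to every such iterator. Crates like futures-util use the same kind of blanket impl to give StreamExt's methods to every Stream.

```rust
/// Hypothetical extension trait: adds one helper method to every iterator
/// that yields i64 values, without changing the Iterator trait itself.
trait SumOfSquaresExt: Iterator<Item = i64> {
    /// Consumes the iterator and returns the sum of the squares of its items.
    fn sum_of_squares(self) -> i64
    where
        Self: Sized,
    {
        self.map(|n| n * n).sum()
    }
}

// Blanket impl: every Iterator<Item = i64> gets the extension method for free.
impl<I: Iterator<Item = i64>> SumOfSquaresExt for I {}

fn main() {
    // 1 + 4 + 9
    let total = (1..=3_i64).sum_of_squares();
    println!("{total}");
}
```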

We will explain the Stream and StreamExt traits in a bit more detail at the end of the chapter.

For now know that the Stream trait defines a low-level interface that effectively combines the Iterator and Future traits.

StreamExt supplies a higher level set of APIs on top of Stream, including the next method as well as other utility methods similar to the ones provided by the Iterator trait.
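The split between a low-level trait and a higher-level extension can be sketched with only the standard library. This is not the real futures or trpl code. It defines:

- its own simplified Stream trait, mirroring the poll_next signature of the ecosystem's trait;
- a StreamExt-style trait whose next method returns a future;
- a hypothetical IterStream standing in for trpl::stream_from_iter;
- a tiny block_on executor in place of trpl::run.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Simplified stand-in for the ecosystem's Stream trait: like Iterator::next,
/// but the answer may be "not ready yet" (Poll::Pending).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Future returned by `next`; polling it polls the underlying stream.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

/// StreamExt-style extension trait: adds the convenient `next` to every Stream.
trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self>
    where
        Self: Sized + Unpin,
    {
        Next { stream: self }
    }
}

impl<S: Stream> StreamExt for S {}

/// Hypothetical analogue of trpl::stream_from_iter: each poll is ready at once.
struct IterStream<I> {
    iter: I,
}

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.iter.next())
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor standing in for trpl::run: poll, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Doubles 1..=10 and reads the results back through the stream.
fn doubled() -> Vec<i32> {
    block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut stream = IterStream { iter: values.iter().map(|n| n * 2) };

        let mut out = Vec::new();
        while let Some(value) = stream.next().await {
            out.push(value);
        }
        out
    })
}

fn main() {
    for value in doubled() {
        println!("The value was: {value}");
    }
}
```

Because IterStream is always ready, block_on never actually parks here. The park/unpark waker only matters for streams that return Poll::Pending.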

Stream and StreamExt are not yet part of Rust's standard library, but most ecosystem crates use the same definition.

The fix for this is to add a use statement for trpl::StreamExt.

```rust
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
```

Now with all of the pieces put together, the code works the way we want.

Now that we have StreamExt in scope, we can use all of its utility methods, just as with iterators.

For example, here we use the filter method to filter out everything but multiples of three or five.

```rust
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}
```

This isn't very interesting, since we could do the same with normal iterators and without any async at all.
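For comparison, here is the same computation with an ordinary iterator and no async at all. The name doubled_multiples is a hypothetical helper used only for this sketch.

```rust
/// Synchronous equivalent of the stream example: double 1..=100, then keep
/// only the values that are multiples of 3 or 5.
fn doubled_multiples() -> Vec<i32> {
    (1..101)
        .map(|n| n * 2)
        .filter(|value| value % 3 == 0 || value % 5 == 0)
        .collect()
}

fn main() {
    for value in doubled_multiples() {
        println!("The value was: {value}");
    }
}
```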

Now let's look at what is unique to streams.

Composing Streams

Many concepts are naturally represented as streams.

Examples include items becoming available in a queue, chunks of data being pulled incrementally from the filesystem when the full data set is too large for the computer's memory, or data arriving over the network over time.

With streams, we can also batch up events to avoid triggering too many network calls, set timeouts on sequences of long-running operations, or throttle user interface events to avoid doing needless work.

To start, we will build a little stream of messages as a stand-in for a stream of data we might see from a WebSocket or another real-time communication protocol.

Here is an example of this.

```rust
extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
```

Here we first create a function called get_messages that returns impl Stream<Item = String>.

In its implementation, we create an async channel, loop over the first 10 letters of the alphabet, and send them across the channel.

We also use a new type: ReceiverStream.

This converts the rx receiver from trpl::channel into a Stream with a next method.

In main, we use a while let loop to print all the messages from the stream.

When we run this code we get this output, as expected.

```text
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
```
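For comparison, here is a synchronous version of get_messages built on std::sync::mpsc instead of trpl. The std Receiver implements IntoIterator, so the caller gets a plain iterator. The loop ends because the only Sender is dropped when get_messages returns, which closes the channel once the buffered messages have been read.

```rust
use std::sync::mpsc;

/// Hypothetical synchronous counterpart of get_messages: the caller gets an
/// ordinary iterator instead of a stream.
fn get_messages() -> impl Iterator<Item = String> {
    let (tx, rx) = mpsc::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    // `tx` is dropped when this function returns, so iterating over `rx`
    // stops after the ten buffered messages have been read.
    rx.into_iter()
}

fn main() {
    for message in get_messages() {
        println!("{message}");
    }
}
```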

We could have done this with the regular Receiver API or even the regular Iterator API, though, so let's add a feature that requires streams.

We will add a timeout that applies to every item in the stream, and then a delay on the items we emit.

We start with the timeout:

```rust
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}
```

Here we add a timeout to the stream with the timeout method.

This comes from the StreamExt trait.

Next we update the body of the while let loop, because the stream now returns a Result.

The Ok variant indicates a message arrived in time.

The Err variant indicates that the timeout elapsed before any message arrived.

We match on this result and either print the message when we receive it successfully or print a notice about the timeout.

Notice that we pin the messages after applying the timeout to them, because the timeout helper produces a stream that needs to be pinned to be polled.

Because there are no delays between messages, the timeout does not change the behavior of the program.

Now let's add a variable delay to the messages we send:

```rust
extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```

In get_messages we use the enumerate iterator method with the messages array.

This is so that we can get the index of each item we are sending along with the item itself.

Next, we apply a 100-millisecond delay to even-index items and a 300-millisecond delay to odd-index items to simulate the different delays we might see from a stream of messages in the real world.

Because our timeout is for 200 milliseconds, this should affect half of the messages.

To sleep between messages in the get_messages function without blocking, we need to use async.

However, we can't make get_messages itself into an async function, because then we would return a Future<Output = Stream<Item = String>> instead of a Stream<Item = String>.

The caller would need to await get_messages itself to get access to the stream.

Remember everything in a given future happens linearly; concurrency happens between futures.

Awaiting get_messages would require it to send all the messages, including the sleep delay between each message, before returning the receiver stream.

This would make the timeout useless.

There would be no delays in the stream itself; they would all happen before the stream was even available.

Instead, we leave get_messages as a regular function that returns a stream, and we spawn a task to handle the async sleep calls.

Note that calling spawn_task in this way works because we already set up our runtime.

If we had not, it would cause a panic.

Other implementations choose different tradeoffs.

They may spawn a new runtime and avoid the panic but end up with a bit of extra overhead, or they may simply not provide a standalone way to spawn tasks without reference to a runtime.

Make sure you know what tradeoff your runtime has chosen and write your code accordingly.

Now our code has a much more interesting result.

Between every other pair of messages, there is a Problem: Elapsed(()) error:

```text
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
```

The timeout doesn't prevent the messages from arriving in the end.

We still get all of the original messages, because our channel is unbounded.

It can hold as many messages as we can fit in memory.

If a message doesn't arrive before the timeout, our stream handler will account for that, but when it polls the stream again, the message may now have arrived.
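Here is a rough synchronous analogy of this per-item timeout behavior. It uses OS threads and std::sync::mpsc::Receiver::recv_timeout in place of trpl's stream and its timeout method, so it says nothing about how the real timeout is implemented. The delays are scaled down to 50 ms and 150 ms with a 100 ms timeout. The names spawn_messages, collect_with_timeout, and Event are hypothetical.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// What the consumer observed: a message, or a timeout while waiting for one.
#[derive(Debug, PartialEq)]
enum Event {
    Message(String),
    Elapsed,
}

/// Thread-based analogue of get_messages with alternating short/long delays.
fn spawn_messages() -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let ms = if index % 2 == 0 { 50 } else { 150 };
            thread::sleep(Duration::from_millis(ms));
            if tx.send(format!("Message: '{message}'")).is_err() {
                break; // receiver is gone; stop producing
            }
        }
        // `tx` is dropped here, which eventually disconnects the channel.
    });
    rx
}

/// Waits at most `timeout` for each item and records timeouts as events.
/// A timeout discards nothing: the late message stays queued in the
/// unbounded channel and is returned by a later call.
fn collect_with_timeout(rx: mpsc::Receiver<String>, timeout: Duration) -> Vec<Event> {
    let mut events = Vec::new();
    loop {
        match rx.recv_timeout(timeout) {
            Ok(message) => events.push(Event::Message(message)),
            Err(RecvTimeoutError::Timeout) => events.push(Event::Elapsed),
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    events
}

fn main() {
    let events = collect_with_timeout(spawn_messages(), Duration::from_millis(100));
    for event in &events {
        match event {
            Event::Message(message) => println!("{message}"),
            Event::Elapsed => eprintln!("Problem: Elapsed"),
        }
    }
}
```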

You can get different behavior if needed by using other kinds of channels or other kinds of streams more generally.

Let's see this by combining a stream of time intervals with this stream of messages.

Merging Streams

We will start by creating another stream, which will emit an item every millisecond if we let it run directly.

For simplicity we will use the sleep function to send a message on a delay and combine it with the same approach we used in get_messages of creating a stream from a channel.

The difference is that we are going to send back the count of intervals that have happened.

This means the return type will be impl Stream<Item = u32>, and we can call the function get_intervals:

```rust
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```

Here we start by defining a count in the task.

We could have defined it outside of the task, but it is clearer to limit the scope of any given variable.

Then we create an infinite loop.

Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel.

Because this is all wrapped in the task created by spawn_task, all of it, including the infinite loop, will get cleaned up along with the runtime.

This kind of infinite loop, which ends only when the whole runtime gets torn down, is fairly common in async Rust: many programs need to keep running indefinitely.

With async, this doesn't block anything else, as long as there is at least one await point in each iteration through the loop.

Back in our main function's async block, we can attempt to merge the messages and intervals streams:

```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
```

We start by calling get_intervals.

We then merge the messages and intervals streams with the merge method, which combines multiple streams into one stream that produces items from any of the source streams as soon as the items are available, without any particular ordering.

Finally, we loop over that combined stream instead of over messages.

At this point, neither messages nor intervals needs to be pinned or mutable, because both will be combined into the single merged stream.

This call to merge doesn't compile.

(Neither does the next call in the while let loop, but we will come back to that.)

This is because the two streams have different types.

The messages stream has the type Timeout<impl Stream<Item = String>>, where Timeout is the type that implements Stream for a timeout call.

The intervals stream has the type impl Stream<Item = u32>.

In order to merge these two streams, we need to transform one of them to match the other.

We will rework the intervals stream, because messages is already in the basic format we want and has to handle timeout errors.

```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
```

Here we use the map helper method to transform each interval count into a string.

Next, we need to match the Timeout from messages.

Because we don't actually want a timeout for intervals, we can just create a timeout that is longer than the other durations we are using.

Specifically, we create a 10-second timeout with Duration::from_secs(10).

Finally, we need to make stream mutable, so that the while let loop's next calls can iterate through the stream, and pin it so that it is safe to do so.
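The fan-in idea behind merge, and the reason both sources need the same item type, can be illustrated with threads. This sketch is an analogy, not how merge works internally. Two producer threads share clones of one std::sync::mpsc::Sender<String>. The interval counter formats its u32 into a String before sending, mirroring the map call above. The take call bounds the otherwise endless merged sequence. The function name merged and the delays are assumptions for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Merges two producers by giving each a clone of one Sender (fan-in).
/// Both must send the same item type, so interval counts become Strings.
fn merged(limit: usize) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    // Producer 1: a few messages, already Strings.
    let message_tx = tx.clone();
    thread::spawn(move || {
        for message in ["a", "b", "c"] {
            thread::sleep(Duration::from_millis(30));
            if message_tx.send(format!("Message: '{message}'")).is_err() {
                break;
            }
        }
    });

    // Producer 2: an endless interval counter, mapped from u32 to String.
    thread::spawn(move || {
        let mut count: u32 = 0;
        loop {
            thread::sleep(Duration::from_millis(10));
            count += 1;
            if tx.send(format!("Interval: {count}")).is_err() {
                break; // the consumer dropped the receiver
            }
        }
    });

    // Items from either producer arrive in whatever order they were sent;
    // `take` stops after `limit` items and then drops the receiver.
    rx.into_iter().take(limit).collect()
}

fn main() {
    for item in merged(20) {
        println!("{item}");
    }
}
```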

This gets us almost to where we need to be.

Everything type checks.

If you run this now, there will be two problems:

It will never stop. (You need to stop it manually with ctrl-c).

The messages from the English alphabet will be buried in the midst of all the interval counter messages.

Here is a sample of the output:

```text
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```

Here is one way to solve these last two problems:

```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
```

Here we use the throttle method on the intervals stream so that it doesn't overwhelm the messages stream.

Throttling is a way of limiting the rate at which a function will be called.

In this case, it limits how often the stream will be polled.

Once every 100 milliseconds should do, because that is roughly how often our messages arrive.

To limit the number of items we accept from a stream, we apply the take method to the merged stream, because we want to limit the final output, not just one stream or the other.

Now when we run it, it stops after pulling 20 items from the stream, and the intervals don't overwhelm the messages.

We also don't get Interval: 100 or Interval: 200 and so on.

Instead, we get Interval: 1, Interval: 2, and so on, even though we have a source stream that can produce an event every millisecond.

This is because the throttle call produces a new stream that wraps the original stream so that the original stream gets polled only at the throttle rate, not at its own "native" rate.

We also won't have a bunch of unhandled interval messages we're choosing to ignore.

Instead we never produce those interval messages in the first place.

This is the inherent "laziness" of Rust's futures at work again.

This allows us to choose our performance characteristics.

```text
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
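The laziness described above can be seen in a synchronous analogy. The Throttle adapter below is a hypothetical plain Iterator, not trpl's throttle. It sleeps so that the inner iterator is advanced at most once per period. An inspect counter shows that the unbounded source produces only as many items as take asks for.

```rust
use std::cell::Cell;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical adapter: advances the inner iterator at most once per
/// `period`, sleeping when the caller asks for the next item too early.
struct Throttle<I> {
    inner: I,
    period: Duration,
    last: Option<Instant>,
}

impl<I: Iterator> Iterator for Throttle<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if let Some(last) = self.last {
            let elapsed = last.elapsed();
            if elapsed < self.period {
                thread::sleep(self.period - elapsed);
            }
        }
        self.last = Some(Instant::now());
        self.inner.next()
    }
}

/// Pulls `n` items from an unbounded counter through the throttle and
/// returns them along with how many items the source actually produced.
fn throttled_take(n: usize, period: Duration) -> (Vec<u32>, u32) {
    let produced = Cell::new(0u32);
    let source = (1u32..).inspect(|_| produced.set(produced.get() + 1));
    let throttled = Throttle { inner: source, period, last: None };
    let items: Vec<u32> = throttled.take(n).collect();
    (items, produced.get())
}

fn main() {
    let (items, produced) = throttled_take(5, Duration::from_millis(100));
    println!("{items:?} (source produced {produced} items)");
}
```

Calling throttled_take(5, ...) yields [1, 2, 3, 4, 5], and the source reports five produced items. Nothing upstream runs until next is called, so no extra values are generated and thrown away.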

The last thing we need to handle is errors.

With both of these channel-based streams, the send calls could fail when the other side of the channel closes, and whether that happens is just a matter of how the runtime executes the futures that make up the stream.

Up until now, we have ignored this possibility by calling unwrap.

In a well-behaved app, we should explicitly handle the error, at minimum by ending the loop so we don't try to send any more messages.

Here is a simple error-handling strategy: print the issue and then break from the loops.

```rust
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}
```

The correct way to handle a message send error will vary.

Just ensure you have a strategy.
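For comparison, here is the same strategy as a synchronous sketch. It uses a thread and std::sync::mpsc in place of trpl::spawn_task and trpl::channel. Once the consumer drops the Receiver, the producer's next send returns an error. The producer reports it and breaks out of its otherwise infinite loop. The function spawn_intervals and its returned JoinHandle are assumptions for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Thread-based analogue of get_intervals with explicit error handling.
/// Returns the receiving side plus a handle that yields the interval the
/// producer failed to send.
fn spawn_intervals() -> (mpsc::Receiver<u32>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel();

    let handle = thread::spawn(move || {
        let mut count = 0;
        loop {
            thread::sleep(Duration::from_millis(1));
            count += 1;

            // `send` fails only once the Receiver has been dropped.
            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
        count
    });

    (rx, handle)
}

fn main() {
    let (rx, handle) = spawn_intervals();
    let first: Vec<u32> = rx.iter().take(5).collect();

    // Closing the receiving side makes the producer's next send fail.
    drop(rx);
    let failed_at = handle.join().unwrap();
    println!("received {first:?}; producer stopped at interval {failed_at}");
}
```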