RustBrock/Any Number of Futures.md
darkicewolf50 c73d875808
Some checks failed
Test Gitea Actions / first (push) Successful in 18s
Test Gitea Actions / check-code (push) Failing after 12s
Test Gitea Actions / test (push) Has been skipped
Test Gitea Actions / documentation-check (push) Has been skipped
started ch17.3, completed half
2025-03-24 16:01:40 -06:00

16 KiB

Working with Any Number of Futures

When we switched form using join to using join3.

This would be very annoying every time we changed the number of futures we wanted to join.

We have a macro instead to do this for us, which we can pass an arbitrary number of arguments.

This also handles awaiting the futures itself.

We could rewrite the code to use join! instead of join3

        trpl::join!(tx1_fut, tx_fut, rx_fut);

This is an improvement because we do not need to swap between join, join3, join4, etc.

Even though this macro form only when we know the number of futures ahead of time.

In the real world Rust, pushing futures into a collection and then waiting on some or all the futures of them to complete is a common pattern.

To check all the futures in a collection, we need to iterate over and join all of them.

The trpl::join_all function any type that implements the Iterator trait.

Here is an example of putting all our futures in a vector and replacing join! with join_all.

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;

This will not compile, instead we get this error.

error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a 
different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

This may be surprising.

This after all, none of the async blocks return anything, so each one produces a Future<Output = ()>.

Remember that Future is a trait, and that the compiler creates a unique enum for each async block.

You can't put two different hand-written structs in a Vec.

This rule also applies to the different enums generated by the compiler.

In order to make this work we need to use trait objects, just like we did in "Returning Errors from the run function" (improving error handling and modularity)

Using trait objects lets us treat each of the anonymous futures produced by these types as the same type, because all of them implement the Future trait.

Note we discussed another way to include multiple types in a Vec

Using an enum to represent each type that can appear in the vector.

We are unable to do this here.

For one thing we have no way to name the different types because they are anonymous.

Another reason, we reach for a vector and join_all in the first place was to be able to work with a dynamic collection of futures where we only care that they have the same output.

We will try this by wrapping each future in the vec! in a Box::new

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;

As expected this code will not compile.

We get the same basic error we got before for both the second and third Box::new calls, as well get get new errors referring to the Unpin trait.

We will fix this on the Box::new calls by explicitly annotating the tpye of the futures variable

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

We will walk through this type declaration through the use of a ordered list

  1. The innermost type is the future itself.
    1. Note we need to explicitly that the output of the future is the unit type () by writing Future<Output = ()>
  2. We annotate the trait with dyn to mark it as dynamic
  3. The entire trait reference is wrapped in a Box
  4. We state explicitly that futures is a Vec containing these items This has already made a big difference.

Now we only get the errors mentioning Unpin.

Although there are three of them and their contents are very similar.

Here is the compiler error

error[E0308]: mismatched types
   --> src/main.rs:46:46
    |
10  |         let tx1_fut = async move {
    |                       ---------- the expected `async` block
...
24  |         let rx_fut = async {
    |                      ----- the found `async` block
...
46  |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
    |                                     -------- ^^^^^^ expected `async` block, found a different `async` block
    |                                     |
    |                                     arguments to this function are incorrect
    |
    = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
               found `async` block `{async block@src/main.rs:24:22: 24:27}`
    = note: no two async blocks, even if identical, have the same type
    = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
   --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0308]: mismatched types
   --> src/main.rs:46:64
    |
10  |         let tx1_fut = async move {
    |                       ---------- the expected `async` block
...
30  |         let tx_fut = async move {
    |                      ---------- the found `async` block
...
46  |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
    |                                                       -------- ^^^^^^ expected `async` block, found a different `async` block
    |                                                       |
    |                                                       arguments to this function are incorrect
    |
    = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
               found `async` block `{async block@src/main.rs:30:22: 30:32}`
    = note: no two async blocks, even if identical, have the same type
    = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
   --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
   --> src/main.rs:48:24
    |
48  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
   --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:9
   |
48 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:33
   |
48 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

The first part of the message tells us that the first async block (scr/main.rs:8:20: 20:10) doesn't implement the Unpin trait.

It also suggests using pin! or Box::pin to resolve it.

We will dig into this later about the Pin and Unpin.

For now, we can just follow the compiler's advice to get unstuck.

We will start by updating the type annotation for futures, with a Pin wrapping each Box

Next we will use Box::pin to pin the futures themselves.

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

Now if we compile and run this we get this output

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

Using Pin<Box<T>> adds a small amount of overhead from putting these futures on the heap with Box.

We are only doing that to get the types to line up.

We don't actually need the heap allocation.

These futures are local to this particular function.

Pin is itself a wrapper type, so we can get the benefit of having a single type in the Vec.

This was the original reason we tried using Box.

Now without the heap allocation we can use Pin directly with each future, using the std::pin::pin macro.

We still must be explicit about the type of the pinned reference.

Without this Rust will still not know how to interpret these as dynamic trait objects, which is what we need them to be in the Vec.

We therefore pin! each future when we define it and define futures as a Vec containing pinned mutable references to the dynamic future type.

        let tx1_fut = pin!(async move {
            // --snip--
        });

        let rx_fut = pin!(async {
            // --snip--
        });

        let tx_fut = pin!(async move {
            // --snip--
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

Whole code, ignore it.

extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}

So far we got to this point by ignoring the fact that we might have different Output types.

Here is an example of this.

  • The anonymous future for a implements Future<Output = u32>
  • The anonymous future for b implements Future<Output = &str>
  • The anonymous future for c implements Future<Output = bool>
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");

We can use trpl::join! to await them because it allows us to pass in multiple future types and produces a tuple of those types.

We cannot use trpl::join_all because it requires all of the futures passed in to gave the same type.

This would bring us pack to the Pin trait.

This is a fundamental tradeoff.

We can either deal with a dynamic number of futures with join_all, as long as they are all the same type, or we can deal with a set number of futures with the join functions or the join! macro, even if they have different types.

This is the same scenario we have faced when working with any other types in Rust.

Futures have some nice syntax for working with them, but ultimately are not special.

Racing Futures

When we join the futures with the join family of functions and macros, we require all of them to finish before we move on.

Sometimes we only need some future from a set to finish before we move on.

This is like racing one future against another.

Here we once again use trpl::race to run two futures, slow and fast against each other.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}

Each future prints a message when it starts, it then pauses for some amount of time (calling and awaiting sleep) and then prints another message when it finishes.

Then we pass both slow and dast to trpl::race and wait for one of them to finish. (fast wins)

Unlike when we used race before we ignore the Either instance it returns here, because all of the interesting behavior happens in the body of the async blocks.

Other implementations are fair and will randomly choose which future to poll first.

Regardless of whether the implementation of race we are using is fair, one of the futures will run up to the first await in its body before another task can start.

Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited isn't ready.

The invers is also true: Rust only pauses async blocks and hands control back to a runtime will run up to the first await in its body before another task can start.

This mean that if you do a bunch of work in an async block without an await point, that future will block any other futures form making progress.

You sometimes hear this referred to as one future starving other futures.

Some cases this may be a big problem.

However if you do some expensive setup or long-running work, or if you have a future that will keep doing some particular task indefinitely.

You will need to think about when and where to hand control back to the runtime.

If you have long-running blocking operations, async can be a useful tool for providing ways for different parts of the program to relate to each other.

How would you hand control back to the runtime in those cases?

Yielding Control to the Runtime