# Working with Any Number of Futures

When we switched from two futures to three, we also had to switch from using `join` to using `join3`.

It would be annoying to do this every time we changed the number of futures we wanted to join.

Instead, we have a macro form, `join!`, to which we can pass an arbitrary number of arguments.

It also handles awaiting the futures itself.

We could rewrite the code to use `join!` instead of `join3`:

```rust
trpl::join!(tx1_fut, tx_fut, rx_fut);
```

This is an improvement because we do not need to swap between `join`, `join3`, `join4`, and so on.

Even so, this macro form only works when we know the number of futures ahead of time.

In real-world Rust, though, pushing futures into a collection and then waiting on some or all of them to complete is a common pattern.

To check all the futures in a collection, we need to iterate over and join *all* of them.

The `trpl::join_all` function accepts any type that implements the `Iterator` trait.

Here is an example of putting all our futures in a vector and replacing `join!` with `join_all`.

```rust
let futures = vec![tx1_fut, rx_fut, tx_fut];

trpl::join_all(futures).await;
```

This will not compile; instead, we get this error.

```
error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
```

This may be surprising.

After all, none of the async blocks return anything, so each one produces a `Future<Output = ()>`.

Remember, though, that `Future` is a trait, and that the compiler creates a unique enum for each async block.

You can't put two different hand-written structs in a `Vec`.

The same rule applies to the different enums generated by the compiler.

In order to make this work, we need to use *trait objects*, just like we did in "Returning Errors from the `run` Function" (when improving error handling and modularity).

Using trait objects lets us treat each of the anonymous futures produced by these types as the same type, because all of them implement the `Future` trait.

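As a rough analogy (a minimal sketch of my own, not part of the chapter's example), the same thing happens with ordinary hand-written types: two distinct types can only share a `Vec` once we box them up behind a common trait.

```rust
#[derive(Debug)]
struct Cat;

#[derive(Debug)]
struct Dog;

fn main() {
    // let animals = vec![Cat, Dog]; // error[E0308]: mismatched types

    // Boxing each value as a `dyn Debug` trait object gives the Vec a single element type.
    let animals: Vec<Box<dyn std::fmt::Debug>> = vec![Box::new(Cat), Box::new(Dog)];
    println!("{animals:?}");
}
```
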
Note that we have discussed another way to include multiple types in a `Vec`: using an enum to represent each type that can appear in the vector.

We are unable to do that here, though.

For one thing, we have no way to name the different types, because they are anonymous.

For another, the reason we reached for a vector and `join_all` in the first place was to be able to work with a dynamic collection of futures where we only care that they all have the same output type.

We will try this by wrapping each future in the `vec!` in a `Box::new`.

```rust
let futures =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

trpl::join_all(futures).await;
```

As expected, this code will not compile.

We get the same basic error we got before for both the second and third `Box::new` calls, as well as new errors referring to the `Unpin` trait.

We will fix the errors on the `Box::new` calls by explicitly annotating the type of the `futures` variable.

```rust
let futures: Vec<Box<dyn Future<Output = ()>>> =
    vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
```

We can walk through this type declaration with an ordered list:

1. The innermost type is the future itself. We note explicitly that the output of the future is the unit type `()` by writing `Future<Output = ()>`.
2. We annotate the trait with `dyn` to mark it as dynamic.
3. The entire trait reference is wrapped in a `Box`.
4. We state explicitly that `futures` is a `Vec` containing these items.

This has already made a big difference.

Now we only get the errors mentioning `Unpin`.

Although there are three of them, their contents are very similar.

Here is the compiler error:

```
error[E0308]: mismatched types
  --> src/main.rs:46:46
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
46 |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
   |                                     -------- ^^^^^^ expected `async` block, found a different `async` block
   |                                     |
   |                                     arguments to this function are incorrect
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
  --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0308]: mismatched types
  --> src/main.rs:46:64
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
30 |         let tx_fut = async move {
   |                      ---------- the found `async` block
...
46 |             vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
   |                                                       -------- ^^^^^^ expected `async` block, found a different `async` block
   |                                                       |
   |                                                       arguments to this function are incorrect
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:30:22: 30:32}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object
note: associated function defined here
  --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
    |
255 |     pub fn new(x: T) -> Self {
    |            ^^^

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:24
   |
48 |         trpl::join_all(futures).await;
   |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |         |
   |         required by a bound introduced by this call
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:9
   |
48 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
  --> src/main.rs:48:33
   |
48 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`
```

The first part of the message tells us that the first async block (`src/main.rs:10:23: 10:33`) does not implement the `Unpin` trait.

It also suggests using `pin!` or `Box::pin` to resolve it.

We will dig into `Pin` and `Unpin` later.

For now, we can just follow the compiler's advice to get unstuck.

We start by updating the type annotation for `futures`, wrapping each `Box` in a `Pin`.

Next, we use `Box::pin` to pin the futures themselves.

```rust
let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
    vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];
```

Now if we compile and run this, we get this output:

```
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
```

Using `Pin<Box<T>>` adds a small amount of overhead from putting these futures on the heap with `Box`.

We are only doing that to get the types to line up.

We don't actually *need* the heap allocation.

These futures are local to this particular function.

`Pin` is itself a wrapper type, so we can get the benefit of having a single type in the `Vec`, which was the original reason we tried using `Box`, without the heap allocation.

We can use `Pin` directly with each future, via the `std::pin::pin` macro.

We still must be explicit about the type of the pinned reference.

Without it, Rust will not know how to interpret these as dynamic trait objects, which is what we need them to be in the `Vec`.

We therefore `pin!` each future when we define it and define `futures` as a `Vec` containing pinned mutable references to the dynamic future type.

```rust
let tx1_fut = pin!(async move {
    // --snip--
});

let rx_fut = pin!(async {
    // --snip--
});

let tx_fut = pin!(async move {
    // --snip--
});

let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
    vec![tx1_fut, rx_fut, tx_fut];
```

Here is the whole program for reference; feel free to skip over it.

```rust
extern crate trpl; // required for mdbook test

use std::{
    future::Future,
    pin::{pin, Pin},
    time::Duration,
};

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}
```

So far, we have gotten to this point by ignoring the fact that we might have different `Output` types.

Here is an example of this:

- The anonymous future for `a` implements `Future<Output = u32>`
- The anonymous future for `b` implements `Future<Output = &str>`
- The anonymous future for `c` implements `Future<Output = bool>`

```rust
let a = async { 1u32 };
let b = async { "Hello!" };
let c = async { true };

let (a_result, b_result, c_result) = trpl::join!(a, b, c);
println!("{a_result}, {b_result}, {c_result}");
```

We can use `trpl::join!` to await them because it allows us to pass in multiple future types and produces a tuple of those types.

We *cannot* use `trpl::join_all`, because it requires all of the futures passed in to have the same type.

That requirement is what brought us to the `Pin` discussion in the first place.

This is a fundamental tradeoff.

We can either deal with a dynamic number of futures with `join_all`, as long as they all have the same type, or we can deal with a set number of futures with the `join` functions or the `join!` macro, even if they have different types.

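As a small sketch (my own illustration, not from the chapter), `join_all` works naturally when every future comes from the same source, such as repeated calls to one async function, because those futures all share a single type and output.

```rust
// Assumes this runs inside `trpl::run(async { ... })`.
async fn double(x: u32) -> u32 {
    x * 2
}

// Every call to `double` produces a future of the same (anonymous) type,
// so the futures can live in one Vec without boxing or pinning.
let futures: Vec<_> = (1..=3).map(double).collect();
let results = trpl::join_all(futures).await;
assert_eq!(results, vec![2, 4, 6]);
```
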
This is the same scenario we face when working with any other types in Rust.

Futures have some nice syntax for working with them, but ultimately they are not special.

## Racing Futures

When we join futures with the `join` family of functions and macros, we require *all* of them to finish before we move on.

Sometimes, though, we only need *some* future from a set to finish before we move on.

This is like racing one future against another.

Here we once again use `trpl::race` to run two futures, `slow` and `fast`, against each other.

```rust
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}
```

Each future prints a message when it starts, pauses for some amount of time (by calling and awaiting `sleep`), and then prints another message when it finishes.

Then we pass both `slow` and `fast` to `trpl::race` and wait for one of them to finish (`fast` wins).

Unlike when we used `race` before, we ignore the `Either` instance it returns here, because all of the interesting behavior happens in the body of the async blocks. If we did care which future won, we could inspect that `Either` value, as sketched below.

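Here is a minimal sketch (my own, assuming `trpl::race` returns `trpl::Either`, as it does in the `timeout` example later in this section) of how the winner could be inspected:

```rust
use trpl::Either;

// `race` resolves to `Either::Left` if the first future finishes first,
// or `Either::Right` if the second one does.
match trpl::race(slow, fast).await {
    Either::Left(_) => println!("'slow' won the race"),
    Either::Right(_) => println!("'fast' won the race"),
}
```
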
The implementation of `race` we are using here is not fair: it always polls the futures in the order they are passed in. Other implementations *are* fair and will randomly choose which future to poll first.

Regardless of whether the implementation of `race` we are using is fair, *one* of the futures will run up to the first `await` in its body before another task can start.

Recall that at each await point, Rust gives a runtime a chance to pause the task and switch to another one if the future being awaited isn't ready.

The inverse is also true: Rust *only* pauses async blocks and hands control back to a runtime at an await point; everything between await points runs synchronously.

This means that if you do a bunch of work in an async block without an await point, that future will block any other futures from making progress.

You will sometimes hear this referred to as one future *starving* other futures.

In some cases, this may not be a big problem.

However, if you do some expensive setup or long-running work, or if you have a future that will keep doing some particular task indefinitely, you will need to think about when and where to hand control back to the runtime.

By the same token, if you have long-running blocking operations, async can be a useful tool for providing ways for different parts of the program to relate to each other.

But *how* would you hand control back to the runtime in those cases?

## Yielding Control to the Runtime

Let's simulate a long-running operation:

```rust
use std::{thread, time::Duration};

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}
```

The code uses `std::thread::sleep` instead of `trpl::sleep`, so calling `slow` will block the current thread for a number of milliseconds.

We can then use `slow` to stand in for real-world operations that are both long-running and blocking.

Here we use `slow` to emulate doing CPU-bound work in a pair of futures.

```rust
let a = async {
    println!("'a' started.");
    slow("a", 30);
    slow("a", 10);
    slow("a", 20);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    slow("b", 10);
    slow("b", 15);
    slow("b", 350);
    trpl::sleep(Duration::from_millis(50)).await;
    println!("'b' finished.");
};

trpl::race(a, b).await;
```

To begin, each future only hands control back to the runtime *after* carrying out a bunch of slow operations.

You see this output from this code:

```
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
```

As with the earlier example, `race` still finishes as soon as `a` is done.

Notice, though, that there is no interleaving between the two futures.

The `a` future does all of its work until the `trpl::sleep` call is awaited, then the `b` future does all of its work until its own `trpl::sleep` call is awaited.

Finally, the `a` future completes.

To allow both futures to make progress between their slow tasks, we need await points so we can hand control back to the runtime.

That means we need something we can await.

If we removed the `trpl::sleep` at the end of the `a` future, it would complete without the `b` future running *at all*.

Now we will try using `sleep` as a starting point for letting the operations switch off making progress.

```rust
let one_ms = Duration::from_millis(1);

let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::sleep(one_ms).await;
    slow("a", 10);
    trpl::sleep(one_ms).await;
    slow("a", 20);
    trpl::sleep(one_ms).await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::sleep(one_ms).await;
    slow("b", 10);
    trpl::sleep(one_ms).await;
    slow("b", 15);
    trpl::sleep(one_ms).await;
    slow("b", 35);
    trpl::sleep(one_ms).await;
    println!("'b' finished.");
};
```

Here we added `trpl::sleep` calls with await points between each call to `slow`.

Now the two futures' work is interleaved:

```
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
```

Here the `a` future still runs for a bit before handing control off to `b`, because it calls `slow` before ever calling `trpl::sleep`.

After that, the futures swap back and forth each time one of them hits an await point.

In this case we do that after every call to `slow`, but we could break up the work in whatever way makes the most sense to us.

We don't actually want to *sleep* here, though; we want to switch control.

We want to make progress as fast as we can.

We need a way to hand control back to the runtime.

We can do this directly with the `yield_now` function.

In this example, we replace all of the `sleep` calls with `yield_now`.

```rust
let a = async {
    println!("'a' started.");
    slow("a", 30);
    trpl::yield_now().await;
    slow("a", 10);
    trpl::yield_now().await;
    slow("a", 20);
    trpl::yield_now().await;
    println!("'a' finished.");
};

let b = async {
    println!("'b' started.");
    slow("b", 75);
    trpl::yield_now().await;
    slow("b", 10);
    trpl::yield_now().await;
    slow("b", 15);
    trpl::yield_now().await;
    slow("b", 35);
    trpl::yield_now().await;
    println!("'b' finished.");
};
```

This is now much clearer about the intent, and it can be significantly faster than using `sleep`, because timers such as the one used by `sleep` often have limits on how granular they can be.

The version of `sleep` we are using, for example, will always sleep for at least a millisecond, even if we pass it a `Duration` of one nanosecond.

Modern computers are *fast*, and they can do a lot in one millisecond.

We can demonstrate this by setting up a little benchmark.

Note: This isn't an especially rigorous way to do performance testing, but it suffices to show the difference here.

```rust
extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}
```

Here we skip all of the status printing, pass a one-nanosecond `Duration` to `trpl::sleep`, and let each future run by itself with no switching between the futures.

Then we run for 1,000 iterations and see how long the future using `trpl::sleep` takes compared to the future using `trpl::yield_now`.

The version using `yield_now` is *way* faster.

This means that async can be useful even for CPU-bound tasks, depending on what your program is doing.

It provides a useful tool for structuring the relationships between different parts of the program.

This is a form of *cooperative multitasking*, where each future has the power to determine when it hands over control via await points.

That also means that each future has the responsibility to avoid blocking for too long.

In some Rust-based embedded operating systems, this is the *only* kind of multitasking.

In real-world apps, you usually won't be alternating function calls with await points on every single line; instead, you might hand control back between larger chunks of work, as in the sketch below.

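Here is a minimal sketch (my own illustration, not from the chapter) of that idea: chunking a CPU-bound loop and yielding to the runtime between chunks with `trpl::yield_now`.

```rust
// Assumes this runs inside `trpl::run(async { ... })`.
async fn sum_in_chunks(data: &[u64]) -> u64 {
    let mut total = 0;
    for chunk in data.chunks(10_000) {
        // Do a chunk of synchronous, CPU-bound work...
        total += chunk.iter().sum::<u64>();
        // ...then hand control back so other futures are not starved.
        trpl::yield_now().await;
    }
    total
}
```
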
While yielding control in this way is relatively inexpensive, it is not free.

In many cases, trying to break up a compute-bound task might make it significantly slower.

Sometimes it is better for the *overall* performance to let an operation block briefly.

You can always measure to see what your code's actual performance bottlenecks are.

The underlying dynamic is important to keep in mind, though, if you *are* seeing a lot of work happening in serial that you expected to happen concurrently.

## Building Our Own Async Abstractions

We can also compose futures together to create new patterns.

For example, we can build a `timeout` function with async building blocks we already have.

When we are done, the result will be another building block we could use to create still more async abstractions.

Here is how we would expect this `timeout` to work with a slow future.

```rust
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}
```

To implement this, we will think about the API for `timeout`:

- It needs to be an async function itself so we can await it.
- Its first parameter should be a future to run.
  - We can make it generic to allow it to work with any future.
- The second parameter will be the maximum time to wait.
  - If we use a `Duration`, this will make it easy to pass along to `trpl::sleep`.
- It should return a `Result`.
  - If the future completes successfully, the `Result` will be `Ok` with the value produced by the future.
  - If the timeout elapses first, the `Result` will be `Err` with the duration that the timeout waited for.

Here is a declaration of this function:

```rust
async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}
```

This satisfies the goals of our types.

Now we will focus on the *behavior* we need.

We want to race the future passed in against the duration.

We will use `trpl::sleep` to make a timer future from the duration, and use `trpl::race` to run that timer against the future the caller passes in.

We know that `race` is not fair and polls its arguments in the order in which they are passed.

We therefore pass `future_to_try` to `race` first, so it gets a chance to complete even if `max_time` is a very short duration.

If `future_to_try` finishes first, `race` will return `Left` with the output from `future_to_try`.

If `timer` finishes first, `race` will return `Right` with the timer's output of `()`.

Here we match on the result of awaiting `trpl::race`.

```rust
extern crate trpl; // required for mdbook test

use std::{future::Future, time::Duration};

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}
```

If the `future_to_try` succeeds and we get a `Left(output)`, we return `Ok(output)`.

If the sleep timer elapses and we get a `Right(())`, we ignore the `()` with `_` and return `Err(max_time)` instead.

With that, we have a working `timeout` built out of two other async helpers.

If we run the code, it will print the failure mode after the timeout:

```
Failed after 2 seconds
```

Because futures compose with other futures, you can build really powerful tools using smaller async building blocks.

One example of this is combining timeouts with retries, and in turn using those with operations such as network calls (this is one of the examples from the beginning of the chapter). A sketch of that combination follows.

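As a rough sketch (my own, not from the chapter), one way to layer retries on top of the `timeout` function defined above is to accept a closure that rebuilds the future for each attempt:

```rust
// Hypothetical helper, assuming the `timeout` function from above is in scope.
async fn retry_with_timeout<MakeFut, Fut>(
    mut make_future: MakeFut,
    max_time: Duration,
    attempts: u32,
) -> Result<Fut::Output, Duration>
where
    MakeFut: FnMut() -> Fut,
    Fut: Future,
{
    let mut last_timeout = max_time;
    for _ in 0..attempts {
        // Build a fresh future for this attempt and race it against the timer.
        match timeout(make_future(), max_time).await {
            Ok(output) => return Ok(output),
            Err(duration) => last_timeout = duration,
        }
    }
    Err(last_timeout)
}
```
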
In practice, you will usually work directly with `async` and `await`, and secondarily with functions and macros like `join`, `join_all`, `race`, and so on.

You will only need to reach for `pin` now and again to use futures with those APIs.

Some things to consider:

- We used a `Vec` with `join_all` to wait for all of the futures in some group to finish. How could you use a `Vec` to process a group of futures in sequence instead? What are the tradeoffs of doing that?
- Take a look at the `futures::stream::FuturesUnordered` type from the `futures` crate. How would using it be different from using a `Vec`? (Don't worry about the fact that it's from the `stream` part of the crate; it works just fine with any collection of futures.)

Next, we will look at working with multiple futures in a sequence over time, with *streams*.