finshed ch17.4

This commit is contained in:
darkicewolf50 2025-03-27 14:37:10 -06:00
parent f76d2c7454
commit f79a47defd
3 changed files with 240 additions and 5 deletions

View File

@ -77,6 +77,20 @@
"title": "Futures in Sequence" "title": "Futures in Sequence"
} }
}, },
{
"id": "ee4116419493acd3",
"type": "leaf",
"state": {
"type": "markdown",
"state": {
"file": "Traits for Async.md",
"mode": "source",
"source": false
},
"icon": "lucide-file",
"title": "Traits for Async"
}
},
{ {
"id": "2a974ca5442d705f", "id": "2a974ca5442d705f",
"type": "leaf", "type": "leaf",
@ -130,7 +144,7 @@
} }
} }
], ],
"currentTab": 4 "currentTab": 5
} }
], ],
"direction": "vertical" "direction": "vertical"
@ -273,10 +287,11 @@
"command-palette:Open command palette": false "command-palette:Open command palette": false
} }
}, },
"active": "8d868fd701da33a8", "active": "ee4116419493acd3",
"lastOpenFiles": [ "lastOpenFiles": [
"Any Number of Futures.md",
"Futures in Sequence.md", "Futures in Sequence.md",
"Traits for Async.md",
"Any Number of Futures.md",
"Futures and Async.md", "Futures and Async.md",
"Async, Await, Futures and Streams.md", "Async, Await, Futures and Streams.md",
"Concurrency.md", "Concurrency.md",
@ -301,7 +316,6 @@
"Test_Organization.md", "Test_Organization.md",
"Traits.md", "Traits.md",
"Modules and Use.md", "Modules and Use.md",
"Modules.md",
"does_not_compile.svg", "does_not_compile.svg",
"Untitled.canvas", "Untitled.canvas",
"Good and Bad Code/Commenting Pratices", "Good and Bad Code/Commenting Pratices",

View File

@ -349,4 +349,224 @@ You can get different behavior if needed by using other kinds of channels or oth
Lets see this by combining a stream of time intervals with this stream of messages. Lets see this by combining a stream of time intervals with this stream of messages.
## Merging Streams ## Merging Streams
We will start by creating another stream, this will emit and item every millisecond if we let it run directly.
For simplicity we will use the `sleep` function to send a message on a delay and combine it with the same approach we used in `get_messages` of creating a stream from a channel.
The difference is that we are going to send back the count of intervals that have happened.
This means the return type will be `impl Stream<Item = u32>`, and we can call the function `get_intervals`
```rust
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
```
Here we start by defining a `count` in the task.
We could have defined it outside of the task, but it is clearer to limit the scope of any given variable.
Then we create an infinite loop.
Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel.
This is all wrapped in the task created by `spawn_task`, all of it including the infinite loop will get cleaned up with the runtime.
This kind of infinite loop, ends only when the whole runtime gets torn down.
This is a fairly common in async Rust: many programs needs to keep running indefinitely.
With async, this doesn't block anything else, as long as there is at least one await point in each iteration through the loop.
Back in our main function's async block, we can attempt to merge the `messages` and `intervals` streams
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
```
We start by calling `get_intervals`.
We then merge the `messages` and `intervals` streams with the `merge` method, which combines multiple streams into one stream that produces items from any of the source streams as soon as the items are available, without any particular ordering.
Finally, we loop over that combined stream instead of over `messages`.
At this point neither `messages` nor `intervals` need to be pinned or mutable, this is because both will be combined into the single `merged` stream.
This call to `merge` doesn't compile.
(Neither does the `next` call in the `wile let` loop, but we will come back to that).
This is because the two steams have different types.
The `messages` stream has the type `Timeout< impl Stream<Item = String>>`, where `Timeout` is the type that implements `Stream` for a `timeout` call.
The `intervals` stream has the type `impl Stream<Item = u32>`.
In order to merge these tow streams, we need to transform one of them to match the other.
We will rework the intervals stream, this is because messages is already in the basic format we want and has to handle timeout errors.
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
```
Here we will use the `map` helper method to transform `intervals` into a string.
Next we need to match the `Timeout` from `messages`
Due us not actually *want* a timeout for `intervals`, we can just create a timeout which is longer than the other durations that we are using.
Specifically we create a 10-second timeout with `Duration::from_sec(10)`
Lastly we need to make `stream` mutable, so that the `while let` loop's `next` calls can iterate through the stream and pin it so that it is safe to do so.
This gets us *almost* to where we need to be.
Everything type checks.
If you now run this there will be two problems.
It will never stop. (You need to stop it manually with ctrl-c).
The messages from the English alphabet will be buried in the midst of all the interval counter messages.
Here is a sample of the output
```
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
This is one of the ways to solve these last two problems
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.throttle(Duration::from_millis(100))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
```
Here we use `throttle` method on the `intervals` stream so that it doesn't overwhelm the `messages` stream.
*Throttling* is a way of limiting the rate at which a function will be called.
In this case, how often the stream will be polled.
Here it is once every 100 milliseconds should do, because this is roughly how often our messages arrive.
To limit the number of times we accept form a stream, we will apply the `take` method to the `merged` stream.
Because we want to limit the final output, and not just one stream or the other.
Now when we run it.
It stops after pulling 20 items form the stream and the intervals don't overwhelm the messages.
We also don't `Interval: 100` or `Interval: 200` or etc...
Instead we get `Interval: 1`, `Interval 2`, etc...
Even though we have a source stream that *can* produce an event every millisecond.
This is because the `throttle` call produces a new stream that wraps the original stream so that the original stream gets polled only at the throttle rate., and not its own "native" rate.
We also won't have a bunch of unhandled interval messages we are choose to ignore.
Instead we never produce those interval messages in the first place.
This is the inherent "laziness" of Rust's futures at work again.
This allows us to choose our performance characteristics.
```
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
The last thing is that we need to handle is errors.
With both of these channel-based streams, the `send` calls could fail when the other side of the channel closes and that is just a matter of how the runtime executes the futures that make up the stream.
Up until now, we have ignored this possibility by calling `unwrap`.
In a well behaved app, we should explicitly handle the error at minimum by ending the loop so we don't try to send any more messages.
Here is a simple error handling strategy; print the issue and then `break` form the loops
```rust
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
eprintln!("Cannot send message '{message}': {send_error}");
break;
}
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
if let Err(send_error) = tx.send(count) {
eprintln!("Could not send interval {count}: {send_error}");
break;
};
}
});
ReceiverStream::new(rx)
}
```
The correct way to handle a message send error will vary.
Just ensure you have a strategy.

1
Traits for Async.md Normal file
View File

@ -0,0 +1 @@
# A Closer Look at the Traits for Async