diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 57a1383..a4bf1c9 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -77,6 +77,20 @@ "title": "Futures in Sequence" } }, + { + "id": "ee4116419493acd3", + "type": "leaf", + "state": { + "type": "markdown", + "state": { + "file": "Traits for Async.md", + "mode": "source", + "source": false + }, + "icon": "lucide-file", + "title": "Traits for Async" + } + }, { "id": "2a974ca5442d705f", "type": "leaf", @@ -130,7 +144,7 @@ } } ], - "currentTab": 4 + "currentTab": 5 } ], "direction": "vertical" @@ -273,10 +287,11 @@ "command-palette:Open command palette": false } }, - "active": "8d868fd701da33a8", + "active": "ee4116419493acd3", "lastOpenFiles": [ - "Any Number of Futures.md", "Futures in Sequence.md", + "Traits for Async.md", + "Any Number of Futures.md", "Futures and Async.md", "Async, Await, Futures and Streams.md", "Concurrency.md", @@ -301,7 +316,6 @@ "Test_Organization.md", "Traits.md", "Modules and Use.md", - "Modules.md", "does_not_compile.svg", "Untitled.canvas", "Good and Bad Code/Commenting Pratices", diff --git a/Futures in Sequence.md b/Futures in Sequence.md index d0e3a6b..0f4be1c 100644 --- a/Futures in Sequence.md +++ b/Futures in Sequence.md @@ -349,4 +349,224 @@ You can get different behavior if needed by using other kinds of channels or oth Lets see this by combining a stream of time intervals with this stream of messages. ## Merging Streams - \ No newline at end of file + We will start by creating another stream, this will emit and item every millisecond if we let it run directly. + +For simplicity we will use the `sleep` function to send a message on a delay and combine it with the same approach we used in `get_messages` of creating a stream from a channel. + +The difference is that we are going to send back the count of intervals that have happened. + +This means the return type will be `impl Stream`, and we can call the function `get_intervals` +```rust +fn get_intervals() -> impl Stream { + let (tx, rx) = trpl::channel(); + + trpl::spawn_task(async move { + let mut count = 0; + loop { + trpl::sleep(Duration::from_millis(1)).await; + count += 1; + tx.send(count).unwrap(); + } + }); + + ReceiverStream::new(rx) +} +``` +Here we start by defining a `count` in the task. + +We could have defined it outside of the task, but it is clearer to limit the scope of any given variable. + +Then we create an infinite loop. + +Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel. + +This is all wrapped in the task created by `spawn_task`, all of it including the infinite loop will get cleaned up with the runtime. + +This kind of infinite loop, ends only when the whole runtime gets torn down. + +This is a fairly common in async Rust: many programs needs to keep running indefinitely. + +With async, this doesn't block anything else, as long as there is at least one await point in each iteration through the loop. + +Back in our main function's async block, we can attempt to merge the `messages` and `intervals` streams +```rust + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals(); + let merged = messages.merge(intervals); +``` +We start by calling `get_intervals`. + +We then merge the `messages` and `intervals` streams with the `merge` method, which combines multiple streams into one stream that produces items from any of the source streams as soon as the items are available, without any particular ordering. + +Finally, we loop over that combined stream instead of over `messages`. + +At this point neither `messages` nor `intervals` need to be pinned or mutable, this is because both will be combined into the single `merged` stream. + +This call to `merge` doesn't compile. + +(Neither does the `next` call in the `wile let` loop, but we will come back to that). + +This is because the two steams have different types. + +The `messages` stream has the type `Timeout< impl Stream>`, where `Timeout` is the type that implements `Stream` for a `timeout` call. + +The `intervals` stream has the type `impl Stream`. + +In order to merge these tow streams, we need to transform one of them to match the other. + +We will rework the intervals stream, this is because messages is already in the basic format we want and has to handle timeout errors. +```rust + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals() + .map(|count| format!("Interval: {count}")) + .timeout(Duration::from_secs(10)); + let merged = messages.merge(intervals); + let mut stream = pin!(merged); +``` +Here we will use the `map` helper method to transform `intervals` into a string. + +Next we need to match the `Timeout` from `messages` + +Due us not actually *want* a timeout for `intervals`, we can just create a timeout which is longer than the other durations that we are using. + +Specifically we create a 10-second timeout with `Duration::from_sec(10)` + +Lastly we need to make `stream` mutable, so that the `while let` loop's `next` calls can iterate through the stream and pin it so that it is safe to do so. + +This gets us *almost* to where we need to be. + +Everything type checks. + +If you now run this there will be two problems. + +It will never stop. (You need to stop it manually with ctrl-c). + +The messages from the English alphabet will be buried in the midst of all the interval counter messages. + +Here is a sample of the output +``` +--snip-- +Interval: 38 +Interval: 39 +Interval: 40 +Message: 'a' +Interval: 41 +Interval: 42 +Interval: 43 +--snip-- +``` +This is one of the ways to solve these last two problems +```rust + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals() + .map(|count| format!("Interval: {count}")) + .throttle(Duration::from_millis(100)) + .timeout(Duration::from_secs(10)); + let merged = messages.merge(intervals).take(20); + let mut stream = pin!(merged); +``` +Here we use `throttle` method on the `intervals` stream so that it doesn't overwhelm the `messages` stream. + +*Throttling* is a way of limiting the rate at which a function will be called. + +In this case, how often the stream will be polled. + +Here it is once every 100 milliseconds should do, because this is roughly how often our messages arrive. + +To limit the number of times we accept form a stream, we will apply the `take` method to the `merged` stream. + +Because we want to limit the final output, and not just one stream or the other. + +Now when we run it. + +It stops after pulling 20 items form the stream and the intervals don't overwhelm the messages. + +We also don't `Interval: 100` or `Interval: 200` or etc... + +Instead we get `Interval: 1`, `Interval 2`, etc... + +Even though we have a source stream that *can* produce an event every millisecond. + +This is because the `throttle` call produces a new stream that wraps the original stream so that the original stream gets polled only at the throttle rate., and not its own "native" rate. + +We also won't have a bunch of unhandled interval messages we are choose to ignore. + +Instead we never produce those interval messages in the first place. + +This is the inherent "laziness" of Rust's futures at work again. + +This allows us to choose our performance characteristics. +``` +Interval: 1 +Message: 'a' +Interval: 2 +Interval: 3 +Problem: Elapsed(()) +Interval: 4 +Message: 'b' +Interval: 5 +Message: 'c' +Interval: 6 +Interval: 7 +Problem: Elapsed(()) +Interval: 8 +Message: 'd' +Interval: 9 +Message: 'e' +Interval: 10 +Interval: 11 +Problem: Elapsed(()) +Interval: 12 +``` +The last thing is that we need to handle is errors. + +With both of these channel-based streams, the `send` calls could fail when the other side of the channel closes and that is just a matter of how the runtime executes the futures that make up the stream. + +Up until now, we have ignored this possibility by calling `unwrap`. + +In a well behaved app, we should explicitly handle the error at minimum by ending the loop so we don't try to send any more messages. + +Here is a simple error handling strategy; print the issue and then `break` form the loops +```rust +fn get_messages() -> impl Stream { + let (tx, rx) = trpl::channel(); + + trpl::spawn_task(async move { + let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; + + for (index, message) in messages.into_iter().enumerate() { + let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; + trpl::sleep(Duration::from_millis(time_to_sleep)).await; + + if let Err(send_error) = tx.send(format!("Message: '{message}'")) { + eprintln!("Cannot send message '{message}': {send_error}"); + break; + } + } + }); + + ReceiverStream::new(rx) +} + +fn get_intervals() -> impl Stream { + let (tx, rx) = trpl::channel(); + + trpl::spawn_task(async move { + let mut count = 0; + loop { + trpl::sleep(Duration::from_millis(1)).await; + count += 1; + + if let Err(send_error) = tx.send(count) { + eprintln!("Could not send interval {count}: {send_error}"); + break; + }; + } + }); + + ReceiverStream::new(rx) +} +``` +The correct way to handle a message send error will vary. + +Just ensure you have a strategy. diff --git a/Traits for Async.md b/Traits for Async.md new file mode 100644 index 0000000..45a2708 --- /dev/null +++ b/Traits for Async.md @@ -0,0 +1 @@ +# A Closer Look at the Traits for Async