Compare commits

...

3 Commits

Author  SHA1  Message  Date

1338d4f994  fix actions  2025-03-27 16:37:41 -06:00
  Some checks failed:
  Test Gitea Actions / first (push): Successful in 25s
  Test Gitea Actions / check-code (push): Failing after 23s
  Test Gitea Actions / test (push): Has been skipped
  Test Gitea Actions / documentation-check (push): Has been skipped
0e0e0ea857  started ch17.5  2025-03-27 16:36:00 -06:00
f79a47defd  finshed ch17.4  2025-03-27 14:37:10 -06:00
4 changed files with 477 additions and 12 deletions

View File

@@ -1,16 +1,12 @@
# name of the workflow.
# this is optional.
name: Rust Checking and Testing
name: Test Gitea Actions
# events that will trigger this workflow.
# here, we only have "pull_request", so the workflow will run
# whenever we create a pull request.
# other examples: [push] and [pull_request, push]
on:
# push:
# branches: [ "master" ]
pull_request:
branches: [ "master" ]
on: [push]
env:
CARGO_TERM_COLOR: always
@@ -19,6 +15,29 @@ env:
# jobs run in parallel by default (we can change that).
# each job groups together a series of steps to accomplish a purpose.
jobs:
# name of the job
first:
# the platform or OS that the workflow will run on.
runs-on: ubuntu-latest
# series of steps to finish the job.
steps:
# name of the step.
# steps run sequentially.
# this is optional.
- name: checkout
# each step can either have "uses" or "run".
# "uses" runs an action written somewhere other than this workflow,
# usually from the community.
# this action checks out the repo code to the runner (instance)
# running the action
uses: actions/checkout@v3
# another step.
# this step runs a bash (Ubuntu's default shell) command
- name: list files
run: ls
# name of the job
check-code:
# the platform or OS that the workflow will run on.
@@ -36,6 +55,9 @@ jobs:
# this action checks out the repo code to the runner (instance)
# running the action
uses: actions/checkout@v4
- name: move to minigrep
run: cd minigrep/
# another step.
# Step 1: Run cargo check and fail if it fails
@@ -63,6 +85,9 @@ jobs:
# this action checks out the repo code to the runner (instance)
# running the action
uses: actions/checkout@v4
- name: move to minigrep
run: cd minigrep/
# Step 2: Run unit and integration tests (excluding documentation tests)
- name: Run Tests
@@ -89,6 +114,9 @@ jobs:
# this action checks out the repo code to the runner (instance)
# running the action
uses: actions/checkout@v4
- name: move to minigrep
run: cd minigrep/
# Step 3: Check if documentation tests were run
- name: Check for Documentation Tests
@@ -96,4 +124,4 @@ jobs:
DOC_TESTS=$(cargo test --doc --verbose)
if [[ ! "$DOC_TESTS" =~ "running" ]]; then
echo "No documentation tests were run!" && exit 1
fi
fi

View File

@@ -77,6 +77,20 @@
"title": "Futures in Sequence"
}
},
{
"id": "ee4116419493acd3",
"type": "leaf",
"state": {
"type": "markdown",
"state": {
"file": "Traits for Async.md",
"mode": "source",
"source": false
},
"icon": "lucide-file",
"title": "Traits for Async"
}
},
{
"id": "2a974ca5442d705f",
"type": "leaf",
@@ -130,7 +144,7 @@
}
}
],
"currentTab": 4
"currentTab": 5
}
],
"direction": "vertical"
@@ -273,10 +287,11 @@
"command-palette:Open command palette": false
}
},
"active": "8d868fd701da33a8",
"active": "ee4116419493acd3",
"lastOpenFiles": [
"Any Number of Futures.md",
"Futures in Sequence.md",
"Traits for Async.md",
"Any Number of Futures.md",
"Futures and Async.md",
"Async, Await, Futures and Streams.md",
"Concurrency.md",
@@ -301,7 +316,6 @@
"Test_Organization.md",
"Traits.md",
"Modules and Use.md",
"Modules.md",
"does_not_compile.svg",
"Untitled.canvas",
"Good and Bad Code/Commenting Pratices",

View File

@@ -349,4 +349,224 @@ You can get different behavior if needed by using other kinds of channels or oth
Let's see this by combining a stream of time intervals with this stream of messages.
## Merging Streams
We will start by creating another stream, which will emit an item every millisecond if we let it run directly.
For simplicity, we will use the `sleep` function to send a message on a delay and combine it with the same approach we used in `get_messages` of creating a stream from a channel.
The difference is that this time we are going to send back the count of intervals that have elapsed.
This means the return type will be `impl Stream<Item = u32>`, and we can call the function `get_intervals`:
```rust
fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}
```
Here we start by defining `count` inside the task.
We could have defined it outside the task, but it is clearer to limit the scope of any given variable.
Then we create an infinite loop.
Each iteration of the loop asynchronously sleeps for one millisecond, increments the count, and then sends it over the channel.
Because this is all wrapped in the task created by `spawn_task`, all of it, including the infinite loop, will get cleaned up along with the runtime.
This kind of infinite loop, which ends only when the whole runtime gets torn down, is fairly common in async Rust: many programs need to keep running indefinitely.
With async, this doesn't block anything else, as long as there is at least one await point in each iteration through the loop.
Back in our main function's async block, we can attempt to merge the `messages` and `intervals` streams
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
```
We start by calling `get_intervals`.
We then merge the `messages` and `intervals` streams with the `merge` method, which combines multiple streams into one stream that produces items from any of the source streams as soon as the items are available, without any particular ordering.
Finally, we loop over that combined stream instead of over `messages`.
At this point, neither `messages` nor `intervals` needs to be pinned or mutable, because both will be combined into the single `merged` stream.
However, this call to `merge` doesn't compile.
(Neither does the `next` call in the `while let` loop, but we will come back to that.)
This is because the two streams have different types.
The `messages` stream has the type `Timeout<impl Stream<Item = String>>`, where `Timeout` is the type that implements `Stream` for a `timeout` call.
The `intervals` stream has the type `impl Stream<Item = u32>`.
In order to merge these two streams, we need to transform one of them to match the other.
We will rework the `intervals` stream, because `messages` is already in the basic format we want and has to handle timeout errors.
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
```
Here we use the `map` helper method to transform each item in `intervals` into a string.
Next, we need to match the `Timeout` from `messages`.
Because we don't actually *want* a timeout for `intervals`, we can just create a timeout that is longer than the other durations we are using.
Specifically, we create a 10-second timeout with `Duration::from_secs(10)`.
Lastly, we need to make `stream` mutable, so that the `while let` loop's `next` calls can iterate through the stream, and pin it so that it is safe to do so.
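For reference, the consuming loop in question, as used earlier with the `messages` stream, is a fragment along these lines (not a standalone program; it assumes `trpl::StreamExt` is in scope so that `next` is available):

```rust
// Fragment: `stream` is the pinned, merged stream from above.
while let Some(result) = stream.next().await {
    match result {
        Ok(item) => println!("{item}"),
        Err(reason) => eprintln!("Problem: {reason:?}"),
    }
}
```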
This gets us *almost* to where we need to be.
Everything type checks.
If you run this now, there will be two problems.
First, it will never stop; you have to halt it manually with ctrl-c.
Second, the messages from the English alphabet will be buried in the midst of all the interval counter messages.
Here is a sample of the output:
```
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
```
Here is one way to solve these last two problems:
```rust
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
    .map(|count| format!("Interval: {count}"))
    .throttle(Duration::from_millis(100))
    .timeout(Duration::from_secs(10));
let merged = messages.merge(intervals).take(20);
let mut stream = pin!(merged);
```
Here we use the `throttle` method on the `intervals` stream so that it doesn't overwhelm the `messages` stream.
*Throttling* is a way of limiting the rate at which a function will be called, or, in this case, how often the stream will be polled.
Polling once every 100 milliseconds should do, because that is roughly how often our messages arrive.
To limit the number of items we accept from a stream, we apply the `take` method to the `merged` stream, because we want to limit the final output, not just one stream or the other.
Now when we run it, it stops after pulling 20 items from the stream, and the intervals don't overwhelm the messages.
We also don't get `Interval: 100` or `Interval: 200` and so on.
Instead we get `Interval: 1`, `Interval: 2`, and so on, even though the source stream *can* produce an event every millisecond.
This is because the `throttle` call produces a new stream that wraps the original stream, so that the original stream gets polled only at the throttle rate, not its own "native" rate.
We also won't have a bunch of unhandled interval messages we are choosing to ignore.
Instead, we never produce those interval messages in the first place.
This is the inherent "laziness" of Rust's futures at work again, letting us choose our performance characteristics.
Here is what the output looks like now:
```
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
```
The last thing we need to handle is errors.
With both of these channel-based streams, the `send` calls could fail when the other side of the channel closes, and that is just a matter of how the runtime executes the futures that make up the stream.
Up until now, we have ignored this possibility by calling `unwrap`.
In a well-behaved app, we should explicitly handle the error, at minimum by ending the loop so we don't try to send any more messages.
Here is a simple error-handling strategy: print the issue and then `break` from the loops.
```rust
fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}
```
The correct way to handle a message send error will vary.
Just ensure you have a strategy.
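The same failure mode can be demonstrated with the standard library's synchronous mpsc channel (a sketch, not the `trpl` API): once the receiver is dropped, `send` returns an `Err`, and the right response is usually to stop sending.

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();

    // Simulate the other side of the channel closing.
    drop(rx);

    // send now fails; handle the error instead of unwrapping.
    if let Err(send_error) = tx.send(1) {
        eprintln!("Could not send interval 1: {send_error}");
        // In the async loops above, this is where we would `break`.
    }
}
```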

Traits for Async.md (new file, 203 lines)
View File

@@ -0,0 +1,203 @@
# A Closer Look at the Traits for Async
A high-level understanding is fine for most day-to-day Rust writing, but sometimes you will encounter situations where you need to understand a few more of the details.
In this chapter we will dig in just enough to help in those scenarios.
Diving even deeper requires reading the documentation.
## The `Future` Trait
Now let's look at how the `Future` trait works.
Here is how Rust defines it:
```rust
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
The trait definition includes a bunch of new types and also some syntax we haven't seen before.
First, `Future`'s associated type `Output` says what the future resolves to.
This is analogous to the `Item` associated type for the `Iterator` trait.
Second, `Future` has the `poll` method, which takes a special `Pin` reference for its `self` parameter and a mutable reference to a `Context` type, and returns a `Poll<Self::Output>`.
For now we will focus on what the method returns: the `Poll` type.
```rust
enum Poll<T> {
    Ready(T),
    Pending,
}
```
This `Poll` type is similar to an `Option`.
It has one variant that has a value, `Ready(T)`, and one that does not, `Pending`.
However, `Poll` means something quite different from `Option`.
The `Pending` variant indicates that the future still has work to do, so the caller will need to check again later.
The `Ready` variant indicates that the future has finished its work and the `T` value is available.
Note that with futures, the caller should not call `poll` again after the future has returned `Ready`.
Many futures will panic if polled again after becoming ready.
Futures that are safe to poll again will say so explicitly in their documentation.
This is similar to the behavior of `Iterator::next`.
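Since `Poll` mirrors `Option` so closely, a quick sketch using only the standard library shows the parallel:

```rust
use std::task::Poll;

fn main() {
    let pending: Poll<u32> = Poll::Pending;
    let ready: Poll<u32> = Poll::Ready(5);

    // Like Option, Poll has simple predicates...
    assert!(pending.is_pending());
    assert!(ready.is_ready());

    // ...and `map` transforms the Ready value while leaving
    // Pending alone, just as Option::map treats Some and None.
    assert_eq!(ready.map(|n| n * 2), Poll::Ready(10));
    assert_eq!(pending.map(|n| n * 2), Poll::Pending);
}
```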
When you see code that uses `await`, Rust compiles it under the hood to code that calls `poll`.
If you look back at the previous example, where we printed out the page title for a single URL once it resolved, Rust compiles it into something roughly like this:
```rust
match page_title(url).poll() {
    Ready(page_title) => match page_title {
        Some(title) => println!("The title for {url} was {title}"),
        None => println!("{url} had no title"),
    }
    Pending => {
        // But what goes here?
    }
}
```
What should we do when the future is still `Pending`?
We need some way to try again and again until the future is finally ready.
In other words, we need a loop:
```rust
let mut page_title_fut = page_title(url);
loop {
    match page_title_fut.poll() {
        Ready(value) => match value {
            Some(title) => println!("The title for {url} was {title}"),
            None => println!("{url} had no title"),
        }
        Pending => {
            // continue
        }
    }
}
```
If Rust compiled it to exactly this code, every `await` would be blocking, exactly the opposite of what we are trying to do.
Instead, Rust makes sure that the loop can hand off control to something that can pause work on this future, work on other futures, and then check this one again later.
That something is an async runtime, and this scheduling and coordination work is one of its main jobs.
Earlier we described waiting on `rx.recv`.
The `recv` call returns a future, and awaiting the future polls it.
We noted that a runtime will pause the future until it is ready with either `Some(message)` or `None` when the channel closes.
Now with the deeper understanding of the `Future` trait, and specifically `Future::poll`, we can now see how that works.
The runtime knows the future isn't ready when it returns `Poll::Pending`.
The runtime also knows the future *is* ready and advances it when `poll` returns `Poll::Ready(Some(message))` or `Poll::Ready(None)`
The exact details of how a runtime works are beyond the scope of this book.
The key is to see the basic mechanics of futures: a runtime *polls* each future it is responsible for, putting the future back to sleep when it is not yet ready.
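To make those mechanics concrete, here is a hedged, std-only sketch that polls a hand-written future manually; the `CountDown` type and the `noop_waker` helper are illustrative inventions, not from the book or any real runtime:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A toy future that reports Pending twice before finishing.
struct CountDown {
    remaining: u32,
}

impl Future for CountDown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            Poll::Ready("done")
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

// A no-op waker: just enough to drive `poll` by hand.
// (A real runtime passes a waker that reschedules the future.)
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut fut = CountDown { remaining: 2 };
    let mut fut = Pin::new(&mut fut); // CountDown is Unpin, so Pin::new works.

    // The runtime's job, reduced to its essence: poll until Ready.
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Pending);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready("done"));
}
```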
## The `Pin` and `Unpin` Traits
When we introduced pinning, we ran into this error message.
Here is the relevant part again:
```
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
```
This error message tells us not only that we need to pin the values but also why pinning is required.
The `trpl::join_all` function returns a struct called `JoinAll`.
The struct is generic over a type `F`, which is constrained to implement the `Future` trait.
Directly awaiting a future with `await` pins the future implicitly.
This is why we don't need to use `pin!` everywhere we want to await futures.
However we are not directly awaiting a future here.
Instead, we construct a new future, `JoinAll`, by passing a collection of futures to the `join_all` function.
The signature for `join_all` requires that the types of the items in the collection all implement the `Future` trait.
`Box<T>` implements `Future` only if the `T` it wraps is a future that implements the `Unpin` trait.
Now we will dive a little deeper into how the `Future` trait actually works, in particular around *pinning*.
Let's look again at the definition of the `Future` trait:
```rust
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
    type Output;

    // Required method
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
```
The `cx` parameter and its `Context` type are the key to how a runtime actually knows when to check any given future while still being lazy.
The details of how that works are beyond the scope of this chapter, and you generally only need to think about this when writing a custom `Future` implementation.
Here we will focus on the type for `self` as this is the first time we have seen a method where `self` has a type annotation.
A type annotation for `self` works like type annotations for other function parameters, but there are two key differences.
- It tells Rust what type `self` must be for the method to be called.
- It can't be just any type.
- It is restricted to the type on which the method is implemented, a reference or smart pointer to the type, or a `Pin` wrapping a reference to that type.
We will see more on this syntax in Ch18.
For now, know that if we want to poll a future to check whether it is `Pending` or `Ready(Output)`, we need a `Pin`-wrapped mutable reference to the type.
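As a sketch of these receiver forms (the `Counter` type here is a made-up example, not from the book):

```rust
use std::pin::Pin;
use std::rc::Rc;

struct Counter {
    n: u32,
}

impl Counter {
    // The usual shorthand: `&self` means `self: &Self`.
    fn get(&self) -> u32 {
        self.n
    }

    // An explicit receiver type: callable only through an Rc.
    fn shared(self: Rc<Self>) -> u32 {
        self.n
    }

    // A Pin-wrapped receiver, the same shape `Future::poll` uses.
    fn pinned(self: Pin<&mut Self>) -> u32 {
        self.n
    }
}

fn main() {
    let c = Counter { n: 7 };
    assert_eq!(c.get(), 7);

    assert_eq!(Rc::new(Counter { n: 7 }).shared(), 7);

    // Counter is Unpin, so Pin::new is enough to build the receiver.
    let mut c = Counter { n: 9 };
    assert_eq!(Pin::new(&mut c).pinned(), 9);
}
```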
`Pin` is a wrapper for pointer-like types such as `&`, `&mut`, `Box`, and `Rc`.
(Technically, `Pin` works with types that implement the `Deref` or `DerefMut` traits, but this is effectively equivalent to working only with pointers.)
`Pin` is not a pointer itself, and it doesn't have any behavior of its own the way `Rc` and `Arc` do with reference counting.
It is purely a tool the compiler can use to enforce constraints on pointer usage.
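For instance, here is a minimal std-only sketch of creating pinned pointers; the particular values are illustrative, and for `Unpin` types like these, `Pin` imposes no extra restrictions:

```rust
use std::pin::{pin, Pin};

fn main() {
    // Box::pin puts a value on the heap behind a pinned pointer.
    let boxed: Pin<Box<u32>> = Box::pin(42);
    assert_eq!(*boxed, 42);

    // The pin! macro pins a value to the current stack frame instead.
    let on_stack: Pin<&mut String> = pin!(String::from("hello"));
    assert_eq!(on_stack.len(), 5);
}
```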
Recalling that `await` is implemented in terms of calls to `poll` starts to explain the error message from before, but that message was in terms of `Unpin`, not `Pin`.
So how does `Pin` relate to `Unpin`, and why does `Future` need `self` to be in a `Pin` type to call `poll`?