finished ch16.2
Some checks failed
Test Gitea Actions / first (push) Successful in 15s
Test Gitea Actions / check-code (push) Failing after 15s
Test Gitea Actions / test (push) Has been skipped
Test Gitea Actions / documentation-check (push) Has been skipped

This commit is contained in:
darkicewolf50 2025-03-12 12:00:31 -06:00
parent 54cc434db5
commit 661e8bf5c7
5 changed files with 337 additions and 4 deletions

View File

@ -44,6 +44,6 @@ This means that Rust offers a variety of tools for modeling problems in whatever
Here are the topics that will be covered in this section: Here are the topics that will be covered in this section:
- How to create threads to run multiple pieces of code at the same time [Section Link Here](./Simultaneous%20Code%20Running.md) - How to create threads to run multiple pieces of code at the same time [Section Link Here](./Simultaneous%20Code%20Running.md)
- *Message-passing* concurrency, where channels send messages between threads - *Message-passing* concurrency, where channels send messages between threads [Section Link Here](./Passing%20Data%20Between%20Threads.md)
- *Shared-state* concurrency, where multiple threads have access to some piece of data - *Shared-state* concurrency, where multiple threads have access to some piece of data [Section Link Here](./Shared%20State%20Concurrency.md)
- The `Sync` and `Send` traits, which extend Rust's concurrency guarantees to use-defined types as well as types provided by the std library - The `Sync` and `Send` traits, which extend Rust's concurrency guarantees to use-defined types as well as types provided by the std library

View File

@ -1 +1,333 @@
# Using Message Passing to Transfer Data Between Threads # Using Message Passing to Transfer Data Between Threads
One popular approach to ensuring safe concurrency is *message passing*, where threads or actors communicate by sending each other message containing data.
The idea in a solgan is best said by the [Go language documentation](https://golang.org/doc/effective_go.html#concurrency):
"Do not communicate by sharing memory; instead, share memory by communicating."
To accomplish message-sending concurrency, Rust's std library provides an implementation of *channels*.
A channel is a general concept in programming by which data is sent from one thread to another.
It can be imagined being like a directional channel of water, such as a stream or a river.
If you put something like a rubber duck into a river, it will travel downstream to the end of the waterway.
A channel has two parts:
- A transmitter
- A receiver
The transmitter is the upstream location where you put rubber ducks into the river.
The receiver is where the rubber duck ends up downstream.
One part of your code calls methods on the transmitter with the data you want to send
Another part checks the receiving end for messages.
A channel is said to be *closed* if either the transmitter or receiver is dropped.
We will work up to a program that has one thread to generate values and send them down a channel and another thread that will receive the values and print them out.
We will send simple values between threads to demostrate this feature using channels.
Once fimiliar with the technique, you could use channels for any threads that need to communicate between each other.
This could be used as a chat system or a system where many threads perform parts of a calculation and send the parts to one thread that aggregates the results.
First we will create a channel and do nothing with it.
This will not compile yet becuase Rust can't tell what type of values we want to send over the channel.
```rust
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
```
We first crate a new channle using the `mpsc::channel` function.
`mpsc` stands for *multiple producer, single consumer*.
This means that the way Rust's std library implements channels means a channel have multiple *sending* ends that produce values but only one *receiving* end tha consumes thoes values.
This can be picured as multiple streams flowing together into one big river. Everything will be received at one point regradless of which stream it starts at.
Here we will start iwth a single producer for now, later we will add multiple producers when we get this example working.
The `mpsc::channel` function returns a tuple, where the first element is the sending end (the transmitter), and the second element is the receiving end (the receiver).
The abbreviatios `tx` and `rx` are normally used in many fields for *transmitter* and *receiver*, so we name our variables as such to indicate each end.
We are using a `let` statement with a pattern that destructures the tuple (this will be discussed in ch19).
For now knwo that using a `let` statement this way is a convenient approach to extract the pieces of the tuple returned by `mpsc::channel`
Now we will move the transmitting end into a spawned thread and have it send one strin so the spawned thread is communicating with the main thread.
This is like putting a rubber duck in the river upstream or send a chat message from one thread to another.
```rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
```
Here we use `thread::spawn` to create a new thread and then using `move` to move `tx` into the closure so the spawned thread owns `tx`.
The spawned thread needs to own the transmitter to be able to send messages through the channel.
The tansmitter has a `send` method that takes the value we want to send.
The `send` method returns a `Result<T, E>` type.
This means that if the receiver has already been dropped and there is nowhere to send a value, the send operation will return an error.
Here in this example we call `unwrap` to panic in the case of an error.
In a real application we should handle it properly, check ch 9 to review strategies for proper error handlin. [Found Here](./Error%20Handling.md)
In this next example we will get the value fom the receuver in the main thread.
This is like grabbing the rubber duck from the water at the end of the river or receiving a chat message.
```rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
```
The receiver has two useful methods: `recv` and `try_recv`.
Here we use `recv` this is short for *receive*, which will block the main thread's execution and wait until a value is sent down a channel.
Once a value is sent, `recv` will return it in a `Result<T, E>`.
When the transmitter closes, `recv` will return an error to signal that no more values will be coming.
The `try_recv` method doesn't block the thread, instead it will return a `Result<T, E>` immediately.
An `Ok` value holding a message if one is available and an `Err` value if there aren't any messages at this time.
Using `try_recv` is useful if this thread has other work to do while waiting for messages.
We could write a loop that calls `try_recv` every so often, handles a mesage if one is available and otherwise does other work for a little while until checking again.
Here we have used `recv` in this example for simplicity, we dont have any other work for the main thread to do other than wait for messages so blocking the main thread is appropriate.
Here is the output value form the main thread
```
Got: hi
```
## Channels and Ownership Transference
The ownership rules play a crutial rule in message sending because they ensure that you write safe, concurrent code.
Preventing errors in concurrent is the advantage of thinking about ownership throughout your Rust programs.
Here we will do an experiemnt to show how channels and ownership work together to prvent problems.
We will try to use a `val` value in the spawned thread *after* we sent it down the channel.
If we try compiling the code below we will see why this isn't allowed
```rust
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
```
Here we attempt to prin `val` after sending it down the channel via `tx.send`
Allowing this would be a bad idea. Once the value has been sent to another thread, that thread could have already modified or dropped it before we try to use the value again.
Or potentially, the other thread's modifications could cause errors or unexpected results due to inconsistent or non existent data.
Rust gives us this compilation error if we try to compile this
```
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
```
Here the `send` function takes ownership of its parameter and when the value is moved, the receiver takes ownership of it.
This stops us form accidentally using the value again after sending it.
Here the ownership ystem checks that everything is ok.
## Sending Multiple Vales and Seeing the Receiver Waiting
Here we will dive into a clear example that will show us that two separate threads that are talking to each other over the channel.
In this example we have made some modifications that will prove that the code is running concurrently.
The spawned thread will now send multiple messages and pause for a second between each message.
```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
```
Now the spawned thread has a vector of strings that we want to send to the main thread.
We then iterate over them, sending each individually, and pause between each by calling the `thread::sleep` function witha a `Duration` value of 1 second.
Now in the main thread we are notcallign the `recv` function explicitly anymore.
Instead we treat `rx` as an iterator.
For each value, received we print it.
When th channel is closed, iteration iwll end.
Here is the output you should see with a 1-second apuse between each line
```
Got: hi
Got: from
Got: the
Got: thread
```
Due to no having any code that pauses or delays in the `for` loop in the main thread, we can tell that the main thread is wating to receive values form te spawned thread.
## Creating Multiple Producers by Cloning the Transmitter
We mentioned that `mpsc` was an acronym for *multiple producer, single consumer*.
Here we will put `mpsc` to use and expand te code example from before to create multiple threads that all send values to the same receiver.
We can do this by clonng the transmitter, shown here
```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
```
Now before we create the first spawned, we call `clone` on the transmitter.
This will give us a new transmitter we can pass to the first spawned thread.
We pass the original transmitter to the second spawned thread.
This now allows us to have two threads, each sending different mesages to one receiver.
Now when this code is run it should look similar to this
```
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
```
This may be in a different oder depending on the system.
This is what makes concurrency difficult but interesting.
If you experiment with `thread::sleep`, giving it various values in the different threads, each run will be even more nondeterministic and create different outputs each time.
Now we can look at a different method of concurrency, other than channels.
[Next Section Found Here](./Shared%20State%20Concurrency.md)

View File

@ -0,0 +1 @@
# Shared-State Concurrency

View File

@ -1 +1 @@
{"rustc_fingerprint":4305361489769004817,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.85.0 (4d91de4e4 2025-02-17)\nbinary: rustc\ncommit-hash: 4d91de4e48198da2e33413efdcd9cd2cc0c46688\ncommit-date: 2025-02-17\nhost: x86_64-unknown-linux-gnu\nrelease: 1.85.0\nLLVM version: 19.1.7\n","stderr":""},"13331785392996375709":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""},"2063776225603076451":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""}},"successes":{}} {"rustc_fingerprint":4305361489769004817,"outputs":{"13331785392996375709":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""},"2063776225603076451":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""},"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.85.0 (4d91de4e4 2025-02-17)\nbinary: rustc\ncommit-hash: 4d91de4e48198da2e33413efdcd9cd2cc0c46688\ncommit-date: 2025-02-17\nhost: x86_64-unknown-linux-gnu\nrelease: 1.85.0\nLLVM version: 19.1.7\n","stderr":""}},"successes":{}}

View File

@ -1 +1 @@
{"rustc_fingerprint":4305361489769004817,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.85.0 (4d91de4e4 2025-02-17)\nbinary: rustc\ncommit-hash: 4d91de4e48198da2e33413efdcd9cd2cc0c46688\ncommit-date: 2025-02-17\nhost: x86_64-unknown-linux-gnu\nrelease: 1.85.0\nLLVM version: 19.1.7\n","stderr":""},"13331785392996375709":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""},"2063776225603076451":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""}},"successes":{}} {"rustc_fingerprint":4305361489769004817,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.85.0 (4d91de4e4 2025-02-17)\nbinary: rustc\ncommit-hash: 4d91de4e48198da2e33413efdcd9cd2cc0c46688\ncommit-date: 2025-02-17\nhost: x86_64-unknown-linux-gnu\nrelease: 1.85.0\nLLVM version: 19.1.7\n","stderr":""},"2063776225603076451":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"unknown\"\nunix\n","stderr":""},"13331785392996375709":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.so\nlib___.so\nlib___.a\nlib___.so\n/home/brock/.rustup/toolchains/stable-x86_64-unknown-linux-gnu\noff\npacked\nunpacked\n___\ndebug_assertions\nfmt_debug=\"full\"\noverflow_checks\npanic=\"unwind\"\nproc_macro\nrelocation_model=\"pic\"\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"gnu\"\ntarget_family=\"unix\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"x87\"\ntarget_has_atomic\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_has_atomic_equal_alignment=\"16\"\ntarget_has_atomic_equal_alignment=\"32\"\ntarget_has_atomic_equal_alignment=\"64\"\ntarget_has_atomic_equal_alignment=\"8\"\ntarget_has_atomic_equal_alignment=\"ptr\"\ntarget_has_atomic_load_store\ntarget_has_atomic_load_store=\"16\"\ntarget_has_atomic_load_store=\"32\"\ntarget_has_atomic_load_store=\"64\"\ntarget_has_atomic_load_store=\"8\"\ntarget_has_atomic_load_store=\"ptr\"\ntarget_os=\"linux\"\ntarget_pointer_width=\"64\"\ntarget_thread_local\ntarget_vendor=\"unknown\"\nub_checks\nunix\n","stderr":""}},"successes":{}}