
Applying Concurrency with Async

This section will mostly focus on the differences between threads and futures.

In many cases, the APIs for working with concurrency using async are very similar to those for using threads.

In other cases, they are very different.

Even in the cases where the APIs look similar between threads and async, they often have different behavior, and they nearly always have different performance characteristics.

Creating a New Task with spawn_task

First we will recreate the counting example that used two threads, but using async instead.

The trpl crate supplies a spawn_task function that looks very similar to the thread::spawn API, and a sleep function that is an async version of the thread::sleep API.

Here we can use these together to implement the counting example.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

As a starting point, we set up the main function with trpl::run so that our top-level function can be async.

Note that from this point forward in this chapter, every example will include this exact same wrapping code with trpl::run in main.

So we will often skip it, just as we do with main.

Don't forget this in your code.

Next we wrote two loops within that block, each containing a trpl::sleep call, which waits for half a second (500 milliseconds) before sending the next message.

Here we put one loop in the body of a trpl::spawn_task and the other in a top-level for loop.

We also add an await after the sleep calls.

This code behaves similarly to the thread-based implementation.

This includes the fact that you may see the messages appear in a different order in your own terminal when you run it.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

This version stops as soon as the for loop in the body of the main async block finishes, because the task spawned by spawn_task is shut down when the main function ends.

If you want it to run all the way to completion, you will need to use a join handle to wait for the first task to complete.

With threads, we used the join method to "block" until the thread was done running.

In this next example we can use await to do the same thing, because the task handle itself is a future.

Here the Output type is a Result, so we also unwrap it after awaiting it.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

This is the updated output

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

It now looks like async and threads give us the same basic outcomes.

The only difference is the syntax: using await instead of calling join on the join handle, and awaiting the sleep calls.

The biggest difference is that we didn't need to spawn another operating system thread to do this.

In fact, we don't even need to spawn a task here.

Because async blocks compile to anonymous futures, we can put each loop in an async block and have the runtime run them both to completion using the trpl::join function.

In the section Waiting for All Threads to Finish Using join Handles, we showed how to use the join method on the JoinHandle type returned when you call std::thread::spawn.

Here the trpl::join function is similar, but for futures.

When you give it two futures, it produces a single new future whose output is a tuple containing the output of each future you passed in once they both complete.

In this next example we use trpl::join to wait for both fut1 and fut2 to finish.

Here we do not await fut1 and fut2 but instead the new future produced by trpl::join.

Here we ignore the output, because it is just a tuple containing two unit values.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

Here we get this output, where we see both futures run to completion:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

This version produces the exact same order every time, which is very different from what we saw with threads.

This is because the trpl::join function is fair.

This means that it checks each future equally often, alternating between them, and never lets one race ahead if the other is ready.

With threads, the operating system decides which thread to check and how long to let it run.

With async Rust, the runtime decides which task to check.

An async runtime might use OS threads under the hood as part of how it manages concurrency.

Runtimes don't have to guarantee fairness for any given operation, and they often offer different APIs to let you choose whether or not you want fairness.

We will try some of these variations on awaiting the futures and see what they do:

  • Remove the async block from around either or both of the loops
  • Await each async block immediately after defining it
  • Wrap only the first loop in an async block and await the resulting future after the body of the second loop

Counting Up on Two Tasks Using Message Passing

Sharing data between futures will also be familiar.

We will use message passing again, but this time we will use async versions of the types and functions.

We will take a slightly different path than we did in the Using Message Passing to Transfer Data Between Threads to illustrate some of the differences between thread-based and futures-based concurrency.

Here we will begin with just a single async block, not spawning a separate task as we spawned a separate thread.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}

We use trpl::channel, an async version of the multiple-producer, single-consumer channel API that we used before.

The async version of the API is a little different.

It uses a mutable rather than an immutable receiver rx, and its recv method produces a future we need to await rather than producing the value directly.

Notice that we don't need a separate thread or even a task; we merely need to await the rx.recv call.

The synchronous Receiver::recv method in std::sync::mpsc blocks until it receives a message.

The trpl::Receiver::recv method does not, because it is async.

Instead of blocking, it hands control back to the runtime until either a message is received or the send side of the channel closes.

By contrast, we don't await the send call, because it doesn't block.

It doesn't need to, because the channel we are sending it into is unbounded.

Note that because all of this async code runs in an async block in a trpl::run call, everything within it can avoid blocking.

The code outside it will block on the run function returning.

The whole point of the trpl::run function is that it lets you choose where to block on some set of async code.

This is where we transition between sync and async code.

In most async runtimes, run is actually named block_on for this reason.

Notice two things about this example.

First, the message will arrive right away.

Second, although we use a future here, there's no concurrency yet.

Everything in the listing happens in sequence, just as it would if there were no futures involved.

In this next example we will address the first issue by sending a series of messages and sleeping in between them.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Now in addition to sending messages, we need to receive them.

In this case we know how many messages are coming in, so we could receive them manually by calling rx.recv().await four times.

In the real world, we will generally be waiting on some unknown number of messages, so we need to keep waiting until we determine that there are no more messages.

Before, we used a for loop to process all the items received from a synchronous channel.

Rust doesn't yet have a way to write a for loop over an asynchronous series of items.

Instead we need to use a loop we haven't seen before: the while let conditional loop.

This is the loop version of the if let construct we saw back in the Concise Control Flow with if let and let else.

This loop will continue executing as long as the pattern specified continues to match the value.

The rx.recv call produces a future, which we then await.

The runtime will pause the future until it is ready.

Once a message arrives, the future will resolve to Some(message); this happens as many times as messages arrive.

When the channel closes, regardless of whether any messages have arrived, the future will instead resolve to None.

This indicates that there are no more values and thus we should stop polling, that is, stop awaiting.

The while let loop pulls all of this together.

If the result of calling rx.recv().await is Some(message), we get access to the message and we can then use it in the loop body, just as we could with if let.

If the result is None, the loop ends.

Every time the loop completes, it hits the await point again, so the runtime pauses again until another message arrives.

Now the code successfully sends and receives.

This still has some problems.

For one thing, the messages do not arrive at half-second intervals.

They arrive all at once, 2 seconds (2,000 milliseconds) after we start the program.

Another problem is that this program never exits; instead, it waits forever for new messages.

You need to manually shut down the runtime using ctrl-c.

Let's see why the messages come in all at once after the full delay, rather than coming in with delays between each one.

Within a given async block, the order in which await keywords appear in the code is also the order in which they are executed when the program runs.

In the previous example everything in the async block runs linearly.

There is still no concurrency.

All the tx.send calls happen, interspersed with all of the trpl::sleep calls and their associated await points.

Only then does the while let loop get to go through any of the await points on the recv calls.

To get the intended behavior, where the sleep delay happens between each message, we need to put the tx and rx operations in their own async blocks.

Then the runtime can execute each of them separately using trpl::join, just as in the counting example.

We will once again await the result of calling trpl::join, not the individual futures.

If we awaited the individual futures in sequence, we would just end up back in a sequential flow, exactly what we are trying to avoid.

Here is an example of this

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Here we updated the code so that the messages get printed at 500-millisecond intervals rather than all in a rush after 2 seconds.

The program still never exits, because of the way the while let loop interacts with trpl::join:

  • The future returned from trpl::join completes only once both futures passed to it have completed
  • The tx future completes once it finishes sleeping after sending the last message in vals
  • The rx future won't complete until the while let loop ends
  • The while let loop won't end until awaiting rx.recv produces None
  • Awaiting rx.recv will return None only once the other end of the channel is closed
  • The channel will close only if we call rx.close or when the sender side, tx, is dropped
  • We don't call rx.close anywhere, and tx will not be dropped until the outermost async block passed to trpl::run ends
  • The block can't end because it is blocked on trpl::join completing, which takes us back to the top of this list

We could manually close rx by calling rx.close somewhere, but that doesn't make much sense.

Stopping after handling some arbitrary number of messages would make the program shut down, but we could miss messages.

We need a different way to ensure that tx gets dropped before the end of the function.

The async block where we send the messages only borrows tx, but if we could move tx into that async block, it would be dropped once that block ends.

The same basic dynamics that apply to closures apply to async blocks, so the move keyword works with async blocks just as it does with closures.

In this next example we change the block used to send messages from async to async move.

Now when we run this version, it shuts down gracefully after the last message is sent and received.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

This async channel is also a multiple-producer channel, so we can call clone on tx if we want to send messages from multiple futures.

This is shown below.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

Now we clone tx, creating tx1 outside the first async block.

Next we move tx1 into that block just as we did before with tx.

Then we move the original tx into a new async block, where we send more messages on a slightly slower delay.

We happen to put this new async block after the async block for receiving messages, but it could go before it just as well.

The key is the order in which the futures are awaited, not the order in which they are created.

Both of the async blocks for sending messages need to be async move blocks so that both tx and tx1 get dropped when those blocks finish.

If not, we would end up back in the same infinite loop we started out in.

Finally, we switch from trpl::join to trpl::join3 to handle the additional future.

In this output we can see all the messages from both sending futures, and because the sending futures use slightly different delays after sending, the messages are also received at those different intervals.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

This is a start, but it limits us to just a handful of futures: two with join, or three with join3.

Next we will look into working with more futures.

Next Section Link Here