How to Use Async Channels in Rust

Use `tokio::sync::mpsc` for unbounded or bounded async message passing between tasks, where the sender and receiver are `Send` and can be awaited directly.

Use tokio::sync::mpsc for unbounded or bounded async message passing between tasks, where the sender and receiver are Send and can be awaited directly. Unlike standard channels, these integrate with the async runtime, allowing you to await send and receive operations without blocking the thread.

Here is a practical example using a bounded channel to prevent memory exhaustion and handle backpressure:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(10); // Bounded channel with capacity 10

    // Spawn a task to receive messages
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("Received: {}", msg);
        }
    });

    // Send messages from the main task
    for i in 0..15 {
        let msg = format!("Message {}", i);
        
        // This will wait if the channel is full (backpressure)
        if tx.send(msg).await.is_err() {
            eprintln!("Receiver dropped");
            break;
        }
    }

    // Give the receiver time to process remaining messages
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

If you need to send messages to multiple receivers (broadcasting), use tokio::sync::broadcast instead. Note that broadcast drops messages if a receiver falls behind, whereas mpsc blocks the sender.

For cloning senders to multiple producers, simply clone the tx handle:

let (tx, mut rx) = mpsc::channel::<i32>(100);
let tx2 = tx.clone();

// Both tx and tx2 can send to the same receiver
tokio::spawn(async move {
    tx.send(1).await.unwrap();
});

tokio::spawn(async move {
    tx2.send(2).await.unwrap();
});

// Receiver gets both messages
println!("{}", rx.recv().await.unwrap()); 
println!("{}", rx.recv().await.unwrap());

Key considerations:

  1. Backpressure: Always use a bounded channel (mpsc::channel::<T>(capacity)) in production to prevent unbounded memory growth. The send method will await until space is available.
  2. Receiver Drop: If the receiver is dropped, send returns an Err. If the sender is dropped, recv returns None.
  3. Unbounded: Use mpsc::unbounded_channel only if you are certain the producer won't outpace the consumer indefinitely, as it can lead to OOM errors.