How to Use Async Channels in Rust

Use `tokio::sync::mpsc` for bounded or unbounded async message passing between tasks. Both ends can be moved into spawned tasks as long as the message type is `Send`, and a full or empty channel suspends the waiting task instead of blocking the thread.

The async pipe that doesn't block

You're building a log aggregator. One task reads logs from disk. Another task ships them to a remote server. You could share a Vec between the tasks, but that requires locks and risks deadlocks. You need a way to pass data safely. In async Rust, the tool is the async channel.

Standard channels block the thread when the buffer is full or empty. In an async runtime, blocking a thread is expensive. It wastes a worker thread that could be running other tasks. Async channels solve this by integrating with the runtime. When a sender or receiver needs to wait, it yields control back to the runtime. The thread stays alive and runs other tasks. The waiting task resumes only when the condition changes.

How async channels work

Think of tokio::sync::mpsc as a smart conveyor belt in a factory. The producer puts items on the belt. The consumer takes them off. If the belt is full, the producer doesn't stand there spinning its wheels. It steps aside and lets other workers do their jobs. When the consumer removes an item, the belt signals the producer to step back in and load the next item.

The channel has two ends: a sender (tx) and a receiver (rx). The sender implements Clone, so you can hand copies to multiple tasks. The receiver is usually unique, though you can wrap it in shared state if needed. The channel holds a buffer of items. When you create the channel, you set the buffer size. This size controls backpressure.

Backpressure is the mechanism that stops a fast producer from overwhelming a slow consumer. If the buffer is full, send pauses the sender task. The sender waits until the receiver takes an item and frees space. This keeps memory usage bounded. If you use an unbounded channel, the producer can fill memory until the system runs out. Bounded channels are the safe default.

Minimal example

Here's a bounded channel in action. The sender produces messages. The receiver prints them. The channel capacity is 5.

use tokio::sync::mpsc;

/// Demonstrates a basic bounded async channel.
/// The sender pauses if the buffer fills up.
#[tokio::main]
async fn main() {
    // Create a bounded channel with capacity 5.
    // The type parameter <String> defines the message type.
    let (tx, mut rx) = mpsc::channel::<String>(5);

    // Spawn a task to receive messages.
    // We move rx into the task so it owns the receiver.
    // Keep the JoinHandle so main can wait for the receiver to finish.
    let receiver = tokio::spawn(async move {
        // Loop until the channel closes and the buffer is empty.
        // recv() returns None when all senders are dropped.
        while let Some(msg) = rx.recv().await {
            println!("Received: {}", msg);
        }
    });

    // Send messages from the main task.
    for i in 0..10 {
        let msg = format!("Message {}", i);

        // send() returns a Result.
        // It errors if the receiver is dropped, so we stop sending in that case.
        if tx.send(msg).await.is_err() {
            eprintln!("Receiver dropped, stopping sender");
            break;
        }
    }

    // Drop the sender so the channel closes and recv() returns None.
    drop(tx);

    // Wait for the receiver task to drain the buffer.
    // When main returns, the runtime shuts down and cancels any spawned
    // tasks that are still running, so without this await some messages
    // might never be printed.
    receiver.await.expect("receiver task panicked");
}

The channel handles synchronization automatically. You don't write locks. You don't manage condition variables. The runtime coordinates the tasks.

Convention aside: Cloning the sender is cheap. It just increments a reference count. The community convention is to clone tx for every task that needs to send, rather than sharing tx through an Arc. The sender is designed to be cloned freely.

Walking through the mechanics

When you call tx.send(msg).await, three things can happen.

First, if the buffer has space, the message goes in and send returns Ok. The task continues immediately.

Second, if the buffer is full, send registers the task's waker with the channel and suspends the task. The thread is free to run other tasks. When the receiver calls recv and removes an item, the channel wakes the suspended sender. The sender resumes, writes the message, and returns Ok.

Third, if the receiver has been dropped or closed, send returns Err. The error hands the unsent message back to the caller. This tells the sender that no one is listening. The sender should stop sending.

The receiver side is similar. rx.recv().await returns Option<T>. If there's a message, it returns Some(msg). If the buffer is empty, the receiver suspends and waits for a sender. If all senders are dropped and the buffer is empty, recv returns None. This signals that the channel is closed.

To move either end of the channel into a task started with tokio::spawn, the message type must implement Send. The compiler enforces this. Tasks can move between threads, so the data must be safe to transfer. If you try to send a Rc<String>, the compiler rejects it with E0277 (trait bound not satisfied). Rc uses reference counting that isn't thread-safe. You'd need Arc<String> instead. This check prevents data races at compile time.

Trust the Send bound. If the compiler rejects your type, it's saving you from a data race.

Realistic example: Multiple producers

In real code, you often have multiple tasks sending to a single receiver. You clone the sender for each task. You also need to handle the case where the receiver drops while senders are running.

use tokio::sync::mpsc;

/// Worker task that sends messages to a shared channel.
/// Stops gracefully if the receiver is dropped.
async fn worker(id: u32, tx: mpsc::Sender<String>) {
    for i in 0..5 {
        let msg = format!("Worker {} says {}", id, i);
        
        // Check the result of send.
        // If the receiver is gone, send returns Err.
        if tx.send(msg).await.is_err() {
            eprintln!("Worker {} stopped: receiver gone", id);
            return;
        }
        
        // Simulate work.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
}

/// Main function demonstrating multiple producers.
#[tokio::main]
async fn main() {
    // Bounded channel with capacity 10.
    let (tx, mut rx) = mpsc::channel::<String>(10);

    // Spawn three workers.
    for id in 0..3 {
        // Clone tx for each task.
        // Cloning is cheap and safe.
        let tx_clone = tx.clone();
        tokio::spawn(worker(id, tx_clone));
    }

    // Drop the original sender.
    // This ensures the channel closes when all workers finish.
    // If we kept tx, the channel would stay open forever.
    drop(tx);

    // Collect all messages.
    while let Some(msg) = rx.recv().await {
        println!("Main: {}", msg);
    }
}

The drop(tx) line is critical. The channel stays open as long as at least one sender exists. If you keep the original sender in the main task, the channel never closes, even after all workers exit. The receiver would hang forever waiting for a message. Dropping the original sender is the standard pattern when you want the channel to close after spawned tasks finish.

Drop the original sender. If you don't, your receiver waits for a message that never comes.

Pitfalls and compiler errors

Async channels are safe, but they have traps.

Forgetting to drop the sender. This is the most common bug. The receiver loop runs forever because the channel never closes. Always drop the original sender if you only want spawned tasks to keep the channel alive.

Using unbounded channels. mpsc::unbounded_channel creates a channel with no limit. If the producer is faster than the consumer, memory grows until the system crashes. Use unbounded channels only when you're certain the producer won't outpace the consumer. In production, bounded channels are safer.

Ignoring send errors. If the receiver drops, send returns Err. If you ignore this error, the sender keeps trying to send to a dead channel. Check the result and stop sending on error.

Blocking inside async tasks. If you call a blocking function inside an async task, you block the runtime worker thread, and every other task scheduled on that thread stalls with it, including the ones waiting on your channel. Use tokio::task::spawn_blocking for blocking work.

Sending non-Send types. The compiler catches this with E0277. If you need to share data across tasks, use Arc instead of Rc. Use Mutex instead of RefCell if you need interior mutability across threads.

Convention aside: The community convention for the receiver loop is while let Some(msg) = rx.recv().await. This pattern is idiomatic and handles the None case cleanly. Avoid loop { match rx.recv().await { ... } } unless you need complex control flow.

Decision: which channel to use

Rust's async ecosystem offers several channel types. Pick the one that matches your flow.

Use tokio::sync::mpsc when you have one receiver and one or more senders, and you need reliable delivery with backpressure. This is the default choice for producer-consumer patterns.

Use tokio::sync::broadcast when every receiver needs a copy of every message, and you can tolerate dropping messages if a receiver falls too far behind. Use this for event distribution where receivers might lag.

Use tokio::sync::watch when you only care about the latest value and receivers should update immediately when the sender changes, without a queue. Use this for configuration updates or state broadcasting.

Use tokio::sync::oneshot when you need a single message to signal completion or pass a result exactly once between two tasks. Use this for task results or cancellation signals.

An mpsc channel delivers each message to exactly one consumer. If you force a broadcast pattern onto it, some consumers never see some messages.

Where to go next