Use tokio::sync::mpsc for bounded or unbounded async message passing between tasks. Both Sender and Receiver are Send whenever the message type T is Send, so they can be moved into spawned tasks. Unlike std::sync::mpsc channels, whose recv blocks the calling thread, these integrate with the async runtime: you await recv (and send on a bounded channel), and a waiting task yields to the runtime instead of blocking the thread.
Here is a practical example using a bounded channel to prevent memory exhaustion and handle backpressure:
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel with capacity 10
    let (tx, mut rx) = mpsc::channel::<String>(10);

    // Spawn a task to receive messages; keep the handle so we can wait for it
    let receiver = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            println!("Received: {}", msg);
        }
    });

    // Send messages from the main task
    for i in 0..15 {
        let msg = format!("Message {}", i);
        // This will wait if the channel is full (backpressure)
        if tx.send(msg).await.is_err() {
            eprintln!("Receiver dropped");
            break;
        }
    }

    // Drop the sender to close the channel, then wait for the receiver
    // to drain the remaining messages instead of sleeping for a fixed time
    drop(tx);
    receiver.await.expect("receiver task panicked");
}
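If every receiver needs to see every message (broadcasting), use tokio::sync::broadcast instead; an mpsc channel delivers each message to its single receiver only. The two also handle a slow consumer differently: a broadcast channel overwrites its oldest buffered messages, and the lagging receiver's next recv returns RecvError::Lagged with the number of messages it missed, whereas a full bounded mpsc channel makes send wait (without blocking the thread) until the receiver frees a slot.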
To send from multiple producers, clone the tx handle:
// Inside an async fn running on the Tokio runtime
let (tx, mut rx) = mpsc::channel::<i32>(100);
let tx2 = tx.clone();

// Both tx and tx2 can send to the same receiver
tokio::spawn(async move {
    tx.send(1).await.unwrap();
});
tokio::spawn(async move {
    tx2.send(2).await.unwrap();
});

// The receiver gets both messages; their order depends on task scheduling
println!("{}", rx.recv().await.unwrap());
println!("{}", rx.recv().await.unwrap());
Key considerations:
- Backpressure: Always use a bounded channel (mpsc::channel::<T>(capacity)) in production to prevent unbounded memory growth. The send method will await until space is available.
- Receiver drop: If the receiver is dropped, send returns an Err that hands the unsent message back. Once every sender (including clones) is dropped and the buffer is drained, recv returns None.
- Unbounded: Use mpsc::unbounded_channel only if you are certain the producer won't outpace the consumer indefinitely. Its send never waits, so a slow consumer lets the queue grow until the process runs out of memory.