How to Build a Pub/Sub System in Rust

Build a Rust Pub/Sub system using tokio channels to decouple message producers from multiple consumers.

When one sender needs many listeners

You are building a real-time dashboard. A sensor stream feeds temperature data. Three widgets need updates: a gauge, a log, and an alert system. The sensor doesn't know about the widgets. The widgets shouldn't block the sensor. If the alert system is slow, the gauge shouldn't stall. You need a pub/sub system.

In Python or JavaScript, you might reach for a global event emitter or a callback registry. In Rust, you reach for channels. The standard library and tokio both provide channel primitives, and for one-to-many communication tokio::sync::broadcast models this pattern directly. It decouples the publisher from subscribers, bounds memory with a fixed-size buffer (slow subscribers lose old messages instead of slowing the publisher), and keeps everything type-safe.

The broadcast channel

Pub/sub is about decoupling time and space. The publisher sends a message without knowing who receives it. Subscribers pull messages when they are ready. Rust implements this with broadcast::channel.

Think of a radio station. The station transmits a signal continuously. Radios tune in. The station doesn't track which radios are listening. Radios catch what they can. If a radio is turned off, it misses the broadcast. When it turns on, it hears the current signal. broadcast works the same way. It maintains a ring buffer of recent messages. Subscribers pull from that buffer. If a subscriber falls too far behind, the oldest messages it has not read are overwritten, and it misses them.

The channel has a fixed capacity. This capacity defines how far a slow subscriber can fall behind before it starts missing messages. It also bounds memory usage: the channel never grows without bound.

use tokio::sync::broadcast;

/// Creates a broadcast channel of Strings with a buffer size of 16.
fn create_channel() -> (broadcast::Sender<String>, broadcast::Receiver<String>) {
    // Buffer size limits memory if subscribers lag.
    // channel() panics if the capacity is zero.
    // Tokio rounds the capacity up to a power of two internally, so 16 is used as-is.
    broadcast::channel::<String>(16)
}

Minimal example

A broadcast channel returns a Sender and a Receiver. The Sender can be cloned and moved across tasks. The Receiver cannot be cloned. To add more subscribers, call subscribe() on the Sender. Each call returns a new Receiver on the same channel that sees only messages sent after the call.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Create channel. Buffer holds 16 messages.
    let (tx, mut rx1) = broadcast::channel::<String>(16);
    
    // Create a second subscriber.
    // subscribe() returns a Receiver that sees only messages sent after this call.
    let mut rx2 = tx.subscribe();

    // Spawn a task to publish.
    // tx is moved into the task.
    tokio::spawn(async move {
        // send() returns Result.
        // Err occurs only if there are no active receivers; a full buffer is not an error.
        // The channel carries String, so convert the literal.
        tx.send("Market open".to_string()).unwrap();
    });

    // Receive on both subscribers.
    // recv() awaits a message.
    println!("Widget 1: {}", rx1.recv().await.unwrap());
    println!("Widget 2: {}", rx2.recv().await.unwrap());
}

The publisher sends once. Both subscribers receive their own copy, so rx1 and rx2 each print "Market open". When more messages are sent, each subscriber sees them in the order they were sent.

How the ring buffer works

broadcast uses a ring buffer internally. The buffer holds a fixed number of messages. Each message has a sequence number. The Sender advances a global sequence number on every send. Each Receiver tracks its own sequence number.

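When recv is called, the receiver compares its sequence number with the sender's. If the receiver has already read everything the sender has written, there is nothing new, and recv waits. If the receiver is behind by no more than the buffer size, its next message is still in the buffer: recv returns it and advances the receiver by one. If the gap exceeds the buffer size, the receiver is "lagging": the oldest messages it has not read have been overwritten. recv returns a Lagged error carrying the number of skipped messages and moves the receiver forward to the oldest message still held, so the next call resumes from there.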

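This design means broadcast is non-blocking for the sender. send never waits for subscribers: it writes to the buffer and returns. A full buffer is not an error. send overwrites the oldest message, and any subscriber that had not yet read it will see Lagged on its next recv. send returns Err only when there are no active receivers, and the error hands the unsent value back. On success it returns the number of receivers subscribed at that moment.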

Subscribers must keep up. If a subscriber processes messages slowly, it will eventually lag. The Lagged error is your signal. The overwritten messages cannot be recovered, so the choice is between skipping the gap and continuing from the oldest buffered message, or treating the gap as an error. Ignoring Lagged leads to silent data loss.

Realistic pub/sub loop

Real code involves loops, error handling, and multiple tasks. Publishers often run in a loop, sending periodic updates. Subscribers run in loops, processing messages until the channel closes.

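use tokio::sync::broadcast::{self, error::RecvError};
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // Buffer size 4. A small buffer exposes lag quickly.
    let (tx, mut rx1) = broadcast::channel::<String>(4);
    let mut rx2 = tx.subscribe();

    // Publisher loop.
    let publisher = tokio::spawn(async move {
        for i in 0..20 {
            let msg = format!("Tick {}", i);

            // send() returns Err only if every receiver has been dropped.
            // A full buffer is not an error: the oldest message is overwritten.
            if tx.send(msg).is_err() {
                eprintln!("No active subscribers");
                break;
            }

            // Simulate periodic updates.
            sleep(Duration::from_millis(100)).await;
        }
        // tx drops here. Receivers see Closed once they drain the buffer.
    });

    // Subscriber 1: Fast processing.
    let fast = tokio::spawn(async move {
        // while let stops at the first Err of any kind, Lagged included.
        // That is acceptable here because this subscriber is not expected to fall behind.
        while let Ok(msg) = rx1.recv().await {
            println!("Fast subscriber: {}", msg);
        }
        println!("Fast subscriber: channel closed");
    });

    // Subscriber 2: Slow processing. It will fall behind, so handle Lagged explicitly.
    let slow = tokio::spawn(async move {
        loop {
            match rx2.recv().await {
                Ok(msg) => {
                    // Simulate slow work.
                    sleep(Duration::from_millis(200)).await;
                    println!("Slow subscriber: {}", msg);
                }
                Err(RecvError::Lagged(n)) => {
                    // The next recv() returns the oldest message still buffered.
                    eprintln!("Slow subscriber: skipped {} messages", n);
                }
                Err(RecvError::Closed) => break,
            }
        }
        println!("Slow subscriber: channel closed");
    });

    // Wait for every task to finish instead of sleeping for a guessed duration.
    let _ = tokio::join!(publisher, fast, slow);
}

The publisher sends every 100ms. Subscriber 1 processes instantly. Subscriber 2 takes 200ms per message, so its backlog grows by about one message every 200ms. Once the backlog exceeds the four-slot buffer, unread messages are overwritten and Subscriber 2's next recv returns Err(RecvError::Lagged(n)). The match arm logs the gap and keeps going. A while let Ok(msg) loop would not do this: it ends on the first Err, so a lagging subscriber written that way stops as if the channel had closed. Reserve while let Ok for subscribers where stopping at the first gap is acceptable, and match on RecvError everywhere else.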

Pitfalls and error handling

Broadcast channels have specific failure modes. Understanding them prevents subtle bugs.

The Lagged error. This is the most common issue. If a subscriber falls more than the buffer's capacity behind the publisher, recv returns Err(RecvError::Lagged(n)), where n (a u64) is the number of messages it skipped. The following recv returns the oldest message still in the buffer. You must handle this. If you unwrap recv, your task panics on lag. Use pattern matching, and decide whether the subscriber can skip the gap and continue or should treat it as an error.

The Closed error. When all Sender clones drop, the channel closes. Messages already in the buffer are still delivered; after the last one, recv returns Err(RecvError::Closed). This signals the end of the stream, and loops should break on it. The while let Ok(msg) pattern stops the loop here as intended, but it also stops on Lagged, so it cannot tell the two cases apart.

The Clone tax. broadcast requires messages to be Clone. The channel stores each message once, and every recv clones it out of the buffer. If you send a large Vec or struct, every subscriber pays the clone cost. If cloning is expensive, wrap the message in Arc<T>. Cloning an Arc is cheap: it increments a reference count, and the data stays in one heap allocation.

use std::sync::Arc;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Wrap large data in Arc so subscribers clone a pointer, not the data.
    let (tx, mut rx) = broadcast::channel::<Arc<Vec<u8>>>(16);
    
    let data = Arc::new(vec![0u8; 1024 * 1024]); // 1MB
    
    // Cloning Arc is cheap.
    tx.send(data.clone()).unwrap();
    
    // Receiver gets Arc. No data copy.
    if let Ok(payload) = rx.recv().await {
        println!("Received {} bytes", payload.len());
    }
}

Send bounds. Messages must be Send to cross task boundaries. Sender<T> and Receiver<T> are Send only when T is Send, so moving one into tokio::spawn with a !Send message type is rejected with E0277 (trait bound not satisfied). This protects you from data races. You cannot move an Rc or a raw pointer into another task this way. Use Arc instead.

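Buffer size choice. The buffer size is a trade-off. A small buffer detects lag quickly but drops messages sooner. A large buffer hides latency bugs and uses more memory. Start small, and increase it only if profiling shows unnecessary drops. Powers of two such as 16 or 32 are the usual choice: Tokio's documentation notes that the actual capacity may be greater than requested, and current releases round up to the next power of two, so asking for 16 gives exactly 16 slots while asking for 20 gives 32.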

Don't ignore the Lagged error. broadcast never slows the publisher down, so Lagged is the only signal that a subscriber is losing data.

Choosing the right channel

Rust offers several channel types. Pick based on your delivery requirements and topology.

Use broadcast when you have multiple subscribers and can tolerate dropped messages if subscribers lag. Use broadcast for event streams where missing a few updates is acceptable, like UI ticks or sensor readings. Use broadcast when you want a simple fire-and-forget model without managing subscriber lists manually.

Use mpsc when you need guaranteed delivery to a single consumer. Use mpsc when you want to avoid cloning overhead, as mpsc moves values instead of cloning. Use mpsc for work queues where every task must be processed exactly once. If you have multiple workers, use one mpsc receiver and distribute tasks, or use a work-stealing pool.

Use watch when subscribers only care about the latest value and don't need a history buffer. Use watch for configuration updates or state flags where only the current state matters. watch is lighter weight than broadcast because it stores a single value, and it does not require Clone on the value type: borrow() returns a read guard you can inspect in place, and Clone is needed only if you want an owned copy of the value.

Use oneshot when you need a single message from one sender to one receiver. Use oneshot for signaling completion or returning a result from a spawned task.

Broadcast is for fire-and-forget. If you need delivery guarantees, rethink your architecture.

Where to go next