How to Implement Fan-Out/Fan-In Concurrency in Rust

Use threads to split work and channels to gather results for fan-out/fan-in concurrency in Rust.

The manager and the workers

You have a list of 500 images to resize. Processing them one by one takes minutes. You want to blast them to every core on your machine, let the cores chew through the work, and then collect the resized images back into a single list. This pattern is called fan-out and fan-in. Fan-out scatters work to multiple threads. Fan-in gathers the results back together.

Rust makes this pattern safe and explicit. The tools you need are threads and channels. You spawn a thread for each task. Each thread sends its result through a channel. The main thread reads from the channel until all results arrive. The compiler guarantees that no two threads write to the channel at the same time in a way that corrupts memory. You get parallelism without data races.

How the pattern maps to Rust

Think of a restaurant kitchen. The head chef holds a stack of orders. The chef hands an order to each line cook. That is fan-out. The cooks prepare the food and place it in the pickup window. The head chef collects the finished dishes from the window. That is fan-in.

In Rust, the pickup window is a channel. Specifically, mpsc::channel. The name stands for Multiple Producer, Single Consumer. Multiple threads can send messages. One thread receives them. The channel handles the synchronization. You do not need locks to protect the channel itself. The channel is thread-safe by design.

The sender implements Clone. This is the mechanism that enables fan-out. You clone the sender and hand a copy to each thread. The receiver stays in the main thread. The channel tracks how many senders exist. When the last sender is dropped, the channel closes. The receiver knows no more messages are coming. This automatic closing is what makes fan-in clean. You do not need to count results manually. You just read until the channel closes.

Minimal example

This code spawns four threads. Each thread doubles a number and sends the result. The main thread collects all results.

use std::sync::mpsc;
use std::thread;

fn main() {
    // Create a channel. tx is the sender, rx is the receiver.
    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    // Fan-out: spawn a thread for each task
    for i in 0..4 {
        // Clone the sender so each thread gets its own handle.
        // The channel tracks the number of clones internally.
        let tx = tx.clone();
        
        handles.push(thread::spawn(move || {
            // Simulate work
            let result = i * 2;
            
            // Send the result. unwrap() panics if the receiver is gone.
            tx.send(result).unwrap();
        }));
    }

    // Drop the original sender. This tells the channel that no more
    // senders will be created. The receiver can stop waiting once
    // all cloned senders are also dropped.
    drop(tx);

    // Fan-in: collect results until the channel closes
    let results: Vec<i32> = rx.iter().collect();
    
    // Wait for threads to finish. This ensures threads do not outlive
    // the receiver, which would cause a panic if a thread tries to send.
    for h in handles {
        h.join().unwrap();
    }

    println!("{:?}", results);
}

Drop the original sender. If you don't, your receiver waits forever.

Walking through the mechanics

mpsc::channel() returns a tuple containing a Sender and a Receiver. The sender moves data into the channel. The receiver pulls data out. The channel is unbounded by default. It grows as needed. If you send faster than you receive, memory usage climbs. For fan-out/fan-in, this is usually fine because the total work is finite.

Inside the loop, tx.clone() creates a new handle. Cloning does not copy the data inside the channel. It copies the reference to the channel buffer and bumps an internal counter. Each thread receives its own Sender. The move keyword on the closure forces the closure to take ownership of tx. This satisfies the compiler. The closure must own everything it captures because it runs on a different thread.

When a thread finishes, its Sender is dropped. The channel decrements its internal counter. When the counter hits zero, the channel closes. The receiver detects this state. rx.iter() returns an iterator that calls recv() internally. recv() blocks until a message arrives or the channel closes. When the channel closes, recv() returns an error. The iterator stops. collect() gathers all messages into a vector.

drop(tx) is the trigger. After the loop, the original tx is still alive. The channel thinks a sender is still active. You must drop it manually. If you skip this step, rx.iter() hangs indefinitely. The program appears to freeze. This is the most common bug in fan-out/fan-in. Always drop the original sender after spawning.

join() waits for threads to finish. This is a safety measure. If a thread outlives the receiver, and the thread tries to send, the send() call panics because the receiver is gone. Joining ensures threads complete before the receiver is dropped. It also propagates panics. If a worker panics, join() returns an error. You can handle it instead of crashing the whole program.

Realistic example

Real code processes a batch of items. The function takes a vector, fans out work, fans in results, and returns a vector. Order is not guaranteed. The channel delivers results as they arrive. Fast tasks finish first. If order matters, sort the results or attach an index.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Process a batch of items concurrently using fan-out/fan-in.
/// Returns results in sorted order to restore input sequence.
fn process_batch(items: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    for item in items {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            // Simulate heavy work with variable duration
            thread::sleep(Duration::from_millis(50));
            tx.send(item * 2).unwrap();
        }));
    }

    drop(tx);

    // Collect results. Order depends on which thread finishes first.
    let mut results: Vec<u64> = rx.iter().collect();
    
    // Restore order if the caller expects it.
    results.sort();

    for h in handles {
        h.join().expect("Worker thread panicked");
    }

    results
}

fn main() {
    let data = vec![10, 2, 5, 8];
    let output = process_batch(data);
    println!("{:?}", output);
}

Sort the results if order matters. The channel delivers speed, not sequence.

Pitfalls and compiler errors

Forgetting drop(tx) causes a deadlock. The program hangs. No error message. Just silence. Check your drop. This happens when you copy-paste the loop and miss the cleanup step.

Order confusion is a logical bug. Results come back out of order. Do not assume result index i corresponds to input index i. If you need to correlate results with inputs, send a tuple containing the result and the original index. Sort by index after collection.

Panics in worker threads close the channel. If a thread panics, its sender is dropped. The channel counter decreases. If that was the last sender, the channel closes. rx.iter() stops. You might lose results from other threads that haven't sent yet. Always check the join() result. A panicked thread leaves a hole in your fan-in.

The compiler catches capture mistakes. If you forget tx.clone() inside the loop, the first iteration moves tx into the closure. The second iteration tries to use tx again. The compiler rejects this with E0382 (use of moved value). The fix is to clone before the spawn.

If you try to move data out of a borrowed context, the compiler rejects you with E0507 (cannot move out of borrowed content). This happens when you iterate over a slice and try to move elements into closures. Use into_iter() to consume the vector, or clone the items. The move closure requires ownership.

Check the join result. A panicked thread leaves a hole in your fan-in.

Convention asides

The community prefers drop(tx) over manual counting. Some developers track the number of clones and use rx.recv() in a loop with a counter. This works, but it is error-prone. drop(tx) plus rx.iter() is idiomatic. It leverages the channel's built-in closing mechanism.

Convention dictates keeping unsafe out of fan-out/fan-in. The standard library channels are safe. You do not need raw pointers or manual memory management. If you find yourself reaching for unsafe to speed up the channel, you are likely fighting the wrong problem. Profile first. The channel overhead is usually negligible compared to the work being done.

Use handle.join().expect("msg") in production code. unwrap() is fine for scripts and examples. expect() provides context when a thread panics. It makes debugging easier.

When to use this pattern

Use mpsc fan-out/fan-in when you have a fixed set of independent tasks and need to gather results back into a single thread.

Reach for rayon when you are transforming a collection and want the compiler to handle the thread pool and synchronization for you.

Pick async runtimes like tokio when your tasks are I/O bound and you need thousands of concurrent operations without the overhead of OS threads.

Choose Arc<Mutex<T>> when workers need to update shared state incrementally rather than sending discrete results.

Don't reinvent the wheel. If rayon does the job, use it.

Where to go next