How to Use Async Streams Effectively in Rust

Use a `Stream` when values arrive over time: produce each item as soon as its I/O completes, and consume items with `next().await` instead of waiting for the whole batch.

When a single value isn't enough

You have a list of 100 URLs. You need the page titles. You write an async fn that fetches one URL and returns one title. Great. Now you need all 100. You could loop over the list, call the function, and collect results. That works, but it forces a choice: do you fetch all 100 in parallel and buffer every response in memory, or do you fetch them sequentially and wait for the network latency of 100 round-trips?

There's a third option. You can produce titles as they become available. The first title arrives, your code processes it, then the second arrives, and so on. You don't buffer everything. You don't wait for everything. You get a continuous flow of data that arrives asynchronously.

That's what an async stream is. It's a sequence of items that arrive over time, where each item might require I/O or computation before it's ready. Streams let you process data in a pipeline without loading it all into memory at once.

What is an async stream

An async stream is defined by the Stream trait. If you know Iterator, you know the pattern. An iterator yields items synchronously. You call next(), and you get the next item immediately. A stream yields items asynchronously. You call poll_next(), and you get a Poll<Option<T>>. The item might be ready now, or it might be ready later.

The Poll enum has two variants, Ready and Pending, so poll_next has three possible results. Poll::Ready(Some(value)) means the stream produced an item. Poll::Ready(None) means the stream is exhausted. Poll::Pending means the stream isn't ready yet. Before returning Pending, the stream stores the Waker from the Context it was polled with, so the executor knows to poll again when the underlying work finishes.
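The three results can be made concrete with a small sketch that uses only std::task::Poll. The describe function is a hypothetical helper introduced for illustration; it is not part of any library.

```rust
use std::task::Poll;

/// Hypothetical helper: names the three results a stream's `poll_next` can return.
fn describe(result: Poll<Option<i32>>) -> String {
    match result {
        // The stream produced an item.
        Poll::Ready(Some(value)) => format!("item {value}"),
        // The stream is exhausted and will yield nothing more.
        Poll::Ready(None) => String::from("finished"),
        // Nothing is ready yet; the caller should poll again after a wake-up.
        Poll::Pending => String::from("not ready yet"),
    }
}
```

A consumer loop is essentially this match run repeatedly: handle the item, stop on None, and go back to sleep on Pending.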

Think of an iterator like a deck of cards. You flip the top card, you have it. You flip the next, you have it. Everything is in your hand. Think of a stream like a mail slot. You check the slot. Maybe there's a letter. Maybe not. If not, you wait. When a letter arrives, the mail carrier drops it in. You check again. The items arrive over time, and you can't force them to appear faster than the carrier delivers them.

The Stream trait comes from the futures crate (it is defined in futures-core and re-exported by futures). Rust's standard library doesn't include a stable equivalent yet, so the ecosystem treats futures::Stream as the standard. Most async code imports StreamExt from futures to get methods like next, map, filter, and collect, mirroring Iterator adapters.

Minimal example

Implementing Stream manually requires writing a state machine that tracks where you are in the sequence. That's error-prone and verbose. The community convention is to use the async-stream crate for ad-hoc streams. It provides a stream! macro that lets you write async code with yield statements. The macro generates the state machine for you.
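To see why the manual route is verbose, here is a rough sketch of a hand-written state machine for a stream that yields 1, 2, and 3. It is an illustration under assumptions, not library code: PollNext is a hypothetical local trait with the same shape as futures::Stream, declared here only so the sketch needs no external crates, and OneTwoThree, NoopWaker, and drain are made-up names. In real code you would implement futures::Stream instead.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in with the same shape as `futures::Stream`.
trait PollNext {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hand-written state: one variant per position in the sequence.
enum OneTwoThree {
    Start,
    AfterOne,
    AfterTwo,
    Done,
}

impl PollNext for OneTwoThree {
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        // Advance the state by hand and return the item that belongs to it.
        let (next_state, item) = match *self {
            OneTwoThree::Start => (OneTwoThree::AfterOne, Some(1)),
            OneTwoThree::AfterOne => (OneTwoThree::AfterTwo, Some(2)),
            OneTwoThree::AfterTwo => (OneTwoThree::Done, Some(3)),
            OneTwoThree::Done => (OneTwoThree::Done, None),
        };
        *self = next_state;
        Poll::Ready(item)
    }
}

/// Waker that does nothing; enough for a stream that is always ready.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream until it stops producing items and collects them.
fn drain(mut stream: OneTwoThree) -> Vec<i32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut stream).poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Calling drain(OneTwoThree::Start) collects the three values. Even for constants, every resume point needs its own variant; with real I/O, each variant would also have to carry the in-flight future and handle Pending.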

Add async-stream, futures, and tokio (with the macros and rt-multi-thread features, which #[tokio::main] needs) to your dependencies. Then write a stream that yields numbers.

use async_stream::stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Create a stream that yields three values.
    // The stream! macro builds a struct implementing Stream.
    let s = stream! {
        yield 1;
        yield 2;
        yield 3;
    };

    // Collect the stream into a Vec.
    // This pulls items one by one until the stream ends.
    let items: Vec<i32> = s.collect().await;
    assert_eq!(items, vec![1, 2, 3]);
}

The stream! macro captures the code block and turns it into a stream. Each yield pauses execution and sends the value to the consumer. When the consumer calls next() again, the stream resumes right after the yield. When the block finishes, the stream ends.

Convention aside: always use stream! for local streams. Manual Stream implementations belong in libraries where you need zero-cost abstractions and fine-grained control over the state machine. For application code, stream! is the standard tool.

How the stream macro works

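Under the hood, stream! is a procedural macro that rewrites the block into an async block. Each yield becomes roughly a send followed by an await: the value is stored in a slot the consumer can read, and the block suspends at that point. The compiler then generates the state machine, just as it does for any async block: a type that holds the live local variables plus a tag for the current suspension point. When you call poll_next, the stream polls the block. The block runs until the next yield, the stream picks up the stored value, and poll_next returns Poll::Ready(Some(value)). The next call resumes right after that yield.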
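The resume-after-yield behaviour can be imitated with the standard library alone, which may help build intuition. The sketch below is a simplified model, not async-stream's actual code: it lets the compiler generate the state machine from an async block and replaces each yield with a future that stores a value and suspends once. It uses an Rc<RefCell<...>> slot purely for illustration, and Slot, Yielder, YieldOnce, MiniStream, numbers, NoopWaker, and drain are all hypothetical names.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Shared cell: the producer writes one item, the consumer takes it out.
type Slot = Rc<RefCell<Option<i32>>>;

/// Handle the async block uses in place of `yield`.
struct Yielder {
    slot: Slot,
}

impl Yielder {
    fn send(&self, value: i32) -> YieldOnce {
        YieldOnce { slot: self.slot.clone(), value: Some(value) }
    }
}

/// Future that stores its value in the slot, suspends once, then completes.
struct YieldOnce {
    slot: Slot,
    value: Option<i32>,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        match self.value.take() {
            // First poll: hand the value over and pause, like `yield`.
            Some(v) => {
                *self.slot.borrow_mut() = Some(v);
                Poll::Pending
            }
            // Second poll: the consumer asked for more, so resume.
            None => Poll::Ready(()),
        }
    }
}

/// Consumer side: the async block plus the slot it writes into.
struct MiniStream {
    slot: Slot,
    body: Pin<Box<dyn Future<Output = ()>>>,
    done: bool,
}

impl MiniStream {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if self.done {
            return Poll::Ready(None);
        }
        // Run the block until its next suspension point or its end.
        let finished = self.body.as_mut().poll(cx).is_ready();
        self.done = finished;
        match self.slot.borrow_mut().take() {
            Some(item) => Poll::Ready(Some(item)),
            None if finished => Poll::Ready(None),
            None => Poll::Pending,
        }
    }
}

/// Plays the role of a stream body with three yields.
fn numbers() -> MiniStream {
    let slot: Slot = Rc::new(RefCell::new(None));
    let tx = Yielder { slot: slot.clone() };
    let body: Pin<Box<dyn Future<Output = ()>>> = Box::pin(async move {
        tx.send(1).await;
        tx.send(2).await;
        tx.send(3).await;
    });
    MiniStream { slot, body, done: false }
}

/// Waker that does nothing; the driver below simply polls in a loop.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls until the stream stops producing items and collects them.
fn drain(mut stream: MiniStream) -> Vec<i32> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    while let Poll::Ready(Some(item)) = stream.poll_next(&mut cx) {
        items.push(item);
    }
    items
}
```

Calling drain(numbers()) collects the three values in order. Nothing in the body runs until poll_next is called, and each call advances the body by exactly one send, which is the checkpoint behaviour the macro gives you.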

This means variables captured by the stream live as long as the stream. If you move a value into the stream, the stream owns it. If you drop the stream before it finishes, the captured values are dropped too.

use async_stream::stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let data = vec![10, 20, 30];
    
    // The stream captures data by move.
    // data is no longer usable here.
    let s = stream! {
        for item in data {
            yield item * 2;
        }
    };

    // If you try to use data here, you get E0382.
    // println!("{:?}", data); // Error: borrow of moved value: `data`

    let doubled: Vec<i32> = s.collect().await;
    assert_eq!(doubled, vec![20, 40, 60]);
}

The compiler rejects any use of data after the stream is created with E0382 (a value was used after being moved). The stream owns the vector. This is consistent with Rust's ownership rules: the stream behaves like an async move block that runs in steps.

Treat yield like a checkpoint. The stream pauses there until the consumer asks for the next item. Nothing after the yield runs until next() is called again.

Realistic example: streaming page titles

A common real-world use is web scraping. You have a list of URLs. You want to fetch titles and process them as they arrive. You don't want to buffer all responses. You want a stream of titles.

Use reqwest for fetching and scraper for parsing. Wrap the logic in stream!.

use async_stream::stream;
use futures::{Stream, StreamExt};
use scraper::{Html, Selector};
use std::pin::pin;

/// Returns a stream of page titles for the given URLs.
/// URLs that fail to fetch are skipped. Yields None if a page has no <title>.
fn title_stream(urls: Vec<String>) -> impl Stream<Item = Option<String>> {
    stream! {
        // Build the selector once. "title" is a valid selector, so unwrap is fine.
        let selector = Selector::parse("title").unwrap();

        for url in urls {
            // Fetch the page content.
            // If the request fails, skip to the next URL.
            let text = match reqwest::get(url.as_str()).await {
                Ok(resp) => match resp.text().await {
                    Ok(t) => t,
                    Err(_) => continue,
                },
                Err(_) => continue,
            };

            // Parse HTML and extract the title text, or None if not found.
            // The inner block drops the parsed document before the yield.
            let title = {
                let doc = Html::parse_document(&text);
                let first = doc.select(&selector).next();
                first.map(|el| el.text().collect::<String>())
            };

            yield title;
        }
    }
}

#[tokio::main]
async fn main() {
    let urls: Vec<String> = [
        "https://example.com",
        "https://rust-lang.org",
        "https://invalid-url-that-fails.com",
    ]
    .iter()
    .map(|u| u.to_string())
    .collect();

    // Streams built with stream! are not Unpin, so pin before calling next().
    let mut s = pin!(title_stream(urls));

    // Consume the stream.
    // Process each title as it arrives.
    while let Some(title) = s.next().await {
        match title {
            Some(t) => println!("Got title: {}", t),
            None => println!("No title found for this URL"),
        }
    }
}

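This code creates a stream that iterates over URLs. For each URL, it fetches the content, parses the HTML, and yields the title. If a fetch fails, continue skips to the next URL. The consumer pulls titles one by one. Memory usage stays low because only one response is buffered at a time. Note that the fetches still run one after another: the stream body does not start the next request until the consumer asks for the next item. To overlap requests with bounded concurrency, map each URL to a future and use StreamExt::buffer_unordered.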

Convention aside: return impl Stream instead of BoxStream. impl Stream avoids heap allocation for the stream object. BoxStream is a pinned, heap-allocated trait object (Pin<Box<dyn Stream + Send>>), so it costs an allocation and dynamic dispatch. Use impl Stream whenever the function returns a single concrete stream type. Only use BoxStream when you need to store the stream in a struct or return different stream types from branches.

Pitfalls and compiler errors

Streams introduce async-specific pitfalls. The most common is blocking the executor. If you call a blocking function inside stream!, you freeze the thread. The executor can't run other tasks while your thread is stuck.

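use async_stream::stream;
use futures::Stream;
use std::{thread, time::Duration};

// BAD: thread::sleep blocks the executor thread for a full second
// when the stream is first polled.
fn blocking_stream() -> impl Stream<Item = i32> {
    stream! {
        thread::sleep(Duration::from_secs(1));
        yield 42;
    }
}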

Never use thread::sleep or blocking I/O inside a stream. Use tokio::time::sleep or async I/O. If you must call a blocking function, wrap it in tokio::task::spawn_blocking. The stream should only contain async operations.

Another pitfall is the return type. If you return a stream from a function, spell out the item type. A bare impl Stream compiles, but the item type is opaque to callers, and they hit E0277 as soon as they try to use an item.

// BAD: No Item type on the return type.
fn bad_stream() -> impl futures::Stream {
    stream! { yield 1; }
}
// In a caller, println!("{}", item) fails with
// E0277: the opaque Item type doesn't implement `std::fmt::Display`

The function itself compiles. The compiler rejects the caller with E0277 (trait bound not satisfied) because nothing is known about the item type. Specify it.

// GOOD: Specify the Item type.
fn good_stream() -> impl futures::Stream<Item = i32> {
    stream! { yield 1; }
}

Cancellation is another concern. If the consumer drops the stream, the stream stops at whatever yield or await it was suspended on, and the rest of the body never runs. If you're in the middle of a fetch when the stream is dropped, that fetch's future is dropped with it, which cancels the request. This is usually fine, but cleanup written as ordinary code after an await will be skipped. Put cleanup that must always run in a Drop implementation. Use tokio::select! inside the stream if you need to react to an explicit shutdown signal while the stream is still being polled.
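One way to make cleanup survive cancellation is a guard type with a Drop implementation, because dropping a suspended stream or future drops everything it holds even though the code after the pending await never runs. The sketch below shows this with a plain async block and the standard library only, on the assumption that a stream! body is torn down the same way when the stream is dropped. Cleanup, NeverReady, NoopWaker, and run_and_cancel are hypothetical names used for illustration.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Guard whose destructor records that cleanup ran.
struct Cleanup {
    ran: Rc<Cell<bool>>,
}

impl Drop for Cleanup {
    fn drop(&mut self) {
        self.ran.set(true);
    }
}

/// Stand-in for an in-flight request that never completes.
struct NeverReady;

impl Future for NeverReady {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}

/// Waker that does nothing; the task below is polled by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Starts a task, cancels it mid-await, and returns (reached_end, cleanup_ran).
fn run_and_cancel() -> (bool, bool) {
    let cleanup_ran = Rc::new(Cell::new(false));
    let reached_end = Rc::new(Cell::new(false));

    let guard = Cleanup { ran: cleanup_ran.clone() };
    let end_flag = reached_end.clone();
    let mut task = Box::pin(async move {
        let _guard = guard; // lives inside the suspended task
        NeverReady.await; // the task is parked here when it is dropped
        end_flag.set(true); // skipped on cancellation
    });

    // Poll once so the task is suspended at the await.
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    assert!(task.as_mut().poll(&mut cx).is_pending());

    // The consumer walks away: this is cancellation.
    drop(task);
    (reached_end.get(), cleanup_ran.get())
}
```

run_and_cancel() returns (false, true): the line after the await never executes, but the guard's destructor does. The same pattern applies to temporary files, locks, or counters that a stream body sets up before an await.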

Don't block the executor. If you can't await, you shouldn't be in the stream. Every other task scheduled on that thread waits for you.

Decision: when to use this vs alternatives

Streams are powerful, but they aren't the only tool. Choose the right abstraction for the job.

Use async-stream when you need a quick stream from async code and want to avoid manual state machines. The stream! macro lets you write linear code with yield, and it handles the complexity of Stream implementation. This is the default choice for application code.

Use tokio_stream when you are already in the Tokio ecosystem and need adapters for channels, timers, or iterators. tokio_stream provides StreamMap, StreamExt extensions, and utilities like iter() to turn an iterator into a stream. If you're using Tokio, this crate fills the gaps.

Use manual Stream implementation when you are writing a library and need zero-cost abstractions. Manual implementation gives you full control over the state machine and avoids the overhead of the stream! macro. This is rare. Even in a library, stream! is a reasonable starting point until profiling shows the macro is a bottleneck.

Reach for async fn returning a single value when you only need one result. Streams add complexity. If you fetch one URL and return one title, use a function. Don't wrap a single value in a stream just because you can.

Reach for Vec or collect() when you need all items at once and the dataset fits in memory. Streams shine when data is large, infinite, or when you want to process items as they arrive. If you need random access or the entire collection before processing, collect the stream into a vector.

Prefer impl Stream over BoxStream for return types. Allocation is a performance tax you rarely need to pay. impl Stream is the convention for opaque stream returns.

Trust the borrow checker. If your stream captures references, the lifetimes must outlive the stream. The compiler will enforce this. Don't fight it with raw pointers. Restructure the code to own the data or extend the lifetimes.

Where to go next