The problem with waiting
You are building a log parser that reads from a network socket. Or a game server that processes player actions as they arrive. Or a data pipeline that fetches paginated results from an API. You want to process each item as soon as it appears, without blocking the rest of your program and without loading the entire dataset into RAM. Synchronous iterators handle the "process as it appears" part beautifully. They break down when the data source requires waiting. The network call, the disk read, the database query. All of them require waiting.
A standard Rust Iterator is a vending machine. You press a button, you get a snack instantly. The machine has everything stocked on the shelf. An async iterator is a custom order counter. You place your order, the staff goes to the kitchen, waits for the food to cook, and brings it back. You cannot stand there holding the counter hostage. You have to step aside, do something else, and come back when the food is ready. In Rust, that stepping aside is await. The trait that models this is Stream. It lives in the futures crate, not in the standard library. The standard library deliberately left async iteration out of the 1.0 release to avoid locking in a design before the ecosystem matured. The community settled on Stream as the de facto standard.
Async iteration is not a replacement for loops. It is a bridge between blocking I/O and non-blocking execution.
The Stream trait
The Stream trait looks almost identical to Iterator, but with one crucial difference. Every method returns a Future instead of a concrete value.
use std::pin::Pin;
use std::task::{Context, Poll};
/// The core trait for asynchronous iteration.
pub trait Stream {
type Item;
/// Polls the stream for the next item.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
The poll_next method takes a pinned mutable reference and a context. It returns Poll::Ready(Some(item)) when data is available, Poll::Ready(None) when the stream is exhausted, or Poll::Pending when the stream needs to wait for an external event. The Pin type ensures the stream's memory address does not change while it is being polled. This prevents the runtime from accidentally moving a future that holds self-referential pointers.
Implementing this trait by hand requires tracking which yield point you are at, managing the Waker from the context, and handling state transitions manually. The boilerplate is heavy and error-prone. Nobody writes manual Stream implementations for everyday code. The community convention is to use the async-stream crate, which provides a macro that generates the state machine for you.
The macro writes the state machine. You just write the logic.
The minimal setup
Install the crate first. Run cargo add async-stream in your terminal. The crate depends on futures-core and pin-project-lite, but you do not need to manage those dependencies yourself.
use async_stream::stream;
/// Generates a sequence of numbers asynchronously.
async fn number_generator() {
let mut count = 0;
let s = stream! {
// The macro captures `count` and turns this block into a state machine.
for i in 0..5 {
count += 1;
// `yield` suspends execution and returns the value to the caller.
yield i;
}
};
// Consume the stream by polling for the next item.
while let Some(n) = s.next().await {
println!("Got: {}", n);
}
}
The stream! macro does not run the loop immediately. It returns a struct that implements Stream. That struct holds a Future internally. Every time you call .next().await, you are polling that future. The first poll runs until the first yield. The macro captures the local variables, saves the execution state, and returns Poll::Ready(Some(value)). The next poll resumes exactly after the yield, continues until the next yield or the end of the block, and repeats. When the block finishes, the stream returns Poll::Ready(None) to signal completion.
Polling is the heartbeat. Every await is a checkpoint where the runtime can pause and resume.
Under the hood: state machines and polling
The async-stream macro translates your code into a match statement that tracks an enum representing each possible suspension point. When you call next().await, the runtime wakes the future, the match statement jumps to the correct arm, executes until the next yield or return, and updates the enum. This is identical to how async fn works, but scoped to a single iterator-like interface.
The macro also handles variable capture. If you reference a local variable inside the block, the macro moves it into the generated state machine struct. This means the stream owns its data for the duration of its lifetime. You cannot yield a reference to a stack variable that will be dropped when the surrounding function returns. The compiler will catch this immediately.
The community convention is to keep the stream! block self-contained. If you need to mutate external state, pass it in explicitly or use interior mutability like RefCell. The macro will complain if you try to borrow a variable that goes out of scope before the stream finishes. Keep the capture list minimal. It makes the generated state machine smaller and easier to debug.
Filter early, yield late. Keep the stream lean.
Real-world pattern: filtering and error handling
Real streams rarely just count numbers. They fetch data, filter noise, and handle errors. Let's build a stream that simulates fetching paginated API results. We will add artificial delays, filter out empty responses, and stop early if we hit an error.
use async_stream::stream;
use std::time::Duration;
use tokio::time::sleep;
/// Simulates fetching paginated data from a remote API.
async fn fetch_pages() {
let s = stream! {
let mut page = 1;
loop {
// Simulate network latency.
sleep(Duration::from_millis(100)).await;
// Simulate API response.
let data = match page {
1 => vec!["alpha", "beta"],
2 => vec![], // Empty page
3 => vec!["gamma"],
4 => break, // End of data
_ => unreachable!(),
};
// Filter out empty pages before yielding.
if !data.is_empty() {
for item in data {
yield item;
}
}
page += 1;
}
};
while let Some(item) = s.next().await {
println!("Processing: {}", item);
}
}
The stream! macro allows you to use break to terminate early. It also allows return to exit the surrounding function entirely. You can yield Result<T, E> instead of T to propagate errors through the pipeline. The caller then matches on the result inside the while let loop. This pattern keeps error handling explicit and prevents silent failures.
When you need to transform the data, bring in the futures crate. The standard library does not provide combinators for Stream. You need use futures::stream::StreamExt; to unlock .map(), .filter(), .take(), and .collect(). Without that import, the compiler throws an E0599 (no method named map found for struct Stream). The combinators return new streams that wrap the original one, creating a lazy pipeline that only executes when you poll it.
Chain combinators carefully. Each .map() or .filter() adds a layer of state machine overhead. Profile before you optimize.
Pitfalls and compiler traps
Async iterators introduce two major traps. The first is borrowing across yield points. You cannot yield a reference to a local variable that will be dropped when the current function returns. The compiler will reject this with a lifetime error. The stream owns its state, so it must own the data it yields, or the data must live longer than the stream itself. If you need to yield references, use Arc or Rc to share ownership, or restructure the code so the data outlives the stream.
The second trap is pinning. The Stream trait requires &mut self to be Unpin in most cases, but the async-stream macro generates a type that is !Unpin by default. This means you cannot move the stream around after it has been polled. If you try to store the stream in a Vec and sort it, or pass it to a function that requires Unpin, the compiler will stop you. The error usually mentions "future cannot be moved" or "type is not Unpin". The fix is to pin the stream to the stack or heap using Box::pin or std::pin::pin!. Most of the time, you just consume the stream immediately in a while let loop, so pinning never becomes a problem. Only when you need to store or pass the stream around do you need to worry about it.
Another common mistake is treating Stream like Iterator. You cannot call .collect::<Vec<_>>() directly without StreamExt. The standard library deliberately separates sync and async iteration to avoid API bloat. The community convention is to import StreamExt at the top of any module that chains stream operations. It signals to readers that you are working with async pipelines.
If the compiler complains about pinning, stop moving the stream around. Pin it once, or consume it immediately.
Choosing your approach
Use async-stream when you want to write async iteration logic that looks like synchronous code. It handles state machines, pinning, and error propagation automatically. Use futures::stream::unfold when you need a lightweight, zero-allocation stream from a simple state transition. It takes a closure that returns Option<(Item, NextState)> and avoids macro overhead. Use a manual Stream implementation when you are building a foundational library or wrapping a C API that requires precise control over polling and memory layout. Reach for a synchronous Iterator when your data is already in memory and requires no I/O. The async overhead is unnecessary and will slow you down.
Pick the tool that matches your data source, not the one that looks coolest.