How to Use Async Iterators (Streams) in Rust

A stream is Rust's async iterator: it yields a sequence of values over time, and you `.await` each one instead of blocking while it arrives.

When data arrives over time

You're building a log analyzer that needs to process a 50GB file. Loading it all into memory crashes your machine. You're also building a chat bot that needs to react to messages instantly as they arrive from a WebSocket. In both cases, you have a sequence of items that arrive over time, and you want to handle them one by one without blocking. Rust calls this an async iterator, or a stream.

A synchronous iterator works on data that's already there. An async iterator works on data that might not be ready yet. You ask for the next item, and if it's available, you get it. If not, you pause and let the runtime do other work until the item arrives.

The conveyor belt versus the delivery truck

A synchronous iterator is like a conveyor belt in a factory. The items are already there. You reach out, grab the next widget, process it, grab the next. The work is instant.

An async iterator is like a delivery service dropping off packages at your door. You check the door. Nothing there. You go back to doing other work. Ten minutes later, a package arrives. You grab it, process it, then wait for the next one. The key difference is the wait. With async iterators, the next item isn't guaranteed to be ready right now. You have to await it.

The Stream trait and StreamExt

Rust represents async iterators with the Stream trait. The standard library doesn't offer a stable version yet. The trait is defined in the futures-core crate and re-exported by futures, the de facto foundation of the async ecosystem. Runtimes like tokio and async-std interoperate with it, in tokio's case through the companion crate tokio-stream.

The Stream trait has one associated type, Item, which is the type of values the stream produces. It has one required method, poll_next, which is the low-level way to ask for the next item. You rarely call poll_next directly.

The community convention is to import StreamExt. This extension trait adds the methods you actually use: next, map, filter, take, for_each, and collect. Adapters such as map, filter, and take return new streams, so you chain them together just like sync iterators. Consumers such as next, for_each, and collect return futures. Nothing in the chain runs until you await one of those futures, usually .next().await or .collect().await.

Treat StreamExt as the interface. Treat Stream as the implementation detail.

Minimal example

Here's a runnable example using tokio and futures. It creates a stream from a vector and consumes it.

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Create a stream from a vector of numbers
    let numbers = vec![1, 2, 3, 4, 5];
    let mut stream = stream::iter(numbers);

    // Consume the stream item by item
    while let Some(n) = stream.next().await {
        println!("Got: {}", n);
    }
}

The stream::iter function takes an iterator and wraps it in a stream. The StreamExt::next method returns a Future that resolves to Option<Item>. The while let Some loop pulls items until the stream returns None.

Walk through what happens

When you call stream.next(), you get a Future. You .await that future. The runtime polls the future. The stream checks if there's a next item. If there is, it returns Poll::Ready(Some(item)). The await completes, and you get the item.

If the stream has no items left, it returns Poll::Ready(None). The loop ends.

If the stream is waiting on I/O, it returns Poll::Pending. The runtime puts the task to sleep and wakes it up later when the I/O is ready. This is the magic of async. The thread isn't blocked. It can run other tasks while waiting for the next item.

Streams are lazy. Creating a stream does nothing. Calling .map() on a stream does nothing. The work only happens when you poll the stream. This lets you build complex pipelines without paying the cost until you actually need the data.

Realistic example: processing a log file

Here's a more realistic scenario. You're reading a large log file line by line and filtering for errors.

use futures::StreamExt;
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_stream::wrappers::LinesStream;

async fn process_log(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Open the file asynchronously
    let file = File::open(path).await?;
    let reader = BufReader::new(file);

    // tokio's lines() returns a Lines value, which is not a Stream by itself.
    // LinesStream adapts it into a Stream of io::Result<String>.
    let mut lines = LinesStream::new(reader.lines());

    while let Some(line_result) = lines.next().await {
        // Handle errors per line
        let line = line_result?;

        if line.contains("ERROR") {
            println!("Alert: {}", line);
        }
    }

    Ok(())
}

In tokio, the lines method (from AsyncBufReadExt) returns a Lines value rather than a stream. You can call lines.next_line().await on it directly, or wrap it in LinesStream from the tokio-stream crate to get a real Stream, as above. Each item is then a Result<String, std::io::Error>. You handle the error inside the loop. If you want to stop on the first error, the ? operator exits the function early with that error. If you want to skip bad lines, use if let Ok(line) = line_result.

The stream reads chunks from the file asynchronously. It doesn't load the whole file into memory. You can process files larger than your RAM.

Under the hood: Poll and Wakers

Understanding poll_next helps you debug streams and write custom ones. The poll_next method takes a Context and returns a Poll.

use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

struct AsyncCounter {
    count: u32,
}

impl Stream for AsyncCounter {
    type Item = u32;

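    // _cx is unused because this stream never returns Poll::Pending
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Get mutable access to the struct (allowed because AsyncCounter is Unpin)
        let this = self.get_mut();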

        if this.count < 5 {
            this.count += 1;
            // Item is ready immediately
            Poll::Ready(Some(this.count))
        } else {
            // Stream is exhausted
            Poll::Ready(None)
        }
    }
}

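The Poll enum has two variants. Poll::Ready(value) means the item is available. Poll::Pending means the item isn't ready yet. Before you return Pending, you must arrange for the task to be woken: clone the waker with cx.waker().clone() and hand it to whatever will produce the next item, so it can call wake() once the item is ready. If nothing ever calls the waker, the runtime never polls the stream again and the task hangs.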

Writing poll_next by hand is tedious. You have to manage state, handle wakers, and deal with Pin. The community convention is to use the async-stream crate. It provides a stream! macro that lets you write streams with yield syntax.

use async_stream::stream;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let s = stream! {
        for i in 0..5 {
            yield i;
        }
    };

    // for_each expects the closure to return a future, hence the async block
    s.for_each(|n| async move { println!("Yielded: {}", n) }).await;
}

The macro generates the Stream implementation for you. It handles Poll, Waker, and Pin behind the scenes. Reach for async-stream when you need to create a stream. Reach for manual Stream implementation only when you're writing a low-level driver or a performance-critical library.

Pitfalls and compiler errors

Forgetting the await. If you call stream.next() without .await, you get a Future, not the item. You can't use the future as a value. The compiler rejects this with a type error: typically E0308 (mismatched types) when you pattern-match on the result, or E0599 (no method found) when you call a method on it. Always await the result of stream methods that return futures.

Streams are lazy. Creating a stream with side effects won't run those effects until you poll the stream. If you define a stream that prints to stdout but never call .next().await, nothing prints. This catches everyone once. Treat streams like lazy recipes. They don't cook until you ask for the dish.

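Borrowing across await. A borrow that is alive at an .await point is stored inside the future for as long as the task is suspended. That is legal, but it has two consequences. First, any conflicting borrow made while the first is still alive is rejected with E0499 or E0502. Second, if the task is spawned on a multi-threaded runtime, everything held across the .await must be Send, so an Rc, a RefCell borrow guard, or a std::sync::MutexGuard produces a "future cannot be sent between threads safely" error. End the borrow before the .await by scoping it in a block, or use an async-aware lock such as tokio::sync::Mutex when the guard has to live across the wait.

Mixing sync and async iterators. A plain for loop over an Iterator works fine inside an async function, but you can't pass an Iterator to an API that expects a Stream, and the StreamExt methods aren't available on it. Wrap it with stream::iter first. Conversely, a for loop can't consume a Stream. You need while let Some with .next().await, or StreamExt::for_each.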

Decision: when to use streams

Use Iterator when your data is already in memory and you can process items instantly without waiting. Use Stream when items arrive over time, come from the network, or require I/O to produce the next value. Use the async-stream crate when you need to write a stream body with yield syntax, which feels like a generator and avoids manual Poll implementation. Use futures::stream::unfold when you have a stateful loop that produces items asynchronously and you want a functional style without defining a struct. Use futures::stream::iter (or tokio_stream::iter from the tokio-stream crate) when you need to convert a sync iterator into a stream for compatibility with async APIs.

Don't reach for streams when a simple Future suffices. If you only have one value, use Future. If you have a sequence, use Stream.

Where to go next