Skip to content

feat: Add StreamExt::switch#2997

Open
Hywan wants to merge 1 commit intorust-lang:masterfrom
Hywan:feat-util-switch
Open

feat: Add StreamExt::switch#2997
Hywan wants to merge 1 commit intorust-lang:masterfrom
Hywan:feat-util-switch

Conversation

@Hywan
Copy link
Copy Markdown

@Hywan Hywan commented Mar 5, 2026

This patch introduces the Switch combinator stream: it flattens a higher-order stream into a first-order stream.

This combinator flattens a stream of streams, i.e. an outer stream yielding inner streams. This combinator always keeps the most recently yielded inner stream, and yields items from it, until the outer stream produces a new inner stream, at which point the inner stream to yield items from is switched to the new one.

Examples

An empty (outer) stream can be switched:

use futures::stream::{self, Empty, StreamExt, Switch};

let stream: Switch<Empty<Empty<()>>> = stream::empty().switch();

assert!(stream.collect::<Vec<_>>().await.is_empty());

An outer stream can produce several inner streams — once switched, the last one is immediately selected:

use futures::stream::{self, StreamExt};

let stream = stream::iter([
    stream::iter([1, 2, 3]),
    stream::iter([4, 5, 6]),
    stream::iter([7, 8, 9]),
])
.switch();

assert_eq!(vec![7, 8, 9], stream.collect::<Vec<_>>().await);

One of the most interesting usecase is when an outer stream produces new inner streams dynamically. Let's imagine the outer stream produces inner streams yielding a sequence of integers starting from a particular value. For example:

  • outer stream yields 7:
    • inner stream yields 7, 8, 9, 10, 11…
  • outer stream yields 42:
    • inner stream yields 42, 43, 44, 45, 46…
use futures::channel::mpsc;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::pin::pin;

// `receiver` is the outer stream: it yields integers received by
// `sender`.
let (sender, receiver) = mpsc::unbounded::<u8>();

let mut stream = pin!(receiver
    // For every new received value from `sender`…
    .map(|init_value| {
        // … let's create a new inner stream.
        //
        // First off, the inner stream starts with `init_value`. Then,
        // it continues with incremented integers.
        let mut next_value = init_value;

        stream::poll_fn(move |_| {
            let current_value = next_value;
            next_value += 1;

            Poll::Ready(Some(current_value))
        })
    })
    .switch());

// `stream` is pending until the outer stream yields something.
sender.unbounded_send(7).unwrap();

// `stream` has switched to the inner stream.
assert_eq!(vec![7, 8, 9, 10, 11], stream.by_ref().take(5).collect::<Vec<_>>().await);
assert_eq!(vec![12, 13, 14, 15, 16], stream.by_ref().take(5).collect::<Vec<_>>().await);

// The outer stream will yield a new value, which will create a new
// inner stream.
sender.unbounded_send(42).unwrap();

// `stream` has been “reset” and will produce a new inner stream.
assert_eq!(vec![42, 43, 44, 45, 46], stream.take(5).collect::<Vec<_>>().await);

This is a port of async_rx::Switch, written by @jplatte, and maintained by @jplatte and I. The test suite has been improved compared to the original repository.

I believe this combinator is useful when one wants to generate streams dynamically (the inner streams) based on another input (managed by the outer stream).

@rustbot rustbot added the A-stream Area: futures::stream label Mar 5, 2026
@Hywan Hywan marked this pull request as ready for review March 5, 2026 13:44
@rustbot rustbot added the S-waiting-on-review Status: Awaiting review from the assignee but also interested parties. label Mar 5, 2026
This patch introduces the `Switch` combinator stream: it flattens a
higher-order stream into a first-order stream.

This combinator flattens a stream of streams, i.e. an outer stream
yielding inner streams. This combinator always keeps the most recently
yielded inner stream, and yields items from it, until the outer stream
produces a new inner stream, at which point the inner stream to yield
items from is switched to the new one.

Examples
--------

An empty (outer) stream can be switched:

```
\# futures::executor::block_on(async {
use futures::stream::{self, Empty, StreamExt, Switch};

let stream: Switch<Empty<Empty<()>>> = stream::empty().switch();

assert!(stream.collect::<Vec<_>>().await.is_empty());
\# });
```

An outer stream can produce several inner streams — once switched, the
last one is immediately selected:

```
\# futures::executor::block_on(async {
use futures::stream::{self, StreamExt};

let stream = stream::iter([
    stream::iter([1, 2, 3]),
    stream::iter([4, 5, 6]),
    stream::iter([7, 8, 9]),
])
.switch();

assert_eq!(vec![7, 8, 9], stream.collect::<Vec<_>>().await);
\# });
```

One of the most interesting usecase is when an outer stream produces new
inner  streams dynamically. Let's imagine the outer stream produces inner
streams yielding a sequence of integers starting from a particular value.
For example:

- outer stream yields 7:
  - inner stream yields 7, 8, 9, 10, 11…
- outer stream yields 42:
  - inner stream yields 42, 43, 44, 45, 46…

```
\# futures::executor::block_on(async {
use futures::channel::mpsc;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
use std::pin::pin;

// `receiver` is the outer stream: it yields integers received by
// `sender`.
let (sender, receiver) = mpsc::unbounded::<u8>();

let mut stream = pin!(receiver
    // For every new received value from `sender`…
    .map(|init_value| {
        // … let's create a new inner stream.
        //
        // First off, the inner stream starts with `init_value`. Then,
        // it continues with incremented integers.
        let mut next_value = init_value;

        stream::poll_fn(move |_| {
            let current_value = next_value;
            next_value += 1;

            Poll::Ready(Some(current_value))
        })
    })
    .switch());

// `stream` is pending until the outer stream yields something.
sender.unbounded_send(7).unwrap();

// `stream` has switched to the inner stream.
assert_eq!(vec![7, 8, 9, 10, 11], stream.by_ref().take(5).collect::<Vec<_>>().await);
assert_eq!(vec![12, 13, 14, 15, 16], stream.by_ref().take(5).collect::<Vec<_>>().await);

// The outer stream will yield a new value, which will create a new
// inner stream.
sender.unbounded_send(42).unwrap();

// `stream` has been “reset” and will produce a new inner stream.
assert_eq!(vec![42, 43, 44, 45, 46], stream.take(5).collect::<Vec<_>>().await);
\# });
```
@Hywan Hywan force-pushed the feat-util-switch branch from 22f656e to 71a9245 Compare April 17, 2026 07:04
pub struct Switch<Outer, Inner> {
#[pin]
outer_stream: Outer,
outer_stream_is_closed: bool,
Copy link
Copy Markdown
Author

@Hywan Hywan Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be removed by Outer: FusedStream. This is debatable and I'm happy to change it.

View changes since the review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

A-stream Area: futures::stream S-waiting-on-review Status: Awaiting review from the assignee but also interested parties.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants