feat(dfir_pipes): add push-side Fold, Reduce, and Sort operators #2844

Draft
MingweiSamuel wants to merge 1 commit into main from mingwei/push-fold

@MingweiSamuel
Member

These operators accumulate items during `start_send` and emit results
downstream during `poll_flush`, enabling them to work on the push side
of a subgraph without requiring a subgraph boundary.

New push operators:

  • `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function,
    emits the accumulated value on flush
  • `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value,
    emits on flush (nothing if no items received)
  • `Sort<Item, Next>`: collects all items, sorts them, emits in sorted
    order on flush

These are needed because removing `DelayType::Stratum` allows blocking
operators to end up on the push side of a subgraph. Previously they were
always pull-side due to the forced subgraph boundary on their input.
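
The accumulate-on-send, emit-on-flush pattern described above can be sketched with a deliberately simplified trait. `SimplePush`, `Collect`, `drive`, and `sum_via_fold` are hypothetical names introduced only for this illustration; the actual `dfir_pipes` push trait is pinned and poll-based (`start_send` / `poll_flush`), which this synchronous sketch leaves out.

```rust
// Hypothetical, synchronous stand-in for a push operator (not the real trait).
trait SimplePush<Item> {
    /// Accept one item from upstream.
    fn send(&mut self, item: Item);
    /// Upstream has finished sending for now; emit anything that is held back.
    fn flush(&mut self);
}

/// Terminal operator that records everything it receives.
struct Collect<T> {
    items: Vec<T>,
}

impl<T> SimplePush<T> for Collect<T> {
    fn send(&mut self, item: T) {
        self.items.push(item);
    }
    fn flush(&mut self) {}
}

/// Fold: combine items into `acc` on `send`, emit `acc` once on `flush`.
struct Fold<Acc, CombFn, Next> {
    next: Next,
    acc: Acc,
    comb_fn: CombFn,
    flushed: bool,
}

impl<Acc, In, CombFn, Next> SimplePush<In> for Fold<Acc, CombFn, Next>
where
    Acc: Clone,
    CombFn: FnMut(&mut Acc, In),
    Next: SimplePush<Acc>,
{
    fn send(&mut self, item: In) {
        // Nothing goes downstream yet; the item is only folded in.
        (self.comb_fn)(&mut self.acc, item);
    }

    fn flush(&mut self) {
        if !self.flushed {
            self.flushed = true;
            self.next.send(self.acc.clone());
        }
        self.next.flush();
    }
}

/// Helper: push every item into `op`, then flush it.
fn drive<In, P: SimplePush<In>>(op: &mut P, items: impl Iterator<Item = In>) {
    for item in items {
        op.send(item);
    }
    op.flush();
}

/// Sums `input` through the push-side fold and returns what reached downstream.
fn sum_via_fold(input: &[i32]) -> Vec<i32> {
    let mut fold = Fold {
        next: Collect::<i32> { items: Vec::new() },
        acc: 0_i32,
        comb_fn: |acc: &mut i32, x: i32| *acc += x,
        flushed: false,
    };
    drive(&mut fold, input.iter().copied());
    fold.next.items
}
```

In this sketch, `sum_via_fold(&[1, 2, 3])` delivers a single value downstream, and an empty input still delivers the initial accumulator, which matches the description of `Fold` (as opposed to `Reduce`, which emits nothing when no items arrive).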

This is part of the effort for singleton references in Hydro.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>

@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented May 5, 2026

Deploying hydro with Cloudflare Pages

Latest commit: 39b0fbe
Status: ✅  Deploy successful!
Preview URL: https://7eeb0c80.hydroflow.pages.dev
Branch Preview URL: https://mingwei-push-fold.hydroflow.pages.dev

@MingweiSamuel MingweiSamuel marked this pull request as ready for review May 5, 2026 21:35
@MingweiSamuel MingweiSamuel requested review from a team and Copilot May 5, 2026 21:35
MingweiSamuel added a commit that referenced this pull request May 5, 2026
Contributor

Copilot AI left a comment

Pull request overview

This PR adds new push-side blocking combinators to `dfir_pipes` so aggregation/sorting can happen on the push side of a subgraph (accumulating during `start_send` and emitting during `poll_flush`), supporting the post-`DelayType::Stratum` execution model and ongoing singleton-reference work.

Changes:

  • Introduces new push combinators: `Fold`, `Reduce`, and `Sort`.
  • Wires the new operators into `dfir_pipes::push` via module declarations and re-exports.
  • Adds `Sort` behind the `alloc` feature gate.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| `dfir_pipes/src/push/fold.rs` | Adds push-side `Fold` combinator (accumulate on send, emit on flush). |
| `dfir_pipes/src/push/reduce.rs` | Adds push-side `Reduce` combinator (reduce into single value, emit on flush). |
| `dfir_pipes/src/push/sort.rs` | Adds push-side `Sort` combinator (buffer, sort, emit in order on flush). |
| `dfir_pipes/src/push/mod.rs` | Registers/re-exports the new push modules/operators (with `Sort` behind `alloc`). |

Comment on lines +64 to +66
            *this.flushed = true;
            if let Some(value) = this.acc.as_ref() {
                this.next.as_mut().start_send(value.clone(), ());

Comment on lines +63 to +70
        if !*this.flushed {
            *this.flushed = true;
            if let Some(value) = this.acc.as_ref() {
                this.next.as_mut().start_send(value.clone(), ());
            }
        }
        this.next.poll_flush(ctx)
    }
Member Author

I wonder if this indicates the trait needs a poll_close method to solely indicate end of life. Need to nail this down a bit better
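
One possible reading of this concern, offered as an assumption because the review comment text is not reproduced here: with a one-shot `flushed` guard, the first flush effectively acts as the end-of-life signal. The sketch below uses a hypothetical synchronous `Reduce` whose `out` vector stands in for the downstream `next`; the `send` body and the `max_with_late_item` helper are illustrative and not taken from the PR.

```rust
// Hypothetical, synchronous stand-in for the push-side `Reduce`.
struct Reduce<T, F> {
    acc: Option<T>,
    reduce_fn: F,
    flushed: bool,
    out: Vec<T>, // plays the role of the downstream operator
}

impl<T: Clone, F: FnMut(&mut T, T)> Reduce<T, F> {
    fn new(reduce_fn: F) -> Self {
        Self { acc: None, reduce_fn, flushed: false, out: Vec::new() }
    }

    fn send(&mut self, item: T) {
        match self.acc.as_mut() {
            // Later items are combined into the existing accumulator.
            Some(acc) => (self.reduce_fn)(acc, item),
            // The first item seeds the accumulator.
            None => self.acc = Some(item),
        }
    }

    fn flush(&mut self) {
        // Same guard shape as the excerpt above: emit at most once.
        if !self.flushed {
            self.flushed = true;
            if let Some(value) = self.acc.as_ref() {
                self.out.push(value.clone());
            }
        }
    }
}

/// Sends 3 and 7, flushes, then sends 9 and flushes again.
fn max_with_late_item() -> Vec<i32> {
    let mut reduce = Reduce::<i32, _>::new(|acc: &mut i32, x: i32| *acc = (*acc).max(x));
    reduce.send(3);
    reduce.send(7);
    reduce.flush(); // emits the current maximum
    reduce.send(9); // still reduced into the accumulator
    reduce.flush(); // the guard suppresses a second emission
    reduce.out
}
```

In this sketch the late item never reaches downstream, which is the kind of ambiguity a dedicated end-of-input method would remove.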

Comment on lines +59 to +67
    fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> {
        let mut this = self.project();
        if !*this.flushed {
            *this.flushed = true;
            let value = this.acc.clone();
            this.next.as_mut().start_send(value, ());
        }
        this.next.poll_flush(ctx)
    }

Comment on lines +60 to +66
        let mut this = self.project();
        if !*this.flushed {
            *this.flushed = true;
            let value = this.acc.clone();
            this.next.as_mut().start_send(value, ());
        }
        this.next.poll_flush(ctx)
Member Author

This is a good question

Comment on lines 53 to +70
    pub use flat_map_stream::FlatMapStream;
    pub use flatten::Flatten;
    pub use flatten_stream::FlattenStream;
    pub use fold::Fold;
    pub use for_each::ForEach;
    use futures_core::FusedStream;
    pub use inspect::Inspect;
    pub use map::Map;
    #[cfg(feature = "alloc")]
    #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
    pub use persist::Persist;
    pub use reduce::Reduce;
    pub use resolve_futures::ResolveFutures;
    pub use sink::Sink;
    pub use sink_compat::SinkCompat;
    #[cfg(feature = "alloc")]
    #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
    pub use sort::Sort;

Comment on lines +8 to +24
    pin_project! {
        /// Push combinator that accumulates all items via a fold function, then emits
        /// the accumulated value downstream on flush.
        ///
        /// During `start_send`, items are folded into the accumulator.
        /// During `poll_flush`, the accumulated value is cloned and sent downstream,
        /// then the downstream is flushed.
        #[must_use = "`Push`es do nothing unless items are pushed into them"]
        #[derive(Clone, Debug)]
        pub struct Fold<Acc, CombFn, Next> {
            #[pin]
            next: Next,
            acc: Acc,
            comb_fn: CombFn,
            flushed: bool,
        }
    }

Comment on lines +8 to +23
    pin_project! {
        /// Push combinator that reduces all items into a single value, then emits
        /// it downstream on flush. If no items were received, nothing is emitted.
        ///
        /// During `start_send`, items are reduced into the accumulator.
        /// During `poll_flush`, the accumulated value (if any) is sent downstream.
        #[must_use = "`Push`es do nothing unless items are pushed into them"]
        #[derive(Clone, Debug)]
        pub struct Reduce<Acc, ReduceFn, Next> {
            #[pin]
            next: Next,
            acc: Option<Acc>,
            reduce_fn: ReduceFn,
            flushed: bool,
        }
    }

Comment on lines +9 to +21
    pin_project! {
        /// Push combinator that collects all items, sorts them, then emits them
        /// downstream in sorted order on flush.
        #[must_use = "`Push`es do nothing unless items are pushed into them"]
        #[derive(Clone, Debug)]
        pub struct Sort<Item, Next> {
            #[pin]
            next: Next,
            buf: Vec<Item>,
            sorted: bool,
            flush_idx: usize,
        }
    }
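
As a rough illustration of how the `buf`, `sorted`, and `flush_idx` fields could cooperate, here is a minimal synchronous sketch. It is an assumption about the field roles, not the PR's implementation: `out` stands in for the downstream `next`, the `Ord + Clone` bound and the `sorted_output` helper are hypothetical, and how the real operator treats items arriving after the first flush is not shown in the excerpt.

```rust
// Hypothetical, synchronous stand-in for the push-side `Sort`.
struct Sort<Item> {
    buf: Vec<Item>,
    sorted: bool,
    flush_idx: usize,
    out: Vec<Item>, // plays the role of the downstream operator
}

impl<Item: Ord + Clone> Sort<Item> {
    fn new() -> Self {
        Self { buf: Vec::new(), sorted: false, flush_idx: 0, out: Vec::new() }
    }

    /// Buffer the item; nothing is emitted yet.
    fn send(&mut self, item: Item) {
        self.buf.push(item);
    }

    /// Sort once, then emit from `flush_idx` onward.
    fn flush(&mut self) {
        if !self.sorted {
            self.buf.sort();
            self.sorted = true;
        }
        // `flush_idx` records how far emission has progressed, so a repeated
        // flush resumes after the last emitted item instead of starting over.
        while self.flush_idx < self.buf.len() {
            self.out.push(self.buf[self.flush_idx].clone());
            self.flush_idx += 1;
        }
    }
}

/// Pushes `input` through the sketch and flushes twice.
fn sorted_output(input: &[i32]) -> Vec<i32> {
    let mut sort = Sort::new();
    for &x in input {
        sort.send(x);
    }
    sort.flush();
    sort.flush(); // emits nothing new
    sort.out
}
```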
MingweiSamuel added a commit that referenced this pull request May 5, 2026

Fix clippy missing_const_for_fn errors in dfir_pipes

Added `const` qualifier to three `new()` functions in dfir_pipes/src/push/
that clippy identified as being eligible for const evaluation:
- fold.rs: `Fold::new()`
- reduce.rs: `Reduce::new()`
- sort.rs: `Sort::new()`

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2844
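
For context, clippy's `missing_const_for_fn` lint flags functions whose bodies are already valid in a `const` context. A constructor that only moves its arguments into fields and calls `Vec::new()` (itself a `const fn`) qualifies. The sketch below assumes a `new(next)` signature for illustration; the real constructor signatures are not shown in this conversation, and `EMPTY_SORT` is a hypothetical name.

```rust
// Field layout copied from the `Sort` excerpt; the constructor signature is assumed.
pub struct Sort<Item, Next> {
    next: Next,
    buf: Vec<Item>,
    sorted: bool,
    flush_idx: usize,
}

impl<Item, Next> Sort<Item, Next> {
    /// Eligible for `const` because it only moves `next` and calls `Vec::new()`.
    pub const fn new(next: Next) -> Self {
        Self { next, buf: Vec::new(), sorted: false, flush_idx: 0 }
    }
}

// With `const fn`, the constructor can be evaluated at compile time.
const EMPTY_SORT: Sort<u32, ()> = Sort::new(());
```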
@MingweiSamuel MingweiSamuel force-pushed the mingwei/push-fold branch 2 times, most recently from dbba146 to 5d2ac0c Compare May 6, 2026 00:03
@MingweiSamuel
Member Author

I think we should rename poll_flush to poll_complete or poll_close ... maybe. Might need both, a la the actual Sink trait
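
To make the "might need both" idea concrete: the `futures` `Sink` trait keeps `poll_flush` and `poll_close` as separate methods. A simplified synchronous sketch of that split, applied to a summing fold, could look like the following. `PushWithClose`, `Collect`, `SumFold`, and `sum_with_intermediate_flush` are hypothetical names, and this is one possible shape rather than what the PR implements.

```rust
// Hypothetical trait that separates "flush" from "end of input".
trait PushWithClose {
    type Item;
    fn send(&mut self, item: Self::Item);
    /// Push through anything buffered; more items may still arrive.
    fn flush(&mut self);
    /// End of input: no further `send` calls will follow.
    fn close(&mut self);
}

/// Terminal operator that records everything it receives.
struct Collect {
    items: Vec<i32>,
}

impl PushWithClose for Collect {
    type Item = i32;
    fn send(&mut self, item: i32) {
        self.items.push(item);
    }
    fn flush(&mut self) {}
    fn close(&mut self) {}
}

/// Fold that sums its input and emits the total only on `close`.
struct SumFold<Next> {
    next: Next,
    acc: Option<i32>,
}

impl<Next: PushWithClose<Item = i32>> PushWithClose for SumFold<Next> {
    type Item = i32;

    fn send(&mut self, item: i32) {
        if let Some(acc) = self.acc.as_mut() {
            *acc += item;
        }
    }

    fn flush(&mut self) {
        // Not end-of-life, so the aggregate is held back.
        self.next.flush();
    }

    fn close(&mut self) {
        // `take` moves the total out, so no clone or `flushed` flag is needed.
        if let Some(total) = self.acc.take() {
            self.next.send(total);
        }
        self.next.close();
    }
}

/// Flushes midway through the input, then closes.
fn sum_with_intermediate_flush() -> Vec<i32> {
    let mut fold = SumFold { next: Collect { items: Vec::new() }, acc: Some(0) };
    fold.send(1);
    fold.send(2);
    fold.flush(); // emits nothing
    fold.send(3);
    fold.close(); // emits the total
    fold.next.items
}
```

With the two signals separated, an intermediate flush no longer decides when the aggregate is final; only `close` does.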

@MingweiSamuel MingweiSamuel marked this pull request as draft May 6, 2026 23:36