feat(dfir_pipes): add push-side Fold, Reduce, and Sort operators#2844
feat(dfir_pipes): add push-side Fold, Reduce, and Sort operators#2844MingweiSamuel wants to merge 1 commit intomainfrom
Fold, Reduce, and Sort operators#2844Conversation
Deploying hydro with
|
| Latest commit: |
39b0fbe
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://7eeb0c80.hydroflow.pages.dev |
| Branch Preview URL: | https://mingwei-push-fold.hydroflow.pages.dev |
These operators accumulate items during `start_send` and emit results downstream during `poll_flush`, enabling them to work on the push side of a subgraph without requiring a subgraph boundary. New push operators: - `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function, emits the accumulated value on flush - `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value, emits on flush (nothing if no items received) - `Sort<Item, Next>`: collects all items, sorts them, emits in sorted order on flush These are needed because removing `DelayType::Stratum` allows blocking operators to end up on the push side of a subgraph. Previously they were always pull-side due to the forced subgraph boundary on their input. This is part of the effort for singleton references in Hydro Co-authored-by: Infinity 🤖 <infinity@hydro.run> PR: #2844
9488a92 to
ebd5997
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds new push-side blocking combinators to dfir_pipes so aggregation/sorting can happen on the push side of a subgraph (accumulating during start_send and emitting during poll_flush), supporting the post-DelayType::Stratum execution model and ongoing singleton-reference work.
Changes:
- Introduces new push combinators:
Fold,Reduce, andSort. - Wires the new operators into
dfir_pipes::pushvia module declarations and re-exports. - Adds
Sortbehind theallocfeature gate.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| dfir_pipes/src/push/fold.rs | Adds push-side Fold combinator (accumulate on send, emit on flush). |
| dfir_pipes/src/push/reduce.rs | Adds push-side Reduce combinator (reduce into single value, emit on flush). |
| dfir_pipes/src/push/sort.rs | Adds push-side Sort combinator (buffer, sort, emit in order on flush). |
| dfir_pipes/src/push/mod.rs | Registers/re-exports the new push modules/operators (with Sort behind alloc). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| *this.flushed = true; | ||
| if let Some(value) = this.acc.as_ref() { | ||
| this.next.as_mut().start_send(value.clone(), ()); |
| if !*this.flushed { | ||
| *this.flushed = true; | ||
| if let Some(value) = this.acc.as_ref() { | ||
| this.next.as_mut().start_send(value.clone(), ()); | ||
| } | ||
| } | ||
| this.next.poll_flush(ctx) | ||
| } |
There was a problem hiding this comment.
I wonder if this indicates the trait needs a poll_close method to solely indicate end of life. Need to nail this down a bit better
| fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep<Self::CanPend> { | ||
| let mut this = self.project(); | ||
| if !*this.flushed { | ||
| *this.flushed = true; | ||
| let value = this.acc.clone(); | ||
| this.next.as_mut().start_send(value, ()); | ||
| } | ||
| this.next.poll_flush(ctx) | ||
| } |
| let mut this = self.project(); | ||
| if !*this.flushed { | ||
| *this.flushed = true; | ||
| let value = this.acc.clone(); | ||
| this.next.as_mut().start_send(value, ()); | ||
| } | ||
| this.next.poll_flush(ctx) |
There was a problem hiding this comment.
This is a good question
| pub use flat_map_stream::FlatMapStream; | ||
| pub use flatten::Flatten; | ||
| pub use flatten_stream::FlattenStream; | ||
| pub use fold::Fold; | ||
| pub use for_each::ForEach; | ||
| use futures_core::FusedStream; | ||
| pub use inspect::Inspect; | ||
| pub use map::Map; | ||
| #[cfg(feature = "alloc")] | ||
| #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] | ||
| pub use persist::Persist; | ||
| pub use reduce::Reduce; | ||
| pub use resolve_futures::ResolveFutures; | ||
| pub use sink::Sink; | ||
| pub use sink_compat::SinkCompat; | ||
| #[cfg(feature = "alloc")] | ||
| #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] | ||
| pub use sort::Sort; |
| pin_project! { | ||
| /// Push combinator that accumulates all items via a fold function, then emits | ||
| /// the accumulated value downstream on flush. | ||
| /// | ||
| /// During `start_send`, items are folded into the accumulator. | ||
| /// During `poll_flush`, the accumulated value is cloned and sent downstream, | ||
| /// then the downstream is flushed. | ||
| #[must_use = "`Push`es do nothing unless items are pushed into them"] | ||
| #[derive(Clone, Debug)] | ||
| pub struct Fold<Acc, CombFn, Next> { | ||
| #[pin] | ||
| next: Next, | ||
| acc: Acc, | ||
| comb_fn: CombFn, | ||
| flushed: bool, | ||
| } | ||
| } |
| pin_project! { | ||
| /// Push combinator that reduces all items into a single value, then emits | ||
| /// it downstream on flush. If no items were received, nothing is emitted. | ||
| /// | ||
| /// During `start_send`, items are reduced into the accumulator. | ||
| /// During `poll_flush`, the accumulated value (if any) is sent downstream. | ||
| #[must_use = "`Push`es do nothing unless items are pushed into them"] | ||
| #[derive(Clone, Debug)] | ||
| pub struct Reduce<Acc, ReduceFn, Next> { | ||
| #[pin] | ||
| next: Next, | ||
| acc: Option<Acc>, | ||
| reduce_fn: ReduceFn, | ||
| flushed: bool, | ||
| } | ||
| } |
| pin_project! { | ||
| /// Push combinator that collects all items, sorts them, then emits them | ||
| /// downstream in sorted order on flush. | ||
| #[must_use = "`Push`es do nothing unless items are pushed into them"] | ||
| #[derive(Clone, Debug)] | ||
| pub struct Sort<Item, Next> { | ||
| #[pin] | ||
| next: Next, | ||
| buf: Vec<Item>, | ||
| sorted: bool, | ||
| flush_idx: usize, | ||
| } | ||
| } |
These operators accumulate items during `start_send` and emit results downstream during `poll_flush`, enabling them to work on the push side of a subgraph without requiring a subgraph boundary. New push operators: - `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function, emits the accumulated value on flush - `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value, emits on flush (nothing if no items received) - `Sort<Item, Next>`: collects all items, sorts them, emits in sorted order on flush These are needed because removing `DelayType::Stratum` allows blocking operators to end up on the push side of a subgraph. Previously they were always pull-side due to the forced subgraph boundary on their input. This is part of the effort for singleton references in Hydro Co-authored-by: Infinity 🤖 <infinity@hydro.run> PR: #2844 Fix clippy missing_const_for_fn errors in dfir_pipes Added `const` qualifier to three `new()` functions in dfir_pipes/src/push/ that clippy identified as being eligible for const evaluation: - fold.rs: `Fold::new()` - reduce.rs: `Reduce::new()` - sort.rs: `Sort::new()` Co-authored-by: Infinity 🤖 <infinity@hydro.run> PR: #2844
dbba146 to
5d2ac0c
Compare
These operators accumulate items during `start_send` and emit results downstream during `poll_flush`, enabling them to work on the push side of a subgraph without requiring a subgraph boundary. New push operators: - `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function, emits the accumulated value on flush - `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value, emits on flush (nothing if no items received) - `Sort<Item, Next>`: collects all items, sorts them, emits in sorted order on flush These are needed because removing `DelayType::Stratum` allows blocking operators to end up on the push side of a subgraph. Previously they were always pull-side due to the forced subgraph boundary on their input. This is part of the effort for singleton references in Hydro Co-authored-by: Infinity 🤖 <infinity@hydro.run> PR: #2844 Fix clippy missing_const_for_fn errors in dfir_pipes Added `const` qualifier to three `new()` functions in dfir_pipes/src/push/ that clippy identified as being eligible for const evaluation: - fold.rs: `Fold::new()` - reduce.rs: `Reduce::new()` - sort.rs: `Sort::new()` Co-authored-by: Infinity 🤖 <infinity@hydro.run> PR: #2844
5d2ac0c to
39b0fbe
Compare
|
I think we should rename |
These operators accumulate items during
start_sendand emit resultsdownstream during
poll_flush, enabling them to work on the push sideof a subgraph without requiring a subgraph boundary.
New push operators:
Fold<Acc, CombFn, Next>: accumulates all items via a fold function,emits the accumulated value on flush
Reduce<Acc, ReduceFn, Next>: reduces items into a single value,emits on flush (nothing if no items received)
Sort<Item, Next>: collects all items, sorts them, emits in sortedorder on flush
These are needed because removing
DelayType::Stratumallows blockingoperators to end up on the push side of a subgraph. Previously they were
always pull-side due to the forced subgraph boundary on their input.
This is part of the effort for singleton references in Hydro
Co-authored-by: Infinity 🤖 infinity@hydro.run