Skip to content

Fix persist_mut push-side borrow conflict; remove Stratum#2849

Draft
MingweiSamuel wants to merge 12 commits intomainfrom
mingwei/dfir-push-ops
Draft

Fix persist_mut push-side borrow conflict; remove Stratum#2849
MingweiSamuel wants to merge 12 commits intomainfrom
mingwei/dfir-push-ops

Conversation

@MingweiSamuel
Copy link
Copy Markdown
Member

The borrow conflict was caused by is_first_run_this_tick() if/else in
persist_mut's pull codegen — the else branch kept op_1v1 (holding &mut iter)
alive without consuming it, conflicting with source_iter's drain in
write_iterator_after.

Fix: removed the if/else (is_first_run_this_tick always returns true in
inline DAG mode), so op_1v1 is always consumed by for_each, releasing the
borrow before the drain runs.

Changes:

  • persist_mut.rs: removed Stratum, removed is_first_run_this_tick if/else,
    added push-side terminal codegen (for_each accumulator)
  • persist_mut_keyed.rs: same treatment
  • meta_graph.rs: simplified pivot codegen (removed wrapper fn, direct .await)
  • Added test files for investigating the borrow conflict

Remaining issue: persist_mut_keyed on push side is terminal (doesn't emit
downstream), causing a type inference error in surface_persist test where
it's expected to emit (K, V) pairs. Needs proper push-with-output impl.

Co-authored-by: Infinity 🤖 infinity@hydro.run

@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages Bot commented May 6, 2026

Deploying hydro with  Cloudflare Pages  Cloudflare Pages

Latest commit: cf244f4
Status:🚫  Build failed.

View logs

@MingweiSamuel MingweiSamuel force-pushed the mingwei/dfir-push-ops branch from 6983c31 to 88f4415 Compare May 6, 2026 19:38
@MingweiSamuel MingweiSamuel force-pushed the mingwei/delete-delaytype branch from 917fadf to 278fbdc Compare May 6, 2026 19:45
@MingweiSamuel MingweiSamuel changed the base branch from mingwei/delete-delaytype to main May 6, 2026 19:45
@MingweiSamuel MingweiSamuel force-pushed the mingwei/dfir-push-ops branch 3 times, most recently from 20442db to 31885e7 Compare May 6, 2026 22:44
MingweiSamuel and others added 2 commits May 6, 2026 22:45
These operators accumulate items during `start_send` and emit results
downstream during `poll_flush`, enabling them to work on the push side
of a subgraph without requiring a subgraph boundary.

New push operators:
- `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function,
  emits the accumulated value on flush
- `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value,
  emits on flush (nothing if no items received)
- `Sort<Item, Next>`: collects all items, sorts them, emits in sorted
  order on flush

These are needed because removing `DelayType::Stratum` allows blocking
operators to end up on the push side of a subgraph. Previously they were
always pull-side due to the forced subgraph boundary on their input.

This is part of the effort for singleton references in Hydro

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2844

Fix clippy missing_const_for_fn errors in dfir_pipes

Added `const` qualifier to three `new()` functions in dfir_pipes/src/push/
that clippy identified as being eligible for const evaluation:
- fold.rs: `Fold::new()`
- reduce.rs: `Reduce::new()`
- sort.rs: `Sort::new()`

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2844
… mode

With the inline DAG codegen, each subgraph runs exactly once per tick.
The `is_first_run_this_tick()` method always returned `true` and was dead
code. This removes it and simplifies all operator codegen that depended on it.

Changes:
- Removed `Context::is_first_run_this_tick()` method from dfir_rs
- Simplified `reduce_keyed` and `fold_keyed`: removed `.then_some().into_iter().flatten()`
  wrapping, now directly iterates the hashtable
- Simplified `fold_no_replay` and `reduce_no_replay`: condition simplified from
  `__was_updated || (tick == 0 && is_first_run_this_tick())` to
  `__was_updated || tick == 0`
- Simplified `persist`: removed replay_idx logic, always replays from start
- Simplified `persist_mut` and `persist_mut_keyed`: removed if/else branch,
  always processes input and replays
- Simplified `anti_join`, `join_multiset_half`, `join_fused_lhs`: removed
  replay_idx, always replays from start of accumulated vec
- Simplified `cross_join_multiset`: removed lhs_i/rhs_i tracking, always
  produces full cross product
- Simplified `join`: passes `true` directly instead of calling the method
- Simplified `multiset_delta`: removed conditional, always swaps maps
- Removed unused `context` destructuring from `anti_join` and `multiset_delta`

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842
@MingweiSamuel MingweiSamuel force-pushed the mingwei/dfir-push-ops branch from 31885e7 to a247155 Compare May 6, 2026 22:59
MingweiSamuel and others added 10 commits May 6, 2026 23:14
Updated the doc comment on `Context` in `dfir_rs/src/scheduled/context.rs` to
reference the methods that actually exist on the type (`request_task`,
`current_tick`, `schedule_subgraph`) instead of the removed methods
(`add_state`, `set_state_lifespan_hook`, `state_ref_unchecked`) that were
flagged in the PR review.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842

Address PR #2842 review comments: remove unnecessary `[..]` slice syntax

Addressed all 5 unresolved self-review comments from the PR author:
- dfir_lang/src/graph/ops/anti_join.rs: Removed `[..]` and inlined the
  iterator assignment (merged two `let iter` lines into one)
- dfir_lang/src/graph/ops/cross_join_multiset.rs: Removed `[..]` from
  `#rhs_state[..].iter()`
- dfir_lang/src/graph/ops/join_fused_lhs.rs: Removed `[..]` from
  `#rhs_borrow_ident[..].iter()`
- dfir_lang/src/graph/ops/join_multiset_half.rs: Removed `[..]` from
  `#probe_ident[..].iter()`
- dfir_lang/src/graph/ops/persist.rs: Removed `[..]` from
  `#vec_ident[..].iter()`

The `[..]` was unnecessary since `.iter()` on a Vec already returns a
slice iterator. Verified with `cargo check -p dfir_lang`.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842
Updates the fold operator to use the new `dfir_pipes::push::Fold` struct
when on the push side of a subgraph, enabling fold to work without a
forced subgraph boundary on its input.

Changes:
- Updated `fold.rs` codegen: push-side now uses `push::Fold::new()` when
  there are downstream outputs, falls back to terminal `for_each` when
  fold is a singleton reference target with no outputs.
- Restored `DelayType::Stratum` variant (with TODO to remove once all
  blocking operators have push implementations).
- Restored `Stratum` for `persist_mut` and `persist_mut_keyed` which still
  require pull-side execution (they replay accumulated state).
- Updated `graph_write.rs` to handle `Stratum` in mermaid rendering.

All tests compile without proc macro panics. Snapshot tests need
regeneration due to changed subgraph partitioning.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Updates sort and reduce to use their dfir_pipes push structs when on the
push side of a subgraph. Temporarily restores DelayType::Stratum for
fold_keyed, reduce_keyed, persist_mut, and persist_mut_keyed which still
need pull-side execution (they emit multiple items and need more complex
push implementations).

Changes:
- `sort.rs`: push-side uses `push::Sort::new(output)`
- `reduce.rs`: push-side uses `push::Reduce::new(&mut state, func, output)`
  when there are downstream outputs, terminal `for_each` otherwise
- `dfir_pipes::push::Fold`: updated to use `&mut Acc` reference pattern
- `dfir_pipes::push::Reduce`: updated to use `&mut Option<Item>` reference
- Restored `DelayType::Stratum` for fold_keyed, reduce_keyed, persist_mut,
  persist_mut_keyed (TODO: implement push-side for these)
- Added `surface_push_blocking.rs` test file verifying sort and reduce
  work correctly on the push side

All tests pass. No proc macro panics.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Adds push-side support for fold_keyed using the new dfir_pipes::push::FoldKeyed
struct. The struct is generic over the HashMap hasher type to work with
FxHashMap. reduce_keyed remains pull-only (Stratum) for now due to its
different initialization semantics.

Changes:
- `dfir_pipes::push::FoldKeyed`: impl is now generic over `HashMap<K, V, S>`
  with `S: BuildHasher`, fixing type mismatch with FxHashMap
- `fold_keyed.rs` codegen: removed `assert!(is_pull)`, added push branch
  using `push::FoldKeyed::new(&mut state, init_fn, comb_fn, output)`
- `reduce_keyed.rs`: kept `DelayType::Stratum` (pull-only for now)
- Updated test: `test_fold_keyed_push` passes

Operators still requiring Stratum (pull-only):
- reduce_keyed (different init semantics)
- persist_mut / persist_mut_keyed (replay semantics)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Adds ReduceKeyed push struct to dfir_pipes and updates reduce_keyed
operator codegen to use it on the push side. The ReduceKeyed struct
uses the entry API to insert the first value directly and reduce
subsequent values.

Changes:
- New `dfir_pipes::push::ReduceKeyed` struct: uses `Entry::Vacant` to
  insert first value, `Entry::Occupied` to reduce subsequent values
- `reduce_keyed.rs` codegen: removed `assert!(is_pull)` and Stratum,
  added push branch using `push::ReduceKeyed::new(&mut state, func, output)`
- Added `test_reduce_keyed_push` test

All blocking operators now have push-side support except:
- persist_mut / persist_mut_keyed (replay semantics, still uses Stratum)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Adds ReduceKeyed push struct and updates reduce_keyed codegen. The
persist_mut operators remain pull-only (Stratum) due to borrow conflicts
in the generated code when on the push side.

Changes:
- New `dfir_pipes::push::ReduceKeyed` struct: inserts first value for
  each key, reduces subsequent values via entry API
- `reduce_keyed.rs` codegen: removed assert, added push branch
- `persist_mut.rs` / `persist_mut_keyed.rs`: kept Stratum barrier.
  Push-side codegen exists but causes borrow conflicts — needs deeper
  codegen restructuring to fix.
- Added `test_reduce_keyed_push` test

Summary of push-side support:
- ✅ fold, reduce, sort, fold_keyed, reduce_keyed
- ❌ persist_mut, persist_mut_keyed (kept Stratum, needs codegen work)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Added expanded test file to reproduce and investigate the borrow conflict
that occurs when persist_mut is on the push side (no Stratum barrier).

Root cause: The codegen emits `Iterator::for_each(&mut iter, drop)` after
the pivot `.await` to drain remaining source_iter items. But the `&mut iter`
borrow from `pull::iter(&mut iter)` at the start of the subgraph is still
considered alive by the borrow checker because the Pull is wrapped in opaque
`impl Pull` type guards (pin_project structs) that hide the borrow release.

The conflict is between:
1. `pull::iter(&mut sg_1v1_node_1v1_iter)` — first mutable borrow
2. `Iterator::for_each(&mut sg_1v1_node_1v1_iter, drop)` — second mutable borrow

The borrow checker can't see through the opaque type to know (1) is dead
after the `.await`. This is a general codegen issue, not specific to
persist_mut — it affects any graph where source_iter's cleanup code runs
in the same subgraph scope as the Pull chain that borrows the iterator.

Files:
- `surface_persist_mut_push.rs`: minimal DFIR test that triggers the error
- `surface_persist_mut_push_expanded.rs`: cleaned-up expansion for experimenting

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
The borrow conflict was caused by `is_first_run_this_tick()` if/else in
persist_mut's pull codegen — the else branch kept `op_1v1` (holding &mut iter)
alive without consuming it, conflicting with source_iter's drain in
write_iterator_after.

Fix: removed the if/else (is_first_run_this_tick always returns true in
inline DAG mode), so op_1v1 is always consumed by for_each, releasing the
borrow before the drain runs.

Changes:
- `persist_mut.rs`: removed Stratum, removed is_first_run_this_tick if/else,
  added push-side terminal codegen (for_each accumulator)
- `persist_mut_keyed.rs`: same treatment
- `meta_graph.rs`: simplified pivot codegen (removed wrapper fn, direct .await)
- Added test files for investigating the borrow conflict

Remaining issue: persist_mut_keyed on push side is terminal (doesn't emit
downstream), causing a type inference error in surface_persist test where
it's expected to emit (K, V) pairs. Needs proper push-with-output impl.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Both persist_mut operators need to emit their accumulated state downstream
on flush (not just accumulate terminally). The push-side terminal for_each
codegen doesn't emit anything, causing test failures. These operators need
a custom dfir_pipes push struct that iterates the SparseVec/HashMap state
during poll_flush — similar to Sort/FoldKeyed but with persist semantics.

Restored Stratum for both until proper push-with-output is implemented.

The borrow conflict fix (removing is_first_run_this_tick if/else) is still
in place and correct — the issue was purely about the terminal vs
emit-on-flush semantics.

All tests pass. No proc macro panics.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Both operators now work on the push side using Fold + flat_map pattern:
- Fold accumulates Persistence/PersistenceKeyed items into the external state
- On flush, Fold clones the state and emits it to flat_map
- flat_map iterates the cloned state and emits individual items downstream

Changes:
- `persist_mut.rs`: push-side uses Fold + flat_map(SparseVec → Vec<T>)
- `persist_mut_keyed.rs`: push-side uses Fold + flat_map(HashMap → Vec<(K,V)>)
- `SparseVec`: added #[derive(Clone)] to support Fold's clone-on-flush
- Removed DelayType::Stratum from both operators

All tests pass. No more operators require Stratum for push-side support.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
@MingweiSamuel MingweiSamuel force-pushed the mingwei/dfir-push-ops branch from a247155 to cf244f4 Compare May 6, 2026 23:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant