Fix persist_mut push-side borrow conflict; remove Stratum #2849
Draft
MingweiSamuel wants to merge 12 commits into main
6983c31 to 88f4415
917fadf to 278fbdc
20442db to 31885e7
These operators accumulate items during `start_send` and emit results downstream during `poll_flush`, enabling them to work on the push side of a subgraph without requiring a subgraph boundary.

New push operators:
- `Fold<Acc, CombFn, Next>`: accumulates all items via a fold function, emits the accumulated value on flush
- `Reduce<Acc, ReduceFn, Next>`: reduces items into a single value, emits on flush (nothing if no items received)
- `Sort<Item, Next>`: collects all items, sorts them, emits in sorted order on flush

These are needed because removing `DelayType::Stratum` allows blocking operators to end up on the push side of a subgraph. Previously they were always pull-side due to the forced subgraph boundary on their input.

This is part of the effort for singleton references in Hydro.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2844

Fix clippy missing_const_for_fn errors in dfir_pipes

Added `const` qualifier to three `new()` functions in dfir_pipes/src/push/ that clippy identified as being eligible for const evaluation:
- fold.rs: `Fold::new()`
- reduce.rs: `Reduce::new()`
- sort.rs: `Sort::new()`

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2844
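
To make the accumulate-on-send, emit-on-flush behavior described above concrete, here is a minimal sketch. It is not the `dfir_pipes` implementation: the `Push` trait below is a hypothetical, synchronous stand-in (the real operators use `poll_flush`), `run` is an illustrative helper, and this `Fold` owns its accumulator for simplicity.

```rust
/// Hypothetical, simplified push-side sink (an assumption, not the `dfir_pipes` trait).
pub trait Push<Item> {
    /// Receives one item.
    fn start_send(&mut self, item: Item);
    /// Signals that no more items will arrive for now.
    fn flush(&mut self);
}

/// A `Vec` acts as a terminal sink that records everything it receives.
impl<T> Push<T> for Vec<T> {
    fn start_send(&mut self, item: T) {
        self.push(item);
    }
    fn flush(&mut self) {}
}

/// Sketch of a blocking fold: accumulates on send, emits one value on flush.
pub struct Fold<Acc, CombFn, Next> {
    acc: Option<Acc>,
    comb: CombFn,
    next: Next,
}

impl<Acc, CombFn, Next> Fold<Acc, CombFn, Next> {
    /// `const` mirrors the clippy fix mentioned above; the signature itself is assumed.
    pub const fn new(init: Acc, comb: CombFn, next: Next) -> Self {
        Self { acc: Some(init), comb, next }
    }

    /// Returns the downstream sink so its contents can be inspected.
    pub fn into_next(self) -> Next {
        self.next
    }
}

impl<Item, Acc, CombFn, Next> Push<Item> for Fold<Acc, CombFn, Next>
where
    CombFn: FnMut(&mut Acc, Item),
    Next: Push<Acc>,
{
    fn start_send(&mut self, item: Item) {
        // Nothing is emitted here; the item is only folded into the accumulator.
        if let Some(acc) = self.acc.as_mut() {
            (self.comb)(acc, item);
        }
    }

    fn flush(&mut self) {
        // Emit the accumulated value downstream (once), then flush downstream.
        if let Some(acc) = self.acc.take() {
            self.next.start_send(acc);
        }
        self.next.flush();
    }
}

/// Illustrative driver: sends every item, then flushes.
pub fn run<Item, P: Push<Item>>(items: impl IntoIterator<Item = Item>, sink: &mut P) {
    for item in items {
        sink.start_send(item);
    }
    sink.flush();
}
```

Because nothing reaches `next` until `flush`, an operator shaped like this can sit in the middle of a push chain without a subgraph boundary in front of it.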
… mode

With the inline DAG codegen, each subgraph runs exactly once per tick. The `is_first_run_this_tick()` method always returned `true` and was dead code. This removes it and simplifies all operator codegen that depended on it.

Changes:
- Removed `Context::is_first_run_this_tick()` method from dfir_rs
- Simplified `reduce_keyed` and `fold_keyed`: removed `.then_some().into_iter().flatten()` wrapping, now directly iterates the hashtable
- Simplified `fold_no_replay` and `reduce_no_replay`: condition simplified from `__was_updated || (tick == 0 && is_first_run_this_tick())` to `__was_updated || tick == 0`
- Simplified `persist`: removed replay_idx logic, always replays from start
- Simplified `persist_mut` and `persist_mut_keyed`: removed if/else branch, always processes input and replays
- Simplified `anti_join`, `join_multiset_half`, `join_fused_lhs`: removed replay_idx, always replays from start of accumulated vec
- Simplified `cross_join_multiset`: removed lhs_i/rhs_i tracking, always produces full cross product
- Simplified `join`: passes `true` directly instead of calling the method
- Simplified `multiset_delta`: removed conditional, always swaps maps
- Removed unused `context` destructuring from `anti_join` and `multiset_delta`

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842
31885e7 to a247155
Updated the doc comment on `Context` in `dfir_rs/src/scheduled/context.rs` to reference the methods that actually exist on the type (`request_task`, `current_tick`, `schedule_subgraph`) instead of the removed methods (`add_state`, `set_state_lifespan_hook`, `state_ref_unchecked`) that were flagged in the PR review.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842

Address PR #2842 review comments: remove unnecessary `[..]` slice syntax

Addressed all 5 unresolved self-review comments from the PR author:
- dfir_lang/src/graph/ops/anti_join.rs: Removed `[..]` and inlined the iterator assignment (merged two `let iter` lines into one)
- dfir_lang/src/graph/ops/cross_join_multiset.rs: Removed `[..]` from `#rhs_state[..].iter()`
- dfir_lang/src/graph/ops/join_fused_lhs.rs: Removed `[..]` from `#rhs_borrow_ident[..].iter()`
- dfir_lang/src/graph/ops/join_multiset_half.rs: Removed `[..]` from `#probe_ident[..].iter()`
- dfir_lang/src/graph/ops/persist.rs: Removed `[..]` from `#vec_ident[..].iter()`

The `[..]` was unnecessary since `.iter()` on a Vec already returns a slice iterator. Verified with `cargo check -p dfir_lang`.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2842
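
The claim that `[..]` is redundant can be checked with a small standalone sketch. The function names are illustrative only and do not appear in the codegen; both bind the iterator to the same `std::slice::Iter` type.

```rust
use std::slice::Iter;

/// Iterates through an explicit full-range slice, as the old codegen did.
pub fn sum_with_slice(items: &Vec<i32>) -> i32 {
    let iter: Iter<'_, i32> = items[..].iter();
    iter.sum()
}

/// Iterates the `Vec` directly; auto-deref to `[i32]` yields the same iterator type.
pub fn sum_direct(items: &Vec<i32>) -> i32 {
    let iter: Iter<'_, i32> = items.iter();
    iter.sum()
}
```

Both annotations compile with the identical type, so dropping `[..]` changes nothing about the generated iteration.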
Updates the fold operator to use the new `dfir_pipes::push::Fold` struct when on the push side of a subgraph, enabling fold to work without a forced subgraph boundary on its input.

Changes:
- Updated `fold.rs` codegen: push-side now uses `push::Fold::new()` when there are downstream outputs, falls back to terminal `for_each` when fold is a singleton reference target with no outputs.
- Restored `DelayType::Stratum` variant (with TODO to remove once all blocking operators have push implementations).
- Restored `Stratum` for `persist_mut` and `persist_mut_keyed` which still require pull-side execution (they replay accumulated state).
- Updated `graph_write.rs` to handle `Stratum` in mermaid rendering.

All tests compile without proc macro panics. Snapshot tests need regeneration due to changed subgraph partitioning.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Updates sort and reduce to use their dfir_pipes push structs when on the push side of a subgraph. Temporarily restores DelayType::Stratum for fold_keyed, reduce_keyed, persist_mut, and persist_mut_keyed which still need pull-side execution (they emit multiple items and need more complex push implementations).

Changes:
- `sort.rs`: push-side uses `push::Sort::new(output)`
- `reduce.rs`: push-side uses `push::Reduce::new(&mut state, func, output)` when there are downstream outputs, terminal `for_each` otherwise
- `dfir_pipes::push::Fold`: updated to use `&mut Acc` reference pattern
- `dfir_pipes::push::Reduce`: updated to use `&mut Option<Item>` reference
- Restored `DelayType::Stratum` for fold_keyed, reduce_keyed, persist_mut, persist_mut_keyed (TODO: implement push-side for these)
- Added `surface_push_blocking.rs` test file verifying sort and reduce work correctly on the push side

All tests pass. No proc macro panics.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Adds push-side support for fold_keyed using the new dfir_pipes::push::FoldKeyed struct. The struct is generic over the HashMap hasher type to work with FxHashMap. reduce_keyed remains pull-only (Stratum) for now due to its different initialization semantics.

Changes:
- `dfir_pipes::push::FoldKeyed`: impl is now generic over `HashMap<K, V, S>` with `S: BuildHasher`, fixing type mismatch with FxHashMap
- `fold_keyed.rs` codegen: removed `assert!(is_pull)`, added push branch using `push::FoldKeyed::new(&mut state, init_fn, comb_fn, output)`
- `reduce_keyed.rs`: kept `DelayType::Stratum` (pull-only for now)
- Updated test: `test_fold_keyed_push` passes

Operators still requiring Stratum (pull-only):
- reduce_keyed (different init semantics)
- persist_mut / persist_mut_keyed (replay semantics)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
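
The hasher-generic signature mentioned above can be sketched as follows. This is an illustration under assumptions, not the `FoldKeyed` source: `fold_keyed_into` is a hypothetical free function, and it uses only `std` so that any `S: BuildHasher` (including the one behind FxHashMap) would fit.

```rust
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};

/// Folds `(key, value)` pairs into `state`, creating each key's accumulator with `init`.
/// Generic over the hasher `S`, so it is not tied to the default `RandomState`.
pub fn fold_keyed_into<K, V, A, S>(
    state: &mut HashMap<K, A, S>,
    items: impl IntoIterator<Item = (K, V)>,
    mut init: impl FnMut() -> A,
    mut comb: impl FnMut(&mut A, V),
) where
    K: Eq + Hash,
    S: BuildHasher,
{
    for (key, value) in items {
        // First sight of a key initializes its accumulator, then every value is combined in.
        let acc = state.entry(key).or_insert_with(&mut init);
        comb(acc, value);
    }
}
```

Had the state been written as `HashMap<K, A>`, the default `RandomState` hasher would be baked into the type, which is the kind of mismatch the commit describes for FxHashMap.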
Adds ReduceKeyed push struct to dfir_pipes and updates reduce_keyed operator codegen to use it on the push side. The ReduceKeyed struct uses the entry API to insert the first value directly and reduce subsequent values.

Changes:
- New `dfir_pipes::push::ReduceKeyed` struct: uses `Entry::Vacant` to insert first value, `Entry::Occupied` to reduce subsequent values
- `reduce_keyed.rs` codegen: removed `assert!(is_pull)` and Stratum, added push branch using `push::ReduceKeyed::new(&mut state, func, output)`
- Added `test_reduce_keyed_push` test

All blocking operators now have push-side support except:
- persist_mut / persist_mut_keyed (replay semantics, still uses Stratum)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
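
The entry-API handling described above (insert the first value, reduce later ones) might look roughly like this per item. `reduce_keyed_insert` is a hypothetical helper for illustration, not the `ReduceKeyed` struct itself.

```rust
use std::collections::hash_map::{Entry, HashMap};
use std::hash::Hash;

/// Applies one `(key, value)` pair to the keyed reduction state.
pub fn reduce_keyed_insert<K, V>(
    state: &mut HashMap<K, V>,
    key: K,
    value: V,
    mut func: impl FnMut(&mut V, V),
) where
    K: Eq + Hash,
{
    match state.entry(key) {
        // First value for this key: store it as-is, no init function required.
        Entry::Vacant(slot) => {
            slot.insert(value);
        }
        // Later values: reduce into the stored one.
        Entry::Occupied(mut slot) => func(slot.get_mut(), value),
    }
}
```

This is where reduce_keyed differs from fold_keyed: there is no separate initial accumulator, so the first item for a key becomes the accumulator.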
Adds ReduceKeyed push struct and updates reduce_keyed codegen. The persist_mut operators remain pull-only (Stratum) due to borrow conflicts in the generated code when on the push side.

Changes:
- New `dfir_pipes::push::ReduceKeyed` struct: inserts first value for each key, reduces subsequent values via entry API
- `reduce_keyed.rs` codegen: removed assert, added push branch
- `persist_mut.rs` / `persist_mut_keyed.rs`: kept Stratum barrier. Push-side codegen exists but causes borrow conflicts; needs deeper codegen restructuring to fix.
- Added `test_reduce_keyed_push` test

Summary of push-side support:
- ✅ fold, reduce, sort, fold_keyed, reduce_keyed
- ❌ persist_mut, persist_mut_keyed (kept Stratum, needs codegen work)

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Added expanded test file to reproduce and investigate the borrow conflict that occurs when persist_mut is on the push side (no Stratum barrier).

Root cause: The codegen emits `Iterator::for_each(&mut iter, drop)` after the pivot `.await` to drain remaining source_iter items. But the `&mut iter` borrow from `pull::iter(&mut iter)` at the start of the subgraph is still considered alive by the borrow checker because the Pull is wrapped in opaque `impl Pull` type guards (pin_project structs) that hide the borrow release.

The conflict is between:
1. `pull::iter(&mut sg_1v1_node_1v1_iter)`: first mutable borrow
2. `Iterator::for_each(&mut sg_1v1_node_1v1_iter, drop)`: second mutable borrow

The borrow checker can't see through the opaque type to know (1) is dead after the `.await`.

This is a general codegen issue, not specific to persist_mut: it affects any graph where source_iter's cleanup code runs in the same subgraph scope as the Pull chain that borrows the iterator.

Files:
- `surface_persist_mut_push.rs`: minimal DFIR test that triggers the error
- `surface_persist_mut_push_expanded.rs`: cleaned-up expansion for experimenting

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
The borrow conflict was caused by the `is_first_run_this_tick()` if/else in persist_mut's pull codegen: the else branch kept `op_1v1` (holding &mut iter) alive without consuming it, conflicting with source_iter's drain in write_iterator_after.

Fix: removed the if/else (is_first_run_this_tick always returns true in inline DAG mode), so op_1v1 is always consumed by for_each, releasing the borrow before the drain runs.

Changes:
- `persist_mut.rs`: removed Stratum, removed is_first_run_this_tick if/else, added push-side terminal codegen (for_each accumulator)
- `persist_mut_keyed.rs`: same treatment
- `meta_graph.rs`: simplified pivot codegen (removed wrapper fn, direct .await)
- Added test files for investigating the borrow conflict

Remaining issue: persist_mut_keyed on push side is terminal (doesn't emit downstream), causing a type inference error in surface_persist test where it's expected to emit (K, V) pairs. Needs proper push-with-output impl.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
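
The shape of this conflict can be reproduced outside of DFIR. The sketch below is an analogy under assumptions, not the generated code: `Op` is a hypothetical stand-in for `op_1v1`, and its empty `Drop` impl plays the role of the opaque wrapper types, forcing the borrow to last until the value is consumed or goes out of scope.

```rust
/// Hypothetical stand-in for the generated pull operator holding `&mut iter`.
pub struct Op<'a, I: Iterator> {
    inner: &'a mut I,
}

/// Having a destructor means the borrow stays alive until `Op` is moved or dropped.
impl<'a, I: Iterator> Drop for Op<'a, I> {
    fn drop(&mut self) {}
}

impl<'a, I: Iterator> Op<'a, I> {
    /// Consumes the operator, pulling at most `limit` items into `out`.
    pub fn consume_into(self, limit: usize, out: &mut Vec<I::Item>) {
        out.extend((&mut *self.inner).take(limit));
        // `self` is dropped here, which ends the `&mut` borrow of the iterator.
    }
}

/// Returns the items the operator pulled and how many the drain step discarded.
pub fn run_subgraph() -> (Vec<i32>, usize) {
    let mut iter = vec![1, 2, 3, 4].into_iter();
    let mut out = Vec::new();

    let op = Op { inner: &mut iter };
    // Shape of the old codegen (does not compile, error E0499):
    //     if first_run { op.consume_into(2, &mut out); } else { /* op left alive */ }
    // On the else path `op` would live to the end of the scope, overlapping the drain below.
    op.consume_into(2, &mut out);

    // Drain step, analogous to `Iterator::for_each(&mut iter, drop)` in the codegen.
    let drained = iter.by_ref().count();
    (out, drained)
}
```

With the operator consumed unconditionally, the second mutable borrow of `iter` starts only after the first has ended, which mirrors the fix described above.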
Both persist_mut operators need to emit their accumulated state downstream on flush (not just accumulate terminally). The push-side terminal for_each codegen doesn't emit anything, causing test failures.

These operators need a custom dfir_pipes push struct that iterates the SparseVec/HashMap state during poll_flush, similar to Sort/FoldKeyed but with persist semantics.

Restored Stratum for both until proper push-with-output is implemented. The borrow conflict fix (removing is_first_run_this_tick if/else) is still in place and correct; the issue was purely about the terminal vs emit-on-flush semantics.

All tests pass. No proc macro panics.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
Both operators now work on the push side using Fold + flat_map pattern:
- Fold accumulates Persistence/PersistenceKeyed items into the external state
- On flush, Fold clones the state and emits it to flat_map
- flat_map iterates the cloned state and emits individual items downstream

Changes:
- `persist_mut.rs`: push-side uses Fold + flat_map(SparseVec → Vec<T>)
- `persist_mut_keyed.rs`: push-side uses Fold + flat_map(HashMap → Vec<(K,V)>)
- `SparseVec`: added #[derive(Clone)] to support Fold's clone-on-flush
- Removed DelayType::Stratum from both operators

All tests pass. No more operators require Stratum for push-side support.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>
PR: #2849
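
As a rough illustration of the Fold + flat_map pattern listed above, one tick could be modeled like this. It is a simplification under assumptions: a plain `Vec<T>` stands in for `SparseVec`, `persist_tick` is a hypothetical name, and deletions are ignored.

```rust
/// Models one tick: accumulate into persistent state, then replay a clone of it.
pub fn persist_tick<T: Clone>(
    state: &mut Vec<T>,
    new_items: impl IntoIterator<Item = T>,
) -> Vec<T> {
    // "Fold" step: new items are accumulated into the state that outlives the tick.
    state.extend(new_items);

    // "Flush" step: clone the state so that `state` itself survives for the next tick.
    let snapshot = state.clone();

    // "flat_map" step: turn the single snapshot into individual downstream items.
    std::iter::once(snapshot)
        .flat_map(|items| items.into_iter())
        .collect()
}
```

The clone is what the added `#[derive(Clone)]` on `SparseVec` supports: the state has to remain intact after being emitted so that later ticks can replay it again.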
a247155 to cf244f4
The borrow conflict was caused by the `is_first_run_this_tick()` if/else in `persist_mut`'s pull codegen: the else branch kept `op_1v1` (holding `&mut iter`) alive without consuming it, conflicting with source_iter's drain in `write_iterator_after`.

Fix: removed the if/else (is_first_run_this_tick always returns true in inline DAG mode), so op_1v1 is always consumed by for_each, releasing the borrow before the drain runs.

Changes:
- `persist_mut.rs`: removed Stratum, removed is_first_run_this_tick if/else, added push-side terminal codegen (for_each accumulator)
- `persist_mut_keyed.rs`: same treatment
- `meta_graph.rs`: simplified pivot codegen (removed wrapper fn, direct .await)

Remaining issue: persist_mut_keyed on push side is terminal (doesn't emit downstream), causing a type inference error in surface_persist test where it's expected to emit (K, V) pairs. Needs proper push-with-output impl.

Co-authored-by: Infinity 🤖 <infinity@hydro.run>