diff --git a/dfir_lang/src/graph/graph_write.rs b/dfir_lang/src/graph/graph_write.rs index f6ace1294b24..9a82dd12fbb2 100644 --- a/dfir_lang/src/graph/graph_write.rs +++ b/dfir_lang/src/graph/graph_write.rs @@ -181,7 +181,7 @@ where src = src_str.trim(), arrow_body = "--", arrow_head = match delay_type { - None | Some(DelayType::MonotoneAccum) => ">", + None => ">", Some(DelayType::Stratum) => "x", Some(DelayType::Tick | DelayType::TickLazy) => "o", }, @@ -201,7 +201,6 @@ where self.link_count, match delay_type { DelayType::Stratum | DelayType::Tick | DelayType::TickLazy => "red", - DelayType::MonotoneAccum => "#060", } )?; } diff --git a/dfir_lang/src/graph/meta_graph.rs b/dfir_lang/src/graph/meta_graph.rs index ffa50128c5b9..fabd759433d5 100644 --- a/dfir_lang/src/graph/meta_graph.rs +++ b/dfir_lang/src/graph/meta_graph.rs @@ -1422,16 +1422,7 @@ impl DfirGraph { ); let root = change_spans(root.clone(), pivot_span); subgraph_op_iter_code.push(quote_spanned! {pivot_span=> - #[inline(always)] - fn #pivot_fn_ident(pull: Pul, push: Psh) - -> impl ::std::future::Future - where - Pul: #root::dfir_pipes::pull::Pull, - Psh: #root::dfir_pipes::push::Push, - { - #root::dfir_pipes::pull::Pull::send_push(pull, push) - } - (#pivot_fn_ident)(#pull_ident, #push_ident).await; + #root::dfir_pipes::pull::Pull::send_push(#pull_ident, #push_ident).await; }); } }; diff --git a/dfir_lang/src/graph/ops/anti_join.rs b/dfir_lang/src/graph/ops/anti_join.rs index 2090d5fa4660..4f7693467cc5 100644 --- a/dfir_lang/src/graph/ops/anti_join.rs +++ b/dfir_lang/src/graph/ops/anti_join.rs @@ -45,7 +45,6 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { }, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, @@ -118,12 +117,6 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { let #ident = { #accum_neg - let replay_idx = if #context.is_first_run_this_tick() { - 0 - } else { - #pos_ident.len() - }; - // Accum into pos vec let fut = #root::dfir_pipes::pull::Pull::for_each(#input_pos, |kv| { #pos_ident.push(kv); @@ -131,8 +124,7 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints { let () = #work_fn_async(fut).await; // Replay out of pos vec - let iter = #pos_ident[replay_idx..].iter(); - let iter = ::std::iter::Iterator::filter(iter, |(k, _)| { + let iter = ::std::iter::Iterator::filter(#pos_ident.iter(), |(k, _)| { !#neg_ident.contains(k) }); let iter = ::std::iter::Iterator::cloned(iter); diff --git a/dfir_lang/src/graph/ops/cross_join_multiset.rs b/dfir_lang/src/graph/ops/cross_join_multiset.rs index 06335f53402f..7a1f5f06eca7 100644 --- a/dfir_lang/src/graph/ops/cross_join_multiset.rs +++ b/dfir_lang/src/graph/ops/cross_join_multiset.rs @@ -41,7 +41,6 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, @@ -74,33 +73,17 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints { _ => Default::default(), }; - let lhs_i = wc.make_ident("lhs_i"); - let rhs_i = wc.make_ident("rhs_i"); let write_iterator = quote_spanned! {op_span=> - let (#lhs_i, #rhs_i) = if #context.is_first_run_this_tick() { - (0, 0) - } else { - (#lhs_state.len(), #rhs_state.len()) - }; - #work_fn_async(#root::dfir_pipes::pull::Pull::for_each(#lhs, |x| #lhs_state.push(x))).await; #work_fn_async(#root::dfir_pipes::pull::Pull::for_each(#rhs, |x| #rhs_state.push(x))).await; - // RHS - // +-----+-----+ - // L | Old | New | - // H +-----+-----+ - // S | New | New | - // +-----+-----+ let #ident = #root::dfir_pipes::pull::iter( #lhs_state .iter() - .enumerate() - .flat_map(|(i, lhs)| { - let j = if i < #lhs_i { #rhs_i } else { 0 }; - #rhs_state[j..] - .iter() - .map(move |rhs| (::std::clone::Clone::clone(lhs), ::std::clone::Clone::clone(rhs))) + .flat_map(|lhs| { + #rhs_state + .iter() + .map(move |rhs| (::std::clone::Clone::clone(lhs), ::std::clone::Clone::clone(rhs))) }) ); }; diff --git a/dfir_lang/src/graph/ops/fold.rs b/dfir_lang/src/graph/ops/fold.rs index 578be2330702..990a25feee7f 100644 --- a/dfir_lang/src/graph/ops/fold.rs +++ b/dfir_lang/src/graph/ops/fold.rs @@ -127,8 +127,8 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { ) ); } - } else { - assert_eq!(0, outputs.len()); + } else if outputs.is_empty() { + // Terminal push: fold is a singleton reference target with no downstream. quote_spanned! {op_span=> let #ident = #root::dfir_pipes::push::for_each(|#item_ident| { #assign_accum_ident @@ -136,6 +136,15 @@ pub const FOLD: OperatorConstraints = OperatorConstraints { #foreach_body }); } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Fold::new( + &mut #singleton_output_ident, + |#accumulator_ident: &mut _, #item_ident| { #foreach_body }, + #output, + ); + } }; Ok(OperatorWriteOutput { diff --git a/dfir_lang/src/graph/ops/fold_keyed.rs b/dfir_lang/src/graph/ops/fold_keyed.rs index 04432c93df5c..16721bb23ae9 100644 --- a/dfir_lang/src/graph/ops/fold_keyed.rs +++ b/dfir_lang/src/graph/ops/fold_keyed.rs @@ -1,7 +1,7 @@ use quote::{ToTokens, quote_spanned}; use super::{ - DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, + OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput, Persistence, RANGE_1, WriteContextArgs, }; @@ -80,17 +80,16 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { - context, op_span, work_fn_async, ident, inputs, + outputs, singleton_output_ident, is_pull, root, - op_name, op_inst: OperatorInstance { generics: @@ -105,8 +104,6 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { .. }, _| { - assert!(is_pull, "TODO(mingwei): `{}` only supports pull.", op_name); - let persistence = match persistence_args[..] { [] => Persistence::Tick, [a] => a, @@ -145,7 +142,17 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { let mut #hashtable_ident = &mut #singleton_output_ident; }; - let write_iterator = if Persistence::Mutable == persistence { + let write_iterator = if !is_pull { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::FoldKeyed::new( + &mut #singleton_output_ident, + #initfn, + #aggfn, + #output, + ); + } + } else if Persistence::Mutable == persistence { quote_spanned! {op_span=> #assign_hashtable_ident @@ -202,13 +209,8 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints { ) }, Persistence::Static => quote_spanned! {op_span=> - // Play everything but only on the first run of this tick/stratum. - // (We know we won't have any more inputs, so it is fine to only play once. - // Because of the `DelayType::Stratum` or `DelayType::MonotoneAccum`). - #context.is_first_run_this_tick() - .then_some(#hashtable_ident.iter()) - .into_iter() - .flatten() + // Play everything (each subgraph runs exactly once per tick). + #hashtable_ident.iter() .map( #[allow(suspicious_double_ref_op, clippy::clone_on_copy)] |(k, v)| ( diff --git a/dfir_lang/src/graph/ops/fold_no_replay.rs b/dfir_lang/src/graph/ops/fold_no_replay.rs index ec60bf9b5ea4..8225e591a8b2 100644 --- a/dfir_lang/src/graph/ops/fold_no_replay.rs +++ b/dfir_lang/src/graph/ops/fold_no_replay.rs @@ -106,7 +106,7 @@ pub const FOLD_NO_REPLAY: OperatorConstraints = OperatorConstraints { let () = #work_fn_async(__fut).await; } - let #ident = if __was_updated || (#context.current_tick().0 == 0 && #context.is_first_run_this_tick()) { + let #ident = if __was_updated || #context.current_tick().0 == 0 { #work_fn( || #root::dfir_pipes::pull::iter( ::std::option::Option::Some(::std::clone::Clone::clone(&*#accumulator_ident)) diff --git a/dfir_lang/src/graph/ops/join.rs b/dfir_lang/src/graph/ops/join.rs index c7265893c718..36a133674177 100644 --- a/dfir_lang/src/graph/ops/join.rs +++ b/dfir_lang/src/graph/ops/join.rs @@ -96,7 +96,6 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, - context, loop_id, op_span, work_fn, @@ -205,7 +204,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints { ).await } - let fut = check_inputs(#lhs, #rhs, &mut #lhs_joindata_ident, &mut #rhs_joindata_ident, #context.is_first_run_this_tick()); + let fut = check_inputs(#lhs, #rhs, &mut #lhs_joindata_ident, &mut #rhs_joindata_ident, true); #work_fn_async(fut).await }; }; diff --git a/dfir_lang/src/graph/ops/join_fused_lhs.rs b/dfir_lang/src/graph/ops/join_fused_lhs.rs index 4c1d82595f15..f8bf43067087 100644 --- a/dfir_lang/src/graph/ops/join_fused_lhs.rs +++ b/dfir_lang/src/graph/ops/join_fused_lhs.rs @@ -45,7 +45,6 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { }, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, @@ -101,13 +100,6 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { #root::dfir_pipes::pull::accumulate_all(&mut #lhs_accum, &mut *#lhs_borrow, #lhs), ).await; - // RHS replay index. - let replay_idx = if #context.is_first_run_this_tick() { - 0 - } else { - #rhs_borrow_ident.len() - }; - // Accumulate RHS. let () = #work_fn_async( #root::dfir_pipes::pull::Pull::for_each(#rhs, |kv| { @@ -118,9 +110,9 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints { #[allow(clippy::clone_on_copy)] #[allow(suspicious_double_ref_op)] - let iter = #rhs_borrow_ident[replay_idx..] - .iter() - .filter_map(|(k, v2)| #lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone())))); + let iter = #rhs_borrow_ident + .iter() + .filter_map(|(k, v2)| #lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone())))); #root::dfir_pipes::pull::iter(iter) }; }, diff --git a/dfir_lang/src/graph/ops/join_multiset_half.rs b/dfir_lang/src/graph/ops/join_multiset_half.rs index 0ab368d1206e..1bf8affea562 100644 --- a/dfir_lang/src/graph/ops/join_multiset_half.rs +++ b/dfir_lang/src/graph/ops/join_multiset_half.rs @@ -43,7 +43,6 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints { }, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, @@ -137,12 +136,6 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints { let #ident = { #accum_build - let replay_idx = if #context.is_first_run_this_tick() { - 0 - } else { - #probe_ident.len() - }; - // Accum into probe vec let fut = #root::dfir_pipes::pull::Pull::for_each(#input_probe, |kv| { #probe_ident.push(kv); @@ -151,7 +144,7 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints { // Replay out of probe vec #[allow(clippy::clone_on_copy, noop_method_call)] - let iter = #probe_ident[replay_idx..].iter().flat_map(|(k, v_probe)| { + let iter = #probe_ident.iter().flat_map(|(k, v_probe)| { #build_ident .get(k) .map(|vals: &::std::vec::Vec<_>| { diff --git a/dfir_lang/src/graph/ops/multiset_delta.rs b/dfir_lang/src/graph/ops/multiset_delta.rs index 0c43cdf21039..7d54ce462678 100644 --- a/dfir_lang/src/graph/ops/multiset_delta.rs +++ b/dfir_lang/src/graph/ops/multiset_delta.rs @@ -52,7 +52,6 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { write_fn: |wc @ &WriteContextArgs { root, op_span, - context, ident, inputs, outputs, @@ -74,14 +73,12 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints { let tick_swap = quote_spanned! {op_span=> { - if #context.is_first_run_this_tick() { - let (mut prev_map, mut curr_map) = ( - #prev_data.borrow_mut(), - #curr_data.borrow_mut(), - ); - ::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map)); - curr_map.clear(); - } + let (mut prev_map, mut curr_map) = ( + #prev_data.borrow_mut(), + #curr_data.borrow_mut(), + ); + ::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map)); + curr_map.clear(); } }; diff --git a/dfir_lang/src/graph/ops/persist.rs b/dfir_lang/src/graph/ops/persist.rs index 4dee3caa3e91..e01c1de07780 100644 --- a/dfir_lang/src/graph/ops/persist.rs +++ b/dfir_lang/src/graph/ops/persist.rs @@ -53,7 +53,6 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, ident, is_pull, @@ -99,18 +98,12 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { let #vec_ident = &mut #persistdata_ident; let #ident = { - let replay_idx = if #context.is_first_run_this_tick() { - 0 - } else { - #vec_ident.len() - }; - let fut = #root::dfir_pipes::pull::Pull::for_each(#input, |item| { #vec_ident.push(item); }); let () = #work_fn_async(fut).await; - let iter = #vec_ident[replay_idx..].iter().cloned(); + let iter = #vec_ident.iter().cloned(); #root::dfir_pipes::pull::iter(iter) }; } @@ -127,7 +120,7 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints { { #root::dfir_pipes::push::persist_state(vec, is_new_tick, output) } - constrain_types(&mut *#vec_ident, #output, #context.is_first_run_this_tick()) + constrain_types(&mut *#vec_ident, #output, true) }; } }; diff --git a/dfir_lang/src/graph/ops/persist_mut.rs b/dfir_lang/src/graph/ops/persist_mut.rs index b7cd6ea0de8d..f74cb55c8149 100644 --- a/dfir_lang/src/graph/ops/persist_mut.rs +++ b/dfir_lang/src/graph/ops/persist_mut.rs @@ -42,14 +42,14 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, inputs, + outputs, is_pull, op_name, op_inst: @@ -63,8 +63,6 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { .. }, diagnostics| { - assert!(is_pull); - if [Persistence::Mutable] != persistence_args[..] { diagnostics.push(Diagnostic::spanned( op_span, @@ -82,7 +80,7 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { let mut #persistdata_ident = #root::util::sparse_vec::SparseVec::default(); }; - let write_iterator = { + let write_iterator = if is_pull { let input = &inputs[0]; quote_spanned! {op_span=> let #ident = { @@ -95,7 +93,7 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { prev } - let iter = if #context.is_first_run_this_tick() { + let iter = { let fut = #root::dfir_pipes::pull::Pull::for_each(check_pull(#input), |item| { match item { #root::util::Persistence::Persist(v) => #persistdata_ident.push(v), @@ -104,13 +102,28 @@ pub const PERSIST_MUT: OperatorConstraints = OperatorConstraints { }); let () = #work_fn_async(fut).await; - Some(#persistdata_ident.iter().cloned()).into_iter().flatten() - } else { - None.into_iter().flatten() + #persistdata_ident.iter().cloned() }; #root::dfir_pipes::pull::iter(iter) }; } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Fold::new( + &mut #persistdata_ident, + |state: &mut #root::util::sparse_vec::SparseVec<_>, item| { + match item { + #root::util::Persistence::Persist(v) => state.push(v), + #root::util::Persistence::Delete(v) => state.delete(&v), + } + }, + #root::dfir_pipes::push::flat_map( + |state: #root::util::sparse_vec::SparseVec<_>| state.iter().cloned().collect::<::std::vec::Vec<_>>(), + #output, + ), + ); + } }; Ok(OperatorWriteOutput { diff --git a/dfir_lang/src/graph/ops/persist_mut_keyed.rs b/dfir_lang/src/graph/ops/persist_mut_keyed.rs index 6d7437ffb6a5..9bbb90dcd274 100644 --- a/dfir_lang/src/graph/ops/persist_mut_keyed.rs +++ b/dfir_lang/src/graph/ops/persist_mut_keyed.rs @@ -42,14 +42,14 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { root, - context, op_span, work_fn_async, ident, inputs, + outputs, is_pull, op_name, op_inst: @@ -63,8 +63,6 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { .. }, diagnostics| { - assert!(is_pull); - if [Persistence::Mutable] != persistence_args[..] { diagnostics.push(Diagnostic::spanned( op_span, @@ -83,7 +81,7 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { #root::rustc_hash::FxHashMap::<_, #root::util::sparse_vec::SparseVec<_>>::default(); }; - let write_iterator = { + let write_iterator = if is_pull { let input = &inputs[0]; quote_spanned! {op_span=> let #ident = { @@ -96,7 +94,7 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { prev } - let iter = if #context.is_first_run_this_tick() { + let iter = { let fut = #root::dfir_pipes::pull::Pull::for_each(check_pull(#input), |item| { match item { #root::util::PersistenceKeyed::Persist(k, v) => { @@ -111,19 +109,43 @@ pub const PERSIST_MUT_KEYED: OperatorConstraints = OperatorConstraints { #[allow(clippy::clone_on_copy)] #[allow(clippy::disallowed_methods, reason = "FxHasher is deterministic")] - Some( - #persistdata_ident - .iter() - .flat_map(|(k, v)| v.iter().map(move |v| (k.clone(), v.clone()))) - ) - .into_iter() - .flatten() - } else { - None.into_iter().flatten() + #persistdata_ident + .iter() + .flat_map(|(k, v)| v.iter().map(move |v| (k.clone(), v.clone()))) }; #root::dfir_pipes::pull::iter(iter) }; } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Fold::new( + &mut #persistdata_ident, + |state: &mut #root::rustc_hash::FxHashMap<_, #root::util::sparse_vec::SparseVec<_>>, item| { + match item { + #root::util::PersistenceKeyed::Persist(k, v) => { + state.entry(k).or_default().push(v); + }, + #root::util::PersistenceKeyed::Delete(k) => { + state.remove(&k); + } + } + }, + #root::dfir_pipes::push::flat_map( + #[allow(clippy::clone_on_copy)] + |state| { + let mut out = ::std::vec::Vec::new(); + for (k, v) in <_ as ::std::iter::IntoIterator>::into_iter(state) { + for item in #root::util::sparse_vec::SparseVec::iter(&v) { + out.push((::std::clone::Clone::clone(&k), ::std::clone::Clone::clone(item))); + } + } + out + }, + #output, + ), + ); + } }; Ok(OperatorWriteOutput { diff --git a/dfir_lang/src/graph/ops/reduce.rs b/dfir_lang/src/graph/ops/reduce.rs index c7ed1463e717..46165c302896 100644 --- a/dfir_lang/src/graph/ops/reduce.rs +++ b/dfir_lang/src/graph/ops/reduce.rs @@ -53,6 +53,7 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints { work_fn_async, ident, inputs, + outputs, is_pull, singleton_output_ident, arguments, @@ -117,8 +118,8 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints { ) ); } - } else { - // Is only push when used as a singleton, so no need to push to `outputs[0]`. + } else if outputs.is_empty() { + // Terminal push: reduce is a singleton reference target with no downstream. quote_spanned! {op_span=> let #ident = #root::dfir_pipes::push::for_each(|#item_ident| { #assign_accum_ident @@ -126,6 +127,18 @@ pub const REDUCE: OperatorConstraints = OperatorConstraints { #foreach_body }); } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Reduce::new( + &mut #singleton_output_ident, + |#accumulator_ident: &mut _, #item_ident| { + #[allow(clippy::redundant_closure_call)] + (#func)(#accumulator_ident, #item_ident); + }, + #output, + ); + } }; Ok(OperatorWriteOutput { diff --git a/dfir_lang/src/graph/ops/reduce_keyed.rs b/dfir_lang/src/graph/ops/reduce_keyed.rs index 3a58e12dd6a8..404ac53082bb 100644 --- a/dfir_lang/src/graph/ops/reduce_keyed.rs +++ b/dfir_lang/src/graph/ops/reduce_keyed.rs @@ -1,7 +1,7 @@ use quote::{ToTokens, quote_spanned}; use super::{ - DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, + OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance, OperatorWriteOutput, Persistence, RANGE_1, WriteContextArgs, }; @@ -69,17 +69,16 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { flo_type: None, ports_inn: None, ports_out: None, - input_delaytype_fn: |_| Some(DelayType::Stratum), + input_delaytype_fn: |_| None, write_fn: |wc @ &WriteContextArgs { - context, op_span, ident, inputs, + outputs, singleton_output_ident, is_pull, work_fn_async, root, - op_name, op_inst: OperatorInstance { generics: OpInstGenerics { type_args, .. }, @@ -89,8 +88,6 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { .. }, diagnostics| { - assert!(is_pull, "TODO(mingwei): `{}` only supports pull.", op_name); - let [persistence] = wc.persistence_args_disallow_mutable(diagnostics); let generic_type_args = [ @@ -120,7 +117,16 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { _ => Default::default(), }; - let write_iterator = { + let write_iterator = if !is_pull { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::ReduceKeyed::new( + &mut #singleton_output_ident, + #aggfn, + #output, + ); + } + } else { let iter_expr = match persistence { Persistence::None | Persistence::Tick => quote_spanned! {op_span=> #hashtable_ident.drain() @@ -135,13 +141,8 @@ pub const REDUCE_KEYED: OperatorConstraints = OperatorConstraints { ) }, Persistence::Static => quote_spanned! {op_span=> - // Play everything but only on the first run of this tick/stratum. - // (We know we won't have any more inputs, so it is fine to only play once. - // Because of the `DelayType::Stratum` or `DelayType::MonotoneAccum`). - #context.is_first_run_this_tick() - .then_some(#hashtable_ident.iter()) - .into_iter() - .flatten() + // Play everything (each subgraph runs exactly once per tick). + #hashtable_ident.iter() .map( #[allow(suspicious_double_ref_op, clippy::clone_on_copy)] |(k, v)| ( diff --git a/dfir_lang/src/graph/ops/reduce_no_replay.rs b/dfir_lang/src/graph/ops/reduce_no_replay.rs index f59534909bad..cc7563e5dae0 100644 --- a/dfir_lang/src/graph/ops/reduce_no_replay.rs +++ b/dfir_lang/src/graph/ops/reduce_no_replay.rs @@ -96,7 +96,7 @@ pub const REDUCE_NO_REPLAY: OperatorConstraints = OperatorConstraints { let () = #work_fn_async(__fut).await; } - let #ident = if __was_updated || (#context.current_tick().0 == 0 && #context.is_first_run_this_tick()) { + let #ident = if __was_updated || #context.current_tick().0 == 0 { #work_fn( || #root::dfir_pipes::pull::iter( ::std::clone::Clone::clone(&*#accumulator_ident) diff --git a/dfir_lang/src/graph/ops/sort.rs b/dfir_lang/src/graph/ops/sort.rs index abab857a4280..1d55fc68478b 100644 --- a/dfir_lang/src/graph/ops/sort.rs +++ b/dfir_lang/src/graph/ops/sort.rs @@ -37,20 +37,25 @@ pub const SORT: OperatorConstraints = OperatorConstraints { work_fn_async, ident, inputs, + outputs, is_pull, .. }, _| { - assert!(is_pull); - - let input = &inputs[0]; - let write_iterator = quote_spanned! {op_span=> - // TODO(mingwei): unnecessary extra handoff into_iter() then collect(). - let #ident = { - let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; - <[_]>::sort_unstable(&mut tmp); - #root::dfir_pipes::pull::iter(tmp) - }; + let write_iterator = if is_pull { + let input = &inputs[0]; + quote_spanned! {op_span=> + let #ident = { + let mut tmp = #work_fn_async(#root::dfir_pipes::pull::Pull::collect::<::std::vec::Vec<_>>(#input)).await; + <[_]>::sort_unstable(&mut tmp); + #root::dfir_pipes::pull::iter(tmp) + }; + } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::dfir_pipes::push::Sort::new(#output); + } }; Ok(OperatorWriteOutput { write_iterator, diff --git a/dfir_pipes/src/push/fold.rs b/dfir_pipes/src/push/fold.rs new file mode 100644 index 000000000000..4b5124c750c2 --- /dev/null +++ b/dfir_pipes/src/push/fold.rs @@ -0,0 +1,73 @@ +//! [`Fold`] push combinator. +use core::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::push::{Push, PushStep}; + +pin_project! { + /// Push combinator that accumulates all items via a fold function, then emits + /// the accumulated value downstream on flush. + /// + /// During `start_send`, items are folded into the accumulator. + /// During `poll_flush`, the accumulated value is cloned and sent downstream, + /// then the downstream is flushed. + /// + /// `AccRef` is typically `&'a mut Acc` — a mutable reference to externally-owned state. + #[must_use = "`Push`es do nothing unless items are pushed into them"] + pub struct Fold { + #[pin] + next: Next, + acc: AccRef, + comb_fn: CombFn, + flushed: bool, + } +} + +impl Fold { + /// Creates a new `Fold` push combinator with the given initial accumulator value. + pub const fn new(acc: Acc, comb_fn: CombFn, next: Next) -> Self { + Self { + next, + acc, + comb_fn, + flushed: false, + } + } +} + +// TODO(mingwei): support arbitrary metadata. +impl Push for Fold<&mut Acc, CombFn, Next> +where + Acc: Clone, + CombFn: FnMut(&mut Acc, Item), + Next: Push, +{ + type Ctx<'ctx> = Next::Ctx<'ctx>; + + type CanPend = Next::CanPend; + + fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep { + PushStep::Done + } + + fn start_send(self: Pin<&mut Self>, item: Item, _meta: ()) { + let this = self.project(); + (this.comb_fn)(this.acc, item); + *this.flushed = false; + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep { + let mut this = self.project(); + if !*this.flushed { + *this.flushed = true; + let value = this.acc.clone(); + this.next.as_mut().start_send(value, ()); + } + this.next.poll_flush(ctx) + } + + fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option)) { + self.project().next.size_hint((1, Some(1))); + } +} diff --git a/dfir_pipes/src/push/fold_keyed.rs b/dfir_pipes/src/push/fold_keyed.rs new file mode 100644 index 000000000000..a1ea6946e825 --- /dev/null +++ b/dfir_pipes/src/push/fold_keyed.rs @@ -0,0 +1,85 @@ +//! [`FoldKeyed`] push combinator. +use core::hash::{BuildHasher, Hash}; +use core::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::push::{Push, PushStep, ready}; + +extern crate alloc; +use alloc::vec::Vec; + +pin_project! { + /// Push combinator that folds items by key into a hashmap, then emits all + /// (key, value) pairs downstream on flush. + #[must_use = "`Push`es do nothing unless items are pushed into them"] + pub struct FoldKeyed { + #[pin] + next: Next, + map: MapRef, + init_fn: InitFn, + comb_fn: CombFn, + flush_items: Vec<(K, V)>, + flush_idx: usize, + } +} + +impl FoldKeyed { + /// Creates a new `FoldKeyed` push combinator. + pub fn new(map: MapRef, init_fn: InitFn, comb_fn: CombFn, next: Next) -> Self { + Self { + next, + map, + init_fn, + comb_fn, + flush_items: Vec::new(), + flush_idx: 0, + } + } +} + +// Impl for `&mut HashMap` (works with any hasher including FxHashMap). +// TODO(mingwei): support arbitrary metadata. +impl Push<(K, V), ()> + for FoldKeyed<&mut std::collections::HashMap, InitFn, CombFn, Next, K, V> +where + K: Eq + Hash + Clone, + V: Clone, + S: BuildHasher, + InitFn: FnMut() -> V, + CombFn: FnMut(&mut V, V), + Next: Push<(K, V), ()>, +{ + type Ctx<'ctx> = Next::Ctx<'ctx>; + + type CanPend = Next::CanPend; + + fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep { + PushStep::Done + } + + fn start_send(self: Pin<&mut Self>, item: (K, V), _meta: ()) { + let this = self.project(); + let entry = this.map.entry(item.0).or_insert_with(|| (this.init_fn)()); + (this.comb_fn)(entry, item.1); + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep { + let mut this = self.project(); + if this.flush_items.is_empty() && *this.flush_idx == 0 { + this.flush_items + .extend(this.map.iter().map(|(k, v)| (k.clone(), v.clone()))); + } + while *this.flush_idx < this.flush_items.len() { + ready!(this.next.as_mut().poll_ready(ctx)); + let item = this.flush_items[*this.flush_idx].clone(); + this.next.as_mut().start_send(item, ()); + *this.flush_idx += 1; + } + this.flush_items.clear(); + *this.flush_idx = 0; + this.next.poll_flush(ctx) + } + + fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option)) {} +} diff --git a/dfir_pipes/src/push/mod.rs b/dfir_pipes/src/push/mod.rs index ec3d37e19411..2a15c737e57f 100644 --- a/dfir_pipes/src/push/mod.rs +++ b/dfir_pipes/src/push/mod.rs @@ -16,15 +16,26 @@ mod flat_map; mod flat_map_stream; mod flatten; mod flatten_stream; +mod fold; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +mod fold_keyed; mod for_each; mod inspect; mod map; #[cfg(feature = "alloc")] #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] mod persist; +mod reduce; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +mod reduce_keyed; mod resolve_futures; mod sink; mod sink_compat; +#[cfg(feature = "alloc")] +#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] +mod sort; mod unzip; #[cfg(feature = "alloc")] #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] @@ -48,6 +59,10 @@ pub use flat_map::FlatMap; pub use flat_map_stream::FlatMapStream; pub use flatten::Flatten; pub use flatten_stream::FlattenStream; +pub use fold::Fold; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub use fold_keyed::FoldKeyed; pub use for_each::ForEach; use futures_core::FusedStream; pub use inspect::Inspect; @@ -55,9 +70,16 @@ pub use map::Map; #[cfg(feature = "alloc")] #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] pub use persist::Persist; +pub use reduce::Reduce; +#[cfg(feature = "std")] +#[cfg_attr(docsrs, doc(cfg(feature = "std")))] +pub use reduce_keyed::ReduceKeyed; pub use resolve_futures::ResolveFutures; pub use sink::Sink; pub use sink_compat::SinkCompat; +#[cfg(feature = "alloc")] +#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] +pub use sort::Sort; pub use unzip::Unzip; #[cfg(feature = "alloc")] #[cfg_attr(docsrs, doc(cfg(feature = "alloc")))] diff --git a/dfir_pipes/src/push/reduce.rs b/dfir_pipes/src/push/reduce.rs new file mode 100644 index 000000000000..af90d13864cb --- /dev/null +++ b/dfir_pipes/src/push/reduce.rs @@ -0,0 +1,73 @@ +//! [`Reduce`] push combinator. +use core::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::push::{Push, PushStep}; + +pin_project! { + /// Push combinator that reduces all items into a single value, then emits + /// it downstream on flush. If no items were received, nothing is emitted. + /// + /// `AccRef` is typically `&'a mut Option` — a mutable reference to externally-owned state. + #[must_use = "`Push`es do nothing unless items are pushed into them"] + pub struct Reduce { + #[pin] + next: Next, + acc: AccRef, + reduce_fn: ReduceFn, + flushed: bool, + } +} + +impl Reduce { + /// Creates a new `Reduce` push combinator. + pub const fn new(acc: AccRef, reduce_fn: ReduceFn, next: Next) -> Self { + Self { + next, + acc, + reduce_fn, + flushed: false, + } + } +} + +// TODO(mingwei): support arbitrary metadata. +impl Push for Reduce<&mut Option, ReduceFn, Next> +where + Item: Clone, + ReduceFn: FnMut(&mut Item, Item), + Next: Push, +{ + type Ctx<'ctx> = Next::Ctx<'ctx>; + + type CanPend = Next::CanPend; + + fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep { + PushStep::Done + } + + fn start_send(self: Pin<&mut Self>, item: Item, _meta: ()) { + let this = self.project(); + match this.acc { + Some(acc) => (this.reduce_fn)(acc, item), + None => **this.acc = Some(item), + } + *this.flushed = false; + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep { + let mut this = self.project(); + if !*this.flushed { + *this.flushed = true; + if let Some(value) = this.acc.as_ref() { + this.next.as_mut().start_send(value.clone(), ()); + } + } + this.next.poll_flush(ctx) + } + + fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option)) { + self.project().next.size_hint((0, Some(1))); + } +} diff --git a/dfir_pipes/src/push/reduce_keyed.rs b/dfir_pipes/src/push/reduce_keyed.rs new file mode 100644 index 000000000000..84dd9cfd1768 --- /dev/null +++ b/dfir_pipes/src/push/reduce_keyed.rs @@ -0,0 +1,88 @@ +//! [`ReduceKeyed`] push combinator. +use core::hash::{BuildHasher, Hash}; +use core::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::push::{Push, PushStep, ready}; + +extern crate alloc; +use alloc::vec::Vec; + +pin_project! { + /// Push combinator that reduces items by key into a hashmap, then emits all + /// (key, value) pairs downstream on flush. The first value for each key + /// becomes the initial accumulator. + #[must_use = "`Push`es do nothing unless items are pushed into them"] + pub struct ReduceKeyed { + #[pin] + next: Next, + map: MapRef, + reduce_fn: ReduceFn, + flush_items: Vec<(K, V)>, + flush_idx: usize, + } +} + +impl ReduceKeyed { + /// Creates a new `ReduceKeyed` push combinator. + pub fn new(map: MapRef, reduce_fn: ReduceFn, next: Next) -> Self { + Self { + next, + map, + reduce_fn, + flush_items: Vec::new(), + flush_idx: 0, + } + } +} + +// TODO(mingwei): support arbitrary metadata. +impl Push<(K, V), ()> + for ReduceKeyed<&mut std::collections::HashMap, ReduceFn, Next, K, V> +where + K: Eq + Hash + Clone, + V: Clone, + S: BuildHasher, + ReduceFn: FnMut(&mut V, V), + Next: Push<(K, V), ()>, +{ + type Ctx<'ctx> = Next::Ctx<'ctx>; + + type CanPend = Next::CanPend; + + fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep { + PushStep::Done + } + + fn start_send(self: Pin<&mut Self>, item: (K, V), _meta: ()) { + let this = self.project(); + match this.map.entry(item.0) { + std::collections::hash_map::Entry::Vacant(vacant) => { + vacant.insert(item.1); + } + std::collections::hash_map::Entry::Occupied(mut occupied) => { + (this.reduce_fn)(occupied.get_mut(), item.1); + } + } + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep { + let mut this = self.project(); + if this.flush_items.is_empty() && *this.flush_idx == 0 { + this.flush_items + .extend(this.map.iter().map(|(k, v)| (k.clone(), v.clone()))); + } + while *this.flush_idx < this.flush_items.len() { + ready!(this.next.as_mut().poll_ready(ctx)); + let item = this.flush_items[*this.flush_idx].clone(); + this.next.as_mut().start_send(item, ()); + *this.flush_idx += 1; + } + this.flush_items.clear(); + *this.flush_idx = 0; + this.next.poll_flush(ctx) + } + + fn size_hint(self: Pin<&mut Self>, _hint: (usize, Option)) {} +} diff --git a/dfir_pipes/src/push/sort.rs b/dfir_pipes/src/push/sort.rs new file mode 100644 index 000000000000..b3351160cba6 --- /dev/null +++ b/dfir_pipes/src/push/sort.rs @@ -0,0 +1,79 @@ +//! [`Sort`] push combinator. +use alloc::vec::Vec; +use core::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::push::{Push, PushStep, ready}; + +pin_project! { + /// Push combinator that collects all items, sorts them, then emits them + /// downstream in sorted order on flush. + #[must_use = "`Push`es do nothing unless items are pushed into them"] + #[derive(Clone, Debug)] + pub struct Sort { + #[pin] + next: Next, + buf: Vec, + sorted: bool, + flush_idx: usize, + } +} + +impl Sort { + /// Creates a new `Sort` push combinator. + pub const fn new(next: Next) -> Self { + Self { + next, + buf: Vec::new(), + sorted: false, + flush_idx: 0, + } + } +} + +// TODO(mingwei): support arbitrary metadata. +impl Push for Sort +where + Item: Ord + Clone, + Next: Push, +{ + type Ctx<'ctx> = Next::Ctx<'ctx>; + + type CanPend = Next::CanPend; + + fn poll_ready(self: Pin<&mut Self>, _ctx: &mut Self::Ctx<'_>) -> PushStep { + PushStep::Done + } + + fn start_send(self: Pin<&mut Self>, item: Item, _meta: ()) { + let this = self.project(); + this.buf.push(item); + *this.sorted = false; + } + + fn poll_flush(self: Pin<&mut Self>, ctx: &mut Self::Ctx<'_>) -> PushStep { + let mut this = self.project(); + if !*this.sorted { + this.buf.sort_unstable(); + *this.sorted = true; + *this.flush_idx = 0; + } + while *this.flush_idx < this.buf.len() { + ready!(this.next.as_mut().poll_ready(ctx)); + let item = this.buf[*this.flush_idx].clone(); + this.next.as_mut().start_send(item, ()); + *this.flush_idx += 1; + } + this.buf.clear(); + *this.sorted = false; + *this.flush_idx = 0; + this.next.poll_flush(ctx) + } + + fn size_hint(self: Pin<&mut Self>, hint: (usize, Option)) { + let this = self.project(); + this.buf.reserve(hint.0); + this.next.size_hint(hint); + } +} diff --git a/dfir_rs/src/scheduled/context.rs b/dfir_rs/src/scheduled/context.rs index 162c286eab3b..d2838178fb64 100644 --- a/dfir_rs/src/scheduled/context.rs +++ b/dfir_rs/src/scheduled/context.rs @@ -54,10 +54,11 @@ impl Wake for WakeState { } /// A lightweight context for inline codegen that avoids the overhead of the full -/// [`Context`] (no tokio channels, no scheduler queues, no loop machinery). +/// scheduled graph (no tokio channels, no scheduler queues, no loop machinery). /// -/// Exposes method names that operator-generated code calls on -/// `context` (for iterators: `is_first_run_this_tick`, `current_tick`, etc.). +/// Exposes methods that operator-generated code calls on both +/// `df` (for prologues: `request_task`) and +/// `context` (for iterators: `current_tick`, `schedule_subgraph`, etc.). #[doc(hidden)] #[derive(Default)] pub struct Context { @@ -102,13 +103,6 @@ impl Context { // --- Methods called as `context.xxx()` in operator iterators --- - /// Always returns `true` in inline mode. The inline codegen runs the entire DAG - /// once per tick with no re-execution, so every subgraph is always on its first - /// (and only) run within each tick. - pub fn is_first_run_this_tick(&self) -> bool { - true - } - /// Gets the current tick count. pub fn current_tick(&self) -> TickInstant { self.current_tick diff --git a/dfir_rs/src/util/sparse_vec.rs b/dfir_rs/src/util/sparse_vec.rs index 6a01b6bdcef3..052d60a2d6a5 100644 --- a/dfir_rs/src/util/sparse_vec.rs +++ b/dfir_rs/src/util/sparse_vec.rs @@ -4,6 +4,7 @@ use std::hash::Hash; use std::iter::FusedIterator; /// A vector that supports efficient deletion without reordering all subsequent items. +#[derive(Clone)] pub struct SparseVec { items: Vec>, item_locs: HashMap>, diff --git a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongenum.stderr b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongenum.stderr index c5b673b3cd4d..b1c45d65a5eb 100644 --- a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongenum.stderr +++ b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongenum.stderr @@ -87,11 +87,11 @@ error[E0277]: the trait bound `impl dfir_rs::dfir_pipes::push::Push` `DemuxEnum` implements `dfir_rs::dfir_pipes::push::Push` `ResolveFutures` implements `dfir_rs::dfir_pipes::push::Push` + `Sort` implements `dfir_rs::dfir_pipes::push::Push` `VecPush` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::DemuxVar` implements `dfir_rs::dfir_pipes::push::Push<(usize, Item), Meta>` `dfir_rs::dfir_pipes::push::Fanout` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::Filter` implements `dfir_rs::dfir_pipes::push::Push` - `dfir_rs::dfir_pipes::push::FilterMap` implements `dfir_rs::dfir_pipes::push::Push` and $N others note: required by a bound in `pivot_run_sg_1v1` --> tests/compile-fail-stable/surface_demuxenum_wrongenum.rs:17:15 diff --git a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_1.stderr b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_1.stderr index 5c056b1b0788..9c0f8c426e67 100644 --- a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_1.stderr +++ b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_1.stderr @@ -9,11 +9,11 @@ error[E0277]: the trait bound `impl dfir_rs::dfir_pipes::push::Push` `DemuxEnum` implements `dfir_rs::dfir_pipes::push::Push` `ResolveFutures` implements `dfir_rs::dfir_pipes::push::Push` + `Sort` implements `dfir_rs::dfir_pipes::push::Push` `VecPush` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::DemuxVar` implements `dfir_rs::dfir_pipes::push::Push<(usize, Item), Meta>` `dfir_rs::dfir_pipes::push::Fanout` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::Filter` implements `dfir_rs::dfir_pipes::push::Push` - `dfir_rs::dfir_pipes::push::FilterMap` implements `dfir_rs::dfir_pipes::push::Push` and $N others note: required for `Shape` to implement `DemuxEnumPush<(Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64,), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64,), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64, f64), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64, f64), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push as dfir_rs::dfir_pipes::push::Push>::CanPend>>), ()>` --> tests/compile-fail-stable/surface_demuxenum_wrongfields_1.rs:5:14 @@ -40,11 +40,11 @@ error[E0277]: the trait bound `impl dfir_rs::dfir_pipes::push::Push` `DemuxEnum` implements `dfir_rs::dfir_pipes::push::Push` `ResolveFutures` implements `dfir_rs::dfir_pipes::push::Push` + `Sort` implements `dfir_rs::dfir_pipes::push::Push` `VecPush` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::DemuxVar` implements `dfir_rs::dfir_pipes::push::Push<(usize, Item), Meta>` `dfir_rs::dfir_pipes::push::Fanout` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::Filter` implements `dfir_rs::dfir_pipes::push::Push` - `dfir_rs::dfir_pipes::push::FilterMap` implements `dfir_rs::dfir_pipes::push::Push` and $N others note: required for `Shape` to implement `DemuxEnumPush<(Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64,), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64,), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64, f64), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64, f64), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push as dfir_rs::dfir_pipes::push::Push>::CanPend>>), ()>` --> tests/compile-fail-stable/surface_demuxenum_wrongfields_1.rs:5:14 diff --git a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_2.stderr b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_2.stderr index ec5e4d24e65b..128d0f0a8191 100644 --- a/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_2.stderr +++ b/dfir_rs/tests/compile-fail-stable/surface_demuxenum_wrongfields_2.stderr @@ -9,11 +9,11 @@ error[E0277]: the trait bound `impl dfir_rs::dfir_pipes::push::Push<(u32,), (), `&mut P` implements `dfir_rs::dfir_pipes::push::Push` `DemuxEnum` implements `dfir_rs::dfir_pipes::push::Push` `ResolveFutures` implements `dfir_rs::dfir_pipes::push::Push` + `Sort` implements `dfir_rs::dfir_pipes::push::Push` `VecPush` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::DemuxVar` implements `dfir_rs::dfir_pipes::push::Push<(usize, Item), Meta>` `dfir_rs::dfir_pipes::push::Fanout` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::Filter` implements `dfir_rs::dfir_pipes::push::Push` - `dfir_rs::dfir_pipes::push::FilterMap` implements `dfir_rs::dfir_pipes::push::Push` and $N others note: required for `Shape` to implement `DemuxEnumPush<(Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64,), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64,), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64, f64), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64, f64), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(u32,), (), CanPend = as dfir_rs::dfir_pipes::push::Push<(u32,), ()>>::CanPend>>), ()>` --> tests/compile-fail-stable/surface_demuxenum_wrongfields_2.rs:5:14 @@ -40,11 +40,11 @@ error[E0277]: the trait bound `impl dfir_rs::dfir_pipes::push::Push<(u32,), (), `&mut P` implements `dfir_rs::dfir_pipes::push::Push` `DemuxEnum` implements `dfir_rs::dfir_pipes::push::Push` `ResolveFutures` implements `dfir_rs::dfir_pipes::push::Push` + `Sort` implements `dfir_rs::dfir_pipes::push::Push` `VecPush` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::DemuxVar` implements `dfir_rs::dfir_pipes::push::Push<(usize, Item), Meta>` `dfir_rs::dfir_pipes::push::Fanout` implements `dfir_rs::dfir_pipes::push::Push` `dfir_rs::dfir_pipes::push::Filter` implements `dfir_rs::dfir_pipes::push::Push` - `dfir_rs::dfir_pipes::push::FilterMap` implements `dfir_rs::dfir_pipes::push::Push` and $N others note: required for `Shape` to implement `DemuxEnumPush<(Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64,), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64,), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(f64, f64), (), CanPend = }> as dfir_rs::dfir_pipes::push::Push<(f64, f64), ()>>::CanPend>>, Pin<&mut impl dfir_rs::dfir_pipes::push::Push<(u32,), (), CanPend = as dfir_rs::dfir_pipes::push::Push<(u32,), ()>>::CanPend>>), ()>` --> tests/compile-fail-stable/surface_demuxenum_wrongfields_2.rs:5:14 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap index d72e195d1270..00b222047709 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_dot.snap @@ -9,23 +9,15 @@ digraph { n2v1 [label="(n2v1) fold::<'tick>(Vec::new, Vec::push)", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) flat_map(|mut vec| {\l vec.sort();\l vec\l})\l", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) for_each(|v| print!(\"{:?}, \", v))", shape=house, fillcolor="#ffff88"] - n5v1 [label="(n5v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n3v1 -> n4v1 n2v1 -> n3v1 - n1v1 -> n5v1 - n5v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 n4v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap index 23eeb5a328d2..2bc91938c224 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap @@ -12,15 +12,11 @@ linkStyle default stroke:#aaa 2v1[\"(2v1) fold::<'tick>(Vec::new, Vec::push)"/]:::pullClass 3v1[\"
(3v1)
flat_map(|mut vec| {
vec.sort();
vec
})
"/]:::pullClass 4v1[/"(4v1) for_each(|v| print!("{:?}, ", v))"\]:::pushClass -5v1["(5v1) handoff"]:::otherClass 3v1-->4v1 2v1-->3v1 -1v1-->5v1 -5v1--x2v1; linkStyle 3 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 4v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap.new b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap.new new file mode 100644 index 000000000000..c7d99b8d32b0 --- /dev/null +++ b/dfir_rs/tests/snapshots/surface_fold__fold_sort@graphvis_mermaid.snap.new @@ -0,0 +1,24 @@ +--- +source: dfir_rs/tests/surface_fold.rs +assertion_line: 174 +expression: df.meta_graph().unwrap().to_mermaid(cfg) +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass +2v1[\"(2v1) fold::<'tick>(Vec::new, Vec::push)"/]:::pullClass +3v1[\"
(3v1)
flat_map(|mut vec| {
vec.sort();
vec
})
"/]:::pullClass +4v1[/"(4v1) for_each(|v| print!("{:?}, ", v))"\]:::pushClass +3v1-->4v1 +2v1-->3v1 +1v1-->2v1 +subgraph sg_1v1 ["sg_1v1"] + 1v1 + 2v1 + 3v1 + 4v1 +end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap index 41feff8748af..d3b8ff5a45f0 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold::<\l 'static,\l>(\l Vec::new,\l |old: &mut Vec, mut x: Vec| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap index 6cc830469e8c..05d146be034c 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold::<
'static,
>(
Vec::new,
|old: &mut Vec<u32>, mut x: Vec<u32>| {
old.append(&mut x);
},
)
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap index 95a707ceb2f5..59ebaf6f3383 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_dot.snap @@ -13,47 +13,35 @@ digraph { n6v1 [label="(n6v1) cross_join_multiset()", shape=invhouse, fillcolor="#88aaff"] n7v1 [label="(n7v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n3v1 -> n4v1 - n3v1 -> n9v1 + n3v1 -> n8v1 n5v1 -> n6v1 [label="0"] n6v1 -> n7v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n6v1 [label="1"] + n8v1 -> n6v1 [label="1"] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" + n4v1 subgraph sg_1v1_var_teed_fold { cluster=true label="var teed_fold" n1v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - n4v1 - subgraph sg_2v1_var_teed_fold { - cluster=true - label="var teed_fold" n2v1 n3v1 } } - subgraph sg_3v1 { + subgraph sg_2v1 { cluster=true fillcolor="#dddddd" style=filled - label = "sg_3v1" + label = "sg_2v1" n5v1 n7v1 - subgraph sg_3v1_var_join_node { + subgraph sg_2v1_var_join_node { cluster=true label="var join_node" n6v1 diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap index cf1332118c0a..0b8fa1234d35 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap @@ -16,31 +16,25 @@ linkStyle default stroke:#aaa 6v1[\"(6v1) cross_join_multiset()"/]:::pullClass 7v1[/"(7v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass 8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 3v1-->4v1 -3v1-->9v1 +3v1-->8v1 5v1-->|0|6v1 6v1-->7v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1-->|1|6v1 +8v1-->|1|6v1 subgraph sg_1v1 ["sg_1v1"] + 4v1 subgraph sg_1v1_var_teed_fold ["var teed_fold"] 1v1 - end -end -subgraph sg_2v1 ["sg_2v1"] - 4v1 - subgraph sg_2v1_var_teed_fold ["var teed_fold"] 2v1 3v1 end end -subgraph sg_3v1 ["sg_3v1"] +subgraph sg_2v1 ["sg_2v1"] 5v1 7v1 - subgraph sg_3v1_var_join_node ["var join_node"] + subgraph sg_2v1_var_join_node ["var join_node"] 6v1 end end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap.new b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap.new new file mode 100644 index 000000000000..c81c02a8ebb3 --- /dev/null +++ b/dfir_rs/tests/snapshots/surface_fold__fold_static_join@graphvis_mermaid.snap.new @@ -0,0 +1,41 @@ +--- +source: dfir_rs/tests/surface_fold.rs +assertion_line: 99 +expression: df.meta_graph().unwrap().to_mermaid(cfg) +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) source_iter([])"/]:::pullClass +2v1[\"
(2v1)
fold::<
'tick,
>(
|| 0,
|old: &mut usize, _: usize| {
*old += 1;
},
)
"/]:::pullClass +3v1[/"(3v1) tee()"\]:::pushClass +4v1[/"(4v1) for_each(|_| {})"\]:::pushClass +5v1[\"(5v1) source_stream(items_recv)"/]:::pullClass +6v1[\"(6v1) cross_join_multiset()"/]:::pullClass +7v1[/"(7v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass +8v1["(8v1) handoff"]:::otherClass +2v1-->3v1 +1v1-->2v1 +3v1-->4v1 +3v1-->8v1 +5v1-->|0|6v1 +6v1-->7v1 +8v1-->|1|6v1 +subgraph sg_1v1 ["sg_1v1"] + 4v1 + subgraph sg_1v1_var_teed_fold ["var teed_fold"] + 1v1 + 2v1 + 3v1 + end +end +subgraph sg_2v1 ["sg_2v1"] + 5v1 + 7v1 + subgraph sg_2v1_var_join_node ["var join_node"] + 6v1 + end +end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap index 4b8afed77715..e99a8fda216b 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) fold::<\l 'tick,\l>(\l Vec::new,\l |old: &mut Vec, mut x: Vec| {\l old.append(&mut x);\l },\l)\l", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| result_send.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap index 5b16dca98905..c270ae75724f 100644 --- a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"
(2v1)
fold::<
'tick,
>(
Vec::new,
|old: &mut Vec<u32>, mut x: Vec<u32>| {
old.append(&mut x);
},
)
"/]:::pullClass 3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap.new b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap.new new file mode 100644 index 000000000000..a78a34da530b --- /dev/null +++ b/dfir_rs/tests/snapshots/surface_fold__fold_tick@graphvis_mermaid.snap.new @@ -0,0 +1,21 @@ +--- +source: dfir_rs/tests/surface_fold.rs +assertion_line: 18 +expression: df.meta_graph().unwrap().to_mermaid(cfg) +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +classDef otherClass fill:#fdc,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa +1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass +2v1[\"
(2v1)
fold::<
'tick,
>(
Vec::new,
|old: &mut Vec<u32>, mut x: Vec<u32>| {
old.append(&mut x);
},
)
"/]:::pullClass +3v1[/"(3v1) for_each(|v| result_send.send(v).unwrap())"\]:::pushClass +2v1-->3v1 +1v1-->2v1 +subgraph sg_1v1 ["sg_1v1"] + 1v1 + 2v1 + 3v1 +end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap index f047ef59287d..895898f50d8e 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_dot.snap @@ -10,12 +10,10 @@ digraph { n3v1 [label="(n3v1) persist::<'static>()", shape=invhouse, fillcolor="#88aaff"] n4v1 [label="(n4v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] n5v1 [label="(n5v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n6v1 [label="(n6v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n4v1 -> n5v1 - n3v1 -> n6v1 + n3v1 -> n4v1 n2v1 -> n3v1 n1v1 -> n2v1 - n6v1 -> n4v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -24,12 +22,6 @@ digraph { n1v1 n2v1 n3v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n4v1 n5v1 } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap index b229f7c7dd10..aa347615011a 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_basic@graphvis_mermaid.snap @@ -13,18 +13,14 @@ linkStyle default stroke:#aaa 3v1[\"(3v1) persist::<'static>()"/]:::pullClass 4v1[\"(4v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass 5v1[/"(5v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -6v1["(6v1) handoff"]:::otherClass 4v1-->5v1 -3v1-->6v1 +3v1-->4v1 2v1-->3v1 1v1-->2v1 -6v1--x4v1; linkStyle 4 stroke:red subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 3v1 -end -subgraph sg_2v1 ["sg_2v1"] 4v1 5v1 end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_dot.snap index e19b71175738..512553d53042 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_dot.snap @@ -10,49 +10,29 @@ digraph { n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"] n4v1 [label="(n4v1) for_each(|v| pull_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"] n5v1 [label="(n5v1) flat_map(|x| if x == 3 { vec![Persist(x), Delete(x)] } else { vec![Persist(x)] })", shape=house, fillcolor="#ffff88"] - n6v1 [label="(n6v1) persist_mut::<'mutable>()", shape=invhouse, fillcolor="#88aaff"] + n6v1 [label="(n6v1) persist_mut::<'mutable>()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) for_each(|v| push_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n3v1 -> n4v1 n6v1 -> n7v1 - n5v1 -> n9v1 + n5v1 -> n6v1 n3v1 -> n5v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n6v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" - subgraph sg_1v1_var_my_tee { - cluster=true - label="var my_tee" - n1v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n4v1 n5v1 - subgraph sg_2v1_var_my_tee { + n6v1 + n7v1 + subgraph sg_1v1_var_my_tee { cluster=true label="var my_tee" + n1v1 n2v1 n3v1 } } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - n6v1 - n7v1 - } } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_mermaid.snap index ff43dec6873a..f1eea932bba3 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_mut@graphvis_mermaid.snap @@ -13,32 +13,22 @@ linkStyle default stroke:#aaa 3v1[/"(3v1) tee()"\]:::pushClass 4v1[/"(4v1) for_each(|v| pull_tx.send(v).unwrap())"\]:::pushClass 5v1[/"(5v1) flat_map(|x| if x == 3 { vec![Persist(x), Delete(x)] } else { vec![Persist(x)] })"\]:::pushClass -6v1[\"(6v1) persist_mut::<'mutable>()"/]:::pullClass +6v1[/"(6v1) persist_mut::<'mutable>()"\]:::pushClass 7v1[/"(7v1) for_each(|v| push_tx.send(v).unwrap())"\]:::pushClass -8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 3v1-->4v1 6v1-->7v1 -5v1-->9v1 +5v1-->6v1 3v1-->5v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1--x6v1; linkStyle 7 stroke:red subgraph sg_1v1 ["sg_1v1"] - subgraph sg_1v1_var_my_tee ["var my_tee"] - 1v1 - end -end -subgraph sg_2v1 ["sg_2v1"] 4v1 5v1 - subgraph sg_2v1_var_my_tee ["var my_tee"] + 6v1 + 7v1 + subgraph sg_1v1_var_my_tee ["var my_tee"] + 1v1 2v1 3v1 end end -subgraph sg_3v1 ["sg_3v1"] - 6v1 - 7v1 -end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_dot.snap index f7dc663146ed..78a1353b0cdd 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_dot.snap @@ -10,49 +10,29 @@ digraph { n3v1 [label="(n3v1) tee()", shape=house, fillcolor="#ffff88"] n4v1 [label="(n4v1) for_each(|(_k, v)| pull_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"] n5v1 [label="(n5v1) flat_map(|(k, v)| {\l if v == 3 { vec![Persist(k, v), Delete(k)] } else { vec![Persist(k, v)] }\l})\l", shape=house, fillcolor="#ffff88"] - n6v1 [label="(n6v1) persist_mut_keyed::<'mutable>()", shape=invhouse, fillcolor="#88aaff"] + n6v1 [label="(n6v1) persist_mut_keyed::<'mutable>()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) for_each(|(_k, v)| push_tx.send(v).unwrap())", shape=house, fillcolor="#ffff88"] - n8v1 [label="(n8v1) handoff", shape=parallelogram, fillcolor="#ddddff"] - n9v1 [label="(n9v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n8v1 + n1v1 -> n2v1 n3v1 -> n4v1 n6v1 -> n7v1 - n5v1 -> n9v1 + n5v1 -> n6v1 n3v1 -> n5v1 - n8v1 -> n2v1 [color=red] - n9v1 -> n6v1 [color=red] subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" - subgraph sg_1v1_var_my_tee { - cluster=true - label="var my_tee" - n1v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n4v1 n5v1 - subgraph sg_2v1_var_my_tee { + n6v1 + n7v1 + subgraph sg_1v1_var_my_tee { cluster=true label="var my_tee" + n1v1 n2v1 n3v1 } } - subgraph sg_3v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_3v1" - n6v1 - n7v1 - } } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_mermaid.snap index 119f59c6f422..a6c301a87fe7 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_mut_keyed@graphvis_mermaid.snap @@ -13,32 +13,22 @@ linkStyle default stroke:#aaa 3v1[/"(3v1) tee()"\]:::pushClass 4v1[/"(4v1) for_each(|(_k, v)| pull_tx.send(v).unwrap())"\]:::pushClass 5v1[/"
(5v1)
flat_map(|(k, v)| {
if v == 3 { vec![Persist(k, v), Delete(k)] } else { vec![Persist(k, v)] }
})
"\]:::pushClass -6v1[\"(6v1) persist_mut_keyed::<'mutable>()"/]:::pullClass +6v1[/"(6v1) persist_mut_keyed::<'mutable>()"\]:::pushClass 7v1[/"(7v1) for_each(|(_k, v)| push_tx.send(v).unwrap())"\]:::pushClass -8v1["(8v1) handoff"]:::otherClass -9v1["(9v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->8v1 +1v1-->2v1 3v1-->4v1 6v1-->7v1 -5v1-->9v1 +5v1-->6v1 3v1-->5v1 -8v1--x2v1; linkStyle 6 stroke:red -9v1--x6v1; linkStyle 7 stroke:red subgraph sg_1v1 ["sg_1v1"] - subgraph sg_1v1_var_my_tee ["var my_tee"] - 1v1 - end -end -subgraph sg_2v1 ["sg_2v1"] 4v1 5v1 - subgraph sg_2v1_var_my_tee ["var my_tee"] + 6v1 + 7v1 + subgraph sg_1v1_var_my_tee ["var my_tee"] + 1v1 2v1 3v1 end end -subgraph sg_3v1 ["sg_3v1"] - 6v1 - 7v1 -end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap index f254b8247024..13be15ad6167 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_dot.snap @@ -14,7 +14,6 @@ digraph { n7v1 [label="(n7v1) union()", shape=invhouse, fillcolor="#88aaff"] n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n4v1 n1v1 -> n2v1 n3v1 -> n4v1 @@ -22,8 +21,7 @@ digraph { n4v1 -> n5v1 n6v1 -> n7v1 n8v1 -> n9v1 - n7v1 -> n10v1 - n10v1 -> n8v1 [color=red] + n7v1 -> n8v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -43,16 +41,6 @@ digraph { cluster=true label="var m1" n7v1 - } - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - subgraph sg_2v1_var_m1 { - cluster=true - label="var m1" n8v1 n9v1 } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap index 23d8a00c8b2c..2a09ae7d2eca 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_pull@graphvis_mermaid.snap @@ -17,7 +17,6 @@ linkStyle default stroke:#aaa 7v1[\"(7v1) union()"/]:::pullClass 8v1[\"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass 9v1[/"(9v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -10v1["(10v1) handoff"]:::otherClass 2v1-->4v1 1v1-->2v1 3v1-->4v1 @@ -25,8 +24,7 @@ linkStyle default stroke:#aaa 4v1-->5v1 6v1-->7v1 8v1-->9v1 -7v1-->10v1 -10v1--x8v1; linkStyle 8 stroke:red +7v1-->8v1 subgraph sg_1v1 ["sg_1v1"] 1v1 2v1 @@ -38,10 +36,6 @@ subgraph sg_1v1 ["sg_1v1"] end subgraph sg_1v1_var_m1 ["var m1"] 7v1 - end -end -subgraph sg_2v1 ["sg_2v1"] - subgraph sg_2v1_var_m1 ["var m1"] 8v1 9v1 end diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap index d8ac1098af23..0e8aa63f7de4 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_dot.snap @@ -12,9 +12,8 @@ digraph { n5v1 [label="(n5v1) persist::<'static>()", shape=house, fillcolor="#ffff88"] n6v1 [label="(n6v1) tee()", shape=house, fillcolor="#ffff88"] n7v1 [label="(n7v1) null()", shape=house, fillcolor="#ffff88"] - n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=invhouse, fillcolor="#88aaff"] + n8v1 [label="(n8v1) fold(|| 0, |a: &mut _, b| *a += b)", shape=house, fillcolor="#ffff88"] n9v1 [label="(n9v1) for_each(|x| result_send.send(x).unwrap())", shape=house, fillcolor="#ffff88"] - n10v1 [label="(n10v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 n1v1 -> n2v1 n3v1 -> n4v1 @@ -22,8 +21,7 @@ digraph { n3v1 -> n5v1 n6v1 -> n7v1 n8v1 -> n9v1 - n6v1 -> n10v1 - n10v1 -> n8v1 [color=red] + n6v1 -> n8v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" @@ -31,6 +29,8 @@ digraph { label = "sg_1v1" n4v1 n7v1 + n8v1 + n9v1 subgraph sg_1v1_var_t0 { cluster=true label="var t0" @@ -45,12 +45,4 @@ digraph { n6v1 } } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" - n8v1 - n9v1 - } } diff --git a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap index 7759f7b1ce47..288a0a129418 100644 --- a/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_persist__persist_push@graphvis_mermaid.snap @@ -15,9 +15,8 @@ linkStyle default stroke:#aaa 5v1[/"(5v1) persist::<'static>()"\]:::pushClass 6v1[/"(6v1) tee()"\]:::pushClass 7v1[/"(7v1) null()"\]:::pushClass -8v1[\"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"/]:::pullClass +8v1[/"(8v1) fold(|| 0, |a: &mut _, b| *a += b)"\]:::pushClass 9v1[/"(9v1) for_each(|x| result_send.send(x).unwrap())"\]:::pushClass -10v1["(10v1) handoff"]:::otherClass 2v1-->3v1 1v1-->2v1 3v1-->4v1 @@ -25,11 +24,12 @@ linkStyle default stroke:#aaa 3v1-->5v1 6v1-->7v1 8v1-->9v1 -6v1-->10v1 -10v1--x8v1; linkStyle 8 stroke:red +6v1-->8v1 subgraph sg_1v1 ["sg_1v1"] 4v1 7v1 + 8v1 + 9v1 subgraph sg_1v1_var_t0 ["var t0"] 1v1 2v1 @@ -40,7 +40,3 @@ subgraph sg_1v1 ["sg_1v1"] 6v1 end end -subgraph sg_2v1 ["sg_2v1"] - 8v1 - 9v1 -end diff --git a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap index f82b6587decc..14503bc05e5f 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_stream(items_recv)", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) sort()", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| print!(\"{:?}, \", v))", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap index 6875ef8d17f4..5dad7825e9ca 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_stream(items_recv)"/]:::pullClass 2v1[\"(2v1) sort()"/]:::pullClass 3v1[/"(3v1) for_each(|v| print!("{:?}, ", v))"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap index 7e7a24ab721f..871be3dcd90c 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_dot.snap @@ -8,22 +8,14 @@ digraph { n1v1 [label="(n1v1) source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])", shape=invhouse, fillcolor="#88aaff"] n2v1 [label="(n2v1) sort_by_key(|(k, _v)| k)", shape=invhouse, fillcolor="#88aaff"] n3v1 [label="(n3v1) for_each(|v| println!(\"{:?}\", v))", shape=house, fillcolor="#ffff88"] - n4v1 [label="(n4v1) handoff", shape=parallelogram, fillcolor="#ddddff"] n2v1 -> n3v1 - n1v1 -> n4v1 - n4v1 -> n2v1 [color=red] + n1v1 -> n2v1 subgraph sg_1v1 { cluster=true fillcolor="#dddddd" style=filled label = "sg_1v1" n1v1 - } - subgraph sg_2v1 { - cluster=true - fillcolor="#dddddd" - style=filled - label = "sg_2v1" n2v1 n3v1 } diff --git a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap index 11b297fc9687..9fc81dd1e3f6 100644 --- a/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap +++ b/dfir_rs/tests/snapshots/surface_sort__sort_by_key@graphvis_mermaid.snap @@ -11,14 +11,10 @@ linkStyle default stroke:#aaa 1v1[\"(1v1) source_iter(vec![(2, 'y'), (3, 'x'), (1, 'z')])"/]:::pullClass 2v1[\"(2v1) sort_by_key(|(k, _v)| k)"/]:::pullClass 3v1[/"(3v1) for_each(|v| println!("{:?}", v))"\]:::pushClass -4v1["(4v1) handoff"]:::otherClass 2v1-->3v1 -1v1-->4v1 -4v1--x2v1; linkStyle 2 stroke:red +1v1-->2v1 subgraph sg_1v1 ["sg_1v1"] 1v1 -end -subgraph sg_2v1 ["sg_2v1"] 2v1 3v1 end diff --git a/dfir_rs/tests/surface_persist_mut_push.rs b/dfir_rs/tests/surface_persist_mut_push.rs new file mode 100644 index 000000000000..3a081ca0d91b --- /dev/null +++ b/dfir_rs/tests/surface_persist_mut_push.rs @@ -0,0 +1,27 @@ +//! Isolated test for persist_mut push-side borrow conflict. +use dfir_rs::dfir_syntax; +use dfir_rs::util::Persistence::*; +use dfir_rs::util::collect_ready; +use multiplatform_test::multiplatform_test; + +#[multiplatform_test] +pub fn test_persist_mut_push() { + let (pull_tx, mut pull_rx) = dfir_rs::util::unbounded_channel::(); + let (push_tx, mut push_rx) = dfir_rs::util::unbounded_channel::(); + + let mut df = dfir_syntax! { + my_tee = source_iter([Persist(1), Persist(2), Persist(3), Persist(4), Delete(2)]) + -> persist_mut::<'mutable>() + -> tee(); + + my_tee + -> for_each(|v| pull_tx.send(v).unwrap()); + + my_tee + -> flat_map(|x| if x == 3 {vec![Persist(x), Delete(x)]} else {vec![Persist(x)]}) + -> persist_mut::<'mutable>() + -> for_each(|v| push_tx.send(v).unwrap()); + }; + + df.run_available_sync(); +} diff --git a/dfir_rs/tests/surface_persist_mut_push_expanded.rs b/dfir_rs/tests/surface_persist_mut_push_expanded.rs new file mode 100644 index 000000000000..db44445f2984 --- /dev/null +++ b/dfir_rs/tests/surface_persist_mut_push_expanded.rs @@ -0,0 +1,95 @@ +//! Cleaned-up expansion of the persist_mut push-side borrow conflict. +//! This file reproduces the borrow error without the macro. +//! +//! The issue: `sg_1v1_node_1v1_iter` is mutably borrowed by `op_1v1` (via pull::iter), +//! which flows into `op_2v1` (persist_mut pull-side). After the pivot `.await`, +//! the codegen emits `Iterator::for_each(&mut sg_1v1_node_1v1_iter, drop)` to drain +//! remaining items. But Rust can't prove the first borrow is dead because `op_2v1` +//! is an opaque `impl Pull` type. + +#![allow(unused)] + +use dfir_rs::dfir_pipes; +use dfir_rs::util::Persistence::{self, *}; +use dfir_rs::util::sparse_vec::SparseVec; + +async fn repro() { + // Prologue (outer scope, persists across ticks) + let mut sg_1v1_node_1v1_iter = { + std::iter::IntoIterator::into_iter([ + Persist(1usize), + Persist(2), + Persist(3), + Persist(4), + Delete(2), + ]) + }; + let mut sg_1v1_node_2v1_persistdata: SparseVec = SparseVec::default(); + let mut sg_1v1_node_6v1_persistdata: SparseVec = SparseVec::default(); + + let (pull_tx, _) = dfir_rs::util::unbounded_channel::(); + let (push_tx, _) = dfir_rs::util::unbounded_channel::(); + + // Tick closure (runs each tick) + // --- Subgraph 1 --- + { + // Scoped block for pull chain — borrow of iter ends when block ends + { + // Pull side: source_iter -> persist_mut + let op_1v1 = dfir_pipes::pull::iter(&mut sg_1v1_node_1v1_iter); + + // Type guard wrapper (this is what the codegen generates to check types) + fn type_guard( + input: Input, + ) -> impl dfir_pipes::pull::Pull< + Item = Item, + Meta = (), + CanPend = Input::CanPend, + CanEnd = Input::CanEnd, + > + where + Input: dfir_pipes::pull::Pull, + { + input + } + let op_1v1 = type_guard(op_1v1); + + let op_2v1 = { + // persist_mut consumes op_1v1, accumulates, then emits from persistdata + let fut = dfir_pipes::pull::Pull::for_each(op_1v1, |item| match item { + Persist(v) => sg_1v1_node_2v1_persistdata.push(v), + Delete(v) => sg_1v1_node_2v1_persistdata.delete(&v), + }); + fut.await; + dfir_pipes::pull::iter(sg_1v1_node_2v1_persistdata.iter().cloned()) + }; + let op_2v1 = type_guard(op_2v1); + + // Push side: for_each (push_tx), flat_map -> persist_mut -> for_each (pull_tx) + let op_7v1 = dfir_pipes::push::for_each(|v: usize| push_tx.send(v).unwrap()); + let op_6v1 = dfir_pipes::push::for_each(|item: Persistence| match item { + Persist(v) => sg_1v1_node_6v1_persistdata.push(v), + Delete(v) => sg_1v1_node_6v1_persistdata.delete(&v), + }); + let op_5v1 = dfir_pipes::push::flat_map( + |x: usize| { + if x == 3 { + vec![Persist(x), Delete(x)] + } else { + vec![Persist(x)] + } + }, + op_6v1, + ); + let op_4v1 = dfir_pipes::push::for_each(|v: usize| pull_tx.send(v).unwrap()); + let op_3v1 = dfir_pipes::push::fanout(op_4v1, op_5v1); // tee + + // Pivot: pull -> push + dfir_pipes::pull::Pull::send_push(op_2v1, op_3v1).await; + } + // After the block, the &mut borrow of sg_1v1_node_1v1_iter is provably dead. + + // Cleanup: drain remaining items from source_iter + std::iter::Iterator::for_each(&mut sg_1v1_node_1v1_iter, std::mem::drop); + } +} diff --git a/dfir_rs/tests/surface_push_blocking.rs b/dfir_rs/tests/surface_push_blocking.rs new file mode 100644 index 000000000000..0ee56042c14b --- /dev/null +++ b/dfir_rs/tests/surface_push_blocking.rs @@ -0,0 +1,62 @@ +//! Tests that blocking operators work on the push side of a subgraph. +//! These operators end up push-side when a tee (multi-output) is upstream in the same subgraph. + +use dfir_rs::dfir_syntax; +use dfir_rs::util::collect_ready; +use multiplatform_test::multiplatform_test; + +/// sort on push side: source -> tee -> sort -> for_each +#[multiplatform_test] +pub fn test_sort_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::(); + let mut df = dfir_syntax! { + my_tee = source_iter([3, 1, 2]) -> tee(); + my_tee -> sort() -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + assert_eq!(&[1, 2, 3], &*collect_ready::, _>(&mut out_recv)); +} + +/// reduce on push side: source -> tee -> reduce -> for_each +#[multiplatform_test] +pub fn test_reduce_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::(); + let mut df = dfir_syntax! { + my_tee = source_iter([1, 2, 3]) -> tee(); + my_tee -> reduce(|a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + assert_eq!(&[6], &*collect_ready::, _>(&mut out_recv)); +} + +/// fold_keyed on push side +#[multiplatform_test] +pub fn test_fold_keyed_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<(i32, i32)>(); + let mut df = dfir_syntax! { + my_tee = source_iter([(1, 10), (1, 20), (2, 30)]) -> tee(); + my_tee -> fold_keyed(|| 0, |a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + let mut out = collect_ready::, _>(&mut out_recv); + out.sort(); + assert_eq!(&[(1, 30), (2, 30)], &*out); +} + +/// reduce_keyed on push side +#[multiplatform_test] +pub fn test_reduce_keyed_push() { + let (out_send, mut out_recv) = dfir_rs::util::unbounded_channel::<(i32, i32)>(); + let mut df = dfir_syntax! { + my_tee = source_iter([(1, 10), (1, 20), (2, 30)]) -> tee(); + my_tee -> reduce_keyed(|a: &mut _, b| *a += b) -> for_each(|v| out_send.send(v).unwrap()); + my_tee -> null(); + }; + df.run_available_sync(); + let mut out = collect_ready::, _>(&mut out_recv); + out.sort(); + assert_eq!(&[(1, 30), (2, 30)], &*out); +}