Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dfir_lang/src/graph/graph_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ where
src = src_str.trim(),
arrow_body = "--",
arrow_head = match delay_type {
None | Some(DelayType::MonotoneAccum) => ">",
None => ">",
Some(DelayType::Stratum) => "x",
Some(DelayType::Tick | DelayType::TickLazy) => "o",
},
Expand All @@ -201,7 +201,6 @@ where
self.link_count,
match delay_type {
DelayType::Stratum | DelayType::Tick | DelayType::TickLazy => "red",
DelayType::MonotoneAccum => "#060",
}
)?;
}
Expand Down
11 changes: 1 addition & 10 deletions dfir_lang/src/graph/meta_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,16 +1422,7 @@ impl DfirGraph {
);
let root = change_spans(root.clone(), pivot_span);
subgraph_op_iter_code.push(quote_spanned! {pivot_span=>
#[inline(always)]
fn #pivot_fn_ident<Pul, Psh, Item>(pull: Pul, push: Psh)
-> impl ::std::future::Future<Output = ()>
where
Pul: #root::dfir_pipes::pull::Pull<Item = Item>,
Psh: #root::dfir_pipes::push::Push<Item, Pul::Meta>,
{
#root::dfir_pipes::pull::Pull::send_push(pull, push)
}
(#pivot_fn_ident)(#pull_ident, #push_ident).await;
#root::dfir_pipes::pull::Pull::send_push(#pull_ident, #push_ident).await;
});
}
};
Expand Down
10 changes: 1 addition & 9 deletions dfir_lang/src/graph/ops/anti_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints {
},
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
Expand Down Expand Up @@ -118,21 +117,14 @@ pub const ANTI_JOIN: OperatorConstraints = OperatorConstraints {
let #ident = {
#accum_neg

let replay_idx = if #context.is_first_run_this_tick() {
0
} else {
#pos_ident.len()
};

// Accum into pos vec
let fut = #root::dfir_pipes::pull::Pull::for_each(#input_pos, |kv| {
#pos_ident.push(kv);
});
let () = #work_fn_async(fut).await;

// Replay out of pos vec
let iter = #pos_ident[replay_idx..].iter();
let iter = ::std::iter::Iterator::filter(iter, |(k, _)| {
let iter = ::std::iter::Iterator::filter(#pos_ident.iter(), |(k, _)| {
!#neg_ident.contains(k)
});
let iter = ::std::iter::Iterator::cloned(iter);
Expand Down
25 changes: 4 additions & 21 deletions dfir_lang/src/graph/ops/cross_join_multiset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
Expand Down Expand Up @@ -74,33 +73,17 @@ pub const CROSS_JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
_ => Default::default(),
};

let lhs_i = wc.make_ident("lhs_i");
let rhs_i = wc.make_ident("rhs_i");
let write_iterator = quote_spanned! {op_span=>
let (#lhs_i, #rhs_i) = if #context.is_first_run_this_tick() {
(0, 0)
} else {
(#lhs_state.len(), #rhs_state.len())
};

#work_fn_async(#root::dfir_pipes::pull::Pull::for_each(#lhs, |x| #lhs_state.push(x))).await;
#work_fn_async(#root::dfir_pipes::pull::Pull::for_each(#rhs, |x| #rhs_state.push(x))).await;

// RHS
// +-----+-----+
// L | Old | New |
// H +-----+-----+
// S | New | New |
// +-----+-----+
let #ident = #root::dfir_pipes::pull::iter(
#lhs_state
.iter()
.enumerate()
.flat_map(|(i, lhs)| {
let j = if i < #lhs_i { #rhs_i } else { 0 };
#rhs_state[j..]
.iter()
.map(move |rhs| (::std::clone::Clone::clone(lhs), ::std::clone::Clone::clone(rhs)))
.flat_map(|lhs| {
#rhs_state
.iter()
.map(move |rhs| (::std::clone::Clone::clone(lhs), ::std::clone::Clone::clone(rhs)))
})
);
};
Expand Down
13 changes: 11 additions & 2 deletions dfir_lang/src/graph/ops/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,24 @@ pub const FOLD: OperatorConstraints = OperatorConstraints {
)
);
}
} else {
assert_eq!(0, outputs.len());
} else if outputs.is_empty() {
// Terminal push: fold is a singleton reference target with no downstream.
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::for_each(|#item_ident| {
#assign_accum_ident

#foreach_body
});
}
} else {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::Fold::new(
&mut #singleton_output_ident,
|#accumulator_ident: &mut _, #item_ident| { #foreach_body },
#output,
);
}
};

Ok(OperatorWriteOutput {
Expand Down
30 changes: 16 additions & 14 deletions dfir_lang/src/graph/ops/fold_keyed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use quote::{ToTokens, quote_spanned};

use super::{
DelayType, OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OpInstGenerics, OperatorCategory, OperatorConstraints, OperatorInstance,
OperatorWriteOutput, Persistence, RANGE_1, WriteContextArgs,
};

Expand Down Expand Up @@ -80,17 +80,16 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints {
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| Some(DelayType::Stratum),
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
context,
op_span,
work_fn_async,
ident,
inputs,
outputs,
singleton_output_ident,
is_pull,
root,
op_name,
op_inst:
OperatorInstance {
generics:
Expand All @@ -105,8 +104,6 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints {
..
},
_| {
assert!(is_pull, "TODO(mingwei): `{}` only supports pull.", op_name);

let persistence = match persistence_args[..] {
[] => Persistence::Tick,
[a] => a,
Expand Down Expand Up @@ -145,7 +142,17 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints {
let mut #hashtable_ident = &mut #singleton_output_ident;
};

let write_iterator = if Persistence::Mutable == persistence {
let write_iterator = if !is_pull {
let output = &outputs[0];
quote_spanned! {op_span=>
let #ident = #root::dfir_pipes::push::FoldKeyed::new(
&mut #singleton_output_ident,
#initfn,
#aggfn,
#output,
);
}
} else if Persistence::Mutable == persistence {
quote_spanned! {op_span=>
#assign_hashtable_ident

Expand Down Expand Up @@ -202,13 +209,8 @@ pub const FOLD_KEYED: OperatorConstraints = OperatorConstraints {
)
},
Persistence::Static => quote_spanned! {op_span=>
// Play everything but only on the first run of this tick/stratum.
// (We know we won't have any more inputs, so it is fine to only play once.
// Because of the `DelayType::Stratum` or `DelayType::MonotoneAccum`).
#context.is_first_run_this_tick()
.then_some(#hashtable_ident.iter())
.into_iter()
.flatten()
// Play everything (each subgraph runs exactly once per tick).
#hashtable_ident.iter()
.map(
#[allow(suspicious_double_ref_op, clippy::clone_on_copy)]
|(k, v)| (
Expand Down
2 changes: 1 addition & 1 deletion dfir_lang/src/graph/ops/fold_no_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub const FOLD_NO_REPLAY: OperatorConstraints = OperatorConstraints {
let () = #work_fn_async(__fut).await;
}

let #ident = if __was_updated || (#context.current_tick().0 == 0 && #context.is_first_run_this_tick()) {
let #ident = if __was_updated || #context.current_tick().0 == 0 {
#work_fn(
|| #root::dfir_pipes::pull::iter(
::std::option::Option::Some(::std::clone::Clone::clone(&*#accumulator_ident))
Expand Down
3 changes: 1 addition & 2 deletions dfir_lang/src/graph/ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub const JOIN: OperatorConstraints = OperatorConstraints {
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
context,
loop_id,
op_span,
work_fn,
Expand Down Expand Up @@ -205,7 +204,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints {
).await
}

let fut = check_inputs(#lhs, #rhs, &mut #lhs_joindata_ident, &mut #rhs_joindata_ident, #context.is_first_run_this_tick());
let fut = check_inputs(#lhs, #rhs, &mut #lhs_joindata_ident, &mut #rhs_joindata_ident, true);
#work_fn_async(fut).await
};
};
Expand Down
14 changes: 3 additions & 11 deletions dfir_lang/src/graph/ops/join_fused_lhs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints {
},
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
Expand Down Expand Up @@ -101,13 +100,6 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints {
#root::dfir_pipes::pull::accumulate_all(&mut #lhs_accum, &mut *#lhs_borrow, #lhs),
).await;

// RHS replay index.
let replay_idx = if #context.is_first_run_this_tick() {
0
} else {
#rhs_borrow_ident.len()
};

// Accumulate RHS.
let () = #work_fn_async(
#root::dfir_pipes::pull::Pull::for_each(#rhs, |kv| {
Expand All @@ -118,9 +110,9 @@ pub const JOIN_FUSED_LHS: OperatorConstraints = OperatorConstraints {

#[allow(clippy::clone_on_copy)]
#[allow(suspicious_double_ref_op)]
let iter = #rhs_borrow_ident[replay_idx..]
.iter()
.filter_map(|(k, v2)| #lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone()))));
let iter = #rhs_borrow_ident
.iter()
.filter_map(|(k, v2)| #lhs_borrow.get(k).map(|v1| (k.clone(), (v1.clone(), v2.clone()))));
#root::dfir_pipes::pull::iter(iter)
};
},
Expand Down
9 changes: 1 addition & 8 deletions dfir_lang/src/graph/ops/join_multiset_half.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints {
},
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
work_fn_async,
ident,
Expand Down Expand Up @@ -137,12 +136,6 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints {
let #ident = {
#accum_build

let replay_idx = if #context.is_first_run_this_tick() {
0
} else {
#probe_ident.len()
};

// Accum into probe vec
let fut = #root::dfir_pipes::pull::Pull::for_each(#input_probe, |kv| {
#probe_ident.push(kv);
Expand All @@ -151,7 +144,7 @@ pub const JOIN_MULTISET_HALF: OperatorConstraints = OperatorConstraints {

// Replay out of probe vec
#[allow(clippy::clone_on_copy, noop_method_call)]
let iter = #probe_ident[replay_idx..].iter().flat_map(|(k, v_probe)| {
let iter = #probe_ident.iter().flat_map(|(k, v_probe)| {
#build_ident
.get(k)
.map(|vals: &::std::vec::Vec<_>| {
Expand Down
15 changes: 6 additions & 9 deletions dfir_lang/src/graph/ops/multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
write_fn: |wc @ &WriteContextArgs {
root,
op_span,
context,
ident,
inputs,
outputs,
Expand All @@ -74,14 +73,12 @@ pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {

let tick_swap = quote_spanned! {op_span=>
{
if #context.is_first_run_this_tick() {
let (mut prev_map, mut curr_map) = (
#prev_data.borrow_mut(),
#curr_data.borrow_mut(),
);
::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map));
curr_map.clear();
}
let (mut prev_map, mut curr_map) = (
#prev_data.borrow_mut(),
#curr_data.borrow_mut(),
);
::std::mem::swap(::std::ops::DerefMut::deref_mut(&mut prev_map), ::std::ops::DerefMut::deref_mut(&mut curr_map));
curr_map.clear();
}
};

Expand Down
11 changes: 2 additions & 9 deletions dfir_lang/src/graph/ops/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints {
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
root,
context,
op_span,
ident,
is_pull,
Expand Down Expand Up @@ -99,18 +98,12 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints {
let #vec_ident = &mut #persistdata_ident;

let #ident = {
let replay_idx = if #context.is_first_run_this_tick() {
0
} else {
#vec_ident.len()
};

let fut = #root::dfir_pipes::pull::Pull::for_each(#input, |item| {
#vec_ident.push(item);
});
let () = #work_fn_async(fut).await;

let iter = #vec_ident[replay_idx..].iter().cloned();
let iter = #vec_ident.iter().cloned();
#root::dfir_pipes::pull::iter(iter)
};
}
Expand All @@ -127,7 +120,7 @@ pub const PERSIST: OperatorConstraints = OperatorConstraints {
{
#root::dfir_pipes::push::persist_state(vec, is_new_tick, output)
}
constrain_types(&mut *#vec_ident, #output, #context.is_first_run_this_tick())
constrain_types(&mut *#vec_ident, #output, true)
};
}
};
Expand Down
Loading
Loading