From 67a57ace79c9f146bda1a67831e25a9005c68bb7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 00:22:40 -0500 Subject: [PATCH 01/11] WIP fix partitioned hash join dynamic filter cancellation --- .../physical-plan/src/joins/hash_join/exec.rs | 335 +++++++++++++++--- .../physical-plan/src/repartition/mod.rs | 22 +- report.md | 134 +++++++ 3 files changed, 427 insertions(+), 64 deletions(-) create mode 100644 report.md diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index d064f5ce6c3b7..c31f0f64d1de2 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -35,7 +35,8 @@ use crate::joins::Map; use crate::joins::array_map::ArrayMap; use crate::joins::hash_join::inlist_builder::build_struct_inlist_values; use crate::joins::hash_join::shared_bounds::{ - ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator, + ColumnBounds, PartitionBounds, PartitionBuildData, PushdownStrategy, + SharedBuildAccumulator, }; use crate::joins::hash_join::stream::{ BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState, @@ -75,8 +76,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ - JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, - plan_err, project_schema, + DataFusionError, JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, + internal_err, plan_err, project_schema, }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -1316,6 +1317,33 @@ impl ExecutionPlan for HashJoinExec { .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + // Initialize build_accumulator lazily with runtime partition counts (only if enabled) + // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing + let repartition_random_state = REPARTITION_RANDOM_STATE; + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + let build_accumulator = enable_dynamic_filter_pushdown + .then(|| { + self.dynamic_filter.as_ref().map(|df| { + let filter = Arc::clone(&df.filter); + Some(Arc::clone(df.build_accumulator.get_or_init(|| { + Arc::new(SharedBuildAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + filter, + on_right.clone(), + repartition_random_state, + )) + }))) + }) + }) + .flatten() + .flatten(); + let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -1343,20 +1371,45 @@ impl ExecutionPlan for HashJoinExec { let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); - - OnceFut::new(collect_left_input( - self.random_state.random_state().clone(), - left_stream, - on_left.clone(), - join_metrics.clone(), - reservation, - need_produce_result_in_final(self.join_type), - 1, - enable_dynamic_filter_pushdown, - Arc::clone(context.session_config().options()), - self.null_equality, - array_map_created_count, - )) + let build_accumulator = build_accumulator + .as_ref() + .map(|acc| (Arc::clone(acc), partition)); + + if build_accumulator.is_some() { + let task = tokio::task::spawn(collect_left_input_and_maybe_report( + self.random_state.random_state().clone(), + left_stream, + on_left.clone(), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + 1, + enable_dynamic_filter_pushdown, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, + build_accumulator, + )); + OnceFut::new(async move { + task.await.map_err(|err| { + DataFusionError::ExecutionJoin(Box::new(err)) + })? + }) + } else { + OnceFut::new(collect_left_input( + self.random_state.random_state().clone(), + left_stream, + on_left.clone(), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + 1, + enable_dynamic_filter_pushdown, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, + )) + } } PartitionMode::Auto => { return plan_err!( @@ -1368,33 +1421,6 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); - // Initialize build_accumulator lazily with runtime partition counts (only if enabled) - // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing - let repartition_random_state = REPARTITION_RANDOM_STATE; - let build_accumulator = enable_dynamic_filter_pushdown - .then(|| { - self.dynamic_filter.as_ref().map(|df| { - let filter = Arc::clone(&df.filter); - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); - Some(Arc::clone(df.build_accumulator.get_or_init(|| { - Arc::new(SharedBuildAccumulator::new_from_partition_mode( - self.mode, - self.left.as_ref(), - self.right.as_ref(), - filter, - on_right, - repartition_random_state, - )) - }))) - }) - }) - .flatten() - .flatten(); - // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. let right_stream = self.right.execute(partition, context)?; @@ -1413,6 +1439,13 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); + let stream_build_accumulator = match self.mode { + PartitionMode::Partitioned => None, + PartitionMode::CollectLeft => build_accumulator, + PartitionMode::Auto => unreachable!( + "PartitionMode::Auto should not be present at execution time" + ), + }; Ok(Box::pin(HashJoinStream::new( partition, @@ -1430,7 +1463,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, vec![], self.right.output_ordering().is_some(), - build_accumulator, + stream_build_accumulator, self.mode, self.null_aware, self.fetch, @@ -2091,6 +2124,51 @@ async fn collect_left_input( Ok(data) } +#[expect(clippy::too_many_arguments)] +async fn collect_left_input_and_maybe_report( + random_state: RandomState, + left_stream: SendableRecordBatchStream, + on_left: Vec, + metrics: BuildProbeJoinMetrics, + reservation: MemoryReservation, + with_visited_indices_bitmap: bool, + probe_threads_count: usize, + should_compute_dynamic_filters: bool, + config: Arc, + null_equality: NullEquality, + array_map_created_count: Count, + build_accumulator: Option<(Arc, usize)>, +) -> Result { + let left_data = collect_left_input( + random_state, + left_stream, + on_left, + metrics, + reservation, + with_visited_indices_bitmap, + probe_threads_count, + should_compute_dynamic_filters, + config, + null_equality, + array_map_created_count, + ) + .await?; + + if let Some((build_accumulator, partition_id)) = build_accumulator { + let build_data = PartitionBuildData::Partitioned { + partition_id, + pushdown: left_data.membership().clone(), + bounds: left_data + .bounds + .clone() + .unwrap_or_else(|| PartitionBounds::new(vec![])), + }; + build_accumulator.report_build_data(build_data).await?; + } + + Ok(left_data) +} + #[cfg(test)] mod tests { use super::*; @@ -2353,6 +2431,22 @@ mod tests { right: Arc, on: JoinOn, join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + hash_join_with_dynamic_filter_and_mode( + left, + right, + on, + join_type, + PartitionMode::CollectLeft, + ) + } + + fn hash_join_with_dynamic_filter_and_mode( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + mode: PartitionMode, ) -> Result<(HashJoinExec, Arc)> { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let mut join = HashJoinExec::try_new( @@ -2362,7 +2456,7 @@ mod tests { None, &join_type, None, - PartitionMode::CollectLeft, + mode, NullEquality::NullEqualsNothing, false, )?; @@ -5634,6 +5728,155 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions() + -> Result<()> { + let mut session_config = SessionConfig::default(); + session_config.options_mut().optimizer.enable_dynamic_filter_pushdown = true; + let task_ctx = + Arc::new(TaskContext::default().with_session_config(session_config)); + + let child_left_schema = Arc::new(Schema::new(vec![ + Field::new("child_left_payload", DataType::Int32, false), + Field::new("child_key", DataType::Int32, false), + Field::new("child_left_extra", DataType::Int32, false), + ])); + let child_right_schema = Arc::new(Schema::new(vec![ + Field::new("child_right_payload", DataType::Int32, false), + Field::new("child_right_key", DataType::Int32, false), + Field::new("child_right_extra", DataType::Int32, false), + ])); + let parent_left_schema = Arc::new(Schema::new(vec![ + Field::new("parent_payload", DataType::Int32, false), + Field::new("parent_key", DataType::Int32, false), + Field::new("parent_extra", DataType::Int32, false), + ])); + + let child_left: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("child_left_payload", &vec![10]), + ("child_key", &vec![0]), + ("child_left_extra", &vec![100]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![11]), + ("child_key", &vec![1]), + ("child_left_extra", &vec![101]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![12]), + ("child_key", &vec![2]), + ("child_left_extra", &vec![102]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![13]), + ("child_key", &vec![3]), + ("child_left_extra", &vec![103]), + )], + ], + Arc::clone(&child_left_schema), + None, + )?; + let child_right: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("child_right_payload", &vec![20]), + ("child_right_key", &vec![0]), + ("child_right_extra", &vec![200]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![21]), + ("child_right_key", &vec![1]), + ("child_right_extra", &vec![201]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![22]), + ("child_right_key", &vec![2]), + ("child_right_extra", &vec![202]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![23]), + ("child_right_key", &vec![3]), + ("child_right_extra", &vec![203]), + )], + ], + Arc::clone(&child_right_schema), + None, + )?; + let parent_left: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("parent_payload", &vec![30]), + ("parent_key", &vec![0]), + ("parent_extra", &vec![300]), + )], + vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))], + vec![build_table_i32( + ("parent_payload", &vec![32]), + ("parent_key", &vec![2]), + ("parent_extra", &vec![302]), + )], + vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))], + ], + Arc::clone(&parent_left_schema), + None, + )?; + + let child_on = vec![( + Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _, + Arc::new(Column::new_with_schema( + "child_right_key", + &child_right_schema, + )?) as _, + )]; + let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode( + child_left, + child_right, + child_on, + JoinType::Inner, + PartitionMode::Partitioned, + )?; + let child_join: Arc = Arc::new(child_join); + + let parent_on = vec![( + Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _, + Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _, + )]; + let parent_join = HashJoinExec::try_new( + parent_left, + child_join, + parent_on, + None, + &JoinType::RightSemi, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + false, + )?; + + let batches = tokio::time::timeout( + std::time::Duration::from_secs(5), + crate::execution_plan::collect(Arc::new(parent_join), task_ctx), + ) + .await + .expect("partitioned right-semi join should not hang")?; + + assert_batches_sorted_eq!( + [ + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + "| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |", + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + "| 10 | 0 | 100 | 20 | 0 | 200 |", + "| 12 | 2 | 102 | 22 | 2 | 202 |", + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + ], + &batches + ); + + Ok(()) + } + #[tokio::test] async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() -> Result<()> { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d4406360504f9..aef451ea3992d 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -983,7 +983,6 @@ impl ExecutionPlan for RepartitionExec { spill_metrics, input.schema(), ); - // Get existing ordering to use for merging let sort_exprs = self.sort_exprs().cloned(); @@ -1628,10 +1627,7 @@ impl PerPartitionStream { // Poll the memory channel for next message let value = match self.receiver.recv().poll_unpin(cx) { Poll::Ready(v) => v, - Poll::Pending => { - // Nothing from channel, wait - return Poll::Pending; - } + Poll::Pending => return Poll::Pending, }; match value { @@ -1664,10 +1660,7 @@ impl PerPartitionStream { // Continue to poll for more data from other partitions continue; } - None => { - // Channel closed unexpectedly - return Poll::Ready(None); - } + None => return Poll::Ready(None), } } StreamState::ReadingSpilled => { @@ -1680,15 +1673,8 @@ impl PerPartitionStream { Poll::Ready(Some(Err(e))) => { return Poll::Ready(Some(Err(e))); } - Poll::Ready(None) => { - // Spill stream ended, keep draining the memory channel - self.state = StreamState::ReadingMemory; - } - Poll::Pending => { - // Spilled batch not ready yet, must wait - // This preserves ordering by blocking until spill data arrives - return Poll::Pending; - } + Poll::Ready(None) => self.state = StreamState::ReadingMemory, + Poll::Pending => return Poll::Pending, } } } diff --git a/report.md b/report.md new file mode 100644 index 0000000000000..7173997421413 --- /dev/null +++ b/report.md @@ -0,0 +1,134 @@ +# Dynamic Filter Deadlock Investigation + +## Reproducer + +Original failing query was TPC-H Q18 at SF1, executed through `datafusion-cli` with: + +```bash +DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 ./target/release/datafusion-cli -f /tmp/tpch_q18.sql +``` + +On `main`, this hung consistently. + +## Initial theories that turned out incomplete + +### Repartition EOF delivery was blocked by backpressure + +The first working theory was: + +1. some final hash-join build partitions were empty, +2. `RepartitionExec` sent end-of-stream (`None`) through the same globally gated channels as data, +3. the gate could remain closed long enough that EOF to empty partitions never arrived, +4. those build partitions never completed, +5. sibling hash-join partitions blocked forever waiting on the dynamic-filter barrier. + +This explained some symptoms but was not the full root cause. + +### Receiver-side empty-and-closed race in distributor channels + +A smaller hardening change was tried in `RecvFuture::poll`: + +- if the channel is empty and `n_senders == 0`, return `None` immediately + +This was plausible but did **not** fix the original Q18 reproducer on clean `main`. + +## What the tracing showed + +The useful traces were on: + +- `RepartitionExec::wait_for_task` +- `PerPartitionStream` +- `DistributionReceiver::drop` +- `HashJoinExec::execute` +- `HashJoinStream::collect_build_side` +- `HashJoinStream::drop` +- `OnceFut::get_shared` + +The critical observations were: + +1. The stuck final join was: + - `HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@2, l_orderkey@0)]` +2. The empty / problematic build partitions were consistently: + - `2`, `14`, `21` +3. Those final-inner-join partitions were **created**, but they were dropped in `WaitBuildSide` before ever reaching `COLLECT_BUILD_ENTER`. +4. The parent operator above that join was a partitioned `RightSemi` join. +5. For parent partitions `2`, `14`, `21`, the `RightSemi` build side was empty: + - `rows=0`, `map_empty=true` +6. `RightSemi` is one of the join types where an empty build side produces an empty result immediately. +7. So those parent `RightSemi` partitions legally transitioned to `Completed` and stopped polling their child final-inner-join partitions. +8. Because the child final-inner-join partitions were never polled, their lazy `left_fut` never ran, so they never reported build data to `SharedBuildAccumulator`. +9. Meanwhile, sibling final-inner-join partitions did run, reported build data, and then blocked in `WaitPartitionBoundsReport` forever, because the barrier still expected all 24 partition reports. + +## Actual root cause + +The deadlock was a conflict between: + +- **lazy build-side collection/reporting** in partitioned hash join, and +- **legal early cancellation** by an upstream parent operator. + +More concretely: + +1. In `PartitionMode::Partitioned`, build-side collection lives behind a per-partition `OnceFut`. +2. Dynamic-filter reporting also happens only after that future is polled and finishes. +3. `SharedBuildAccumulator` expects one report from every partition. +4. But an upstream partitioned `RightSemi` join can legitimately stop polling some child partitions when its own build side is empty. +5. Those canceled child partitions never report "I am empty", yet the barrier still waits for them. + +So the bug was **not** fundamentally in `RepartitionExec`. + +`RepartitionExec` exposed the problem, but the actual broken invariant was: + +> partitioned hash-join dynamic filter coordination assumed every partition stream would be polled far enough to report build data. + +That assumption is false. + +## Desired behavior + +The correct semantics are: + +- dynamic filter finalization should happen only after the **entire build side** is known, +- and "entire build side" includes partitions that are empty. + +Therefore, an empty build partition must still contribute: + +- its partition id, +- empty membership / empty bounds information, +- and its completion signal for the barrier. + +## Proposed fix + +For `PartitionMode::Partitioned` when dynamic-filter pushdown is enabled: + +- start build-side collection eagerly at `HashJoinExec::execute()` time, +- do not wait for the child `HashJoinStream` to be polled, +- once build-side collection completes, report `PartitionBuildData::Partitioned` immediately, +- including the case where the partition is empty, +- then let the shared future resolve to the collected `JoinLeftData`. + +With that change: + +- empty partitions still report, +- the barrier completes, +- active sibling partitions stop hanging, +- and a parent operator is still free to cancel the child stream later. + +## Validation so far + +The eager-report fix: + +- made the original Q18 reproducer complete successfully, +- passed the broad `hash_join` test slice, +- passed the existing empty-build dynamic-filter tests, +- and survived a large `cargo test` run aside from an unrelated CLI snapshot mismatch. + +## Regression test shape + +The right regression test is one that captures this exact cancellation pattern: + +1. a partitioned child inner hash join with dynamic-filter pushdown enabled, +2. empty build partitions in some output partitions, +3. a partitioned parent `RightSemi` join above it, +4. parent partitions that legally complete early on those empty partitions, +5. assertion that execution completes instead of hanging. + +The test should fail on `main` by timing out or hanging, and pass with the eager-report fix. From aa627664acb84f9302f08ac10dbb33b779904028 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 01:15:22 -0500 Subject: [PATCH 02/11] Prepare hash join dynamic filter fix for review --- .../physical-plan/src/joins/hash_join/exec.rs | 5 +- .../physical-plan/src/repartition/mod.rs | 22 ++- report.md | 134 ------------------ 3 files changed, 22 insertions(+), 139 deletions(-) delete mode 100644 report.md diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index c31f0f64d1de2..092fe6129a4fa 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -5732,7 +5732,10 @@ mod tests { async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions() -> Result<()> { let mut session_config = SessionConfig::default(); - session_config.options_mut().optimizer.enable_dynamic_filter_pushdown = true; + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; let task_ctx = Arc::new(TaskContext::default().with_session_config(session_config)); diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index aef451ea3992d..d4406360504f9 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -983,6 +983,7 @@ impl ExecutionPlan for RepartitionExec { spill_metrics, input.schema(), ); + // Get existing ordering to use for merging let sort_exprs = self.sort_exprs().cloned(); @@ -1627,7 +1628,10 @@ impl PerPartitionStream { // Poll the memory channel for next message let value = match self.receiver.recv().poll_unpin(cx) { Poll::Ready(v) => v, - Poll::Pending => return Poll::Pending, + Poll::Pending => { + // Nothing from channel, wait + return Poll::Pending; + } }; match value { @@ -1660,7 +1664,10 @@ impl PerPartitionStream { // Continue to poll for more data from other partitions continue; } - None => return Poll::Ready(None), + None => { + // Channel closed unexpectedly + return Poll::Ready(None); + } } } StreamState::ReadingSpilled => { @@ -1673,8 +1680,15 @@ impl PerPartitionStream { Poll::Ready(Some(Err(e))) => { return Poll::Ready(Some(Err(e))); } - Poll::Ready(None) => self.state = StreamState::ReadingMemory, - Poll::Pending => return Poll::Pending, + Poll::Ready(None) => { + // Spill stream ended, keep draining the memory channel + self.state = StreamState::ReadingMemory; + } + Poll::Pending => { + // Spilled batch not ready yet, must wait + // This preserves ordering by blocking until spill data arrives + return Poll::Pending; + } } } } diff --git a/report.md b/report.md deleted file mode 100644 index 7173997421413..0000000000000 --- a/report.md +++ /dev/null @@ -1,134 +0,0 @@ -# Dynamic Filter Deadlock Investigation - -## Reproducer - -Original failing query was TPC-H Q18 at SF1, executed through `datafusion-cli` with: - -```bash -DATAFUSION_EXECUTION_TARGET_PARTITIONS=24 ./target/release/datafusion-cli -f /tmp/tpch_q18.sql -``` - -On `main`, this hung consistently. - -## Initial theories that turned out incomplete - -### Repartition EOF delivery was blocked by backpressure - -The first working theory was: - -1. some final hash-join build partitions were empty, -2. `RepartitionExec` sent end-of-stream (`None`) through the same globally gated channels as data, -3. the gate could remain closed long enough that EOF to empty partitions never arrived, -4. those build partitions never completed, -5. sibling hash-join partitions blocked forever waiting on the dynamic-filter barrier. - -This explained some symptoms but was not the full root cause. - -### Receiver-side empty-and-closed race in distributor channels - -A smaller hardening change was tried in `RecvFuture::poll`: - -- if the channel is empty and `n_senders == 0`, return `None` immediately - -This was plausible but did **not** fix the original Q18 reproducer on clean `main`. - -## What the tracing showed - -The useful traces were on: - -- `RepartitionExec::wait_for_task` -- `PerPartitionStream` -- `DistributionReceiver::drop` -- `HashJoinExec::execute` -- `HashJoinStream::collect_build_side` -- `HashJoinStream::drop` -- `OnceFut::get_shared` - -The critical observations were: - -1. The stuck final join was: - - `HashJoinExec: mode=Partitioned, join_type=Inner, on=[(o_orderkey@2, l_orderkey@0)]` -2. The empty / problematic build partitions were consistently: - - `2`, `14`, `21` -3. Those final-inner-join partitions were **created**, but they were dropped in `WaitBuildSide` before ever reaching `COLLECT_BUILD_ENTER`. -4. The parent operator above that join was a partitioned `RightSemi` join. -5. For parent partitions `2`, `14`, `21`, the `RightSemi` build side was empty: - - `rows=0`, `map_empty=true` -6. `RightSemi` is one of the join types where an empty build side produces an empty result immediately. -7. So those parent `RightSemi` partitions legally transitioned to `Completed` and stopped polling their child final-inner-join partitions. -8. Because the child final-inner-join partitions were never polled, their lazy `left_fut` never ran, so they never reported build data to `SharedBuildAccumulator`. -9. Meanwhile, sibling final-inner-join partitions did run, reported build data, and then blocked in `WaitPartitionBoundsReport` forever, because the barrier still expected all 24 partition reports. - -## Actual root cause - -The deadlock was a conflict between: - -- **lazy build-side collection/reporting** in partitioned hash join, and -- **legal early cancellation** by an upstream parent operator. - -More concretely: - -1. In `PartitionMode::Partitioned`, build-side collection lives behind a per-partition `OnceFut`. -2. Dynamic-filter reporting also happens only after that future is polled and finishes. -3. `SharedBuildAccumulator` expects one report from every partition. -4. But an upstream partitioned `RightSemi` join can legitimately stop polling some child partitions when its own build side is empty. -5. Those canceled child partitions never report "I am empty", yet the barrier still waits for them. - -So the bug was **not** fundamentally in `RepartitionExec`. - -`RepartitionExec` exposed the problem, but the actual broken invariant was: - -> partitioned hash-join dynamic filter coordination assumed every partition stream would be polled far enough to report build data. - -That assumption is false. - -## Desired behavior - -The correct semantics are: - -- dynamic filter finalization should happen only after the **entire build side** is known, -- and "entire build side" includes partitions that are empty. - -Therefore, an empty build partition must still contribute: - -- its partition id, -- empty membership / empty bounds information, -- and its completion signal for the barrier. - -## Proposed fix - -For `PartitionMode::Partitioned` when dynamic-filter pushdown is enabled: - -- start build-side collection eagerly at `HashJoinExec::execute()` time, -- do not wait for the child `HashJoinStream` to be polled, -- once build-side collection completes, report `PartitionBuildData::Partitioned` immediately, -- including the case where the partition is empty, -- then let the shared future resolve to the collected `JoinLeftData`. - -With that change: - -- empty partitions still report, -- the barrier completes, -- active sibling partitions stop hanging, -- and a parent operator is still free to cancel the child stream later. - -## Validation so far - -The eager-report fix: - -- made the original Q18 reproducer complete successfully, -- passed the broad `hash_join` test slice, -- passed the existing empty-build dynamic-filter tests, -- and survived a large `cargo test` run aside from an unrelated CLI snapshot mismatch. - -## Regression test shape - -The right regression test is one that captures this exact cancellation pattern: - -1. a partitioned child inner hash join with dynamic-filter pushdown enabled, -2. empty build partitions in some output partitions, -3. a partitioned parent `RightSemi` join above it, -4. parent partitions that legally complete early on those empty partitions, -5. assertion that execution completes instead of hanging. - -The test should fail on `main` by timing out or hanging, and pass with the eager-report fix. From 57c96adca92366124af4bcbe0fcf9690ea0cea26 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 01:25:54 -0500 Subject: [PATCH 03/11] Use SpawnedTask for eager hash join build reporting --- .../physical-plan/src/joins/hash_join/exec.rs | 51 +++++++++++++------ .../src/joins/hash_join/shared_bounds.rs | 8 +++ 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 092fe6129a4fa..bc391cfe38043 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -79,6 +79,7 @@ use datafusion_common::{ DataFusionError, JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, plan_err, project_schema, }; +use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_expr::Accumulator; @@ -95,6 +96,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; +use tokio::sync::oneshot; use super::partitioned_hash_eval::SeededRandomState; @@ -1374,25 +1376,42 @@ impl ExecutionPlan for HashJoinExec { let build_accumulator = build_accumulator .as_ref() .map(|acc| (Arc::clone(acc), partition)); + let random_state = self.random_state.random_state().clone(); + let with_visited_indices_bitmap = + need_produce_result_in_final(self.join_type); + let config = Arc::clone(context.session_config().options()); + let null_equality = self.null_equality; + let background_join_metrics = join_metrics.clone(); if build_accumulator.is_some() { - let task = tokio::task::spawn(collect_left_input_and_maybe_report( - self.random_state.random_state().clone(), - left_stream, - on_left.clone(), - join_metrics.clone(), - reservation, - need_produce_result_in_final(self.join_type), - 1, - enable_dynamic_filter_pushdown, - Arc::clone(context.session_config().options()), - self.null_equality, - array_map_created_count, - build_accumulator, - )); + let (tx, rx) = oneshot::channel(); + let (build_accumulator, partition) = + build_accumulator.expect("checked is_some above"); + let background_build_accumulator = Arc::clone(&build_accumulator); + let task = SpawnedTask::spawn(async move { + let result = collect_left_input_and_maybe_report( + random_state, + left_stream, + on_left.clone(), + background_join_metrics, + reservation, + with_visited_indices_bitmap, + 1, + enable_dynamic_filter_pushdown, + config, + null_equality, + array_map_created_count, + Some((background_build_accumulator, partition)), + ) + .await; + let _ = tx.send(result); + }); + build_accumulator.register_background_task(task); OnceFut::new(async move { - task.await.map_err(|err| { - DataFusionError::ExecutionJoin(Box::new(err)) + rx.await.map_err(|err| { + DataFusionError::Execution(format!( + "hash join build task ended before sending its result: {err}" + )) })? }) } else { diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa80268..c259c7ab797da 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -34,6 +34,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{Result, ScalarValue}; +use datafusion_common_runtime::SpawnedTask; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ @@ -216,6 +217,8 @@ fn create_bounds_predicate( pub(crate) struct SharedBuildAccumulator { /// Build-side data protected by a single mutex to avoid ordering concerns inner: Mutex, + /// Keeps background build/report tasks alive until all partitions have reported. + background_tasks: Mutex>>, barrier: Barrier, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, @@ -337,6 +340,7 @@ impl SharedBuildAccumulator { Self { inner: Mutex::new(mode_data), + background_tasks: Mutex::new(vec![]), barrier: Barrier::new(expected_calls), dynamic_filter, on_right, @@ -345,6 +349,10 @@ impl SharedBuildAccumulator { } } + pub(crate) fn register_background_task(&self, task: SpawnedTask<()>) { + self.background_tasks.lock().push(task); + } + /// Report build-side data from a partition /// /// This unified method handles both CollectLeft and Partitioned modes. When all partitions From d17d5e46a45d6fa62d3b2c7a2959cba662fb0e01 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 06:24:18 -0500 Subject: [PATCH 04/11] lint --- datafusion/physical-plan/src/joins/hash_join/exec.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index bc391cfe38043..b4a62b251f461 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1383,10 +1383,8 @@ impl ExecutionPlan for HashJoinExec { let null_equality = self.null_equality; let background_join_metrics = join_metrics.clone(); - if build_accumulator.is_some() { + if let Some((build_accumulator, partition)) = build_accumulator { let (tx, rx) = oneshot::channel(); - let (build_accumulator, partition) = - build_accumulator.expect("checked is_some above"); let background_build_accumulator = Arc::clone(&build_accumulator); let task = SpawnedTask::spawn(async move { let result = collect_left_input_and_maybe_report( From 70bb8b4608076a09804d7ff12cf0cdf25a3942c2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 08:36:54 -0500 Subject: [PATCH 05/11] Handle canceled partitioned hash join filters lazily --- .../physical-plan/src/joins/hash_join/exec.rs | 124 +---- .../src/joins/hash_join/shared_bounds.rs | 485 ++++++++++-------- .../src/joins/hash_join/stream.rs | 16 + 3 files changed, 301 insertions(+), 324 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b4a62b251f461..2de7e61d40da4 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -35,8 +35,7 @@ use crate::joins::Map; use crate::joins::array_map::ArrayMap; use crate::joins::hash_join::inlist_builder::build_struct_inlist_values; use crate::joins::hash_join::shared_bounds::{ - ColumnBounds, PartitionBounds, PartitionBuildData, PushdownStrategy, - SharedBuildAccumulator, + ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator, }; use crate::joins::hash_join::stream::{ BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState, @@ -76,10 +75,9 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::TreeNodeRecursion; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ - DataFusionError, JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, - internal_err, plan_err, project_schema, + JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, + plan_err, project_schema, }; -use datafusion_common_runtime::SpawnedTask; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_expr::Accumulator; @@ -96,7 +94,6 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays; use futures::TryStreamExt; use parking_lot::Mutex; -use tokio::sync::oneshot; use super::partitioned_hash_eval::SeededRandomState; @@ -1373,60 +1370,19 @@ impl ExecutionPlan for HashJoinExec { let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); - let build_accumulator = build_accumulator - .as_ref() - .map(|acc| (Arc::clone(acc), partition)); - let random_state = self.random_state.random_state().clone(); - let with_visited_indices_bitmap = - need_produce_result_in_final(self.join_type); - let config = Arc::clone(context.session_config().options()); - let null_equality = self.null_equality; - let background_join_metrics = join_metrics.clone(); - - if let Some((build_accumulator, partition)) = build_accumulator { - let (tx, rx) = oneshot::channel(); - let background_build_accumulator = Arc::clone(&build_accumulator); - let task = SpawnedTask::spawn(async move { - let result = collect_left_input_and_maybe_report( - random_state, - left_stream, - on_left.clone(), - background_join_metrics, - reservation, - with_visited_indices_bitmap, - 1, - enable_dynamic_filter_pushdown, - config, - null_equality, - array_map_created_count, - Some((background_build_accumulator, partition)), - ) - .await; - let _ = tx.send(result); - }); - build_accumulator.register_background_task(task); - OnceFut::new(async move { - rx.await.map_err(|err| { - DataFusionError::Execution(format!( - "hash join build task ended before sending its result: {err}" - )) - })? - }) - } else { - OnceFut::new(collect_left_input( - self.random_state.random_state().clone(), - left_stream, - on_left.clone(), - join_metrics.clone(), - reservation, - need_produce_result_in_final(self.join_type), - 1, - enable_dynamic_filter_pushdown, - Arc::clone(context.session_config().options()), - self.null_equality, - array_map_created_count, - )) - } + OnceFut::new(collect_left_input( + self.random_state.random_state().clone(), + left_stream, + on_left.clone(), + join_metrics.clone(), + reservation, + need_produce_result_in_final(self.join_type), + 1, + enable_dynamic_filter_pushdown, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, + )) } PartitionMode::Auto => { return plan_err!( @@ -1457,8 +1413,7 @@ impl ExecutionPlan for HashJoinExec { .map(|(_, right_expr)| Arc::clone(right_expr)) .collect::>(); let stream_build_accumulator = match self.mode { - PartitionMode::Partitioned => None, - PartitionMode::CollectLeft => build_accumulator, + PartitionMode::Partitioned | PartitionMode::CollectLeft => build_accumulator, PartitionMode::Auto => unreachable!( "PartitionMode::Auto should not be present at execution time" ), @@ -2141,51 +2096,6 @@ async fn collect_left_input( Ok(data) } -#[expect(clippy::too_many_arguments)] -async fn collect_left_input_and_maybe_report( - random_state: RandomState, - left_stream: SendableRecordBatchStream, - on_left: Vec, - metrics: BuildProbeJoinMetrics, - reservation: MemoryReservation, - with_visited_indices_bitmap: bool, - probe_threads_count: usize, - should_compute_dynamic_filters: bool, - config: Arc, - null_equality: NullEquality, - array_map_created_count: Count, - build_accumulator: Option<(Arc, usize)>, -) -> Result { - let left_data = collect_left_input( - random_state, - left_stream, - on_left, - metrics, - reservation, - with_visited_indices_bitmap, - probe_threads_count, - should_compute_dynamic_filters, - config, - null_equality, - array_map_created_count, - ) - .await?; - - if let Some((build_accumulator, partition_id)) = build_accumulator { - let build_data = PartitionBuildData::Partitioned { - partition_id, - pushdown: left_data.membership().clone(), - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }; - build_accumulator.report_build_data(build_data).await?; - } - - Ok(left_data) -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index c259c7ab797da..20e571b2ce1c1 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -33,8 +33,7 @@ use crate::joins::hash_join::partitioned_hash_eval::{ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{Result, ScalarValue}; -use datafusion_common_runtime::SpawnedTask; +use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ @@ -43,7 +42,7 @@ use datafusion_physical_expr::expressions::{ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; use parking_lot::Mutex; -use tokio::sync::Barrier; +use tokio::sync::Notify; /// Represents the minimum and maximum values for a specific column. /// Used in dynamic filter pushdown to establish value boundaries. @@ -216,10 +215,8 @@ fn create_bounds_predicate( /// partition executions. pub(crate) struct SharedBuildAccumulator { /// Build-side data protected by a single mutex to avoid ordering concerns - inner: Mutex, - /// Keeps background build/report tasks alive until all partitions have reported. - background_tasks: Mutex>>, - barrier: Barrier, + inner: Mutex, + completion_notify: Notify, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, /// Right side join expressions needed for creating filter expressions @@ -265,13 +262,40 @@ struct PartitionData { /// Build-side data organized by partition mode enum AccumulatedBuildData { Partitioned { - partitions: Vec>, + partitions: Vec, + completed_partitions: usize, }, CollectLeft { data: Option, + reported_count: usize, + expected_reports: usize, }, } +enum CompletionState { + Pending, + Finalizing, + Ready(std::result::Result<(), String>), +} + +struct AccumulatorState { + data: AccumulatedBuildData, + completion: CompletionState, +} + +#[derive(Clone)] +enum PartitionStatus { + Pending, + Reported(PartitionData), + CanceledUnknown, +} + +#[derive(Clone)] +enum FinalizeInput { + Partitioned(Vec), + CollectLeft(Option), +} + impl SharedBuildAccumulator { /// Creates a new SharedBuildAccumulator configured for the given partition mode /// @@ -326,22 +350,27 @@ impl SharedBuildAccumulator { let mode_data = match partition_mode { PartitionMode::Partitioned => AccumulatedBuildData::Partitioned { partitions: vec![ - None; + PartitionStatus::Pending; left_child.output_partitioning().partition_count() ], + completed_partitions: 0, + }, + PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft { + data: None, + reported_count: 0, + expected_reports: expected_calls, }, - PartitionMode::CollectLeft => { - AccumulatedBuildData::CollectLeft { data: None } - } PartitionMode::Auto => unreachable!( "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" ), }; Self { - inner: Mutex::new(mode_data), - background_tasks: Mutex::new(vec![]), - barrier: Barrier::new(expected_calls), + inner: Mutex::new(AccumulatorState { + data: mode_data, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), dynamic_filter, on_right, repartition_random_state, @@ -349,10 +378,6 @@ impl SharedBuildAccumulator { } } - pub(crate) fn register_background_task(&self, task: SpawnedTask<()>) { - self.background_tasks.lock().push(task); - } - /// Report build-side data from a partition /// /// This unified method handles both CollectLeft and Partitioned modes. When all partitions @@ -366,229 +391,255 @@ impl SharedBuildAccumulator { /// # Returns /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { - // Store data in the accumulator - { + let finalize_input = { let mut guard = self.inner.lock(); + self.store_build_data(&mut guard, data)?; + self.take_finalize_input_if_ready(&mut guard) + }; - match (data, &mut *guard) { - // Partitioned mode - ( - PartitionBuildData::Partitioned { - partition_id, - pushdown, - bounds, - }, - AccumulatedBuildData::Partitioned { partitions }, - ) => { - partitions[partition_id] = Some(PartitionData { pushdown, bounds }); - } - // CollectLeft mode (store once, deduplicate across partitions) - ( - PartitionBuildData::CollectLeft { pushdown, bounds }, - AccumulatedBuildData::CollectLeft { data }, - ) => { - // Deduplicate - all partitions report the same data in CollectLeft - if data.is_none() { - *data = Some(PartitionData { pushdown, bounds }); - } + if let Some(finalize_input) = finalize_input { + self.finish(finalize_input); + } + + self.wait_for_completion().await + } + + pub(crate) fn report_canceled_partition(&self, partition_id: usize) { + let finalize_input = { + let mut guard = self.inner.lock(); + self.store_canceled_partition(&mut guard, partition_id); + self.take_finalize_input_if_ready(&mut guard) + }; + + if let Some(finalize_input) = finalize_input { + self.finish(finalize_input); + } + } + + fn store_build_data( + &self, + guard: &mut AccumulatorState, + data: PartitionBuildData, + ) -> Result<()> { + match (data, &mut guard.data) { + ( + PartitionBuildData::Partitioned { + partition_id, + pushdown, + bounds, + }, + AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + }, + ) => { + if matches!(partitions[partition_id], PartitionStatus::Pending) { + *completed_partitions += 1; } - // Mismatched modes - should never happen - _ => { - return datafusion_common::internal_err!( - "Build data mode mismatch in report_build_data" - ); + partitions[partition_id] = + PartitionStatus::Reported(PartitionData { pushdown, bounds }); + } + ( + PartitionBuildData::CollectLeft { pushdown, bounds }, + AccumulatedBuildData::CollectLeft { + data, + reported_count, + .. + }, + ) => { + if data.is_none() { + *data = Some(PartitionData { pushdown, bounds }); } + *reported_count += 1; + } + _ => { + return datafusion_common::internal_err!( + "Build data mode mismatch in report_build_data" + ); } } + Ok(()) + } - // Wait for all partitions to report - if self.barrier.wait().await.is_leader() { - // All partitions have reported, so we can create and update the filter - let inner = self.inner.lock(); - - match &*inner { - // CollectLeft: Simple conjunction of bounds and membership check - AccumulatedBuildData::CollectLeft { data } => { - if let Some(partition_data) = data { - // Create membership predicate (InList for small build sides, hash lookup otherwise) - let membership_expr = create_membership_predicate( - &self.on_right, - partition_data.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - - // Create bounds check expression (if bounds available) - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition_data.bounds, - ); - - // Combine membership and bounds expressions for multi-layer optimization: - // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping) - // - Membership (InList/hash lookup): Enables: - // * Precise filtering (exact value matching) - // * Bloom filter utilization (if present in Parquet files) - // * Better pruning for data types where min/max isn't effective (e.g., UUIDs) - // Together, they provide complementary benefits and maximize data skipping. - // Only update the filter if we have something to push down - if let Some(filter_expr) = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Some(Arc::new(BinaryExpr::new( - bounds, - Operator::And, - membership, - )) - as Arc) - } - (Some(membership), None) => { - // Membership available but no bounds - // This is reachable when we have data but bounds aren't available - // (e.g., unsupported data types or no columns with bounds) - Some(membership) - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - Some(bounds) - } - (None, None) => { - // No filter available (e.g., empty build side) - // Don't update the filter, but continue to mark complete - None - } - } { - self.dynamic_filter.update(filter_expr)?; - } + fn store_canceled_partition( + &self, + guard: &mut AccumulatorState, + partition_id: usize, + ) { + if let AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } = &mut guard.data + && matches!(partitions[partition_id], PartitionStatus::Pending) + { + partitions[partition_id] = PartitionStatus::CanceledUnknown; + *completed_partitions += 1; + } + } + + fn take_finalize_input_if_ready( + &self, + guard: &mut AccumulatorState, + ) -> Option { + if !matches!(guard.completion, CompletionState::Pending) { + return None; + } + + let finalize_input = match &guard.data { + AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } if *completed_partitions == partitions.len() => { + Some(FinalizeInput::Partitioned(partitions.clone())) + } + AccumulatedBuildData::CollectLeft { + data, + reported_count, + expected_reports, + } if *reported_count == *expected_reports => { + Some(FinalizeInput::CollectLeft(data.clone())) + } + _ => None, + }?; + + guard.completion = CompletionState::Finalizing; + Some(finalize_input) + } + + fn finish(&self, finalize_input: FinalizeInput) { + let result = self + .build_filter(finalize_input) + .map_err(|err| err.to_string()); + self.dynamic_filter.mark_complete(); + + let mut guard = self.inner.lock(); + guard.completion = CompletionState::Ready(result); + drop(guard); + self.completion_notify.notify_waiters(); + } + + async fn wait_for_completion(&self) -> Result<()> { + loop { + let notified = { + let guard = self.inner.lock(); + match &guard.completion { + CompletionState::Ready(Ok(())) => return Ok(()), + CompletionState::Ready(Err(err)) => { + return Err(DataFusionError::Execution(err.clone())); + } + CompletionState::Pending | CompletionState::Finalizing => { + self.completion_notify.notified() } } - // Partitioned: CASE expression routing to per-partition filters - AccumulatedBuildData::Partitioned { partitions } => { - // Collect all partition data (should all be Some at this point) - let partition_data: Vec<_> = - partitions.iter().filter_map(|p| p.as_ref()).collect(); - - if !partition_data.is_empty() { - // Build a CASE expression that combines range checks AND membership checks - // CASE (hash_repartition(join_keys) % num_partitions) - // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0 - // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1 - // ... - // ELSE false - // END - - let num_partitions = partition_data.len(); - - // Create base expression: hash_repartition(join_keys) % num_partitions - let routing_hash_expr = Arc::new(HashExpr::new( - self.on_right.clone(), - self.repartition_random_state.clone(), - "hash_repartition".to_string(), - )) - as Arc; + }; + notified.await; + } + } - let modulo_expr = Arc::new(BinaryExpr::new( - routing_hash_expr, - Operator::Modulo, - lit(ScalarValue::UInt64(Some(num_partitions as u64))), - )) - as Arc; - - // Create WHEN branches for each partition - let when_then_branches: Vec<( - Arc, - Arc, - )> = partitions - .iter() - .enumerate() - .filter_map(|(partition_id, partition_opt)| { - partition_opt.as_ref().and_then(|partition| { - // Skip empty partitions - they would always return false anyway - match &partition.pushdown { - PushdownStrategy::Empty => None, - _ => Some((partition_id, partition)), - } - }) - }) - .map(|(partition_id, partition)| -> Result<_> { - // WHEN partition_id - let when_expr = - lit(ScalarValue::UInt64(Some(partition_id as u64))); - - // THEN: Combine bounds check AND membership predicate - - // 1. Create membership predicate (InList for small build sides, hash lookup otherwise) + fn build_filter(&self, finalize_input: FinalizeInput) -> Result<()> { + match finalize_input { + FinalizeInput::CollectLeft(data) => { + if let Some(partition_data) = data { + let membership_expr = create_membership_predicate( + &self.on_right, + partition_data.pushdown.clone(), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )?; + let bounds_expr = + create_bounds_predicate(&self.on_right, &partition_data.bounds); + + if let Some(filter_expr) = match (membership_expr, bounds_expr) { + (Some(membership), Some(bounds)) => Some(Arc::new( + BinaryExpr::new(bounds, Operator::And, membership), + ) + as Arc), + (Some(membership), None) => Some(membership), + (None, Some(bounds)) => Some(bounds), + (None, None) => None, + } { + self.dynamic_filter.update(filter_expr)?; + } + } + } + FinalizeInput::Partitioned(partitions) => { + let num_partitions = partitions.len(); + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) as Arc; + + let when_then_branches = partitions + .iter() + .enumerate() + .map(|(partition_id, partition)| -> Result<_> { + let then_expr = match partition { + PartitionStatus::Reported(partition) + if matches!( + partition.pushdown, + PushdownStrategy::Empty + ) => + { + lit(false) + } + PartitionStatus::Reported(partition) => { let membership_expr = create_membership_predicate( &self.on_right, partition.pushdown.clone(), &HASH_JOIN_SEED, self.probe_schema.as_ref(), )?; - - // 2. Create bounds check expression for this partition (if bounds available) let bounds_expr = create_bounds_predicate( &self.on_right, &partition.bounds, ); - - // 3. Combine membership and bounds expressions - let then_expr = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Arc::new(BinaryExpr::new( + match (membership_expr, bounds_expr) { + (Some(membership), Some(bounds)) => Arc::new( + BinaryExpr::new( bounds, Operator::And, membership, - )) - as Arc - } - (Some(membership), None) => { - // Membership available but no bounds (e.g., unsupported data types) - membership - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - bounds - } - (None, None) => { - // No filter for this partition - should not happen due to filter_map above - // but handle defensively by returning a "true" literal - lit(true) - } - }; - - Ok((when_expr, then_expr)) - }) - .collect::>>()?; - - // Optimize for single partition: skip CASE expression entirely - let filter_expr = if when_then_branches.is_empty() { - // All partitions are empty: no rows can match - lit(false) - } else if when_then_branches.len() == 1 { - // Single partition: just use the condition directly - // since hash % 1 == 0 always, the WHEN 0 branch will always match - Arc::clone(&when_then_branches[0].1) - } else { - // Multiple partitions: create CASE expression - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - when_then_branches, - Some(lit(false)), // ELSE false - )?) as Arc + ), + ) + as Arc, + (Some(membership), None) => membership, + (None, Some(bounds)) => bounds, + (None, None) => lit(true), + } + } + PartitionStatus::CanceledUnknown => lit(true), + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize dynamic filter with pending partition" + ) + } }; - - self.dynamic_filter.update(filter_expr)?; - } - } + Ok(( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + then_expr, + )) + }) + .collect::>>()?; + + let filter_expr = if when_then_branches.len() == 1 { + Arc::clone(&when_then_branches[0].1) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(true)), + )?) as Arc + }; + + self.dynamic_filter.update(filter_expr)?; } - self.dynamic_filter.mark_complete(); } Ok(()) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f45..66a69a2eec1ab 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -219,6 +219,8 @@ pub(super) struct HashJoinStream { /// Optional future to signal when build information has been reported by all partitions /// and the dynamic filter has been updated build_waiter: Option>, + /// Tracks whether this partition has already reported build information to the coordinator. + build_reported: bool, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -400,6 +402,7 @@ impl HashJoinStream { right_side_ordered, build_accumulator, build_waiter: None, + build_reported: false, mode, output_buffer, null_aware, @@ -555,6 +558,7 @@ impl HashJoinStream { self.build_waiter = Some(OnceFut::new(async move { build_accumulator.report_build_data(build_data).await })); + self.build_reported = true; self.state = HashJoinStreamState::WaitPartitionBoundsReport; } else { self.state = @@ -947,3 +951,15 @@ impl Stream for HashJoinStream { self.poll_next_impl(cx) } } + +impl Drop for HashJoinStream { + fn drop(&mut self) { + if self.mode == PartitionMode::Partitioned + && !self.build_reported + && let Some(build_accumulator) = &self.build_accumulator + { + build_accumulator.report_canceled_partition(self.partition); + self.build_reported = true; + } + } +} From 92a4adb10ec8c50be440b59a2d3fb6f20a56a218 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 08:53:55 -0500 Subject: [PATCH 06/11] Minimize hash join dynamic filter diff --- .../physical-plan/src/joins/hash_join/exec.rs | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 2de7e61d40da4..6be69a2beeb4a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1319,7 +1319,7 @@ impl ExecutionPlan for HashJoinExec { // Initialize build_accumulator lazily with runtime partition counts (only if enabled) // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing let repartition_random_state = REPARTITION_RANDOM_STATE; - let on_right = self + let on_right_exprs = self .on .iter() .map(|(_, right_expr)| Arc::clone(right_expr)) @@ -1334,7 +1334,7 @@ impl ExecutionPlan for HashJoinExec { self.left.as_ref(), self.right.as_ref(), filter, - on_right.clone(), + on_right_exprs.clone(), repartition_random_state, )) }))) @@ -1407,22 +1407,10 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); - let stream_build_accumulator = match self.mode { - PartitionMode::Partitioned | PartitionMode::CollectLeft => build_accumulator, - PartitionMode::Auto => unreachable!( - "PartitionMode::Auto should not be present at execution time" - ), - }; - Ok(Box::pin(HashJoinStream::new( partition, self.schema(), - on_right, + on_right_exprs, self.filter.clone(), self.join_type, right_stream, @@ -1435,7 +1423,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, vec![], self.right.output_ordering().is_some(), - stream_build_accumulator, + build_accumulator, self.mode, self.null_aware, self.fetch, From 24875184df19584da65f795ade2a4944d6886db7 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:13:10 -0500 Subject: [PATCH 07/11] Preserve compact dynamic filter plan shapes --- .../src/joins/hash_join/shared_bounds.rs | 128 ++++++++++-------- 1 file changed, 73 insertions(+), 55 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 20e571b2ce1c1..b79172ce7ad16 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -576,65 +576,83 @@ impl SharedBuildAccumulator { lit(ScalarValue::UInt64(Some(num_partitions as u64))), )) as Arc; - let when_then_branches = partitions - .iter() - .enumerate() - .map(|(partition_id, partition)| -> Result<_> { - let then_expr = match partition { - PartitionStatus::Reported(partition) - if matches!( - partition.pushdown, - PushdownStrategy::Empty - ) => - { - lit(false) - } - PartitionStatus::Reported(partition) => { - let membership_expr = create_membership_predicate( - &self.on_right, - partition.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition.bounds, - ); - match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => Arc::new( - BinaryExpr::new( - bounds, - Operator::And, - membership, - ), - ) - as Arc, - (Some(membership), None) => membership, - (None, Some(bounds)) => bounds, - (None, None) => lit(true), - } - } - PartitionStatus::CanceledUnknown => lit(true), - PartitionStatus::Pending => { - return datafusion_common::internal_err!( - "attempted to finalize dynamic filter with pending partition" + let mut real_branches = Vec::new(); + let mut empty_partition_ids = Vec::new(); + let mut has_canceled_unknown = false; + + for (partition_id, partition) in partitions.iter().enumerate() { + match partition { + PartitionStatus::Reported(partition) + if matches!(partition.pushdown, PushdownStrategy::Empty) => + { + empty_partition_ids.push(partition_id); + } + PartitionStatus::Reported(partition) => { + let membership_expr = create_membership_predicate( + &self.on_right, + partition.pushdown.clone(), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )?; + let bounds_expr = create_bounds_predicate( + &self.on_right, + &partition.bounds, + ); + let then_expr = match (membership_expr, bounds_expr) { + (Some(membership), Some(bounds)) => Arc::new( + BinaryExpr::new(bounds, Operator::And, membership), ) - } - }; - Ok(( - lit(ScalarValue::UInt64(Some(partition_id as u64))), - then_expr, - )) - }) - .collect::>>()?; - - let filter_expr = if when_then_branches.len() == 1 { - Arc::clone(&when_then_branches[0].1) + as Arc, + (Some(membership), None) => membership, + (None, Some(bounds)) => bounds, + (None, None) => lit(true), + }; + real_branches.push(( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + then_expr, + )); + } + PartitionStatus::CanceledUnknown => { + has_canceled_unknown = true; + } + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize dynamic filter with pending partition" + ); + } + } + } + + let filter_expr = if has_canceled_unknown { + let mut when_then_branches = empty_partition_ids + .into_iter() + .map(|partition_id| { + ( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + lit(false), + ) + }) + .collect::>(); + when_then_branches.extend(real_branches); + + if when_then_branches.is_empty() { + lit(true) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(true)), + )?) as Arc + } + } else if real_branches.is_empty() { + lit(false) + } else if num_partitions == 1 { + Arc::clone(&real_branches[0].1) } else { Arc::new(CaseExpr::try_new( Some(modulo_expr), - when_then_branches, - Some(lit(true)), + real_branches, + Some(lit(false)), )?) as Arc }; From 0584854263a02d0ea2c0cd78cc25d7b2cf5b1e1f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:41:22 -0500 Subject: [PATCH 08/11] Restore single-branch dynamic filter collapse --- datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index b79172ce7ad16..e8e58d9776425 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -646,7 +646,9 @@ impl SharedBuildAccumulator { } } else if real_branches.is_empty() { lit(false) - } else if num_partitions == 1 { + } else if real_branches.len() == 1 + && empty_partition_ids.len() + 1 == num_partitions + { Arc::clone(&real_branches[0].1) } else { Arc::new(CaseExpr::try_new( From 8740ca2a3dd3cec88be34dea9b4ab4d8a280269d Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:46:04 -0500 Subject: [PATCH 09/11] Unify dynamic filter finalize terminal states --- .../src/joins/hash_join/shared_bounds.rs | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index e8e58d9776425..3e3c78a08d7ad 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -266,7 +266,7 @@ enum AccumulatedBuildData { completed_partitions: usize, }, CollectLeft { - data: Option, + data: PartitionStatus, reported_count: usize, expected_reports: usize, }, @@ -293,7 +293,7 @@ enum PartitionStatus { #[derive(Clone)] enum FinalizeInput { Partitioned(Vec), - CollectLeft(Option), + CollectLeft(PartitionStatus), } impl SharedBuildAccumulator { @@ -356,7 +356,7 @@ impl SharedBuildAccumulator { completed_partitions: 0, }, PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft { - data: None, + data: PartitionStatus::Pending, reported_count: 0, expected_reports: expected_calls, }, @@ -447,8 +447,8 @@ impl SharedBuildAccumulator { .. }, ) => { - if data.is_none() { - *data = Some(PartitionData { pushdown, bounds }); + if matches!(data, PartitionStatus::Pending) { + *data = PartitionStatus::Reported(PartitionData { pushdown, bounds }); } *reported_count += 1; } @@ -538,8 +538,8 @@ impl SharedBuildAccumulator { fn build_filter(&self, finalize_input: FinalizeInput) -> Result<()> { match finalize_input { - FinalizeInput::CollectLeft(data) => { - if let Some(partition_data) = data { + FinalizeInput::CollectLeft(partition) => match partition { + PartitionStatus::Reported(partition_data) => { let membership_expr = create_membership_predicate( &self.on_right, partition_data.pushdown.clone(), @@ -561,7 +561,17 @@ impl SharedBuildAccumulator { self.dynamic_filter.update(filter_expr)?; } } - } + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize collect-left dynamic filter without reported build data" + ); + } + PartitionStatus::CanceledUnknown => { + return datafusion_common::internal_err!( + "collect-left dynamic filter cannot finalize with canceled build data" + ); + } + }, FinalizeInput::Partitioned(partitions) => { let num_partitions = partitions.len(); let routing_hash_expr = Arc::new(HashExpr::new( From 7011c5de4dcadd85f7cb5f4b609c9f7c42d0e1df Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 10:11:37 -0500 Subject: [PATCH 10/11] Preserve shared dynamic filter errors --- .../physical-plan/src/joins/hash_join/shared_bounds.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 3e3c78a08d7ad..827605bb3a859 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -33,7 +33,7 @@ use crate::joins::hash_join::partitioned_hash_eval::{ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ @@ -275,7 +275,7 @@ enum AccumulatedBuildData { enum CompletionState { Pending, Finalizing, - Ready(std::result::Result<(), String>), + Ready(SharedResult<()>), } struct AccumulatorState { @@ -507,9 +507,7 @@ impl SharedBuildAccumulator { } fn finish(&self, finalize_input: FinalizeInput) { - let result = self - .build_filter(finalize_input) - .map_err(|err| err.to_string()); + let result = self.build_filter(finalize_input).map_err(Arc::new); self.dynamic_filter.mark_complete(); let mut guard = self.inner.lock(); @@ -525,7 +523,7 @@ impl SharedBuildAccumulator { match &guard.completion { CompletionState::Ready(Ok(())) => return Ok(()), CompletionState::Ready(Err(err)) => { - return Err(DataFusionError::Execution(err.clone())); + return Err(DataFusionError::Shared(Arc::clone(err))); } CompletionState::Pending | CompletionState::Finalizing => { self.completion_notify.notified() From 5ad96d908292d3c17e74964854f68dd0e3a3ad43 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 16 Apr 2026 19:14:40 -0500 Subject: [PATCH 11/11] Extract build data reporting into a state transition method Replaces the manual PartitionBuildData construction + report_build_data call + build_reported flag set in collect_build_side with a single transition_after_build_collected method, making it impossible to forget to report build data when transitioning state. Co-Authored-By: Claude Sonnet 4.6 --- .../src/joins/hash_join/stream.rs | 94 +++++++++---------- 1 file changed, 44 insertions(+), 50 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 66a69a2eec1ab..39241724ad89f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -424,6 +424,49 @@ impl HashJoinStream { } } + /// Transitions state after build-side data has been collected, automatically + /// reporting build data to the accumulator when one is present. + /// + /// If a `build_accumulator` is configured, this method constructs the + /// appropriate [`PartitionBuildData`], schedules the reporting future, and + /// returns [`HashJoinStreamState::WaitPartitionBoundsReport`]. Otherwise it + /// delegates to [`Self::state_after_build_ready`]. + fn transition_after_build_collected( + &mut self, + left_data: &Arc, + ) -> HashJoinStreamState { + let Some(build_accumulator) = self.build_accumulator.as_ref() else { + return Self::state_after_build_ready(self.join_type, left_data.as_ref()); + }; + + let pushdown = left_data.membership().clone(); + let bounds = left_data + .bounds + .clone() + .unwrap_or_else(|| PartitionBounds::new(vec![])); + + let build_data = match self.mode { + PartitionMode::Partitioned => PartitionBuildData::Partitioned { + partition_id: self.partition, + pushdown, + bounds, + }, + PartitionMode::CollectLeft => { + PartitionBuildData::CollectLeft { pushdown, bounds } + } + PartitionMode::Auto => unreachable!( + "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" + ), + }; + + let acc = Arc::clone(build_accumulator); + self.build_waiter = Some(OnceFut::new(async move { + acc.report_build_data(build_data).await + })); + self.build_reported = true; + HashJoinStreamState::WaitPartitionBoundsReport + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -514,56 +557,7 @@ impl HashJoinStream { // not the build side (left). The probe-side NULL check happens during process_probe_batch. // The probe_side_has_null flag will be set there if any probe batch contains NULL. - // Handle dynamic filter build-side information accumulation - // - // Dynamic filter coordination between partitions: - // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator - // which will handle synchronization and filter updates - if let Some(ref build_accumulator) = self.build_accumulator { - let build_accumulator = Arc::clone(build_accumulator); - - let left_side_partition_id = match self.mode { - PartitionMode::Partitioned => self.partition, - PartitionMode::CollectLeft => 0, - PartitionMode::Auto => unreachable!( - "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" - ), - }; - - // Determine pushdown strategy based on availability of InList values - let pushdown = left_data.membership().clone(); - - // Construct the appropriate build data enum variant based on partition mode - let build_data = match self.mode { - PartitionMode::Partitioned => PartitionBuildData::Partitioned { - partition_id: left_side_partition_id, - pushdown, - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }, - PartitionMode::CollectLeft => PartitionBuildData::CollectLeft { - pushdown, - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }, - PartitionMode::Auto => unreachable!( - "PartitionMode::Auto should not be present at execution time" - ), - }; - - self.build_waiter = Some(OnceFut::new(async move { - build_accumulator.report_build_data(build_data).await - })); - self.build_reported = true; - self.state = HashJoinStreamState::WaitPartitionBoundsReport; - } else { - self.state = - Self::state_after_build_ready(self.join_type, left_data.as_ref()); - } + self.state = self.transition_after_build_collected(&left_data); self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue))