diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index d064f5ce6c3b7..6be69a2beeb4a 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec { .with_category(MetricCategory::Rows) .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + // Initialize build_accumulator lazily with runtime partition counts (only if enabled) + // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing + let repartition_random_state = REPARTITION_RANDOM_STATE; + let on_right_exprs = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + let build_accumulator = enable_dynamic_filter_pushdown + .then(|| { + self.dynamic_filter.as_ref().map(|df| { + let filter = Arc::clone(&df.filter); + Some(Arc::clone(df.build_accumulator.get_or_init(|| { + Arc::new(SharedBuildAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + filter, + on_right_exprs.clone(), + repartition_random_state, + )) + }))) + }) + }) + .flatten() + .flatten(); + let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -1343,7 +1370,6 @@ impl ExecutionPlan for HashJoinExec { let reservation = MemoryConsumer::new(format!("HashJoinInput[{partition}]")) .register(context.memory_pool()); - OnceFut::new(collect_left_input( self.random_state.random_state().clone(), left_stream, @@ -1368,33 +1394,6 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); - // Initialize build_accumulator lazily with runtime partition counts (only if enabled) - // Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing - let repartition_random_state = REPARTITION_RANDOM_STATE; - let build_accumulator = enable_dynamic_filter_pushdown - .then(|| { - self.dynamic_filter.as_ref().map(|df| { - let filter = Arc::clone(&df.filter); - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); - Some(Arc::clone(df.build_accumulator.get_or_init(|| { - Arc::new(SharedBuildAccumulator::new_from_partition_mode( - self.mode, - self.left.as_ref(), - self.right.as_ref(), - filter, - on_right, - repartition_random_state, - )) - }))) - }) - }) - .flatten() - .flatten(); - // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. let right_stream = self.right.execute(partition, context)?; @@ -1408,16 +1407,10 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; - let on_right = self - .on - .iter() - .map(|(_, right_expr)| Arc::clone(right_expr)) - .collect::>(); - Ok(Box::pin(HashJoinStream::new( partition, self.schema(), - on_right, + on_right_exprs, self.filter.clone(), self.join_type, right_stream, @@ -2353,6 +2346,22 @@ mod tests { right: Arc, on: JoinOn, join_type: JoinType, + ) -> Result<(HashJoinExec, Arc)> { + hash_join_with_dynamic_filter_and_mode( + left, + right, + on, + join_type, + PartitionMode::CollectLeft, + ) + } + + fn hash_join_with_dynamic_filter_and_mode( + left: Arc, + right: Arc, + on: JoinOn, + join_type: JoinType, + mode: PartitionMode, ) -> Result<(HashJoinExec, Arc)> { let dynamic_filter = HashJoinExec::create_dynamic_filter(&on); let mut join = HashJoinExec::try_new( @@ -2362,7 +2371,7 @@ mod tests { None, &join_type, None, - PartitionMode::CollectLeft, + mode, NullEquality::NullEqualsNothing, false, )?; @@ -5634,6 +5643,158 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions() + -> Result<()> { + let mut session_config = SessionConfig::default(); + session_config + .options_mut() + .optimizer + .enable_dynamic_filter_pushdown = true; + let task_ctx = + Arc::new(TaskContext::default().with_session_config(session_config)); + + let child_left_schema = Arc::new(Schema::new(vec![ + Field::new("child_left_payload", DataType::Int32, false), + Field::new("child_key", DataType::Int32, false), + Field::new("child_left_extra", DataType::Int32, false), + ])); + let child_right_schema = Arc::new(Schema::new(vec![ + Field::new("child_right_payload", DataType::Int32, false), + Field::new("child_right_key", DataType::Int32, false), + Field::new("child_right_extra", DataType::Int32, false), + ])); + let parent_left_schema = Arc::new(Schema::new(vec![ + Field::new("parent_payload", DataType::Int32, false), + Field::new("parent_key", DataType::Int32, false), + Field::new("parent_extra", DataType::Int32, false), + ])); + + let child_left: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("child_left_payload", &vec![10]), + ("child_key", &vec![0]), + ("child_left_extra", &vec![100]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![11]), + ("child_key", &vec![1]), + ("child_left_extra", &vec![101]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![12]), + ("child_key", &vec![2]), + ("child_left_extra", &vec![102]), + )], + vec![build_table_i32( + ("child_left_payload", &vec![13]), + ("child_key", &vec![3]), + ("child_left_extra", &vec![103]), + )], + ], + Arc::clone(&child_left_schema), + None, + )?; + let child_right: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("child_right_payload", &vec![20]), + ("child_right_key", &vec![0]), + ("child_right_extra", &vec![200]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![21]), + ("child_right_key", &vec![1]), + ("child_right_extra", &vec![201]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![22]), + ("child_right_key", &vec![2]), + ("child_right_extra", &vec![202]), + )], + vec![build_table_i32( + ("child_right_payload", &vec![23]), + ("child_right_key", &vec![3]), + ("child_right_extra", &vec![203]), + )], + ], + Arc::clone(&child_right_schema), + None, + )?; + let parent_left: Arc = TestMemoryExec::try_new_exec( + &[ + vec![build_table_i32( + ("parent_payload", &vec![30]), + ("parent_key", &vec![0]), + ("parent_extra", &vec![300]), + )], + vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))], + vec![build_table_i32( + ("parent_payload", &vec![32]), + ("parent_key", &vec![2]), + ("parent_extra", &vec![302]), + )], + vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))], + ], + Arc::clone(&parent_left_schema), + None, + )?; + + let child_on = vec![( + Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _, + Arc::new(Column::new_with_schema( + "child_right_key", + &child_right_schema, + )?) as _, + )]; + let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode( + child_left, + child_right, + child_on, + JoinType::Inner, + PartitionMode::Partitioned, + )?; + let child_join: Arc = Arc::new(child_join); + + let parent_on = vec![( + Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _, + Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _, + )]; + let parent_join = HashJoinExec::try_new( + parent_left, + child_join, + parent_on, + None, + &JoinType::RightSemi, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + false, + )?; + + let batches = tokio::time::timeout( + std::time::Duration::from_secs(5), + crate::execution_plan::collect(Arc::new(parent_join), task_ctx), + ) + .await + .expect("partitioned right-semi join should not hang")?; + + assert_batches_sorted_eq!( + [ + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + "| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |", + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + "| 10 | 0 | 100 | 20 | 0 | 200 |", + "| 12 | 2 | 102 | 22 | 2 | 202 |", + "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+", + ], + &batches + ); + + Ok(()) + } + #[tokio::test] async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report() -> Result<()> { diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index f32dc7fa80268..827605bb3a859 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -33,7 +33,7 @@ use crate::joins::hash_join::partitioned_hash_eval::{ use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{Result, ScalarValue}; +use datafusion_common::{DataFusionError, Result, ScalarValue, SharedResult}; use datafusion_expr::Operator; use datafusion_functions::core::r#struct as struct_func; use datafusion_physical_expr::expressions::{ @@ -42,7 +42,7 @@ use datafusion_physical_expr::expressions::{ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr}; use parking_lot::Mutex; -use tokio::sync::Barrier; +use tokio::sync::Notify; /// Represents the minimum and maximum values for a specific column. /// Used in dynamic filter pushdown to establish value boundaries. @@ -215,8 +215,8 @@ fn create_bounds_predicate( /// partition executions. pub(crate) struct SharedBuildAccumulator { /// Build-side data protected by a single mutex to avoid ordering concerns - inner: Mutex, - barrier: Barrier, + inner: Mutex, + completion_notify: Notify, /// Dynamic filter for pushdown to probe side dynamic_filter: Arc, /// Right side join expressions needed for creating filter expressions @@ -262,13 +262,40 @@ struct PartitionData { /// Build-side data organized by partition mode enum AccumulatedBuildData { Partitioned { - partitions: Vec>, + partitions: Vec, + completed_partitions: usize, }, CollectLeft { - data: Option, + data: PartitionStatus, + reported_count: usize, + expected_reports: usize, }, } +enum CompletionState { + Pending, + Finalizing, + Ready(SharedResult<()>), +} + +struct AccumulatorState { + data: AccumulatedBuildData, + completion: CompletionState, +} + +#[derive(Clone)] +enum PartitionStatus { + Pending, + Reported(PartitionData), + CanceledUnknown, +} + +#[derive(Clone)] +enum FinalizeInput { + Partitioned(Vec), + CollectLeft(PartitionStatus), +} + impl SharedBuildAccumulator { /// Creates a new SharedBuildAccumulator configured for the given partition mode /// @@ -323,21 +350,27 @@ impl SharedBuildAccumulator { let mode_data = match partition_mode { PartitionMode::Partitioned => AccumulatedBuildData::Partitioned { partitions: vec![ - None; + PartitionStatus::Pending; left_child.output_partitioning().partition_count() ], + completed_partitions: 0, + }, + PartitionMode::CollectLeft => AccumulatedBuildData::CollectLeft { + data: PartitionStatus::Pending, + reported_count: 0, + expected_reports: expected_calls, }, - PartitionMode::CollectLeft => { - AccumulatedBuildData::CollectLeft { data: None } - } PartitionMode::Auto => unreachable!( "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" ), }; Self { - inner: Mutex::new(mode_data), - barrier: Barrier::new(expected_calls), + inner: Mutex::new(AccumulatorState { + data: mode_data, + completion: CompletionState::Pending, + }), + completion_notify: Notify::new(), dynamic_filter, on_right, repartition_random_state, @@ -358,229 +391,283 @@ impl SharedBuildAccumulator { /// # Returns /// * `Result<()>` - Ok if successful, Err if filter update failed or mode mismatch pub(crate) async fn report_build_data(&self, data: PartitionBuildData) -> Result<()> { - // Store data in the accumulator - { + let finalize_input = { let mut guard = self.inner.lock(); + self.store_build_data(&mut guard, data)?; + self.take_finalize_input_if_ready(&mut guard) + }; - match (data, &mut *guard) { - // Partitioned mode - ( - PartitionBuildData::Partitioned { - partition_id, - pushdown, - bounds, - }, - AccumulatedBuildData::Partitioned { partitions }, - ) => { - partitions[partition_id] = Some(PartitionData { pushdown, bounds }); - } - // CollectLeft mode (store once, deduplicate across partitions) - ( - PartitionBuildData::CollectLeft { pushdown, bounds }, - AccumulatedBuildData::CollectLeft { data }, - ) => { - // Deduplicate - all partitions report the same data in CollectLeft - if data.is_none() { - *data = Some(PartitionData { pushdown, bounds }); - } + if let Some(finalize_input) = finalize_input { + self.finish(finalize_input); + } + + self.wait_for_completion().await + } + + pub(crate) fn report_canceled_partition(&self, partition_id: usize) { + let finalize_input = { + let mut guard = self.inner.lock(); + self.store_canceled_partition(&mut guard, partition_id); + self.take_finalize_input_if_ready(&mut guard) + }; + + if let Some(finalize_input) = finalize_input { + self.finish(finalize_input); + } + } + + fn store_build_data( + &self, + guard: &mut AccumulatorState, + data: PartitionBuildData, + ) -> Result<()> { + match (data, &mut guard.data) { + ( + PartitionBuildData::Partitioned { + partition_id, + pushdown, + bounds, + }, + AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + }, + ) => { + if matches!(partitions[partition_id], PartitionStatus::Pending) { + *completed_partitions += 1; } - // Mismatched modes - should never happen - _ => { - return datafusion_common::internal_err!( - "Build data mode mismatch in report_build_data" - ); + partitions[partition_id] = + PartitionStatus::Reported(PartitionData { pushdown, bounds }); + } + ( + PartitionBuildData::CollectLeft { pushdown, bounds }, + AccumulatedBuildData::CollectLeft { + data, + reported_count, + .. + }, + ) => { + if matches!(data, PartitionStatus::Pending) { + *data = PartitionStatus::Reported(PartitionData { pushdown, bounds }); } + *reported_count += 1; + } + _ => { + return datafusion_common::internal_err!( + "Build data mode mismatch in report_build_data" + ); } } + Ok(()) + } - // Wait for all partitions to report - if self.barrier.wait().await.is_leader() { - // All partitions have reported, so we can create and update the filter - let inner = self.inner.lock(); - - match &*inner { - // CollectLeft: Simple conjunction of bounds and membership check - AccumulatedBuildData::CollectLeft { data } => { - if let Some(partition_data) = data { - // Create membership predicate (InList for small build sides, hash lookup otherwise) - let membership_expr = create_membership_predicate( - &self.on_right, - partition_data.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - - // Create bounds check expression (if bounds available) - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition_data.bounds, - ); - - // Combine membership and bounds expressions for multi-layer optimization: - // - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping) - // - Membership (InList/hash lookup): Enables: - // * Precise filtering (exact value matching) - // * Bloom filter utilization (if present in Parquet files) - // * Better pruning for data types where min/max isn't effective (e.g., UUIDs) - // Together, they provide complementary benefits and maximize data skipping. - // Only update the filter if we have something to push down - if let Some(filter_expr) = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Some(Arc::new(BinaryExpr::new( - bounds, - Operator::And, - membership, - )) - as Arc) - } - (Some(membership), None) => { - // Membership available but no bounds - // This is reachable when we have data but bounds aren't available - // (e.g., unsupported data types or no columns with bounds) - Some(membership) - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - Some(bounds) - } - (None, None) => { - // No filter available (e.g., empty build side) - // Don't update the filter, but continue to mark complete - None - } - } { - self.dynamic_filter.update(filter_expr)?; - } + fn store_canceled_partition( + &self, + guard: &mut AccumulatorState, + partition_id: usize, + ) { + if let AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } = &mut guard.data + && matches!(partitions[partition_id], PartitionStatus::Pending) + { + partitions[partition_id] = PartitionStatus::CanceledUnknown; + *completed_partitions += 1; + } + } + + fn take_finalize_input_if_ready( + &self, + guard: &mut AccumulatorState, + ) -> Option { + if !matches!(guard.completion, CompletionState::Pending) { + return None; + } + + let finalize_input = match &guard.data { + AccumulatedBuildData::Partitioned { + partitions, + completed_partitions, + } if *completed_partitions == partitions.len() => { + Some(FinalizeInput::Partitioned(partitions.clone())) + } + AccumulatedBuildData::CollectLeft { + data, + reported_count, + expected_reports, + } if *reported_count == *expected_reports => { + Some(FinalizeInput::CollectLeft(data.clone())) + } + _ => None, + }?; + + guard.completion = CompletionState::Finalizing; + Some(finalize_input) + } + + fn finish(&self, finalize_input: FinalizeInput) { + let result = self.build_filter(finalize_input).map_err(Arc::new); + self.dynamic_filter.mark_complete(); + + let mut guard = self.inner.lock(); + guard.completion = CompletionState::Ready(result); + drop(guard); + self.completion_notify.notify_waiters(); + } + + async fn wait_for_completion(&self) -> Result<()> { + loop { + let notified = { + let guard = self.inner.lock(); + match &guard.completion { + CompletionState::Ready(Ok(())) => return Ok(()), + CompletionState::Ready(Err(err)) => { + return Err(DataFusionError::Shared(Arc::clone(err))); + } + CompletionState::Pending | CompletionState::Finalizing => { + self.completion_notify.notified() } } - // Partitioned: CASE expression routing to per-partition filters - AccumulatedBuildData::Partitioned { partitions } => { - // Collect all partition data (should all be Some at this point) - let partition_data: Vec<_> = - partitions.iter().filter_map(|p| p.as_ref()).collect(); - - if !partition_data.is_empty() { - // Build a CASE expression that combines range checks AND membership checks - // CASE (hash_repartition(join_keys) % num_partitions) - // WHEN 0 THEN (col >= min_0 AND col <= max_0 AND ...) AND membership_check_0 - // WHEN 1 THEN (col >= min_1 AND col <= max_1 AND ...) AND membership_check_1 - // ... - // ELSE false - // END - - let num_partitions = partition_data.len(); - - // Create base expression: hash_repartition(join_keys) % num_partitions - let routing_hash_expr = Arc::new(HashExpr::new( - self.on_right.clone(), - self.repartition_random_state.clone(), - "hash_repartition".to_string(), - )) - as Arc; - - let modulo_expr = Arc::new(BinaryExpr::new( - routing_hash_expr, - Operator::Modulo, - lit(ScalarValue::UInt64(Some(num_partitions as u64))), - )) - as Arc; - - // Create WHEN branches for each partition - let when_then_branches: Vec<( - Arc, - Arc, - )> = partitions - .iter() - .enumerate() - .filter_map(|(partition_id, partition_opt)| { - partition_opt.as_ref().and_then(|partition| { - // Skip empty partitions - they would always return false anyway - match &partition.pushdown { - PushdownStrategy::Empty => None, - _ => Some((partition_id, partition)), - } - }) - }) - .map(|(partition_id, partition)| -> Result<_> { - // WHEN partition_id - let when_expr = - lit(ScalarValue::UInt64(Some(partition_id as u64))); - - // THEN: Combine bounds check AND membership predicate - - // 1. Create membership predicate (InList for small build sides, hash lookup otherwise) - let membership_expr = create_membership_predicate( - &self.on_right, - partition.pushdown.clone(), - &HASH_JOIN_SEED, - self.probe_schema.as_ref(), - )?; - - // 2. Create bounds check expression for this partition (if bounds available) - let bounds_expr = create_bounds_predicate( - &self.on_right, - &partition.bounds, - ); - - // 3. Combine membership and bounds expressions - let then_expr = match (membership_expr, bounds_expr) { - (Some(membership), Some(bounds)) => { - // Both available: combine with AND - Arc::new(BinaryExpr::new( - bounds, - Operator::And, - membership, - )) - as Arc - } - (Some(membership), None) => { - // Membership available but no bounds (e.g., unsupported data types) - membership - } - (None, Some(bounds)) => { - // Bounds available but no membership. - // This should be unreachable in practice: we can always push down a reference - // to the hash table. - // But it seems safer to handle it defensively. - bounds - } - (None, None) => { - // No filter for this partition - should not happen due to filter_map above - // but handle defensively by returning a "true" literal - lit(true) - } - }; - - Ok((when_expr, then_expr)) - }) - .collect::>>()?; - - // Optimize for single partition: skip CASE expression entirely - let filter_expr = if when_then_branches.is_empty() { - // All partitions are empty: no rows can match - lit(false) - } else if when_then_branches.len() == 1 { - // Single partition: just use the condition directly - // since hash % 1 == 0 always, the WHEN 0 branch will always match - Arc::clone(&when_then_branches[0].1) - } else { - // Multiple partitions: create CASE expression - Arc::new(CaseExpr::try_new( - Some(modulo_expr), - when_then_branches, - Some(lit(false)), // ELSE false - )?) as Arc - }; + }; + notified.await; + } + } + fn build_filter(&self, finalize_input: FinalizeInput) -> Result<()> { + match finalize_input { + FinalizeInput::CollectLeft(partition) => match partition { + PartitionStatus::Reported(partition_data) => { + let membership_expr = create_membership_predicate( + &self.on_right, + partition_data.pushdown.clone(), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )?; + let bounds_expr = + create_bounds_predicate(&self.on_right, &partition_data.bounds); + + if let Some(filter_expr) = match (membership_expr, bounds_expr) { + (Some(membership), Some(bounds)) => Some(Arc::new( + BinaryExpr::new(bounds, Operator::And, membership), + ) + as Arc), + (Some(membership), None) => Some(membership), + (None, Some(bounds)) => Some(bounds), + (None, None) => None, + } { self.dynamic_filter.update(filter_expr)?; } } + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize collect-left dynamic filter without reported build data" + ); + } + PartitionStatus::CanceledUnknown => { + return datafusion_common::internal_err!( + "collect-left dynamic filter cannot finalize with canceled build data" + ); + } + }, + FinalizeInput::Partitioned(partitions) => { + let num_partitions = partitions.len(); + let routing_hash_expr = Arc::new(HashExpr::new( + self.on_right.clone(), + self.repartition_random_state.clone(), + "hash_repartition".to_string(), + )) as Arc; + + let modulo_expr = Arc::new(BinaryExpr::new( + routing_hash_expr, + Operator::Modulo, + lit(ScalarValue::UInt64(Some(num_partitions as u64))), + )) as Arc; + + let mut real_branches = Vec::new(); + let mut empty_partition_ids = Vec::new(); + let mut has_canceled_unknown = false; + + for (partition_id, partition) in partitions.iter().enumerate() { + match partition { + PartitionStatus::Reported(partition) + if matches!(partition.pushdown, PushdownStrategy::Empty) => + { + empty_partition_ids.push(partition_id); + } + PartitionStatus::Reported(partition) => { + let membership_expr = create_membership_predicate( + &self.on_right, + partition.pushdown.clone(), + &HASH_JOIN_SEED, + self.probe_schema.as_ref(), + )?; + let bounds_expr = create_bounds_predicate( + &self.on_right, + &partition.bounds, + ); + let then_expr = match (membership_expr, bounds_expr) { + (Some(membership), Some(bounds)) => Arc::new( + BinaryExpr::new(bounds, Operator::And, membership), + ) + as Arc, + (Some(membership), None) => membership, + (None, Some(bounds)) => bounds, + (None, None) => lit(true), + }; + real_branches.push(( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + then_expr, + )); + } + PartitionStatus::CanceledUnknown => { + has_canceled_unknown = true; + } + PartitionStatus::Pending => { + return datafusion_common::internal_err!( + "attempted to finalize dynamic filter with pending partition" + ); + } + } + } + + let filter_expr = if has_canceled_unknown { + let mut when_then_branches = empty_partition_ids + .into_iter() + .map(|partition_id| { + ( + lit(ScalarValue::UInt64(Some(partition_id as u64))), + lit(false), + ) + }) + .collect::>(); + when_then_branches.extend(real_branches); + + if when_then_branches.is_empty() { + lit(true) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + when_then_branches, + Some(lit(true)), + )?) as Arc + } + } else if real_branches.is_empty() { + lit(false) + } else if real_branches.len() == 1 + && empty_partition_ids.len() + 1 == num_partitions + { + Arc::clone(&real_branches[0].1) + } else { + Arc::new(CaseExpr::try_new( + Some(modulo_expr), + real_branches, + Some(lit(false)), + )?) as Arc + }; + + self.dynamic_filter.update(filter_expr)?; } - self.dynamic_filter.mark_complete(); } Ok(()) diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 1004fba3d4f45..39241724ad89f 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -219,6 +219,8 @@ pub(super) struct HashJoinStream { /// Optional future to signal when build information has been reported by all partitions /// and the dynamic filter has been updated build_waiter: Option>, + /// Tracks whether this partition has already reported build information to the coordinator. + build_reported: bool, /// Partitioning mode to use mode: PartitionMode, /// Output buffer for coalescing small batches into larger ones with optional fetch limit. @@ -400,6 +402,7 @@ impl HashJoinStream { right_side_ordered, build_accumulator, build_waiter: None, + build_reported: false, mode, output_buffer, null_aware, @@ -421,6 +424,49 @@ impl HashJoinStream { } } + /// Transitions state after build-side data has been collected, automatically + /// reporting build data to the accumulator when one is present. + /// + /// If a `build_accumulator` is configured, this method constructs the + /// appropriate [`PartitionBuildData`], schedules the reporting future, and + /// returns [`HashJoinStreamState::WaitPartitionBoundsReport`]. Otherwise it + /// delegates to [`Self::state_after_build_ready`]. + fn transition_after_build_collected( + &mut self, + left_data: &Arc, + ) -> HashJoinStreamState { + let Some(build_accumulator) = self.build_accumulator.as_ref() else { + return Self::state_after_build_ready(self.join_type, left_data.as_ref()); + }; + + let pushdown = left_data.membership().clone(); + let bounds = left_data + .bounds + .clone() + .unwrap_or_else(|| PartitionBounds::new(vec![])); + + let build_data = match self.mode { + PartitionMode::Partitioned => PartitionBuildData::Partitioned { + partition_id: self.partition, + pushdown, + bounds, + }, + PartitionMode::CollectLeft => { + PartitionBuildData::CollectLeft { pushdown, bounds } + } + PartitionMode::Auto => unreachable!( + "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" + ), + }; + + let acc = Arc::clone(build_accumulator); + self.build_waiter = Some(OnceFut::new(async move { + acc.report_build_data(build_data).await + })); + self.build_reported = true; + HashJoinStreamState::WaitPartitionBoundsReport + } + /// Separate implementation function that unpins the [`HashJoinStream`] so /// that partial borrows work correctly fn poll_next_impl( @@ -511,55 +557,7 @@ impl HashJoinStream { // not the build side (left). The probe-side NULL check happens during process_probe_batch. // The probe_side_has_null flag will be set there if any probe batch contains NULL. - // Handle dynamic filter build-side information accumulation - // - // Dynamic filter coordination between partitions: - // Report hash maps (Partitioned mode) or bounds (CollectLeft mode) to the accumulator - // which will handle synchronization and filter updates - if let Some(ref build_accumulator) = self.build_accumulator { - let build_accumulator = Arc::clone(build_accumulator); - - let left_side_partition_id = match self.mode { - PartitionMode::Partitioned => self.partition, - PartitionMode::CollectLeft => 0, - PartitionMode::Auto => unreachable!( - "PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!" - ), - }; - - // Determine pushdown strategy based on availability of InList values - let pushdown = left_data.membership().clone(); - - // Construct the appropriate build data enum variant based on partition mode - let build_data = match self.mode { - PartitionMode::Partitioned => PartitionBuildData::Partitioned { - partition_id: left_side_partition_id, - pushdown, - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }, - PartitionMode::CollectLeft => PartitionBuildData::CollectLeft { - pushdown, - bounds: left_data - .bounds - .clone() - .unwrap_or_else(|| PartitionBounds::new(vec![])), - }, - PartitionMode::Auto => unreachable!( - "PartitionMode::Auto should not be present at execution time" - ), - }; - - self.build_waiter = Some(OnceFut::new(async move { - build_accumulator.report_build_data(build_data).await - })); - self.state = HashJoinStreamState::WaitPartitionBoundsReport; - } else { - self.state = - Self::state_after_build_ready(self.join_type, left_data.as_ref()); - } + self.state = self.transition_after_build_collected(&left_data); self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); Poll::Ready(Ok(StatefulStreamResult::Continue)) @@ -947,3 +945,15 @@ impl Stream for HashJoinStream { self.poll_next_impl(cx) } } + +impl Drop for HashJoinStream { + fn drop(&mut self) { + if self.mode == PartitionMode::Partitioned + && !self.build_reported + && let Some(build_accumulator) = &self.build_accumulator + { + build_accumulator.report_canceled_partition(self.partition); + self.build_reported = true; + } + } +}