Skip to content
Open
233 changes: 197 additions & 36 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,33 @@ impl ExecutionPlan for HashJoinExec {
.with_category(MetricCategory::Rows)
.counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);

// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let on_right_exprs = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right_exprs.clone(),
repartition_random_state,
))
})))
})
})
.flatten()
.flatten();

let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;
Expand Down Expand Up @@ -1343,7 +1370,6 @@ impl ExecutionPlan for HashJoinExec {
let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
self.random_state.random_state().clone(),
left_stream,
Expand All @@ -1368,33 +1394,6 @@ impl ExecutionPlan for HashJoinExec {

let batch_size = context.session_config().batch_size();

// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
let filter = Arc::clone(&df.filter);
let on_right = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
self.mode,
self.left.as_ref(),
self.right.as_ref(),
filter,
on_right,
repartition_random_state,
))
})))
})
})
.flatten()
.flatten();

// we have the batches and the hash map with their keys. We can how create a stream
// over the right that uses this information to issue new batches.
let right_stream = self.right.execute(partition, context)?;
Expand All @@ -1408,16 +1407,10 @@ impl ExecutionPlan for HashJoinExec {
None => self.column_indices.clone(),
};

let on_right = self
.on
.iter()
.map(|(_, right_expr)| Arc::clone(right_expr))
.collect::<Vec<_>>();

Ok(Box::pin(HashJoinStream::new(
partition,
self.schema(),
on_right,
on_right_exprs,
self.filter.clone(),
self.join_type,
right_stream,
Expand Down Expand Up @@ -2353,6 +2346,22 @@ mod tests {
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
hash_join_with_dynamic_filter_and_mode(
left,
right,
on,
join_type,
PartitionMode::CollectLeft,
)
}

fn hash_join_with_dynamic_filter_and_mode(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
join_type: JoinType,
mode: PartitionMode,
) -> Result<(HashJoinExec, Arc<DynamicFilterPhysicalExpr>)> {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let mut join = HashJoinExec::try_new(
Expand All @@ -2362,7 +2371,7 @@ mod tests {
None,
&join_type,
None,
PartitionMode::CollectLeft,
mode,
NullEquality::NullEqualsNothing,
false,
)?;
Expand Down Expand Up @@ -5634,6 +5643,158 @@ mod tests {
Ok(())
}

/// Regression test: a partitioned hash join feeding a partitioned
/// right-semi hash join, where two of the four parent build-side
/// partitions are empty. With dynamic filter pushdown enabled, the
/// shared build accumulator must still account for empty partitions;
/// the `tokio::time::timeout` guard turns a coordination regression
/// (probe side waiting forever on bounds that never arrive) into a
/// test failure instead of a hang.
#[tokio::test]
async fn test_partitioned_dynamic_filter_reports_empty_canceled_partitions()
-> Result<()> {
    // Enable dynamic filter pushdown so the joins exercise the
    // build-accumulator coordination path under test.
    let mut session_config = SessionConfig::default();
    session_config
        .options_mut()
        .optimizer
        .enable_dynamic_filter_pushdown = true;
    let task_ctx =
        Arc::new(TaskContext::default().with_session_config(session_config));

    let child_left_schema = Arc::new(Schema::new(vec![
        Field::new("child_left_payload", DataType::Int32, false),
        Field::new("child_key", DataType::Int32, false),
        Field::new("child_left_extra", DataType::Int32, false),
    ]));
    let child_right_schema = Arc::new(Schema::new(vec![
        Field::new("child_right_payload", DataType::Int32, false),
        Field::new("child_right_key", DataType::Int32, false),
        Field::new("child_right_extra", DataType::Int32, false),
    ]));
    let parent_left_schema = Arc::new(Schema::new(vec![
        Field::new("parent_payload", DataType::Int32, false),
        Field::new("parent_key", DataType::Int32, false),
        Field::new("parent_extra", DataType::Int32, false),
    ]));

    // Builds a one-row batch: (payload, key, extra) column names and values.
    let single_row = |payload: (&str, i32), key: (&str, i32), extra: (&str, i32)| {
        build_table_i32(
            (payload.0, &vec![payload.1]),
            (key.0, &vec![key.1]),
            (extra.0, &vec![extra.1]),
        )
    };

    // Child sides: four partitions, one row per partition, keys 0..=3.
    let child_left_parts: Vec<Vec<_>> = (0..4)
        .map(|i| {
            vec![single_row(
                ("child_left_payload", 10 + i),
                ("child_key", i),
                ("child_left_extra", 100 + i),
            )]
        })
        .collect();
    let child_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
        &child_left_parts,
        Arc::clone(&child_left_schema),
        None,
    )?;

    let child_right_parts: Vec<Vec<_>> = (0..4)
        .map(|i| {
            vec![single_row(
                ("child_right_payload", 20 + i),
                ("child_right_key", i),
                ("child_right_extra", 200 + i),
            )]
        })
        .collect();
    let child_right: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
        &child_right_parts,
        Arc::clone(&child_right_schema),
        None,
    )?;

    // Parent build side: only partitions 0 and 2 carry rows; partitions
    // 1 and 3 are empty — the condition this test exists to cover.
    let parent_left_parts: Vec<Vec<_>> = (0..4)
        .map(|i| {
            if i % 2 == 0 {
                vec![single_row(
                    ("parent_payload", 30 + i),
                    ("parent_key", i),
                    ("parent_extra", 300 + i),
                )]
            } else {
                vec![RecordBatch::new_empty(Arc::clone(&parent_left_schema))]
            }
        })
        .collect();
    let parent_left: Arc<dyn ExecutionPlan> = TestMemoryExec::try_new_exec(
        &parent_left_parts,
        Arc::clone(&parent_left_schema),
        None,
    )?;

    let child_on = vec![(
        Arc::new(Column::new_with_schema("child_key", &child_left_schema)?) as _,
        Arc::new(Column::new_with_schema(
            "child_right_key",
            &child_right_schema,
        )?) as _,
    )];
    let (child_join, _child_dynamic_filter) = hash_join_with_dynamic_filter_and_mode(
        child_left,
        child_right,
        child_on,
        JoinType::Inner,
        PartitionMode::Partitioned,
    )?;
    let child_join: Arc<dyn ExecutionPlan> = Arc::new(child_join);

    let parent_on = vec![(
        Arc::new(Column::new_with_schema("parent_key", &parent_left_schema)?) as _,
        Arc::new(Column::new_with_schema("child_key", &child_join.schema())?) as _,
    )];
    // RightSemi keeps the child join's rows whose key matched the parent
    // build side — i.e. keys 0 and 2.
    let parent_join = HashJoinExec::try_new(
        parent_left,
        child_join,
        parent_on,
        None,
        &JoinType::RightSemi,
        None,
        PartitionMode::Partitioned,
        NullEquality::NullEqualsNothing,
        false,
    )?;

    // Bound the collect: the regression being guarded against manifests
    // as an infinite wait, which the timeout converts into a panic.
    let batches = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        crate::execution_plan::collect(Arc::new(parent_join), task_ctx),
    )
    .await
    .expect("partitioned right-semi join should not hang")?;

    assert_batches_sorted_eq!(
        [
            "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
            "| child_left_payload | child_key | child_left_extra | child_right_payload | child_right_key | child_right_extra |",
            "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
            "| 10                 | 0         | 100              | 20                  | 0               | 200               |",
            "| 12                 | 2         | 102              | 22                  | 2               | 202               |",
            "+--------------------+-----------+------------------+---------------------+-----------------+-------------------+",
        ],
        &batches
    );

    Ok(())
}

#[tokio::test]
async fn test_hash_join_skips_probe_on_empty_build_after_partition_bounds_report()
-> Result<()> {
Expand Down
Loading
Loading