From 104e0984b0a183130755079ce5425a622ae32351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 15 Apr 2026 21:41:58 +0200 Subject: [PATCH 1/3] Use NDV estimate to pre-allocate hash tables during aggregation Use column distinct_count statistics (from Parquet metadata or other sources) to pre-size the hash table in GroupValues implementations, avoiding expensive rehashing during aggregation. The capacity hint is bounded by 128K entries to prevent over-allocation. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../physical-expr-common/src/binary_map.rs | 7 ++- .../src/binary_view_map.rs | 7 ++- .../src/aggregates/group_values/mod.rs | 47 +++++++++++++++---- .../group_values/multi_group_by/mod.rs | 10 ++-- .../src/aggregates/group_values/row.rs | 6 +-- .../group_values/single_group_by/bytes.rs | 4 +- .../single_group_by/bytes_view.rs | 4 +- .../group_values/single_group_by/primitive.rs | 7 +-- .../physical-plan/src/aggregates/mod.rs | 5 +- .../physical-plan/src/aggregates/row_hash.rs | 16 ++++++- .../physical-plan/src/recursive_query.rs | 2 +- 11 files changed, 84 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs index ad184d6500d56..f55f68f2775b1 100644 --- a/datafusion/physical-expr-common/src/binary_map.rs +++ b/datafusion/physical-expr-common/src/binary_map.rs @@ -244,9 +244,14 @@ where V: Debug + PartialEq + Eq + Clone + Copy + Default, { pub fn new(output_type: OutputType) -> Self { + Self::with_capacity(output_type, INITIAL_MAP_CAPACITY) + } + + pub fn with_capacity(output_type: OutputType, capacity: usize) -> Self { + let capacity = capacity.max(INITIAL_MAP_CAPACITY); Self { output_type, - map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY), + map: hashbrown::hash_table::HashTable::with_capacity(capacity), map_size: 0, buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY), offsets: vec![O::default()], // first offset is 
always 0 diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index abc3e28f82627..69d1784459766 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -155,9 +155,14 @@ where V: Debug + PartialEq + Eq + Clone + Copy + Default, { pub fn new(output_type: OutputType) -> Self { + Self::with_capacity(output_type, INITIAL_MAP_CAPACITY) + } + + pub fn with_capacity(output_type: OutputType, capacity: usize) -> Self { + let capacity = capacity.max(INITIAL_MAP_CAPACITY); Self { output_type, - map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY), + map: hashbrown::hash_table::HashTable::with_capacity(capacity), map_size: 0, views: Vec::new(), in_progress: Vec::new(), diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index 2f3b1a19e7d73..ebc55375889a2 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -134,13 +134,18 @@ pub trait GroupValues: Send { pub fn new_group_values( schema: SchemaRef, group_ordering: &GroupOrdering, + capacity_hint: Option<usize>, ) -> Result<Box<dyn GroupValues>> { + let capacity = capacity_hint.unwrap_or(0); if schema.fields.len() == 1 { let d = schema.fields[0].data_type(); macro_rules!
downcast_helper { ($t:ty, $d:ident) => { - return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone()))) + return Ok(Box::new(GroupValuesPrimitive::<$t>::new( + $d.clone(), + capacity, + ))) }; } @@ -176,22 +181,40 @@ pub fn new_group_values( downcast_helper!(Decimal128Type, d); } DataType::Utf8 => { - return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8))); + return Ok(Box::new(GroupValuesBytes::<i32>::new( + OutputType::Utf8, + capacity, + ))); } DataType::LargeUtf8 => { - return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8))); + return Ok(Box::new(GroupValuesBytes::<i64>::new( + OutputType::Utf8, + capacity, + ))); } DataType::Utf8View => { - return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View))); + return Ok(Box::new(GroupValuesBytesView::new( + OutputType::Utf8View, + capacity, + ))); } DataType::Binary => { - return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary))); + return Ok(Box::new(GroupValuesBytes::<i32>::new( + OutputType::Binary, + capacity, + ))); } DataType::LargeBinary => { - return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary))); + return Ok(Box::new(GroupValuesBytes::<i64>::new( + OutputType::Binary, + capacity, + ))); } DataType::BinaryView => { - return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView))); + return Ok(Box::new(GroupValuesBytesView::new( + OutputType::BinaryView, + capacity, + ))); } DataType::Boolean => { return Ok(Box::new(GroupValuesBoolean::new())); @@ -202,11 +225,15 @@ pub fn new_group_values( if multi_group_by::supported_schema(schema.as_ref()) { if matches!(group_ordering, GroupOrdering::None) { - Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::<false>::try_new( + schema, capacity, + )?)) } else { - Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?)) + Ok(Box::new(GroupValuesColumn::<true>::try_new( + schema, capacity, + )?)) } } else { - Ok(Box::new(GroupValuesRows::try_new(schema)?)) + Ok(Box::new(GroupValuesRows::try_new(schema, capacity)?)) } } diff
--git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs index cc4576eabddbd..d4781434a9a2f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs @@ -260,8 +260,8 @@ impl GroupValuesColumn { // ======================================================================== /// Create a new instance of GroupValuesColumn if supported for the specified schema - pub fn try_new(schema: SchemaRef) -> Result { - let map = HashTable::with_capacity(0); + pub fn try_new(schema: SchemaRef, capacity: usize) -> Result { + let map = HashTable::with_capacity(capacity); Ok(Self { schema, map, @@ -1268,7 +1268,7 @@ mod tests { fn test_intern_for_vectorized_group_values() { let data_set = VectorizedTestDataSet::new(); let mut group_values = - GroupValuesColumn::::try_new(data_set.schema()).unwrap(); + GroupValuesColumn::::try_new(data_set.schema(), 0).unwrap(); data_set.load_to_group_values(&mut group_values); let actual_batch = group_values.emit(EmitTo::All).unwrap(); @@ -1281,7 +1281,7 @@ mod tests { fn test_emit_first_n_for_vectorized_group_values() { let data_set = VectorizedTestDataSet::new(); let mut group_values = - GroupValuesColumn::::try_new(data_set.schema()).unwrap(); + GroupValuesColumn::::try_new(data_set.schema(), 0).unwrap(); // 1~num_rows times to emit the groups let num_rows = data_set.expected_batch.num_rows(); @@ -1332,7 +1332,7 @@ mod tests { let field = Field::new_list_field(DataType::Int32, true); let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new())); - let mut group_values = GroupValuesColumn::::try_new(schema).unwrap(); + let mut group_values = GroupValuesColumn::::try_new(schema, 0).unwrap(); // Insert group index views and check if success to insert insert_inline_group_index_view(&mut group_values, 0, 0); diff --git 
a/datafusion/physical-plan/src/aggregates/group_values/row.rs b/datafusion/physical-plan/src/aggregates/group_values/row.rs index a3bd31f76c233..8b2742692bfab 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/row.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/row.rs @@ -82,7 +82,7 @@ pub struct GroupValuesRows { } impl GroupValuesRows { - pub fn try_new(schema: SchemaRef) -> Result { + pub fn try_new(schema: SchemaRef, capacity: usize) -> Result { // Print a debugging message, so it is clear when the (slower) fallback // GroupValuesRows is used. debug!("Creating GroupValuesRows for schema: {schema}"); @@ -94,9 +94,9 @@ impl GroupValuesRows { .collect(), )?; - let map = HashTable::with_capacity(0); + let map = HashTable::with_capacity(capacity); - let starting_rows_capacity = 1000; + let starting_rows_capacity = capacity.max(1000); let starting_data_capacity = 64 * starting_rows_capacity; let rows_buffer = diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs index b881a51b25474..2022601d5e0ec 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes.rs @@ -36,9 +36,9 @@ pub struct GroupValuesBytes { } impl GroupValuesBytes { - pub fn new(output_type: OutputType) -> Self { + pub fn new(output_type: OutputType, capacity: usize) -> Self { Self { - map: ArrowBytesMap::new(output_type), + map: ArrowBytesMap::with_capacity(output_type, capacity), num_groups: 0, } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11a..b444bb94b815e 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -34,9 +34,9 @@ pub struct GroupValuesBytesView { } impl GroupValuesBytesView { - pub fn new(output_type: OutputType) -> Self { + pub fn new(output_type: OutputType, capacity: usize) -> Self { Self { - map: ArrowBytesViewMap::new(output_type), + map: ArrowBytesViewMap::with_capacity(output_type, capacity), num_groups: 0, } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs index efaf7eba0f1b5..26e7a7ca69fd1 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs @@ -98,12 +98,13 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> { } impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> { - pub fn new(data_type: DataType) -> Self { + pub fn new(data_type: DataType, capacity: usize) -> Self { assert!(PrimitiveArray::<T>::is_compatible(&data_type)); + let capacity = capacity.max(128); Self { data_type, - map: HashTable::with_capacity(128), - values: Vec::with_capacity(128), + map: HashTable::with_capacity(capacity), + values: Vec::with_capacity(capacity), null_group: None, random_state: crate::aggregates::AGGREGATION_HASH_SEED, } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 79a21d1c345ec..1214036432743 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1202,7 +1202,10 @@ impl AggregateExec { /// **Grouping sets:** `GROUPING SETS ((a), (b), (a, b))` with NDV(a) = 100, NDV(b) = 50 /// → set(a) = 100, set(b) = 50, set(a, b) = 100 × 50 = 5,000 /// → total = 100 + 50 + 5,000 = 5,150 - fn compute_group_ndv(&self, child_statistics: &Statistics) -> Option<usize> { + pub(crate) fn compute_group_ndv( + &self, + child_statistics: &Statistics, + ) -> Option<usize> {
let mut total: usize = 0; for group_mask in &self.group_by.groups { let mut set_product: usize = 1; diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 056a7f171a516..7dad666ac431b 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -587,7 +587,18 @@ impl GroupedHashAggregateStream { _ => OutOfMemoryMode::ReportError, }; - let group_values = new_group_values(group_schema, &group_ordering)?; + // Use NDV estimate from child statistics to pre-allocate hash table, + // bounded by 128K to avoid over-allocation. + const MAX_NDV_CAPACITY: usize = 128 * 1024; + let capacity_hint = agg + .input + .partition_statistics(None) + .ok() + .and_then(|stats| agg.compute_group_ndv(&stats)) + .map(|ndv: usize| ndv.min(MAX_NDV_CAPACITY)); + + let group_values = + new_group_values(group_schema, &group_ordering, capacity_hint)?; let reservation = MemoryConsumer::new(name) // We interpret 'can spill' as 'can handle memory back pressure'. 
// This value needs to be set to true for the default memory pool implementations @@ -1269,7 +1280,8 @@ impl GroupedHashAggregateStream { .merging_group_by .group_schema(&self.spill_state.spill_schema)?; if group_schema.fields().len() > 1 { - self.group_values = new_group_values(group_schema, &self.group_ordering)?; + self.group_values = + new_group_values(group_schema, &self.group_ordering, None)?; } // Use `OutOfMemoryMode::ReportError` from this point on diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 35b787759441c..6bd765434d4b9 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -441,7 +441,7 @@ struct DistinctDeduplicator { impl DistinctDeduplicator { fn new(schema: SchemaRef, task_context: &TaskContext) -> Result { - let group_values = new_group_values(schema, &GroupOrdering::None)?; + let group_values = new_group_values(schema, &GroupOrdering::None, None)?; let reservation = MemoryConsumer::new("RecursiveQueryHashTable") .register(task_context.memory_pool()); Ok(Self { From 2ebcfa8bf3f5faf2ed61df57bbfce84addd8a65c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 15 Apr 2026 21:47:08 +0200 Subject: [PATCH 2/3] Add preallocate to GroupsAccumulator trait and key implementations Add `preallocate(total_num_groups)` method to the `GroupsAccumulator` trait (default no-op) and implement it for: - PrimitiveGroupsAccumulator (SUM, MIN, MAX, etc.) - CountGroupsAccumulator - VarianceGroupsAccumulator - CorrelationGroupsAccumulator Call preallocate on all accumulators in GroupedHashAggregateStream when NDV capacity hint is available. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/expr-common/src/groups_accumulator.rs | 10 ++++++++++ .../src/aggregate/groups_accumulator/prim_op.rs | 4 ++++ datafusion/functions-aggregate/src/correlation.rs | 9 +++++++++ datafusion/functions-aggregate/src/count.rs | 4 ++++ datafusion/functions-aggregate/src/variance.rs | 6 ++++++ datafusion/physical-plan/src/aggregates/row_hash.rs | 8 +++++++- 6 files changed, 40 insertions(+), 1 deletion(-) diff --git a/datafusion/expr-common/src/groups_accumulator.rs b/datafusion/expr-common/src/groups_accumulator.rs index 9053f7a8eab9f..8ad852eaf22df 100644 --- a/datafusion/expr-common/src/groups_accumulator.rs +++ b/datafusion/expr-common/src/groups_accumulator.rs @@ -247,6 +247,16 @@ pub trait GroupsAccumulator: Send + std::any::Any { false } + /// Pre-allocates internal storage for the given number of groups. + /// + /// This is an optional optimization hint. When statistics (such as NDV + /// from Parquet metadata) predict the number of distinct groups, calling + /// this before the first `update_batch` avoids repeated resizing of + /// internal vectors. + /// + /// The default implementation is a no-op. + fn preallocate(&mut self, _total_num_groups: usize) {} + /// Amount of memory used to store the state of this accumulator, /// in bytes. 
/// diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs index a81b89e1e46f1..1ff325287970a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/prim_op.rs @@ -195,6 +195,10 @@ where true } + fn preallocate(&mut self, total_num_groups: usize) { + self.values.resize(total_num_groups, self.starting_value); + } + fn size(&self) -> usize { self.values.capacity() * size_of::() + self.null_state.size() } diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs index 2621fcf0bf3c7..6713cc5e27829 100644 --- a/datafusion/functions-aggregate/src/correlation.rs +++ b/datafusion/functions-aggregate/src/correlation.rs @@ -536,6 +536,15 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator { Ok(()) } + fn preallocate(&mut self, total_num_groups: usize) { + self.count.resize(total_num_groups, 0); + self.sum_x.resize(total_num_groups, 0.0); + self.sum_y.resize(total_num_groups, 0.0); + self.sum_xy.resize(total_num_groups, 0.0); + self.sum_xx.resize(total_num_groups, 0.0); + self.sum_yy.resize(total_num_groups, 0.0); + } + fn size(&self) -> usize { self.count.capacity() * size_of::() + self.sum_x.capacity() * size_of::() diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 81b6eda3f9b1c..7313a5e4be3ca 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -712,6 +712,10 @@ impl GroupsAccumulator for CountGroupsAccumulator { true } + fn preallocate(&mut self, total_num_groups: usize) { + self.counts.resize(total_num_groups, 0); + } + fn size(&self) -> usize { self.counts.capacity() * size_of::() } diff --git a/datafusion/functions-aggregate/src/variance.rs 
b/datafusion/functions-aggregate/src/variance.rs index ce3e00b9ffd91..0f51f95f7175f 100644 --- a/datafusion/functions-aggregate/src/variance.rs +++ b/datafusion/functions-aggregate/src/variance.rs @@ -575,6 +575,12 @@ impl GroupsAccumulator for VarianceGroupsAccumulator { ]) } + fn preallocate(&mut self, total_num_groups: usize) { + self.m2s.resize(total_num_groups, 0.0); + self.means.resize(total_num_groups, 0.0); + self.counts.resize(total_num_groups, 0); + } + fn size(&self) -> usize { self.m2s.capacity() * size_of::<f64>() + self.means.capacity() * size_of::<f64>() diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 7dad666ac431b..6296e064b1653 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -498,7 +498,7 @@ impl GroupedHashAggregateStream { }; // Instantiate the accumulators - let accumulators: Vec<_> = aggregate_exprs + let mut accumulators: Vec<_> = aggregate_exprs .iter() .map(create_group_accumulator) .collect::<Result<Vec<_>>>()?; @@ -597,6 +597,12 @@ impl GroupedHashAggregateStream { .and_then(|stats| agg.compute_group_ndv(&stats)) .map(|ndv: usize| ndv.min(MAX_NDV_CAPACITY)); + if let Some(capacity) = capacity_hint { + for acc in &mut accumulators { + acc.preallocate(capacity); + } + } + let group_values = new_group_values(group_schema, &group_ordering, capacity_hint)?; let reservation = MemoryConsumer::new(name) From 3c61ea10c8bc5d201b5fcd2807279017ae6fdd07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Wed, 15 Apr 2026 22:20:22 +0200 Subject: [PATCH 3/3] Fall back to Interval::cardinality() for NDV estimation from min/max When distinct_count is absent, estimate NDV from min/max range using Interval::cardinality(), matching the approach already used by join cardinality estimation. This enables NDV-based pre-allocation for Parquet files that have min/max stats but lack explicit distinct_count metadata (e.g.
ClickBench). Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/physical-plan/src/aggregates/mod.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 1214036432743..94338dc8049fd 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -49,6 +49,7 @@ use datafusion_common::{ Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, not_impl_err, }; use datafusion_execution::TaskContext; +use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::{Accumulator, Aggregate}; use datafusion_physical_expr::aggregate::AggregateFunctionExpr; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -1215,7 +1216,15 @@ impl AggregateExec { } let col = expr.as_any().downcast_ref::<Column>()?; let col_stats = &child_statistics.column_statistics[col.index()]; - let ndv = *col_stats.distinct_count.get_value()?; + let ndv = + col_stats.distinct_count.get_value().copied().or_else(|| { + let min = col_stats.min_value.get_value()?; + let max = col_stats.max_value.get_value()?; + let card = Interval::try_new(min.clone(), max.clone()) .ok() .and_then(|i| i.cardinality())?; + Some(card as usize) + })?; let null_adjustment = match col_stats.null_count.get_value() { Some(&n) if n > 0 => 1usize, _ => 0,