Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions datafusion/expr-common/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ pub trait GroupsAccumulator: Send + std::any::Any {
false
}

    /// Pre-allocates internal storage for the given number of groups.
    ///
    /// This is an optional optimization hint. When statistics (such as NDV
    /// from Parquet metadata) predict the number of distinct groups, calling
    /// this before the first `update_batch` avoids repeated resizing of
    /// internal vectors.
    ///
    /// `total_num_groups` is an *estimate* of the total number of distinct
    /// groups this accumulator will see; it may over- or under-shoot the
    /// true count, so implementations should treat it purely as a sizing
    /// hint and must remain correct if it is wrong.
    ///
    /// The default implementation is a no-op.
    fn preallocate(&mut self, _total_num_groups: usize) {}

/// Amount of memory used to store the state of this accumulator,
/// in bytes.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ where
true
}

fn preallocate(&mut self, total_num_groups: usize) {
self.values.resize(total_num_groups, self.starting_value);
}

fn size(&self) -> usize {
self.values.capacity() * size_of::<T::Native>() + self.null_state.size()
}
Expand Down
9 changes: 9 additions & 0 deletions datafusion/functions-aggregate/src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,15 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
Ok(())
}

fn preallocate(&mut self, total_num_groups: usize) {
self.count.resize(total_num_groups, 0);
self.sum_x.resize(total_num_groups, 0.0);
self.sum_y.resize(total_num_groups, 0.0);
self.sum_xy.resize(total_num_groups, 0.0);
self.sum_xx.resize(total_num_groups, 0.0);
self.sum_yy.resize(total_num_groups, 0.0);
}

fn size(&self) -> usize {
self.count.capacity() * size_of::<u64>()
+ self.sum_x.capacity() * size_of::<f64>()
Expand Down
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,10 @@ impl GroupsAccumulator for CountGroupsAccumulator {
true
}

fn preallocate(&mut self, total_num_groups: usize) {
self.counts.resize(total_num_groups, 0);
}

fn size(&self) -> usize {
self.counts.capacity() * size_of::<usize>()
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/functions-aggregate/src/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@ impl GroupsAccumulator for VarianceGroupsAccumulator {
])
}

fn preallocate(&mut self, total_num_groups: usize) {
self.m2s.resize(total_num_groups, 0.0);
self.means.resize(total_num_groups, 0.0);
self.counts.resize(total_num_groups, 0);
}

fn size(&self) -> usize {
self.m2s.capacity() * size_of::<f64>()
+ self.means.capacity() * size_of::<f64>()
Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,14 @@ where
V: Debug + PartialEq + Eq + Clone + Copy + Default,
{
    /// Creates a new map for the given output type, using the default
    /// initial hash-table capacity (`INITIAL_MAP_CAPACITY`).
    pub fn new(output_type: OutputType) -> Self {
        Self::with_capacity(output_type, INITIAL_MAP_CAPACITY)
    }

pub fn with_capacity(output_type: OutputType, capacity: usize) -> Self {
let capacity = capacity.max(INITIAL_MAP_CAPACITY);
Self {
output_type,
map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY),
map: hashbrown::hash_table::HashTable::with_capacity(capacity),
map_size: 0,
buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
offsets: vec![O::default()], // first offset is always 0
Expand Down
7 changes: 6 additions & 1 deletion datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,14 @@ where
V: Debug + PartialEq + Eq + Clone + Copy + Default,
{
    /// Creates a new view map for the given output type, using the default
    /// initial hash-table capacity (`INITIAL_MAP_CAPACITY`).
    pub fn new(output_type: OutputType) -> Self {
        Self::with_capacity(output_type, INITIAL_MAP_CAPACITY)
    }

pub fn with_capacity(output_type: OutputType, capacity: usize) -> Self {
let capacity = capacity.max(INITIAL_MAP_CAPACITY);
Self {
output_type,
map: hashbrown::hash_table::HashTable::with_capacity(INITIAL_MAP_CAPACITY),
map: hashbrown::hash_table::HashTable::with_capacity(capacity),
map_size: 0,
views: Vec::new(),
in_progress: Vec::new(),
Expand Down
47 changes: 37 additions & 10 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,18 @@ pub trait GroupValues: Send {
pub fn new_group_values(
schema: SchemaRef,
group_ordering: &GroupOrdering,
capacity_hint: Option<usize>,
) -> Result<Box<dyn GroupValues>> {
let capacity = capacity_hint.unwrap_or(0);
if schema.fields.len() == 1 {
let d = schema.fields[0].data_type();

macro_rules! downcast_helper {
($t:ty, $d:ident) => {
return Ok(Box::new(GroupValuesPrimitive::<$t>::new($d.clone())))
return Ok(Box::new(GroupValuesPrimitive::<$t>::new(
$d.clone(),
capacity,
)))
};
}

Expand Down Expand Up @@ -176,22 +181,40 @@ pub fn new_group_values(
downcast_helper!(Decimal128Type, d);
}
DataType::Utf8 => {
return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Utf8)));
return Ok(Box::new(GroupValuesBytes::<i32>::new(
OutputType::Utf8,
capacity,
)));
}
DataType::LargeUtf8 => {
return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Utf8)));
return Ok(Box::new(GroupValuesBytes::<i64>::new(
OutputType::Utf8,
capacity,
)));
}
DataType::Utf8View => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::Utf8View)));
return Ok(Box::new(GroupValuesBytesView::new(
OutputType::Utf8View,
capacity,
)));
}
DataType::Binary => {
return Ok(Box::new(GroupValuesBytes::<i32>::new(OutputType::Binary)));
return Ok(Box::new(GroupValuesBytes::<i32>::new(
OutputType::Binary,
capacity,
)));
}
DataType::LargeBinary => {
return Ok(Box::new(GroupValuesBytes::<i64>::new(OutputType::Binary)));
return Ok(Box::new(GroupValuesBytes::<i64>::new(
OutputType::Binary,
capacity,
)));
}
DataType::BinaryView => {
return Ok(Box::new(GroupValuesBytesView::new(OutputType::BinaryView)));
return Ok(Box::new(GroupValuesBytesView::new(
OutputType::BinaryView,
capacity,
)));
}
DataType::Boolean => {
return Ok(Box::new(GroupValuesBoolean::new()));
Expand All @@ -202,11 +225,15 @@ pub fn new_group_values(

if multi_group_by::supported_schema(schema.as_ref()) {
if matches!(group_ordering, GroupOrdering::None) {
Ok(Box::new(GroupValuesColumn::<false>::try_new(schema)?))
Ok(Box::new(GroupValuesColumn::<false>::try_new(
schema, capacity,
)?))
} else {
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
Ok(Box::new(GroupValuesColumn::<true>::try_new(
schema, capacity,
)?))
}
} else {
Ok(Box::new(GroupValuesRows::try_new(schema)?))
Ok(Box::new(GroupValuesRows::try_new(schema, capacity)?))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
// ========================================================================

/// Create a new instance of GroupValuesColumn if supported for the specified schema
pub fn try_new(schema: SchemaRef) -> Result<Self> {
let map = HashTable::with_capacity(0);
pub fn try_new(schema: SchemaRef, capacity: usize) -> Result<Self> {
let map = HashTable::with_capacity(capacity);
Ok(Self {
schema,
map,
Expand Down Expand Up @@ -1268,7 +1268,7 @@ mod tests {
fn test_intern_for_vectorized_group_values() {
let data_set = VectorizedTestDataSet::new();
let mut group_values =
GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
GroupValuesColumn::<false>::try_new(data_set.schema(), 0).unwrap();

data_set.load_to_group_values(&mut group_values);
let actual_batch = group_values.emit(EmitTo::All).unwrap();
Expand All @@ -1281,7 +1281,7 @@ mod tests {
fn test_emit_first_n_for_vectorized_group_values() {
let data_set = VectorizedTestDataSet::new();
let mut group_values =
GroupValuesColumn::<false>::try_new(data_set.schema()).unwrap();
GroupValuesColumn::<false>::try_new(data_set.schema(), 0).unwrap();

// 1~num_rows times to emit the groups
let num_rows = data_set.expected_batch.num_rows();
Expand Down Expand Up @@ -1332,7 +1332,7 @@ mod tests {

let field = Field::new_list_field(DataType::Int32, true);
let schema = Arc::new(Schema::new_with_metadata(vec![field], HashMap::new()));
let mut group_values = GroupValuesColumn::<false>::try_new(schema).unwrap();
let mut group_values = GroupValuesColumn::<false>::try_new(schema, 0).unwrap();

// Insert group index views and check if success to insert
insert_inline_group_index_view(&mut group_values, 0, 0);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct GroupValuesRows {
}

impl GroupValuesRows {
pub fn try_new(schema: SchemaRef) -> Result<Self> {
pub fn try_new(schema: SchemaRef, capacity: usize) -> Result<Self> {
// Print a debugging message, so it is clear when the (slower) fallback
// GroupValuesRows is used.
debug!("Creating GroupValuesRows for schema: {schema}");
Expand All @@ -94,9 +94,9 @@ impl GroupValuesRows {
.collect(),
)?;

let map = HashTable::with_capacity(0);
let map = HashTable::with_capacity(capacity);

let starting_rows_capacity = 1000;
let starting_rows_capacity = capacity.max(1000);

let starting_data_capacity = 64 * starting_rows_capacity;
let rows_buffer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ pub struct GroupValuesBytes<O: OffsetSizeTrait> {
}

impl<O: OffsetSizeTrait> GroupValuesBytes<O> {
pub fn new(output_type: OutputType) -> Self {
pub fn new(output_type: OutputType, capacity: usize) -> Self {
Self {
map: ArrowBytesMap::new(output_type),
map: ArrowBytesMap::with_capacity(output_type, capacity),
num_groups: 0,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ pub struct GroupValuesBytesView {
}

impl GroupValuesBytesView {
pub fn new(output_type: OutputType) -> Self {
pub fn new(output_type: OutputType, capacity: usize) -> Self {
Self {
map: ArrowBytesViewMap::new(output_type),
map: ArrowBytesViewMap::with_capacity(output_type, capacity),
num_groups: 0,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ pub struct GroupValuesPrimitive<T: ArrowPrimitiveType> {
}

impl<T: ArrowPrimitiveType> GroupValuesPrimitive<T> {
pub fn new(data_type: DataType) -> Self {
pub fn new(data_type: DataType, capacity: usize) -> Self {
assert!(PrimitiveArray::<T>::is_compatible(&data_type));
let capacity = capacity.max(128);
Self {
data_type,
map: HashTable::with_capacity(128),
values: Vec::with_capacity(128),
map: HashTable::with_capacity(capacity),
values: Vec::with_capacity(capacity),
null_group: None,
random_state: crate::aggregates::AGGREGATION_HASH_SEED,
}
Expand Down
16 changes: 14 additions & 2 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use datafusion_common::{
Constraint, Constraints, Result, ScalarValue, assert_eq_or_internal_err, not_impl_err,
};
use datafusion_execution::TaskContext;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{Accumulator, Aggregate};
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::equivalence::ProjectionMapping;
Expand Down Expand Up @@ -1202,7 +1203,10 @@ impl AggregateExec {
/// **Grouping sets:** `GROUPING SETS ((a), (b), (a, b))` with NDV(a) = 100, NDV(b) = 50
/// → set(a) = 100, set(b) = 50, set(a, b) = 100 × 50 = 5,000
/// → total = 100 + 50 + 5,000 = 5,150
fn compute_group_ndv(&self, child_statistics: &Statistics) -> Option<usize> {
pub(crate) fn compute_group_ndv(
&self,
child_statistics: &Statistics,
) -> Option<usize> {
let mut total: usize = 0;
for group_mask in &self.group_by.groups {
let mut set_product: usize = 1;
Expand All @@ -1212,7 +1216,15 @@ impl AggregateExec {
}
let col = expr.as_any().downcast_ref::<Column>()?;
let col_stats = &child_statistics.column_statistics[col.index()];
let ndv = *col_stats.distinct_count.get_value()?;
let ndv =
col_stats.distinct_count.get_value().copied().or_else(|| {
let min = col_stats.min_value.get_value()?;
let max = col_stats.max_value.get_value()?;
let card = Interval::try_new(min.clone(), max.clone())
.ok()
.and_then(|i| i.cardinality())?;
Some(card as usize)
})?;
let null_adjustment = match col_stats.null_count.get_value() {
Some(&n) if n > 0 => 1usize,
_ => 0,
Expand Down
24 changes: 21 additions & 3 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ impl GroupedHashAggregateStream {
};

// Instantiate the accumulators
let accumulators: Vec<_> = aggregate_exprs
let mut accumulators: Vec<_> = aggregate_exprs
.iter()
.map(create_group_accumulator)
.collect::<Result<_>>()?;
Expand Down Expand Up @@ -587,7 +587,24 @@ impl GroupedHashAggregateStream {
_ => OutOfMemoryMode::ReportError,
};

let group_values = new_group_values(group_schema, &group_ordering)?;
// Use NDV estimate from child statistics to pre-allocate hash table,
// bounded by 128K to avoid over-allocation.
const MAX_NDV_CAPACITY: usize = 128 * 1024;
let capacity_hint = agg
.input
.partition_statistics(None)
.ok()
.and_then(|stats| agg.compute_group_ndv(&stats))
.map(|ndv: usize| ndv.min(MAX_NDV_CAPACITY));

if let Some(capacity) = capacity_hint {
for acc in &mut accumulators {
acc.preallocate(capacity);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably move these `preallocate` calls to after the `MemoryConsumer` registration below, so that the memory pre-allocated by the accumulators is accounted for against the memory pool's reservation.

}
}

let group_values =
new_group_values(group_schema, &group_ordering, capacity_hint)?;
let reservation = MemoryConsumer::new(name)
// We interpret 'can spill' as 'can handle memory back pressure'.
// This value needs to be set to true for the default memory pool implementations
Expand Down Expand Up @@ -1269,7 +1286,8 @@ impl GroupedHashAggregateStream {
.merging_group_by
.group_schema(&self.spill_state.spill_schema)?;
if group_schema.fields().len() > 1 {
self.group_values = new_group_values(group_schema, &self.group_ordering)?;
self.group_values =
new_group_values(group_schema, &self.group_ordering, None)?;
}

// Use `OutOfMemoryMode::ReportError` from this point on
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ struct DistinctDeduplicator {

impl DistinctDeduplicator {
fn new(schema: SchemaRef, task_context: &TaskContext) -> Result<Self> {
let group_values = new_group_values(schema, &GroupOrdering::None)?;
let group_values = new_group_values(schema, &GroupOrdering::None, None)?;
let reservation = MemoryConsumer::new("RecursiveQueryHashTable")
.register(task_context.memory_pool());
Ok(Self {
Expand Down
Loading