Use NDV estimate to pre-allocate hash tables during aggregation#21654
Dandandan wants to merge 3 commits into apache:main
Conversation
Use column distinct_count statistics (from Parquet metadata or other sources) to pre-size the hash table in GroupValues implementations, avoiding expensive rehashing during aggregation. The capacity hint is bounded by 128K entries to prevent over-allocation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
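As a rough sketch of the capacity-hint idea (the names here are illustrative, not the PR's exact API): derive a bounded hint from the `distinct_count` statistic and use it to pre-size the group hash table, so early inserts never trigger a rehash.

```rust
use std::collections::HashMap;

// Illustrative constant mirroring the PR's 128K bound on the capacity hint.
const MAX_PREALLOC_GROUPS: usize = 128 * 1024;

// Hypothetical helper: clamp an NDV estimate so an overestimate
// cannot cause a huge up-front allocation.
fn capacity_hint(distinct_count: Option<usize>) -> Option<usize> {
    distinct_count.map(|ndv| ndv.min(MAX_PREALLOC_GROUPS))
}

fn main() {
    // A 1M NDV estimate is clamped to the 128K cap.
    assert_eq!(capacity_hint(Some(1_000_000)), Some(MAX_PREALLOC_GROUPS));
    // No statistics available: no hint, fall back to default growth.
    assert_eq!(capacity_hint(None), None);

    // The hint pre-sizes the group hash table up front.
    let groups: HashMap<u64, usize> = match capacity_hint(Some(50_000)) {
        Some(cap) => HashMap::with_capacity(cap),
        None => HashMap::new(),
    };
    assert!(groups.capacity() >= 50_000);
    println!("pre-sized for at least {} groups", groups.capacity());
}
```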
run benchmark
Hi @Dandandan, Supported benchmarks:

Usage: Per-side configuration (`env`):

```yaml
env:
  SHARED_SETTING: enabled
baseline:
  ref: v45.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 1G
changed:
  ref: v46.0.0
  env:
    DATAFUSION_RUNTIME_MEMORY_LIMIT: 2G
```

File an issue against this benchmark runner
run benchmarks
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (104e098) to 5c653be (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (104e098) to 5c653be (merge-base) using tpch

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (104e098) to 5c653be (merge-base) using tpcds
Add a `preallocate(total_num_groups)` method to the `GroupsAccumulator` trait (default no-op) and implement it for:
- `PrimitiveGroupsAccumulator` (SUM, MIN, MAX, etc.)
- `CountGroupsAccumulator`
- `VarianceGroupsAccumulator`
- `CorrelationGroupsAccumulator`

Call `preallocate` on all accumulators in `GroupedHashAggregateStream` when the NDV capacity hint is available. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
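A simplified sketch of that shape (not the exact DataFusion trait, which has many more methods): a `preallocate` hook with a default no-op body, so existing accumulators need no changes, and one illustrative implementation that reserves its per-group state up front.

```rust
// Simplified sketch of the trait change described above; the real
// DataFusion GroupsAccumulator trait has many more required methods.
trait GroupsAccumulator {
    /// Hint that roughly `total_num_groups` groups are expected.
    /// The default is a no-op, so implementing it is optional.
    fn preallocate(&mut self, _total_num_groups: usize) {}
}

// Illustrative accumulator: COUNT keeps one counter per group.
struct CountGroupsAccumulator {
    counts: Vec<i64>,
}

impl GroupsAccumulator for CountGroupsAccumulator {
    fn preallocate(&mut self, total_num_groups: usize) {
        // Reserve the per-group state once instead of growing incrementally.
        self.counts.reserve(total_num_groups);
    }
}

fn main() {
    let mut acc = CountGroupsAccumulator { counts: Vec::new() };
    acc.preallocate(1024);
    assert!(acc.counts.capacity() >= 1024);
    println!("reserved capacity: {}", acc.counts.capacity());
}
```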
run benchmarks
Thanks for tackling this @Dandandan! It was on my to-do list. We have a workload in Comet that spends the vast majority of its time in the hash agg just resizing and rehashing (almost 1 billion unique values), so I wanted to take a look at using Statistics to preallocate. The description seems perfect to me!
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (2ebcfa8) to 5c653be (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (2ebcfa8) to 5c653be (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (2ebcfa8) to 5c653be (merge-base) using tpch
Yeah, hash table resizing is a pretty costly one (probably in terms of cache misses / branch mispredicts), especially as the table grows larger.

As there is also a risk of estimating the NDV too high, I added a cap of 128K rows (I think it should be configurable).
```rust
if let Some(capacity) = capacity_hint {
    for acc in &mut accumulators {
        acc.preallocate(capacity);
    }
}
```
Should probably do these calls after the `MemoryConsumer` setup
So for Comet we have the final aggregation after a shuffle stage, and Spark will tell us the number of rows in the shuffle stage. That would act as an upper bound for NDV for Comet, while the max NDV in any single partition would act as our lower bound. Comet would have to store these statistics values at plan generation for that stage.
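A sketch of how those bounds could combine into a single hint (a hypothetical function, not Comet's actual code): take the largest per-partition NDV as the lower bound, then clamp it by the shuffle row count (the upper bound) and by the global over-allocation cap.

```rust
// Hypothetical sketch combining Comet-style statistics into one hint:
// `shuffle_rows` is an upper bound on NDV, the max per-partition NDV
// is a lower bound, and `cap` guards against over-allocation.
fn bounded_ndv_hint(shuffle_rows: usize, per_partition_ndv: &[usize], cap: usize) -> usize {
    let lower = per_partition_ndv.iter().copied().max().unwrap_or(0);
    lower.min(shuffle_rows).min(cap)
}

fn main() {
    // Lower bound 400 fits within both the row count and the cap.
    assert_eq!(bounded_ndv_hint(1_000, &[100, 400, 250], 128 * 1024), 400);
    // The cap wins when even the lower bound is very large.
    assert_eq!(bounded_ndv_hint(10_000_000, &[1_000_000], 128 * 1024), 128 * 1024);
    println!("ok");
}
```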
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs branch (detailed results collapsed)
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs branch (detailed results collapsed)

🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs branch (detailed results collapsed)

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs branch (detailed results collapsed)
When distinct_count is absent, estimate NDV from min/max range using Interval::cardinality(), matching the approach already used by join cardinality estimation. This enables NDV-based pre-allocation for Parquet files that have min/max stats but lack explicit distinct_count metadata (e.g. ClickBench). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
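A minimal sketch of that fallback for integer columns (a simplification of what `Interval::cardinality()` computes): a closed integer interval [min, max] contains at most max - min + 1 distinct values, so the min/max statistics alone bound the NDV.

```rust
// Sketch: estimate an NDV upper bound from integer min/max statistics
// when `distinct_count` is absent. Widening to i128 avoids overflow
// for extreme ranges such as [i64::MIN, i64::MAX].
fn ndv_from_min_max(min: i64, max: i64) -> Option<u64> {
    if min > max {
        // Invalid or empty range: no usable estimate.
        return None;
    }
    (max as i128 - min as i128 + 1).try_into().ok()
}

fn main() {
    assert_eq!(ndv_from_min_max(0, 9), Some(10));
    assert_eq!(ndv_from_min_max(5, 5), Some(1));
    assert_eq!(ndv_from_min_max(10, 0), None);
    println!("ok");
}
```

In practice this estimate is only useful for dense ranges (IDs, dates, enums); for sparse ranges it overestimates badly, which is another reason the PR caps the hint at 128K.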
run benchmarks
🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (3c61ea1) to 5c653be (merge-base) using clickbench_partitioned

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (3c61ea1) to 5c653be (merge-base) using tpcds

🤖 Benchmark running (GKE): comparing ndv-preallocate-hashtable (3c61ea1) to 5c653be (merge-base) using tpch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs branch (detailed results collapsed)

🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs branch (detailed results collapsed)


Which issue does this PR close?
Rationale for this change
During hash aggregation, the hash table in `GroupValues` starts with a small or zero initial capacity and grows dynamically as new groups are discovered. Each resize requires rehashing all existing entries, which is expensive for high-cardinality group-by queries.

When column statistics include `distinct_count` (e.g. from Parquet metadata), we can estimate the number of groups upfront and pre-allocate the hash table to avoid repeated rehashing.

What changes are included in this PR?

- In `GroupedHashAggregateStream::new()`, compute the NDV (number of distinct values) estimate from child statistics using `AggregateExec::compute_group_ndv()`, bounded by 128K entries
- Pass the capacity hint through `new_group_values()` to all `GroupValues` implementations:
  - `GroupValuesPrimitive`: pre-sizes the `HashTable` and values `Vec`
  - `GroupValuesColumn`: pre-sizes the `HashTable`
  - `GroupValuesRows`: pre-sizes the `HashTable` and row buffer
  - `GroupValuesBytes`/`GroupValuesBytesView`: pre-sizes the underlying `ArrowBytesMap`/`ArrowBytesViewMap`
- Add `with_capacity()` constructors to `ArrowBytesMap` and `ArrowBytesViewMap`

Are these changes tested?
Covered by existing aggregation tests. The change is transparent: it only affects initial allocation sizes, not correctness.
Are there any user-facing changes?
No user-facing API changes. Aggregation queries may use slightly more initial memory but avoid rehashing overhead, improving performance for queries where NDV statistics are available.
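The rehashing overhead this PR avoids can be observed with a plain `std::collections::HashMap` (DataFusion's internal hash table differs, but the growth behavior is analogous): a table grown from empty rehashes repeatedly as entries arrive, while one built with `with_capacity` for the expected group count never does.

```rust
use std::collections::HashMap;

// Count how many times the map's capacity changes (i.e. how many
// resize-and-rehash events occur) while inserting `n` distinct keys.
fn count_resizes(prealloc: Option<usize>, n: usize) -> usize {
    let mut map: HashMap<usize, usize> = match prealloc {
        Some(cap) => HashMap::with_capacity(cap),
        None => HashMap::new(),
    };
    let mut resizes = 0;
    let mut last_cap = map.capacity();
    for i in 0..n {
        map.insert(i, i);
        if map.capacity() != last_cap {
            resizes += 1;
            last_cap = map.capacity();
        }
    }
    resizes
}

fn main() {
    let n = 100_000;
    let grown = count_resizes(None, n);
    let presized = count_resizes(Some(n), n);
    // with_capacity(n) guarantees n insertions without reallocating,
    // so the pre-sized table never rehashes; the grown one does repeatedly.
    assert_eq!(presized, 0);
    assert!(grown > 0);
    println!("resizes: grown={grown}, presized={presized}");
}
```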