Phase 4: Parallel hash aggregation - 9-15x faster than DuckDB#160
Conversation
…ressHashAgg - Add hash_int64() - fast FNV-1a hash for int64 keys without buffer construction - Add insert_batch_int64() and insert_batch_bytes() for batch insertion - Fix find_or_insert_int64() to use key_hash in collision check (was missing) - These support Phase 1 batch encoding optimization
📝 WalkthroughWalkthroughVectorized GROUP BY refactored to dual aggregation modes: DirectIndexAgg for low-cardinality integer keys and OpenAddressHashAgg for general cases. Operator accepts an optional ThreadPool for parallel aggregation and emits grouped output incrementally per input batch and at exhaustion. ChangesVectorized GROUP BY Operator Refactoring
Possibly related PRs
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR introduces a faster hashing path for INT64 group keys and adds batch-insert helpers to OpenAddressHashAgg, aiming to reduce per-row overhead in vectorized hash aggregation.
Changes:
- Added
OpenAddressHashAgg::hash_int64()intended to hash[0x02][int64]without building a temporary buffer. - Tightened
find_or_insert_int64()equality to also requirebucket.key_hash == hash. - Added
insert_batch_int64()andinsert_batch_bytes()to insert many keys with precomputed hashes.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Fast path for int64 keys: hash of [0x02][8-byte-int64] without buffer construction | ||
| static uint64_t hash_int64(int64_t key) { | ||
| uint64_t h = 14695981039346656037ull; | ||
| h ^= 0x02; // type tag for INT64 | ||
| h *= 1099511628211ull; | ||
| // XOR each byte of key in big-endian order | ||
| uint64_t v = static_cast<uint64_t>(key); | ||
| for (int i = 7; i >= 0; --i) { | ||
| h ^= (v >> (i * 8)) & 0xFF; | ||
| h *= 1099511628211ull; | ||
| } | ||
| return h; |
| /** | ||
| * @brief Insert a batch of string keys with precomputed hashes. | ||
| * @param keys Array of key byte arrays | ||
| * @param key_lens Array of key lengths | ||
| * @param hashes Array of precomputed hashes |
- Add batch encoding scratch buffers (batch_int64_keys_, batch_hashes_, etc.) - Add all_int64_keys_ detection for fast int64-only path - process_input_batch_open_addressing now: 1. Batch encodes all keys in the batch 2. Batch computes all FNV-1a hashes using hash_int64() for int64 keys 3. Row-by-row lookup using precomputed hashes, then accumulator updates Expected impact: 35-45% reduction in GROUP BY key encoding time
- Modify OpenAddressHashAgg::init() to pre-allocate capacity = next power of 2 above (capacity_hint / kLoadFactor) - With 0.5 load factor, this ensures grow() is never called for typical workloads - Eliminates O(n) bucket copy and re-hash during grow() Expected impact: 15-20% reduction by avoiding grow() overhead
- Add thread_pool_ and num_threads_ members to VectorizedGroupByOperator - Add process_thread_batch() for per-thread hash table updates - Partition rows by hash % num_threads_ for load balancing - Each thread builds local OpenAddressHashAgg, merged at output - Pass ThreadPool from query_executor to VectorizedGroupByOperator Benchmark shows 9-15x speedup over DuckDB on Q6 (from 1000x slower)
Critical bug fix: parallel results were being silently discarded. - Add merge_from() method to OpenAddressHashAgg to merge accumulators - Add merge step after thread_pool_->wait() to merge thread results - Fix capacity floor (8192 min) for high thread counts - Remove unused insert_batch_int64/insert_batch_bytes dead code Tests: all 50 cloudSQL + 38 vectorized_operator tests pass Benchmark: Q6 still 10-15x faster than DuckDB
- Extract shared update_bucket_accumulators() helper method - Replace duplicated accumulator blocks in sequential and thread paths - Remove unused futures vector (fire-and-forget pattern) - Add key_type documentation comment (0x01=NULL, 0x02=INT64, etc.) - Remove redundant key copy in merge_from (find_or_insert already sets key fields) - Fix const correctness with template parameter for update_bucket_accumulators Tests: all 50 cloudSQL + 38 vectorized_operator tests pass
- Add ParallelAggregationCorrectness test with 4-thread ThreadPool - Add one-time warning when string key exceeds MAX_KEY_LEN - Add const-correctness note for update_bucket_accumulators - Clarify batch_key_buffer_ is heap-allocated scratch space Tests: all 50 cloudSQL + 39 vectorized_operator tests pass
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 988-993: The code in produce_output_batch_open_addressing
incorrectly indexes hash_group_keys_ with bucket indices from valid_slots(),
causing wrong/out-of-bounds access; fix by co-locating the group key with its
bucket (add a key storage to HashBucket, e.g., a std::vector<common::Value>
key_vals member) and populate that when a new bucket is created (where
hash_group_keys_.push_back(...) currently happens), then change
produce_output_batch_open_addressing to read the key from the bucket (use the
HashBucket instance retrieved by idx) instead of indexing hash_group_keys_;
alternatively, if you prefer mapping, add a bucket_to_key_idx_ mapping updated
on bucket creation and use that to translate idx into the sequential index into
hash_group_keys_. Ensure any other code that reads hash_group_keys_ is updated
to use the new bucket key storage or the mapping.
- Around line 880-894: MIN/MAX currently always uses to_int64() in the MIN/MAX
branch inside the aggregation loop (the block checking agg.type ==
AggregateType::Min/Max and agg.input_col_idx), which truncates float64 values;
update that branch to check the column type (e.g., column.type() or equivalent)
and if TYPE_FLOAT64 use col.get(row_idx).to_double() and update new HashBucket
fields mins_float64, maxes_float64 and has_float_minmax (set/compare using
std::min/std::max for doubles), otherwise continue using the existing int64
path; add those three members to the HashBucket struct (mins_float64,
maxes_float64, has_float_minmax) and ensure any other code that reads/writes
bucket min/max values accounts for the new float variants.
- Around line 943-979: In produce_output_batch_direct_index the COUNT(*) groups
are re-emitted because slot.valid remains true; add and use an "emitted" flag on
the aggregation slot (e.g., slot.emitted or slot.output_emitted) and update the
emission condition to require !slot.emitted in addition to the existing checks
(so the if becomes: counts[0] > 0 || (slot.valid && aggregates_[0].type ==
AggregateType::Count && aggregates_[0].input_col_idx < 0 && !slot.emitted)).
After writing the row, set slot.emitted = true (or clear slot.valid if you
prefer that semantic) so COUNT(*) groups are not emitted again; update any slot
initialization/reset logic to initialize emitted = false.
- Around line 496-509: Both get_slot and track_key compute indices differently
and cast negative keys to size_t (causing wraparound); unify them to use a
signed offset from min_key_ and validate bounds: in both get_slot(int64_t key)
and track_key(int64_t key) compute size_t idx =
static_cast<size_t>(static_cast<int64_t>(key) - min_key_); ensure min_key_ is
set to the correct lower bound for int8 keys (e.g., -128) or otherwise
initialized, add a range check (key < min_key_ || key > max_key_) to avoid
out-of-bounds access on slots_ and only then access slots_[idx] and push idx
into valid_indices_.
- Around line 796-803: After merging per-thread state into the main aggregator
(hash_agg_.merge_from(thread_hash_aggs_[t])) and appending keys from
thread_group_keys_[t] into hash_group_keys_, reset the per-thread state to avoid
double-counting on subsequent batches: clear or reinitialize
thread_hash_aggs_[t] (e.g., call its clear/reset method or replace with a fresh
aggregator) and clear thread_group_keys_[t] (e.g., .clear() or swap with an
empty vector) inside the same loop that merges (the loop referencing
num_threads_, thread_hash_aggs_, thread_group_keys_, hash_agg_, and
hash_group_keys_).
In `@tests/vectorized_operator_tests.cpp`:
- Around line 1589-1602: The test assumes group rows are emitted in a fixed
order; change the verification to be order-independent by iterating all rows in
*result, reading the category key (from the key column used by the GROUP BY,
e.g. result->get_column(0).get(r).as_int64()), and building a map from category
-> (count,value sum) using result->get_column(1) for count and
result->get_column(2) for sum; then assert the map has 10 entries and for each
category k verify map[k].count == 10 and map[k].sum ==
10*static_cast<int64_t>(k) + 460. This replaces the index-based EXPECT_EQ checks
after groupby.next_batch(*result).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: eb84a38b-91d7-404f-bfd7-d36dd16bb49b
📒 Files selected for processing (3)
include/executor/vectorized_operator.hppsrc/executor/query_executor.cpptests/vectorized_operator_tests.cpp
- Added VectorizedProjectOperator, VectorizedAggregateOperator, VectorizedHashJoinOperator - Fixed produce_output_batch_open_addressing to output all groups and decode keys from bucket - Fixed grow() to properly copy accumulators and rebuild valid_indices_ - Fixed merge_from to rebuild valid_indices_ after merge - Fixed find_or_insert and find_or_insert_int64 to initialize accumulators - Added output_bucket_indices_ to track which buckets have been output - Fixed set_row_count to use rows_output instead of hardcoded 1
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
include/executor/vectorized_operator.hpp (1)
1084-1098:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMIN/MAX truncates float64 values to int64, producing incorrect results.
Despite being marked as addressed in a previous commit,
update_bucket_accumulatorsstill usesto_int64()unconditionally for MIN/MAX (line 1088). ForTYPE_FLOAT64columns, this truncates values (e.g., 1.7 → 1), causing incorrect aggregate results.Compare with the SUM/AVG handling (lines 1075-1082) which correctly branches on column type.
🐛 Proposed fix: Handle float64 type for MIN/MAX
} else if ((agg.type == AggregateType::Min || agg.type == AggregateType::Max) && agg.input_col_idx >= 0) { const auto& col = batch.get_column(agg.input_col_idx); if (!col.is_null(row_idx)) { - auto val = col.get(row_idx).to_int64(); - if (!bucket.has_mins[i]) { - bucket.mins[i] = val; - bucket.maxes[i] = val; - bucket.has_mins[i] = true; - } else { - bucket.mins[i] = std::min(bucket.mins[i], val); - bucket.maxes[i] = std::max(bucket.maxes[i], val); + if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast<const NumericVector<double>&>(col); + double val = num_col.raw_data()[row_idx]; + if (!bucket.has_mins[i]) { + bucket.mins_float64[i] = val; + bucket.maxes_float64[i] = val; + bucket.has_mins[i] = true; + bucket.has_float_value[i] = true; // Reuse flag to indicate float min/max + } else { + bucket.mins_float64[i] = std::min(bucket.mins_float64[i], val); + bucket.maxes_float64[i] = std::max(bucket.maxes_float64[i], val); + } + } else { + auto val = col.get(row_idx).to_int64(); + if (!bucket.has_mins[i]) { + bucket.mins[i] = val; + bucket.maxes[i] = val; + bucket.has_mins[i] = true; + } else { + bucket.mins[i] = std::min(bucket.mins[i], val); + bucket.maxes[i] = std::max(bucket.maxes[i], val); + } } } }Note: This requires adding
mins_float64[MAX_AGGREGATES]andmaxes_float64[MAX_AGGREGATES]arrays toHashBucket.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1084 - 1098, The MIN/MAX branch in update_bucket_accumulators currently calls col.get(...).to_int64() unconditionally, truncating TYPE_FLOAT64 values; modify update_bucket_accumulators to branch on the column type for AggregateType::Min and AggregateType::Max (similar to SUM/AVG): if column type is TYPE_FLOAT64 use col.get(...).to_double() and update new mins_float64[i]/maxes_float64[i] in HashBucket, otherwise use to_int64() and the existing mins[i]/maxes[i]; also add mins_float64[MAX_AGGREGATES] and maxes_float64[MAX_AGGREGATES] to the HashBucket structure and initialize/compare them appropriately when bucket.has_mins[i] is set/checked.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1620-1639: emit_unmatched_left_rows currently iterates
unmatched_left_indices_ with a range-for and returns when the output batch
fills, causing re-emission on resume; add a member variable
unmatched_left_emit_idx_ to track the current index into
unmatched_left_indices_, update the loop in emit_unmatched_left_rows to start
from unmatched_left_emit_idx_ and advance it as rows are emitted, clear/reset
unmatched_left_emit_idx_ when the vector is exhausted (and preserve it when
returning true); apply analogous tracking for emit_unmatched_right_rows using
unmatched_right_emit_idx_ so both functions resume correctly without duplicating
rows.
- Around line 584-589: The loop rebuilding valid_indices_ by iterating buckets_
and pushing occupied indices is redundant and causes duplicate entries; remove
that loop so valid_indices_ is only populated during bucket allocations in
find_or_insert and find_or_insert_int64 (the rehash/allocation code paths),
which will eliminate duplicates and remove the need for the O(n²) dedup
workaround in produce_output_batch_open_addressing.
- Around line 1199-1222: The loop always reads the type tag from
bucket.key_data[0] causing wrong tags for columns after the first; change the
decoding to read the tag at the current position (use
bucket.key_data[key_offset]) and advance key_offset by 1 immediately after
reading the tag, then parse the value using the updated key_offset and further
advance by the value size (e.g., +8 for INT64, +4 + str_len for STRING); update
any length checks to use the correct offsets and adjust initial key_offset (set
to 0 instead of 1) so group_by_ decoding in the loop (the block referencing
bucket.key_data, key_offset, bucket.key_len, and out_batch.get_column) uses the
per-column tag at the current position.
---
Outside diff comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1084-1098: The MIN/MAX branch in update_bucket_accumulators
currently calls col.get(...).to_int64() unconditionally, truncating TYPE_FLOAT64
values; modify update_bucket_accumulators to branch on the column type for
AggregateType::Min and AggregateType::Max (similar to SUM/AVG): if column type
is TYPE_FLOAT64 use col.get(...).to_double() and update new
mins_float64[i]/maxes_float64[i] in HashBucket, otherwise use to_int64() and the
existing mins[i]/maxes[i]; also add mins_float64[MAX_AGGREGATES] and
maxes_float64[MAX_AGGREGATES] to the HashBucket structure and initialize/compare
them appropriately when bucket.has_mins[i] is set/checked.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a85bba38-ba24-4f97-93fb-0a89ca43e37f
📒 Files selected for processing (1)
include/executor/vectorized_operator.hpp
There was a problem hiding this comment.
Caution
Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
include/executor/vectorized_operator.hpp (1)
1084-1098:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMIN/MAX truncates float64 values to int64, producing incorrect results.
Despite being marked as addressed in a previous commit,
update_bucket_accumulatorsstill usesto_int64()unconditionally for MIN/MAX (line 1088). ForTYPE_FLOAT64columns, this truncates values (e.g., 1.7 → 1), causing incorrect aggregate results.Compare with the SUM/AVG handling (lines 1075-1082) which correctly branches on column type.
🐛 Proposed fix: Handle float64 type for MIN/MAX
} else if ((agg.type == AggregateType::Min || agg.type == AggregateType::Max) && agg.input_col_idx >= 0) { const auto& col = batch.get_column(agg.input_col_idx); if (!col.is_null(row_idx)) { - auto val = col.get(row_idx).to_int64(); - if (!bucket.has_mins[i]) { - bucket.mins[i] = val; - bucket.maxes[i] = val; - bucket.has_mins[i] = true; - } else { - bucket.mins[i] = std::min(bucket.mins[i], val); - bucket.maxes[i] = std::max(bucket.maxes[i], val); + if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast<const NumericVector<double>&>(col); + double val = num_col.raw_data()[row_idx]; + if (!bucket.has_mins[i]) { + bucket.mins_float64[i] = val; + bucket.maxes_float64[i] = val; + bucket.has_mins[i] = true; + bucket.has_float_value[i] = true; // Reuse flag to indicate float min/max + } else { + bucket.mins_float64[i] = std::min(bucket.mins_float64[i], val); + bucket.maxes_float64[i] = std::max(bucket.maxes_float64[i], val); + } + } else { + auto val = col.get(row_idx).to_int64(); + if (!bucket.has_mins[i]) { + bucket.mins[i] = val; + bucket.maxes[i] = val; + bucket.has_mins[i] = true; + } else { + bucket.mins[i] = std::min(bucket.mins[i], val); + bucket.maxes[i] = std::max(bucket.maxes[i], val); + } } } }Note: This requires adding
mins_float64[MAX_AGGREGATES]andmaxes_float64[MAX_AGGREGATES]arrays toHashBucket.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1084 - 1098, The MIN/MAX branch in update_bucket_accumulators currently calls col.get(...).to_int64() unconditionally, truncating TYPE_FLOAT64 values; modify update_bucket_accumulators to branch on the column type for AggregateType::Min and AggregateType::Max (similar to SUM/AVG): if column type is TYPE_FLOAT64 use col.get(...).to_double() and update new mins_float64[i]/maxes_float64[i] in HashBucket, otherwise use to_int64() and the existing mins[i]/maxes[i]; also add mins_float64[MAX_AGGREGATES] and maxes_float64[MAX_AGGREGATES] to the HashBucket structure and initialize/compare them appropriately when bucket.has_mins[i] is set/checked.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1620-1639: emit_unmatched_left_rows currently iterates
unmatched_left_indices_ with a range-for and returns when the output batch
fills, causing re-emission on resume; add a member variable
unmatched_left_emit_idx_ to track the current index into
unmatched_left_indices_, update the loop in emit_unmatched_left_rows to start
from unmatched_left_emit_idx_ and advance it as rows are emitted, clear/reset
unmatched_left_emit_idx_ when the vector is exhausted (and preserve it when
returning true); apply analogous tracking for emit_unmatched_right_rows using
unmatched_right_emit_idx_ so both functions resume correctly without duplicating
rows.
- Around line 584-589: The loop rebuilding valid_indices_ by iterating buckets_
and pushing occupied indices is redundant and causes duplicate entries; remove
that loop so valid_indices_ is only populated during bucket allocations in
find_or_insert and find_or_insert_int64 (the rehash/allocation code paths),
which will eliminate duplicates and remove the need for the O(n²) dedup
workaround in produce_output_batch_open_addressing.
- Around line 1199-1222: The loop always reads the type tag from
bucket.key_data[0] causing wrong tags for columns after the first; change the
decoding to read the tag at the current position (use
bucket.key_data[key_offset]) and advance key_offset by 1 immediately after
reading the tag, then parse the value using the updated key_offset and further
advance by the value size (e.g., +8 for INT64, +4 + str_len for STRING); update
any length checks to use the correct offsets and adjust initial key_offset (set
to 0 instead of 1) so group_by_ decoding in the loop (the block referencing
bucket.key_data, key_offset, bucket.key_len, and out_batch.get_column) uses the
per-column tag at the current position.
---
Outside diff comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1084-1098: The MIN/MAX branch in update_bucket_accumulators
currently calls col.get(...).to_int64() unconditionally, truncating TYPE_FLOAT64
values; modify update_bucket_accumulators to branch on the column type for
AggregateType::Min and AggregateType::Max (similar to SUM/AVG): if column type
is TYPE_FLOAT64 use col.get(...).to_double() and update new
mins_float64[i]/maxes_float64[i] in HashBucket, otherwise use to_int64() and the
existing mins[i]/maxes[i]; also add mins_float64[MAX_AGGREGATES] and
maxes_float64[MAX_AGGREGATES] to the HashBucket structure and initialize/compare
them appropriately when bucket.has_mins[i] is set/checked.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a85bba38-ba24-4f97-93fb-0a89ca43e37f
📒 Files selected for processing (1)
include/executor/vectorized_operator.hpp
🛑 Comments failed to post (3)
include/executor/vectorized_operator.hpp (3)
584-589:
⚠️ Potential issue | 🟠 Major | ⚡ Quick winRemove redundant
valid_indices_rebuild that creates duplicates.
find_or_insertandfind_or_insert_int64already push tovalid_indices_when allocating new buckets during the rehash loop (lines 501, 545). This second loop adds duplicate entries, causingvalid_indices_to contain each bucket index twice.The duplication forced an O(n²) dedup workaround in
produce_output_batch_open_addressing(lines 1189-1196). Removing this loop fixes the root cause.🐛 Proposed fix
} } - // Rebuild valid_indices_ to include ALL occupied buckets (not just new ones) - for (size_t i = 0; i < buckets_.size(); ++i) { - if (buckets_[i].occupied) { - valid_indices_.push_back(i); - } - } }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.} } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 584 - 589, The loop rebuilding valid_indices_ by iterating buckets_ and pushing occupied indices is redundant and causes duplicate entries; remove that loop so valid_indices_ is only populated during bucket allocations in find_or_insert and find_or_insert_int64 (the rehash/allocation code paths), which will eliminate duplicates and remove the need for the O(n²) dedup workaround in produce_output_batch_open_addressing.
1199-1222:
⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMulti-column GROUP BY key decoding reads wrong type tag for columns after the first.
The key decoding loop always reads the type tag from
bucket.key_data[0](line 1202) instead of the current position. For multi-column GROUP BY keys encoded as[tag0][val0][tag1][val1]..., this causes columns after the first to be decoded with the wrong type tag, producing incorrect output.🐛 Proposed fix: Read type tag from current position
- size_t key_offset = 1; // Skip type tag + size_t key_offset = 0; for (size_t c = 0; c < group_by_.size(); ++c) { - uint8_t type_tag = bucket.key_data[0]; + uint8_t type_tag = bucket.key_data[key_offset]; + key_offset++; // Move past type tag if (type_tag == 0x01) { // NULL out_batch.get_column(c).append(common::Value::make_null()); - key_offset = 1; - } else if (type_tag == 0x02 && bucket.key_len >= key_offset + 8) { // INT64 + } else if (type_tag == 0x02) { // INT64 int64_t val; std::memcpy(&val, &bucket.key_data[key_offset], 8); out_batch.get_column(c).append(common::Value::make_int64(val)); - key_offset += 9; // 1 byte tag + 8 bytes + key_offset += 8; } else if (type_tag == 0x04) { // STRING uint32_t str_len; std::memcpy(&str_len, &bucket.key_data[key_offset], 4);🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1199 - 1222, The loop always reads the type tag from bucket.key_data[0] causing wrong tags for columns after the first; change the decoding to read the tag at the current position (use bucket.key_data[key_offset]) and advance key_offset by 1 immediately after reading the tag, then parse the value using the updated key_offset and further advance by the value size (e.g., +8 for INT64, +4 + str_len for STRING); update any length checks to use the correct offsets and adjust initial key_offset (set to 0 instead of 1) so group_by_ decoding in the loop (the block referencing bucket.key_data, key_offset, bucket.key_len, and out_batch.get_column) uses the per-column tag at the current position.
1620-1639:
⚠️ Potential issue | 🟠 Major | ⚡ Quick winBoth unmatched row emission methods lack resumption tracking.
emit_unmatched_left_rowsandemit_unmatched_right_rowsshare the same root cause: they use range-for iteration without tracking the emission position. When the output batch fills mid-loop, the function returnstrue, but the next call restarts from the beginning of the indices vector, re-emitting already-output rows.Both methods need position tracking (e.g., member variables
unmatched_left_emit_idx_andunmatched_right_emit_idx_) to resume correctly.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1620 - 1639, emit_unmatched_left_rows currently iterates unmatched_left_indices_ with a range-for and returns when the output batch fills, causing re-emission on resume; add a member variable unmatched_left_emit_idx_ to track the current index into unmatched_left_indices_, update the loop in emit_unmatched_left_rows to start from unmatched_left_emit_idx_ and advance it as rows are emitted, clear/reset unmatched_left_emit_idx_ when the vector is exhausted (and preserve it when returning true); apply analogous tracking for emit_unmatched_right_rows using unmatched_right_emit_idx_ so both functions resume correctly without duplicating rows.
- VectorizedFilterOperator: fix row_count not being set after appending rows, causing FilterAllMatch and PipelinedBatches tests to fail - BinaryExpr::evaluate_vectorized: add Ge (>=) operator support for INT64 columns with constant, fixing val >= 0 filter conditions - VectorizedGroupByTests.ParallelAggregationCorrectness: use std::map for order-independent verification since TEXT key output order is non-deterministic with hash aggregation
…scope - Add limitation note to DirectIndexAgg class: only supports INT8 range - Add note in track_key() about key truncation for out-of-range values - Clarify parallel aggregation comment: only applies to OpenAddressHashAgg, not DirectIndexAgg path - Ensure file ends with newline
- DUCKDB_COMPARISON.md: Update Q6 benchmark results (9.7x and 15.4x speedup) - VECTORIZED_EXECUTION.md: Update GROUP BY performance to 7.3G rows/s - PHASE_8_ANALYTICS.md: Document OpenAddressHashAgg, DirectIndexAgg, and parallel aggregation
There was a problem hiding this comment.
♻️ Duplicate comments (4)
include/executor/vectorized_operator.hpp (4)
1148-1184:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winCritical: COUNT(*) groups re-emitted infinitely (duplicate).
This issue was flagged in a previous review but remains unresolved. After outputting a group,
counts[0]is set to 0 (line 1179) butslot.validremains true. On the next call, for COUNT(*) as the first aggregate, the condition at lines 1152-1153 evaluates to true again becauseslot.valid && aggregates_[0].type == AggregateType::Count && input_col_idx < 0is true, causing the group to be re-emitted with count=0 on every call.Add an
emittedflag toGroupSlotand check!slot.emittedin the emission condition, then setslot.emitted = trueafter outputting the row.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1148 - 1184, The produce_output_batch_direct_index function re-emits COUNT(*) groups because it only zeroes slot.counts[0] but leaves slot.valid true and the existing condition (slot.counts[0] > 0 || (slot.valid && aggregates_[0].type == AggregateType::Count && aggregates_[0].input_col_idx < 0)) still matches; add a boolean emitted field to GroupSlot, include !slot.emitted in that emission condition, and after writing the output set slot.emitted = true (and keep the existing slot.counts[0] = 0) so groups are not emitted repeatedly; update GroupSlot initialization/reset logic wherever slots are created to clear emitted.
1010-1016:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winCritical: Thread-local state not cleared after merge causes double-counting (duplicate).
This issue was flagged in a previous review but remains unresolved. After merging
thread_hash_aggs_[t]andthread_group_keys_[t]into the main aggregator, the thread-local state is not cleared. On subsequent input batches, new rows are accumulated into hash tables that still contain previous batch data. When merged again, the old data is re-added viadst.counts[i] += src.counts[i](line 619), producing incorrect aggregate results.After the merge loop, add:
// Clear thread-local state for next batch thread_hash_aggs_[t].init(std::max(size_t(8192), 65536 / num_threads_), aggregates_.size()); thread_group_keys_[t].clear();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1010 - 1016, After merging each thread's partial results into the main aggregator (when iterating over thread_hash_aggs_ and thread_group_keys_ and appending into hash_group_keys_), reset that thread-local state so it doesn't carry into the next batch: reinitialize thread_hash_aggs_[t] with a suitable capacity (e.g., init(max(8192, 65536 / num_threads_), aggregates_.size())) and clear thread_group_keys_[t] immediately after the merge; place this directly after the existing hash_agg_.merge_from(thread_hash_aggs_[t]) and hash_group_keys_.insert(...) calls to avoid double-counting on subsequent merges.
899-913:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMajor: MIN/MAX aggregates truncate float64 values (duplicate, fix not applied).
A previous review flagged this issue and marked it as addressed, but the fix was not applied. Both
process_input_batch_direct_index(lines 899-913) andupdate_bucket_accumulators(lines 1093-1107) callto_int64()for MIN/MAX aggregates, truncating float64 values (e.g., 1.7 → 1) and producing incorrect results.The fix requires:
- Add
mins_float64[],maxes_float64[], andhas_float_minmax[]to bothHashBucketandGroupSlot- Branch on
col.type()in the MIN/MAX handling code- Use
to_float64()and update the float64 accumulators whenTYPE_FLOAT64, otherwise use the existing int64 pathAlso applies to: 1093-1107
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 899 - 913, The MIN/MAX handling currently truncates floats by calling to_int64(); add float-specific accumulators mins_float64[], maxes_float64[] and has_float_minmax[] to both HashBucket and GroupSlot, then in process_input_batch_direct_index and update_bucket_accumulators branch on the column type (inspect col.type() / TYPE_FLOAT64) and for floats use to_float64() to update mins_float64/maxes_float64 and has_float_minmax, otherwise keep the existing to_int64() logic that updates mins/maxes and has_mins; ensure both functions maintain and compare the correct typed accumulator when initializing and updating per-group values.
709-711:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winCritical: Negative keys cause wraparound and out-of-bounds access (duplicate).
This issue was flagged in a previous review but remains unresolved. Lines 710 and 719 both cast negative int64 keys to
size_t, causing wraparound toSIZE_MAXand out-of-bounds access toslots_[SIZE_MAX].For int8 range [-128, 127] to map correctly to slot indices [0, 255], both methods need:
size_t idx = static_cast<size_t>(static_cast<uint8_t>(static_cast<int8_t>(key)));Also applies to: 717-719
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 709 - 711, The code casts negative int64_t keys to size_t which wraps and indexes out-of-bounds; update the index computation in GroupSlot& get_slot(int64_t key) and the const overload (the duplicate at the other location) to normalize the key through int8/uint8 before sizing: compute idx by first converting key to int8_t, then to uint8_t, and finally to size_t (i.e., replace static_cast<size_t>(key - min_key_) style casts with the three-step cast sequence) so keys in the int8 range map to [0,255] without wraparound.
🧹 Nitpick comments (1)
include/executor/vectorized_operator.hpp (1)
807-815: 💤 Low valueMinor: Thread-local hash tables allocated even when not used.
When
is_direct_indexable_is true (DirectIndexAgg path), the code still allocatesthread_hash_aggs_andthread_group_keys_(lines 809-814), but these are never used because the DirectIndexAgg path (line 851) doesn't support parallel processing. Consider wrapping this allocation inif (!is_direct_indexable_)to avoid wasting memory.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 807 - 815, The code always allocates per-thread structures (thread_hash_aggs_ and thread_group_keys_) when thread_pool_ has multiple threads, even though the DirectIndexAgg path (guarded by is_direct_indexable_) does not use them; update the allocation block inside the thread_pool_/num_threads_ check to only resize/init thread_hash_aggs_ and thread_group_keys_ when !is_direct_indexable_. Specifically, in the section referencing thread_pool_, num_threads_ and the loop that calls thread_hash_aggs_[t].init(...), add a guard that checks is_direct_indexable_ and skip the resize/initialization when true so memory is not wasted for DirectIndexAgg.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1148-1184: The produce_output_batch_direct_index function re-emits
COUNT(*) groups because it only zeroes slot.counts[0] but leaves slot.valid true
and the existing condition (slot.counts[0] > 0 || (slot.valid &&
aggregates_[0].type == AggregateType::Count && aggregates_[0].input_col_idx <
0)) still matches; add a boolean emitted field to GroupSlot, include
!slot.emitted in that emission condition, and after writing the output set
slot.emitted = true (and keep the existing slot.counts[0] = 0) so groups are not
emitted repeatedly; update GroupSlot initialization/reset logic wherever slots
are created to clear emitted.
- Around line 1010-1016: After merging each thread's partial results into the
main aggregator (when iterating over thread_hash_aggs_ and thread_group_keys_
and appending into hash_group_keys_), reset that thread-local state so it
doesn't carry into the next batch: reinitialize thread_hash_aggs_[t] with a
suitable capacity (e.g., init(max(8192, 65536 / num_threads_),
aggregates_.size())) and clear thread_group_keys_[t] immediately after the
merge; place this directly after the existing
hash_agg_.merge_from(thread_hash_aggs_[t]) and hash_group_keys_.insert(...)
calls to avoid double-counting on subsequent merges.
- Around line 899-913: The MIN/MAX handling currently truncates floats by
calling to_int64(); add float-specific accumulators mins_float64[],
maxes_float64[] and has_float_minmax[] to both HashBucket and GroupSlot, then in
process_input_batch_direct_index and update_bucket_accumulators branch on the
column type (inspect col.type() / TYPE_FLOAT64) and for floats use to_float64()
to update mins_float64/maxes_float64 and has_float_minmax, otherwise keep the
existing to_int64() logic that updates mins/maxes and has_mins; ensure both
functions maintain and compare the correct typed accumulator when initializing
and updating per-group values.
- Around line 709-711: The code casts negative int64_t keys to size_t which
wraps and indexes out-of-bounds; update the index computation in GroupSlot&
get_slot(int64_t key) and the const overload (the duplicate at the other
location) to normalize the key through int8/uint8 before sizing: compute idx by
first converting key to int8_t, then to uint8_t, and finally to size_t (i.e.,
replace static_cast<size_t>(key - min_key_) style casts with the three-step cast
sequence) so keys in the int8 range map to [0,255] without wraparound.
---
Nitpick comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 807-815: The code always allocates per-thread structures
(thread_hash_aggs_ and thread_group_keys_) when thread_pool_ has multiple
threads, even though the DirectIndexAgg path (guarded by is_direct_indexable_)
does not use them; update the allocation block inside the
thread_pool_/num_threads_ check to only resize/init thread_hash_aggs_ and
thread_group_keys_ when !is_direct_indexable_. Specifically, in the section
referencing thread_pool_, num_threads_ and the loop that calls
thread_hash_aggs_[t].init(...), add a guard that checks is_direct_indexable_ and
skip the resize/initialization when true so memory is not wasted for
DirectIndexAgg.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d225ae54-3947-4682-afa0-177ec58815a7
📒 Files selected for processing (6)
docs/VECTORIZED_EXECUTION.mddocs/performance/DUCKDB_COMPARISON.mddocs/phases/PHASE_8_ANALYTICS.mdinclude/executor/vectorized_operator.hppsrc/parser/expression.cpptests/vectorized_operator_tests.cpp
✅ Files skipped from review due to trivial changes (2)
- docs/VECTORIZED_EXECUTION.md
- docs/phases/PHASE_8_ANALYTICS.md
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/vectorized_operator_tests.cpp
1. COUNT(*) re-emission in DirectIndexAgg: add emitted flag to GroupSlot and check !slot.emitted before output to prevent repeated emission of COUNT(*) groups 2. Thread local state not reset after merge: reinitialize thread_hash_aggs_[t] and clear thread_group_keys_[t] after merge_from() to prevent double-counting on subsequent merges 3. MIN/MAX float truncation: add mins_float64[], maxes_float64[], has_float_minmax[] accumulators to HashBucket and GroupSlot; branch on column type in update_bucket_accumulators to use to_float64() for FLOAT64 columns instead of truncating via to_int64() 4. Negative int64_t key wraparound in get_slot: use 3-step cast (int8_t -> uint8_t -> size_t) instead of direct subtraction to avoid negative key values wrapping to large indices
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
include/executor/vectorized_operator.hpp (3)
1234-1252:⚠️ Potential issue | 🟠 Major | ⚡ Quick winComposite GROUP BY keys are decoded with the wrong type tag after column 0.
Line 1236 always reads
bucket.key_data[0], but the encoder writes a new type tag for each grouped column. For multi-column keys, column 1+ should read the tag at the current offset; otherwise mixed-type or NULL-containing composite keys deserialize incorrectly during output.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1234 - 1252, The GROUP BY composite key decoding always reads the type tag from bucket.key_data[0], causing wrong types after the first column; change the logic in the vectorized output path (around the block that uses group_by_, out_batch, and bucket.key_data) to read the type_tag from the current key_offset (e.g., uint8_t type_tag = bucket.key_data[key_offset]) instead of always index 0, then advance key_offset by 1 before decoding the field payload; ensure for each branch (NULL, INT64, STRING, etc.) you check bounds against key_offset, decode using the current offset, and increment key_offset by the correct number of bytes (1 for tag + payload length) so subsequent columns read their own tags correctly.
562-595:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
grow()appends every occupied bucket tovalid_indices_twice.During rehash,
find_or_insert*()already pushes each reinserted bucket index intovalid_indices_. The rebuild loop at Lines 589-594 appends the same occupied indices again without clearing first.produce_output_batch_open_addressing()papers over this locally, butmerge_from()iteratesother.valid_slots()directly, so a grown per-thread table will merge the same bucket multiple times.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 562 - 595, grow() is duplicating entries in valid_indices_ because find_or_insert()/find_or_insert_int64() already appends reinserts; remove the final rebuild loop that iterates buckets_ and pushes occupied indices (or make it conditional only when valid_indices_ is empty) so valid_indices_ is only populated once during rehash; update grow() to rely on find_or_insert*/their existing push behavior and ensure merge_from()/other.valid_slots() will no longer see duplicate indices.
840-880:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winGroups are emitted before aggregation is complete.
next_batch()callsproduce_output_batch_*()after every child batch. Once a direct slot is marked emitted or an open-address bucket has its counts negated, later rows for that same group can still be aggregated but can never be emitted again. With a 4096-row scan batch size, any group spanning batches returns a partial result. This samecounts[0]sentinel also drops valid groups when the first aggregate isMIN/MAXor a nullableSUM/AVGthat saw only NULLs.Also applies to: 1176-1204, 1211-1284
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 840 - 880, next_batch currently calls produce_output_batch_direct_index/produce_output_batch_open_addressing after each child batch, which emits groups prematurely and uses counts[0] as a sentinel that blocks further aggregation; change next_batch (the loop that calls child_->next_batch and process_input_batch_direct_index/process_input_batch_open_addressing) to only aggregate input (process_input_batch_*) inside the loop and NOT call any produce_output_batch_* until after the input loop completes (i.e., after child is exhausted or before returning on Error/Init), and refactor the emission logic so emitted-state is tracked separately (e.g., an explicit emitted flag per slot/bucket set only when finalizing output at the end) rather than reusing counts[0] as a sentinel, updating produce_output_batch_direct_index and produce_output_batch_open_addressing to rely on that emitted flag and to only emit groups once aggregation is complete.
♻️ Duplicate comments (2)
include/executor/vectorized_operator.hpp (2)
1109-1128:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftFloat
MIN/MAXis still only partially wired through the aggregation lifecycle.This branch populates
mins_float64/maxes_float64, butgrow()andmerge_from()still only copy/merge the integer min/max fields,process_input_batch_direct_index()still truncates viato_int64(), and both output paths still emitmake_int64(...)forMIN/MAX. So float groupedMIN/MAXremains wrong in direct-index mode and after any hash-table grow or parallel merge.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 1109 - 1128, The float MIN/MAX path is only partially implemented: ensure the mins_float64/maxes_float64 and has_float_minmax flag are preserved and merged everywhere. Update grow() to allocate/copy mins_float64, maxes_float64 and has_float_minmax the same way it does mins/maxes/has_mins; update merge_from() to merge float buckets using std::min/std::max and to honor has_float_minmax when combining buckets; change process_input_batch_direct_index() to call to_float64() for columns where col.type() == common::ValueType::TYPE_FLOAT64 (instead of to_int64()) and set the float min/max fields; and change the output paths that currently emit make_int64(...) for MIN/MAX to emit make_float64(...) when the grouped column type is TYPE_FLOAT64 so floats are not truncated.
716-728:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winDirect indexing is still unsafe for negative keys and wider integer domains.
Line 727 still casts
int8_tdirectly tosize_t, sokey = -1indexesslots_[SIZE_MAX]. Separately, this path is enabled for INT16/32/64 keys, but bothtrack_key()andget_slot()narrow throughint8_t, so values outside[-128, 127]alias into the same slot. AGROUP BYon1000and-24will merge those groups.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` around lines 716 - 728, The bug is caused by unsafe narrowing casts that turn negative keys into huge indices and alias wide integers into the same slot; update get_slot and track_key to cast to uint8_t before size_t (use static_cast<size_t>(static_cast<uint8_t>(key))) so int8->size_t sign-extension is avoided, and in track_key add an explicit range check (if key < -128 || key > 127) to bail out or flip is_direct_indexable_ to false (so the code falls back to OpenAddressHashAgg) instead of truncating into slots_. Reference get_slot, track_key, slots_, initialized_, and is_direct_indexable_ when making these changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 424-430: HashBucket::key_data (fixed 64-byte buffer) can be
overflowed because process_input_batch_open_addressing() and find_or_insert()
copy key_len bytes into it; change the implementation so keys are not copied
into a fixed 64-byte stack field: either make HashBucket store a dynamically
allocated buffer or pointer to a separately allocated key blob (e.g., heap
buffer, arena, or std::string/std::vector) sized to key_len, or convert key_data
into a flexible/following-variable-length storage with allocation at insertion
time; update find_or_insert(), process_input_batch_open_addressing(), and any
code that reads/writes HashBucket::key_data to allocate, copy, and free/own the
key bytes safely and validate key_len before any memcpy to prevent out-of-bounds
writes.
---
Outside diff comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1234-1252: The GROUP BY composite key decoding always reads the
type tag from bucket.key_data[0], causing wrong types after the first column;
change the logic in the vectorized output path (around the block that uses
group_by_, out_batch, and bucket.key_data) to read the type_tag from the current
key_offset (e.g., uint8_t type_tag = bucket.key_data[key_offset]) instead of
always index 0, then advance key_offset by 1 before decoding the field payload;
ensure for each branch (NULL, INT64, STRING, etc.) you check bounds against
key_offset, decode using the current offset, and increment key_offset by the
correct number of bytes (1 for tag + payload length) so subsequent columns read
their own tags correctly.
- Around line 562-595: grow() is duplicating entries in valid_indices_ because
find_or_insert()/find_or_insert_int64() already appends reinserts; remove the
final rebuild loop that iterates buckets_ and pushes occupied indices (or make
it conditional only when valid_indices_ is empty) so valid_indices_ is only
populated once during rehash; update grow() to rely on find_or_insert*/their
existing push behavior and ensure merge_from()/other.valid_slots() will no
longer see duplicate indices.
- Around line 840-880: next_batch currently calls
produce_output_batch_direct_index/produce_output_batch_open_addressing after
each child batch, which emits groups prematurely and uses counts[0] as a
sentinel that blocks further aggregation; change next_batch (the loop that calls
child_->next_batch and
process_input_batch_direct_index/process_input_batch_open_addressing) to only
aggregate input (process_input_batch_*) inside the loop and NOT call any
produce_output_batch_* until after the input loop completes (i.e., after child
is exhausted or before returning on Error/Init), and refactor the emission logic
so emitted-state is tracked separately (e.g., an explicit emitted flag per
slot/bucket set only when finalizing output at the end) rather than reusing
counts[0] as a sentinel, updating produce_output_batch_direct_index and
produce_output_batch_open_addressing to rely on that emitted flag and to only
emit groups once aggregation is complete.
---
Duplicate comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 1109-1128: The float MIN/MAX path is only partially implemented:
ensure the mins_float64/maxes_float64 and has_float_minmax flag are preserved
and merged everywhere. Update grow() to allocate/copy mins_float64,
maxes_float64 and has_float_minmax the same way it does mins/maxes/has_mins;
update merge_from() to merge float buckets using std::min/std::max and to honor
has_float_minmax when combining buckets; change
process_input_batch_direct_index() to call to_float64() for columns where
col.type() == common::ValueType::TYPE_FLOAT64 (instead of to_int64()) and set
the float min/max fields; and change the output paths that currently emit
make_int64(...) for MIN/MAX to emit make_float64(...) when the grouped column
type is TYPE_FLOAT64 so floats are not truncated.
- Around line 716-728: The bug is caused by unsafe narrowing casts that turn
negative keys into huge indices and alias wide integers into the same slot;
update get_slot and track_key to cast to uint8_t before size_t (use
static_cast<size_t>(static_cast<uint8_t>(key))) so int8->size_t sign-extension
is avoided, and in track_key add an explicit range check (if key < -128 || key >
127) to bail out or flip is_direct_indexable_ to false (so the code falls back
to OpenAddressHashAgg) instead of truncating into slots_. Reference get_slot,
track_key, slots_, initialized_, and is_direct_indexable_ when making these
changes.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 854dcd64-096a-4034-8e07-beb27d4069e2
📒 Files selected for processing (1)
include/executor/vectorized_operator.hpp
| bool has_mins[MAX_AGGREGATES] = {false}; // Track if initialized | ||
| double mins_float64[MAX_AGGREGATES] = {0.0}; // Float MIN accumulator | ||
| double maxes_float64[MAX_AGGREGATES] = {0.0}; // Float MAX accumulator | ||
| bool has_float_minmax[MAX_AGGREGATES] = {false}; // Track if float MIN/MAX initialized | ||
| uint8_t key_type = 0; // 0x02=INT64, 0x04=STRING | ||
| uint32_t key_len = 0; // For non-int64 keys | ||
| uint8_t key_data[64]; // Stored key bytes for iteration |
There was a problem hiding this comment.
HashBucket::key_data can overflow on valid GROUP BY keys.
process_input_batch_open_addressing() can encode far more than 64 bytes per key, but find_or_insert() later copies key_len bytes into this fixed buffer with memcpy. Long text keys or composite keys will write past HashBucket and corrupt memory. This is an out-of-bounds write on query data, not just truncation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@include/executor/vectorized_operator.hpp` around lines 424 - 430,
HashBucket::key_data (fixed 64-byte buffer) can be overflowed because
process_input_batch_open_addressing() and find_or_insert() copy key_len bytes
into it; change the implementation so keys are not copied into a fixed 64-byte
stack field: either make HashBucket store a dynamically allocated buffer or
pointer to a separately allocated key blob (e.g., heap buffer, arena, or
std::string/std::vector) sized to key_len, or convert key_data into a
flexible/following-variable-length storage with allocation at insertion time;
update find_or_insert(), process_input_batch_open_addressing(), and any code
that reads/writes HashBucket::key_data to allocate, copy, and free/own the key
bytes safely and validate key_len before any memcpy to prevent out-of-bounds
writes.
Summary
Benchmark Results (Q6)
Testing
Summary by CodeRabbit
New Features
Bug Fixes
Performance
Documentation
Tests