From ab814f63a5346b9b58279eb14250e36c4a80d689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Tue, 14 Apr 2026 20:22:19 +0200 Subject: [PATCH] Optimize ArrowBytesViewMap: fuse hash+insert, shrink entries, remove generic Three optimizations to ArrowBytesViewMap used by GROUP BY and COUNT DISTINCT on StringView/BinaryView columns: 1. Fuse hash computation with hash table probe: instead of a two-pass approach (batch create_hashes then per-element probe), compute the hash and fetch non-inline bytes once per element. This keeps the input string data cache-hot for the immediately-following equality comparison, avoiding a redundant pointer chase into the input array's data buffers. 2. Shrink hash table entries from 32 bytes (Entry with u128 view + u64 hash + V payload) to 16 bytes (usize index + u64 hash). Views are looked up via the index into the existing views vec on demand. 3. Remove the V generic parameter entirely. The only two usages were ArrowBytesViewMap<()> (set) and ArrowBytesViewMap (group-by), where the usize payload was always the insertion-order index -- which is already implicit in the views vec position. Simplify the two-callback API (make_payload_fn + observe_payload_fn) to a single observe_fn(usize). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/binary_view_map.rs | 416 ++++++++---------- .../single_group_by/bytes_view.rs | 39 +- 2 files changed, 187 insertions(+), 268 deletions(-) diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index abc3e28f82627..2b6e81fb00bc0 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -18,22 +18,21 @@ //! [`ArrowBytesViewMap`] and [`ArrowBytesViewSet`] for storing maps/sets of values from //! `StringViewArray`/`BinaryViewArray`. 
use crate::binary_map::OutputType; -use arrow::array::NullBufferBuilder; use arrow::array::cast::AsArray; -use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView, make_view}; -use arrow::buffer::{Buffer, ScalarBuffer}; +use arrow::array::{Array, ArrayRef, BinaryViewArray, ByteView}; +use arrow::buffer::{BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType}; +use arrow::util::bit_util::unset_bit; +use datafusion_common::hash_utils::HashValue; use datafusion_common::hash_utils::RandomState; -use datafusion_common::hash_utils::create_hashes; -use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt}; -use std::fmt::Debug; +use datafusion_common::utils::proxy::HashTableAllocExt; use std::mem::size_of; use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that /// the final set as a `GenericBinaryViewArray` with minimal copies. #[derive(Debug)] -pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); +pub struct ArrowBytesViewSet(ArrowBytesViewMap); impl ArrowBytesViewSet { pub fn new(output_type: OutputType) -> Self { @@ -42,10 +41,7 @@ impl ArrowBytesViewSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { - fn make_payload_fn(_value: Option<&[u8]>) {} - fn observe_payload_fn(_payload: ()) {} - self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + self.0.insert_if_new(values, |_idx| {}); } /// Return the contents of this map and replace it with a new empty map with @@ -88,13 +84,9 @@ impl ArrowBytesViewSet { /// values that can produce the set of keys on /// output as `GenericBinaryViewArray` without copies. /// -/// Equivalent to `HashSet` but with better performance if you need -/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. 
For other -/// purposes it is the same as a `HashMap` -/// -/// # Generic Arguments -/// -/// * `V`: payload type +/// Each distinct value is assigned a sequential index (its position in +/// insertion order). This index serves as the implicit payload — callers +/// like `GroupValuesBytesView` use it directly as the group index. /// /// # Description /// @@ -108,23 +100,18 @@ impl ArrowBytesViewSet { /// 2. Retains the insertion order of entries in the final array. The values are /// in the same order as they were inserted. /// -/// Note this structure can be used as a `HashSet` by specifying the value type -/// as `()`, as is done by [`ArrowBytesViewSet`]. -/// /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store /// group values when they are a single string array. /// Max size of the in-progress buffer before flushing to completed buffers const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; -pub struct ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +pub struct ArrowBytesViewMap { /// Should the output be StringView or BinaryView? output_type: OutputType, - /// Underlying hash set for each distinct value - map: hashbrown::hash_table::HashTable>, + /// Underlying hash set for each distinct value. + /// Stores `(index into views, hash)` pairs. 
+    map: hashbrown::hash_table::HashTable<(usize, u64)>,
 
     /// Total size of the map in bytes
     map_size: usize,
@@ -134,26 +121,18 @@ where
     in_progress: Vec<u8>,
     /// Completed buffers containing string data
     completed: Vec<Buffer>,
-    /// Tracks null values (true = null)
-    nulls: NullBufferBuilder,
 
     /// random state used to generate hashes
     random_state: RandomState,
-    /// buffer that stores hash values (reused across batches to save allocations)
-    hashes_buffer: Vec<u64>,
-    /// `(payload, null_index)` for the 'null' value, if any
-    /// NOTE null_index is the logical index in the final array, not the index
-    /// in the buffer
-    null: Option<(V, usize)>,
+    /// Index into `views` for the null entry, if any.
+    /// The null buffer is reconstructed from this in `into_state()`.
+    null: Option<usize>,
 }
 
 /// The size, in number of entries, of the initial hash table
 const INITIAL_MAP_CAPACITY: usize = 512;
 
-impl<V> ArrowBytesViewMap<V>
-where
-    V: Debug + PartialEq + Eq + Clone + Copy + Default,
-{
+impl ArrowBytesViewMap {
     pub fn new(output_type: OutputType) -> Self {
         Self {
             output_type,
@@ -162,9 +141,7 @@ where
             views: Vec::new(),
             in_progress: Vec::new(),
             completed: Vec::new(),
-            nulls: NullBufferBuilder::new(0),
             random_state: RandomState::default(),
-            hashes_buffer: vec![],
             null: None,
         }
     }
@@ -177,58 +154,24 @@ where
         new_self
     }
 
-    /// Inserts each value from `values` into the map, invoking `payload_fn` for
-    /// each value if *not* already present, deferring the allocation of the
-    /// payload until it is needed.
-    ///
-    /// Note that this is different than a normal map that would replace the
-    /// existing entry
-    ///
-    /// # Arguments:
-    ///
-    /// `values`: array whose values are inserted
-    ///
-    /// `make_payload_fn`: invoked for each value that is not already present
-    /// to create the payload, in order of the values in `values`
+    /// Inserts each value from `values` into the map.
     ///
-    /// `observe_payload_fn`: invoked once, for each value in `values`, that was
-    /// already present in the map, with corresponding payload value.
-    ///
-    /// # Returns
-    ///
-    /// The payload value for the entry, either the existing value or
-    /// the newly inserted value
-    ///
-    /// # Safety:
-    ///
-    /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked
-    /// with valid values from `values`, not for the `NULL` value.
-    pub fn insert_if_new<MP, OP>(
-        &mut self,
-        values: &ArrayRef,
-        make_payload_fn: MP,
-        observe_payload_fn: OP,
-    ) where
-        MP: FnMut(Option<&[u8]>) -> V,
-        OP: FnMut(V),
+    /// For each value, `observe_fn` is called with the value's index in
+    /// insertion order (its position in `views`). New values get the next
+    /// sequential index; existing values get their previously assigned index.
+    pub fn insert_if_new<OP>(&mut self, values: &ArrayRef, observe_fn: OP)
+    where
+        OP: FnMut(usize),
     {
         // Sanity check array type
         match self.output_type {
             OutputType::BinaryView => {
                 assert!(matches!(values.data_type(), DataType::BinaryView));
-                self.insert_if_new_inner::<MP, OP, BinaryViewType>(
-                    values,
-                    make_payload_fn,
-                    observe_payload_fn,
-                )
+                self.insert_if_new_inner::<BinaryViewType, _>(values, observe_fn)
             }
             OutputType::Utf8View => {
                 assert!(matches!(values.data_type(), DataType::Utf8View));
-                self.insert_if_new_inner::<MP, OP, StringViewType>(
-                    values,
-                    make_payload_fn,
-                    observe_payload_fn,
-                )
+                self.insert_if_new_inner::<StringViewType, _>(values, observe_fn)
             }
             _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"),
         };
@@ -238,87 +181,122 @@ where
     /// (both StringView and BinaryView)
     ///
     /// Note this is the only function that is generic on [`ByteViewType`], which
-    /// avoids having to template the entire structure, making the code
-    /// simpler and understand and reducing code bloat due to duplication.
+    /// avoids having to template the entire structure, making the code
+    /// simpler to understand and reducing code bloat due to duplication.
     ///
-    /// See comments on `insert_if_new` for more details
-    fn insert_if_new_inner<MP, OP, B>(
-        &mut self,
-        values: &ArrayRef,
-        mut make_payload_fn: MP,
-        mut observe_payload_fn: OP,
-    ) where
-        MP: FnMut(Option<&[u8]>) -> V,
-        OP: FnMut(V),
+    /// Dispatches to a const-generic inner loop specialized on the presence
+    /// of nulls and out-of-line buffers, eliminating per-row branches.
+    fn insert_if_new_inner<B, OP>(&mut self, values: &ArrayRef, observe_fn: OP)
+    where
+        OP: FnMut(usize),
         B: ByteViewType,
     {
-        // step 1: compute hashes
-        let batch_hashes = &mut self.hashes_buffer;
-        batch_hashes.clear();
-        batch_hashes.resize(values.len(), 0);
-        create_hashes([values], &self.random_state, batch_hashes)
-            // hash is supported for all types and create_hashes only
-            // returns errors for unsupported types
-            .unwrap();
-
-        // step 2: insert each value into the set, if not already present
         let values = values.as_byte_view::<B>();
-
-        // Get raw views buffer for direct comparison
         let input_views = values.views();
+        let input_buffers = values.data_buffers();
+        let nulls = values.nulls();
+
+        match (nulls.is_some(), !input_buffers.is_empty()) {
+            (false, false) => self.insert_loop::<false, false, _>(
+                input_views,
+                input_buffers,
+                nulls,
+                observe_fn,
+            ),
+            (false, true) => self.insert_loop::<false, true, _>(
+                input_views,
+                input_buffers,
+                nulls,
+                observe_fn,
+            ),
+            (true, false) => self.insert_loop::<true, false, _>(
+                input_views,
+                input_buffers,
+                nulls,
+                observe_fn,
+            ),
+            (true, true) => self.insert_loop::<true, true, _>(
+                input_views,
+                input_buffers,
+                nulls,
+                observe_fn,
+            ),
+        }
+    }
-
-        // Ensure lengths are equivalent
-        assert_eq!(values.len(), self.hashes_buffer.len());
-
-        for i in 0..values.len() {
-            let view_u128 = input_views[i];
-            let hash = self.hashes_buffer[i];
-
-            // handle null value via validity bitmap check
-            if values.is_null(i) {
-                let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
-                    payload
+    /// Inner loop for [`Self::insert_if_new_inner`], specialized via const generics.
+    ///
+    /// `HAS_NULLS`: whether the input has any null values (skip null checks if false)
+    /// `HAS_BUFFERS`: whether the input has out-of-line buffers (all inline if false)
+    #[inline(never)]
+    fn insert_loop<const HAS_NULLS: bool, const HAS_BUFFERS: bool, OP>(
+        &mut self,
+        input_views: &[u128],
+        input_buffers: &[Buffer],
+        nulls: Option<&NullBuffer>,
+        mut observe_fn: OP,
+    ) where
+        OP: FnMut(usize),
+    {
+        for (i, &view_u128) in input_views.iter().enumerate() {
+            // handle null value — skipped entirely when HAS_NULLS is false
+            if HAS_NULLS && nulls.unwrap().is_null(i) {
+                let idx = if let Some(null_index) = self.null {
+                    null_index
                 } else {
-                    let payload = make_payload_fn(None);
                     let null_index = self.views.len();
                     self.views.push(0);
-                    self.nulls.append_null();
-                    self.null = Some((payload, null_index));
-                    payload
+                    self.null = Some(null_index);
+                    null_index
                 };
-                observe_payload_fn(payload);
+                observe_fn(idx);
                 continue;
             }
 
             // Extract length from the view (first 4 bytes of u128 in little-endian)
             let len = view_u128 as u32;
 
+            // Compute hash and resolve non-inline bytes in a single pass.
+            // When HAS_BUFFERS is false, all values are inline (<=12 bytes)
+            // and we hash the u128 view directly with no pointer chase.
+ let (hash, value_bytes) = if !HAS_BUFFERS || len <= 12 { + (view_u128.hash_one(&self.random_state), None) + } else { + let byte_view = ByteView::from(view_u128); + let buf_idx = byte_view.buffer_index as usize; + let offset = byte_view.offset as usize; + let bytes = &input_buffers[buf_idx][offset..offset + len as usize]; + (bytes.hash_one(&self.random_state), Some(bytes)) + }; + // Check if value already exists - let maybe_payload = { - // Borrow completed and in_progress for comparison + let maybe_idx = { + let views = &self.views; let completed = &self.completed; let in_progress = &self.in_progress; self.map - .find(hash, |header| { - if header.hash != hash { + .find(hash, |&(idx, stored_hash)| { + if stored_hash != hash { return false; } - // Fast path: inline strings can be compared directly - if len <= 12 { - return header.view == view_u128; + let stored_view = views[idx]; + + // When HAS_BUFFERS is false, all values are inline + if !HAS_BUFFERS || len <= 12 { + return stored_view == view_u128; } // For larger strings: first compare the 4-byte prefix - let stored_prefix = (header.view >> 32) as u32; + let stored_prefix = (stored_view >> 32) as u32; let input_prefix = (view_u128 >> 32) as u32; if stored_prefix != input_prefix { return false; } // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); + let byte_view = ByteView::from(stored_view); let stored_len = byte_view.length as usize; let buffer_index = byte_view.buffer_index as usize; let offset = byte_view.offset as usize; @@ -329,32 +307,36 @@ where } else { &in_progress[offset..offset + stored_len] }; - let input_value: &[u8] = values.value(i).as_ref(); - stored_value == input_value + stored_value == value_bytes.unwrap() }) - .map(|entry| entry.payload) + .map(|&(idx, _)| idx) }; - let payload = if let Some(payload) = maybe_payload { - payload + let idx = if let Some(idx) = maybe_idx { + idx } else { - // no existing value, make a new one - let value: &[u8] = 
values.value(i).as_ref(); - let payload = make_payload_fn(Some(value)); - - // Create view pointing to our buffers - let new_view = self.append_value(value); - let new_header = Entry { - view: new_view, - hash, - payload, + let new_idx = self.views.len(); + + let new_view = if !HAS_BUFFERS || len <= 12 { + // Inline: the input view IS the data, reuse it directly + view_u128 + } else { + // Copy bytes to our buffer and rewrite the view's + // buffer_index + offset while keeping length + prefix + let value = value_bytes.unwrap(); + self.store_non_inline(value, view_u128) }; - self.map - .insert_accounted(new_header, |h| h.hash, &mut self.map_size); - payload + self.views.push(new_view); + + self.map.insert_accounted( + (new_idx, hash), + |&(_, h)| h, + &mut self.map_size, + ); + new_idx }; - observe_payload_fn(payload); + observe_fn(idx); } } @@ -371,8 +353,20 @@ where self.completed.push(Buffer::from_vec(flushed)); } - // Build null buffer if we have any nulls - let null_buffer = self.nulls.finish(); + // Reconstruct null buffer from the null index if present. + // At most one null exists: start all-valid, unset the single null bit. + let null_buffer = self.null.map(|null_index| { + let byte_len = (self.views.len() + 7) / 8; + let mut buf = MutableBuffer::new(byte_len).with_bitset(byte_len, true); + unset_bit(buf.as_slice_mut(), null_index); + // SAFETY: we unset exactly one bit + unsafe { + NullBuffer::new_unchecked( + BooleanBuffer::new(buf.into(), 0, self.views.len()), + 1, + ) + } + }); let views = ScalarBuffer::from(self.views); let array = @@ -389,31 +383,29 @@ where } } - /// Append a value to our buffers and return the view pointing to it - fn append_value(&mut self, value: &[u8]) -> u128 { + /// Store a non-inline value (>12 bytes) in our buffers and return + /// a view with the original length+prefix but our buffer_index+offset. 
+ #[inline] + fn store_non_inline(&mut self, value: &[u8], input_view: u128) -> u128 { let len = value.len(); - let view = if len <= 12 { - make_view(value, 0, 0) - } else { - // Ensure buffer is big enough - if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { - let flushed = std::mem::replace( - &mut self.in_progress, - Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), - ); - self.completed.push(Buffer::from_vec(flushed)); - } - - let buffer_index = self.completed.len() as u32; - let offset = self.in_progress.len() as u32; - self.in_progress.extend_from_slice(value); + // Flush if the in-progress buffer would overflow + if self.in_progress.len() + len > BYTE_VIEW_MAX_BLOCK_SIZE { + let flushed = std::mem::replace( + &mut self.in_progress, + Vec::with_capacity(BYTE_VIEW_MAX_BLOCK_SIZE), + ); + self.completed.push(Buffer::from_vec(flushed)); + } - make_view(value, buffer_index, offset) - }; + let buffer_index = self.completed.len() as u32; + let offset = self.in_progress.len() as u32; + self.in_progress.extend_from_slice(value); - self.views.push(view); - self.nulls.append_non_null(); - view + // Reuse length + prefix from the input view, replace buffer location + ByteView::from(input_view) + .with_buffer_index(buffer_index) + .with_offset(offset) + .as_u128() } /// Total number of entries (including null, if present) @@ -437,55 +429,23 @@ where let views_size = self.views.len() * size_of::(); let in_progress_size = self.in_progress.capacity(); let completed_size: usize = self.completed.iter().map(|b| b.len()).sum(); - let nulls_size = self.nulls.allocated_size(); - - self.map_size - + views_size - + in_progress_size - + completed_size - + nulls_size - + self.hashes_buffer.allocated_size() + + self.map_size + views_size + in_progress_size + completed_size } } -impl Debug for ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl std::fmt::Debug for ArrowBytesViewMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result { - f.debug_struct("ArrowBytesMap") + f.debug_struct("ArrowBytesViewMap") .field("map", &"") .field("map_size", &self.map_size) .field("views_len", &self.views.len()) .field("completed_buffers", &self.completed.len()) .field("random_state", &self.random_state) - .field("hashes_buffer", &self.hashes_buffer) .finish() } } -/// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details -/// -/// Stores the view pointing to our internal buffers, eliminating the need -/// for a separate builder index. For inline strings (<=12 bytes), the view -/// contains the entire value. For out-of-line strings, the view contains -/// buffer_index and offset pointing directly to our storage. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -struct Entry -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ - /// The u128 view pointing to our internal buffers. For inline strings, - /// this contains the complete value. For larger strings, this contains - /// the buffer_index/offset into our completed/in_progress buffers. 
- view: u128, - - hash: u64, - - /// value stored by the entry - payload: V, -} - #[cfg(test)] mod tests { use arrow::array::{GenericByteViewArray, StringViewArray}; @@ -558,7 +518,7 @@ mod tests { let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); let array: ArrayRef = Arc::new(values); set.insert(&array); - // values mut appear be in the order they were inserted + // values must appear in the order they were inserted assert_set( set, &[ @@ -590,7 +550,7 @@ mod tests { let mut set = ArrowBytesViewSet::new(OutputType::Utf8View); let array: ArrayRef = Arc::new(values); set.insert(&array); - // strings mut appear be in the order they were inserted + // strings must appear in the order they were inserted assert_set( set, &[ @@ -678,15 +638,9 @@ mod tests { assert_eq!(set.len(), 10); } - #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] - struct TestPayload { - // store the string value to check against input - index: usize, // store the index of the string (each new string gets the next sequential input) - } - /// Wraps an [`ArrowBytesViewMap`], validating its invariants struct TestMap { - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, // stores distinct strings seen, in order strings: Vec>, // map strings to index in strings @@ -709,14 +663,11 @@ mod tests { let string_array = StringViewArray::from(strings.to_vec()); let arr: ArrayRef = Arc::new(string_array); - let mut next_index = self.indexes.len(); - let mut actual_new_strings = vec![]; let mut actual_seen_indexes = vec![]; // update self with new values, keeping track of newly added values for str in strings { let str = str.map(|s| s.to_string()); let index = self.indexes.get(&str).cloned().unwrap_or_else(|| { - actual_new_strings.push(str.clone()); let index = self.strings.len(); self.strings.push(str.clone()); self.indexes.insert(str, index); @@ -726,25 +677,12 @@ mod tests { } // insert the values into the map, recording what we did - let mut seen_new_strings = vec![]; let mut seen_indexes = 
vec![]; - self.map.insert_if_new( - &arr, - |s| { - let value = s - .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string")); - let index = next_index; - next_index += 1; - seen_new_strings.push(value); - TestPayload { index } - }, - |payload| { - seen_indexes.push(payload.index); - }, - ); + self.map.insert_if_new(&arr, |idx| { + seen_indexes.push(idx); + }); assert_eq!(actual_seen_indexes, seen_indexes); - assert_eq!(actual_new_strings, seen_new_strings); } /// Call `self.map.into_array()` validating that the strings are in the same diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11a..3c85b2c54a9a7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -28,16 +28,13 @@ use std::mem::size_of; /// purpose `Row`s format pub struct GroupValuesBytesView { /// Map string/binary values to group index - map: ArrowBytesViewMap, - /// The total number of groups so far (used to assign group_index) - num_groups: usize, + map: ArrowBytesViewMap, } impl GroupValuesBytesView { pub fn new(output_type: OutputType) -> Self { Self { map: ArrowBytesViewMap::new(output_type), - num_groups: 0, } } } @@ -54,20 +51,9 @@ impl GroupValues for GroupValuesBytesView { let arr = &cols[0]; groups.clear(); - self.map.insert_if_new( - arr, - // called for each new group - |_value| { - // assign new group index on each insert - let group_idx = self.num_groups; - self.num_groups += 1; - group_idx - }, - // called for each group - |group_idx| { - groups.push(group_idx); - }, - ); + self.map.insert_if_new(arr, |group_idx| { + groups.push(group_idx); + }); // ensure we assigned a group to for each row assert_eq!(groups.len(), arr.len()); @@ -79,26 +65,22 @@ impl GroupValues for GroupValuesBytesView { } fn 
is_empty(&self) -> bool {
-        self.num_groups == 0
+        self.map.len() == 0
     }
 
     fn len(&self) -> usize {
-        self.num_groups
+        self.map.len()
     }
 
     fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
+        let num_groups = self.len();
+
         // Reset the map to default, and convert it into a single array
         let map_contents = self.map.take().into_state();
 
         let group_values = match emit_to {
-            EmitTo::All => {
-                self.num_groups -= map_contents.len();
-                map_contents
-            }
-            EmitTo::First(n) if n == self.len() => {
-                self.num_groups -= map_contents.len();
-                map_contents
-            }
+            EmitTo::All => map_contents,
+            EmitTo::First(n) if n == num_groups => map_contents,
             EmitTo::First(n) => {
                 // if we only wanted to take the first n, insert the rest back
                 // into the map we could potentially avoid this reallocation, at
@@ -108,7 +90,6 @@ impl GroupValues for GroupValuesBytesView {
                 let remaining_group_values =
                     map_contents.slice(n, map_contents.len() - n);
 
-                self.num_groups = 0;
                 let mut group_indexes = vec![];
                 self.intern(&[remaining_group_values], &mut group_indexes)?;