diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3d..512fbd3c678dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2474,6 +2474,7 @@ name = "datafusion-physical-plan" version = "53.0.0" dependencies = [ "arrow", + "arrow-data", "arrow-ord", "arrow-schema", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 7e75bb59b68f2..8d90a11858a45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,6 +100,7 @@ arrow-avro = { version = "58.1.0", default-features = false, features = [ "xz", ] } arrow-buffer = { version = "58.1.0", default-features = false } +arrow-data = { version = "58.1.0", default-features = false } arrow-flight = { version = "58.1.0", features = [ "flight-sql-experimental", ] } diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 7acb21b8f3b93..374fc275a06e0 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -48,6 +48,7 @@ name = "datafusion_physical_plan" [dependencies] arrow = { workspace = true } +arrow-data = { workspace = true } arrow-ord = { workspace = true } arrow-schema = { workspace = true } async-trait = { workspace = true } @@ -72,6 +73,7 @@ pin-project-lite = "^0.2.7" tokio = { workspace = true } [dev-dependencies] +arrow-data = { workspace = true } criterion = { workspace = true, features = ["async_futures"] } datafusion-functions-aggregate = { workspace = true } datafusion-functions-window = { workspace = true } diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 583bfa29b04ad..748063afc221a 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -53,7 +53,7 @@ use crate::{ Statistics, }; -use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray}; +use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays}; use arrow::datatypes::SchemaRef; use datafusion_common::config::SpillCompression; @@ -418,8 +418,6 @@ impl ExternalSorter { Some((self.spill_manager.create_in_progress_file("Sorting")?, 0)); } - Self::organize_stringview_arrays(globally_sorted_batches)?; - debug!("Spilling sort data of ExternalSorter to disk whilst inserting"); let batches_to_spill = std::mem::take(globally_sorted_batches); @@ -431,10 +429,9 @@ impl ExternalSorter { })?; for batch in batches_to_spill { - in_progress_file.append_batch(&batch)?; + let gc_sliced_size = in_progress_file.append_batch(&batch)?; - *max_record_batch_size = - (*max_record_batch_size).max(batch.get_sliced_size()?); + *max_record_batch_size = (*max_record_batch_size).max(gc_sliced_size); } assert_or_internal_err!( @@ -463,71 +460,6 @@ impl ExternalSorter { Ok(()) } - /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each - /// `StringViewArray` in sequential order by calling `gc()` on them. - /// - /// Note this is a workaround until is - /// available - /// - /// # Rationale - /// After (merge-based) sorting, all batches will be sorted into a single run, - /// but physically this sorted run is chunked into many small batches. For - /// `StringViewArray`s inside each sorted run, their inner buffers are not - /// re-constructed by default, leading to non-sequential payload locations - /// (permutated by `interleave()` Arrow kernel). A single payload buffer might - /// be shared by multiple `RecordBatch`es. - /// When writing each batch to disk, the writer has to write all referenced buffers, - /// because they have to be read back one by one to reduce memory usage. This - /// causes extra disk reads and writes, and potentially execution failure. - /// - /// # Example - /// Before sorting: - /// batch1 -> buffer1 - /// batch2 -> buffer2 - /// - /// sorted_batch1 -> buffer1 - /// -> buffer2 - /// sorted_batch2 -> buffer1 - /// -> buffer2 - /// - /// Then when spilling each batch, the writer has to write all referenced buffers - /// repeatedly. - fn organize_stringview_arrays( - globally_sorted_batches: &mut Vec, - ) -> Result<()> { - let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len()); - - for batch in globally_sorted_batches.drain(..) { - let mut new_columns: Vec> = - Vec::with_capacity(batch.num_columns()); - - let mut arr_mutated = false; - for array in batch.columns() { - if let Some(string_view_array) = - array.as_any().downcast_ref::() - { - let new_array = string_view_array.gc(); - new_columns.push(Arc::new(new_array)); - arr_mutated = true; - } else { - new_columns.push(Arc::clone(array)); - } - } - - let organized_batch = if arr_mutated { - RecordBatch::try_new(batch.schema(), new_columns)? - } else { - batch - }; - - organized_batches.push(organized_batch); - } - - *globally_sorted_batches = organized_batches; - - Ok(()) - } - /// Sorts the in-memory batches and merges them into a single sorted run, then writes /// the result to spill files. async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> { diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 9084ea449d6b9..e0548bd5bf860 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -24,7 +24,10 @@ use arrow::array::RecordBatch; use datafusion_common::exec_datafusion_err; use datafusion_execution::disk_manager::RefCountedTempFile; -use super::{IPCStreamWriter, spill_manager::SpillManager}; +use super::{ + IPCStreamWriter, gc_view_arrays, + spill_manager::{GetSlicedSize, SpillManager}, +}; /// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`. /// Caller is able to use this struct to incrementally append in-memory batches to @@ -51,16 +54,25 @@ impl InProgressSpillFile { /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary. /// + /// Before writing, performs GC on StringView/BinaryView arrays to compact backing + /// buffers. When a view array is sliced, it still references the original full buffers, + /// causing massive spill files without GC (see issue #19414: 820MB → 33MB after GC). + /// + /// Returns the post-GC sliced memory size of the batch for memory accounting. + /// /// # Errors /// - Returns an error if the file is not active (has been finalized) /// - Returns an error if appending would exceed the disk usage limit configured /// by `max_temp_directory_size` in `DiskManager` - pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> { + pub fn append_batch(&mut self, batch: &RecordBatch) -> Result { if self.in_progress_file.is_none() { return Err(exec_datafusion_err!( "Append operation failed: No active in-progress file. The file may have already been finalized." )); } + + let gc_batch = gc_view_arrays(batch)?; + if self.writer.is_none() { // Use the SpillManager's declared schema rather than the batch's schema. // Individual batches may have different schemas (e.g., different nullability) @@ -87,7 +99,7 @@ impl InProgressSpillFile { } } if let Some(writer) = &mut self.writer { - let (spilled_rows, _) = writer.write(batch)?; + let (spilled_rows, _) = writer.write(&gc_batch)?; if let Some(in_progress_file) = &mut self.in_progress_file { let pre_size = in_progress_file.current_disk_usage(); in_progress_file.update_disk_usage()?; @@ -102,7 +114,7 @@ impl InProgressSpillFile { unreachable!() // Already checked inside current function } } - Ok(()) + gc_batch.get_sliced_size() } pub fn flush(&mut self) -> Result<()> { diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 6d51e6660e622..1c900b7579f73 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -34,14 +34,19 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use arrow::array::{BufferSpec, layout}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::array::{ + Array, ArrayRef, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray, + layout, make_array, +}; +use arrow::datatypes::DataType; +use arrow::datatypes::{ByteViewType, Schema, SchemaRef}; use arrow::ipc::{ MetadataVersion, reader::StreamReader, writer::{IpcWriteOptions, StreamWriter}, }; use arrow::record_batch::RecordBatch; +use arrow_data::ArrayDataBuilder; use datafusion_common::config::SpillCompression; use datafusion_common::{DataFusionError, Result, exec_datafusion_err}; @@ -344,6 +349,174 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize { max_alignment } +/// Size of a single view structure in StringView/BinaryView arrays (in bytes). +/// Each view is 16 bytes: 4 bytes length + 4 bytes prefix + 8 bytes buffer ID/offset. +const VIEW_SIZE_BYTES: usize = 16; + +/// Performs garbage collection on StringView and BinaryView arrays before spilling to reduce memory usage. +/// +/// # Why GC is needed +/// +/// StringView and BinaryView arrays can accumulate significant memory waste when sliced. +/// When a large array is sliced (e.g., taking first 100 rows of 1000), the view array +/// still references the original data buffers containing all 1000 rows of data. +/// +/// For example, in the ClickBench benchmark (issue #19414), repeated slicing of StringView +/// arrays resulted in 820MB of spill files that could be reduced to just 33MB after GC - +/// a 96% reduction in size. +/// +/// # How it works +/// +/// The GC process: +/// 1. Identifies view arrays (StringView/BinaryView) in the batch +/// 2. Checks if their data buffers exceed a memory threshold +/// 3. If exceeded, calls the Arrow `gc()` method which creates new compact buffers +/// containing only the data referenced by the current views +/// 4. Returns a new batch with GC'd arrays (or original arrays if GC not needed) +/// +/// # When GC is triggered +/// +/// GC is only performed when data buffers exceed a threshold (currently 10KB). +/// This balances memory savings against the CPU overhead of garbage collection. +/// Small arrays are passed through unchanged since the GC overhead would exceed +/// any memory savings. +/// +/// # Performance considerations +/// +/// - If no view arrays need compaction, the original batch is cloned cheaply +/// - GC is skipped for small buffers to avoid unnecessary CPU overhead +/// - Nested container types are traversed recursively so view arrays inside +/// `List`, `Map`, `Union`, `Dictionary`, and other child-bearing arrays are compacted too +/// - The Arrow `gc()` method itself is optimized and only copies referenced data +pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result { + let mut mutated = false; + let mut new_columns: Vec> = Vec::with_capacity(batch.num_columns()); + + for array in batch.columns() { + let (gc_array, array_mutated) = gc_array(array)?; + mutated |= array_mutated; + new_columns.push(gc_array); + } + + if mutated { + Ok(RecordBatch::try_new(batch.schema(), new_columns)?) + } else { + Ok(batch.clone()) + } +} + +fn gc_array(array: &ArrayRef) -> Result<(ArrayRef, bool)> { + match array.data_type() { + DataType::Utf8View => { + let string_view = array + .as_any() + .downcast_ref::() + .expect("Utf8View array should downcast to StringViewArray"); + if should_gc_view_array(string_view) { + Ok((Arc::new(string_view.gc()) as ArrayRef, true)) + } else { + Ok((Arc::clone(array), false)) + } + } + DataType::BinaryView => { + let binary_view = array + .as_any() + .downcast_ref::() + .expect("BinaryView array should downcast to BinaryViewArray"); + if should_gc_view_array(binary_view) { + Ok((Arc::new(binary_view.gc()) as ArrayRef, true)) + } else { + Ok((Arc::clone(array), false)) + } + } + _ => gc_array_children(array), + } +} + +fn gc_array_children(array: &ArrayRef) -> Result<(ArrayRef, bool)> { + let data = array.to_data(); + if data.child_data().is_empty() { + return Ok((Arc::clone(array), false)); + } + + let mut mutated = false; + let mut child_data = Vec::with_capacity(data.child_data().len()); + for child in data.child_data() { + let child_array = make_array(child.clone()); + let (gc_child, child_mutated) = gc_array(&child_array)?; + mutated |= child_mutated; + child_data.push(gc_child.to_data()); + } + + if !mutated { + return Ok((Arc::clone(array), false)); + } + + let rebuilt = ArrayDataBuilder::new(data.data_type().clone()) + .len(data.len()) + .offset(data.offset()) + .nulls(data.nulls().cloned()) + .buffers(data.buffers().to_vec()) + .child_data(child_data) + .build()?; + + Ok((make_array(rebuilt), true)) +} + +/// Determines whether a view array should be garbage collected before spilling. +/// +/// Arrow's `gc()` always allocates new compact buffers (it is never a no-op), so we +/// check here to skip the allocation cost when data buffers are small. We subtract +/// the views buffer (16 bytes × n_rows) from `get_buffer_memory_size()` so the +/// threshold tracks non-inline string data rather than row count. +fn should_gc_view_array(array: &GenericByteViewArray) -> bool { + const MIN_BUFFER_SIZE_FOR_GC: usize = 10 * 1024; // 10KB threshold + + if array.data_buffers().is_empty() { + return false; + } + + let data_buffer_size = array + .get_buffer_memory_size() + .saturating_sub(array.len() * VIEW_SIZE_BYTES); + data_buffer_size > MIN_BUFFER_SIZE_FOR_GC +} + +#[cfg(test)] +fn calculate_string_view_waste_ratio(array: &StringViewArray) -> f64 { + use arrow_data::MAX_INLINE_VIEW_LEN; + calculate_view_waste_ratio(array.len(), array.data_buffers(), |i| { + if !array.is_null(i) { + let value = array.value(i); + if value.len() > MAX_INLINE_VIEW_LEN as usize { + return value.len(); + } + } + 0 + }) +} + +#[cfg(test)] +fn calculate_view_waste_ratio( + len: usize, + data_buffers: &[arrow::buffer::Buffer], + get_value_size: F, +) -> f64 +where + F: Fn(usize) -> usize, +{ + let total_buffer_size: usize = data_buffers.iter().map(|b| b.capacity()).sum(); + if total_buffer_size == 0 { + return 0.0; + } + + let mut actual_used_size = (0..len).map(get_value_size).sum::(); + actual_used_size += len * VIEW_SIZE_BYTES; + + let waste = total_buffer_size.saturating_sub(actual_used_size); + waste as f64 / total_buffer_size as f64 +} + #[cfg(test)] mod tests { use super::in_progress_spill_file::InProgressSpillFile; @@ -866,4 +1039,386 @@ mod tests { Ok(()) } + + #[test] + fn test_gc_string_view_before_spill() -> Result<()> { + use arrow::array::StringViewArray; + + let strings: Vec = (0..200) + .map(|i| { + if i % 2 == 0 { + "short_string".to_string() + } else { + "this_is_a_much_longer_string_that_will_not_be_inlined".to_string() + } + }) + .collect(); + + let string_array = StringViewArray::from(strings); + let schema = Arc::new(Schema::new(vec![Field::new( + "strings", + DataType::Utf8View, + false, + )])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(string_array) as ArrayRef], + )?; + let sliced_batch = batch.slice(0, 20); + let gc_batch = gc_view_arrays(&sliced_batch)?; + + assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows()); + assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns()); + + Ok(()) + } + + #[test] + fn test_gc_binary_view_before_spill() -> Result<()> { + use arrow::array::BinaryViewArray; + + let binaries: Vec> = (0..200) + .map(|i| { + if i % 2 == 0 { + vec![1, 2, 3, 4] + } else { + vec![1; 50] + } + }) + .collect(); + + let binary_array = + BinaryViewArray::from_iter(binaries.iter().map(|b| Some(b.as_slice()))); + let schema = Arc::new(Schema::new(vec![Field::new( + "binaries", + DataType::BinaryView, + false, + )])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(binary_array) as ArrayRef], + )?; + let sliced_batch = batch.slice(0, 20); + let gc_batch = gc_view_arrays(&sliced_batch)?; + + assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows()); + assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns()); + + Ok(()) + } + + #[test] + fn test_gc_skips_small_arrays() -> Result<()> { + use arrow::array::StringViewArray; + + let strings: Vec = (0..10).map(|i| format!("string_{i}")).collect(); + + let string_array = StringViewArray::from(strings); + let array_ref: ArrayRef = Arc::new(string_array); + + let schema = Arc::new(Schema::new(vec![Field::new( + "strings", + DataType::Utf8View, + false, + )])); + + let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_ref])?; + + // GC should return the original batch for small arrays + let should_gc = should_gc_view_array( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(), + ); + let gc_batch = gc_view_arrays(&batch)?; + + assert!(!should_gc); + assert_eq!(gc_batch.num_rows(), batch.num_rows()); + assert!(Arc::ptr_eq(batch.column(0), gc_batch.column(0))); + + Ok(()) + } + + #[test] + fn test_gc_with_mixed_columns() -> Result<()> { + use arrow::array::{Int32Array, StringViewArray}; + + let strings: Vec = (0..200) + .map(|i| format!("long_string_for_gc_testing_{i}")) + .collect(); + + let string_array = StringViewArray::from(strings); + let int_array = Int32Array::from((0..200).collect::>()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("strings", DataType::Utf8View, false), + Field::new("ints", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(string_array) as ArrayRef, + Arc::new(int_array) as ArrayRef, + ], + )?; + + let sliced_batch = batch.slice(0, 50); + let gc_batch = gc_view_arrays(&sliced_batch)?; + + assert_eq!(gc_batch.num_columns(), 2); + assert_eq!(gc_batch.num_rows(), 50); + + Ok(()) + } + + #[test] + fn test_verify_gc_triggers_for_sliced_arrays() -> Result<()> { + let strings: Vec = (0..200) + .map(|i| { + format!( + "http://example.com/very/long/path/that/exceeds/inline/threshold/{i}" + ) + }) + .collect(); + + let string_array = StringViewArray::from(strings); + let schema = Arc::new(Schema::new(vec![Field::new( + "url", + DataType::Utf8View, + false, + )])); + + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(string_array.clone()) as ArrayRef], + )?; + + let sliced = batch.slice(0, 20); + + let sliced_array = sliced + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let should_gc = should_gc_view_array(sliced_array); + let waste_ratio = calculate_string_view_waste_ratio(sliced_array); + + assert!( + waste_ratio > 0.8, + "Waste ratio should be > 0.8 for sliced array" + ); + assert!( + should_gc, + "GC should trigger for sliced array with high waste" + ); + + Ok(()) + } + + #[test] + fn test_reproduce_issue_19414_string_view_spill_without_gc() -> Result<()> { + use arrow::array::StringViewArray; + use std::fs; + + let num_rows = 1000; + let mut strings = Vec::with_capacity(num_rows); + + for i in 0..num_rows { + let url = match i % 5 { + 0 => format!( + "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130/{i}" + ), + 1 => format!("http://komme%2F27.0.1453.116/very/long/path/{i}"), + 2 => format!("https://produkty%2Fproduct/category/item/{i}"), + 3 => format!( + "http://irr.ru/index.php?showalbum/login-kapusta-advert2668/{i}" + ), + 4 => format!( + "http://irr.ru/index.php?showalbum/login-kapustic/product/{i}" + ), + _ => unreachable!(), + }; + strings.push(url); + } + + let string_array = StringViewArray::from(strings); + let schema = Arc::new(Schema::new(vec![Field::new( + "URL", + DataType::Utf8View, + false, + )])); + + let original_batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(string_array.clone()) as ArrayRef], + )?; + + let total_buffer_size: usize = string_array + .data_buffers() + .iter() + .map(|buffer| buffer.capacity()) + .sum(); + + let mut sliced_batches = Vec::new(); + let slice_size = 100; + + for i in (0..num_rows).step_by(slice_size) { + let len = std::cmp::min(slice_size, num_rows - i); + let sliced = original_batch.slice(i, len); + sliced_batches.push(sliced); + } + + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let spill_manager = SpillManager::new(env, metrics, schema); + + let mut in_progress_file = spill_manager.create_in_progress_file("Test GC")?; + + for batch in &sliced_batches { + in_progress_file.append_batch(batch)?; + } + + let spill_file = in_progress_file.finish()?.unwrap(); + let file_size = fs::metadata(spill_file.path())?.len() as usize; + + let theoretical_without_gc = total_buffer_size * sliced_batches.len(); + let reduction_percent = ((theoretical_without_gc - file_size) as f64 + / theoretical_without_gc as f64) + * 100.0; + + assert!( + reduction_percent > 80.0, + "GC should reduce spill file size by >80%, got {reduction_percent:.1}%" + ); + + Ok(()) + } + + #[test] + fn test_spill_with_and_without_gc_comparison() -> Result<()> { + let num_rows = 400; + let strings: Vec = (0..num_rows) + .map(|i| { + format!( + "http://example.com/this/is/a/long/url/path/that/wont/be/inlined/{i}" + ) + }) + .collect(); + + let string_array = StringViewArray::from(strings); + let schema = Arc::new(Schema::new(vec![Field::new( + "url", + DataType::Utf8View, + false, + )])); + + let batch = + RecordBatch::try_new(schema, vec![Arc::new(string_array) as ArrayRef])?; + + let sliced_batch = batch.slice(0, 40); + + let array_without_gc = sliced_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let size_without_gc: usize = array_without_gc + .data_buffers() + .iter() + .map(|buffer| buffer.capacity()) + .sum(); + + let gc_batch = gc_view_arrays(&sliced_batch)?; + let array_with_gc = gc_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let size_with_gc: usize = array_with_gc + .data_buffers() + .iter() + .map(|buffer| buffer.capacity()) + .sum(); + + let reduction_percent = + ((size_without_gc - size_with_gc) as f64 / size_without_gc as f64) * 100.0; + + assert!( + reduction_percent > 85.0, + "Expected >85% reduction for 10% slice, got {reduction_percent:.1}%" + ); + + Ok(()) + } + + #[test] + fn test_gc_recurses_into_nested_view_arrays() -> Result<()> { + use arrow::array::{DictionaryArray, Int32Array}; + use arrow::buffer::Buffer; + + let strings: Vec = (0..200) + .map(|i| format!("http://example.com/nested/path/that/is/not/inlined/{i}")) + .collect(); + let string_values = Arc::new(StringViewArray::from(strings)) as ArrayRef; + + let list_data = ArrayDataBuilder::new(DataType::List(Arc::new( + Field::new_list_field(DataType::Utf8View, true), + ))) + .len(20) + .buffers(vec![Buffer::from_iter((0..=20).map(|i| i * 5_i32))]) + .child_data(vec![string_values.slice(0, 100).to_data()]) + .build()?; + let list_array = make_array(list_data); + + let keys = Int32Array::from_iter_values(0..20); + let dictionary = DictionaryArray::new(keys, string_values.slice(0, 20)); + let dictionary_array = Arc::new(dictionary) as ArrayRef; + + let schema = Arc::new(Schema::new(vec![ + Field::new( + "list_strings", + DataType::List(Arc::new(Field::new_list_field(DataType::Utf8View, true))), + false, + ), + Field::new( + "dictionary_strings", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8View), + ), + false, + ), + ])); + let batch = RecordBatch::try_new(schema, vec![list_array, dictionary_array])?; + let gc_batch = gc_view_arrays(&batch)?; + + let gc_list_values = gc_batch.column(0).to_data().child_data()[0].clone(); + let gc_list_values = make_array(gc_list_values); + let gc_list_values = gc_list_values + .as_any() + .downcast_ref::() + .unwrap(); + assert!( + calculate_string_view_waste_ratio(gc_list_values) < 0.2, + "GC should compact nested List child views" + ); + + let gc_dictionary_values = gc_batch.column(1).to_data().child_data()[0].clone(); + let gc_dictionary_values = make_array(gc_dictionary_values); + let gc_dictionary_values = gc_dictionary_values + .as_any() + .downcast_ref::() + .unwrap(); + assert!( + calculate_string_view_waste_ratio(gc_dictionary_values) < 0.2, + "GC should compact nested Dictionary values" + ); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 07ba6d3989bc5..c81c01bbe31a3 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -20,10 +20,9 @@ use super::{SpillReaderStream, in_progress_spill_file::InProgressSpillFile}; use crate::coop::cooperative; use crate::{common::spawn_buffered, metrics::SpillMetrics}; -use arrow::array::StringViewArray; -use arrow::datatypes::SchemaRef; +use arrow::array::{BinaryViewArray, GenericByteViewArray, StringViewArray}; +use arrow::datatypes::{ByteViewType, SchemaRef}; use arrow::record_batch::RecordBatch; -use datafusion_common::utils::memory::get_record_batch_memory_size; use datafusion_common::{DataFusionError, Result, config::SpillCompression}; use datafusion_execution::SendableRecordBatchStream; use datafusion_execution::disk_manager::RefCountedTempFile; @@ -127,10 +126,8 @@ impl SpillManager { if borrowed.num_rows() == 0 { return Ok(()); } - in_progress_file.append_batch(borrowed)?; - - max_record_batch_size = - max_record_batch_size.max(get_record_batch_memory_size(borrowed)); + let gc_sliced_size = in_progress_file.append_batch(borrowed)?; + max_record_batch_size = max_record_batch_size.max(gc_sliced_size); Result::<_, DataFusionError>::Ok(()) })?; @@ -153,9 +150,9 @@ impl SpillManager { while let Some(batch) = stream.next().await { let batch = batch?; - in_progress_file.append_batch(&batch)?; + let gc_sliced_size = in_progress_file.append_batch(&batch)?; - max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?); + max_record_batch_size = max_record_batch_size.max(gc_sliced_size); } let file = in_progress_file.finish()?; @@ -197,7 +194,7 @@ impl SpillManager { pub(crate) trait GetSlicedSize { /// Returns the size of the `RecordBatch` when sliced. /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer. - /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method. + /// Therefore, make sure we call gc() or gc_view_arrays() before using this method. fn get_sliced_size(&self) -> Result; } @@ -217,15 +214,24 @@ impl GetSlicedSize for RecordBatch { // "bytes needed if we materialized exactly this slice into fresh buffers". // This is a workaround until https://github.com/apache/arrow-rs/issues/8230 if let Some(sv) = array.as_any().downcast_ref::() { - for buffer in sv.data_buffers() { - total += buffer.capacity(); - } + total += byte_view_data_buffer_size(sv); + } + if let Some(bv) = array.as_any().downcast_ref::() { + total += byte_view_data_buffer_size(bv); } } Ok(total) } } +fn byte_view_data_buffer_size(array: &GenericByteViewArray) -> usize { + array + .data_buffers() + .iter() + .map(|buffer| buffer.capacity()) + .sum() +} + #[cfg(test)] mod tests { use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};