Skip to content
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ arrow-avro = { version = "58.1.0", default-features = false, features = [
"xz",
] }
arrow-buffer = { version = "58.1.0", default-features = false }
arrow-data = { version = "58.1.0", default-features = false }
arrow-flight = { version = "58.1.0", features = [
"flight-sql-experimental",
] }
Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ name = "datafusion_physical_plan"

[dependencies]
arrow = { workspace = true }
arrow-data = { workspace = true }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency seems unused.
The only occurrence of arrow_data is at https://github.com/apache/datafusion/pull/21633/changes#diff-1f7d15c867929af294664ebbde4e8c9038186222cbb95ed86e527406cf066e84R463 for a test helper.

arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
Expand All @@ -72,6 +73,7 @@ pin-project-lite = "^0.2.7"
tokio = { workspace = true }

[dev-dependencies]
arrow-data = { workspace = true }
criterion = { workspace = true, features = ["async_futures"] }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window = { workspace = true }
Expand Down
74 changes: 3 additions & 71 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{
Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
Expand Down Expand Up @@ -418,8 +418,6 @@ impl ExternalSorter {
Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
}

Self::organize_stringview_arrays(globally_sorted_batches)?;

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

let batches_to_spill = std::mem::take(globally_sorted_batches);
Expand All @@ -431,10 +429,9 @@ impl ExternalSorter {
})?;

for batch in batches_to_spill {
in_progress_file.append_batch(&batch)?;
let gc_sliced_size = in_progress_file.append_batch(&batch)?;

*max_record_batch_size =
(*max_record_batch_size).max(batch.get_sliced_size()?);
*max_record_batch_size = (*max_record_batch_size).max(gc_sliced_size);
}

assert_or_internal_err!(
Expand Down Expand Up @@ -463,71 +460,6 @@ impl ExternalSorter {
Ok(())
}

/// Rewrites every batch in `globally_sorted_batches` so that each
/// `StringViewArray` column owns compact, sequentially laid-out payload
/// buffers, obtained by calling `gc()` on the column.
///
/// This is a workaround until
/// <https://github.com/apache/arrow-rs/issues/7185> is available.
///
/// # Rationale
/// A (merge-based) sort produces one sorted run that is physically chunked
/// into many small batches. The payload buffers of the `StringViewArray`s in
/// that run are not rebuilt by default, so the views point at non-sequential
/// locations (permuted by the `interleave()` Arrow kernel), and one payload
/// buffer may be shared by several `RecordBatch`es.
/// When a batch is written to disk the writer has to emit every buffer the
/// batch references, because batches are read back one at a time to keep
/// memory usage low. The result is redundant disk reads and writes, and
/// potentially execution failure.
///
/// # Example
/// Before sorting:
/// batch1 -> buffer1
/// batch2 -> buffer2
///
/// After sorting:
/// sorted_batch1 -> buffer1
///               -> buffer2
/// sorted_batch2 -> buffer1
///               -> buffer2
///
/// Spilling each sorted batch then writes all referenced buffers repeatedly.
///
/// # Errors
/// Propagates the error from `RecordBatch::try_new` if a batch cannot be
/// rebuilt from its compacted columns.
fn organize_stringview_arrays(
    globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
    let mut compacted = Vec::with_capacity(globally_sorted_batches.len());

    for batch in globally_sorted_batches.drain(..) {
        // Set as soon as one column is replaced by its compacted copy.
        let mut has_string_view = false;

        let columns: Vec<Arc<dyn Array>> = batch
            .columns()
            .iter()
            .map(|column| {
                match column.as_any().downcast_ref::<StringViewArray>() {
                    Some(views) => {
                        has_string_view = true;
                        Arc::new(views.gc()) as Arc<dyn Array>
                    }
                    // Any other column type is shared as-is.
                    None => Arc::clone(column),
                }
            })
            .collect();

        // Only rebuild the batch when a column actually changed; otherwise
        // hand the original batch through untouched.
        compacted.push(if has_string_view {
            RecordBatch::try_new(batch.schema(), columns)?
        } else {
            batch
        });
    }

    *globally_sorted_batches = compacted;

    Ok(())
}

/// Sorts the in-memory batches and merges them into a single sorted run, then writes
/// the result to spill files.
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
Expand Down
20 changes: 16 additions & 4 deletions datafusion/physical-plan/src/spill/in_progress_spill_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use arrow::array::RecordBatch;
use datafusion_common::exec_datafusion_err;
use datafusion_execution::disk_manager::RefCountedTempFile;

use super::{IPCStreamWriter, spill_manager::SpillManager};
use super::{
IPCStreamWriter, gc_view_arrays,
spill_manager::{GetSlicedSize, SpillManager},
};

/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
/// Caller is able to use this struct to incrementally append in-memory batches to
Expand All @@ -51,16 +54,25 @@ impl InProgressSpillFile {

/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
///
/// Before writing, performs GC on StringView/BinaryView arrays to compact their backing
/// buffers. A sliced view array still references the original full buffers, so without
/// GC the spill file can be massively oversized (see issue #19414: 820MB → 33MB after GC).
///
/// Returns the post-GC sliced memory size of the batch for memory accounting.
///
/// # Errors
/// - Returns an error if the file is not active (has been finalized)
/// - Returns an error if appending would exceed the disk usage limit configured
/// by `max_temp_directory_size` in `DiskManager`
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<()> {
pub fn append_batch(&mut self, batch: &RecordBatch) -> Result<usize> {
if self.in_progress_file.is_none() {
return Err(exec_datafusion_err!(
"Append operation failed: No active in-progress file. The file may have already been finalized."
));
}

let gc_batch = gc_view_arrays(batch)?;

if self.writer.is_none() {
// Use the SpillManager's declared schema rather than the batch's schema.
// Individual batches may have different schemas (e.g., different nullability)
Expand All @@ -87,7 +99,7 @@ impl InProgressSpillFile {
}
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, _) = writer.write(batch)?;
let (spilled_rows, _) = writer.write(&gc_batch)?;
if let Some(in_progress_file) = &mut self.in_progress_file {
let pre_size = in_progress_file.current_disk_usage();
in_progress_file.update_disk_usage()?;
Expand All @@ -102,7 +114,7 @@ impl InProgressSpillFile {
unreachable!() // Already checked inside current function
}
}
Ok(())
gc_batch.get_sliced_size()
}

pub fn flush(&mut self) -> Result<()> {
Expand Down
Loading
Loading