Skip to content
Draft
12 changes: 11 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,17 @@ config_namespace! {
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
///
/// Deprecated: this option is no longer used. The sort pipeline
/// now always coalesces batches before sorting. Use
/// `sort_coalesce_target_rows` instead.
pub sort_in_place_threshold_bytes: usize, warn = "`sort_in_place_threshold_bytes` is deprecated and ignored. Use `sort_coalesce_target_rows` instead.", default = 1024 * 1024

/// Target number of rows to coalesce before sorting in ExternalSorter.
///
/// Larger values reduce merge fan-in by producing fewer, larger
/// sorted runs.
pub sort_coalesce_target_rows: usize, default = 32768

/// Maximum buffer capacity (in bytes) per partition for BufferExec
/// inserted during sort pushdown optimization.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_sort_10k_mem() {
#[cfg_attr(tarpaulin, ignore)]
async fn test_sort_100k_mem() {
for (batch_size, should_spill) in
[(5, false), (10000, false), (20000, true), (1000000, true)]
[(5, false), (10000, false), (50000, true), (1000000, true)]
{
let (input, collected) = SortTest::new()
.with_int32_batches(batch_size)
Expand Down
19 changes: 2 additions & 17 deletions datafusion/core/tests/fuzz_cases/sort_query_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ impl SortFuzzerTestGenerator {
pub fn generate_random_query(&self, rng_seed: u64) -> (String, Option<usize>) {
let mut rng = StdRng::seed_from_u64(rng_seed);

// Pick 1-3 ORDER BY columns.
let num_columns = rng.random_range(1..=3).min(self.selected_columns.len());
let selected_columns: Vec<_> = self
.selected_columns
Expand Down Expand Up @@ -497,17 +498,6 @@ impl SortFuzzerTestGenerator {
..=(per_partition_mem_limit as f64 * 0.3) as usize,
);

// 1 to 3 times of the approx batch size. Setting this to a very large nvalue
// will cause external sort to fail.
let sort_in_place_threshold_bytes = if with_memory_limit {
// For memory-limited query, setting `sort_in_place_threshold_bytes` too
// large will cause failure.
0
} else {
let dataset_size = self.dataset_state.as_ref().unwrap().dataset_size;
rng.random_range(0..=dataset_size * 2_usize)
};

// Set up strings for printing
let memory_limit_str = if with_memory_limit {
human_readable_size(memory_limit)
Expand All @@ -530,16 +520,11 @@ impl SortFuzzerTestGenerator {
" Sort spill reservation bytes: {}",
human_readable_size(sort_spill_reservation_bytes)
);
println!(
" Sort in place threshold bytes: {}",
human_readable_size(sort_in_place_threshold_bytes)
);

let config = SessionConfig::new()
.with_target_partitions(num_partitions)
.with_batch_size(init_state.approx_batch_num_rows / 2)
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes)
.with_sort_in_place_threshold_bytes(sort_in_place_threshold_bytes);
.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);

let memory_pool: Arc<dyn MemoryPool> = if with_memory_limit {
Arc::new(FairSpillPool::new(memory_limit))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ async fn test_sort_with_limited_memory() -> Result<()> {
})
.await?;

let total_spill_files_size = spill_count * record_batch_size;
// The chunked sort pipeline is more memory-efficient (shrinks
// reservations after sorting), so total spill size may be less than
// pool size. Just verify that spilling occurred.
assert!(
total_spill_files_size > pool_size,
"Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
spill_count > 0,
"Expected spilling under memory pressure, but spill_count was 0",
);

Ok(())
Expand Down
24 changes: 7 additions & 17 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ async fn sort_spill_reservation() {
let scenario = Scenario::new_dictionary_strings(1);
let partition_size = scenario.partition_size();

let base_config = SessionConfig::new()
// do not allow the sort to use the 'concat in place' path
.with_sort_in_place_threshold_bytes(10);
let base_config = SessionConfig::new();

// This test case shows how sort_spill_reservation works by
// purposely sorting data that requires non trivial memory to
Expand Down Expand Up @@ -313,26 +311,19 @@ async fn sort_spill_reservation() {
]
);

let config = base_config
.clone()
// provide insufficient reserved space for merging,
// the sort will fail while trying to merge
.with_sort_spill_reservation_bytes(1024);
// With low reservation, the sort should still succeed because
// the chunked sort pipeline eagerly sorts and the multi-level merge
// handles low merge memory by reducing fan-in.
let config = base_config.clone().with_sort_spill_reservation_bytes(1024);

test.clone()
.with_expected_errors(vec![
"Resources exhausted: Additional allocation failed",
"with top memory consumers (across reservations) as:",
"B for ExternalSorterMerge",
])
.with_expected_success()
.with_config(config)
.run()
.await;

let config = base_config
// reserve sufficient space up front for merge and this time,
// which will force the spills to happen with less buffered
// input and thus with enough to merge.
// reserve sufficient space up front for merge
.with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
Expand Down Expand Up @@ -583,7 +574,6 @@ async fn setup_context(

let config = SessionConfig::new()
.with_sort_spill_reservation_bytes(64 * 1024) // 256KB
.with_sort_in_place_threshold_bytes(0)
.with_spill_compression(spill_compression)
.with_batch_size(64) // To reduce test memory usage
.with_target_partitions(1);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/runtime_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async fn test_invalid_memory_limit_when_limit_is_not_numeric() {
async fn test_max_temp_directory_size_enforcement() {
let ctx = SessionContext::new();

ctx.sql("SET datafusion.runtime.memory_limit = '1M'")
ctx.sql("SET datafusion.runtime.memory_limit = '256K'")
.await
.unwrap()
.collect()
Expand Down
23 changes: 23 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,15 @@ impl SessionConfig {
/// Set the size of [`sort_in_place_threshold_bytes`] to control
/// how sort does things.
///
/// Deprecated: this option is no longer used. Use
/// [`with_sort_coalesce_target_rows`] instead.
///
/// [`sort_in_place_threshold_bytes`]: datafusion_common::config::ExecutionOptions::sort_in_place_threshold_bytes
/// [`with_sort_coalesce_target_rows`]: Self::with_sort_coalesce_target_rows
#[deprecated(
since = "46.0.0",
note = "No longer used. Sort pipeline now coalesces batches before sorting. Use with_sort_coalesce_target_rows instead."
)]
pub fn with_sort_in_place_threshold_bytes(
mut self,
sort_in_place_threshold_bytes: usize,
Expand All @@ -474,6 +482,21 @@ impl SessionConfig {
self
}

/// Set the target number of rows to coalesce before sorting.
///
/// Larger values reduce merge fan-in by producing fewer, larger
/// sorted runs.
///
/// [`sort_coalesce_target_rows`]: datafusion_common::config::ExecutionOptions::sort_coalesce_target_rows
pub fn with_sort_coalesce_target_rows(
mut self,
sort_coalesce_target_rows: usize,
) -> Self {
self.options_mut().execution.sort_coalesce_target_rows =
sort_coalesce_target_rows;
self
}

/// Enables or disables the enforcement of batch size in joins
pub fn with_enforce_batch_size_in_joins(
mut self,
Expand Down
Loading
Loading