diff --git a/Cargo.lock b/Cargo.lock index cd603efd859a..e26d57750566 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1969,6 +1969,7 @@ dependencies = [ "liblzma", "log", "object_store", + "parking_lot", "rand 0.9.2", "tempfile", "tokio", diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a4..5341dd30d752 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -19,6 +19,7 @@ use crate::sort::reverse_row_selection; use datafusion_common::{Result, assert_eq_or_internal_err}; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; +use std::sync::Arc; /// A selection of rows and row groups within a ParquetFile to decode. /// @@ -339,6 +340,11 @@ impl ParquetAccessPlan { self.row_groups } + /// Return the count of row groups that should be scanned (not skipped). + pub fn scan_count(&self) -> usize { + self.row_groups.iter().filter(|rg| rg.should_scan()).count() + } + /// Prepare this plan and resolve to the final `PreparedAccessPlan` pub(crate) fn prepare( self, @@ -398,6 +404,52 @@ impl PreparedAccessPlan { } } +/// Split a [`ParquetAccessPlan`] in two roughly equal halves by row group. +/// +/// This is used as the `split_fn` in [`SplittableExt`] so that morsel +/// splitting divides row groups instead of using byte-range heuristics. 
+/// +/// [`SplittableExt`]: datafusion_datasource::SplittableExt +pub fn split_parquet_access_plan( + plan: &dyn std::any::Any, +) -> Option<( + Arc, + Arc, +)> { + let plan = plan.downcast_ref::()?; + if plan.scan_count() < 2 { + return None; + } + + let target = plan.scan_count() / 2; + let mut seen = 0; + let split_idx = plan + .inner() + .iter() + .position(|rg| { + if rg.should_scan() { + seen += 1; + } + seen >= target && rg.should_scan() + }) + .unwrap_or(0) + + 1; + + let mut first = plan.inner().to_vec(); + let mut second = plan.inner().to_vec(); + for rg in first.iter_mut().skip(split_idx) { + *rg = RowGroupAccess::Skip; + } + for rg in second.iter_mut().take(split_idx) { + *rg = RowGroupAccess::Skip; + } + + Some(( + Arc::new(ParquetAccessPlan::new(first)), + Arc::new(ParquetAccessPlan::new(second)), + )) +} + #[cfg(test)] mod test { use super::*; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 1db055880d7d..f45fc111369a 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -1536,15 +1536,22 @@ fn create_initial_plan( row_group_count: usize, ) -> Result { if let Some(extensions) = extensions { - if let Some(access_plan) = extensions.downcast_ref::() { + // Try direct ParquetAccessPlan first, then unwrap SplittableExt. + let plan = extensions + .downcast_ref::() + .or_else(|| { + extensions + .downcast_ref::() + .and_then(|w| w.inner.downcast_ref::()) + }); + + if let Some(access_plan) = plan { let plan_len = access_plan.len(); if plan_len != row_group_count { return exec_err!( "Invalid ParquetAccessPlan for {file_name}. 
Specified {plan_len} row groups, but file has {row_group_count}" ); } - - // check row group count matches the plan return Ok(access_plan.clone()); } else { debug!("DataSourceExec Ignoring unknown extension specified for {file_name}"); diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml index 402752165897..40e2271f4520 100644 --- a/datafusion/datasource/Cargo.toml +++ b/datafusion/datasource/Cargo.toml @@ -64,6 +64,7 @@ itertools = { workspace = true } liblzma = { workspace = true, optional = true } log = { workspace = true } object_store = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } tempfile = { workspace = true, optional = true } tokio = { workspace = true } diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index f4e4e0a4dec0..e75476f6ee55 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup; use crate::{ PartitionedFile, display::FileGroupsDisplay, file::FileSource, file_compression_type::FileCompressionType, file_stream::FileStreamBuilder, - source::DataSource, statistics::MinMaxStatistics, + file_stream::work_source::SharedWorkSource, source::DataSource, + statistics::MinMaxStatistics, }; use arrow::datatypes::FieldRef; use arrow::datatypes::{DataType, Schema, SchemaRef}; @@ -38,6 +39,7 @@ use datafusion_execution::{ }; use datafusion_expr::Operator; +use crate::source::OpenArgs; use datafusion_physical_expr::expressions::{BinaryExpr, Column}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_expr::utils::reassign_expr_columns; @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig { partition: usize, context: Arc, ) -> Result { + self.open_with_args(OpenArgs::new(partition, context)) + } + + fn open_with_args(&self, args: OpenArgs) -> Result { + let OpenArgs { + partition, + 
context, + sibling_state, + } = args; let object_store = context.runtime_env().object_store(&self.object_store_url)?; let batch_size = self .batch_size @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig { let morselizer = source.create_morselizer(object_store, self, partition)?; + // Extract the shared work source from the sibling state if it exists. + // This allows multiple sibling streams to steal work from a single + // shared queue of unopened files. + let shared_work_source = sibling_state + .as_ref() + .and_then(|state| state.downcast_ref::()) + .cloned(); + let stream = FileStreamBuilder::new(self) .with_partition(partition) + .with_shared_work_source(shared_work_source) .with_morselizer(morselizer) .with_metrics(source.metrics()) .build()?; @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig { // Delegate to the file source self.file_source.apply_expressions(f) } + + /// Create any shared state that should be passed between sibling streams + /// during one execution. + /// + /// This returns `None` when sibling streams must not share work, such as + /// when file order must be preserved or the file groups define the output + /// partitioning needed for the rest of the plan + fn create_sibling_state(&self) -> Option> { + if self.preserve_order || self.partitioned_by_file_group { + return None; + } + + Some(Arc::new(SharedWorkSource::from_config(self)) as Arc) + } } impl FileScanConfig { @@ -1368,19 +1402,33 @@ mod tests { use super::*; use crate::TableSchema; + use crate::source::DataSourceExec; use crate::test_util::col; use crate::{ generate_test_files, test_util::MockSource, tests::aggr_test_schema, verify_sort_integrity, }; + use arrow::array::{Int32Array, RecordBatch}; use arrow::datatypes::Field; use datafusion_common::ColumnStatistics; use datafusion_common::stats::Precision; + use datafusion_common::tree_node::TreeNodeRecursion; + use datafusion_common::{Result, assert_batches_eq, internal_err}; + use datafusion_execution::TaskContext; use 
datafusion_expr::SortExpr; + use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_sort_expr; use datafusion_physical_expr::expressions::Literal; use datafusion_physical_expr::projection::ProjectionExpr; + use datafusion_physical_expr::projection::ProjectionExprs; + use datafusion_physical_plan::ExecutionPlan; + use datafusion_physical_plan::execution_plan::collect; + use futures::FutureExt as _; + use futures::StreamExt as _; + use futures::stream; + use object_store::ObjectStore; + use std::fmt::Debug; #[derive(Clone)] struct InexactSortPushdownSource { @@ -1400,7 +1448,7 @@ mod tests { impl FileSource for InexactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { @@ -2288,6 +2336,88 @@ mod tests { assert_eq!(partition_stats.total_byte_size, Precision::Exact(800)); } + /// Regression test for reusing a `DataSourceExec` after its execution-local + /// shared work queue has been drained. + /// + /// This test uses a single file group with two files so the scan creates a + /// shared unopened-file queue. Executing after `reset_state` must recreate + /// the shared queue and return the same rows again. 
+ #[tokio::test] + async fn reset_state_recreates_shared_work_source() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int32, + false, + )])); + let file_source = Arc::new( + MockSource::new(Arc::clone(&schema)) + .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })), + ); + + let config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_group(FileGroup::new(vec![ + PartitionedFile::new("file1.parquet", 100), + PartitionedFile::new("file2.parquet", 100), + ])) + .build(); + + let exec: Arc = DataSourceExec::from_data_source(config); + let task_ctx = Arc::new(TaskContext::default()); + + // Running the same scan after resetting the state, should + // produce the same answer. + let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?; + let reset_exec = exec.reset_state()?; + let second_run = collect(reset_exec, task_ctx).await?; + + let expected = [ + "+-------+", + "| value |", + "+-------+", + "| 1 |", + "| 2 |", + "+-------+", + ]; + assert_batches_eq!(expected, &first_run); + assert_batches_eq!(expected, &second_run); + + Ok(()) + } + + /// Test-only `FileOpener` that turns file names like `file1.parquet` into a + /// single-batch stream containing that numeric value + #[derive(Debug)] + struct ResetStateTestFileOpener { + schema: SchemaRef, + } + + impl crate::file_stream::FileOpener for ResetStateTestFileOpener { + fn open( + &self, + file: PartitionedFile, + ) -> Result { + let value = file + .object_meta + .location + .as_ref() + .trim_start_matches("file") + .trim_end_matches(".parquet") + .parse::() + .expect("invalid test file name"); + let schema = Arc::clone(&self.schema); + Ok(async move { + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from(vec![value]))], + ) + .expect("test batch should be valid"); + Ok(stream::iter(vec![Ok(batch)]).boxed()) + } + .boxed()) + } + } + #[test] fn 
test_output_partitioning_not_partitioned_by_file_group() { let file_schema = aggr_test_schema(); @@ -2476,7 +2606,7 @@ mod tests { impl FileSource for ExactSortPushdownSource { fn create_file_opener( &self, - _object_store: Arc, + _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, ) -> Result> { diff --git a/datafusion/datasource/src/file_stream/builder.rs b/datafusion/datasource/src/file_stream/builder.rs index efe9c39ce3b3..7034e902550a 100644 --- a/datafusion/datasource/src/file_stream/builder.rs +++ b/datafusion/datasource/src/file_stream/builder.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::file_scan_config::FileScanConfig; use crate::file_stream::scan_state::ScanState; +use crate::file_stream::work_source::{SharedWorkSource, WorkSource}; use crate::morsel::{FileOpenerMorselizer, Morselizer}; use datafusion_common::{Result, internal_err}; use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet}; @@ -33,10 +34,11 @@ pub struct FileStreamBuilder<'a> { morselizer: Option>, metrics: Option<&'a ExecutionPlanMetricsSet>, on_error: OnError, + shared_work_source: Option, } impl<'a> FileStreamBuilder<'a> { - /// Create a new builder. + /// Create a new builder for [`FileStream`]. pub fn new(config: &'a FileScanConfig) -> Self { Self { config, @@ -44,6 +46,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer: None, metrics: None, on_error: OnError::Fail, + shared_work_source: None, } } @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> { self } + /// Configure the [`SharedWorkSource`] for sibling work stealing. + pub(crate) fn with_shared_work_source( + mut self, + shared_work_source: Option, + ) -> Self { + self.shared_work_source = shared_work_source; + self + } + /// Build the configured [`FileStream`]. 
pub fn build(self) -> Result { let Self { @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> { morselizer, metrics, on_error, + shared_work_source, } = self; let Some(partition) = partition else { @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> { "FileStreamBuilder invalid partition index: {partition}" ); }; + let work_source = match shared_work_source { + Some(shared) => WorkSource::Shared(shared), + None => WorkSource::Local(file_group.into_inner().into()), + }; let file_stream_metrics = FileStreamMetrics::new(metrics, partition); let scan_state = Box::new(ScanState::new( - file_group.into_inner(), + work_source, config.limit, morselizer, on_error, diff --git a/datafusion/datasource/src/file_stream/mod.rs b/datafusion/datasource/src/file_stream/mod.rs index ff71f1602308..b9aabacf64c1 100644 --- a/datafusion/datasource/src/file_stream/mod.rs +++ b/datafusion/datasource/src/file_stream/mod.rs @@ -24,6 +24,7 @@ mod builder; mod metrics; mod scan_state; +pub(crate) mod work_source; use std::pin::Pin; use std::sync::Arc; @@ -175,6 +176,7 @@ mod tests { IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId, PendingPlannerBuilder, PollsToResolve, }; + use crate::source::DataSource; use crate::tests::make_partition; use crate::{PartitionedFile, TableSchema}; use arrow::array::{AsArray, RecordBatch}; @@ -184,14 +186,22 @@ mod tests { use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{FutureExt as _, StreamExt as _}; + use std::collections::{BTreeMap, VecDeque}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; - use crate::file_stream::{FileOpenFuture, FileOpener, FileStreamBuilder, OnError}; + use crate::file_stream::{ + FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError, + work_source::SharedWorkSource, + }; use crate::test_util::MockSource; use datafusion_common::{assert_batches_eq, exec_err, internal_err}; + /// Test identifier for one 
`FileStream` partition. + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] + struct PartitionId(usize); + /// Test `FileOpener` which will simulate errors during file opening or scanning #[derive(Default)] struct TestOpener { @@ -758,8 +768,8 @@ mod tests { async fn morsel_two_ios_one_batch() -> Result<()> { let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") - .add_plan(PendingPlannerBuilder::new(IoFutureId(1)).build()) - .add_plan(PendingPlannerBuilder::new(IoFutureId(2)).build()) + .add_plan(PendingPlannerBuilder::new(IoFutureId(1))) + .add_plan(PendingPlannerBuilder::new(IoFutureId(2))) .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) .return_none(), ); @@ -871,8 +881,7 @@ mod tests { async fn morsel_ready_child_planner() -> Result<()> { let child_planner = MockPlanner::builder("child planner") .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42)) - .return_none() - .build(); + .return_none(); let test = FileStreamMorselTest::new().with_file( MockPlanner::builder("file1.parquet") @@ -1001,11 +1010,265 @@ mod tests { Ok(()) } - /// Tests how FileStream opens and processes files. 
+ /// Return a morsel test with two partitions: + /// Partition 0: file1, file2, file3 + /// Partition 1: file4 + /// + /// Partition 1 has only 1 file but is polled first, 5 times in a row + fn two_partition_morsel_test() -> FileStreamMorselTest { + FileStreamMorselTest::new() + // Partition 0 has three files + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 1 has only one file, but is polled first + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file4.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201)) + .return_none(), + ) + .with_reads(vec![ + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + PartitionId(1), + ]) + } + + /// Verifies that an idle sibling stream can steal shared files from + /// another stream once it exhausts its own local work. + #[tokio::test] + async fn morsel_shared_files_can_be_stolen() -> Result<()> { + let test = two_partition_morsel_test().with_file_stream_events(false); + + // Partition 0 starts with 3 files, but Partition 1 is polled first. + // Since Partition 1 is polled first, it will run all the files, even those + // that were assigned to Partition 0. 
+ insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that a stream that must preserve order keeps its files local + /// and therefore cannot steal from a sibling shared queue. + #[tokio::test] + async fn morsel_preserve_order_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // preserve-order + let test = two_partition_morsel_test() + .with_preserve_order(true) + .with_file_stream_events(false); + + // Even though Partition 1 is polled first, it cannot steal files + // from partition 0. The three files originally assigned to Partition 0 + // must be evaluated by Partition 0. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that `partitioned_by_file_group` disables shared work stealing. + #[tokio::test] + async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> { + // same fixture as `morsel_shared_files_can_be_stolen` but marked as + // partitioned by file group + let test = two_partition_morsel_test() + .with_partitioned_by_file_group(true) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 101 + Batch: 102 + Batch: 103 + Done + ----- Partition 1 ----- + Batch: 201 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that an empty sibling can immediately steal shared files when + /// it is polled before the stream that originally owned them. 
+ #[tokio::test] + async fn morsel_empty_sibling_can_steal() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Poll the empty sibling first so it steals both files. + .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Ensures that if a sibling is built and polled + /// before another sibling has been built and contributed its files to the + /// shared queue, the first sibling does not finish prematurely. + #[tokio::test] + async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + // Build streams lazily so partition 1 can poll the shared queue + // before partition 0 has contributed its files. Once partition 0 + // is built, a later poll of partition 1 can still steal one of + // them from the shared queue. 
+ .with_build_streams_on_first_read(true) + .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)]) + .with_file_stream_events(false); + + // Partition 1 polls too early once, then later steals one file after + // partition 0 has populated the shared queue. + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Batch: 102 + Done + ----- Partition 1 ----- + Batch: 101 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Verifies that one fast sibling can drain shared files that originated + /// in more than one other partition. + #[tokio::test] + async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> { + let test = FileStreamMorselTest::new() + .with_file_in_partition( + PartitionId(0), + MockPlanner::builder("file1.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101)) + .return_none(), + ) + // Partition 1 has two files + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file2.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102)) + .return_none(), + ) + .with_file_in_partition( + PartitionId(1), + MockPlanner::builder("file3.parquet") + .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103)) + .return_none(), + ) + // Partition 2 starts empty but is polled first, so it should drain + // the shared queue across both sibling partitions. + .with_reads(vec![ + PartitionId(2), + PartitionId(2), + PartitionId(1), + PartitionId(2), + ]) + .with_file_stream_events(false); + + insta::assert_snapshot!(test.run().await.unwrap(), @r" + ----- Partition 0 ----- + Done + ----- Partition 1 ----- + Batch: 103 + Done + ----- Partition 2 ----- + Batch: 101 + Batch: 102 + Done + ----- File Stream Events ----- + (omitted due to with_file_stream_events(false)) + "); + + Ok(()) + } + + /// Tests how one or more `FileStream`s consume morselized file work. 
#[derive(Clone)] struct FileStreamMorselTest { morselizer: MockMorselizer, - file_names: Vec, + partition_files: BTreeMap>, + preserve_order: bool, + partitioned_by_file_group: bool, + file_stream_events: bool, + build_streams_on_first_read: bool, + reads: Vec, limit: Option, } @@ -1014,75 +1277,238 @@ mod tests { fn new() -> Self { Self { morselizer: MockMorselizer::new(), - file_names: vec![], + partition_files: BTreeMap::new(), + preserve_order: false, + partitioned_by_file_group: false, + file_stream_events: true, + build_streams_on_first_read: false, + reads: vec![], limit: None, } } - /// Adds one file and its root planner to the test input. - fn with_file(mut self, planner: impl Into) -> Self { + /// Adds one file and its root planner to partition 0. + fn with_file(self, planner: impl Into) -> Self { + self.with_file_in_partition(PartitionId(0), planner) + } + + /// Adds one file and its root planner to the specified input partition. + fn with_file_in_partition( + mut self, + partition: PartitionId, + planner: impl Into, + ) -> Self { let planner = planner.into(); - self.file_names.push(planner.file_path().to_string()); - self.morselizer = self.morselizer.with_file(planner); + let file_path = planner.file_path().to_string(); + self.morselizer = self.morselizer.with_planner(planner); + self.partition_files + .entry(partition) + .or_default() + .push(file_path); self } - /// Sets a global output limit for the stream. + /// Marks the stream (and all partitions) to preserve the specified file + /// order. + fn with_preserve_order(mut self, preserve_order: bool) -> Self { + self.preserve_order = preserve_order; + self + } + + /// Marks the test scan as pre-partitioned by file group, which should + /// force each stream to keep its own files local. 
+ fn with_partitioned_by_file_group( + mut self, + partitioned_by_file_group: bool, + ) -> Self { + self.partitioned_by_file_group = partitioned_by_file_group; + self + } + + /// Controls whether scheduler events are included in the snapshot. + /// + /// When disabled, `run()` still includes the event section header but + /// replaces the trace with a fixed placeholder so tests can focus only + /// on the output batches. + fn with_file_stream_events(mut self, file_stream_events: bool) -> Self { + self.file_stream_events = file_stream_events; + self + } + + /// Controls whether streams are all built up front or lazily on their + /// first read. + /// + /// The default builds all streams before polling begins, which matches + /// normal execution. Tests may enable lazy creation to model races + /// where one sibling polls before another has contributed its files to + /// the shared queue. + fn with_build_streams_on_first_read( + mut self, + build_streams_on_first_read: bool, + ) -> Self { + self.build_streams_on_first_read = build_streams_on_first_read; + self + } + + /// Sets the partition polling order. + /// + /// `run()` polls these partitions in the listed order first. After + /// those explicit reads are exhausted, it falls back to round + /// robin across all configured partitions, skipping any streams that + /// have already finished. + /// + /// This allows making early scheduling decisions explicit in a test + /// while avoiding a fully scripted poll trace for the remainder. + fn with_reads(mut self, reads: Vec) -> Self { + self.reads = reads; + self + } + + /// Sets a global output limit for all streams created by this test. fn with_limit(mut self, limit: usize) -> Self { self.limit = Some(limit); self } - /// Runs the test returns combined output and scheduler trace text as a String. + /// Runs the test and returns combined stream output and scheduler + /// trace text. 
async fn run(self) -> Result { let observer = self.morselizer.observer().clone(); observer.clear(); - let config = self.test_config(); let metrics_set = ExecutionPlanMetricsSet::new(); - let mut stream = FileStreamBuilder::new(&config) - .with_partition(0) - .with_morselizer(Box::new(self.morselizer)) - .with_metrics(&metrics_set) - .build()?; + let partition_count = self.num_partitions(); - let mut stream_contents = Vec::new(); - while let Some(result) = stream.next().await { - match result { - Ok(batch) => { - let col = batch.column(0).as_primitive::(); - let batch_id = col.value(0); - stream_contents.push(format!("Batch: {batch_id}")); - } - Err(e) => { - // Pull the actual message for external errors rather than - // relying on DataFusionError formatting, which changes - // if backtraces are enabled, etc - let message = if let DataFusionError::External(generic) = e { - generic.to_string() - } else { - e.to_string() - }; - stream_contents.push(format!("Error: {message}")); - } + let mut partitions = (0..partition_count) + .map(|_| PartitionState::new()) + .collect::>(); + + let mut build_order = Vec::new(); + for partition in self.reads.iter().map(|partition| partition.0) { + if !build_order.contains(&partition) { + build_order.push(partition); + } + } + for partition in 0..partition_count { + if !build_order.contains(&partition) { + build_order.push(partition); } } - stream_contents.push("Done".to_string()); - Ok(format!( - "----- Output Stream -----\n{}\n----- File Stream Events -----\n{}", - stream_contents.join("\n"), + let config = self.test_config(); + // `DataSourceExec::execute` creates one execution-local shared + // state object via `create_sibling_state()` and then passes it + // to `open_with_args(...)`. These tests build + // `FileStream`s directly, bypassing `DataSourceExec`, so they must + // perform the same setup explicitly when exercising sibling-stream + // work stealing.
+ let shared_work_source = config.create_sibling_state().and_then(|state| { + state.as_ref().downcast_ref::().cloned() + }); + if !self.build_streams_on_first_read { + for partition in build_order { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partitions[partition].set_stream(stream); + } + } + + let mut initial_reads: VecDeque<_> = self.reads.into(); + let mut next_round_robin = 0; + + while !initial_reads.is_empty() + || partitions.iter().any(PartitionState::is_active) + { + let partition = if let Some(partition) = initial_reads.pop_front() { + partition.0 + } else { + let partition = next_round_robin; + next_round_robin = (next_round_robin + 1) % partition_count.max(1); + partition + }; + + let partition_state = &mut partitions[partition]; + + if self.build_streams_on_first_read && !partition_state.built { + let stream = FileStreamBuilder::new(&config) + .with_partition(partition) + .with_shared_work_source(shared_work_source.clone()) + .with_morselizer(Box::new(self.morselizer.clone())) + .with_metrics(&metrics_set) + .build()?; + partition_state.set_stream(stream); + } + + let Some(stream) = partition_state.stream.as_mut() else { + continue; + }; + + match stream.next().await { + Some(result) => partition_state.push_output(format_result(result)), + None => partition_state.finish(), + } + } + + let output_text = if partition_count == 1 { + format!( + "----- Output Stream -----\n{}", + partitions[0].output.join("\n") + ) + } else { + partitions + .into_iter() + .enumerate() + .map(|(partition, state)| { + format!( + "----- Partition {} -----\n{}", + partition, + state.output.join("\n") + ) + }) + .collect::>() + .join("\n") + }; + + let file_stream_events = if self.file_stream_events { observer.format_events() + } else { + "(omitted due to 
with_file_stream_events(false))".to_string() + }; + + Ok(format!( + "{output_text}\n----- File Stream Events -----\n{file_stream_events}", )) } - /// Builds the `FileScanConfig` for the configured mock file set. + /// Returns the number of configured partitions, including empty ones + /// that appear only in the explicit read schedule. + fn num_partitions(&self) -> usize { + self.partition_files + .keys() + .map(|partition| partition.0 + 1) + .chain(self.reads.iter().map(|partition| partition.0 + 1)) + .max() + .unwrap_or(1) + } + + /// Builds a `FileScanConfig` covering every configured partition. fn test_config(&self) -> FileScanConfig { - let file_group = self - .file_names - .iter() - .map(|name| PartitionedFile::new(name, 10)) - .collect(); + let file_groups = (0..self.num_partitions()) + .map(|partition| { + self.partition_files + .get(&PartitionId(partition)) + .into_iter() + .flat_map(|files| files.iter()) + .map(|name| PartitionedFile::new(name, 10)) + .collect::>() + .into() + }) + .collect::>(); + let table_schema = TableSchema::new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![], @@ -1091,9 +1517,76 @@ mod tests { ObjectStoreUrl::parse("test:///").unwrap(), Arc::new(MockSource::new(table_schema)), ) - .with_file_group(file_group) + .with_file_groups(file_groups) .with_limit(self.limit) + .with_preserve_order(self.preserve_order) + .with_partitioned_by_file_group(self.partitioned_by_file_group) .build() } } + + /// Formats one stream poll result into a stable snapshot line. + fn format_result(result: Result) -> String { + match result { + Ok(batch) => { + let col = batch.column(0).as_primitive::(); + let batch_id = col.value(0); + format!("Batch: {batch_id}") + } + Err(e) => { + // Pull the actual message for external errors rather than + // relying on DataFusionError formatting, which changes if + // backtraces are enabled, etc. 
+ let message = if let DataFusionError::External(generic) = e { + generic.to_string() + } else { + e.to_string() + }; + format!("Error: {message}") + } + } + } + + /// Test-only state for one stream partition in [`FileStreamMorselTest`]. + struct PartitionState { + /// Whether the `FileStream` for this partition has been built yet. + built: bool, + /// The live stream, if this partition has not finished yet. + stream: Option, + /// Snapshot lines produced by this partition. + output: Vec, + } + + impl PartitionState { + /// Create an unbuilt partition with no output yet. + fn new() -> Self { + Self { + built: false, + stream: None, + output: vec![], + } + } + + /// Returns true if this partition might still produce output. + fn is_active(&self) -> bool { + !self.built || self.stream.is_some() + } + + /// Records that this partition's stream has been built. + fn set_stream(&mut self, stream: FileStream) { + self.stream = Some(stream); + self.built = true; + } + + /// Records one formatted output line for this partition. + fn push_output(&mut self, line: String) { + self.output.push(line); + } + + /// Marks this partition as finished. 
+ fn finish(&mut self) { + self.push_output("Done".to_string()); + self.stream = None; + } + } } diff --git a/datafusion/datasource/src/file_stream/scan_state.rs b/datafusion/datasource/src/file_stream/scan_state.rs index 025164c29c8f..fdae1bcf7e07 100644 --- a/datafusion/datasource/src/file_stream/scan_state.rs +++ b/datafusion/datasource/src/file_stream/scan_state.rs @@ -19,7 +19,6 @@ use datafusion_common::internal_datafusion_err; use std::collections::VecDeque; use std::task::{Context, Poll}; -use crate::PartitionedFile; use crate::morsel::{Morsel, MorselPlanner, Morselizer, PendingMorselPlanner}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; @@ -27,6 +26,7 @@ use datafusion_physical_plan::metrics::ScopedTimerGuard; use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt as _}; +use super::work_source::WorkSource; use super::{FileStreamMetrics, OnError}; /// State [`FileStreamState::Scan`]. @@ -45,7 +45,7 @@ use super::{FileStreamMetrics, OnError}; /// # State Transitions /// /// ```text -/// file_iter +/// work_source /// | /// v /// morselizer.plan_file(file) @@ -62,8 +62,8 @@ use super::{FileStreamMetrics, OnError}; /// /// [`FileStreamState::Scan`]: super::FileStreamState::Scan pub(super) struct ScanState { - /// Files that still need to be planned. - file_iter: VecDeque, + /// Unopened files that still need to be planned for this stream. + work_source: WorkSource, /// Remaining row limit, if any. remain: Option, /// The morselizer used to plan files. @@ -76,7 +76,10 @@ pub(super) struct ScanState { ready_morsels: VecDeque>, /// The active reader, if any. reader: Option>>, - /// The currently outstanding I/O, if any. + /// The single planner currently blocked on I/O, if any. + /// + /// Once the I/O completes, yields the next planner and is pushed back + /// onto `ready_planners`. pending_planner: Option, /// Metrics for the active scan queues. 
metrics: FileStreamMetrics, @@ -84,15 +87,14 @@ pub(super) struct ScanState { impl ScanState { pub(super) fn new( - file_iter: impl Into>, + work_source: WorkSource, remain: Option, morselizer: Box, on_error: OnError, metrics: FileStreamMetrics, ) -> Self { - let file_iter = file_iter.into(); Self { - file_iter, + work_source, remain, morselizer, on_error, @@ -170,7 +172,7 @@ impl ScanState { (batch, false) } else { let batch = batch.slice(0, *remain); - let done = 1 + self.file_iter.len(); + let done = 1 + self.work_source.len(); self.metrics.files_processed.add(done); *remain = 0; (batch, true) @@ -263,8 +265,8 @@ impl ScanState { }; } - // No outstanding work remains, so morselize the next unopened file. - let part_file = match self.file_iter.pop_front() { + // No outstanding work remains, so begin planning the next unopened file. + let part_file = match self.work_source.pop_front() { Some(part_file) => part_file, None => return ScanAndReturn::Done(None), }; diff --git a/datafusion/datasource/src/file_stream/work_source.rs b/datafusion/datasource/src/file_stream/work_source.rs new file mode 100644 index 000000000000..4c5d3dd384d2 --- /dev/null +++ b/datafusion/datasource/src/file_stream/work_source.rs @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::VecDeque; +use std::sync::Arc; + +use crate::file_groups::FileGroup; +use crate::file_scan_config::FileScanConfig; +use crate::{FileRange, PartitionedFile, SplittableExt}; +use datafusion_common::Statistics; +use parking_lot::Mutex; + +/// Minimum morsel size in bytes. Morsels smaller than this are combined; +/// files are only split when each half would be at least this large. +const MIN_MORSEL_SIZE: usize = 1024 * 1024; // 1 MiB + +/// Source of work for `ScanState`. +/// +/// Streams that may share work across siblings use [`WorkSource::Shared`], +/// while streams that can not share work (e.g. because they must preserve file +/// order) use [`WorkSource::Local`]. +#[derive(Debug, Clone)] +pub(super) enum WorkSource { + /// Files this stream will plan locally without sharing them. + Local(VecDeque), + /// Files shared with sibling streams. + Shared(SharedWorkSource), +} + +impl WorkSource { + /// Pop the next file to plan from this work source. + /// + /// For shared sources, large files may be split in half when the queue + /// is running low, so idle siblings have work to steal. + pub(super) fn pop_front(&mut self) -> Option { + match self { + Self::Local(files) => files.pop_front(), + Self::Shared(shared) => shared.pop_front(), + } + } + + /// Return the number of files that are still waiting to be planned. + pub(super) fn len(&self) -> usize { + match self { + Self::Local(files) => files.len(), + Self::Shared(shared) => shared.len(), + } + } +} + +/// Shared source of work for sibling `FileStream`s +/// +/// The queue is created once per execution and shared by all reorderable +/// sibling streams for that execution. Whichever stream becomes idle first may +/// take the next unopened file from the front of the queue. 
+/// +/// When the queue is running low (fewer than `2 * target_partitions` items), +/// large files are split in half so idle siblings have work to steal. +/// Conversely, very small files are batched together so each stream processes +/// at least ~1 MiB of data per round-trip. +/// +/// It uses a [`Mutex`] internally to provide thread-safe access +/// to the shared file queue. +#[derive(Debug, Clone)] +pub(crate) struct SharedWorkSource { + inner: Arc, +} + +#[derive(Debug)] +pub(super) struct SharedWorkSourceInner { + files: Mutex>, + target_partitions: usize, + /// Column indices (into the table schema) that the scan projects. + /// `None` means all columns are read. + projected_columns: Option>, + /// Fallback ratio when per-column byte_size stats are absent. + projection_ratio: f64, +} + +impl SharedWorkSource { + /// Create a shared work source for the unopened files in `config`. + pub(crate) fn from_config(config: &FileScanConfig) -> Self { + let target_partitions = config.file_groups.len(); + let total_file_columns = config.file_schema().fields().len().max(1); + let projected_columns = config.file_source.projection().map(|p| { + p.expr_iter() + .filter_map(|e| { + e.as_any() + .downcast_ref::() + .map(|c| c.index()) + }) + .collect::>() + }); + let projection_ratio = projected_columns + .as_ref() + .map(|cols| cols.len() as f64 / total_file_columns as f64) + .unwrap_or(1.0); + + let files = config + .file_groups + .iter() + .flat_map(FileGroup::iter) + .cloned() + .collect(); + Self { + inner: Arc::new(SharedWorkSourceInner { + files: Mutex::new(files), + target_partitions, + projected_columns, + projection_ratio, + }), + } + } + + /// Pop the next file from the shared work queue. + /// + /// **Splitting**: when the remaining queue depth is below + /// `2 * target_partitions` and the file's projected size is at least + /// `2 * MIN_MORSEL_SIZE`, it is split in half and the second half is + /// pushed back onto the queue. 
+ /// + /// **Merging**: when the popped file is below `MIN_MORSEL_SIZE`, + /// adjacent queue entries that refer to the same underlying file are + /// merged (their byte ranges are combined) until the merged result + /// reaches `MIN_MORSEL_SIZE` or no more same-file entries remain. + fn pop_front(&self) -> Option { + let mut files = self.inner.files.lock(); + let mut file = files.pop_front()?; + + let projected_size = self.inner.projected_byte_size(&file); + + // Split large files when the queue is shallow. + let should_split = files.len() < 2 * self.inner.target_partitions + && projected_size >= 2 * MIN_MORSEL_SIZE; + if let (true, Some((first, second))) = (should_split, split_file(&file)) { + files.push_back(second); + return Some(first); + } + + // Merge small same-file ranges until we reach MIN_MORSEL_SIZE. + if projected_size < MIN_MORSEL_SIZE { + merge_same_file(&mut file, &mut files, &self.inner, MIN_MORSEL_SIZE); + } + + Some(file) + } + + /// Return the number of files still waiting in the shared queue. + fn len(&self) -> usize { + self.inner.files.lock().len() + } +} + +impl SharedWorkSourceInner { + /// Estimate the projected byte size for `file`. + /// + /// Uses per-column `byte_size` from [`PartitionedFile::statistics`] when + /// available, otherwise falls back to + /// `raw_file_size * projection_ratio`. + fn projected_byte_size(&self, file: &PartitionedFile) -> usize { + if let (Some(cols), Some(stats)) = + (&self.projected_columns, file.statistics.as_ref()) + { + let col_stats = &stats.column_statistics; + let sum: Option = cols + .iter() + .map(|&idx| { + col_stats + .get(idx) + .and_then(|cs| cs.byte_size.get_value().copied()) + }) + .collect::>>() + .map(|v| v.into_iter().sum()); + if let Some(size) = sum { + return size; + } + } + // Fallback: raw file/range size scaled by projection ratio. 
+ let raw = raw_file_byte_size(file); + (raw as f64 * self.projection_ratio) as usize + } +} + +/// Return the raw on-disk byte size of a [`PartitionedFile`]. +fn raw_file_byte_size(file: &PartitionedFile) -> usize { + let (start, end) = file_range(file); + (end - start) as usize +} + +/// Merge entries from `queue` into `file` while they refer to the same +/// underlying path and the projected size stays below `target`. +fn merge_same_file( + file: &mut PartitionedFile, + queue: &mut VecDeque, + inner: &SharedWorkSourceInner, + target: usize, +) { + let path = &file.object_meta.location; + while inner.projected_byte_size(file) < target { + let same_file = queue + .front() + .is_some_and(|next| next.object_meta.location == *path); + if !same_file { + break; + } + let next = queue.pop_front().unwrap(); + // Extend the byte range to cover both entries. + let (a_start, a_end) = file_range(file); + let (b_start, b_end) = file_range(&next); + file.range = Some(FileRange { + start: a_start.min(b_start), + end: a_end.max(b_end), + }); + // Drop per-file statistics — they no longer match the merged range. + file.statistics = None; + } +} + +/// Return the effective (start, end) byte range of a file. +fn file_range(file: &PartitionedFile) -> (i64, i64) { + match &file.range { + Some(range) => (range.start, range.end), + None => (0, file.object_meta.size as i64), + } +} + +/// Split a file into two halves. +/// +/// If the file's extension is a [`SplittableExt`] (e.g. +/// Parquet row-group selection), the extension is split instead of using +/// byte ranges. Otherwise falls back to byte-range splitting. +/// +/// Statistics are scaled to 50% (inexact) on each half. +fn split_file(file: &PartitionedFile) -> Option<(PartitionedFile, PartitionedFile)> { + let half_stats = file.statistics.as_ref().map(|s| Arc::new(scale_stats(s, 0.5))); + + // Try extension-based splitting (e.g. Parquet row groups). 
+ let ext_split = file + .extensions + .as_ref() + .and_then(|ext| ext.downcast_ref::()) + .and_then(|wrapper| { + let (ext_a, ext_b) = (wrapper.split_fn)(wrapper.inner.as_ref())?; + let wrap = |inner| { + Arc::new(SplittableExt { + inner, + split_fn: wrapper.split_fn, + }) as Arc + }; + Some((wrap(ext_a), wrap(ext_b))) + }); + if let Some((ext_a, ext_b)) = ext_split { + let mut first = file.clone(); + first.extensions = Some(ext_a); + first.statistics.clone_from(&half_stats); + + let mut second = file.clone(); + second.extensions = Some(ext_b); + second.statistics = half_stats; + + return Some((first, second)); + } + + // Fallback: byte-range splitting. + let (start, end) = file_range(file); + if end - start < 2 { + return None; + } + let mid = start + (end - start) / 2; + + let mut first = file.clone(); + first.range = Some(FileRange { start, end: mid }); + first.statistics.clone_from(&half_stats); + + let mut second = file.clone(); + second.range = Some(FileRange { start: mid, end }); + second.statistics = half_stats; + + Some((first, second)) +} + +/// Scale row counts and byte sizes in `stats` by `ratio`, marking +/// everything inexact. 
+fn scale_stats(stats: &Statistics, ratio: f64) -> Statistics { + use datafusion_common::stats::Precision; + let scale = |p: &Precision| match p.get_value() { + Some(&v) => Precision::Inexact((v as f64 * ratio) as usize), + None => Precision::Absent, + }; + Statistics { + num_rows: scale(&stats.num_rows), + total_byte_size: scale(&stats.total_byte_size), + column_statistics: stats + .column_statistics + .iter() + .map(|cs| { + let mut cs = cs.clone().to_inexact(); + cs.byte_size = scale(&cs.byte_size); + cs + }) + .collect(), + } +} diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs index a9600271c28c..4b4a7b59d189 100644 --- a/datafusion/datasource/src/mod.rs +++ b/datafusion/datasource/src/mod.rs @@ -67,6 +67,7 @@ pub use table_schema::TableSchema; #[expect(deprecated)] pub use statistics::add_row_stats; pub use statistics::compute_all_files_statistics; +use std::fmt::Debug; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -97,6 +98,27 @@ impl FileRange { } } +/// A pair of type-erased extension values. +type ExtPair = ( + Arc, + Arc, +); + +/// Wrapper for [`PartitionedFile::extensions`] that supports splitting +/// the extension into two halves. +/// +/// File formats that carry a sub-file selection (e.g. Parquet row groups) +/// can wrap their extension in this struct so that morsel splitting divides +/// the selection instead of falling back to byte-range splitting. +#[derive(Debug)] +pub struct SplittableExt { + /// The original extension value (e.g. `ParquetAccessPlan`). + pub inner: Arc, + /// Splits `inner` in two roughly equal halves. + /// Returns `None` when the extension cannot be split further. + pub split_fn: fn(&dyn std::any::Any) -> Option, +} + #[derive(Debug, Clone)] /// A single file or part of a file that should be read, along with its schema, statistics /// and partition column values that need to be appended to each row. 
diff --git a/datafusion/datasource/src/morsel/mocks.rs b/datafusion/datasource/src/morsel/mocks.rs index cd1fa3732ffe..ceb0e720691a 100644 --- a/datafusion/datasource/src/morsel/mocks.rs +++ b/datafusion/datasource/src/morsel/mocks.rs @@ -295,8 +295,11 @@ impl MockPlanBuilder { } /// Add a ready child planner - pub(crate) fn with_ready_planner(self, ready_planners: MockPlanner) -> Self { - self.with_ready_planners(vec![ready_planners]) + pub(crate) fn with_ready_planner( + self, + ready_planner: impl Into, + ) -> Self { + self.with_ready_planners(vec![ready_planner.into()]) } /// Add ready child planners produced by this planning step. @@ -430,8 +433,9 @@ impl MockMorselizer { &self.observer } - /// Associates a file path with the planner spec used to open it. - pub(crate) fn with_file(mut self, planner: MockPlanner) -> Self { + /// Specify the return planner for the specified file_path + pub(crate) fn with_planner(mut self, planner: impl Into) -> Self { + let planner = planner.into(); self.files.insert(planner.file_path.clone(), planner); self } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 81e15d0a2a09..4bdf7bf795f4 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::fmt; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use datafusion_physical_expr::projection::ProjectionExprs; use datafusion_physical_plan::execution_plan::{ @@ -123,12 +123,23 @@ use datafusion_physical_plan::filter_pushdown::{ /// └─────────────────────┘ /// ``` pub trait DataSource: Send + Sync + Debug { + /// Open the specified output partition and return its stream of + /// [`RecordBatch`]es. + /// + /// This should be used by data sources that do not need any sibling + /// coordination. 
Data sources that want to use per-execution shared state + /// (for example, to reorder work across partitions at runtime) should + /// implement [`Self::open_with_args`] instead. + /// + /// [`RecordBatch`]: arrow::record_batch::RecordBatch fn open( &self, partition: usize, context: Arc, ) -> Result; + fn as_any(&self) -> &dyn Any; + /// Format this source for display in explain plans fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result; @@ -246,6 +257,55 @@ pub trait DataSource: Send + Sync + Debug { ) -> Option> { None } + + /// Create per execution state to share across sibling instances of this + /// data source during one execution. + /// + /// Returns `None` (the default) if this data source has + /// no sibling-shared execution state. + fn create_sibling_state(&self) -> Option> { + None + } + + /// Open a partition using optional sibling-shared execution state. + /// + /// The default implementation ignores the additional state and delegates to + /// [`Self::open`]. + fn open_with_args(&self, args: OpenArgs) -> Result { + self.open(args.partition, args.context) + } +} + +/// Arguments for [`DataSource::open_with_args`] +#[derive(Debug, Clone)] +pub struct OpenArgs { + /// Which partition to open + pub partition: usize, + /// The task context for execution + pub context: Arc, + /// Optional sibling-shared execution state, see + /// [`DataSource::create_sibling_state`] for details. 
+ pub sibling_state: Option>, +} + +impl OpenArgs { + /// Create a new OpenArgs with required arguments + pub fn new(partition: usize, context: Arc) -> Self { + Self { + partition, + context, + sibling_state: None, + } + } + + /// Set sibling shared state + pub fn with_shared_state( + mut self, + sibling_state: Option>, + ) -> Self { + self.sibling_state = sibling_state; + self + } } /// [`ExecutionPlan`] that reads one or more files @@ -266,6 +326,12 @@ pub struct DataSourceExec { data_source: Arc, /// Cached plan properties such as sort order cache: Arc, + /// Per execution state shared across partitions of this plan. + /// + /// Created by [`DataSource::create_sibling_state`] + /// and then passed to + /// [`DataSource::open_with_args`]. + execution_state: Arc>>>, } impl DisplayAs for DataSourceExec { @@ -339,8 +405,15 @@ impl ExecutionPlan for DataSourceExec { partition: usize, context: Arc, ) -> Result { - let stream = self.data_source.open(partition, Arc::clone(&context))?; + let shared_state = self + .execution_state + .get_or_init(|| self.data_source.create_sibling_state()) + .clone(); + let args = OpenArgs::new(partition, Arc::clone(&context)) + .with_shared_state(shared_state); + let stream = self.data_source.open_with_args(args)?; let batch_size = context.session_config().batch_size(); + log::debug!( "Batch splitting enabled for partition {partition}: batch_size={batch_size}" ); @@ -377,8 +450,13 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; let cache = Arc::clone(&self.cache); + let execution_state = Arc::new(OnceLock::new()); - Some(Arc::new(Self { data_source, cache })) + Some(Arc::new(Self { + data_source, + cache, + execution_state, + })) } fn fetch(&self) -> Option { @@ -471,6 +549,12 @@ impl ExecutionPlan for DataSourceExec { as Arc }) } + + fn reset_state(self: Arc) -> Result> { + let mut new_exec = Arc::unwrap_or_clone(self); + 
new_exec.execution_state = Arc::new(OnceLock::new()); + Ok(Arc::new(new_exec)) + } } impl DataSourceExec { @@ -484,6 +568,7 @@ impl DataSourceExec { Self { data_source, cache: Arc::new(cache), + execution_state: Arc::new(OnceLock::new()), } } @@ -495,6 +580,7 @@ impl DataSourceExec { pub fn with_data_source(mut self, data_source: Arc) -> Self { self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; + self.execution_state = Arc::new(OnceLock::new()); self } diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index 3a9e78943b07..5ce0f1419d11 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -34,6 +34,7 @@ pub(crate) struct MockSource { filter: Option>, table_schema: crate::table_schema::TableSchema, projection: crate::projection::SplitProjection, + file_opener: Option>, } impl Default for MockSource { @@ -45,6 +46,7 @@ impl Default for MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } } @@ -57,6 +59,7 @@ impl MockSource { filter: None, projection: crate::projection::SplitProjection::unprojected(&table_schema), table_schema, + file_opener: None, } } @@ -64,6 +67,11 @@ impl MockSource { self.filter = Some(filter); self } + + pub fn with_file_opener(mut self, file_opener: Arc) -> Self { + self.file_opener = Some(file_opener); + self + } } impl FileSource for MockSource { @@ -73,7 +81,9 @@ impl FileSource for MockSource { _base_config: &FileScanConfig, _partition: usize, ) -> Result> { - unimplemented!() + self.file_opener.clone().ok_or_else(|| { + datafusion_common::internal_datafusion_err!("MockSource missing FileOpener") + }) } fn as_any(&self) -> &dyn std::any::Any {