Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 52 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::sort::reverse_row_selection;
use datafusion_common::{Result, assert_eq_or_internal_err};
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use std::sync::Arc;

/// A selection of rows and row groups within a ParquetFile to decode.
///
Expand Down Expand Up @@ -339,6 +340,11 @@ impl ParquetAccessPlan {
self.row_groups
}

/// Return the count of row groups that should be scanned (not skipped).
pub fn scan_count(&self) -> usize {
    self.row_groups
        .iter()
        .filter(|access| access.should_scan())
        .count()
}

/// Prepare this plan and resolve to the final `PreparedAccessPlan`
pub(crate) fn prepare(
self,
Expand Down Expand Up @@ -398,6 +404,52 @@ impl PreparedAccessPlan {
}
}

/// Split a [`ParquetAccessPlan`] in two roughly equal halves by row group.
///
/// This is used as the `split_fn` in [`SplittableExt`] so that morsel
/// splitting divides row groups instead of using byte-range heuristics.
///
/// Returns `None` when `plan` is not a [`ParquetAccessPlan`] or has fewer
/// than two scanned row groups (nothing to divide).
///
/// [`SplittableExt`]: datafusion_datasource::SplittableExt
pub fn split_parquet_access_plan(
    plan: &dyn std::any::Any,
) -> Option<(
    Arc<dyn std::any::Any + Send + Sync>,
    Arc<dyn std::any::Any + Send + Sync>,
)> {
    let plan = plan.downcast_ref::<ParquetAccessPlan>()?;

    // A plan with fewer than two scanned row groups cannot be split.
    let scanned = plan.scan_count();
    if scanned < 2 {
        return None;
    }

    // Walk the row groups until `target` scanned groups have been seen; the
    // index of that group marks the end (inclusive) of the first half.
    let target = scanned / 2;
    let mut scanned_so_far = 0;
    let mut boundary = None;
    for (idx, access) in plan.inner().iter().enumerate() {
        if access.should_scan() {
            scanned_so_far += 1;
            if scanned_so_far >= target {
                boundary = Some(idx);
                break;
            }
        }
    }
    // The guard above guarantees a boundary is found; the `unwrap_or(0)`
    // fallback is unreachable and preserved only for robustness.
    let split_idx = boundary.unwrap_or(0) + 1;

    // Each half starts as a full copy of the plan and then skips the row
    // groups that belong to the other half.
    let mut first_half = plan.inner().to_vec();
    for access in first_half.iter_mut().skip(split_idx) {
        *access = RowGroupAccess::Skip;
    }
    let mut second_half = plan.inner().to_vec();
    for access in second_half.iter_mut().take(split_idx) {
        *access = RowGroupAccess::Skip;
    }

    Some((
        Arc::new(ParquetAccessPlan::new(first_half)),
        Arc::new(ParquetAccessPlan::new(second_half)),
    ))
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
13 changes: 10 additions & 3 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1536,15 +1536,22 @@ fn create_initial_plan(
row_group_count: usize,
) -> Result<ParquetAccessPlan> {
if let Some(extensions) = extensions {
if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
// Try direct ParquetAccessPlan first, then unwrap SplittableExt.
let plan = extensions
.downcast_ref::<ParquetAccessPlan>()
.or_else(|| {
extensions
.downcast_ref::<datafusion_datasource::SplittableExt>()
.and_then(|w| w.inner.downcast_ref::<ParquetAccessPlan>())
});

if let Some(access_plan) = plan {
let plan_len = access_plan.len();
if plan_len != row_group_count {
return exec_err!(
"Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
);
}

// check row group count matches the plan
return Ok(access_plan.clone());
} else {
debug!("DataSourceExec Ignoring unknown extension specified for {file_name}");
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
object_store = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true, optional = true }
tokio = { workspace = true }
Expand Down
136 changes: 133 additions & 3 deletions datafusion/datasource/src/file_scan_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::file_groups::FileGroup;
use crate::{
PartitionedFile, display::FileGroupsDisplay, file::FileSource,
file_compression_type::FileCompressionType, file_stream::FileStreamBuilder,
source::DataSource, statistics::MinMaxStatistics,
file_stream::work_source::SharedWorkSource, source::DataSource,
statistics::MinMaxStatistics,
};
use arrow::datatypes::FieldRef;
use arrow::datatypes::{DataType, Schema, SchemaRef};
Expand All @@ -38,6 +39,7 @@ use datafusion_execution::{
};
use datafusion_expr::Operator;

use crate::source::OpenArgs;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
Expand Down Expand Up @@ -580,6 +582,15 @@ impl DataSource for FileScanConfig {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.open_with_args(OpenArgs::new(partition, context))
}

fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> {
let OpenArgs {
partition,
context,
sibling_state,
} = args;
let object_store = context.runtime_env().object_store(&self.object_store_url)?;
let batch_size = self
.batch_size
Expand All @@ -589,8 +600,17 @@ impl DataSource for FileScanConfig {

let morselizer = source.create_morselizer(object_store, self, partition)?;

// Extract the shared work source from the sibling state if it exists.
// This allows multiple sibling streams to steal work from a single
// shared queue of unopened files.
let shared_work_source = sibling_state
.as_ref()
.and_then(|state| state.downcast_ref::<SharedWorkSource>())
.cloned();

let stream = FileStreamBuilder::new(self)
.with_partition(partition)
.with_shared_work_source(shared_work_source)
.with_morselizer(morselizer)
.with_metrics(source.metrics())
.build()?;
Expand Down Expand Up @@ -991,6 +1011,20 @@ impl DataSource for FileScanConfig {
// Delegate to the file source
self.file_source.apply_expressions(f)
}

/// Create any shared state that should be passed between sibling streams
/// during one execution.
///
/// This returns `None` when sibling streams must not share work, such as
/// when file order must be preserved or the file groups define the output
/// partitioning needed for the rest of the plan.
fn create_sibling_state(&self) -> Option<Arc<dyn Any + Send + Sync>> {
    // Work stealing would break ordering guarantees and file-group-based
    // output partitioning, so opt out of sharing in those cases.
    let can_share = !(self.preserve_order || self.partitioned_by_file_group);
    can_share
        .then(|| Arc::new(SharedWorkSource::from_config(self)) as Arc<dyn Any + Send + Sync>)
}
}

impl FileScanConfig {
Expand Down Expand Up @@ -1368,19 +1402,33 @@ mod tests {

use super::*;
use crate::TableSchema;
use crate::source::DataSourceExec;
use crate::test_util::col;
use crate::{
generate_test_files, test_util::MockSource, tests::aggr_test_schema,
verify_sort_integrity,
};

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::Field;
use datafusion_common::ColumnStatistics;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{Result, assert_batches_eq, internal_err};
use datafusion_execution::TaskContext;
use datafusion_expr::SortExpr;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::create_physical_sort_expr;
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_expr::projection::ProjectionExpr;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::execution_plan::collect;
use futures::FutureExt as _;
use futures::StreamExt as _;
use futures::stream;
use object_store::ObjectStore;
use std::fmt::Debug;

#[derive(Clone)]
struct InexactSortPushdownSource {
Expand All @@ -1400,7 +1448,7 @@ mod tests {
impl FileSource for InexactSortPushdownSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn object_store::ObjectStore>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
Expand Down Expand Up @@ -2288,6 +2336,88 @@ mod tests {
assert_eq!(partition_stats.total_byte_size, Precision::Exact(800));
}

/// Regression test for reusing a `DataSourceExec` after its execution-local
/// shared work queue has been drained.
///
/// This test uses a single file group with two files so the scan creates a
/// shared unopened-file queue. Executing after `reset_state` must recreate
/// the shared queue and return the same rows again.
#[tokio::test]
async fn reset_state_recreates_shared_work_source() -> Result<()> {
    // Single Int32 "value" column; each test file contributes one row
    // whose value is taken from its file name (see ResetStateTestFileOpener).
    let schema = Arc::new(Schema::new(vec![Field::new(
        "value",
        DataType::Int32,
        false,
    )]));
    let file_source = Arc::new(
        MockSource::new(Arc::clone(&schema))
            .with_file_opener(Arc::new(ResetStateTestFileOpener { schema })),
    );

    // Two files in ONE file group so the scan builds a shared unopened-file
    // queue rather than one local queue per partition.
    let config =
        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
            .with_file_group(FileGroup::new(vec![
                PartitionedFile::new("file1.parquet", 100),
                PartitionedFile::new("file2.parquet", 100),
            ]))
            .build();

    let exec: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
    let task_ctx = Arc::new(TaskContext::default());

    // Running the same scan after resetting the state, should
    // produce the same answer.
    let first_run = collect(Arc::clone(&exec), Arc::clone(&task_ctx)).await?;
    let reset_exec = exec.reset_state()?;
    let second_run = collect(reset_exec, task_ctx).await?;

    let expected = [
        "+-------+",
        "| value |",
        "+-------+",
        "| 1 |",
        "| 2 |",
        "+-------+",
    ];
    // Both runs must yield the same rows: the second run proves the reset
    // plan rebuilt its shared work queue instead of seeing it drained.
    assert_batches_eq!(expected, &first_run);
    assert_batches_eq!(expected, &second_run);

    Ok(())
}

/// Test-only `FileOpener` that turns file names like `file1.parquet` into a
/// single-batch stream containing that numeric value.
#[derive(Debug)]
struct ResetStateTestFileOpener {
    // Schema of the batches this opener emits; expected to be the single
    // Int32 "value" column built by the test that constructs this opener.
    schema: SchemaRef,
}

impl crate::file_stream::FileOpener for ResetStateTestFileOpener {
    /// Open a test "file": decode the number embedded in its name (e.g.
    /// `file2.parquet` -> 2) and yield one single-row batch holding it.
    fn open(
        &self,
        file: PartitionedFile,
    ) -> Result<crate::file_stream::FileOpenFuture> {
        // Strip the fixed prefix/suffix and parse what remains; test file
        // names are controlled, so a malformed name is a bug in the test.
        let name = file.object_meta.location.as_ref();
        let digits = name.trim_start_matches("file").trim_end_matches(".parquet");
        let value: i32 = digits.parse().expect("invalid test file name");

        let schema = Arc::clone(&self.schema);
        let fut = async move {
            let batch = RecordBatch::try_new(
                schema,
                vec![Arc::new(Int32Array::from(vec![value]))],
            )
            .expect("test batch should be valid");
            Ok(stream::iter(vec![Ok(batch)]).boxed())
        };
        Ok(fut.boxed())
    }
}

#[test]
fn test_output_partitioning_not_partitioned_by_file_group() {
let file_schema = aggr_test_schema();
Expand Down Expand Up @@ -2476,7 +2606,7 @@ mod tests {
impl FileSource for ExactSortPushdownSource {
fn create_file_opener(
&self,
_object_store: Arc<dyn object_store::ObjectStore>,
_object_store: Arc<dyn ObjectStore>,
_base_config: &FileScanConfig,
_partition: usize,
) -> Result<Arc<dyn crate::file_stream::FileOpener>> {
Expand Down
21 changes: 19 additions & 2 deletions datafusion/datasource/src/file_stream/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use crate::file_scan_config::FileScanConfig;
use crate::file_stream::scan_state::ScanState;
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
Expand All @@ -33,17 +34,19 @@ pub struct FileStreamBuilder<'a> {
morselizer: Option<Box<dyn Morselizer>>,
metrics: Option<&'a ExecutionPlanMetricsSet>,
on_error: OnError,
shared_work_source: Option<SharedWorkSource>,
}

impl<'a> FileStreamBuilder<'a> {
/// Create a new builder for [`FileStream`].
///
/// All optional settings start unset: no partition, no morselizer, no
/// metrics, no shared work source, and errors fail the stream
/// (`OnError::Fail`).
pub fn new(config: &'a FileScanConfig) -> Self {
    Self {
        config,
        partition: None,
        morselizer: None,
        metrics: None,
        on_error: OnError::Fail,
        shared_work_source: None,
    }
}

Expand Down Expand Up @@ -81,6 +84,15 @@ impl<'a> FileStreamBuilder<'a> {
self
}

/// Configure the [`SharedWorkSource`] for sibling work stealing.
///
/// `None` (the default) leaves the stream with its own local work queue;
/// `Some` lets sibling streams pull unopened files from a shared queue.
pub(crate) fn with_shared_work_source(
    mut self,
    shared_work_source: Option<SharedWorkSource>,
) -> Self {
    self.shared_work_source = shared_work_source;
    self
}

/// Build the configured [`FileStream`].
pub fn build(self) -> Result<FileStream> {
let Self {
Expand All @@ -89,6 +101,7 @@ impl<'a> FileStreamBuilder<'a> {
morselizer,
metrics,
on_error,
shared_work_source,
} = self;

let Some(partition) = partition else {
Expand All @@ -106,10 +119,14 @@ impl<'a> FileStreamBuilder<'a> {
"FileStreamBuilder invalid partition index: {partition}"
);
};
let work_source = match shared_work_source {
Some(shared) => WorkSource::Shared(shared),
None => WorkSource::Local(file_group.into_inner().into()),
};

let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
let scan_state = Box::new(ScanState::new(
file_group.into_inner(),
work_source,
config.limit,
morselizer,
on_error,
Expand Down
Loading
Loading