From 01dc644d3cd7def5b499b056bbc7ee40988c125c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 12:23:22 +0200 Subject: [PATCH 1/4] Add io-uring based ObjectStore for local file I/O MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces `datafusion-object-store-iouring`, a new crate that provides an `IoUringObjectStore` using Linux's io_uring interface for high-performance local file reads. A dedicated thread runs an io_uring event loop, and read requests (`get_opts`, `get_ranges`) are dispatched via channels — enabling batched syscalls where multiple byte-range reads (e.g., Parquet column chunks) are submitted in a single `io_uring_enter()` call instead of individual `pread()` calls. Key design: - Dedicated io_uring worker thread with a 256-entry submission queue - Unbounded mpsc channel for requests, oneshot channels for responses - Range reads batched per-request; chunked if exceeding ring capacity - Write/list/copy/delete operations delegated to LocalFileSystem - On non-Linux platforms, all operations fall back to LocalFileSystem - Feature flag `io-uring` on `datafusion-execution` to opt in Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 26 ++ Cargo.toml | 2 + datafusion/execution/Cargo.toml | 5 + datafusion/execution/src/object_store.rs | 23 +- datafusion/object-store-iouring/Cargo.toml | 50 +++ datafusion/object-store-iouring/src/lib.rs | 408 +++++++++++++++++++ datafusion/object-store-iouring/src/uring.rs | 183 +++++++++ 7 files changed, 694 insertions(+), 3 deletions(-) create mode 100644 datafusion/object-store-iouring/Cargo.toml create mode 100644 datafusion/object-store-iouring/src/lib.rs create mode 100644 datafusion/object-store-iouring/src/uring.rs diff --git a/Cargo.lock b/Cargo.lock index f918b3ae2663d..2ad98622750ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2189,6 +2189,7 @@ dependencies = [ "dashmap", "datafusion-common", "datafusion-expr", + "datafusion-object-store-iouring", "futures", "insta", "log", @@ -2382,6 +2383,20 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "datafusion-object-store-iouring" +version = "47.0.0" +dependencies = [ + "async-trait", + "bytes", + "futures", + "io-uring", + "log", + "object_store", + "tempfile", + "tokio", +] + [[package]] name = "datafusion-optimizer" version = "47.0.0" @@ -3700,6 +3715,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" diff --git a/Cargo.toml b/Cargo.toml index 79bb2f3cc602d..10356d0f938a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ members = [ "benchmarks", "datafusion/macros", "datafusion/doc", + "datafusion/object-store-iouring", ] exclude = ["dev/depcheck"] resolver = "2" @@ -131,6 +132,7 @@ datafusion-functions-table = { path = "datafusion/functions-table", version = "4 datafusion-functions-window = { path = "datafusion/functions-window", version = "47.0.0" } datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "47.0.0" } datafusion-macros = { path = "datafusion/macros", version = "47.0.0" } +datafusion-object-store-iouring = { path = "datafusion/object-store-iouring", version = "47.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "47.0.0", default-features = false } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "47.0.0", default-features = false } datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "47.0.0", default-features = false } diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 5988d3a336602..d8069d032f4c9 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -37,11 +37,16 @@ workspace = true [lib] name = "datafusion_execution" +[features] +default = [] +io-uring = ["datafusion-object-store-iouring"] + [dependencies] arrow = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-object-store-iouring = { workspace = true, optional = true } futures = { workspace = true } log = { workspace = true } object_store = { workspace = true, features = ["fs"] } diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index cd75c9f3c49ee..168c72e1b5137 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -21,7 +21,7 @@ use dashmap::DashMap; use datafusion_common::{exec_err, DataFusionError, Result}; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(all(not(target_arch = "wasm32"), not(feature = "io-uring")))] use object_store::local::LocalFileSystem; use object_store::ObjectStore; use std::sync::Arc; @@ -194,11 +194,28 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths + /// This will register [`LocalFileSystem`] to handle `file://` paths. + /// + /// When the `io-uring` feature is enabled (Linux only), registers an + /// [`IoUringObjectStore`](datafusion_object_store_iouring::IoUringObjectStore) + /// instead, which uses io_uring for batched local file reads. #[cfg(not(target_arch = "wasm32"))] pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); - object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + + #[cfg(feature = "io-uring")] + { + object_stores.insert( + "file://".to_string(), + Arc::new(datafusion_object_store_iouring::IoUringObjectStore::new()), + ); + } + + #[cfg(not(feature = "io-uring"))] + { + object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + } + Self { object_stores } } diff --git a/datafusion/object-store-iouring/Cargo.toml b/datafusion/object-store-iouring/Cargo.toml new file mode 100644 index 0000000000000..099cbdebcdb5d --- /dev/null +++ b/datafusion/object-store-iouring/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-object-store-iouring" +description = "io-uring based ObjectStore for DataFusion local file I/O" +keywords = ["arrow", "query", "sql", "io-uring"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_object_store_iouring" + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +object_store = { workspace = true, features = ["fs"] } +tokio = { workspace = true, features = ["sync"] } + +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = "0.7" + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs new file mode 100644 index 0000000000000..02df227acdd5d --- /dev/null +++ b/datafusion/object-store-iouring/src/lib.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! io-uring based [`ObjectStore`] implementation for DataFusion. +//! +//! Provides [`IoUringObjectStore`] which uses Linux's io_uring interface +//! for high-performance local file reads. A dedicated thread runs an +//! io_uring event loop, and read requests are dispatched via channels +//! from async [`ObjectStore`] methods. +//! +//! On non-Linux platforms, [`IoUringObjectStore`] delegates all operations +//! to [`LocalFileSystem`] without io_uring acceleration. +//! +//! # Performance +//! +//! The main benefit is **batched syscalls**: multiple byte-range reads +//! (e.g., Parquet column chunks) are submitted as a single +//! `io_uring_enter()` call instead of individual `pread()` calls. + +#[cfg(target_os = "linux")] +mod uring; + +use std::fmt; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::path::Path; +#[cfg(target_os = "linux")] +use object_store::{Attributes, GetResultPayload}; +use object_store::{ + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, +}; + +/// ObjectStore implementation that uses io_uring for local file reads on Linux. +/// +/// Write, list, copy, and delete operations are delegated to [`LocalFileSystem`]. +/// Read operations (`get_opts`, `get_ranges`) use a dedicated io_uring thread +/// for batched, zero-copy I/O. +/// +/// # Example +/// +/// ```no_run +/// use datafusion_object_store_iouring::IoUringObjectStore; +/// use object_store::ObjectStore; +/// +/// let store = IoUringObjectStore::new(); +/// ``` +pub struct IoUringObjectStore { + inner: Arc, + root: PathBuf, + #[cfg(target_os = "linux")] + uring_sender: tokio::sync::mpsc::UnboundedSender, +} + +impl IoUringObjectStore { + /// Create a new `IoUringObjectStore` with root at `/`. + pub fn new() -> Self { + Self::new_with_root(PathBuf::from("/")) + } + + /// Create a new `IoUringObjectStore` with the given root directory. + pub fn new_with_root(root: PathBuf) -> Self { + let inner = if root == PathBuf::from("/") { + Arc::new(LocalFileSystem::new()) + } else { + Arc::new(LocalFileSystem::new_with_prefix(&root).expect("valid root path")) + }; + + #[cfg(target_os = "linux")] + { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + std::thread::Builder::new() + .name("io-uring-worker".to_string()) + .spawn(move || uring::run_uring_loop(rx)) + .expect("failed to spawn io-uring thread"); + + Self { + inner, + root, + uring_sender: tx, + } + } + + #[cfg(not(target_os = "linux"))] + { + Self { inner, root } + } + } + + /// Resolve an object_store Path to an absolute filesystem path. + #[cfg(target_os = "linux")] + fn resolve_path(&self, location: &Path) -> PathBuf { + self.root.join(location.as_ref()) + } +} + +impl Default for IoUringObjectStore { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Debug for IoUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("IoUringObjectStore") + .field("root", &self.root) + .finish() + } +} + +impl fmt::Display for IoUringObjectStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "IoUringObjectStore({})", self.root.display()) + } +} + +// ============================================================ +// Linux: io_uring accelerated reads +// ============================================================ + +#[cfg(target_os = "linux")] +impl IoUringObjectStore { + async fn get_opts_uring( + &self, + location: &Path, + options: GetOptions, + ) -> Result { + // Get file metadata via the inner store + let meta = self.inner.head(location).await?; + let file_size = meta.size as u64; + + // Resolve the requested byte range + let range = match &options.range { + Some(r) => { + r.as_range(file_size) + .map_err(|e| object_store::Error::Generic { + store: "IoUringObjectStore", + source: Box::new(e), + })? + } + None => 0..file_size, + }; + + if range.start == range.end { + // Empty range — return an empty stream + let stream = futures::stream::once(async { Ok(Bytes::new()) }).boxed(); + return Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range: range.start as usize..range.end as usize, + attributes: Attributes::new(), + }); + } + + let fs_path = self.resolve_path(location); + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.uring_sender + .send(uring::IoCommand::ReadRanges { + path: fs_path, + ranges: vec![range.clone()], + response: tx, + }) + .map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring worker thread is gone".into(), + })?; + + let mut results = rx.await.map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring response channel dropped".into(), + })??; + + let bytes = results.remove(0); + let stream = futures::stream::once(async { Ok(bytes) }).boxed(); + + Ok(GetResult { + payload: GetResultPayload::Stream(stream), + meta, + range: range.start as usize..range.end as usize, + attributes: Attributes::new(), + }) + } + + async fn get_ranges_uring( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + if ranges.is_empty() { + return Ok(vec![]); + } + + let fs_path = self.resolve_path(location); + let (tx, rx) = tokio::sync::oneshot::channel(); + + self.uring_sender + .send(uring::IoCommand::ReadRanges { + path: fs_path, + ranges: ranges.to_vec(), + response: tx, + }) + .map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring worker thread is gone".into(), + })?; + + rx.await.map_err(|_| object_store::Error::Generic { + store: "IoUringObjectStore", + source: "io-uring response channel dropped".into(), + })? + } +} + +// ============================================================ +// ObjectStore trait implementation +// ============================================================ + +#[async_trait] +impl ObjectStore for IoUringObjectStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: PutOptions, + ) -> Result { + self.inner.put_opts(location, payload, opts).await + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOpts, + ) -> Result> { + self.inner.put_multipart_opts(location, opts).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { + // Head-only requests don't need io_uring + if options.head { + return self.inner.get_opts(location, options).await; + } + + #[cfg(target_os = "linux")] + { + return self.get_opts_uring(location, options).await; + } + + #[cfg(not(target_os = "linux"))] + { + self.inner.get_opts(location, options).await + } + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> Result> { + #[cfg(target_os = "linux")] + { + return self.get_ranges_uring(location, ranges).await; + } + + #[cfg(not(target_os = "linux"))] + { + self.inner.get_ranges(location, ranges).await + } + } + + async fn delete(&self, location: &Path) -> Result<()> { + self.inner.delete(location).await + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { + self.inner.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.inner.copy_if_not_exists(from, to).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use object_store::ObjectStore; + + #[tokio::test] + async fn test_put_and_get() { + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path = Path::from("test/data.txt"); + let payload = PutPayload::from_static(b"hello io_uring"); + store.put(&path, payload).await.unwrap(); + + let result = store.get(&path).await.unwrap(); + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), b"hello io_uring"); + } + + #[tokio::test] + async fn test_get_range() { + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path = Path::from("test/range.txt"); + let payload = PutPayload::from_static(b"0123456789"); + store.put(&path, payload).await.unwrap(); + + let bytes = store.get_range(&path, 2..5).await.unwrap(); + assert_eq!(bytes.as_ref(), b"234"); + } + + #[tokio::test] + async fn test_get_ranges() { + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path = Path::from("test/ranges.txt"); + let payload = PutPayload::from_static(b"0123456789"); + store.put(&path, payload).await.unwrap(); + + let ranges = vec![0..3, 5..8]; + let results = store.get_ranges(&path, &ranges).await.unwrap(); + assert_eq!(results.len(), 2); + assert_eq!(results[0].as_ref(), b"012"); + assert_eq!(results[1].as_ref(), b"567"); + } + + #[tokio::test] + async fn test_head() { + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path = Path::from("test/head.txt"); + let payload = PutPayload::from_static(b"hello"); + store.put(&path, payload).await.unwrap(); + + let meta = store.head(&path).await.unwrap(); + assert_eq!(meta.size, 5); + } + + #[tokio::test] + async fn test_list() { + use futures::TryStreamExt; + + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path1 = Path::from("prefix/a.txt"); + let path2 = Path::from("prefix/b.txt"); + store + .put(&path1, PutPayload::from_static(b"a")) + .await + .unwrap(); + store + .put(&path2, PutPayload::from_static(b"b")) + .await + .unwrap(); + + let prefix = Path::from("prefix"); + let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap(); + assert_eq!(entries.len(), 2); + } + + #[tokio::test] + async fn test_empty_ranges() { + let dir = tempfile::tempdir().unwrap(); + let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf()); + + let path = Path::from("test/empty.txt"); + let payload = PutPayload::from_static(b"data"); + store.put(&path, payload).await.unwrap(); + + let results = store.get_ranges(&path, &[]).await.unwrap(); + assert!(results.is_empty()); + } +} diff --git a/datafusion/object-store-iouring/src/uring.rs b/datafusion/object-store-iouring/src/uring.rs new file mode 100644 index 0000000000000..f9265fbaeeef9 --- /dev/null +++ b/datafusion/object-store-iouring/src/uring.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Dedicated io_uring worker thread. +//! +//! Receives [`IoCommand`] requests via an unbounded channel, submits +//! batched read SQEs to the kernel, collects CQEs, and sends results +//! back via oneshot channels. + +use std::ops::Range; +use std::os::unix::io::AsRawFd; +use std::path::PathBuf; + +use bytes::Bytes; +use io_uring::{opcode, types, IoUring}; +use object_store::Result; +use tokio::sync::{mpsc, oneshot}; + +/// Ring size (number of SQ entries). 256 is a good default that +/// handles typical Parquet column-chunk batches without overflow. +const RING_ENTRIES: u32 = 256; + +/// Command sent from the async ObjectStore methods to the io_uring thread. +pub(crate) enum IoCommand { + /// Read one or more byte ranges from a file. + /// All ranges are submitted as a batch in a single `io_uring_enter()`. + ReadRanges { + path: PathBuf, + ranges: Vec>, + response: oneshot::Sender>>, + }, +} + +/// Main loop for the io_uring worker thread. +/// +/// Blocks on the channel receiver, processes one [`IoCommand`] at a time, +/// and sends results back. The thread exits when the channel is closed +/// (i.e., when all senders are dropped). +pub(crate) fn run_uring_loop(mut rx: mpsc::UnboundedReceiver) { + let mut ring = match IoUring::new(RING_ENTRIES) { + Ok(ring) => ring, + Err(e) => { + log::error!("Failed to create io_uring instance: {e}"); + // Drain and error all pending requests + while let Some(cmd) = rx.blocking_recv() { + match cmd { + IoCommand::ReadRanges { response, .. } => { + let _ = response.send(Err(object_store::Error::Generic { + store: "IoUringObjectStore", + source: format!("io_uring init failed: {e}").into(), + })); + } + } + } + return; + } + }; + + while let Some(cmd) = rx.blocking_recv() { + match cmd { + IoCommand::ReadRanges { + path, + ranges, + response, + } => { + let result = execute_read_ranges(&mut ring, &path, &ranges); + let _ = response.send(result); + } + } + } + + log::debug!("io_uring worker thread exiting"); +} + +/// Execute a batch of byte-range reads using io_uring. +/// +/// Opens the file, submits all read SQEs (chunked by ring capacity), +/// waits for CQEs, and returns the results in order. +fn execute_read_ranges( + ring: &mut IoUring, + path: &std::path::Path, + ranges: &[Range], +) -> Result> { + let file = std::fs::File::open(path).map_err(|e| { + if e.kind() == std::io::ErrorKind::NotFound { + object_store::Error::NotFound { + path: path.display().to_string(), + source: e.into(), + } + } else { + object_store::Error::Generic { + store: "IoUringObjectStore", + source: e.into(), + } + } + })?; + + let fd = file.as_raw_fd(); + + // Allocate buffers for all ranges up front + let mut buffers: Vec> = ranges + .iter() + .map(|r| vec![0u8; (r.end - r.start) as usize]) + .collect(); + + let sq_capacity = ring.params().sq_entries() as usize; + + // Process ranges in chunks that fit the submission queue + for chunk_start in (0..ranges.len()).step_by(sq_capacity) { + let chunk_end = (chunk_start + sq_capacity).min(ranges.len()); + let chunk_len = chunk_end - chunk_start; + + // Submit read SQEs for this chunk + // SAFETY: buffers[i] is valid, properly sized, and lives until + // we collect the corresponding CQE below. + unsafe { + let mut sq = ring.submission(); + for i in chunk_start..chunk_end { + let entry = opcode::Read::new( + types::Fd(fd), + buffers[i].as_mut_ptr(), + buffers[i].len() as u32, + ) + .offset(ranges[i].start) + .build() + .user_data(i as u64); + + // If the SQ is full (shouldn't happen since we chunk), + // drop the queue, submit, and retry. + if sq.push(&entry).is_err() { + drop(sq); + ring.submit().map_err(io_err)?; + sq = ring.submission(); + sq.push(&entry).expect("SQ should have space after submit"); + } + } + sq.sync(); + } + + // Submit and wait for all reads in this chunk to complete + ring.submit_and_wait(chunk_len).map_err(io_err)?; + + // Collect completions + let mut completed = 0; + while completed < chunk_len { + let cq = ring.completion(); + for cqe in cq { + let idx = cqe.user_data() as usize; + let ret = cqe.result(); + if ret < 0 { + return Err(io_err(std::io::Error::from_raw_os_error(-ret))); + } + let bytes_read = ret as usize; + buffers[idx].truncate(bytes_read); + completed += 1; + } + } + } + + // Convert buffers to Bytes + Ok(buffers.into_iter().map(Bytes::from).collect()) +} + +fn io_err(e: std::io::Error) -> object_store::Error { + object_store::Error::Generic { + store: "IoUringObjectStore", + source: e.into(), + } +} From 897740049aa9c45dd3e3fd79b95afca345a7cf2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 13:59:20 +0200 Subject: [PATCH 2/4] Fix Linux compile errors and enable io-uring by default in datafusion - Fix missing `StreamExt` import for `.boxed()` on Linux code path - Fix `GetResult.range` type: `Range` in object_store 0.13.2 - Add `io-uring` feature to datafusion core, forwarding to execution - Add to core's default features so benchmarks get it automatically Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/core/Cargo.toml | 2 ++ datafusion/object-store-iouring/src/lib.rs | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a2a07d4598b0a..be3f06f15b49d 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,8 +67,10 @@ default = [ "parquet", "recursive_protection", "sql", + "io-uring", ] encoding_expressions = ["datafusion-functions/encoding_expressions"] +io-uring = ["datafusion-execution/io-uring"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs index 5670a638a3196..c4399321f3c7f 100644 --- a/datafusion/object-store-iouring/src/lib.rs +++ b/datafusion/object-store-iouring/src/lib.rs @@ -41,6 +41,8 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; +#[cfg(target_os = "linux")] +use futures::StreamExt; use futures::stream::BoxStream; #[cfg(target_os = "linux")] use object_store::ObjectStoreExt; @@ -169,7 +171,7 @@ impl IoUringObjectStore { return Ok(GetResult { payload: GetResultPayload::Stream(stream), meta, - range: range.start as usize..range.end as usize, + range: range.clone(), attributes: Attributes::new(), }); } @@ -199,7 +201,7 @@ impl IoUringObjectStore { Ok(GetResult { payload: GetResultPayload::Stream(stream), meta, - range: range.start as usize..range.end as usize, + range: range.clone(), attributes: Attributes::new(), }) } From 87539f664eeeaf6cf3c3f9bb99aa2467670e9fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 14:25:59 +0200 Subject: [PATCH 3/4] Fall back to LocalFileSystem when io_uring is unavailable CI runners and Docker containers often block io_uring_setup via seccomp filters (EPERM). Instead of failing hard, probe availability at construction time and gracefully fall back to LocalFileSystem for all read operations when io_uring cannot be initialized. Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/object-store-iouring/src/lib.rs | 56 +++++++++++++------- datafusion/object-store-iouring/src/uring.rs | 6 +++ 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs index c4399321f3c7f..aabe9f0ce179d 100644 --- a/datafusion/object-store-iouring/src/lib.rs +++ b/datafusion/object-store-iouring/src/lib.rs @@ -72,8 +72,9 @@ use object_store::{ pub struct IoUringObjectStore { inner: Arc, root: PathBuf, + /// `None` when io_uring is unavailable (non-Linux, or EPERM in containers). #[cfg(target_os = "linux")] - uring_sender: tokio::sync::mpsc::UnboundedSender, + uring_sender: Option>, } impl IoUringObjectStore { @@ -91,17 +92,34 @@ impl IoUringObjectStore { }; #[cfg(target_os = "linux")] - { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - std::thread::Builder::new() - .name("io-uring-worker".to_string()) - .spawn(move || uring::run_uring_loop(rx)) - .expect("failed to spawn io-uring thread"); + let uring_sender = { + // Probe whether io_uring is available (may fail with EPERM + // inside Docker / seccomp-restricted environments). + match uring::is_available() { + Ok(()) => { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + std::thread::Builder::new() + .name("io-uring-worker".to_string()) + .spawn(move || uring::run_uring_loop(rx)) + .expect("failed to spawn io-uring thread"); + log::info!("io_uring available — using IoUringObjectStore"); + Some(tx) + } + Err(e) => { + log::warn!( + "io_uring unavailable ({e}), falling back to LocalFileSystem" + ); + None + } + } + }; + #[cfg(target_os = "linux")] + { Self { inner, root, - uring_sender: tx, + uring_sender, } } @@ -179,7 +197,9 @@ impl IoUringObjectStore { let fs_path = self.resolve_path(location); let (tx, rx) = tokio::sync::oneshot::channel(); - self.uring_sender + // SAFETY: callers check uring_sender.is_some() before calling + let sender = self.uring_sender.as_ref().unwrap(); + sender .send(uring::IoCommand::ReadRanges { path: fs_path, ranges: vec![range.clone()], @@ -218,7 +238,9 @@ impl IoUringObjectStore { let fs_path = self.resolve_path(location); let (tx, rx) = tokio::sync::oneshot::channel(); - self.uring_sender + // SAFETY: callers check uring_sender.is_some() before calling + let sender = self.uring_sender.as_ref().unwrap(); + sender .send(uring::IoCommand::ReadRanges { path: fs_path, ranges: ranges.to_vec(), @@ -266,14 +288,11 @@ impl ObjectStore for IoUringObjectStore { } #[cfg(target_os = "linux")] - { + if self.uring_sender.is_some() { return self.get_opts_uring(location, options).await; } - #[cfg(not(target_os = "linux"))] - { - self.inner.get_opts(location, options).await - } + self.inner.get_opts(location, options).await } async fn get_ranges( @@ -282,14 +301,11 @@ impl ObjectStore for IoUringObjectStore { ranges: &[Range], ) -> Result> { #[cfg(target_os = "linux")] - { + if self.uring_sender.is_some() { return self.get_ranges_uring(location, ranges).await; } - #[cfg(not(target_os = "linux"))] - { - self.inner.get_ranges(location, ranges).await - } + self.inner.get_ranges(location, ranges).await } fn delete_stream( diff --git a/datafusion/object-store-iouring/src/uring.rs b/datafusion/object-store-iouring/src/uring.rs index 12cfe45b941b9..fd4f354eba440 100644 --- a/datafusion/object-store-iouring/src/uring.rs +++ b/datafusion/object-store-iouring/src/uring.rs @@ -34,6 +34,12 @@ use tokio::sync::{mpsc, oneshot}; /// handles typical Parquet column-chunk batches without overflow. const RING_ENTRIES: u32 = 256; +/// Probe whether the kernel supports io_uring (may fail with EPERM in +/// Docker / seccomp-restricted environments). +pub(crate) fn is_available() -> std::result::Result<(), std::io::Error> { + IoUring::new(2).map(|_| ()) +} + /// Command sent from the async ObjectStore methods to the io_uring thread. pub(crate) enum IoCommand { /// Read one or more byte ranges from a file. From 5df85ebe5502006d51799f611793667d77d04012 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Thu, 16 Apr 2026 15:17:14 +0200 Subject: [PATCH 4/4] Fix CI clippy and doc failures - Remove unnecessary `as u64` cast (meta.size is already u64 in 0.13.2) - Allow clippy::result_large_err on execute_read_ranges (object_store::Error is large by design) - Fix broken rustdoc links to LocalFileSystem (cfg-gated away when io-uring feature is enabled) Co-Authored-By: Claude Opus 4.6 (1M context) --- datafusion/execution/src/object_store.rs | 8 ++++---- datafusion/object-store-iouring/src/lib.rs | 2 +- datafusion/object-store-iouring/src/uring.rs | 1 + 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index e79827c2a05e8..5799a678a728e 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -205,11 +205,11 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths. + /// This will register a local filesystem object store to handle `file://` paths. /// /// When the `io-uring` feature is enabled (Linux only), registers an - /// [`IoUringObjectStore`](datafusion_object_store_iouring::IoUringObjectStore) - /// instead, which uses io_uring for batched local file reads. + /// `IoUringObjectStore` instead, which uses io_uring for batched local + /// file reads (falling back to `LocalFileSystem` if io_uring is unavailable). #[cfg(not(target_arch = "wasm32"))] pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); @@ -240,7 +240,7 @@ impl DefaultObjectStoreRegistry { /// /// Stores are registered based on the scheme, host and port of the provided URL -/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the +/// with a local filesystem store automatically registered for `file://` (if the /// target arch is not `wasm32`). /// /// For example: diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs index aabe9f0ce179d..8548249e00116 100644 --- a/datafusion/object-store-iouring/src/lib.rs +++ b/datafusion/object-store-iouring/src/lib.rs @@ -169,7 +169,7 @@ impl IoUringObjectStore { ) -> Result { // Get file metadata via the inner store let meta = self.inner.head(location).await?; - let file_size = meta.size as u64; + let file_size = meta.size; // Resolve the requested byte range let range = match &options.range { diff --git a/datafusion/object-store-iouring/src/uring.rs b/datafusion/object-store-iouring/src/uring.rs index fd4f354eba440..9d0702a106bf1 100644 --- a/datafusion/object-store-iouring/src/uring.rs +++ b/datafusion/object-store-iouring/src/uring.rs @@ -96,6 +96,7 @@ pub(crate) fn run_uring_loop(mut rx: mpsc::UnboundedReceiver) { /// /// Opens the file, submits all read SQEs (chunked by ring capacity), /// waits for CQEs, and returns the results in order. +#[allow(clippy::result_large_err)] // object_store::Error is large by design fn execute_read_ranges( ring: &mut IoUring, path: &std::path::Path,