diff --git a/Cargo.lock b/Cargo.lock index bbec97ed7ff3..9e0940b5f068 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2148,6 +2148,7 @@ dependencies = [ "dashmap", "datafusion-common", "datafusion-expr", + "datafusion-object-store-iouring", "datafusion-physical-expr-common", "futures", "insta", @@ -2367,6 +2368,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "datafusion-object-store-iouring" +version = "53.0.0" +dependencies = [ + "async-trait", + "bytes", + "futures", + "io-uring", + "log", + "object_store", + "tempfile", + "tokio", +] + [[package]] name = "datafusion-optimizer" version = "53.0.0" @@ -3766,6 +3781,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "io-uring" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd7bddefd0a8833b88a4b68f90dae22c7450d11b354198baee3874fd811b344" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.12.0" diff --git a/Cargo.toml b/Cargo.toml index 7e75bb59b68f..daf3269e2300 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ members = [ "benchmarks", "datafusion/macros", "datafusion/doc", + "datafusion/object-store-iouring", ] exclude = ["dev/depcheck"] resolver = "2" @@ -140,6 +141,7 @@ datafusion-functions-table = { path = "datafusion/functions-table", version = "5 datafusion-functions-window = { path = "datafusion/functions-window", version = "53.0.0" } datafusion-functions-window-common = { path = "datafusion/functions-window-common", version = "53.0.0" } datafusion-macros = { path = "datafusion/macros", version = "53.0.0" } +datafusion-object-store-iouring = { path = "datafusion/object-store-iouring", version = "53.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "53.0.0", default-features = false } datafusion-physical-expr = { path = 
"datafusion/physical-expr", version = "53.0.0", default-features = false } datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "53.0.0", default-features = false } @@ -153,7 +155,6 @@ datafusion-session = { path = "datafusion/session", version = "53.0.0" } datafusion-spark = { path = "datafusion/spark", version = "53.0.0" } datafusion-sql = { path = "datafusion/sql", version = "53.0.0" } datafusion-substrait = { path = "datafusion/substrait", version = "53.0.0" } - doc-comment = "0.3" env_logger = "0.11" flate2 = "1.1.9" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a2a07d4598b0..be3f06f15b49 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,8 +67,10 @@ default = [ "parquet", "recursive_protection", "sql", + "io-uring", ] encoding_expressions = ["datafusion-functions/encoding_expressions"] +io-uring = ["datafusion-execution/io-uring"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] math_expressions = ["datafusion-functions/math_expressions"] diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 06c84d8acb49..e40addb2c1df 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -41,7 +41,7 @@ workspace = true name = "datafusion_execution" [features] -default = ["sql"] +default = ["sql", "io-uring"] parquet_encryption = [ "parquet/encryption", @@ -50,6 +50,7 @@ arrow_buffer_pool = [ "arrow-buffer/pool", ] sql = [] +io-uring = ["datafusion-object-store-iouring"] [dependencies] arrow = { workspace = true } @@ -58,6 +59,7 @@ async-trait = { workspace = true } dashmap = { workspace = true } datafusion-common = { workspace = true, default-features = false } datafusion-expr = { workspace = true, default-features = false } +datafusion-object-store-iouring = { 
workspace = true, optional = true } datafusion-physical-expr-common = { workspace = true, default-features = false } futures = { workspace = true } log = { workspace = true } diff --git a/datafusion/execution/src/object_store.rs b/datafusion/execution/src/object_store.rs index 22ce1f0cf2bb..5799a678a728 100644 --- a/datafusion/execution/src/object_store.rs +++ b/datafusion/execution/src/object_store.rs @@ -24,7 +24,7 @@ use datafusion_common::{ DataFusionError, Result, exec_err, internal_datafusion_err, not_impl_err, }; use object_store::ObjectStore; -#[cfg(not(target_arch = "wasm32"))] +#[cfg(all(not(target_arch = "wasm32"), not(feature = "io-uring")))] use object_store::local::LocalFileSystem; use std::sync::Arc; use url::Url; @@ -205,11 +205,28 @@ impl Default for DefaultObjectStoreRegistry { } impl DefaultObjectStoreRegistry { - /// This will register [`LocalFileSystem`] to handle `file://` paths + /// This will register a local filesystem object store to handle `file://` paths. + /// + /// When the `io-uring` feature is enabled (Linux only), registers an + /// `IoUringObjectStore` instead, which uses io_uring for batched local + /// file reads (falling back to `LocalFileSystem` if io_uring is unavailable). 
#[cfg(not(target_arch = "wasm32"))] pub fn new() -> Self { let object_stores: DashMap> = DashMap::new(); - object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + + #[cfg(feature = "io-uring")] + { + object_stores.insert( + "file://".to_string(), + Arc::new(datafusion_object_store_iouring::IoUringObjectStore::new()), + ); + } + + #[cfg(not(feature = "io-uring"))] + { + object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + } + Self { object_stores } } @@ -223,7 +240,7 @@ impl DefaultObjectStoreRegistry { /// /// Stores are registered based on the scheme, host and port of the provided URL -/// with a [`LocalFileSystem::new`] automatically registered for `file://` (if the +/// with a local filesystem store automatically registered for `file://` (if the /// target arch is not `wasm32`). /// /// For example: diff --git a/datafusion/object-store-iouring/Cargo.toml b/datafusion/object-store-iouring/Cargo.toml new file mode 100644 index 000000000000..099cbdebcdb5 --- /dev/null +++ b/datafusion/object-store-iouring/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "datafusion-object-store-iouring" +description = "io-uring based ObjectStore for DataFusion local file I/O" +keywords = ["arrow", "query", "sql", "io-uring"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[lints] +workspace = true + +[lib] +name = "datafusion_object_store_iouring" + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +object_store = { workspace = true, features = ["fs"] } +tokio = { workspace = true, features = ["sync"] } + +[target.'cfg(target_os = "linux")'.dependencies] +io-uring = "0.7" + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/datafusion/object-store-iouring/src/lib.rs b/datafusion/object-store-iouring/src/lib.rs new file mode 100644 index 000000000000..8548249e0011 --- /dev/null +++ b/datafusion/object-store-iouring/src/lib.rs @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! io-uring based [`ObjectStore`] implementation for DataFusion.
+//!
+//! Provides [`IoUringObjectStore`] which uses Linux's io_uring interface
+//! for high-performance local file reads. A dedicated thread runs an
+//! io_uring event loop, and read requests are dispatched via channels
+//! from async [`ObjectStore`] methods.
+//!
+//! On non-Linux platforms, [`IoUringObjectStore`] delegates all operations
+//! to [`LocalFileSystem`] without io_uring acceleration.
+//!
+//! # Performance
+//!
+//! The main benefit is **batched syscalls**: multiple byte-range reads
+//! (e.g., Parquet column chunks) are submitted as a single
+//! `io_uring_enter()` call instead of individual `pread()` calls.
+//!
+//! Each store owns one worker thread, and that thread serves one request
+//! at a time, so reads issued concurrently against the same store queue
+//! up behind each other.
+
+#[cfg(target_os = "linux")]
+mod uring;
+
+use std::fmt;
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+#[cfg(target_os = "linux")]
+use futures::StreamExt;
+use futures::stream::BoxStream;
+#[cfg(target_os = "linux")]
+use object_store::ObjectStoreExt;
+use object_store::local::LocalFileSystem;
+use object_store::path::Path;
+#[cfg(target_os = "linux")]
+use object_store::{Attributes, GetResultPayload};
+use object_store::{
+    CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
+};
+
+/// Channel used to hand read requests to the io_uring worker thread.
+#[cfg(target_os = "linux")]
+type UringSender = tokio::sync::mpsc::UnboundedSender<uring::IoCommand>;
+
+/// ObjectStore implementation that uses io_uring for local file reads on Linux.
+///
+/// Write, list, copy, and delete operations are delegated to [`LocalFileSystem`].
+/// Read operations (`get_opts`, `get_ranges`) use a dedicated io_uring thread
+/// for batched I/O. The data is read into freshly allocated buffers which are
+/// handed to the caller as [`Bytes`].
+///
+/// Reads are also delegated to [`LocalFileSystem`] when:
+///
+/// * io_uring is unavailable (non-Linux, or the probe at construction failed),
+/// * the request is head-only, conditional (`if_match`, `if_none_match`,
+///   `if_modified_since`, `if_unmodified_since`) or names a `version`, or
+/// * the location contains percent-escapes, because only [`LocalFileSystem`]
+///   knows how to translate those into on-disk file names.
+///
+/// # Example
+///
+/// ```no_run
+/// use datafusion_object_store_iouring::IoUringObjectStore;
+/// use object_store::ObjectStore;
+///
+/// let store = IoUringObjectStore::new();
+/// ```
+pub struct IoUringObjectStore {
+    inner: Arc<LocalFileSystem>,
+    root: PathBuf,
+    /// `None` when io_uring is unavailable (non-Linux, or EPERM in containers).
+    #[cfg(target_os = "linux")]
+    uring_sender: Option<UringSender>,
+}
+
+impl IoUringObjectStore {
+    /// Create a new `IoUringObjectStore` with root at `/`.
+    ///
+    /// # Panics
+    ///
+    /// See [`Self::new_with_root`].
+    pub fn new() -> Self {
+        Self::new_with_root(PathBuf::from("/"))
+    }
+
+    /// Create a new `IoUringObjectStore` with the given root directory.
+    ///
+    /// On Linux this probes io_uring and, when the probe succeeds, spawns
+    /// one `io-uring-worker` thread that lives until the store is dropped.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `root` is rejected by [`LocalFileSystem::new_with_prefix`],
+    /// or if the worker thread cannot be spawned.
+    pub fn new_with_root(root: PathBuf) -> Self {
+        let inner = if root == std::path::Path::new("/") {
+            Arc::new(LocalFileSystem::new())
+        } else {
+            Arc::new(LocalFileSystem::new_with_prefix(&root).expect("valid root path"))
+        };
+
+        #[cfg(target_os = "linux")]
+        let uring_sender = {
+            // Probe whether io_uring is available (may fail with EPERM
+            // inside Docker / seccomp-restricted environments).
+            match uring::is_available() {
+                Ok(()) => {
+                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+                    std::thread::Builder::new()
+                        .name("io-uring-worker".to_string())
+                        .spawn(move || uring::run_uring_loop(rx))
+                        .expect("failed to spawn io-uring thread");
+                    log::info!("io_uring available — using IoUringObjectStore");
+                    Some(tx)
+                }
+                Err(e) => {
+                    log::warn!(
+                        "io_uring unavailable ({e}), falling back to LocalFileSystem"
+                    );
+                    None
+                }
+            }
+        };
+
+        #[cfg(target_os = "linux")]
+        {
+            Self {
+                inner,
+                root,
+                uring_sender,
+            }
+        }
+
+        #[cfg(not(target_os = "linux"))]
+        {
+            Self { inner, root }
+        }
+    }
+
+    /// Resolve an object_store Path to an absolute filesystem path.
+    ///
+    /// The location is joined verbatim, so this is only correct for
+    /// locations without percent-escapes (see [`Self::uring_for`]).
+    #[cfg(target_os = "linux")]
+    fn resolve_path(&self, location: &Path) -> PathBuf {
+        self.root.join(location.as_ref())
+    }
+
+    /// Returns the worker channel when reads of `location` can be served
+    /// by io_uring, or `None` when they must go through [`LocalFileSystem`].
+    #[cfg(target_os = "linux")]
+    fn uring_for(&self, location: &Path) -> Option<&UringSender> {
+        // `Path` stores characters such as `#` or `%` percent-encoded and
+        // `LocalFileSystem` decodes them when mapping to a file name.
+        // `resolve_path` does not, so such locations are left to the inner
+        // store rather than opened under the wrong name.
+        if location.as_ref().contains('%') {
+            return None;
+        }
+        self.uring_sender.as_ref()
+    }
+}
+
+impl Default for IoUringObjectStore {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl fmt::Debug for IoUringObjectStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("IoUringObjectStore")
+            .field("root", &self.root)
+            .finish()
+    }
+}
+
+impl fmt::Display for IoUringObjectStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "IoUringObjectStore({})", self.root.display())
+    }
+}
+
+// ============================================================
+// Linux: io_uring accelerated reads
+// ============================================================
+
+/// Returns true when `options` carries a precondition or a version, i.e.
+/// anything the io_uring read path does not evaluate itself.
+#[cfg(target_os = "linux")]
+fn is_conditional(options: &GetOptions) -> bool {
+    options.if_match.is_some()
+        || options.if_none_match.is_some()
+        || options.if_modified_since.is_some()
+        || options.if_unmodified_since.is_some()
+        || options.version.is_some()
+}
+
+#[cfg(target_os = "linux")]
+impl IoUringObjectStore {
+    /// Send one batch of byte ranges to the worker thread and wait for
+    /// the buffers, returned in the same order as `ranges`.
+    async fn read_ranges_uring(
+        &self,
+        sender: &UringSender,
+        location: &Path,
+        ranges: Vec<Range<u64>>,
+    ) -> Result<Vec<Bytes>> {
+        let (tx, rx) = tokio::sync::oneshot::channel();
+
+        sender
+            .send(uring::IoCommand::ReadRanges {
+                path: self.resolve_path(location),
+                ranges,
+                response: tx,
+            })
+            .map_err(|_| object_store::Error::Generic {
+                store: "IoUringObjectStore",
+                source: "io-uring worker thread is gone".into(),
+            })?;
+
+        // Outer error: the worker dropped the reply channel.
+        // Inner error: the read itself failed.
+        rx.await.map_err(|_| object_store::Error::Generic {
+            store: "IoUringObjectStore",
+            source: "io-uring response channel dropped".into(),
+        })?
+    }
+
+    /// Serve an unconditional `get_opts` request through io_uring.
+    ///
+    /// Callers must have routed head-only and conditional requests to the
+    /// inner store already; only `options.range` is consulted here.
+    async fn get_opts_uring(
+        &self,
+        sender: &UringSender,
+        location: &Path,
+        options: GetOptions,
+    ) -> Result<GetResult> {
+        // Get file metadata via the inner store
+        let meta = self.inner.head(location).await?;
+
+        // Resolve the requested byte range against the current file size
+        let range = match &options.range {
+            Some(requested) => requested.as_range(meta.size).map_err(|e| {
+                object_store::Error::Generic {
+                    store: "IoUringObjectStore",
+                    source: Box::new(e),
+                }
+            })?,
+            None => 0..meta.size,
+        };
+
+        let bytes = if range.start == range.end {
+            // Empty range — nothing to read
+            Bytes::new()
+        } else {
+            self.read_ranges_uring(sender, location, vec![range.clone()])
+                .await?
+                .pop()
+                .ok_or_else(|| object_store::Error::Generic {
+                    store: "IoUringObjectStore",
+                    source: "io-uring worker returned no data for the requested range"
+                        .into(),
+                })?
+        };
+
+        let stream = futures::stream::once(async move { Ok(bytes) }).boxed();
+
+        Ok(GetResult {
+            payload: GetResultPayload::Stream(stream),
+            meta,
+            range,
+            attributes: Attributes::new(),
+        })
+    }
+
+    /// Serve `get_ranges` through io_uring as a single batch.
+    async fn get_ranges_uring(
+        &self,
+        sender: &UringSender,
+        location: &Path,
+        ranges: &[Range<u64>],
+    ) -> Result<Vec<Bytes>> {
+        if ranges.is_empty() {
+            return Ok(vec![]);
+        }
+
+        self.read_ranges_uring(sender, location, ranges.to_vec())
+            .await
+    }
+}
+
+// ============================================================
+// ObjectStore trait implementation
+// ============================================================
+
+#[async_trait]
+impl ObjectStore for IoUringObjectStore {
+    async fn put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.inner.put_opts(location, payload, opts).await
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        self.inner.put_multipart_opts(location, opts).await
+    }
+
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
+        // Head-only requests don't need io_uring
+        if options.head {
+            return self.inner.get_opts(location, options).await;
+        }
+
+        // Conditional / versioned requests are evaluated by the inner store
+        #[cfg(target_os = "linux")]
+        if let Some(sender) = self
+            .uring_for(location)
+            .filter(|_| !is_conditional(&options))
+        {
+            return self.get_opts_uring(sender, location, options).await;
+        }
+
+        self.inner.get_opts(location, options).await
+    }
+
+    async fn get_ranges(
+        &self,
+        location: &Path,
+        ranges: &[Range<u64>],
+    ) -> Result<Vec<Bytes>> {
+        #[cfg(target_os = "linux")]
+        if let Some(sender) = self.uring_for(location) {
+            return self.get_ranges_uring(sender, location, ranges).await;
+        }
+
+        self.inner.get_ranges(location, ranges).await
+    }
+
+    fn delete_stream(
+        &self,
+        locations: BoxStream<'static, Result<Path>>,
+    ) -> BoxStream<'static, Result<Path>> {
+        self.inner.delete_stream(locations)
+    }
+
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
+        self.inner.list(prefix)
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy_opts(
+        &self,
+        from: &Path,
+        to: &Path,
+        options: CopyOptions,
+    ) -> Result<()> {
+        self.inner.copy_opts(from, to, options).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::ObjectStoreExt;
+
+    #[tokio::test]
+    async fn test_put_and_get() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path = Path::from("test/data.txt");
+        let payload = PutPayload::from_static(b"hello io_uring");
+        store.put(&path, payload).await.unwrap();
+
+        let result = store.get(&path).await.unwrap();
+        let bytes = result.bytes().await.unwrap();
+        assert_eq!(bytes.as_ref(), b"hello io_uring");
+    }
+
+    #[tokio::test]
+    async fn test_get_percent_encoded_name() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        // `#` is stored percent-encoded inside `Path`; reads must resolve
+        // to the same on-disk file that the write created.
+        let path = Path::from("test/a#b.txt");
+        let payload = PutPayload::from_static(b"escaped");
+        store.put(&path, payload).await.unwrap();
+
+        let result = store.get(&path).await.unwrap();
+        let bytes = result.bytes().await.unwrap();
+        assert_eq!(bytes.as_ref(), b"escaped");
+
+        let results = store.get_ranges(&path, &[0..3]).await.unwrap();
+        assert_eq!(results.len(), 1);
+        assert_eq!(results[0].as_ref(), b"esc");
+    }
+
+    #[tokio::test]
+    async fn test_get_range() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path = Path::from("test/range.txt");
+        let payload = PutPayload::from_static(b"0123456789");
+        store.put(&path, payload).await.unwrap();
+
+        let bytes = store.get_range(&path, 2..5).await.unwrap();
+        assert_eq!(bytes.as_ref(), b"234");
+    }
+
+    #[tokio::test]
+    async fn test_get_ranges() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path = Path::from("test/ranges.txt");
+        let payload = PutPayload::from_static(b"0123456789");
+        store.put(&path, payload).await.unwrap();
+
+        let ranges = vec![0..3, 5..8];
+        let results = store.get_ranges(&path, &ranges).await.unwrap();
+        assert_eq!(results.len(), 2);
+        assert_eq!(results[0].as_ref(), b"012");
+        assert_eq!(results[1].as_ref(), b"567");
+    }
+
+    #[tokio::test]
+    async fn test_head() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path = Path::from("test/head.txt");
+        let payload = PutPayload::from_static(b"hello");
+        store.put(&path, payload).await.unwrap();
+
+        let meta = store.head(&path).await.unwrap();
+        assert_eq!(meta.size, 5);
+    }
+
+    #[tokio::test]
+    async fn test_list() {
+        use futures::TryStreamExt;
+
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path1 = Path::from("prefix/a.txt");
+        let path2 = Path::from("prefix/b.txt");
+        store
+            .put(&path1, PutPayload::from_static(b"a"))
+            .await
+            .unwrap();
+        store
+            .put(&path2, PutPayload::from_static(b"b"))
+            .await
+            .unwrap();
+
+        let prefix = Path::from("prefix");
+        let entries: Vec<_> = store.list(Some(&prefix)).try_collect().await.unwrap();
+        assert_eq!(entries.len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_empty_ranges() {
+        let dir = tempfile::tempdir().unwrap();
+        let store = IoUringObjectStore::new_with_root(dir.path().to_path_buf());
+
+        let path = Path::from("test/empty.txt");
+        let payload = PutPayload::from_static(b"data");
+        store.put(&path, payload).await.unwrap();
+
+        let results = store.get_ranges(&path, &[]).await.unwrap();
+        assert!(results.is_empty());
+    }
+}
diff --git a/datafusion/object-store-iouring/src/uring.rs b/datafusion/object-store-iouring/src/uring.rs
new file mode 100644
index 000000000000..9d0702a106bf
--- /dev/null
+++ b/datafusion/object-store-iouring/src/uring.rs
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Dedicated io_uring worker thread.
+//!
+//! Receives [`IoCommand`] requests via an unbounded channel, submits
+//! batched read SQEs to the kernel, collects CQEs, and sends results
+//! back via oneshot channels.
+
+use std::collections::VecDeque;
+use std::fs::File;
+use std::ops::Range;
+use std::os::unix::io::AsRawFd;
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+use bytes::Bytes;
+use io_uring::{IoUring, opcode, types};
+use object_store::Result;
+use tokio::sync::{mpsc, oneshot};
+
+/// Ring size (number of SQ entries). 256 is a good default that
+/// handles typical Parquet column-chunk batches without overflow.
+const RING_ENTRIES: u32 = 256;
+
+/// Largest length requested by a single read SQE. The SQE length field
+/// is a `u32`, so longer ranges are read in several steps.
+const MAX_READ_LEN: usize = 1 << 30;
+
+/// The low 32 bits of an SQE's `user_data` hold the index of the range
+/// being read; the high 32 bits hold the tag of the batch it belongs to.
+const INDEX_MASK: u64 = 0xFFFF_FFFF;
+
+/// Probe whether the kernel supports io_uring (may fail with EPERM in
+/// Docker / seccomp-restricted environments).
+pub(crate) fn is_available() -> std::result::Result<(), std::io::Error> {
+    IoUring::new(2).map(|_| ())
+}
+
+/// Command sent from the async ObjectStore methods to the io_uring thread.
+pub(crate) enum IoCommand {
+    /// Read one or more byte ranges from a file.
+    ///
+    /// The reply holds one buffer per range, in request order, each
+    /// exactly as long as its range. A range that is inverted or that
+    /// extends past the end of the file fails the whole request.
+    ReadRanges {
+        path: PathBuf,
+        ranges: Vec<Range<u64>>,
+        response: oneshot::Sender<Result<Vec<Bytes>>>,
+    },
+}
+
+/// Main loop for the io_uring worker thread.
+///
+/// Blocks on the channel receiver, processes one [`IoCommand`] at a time,
+/// and sends results back. The thread exits when the channel is closed
+/// (i.e., when all senders are dropped).
+pub(crate) fn run_uring_loop(mut rx: mpsc::UnboundedReceiver<IoCommand>) {
+    let mut ring = match IoUring::new(RING_ENTRIES) {
+        Ok(ring) => ring,
+        Err(e) => {
+            log::error!("Failed to create io_uring instance: {e}");
+            // Without a ring nothing can be served: fail every request
+            // until the channel closes so that no caller waits forever.
+            while let Some(cmd) = rx.blocking_recv() {
+                match cmd {
+                    IoCommand::ReadRanges { response, .. } => {
+                        let _ = response.send(Err(object_store::Error::Generic {
+                            store: "IoUringObjectStore",
+                            source: format!("io_uring init failed: {e}").into(),
+                        }));
+                    }
+                }
+            }
+            return;
+        }
+    };
+
+    while let Some(cmd) = rx.blocking_recv() {
+        match cmd {
+            IoCommand::ReadRanges {
+                path,
+                ranges,
+                response,
+            } => {
+                let result = execute_read_ranges(&mut ring, &path, &ranges);
+                // The requester may have been cancelled; that is not an error.
+                let _ = response.send(result);
+            }
+        }
+    }
+
+    log::debug!("io_uring worker thread exiting");
+}
+
+/// Execute a batch of byte-range reads using io_uring.
+///
+/// Opens the file, keeps up to one submission queue's worth of reads in
+/// flight, and returns one buffer per range, in order. Every returned
+/// buffer is exactly as long as its range:
+///
+/// * a read that completes short is resubmitted for the missing tail,
+/// * reaching end-of-file inside a range is an error,
+/// * an inverted range (`end < start`) is an error.
+///
+/// After the first failed read no new reads are queued, but the function
+/// still waits for the reads already handed to the kernel, because those
+/// keep writing into the buffers owned by this call.
+#[allow(clippy::result_large_err)] // object_store::Error is large by design
+fn execute_read_ranges(
+    ring: &mut IoUring,
+    path: &std::path::Path,
+    ranges: &[Range<u64>],
+) -> Result<Vec<Bytes>> {
+    // Tags the completions of this call so that completions left behind
+    // by an abandoned earlier call can be recognised and skipped.
+    static NEXT_BATCH: AtomicU32 = AtomicU32::new(0);
+
+    let file = File::open(path).map_err(|e| {
+        if e.kind() == std::io::ErrorKind::NotFound {
+            object_store::Error::NotFound {
+                path: path.display().to_string(),
+                source: e.into(),
+            }
+        } else {
+            io_err(e)
+        }
+    })?;
+    let fd = file.as_raw_fd();
+
+    // The range index must fit into the low half of `user_data`.
+    if u32::try_from(ranges.len()).is_err() {
+        return Err(invalid_input(format!(
+            "too many ranges in one request: {}",
+            ranges.len()
+        )));
+    }
+
+    // Allocate buffers for all ranges up front, rejecting inverted ranges
+    // and lengths that do not fit into memory on this platform.
+    let mut buffers = ranges
+        .iter()
+        .map(|r| {
+            r.end
+                .checked_sub(r.start)
+                .and_then(|len| usize::try_from(len).ok())
+                .map(|len| vec![0u8; len])
+                .ok_or_else(|| {
+                    invalid_input(format!("invalid byte range {}..{}", r.start, r.end))
+                })
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    // filled[i]: number of bytes of range i read so far
+    let mut filled = vec![0usize; ranges.len()];
+    // Ranges that still need a read to be queued; empty ranges need none
+    let mut pending: VecDeque<usize> = (0..ranges.len())
+        .filter(|&i| !buffers[i].is_empty())
+        .collect();
+
+    let batch = u64::from(NEXT_BATCH.fetch_add(1, Ordering::Relaxed)) << 32;
+    let sq_capacity = ring.params().sq_entries() as usize;
+    // Reads queued or submitted by this call whose CQE is still outstanding
+    let mut in_flight = 0usize;
+    let mut failure: Option<object_store::Error> = None;
+
+    while in_flight > 0 || (failure.is_none() && !pending.is_empty()) {
+        if failure.is_none() {
+            let mut sq = ring.submission();
+            while in_flight < sq_capacity {
+                let Some(idx) = pending.pop_front() else {
+                    break;
+                };
+                let done = filled[idx];
+                let dest = &mut buffers[idx][done..];
+                let len = dest.len().min(MAX_READ_LEN) as u32;
+                let entry = opcode::Read::new(types::Fd(fd), dest.as_mut_ptr(), len)
+                    .offset(ranges[idx].start + done as u64)
+                    .build()
+                    .user_data(batch | idx as u64);
+
+                // SAFETY: `dest` points at `len` writable bytes inside
+                // `buffers[idx]`. That allocation is never resized and is
+                // freed only after the CQE for this entry was consumed
+                // (normal exit of the loop) or not at all (submit failure
+                // below). `file` keeps `fd` open for the same period.
+                if unsafe { sq.push(&entry) }.is_err() {
+                    // Queue is full; flush it first and retry this range.
+                    pending.push_front(idx);
+                    break;
+                }
+                in_flight += 1;
+            }
+            sq.sync();
+        }
+
+        // Flush the queue and, when something of ours is outstanding,
+        // block until at least one completion has arrived.
+        match ring.submit_and_wait(usize::from(in_flight > 0)) {
+            Ok(_) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+            Err(e) => {
+                if in_flight > 0 {
+                    // The kernel may still write into the buffers and we
+                    // can no longer wait for it: leaking them (and the
+                    // descriptor) is the only sound option.
+                    std::mem::forget(buffers);
+                    std::mem::forget(file);
+                }
+                return Err(io_err(e));
+            }
+        }
+
+        // Collect whatever has completed so far
+        for cqe in ring.completion() {
+            let tag = cqe.user_data();
+            if (tag & !INDEX_MASK) != batch {
+                // Completion of an abandoned earlier batch
+                continue;
+            }
+            let idx = (tag & INDEX_MASK) as usize;
+            in_flight -= 1;
+
+            match cqe.result() {
+                n if n > 0 => {
+                    filled[idx] += n as usize;
+                    if filled[idx] < buffers[idx].len() {
+                        // Short read: fetch the rest
+                        pending.push_back(idx);
+                    }
+                }
+                0 => {
+                    // End of file before the range was complete
+                    if failure.is_none() {
+                        failure = Some(io_err(std::io::Error::new(
+                            std::io::ErrorKind::UnexpectedEof,
+                            format!(
+                                "byte range {}..{} extends past the end of {}",
+                                ranges[idx].start,
+                                ranges[idx].end,
+                                path.display()
+                            ),
+                        )));
+                    }
+                }
+                errno => {
+                    let e = std::io::Error::from_raw_os_error(-errno);
+                    if e.kind() == std::io::ErrorKind::Interrupted {
+                        pending.push_back(idx);
+                    } else if failure.is_none() {
+                        failure = Some(io_err(e));
+                    }
+                }
+            }
+        }
+    }
+
+    match failure {
+        Some(e) => Err(e),
+        // Convert buffers to Bytes
+        None => Ok(buffers.into_iter().map(Bytes::from).collect()),
+    }
+}
+
+/// Wrap an I/O error in the store's generic error variant.
+fn io_err(e: std::io::Error) -> object_store::Error {
+    object_store::Error::Generic {
+        store: "IoUringObjectStore",
+        source: e.into(),
+    }
+}
+
+/// Build the error reported for a request that can never be served.
+fn invalid_input(message: String) -> object_store::Error {
+    io_err(std::io::Error::new(
+        std::io::ErrorKind::InvalidInput,
+        message,
+    ))
+}