From c45a4dd5268e13fc077afd41212ab0960c001364 Mon Sep 17 00:00:00 2001
From: Warm Beer
Date: Mon, 11 May 2026 17:19:17 +0200
Subject: [PATCH] feat: add per-stream decrypt batch sizing

Bump self_encryption to 0.36.0 for the new streaming_decrypt_with_batch_size
API.
---
 Cargo.toml            |   2 +-
 nodejs/Cargo.toml     |   2 +-
 src/lib.rs            |  19 ++++++-
 src/stream_decrypt.rs | 114 ++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 268036f72..91d7073f3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,7 +11,7 @@ license = "GPL-3.0"
 name = "self_encryption"
 readme = "README.md"
 repository = "https://github.com/maidsafe/self_encryption"
-version = "0.35.0"
+version = "0.36.0"

 [features]
 default = []
diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml
index 4dae06356..e88cb09b7 100644
--- a/nodejs/Cargo.toml
+++ b/nodejs/Cargo.toml
@@ -12,7 +12,7 @@ crate-type = ["cdylib"]
 hex = "0.4.3"
 napi = { version = "2.12.2", default-features = false, features = ["napi4", "napi6", "tokio_rt", "serde-json"] }
 napi-derive = "2.12.2"
-self_encryption = { version = "0.35.0", path = ".." }
+self_encryption = { version = "0.36.0", path = ".." }

 [build-dependencies]
 napi-build = "2.0.1"
diff --git a/src/lib.rs b/src/lib.rs
index 8dac5038d..043923c63 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -111,7 +111,7 @@ pub use xor_name::XorName;
 pub use self::{
     data_map::{ChunkInfo, DataMap},
     error::{Error, Result},
-    stream_decrypt::{streaming_decrypt, DecryptionStream},
+    stream_decrypt::{streaming_decrypt, streaming_decrypt_with_batch_size, DecryptionStream},
     stream_encrypt::{stream_encrypt, ChunkStream, EncryptionStream},
 };
 use bytes::Bytes;
@@ -124,13 +124,28 @@ pub use xor_name;
 /// Batch size for streaming decrypt chunk fetching.
 ///
 /// Can be overridden by the `STREAM_DECRYPT_BATCH_SIZE` environment variable.
+/// Invalid values and `0` fall back to [`DEFAULT_STREAM_DECRYPT_BATCH_SIZE`].
 pub static STREAM_DECRYPT_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
     std::env::var("STREAM_DECRYPT_BATCH_SIZE")
         .ok()
         .and_then(|s| s.parse().ok())
-        .unwrap_or(10)
+        .filter(|n| *n > 0)
+        .unwrap_or(DEFAULT_STREAM_DECRYPT_BATCH_SIZE)
 });

+/// Default batch size for streaming decrypt chunk fetching.
+pub const DEFAULT_STREAM_DECRYPT_BATCH_SIZE: usize = 10;
+
+/// Read the current streaming decrypt batch size.
+///
+/// This reads the legacy `STREAM_DECRYPT_BATCH_SIZE` environment variable on
+/// first use, falling back to [`DEFAULT_STREAM_DECRYPT_BATCH_SIZE`]. New code
+/// that needs explicit per-stream tuning should use
+/// [`streaming_decrypt_with_batch_size`].
+pub fn stream_decrypt_batch_size() -> usize {
+    *STREAM_DECRYPT_BATCH_SIZE
+}
+
 /// The minimum size (before compression) of data to be self-encrypted, defined as 3B.
 pub const MIN_ENCRYPTABLE_BYTES: usize = 3 * MIN_CHUNK_SIZE;
diff --git a/src/stream_decrypt.rs b/src/stream_decrypt.rs
index efb10e1b3..c34a6f853 100644
--- a/src/stream_decrypt.rs
+++ b/src/stream_decrypt.rs
@@ -9,8 +9,8 @@
 //! Streaming decryption functionality for memory-efficient processing of large encrypted files.

 use crate::{
-    decrypt::decrypt_chunk, get_root_data_map_parallel, utils::extract_hashes, ChunkInfo, DataMap,
-    Result, STREAM_DECRYPT_BATCH_SIZE,
+    decrypt::decrypt_chunk, get_root_data_map_parallel, stream_decrypt_batch_size,
+    utils::extract_hashes, ChunkInfo, DataMap, Result,
 };
 use bytes::Bytes;
 use std::collections::HashMap;
@@ -33,6 +33,7 @@ pub struct DecryptionStream<F>
     current_batch_start: usize,
     current_batch_chunks: Vec<Bytes>,
     current_batch_index: usize,
+    batch_size: usize,
 }

 impl<F> DecryptionStream<F>
 where
@@ -46,6 +47,27 @@
     /// Creates a new streaming decrypt iterator.
     ///
     /// # Arguments
     ///
     /// * `data_map` - The data map containing chunk information
     /// * `get_chunk_parallel` - Function to retrieve chunks in parallel
     pub fn new(data_map: &DataMap, get_chunk_parallel: F) -> Result<Self> {
+        Self::new_with_batch_size(data_map, get_chunk_parallel, stream_decrypt_batch_size())
+    }
+
+    /// Creates a new streaming decrypt iterator with an explicit batch size.
+    ///
+    /// # Arguments
+    ///
+    /// * `data_map` - The data map containing chunk information
+    /// * `get_chunk_parallel` - Function to retrieve chunks in parallel
+    /// * `batch_size` - Number of chunks to fetch/decrypt per batch. Must be greater than 0.
+    pub fn new_with_batch_size(
+        data_map: &DataMap,
+        get_chunk_parallel: F,
+        batch_size: usize,
+    ) -> Result<Self> {
+        if batch_size == 0 {
+            return Err(crate::Error::Generic(
+                "stream decrypt batch size must be > 0".to_string(),
+            ));
+        }
+
         let root_map = if data_map.is_child() {
             get_root_data_map_parallel(data_map.clone(), &get_chunk_parallel)?
         } else {
@@ -65,6 +87,7 @@
             current_batch_start: 0,
             current_batch_chunks: Vec::new(),
             current_batch_index: 0,
+            batch_size,
         })
     }

@@ -74,8 +97,10 @@
             return Ok(false); // No more chunks
         }

-        let batch_end =
-            (self.current_batch_start + *STREAM_DECRYPT_BATCH_SIZE).min(self.chunk_infos.len());
+        let batch_end = self
+            .current_batch_start
+            .saturating_add(self.batch_size)
+            .min(self.chunk_infos.len());
         let batch_infos = self
             .chunk_infos
             .get(self.current_batch_start..batch_end)
@@ -507,6 +532,21 @@
     DecryptionStream::new(data_map, get_chunk_parallel)
 }

+/// Creates a streaming decrypt iterator with an explicit batch size.
+///
+/// This is the preferred API for callers that need to tune download
+/// throughput without relying on process-wide environment variables.
+pub fn streaming_decrypt_with_batch_size<F>(
+    data_map: &DataMap,
+    get_chunk_parallel: F,
+    batch_size: usize,
+) -> Result<DecryptionStream<F>>
+where
+    F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
+{
+    DecryptionStream::new_with_batch_size(data_map, get_chunk_parallel, batch_size)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -600,6 +640,71 @@
         Ok(())
     }

+    #[test]
+    fn test_streaming_decrypt_explicit_batch_size() -> Result<()> {
+        let original_data = random_bytes(9_000_000);
+        let (data_map, encrypted_chunks) = encrypt(original_data.clone())?;
+
+        let mut storage = HashMap::new();
+        for chunk in encrypted_chunks {
+            let hash = XorName::from_content(&chunk.content);
+            let _ = storage.insert(hash, chunk.content.to_vec());
+        }
+
+        let observed_batch_sizes = std::cell::RefCell::new(Vec::new());
+        let get_chunks = |hashes: &[(usize, XorName)]| -> Result<Vec<(usize, Bytes)>> {
+            observed_batch_sizes.borrow_mut().push(hashes.len());
+            let mut results = Vec::new();
+            for &(index, hash) in hashes {
+                if let Some(data) = storage.get(&hash) {
+                    results.push((index, Bytes::from(data.clone())));
+                } else {
+                    return Err(Error::Generic(format!(
+                        "Chunk not found: {}",
+                        hex::encode(hash)
+                    )));
+                }
+            }
+            Ok(results)
+        };
+
+        let stream = streaming_decrypt_with_batch_size(&data_map, get_chunks, 2)?;
+        let mut decrypted_data = Vec::new();
+        for chunk_result in stream {
+            let chunk = chunk_result?;
+            decrypted_data.extend_from_slice(&chunk);
+        }
+
+        assert_eq!(decrypted_data, original_data.to_vec());
+        let observed = observed_batch_sizes.borrow();
+        assert!(
+            observed.len() >= 2,
+            "expected multiple fetch batches, got {:?}",
+            observed
+        );
+        assert!(
+            observed.iter().all(|size| *size <= 2),
+            "all batches should respect explicit size, got {:?}",
+            observed
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn test_streaming_decrypt_explicit_batch_size_rejects_zero() -> Result<()> {
+        let original_data = random_bytes(50_000);
+        let (data_map, _encrypted_chunks) = encrypt(original_data)?;
+        let get_chunks =
+            |_hashes: &[(usize, XorName)]| -> Result<Vec<(usize, Bytes)>> { Ok(Vec::new()) };
+
+        let err = match streaming_decrypt_with_batch_size(&data_map, get_chunks, 0) {
+            Ok(_) => panic!("zero batch size should fail"),
+            Err(err) => err,
+        };
+        assert!(err.to_string().contains("> 0"), "got: {}", err);
+        Ok(())
+    }
+
     #[test]
     fn test_streaming_decrypt_error_handling() -> Result<()> {
         // Create test data
@@ -979,6 +1084,7 @@
             current_batch_start: 0,
             current_batch_chunks: Vec::new(),
             current_batch_index: 0,
+            batch_size: stream_decrypt_batch_size(),
         };

         // Use the new get_chunk_index_from_infos method instead of the utility function
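
---

Note for reviewers: below is a minimal end-to-end sketch of the new
streaming_decrypt_with_batch_size API. The in-memory store and the fetch
closure are illustrative stand-ins for real chunk storage and are not part of
this patch; everything else (encrypt, Error, Result, XorName) is the crate's
existing public API.

    use std::collections::HashMap;

    use bytes::Bytes;
    use self_encryption::{encrypt, streaming_decrypt_with_batch_size, Error, Result, XorName};

    fn main() -> Result<()> {
        // Encrypt some data so there is a data map and chunks to fetch back.
        let original = Bytes::from(vec![7u8; 5_000_000]);
        let (data_map, chunks) = encrypt(original.clone())?;

        // Stand-in for real chunk storage, keyed by chunk name (illustrative only).
        let store: HashMap<XorName, Bytes> = chunks
            .into_iter()
            .map(|chunk| (XorName::from_content(&chunk.content), chunk.content))
            .collect();

        // Batch fetcher in the shape DecryptionStream expects:
        // Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>.
        let fetch = |names: &[(usize, XorName)]| -> Result<Vec<(usize, Bytes)>> {
            names
                .iter()
                .map(|&(index, name)| {
                    store
                        .get(&name)
                        .cloned()
                        .map(|content| (index, content))
                        .ok_or_else(|| Error::Generic(format!("missing chunk {:?}", name)))
                })
                .collect()
        };

        // Fetch and decrypt four chunks at a time, independent of the
        // process-wide STREAM_DECRYPT_BATCH_SIZE environment variable.
        let mut plaintext = Vec::new();
        for piece in streaming_decrypt_with_batch_size(&data_map, fetch, 4)? {
            plaintext.extend_from_slice(&piece?);
        }
        assert_eq!(plaintext, original.to_vec());
        Ok(())
    }

A larger batch size trades memory for fewer round trips to storage; the old
streaming_decrypt entry point keeps its env-var-driven default, so existing
callers are unaffected.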