Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "GPL-3.0"
name = "self_encryption"
readme = "README.md"
repository = "https://github.com/maidsafe/self_encryption"
version = "0.35.0"
version = "0.36.0"

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion nodejs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ crate-type = ["cdylib"]
hex = "0.4.3"
napi = { version = "2.12.2", default-features = false, features = ["napi4", "napi6", "tokio_rt", "serde-json"] }
napi-derive = "2.12.2"
self_encryption = { version = "0.35.0", path = ".." }
self_encryption = { version = "0.36.0", path = ".." }

[build-dependencies]
napi-build = "2.0.1"
19 changes: 17 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub use xor_name::XorName;
pub use self::{
data_map::{ChunkInfo, DataMap},
error::{Error, Result},
stream_decrypt::{streaming_decrypt, DecryptionStream},
stream_decrypt::{streaming_decrypt, streaming_decrypt_with_batch_size, DecryptionStream},
stream_encrypt::{stream_encrypt, ChunkStream, EncryptionStream},
};
use bytes::Bytes;
Expand All @@ -124,13 +124,28 @@ pub use xor_name;
/// Batch size for streaming decrypt chunk fetching.
///
/// Can be overridden by the `STREAM_DECRYPT_BATCH_SIZE` environment variable.
/// Invalid values and `0` fall back to [`DEFAULT_STREAM_DECRYPT_BATCH_SIZE`].
pub static STREAM_DECRYPT_BATCH_SIZE: LazyLock<usize> = LazyLock::new(|| {
std::env::var("STREAM_DECRYPT_BATCH_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(10)
.filter(|n| *n > 0)
.unwrap_or(DEFAULT_STREAM_DECRYPT_BATCH_SIZE)
});

/// Default batch size for streaming decrypt chunk fetching.
///
/// Used when the `STREAM_DECRYPT_BATCH_SIZE` environment variable is unset,
/// fails to parse as a `usize`, or is `0`.
pub const DEFAULT_STREAM_DECRYPT_BATCH_SIZE: usize = 10;

/// Read the current streaming decrypt batch size.
///
/// This reads the legacy `STREAM_DECRYPT_BATCH_SIZE` environment variable on
/// first use (the value is computed once via a `LazyLock` and cached for the
/// lifetime of the process), falling back to
/// [`DEFAULT_STREAM_DECRYPT_BATCH_SIZE`] when the variable is unset or
/// invalid. New code that needs explicit per-stream tuning should use
/// [`streaming_decrypt_with_batch_size`].
pub fn stream_decrypt_batch_size() -> usize {
    // Dereference the lazily-initialized process-wide setting.
    *STREAM_DECRYPT_BATCH_SIZE
}

/// The minimum size (before compression) of data to be self-encrypted, defined as 3B.
pub const MIN_ENCRYPTABLE_BYTES: usize = 3 * MIN_CHUNK_SIZE;

Expand Down
114 changes: 110 additions & 4 deletions src/stream_decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
//! Streaming decryption functionality for memory-efficient processing of large encrypted files.

use crate::{
decrypt::decrypt_chunk, get_root_data_map_parallel, utils::extract_hashes, ChunkInfo, DataMap,
Result, STREAM_DECRYPT_BATCH_SIZE,
decrypt::decrypt_chunk, get_root_data_map_parallel, stream_decrypt_batch_size,
utils::extract_hashes, ChunkInfo, DataMap, Result,
};
use bytes::Bytes;
use std::collections::HashMap;
Expand All @@ -33,6 +33,7 @@ pub struct DecryptionStream<F> {
current_batch_start: usize,
current_batch_chunks: Vec<Bytes>,
current_batch_index: usize,
batch_size: usize,
}

impl<F> DecryptionStream<F>
Expand All @@ -46,6 +47,27 @@ where
/// * `data_map` - The data map containing chunk information
/// * `get_chunk_parallel` - Function to retrieve chunks in parallel
pub fn new(data_map: &DataMap, get_chunk_parallel: F) -> Result<Self> {
Self::new_with_batch_size(data_map, get_chunk_parallel, stream_decrypt_batch_size())
}

/// Creates a new streaming decrypt iterator with an explicit batch size.
///
/// # Arguments
///
/// * `data_map` - The data map containing chunk information
/// * `get_chunk_parallel` - Function to retrieve chunks in parallel
/// * `batch_size` - Number of chunks to fetch/decrypt per batch. Must be greater than 0.
pub fn new_with_batch_size(
data_map: &DataMap,
get_chunk_parallel: F,
batch_size: usize,
) -> Result<Self> {
if batch_size == 0 {
return Err(crate::Error::Generic(
"stream decrypt batch size must be > 0".to_string(),
));
}

let root_map = if data_map.is_child() {
get_root_data_map_parallel(data_map.clone(), &get_chunk_parallel)?
} else {
Expand All @@ -65,6 +87,7 @@ where
current_batch_start: 0,
current_batch_chunks: Vec::new(),
current_batch_index: 0,
batch_size,
})
}

Expand All @@ -74,8 +97,10 @@ where
return Ok(false); // No more chunks
}

let batch_end =
(self.current_batch_start + *STREAM_DECRYPT_BATCH_SIZE).min(self.chunk_infos.len());
let batch_end = self
.current_batch_start
.saturating_add(self.batch_size)
.min(self.chunk_infos.len());
let batch_infos = self
.chunk_infos
.get(self.current_batch_start..batch_end)
Expand Down Expand Up @@ -507,6 +532,21 @@ where
DecryptionStream::new(data_map, get_chunk_parallel)
}

/// Creates a streaming decrypt iterator with an explicit batch size.
///
/// This is the preferred API for callers that need to tune download
/// throughput without relying on process-wide environment variables.
///
/// # Arguments
///
/// * `data_map` - The data map describing the chunks to fetch and decrypt
/// * `get_chunk_parallel` - Function to retrieve chunks in parallel
/// * `batch_size` - Number of chunks fetched/decrypted per batch; must be > 0
///
/// # Errors
///
/// Returns an error when `batch_size` is `0` (rejected by the underlying
/// constructor) or when resolving the root data map fails.
pub fn streaming_decrypt_with_batch_size<F>(
    data_map: &DataMap,
    get_chunk_parallel: F,
    batch_size: usize,
) -> Result<DecryptionStream<F>>
where
    F: Fn(&[(usize, XorName)]) -> Result<Vec<(usize, Bytes)>>,
{
    // Thin wrapper: validation and setup live in `DecryptionStream::new_with_batch_size`.
    DecryptionStream::new_with_batch_size(data_map, get_chunk_parallel, batch_size)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -600,6 +640,71 @@ mod tests {
Ok(())
}

#[test]
// Verifies that an explicit batch size is honored end-to-end: the data round-trips
// through streaming decryption, and every fetch request contains at most
// `batch_size` chunk hashes.
fn test_streaming_decrypt_explicit_batch_size() -> Result<()> {
    // 9 MB of random data produces enough chunks to require multiple batches
    // at batch_size = 2.
    let original_data = random_bytes(9_000_000);
    let (data_map, encrypted_chunks) = encrypt(original_data.clone())?;

    // In-memory chunk store keyed by content hash, standing in for a network.
    let mut storage = HashMap::new();
    for chunk in encrypted_chunks {
        let hash = crate::hash::content_hash(&chunk.content);
        let _ = storage.insert(hash, chunk.content.to_vec());
    }

    // RefCell lets the Fn closure record each request's size without &mut capture.
    let observed_batch_sizes = std::cell::RefCell::new(Vec::new());
    let get_chunks = |hashes: &[(usize, XorName)]| -> Result<Vec<(usize, Bytes)>> {
        observed_batch_sizes.borrow_mut().push(hashes.len());
        let mut results = Vec::new();
        for &(index, hash) in hashes {
            if let Some(data) = storage.get(&hash) {
                results.push((index, Bytes::from(data.clone())));
            } else {
                return Err(Error::Generic(format!(
                    "Chunk not found: {}",
                    hex::encode(hash)
                )));
            }
        }
        Ok(results)
    };

    // Decrypt with an explicit batch size of 2 and reassemble the plaintext.
    let stream = streaming_decrypt_with_batch_size(&data_map, get_chunks, 2)?;
    let mut decrypted_data = Vec::new();
    for chunk_result in stream {
        let chunk = chunk_result?;
        decrypted_data.extend_from_slice(&chunk);
    }

    // Round-trip check: decrypted bytes must match the original input exactly.
    assert_eq!(decrypted_data, original_data.to_vec());
    let observed = observed_batch_sizes.borrow();
    // With batch_size = 2 and this much data, fetching must have been split
    // across several requests.
    assert!(
        observed.len() >= 2,
        "expected multiple fetch batches, got {:?}",
        observed
    );
    // No single request may exceed the explicit batch size.
    assert!(
        observed.iter().all(|size| *size <= 2),
        "all batches should respect explicit size, got {:?}",
        observed
    );
    Ok(())
}

#[test]
// A batch size of zero must be rejected up front, before any chunks are fetched.
fn test_streaming_decrypt_explicit_batch_size_rejects_zero() -> Result<()> {
    let (data_map, _encrypted_chunks) = encrypt(random_bytes(50_000))?;

    // The fetcher should never actually run; hand back an empty batch if it does.
    let get_chunks =
        |_hashes: &[(usize, XorName)]| -> Result<Vec<(usize, Bytes)>> { Ok(Vec::new()) };

    let err = match streaming_decrypt_with_batch_size(&data_map, get_chunks, 0) {
        Err(err) => err,
        Ok(_) => panic!("zero batch size should fail"),
    };
    // The error message should spell out the `> 0` requirement for callers.
    assert!(err.to_string().contains("> 0"), "got: {}", err);
    Ok(())
}

#[test]
fn test_streaming_decrypt_error_handling() -> Result<()> {
// Create test data
Expand Down Expand Up @@ -979,6 +1084,7 @@ mod tests {
current_batch_start: 0,
current_batch_chunks: Vec::new(),
current_batch_index: 0,
batch_size: stream_decrypt_batch_size(),
};

// Use the new get_chunk_index_from_infos method instead of the utility function
Expand Down
Loading