From b1525ce56af7c3aa14379030e907eaf132da2d0a Mon Sep 17 00:00:00 2001 From: Adrian Tanase Date: Mon, 2 Jun 2025 15:44:19 +0300 Subject: [PATCH] [HSTACK] - add support for pseudo-CDF (load recent commits & skip checkpoints) Signed-off-by: Adrian Tanase --- .../core/src/kernel/snapshot/log_segment.rs | 65 +++++++++++++++++ crates/core/src/kernel/snapshot/mod.rs | 69 +++++++++++++++++-- crates/core/src/table/builder.rs | 38 +++++++++- 3 files changed, 165 insertions(+), 7 deletions(-) diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index 32a7a0e27..07a06df79 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -194,6 +194,38 @@ impl LogSegment { Ok(segment) } + /// Try to create a new [`LogSegment`] from a slice of the log. + /// + /// This will create a new [`LogSegment`] from the log with JUST relevant commit log files + /// starting at `start_version` and ending at `end_version`. + /// This ignores check point files, allowing to reconstruct a pseudo-changelog for quick preview + /// of recent data. 
+ pub async fn try_recent_commits( + table_root: &Path, + start_version: i64, + store: &dyn ObjectStore, + max_commits: usize, + ) -> DeltaResult { + debug!("try_recent_commits: start_version: {start_version}",); + let log_url = table_root.child("_delta_log"); + let mut commit_files = + list_commit_files(store, &log_url, None, Some(start_version)).await?; + + // max count of commits to load without starting from checkpoint + commit_files.truncate(max_commits); + + let mut segment = Self { + version: start_version, + commit_files: commit_files.into(), + checkpoint_files: vec![], + }; + segment.version = segment.file_version().unwrap_or(start_version); + + segment.validate()?; + + Ok(segment) + } + pub fn validate(&self) -> DeltaResult<()> { let is_contiguous = self .commit_files @@ -576,6 +608,39 @@ pub(super) async fn list_log_files( Ok((commit_files, checkpoint_files)) } +/// List relevant commit files, ignoring checkpoints +/// +/// See `try_recent_commits` for more details on how this is used +pub(super) async fn list_commit_files( + fs_client: &dyn ObjectStore, + log_root: &Path, + max_version: Option, + start_version: Option, +) -> DeltaResult> { + let max_version = max_version.unwrap_or(i64::MAX - 1); + let start_from = log_root.child(format!("{:020}", start_version.unwrap_or(0)).as_str()); + + let mut commit_files = Vec::with_capacity(25); + + for meta in fs_client + .list_with_offset(Some(log_root), &start_from) + .try_collect::>() + .await? 
+ { + if meta.location.commit_version().unwrap_or(i64::MAX) <= max_version + && meta.location.commit_version() >= start_version + && meta.location.is_commit_file() + { + commit_files.push(meta); + } + } + + // NOTE this will sort in reverse order + commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location)); + + Ok(commit_files) +} + #[cfg(test)] pub(super) mod tests { use delta_kernel::table_features::{ReaderFeature, WriterFeature}; diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 24ca09025..292e7fe50 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -15,17 +15,18 @@ //! //! -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::io::{BufRead, BufReader, Cursor}; use std::sync::Arc; -use ::serde::{Deserialize, Serialize}; use arrow_array::RecordBatch; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; +use itertools::Itertools; use object_store::path::Path; -use object_store::ObjectStore; -use tracing::warn; +use object_store::{ObjectMeta, ObjectStore}; +use ::serde::{Deserialize, Serialize}; +use tracing::{debug, info, warn}; use self::log_segment::{LogSegment, PathExt}; use self::parse::{read_adds, read_removes}; @@ -72,7 +73,17 @@ impl Snapshot { config: DeltaTableConfig, version: Option, ) -> DeltaResult { - let log_segment = LogSegment::try_new(table_root, version, store.as_ref()).await?; + let log_segment = if config.pseudo_cdf { + // we need the full changes for metadata loading + info!( + "Loading table with pseudo-cdf, target version is: {:?}", + version + ); + LogSegment::try_new(table_root, None, store.as_ref()).await? + } else { + // classic behaviour, only load metadata up to version + LogSegment::try_new(table_root, version, store.as_ref()).await? 
+ }; let (protocol, metadata) = log_segment.read_metadata(store.clone(), &config).await?; if metadata.is_none() || protocol.is_none() { return Err(DeltaTableError::Generic( @@ -84,6 +95,54 @@ impl Snapshot { PROTOCOL.can_read_from_protocol(&protocol)?; + // TODO: extract to helper function + let log_segment = if config.pseudo_cdf { + // unless provided with a start version, we only load the last commit + let start_version = version.unwrap_or(log_segment.version); + + // check if we already have loaded the needed version, discarding VACUUM commits + let min_version = log_segment + .commit_files + .iter() + // simple heuristic for ruling out VACUUM commits (in practice ~900 bytes) + // if this fails, we would need to use commits_stream() and look explicitly for adds + .filter(|f| f.size > 1500) + .filter_map(|f| f.location.commit_version()) + .min() + .unwrap_or(i64::MAX); + + if start_version >= min_version { + let mut recent_commits: VecDeque = log_segment + .commit_files + .iter() + // go back 2 more commits in order to make sure we skip over VACUUM start & end + .filter(|f| f.location.commit_version().unwrap_or(0) >= start_version - 2) + .cloned() + .collect(); + recent_commits.truncate(config.pseudo_cdf_max_commits); + debug!("Reusing existing log files: {:?}", recent_commits); + LogSegment { + version: log_segment.version, + commit_files: recent_commits, + checkpoint_files: vec![], + } + } else { + // go back 2 more commits in order to make sure we skip over VACUUM start & end + let start_version = (start_version - 2).max(0); + let segment = LogSegment::try_recent_commits( + table_root, + start_version, + store.as_ref(), + config.pseudo_cdf_max_commits, + ) + .await?; + debug!("Version not found after checkpoint, reloading slice from version: {start_version}: {:?}", segment.commit_files); + segment + } + } else { + log_segment + }; + Ok(Self { log_segment, config, diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index 
6a6da9bf9..2966303b6 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -38,6 +38,14 @@ pub struct DeltaTableConfig { /// Hence, DeltaTable will be loaded with significant memory reduction. pub require_files: bool, + /// Enables "pseudo-changelog" by only loading JSON commit files starting with specified version + /// Please note that the meaning of version or datestring is flipped, instead of loading data + /// UP TO the version, it will now load data added AFTER this version + pub pseudo_cdf: bool, + + /// Max number of commits to load + pub pseudo_cdf_max_commits: usize, + /// Controls how many files to buffer from the commit log when updating the table. /// This defaults to 4 * number of cpus /// @@ -57,13 +65,15 @@ pub io_runtime: Option, #[delta(skip)] - pub options: HashMap + pub options: HashMap, } impl Default for DeltaTableConfig { fn default() -> Self { Self { require_files: true, + pseudo_cdf: false, + pseudo_cdf_max_commits: 1024, log_buffer_size: num_cpus::get() * 4, log_batch_size: 1024, io_runtime: None, @@ -149,6 +159,23 @@ impl DeltaTableBuilder { self } + /// Sets `pseudo_cdf=true` to the builder + pub fn with_pseudo_cdf(mut self) -> Self { + self.table_config.pseudo_cdf = true; + self + } + + /// Sets `pseudo_cdf_max_commits` to the builder; `max_commits` must be positive + pub fn with_pseudo_cdf_max_commits(mut self, max_commits: usize) -> DeltaResult { + if max_commits == 0 { + return Err(DeltaTableError::Generic(String::from( + "Max number of commits should be positive", + ))); + } + self.table_config.pseudo_cdf_max_commits = max_commits; + Ok(self) + } + /// Sets `version` to the builder pub fn with_version(mut self, version: i64) -> Self { self.version = DeltaVersion::Version(version); @@ -211,7 +238,14 @@ impl DeltaTableBuilder { storage_options .clone() .into_iter() - .map(|(k, v)| (k.strip_prefix("deltalake.").map(ToString::to_string).unwrap_or(k), v)) + .map(|(k, v)| { + ( + k.strip_prefix("deltalake.") + 
.map(ToString::to_string) + .unwrap_or(k), + v, + ) + }) .map(|(k, v)| { let needs_trim = v.starts_with("http://") || v.starts_with("https://")