Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,38 @@ impl LogSegment {
Ok(segment)
}

/// Build a [`LogSegment`] containing only recent commit files.
///
/// Lists JSON commit files at or after `start_version`, ignoring any
/// checkpoint files, and keeps at most `max_commits` of the newest ones.
/// This allows reconstructing a pseudo-changelog for a quick preview of
/// recently committed data without replaying from a checkpoint.
pub async fn try_recent_commits(
    table_root: &Path,
    start_version: i64,
    store: &dyn ObjectStore,
    max_commits: usize,
) -> DeltaResult<Self> {
    debug!("try_recent_commits: start_version: {start_version}");
    let log_root = table_root.child("_delta_log");

    // Newest-first listing of commit files from `start_version` onwards.
    let mut commits = list_commit_files(store, &log_root, None, Some(start_version)).await?;
    // Cap how many commits we replay without starting from a checkpoint.
    commits.truncate(max_commits);

    let mut segment = Self {
        version: start_version,
        commit_files: commits.into(),
        checkpoint_files: vec![],
    };
    // Prefer the version derived from the listed files when any were found.
    segment.version = segment.file_version().unwrap_or(start_version);

    segment.validate()?;

    Ok(segment)
}

pub fn validate(&self) -> DeltaResult<()> {
let is_contiguous = self
.commit_files
Expand Down Expand Up @@ -576,6 +608,39 @@ pub(super) async fn list_log_files(
Ok((commit_files, checkpoint_files))
}

/// List relevant commit files, ignoring checkpoints.
///
/// Returns commit file metadata sorted newest-first (descending by path).
/// See `try_recent_commits` for more details on how this is used.
pub(super) async fn list_commit_files(
    fs_client: &dyn ObjectStore,
    log_root: &Path,
    max_version: Option<i64>,
    start_version: Option<i64>,
) -> DeltaResult<Vec<ObjectMeta>> {
    let upper_bound = max_version.unwrap_or(i64::MAX - 1);
    // Listing offset: the zero-padded name of the first version of interest.
    let offset = log_root.child(format!("{:020}", start_version.unwrap_or(0)).as_str());

    let listed = fs_client
        .list_with_offset(Some(log_root), &offset)
        .try_collect::<Vec<_>>()
        .await?;

    // Keep only commit (JSON) files whose version falls inside the requested range.
    let mut commit_files: Vec<ObjectMeta> = listed
        .into_iter()
        .filter(|meta| {
            meta.location.is_commit_file()
                && meta.location.commit_version().unwrap_or(i64::MAX) <= upper_bound
                && meta.location.commit_version() >= start_version
        })
        .collect();

    // NOTE this will sort in reverse order
    commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));

    Ok(commit_files)
}

#[cfg(test)]
pub(super) mod tests {
use delta_kernel::table_features::{ReaderFeature, WriterFeature};
Expand Down
69 changes: 64 additions & 5 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
//!
//!

use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;

use ::serde::{Deserialize, Serialize};
use arrow_array::RecordBatch;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::path::Path;
use object_store::ObjectStore;
use tracing::warn;
use object_store::{ObjectMeta, ObjectStore};
use ::serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};

use self::log_segment::{LogSegment, PathExt};
use self::parse::{read_adds, read_removes};
Expand Down Expand Up @@ -72,7 +73,17 @@ impl Snapshot {
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
let log_segment = LogSegment::try_new(table_root, version, store.as_ref()).await?;
let log_segment = if config.pseudo_cdf {
// we need the full changes for metadata loading
info!(
"Loading table with pseudo-cdf, target version is: {:?}",
version
);
LogSegment::try_new(table_root, None, store.as_ref()).await?
} else {
// classic behaviour, only load metadata up to version
LogSegment::try_new(table_root, version, store.as_ref()).await?
};
let (protocol, metadata) = log_segment.read_metadata(store.clone(), &config).await?;
if metadata.is_none() || protocol.is_none() {
return Err(DeltaTableError::Generic(
Expand All @@ -84,6 +95,54 @@ impl Snapshot {

PROTOCOL.can_read_from_protocol(&protocol)?;

// TODO: extract to helper function
let log_segment = if config.pseudo_cdf {
// unless provided with a start version, we only load the last commit
let start_version = version.unwrap_or(log_segment.version);

// check if we already have loaded the needed version, discarding VACUUM commits
let min_version = log_segment
.commit_files
.iter()
// simple heuristic for ruling out VACUUM commits (in practice ~900 bytes)
// if this fails, we would need to use commits_stream() and look explicitly for adds
.filter(|f| f.size > 1500)
.filter_map(|f| f.location.commit_version())
.min()
.unwrap_or(i64::MAX);

if start_version >= min_version {
let mut recent_commits: VecDeque<ObjectMeta> = log_segment
.commit_files
.iter()
// go back 2 more commits in order to make sure we skip over VACUUM start & end
.filter(|f| f.location.commit_version().unwrap_or(0) >= start_version - 2)
.cloned()
.collect();
recent_commits.truncate(config.pseudo_cdf_max_commits);
debug!("Reusing existing log files: {:?}", recent_commits);
LogSegment {
version: log_segment.version,
commit_files: recent_commits,
checkpoint_files: vec![],
}
} else {
// go back 2 more commits in order to make sure we skip over VACUUM start & end
let start_version = (start_version - 2).max(0);
let segment = LogSegment::try_recent_commits(
table_root,
start_version,
store.as_ref(),
config.pseudo_cdf_max_commits,
)
.await?;
debug!("Version not found after checkpoint, reloading slice from version: {start_version}: {:?}", segment.commit_files);
segment
}
} else {
log_segment
};

Ok(Self {
log_segment,
config,
Expand Down
38 changes: 36 additions & 2 deletions crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ pub struct DeltaTableConfig {
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,

/// Enables "pseudo-changelog" by only loading JSON commit files starting with specified version
/// Please note that the meaning of version or datestring is flipped, instead of loading data
/// UP TO the version, it will now load data added AFTER this version
pub pseudo_cdf: bool,

/// Max number of commits to load
pub pseudo_cdf_max_commits: usize,

/// Controls how many files to buffer from the commit log when updating the table.
/// This defaults to 4 * number of cpus
///
Expand All @@ -57,13 +65,15 @@ pub struct DeltaTableConfig {
pub io_runtime: Option<IORuntime>,

#[delta(skip)]
pub options: HashMap<String, String>
pub options: HashMap<String, String>,
}

impl Default for DeltaTableConfig {
fn default() -> Self {
Self {
require_files: true,
pseudo_cdf: false,
pseudo_cdf_max_commits: 1024,
log_buffer_size: num_cpus::get() * 4,
log_batch_size: 1024,
io_runtime: None,
Expand Down Expand Up @@ -149,6 +159,23 @@ impl DeltaTableBuilder {
self
}

/// Enables pseudo-changelog mode (`pseudo_cdf = true`) on the builder.
pub fn with_pseudo_cdf(mut self) -> Self {
    self.table_config.pseudo_cdf = true;
    self
}

/// Sets `pseudo_cdf_max_commits` to the builder.
///
/// Caps how many commit files are loaded when `pseudo_cdf` is enabled.
///
/// # Errors
///
/// Returns a generic [`DeltaTableError`] when `max_commits` is zero, since
/// loading zero commits would always yield an empty pseudo-changelog.
pub fn with_pseudo_cdf_max_commits(mut self, max_commits: usize) -> DeltaResult<Self> {
    if max_commits == 0 {
        return Err(DeltaTableError::Generic(String::from(
            "Max number of commits should be positive",
        )));
    }
    self.table_config.pseudo_cdf_max_commits = max_commits;
    Ok(self)
}

/// Sets `version` to the builder
pub fn with_version(mut self, version: i64) -> Self {
self.version = DeltaVersion::Version(version);
Expand Down Expand Up @@ -211,7 +238,14 @@ impl DeltaTableBuilder {
storage_options
.clone()
.into_iter()
.map(|(k, v)| (k.strip_prefix("deltalake.").map(ToString::to_string).unwrap_or(k), v))
.map(|(k, v)| {
(
k.strip_prefix("deltalake.")
.map(ToString::to_string)
.unwrap_or(k),
v,
)
})
.map(|(k, v)| {
let needs_trim = v.starts_with("http://")
|| v.starts_with("https://")
Expand Down
Loading