Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 92 additions & 52 deletions src/iceberg/util/snapshot_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ namespace iceberg {

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
const Table& table, int64_t snapshot_id) {
return table.SnapshotById(snapshot_id).and_then([&table](const auto& snapshot) {
return AncestorsOf(table, snapshot);
});
return AncestorsOf(*table.metadata(), snapshot_id);
}

// Collects the ancestors of `snapshot_id`, resolving every snapshot ID through
// `metadata.SnapshotById`.
Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
    const TableMetadata& metadata, int64_t snapshot_id) {
  // Adapt the metadata to the lookup-function overload.
  const auto find_by_id = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
    return metadata.SnapshotById(id);
  };
  return AncestorsOf(snapshot_id, find_by_id);
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
Expand All @@ -54,72 +58,96 @@ Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
});
}

Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
int64_t ancestor_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) {
return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id;
});
return IsAncestorOf(*table.metadata(), ancestor_snapshot_id);
}

Result<bool> SnapshotUtil::IsAncestorOf(
int64_t snapshot_id, int64_t ancestor_snapshot_id,
const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, lookup(snapshot_id));
ICEBERG_CHECK(snapshot != nullptr, "Cannot find snapshot: {}", snapshot_id);
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(snapshot, lookup));
return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& ancestor) {
return ancestor != nullptr && ancestor->snapshot_id == ancestor_snapshot_id;
});
// Checks whether `ancestor_snapshot_id` is an ancestor of the snapshot returned by
// `metadata.Snapshot()`.
Result<bool> SnapshotUtil::IsAncestorOf(const TableMetadata& metadata,
                                        int64_t ancestor_snapshot_id) {
  ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
  // Guard against metadata that has no current snapshot to start from.
  ICEBERG_CHECK(current != nullptr, "Current snapshot is null");

  // Delegate to the overload that walks from an explicit starting snapshot ID.
  const auto start_id = current->snapshot_id;
  return IsAncestorOf(metadata, start_id, ancestor_snapshot_id);
}

Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
ICEBERG_CHECK(current != nullptr, "Current snapshot is null");
return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id);
return IsAncestorOf(*table.metadata(), snapshot_id, ancestor_snapshot_id);
}

Result<bool> SnapshotUtil::IsAncestorOf(const TableMetadata& metadata,
int64_t snapshot_id,
int64_t ancestor_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
ICEBERG_CHECK(current != nullptr, "Current snapshot is null");
return IsAncestorOf(snapshot_id, ancestor_snapshot_id,
[&metadata](int64_t id) { return metadata.SnapshotById(id); });
}

// Create a lookup function that uses the metadata
auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
return metadata.SnapshotById(id);
};
Result<bool> SnapshotUtil::IsAncestorOf(
int64_t snapshot_id, int64_t ancestor_snapshot_id,
const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
if (snapshot_id == ancestor_snapshot_id) {
return true;
}

ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(current->snapshot_id, lookup));
return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) {
return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id;
});
ICEBERG_ASSIGN_OR_RAISE(auto current, lookup(snapshot_id));
ICEBERG_CHECK(current != nullptr, "Cannot find snapshot: {}", snapshot_id);

while (current != nullptr && current->parent_snapshot_id.has_value()) {
int64_t parent_id = current->parent_snapshot_id.value();
auto parent_result = lookup(parent_id);
ICEBERG_ACTION_FOR_NOT_FOUND(parent_result, { break; });
if (parent_id == ancestor_snapshot_id) {
return true;
}
current = std::move(parent_result.value());
}

return false;
}

Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_parent_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
return std::ranges::any_of(
ancestors, [ancestor_parent_snapshot_id](const auto& snapshot) {
return snapshot != nullptr && snapshot->parent_snapshot_id.has_value() &&
snapshot->parent_snapshot_id.value() == ancestor_parent_snapshot_id;
});
return IsParentAncestorOf(*table.metadata(), snapshot_id, ancestor_parent_snapshot_id);
}

// Metadata-backed variant: snapshots are resolved through `metadata.SnapshotById`
// and the actual walk is done by the lookup-function overload.
Result<bool> SnapshotUtil::IsParentAncestorOf(const TableMetadata& metadata,
                                              int64_t snapshot_id,
                                              int64_t ancestor_parent_snapshot_id) {
  const auto find_by_id = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
    return metadata.SnapshotById(id);
  };
  return IsParentAncestorOf(snapshot_id, ancestor_parent_snapshot_id, find_by_id);
}

// Walks the parent chain starting at `snapshot_id` and reports whether any snapshot
// on that chain (including the starting one) has `ancestor_parent_snapshot_id` as its
// parent ID. The walk ends at a snapshot without a parent, at a null snapshot, or
// when the parent lookup reports not-found.
Result<bool> SnapshotUtil::IsParentAncestorOf(
    int64_t snapshot_id, int64_t ancestor_parent_snapshot_id,
    const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
  ICEBERG_ASSIGN_OR_RAISE(auto current, lookup(snapshot_id));
  ICEBERG_CHECK(current != nullptr, "Cannot find snapshot: {}", snapshot_id);

  while (current != nullptr && current->parent_snapshot_id.has_value()) {
    const int64_t parent_id = current->parent_snapshot_id.value();
    // The parent ID is compared before it is looked up, so a match is reported even
    // when the parent snapshot itself can no longer be found.
    if (parent_id == ancestor_parent_snapshot_id) {
      return true;
    }

    auto parent_lookup = lookup(parent_id);
    ICEBERG_ACTION_FOR_NOT_FOUND(parent_lookup, { break; });
    current = std::move(parent_lookup.value());
  }

  return false;
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
const Table& table) {
ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
return AncestorsOf(table, current);
return CurrentAncestors(*table.metadata());
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
const TableMetadata& metadata) {
ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot());
auto lookup = [&metadata](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
return metadata.SnapshotById(id);
};

return AncestorsOf(current, lookup);
return AncestorsOf(metadata, current);
}

Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& table) {
Expand All @@ -137,7 +165,12 @@ Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(

Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorOf(
const Table& table, int64_t snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id));
return OldestAncestorOf(*table.metadata(), snapshot_id);
}

Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorOf(
const TableMetadata& metadata, int64_t snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(metadata, snapshot_id));
if (ancestors.empty()) {
return std::nullopt;
}
Expand All @@ -151,7 +184,7 @@ Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorAft
auto current = std::move(current_result.value());

std::optional<std::shared_ptr<Snapshot>> last_snapshot = std::nullopt;
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current));
ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(*table.metadata(), current));
for (const auto& snapshot : ancestors) {
auto snapshot_timestamp_ms = snapshot->timestamp_ms;
if (snapshot_timestamp_ms < timestamp_ms) {
Expand Down Expand Up @@ -200,29 +233,36 @@ Result<std::vector<int64_t>> SnapshotUtil::AncestorIdsBetween(
Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
const Table& table, int64_t latest_snapshot_id,
std::optional<int64_t> oldest_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id));
return AncestorsBetween(*table.metadata(), latest_snapshot_id, oldest_snapshot_id);
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
const TableMetadata& metadata, int64_t latest_snapshot_id,
std::optional<int64_t> oldest_snapshot_id) {
ICEBERG_ASSIGN_OR_RAISE(auto start, metadata.SnapshotById(latest_snapshot_id));

if (oldest_snapshot_id.has_value()) {
if (latest_snapshot_id == oldest_snapshot_id.value()) {
return {};
}

return AncestorsOf(start,
[&table, oldest_snapshot_id = oldest_snapshot_id.value()](
[&metadata, oldest_snapshot_id = oldest_snapshot_id.value()](
int64_t id) -> Result<std::shared_ptr<Snapshot>> {
if (id == oldest_snapshot_id) {
return nullptr;
}
return table.SnapshotById(id);
return metadata.SnapshotById(id);
});
} else {
return AncestorsOf(table, start);
return AncestorsOf(metadata, start);
}
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
const Table& table, const std::shared_ptr<Snapshot>& snapshot) {
return AncestorsOf(snapshot, [&table](int64_t id) { return table.SnapshotById(id); });
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& snapshot) {
return AncestorsOf(snapshot,
[&metadata](int64_t id) { return metadata.SnapshotById(id); });
}

Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
Expand Down
95 changes: 79 additions & 16 deletions src/iceberg/util/snapshot_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const Table& table,
int64_t snapshot_id);

/// \brief Returns a vector of ancestors of the given snapshot, resolving snapshots
/// through the given table metadata.
///
/// \param metadata The table metadata used to look up snapshots by ID
/// \param snapshot_id The snapshot ID to start from
/// \return A vector of ancestor snapshots
static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
const TableMetadata& metadata, int64_t snapshot_id);

/// \brief Returns a vector of ancestors of the given snapshot.
///
/// \param snapshot_id The snapshot ID to start from
Expand All @@ -53,6 +61,23 @@ class ICEBERG_EXPORT SnapshotUtil {
int64_t snapshot_id,
const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current
/// snapshot.
///
/// \param table The table to check
/// \param ancestor_snapshot_id The ancestor snapshot ID to check for
/// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
static Result<bool> IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current
/// snapshot.
///
/// The implementation checks that the metadata has a non-null current snapshot before
/// walking its ancestors.
///
/// \param metadata The table metadata to check
/// \param ancestor_snapshot_id The ancestor snapshot ID to check for
/// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
static Result<bool> IsAncestorOf(const TableMetadata& metadata,
int64_t ancestor_snapshot_id);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id.
///
/// \param table The table to check
Expand All @@ -62,6 +87,15 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<bool> IsAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_snapshot_id);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id.
///
/// \param metadata The table metadata used to look up snapshots by ID
/// \param snapshot_id The snapshot ID to start the check from
/// \param ancestor_snapshot_id The ancestor snapshot ID to check for
/// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
static Result<bool> IsAncestorOf(const TableMetadata& metadata, int64_t snapshot_id,
int64_t ancestor_snapshot_id);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using the
/// given lookup function.
///
Expand All @@ -73,32 +107,37 @@ class ICEBERG_EXPORT SnapshotUtil {
int64_t snapshot_id, int64_t ancestor_snapshot_id,
const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current
/// state.
/// \brief Returns whether some ancestor of snapshot_id has a parent ID that matches
/// ancestor_parent_snapshot_id.
///
/// \param table The table to check
/// \param ancestor_snapshot_id The ancestor snapshot ID to check for
/// \return true if ancestor_snapshot_id is an ancestor of the current snapshot
static Result<bool> IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id);
/// \param snapshot_id The snapshot ID to check
/// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to check for
/// \return true if any ancestor has the given parent ID
static Result<bool> IsParentAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_parent_snapshot_id);

/// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current
/// state.
/// \brief Returns whether some ancestor of snapshot_id has a parent ID that matches
/// ancestor_parent_snapshot_id.
///
/// \param metadata The table metadata used to look up snapshots by ID
/// \param snapshot_id The snapshot ID to start the check from
/// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to check for
/// \return true if any ancestor has the given parent ID
static Result<bool> IsParentAncestorOf(const TableMetadata& metadata,
int64_t snapshot_id,
int64_t ancestor_parent_snapshot_id);

/// \brief Returns whether some ancestor of snapshot_id has a parent ID that matches
/// ancestor_parent_snapshot_id.
///
/// \param table The table to check
/// \param snapshot_id The snapshot ID to check
/// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to check for
/// \param lookup Function to lookup snapshots by ID
/// \return true if any ancestor has the given parent ID
static Result<bool> IsParentAncestorOf(const Table& table, int64_t snapshot_id,
int64_t ancestor_parent_snapshot_id);
static Result<bool> IsParentAncestorOf(
int64_t snapshot_id, int64_t ancestor_parent_snapshot_id,
const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);

/// \brief Returns a vector that traverses the table's snapshots from the current to the
/// last known ancestor.
Expand Down Expand Up @@ -147,6 +186,19 @@ class ICEBERG_EXPORT SnapshotUtil {
static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorOf(
const Table& table, int64_t snapshot_id);

/// \brief Traverses the history and finds the oldest ancestor of the specified
/// snapshot.
///
/// The oldest ancestor is defined as the ancestor snapshot whose parent is null or has
/// been expired. If the specified snapshot has no parent, or its parent has been
/// expired, the specified snapshot itself is returned.
///
/// \param metadata The table metadata
/// \param snapshot_id The ID of the snapshot whose oldest ancestor is looked up
/// \return The oldest snapshot, or nullopt if the snapshot has no ancestors to return
static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorOf(
const TableMetadata& metadata, int64_t snapshot_id);

/// \brief Traverses the history of the table's current snapshot, finds the oldest
/// snapshot that was committed either at or after a given time.
///
Expand Down Expand Up @@ -192,6 +244,17 @@ class ICEBERG_EXPORT SnapshotUtil {
const Table& table, int64_t latest_snapshot_id,
std::optional<int64_t> oldest_snapshot_id);

/// \brief Returns a vector of ancestors between two snapshots.
///
/// \param metadata The table metadata used to look up snapshots by ID
/// \param latest_snapshot_id The latest snapshot ID
/// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt means all
/// ancestors)
/// \return A vector of ancestor snapshots between the two snapshots; empty when
/// oldest_snapshot_id is set and equals latest_snapshot_id
static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsBetween(
const TableMetadata& metadata, int64_t latest_snapshot_id,
std::optional<int64_t> oldest_snapshot_id);

/// \brief Traverses the history of the table's current snapshot and finds the snapshot
/// with the given snapshot id as its parent.
///
Expand Down Expand Up @@ -317,11 +380,11 @@ class ICEBERG_EXPORT SnapshotUtil {
private:
/// \brief Helper function to traverse ancestors of a snapshot.
///
/// \param table The table
/// \param metadata The table metadata
/// \param snapshot The snapshot to start from
/// \return A vector of ancestor snapshots
static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
const Table& table, const std::shared_ptr<Snapshot>& snapshot);
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& snapshot);

/// \brief Helper function to traverse ancestors of a snapshot using a lookup function.
///
Expand Down
Loading