
Enable incremental read #2153

Closed

xanderbailey wants to merge 7 commits into apache:main from xanderbailey:xb/incremental_read

Conversation

@xanderbailey (Contributor) commented Feb 18, 2026

Which issue does this PR close?

What changes are included in this PR?

Link to spark docs

This PR adds incremental snapshot scanning support to iceberg-rust, similar to the Java client's IncrementalDataTableScan. This feature allows reading only the data files that were added between two snapshots, which is essential for:

  • Change data capture (CDC) pipelines (a different but related use case)
  • Incremental data processing
  • Efficiently reading only new data since a checkpoint

Core Iceberg Changes (crates/iceberg/src/scan/)

New API on TableScanBuilder:

// Scan changes between two snapshots (from exclusive, to inclusive)
table.scan()
    .from_snapshot_exclusive(from_id)
    .to_snapshot(to_id)
    .build()?;

// Scan changes with inclusive from
table.scan()
    .from_snapshot_inclusive(from_id)
    .to_snapshot(to_id)
    .build()?;

// Convenience methods (matching Java API)
table.scan().appends_after(from_id).build()?;
table.scan().appends_between(from_id, to_id).build()?;

Implementation details:

  • Added SnapshotRange struct to validate snapshot ancestry and track snapshot IDs in range
  • Modified ManifestFileContext to filter entries with status=ADDED and snapshot_id within range
  • Validates that only APPEND operations are in the snapshot range (matches Java behavior)
  • Returns clear error if from_snapshot is not an ancestor of to_snapshot
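To make the ancestry validation concrete, here is a toy, self-contained sketch (not the PR's actual `SnapshotRange` code): walk parent pointers from `to` back toward `from`, collect the `(from, to]` range, and error if the walk reaches the root without seeing `from`.

```rust
use std::collections::{HashMap, HashSet};

// Illustrative only: walk parent-snapshot-id pointers from `to` back toward
// `from`, collecting the exclusive-start, inclusive-end range (from, to].
// If the walk hits the root first, `from` is not an ancestor of `to`.
fn snapshot_range(
    parents: &HashMap<i64, Option<i64>>, // snapshot_id -> parent_snapshot_id
    from: i64,
    to: i64,
) -> Result<HashSet<i64>, String> {
    let mut ids = HashSet::new();
    let mut current = Some(to);
    while let Some(id) = current {
        if id == from {
            return Ok(ids); // `from` is exclusive: stop before adding it
        }
        ids.insert(id);
        current = parents.get(&id).copied().flatten();
    }
    Err(format!("snapshot {from} is not an ancestor of snapshot {to}"))
}

fn main() {
    // Chain: 1 <- 2 <- 3 (3 is newest)
    let parents = HashMap::from([(1, None), (2, Some(1)), (3, Some(2))]);
    let range = snapshot_range(&parents, 1, 3).unwrap();
    assert_eq!(range, HashSet::from([2, 3]));
    // A non-ancestor `from` is reported as an error, not a silent full scan.
    assert!(snapshot_range(&parents, 99, 3).is_err());
}
```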

DataFusion Integration (crates/integrations/datafusion/)

New constructors on IcebergStaticTableProvider:

// Scan changes between two snapshots
let provider = IcebergStaticTableProvider::try_new_incremental(table, from_id, to_id).await?;

// Scan with inclusive from
let provider = IcebergStaticTableProvider::try_new_incremental_inclusive(table, from_id, to_id).await?;

// Scan all appends after a snapshot to current
let provider = IcebergStaticTableProvider::try_new_appends_after(table, from_id).await?;

// Register with DataFusion and query
ctx.register_table("changes", Arc::new(provider))?;
let df = ctx.sql("SELECT * FROM changes WHERE category = 'A'").await?;

Example Added (crates/examples/)

Added datafusion_incremental_read.rs example demonstrating:

  • Incremental reads between two snapshots
  • Using appends_after() for checkpoint-based processing
  • Combining incremental reads with SQL filters

Files Changed

  • crates/iceberg/src/scan/mod.rs: Added SnapshotRange, incremental scan methods on TableScanBuilder
  • crates/iceberg/src/scan/context.rs: Added snapshot_range to contexts, manifest entry filtering
  • crates/integrations/datafusion/src/table/mod.rs: Added incremental constructors to IcebergStaticTableProvider
  • crates/integrations/datafusion/src/physical_plan/scan.rs: Updated IcebergTableScan and get_batch_stream()
  • crates/examples/src/datafusion_incremental_read.rs: New example
  • crates/examples/Cargo.toml: Added datafusion dependencies and example entry

Are these changes tested?

Yes

Core Iceberg Tests (crates/iceberg/src/scan/mod.rs)
DataFusion Integration Tests (crates/integrations/datafusion/src/table/mod.rs)

@xanderbailey marked this pull request as ready for review February 18, 2026 14:28
@github-actions (Bot)

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions Bot added the stale label Mar 21, 2026
@github-actions (Bot)

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions Bot closed this Mar 28, 2026
/// which is much more efficient than scanning the entire table when you only need
/// the new data.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Contributor Author

Happy to remove this if we don't want it, claude wrote it for me.

@mbutrovich (Collaborator)

I paired up with Claude to review for a while today, cross-referencing the Java behavior and spec. Suggestion 3 is the one I feel the least strongly about, and would benefit from another set of eyes.

Here are the compiled notes:

1. Separate scan type, don't bolt onto TableScanBuilder

The spec defines snapshots with parent-snapshot-id and operation (Spec: Snapshots). Incremental scanning isn't in the spec; it's a library concept built on these primitives. It operates on a range of snapshots (from/to/inclusive), which is fundamentally different from a point-in-time scan on a single snapshot. Mixing both into one builder creates an invalid state space.

The PR adds from_snapshot_id, from_snapshot_inclusive, and to_snapshot_id to TableScanBuilder (crates/iceberg/src/scan/mod.rs), mutually exclusive with snapshot_id, enforced at runtime in build(). It would be worth making illegal states unrepresentable at compile time instead.

One option: a parallel entry point on Table (crates/iceberg/src/table.rs) with a separate builder:

// crates/iceberg/src/table.rs
impl Table {
    pub fn incremental_append_scan(&self) -> IncrementalAppendScanBuilder { ... }
}

// crates/iceberg/src/scan/incremental.rs
pub struct IncrementalAppendScanBuilder<'a> {
    table: &'a Table,
    from_snapshot_id: i64,
    from_inclusive: bool,
    to_snapshot_id: Option<i64>,  // None = current snapshot
    // shared fields: column_names, filter, concurrency limits, etc.
}

Both builders produce the same TableScan (which already has plan_context: Option<PlanContext>); the difference is how PlanContext gets configured. Shared builder logic (column selection, filters, concurrency) can be extracted into a helper. TableScanBuilder stays unchanged. This also leaves room for IncrementalChangelogScanBuilder later.
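To make the compile-time exclusivity concrete, here is a toy, self-contained sketch of that builder shape (names hypothetical, not the crate's API): the incremental builder simply has no `snapshot_id` method, so a point-in-time snapshot can never be combined with a from/to range, and no runtime check in `build()` is needed.

```rust
// Toy sketch only: the incremental parameters live on their own builder,
// so the invalid "snapshot_id + range" combination is unrepresentable.
struct IncrementalAppendScanBuilder {
    from_snapshot_id: i64,
    from_inclusive: bool,
    to_snapshot_id: Option<i64>, // None = current snapshot
}

impl IncrementalAppendScanBuilder {
    fn new(from_snapshot_id: i64) -> Self {
        Self { from_snapshot_id, from_inclusive: false, to_snapshot_id: None }
    }
    // Default is exclusive-start; opt in to an inclusive `from`.
    fn from_inclusive(mut self) -> Self { self.from_inclusive = true; self }
    fn to_snapshot(mut self, id: i64) -> Self { self.to_snapshot_id = Some(id); self }
}

fn main() {
    let scan = IncrementalAppendScanBuilder::new(10).to_snapshot(20);
    assert_eq!(scan.from_snapshot_id, 10);
    assert_eq!(scan.to_snapshot_id, Some(20));
    assert!(!scan.from_inclusive);
}
```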

Java went through this exact evolution. The original design (2020, 2562649b097a, iceberg#315) subclassed DataTableScan as IncrementalDataTableScan, which inherited inapplicable methods and had to throw UnsupportedOperationException everywhere (lines 40-61). Five PRs over 2022 redesigned it into a sibling hierarchy where IncrementalScan<T> and TableScan are peers under Scan:

Commit         PR            Change
beed94dc2134   iceberg#4580  IncrementalAppendScan interface
7f472ebbec19   iceberg#4744  BaseIncrementalAppendScan impl
c69a3dd6171e   iceberg#4870  IncrementalScan<T> parent, IncrementalChangelogScan
40de4bc7dc12   iceberg#5382  BaseIncrementalScan extracted
80ec14ff363b   iceberg#5577  Deprecated appendsBetween/appendsAfter on TableScan, removal in 2.0.0

Entry point moved from TableScan.appendsBetween() to Table.newIncrementalAppendScan() (api/src/main/java/org/apache/iceberg/Table.java:71-84).


2. Reuse ancestors_between

The spec defines snapshot ancestry via parent-snapshot-id (Spec: Snapshots). This codebase already has the traversal in crates/iceberg/src/util/snapshot.rs:51-61:

pub fn ancestors_between(
    table_metadata: &TableMetadataRef,
    latest_snapshot_id: i64,
    oldest_snapshot_id: Option<i64>,
) -> impl Iterator<Item = SnapshotRef> + Send

Returns (oldest, latest], which is already exclusive-start, inclusive-end: exactly the default incremental range.

SnapshotRange::build() reimplements this walk from scratch. It could use the existing utility instead:

let snapshots_in_range: Vec<SnapshotRef> = if from_inclusive {
    ancestors_between(&metadata, to_id, from_parent_id).collect()
} else {
    ancestors_between(&metadata, to_id, Some(from_id)).collect()
};

let snapshot_ids: HashSet<i64> = snapshots_in_range.iter()
    .filter(|s| s.summary().operation == Operation::Append)
    .map(|s| s.snapshot_id())
    .collect();

One thing to watch: ancestors_between doesn't validate that oldest_snapshot_id is actually in the chain. If it's not, you get the full chain to root. Worth verifying ancestry explicitly (check the walk terminated at the expected snapshot, not root).

Java equivalent: SnapshotUtil.ancestorsBetween (core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:212-229).


3. Skip non-APPEND snapshots, don't error

The spec defines four operation values: append, replace, overwrite, delete (Spec: Snapshot Summary). The spec doesn't define incremental scan semantics, so we have to decide what to do with non-APPEND snapshots in the range.

The PR returns ErrorKind::FeatureUnsupported for any non-APPEND snapshot. This seems too strict. A table that had a compaction (replace) in its history would break incremental append scans, even though the compaction added no logical data.

It would be better to silently skip non-APPEND snapshots. Only Operation::Append contributes new data files to an append scan:

  • Replace = compaction/rewrite, no new logical data
  • Delete = removes data
  • Overwrite = mixed add/remove, ambiguous for append-only

let append_snapshot_ids: HashSet<i64> = ancestors_between(...)
    .filter(|s| s.summary().operation == Operation::Append)
    .map(|s| s.snapshot_id())
    .collect();

Properly surfacing Overwrite changes would be the domain of an IncrementalChangelogScan, which could be a separate scan type down the road.

Java's BaseIncrementalAppendScan (core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java:105-117) silently skips all non-APPEND snapshots. The deprecated IncrementalDataTableScan errored on OVERWRITE (lines 138-143) but that behavior was not carried forward.
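The skip-don't-error behavior can be sketched with toy stand-in types (not the crate's): only Append snapshots contribute IDs, and a Replace from a compaction passes through harmlessly instead of failing the scan.

```rust
use std::collections::HashSet;

// Illustrative only: filter the snapshot range down to APPEND operations,
// silently skipping Replace/Overwrite/Delete rather than erroring.
#[derive(PartialEq)]
enum Operation { Append, Replace, Overwrite, Delete }

struct Snapshot { id: i64, operation: Operation }

fn append_snapshot_ids(range: &[Snapshot]) -> HashSet<i64> {
    range.iter()
        .filter(|s| s.operation == Operation::Append)
        .map(|s| s.id)
        .collect()
}

fn main() {
    let range = vec![
        Snapshot { id: 2, operation: Operation::Append },
        Snapshot { id: 3, operation: Operation::Replace }, // compaction: skipped
        Snapshot { id: 4, operation: Operation::Append },
    ];
    assert_eq!(append_snapshot_ids(&range), HashSet::from([2, 4]));
    let _ = (Operation::Overwrite, Operation::Delete); // unused variants in this sketch
}
```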


4. Consider manifest-level filtering

The spec says each manifest list entry tracks the snapshot that created it via added-snapshot-id (Spec: Manifest Lists). A manifest whose added-snapshot-id is outside the scan range can't contain relevant ADDED entries. Skipping it avoids the I/O and parse cost entirely.

The PR only filters at the entry level (crates/iceberg/src/scan/context.rs, fetch_manifest_and_stream_manifest_entries). Every manifest in the manifest list is fetched and parsed, including inherited ones that can't contain relevant entries. For tables with many manifests, this matters.

It might be worth adding a check in PlanContext::build_manifest_file_contexts (crates/iceberg/src/scan/context.rs:195-254). ManifestFile already has the field (crates/iceberg/src/spec/manifest_list.rs):

// Inside the loop over manifest_files:
if let Some(ref snapshot_range) = self.snapshot_range {
    if manifest_file.content == ManifestContentType::Deletes {
        continue;
    }
    if !snapshot_range.contains(manifest_file.added_snapshot_id) {
        continue;
    }
}

Keep the entry-level filter too; a manifest can contain entries from multiple snapshots. But the manifest-level check avoids unnecessary I/O.

Java applies both: .filterManifests(m -> snapshotIds.contains(m.snapshotId())) (line 70) then .filterManifestEntries(...) (line 81), plus .ignoreDeleted(). See core/src/main/java/org/apache/iceberg/ManifestGroup.java lines 121, 132, 311-320, 375.
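A self-contained sketch of the manifest-level prune with stand-in types (field names mirror the spec's added-snapshot-id, but the code is illustrative): out-of-range and delete manifests are dropped before any fetch; the entry-level filter still runs afterwards because a manifest can carry entries from several snapshots.

```rust
use std::collections::HashSet;

// Illustrative only: skip any manifest whose added_snapshot_id falls outside
// the incremental range, before paying the I/O and parse cost to open it.
enum ManifestContentType { Data, Deletes }

struct ManifestFile { added_snapshot_id: i64, content: ManifestContentType }

fn prune<'a>(
    manifests: &'a [ManifestFile],
    range: &'a HashSet<i64>,
) -> impl Iterator<Item = &'a ManifestFile> {
    manifests.iter().filter(move |m| {
        !matches!(m.content, ManifestContentType::Deletes)
            && range.contains(&m.added_snapshot_id)
    })
}

fn main() {
    let manifests = vec![
        ManifestFile { added_snapshot_id: 2, content: ManifestContentType::Data },
        ManifestFile { added_snapshot_id: 1, content: ManifestContentType::Data }, // out of range
        ManifestFile { added_snapshot_id: 2, content: ManifestContentType::Deletes },
    ];
    let range = HashSet::from([2, 3]);
    // Only the in-range data manifest survives the prune.
    assert_eq!(prune(&manifests, &range).count(), 1);
}
```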


5. DataFusion: consider a scan range enum

The PR threads from_snapshot_id: Option<i64> and from_snapshot_inclusive: bool through IcebergTableScan::new(), get_batch_stream(), IcebergStaticTableProvider, and IcebergTableProvider (crates/integrations/datafusion/src/physical_plan/scan.rs:60-84). This triggers #[allow(clippy::too_many_arguments)].

An enum could clean this up:

pub(crate) enum ScanRange {
    CurrentSnapshot,
    PointInTime(i64),
    Incremental { from: i64, to: Option<i64>, from_inclusive: bool },
}

This would replace three parameters with one across all four structs, and would collapse IcebergStaticTableProvider's three near-identical constructors (try_new_incremental, try_new_incremental_inclusive, try_new_appends_after) into one.
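A minimal sketch of how the enum might be consumed (toy code; the real constructors would resolve snapshot IDs from table metadata): one value carries the whole scan mode, so signatures take a single argument.

```rust
// Illustrative only: one enum value replaces the snapshot_id /
// from_snapshot_id / from_snapshot_inclusive parameter trio.
enum ScanRange {
    CurrentSnapshot,
    PointInTime(i64),
    Incremental { from: i64, to: Option<i64>, from_inclusive: bool },
}

fn describe(range: &ScanRange) -> String {
    match range {
        ScanRange::CurrentSnapshot => "scan current snapshot".to_string(),
        ScanRange::PointInTime(id) => format!("scan snapshot {id}"),
        ScanRange::Incremental { from, to, from_inclusive } => format!(
            "incremental scan {}{} .. {:?}",
            if *from_inclusive { "[" } else { "(" },
            from,
            to
        ),
    }
}

fn main() {
    // `appends_after(from)` collapses into one variant with to = None.
    let appends_after = ScanRange::Incremental { from: 10, to: None, from_inclusive: false };
    assert!(describe(&appends_after).starts_with("incremental"));
    assert_eq!(describe(&ScanRange::PointInTime(7)), "scan snapshot 7");
    assert_eq!(describe(&ScanRange::CurrentSnapshot), "scan current snapshot");
}
```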


6. Tests don't verify filtering behavior

Tautological assertions. These tests assert result.is_ok() || result.is_err(), which is always true:

  • test_appends_after_convenience_method (crates/iceberg/src/scan/mod.rs)
  • test_appends_between_convenience_method
  • test_incremental_scan_from_snapshot_inclusive

Swallowed errors. DataFusion tests use if let Ok(provider) = provider { ... }, passing silently when construction fails:

  • test_static_provider_incremental_creates_scan (crates/integrations/datafusion/src/table/mod.rs)
  • test_static_provider_incremental_inclusive
  • test_static_provider_appends_after

No end-to-end test. No test checks that an incremental scan actually returns only files from the expected snapshot range.

Something along the lines of test_plan_files_no_deletions (crates/iceberg/src/scan/mod.rs:1278) would work well here:

  1. Extend TableTestFixture with multiple APPEND snapshots and manifest entries with explicit snapshot_id and ManifestStatus values
  2. Run an incremental scan between two snapshots
  3. Collect FileScanTasks and assert only files from the expected snapshots come back
  4. Test edge cases: inclusive vs exclusive, same from/to, non-APPEND snapshots skipped

TableTestFixture::new_with_deep_history() already creates 5 chained snapshots (S1 through S5), which could be extended with manifest files and operation types in the test metadata JSON.
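For contrast with the tautological pattern, a toy sketch of the shape such an assertion could take (`FileScanTask` is a stand-in here, not the crate's type): check that every returned file belongs to a snapshot in the expected range.

```rust
use std::collections::HashSet;

// Illustrative only: assert on which snapshots the planned files came from,
// instead of the always-true `result.is_ok() || result.is_err()`.
struct FileScanTask { snapshot_id: i64 }

fn only_from_expected(tasks: &[FileScanTask], expected: &HashSet<i64>) -> bool {
    tasks.iter().all(|t| expected.contains(&t.snapshot_id))
}

fn main() {
    // Pretend these came back from an incremental scan of (S1, S3].
    let tasks = vec![
        FileScanTask { snapshot_id: 2 },
        FileScanTask { snapshot_id: 3 },
    ];
    assert!(only_from_expected(&tasks, &HashSet::from([2, 3])));
    // Negative check: a file from S1 leaking through would fail the test.
    let stale = [FileScanTask { snapshot_id: 1 }];
    assert!(!only_from_expected(&stale, &HashSet::from([2, 3])));
}
```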


7. Minor

  • Example file (crates/examples/src/datafusion_incremental_read.rs): author says "claude wrote it for me." Feels verbose; could probably be trimmed or dropped if the API doc comments cover the usage well enough.
  • Comment style: /// Optional snapshot range for incremental scans explains what, not why. Might want to match codebase convention.
  • Unrelated fix: context.rs changes self.case_sensitive to case_sensitive after destructuring. Correct but unrelated; might be better as a separate commit.

@xanderbailey (Contributor Author)

xanderbailey commented Apr 15, 2026

Thanks for the review, Matt:

  1. IncrementalAppendScanBuilder: agreed, this is nicer.
  2. Seems reasonable to me.
  3. Ignoring replace seems okay, since I'd still be looking at the right "view" of the data. Ignoring deletes seems much weirder to me (and something we've had to implement around in Spark), because then the "view" of the data isn't accurate. But maybe the point is that a changelog scan should deal with those semantics, and the incremental scan should just answer "what data was appended?" Maybe we should also be consistent with the Java implementation here.

@xanderbailey (Contributor Author)

Moving the conversation to #2337 since I can't seem to reopen this.

Successfully merging this pull request may close these issues.

Support incremental reads between snapshot-ids
