diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index b6c26ea00..06943340d 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -152,6 +152,16 @@ Result> Table::NewScan() const { return TableScanBuilder::Make(metadata_, io_); } +Result>> +Table::NewIncrementalAppendScan() const { + return IncrementalScanBuilder::Make(metadata_, io_); +} + +Result>> +Table::NewIncrementalChangelogScan() const { + return IncrementalScanBuilder::Make(metadata_, io_); +} + Result> Table::NewTransaction() { // Create a brand new transaction object for the table. Users are expected to commit the // transaction manually. diff --git a/src/iceberg/table.h b/src/iceberg/table.h index 1f3135dd7..c232b1e07 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -129,6 +129,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// filter data. virtual Result> NewScan() const; + /// \brief Create a new incremental append scan builder for this table + virtual Result>> + NewIncrementalAppendScan() const; + + /// \brief Create a new incremental changelog scan builder for this table + virtual Result>> + NewIncrementalChangelogScan() const; + /// \brief Create a new Transaction to commit multiple table operations at once. virtual Result> NewTransaction(); diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 4992b18d5..de1558aa2 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -313,29 +313,6 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) { return UseSnapshot(snapshot_id); } -TableScanBuilder& TableScanBuilder::FromSnapshot( - [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref, - [[maybe_unused]] bool inclusive) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) { - return AddError(NotImplemented("Incremental scan is not implemented")); -} - -TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) { - context_.branch = branch; - return *this; -} - Result>> TableScanBuilder::ResolveSnapshotSchema() { if (snapshot_schema_ == nullptr) { @@ -352,18 +329,10 @@ TableScanBuilder::ResolveSnapshotSchema() { return snapshot_schema_; } -bool TableScanBuilder::IsIncrementalScan() const { - return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value(); -} - -Result> TableScanBuilder::Build() { +Result> TableScanBuilder::Build() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); ICEBERG_RETURN_UNEXPECTED(context_.Validate()); - if (IsIncrementalScan()) { - return NotImplemented("Incremental scan is not yet implemented"); - } - ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema()); return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_)); } @@ -442,12 +411,6 @@ Result> DataTableScan::Make( std::move(metadata), std::move(schema), std::move(io), std::move(context))); } -DataTableScan::DataTableScan(std::shared_ptr metadata, - std::shared_ptr schema, std::shared_ptr io, - internal::TableScanContext context) - : TableScan(std::move(metadata), std::move(schema), std::move(io), - std::move(context)) {} - Result>> DataTableScan::PlanFiles() const { ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot()); if (!snapshot) { @@ -477,4 +440,88 @@ Result>> DataTableScan::PlanFiles() co return manifest_group->PlanFiles(); } +// BaseIncrementalScanBuilder implementation + +BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot( + [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot( + [[maybe_unused]] const std::string& ref, [[maybe_unused]] bool inclusive) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot( + [[maybe_unused]] int64_t to_snapshot_id) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot( + [[maybe_unused]] const std::string& ref) { + return AddError(NotImplemented("Incremental scan is not implemented")); +} + +BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::UseBranch( + const std::string& branch) { + context_.branch = branch; + return *this; +} + +template +Result>> +IncrementalScanBuilder::Make(std::shared_ptr metadata, + std::shared_ptr io) { + ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null"); + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + return std::unique_ptr>( + new IncrementalScanBuilder(std::move(metadata), std::move(io))); +} + +template +Result> IncrementalScanBuilder::Build() { + return NotImplemented("IncrementalAppendScanBuilder is not implemented"); +} + +template class IncrementalScanBuilder; +template class IncrementalScanBuilder; + +template +Result>> +IncrementalScan::PlanFiles() const { + return NotImplemented("Incremental scan is not implemented"); +} + +// IncrementalAppendScan implementation + +Result> IncrementalAppendScan::Make( + [[maybe_unused]] std::shared_ptr metadata, + [[maybe_unused]] std::shared_ptr schema, + [[maybe_unused]] std::shared_ptr io, + [[maybe_unused]] internal::TableScanContext context) { + return NotImplemented("IncrementalAppendScan is not implemented"); +} + +Result>> IncrementalAppendScan::PlanFiles( + std::optional from_snapshot_id_exclusive, + int64_t to_snapshot_id_inclusive) const { + return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented"); +} + +// IncrementalChangelogScan implementation + +Result> IncrementalChangelogScan::Make( + [[maybe_unused]] std::shared_ptr metadata, + [[maybe_unused]] std::shared_ptr schema, + [[maybe_unused]] std::shared_ptr io, + [[maybe_unused]] internal::TableScanContext context) { + return NotImplemented("IncrementalChangelogScan is not implemented"); +} + +Result>> +IncrementalChangelogScan::PlanFiles(std::optional from_snapshot_id_exclusive, + int64_t to_snapshot_id_inclusive) const { + return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented"); +} + } // namespace iceberg diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h index aa225ff81..5a9adcc06 100644 --- a/src/iceberg/table_scan.h +++ b/src/iceberg/table_scan.h @@ -39,6 +39,7 @@ class ICEBERG_EXPORT ScanTask { public: enum class Kind : uint8_t { kFileScanTask, + kChangelogScanTask, }; /// \brief The kind of scan task. @@ -100,6 +101,16 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask { std::shared_ptr residual_filter_; }; +/// \brief A scan task for reading changelog entries between snapshots. +class ICEBERG_EXPORT ChangelogScanTask : public ScanTask { + public: + Kind kind() const override { return Kind::kChangelogScanTask; } + // TODO(): Return actual values once member fields are implemented + int64_t size_bytes() const override { return 0; } + int32_t files_count() const override { return 0; } + int64_t estimated_row_count() const override { return 0; } +}; + namespace internal { // Internal table scan context used by different scan implementations. @@ -204,63 +215,16 @@ class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector { /// travel is attempted on a tag TableScanBuilder& AsOfTime(int64_t timestamp_millis); - /// \brief Instructs this scan to look for changes starting from a particular snapshot. - /// - /// If the start snapshot is not configured, it defaults to the oldest ancestor of the - /// end snapshot (inclusive). - /// - /// \param from_snapshot_id the start snapshot ID - /// \param inclusive whether the start snapshot is inclusive, default is false - /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of - /// the end snapshot - TableScanBuilder& FromSnapshot(int64_t from_snapshot_id, bool inclusive = false); - - /// \brief Instructs this scan to look for changes starting from a particular snapshot. - /// - /// If the start snapshot is not configured, it defaults to the oldest ancestor of the - /// end snapshot (inclusive). - /// - /// \param ref the start ref name that points to a particular snapshot ID - /// \param inclusive whether the start snapshot is inclusive, default is false - /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of - /// the end snapshot - TableScanBuilder& FromSnapshot(const std::string& ref, bool inclusive = false); - - /// \brief Instructs this scan to look for changes up to a particular snapshot - /// (inclusive). - /// - /// If the end snapshot is not configured, it defaults to the current table snapshot - /// (inclusive). - /// - /// \param to_snapshot_id the end snapshot ID (inclusive) - TableScanBuilder& ToSnapshot(int64_t to_snapshot_id); - - /// \brief Instructs this scan to look for changes up to a particular snapshot ref - /// (inclusive). - /// - /// If the end snapshot is not configured, it defaults to the current table snapshot - /// (inclusive). - /// - /// \param ref the end snapshot Ref (inclusive) - TableScanBuilder& ToSnapshot(const std::string& ref); - - /// \brief Use the specified branch - /// \param branch the branch name - TableScanBuilder& UseBranch(const std::string& branch); - /// \brief Builds and returns a TableScan instance. /// \return A Result containing the TableScan or an error. - Result> Build(); + Result> Build(); - private: + protected: TableScanBuilder(std::shared_ptr metadata, std::shared_ptr io); // Return the schema bound to the specified snapshot. Result>> ResolveSnapshotSchema(); - // Return whether current configuration indicates an incremental scan mode. - bool IsIncrementalScan() const; - std::shared_ptr metadata_; std::shared_ptr io_; internal::TableScanContext context_; @@ -293,10 +257,6 @@ class ICEBERG_EXPORT TableScan { /// \brief Returns whether this scan is case-sensitive. bool is_case_sensitive() const; - /// \brief Plans the scan tasks by resolving manifests and data files. - /// \return A Result containing scan tasks or an error. - virtual Result>> PlanFiles() const = 0; - protected: TableScan(std::shared_ptr metadata, std::shared_ptr schema, std::shared_ptr io, internal::TableScanContext context); @@ -316,6 +276,8 @@ class ICEBERG_EXPORT TableScan { /// \brief A scan that reads data files and applies delete files to filter rows. class ICEBERG_EXPORT DataTableScan : public TableScan { public: + ~DataTableScan() override = default; + /// \brief Constructs a DataTableScan instance. static Result> Make( std::shared_ptr metadata, std::shared_ptr schema, @@ -323,11 +285,133 @@ class ICEBERG_EXPORT DataTableScan : public TableScan { /// \brief Plans the scan tasks by resolving manifests and data files. /// \return A Result containing scan tasks or an error. - Result>> PlanFiles() const override; + Result>> PlanFiles() const; + + protected: + using TableScan::TableScan; +}; + +/// \brief Base class for incremental scan builders with common functionality. +class ICEBERG_EXPORT BaseIncrementalScanBuilder : public TableScanBuilder { + public: + /// \brief Instructs this scan to look for changes starting from a particular snapshot. + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). + /// + /// \param from_snapshot_id the start snapshot ID + /// \param inclusive whether the start snapshot is inclusive, default is false + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + BaseIncrementalScanBuilder& FromSnapshot(int64_t from_snapshot_id, + bool inclusive = false); + + /// \brief Instructs this scan to look for changes starting from a particular snapshot. + /// + /// If the start snapshot is not configured, it defaults to the oldest ancestor of the + /// end snapshot (inclusive). + /// + /// \param ref the start ref name that points to a particular snapshot ID + /// \param inclusive whether the start snapshot is inclusive, default is false + /// \note InvalidArgument will be returned if the start snapshot is not an ancestor of + /// the end snapshot + BaseIncrementalScanBuilder& FromSnapshot(const std::string& ref, + bool inclusive = false); + + /// \brief Instructs this scan to look for changes up to a particular snapshot + /// (inclusive). + /// + /// If the end snapshot is not configured, it defaults to the current table snapshot + /// (inclusive). + /// + /// \param to_snapshot_id the end snapshot ID (inclusive) + BaseIncrementalScanBuilder& ToSnapshot(int64_t to_snapshot_id); + + /// \brief Instructs this scan to look for changes up to a particular snapshot ref + /// (inclusive). + /// + /// If the end snapshot is not configured, it defaults to the current table snapshot + /// (inclusive). + /// + /// \param ref the end snapshot Ref (inclusive) + BaseIncrementalScanBuilder& ToSnapshot(const std::string& ref); + + /// \brief Use the specified branch + /// \param branch the branch name + BaseIncrementalScanBuilder& UseBranch(const std::string& branch); + + protected: + using TableScanBuilder::TableScanBuilder; +}; + +/// \brief Builder class for creating incremental scans that read changes between +/// snapshots. +template +class ICEBERG_EXPORT IncrementalScanBuilder : public BaseIncrementalScanBuilder { + public: + static Result>> Make( + std::shared_ptr metadata, std::shared_ptr io); + + Result> Build(); + + protected: + using BaseIncrementalScanBuilder::BaseIncrementalScanBuilder; +}; + +/// \brief A base template class for incremental scans that read changes between +/// snapshots, and return scan tasks of the specified type. +template +class ICEBERG_EXPORT IncrementalScan : public TableScan { + public: + ~IncrementalScan() override = default; + + /// \brief Plans the scan tasks by resolving manifests and data files. + /// \return A Result containing scan tasks or an error. + Result>> PlanFiles() const; protected: - DataTableScan(std::shared_ptr metadata, std::shared_ptr schema, - std::shared_ptr io, internal::TableScanContext context); + virtual Result>> PlanFiles( + std::optional from_snapshot_id_exclusive, + int64_t to_snapshot_id_inclusive) const = 0; + + using TableScan::TableScan; +}; + +/// \brief A scan that reads data files added between snapshots (incremental appends). +class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan { + public: + /// \brief Constructs an IncrementalAppendScan instance. + static Result> Make( + std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context); + + ~IncrementalAppendScan() override = default; + + protected: + Result>> PlanFiles( + std::optional from_snapshot_id_exclusive, + int64_t to_snapshot_id_inclusive) const override; + + using IncrementalScan::IncrementalScan; +}; + +/// \brief A scan that reads changelog entries between snapshots. +class ICEBERG_EXPORT IncrementalChangelogScan + : public IncrementalScan { + public: + /// \brief Constructs an IncrementalChangelogScan instance. + static Result> Make( + std::shared_ptr metadata, std::shared_ptr schema, + std::shared_ptr io, internal::TableScanContext context); + + ~IncrementalChangelogScan() override = default; + + protected: + Result>> PlanFiles( + std::optional from_snapshot_id_exclusive, + int64_t to_snapshot_id_inclusive) const override; + + using IncrementalScan::IncrementalScan; }; } // namespace iceberg diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index 2eed86732..be6c20fda 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -279,7 +279,6 @@ TEST_P(TableScanTest, TableScanBuilderOptions) { auto filter = Expressions::Equal("id", Literal::Int(42)); constexpr int64_t kMinRows = 1000; constexpr int64_t kSnapshotId = 1000L; - const std::string branch_name = "test-branch"; ICEBERG_UNWRAP_OR_FAIL(auto builder2, TableScanBuilder::Make(table_metadata_, file_io_)); @@ -292,7 +291,6 @@ TEST_P(TableScanTest, TableScanBuilderOptions) { .IgnoreResiduals() .MinRowsRequested(kMinRows) .UseSnapshot(kSnapshotId) - .UseBranch(branch_name) .Build()); // Verify all options were set correctly @@ -313,7 +311,6 @@ TEST_P(TableScanTest, TableScanBuilderOptions) { EXPECT_EQ(context.min_rows_requested.value(), kMinRows); EXPECT_TRUE(context.snapshot_id.has_value()); EXPECT_EQ(context.snapshot_id.value(), kSnapshotId); - EXPECT_EQ(context.branch, branch_name); // Test UseRef separately ICEBERG_UNWRAP_OR_FAIL(auto builder3, diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index e97de0ac5..34b22e29c 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -140,8 +140,13 @@ class ResidualEvaluator; class StrictMetricsEvaluator; /// \brief Scan. +class ChangelogScanTask; class DataTableScan; class FileScanTask; +class IncrementalAppendScan; +class IncrementalChangelogScan; +template +class IncrementalScanBuilder; class ScanTask; class TableScan; class TableScanBuilder;