Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
return TableScanBuilder::Make(metadata_, io_);
}

Result<std::unique_ptr<IncrementalScanBuilder<IncrementalAppendScan>>>
Table::NewIncrementalAppendScan() const {
return IncrementalScanBuilder<IncrementalAppendScan>::Make(metadata_, io_);
}

Result<std::unique_ptr<IncrementalScanBuilder<IncrementalChangelogScan>>>
Table::NewIncrementalChangelogScan() const {
return IncrementalScanBuilder<IncrementalChangelogScan>::Make(metadata_, io_);
}

Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
// Create a brand new transaction object for the table. Users are expected to commit the
// transaction manually.
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,14 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// filter data.
virtual Result<std::unique_ptr<TableScanBuilder>> NewScan() const;

/// \brief Create a new incremental append scan builder for this table
virtual Result<std::unique_ptr<IncrementalScanBuilder<IncrementalAppendScan>>>
NewIncrementalAppendScan() const;

/// \brief Create a new incremental changelog scan builder for this table
virtual Result<std::unique_ptr<IncrementalScanBuilder<IncrementalChangelogScan>>>
NewIncrementalChangelogScan() const;

/// \brief Create a new Transaction to commit multiple table operations at once.
virtual Result<std::shared_ptr<Transaction>> NewTransaction();

Expand Down
123 changes: 85 additions & 38 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,29 +313,6 @@ TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
return UseSnapshot(snapshot_id);
}

TableScanBuilder& TableScanBuilder::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const std::string& ref,
[[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t to_snapshot_id) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const std::string& ref) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
context_.branch = branch;
return *this;
}

Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
TableScanBuilder::ResolveSnapshotSchema() {
if (snapshot_schema_ == nullptr) {
Expand All @@ -352,18 +329,10 @@ TableScanBuilder::ResolveSnapshotSchema() {
return snapshot_schema_;
}

bool TableScanBuilder::IsIncrementalScan() const {
return context_.from_snapshot_id.has_value() || context_.to_snapshot_id.has_value();
}

Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
Result<std::unique_ptr<DataTableScan>> TableScanBuilder::Build() {
ICEBERG_RETURN_UNEXPECTED(CheckErrors());
ICEBERG_RETURN_UNEXPECTED(context_.Validate());

if (IsIncrementalScan()) {
return NotImplemented("Incremental scan is not yet implemented");
}

ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
return DataTableScan::Make(metadata_, schema.get(), io_, std::move(context_));
}
Expand Down Expand Up @@ -442,12 +411,6 @@ Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
std::move(metadata), std::move(schema), std::move(io), std::move(context)));
}

DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> io,
internal::TableScanContext context)
: TableScan(std::move(metadata), std::move(schema), std::move(io),
std::move(context)) {}

Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() const {
ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
if (!snapshot) {
Expand Down Expand Up @@ -477,4 +440,88 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
return manifest_group->PlanFiles();
}

// BaseIncrementalScanBuilder implementation

BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
[[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::FromSnapshot(
[[maybe_unused]] const std::string& ref, [[maybe_unused]] bool inclusive) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
[[maybe_unused]] int64_t to_snapshot_id) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::ToSnapshot(
[[maybe_unused]] const std::string& ref) {
return AddError(NotImplemented("Incremental scan is not implemented"));
}

BaseIncrementalScanBuilder& BaseIncrementalScanBuilder::UseBranch(
const std::string& branch) {
context_.branch = branch;
return *this;
}

template <typename ScanType>
Result<std::unique_ptr<IncrementalScanBuilder<ScanType>>>
IncrementalScanBuilder<ScanType>::Make(std::shared_ptr<TableMetadata> metadata,
std::shared_ptr<FileIO> io) {
ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
return std::unique_ptr<IncrementalScanBuilder<ScanType>>(
new IncrementalScanBuilder<ScanType>(std::move(metadata), std::move(io)));
}

template <typename ScanType>
Result<std::unique_ptr<ScanType>> IncrementalScanBuilder<ScanType>::Build() {
return NotImplemented("IncrementalAppendScanBuilder is not implemented");
}

template class IncrementalScanBuilder<IncrementalAppendScan>;
template class IncrementalScanBuilder<IncrementalChangelogScan>;

template <typename ScanTaskType>
Result<std::vector<std::shared_ptr<ScanTaskType>>>
IncrementalScan<ScanTaskType>::PlanFiles() const {
return NotImplemented("Incremental scan is not implemented");
}

// IncrementalAppendScan implementation

Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalAppendScan is not implemented");
}

Result<std::vector<std::shared_ptr<FileScanTask>>> IncrementalAppendScan::PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
}

// IncrementalChangelogScan implementation

Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make(
[[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
[[maybe_unused]] std::shared_ptr<Schema> schema,
[[maybe_unused]] std::shared_ptr<FileIO> io,
[[maybe_unused]] internal::TableScanContext context) {
return NotImplemented("IncrementalChangelogScan is not implemented");
}

Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
}

} // namespace iceberg
Loading
Loading