106 changes: 102 additions & 4 deletions src/iceberg/data/data_writer.cc
@@ -19,20 +19,118 @@

#include "iceberg/data/data_writer.h"

#include "iceberg/file_writer.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/util/macros.h"

namespace iceberg {

class DataWriter::Impl {
public:
static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
WriterOptions writer_options;
Member: nit: use aggregate initialization for writer_options
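The reviewer's nit can be illustrated with C++20 designated initializers. A minimal sketch using a hypothetical two-field stand-in struct (the real WriterOptions also carries schema, io, and properties):

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for iceberg's WriterOptions; only two of the real
// fields are mirrored here.
struct WriterOptions {
  std::string path;
  std::string schema_name;
};

// Aggregate initialization (C++20 designated initializers) builds the struct
// in one expression instead of field-by-field assignment.
WriterOptions MakeWriterOptions(const std::string& path) {
  return WriterOptions{.path = path, .schema_name = "default"};
}
```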

writer_options.path = options.path;
writer_options.schema = options.schema;
writer_options.io = options.io;
writer_options.properties = WriterProperties::FromMap(options.properties);

ICEBERG_ASSIGN_OR_RAISE(auto writer,
WriterFactoryRegistry::Open(options.format, writer_options));

return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
}

Status Write(ArrowArray* data) {
ICEBERG_PRECHECK(writer_, "Writer not initialized");
Member: Will this check ever fail? If not, should we remove the check or use ICEBERG_DCHECK instead? Same question for below.

return writer_->Write(data);
}

Result<int64_t> Length() const {
ICEBERG_PRECHECK(writer_, "Writer not initialized");
return writer_->length();
}

Status Close() {
ICEBERG_PRECHECK(writer_, "Writer not initialized");
if (closed_) {
// Idempotent: no-op if already closed
return {};
}
Comment on lines 55 to 58
Contributor: I could see a case for making Close() idempotent; is there any strong reason why we want to return an error here instead of a no-op, for example?
Contributor (author): Fixed.

ICEBERG_RETURN_UNEXPECTED(writer_->Close());
closed_ = true;
Contributor: Should this class address thread safety?
Contributor (author): Good question! I've added explicit documentation that this class is not thread-safe.
Member: I don't think a single writer (or reader) should support thread safety, so it is fine not to add a comment like this.
Contributor: @wgtmac out of curiosity, for my own knowledge: what guarantees that a single writer/reader will be using the class?
Member: These file writers are supposed to be used by a single write task, which can for example be a unit of a table sink operator in a SQL job plan. Usually the writer is responsible for partitioned (and sometimes sorted) data chunks.

return {};
}
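The idempotent Close() agreed on in the thread above can be sketched in isolation. This is a toy model, with Status reduced to bool and the real flush elided:

```cpp
#include <cassert>

// Toy model of the idempotent-close pattern: the first call does the real
// work, every later call is a successful no-op.
class ToyWriter {
 public:
  bool Close() {
    if (closed_) {
      return true;  // already closed: no-op, still reports success
    }
    // ... flush buffers and release the underlying file here ...
    closed_ = true;
    ++real_closes_;
    return true;
  }
  int real_closes() const { return real_closes_; }

 private:
  bool closed_ = false;
  int real_closes_ = 0;
};
```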

Result<FileWriter::WriteResult> Metadata() {
ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
Member: We should return invalid state instead of invalid argument in this case.
Suggested change:
- ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");

ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
auto split_offsets = writer_->split_offsets();

auto data_file = std::make_shared<DataFile>();
data_file->content = DataFile::Content::kData;
Member: nit: use aggregate initialization

data_file->file_path = options_.path;
data_file->file_format = options_.format;
data_file->partition = options_.partition;
data_file->record_count = metrics.row_count.value_or(0);
Member: Java impl uses -1 when row count is unavailable.
Suggested change:
- data_file->record_count = metrics.row_count.value_or(0);
+ data_file->record_count = metrics.row_count.value_or(-1);

data_file->file_size_in_bytes = length;
data_file->sort_order_id = options_.sort_order_id;
data_file->split_offsets = std::move(split_offsets);
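The aggregate-initialization nit applies here too. A sketch using a hypothetical three-field slice of DataFile (the real struct carries many more fields):

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical slice of DataFile; field names follow the diff.
struct DataFileSlice {
  enum class Content { kData };
  Content content = Content::kData;
  std::string file_path;
  int64_t record_count = 0;
};

// Build the aggregate in one designated-initializer expression and hand it to
// make_shared, instead of assigning each field on a default-constructed object.
std::shared_ptr<DataFileSlice> MakeDataFile(std::string path, int64_t rows) {
  return std::make_shared<DataFileSlice>(DataFileSlice{
      .content = DataFileSlice::Content::kData,
      .file_path = std::move(path),
      .record_count = rows,
  });
}
```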

// Convert metrics maps from unordered_map to map
for (const auto& [col_id, size] : metrics.column_sizes) {
data_file->column_sizes[col_id] = size;
Member: Do you think it makes sense to change the DataFile and Metrics classes to use std::map or std::unordered_map consistently so we don't need to use a for-loop here? cc @zhjwpku
}
for (const auto& [col_id, count] : metrics.value_counts) {
data_file->value_counts[col_id] = count;
}
for (const auto& [col_id, count] : metrics.null_value_counts) {
data_file->null_value_counts[col_id] = count;
}
for (const auto& [col_id, count] : metrics.nan_value_counts) {
data_file->nan_value_counts[col_id] = count;
}
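Even if DataFile keeps std::map while Metrics keeps std::unordered_map, the per-entry loops above can collapse into std::map's range constructor; a minimal sketch:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_map>

// One-call conversion from an unordered metrics map to the ordered map stored
// in DataFile; equivalent to the element-wise copy loops above.
std::map<int32_t, int64_t> ToOrdered(
    const std::unordered_map<int32_t, int64_t>& metrics) {
  return {metrics.begin(), metrics.end()};
}
```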

// Serialize literal bounds to binary format
for (const auto& [col_id, literal] : metrics.lower_bounds) {
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
data_file->lower_bounds[col_id] = std::move(serialized);
}
for (const auto& [col_id, literal] : metrics.upper_bounds) {
ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
data_file->upper_bounds[col_id] = std::move(serialized);
}

FileWriter::WriteResult result;
result.data_files.push_back(std::move(data_file));
return result;
}

private:
Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
: options_(std::move(options)), writer_(std::move(writer)) {}

DataWriterOptions options_;
std::unique_ptr<Writer> writer_;
bool closed_ = false;
};

DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}

DataWriter::~DataWriter() = default;

Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
}

Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }

Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
Result<int64_t> DataWriter::Length() const { return impl_->Length(); }

Status DataWriter::Close() { return NotImplemented(""); }
Status DataWriter::Close() { return impl_->Close(); }

Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }

} // namespace iceberg
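Based only on the API introduced in this diff, the intended single-task call sequence is roughly the following sketch (error-handling macros as in the diff; not compiled here, and the options fields are assumed from this PR):

```cpp
// Sketch only, using the macros shown in this PR.
// ICEBERG_ASSIGN_OR_RAISE(auto writer, DataWriter::Make(options));
// ICEBERG_RETURN_UNEXPECTED(writer->Write(&arrow_array));
// ICEBERG_RETURN_UNEXPECTED(writer->Close());                // idempotent
// ICEBERG_ASSIGN_OR_RAISE(auto result, writer->Metadata());  // only valid after Close()
```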
8 changes: 8 additions & 0 deletions src/iceberg/data/data_writer.h
@@ -51,10 +51,16 @@ struct ICEBERG_EXPORT DataWriterOptions {
};

/// \brief Writer for Iceberg data files.
///
/// This class is not thread-safe. Concurrent calls to Write(), Close(), or Metadata()
/// from multiple threads may result in undefined behavior.
class ICEBERG_EXPORT DataWriter : public FileWriter {
public:
~DataWriter() override;

/// \brief Create a new DataWriter instance.
static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);

Status Write(ArrowArray* data) override;
Result<int64_t> Length() const override;
Status Close() override;
@@ -63,6 +69,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
private:
class Impl;
std::unique_ptr<Impl> impl_;

explicit DataWriter(std::unique_ptr<Impl> impl);
};

} // namespace iceberg