Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions include/executor/vectorized_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
size_t num_threads_ = 1;
std::vector<std::unique_ptr<VectorBatch>> parallel_results_;
size_t parallel_idx_ = 0;
std::vector<size_t> required_col_indices_;
executor::Schema reduced_schema_;

public:
VectorizedSeqScanOperator(std::string table_name, std::shared_ptr<storage::ColumnarTable> table,
Expand All @@ -94,12 +96,26 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
return next_batch_parallel(out_batch);
}

/**
 * @brief Restrict the scan to a subset of columns.
 *
 * Once a non-empty index list is stored, the batch-reading paths of this
 * operator use the column-subset overload of read_batch together with
 * reduced_schema instead of the full output schema.
 *
 * @param col_indices    Columns to read; forwarded unchanged to read_batch
 * @param reduced_schema Schema used to initialize the batches the scan produces
 */
void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
    // Both arguments arrive by value, so their contents can be taken over
    // without copying.
    required_col_indices_.swap(col_indices);
    reduced_schema_ = std::move(reduced_schema);
}

private:
bool next_batch_sequential(VectorBatch& out_batch) {
if (current_row_ >= table_->row_count()) {
return false;
}

if (!required_col_indices_.empty()) {
out_batch.init_from_schema(reduced_schema_);
if (table_->read_batch(current_row_, batch_size_, out_batch, required_col_indices_)) {
current_row_ += out_batch.row_count();
return true;
}
return false;
}

if (table_->read_batch(current_row_, batch_size_, out_batch)) {
current_row_ += out_batch.row_count();
return true;
Expand Down Expand Up @@ -128,7 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
size_t end = std::min(start + range_size, total_rows);
current_row_ = end;

auto batch = VectorBatch::create(output_schema_);
auto batch = VectorBatch::create(required_col_indices_.empty() ? output_schema_
: reduced_schema_);
parallel_results_.push_back(std::move(batch));
}

Expand All @@ -139,10 +156,17 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
parallel_results_[t]->set_row_count(0);
continue;
}
thread_pool_->submit([this, t, start, rows_to_read]() {
table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
*parallel_results_[t]);
});
if (!required_col_indices_.empty()) {
thread_pool_->submit([this, t, start, rows_to_read]() {
table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
*parallel_results_[t], required_col_indices_);
});
} else {
thread_pool_->submit([this, t, start, rows_to_read]() {
table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
*parallel_results_[t]);
});
}
}

thread_pool_->wait();
Expand Down
10 changes: 10 additions & 0 deletions include/storage/columnar_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ class ColumnarTable {
*/
bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch);

/**
 * @brief Load a batch of data for only the specified columns
 *
 * Column i of out_batch is filled from table column col_indices[i], so the
 * batch must be laid out in the same order as col_indices.
 *
 * @param start_row Starting row index
 * @param batch_size Maximum rows to read
 * @param out_batch Output batch (pre-initialized with required schema)
 * @param col_indices Columns to read (indices into table's schema)
 * @return false if start_row is at or past the table's row count, if
 *         col_indices is empty, or if a column's data/null file cannot be
 *         opened; true otherwise, with the batch row count set to the number
 *         of rows read
 * @throws std::runtime_error if a requested column has an unsupported type
 */
bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch,
const std::vector<size_t>& col_indices);

/**
* @brief Append a batch of data to the table
*/
Expand Down
23 changes: 23 additions & 0 deletions src/executor/query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,8 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan(
auto thread_pool = std::make_shared<executor::ThreadPool>(std::thread::hardware_concurrency());
std::unique_ptr<VectorizedOperator> current_root =
std::make_unique<VectorizedSeqScanOperator>(base_table_name, col_table, thread_pool);
VectorizedSeqScanOperator* base_scan =
static_cast<VectorizedSeqScanOperator*>(current_root.get());

// Track estimated output rows for join reordering decisions
uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta);
Expand Down Expand Up @@ -1720,20 +1722,41 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan(
}

executor::Schema output_schema;
std::vector<size_t> required_col_indices;
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}

// Deduplicate required_col_indices (same column may appear in GROUP BY and aggregate)
// NOTE(review): sort/unique are called unqualified, so they presumably resolve
// through argument-dependent lookup; std::sort/std::unique would be explicit.
sort(required_col_indices.begin(), required_col_indices.end());
required_col_indices.erase(unique(required_col_indices.begin(), required_col_indices.end()),
required_col_indices.end());

// Build scan's reduced schema (table columns only, not aggregate output columns)
// NOTE(review): the indices collected above are positions in
// current_root->output_schema(). The scan forwards them to
// ColumnarTable::read_batch, which interprets them as indices into the table's
// own schema. These only coincide if current_root is still the base scan at
// this point; confirm that no operator with a different output schema
// (e.g. a join) can sit between the scan and the aggregation here.
executor::Schema scan_reduced_schema;
for (size_t idx : required_col_indices) {
scan_reduced_schema.add_column(
current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
}

// NOTE(review): after this call the scan emits batches that contain only the
// reduced columns. The group-by keys and agg_infos[i].input_col_idx were
// resolved against the full schema; verify they are remapped to positions in
// the reduced batch before the aggregation operator consumes them.
base_scan->set_required_columns(required_col_indices, scan_reduced_schema);

auto agg_op = std::make_unique<VectorizedGroupByOperator>(
std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema,
thread_pool);
Expand Down
173 changes: 173 additions & 0 deletions src/storage/columnar_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,4 +295,177 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size,
return true;
}

/**
 * @brief Load a batch of rows for only the requested columns.
 *
 * Column i of out_batch receives the values of table column col_indices[i], so
 * out_batch must already be initialized by the caller with a matching reduced
 * schema. Values are appended to the batch's columns.
 *
 * @param start_row   Index of the first row to read
 * @param batch_size  Maximum number of rows to read
 * @param out_batch   Destination batch (pre-initialized with the reduced schema)
 * @param col_indices Columns to read (indices into the table's schema)
 * @return true when every requested column was read and the batch row count was
 *         set; false when start_row is at or past the last row, col_indices is
 *         empty, a column file cannot be opened, or a column file ends before
 *         the requested rows. On a false return, columns handled before the
 *         failing one have already been appended to out_batch.
 * @throws std::runtime_error if a requested column has an unsupported type
 */
bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size,
                               executor::VectorBatch& out_batch,
                               const std::vector<size_t>& col_indices) {
    if (start_row >= row_count_) return false;
    if (col_indices.empty()) return false;

    const uint32_t actual_rows =
        static_cast<uint32_t>(std::min(static_cast<uint64_t>(batch_size), row_count_ - start_row));

    // Fills `buffer` from a fixed-width column file, starting at element
    // `start_row`. Reports a short read instead of letting a truncated file
    // silently turn into zero-valued rows.
    const auto load_fixed = [start_row](std::ifstream& in, auto& buffer) -> bool {
        const uint64_t width = sizeof(buffer[0]);
        in.seekg(static_cast<std::streamoff>(start_row * width), std::ios::beg);
        in.read(reinterpret_cast<char*>(buffer.data()),
                static_cast<std::streamsize>(buffer.size() * width));
        return !in.fail();
    };

    // out_batch is pre-initialized with the reduced schema by the caller
    // (VectorizedSeqScanOperator via set_required_columns)
    for (size_t out_pos = 0; out_pos < col_indices.size(); ++out_pos) {
        const size_t col_idx = col_indices[out_pos];
        const std::string base = name_ + ".col" + std::to_string(col_idx);
        std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary);
        std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary);
        if (!n_in.is_open() || !d_in.is_open()) return false;

        auto& target_col = out_batch.get_column(out_pos);
        const auto type = schema_.get_column(col_idx).type();

        // One null flag byte per row; non-zero marks the row as NULL.
        std::vector<uint8_t> nulls(actual_rows);

        if (type == common::ValueType::TYPE_INT64) {
            auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);
            std::vector<int64_t> data(actual_rows);
            if (!load_fixed(n_in, nulls) || !load_fixed(d_in, data)) return false;

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    num_vec.append(common::Value::make_null());
                } else {
                    num_vec.append(common::Value::make_int64(data[r]));
                }
            }
        } else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 ||
                   type == common::ValueType::TYPE_INT8) {
            // Narrow integers are read back as 8-byte values and narrowed per row.
            auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);
            std::vector<int64_t> data(actual_rows);
            if (!load_fixed(n_in, nulls) || !load_fixed(d_in, data)) return false;

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    num_vec.append(common::Value::make_null());
                } else if (type == common::ValueType::TYPE_INT32) {
                    num_vec.append(common::Value(static_cast<int32_t>(data[r])));
                } else if (type == common::ValueType::TYPE_INT16) {
                    num_vec.append(common::Value(static_cast<int16_t>(data[r])));
                } else {
                    num_vec.append(common::Value(static_cast<int8_t>(data[r])));
                }
            }
        } else if (type == common::ValueType::TYPE_FLOAT64 ||
                   type == common::ValueType::TYPE_DECIMAL) {
            // FLOAT64 and DECIMAL are read and appended identically, as doubles.
            auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col);
            std::vector<double> data(actual_rows);
            if (!load_fixed(n_in, nulls) || !load_fixed(d_in, data)) return false;

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    num_vec.append(common::Value::make_null());
                } else {
                    num_vec.append(common::Value::make_float64(data[r]));
                }
            }
        } else if (type == common::ValueType::TYPE_FLOAT32) {
            // FLOAT32 values are read back as 8-byte doubles and narrowed per row.
            auto& num_vec = dynamic_cast<executor::NumericVector<float>&>(target_col);
            std::vector<double> data(actual_rows);
            if (!load_fixed(n_in, nulls) || !load_fixed(d_in, data)) return false;

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    num_vec.append(common::Value::make_null());
                } else {
                    num_vec.append(common::Value(static_cast<float>(data[r])));
                }
            }
        } else if (type == common::ValueType::TYPE_BOOL) {
            auto& num_vec = dynamic_cast<executor::NumericVector<bool>&>(target_col);
            std::vector<uint8_t> data(actual_rows);
            if (!load_fixed(n_in, nulls) || !load_fixed(d_in, data)) return false;

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    num_vec.append(common::Value::make_null());
                } else {
                    num_vec.append(common::Value(data[r] != 0));
                }
            }
        } else if (type == common::ValueType::TYPE_TEXT ||
                   type == common::ValueType::TYPE_VARCHAR ||
                   type == common::ValueType::TYPE_CHAR) {
            auto& str_vec = dynamic_cast<executor::StringVector&>(target_col);
            if (!load_fixed(n_in, nulls)) return false;

            // Rows are variable-width (4-byte length prefix + payload), so the
            // only way to reach start_row is to walk the prefixes before it.
            // The counter is 64-bit to match start_row: a 32-bit counter would
            // wrap and never terminate once start_row exceeds UINT32_MAX.
            for (uint64_t r = 0; r < start_row; ++r) {
                uint32_t len = 0;
                if (!d_in.read(reinterpret_cast<char*>(&len), sizeof(len))) return false;
                if (len > 0) {
                    d_in.seekg(static_cast<std::streamoff>(len), std::ios::cur);
                }
            }

            // Read every row before appending, so a truncated file does not
            // leave this column partially appended.
            std::vector<std::string> values;
            values.reserve(actual_rows);
            for (uint32_t r = 0; r < actual_rows; ++r) {
                uint32_t len = 0;
                if (!d_in.read(reinterpret_cast<char*>(&len), sizeof(len))) return false;
                std::string s(len, '\0');
                if (len > 0 && !d_in.read(s.data(), static_cast<std::streamsize>(len))) {
                    return false;
                }
                values.push_back(std::move(s));
            }

            for (uint32_t r = 0; r < actual_rows; ++r) {
                if (nulls[r] != 0U) {
                    str_vec.append(common::Value::make_null());
                } else {
                    str_vec.append(common::Value::make_text(values[r]));
                }
            }
        } else {
            throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " +
                                     std::to_string(static_cast<int>(type)));
        }
    }
    out_batch.set_row_count(actual_rows);
    return true;
}
} // namespace cloudsql::storage
Loading