From a97b6dae5b115e5de71ca6e13ab43966f0a7531a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:37 +0300 Subject: [PATCH 1/5] Add read_batch(col_indices) overload declaration to ColumnarTable --- include/storage/columnar_table.hpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/include/storage/columnar_table.hpp b/include/storage/columnar_table.hpp index ae4f4f2b..868fa916 100644 --- a/include/storage/columnar_table.hpp +++ b/include/storage/columnar_table.hpp @@ -37,6 +37,16 @@ class ColumnarTable { */ bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch); + /** + * @brief Load a batch of data for only the specified columns + * @param start_row Starting row index + * @param batch_size Maximum rows to read + * @param out_batch Output batch (pre-initialized with required schema) + * @param col_indices Columns to read (indices into table's schema) + */ + bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch, + const std::vector& col_indices); + /** * @brief Append a batch of data to the table */ From 1c86311e6316e59f504c1f005a5c1f3a349e0d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:41 +0300 Subject: [PATCH 2/5] Implement projection-aware read_batch(col_indices) in ColumnarTable --- src/storage/columnar_table.cpp | 174 +++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/src/storage/columnar_table.cpp b/src/storage/columnar_table.cpp index 25c39ccd..b7151db6 100644 --- a/src/storage/columnar_table.cpp +++ b/src/storage/columnar_table.cpp @@ -295,4 +295,178 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, return true; } +bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, + executor::VectorBatch& out_batch, + const std::vector& col_indices) { + if (start_row >= row_count_) return false; + if (col_indices.empty()) return false; + + uint32_t actual_rows = + static_cast(std::min(static_cast(batch_size), row_count_ - start_row)); + + // out_batch is pre-initialized with the reduced schema by the caller + // (VectorizedSeqScanOperator via set_required_columns) + + for (size_t idx = 0; idx < col_indices.size(); ++idx) { + size_t col_idx = col_indices[idx]; + const std::string base = name_ + ".col" + std::to_string(col_idx); + std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary); + std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary); + if (!n_in.is_open() || !d_in.is_open()) return false; + + auto& target_col = out_batch.get_column(idx); + const auto type = schema_.get_column(col_idx).type(); + + if (type == common::ValueType::TYPE_INT64) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_int64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 || + type == common::ValueType::TYPE_INT8) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else if (type == common::ValueType::TYPE_INT32) { + num_vec.append(common::Value(static_cast(data[r]))); + } else if (type == common::ValueType::TYPE_INT16) { + num_vec.append(common::Value(static_cast(data[r]))); + } else { + num_vec.append(common::Value(static_cast(data[r]))); + } + } + } else if (type == common::ValueType::TYPE_FLOAT64) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_float64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_FLOAT32) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value(static_cast(data[r]))); + } + } + } else if (type == common::ValueType::TYPE_DECIMAL) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_float64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_BOOL) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value(data[r] != 0)); + } + } + } else if (type == common::ValueType::TYPE_TEXT || + type == common::ValueType::TYPE_VARCHAR || + type == common::ValueType::TYPE_CHAR) { + auto& str_vec = dynamic_cast(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + if (start_row > 0) { + for (uint32_t r = 0; r < start_row; ++r) { + uint32_t len = 0; + if (!d_in.read(reinterpret_cast(&len), 4)) break; + if (len > 0) { + d_in.seekg(static_cast(len), std::ios::cur); + } + } + } + + for (uint32_t r = 0; r < actual_rows; ++r) { + uint32_t len = 0; + d_in.read(reinterpret_cast(&len), 4); + std::string s(len, '\0'); + d_in.read(s.data(), len); + if (nulls[r] != 0U) { + str_vec.append(common::Value::make_null()); + } else { + str_vec.append(common::Value::make_text(s)); + } + } + } else { + throw std::runtime_error( + "ColumnarTable::read_batch(col_indices): Unsupported type " + + std::to_string(static_cast(type))); + } + } + out_batch.set_row_count(actual_rows); + return true; +} } // namespace cloudsql::storage From 023fb82c2912b500c3c51e8b8cd1b9e42fb1e7d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:44 +0300 Subject: [PATCH 3/5] Add set_required_columns() to VectorizedSeqScanOperator for column projection --- include/executor/vectorized_operator.hpp | 34 ++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index bb1d309d..aa7405d5 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -73,6 +73,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t num_threads_ = 1; std::vector> parallel_results_; size_t parallel_idx_ = 0; + std::vector required_col_indices_; + executor::Schema reduced_schema_; public: VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table, @@ -94,12 +96,26 @@ class VectorizedSeqScanOperator : public VectorizedOperator { return next_batch_parallel(out_batch); } + void set_required_columns(std::vector col_indices, executor::Schema reduced_schema) { + required_col_indices_ = std::move(col_indices); + reduced_schema_ = std::move(reduced_schema); + } + private: bool next_batch_sequential(VectorBatch& out_batch) { if (current_row_ >= table_->row_count()) { return false; } + if (!required_col_indices_.empty()) { + out_batch.init_from_schema(reduced_schema_); + if (table_->read_batch(current_row_, batch_size_, out_batch, required_col_indices_)) { + current_row_ += out_batch.row_count(); + return true; + } + return false; + } + if (table_->read_batch(current_row_, batch_size_, out_batch)) { current_row_ += out_batch.row_count(); return true; @@ -128,7 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t end = std::min(start + range_size, total_rows); current_row_ = end; - auto batch = VectorBatch::create(output_schema_); + auto batch = VectorBatch::create( + required_col_indices_.empty() ? output_schema_ : reduced_schema_); parallel_results_.push_back(std::move(batch)); } @@ -139,10 +156,17 @@ class VectorizedSeqScanOperator : public VectorizedOperator { parallel_results_[t]->set_row_count(0); continue; } - thread_pool_->submit([this, t, start, rows_to_read]() { - table_->read_batch(start, static_cast(rows_to_read), - *parallel_results_[t]); - }); + if (!required_col_indices_.empty()) { + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t], required_col_indices_); + }); + } else { + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t]); + }); + } } thread_pool_->wait(); From f16006daffaedf1c18194a6b1f3750edb2bf376e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:47 +0300 Subject: [PATCH 4/5] Propagate required column indices from GROUP BY to scan in build_vectorized_plan --- src/executor/query_executor.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 7c541ade..ff45272d 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1512,6 +1512,7 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = std::make_unique(base_table_name, col_table, thread_pool); + VectorizedSeqScanOperator* base_scan = static_cast(current_root.get()); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); @@ -1720,6 +1721,7 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( } executor::Schema output_schema; + std::vector required_col_indices; for (const auto& gb : stmt.group_by()) { const auto& gb_name = gb->to_string(); size_t idx = current_root->output_schema().find_column(gb_name); @@ -1727,13 +1729,19 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( output_schema.add_column(current_root->output_schema().get_column(idx).name(), current_root->output_schema().get_column(idx).type(), current_root->output_schema().get_column(idx).nullable()); + required_col_indices.push_back(idx); } } for (size_t i = 0; i < agg_infos.size(); ++i) { output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, false); + if (agg_infos[i].input_col_idx >= 0) { + required_col_indices.push_back(static_cast(agg_infos[i].input_col_idx)); + } } + base_scan->set_required_columns(required_col_indices, output_schema); + auto agg_op = std::make_unique( std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema, thread_pool); From 5cfb3a9ff8ca29f0a190705a41a55a63f3c00f34 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 18:08:33 +0000 Subject: [PATCH 5/5] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 4 ++-- src/executor/query_executor.cpp | 3 ++- src/storage/columnar_table.cpp | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index aa7405d5..762d8cb8 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -144,8 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t end = std::min(start + range_size, total_rows); current_row_ = end; - auto batch = VectorBatch::create( - required_col_indices_.empty() ? output_schema_ : reduced_schema_); + auto batch = VectorBatch::create(required_col_indices_.empty() ? output_schema_ + : reduced_schema_); parallel_results_.push_back(std::move(batch)); } diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index ff45272d..cf6e7002 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1512,7 +1512,8 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = std::make_unique(base_table_name, col_table, thread_pool); - VectorizedSeqScanOperator* base_scan = static_cast(current_root.get()); + VectorizedSeqScanOperator* base_scan = + static_cast(current_root.get()); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); diff --git a/src/storage/columnar_table.cpp b/src/storage/columnar_table.cpp index b7151db6..6fda05ac 100644 --- a/src/storage/columnar_table.cpp +++ b/src/storage/columnar_table.cpp @@ -461,9 +461,8 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, } } } else { - throw std::runtime_error( - "ColumnarTable::read_batch(col_indices): Unsupported type " + - std::to_string(static_cast(type))); + throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " + + std::to_string(static_cast(type))); } } out_batch.set_row_count(actual_rows);