From 0bf09e26543b8b55e31a83b1bcded66a7c99a12d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:37 +0300 Subject: [PATCH 1/7] Add read_batch(col_indices) overload declaration to ColumnarTable --- include/storage/columnar_table.hpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/include/storage/columnar_table.hpp b/include/storage/columnar_table.hpp index ae4f4f2b..868fa916 100644 --- a/include/storage/columnar_table.hpp +++ b/include/storage/columnar_table.hpp @@ -37,6 +37,16 @@ class ColumnarTable { */ bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch); + /** + * @brief Load a batch of data for only the specified columns + * @param start_row Starting row index + * @param batch_size Maximum rows to read + * @param out_batch Output batch (pre-initialized with required schema) + * @param col_indices Columns to read (indices into table's schema) + */ + bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch, + const std::vector& col_indices); + /** * @brief Append a batch of data to the table */ From e16616156648e72682dd4fb567d4135993cd1c08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:41 +0300 Subject: [PATCH 2/7] Implement projection-aware read_batch(col_indices) in ColumnarTable --- src/storage/columnar_table.cpp | 174 +++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/src/storage/columnar_table.cpp b/src/storage/columnar_table.cpp index 25c39ccd..b7151db6 100644 --- a/src/storage/columnar_table.cpp +++ b/src/storage/columnar_table.cpp @@ -295,4 +295,178 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, return true; } +bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, + executor::VectorBatch& out_batch, + const std::vector& col_indices) { + if (start_row >= row_count_) return false; + if (col_indices.empty()) return false; + + uint32_t actual_rows = + static_cast(std::min(static_cast(batch_size), row_count_ - start_row)); + + // out_batch is pre-initialized with the reduced schema by the caller + // (VectorizedSeqScanOperator via set_required_columns) + + for (size_t idx = 0; idx < col_indices.size(); ++idx) { + size_t col_idx = col_indices[idx]; + const std::string base = name_ + ".col" + std::to_string(col_idx); + std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary); + std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary); + if (!n_in.is_open() || !d_in.is_open()) return false; + + auto& target_col = out_batch.get_column(idx); + const auto type = schema_.get_column(col_idx).type(); + + if (type == common::ValueType::TYPE_INT64) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_int64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 || + type == common::ValueType::TYPE_INT8) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else if (type == common::ValueType::TYPE_INT32) { + num_vec.append(common::Value(static_cast(data[r]))); + } else if (type == common::ValueType::TYPE_INT16) { + num_vec.append(common::Value(static_cast(data[r]))); + } else { + num_vec.append(common::Value(static_cast(data[r]))); + } + } + } else if (type == common::ValueType::TYPE_FLOAT64) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_float64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_FLOAT32) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value(static_cast(data[r]))); + } + } + } else if (type == common::ValueType::TYPE_DECIMAL) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row * 8), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows * 8); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value::make_float64(data[r])); + } + } + } else if (type == common::ValueType::TYPE_BOOL) { + auto& num_vec = dynamic_cast&>(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + d_in.seekg(static_cast(start_row), std::ios::beg); + std::vector data(actual_rows); + d_in.read(reinterpret_cast(data.data()), actual_rows); + + for (uint32_t r = 0; r < actual_rows; ++r) { + if (nulls[r] != 0U) { + num_vec.append(common::Value::make_null()); + } else { + num_vec.append(common::Value(data[r] != 0)); + } + } + } else if (type == common::ValueType::TYPE_TEXT || + type == common::ValueType::TYPE_VARCHAR || + type == common::ValueType::TYPE_CHAR) { + auto& str_vec = dynamic_cast(target_col); + + n_in.seekg(static_cast(start_row), std::ios::beg); + std::vector nulls(actual_rows); + n_in.read(reinterpret_cast(nulls.data()), actual_rows); + + if (start_row > 0) { + for (uint32_t r = 0; r < start_row; ++r) { + uint32_t len = 0; + if (!d_in.read(reinterpret_cast(&len), 4)) break; + if (len > 0) { + d_in.seekg(static_cast(len), std::ios::cur); + } + } + } + + for (uint32_t r = 0; r < actual_rows; ++r) { + uint32_t len = 0; + d_in.read(reinterpret_cast(&len), 4); + std::string s(len, '\0'); + d_in.read(s.data(), len); + if (nulls[r] != 0U) { + str_vec.append(common::Value::make_null()); + } else { + str_vec.append(common::Value::make_text(s)); + } + } + } else { + throw std::runtime_error( + "ColumnarTable::read_batch(col_indices): Unsupported type " + + std::to_string(static_cast(type))); + } + } + out_batch.set_row_count(actual_rows); + return true; +} } // namespace cloudsql::storage From 9997939f6a2014f593b80c9579193d392608f2bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:44 +0300 Subject: [PATCH 3/7] Add set_required_columns() to VectorizedSeqScanOperator for column projection --- include/executor/vectorized_operator.hpp | 34 ++++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index bb1d309d..aa7405d5 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -73,6 +73,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t num_threads_ = 1; std::vector> parallel_results_; size_t parallel_idx_ = 0; + std::vector required_col_indices_; + executor::Schema reduced_schema_; public: VectorizedSeqScanOperator(std::string table_name, std::shared_ptr table, @@ -94,12 +96,26 @@ class VectorizedSeqScanOperator : public VectorizedOperator { return next_batch_parallel(out_batch); } + void set_required_columns(std::vector col_indices, executor::Schema reduced_schema) { + required_col_indices_ = std::move(col_indices); + reduced_schema_ = std::move(reduced_schema); + } + private: bool next_batch_sequential(VectorBatch& out_batch) { if (current_row_ >= table_->row_count()) { return false; } + if (!required_col_indices_.empty()) { + out_batch.init_from_schema(reduced_schema_); + if (table_->read_batch(current_row_, batch_size_, out_batch, required_col_indices_)) { + current_row_ += out_batch.row_count(); + return true; + } + return false; + } + if (table_->read_batch(current_row_, batch_size_, out_batch)) { current_row_ += out_batch.row_count(); return true; @@ -128,7 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t end = std::min(start + range_size, total_rows); current_row_ = end; - auto batch = VectorBatch::create(output_schema_); + auto batch = VectorBatch::create( + required_col_indices_.empty() ? output_schema_ : reduced_schema_); parallel_results_.push_back(std::move(batch)); } @@ -139,10 +156,17 @@ class VectorizedSeqScanOperator : public VectorizedOperator { parallel_results_[t]->set_row_count(0); continue; } - thread_pool_->submit([this, t, start, rows_to_read]() { - table_->read_batch(start, static_cast(rows_to_read), - *parallel_results_[t]); - }); + if (!required_col_indices_.empty()) { + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t], required_col_indices_); + }); + } else { + thread_pool_->submit([this, t, start, rows_to_read]() { + table_->read_batch(start, static_cast(rows_to_read), + *parallel_results_[t]); + }); + } } thread_pool_->wait(); From 1a4cf4830f84333bbdcefd733c38409300c4c60c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:07:47 +0300 Subject: [PATCH 4/7] Propagate required column indices from GROUP BY to scan in build_vectorized_plan --- src/executor/query_executor.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index 7c541ade..ff45272d 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1512,6 +1512,7 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = std::make_unique(base_table_name, col_table, thread_pool); + VectorizedSeqScanOperator* base_scan = static_cast(current_root.get()); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); @@ -1720,6 +1721,7 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( } executor::Schema output_schema; + std::vector required_col_indices; for (const auto& gb : stmt.group_by()) { const auto& gb_name = gb->to_string(); size_t idx = current_root->output_schema().find_column(gb_name); @@ -1727,13 +1729,19 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( output_schema.add_column(current_root->output_schema().get_column(idx).name(), current_root->output_schema().get_column(idx).type(), current_root->output_schema().get_column(idx).nullable()); + required_col_indices.push_back(idx); } } for (size_t i = 0; i < agg_infos.size(); ++i) { output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, false); + if (agg_infos[i].input_col_idx >= 0) { + required_col_indices.push_back(static_cast(agg_infos[i].input_col_idx)); + } } + base_scan->set_required_columns(required_col_indices, output_schema); + auto agg_op = std::make_unique( std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema, thread_pool); From 114f4f29323b1456c533ee490b512f58bc43d766 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 18:09:31 +0000 Subject: [PATCH 5/7] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 4 ++-- src/executor/query_executor.cpp | 3 ++- src/storage/columnar_table.cpp | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index aa7405d5..762d8cb8 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -144,8 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator { size_t end = std::min(start + range_size, total_rows); current_row_ = end; - auto batch = VectorBatch::create( - required_col_indices_.empty() ? output_schema_ : reduced_schema_); + auto batch = VectorBatch::create(required_col_indices_.empty() ? output_schema_ + : reduced_schema_); parallel_results_.push_back(std::move(batch)); } diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index ff45272d..cf6e7002 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1512,7 +1512,8 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( auto thread_pool = std::make_shared(std::thread::hardware_concurrency()); std::unique_ptr current_root = std::make_unique(base_table_name, col_table, thread_pool); - VectorizedSeqScanOperator* base_scan = static_cast(current_root.get()); + VectorizedSeqScanOperator* base_scan = + static_cast(current_root.get()); // Track estimated output rows for join reordering decisions uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); diff --git a/src/storage/columnar_table.cpp b/src/storage/columnar_table.cpp index b7151db6..6fda05ac 100644 --- a/src/storage/columnar_table.cpp +++ b/src/storage/columnar_table.cpp @@ -461,9 +461,8 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, } } } else { - throw std::runtime_error( - "ColumnarTable::read_batch(col_indices): Unsupported type " + - std::to_string(static_cast(type))); + throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " + + std::to_string(static_cast(type))); } } out_batch.set_row_count(actual_rows); From a9050cf1b1bea5d25ac20c2f532dc7d121c9e7cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 21:16:29 +0300 Subject: [PATCH 6/7] Fix duplicate column indices and wrong reduced schema in column projection pushdown - Deduplicate required_col_indices before passing to set_required_columns() - Build scan_reduced_schema from table columns only (not aggregate output) - Fixes data corruption when same column appears in GROUP BY and aggregate --- src/executor/query_executor.cpp | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index cf6e7002..e0bbd97d 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1741,7 +1741,20 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( } } - base_scan->set_required_columns(required_col_indices, output_schema); + // Deduplicate required_col_indices (same column may appear in GROUP BY and aggregate) + sort(required_col_indices.begin(), required_col_indices.end()); + required_col_indices.erase(unique(required_col_indices.begin(), required_col_indices.end()), + required_col_indices.end()); + + // Build scan's reduced schema (table columns only, not aggregate output columns) + executor::Schema scan_reduced_schema; + for (size_t idx : required_col_indices) { + scan_reduced_schema.add_column(current_root->output_schema().get_column(idx).name(), + current_root->output_schema().get_column(idx).type(), + current_root->output_schema().get_column(idx).nullable()); + } + + base_scan->set_required_columns(required_col_indices, scan_reduced_schema); auto agg_op = std::make_unique( std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema, From 245326263c43ff37f60d4376256b435000603374 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 11 Jun 2026 18:17:07 +0000 Subject: [PATCH 7/7] style: automated clang-format fixes --- src/executor/query_executor.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/executor/query_executor.cpp b/src/executor/query_executor.cpp index e0bbd97d..9f1d916d 100644 --- a/src/executor/query_executor.cpp +++ b/src/executor/query_executor.cpp @@ -1749,9 +1749,10 @@ std::unique_ptr QueryExecutor::build_vectorized_plan( // Build scan's reduced schema (table columns only, not aggregate output columns) executor::Schema scan_reduced_schema; for (size_t idx : required_col_indices) { - scan_reduced_schema.add_column(current_root->output_schema().get_column(idx).name(), - current_root->output_schema().get_column(idx).type(), - current_root->output_schema().get_column(idx).nullable()); + scan_reduced_schema.add_column( + current_root->output_schema().get_column(idx).name(), + current_root->output_schema().get_column(idx).type(), + current_root->output_schema().get_column(idx).nullable()); } base_scan->set_required_columns(required_col_indices, scan_reduced_schema);