34 changes: 29 additions & 5 deletions include/executor/vectorized_operator.hpp
@@ -73,6 +73,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
size_t num_threads_ = 1;
std::vector<std::unique_ptr<VectorBatch>> parallel_results_;
size_t parallel_idx_ = 0;
std::vector<size_t> required_col_indices_;
executor::Schema reduced_schema_;

public:
VectorizedSeqScanOperator(std::string table_name, std::shared_ptr<storage::ColumnarTable> table,
@@ -94,12 +96,26 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
return next_batch_parallel(out_batch);
}

void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
required_col_indices_ = std::move(col_indices);
reduced_schema_ = std::move(reduced_schema);
}
Comment on lines +99 to +102

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

output_schema_ must be updated when column projection is enabled.

When set_required_columns is called, the operator produces batches laid out according to reduced_schema_, but output_schema_ (returned by output_schema()) still reflects the full table schema (set at line 82). Downstream operators resolve column indices from child_->output_schema() and then access batch columns at those indices. Because the indices refer to the full schema while the batch follows the reduced schema, column accesses either retrieve the wrong column or read out of bounds and crash.

For example:

  • Table schema: [id INT, cat TEXT, val INT]
  • Projection: columns [cat, val] → indices [1, 2] in full schema
  • Batch schema after projection: [cat, val] → positions [0, 1]
  • GROUP BY resolves "cat" → index 1 (from full schema)
  • batch.get_column(1) retrieves "val" (batch position 1) instead of "cat" → wrong result
🐛 Proposed fix
 void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
     required_col_indices_ = std::move(col_indices);
     reduced_schema_ = std::move(reduced_schema);
+    output_schema_ = reduced_schema_;  // Update output schema to match actual batch schema
 }
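
The mismatch can be reproduced in isolation. The sketch below is only an illustration: `MiniSchema`, `MiniBatch`, and `resolve` are hypothetical stand-ins, not the project's `executor::Schema` or `VectorBatch`. It assumes a downstream operator looks a name up in the schema the child advertises and then indexes the batch by position.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for executor::Schema: an ordered list of column names.
struct MiniSchema {
    std::vector<std::string> names;
    static constexpr std::size_t npos = static_cast<std::size_t>(-1);

    std::size_t find_column(const std::string& name) const {
        for (std::size_t i = 0; i < names.size(); ++i) {
            if (names[i] == name) return i;
        }
        return npos;
    }
};

// Hypothetical stand-in for a projected batch: position i holds the column
// whose name is stored at column_names[i].
struct MiniBatch {
    std::vector<std::string> column_names;

    // Bounds-checked access; throws std::out_of_range on a bad index.
    const std::string& get_column(std::size_t i) const { return column_names.at(i); }
};

// Mimics a downstream operator: resolve the name against the advertised
// schema, then read the batch column at that position.
std::string resolve(const MiniSchema& advertised, const MiniBatch& batch,
                    const std::string& name) {
    return batch.get_column(advertised.find_column(name));
}
```

With a full schema of `{id, cat, val}` and a batch holding only `{cat, val}`, resolving `cat` against the full schema yields position 1, which is `val` in the batch. Resolving against the reduced schema yields `cat`, which is the behaviour the proposed fix restores.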
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/executor/vectorized_operator.hpp` around lines 99 - 102,
set_required_columns currently updates required_col_indices_ and reduced_schema_
but does not update output_schema_, causing downstream operators to resolve
indices against the full schema while batches use reduced_schema_; update
set_required_columns (the method named set_required_columns) to also set
output_schema_ = reduced_schema_ (or otherwise replace the operator's
output_schema_ with reduced_schema_) so output_schema() matches the projected
batch schema; ensure you modify the same function that assigns
required_col_indices_ and reduced_schema_ and keep reduced_schema_ and
required_col_indices_ semantics intact.


private:
bool next_batch_sequential(VectorBatch& out_batch) {
if (current_row_ >= table_->row_count()) {
return false;
}

if (!required_col_indices_.empty()) {
out_batch.init_from_schema(reduced_schema_);
if (table_->read_batch(current_row_, batch_size_, out_batch, required_col_indices_)) {
current_row_ += out_batch.row_count();
return true;
}
return false;
}

if (table_->read_batch(current_row_, batch_size_, out_batch)) {
current_row_ += out_batch.row_count();
return true;
@@ -128,7 +144,8 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
size_t end = std::min(start + range_size, total_rows);
current_row_ = end;

-auto batch = VectorBatch::create(output_schema_);
+auto batch = VectorBatch::create(required_col_indices_.empty() ? output_schema_
+: reduced_schema_);
parallel_results_.push_back(std::move(batch));
}

@@ -139,10 +156,17 @@ class VectorizedSeqScanOperator : public VectorizedOperator {
parallel_results_[t]->set_row_count(0);
continue;
}
-thread_pool_->submit([this, t, start, rows_to_read]() {
-table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
-*parallel_results_[t]);
-});
+if (!required_col_indices_.empty()) {
+thread_pool_->submit([this, t, start, rows_to_read]() {
+table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
+*parallel_results_[t], required_col_indices_);
+});
+} else {
+thread_pool_->submit([this, t, start, rows_to_read]() {
+table_->read_batch(start, static_cast<uint32_t>(rows_to_read),
+*parallel_results_[t]);
+});
+}
}

thread_pool_->wait();
10 changes: 10 additions & 0 deletions include/storage/columnar_table.hpp
@@ -37,6 +37,16 @@ class ColumnarTable {
*/
bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch);

/**
* @brief Load a batch of data for only the specified columns
* @param start_row Starting row index
* @param batch_size Maximum rows to read
* @param out_batch Output batch (pre-initialized with required schema)
* @param col_indices Columns to read (indices into table's schema)
*/
bool read_batch(uint64_t start_row, uint32_t batch_size, executor::VectorBatch& out_batch,
const std::vector<size_t>& col_indices);

/**
* @brief Append a batch of data to the table
*/
9 changes: 9 additions & 0 deletions src/executor/query_executor.cpp
@@ -1512,6 +1512,8 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan(
auto thread_pool = std::make_shared<executor::ThreadPool>(std::thread::hardware_concurrency());
std::unique_ptr<VectorizedOperator> current_root =
std::make_unique<VectorizedSeqScanOperator>(base_table_name, col_table, thread_pool);
VectorizedSeqScanOperator* base_scan =
static_cast<VectorizedSeqScanOperator*>(current_root.get());

// Track estimated output rows for join reordering decisions
uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta);
@@ -1720,20 +1722,27 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan(
}

executor::Schema output_schema;
std::vector<size_t> required_col_indices;
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}

base_scan->set_required_columns(required_col_indices, output_schema);
Comment on lines 1724 to +1744

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Wrong schema passed to set_required_columns — causes type mismatch and crashes.

The output_schema built at lines 1724-1742 is the GROUP BY operator's output schema (keys + aggregate results), not the scan operator's output schema (keys + aggregate inputs). When passed to set_required_columns at line 1744, this causes the scan to initialize out_batch with aggregate result types (e.g., FLOAT64 for SUM) instead of input types (e.g., INT for the SUM input column).

Then, in ColumnarTable::read_batch, the code takes the column type from the table schema (INT) and applies a reference dynamic_cast to the out_batch column, expecting the matching vector type (NumericVector<int64_t>). The column was initialized as FLOAT64 (NumericVector<double>), so the cast throws std::bad_cast, crashing the query.

Example:

  • Table: (cat TEXT, val INT)
  • Query: SELECT cat, SUM(val) FROM test_table GROUP BY cat
  • output_schema at line 1744: (cat TEXT, agg_0 FLOAT64) — includes SUM output type
  • required_col_indices: [0, 1] — cat and val
  • Scan creates batch with schema (cat TEXT, agg_0 FLOAT64)
  • Storage tries to deserialize val (INT) into batch column 1 (FLOAT64 vector) → crash
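
The failure mode in the last step can be shown with a minimal sketch. The types below are hypothetical stand-ins for `executor::ColumnVector` and `executor::NumericVector<T>`, assuming only that the real classes form a polymorphic hierarchy. A `dynamic_cast` to a reference type throws `std::bad_cast` when the dynamic type does not match.

```cpp
#include <cassert>
#include <cstdint>
#include <typeinfo>
#include <vector>

// Hypothetical polymorphic base, standing in for executor::ColumnVector.
struct ColumnVector {
    virtual ~ColumnVector() = default;
};

// Hypothetical typed column, standing in for executor::NumericVector<T>.
template <typename T>
struct NumericVector : ColumnVector {
    std::vector<T> values;
};

// Returns true when `target` really is a NumericVector<T>.
// A failed reference dynamic_cast throws std::bad_cast, which is caught here.
template <typename T>
bool cast_succeeds(ColumnVector& target) {
    try {
        (void)dynamic_cast<NumericVector<T>&>(target);
        return true;
    } catch (const std::bad_cast&) {
        return false;
    }
}
```

A column created for `agg_0 FLOAT64` behaves like `NumericVector<double>` here, so `cast_succeeds<std::int64_t>` returns false for it. In the reviewed code nothing catches the exception, which is why the mismatch surfaces as a crash.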
🐛 Proposed fix

Build reduced_schema from the required input columns instead of the GROUP BY output schema:

         executor::Schema output_schema;
         std::vector<size_t> required_col_indices;
+        executor::Schema reduced_input_schema;  // Schema of columns scanned from table
         for (const auto& gb : stmt.group_by()) {
             const auto& gb_name = gb->to_string();
             size_t idx = current_root->output_schema().find_column(gb_name);
             if (idx != static_cast<size_t>(-1)) {
+                const auto& col = current_root->output_schema().get_column(idx);
+                reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
                 output_schema.add_column(current_root->output_schema().get_column(idx).name(),
                                          current_root->output_schema().get_column(idx).type(),
                                          current_root->output_schema().get_column(idx).nullable());
                 required_col_indices.push_back(idx);
             }
         }
         for (size_t i = 0; i < agg_infos.size(); ++i) {
             output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
                                      false);
             if (agg_infos[i].input_col_idx >= 0) {
+                size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
+                const auto& col = current_root->output_schema().get_column(input_idx);
+                reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
                 required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
             }
         }

-        base_scan->set_required_columns(required_col_indices, output_schema);
+        base_scan->set_required_columns(required_col_indices, reduced_input_schema);
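
One way to keep the index list and the scan schema from drifting apart is to derive the schema from the indices in a single step. The following is a sketch under assumptions: `ColType`, `ColumnDef`, `MiniSchema`, and `project_schema` are hypothetical names, not part of the codebase, and the real `executor::Schema` API may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical simplified column metadata.
enum class ColType { Int64, Float64, Text };

struct ColumnDef {
    std::string name;
    ColType type;
    bool nullable;
};

using MiniSchema = std::vector<ColumnDef>;

// Builds the scan schema by copying the definition of each requested column
// from the full schema, in request order. The result therefore always carries
// the input types, never aggregate result types.
MiniSchema project_schema(const MiniSchema& full, const std::vector<std::size_t>& indices) {
    MiniSchema reduced;
    reduced.reserve(indices.size());
    for (std::size_t idx : indices) {
        if (idx >= full.size()) {
            throw std::out_of_range("column index outside table schema");
        }
        reduced.push_back(full[idx]);
    }
    return reduced;
}
```

For the table `(cat TEXT, val INT)` and indices `[0, 1]`, the projected schema keeps `val` as an integer column, so the batch the scan allocates matches what storage deserializes.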
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-executor::Schema output_schema;
-std::vector<size_t> required_col_indices;
-for (const auto& gb : stmt.group_by()) {
-const auto& gb_name = gb->to_string();
-size_t idx = current_root->output_schema().find_column(gb_name);
-if (idx != static_cast<size_t>(-1)) {
-output_schema.add_column(current_root->output_schema().get_column(idx).name(),
-current_root->output_schema().get_column(idx).type(),
-current_root->output_schema().get_column(idx).nullable());
-required_col_indices.push_back(idx);
-}
-}
-for (size_t i = 0; i < agg_infos.size(); ++i) {
-output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
-false);
-if (agg_infos[i].input_col_idx >= 0) {
-required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
-}
-}
-base_scan->set_required_columns(required_col_indices, output_schema);
+executor::Schema output_schema;
+std::vector<size_t> required_col_indices;
+executor::Schema reduced_input_schema; // Schema of columns scanned from table
+for (const auto& gb : stmt.group_by()) {
+const auto& gb_name = gb->to_string();
+size_t idx = current_root->output_schema().find_column(gb_name);
+if (idx != static_cast<size_t>(-1)) {
+const auto& col = current_root->output_schema().get_column(idx);
+reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
+output_schema.add_column(current_root->output_schema().get_column(idx).name(),
+current_root->output_schema().get_column(idx).type(),
+current_root->output_schema().get_column(idx).nullable());
+required_col_indices.push_back(idx);
+}
+}
+for (size_t i = 0; i < agg_infos.size(); ++i) {
+output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
+false);
+if (agg_infos[i].input_col_idx >= 0) {
+size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
+const auto& col = current_root->output_schema().get_column(input_idx);
+reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
+required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
+}
+}
+base_scan->set_required_columns(required_col_indices, reduced_input_schema);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executor/query_executor.cpp` around lines 1724 - 1744, The code builds
output_schema (GROUP BY keys + aggregate result types) and passes it to
base_scan->set_required_columns, causing the scan to initialize output columns
with aggregate result types and crash when storage expects the original input
types; instead build a reduced_schema containing the actual input column types
for every index in required_col_indices (use
current_root->output_schema().get_column(idx).type() for group-by key indices
and, for agg_infos entries with input_col_idx >=0, use the table/input column
type rather than the aggregate result type) and pass that reduced_schema to
base_scan->set_required_columns; update references around output_schema,
required_col_indices, agg_infos, and the call base_scan->set_required_columns to
use this corrected schema so ColumnarTable::read_batch sees matching types.


auto agg_op = std::make_unique<VectorizedGroupByOperator>(
std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema,
thread_pool);
173 changes: 173 additions & 0 deletions src/storage/columnar_table.cpp
@@ -295,4 +295,177 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size,
return true;
}

bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size,
executor::VectorBatch& out_batch,
const std::vector<size_t>& col_indices) {
if (start_row >= row_count_) return false;
if (col_indices.empty()) return false;

uint32_t actual_rows =
static_cast<uint32_t>(std::min(static_cast<uint64_t>(batch_size), row_count_ - start_row));

// out_batch is pre-initialized with the reduced schema by the caller
// (VectorizedSeqScanOperator via set_required_columns)

for (size_t idx = 0; idx < col_indices.size(); ++idx) {
size_t col_idx = col_indices[idx];
const std::string base = name_ + ".col" + std::to_string(col_idx);
std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary);
std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary);
if (!n_in.is_open() || !d_in.is_open()) return false;

auto& target_col = out_batch.get_column(idx);
const auto type = schema_.get_column(col_idx).type();

if (type == common::ValueType::TYPE_INT64) {
auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<int64_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_int64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 ||
type == common::ValueType::TYPE_INT8) {
auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<int64_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else if (type == common::ValueType::TYPE_INT32) {
num_vec.append(common::Value(static_cast<int32_t>(data[r])));
} else if (type == common::ValueType::TYPE_INT16) {
num_vec.append(common::Value(static_cast<int16_t>(data[r])));
} else {
num_vec.append(common::Value(static_cast<int8_t>(data[r])));
}
}
} else if (type == common::ValueType::TYPE_FLOAT64) {
auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_float64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_FLOAT32) {
auto& num_vec = dynamic_cast<executor::NumericVector<float>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value(static_cast<float>(data[r])));
}
}
} else if (type == common::ValueType::TYPE_DECIMAL) {
auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_float64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_BOOL) {
auto& num_vec = dynamic_cast<executor::NumericVector<bool>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value(data[r] != 0));
}
}
} else if (type == common::ValueType::TYPE_TEXT ||
type == common::ValueType::TYPE_VARCHAR ||
type == common::ValueType::TYPE_CHAR) {
auto& str_vec = dynamic_cast<executor::StringVector&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

if (start_row > 0) {
for (uint32_t r = 0; r < start_row; ++r) {
uint32_t len = 0;
if (!d_in.read(reinterpret_cast<char*>(&len), 4)) break;
if (len > 0) {
d_in.seekg(static_cast<std::streamoff>(len), std::ios::cur);
}
}
}

for (uint32_t r = 0; r < actual_rows; ++r) {
uint32_t len = 0;
d_in.read(reinterpret_cast<char*>(&len), 4);
std::string s(len, '\0');
d_in.read(s.data(), len);
if (nulls[r] != 0U) {
str_vec.append(common::Value::make_null());
} else {
str_vec.append(common::Value::make_text(s));
}
}
} else {
throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " +
std::to_string(static_cast<int>(type)));
}
}
out_batch.set_row_count(actual_rows);
return true;
}
Comment on lines +310 to +470

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Consider extracting shared deserialization logic into a helper method.

The type-dispatch deserialization logic (lines 320-466) is nearly identical to the original read_batch implementation (lines 141-292). This ~150-line duplication creates a maintenance burden: any change to type support and any bug fix must be applied in both places, which risks the two copies diverging.

♻️ Suggested refactoring approach

Extract a private helper method:

private:
    // Helper: deserialize a single column from disk into a vector
    bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
                           executor::ColumnVector& target_col);

Then both read_batch overloads can call this helper in their loops, eliminating duplication.
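
As a rough illustration of what the fixed-width part of such a helper could look like, the sketch below factors the repeated seek/read/null-check pattern into one template. It is not the project's code: `read_fixed_width` and `to_bytes` are hypothetical names, the output is a plain `std::vector<std::optional<T>>` rather than a `ColumnVector`, and it assumes one null-flag byte per row and `sizeof(T)` data bytes per row.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <optional>
#include <sstream>
#include <string>
#include <vector>

// Reads `rows` fixed-width values of type T starting at `start_row` and pairs
// each with its null flag. Returns false on a short read.
// T must not be bool (std::vector<bool> has no data()); use std::uint8_t.
template <typename T>
bool read_fixed_width(std::istream& nulls_in, std::istream& data_in,
                      std::uint64_t start_row, std::uint32_t rows,
                      std::vector<std::optional<T>>& out) {
    std::vector<std::uint8_t> nulls(rows);
    nulls_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
    if (!nulls_in.read(reinterpret_cast<char*>(nulls.data()),
                       static_cast<std::streamsize>(rows))) {
        return false;
    }

    std::vector<T> data(rows);
    data_in.seekg(static_cast<std::streamoff>(start_row * sizeof(T)), std::ios::beg);
    if (!data_in.read(reinterpret_cast<char*>(data.data()),
                      static_cast<std::streamsize>(rows * sizeof(T)))) {
        return false;
    }

    for (std::uint32_t r = 0; r < rows; ++r) {
        if (nulls[r] != 0U) {
            out.push_back(std::nullopt);  // null flag set: ignore the stored bytes
        } else {
            out.push_back(data[r]);
        }
    }
    return true;
}

// Helper for experiments: serialize values to the raw byte layout assumed above.
template <typename T>
std::string to_bytes(const std::vector<T>& values) {
    return std::string(reinterpret_cast<const char*>(values.data()),
                       values.size() * sizeof(T));
}
```

Unlike the branches in the diff, this version checks the result of each `read`, so a truncated file is reported instead of silently producing zero-filled rows. Variable-length text columns would still need their own path, since they cannot be located with a single seek.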

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/columnar_table.cpp` around lines 310 - 470, The duplicated
per-column deserialization logic in ColumnarTable::read_batch should be
extracted into a private helper (suggested signature: bool
deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
executor::ColumnVector& target_col)) that encapsulates opening nulls/data files
(using storage_manager_ and name_), seeking, reading nulls and data, and
appending values for all supported common::ValueType cases; update both
read_batch overloads to call deserialize_column for each col_idx and propagate
its bool result (leave out_batch.set_row_count/return handling in the callers).
Ensure the helper references schema_.get_column(col_idx).type() and reuses the
same deserialization branches (INT64, INT32/16/8, FLOAT64/FLOAT32/DECIMAL, BOOL,
TEXT) so all file IO and value conversion is centralized.

} // namespace cloudsql::storage