-
Notifications
You must be signed in to change notification settings - Fork 0
Column Projection Pushdown: Q1 GROUP BY 16-17x faster #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1512,6 +1512,8 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| auto thread_pool = std::make_shared<executor::ThreadPool>(std::thread::hardware_concurrency()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| std::unique_ptr<VectorizedOperator> current_root = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| std::make_unique<VectorizedSeqScanOperator>(base_table_name, col_table, thread_pool); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| VectorizedSeqScanOperator* base_scan = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| static_cast<VectorizedSeqScanOperator*>(current_root.get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Track estimated output rows for join reordering decisions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| uint64_t current_est_rows = optimizer::RowEstimator::estimate_scan_rows(*base_table_meta); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1720,20 +1722,27 @@ std::unique_ptr<VectorizedOperator> QueryExecutor::build_vectorized_plan( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| executor::Schema output_schema; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| std::vector<size_t> required_col_indices; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (const auto& gb : stmt.group_by()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const auto& gb_name = gb->to_string(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| size_t idx = current_root->output_schema().find_column(gb_name); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (idx != static_cast<size_t>(-1)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| output_schema.add_column(current_root->output_schema().get_column(idx).name(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| current_root->output_schema().get_column(idx).type(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| current_root->output_schema().get_column(idx).nullable()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| required_col_indices.push_back(idx); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for (size_t i = 0; i < agg_infos.size(); ++i) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| false); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (agg_infos[i].input_col_idx >= 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| base_scan->set_required_columns(required_col_indices, output_schema); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
1724
to
+1744
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrong schema passed to The Then in Example:
🐛 Proposed fixBuild executor::Schema output_schema;
std::vector<size_t> required_col_indices;
+ executor::Schema reduced_input_schema; // Schema of columns scanned from table
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
+ const auto& col = current_root->output_schema().get_column(idx);
+ reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
+ size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
+ const auto& col = current_root->output_schema().get_column(input_idx);
+ reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}
- base_scan->set_required_columns(required_col_indices, output_schema);
+ base_scan->set_required_columns(required_col_indices, reduced_input_schema);📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| auto agg_op = std::make_unique<VectorizedGroupByOperator>( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| std::move(current_root), std::move(group_by), std::move(agg_infos), output_schema, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| thread_pool); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -295,4 +295,177 @@ bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, | |
| return true; | ||
| } | ||
|
|
||
| bool ColumnarTable::read_batch(uint64_t start_row, uint32_t batch_size, | ||
| executor::VectorBatch& out_batch, | ||
| const std::vector<size_t>& col_indices) { | ||
| if (start_row >= row_count_) return false; | ||
| if (col_indices.empty()) return false; | ||
|
|
||
| uint32_t actual_rows = | ||
| static_cast<uint32_t>(std::min(static_cast<uint64_t>(batch_size), row_count_ - start_row)); | ||
|
|
||
| // out_batch is pre-initialized with the reduced schema by the caller | ||
| // (VectorizedSeqScanOperator via set_required_columns) | ||
|
|
||
| for (size_t idx = 0; idx < col_indices.size(); ++idx) { | ||
| size_t col_idx = col_indices[idx]; | ||
| const std::string base = name_ + ".col" + std::to_string(col_idx); | ||
| std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary); | ||
| std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary); | ||
| if (!n_in.is_open() || !d_in.is_open()) return false; | ||
|
|
||
| auto& target_col = out_batch.get_column(idx); | ||
| const auto type = schema_.get_column(col_idx).type(); | ||
|
|
||
| if (type == common::ValueType::TYPE_INT64) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<int64_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_int64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 || | ||
| type == common::ValueType::TYPE_INT8) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<int64_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else if (type == common::ValueType::TYPE_INT32) { | ||
| num_vec.append(common::Value(static_cast<int32_t>(data[r]))); | ||
| } else if (type == common::ValueType::TYPE_INT16) { | ||
| num_vec.append(common::Value(static_cast<int16_t>(data[r]))); | ||
| } else { | ||
| num_vec.append(common::Value(static_cast<int8_t>(data[r]))); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_FLOAT64) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_float64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_FLOAT32) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<float>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value(static_cast<float>(data[r]))); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_DECIMAL) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_float64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_BOOL) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<bool>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value(data[r] != 0)); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_TEXT || | ||
| type == common::ValueType::TYPE_VARCHAR || | ||
| type == common::ValueType::TYPE_CHAR) { | ||
| auto& str_vec = dynamic_cast<executor::StringVector&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| if (start_row > 0) { | ||
| for (uint32_t r = 0; r < start_row; ++r) { | ||
| uint32_t len = 0; | ||
| if (!d_in.read(reinterpret_cast<char*>(&len), 4)) break; | ||
| if (len > 0) { | ||
| d_in.seekg(static_cast<std::streamoff>(len), std::ios::cur); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| uint32_t len = 0; | ||
| d_in.read(reinterpret_cast<char*>(&len), 4); | ||
| std::string s(len, '\0'); | ||
| d_in.read(s.data(), len); | ||
| if (nulls[r] != 0U) { | ||
| str_vec.append(common::Value::make_null()); | ||
| } else { | ||
| str_vec.append(common::Value::make_text(s)); | ||
| } | ||
| } | ||
| } else { | ||
| throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " + | ||
| std::to_string(static_cast<int>(type))); | ||
| } | ||
| } | ||
| out_batch.set_row_count(actual_rows); | ||
| return true; | ||
| } | ||
|
Comment on lines
+310
to
+470
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win Consider extracting shared deserialization logic into a helper method. The type-dispatch deserialization logic (lines 320-466) is nearly identical to the original ♻️ Suggested refactoring approachExtract a private helper method: private:
// Helper: deserialize a single column from disk into a vector
bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
executor::ColumnVector& target_col);Then both 🤖 Prompt for AI Agents |
||
| } // namespace cloudsql::storage | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
output_schema_must be updated when column projection is enabled.When
set_required_columnsis called, the operator will produce batches withreduced_schema, butoutput_schema_(returned byoutput_schema()) still reflects the full table schema (set at line 82). Downstream operators resolve column indices fromchild_->output_schema(), then access batch columns at those indices. If the indices refer to the full schema but the batch has a reduced schema, column accesses will retrieve wrong data or crash from out-of-bounds access.For example:
[id INT, cat TEXT, val INT][cat, val]→ indices[1, 2]in full schema[cat, val]→ positions[0, 1]batch.get_column(1)retrieves "val" (batch position 1) instead of "cat" → wrong result🐛 Proposed fix
void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) { required_col_indices_ = std::move(col_indices); reduced_schema_ = std::move(reduced_schema); + output_schema_ = reduced_schema_; // Update output schema to match actual batch schema }🤖 Prompt for AI Agents