[featrue](expr) support common subexpression elimination be part (#32673)

This commit is contained in:
Mryange
2024-04-01 10:20:35 +08:00
committed by yiguolei
parent 61e214c327
commit 8e19cdd745
9 changed files with 248 additions and 57 deletions

View File

@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}
_query_statistics = std::make_shared<QueryStatistics>();
}
@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}
if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}
@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));
for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
return Status::OK();
}
@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;
std::vector<int> result_column_ids;
for (auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}
DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
@ -535,30 +579,27 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
auto& mutable_columns = mutable_block.mutable_columns();
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
return Status::OK();
}