[feature](Nereids)add the ability of projection on each ExecNode and add column prune on OlapScan (#11842)

We have added logical project before, but to actually finish the prune to reduce the data IO, we need to add related supports in translator and BE.
This PR:
- add projections on each ExecNode in BE
- translate PhysicalProject into projections on PlanNode in FE
- do column prune on ScanNode in FE

Co-authored-by: HappenLee <happenlee@hotmail.com>
This commit is contained in:
Kikyou1997
2022-08-30 16:17:10 +08:00
committed by GitHub
parent fb27e3ef31
commit 9a74ad1702
40 changed files with 352 additions and 129 deletions

View File

@ -150,9 +150,13 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
_get_next_span(),
_is_closed(false) {}
_is_closed(false) {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
}
ExecNode::~ExecNode() {}
ExecNode::~ExecNode() = default;
void ExecNode::push_down_predicate(RuntimeState* state, std::list<ExprContext*>* expr_ctxs) {
if (_type != TPlanNodeType::AGGREGATION_NODE) {
@ -194,6 +198,13 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
}
RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs));
// create the projections expr
if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_trees(_pool, tnode.projections, &_projections));
}
return Status::OK();
}
@ -220,6 +231,7 @@ Status ExecNode::prepare(RuntimeState* state) {
typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, _row_descriptor));
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
@ -239,6 +251,8 @@ Status ExecNode::open(RuntimeState* state) {
} else {
return Status::OK();
}
RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state));
return vectorized::VExpr::open(_projections, state);
}
Status ExecNode::reset(RuntimeState* state) {
@ -282,6 +296,7 @@ Status ExecNode::close(RuntimeState* state) {
typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
Expr::close(_conjunct_ctxs, state);
}
vectorized::VExpr::close(_projections, state);
if (_buffer_pool_client.is_registered()) {
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
@ -769,4 +784,42 @@ std::string ExecNode::get_name() {
return (_is_vec ? "V" : "") + print_plan_node_type(_type);
}
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
using namespace vectorized;
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse ? MutableBlock(output_block)
: MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
*_output_row_descriptor));
auto rows = origin_block->rows();
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
DCHECK(mutable_columns.size() == _projections.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
DCHECK(output_block->rows() == rows);
}
return Status::OK();
}
Status ExecNode::get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) {
// delete the UNLIKELY after support new optimizers
if (UNLIKELY(_output_row_descriptor)) {
_origin_block.clear_column_data(_row_descriptor.num_materialized_slots());
auto status = get_next(state, &_origin_block, eos);
if (UNLIKELY(!status.ok())) return status;
return do_projections(&_origin_block, block);
}
return get_next(state, block, eos);
}
} // namespace doris