From 9a74ad1702fdf850e758934d8237402fac60b0d6 Mon Sep 17 00:00:00 2001 From: Kikyou1997 <33112463+Kikyou1997@users.noreply.github.com> Date: Tue, 30 Aug 2022 16:17:10 +0800 Subject: [PATCH] [feature](Nereids)add the ability of projection on each ExecNode and add column prune on OlapScan (#11842) We have added logical project before, but to actually finish the prune to reduce the data IO, we need to add related supports in translator and BE. This PR: - add projections on each ExecNode in BE - translate PhysicalProject into projections on PlanNode in FE - do column prune on ScanNode in FE Co-authored-by: HappenLee --- be/src/exec/exec_node.cpp | 57 ++++++++++++- be/src/exec/exec_node.h | 14 +++- be/src/runtime/plan_fragment_executor.cpp | 5 +- be/src/vec/exec/join/vhash_join_node.cpp | 8 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 5 +- be/src/vec/exec/vaggregation_node.cpp | 4 +- be/src/vec/exec/vanalytic_eval_node.cpp | 5 +- be/src/vec/exec/vassert_num_rows_node.cpp | 4 +- be/src/vec/exec/vblocking_join_node.cpp | 5 +- be/src/vec/exec/vcross_join_node.cpp | 5 +- be/src/vec/exec/vrepeat_node.cpp | 2 +- be/src/vec/exec/vselect_node.cpp | 5 +- be/src/vec/exec/vset_operation_node.cpp | 7 +- be/src/vec/exec/vsort_node.cpp | 2 +- be/src/vec/exec/vtable_function_node.cpp | 2 +- be/src/vec/exec/vunion_node.cpp | 7 +- be/src/vec/exprs/vexpr.cpp | 8 +- be/src/vec/functions/like.cpp | 11 ++- .../apache/doris/analysis/SlotDescriptor.java | 6 ++ .../glue/translator/ExpressionTranslator.java | 3 +- .../translator/PhysicalPlanTranslator.java | 78 ++++++++++++------ .../translator/PlanTranslatorContext.java | 15 ++-- .../apache/doris/nereids/jobs/JobType.java | 2 +- .../doris/nereids/jobs/batch/RewriteJob.java | 2 + .../doris/nereids/stats/StatsCalculator.java | 2 +- .../nereids/trees/expressions/Expression.java | 23 ------ .../trees/expressions/SlotReference.java | 4 - .../nereids/trees/plans/AbstractPlan.java | 12 +++ .../doris/nereids/trees/plans/GroupPlan.java | 1 + .../doris/nereids/trees/plans/Plan.java | 4 - .../plans/physical/AbstractPhysicalPlan.java | 7 -- .../trees/plans/visitor/PlanVisitor.java | 2 +- .../apache/doris/planner/ExchangeNode.java | 13 ++- .../apache/doris/planner/HashJoinNode.java | 29 ++++++- .../apache/doris/planner/OlapScanNode.java | 1 - .../org/apache/doris/planner/PlanNode.java | 31 ++++++- .../doris/planner/SingleNodePlanner.java | 2 +- .../PhysicalPlanTranslatorTest.java | 82 +++++++++++++++++++ .../doris/nereids/util/PlanRewriter.java | 4 +- .../doris/planner/DistributedPlannerTest.java | 2 + 40 files changed, 352 insertions(+), 129 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 0b643135b0..9a2b335728 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -150,9 +150,13 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _rows_returned_rate(nullptr), _memory_used_counter(nullptr), _get_next_span(), - _is_closed(false) {} + _is_closed(false) { + if (tnode.__isset.output_tuple_id) { + _output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true})); + } +} -ExecNode::~ExecNode() {} +ExecNode::~ExecNode() = default; void ExecNode::push_down_predicate(RuntimeState* state, std::list* expr_ctxs) { if (_type != TPlanNodeType::AGGREGATION_NODE) { @@ -194,6 +198,13 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { } RETURN_IF_ERROR(Expr::create_expr_trees(_pool, tnode.conjuncts, &_conjunct_ctxs)); + // create the projections expr + if (tnode.__isset.projections) { + DCHECK(tnode.__isset.output_tuple_id); + RETURN_IF_ERROR( + vectorized::VExpr::create_expr_trees(_pool, tnode.projections, &_projections)); + } + return Status::OK(); } @@ -220,6 +231,7 @@ Status ExecNode::prepare(RuntimeState* state) { typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) { RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor)); } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, _row_descriptor)); for (int i = 0; i < _children.size(); ++i) { RETURN_IF_ERROR(_children[i]->prepare(state)); @@ -239,6 +251,8 @@ Status ExecNode::open(RuntimeState* state) { } else { return Status::OK(); } + RETURN_IF_ERROR(Expr::open(_conjunct_ctxs, state)); + return vectorized::VExpr::open(_projections, state); } Status ExecNode::reset(RuntimeState* state) { @@ -282,6 +296,7 @@ Status ExecNode::close(RuntimeState* state) { typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) { Expr::close(_conjunct_ctxs, state); } + vectorized::VExpr::close(_projections, state); if (_buffer_pool_client.is_registered()) { state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client); @@ -769,4 +784,42 @@ std::string ExecNode::get_name() { return (_is_vec ? "V" : "") + print_plan_node_type(_type); } +Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { + using namespace vectorized; + auto is_mem_reuse = output_block->mem_reuse(); + MutableBlock mutable_block = + is_mem_reuse ? MutableBlock(output_block) + : MutableBlock(VectorizedUtils::create_empty_columnswithtypename( + *_output_row_descriptor)); + auto rows = origin_block->rows(); + + if (rows != 0) { + auto& mutable_columns = mutable_block.mutable_columns(); + DCHECK(mutable_columns.size() == _projections.size()); + for (int i = 0; i < mutable_columns.size(); ++i) { + auto result_column_id = -1; + RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id)); + auto column_ptr = origin_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); + } + + if (!is_mem_reuse) output_block->swap(mutable_block.to_block()); + DCHECK(output_block->rows() == rows); + } + + return Status::OK(); +} + +Status ExecNode::get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos) { + // delete the UNLIKELY after support new optimizers + if (UNLIKELY(_output_row_descriptor)) { + _origin_block.clear_column_data(_row_descriptor.num_materialized_slots()); + auto status = get_next(state, &_origin_block, eos); + if (UNLIKELY(!status.ok())) return status; + return do_projections(&_origin_block, block); + } + return get_next(state, block, eos); +} + } // namespace doris diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 5b700090b2..c9f4323dca 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -105,6 +105,9 @@ public: virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); + // new interface to compatible new optimizers in FE + Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos); + // Resets the stream of row batches to be retrieved by subsequent GetNext() calls. // Clears all internal state, returning this node to the state it was in after calling // Prepare() and before calling Open(). This function must not clear memory @@ -179,7 +182,9 @@ public: int id() const { return _id; } TPlanNodeType::type type() const { return _type; } - virtual const RowDescriptor& row_desc() const { return _row_descriptor; } + virtual const RowDescriptor& row_desc() const { + return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; + } int64_t rows_returned() const { return _num_rows_returned; } int64_t limit() const { return _limit; } bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } @@ -221,6 +226,9 @@ protected: // and add block rows for profile void reached_limit(vectorized::Block* block, bool* eos); + /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc + Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block); + /// Extends blocking queue for row batches. Row batches have a property that /// they must be processed in the order they were produced, even in cancellation /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches @@ -276,6 +284,10 @@ protected: std::vector _children; RowDescriptor _row_descriptor; + vectorized::Block _origin_block; + + std::unique_ptr _output_row_descriptor; + std::vector _projections; /// Resource information sent from the frontend. const TBackendResourceProfile _resource_profile; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 9346b947d3..6274b47174 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -339,8 +339,9 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block* while (!_done) { _block->clear_column_data(_plan->row_desc().num_materialized_slots()); SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR_AND_CHECK_SPAN(_plan->get_next(_runtime_state.get(), _block.get(), &_done), - _plan->get_next_span(), _done); + RETURN_IF_ERROR_AND_CHECK_SPAN( + _plan->get_next_after_projects(_runtime_state.get(), _block.get(), &_done), + _plan->get_next_span(), _done); if (_block->rows() > 0) { COUNTER_UPDATE(_rows_produced_counter, _block->rows()); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 228e0020e8..05040fc2b6 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -892,6 +892,7 @@ Status HashJoinNode::prepare(RuntimeState* state) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, _intermediate_row_desc)); } RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, _intermediate_row_desc)); // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); @@ -937,8 +938,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo do { SCOPED_TIMER(_probe_next_timer); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_probe_block, &_probe_eos), - child(0)->get_next_span(), _probe_eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects(state, &_probe_block, &_probe_eos), + child(0)->get_next_span(), _probe_eos); } while (_probe_block.rows() == 0 && !_probe_eos); probe_rows = _probe_block.rows(); @@ -1135,7 +1137,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { block.clear_column_data(); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos), + RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), child(1)->get_next_span(), eos); _mem_used += block.allocated_bytes(); diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 00e9b2bc34..0b25890a50 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -175,9 +175,8 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.delete_predicates.begin())); // Merge the columns in delete predicate that not in latest schema in to current tablet schema - for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) { - _tablet_schema->merge_dropped_columns( - _tablet->tablet_schema(Version(del_pred_pb.version(), del_pred_pb.version()))); + for (auto& del_pred_rs : _tablet_reader_params.delete_predicates) { + _tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_rs->version())); } // Range diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index a724c46afe..ae6421e7a6 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -433,7 +433,7 @@ Status AggregationNode::open(RuntimeState* state) { while (!eos) { RETURN_IF_CANCELLED(state); release_block_memory(block); - RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, &eos), + RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next_after_projects(state, &block, &eos), _children[0]->get_next_span(), eos); if (block.rows() == 0) { continue; @@ -461,7 +461,7 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { do { release_block_memory(_preagg_block); RETURN_IF_ERROR_AND_CHECK_SPAN( - _children[0]->get_next(state, &_preagg_block, &child_eos), + _children[0]->get_next_after_projects(state, &_preagg_block, &child_eos), _children[0]->get_next_span(), child_eos); } while (_preagg_block.rows() == 0 && !child_eos); diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index dde7cb8453..fc5b224253 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -467,8 +467,9 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) { Block block; RETURN_IF_CANCELLED(state); do { - RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, &block, &_input_eos), - _children[0]->get_next_span(), _input_eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + _children[0]->get_next_after_projects(state, &block, &_input_eos), + _children[0]->get_next_span(), _input_eos); } while (!_input_eos && block.rows() == 0); if (_input_eos && block.rows() == 0) { diff --git a/be/src/vec/exec/vassert_num_rows_node.cpp b/be/src/vec/exec/vassert_num_rows_node.cpp index 28e1389bb7..b9239a3c53 100644 --- a/be/src/vec/exec/vassert_num_rows_node.cpp +++ b/be/src/vec/exec/vassert_num_rows_node.cpp @@ -51,8 +51,8 @@ Status VAssertNumRowsNode::get_next(RuntimeState* state, Block* block, bool* eos INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VAssertNumRowsNode::get_next"); SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, block, eos), child(0)->get_next_span(), - *eos); + RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, block, eos), + child(0)->get_next_span(), *eos); _num_rows_returned += block->rows(); bool assert_res = false; switch (_assertion) { diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp index c6d548de91..809da414f4 100644 --- a/be/src/vec/exec/vblocking_join_node.cpp +++ b/be/src/vec/exec/vblocking_join_node.cpp @@ -119,8 +119,9 @@ Status VBlockingJoinNode::open(RuntimeState* state) { // Seed left child in preparation for get_next(). while (true) { release_block_memory(_left_block); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &_left_block, &_left_side_eos), - child(0)->get_next_span(), _left_side_eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects(state, &_left_block, &_left_side_eos), + child(0)->get_next_span(), _left_side_eos); COUNTER_UPDATE(_left_child_row_counter, _left_block.rows()); _left_block_pos = 0; diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp index 4a8c2f3b17..c3b94f7561 100644 --- a/be/src/vec/exec/vcross_join_node.cpp +++ b/be/src/vec/exec/vcross_join_node.cpp @@ -60,7 +60,7 @@ Status VCrossJoinNode::construct_build_side(RuntimeState* state) { RETURN_IF_CANCELLED(state); Block block; - RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next(state, &block, &eos), + RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block, &eos), child(1)->get_next_span(), eos); auto rows = block.rows(); auto mem_usage = block.allocated_bytes(); @@ -117,7 +117,8 @@ Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) { release_block_memory(_left_block); timer.stop(); RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next(state, &_left_block, &_left_side_eos), + child(0)->get_next_after_projects(state, &_left_block, + &_left_side_eos), child(0)->get_next_span(), _left_side_eos); timer.start(); } while (_left_block.rows() == 0 && !_left_side_eos); diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp index 3c180d417d..a2001cff15 100644 --- a/be/src/vec/exec/vrepeat_node.cpp +++ b/be/src/vec/exec/vrepeat_node.cpp @@ -167,7 +167,7 @@ Status VRepeatNode::get_next(RuntimeState* state, Block* block, bool* eos) { if (_child_block->rows() == 0) { while (_child_block->rows() == 0 && !_child_eos) { RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next(state, _child_block.get(), &_child_eos), + child(0)->get_next_after_projects(state, _child_block.get(), &_child_eos), child(0)->get_next_span(), _child_eos); } diff --git a/be/src/vec/exec/vselect_node.cpp b/be/src/vec/exec/vselect_node.cpp index c69f997150..8f561f10a0 100644 --- a/be/src/vec/exec/vselect_node.cpp +++ b/be/src/vec/exec/vselect_node.cpp @@ -48,8 +48,9 @@ Status VSelectNode::get_next(RuntimeState* state, vectorized::Block* block, bool RETURN_IF_CANCELLED(state); do { RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR_AND_CHECK_SPAN(_children[0]->get_next(state, block, &_child_eos), - _children[0]->get_next_span(), _child_eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + _children[0]->get_next_after_projects(state, block, &_child_eos), + _children[0]->get_next_span(), _child_eos); if (_child_eos) { *eos = true; break; diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp index b46d9988c5..b95714513c 100644 --- a/be/src/vec/exec/vset_operation_node.cpp +++ b/be/src/vec/exec/vset_operation_node.cpp @@ -242,7 +242,7 @@ Status VSetOperationNode::hash_table_build(RuntimeState* state) { block.clear_column_data(); SCOPED_TIMER(_build_timer); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos), + RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos), child(0)->get_next_span(), eos); size_t allocated_bytes = block.allocated_bytes(); @@ -309,8 +309,9 @@ Status VSetOperationNode::process_probe_block(RuntimeState* state, int child_id, _probe_rows = 0; RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(child_id)->get_next(state, &_probe_block, eos), - child(child_id)->get_next_span(), *eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(child_id)->get_next_after_projects(state, &_probe_block, eos), + child(child_id)->get_next_span(), *eos); _probe_rows = _probe_block.rows(); RETURN_IF_ERROR(extract_probe_column(_probe_block, _probe_columns, child_id)); return Status::OK(); diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 73618b0b26..8749b2913e 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -125,7 +125,7 @@ Status VSortNode::sort_input(RuntimeState* state) { bool eos = false; do { Block block; - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next(state, &block, &eos), + RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos), child(0)->get_next_span(), eos); auto rows = block.rows(); diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 0852d9d966..49ff5d5a69 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -117,7 +117,7 @@ Status VTableFunctionNode::get_expanded_block(RuntimeState* state, Block* output if (_child_block->rows() == 0) { while (_child_block->rows() == 0 && !_child_eos) { RETURN_IF_ERROR_AND_CHECK_SPAN( - child(0)->get_next(state, _child_block.get(), &_child_eos), + child(0)->get_next_after_projects(state, _child_block.get(), &_child_eos), child(0)->get_next_span(), _child_eos); } if (_child_eos && _child_block->rows() == 0) { diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 5ef738ba5b..e9de9c5bcb 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -106,8 +106,9 @@ Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) { _child_eos = false; } DCHECK_EQ(block->rows(), 0); - RETURN_IF_ERROR_AND_CHECK_SPAN(child(_child_idx)->get_next(state, block, &_child_eos), - child(_child_idx)->get_next_span(), _child_eos); + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(_child_idx)->get_next_after_projects(state, block, &_child_eos), + child(_child_idx)->get_next_span(), _child_eos); if (_child_eos) { // Even though the child is at eos, it's not OK to close() it here. Once we close // the child, the row batches that it produced are invalid. Marking the batch as @@ -148,7 +149,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { child_block.clear(); // The first batch from each child is always fetched here. RETURN_IF_ERROR_AND_CHECK_SPAN( - child(_child_idx)->get_next(state, &child_block, &_child_eos), + child(_child_idx)->get_next_after_projects(state, &child_block, &_child_eos), child(_child_idx)->get_next_span(), _child_eos); SCOPED_TIMER(_materialize_exprs_evaluate_timer); if (child_block.rows() > 0) { diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp index 6d0de17dac..60a4ad5ce3 100644 --- a/be/src/vec/exprs/vexpr.cpp +++ b/be/src/vec/exprs/vexpr.cpp @@ -227,15 +227,15 @@ Status VExpr::create_expr_trees(ObjectPool* pool, const std::vector& ctxs, RuntimeState* state, const RowDescriptor& row_desc) { - for (int i = 0; i < ctxs.size(); ++i) { - RETURN_IF_ERROR(ctxs[i]->prepare(state, row_desc)); + for (auto ctx : ctxs) { + RETURN_IF_ERROR(ctx->prepare(state, row_desc)); } return Status::OK(); } void VExpr::close(const std::vector& ctxs, RuntimeState* state) { - for (int i = 0; i < ctxs.size(); ++i) { - ctxs[i]->close(state); + for (auto ctx : ctxs) { + ctx->close(state); } } diff --git a/be/src/vec/functions/like.cpp b/be/src/vec/functions/like.cpp index 306aa19a7e..86db32c0dd 100644 --- a/be/src/vec/functions/like.cpp +++ b/be/src/vec/functions/like.cpp @@ -135,15 +135,14 @@ Status FunctionLikeBase::regexp_fn(LikeSearchState* state, const StringValue& va Status FunctionLikeBase::hs_prepare(FunctionContext* context, const char* expression, hs_database_t** database, hs_scratch_t** scratch) { hs_compile_error_t* compile_err; - - if (hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY, HS_MODE_BLOCK, NULL, database, - &compile_err) != HS_SUCCESS) { + auto res = hs_compile(expression, HS_FLAG_DOTALL | HS_FLAG_ALLOWEMPTY, HS_MODE_BLOCK, NULL, + database, &compile_err); + if (res != HS_SUCCESS) { *database = nullptr; if (context) context->set_error("hs_compile regex pattern error"); - auto status = Status::RuntimeError("hs_compile regex pattern error:" + - std::string(compile_err->message)); + return Status::RuntimeError("hs_compile regex pattern error:" + + std::string(compile_err->message)); hs_free_compile_error(compile_err); - return status; } hs_free_compile_error(compile_err); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index 22eae97c1d..a3fb63f0e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -338,6 +338,7 @@ public class SlotDescriptor { builder.append(prefix).append("byteOffset=").append(byteOffset).append("\n"); builder.append(prefix).append("nullIndicatorByte=").append(nullIndicatorByte).append("\n"); builder.append(prefix).append("nullIndicatorBit=").append(nullIndicatorBit).append("\n"); + builder.append(prefix).append("nullable=").append(isNullable).append("\n"); builder.append(prefix).append("slotIdx=").append(slotIdx).append("\n"); return builder.toString(); } @@ -345,4 +346,9 @@ public class SlotDescriptor { public boolean isScanSlot() { return parent.getTable() instanceof OlapTable; } + + public void setMaterialized(boolean materialized) { + isMaterialized = materialized; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java index 5a40a8cb0b..43d9975e43 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/ExpressionTranslator.java @@ -258,9 +258,10 @@ public class ExpressionTranslator extends DefaultExpressionVisitor 1) { rootFragment = exchangeToMergeFragment(rootFragment, context); } - // TODO: trick here, we need push project down - if (physicalPlan.getType() == PlanType.PHYSICAL_PROJECT) { - PhysicalProject physicalProject = (PhysicalProject) physicalPlan; - List outputExprs = physicalProject.getProjects().stream() - .map(e -> ExpressionTranslator.translate(e, context)) - .collect(Collectors.toList()); - rootFragment.setOutputExprs(outputExprs); - } else { - List outputExprs = Lists.newArrayList(); - physicalPlan.getOutput().stream().map(Slot::getExprId) - .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); - rootFragment.setOutputExprs(outputExprs); - } + List outputExprs = Lists.newArrayList(); + physicalPlan.getOutput().stream().map(Slot::getExprId) + .forEach(exprId -> outputExprs.add(context.findSlotRef(exprId))); + rootFragment.setOutputExprs(outputExprs); rootFragment.getPlanRoot().convertToVectoriezd(); for (PlanFragment fragment : context.getPlanFragments()) { fragment.finalize(null); } Collections.reverse(context.getPlanFragments()); + context.getDescTable().computeMemLayout(); return rootFragment; } @@ -240,10 +232,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor slotList = olapScan.getOutput(); OlapTable olapTable = olapScan.getTable(); - List execConjunctsList = olapScan - .getExpressions() - .stream() - .map(e -> ExpressionTranslator.translate(e, context)).collect(Collectors.toList()); TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context); tupleDescriptor.setTable(olapTable); OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, olapTable.getName()); @@ -259,12 +247,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor project, PlanTranslatorContext context) { + public PlanFragment visitPhysicalProject(PhysicalProject project, PlanTranslatorContext context) { PlanFragment inputFragment = project.child(0).accept(this, context); // TODO: handle p.child(0) is not NamedExpression. @@ -453,28 +442,66 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor ExpressionTranslator.translate(e, context)) .collect(Collectors.toList()); // TODO: fix the project alias of an aliased relation. + List slotList = project.getOutput(); + TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context); PlanNode inputPlanNode = inputFragment.getPlanRoot(); + // For hash join node, use vSrcToOutputSMap to describe the expression calculation, use + // vIntermediateTupleDescList as input, and set vOutputTupleDesc as the final output. + // TODO: HashJoinNode's be implementation is not support projection yet, remove this after when supported. + if (inputPlanNode instanceof HashJoinNode) { + HashJoinNode hashJoinNode = (HashJoinNode) inputPlanNode; + hashJoinNode.setvOutputTupleDesc(tupleDescriptor); + hashJoinNode.setvSrcToOutputSMap(execExprList); + return inputFragment; + } + inputPlanNode.setProjectList(execExprList); + inputPlanNode.setOutputTupleDesc(tupleDescriptor); + List predicateList = inputPlanNode.getConjuncts(); Set requiredSlotIdList = new HashSet<>(); for (Expr expr : predicateList) { extractExecSlot(expr, requiredSlotIdList); } for (Expr expr : execExprList) { - if (expr instanceof SlotRef) { - requiredSlotIdList.add(((SlotRef) expr).getDesc().getId().asInt()); - } + extractExecSlot(expr, requiredSlotIdList); + } + if (inputPlanNode instanceof OlapScanNode) { + updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdList, context); } return inputFragment; } + private void updateChildSlotsMaterialization(PlanNode execPlan, + Set requiredSlotIdList, + PlanTranslatorContext context) { + Set slotRefSet = new HashSet<>(); + for (Expr expr : execPlan.getConjuncts()) { + expr.collect(SlotRef.class, slotRefSet); + } + Set slotIdSet = slotRefSet.stream() + .map(SlotRef::getSlotId).map(SlotId::asInt).collect(Collectors.toSet()); + slotIdSet.addAll(requiredSlotIdList); + execPlan.getTupleIds().stream() + .map(context::getTupleDesc) + .map(TupleDescriptor::getSlots) + .flatMap(List::stream) + .forEach(s -> s.setIsMaterialized(slotIdSet.contains(s.getId().asInt()))); + } + @Override public PlanFragment visitPhysicalFilter(PhysicalFilter filter, PlanTranslatorContext context) { PlanFragment inputFragment = filter.child(0).accept(this, context); PlanNode planNode = inputFragment.getPlanRoot(); + addConjunctsToPlanNode(filter, planNode, context); + return inputFragment; + } + + private void addConjunctsToPlanNode(PhysicalFilter filter, + PlanNode planNode, + PlanTranslatorContext context) { Expression expression = filter.getPredicates(); List expressionList = ExpressionUtils.extractConjunction(expression); expressionList.stream().map(e -> ExpressionTranslator.translate(e, context)).forEach(planNode::addConjunct); - return inputFragment; } @Override @@ -600,4 +627,5 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor // TODO: We should subtract those pruned column, and consider the expression transformations in the node. @Override - public StatsDeriveResult visitPhysicalProject(PhysicalProject project, Void context) { + public StatsDeriveResult visitPhysicalProject(PhysicalProject project, Void context) { return computeProject(project); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java index 818ef3796e..173b363799 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/Expression.java @@ -32,8 +32,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; @@ -45,8 +43,6 @@ public abstract class Expression extends AbstractTreeNode { private static final String INPUT_CHECK_ERROR_MESSAGE = "argument %d requires %s type, however '%s' is of %s type"; - private final Logger logger = LoggerFactory.getLogger(this.getClass()); - public Expression(Expression... children) { super(children); } @@ -147,23 +143,4 @@ public abstract class Expression extends AbstractTreeNode { return 0; } - /** - * Return true if all the SlotRef in the expr tree is bound to the same column. - */ - public boolean boundToColumn(String column) { - for (Expression child : children) { - if (!child.boundToColumn(column)) { - return false; - } - } - return true; - } - - public Expression leftMostNode() { - Expression leftChild = this; - while (leftChild.children.size() > 0) { - leftChild = leftChild.child(0); - } - return leftChild; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java index e8e04e901a..24a601a2e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/SlotReference.java @@ -155,8 +155,4 @@ public class SlotReference extends Slot { return new SlotReference(exprId, name, dataType, nullable, qualifiers); } - @Override - public boolean boundToColumn(String name) { - return this.name.equals(name); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java index 5c7564b50d..06527a1b78 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.AbstractTreeNode; +import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.statistics.StatsDeriveResult; import org.apache.commons.lang3.StringUtils; @@ -119,4 +120,15 @@ public abstract class AbstractPlan extends AbstractTreeNode implements Pla public int hashCode() { return Objects.hash(statsDeriveResult, logicalProperties); } + + @Override + public List getOutput() { + return logicalProperties.getOutput(); + } + + @Override + public Plan child(int index) { + return super.child(index); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java index eaf5c56b44..cfc8d1daf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/GroupPlan.java @@ -98,4 +98,5 @@ public class GroupPlan extends LogicalLeaf { public String toString() { return "GroupPlan( " + group.getGroupId() + " )"; } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java index a6440252de..68cea493dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/Plan.java @@ -49,10 +49,6 @@ public interface Plan extends TreeNode { List getOutput(); - default List computeOutput(Plan... inputs) { - throw new IllegalStateException("Not support compute output for " + getClass().getName()); - } - String treeString(); default Plan withOutput(List output) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 4b2349fbca..a14797ec88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -20,12 +20,10 @@ package org.apache.doris.nereids.trees.plans.physical; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; -import java.util.List; import java.util.Optional; /** @@ -59,11 +57,6 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi this.physicalProperties = PhysicalProperties.ANY; } - @Override - public List getOutput() { - return logicalProperties.getOutput(); - } - @Override public LogicalProperties getLogicalProperties() { return logicalProperties; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index 6d75bf5819..cc5d914c1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -180,7 +180,7 @@ public abstract class PlanVisitor { return visit(nestedLoopJoin, context); } - public R visitPhysicalProject(PhysicalProject project, C context) { + public R visitPhysicalProject(PhysicalProject project, C context) { return visit(project, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index ad2cc2f5fc..5cae77c483 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -23,6 +23,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SortInfo; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; @@ -83,6 +84,7 @@ public class ExchangeNode extends PlanNode { limit = inputNode.limit; } computeTupleIds(); + } public boolean isMergingExchange() { @@ -94,8 +96,15 @@ public class ExchangeNode extends PlanNode { @Override public final void computeTupleIds() { - clearTupleIds(); - tupleIds.addAll(getChild(0).getTupleIds()); + PlanNode inputNode = getChild(0); + TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc(); + if (outputTupleDesc != null) { + tupleIds.clear(); + tupleIds.add(outputTupleDesc.getId()); + } else { + clearTupleIds(); + tupleIds.addAll(getChild(0).getTupleIds()); + } tblRefIds.addAll(getChild(0).getTblRefIds()); nullableTupleIds.addAll(getChild(0).getNullableTupleIds()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 9d0a13e24d..46e35775da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -1080,13 +1080,17 @@ public class HashJoinNode extends PlanNode { } if (vSrcToOutputSMap != null) { for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + // TODO: Enable it after we support new optimizers + // if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { + // msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + // } else msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); - msg.addToProjections(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); } } if (vOutputTupleDesc != null) { msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); - msg.setOutputTupleId(vOutputTupleDesc.getId().asInt()); + // TODO Enable it after we support new optimizers + // msg.setOutputTupleId(vOutputTupleDesc.getId().asInt()); } if (vIntermediateTupleDescList != null) { for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { @@ -1257,4 +1261,25 @@ public class HashJoinNode extends PlanNode { } return true; } + + /** + * Used by nereids. + */ + public void setvOutputTupleDesc(TupleDescriptor vOutputTupleDesc) { + this.vOutputTupleDesc = vOutputTupleDesc; + } + + /** + * Used by nereids. + */ + public void setvIntermediateTupleDescList(List vIntermediateTupleDescList) { + this.vIntermediateTupleDescList = vIntermediateTupleDescList; + } + + /** + * Used by nereids. + */ + public void setvSrcToOutputSMap(List lhs) { + this.vSrcToOutputSMap = new ExprSubstitutionMap(lhs, Collections.emptyList()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 967895ee4e..3c82be05f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -319,7 +319,6 @@ public class OlapScanNode extends ScanNode { } situation = "The key type of table is aggregated."; update = false; - break CHECK; } // CHECKSTYLE IGNORE THIS LINE if (update) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index c25282da83..596897c947 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -49,8 +49,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collection; @@ -74,7 +72,6 @@ import java.util.Set; * its children (= are bound by tupleIds). */ public abstract class PlanNode extends TreeNode implements PlanStats { - private static final Logger LOG = LogManager.getLogger(PlanNode.class); protected String planNodeName; @@ -142,6 +139,10 @@ public abstract class PlanNode extends TreeNode implements PlanStats { protected StatisticalType statisticalType = StatisticalType.DEFAULT; protected StatsDeriveResult statsDeriveResult; + protected TupleDescriptor outputTupleDesc; + + protected List projectList; + protected PlanNode(PlanNodeId id, ArrayList tupleIds, String planNodeName, StatisticalType statisticalType) { this.id = id; @@ -550,6 +551,14 @@ public abstract class PlanNode extends TreeNode implements PlanStats { } toThrift(msg); container.addToNodes(msg); + if (projectList != null) { + for (Expr expr : projectList) { + msg.addToProjections(expr.treeToThrift()); + } + } + if (outputTupleDesc != null) { + msg.setOutputTupleId(outputTupleDesc.getId().asInt()); + } if (this instanceof ExchangeNode) { msg.num_children = 0; return; @@ -1009,4 +1018,20 @@ public abstract class PlanNode extends TreeNode implements PlanStats { public void finalizeForNereids() { } + + public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) { + this.outputTupleDesc = outputTupleDesc; + } + + public TupleDescriptor getOutputTupleDesc() { + return outputTupleDesc; + } + + public void setProjectList(List projectList) { + this.projectList = projectList; + } + + public List getProjectList() { + return projectList; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index b93f7f15c0..58e4a02435 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1144,7 +1144,7 @@ public class SingleNodePlanner { MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV(olapScanNode); if (bestIndexInfo == null) { - selectFailed |= true; + selectFailed = true; TupleId tupleId = olapScanNode.getTupleId(); selectStmt.updateDisableTuplesMVRewriter(tupleId); LOG.debug("MV rewriter of tuple [] will be disable", tupleId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java new file mode 100644 index 0000000000..8762ce125b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.glue.translator; + +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanNode; + +import mockit.Injectable; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class PhysicalPlanTranslatorTest { + + @Test + public void testOlapPrune(@Mocked OlapTable t1, @Injectable LogicalProperties placeHolder) throws Exception { + List qualifierList = new ArrayList<>(); + qualifierList.add("test"); + qualifierList.add("t1"); + List t1Output = new ArrayList<>(); + SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE); + SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE); + SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE); + t1Output.add(col1); + t1Output.add(col2); + t1Output.add(col3); + LogicalProperties t1Properties = new LogicalProperties(() -> t1Output); + PhysicalOlapScan scan = new PhysicalOlapScan(t1, qualifierList, 0L, + Collections.emptyList(), Collections.emptyList(), null, + Optional.empty(), + t1Properties); + Literal t1FilterRight = new IntegerLiteral(1); + Expression t1FilterExpr = new GreaterThan(col1, t1FilterRight); + PhysicalFilter filter = + new PhysicalFilter(t1FilterExpr, placeHolder, scan); + List projList = new ArrayList<>(); + projList.add(col2); + PhysicalProject project = new PhysicalProject(projList, + placeHolder, filter); + PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(); + PhysicalPlanTranslator translator = new PhysicalPlanTranslator(); + PlanFragment fragment = translator.visitPhysicalProject(project, planTranslatorContext); + PlanNode planNode = fragment.getPlanRoot(); + List scanNodeList = new ArrayList<>(); + planNode.collect(OlapScanNode.class::isInstance, scanNodeList); + Assertions.assertEquals(2, scanNodeList.get(0).getTupleDesc().getMaterializedSlots().size()); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java index 2a4a9343e6..67f6bc43ea 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanRewriter.java @@ -40,14 +40,14 @@ public class PlanRewriter { public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext connectContext, RuleFactory... rules) { return new Memo(plan) .newCascadesContext(new StatementContext(connectContext, new OriginStatement("", 0))) - .topDownRewrite(rules) + .bottomUpRewrite(rules) .getMemo(); } public static Memo bottomUpRewriteMemo(Plan plan, ConnectContext connectContext, Rule... rules) { return new Memo(plan) .newCascadesContext(new StatementContext(connectContext, new OriginStatement("", 0))) - .topDownRewrite(rules) + .bottomUpRewrite(rules) .getMemo(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java index bd3d1f3866..2fe0085883 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java @@ -90,6 +90,8 @@ public class DistributedPlannerTest { Deencapsulation.setField(inputPlanRoot, "conjuncts", Lists.newArrayList()); new Expectations() { { + inputPlanRoot.getOutputTupleDesc(); + result = null; inputFragment.isPartitioned(); result = true; plannerContext.getNextNodeId();