From 8c8078ad28db79ae7a8e3f6178ccfb002908097a Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 1 Sep 2022 10:48:10 +0800 Subject: [PATCH] [fix](projections) get error row_descriptor when have projections on ExecNode (#12232) When ExecNode's projections is not empty, it use output row descriptor to initialize the block before doing projection. But we should use original row descriptor. This PR fix it. --- be/src/vec/exec/file_scan_node.cpp | 4 ++-- be/src/vec/exec/scan/vscan_node.cpp | 4 ++-- be/src/vec/exec/vaggregation_node.cpp | 4 ++-- be/src/vec/exec/vanalytic_eval_node.cpp | 2 +- be/src/vec/exec/volap_scan_node.cpp | 4 ++-- be/src/vec/exec/vtable_function_node.cpp | 2 +- be/src/vec/exec/vunion_node.cpp | 10 +++++----- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index 8dc3cc5222..0679154409 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -165,7 +165,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) { } IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; std::vector vexprs; - runtime_filter->get_prepared_vexprs(&vexprs, row_desc()); + runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor); if (vexprs.empty()) { continue; } @@ -181,7 +181,7 @@ Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) { last_expr = new_node; } auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); - auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc()); + auto expr_status = new_vconjunct_ctx_ptr->prepare(state, _row_descriptor); if (UNLIKELY(!expr_status.OK())) { LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; vexprs.clear(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index ba80c4aea3..cb9a9988c1 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -267,7 +267,7 @@ Status VScanNode::_append_rf_into_conjuncts(std::vector& vexprs) { if (_vconjunct_ctx_ptr) { (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); } - RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, row_desc())); + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(_state, _row_descriptor)); RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(_state)); if (_vconjunct_ctx_ptr) { (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); @@ -879,7 +879,7 @@ Status VScanNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) { ++current_arrived_rf_num; continue; } else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) { - _runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, row_desc()); + _runtime_filter_ctxs[i].runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor); ++current_arrived_rf_num; _runtime_filter_ctxs[i].apply_mark = true; } diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index ae6421e7a6..64dca91019 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -515,7 +515,7 @@ Status AggregationNode::_get_without_key_result(RuntimeState* state, Block* bloc DCHECK(_agg_data.without_key != nullptr); block->clear(); - *block = VectorizedUtils::create_empty_columnswithtypename(row_desc()); + *block = VectorizedUtils::create_empty_columnswithtypename(_row_descriptor); int agg_size = _aggregate_evaluators.size(); MutableColumns columns(agg_size); @@ -1002,7 +1002,7 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos) { bool mem_reuse = block->mem_reuse(); - auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(row_desc()); + auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(_row_descriptor); int key_size = _probe_expr_ctxs.size(); MutableColumns key_columns; diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index fc5b224253..7cb6fd5329 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -148,7 +148,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) { SCOPED_TIMER(_runtime_profile->total_time_counter()); RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); - DCHECK(child(0)->row_desc().is_prefix_of(row_desc())); + DCHECK(child(0)->row_desc().is_prefix_of(_row_descriptor)); _mem_pool.reset(new MemPool(mem_tracker())); _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime"); SCOPED_TIMER(_evaluation_timer); diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 0e2bc17697..30ff08695f 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -426,7 +426,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { DCHECK(runtime_filter != nullptr); bool ready = runtime_filter->is_ready(); if (ready) { - runtime_filter->get_prepared_vexprs(&vexprs, row_desc()); + runtime_filter->get_prepared_vexprs(&vexprs, _row_descriptor); scanner_filter_apply_marks[i] = true; if (!_runtime_filter_ready_flag[i] && !vexprs.empty()) { std::lock_guard l(_rf_lock); @@ -1829,7 +1829,7 @@ Status VOlapScanNode::_append_rf_into_conjuncts(RuntimeState* state, std::vector if (_vconjunct_ctx_ptr) { (*_vconjunct_ctx_ptr)->clone_fn_contexts(new_vconjunct_ctx_ptr); } - RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, row_desc())); + RETURN_IF_ERROR(new_vconjunct_ctx_ptr->prepare(state, _row_descriptor)); RETURN_IF_ERROR(new_vconjunct_ctx_ptr->open(state)); if (_vconjunct_ctx_ptr) { (*(_vconjunct_ctx_ptr.get()))->mark_as_stale(); diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp index 49ff5d5a69..96b887fd7b 100644 --- a/be/src/vec/exec/vtable_function_node.cpp +++ b/be/src/vec/exec/vtable_function_node.cpp @@ -59,7 +59,7 @@ Status VTableFunctionNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(VExpr::prepare(_vfn_ctxs, state, _row_descriptor)); // get current all output slots - for (const auto& tuple_desc : this->row_desc().tuple_descriptors()) { + for (const auto& tuple_desc : this->_row_descriptor.tuple_descriptors()) { for (const auto& slot_desc : tuple_desc->slots()) { _output_slots.push_back(slot_desc); } diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index e9de9c5bcb..0bf6b2f826 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -66,7 +66,7 @@ Status VUnionNode::prepare(RuntimeState* state) { ADD_TIMER(_runtime_profile, "MaterializeExprsEvaluateTimer"); // Prepare const expr lists. for (const std::vector& exprs : _const_expr_lists) { - RETURN_IF_ERROR(VExpr::prepare(exprs, state, row_desc())); + RETURN_IF_ERROR(VExpr::prepare(exprs, state, _row_descriptor)); } // Prepare result expr lists. @@ -128,8 +128,8 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { bool mem_reuse = block->mem_reuse(); MutableBlock mblock = mem_reuse ? MutableBlock::build_mutable_block(block) - : MutableBlock(Block( - VectorizedUtils::create_columns_with_type_and_name(row_desc()))); + : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( + _row_descriptor))); Block child_block; while (has_more_materialized() && mblock.rows() <= state->batch_size()) { @@ -184,8 +184,8 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) { bool mem_reuse = block->mem_reuse(); MutableBlock mblock = mem_reuse ? MutableBlock::build_mutable_block(block) - : MutableBlock(Block( - VectorizedUtils::create_columns_with_type_and_name(row_desc()))); + : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name( + _row_descriptor))); for (; _const_expr_list_idx < _const_expr_lists.size(); ++_const_expr_list_idx) { Block tmp_block; tmp_block.insert({vectorized::ColumnUInt8::create(1),