diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 213b9bc0d3..9e15c2bd9f 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -26,6 +26,7 @@ #include "pipeline/exec/union_sink_operator.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/descriptors.h" +#include "util/defer_op.h" #include "vec/core/block.h" namespace doris { @@ -161,6 +162,13 @@ std::string UnionSourceLocalState::debug_string(int indentation_level) const { Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) { auto& local_state = get_local_state(state); + Defer set_eos {[&]() { + //have executing const expr, queue have no data anymore, and child could be closed + *eos = (_child_size == 0 && !local_state._need_read_for_const_expr) || + (_child_size > 0 && local_state._shared_state->data_queue.is_all_finish() && + !_has_data(state)); + }}; + SCOPED_TIMER(local_state.exec_time_counter()); if (local_state._need_read_for_const_expr) { if (has_more_const(state)) { @@ -168,7 +176,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b } local_state._need_read_for_const_expr = has_more_const(state); } else if (_child_size != 0) { - std::unique_ptr output_block = vectorized::Block::create_unique(); + std::unique_ptr output_block; int child_idx = 0; RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block, &child_idx)); @@ -180,11 +188,6 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b local_state._shared_state->data_queue.push_free_block(std::move(output_block), child_idx); } local_state.reached_limit(block, eos); - //have executing const expr, queue have no data anymore, and child could be closed - *eos = (_child_size == 0 && !local_state._need_read_for_const_expr) || - (_child_size > 0 && local_state._shared_state->data_queue.is_all_finish() && - !_has_data(state)); - return Status::OK(); } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 40d02324cb..a70d55ab60 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -137,14 +137,14 @@ public: private: bool _has_data(RuntimeState* state) const { - auto& local_state = state->get_local_state(operator_id())->cast(); + auto& local_state = get_local_state(state); if (_child_size == 0) { return local_state._need_read_for_const_expr; } return local_state._shared_state->data_queue.remaining_has_data(); } bool has_more_const(RuntimeState* state) const { - auto& local_state = state->get_local_state(operator_id())->cast(); + auto& local_state = get_local_state(state); return state->per_fragment_instance_idx() == 0 && local_state._const_expr_list_idx < local_state._const_expr_lists.size(); }