diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 3ef5fef674..3c86fb87fb 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -152,7 +152,9 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b } local_state.reached_limit(block, source_state); //have exectue const expr, queue have no data any more, and child could be colsed - if ((!_has_data(state) && local_state._shared_state->data_queue->is_all_finish())) { + if (_child_size == 0) { + source_state = SourceState::FINISHED; + } else if ((!_has_data(state) && local_state._shared_state->data_queue->is_all_finish())) { source_state = SourceState::FINISHED; } else if (_has_data(state)) { source_state = SourceState::MORE_DATA; diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 59212c5a65..acd8419575 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -95,6 +95,9 @@ public: : Base(pool, tnode, descs), _child_size(tnode.num_children) {}; ~UnionSourceOperatorX() override = default; Dependency* wait_for_dependency(RuntimeState* state) override { + if (_child_size == 0) { + return nullptr; + } CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state); return local_state._dependency->read_blocked_by(); } @@ -133,9 +136,13 @@ public: } return Status::OK(); } + int get_child_count() const { return _child_size; } private: bool _has_data(RuntimeState* state) { + if (_child_size == 0) { + return false; + } auto& local_state = state->get_local_state(id())->cast(); return local_state._shared_state->data_queue->remaining_has_data(); } diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 42c5842d0f..8d228623c5 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -307,9 +307,6 @@ Status OperatorX::setup_local_state(RuntimeState* state, LocalSt template Status OperatorX::setup_local_states(RuntimeState* state, std::vector& infos) { - if (infos.size() > 1) { - LOG_WARNING("herr"); - } DCHECK(infos.size() == 1) << infos.size(); for (auto& info : infos) { RETURN_IF_ERROR(setup_local_state(state, info)); @@ -320,15 +317,18 @@ Status OperatorX::setup_local_states(RuntimeState* state, template <> Status OperatorX::setup_local_states(RuntimeState* state, std::vector& infos) { + int child_count = static_cast(this)->get_child_count(); std::shared_ptr data_queue; for (auto& info : infos) { auto local_state = UnionSourceLocalState::create_shared(state, this); state->emplace_local_state(id(), local_state); RETURN_IF_ERROR(local_state->init(state, info)); - if (!data_queue) { - data_queue = local_state->data_queue(); + if (child_count != 0) { + if (!data_queue) { + data_queue = local_state->data_queue(); + } + local_state->_shared_state->data_queue = data_queue; } - local_state->_shared_state->data_queue = data_queue; } return Status::OK(); }