[fix](pipelineX) fix null ptr when unionoperator only have constexpr #24822

This commit is contained in:
Mryange
2023-09-23 20:28:38 +08:00
committed by GitHub
parent 11b6fb9d10
commit 5a4d51716c
3 changed files with 16 additions and 7 deletions

View File

@ -152,7 +152,9 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
}
local_state.reached_limit(block, source_state);
//have exectue const expr, queue have no data any more, and child could be colsed
if ((!_has_data(state) && local_state._shared_state->data_queue->is_all_finish())) {
if (_child_size == 0) {
source_state = SourceState::FINISHED;
} else if ((!_has_data(state) && local_state._shared_state->data_queue->is_all_finish())) {
source_state = SourceState::FINISHED;
} else if (_has_data(state)) {
source_state = SourceState::MORE_DATA;

View File

@ -95,6 +95,9 @@ public:
: Base(pool, tnode, descs), _child_size(tnode.num_children) {};
~UnionSourceOperatorX() override = default;
Dependency* wait_for_dependency(RuntimeState* state) override {
if (_child_size == 0) {
return nullptr;
}
CREATE_LOCAL_STATE_RETURN_NULL_IF_ERROR(local_state);
return local_state._dependency->read_blocked_by();
}
@ -133,9 +136,13 @@ public:
}
return Status::OK();
}
int get_child_count() const { return _child_size; }
private:
bool _has_data(RuntimeState* state) {
if (_child_size == 0) {
return false;
}
auto& local_state = state->get_local_state(id())->cast<UnionSourceLocalState>();
return local_state._shared_state->data_queue->remaining_has_data();
}

View File

@ -307,9 +307,6 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
template <typename LocalStateType>
Status OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
std::vector<LocalStateInfo>& infos) {
if (infos.size() > 1) {
LOG_WARNING("herr");
}
DCHECK(infos.size() == 1) << infos.size();
for (auto& info : infos) {
RETURN_IF_ERROR(setup_local_state(state, info));
@ -320,15 +317,18 @@ Status OperatorX<LocalStateType>::setup_local_states(RuntimeState* state,
template <>
Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
std::vector<LocalStateInfo>& infos) {
int child_count = static_cast<pipeline::UnionSourceOperatorX*>(this)->get_child_count();
std::shared_ptr<DataQueue> data_queue;
for (auto& info : infos) {
auto local_state = UnionSourceLocalState::create_shared(state, this);
state->emplace_local_state(id(), local_state);
RETURN_IF_ERROR(local_state->init(state, info));
if (!data_queue) {
data_queue = local_state->data_queue();
if (child_count != 0) {
if (!data_queue) {
data_queue = local_state->data_queue();
}
local_state->_shared_state->data_queue = data_queue;
}
local_state->_shared_state->data_queue = data_queue;
}
return Status::OK();
}