[Bug](pipeline) fix hang on union_source_operator when child sink_operator all finished (#20938)

This commit is contained in:
Pxl
2023-06-19 09:46:38 +08:00
committed by GitHub
parent 0bed52d86b
commit 99810f1ea5
2 changed files with 9 additions and 3 deletions

View File

@ -46,10 +46,14 @@ UnionSourceOperator::UnionSourceOperator(OperatorBuilderBase* operator_builder,
_data_queue(queue),
_need_read_for_const_expr(true) {};
bool UnionSourceOperator::_has_data() {
return _need_read_for_const_expr || _data_queue->remaining_has_data();
}
// we assumed it can read to process const expr, Although we don't know whether there is
// ,and queue have data, could read also
bool UnionSourceOperator::can_read() {
return _need_read_for_const_expr || _data_queue->remaining_has_data();
return _has_data() || _data_queue->is_all_finish();
}
Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* block, bool* eos) {
@ -83,9 +87,9 @@ Status UnionSourceOperator::get_block(RuntimeState* state, vectorized::Block* bl
std::bind(&UnionSourceOperator::pull_data, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3)));
//have exectue const expr, queue have no data any more, and child could be colsed
if (eos || (!can_read() && _data_queue->is_all_finish())) {
if (eos || (!_has_data() && _data_queue->is_all_finish())) {
source_state = SourceState::FINISHED;
} else if (can_read()) {
} else if (_has_data()) {
source_state = SourceState::MORE_DATA;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;

View File

@ -59,6 +59,8 @@ public:
Status pull_data(RuntimeState* state, vectorized::Block* output_block, bool* eos);
private:
bool _has_data();
std::shared_ptr<DataQueue> _data_queue;
bool _need_read_for_const_expr;
};