[Bug](pipeline) Fix pipeline load lose data use wrong var check order (#30341)

This commit is contained in:
HappenLee
2024-01-25 13:10:51 +08:00
committed by yiguolei
parent 6614d40dad
commit b60a272be0
2 changed files with 6 additions and 4 deletions

View File

@ -62,7 +62,7 @@ Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* bl
// here we precess const expr firstly
if (_need_read_for_const_expr) {
if (_node->has_more_const(state)) {
static_cast<void>(_node->get_next_const(state, block));
RETURN_IF_ERROR(_node->get_next_const(state, block));
}
_need_read_for_const_expr = _node->has_more_const(state);
} else {

View File

@ -45,7 +45,6 @@ void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
Status AsyncResultWriter::sink(Block* block, bool eos) {
auto rows = block->rows();
auto status = Status::OK();
std::unique_ptr<Block> add_block;
if (rows) {
add_block = _get_free_block(block, rows);
@ -58,7 +57,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
return _writer_status;
}
_eos = eos;
if (_dependency && _is_finished()) {
_dependency->set_ready();
}
@ -68,9 +66,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
_dependency->block();
}
}
// in 'process block' we check _eos first and _data_queue second so here
// in the lock. must modify the _eos after change _data_queue to make sure
// not lead the logic error in multi thread
_eos = eos;
_cv.notify_one();
return status;
return Status::OK();
}
std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {