diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 709e89368a..8cfec9d362 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -62,7 +62,7 @@ Status UnionSourceOperator::pull_data(RuntimeState* state, vectorized::Block* bl // here we precess const expr firstly if (_need_read_for_const_expr) { if (_node->has_more_const(state)) { - static_cast(_node->get_next_const(state, block)); + RETURN_IF_ERROR(_node->get_next_const(state, block)); } _need_read_for_const_expr = _node->has_more_const(state); } else { diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 471d451872..bb8e3ea77e 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -45,7 +45,6 @@ void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep, Status AsyncResultWriter::sink(Block* block, bool eos) { auto rows = block->rows(); - auto status = Status::OK(); std::unique_ptr add_block; if (rows) { add_block = _get_free_block(block, rows); @@ -58,7 +57,6 @@ Status AsyncResultWriter::sink(Block* block, bool eos) { return _writer_status; } - _eos = eos; if (_dependency && _is_finished()) { _dependency->set_ready(); } @@ -68,9 +66,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) { _dependency->block(); } } + // in 'process block' we check _eos first and _data_queue second so here + // in the lock. must modify the _eos after change _data_queue to make sure + // not lead the logic error in multi thread + _eos = eos; _cv.notify_one(); - return status; + return Status::OK(); } std::unique_ptr AsyncResultWriter::_get_block_from_queue() {