diff --git a/be/src/runtime/memory_scratch_sink.cpp b/be/src/runtime/memory_scratch_sink.cpp index 6b4e32cf30..add1708fda 100644 --- a/be/src/runtime/memory_scratch_sink.cpp +++ b/be/src/runtime/memory_scratch_sink.cpp @@ -222,8 +222,8 @@ Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); } - // shutdown queue, then blocking_get return false, put sentinel - if (_queue) { + // put sentinel + if (_queue != nullptr) { _queue->blocking_put(nullptr); } Expr::close(_output_expr_ctxs, state); diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp index 20cf073f2c..02ddcedf24 100644 --- a/be/src/runtime/result_queue_mgr.cpp +++ b/be/src/runtime/result_queue_mgr.cpp @@ -47,6 +47,10 @@ Status ResultQueueMgr::fetch_result(const TUniqueId& fragment_instance_id, std:: // sentinel nullptr indicates scan end if (*result == nullptr) { *eos = true; + // put sentinel for consistency, avoid repeated invoking fetch result when hava no rowbatch + if (queue != nullptr) { + queue->blocking_put(nullptr); + } } else { *eos = false; }