From 5c3ca0fbc2bd94af4b58b77829ec369612941c06 Mon Sep 17 00:00:00 2001 From: Xin Liao Date: Sat, 23 Mar 2024 22:33:59 +0800 Subject: [PATCH] [fix](move-memtable) fix load timeout caused by lost wakeup (#32720) --- be/src/vec/sink/load_stream_stub.cpp | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index da0276fdd0..6eb91e4685 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -321,21 +321,16 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) { } DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0"; std::unique_lock lock(_close_mutex); - if (!_is_closed.load()) { - auto timeout_sec = timeout_ms / 1000; - while (!state->get_query_ctx()->is_cancelled() && timeout_sec > 0) { - //the query maybe cancel, so need check after wait 1s - timeout_sec = timeout_sec - 1; - int ret = _close_cv.wait_for(lock, 1000000); - if (ret == 0) { - break; - } - if (timeout_sec <= 0) { - return Status::InternalError( - "stream close_wait timeout, timeout_ms={}, load_id={}, dst_id={}, " - "stream_id={}", - timeout_ms, print_id(_load_id), _dst_id, _stream_id); - } + auto timeout_sec = timeout_ms / 1000; + while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) { + //the query maybe cancel, so need check after wait 1s + timeout_sec = timeout_sec - 1; + int ret = _close_cv.wait_for(lock, 1000000); + if (ret != 0 && timeout_sec <= 0) { + return Status::InternalError( + "stream close_wait timeout, error={}, timeout_ms={}, load_id={}, dst_id={}, " + "stream_id={}", + ret, timeout_ms, print_id(_load_id), _dst_id, _stream_id); } } RETURN_IF_ERROR(_check_cancel());