[fix](move-memtable) fix load timeout caused by lost wakeup (#32720)

This commit is contained in:
Xin Liao
2024-03-23 22:33:59 +08:00
committed by yiguolei
parent b9788e5e37
commit 5c3ca0fbc2

View File

@ -321,21 +321,16 @@ Status LoadStreamStub::close_wait(RuntimeState* state, int64_t timeout_ms) {
}
DCHECK(timeout_ms > 0) << "timeout_ms should be greator than 0";
std::unique_lock<bthread::Mutex> lock(_close_mutex);
if (!_is_closed.load()) {
auto timeout_sec = timeout_ms / 1000;
while (!state->get_query_ctx()->is_cancelled() && timeout_sec > 0) {
//the query maybe cancel, so need check after wait 1s
timeout_sec = timeout_sec - 1;
int ret = _close_cv.wait_for(lock, 1000000);
if (ret == 0) {
break;
}
if (timeout_sec <= 0) {
return Status::InternalError(
"stream close_wait timeout, timeout_ms={}, load_id={}, dst_id={}, "
"stream_id={}",
timeout_ms, print_id(_load_id), _dst_id, _stream_id);
}
auto timeout_sec = timeout_ms / 1000;
while (!_is_closed.load() && !state->get_query_ctx()->is_cancelled()) {
//the query maybe cancel, so need check after wait 1s
timeout_sec = timeout_sec - 1;
int ret = _close_cv.wait_for(lock, 1000000);
if (ret != 0 && timeout_sec <= 0) {
return Status::InternalError(
"stream close_wait timeout, error={}, timeout_ms={}, load_id={}, dst_id={}, "
"stream_id={}",
ret, timeout_ms, print_id(_load_id), _dst_id, _stream_id);
}
}
RETURN_IF_ERROR(_check_cancel());