From 45b8f5d0e97bd3c0fdfc439aa46fdccbfb39f859 Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Tue, 5 Nov 2024 12:14:03 +0000 Subject: [PATCH] [CP] fix stream rpc droped when stream rpc keepalive unlock the cond --- deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp index b03eccfe3..fdd7cc453 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp @@ -242,10 +242,19 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, // try to send reverse keepalive request. if (current_time_us >= keepalive_timeout_us && reverse_keepalive_arg.is_valid()) { get_next_cond_(thid).unlock(); - ret = stream_rpc_reverse_probe(reverse_keepalive_arg); + int tmp_ret = stream_rpc_reverse_probe(reverse_keepalive_arg); get_next_cond_(thid).lock(); - if (OB_FAIL(ret)) { + if (OB_SUCCESS != tmp_ret) { LOG_WARN("stream rpc sender has been aborted, unneed to wait", K(sessid), K(timeout), K(reverse_keepalive_arg)); + if (OB_FAIL(next_wait_map_.get_refactored(sessid, wait_object))) { + LOG_ERROR("wait object has been released", K(sessid), K(ret)); + } else if (OB_ISNULL(wait_object.req_)) { + // keepalive faild and the req is null, set the error and break + ret = tmp_ret; + } else { + req = wait_object.req_; + LOG_INFO("got the next request though keepalive failed, break and return success", K(sessid), K(tmp_ret), K(ret)); + } break; } }