diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp index 3c95bb8df4..aed8f87455 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp @@ -240,11 +240,14 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, LOG_DEBUG("the stream request hasn't come"); // when waiting for OB_REMOTE_EXECUTE/OB_REMOTE_SYNC_EXECUTE/OB_INNER_SQL_SYNC_TRANSMIT request more than 30s, // try to send reverse keepalive request. - if (current_time_us >= keepalive_timeout_us - && reverse_keepalive_arg.is_valid() - && OB_FAIL(stream_rpc_reverse_probe(reverse_keepalive_arg))) { - LOG_WARN("stream rpc sender has been aborted, unneed to wait", K(sessid), K(timeout), K(reverse_keepalive_arg)); - break; + if (current_time_us >= keepalive_timeout_us && reverse_keepalive_arg.is_valid()) { + get_next_cond_(wait_object.thid_).unlock(); + ret = stream_rpc_reverse_probe(reverse_keepalive_arg); + get_next_cond_(wait_object.thid_).lock(); + if (OB_FAIL(ret)) { + LOG_WARN("stream rpc sender has been aborted, unneed to wait", K(sessid), K(timeout), K(reverse_keepalive_arg)); + break; + } } } else { req = wait_object.req_;