diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index 042f65e51..f863b9c9a 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -230,8 +230,8 @@ int32_t ObPocClientStub::get_proxy_group_id(ObRpcProxy& proxy) { void ObPocClientStub::set_rcode(ObRpcProxy& proxy, const ObRpcResultCode& rcode) { proxy.set_result_code(rcode); } -void ObPocClientStub::set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id) { - proxy.set_handle_attr(handle, pcode, opts, is_stream_next, session_id); +void ObPocClientStub::set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts) { + proxy.set_handle_attr(handle, pcode, opts, is_stream_next, session_id, pkt_id, send_ts); } int ObPocClientStub::translate_io_error(int io_err) { return tranlate_to_ob_error(io_err); diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index b1501f471..54aaca2bb 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -105,7 +105,7 @@ public: static int64_t get_proxy_timeout(ObRpcProxy& proxy); static void set_rcode(ObRpcProxy& proxy, const ObRpcResultCode& rcode); static int check_blacklist(const common::ObAddr& addr); - static void set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id); + static void set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts); static int32_t get_proxy_group_id(ObRpcProxy& proxy); static uint8_t balance_assign_tidx() { @@ -181,7 +181,8 @@ public: } set_rcode(proxy, rcode); if (OB_SUCC(ret) && handle) { - set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id()); + int64_t pkt_id = static_cast(cb.pkt_id_); + set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id(), pkt_id, start_ts); } } rpc::RpcStatPiece piece; diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index 69e17cf65..a731e3ab9 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -39,6 +39,22 @@ bool __attribute__((weak)) enable_pkt_nio(bool start_as_client) { int64_t __attribute__((weak)) get_max_rpc_packet_size() { return OB_MAX_RPC_PACKET_LENGTH; } +void __attribute__((weak)) stream_rpc_register(const int64_t pkt_id, int64_t send_time_us) +{ + UNUSED(pkt_id); + UNUSED(send_time_us); + RPC_LOG_RET(WARN, OB_ERR_UNEXPECTED, "should not reach here"); +} +void __attribute__((weak)) stream_rpc_unregister(const int64_t pkt_id) +{ + UNUSED(pkt_id); + RPC_LOG_RET(WARN, OB_ERR_UNEXPECTED, "should not reach here"); +} +int __attribute__((weak)) stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg) +{ + UNUSED(reverse_keepalive_arg); + return OB_ERR_UNEXPECTED; +} }; // end namespace obrpc }; // end namespace oceanbase @@ -107,6 +123,10 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s } int64_t receive_ts = ObTimeUtility::current_time(); pkt->set_receive_ts(receive_ts); + int64_t pkt_id = pn_get_pkt_id(resp_id); + if (OB_LIKELY(pkt_id >= 0)) { + pkt->set_packet_id(pkt_id); + } pkt->set_content(packet_data, tmp_pkt.get_clen()); req->set_server_handle_context(ctx); req->set_packet(pkt); diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index 41a1059a9..b3df2bdc0 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -22,6 +22,10 @@ namespace oceanbase namespace obrpc { +enum { + INVALID_RPC_PKT_ID = -1 +}; +struct ObRpcReverseKeepaliveArg; class ObPocServerHandleContext { public: @@ -80,6 +84,9 @@ private: extern ObPocRpcServer global_poc_server; extern ObListener* global_ob_listener; +void stream_rpc_register(const int64_t pkt_id, int64_t send_time_us); +void stream_rpc_unregister(const int64_t pkt_id); +int stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg); int64_t get_max_rpc_packet_size(); extern "C" { int dispatch_to_ob_listener(int accept_fd); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index 8fa32ce6f..dcc2ab8b8 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -1073,6 +1073,7 @@ PCODE_DEF(OB_DETECT_RPC_CALL, 0x1595) // session info diagnosis // PCODE_DEF(OB_SESS_INFO_DIAGNOSIS, 0x1596) +PCODE_DEF(OB_RPC_REVERSE_KEEPALIVE, 0x1597) // tenant snapshot // create tenant snapshot diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp index 90d606215..a3eb81a5c 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.cpp @@ -541,7 +541,7 @@ int ObRpcProcessorBase::part_response_error(rpc::ObRequest* req, const int retco } return ret; } -int ObRpcProcessorBase::flush(int64_t wait_timeout) +int ObRpcProcessorBase::flush(int64_t wait_timeout, const ObAddr *src_addr) { int ret = OB_SUCCESS; is_stream_ = true; @@ -549,7 +549,7 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout) UNIS_VERSION_GUARD(unis_version_); const int64_t stream_rpc_max_wait_timeout = get_stream_rpc_max_wait_timeout(tenant_id_); - if (0 == wait_timeout || wait_timeout > stream_rpc_max_wait_timeout) { + if (0 == wait_timeout) { wait_timeout = stream_rpc_max_wait_timeout; } @@ -560,6 +560,8 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout) RPC_OBRPC_LOG(WARN, "allocate stream condition object fail", K(ret)); } } + int64_t tenant_id = OB_INVALID_TENANT_ID; + if (OB_FAIL(ret)) { } else if (OB_ISNULL(rpc_pkt_) || is_stream_end_) { ret = OB_ERR_UNEXPECTED; @@ -568,7 +570,7 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout) } else if (rpc_pkt_ && rpc_pkt_->is_stream_last()) { ret = OB_ITER_END; RPC_OBRPC_LOG(WARN, "stream is end", K(ret), K(*rpc_pkt_)); - } else if (OB_FAIL(sc_->prepare())) { + } else if (OB_FAIL(sc_->prepare(src_addr, rpc_pkt_))) { RPC_OBRPC_LOG(WARN, "prepare stream session fail", K(ret)); } else if (OB_FAIL(part_response(common::OB_SUCCESS, false))) { RPC_OBRPC_LOG(WARN, "response part result to peer fail", K(ret)); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h index 2db38c8db..0f8a382b7 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_processor_base.h @@ -108,7 +108,7 @@ protected: virtual int deserialize(); virtual int serialize(); virtual int response(const int retcode) { return part_response(retcode, true); } - virtual int flush(int64_t wait_timeout = 0); + virtual int flush(int64_t wait_timeout = 0, const ObAddr *src_addr = NULL); void set_preserve_recv_data() { preserve_recv_data_ = true; } void set_result_compress_type(common::ObCompressorType t) { result_compress_type_ = t; } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.cpp index 8096c3e3e..4d48039a4 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.cpp @@ -35,6 +35,7 @@ using namespace oceanbase::rpc::frame; ObAddr ObRpcProxy::myaddr_; + Handle::Handle() : has_more_(false), dst_(), @@ -42,7 +43,8 @@ Handle::Handle() opts_(), transport_(NULL), proxy_(), - pcode_(OB_INVALID_RPC_CODE) + pcode_(OB_INVALID_RPC_CODE), + first_pkt_id_(INVALID_RPC_PKT_ID) {} int ObRpcProxy::init(const ObReqTransport *transport, @@ -333,7 +335,7 @@ int ObRpcProxy::create_request( return transport.create_request(req, addr, size, timeout, local_addr, do_ratelimit, is_bg_flow, ob_ssl_invited_nodes, cb); } -void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id) { +void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts) { if (handle) { handle->pcode_ = pcode; handle->opts_ = opts; @@ -344,5 +346,9 @@ void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, c handle->do_ratelimit_ = do_ratelimit_; handle->is_bg_flow_ = is_bg_flow_; handle->transport_ = NULL; + if (is_stream_next) { + handle->first_pkt_id_ = pkt_id; + stream_rpc_register(pkt_id, send_ts); + } } } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h index 9bc467ad4..696e93477 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h @@ -153,7 +153,7 @@ public: rcode_.warnings_.reset(); rcode_.warnings_ = retcode.warnings_; } - void set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id); + void set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts); bool need_increment_request_level(int pcode) const { return ((pcode > OB_SQL_PCODE_START && pcode < OB_SQL_PCODE_END) @@ -299,7 +299,7 @@ protected: ObRpcPacketCode pcode_; bool do_ratelimit_; int8_t is_bg_flow_; - + int64_t first_pkt_id_; private: DISALLOW_COPY_AND_ASSIGN(Handle); }; diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp index 330706213..c35b2a489 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp @@ -101,6 +101,10 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) } else { has_more_ = resp_pkt.is_stream_next(); } + if (OB_FAIL(ret) || !has_more_) { + stream_rpc_unregister(first_pkt_id_); + first_pkt_id_ = INVALID_RPC_PKT_ID; + } } else if (OB_FAIL(ObRpcProxy::create_request(pcode_, *transport_, req, dst_, PAYLOAD_SIZE, proxy_.timeout(), opts_.local_addr_, do_ratelimit_, @@ -246,6 +250,10 @@ int SSHandle::abort() } has_more_ = false; } + if (first_pkt_id_ > 0) { + stream_rpc_unregister(first_pkt_id_); + first_pkt_id_ = INVALID_RPC_PKT_ID; + } } else if (OB_FAIL(ObRpcProxy::create_request(pcode_, *transport_, req, dst_, PAYLOAD_SIZE, proxy_.timeout(), opts_.local_addr_, do_ratelimit_, diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_reverse_keepalive_struct.h b/deps/oblib/src/rpc/obrpc/ob_rpc_reverse_keepalive_struct.h new file mode 100644 index 000000000..48aa048f7 --- /dev/null +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_reverse_keepalive_struct.h @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H +#define OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H + +#include "lib/utility/ob_unify_serialize.h" +#include "lib/net/ob_addr.h" +#include "lib/utility/ob_print_utils.h" + +namespace oceanbase +{ +namespace obrpc +{ +struct ObRpcReverseKeepaliveArg +{ + OB_UNIS_VERSION(1); +public: + ObRpcReverseKeepaliveArg() + : dst_(), first_send_ts_(OB_INVALID_TIMESTAMP), pkt_id_(-1) + {} + ObRpcReverseKeepaliveArg(const ObAddr& dst, int64_t first_send_ts_, const int64_t pkt_id) + : dst_(dst), first_send_ts_(first_send_ts_), pkt_id_(pkt_id) + {} + ~ObRpcReverseKeepaliveArg() + {} + bool is_valid() const + { + return dst_.is_valid() && pkt_id_ >= 0 && pkt_id_ <= INT32_MAX && first_send_ts_ > 0; + } + int assign(const ObRpcReverseKeepaliveArg &other); + TO_STRING_KV(K_(dst), K_(pkt_id), K_(first_send_ts)); + + ObAddr dst_; + int64_t first_send_ts_; + int64_t pkt_id_; + DISALLOW_COPY_AND_ASSIGN(ObRpcReverseKeepaliveArg); +}; + +struct ObRpcReverseKeepaliveResp +{ + OB_UNIS_VERSION(1); +public: + ObRpcReverseKeepaliveResp() : ret_(OB_SUCCESS) + {} + ~ObRpcReverseKeepaliveResp() + {} + int assign(const ObRpcReverseKeepaliveResp &other) + { + ret_ = other.ret_; + return OB_SUCCESS; + } + TO_STRING_KV(K_(ret)); + int32_t ret_; +private: + DISALLOW_COPY_AND_ASSIGN(ObRpcReverseKeepaliveResp); +}; +}; // end namespace obrpc +}; // end namespace oceanbase + +#endif /* !OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H */ diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp index d36b6ab64..3c95bb8df 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.cpp @@ -18,6 +18,8 @@ #include "lib/thread_local/ob_tsi_utils.h" #include "rpc/ob_request.h" #include "rpc/obrpc/ob_rpc_packet.h" +#include "rpc/obrpc/ob_poc_rpc_server.h" +#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h" using namespace oceanbase::common; using namespace oceanbase::rpc; @@ -178,7 +180,8 @@ int ObRpcSessionHandler::destroy_session(int64_t sessid) int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, ObRequest *&req, - const int64_t timeout) + const int64_t timeout, + const ObRpcReverseKeepaliveArg& reverse_keepalive_arg) { int ret = OB_SUCCESS; int hash_ret = 0; @@ -208,6 +211,8 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, K(ret), K(sessid), K(thid), K(wait_object.thid_), K(ret)); } else if (NULL == wait_object.req_) { int64_t timeout_ms = timeout / 1000; + int64_t abs_timeout_us = ObTimeUtility::current_time() + timeout; + int64_t keepalive_timeout_us = abs_timeout_us - timeout + MAX_WAIT_TIMEOUT_MS * 1000; if (timeout_ms < DEFAULT_WAIT_TIMEOUT_MS) { timeout_ms = DEFAULT_WAIT_TIMEOUT_MS; } else { @@ -215,7 +220,8 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, } //Waking up by IO thread - while (timeout_ms > 0) { + int64_t current_time_us = 0; + while ((current_time_us = ObTimeUtility::current_time()) < abs_timeout_us) { // Here we don't care about result of wait because: // // 1. If it return success, it may be caused by spurious wakeup. @@ -232,14 +238,22 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid, break; } else if (OB_ISNULL(wait_object.req_)) { LOG_DEBUG("the stream request hasn't come"); + // when waiting for OB_REMOTE_EXECUTE/OB_REMOTE_SYNC_EXECUTE/OB_INNER_SQL_SYNC_TRANSMIT request more than 30s, + // try to send reverse keepalive request. + if (current_time_us >= keepalive_timeout_us + && reverse_keepalive_arg.is_valid() + && OB_FAIL(stream_rpc_reverse_probe(reverse_keepalive_arg))) { + LOG_WARN("stream rpc sender has been aborted, unneed to wait", K(sessid), K(timeout), K(reverse_keepalive_arg)); + break; + } } else { req = wait_object.req_; break; } - timeout_ms -= DEFAULT_WAIT_TIMEOUT_MS; + timeout_ms = (abs_timeout_us - current_time_us) / 1000; } - if (timeout_ms <= 0) { + if (current_time_us >= abs_timeout_us) { // Stop or time out req = NULL; ret = OB_WAIT_NEXT_TIMEOUT; //WAIT_TIMEOUT; diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.h b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.h index 91dbb0c23..bebaaf336 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_session_handler.h @@ -30,6 +30,7 @@ class ObRequest; namespace obrpc { +struct ObRpcReverseKeepaliveArg; class ObRpcSessionHandler { public: @@ -53,7 +54,8 @@ public: */ virtual int wait_for_next_request(int64_t session_id, rpc::ObRequest *&req, - const int64_t timeout); + const int64_t timeout, + const ObRpcReverseKeepaliveArg& reverse_keepalive_arg); bool wakeup_next_thread(rpc::ObRequest &req); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.cpp b/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.cpp index 19a74f8c3..518b42181 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.cpp @@ -16,13 +16,16 @@ #include "lib/oblog/ob_log.h" #include "rpc/obrpc/ob_rpc_session_handler.h" +#include "rpc/obrpc/ob_poc_rpc_server.h" +#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h" using namespace oceanbase::common; using namespace oceanbase::rpc; using namespace oceanbase::obrpc; ObRpcStreamCond::ObRpcStreamCond(ObRpcSessionHandler &handler) - : sessid_(0), handler_(handler) + : sessid_(0), handler_(handler), + first_pkt_id_(INVALID_RPC_PKT_ID), first_send_ts_(OB_INVALID_TIMESTAMP), src_addr_() { } @@ -31,7 +34,7 @@ ObRpcStreamCond::~ObRpcStreamCond() destroy(); } -int ObRpcStreamCond::prepare() +int ObRpcStreamCond::prepare(const ObAddr *src_addr, const ObRpcPacket *packet) { int ret = OB_SUCCESS; if (0 == sessid_) { @@ -41,6 +44,10 @@ int ObRpcStreamCond::prepare() if (OB_FAIL(handler_.prepare_for_next_request(sessid_))) { LOG_WARN("preapre stream rpc fail", K(ret)); + } else if (src_addr != NULL) { + src_addr_ = *src_addr; + first_pkt_id_ = packet->get_packet_id(); + first_send_ts_ = packet->get_timestamp(); } } return ret; @@ -49,10 +56,11 @@ int ObRpcStreamCond::prepare() int ObRpcStreamCond::wait(ObRequest *&req, int64_t timeout) { int ret = OB_SUCCESS; + ObRpcReverseKeepaliveArg reverse_keepalive_arg(src_addr_, first_send_ts_, first_pkt_id_); if (sessid_ <= 0) { LOG_WARN("sessid is invalied", K_(sessid)); } else if (OB_FAIL(handler_.wait_for_next_request( - sessid_, req, timeout))) { + sessid_, req, timeout, reverse_keepalive_arg))) { LOG_WARN("wait for next request failed", K(ret), K_(sessid)); } else { //do nothing diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.h b/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.h index f2e85abd9..b333c57c3 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_stream_cond.h @@ -14,6 +14,7 @@ #define OCEANBASE_RPC_OBRPC_OB_RPC_STREAM_COND_ #include #include "lib/ob_define.h" +#include "lib/net/ob_addr.h" namespace oceanbase { @@ -32,7 +33,7 @@ public: explicit ObRpcStreamCond(ObRpcSessionHandler &handle); virtual ~ObRpcStreamCond(); - virtual int prepare(); + virtual int prepare(const ObAddr *src_addr, const ObRpcPacket *packet); virtual int wait(rpc::ObRequest *&req, int64_t timeout); virtual int wakeup(rpc::ObRequest &req); virtual int destroy(); @@ -43,6 +44,9 @@ public: private: int64_t sessid_; ObRpcSessionHandler &handler_; + int64_t first_pkt_id_; + int64_t first_send_ts_; + ObAddr src_addr_; private: DISALLOW_COPY_AND_ASSIGN(ObRpcStreamCond); }; // end of class ObRpcStreamCond diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 7a3dc88e1..a6358bb1a 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -667,6 +667,18 @@ PN_API int pn_get_fd(uint64_t req_id) return fd; } +PN_API int64_t pn_get_pkt_id(uint64_t req_id) +{ + int64_t pkt_id = -1; + pn_resp_ctx_t* ctx = (typeof(ctx))req_id; + if (unlikely(NULL == ctx)) { + rk_warn("invalid arguments, req_id=%p", ctx); + } else { + pkt_id = ctx->pkt_id; + } + return pkt_id; +} + void pn_print_diag_info(pn_comm_t* pn_comm) { pn_t* pn = (pn_t*)pn_comm; int64_t client_cnt = 0; diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index be5f5ad2d..fdf5f9c85 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -89,6 +89,7 @@ PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid); PN_API void pn_stop(uint64_t gid); PN_API void pn_wait(uint64_t gid); PN_API int pn_get_fd(uint64_t req_id); +PN_API int64_t pn_get_pkt_id(uint64_t req_id); PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id); extern int64_t pnio_keepalive_timeout; pn_comm_t* get_current_pnio(); diff --git a/src/observer/CMakeLists.txt b/src/observer/CMakeLists.txt index d96370292..054d7737d 100644 --- a/src/observer/CMakeLists.txt +++ b/src/observer/CMakeLists.txt @@ -104,6 +104,7 @@ ob_set_subtarget(ob_server net net/ob_ingress_bw_alloc_service.cpp net/ob_net_endpoint_ingress_rpc_proccessor.cpp net/ob_net_endpoint_ingress_rpc_struct.cpp + net/ob_rpc_reverse_keepalive.cpp ) ob_set_subtarget(ob_server omt diff --git a/src/observer/net/ob_rpc_reverse_keepalive.cpp b/src/observer/net/ob_rpc_reverse_keepalive.cpp new file mode 100644 index 000000000..0e6b309d0 --- /dev/null +++ b/src/observer/net/ob_rpc_reverse_keepalive.cpp @@ -0,0 +1,143 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "observer/net/ob_rpc_reverse_keepalive.h" +#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h" +#define USING_LOG_PREFIX RPC_FRAME +namespace oceanbase +{ +namespace obrpc +{ +int ObRpcReverseKeepaliveArg::assign(const ObRpcReverseKeepaliveArg &other) +{ + int ret = OB_SUCCESS; + dst_ = other.dst_; + first_send_ts_ = other.first_send_ts_; + pkt_id_ = other.pkt_id_; + return ret; +} +OB_SERIALIZE_MEMBER(ObRpcReverseKeepaliveArg, dst_, first_send_ts_, pkt_id_); +OB_SERIALIZE_MEMBER(ObRpcReverseKeepaliveResp, ret_); + +ObRpcReverseKeepAliveService rpc_reverse_keepalive_instance; + +int ObRpcReverseKeepAliveService::init(ObSrvRpcProxy *srv_rpc_proxy) +{ + int ret = OB_SUCCESS; + rpc_proxy_ = srv_rpc_proxy; + if (OB_FAIL(rpc_pkt_id_map_.init("RpcKeepalive"))) { + LOG_ERROR("init rpc pkt_id map failed"); + } else { + init_ = true; + } + return ret; +} +int ObRpcReverseKeepAliveService::destroy() +{ + if (init_) { + rpc_pkt_id_map_.destroy(); + } + return OB_SUCCESS; +} +int ObRpcReverseKeepAliveService::receiver_probe(const ObRpcReverseKeepaliveArg &arg) +{ + int ret = OB_SUCCESS; + if OB_ISNULL(rpc_proxy_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc reverse keepalive rpc proxy is null"); + } else { + const int64_t rpc_timeout = GCONF.rpc_timeout; + ObAddr addr = arg.dst_; + int64_t tenant_id = ob_get_tenant_id(); + ObRpcReverseKeepaliveResp res; + ret = rpc_proxy_->to(addr).by(tenant_id).as(OB_SYS_TENANT_ID).timeout(rpc_timeout).rpc_reverse_keepalive(arg, res); + if (OB_SUCC(ret)) { + ret = res.ret_; + } + if OB_FAIL(ret) { + LOG_WARN("send reverse keepalive failed", K(tenant_id), K(arg)); + } + } + return ret; +} +int ObRpcReverseKeepAliveService::sender_register(const int64_t pkt_id, int64_t send_time_us) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!init_)) { + ret = OB_NOT_INIT; + LOG_WARN("rpc reverse keepalive service is not inited"); + } else { + RpcPktID key(pkt_id); + if (OB_FAIL(rpc_pkt_id_map_.insert(key, send_time_us))) { + LOG_WARN("register pkt_id failed", K(key), K(send_time_us)); + } + } + return ret; +} +int ObRpcReverseKeepAliveService::sender_unregister(const int64_t pkt_id) +{ + int ret = OB_SUCCESS; + RpcPktID key(pkt_id); + if (OB_UNLIKELY(!init_)) { + ret = OB_NOT_INIT; + LOG_WARN("rpc reverse keepalive service is not inited", K(pkt_id)); + } else if (OB_FAIL(rpc_pkt_id_map_.erase(key))) { + LOG_WARN("unregister pkt_id failed", K(pkt_id)); + } + return ret; +} +int ObRpcReverseKeepAliveService::check_status(const int64_t send_time_us, const int64_t pkt_id) +{ + int ret = OB_SUCCESS; + int64_t time_us = 0; + RpcPktID key(pkt_id); + if (OB_UNLIKELY(!init_)) { + ret = OB_NOT_INIT; + LOG_WARN("rpc reverse keepalive service is not inited", K(pkt_id)); + } else if OB_FAIL(rpc_pkt_id_map_.get(key, time_us)) { + LOG_INFO("orignal rpc has released", K(ret), K(pkt_id)); + } else if (time_us > send_time_us) { + ret = OB_HASH_NOT_EXIST; + LOG_WARN("send_ts check failed, client has been restarted and the pkt_id is reused", K(time_us), K(send_time_us), K(pkt_id)); + } + return ret; +} +void stream_rpc_register(const int64_t pkt_id, int64_t send_time_us) +{ + IGNORE_RETURN rpc_reverse_keepalive_instance.sender_register(pkt_id, send_time_us); +} +void stream_rpc_unregister(const int64_t pkt_id) +{ + IGNORE_RETURN rpc_reverse_keepalive_instance.sender_unregister(pkt_id); +} +int stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg) +{ + return rpc_reverse_keepalive_instance.receiver_probe(reverse_keepalive_arg); +} + +}; // end namespace obrpc + +namespace observer +{ +int ObRpcReverseKeepaliveP::process() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!arg_.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("rpc reverse keepalive arg invalid", K(arg_)); + } else { + result_.ret_ = obrpc::rpc_reverse_keepalive_instance.check_status(arg_.first_send_ts_, arg_.pkt_id_); + } + return ret; +} +}; // end of namespace observer +}; // end namespace oceanbase diff --git a/src/observer/net/ob_rpc_reverse_keepalive.h b/src/observer/net/ob_rpc_reverse_keepalive.h new file mode 100644 index 000000000..2f1937fbc --- /dev/null +++ b/src/observer/net/ob_rpc_reverse_keepalive.h @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H +#define OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H + +#include "observer/ob_rpc_processor_simple.h" +#include "lib/utility/ob_unify_serialize.h" +#include "lib/net/ob_addr.h" +#include "lib/utility/ob_print_utils.h" +#include "share/ob_srv_rpc_proxy.h" + +namespace oceanbase +{ +namespace obrpc +{ +class ObRpcReverseKeepAliveService +{ +public: + ObRpcReverseKeepAliveService() : init_(false), rpc_proxy_(NULL), map_attr_(), rpc_pkt_id_map_() {} + int init(ObSrvRpcProxy *srv_rpc_proxy); + int destroy(); + // called from the RPC receiver that needs to probe + int receiver_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg); + int sender_register(const int64_t pkt_id, int64_t pcode); + int sender_unregister(const int64_t pkt_id); + int check_status(const int64_t send_ts, const int64_t pkt_id); + bool init_; +public: + struct RpcPktID + { + int64_t rpc_pkt_id_; + RpcPktID(const int64_t id) : rpc_pkt_id_(id) {} + int64_t hash() const + { + return rpc_pkt_id_; + } + int hash(uint64_t &hash_val) const + { + hash_val = hash(); + return OB_SUCCESS; + } + bool operator== (const RpcPktID &other) const + { + return rpc_pkt_id_ == other.rpc_pkt_id_; + } + TO_STRING_KV(K_(rpc_pkt_id)); + }; +private: + ObSrvRpcProxy *rpc_proxy_; + ObMemAttr map_attr_; + common::ObLinearHashMap rpc_pkt_id_map_; +}; +extern ObRpcReverseKeepAliveService rpc_reverse_keepalive_instance; +}; // end namespace obrpc +namespace observer +{ +OB_DEFINE_PROCESSOR_S(Srv, OB_RPC_REVERSE_KEEPALIVE, ObRpcReverseKeepaliveP); + +}; // end of namespace observer +}; // end namespace oceanbase + +#endif /* !OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H */ diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index 2bc5c6f8d..946fe4e45 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -173,7 +173,7 @@ int ObInnerSqlRpcP::process_read( if (OB_SUCC(ret)) { if (need_flush) { // last row is not be used if (OB_FAIL(obrpc::ObRpcProcessor< obrpc::ObInnerSQLRpcProxy::ObRpc< - obrpc::OB_INNER_SQL_SYNC_TRANSMIT> >::flush(THIS_WORKER.get_timeout_remain()))) { + obrpc::OB_INNER_SQL_SYNC_TRANSMIT> >::flush(THIS_WORKER.get_timeout_remain(), &arg_.get_ctrl_svr()))) { LOG_WARN("fail to flush", K(ret)); } else { LOG_DEBUG("flush scanner successfully", K(scanner), K(scanner.get_found_rows())); diff --git a/src/observer/ob_srv_network_frame.cpp b/src/observer/ob_srv_network_frame.cpp index 298bbff30..d444c668e 100644 --- a/src/observer/ob_srv_network_frame.cpp +++ b/src/observer/ob_srv_network_frame.cpp @@ -22,6 +22,7 @@ #include "share/ob_rpc_share.h" #include "observer/ob_server_struct.h" #include "observer/ob_rpc_intrusion_detect.h" +#include "observer/net/ob_rpc_reverse_keepalive.h" #include "storage/ob_locality_manager.h" #include "lib/ssl/ob_ssl_config.h" extern "C" { @@ -158,6 +159,8 @@ int ObSrvNetworkFrame::init() LOG_ERROR("high prio rpc net register fail", K(ret)); } else if (OB_FAIL(ingress_service_.init(GCONF.cluster_id))) { LOG_ERROR("endpoint ingress service init fail", K(ret)); + } else if (OB_FAIL(rpc_reverse_keepalive_instance.init(GCTX.srv_rpc_proxy_))) { + LOG_ERROR("rpc reverse keepalive instance init fail", K(ret)); } else if (OB_FAIL(net_.add_rpc_unix_listen(rpc_unix_path, rpc_handler_))) { LOG_ERROR("listen rpc unix path fail"); } else { @@ -179,6 +182,7 @@ void ObSrvNetworkFrame::destroy() net_.destroy(); ObNetKeepAlive::get_instance().destroy(); ingress_service_.destroy(); + rpc_reverse_keepalive_instance.destroy(); if (NULL != obmysql::global_sql_nio_server) { obmysql::global_sql_nio_server->destroy(); } diff --git a/src/observer/ob_srv_xlator_primary.cpp b/src/observer/ob_srv_xlator_primary.cpp index e373e6b8d..7631617ad 100644 --- a/src/observer/ob_srv_xlator_primary.cpp +++ b/src/observer/ob_srv_xlator_primary.cpp @@ -45,6 +45,7 @@ #include "observer/table/ob_table_batch_execute_processor.h" #include "observer/table/ob_table_query_processor.h" #include "observer/table/ob_table_query_and_mutate_processor.h" +#include "observer/net/ob_rpc_reverse_keepalive.h" #include "observer/dbms_job/ob_dbms_job_rpc_processor.h" #include "storage/tx_storage/ob_tenant_freezer_rpc.h" @@ -118,6 +119,7 @@ void oceanbase::observer::init_srv_xlator_for_sys(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObRpcRunDBMSSchedJobP, gctx_); RPC_PROCESSOR(ObRpcGetServerResourceInfoP, gctx_); + RPC_PROCESSOR(ObRpcReverseKeepaliveP, gctx_); } void oceanbase::observer::init_srv_xlator_for_schema_test(ObSrvRpcXlator *xlator) { diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 4e6d5cf9f..caf78e1c0 100755 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -22,6 +22,7 @@ #include "observer/net/ob_net_endpoint_ingress_rpc_struct.h" #include "share/ob_heartbeat_struct.h" #include "observer/table_load/control/ob_table_load_control_rpc_struct.h" +#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h" namespace oceanbase { @@ -255,6 +256,7 @@ public: RPC_S(PR5 cancel_gather_stats, OB_CANCEL_GATHER_STATS, (ObCancelGatherStatsArg)); RPC_S(PR5 force_set_tenant_log_disk, OB_LOG_FORCE_SET_TENANT_LOG_DISK, (obrpc::ObForceSetTenantLogDiskArg)); RPC_S(PR5 dump_server_usage, OB_FORCE_DUMP_SERVER_USAGE, (obrpc::ObDumpServerUsageRequest), obrpc::ObDumpServerUsageResult); + RPC_S(PR5 rpc_reverse_keepalive, OB_RPC_REVERSE_KEEPALIVE, (obrpc::ObRpcReverseKeepaliveArg), obrpc::ObRpcReverseKeepaliveResp); }; // end of class ObSrvRpcProxy } // end of namespace rpc diff --git a/src/sql/executor/ob_remote_executor_processor.cpp b/src/sql/executor/ob_remote_executor_processor.cpp index 5b08ae0e1..228e256c6 100644 --- a/src/sql/executor/ob_remote_executor_processor.cpp +++ b/src/sql/executor/ob_remote_executor_processor.cpp @@ -308,7 +308,7 @@ int ObRemoteBaseExecuteP::sync_send_result(ObExecContext &exec_ctx, } else { has_send_result_ = true; // override error code - if (OB_FAIL(ObRpcProcessor::flush(THIS_WORKER.get_timeout_remain()))) { + if (OB_FAIL(ObRpcProcessor::flush(THIS_WORKER.get_timeout_remain(), &ObRpcProcessor::arg_.get_ctrl_server()))) { LOG_WARN("fail to flush", K(ret)); } else { // 超过1个scanner的情况,每次发送都打印一条日志 diff --git a/src/storage/ob_storage_rpc.cpp b/src/storage/ob_storage_rpc.cpp index d3022955b..63f1ae422 100644 --- a/src/storage/ob_storage_rpc.cpp +++ b/src/storage/ob_storage_rpc.cpp @@ -1317,7 +1317,7 @@ int ObStorageStreamRpcP::flush_and_wait() if (OB_FAIL(this->check_timeout())) { LOG_WARN("rpc is timeout, no need flush", K(ret)); - } else if (OB_FAIL(this->flush(OB_DEFAULT_STREAM_WAIT_TIMEOUT))) { + } else if (OB_FAIL(this->flush())) { STORAGE_LOG(WARN, "failed to flush", K(ret)); } else { this->result_.get_position() = 0; diff --git a/unittest/observer/CMakeLists.txt b/unittest/observer/CMakeLists.txt index f6d02225a..61a6e5d43 100644 --- a/unittest/observer/CMakeLists.txt +++ b/unittest/observer/CMakeLists.txt @@ -5,6 +5,7 @@ storage_unittest(test_create_executor table/test_create_executor.cpp) storage_unittest(test_table_aggregation table/test_table_aggregation.cpp) storage_unittest(test_table_sess_pool table/test_table_sess_pool.cpp) storage_unittest(test_ingress_bw_alloc_manager net/test_ingress_bw_alloc_manager.cpp) +storage_unittest(test_rpc_reverse_keepalive net/test_rpc_reverse_keepalive.cpp) ob_unittest(test_obkv_config tableapi/test_obkv_config.cpp) ob_unittest(test_uniq_task_queue) ob_unittest(test_table_connection tableapi/test_table_connection.cpp) diff --git a/unittest/observer/net/test_rpc_reverse_keepalive.cpp b/unittest/observer/net/test_rpc_reverse_keepalive.cpp new file mode 100644 index 000000000..a5bf0a69a --- /dev/null +++ b/unittest/observer/net/test_rpc_reverse_keepalive.cpp @@ -0,0 +1,160 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX SERVER +#include +#include +#include "observer/net/ob_rpc_reverse_keepalive.h" +#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h" +#include "rpc/frame/ob_req_deliver.h" +#include "lib/net/ob_net_util.h" + +#define private public + +using namespace oceanbase::rpc::frame; +namespace oceanbase +{ +namespace obrpc +{ +class TestRpcReverseKeepAliveService : public testing::Test +{ +public: + TestRpcReverseKeepAliveService() + {} + virtual ~TestRpcReverseKeepAliveService() + {} + static int find_port(int start_port = 20000) + { + int sock_fd = -1; + int port = -1; + if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + // failed to create socket + } else { + struct sockaddr_in serv_addr; + memset(&serv_addr, 0, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + for (port = start_port; port <= 65535; ++port) { + serv_addr.sin_port = htons(port); + serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); + if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0) { + close(sock_fd); + break; + } + } + close(sock_fd); + } + return port; + } +}; +class ObTestRpcQHandler + : public rpc::frame::ObiReqQHandler +{ +public: + ObTestRpcQHandler() {} + int onThreadCreated(obsys::CThread *) override final { return OB_SUCCESS; }; + int onThreadDestroy(obsys::CThread *) override final { return OB_SUCCESS; }; + bool handlePacketQueue(rpc::ObRequest *req, void *args) override final + { + ObIAllocator &alloc = THIS_WORKER.get_sql_arena_allocator(); + const observer::ObGlobalContext gctx; + ObReqProcessor *processor = NULL; + ObRpcPacketCode pcode = reinterpret_cast(req->get_packet()).get_pcode(); + if (OB_RPC_REVERSE_KEEPALIVE == pcode) { + processor = OB_NEWx(observer::ObRpcReverseKeepaliveP, &alloc, gctx); + } + if (OB_NOT_NULL(processor)) { + processor->init(); + processor->set_ob_request(*req); + processor->run(); + processor->~ObReqProcessor(); + THIS_WORKER.get_sql_arena_allocator().free(processor); + processor = NULL; + } + return true; + } + +}; +class ObTestRpcDeliver + : public rpc::frame::ObReqQDeliver +{ +public: + ObTestRpcDeliver() : ObReqQDeliver(qhandler_) {}; + int deliver(rpc::ObRequest &req) override final + { + int ret = OB_SUCCESS; + LOG_INFO("deliver rpc request", K(req)); + qhandler_.handlePacketQueue(&req, NULL); + return ret; + } + int init() {return OB_SUCCESS;} + void stop() override final + {}; + ObTestRpcQHandler qhandler_; +}; +bool enable_pkt_nio(bool start_as_client) { + return true; +} + +TEST_F(TestRpcReverseKeepAliveService, reverse_keepalive_service) +{ + // init + int ret = common::OB_SUCCESS; + obrpc::ObSrvRpcProxy srv_rpc_proxy; + ObReqTransport dummy_transport(NULL, NULL); + srv_rpc_proxy.init(&dummy_transport); + ObTestRpcDeliver deliver; + int port = find_port(); + ObAddr dst; + uint32_t ip_value = 0; + if (OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("eth0", ip_value)) + && OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("bond0", ip_value))) { + dst.set_ip_addr("127.0.0.1", port); + } else { + ip_value = ntohl(ip_value); + dst.set_ipv4_addr(ip_value, port); + LOG_INFO("get local ip", K(dst)); + } + + // test + ret = rpc_reverse_keepalive_instance.init(&srv_rpc_proxy); + ASSERT_EQ(ret, OB_SUCCESS); + ret = global_poc_server.start(port, 1, &deliver); + ASSERT_EQ(ret, OB_SUCCESS); + + uint32_t pkt_id = 1; + stream_rpc_register(pkt_id, ObTimeUtility::current_time()); + ObRpcReverseKeepaliveArg a(dst, ObTimeUtility::current_time(), pkt_id); + ret = stream_rpc_reverse_probe(a); + ASSERT_EQ(ret, OB_SUCCESS); + stream_rpc_unregister(pkt_id); + ret = stream_rpc_reverse_probe(a); + ASSERT_EQ(ret, OB_ENTRY_NOT_EXIST); + + pkt_id = 2; + int64_t send_time_us = ObTimeUtility::current_time(); + stream_rpc_register(pkt_id, send_time_us + 100000); + ObRpcReverseKeepaliveArg a2(dst, send_time_us, pkt_id); + ret = stream_rpc_reverse_probe(a2); + ASSERT_EQ(ret, OB_HASH_NOT_EXIST); + + rpc_reverse_keepalive_instance.destroy(); +} + +} // end namespace obrpc +} // end namespace oceanbase + +int main(int argc, char **argv) +{ + OB_LOGGER.set_file_name("test_reverse_keepalive.log", true); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}