From 68638d77f1ec93a40cc737761f0a9dfc3352616b Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 9 Feb 2024 07:04:48 +0000 Subject: [PATCH] [CP] [CP] pkt-nio rpc add RPC_STAT and sync rpc add wait events --- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp | 8 ++++- deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h | 34 ++++++++++++++----- deps/oblib/src/rpc/obrpc/ob_rpc_endec.h | 4 +++ deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h | 2 +- deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp | 4 +-- .../ob_inner_table_schema.21001_21050.cpp | 2 +- .../inner_table/ob_inner_table_schema_def.py | 4 +++ 7 files changed, 45 insertions(+), 13 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index 115a19b82b..a3080c2523 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -42,6 +42,8 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) send_ret_ = OB_TIMEOUT; RPC_LOG_RET(WARN, send_ret_, "response is null", KP(buf), K(sz), K(io_err)); } else { + EVENT_INC(RPC_PACKET_IN); + EVENT_ADD(RPC_PACKET_IN_BYTES, sz); buf = buf + easy_head_size; sz = sz - easy_head_size; // skip easy header sz_ = sz; @@ -58,8 +60,9 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) rk_futex_wake(&cond_, 1); return ret; } -int ObSyncRespCallback::wait() +int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz) { + ObWaitEventGuard wait_guard(ObWaitEventIds::SYNC_RPC, wait_timeout_us / 1000, pcode, req_sz); while(ATOMIC_LOAD(&cond_) == 0) { rk_futex_wait(&cond_, 0, NULL); } @@ -112,6 +115,8 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) ObRpcPacketCode pcode = OB_INVALID_RPC_CODE; ObRpcPacket* ret_pkt = NULL; if (buf != NULL && sz > easy_head_size) { + EVENT_INC(RPC_PACKET_IN); + EVENT_ADD(RPC_PACKET_IN_BYTES, sz); sz = sz - easy_head_size; buf = buf + easy_head_size; } else { @@ -121,6 +126,7 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) if (ucb_ == NULL) { // do nothing } else { + ucb_->record_stat(buf == NULL); bool cb_cloned = ucb_->get_cloned(); pcode = ucb_->get_pcode(); if (0 != io_err) { diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index 7ed15f7301..99311acb2b 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -15,6 +15,7 @@ #include "rpc/obrpc/ob_rpc_endec.h" #include "rpc/frame/ob_req_transport.h" #include "rpc/ob_request.h" +#include "rpc/obrpc/ob_rpc_stat.h" #include "rpc/obrpc/ob_poc_rpc_server.h" extern "C" { @@ -31,7 +32,7 @@ public: ~ObSyncRespCallback() {} void* alloc(int64_t sz) { return pool_.alloc(sz); } int handle_resp(int io_err, const char* buf, int64_t sz); - int wait(); + int wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz); const char* get_resp(int64_t& sz) { sz = sz_; return resp_; @@ -148,13 +149,17 @@ public: &cb))) { ret = translate_io_error(sys_err); RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); - } else if (OB_FAIL(cb.wait())) { - RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode)); - } else if (NULL == (resp = cb.get_resp(resp_sz))) { - ret = common::OB_ERR_UNEXPECTED; - RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode)); - } else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) { - RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret)); + } else { + EVENT_INC(RPC_PACKET_OUT); + EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz); + if (OB_FAIL(cb.wait(get_proxy_timeout(proxy), pcode, req_sz))) { + RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode)); + } else if (NULL == (resp = cb.get_resp(resp_sz))) { + ret = common::OB_ERR_UNEXPECTED; + RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode)); + } else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) { + RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret)); + } } } if (rcode.rcode_ != OB_DESERIALIZE_ERROR) { @@ -167,6 +172,16 @@ public: set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id()); } } + rpc::RpcStatPiece piece; + piece.size_ = req_sz; + piece.time_ = ObTimeUtility::current_time() - start_ts; + if (OB_FAIL(ret)) { + piece.failed_ = true; + if (OB_TIMEOUT == ret) { + piece.is_timeout_ = true; + } + } + RPC_STAT(pcode, src_tenant_id, piece); return ret; } template @@ -225,6 +240,9 @@ public: )) { ret = translate_io_error(sys_err); RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); + } else { + EVENT_INC(RPC_PACKET_OUT); + EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz); } } } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h b/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h index 5ac76d2a21..add5cab62b 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h @@ -87,6 +87,8 @@ template bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type); if (need_compressed) { // compress + EVENT_INC(RPC_COMPRESS_ORIGINAL_PACKET_CNT); + EVENT_ADD(RPC_COMPRESS_ORIGINAL_SIZE, payload_sz); int tmp_ret = OB_SUCCESS; common::ObCompressor *compressor = NULL; char *compressed_buf = NULL; @@ -112,6 +114,8 @@ template pkt.set_original_len(static_cast(payload_sz)); memcpy(payload_buf, compressed_buf, dst_data_size); payload_sz = dst_data_size; + EVENT_INC(RPC_COMPRESS_COMPRESSED_PACKET_CNT); + EVENT_ADD(RPC_COMPRESS_COMPRESSED_SIZE, dst_data_size); } if (NULL != compressed_buf) { ob_free(compressed_buf); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h index d98cad738e..9bc467ad40 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.h @@ -147,7 +147,7 @@ public: void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; } void set_dst_cluster(int64_t dst_cluster_id) { dst_cluster_id_ = dst_cluster_id; } void set_transport_impl(int transport_impl) { transport_impl_ = transport_impl; } - void set_result_code(const ObRpcResultCode retcode) { + void set_result_code(const ObRpcResultCode &retcode) { rcode_.rcode_ = retcode.rcode_; snprintf(rcode_.msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", retcode.msg_); rcode_.warnings_.reset(); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp index 85aaf11185..b3d0dd71be 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp @@ -83,7 +83,7 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) &cb))) { ret = ObPocClientStub::translate_io_error(pn_err); RPC_LOG(WARN, "pnio post fail", K(pn_err)); - } else if (OB_FAIL(cb.wait())) { + } else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) { RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_)); } else if (NULL == (resp = cb.get_resp(resp_sz))) { ret = common::OB_ERR_UNEXPECTED; @@ -219,7 +219,7 @@ int SSHandle::abort() &cb))) { ret = ObPocClientStub::translate_io_error(pn_err); RPC_LOG(WARN, "pnio post fail", K(pn_err)); - } else if (OB_FAIL(cb.wait())) { + } else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) { RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_)); } else if (NULL == (resp = cb.get_resp(resp_sz))) { ret = common::OB_ERR_UNEXPECTED; diff --git a/src/share/inner_table/ob_inner_table_schema.21001_21050.cpp b/src/share/inner_table/ob_inner_table_schema.21001_21050.cpp index 46e915b054..bb70141378 100644 --- a/src/share/inner_table/ob_inner_table_schema.21001_21050.cpp +++ b/src/share/inner_table/ob_inner_table_schema.21001_21050.cpp @@ -1610,7 +1610,7 @@ int ObInnerTableSchema::v_ob_rpc_incoming_schema(ObTableSchema &table_schema) table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); if (OB_SUCC(ret)) { - if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT * FROM oceanbase.GV$OB_RPC_INCOMING )__"))) { + if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT * FROM oceanbase.GV$OB_RPC_INCOMING WHERE SVR_IP=HOST_IP() AND SVR_PORT=RPC_PORT() )__"))) { LOG_ERROR("fail to set view_definition", K(ret)); } } diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index 97bbd5e6dd..6a55ee06e3 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -15795,6 +15795,10 @@ SELECT * FROM oceanbase.GV$OB_RPC_INCOMING +WHERE + SVR_IP=HOST_IP() + AND + SVR_PORT=RPC_PORT() """.replace("\n", " "), )