[CP] [CP] pkt-nio rpc add RPC_STAT and sync rpc add wait events

This commit is contained in:
liucc1997
2024-01-10 14:12:35 +00:00
committed by ob-robot
parent a0a97117bf
commit d94a7de336
7 changed files with 45 additions and 13 deletions

View File

@ -42,6 +42,8 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
send_ret_ = OB_TIMEOUT;
RPC_LOG_RET(WARN, send_ret_, "response is null", KP(buf), K(sz), K(io_err));
} else {
EVENT_INC(RPC_PACKET_IN);
EVENT_ADD(RPC_PACKET_IN_BYTES, sz);
buf = buf + easy_head_size;
sz = sz - easy_head_size; // skip easy header
sz_ = sz;
@ -58,8 +60,9 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
rk_futex_wake(&cond_, 1);
return ret;
}
int ObSyncRespCallback::wait()
int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz)
{
ObWaitEventGuard wait_guard(ObWaitEventIds::SYNC_RPC, wait_timeout_us / 1000, pcode, req_sz);
while(ATOMIC_LOAD(&cond_) == 0) {
rk_futex_wait(&cond_, 0, NULL);
}
@ -112,6 +115,8 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
ObRpcPacketCode pcode = OB_INVALID_RPC_CODE;
ObRpcPacket* ret_pkt = NULL;
if (buf != NULL && sz > easy_head_size) {
EVENT_INC(RPC_PACKET_IN);
EVENT_ADD(RPC_PACKET_IN_BYTES, sz);
sz = sz - easy_head_size;
buf = buf + easy_head_size;
} else {
@ -121,6 +126,7 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
if (ucb_ == NULL) {
// do nothing
} else {
ucb_->record_stat(buf == NULL);
bool cb_cloned = ucb_->get_cloned();
pcode = ucb_->get_pcode();
if (0 != io_err) {

View File

@ -15,6 +15,7 @@
#include "rpc/obrpc/ob_rpc_endec.h"
#include "rpc/frame/ob_req_transport.h"
#include "rpc/ob_request.h"
#include "rpc/obrpc/ob_rpc_stat.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
extern "C" {
@ -31,7 +32,7 @@ public:
~ObSyncRespCallback() {}
void* alloc(int64_t sz) { return pool_.alloc(sz); }
int handle_resp(int io_err, const char* buf, int64_t sz);
int wait();
int wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz);
const char* get_resp(int64_t& sz) {
sz = sz_;
return resp_;
@ -148,7 +149,10 @@ public:
&cb))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else if (OB_FAIL(cb.wait())) {
} else {
EVENT_INC(RPC_PACKET_OUT);
EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz);
if (OB_FAIL(cb.wait(get_proxy_timeout(proxy), pcode, req_sz))) {
RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;
@ -157,6 +161,7 @@ public:
RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret));
}
}
}
if (rcode.rcode_ != OB_DESERIALIZE_ERROR) {
int wb_ret = OB_SUCCESS;
if (common::OB_SUCCESS != (wb_ret = log_user_error_and_warn(rcode))) {
@ -167,6 +172,16 @@ public:
set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id());
}
}
rpc::RpcStatPiece piece;
piece.size_ = req_sz;
piece.time_ = ObTimeUtility::current_time() - start_ts;
if (OB_FAIL(ret)) {
piece.failed_ = true;
if (OB_TIMEOUT == ret) {
piece.is_timeout_ = true;
}
}
RPC_STAT(pcode, src_tenant_id, piece);
return ret;
}
template<typename Input, typename UCB>
@ -225,6 +240,9 @@ public:
)) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else {
EVENT_INC(RPC_PACKET_OUT);
EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz);
}
}
}

View File

@ -87,6 +87,8 @@ template <typename T>
bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type);
if (need_compressed) {
// compress
EVENT_INC(RPC_COMPRESS_ORIGINAL_PACKET_CNT);
EVENT_ADD(RPC_COMPRESS_ORIGINAL_SIZE, payload_sz);
int tmp_ret = OB_SUCCESS;
common::ObCompressor *compressor = NULL;
char *compressed_buf = NULL;
@ -112,6 +114,8 @@ template <typename T>
pkt.set_original_len(static_cast<int32_t>(payload_sz));
memcpy(payload_buf, compressed_buf, dst_data_size);
payload_sz = dst_data_size;
EVENT_INC(RPC_COMPRESS_COMPRESSED_PACKET_CNT);
EVENT_ADD(RPC_COMPRESS_COMPRESSED_SIZE, dst_data_size);
}
if (NULL != compressed_buf) {
ob_free(compressed_buf);

View File

@ -147,7 +147,7 @@ public:
void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; }
void set_dst_cluster(int64_t dst_cluster_id) { dst_cluster_id_ = dst_cluster_id; }
void set_transport_impl(int transport_impl) { transport_impl_ = transport_impl; }
void set_result_code(const ObRpcResultCode retcode) {
void set_result_code(const ObRpcResultCode &retcode) {
rcode_.rcode_ = retcode.rcode_;
snprintf(rcode_.msg_, common::OB_MAX_ERROR_MSG_LEN, "%s", retcode.msg_);
rcode_.warnings_.reset();

View File

@ -83,7 +83,7 @@ int SSHandle<pcodeStruct>::get_more(typename pcodeStruct::Response &result)
&cb))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
} else if (OB_FAIL(cb.wait())) {
} else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) {
RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;
@ -219,7 +219,7 @@ int SSHandle<pcodeStruct>::abort()
&cb))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
} else if (OB_FAIL(cb.wait())) {
} else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) {
RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;

View File

@ -1610,7 +1610,7 @@ int ObInnerTableSchema::v_ob_rpc_incoming_schema(ObTableSchema &table_schema)
table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
if (OB_SUCC(ret)) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT * FROM oceanbase.GV$OB_RPC_INCOMING )__"))) {
if (OB_FAIL(table_schema.set_view_definition(R"__( SELECT * FROM oceanbase.GV$OB_RPC_INCOMING WHERE SVR_IP=HOST_IP() AND SVR_PORT=RPC_PORT() )__"))) {
LOG_ERROR("fail to set view_definition", K(ret));
}
}

View File

@ -15795,6 +15795,10 @@ SELECT
*
FROM
oceanbase.GV$OB_RPC_INCOMING
WHERE
SVR_IP=HOST_IP()
AND
SVR_PORT=RPC_PORT()
""".replace("\n", " "),
)