add some debug log batch rpc
This commit is contained in:
@ -43,6 +43,8 @@ int ObBatchP::process()
|
||||
int64_t trx_batch_cnt = 0;
|
||||
int64_t sql_batch_cnt = 0;
|
||||
while(NULL != (req = arg_.next(req_pos))) {
|
||||
// rewrite ret
|
||||
ret = OB_SUCCESS;
|
||||
const uint32_t flag = (req->type_ >> 24);
|
||||
const uint32_t batch_type = ((req->type_ >> 16) & 0xff);
|
||||
const uint32_t msg_type = req->type_ & 0xffff;
|
||||
@ -89,6 +91,10 @@ int ObBatchP::process()
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
RPC_LOG(WARN, "process batch rpc",
|
||||
K(ret), K(sender), K(ls_id), K(src_cluster_id), K(flag), K(batch_type), K(msg_type), K(trace_id));
|
||||
}
|
||||
}
|
||||
if (REACH_TIME_INTERVAL(3000000)) {
|
||||
RPC_LOG(INFO, "batch rpc statistics",
|
||||
|
@ -122,7 +122,10 @@ public:
|
||||
if ((1 == flag) && OB_FAIL(trace_id->serialize(buf, total_size, pos))) {
|
||||
header = NULL;
|
||||
} else {
|
||||
req.fill_buffer(buf + pos, req_size, filled_size);
|
||||
if (OB_FAIL(req.fill_buffer(buf + pos, req_size, filled_size))) {
|
||||
RPC_LOG(WARN, "fill buffer failed", K(ret), K(req_size), K(total_size));
|
||||
header = NULL;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
freeze(seq);
|
||||
@ -161,7 +164,10 @@ public:
|
||||
} else if (OB_FAIL(ls.serialize(buf, total_size, pos))) {
|
||||
header = NULL;
|
||||
} else {
|
||||
req.fill_buffer(buf + pos, req_size, filled_size);
|
||||
if (OB_FAIL(req.fill_buffer(buf + pos, req_size, filled_size))) {
|
||||
RPC_LOG(WARN, "fill buffer failed", K(ret), K(req_size), K(total_size));
|
||||
header = NULL;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
freeze(seq);
|
||||
|
@ -380,6 +380,8 @@ int ObTransRpc::post_msg(const ObAddr &server, ObTxMsg &msg)
|
||||
msg.get_receiver(),
|
||||
msg))) {
|
||||
TRANS_LOG(WARN, "post msg failed", K(ret));
|
||||
} else {
|
||||
total_batch_msg_count_++;
|
||||
}
|
||||
} else if (OB_FAIL(post_(server, msg))) {
|
||||
TRANS_LOG(WARN, "post msg error", K(ret), K(server), K(msg));
|
||||
@ -440,6 +442,8 @@ int ObTransRpc::post_msg(const ObLSID &ls_id, ObTxMsg &msg)
|
||||
msg.get_receiver(),
|
||||
msg))) {
|
||||
TRANS_LOG(WARN, "post msg failed", K(ret));
|
||||
} else {
|
||||
total_batch_msg_count_++;
|
||||
}
|
||||
} else if (OB_FAIL(post_(server, msg))) {
|
||||
TRANS_LOG(WARN, "post msg error", K(ret), K(server), K(msg));
|
||||
@ -598,9 +602,9 @@ void ObTransRpc::statistics_()
|
||||
{
|
||||
const int64_t cur_ts = ObTimeUtility::current_time();
|
||||
if (cur_ts - last_stat_ts_ > STAT_INTERVAL) {
|
||||
TRANS_LOG(INFO, "rpc statistics", K_(total_trans_msg_count), K_(total_trans_resp_msg_count));
|
||||
TRANS_LOG(INFO, "rpc statistics", K_(total_trans_msg_count), K_(total_batch_msg_count));
|
||||
total_trans_msg_count_ = 0;
|
||||
total_trans_resp_msg_count_ = 0;
|
||||
total_batch_msg_count_ = 0;
|
||||
last_stat_ts_ = cur_ts;
|
||||
}
|
||||
}
|
||||
|
@ -396,7 +396,7 @@ public:
|
||||
rpc_proxy_(),
|
||||
batch_rpc_(NULL),
|
||||
total_trans_msg_count_(0),
|
||||
total_trans_resp_msg_count_(0),
|
||||
total_batch_msg_count_(0),
|
||||
last_stat_ts_(0) {}
|
||||
~ObTransRpc() { destroy(); }
|
||||
int init(ObTransService *trans_service,
|
||||
@ -448,9 +448,9 @@ private:
|
||||
obrpc::ObTxFreeRouteRPCCB<obrpc::OB_TX_FREE_ROUTE_CHECK_ALIVE> tx_free_route_ck_alive_cb_;
|
||||
obrpc::ObTxFreeRouteRPCCB<obrpc::OB_TX_FREE_ROUTE_CHECK_ALIVE_RESP> tx_free_route_ck_alive_resp_cb_;
|
||||
// statistic info
|
||||
int64_t total_trans_msg_count_;
|
||||
int64_t total_trans_resp_msg_count_;
|
||||
int64_t last_stat_ts_;
|
||||
int64_t total_trans_msg_count_ CACHE_ALIGNED;
|
||||
int64_t total_batch_msg_count_ CACHE_ALIGNED;
|
||||
int64_t last_stat_ts_ CACHE_ALIGNED;
|
||||
};
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user