[CP] add defence for rpc encode error and add statistics for async_cb execution

This commit is contained in:
496148326@qq.com
2023-10-31 18:13:09 +00:00
committed by ob-robot
parent a6b3ec5a21
commit e6a041419f
3 changed files with 28 additions and 4 deletions

View File

@ -106,6 +106,10 @@ int ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* ucb, ObAsyncRespCa
int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const int64_t start_time = ObTimeUtility::current_time();
int64_t after_decode_time = 0;
int64_t after_process_time = 0;
ObRpcPacketCode pcode = OB_INVALID_RPC_CODE;
ObRpcPacket* ret_pkt = NULL; ObRpcPacket* ret_pkt = NULL;
if (buf != NULL && sz > easy_head_size) { if (buf != NULL && sz > easy_head_size) {
sz = sz - easy_head_size; sz = sz - easy_head_size;
@ -118,6 +122,7 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
// do nothing // do nothing
} else { } else {
bool cb_cloned = ucb_->get_cloned(); bool cb_cloned = ucb_->get_cloned();
pcode = ucb_->get_pcode();
if (0 != io_err) { if (0 != io_err) {
ucb_->set_error(io_err); ucb_->set_error(io_err);
if (OB_SUCCESS != ucb_->on_error(io_err)) { if (OB_SUCCESS != ucb_->on_error(io_err)) {
@ -138,10 +143,13 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
ucb_->on_invalid(); ucb_->on_invalid();
RPC_LOG(WARN, "ucb.decode fail", K(ret)); RPC_LOG(WARN, "ucb.decode fail", K(ret));
} else { } else {
after_decode_time = ObTimeUtility::current_time();
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
pcode = ret_pkt->get_pcode();
if (OB_SUCCESS != (tmp_ret = ucb_->process())) { if (OB_SUCCESS != (tmp_ret = ucb_->process())) {
RPC_LOG(WARN, "ucb.process fail", K(tmp_ret)); RPC_LOG(WARN, "ucb.process fail", K(tmp_ret));
} }
after_process_time = ObTimeUtility::current_time();
} }
if (cb_cloned) { if (cb_cloned) {
ucb_->~AsyncCB(); ucb_->~AsyncCB();
@ -149,6 +157,15 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
} }
pool_.destroy(); pool_.destroy();
ObCurTraceId::reset(); ObCurTraceId::reset();
const int64_t cur_time = ObTimeUtility::current_time();
const int64_t total_time = cur_time - start_time;
const int64_t decode_time = after_decode_time - start_time;
const int64_t process_time = after_process_time - after_decode_time;
const int64_t session_destroy_time = cur_time - after_process_time;
if (total_time > OB_EASY_HANDLER_COST_TIME) {
RPC_LOG(WARN, "async_cb handler cost too much time", K(total_time), K(decode_time),
K(process_time), K(session_destroy_time), K(ret), K(pcode));
}
return ret; return ret;
} }

View File

@ -38,7 +38,7 @@ int64_t calc_extra_payload_size()
return payload; return payload;
} }
int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t pos) int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t &pos)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!g_runtime_enabled) { if (!g_runtime_enabled) {

View File

@ -24,7 +24,7 @@ namespace obrpc
extern int64_t get_max_rpc_packet_size(); extern int64_t get_max_rpc_packet_size();
class ObRpcProxy; class ObRpcProxy;
int64_t calc_extra_payload_size(); int64_t calc_extra_payload_size();
int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t pos); int fill_extra_payload(ObRpcPacket& pkt, char* buf, int64_t len, int64_t &pos);
int init_packet(ObRpcProxy& proxy, ObRpcPacket& pkt, ObRpcPacketCode pcode, const ObRpcOpts &opts, int init_packet(ObRpcProxy& proxy, ObRpcPacket& pkt, ObRpcPacketCode pcode, const ObRpcOpts &opts,
const bool unneed_response); const bool unneed_response);
common::ObCompressorType get_proxy_compressor_type(ObRpcProxy& proxy); common::ObCompressorType get_proxy_compressor_type(ObRpcProxy& proxy);
@ -46,7 +46,9 @@ template <typename T>
int ret = common::OB_SUCCESS; int ret = common::OB_SUCCESS;
ObRpcPacket pkt; ObRpcPacket pkt;
const int64_t header_sz = pkt.get_header_size(); const int64_t header_sz = pkt.get_header_size();
int64_t payload_sz = calc_extra_payload_size() + common::serialization::encoded_length(args); int64_t extra_payload_size = calc_extra_payload_size();
int64_t args_len = common::serialization::encoded_length(args);
int64_t payload_sz = extra_payload_size + args_len;
const int64_t reserve_bytes_for_pnio = 0; const int64_t reserve_bytes_for_pnio = 0;
char* header_buf = (char*)pool.alloc(reserve_bytes_for_pnio + header_sz + payload_sz) + reserve_bytes_for_pnio; char* header_buf = (char*)pool.alloc(reserve_bytes_for_pnio + header_sz + payload_sz) + reserve_bytes_for_pnio;
char* payload_buf = header_buf + header_sz; char* payload_buf = header_buf + header_sz;
@ -62,8 +64,13 @@ template <typename T>
} else if (OB_FAIL(common::serialization::encode( } else if (OB_FAIL(common::serialization::encode(
payload_buf, payload_sz, pos, args))) { payload_buf, payload_sz, pos, args))) {
RPC_OBRPC_LOG(WARN, "serialize argument fail", K(pos), K(payload_sz), K(ret)); RPC_OBRPC_LOG(WARN, "serialize argument fail", K(pos), K(payload_sz), K(ret));
} else if (OB_UNLIKELY(args_len < pos)) {
ret = OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "arg encoded length greater than arg length", K(ret), K(payload_sz),
K(args_len), K(extra_payload_size), K(pos), K(pcode));
} else if (OB_FAIL(fill_extra_payload(pkt, payload_buf, payload_sz, pos))) { } else if (OB_FAIL(fill_extra_payload(pkt, payload_buf, payload_sz, pos))) {
RPC_OBRPC_LOG(WARN, "fill extra payload fail", K(ret), K(pos), K(payload_sz)); RPC_OBRPC_LOG(WARN, "fill extra payload fail", K(ret), K(pos), K(payload_sz), K(args_len),
K(extra_payload_size), K(pcode));
} else { } else {
const common::ObCompressorType &compressor_type = get_proxy_compressor_type(proxy); const common::ObCompressorType &compressor_type = get_proxy_compressor_type(proxy);
bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type); bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type);