diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index d286f49cbe..ec8ea4b311 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -78,33 +78,29 @@ private: ObRpcMemPool& pool_; }; -ObAsyncRespCallback* ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* ucb) +int ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* ucb, ObAsyncRespCallback*& pcb) { int ret = OB_SUCCESS; ObPocSPAlloc sp_alloc(pool); UAsyncCB* cb = NULL; - ObAsyncRespCallback* pcb = NULL; - if (NULL == (pcb = (ObAsyncRespCallback*)pool.alloc(sizeof(ObAsyncRespCallback)))) { + pcb = NULL; + if (NULL == ucb) { + // do nothing and not to allocate ObAsyncRespCallback object + } else if (NULL == (pcb = (ObAsyncRespCallback*)pool.alloc(sizeof(ObAsyncRespCallback)))) { ret = OB_ALLOCATE_MEMORY_FAILED; RPC_LOG(WARN, "alloc resp callback fail", K(ret)); + } else if (NULL == (cb = ucb->clone(sp_alloc))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + pcb = NULL; + RPC_LOG(WARN, "ucb.clone fail", K(ret)); } else { - if (NULL != ucb) { - if (NULL == (cb = ucb->clone(sp_alloc))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - pcb = NULL; - RPC_LOG(WARN, "ucb.clone fail", K(ret)); - } else { - cb->low_level_cb_ = pcb; - if (cb != ucb) { - cb->set_cloned(true); - } - new(pcb)ObAsyncRespCallback(pool, cb); - } - } else { - new(pcb)ObAsyncRespCallback(pool, NULL); + cb->low_level_cb_ = pcb; + if (cb != ucb) { + cb->set_cloned(true); } + new(pcb)ObAsyncRespCallback(pool, cb); } - return pcb; + return ret; } int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index b46887ad25..6098b9e845 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -56,15 +56,14 @@ class ObAsyncRespCallback public: ObAsyncRespCallback(ObRpcMemPool& pool, UAsyncCB* ucb): pkt_nio_cb_(NULL), pool_(pool), ucb_(ucb) {} ~ObAsyncRespCallback() {} - static ObAsyncRespCallback* create(ObRpcMemPool& pool, UAsyncCB* ucb); + // static ObAsyncRespCallback* create(ObRpcMemPool& pool, UAsyncCB* ucb); + static int create(ObRpcMemPool& pool, UAsyncCB* ucb, ObAsyncRespCallback*& ret_cb); UAsyncCB* get_ucb() { return ucb_; } int handle_resp(int io_err, const char* buf, int64_t sz); static int client_cb(void* arg, int io_error, const char* b, int64_t sz) { int ret = common::OB_SUCCESS; if (arg != NULL) { ret = ((ObAsyncRespCallback*)arg)->handle_resp(io_error, b, sz); - } else { - RPC_LOG(WARN, "async rpc callback is null, it is unexpected", KP(b), K(sz)); } return ret; } @@ -188,19 +187,19 @@ public: const int init_alloc_sz = 0; auto &set = obrpc::ObRpcPacketSet::instance(); const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode)); + ObAsyncRespCallback* cb = NULL; if (NULL == (pool = ObRpcMemPool::create(src_tenant_id, pcode_label, init_alloc_sz))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; } else { - ObAsyncRespCallback* cb = NULL; char* req = NULL; int64_t req_sz = 0; if (OB_FAIL(rpc_encode_req(proxy, *pool, pcode, args, opts, req, req_sz, NULL == ucb))) { RPC_LOG(WARN, "rpc encode req fail", K(ret)); } else if(OB_FAIL(check_blacklist(addr))) { RPC_LOG(WARN, "check_blacklist failed", K(addr)); - } else if (NULL == (cb = ObAsyncRespCallback::create(*pool, ucb))) { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - } else { + } else if (OB_FAIL(ObAsyncRespCallback::create(*pool, ucb, cb))) { + RPC_LOG(WARN, "create ObAsyncRespCallback failed", K(ucb)); + } else if (OB_NOT_NULL(cb)) { auto newcb = reinterpret_cast(cb->get_ucb()); if (newcb) { set_ucb_args(newcb, args); @@ -224,8 +223,11 @@ public: } } } - if (common::OB_SUCCESS != ret && NULL != pool) { - pool->destroy(); + if (NULL != pool) { + if (ret != OB_SUCCESS || cb == NULL) { + // if ucb is null, the ObAsyncRespCallback::create will return OB_SUCCESS and cb will set to null, in this case we should release pool in place + pool->destroy(); + } } return ret; }