fix: tenant RPC_CTX_ID memory not released after tenant workers stopped

This commit is contained in:
liucc1997
2023-05-31 10:41:59 +00:00
committed by ob-robot
parent 938641501d
commit 78041964d2
2 changed files with 25 additions and 27 deletions

View File

@ -78,33 +78,29 @@ private:
ObRpcMemPool& pool_;
};
ObAsyncRespCallback* ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* ucb)
int ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* ucb, ObAsyncRespCallback*& pcb)
{
int ret = OB_SUCCESS;
ObPocSPAlloc sp_alloc(pool);
UAsyncCB* cb = NULL;
ObAsyncRespCallback* pcb = NULL;
if (NULL == (pcb = (ObAsyncRespCallback*)pool.alloc(sizeof(ObAsyncRespCallback)))) {
pcb = NULL;
if (NULL == ucb) {
// do nothing and not to allocate ObAsyncRespCallback object
} else if (NULL == (pcb = (ObAsyncRespCallback*)pool.alloc(sizeof(ObAsyncRespCallback)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
RPC_LOG(WARN, "alloc resp callback fail", K(ret));
} else if (NULL == (cb = ucb->clone(sp_alloc))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
pcb = NULL;
RPC_LOG(WARN, "ucb.clone fail", K(ret));
} else {
if (NULL != ucb) {
if (NULL == (cb = ucb->clone(sp_alloc))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
pcb = NULL;
RPC_LOG(WARN, "ucb.clone fail", K(ret));
} else {
cb->low_level_cb_ = pcb;
if (cb != ucb) {
cb->set_cloned(true);
}
new(pcb)ObAsyncRespCallback(pool, cb);
}
} else {
new(pcb)ObAsyncRespCallback(pool, NULL);
cb->low_level_cb_ = pcb;
if (cb != ucb) {
cb->set_cloned(true);
}
new(pcb)ObAsyncRespCallback(pool, cb);
}
return pcb;
return ret;
}
int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)

View File

@ -56,15 +56,14 @@ class ObAsyncRespCallback
public:
ObAsyncRespCallback(ObRpcMemPool& pool, UAsyncCB* ucb): pkt_nio_cb_(NULL), pool_(pool), ucb_(ucb) {}
~ObAsyncRespCallback() {}
static ObAsyncRespCallback* create(ObRpcMemPool& pool, UAsyncCB* ucb);
// static ObAsyncRespCallback* create(ObRpcMemPool& pool, UAsyncCB* ucb);
static int create(ObRpcMemPool& pool, UAsyncCB* ucb, ObAsyncRespCallback*& ret_cb);
UAsyncCB* get_ucb() { return ucb_; }
int handle_resp(int io_err, const char* buf, int64_t sz);
static int client_cb(void* arg, int io_error, const char* b, int64_t sz) {
int ret = common::OB_SUCCESS;
if (arg != NULL) {
ret = ((ObAsyncRespCallback*)arg)->handle_resp(io_error, b, sz);
} else {
RPC_LOG(WARN, "async rpc callback is null, it is unexpected", KP(b), K(sz));
}
return ret;
}
@ -188,19 +187,19 @@ public:
const int init_alloc_sz = 0;
auto &set = obrpc::ObRpcPacketSet::instance();
const char* pcode_label = set.name_of_idx(set.idx_of_pcode(pcode));
ObAsyncRespCallback* cb = NULL;
if (NULL == (pool = ObRpcMemPool::create(src_tenant_id, pcode_label, init_alloc_sz))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
} else {
ObAsyncRespCallback* cb = NULL;
char* req = NULL;
int64_t req_sz = 0;
if (OB_FAIL(rpc_encode_req(proxy, *pool, pcode, args, opts, req, req_sz, NULL == ucb))) {
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(check_blacklist(addr))) {
RPC_LOG(WARN, "check_blacklist failed", K(addr));
} else if (NULL == (cb = ObAsyncRespCallback::create(*pool, ucb))) {
ret = common::OB_ALLOCATE_MEMORY_FAILED;
} else {
} else if (OB_FAIL(ObAsyncRespCallback::create(*pool, ucb, cb))) {
RPC_LOG(WARN, "create ObAsyncRespCallback failed", K(ucb));
} else if (OB_NOT_NULL(cb)) {
auto newcb = reinterpret_cast<UCB*>(cb->get_ucb());
if (newcb) {
set_ucb_args(newcb, args);
@ -224,8 +223,11 @@ public:
}
}
}
if (common::OB_SUCCESS != ret && NULL != pool) {
pool->destroy();
if (NULL != pool) {
if (ret != OB_SUCCESS || cb == NULL) {
// if ucb is null, the ObAsyncRespCallback::create will return OB_SUCCESS and cb will set to null, in this case we should release pool in place
pool->destroy();
}
}
return ret;
}