fix SeArray memory leak when deserializing rcode_.warnings_

This commit is contained in:
liucc1997
2023-05-17 13:46:26 +00:00
committed by ob-robot
parent d07260f033
commit 29be7ce8ce
2 changed files with 26 additions and 17 deletions

View File

@ -87,6 +87,9 @@ ObAsyncRespCallback* ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* u
RPC_LOG(WARN, "ucb.clone fail", K(ret)); RPC_LOG(WARN, "ucb.clone fail", K(ret));
} else { } else {
cb->low_level_cb_ = pcb; cb->low_level_cb_ = pcb;
if (cb != ucb) {
cb->set_cloned(true);
}
} }
} }
new(pcb)ObAsyncRespCallback(pool, cb); new(pcb)ObAsyncRespCallback(pool, cb);
@ -107,23 +110,29 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
} }
if (ucb_ == NULL) { if (ucb_ == NULL) {
// do nothing // do nothing
} else if (0 != io_err) {
ucb_->set_error(io_err);
if (OB_SUCCESS != ucb_->on_error(io_err)) {
ucb_->on_timeout();
}
} else if (NULL == buf) {
ucb_->on_timeout();
} else if (OB_FAIL(rpc_decode_ob_packet(pool_, buf, sz, ret_pkt))) {
ucb_->on_invalid();
RPC_LOG(WARN, "rpc_decode_ob_packet fail", K(ret));
} else if (OB_FAIL(ucb_->decode(ret_pkt))) {
ucb_->on_invalid();
RPC_LOG(WARN, "ucb.decode fail", K(ret));
} else { } else {
int tmp_ret = OB_SUCCESS; bool cb_cloned = ucb_->get_cloned();
if (OB_SUCCESS != (tmp_ret = ucb_->process())) { if (0 != io_err) {
RPC_LOG(WARN, "ucb.process fail", K(tmp_ret)); ucb_->set_error(io_err);
if (OB_SUCCESS != ucb_->on_error(io_err)) {
ucb_->on_timeout();
}
} else if (NULL == buf) {
ucb_->on_timeout();
} else if (OB_FAIL(rpc_decode_ob_packet(pool_, buf, sz, ret_pkt))) {
ucb_->on_invalid();
RPC_LOG(WARN, "rpc_decode_ob_packet fail", K(ret));
} else if (OB_FAIL(ucb_->decode(ret_pkt))) {
ucb_->on_invalid();
RPC_LOG(WARN, "ucb.decode fail", K(ret));
} else {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = ucb_->process())) {
RPC_LOG(WARN, "ucb.process fail", K(tmp_ret));
}
}
if (cb_cloned) {
ucb_->~AsyncCB();
} }
} }
pool_.destroy(); pool_.destroy();

View File

@ -92,7 +92,7 @@ int wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) {
int err = 0; int err = 0;
int64_t wbytes = 0; int64_t wbytes = 0;
err = sk_flush_blist((sock_t*)s, &wq->queue.head, wq->pos, &wbytes); err = sk_flush_blist((sock_t*)s, &wq->queue.head, wq->pos, &wbytes);
if (0 == err) { if (0 == err && wbytes > 0) {
*old_head = wq_consume(wq, wbytes); *old_head = wq_consume(wq, wbytes);
} }
return err; return err;