diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index 053a8b240..b4cce64fc 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -87,6 +87,9 @@ ObAsyncRespCallback* ObAsyncRespCallback::create(ObRpcMemPool& pool, UAsyncCB* u RPC_LOG(WARN, "ucb.clone fail", K(ret)); } else { cb->low_level_cb_ = pcb; + if (cb != ucb) { + cb->set_cloned(true); + } } } new(pcb)ObAsyncRespCallback(pool, cb); @@ -107,23 +110,29 @@ int ObAsyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) } if (ucb_ == NULL) { // do nothing - } else if (0 != io_err) { - ucb_->set_error(io_err); - if (OB_SUCCESS != ucb_->on_error(io_err)) { - ucb_->on_timeout(); - } - } else if (NULL == buf) { - ucb_->on_timeout(); - } else if (OB_FAIL(rpc_decode_ob_packet(pool_, buf, sz, ret_pkt))) { - ucb_->on_invalid(); - RPC_LOG(WARN, "rpc_decode_ob_packet fail", K(ret)); - } else if (OB_FAIL(ucb_->decode(ret_pkt))) { - ucb_->on_invalid(); - RPC_LOG(WARN, "ucb.decode fail", K(ret)); } else { - int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = ucb_->process())) { - RPC_LOG(WARN, "ucb.process fail", K(tmp_ret)); + bool cb_cloned = ucb_->get_cloned(); + if (0 != io_err) { + ucb_->set_error(io_err); + if (OB_SUCCESS != ucb_->on_error(io_err)) { + ucb_->on_timeout(); + } + } else if (NULL == buf) { + ucb_->on_timeout(); + } else if (OB_FAIL(rpc_decode_ob_packet(pool_, buf, sz, ret_pkt))) { + ucb_->on_invalid(); + RPC_LOG(WARN, "rpc_decode_ob_packet fail", K(ret)); + } else if (OB_FAIL(ucb_->decode(ret_pkt))) { + ucb_->on_invalid(); + RPC_LOG(WARN, "ucb.decode fail", K(ret)); + } else { + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = ucb_->process())) { + RPC_LOG(WARN, "ucb.process fail", K(tmp_ret)); + } + } + if (cb_cloned) { + ucb_->~AsyncCB(); } } pool_.destroy(); diff --git a/deps/oblib/src/rpc/pnio/io/write_queue.c b/deps/oblib/src/rpc/pnio/io/write_queue.c index d88048ed2..f28fe8500 100644 --- a/deps/oblib/src/rpc/pnio/io/write_queue.c +++ b/deps/oblib/src/rpc/pnio/io/write_queue.c @@ -92,7 +92,7 @@ int wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) { int err = 0; int64_t wbytes = 0; err = sk_flush_blist((sock_t*)s, &wq->queue.head, wq->pos, &wbytes); - if (0 == err) { + if (0 == err && wbytes > 0) { *old_head = wq_consume(wq, wbytes); } return err;