diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index dc52110f8..d01cb6ce0 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -33,9 +33,11 @@ common::ObCompressorType get_proxy_compressor_type(ObRpcProxy& proxy) { int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) { if (PNIO_OK != io_err) { - if (PNIO_TIMEOUT == io_err) { + if (PNIO_TIMEOUT == io_err || PNIO_DISCONNECT == io_err || PNIO_PKT_TERMINATE == io_err) { + // these pnio error means not sure rpc was successfully sent send_ret_ = OB_TIMEOUT; } else { + // OB_RPC_SEND_ERROR means the rpc must not have been sent out send_ret_ = OB_RPC_SEND_ERROR; RPC_LOG_RET(WARN, send_ret_, "pnio error", KP(buf), K(sz), K(io_err)); } diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index 1323f56f7..20f76fe84 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -110,6 +110,7 @@ void pn_print_diag_info(pn_comm_t* pn_comm); #define PNIO_DISCONNECT_NOT_SENT_OUT (-55) #define PNIO_LISTEN_ERROR (-56) #define PNIO_PKT_TERMINATE (-57) +#define PNIO_PKT_TERMINATE_NOT_SENT_OUT (-58) enum { PN_NORMAL_PKT = 0, diff --git a/deps/oblib/src/rpc/pnio/io/write_queue.c b/deps/oblib/src/rpc/pnio/io/write_queue.c index 5158860ef..492a75fbf 100644 --- a/deps/oblib/src/rpc/pnio/io/write_queue.c +++ b/deps/oblib/src/rpc/pnio/io/write_queue.c @@ -87,8 +87,8 @@ inline void wq_push(write_queue_t* wq, dlink_t* l) { inline int wq_delete(write_queue_t* wq, dlink_t* l) { int err = PNIO_OK; - if (dqueue_top(&wq->queue) == l) { - // not to delete the first req of write_queue + if (dqueue_top(&wq->queue) == l && wq->pos > 0) { + // req has been sending, it cannot be deleted err = PNIO_ERROR; } else if (l == l->prev) { // req hasn't been inserted into flush_list diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h index 40bd69819..52af7394c 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h @@ -26,6 +26,8 @@ static void pktc_do_cb_exception(pktc_t* io, pktc_cb_t* cb) { cb->errcode = PNIO_DISCONNECT_NOT_SENT_OUT; } else if (cb->errcode == PNIO_TIMEOUT) { cb->errcode = PNIO_TIMEOUT_NOT_SENT_OUT; + } else if (cb->errcode == PNIO_PKT_TERMINATE){ + cb->errcode = PNIO_PKT_TERMINATE_NOT_SENT_OUT; } pktc_flush_cb(io, req); } diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h index ea328d3bf..fcf5b5ff7 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h @@ -84,8 +84,8 @@ static void pktc_sk_delete(pktc_sf_t* sf, pktc_sk_t* s) { pktc_t* io = structof(sf, pktc_t, sf); rk_info("sk_destroy: s=%p io=%p", s, io); pktc_sk_destroy(sf, s); - pktc_write_queue_on_sk_destroy(io, s); pktc_resp_cb_on_sk_destroy(io, s); + pktc_write_queue_on_sk_destroy(io, s); ib_destroy(&s->ib); dlink_delete(&s->rl_ready_link); pktc_sk_free(s);