diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index ced4d0bcb1..8a2f83f7ec 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -71,7 +71,8 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s ret = common::OB_ALLOCATE_MEMORY_FAILED; RPC_LOG(WARN, "pool allocate memory failed", K(tenant_id), K(pcode_label)); } else { - ctx = new(temp)ObPocServerHandleContext(*pool, resp_id); + int64_t resp_expired_abs_us = ObTimeUtility::current_time() + tmp_pkt.get_timeout(); + ctx = new(temp)ObPocServerHandleContext(*pool, resp_id, resp_expired_abs_us); ctx->set_peer_unsafe(); req = new(ctx + 1)ObRequest(ObRequest::OB_RPC, ObRequest::TRANSPORT_PROTO_POC); ObRpcPacket* pkt = (ObRpcPacket*)pool->alloc(sizeof(ObRpcPacket) + alloc_payload_sz); @@ -120,7 +121,7 @@ void ObPocServerHandleContext::resp(ObRpcPacket* pkt) buf = NULL; sz = 0; } - if ((sys_err = pn_resp(resp_id_, buf, sz)) != 0) { + if ((sys_err = pn_resp(resp_id_, buf, sz, resp_expired_abs_us_)) != 0) { RPC_LOG(WARN, "pn_resp fail", K(resp_id_), K(sys_err)); } } @@ -163,7 +164,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id) } if (OB_SUCCESS != tmp_ret) { int sys_err = 0; - if ((sys_err = pn_resp(resp_id, NULL, 0)) != 0) { + if ((sys_err = pn_resp(resp_id, NULL, 0, OB_INVALID_TIMESTAMP)) != 0) { RPC_LOG(WARN, "pn_resp fail", K(resp_id), K(sys_err)); } } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index 75165d1805..1cbe14420e 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -28,8 +28,8 @@ public: enum { OBCG_ELECTION = 2 }; // same as src/share/resource_manager/ob_group_list.h - ObPocServerHandleContext( ObRpcMemPool& pool, uint64_t resp_id): - pool_(pool), resp_id_(resp_id), peer_() + ObPocServerHandleContext( ObRpcMemPool& pool, uint64_t resp_id, int64_t resp_expired_abs_us): + pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_() {} ~ObPocServerHandleContext() { destroy(); @@ -40,9 +40,12 @@ public: ObAddr get_peer(); void set_peer_unsafe(); // This function can only be called from the pnio thread. void* alloc(int64_t sz) { return pool_.alloc(sz); } + void set_resp_expired_time(int64_t ts) { resp_expired_abs_us_ = ts; } + int64_t get_resp_expired_time() { return resp_expired_abs_us_; } private: ObRpcMemPool& pool_; uint64_t resp_id_; + int64_t resp_expired_abs_us_; ObAddr peer_; }; diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index b0c7b25ea1..ea986228e3 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -481,7 +481,7 @@ static void pn_pkts_flush_cb_error_func(pkts_req_t* req) fifo_free(ctx); } -PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz) +PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us) { pn_resp_ctx_t* ctx = (typeof(ctx))req_id; #ifdef PERF_MODE @@ -501,6 +501,7 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz) r->flush_cb = pn_pkts_flush_cb_func; r->sock_id = ctx->sock_id; r->categ_id = 0; + r->expire_us = resp_expired_abs_us; eh_copy_msg(&r->msg, ctx->pkt_id, buf, sz); } else { r = (typeof(r))(ctx->reserve); @@ -508,6 +509,7 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz) r->flush_cb = pn_pkts_flush_cb_error_func; r->sock_id = ctx->sock_id; r->categ_id = 0; + r->expire_us = resp_expired_abs_us; } pkts_t* pkts = &ctx->pn->pkts; return pkts_resp(pkts, r); diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index 3dd4f2d3c3..b55ee9a617 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -57,7 +57,7 @@ PN_API int pn_listen(int port, serve_cb_t cb); PN_API int pn_provision(int listen_id, int grp, int thread_count); // gid_tid = (gid<<8) | tid PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg); -PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz); +PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us); PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr); PN_API int pn_ratelimit(int grp_id, int64_t value); PN_API int64_t pn_get_ratelimit(int grp_id); diff --git a/deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h b/deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h index e1b27375a2..a79f7dc948 100644 --- a/deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h +++ b/deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h @@ -18,6 +18,7 @@ #define my_sk_flush tns(_sk_flush) #define my_write_queue_on_sk_destroy tns(_write_queue_on_sk_destroy) #define my_sk_consume tns(_sk_consume) +#define my_wq_flush tns(_wq_flush) #else #undef __ns__ #undef tns @@ -38,4 +39,5 @@ #undef my_sk_flush #undef my_write_queue_on_sk_destroy #undef my_sk_consume +#undef my_wq_flush #endif diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.c b/deps/oblib/src/rpc/pnio/nio/packet_client.c index ac04174d98..53c76e91a6 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_client.c +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.c @@ -10,10 +10,15 @@ static int pktc_sk_read(void** b, pktc_sk_t* s, int64_t sz, int64_t* read_bytes) } static void pktc_flush_cb(pktc_t* io, pktc_req_t* req) { + unused(io); PNIO_DELAY_WARN(delay_warn("pktc_flush_cb", req->ctime_us, FLUSH_DELAY_WARN_US)); req->flush_cb(req); } +static int pktc_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) { + return wq_flush(s, wq, old_head); +} + #include "pktc_resp.h" #define tns(x) pktc ## x diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.h b/deps/oblib/src/rpc/pnio/nio/packet_client.h index 956c97f9b6..b9741ddf1d 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_client.h +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.h @@ -21,7 +21,7 @@ struct pktc_req_t { pktc_flush_cb_func_t flush_cb; pktc_cb_t* resp_cb; addr_t dest; - int64_t categ_id; + int64_t categ_id; // ATTENTION! Cannot add new structure field from categ_id! dlink_t link; str_t msg; }; diff --git a/deps/oblib/src/rpc/pnio/nio/packet_server.c b/deps/oblib/src/rpc/pnio/nio/packet_server.c index a367b82b8d..112b6cc087 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_server.c +++ b/deps/oblib/src/rpc/pnio/nio/packet_server.c @@ -25,6 +25,23 @@ static int pkts_sk_handle_msg(pkts_sk_t* s, pkts_msg_t* msg) { return ret; } +static int pkts_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) { + // delete response req that has reached expired time + if (PNIO_REACH_TIME_INTERVAL(10*1000)) { + int64_t cur_time = rk_get_us(); + dlink_for(&wq->queue.head, p) { + pkts_req_t* req = structof(p, pkts_req_t, link); + if (req->expire_us > 0 && cur_time >= req->expire_us) { + if (PNIO_OK == wq_delete(wq, p)) { + rk_warn("rpc resp is expired, expire_us=%ld, sock_id=%ld", req->expire_us, req->sock_id); + pkts_flush_cb(NULL, req); + } + } + } + } + return wq_flush(s, wq, old_head); +} + #define tns(x) pkts ## x #include "nio-tpl-ns.h" #include "write_queue.t.h" diff --git a/deps/oblib/src/rpc/pnio/nio/packet_server.h b/deps/oblib/src/rpc/pnio/nio/packet_server.h index 444dc97e2c..8fa223dd0a 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_server.h +++ b/deps/oblib/src/rpc/pnio/nio/packet_server.h @@ -15,7 +15,8 @@ typedef struct pkts_req_t { int errcode; pkts_flush_cb_func_t flush_cb; uint64_t sock_id; - int64_t categ_id; + int64_t expire_us; + int64_t categ_id; // ATTENTION! Cannot add new structure field from categ_id! dlink_t link; str_t msg; } pkts_req_t; diff --git a/deps/oblib/src/rpc/pnio/nio/write_queue.t.h b/deps/oblib/src/rpc/pnio/nio/write_queue.t.h index 2b4e69611b..714c083ecf 100644 --- a/deps/oblib/src/rpc/pnio/nio/write_queue.t.h +++ b/deps/oblib/src/rpc/pnio/nio/write_queue.t.h @@ -19,7 +19,7 @@ static void my_flush_cb_after_flush(my_t* io, my_req_t* r) { static int my_sk_do_flush(my_sk_t* s, int64_t* remain) { dlink_t* h = NULL; - int err = wq_flush((sock_t*)s, &s->wq, &h); + int err = my_wq_flush((sock_t*)s, &s->wq, &h); my_t* io = structof(s->fty, my_t, sf); if (0 == err && NULL != h) { dlink_t* stop = dqueue_top(&s->wq.queue);