pkt-nio: release response packet when rpc timeout reached
This commit is contained in:
@ -71,7 +71,8 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
|
||||
ret = common::OB_ALLOCATE_MEMORY_FAILED;
|
||||
RPC_LOG(WARN, "pool allocate memory failed", K(tenant_id), K(pcode_label));
|
||||
} else {
|
||||
ctx = new(temp)ObPocServerHandleContext(*pool, resp_id);
|
||||
int64_t resp_expired_abs_us = ObTimeUtility::current_time() + tmp_pkt.get_timeout();
|
||||
ctx = new(temp)ObPocServerHandleContext(*pool, resp_id, resp_expired_abs_us);
|
||||
ctx->set_peer_unsafe();
|
||||
req = new(ctx + 1)ObRequest(ObRequest::OB_RPC, ObRequest::TRANSPORT_PROTO_POC);
|
||||
ObRpcPacket* pkt = (ObRpcPacket*)pool->alloc(sizeof(ObRpcPacket) + alloc_payload_sz);
|
||||
@ -120,7 +121,7 @@ void ObPocServerHandleContext::resp(ObRpcPacket* pkt)
|
||||
buf = NULL;
|
||||
sz = 0;
|
||||
}
|
||||
if ((sys_err = pn_resp(resp_id_, buf, sz)) != 0) {
|
||||
if ((sys_err = pn_resp(resp_id_, buf, sz, resp_expired_abs_us_)) != 0) {
|
||||
RPC_LOG(WARN, "pn_resp fail", K(resp_id_), K(sys_err));
|
||||
}
|
||||
}
|
||||
@ -163,7 +164,7 @@ int serve_cb(int grp, const char* b, int64_t sz, uint64_t resp_id)
|
||||
}
|
||||
if (OB_SUCCESS != tmp_ret) {
|
||||
int sys_err = 0;
|
||||
if ((sys_err = pn_resp(resp_id, NULL, 0)) != 0) {
|
||||
if ((sys_err = pn_resp(resp_id, NULL, 0, OB_INVALID_TIMESTAMP)) != 0) {
|
||||
RPC_LOG(WARN, "pn_resp fail", K(resp_id), K(sys_err));
|
||||
}
|
||||
}
|
||||
|
||||
7
deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h
vendored
7
deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h
vendored
@ -28,8 +28,8 @@ public:
|
||||
enum {
|
||||
OBCG_ELECTION = 2
|
||||
}; // same as src/share/resource_manager/ob_group_list.h
|
||||
ObPocServerHandleContext( ObRpcMemPool& pool, uint64_t resp_id):
|
||||
pool_(pool), resp_id_(resp_id), peer_()
|
||||
ObPocServerHandleContext( ObRpcMemPool& pool, uint64_t resp_id, int64_t resp_expired_abs_us):
|
||||
pool_(pool), resp_id_(resp_id), resp_expired_abs_us_(resp_expired_abs_us), peer_()
|
||||
{}
|
||||
~ObPocServerHandleContext() {
|
||||
destroy();
|
||||
@ -40,9 +40,12 @@ public:
|
||||
ObAddr get_peer();
|
||||
void set_peer_unsafe(); // This function can only be called from the pnio thread.
|
||||
void* alloc(int64_t sz) { return pool_.alloc(sz); }
|
||||
void set_resp_expired_time(int64_t ts) { resp_expired_abs_us_ = ts; }
|
||||
int64_t get_resp_expired_time() { return resp_expired_abs_us_; }
|
||||
private:
|
||||
ObRpcMemPool& pool_;
|
||||
uint64_t resp_id_;
|
||||
int64_t resp_expired_abs_us_;
|
||||
ObAddr peer_;
|
||||
};
|
||||
|
||||
|
||||
4
deps/oblib/src/rpc/pnio/interface/group.c
vendored
4
deps/oblib/src/rpc/pnio/interface/group.c
vendored
@ -481,7 +481,7 @@ static void pn_pkts_flush_cb_error_func(pkts_req_t* req)
|
||||
fifo_free(ctx);
|
||||
}
|
||||
|
||||
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz)
|
||||
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us)
|
||||
{
|
||||
pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
|
||||
#ifdef PERF_MODE
|
||||
@ -501,6 +501,7 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz)
|
||||
r->flush_cb = pn_pkts_flush_cb_func;
|
||||
r->sock_id = ctx->sock_id;
|
||||
r->categ_id = 0;
|
||||
r->expire_us = resp_expired_abs_us;
|
||||
eh_copy_msg(&r->msg, ctx->pkt_id, buf, sz);
|
||||
} else {
|
||||
r = (typeof(r))(ctx->reserve);
|
||||
@ -508,6 +509,7 @@ PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz)
|
||||
r->flush_cb = pn_pkts_flush_cb_error_func;
|
||||
r->sock_id = ctx->sock_id;
|
||||
r->categ_id = 0;
|
||||
r->expire_us = resp_expired_abs_us;
|
||||
}
|
||||
pkts_t* pkts = &ctx->pn->pkts;
|
||||
return pkts_resp(pkts, r);
|
||||
|
||||
2
deps/oblib/src/rpc/pnio/interface/group.h
vendored
2
deps/oblib/src/rpc/pnio/interface/group.h
vendored
@ -57,7 +57,7 @@ PN_API int pn_listen(int port, serve_cb_t cb);
|
||||
PN_API int pn_provision(int listen_id, int grp, int thread_count);
|
||||
// gid_tid = (gid<<8) | tid
|
||||
PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg);
|
||||
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz);
|
||||
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us);
|
||||
PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr);
|
||||
PN_API int pn_ratelimit(int grp_id, int64_t value);
|
||||
PN_API int64_t pn_get_ratelimit(int grp_id);
|
||||
|
||||
2
deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h
vendored
2
deps/oblib/src/rpc/pnio/nio/nio-tpl-ns.h
vendored
@ -18,6 +18,7 @@
|
||||
#define my_sk_flush tns(_sk_flush)
|
||||
#define my_write_queue_on_sk_destroy tns(_write_queue_on_sk_destroy)
|
||||
#define my_sk_consume tns(_sk_consume)
|
||||
#define my_wq_flush tns(_wq_flush)
|
||||
#else
|
||||
#undef __ns__
|
||||
#undef tns
|
||||
@ -38,4 +39,5 @@
|
||||
#undef my_sk_flush
|
||||
#undef my_write_queue_on_sk_destroy
|
||||
#undef my_sk_consume
|
||||
#undef my_wq_flush
|
||||
#endif
|
||||
|
||||
5
deps/oblib/src/rpc/pnio/nio/packet_client.c
vendored
5
deps/oblib/src/rpc/pnio/nio/packet_client.c
vendored
@ -10,10 +10,15 @@ static int pktc_sk_read(void** b, pktc_sk_t* s, int64_t sz, int64_t* read_bytes)
|
||||
}
|
||||
|
||||
static void pktc_flush_cb(pktc_t* io, pktc_req_t* req) {
|
||||
unused(io);
|
||||
PNIO_DELAY_WARN(delay_warn("pktc_flush_cb", req->ctime_us, FLUSH_DELAY_WARN_US));
|
||||
req->flush_cb(req);
|
||||
}
|
||||
|
||||
static int pktc_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) {
|
||||
return wq_flush(s, wq, old_head);
|
||||
}
|
||||
|
||||
#include "pktc_resp.h"
|
||||
|
||||
#define tns(x) pktc ## x
|
||||
|
||||
2
deps/oblib/src/rpc/pnio/nio/packet_client.h
vendored
2
deps/oblib/src/rpc/pnio/nio/packet_client.h
vendored
@ -21,7 +21,7 @@ struct pktc_req_t {
|
||||
pktc_flush_cb_func_t flush_cb;
|
||||
pktc_cb_t* resp_cb;
|
||||
addr_t dest;
|
||||
int64_t categ_id;
|
||||
int64_t categ_id; // ATTENTION! Cannot add new structure field from categ_id!
|
||||
dlink_t link;
|
||||
str_t msg;
|
||||
};
|
||||
|
||||
17
deps/oblib/src/rpc/pnio/nio/packet_server.c
vendored
17
deps/oblib/src/rpc/pnio/nio/packet_server.c
vendored
@ -25,6 +25,23 @@ static int pkts_sk_handle_msg(pkts_sk_t* s, pkts_msg_t* msg) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int pkts_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) {
|
||||
// delete response req that has reached expired time
|
||||
if (PNIO_REACH_TIME_INTERVAL(10*1000)) {
|
||||
int64_t cur_time = rk_get_us();
|
||||
dlink_for(&wq->queue.head, p) {
|
||||
pkts_req_t* req = structof(p, pkts_req_t, link);
|
||||
if (req->expire_us > 0 && cur_time >= req->expire_us) {
|
||||
if (PNIO_OK == wq_delete(wq, p)) {
|
||||
rk_warn("rpc resp is expired, expire_us=%ld, sock_id=%ld", req->expire_us, req->sock_id);
|
||||
pkts_flush_cb(NULL, req);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return wq_flush(s, wq, old_head);
|
||||
}
|
||||
|
||||
#define tns(x) pkts ## x
|
||||
#include "nio-tpl-ns.h"
|
||||
#include "write_queue.t.h"
|
||||
|
||||
3
deps/oblib/src/rpc/pnio/nio/packet_server.h
vendored
3
deps/oblib/src/rpc/pnio/nio/packet_server.h
vendored
@ -15,7 +15,8 @@ typedef struct pkts_req_t {
|
||||
int errcode;
|
||||
pkts_flush_cb_func_t flush_cb;
|
||||
uint64_t sock_id;
|
||||
int64_t categ_id;
|
||||
int64_t expire_us;
|
||||
int64_t categ_id; // ATTENTION! Cannot add new structure field from categ_id!
|
||||
dlink_t link;
|
||||
str_t msg;
|
||||
} pkts_req_t;
|
||||
|
||||
2
deps/oblib/src/rpc/pnio/nio/write_queue.t.h
vendored
2
deps/oblib/src/rpc/pnio/nio/write_queue.t.h
vendored
@ -19,7 +19,7 @@ static void my_flush_cb_after_flush(my_t* io, my_req_t* r) {
|
||||
|
||||
static int my_sk_do_flush(my_sk_t* s, int64_t* remain) {
|
||||
dlink_t* h = NULL;
|
||||
int err = wq_flush((sock_t*)s, &s->wq, &h);
|
||||
int err = my_wq_flush((sock_t*)s, &s->wq, &h);
|
||||
my_t* io = structof(s->fty, my_t, sf);
|
||||
if (0 == err && NULL != h) {
|
||||
dlink_t* stop = dqueue_top(&s->wq.queue);
|
||||
|
||||
Reference in New Issue
Block a user