diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp index a3080c252..da1c07d6a 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.cpp @@ -14,6 +14,7 @@ #include "rpc/obrpc/ob_poc_rpc_server.h" #include "rpc/obrpc/ob_rpc_proxy.h" #include "rpc/obrpc/ob_net_keepalive.h" +#include "share/ob_errno.h" extern "C" { #include "rpc/pnio/r0/futex.h" } @@ -63,8 +64,20 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz) int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz) { ObWaitEventGuard wait_guard(ObWaitEventIds::SYNC_RPC, wait_timeout_us / 1000, pcode, req_sz); + const struct timespec ts = {1, 0}; + bool has_terminated = false; while(ATOMIC_LOAD(&cond_) == 0) { - rk_futex_wait(&cond_, 0, NULL); + if (!has_terminated && OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status()) { + RPC_LOG(INFO, "check session killed, will execute pn_terminate_pkt", K(gtid_), K(pkt_id_)); + int err = 0; + if ((err = pn_terminate_pkt(gtid_, pkt_id_)) != 0) { + int tmp_ret = tranlate_to_ob_error(err); + RPC_LOG_RET(WARN, tmp_ret, "pn_terminate_pkt failed", K(err)); + } else { + has_terminated = true; + } + } + rk_futex_wait(&cond_, 0, &ts); } return send_ret_; } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index 99311acb2..c7016a98c 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -28,7 +28,8 @@ namespace obrpc class ObSyncRespCallback { public: - ObSyncRespCallback(ObRpcMemPool& pool): pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS){} + ObSyncRespCallback(ObRpcMemPool& pool) + : pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS), gtid_(0), pkt_id_(0){} ~ObSyncRespCallback() {} void* alloc(int64_t sz) { return pool_.alloc(sz); } int handle_resp(int io_err, const char* buf, int64_t sz); @@ -48,6 +49,9 @@ private: int64_t sz_; int cond_; int send_ret_; +public: + uint64_t gtid_; + uint32_t pkt_id_; }; typedef rpc::frame::ObReqTransport::AsyncCB UAsyncCB; @@ -73,6 +77,9 @@ private: void* pkt_nio_cb_; ObRpcMemPool& pool_; UAsyncCB* ucb_; +public: + uint64_t gtid_; + uint32_t pkt_id_; }; void init_ucb(ObRpcProxy& proxy, UAsyncCB* ucb, const common::ObAddr& addr, int64_t send_ts, int64_t payload_sz); @@ -138,27 +145,32 @@ public: RPC_LOG(WARN, "rpc encode req fail", K(ret)); } else if(OB_FAIL(check_blacklist(addr))) { RPC_LOG(WARN, "check_blacklist failed", K(ret)); - } else if (0 != (sys_err = pn_send( - (pnio_group_id<<32) + thread_id, - obaddr2sockaddr(&sock_addr, addr), + } else { + const pn_pkt_t pkt = { req, req_sz, - static_cast(set.idx_of_pcode(pcode)), start_ts + get_proxy_timeout(proxy), + static_cast(set.idx_of_pcode(pcode)), ObSyncRespCallback::client_cb, - &cb))) { - ret = translate_io_error(sys_err); - RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); - } else { + &cb + }; + cb.gtid_ = (pnio_group_id<<32) + thread_id; + if (0 != (sys_err = pn_send((pnio_group_id<<32) + thread_id, obaddr2sockaddr(&sock_addr, addr), &pkt, &cb.pkt_id_))) { + ret = translate_io_error(sys_err); + RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); + } + } + if (OB_SUCC(ret)) { EVENT_INC(RPC_PACKET_OUT); EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz); - if (OB_FAIL(cb.wait(get_proxy_timeout(proxy), pcode, req_sz))) { - RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode)); + int64_t timeout = get_proxy_timeout(proxy); + if (OB_FAIL(cb.wait(timeout, pcode, req_sz))) { + RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode), K(timeout)); } else if (NULL == (resp = cb.get_resp(resp_sz))) { ret = common::OB_ERR_UNEXPECTED; - RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode)); + RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode), K(timeout)); } else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) { - RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret)); + RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret), K(timeout)); } } } @@ -210,6 +222,7 @@ public: } else { char* req = NULL; int64_t req_sz = 0; + uint32_t* pkt_id_ptr = NULL; timeguard.click(); if (OB_FAIL(rpc_encode_req(proxy, *pool, pcode, args, opts, req, req_sz, NULL == ucb))) { RPC_LOG(WARN, "rpc encode req fail", K(ret)); @@ -224,20 +237,25 @@ public: set_ucb_args(newcb, args); init_ucb(proxy, cb->get_ucb(), addr, start_ts, req_sz); } + cb->gtid_ = (pnio_group_id<<32) + thread_id; + pkt_id_ptr = &cb->pkt_id_; } timeguard.click(); if (OB_SUCC(ret)) { sockaddr_in sock_addr; + const pn_pkt_t pkt = { + req, + req_sz, + start_ts + get_proxy_timeout(proxy), + static_cast(set.idx_of_pcode(pcode)), + ObAsyncRespCallback::client_cb, + cb + }; if (0 != (sys_err = pn_send( (pnio_group_id<<32) + thread_id, obaddr2sockaddr(&sock_addr, addr), - req, - req_sz, - static_cast(set.idx_of_pcode(pcode)), - start_ts + get_proxy_timeout(proxy), - ObAsyncRespCallback::client_cb, - cb) - )) { + &pkt, + pkt_id_ptr))) { ret = translate_io_error(sys_err); RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); } else { diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp index b3d0dd71b..68bd07c48 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp @@ -72,17 +72,22 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) RPC_LOG(WARN, "rpc encode req fail", K(ret)); } else if(OB_FAIL(ObPocClientStub::check_blacklist(dst_))) { RPC_LOG(WARN, "check_blacklist failed", K(ret)); - } else if (0 != (pn_err = pn_send( - (pnio_group_id<<32) + thread_id, - ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), + } else { + const pn_pkt_t pkt = { pnio_req, pnio_req_sz, - static_cast(set.idx_of_pcode(pcode_)), start_ts + proxy_.timeout(), + static_cast(set.idx_of_pcode(pcode_)), ObSyncRespCallback::client_cb, - &cb))) { - ret = ObPocClientStub::translate_io_error(pn_err); - RPC_LOG(WARN, "pnio post fail", K(pn_err)); + &cb + }; + cb.gtid_ = (pnio_group_id<<32) + thread_id; + if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) { + ret = ObPocClientStub::translate_io_error(pn_err); + RPC_LOG(WARN, "pnio post fail", K(pn_err)); + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) { RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_)); } else if (NULL == (resp = cb.get_resp(resp_sz))) { @@ -208,17 +213,22 @@ int SSHandle::abort() RPC_LOG(WARN, "rpc encode req fail", K(ret)); } else if(OB_FAIL(ObPocClientStub::check_blacklist(dst_))) { RPC_LOG(WARN, "check_blacklist failed", K(ret)); - } else if (0 != (pn_err = pn_send( - (pnio_group_id<<32) + thread_id, - ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), + } else { + const pn_pkt_t pkt = { pnio_req, pnio_req_sz, - static_cast(set.idx_of_pcode(pcode_)), start_ts + proxy_.timeout(), + static_cast(set.idx_of_pcode(pcode_)), ObSyncRespCallback::client_cb, - &cb))) { - ret = ObPocClientStub::translate_io_error(pn_err); - RPC_LOG(WARN, "pnio post fail", K(pn_err)); + &cb + }; + cb.gtid_ = (pnio_group_id<<32) + thread_id; + if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) { + ret = ObPocClientStub::translate_io_error(pn_err); + RPC_LOG(WARN, "pnio post fail", K(pn_err)); + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) { RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_)); } else if (NULL == (resp = cb.get_resp(resp_sz))) { diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 5b05c459f..309343969 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -278,6 +278,7 @@ typedef struct pn_client_req_t easy_head_t head; } pn_client_req_t; + typedef struct pn_client_slice_t { int64_t ref_; @@ -307,8 +308,10 @@ static void pn_pktc_resp_cb(pktc_cb_t* cb, const char* resp, int64_t sz) cfifo_free(pn_cb); } -static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, const char* req, int64_t req_sz, int16_t categ_id, int64_t expire_us, client_cb_t client_cb, void* arg) +static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, const pn_pkt_t* pkt) { + const char* req = pkt->buf; + const int64_t req_sz = pkt->sz; pn_client_req_t* pn_req = (typeof(pn_req))cfifo_alloc(&pn->client_req_alloc, sizeof(*pn_req) + req_sz); if (unlikely(NULL == pn_req)) { return NULL; @@ -320,22 +323,40 @@ static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, co } pktc_cb_t* cb = &pn_cb->cb; pktc_req_t* r = &pn_req->req; - pn_cb->client_cb = client_cb; - pn_cb->arg = arg; + pn_cb->client_cb = pkt->cb; + pn_cb->arg = pkt->arg; cb->id = pkt_id; - cb->expire_us = expire_us; + cb->expire_us = pkt->expire_us; cb->resp_cb = pn_pktc_resp_cb; cb->errcode = PNIO_OK; cb->req = r; + r->pkt_type = PN_NORMAL_PKT; r->flush_cb = pn_pktc_flush_cb; r->resp_cb = cb; r->dest = dest; - r->categ_id = categ_id; + r->categ_id = pkt->categ_id; dlink_init(&r->link); eh_copy_msg(&r->msg, cb->id, req, req_sz); return r; } +static pktc_req_t* pn_create_cmd_req(pn_t* pn, int64_t cmd, uint64_t pkt_id) +{ + pktc_req_t* r = NULL; + pn_client_cmd_req_t* pn_req = (typeof(pn_req))cfifo_alloc(&pn->client_req_alloc, sizeof(*pn_req)); + if (likely(pn_req)) { + memset(pn_req, 0, sizeof(*pn_req)); + r = &pn_req->req; + r->pkt_type = PN_CMD_PKT; + r->flush_cb = NULL; + r->resp_cb = NULL; + pn_req->cmd = cmd; + pn_req->arg = pkt_id; + eh_copy_msg(&r->msg, pkt_id, NULL, 0); + } + return r; +} + static uint32_t global_next_pkt_id RK_CACHE_ALIGNED; static uint32_t gen_pkt_id() { @@ -348,9 +369,15 @@ static pn_t* get_pn_for_send(pn_grp_t* pgrp, int tid) return pgrp->pn_array[tid % pgrp->count]; } -PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg) +PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret) { int err = 0; + const char* buf = pkt->buf; + const int64_t sz = pkt->sz; + const int16_t categ_id = pkt->categ_id; + const int64_t expire_us = pkt->expire_us; + const void* arg = pkt->arg; + pn_grp_t* pgrp = locate_grp(gtid>>32); pn_t* pn = get_pn_for_send(pgrp, gtid & 0xffffffff); addr_t dest = {.ip=addr->sin_addr.s_addr, .port=htons(addr->sin_port), .tid=0}; @@ -364,12 +391,13 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int } else if (LOAD(&pn->is_stop_)) { err = PNIO_STOPPED; } else { - pktc_req_t* r = pn_create_pktc_req(pn, pkt_id, dest, buf, sz, categ_id, expire_us, cb, arg); + pktc_req_t* r = pn_create_pktc_req(pn, pkt_id, dest, pkt); if (NULL == r) { err = ENOMEM; } else { if (NULL != arg) { *((void**)arg) = r; + *pkt_id_ret = pkt_id; } err = pktc_post(&pn->pktc, r); } @@ -574,6 +602,24 @@ PN_API uint64_t pn_get_rxbytes(int grp_id) { return bytes; } +PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id) { + int err = 0; + pn_grp_t* pgrp = locate_grp(gtid>>32); + if (NULL == pgrp) { + err = EINVAL; + } else { + pn_t* pn = get_pn_for_send(pgrp, gtid & 0xffffffff); + pktc_req_t* r = pn_create_cmd_req(pn, PN_CMD_TERMINATE_PKT, pkt_id); + if (NULL == r) { + err = ENOMEM; + rk_warn("create cmd req failed, gtid=0x%lx, pkt_id=%u", gtid, pkt_id); + } else { + err = pktc_post(&pn->pktc, r); + } + } + return err; +} + int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid) { int ret = 0; diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index c207b5e0a..039470d91 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -63,13 +63,23 @@ typedef struct pn_comm_t PN_COMM; } pn_comm_t; +typedef struct pn_pkt_t +{ + char* buf; + int64_t sz; + int64_t expire_us; + int16_t categ_id; + client_cb_t cb; + void* arg; +} pn_pkt_t; + PN_API int64_t pn_set_keepalive_timeout(int64_t user_timeout); PN_API int pn_listen(int port, serve_cb_t cb); // if listen_id == -1, act as client only // make sure grp != 0 PN_API int pn_provision(int listen_id, int grp, int thread_count); // gid_tid = (gid<<8) | tid -PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg); +PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret); PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us); PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr); PN_API int pn_ratelimit(int grp_id, int64_t value); @@ -79,6 +89,7 @@ PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid); PN_API void pn_stop(uint64_t gid); PN_API void pn_wait(uint64_t gid); PN_API int pn_get_fd(uint64_t req_id); +PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id); extern int64_t pnio_keepalive_timeout; pn_comm_t* get_current_pnio(); void pn_release(pn_comm_t* pn_comm); @@ -93,7 +104,13 @@ void pn_release(pn_comm_t* pn_comm); #define PNIO_TIMEOUT_NOT_SENT_OUT (-54) #define PNIO_DISCONNECT_NOT_SENT_OUT (-55) #define PNIO_LISTEN_ERROR (-56) +#define PNIO_PKT_TERMINATE (-57) +enum { + PN_NORMAL_PKT = 0, + PN_CMD_PKT, + PN_CMD_TERMINATE_PKT +}; /* // 启动listen线程和epool线程池, epoll线程池有10个线程 int listen_id = pn_listen(8042, cb); diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.h b/deps/oblib/src/rpc/pnio/nio/packet_client.h index 4165e4a17..0259a84d6 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_client.h +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.h @@ -28,6 +28,7 @@ struct pktc_cb_t { }; struct pktc_req_t { + int64_t pkt_type; struct pktc_sk_t* sk; PNIO_DELAY_WARN(int64_t ctime_us); pktc_flush_cb_func_t flush_cb; @@ -38,6 +39,14 @@ struct pktc_req_t { str_t msg; }; +typedef struct pn_client_cmd_req_t +{ + pktc_req_t req; + easy_head_t head; + int64_t cmd; + int64_t arg; +} pn_client_cmd_req_t; + extern int64_t pktc_init(pktc_t* io, eloop_t* ep, uint64_t dispatch_id); extern int pktc_post(pktc_t* io, pktc_req_t* req); diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_post.h b/deps/oblib/src/rpc/pnio/nio/pktc_post.h index 7ec13fa57..9a3d16edb 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_post.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_post.h @@ -117,9 +117,20 @@ static int pktc_handle_req_queue(pktc_t* io) { int cnt = 0; while(cnt < 128 && (l = sc_queue_pop(&io->req_queue))) { pktc_req_t* req = structof(l, pktc_req_t, link); - PNIO_DELAY_WARN(delay_warn("pktc_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US)); - pktc_post_io(io, req); - cnt++; + if (unlikely(PN_NORMAL_PKT != req->pkt_type)) { + // cmd pkt + pn_client_cmd_req_t* cmd_req = structof(req, pn_client_cmd_req_t, req); + if (PN_CMD_TERMINATE_PKT == cmd_req->cmd) { + // make rpc callback executed in advance + rk_info("hand cmd req, cmd=%ld, arg=%ld", cmd_req->cmd, cmd_req->arg); + pktc_resp_cb_on_terminate(io, (uint32_t)cmd_req->arg); + } + cfifo_free(cmd_req); + } else { + PNIO_DELAY_WARN(delay_warn("pktc_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US)); + pktc_post_io(io, req); + cnt++; + } } return cnt == 0? EAGAIN: 0; } diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h index d0be65ce9..4854ab558 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h @@ -71,6 +71,19 @@ static void pktc_resp_cb_on_msg(pktc_t* io, pktc_msg_t* msg) { } } +static void pktc_resp_cb_on_terminate(pktc_t* io, uint32_t id) { + link_t* hlink = ihash_del(&io->cb_map, id); + if (hlink) { + pktc_cb_t* cb = structof(hlink, pktc_cb_t, hash_link); + dlink_delete(&cb->timer_dlink); + dlink_delete(&cb->sk_dlink); + cb->errcode = PNIO_PKT_TERMINATE; + pktc_do_cb_exception(io, cb); + } else { + rk_info("resp cb not found: packet_id=%u", id); + } +} + static int pktc_sk_handle_msg(pktc_sk_t* s, pktc_msg_t* m) { pktc_t* io = structof(s->fty, pktc_t, sf); pktc_resp_cb_on_msg(io, m);