terminate sync rpc when session killed

This commit is contained in:
liucc1997 2024-01-15 03:13:19 +00:00 committed by ob-robot
parent d14882657a
commit 0d64c0ba12
8 changed files with 183 additions and 46 deletions

View File

@ -14,6 +14,7 @@
#include "rpc/obrpc/ob_poc_rpc_server.h"
#include "rpc/obrpc/ob_rpc_proxy.h"
#include "rpc/obrpc/ob_net_keepalive.h"
#include "share/ob_errno.h"
extern "C" {
#include "rpc/pnio/r0/futex.h"
}
@ -63,8 +64,20 @@ int ObSyncRespCallback::handle_resp(int io_err, const char* buf, int64_t sz)
int ObSyncRespCallback::wait(const int64_t wait_timeout_us, const int64_t pcode, const int64_t req_sz)
{
ObWaitEventGuard wait_guard(ObWaitEventIds::SYNC_RPC, wait_timeout_us / 1000, pcode, req_sz);
const struct timespec ts = {1, 0};
bool has_terminated = false;
while(ATOMIC_LOAD(&cond_) == 0) {
rk_futex_wait(&cond_, 0, NULL);
if (!has_terminated && OB_ERR_SESSION_INTERRUPTED == THIS_WORKER.check_status()) {
RPC_LOG(INFO, "check session killed, will execute pn_terminate_pkt", K(gtid_), K(pkt_id_));
int err = 0;
if ((err = pn_terminate_pkt(gtid_, pkt_id_)) != 0) {
int tmp_ret = tranlate_to_ob_error(err);
RPC_LOG_RET(WARN, tmp_ret, "pn_terminate_pkt failed", K(err));
} else {
has_terminated = true;
}
}
rk_futex_wait(&cond_, 0, &ts);
}
return send_ret_;
}

View File

@ -28,7 +28,8 @@ namespace obrpc
class ObSyncRespCallback
{
public:
ObSyncRespCallback(ObRpcMemPool& pool): pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS){}
ObSyncRespCallback(ObRpcMemPool& pool)
: pkt_nio_cb_(NULL), pool_(pool), resp_(NULL), sz_(0), cond_(0), send_ret_(common::OB_SUCCESS), gtid_(0), pkt_id_(0){}
~ObSyncRespCallback() {}
void* alloc(int64_t sz) { return pool_.alloc(sz); }
int handle_resp(int io_err, const char* buf, int64_t sz);
@ -48,6 +49,9 @@ private:
int64_t sz_;
int cond_;
int send_ret_;
public:
uint64_t gtid_;
uint32_t pkt_id_;
};
typedef rpc::frame::ObReqTransport::AsyncCB UAsyncCB;
@ -73,6 +77,9 @@ private:
void* pkt_nio_cb_;
ObRpcMemPool& pool_;
UAsyncCB* ucb_;
public:
uint64_t gtid_;
uint32_t pkt_id_;
};
void init_ucb(ObRpcProxy& proxy, UAsyncCB* ucb, const common::ObAddr& addr, int64_t send_ts, int64_t payload_sz);
@ -138,27 +145,32 @@ public:
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(check_blacklist(addr))) {
RPC_LOG(WARN, "check_blacklist failed", K(ret));
} else if (0 != (sys_err = pn_send(
(pnio_group_id<<32) + thread_id,
obaddr2sockaddr(&sock_addr, addr),
} else {
const pn_pkt_t pkt = {
req,
req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode)),
start_ts + get_proxy_timeout(proxy),
static_cast<int16_t>(set.idx_of_pcode(pcode)),
ObSyncRespCallback::client_cb,
&cb))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else {
&cb
};
cb.gtid_ = (pnio_group_id<<32) + thread_id;
if (0 != (sys_err = pn_send((pnio_group_id<<32) + thread_id, obaddr2sockaddr(&sock_addr, addr), &pkt, &cb.pkt_id_))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
}
}
if (OB_SUCC(ret)) {
EVENT_INC(RPC_PACKET_OUT);
EVENT_ADD(RPC_PACKET_OUT_BYTES, req_sz);
if (OB_FAIL(cb.wait(get_proxy_timeout(proxy), pcode, req_sz))) {
RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode));
int64_t timeout = get_proxy_timeout(proxy);
if (OB_FAIL(cb.wait(timeout, pcode, req_sz))) {
RPC_LOG(WARN, "sync rpc execute fail", K(ret), K(addr), K(pcode), K(timeout));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
ret = common::OB_ERR_UNEXPECTED;
RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode));
RPC_LOG(WARN, "sync rpc execute success but resp is null", K(ret), K(addr), K(pcode), K(timeout));
} else if (OB_FAIL(rpc_decode_resp(resp, resp_sz, out, resp_pkt, rcode))) {
RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret));
RPC_LOG(WARN, "execute rpc fail", K(addr), K(pcode), K(ret), K(timeout));
}
}
}
@ -210,6 +222,7 @@ public:
} else {
char* req = NULL;
int64_t req_sz = 0;
uint32_t* pkt_id_ptr = NULL;
timeguard.click();
if (OB_FAIL(rpc_encode_req(proxy, *pool, pcode, args, opts, req, req_sz, NULL == ucb))) {
RPC_LOG(WARN, "rpc encode req fail", K(ret));
@ -224,20 +237,25 @@ public:
set_ucb_args(newcb, args);
init_ucb(proxy, cb->get_ucb(), addr, start_ts, req_sz);
}
cb->gtid_ = (pnio_group_id<<32) + thread_id;
pkt_id_ptr = &cb->pkt_id_;
}
timeguard.click();
if (OB_SUCC(ret)) {
sockaddr_in sock_addr;
const pn_pkt_t pkt = {
req,
req_sz,
start_ts + get_proxy_timeout(proxy),
static_cast<int16_t>(set.idx_of_pcode(pcode)),
ObAsyncRespCallback::client_cb,
cb
};
if (0 != (sys_err = pn_send(
(pnio_group_id<<32) + thread_id,
obaddr2sockaddr(&sock_addr, addr),
req,
req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode)),
start_ts + get_proxy_timeout(proxy),
ObAsyncRespCallback::client_cb,
cb)
)) {
&pkt,
pkt_id_ptr))) {
ret = translate_io_error(sys_err);
RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode));
} else {

View File

@ -72,17 +72,22 @@ int SSHandle<pcodeStruct>::get_more(typename pcodeStruct::Response &result)
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(ObPocClientStub::check_blacklist(dst_))) {
RPC_LOG(WARN, "check_blacklist failed", K(ret));
} else if (0 != (pn_err = pn_send(
(pnio_group_id<<32) + thread_id,
ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_),
} else {
const pn_pkt_t pkt = {
pnio_req,
pnio_req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode_)),
start_ts + proxy_.timeout(),
static_cast<int16_t>(set.idx_of_pcode(pcode_)),
ObSyncRespCallback::client_cb,
&cb))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
&cb
};
cb.gtid_ = (pnio_group_id<<32) + thread_id;
if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) {
RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {
@ -208,17 +213,22 @@ int SSHandle<pcodeStruct>::abort()
RPC_LOG(WARN, "rpc encode req fail", K(ret));
} else if(OB_FAIL(ObPocClientStub::check_blacklist(dst_))) {
RPC_LOG(WARN, "check_blacklist failed", K(ret));
} else if (0 != (pn_err = pn_send(
(pnio_group_id<<32) + thread_id,
ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_),
} else {
const pn_pkt_t pkt = {
pnio_req,
pnio_req_sz,
static_cast<int16_t>(set.idx_of_pcode(pcode_)),
start_ts + proxy_.timeout(),
static_cast<int16_t>(set.idx_of_pcode(pcode_)),
ObSyncRespCallback::client_cb,
&cb))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
&cb
};
cb.gtid_ = (pnio_group_id<<32) + thread_id;
if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) {
ret = ObPocClientStub::translate_io_error(pn_err);
RPC_LOG(WARN, "pnio post fail", K(pn_err));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(cb.wait(proxy_.timeout(), pcode_, pnio_req_sz))) {
RPC_LOG(WARN, "stream rpc execute fail", K(ret), K(dst_));
} else if (NULL == (resp = cb.get_resp(resp_sz))) {

View File

@ -278,6 +278,7 @@ typedef struct pn_client_req_t
easy_head_t head;
} pn_client_req_t;
typedef struct pn_client_slice_t
{
int64_t ref_;
@ -307,8 +308,10 @@ static void pn_pktc_resp_cb(pktc_cb_t* cb, const char* resp, int64_t sz)
cfifo_free(pn_cb);
}
static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, const char* req, int64_t req_sz, int16_t categ_id, int64_t expire_us, client_cb_t client_cb, void* arg)
static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, const pn_pkt_t* pkt)
{
const char* req = pkt->buf;
const int64_t req_sz = pkt->sz;
pn_client_req_t* pn_req = (typeof(pn_req))cfifo_alloc(&pn->client_req_alloc, sizeof(*pn_req) + req_sz);
if (unlikely(NULL == pn_req)) {
return NULL;
@ -320,22 +323,40 @@ static pktc_req_t* pn_create_pktc_req(pn_t* pn, uint64_t pkt_id, addr_t dest, co
}
pktc_cb_t* cb = &pn_cb->cb;
pktc_req_t* r = &pn_req->req;
pn_cb->client_cb = client_cb;
pn_cb->arg = arg;
pn_cb->client_cb = pkt->cb;
pn_cb->arg = pkt->arg;
cb->id = pkt_id;
cb->expire_us = expire_us;
cb->expire_us = pkt->expire_us;
cb->resp_cb = pn_pktc_resp_cb;
cb->errcode = PNIO_OK;
cb->req = r;
r->pkt_type = PN_NORMAL_PKT;
r->flush_cb = pn_pktc_flush_cb;
r->resp_cb = cb;
r->dest = dest;
r->categ_id = categ_id;
r->categ_id = pkt->categ_id;
dlink_init(&r->link);
eh_copy_msg(&r->msg, cb->id, req, req_sz);
return r;
}
static pktc_req_t* pn_create_cmd_req(pn_t* pn, int64_t cmd, uint64_t pkt_id)
{
pktc_req_t* r = NULL;
pn_client_cmd_req_t* pn_req = (typeof(pn_req))cfifo_alloc(&pn->client_req_alloc, sizeof(*pn_req));
if (likely(pn_req)) {
memset(pn_req, 0, sizeof(*pn_req));
r = &pn_req->req;
r->pkt_type = PN_CMD_PKT;
r->flush_cb = NULL;
r->resp_cb = NULL;
pn_req->cmd = cmd;
pn_req->arg = pkt_id;
eh_copy_msg(&r->msg, pkt_id, NULL, 0);
}
return r;
}
static uint32_t global_next_pkt_id RK_CACHE_ALIGNED;
static uint32_t gen_pkt_id()
{
@ -348,9 +369,15 @@ static pn_t* get_pn_for_send(pn_grp_t* pgrp, int tid)
return pgrp->pn_array[tid % pgrp->count];
}
PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg)
PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret)
{
int err = 0;
const char* buf = pkt->buf;
const int64_t sz = pkt->sz;
const int16_t categ_id = pkt->categ_id;
const int64_t expire_us = pkt->expire_us;
const void* arg = pkt->arg;
pn_grp_t* pgrp = locate_grp(gtid>>32);
pn_t* pn = get_pn_for_send(pgrp, gtid & 0xffffffff);
addr_t dest = {.ip=addr->sin_addr.s_addr, .port=htons(addr->sin_port), .tid=0};
@ -364,12 +391,13 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int
} else if (LOAD(&pn->is_stop_)) {
err = PNIO_STOPPED;
} else {
pktc_req_t* r = pn_create_pktc_req(pn, pkt_id, dest, buf, sz, categ_id, expire_us, cb, arg);
pktc_req_t* r = pn_create_pktc_req(pn, pkt_id, dest, pkt);
if (NULL == r) {
err = ENOMEM;
} else {
if (NULL != arg) {
*((void**)arg) = r;
*pkt_id_ret = pkt_id;
}
err = pktc_post(&pn->pktc, r);
}
@ -574,6 +602,24 @@ PN_API uint64_t pn_get_rxbytes(int grp_id) {
return bytes;
}
PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id) {
int err = 0;
pn_grp_t* pgrp = locate_grp(gtid>>32);
if (NULL == pgrp) {
err = EINVAL;
} else {
pn_t* pn = get_pn_for_send(pgrp, gtid & 0xffffffff);
pktc_req_t* r = pn_create_cmd_req(pn, PN_CMD_TERMINATE_PKT, pkt_id);
if (NULL == r) {
err = ENOMEM;
rk_warn("create cmd req failed, gtid=0x%lx, pkt_id=%u", gtid, pkt_id);
} else {
err = pktc_post(&pn->pktc, r);
}
}
return err;
}
int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid)
{
int ret = 0;

View File

@ -63,13 +63,23 @@ typedef struct pn_comm_t
PN_COMM;
} pn_comm_t;
typedef struct pn_pkt_t
{
char* buf;
int64_t sz;
int64_t expire_us;
int16_t categ_id;
client_cb_t cb;
void* arg;
} pn_pkt_t;
PN_API int64_t pn_set_keepalive_timeout(int64_t user_timeout);
PN_API int pn_listen(int port, serve_cb_t cb);
// if listen_id == -1, act as client only
// make sure grp != 0
PN_API int pn_provision(int listen_id, int grp, int thread_count);
// gid_tid = (gid<<8) | tid
PN_API int pn_send(uint64_t gid_tid, struct sockaddr_in* addr, const char* buf, int64_t sz, int16_t categ_id, int64_t expire_us, client_cb_t cb, void* arg);
PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret);
PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us);
PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr);
PN_API int pn_ratelimit(int grp_id, int64_t value);
@ -79,6 +89,7 @@ PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid);
PN_API void pn_stop(uint64_t gid);
PN_API void pn_wait(uint64_t gid);
PN_API int pn_get_fd(uint64_t req_id);
PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id);
extern int64_t pnio_keepalive_timeout;
pn_comm_t* get_current_pnio();
void pn_release(pn_comm_t* pn_comm);
@ -93,7 +104,13 @@ void pn_release(pn_comm_t* pn_comm);
#define PNIO_TIMEOUT_NOT_SENT_OUT (-54)
#define PNIO_DISCONNECT_NOT_SENT_OUT (-55)
#define PNIO_LISTEN_ERROR (-56)
#define PNIO_PKT_TERMINATE (-57)
enum {
PN_NORMAL_PKT = 0,
PN_CMD_PKT,
PN_CMD_TERMINATE_PKT
};
/*
// 启动listen线程和epool线程池, epoll线程池有10个线程
int listen_id = pn_listen(8042, cb);

View File

@ -28,6 +28,7 @@ struct pktc_cb_t {
};
struct pktc_req_t {
int64_t pkt_type;
struct pktc_sk_t* sk;
PNIO_DELAY_WARN(int64_t ctime_us);
pktc_flush_cb_func_t flush_cb;
@ -38,6 +39,14 @@ struct pktc_req_t {
str_t msg;
};
typedef struct pn_client_cmd_req_t
{
pktc_req_t req;
easy_head_t head;
int64_t cmd;
int64_t arg;
} pn_client_cmd_req_t;
extern int64_t pktc_init(pktc_t* io, eloop_t* ep, uint64_t dispatch_id);
extern int pktc_post(pktc_t* io, pktc_req_t* req);

View File

@ -117,9 +117,20 @@ static int pktc_handle_req_queue(pktc_t* io) {
int cnt = 0;
while(cnt < 128 && (l = sc_queue_pop(&io->req_queue))) {
pktc_req_t* req = structof(l, pktc_req_t, link);
PNIO_DELAY_WARN(delay_warn("pktc_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US));
pktc_post_io(io, req);
cnt++;
if (unlikely(PN_NORMAL_PKT != req->pkt_type)) {
// cmd pkt
pn_client_cmd_req_t* cmd_req = structof(req, pn_client_cmd_req_t, req);
if (PN_CMD_TERMINATE_PKT == cmd_req->cmd) {
// make rpc callback executed in advance
rk_info("hand cmd req, cmd=%ld, arg=%ld", cmd_req->cmd, cmd_req->arg);
pktc_resp_cb_on_terminate(io, (uint32_t)cmd_req->arg);
}
cfifo_free(cmd_req);
} else {
PNIO_DELAY_WARN(delay_warn("pktc_handle_req_queue", req->ctime_us, HANDLE_DELAY_WARN_US));
pktc_post_io(io, req);
cnt++;
}
}
return cnt == 0? EAGAIN: 0;
}

View File

@ -71,6 +71,19 @@ static void pktc_resp_cb_on_msg(pktc_t* io, pktc_msg_t* msg) {
}
}
static void pktc_resp_cb_on_terminate(pktc_t* io, uint32_t id) {
link_t* hlink = ihash_del(&io->cb_map, id);
if (hlink) {
pktc_cb_t* cb = structof(hlink, pktc_cb_t, hash_link);
dlink_delete(&cb->timer_dlink);
dlink_delete(&cb->sk_dlink);
cb->errcode = PNIO_PKT_TERMINATE;
pktc_do_cb_exception(io, cb);
} else {
rk_info("resp cb not found: packet_id=%u", id);
}
}
static int pktc_sk_handle_msg(pktc_sk_t* s, pktc_msg_t* m) {
pktc_t* io = structof(s->fty, pktc_t, sf);
pktc_resp_cb_on_msg(io, m);