pkt-nio release pktc request when exiting
This commit is contained in:
@ -305,6 +305,8 @@ int dispatch_to_ob_listener(int accept_fd) {
|
|||||||
int tranlate_to_ob_error(int err) {
|
int tranlate_to_ob_error(int err) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (PNIO_OK == err) {
|
if (PNIO_OK == err) {
|
||||||
|
} else if (PNIO_STOPPED == err) {
|
||||||
|
ret = OB_RPC_SEND_ERROR;
|
||||||
} else if (PNIO_LISTEN_ERROR == err) {
|
} else if (PNIO_LISTEN_ERROR == err) {
|
||||||
ret = OB_SERVER_LISTEN_ERROR;
|
ret = OB_SERVER_LISTEN_ERROR;
|
||||||
} else if (ENOMEM == err || -ENOMEM == err) {
|
} else if (ENOMEM == err || -ENOMEM == err) {
|
||||||
|
|||||||
43
deps/oblib/src/rpc/pnio/interface/group.c
vendored
43
deps/oblib/src/rpc/pnio/interface/group.c
vendored
@ -347,15 +347,21 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int
|
|||||||
if (addr->sin_addr.s_addr == 0 || htons(addr->sin_port) == 0) {
|
if (addr->sin_addr.s_addr == 0 || htons(addr->sin_port) == 0) {
|
||||||
err = -EINVAL;
|
err = -EINVAL;
|
||||||
rk_warn("invalid sin_addr: %x:%d", addr->sin_addr.s_addr, addr->sin_port);
|
rk_warn("invalid sin_addr: %x:%d", addr->sin_addr.s_addr, addr->sin_port);
|
||||||
}
|
} else if (expire_us < 0) {
|
||||||
pktc_req_t* r = pn_create_pktc_req(pn, gen_pkt_id(), dest, buf, sz, categ_id, expire_us, cb, arg);
|
err = -EINVAL;
|
||||||
if (NULL == r) {
|
rk_error("invalid rpc timeout: %ld, it might be that the up-layer rpc timeout is too large, categ_id=%d", expire_us, categ_id);
|
||||||
err = ENOMEM;
|
} else if (LOAD(&pn->is_stop_)) {
|
||||||
|
err = PNIO_STOPPED;
|
||||||
} else {
|
} else {
|
||||||
if (NULL != arg) {
|
pktc_req_t* r = pn_create_pktc_req(pn, gen_pkt_id(), dest, buf, sz, categ_id, expire_us, cb, arg);
|
||||||
*((void**)arg) = r;
|
if (NULL == r) {
|
||||||
|
err = ENOMEM;
|
||||||
|
} else {
|
||||||
|
if (NULL != arg) {
|
||||||
|
*((void**)arg) = r;
|
||||||
|
}
|
||||||
|
err = pktc_post(&pn->pktc, r);
|
||||||
}
|
}
|
||||||
err = pktc_post(&pn->pktc, r);
|
|
||||||
}
|
}
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
@ -378,6 +384,29 @@ PN_API void pn_wait(uint64_t gid)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void pn_release(pn_comm_t* pn_comm)
|
||||||
|
{
|
||||||
|
if (NULL == pn_comm) {
|
||||||
|
// do nothing
|
||||||
|
rk_warn("unexpected argument");
|
||||||
|
} else {
|
||||||
|
pn_t* pn = (typeof(pn))pn_comm;
|
||||||
|
// empty pktc->req_queue
|
||||||
|
link_t* l = NULL;
|
||||||
|
pktc_t* pktc = &pn->pktc;
|
||||||
|
while((l = sc_queue_pop(&pktc->req_queue))) {
|
||||||
|
pktc_req_t* req = structof(l, pktc_req_t, link);
|
||||||
|
pktc_post_io(pktc, req);
|
||||||
|
}
|
||||||
|
// destroy pktc socket
|
||||||
|
dlink_for(&pktc->sk_list, p) {
|
||||||
|
pktc_sk_t* s = structof(p, pktc_sk_t, list_link);
|
||||||
|
rk_info("sock destroy: sock=%p, connection=%s", s, T2S(sock_fd, s->fd));
|
||||||
|
sock_destroy((sock_t*)s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct pn_resp_ctx_t
|
typedef struct pn_resp_ctx_t
|
||||||
{
|
{
|
||||||
pn_t* pn;
|
pn_t* pn;
|
||||||
|
|||||||
2
deps/oblib/src/rpc/pnio/interface/group.h
vendored
2
deps/oblib/src/rpc/pnio/interface/group.h
vendored
@ -67,9 +67,11 @@ PN_API void pn_stop(uint64_t gid);
|
|||||||
PN_API void pn_wait(uint64_t gid);
|
PN_API void pn_wait(uint64_t gid);
|
||||||
extern int64_t pnio_keepalive_timeout;
|
extern int64_t pnio_keepalive_timeout;
|
||||||
pn_comm_t* get_current_pnio();
|
pn_comm_t* get_current_pnio();
|
||||||
|
void pn_release(pn_comm_t* pn_comm);
|
||||||
|
|
||||||
#define PNIO_OK 0
|
#define PNIO_OK 0
|
||||||
#define PNIO_ERROR (-1)
|
#define PNIO_ERROR (-1)
|
||||||
|
#define PNIO_STOPPED (-45)
|
||||||
#define PNIO_DISCONNECT (-46)
|
#define PNIO_DISCONNECT (-46)
|
||||||
#define PNIO_TIMEOUT (-47)
|
#define PNIO_TIMEOUT (-47)
|
||||||
#define PNIO_CONNECT_FAIL (-49)
|
#define PNIO_CONNECT_FAIL (-49)
|
||||||
|
|||||||
1
deps/oblib/src/rpc/pnio/io/eloop.c
vendored
1
deps/oblib/src/rpc/pnio/io/eloop.c
vendored
@ -129,5 +129,6 @@ int eloop_run(eloop_t* ep) {
|
|||||||
last_time = cur_time_us;
|
last_time = cur_time_us;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pn_release(pn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user