destroy pkt-nio server when obcdc exiting

This commit is contained in:
liucc1997
2024-05-08 14:16:11 +00:00
committed by ob-robot
parent 052bb6b8c5
commit 5d011b4917
5 changed files with 39 additions and 10 deletions

View File

@ -314,6 +314,8 @@ void ObPocRpcServer::stop()
for (uint64_t gid = 1; gid < END_GROUP; gid++) {
pn_stop(gid);
}
has_start_ = false;
start_as_client_ = false;
}
void ObPocRpcServer::wait()
@ -323,6 +325,12 @@ void ObPocRpcServer::wait()
}
}
void ObPocRpcServer::destroy()
{
stop();
wait();
}
int ObPocRpcServer::update_tcp_keepalive_params(int64_t user_timeout) {
int ret = OB_SUCCESS;
if (pn_set_keepalive_timeout(user_timeout) != user_timeout) {

View File

@ -71,6 +71,7 @@ public:
int start_net_client(int net_thread_count);
void stop();
void wait();
void destroy();
bool has_start() {return has_start_;}
int update_tcp_keepalive_params(int64_t user_timeout);
int update_server_standby_fetch_log_bandwidth_limit(int64_t value);

View File

@ -255,12 +255,32 @@ PN_API int pn_provision(int listen_id, int gid, int thread_count)
int err = 0;
int count = 0;
pn_grp_t* pn_grp = ensure_grp(gid);
ef(pn_grp == NULL);
if (thread_count > MAX_PN_PER_GRP) {
if (pn_grp == NULL) {
err = -ENOMEM;
rk_error("ensure group failed, gid=%d", gid);
} else if (thread_count > MAX_PN_PER_GRP) {
err = -EINVAL;
rk_error("thread count is too large, thread_count=%d, MAX_PN_PER_GRP=%d", thread_count, MAX_PN_PER_GRP);
}
count = pn_grp->count;
// restart stoped threads
for (int i = 0; 0 == err && i < count; i++) {
pn_t* pn = pn_grp->pn_array[i];
if (!pn->is_stop_) {
// pn thread is running, do nothing
} else if (pn->pd) {
err = PNIO_ERROR;
rk_error("pn is stopped but the thread is still running, gid=%d, i=%d", gid, i)
} else {
pn->is_stop_ = false;
if (0 != (err = ob_pthread_create(&pn->pd, pn_thread_func, pn))) {
pn->is_stop_ = true;
rk_error("pthread_create failed, gid=%d, i=%d", gid, i);
} else {
rk_info("pn pthread created, gid=%d, i=%d", gid, i);
}
}
}
while(0 == err && count < thread_count) {
pn_t* pn = pn_create(listen_id, gid, count);
if (NULL == pn) {
@ -268,14 +288,15 @@ PN_API int pn_provision(int listen_id, int gid, int thread_count)
} else if (0 != (err = ob_pthread_create(&pn->pd, pn_thread_func, pn))) {
pn_destroy(pn);
} else {
pn->has_stopped_ = false;
pn_grp->pn_array[count++] = pn;
}
}
pn_grp->count = count;
return pn_grp->count;
el();
return -1;
int ret = -1;
if (0 == err) {
pn_grp->count = count;
ret = pn_grp->count;
}
return ret;
}
typedef struct pn_pktc_cb_t
@ -444,10 +465,9 @@ PN_API void pn_wait(uint64_t gid)
if (pgrp != NULL) {
for (int tid = 0; tid < pgrp->count; tid++) {
pn_t *pn = get_pn_for_send(pgrp, tid);
if (!pn->has_stopped_) {
if (NULL != pn->pd) {
ob_pthread_join(pn->pd);
pn->pd = NULL;
pn->has_stopped_ = true;
}
}
}

View File

@ -51,7 +51,6 @@ typedef struct pn_grp_comm_t
#define PN_COMM \
bool is_stop_; \
bool has_stopped_; \
void *pd; \
int accept_qfd; \
int gid; \