From bbb1cd866f649de2520ad4608e5599872c15a1ec Mon Sep 17 00:00:00 2001 From: liucc1997 <1192520566@qq.com> Date: Wed, 21 Jun 2023 12:18:48 +0000 Subject: [PATCH] stop ussl-loop thread and pnio threads early --- deps/oblib/src/rpc/pnio/interface/group.c | 20 ++++++++++++++------ deps/oblib/src/rpc/pnio/interface/group.h | 1 + deps/ussl-hook/ussl-hook.c | 2 +- deps/ussl-hook/ussl-loop.c | 6 +++++- src/observer/ob_server.cpp | 20 ++++---------------- src/observer/ob_srv_network_frame.cpp | 9 +++++++++ src/observer/ob_srv_network_frame.h | 1 + 7 files changed, 35 insertions(+), 24 deletions(-) diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index daa511b9b..5122ea2ef 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -243,6 +243,7 @@ PN_API int pn_provision(int listen_id, int gid, int thread_count) } else if (0 != (err = ob_pthread_create(&pn->pd, NULL, pn_thread_func, pn))) { pn_destroy(pn); } else { + pn->has_stopped_ = false; pn_grp->pn_array[count++] = pn; } } @@ -369,18 +370,25 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int PN_API void pn_stop(uint64_t gid) { pn_grp_t *pgrp = locate_grp(gid); - for (int tid = 0; tid < pgrp->count; tid++) { - pn_t *pn = get_pn_for_send(pgrp, tid); - ATOMIC_STORE(&pn->is_stop_, true); + if (pgrp != NULL) { + for (int tid = 0; tid < pgrp->count; tid++) { + pn_t *pn = get_pn_for_send(pgrp, tid); + ATOMIC_STORE(&pn->is_stop_, true); + } } } PN_API void pn_wait(uint64_t gid) { pn_grp_t *pgrp = locate_grp(gid); - for (int tid = 0; tid < pgrp->count; tid++) { - pn_t *pn = get_pn_for_send(pgrp, tid); - pthread_join(pn->pd, NULL); + if (pgrp != NULL) { + for (int tid = 0; tid < pgrp->count; tid++) { + pn_t *pn = get_pn_for_send(pgrp, tid); + if (!pn->has_stopped_) { + pthread_join(pn->pd, NULL); + pn->has_stopped_ = true; + } + } } } diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index f16415d12..6d31c6e2d 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -39,6 +39,7 @@ typedef struct pn_grp_comm_t #define PN_COMM \ bool is_stop_; \ + bool has_stopped_; \ pthread_t pd; \ int accept_qfd; \ int gid; \ diff --git a/deps/ussl-hook/ussl-hook.c b/deps/ussl-hook/ussl-hook.c index 5f14c1abe..5e32bbc44 100644 --- a/deps/ussl-hook/ussl-hook.c +++ b/deps/ussl-hook/ussl-hook.c @@ -25,7 +25,7 @@ static __thread int ussl_server_ctx_id = -1; static uint64_t global_gid_arr[USSL_MAX_FD_NUM]; static int global_client_ctx_id_arr[USSL_MAX_FD_NUM]; static int global_send_negotiation_arr[USSL_MAX_FD_NUM]; -static int is_ussl_bg_thread_started = 0; +int is_ussl_bg_thread_started = 0; static __attribute__((constructor(102))) void init_global_array() { diff --git a/deps/ussl-hook/ussl-loop.c b/deps/ussl-hook/ussl-loop.c index 530405d03..a43fc5f40 100644 --- a/deps/ussl-hook/ussl-loop.c +++ b/deps/ussl-hook/ussl-loop.c @@ -100,9 +100,13 @@ int ussl_init_bg_thread() return ret; } +extern int is_ussl_bg_thread_started; void ussl_wait_bg_thread() { - pthread_join(ussl_bg_thread_id, NULL); + if (ATOMIC_LOAD(&is_ussl_bg_thread_started)) { + pthread_join(ussl_bg_thread_id, NULL); + ATOMIC_STORE(&is_ussl_bg_thread_started, 0); + } } void add_to_timeout_list(ussl_dlink_t *l) diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 8314ec5c5..de41769a2 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -1069,6 +1069,10 @@ int ObServer::stop() net_frame_.sql_nio_stop(); FLOG_INFO("sql nio stopped"); + FLOG_INFO("begin to stop rpc listen and io threads"); + net_frame_.rpc_stop(); + FLOG_INFO("rpc stopped"); + FLOG_INFO("begin to stop active session history task"); ObActiveSessHistTask::get_instance().stop(); FLOG_INFO("active session history task stopped"); @@ -1283,14 +1287,6 @@ int ObServer::stop() FLOG_INFO("net frame stopped"); } - FLOG_INFO("begin to stop ussl"); - ussl_stop(); - FLOG_INFO("stop ussl success"); - - FLOG_INFO("begin to stop global_poc_server"); - obrpc::global_poc_server.stop(); - FLOG_INFO("stop global_poc_server success"); - FLOG_INFO("begin to stop rootservice event history"); ROOTSERVICE_EVENT_INSTANCE.stop(); FLOG_INFO("rootservice event history stopped"); @@ -1534,14 +1530,6 @@ int ObServer::wait() FLOG_INFO("wait global election report timer success"); - FLOG_INFO("begin to wait ussl"); - ussl_wait(); - FLOG_INFO("wait ussl success"); - - FLOG_INFO("begin to wait global_poc_server"); - obrpc::global_poc_server.wait(); - FLOG_INFO("wait global_poc_server success"); - FLOG_INFO("begin to wait rootservice event history"); ROOTSERVICE_EVENT_INSTANCE.wait(); FLOG_INFO("wait rootservice event history success"); diff --git a/src/observer/ob_srv_network_frame.cpp b/src/observer/ob_srv_network_frame.cpp index 2854dc792..5d42346d2 100644 --- a/src/observer/ob_srv_network_frame.cpp +++ b/src/observer/ob_srv_network_frame.cpp @@ -515,6 +515,8 @@ void ObSrvNetworkFrame::wait() if (NULL != obmysql::global_sql_nio_server) { obmysql::global_sql_nio_server->wait(); } + obrpc::global_poc_server.wait(); + ussl_wait(); } int ObSrvNetworkFrame::stop() @@ -526,6 +528,7 @@ int ObSrvNetworkFrame::stop() if (OB_FAIL(net_.stop())) { LOG_WARN("stop easy net fail", K(ret)); } else { + rpc_stop(); ObNetKeepAlive::get_instance().stop(); } ingress_service_.stop(); @@ -570,6 +573,12 @@ void ObSrvNetworkFrame::sql_nio_stop() } } +void ObSrvNetworkFrame::rpc_stop() +{ + ussl_stop(); + obrpc::global_poc_server.stop(); +} + int ObSrvNetworkFrame::reload_rpc_auth_method() { int ret = OB_SUCCESS; diff --git a/src/observer/ob_srv_network_frame.h b/src/observer/ob_srv_network_frame.h index 05ad7f90f..9544a7f44 100644 --- a/src/observer/ob_srv_network_frame.h +++ b/src/observer/ob_srv_network_frame.h @@ -51,6 +51,7 @@ public: int batch_rpc_shutdown(); int unix_rpc_shutdown(); void sql_nio_stop(); + void rpc_stop(); void wait(); int stop();