# Patch overview (review annotations placed ahead of the first diff header;
# none of this text belongs to a hunk). The line breaks of the patch below
# have been collapsed to spaces.
#
# Purpose, as far as the hunks show: give the pnio RPC threads and the SQL NIO
# sockets an explicit stop/wait path during observer shutdown, and print the
# threads that still exist before and after observer.destroy().
#
# deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
#   - close_all_fd() becomes close_all(). The removed code deleted lfd_ from
#     epfd_, closed it, and tore each socket down inline (on_disconnect(),
#     sess->destroy(), do_close()).
#   - The new code calls prepare_destroy(s) for every ObSqlSock on all_list_,
#     then repeats handle_write_req_queue(), handle_close_req_queue() and
#     handle_pending_destroy_list() until all_list_ is empty.
#   - ObSqlNio::run() calls close_all() when has_set_stop() is true.
#   NOTE(review): the drain loop has no sleep or timeout; it presumably relies
#   on the three handlers unlinking every socket from all_list_ - confirm.
#   NOTE(review): lfd_ is no longer closed in this function; confirm it is
#   released elsewhere.
#
# deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.{cpp,h}
#   - Adds the END_GROUP enumerator after RATELIMIT_PNIO_GROUP.
#   - stop() (previously an empty inline body) calls pn_stop(gid) and the new
#     wait() calls pn_wait(gid), each for gid = 1 .. END_GROUP - 1.
#
# deps/oblib/src/rpc/pnio/interface/group.{c,h}
#   - PN_COMM gains a `bool is_stop_` member; pn_create() sets it to false.
#   - pn_stop(gid) stores true into is_stop_ (ATOMIC_STORE) for tid 0 ..
#     count - 1 of the group; pn_wait(gid) calls pthread_join() on each pn->pd.
#   NOTE(review): both functions dereference the results of locate_grp() and
#   get_pn_for_send() without a NULL check; whether those can return NULL is
#   not visible in this patch - confirm.
#
# deps/oblib/src/rpc/pnio/io/eloop.c
#   - eloop_run() loops while !ATOMIC_LOAD(&pn->is_stop_) instead of
#     while(true); the get_current_pnio() call moves from inside the loop to
#     before it.
#   NOTE(review): the loop body still tests `NULL != pn`, but the new loop
#   condition dereferences pn unconditionally - confirm pn cannot be NULL here.
#
# src/observer/main.cpp
#   - Adds print_all_thread(desc): iterates /proc/self/task, skips entries
#     whose name starts with '.', reads /proc/self/task/<tid>/comm, strips a
#     trailing newline and prints the tid and name through MPRINT.
#   - main() calls it with "BEFORE_DESTORY" ahead of observer.destroy() and
#     with "AFTER_DESTORY" just before returning.
#   NOTE(review): when fopen() returns NULL the code logs but still passes the
#   NULL FILE* to fgets() and fclose().
#   NOTE(review): closedir(dir) sits after the if/else, so it also runs when
#   opendir() returned NULL.
#   NOTE(review): the return value of fgets() is ignored, so `name` is read by
#   strlen() uninitialised if the read fails.
#   NOTE(review): sprintf() into path[256] is unbounded; snprintf() would be
#   safer.
#
# src/observer/ob_server.cpp
#   - ObServer::stop() calls obrpc::global_poc_server.stop() before setting
#     has_stopped_; ObServer::wait() calls obrpc::global_poc_server.wait()
#     before returning. Both calls are bracketed by FLOG_INFO messages.
#
diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 48990e9b47..9698830ced 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -735,21 +735,18 @@ public: tcp_keepintvl_ = tcp_keepintvl; tcp_keepcnt_ = tcp_keepcnt; } - void close_all_fd() { - if (lfd_ > 0) { - IGNORE_RETURN epoll_ctl(epfd_, EPOLL_CTL_DEL, lfd_, NULL); - close(lfd_); - lfd_ = -1; - } + void close_all() { ObDLink* head = all_list_.head(); ObLink* cur = head->next_; while (cur != head) { ObSqlSock* s = CONTAINER_OF(cur, ObSqlSock, all_list_link_); cur = cur->next_; - s->on_disconnect(); - ObSqlSockSession *sess = (ObSqlSockSession *)s->sess_; - sess->destroy(); - s->do_close(); + prepare_destroy(s); + } + while(head->next_ != head) { + handle_write_req_queue(); + handle_close_req_queue(); + handle_pending_destroy_list(); } } private: @@ -1065,7 +1062,7 @@ void ObSqlNio::run(int64_t idx) impl_[idx].do_work(); } if (has_set_stop()) { - impl_[idx].close_all_fd(); + impl_[idx].close_all(); } } } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index c9d8e5a640..6cd26f3273 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -190,6 +190,21 @@ int ObPocRpcServer::start(int port, int net_thread_count, frame::ObReqDeliver* d } return ret; } + +void ObPocRpcServer::stop() +{ + for (uint64_t gid = 1; gid < END_GROUP; gid++) { + pn_stop(gid); + } +} + +void ObPocRpcServer::wait() +{ + for (uint64_t gid = 1; gid < END_GROUP; gid++) { + pn_wait(gid); + } +} + int ObPocRpcServer::update_tcp_keepalive_params(int64_t user_timeout) { int ret = OB_SUCCESS; if (pn_set_keepalive_timeout(user_timeout) != user_timeout) { diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h index eecf8dba6f..843ce0b255 100644 --- 
a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.h @@ -51,12 +51,14 @@ class ObPocRpcServer public: enum { DEFAULT_PNIO_GROUP = 1, - RATELIMIT_PNIO_GROUP = 2 + RATELIMIT_PNIO_GROUP = 2, + END_GROUP }; ObPocRpcServer() : has_start_(false){} ~ObPocRpcServer() {} int start(int port, int net_thread_count, rpc::frame::ObReqDeliver* deliver); - void stop() {} + void stop(); + void wait(); bool has_start() {return has_start_;} int update_tcp_keepalive_params(int64_t user_timeout); int update_server_standby_fetch_log_bandwidth_limit(int64_t value); diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index f458a2a2f9..c967b72ffc 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -223,6 +223,7 @@ static pn_t* pn_create(int listen_id, int gid, int tid) cfifo_alloc_init(&pn->server_resp_alloc, &pn->server_resp_chunk_alloc); cfifo_alloc_init(&pn->client_req_alloc, &pn->client_req_chunk_alloc); cfifo_alloc_init(&pn->client_cb_alloc, &pn->client_cb_chunk_alloc); + pn->is_stop_ = false; } if (0 != err && NULL != pn) { pn_destroy(pn); @@ -396,6 +397,24 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const char* buf, int return err; } +PN_API void pn_stop(uint64_t gid) +{ + pn_grp_t *pgrp = locate_grp(gid); + for (int tid = 0; tid < pgrp->count; tid++) { + pn_t *pn = get_pn_for_send(pgrp, tid); + ATOMIC_STORE(&pn->is_stop_, true); + } +} + +PN_API void pn_wait(uint64_t gid) +{ + pn_grp_t *pgrp = locate_grp(gid); + for (int tid = 0; tid < pgrp->count; tid++) { + pn_t *pn = get_pn_for_send(pgrp, tid); + pthread_join(pn->pd, NULL); + } +} + typedef struct pn_resp_ctx_t { pn_t* pn; diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index f2ab9c9fbe..3dd4f2d3c3 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -38,6 +38,7 @@ 
typedef struct pn_grp_comm_t } pn_grp_comm_t; #define PN_COMM \ + bool is_stop_; \ pthread_t pd; \ int accept_qfd; \ int gid; \ @@ -62,6 +63,8 @@ PN_API int pn_ratelimit(int grp_id, int64_t value); PN_API int64_t pn_get_ratelimit(int grp_id); PN_API uint64_t pn_get_rxbytes(int grp_id); PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid); +PN_API void pn_stop(uint64_t gid); +PN_API void pn_wait(uint64_t gid); extern int64_t pnio_keepalive_timeout; pn_comm_t* get_current_pnio(); diff --git a/deps/oblib/src/rpc/pnio/io/eloop.c b/deps/oblib/src/rpc/pnio/io/eloop.c index 780c631535..6b312e076b 100644 --- a/deps/oblib/src/rpc/pnio/io/eloop.c +++ b/deps/oblib/src/rpc/pnio/io/eloop.c @@ -102,7 +102,8 @@ int eloop_thread_run(eloop_t** udata) { } int eloop_run(eloop_t* ep) { - while(true) { + pn_comm_t* pn = get_current_pnio(); + while(!ATOMIC_LOAD(&pn->is_stop_)) { int64_t epoll_timeout = 1000; ob_update_loop_ts(); if (ep->ready_link.next != &ep->ready_link) { @@ -116,7 +117,6 @@ int eloop_run(eloop_t* ep) { } PNIO_DELAY_WARN(eloop_delay_warn(start_us, ELOOP_WARN_US)); - pn_comm_t* pn = get_current_pnio(); if (unlikely(NULL != pn && 0 == pn->tid && PNIO_REACH_TIME_INTERVAL(1000000))) { static __thread uint64_t last_rx_bytes = 0; static __thread uint64_t last_time = 0; diff --git a/src/observer/main.cpp b/src/observer/main.cpp index 7b80bdb811..17d6fe18ca 100644 --- a/src/observer/main.cpp +++ b/src/observer/main.cpp @@ -396,6 +396,39 @@ static int check_uid_before_start(const char *dir_path) return ret; } + +static void print_all_thread(const char* desc) +{ + MPRINT("============= [%s]begin to show unstopped thread =============", desc); + DIR *dir = opendir("/proc/self/task"); + if (dir == NULL) { + MPRINT("fail to print all thread"); + } else { + struct dirent *entry; + while ((entry = readdir(dir)) != NULL) { + char *tid = entry->d_name; + if (tid[0] == '.') + continue; // pass . and .. 
+ char path[256]; + sprintf(path, "/proc/self/task/%s/comm", tid); + FILE *file = fopen(path, "r"); + if (file == NULL) { + MPRINT("fail to print thread tid: %s", tid); + } + char name[256]; + fgets(name, 256, file); + size_t len = strlen(name); + if (len > 0 && name[len - 1] == '\n') { + name[len - 1] = '\0'; + } + MPRINT("tid: %s, name: %s", tid, name); + fclose(file); + } + } + closedir(dir); + MPRINT("============= [%s]finish to show unstopped thread =============", desc); +} + extern "C" { typedef void *(*reasy_pool_realloc_pt)(void *ptr, size_t size); void reasy_pool_set_allocator(reasy_pool_realloc_pt alloc); @@ -551,6 +584,7 @@ int main(int argc, char *argv[]) } else if (OB_FAIL(observer.wait())) { LOG_ERROR("observer wait fail", K(ret)); } + print_all_thread("BEFORE_DESTORY"); observer.destroy(); ObKVGlobalCache::get_instance().destroy(); ObVirtualTenantManager::get_instance().destroy(); @@ -563,5 +597,6 @@ int main(int argc, char *argv[]) OB_LOGGER.set_stop_append_log(); OB_LOGGER.set_enable_async_log(false); OB_LOGGER.set_enable_log_limit(false); + print_all_thread("AFTER_DESTORY"); return ret; } diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 773dc4f521..19ff1a604e 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -1200,6 +1200,10 @@ int ObServer::stop() } + FLOG_INFO("begin to stop global_poc_server"); + obrpc::global_poc_server.stop(); + FLOG_INFO("stop global_poc_server success"); + has_stopped_ = true; FLOG_INFO("[OBSERVER_NOTICE] stop observer end", KR(ret)); if (OB_SUCCESS != fail_ret) { @@ -1390,6 +1394,11 @@ int ObServer::wait() LOG_DBA_ERROR(OB_ERR_OBSERVER_STOP, "msg", "observer wait() has failure", KR(fail_ret)); } } + + FLOG_INFO("begin to wait global_poc_server"); + obrpc::global_poc_server.wait(); + FLOG_INFO("wait global_poc_server success"); + return ret; }