diff --git a/deps/oblib/src/common/ob_clock_generator.cpp b/deps/oblib/src/common/ob_clock_generator.cpp index 124be9fd2b..bd70b4409d 100644 --- a/deps/oblib/src/common/ob_clock_generator.cpp +++ b/deps/oblib/src/common/ob_clock_generator.cpp @@ -28,6 +28,11 @@ namespace common ObClockGenerator ObClockGenerator::clock_generator_; +ObClockGenerator &ObClockGenerator::get_instance() +{ + return clock_generator_; +} + int ObClockGenerator::init() { int ret = OB_SUCCESS; @@ -44,6 +49,7 @@ int ObClockGenerator::init() clock_generator_.cur_ts_ = clock_generator_.get_us(); clock_generator_.last_used_time_ = clock_generator_.get_us(); clock_generator_.inited_ = true; + clock_generator_.stopped_ = false; clock_generator_.ready_ = true; TRANS_LOG(INFO, "clock generator inited success"); } @@ -52,13 +58,27 @@ int ObClockGenerator::init() return ret; } +void ObClockGenerator::stop() +{ + if (inited_) { + stopped_ = true; + lib::ThreadPool::stop(); + } +} +void ObClockGenerator::wait() +{ + if (inited_) { + lib::ThreadPool::wait(); + } +} + void ObClockGenerator::destroy(void) { if (clock_generator_.inited_) { - clock_generator_.inited_ = false; clock_generator_.stop(); clock_generator_.wait(); - clock_generator_.destroy(); + clock_generator_.lib::ThreadPool::destroy(); + clock_generator_.inited_ = false; // Global variables and thread local variables when printing logs disable_logging_ // Uncertain release order may lead to core dump // TRANS_LOG(INFO, "clock generator destroyed"); @@ -76,7 +96,7 @@ void ObClockGenerator::run1() while (!ready_) { ob_usleep(SLEEP_US); } - while (inited_) { + while (!stopped_) { int64_t retry = 0; int64_t cur_ts = 0; int64_t delta = 0; diff --git a/deps/oblib/src/common/ob_clock_generator.h b/deps/oblib/src/common/ob_clock_generator.h index cbe0ced1bd..4911f160e0 100644 --- a/deps/oblib/src/common/ob_clock_generator.h +++ b/deps/oblib/src/common/ob_clock_generator.h @@ -33,10 +33,13 @@ class ObClockGenerator { private: ObClockGenerator() - : inited_(false), ready_(false), cur_ts_(0), last_used_time_(0) + : inited_(false), stopped_(true), ready_(false), cur_ts_(0), last_used_time_(0) {} ~ObClockGenerator() { destroy(); } public: + static ObClockGenerator &get_instance(); + void stop(); + void wait(); static int init(); static void destroy(); static int64_t getClock(); @@ -52,6 +55,7 @@ private: private: bool inited_; + bool stopped_; bool ready_; int64_t cur_ts_; int64_t last_used_time_; diff --git a/deps/oblib/src/lib/oblog/ob_log.cpp b/deps/oblib/src/lib/oblog/ob_log.cpp index 4c4a44e3e1..16df5404d8 100644 --- a/deps/oblib/src/lib/oblog/ob_log.cpp +++ b/deps/oblib/src/lib/oblog/ob_log.cpp @@ -499,6 +499,7 @@ ObLogger::~ObLogger() } void ObLogger::stop() { + OB_LOGGER.set_enable_async_log(false); ObBaseLogWriter::stop(); } void ObLogger::wait() diff --git a/src/observer/main.cpp b/src/observer/main.cpp index 6e644f2c27..e74f501038 100644 --- a/src/observer/main.cpp +++ b/src/observer/main.cpp @@ -587,17 +587,12 @@ int main(int argc, char *argv[]) } print_all_thread("BEFORE_DESTROY"); observer.destroy(); - ObTaskController::get().destroy(); - ObKVGlobalCache::get_instance().destroy(); - ObClockGenerator::destroy(); - ObVirtualTenantManager::get_instance().destroy(); } curl_global_cleanup(); unlink(PID_FILE_NAME); } LOG_INFO("observer exits", "observer_version", PACKAGE_STRING); - OB_LOGGER.destroy(); print_all_thread("AFTER_DESTROY"); return ret; } diff --git a/src/observer/net/ob_ingress_bw_alloc_service.cpp b/src/observer/net/ob_ingress_bw_alloc_service.cpp index 0940be61c9..b2076a8a7a 100644 --- a/src/observer/net/ob_ingress_bw_alloc_service.cpp +++ b/src/observer/net/ob_ingress_bw_alloc_service.cpp @@ -359,6 +359,7 @@ void ObIngressBWAllocService::destroy() if (-1 != tg_id_) { TG_STOP(tg_id_); TG_WAIT(tg_id_); + TG_DESTROY(tg_id_); tg_id_ = -1; ingress_manager_.destroy(); LOG_INFO("[INGRESS_SERVICE] ObIngressBWAllocService destroy success", K(tg_id_)); diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 7476528643..c8cfe52dfb 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -478,6 +478,15 @@ void ObServer::destroy() if (is_arbitration_mode()) { } else if (!has_destroy_ && has_stopped_) { + + FLOG_INFO("begin to destroy OB_LOGGER"); + OB_LOGGER.destroy(); + FLOG_INFO("OB_LOGGER destroyed"); + + FLOG_INFO("begin to destroy task controller"); + ObTaskController::get().destroy(); + FLOG_INFO("task controller destroyed"); + FLOG_INFO("begin destroy signal worker"); sig_worker_->destroy(); FLOG_INFO("signal worker destroyed"); @@ -676,6 +685,10 @@ void ObServer::destroy() palf::election::GLOBAL_REPORT_TIMER.destroy(); FLOG_INFO("global election report timer destroyed"); + FLOG_INFO("begin to destroy virtual tenant manager"); + ObVirtualTenantManager::get_instance().destroy(); + FLOG_INFO("virtual tenant manager destroyed"); + FLOG_INFO("begin to destroy OB_PRIMARY_STANDBY_SERVICE"); OB_PRIMARY_STANDBY_SERVICE.destroy(); FLOG_INFO("OB_PRIMARY_STANDBY_SERVICE destroyed"); @@ -684,6 +697,14 @@ void ObServer::destroy() ROOTSERVICE_EVENT_INSTANCE.destroy(); FLOG_INFO("rootservice event history destroyed"); + FLOG_INFO("begin to destroy kv global cache"); + ObKVGlobalCache::get_instance().destroy(); + FLOG_INFO("kv global cache destroyed"); + + FLOG_INFO("begin to destroy clock generator"); + ObClockGenerator::destroy(); + FLOG_INFO("clock generator destroyed"); + has_destroy_ = true; FLOG_INFO("[OBSERVER_NOTICE] destroy observer end"); @@ -1022,6 +1043,14 @@ int ObServer::stop() int fail_ret = OB_SUCCESS; FLOG_INFO("[OBSERVER_NOTICE] stop observer begin"); + FLOG_INFO("begin to stop OB_LOGGER"); + OB_LOGGER.stop(); + FLOG_INFO("stop OB_LOGGER success"); + + FLOG_INFO("begin to stop task controller"); + ObTaskController::get().stop(); + FLOG_INFO("stop task controller success"); + FLOG_INFO("begin to stop config manager"); config_mgr_.stop(); FLOG_INFO("stop config manager success"); @@ -1295,6 +1324,15 @@ int ObServer::stop() FLOG_INFO("begin to stop global election report timer"); palf::election::GLOBAL_REPORT_TIMER.stop(); FLOG_INFO("global election report timer stopped"); + + FLOG_INFO("begin to stop kv global cache"); + ObKVGlobalCache::get_instance().stop(); + FLOG_INFO("kv global cache stopped"); + + FLOG_INFO("begin to stop clock generator"); + ObClockGenerator::get_instance().stop(); + FLOG_INFO("clock generator stopped"); + } has_stopped_ = true; @@ -1335,6 +1373,14 @@ int ObServer::wait() if (is_arbitration_mode()) { } else { + FLOG_INFO("begin to wait OB_LOGGER"); + OB_LOGGER.wait(); + FLOG_INFO("wait OB_LOGGER success"); + + FLOG_INFO("begin to wait task controller"); + ObTaskController::get().wait(); + FLOG_INFO("wait task controller success"); + FLOG_INFO("begin wait signal worker"); sig_worker_->wait(); FLOG_INFO("wait signal worker success"); @@ -1535,6 +1581,14 @@ int ObServer::wait() ROOTSERVICE_EVENT_INSTANCE.wait(); FLOG_INFO("wait rootservice event history success"); + FLOG_INFO("begin to wait kv global cache"); + ObKVGlobalCache::get_instance().wait(); + FLOG_INFO("wait kv global cache success"); + + FLOG_INFO("begin to wait clock generator"); + ObClockGenerator::get_instance().wait(); + FLOG_INFO("wait clock generator success"); + gctx_.status_ = SS_STOPPED; FLOG_INFO("[OBSERVER_NOTICE] wait observer end", KR(ret)); if (OB_SUCCESS != fail_ret) { diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index e5ceef0323..93bc6ca448 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -736,19 +736,37 @@ int ObDDLScheduler::start() void ObDDLScheduler::stop() { - TG_STOP(tg_id_); - TG_STOP(lib::TGDefIDs::DDLScanTask); - TG_STOP(lib::TGDefIDs::HeartBeatCheckTask); - task_queue_.set_stop(true); - idle_stop_ = true; - is_started_ = false; + if (is_inited_) { + TG_STOP(tg_id_); + TG_STOP(lib::TGDefIDs::DDLScanTask); + TG_STOP(lib::TGDefIDs::HeartBeatCheckTask); + task_queue_.set_stop(true); + idle_stop_ = true; + is_started_ = false; + } } void ObDDLScheduler::wait() { - TG_WAIT(tg_id_); - TG_WAIT(lib::TGDefIDs::DDLScanTask); - TG_WAIT(lib::TGDefIDs::HeartBeatCheckTask); + if (is_inited_) { + TG_WAIT(tg_id_); + TG_WAIT(lib::TGDefIDs::DDLScanTask); + TG_WAIT(lib::TGDefIDs::HeartBeatCheckTask); + } +} + +void ObDDLScheduler::destroy() +{ + if (is_inited_) { + TG_DESTROY(tg_id_); + TG_DESTROY(lib::TGDefIDs::DDLScanTask); + TG_DESTROY(lib::TGDefIDs::HeartBeatCheckTask); + allocator_.destroy(); + task_queue_.destroy(); + root_service_ = nullptr; + tg_id_ = -1; + is_inited_ = false; + } } void ObDDLScheduler::run1() diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index 488dc63851..02989dc7a2 100755 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -216,6 +216,7 @@ public: int start(); void stop(); void wait(); + void destroy(); virtual void run1() override; int create_ddl_task( diff --git a/src/rootserver/ob_root_service.cpp b/src/rootserver/ob_root_service.cpp index 7c7f07e62c..69cd9a5797 100755 --- a/src/rootserver/ob_root_service.cpp +++ b/src/rootserver/ob_root_service.cpp @@ -983,6 +983,9 @@ void ObRootService::destroy() dbms_job::ObDBMSJobMaster::get_instance().destroy(); FLOG_INFO("ObDBMSJobMaster destroy"); + ddl_scheduler_.destroy(); + FLOG_INFO("ddl task scheduler destroy"); + if (OB_FAIL(disaster_recovery_task_mgr_.destroy())) { FLOG_WARN("disaster recovery task mgr destroy failed", KR(ret)); diff --git a/src/share/cache/ob_kv_storecache.cpp b/src/share/cache/ob_kv_storecache.cpp index 407db0cdc6..5fdb75413b 100644 --- a/src/share/cache/ob_kv_storecache.cpp +++ b/src/share/cache/ob_kv_storecache.cpp @@ -151,7 +151,7 @@ ObKVGlobalCache::ObKVGlobalCache() map_replace_pos_(0), map_once_replace_num_(0), map_replace_skip_count_(0), - start_destory_(false), + stopped_(true), cache_wash_interval_(0) { } @@ -229,7 +229,7 @@ int ObKVGlobalCache::init( COMMON_LOG(WARN, "failed to reload wash interval", K(ret)); } else { cache_num_ = 0; - start_destory_ = false; + stopped_ = false; mem_limit_getter_ = mem_limit_getter; map_once_clean_num_ = min(MAX_MAP_ONCE_CLEAN_NUM, bucket_num / MAP_ONCE_CLEAN_RATIO); map_once_replace_num_ = min(MAX_MAP_ONCE_REPLACE_NUM, bucket_num / MAP_ONCE_REPLACE_RATIO); @@ -252,17 +252,31 @@ int ObKVGlobalCache::init( return ret; } +void ObKVGlobalCache::stop() +{ + if (inited_) { + stopped_ = true; + TG_STOP(lib::TGDefIDs::KVCacheWash); + TG_STOP(lib::TGDefIDs::KVCacheRep); + } +} + +void ObKVGlobalCache::wait() +{ + if (inited_) { + TG_WAIT(lib::TGDefIDs::KVCacheWash); + TG_WAIT(lib::TGDefIDs::KVCacheRep); + } +} + void ObKVGlobalCache::destroy() { - if (!start_destory_) { + if (inited_) { COMMON_LOG(INFO, "Begin destroy the ObKVGlobalCache!"); // should destroy store_ before timer threads exit, before some mb_handles may // cache in wash thread. - start_destory_ = true; - TG_CANCEL(lib::TGDefIDs::KVCacheWash, wash_task_); - TG_CANCEL(lib::TGDefIDs::KVCacheRep, replace_task_); - TG_WAIT(lib::TGDefIDs::KVCacheWash); - TG_WAIT(lib::TGDefIDs::KVCacheRep); + stop(); + wait(); ws_mgr_.destroy(); map_.destroy(); store_.destroy(); @@ -702,7 +716,7 @@ int ObKVGlobalCache::set_priority(const int64_t cache_id, const int64_t priority void ObKVGlobalCache::wash() { - if (OB_LIKELY(inited_ && !start_destory_)) { + if (OB_LIKELY(inited_ && !stopped_)) { DEBUG_SYNC(BEFORE_BACKGROUND_WASH); static int64_t wash_count = 0; if (store_.wash() || (++wash_count >= MAP_WASH_CLEAN_INTERNAL)) { @@ -714,7 +728,7 @@ void ObKVGlobalCache::wash() void ObKVGlobalCache::replace_map() { - if (inited_ && !start_destory_) { + if (inited_ && !stopped_) { int ret = OB_SUCCESS; int64_t replace_node_count = 0; if (map_replace_skip_count_ <= 0) { diff --git a/src/share/cache/ob_kv_storecache.h b/src/share/cache/ob_kv_storecache.h index ffa8c8750e..838763f21a 100644 --- a/src/share/cache/ob_kv_storecache.h +++ b/src/share/cache/ob_kv_storecache.h @@ -134,6 +134,8 @@ public: const int64_t max_cache_size = DEFAULT_MAX_CACHE_SIZE, const int64_t block_size = lib::ACHUNK_SIZE, const int64_t cache_wash_interval = 0); + void stop(); + void wait(); void destroy(); void reload_priority(); int reload_wash_interval(); @@ -303,7 +305,7 @@ private: int64_t map_once_replace_num_; int64_t map_replace_skip_count_; KVMapReplaceTask replace_task_; - bool start_destory_; + bool stopped_; int64_t cache_wash_interval_; }; diff --git a/src/share/ob_task_define.cpp b/src/share/ob_task_define.cpp index df5901e400..ae49ad71c3 100644 --- a/src/share/ob_task_define.cpp +++ b/src/share/ob_task_define.cpp @@ -233,7 +233,6 @@ int ObTaskController::init() void ObTaskController::stop() { - OB_LOGGER.set_enable_log_limit(false); ObSyslogPerErrLimiter::instance().stop(); }