add destroy thread gracefully

This commit is contained in:
zhjc1124 2023-06-08 07:42:27 +00:00 committed by ob-robot
parent 340151ccc9
commit f4b390d643
25 changed files with 203 additions and 39 deletions

View File

@ -1,5 +1,9 @@
int64_t ob_update_loop_ts();
int ussl_is_stop()
{
return ATOMIC_LOAD(&ussl_is_stopped);
}
struct epoll_event *ussl_make_epoll_event(struct epoll_event *event, uint32_t event_flag, void *val)
{
event->events = event_flag;
@ -80,11 +84,12 @@ static void ussl_eloop_handle_sock_event(ussl_sock_t *s)
int ussl_eloop_run(ussl_eloop_t *ep)
{
while (1) {
while (!ussl_is_stop()) {
ob_update_loop_ts();
ussl_eloop_refire(ep);
ussl_dlink_for(&ep->ready_link, p) { ussl_eloop_handle_sock_event(ussl_structof(p, ussl_sock_t, ready_link)); }
check_and_handle_timeout_event();
}
close(ep->fd);
return 0;
}

View File

@ -7,6 +7,7 @@ typedef struct ussl_eloop_t
ussl_dlink_t ready_link;
} ussl_eloop_t;
extern int ussl_is_stopped;
extern int ussl_eloop_init(ussl_eloop_t *ep);
extern int ussl_eloop_run(ussl_eloop_t *ep);
extern int ussl_eloop_regist(ussl_eloop_t *ep, ussl_sock_t *s, uint32_t eflag);

View File

@ -127,7 +127,7 @@ int ussl_setsockopt(int socket, int level, int optname, const void *optval, sock
{
int ret = 0;
if (ATOMIC_BCAS(&is_ussl_bg_thread_started, 0, 1)) {
ret = init_bg_thread();
ret = ussl_init_bg_thread();
if (0 != ret) {
ussl_log_error("start ussl-bk-thread failed!, ret:%d", ret);
ATOMIC_STORE(&is_ussl_bg_thread_started, 0);
@ -177,6 +177,15 @@ int ussl_setsockopt(int socket, int level, int optname, const void *optval, sock
return ret;
}
void ussl_stop()
{
ATOMIC_STORE(&ussl_is_stopped, 1);
}
void ussl_wait()
{
ussl_wait_bg_thread();
}
int ussl_listen(int socket, int backlog)
{
int ret = 0;

View File

@ -39,6 +39,10 @@ enum CtxLevelOptName {
SO_OB_CTX_SET_SSL_CONFIG,
};
static int ussl_is_stopped = 0;
void ussl_stop();
void ussl_wait();
int ussl_setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
int ussl_listen(int fd, int n);
int ussl_connect(int fd, const struct sockaddr *addr, socklen_t len);

View File

@ -76,7 +76,7 @@ static void *bg_thread_func(void *arg)
#define F_SETPIPE_SZ 1031
#endif
int init_bg_thread()
int ussl_init_bg_thread()
{
int ret = 0;
static const int pipe_resize = 128 * 1024;
@ -100,6 +100,11 @@ int init_bg_thread()
return ret;
}
void ussl_wait_bg_thread()
{
pthread_join(ussl_bg_thread_id, NULL);
}
void add_to_timeout_list(ussl_dlink_t *l)
{
ussl_dlink_insert(&global_ussl_loop_struct.timeout_list, l);

View File

@ -11,5 +11,6 @@ int __attribute__((weak)) dispatch_accept_fd_to_certain_group(int fd, uint64_t
extern void add_to_timeout_list(ussl_dlink_t *l);
extern void remove_from_timeout_list(ussl_dlink_t *l);
extern void check_and_handle_timeout_event();
extern int init_bg_thread();
extern int ussl_init_bg_thread();
extern void ussl_wait_bg_thread();
#endif // USSL_HOOK_LOOP_USSL_LOOP_

View File

@ -345,7 +345,7 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false)
net_.destroy();
timer_handle_.stop_and_wait();
timer_.stop_and_wait();
timer_.destroy();
}
SERVER_LOG(INFO, "stop LogService success", K(ret), K(is_shutdown), K(guard));
return ret;

View File

@ -169,6 +169,7 @@ void ObArchiveService::destroy()
ls_meta_recorder_.destroy();
timer_.destroy();
allocator_.destroy();
ObThreadPool::destroy();
log_service_ = NULL;
ls_svr_ = NULL;
}

View File

@ -64,8 +64,8 @@ struct AllLsElectionReferenceInfoFactory
void ObLeaderCoordinator::destroy()
{
LC_TIME_GUARD(1_s);
recovery_detect_timer_.stop_and_wait();
failure_detect_timer_.stop_and_wait();
recovery_detect_timer_.destroy();
failure_detect_timer_.destroy();
AllLsElectionReferenceInfoFactory::delete_obj(all_ls_election_reference_info_);
all_ls_election_reference_info_ = NULL;
COORDINATOR_LOG(INFO, "ObLeaderCoordinator mtl destroy");

View File

@ -327,6 +327,7 @@ void PalfEnvImpl::destroy()
fetch_log_engine_.destroy();
log_updater_.destroy();
log_rpc_.destroy();
election_timer_.destroy();
log_alloc_mgr_ = NULL;
monitor_ = NULL;
self_.reset();

View File

@ -589,6 +589,7 @@ int main(int argc, char *argv[])
observer.destroy();
ObTaskController::get().destroy();
ObKVGlobalCache::get_instance().destroy();
ObClockGenerator::destroy();
ObVirtualTenantManager::get_instance().destroy();
}
curl_global_cleanup();

View File

@ -113,6 +113,9 @@ using namespace oceanbase::blocksstable;
using namespace oceanbase::transaction;
using namespace oceanbase::logservice;
extern "C" void ussl_stop();
extern "C" void ussl_wait();
namespace oceanbase
{
namespace obrpc
@ -488,10 +491,6 @@ void ObServer::destroy()
signal_handle_->destroy();
FLOG_INFO("signal handle destroyed");
FLOG_INFO("active session history task destroyed");
ObClockGenerator::destroy();
FLOG_INFO("clock generator destroyed");
FLOG_INFO("opt stat manager destroyed");
ObOptStatManager::get_instance().destroy();
FLOG_INFO("opt stat manager destroyed");
@ -516,6 +515,10 @@ void ObServer::destroy()
ObBGThreadMonitor::get_instance().destroy();
FLOG_INFO("background thread monitor destroyed");
FLOG_INFO("begin to destroy thread hung detector");
common::occam::ObThreadHungDetector::get_instance().destroy();
FLOG_INFO("thread hung detector destroyed");
FLOG_INFO("begin to destroy table store stat mgr");
ObTableStoreStatMgr::get_instance().destroy();
FLOG_INFO("table store stat mgr destroyed");
@ -685,9 +688,9 @@ void ObServer::destroy()
ObServerBlacklist::get_instance().destroy();
FLOG_INFO("server blacklist destroy");
FLOG_INFO("begin to destroy election global timer");
palf::election::GLOBAL_REPORT_TIMER.~ObOccamTimer();
FLOG_INFO("election global timer destroy");
FLOG_INFO("begin to destroy global election report timer");
palf::election::GLOBAL_REPORT_TIMER.destroy();
FLOG_INFO("global election report timer destroyed");
FLOG_INFO("begin to destroy OB_PRIMARY_STANDBY_SERVICE");
OB_PRIMARY_STANDBY_SERVICE.destroy();
@ -1136,6 +1139,10 @@ int ObServer::stop()
ObBGThreadMonitor::get_instance().stop();
FLOG_INFO("bgthread monitor stopped");
FLOG_INFO("begin to stop thread hung detector");
common::occam::ObThreadHungDetector::get_instance().stop();
FLOG_INFO("thread hung detector stopped");
FLOG_INFO("begin to stop timer");
TG_STOP(lib::TGDefIDs::ServerGTimer);
FLOG_INFO("timer stopped");
@ -1297,6 +1304,10 @@ int ObServer::stop()
FLOG_INFO("net frame stopped");
}
FLOG_INFO("begin to stop ussl");
ussl_stop();
FLOG_INFO("stop ussl success");
FLOG_INFO("begin to stop global_poc_server");
obrpc::global_poc_server.stop();
FLOG_INFO("stop global_poc_server success");
@ -1305,10 +1316,11 @@ int ObServer::stop()
ROOTSERVICE_EVENT_INSTANCE.stop();
FLOG_INFO("rootservice event history stopped");
FLOG_INFO("begin to stop global election report timer");
palf::election::GLOBAL_REPORT_TIMER.stop();
FLOG_INFO("global election report timer stopped");
}
has_stopped_ = true;
FLOG_INFO("[OBSERVER_NOTICE] stop observer end", KR(ret));
if (OB_SUCCESS != fail_ret) {
@ -1388,6 +1400,10 @@ int ObServer::wait()
ObBGThreadMonitor::get_instance().wait();
FLOG_INFO("wait bg thread monitor success");
FLOG_INFO("begin to wait thread hung detector");
common::occam::ObThreadHungDetector::get_instance().wait();
FLOG_INFO("wait thread hung detector success");
#ifdef ENABLE_IMC
FLOG_INFO("begin to wait imc tasks");
imc_tasks_.wait();
@ -1538,11 +1554,15 @@ int ObServer::wait()
ObServerCheckpointSlogHandler::get_instance().wait();
FLOG_INFO("wait server checkpoint slog handler success");
FLOG_INFO("set gctx status stopped");
palf::election::GLOBAL_REPORT_TIMER.stop_and_wait();
FLOG_INFO("wait global election report timer stopped done");
FLOG_INFO("begin to wait global election report timer");
palf::election::GLOBAL_REPORT_TIMER.wait();
FLOG_INFO("wait global election report timer success");
FLOG_INFO("begin to wait ussl");
ussl_wait();
FLOG_INFO("wait ussl success");
FLOG_INFO("begin to wait global_poc_server");
obrpc::global_poc_server.wait();
FLOG_INFO("wait global_poc_server success");

View File

@ -127,7 +127,7 @@ void ObArchiveSchedulerService::stop()
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("not init", K(ret));
LOG_WARN("not init", K(ret));
} else {
ObRsReentrantThread::stop();
idling_.wakeup();
@ -135,6 +135,34 @@ void ObArchiveSchedulerService::stop()
}
}
void ObArchiveSchedulerService::wait()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ObRsReentrantThread::wait();
LOG_INFO("wait archive scheduler service");
}
}
int ObArchiveSchedulerService::destroy()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
if(OB_FAIL(ObRsReentrantThread::destroy())) {
LOG_WARN("ObRsReentrantThread::destroy failed", K(ret));
}
is_inited_ = false;
LOG_INFO("destroy archive scheduler service", K(ret));
}
return ret;
}
void ObArchiveSchedulerService::wakeup()
{
idling_.wakeup();

View File

@ -84,7 +84,8 @@ public:
int start() override;
void stop() override;
void wait() override;
int destroy();
void run3() override;
// force cancel archive
int force_cancel(const uint64_t tenant_id) override;

View File

@ -1270,6 +1270,8 @@ int ObRootService::stop()
FLOG_INFO("dbms sched job master stop");
TG_STOP(lib::TGDefIDs::GlobalCtxTimer);
FLOG_INFO("global ctx timer stop");
archive_service_.stop();
FLOG_INFO("archive service stop");
}
}
@ -1314,6 +1316,8 @@ void ObRootService::wait()
FLOG_INFO("rebalance task mgr exit success");
TG_WAIT(lib::TGDefIDs::GlobalCtxTimer);
FLOG_INFO("global ctx timer exit success");
archive_service_.wait();
FLOG_INFO("archive service exit success");
backup_task_scheduler_.reuse();
ObUpdateRsListTask::clear_lock();
THE_RS_JOB_TABLE.reset_max_job_id();
@ -5064,6 +5068,12 @@ int ObRootService::do_restart()
FLOG_INFO("success to start backup_lease_service_");
}
if (FAILEDx(archive_service_.start())) {
FLOG_WARN("archive service start failed", KR(ret));
} else {
FLOG_INFO("success to start archive service");
}
if (FAILEDx(disaster_recovery_task_mgr_.start())) {
FLOG_WARN("disaster recovery task manager start failed", KR(ret));

View File

@ -207,17 +207,19 @@ void ObEventHistoryTableOperator::stop()
{
stopped_ = true;
event_queue_.stop();
timer_.stop();
}
void ObEventHistoryTableOperator::wait()
{
event_queue_.wait();
timer_.wait();
}
void ObEventHistoryTableOperator::destroy()
{
event_queue_.destroy();
timer_.stop_and_wait();
timer_.destroy();
// allocator should destroy after event_queue_ destroy
inited_ = false;
}

View File

@ -107,6 +107,7 @@ public:
void stop() {
OCCAM_LOG(INFO, "occam thread marked stopped", K(this), K_(id));
ATOMIC_SET(&is_stopped_, true);
share::ObThreadPool::stop();
}
void destroy() {
if (is_inited_) {
@ -269,17 +270,34 @@ public:
}
return ret;
}
void stop()
{
if (is_inited_) {
int ret = OB_SUCCESS;
{
ObThreadCondGuard guard(cv_);
is_stopped_ = true;
}
if (OB_FAIL(cv_.broadcast())) {
OCCAM_LOG(ERROR, "cv broadcast failed", K(ret));
}
for (int64_t idx = 0; idx < thread_num_; ++idx) {
threads_[idx].stop();
}
}
}
void wait()
{
if (is_inited_) {
for (int64_t idx = 0; idx < thread_num_; ++idx) {
threads_[idx].wait();
}
}
}
void destroy()
{
int ret = OB_SUCCESS;
OCCAM_LOG(INFO, "call destroy", K(lbt()));
{
ObThreadCondGuard guard(cv_);
is_stopped_ = true;
}
if (OB_FAIL(cv_.broadcast())) {
OCCAM_LOG(ERROR, "cv broadcast failed", K(ret));
}
stop();
wait();
if (is_inited_) {
for (int64_t idx = 0; idx < thread_num_; ++idx) {
threads_[idx].destroy();

View File

@ -111,13 +111,31 @@ private:
}
~ObThreadHungDetector()
{
destroy();
}
public:
void stop()
{
if (back_thread_ != nullptr) {
back_thread_->stop();
}
}
void wait()
{
if (back_thread_ != nullptr) {
back_thread_->wait();
}
}
void destroy()
{
stop();
wait();
if (back_thread_ != nullptr) {
back_thread_->destroy();
ob_free(back_thread_);
back_thread_ = nullptr;
}
}
public:
static ObThreadHungDetector &get_instance() { static ObThreadHungDetector d; return d; }
struct ClickPoint
{

View File

@ -527,7 +527,7 @@ class ObOccamTimer
{
public:
ObOccamTimer() : total_running_task_count_(0), precision_(0), is_running_(false) {}
~ObOccamTimer() { stop_and_wait(); }
~ObOccamTimer() { destroy(); }
int init_and_start(ObOccamThreadPool &pool, const int64_t precision, const char *name)
{
TIMEGUARD_INIT(OCCAM, 100_ms);
@ -572,7 +572,7 @@ public:
}
return OB_SUCCESS;
}
void stop_and_wait()
void stop()
{
ATOMIC_STORE(&is_running_, false);
int64_t last_print_time = 0;
@ -584,6 +584,32 @@ public:
K(ATOMIC_LOAD(&total_running_task_count_)), KP(this));
}
}
if (timer_shared_ptr_.is_valid()) {
timer_shared_ptr_->stop();
}
if (thread_pool_shared_ptr_.is_valid()) {
thread_pool_shared_ptr_->stop();
}
}
void wait()
{
if (timer_shared_ptr_.is_valid()) {
timer_shared_ptr_->wait();
}
if (thread_pool_shared_ptr_.is_valid()) {
thread_pool_shared_ptr_->wait();
}
}
void destroy()
{
stop();
wait();
if (thread_pool_shared_ptr_.is_valid()) {
thread_pool_shared_ptr_->destroy();
}
if (timer_shared_ptr_.is_valid()) {
timer_shared_ptr_->destroy();
}
}
bool is_running() const { return ATOMIC_LOAD(&is_running_); };
// Returned value is ignored

View File

@ -244,6 +244,8 @@ void ObTaskController::wait()
void ObTaskController::destroy()
{
stop();
wait();
ObSyslogPerErrLimiter::instance().destroy();
for (int i = 0; i < MAX_TASK_ID; i++) {
if (nullptr != limiters_[i]) {

View File

@ -133,7 +133,7 @@ int ObMultiVersionGarbageCollector::stop()
} else {
ObTimeGuard timeguard(__func__, 1 * 1000 * 1000);
(void)timer_handle_.stop_and_wait();
(void)timer_.stop_and_wait();
timer_.stop();
last_study_timestamp_ = 0;
last_refresh_timestamp_ = 0;
last_reclaim_timestamp_ = 0;
@ -152,11 +152,13 @@ int ObMultiVersionGarbageCollector::stop()
void ObMultiVersionGarbageCollector::wait()
{
timer_.wait();
MVCC_LOG(INFO, "multi version garbage collector wait", KPC(this));
}
void ObMultiVersionGarbageCollector::destroy()
{
timer_.destroy();
MVCC_LOG(INFO, "multi version garbage collector destroy", KPC(this));
}

View File

@ -153,6 +153,12 @@ void ObTableLockService::ObOBJLockGarbageCollector::wait()
LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector waits successfully", KPC(this));
}
void ObTableLockService::ObOBJLockGarbageCollector::destroy()
{
timer_.destroy();
LOG_INFO("ObTableLockService::ObOBJLockGarbageCollector destroys successfully", KPC(this));
}
int ObTableLockService::ObOBJLockGarbageCollector::garbage_collect_right_now()
{
int ret = OB_SUCCESS;
@ -328,6 +334,7 @@ void ObTableLockService::wait()
void ObTableLockService::destroy()
{
obj_lock_garbage_collector_.destroy();
location_service_ = nullptr;
sql_proxy_ = nullptr;
is_inited_ = false;

View File

@ -154,6 +154,7 @@ public:
int start();
void stop();
void wait();
void destroy();
int garbage_collect_right_now();
TO_STRING_KV(KP(this),

View File

@ -56,6 +56,7 @@ ObTenantFreezer::~ObTenantFreezer()
void ObTenantFreezer::destroy()
{
freeze_trigger_timer_.destroy();
is_freezing_tx_data_ = false;
exist_ls_freezing_ = false;
self_.reset();

View File

@ -208,7 +208,7 @@ TEST_F(TestObOccamTimer, stop_and_wait) {
int ret = OB_SUCCESS;
ret = occam_timer->schedule_task_ignore_handle_repeat_and_immediately(20_ms, [](){ return false; });
ASSERT_EQ(ret, OB_SUCCESS);
occam_timer->stop_and_wait();
occam_timer->destroy();
}
TEST_F(TestObOccamTimer, stop_repeat_task_inside_destroy_outside) {
@ -218,7 +218,7 @@ TEST_F(TestObOccamTimer, stop_repeat_task_inside_destroy_outside) {
ASSERT_EQ(ret, OB_SUCCESS);
std::this_thread::sleep_for(chrono::milliseconds(50));
handle.stop_and_wait();
occam_timer->stop_and_wait();
occam_timer->destroy();
}
TEST_F(TestObOccamTimer, stop_and_destroy_repeat_task_outside) {
@ -228,7 +228,7 @@ TEST_F(TestObOccamTimer, stop_and_destroy_repeat_task_outside) {
ASSERT_EQ(ret, OB_SUCCESS);
std::this_thread::sleep_for(chrono::milliseconds(50));
handle.stop_and_wait();
occam_timer->stop_and_wait();
occam_timer->destroy();
}
TEST_F(TestObOccamTimer, test_specify_first_delay) {
@ -238,7 +238,7 @@ TEST_F(TestObOccamTimer, test_specify_first_delay) {
ASSERT_EQ(ret, OB_SUCCESS);
std::this_thread::sleep_for(chrono::milliseconds(505));
handle.stop_and_wait();
occam_timer->stop_and_wait();
occam_timer->destroy();
}
void test_func(const char *function_name = __builtin_FUNCTION(), const char *file = __builtin_FILE(), const int64_t line = __builtin_LINE())