fix thread leak when _ob_enable_dynaimc_worker=false

This commit is contained in:
nroskill
2023-05-25 03:23:23 +00:00
committed by ob-robot
parent 8ebf5ba3da
commit f22b9232d8

View File

@ -369,17 +369,16 @@ void ObResourceGroup::check_worker_count()
ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id())); ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_->id()));
enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true; enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
} }
if (OB_LIKELY(enable_dynamic_worker)) { DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
DLIST_FOREACH_REMOVESAFE(wnode, workers_) { const auto w = static_cast<ObThWorker*>(wnode->get_data());
const auto w = static_cast<ObThWorker*>(wnode->get_data()); if (w->has_set_stop()) {
if (w->has_set_stop()) { workers_.remove(wnode);
workers_.remove(wnode); destroy_worker(w);
destroy_worker(w); } else if (w->has_req_flag()
} else if (w->has_req_flag() && w->is_blocking()
&& w->is_blocking() && w->is_default_worker()
&& w->is_default_worker()) { && enable_dynamic_worker) {
++token; ++token;
}
} }
} }
token = std::max(token, min_worker_cnt()); token = std::max(token, min_worker_cnt());
@ -395,6 +394,7 @@ void ObResourceGroup::check_worker_count()
&& ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05) { && ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05) {
acquire_more_worker(1, succ_num); acquire_more_worker(1, succ_num);
token_change_ts_ = now; token_change_ts_ = now;
LOG_INFO("worker thread created", K(tenant_->id()), K(token_cnt_), K(token));
} }
token_cnt_ = token; token_cnt_ = token;
IGNORE_RETURN workers_lock_.unlock(); IGNORE_RETURN workers_lock_.unlock();
@ -416,6 +416,7 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
&& OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) { && OB_FAIL(cgroup_ctrl_->remove_self_from_cgroup(tenant_->id()))) {
LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id)); LOG_WARN("remove thread from cgroup failed", K(ret), "tenant:", tenant_->id(), K_(group_id));
} }
LOG_INFO("worker thread exit", K(tenant_->id()), K(token_cnt_), K(workers_.get_size()));
} }
IGNORE_RETURN workers_lock_.unlock(); IGNORE_RETURN workers_lock_.unlock();
} }
@ -1355,18 +1356,17 @@ void ObTenant::check_worker_count()
ObTenantConfigGuard tenant_config(TENANT_CONF(id_)); ObTenantConfigGuard tenant_config(TENANT_CONF(id_));
enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true; enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
} }
if (OB_LIKELY(enable_dynamic_worker)) { // assume that high priority and normal priority were busy.
// assume that high priority and normal priority were busy. DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
DLIST_FOREACH_REMOVESAFE(wnode, workers_) { const auto w = static_cast<ObThWorker*>(wnode->get_data());
const auto w = static_cast<ObThWorker*>(wnode->get_data()); if (w->has_set_stop()) {
if (w->has_set_stop()) { workers_.remove(wnode);
workers_.remove(wnode); destroy_worker(w);
destroy_worker(w); } else if (w->has_req_flag()
} else if (w->has_req_flag() && w->is_blocking()
&& w->is_blocking() && w->is_default_worker()
&& w->is_default_worker()) { && enable_dynamic_worker) {
++token; ++token;
}
} }
} }
token = std::max(token, min_worker_cnt()); token = std::max(token, min_worker_cnt());
@ -1382,6 +1382,7 @@ void ObTenant::check_worker_count()
&& ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05) { && ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05) {
acquire_more_worker(1, succ_num); acquire_more_worker(1, succ_num);
token_change_ts_ = now; token_change_ts_ = now;
LOG_INFO("worker thread created", K(id_), K(token_cnt_), K(token));
} }
token_cnt_ = token; token_cnt_ = token;
IGNORE_RETURN workers_lock_.unlock(); IGNORE_RETURN workers_lock_.unlock();
@ -1417,6 +1418,7 @@ void ObTenant::check_worker_count(ObThWorker &w)
if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) { if (cgroup_ctrl_.is_valid() && OB_FAIL(cgroup_ctrl_.remove_self_from_cgroup(id_))) {
LOG_WARN("remove thread from cgroup failed", K(ret), K_(id)); LOG_WARN("remove thread from cgroup failed", K(ret), K_(id));
} }
LOG_INFO("worker thread exit", K(id_), K(token_cnt_), K(workers_.get_size()));
} }
IGNORE_RETURN workers_lock_.unlock(); IGNORE_RETURN workers_lock_.unlock();
} }