remove try_wait in thread

This commit is contained in:
nroskill
2023-04-06 08:26:51 +00:00
committed by ob-robot
parent 841bfb32a3
commit 2f7fae0cb4
6 changed files with 107 additions and 108 deletions

View File

@ -190,27 +190,6 @@ void Thread::dump_pth() // for debug pthread join faileds
#endif #endif
} }
int Thread::try_wait()
{
int ret = OB_SUCCESS;
if (pth_ != 0) {
int tmp = pthread_tryjoin_np(pth_, nullptr);
if (EBUSY == tmp) {
ret = OB_EAGAIN;
LOG_WARN("pthread_tryjoin_np failed", K(tmp), K(errno), K(tid_before_stop_));
} else if (0 == tmp) {
destroy_stack();
runnable_ = nullptr;
} else {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("pthread_tryjoin_np failed", K(tmp), K(errno), K(tid_before_stop_));
dump_pth();
abort();
}
}
return ret;
}
void Thread::wait() void Thread::wait()
{ {
int ret = 0; int ret = 0;

View File

@ -34,7 +34,6 @@ public:
int start(); int start();
int start(Runnable runnable); int start(Runnable runnable);
void stop(); void stop();
int try_wait();
void wait(); void wait();
void destroy(); void destroy();
void dump_pth(); void dump_pth();

View File

@ -240,19 +240,6 @@ void Threads::destroy_thread(Thread *thread)
ob_free(thread); ob_free(thread);
} }
int Threads::try_wait()
{
int ret = OB_SUCCESS;
if (threads_ != nullptr) {
for (int i = 0; i < n_threads_ && OB_SUCC(ret); i++) {
if (threads_[i] != nullptr) {
ret = threads_[i]->try_wait();
}
}
}
return ret;
}
void Threads::wait() void Threads::wait()
{ {
if (threads_ != nullptr) { if (threads_ != nullptr) {

View File

@ -93,7 +93,6 @@ public:
} }
virtual int start(); virtual int start();
virtual void stop(); virtual void stop();
virtual int try_wait();
virtual void wait(); virtual void wait();
void destroy(); void destroy();

View File

@ -417,26 +417,28 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
} }
} }
int ObResourceGroup::try_clear_worker() int ObResourceGroup::clear_worker()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObMutexGuard guard(workers_lock_); ObMutexGuard guard(workers_lock_);
if (req_queue_.size() > 0) { while (req_queue_.size() > 0) {
ret = OB_EAGAIN; ob_usleep(10L * 1000L);
} }
if (OB_FAIL(ret)) { while (workers_.get_size() > 0) {
// try next time int ret = OB_SUCCESS;
} else if (workers_.get_size() > 0) {
DLIST_FOREACH_REMOVESAFE(wnode, workers_) { DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
const auto w = static_cast<ObThWorker*>(wnode->get_data()); const auto w = static_cast<ObThWorker*>(wnode->get_data());
w->stop(); workers_.remove(wnode);
if (OB_FAIL(ret)) { destroy_worker(w);
// try next time
} else if (OB_SUCC(w->try_wait())) {
workers_.remove(wnode);
destroy_worker(w);
}
} }
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
LOG_INFO(
"Tenant has some group workers need stop",
K(tenant_->id()),
"group workers", workers_.get_size(),
"group type", get_group_id());
}
ob_usleep(10L * 1000L);
} }
return ret; return ret;
} }
@ -470,17 +472,16 @@ int GroupMap::create_and_insert_group(int32_t group_id, ObTenant *tenant, ObCgro
return ret; return ret;
} }
int GroupMap::try_wait_group() void GroupMap::wait_group()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObResourceGroupNode* iter = NULL; ObResourceGroupNode* iter = NULL;
while (OB_NOT_NULL(iter = quick_next(iter)) && OB_SUCC(ret)) { while (nullptr != (iter = quick_next(iter))) {
ObResourceGroup *group = static_cast<ObResourceGroup*>(iter); ObResourceGroup *group = static_cast<ObResourceGroup*>(iter);
if (OB_FAIL(group->try_clear_worker())) { if (OB_FAIL(group->clear_worker())) {
// try next time LOG_ERROR("group clear worker failed", K(ret));
} }
} }
return ret;
} }
void GroupMap::destroy_group() void GroupMap::destroy_group()
@ -567,6 +568,7 @@ ObTenant::ObTenant(const int64_t id,
unit_min_cpu_(0), unit_min_cpu_(0),
token_cnt_(0), token_cnt_(0),
total_worker_cnt_(0), total_worker_cnt_(0),
gc_thread_(0),
stopped_(true), stopped_(true),
wait_mtl_finished_(false), wait_mtl_finished_(false),
req_queue_(), req_queue_(),
@ -828,61 +830,94 @@ int ObTenant::create_tenant_module()
return ret; return ret;
} }
void* ObTenant::wait(void* t)
{
int ret = OB_SUCCESS;
ObTenant* tenant = (ObTenant*)t;
ob_get_tenant_id() = tenant->id_;
lib::set_thread_name("UnitGC");
tenant->handle_retry_req(true);
while (tenant->req_queue_.size() > 0) {
ob_usleep(10L * 1000L);
}
while (tenant->workers_.get_size() > 0) {
if (OB_SUCC(tenant->workers_lock_.trylock())) {
DLIST_FOREACH_REMOVESAFE(wnode, tenant->workers_) {
const auto w = static_cast<ObThWorker*>(wnode->get_data());
tenant->workers_.remove(wnode);
destroy_worker(w);
}
IGNORE_RETURN tenant->workers_lock_.unlock();
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
LOG_INFO(
"Tenant has some workers need stop", K_(tenant->id),
"workers", tenant->workers_.get_size(),
K_(tenant->req_queue));
}
}
ob_usleep(10L * 1000L);
}
LOG_WARN_RET(OB_SUCCESS,"start remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
while (tenant->nesting_workers_.get_size() > 0) {
int ret = OB_SUCCESS;
if (OB_SUCC(tenant->workers_lock_.trylock())) {
DLIST_FOREACH_REMOVESAFE(wnode, tenant->nesting_workers_) {
auto w = static_cast<ObThWorker*>(wnode->get_data());
tenant->nesting_workers_.remove(wnode);
destroy_worker(w);
}
IGNORE_RETURN tenant->workers_lock_.unlock();
if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
LOG_INFO(
"Tenant has some nesting workers need stop",
K_(tenant->id),
"nesting workers", tenant->nesting_workers_.get_size(),
K_(tenant->req_queue));
}
}
ob_usleep(10L * 1000L);
}
LOG_WARN_RET(OB_SUCCESS, "finish remove nesting", K(tenant->nesting_workers_.get_size()), K_(tenant->id));
LOG_WARN_RET(OB_SUCCESS, "start remove group_map", K_(tenant->id));
tenant->group_map_.wait_group();
LOG_WARN_RET(OB_SUCCESS, "finish remove group_map", K_(tenant->id));
if (!is_virtual_tenant_id(tenant->id_) && !tenant->wait_mtl_finished_) {
ObTenantSwitchGuard guard(tenant);
tenant->stop_mtl_module();
OB_PX_TARGET_MGR.delete_tenant(tenant->id_);
G_RES_MGR.get_col_mapping_rule_mgr().drop_tenant(tenant->id_);
tenant->wait_mtl_module();
tenant->wait_mtl_finished_ = true;
}
return nullptr;
}
int ObTenant::try_wait() int ObTenant::try_wait()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
handle_retry_req(true); int tmp = 0;
if (req_queue_.size() > 0) { if (-1 == gc_thread_) {
ret = OB_EAGAIN; LOG_WARN("try_wait after wait successfully", K(id_), K(wait_mtl_finished_));
} } else if (0 == gc_thread_) {
if (0 != (tmp = pthread_create(&gc_thread_, nullptr, wait, this))) {
if (OB_FAIL(ret)) { ret = OB_ERR_UNEXPECTED;
// try next time LOG_ERROR("tenant gc thread create failed", K(tmp), K(errno), K(id_));
} else if (workers_.get_size() > 0 && OB_SUCC(workers_lock_.trylock())) { } else {
DLIST_FOREACH_REMOVESAFE(wnode, workers_) { ret = OB_EAGAIN;
const auto w = static_cast<ObThWorker*>(wnode->get_data()); LOG_INFO("tenant pthread_create gc thread successfully", K(id_), K(gc_thread_));
w->stop();
if (OB_FAIL(ret)) {
// try next time
} else if (OB_SUCC(w->try_wait())) {
workers_.remove(wnode);
destroy_worker(w);
}
} }
IGNORE_RETURN workers_lock_.unlock();
}
if (OB_FAIL(ret)) {
// try next time
} else if (nesting_workers_.get_size() > 0 && OB_SUCC(workers_lock_.trylock())) {
DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
auto w = static_cast<ObThWorker*>(wnode->get_data());
w->stop();
if (OB_FAIL(ret)) {
// try next time
} else if (OB_SUCC(w->try_wait())) {
nesting_workers_.remove(wnode);
destroy_worker(w);
}
}
IGNORE_RETURN workers_lock_.unlock();
}
if (OB_FAIL(ret)) {
// try next time
} else { } else {
ret = group_map_.try_wait_group(); tmp = pthread_tryjoin_np(gc_thread_, nullptr);
} if (EBUSY == tmp) {
ret = OB_EAGAIN;
if (OB_FAIL(ret)) { LOG_WARN("tenant pthread_tryjoin_np failed", K(id_));
// try next time } else if (0 == tmp) {
} else if (!is_virtual_tenant_id(id_) && !wait_mtl_finished_) { gc_thread_ = -1; // avoid try_wait again after wait success
ObTenantSwitchGuard guard(this); LOG_INFO("tenant pthread_tryjoin_np successfully", K(id_));
ObTenantBase::stop_mtl_module(); } else {
OB_PX_TARGET_MGR.delete_tenant(id_); ret = OB_ERR_UNEXPECTED;
G_RES_MGR.get_col_mapping_rule_mgr().drop_tenant(id_); LOG_ERROR("pthread_tryjoin_np failed", K(tmp), K(errno), K(id_));
ObTenantBase::wait_mtl_module(); }
wait_mtl_finished_ = true;
} }
return ret; return ret;
} }

View File

@ -295,8 +295,7 @@ public:
int acquire_more_worker(int64_t num, int64_t &succ_num); int acquire_more_worker(int64_t num, int64_t &succ_num);
void check_worker_count(); void check_worker_count();
void check_worker_count(ObThWorker &w); void check_worker_count(ObThWorker &w);
int try_clear_worker(); int clear_worker();
lib::ObMutex workers_lock_; lib::ObMutex workers_lock_;
protected: protected:
@ -323,7 +322,7 @@ public:
} }
~GroupMap() {} ~GroupMap() {}
int create_and_insert_group(int32_t group_id, ObTenant *tenant, share::ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group); int create_and_insert_group(int32_t group_id, ObTenant *tenant, share::ObCgroupCtrl *cgroup_ctrl, ObResourceGroup *&group);
int try_wait_group(); void wait_group();
void destroy_group(); void destroy_group();
int64_t to_string(char *buf, const int64_t buf_len) const int64_t to_string(char *buf, const int64_t buf_len) const
{ {
@ -513,6 +512,7 @@ public:
return 0; return 0;
} }
private: private:
static void* wait(void* tenant);
// update CPU usage // update CPU usage
void update_token_usage(); void update_token_usage();
// acquire workers if tenant doesn't have sufficient worker. // acquire workers if tenant doesn't have sufficient worker.
@ -557,7 +557,7 @@ protected:
// workers can make progress. // workers can make progress.
int64_t token_cnt_ CACHE_ALIGNED; int64_t token_cnt_ CACHE_ALIGNED;
int64_t total_worker_cnt_ CACHE_ALIGNED; int64_t total_worker_cnt_ CACHE_ALIGNED;
pthread_t gc_thread_;
bool stopped_; bool stopped_;
bool wait_mtl_finished_; bool wait_mtl_finished_;