fix: worker may be created after tenant has set stop and after group workers have been cleared
parent b555d3dbf6
commit 8c8510fa1b
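The race being closed: during tenant stop, clear_worker() drains and frees every worker in the group, but the periodic maintenance paths (calibrate_token_count(), check_worker_count()) can still win workers_lock_ afterwards and call acquire_more_worker(), re-creating workers for a group that has already been cleared. The patch introduces a has_stop_ flag that is only touched under workers_lock_: clear_worker() sets it before the lock is released, and every path that could spawn workers re-checks it after taking the lock. Below is a minimal standalone sketch of that protocol; ToyGroup, ToyWorker and the method bodies are illustrative stand-ins, not the OceanBase types.

// Standalone sketch of the stop-flag protocol (assumed/illustrative names; the real
// code uses ObMutex, ObMutexGuard, a dlist of ObThWorker and a worker pool).
#include <mutex>
#include <vector>

struct ToyWorker {};

class ToyGroup {
public:
  // Maintenance path (calibrate_token_count / check_worker_count in the patch):
  // it may create workers, so it must re-check the stop flag after winning the lock.
  void check_worker_count(int target) {
    std::unique_lock<std::mutex> lk(lock_, std::try_to_lock);
    if (!lk.owns_lock()) {
      return;                                  // trylock failed: retry on the next tick
    }
    if (stopped_) {
      return;                                  // mirrors "if (has_stop_) { // do nothing }"
    }
    while (static_cast<int>(workers_.size()) < target) {
      workers_.push_back(new ToyWorker());     // stands in for acquire_more_worker()
    }
  }

  // Stop path (clear_worker in the patch): drain the workers while holding the lock
  // and flip the flag before the lock is released, so a concurrent maintenance call
  // either blocks on the lock or sees stopped_ == true.
  void clear_worker() {
    std::lock_guard<std::mutex> guard(lock_);  // mirrors ObMutexGuard guard(workers_lock_)
    for (ToyWorker *w : workers_) {
      delete w;                                // stands in for reset() + worker_pool_->free(w)
    }
    workers_.clear();
    stopped_ = true;                           // mirrors "has_stop_ = true"
  }

private:
  std::mutex lock_;                            // plays the role of workers_lock_
  bool stopped_ = false;                       // plays the role of has_stop_
  std::vector<ToyWorker*> workers_;
};

The property that matters is that the flag and the worker list are only ever observed together under the same mutex, so "stopped and cleared" becomes visible atomically. The real clear_worker() additionally has to loop and sleep until each worker reaches the waiting state before it can be freed; the sketch elides that part.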
@@ -291,30 +291,35 @@ void ObResourceGroup::calibrate_token_count()
   const auto current_time = ObTimeUtility::current_time();
   if (current_time - last_calibrate_token_ts_ > CALIBRATE_TOKEN_INTERVAL &&
       OB_SUCC(workers_lock_.trylock())) {
-    int64_t wait_worker = 0;
-    int64_t active_workers = 0;
-    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
-      const auto w = static_cast<ObThWorker*>(wnode->get_data());
-      if (w->is_active()) {
-        active_workers++;
-        if (!w->has_req_flag()) {
-          wait_worker++;
-        }
-      }
-    }
-    if (static_cast<int64_t>(ceil(tenant_->unit_min_cpu())) != min_token_cnt_) { // If the user manually adjusts the tenant specifications, the dynamic token adjustment alone cannot respond quickly, and it needs to be adjusted forcibly
-      set_token_cnt(static_cast<int64_t>(ceil(tenant_->unit_min_cpu())));
-      set_min_token_cnt(token_cnt_);
-    }
-    if (last_pop_req_cnt_ != 0 && pop_req_cnt_ == last_pop_req_cnt_
-        && token_cnt_ == ass_token_cnt_) {
-      set_token_cnt(min(token_cnt_ + 1, max_token_cnt_));
-    }
-    if (wait_worker > active_workers / 2) {
-      set_token_cnt(max(token_cnt_ - 1, min_token_cnt_));
-    }
-    last_calibrate_token_ts_ = current_time;
-    last_pop_req_cnt_ = pop_req_cnt_;
+    if (has_stop_) {
+      // do nothing
+    } else {
+      int64_t wait_worker = 0;
+      int64_t active_workers = 0;
+      DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
+        const auto w = static_cast<ObThWorker*>(wnode->get_data());
+        if (w->is_active()) {
+          active_workers++;
+          if (!w->has_req_flag()) {
+            wait_worker++;
+          }
+        }
+      }
+      if (static_cast<int64_t>(ceil(tenant_->unit_min_cpu())) != min_token_cnt_) { // If the user manually adjusts the tenant specifications, the dynamic token adjustment alone cannot respond quickly, and it needs to be adjusted forcibly
+        set_token_cnt(static_cast<int64_t>(ceil(tenant_->unit_min_cpu())));
+        set_min_token_cnt(token_cnt_);
+      }
+      if (last_pop_req_cnt_ != 0 && pop_req_cnt_ == last_pop_req_cnt_
+          && token_cnt_ == ass_token_cnt_) {
+        set_token_cnt(min(token_cnt_ + 1, max_token_cnt_));
+      }
+      if (wait_worker > active_workers / 2) {
+        set_token_cnt(max(token_cnt_ - 1, min_token_cnt_));
+      }
+      last_calibrate_token_ts_ = current_time;
+      last_pop_req_cnt_ = pop_req_cnt_;
+    }
 
     IGNORE_RETURN workers_lock_.unlock();
   }
 }
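For context, the token calibration that is now skipped once has_stop_ is set follows three rules visible in the hunk above: snap the token count to ceil(unit_min_cpu()) when the tenant's minimum CPU spec changes, grow by one (capped at max_token_cnt_) when requests were being popped but the pop counter has stalled while every token is assigned, and shrink by one (floored at min_token_cnt_) when more than half of the active workers are idle. A rough standalone restatement of that policy follows; calibrate_once and TokenState are hypothetical helpers for illustration, not part of the patch.

// Rough restatement of the calibration rules shown above (illustrative only).
#include <algorithm>
#include <cmath>
#include <cstdint>

struct TokenState {
  int64_t token_cnt;
  int64_t min_token_cnt;
  int64_t max_token_cnt;
  int64_t ass_token_cnt;       // tokens already assigned to workers
  int64_t pop_req_cnt;         // requests popped so far
  int64_t last_pop_req_cnt;    // snapshot from the previous calibration round
};

inline void calibrate_once(TokenState &s, double unit_min_cpu,
                           int64_t wait_worker, int64_t active_workers) {
  const int64_t min_cpu = static_cast<int64_t>(std::ceil(unit_min_cpu));
  if (min_cpu != s.min_token_cnt) {
    // Tenant spec changed by the user: force the token count to follow it.
    s.token_cnt = min_cpu;
    s.min_token_cnt = min_cpu;
  }
  if (s.last_pop_req_cnt != 0 && s.pop_req_cnt == s.last_pop_req_cnt &&
      s.token_cnt == s.ass_token_cnt) {
    // Throughput stalled while every token is in use: add a token.
    s.token_cnt = std::min(s.token_cnt + 1, s.max_token_cnt);
  }
  if (wait_worker > active_workers / 2) {
    // More than half of the active workers are idle: take a token back.
    s.token_cnt = std::max(s.token_cnt - 1, s.min_token_cnt);
  }
  s.last_pop_req_cnt = s.pop_req_cnt;
}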
@@ -323,30 +328,34 @@ void ObResourceGroup::check_worker_count()
 {
   int ret = OB_SUCCESS;
   if (OB_SUCC(workers_lock_.trylock())) {
-    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
-      const auto w = static_cast<ObThWorker*>(wnode->get_data());
-      const auto active_inactive_ts = w->get_active_inactive_ts();
-      const auto sojourn_time = ObTimeUtility::current_time() - active_inactive_ts;
-      if (w->is_active()) {
-        //w->set_tidx(active_workers);
-      } else if (w->is_waiting_active() &&
-          sojourn_time > PRESERVE_INACTIVE_WORKER_TIME) {
-        if (sojourn_time > PRESERVE_INACTIVE_WORKER_TIME) {
-          workers_.remove(wnode);
-          w->reset();
-          worker_pool_->free(w);
-        }
-      }
-    }
-    const auto diff = token_cnt_ - ass_token_cnt_;
-    if (diff > 0) {
-      int64_t succ_num = 0L;
-      acquire_more_worker(diff, succ_num);
-      ass_token_cnt_ += succ_num;
-    } else if (diff < 0) {
-      //ret = OB_NEED_WAIT;
-    }
+    if (has_stop_) {
+      // do nothing
+    } else {
+      DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
+        const auto w = static_cast<ObThWorker*>(wnode->get_data());
+        const auto active_inactive_ts = w->get_active_inactive_ts();
+        const auto sojourn_time = ObTimeUtility::current_time() - active_inactive_ts;
+        if (w->is_active()) {
+          //w->set_tidx(active_workers);
+        } else if (w->is_waiting_active() &&
+            sojourn_time > PRESERVE_INACTIVE_WORKER_TIME) {
+          if (sojourn_time > PRESERVE_INACTIVE_WORKER_TIME) {
+            workers_.remove(wnode);
+            w->reset();
+            worker_pool_->free(w);
+          }
+        }
+      }
+      const auto diff = token_cnt_ - ass_token_cnt_;
+      if (diff > 0) {
+        int64_t succ_num = 0L;
+        acquire_more_worker(diff, succ_num);
+        ass_token_cnt_ += succ_num;
+      } else if (diff < 0) {
+        //ret = OB_NEED_WAIT;
+      }
+    }
     IGNORE_RETURN workers_lock_.unlock();
   }
 }
@@ -359,7 +368,9 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
       OB_SUCC(workers_lock_.trylock())) {
     const auto diff = token_cnt_ - ass_token_cnt_;
     int tmp_ret = OB_SUCCESS;
-    if (diff > 0) {
+    if (has_stop_) {
+      // do nothing
+    } else if (diff > 0) {
       int64_t succ_num = 0L;
       acquire_more_worker(diff, succ_num);
       ass_token_cnt_ += succ_num;
@@ -378,29 +389,28 @@ void ObResourceGroup::check_worker_count(ObThWorker &w)
 int ObResourceGroup::clear_worker()
 {
   int ret = OB_SUCCESS;
+  ObMutexGuard guard(workers_lock_);
   while (workers_.get_size() > 0) {
-    int ret = OB_SUCCESS;
-    if (OB_SUCC(workers_lock_.trylock())) {
-      DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
-        const auto w = static_cast<ObThWorker*>(wnode->get_data());
-        w->set_inactive();
-        if (w->is_waiting_active()) {
-          w->reset();
-          workers_.remove(wnode);
-          worker_pool_->free(w);
-        }
-      }
-      IGNORE_RETURN workers_lock_.unlock();
-      if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
-        LOG_INFO(
-            "Tenant has some group workers need stop",
-            K(tenant_->id()),
-            "group workers", workers_.get_size(),
-            "group type", get_group_id());
-      }
+    DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
+      const auto w = static_cast<ObThWorker*>(wnode->get_data());
+      w->set_inactive();
+      if (w->is_waiting_active()) {
+        w->reset();
+        workers_.remove(wnode);
+        worker_pool_->free(w);
+      }
+    }
+    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
+      LOG_INFO(
+          "Tenant has some group workers need stop",
+          K(tenant_->id()),
+          "group workers", workers_.get_size(),
+          "group type", get_group_id());
     }
     ob_usleep(10L * 1000L);
   }
+  has_stop_ = true;
   return ret;
 }
@@ -261,7 +261,8 @@ public:
       last_calibrate_token_ts_(0),
       tenant_(tenant),
       worker_pool_(worker_pool),
-      cgroup_ctrl_(cgroup_ctrl)
+      cgroup_ctrl_(cgroup_ctrl),
+      has_stop_(false)
   {
   }
   ~ObResourceGroup() {}
@@ -313,6 +314,7 @@ private:
   ObTenant *tenant_;
   ObWorkerPool *worker_pool_;
   share::ObCgroupCtrl *cgroup_ctrl_;
+  bool has_stop_;
 };
 
 typedef common::FixedHash2<ObResourceGroupNode> GroupHash;