fix timer log & fix memleak when destroy scheduler
This commit is contained in:
8
deps/oblib/src/lib/task/ob_timer.cpp
vendored
8
deps/oblib/src/lib/task/ob_timer.cpp
vendored
@ -125,7 +125,7 @@ void ObTimer::destroy()
|
||||
ObMonitor<Mutex>::Lock guard(monitor_);
|
||||
for (int64_t i = 0; i < tasks_num_; ++i) {
|
||||
tokens_[i].task->cancelCallBack();
|
||||
ATOMIC_STORE(&(tokens_[i].task->timer_), nullptr);
|
||||
ATOMIC_STORE(&(tokens_[i].task->timer_), nullptr);
|
||||
}
|
||||
tasks_num_ = 0;
|
||||
}
|
||||
@ -261,7 +261,7 @@ int ObTimer::cancel(const ObTimerTask &task)
|
||||
}
|
||||
if (pos != -1) {
|
||||
tokens_[pos].task->cancelCallBack();
|
||||
ATOMIC_STORE(&(tokens_[pos].task->timer_), nullptr);
|
||||
ATOMIC_STORE(&(tokens_[pos].task->timer_), nullptr);
|
||||
memmove(&tokens_[pos], &tokens_[pos + 1],
|
||||
sizeof(tokens_[0]) * (tasks_num_ - pos - 1));
|
||||
--tasks_num_;
|
||||
@ -276,7 +276,7 @@ void ObTimer::cancel_all()
|
||||
ObMonitor<Mutex>::Lock guard(monitor_);
|
||||
for (int64_t i = 0; i < tasks_num_; ++i) {
|
||||
tokens_[i].task->cancelCallBack();
|
||||
ATOMIC_STORE(&(tokens_[i].task->timer_), nullptr);
|
||||
ATOMIC_STORE(&(tokens_[i].task->timer_), nullptr);
|
||||
}
|
||||
tasks_num_ = 0;
|
||||
OB_LOG(INFO, "cancel all", KP(this), K_(thread_id), K(wakeup_time_), K(tasks_num_));
|
||||
@ -391,7 +391,7 @@ void ObTimer::run1()
|
||||
ObTimerMonitor::get_instance().end_task(thread_id_, end_time);
|
||||
}
|
||||
|
||||
if (elapsed_time > 1000 * 1000) {
|
||||
if (elapsed_time > ELAPSED_TIME_LOG_THREASHOLD) {
|
||||
OB_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "timer task cost too much time", "task", to_cstring(*token.task),
|
||||
K(start_time), K(end_time), K(elapsed_time), KP(this), K_(thread_id));
|
||||
}
|
||||
|
1
deps/oblib/src/lib/task/ob_timer.h
vendored
1
deps/oblib/src/lib/task/ob_timer.h
vendored
@ -106,6 +106,7 @@ private:
|
||||
int schedule_task(ObTimerTask &task, const int64_t delay, const bool repeate, const bool is_scheduled_immediately);
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTimer);
|
||||
private:
|
||||
const static int64_t ELAPSED_TIME_LOG_THREASHOLD = 10 * 60 * 1000 * 1000; // 10 mins
|
||||
int32_t tasks_num_;
|
||||
int64_t max_task_num_;
|
||||
int64_t wakeup_time_;
|
||||
|
@ -1619,7 +1619,7 @@ void ObTenantDagScheduler::destroy()
|
||||
wait();
|
||||
|
||||
destroy_all_workers();
|
||||
|
||||
is_inited_ = false; // avoid alloc dag/dag_net
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
for (int64_t j = 0; j < DAG_LIST_MAX; ++j) {
|
||||
for (int64_t i = 0; i < PriorityDagList::PRIO_CNT; ++i) {
|
||||
@ -1646,6 +1646,10 @@ void ObTenantDagScheduler::destroy()
|
||||
dag_map_.destroy();
|
||||
}
|
||||
if (dag_net_map_[RUNNING_DAG_NET_MAP].created()) {
|
||||
for (DagNetMap::iterator iter = dag_net_map_[RUNNING_DAG_NET_MAP].begin(); iter != dag_net_map_[RUNNING_DAG_NET_MAP].end(); ++iter) {
|
||||
iter->second->~ObIDagNet();
|
||||
allocator_.free(iter->second);
|
||||
} // end of for
|
||||
dag_net_map_[RUNNING_DAG_NET_MAP].destroy();
|
||||
}
|
||||
if (dag_net_id_map_.created()) {
|
||||
@ -1664,7 +1668,6 @@ void ObTenantDagScheduler::destroy()
|
||||
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
||||
waiting_workers_.reset();
|
||||
running_workers_.reset();
|
||||
is_inited_ = false;
|
||||
COMMON_LOG(INFO, "ObTenantDagScheduler destroyed");
|
||||
TG_DESTROY(tg_id_);
|
||||
}
|
||||
@ -1764,6 +1767,7 @@ int ObTenantDagScheduler::add_dag_into_list_and_map_(
|
||||
int ObTenantDagScheduler::add_dag_net(ObIDagNet *dag_net)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
COMMON_LOG(WARN, "ObTenantDagScheduler is not inited", K(ret));
|
||||
@ -1781,7 +1785,7 @@ int ObTenantDagScheduler::add_dag_net(ObIDagNet *dag_net)
|
||||
}
|
||||
} else if (OB_FAIL(dag_net_id_map_.set_refactored(dag_net->dag_id_, dag_net))) {
|
||||
COMMON_LOG(WARN, "failed to add dag net id into map", K(ret), KPC(dag_net));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
if (OB_HASH_EXIST == ret) {
|
||||
const ObIDagNet *exist_dag_net = nullptr;
|
||||
if (OB_TMP_FAIL(dag_net_id_map_.get_refactored(dag_net->dag_id_, exist_dag_net))) {
|
||||
@ -1798,7 +1802,6 @@ int ObTenantDagScheduler::add_dag_net(ObIDagNet *dag_net)
|
||||
} else if (!blocking_dag_net_list_.add_last(dag_net)) {// add into blocking_dag_net_list
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
COMMON_LOG(ERROR, "failed to add into blocking_dag_net_list", K(ret), KPC(dag_net));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
if (OB_TMP_FAIL(dag_net_id_map_.erase_refactored(dag_net->dag_id_))) {
|
||||
COMMON_LOG(ERROR, "failed to erase from dag net id from map", K(tmp_ret), KPC(dag_net));
|
||||
@ -3068,7 +3071,7 @@ void ObTenantDagScheduler::update_work_thread_num()
|
||||
for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) {
|
||||
threads_sum += up_limits_[i];
|
||||
}
|
||||
work_thread_num_ = threads_sum;
|
||||
work_thread_num_ = threads_sum;
|
||||
}
|
||||
|
||||
int ObTenantDagScheduler::set_thread_score(const int64_t priority, const int64_t score)
|
||||
|
Reference in New Issue
Block a user