enhance compaction dag statistic

This commit is contained in:
a1iive
2024-02-07 00:52:13 +00:00
committed by ob-robot
parent 08b18b92c0
commit 99f684b537
20 changed files with 810 additions and 249 deletions

View File

@ -831,7 +831,7 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const
} else {
J_OBJ_START();
J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status),
K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()));
K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()));
J_OBJ_END();
}
return pos;
@ -2003,8 +2003,7 @@ int ObDagPrioScheduler::inner_add_dag_(
}
} else {
add_added_info_(dag->get_type());
COMMON_LOG(INFO, "add dag success", KP(dag), "start_time", dag->get_start_time(),
"id", dag->get_dag_id(), K(dag->hash()), "dag_cnt", scheduler_->get_cur_dag_cnt(),
COMMON_LOG(INFO, "add dag success", KP(dag), "id", dag->get_dag_id(), K(dag->hash()), "dag_cnt", scheduler_->get_cur_dag_cnt(),
"dag_type_cnts", scheduler_->get_type_dag_cnt(dag->get_type()));
dag = nullptr;
@ -2016,6 +2015,7 @@ void ObDagPrioScheduler::add_schedule_info_(const ObDagType::ObDagTypeEnum dag_t
{
scheduler_->add_total_running_task_cnt();
scheduler_->add_scheduled_task_cnt();
scheduler_->add_running_dag_cnts(dag_type);
scheduler_->add_scheduled_task_cnts(dag_type);
scheduler_->add_scheduled_data_size(dag_type, data_size);
++running_task_cnts_;
@ -2513,6 +2513,14 @@ int ObDagPrioScheduler::finish_dag_(
if (OB_TMP_FAIL(dag.report_result())) {
COMMON_LOG(WARN, "failed to report result", K(tmp_ret), K(dag));
}
compaction::ObCompactionSuggestionMgr *suggestion_mgr = MTL(compaction::ObCompactionSuggestionMgr *);
if (OB_NOT_NULL(suggestion_mgr)
&& OB_TMP_FAIL(suggestion_mgr->update_finish_cnt(
dag.get_type(),
OB_SUCCESS != dag.get_dag_ret(),
ObTimeUtility::fast_current_time() - dag.get_start_time()))) {
COMMON_LOG(WARN, "failed to update finish cnt", K(tmp_ret), K_(priority), K(dag));
}
// compaction dag success
if (ObIDag::DAG_STATUS_FINISH == status && is_compaction_dag(dag.get_type())) {
@ -2662,6 +2670,8 @@ void ObDagPrioScheduler::pause_worker_(ObTenantDagWorker &worker)
COMMON_LOG_RET(WARN, OB_ERR_UNEXPECTED, "unexpected null scheduler", KP_(scheduler));
} else {
scheduler_->sub_total_running_task_cnt();
// dag still exist now
scheduler_->sub_running_dag_cnts(worker.get_task()->get_dag()->get_type());
--running_task_cnts_;
running_workers_.remove(&worker);
waiting_workers_.add_last(&worker);
@ -3152,6 +3162,7 @@ int ObDagPrioScheduler::deal_with_finish_task(
COMMON_LOG(WARN, "unexpected null scheduler", K(ret), KP_(scheduler));
} else {
ObMutexGuard guard(prio_lock_);
ObDagType::ObDagTypeEnum dag_type = dag->get_type();
if (OB_SUCCESS != error_code
&& ObIDag::DAG_STATUS_NODE_FAILED != dag->get_dag_status()) {
// set errno on first failure
@ -3188,6 +3199,7 @@ int ObDagPrioScheduler::deal_with_finish_task(
if (finish_task_flag && OB_FAIL(finish_task_in_dag_(task, *dag, erase_dag_net))) {
COMMON_LOG(WARN, "failed to finish task", K(ret), KPC(dag));
}
scheduler_->sub_running_dag_cnts(dag_type);
}
// ensure that prio_lock is not locked before locking dag_net_map_lock
@ -3280,6 +3292,12 @@ int64_t ObDagPrioScheduler::get_limit()
return limits_;
}
int64_t ObDagPrioScheduler::get_adaptive_limit()
{
ObMutexGuard guard(prio_lock_);
return adaptive_task_limit_;
}
int64_t ObDagPrioScheduler::get_running_task_cnt()
{
ObMutexGuard guard(prio_lock_);
@ -3901,6 +3919,7 @@ int ObTenantDagScheduler::init(
dag_limit_ = dag_limit;
work_thread_num_ = default_work_thread_num_ = 0;
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
MEMSET(running_dag_cnts_, 0, sizeof(running_dag_cnts_));
MEMSET(added_dag_cnts_, 0, sizeof(added_dag_cnts_));
MEMSET(scheduled_dag_cnts_, 0, sizeof(scheduled_dag_cnts_));
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
@ -3981,6 +4000,7 @@ void ObTenantDagScheduler::reset()
total_running_task_cnt_ = 0;
scheduled_task_cnt_ = 0;
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
MEMSET(running_dag_cnts_, 0, sizeof(running_dag_cnts_));
MEMSET(added_dag_cnts_, 0, sizeof(added_dag_cnts_));
MEMSET(scheduled_dag_cnts_, 0, sizeof(scheduled_dag_cnts_));
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
@ -4068,30 +4088,84 @@ int ObTenantDagScheduler::add_dag(
return ret;
}
void ObTenantDagScheduler::get_suggestion_reason(
const int64_t priority,
int64_t &reason)
{
reason = compaction::ObCompactionSuggestionMgr::ObCompactionSuggestionReason::MAX_REASON;
if (ObDagPrio::DAG_PRIO_COMPACTION_HIGH == priority) {
inner_get_suggestion_reason(ObDagType::DAG_TYPE_MINI_MERGE, reason);
} else if (ObDagPrio::DAG_PRIO_COMPACTION_MID == priority) {
inner_get_suggestion_reason(ObDagType::DAG_TYPE_MERGE_EXECUTE, reason);
} else if (ObDagPrio::DAG_PRIO_COMPACTION_LOW == priority) {
inner_get_suggestion_reason(ObDagType::DAG_TYPE_MAJOR_MERGE, reason);
if (compaction::ObCompactionSuggestionMgr::ObCompactionSuggestionReason::MAX_REASON == reason) {
inner_get_suggestion_reason(ObDagType::DAG_TYPE_CO_MERGE_BATCH_EXECUTE, reason);
}
}
}
void ObTenantDagScheduler::inner_get_suggestion_reason(const ObDagType::ObDagTypeEnum type, int64_t &reason)
{
reason = compaction::ObCompactionSuggestionMgr::ObCompactionSuggestionReason::MAX_REASON;
if (MANY_DAG_COUNT < get_type_dag_cnt(type) && get_added_dag_cnts(type) > 2 * get_scheduled_dag_cnts(type)) {
reason = compaction::ObCompactionSuggestionMgr::ObCompactionSuggestionReason::SCHE_SLOW;
} else if (dag_count_overflow(type)) {
reason = compaction::ObCompactionSuggestionMgr::ObCompactionSuggestionReason::DAG_FULL;
}
}
void ObTenantDagScheduler::diagnose_for_suggestion()
{
int tmp_ret = OB_SUCCESS;
if (REACH_TENANT_TIME_INTERVAL(DUMP_DAG_STATUS_INTERVAL)) {
ObSEArray<int64_t, ObIDag::MergeDagPrioCnt> reasons;
ObSEArray<int64_t, ObIDag::MergeDagPrioCnt> thread_limits;
ObSEArray<int64_t, ObIDag::MergeDagPrioCnt> running_cnts;
ObSEArray<int64_t, compaction::ObCompactionDagStatus::COMPACTION_DAG_MAX> dag_running_cnts;
int64_t value = 0;
for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) {
if (FALSE_IT(get_suggestion_reason(ObIDag::MergeDagPrio[i], value))) {
} else if (OB_TMP_FAIL(reasons.push_back(value))) {
LOG_WARN_RET(tmp_ret, "failed to push back reasons");
} else if (FALSE_IT(value = prio_sche_[ObIDag::MergeDagPrio[i]].get_adaptive_limit())) {
} else if (OB_TMP_FAIL(thread_limits.push_back(value))) {
LOG_WARN_RET(tmp_ret, "failed to push back thread_limits");
} else if (FALSE_IT(value = prio_sche_[ObIDag::MergeDagPrio[i]].get_running_task_cnt())) {
} else if (OB_TMP_FAIL(running_cnts.push_back(value))) {
LOG_WARN_RET(tmp_ret, "failed to push back running_cnts");
}
}
for (int64_t i = 0; i < compaction::ObCompactionDagStatus::COMPACTION_DAG_MAX; ++i) {
if (FALSE_IT(value = get_running_dag_cnts(i))) {
} else if (OB_TMP_FAIL(dag_running_cnts.push_back(value))) {
LOG_WARN_RET(tmp_ret, "failed to push back reasons");
}
}
compaction::ObCompactionSuggestionMgr *suggestion_mgr = MTL(compaction::ObCompactionSuggestionMgr *);
if (OB_NOT_NULL(suggestion_mgr)
&& OB_TMP_FAIL(suggestion_mgr->diagnose_for_suggestion(
reasons, running_cnts, thread_limits, dag_running_cnts))) {
LOG_WARN_RET(tmp_ret, "fail to diagnose for suggestion");
}
}
}
void ObTenantDagScheduler::dump_dag_status(const bool force_dump/*false*/)
{
int tmp_ret = OB_SUCCESS;
if (force_dump || REACH_TENANT_TIME_INTERVAL(DUMP_DAG_STATUS_INTERVAL)) {
int64_t total_worker_cnt = 0;
int64_t work_thread_num = 0;
for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) {
prio_sche_[i].dump_dag_status();
}
COMMON_LOG(INFO, "dump_dag_status", "dag_cnt", get_cur_dag_cnt());
for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) {
if (OB_TMP_FAIL(compaction::ObCompactionSuggestionMgr::get_instance().analyze_insufficient_thread(
MTL_ID(),
(ObDagType::ObDagTypeEnum)i,
OB_DAG_TYPES[i].init_dag_prio_,
prio_sche_[OB_DAG_TYPES[i].init_dag_prio_].get_limit(),
get_added_dag_cnts(i),
get_scheduled_dag_cnts(i)))) {
COMMON_LOG_RET(WARN, tmp_ret, "fail to analyze insufficient thread", K(tmp_ret));
}
COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i],
"dag_count", get_type_dag_cnt(i),
"running_dag_count", get_running_dag_cnts(i),
"added_dag_count", get_added_dag_cnts(i),
"scheduled_dag_count", get_scheduled_dag_cnts(i),
"scheduled_task_count", get_scheduled_task_cnts(i),
@ -4109,7 +4183,9 @@ void ObTenantDagScheduler::dump_dag_status(const bool force_dump/*false*/)
total_worker_cnt = total_worker_cnt_;
work_thread_num = work_thread_num_;
}
COMMON_LOG(INFO, "dump_dag_status", K(total_worker_cnt),
COMMON_LOG(INFO, "dump_dag_status",
"dag_cnt", get_cur_dag_cnt(),
K(total_worker_cnt),
"total_running_task_cnt", get_total_running_task_cnt(),
K(work_thread_num),
"scheduled_task_cnt", get_scheduled_task_cnt());
@ -4384,6 +4460,7 @@ void ObTenantDagScheduler::run1()
int ret = OB_SUCCESS;
lib::set_thread_name("DagScheduler");
while (!has_set_stop()) {
diagnose_for_suggestion();
dump_dag_status();
loop_dag_net();
{
@ -4689,6 +4766,22 @@ int ObTenantDagScheduler::get_limit(const int64_t prio, int64_t &limit)
return ret;
}
int ObTenantDagScheduler::get_adaptive_limit(const int64_t prio, int64_t &limit)
{
int ret = OB_SUCCESS;
limit = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
COMMON_LOG(WARN, "ObTenantDagScheduler is not inited", K(ret));
} else if (OB_UNLIKELY(prio < 0 || prio >= ObDagPrio::DAG_PRIO_MAX)) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument", K(ret), K(prio));
} else if (FALSE_IT(limit = prio_sche_[prio].get_adaptive_limit())) {
// TODO get max thread concurrency of current prio type
}
return ret;
}
int ObTenantDagScheduler::check_dag_exist(const ObIDag *dag, bool &exist)
{
int ret = OB_SUCCESS;