fix upgrade error && open adaptive_merge_schedule

This commit is contained in:
obdev
2024-02-08 00:23:49 +00:00
committed by ob-robot
parent f0b3bd5883
commit af5bc4a3c3
19 changed files with 146 additions and 111 deletions

View File

@ -2178,6 +2178,7 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
int ret = OB_SUCCESS;
rank_dags.reset();
compaction::ObCompactionDagRanker ranker;
common::ObSEArray<compaction::ObTabletMergeDag *, 32> need_rank_dags;
const int64_t cur_time = common::ObTimeUtility::current_time();
if (OB_UNLIKELY(batch_size <= 0)) {
@ -2200,7 +2201,11 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
} else if (ObIDag::DAG_STATUS_READY != dag_status) { // dag status must be INITING or READY
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "dag in rank list must be ready", K(ret), K(dag_status), KPC(compaction_dag));
} else if (OB_FAIL(rank_dags.push_back(compaction_dag))) {
} else if (is_meta_major_merge(compaction_dag->get_param().merge_type_)) {
if (OB_FAIL(rank_dags.push_back(compaction_dag))) {
COMMON_LOG(WARN, "failed to add meta merge dag", K(ret), KPC(compaction_dag));
}
} else if (OB_FAIL(need_rank_dags.push_back(compaction_dag))) {
COMMON_LOG(WARN, "failed to add compaction dag", K(ret), KPC(compaction_dag));
} else {
ranker.update(cur_time, compaction_dag->get_param().compaction_param_);
@ -2208,16 +2213,22 @@ int ObDagPrioScheduler::do_rank_compaction_dags_(
if (OB_SUCC(ret)) {
cur = cur->get_next();
if (rank_dags.count() >= batch_size) {
if (rank_dags.count() + need_rank_dags.count() >= batch_size) {
break; // reached max rank count, stop adding ready dags
}
}
} // end while
if (OB_SUCC(ret) && !rank_dags.empty() && ranker.is_valid()) {
if (OB_FAIL(ranker.sort(rank_dags))) {
if (OB_SUCC(ret) && !need_rank_dags.empty() && ranker.is_valid()) {
if (OB_FAIL(ranker.sort(need_rank_dags))) {
COMMON_LOG(WARN, "failed to sort compaction dags, move dags to ready list directly",
K(ret), K(rank_dags));
K(ret), K(need_rank_dags));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < need_rank_dags.count(); ++i) {
if (OB_FAIL(rank_dags.push_back(need_rank_dags.at(i)))) {
COMMON_LOG(WARN, "failed to add rank dags", K(ret));
}
}
}
}
}
@ -2303,7 +2314,7 @@ int ObDagPrioScheduler::rank_compaction_dags_()
// Time Statistics
int64_t cost_time = common::ObTimeUtility::fast_current_time() - cur_time;
if (REACH_TENANT_TIME_INTERVAL(DUMP_STATUS_INTERVAL)) {
COMMON_LOG(INFO, "[DAG_RERANK] Ranking compaction dags costs: ", K(cost_time), K_(priority), K(ret), K(tmp_ret));
COMMON_LOG(INFO, "[ADAPTIVE_SCHED] Ranking compaction dags costs: ", K(cost_time), K_(priority), K(ret), K(tmp_ret));
}
}
@ -2336,7 +2347,7 @@ void ObDagPrioScheduler::try_update_adaptive_task_limit_(const int64_t batch_siz
const int64_t mem_allow_max_thread = lib::get_tenant_memory_remain(MTL_ID()) * ADAPTIVE_PERCENT / estimate_mem_per_thread;
if (mem_allow_max_thread >= adaptive_task_limit_ * 5) {
++adaptive_task_limit_;
FLOG_INFO("[ADAPTIVE_DAG] increment adaptive task limit", K(priority_), K(adaptive_task_limit_));
FLOG_INFO("[ADAPTIVE_SCHED] increment adaptive task limit", K(priority_), K(adaptive_task_limit_));
}
}
}
@ -2350,7 +2361,7 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task)
// adaptive compaction scheduling
if (is_rank_dag_prio() && OB_TMP_FAIL(rank_compaction_dags_())) {
COMMON_LOG(WARN, "[DAG_RERANK] Failed to rank compaction dags", K(tmp_ret), K_(priority));
COMMON_LOG(WARN, "[ADAPTIVE_SCHED] Failed to rank compaction dags", K(tmp_ret), K_(priority));
}
if (!dag_list_[READY_DAG_LIST].is_empty()) {
@ -3165,7 +3176,7 @@ int ObDagPrioScheduler::deal_with_finish_task(
// dag can retry & this task is the last running task
if (OB_ALLOCATE_MEMORY_FAILED == error_code && is_mini_compaction_dag(dag->get_type())) {
if (static_cast<compaction::ObTabletMergeDag *>(dag)->is_reserve_mode() &&
MTL(ObTenantCompactionMemPool *)->is_emergency_mode()) {
MTL(ObTenantCompactionMemPool *)->is_emergency_mode() && !MTL_IS_MINI_MODE()) {
COMMON_LOG(ERROR, "reserve mode dag failed to alloc mem unexpectly", KPC(dag)); // tmp debug log for reserve mode, remove later.
}
MTL(ObTenantCompactionMemPool *)->set_memory_mode(ObTenantCompactionMemPool::EMERGENCY_MODE);
@ -3348,11 +3359,12 @@ bool ObDagPrioScheduler::check_need_load_shedding_(const bool for_schedule)
} else if (load_shedding_factor <= 1 || !is_rank_dag_prio()) {
// no need to load shedding
} else {
const int64_t load_shedding_limit = MAX(2, limits_ / load_shedding_factor);
const int64_t load_shedding_limit = MAX(2, adaptive_task_limit_ / load_shedding_factor);
if (running_task_cnts_ > load_shedding_limit + extra_limit) {
need_shedding = true;
if (REACH_TENANT_TIME_INTERVAL(30_s)) {
FLOG_INFO("DagScheduler needs to load shedding", K(load_shedding_factor), K(extra_limit), K_(limits));
FLOG_INFO("[ADAPTIVE_SCHED] DagScheduler needs to load shedding", K(load_shedding_factor), K(for_schedule),
K(extra_limit), K_(adaptive_task_limit), K_(running_task_cnts), K_(priority));
}
}
}