fix dag scheduler destroy memleak

This commit is contained in:
yangqise7en 2023-09-15 03:13:43 +00:00 committed by ob-robot
parent eeacd9c6bc
commit 3760b224a2
2 changed files with 9 additions and 8 deletions

View File

@ -1720,7 +1720,7 @@ void ObTenantDagScheduler::reset()
STORAGE_LOG_RET(WARN, tmp_ret, "failed to del sys task", K(tmp_ret), K(cur_dag->get_dag_id()));
}
}
if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net))) {
if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, tmp_dag_net, false/*try_move_child*/))) {
STORAGE_LOG_RET(WARN, tmp_ret, "failed to abort dag", K(tmp_ret), KPC(cur_dag));
} else {
++abort_dag_cnt;
@ -2274,7 +2274,7 @@ int ObTenantDagScheduler::check_ls_compaction_dag_exist_with_cancel(
if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) {
tmp_ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret));
} else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net))) {
} else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net, false/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag));
ob_abort();
} else {
@ -2569,7 +2569,7 @@ int ObTenantDagScheduler::finish_task_in_dag(ObITask &task, ObIDag &dag, ObIDagN
if (dag.has_finished()) {
ObIDag::ObDagStatus status =
dag.is_dag_failed() ? ObIDag::DAG_STATUS_ABORT : ObIDag::DAG_STATUS_FINISH;
if (OB_FAIL(finish_dag_(status, dag, finish_dag_net))) {
if (OB_FAIL(finish_dag_(status, dag, finish_dag_net, true/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to finish dag", K(ret), K(dag));
ob_abort();
}
@ -2608,7 +2608,8 @@ int ObTenantDagScheduler::finish_dag_net(ObIDagNet *dag_net)
int ObTenantDagScheduler::finish_dag_(
const ObIDag::ObDagStatus status,
ObIDag &dag,
ObIDagNet *&finish_dag_net)
ObIDagNet *&finish_dag_net,
const bool try_move_child)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
@ -2616,7 +2617,7 @@ int ObTenantDagScheduler::finish_dag_(
if (OB_FAIL(dag.finish(status))) {
COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret());
} else if (OB_FAIL(try_move_child_to_ready_list(dag))) {
} else if (try_move_child && OB_FAIL(try_move_child_to_ready_list(dag))) {
LOG_WARN("failed to try move child to ready list", K(ret), K(&dag));
} else if (OB_FAIL(erase_dag_(dag))) {
COMMON_LOG(ERROR, "failed to erase dag from dag_map", K(ret), K(dag));
@ -2984,7 +2985,7 @@ int ObTenantDagScheduler::pop_task_from_ready_list(
cur = cur->get_next();
if (OB_FAIL(tmp_dag->report_result())) {
LOG_WARN("failed to report reuslt", K(ret), KPC(tmp_dag));
} else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, erase_dag_net))) {
} else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *tmp_dag, erase_dag_net, true/*try_move_child*/))) {
COMMON_LOG(ERROR, "failed to finish dag", K(ret), KPC(tmp_dag));
ob_abort();
} else if (OB_NOT_NULL(erase_dag_net)) {
@ -3364,7 +3365,7 @@ int ObTenantDagScheduler::cancel_dag(const ObIDag *dag, ObIDag *parent_dag)
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, erase_dag_net))) {
} else if (OB_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cur_dag, erase_dag_net, true/*try_move_child*/))) {
COMMON_LOG(WARN, "failed to erase dag", K(ret), KPC(cur_dag));
ob_abort();
}

View File

@ -884,7 +884,7 @@ private:
int loop_blocking_dag_net_list();
int loop_running_dag_net_map();
int finish_task_in_dag(ObITask &task, ObIDag &dag, ObIDagNet *&erase_dag_net);
int finish_dag_(const ObIDag::ObDagStatus status, ObIDag &dag, ObIDagNet *&erase_dag_net);
int finish_dag_(const ObIDag::ObDagStatus status, ObIDag &dag, ObIDagNet *&erase_dag_net, const bool try_move_child);
int finish_dag_net(ObIDagNet *dag_net);
int deal_with_finish_task(ObITask &task, ObTenantDagWorker &worker, int error_code);
int deal_with_fail_task(ObITask &task, ObIDag &dag, const int error_code, bool &retry_flag);