diff --git a/src/sql/das/ob_das_ref.cpp b/src/sql/das/ob_das_ref.cpp index 07c39f6b01..01bf4ab2eb 100644 --- a/src/sql/das/ob_das_ref.cpp +++ b/src/sql/das/ob_das_ref.cpp @@ -235,16 +235,38 @@ int ObDASRef::execute_all_task() LOG_WARN("failed to move local tasks to last.", K(ret)); } else { uint32_t finished_cnt = 0; + uint32_t high_priority_task_execution_cnt = 0; + bool has_unstart_high_priority_tasks = true; while (finished_cnt < aggregated_tasks_.get_size() && OB_SUCC(ret)) { finished_cnt = 0; // execute tasks follows aggregated task state machine. - DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) { - ObDasAggregatedTasks* aggregated_task = curr->get_obj(); - if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *) - ->execute_das_task(*this, *aggregated_task, async))) { - LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async)); - } else { - LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_); + if (has_unstart_high_priority_tasks) { + high_priority_task_execution_cnt = 0; + DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) { + ObDasAggregatedTasks* aggregated_task = curr->get_obj(); + if (aggregated_task->has_unstart_high_priority_tasks()) { + if (OB_FAIL(MTL(ObDataAccessService *)->execute_das_task(*this, *aggregated_task, async))) { + LOG_WARN("failed to execute high priority aggregated das task", KR(ret), KPC(aggregated_task), K(async)); + } else { + ++high_priority_task_execution_cnt; + LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_); + } + } + } + if (high_priority_task_execution_cnt == 0) { + has_unstart_high_priority_tasks = false; + } + } + if (!has_unstart_high_priority_tasks) { + DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) { + ObDasAggregatedTasks* aggregated_task = curr->get_obj(); + if (aggregated_task->has_unstart_tasks()) { + if (OB_FAIL(MTL(ObDataAccessService *)->execute_das_task(*this, *aggregated_task, async))) { + LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async)); + } else { + LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_); + } + } } } if (OB_FAIL(ret)) { @@ -878,6 +900,11 @@ bool ObDasAggregatedTasks::has_unstart_tasks() const failed_tasks_.get_size() != 0; } +bool ObDasAggregatedTasks::has_unstart_high_priority_tasks() const +{ + return high_priority_tasks_.get_size() != 0; +} + int32_t ObDasAggregatedTasks::get_unstart_task_size() const { return tasks_.get_size() + high_priority_tasks_.get_size(); diff --git a/src/sql/das/ob_das_ref.h b/src/sql/das/ob_das_ref.h index 220a846d5d..360fdd342a 100644 --- a/src/sql/das/ob_das_ref.h +++ b/src/sql/das/ob_das_ref.h @@ -58,6 +58,7 @@ struct ObDasAggregatedTasks bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; }; int failed_tasks_can_retry() const; bool has_unstart_tasks() const; + bool has_unstart_high_priority_tasks() const; int32_t get_unstart_task_size() const; TO_STRING_KV(K_(server), K(high_priority_tasks_.get_size()), K(tasks_.get_size()), K(failed_tasks_.get_size()), K(success_tasks_.get_size())); common::ObAddr server_;