bug fix: execute DELETE das task first.

This commit is contained in:
rolandqi
2023-02-07 10:43:40 +08:00
committed by ob-robot
parent fab9a2b3ae
commit 24f272e74b
2 changed files with 35 additions and 7 deletions

View File

@ -235,16 +235,38 @@ int ObDASRef::execute_all_task()
LOG_WARN("failed to move local tasks to last.", K(ret));
} else {
uint32_t finished_cnt = 0;
uint32_t high_priority_task_execution_cnt = 0;
bool has_unstart_high_priority_tasks = true;
while (finished_cnt < aggregated_tasks_.get_size() && OB_SUCC(ret)) {
finished_cnt = 0;
// execute tasks follows aggregated task state machine.
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_tasks() && OB_FAIL(MTL(ObDataAccessService *)
->execute_das_task(*this, *aggregated_task, async))) {
LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async));
} else {
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
if (has_unstart_high_priority_tasks) {
high_priority_task_execution_cnt = 0;
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_high_priority_tasks()) {
if (OB_FAIL(MTL(ObDataAccessService *)->execute_das_task(*this, *aggregated_task, async))) {
LOG_WARN("failed to execute high priority aggregated das task", KR(ret), KPC(aggregated_task), K(async));
} else {
++high_priority_task_execution_cnt;
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
}
}
}
if (high_priority_task_execution_cnt == 0) {
has_unstart_high_priority_tasks = false;
}
}
if (!has_unstart_high_priority_tasks) {
DLIST_FOREACH_X(curr, aggregated_tasks_.get_obj_list(), OB_SUCC(ret)) {
ObDasAggregatedTasks* aggregated_task = curr->get_obj();
if (aggregated_task->has_unstart_tasks()) {
if (OB_FAIL(MTL(ObDataAccessService *)->execute_das_task(*this, *aggregated_task, async))) {
LOG_WARN("failed to execute aggregated das task", KR(ret), KPC(aggregated_task), K(async));
} else {
LOG_DEBUG("successfully executing aggregated task", "server", aggregated_task->server_);
}
}
}
}
if (OB_FAIL(ret)) {
@ -878,6 +900,11 @@ bool ObDasAggregatedTasks::has_unstart_tasks() const
failed_tasks_.get_size() != 0;
}
bool ObDasAggregatedTasks::has_unstart_high_priority_tasks() const
{
return high_priority_tasks_.get_size() != 0;
}
int32_t ObDasAggregatedTasks::get_unstart_task_size() const
{
return tasks_.get_size() + high_priority_tasks_.get_size();

View File

@ -58,6 +58,7 @@ struct ObDasAggregatedTasks
bool has_failed_tasks() const { return failed_tasks_.get_size() > 0; };
int failed_tasks_can_retry() const;
bool has_unstart_tasks() const;
bool has_unstart_high_priority_tasks() const;
int32_t get_unstart_task_size() const;
TO_STRING_KV(K_(server), K(high_priority_tasks_.get_size()), K(tasks_.get_size()), K(failed_tasks_.get_size()), K(success_tasks_.get_size()));
common::ObAddr server_;