fix ddl scheduler not quit
This commit is contained in:
@ -188,6 +188,9 @@ int ObDDLTaskQueue::add_task_to_last(ObDDLTask *task)
|
|||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = common::OB_NOT_INIT;
|
ret = common::OB_NOT_INIT;
|
||||||
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
|
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
|
||||||
|
} else if (has_set_stop()) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
LOG_WARN("scheduler has stopped", K(ret));
|
||||||
} else if (OB_ISNULL(task)) {
|
} else if (OB_ISNULL(task)) {
|
||||||
ret = common::OB_INVALID_ARGUMENT;
|
ret = common::OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", K(ret), KP(task));
|
LOG_WARN("invalid argument", K(ret), KP(task));
|
||||||
@ -886,7 +889,7 @@ void ObDDLScheduler::run1()
|
|||||||
lib::set_thread_name("DDLTaskExecutor");
|
lib::set_thread_name("DDLTaskExecutor");
|
||||||
THIS_WORKER.set_worker_level(1);
|
THIS_WORKER.set_worker_level(1);
|
||||||
THIS_WORKER.set_curr_request_level(1);
|
THIS_WORKER.set_curr_request_level(1);
|
||||||
while (true) {
|
while (!has_set_stop()) {
|
||||||
const bool stop = task_queue_.has_set_stop();
|
const bool stop = task_queue_.has_set_stop();
|
||||||
bool do_idle = false;
|
bool do_idle = false;
|
||||||
if (OB_FAIL(task_queue_.get_next_task(task))) {
|
if (OB_FAIL(task_queue_.get_next_task(task))) {
|
||||||
@ -908,7 +911,13 @@ void ObDDLScheduler::run1()
|
|||||||
} else if (task == first_retry_task || !task->need_schedule()) {
|
} else if (task == first_retry_task || !task->need_schedule()) {
|
||||||
// add the task back to the queue
|
// add the task back to the queue
|
||||||
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
||||||
STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task));
|
if (OB_STATE_NOT_MATCH == ret) {
|
||||||
|
LOG_INFO("rootserver stopped, remove this task", K(*task));
|
||||||
|
// overwrite ret
|
||||||
|
if (OB_FAIL(remove_ddl_task(task))) {
|
||||||
|
LOG_WARN("remove ddl task failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
do_idle = true;
|
do_idle = true;
|
||||||
} else {
|
} else {
|
||||||
@ -917,7 +926,13 @@ void ObDDLScheduler::run1()
|
|||||||
task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt);
|
task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt);
|
||||||
if (task->need_retry() && !stop && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) {
|
if (task->need_retry() && !stop && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) {
|
||||||
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
||||||
STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task));
|
if (OB_STATE_NOT_MATCH == ret) {
|
||||||
|
LOG_INFO("rootserver stopped, remove this task", K(*task));
|
||||||
|
// overwrite ret
|
||||||
|
if (OB_FAIL(remove_ddl_task(task))) {
|
||||||
|
LOG_WARN("remove ddl task failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
first_retry_task = nullptr == first_retry_task ? task : first_retry_task;
|
first_retry_task = nullptr == first_retry_task ? task : first_retry_task;
|
||||||
} else if (OB_FAIL(remove_ddl_task(task))) {
|
} else if (OB_FAIL(remove_ddl_task(task))) {
|
||||||
@ -2734,9 +2749,11 @@ int ObDDLScheduler::remove_ddl_task(ObDDLTask *ddl_task)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObLongopsMgr &longops_mgr = ObLongopsMgr::get_instance();
|
ObLongopsMgr &longops_mgr = ObLongopsMgr::get_instance();
|
||||||
|
ObDDLTaskKey task_key;
|
||||||
if (OB_ISNULL(ddl_task)) {
|
if (OB_ISNULL(ddl_task)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid arguments", K(ret), KP(ddl_task));
|
LOG_WARN("invalid arguments", K(ret), KP(ddl_task));
|
||||||
|
} else if (OB_FALSE_IT(task_key = ddl_task->get_task_key())) {
|
||||||
} else if (OB_FAIL(task_queue_.remove_task(ddl_task))) {
|
} else if (OB_FAIL(task_queue_.remove_task(ddl_task))) {
|
||||||
LOG_WARN("fail to remove task, which should not happen", K(ret), KPC(ddl_task));
|
LOG_WARN("fail to remove task, which should not happen", K(ret), KPC(ddl_task));
|
||||||
} else {
|
} else {
|
||||||
@ -2746,6 +2763,7 @@ int ObDDLScheduler::remove_ddl_task(ObDDLTask *ddl_task)
|
|||||||
remove_sys_task(ddl_task);
|
remove_sys_task(ddl_task);
|
||||||
free_ddl_task(ddl_task);
|
free_ddl_task(ddl_task);
|
||||||
}
|
}
|
||||||
|
LOG_INFO("remove ddl task", K(task_key));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user