From f94e8d14ef1dfd7303db07bd411f29f3dd5b0c51 Mon Sep 17 00:00:00 2001 From: Hongqin-Li Date: Tue, 4 Apr 2023 09:16:14 +0000 Subject: [PATCH] Reject adding task to ddl scheduler after stopped --- src/rootserver/ddl_task/ob_ddl_scheduler.cpp | 77 ++++++++++++-------- src/rootserver/ddl_task/ob_ddl_scheduler.h | 3 + 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 079a3bc04..74af93001 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -41,7 +41,7 @@ namespace rootserver { ObDDLTaskQueue::ObDDLTaskQueue() - : task_list_(), task_map_(), lock_(), is_inited_(false) + : task_list_(), task_map_(), lock_(), stop_(true), is_inited_(false) { } @@ -71,6 +71,7 @@ int ObDDLTaskQueue::init(const int64_t bucket_num) } else if (OB_FAIL(task_id_map_.create(bucket_num, lib::ObLabel("DdlQue")))) { LOG_WARN("fail to create task set", K(ret), K(bucket_num)); } else { + stop_ = true; is_inited_ = true; } return ret; @@ -85,6 +86,9 @@ int ObDDLTaskQueue::push_task(ObDDLTask *task) if (OB_UNLIKELY(!is_inited_)) { ret = common::OB_NOT_INIT; LOG_WARN("ObDDLTaskQueue has not been inited", K(ret)); + } else if (has_set_stop()) { + ret = OB_STATE_NOT_MATCH; + LOG_WARN("scheduler has stopped", K(ret)); } else if (OB_ISNULL(task)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(task)); @@ -547,6 +551,7 @@ int ObDDLScheduler::start() LOG_WARN("not init", K(ret)); } else if (is_started_) { // do nothing + } else if (OB_FALSE_IT(task_queue_.set_stop(false))) { } else if (OB_FAIL(TG_SET_RUNNABLE(tg_id_, *this))) { LOG_WARN("tg set runnable failed", K(ret)); } else if (OB_FAIL(TG_START(tg_id_))) { @@ -571,9 +576,9 @@ void ObDDLScheduler::stop() TG_STOP(tg_id_); TG_STOP(lib::TGDefIDs::DDLScanTask); TG_STOP(lib::TGDefIDs::HeartBeatCheckTask); + task_queue_.set_stop(true); idle_stop_ = true; is_started_ = false; - destroy_all_tasks(); } void ObDDLScheduler::wait() @@ -595,42 +600,50 @@ void ObDDLScheduler::run1() ObDDLTask *task = nullptr; ObDDLTask *first_retry_task = nullptr; lib::set_thread_name("DDLTaskExecutor"); - while (!has_set_stop()) { - THIS_WORKER.set_worker_level(1); - THIS_WORKER.set_curr_request_level(1); - while (!has_set_stop()) { - if (OB_FAIL(task_queue_.get_next_task(task))) { - if (common::OB_ENTRY_NOT_EXIST == ret) { - break; - } else { - LOG_WARN("fail to get next task", K(ret)); + THIS_WORKER.set_worker_level(1); + THIS_WORKER.set_curr_request_level(1); + while (true) { + const bool stop = task_queue_.has_set_stop(); + bool do_idle = false; + if (OB_FAIL(task_queue_.get_next_task(task))) { + if (common::OB_ENTRY_NOT_EXIST == ret) { + if (stop) { + // Task queue remains empty after the last scheduler exit here. + // Otherwise, a successful push_task must happen after this get_next_task, + // which must have seen the stop flag (modification order consistency of atomic operation) + // and failed, contradition. break; } - } else if (OB_ISNULL(task)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("error unexpected, task must not be NULL", K(ret)); - } else if (task == first_retry_task || !task->need_schedule()) { - // add the task back to the queue - if (OB_FAIL(task_queue_.add_task_to_last(task))) { - STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task)); - } - break; } else { - ObCurTraceId::set(task->get_trace_id()); - int task_ret = task->process(); - task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt); - if (task->need_retry() && !has_set_stop() && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) { - if (OB_FAIL(task_queue_.add_task_to_last(task))) { - STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task)); - } - first_retry_task = nullptr == first_retry_task ? task : first_retry_task; - } else if (OB_FAIL(remove_ddl_task(task))) { - LOG_WARN("remove ddl task failed", K(ret)); + LOG_WARN("fail to get next task", K(ret)); + } + do_idle = true; + } else if (OB_ISNULL(task)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, task must not be NULL", K(ret)); + } else if (task == first_retry_task || !task->need_schedule()) { + // add the task back to the queue + if (OB_FAIL(task_queue_.add_task_to_last(task))) { + STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task)); + } + do_idle = true; + } else { + ObCurTraceId::set(task->get_trace_id()); + int task_ret = task->process(); + task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt); + if (task->need_retry() && !stop && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) { + if (OB_FAIL(task_queue_.add_task_to_last(task))) { + STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task)); } + first_retry_task = nullptr == first_retry_task ? task : first_retry_task; + } else if (OB_FAIL(remove_ddl_task(task))) { + LOG_WARN("remove ddl task failed", K(ret)); } } - first_retry_task = nullptr; - idler_.idle(100 * 1000L); + if (do_idle) { + first_retry_task = nullptr; + idler_.idle(100 * 1000L); + } } } } diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index d8f1020c2..17c117e87 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -55,6 +55,8 @@ public: ObDDLTaskQueue(); virtual ~ObDDLTaskQueue(); int init(const int64_t bucket_num); + bool has_set_stop() const { return ATOMIC_LOAD(&stop_); } + void set_stop(bool stop) { ATOMIC_STORE(&stop_, stop); } int push_task(ObDDLTask *task); int get_next_task(ObDDLTask *&task); int remove_task(ObDDLTask *task); @@ -83,6 +85,7 @@ private: TaskKeyMap task_map_; TaskIdMap task_id_map_; common::ObSpinLock lock_; + bool stop_; bool is_inited_; };