Reject adding task to ddl scheduler after stopped
This commit is contained in:
@ -41,7 +41,7 @@ namespace rootserver
|
|||||||
{
|
{
|
||||||
|
|
||||||
ObDDLTaskQueue::ObDDLTaskQueue()
|
ObDDLTaskQueue::ObDDLTaskQueue()
|
||||||
: task_list_(), task_map_(), lock_(), is_inited_(false)
|
: task_list_(), task_map_(), lock_(), stop_(true), is_inited_(false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,6 +71,7 @@ int ObDDLTaskQueue::init(const int64_t bucket_num)
|
|||||||
} else if (OB_FAIL(task_id_map_.create(bucket_num, lib::ObLabel("DdlQue")))) {
|
} else if (OB_FAIL(task_id_map_.create(bucket_num, lib::ObLabel("DdlQue")))) {
|
||||||
LOG_WARN("fail to create task set", K(ret), K(bucket_num));
|
LOG_WARN("fail to create task set", K(ret), K(bucket_num));
|
||||||
} else {
|
} else {
|
||||||
|
stop_ = true;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -85,6 +86,9 @@ int ObDDLTaskQueue::push_task(ObDDLTask *task)
|
|||||||
if (OB_UNLIKELY(!is_inited_)) {
|
if (OB_UNLIKELY(!is_inited_)) {
|
||||||
ret = common::OB_NOT_INIT;
|
ret = common::OB_NOT_INIT;
|
||||||
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
|
LOG_WARN("ObDDLTaskQueue has not been inited", K(ret));
|
||||||
|
} else if (has_set_stop()) {
|
||||||
|
ret = OB_STATE_NOT_MATCH;
|
||||||
|
LOG_WARN("scheduler has stopped", K(ret));
|
||||||
} else if (OB_ISNULL(task)) {
|
} else if (OB_ISNULL(task)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid arguments", K(ret), KP(task));
|
LOG_WARN("invalid arguments", K(ret), KP(task));
|
||||||
@ -547,6 +551,7 @@ int ObDDLScheduler::start()
|
|||||||
LOG_WARN("not init", K(ret));
|
LOG_WARN("not init", K(ret));
|
||||||
} else if (is_started_) {
|
} else if (is_started_) {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
} else if (OB_FALSE_IT(task_queue_.set_stop(false))) {
|
||||||
} else if (OB_FAIL(TG_SET_RUNNABLE(tg_id_, *this))) {
|
} else if (OB_FAIL(TG_SET_RUNNABLE(tg_id_, *this))) {
|
||||||
LOG_WARN("tg set runnable failed", K(ret));
|
LOG_WARN("tg set runnable failed", K(ret));
|
||||||
} else if (OB_FAIL(TG_START(tg_id_))) {
|
} else if (OB_FAIL(TG_START(tg_id_))) {
|
||||||
@ -571,9 +576,9 @@ void ObDDLScheduler::stop()
|
|||||||
TG_STOP(tg_id_);
|
TG_STOP(tg_id_);
|
||||||
TG_STOP(lib::TGDefIDs::DDLScanTask);
|
TG_STOP(lib::TGDefIDs::DDLScanTask);
|
||||||
TG_STOP(lib::TGDefIDs::HeartBeatCheckTask);
|
TG_STOP(lib::TGDefIDs::HeartBeatCheckTask);
|
||||||
|
task_queue_.set_stop(true);
|
||||||
idle_stop_ = true;
|
idle_stop_ = true;
|
||||||
is_started_ = false;
|
is_started_ = false;
|
||||||
destroy_all_tasks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObDDLScheduler::wait()
|
void ObDDLScheduler::wait()
|
||||||
@ -595,17 +600,24 @@ void ObDDLScheduler::run1()
|
|||||||
ObDDLTask *task = nullptr;
|
ObDDLTask *task = nullptr;
|
||||||
ObDDLTask *first_retry_task = nullptr;
|
ObDDLTask *first_retry_task = nullptr;
|
||||||
lib::set_thread_name("DDLTaskExecutor");
|
lib::set_thread_name("DDLTaskExecutor");
|
||||||
while (!has_set_stop()) {
|
|
||||||
THIS_WORKER.set_worker_level(1);
|
THIS_WORKER.set_worker_level(1);
|
||||||
THIS_WORKER.set_curr_request_level(1);
|
THIS_WORKER.set_curr_request_level(1);
|
||||||
while (!has_set_stop()) {
|
while (true) {
|
||||||
|
const bool stop = task_queue_.has_set_stop();
|
||||||
|
bool do_idle = false;
|
||||||
if (OB_FAIL(task_queue_.get_next_task(task))) {
|
if (OB_FAIL(task_queue_.get_next_task(task))) {
|
||||||
if (common::OB_ENTRY_NOT_EXIST == ret) {
|
if (common::OB_ENTRY_NOT_EXIST == ret) {
|
||||||
break;
|
if (stop) {
|
||||||
} else {
|
// Task queue remains empty after the last scheduler exit here.
|
||||||
LOG_WARN("fail to get next task", K(ret));
|
// Otherwise, a successful push_task must happen after this get_next_task,
|
||||||
|
// which must have seen the stop flag (modification order consistency of atomic operation)
|
||||||
|
// and failed, contradition.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG_WARN("fail to get next task", K(ret));
|
||||||
|
}
|
||||||
|
do_idle = true;
|
||||||
} else if (OB_ISNULL(task)) {
|
} else if (OB_ISNULL(task)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("error unexpected, task must not be NULL", K(ret));
|
LOG_WARN("error unexpected, task must not be NULL", K(ret));
|
||||||
@ -614,12 +626,12 @@ void ObDDLScheduler::run1()
|
|||||||
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
||||||
STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task));
|
STORAGE_LOG(ERROR, "fail to add task to last", K(ret), K(*task));
|
||||||
}
|
}
|
||||||
break;
|
do_idle = true;
|
||||||
} else {
|
} else {
|
||||||
ObCurTraceId::set(task->get_trace_id());
|
ObCurTraceId::set(task->get_trace_id());
|
||||||
int task_ret = task->process();
|
int task_ret = task->process();
|
||||||
task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt);
|
task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt() + thread_cnt);
|
||||||
if (task->need_retry() && !has_set_stop() && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) {
|
if (task->need_retry() && !stop && !ObIDDLTask::is_ddl_force_no_more_process(task_ret)) {
|
||||||
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
||||||
STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task));
|
STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task));
|
||||||
}
|
}
|
||||||
@ -628,11 +640,12 @@ void ObDDLScheduler::run1()
|
|||||||
LOG_WARN("remove ddl task failed", K(ret));
|
LOG_WARN("remove ddl task failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
if (do_idle) {
|
||||||
first_retry_task = nullptr;
|
first_retry_task = nullptr;
|
||||||
idler_.idle(100 * 1000L);
|
idler_.idle(100 * 1000L);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m,
|
int ObDDLScheduler::create_ddl_task(const ObCreateDDLTaskParam ¶m,
|
||||||
|
@ -55,6 +55,8 @@ public:
|
|||||||
ObDDLTaskQueue();
|
ObDDLTaskQueue();
|
||||||
virtual ~ObDDLTaskQueue();
|
virtual ~ObDDLTaskQueue();
|
||||||
int init(const int64_t bucket_num);
|
int init(const int64_t bucket_num);
|
||||||
|
bool has_set_stop() const { return ATOMIC_LOAD(&stop_); }
|
||||||
|
void set_stop(bool stop) { ATOMIC_STORE(&stop_, stop); }
|
||||||
int push_task(ObDDLTask *task);
|
int push_task(ObDDLTask *task);
|
||||||
int get_next_task(ObDDLTask *&task);
|
int get_next_task(ObDDLTask *&task);
|
||||||
int remove_task(ObDDLTask *task);
|
int remove_task(ObDDLTask *task);
|
||||||
@ -83,6 +85,7 @@ private:
|
|||||||
TaskKeyMap task_map_;
|
TaskKeyMap task_map_;
|
||||||
TaskIdMap task_id_map_;
|
TaskIdMap task_id_map_;
|
||||||
common::ObSpinLock lock_;
|
common::ObSpinLock lock_;
|
||||||
|
bool stop_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user