Consider task count for ddl delay scheduling
This commit is contained in:
@ -337,7 +337,7 @@ void ObDDLScheduler::run1()
|
|||||||
} else {
|
} else {
|
||||||
ObCurTraceId::set(task->get_trace_id());
|
ObCurTraceId::set(task->get_trace_id());
|
||||||
int task_ret = task->process();
|
int task_ret = task->process();
|
||||||
task->calc_next_schedule_ts(task_ret);
|
task->calc_next_schedule_ts(task_ret, task_queue_.get_task_cnt());
|
||||||
if (task->need_retry() && !has_set_stop()) {
|
if (task->need_retry() && !has_set_stop()) {
|
||||||
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
if (OB_FAIL(task_queue_.add_task_to_last(task))) {
|
||||||
STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task));
|
STORAGE_LOG(ERROR, "fail to add task to last, which should not happen", K(ret), K(*task));
|
||||||
|
|||||||
@ -61,6 +61,7 @@ public:
|
|||||||
int add_task_to_last(ObDDLTask *task);
|
int add_task_to_last(ObDDLTask *task);
|
||||||
int get_task(const ObDDLTaskKey &task_key, ObDDLTask *&task);
|
int get_task(const ObDDLTaskKey &task_key, ObDDLTask *&task);
|
||||||
int get_task(const int64_t task_id, ObDDLTask *&task);
|
int get_task(const int64_t task_id, ObDDLTask *&task);
|
||||||
|
int64_t get_task_cnt() const { return task_list_.get_size(); }
|
||||||
void destroy();
|
void destroy();
|
||||||
private:
|
private:
|
||||||
typedef common::ObDList<ObDDLTask> TaskList;
|
typedef common::ObDList<ObDDLTask> TaskList;
|
||||||
|
|||||||
@ -623,13 +623,15 @@ int ObDDLTask::push_execution_id()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObDDLTask::calc_next_schedule_ts(int ret_code)
|
// The length of [min_dt, max_dt] controls the execution rate of ddl tasks.
|
||||||
|
void ObDDLTask::calc_next_schedule_ts(const int ret_code, const int64_t total_task_cnt)
|
||||||
{
|
{
|
||||||
if (OB_TIMEOUT == ret_code) {
|
if (OB_TIMEOUT == ret_code) {
|
||||||
const int64_t SEC = 1000000;
|
const int64_t SEC = 1000000;
|
||||||
delay_schedule_time_ = std::min(delay_schedule_time_ * 6/5 + SEC/10, 30*SEC);
|
const int64_t max_delay = total_task_cnt * ObDDLUtil::get_ddl_rpc_timeout() * 10;
|
||||||
|
delay_schedule_time_ = std::min(delay_schedule_time_ * 6/5 + SEC/10, max_delay);
|
||||||
const int64_t max_dt = delay_schedule_time_;
|
const int64_t max_dt = delay_schedule_time_;
|
||||||
const int64_t min_dt = std::max(0L, max_dt - 3*SEC);
|
const int64_t min_dt = max_dt / 2;
|
||||||
next_schedule_ts_ = ObTimeUtility::current_time() + ObRandom::rand(min_dt, max_dt);
|
next_schedule_ts_ = ObTimeUtility::current_time() + ObRandom::rand(min_dt, max_dt);
|
||||||
} else {
|
} else {
|
||||||
delay_schedule_time_ = 0;
|
delay_schedule_time_ = 0;
|
||||||
|
|||||||
@ -323,7 +323,7 @@ public:
|
|||||||
void set_sys_task_id(const TraceId &sys_task_id) { sys_task_id_ = sys_task_id; }
|
void set_sys_task_id(const TraceId &sys_task_id) { sys_task_id_ = sys_task_id; }
|
||||||
void set_sql_exec_addr(const common::ObAddr &addr) { sql_exec_addr_ = addr; }
|
void set_sql_exec_addr(const common::ObAddr &addr) { sql_exec_addr_ = addr; }
|
||||||
const TraceId &get_sys_task_id() const { return sys_task_id_; }
|
const TraceId &get_sys_task_id() const { return sys_task_id_; }
|
||||||
void calc_next_schedule_ts(int ret_code);
|
void calc_next_schedule_ts(const int ret_code, const int64_t total_task_cnt);
|
||||||
bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); }
|
bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); }
|
||||||
bool is_replica_build_need_retry(const int ret_code);
|
bool is_replica_build_need_retry(const int ret_code);
|
||||||
int push_execution_id();
|
int push_execution_id();
|
||||||
|
|||||||
Reference in New Issue
Block a user