From eb7ad61eeef2f425a19d0fad8d7abcdf810849c1 Mon Sep 17 00:00:00 2001 From: godyangfight Date: Sun, 27 Oct 2024 12:43:42 +0000 Subject: [PATCH] Fix generate next dag emergency flag lost bug. --- .../scheduler/ob_tenant_dag_scheduler.cpp | 24 +++++++++---------- src/share/scheduler/ob_tenant_dag_scheduler.h | 8 +++---- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index e6ba862cc..45fe0ff0b 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -490,7 +490,8 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type) max_retry_times_(0), running_times_(0), dag_net_(nullptr), - list_idx_(DAG_LIST_MAX) + list_idx_(DAG_LIST_MAX), + emergency_(false) { STATIC_ASSERT(static_cast(DAG_STATUS_MAX) == ARRAYSIZEOF(ObIDagStatusStr), "dag status str len is mismatch"); STATIC_ASSERT(MergeDagPrioCnt == ARRAYSIZEOF(MergeDagPrio), "merge dag prio len is mismatch"); @@ -558,6 +559,7 @@ void ObIDag::reset() force_cancel_flag_ = false; dag_net_ = nullptr; list_idx_ = DAG_LIST_MAX; + emergency_ = false; } int ObIDag::add_task(ObITask &task) @@ -848,7 +850,8 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const } else { J_OBJ_START(); J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status), - K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size())); + K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()), + K_(emergency)); J_OBJ_END(); } return pos; @@ -1975,8 +1978,7 @@ int ObDagPrioScheduler::get_stored_dag_(ObIDag &dag, ObIDag *&stored_dag) // call this func with locked int ObDagPrioScheduler::add_dag_into_list_and_map_( const ObDagListIndex list_index, - ObIDag &dag, - const bool emergency) + ObIDag &dag) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -2010,6 +2012,7 @@ int ObDagPrioScheduler::add_dag_into_list_and_map_( COMMON_LOG(WARN, "failed to set dag_map", K(ret), K(dag)); } } else { + const bool emergency = dag.get_emergency(); bool add_ret = false; ObDagListIndex add_list_index = emergency ? READY_DAG_LIST : list_index; // skip to rank emergency dag if (!emergency) { @@ -2037,7 +2040,6 @@ int ObDagPrioScheduler::add_dag_into_list_and_map_( // call this func with locked int ObDagPrioScheduler::inner_add_dag_( - const bool emergency, const bool check_size_overflow, ObIDag *&dag) { @@ -2054,8 +2056,7 @@ int ObDagPrioScheduler::inner_add_dag_( } else if (OB_FAIL(add_dag_into_list_and_map_( is_waiting_dag_type(dag->get_type()) ? WAITING_DAG_LIST : is_rank_dag_type(dag->get_type()) ? RANK_DAG_LIST : READY_DAG_LIST, // compaction dag should add into RANK_LIST first. - *dag, - emergency))) { + *dag))) { if (OB_EAGAIN != ret) { COMMON_LOG(WARN, "failed to add dag into list and map", K(ret), KPC(dag)); } @@ -2478,7 +2479,6 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag) ObIDag *next_dag = nullptr; ObIDag *child_dag = nullptr; ObIDagNet *dag_net = nullptr; - const bool emergency = false; const bool check_size_overflow = true; if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) { @@ -2512,7 +2512,7 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag) } if (OB_FAIL(ret)) { - } else if (OB_FAIL(inner_add_dag_(emergency, check_size_overflow, next_dag))) { + } else if (OB_FAIL(inner_add_dag_(check_size_overflow, next_dag))) { LOG_WARN("failed to add next dag", K(ret), KPC(next_dag)); } } @@ -2829,12 +2829,11 @@ void ObDagPrioScheduler::dump_dag_status() } int ObDagPrioScheduler::inner_add_dag( - const bool emergency, const bool check_size_overflow, ObIDag *&dag) { ObMutexGuard guard(prio_lock_); - return inner_add_dag_(emergency, check_size_overflow, dag); + return inner_add_dag_(check_size_overflow, dag); } #define ADD_DAG_SCHEDULER_INFO(value_type, key_str, value) \ @@ -4192,7 +4191,8 @@ int ObTenantDagScheduler::add_dag( } else if (OB_UNLIKELY(!dag->is_valid())) { ret = OB_INVALID_ARGUMENT; COMMON_LOG(WARN, "invalid argument", K(ret), KPC(dag)); - } else if (OB_FAIL(prio_sche_[dag->get_priority()].inner_add_dag(emergency, check_size_overflow, dag))) { + } else if (FALSE_IT(dag->set_dag_emergency(emergency))) { + } else if (OB_FAIL(prio_sche_[dag->get_priority()].inner_add_dag(check_size_overflow, dag))) { if (OB_EAGAIN != ret) { LOG_WARN("failed to inner add dag", K(ret), KPC(dag)); } diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index 064810e16..ab21f40db 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -487,6 +487,8 @@ public: int fill_comment(char *buf, const int64_t buf_len); virtual bool is_ha_dag() const { return false; } + void set_dag_emergency(const bool emergency) { emergency_ = emergency; } + bool get_emergency() const { return emergency_; } DECLARE_VIRTUAL_TO_STRING; DISABLE_COPY_ASSIGN(ObIDag); @@ -547,6 +549,7 @@ private: uint32_t running_times_; ObIDagNet *dag_net_; // should protect by lock ObDagListIndex list_idx_; + bool emergency_; }; /* @@ -920,7 +923,6 @@ public: int loop_waiting_dag_list(); void dump_dag_status(); int inner_add_dag( - const bool emergency, const bool check_size_overflow, ObIDag *&dag); void get_all_dag_scheduler_info( @@ -1025,11 +1027,9 @@ private: const bool add_last = true); int add_dag_into_list_and_map_( const ObDagListIndex list_index, - ObIDag &dag, - const bool emergency); + ObIDag &dag); int get_stored_dag_(ObIDag &dag, ObIDag *&stored_dag); int inner_add_dag_( - const bool emergency, const bool check_size_overflow, ObIDag *&dag); void add_schedule_info_(const ObDagType::ObDagTypeEnum dag_type, const int64_t data_size);