Fix generate next dag emergency flag lost bug.

This commit is contained in:
godyangfight 2024-10-27 12:43:42 +00:00 committed by ob-robot
parent c2369f5b3e
commit eb7ad61eee
2 changed files with 16 additions and 16 deletions

View File

@ -490,7 +490,8 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type)
max_retry_times_(0),
running_times_(0),
dag_net_(nullptr),
list_idx_(DAG_LIST_MAX)
list_idx_(DAG_LIST_MAX),
emergency_(false)
{
STATIC_ASSERT(static_cast<int64_t>(DAG_STATUS_MAX) == ARRAYSIZEOF(ObIDagStatusStr), "dag status str len is mismatch");
STATIC_ASSERT(MergeDagPrioCnt == ARRAYSIZEOF(MergeDagPrio), "merge dag prio len is mismatch");
@ -558,6 +559,7 @@ void ObIDag::reset()
force_cancel_flag_ = false;
dag_net_ = nullptr;
list_idx_ = DAG_LIST_MAX;
emergency_ = false;
}
int ObIDag::add_task(ObITask &task)
@ -848,7 +850,8 @@ int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const
} else {
J_OBJ_START();
J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status),
K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()));
K_(add_time), K_(start_time), K_(running_task_cnt), K_(indegree), K_(consumer_group_id), "hash", hash(), K(task_list_.get_size()),
K_(emergency));
J_OBJ_END();
}
return pos;
@ -1975,8 +1978,7 @@ int ObDagPrioScheduler::get_stored_dag_(ObIDag &dag, ObIDag *&stored_dag)
// call this func with locked
int ObDagPrioScheduler::add_dag_into_list_and_map_(
const ObDagListIndex list_index,
ObIDag &dag,
const bool emergency)
ObIDag &dag)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
@ -2010,6 +2012,7 @@ int ObDagPrioScheduler::add_dag_into_list_and_map_(
COMMON_LOG(WARN, "failed to set dag_map", K(ret), K(dag));
}
} else {
const bool emergency = dag.get_emergency();
bool add_ret = false;
ObDagListIndex add_list_index = emergency ? READY_DAG_LIST : list_index; // skip to rank emergency dag
if (!emergency) {
@ -2037,7 +2040,6 @@ int ObDagPrioScheduler::add_dag_into_list_and_map_(
// call this func with locked
int ObDagPrioScheduler::inner_add_dag_(
const bool emergency,
const bool check_size_overflow,
ObIDag *&dag)
{
@ -2054,8 +2056,7 @@ int ObDagPrioScheduler::inner_add_dag_(
} else if (OB_FAIL(add_dag_into_list_and_map_(
is_waiting_dag_type(dag->get_type()) ? WAITING_DAG_LIST :
is_rank_dag_type(dag->get_type()) ? RANK_DAG_LIST : READY_DAG_LIST, // compaction dag should add into RANK_LIST first.
*dag,
emergency))) {
*dag))) {
if (OB_EAGAIN != ret) {
COMMON_LOG(WARN, "failed to add dag into list and map", K(ret), KPC(dag));
}
@ -2478,7 +2479,6 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag)
ObIDag *next_dag = nullptr;
ObIDag *child_dag = nullptr;
ObIDagNet *dag_net = nullptr;
const bool emergency = false;
const bool check_size_overflow = true;
if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) {
@ -2512,7 +2512,7 @@ int ObDagPrioScheduler::generate_next_dag_(ObIDag &dag)
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(inner_add_dag_(emergency, check_size_overflow, next_dag))) {
} else if (OB_FAIL(inner_add_dag_(check_size_overflow, next_dag))) {
LOG_WARN("failed to add next dag", K(ret), KPC(next_dag));
}
}
@ -2829,12 +2829,11 @@ void ObDagPrioScheduler::dump_dag_status()
}
int ObDagPrioScheduler::inner_add_dag(
const bool emergency,
const bool check_size_overflow,
ObIDag *&dag)
{
ObMutexGuard guard(prio_lock_);
return inner_add_dag_(emergency, check_size_overflow, dag);
return inner_add_dag_(check_size_overflow, dag);
}
#define ADD_DAG_SCHEDULER_INFO(value_type, key_str, value) \
@ -4192,7 +4191,8 @@ int ObTenantDagScheduler::add_dag(
} else if (OB_UNLIKELY(!dag->is_valid())) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument", K(ret), KPC(dag));
} else if (OB_FAIL(prio_sche_[dag->get_priority()].inner_add_dag(emergency, check_size_overflow, dag))) {
} else if (FALSE_IT(dag->set_dag_emergency(emergency))) {
} else if (OB_FAIL(prio_sche_[dag->get_priority()].inner_add_dag(check_size_overflow, dag))) {
if (OB_EAGAIN != ret) {
LOG_WARN("failed to inner add dag", K(ret), KPC(dag));
}

View File

@ -487,6 +487,8 @@ public:
int fill_comment(char *buf, const int64_t buf_len);
virtual bool is_ha_dag() const { return false; }
void set_dag_emergency(const bool emergency) { emergency_ = emergency; }
bool get_emergency() const { return emergency_; }
DECLARE_VIRTUAL_TO_STRING;
DISABLE_COPY_ASSIGN(ObIDag);
@ -547,6 +549,7 @@ private:
uint32_t running_times_;
ObIDagNet *dag_net_; // should protect by lock
ObDagListIndex list_idx_;
bool emergency_;
};
/*
@ -920,7 +923,6 @@ public:
int loop_waiting_dag_list();
void dump_dag_status();
int inner_add_dag(
const bool emergency,
const bool check_size_overflow,
ObIDag *&dag);
void get_all_dag_scheduler_info(
@ -1025,11 +1027,9 @@ private:
const bool add_last = true);
int add_dag_into_list_and_map_(
const ObDagListIndex list_index,
ObIDag &dag,
const bool emergency);
ObIDag &dag);
int get_stored_dag_(ObIDag &dag, ObIDag *&stored_dag);
int inner_add_dag_(
const bool emergency,
const bool check_size_overflow,
ObIDag *&dag);
void add_schedule_info_(const ObDagType::ObDagTypeEnum dag_type, const int64_t data_size);