[CP] fix print log sanity core & add defense for dag scheduler

This commit is contained in:
yangqise7en 2024-10-30 07:14:23 +00:00 committed by ob-robot
parent 82a5c6fe6c
commit 7cabe08a1c
5 changed files with 50 additions and 28 deletions

View File

@ -486,7 +486,6 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type)
dag_status_(ObIDag::DAG_STATUS_INITING),
running_task_cnt_(0),
is_stop_(false),
force_cancel_flag_(false),
max_retry_times_(0),
running_times_(0),
dag_net_(nullptr),
@ -556,7 +555,6 @@ void ObIDag::reset()
type_ = ObDagType::DAG_TYPE_MAX;
priority_ = ObDagPrio::DAG_PRIO_MAX;
is_stop_ = false;
force_cancel_flag_ = false;
dag_net_ = nullptr;
list_idx_ = DAG_LIST_MAX;
emergency_ = false;
@ -704,6 +702,9 @@ bool ObIDag::has_finished()
} else {
bret = 0 == running_task_cnt_;
}
if (bret) { // when return true, this dag will finish soon
is_stop_ = true;
}
return bret;
}
@ -713,7 +714,7 @@ int ObIDag::get_next_ready_task(ObITask *&task)
bool found = false;
ObMutexGuard guard(lock_);
if (ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) {
if (!is_stop_ && ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) {
ObITask *cur_task = task_list_.get_first();
const ObITask *head = task_list_.get_header();
while (!found && head != cur_task && nullptr != cur_task) {
@ -846,7 +847,7 @@ void ObIDag::reset_task_running_status(ObITask &task, ObITask::ObITaskStatus tas
int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const
{
int64_t pos = 0;
if (OB_ISNULL(buf) || buf_len <= 0) {
if (OB_ISNULL(buf) || buf_len <= 0 || !is_inited_ || is_stop_) {
} else {
J_OBJ_START();
J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status),
@ -912,9 +913,10 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished)
return ret;
}
void ObIDag::set_force_cancel_flag()
void ObIDag::set_stop()
{
force_cancel_flag_ = true;
ObMutexGuard guard(lock_);
is_stop_ = true;
// dag_net and dags in the same dag net should be canceled too.
if (OB_NOT_NULL(dag_net_)) {
dag_net_->set_cancel();
@ -1703,7 +1705,7 @@ bool ObTenantDagWorker::get_force_cancel_flag()
// ignore ret
COMMON_LOG(WARN, "task does not belong to dag");
} else {
flag = dag->get_force_cancel_flag();
flag = dag->has_set_stop();
}
return flag;
}
@ -2434,7 +2436,7 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task)
if (OB_FAIL(schedule_dag_(*cur, move_dag_to_waiting_list))) {
COMMON_LOG(WARN, "failed to schedule dag", K(ret), KPC(cur));
}
} else if (ObIDag::DAG_STATUS_NODE_FAILED == dag_status
} else if ((ObIDag::DAG_STATUS_NODE_FAILED == dag_status || cur->has_set_stop())
&& 0 == cur->get_running_task_count()) { // no task running failed dag, need free
tmp_dag = cur;
cur = cur->get_next();
@ -2557,6 +2559,9 @@ int ObDagPrioScheduler::finish_dag_(
if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "unexpect value", K(ret), K(dag.get_priority()), K_(priority), KP_(scheduler));
} else if (OB_UNLIKELY(dag.get_running_task_count() > 0)) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "exist running task", K(ret), K(dag.get_priority()), K(dag), KP_(scheduler));
} else if (FALSE_IT(add_dag_warning_info_into_dag_net_(dag, need_add))) {
} else if (OB_FAIL(dag.finish(status, dag_net_finished))) { // dag record will be erase, making this dag_net could be free.
COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret());
@ -2569,7 +2574,7 @@ int ObDagPrioScheduler::finish_dag_(
"runtime", ObTimeUtility::fast_current_time() - dag.get_start_time(),
"dag_cnt", scheduler_->get_cur_dag_cnt(), "dag_type_cnt", scheduler_->get_type_dag_cnt(dag.get_type()),
K(&dag), K(dag));
if (OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(dag.get_dag_id()))) {
if (dag.get_dag_id().is_valid() && OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(dag.get_dag_id()))) {
STORAGE_LOG(WARN, "failed to del sys task", K(tmp_ret), K(dag.get_dag_id()));
}
if (OB_TMP_FAIL(dag.report_result())) {
@ -2672,7 +2677,7 @@ int ObDagPrioScheduler::deal_with_fail_dag_(ObIDag &dag, bool &retry_flag)
{
int ret = OB_SUCCESS;
// dag retry is triggered by last finish task
if (OB_UNLIKELY(dag.get_force_cancel_flag())) {
if (OB_UNLIKELY(dag.has_set_stop())) {
} else if (1 == dag.get_running_task_count() && dag.check_can_retry()) {
COMMON_LOG(INFO, "dag retry", K(ret), K(dag));
if (OB_FAIL(dag.reset_status_for_retry())) { // clear task/running_info and init again
@ -3016,7 +3021,7 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel(
}
} else { // for running dag
exist = true;
cur->set_force_cancel_flag(); // dag exists before finding force_cancel_flag, need check exists again
cur->set_stop(); // dag exists before finding stop, need check exists again
cur = cur->get_next();
}
} else {
@ -3289,7 +3294,7 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel)
}
} else if (force_cancel && cur_dag->get_dag_status() == ObIDag::DAG_STATUS_NODE_RUNNING) {
LOG_INFO("cancel running dag", K(ret), KP(cur_dag), K(force_cancel));
cur_dag->set_force_cancel_flag();
cur_dag->set_stop();
}
}
return ret;

View File

@ -409,8 +409,8 @@ public:
}
return diagnose_type;
}
bool has_set_stop() { return ATOMIC_LOAD(&is_stop_); }
void set_stop() { ATOMIC_SET(&is_stop_, true); }
bool has_set_stop() { return is_stop_; }
void set_stop();
ObIDagNet *get_dag_net() const { return dag_net_; }
void set_dag_net(ObIDagNet &dag_net)
{
@ -469,8 +469,6 @@ public:
}
void set_start_time() { start_time_ = ObTimeUtility::fast_current_time(); }
int64_t get_start_time() const { return start_time_; }
void set_force_cancel_flag();
bool get_force_cancel_flag() { return force_cancel_flag_; }
int add_child_without_inheritance(ObIDag &child);
int add_child_without_inheritance(const common::ObIArray<ObINodeWithChild*> &child_array);
int get_next_ready_task(ObITask *&task);
@ -544,8 +542,7 @@ private:
ObDagStatus dag_status_;
int64_t running_task_cnt_;
TaskList task_list_; // should protect by lock
bool is_stop_;
bool force_cancel_flag_; // should protect by lock
bool is_stop_; // should protect by lock
uint32_t max_retry_times_; // should protect by lock
uint32_t running_times_;
ObIDagNet *dag_net_; // should protect by lock
@ -1316,15 +1313,18 @@ int ObIDag::alloc_task(T *&task)
ntask->set_dag(*this);
{
lib::ObMutexGuard guard(lock_);
if (!task_list_.add_last(ntask)) {
if (is_stop_) {
ret = OB_CANCELED;
} else if (!task_list_.add_last(ntask)) {
ret = common::OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "Failed to add task", K(task), K_(id));
ntask->~T();
allocator_->free(ntask);
}
}
if (OB_SUCC(ret)) {
task = ntask;
} else {
ntask->~T();
allocator_->free(ntask);
}
}
return ret;

View File

@ -47,13 +47,19 @@ int ObColumnCkmStruct::deserialize(ObArenaAllocator &allocator, const char *buf,
const int64_t data_len, int64_t &pos)
{
int ret = OB_SUCCESS;
OB_UNIS_DECODE(count_);
if (OB_FAIL(ret) || 0 == count_) {
} else if (OB_ISNULL(column_checksums_ = static_cast<int64_t *>(allocator.alloc(sizeof(int64_t) * count_)))) {
int64_t tmp_count = 0;
OB_UNIS_DECODE(tmp_count);
if (OB_FAIL(ret) || 0 == tmp_count) {
} else if (OB_ISNULL(column_checksums_ = static_cast<int64_t *>(allocator.alloc(sizeof(int64_t) * tmp_count)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate column checksum memory", K(ret), K(count_));
LOG_WARN("fail to allocate column checksum memory", K(ret), K(tmp_count));
} else {
OB_UNIS_DECODE_ARRAY(column_checksums_, count_);
OB_UNIS_DECODE_ARRAY(column_checksums_, tmp_count);
if (OB_SUCC(ret)) {
count_ = tmp_count;
} else {
reset();
}
}
return ret;
}

View File

@ -876,7 +876,7 @@ int ObSSTableMeta::deserialize_(
if (OB_FAIL(basic_meta_.deserialize(buf, data_len, pos))) {
LOG_WARN("fail to deserialize basic meta", K(ret), KP(buf), K(data_len), K(pos));
} else if (OB_FAIL(column_ckm_struct_.deserialize(allocator, buf, data_len, pos))) {
LOG_WARN("fail to deserialize column checksum", K(ret), K(column_ckm_struct_));
LOG_WARN("fail to deserialize column checksum", K(ret));
} else {
ObMicroBlockDesMeta des_meta(basic_meta_.compressor_type_,
basic_meta_.root_row_store_type_,

View File

@ -944,6 +944,17 @@ public:
}
return common::OB_SUCCESS;
}
virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param,
ObIAllocator &allocator) const override
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(), id_, id_+1))) {
COMMON_LOG(WARN, "fail to add dag warning info param", K(ret));
}
return ret;
}
int inner_reset_status_for_retry() { return OB_SUCCESS; }
INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret));
};
@ -1685,7 +1696,7 @@ TEST_F(TestDagScheduler, test_cancel_waiting_dag)
ObCancelDag *first_dag = dag_net->first_dag_;
EXPECT_NE(nullptr, first_dag);
first_dag->set_force_cancel_flag();
first_dag->set_stop();
first_dag->can_schedule_ = true;
wait_scheduler();
}