diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.cpp b/src/share/scheduler/ob_tenant_dag_scheduler.cpp index 45fe0ff0b..03503646b 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.cpp +++ b/src/share/scheduler/ob_tenant_dag_scheduler.cpp @@ -486,7 +486,6 @@ ObIDag::ObIDag(const ObDagType::ObDagTypeEnum type) dag_status_(ObIDag::DAG_STATUS_INITING), running_task_cnt_(0), is_stop_(false), - force_cancel_flag_(false), max_retry_times_(0), running_times_(0), dag_net_(nullptr), @@ -556,7 +555,6 @@ void ObIDag::reset() type_ = ObDagType::DAG_TYPE_MAX; priority_ = ObDagPrio::DAG_PRIO_MAX; is_stop_ = false; - force_cancel_flag_ = false; dag_net_ = nullptr; list_idx_ = DAG_LIST_MAX; emergency_ = false; @@ -704,6 +702,9 @@ bool ObIDag::has_finished() } else { bret = 0 == running_task_cnt_; } + if (bret) { // when return true, this dag will finish soon + is_stop_ = true; + } return bret; } @@ -713,7 +714,7 @@ int ObIDag::get_next_ready_task(ObITask *&task) bool found = false; ObMutexGuard guard(lock_); - if (ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) { + if (!is_stop_ && ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) { ObITask *cur_task = task_list_.get_first(); const ObITask *head = task_list_.get_header(); while (!found && head != cur_task && nullptr != cur_task) { @@ -846,7 +847,7 @@ void ObIDag::reset_task_running_status(ObITask &task, ObITask::ObITaskStatus tas int64_t ObIDag::to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; - if (OB_ISNULL(buf) || buf_len <= 0) { + if (OB_ISNULL(buf) || buf_len <= 0 || !is_inited_ || is_stop_) { } else { J_OBJ_START(); J_KV(KP(this), K_(is_inited), K_(type), "name", get_dag_type_str(type_), K_(id), KPC_(dag_net), K_(dag_ret), K_(dag_status), @@ -912,9 +913,10 @@ int ObIDag::finish(const ObDagStatus status, bool &dag_net_finished) return ret; } -void ObIDag::set_force_cancel_flag() +void ObIDag::set_stop() { - force_cancel_flag_ = true; + ObMutexGuard guard(lock_); + is_stop_ = true; // dag_net and dags in the same dag net should be canceled too. if (OB_NOT_NULL(dag_net_)) { dag_net_->set_cancel(); @@ -1703,7 +1705,7 @@ bool ObTenantDagWorker::get_force_cancel_flag() // ignore ret COMMON_LOG(WARN, "task does not belong to dag"); } else { - flag = dag->get_force_cancel_flag(); + flag = dag->has_set_stop(); } return flag; } @@ -2434,7 +2436,7 @@ int ObDagPrioScheduler::pop_task_from_ready_list_(ObITask *&task) if (OB_FAIL(schedule_dag_(*cur, move_dag_to_waiting_list))) { COMMON_LOG(WARN, "failed to schedule dag", K(ret), KPC(cur)); } - } else if (ObIDag::DAG_STATUS_NODE_FAILED == dag_status + } else if ((ObIDag::DAG_STATUS_NODE_FAILED == dag_status || cur->has_set_stop()) && 0 == cur->get_running_task_count()) { // no task running failed dag, need free tmp_dag = cur; cur = cur->get_next(); @@ -2557,6 +2559,9 @@ int ObDagPrioScheduler::finish_dag_( if (OB_UNLIKELY(dag.get_priority() != priority_ || OB_ISNULL(scheduler_))) { ret = OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "unexpect value", K(ret), K(dag.get_priority()), K_(priority), KP_(scheduler)); + } else if (OB_UNLIKELY(dag.get_running_task_count() > 0)) { + ret = OB_ERR_UNEXPECTED; + COMMON_LOG(ERROR, "exist running task", K(ret), K(dag.get_priority()), K(dag), KP_(scheduler)); } else if (FALSE_IT(add_dag_warning_info_into_dag_net_(dag, need_add))) { } else if (OB_FAIL(dag.finish(status, dag_net_finished))) { // dag record will be erase, making this dag_net could be free. COMMON_LOG(WARN, "dag finished failed", K(ret), K(dag), "dag_ret", dag.get_dag_ret()); @@ -2569,7 +2574,7 @@ int ObDagPrioScheduler::finish_dag_( "runtime", ObTimeUtility::fast_current_time() - dag.get_start_time(), "dag_cnt", scheduler_->get_cur_dag_cnt(), "dag_type_cnt", scheduler_->get_type_dag_cnt(dag.get_type()), K(&dag), K(dag)); - if (OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(dag.get_dag_id()))) { + if (dag.get_dag_id().is_valid() && OB_TMP_FAIL(ObSysTaskStatMgr::get_instance().del_task(dag.get_dag_id()))) { STORAGE_LOG(WARN, "failed to del sys task", K(tmp_ret), K(dag.get_dag_id())); } if (OB_TMP_FAIL(dag.report_result())) { @@ -2672,7 +2677,7 @@ int ObDagPrioScheduler::deal_with_fail_dag_(ObIDag &dag, bool &retry_flag) { int ret = OB_SUCCESS; // dag retry is triggered by last finish task - if (OB_UNLIKELY(dag.get_force_cancel_flag())) { + if (OB_UNLIKELY(dag.has_set_stop())) { } else if (1 == dag.get_running_task_count() && dag.check_can_retry()) { COMMON_LOG(INFO, "dag retry", K(ret), K(dag)); if (OB_FAIL(dag.reset_status_for_retry())) { // clear task/running_info and init again @@ -3016,7 +3021,7 @@ int ObDagPrioScheduler::check_ls_compaction_dag_exist_with_cancel( } } else { // for running dag exist = true; - cur->set_force_cancel_flag(); // dag exists before finding force_cancel_flag, need check exists again + cur->set_stop(); // dag exists before finding stop, need check exists again cur = cur->get_next(); } } else { @@ -3289,7 +3294,7 @@ int ObDagPrioScheduler::cancel_dag(const ObIDag &dag, const bool force_cancel) } } else if (force_cancel && cur_dag->get_dag_status() == ObIDag::DAG_STATUS_NODE_RUNNING) { LOG_INFO("cancel running dag", K(ret), KP(cur_dag), K(force_cancel)); - cur_dag->set_force_cancel_flag(); + cur_dag->set_stop(); } } return ret; diff --git a/src/share/scheduler/ob_tenant_dag_scheduler.h b/src/share/scheduler/ob_tenant_dag_scheduler.h index cdba4a45c..c7c2ef2a3 100644 --- a/src/share/scheduler/ob_tenant_dag_scheduler.h +++ b/src/share/scheduler/ob_tenant_dag_scheduler.h @@ -409,8 +409,8 @@ public: } return diagnose_type; } - bool has_set_stop() { return ATOMIC_LOAD(&is_stop_); } - void set_stop() { ATOMIC_SET(&is_stop_, true); } + bool has_set_stop() { return is_stop_; } + void set_stop(); ObIDagNet *get_dag_net() const { return dag_net_; } void set_dag_net(ObIDagNet &dag_net) { @@ -469,8 +469,6 @@ public: } void set_start_time() { start_time_ = ObTimeUtility::fast_current_time(); } int64_t get_start_time() const { return start_time_; } - void set_force_cancel_flag(); - bool get_force_cancel_flag() { return force_cancel_flag_; } int add_child_without_inheritance(ObIDag &child); int add_child_without_inheritance(const common::ObIArray &child_array); int get_next_ready_task(ObITask *&task); @@ -544,8 +542,7 @@ private: ObDagStatus dag_status_; int64_t running_task_cnt_; TaskList task_list_; // should protect by lock - bool is_stop_; - bool force_cancel_flag_; // should protect by lock + bool is_stop_; // should protect by lock uint32_t max_retry_times_; // should protect by lock uint32_t running_times_; ObIDagNet *dag_net_; // should protect by lock @@ -1316,15 +1313,18 @@ int ObIDag::alloc_task(T *&task) ntask->set_dag(*this); { lib::ObMutexGuard guard(lock_); - if (!task_list_.add_last(ntask)) { + if (is_stop_) { + ret = OB_CANCELED; + } else if (!task_list_.add_last(ntask)) { ret = common::OB_ERR_UNEXPECTED; COMMON_LOG(WARN, "Failed to add task", K(task), K_(id)); - ntask->~T(); - allocator_->free(ntask); } } if (OB_SUCC(ret)) { task = ntask; + } else { + ntask->~T(); + allocator_->free(ntask); } } return ret; diff --git a/src/storage/blocksstable/ob_column_checksum_struct.cpp b/src/storage/blocksstable/ob_column_checksum_struct.cpp index 6551a0941..533fde067 100644 --- a/src/storage/blocksstable/ob_column_checksum_struct.cpp +++ b/src/storage/blocksstable/ob_column_checksum_struct.cpp @@ -47,13 +47,19 @@ int ObColumnCkmStruct::deserialize(ObArenaAllocator &allocator, const char *buf, const int64_t data_len, int64_t &pos) { int ret = OB_SUCCESS; - OB_UNIS_DECODE(count_); - if (OB_FAIL(ret) || 0 == count_) { - } else if (OB_ISNULL(column_checksums_ = static_cast(allocator.alloc(sizeof(int64_t) * count_)))) { + int64_t tmp_count = 0; + OB_UNIS_DECODE(tmp_count); + if (OB_FAIL(ret) || 0 == tmp_count) { + } else if (OB_ISNULL(column_checksums_ = static_cast(allocator.alloc(sizeof(int64_t) * tmp_count)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to allocate column checksum memory", K(ret), K(count_)); + LOG_WARN("fail to allocate column checksum memory", K(ret), K(tmp_count)); } else { - OB_UNIS_DECODE_ARRAY(column_checksums_, count_); + OB_UNIS_DECODE_ARRAY(column_checksums_, tmp_count); + if (OB_SUCC(ret)) { + count_ = tmp_count; + } else { + reset(); + } } return ret; } diff --git a/src/storage/blocksstable/ob_sstable_meta.cpp b/src/storage/blocksstable/ob_sstable_meta.cpp index 0453a955d..b5801c830 100644 --- a/src/storage/blocksstable/ob_sstable_meta.cpp +++ b/src/storage/blocksstable/ob_sstable_meta.cpp @@ -876,7 +876,7 @@ int ObSSTableMeta::deserialize_( if (OB_FAIL(basic_meta_.deserialize(buf, data_len, pos))) { LOG_WARN("fail to deserialize basic meta", K(ret), KP(buf), K(data_len), K(pos)); } else if (OB_FAIL(column_ckm_struct_.deserialize(allocator, buf, data_len, pos))) { - LOG_WARN("fail to deserialize column checksum", K(ret), K(column_ckm_struct_)); + LOG_WARN("fail to deserialize column checksum", K(ret)); } else { ObMicroBlockDesMeta des_meta(basic_meta_.compressor_type_, basic_meta_.root_row_store_type_, diff --git a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp index ccb16a475..46de611c1 100644 --- a/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_net_in_dag_scheduler.cpp @@ -944,6 +944,17 @@ public: } return common::OB_SUCCESS; } + virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, + ObIAllocator &allocator) const override + { + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(ADD_DAG_WARN_INFO_PARAM(out_param, allocator, get_type(), id_, id_+1))) { + COMMON_LOG(WARN, "fail to add dag warning info param", K(ret)); + } + return ret; + } int inner_reset_status_for_retry() { return OB_SUCCESS; } INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(type), K_(id), K(task_list_.get_size()), K_(dag_ret)); }; @@ -1685,7 +1696,7 @@ TEST_F(TestDagScheduler, test_cancel_waiting_dag) ObCancelDag *first_dag = dag_net->first_dag_; EXPECT_NE(nullptr, first_dag); - first_dag->set_force_cancel_flag(); + first_dag->set_stop(); first_dag->can_schedule_ = true; wait_scheduler(); }