diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 75f59f142f..6167687575 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -2172,32 +2172,57 @@ int ObTenantDagScheduler::get_minor_exe_dag_info( return ret; } -int ObTenantDagScheduler::check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist) +int ObTenantDagScheduler::check_ls_compaction_dag_exist_with_cancel( + const ObLSID &ls_id, + bool &exist) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; exist = false; compaction::ObTabletMergeDag *dag = nullptr; - ObThreadCondGuard guard(scheduler_sync_); - for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) { - ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]); - ObIDag *cur = head->get_next(); - while (head != cur) { - if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) { - // TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag - const mds::ObMdsTableMergeDag *mds_dag = static_cast(cur); - if (ls_id == mds_dag->get_param().ls_id_) { - exist = true; - break; + ObIDagNet *unused_erase_dag_net = nullptr; + ObIDag *cancel_dag = nullptr; + bool cancel_flag = false; + int64_t cancel_dag_cnt = 0; + { + ObThreadCondGuard guard(scheduler_sync_); + for (int64_t i = 0; i < ObIDag::MergeDagPrioCnt; ++i) { + ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObIDag::MergeDagPrio[i]); + ObIDag *cur = head->get_next(); + while (head != cur) { + if (ObDagType::DAG_TYPE_MDS_TABLE_MERGE == cur->get_type()) { + // TODO (bowen.gbw) : make ObMdsTableMergeDag inherit from ObTabletMergeDag + const mds::ObMdsTableMergeDag *mds_dag = static_cast(cur); + cancel_flag = (ls_id == mds_dag->get_param().ls_id_); + } else { + dag = static_cast(cur); + cancel_flag = (ls_id == dag->get_ls_id()); } - } else { - dag = static_cast(cur); - if (ls_id == dag->get_ctx().param_.ls_id_) { - exist = true; - break; + if (cancel_flag) { + if (cur->get_dag_status() == ObIDag::DAG_STATUS_READY) { + cancel_dag = cur; + cur = cur->get_next(); + if (OB_UNLIKELY(nullptr != cancel_dag->get_dag_net())) { + tmp_ret = OB_ERR_UNEXPECTED; + COMMON_LOG(WARN, "compaction dag should not in dag net", KR(tmp_ret)); + } else if (OB_TMP_FAIL(finish_dag_(ObIDag::DAG_STATUS_ABORT, *cancel_dag, unused_erase_dag_net))) { + COMMON_LOG(WARN, "failed to erase dag", K(tmp_ret), KPC(cancel_dag)); + ob_abort(); + } else { + ++cancel_dag_cnt; + } + } else { + exist = true; + cur = cur->get_next(); + } + } else { + cur = cur->get_next(); } } - cur = cur->get_next(); } + } // end of scheduler_sync_ + if (OB_SUCC(ret)) { + COMMON_LOG(INFO, "cancel dag when check ls compaction dag exist", KR(ret), K(cancel_dag_cnt), K(exist)); } return ret; } diff --git a/src/share/scheduler/ob_dag_scheduler.h b/src/share/scheduler/ob_dag_scheduler.h index f5f245e2b3..9922dd6ea9 100644 --- a/src/share/scheduler/ob_dag_scheduler.h +++ b/src/share/scheduler/ob_dag_scheduler.h @@ -822,7 +822,10 @@ public: compaction::ObDiagnoseTabletCompProgress &progress); int get_max_major_finish_time(const int64_t version, int64_t &estimated_finish_time); int diagnose_dag(const ObIDag *dag, compaction::ObDiagnoseTabletCompProgress &input_progress); - int check_ls_compaction_dag_exist(const ObLSID &ls_id, bool &exist); + + // 1. check ls compaction exist + // 2. cancel ls compaction waiting dag + int check_ls_compaction_dag_exist_with_cancel(const ObLSID &ls_id, bool &exist); int check_dag_net_exist( const ObDagId &dag_id, bool &exist); int cancel_dag_net(const ObDagId &dag_id); diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index dd8902eea9..619810fde6 100755 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -534,7 +534,37 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( return ret; } -int ObMediumCompactionScheduleFunc::init_parallel_range( +int ObMediumCompactionScheduleFunc::init_schema_changed( + const ObSSTableMeta &sstable_meta, + ObMediumCompactionInfo &medium_info) +{ + int ret = OB_SUCCESS; + int64_t full_stored_col_cnt = 0; + const ObSSTableBasicMeta &basic_meta = sstable_meta.get_basic_meta(); + const ObStorageSchema &schema = medium_info.storage_schema_; + if (OB_UNLIKELY(!schema.is_inited())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema is not inited", KR(ret), K(schema)); + } else if (OB_FAIL(schema.get_stored_column_count_in_sstable(full_stored_col_cnt))) { + LOG_WARN("failed to get stored column count in sstable", K(ret), K(schema)); + } else if (OB_UNLIKELY(sstable_meta.get_column_count() > full_stored_col_cnt)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("stored col cnt in curr schema is less than old major sstable", K(ret), + "col_cnt_in_sstable", sstable_meta.get_column_count(), + "col_cnt_in_schema", full_stored_col_cnt, KPC(this)); + } else if (sstable_meta.get_column_count() != full_stored_col_cnt + || basic_meta.compressor_type_ != schema.get_compressor_type() + || (ObRowStoreType::DUMMY_ROW_STORE != basic_meta.latest_row_store_type_ + && basic_meta.latest_row_store_type_ != schema.row_store_type_)) { + medium_info.is_schema_changed_ = true; + LOG_INFO("schema changed", K(sstable_meta), K(schema)); + } else { + medium_info.is_schema_changed_ = false; + } + return ret; +} + +int ObMediumCompactionScheduleFunc::init_parallel_range_and_schema_changed( const ObGetMergeTablesResult &result, ObMediumCompactionInfo &medium_info) { @@ -575,6 +605,10 @@ int ObMediumCompactionScheduleFunc::init_parallel_range( KPC(first_sstable), K(meta_handle)); } } + // init is_schema_changed + if (FAILEDx(init_schema_changed(meta_handle.get_sstable_meta(), medium_info))) { + STORAGE_LOG(WARN, "failed to init schema changed", KR(ret), "sstable_meta", meta_handle.get_sstable_meta()); + } if (OB_FAIL(ret)) { } else if (expected_task_count <= 1) { @@ -713,7 +747,7 @@ int ObMediumCompactionScheduleFunc::prepare_medium_info( } } } - if (FAILEDx(init_parallel_range(result, medium_info))) { + if (FAILEDx(init_parallel_range_and_schema_changed(result, medium_info))) { LOG_WARN("failed to init parallel range", K(ret), K(medium_info)); } else if (OB_UNLIKELY(result.handle_.empty())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index 882e35d8df..8c2d92d6bc 100755 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -70,9 +70,12 @@ protected: const ObGetMergeTablesResult &result, const int64_t schema_version, ObMediumCompactionInfo &medium_info); - int init_parallel_range( + int init_parallel_range_and_schema_changed( const ObGetMergeTablesResult &result, ObMediumCompactionInfo &medium_info); + int init_schema_changed( + const ObSSTableMeta &sstable_meta, + ObMediumCompactionInfo &medium_info); static int get_result_for_major( ObTablet &tablet, const ObMediumCompactionInfo &medium_info, @@ -128,7 +131,6 @@ protected: const ObTabletID &tablet_id, const int64_t schema_version, uint64_t &table_id); - static const int64_t DEFAULT_SYNC_SCHEMA_CLOG_TIMEOUT = 1000L * 1000L; // 1s static const int64_t DEFAULT_SCHEDULE_MEDIUM_INTERVAL = 60L * 1000L * 1000L; // 60s static constexpr double SCHEDULE_RANGE_INC_ROW_COUNT_PERCENRAGE_THRESHOLD = 0.2; static const int64_t SCHEDULE_RANGE_ROW_COUNT_THRESHOLD = 1000 * 1000L; // 100w diff --git a/src/storage/compaction/ob_medium_compaction_info.cpp b/src/storage/compaction/ob_medium_compaction_info.cpp index 923aa61586..fc4daf1f5b 100644 --- a/src/storage/compaction/ob_medium_compaction_info.cpp +++ b/src/storage/compaction/ob_medium_compaction_info.cpp @@ -352,6 +352,7 @@ ObMediumCompactionInfo::ObMediumCompactionInfo() compaction_type_(COMPACTION_TYPE_MAX), contain_parallel_range_(false), medium_merge_reason_(ObAdaptiveMergePolicy::NONE), + is_schema_changed_(false), reserved_(0), cluster_id_(0), data_version_(0), @@ -412,6 +413,9 @@ void ObMediumCompactionInfo::reset() info_ = 0; medium_compat_version_ = 0; compaction_type_ = COMPACTION_TYPE_MAX; + contain_parallel_range_ = false; + medium_merge_reason_ = ObAdaptiveMergePolicy::NONE; + is_schema_changed_ = false; cluster_id_ = 0; medium_snapshot_ = 0; last_medium_snapshot_ = 0; @@ -540,8 +544,8 @@ int64_t ObMediumCompactionInfo::to_string(char* buf, const int64_t buf_len) cons J_OBJ_START(); J_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version), "compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), - "medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), K_(cluster_id), - K_(medium_snapshot), K_(last_medium_snapshot), K_(storage_schema), + "medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), + K_(is_schema_changed), K_(cluster_id), K_(medium_snapshot), K_(last_medium_snapshot), K_(storage_schema), K_(contain_parallel_range), K_(parallel_merge_info)); J_OBJ_END(); } diff --git a/src/storage/compaction/ob_medium_compaction_info.h b/src/storage/compaction/ob_medium_compaction_info.h index d0edaf41f7..9b3d9f6f0d 100755 --- a/src/storage/compaction/ob_medium_compaction_info.h +++ b/src/storage/compaction/ob_medium_compaction_info.h @@ -213,7 +213,7 @@ public: private: static const int32_t SCS_ONE_BIT = 1; - static const int32_t SCS_RESERVED_BITS = 49; + static const int32_t SCS_RESERVED_BITS = 48; public: union { @@ -223,6 +223,7 @@ public: uint64_t compaction_type_ : 2; uint64_t contain_parallel_range_ : SCS_ONE_BIT; uint64_t medium_merge_reason_ : 8; + uint64_t is_schema_changed_ : SCS_ONE_BIT; uint64_t reserved_ : SCS_RESERVED_BITS; }; }; diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 7c7732ecae..a592cfbce1 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -651,6 +651,7 @@ int ObTabletMergeCtx::inner_init_for_medium() int ret = OB_SUCCESS; const ObMediumCompactionInfo *medium_info = nullptr; ObGetMergeTablesResult get_merge_table_result; + bool is_schema_changed = false; if (OB_FAIL(get_merge_tables(get_merge_table_result))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to get merge tables", K(ret), KPC(this), K(get_merge_table_result)); @@ -663,11 +664,12 @@ int ObTabletMergeCtx::inner_init_for_medium() ret = OB_EAGAIN; LOG_INFO("tx table is not ready. waiting for max_decided_log_ts ...", KR(ret), "merge_scn", get_merge_table_result.scn_range_.end_scn_); - } else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, get_merge_table_result))) { // have checked medium info inside + } else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, get_merge_table_result, is_schema_changed))) { + // have checked medium info inside LOG_WARN("failed to get medium compaction info", K(ret), KPC(this)); } else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) { LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this)); - } else if (OB_FAIL(cal_major_merge_param(get_merge_table_result))) { + } else if (OB_FAIL(cal_major_merge_param(get_merge_table_result, is_schema_changed))) { LOG_WARN("fail to cal major merge param", K(ret), KPC(this)); } return ret; @@ -693,7 +695,8 @@ int ObTabletMergeCtx::get_merge_tables(ObGetMergeTablesResult &get_merge_table_r int ObTabletMergeCtx::init_get_medium_compaction_info( const int64_t medium_snapshot, - ObGetMergeTablesResult &get_merge_table_result) + ObGetMergeTablesResult &get_merge_table_result, + bool &is_schema_changed) { int ret = OB_SUCCESS; ObTablet *tablet = tablet_handle_.get_obj(); @@ -997,11 +1000,12 @@ int ObTabletMergeCtx::cal_minor_merge_param() return ret; } -int ObTabletMergeCtx::cal_major_merge_param(const ObGetMergeTablesResult &get_merge_table_result) +int ObTabletMergeCtx::cal_major_merge_param( + const ObGetMergeTablesResult &get_merge_table_result, + const bool is_schema_changed) { int ret = OB_SUCCESS; ObSSTable *base_table = nullptr; - bool is_schema_changed = false; int64_t full_stored_col_cnt = 0; ObSSTableMetaHandle sstable_meta_hdl; @@ -1029,14 +1033,7 @@ int ObTabletMergeCtx::cal_major_merge_param(const ObGetMergeTablesResult &get_me } else { is_full_merge_ = false; } - const ObSSTableBasicMeta &base_meta = sstable_meta_hdl.get_sstable_meta().get_basic_meta(); - if (sstable_meta_hdl.get_sstable_meta().get_column_count() != full_stored_col_cnt - || base_meta.compressor_type_ != get_schema()->get_compressor_type() - || (ObRowStoreType::DUMMY_ROW_STORE != base_meta.latest_row_store_type_ - && base_meta.latest_row_store_type_ != get_schema()->row_store_type_)) { - is_schema_changed = true; // used to change merge_level, merge_round is from schema - } const int64_t meta_progressive_merge_round = base_meta.progressive_merge_round_; const int64_t schema_progressive_merge_round = get_schema()->get_progressive_merge_round(); diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index b857868138..064df77898 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -162,22 +162,19 @@ struct ObTabletMergeCtx int inner_init_for_mini(bool &skip_rest_operation); int inner_init_for_medium(); - int init_get_medium_compaction_info(const int64_t medium_snapshot, ObGetMergeTablesResult &result); - int get_specified_medium_compaction_info_from_memtable( - ObIAllocator &allocator, + int init_get_medium_compaction_info( const int64_t medium_snapshot, - ObMediumCompactionInfo &info); + ObGetMergeTablesResult &result, + bool &is_schema_changed); int get_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result); - int get_storage_schema_and_gene_from_result(const ObGetMergeTablesResult &get_merge_table_result); int get_storage_schema_to_merge(const ObTablesHandleArray &merge_tables_handle); - int get_max_data_scn(const ObTablesHandleArray &merge_tables_handle); int try_swap_tablet_handle(); int get_medium_compaction_info_to_store(); static bool need_swap_tablet(const ObTablet &tablet, const int64_t row_count, const int64_t macro_count); int get_basic_info_from_result(const ObGetMergeTablesResult &get_merge_table_result); int cal_minor_merge_param(); - int cal_major_merge_param(const ObGetMergeTablesResult &get_merge_table_result); + int cal_major_merge_param(const ObGetMergeTablesResult &get_merge_table_result, const bool is_schema_changed); int init_merge_info(); int prepare_index_tree(); int prepare_merge_progress(); diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index ff97002fef..99f6e188e4 100755 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -151,15 +151,15 @@ int ObMergeParameter::init(compaction::ObTabletMergeCtx &merge_ctx, const int64_ rowkey_read_info_ = &(merge_ctx.tablet_handle_.get_obj()->get_rowkey_read_info()); merge_scn_ = merge_ctx.merge_scn_; - if (merge_scn_ > scn_range_.end_scn_) { - if (ObMergeType::BACKFILL_TX_MERGE != merge_type_) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("merge scn is bigger than scn range but merge type is not backfill, unexpected", - K(ret), K(merge_scn_), K(scn_range_), K(merge_type_)); - } else { - FLOG_INFO("set backfill merge scn", K(merge_scn_), K(scn_range_), K(merge_type_)); - } - } + if (merge_scn_ > scn_range_.end_scn_) { + if (ObMergeType::BACKFILL_TX_MERGE != merge_type_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("merge scn is bigger than scn range but merge type is not backfill, unexpected", + K(ret), K(merge_scn_), K(scn_range_), K(merge_type_)); + } else { + FLOG_INFO("set backfill merge scn", K(merge_scn_), K(scn_range_), K(merge_type_)); + } + } } return ret; } diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 6b3c7596ca..12c5b8ec6c 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -76,10 +76,6 @@ struct ObMergeParameter { bool is_full_merge_; // full merge or increment merge, duplicated with merge_level compaction::ObCachedTransStateMgr *trans_state_mgr_; share::SCN merge_scn_; - - OB_INLINE bool is_major_merge() const { return storage::is_major_merge(merge_type_); } - OB_INLINE bool is_mini_merge() const { return storage::is_mini_merge(merge_type_); } - OB_INLINE bool need_checksum() const { return storage::is_major_merge(merge_type_); } TO_STRING_KV(KPC_(tables_handle), K_(merge_type), K_(merge_level), KP_(merge_schema), K_(merge_range), K_(version_range), K_(scn_range), K_(is_full_merge), K_(merge_scn)); private: @@ -140,7 +136,6 @@ public: private: virtual int check_before_init() override; virtual int inner_init_ctx(ObTabletMergeCtx &ctx, bool &skip_merge_task_flag) override; - int create_sstable_directly(); private: DISALLOW_COPY_AND_ASSIGN(ObTabletMajorPrepareTask); }; @@ -166,8 +161,6 @@ public: private: int create_sstable_after_merge(); - int check_data_checksum(); - int check_empty_merge_valid(ObTabletMergeCtx &ctx); int get_merged_sstable(ObTabletMergeCtx &ctx); int add_sstable_for_merge(ObTabletMergeCtx &ctx); int try_schedule_compaction_after_mini(ObTabletMergeCtx &ctx, storage::ObTabletHandle &tablet_handle); @@ -214,6 +207,7 @@ public: virtual ~ObBasicTabletMergeDag(); ObTabletMergeCtx &get_ctx() { return *ctx_; } ObTabletMergeDagParam &get_param() { return param_; } + virtual const share::ObLSID & get_ls_id() const { return param_.ls_id_; } virtual bool operator == (const ObIDag &other) const override; virtual int64_t hash() const override; virtual int fill_info_param(compaction::ObIBasicInfoParam *&out_param, ObIAllocator &allocator) const override; @@ -333,8 +327,6 @@ public: INHERIT_TO_STRING_KV("ObBasicTabletMergeDag", ObBasicTabletMergeDag, K_(merge_scn_range)); private: - int prepare_compaction(const ObGetMergeTablesResult &result); - virtual int prepare_compaction_filter() { return OB_SUCCESS; } virtual int create_first_task(const ObGetMergeTablesResult &result, const bool need_swap_tablet_flag); DISALLOW_COPY_AND_ASSIGN(ObTabletMergeExecuteDag); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index da4c057fd6..37167ceb35 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -514,7 +514,7 @@ int ObTenantTabletScheduler::check_ls_compaction_finish(const share::ObLSID &ls_ if (OB_UNLIKELY(!ls_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ls_id)); - } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->check_ls_compaction_dag_exist(ls_id, exist))) { + } else if (OB_FAIL(MTL(ObTenantDagScheduler*)->check_ls_compaction_dag_exist_with_cancel(ls_id, exist))) { LOG_WARN("failed to check ls compaction dag", K(ret), K(ls_id)); } else if (exist) { // the compaction dag exists, need retry later. diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index 653e5c8f7f..75a002652d 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -292,9 +292,7 @@ private: MINOR_MERGE, HISTORY_MINOR_MERGE}; static const int64_t SSTABLE_GC_INTERVAL = 30 * 1000 * 1000L; // 30s static const int64_t INFO_POOL_RESIZE_INTERVAL = 30 * 1000 * 1000L; // 30s - static const int64_t DEFAULT_HASH_MAP_BUCKET_CNT = 1009; static const int64_t DEFAULT_COMPACTION_SCHEDULE_INTERVAL = 30 * 1000 * 1000L; // 30s - static const int64_t CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL = 10 * 1000 * 1000L; // 10s static const int64_t CHECK_REPORT_SCN_INTERVAL = 5 * 60 * 1000 * 1000L; // 600s static const int64_t ADD_LOOP_EVENT_INTERVAL = 120 * 1000 * 1000L; // 120s static const int64_t WAIT_MEDIUM_CHECK_THRESHOLD = 10 * 60 * 1000 * 1000L; // 10m diff --git a/src/storage/ob_storage_schema_recorder.h b/src/storage/ob_storage_schema_recorder.h index 51426c173b..b395a69d71 100644 --- a/src/storage/ob_storage_schema_recorder.h +++ b/src/storage/ob_storage_schema_recorder.h @@ -53,14 +53,6 @@ public: void destroy(); void reset(); bool is_inited() const { return is_inited_; } - bool is_valid() const - { - return is_inited_ - && ls_id_.is_valid() - && tablet_id_.is_valid() - && nullptr != log_handler_ - && max_saved_version_ >= 0; - } // follower int replay_schema_log(const share::SCN &scn, const char *buf, const int64_t size, int64_t &pos); diff --git a/unittest/share/scheduler/test_dag_scheduler.cpp b/unittest/share/scheduler/test_dag_scheduler.cpp index cda4ad386f..14efb1e54f 100644 --- a/unittest/share/scheduler/test_dag_scheduler.cpp +++ b/unittest/share/scheduler/test_dag_scheduler.cpp @@ -1511,6 +1511,63 @@ TEST_F(TestDagScheduler, test_emergency_task) EXPECT_EQ(3, op.value()); } + +class TestCompMidCancelDag : public compaction::ObTabletMergeDag +{ +public: + TestCompMidCancelDag() + : compaction::ObTabletMergeDag(ObDagType::DAG_TYPE_MERGE_EXECUTE){} + virtual const share::ObLSID & get_ls_id() const override { return ls_id_; } + virtual lib::Worker::CompatMode get_compat_mode() const override + { return lib::Worker::CompatMode::MYSQL; } +private: + DISALLOW_COPY_AND_ASSIGN(TestCompMidCancelDag); +}; + +TEST_F(TestDagScheduler, test_check_ls_compaction_dag_exist_with_cancel) +{ + ObTenantDagScheduler *scheduler = MTL(ObTenantDagScheduler*); + ASSERT_TRUE(nullptr != scheduler); + ASSERT_EQ(OB_SUCCESS, scheduler->init(MTL_ID(), time_slice, 64)); + EXPECT_EQ(OB_SUCCESS, scheduler->set_thread_score(ObDagPrio::DAG_PRIO_COMPACTION_MID, 1)); + EXPECT_EQ(1, scheduler->up_limits_[ObDagPrio::DAG_PRIO_COMPACTION_MID]); + + LoopWaitTask *wait_task = nullptr; + const int64_t dag_cnt = 6; + // add 6 dag at prio = DAG_PRIO_COMPACTION_MID + ObLSID ls_ids[2] = {ObLSID(1), ObLSID(2)}; + bool finish_flag[2] = {false, false}; + for (int64_t i = 0; i < dag_cnt; ++i) { + const int64_t idx = i % 2; + TestCompMidCancelDag *dag = NULL; + EXPECT_EQ(OB_SUCCESS, scheduler->alloc_dag(dag)); + dag->ls_id_ = ls_ids[idx]; + dag->tablet_id_ = ObTabletID(i); + EXPECT_EQ(OB_SUCCESS, alloc_task(*dag, wait_task)); + EXPECT_EQ(OB_SUCCESS, wait_task->init(1, 2, finish_flag[idx])); + EXPECT_EQ(OB_SUCCESS, dag->add_task(*wait_task)); + EXPECT_EQ(OB_SUCCESS, scheduler->add_dag(dag)); + } + EXPECT_EQ(dag_cnt, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]); + CHECK_EQ_UTIL_TIMEOUT(1, scheduler->running_task_cnts_[ObDagPrio::DAG_PRIO_COMPACTION_MID]); + + // cancel two waiting dag of ls_ids[0] + bool exist = false; + EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[0], exist)); + EXPECT_EQ(exist, true); + EXPECT_EQ(4, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]); + + EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[1], exist)); + EXPECT_EQ(exist, false); + EXPECT_EQ(1, scheduler->dag_cnts_[ObDagType::DAG_TYPE_MERGE_EXECUTE]); + + finish_flag[0] = true; + wait_scheduler(); + + EXPECT_EQ(OB_SUCCESS, scheduler->check_ls_compaction_dag_exist_with_cancel(ls_ids[0], exist)); + EXPECT_EQ(exist, false); +} + /* TEST_F(TestDagScheduler, test_large_thread_cnt) {