From 0921188899f8ba9f624fca7eaad2609909d73d37 Mon Sep 17 00:00:00 2001 From: Fengjingkun Date: Fri, 8 Mar 2024 12:45:48 +0000 Subject: [PATCH] fix some bugs && refine some codes --- src/share/ob_ddl_common.cpp | 2 +- src/share/ob_freeze_info_manager.cpp | 11 ++- src/share/scheduler/ob_dag_scheduler_config.h | 2 +- src/storage/column_store/ob_cg_scanner.cpp | 4 +- .../column_store/ob_cg_sstable_row_getter.cpp | 2 +- .../ob_column_oriented_merger.cpp | 2 +- .../ob_column_oriented_sstable.cpp | 10 ++- .../column_store/ob_column_oriented_sstable.h | 3 +- .../compaction/ob_medium_compaction_func.cpp | 12 +++- .../compaction/ob_medium_compaction_func.h | 3 +- .../compaction/ob_tablet_merge_ctx.cpp | 4 +- .../compaction/ob_tablet_merge_task.cpp | 70 ++++++++++--------- src/storage/compaction/ob_tablet_merge_task.h | 37 +++++++--- .../compaction/ob_tenant_tablet_scheduler.cpp | 31 ++++---- .../compaction/ob_tenant_tablet_scheduler.h | 6 +- src/storage/ddl/ob_ddl_merge_task.cpp | 2 +- 16 files changed, 123 insertions(+), 78 deletions(-) diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index ae3b6f68c..07f31850e 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -2525,7 +2525,7 @@ int ObCODDLUtil::get_column_checksums( LOG_WARN("unexpected column_group", K(ret), K(i)); } else if (OB_FAIL(co_sstable->fetch_cg_sstable(i, cg_sstable_wrapper))) { LOG_WARN("fail to get cg sstable", K(ret), K(i)); - } else if (OB_FAIL(cg_sstable_wrapper.get_sstable(cg_sstable))) { + } else if (OB_FAIL(cg_sstable_wrapper.get_loaded_column_store_sstable(cg_sstable))) { LOG_WARN("get sstable failed", K(ret)); } else if (OB_UNLIKELY(cg_sstable == nullptr || !cg_sstable->is_valid())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/ob_freeze_info_manager.cpp b/src/share/ob_freeze_info_manager.cpp index 2fc82f6ce..3e956acda 100644 --- a/src/share/ob_freeze_info_manager.cpp +++ b/src/share/ob_freeze_info_manager.cpp @@ -199,7 +199,12 @@ int ObFreezeInfoManager::update_freeze_info( const share::SCN &latest_snapshot_gc_scn) { int ret = OB_SUCCESS; - if (OB_FAIL(freeze_info_.frozen_statuses_.prepare_allocate(freeze_infos.count()))) { + const int64_t freeze_info_cnt = freeze_infos.count(); + + if (OB_UNLIKELY(freeze_infos.empty())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get invalid arguments", K(ret), K(freeze_infos), K(latest_snapshot_gc_scn)); + } else if (OB_FAIL(freeze_info_.frozen_statuses_.prepare_allocate(freeze_info_cnt))) { LOG_WARN("failed to prepare allocate mem for new freeze info", KR(ret), K(freeze_infos), K(freeze_info_)); } else if (OB_FAIL(freeze_info_.frozen_statuses_.assign(freeze_infos))) { LOG_WARN("fail to assign", KR(ret), K(freeze_infos)); @@ -211,7 +216,7 @@ int ObFreezeInfoManager::update_freeze_info( if (OB_SUCC(ret)) { freeze_info_.latest_snapshot_gc_scn_ = latest_snapshot_gc_scn; - LOG_INFO("inner load succ", K(freeze_info_)); + LOG_INFO("inner load succ", "latest_freeze_info", freeze_info_.frozen_statuses_.at(freeze_info_cnt - 1), K(freeze_info_)); } return ret; } @@ -354,7 +359,7 @@ int ObFreezeInfoManager::get_freeze_info_behind_major_snapshot( LOG_WARN("get invalid arguments", K(ret), K(snapshot_version)); } else { bool found = false; - for (int64_t i = freeze_info_.frozen_statuses_.count() - 1; OB_SUCC(ret) && i >= 0; --i) { + for (int64_t i = 0; OB_SUCC(ret) && i < freeze_info_.frozen_statuses_.count(); ++i) { if (snapshot_version < freeze_info_.frozen_statuses_.at(i).frozen_scn_.get_val_for_tx()) { frozen_status = freeze_info_.frozen_statuses_.at(i); found = true; diff --git a/src/share/scheduler/ob_dag_scheduler_config.h b/src/share/scheduler/ob_dag_scheduler_config.h index 24b34800e..eb50c7bbf 100644 --- a/src/share/scheduler/ob_dag_scheduler_config.h +++ b/src/share/scheduler/ob_dag_scheduler_config.h @@ -61,7 +61,7 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_L DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::MDS_TABLE_MERGE_TASK, "MDS_TABLE_MERGE", "COMPACTION", false, 3, {"ls_id", "tablet_id", "flush_scn"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_BATCH_FREEZE_TABLETS, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::BATCH_FREEZE_TABLET_TASK, "BATCH_FREEZE", "COMPACTION", - false, 3, {"ls_id", "compaction_scn", "tablet_count"}) + false, 2, {"ls_id", "tablet_count"}) DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL_COMPLEMENT", "DDL", true, 7, {"ls_id", "source_tablet_id", "dest_tablet_id", "data_table_id", "target_table_id", "schema_version", "snapshot_version"}) diff --git a/src/storage/column_store/ob_cg_scanner.cpp b/src/storage/column_store/ob_cg_scanner.cpp index ac83efd7b..bc4f6c684 100644 --- a/src/storage/column_store/ob_cg_scanner.cpp +++ b/src/storage/column_store/ob_cg_scanner.cpp @@ -38,7 +38,7 @@ int ObCGScanner::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument to init ObCGScanner", K(ret), K(wrapper), K(iter_param)); } else if (FALSE_IT(table_wrapper_ = wrapper)) { - } else if (OB_FAIL(table_wrapper_.get_sstable(sstable_))) { + } else if (OB_FAIL(table_wrapper_.get_loaded_column_store_sstable(sstable_))) { LOG_WARN("fail to get sstable", K(ret), K(wrapper)); } else if (OB_UNLIKELY(!sstable_->is_normal_cg_sstable())) { ret = OB_ERR_UNEXPECTED; @@ -78,7 +78,7 @@ int ObCGScanner::switch_context( ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(wrapper), K(iter_param)); } else if (FALSE_IT(table_wrapper_ = wrapper)) { - } else if (OB_FAIL(table_wrapper_.get_sstable(sstable_))) { + } else if (OB_FAIL(table_wrapper_.get_loaded_column_store_sstable(sstable_))) { LOG_WARN("fail to get sstable", K(ret), K(wrapper)); } else if (OB_UNLIKELY(!sstable_->is_normal_cg_sstable())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/column_store/ob_cg_sstable_row_getter.cpp b/src/storage/column_store/ob_cg_sstable_row_getter.cpp index 026bbc21e..848b16577 100644 --- a/src/storage/column_store/ob_cg_sstable_row_getter.cpp +++ b/src/storage/column_store/ob_cg_sstable_row_getter.cpp @@ -62,7 +62,7 @@ int ObCGGetter::init( 1 != idx_key.get_datum_cnt())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument to init ObCGGetter", K(ret), K(wrapper), K(iter_param)); - } else if (OB_FAIL(wrapper.get_sstable(sstable))) { + } else if (OB_FAIL(wrapper.get_loaded_column_store_sstable(sstable))) { LOG_WARN("fail to get sstable", K(ret), K(wrapper)); } else { is_same_data_block_ = false; diff --git a/src/storage/column_store/ob_column_oriented_merger.cpp b/src/storage/column_store/ob_column_oriented_merger.cpp index 135a214af..30f98cfa7 100644 --- a/src/storage/column_store/ob_column_oriented_merger.cpp +++ b/src/storage/column_store/ob_column_oriented_merger.cpp @@ -249,7 +249,7 @@ int ObCOMerger::alloc_writers( } } else if (OB_FAIL(co_sstable.fetch_cg_sstable(idx, cg_wrapper))) { STORAGE_LOG(WARN, "failed to get cg sstable", K(ret), K(sstable)); - } else if (OB_FAIL(cg_wrapper.get_sstable(cg_sstable))) { + } else if (OB_FAIL(cg_wrapper.get_loaded_column_store_sstable(cg_sstable))) { STORAGE_LOG(WARN, "failed to get sstable from wrapper", K(ret), K(cg_wrapper)); } else if (OB_FAIL(cg_wrappers_.push_back(cg_wrapper))) { STORAGE_LOG(WARN, "failed to push cg wrapper", K(ret), K(cg_wrappers_)); diff --git a/src/storage/column_store/ob_column_oriented_sstable.cpp b/src/storage/column_store/ob_column_oriented_sstable.cpp index 6e4f15ce6..1e2f1d37e 100644 --- a/src/storage/column_store/ob_column_oriented_sstable.cpp +++ b/src/storage/column_store/ob_column_oriented_sstable.cpp @@ -69,7 +69,7 @@ int ObSSTableWrapper::set_sstable( return ret; } -int ObSSTableWrapper::get_sstable(ObSSTable *&table) +int ObSSTableWrapper::get_loaded_column_store_sstable(ObSSTable *&table) { int ret = OB_SUCCESS; ObSSTable *meta_sstable = nullptr; @@ -78,8 +78,14 @@ int ObSSTableWrapper::get_sstable(ObSSTable *&table) if (OB_UNLIKELY(!is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("wrapper not valid", K(ret), KPC(this)); + } else if (OB_UNLIKELY(!sstable_->is_column_store_sstable())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("This func can only be used when fetching column store SSTable", K(ret), KPC(sstable_)); } else if (sstable_->is_loaded()) { table = sstable_; + } else if (OB_UNLIKELY(!meta_handle_.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("meta handle is unexpected not valid", K(ret), KPC(sstable_), K(meta_handle_)); } else if (OB_FAIL(meta_handle_.get_sstable(meta_sstable))) { LOG_WARN("failed to get sstable", K(ret), KPC(this)); } else if (sstable_->get_key() == meta_sstable->get_key()) { @@ -129,7 +135,7 @@ int ObSSTableWrapper::get_merge_row_cnt(const ObTableIterParam &iter_param, int6 blocksstable::ObSSTable *cur_sstable = nullptr; ObDDLMergeBlockRowIterator ddl_merge_iter; - if (OB_FAIL(get_sstable(cur_sstable))) { + if (OB_FAIL(get_loaded_column_store_sstable(cur_sstable))) { LOG_WARN("fail to get sstable", K(ret), K(*this)); } else if (OB_FAIL((cur_sstable->get_index_tree_root(root_block)))) { LOG_WARN("fail to get index tree root", K(ret), K(root_block), K(*this)); diff --git a/src/storage/column_store/ob_column_oriented_sstable.h b/src/storage/column_store/ob_column_oriented_sstable.h index 1ba20011f..47b0f5fa7 100644 --- a/src/storage/column_store/ob_column_oriented_sstable.h +++ b/src/storage/column_store/ob_column_oriented_sstable.h @@ -46,7 +46,8 @@ public: void reset(); bool is_valid() const { return sstable_ != nullptr; } int set_sstable(blocksstable::ObSSTable *sstable, ObStorageMetaHandle *meta_handle = nullptr); - int get_sstable(blocksstable::ObSSTable *&table); + // this interface will return the loaded column store type sstable + int get_loaded_column_store_sstable(blocksstable::ObSSTable *&table); int get_merge_row_cnt(const ObTableIterParam &iter_param, int64_t &row_cnt); blocksstable::ObSSTable *get_sstable() const { return sstable_; } const ObStorageMetaHandle &get_meta_handle() const { return meta_handle_; } diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 813ab3f4c..e4498ad3d 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -1246,13 +1246,14 @@ int ObMediumCompactionScheduleFunc::batch_check_medium_finish( int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( ObLS &ls, ObTablet &tablet, - bool &tablet_need_freeze_flag, + ObTabletSchedulePair &schedule_pair, bool &create_dag_flag, const int64_t input_major_snapshot, const bool scheduler_called) { int ret = OB_SUCCESS; create_dag_flag = false; + #ifdef ERRSIM ret = OB_E(EventTable::EN_MEDIUM_CREATE_DAG) ret; if (OB_FAIL(ret)) { @@ -1274,6 +1275,8 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( const int64_t major_frozen_snapshot = 0 == input_major_snapshot ? MTL(ObTenantTabletScheduler *)->get_frozen_version() : input_major_snapshot; // broadcast scn ObMediumCompactionInfo::ObCompactionType compaction_type = ObMediumCompactionInfo::COMPACTION_TYPE_MAX; int64_t schedule_scn = 0; // medium_snapshot in medium info + bool tablet_need_freeze_flag = false; + if (OB_FAIL(tablet.read_medium_info_list(temp_allocator, medium_list))) { LOG_WARN("failed to load medium info list", K(ret), K(tablet)); } else if (OB_FAIL(read_medium_info_from_list(*medium_list, last_major_snapshot, major_frozen_snapshot, compaction_type, schedule_scn))) { @@ -1289,6 +1292,13 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( } else if (schedule_scn > 0 && OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, tablet_need_freeze_flag, create_dag_flag))) { LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn)); } + + if (OB_SUCC(ret) && tablet_need_freeze_flag) { + schedule_pair.tablet_id_ = tablet_id; + schedule_pair.schedule_merge_scn_ = schedule_scn; + } else { + schedule_pair.reset(); + } } } diff --git a/src/storage/compaction/ob_medium_compaction_func.h b/src/storage/compaction/ob_medium_compaction_func.h index e42b06aa8..a628ba8cf 100644 --- a/src/storage/compaction/ob_medium_compaction_func.h +++ b/src/storage/compaction/ob_medium_compaction_func.h @@ -23,6 +23,7 @@ namespace oceanbase { namespace compaction { +struct ObTabletSchedulePair; class ObMediumCompactionScheduleFunc { @@ -47,7 +48,7 @@ public: static int schedule_tablet_medium_merge( ObLS &ls, ObTablet &tablet, - bool &tablet_need_freeze_flag, + ObTabletSchedulePair &schedule_pair, bool &create_dag_flag, const int64_t major_frozen_scn = 0, const bool scheduler_called = false); diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index b44fa520a..280307544 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -226,12 +226,12 @@ int ObTabletMiniMergeCtx::try_schedule_meta_merge( // try schedule medium merge if (!medium_is_cooling_down) { - bool non_used_freeze_flag = false; // no meaning, just for placeholder for refering bool non_used_schedule_dag_flag = false; + ObTabletSchedulePair non_used_schedule_pair; if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( *get_ls(), *tablet_handle.get_obj(), - non_used_freeze_flag, + non_used_schedule_pair, non_used_schedule_dag_flag))) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { LOG_WARN_RET(tmp_ret, "failed to schedule tablet medium merge", K(tmp_ret), "param", get_dag_param()); diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 3320a65c1..83f705d06 100644 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -1193,10 +1193,9 @@ void prepare_allocator( */ ObBatchFreezeTabletsParam::ObBatchFreezeTabletsParam() : ls_id_(), - compaction_scn_(), - tablet_ids_() + tablet_pairs_() { - tablet_ids_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID)); + tablet_pairs_.set_attr(lib::ObMemAttr(MTL_ID(), "BtFrzTbls", ObCtxIds::MERGE_NORMAL_CTX_ID)); } int ObBatchFreezeTabletsParam::assign( @@ -1206,11 +1205,10 @@ int ObBatchFreezeTabletsParam::assign( if (this == &other) { // do nothing - } else if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) { + } else if (OB_FAIL(tablet_pairs_.assign(other.tablet_pairs_))) { LOG_WARN("failed to copy tablet ids", K(ret)); } else { ls_id_ = other.ls_id_; - compaction_scn_ = other.compaction_scn_; } return ret; } @@ -1219,7 +1217,6 @@ int64_t ObBatchFreezeTabletsParam::get_hash() const { int64_t hash_val = 0; hash_val = common::murmurhash(&ls_id_, sizeof(ls_id_), hash_val); - hash_val = common::murmurhash(&compaction_scn_, sizeof(compaction_scn_), hash_val); return hash_val; } @@ -1280,8 +1277,6 @@ bool ObBatchFreezeTabletsDag::operator == (const ObIDag &other) const is_same = false; } else if (param_.ls_id_ != static_cast(other).param_.ls_id_) { is_same = false; - } else if (param_.compaction_scn_ != static_cast(other).param_.compaction_scn_) { - is_same = false; } return is_same; } @@ -1303,8 +1298,7 @@ int ObBatchFreezeTabletsDag::fill_info_param( allocator, get_type(), param_.ls_id_.id(), - param_.compaction_scn_, - param_.tablet_ids_.count()))) { + param_.tablet_pairs_.count()))) { LOG_WARN("failed to fill info param", K(ret), K(param_)); } return ret; @@ -1313,8 +1307,8 @@ int ObBatchFreezeTabletsDag::fill_info_param( int ObBatchFreezeTabletsDag::fill_dag_key(char *buf, const int64_t buf_len) const { int ret = OB_SUCCESS; - if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld compaction_scn=%ld freeze_tablet_cnt=%ld", - param_.ls_id_.id(), param_.compaction_scn_, param_.tablet_ids_.count()))) { + if (OB_FAIL(databuff_printf(buf, buf_len, "ls_id=%ld freeze_tablet_cnt=%ld", + param_.ls_id_.id(), param_.tablet_pairs_.count()))) { LOG_WARN("failed to fill dag key", K(ret), K(param_)); } return ret; @@ -1362,40 +1356,47 @@ int ObBatchFreezeTabletsTask::process() const ObBatchFreezeTabletsParam ¶m = base_dag_->get_param(); ObLSHandle ls_handle; - ObLS *ls_ptr = nullptr; - if (OB_TMP_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { - LOG_WARN_RET(tmp_ret, "failed to get log stream", K(param)); + ObLS *ls = nullptr; + int64_t weak_read_ts = 0; + if (OB_FAIL(MTL(ObLSService *)->get_ls(param.ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(param)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null ls", K(ret), K(param)); } else { - ls_ptr = ls_handle.get_ls(); + weak_read_ts = ls->get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx(); } int64_t fail_freeze_cnt = 0; int64_t succ_schedule_cnt = 0; - for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_ids_.count(); ++i) { - const ObTabletID &tablet_id = param.tablet_ids_.at(i); + for (int64_t i = 0; OB_SUCC(ret) && i < param.tablet_pairs_.count(); ++i) { + const ObTabletSchedulePair &cur_pair = param.tablet_pairs_.at(i); ObTabletHandle tablet_handle; ObTablet *tablet = nullptr; - if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(tablet_id, true/*need_rewrite*/, true/*is_sync*/))) { - LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(tablet_id)); + if (OB_UNLIKELY(!cur_pair.is_valid())) { + tmp_ret = OB_ERR_UNEXPECTED; + LOG_WARN_RET(tmp_ret, "get invalid tablet pair", K(cur_pair)); + } else if (cur_pair.schedule_merge_scn_ > weak_read_ts) { + // no need to force freeze + } else if (OB_TMP_FAIL(MTL(ObTenantFreezer *)->tablet_freeze(cur_pair.tablet_id_, true/*need_rewrite*/, true/*is_sync*/))) { + LOG_WARN_RET(tmp_ret, "failed to force freeze tablet", K(param), K(cur_pair)); ++fail_freeze_cnt; - } else if (OB_ISNULL(ls_ptr)) { - // do nothing - } else if (OB_TMP_FAIL(ls_ptr->get_tablet_svr()->get_tablet(tablet_id, - tablet_handle, - 0/*timeout_us*/, - storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) { - LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(tablet_id)); + } else if (OB_TMP_FAIL(ls->get_tablet_svr()->get_tablet(cur_pair.tablet_id_, + tablet_handle, + 0/*timeout_us*/, + storage::ObMDSGetTabletMode::READ_ALL_COMMITED))) { + LOG_WARN_RET(tmp_ret, "failed to get tablet", K(param), K(cur_pair)); } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { - } else if (OB_UNLIKELY(tablet->get_snapshot_version() < param.compaction_scn_)) { + } else if (OB_UNLIKELY(tablet->get_snapshot_version() < cur_pair.schedule_merge_scn_)) { // do nothing } else if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_merge_dag(param.ls_id_, *tablet, MEDIUM_MERGE, - param.compaction_scn_))) { + cur_pair.schedule_merge_scn_))) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { ret = tmp_ret; - LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(tablet_id)); + LOG_WARN_RET(tmp_ret, "failed to schedule medium merge dag", K(param), K(cur_pair)); } } else { ++succ_schedule_cnt; @@ -1404,12 +1405,15 @@ int ObBatchFreezeTabletsTask::process() if (OB_FAIL(share::dag_yield())) { LOG_WARN("failed to dag yield", K(ret)); } - } + if (REACH_TENANT_TIME_INTERVAL(5_s)) { + weak_read_ts = ls->get_ls_wrs_handler()->get_ls_weak_read_ts().get_val_for_tx(); + } + } // end for - if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_ids_.count())) { + if (OB_UNLIKELY(fail_freeze_cnt * 2 > param.tablet_pairs_.count())) { ret = OB_PARTIAL_FAILED; } - FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), KP(ls_ptr), K(succ_schedule_cnt)); + FLOG_INFO("batch freeze tablets finished", KR(ret), K(param), K(fail_freeze_cnt), K(succ_schedule_cnt)); return ret; } diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 7d3e957db..b0e8e9020 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -353,22 +353,44 @@ public: }; +struct ObTabletSchedulePair +{ +public: + ObTabletSchedulePair() + : tablet_id_(), + schedule_merge_scn_(0) + { } + ObTabletSchedulePair( + const common::ObTabletID &tablet_id, + const int64_t schedule_merge_scn) + : tablet_id_(tablet_id), + schedule_merge_scn_(schedule_merge_scn) + { } + bool is_valid() const { return tablet_id_.is_valid() && schedule_merge_scn_ > 0; } + bool need_force_freeze() const { return schedule_merge_scn_ > 0; } + void reset() { tablet_id_.reset(); schedule_merge_scn_ = 0; } + TO_STRING_KV(K_(tablet_id), K_(schedule_merge_scn)); +public: + common::ObTabletID tablet_id_; + int64_t schedule_merge_scn_; +}; + + struct ObBatchFreezeTabletsParam : public share::ObIDagInitParam { public: ObBatchFreezeTabletsParam(); - virtual ~ObBatchFreezeTabletsParam() { tablet_ids_.reset(); } - virtual bool is_valid() const override { return ls_id_.is_valid() && compaction_scn_ > 0 && tablet_ids_.count() > 0; } + virtual ~ObBatchFreezeTabletsParam() { tablet_pairs_.reset(); } + virtual bool is_valid() const override { return ls_id_.is_valid() && tablet_pairs_.count() > 0; } int assign(const ObBatchFreezeTabletsParam &other); bool operator == (const ObBatchFreezeTabletsParam &other) const; bool operator != (const ObBatchFreezeTabletsParam &other) const { return !this->operator==(other); } int64_t get_hash() const; - VIRTUAL_TO_STRING_KV(K_(ls_id), K_(compaction_scn), "tablet_count", tablet_ids_.count(), K_(tablet_ids)); + VIRTUAL_TO_STRING_KV(K_(ls_id), "tablet_pair_cnt", tablet_pairs_.count(), K_(tablet_pairs)); public: static constexpr int64_t DEFAULT_BATCH_SIZE = 16; share::ObLSID ls_id_; - int64_t compaction_scn_; - common::ObSEArray tablet_ids_; + common::ObSEArray tablet_pairs_; }; @@ -388,11 +410,6 @@ public: virtual lib::Worker::CompatMode get_compat_mode() const override { return lib::Worker::CompatMode::MYSQL; } virtual uint64_t get_consumer_group_id() const override { return consumer_group_id_; } const ObBatchFreezeTabletsParam &get_param() const { return param_; } - virtual bool ignore_warning() override - { - return OB_PARTIAL_FAILED != dag_ret_; - } - INHERIT_TO_STRING_KV("ObIDag", ObIDag, K_(is_inited), K_(param)); private: bool is_inited_; diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index 0d05b3f2f..1ebaed989 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -1528,7 +1528,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( bool is_leader = false; bool could_major_merge = false; const int64_t major_frozen_scn = get_frozen_version(); - ObSEArray need_freeze_tablets; + ObSEArray need_freeze_tablets; need_freeze_tablets.set_attr(ObMemAttr(MTL_ID(), "MediumBatch")); if (could_major_merge_start()) { could_major_merge = true; @@ -1550,15 +1550,16 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( } bool enable_adaptive_compaction = get_enable_adaptive_compaction(); - bool tablet_need_freeze_flag = false; + ObTabletSchedulePair schedule_pair; while (OB_SUCC(ret)) { // loop all tablet in ls tablet_time_guard.reuse(); bool tablet_merge_finish = false; - tablet_need_freeze_flag = false; // ATTENTION!!! load weak ts before get tablet const share::SCN &weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts(); tablet_could_schedule_medium = false; + schedule_pair.reset(); + if (OB_FAIL(medium_ls_tablet_iter_.get_next_tablet(tablet_handle))) { if (OB_ITER_END == ret) { ret = OB_SUCCESS; @@ -1587,7 +1588,7 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( } else if (OB_TMP_FAIL(schedule_tablet_medium( ls, tablet_handle, major_frozen_scn, weak_read_ts, could_major_merge, tablet_could_schedule_medium, merge_version, enable_adaptive_compaction, - is_leader, tablet_merge_finish, tablet_need_freeze_flag, tablet_time_guard))) { + is_leader, tablet_merge_finish, schedule_pair, tablet_time_guard))) { LOG_WARN("failed to schedule tablet medium", KR(tmp_ret), K(ls_id), K(tablet_id)); } if (tablet_could_schedule_medium @@ -1596,8 +1597,8 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( LOG_WARN("failed to clear prohibit schedule medium flag", K(tmp_ret), K(ret), K(ls_id), K(tablet_id)); } medium_ls_tablet_iter_.update_merge_finish(tablet_merge_finish); - if (tablet_need_freeze_flag) { - if (OB_TMP_FAIL(need_freeze_tablets.push_back(tablet_id))) { + if (schedule_pair.need_force_freeze()) { + if (OB_TMP_FAIL(need_freeze_tablets.push_back(schedule_pair))) { LOG_WARN("failed to push back tablet_id for batch_freeze", KR(tmp_ret), K(ls_id), K(tablet_id)); } } @@ -1605,7 +1606,8 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge( } // end of while if (OB_FAIL(ret) || need_freeze_tablets.empty()) { - } else if (OB_TMP_FAIL(schedule_batch_freeze_dag(merge_version, ls_id, need_freeze_tablets))) { + } else if (OB_TMP_FAIL(schedule_batch_freeze_dag(ls_id, + need_freeze_tablets))) { LOG_WARN("failed to schedule batch force freeze tablets dag", K(tmp_ret), K(ls_id), "tablet_count", need_freeze_tablets.count()); } @@ -1654,7 +1656,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium( const bool enable_adaptive_compaction, bool &is_leader, bool &tablet_merge_finish, - bool &tablet_need_freeze_flag, + ObTabletSchedulePair &schedule_pair, ObCompactionTimeGuard &time_guard) { int ret = OB_SUCCESS; @@ -1665,6 +1667,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium( bool need_diagnose = false; bool tablet_could_schedule_merge = false; bool create_dag_flag = false; + schedule_pair.reset(); if (tablet_could_schedule_medium && OB_TMP_FAIL(ObTabletMergeChecker::check_could_merge_for_medium(tablet, tablet_could_schedule_merge))) { @@ -1721,7 +1724,7 @@ int ObTenantTabletScheduler::schedule_tablet_medium( if (OB_FAIL(ret)) { } else if (could_major_merge) { if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( - ls, tablet, tablet_need_freeze_flag, create_dag_flag, + ls, tablet, schedule_pair, create_dag_flag, major_frozen_scn, true /*scheduler_called*/))) { if (OB_EAGAIN != ret) { LOG_WARN("failed to schedule medium", K(tmp_ret), K(ls_id), K(tablet_id)); @@ -2049,19 +2052,17 @@ void ObTenantTabletScheduler::report_blocking_medium( } int ObTenantTabletScheduler::schedule_batch_freeze_dag( - const int64_t merge_version, const share::ObLSID &ls_id, - const common::ObIArray &tablet_ids) + const common::ObIArray &tablet_pairs) { int ret = OB_SUCCESS; ObBatchFreezeTabletsParam param; - if (OB_UNLIKELY(!ls_id.is_valid() || tablet_ids.empty())) { + if (OB_UNLIKELY(!ls_id.is_valid() || tablet_pairs.empty())) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_ids)); + LOG_WARN("get invalid arguments", K(ret), K(ls_id), K(tablet_pairs)); } else if (FALSE_IT(param.ls_id_ = ls_id)) { - } else if (FALSE_IT(param.compaction_scn_ = merge_version)) { - } else if (OB_FAIL(param.tablet_ids_.assign(tablet_ids))) { + } else if (OB_FAIL(param.tablet_pairs_.assign(tablet_pairs))) { LOG_WARN("failed to assign tablet ids", K(ret)); } else if (OB_FAIL(MTL(ObTenantDagScheduler *)->create_and_add_dag(¶m, true/*is_emergency*/))) { if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) { diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index f1cce5a94..b8cf85cac 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -47,6 +47,7 @@ struct ObTabletStatKey; namespace compaction { +struct ObTabletSchedulePair; class ObFastFreezeChecker { @@ -261,7 +262,7 @@ private: const bool enable_adaptive_compaction, bool &is_leader, bool &tablet_merge_finish, - bool &tablet_need_freeze_flag, + ObTabletSchedulePair &schedule_pair, ObCompactionTimeGuard &time_guard); int after_schedule_tenant_medium( const int64_t merge_version, @@ -295,9 +296,8 @@ private: const bool &could_major_merge, const share::ObLSID &ls_id); int schedule_batch_freeze_dag( - const int64_t merge_version, const share::ObLSID &ls_id, - const common::ObIArray &tablet_ids); + const common::ObIArray &tablet_ids); public: static const int64_t INIT_COMPACTION_SCN = 1; typedef common::ObSEArray MinorParallelResultArray; diff --git a/src/storage/ddl/ob_ddl_merge_task.cpp b/src/storage/ddl/ob_ddl_merge_task.cpp index 71ac6e51d..0d1f6b75b 100644 --- a/src/storage/ddl/ob_ddl_merge_task.cpp +++ b/src/storage/ddl/ob_ddl_merge_task.cpp @@ -732,7 +732,7 @@ int get_sstables(ObTableStoreIterator &ddl_sstable_iter, const int64_t cg_idx, O // skip } else if (OB_FAIL(cur_co_sstable->fetch_cg_sstable(cg_idx, cg_sstable_wrapper))) { LOG_WARN("get all tables failed", K(ret)); - } else if (OB_FAIL(cg_sstable_wrapper.get_sstable(cg_sstable))) { + } else if (OB_FAIL(cg_sstable_wrapper.get_loaded_column_store_sstable(cg_sstable))) { LOG_WARN("get sstable failed", K(ret)); } else if (OB_ISNULL(cg_sstable)) { // skip