diff --git a/src/storage/blocksstable/ob_macro_block.cpp b/src/storage/blocksstable/ob_macro_block.cpp index 41e28e0ed..6789fec23 100644 --- a/src/storage/blocksstable/ob_macro_block.cpp +++ b/src/storage/blocksstable/ob_macro_block.cpp @@ -141,37 +141,6 @@ int ObDataStoreDesc::cal_row_store_type(const share::schema::ObMergeSchema &merg return ret; } -int ObDataStoreDesc::set_major_working_cluster_version() -{ - int ret = OB_SUCCESS; - if (OB_UNLIKELY((!is_major_merge() && !is_meta_major_merge()) || snapshot_version_ <= 0)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "Unexpected data store to get major working cluster version", - K(ret), K_(merge_type), K_(snapshot_version)); - } else { - ObTenantFreezeInfoMgr::FreezeInfo freeze_info; - if (OB_SUCC(MTL_CALL_FREEZE_INFO_MGR(get_freeze_info_by_snapshot_version, snapshot_version_, freeze_info))) { - // succ to get freeze info - } else if (OB_ENTRY_NOT_EXIST != ret) { - STORAGE_LOG(WARN, "Failed to get freeze info", K(ret), K_(snapshot_version), "tenant_id", MTL_ID()); - } else if (OB_FAIL(MTL_CALL_FREEZE_INFO_MGR(get_latest_freeze_info, freeze_info))) { - STORAGE_LOG(WARN, "Failed to get latest freeze info", K(ret)); - } - if (OB_SUCC(ret)) { - if (freeze_info.cluster_version < 0) { - STORAGE_LOG(ERROR, "Unexpected cluster version of freeze info", K(ret), K(freeze_info)); - major_working_cluster_version_ = 0; - } else { - major_working_cluster_version_ = freeze_info.cluster_version; - } - ObTaskController::get().allow_next_syslog(); - STORAGE_LOG(INFO, "Succ to get major working cluster version", - K_(major_working_cluster_version), K(freeze_info), K_(snapshot_version)); - } - } - return ret; -} - int ObDataStoreDesc::init( const ObMergeSchema &merge_schema, const share::ObLSID &ls_id, @@ -250,14 +219,17 @@ int ObDataStoreDesc::init( encoder_opt_.set_store_type(row_store_type_); } - if (OB_SUCC(ret) && storage::is_major_merge(merge_type)) { // exactly MAJOR MERGE + if (OB_SUCC(ret) && is_major) { + uint64_t compat_version = 0; + int tmp_ret = OB_SUCCESS; if (cluster_version > 0) { major_working_cluster_version_ = cluster_version; + } else if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) { + STORAGE_LOG(WARN, "fail to get data version", K(tmp_ret)); } else { - if (OB_FAIL(set_major_working_cluster_version())) { - STORAGE_LOG(WARN, "Failed to set major working cluster version", K(ret), K(*this)); - } + major_working_cluster_version_ = compat_version; } + STORAGE_LOG(INFO, "success to set major working cluster version", K(tmp_ret), K(merge_type), K(cluster_version), K(major_working_cluster_version_)); } if (OB_FAIL(ret)) { diff --git a/src/storage/blocksstable/ob_macro_block.h b/src/storage/blocksstable/ob_macro_block.h index bb78e1075..84ffd669c 100644 --- a/src/storage/blocksstable/ob_macro_block.h +++ b/src/storage/blocksstable/ob_macro_block.h @@ -129,7 +129,6 @@ private: int cal_row_store_type( const share::schema::ObMergeSchema &schema, const storage::ObMergeType merge_type); - int set_major_working_cluster_version(); int get_emergency_row_store_type(); private: DISALLOW_COPY_AND_ASSIGN(ObDataStoreDesc); diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index 9d4c8b777..d8dcd1036 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -69,7 +69,6 @@ int ObMediumCompactionScheduleFunc::choose_medium_snapshot( medium_info.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; medium_info.medium_merge_reason_ = merge_reason; medium_info.medium_snapshot_ = result.version_range_.snapshot_version_; - medium_info.medium_scn_ = result.scn_range_.end_scn_; LOG_TRACE("choose_medium_snapshot", K(ret), "ls_id", ls.get_ls_id(), "tablet_id", tablet.get_tablet_meta().tablet_id_, K(result), K(medium_info)); } @@ -94,8 +93,6 @@ int ObMediumCompactionScheduleFunc::choose_major_snapshot( } else { ret = OB_NO_NEED_MERGE; } - } else if (OB_FAIL(medium_info.medium_scn_.convert_for_tx(schedule_medium_snapshot))) { - LOG_WARN("failed to convert into scn", K(ret), K(schedule_medium_snapshot)); } else { medium_info.compaction_type_ = ObMediumCompactionInfo::MAJOR_COMPACTION; medium_info.medium_merge_reason_ = ObAdaptiveMergePolicy::AdaptiveMergeReason::NONE; @@ -211,10 +208,17 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot( int64_t multi_version_start = 0; ObGetMergeTablesResult result; ObMediumCompactionInfo medium_info; + uint64_t compat_version = 0; if (OB_FAIL(choose_medium_scn[is_major](ls_, tablet_, schedule_medium_snapshot, merge_reason, medium_info, result))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this)); } + } else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) { + LOG_WARN("fail to get data version", K(ret)); + } else if (OB_UNLIKELY(compat_version < DATA_VERSION_4_1_0_0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid data version to schedule medium compaction", K(ret), K(compat_version)); + } else if (FALSE_IT(medium_info.data_version_ = compat_version)) { } else if (is_major) { // do nothing } else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) { diff --git a/src/storage/compaction/ob_medium_compaction_mgr.cpp b/src/storage/compaction/ob_medium_compaction_mgr.cpp index e82b7c6c1..86cb4591c 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.cpp +++ b/src/storage/compaction/ob_medium_compaction_mgr.cpp @@ -234,8 +234,8 @@ ObMediumCompactionInfo::ObMediumCompactionInfo() medium_merge_reason_(ObAdaptiveMergePolicy::NONE), reserved_(0), cluster_id_(0), + data_version_(0), medium_snapshot_(0), - medium_scn_(), storage_schema_(), parallel_merge_info_() { @@ -263,7 +263,7 @@ int ObMediumCompactionInfo::init( info_ = medium_info.info_; cluster_id_ = medium_info.cluster_id_; medium_snapshot_ = medium_info.medium_snapshot_; - medium_scn_ = medium_info.medium_scn_; + data_version_ = medium_info.data_version_; } return ret; } @@ -272,7 +272,7 @@ bool ObMediumCompactionInfo::is_valid() const { return COMPACTION_TYPE_MAX != compaction_type_ && medium_snapshot_ > 0 - && medium_scn_.get_val_for_tx() > 0 + && data_version_ > 0 && storage_schema_.is_valid() && parallel_merge_info_.is_valid(); } @@ -284,7 +284,7 @@ void ObMediumCompactionInfo::reset() compaction_type_ = COMPACTION_TYPE_MAX; cluster_id_ = 0; medium_snapshot_ = 0; - medium_scn_.set_min(); + data_version_ = 0; storage_schema_.reset(); parallel_merge_info_.destroy(); } @@ -349,7 +349,7 @@ int ObMediumCompactionInfo::serialize(char *buf, const int64_t buf_len, int64_t info_, cluster_id_, medium_snapshot_, - medium_scn_, + data_version_, storage_schema_); if (contain_parallel_range_) { LST_DO_CODE( @@ -376,7 +376,7 @@ int ObMediumCompactionInfo::deserialize( info_, cluster_id_, medium_snapshot_, - medium_scn_); + data_version_); if (OB_FAIL(ret)) { } else if (OB_FAIL(storage_schema_.deserialize(allocator, buf, data_len, pos))) { LOG_WARN("failed to deserialize storage schema", K(ret)); @@ -400,7 +400,7 @@ int64_t ObMediumCompactionInfo::get_serialize_size() const info_, cluster_id_, medium_snapshot_, - medium_scn_, + data_version_, storage_schema_); if (contain_parallel_range_) { LST_DO_CODE(OB_UNIS_ADD_LEN, parallel_merge_info_); @@ -702,7 +702,7 @@ int ObTabletMediumCompactionInfoRecorder::submit_log( } else if (OB_FAIL(write_clog(clog_buf, clog_len))) { LOG_WARN("fail to submit log", K(ret), K_(tablet_id), K(medium_info_)); int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(dec_ref_on_memtable(false))) { + if (clog_scn_.get_val_for_tx() > 0 && OB_TMP_FAIL(dec_ref_on_memtable(false))) { LOG_ERROR("failed to dec ref on memtable", K(tmp_ret), K_(ls_id), K_(tablet_id)); } } else { diff --git a/src/storage/compaction/ob_medium_compaction_mgr.h b/src/storage/compaction/ob_medium_compaction_mgr.h index 3bc51afec..87dc4647c 100644 --- a/src/storage/compaction/ob_medium_compaction_mgr.h +++ b/src/storage/compaction/ob_medium_compaction_mgr.h @@ -128,10 +128,10 @@ public: int64_t get_serialize_size() const; void gene_info(char* buf, const int64_t buf_len, int64_t &pos) const; - TO_STRING_KV(K_(cluster_id), K_(medium_compat_version), + TO_STRING_KV(K_(cluster_id), K_(medium_compat_version), K_(data_version), "compaction_type", ObMediumCompactionInfo::get_compaction_type_str((ObCompactionType)compaction_type_), "medium_merge_reason", ObAdaptiveMergePolicy::merge_reason_to_str(medium_merge_reason_), K_(cluster_id), - K_(medium_snapshot), K_(medium_scn), K_(storage_schema), + K_(medium_snapshot), K_(storage_schema), K_(contain_parallel_range), K_(parallel_merge_info)); public: static const int64_t MEIDUM_COMPAT_VERSION = 1; @@ -153,8 +153,8 @@ public: }; uint64_t cluster_id_; // for backup database to throw MEDIUM_COMPACTION clog + uint64_t data_version_; int64_t medium_snapshot_; - share::SCN medium_scn_; // for follower minor merge storage::ObStorageSchema storage_schema_; ObParallelMergeInfo parallel_merge_info_; }; diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index 5abbc23ea..8a3c404e8 100644 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -780,7 +780,7 @@ int ObPartitionMergePolicy::refine_minor_merge_result( { int ret = OB_SUCCESS; ObMergeType &merge_type = result.suggest_merge_type_; - if (result.handle_.get_count() <= minor_compact_trigger) { + if (result.handle_.get_count() <= MAX(minor_compact_trigger, 1)) { ret = OB_NO_NEED_MERGE; LOG_DEBUG("minor refine, no need to do minor merge", K(result)); result.handle_.reset(); diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index 8f97d738c..efb476064 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -85,7 +85,8 @@ int ObPartitionMerger::init_data_store_desc(ObTabletMergeCtx &ctx) ctx.param_.ls_id_, ctx.param_.tablet_id_, ctx.param_.get_merge_type(), - ctx.sstable_version_range_.snapshot_version_))) { + ctx.sstable_version_range_.snapshot_version_, + ctx.data_version_))) { STORAGE_LOG(WARN, "Failed to init data store desc", K(ret), K(ctx)); } else { merge_info_.reset(); diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 3693c6d0c..2d9b8500f 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -524,6 +524,7 @@ ObTabletMergeCtx::ObTabletMergeCtx( compaction_filter_(nullptr), time_guard_(), rebuild_seq_(-1), + data_version_(0), merge_list_() { merge_scn_.set_max(); @@ -667,6 +668,7 @@ int ObTabletMergeCtx::inner_init_for_medium() } else if (OB_FAIL(init_get_medium_compaction_info(param_.merge_version_, medium_info))) { // have checked medium info inside LOG_WARN("failed to get medium compaction info", K(ret), KPC(this)); } else if (FALSE_IT(get_merge_table_result.schema_version_ = medium_info->storage_schema_.schema_version_)) { + } else if (FALSE_IT(data_version_ = medium_info->data_version_)) { } else if (FALSE_IT(is_tenant_major_merge_ = medium_info->is_major_compaction())) { } else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) { LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this)); @@ -1111,7 +1113,8 @@ int ObTabletMergeCtx::prepare_index_tree() param_.ls_id_, param_.tablet_id_, param_.merge_type_, - sstable_version_range_.snapshot_version_))) { + sstable_version_range_.snapshot_version_, + data_version_))) { LOG_WARN("failed to init index store desc", K(ret), KPC(this)); } else { // TODO(zhuixin.gsy) modify index_desc.init to avoid reset col_desc_array_ diff --git a/src/storage/compaction/ob_tablet_merge_ctx.h b/src/storage/compaction/ob_tablet_merge_ctx.h index 96990fa74..81e3adcee 100644 --- a/src/storage/compaction/ob_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_tablet_merge_ctx.h @@ -242,6 +242,7 @@ public: compaction::ObICompactionFilter *compaction_filter_; ObCompactionTimeGuard time_guard_; int64_t rebuild_seq_; + uint64_t data_version_; ObMediumCompactionInfoList merge_list_; TO_STRING_KV(K_(param), K_(sstable_version_range), K_(create_snapshot_version), @@ -255,7 +256,7 @@ public: K_(scn_range), K_(merge_scn), K_(read_base_version), K_(ls_handle), K_(tablet_handle), KPC_(merge_progress), - KPC_(compaction_filter), K_(time_guard), K_(rebuild_seq), K_(merge_list)); + KPC_(compaction_filter), K_(time_guard), K_(rebuild_seq), K_(data_version), K_(merge_list)); private: DISALLOW_COPY_AND_ASSIGN(ObTabletMergeCtx); }; diff --git a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp index c7ae746e3..677540ad1 100644 --- a/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp +++ b/src/storage/compaction/ob_tenant_freeze_info_mgr.cpp @@ -912,7 +912,12 @@ void ObTenantFreezeInfoMgr::ReloadTask::runTimerTask() void ObTenantFreezeInfoMgr::UpdateLSResvSnapshotTask::runTimerTask() { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(mgr_.try_update_reserved_snapshot())) { + uint64_t compat_version = 0; + if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) { + LOG_WARN("fail to get data version", K(tmp_ret)); + } else if (compat_version < DATA_VERSION_4_1_0_0) { + // do nothing, should not update reserved snapshot + } else if (OB_TMP_FAIL(mgr_.try_update_reserved_snapshot())) { LOG_WARN("fail to try reserved snapshot", KR(tmp_ret)); } } diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index c7ae9c6e8..340797ccb 100644 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -708,7 +708,9 @@ int ObTenantTabletScheduler::schedule_tablet_minor_merge( } else { ObTabletMergeDagParam dag_param(MERGE_TYPES[i], ls_id, tablet_id); for (int k = 0; OB_SUCC(ret) && k < parallel_results.count(); ++k) { - if (OB_FAIL(schedule_merge_execute_dag(dag_param, ls_handle, tablet_handle, parallel_results.at(k)))) { + if (OB_UNLIKELY(parallel_results.at(k).handle_.get_count() <= 1)) { + LOG_WARN("invalid parallel result", K(ret), K(k), K(parallel_results)); + } else if (OB_FAIL(schedule_merge_execute_dag(dag_param, ls_handle, tablet_handle, parallel_results.at(k)))) { LOG_WARN("failed to schedule minor execute dag", K(ret), K(k), K(parallel_results.at(k))); } else { LOG_INFO("success to schedule tablet minor merge", K(ret), K(ls_id), K(tablet_id), diff --git a/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp b/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp index 6d246946b..8d5d59e89 100644 --- a/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp +++ b/src/storage/ls/ob_ls_reserved_snapshot_mgr.cpp @@ -179,7 +179,7 @@ int ObLSReservedSnapshotMgr::update_min_reserved_snapshot_for_leader(const int64 } // end of lock if (OB_SUCC(ret) && send_log_flag) { - if (OB_FAIL(try_update_for_leader(new_snapshot_version, nullptr/*allocator*/))) { + if (OB_FAIL(sync_clog(new_snapshot_version))) { LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_snapshot_version)); } else if (need_print_log()) { LOG_INFO("submit reserved snapshot log success", "ls_id", ls_->get_ls_id(), @@ -204,7 +204,7 @@ int ObLSReservedSnapshotMgr::try_sync_reserved_snapshot( if (OB_FAIL(update_min_reserved_snapshot_for_leader(new_reserved_snapshot))) { LOG_WARN("failed to update min_reserved_snapshot", K(ret), K(new_reserved_snapshot)); } - } else if (OB_FAIL(try_update_for_leader(new_reserved_snapshot, nullptr/*allocator*/))) { + } else if (OB_FAIL(sync_clog(new_reserved_snapshot))) { LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot)); } else if (need_print_log()) { LOG_INFO("submit reserved snapshot log success", "ls_id", ls_->get_ls_id(), @@ -213,6 +213,20 @@ int ObLSReservedSnapshotMgr::try_sync_reserved_snapshot( return ret; } +int ObLSReservedSnapshotMgr::sync_clog(const int64_t new_reserved_snapshot) +{ + int ret = OB_SUCCESS; + uint64_t compat_version = 0; + if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) { + LOG_WARN("fail to get data version", K(ret)); + } else if (compat_version < DATA_VERSION_4_1_0_0) { + // do nothing, should sync clog + } else if (OB_FAIL(try_update_for_leader(new_reserved_snapshot, nullptr/*allocator*/))) { + LOG_WARN("failed to send update reserved snapshot log", K(ret), K(new_reserved_snapshot)); + } + return ret; +} + int ObLSReservedSnapshotMgr::replay_reserved_snapshot_log( const share::SCN &scn, const char *buf, const int64_t size, int64_t &pos) { diff --git a/src/storage/ls/ob_ls_reserved_snapshot_mgr.h b/src/storage/ls/ob_ls_reserved_snapshot_mgr.h index a52451221..8d48af4bf 100644 --- a/src/storage/ls/ob_ls_reserved_snapshot_mgr.h +++ b/src/storage/ls/ob_ls_reserved_snapshot_mgr.h @@ -90,6 +90,7 @@ private: } return bret; } + int sync_clog(const int64_t new_reserved_snapshot); bool is_inited_; common::ObArenaAllocator allocator_; diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index b422ac4ca..d5c8c580b 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -2425,10 +2425,12 @@ int ObTablet::get_kept_multi_version_start( int64_t &multi_version_start) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; multi_version_start = 0; int64_t max_merged_snapshot = 0; int64_t min_reserved_snapshot = 0; int64_t min_medium_snapshot = INT64_MAX; + int64_t ls_min_reserved_snapshot = INT64_MAX; const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_; const ObTabletTableStore &table_store = tablet.get_table_store(); if (0 != table_store.get_major_sstables().count()) { @@ -2444,9 +2446,17 @@ int ObTablet::get_kept_multi_version_start( && OB_FAIL(tablet.get_min_medium_snapshot(min_medium_snapshot))) { LOG_WARN("failed to get min medium snapshot", K(ret), K(tablet)); } + + // for compat, if cluster not upgrade to 4.1, should not consider ls.get_min_reserved_snapshot() + uint64_t compat_version = 0; + if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) { + LOG_WARN("fail to get data version", K(tmp_ret)); + } else if (compat_version >= DATA_VERSION_4_1_0_0) { + ls_min_reserved_snapshot = ls.get_min_reserved_snapshot(); + } if (OB_SUCC(ret)) { min_reserved_snapshot = common::min( - ls.get_min_reserved_snapshot(), + ls_min_reserved_snapshot, common::min(min_reserved_snapshot, min_medium_snapshot)); multi_version_start = MIN(MAX(min_reserved_snapshot, multi_version_start), tablet.get_snapshot_version()); } diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index 1396ffa20..4c93cb25b 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -169,7 +169,7 @@ void TestCompactionPolicy::SetUp() medium_info_.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; medium_info_.medium_snapshot_ = 100; - medium_info_.medium_scn_.convert_for_tx(100); + medium_info_.data_version_ = 100; medium_info_.storage_schema_.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL); } diff --git a/unittest/storage/test_medium_compaction_mgr.cpp b/unittest/storage/test_medium_compaction_mgr.cpp index dcb7d51de..10e859b69 100644 --- a/unittest/storage/test_medium_compaction_mgr.cpp +++ b/unittest/storage/test_medium_compaction_mgr.cpp @@ -30,7 +30,7 @@ public: medium_info_.compaction_type_ = ObMediumCompactionInfo::MEDIUM_COMPACTION; medium_info_.medium_snapshot_ = 100; - medium_info_.medium_scn_.convert_for_tx(100); + medium_info_.data_version_ = 100; medium_info_.cluster_id_ = INIT_CLUSTER_ID; medium_info_.storage_schema_.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL);