diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 90b443a4c5..a6aafe14eb 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -140,6 +140,7 @@ ObMemtable::ObMemtable() max_column_cnt_(0) { mt_stat_.reset(); + migration_clog_checkpoint_scn_.set_min(); } ObMemtable::~ObMemtable() @@ -270,6 +271,7 @@ void ObMemtable::destroy() transfer_freeze_flag_ = false; recommend_snapshot_version_.reset(); max_end_scn_ = ObScnRange::MIN_SCN; + migration_clog_checkpoint_scn_.set_min(); rec_scn_ = SCN::max_scn(); read_barrier_ = false; is_tablet_freeze_ = false; @@ -1749,7 +1751,7 @@ void ObMemtable::resolve_left_boundary_for_active_memtable() { int ret = OB_SUCCESS; storage::ObTabletMemtableMgr *memtable_mgr = get_memtable_mgr_(); - const SCN new_start_scn = get_end_scn(); + const SCN new_start_scn = MAX(get_end_scn(), get_migration_clog_checkpoint_scn()); if (OB_NOT_NULL(memtable_mgr)) { do { @@ -1814,6 +1816,23 @@ int ObMemtable::get_frozen_schema_version(int64_t &schema_version) const return OB_NOT_SUPPORTED; } +int ObMemtable::set_migration_clog_checkpoint_scn(const SCN &clog_checkpoint_scn) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "not inited", K(ret)); + } else if (clog_checkpoint_scn <= ObScnRange::MIN_SCN) { + ret = OB_SCN_OUT_OF_BOUND; + TRANS_LOG(WARN, "invalid clog_checkpoint_ts", K(ret)); + } else { + (void)migration_clog_checkpoint_scn_.atomic_store(clog_checkpoint_scn); + } + + return ret; +} + int ObMemtable::set_snapshot_version(const SCN snapshot_version) { int ret = OB_SUCCESS; @@ -2285,6 +2304,38 @@ int ObMemtable::resolve_right_boundary() return ret; } +int ObMemtable::resolve_right_boundary_for_migration() +{ + bool bool_ret = false; + int ret = OB_SUCCESS; + share::ObLSID ls_id = freezer_->get_ls_id(); + int64_t start_time = ObTimeUtility::current_time(); + + do { + bool_ret = is_frozen_memtable() && 0 == get_write_ref(); + if (bool_ret) { + if (OB_FAIL(resolve_snapshot_version_())) { + TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id)); + } else if (OB_FAIL(resolve_max_end_scn_())) { + TRANS_LOG(WARN, "fail to resolve max_end_scn", K(ret), KPC(this), K(ls_id)); + } else { + resolve_right_boundary(); + TRANS_LOG(INFO, "resolve_right_boundary_for_migration", K(ls_id), KPC(this)); + } + } else { + const int64_t cost_time = ObTimeUtility::current_time() - start_time; + if (cost_time > 5 * 1000 * 1000) { + if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { + TRANS_LOG(WARN, "cannot resolve_right_boundary_for_migration", K(ret), KPC(this), K(ls_id)); + } + } + ob_usleep(100); + } + } while (!bool_ret); + + return ret; +} + void ObMemtable::resolve_left_boundary(SCN end_scn) { set_start_scn(end_scn); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index 34bbe9e32b..dd2ef79915 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -481,6 +481,9 @@ public: blocksstable::ObDatumRange &m_get_real_range(blocksstable::ObDatumRange &real_range, const blocksstable::ObDatumRange &range, const bool is_reverse) const; int get_tx_table_guard(storage::ObTxTableGuard &tx_table_guard); + int set_migration_clog_checkpoint_scn(const share::SCN &clog_checkpoint_scn); + share::SCN get_migration_clog_checkpoint_scn() { return migration_clog_checkpoint_scn_.atomic_get(); } + int resolve_right_boundary_for_migration(); void unset_logging_blocked_for_active_memtable(); void resolve_left_boundary_for_active_memtable(); void set_allow_freeze(const bool allow_freeze); @@ -506,7 +509,7 @@ public: K_(freeze_clock), K_(max_schema_version), K_(max_data_schema_version), K_(max_column_cnt), K_(write_ref_cnt), K_(local_allocator), K_(unsubmitted_cnt), K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolved_active_memtable_left_boundary), - K_(contain_hotspot_row), K_(max_end_scn), K_(rec_scn), K_(snapshot_version), + K_(contain_hotspot_row), K_(max_end_scn), K_(rec_scn), K_(snapshot_version), K_(migration_clog_checkpoint_scn), K_(is_tablet_freeze), K_(contain_hotspot_row), K_(read_barrier), K_(is_flushed), K_(freeze_state), K_(allow_freeze), K_(mt_stat_.frozen_time), K_(mt_stat_.ready_for_flush_time), @@ -651,6 +654,7 @@ private: int64_t state_; int64_t freeze_state_; int64_t timestamp_; + share::SCN migration_clog_checkpoint_scn_; bool is_tablet_freeze_; bool is_flushed_; bool read_barrier_ CACHE_ALIGNED; diff --git a/src/storage/ob_storage_table_guard.cpp b/src/storage/ob_storage_table_guard.cpp index d614342ad7..7579e859ba 100644 --- a/src/storage/ob_storage_table_guard.cpp +++ b/src/storage/ob_storage_table_guard.cpp @@ -365,6 +365,7 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo if (0 == write_ref) { SCN clog_checkpoint_scn; bool need_create_memtable = true; + SCN migration_clog_checkpoint_scn; ObTabletHandle tmp_handle; ObLSHandle ls_handle; if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { @@ -376,9 +377,13 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo tmp_handle, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { LOG_WARN("fail to get tablet", K(ret), K(ls_id), K(tablet_id)); } else if (FALSE_IT(clog_checkpoint_scn = tmp_handle.get_obj()->get_tablet_meta().clog_checkpoint_scn_)) { - } else if (for_replay_ && replay_scn_ <= clog_checkpoint_scn) { - for_replace_tablet_meta = true; - need_create_memtable = false; + } else if (FALSE_IT(migration_clog_checkpoint_scn = static_cast(memtable)->get_migration_clog_checkpoint_scn())) { + } else if (for_replay_ && !migration_clog_checkpoint_scn.is_min()) { + static_cast(memtable)->resolve_right_boundary(); + if (replay_scn_ <= clog_checkpoint_scn) { + for_replace_tablet_meta = true; + need_create_memtable = false; + } } // create a new memtable if no write in the old memtable diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 5ac15cfc8f..ba4261d8f8 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -5778,6 +5778,46 @@ int ObTablet::get_finish_medium_scn(int64_t &finish_medium_scn) const return ret; } +int ObTablet::set_memtable_clog_checkpoint_scn( + const ObMigrationTabletParam *tablet_meta) +{ + int ret = OB_SUCCESS; + ObTableHandleV2 handle; + memtable::ObMemtable *memtable = nullptr; + + ObProtectedMemtableMgrHandle *protected_handle = NULL; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not inited", K(ret), K_(is_inited)); + } else if (OB_ISNULL(tablet_meta)) { + // no need to set memtable clog checkpoint ts + } else if (tablet_meta->clog_checkpoint_scn_ <= tablet_meta_.clog_checkpoint_scn_) { + // do nothing + } else if (is_ls_inner_tablet()) { + if (OB_FAIL(get_protected_memtable_mgr_handle(protected_handle))) { + LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), KPC(this)); + } else if (OB_UNLIKELY(protected_handle->has_memtable())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ls inner tablet should not have memtable", K(ret), KPC(tablet_meta)); + } + } else if (OB_FAIL(get_boundary_memtable(handle))) { + if (OB_ENTRY_NOT_EXIST == ret) { + ret = OB_SUCCESS; + } else { + LOG_WARN("failed to get boundary memtable for tablet", K(ret), KPC(this), KPC(tablet_meta)); + } + } else if (OB_FAIL(handle.get_data_memtable(memtable))) { + LOG_WARN("failed to get memtable", K(ret), K(handle)); + } else if (OB_ISNULL(memtable)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected null memtable", K(ret), KPC(memtable)); + } else if (OB_FAIL(memtable->set_migration_clog_checkpoint_scn(tablet_meta->clog_checkpoint_scn_))) { + LOG_WARN("failed to set migration clog checkpoint ts", K(ret), K(handle), KPC(this)); + } + + return ret; +} + int ObTablet::get_medium_info_list( common::ObArenaAllocator &allocator, compaction::ObMediumCompactionInfoList &medium_info_list) const diff --git a/src/storage/tablet/ob_tablet.h b/src/storage/tablet/ob_tablet.h index 2a1e040e1a..e522fcc83f 100644 --- a/src/storage/tablet/ob_tablet.h +++ b/src/storage/tablet/ob_tablet.h @@ -518,6 +518,7 @@ public: int64_t &required_size, const bool need_checksums = true); int check_and_set_initial_state(); + int set_memtable_clog_checkpoint_scn(const ObMigrationTabletParam *tablet_meta); int read_mds_table( common::ObIAllocator &allocator, ObTabletMdsData &mds_data, diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.cpp b/src/storage/tablet/ob_tablet_memtable_mgr.cpp index 961985a90d..e6d5413d6a 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.cpp +++ b/src/storage/tablet/ob_tablet_memtable_mgr.cpp @@ -206,7 +206,8 @@ inline int ObTabletMemtableMgr::try_resolve_boundary_on_create_memtable_for_lead last_frozen_memtable->resolve_right_boundary(); TRANS_LOG(INFO, "[resolve_right_boundary] in create_memtable on leader", KPC(last_frozen_memtable)); if (new_memtable != last_frozen_memtable) { - new_memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn()); + const SCN &new_start_scn = MAX(last_frozen_memtable->get_end_scn(), last_frozen_memtable->get_migration_clog_checkpoint_scn()); + new_memtable->resolve_left_boundary(new_start_scn); } } else if (unsubmitted_cnt > 0) { new_memtable->set_logging_blocked(); @@ -312,7 +313,9 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn, last_frozen_memtable->resolve_right_boundary(); TRANS_LOG(INFO, "[resolve_right_boundary] in create_memtable on replay", KPC(last_frozen_memtable)); if (memtable != last_frozen_memtable) { - memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn()); + const SCN start_scn = MAX(last_frozen_memtable->get_end_scn(), + last_frozen_memtable->get_migration_clog_checkpoint_scn()); + memtable->resolve_left_boundary(start_scn); } } // for leader, decide the right boundary of frozen memtable