Filter logs for replacing tablet_meta
This commit is contained in:
		@ -2065,7 +2065,8 @@ int ObLSTabletService::post_handle_batch_create_tablets(
 | 
			
		||||
int ObLSTabletService::create_memtable(
 | 
			
		||||
    const common::ObTabletID &tablet_id,
 | 
			
		||||
    const int64_t schema_version,
 | 
			
		||||
    const bool for_replay)
 | 
			
		||||
    const bool for_replay,
 | 
			
		||||
    const SCN clog_checkpoint_scn)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObTimeGuard time_guard("ObLSTabletService::create_memtable", 10 * 1000);
 | 
			
		||||
@ -2087,7 +2088,7 @@ int ObLSTabletService::create_memtable(
 | 
			
		||||
    } else if (OB_UNLIKELY(!handle.is_valid())) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      LOG_WARN("unexpected error, invalid tablet handle", K(ret), K(handle));
 | 
			
		||||
    } else if (OB_FAIL(handle.get_obj()->create_memtable(schema_version, for_replay))) {
 | 
			
		||||
    } else if (OB_FAIL(handle.get_obj()->create_memtable(schema_version, clog_checkpoint_scn, for_replay))) {
 | 
			
		||||
      if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
        LOG_WARN("fail to create memtable", K(ret), K(handle), K(schema_version), K(tablet_id));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -290,7 +290,8 @@ public:
 | 
			
		||||
  int create_memtable(
 | 
			
		||||
      const common::ObTabletID &tablet_id,
 | 
			
		||||
      const int64_t schema_version,
 | 
			
		||||
      const bool for_replay = false);
 | 
			
		||||
      const bool for_replay = false,
 | 
			
		||||
      const share::SCN clog_checkpoint_scn = share::SCN::min_scn());
 | 
			
		||||
  int get_read_tables(
 | 
			
		||||
      const common::ObTabletID &tablet_id,
 | 
			
		||||
      const int64_t snapshot_version,
 | 
			
		||||
 | 
			
		||||
@ -160,6 +160,7 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
 | 
			
		||||
  const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
 | 
			
		||||
  SCN clog_checkpoint_scn;
 | 
			
		||||
  bool bool_ret = true;
 | 
			
		||||
  bool for_replace_tablet_meta = false;
 | 
			
		||||
  const int64_t start = ObTimeUtility::current_time();
 | 
			
		||||
 | 
			
		||||
  if (OB_ISNULL(memtable_mgr)) {
 | 
			
		||||
@ -182,7 +183,7 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
 | 
			
		||||
              ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
              LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id));
 | 
			
		||||
            } else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(
 | 
			
		||||
                                                                                     tablet_id, 0/*schema version*/, for_replay_))) {
 | 
			
		||||
                                                                                     tablet_id, 0/*schema version*/, for_replay_, clog_checkpoint_scn))) {
 | 
			
		||||
              LOG_WARN("fail to create a boundary memtable", K(ret), K(ls_id), K(tablet_id));
 | 
			
		||||
            }
 | 
			
		||||
          } else { // replay_log_scn_ <= clog_checkpoint_scn
 | 
			
		||||
@ -195,8 +196,9 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
 | 
			
		||||
        }
 | 
			
		||||
      } else if (OB_FAIL(handle.get_memtable(memtable))) {
 | 
			
		||||
        LOG_WARN("fail to get memtable from ObTableHandle", K(ret), K(ls_id), K(tablet_id));
 | 
			
		||||
      } else if (OB_FAIL(check_freeze_to_inc_write_ref(memtable, bool_ret))) {
 | 
			
		||||
        if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
      } else if (OB_FAIL(check_freeze_to_inc_write_ref(memtable, bool_ret, for_replace_tablet_meta))) {
 | 
			
		||||
        if (OB_EAGAIN == ret) {
 | 
			
		||||
        } else if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
          LOG_WARN("fail to check_freeze", K(ret), K(tablet_id), K(bool_ret), KPC(memtable));
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
@ -209,7 +211,7 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
 | 
			
		||||
                    K(ls_id), K(tablet_id), K(cost_time));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    } while ((OB_SUCC(ret) || OB_ENTRY_NOT_EXIST == ret) && bool_ret);
 | 
			
		||||
    } while ((OB_SUCC(ret) || OB_ENTRY_NOT_EXIST == ret || OB_EAGAIN == ret) && bool_ret);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
@ -260,7 +262,7 @@ int ObStorageTableGuard::get_memtable_for_replay(memtable::ObIMemtable *&memtabl
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bool_ret)
 | 
			
		||||
int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bool_ret, bool &for_replace_tablet_meta)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  bool_ret = true;
 | 
			
		||||
@ -282,7 +284,7 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
 | 
			
		||||
    LOG_INFO("table is null, need to refresh", K(bool_ret), K(ls_id), K(tablet_id));
 | 
			
		||||
  } else if (FALSE_IT(old_freeze_flag = memtable->get_freeze_flag())) {
 | 
			
		||||
  } else if (FALSE_IT(is_tablet_freeze = memtable->get_is_tablet_freeze())) {
 | 
			
		||||
  } else if (memtable->is_active_memtable()) {
 | 
			
		||||
  } else if (memtable->is_active_memtable() || for_replace_tablet_meta) {
 | 
			
		||||
    // the most recent memtable is active
 | 
			
		||||
    // no need to create a new memtable
 | 
			
		||||
    if (for_replay_ || for_multi_source_data_) {
 | 
			
		||||
@ -312,18 +314,42 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
 | 
			
		||||
    // need to create a new memtable
 | 
			
		||||
    // TODO: allow to write frozen memtables except for the boundary one when replaying
 | 
			
		||||
    const int64_t write_ref = memtable->get_write_ref();
 | 
			
		||||
    if (write_ref == 0) {
 | 
			
		||||
      // create a new memtable if no write in the old memtable
 | 
			
		||||
      ObLSHandle ls_handle;
 | 
			
		||||
      if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
 | 
			
		||||
        LOG_WARN("failed to get log stream", K(ret), K(bool_ret), K(ls_id), K(tablet_id));
 | 
			
		||||
      } else if (OB_UNLIKELY(!ls_handle.is_valid())) {
 | 
			
		||||
    if (0 == write_ref) {
 | 
			
		||||
      SCN clog_checkpoint_scn;
 | 
			
		||||
      bool need_create_memtable = true;
 | 
			
		||||
      const SCN migration_clog_checkpoint_scn = static_cast<memtable::ObMemtable *>(memtable)->get_migration_clog_checkpoint_scn();
 | 
			
		||||
      ObIMemtableMgr *memtable_mgr = tablet_->get_memtable_mgr();
 | 
			
		||||
 | 
			
		||||
      if (OB_ISNULL(memtable_mgr)) {
 | 
			
		||||
        ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
        LOG_WARN("unexpected error, invalid ls handle", K(ret), K(bool_ret), K(ls_handle), K(ls_id), K(tablet_id));
 | 
			
		||||
      } else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(tablet_id,
 | 
			
		||||
          memtable->get_max_schema_version()/*schema version*/, for_replay_))) {
 | 
			
		||||
        if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
          LOG_WARN("fail to create new memtable for freeze", K(ret), K(bool_ret), K(ls_id), K(tablet_id));
 | 
			
		||||
        LOG_WARN("memtable mgr is null", K(ret), K(bool_ret), K(ls_id), K(tablet_id), KP(memtable_mgr));
 | 
			
		||||
      } else if (OB_FAIL(memtable_mgr->get_newest_clog_checkpoint_scn(clog_checkpoint_scn))) {
 | 
			
		||||
        LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id), K(clog_checkpoint_scn));
 | 
			
		||||
      } else if (for_replay_ && !migration_clog_checkpoint_scn.is_min()) {
 | 
			
		||||
        static_cast<memtable::ObMemtable *>(memtable)->resolve_right_boundary();
 | 
			
		||||
        if (replay_scn_ <= clog_checkpoint_scn) {
 | 
			
		||||
          for_replace_tablet_meta = true;
 | 
			
		||||
          need_create_memtable = false;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // create a new memtable if no write in the old memtable
 | 
			
		||||
      if (OB_FAIL(ret)) {
 | 
			
		||||
      } else if (need_create_memtable) {
 | 
			
		||||
        ObLSHandle ls_handle;
 | 
			
		||||
        if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
 | 
			
		||||
          LOG_WARN("failed to get log stream", K(ret), K(bool_ret), K(ls_id), K(tablet_id));
 | 
			
		||||
        } else if (OB_UNLIKELY(!ls_handle.is_valid())) {
 | 
			
		||||
          ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
          LOG_WARN("unexpected error, invalid ls handle", K(ret), K(bool_ret), K(ls_handle), K(ls_id), K(tablet_id));
 | 
			
		||||
        } else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(tablet_id,
 | 
			
		||||
                                                                                 memtable->get_max_schema_version()/*schema version*/,
 | 
			
		||||
                                                                                 for_replay_,
 | 
			
		||||
                                                                                 clog_checkpoint_scn))) {
 | 
			
		||||
          if (OB_EAGAIN == ret) {
 | 
			
		||||
          } else if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
            LOG_WARN("fail to create new memtable for freeze", K(ret), K(bool_ret), K(ls_id), K(tablet_id));
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
@ -336,6 +362,7 @@ bool ObStorageTableGuard::need_to_refresh_table(ObTableStoreIterator &iter)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  bool bool_ret = false;
 | 
			
		||||
  bool for_replace_tablet_meta = false;
 | 
			
		||||
  int exit_flag = -1;
 | 
			
		||||
 | 
			
		||||
  ObITable *table = iter.get_boundary_table(true);
 | 
			
		||||
@ -352,7 +379,7 @@ bool ObStorageTableGuard::need_to_refresh_table(ObTableStoreIterator &iter)
 | 
			
		||||
  } else if (iter.check_store_expire()) {
 | 
			
		||||
    bool_ret = true;
 | 
			
		||||
    exit_flag = 1;
 | 
			
		||||
  } else if (OB_FAIL(check_freeze_to_inc_write_ref(table, bool_ret))) {
 | 
			
		||||
  } else if (OB_FAIL(check_freeze_to_inc_write_ref(table, bool_ret, for_replace_tablet_meta))) {
 | 
			
		||||
    bool_ret = true;
 | 
			
		||||
    exit_flag = 2;
 | 
			
		||||
    if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
 | 
			
		||||
@ -57,7 +57,7 @@ private:
 | 
			
		||||
      const bool is_tablet_freeze,
 | 
			
		||||
      memtable::ObIMemtable *memtable,
 | 
			
		||||
      bool &bool_ret);
 | 
			
		||||
  int check_freeze_to_inc_write_ref(ObITable *table, bool &bool_ret);
 | 
			
		||||
  int check_freeze_to_inc_write_ref(ObITable *table, bool &bool_ret, bool &for_replace_tablet_meta);
 | 
			
		||||
  bool need_to_refresh_table(ObTableStoreIterator &iter);
 | 
			
		||||
  bool check_if_need_log();
 | 
			
		||||
private:
 | 
			
		||||
 | 
			
		||||
@ -1835,13 +1835,14 @@ int ObTablet::get_active_memtable(ObTableHandleV2 &handle) const
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTablet::create_memtable(const int64_t schema_version,
 | 
			
		||||
                              const SCN clog_checkpoint_scn,
 | 
			
		||||
                              const bool for_replay)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObTimeGuard time_guard("ObTablet::create_memtable", 10 * 1000);
 | 
			
		||||
  TCWLockGuard guard(table_store_lock_);
 | 
			
		||||
  time_guard.click("lock");
 | 
			
		||||
  const SCN clog_checkpoint_scn = tablet_meta_.clog_checkpoint_scn_;
 | 
			
		||||
  const SCN new_clog_checkpoint_scn = clog_checkpoint_scn.is_min() ? tablet_meta_.clog_checkpoint_scn_ : clog_checkpoint_scn;
 | 
			
		||||
 | 
			
		||||
  if (IS_NOT_INIT) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
@ -1852,11 +1853,11 @@ int ObTablet::create_memtable(const int64_t schema_version,
 | 
			
		||||
  } else if (OB_FAIL(table_store_.prepare_memtables())) {
 | 
			
		||||
    LOG_WARN("failed to pre-allocate memory for new memtable", K(ret), KPC(this));
 | 
			
		||||
  } else if (FALSE_IT(time_guard.click("prepare_memtables"))) {
 | 
			
		||||
  } else if (OB_FAIL(inner_create_memtable(clog_checkpoint_scn, schema_version, for_replay))) {
 | 
			
		||||
  } else if (OB_FAIL(inner_create_memtable(new_clog_checkpoint_scn, schema_version, for_replay))) {
 | 
			
		||||
    if (OB_ENTRY_EXIST == ret) {
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
    } else if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
 | 
			
		||||
      LOG_WARN("failed to create memtable", K(ret), K(clog_checkpoint_scn),
 | 
			
		||||
      LOG_WARN("failed to create memtable", K(ret), K(new_clog_checkpoint_scn),
 | 
			
		||||
               K(schema_version), K(for_replay));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (FALSE_IT(time_guard.click("inner_create_memtable"))) {
 | 
			
		||||
 | 
			
		||||
@ -407,7 +407,7 @@ private:
 | 
			
		||||
      const lib::Worker::CompatMode compat_mode,
 | 
			
		||||
      ObFreezer *freezer);
 | 
			
		||||
  int build_read_info(common::ObIAllocator &allocator);
 | 
			
		||||
  int create_memtable(const int64_t schema_version, const bool for_replay=false);
 | 
			
		||||
  int create_memtable(const int64_t schema_version, const share::SCN clog_checkpoint_scn, const bool for_replay=false);
 | 
			
		||||
  int try_update_start_scn();
 | 
			
		||||
  int try_update_ddl_checkpoint_scn();
 | 
			
		||||
  int try_update_table_store_flag(const ObUpdateTableStoreParam ¶m);
 | 
			
		||||
 | 
			
		||||
@ -166,6 +166,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
 | 
			
		||||
  memtable::ObMemtable *active_memtable = nullptr;
 | 
			
		||||
  uint32_t memtable_freeze_clock = UINT32_MAX;
 | 
			
		||||
  const share::ObLSID ls_id = ls_->get_ls_id();
 | 
			
		||||
  SCN new_clog_checkpoint_scn;
 | 
			
		||||
  if (has_memtable && OB_NOT_NULL(active_memtable = get_active_memtable_())) {
 | 
			
		||||
    memtable_freeze_clock = active_memtable->get_freeze_clock();
 | 
			
		||||
  }
 | 
			
		||||
@ -185,6 +186,11 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
 | 
			
		||||
               K(get_memtable_count_()),
 | 
			
		||||
               KPC(first_frozen_memtable.get_table()));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))) {
 | 
			
		||||
    LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_), K(new_clog_checkpoint_scn));
 | 
			
		||||
  } else if (for_replay && clog_checkpoint_scn != new_clog_checkpoint_scn) {
 | 
			
		||||
    ret = OB_EAGAIN;
 | 
			
		||||
    LOG_INFO("clog_checkpoint_scn changed, need retry to replay", K(ls_id), K(tablet_id_), K(clog_checkpoint_scn), K(new_clog_checkpoint_scn));
 | 
			
		||||
  } else {
 | 
			
		||||
    ObITable::TableKey table_key;
 | 
			
		||||
    table_key.table_type_ = ObITable::DATA_MEMTABLE;
 | 
			
		||||
@ -208,8 +214,6 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
 | 
			
		||||
      LOG_WARN("failed to init memtable", K(ret), K(ls_id), K(table_key), KP(freezer_), KP(this),
 | 
			
		||||
               K(schema_version), K(logstream_freeze_clock));
 | 
			
		||||
    } else {
 | 
			
		||||
      SCN new_clog_checkpoint_scn;
 | 
			
		||||
      SCN new_snapshot_version;
 | 
			
		||||
      memtable::ObMemtable *last_frozen_memtable = get_last_frozen_memtable_();
 | 
			
		||||
      if (OB_NOT_NULL(last_frozen_memtable)) {
 | 
			
		||||
        // keep the check order: is_frozen, write_ref_cnt, then unsubmitted_cnt and unsynced_cnt
 | 
			
		||||
@ -237,11 +241,6 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
 | 
			
		||||
      // there is no frozen memtable and new sstable will not be generated,
 | 
			
		||||
      // meaning that clog_checkpoint_scn will not be updated now,
 | 
			
		||||
      // so get newest clog_checkpoint_scn to set left boundary
 | 
			
		||||
      } else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))){
 | 
			
		||||
        LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_),
 | 
			
		||||
                 K(new_clog_checkpoint_scn));
 | 
			
		||||
      } else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){
 | 
			
		||||
        LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version));
 | 
			
		||||
      } else {
 | 
			
		||||
        memtable->resolve_left_boundary(new_clog_checkpoint_scn);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user