BUGFIX: optimize the use of ls lock at freeze and offline
This commit is contained in:
		| @ -1070,11 +1070,11 @@ int ObService::tenant_freeze_(const uint64_t tenant_id) | ||||
|     LOG_INFO("no need to freeze virtual tenant", K(ret), K(tenant_id)); | ||||
|   } else { | ||||
|     MTL_SWITCH(tenant_id) { | ||||
|       checkpoint::ObCheckPointService* checkpoint_serv = nullptr; | ||||
|       if (OB_ISNULL(checkpoint_serv = MTL(checkpoint::ObCheckPointService*))) { | ||||
|       storage::ObTenantFreezer* freezer = nullptr; | ||||
|       if (OB_ISNULL(freezer = MTL(storage::ObTenantFreezer*))) { | ||||
|         ret = OB_ERR_UNEXPECTED; | ||||
|         LOG_WARN("ObCheckPointService shouldn't be null", K(ret), K(tenant_id)); | ||||
|       } else if (OB_FAIL(checkpoint_serv->do_minor_freeze())) { | ||||
|         LOG_WARN("ObTenantFreezer shouldn't be null", K(ret), K(tenant_id)); | ||||
|       } else if (OB_FAIL(freezer->tenant_freeze())) { | ||||
|         if (OB_ENTRY_EXIST == ret) { | ||||
|           ret = OB_SUCCESS; | ||||
|         } else { | ||||
|  | ||||
| @ -1344,7 +1344,7 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id, | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::logstream_freeze(const bool is_sync) | ||||
| int ObLS::logstream_freeze(const bool is_sync, const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   ObFuture<int> result; | ||||
| @ -1352,8 +1352,11 @@ int ObLS::logstream_freeze(const bool is_sync) | ||||
|   { | ||||
|     int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; | ||||
|     int64_t write_lock = 0; | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); | ||||
|     if (IS_NOT_INIT) { | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts); | ||||
|     if (!lock_myself.locked()) { | ||||
|       ret = OB_TIMEOUT; | ||||
|       LOG_WARN("lock ls failed, please retry later", K(ret), K(ls_meta_)); | ||||
|     } else if (IS_NOT_INIT) { | ||||
|       ret = OB_NOT_INIT; | ||||
|       LOG_WARN("ls is not inited", K(ret)); | ||||
|     } else if (OB_UNLIKELY(is_stopped_)) { | ||||
| @ -1376,7 +1379,9 @@ int ObLS::logstream_freeze(const bool is_sync) | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::tablet_freeze(const ObTabletID &tablet_id, const bool is_sync) | ||||
| int ObLS::tablet_freeze(const ObTabletID &tablet_id, | ||||
|                         const bool is_sync, | ||||
|                         const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   ObFuture<int> result; | ||||
| @ -1384,8 +1389,11 @@ int ObLS::tablet_freeze(const ObTabletID &tablet_id, const bool is_sync) | ||||
|   { | ||||
|     int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; | ||||
|     int64_t write_lock = 0; | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); | ||||
|     if (IS_NOT_INIT) { | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts); | ||||
|     if (!lock_myself.locked()) { | ||||
|       ret = OB_TIMEOUT; | ||||
|       LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_)); | ||||
|     } else if (IS_NOT_INIT) { | ||||
|       ret = OB_NOT_INIT; | ||||
|       LOG_WARN("ls is not inited", K(ret)); | ||||
|     } else if (OB_UNLIKELY(is_stopped_)) { | ||||
| @ -1408,13 +1416,16 @@ int ObLS::tablet_freeze(const ObTabletID &tablet_id, const bool is_sync) | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::force_tablet_freeze(const ObTabletID &tablet_id) | ||||
| int ObLS::force_tablet_freeze(const ObTabletID &tablet_id, const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; | ||||
|   int64_t write_lock = 0; | ||||
|   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); | ||||
|   if (IS_NOT_INIT) { | ||||
|   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts); | ||||
|   if (!lock_myself.locked()) { | ||||
|     ret = OB_TIMEOUT; | ||||
|     LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_)); | ||||
|   } else if (IS_NOT_INIT) { | ||||
|     ret = OB_NOT_INIT; | ||||
|     LOG_WARN("ls is not inited", K(ret)); | ||||
|   } else if (OB_UNLIKELY(is_stopped_)) { | ||||
| @ -1431,7 +1442,9 @@ int ObLS::force_tablet_freeze(const ObTabletID &tablet_id) | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, const bool is_sync) | ||||
| int ObLS::batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, | ||||
|                               const bool is_sync, | ||||
|                               const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   ObFuture<int> result; | ||||
| @ -1439,8 +1452,11 @@ int ObLS::batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, const bool | ||||
|   { | ||||
|     int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; | ||||
|     int64_t write_lock = 0; | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); | ||||
|     if (IS_NOT_INIT) { | ||||
|     ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts); | ||||
|     if (!lock_myself.locked()) { | ||||
|       ret = OB_TIMEOUT; | ||||
|       LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_)); | ||||
|     } else if (IS_NOT_INIT) { | ||||
|       ret = OB_NOT_INIT; | ||||
|       LOG_WARN("ls is not inited", K(ret)); | ||||
|     } else if (OB_UNLIKELY(is_stopped_)) { | ||||
| @ -1463,12 +1479,19 @@ int ObLS::batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, const bool | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::advance_checkpoint_by_flush(SCN recycle_scn) | ||||
| int ObLS::advance_checkpoint_by_flush(SCN recycle_scn, const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int64_t read_lock = LSLOCKALL; | ||||
|   int64_t write_lock = 0; | ||||
|   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock); | ||||
|   return checkpoint_executor_.advance_checkpoint_by_flush(recycle_scn); | ||||
|   ObLSLockGuard lock_myself(this, lock_, read_lock, write_lock, abs_timeout_ts); | ||||
|   if (!lock_myself.locked()) { | ||||
|     ret = OB_TIMEOUT; | ||||
|     LOG_WARN("lock failed, please retry later", K(ret), K(ls_meta_)); | ||||
|   } else { | ||||
|     ret = checkpoint_executor_.advance_checkpoint_by_flush(recycle_scn); | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObLS::get_ls_meta_package_and_tablet_ids(const bool check_archive, | ||||
|  | ||||
| @ -636,27 +636,28 @@ public: | ||||
|   DELEGATE_WITH_RET(replay_handler_, replay, int); | ||||
|  | ||||
|   // ObFreezer interface: | ||||
|   // @param [in] is_sync | ||||
|   // int logstream_freeze(const bool is_sync = false); | ||||
|   // DELEGATE_WITH_RET(ls_freezer_, logstream_freeze, int); | ||||
|   int logstream_freeze(const bool is_sync = false); | ||||
|   // freeze the data of ls: | ||||
|   // @param [in] is_sync, only used for wait_freeze_finished() | ||||
|   // @param [in] abs_timeout_ts, wait until timeout if lock conflict | ||||
|   int logstream_freeze(const bool is_sync = false, | ||||
|                        const int64_t abs_timeout_ts = INT64_MAX); | ||||
|   // tablet freeze | ||||
|   // @param [in] tablet_id | ||||
|   // @param [in] is_sync | ||||
|   // int tablet_freeze(const ObTabletID &tablet_id, const bool is_sync = false); | ||||
|   // DELEGATE_WITH_RET(ls_freezer_, tablet_freeze, int); | ||||
|   int tablet_freeze(const ObTabletID &tablet_id, const bool is_sync = false); | ||||
|   // @param [in] is_sync, only used for wait_freeze_finished() | ||||
|   // @param [in] abs_timeout_ts, wait until timeout if lock conflict | ||||
|   int tablet_freeze(const ObTabletID &tablet_id, | ||||
|                     const bool is_sync = false, | ||||
|                     const int64_t abs_timeout_ts = INT64_MAX); | ||||
|   // force freeze tablet | ||||
|   // @param [in] tablet_id | ||||
|   // int force_tablet_freeze(const ObTabletID &tablet_id); | ||||
|   // DELEGATE_WITH_RET(ls_freezer_, force_tablet_freeze, int); | ||||
|   int force_tablet_freeze(const ObTabletID &tablet_id); | ||||
|   // @param [in] abs_timeout_ts, wait until timeout if lock conflict | ||||
|   int force_tablet_freeze(const ObTabletID &tablet_id, | ||||
|                           const int64_t abs_timeout_ts = INT64_MAX); | ||||
|   // batch tablet freeze | ||||
|   // @param [in] tablet_ids | ||||
|   // @param [in] is_sync | ||||
|   // int batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, const bool is_sync = false); | ||||
|   // DELEGATE_WITH_RET(ls_freezer_, batch_tablet_freeze, int); | ||||
|   int batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, const bool is_sync = false); | ||||
|   // @param [in] abs_timeout_ts, wait until timeout if lock conflict | ||||
|   int batch_tablet_freeze(const ObIArray<ObTabletID> &tablet_ids, | ||||
|                           const bool is_sync = false, | ||||
|                           const int64_t abs_timeout_ts = INT64_MAX); | ||||
|  | ||||
|   // ObTxTable interface | ||||
|   DELEGATE_WITH_RET(tx_table_, get_tx_table_guard, int); | ||||
| @ -666,7 +667,10 @@ public: | ||||
|  | ||||
|   // ObCheckpointExecutor interface: | ||||
|   DELEGATE_WITH_RET(checkpoint_executor_, get_checkpoint_info, int); | ||||
|   int advance_checkpoint_by_flush(share::SCN recycle_scn); | ||||
|   // advance the checkpoint of this ls | ||||
|   // @param [in] abs_timeout_ts, wait until timeout if lock conflict | ||||
|   int advance_checkpoint_by_flush(share::SCN recycle_scn, | ||||
|                                   const int64_t abs_timeout_ts = INT64_MAX); | ||||
|  | ||||
|   // ObDataCheckpoint interface: | ||||
|   DELEGATE_WITH_RET(data_checkpoint_, get_freezecheckpoint_info, int); | ||||
|  | ||||
| @ -35,7 +35,7 @@ ObLSLock::~ObLSLock() | ||||
| { | ||||
| } | ||||
|  | ||||
| int64_t ObLSLock::lock(const ObLS *ls, int64_t hold, int64_t change) | ||||
| int64_t ObLSLock::lock(const ObLS *ls, int64_t hold, int64_t change, const int64_t abs_timeout_us) | ||||
| { | ||||
|   int ret = OB_SUCCESS; // tmp_ret, rewrite every time | ||||
|   int64_t pos = 0; | ||||
| @ -46,14 +46,16 @@ int64_t ObLSLock::lock(const ObLS *ls, int64_t hold, int64_t change) | ||||
|   ObTimeGuard tg("ObLSLock::lock", LOCK_CONFLICT_WARN_TIME); | ||||
|   while (hold | change) { | ||||
|     if (change & 1) { | ||||
|       if (OB_FAIL(locks_[pos].wrlock(ObLatchIds::LS_LOCK))) { | ||||
|         LOG_ERROR("wrlock error", K(ret), K(pos)); | ||||
|       if (OB_FAIL(locks_[pos].wrlock(ObLatchIds::LS_LOCK, abs_timeout_us))) { | ||||
|         // maybe timeout, it is expected error code. | ||||
|         LOG_WARN("wrlock failed", KR(ret), K(pos)); | ||||
|       } else { | ||||
|         res |= 1L << pos; | ||||
|       } | ||||
|     } else if (hold & 1) { | ||||
|       if (OB_FAIL(locks_[pos].rdlock(ObLatchIds::LS_LOCK))) { | ||||
|         LOG_ERROR("rdlock error", K(ret), K(pos)); | ||||
|       if (OB_FAIL(locks_[pos].rdlock(ObLatchIds::LS_LOCK, abs_timeout_us))) { | ||||
|         // maybe timeout, it is expected error code. | ||||
|         LOG_WARN("rdlock failed", KR(ret), K(pos)); | ||||
|       } else { | ||||
|         res |= 1L << pos; | ||||
|       } | ||||
| @ -153,6 +155,29 @@ ObLSLockGuard::ObLSLockGuard(ObLS *ls, | ||||
|   } | ||||
| } | ||||
|  | ||||
| ObLSLockGuard::ObLSLockGuard(ObLS *ls, | ||||
|                              ObLSLock &lock, | ||||
|                              int64_t hold, | ||||
|                              int64_t change, | ||||
|                              const int64_t abs_timeout_us) | ||||
|   : lock_(lock), | ||||
|     mark_(0), | ||||
|     start_ts_(INT64_MAX), | ||||
|     ls_(ls) | ||||
| { | ||||
|   hold &= LSLOCKMASK; | ||||
|   change &= LSLOCKMASK; | ||||
|   // upgrade hold to change | ||||
|   hold ^= hold & change; | ||||
|  | ||||
|   if ((hold | change) != (mark_ = lock_.lock(ls, hold, change, abs_timeout_us))) { | ||||
|     // reset the the one we have locked. | ||||
|     lock_.unlock(mark_); | ||||
|     mark_ = 0; | ||||
|   } | ||||
|   start_ts_ = ObTimeUtility::current_time(); | ||||
| } | ||||
|  | ||||
| ObLSLockGuard::ObLSLockGuard(ObLS *ls, const bool rdlock) | ||||
|   : lock_(ls->lock_), | ||||
|     mark_(0), | ||||
|  | ||||
| @ -66,7 +66,7 @@ public: | ||||
|   ObLSLock(const ObLSLock&) = delete; | ||||
|   ObLSLock& operator=(const ObLSLock&) = delete; | ||||
| private: | ||||
|   int64_t lock(const ObLS *ls, int64_t hold, int64_t change); | ||||
|   int64_t lock(const ObLS *ls, int64_t hold, int64_t change, const int64_t abs_timeout_us = INT64_MAX); | ||||
|   int64_t try_lock(const ObLS *ls, int64_t hold, int64_t change); | ||||
|   void unlock(int64_t target); | ||||
|  | ||||
| @ -81,6 +81,11 @@ public: | ||||
|                 int64_t hold, | ||||
|                 int64_t change, | ||||
|                 const bool trylock = false); | ||||
|   ObLSLockGuard(ObLS *ls, | ||||
|                 ObLSLock &lock, | ||||
|                 int64_t hold, | ||||
|                 int64_t change, | ||||
|                 const int64_t abs_timeout_us); | ||||
|   // lock all by default. | ||||
|   // WARNING: make sure ls is not null. | ||||
|   ObLSLockGuard(ObLS *ls, const bool rdlock = false); | ||||
|  | ||||
| @ -608,13 +608,10 @@ int ObLSTxService::offline() | ||||
|     TRANS_LOG(WARN, "block tx failed", K_(ls_id)); | ||||
|   } else if (OB_FAIL(mgr_->kill_all_tx(graceful, unused_is_all_tx_clean_up))) { | ||||
|     TRANS_LOG(WARN, "kill_all_tx failed", K_(ls_id)); | ||||
|   } else { | ||||
|     while (mgr_->get_tx_ctx_count() > 0) { | ||||
|       ob_usleep(SLEEP_US); // retry ater 20 ms | ||||
|       if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) { | ||||
|         TRANS_LOG(WARN, "transaction not empty, try again", KP(mgr_), K_(ls_id)); | ||||
|         mgr_->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose); | ||||
|       } | ||||
|   } else if (mgr_->get_tx_ctx_count() > 0) { | ||||
|     ret = OB_EAGAIN; | ||||
|     if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL)) { | ||||
|       TRANS_LOG(WARN, "transaction not empty, try again", KP(mgr_), K_(ls_id), K(mgr_->get_tx_ctx_count())); | ||||
|     } | ||||
|   } | ||||
|   return ret; | ||||
|  | ||||
| @ -362,35 +362,6 @@ void ObCheckPointService::ObCheckClogDiskUsageTask::runTimerTask() | ||||
|   } | ||||
| } | ||||
|  | ||||
| int ObCheckPointService::do_minor_freeze() | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int tmp_ret = OB_SUCCESS; | ||||
|   ObLSIterator *iter = NULL; | ||||
|   common::ObSharedGuard<ObLSIterator> guard; | ||||
|   ObLSService *ls_svr = MTL(ObLSService*); | ||||
|   if (OB_ISNULL(ls_svr)) { | ||||
|     STORAGE_LOG(WARN, "mtl ObLSService should not be null", K(ret)); | ||||
|   } else if (OB_FAIL(ls_svr->get_ls_iter(guard, ObLSGetMod::TXSTORAGE_MOD))) { | ||||
|     STORAGE_LOG(WARN, "get log stream iter failed", K(ret)); | ||||
|   } else if (OB_ISNULL(iter = guard.get_ptr())) { | ||||
|     STORAGE_LOG(WARN, "iter is NULL", K(ret)); | ||||
|   } else { | ||||
|     ObLS *ls = nullptr; | ||||
|     int ls_cnt = 0; | ||||
|     for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) { | ||||
|       if (OB_SUCCESS != (tmp_ret = (ls->advance_checkpoint_by_flush(SCN::max_scn())))) { | ||||
|         STORAGE_LOG(WARN, "advance_checkpoint_by_flush failed", K(tmp_ret), K(ls->get_ls_id())); | ||||
|       } | ||||
|     } | ||||
|     if (ret == OB_ITER_END) { | ||||
|       ret = OB_SUCCESS; | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| } // checkpoint | ||||
| } // storage | ||||
| } // oceanbase | ||||
|  | ||||
| @ -52,9 +52,6 @@ public: | ||||
|   int add_ls_freeze_task( | ||||
|       ObDataCheckpoint *data_checkpoint, | ||||
|       share::SCN rec_scn); | ||||
|  | ||||
|   int do_minor_freeze(); | ||||
|  | ||||
| private: | ||||
|   bool is_inited_; | ||||
|  | ||||
|  | ||||
| @ -19,6 +19,7 @@ | ||||
| #include "rootserver/freeze/ob_major_freeze_helper.h" | ||||
| #include "share/allocator/ob_memstore_allocator_mgr.h" | ||||
| #include "share/config/ob_server_config.h" | ||||
| #include "share/ob_share_util.h" | ||||
| #include "share/rc/ob_tenant_module_init_ctx.h" | ||||
| #include "storage/ls/ob_ls.h" | ||||
| #include "storage/tx_storage/ob_ls_handle.h" | ||||
| @ -31,6 +32,7 @@ using namespace share; | ||||
| namespace storage | ||||
| { | ||||
|  | ||||
|  | ||||
| typedef ObMemstoreAllocatorMgr::TAllocator ObTenantMemstoreAllocator; | ||||
|  | ||||
| ObTenantFreezer::ObTenantFreezer() | ||||
| @ -191,21 +193,115 @@ bool ObTenantFreezer::exist_ls_freezing() | ||||
|   return exist_ls_freezing_; | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::ls_freeze_(ObLS *ls) | ||||
| // force freeze means we must do another freeze rather than use the freeze | ||||
| // result of others | ||||
| int ObTenantFreezer::ls_freeze_(ObLS *ls, | ||||
|                                 const bool is_sync, | ||||
|                                 const bool force_freeze, | ||||
|                                 const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   const int64_t SLEEP_TS = 1000 * 1000; // 1s | ||||
|   int64_t current_ts = 0; | ||||
|   int64_t retry_times = 0; | ||||
|   // wait if there is a freeze is doing | ||||
|   bool is_timeout = false; | ||||
|   bool need_retry = false; | ||||
|   // wait and retry if there is a freeze is doing | ||||
|   // or if we can not get the ls lock. | ||||
|   do { | ||||
|     need_retry = false; | ||||
|     retry_times++; | ||||
|     if (OB_FAIL(ls->logstream_freeze(true/*is_sync*/)) && OB_ENTRY_EXIST == ret) { | ||||
|     if (OB_SUCC(ls->logstream_freeze(is_sync, abs_timeout_ts))) { | ||||
|     } else { | ||||
|       current_ts = ObTimeUtil::current_time(); | ||||
|       is_timeout = (current_ts >= abs_timeout_ts); | ||||
|       // retry condition 1 | ||||
|       need_retry = (!is_timeout); | ||||
|       // retry condition 2, 3 | ||||
|       need_retry = need_retry && ((OB_EAGAIN == ret) || (force_freeze && OB_ENTRY_EXIST == ret)); | ||||
|     } | ||||
|     if (need_retry) { | ||||
|       ob_usleep(SLEEP_TS); | ||||
|     } | ||||
|     if (retry_times % 10 == 0) { | ||||
|       LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "wait ls freeze finished cost too much time", K(retry_times)); | ||||
|     } | ||||
|   } while (ret == OB_ENTRY_EXIST); | ||||
|   } while (need_retry); | ||||
|   if (OB_NOT_RUNNING == ret) { | ||||
|     ret = OB_SUCCESS; | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::ls_freeze_all_unit_(ObLS *ls, const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   const int64_t SLEEP_TS = 1000 * 1000; // 1s | ||||
|   int64_t current_ts = 0; | ||||
|   int64_t retry_times = 0; | ||||
|   bool is_timeout = false; | ||||
|   bool need_retry = false; | ||||
|   // wait and retry if we can not get the ls lock. | ||||
|   do { | ||||
|     need_retry = false; | ||||
|     retry_times++; | ||||
|     if (OB_SUCC(ls->advance_checkpoint_by_flush(SCN::max_scn(), abs_timeout_ts))) { | ||||
|     } else { | ||||
|       current_ts = ObTimeUtil::current_time(); | ||||
|       is_timeout = (current_ts >= abs_timeout_ts); | ||||
|       // retry condition 1 | ||||
|       need_retry = (!is_timeout); | ||||
|       // retry condition 2 | ||||
|       need_retry = need_retry && (OB_EAGAIN == ret); | ||||
|     } | ||||
|     if (need_retry) { | ||||
|       ob_usleep(SLEEP_TS); | ||||
|     } | ||||
|     if (retry_times % 10 == 0) { | ||||
|       LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "wait ls freeze finished cost too much time", K(retry_times)); | ||||
|     } | ||||
|   } while (need_retry); | ||||
|   if (OB_NOT_RUNNING == ret) { | ||||
|     ret = OB_SUCCESS; | ||||
|   } | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::tablet_freeze_(ObLS *ls, | ||||
|                                     const common::ObTabletID &tablet_id, | ||||
|                                     const bool force_tablet_freeze, | ||||
|                                     const bool is_sync, | ||||
|                                     const int64_t abs_timeout_ts) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   const int64_t SLEEP_TS = 1000 * 1000; // 1s | ||||
|   int64_t current_ts = 0; | ||||
|   int64_t retry_times = 0; | ||||
|   bool is_timeout = false; | ||||
|   bool need_retry = false; | ||||
|   // wait and retry if there is a freeze is doing | ||||
|   // or if we can not get the ls lock. | ||||
|   do { | ||||
|     need_retry = false; | ||||
|     retry_times++; | ||||
|     if (OB_SUCC(force_tablet_freeze | ||||
|                 ? ls->force_tablet_freeze(tablet_id, abs_timeout_ts) | ||||
|                 : ls->tablet_freeze(tablet_id, is_sync, abs_timeout_ts))) { | ||||
|     } else { | ||||
|       current_ts = ObTimeUtil::current_time(); | ||||
|       is_timeout = (current_ts >= abs_timeout_ts); | ||||
|       // retry condition 1 | ||||
|       need_retry = (!is_timeout); | ||||
|       // retry condition 2, 3 | ||||
|       need_retry = need_retry && (OB_EAGAIN == ret); | ||||
|     } | ||||
|     if (need_retry) { | ||||
|       ob_usleep(SLEEP_TS); | ||||
|     } | ||||
|     if (retry_times % 10 == 0) { | ||||
|       LOG_WARN_RET(OB_ERR_TOO_MUCH_TIME, "wait ls freeze finished cost too much time", K(retry_times)); | ||||
|     } | ||||
|   } while (need_retry); | ||||
|   if (OB_NOT_RUNNING == ret) { | ||||
|     ret = OB_SUCCESS; | ||||
|   } | ||||
| @ -252,22 +348,64 @@ int ObTenantFreezer::tenant_freeze_() | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::tenant_freeze() | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   int tmp_ret = OB_SUCCESS; | ||||
|   ObLSService *ls_svr = MTL(ObLSService *); | ||||
|   common::ObSharedGuard<ObLSIterator> guard; | ||||
|   ObLSIterator *iter = NULL; | ||||
|   ObLS *ls = nullptr; | ||||
|   int ls_cnt = 0; | ||||
|   int64_t abs_timeout_ts = INT64_MAX; | ||||
|  | ||||
|   if (IS_NOT_INIT) { | ||||
|     ret = OB_NOT_INIT; | ||||
|     LOG_WARN("[TenantFreezer] tenant freezer not inited", KR(ret)); | ||||
|   } else if (OB_FAIL(ObShareUtil::get_abs_timeout(MAX_FREEZE_TIMEOUT_US /* default timeout */, | ||||
|                                                   abs_timeout_ts))) { | ||||
|     LOG_WARN("get timeout ts failed", KR(ret)); | ||||
|   } else if (OB_FAIL(ls_svr->get_ls_iter(guard, ObLSGetMod::TXSTORAGE_MOD))) { | ||||
|     LOG_WARN("get log stream iter failed", K(ret)); | ||||
|   } else if (OB_ISNULL(iter = guard.get_ptr())) { | ||||
|     LOG_WARN("iter is NULL", K(ret)); | ||||
|   } else { | ||||
|     for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) { | ||||
|       if (OB_TMP_FAIL(ls_freeze_all_unit_(ls, abs_timeout_ts))) { | ||||
|         LOG_WARN("ls freeze all unit failed", K(tmp_ret), K(ls->get_ls_id())); | ||||
|       } | ||||
|     } | ||||
|     if (ret == OB_ITER_END) { | ||||
|       ret = OB_SUCCESS; | ||||
|     } | ||||
|   } | ||||
|   LOG_INFO("tenant_freeze finished", KR(ret), K(abs_timeout_ts)); | ||||
|  | ||||
|   return ret; | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::ls_freeze(const share::ObLSID &ls_id) | ||||
| { | ||||
|   int ret = OB_SUCCESS; | ||||
|   ObLSService *ls_srv = MTL(ObLSService *); | ||||
|   ObLSHandle handle; | ||||
|   ObLS *ls = nullptr; | ||||
|   const bool is_sync = false; | ||||
|   const bool force_freeze = false; | ||||
|   int64_t abs_timeout_ts = INT64_MAX; | ||||
|  | ||||
|   if (IS_NOT_INIT) { | ||||
|     ret = OB_NOT_INIT; | ||||
|     LOG_WARN("[TenantFreezer] tenant freezer not inited", KR(ret)); | ||||
|   } else if (OB_FAIL(ObShareUtil::get_abs_timeout(MAX_FREEZE_TIMEOUT_US /* default timeout */, | ||||
|                                                   abs_timeout_ts))) { | ||||
|     LOG_WARN("get timeout ts failed", KR(ret)); | ||||
|   } else if (OB_FAIL(ls_srv->get_ls(ls_id, handle, ObLSGetMod::TXSTORAGE_MOD))) { | ||||
|     LOG_WARN("[TenantFreezer] fail to get ls", K(ret), K(ls_id)); | ||||
|   } else if (OB_ISNULL(ls = handle.get_ls())) { | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id)); | ||||
|   } else if (OB_FAIL(ls->logstream_freeze())) { | ||||
|   } else if (OB_FAIL(ls_freeze_(ls, is_sync, force_freeze, abs_timeout_ts))) { | ||||
|     LOG_WARN("[TenantFreezer] logstream freeze failed", KR(ret), K(ls_id)); | ||||
|   } | ||||
|  | ||||
| @ -278,7 +416,10 @@ int ObTenantFreezer::tablet_freeze(const common::ObTabletID &tablet_id, | ||||
|                                    const bool is_force_freeze, | ||||
|                                    const bool is_sync) | ||||
| { | ||||
|   return tablet_freeze(ObLSID(ObLSID::INVALID_LS_ID), tablet_id, is_force_freeze, is_sync); | ||||
|   return tablet_freeze(ObLSID(ObLSID::INVALID_LS_ID), | ||||
|                        tablet_id, | ||||
|                        is_force_freeze, | ||||
|                        is_sync); | ||||
| } | ||||
|  | ||||
| int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id, | ||||
| @ -291,11 +432,15 @@ int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id, | ||||
|   ObLSService *ls_srv = MTL(ObLSService *); | ||||
|   ObLSHandle handle; | ||||
|   ObLS *ls = nullptr; | ||||
|   int64_t abs_timeout_ts = INT64_MAX; | ||||
|   FLOG_INFO("[TenantFreezer] tablet_freeze start", KR(ret), K(tablet_id)); | ||||
|  | ||||
|   if (IS_NOT_INIT) { | ||||
|     ret = OB_NOT_INIT; | ||||
|     LOG_WARN("[TenantFreezer] tenant freezer not inited", KR(ret)); | ||||
|   } else if (OB_FAIL(ObShareUtil::get_abs_timeout(MAX_FREEZE_TIMEOUT_US/* default timeout */, | ||||
|                                                   abs_timeout_ts))) { | ||||
|     LOG_WARN("get timeout ts failed", KR(ret)); | ||||
|   } else if (!ls_id.is_valid()) { | ||||
|     // if ls_id is invalid, get ls id by tablet id | ||||
|     if (OB_FAIL(GCTX.location_service_->get(tenant_info_.tenant_id_, tablet_id, INT64_MAX, is_cache_hit, ls_id))) { | ||||
| @ -309,9 +454,11 @@ int ObTenantFreezer::tablet_freeze(share::ObLSID ls_id, | ||||
|   } else if (OB_ISNULL(ls = handle.get_ls())) { | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     LOG_WARN("[TenantFreezer] ls is null", KR(ret), K(ls_id)); | ||||
|   } else if (OB_FAIL(is_force_freeze | ||||
|                      ? ls->force_tablet_freeze(tablet_id) | ||||
|                      : ls->tablet_freeze(tablet_id, is_sync))) { | ||||
|   } else if (OB_FAIL(tablet_freeze_(ls, | ||||
|                                     tablet_id, | ||||
|                                     is_force_freeze, | ||||
|                                     is_sync, | ||||
|                                     abs_timeout_ts))) { | ||||
|     LOG_WARN("[TenantFreezer] fail to freeze tablet", KR(ret), K(ls_id), K(tablet_id)); | ||||
|   } | ||||
|  | ||||
| @ -337,10 +484,10 @@ int ObTenantFreezer::get_ls_tx_data_mem_used_(ObLS *ls, int64_t &ls_tx_data_mem_ | ||||
|   } else if (OB_FAIL(memtable_mgr->get_active_memtable(memtable_handle))) { | ||||
|     LOG_WARN("get active memtable from tx data memtable mgr failed.", KR(ret)); | ||||
|   } else if (OB_FAIL(memtable_handle.get_tx_data_memtable(memtable))) { | ||||
|     STORAGE_LOG(ERROR, "get tx data memtable failed.", KR(ret), K(tenant_info_.tenant_id_)); | ||||
|     LOG_ERROR("get tx data memtable failed.", KR(ret), K(tenant_info_.tenant_id_)); | ||||
|   } else if (OB_ISNULL(memtable)) { | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     STORAGE_LOG(ERROR, "unexpected nullptr of tx data memtable", KR(ret), K(tenant_info_.tenant_id_)); | ||||
|     LOG_ERROR("unexpected nullptr of tx data memtable", KR(ret), K(tenant_info_.tenant_id_)); | ||||
|   } else { | ||||
|     ls_tx_data_mem_used = memtable->get_occupied_size(); | ||||
|   } | ||||
| @ -921,7 +1068,7 @@ int ObTenantFreezer::check_tenant_out_of_memstore_limit(bool &is_out_of_mem) | ||||
|       } else if (OB_FAIL(get_tenant_mem_usage_(ctx))) { | ||||
|         LOG_WARN("[TenantFreezer] fail to get mem usage", KR(ret), K(tenant_info_.tenant_id_)); | ||||
|       } else { | ||||
|         is_out_of_mem = (ctx.total_memstore_hold_ > ctx.mem_memstore_limit_ + REPLAY_RESERVE_MEMSTORE_BYTES); | ||||
|         is_out_of_mem = (ctx.total_memstore_hold_ > ctx.mem_memstore_limit_ - REPLAY_RESERVE_MEMSTORE_BYTES); | ||||
|       } | ||||
|       last_check_timestamp = current_time; | ||||
|     } | ||||
| @ -1213,6 +1360,8 @@ int ObTenantFreezer::do_major_if_need_(const bool need_freeze) | ||||
|   if (!tenant_info_.is_loaded_) { | ||||
|     // do nothing | ||||
|     // update frozen scn | ||||
|   } else if (!need_freeze) { | ||||
|     // no need major | ||||
|   } else if (OB_FAIL(get_global_frozen_scn_(frozen_scn))) { | ||||
|     LOG_WARN("fail to get global frozen version", K(ret)); | ||||
|   } else if (0 != frozen_scn && OB_FAIL(tenant_info_.update_frozen_scn(frozen_scn))) { | ||||
|  | ||||
| @ -34,6 +34,7 @@ namespace storage | ||||
| class ObTenantFreezer; | ||||
| class ObTenantTxDataFreezeGuard; | ||||
|  | ||||
| // this is used for tenant freeze, all the freeze task should call the function of this unit. | ||||
| class ObTenantFreezer | ||||
| { | ||||
| friend ObTenantTxDataFreezeGuard; | ||||
| @ -44,6 +45,7 @@ friend class ObFreezer; | ||||
|   const static int FREEZE_THREAD_NUM= 5; | ||||
|   const static int64_t FREEZE_TRIGGER_INTERVAL = 2_s; | ||||
|   const static int64_t UPDATE_INTERVAL = 100_ms; | ||||
|   const static int64_t MAX_FREEZE_TIMEOUT_US = 1800 * 1000 * 1000; // 30 min | ||||
|   // replay use 1G/s | ||||
|   const static int64_t REPLAY_RESERVE_MEMSTORE_BYTES = 100 * 1024 * 1024; // 100 MB | ||||
|   const static int64_t MEMSTORE_USED_CACHE_REFRESH_INTERVAL = 100_ms; | ||||
| @ -57,10 +59,12 @@ public: | ||||
|   int stop(); | ||||
|   void wait(); | ||||
|  | ||||
|   // freeze all the ls of this tenant. | ||||
|   // return the first failed code. | ||||
|   // freeze all the checkpoint unit of this tenant. | ||||
|   int tenant_freeze(); | ||||
|  | ||||
|   // freeze a ls, if the ls is freezing, do nothing and return OB_ENTRY_EXIST. | ||||
|   // if there is some process hold the ls lock or a OB_EAGAIN occur, we will retry | ||||
|   // until timeout. | ||||
|   int ls_freeze(const share::ObLSID &ls_id); | ||||
|   // freeze a tablet | ||||
|   int tablet_freeze(const common::ObTabletID &tablet_id, | ||||
| @ -136,7 +140,17 @@ public: | ||||
|   ObServerConfig *get_config() { return config_; } | ||||
|   bool exist_ls_freezing(); | ||||
| private: | ||||
|   static int ls_freeze_(ObLS *ls); | ||||
|   static int ls_freeze_(ObLS *ls, | ||||
|                         const bool is_sync = true, | ||||
|                         const bool force_freeze = true, | ||||
|                         const int64_t abs_timeout_ts = INT64_MAX); | ||||
|   static int ls_freeze_all_unit_(ObLS *ls, | ||||
|                                  const int64_t abs_timeout_ts = INT64_MAX); | ||||
|   static int tablet_freeze_(ObLS *ls, | ||||
|                             const common::ObTabletID &tablet_id, | ||||
|                             const bool force_tablet_freeze, | ||||
|                             const bool is_sync, | ||||
|                             const int64_t abs_timeout_ts); | ||||
|   // freeze all the ls of this tenant. | ||||
|   // return the first failed code. | ||||
|   static int tenant_freeze_(); | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 obdev
					obdev