wait log replayed when server restarting
This commit is contained in:
		@ -894,25 +894,6 @@ int ObServer::start()
 | 
			
		||||
    }
 | 
			
		||||
    FLOG_INFO("check if multi tenant synced", KR(ret), K(stop_), K(synced));
 | 
			
		||||
 | 
			
		||||
    /*
 | 
			
		||||
     * FIXME: skip partition service op first
 | 
			
		||||
    if (OB_SUCC(ret)) {
 | 
			
		||||
      do {
 | 
			
		||||
        if (stop_) {
 | 
			
		||||
          ret = OB_SERVER_IS_STOPPING;
 | 
			
		||||
        // } else if (OB_FAIL(ObPartitionService::get_instance().wait_start_finish())) {
 | 
			
		||||
          if (OB_EAGAIN == ret) {
 | 
			
		||||
            ob_usleep(100 * 1000);
 | 
			
		||||
          } else {
 | 
			
		||||
            LOG_ERROR("wait scan inner table failed", KR(ret));
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          LOG_INFO("[NOTICE] wait scan inner table success");
 | 
			
		||||
        }
 | 
			
		||||
      } while (OB_EAGAIN == ret);
 | 
			
		||||
    }
 | 
			
		||||
    */
 | 
			
		||||
 | 
			
		||||
    bool schema_ready = false;
 | 
			
		||||
    while (OB_SUCC(ret) && !stop_ && !schema_ready) {
 | 
			
		||||
      schema_ready = schema_service_.is_sys_full_schema();
 | 
			
		||||
@ -941,14 +922,34 @@ int ObServer::start()
 | 
			
		||||
    }
 | 
			
		||||
    LOG_INFO("[NOTICE] check if sys srs usable", K(ret), K(stop_));
 | 
			
		||||
 | 
			
		||||
    // check log replay and user tenant schema refresh status
 | 
			
		||||
    if (OB_SUCC(ret)) {
 | 
			
		||||
      if (stop_) {
 | 
			
		||||
        ret = OB_SERVER_IS_STOPPING;
 | 
			
		||||
        FLOG_WARN("server is in stopping status", KR(ret));
 | 
			
		||||
      } else if (OB_FAIL(check_server_can_start_service())) {
 | 
			
		||||
        LOG_ERROR("fail to check server can start service", KR(ret));
 | 
			
		||||
      } else {
 | 
			
		||||
        FLOG_INFO("success to check server can start service", KR(ret));
 | 
			
		||||
        ObSEArray<uint64_t, 16> tenant_ids;
 | 
			
		||||
        const int64_t MAX_CHECK_TIME = 15 * 60 * 1000 * 1000L; // 15min
 | 
			
		||||
        const int64_t start_ts = ObTimeUtility::current_time();
 | 
			
		||||
        int64_t schema_refreshed_ts = 0;
 | 
			
		||||
        const int64_t expire_time = start_ts + MAX_CHECK_TIME;
 | 
			
		||||
 | 
			
		||||
        if (OB_FAIL(multi_tenant_.get_mtl_tenant_ids(tenant_ids))) {
 | 
			
		||||
          FLOG_ERROR("get mtl tenant ids fail", KR(ret));
 | 
			
		||||
        } else if (tenant_ids.count() <= 0) {
 | 
			
		||||
          // do nothing
 | 
			
		||||
        } else {
 | 
			
		||||
          // check user tenant schema refresh
 | 
			
		||||
          check_user_tenant_schema_refreshed(tenant_ids, expire_time);
 | 
			
		||||
          schema_refreshed_ts = ObTimeUtility::current_time();
 | 
			
		||||
          // check log replay status
 | 
			
		||||
          check_log_replay_over(tenant_ids, expire_time);
 | 
			
		||||
        }
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] check log replay and user tenant schema finished",
 | 
			
		||||
            KR(ret),
 | 
			
		||||
            K(tenant_ids),
 | 
			
		||||
            "refresh_schema_cost_us", schema_refreshed_ts - start_ts,
 | 
			
		||||
            "replay_log_cost_us", ObTimeUtility::current_time() - schema_refreshed_ts);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -2735,76 +2736,62 @@ int ObServer::reload_bandwidth_throttle_limit(int64_t network_speed)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObServer::check_server_can_start_service()
 | 
			
		||||
void ObServer::check_user_tenant_schema_refreshed(const ObIArray<uint64_t> &tenant_ids, const int64_t expire_time)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t min_wrs = INT64_MAX;
 | 
			
		||||
  // TODO: implement this function
 | 
			
		||||
  return ret;
 | 
			
		||||
  /*
 | 
			
		||||
  for (int64_t i = 0; i < tenant_ids.count()
 | 
			
		||||
                      && ObTimeUtility::current_time() < expire_time; ++i) {
 | 
			
		||||
    uint64_t tenant_id = tenant_ids.at(i);
 | 
			
		||||
    bool tenant_schema_refreshed = false;
 | 
			
		||||
    while (!tenant_schema_refreshed
 | 
			
		||||
          && !stop_
 | 
			
		||||
          && ObTimeUtility::current_time() < expire_time) {
 | 
			
		||||
 | 
			
		||||
  //On the standby database, it is very likely that the minimum standby machine-readable timestamp cannot be pushed because the main database does not exist.
 | 
			
		||||
  //Do not stop the server from starting, otherwise you may not be able to create a connection
 | 
			
		||||
  int64_t get_min_wrs_ts = ObTimeUtility::current_time();
 | 
			
		||||
  do {
 | 
			
		||||
    bool can_start_service = true;
 | 
			
		||||
    if (stop_) {
 | 
			
		||||
      ret = OB_SERVER_IS_STOPPING;
 | 
			
		||||
      tenant_schema_refreshed = is_user_tenant(tenant_id) ?
 | 
			
		||||
          gctx_.schema_service_->is_tenant_refreshed(tenant_id) : true;
 | 
			
		||||
      if (!tenant_schema_refreshed) {
 | 
			
		||||
        // check wait and retry
 | 
			
		||||
        usleep(1000 * 1000);
 | 
			
		||||
        if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
 | 
			
		||||
          FLOG_INFO("[OBSERVER_NOTICE] Refreshing user tenant schema, need to wait ", K(tenant_id));
 | 
			
		||||
        }
 | 
			
		||||
        // check success
 | 
			
		||||
      } else if (i == tenant_ids.count() - 1) {
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] Refresh all user tenant schema successfully ", K(tenant_ids));
 | 
			
		||||
        // check timeout
 | 
			
		||||
      } else if (ObTimeUtility::current_time() > expire_time) {
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] Refresh user tenant schema timeout ", K(tenant_id));
 | 
			
		||||
      } else {
 | 
			
		||||
      //Check whether the lagging amount of all partitions of the machine is greater than max_stale_time_for_weak_consistency. If true, it cannot be restarted
 | 
			
		||||
      int64_t tmp_min_wrs = INT64_MAX;
 | 
			
		||||
      weak_read_service_.check_server_can_start_service(can_start_service, tmp_min_wrs);
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] Refresh user tenant schema successfully ", K(tenant_id));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObServer::check_log_replay_over(const ObIArray<uint64_t> &tenant_ids, const int64_t expire_time)
 | 
			
		||||
{
 | 
			
		||||
  for (int64_t i = 0; i < tenant_ids.count()
 | 
			
		||||
                      && ObTimeUtility::current_time() < expire_time; ++i) {
 | 
			
		||||
    SCN min_version;
 | 
			
		||||
    uint64_t tenant_id = tenant_ids.at(i);
 | 
			
		||||
    bool can_start_service = false;
 | 
			
		||||
    while (!can_start_service
 | 
			
		||||
          && !stop_
 | 
			
		||||
          && ObTimeUtility::current_time() < expire_time) {
 | 
			
		||||
      weak_read_service_.check_tenant_can_start_service(tenant_id, can_start_service, min_version);
 | 
			
		||||
        // check wait and retry
 | 
			
		||||
      if (!can_start_service) {
 | 
			
		||||
        const int64_t STANDBY_WAIT_WRS_DURATION = 60 * 1000 * 1000;
 | 
			
		||||
        const int64_t current_time = ObTimeUtility::current_time();
 | 
			
		||||
        if (min_wrs != tmp_min_wrs) {
 | 
			
		||||
          get_min_wrs_ts = current_time;
 | 
			
		||||
          min_wrs = tmp_min_wrs;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (GCTX.is_standby_cluster() && STANDBY_WAIT_WRS_DURATION < current_time - get_min_wrs_ts
 | 
			
		||||
            // && ObPartitionService::get_instance().is_scan_disk_finished()) {
 | 
			
		||||
          //If it is in the standby database, and the minimum standby machine readable timestamp has been one minute, there is no way to continue advancing, and the clog playback has ended,
 | 
			
		||||
          //You need to let go of this processing
 | 
			
		||||
          ret = OB_SUCCESS;
 | 
			
		||||
          LOG_INFO("[NOTICE] in standby cluster, no need to wait weak read timestamp", K(min_wrs),
 | 
			
		||||
              "hanging time", current_time - get_min_wrs_ts);
 | 
			
		||||
        usleep(1000 * 1000);
 | 
			
		||||
        // check success
 | 
			
		||||
      } else if (i == tenant_ids.count() -1) {
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] all tenant replay log finished, start to service ", K(tenant_ids));
 | 
			
		||||
        // check timeout
 | 
			
		||||
      } else if (ObTimeUtility::current_time() > expire_time) {
 | 
			
		||||
        FLOG_INFO("[OBSERVER_NOTICE] replay log timeout and force to start service ", K(tenant_id));
 | 
			
		||||
      } else {
 | 
			
		||||
          ret = OB_EAGAIN;
 | 
			
		||||
          ob_usleep(1000 * 1000);
 | 
			
		||||
          if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
 | 
			
		||||
            LOG_INFO("[NOTICE] clog is behind, service starting need to wait !");
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        ret = OB_SUCCESS;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  } while (OB_EAGAIN == ret);
 | 
			
		||||
 | 
			
		||||
  //Wait for OBS to load information related to cluster_info
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    ObClusterRole cluster_role = INVALID_CLUSTER_ROLE;
 | 
			
		||||
    share::ServerServiceStatus server_status = OBSERVER_INVALID_STATUS;
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
      gctx_.get_cluster_role_and_status(cluster_role, server_status);
 | 
			
		||||
      if (OBSERVER_INVALID_STATUS == server_status
 | 
			
		||||
          || INVALID_CLUSTER_ROLE == cluster_role) {
 | 
			
		||||
        ob_usleep(1000 * 1000);
 | 
			
		||||
        if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
 | 
			
		||||
          LOG_INFO("[NOTICE] not load cluster info, service starting need to wait !");
 | 
			
		||||
        }
 | 
			
		||||
        if (stop_) {
 | 
			
		||||
          ret = OB_SERVER_IS_STOPPING;
 | 
			
		||||
          break;
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        break;
 | 
			
		||||
        // do nothing
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
*/
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObServer::ObCTASCleanUpTask::ObCTASCleanUpTask()
 | 
			
		||||
 | 
			
		||||
@ -17,6 +17,8 @@
 | 
			
		||||
#include "lib/signal/ob_signal_worker.h"
 | 
			
		||||
#include "lib/net/ob_net_util.h"
 | 
			
		||||
#include "lib/random/ob_mysql_random.h"
 | 
			
		||||
#include "lib/container/ob_iarray.h"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#include "share/stat/ob_opt_stat_service.h"
 | 
			
		||||
#include "share/ratelimit/ob_rl_mgr.h"
 | 
			
		||||
@ -290,7 +292,8 @@ private:
 | 
			
		||||
  int init_refresh_network_speed_task();
 | 
			
		||||
  int init_refresh_cpu_frequency();
 | 
			
		||||
  int set_running_mode();
 | 
			
		||||
  int check_server_can_start_service();
 | 
			
		||||
  void check_user_tenant_schema_refreshed(const common::ObIArray<uint64_t> &tenant_ids, const int64_t expire_time);
 | 
			
		||||
  void check_log_replay_over(const common::ObIArray<uint64_t> &tenant_ids, const int64_t expire_time);
 | 
			
		||||
  int try_create_hidden_sys();
 | 
			
		||||
  int parse_mode();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -77,9 +77,10 @@ public:
 | 
			
		||||
  /// @param [out] version     wrs version
 | 
			
		||||
  virtual int get_cluster_version(const uint64_t tenant_id, share::SCN &version) = 0;
 | 
			
		||||
 | 
			
		||||
  /// check server can start service or not
 | 
			
		||||
  virtual void check_server_can_start_service(bool &can_start_service,
 | 
			
		||||
                                              int64_t &min_wrs) const = 0;
 | 
			
		||||
  /// check tenant can start service or not
 | 
			
		||||
  virtual int check_tenant_can_start_service(const uint64_t tenant_id,
 | 
			
		||||
                                             bool &can_start_service,
 | 
			
		||||
                                             share::SCN &version) const = 0;
 | 
			
		||||
 | 
			
		||||
  /// get RPC instance
 | 
			
		||||
  virtual ObIWrsRpc &get_wrs_rpc() = 0;
 | 
			
		||||
 | 
			
		||||
@ -44,6 +44,7 @@ public:
 | 
			
		||||
                                              share::SCN &wrs_version,
 | 
			
		||||
                                              const int64_t max_stale_time);
 | 
			
		||||
  share::SCN get_ls_weak_read_ts() const { return ls_weak_read_ts_; }
 | 
			
		||||
  bool can_skip_ls() const { return !is_enabled_; }
 | 
			
		||||
private:
 | 
			
		||||
  int generate_weak_read_timestamp_(oceanbase::storage::ObLS &ls, const int64_t max_stale_time, share::SCN ×tamp);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -711,6 +711,81 @@ void ObTenantWeakReadService::run1()
 | 
			
		||||
  ISTAT("thread end", K_(tenant_id));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTenantWeakReadService::check_can_start_service(const SCN ¤t_gts,
 | 
			
		||||
                                                     bool &can_start_service,
 | 
			
		||||
                                                     SCN &min_version,
 | 
			
		||||
                                                     share::ObLSID &ls_id)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  storage::ObLSService *ls_svr = MTL(storage::ObLSService*);
 | 
			
		||||
  common::ObSharedGuard<storage::ObLSIterator> iter;
 | 
			
		||||
  ObLSID tmp_ls_id;
 | 
			
		||||
  SCN tmp_min_version;
 | 
			
		||||
  int64_t total_ls_cnt = 0;
 | 
			
		||||
  can_start_service = true;
 | 
			
		||||
  const int64_t MAX_STALE_TIME = 30 * 1000 * 1000;
 | 
			
		||||
 | 
			
		||||
  if (OB_ISNULL(ls_svr)) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    FLOG_ERROR("unexpected ls service", K(ret), KP(ls_svr));
 | 
			
		||||
  } else if (OB_FAIL(ls_svr->get_ls_iter(iter, ObLSGetMod::TRANS_MOD))) {
 | 
			
		||||
    if (OB_NOT_RUNNING != ret) {
 | 
			
		||||
      FLOG_WARN("fail to alloc ls iter", KR(ret));
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    int64_t start_time = ObTimeUtility::current_time();
 | 
			
		||||
    while (OB_SUCCESS == ret) {
 | 
			
		||||
      ObLS *ls = NULL;
 | 
			
		||||
      if (OB_FAIL(iter->get_next(ls))) {
 | 
			
		||||
        if (OB_ITER_END == ret) {
 | 
			
		||||
          // do nothing
 | 
			
		||||
        } else {
 | 
			
		||||
          FLOG_WARN("iterate next ls fail", KR(ret));
 | 
			
		||||
        }
 | 
			
		||||
      } else if (OB_ISNULL(ls)) {
 | 
			
		||||
        ret = OB_PARTITION_NOT_EXIST;
 | 
			
		||||
        FLOG_WARN("iterate ls fail", KP(ls));
 | 
			
		||||
      } else if (ls->get_ls_wrs_handler()->can_skip_ls()) {
 | 
			
		||||
        // do nothing
 | 
			
		||||
      } else {
 | 
			
		||||
        ++total_ls_cnt;
 | 
			
		||||
        if (tmp_min_version.is_valid()) {
 | 
			
		||||
          if (ls->get_ls_wrs_handler()->get_ls_weak_read_ts() < tmp_min_version) {
 | 
			
		||||
            tmp_min_version = ls->get_ls_wrs_handler()->get_ls_weak_read_ts();
 | 
			
		||||
            tmp_ls_id = ls->get_ls_id();
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          tmp_min_version = ls->get_ls_wrs_handler()->get_ls_weak_read_ts();
 | 
			
		||||
          tmp_ls_id = ls->get_ls_id();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    } // while
 | 
			
		||||
 | 
			
		||||
    if (OB_ITER_END == ret) {
 | 
			
		||||
      ret = OB_SUCCESS;
 | 
			
		||||
    }
 | 
			
		||||
    if (total_ls_cnt == 0) {
 | 
			
		||||
      can_start_service = true;
 | 
			
		||||
      FLOG_INFO("empty ls, no need to wait replaying log", K(current_gts), K(tmp_min_version), K(ls_id));
 | 
			
		||||
    } else if (OB_SUCC(ret)) {
 | 
			
		||||
      min_version = tmp_min_version;
 | 
			
		||||
      ls_id = tmp_ls_id;
 | 
			
		||||
      can_start_service = (tmp_min_version.is_valid() &&
 | 
			
		||||
                           current_gts.convert_to_ts() - MAX_STALE_TIME < tmp_min_version.convert_to_ts());
 | 
			
		||||
      if (!can_start_service) {
 | 
			
		||||
        FLOG_WARN("current ls can not start service, waiting for replaying log",
 | 
			
		||||
            "target_ts", current_gts.convert_to_ts(),
 | 
			
		||||
            "min_ts", tmp_min_version.convert_to_ts(),
 | 
			
		||||
            "delta_us", (current_gts.convert_to_ts() - tmp_min_version.convert_to_ts()),
 | 
			
		||||
            K(ls_id));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      // do nothing
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObTenantWeakReadService::set_cluster_service_master_(const ObAddr &addr)
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
@ -108,6 +108,10 @@ public:
 | 
			
		||||
 | 
			
		||||
  // get weak read info stat
 | 
			
		||||
  void get_weak_read_stat(ObTenantWeakReadStat &wrs_stat) const;
 | 
			
		||||
  int check_can_start_service(const SCN ¤t_gts,
 | 
			
		||||
                              bool &can_start_service,
 | 
			
		||||
                              SCN &min_version,
 | 
			
		||||
                              share::ObLSID &ls_id);
 | 
			
		||||
public:
 | 
			
		||||
  // tenant level variables init and destroy function
 | 
			
		||||
  static int mtl_init(ObTenantWeakReadService* &twrs);
 | 
			
		||||
 | 
			
		||||
@ -23,6 +23,7 @@
 | 
			
		||||
#include "ob_weak_read_util.h"                               //ObWeakReadUtil
 | 
			
		||||
#include "storage/tx_storage/ob_ls_map.h"
 | 
			
		||||
#include "storage/tx_storage/ob_ls_service.h"
 | 
			
		||||
#include "storage/tx/ob_ts_mgr.h"
 | 
			
		||||
#include "logservice/ob_log_service.h"
 | 
			
		||||
 | 
			
		||||
namespace oceanbase
 | 
			
		||||
@ -134,29 +135,56 @@ int ObWeakReadService::get_cluster_version(const uint64_t tenant_id, SCN &versio
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO shanyan.g
 | 
			
		||||
void ObWeakReadService::check_server_can_start_service(bool &can_start_service,
 | 
			
		||||
                                                       int64_t &min_wrs) const
 | 
			
		||||
int ObWeakReadService::check_tenant_can_start_service(const uint64_t tenant_id,
 | 
			
		||||
                                                      bool &can_start_service,
 | 
			
		||||
                                                      SCN &version) const
 | 
			
		||||
{
 | 
			
		||||
  UNUSEDx(can_start_service, min_wrs);
 | 
			
		||||
  // int ret = OB_SUCCESS;
 | 
			
		||||
  // int64_t safe_weak_read_snapshot = INT64_MAX;
 | 
			
		||||
  // ObPartitionService &ps = ObPartitionService::get_instance();
 | 
			
		||||
  // if (OB_FAIL(ps.check_can_start_service(can_start_service, safe_weak_read_snapshot, pkey))) {
 | 
			
		||||
  //   LOG_WARN("partition service check can start service error", KR(ret));
 | 
			
		||||
  // } else if (!can_start_service) {
 | 
			
		||||
  //   if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
 | 
			
		||||
  //     LOG_INFO("[WRS] [NOTICE] server can not start service", K(can_start_service),
 | 
			
		||||
  //         K(safe_weak_read_snapshot),
 | 
			
		||||
  //         "delta", ObTimeUtility::current_time() - safe_weak_read_snapshot, K(pkey));
 | 
			
		||||
  //   }
 | 
			
		||||
  // } else {
 | 
			
		||||
  //   LOG_INFO("[WRS] [NOTICE] server can start service", K(can_start_service), K(safe_weak_read_snapshot),
 | 
			
		||||
  //       "delta", ObTimeUtility::current_time() - safe_weak_read_snapshot, K(pkey));
 | 
			
		||||
  // }
 | 
			
		||||
  // if (OB_SUCC(ret)) {
 | 
			
		||||
  //   min_wrs = safe_weak_read_snapshot;
 | 
			
		||||
  // }
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  SCN gts_scn;
 | 
			
		||||
  ObLSID ls_id;
 | 
			
		||||
  SCN min_version;
 | 
			
		||||
 | 
			
		||||
  MTL_SWITCH(tenant_id) {
 | 
			
		||||
    ObTenantWeakReadService *twrs = MTL(ObTenantWeakReadService *);
 | 
			
		||||
    if (OB_ISNULL(twrs)) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      FLOG_ERROR("MTL ObTenantWeakReadService object is NULL", K(twrs), K(tenant_id), KR(ret));
 | 
			
		||||
    } else if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, gts_scn))) {
 | 
			
		||||
      FLOG_WARN("[WRS] [OBSERVER_NOTICE] get gts scn error", K(ret), K(tenant_id));
 | 
			
		||||
    } else if (OB_FAIL(twrs->check_can_start_service(gts_scn,
 | 
			
		||||
                                                     can_start_service,
 | 
			
		||||
                                                     min_version,
 | 
			
		||||
                                                     ls_id))) {
 | 
			
		||||
      FLOG_WARN("get tenant weak read service cluster version fail", KR(ret), K(tenant_id));
 | 
			
		||||
    } else if (!can_start_service) {
 | 
			
		||||
      version = min_version;
 | 
			
		||||
    } else {
 | 
			
		||||
      // success
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    FLOG_WARN("change tenant context fail when get weak read service cluster version",
 | 
			
		||||
        KR(ret), K(tenant_id));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (can_start_service) {
 | 
			
		||||
    FLOG_INFO("[WRS] [OBSERVER_NOTICE] current tenant start service successfully",
 | 
			
		||||
        K(tenant_id),
 | 
			
		||||
        "target_ts", gts_scn.convert_to_ts(),
 | 
			
		||||
        "min_ts", (min_version.is_valid() ? min_version.convert_to_ts() : 0),
 | 
			
		||||
        "delta_us", (min_version.is_valid() ? (gts_scn.convert_to_ts() - min_version.convert_to_ts()) : 0));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
 | 
			
		||||
      int64_t tmp_version = min_version.is_valid() ? min_version.convert_to_ts() : 0;
 | 
			
		||||
      FLOG_INFO("[WRS] [OBSERVER_NOTICE] waiting log replay... ",
 | 
			
		||||
          K(ret),
 | 
			
		||||
          K(tenant_id),
 | 
			
		||||
          K(can_start_service),
 | 
			
		||||
          "min_version", tmp_version,
 | 
			
		||||
          "delta", ObTimeUtility::current_time() - tmp_version,
 | 
			
		||||
          "slowest_ls_id", ls_id);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObWeakReadService::process_get_cluster_version_rpc(const uint64_t tenant_id,
 | 
			
		||||
 | 
			
		||||
@ -50,7 +50,7 @@ public:
 | 
			
		||||
  /// get CLUSTER level weak read version
 | 
			
		||||
  int get_cluster_version(const uint64_t tenant_id, share::SCN &version);
 | 
			
		||||
 | 
			
		||||
  void check_server_can_start_service(bool &can_start_service, int64_t &min_wrs) const;
 | 
			
		||||
  int check_tenant_can_start_service(const uint64_t tenant_id, bool &can_start_service, SCN &version) const;
 | 
			
		||||
 | 
			
		||||
  ///////////// RPC process functions /////////////////
 | 
			
		||||
  void process_get_cluster_version_rpc(const uint64_t tenant_id,
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user