[Net Standby] Set global fetch log upper limit
This commit is contained in:
		@ -498,17 +498,24 @@ int ObLogFetcher::get_ls_proposal_id(const share::ObLSID &ls_id, int64_t &propos
 | 
			
		||||
      LOG_WARN("get_tls_proposal_id failed", K(tls_id));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO support
 | 
			
		||||
int ObLogFetcher::update_fetching_log_upper_limit(const share::SCN &upper_limit_scn)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t upper_limit_ts_ns = -1;
 | 
			
		||||
 | 
			
		||||
  if (IS_NOT_INIT) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_ERROR("LogFetcher is not inited", KR(ret));
 | 
			
		||||
  } else if (OB_UNLIKELY(!upper_limit_scn.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", K(upper_limit_scn));
 | 
			
		||||
  } else if (FALSE_IT(upper_limit_ts_ns = upper_limit_scn.convert_to_ts() * 1000L)) {
 | 
			
		||||
  } else if (OB_FAIL(progress_controller_.set_global_upper_limit(upper_limit_ts_ns))) {
 | 
			
		||||
    LOG_WARN("set_global_upper_limit failed", K(upper_limit_scn), K(upper_limit_ts_ns));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
@ -195,7 +195,6 @@ public:
 | 
			
		||||
  virtual void resume();
 | 
			
		||||
  virtual bool is_paused();
 | 
			
		||||
  virtual void mark_stop_flag();
 | 
			
		||||
  int set_restore_log_upper_limit(const share::SCN &upper_limit) { return 0; }
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
  virtual void configure(const ObLogFetcherConfig &cfg);
 | 
			
		||||
 | 
			
		||||
@ -451,6 +451,7 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t min_progress = OB_INVALID_TIMESTAMP;
 | 
			
		||||
  int64_t global_upper_limit = OB_INVALID_TIMESTAMP;
 | 
			
		||||
 | 
			
		||||
  if (OB_ISNULL(progress_controller_)) {
 | 
			
		||||
    LOG_ERROR("invalid progress controller", K(progress_controller_));
 | 
			
		||||
@ -470,6 +471,11 @@ int FetchStream::get_upper_limit(int64_t &upper_limit_us)
 | 
			
		||||
      // Other partition are limited by progress limit
 | 
			
		||||
      upper_limit_us = min_progress + ATOMIC_LOAD(&g_dml_progress_limit) * NS_CONVERSION;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    global_upper_limit = progress_controller_->get_global_upper_limit();
 | 
			
		||||
    if (OB_INVALID_TIMESTAMP != global_upper_limit) {
 | 
			
		||||
      upper_limit_us = std::min(upper_limit_us, global_upper_limit);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
@ -39,7 +39,8 @@ PartProgressController::PartProgressController() :
 | 
			
		||||
    valid_progress_cnt_(0),
 | 
			
		||||
    thread_counter_(0),
 | 
			
		||||
    last_global_count_and_timeval_(),
 | 
			
		||||
    global_count_and_timeval_()
 | 
			
		||||
    global_count_and_timeval_(),
 | 
			
		||||
    global_fetch_log_upper_limit_(OB_INVALID_TIMESTAMP)
 | 
			
		||||
{
 | 
			
		||||
  last_global_count_and_timeval_.lo = 0;
 | 
			
		||||
  last_global_count_and_timeval_.hi = 0;
 | 
			
		||||
@ -108,6 +109,7 @@ void PartProgressController::destroy()
 | 
			
		||||
  last_global_count_and_timeval_.hi = 0;
 | 
			
		||||
  global_count_and_timeval_.lo = 0;
 | 
			
		||||
  global_count_and_timeval_.hi = 0;
 | 
			
		||||
  global_fetch_log_upper_limit_ = OB_INVALID_TIMESTAMP;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int PartProgressController::acquire_progress(int64_t &progress_id, const int64_t start_progress)
 | 
			
		||||
@ -249,6 +251,26 @@ int PartProgressController::get_min_progress(int64_t &progress)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int PartProgressController::set_global_upper_limit(const int64_t global_upper_limit)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
 | 
			
		||||
  if (OB_UNLIKELY(! inited_)) {
 | 
			
		||||
    LOG_ERROR("not init");
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
  } else if (OB_UNLIKELY(OB_INVALID_TIMESTAMP == global_upper_limit
 | 
			
		||||
        || global_upper_limit < ATOMIC_LOAD(&global_fetch_log_upper_limit_))) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", K(global_upper_limit), K(global_fetch_log_upper_limit_));
 | 
			
		||||
  } else {
 | 
			
		||||
    ATOMIC_STORE(&global_fetch_log_upper_limit_, global_upper_limit);
 | 
			
		||||
 | 
			
		||||
    LOG_TRACE("[FETCHER] [SET_GLOBAL_UPPER_LIMIT]", K_(global_fetch_log_upper_limit));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t PartProgressController::get_itid_()
 | 
			
		||||
{
 | 
			
		||||
  static TLOCAL(int64_t, index) = -1;
 | 
			
		||||
 | 
			
		||||
@ -59,10 +59,15 @@ public:
 | 
			
		||||
  /// Get the current minimum progress value
 | 
			
		||||
  int get_min_progress(int64_t &progress);
 | 
			
		||||
 | 
			
		||||
  int64_t get_global_upper_limit() const { return ATOMIC_LOAD(&global_fetch_log_upper_limit_); }
 | 
			
		||||
 | 
			
		||||
  int set_global_upper_limit(const int64_t global_upper_limit);
 | 
			
		||||
 | 
			
		||||
  TO_STRING_KV(K_(progress_cnt),
 | 
			
		||||
      K_(valid_progress_cnt),
 | 
			
		||||
      "recycled_cnt", recycled_indices_.count(),
 | 
			
		||||
      K_(max_progress_cnt));
 | 
			
		||||
      K_(max_progress_cnt),
 | 
			
		||||
      K_(global_fetch_log_upper_limit));
 | 
			
		||||
private:
 | 
			
		||||
  // Assign IDs to each thread
 | 
			
		||||
  int64_t get_itid_();
 | 
			
		||||
@ -120,6 +125,9 @@ private:
 | 
			
		||||
  types::uint128_t last_global_count_and_timeval_;
 | 
			
		||||
  types::uint128_t global_count_and_timeval_;
 | 
			
		||||
 | 
			
		||||
  // global fetch log upper limit, used to control ls fetch log progress for net standby(timestamp with ns)
 | 
			
		||||
  int64_t global_fetch_log_upper_limit_;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
  DISALLOW_COPY_AND_ASSIGN(PartProgressController);
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@ -539,7 +539,7 @@ int ObLogRestoreNetDriver::set_restore_log_upper_limit_()
 | 
			
		||||
    // do nothing
 | 
			
		||||
  } else if (OB_FAIL(ObLogRestoreDriverBase::get_upper_resotore_scn(upper_limit_scn))) {
 | 
			
		||||
    LOG_WARN("get upper limit scn failed");
 | 
			
		||||
  } else if (OB_FAIL(fetcher_->set_restore_log_upper_limit(upper_limit_scn))) {
 | 
			
		||||
  } else if (OB_FAIL(fetcher_->update_fetching_log_upper_limit(upper_limit_scn))) {
 | 
			
		||||
    LOG_WARN("set restore log upper limit failed", K(upper_limit_scn));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user