fix speed_limit
This commit is contained in:
		
							
								
								
									
										12
									
								
								deps/oblib/src/lib/ob_define.h
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										12
									
								
								deps/oblib/src/lib/ob_define.h
									
									
									
									
										vendored
									
									
								
							@ -2245,6 +2245,18 @@ OB_INLINE bool &get_ignore_mem_limit()
 | 
			
		||||
  return ignore_mem_limit;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
OB_INLINE int64_t &get_seq()
 | 
			
		||||
{
 | 
			
		||||
  RLOCAL_INLINE(int64_t, seq);
 | 
			
		||||
  return seq;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
OB_INLINE bool &tl_need_speed_limit()
 | 
			
		||||
{
 | 
			
		||||
  RLOCAL_INLINE(bool, tl_need_speed_limit);
 | 
			
		||||
  return tl_need_speed_limit;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
OB_INLINE uint32_t &get_writing_throttling_sleep_interval()
 | 
			
		||||
{
 | 
			
		||||
  RLOCAL_INLINE(uint32_t, writing_throttling_sleep_interval);
 | 
			
		||||
 | 
			
		||||
@ -85,7 +85,12 @@ int ObFifoArena::init(uint64_t tenant_id)
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  lib::ObMallocAllocator *allocator = lib::ObMallocAllocator::get_instance();
 | 
			
		||||
  uint64_t ctx_id = ObCtxIds::MEMSTORE_CTX_ID;
 | 
			
		||||
  if (OB_ISNULL(allocator)) {
 | 
			
		||||
  advance_clock_timer_.set_run_wrapper(MTL_CTX());
 | 
			
		||||
  if (OB_FAIL(advance_clock_timer_.init("ADV_CLOCK"))) {
 | 
			
		||||
    STORAGE_LOG(ERROR, "fail to init advance_clock_timer_", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(advance_clock_timer_.schedule(advance_clock_task_, ADVANCE_CLOCK_INTERVAL, true))) {
 | 
			
		||||
    STORAGE_LOG(ERROR, "fail to schedule advance_clock task", K(ret));
 | 
			
		||||
  } else if (OB_ISNULL(allocator)) {
 | 
			
		||||
    ret = OB_INIT_FAIL;
 | 
			
		||||
    OB_LOG(ERROR, "mallocator instance is NULLL", K(ret));
 | 
			
		||||
  } else if (OB_ISNULL(allocator_ = allocator->get_tenant_ctx_allocator(tenant_id, ctx_id))) {
 | 
			
		||||
@ -107,6 +112,9 @@ int ObFifoArena::init(uint64_t tenant_id)
 | 
			
		||||
 | 
			
		||||
void ObFifoArena::reset()
 | 
			
		||||
{
 | 
			
		||||
  advance_clock_timer_.stop();
 | 
			
		||||
  advance_clock_timer_.wait();
 | 
			
		||||
  advance_clock_timer_.destroy();
 | 
			
		||||
  COMMON_LOG(INFO, "MTALLOC.reset", "tenant_id", get_tenant_id());
 | 
			
		||||
  shrink_cached_page(0);
 | 
			
		||||
}
 | 
			
		||||
@ -273,63 +281,105 @@ void ObFifoArena::destroy_page(Page* page)
 | 
			
		||||
 | 
			
		||||
bool ObFifoArena::need_do_writing_throttle() const
 | 
			
		||||
{
 | 
			
		||||
  bool need_do_writing_throttle = false;
 | 
			
		||||
  int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
 | 
			
		||||
  int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
 | 
			
		||||
  int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
 | 
			
		||||
  bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
 | 
			
		||||
  if (trigger_percentage < 100) {
 | 
			
		||||
    int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
 | 
			
		||||
    int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
 | 
			
		||||
    need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return need_do_writing_throttle;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
 | 
			
		||||
void ObFifoArena::speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
 | 
			
		||||
  int64_t trigger_mem_limit = 0;
 | 
			
		||||
  bool need_speed_limit = false;
 | 
			
		||||
  int64_t seq = 0;
 | 
			
		||||
  int64_t throttling_interval = 0;
 | 
			
		||||
  if (trigger_percentage < 100) {
 | 
			
		||||
    if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage <= 0)) {
 | 
			
		||||
      COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage));
 | 
			
		||||
    } else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100)) {
 | 
			
		||||
      need_speed_limit = true;
 | 
			
		||||
      int64_t alloc_duration = get_writing_throttling_maximum_duration_();
 | 
			
		||||
      if (OB_FAIL(throttle_info_.check_and_calc_decay_factor(lastest_memstore_threshold_, trigger_percentage, alloc_duration))) {
 | 
			
		||||
        COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size), K(throttle_info_));
 | 
			
		||||
      } else {
 | 
			
		||||
        int64_t throttling_interval = get_throttling_interval(cur_mem_hold, alloc_size, trigger_mem_limit);
 | 
			
		||||
        int64_t cur_ts = ObTimeUtility::current_time();
 | 
			
		||||
        int64_t new_base_ts = ATOMIC_AAF(&last_base_ts_, throttling_interval);
 | 
			
		||||
        int64_t sleep_interval = new_base_ts - cur_ts;
 | 
			
		||||
        uint32_t final_sleep_interval = 0;
 | 
			
		||||
        if (sleep_interval > 0) {
 | 
			
		||||
          //The playback of a single log may allocate 2M blocks multiple times
 | 
			
		||||
          final_sleep_interval = static_cast<uint32_t>(
 | 
			
		||||
              MIN((get_writing_throttling_sleep_interval() + sleep_interval - 1), MAX_WAIT_INTERVAL));
 | 
			
		||||
          get_writing_throttling_sleep_interval() = final_sleep_interval;
 | 
			
		||||
          throttle_info_.record_limit_event(sleep_interval - 1);
 | 
			
		||||
        } else {
 | 
			
		||||
          inc_update(&last_base_ts_, ObTimeUtility::current_time());
 | 
			
		||||
          throttle_info_.reset_period_stat_info();
 | 
			
		||||
          last_reclaimed_ = ATOMIC_LOAD(&reclaimed_);
 | 
			
		||||
        }
 | 
			
		||||
        if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
 | 
			
		||||
          COMMON_LOG(INFO,
 | 
			
		||||
                     "report write throttle info",
 | 
			
		||||
                     K(alloc_size),
 | 
			
		||||
                     K(throttling_interval),
 | 
			
		||||
                     K(attr_),
 | 
			
		||||
                     "freed memory(MB):", (ATOMIC_LOAD(&reclaimed_) - last_reclaimed_) / 1024 / 1024,
 | 
			
		||||
                     "last_base_ts", ATOMIC_LOAD(&last_base_ts_),
 | 
			
		||||
                     K(cur_mem_hold),
 | 
			
		||||
                     K(throttle_info_),
 | 
			
		||||
                     K(sleep_interval),
 | 
			
		||||
                     K(final_sleep_interval));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    } else {/*do nothing*/}
 | 
			
		||||
    }
 | 
			
		||||
    seq = ATOMIC_AAF(&max_seq_, alloc_size);
 | 
			
		||||
    get_seq() = seq;
 | 
			
		||||
    tl_need_speed_limit() = need_speed_limit;
 | 
			
		||||
 | 
			
		||||
    if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
 | 
			
		||||
      COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval),
 | 
			
		||||
                  "max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_),
 | 
			
		||||
                  K(cur_mem_hold), K(throttle_info_), K(seq));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObFifoArena::get_throttling_interval(int64_t cur_mem_hold,
 | 
			
		||||
                                             int64_t alloc_size,
 | 
			
		||||
                                             int64_t trigger_mem_limit)
 | 
			
		||||
bool ObFifoArena::check_clock_over_seq(const int64_t req)
 | 
			
		||||
{
 | 
			
		||||
  int64_t clock = ATOMIC_LOAD(&clock_);
 | 
			
		||||
  return req <= clock;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void ObFifoArena::advance_clock()
 | 
			
		||||
{
 | 
			
		||||
  bool need_speed_limit = need_do_writing_throttle();
 | 
			
		||||
  int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
 | 
			
		||||
  int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
 | 
			
		||||
  int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL) : trigger_mem_limit - hold_);
 | 
			
		||||
  int64_t clock = ATOMIC_LOAD(&clock_);
 | 
			
		||||
  int64_t max_seq = ATOMIC_LOAD(&max_seq_);
 | 
			
		||||
  ATOMIC_SET(&clock_, min(max_seq, clock + mem_limit));
 | 
			
		||||
  if (REACH_TIME_INTERVAL(100 * 1000L)) {
 | 
			
		||||
    COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(attr_.tenant_id_));
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObFifoArena::expected_wait_time(const int64_t seq) const
 | 
			
		||||
{
 | 
			
		||||
  int64_t expected_wait_time = 0;
 | 
			
		||||
  int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
 | 
			
		||||
  int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
 | 
			
		||||
  int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
 | 
			
		||||
  int64_t clock = ATOMIC_LOAD(&clock_);
 | 
			
		||||
  if (seq > clock) {
 | 
			
		||||
    expected_wait_time = (seq - clock) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period;
 | 
			
		||||
  }
 | 
			
		||||
  return expected_wait_time;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObFifoArena::calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const
 | 
			
		||||
{
 | 
			
		||||
  double cur_chunk_seq = static_cast<double>(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE));
 | 
			
		||||
  int64_t mem_can_be_assigned = 0;
 | 
			
		||||
 | 
			
		||||
  int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
 | 
			
		||||
  int64_t accumulate_interval = 0;
 | 
			
		||||
  int64_t the_page_interval = 0;
 | 
			
		||||
  while (accumulate_interval < dt) {
 | 
			
		||||
    the_page_interval = static_cast<int64_t>(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE;
 | 
			
		||||
    accumulate_interval += the_page_interval;
 | 
			
		||||
 | 
			
		||||
    mem_can_be_assigned += (accumulate_interval > dt ?
 | 
			
		||||
                            allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval :
 | 
			
		||||
                            allocate_size_in_the_page);
 | 
			
		||||
    allocate_size_in_the_page = MEM_SLICE_SIZE;
 | 
			
		||||
    cur_chunk_seq += double(1);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return mem_can_be_assigned;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int64_t ObFifoArena::get_throttling_interval(const int64_t cur_mem_hold,
 | 
			
		||||
                                             const int64_t alloc_size,
 | 
			
		||||
                                             const int64_t trigger_mem_limit)
 | 
			
		||||
{
 | 
			
		||||
  constexpr int64_t MIN_INTERVAL_PER_ALLOC = 20;
 | 
			
		||||
  int64_t chunk_cnt = ((alloc_size + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE));
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,8 @@
 | 
			
		||||
#include "lib/allocator/ob_qsync.h"
 | 
			
		||||
#include "lib/allocator/ob_malloc.h"
 | 
			
		||||
#include "lib/allocator/ob_allocator.h"
 | 
			
		||||
#include "lib/lock/ob_spin_rwlock.h"           // SpinRWLock
 | 
			
		||||
#include "lib/task/ob_timer.h"
 | 
			
		||||
 | 
			
		||||
namespace oceanbase
 | 
			
		||||
{
 | 
			
		||||
@ -141,9 +143,21 @@ public:
 | 
			
		||||
    Ref* ref_[MAX_NWAY];
 | 
			
		||||
    int64_t allocated_;
 | 
			
		||||
  };
 | 
			
		||||
 | 
			
		||||
  class ObAdvanceClockTask : public common::ObTimerTask
 | 
			
		||||
  {
 | 
			
		||||
  public:
 | 
			
		||||
    ObAdvanceClockTask(ObFifoArena &arena) : arena_(arena) {}
 | 
			
		||||
    virtual ~ObAdvanceClockTask() {}
 | 
			
		||||
    virtual void runTimerTask() {
 | 
			
		||||
      arena_.advance_clock();
 | 
			
		||||
    }
 | 
			
		||||
  private:
 | 
			
		||||
    ObFifoArena &arena_;
 | 
			
		||||
  };
 | 
			
		||||
public:
 | 
			
		||||
  enum { MAX_CACHED_GROUP_COUNT = 16, MAX_CACHED_PAGE_COUNT = MAX_CACHED_GROUP_COUNT * Handle::MAX_NWAY, PAGE_SIZE = OB_MALLOC_BIG_BLOCK_SIZE + sizeof(Page) + sizeof(Ref)};
 | 
			
		||||
  ObFifoArena(): allocator_(NULL), nway_(0), allocated_(0), reclaimed_(0), hold_(0), retired_(0), last_base_ts_(0),
 | 
			
		||||
  ObFifoArena(): allocator_(NULL), nway_(0), allocated_(0), reclaimed_(0), hold_(0), retired_(0), max_seq_(0), clock_(0), advance_clock_timer_(), advance_clock_task_(*this),
 | 
			
		||||
    last_reclaimed_(0), lastest_memstore_threshold_(0)
 | 
			
		||||
    { memset(cur_pages_, 0, sizeof(cur_pages_)); }
 | 
			
		||||
  ~ObFifoArena() { reset(); }
 | 
			
		||||
@ -164,6 +178,8 @@ public:
 | 
			
		||||
 | 
			
		||||
  void set_memstore_threshold(int64_t memstore_threshold);
 | 
			
		||||
  bool need_do_writing_throttle() const;
 | 
			
		||||
  bool check_clock_over_seq(const int64_t seq);
 | 
			
		||||
  int64_t expected_wait_time(const int64_t seq) const;
 | 
			
		||||
private:
 | 
			
		||||
  ObQSync& get_qs() {
 | 
			
		||||
    static ObQSync s_qs;
 | 
			
		||||
@ -209,15 +225,18 @@ private:
 | 
			
		||||
  void retire_page(int64_t way_id, Handle& handle, Page* ptr);
 | 
			
		||||
  void destroy_page(Page* page);
 | 
			
		||||
  void shrink_cached_page(int64_t nway);
 | 
			
		||||
  void speed_limit(int64_t cur_mem_hold, int64_t alloc_size);
 | 
			
		||||
  int64_t get_throttling_interval(int64_t cur_mem_hold,
 | 
			
		||||
                               int64_t alloc_size,
 | 
			
		||||
                               int64_t trigger_mem_limit);
 | 
			
		||||
  void speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size);
 | 
			
		||||
  int64_t get_throttling_interval(const int64_t cur_mem_hold,
 | 
			
		||||
                                  const int64_t alloc_size,
 | 
			
		||||
                                  const int64_t trigger_mem_limit);
 | 
			
		||||
  void advance_clock();
 | 
			
		||||
  int64_t calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const;
 | 
			
		||||
  int64_t get_actual_hold_size(Page* page);
 | 
			
		||||
  int64_t get_writing_throttling_trigger_percentage_() const;
 | 
			
		||||
  int64_t get_writing_throttling_maximum_duration_() const;
 | 
			
		||||
private:
 | 
			
		||||
  static const int64_t MAX_WAIT_INTERVAL = 20 * 1000 * 1000;//20s
 | 
			
		||||
  static const int64_t ADVANCE_CLOCK_INTERVAL = 200;// 200us
 | 
			
		||||
  static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond
 | 
			
		||||
  static const int64_t MIN_INTERVAL = 20000;
 | 
			
		||||
  static const int64_t DEFAULT_TRIGGER_PERCENTAGE = 100;
 | 
			
		||||
@ -229,7 +248,15 @@ private:
 | 
			
		||||
  int64_t reclaimed_;
 | 
			
		||||
  int64_t hold_;//for single tenant
 | 
			
		||||
  int64_t retired_;
 | 
			
		||||
  int64_t last_base_ts_;
 | 
			
		||||
 | 
			
		||||
  // typedef common::SpinRWLock RWLock;
 | 
			
		||||
  // typedef common::SpinRLockGuard  RLockGuard;
 | 
			
		||||
  // typedef common::SpinWLockGuard  WLockGuard;
 | 
			
		||||
  // RWLock rwlock_;
 | 
			
		||||
  int64_t max_seq_;
 | 
			
		||||
  int64_t clock_;
 | 
			
		||||
  common::ObTimer advance_clock_timer_;
 | 
			
		||||
  ObAdvanceClockTask advance_clock_task_;
 | 
			
		||||
 | 
			
		||||
  int64_t last_reclaimed_;
 | 
			
		||||
  Page* cur_pages_[MAX_CACHED_PAGE_COUNT];
 | 
			
		||||
 | 
			
		||||
@ -145,6 +145,14 @@ public:
 | 
			
		||||
public:
 | 
			
		||||
  int set_memstore_threshold(uint64_t tenant_id);
 | 
			
		||||
  bool need_do_writing_throttle() const {return arena_.need_do_writing_throttle();}
 | 
			
		||||
  bool check_clock_over_seq(int64_t seq)
 | 
			
		||||
  {
 | 
			
		||||
    return arena_.check_clock_over_seq(seq);
 | 
			
		||||
  }
 | 
			
		||||
  int64_t expected_wait_time(int64_t seq) const
 | 
			
		||||
  {
 | 
			
		||||
    return arena_.expected_wait_time(seq);
 | 
			
		||||
  }
 | 
			
		||||
  int64_t get_retire_clock() const { return arena_.retired(); }
 | 
			
		||||
  bool exist_active_memtable_below_clock(const int64_t clock) const {
 | 
			
		||||
    return hlist_.hazard() < clock;
 | 
			
		||||
 | 
			
		||||
@ -45,53 +45,68 @@ ObStorageTableGuard::ObStorageTableGuard(
 | 
			
		||||
    replay_scn_(replay_scn),
 | 
			
		||||
    for_multi_source_data_(for_multi_source_data)
 | 
			
		||||
{
 | 
			
		||||
  get_writing_throttling_sleep_interval() = 0;
 | 
			
		||||
  init_ts_ = ObTimeUtility::current_time();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObStorageTableGuard::~ObStorageTableGuard()
 | 
			
		||||
{
 | 
			
		||||
  uint32_t &interval = get_writing_throttling_sleep_interval();
 | 
			
		||||
  if (need_control_mem_ && interval > 0) {
 | 
			
		||||
    uint64_t timeout = 10000;//10s
 | 
			
		||||
    common::ObWaitEventGuard wait_guard(common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, interval);
 | 
			
		||||
  bool &need_speed_limit = tl_need_speed_limit();
 | 
			
		||||
  if (need_control_mem_ && need_speed_limit) {
 | 
			
		||||
    bool need_sleep = true;
 | 
			
		||||
    const int32_t SLEEP_INTERVAL_PER_TIME = 100 * 1000;//100ms
 | 
			
		||||
    int64_t left_interval = interval;
 | 
			
		||||
    int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME;
 | 
			
		||||
    if (!for_replay_) {
 | 
			
		||||
      int64_t query_left_time = store_ctx_.timeout_ - ObTimeUtility::current_time();
 | 
			
		||||
      left_interval = common::min(left_interval, query_left_time);
 | 
			
		||||
      left_interval = min(SPEED_LIMIT_MAX_SLEEP_TIME, store_ctx_.timeout_ - ObTimeUtility::current_time());
 | 
			
		||||
    }
 | 
			
		||||
    if (NULL != memtable_) {
 | 
			
		||||
      need_sleep = memtable_->is_active_memtable();
 | 
			
		||||
    }
 | 
			
		||||
    uint64_t timeout = 10000;//10s
 | 
			
		||||
    common::ObWaitEventGuard wait_guard(common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
 | 
			
		||||
 | 
			
		||||
    reset();
 | 
			
		||||
    int tmp_ret = OB_SUCCESS;
 | 
			
		||||
    bool has_sleep = false;
 | 
			
		||||
 | 
			
		||||
    while ((left_interval > 0) && need_sleep) {
 | 
			
		||||
      //because left_interval and SLEEP_INTERVAL_PER_TIME both are greater than
 | 
			
		||||
      //zero, so it's safe to convert to uint32_t, be careful with comparation between int and uint
 | 
			
		||||
      uint32_t sleep_interval = static_cast<uint32_t>(min(left_interval, SLEEP_INTERVAL_PER_TIME));
 | 
			
		||||
      if (for_replay_) {
 | 
			
		||||
        if(MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
          break;
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      ob_usleep<common::ObWaitEventIds::STORAGE_WRITING_THROTTLE_SLEEP>(sleep_interval);
 | 
			
		||||
      has_sleep = true;
 | 
			
		||||
      left_interval -= sleep_interval;
 | 
			
		||||
      if (store_ctx_.mvcc_acc_ctx_.is_write()) {
 | 
			
		||||
        ObGMemstoreAllocator* memstore_allocator = NULL;
 | 
			
		||||
        if (OB_SUCCESS != (tmp_ret = ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator(
 | 
			
		||||
            MTL_ID(), memstore_allocator))) {
 | 
			
		||||
        } else if (OB_ISNULL(memstore_allocator)) {
 | 
			
		||||
          LOG_WARN("get_tenant_mutil_allocator failed", K(store_ctx_.tablet_id_), K(tmp_ret));
 | 
			
		||||
        } else {
 | 
			
		||||
    int64_t sleep_time = 0;
 | 
			
		||||
    int time = 0;
 | 
			
		||||
    int64_t &seq = get_seq();
 | 
			
		||||
    if (store_ctx_.mvcc_acc_ctx_.is_write()) {
 | 
			
		||||
      ObGMemstoreAllocator* memstore_allocator = NULL;
 | 
			
		||||
      if (OB_SUCCESS != (tmp_ret = ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator(
 | 
			
		||||
          MTL_ID(), memstore_allocator))) {
 | 
			
		||||
      } else if (OB_ISNULL(memstore_allocator)) {
 | 
			
		||||
        LOG_WARN("get_tenant_mutil_allocator failed", K(store_ctx_.tablet_id_), K(tmp_ret));
 | 
			
		||||
      } else {
 | 
			
		||||
        while (need_sleep &&
 | 
			
		||||
               !memstore_allocator->check_clock_over_seq(seq) &&
 | 
			
		||||
               (left_interval > 0)) {
 | 
			
		||||
          if (for_replay_) {
 | 
			
		||||
            if(MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
              break;
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
          //because left_interval and SLEEP_INTERVAL_PER_TIME both are greater than
 | 
			
		||||
          //zero, so it's safe to convert to uint32_t, be careful with comparation between int and uint
 | 
			
		||||
          int64_t expected_wait_time = memstore_allocator->expected_wait_time(seq);
 | 
			
		||||
          if (expected_wait_time == 0) {
 | 
			
		||||
            break;
 | 
			
		||||
          }
 | 
			
		||||
          uint32_t sleep_interval =
 | 
			
		||||
            static_cast<uint32_t>(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), expected_wait_time));
 | 
			
		||||
          ob_usleep<common::ObWaitEventIds::STORAGE_WRITING_THROTTLE_SLEEP>(sleep_interval);
 | 
			
		||||
          sleep_time += sleep_interval;
 | 
			
		||||
          time++;
 | 
			
		||||
          left_interval -= sleep_interval;
 | 
			
		||||
          has_sleep = true;
 | 
			
		||||
          need_sleep = memstore_allocator->need_do_writing_throttle();
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (REACH_TIME_INTERVAL(100 * 1000L)) {
 | 
			
		||||
      int64_t cost_time = ObTimeUtility::current_time() - init_ts_;
 | 
			
		||||
      LOG_INFO("throttle situation", K(sleep_time), K(time), K(seq), K(for_replay_), K(cost_time));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (for_replay_ && has_sleep) {
 | 
			
		||||
      // avoid print replay_timeout
 | 
			
		||||
      get_replay_is_writing_throttling() = true;
 | 
			
		||||
@ -384,4 +399,4 @@ bool ObStorageTableGuard::check_if_need_log()
 | 
			
		||||
  return need_log;
 | 
			
		||||
}
 | 
			
		||||
} // namespace storage
 | 
			
		||||
} // namespace oceanbase
 | 
			
		||||
} // namespace oceanbase
 | 
			
		||||
@ -63,6 +63,8 @@ private:
 | 
			
		||||
private:
 | 
			
		||||
  static const int64_t LOG_INTERVAL_US = 10 * 1000 * 1000;
 | 
			
		||||
  static const int64_t GET_TS_INTERVAL = 10 * 1000;
 | 
			
		||||
  static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000;
 | 
			
		||||
  static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000;
 | 
			
		||||
 | 
			
		||||
  ObTablet *tablet_;
 | 
			
		||||
  ObStoreCtx &store_ctx_;
 | 
			
		||||
@ -70,6 +72,8 @@ private:
 | 
			
		||||
  memtable::ObIMemtable *memtable_;
 | 
			
		||||
  int64_t retry_count_;
 | 
			
		||||
  int64_t last_ts_;
 | 
			
		||||
  // record write latency
 | 
			
		||||
  int64_t init_ts_;
 | 
			
		||||
  bool for_replay_;
 | 
			
		||||
  share::SCN replay_scn_;
 | 
			
		||||
  bool for_multi_source_data_;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user