From 5c19d8c8c7cba612a7da20fa093795e6c0aac71f Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 19 Dec 2022 13:23:14 +0000 Subject: [PATCH] fix speed_limit --- deps/oblib/src/lib/ob_define.h | 12 ++ src/share/allocator/ob_fifo_arena.cpp | 128 +++++++++++++------ src/share/allocator/ob_fifo_arena.h | 39 +++++- src/share/allocator/ob_gmemstore_allocator.h | 8 ++ src/storage/ob_storage_table_guard.cpp | 75 ++++++----- src/storage/ob_storage_table_guard.h | 4 + 6 files changed, 191 insertions(+), 75 deletions(-) diff --git a/deps/oblib/src/lib/ob_define.h b/deps/oblib/src/lib/ob_define.h index 1d860cb87..517455b8e 100644 --- a/deps/oblib/src/lib/ob_define.h +++ b/deps/oblib/src/lib/ob_define.h @@ -2245,6 +2245,18 @@ OB_INLINE bool &get_ignore_mem_limit() return ignore_mem_limit; } +OB_INLINE int64_t &get_seq() +{ + RLOCAL_INLINE(int64_t, seq); + return seq; +} + +OB_INLINE bool &tl_need_speed_limit() +{ + RLOCAL_INLINE(bool, tl_need_speed_limit); + return tl_need_speed_limit; +} + OB_INLINE uint32_t &get_writing_throttling_sleep_interval() { RLOCAL_INLINE(uint32_t, writing_throttling_sleep_interval); diff --git a/src/share/allocator/ob_fifo_arena.cpp b/src/share/allocator/ob_fifo_arena.cpp index 6c87c9020..396a0fcde 100644 --- a/src/share/allocator/ob_fifo_arena.cpp +++ b/src/share/allocator/ob_fifo_arena.cpp @@ -85,7 +85,12 @@ int ObFifoArena::init(uint64_t tenant_id) int ret = OB_SUCCESS; lib::ObMallocAllocator *allocator = lib::ObMallocAllocator::get_instance(); uint64_t ctx_id = ObCtxIds::MEMSTORE_CTX_ID; - if (OB_ISNULL(allocator)) { + advance_clock_timer_.set_run_wrapper(MTL_CTX()); + if (OB_FAIL(advance_clock_timer_.init("ADV_CLOCK"))) { + STORAGE_LOG(ERROR, "fail to init advance_clock_timer_", K(ret)); + } else if (OB_FAIL(advance_clock_timer_.schedule(advance_clock_task_, ADVANCE_CLOCK_INTERVAL, true))) { + STORAGE_LOG(ERROR, "fail to schedule advance_clock task", K(ret)); + } else if (OB_ISNULL(allocator)) { ret = OB_INIT_FAIL; OB_LOG(ERROR, "mallocator instance is NULLL", K(ret)); } else if (OB_ISNULL(allocator_ = allocator->get_tenant_ctx_allocator(tenant_id, ctx_id))) { @@ -107,6 +112,9 @@ int ObFifoArena::init(uint64_t tenant_id) void ObFifoArena::reset() { + advance_clock_timer_.stop(); + advance_clock_timer_.wait(); + advance_clock_timer_.destroy(); COMMON_LOG(INFO, "MTALLOC.reset", "tenant_id", get_tenant_id()); shrink_cached_page(0); } @@ -273,63 +281,105 @@ void ObFifoArena::destroy_page(Page* page) bool ObFifoArena::need_do_writing_throttle() const { + bool need_do_writing_throttle = false; int64_t trigger_percentage = get_writing_throttling_trigger_percentage_(); - int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100; - int64_t cur_mem_hold = ATOMIC_LOAD(&hold_); - bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit; + if (trigger_percentage < 100) { + int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100; + int64_t cur_mem_hold = ATOMIC_LOAD(&hold_); + need_do_writing_throttle = cur_mem_hold > trigger_mem_limit; + } + return need_do_writing_throttle; } -void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size) +void ObFifoArena::speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size) { int ret = OB_SUCCESS; int64_t trigger_percentage = get_writing_throttling_trigger_percentage_(); int64_t trigger_mem_limit = 0; + bool need_speed_limit = false; + int64_t seq = 0; + int64_t throttling_interval = 0; if (trigger_percentage < 100) { if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage <= 0)) { COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage)); } else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100)) { + need_speed_limit = true; int64_t alloc_duration = get_writing_throttling_maximum_duration_(); if (OB_FAIL(throttle_info_.check_and_calc_decay_factor(lastest_memstore_threshold_, trigger_percentage, alloc_duration))) { COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size), K(throttle_info_)); - } else { - int64_t throttling_interval = get_throttling_interval(cur_mem_hold, alloc_size, trigger_mem_limit); - int64_t cur_ts = ObTimeUtility::current_time(); - int64_t new_base_ts = ATOMIC_AAF(&last_base_ts_, throttling_interval); - int64_t sleep_interval = new_base_ts - cur_ts; - uint32_t final_sleep_interval = 0; - if (sleep_interval > 0) { - //The playback of a single log may allocate 2M blocks multiple times - final_sleep_interval = static_cast( - MIN((get_writing_throttling_sleep_interval() + sleep_interval - 1), MAX_WAIT_INTERVAL)); - get_writing_throttling_sleep_interval() = final_sleep_interval; - throttle_info_.record_limit_event(sleep_interval - 1); - } else { - inc_update(&last_base_ts_, ObTimeUtility::current_time()); - throttle_info_.reset_period_stat_info(); - last_reclaimed_ = ATOMIC_LOAD(&reclaimed_); - } - if (REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { - COMMON_LOG(INFO, - "report write throttle info", - K(alloc_size), - K(throttling_interval), - K(attr_), - "freed memory(MB):", (ATOMIC_LOAD(&reclaimed_) - last_reclaimed_) / 1024 / 1024, - "last_base_ts", ATOMIC_LOAD(&last_base_ts_), - K(cur_mem_hold), - K(throttle_info_), - K(sleep_interval), - K(final_sleep_interval)); - } } - } else {/*do nothing*/} + } + seq = ATOMIC_AAF(&max_seq_, alloc_size); + get_seq() = seq; + tl_need_speed_limit() = need_speed_limit; + + if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { + COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval), + "max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_), + K(cur_mem_hold), K(throttle_info_), K(seq)); + } } } -int64_t ObFifoArena::get_throttling_interval(int64_t cur_mem_hold, - int64_t alloc_size, - int64_t trigger_mem_limit) +bool ObFifoArena::check_clock_over_seq(const int64_t req) +{ + int64_t clock = ATOMIC_LOAD(&clock_); + return req <= clock; +} + +void ObFifoArena::advance_clock() +{ + bool need_speed_limit = need_do_writing_throttle(); + int64_t trigger_percentage = get_writing_throttling_trigger_percentage_(); + int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100; + int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL) : trigger_mem_limit - hold_); + int64_t clock = ATOMIC_LOAD(&clock_); + int64_t max_seq = ATOMIC_LOAD(&max_seq_); + ATOMIC_SET(&clock_, min(max_seq, clock + mem_limit)); + if (REACH_TIME_INTERVAL(100 * 1000L)) { + COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(attr_.tenant_id_)); + } +} + +int64_t ObFifoArena::expected_wait_time(const int64_t seq) const +{ + int64_t expected_wait_time = 0; + int64_t trigger_percentage = get_writing_throttling_trigger_percentage_(); + int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100; + int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL); + int64_t clock = ATOMIC_LOAD(&clock_); + if (seq > clock) { + expected_wait_time = (seq - clock) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period; + } + return expected_wait_time; +} + +int64_t ObFifoArena::calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const +{ + double cur_chunk_seq = static_cast(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE)); + int64_t mem_can_be_assigned = 0; + + int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE; + int64_t accumulate_interval = 0; + int64_t the_page_interval = 0; + while (accumulate_interval < dt) { + the_page_interval = static_cast(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE; + accumulate_interval += the_page_interval; + + mem_can_be_assigned += (accumulate_interval > dt ? + allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval : + allocate_size_in_the_page); + allocate_size_in_the_page = MEM_SLICE_SIZE; + cur_chunk_seq += double(1); + } + + return mem_can_be_assigned; +} + +int64_t ObFifoArena::get_throttling_interval(const int64_t cur_mem_hold, + const int64_t alloc_size, + const int64_t trigger_mem_limit) { constexpr int64_t MIN_INTERVAL_PER_ALLOC = 20; int64_t chunk_cnt = ((alloc_size + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE)); diff --git a/src/share/allocator/ob_fifo_arena.h b/src/share/allocator/ob_fifo_arena.h index 3c06b263a..6ad2ef69d 100644 --- a/src/share/allocator/ob_fifo_arena.h +++ b/src/share/allocator/ob_fifo_arena.h @@ -18,6 +18,8 @@ #include "lib/allocator/ob_qsync.h" #include "lib/allocator/ob_malloc.h" #include "lib/allocator/ob_allocator.h" +#include "lib/lock/ob_spin_rwlock.h" // SpinRWLock +#include "lib/task/ob_timer.h" namespace oceanbase { @@ -141,9 +143,21 @@ public: Ref* ref_[MAX_NWAY]; int64_t allocated_; }; + + class ObAdvanceClockTask : public common::ObTimerTask + { + public: + ObAdvanceClockTask(ObFifoArena &arena) : arena_(arena) {} + virtual ~ObAdvanceClockTask() {} + virtual void runTimerTask() { + arena_.advance_clock(); + } + private: + ObFifoArena &arena_; + }; public: enum { MAX_CACHED_GROUP_COUNT = 16, MAX_CACHED_PAGE_COUNT = MAX_CACHED_GROUP_COUNT * Handle::MAX_NWAY, PAGE_SIZE = OB_MALLOC_BIG_BLOCK_SIZE + sizeof(Page) + sizeof(Ref)}; - ObFifoArena(): allocator_(NULL), nway_(0), allocated_(0), reclaimed_(0), hold_(0), retired_(0), last_base_ts_(0), + ObFifoArena(): allocator_(NULL), nway_(0), allocated_(0), reclaimed_(0), hold_(0), retired_(0), max_seq_(0), clock_(0), advance_clock_timer_(), advance_clock_task_(*this), last_reclaimed_(0), lastest_memstore_threshold_(0) { memset(cur_pages_, 0, sizeof(cur_pages_)); } ~ObFifoArena() { reset(); } @@ -164,6 +178,8 @@ public: void set_memstore_threshold(int64_t memstore_threshold); bool need_do_writing_throttle() const; + bool check_clock_over_seq(const int64_t seq); + int64_t expected_wait_time(const int64_t seq) const; private: ObQSync& get_qs() { static ObQSync s_qs; @@ -209,15 +225,18 @@ private: void retire_page(int64_t way_id, Handle& handle, Page* ptr); void destroy_page(Page* page); void shrink_cached_page(int64_t nway); - void speed_limit(int64_t cur_mem_hold, int64_t alloc_size); - int64_t get_throttling_interval(int64_t cur_mem_hold, - int64_t alloc_size, - int64_t trigger_mem_limit); + void speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size); + int64_t get_throttling_interval(const int64_t cur_mem_hold, + const int64_t alloc_size, + const int64_t trigger_mem_limit); + void advance_clock(); + int64_t calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const; int64_t get_actual_hold_size(Page* page); int64_t get_writing_throttling_trigger_percentage_() const; int64_t get_writing_throttling_maximum_duration_() const; private: static const int64_t MAX_WAIT_INTERVAL = 20 * 1000 * 1000;//20s + static const int64_t ADVANCE_CLOCK_INTERVAL = 200;// 200us static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond static const int64_t MIN_INTERVAL = 20000; static const int64_t DEFAULT_TRIGGER_PERCENTAGE = 100; @@ -229,7 +248,15 @@ private: int64_t reclaimed_; int64_t hold_;//for single tenant int64_t retired_; - int64_t last_base_ts_; + + // typedef common::SpinRWLock RWLock; + // typedef common::SpinRLockGuard RLockGuard; + // typedef common::SpinWLockGuard WLockGuard; + // RWLock rwlock_; + int64_t max_seq_; + int64_t clock_; + common::ObTimer advance_clock_timer_; + ObAdvanceClockTask advance_clock_task_; int64_t last_reclaimed_; Page* cur_pages_[MAX_CACHED_PAGE_COUNT]; diff --git a/src/share/allocator/ob_gmemstore_allocator.h b/src/share/allocator/ob_gmemstore_allocator.h index 93c2c7f8b..747c3e60a 100644 --- a/src/share/allocator/ob_gmemstore_allocator.h +++ b/src/share/allocator/ob_gmemstore_allocator.h @@ -145,6 +145,14 @@ public: public: int set_memstore_threshold(uint64_t tenant_id); bool need_do_writing_throttle() const {return arena_.need_do_writing_throttle();} + bool check_clock_over_seq(int64_t seq) + { + return arena_.check_clock_over_seq(seq); + } + int64_t expected_wait_time(int64_t seq) const + { + return arena_.expected_wait_time(seq); + } int64_t get_retire_clock() const { return arena_.retired(); } bool exist_active_memtable_below_clock(const int64_t clock) const { return hlist_.hazard() < clock; diff --git a/src/storage/ob_storage_table_guard.cpp b/src/storage/ob_storage_table_guard.cpp index 5a320eb25..4d01505dd 100644 --- a/src/storage/ob_storage_table_guard.cpp +++ b/src/storage/ob_storage_table_guard.cpp @@ -45,53 +45,68 @@ ObStorageTableGuard::ObStorageTableGuard( replay_scn_(replay_scn), for_multi_source_data_(for_multi_source_data) { - get_writing_throttling_sleep_interval() = 0; + init_ts_ = ObTimeUtility::current_time(); } ObStorageTableGuard::~ObStorageTableGuard() { - uint32_t &interval = get_writing_throttling_sleep_interval(); - if (need_control_mem_ && interval > 0) { - uint64_t timeout = 10000;//10s - common::ObWaitEventGuard wait_guard(common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, interval); + bool &need_speed_limit = tl_need_speed_limit(); + if (need_control_mem_ && need_speed_limit) { bool need_sleep = true; - const int32_t SLEEP_INTERVAL_PER_TIME = 100 * 1000;//100ms - int64_t left_interval = interval; + int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME; if (!for_replay_) { - int64_t query_left_time = store_ctx_.timeout_ - ObTimeUtility::current_time(); - left_interval = common::min(left_interval, query_left_time); + left_interval = min(SPEED_LIMIT_MAX_SLEEP_TIME, store_ctx_.timeout_ - ObTimeUtility::current_time()); } if (NULL != memtable_) { need_sleep = memtable_->is_active_memtable(); } + uint64_t timeout = 10000;//10s + common::ObWaitEventGuard wait_guard(common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); reset(); int tmp_ret = OB_SUCCESS; bool has_sleep = false; - - while ((left_interval > 0) && need_sleep) { - //because left_interval and SLEEP_INTERVAL_PER_TIME both are greater than - //zero, so it's safe to convert to uint32_t, be careful with comparation between int and uint - uint32_t sleep_interval = static_cast(min(left_interval, SLEEP_INTERVAL_PER_TIME)); - if (for_replay_) { - if(MTL(ObTenantFreezer *)->exist_ls_freezing()) { - break; - } - } - ob_usleep(sleep_interval); - has_sleep = true; - left_interval -= sleep_interval; - if (store_ctx_.mvcc_acc_ctx_.is_write()) { - ObGMemstoreAllocator* memstore_allocator = NULL; - if (OB_SUCCESS != (tmp_ret = ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator( - MTL_ID(), memstore_allocator))) { - } else if (OB_ISNULL(memstore_allocator)) { - LOG_WARN("get_tenant_mutil_allocator failed", K(store_ctx_.tablet_id_), K(tmp_ret)); - } else { + int64_t sleep_time = 0; + int time = 0; + int64_t &seq = get_seq(); + if (store_ctx_.mvcc_acc_ctx_.is_write()) { + ObGMemstoreAllocator* memstore_allocator = NULL; + if (OB_SUCCESS != (tmp_ret = ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator( + MTL_ID(), memstore_allocator))) { + } else if (OB_ISNULL(memstore_allocator)) { + LOG_WARN("get_tenant_mutil_allocator failed", K(store_ctx_.tablet_id_), K(tmp_ret)); + } else { + while (need_sleep && + !memstore_allocator->check_clock_over_seq(seq) && + (left_interval > 0)) { + if (for_replay_) { + if(MTL(ObTenantFreezer *)->exist_ls_freezing()) { + break; + } + } + //because left_interval and SLEEP_INTERVAL_PER_TIME both are greater than + //zero, so it's safe to convert to uint32_t, be careful with comparation between int and uint + int64_t expected_wait_time = memstore_allocator->expected_wait_time(seq); + if (expected_wait_time == 0) { + break; + } + uint32_t sleep_interval = + static_cast(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), expected_wait_time)); + ob_usleep(sleep_interval); + sleep_time += sleep_interval; + time++; + left_interval -= sleep_interval; + has_sleep = true; need_sleep = memstore_allocator->need_do_writing_throttle(); } } } + + if (REACH_TIME_INTERVAL(100 * 1000L)) { + int64_t cost_time = ObTimeUtility::current_time() - init_ts_; + LOG_INFO("throttle situation", K(sleep_time), K(time), K(seq), K(for_replay_), K(cost_time)); + } + if (for_replay_ && has_sleep) { // avoid print replay_timeout get_replay_is_writing_throttling() = true; @@ -384,4 +399,4 @@ bool ObStorageTableGuard::check_if_need_log() return need_log; } } // namespace storage -} // namespace oceanbase +} // namespace oceanbase \ No newline at end of file diff --git a/src/storage/ob_storage_table_guard.h b/src/storage/ob_storage_table_guard.h index 38d098816..fbae49197 100644 --- a/src/storage/ob_storage_table_guard.h +++ b/src/storage/ob_storage_table_guard.h @@ -63,6 +63,8 @@ private: private: static const int64_t LOG_INTERVAL_US = 10 * 1000 * 1000; static const int64_t GET_TS_INTERVAL = 10 * 1000; + static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000; + static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000; ObTablet *tablet_; ObStoreCtx &store_ctx_; @@ -70,6 +72,8 @@ private: memtable::ObIMemtable *memtable_; int64_t retry_count_; int64_t last_ts_; + // record write latency + int64_t init_ts_; bool for_replay_; share::SCN replay_scn_; bool for_multi_source_data_;