diff --git a/src/share/allocator/ob_fifo_arena.cpp b/src/share/allocator/ob_fifo_arena.cpp index 7b17718d9b..89a9d1f8c7 100644 --- a/src/share/allocator/ob_fifo_arena.cpp +++ b/src/share/allocator/ob_fifo_arena.cpp @@ -21,6 +21,7 @@ #include "observer/omt/ob_tenant_config_mgr.h" #include "lib/alloc/alloc_struct.h" #include "lib/stat/ob_diagnose_info.h" +#include "share/throttle/ob_throttle_common.h" using namespace oceanbase::lib; using namespace oceanbase::omt; @@ -315,6 +316,7 @@ void ObFifoArena::speed_limit(const int64_t cur_mem_hold, const int64_t alloc_si advance_clock(); get_seq() = seq; tl_need_speed_limit() = need_speed_limit; + share::get_thread_alloc_stat() += alloc_size; if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval), @@ -337,6 +339,22 @@ int64_t ObFifoArena::get_clock() return clock_; } +void ObFifoArena::skip_clock(const int64_t skip_size) +{ + int64_t ov = 0; + int64_t nv = ATOMIC_LOAD(&clock_); + while ((ov = nv) < ATOMIC_LOAD(&max_seq_) + && ov != (nv = ATOMIC_CAS(&clock_, ov, min(ATOMIC_LOAD(&max_seq_), ov + skip_size)))) { + PAUSE(); + if (REACH_TIME_INTERVAL(100 * 1000L)) { + const int64_t max_seq = ATOMIC_LOAD(&max_seq_); + const int64_t cur_mem_hold = ATOMIC_LOAD(&hold_); + COMMON_LOG(INFO, "skip clock", + K(clock_), K(max_seq_), K(skip_size), K(cur_mem_hold), K(attr_.tenant_id_)); + } + } +} + void ObFifoArena::advance_clock() { int64_t cur_ts = ObTimeUtility::current_time(); diff --git a/src/share/allocator/ob_fifo_arena.h b/src/share/allocator/ob_fifo_arena.h index 71c3f00f69..0b3e260ca5 100644 --- a/src/share/allocator/ob_fifo_arena.h +++ b/src/share/allocator/ob_fifo_arena.h @@ -172,6 +172,7 @@ public: bool check_clock_over_seq(const int64_t seq); int64_t get_clock(); int64_t expected_wait_time(const int64_t seq) const; + void skip_clock(const int64_t skip_size); int64_t get_max_cached_memstore_size() const { return MAX_CACHED_GROUP_COUNT * ATOMIC_LOAD(&nway_) * (PAGE_SIZE + ACHUNK_PRESERVE_SIZE); diff --git a/src/share/allocator/ob_gmemstore_allocator.h b/src/share/allocator/ob_gmemstore_allocator.h index be4300aed9..ff52631b7c 100644 --- a/src/share/allocator/ob_gmemstore_allocator.h +++ b/src/share/allocator/ob_gmemstore_allocator.h @@ -182,6 +182,10 @@ public: { return arena_.expected_wait_time(seq); } + void skip_clock(const int64_t skip_size) + { + arena_.skip_clock(skip_size); + } private: int64_t nway_per_group(); int set_memstore_threshold_without_lock(uint64_t tenant_id); diff --git a/src/share/throttle/ob_throttle_common.h b/src/share/throttle/ob_throttle_common.h index 5f7245884e..8eeaf530ad 100644 --- a/src/share/throttle/ob_throttle_common.h +++ b/src/share/throttle/ob_throttle_common.h @@ -80,6 +80,13 @@ OB_INLINE ObThrottleStat &get_throttle_stat() return throttle_stat_; } +// record the alloc size of current thread +OB_INLINE int64_t &get_thread_alloc_stat() +{ + RLOCAL_INLINE(int64_t, allock_stat); + return allock_stat; +} + } // end namespace share } // end namespace oceanbase #endif diff --git a/src/storage/ob_storage_table_guard.cpp b/src/storage/ob_storage_table_guard.cpp index de59de23f8..d0ee435557 100644 --- a/src/storage/ob_storage_table_guard.cpp +++ b/src/storage/ob_storage_table_guard.cpp @@ -47,6 +47,7 @@ ObStorageTableGuard::ObStorageTableGuard( for_multi_source_data_(for_multi_source_data) { init_ts_ = ObTimeUtility::current_time(); + get_thread_alloc_stat() = 0; } ObStorageTableGuard::~ObStorageTableGuard() @@ -122,6 +123,11 @@ ObStorageTableGuard::~ObStorageTableGuard() has_sleep = true; need_sleep = memstore_allocator->need_do_writing_throttle(); } + const int64_t finish_clock = memstore_allocator->get_clock(); + if (finish_clock < seq) { // we has skip some time, need make the clock skip too. + const int64_t skip_clock = MIN(seq - finish_clock, get_thread_alloc_stat()); + memstore_allocator->skip_clock(skip_clock); + } } if (REACH_TIME_INTERVAL(100 * 1000L) &&