BUGFIX: write throttle sleep as expected after freeze
This commit is contained in:
@ -21,6 +21,7 @@
|
|||||||
#include "observer/omt/ob_tenant_config_mgr.h"
|
#include "observer/omt/ob_tenant_config_mgr.h"
|
||||||
#include "lib/alloc/alloc_struct.h"
|
#include "lib/alloc/alloc_struct.h"
|
||||||
#include "lib/stat/ob_diagnose_info.h"
|
#include "lib/stat/ob_diagnose_info.h"
|
||||||
|
#include "share/throttle/ob_throttle_common.h"
|
||||||
|
|
||||||
using namespace oceanbase::lib;
|
using namespace oceanbase::lib;
|
||||||
using namespace oceanbase::omt;
|
using namespace oceanbase::omt;
|
||||||
@ -315,6 +316,7 @@ void ObFifoArena::speed_limit(const int64_t cur_mem_hold, const int64_t alloc_si
|
|||||||
advance_clock();
|
advance_clock();
|
||||||
get_seq() = seq;
|
get_seq() = seq;
|
||||||
tl_need_speed_limit() = need_speed_limit;
|
tl_need_speed_limit() = need_speed_limit;
|
||||||
|
share::get_thread_alloc_stat() += alloc_size;
|
||||||
|
|
||||||
if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
|
if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
|
||||||
COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval),
|
COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval),
|
||||||
@ -337,6 +339,22 @@ int64_t ObFifoArena::get_clock()
|
|||||||
return clock_;
|
return clock_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObFifoArena::skip_clock(const int64_t skip_size)
|
||||||
|
{
|
||||||
|
int64_t ov = 0;
|
||||||
|
int64_t nv = ATOMIC_LOAD(&clock_);
|
||||||
|
while ((ov = nv) < ATOMIC_LOAD(&max_seq_)
|
||||||
|
&& ov != (nv = ATOMIC_CAS(&clock_, ov, min(ATOMIC_LOAD(&max_seq_), ov + skip_size)))) {
|
||||||
|
PAUSE();
|
||||||
|
if (REACH_TIME_INTERVAL(100 * 1000L)) {
|
||||||
|
const int64_t max_seq = ATOMIC_LOAD(&max_seq_);
|
||||||
|
const int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
||||||
|
COMMON_LOG(INFO, "skip clock",
|
||||||
|
K(clock_), K(max_seq_), K(skip_size), K(cur_mem_hold), K(attr_.tenant_id_));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ObFifoArena::advance_clock()
|
void ObFifoArena::advance_clock()
|
||||||
{
|
{
|
||||||
int64_t cur_ts = ObTimeUtility::current_time();
|
int64_t cur_ts = ObTimeUtility::current_time();
|
||||||
|
|||||||
@ -172,6 +172,7 @@ public:
|
|||||||
bool check_clock_over_seq(const int64_t seq);
|
bool check_clock_over_seq(const int64_t seq);
|
||||||
int64_t get_clock();
|
int64_t get_clock();
|
||||||
int64_t expected_wait_time(const int64_t seq) const;
|
int64_t expected_wait_time(const int64_t seq) const;
|
||||||
|
void skip_clock(const int64_t skip_size);
|
||||||
int64_t get_max_cached_memstore_size() const
|
int64_t get_max_cached_memstore_size() const
|
||||||
{
|
{
|
||||||
return MAX_CACHED_GROUP_COUNT * ATOMIC_LOAD(&nway_) * (PAGE_SIZE + ACHUNK_PRESERVE_SIZE);
|
return MAX_CACHED_GROUP_COUNT * ATOMIC_LOAD(&nway_) * (PAGE_SIZE + ACHUNK_PRESERVE_SIZE);
|
||||||
|
|||||||
@ -182,6 +182,10 @@ public:
|
|||||||
{
|
{
|
||||||
return arena_.expected_wait_time(seq);
|
return arena_.expected_wait_time(seq);
|
||||||
}
|
}
|
||||||
|
void skip_clock(const int64_t skip_size)
|
||||||
|
{
|
||||||
|
arena_.skip_clock(skip_size);
|
||||||
|
}
|
||||||
private:
|
private:
|
||||||
int64_t nway_per_group();
|
int64_t nway_per_group();
|
||||||
int set_memstore_threshold_without_lock(uint64_t tenant_id);
|
int set_memstore_threshold_without_lock(uint64_t tenant_id);
|
||||||
|
|||||||
@ -80,6 +80,13 @@ OB_INLINE ObThrottleStat &get_throttle_stat()
|
|||||||
return throttle_stat_;
|
return throttle_stat_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// record the alloc size of current thread
|
||||||
|
OB_INLINE int64_t &get_thread_alloc_stat()
|
||||||
|
{
|
||||||
|
RLOCAL_INLINE(int64_t, allock_stat);
|
||||||
|
return allock_stat;
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace share
|
} // end namespace share
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@ -47,6 +47,7 @@ ObStorageTableGuard::ObStorageTableGuard(
|
|||||||
for_multi_source_data_(for_multi_source_data)
|
for_multi_source_data_(for_multi_source_data)
|
||||||
{
|
{
|
||||||
init_ts_ = ObTimeUtility::current_time();
|
init_ts_ = ObTimeUtility::current_time();
|
||||||
|
get_thread_alloc_stat() = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObStorageTableGuard::~ObStorageTableGuard()
|
ObStorageTableGuard::~ObStorageTableGuard()
|
||||||
@ -122,6 +123,11 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
|||||||
has_sleep = true;
|
has_sleep = true;
|
||||||
need_sleep = memstore_allocator->need_do_writing_throttle();
|
need_sleep = memstore_allocator->need_do_writing_throttle();
|
||||||
}
|
}
|
||||||
|
const int64_t finish_clock = memstore_allocator->get_clock();
|
||||||
|
if (finish_clock < seq) { // we has skip some time, need make the clock skip too.
|
||||||
|
const int64_t skip_clock = MIN(seq - finish_clock, get_thread_alloc_stat());
|
||||||
|
memstore_allocator->skip_clock(skip_clock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (REACH_TIME_INTERVAL(100 * 1000L) &&
|
if (REACH_TIME_INTERVAL(100 * 1000L) &&
|
||||||
|
|||||||
Reference in New Issue
Block a user