[FIX] improve throttle logic
This commit is contained in:
@ -174,6 +174,11 @@ void ObTenantBufferCtxAllocator::free(void *ptr)
|
||||
ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
{
|
||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||
if (0 == abs_expire_time) {
|
||||
abs_expire_time_ =
|
||||
ObClockGenerator::getClock() + ObThrottleUnit<ObMdsThrottleGuard>::DEFAULT_MAX_THROTTLE_TIME;
|
||||
}
|
||||
share::mds_throttled_alloc() = 0;
|
||||
}
|
||||
|
||||
ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
||||
@ -184,23 +189,16 @@ ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
||||
if (OB_ISNULL(throttle_tool_)) {
|
||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||
} else if (throttle_tool_->is_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantMdsAllocator>(
|
||||
for_replay_, abs_expire_time_, *throttle_tool_, share_ti_guard, module_ti_guard);
|
||||
|
||||
if (throttle_tool_->still_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(
|
||||
share::mds_throttled_alloc(), share_ti_guard, module_ti_guard);
|
||||
|
||||
if (module_ti_guard.is_valid()) {
|
||||
module_ti_guard.throttle_info()->reset();
|
||||
}
|
||||
} else {
|
||||
uint64_t timeout = 10000; // 10s
|
||||
int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
|
||||
common::ObWaitEventGuard wait_guard(
|
||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
||||
(void)throttle_tool_->do_throttle<ObTenantMdsAllocator>(abs_expire_time_);
|
||||
|
||||
if (for_replay_) {
|
||||
get_replay_is_writing_throttling() = true;
|
||||
}
|
||||
}
|
||||
|
||||
// reset mds throttled alloc size
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
#include "share/allocator/ob_tx_data_allocator.h"
|
||||
#include "share/allocator/ob_mds_allocator.h"
|
||||
#include "share/throttle/ob_share_resource_throttle_tool.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace share {
|
||||
@ -78,6 +80,78 @@ private:
|
||||
ObTenantMdsAllocator mds_allocator_;
|
||||
};
|
||||
|
||||
class TxShareMemThrottleUtil
|
||||
{
|
||||
public:
|
||||
static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000; // 20ms;
|
||||
template <typename ALLOCATOR>
|
||||
static int do_throttle(const bool for_replay,
|
||||
const int64_t abs_expire_time,
|
||||
TxShareThrottleTool &throttle_tool,
|
||||
ObThrottleInfoGuard &share_ti_guard,
|
||||
ObThrottleInfoGuard &module_ti_guard)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool has_printed_lbt = false;
|
||||
int64_t sleep_time = 0;
|
||||
int64_t left_interval = share::ObThrottleUnit<ALLOCATOR>::DEFAULT_MAX_THROTTLE_TIME;
|
||||
|
||||
if (!for_replay) {
|
||||
left_interval = min(left_interval, abs_expire_time - ObClockGenerator::getClock());
|
||||
}
|
||||
|
||||
uint64_t timeout = 10000; // 10s
|
||||
common::ObWaitEventGuard wait_guard(
|
||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
||||
|
||||
while (throttle_tool.still_throttling<ALLOCATOR>(share_ti_guard, module_ti_guard) &&
|
||||
(left_interval > 0)) {
|
||||
int64_t expected_wait_time = 0;
|
||||
if (for_replay && MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
||||
// skip throttle if ls freeze exists
|
||||
break;
|
||||
} else if ((expected_wait_time =
|
||||
throttle_tool.expected_wait_time<ALLOCATOR>(share_ti_guard, module_ti_guard)) <= 0) {
|
||||
if (expected_wait_time < 0) {
|
||||
SHARE_LOG(ERROR,
|
||||
"expected wait time should not smaller than 0",
|
||||
K(expected_wait_time),
|
||||
KPC(share_ti_guard.throttle_info()),
|
||||
KPC(module_ti_guard.throttle_info()),
|
||||
K(clock),
|
||||
K(left_interval));
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// do sleep when expected_wait_time and left_interval are not equal to 0
|
||||
int64_t sleep_interval = min(SLEEP_INTERVAL_PER_TIME, expected_wait_time);
|
||||
if (0 < sleep_interval) {
|
||||
sleep_time += sleep_interval;
|
||||
left_interval -= sleep_interval;
|
||||
::usleep(sleep_interval);
|
||||
}
|
||||
|
||||
PrintThrottleUtil::pirnt_throttle_info(ret,
|
||||
ALLOCATOR::throttle_unit_name(),
|
||||
sleep_time,
|
||||
left_interval,
|
||||
expected_wait_time,
|
||||
abs_expire_time,
|
||||
share_ti_guard,
|
||||
module_ti_guard,
|
||||
has_printed_lbt);
|
||||
}
|
||||
PrintThrottleUtil::print_throttle_statistic(ret, ALLOCATOR::throttle_unit_name(), sleep_time);
|
||||
|
||||
if (for_replay && sleep_time > 0) {
|
||||
// avoid print replay_timeout
|
||||
get_replay_is_writing_throttling() = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace share
|
||||
} // namespace oceanbase
|
||||
|
||||
|
||||
@ -111,6 +111,11 @@ ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_
|
||||
: for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
{
|
||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||
if (0 == abs_expire_time) {
|
||||
abs_expire_time_ =
|
||||
ObClockGenerator::getClock() + ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME;
|
||||
}
|
||||
share::tx_data_throttled_alloc() = 0;
|
||||
}
|
||||
|
||||
ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
||||
@ -121,23 +126,16 @@ ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
||||
if (OB_ISNULL(throttle_tool_)) {
|
||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||
} else if (throttle_tool_->is_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantTxDataAllocator>(
|
||||
for_replay_, abs_expire_time_, *throttle_tool_, share_ti_guard, module_ti_guard);
|
||||
|
||||
if (throttle_tool_->still_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(
|
||||
share::tx_data_throttled_alloc(), share_ti_guard, module_ti_guard);
|
||||
|
||||
if (module_ti_guard.is_valid()) {
|
||||
module_ti_guard.throttle_info()->reset();
|
||||
}
|
||||
} else {
|
||||
uint64_t timeout = 10000; // 10s
|
||||
int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
|
||||
common::ObWaitEventGuard wait_guard(
|
||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
||||
(void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time_);
|
||||
|
||||
if (for_replay_) {
|
||||
get_replay_is_writing_throttling() = true;
|
||||
}
|
||||
}
|
||||
|
||||
// reset tx data throttled alloc size
|
||||
|
||||
Reference in New Issue
Block a user