[FIX] improve throttle logic
This commit is contained in:
@ -174,6 +174,11 @@ void ObTenantBufferCtxAllocator::free(void *ptr)
|
|||||||
ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||||
{
|
{
|
||||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||||
|
if (0 == abs_expire_time) {
|
||||||
|
abs_expire_time_ =
|
||||||
|
ObClockGenerator::getClock() + ObThrottleUnit<ObMdsThrottleGuard>::DEFAULT_MAX_THROTTLE_TIME;
|
||||||
|
}
|
||||||
|
share::mds_throttled_alloc() = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
||||||
@ -184,23 +189,16 @@ ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
|||||||
if (OB_ISNULL(throttle_tool_)) {
|
if (OB_ISNULL(throttle_tool_)) {
|
||||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||||
} else if (throttle_tool_->is_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
} else if (throttle_tool_->is_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||||
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
(void)TxShareMemThrottleUtil::do_throttle<ObTenantMdsAllocator>(
|
||||||
|
for_replay_, abs_expire_time_, *throttle_tool_, share_ti_guard, module_ti_guard);
|
||||||
|
|
||||||
|
if (throttle_tool_->still_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||||
(void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(
|
(void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(
|
||||||
share::mds_throttled_alloc(), share_ti_guard, module_ti_guard);
|
share::mds_throttled_alloc(), share_ti_guard, module_ti_guard);
|
||||||
|
|
||||||
if (module_ti_guard.is_valid()) {
|
if (module_ti_guard.is_valid()) {
|
||||||
module_ti_guard.throttle_info()->reset();
|
module_ti_guard.throttle_info()->reset();
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
uint64_t timeout = 10000; // 10s
|
|
||||||
int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
|
|
||||||
common::ObWaitEventGuard wait_guard(
|
|
||||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
|
||||||
(void)throttle_tool_->do_throttle<ObTenantMdsAllocator>(abs_expire_time_);
|
|
||||||
|
|
||||||
if (for_replay_) {
|
|
||||||
get_replay_is_writing_throttling() = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset mds throttled alloc size
|
// reset mds throttled alloc size
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
#include "share/allocator/ob_tx_data_allocator.h"
|
#include "share/allocator/ob_tx_data_allocator.h"
|
||||||
#include "share/allocator/ob_mds_allocator.h"
|
#include "share/allocator/ob_mds_allocator.h"
|
||||||
#include "share/throttle/ob_share_resource_throttle_tool.h"
|
#include "share/throttle/ob_share_resource_throttle_tool.h"
|
||||||
|
#include "share/rc/ob_tenant_base.h"
|
||||||
|
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||||
|
|
||||||
namespace oceanbase {
|
namespace oceanbase {
|
||||||
namespace share {
|
namespace share {
|
||||||
@ -78,6 +80,78 @@ private:
|
|||||||
ObTenantMdsAllocator mds_allocator_;
|
ObTenantMdsAllocator mds_allocator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class TxShareMemThrottleUtil
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000; // 20ms;
|
||||||
|
template <typename ALLOCATOR>
|
||||||
|
static int do_throttle(const bool for_replay,
|
||||||
|
const int64_t abs_expire_time,
|
||||||
|
TxShareThrottleTool &throttle_tool,
|
||||||
|
ObThrottleInfoGuard &share_ti_guard,
|
||||||
|
ObThrottleInfoGuard &module_ti_guard)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool has_printed_lbt = false;
|
||||||
|
int64_t sleep_time = 0;
|
||||||
|
int64_t left_interval = share::ObThrottleUnit<ALLOCATOR>::DEFAULT_MAX_THROTTLE_TIME;
|
||||||
|
|
||||||
|
if (!for_replay) {
|
||||||
|
left_interval = min(left_interval, abs_expire_time - ObClockGenerator::getClock());
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t timeout = 10000; // 10s
|
||||||
|
common::ObWaitEventGuard wait_guard(
|
||||||
|
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
||||||
|
|
||||||
|
while (throttle_tool.still_throttling<ALLOCATOR>(share_ti_guard, module_ti_guard) &&
|
||||||
|
(left_interval > 0)) {
|
||||||
|
int64_t expected_wait_time = 0;
|
||||||
|
if (for_replay && MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
||||||
|
// skip throttle if ls freeze exists
|
||||||
|
break;
|
||||||
|
} else if ((expected_wait_time =
|
||||||
|
throttle_tool.expected_wait_time<ALLOCATOR>(share_ti_guard, module_ti_guard)) <= 0) {
|
||||||
|
if (expected_wait_time < 0) {
|
||||||
|
SHARE_LOG(ERROR,
|
||||||
|
"expected wait time should not smaller than 0",
|
||||||
|
K(expected_wait_time),
|
||||||
|
KPC(share_ti_guard.throttle_info()),
|
||||||
|
KPC(module_ti_guard.throttle_info()),
|
||||||
|
K(clock),
|
||||||
|
K(left_interval));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do sleep when expected_wait_time and left_interval are not equal to 0
|
||||||
|
int64_t sleep_interval = min(SLEEP_INTERVAL_PER_TIME, expected_wait_time);
|
||||||
|
if (0 < sleep_interval) {
|
||||||
|
sleep_time += sleep_interval;
|
||||||
|
left_interval -= sleep_interval;
|
||||||
|
::usleep(sleep_interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
PrintThrottleUtil::pirnt_throttle_info(ret,
|
||||||
|
ALLOCATOR::throttle_unit_name(),
|
||||||
|
sleep_time,
|
||||||
|
left_interval,
|
||||||
|
expected_wait_time,
|
||||||
|
abs_expire_time,
|
||||||
|
share_ti_guard,
|
||||||
|
module_ti_guard,
|
||||||
|
has_printed_lbt);
|
||||||
|
}
|
||||||
|
PrintThrottleUtil::print_throttle_statistic(ret, ALLOCATOR::throttle_unit_name(), sleep_time);
|
||||||
|
|
||||||
|
if (for_replay && sleep_time > 0) {
|
||||||
|
// avoid print replay_timeout
|
||||||
|
get_replay_is_writing_throttling() = true;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
@ -111,6 +111,11 @@ ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_
|
|||||||
: for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
: for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||||
{
|
{
|
||||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||||
|
if (0 == abs_expire_time) {
|
||||||
|
abs_expire_time_ =
|
||||||
|
ObClockGenerator::getClock() + ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME;
|
||||||
|
}
|
||||||
|
share::tx_data_throttled_alloc() = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
||||||
@ -121,23 +126,16 @@ ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
|||||||
if (OB_ISNULL(throttle_tool_)) {
|
if (OB_ISNULL(throttle_tool_)) {
|
||||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||||
} else if (throttle_tool_->is_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
} else if (throttle_tool_->is_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||||
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
(void)TxShareMemThrottleUtil::do_throttle<ObTenantTxDataAllocator>(
|
||||||
|
for_replay_, abs_expire_time_, *throttle_tool_, share_ti_guard, module_ti_guard);
|
||||||
|
|
||||||
|
if (throttle_tool_->still_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||||
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(
|
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(
|
||||||
share::tx_data_throttled_alloc(), share_ti_guard, module_ti_guard);
|
share::tx_data_throttled_alloc(), share_ti_guard, module_ti_guard);
|
||||||
|
|
||||||
if (module_ti_guard.is_valid()) {
|
if (module_ti_guard.is_valid()) {
|
||||||
module_ti_guard.throttle_info()->reset();
|
module_ti_guard.throttle_info()->reset();
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
uint64_t timeout = 10000; // 10s
|
|
||||||
int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
|
|
||||||
common::ObWaitEventGuard wait_guard(
|
|
||||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
|
||||||
(void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time_);
|
|
||||||
|
|
||||||
if (for_replay_) {
|
|
||||||
get_replay_is_writing_throttling() = true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset tx data throttled alloc size
|
// reset tx data throttled alloc size
|
||||||
|
@ -41,7 +41,7 @@ template <typename FakeAllocator, typename ...Args>
|
|||||||
class ObShareResourceThrottleTool
|
class ObShareResourceThrottleTool
|
||||||
{
|
{
|
||||||
private:
|
private:
|
||||||
static const uint32_t DEFAULT_THROTTLE_SLEEP_INTERVAL = 20 * 1000;
|
static const int64_t DEFAULT_THROTTLE_SLEEP_INTERVAL = 20 * 1000;
|
||||||
private: // Functors for ShareResourceThrottleTool
|
private: // Functors for ShareResourceThrottleTool
|
||||||
struct SumModuleHoldResourceFunctor {
|
struct SumModuleHoldResourceFunctor {
|
||||||
int64_t sum_;
|
int64_t sum_;
|
||||||
|
@ -190,43 +190,6 @@ int64_t ObShareResourceThrottleTool<FakeAllocator, Args...>::expected_wait_time(
|
|||||||
return expected_wait_time;
|
return expected_wait_time;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define PRINT_THROTTLE_WARN \
|
|
||||||
do { \
|
|
||||||
const int64_t WARN_LOG_INTERVAL = 1LL * 60L * 1000L * 1000L /* one minute */; \
|
|
||||||
if (sleep_time > (WARN_LOG_INTERVAL) && TC_REACH_TIME_INTERVAL(WARN_LOG_INTERVAL)) { \
|
|
||||||
SHARE_LOG(WARN, \
|
|
||||||
"[Throttling] Attention!! Sleep More Than One Minute!!", \
|
|
||||||
"Throttle Unit Name", \
|
|
||||||
ALLOCATOR::throttle_unit_name(), \
|
|
||||||
K(sleep_time), \
|
|
||||||
K(left_interval), \
|
|
||||||
K(expected_wait_t), \
|
|
||||||
K(abs_expire_time)); \
|
|
||||||
if (!has_printed_lbt) { \
|
|
||||||
has_printed_lbt = true; \
|
|
||||||
oceanbase::share::ObTaskController::get().allow_next_syslog(); \
|
|
||||||
SHARE_LOG(INFO, \
|
|
||||||
"[Throttling] (report write throttle info) LBT Info", \
|
|
||||||
"Throttle Unit Name", \
|
|
||||||
ALLOCATOR::throttle_unit_name(), \
|
|
||||||
K(lbt())); \
|
|
||||||
} \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define PRINT_THROTTLE_STATISTIC \
|
|
||||||
do { \
|
|
||||||
const int64_t MEMSTORE_THROTTLE_LOG_INTERVAL = 1L * 1000L * 1000L; /*one seconds*/ \
|
|
||||||
if (sleep_time > 0 && REACH_TIME_INTERVAL(MEMSTORE_THROTTLE_LOG_INTERVAL)) { \
|
|
||||||
SHARE_LOG(INFO, \
|
|
||||||
"[Throttling] (report write throttle info) Time Info", \
|
|
||||||
"Throttle Unit Name", \
|
|
||||||
ALLOCATOR::throttle_unit_name(), \
|
|
||||||
"Throttle Sleep Time(us)", \
|
|
||||||
sleep_time); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
template <typename FakeAllocator, typename... Args>
|
template <typename FakeAllocator, typename... Args>
|
||||||
template <typename ALLOCATOR>
|
template <typename ALLOCATOR>
|
||||||
void ObShareResourceThrottleTool<FakeAllocator, Args...>::do_throttle(const int64_t abs_expire_time)
|
void ObShareResourceThrottleTool<FakeAllocator, Args...>::do_throttle(const int64_t abs_expire_time)
|
||||||
@ -256,20 +219,27 @@ void ObShareResourceThrottleTool<FakeAllocator, Args...>::do_throttle(const int6
|
|||||||
module_ti_guard.throttle_info()->reset();
|
module_ti_guard.throttle_info()->reset();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uint32_t sleep_interval = min(DEFAULT_THROTTLE_SLEEP_INTERVAL, (uint32_t)expected_wait_t);
|
int64_t sleep_interval = min(DEFAULT_THROTTLE_SLEEP_INTERVAL, expected_wait_t);
|
||||||
sleep_time += sleep_interval;
|
if (0 < sleep_interval) {
|
||||||
left_interval -= sleep_interval;
|
sleep_time += sleep_interval;
|
||||||
::usleep(sleep_interval);
|
left_interval -= sleep_interval;
|
||||||
|
::usleep(sleep_interval);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
PRINT_THROTTLE_WARN;
|
PrintThrottleUtil::pirnt_throttle_info(ret,
|
||||||
|
ALLOCATOR::throttle_unit_name(),
|
||||||
|
sleep_time,
|
||||||
|
left_interval,
|
||||||
|
expected_wait_t,
|
||||||
|
abs_expire_time,
|
||||||
|
share_ti_guard,
|
||||||
|
module_ti_guard,
|
||||||
|
has_printed_lbt);
|
||||||
}
|
}
|
||||||
PRINT_THROTTLE_STATISTIC;
|
PrintThrottleUtil::print_throttle_statistic(ret, ALLOCATOR::throttle_unit_name(), sleep_time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#undef PRINT_THROTTLE_WARN
|
|
||||||
#undef PRINT_THROTTLE_STATISTIC
|
|
||||||
|
|
||||||
template <typename FakeAllocator, typename... Args>
|
template <typename FakeAllocator, typename... Args>
|
||||||
template <typename ALLOCATOR>
|
template <typename ALLOCATOR>
|
||||||
void ObShareResourceThrottleTool<FakeAllocator, Args...>::skip_throttle(const int64_t skip_size)
|
void ObShareResourceThrottleTool<FakeAllocator, Args...>::skip_throttle(const int64_t skip_size)
|
||||||
@ -318,8 +288,6 @@ void ObShareResourceThrottleTool<FakeAllocator, Args...>::update_throttle_config
|
|||||||
}
|
}
|
||||||
|
|
||||||
#undef ACQUIRE_THROTTLE_UNIT
|
#undef ACQUIRE_THROTTLE_UNIT
|
||||||
#undef ACQUIRE_ALLOCATOR
|
|
||||||
#undef FUNC_PREFIX
|
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
@ -12,10 +12,12 @@
|
|||||||
|
|
||||||
#include "ob_share_throttle_define.h"
|
#include "ob_share_throttle_define.h"
|
||||||
|
|
||||||
|
#include "share/throttle/ob_throttle_info.h"
|
||||||
#include "observer/omt/ob_tenant_config_mgr.h"
|
#include "observer/omt/ob_tenant_config_mgr.h"
|
||||||
#include "lib/alloc/alloc_func.h"
|
#include "lib/alloc/alloc_func.h"
|
||||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||||
|
|
||||||
|
|
||||||
namespace oceanbase {
|
namespace oceanbase {
|
||||||
|
|
||||||
namespace share {
|
namespace share {
|
||||||
@ -113,5 +115,56 @@ void FakeAllocatorForTxShare::adaptive_update_limit(const int64_t tenant_id,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void PrintThrottleUtil::pirnt_throttle_info(const int err_code,
|
||||||
|
const char *throttle_unit_name,
|
||||||
|
const int64_t sleep_time,
|
||||||
|
const int64_t left_interval,
|
||||||
|
const int64_t expected_wait_time,
|
||||||
|
const int64_t abs_expire_time,
|
||||||
|
const ObThrottleInfoGuard &share_ti_guard,
|
||||||
|
const ObThrottleInfoGuard &module_ti_guard,
|
||||||
|
bool &has_printed_lbt)
|
||||||
|
{
|
||||||
|
int ret = err_code;
|
||||||
|
const int64_t WARN_LOG_INTERVAL = 1LL * 60L * 1000L * 1000L /* one minute */;
|
||||||
|
if (sleep_time > (WARN_LOG_INTERVAL) && TC_REACH_TIME_INTERVAL(WARN_LOG_INTERVAL)) {
|
||||||
|
SHARE_LOG(WARN,
|
||||||
|
"[Throttling] Attention!! Sleep More Than One Minute!!",
|
||||||
|
"Throttle Unit Name",
|
||||||
|
throttle_unit_name,
|
||||||
|
K(sleep_time),
|
||||||
|
K(left_interval),
|
||||||
|
K(expected_wait_time),
|
||||||
|
K(abs_expire_time),
|
||||||
|
KPC(share_ti_guard.throttle_info()),
|
||||||
|
KPC(module_ti_guard.throttle_info()));
|
||||||
|
if (!has_printed_lbt) {
|
||||||
|
has_printed_lbt = true;
|
||||||
|
oceanbase::share::ObTaskController::get().allow_next_syslog();
|
||||||
|
SHARE_LOG(INFO,
|
||||||
|
"[Throttling] (report write throttle info) LBT Info",
|
||||||
|
"Throttle Unit Name",
|
||||||
|
throttle_unit_name,
|
||||||
|
K(lbt()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PrintThrottleUtil::print_throttle_statistic(const int err_code,
|
||||||
|
const char *throttle_unit_name,
|
||||||
|
const int64_t sleep_time)
|
||||||
|
{
|
||||||
|
int ret = err_code;
|
||||||
|
const int64_t THROTTLE_LOG_INTERVAL = 1L * 1000L * 1000L; /*one seconds*/
|
||||||
|
if (sleep_time > 0 && REACH_TIME_INTERVAL(THROTTLE_LOG_INTERVAL)) {
|
||||||
|
SHARE_LOG(INFO,
|
||||||
|
"[Throttling] (report write throttle info) Time Info",
|
||||||
|
"Throttle Unit Name",
|
||||||
|
throttle_unit_name,
|
||||||
|
"Throttle Sleep Time(us)",
|
||||||
|
sleep_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
@ -77,6 +77,26 @@ namespace share {
|
|||||||
// ObTenantMdsAllocator>;
|
// ObTenantMdsAllocator>;
|
||||||
DEFINE_SHARE_THROTTLE(TxShare, ObMemstoreAllocator, ObTenantTxDataAllocator, ObTenantMdsAllocator)
|
DEFINE_SHARE_THROTTLE(TxShare, ObMemstoreAllocator, ObTenantTxDataAllocator, ObTenantMdsAllocator)
|
||||||
|
|
||||||
|
|
||||||
|
class ObThrottleInfoGuard;
|
||||||
|
|
||||||
|
class PrintThrottleUtil {
|
||||||
|
public:
|
||||||
|
static void pirnt_throttle_info(const int err_code,
|
||||||
|
const char *throttle_unit_name,
|
||||||
|
const int64_t sleep_time,
|
||||||
|
const int64_t left_interval,
|
||||||
|
const int64_t expected_wait_time,
|
||||||
|
const int64_t abs_expire_time,
|
||||||
|
const ObThrottleInfoGuard &share_ti_guard,
|
||||||
|
const ObThrottleInfoGuard &module_ti_guard,
|
||||||
|
bool &has_printed_lbt);
|
||||||
|
|
||||||
|
static void print_throttle_statistic(const int err_code,
|
||||||
|
const char *throttle_unit_name,
|
||||||
|
const int64_t sleep_time);
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
|
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
@ -130,6 +130,7 @@ struct ObThrottleInfoGuard
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObThrottleInfo *throttle_info() { return throttle_info_; }
|
ObThrottleInfo *throttle_info() { return throttle_info_; }
|
||||||
|
const ObThrottleInfo *throttle_info() const { return throttle_info_; }
|
||||||
|
|
||||||
TO_STRING_KV(KP(throttle_info_), KP(throttle_info_map_));
|
TO_STRING_KV(KP(throttle_info_), KP(throttle_info_map_));
|
||||||
};
|
};
|
||||||
|
@ -288,7 +288,7 @@ void ObThrottleUnit<ALLOCATOR>::advance_clock(const int64_t holding_size)
|
|||||||
const int64_t clock = ATOMIC_LOAD(&clock_);
|
const int64_t clock = ATOMIC_LOAD(&clock_);
|
||||||
const int64_t cur_seq = ATOMIC_LOAD(&sequence_num_);
|
const int64_t cur_seq = ATOMIC_LOAD(&sequence_num_);
|
||||||
ATOMIC_SET(&clock_, min(cur_seq, clock + avaliable_resource));
|
ATOMIC_SET(&clock_, min(cur_seq, clock + avaliable_resource));
|
||||||
if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL/* 1 second */)) {
|
if (REACH_TIME_INTERVAL(10LL * 1000LL * 1000LL/* 10 seconds */)) {
|
||||||
SHARE_LOG(INFO,
|
SHARE_LOG(INFO,
|
||||||
"[Throttling] Advance Clock",
|
"[Throttling] Advance Clock",
|
||||||
K(avaliable_resource),
|
K(avaliable_resource),
|
||||||
|
@ -71,7 +71,8 @@ void ObStorageTableGuard::throttle_if_needed_()
|
|||||||
// only do throttle on active memtable
|
// only do throttle on active memtable
|
||||||
if (OB_NOT_NULL(memtable_) && memtable_->is_active_memtable()) {
|
if (OB_NOT_NULL(memtable_) && memtable_->is_active_memtable()) {
|
||||||
reset();
|
reset();
|
||||||
(void)do_throttle_(throttle_tool, share_ti_guard, module_ti_guard);
|
(void)TxShareMemThrottleUtil::do_throttle<ObMemstoreAllocator>(
|
||||||
|
for_replay_, store_ctx_.timeout_, throttle_tool, share_ti_guard, module_ti_guard);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if throttle is skipped due to some reasons, advance clock by call skip_throttle() and clean throttle status
|
// if throttle is skipped due to some reasons, advance clock by call skip_throttle() and clean throttle status
|
||||||
@ -88,84 +89,6 @@ void ObStorageTableGuard::throttle_if_needed_()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define PRINT_THROTTLE_WARN \
|
|
||||||
do { \
|
|
||||||
const int64_t WARN_LOG_INTERVAL = 60L * 1000L * 1000L /* one minute */; \
|
|
||||||
if (sleep_time > (WARN_LOG_INTERVAL) && TC_REACH_TIME_INTERVAL(WARN_LOG_INTERVAL)) { \
|
|
||||||
SHARE_LOG(WARN, \
|
|
||||||
"[Throttling] Attention!! Sleep More Than One Minute!!", \
|
|
||||||
K(sleep_time), \
|
|
||||||
K(left_interval), \
|
|
||||||
K(expected_wait_time)); \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define PRINT_THROTTLE_STATISTIC \
|
|
||||||
do { \
|
|
||||||
const int64_t MEMSTORE_THROTTLE_LOG_INTERVAL = 1L * 1000L * 1000L; /*one seconds*/ \
|
|
||||||
if (sleep_time > 0 && REACH_TIME_INTERVAL(MEMSTORE_THROTTLE_LOG_INTERVAL)) { \
|
|
||||||
SHARE_LOG(INFO, \
|
|
||||||
"[Throttling] (report write throttle info) Time Info", \
|
|
||||||
"Throttle Unit Name", \
|
|
||||||
ObMemstoreAllocator::throttle_unit_name(), \
|
|
||||||
"Throttle Sleep Time(us)", \
|
|
||||||
sleep_time); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
void ObStorageTableGuard::do_throttle_(TxShareThrottleTool &throttle_tool,
|
|
||||||
ObThrottleInfoGuard &share_ti_guard,
|
|
||||||
ObThrottleInfoGuard &module_ti_guard)
|
|
||||||
{
|
|
||||||
int ret = OB_SUCCESS;
|
|
||||||
int64_t sleep_time = 0;
|
|
||||||
int64_t left_interval = share::ObThrottleUnit<ObTenantMdsAllocator>::DEFAULT_MAX_THROTTLE_TIME;
|
|
||||||
|
|
||||||
if (!for_replay_) {
|
|
||||||
left_interval = min(left_interval, store_ctx_.timeout_ - ObClockGenerator::getClock());
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t timeout = 10000; // 10s
|
|
||||||
common::ObWaitEventGuard wait_guard(
|
|
||||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
|
||||||
|
|
||||||
while (throttle_tool.still_throttling<ObMemstoreAllocator>(share_ti_guard, module_ti_guard) && (left_interval > 0)) {
|
|
||||||
int64_t expected_wait_time = 0;
|
|
||||||
if (for_replay_ && MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
|
||||||
// skip throttle if ls freeze exists
|
|
||||||
break;
|
|
||||||
} else if ((expected_wait_time =
|
|
||||||
throttle_tool.expected_wait_time<ObMemstoreAllocator>(share_ti_guard, module_ti_guard)) <= 0) {
|
|
||||||
if (expected_wait_time < 0) {
|
|
||||||
LOG_ERROR("expected wait time should not smaller than 0",
|
|
||||||
K(expected_wait_time),
|
|
||||||
KPC(share_ti_guard.throttle_info()),
|
|
||||||
KPC(module_ti_guard.throttle_info()),
|
|
||||||
K(clock),
|
|
||||||
K(left_interval));
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// do sleep when expected_wait_time and left_interval are not equal to 0
|
|
||||||
int64_t sleep_interval = min(SLEEP_INTERVAL_PER_TIME, expected_wait_time);
|
|
||||||
::usleep(sleep_interval);
|
|
||||||
sleep_time += sleep_interval;
|
|
||||||
left_interval -= sleep_interval;
|
|
||||||
|
|
||||||
PRINT_THROTTLE_WARN;
|
|
||||||
}
|
|
||||||
PRINT_THROTTLE_STATISTIC;
|
|
||||||
|
|
||||||
if (for_replay_ && sleep_time > 0) {
|
|
||||||
// avoid print replay_timeout
|
|
||||||
get_replay_is_writing_throttling() = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#undef PRINT_THROTTLE_WARN
|
|
||||||
#undef PRINT_THROTTLE_STATISTIC
|
|
||||||
|
|
||||||
int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_table)
|
int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_table)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
@ -66,9 +66,6 @@ private:
|
|||||||
bool need_to_refresh_table(ObTableStoreIterator &iter);
|
bool need_to_refresh_table(ObTableStoreIterator &iter);
|
||||||
void check_if_need_log_(bool &need_log, bool &need_log_error);
|
void check_if_need_log_(bool &need_log, bool &need_log_error);
|
||||||
void throttle_if_needed_();
|
void throttle_if_needed_();
|
||||||
void do_throttle_(share::TxShareThrottleTool &throttle_tool,
|
|
||||||
share::ObThrottleInfoGuard &share_ti_guard,
|
|
||||||
share::ObThrottleInfoGuard &module_ti_guard);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t LOG_INTERVAL_US = 10 * 1000 * 1000; // 10s
|
static const int64_t LOG_INTERVAL_US = 10 * 1000 * 1000; // 10s
|
||||||
|
Reference in New Issue
Block a user