[FIX] do throttle when mds operation is finished instead of allocating mds memory
This commit is contained in:
		@ -105,16 +105,13 @@ void *ObTenantMdsAllocator::alloc(const int64_t size, const int64_t abs_expire_t
 | 
			
		||||
{
 | 
			
		||||
  bool is_throttled = false;
 | 
			
		||||
 | 
			
		||||
  // try alloc resource from throttle tool
 | 
			
		||||
  // record alloc resource in throttle tool, but do not throttle immediately
 | 
			
		||||
  // ObMdsThrottleGuard calls the real throttle logic
 | 
			
		||||
  (void)throttle_tool_->alloc_resource<ObTenantMdsAllocator>(size, abs_expire_time, is_throttled);
 | 
			
		||||
 | 
			
		||||
  // if is throttled, do throttle
 | 
			
		||||
  if (OB_UNLIKELY(is_throttled)) {
 | 
			
		||||
    if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
      (void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(size);
 | 
			
		||||
    } else {
 | 
			
		||||
      (void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time);
 | 
			
		||||
    }
 | 
			
		||||
    share::mds_throttled_alloc() += size;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  void *obj = allocator_.alloc(size);
 | 
			
		||||
@ -174,5 +171,44 @@ void ObTenantBufferCtxAllocator::free(void *ptr)
 | 
			
		||||
  MTL(ObTenantMdsService*)->erase_alloc_backtrace(ptr);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time)
 | 
			
		||||
{
 | 
			
		||||
  throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObMdsThrottleGuard::~ObMdsThrottleGuard()
 | 
			
		||||
{
 | 
			
		||||
  ObThrottleInfoGuard share_ti_guard;
 | 
			
		||||
  ObThrottleInfoGuard module_ti_guard;
 | 
			
		||||
 | 
			
		||||
  if (OB_ISNULL(throttle_tool_)) {
 | 
			
		||||
    MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
 | 
			
		||||
  } else if (throttle_tool_->is_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
 | 
			
		||||
    if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
      (void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(
 | 
			
		||||
          share::mds_throttled_alloc(), share_ti_guard, module_ti_guard);
 | 
			
		||||
 | 
			
		||||
      if (module_ti_guard.is_valid()) {
 | 
			
		||||
        module_ti_guard.throttle_info()->reset();
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
      uint64_t timeout = 10000;  // 10s
 | 
			
		||||
      int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
 | 
			
		||||
      common::ObWaitEventGuard wait_guard(
 | 
			
		||||
          common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
 | 
			
		||||
      (void)throttle_tool_->do_throttle<ObTenantMdsAllocator>(abs_expire_time_);
 | 
			
		||||
 | 
			
		||||
      if (for_replay_) {
 | 
			
		||||
        get_replay_is_writing_throttling() = true;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // reset mds throttled alloc size
 | 
			
		||||
    share::mds_throttled_alloc() = 0;
 | 
			
		||||
  } else {
 | 
			
		||||
    // do not need throttle, exit directly
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace share
 | 
			
		||||
}  // namespace oceanbase
 | 
			
		||||
@ -20,6 +20,12 @@ namespace oceanbase {
 | 
			
		||||
 | 
			
		||||
namespace share {
 | 
			
		||||
 | 
			
		||||
OB_INLINE int64_t &mds_throttled_alloc()
 | 
			
		||||
{
 | 
			
		||||
  RLOCAL_INLINE(int64_t, mds_throttled_alloc);
 | 
			
		||||
  return mds_throttled_alloc;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class ObTenantMdsAllocator : public ObIAllocator {
 | 
			
		||||
private:
 | 
			
		||||
  static const int64_t MDS_ALLOC_CONCURRENCY = 32;
 | 
			
		||||
@ -55,6 +61,18 @@ struct ObTenantBufferCtxAllocator : public ObIAllocator// for now, it is just a
 | 
			
		||||
  virtual void set_attr(const ObMemAttr &) override {}
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class ObMdsThrottleGuard
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
  ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time);
 | 
			
		||||
  ~ObMdsThrottleGuard();
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
  bool for_replay_;
 | 
			
		||||
  int64_t abs_expire_time_;
 | 
			
		||||
  share::TxShareThrottleTool *throttle_tool_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
}  // namespace share
 | 
			
		||||
}  // namespace oceanbase
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -100,6 +100,10 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a
 | 
			
		||||
      if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
        (void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(storage::TX_DATA_SLICE_SIZE);
 | 
			
		||||
      } else {
 | 
			
		||||
        uint64_t timeout = 10000;  // 10s
 | 
			
		||||
        int64_t left_interval = abs_expire_time - ObClockGenerator::getClock();
 | 
			
		||||
        common::ObWaitEventGuard wait_guard(
 | 
			
		||||
            common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
 | 
			
		||||
        (void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -10,6 +10,7 @@
 | 
			
		||||
 * See the Mulan PubL v2 for more details.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#ifndef OCEABASE_SHARE_THROTTLE_OB_SHARE_RESOURCE_THROTTLE_TOOL_IPP
 | 
			
		||||
#define OCEABASE_SHARE_THROTTLE_OB_SHARE_RESOURCE_THROTTLE_TOOL_IPP
 | 
			
		||||
 | 
			
		||||
@ -18,6 +19,8 @@
 | 
			
		||||
#include "ob_share_resource_throttle_tool.h"
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
namespace oceanbase {
 | 
			
		||||
namespace share {
 | 
			
		||||
 | 
			
		||||
@ -200,7 +203,8 @@ int64_t ObShareResourceThrottleTool<FakeAllocator, Args...>::expected_wait_time(
 | 
			
		||||
                K(expected_wait_t));                                                     \
 | 
			
		||||
      if (!has_printed_lbt) {                                                            \
 | 
			
		||||
        has_printed_lbt = true;                                                          \
 | 
			
		||||
        SHARE_LOG(WARN,                                                                   \
 | 
			
		||||
        oceanbase::share::ObTaskController::get().allow_next_syslog();                   \
 | 
			
		||||
        SHARE_LOG(INFO,                                                                  \
 | 
			
		||||
                  "[Throttling] (report write throttle info) LBT Info",                  \
 | 
			
		||||
                  "Throttle Unit Name",                                                  \
 | 
			
		||||
                  ALLOCATOR::throttle_unit_name(),                                       \
 | 
			
		||||
 | 
			
		||||
@ -288,6 +288,15 @@ void ObThrottleUnit<ALLOCATOR>::advance_clock(const int64_t holding_size)
 | 
			
		||||
    const int64_t clock = ATOMIC_LOAD(&clock_);
 | 
			
		||||
    const int64_t cur_seq = ATOMIC_LOAD(&sequence_num_);
 | 
			
		||||
    ATOMIC_SET(&clock_, min(cur_seq, clock + avaliable_resource));
 | 
			
		||||
    if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL/* 1 second */)) {
 | 
			
		||||
      SHARE_LOG(INFO,
 | 
			
		||||
                "[Throttling] Advance Clock",
 | 
			
		||||
                K(avaliable_resource),
 | 
			
		||||
                K(clock),
 | 
			
		||||
                K(clock_),
 | 
			
		||||
                K(cur_seq),
 | 
			
		||||
                THROTTLE_UNIT_INFO);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1180,6 +1180,7 @@ int ObMemtableCtx::register_multi_source_data_if_need_(
 | 
			
		||||
    int64_t pos = 0;
 | 
			
		||||
    ObTxDataSourceType type = ObTxDataSourceType::TABLE_LOCK;
 | 
			
		||||
    ObPartTransCtx *part_ctx = static_cast<ObPartTransCtx *>(ctx_);
 | 
			
		||||
 | 
			
		||||
    if (serialize_size > tablelock::ObTableLockOp::MAX_SERIALIZE_SIZE) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      TRANS_LOG(WARN, "lock op serialize size if over flow", K(ret), K(serialize_size));
 | 
			
		||||
 | 
			
		||||
@ -66,7 +66,6 @@ void ObStorageTableGuard::throttle_if_needed_()
 | 
			
		||||
    TxShareThrottleTool &throttle_tool = MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool();
 | 
			
		||||
    ObThrottleInfoGuard share_ti_guard;
 | 
			
		||||
    ObThrottleInfoGuard module_ti_guard;
 | 
			
		||||
    int64_t thread_idx = common::get_itid();
 | 
			
		||||
    if (throttle_tool.is_throttling<ObMemstoreAllocator>(share_ti_guard, module_ti_guard)) {
 | 
			
		||||
 | 
			
		||||
      // only do throttle on active memtable
 | 
			
		||||
@ -120,12 +119,16 @@ void ObStorageTableGuard::do_throttle_(TxShareThrottleTool &throttle_tool,
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  int64_t sleep_time = 0;
 | 
			
		||||
  int64_t left_interval = INT64_MAX;
 | 
			
		||||
  int64_t left_interval = share::ObThrottleUnit<ObTenantMdsAllocator>::DEFAULT_MAX_THROTTLE_TIME;
 | 
			
		||||
 | 
			
		||||
  if (!for_replay_) {
 | 
			
		||||
    left_interval = min(left_interval, store_ctx_.timeout_ - ObClockGenerator::getClock());
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  uint64_t timeout = 10000;  // 10s
 | 
			
		||||
  common::ObWaitEventGuard wait_guard(
 | 
			
		||||
      common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
 | 
			
		||||
 | 
			
		||||
  while (throttle_tool.still_throttling<ObMemstoreAllocator>(share_ti_guard, module_ti_guard) && (left_interval > 0)) {
 | 
			
		||||
    int64_t expected_wait_time = 0;
 | 
			
		||||
    if (for_replay_ && MTL(ObTenantFreezer *)->exist_ls_freezing()) {
 | 
			
		||||
 | 
			
		||||
@ -149,6 +149,7 @@ int ObLockMemtable::lock_(
 | 
			
		||||
  do {
 | 
			
		||||
    // retry if there is lock conflict at part trans ctx.
 | 
			
		||||
    need_retry = false;
 | 
			
		||||
 | 
			
		||||
    {
 | 
			
		||||
      succ_step = STEP_BEGIN;
 | 
			
		||||
      lock_exist = false;
 | 
			
		||||
 | 
			
		||||
@ -934,6 +934,7 @@ int ObTransService::register_mds_into_ctx_(ObTxDesc &tx_desc,
 | 
			
		||||
    TRANS_LOG(WARN, "get store ctx failed", KR(ret), K(tx_desc), K(ls_id));
 | 
			
		||||
  } else {
 | 
			
		||||
    ObPartTransCtx *ctx = store_ctx.mvcc_acc_ctx_.tx_ctx_;
 | 
			
		||||
    ObMdsThrottleGuard mds_throttle_guard(false/* for_replay */, ctx->get_trans_expired_time());
 | 
			
		||||
    if (OB_ISNULL(ctx)) {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
      TRANS_LOG(WARN, "unexpected null ptr", KR(ret), K(tx_desc), K(ls_id), K(type));
 | 
			
		||||
 | 
			
		||||
@ -10,6 +10,7 @@
 | 
			
		||||
 * See the Mulan PubL v2 for more details.
 | 
			
		||||
 */
 | 
			
		||||
 | 
			
		||||
#include "share/throttle/ob_throttle_unit.h"
 | 
			
		||||
#include "storage/ls/ob_ls.h"
 | 
			
		||||
#include "storage/ls/ob_ls_tx_service.h"
 | 
			
		||||
#include "storage/memtable/ob_memtable.h"
 | 
			
		||||
@ -443,6 +444,11 @@ int ObTxReplayExecutor::replay_multi_source_data_()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObTxMultiDataSourceLog log;
 | 
			
		||||
 | 
			
		||||
  ObMdsThrottleGuard mds_throttle_guard(true /* for_replay */,
 | 
			
		||||
                                        ObClockGenerator::getClock() +
 | 
			
		||||
                                            share::ObThrottleUnit<ObTenantMdsAllocator>::DEFAULT_MAX_THROTTLE_TIME);
 | 
			
		||||
 | 
			
		||||
  if (OB_FAIL(log_block_.deserialize_log_body(log))) {
 | 
			
		||||
    TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", KR(ret), K(lsn_), K(log_ts_ns_));
 | 
			
		||||
  } else if (OB_FAIL(ctx_->replay_multi_data_source(log, lsn_, log_ts_ns_, tx_part_log_no_))) {
 | 
			
		||||
 | 
			
		||||
@ -32,6 +32,9 @@ using namespace share;
 | 
			
		||||
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
 | 
			
		||||
 | 
			
		||||
namespace share {
 | 
			
		||||
 | 
			
		||||
ObMdsThrottleGuard::~ObMdsThrottleGuard() {}
 | 
			
		||||
 | 
			
		||||
int ObTenantTxDataAllocator::init(const char *label)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user