From 3bdc9cb5df2ffa8bcdbac78040a38f13e384ac69 Mon Sep 17 00:00:00 2001 From: ZenoWang Date: Thu, 4 Jan 2024 13:42:38 +0000 Subject: [PATCH] [FIX] do throttle when mds operation is finished instead of allocating mds memory --- src/share/allocator/ob_mds_allocator.cpp | 48 ++++++++++++++++--- src/share/allocator/ob_mds_allocator.h | 18 +++++++ src/share/allocator/ob_tx_data_allocator.cpp | 4 ++ .../ob_share_resource_throttle_tool.ipp | 44 +++++++++-------- src/share/throttle/ob_throttle_unit.ipp | 9 ++++ src/storage/memtable/ob_memtable_context.cpp | 1 + src/storage/ob_storage_table_guard.cpp | 7 ++- src/storage/tablelock/ob_lock_memtable.cpp | 1 + src/storage/tx/ob_trans_service.cpp | 1 + src/storage/tx/ob_tx_replay_executor.cpp | 6 +++ unittest/storage/tx/it/test_register_mds.cpp | 3 ++ 11 files changed, 114 insertions(+), 28 deletions(-) diff --git a/src/share/allocator/ob_mds_allocator.cpp b/src/share/allocator/ob_mds_allocator.cpp index b2b55beaf7..166dfa4162 100644 --- a/src/share/allocator/ob_mds_allocator.cpp +++ b/src/share/allocator/ob_mds_allocator.cpp @@ -105,16 +105,13 @@ void *ObTenantMdsAllocator::alloc(const int64_t size, const int64_t abs_expire_t { bool is_throttled = false; - // try alloc resource from throttle tool + // record alloc resource in throttle tool, but do not throttle immediately + // ObMdsThrottleGuard calls the real throttle logic (void)throttle_tool_->alloc_resource(size, abs_expire_time, is_throttled); // if is throttled, do throttle if (OB_UNLIKELY(is_throttled)) { - if (MTL(ObTenantFreezer *)->exist_ls_freezing()) { - (void)throttle_tool_->skip_throttle(size); - } else { - (void)throttle_tool_->do_throttle(abs_expire_time); - } + share::mds_throttled_alloc() += size; } void *obj = allocator_.alloc(size); @@ -174,5 +171,44 @@ void ObTenantBufferCtxAllocator::free(void *ptr) MTL(ObTenantMdsService*)->erase_alloc_backtrace(ptr); } +ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time) +{ + throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool()); +} + +ObMdsThrottleGuard::~ObMdsThrottleGuard() +{ + ObThrottleInfoGuard share_ti_guard; + ObThrottleInfoGuard module_ti_guard; + + if (OB_ISNULL(throttle_tool_)) { + MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_)); + } else if (throttle_tool_->is_throttling(share_ti_guard, module_ti_guard)) { + if (MTL(ObTenantFreezer *)->exist_ls_freezing()) { + (void)throttle_tool_->skip_throttle( + share::mds_throttled_alloc(), share_ti_guard, module_ti_guard); + + if (module_ti_guard.is_valid()) { + module_ti_guard.throttle_info()->reset(); + } + } else { + uint64_t timeout = 10000; // 10s + int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock(); + common::ObWaitEventGuard wait_guard( + common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); + (void)throttle_tool_->do_throttle(abs_expire_time_); + + if (for_replay_) { + get_replay_is_writing_throttling() = true; + } + } + + // reset mds throttled alloc size + share::mds_throttled_alloc() = 0; + } else { + // do not need throttle, exit directly + } +} + } // namespace share } // namespace oceanbase \ No newline at end of file diff --git a/src/share/allocator/ob_mds_allocator.h b/src/share/allocator/ob_mds_allocator.h index 54f80186a3..31de9324f4 100644 --- a/src/share/allocator/ob_mds_allocator.h +++ b/src/share/allocator/ob_mds_allocator.h @@ -20,6 +20,12 @@ namespace oceanbase { namespace share { +OB_INLINE int64_t &mds_throttled_alloc() +{ + RLOCAL_INLINE(int64_t, mds_throttled_alloc); + return mds_throttled_alloc; +} + class ObTenantMdsAllocator : public ObIAllocator { private: static const int64_t MDS_ALLOC_CONCURRENCY = 32; @@ -55,6 +61,18 @@ struct ObTenantBufferCtxAllocator : public ObIAllocator// for now, it is just a virtual void set_attr(const ObMemAttr &) override {} }; +class ObMdsThrottleGuard +{ +public: + ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time); + ~ObMdsThrottleGuard(); + +private: + bool for_replay_; + int64_t abs_expire_time_; + share::TxShareThrottleTool *throttle_tool_; +}; + } // namespace share } // namespace oceanbase diff --git a/src/share/allocator/ob_tx_data_allocator.cpp b/src/share/allocator/ob_tx_data_allocator.cpp index e51a839854..cb0f7f1049 100644 --- a/src/share/allocator/ob_tx_data_allocator.cpp +++ b/src/share/allocator/ob_tx_data_allocator.cpp @@ -100,6 +100,10 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a if (MTL(ObTenantFreezer *)->exist_ls_freezing()) { (void)throttle_tool_->skip_throttle(storage::TX_DATA_SLICE_SIZE); } else { + uint64_t timeout = 10000; // 10s + int64_t left_interval = abs_expire_time - ObClockGenerator::getClock(); + common::ObWaitEventGuard wait_guard( + common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); (void)throttle_tool_->do_throttle(abs_expire_time); } } diff --git a/src/share/throttle/ob_share_resource_throttle_tool.ipp b/src/share/throttle/ob_share_resource_throttle_tool.ipp index fb37b34e4d..d690e970c2 100644 --- a/src/share/throttle/ob_share_resource_throttle_tool.ipp +++ b/src/share/throttle/ob_share_resource_throttle_tool.ipp @@ -10,6 +10,7 @@ * See the Mulan PubL v2 for more details. */ + #ifndef OCEABASE_SHARE_THROTTLE_OB_SHARE_RESOURCE_THROTTLE_TOOL_IPP #define OCEABASE_SHARE_THROTTLE_OB_SHARE_RESOURCE_THROTTLE_TOOL_IPP @@ -18,6 +19,8 @@ #include "ob_share_resource_throttle_tool.h" #endif + + namespace oceanbase { namespace share { @@ -187,26 +190,27 @@ int64_t ObShareResourceThrottleTool::expected_wait_time( return expected_wait_time; } -#define PRINT_THROTTLE_WARN \ - do { \ - const int64_t WARN_LOG_INTERVAL = 1LL * 60L * 1000L * 1000L /* one minute */; \ - if (sleep_time > (WARN_LOG_INTERVAL) && TC_REACH_TIME_INTERVAL(WARN_LOG_INTERVAL)) { \ - SHARE_LOG(WARN, \ - "[Throttling] Attention!! Sleep More Than One Minute!!", \ - "Throttle Unit Name", \ - ALLOCATOR::throttle_unit_name(), \ - K(sleep_time), \ - K(left_interval), \ - K(expected_wait_t)); \ - if (!has_printed_lbt) { \ - has_printed_lbt = true; \ - SHARE_LOG(WARN, \ - "[Throttling] (report write throttle info) LBT Info", \ - "Throttle Unit Name", \ - ALLOCATOR::throttle_unit_name(), \ - K(lbt())); \ - } \ - } \ +#define PRINT_THROTTLE_WARN \ + do { \ + const int64_t WARN_LOG_INTERVAL = 1LL * 60L * 1000L * 1000L /* one minute */; \ + if (sleep_time > (WARN_LOG_INTERVAL) && TC_REACH_TIME_INTERVAL(WARN_LOG_INTERVAL)) { \ + SHARE_LOG(WARN, \ + "[Throttling] Attention!! Sleep More Than One Minute!!", \ + "Throttle Unit Name", \ + ALLOCATOR::throttle_unit_name(), \ + K(sleep_time), \ + K(left_interval), \ + K(expected_wait_t)); \ + if (!has_printed_lbt) { \ + has_printed_lbt = true; \ + oceanbase::share::ObTaskController::get().allow_next_syslog(); \ + SHARE_LOG(INFO, \ + "[Throttling] (report write throttle info) LBT Info", \ + "Throttle Unit Name", \ + ALLOCATOR::throttle_unit_name(), \ + K(lbt())); \ + } \ + } \ } while (0) #define PRINT_THROTTLE_STATISTIC \ diff --git a/src/share/throttle/ob_throttle_unit.ipp b/src/share/throttle/ob_throttle_unit.ipp index acaa8c1eb5..ddd0a98cb8 100644 --- a/src/share/throttle/ob_throttle_unit.ipp +++ b/src/share/throttle/ob_throttle_unit.ipp @@ -288,6 +288,15 @@ void ObThrottleUnit::advance_clock(const int64_t holding_size) const int64_t clock = ATOMIC_LOAD(&clock_); const int64_t cur_seq = ATOMIC_LOAD(&sequence_num_); ATOMIC_SET(&clock_, min(cur_seq, clock + avaliable_resource)); + if (REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL/* 1 second */)) { + SHARE_LOG(INFO, + "[Throttling] Advance Clock", + K(avaliable_resource), + K(clock), + K(clock_), + K(cur_seq), + THROTTLE_UNIT_INFO); + } } } diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 5d5a904599..f3bfe323cd 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -1180,6 +1180,7 @@ int ObMemtableCtx::register_multi_source_data_if_need_( int64_t pos = 0; ObTxDataSourceType type = ObTxDataSourceType::TABLE_LOCK; ObPartTransCtx *part_ctx = static_cast(ctx_); + if (serialize_size > tablelock::ObTableLockOp::MAX_SERIALIZE_SIZE) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "lock op serialize size if over flow", K(ret), K(serialize_size)); diff --git a/src/storage/ob_storage_table_guard.cpp b/src/storage/ob_storage_table_guard.cpp index 2d5ff4d38a..d614342ad7 100644 --- a/src/storage/ob_storage_table_guard.cpp +++ b/src/storage/ob_storage_table_guard.cpp @@ -66,7 +66,6 @@ void ObStorageTableGuard::throttle_if_needed_() TxShareThrottleTool &throttle_tool = MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool(); ObThrottleInfoGuard share_ti_guard; ObThrottleInfoGuard module_ti_guard; - int64_t thread_idx = common::get_itid(); if (throttle_tool.is_throttling(share_ti_guard, module_ti_guard)) { // only do throttle on active memtable @@ -120,12 +119,16 @@ void ObStorageTableGuard::do_throttle_(TxShareThrottleTool &throttle_tool, { int ret = OB_SUCCESS; int64_t sleep_time = 0; - int64_t left_interval = INT64_MAX; + int64_t left_interval = share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME; if (!for_replay_) { left_interval = min(left_interval, store_ctx_.timeout_ - ObClockGenerator::getClock()); } + uint64_t timeout = 10000; // 10s + common::ObWaitEventGuard wait_guard( + common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); + while (throttle_tool.still_throttling(share_ti_guard, module_ti_guard) && (left_interval > 0)) { int64_t expected_wait_time = 0; if (for_replay_ && MTL(ObTenantFreezer *)->exist_ls_freezing()) { diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index 648cc0789f..eae49566a1 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -149,6 +149,7 @@ int ObLockMemtable::lock_( do { // retry if there is lock conflict at part trans ctx. need_retry = false; + { succ_step = STEP_BEGIN; lock_exist = false; diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index af60de9954..dd246ce09b 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -934,6 +934,7 @@ int ObTransService::register_mds_into_ctx_(ObTxDesc &tx_desc, TRANS_LOG(WARN, "get store ctx failed", KR(ret), K(tx_desc), K(ls_id)); } else { ObPartTransCtx *ctx = store_ctx.mvcc_acc_ctx_.tx_ctx_; + ObMdsThrottleGuard mds_throttle_guard(false/* for_replay */, ctx->get_trans_expired_time()); if (OB_ISNULL(ctx)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected null ptr", KR(ret), K(tx_desc), K(ls_id), K(type)); diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 55e789e4ca..6d8c44a2ee 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -10,6 +10,7 @@ * See the Mulan PubL v2 for more details. */ +#include "share/throttle/ob_throttle_unit.h" #include "storage/ls/ob_ls.h" #include "storage/ls/ob_ls_tx_service.h" #include "storage/memtable/ob_memtable.h" @@ -443,6 +444,11 @@ int ObTxReplayExecutor::replay_multi_source_data_() { int ret = OB_SUCCESS; ObTxMultiDataSourceLog log; + + ObMdsThrottleGuard mds_throttle_guard(true /* for_replay */, + ObClockGenerator::getClock() + + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME); + if (OB_FAIL(log_block_.deserialize_log_body(log))) { TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", KR(ret), K(lsn_), K(log_ts_ns_)); } else if (OB_FAIL(ctx_->replay_multi_data_source(log, lsn_, log_ts_ns_, tx_part_log_no_))) { diff --git a/unittest/storage/tx/it/test_register_mds.cpp b/unittest/storage/tx/it/test_register_mds.cpp index c2647d41f9..24c013a141 100644 --- a/unittest/storage/tx/it/test_register_mds.cpp +++ b/unittest/storage/tx/it/test_register_mds.cpp @@ -32,6 +32,9 @@ using namespace share; static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR; namespace share { + +ObMdsThrottleGuard::~ObMdsThrottleGuard() {} + int ObTenantTxDataAllocator::init(const char *label) { int ret = OB_SUCCESS;