From fc9414974c2dda721a2499a97ee2289196cca515 Mon Sep 17 00:00:00 2001 From: ZenoWang Date: Fri, 5 Jan 2024 03:42:39 +0000 Subject: [PATCH] [FIX] do throttle after creating tx ctx or rollback to savepoint in replay situation --- src/share/allocator/ob_tx_data_allocator.cpp | 51 +++++++++++++++---- src/share/allocator/ob_tx_data_allocator.h | 18 +++++++ .../ob_share_resource_throttle_tool.ipp | 3 +- src/storage/ls/ob_ls_tx_service.cpp | 7 +++ src/storage/tx/ob_tx_replay_executor.cpp | 6 +++ unittest/storage/tx/it/test_register_mds.cpp | 1 + unittest/storage/tx/it/test_tx.cpp | 3 ++ 7 files changed, 79 insertions(+), 10 deletions(-) diff --git a/src/share/allocator/ob_tx_data_allocator.cpp b/src/share/allocator/ob_tx_data_allocator.cpp index cb0f7f104..d7e6da82c 100644 --- a/src/share/allocator/ob_tx_data_allocator.cpp +++ b/src/share/allocator/ob_tx_data_allocator.cpp @@ -96,16 +96,9 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a bool is_throttled = false; (void)throttle_tool_->alloc_resource( storage::TX_DATA_SLICE_SIZE, abs_expire_time, is_throttled); + if (OB_UNLIKELY(is_throttled)) { - if (MTL(ObTenantFreezer *)->exist_ls_freezing()) { - (void)throttle_tool_->skip_throttle(storage::TX_DATA_SLICE_SIZE); - } else { - uint64_t timeout = 10000; // 10s - int64_t left_interval = abs_expire_time - ObClockGenerator::getClock(); - common::ObWaitEventGuard wait_guard( - common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); - (void)throttle_tool_->do_throttle(abs_expire_time); - } + share::tx_data_throttled_alloc() += storage::TX_DATA_SLICE_SIZE; } } @@ -114,5 +107,45 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a return res; } +ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time) + : for_replay_(for_replay), abs_expire_time_(abs_expire_time) +{ + throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool()); +} + +ObTxDataThrottleGuard::~ObTxDataThrottleGuard() +{ + ObThrottleInfoGuard share_ti_guard; + ObThrottleInfoGuard module_ti_guard; + + if (OB_ISNULL(throttle_tool_)) { + MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_)); + } else if (throttle_tool_->is_throttling(share_ti_guard, module_ti_guard)) { + if (MTL(ObTenantFreezer *)->exist_ls_freezing()) { + (void)throttle_tool_->skip_throttle( + share::tx_data_throttled_alloc(), share_ti_guard, module_ti_guard); + + if (module_ti_guard.is_valid()) { + module_ti_guard.throttle_info()->reset(); + } + } else { + uint64_t timeout = 10000; // 10s + int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock(); + common::ObWaitEventGuard wait_guard( + common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval); + (void)throttle_tool_->do_throttle(abs_expire_time_); + + if (for_replay_) { + get_replay_is_writing_throttling() = true; + } + } + + // reset tx data throttled alloc size + share::tx_data_throttled_alloc() = 0; + } else { + // do not need throttle, exit directly + } +} + } // namespace share } // namespace oceanbase \ No newline at end of file diff --git a/src/share/allocator/ob_tx_data_allocator.h b/src/share/allocator/ob_tx_data_allocator.h index 971bf9fa8..ab146549f 100644 --- a/src/share/allocator/ob_tx_data_allocator.h +++ b/src/share/allocator/ob_tx_data_allocator.h @@ -20,6 +20,12 @@ namespace oceanbase { namespace share { +OB_INLINE int64_t &tx_data_throttled_alloc() +{ + RLOCAL_INLINE(int64_t, tx_data_throttled_alloc); + return tx_data_throttled_alloc; +} + class ObTenantTxDataAllocator { public: using SliceAllocator = ObSliceAlloc; @@ -57,6 +63,18 @@ private: SliceAllocator slice_allocator_; }; +class ObTxDataThrottleGuard +{ +public: + ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time); + ~ObTxDataThrottleGuard(); + +private: + bool for_replay_; + int64_t abs_expire_time_; + share::TxShareThrottleTool *throttle_tool_; +}; + } // namespace share } // namespace oceanbase diff --git a/src/share/throttle/ob_share_resource_throttle_tool.ipp b/src/share/throttle/ob_share_resource_throttle_tool.ipp index d690e970c..effee3b6e 100644 --- a/src/share/throttle/ob_share_resource_throttle_tool.ipp +++ b/src/share/throttle/ob_share_resource_throttle_tool.ipp @@ -200,7 +200,8 @@ int64_t ObShareResourceThrottleTool::expected_wait_time( ALLOCATOR::throttle_unit_name(), \ K(sleep_time), \ K(left_interval), \ - K(expected_wait_t)); \ + K(expected_wait_t), \ + K(abs_expire_time)); \ if (!has_printed_lbt) { \ has_printed_lbt = true; \ oceanbase::share::ObTaskController::get().allow_next_syslog(); \ diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp index 6c73992e5..fa6a12e6e 100644 --- a/src/storage/ls/ob_ls_tx_service.cpp +++ b/src/storage/ls/ob_ls_tx_service.cpp @@ -15,6 +15,7 @@ #define USING_LOG_PREFIX TRANS #include "ob_ls_tx_service.h" +#include "share/throttle/ob_throttle_unit.h" #include "storage/ls/ob_ls.h" #include "storage/tablelock/ob_table_lock_common.h" #include "storage/tx/ob_trans_ctx_mgr.h" @@ -191,6 +192,12 @@ int ObLSTxService::get_write_store_ctx(ObTxDesc &tx, ret = OB_NOT_INIT; TRANS_LOG(WARN, "not init", K(ret)); } else { + int64_t abs_expire_ts = ObClockGenerator::getClock() + tx.get_timeout_us(); + if (abs_expire_ts < 0) { + abs_expire_ts = ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME; + } + + ObTxDataThrottleGuard tx_data_throttle_guard(false /* for_replay */, abs_expire_ts); ret = trans_service_->get_write_store_ctx(tx, snapshot, write_flag, store_ctx, spec_seq_no, false); } return ret; diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 6d8c44a2e..b6e6ac16e 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -287,6 +287,9 @@ int ObTxReplayExecutor::try_get_tx_ctx_() scheduler, INT64_MAX, /*trans_expired_time_*/ ls_tx_srv_->get_trans_service()); + ObTxDataThrottleGuard tx_data_throttle_guard( + true /* for_replay_ */, + ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME); if (OB_FAIL(ls_tx_srv_->create_tx_ctx(arg, tx_ctx_existed, ctx_))) { TRANS_LOG(WARN, "get_tx_ctx error", K(ret), K(tx_id), KP(ctx_)); } else { @@ -399,6 +402,9 @@ int ObTxReplayExecutor::replay_rollback_to_() const bool tx_queue = is_tx_log_replay_queue(); ObTxRollbackToLog log; const bool pre_barrier = base_header_.need_pre_replay_barrier(); + ObTxDataThrottleGuard tx_data_throttle_guard( + true /* for_replay_ */, + ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME); if (OB_FAIL(log_block_.deserialize_log_body(log))) { TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", KR(ret), "log_type", "RollbackTo", K(lsn_), K(log_ts_ns_)); diff --git a/unittest/storage/tx/it/test_register_mds.cpp b/unittest/storage/tx/it/test_register_mds.cpp index 24c013a14..0bc7e5c51 100644 --- a/unittest/storage/tx/it/test_register_mds.cpp +++ b/unittest/storage/tx/it/test_register_mds.cpp @@ -34,6 +34,7 @@ static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR; namespace share { ObMdsThrottleGuard::~ObMdsThrottleGuard() {} +ObTxDataThrottleGuard::~ObTxDataThrottleGuard() {} int ObTenantTxDataAllocator::init(const char *label) { diff --git a/unittest/storage/tx/it/test_tx.cpp b/unittest/storage/tx/it/test_tx.cpp index fd525a83c..f2a6a2c67 100644 --- a/unittest/storage/tx/it/test_tx.cpp +++ b/unittest/storage/tx/it/test_tx.cpp @@ -32,6 +32,9 @@ using namespace share; static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR; namespace share { + +ObTxDataThrottleGuard::~ObTxDataThrottleGuard() {} + int ObTenantTxDataAllocator::init(const char *label) { int ret = OB_SUCCESS;