diff --git a/src/share/allocator/ob_mds_allocator.cpp b/src/share/allocator/ob_mds_allocator.cpp index e14ad08d41..a901996c0b 100644 --- a/src/share/allocator/ob_mds_allocator.cpp +++ b/src/share/allocator/ob_mds_allocator.cpp @@ -17,6 +17,7 @@ #include "share/throttle/ob_share_throttle_define.h" #include "storage/multi_data_source/runtime_utility/mds_tenant_service.h" #include "storage/tx_storage/ob_tenant_freezer.h" +#include "storage/tx_storage/ob_ls_service.h" using namespace oceanbase::storage::mds; @@ -171,7 +172,8 @@ void ObTenantBufferCtxAllocator::free(void *ptr) MTL(ObTenantMdsService*)->erase_alloc_backtrace(ptr); } -ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time) +ObMdsThrottleGuard::ObMdsThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time) + : ls_id_(ls_id), for_replay_(for_replay), abs_expire_time_(abs_expire_time) { throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool()); if (0 == abs_expire_time) { @@ -183,14 +185,29 @@ ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_ ObMdsThrottleGuard::~ObMdsThrottleGuard() { + int ret = OB_SUCCESS; + ObLSHandle ls_handle; ObThrottleInfoGuard share_ti_guard; ObThrottleInfoGuard module_ti_guard; if (OB_ISNULL(throttle_tool_)) { MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_)); } else if (throttle_tool_->is_throttling(share_ti_guard, module_ti_guard)) { - (void)TxShareMemThrottleUtil::do_throttle( - for_replay_, abs_expire_time_, share::mds_throttled_alloc(), *throttle_tool_, share_ti_guard, module_ti_guard); + + if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { + STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id_)); + } else if (OB_ISNULL(ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "get ls handle failed", KR(ret), K(ls_id_)); + } else { + (void)TxShareMemThrottleUtil::do_throttle(for_replay_, + abs_expire_time_, + share::mds_throttled_alloc(), + *(ls_handle.get_ls()), + *throttle_tool_, + share_ti_guard, + module_ti_guard); + } if (throttle_tool_->still_throttling(share_ti_guard, module_ti_guard)) { (void)throttle_tool_->skip_throttle( diff --git a/src/share/allocator/ob_mds_allocator.h b/src/share/allocator/ob_mds_allocator.h index 31de9324f4..1d572dc007 100644 --- a/src/share/allocator/ob_mds_allocator.h +++ b/src/share/allocator/ob_mds_allocator.h @@ -15,9 +15,9 @@ #include "lib/allocator/ob_vslice_alloc.h" #include "share/throttle/ob_share_throttle_define.h" +#include "share/ob_ls_id.h" namespace oceanbase { - namespace share { OB_INLINE int64_t &mds_throttled_alloc() @@ -64,10 +64,11 @@ struct ObTenantBufferCtxAllocator : public ObIAllocator// for now, it is just a class ObMdsThrottleGuard { public: - ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time); + ObMdsThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time); ~ObMdsThrottleGuard(); private: + const share::ObLSID ls_id_; bool for_replay_; int64_t abs_expire_time_; share::TxShareThrottleTool *throttle_tool_; diff --git a/src/share/allocator/ob_shared_memory_allocator_mgr.h b/src/share/allocator/ob_shared_memory_allocator_mgr.h index 9cf88bc22c..1cdc15dbe7 100644 --- a/src/share/allocator/ob_shared_memory_allocator_mgr.h +++ b/src/share/allocator/ob_shared_memory_allocator_mgr.h @@ -19,6 +19,7 @@ #include "share/throttle/ob_share_resource_throttle_tool.h" #include "share/rc/ob_tenant_base.h" #include "storage/tx_storage/ob_tenant_freezer.h" +#include "storage/ls/ob_ls.h" namespace oceanbase { namespace share { @@ -92,6 +93,7 @@ public: static int do_throttle(const bool for_replay, const int64_t abs_expire_time, const int64_t throttle_memory_size, + const ObLS &ls, TxShareThrottleTool &throttle_tool, ObThrottleInfoGuard &share_ti_guard, ObThrottleInfoGuard &module_ti_guard) @@ -112,8 +114,8 @@ public: while (throttle_tool.still_throttling(share_ti_guard, module_ti_guard) && (left_interval > 0)) { int64_t expected_wait_time = 0; - if (for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) { - // skip throttle if ls freeze exists + if ((for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) || ls.is_offline()) { + // skip throttle if : 1) throttle need skipping; 2) this logstream offline break; } else if ((expected_wait_time = throttle_tool.expected_wait_time(share_ti_guard, module_ti_guard)) <= 0) { diff --git a/src/share/allocator/ob_tx_data_allocator.cpp b/src/share/allocator/ob_tx_data_allocator.cpp index a1f10d051c..2b84ff15d6 100644 --- a/src/share/allocator/ob_tx_data_allocator.cpp +++ b/src/share/allocator/ob_tx_data_allocator.cpp @@ -16,6 +16,7 @@ #include "share/allocator/ob_shared_memory_allocator_mgr.h" #include "share/rc/ob_tenant_base.h" +#include "storage/tx_storage/ob_ls_service.h" #include "storage/tx/ob_tx_data_define.h" #include "storage/tx_storage/ob_tenant_freezer.h" @@ -109,8 +110,10 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a return res; } -ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time) - : for_replay_(for_replay), abs_expire_time_(abs_expire_time) +ObTxDataThrottleGuard::ObTxDataThrottleGuard(const ObLSID ls_id, + const bool for_replay, + const int64_t abs_expire_time) + : ls_id_(ls_id), for_replay_(for_replay), abs_expire_time_(abs_expire_time) { throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool()); if (0 == abs_expire_time) { @@ -122,18 +125,28 @@ ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_ ObTxDataThrottleGuard::~ObTxDataThrottleGuard() { + int ret = OB_SUCCESS; + ObLSHandle ls_handle; ObThrottleInfoGuard share_ti_guard; ObThrottleInfoGuard module_ti_guard; if (OB_ISNULL(throttle_tool_)) { MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_)); } else if (throttle_tool_->is_throttling(share_ti_guard, module_ti_guard)) { - (void)TxShareMemThrottleUtil::do_throttle(for_replay_, - abs_expire_time_, - share::tx_data_throttled_alloc(), - *throttle_tool_, - share_ti_guard, - module_ti_guard); + if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) { + STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id_)); + } else if (OB_ISNULL(ls_handle.get_ls())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "get ls handle failed", KR(ret), K(ls_id_)); + } else { + (void)TxShareMemThrottleUtil::do_throttle(for_replay_, + abs_expire_time_, + share::tx_data_throttled_alloc(), + *(ls_handle.get_ls()), + *throttle_tool_, + share_ti_guard, + module_ti_guard); + } if (throttle_tool_->still_throttling(share_ti_guard, module_ti_guard)) { (void)throttle_tool_->skip_throttle( diff --git a/src/share/allocator/ob_tx_data_allocator.h b/src/share/allocator/ob_tx_data_allocator.h index 56c3bf8cbd..e32e737c03 100644 --- a/src/share/allocator/ob_tx_data_allocator.h +++ b/src/share/allocator/ob_tx_data_allocator.h @@ -14,12 +14,14 @@ #define OCEANBASE_ALLOCATOR_OB_TX_DATA_ALLOCATOR_H_ #include "lib/allocator/ob_slice_alloc.h" -#include "share/ob_delegate.h" -#include "share/throttle/ob_share_throttle_define.h" #include "lib/allocator/ob_vslice_alloc.h" +#include "share/ob_delegate.h" +#include "share/ob_ls_id.h" +#include "share/throttle/ob_share_throttle_define.h" namespace oceanbase { namespace share { +class ObLSID; OB_INLINE int64_t &tx_data_throttled_alloc() { @@ -67,10 +69,11 @@ private: class ObTxDataThrottleGuard { public: - ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time); + ObTxDataThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time); ~ObTxDataThrottleGuard(); private: + const share::ObLSID ls_id_; bool for_replay_; int64_t abs_expire_time_; share::TxShareThrottleTool *throttle_tool_; diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp index fd8be1fd97..85085da5eb 100644 --- a/src/storage/ls/ob_ls_tx_service.cpp +++ b/src/storage/ls/ob_ls_tx_service.cpp @@ -252,7 +252,7 @@ int ObLSTxService::get_write_store_ctx(ObTxDesc &tx, abs_expire_ts = ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME; } - ObTxDataThrottleGuard tx_data_throttle_guard(false /* for_replay */, abs_expire_ts); + ObTxDataThrottleGuard tx_data_throttle_guard(ls_id_, false /* for_replay */, abs_expire_ts); ret = trans_service_->get_write_store_ctx(tx, snapshot, write_flag, store_ctx, spec_seq_no, false); } return ret; diff --git a/src/storage/ob_storage_table_guard.cpp b/src/storage/ob_storage_table_guard.cpp index 1f37afe9f2..dd739f63db 100644 --- a/src/storage/ob_storage_table_guard.cpp +++ b/src/storage/ob_storage_table_guard.cpp @@ -74,12 +74,21 @@ void ObStorageTableGuard::throttle_if_needed_() // only do throttle on active memtable if (OB_NOT_NULL(memtable_) && memtable_->is_active_memtable()) { reset(); - (void)TxShareMemThrottleUtil::do_throttle(for_replay_, - store_ctx_.timeout_, - share::memstore_throttled_alloc(), - throttle_tool, - share_ti_guard, - module_ti_guard); + ObLSHandle ls_handle; + ObLS *ls = nullptr; + const ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_; + if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) { + STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id)); + } else if (OB_ISNULL(ls = ls_handle.get_ls())) { + } else { + (void)TxShareMemThrottleUtil::do_throttle(for_replay_, + store_ctx_.timeout_, + share::memstore_throttled_alloc(), + *ls, + throttle_tool, + share_ti_guard, + module_ti_guard); + } } // if throttle is skipped due to some reasons, advance clock by call skip_throttle() and clean throttle status diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index 23f98c7721..6608048aa8 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -979,7 +979,7 @@ int ObTransService::register_mds_into_ctx_(ObTxDesc &tx_desc, TRANS_LOG(WARN, "get store ctx failed", KR(ret), K(tx_desc), K(ls_id)); } else { ObPartTransCtx *ctx = store_ctx.mvcc_acc_ctx_.tx_ctx_; - ObMdsThrottleGuard mds_throttle_guard(false/* for_replay */, ctx->get_trans_expired_time()); + ObMdsThrottleGuard mds_throttle_guard(ls_id, false /* for_replay */, ctx->get_trans_expired_time()); if (OB_ISNULL(ctx)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected null ptr", KR(ret), K(tx_desc), K(ls_id), K(type)); diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 13594d2236..bdf79986cb 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -301,6 +301,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_() INT64_MAX, /*trans_expired_time_*/ ls_tx_srv_->get_trans_service()); ObTxDataThrottleGuard tx_data_throttle_guard( + ls_id_, true /* for_replay_ */, ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME); if (OB_FAIL(ls_tx_srv_->create_tx_ctx(arg, tx_ctx_existed, ctx_))) { @@ -418,6 +419,7 @@ int ObTxReplayExecutor::replay_rollback_to_() ObTxRollbackToLog log; const bool pre_barrier = base_header_.need_pre_replay_barrier(); ObTxDataThrottleGuard tx_data_throttle_guard( + ls_id_, true /* for_replay_ */, ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME); if (OB_FAIL(log_block_.deserialize_log_body(log))) { @@ -466,7 +468,8 @@ int ObTxReplayExecutor::replay_multi_source_data_() int ret = OB_SUCCESS; ObTxMultiDataSourceLog log; - ObMdsThrottleGuard mds_throttle_guard(true /* for_replay */, + ObMdsThrottleGuard mds_throttle_guard(ls_id_, + true /* for_replay */, ObClockGenerator::getClock() + share::ObThrottleUnit::DEFAULT_MAX_THROTTLE_TIME);