quit throttle if ls offline
This commit is contained in:
@ -17,6 +17,7 @@
|
||||
#include "share/throttle/ob_share_throttle_define.h"
|
||||
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
|
||||
using namespace oceanbase::storage::mds;
|
||||
|
||||
@ -171,7 +172,8 @@ void ObTenantBufferCtxAllocator::free(void *ptr)
|
||||
MTL(ObTenantMdsService*)->erase_alloc_backtrace(ptr);
|
||||
}
|
||||
|
||||
ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time) : for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
ObMdsThrottleGuard::ObMdsThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time)
|
||||
: ls_id_(ls_id), for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
{
|
||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||
if (0 == abs_expire_time) {
|
||||
@ -183,14 +185,29 @@ ObMdsThrottleGuard::ObMdsThrottleGuard(const bool for_replay, const int64_t abs_
|
||||
|
||||
ObMdsThrottleGuard::~ObMdsThrottleGuard()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObThrottleInfoGuard share_ti_guard;
|
||||
ObThrottleInfoGuard module_ti_guard;
|
||||
|
||||
if (OB_ISNULL(throttle_tool_)) {
|
||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||
} else if (throttle_tool_->is_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantMdsAllocator>(
|
||||
for_replay_, abs_expire_time_, share::mds_throttled_alloc(), *throttle_tool_, share_ti_guard, module_ti_guard);
|
||||
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id_));
|
||||
} else if (OB_ISNULL(ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "get ls handle failed", KR(ret), K(ls_id_));
|
||||
} else {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantMdsAllocator>(for_replay_,
|
||||
abs_expire_time_,
|
||||
share::mds_throttled_alloc(),
|
||||
*(ls_handle.get_ls()),
|
||||
*throttle_tool_,
|
||||
share_ti_guard,
|
||||
module_ti_guard);
|
||||
}
|
||||
|
||||
if (throttle_tool_->still_throttling<ObTenantMdsAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(
|
||||
|
@ -15,9 +15,9 @@
|
||||
|
||||
#include "lib/allocator/ob_vslice_alloc.h"
|
||||
#include "share/throttle/ob_share_throttle_define.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
|
||||
namespace oceanbase {
|
||||
|
||||
namespace share {
|
||||
|
||||
OB_INLINE int64_t &mds_throttled_alloc()
|
||||
@ -64,10 +64,11 @@ struct ObTenantBufferCtxAllocator : public ObIAllocator// for now, it is just a
|
||||
class ObMdsThrottleGuard
|
||||
{
|
||||
public:
|
||||
ObMdsThrottleGuard(const bool for_replay, const int64_t abs_expire_time);
|
||||
ObMdsThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time);
|
||||
~ObMdsThrottleGuard();
|
||||
|
||||
private:
|
||||
const share::ObLSID ls_id_;
|
||||
bool for_replay_;
|
||||
int64_t abs_expire_time_;
|
||||
share::TxShareThrottleTool *throttle_tool_;
|
||||
|
@ -19,6 +19,7 @@
|
||||
#include "share/throttle/ob_share_resource_throttle_tool.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
#include "storage/ls/ob_ls.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace share {
|
||||
@ -92,6 +93,7 @@ public:
|
||||
static int do_throttle(const bool for_replay,
|
||||
const int64_t abs_expire_time,
|
||||
const int64_t throttle_memory_size,
|
||||
const ObLS &ls,
|
||||
TxShareThrottleTool &throttle_tool,
|
||||
ObThrottleInfoGuard &share_ti_guard,
|
||||
ObThrottleInfoGuard &module_ti_guard)
|
||||
@ -112,8 +114,8 @@ public:
|
||||
while (throttle_tool.still_throttling<ALLOCATOR>(share_ti_guard, module_ti_guard) &&
|
||||
(left_interval > 0)) {
|
||||
int64_t expected_wait_time = 0;
|
||||
if (for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) {
|
||||
// skip throttle if ls freeze exists
|
||||
if ((for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) || ls.is_offline()) {
|
||||
// skip throttle if : 1) throttle need skipping; 2) this logstream offline
|
||||
break;
|
||||
} else if ((expected_wait_time =
|
||||
throttle_tool.expected_wait_time<ALLOCATOR>(share_ti_guard, module_ti_guard)) <= 0) {
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
|
||||
#include "share/rc/ob_tenant_base.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
#include "storage/tx/ob_tx_data_define.h"
|
||||
#include "storage/tx_storage/ob_tenant_freezer.h"
|
||||
|
||||
@ -109,8 +110,10 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a
|
||||
return res;
|
||||
}
|
||||
|
||||
ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time)
|
||||
: for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
ObTxDataThrottleGuard::ObTxDataThrottleGuard(const ObLSID ls_id,
|
||||
const bool for_replay,
|
||||
const int64_t abs_expire_time)
|
||||
: ls_id_(ls_id), for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||
{
|
||||
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||
if (0 == abs_expire_time) {
|
||||
@ -122,18 +125,28 @@ ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_
|
||||
|
||||
ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
ObThrottleInfoGuard share_ti_guard;
|
||||
ObThrottleInfoGuard module_ti_guard;
|
||||
|
||||
if (OB_ISNULL(throttle_tool_)) {
|
||||
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||
} else if (throttle_tool_->is_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantTxDataAllocator>(for_replay_,
|
||||
abs_expire_time_,
|
||||
share::tx_data_throttled_alloc(),
|
||||
*throttle_tool_,
|
||||
share_ti_guard,
|
||||
module_ti_guard);
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id_));
|
||||
} else if (OB_ISNULL(ls_handle.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "get ls handle failed", KR(ret), K(ls_id_));
|
||||
} else {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObTenantTxDataAllocator>(for_replay_,
|
||||
abs_expire_time_,
|
||||
share::tx_data_throttled_alloc(),
|
||||
*(ls_handle.get_ls()),
|
||||
*throttle_tool_,
|
||||
share_ti_guard,
|
||||
module_ti_guard);
|
||||
}
|
||||
|
||||
if (throttle_tool_->still_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(
|
||||
|
@ -14,12 +14,14 @@
|
||||
#define OCEANBASE_ALLOCATOR_OB_TX_DATA_ALLOCATOR_H_
|
||||
|
||||
#include "lib/allocator/ob_slice_alloc.h"
|
||||
#include "share/ob_delegate.h"
|
||||
#include "share/throttle/ob_share_throttle_define.h"
|
||||
#include "lib/allocator/ob_vslice_alloc.h"
|
||||
#include "share/ob_delegate.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include "share/throttle/ob_share_throttle_define.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace share {
|
||||
class ObLSID;
|
||||
|
||||
OB_INLINE int64_t &tx_data_throttled_alloc()
|
||||
{
|
||||
@ -67,10 +69,11 @@ private:
|
||||
class ObTxDataThrottleGuard
|
||||
{
|
||||
public:
|
||||
ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time);
|
||||
ObTxDataThrottleGuard(const share::ObLSID ls_id, const bool for_replay, const int64_t abs_expire_time);
|
||||
~ObTxDataThrottleGuard();
|
||||
|
||||
private:
|
||||
const share::ObLSID ls_id_;
|
||||
bool for_replay_;
|
||||
int64_t abs_expire_time_;
|
||||
share::TxShareThrottleTool *throttle_tool_;
|
||||
|
@ -252,7 +252,7 @@ int ObLSTxService::get_write_store_ctx(ObTxDesc &tx,
|
||||
abs_expire_ts = ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME;
|
||||
}
|
||||
|
||||
ObTxDataThrottleGuard tx_data_throttle_guard(false /* for_replay */, abs_expire_ts);
|
||||
ObTxDataThrottleGuard tx_data_throttle_guard(ls_id_, false /* for_replay */, abs_expire_ts);
|
||||
ret = trans_service_->get_write_store_ctx(tx, snapshot, write_flag, store_ctx, spec_seq_no, false);
|
||||
}
|
||||
return ret;
|
||||
|
@ -74,12 +74,21 @@ void ObStorageTableGuard::throttle_if_needed_()
|
||||
// only do throttle on active memtable
|
||||
if (OB_NOT_NULL(memtable_) && memtable_->is_active_memtable()) {
|
||||
reset();
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObMemstoreAllocator>(for_replay_,
|
||||
store_ctx_.timeout_,
|
||||
share::memstore_throttled_alloc(),
|
||||
throttle_tool,
|
||||
share_ti_guard,
|
||||
module_ti_guard);
|
||||
ObLSHandle ls_handle;
|
||||
ObLS *ls = nullptr;
|
||||
const ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
STORAGE_LOG(WARN, "get ls handle failed", KR(ret), K(ls_id));
|
||||
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
|
||||
} else {
|
||||
(void)TxShareMemThrottleUtil::do_throttle<ObMemstoreAllocator>(for_replay_,
|
||||
store_ctx_.timeout_,
|
||||
share::memstore_throttled_alloc(),
|
||||
*ls,
|
||||
throttle_tool,
|
||||
share_ti_guard,
|
||||
module_ti_guard);
|
||||
}
|
||||
}
|
||||
|
||||
// if throttle is skipped due to some reasons, advance clock by call skip_throttle() and clean throttle status
|
||||
|
@ -979,7 +979,7 @@ int ObTransService::register_mds_into_ctx_(ObTxDesc &tx_desc,
|
||||
TRANS_LOG(WARN, "get store ctx failed", KR(ret), K(tx_desc), K(ls_id));
|
||||
} else {
|
||||
ObPartTransCtx *ctx = store_ctx.mvcc_acc_ctx_.tx_ctx_;
|
||||
ObMdsThrottleGuard mds_throttle_guard(false/* for_replay */, ctx->get_trans_expired_time());
|
||||
ObMdsThrottleGuard mds_throttle_guard(ls_id, false /* for_replay */, ctx->get_trans_expired_time());
|
||||
if (OB_ISNULL(ctx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "unexpected null ptr", KR(ret), K(tx_desc), K(ls_id), K(type));
|
||||
|
@ -301,6 +301,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_()
|
||||
INT64_MAX, /*trans_expired_time_*/
|
||||
ls_tx_srv_->get_trans_service());
|
||||
ObTxDataThrottleGuard tx_data_throttle_guard(
|
||||
ls_id_,
|
||||
true /* for_replay_ */,
|
||||
ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME);
|
||||
if (OB_FAIL(ls_tx_srv_->create_tx_ctx(arg, tx_ctx_existed, ctx_))) {
|
||||
@ -418,6 +419,7 @@ int ObTxReplayExecutor::replay_rollback_to_()
|
||||
ObTxRollbackToLog log;
|
||||
const bool pre_barrier = base_header_.need_pre_replay_barrier();
|
||||
ObTxDataThrottleGuard tx_data_throttle_guard(
|
||||
ls_id_,
|
||||
true /* for_replay_ */,
|
||||
ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME);
|
||||
if (OB_FAIL(log_block_.deserialize_log_body(log))) {
|
||||
@ -466,7 +468,8 @@ int ObTxReplayExecutor::replay_multi_source_data_()
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxMultiDataSourceLog log;
|
||||
|
||||
ObMdsThrottleGuard mds_throttle_guard(true /* for_replay */,
|
||||
ObMdsThrottleGuard mds_throttle_guard(ls_id_,
|
||||
true /* for_replay */,
|
||||
ObClockGenerator::getClock() +
|
||||
share::ObThrottleUnit<ObTenantMdsAllocator>::DEFAULT_MAX_THROTTLE_TIME);
|
||||
|
||||
|
Reference in New Issue
Block a user