[FIX] do throttle after creating tx ctx or rollback to savepoint in replay situation
This commit is contained in:
@ -96,16 +96,9 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a
|
|||||||
bool is_throttled = false;
|
bool is_throttled = false;
|
||||||
(void)throttle_tool_->alloc_resource<ObTenantTxDataAllocator>(
|
(void)throttle_tool_->alloc_resource<ObTenantTxDataAllocator>(
|
||||||
storage::TX_DATA_SLICE_SIZE, abs_expire_time, is_throttled);
|
storage::TX_DATA_SLICE_SIZE, abs_expire_time, is_throttled);
|
||||||
|
|
||||||
if (OB_UNLIKELY(is_throttled)) {
|
if (OB_UNLIKELY(is_throttled)) {
|
||||||
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
share::tx_data_throttled_alloc() += storage::TX_DATA_SLICE_SIZE;
|
||||||
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(storage::TX_DATA_SLICE_SIZE);
|
|
||||||
} else {
|
|
||||||
uint64_t timeout = 10000; // 10s
|
|
||||||
int64_t left_interval = abs_expire_time - ObClockGenerator::getClock();
|
|
||||||
common::ObWaitEventGuard wait_guard(
|
|
||||||
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
|
||||||
(void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,5 +107,45 @@ void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t a
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ObTxDataThrottleGuard::ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time)
|
||||||
|
: for_replay_(for_replay), abs_expire_time_(abs_expire_time)
|
||||||
|
{
|
||||||
|
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
|
||||||
|
}
|
||||||
|
|
||||||
|
ObTxDataThrottleGuard::~ObTxDataThrottleGuard()
|
||||||
|
{
|
||||||
|
ObThrottleInfoGuard share_ti_guard;
|
||||||
|
ObThrottleInfoGuard module_ti_guard;
|
||||||
|
|
||||||
|
if (OB_ISNULL(throttle_tool_)) {
|
||||||
|
MDS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "throttle tool is unexpected nullptr", KP(throttle_tool_));
|
||||||
|
} else if (throttle_tool_->is_throttling<ObTenantTxDataAllocator>(share_ti_guard, module_ti_guard)) {
|
||||||
|
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
|
||||||
|
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(
|
||||||
|
share::tx_data_throttled_alloc(), share_ti_guard, module_ti_guard);
|
||||||
|
|
||||||
|
if (module_ti_guard.is_valid()) {
|
||||||
|
module_ti_guard.throttle_info()->reset();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
uint64_t timeout = 10000; // 10s
|
||||||
|
int64_t left_interval = abs_expire_time_ - ObClockGenerator::getClock();
|
||||||
|
common::ObWaitEventGuard wait_guard(
|
||||||
|
common::ObWaitEventIds::MEMSTORE_MEM_PAGE_ALLOC_WAIT, timeout, 0, 0, left_interval);
|
||||||
|
(void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time_);
|
||||||
|
|
||||||
|
if (for_replay_) {
|
||||||
|
get_replay_is_writing_throttling() = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset tx data throttled alloc size
|
||||||
|
share::tx_data_throttled_alloc() = 0;
|
||||||
|
} else {
|
||||||
|
// do not need throttle, exit directly
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
@ -20,6 +20,12 @@
|
|||||||
namespace oceanbase {
|
namespace oceanbase {
|
||||||
namespace share {
|
namespace share {
|
||||||
|
|
||||||
|
OB_INLINE int64_t &tx_data_throttled_alloc()
|
||||||
|
{
|
||||||
|
RLOCAL_INLINE(int64_t, tx_data_throttled_alloc);
|
||||||
|
return tx_data_throttled_alloc;
|
||||||
|
}
|
||||||
|
|
||||||
class ObTenantTxDataAllocator {
|
class ObTenantTxDataAllocator {
|
||||||
public:
|
public:
|
||||||
using SliceAllocator = ObSliceAlloc;
|
using SliceAllocator = ObSliceAlloc;
|
||||||
@ -57,6 +63,18 @@ private:
|
|||||||
SliceAllocator slice_allocator_;
|
SliceAllocator slice_allocator_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class ObTxDataThrottleGuard
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ObTxDataThrottleGuard(const bool for_replay, const int64_t abs_expire_time);
|
||||||
|
~ObTxDataThrottleGuard();
|
||||||
|
|
||||||
|
private:
|
||||||
|
bool for_replay_;
|
||||||
|
int64_t abs_expire_time_;
|
||||||
|
share::TxShareThrottleTool *throttle_tool_;
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace share
|
} // namespace share
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
@ -200,7 +200,8 @@ int64_t ObShareResourceThrottleTool<FakeAllocator, Args...>::expected_wait_time(
|
|||||||
ALLOCATOR::throttle_unit_name(), \
|
ALLOCATOR::throttle_unit_name(), \
|
||||||
K(sleep_time), \
|
K(sleep_time), \
|
||||||
K(left_interval), \
|
K(left_interval), \
|
||||||
K(expected_wait_t)); \
|
K(expected_wait_t), \
|
||||||
|
K(abs_expire_time)); \
|
||||||
if (!has_printed_lbt) { \
|
if (!has_printed_lbt) { \
|
||||||
has_printed_lbt = true; \
|
has_printed_lbt = true; \
|
||||||
oceanbase::share::ObTaskController::get().allow_next_syslog(); \
|
oceanbase::share::ObTaskController::get().allow_next_syslog(); \
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#define USING_LOG_PREFIX TRANS
|
#define USING_LOG_PREFIX TRANS
|
||||||
|
|
||||||
#include "ob_ls_tx_service.h"
|
#include "ob_ls_tx_service.h"
|
||||||
|
#include "share/throttle/ob_throttle_unit.h"
|
||||||
#include "storage/ls/ob_ls.h"
|
#include "storage/ls/ob_ls.h"
|
||||||
#include "storage/tablelock/ob_table_lock_common.h"
|
#include "storage/tablelock/ob_table_lock_common.h"
|
||||||
#include "storage/tx/ob_trans_ctx_mgr.h"
|
#include "storage/tx/ob_trans_ctx_mgr.h"
|
||||||
@ -191,6 +192,12 @@ int ObLSTxService::get_write_store_ctx(ObTxDesc &tx,
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
TRANS_LOG(WARN, "not init", K(ret));
|
TRANS_LOG(WARN, "not init", K(ret));
|
||||||
} else {
|
} else {
|
||||||
|
int64_t abs_expire_ts = ObClockGenerator::getClock() + tx.get_timeout_us();
|
||||||
|
if (abs_expire_ts < 0) {
|
||||||
|
abs_expire_ts = ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME;
|
||||||
|
}
|
||||||
|
|
||||||
|
ObTxDataThrottleGuard tx_data_throttle_guard(false /* for_replay */, abs_expire_ts);
|
||||||
ret = trans_service_->get_write_store_ctx(tx, snapshot, write_flag, store_ctx, spec_seq_no, false);
|
ret = trans_service_->get_write_store_ctx(tx, snapshot, write_flag, store_ctx, spec_seq_no, false);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -287,6 +287,9 @@ int ObTxReplayExecutor::try_get_tx_ctx_()
|
|||||||
scheduler,
|
scheduler,
|
||||||
INT64_MAX, /*trans_expired_time_*/
|
INT64_MAX, /*trans_expired_time_*/
|
||||||
ls_tx_srv_->get_trans_service());
|
ls_tx_srv_->get_trans_service());
|
||||||
|
ObTxDataThrottleGuard tx_data_throttle_guard(
|
||||||
|
true /* for_replay_ */,
|
||||||
|
ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME);
|
||||||
if (OB_FAIL(ls_tx_srv_->create_tx_ctx(arg, tx_ctx_existed, ctx_))) {
|
if (OB_FAIL(ls_tx_srv_->create_tx_ctx(arg, tx_ctx_existed, ctx_))) {
|
||||||
TRANS_LOG(WARN, "get_tx_ctx error", K(ret), K(tx_id), KP(ctx_));
|
TRANS_LOG(WARN, "get_tx_ctx error", K(ret), K(tx_id), KP(ctx_));
|
||||||
} else {
|
} else {
|
||||||
@ -399,6 +402,9 @@ int ObTxReplayExecutor::replay_rollback_to_()
|
|||||||
const bool tx_queue = is_tx_log_replay_queue();
|
const bool tx_queue = is_tx_log_replay_queue();
|
||||||
ObTxRollbackToLog log;
|
ObTxRollbackToLog log;
|
||||||
const bool pre_barrier = base_header_.need_pre_replay_barrier();
|
const bool pre_barrier = base_header_.need_pre_replay_barrier();
|
||||||
|
ObTxDataThrottleGuard tx_data_throttle_guard(
|
||||||
|
true /* for_replay_ */,
|
||||||
|
ObClockGenerator::getClock() + share::ObThrottleUnit<ObTenantTxDataAllocator>::DEFAULT_MAX_THROTTLE_TIME);
|
||||||
if (OB_FAIL(log_block_.deserialize_log_body(log))) {
|
if (OB_FAIL(log_block_.deserialize_log_body(log))) {
|
||||||
TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", KR(ret), "log_type", "RollbackTo",
|
TRANS_LOG(WARN, "[Replay Tx] deserialize log body error", KR(ret), "log_type", "RollbackTo",
|
||||||
K(lsn_), K(log_ts_ns_));
|
K(lsn_), K(log_ts_ns_));
|
||||||
|
@ -34,6 +34,7 @@ static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
|
|||||||
namespace share {
|
namespace share {
|
||||||
|
|
||||||
ObMdsThrottleGuard::~ObMdsThrottleGuard() {}
|
ObMdsThrottleGuard::~ObMdsThrottleGuard() {}
|
||||||
|
ObTxDataThrottleGuard::~ObTxDataThrottleGuard() {}
|
||||||
|
|
||||||
int ObTenantTxDataAllocator::init(const char *label)
|
int ObTenantTxDataAllocator::init(const char *label)
|
||||||
{
|
{
|
||||||
|
@ -32,6 +32,9 @@ using namespace share;
|
|||||||
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
|
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
|
||||||
|
|
||||||
namespace share {
|
namespace share {
|
||||||
|
|
||||||
|
ObTxDataThrottleGuard::~ObTxDataThrottleGuard() {}
|
||||||
|
|
||||||
int ObTenantTxDataAllocator::init(const char *label)
|
int ObTenantTxDataAllocator::init(const char *label)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
Reference in New Issue
Block a user