[FIX] optimize freeze skip throttle && freeze pend replay

This commit is contained in:
ZenoWang 2024-07-10 10:54:20 +00:00 committed by ob-robot
parent 60cd981db6
commit 806e30570b
13 changed files with 430 additions and 177 deletions

View File

@ -77,9 +77,6 @@ namespace storage
bool ObFreezer::is_freeze(uint32_t freeze_flag) const
{
ob_usleep(rand() % SLEEP_TIME);
if (freeze_flag == UINT32_MAX) {
freeze_flag = (ATOMIC_LOAD(&freeze_flag_));
}
return 1 == (freeze_flag >> 31);
}
namespace checkpoint

View File

@ -112,7 +112,7 @@ public:
while (throttle_tool.still_throttling<ALLOCATOR>(share_ti_guard, module_ti_guard) &&
(left_interval > 0)) {
int64_t expected_wait_time = 0;
if (for_replay && MTL(ObTenantFreezer *)->exist_ls_freezing()) {
if (for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) {
// skip throttle if ls freeze exists
break;
} else if ((expected_wait_time =

View File

@ -83,6 +83,9 @@ public:
template <typename ALLOCATOR>
void alloc_resource(const int64_t resource_size, const int64_t abs_expire_time, bool &is_throttled);
template <typename ALLOCATOR>
bool has_triggered_throttle();
template <typename ALLOCATOR>
bool is_throttling(ObThrottleInfoGuard &share_ti_guard, ObThrottleInfoGuard &module_ti_guard);

View File

@ -122,6 +122,24 @@ void ObShareResourceThrottleTool<FakeAllocator, Args...>::alloc_resource(const i
}
}
/**
 * @brief Tell whether throttling has been triggered for either the shared throttle unit or the
 *        throttle unit of ALLOCATOR, judged only from the resources currently held.
 *        Nothing is allocated by this call.
 *
 * @tparam ALLOCATOR the module allocator whose throttle unit is checked
 * @return true if the shared unit or the module unit reports that its trigger has been reached
 */
template <typename FakeAllocator, typename... Args>
template <typename ALLOCATOR>
bool ObShareResourceThrottleTool<FakeAllocator, Args...>::has_triggered_throttle()
{
  // NOTE(review): these macros are defined outside this view; they presumably bind
  // `share_throttle_unit`, `module_throttle_unit` and `allocator` as local names - confirm.
  ACQUIRE_THROTTLE_UNIT(FakeAllocator, share_throttle_unit);
  ACQUIRE_UNIT_ALLOCATOR(ALLOCATOR, module_throttle_unit, allocator);

  // The shared unit is judged against the sum accumulated by the functor over all modules.
  SumModuleHoldResourceFunctor sum_hold_func;
  (void)module_throttle_tuple_.for_each(sum_hold_func);

  const bool share_throttled = share_throttle_unit.has_triggered_throttle(sum_hold_func.sum_);
  // The module unit is judged against what this module's allocator holds right now.
  const bool module_throttled = module_throttle_unit.has_triggered_throttle(allocator->hold());
  return share_throttled || module_throttled;
}
template <typename FakeAllocator, typename... Args>
template <typename ALLOCATOR>
bool ObShareResourceThrottleTool<FakeAllocator, Args...>::is_throttling(ObThrottleInfoGuard &share_ti_guard,

View File

@ -97,6 +97,13 @@ public:
const int64_t abs_expire_time,
bool &is_throttled);
/**
 * @brief Check whether this throttle unit has triggered throttling, without allocating any resource.
 * ATTENTION : This function is different from is_throttling(). is_throttling() only checks if the current
 * thread is throttling, but this function checks if this tenant is throttling.
 *
 * @param[in] holding_resource The amount of resource currently held, which is checked against the trigger.
 * @return true if throttling has been triggered for the given holding amount.
 */
bool has_triggered_throttle(const int64_t holding_resource);
/**
* @brief Check if this throttle unit is throttling status.
*

View File

@ -101,6 +101,27 @@ int ObThrottleUnit<ALLOCATOR>::alloc_resource(const int64_t holding_size,
return ret;
}
/**
 * @brief Decide whether holding_size exceeds this unit's throttle trigger.
 *
 * The trigger is resource_limit_ * throttle_trigger_percentage_ / 100. A percentage of 100 or
 * more never reports throttling. A negative holding_size or a non-positive percentage is logged
 * as an error and reported as "triggered".
 *
 * @param[in] holding_size amount of resource currently held
 * @return true if throttling is considered triggered
 */
template <typename ALLOCATOR>
bool ObThrottleUnit<ALLOCATOR>::has_triggered_throttle(const int64_t holding_size)
{
  // `ret` is kept in scope for the ERROR-level log macro below.
  int ret = OB_SUCCESS;
  bool is_triggered = false;
  int64_t trigger_percentage = throttle_trigger_percentage_;

  if (OB_UNLIKELY(trigger_percentage >= 100)) {
    // a trigger percentage of 100 or above never reports throttling
  } else if (OB_UNLIKELY(holding_size < 0 || trigger_percentage <= 0)) {
    // invalid input : report "triggered" and leave an error in the log
    is_triggered = true;
    SHARE_LOG(ERROR, "invalid arguments", K(holding_size), K(resource_limit_), K(trigger_percentage));
  } else {
    const int64_t trigger_threshold = resource_limit_ * trigger_percentage / 100;
    is_triggered = (holding_size > trigger_threshold);
  }
  return is_triggered;
}
template <typename ALLOCATOR>
bool ObThrottleUnit<ALLOCATOR>::is_throttling(ObThrottleInfoGuard &ti_guard)
{

View File

@ -14,6 +14,7 @@
#include "common/ob_tablet_id.h"
#include "logservice/ob_log_service.h"
#include "share/ob_force_print_log.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "storage/ls/ob_ls.h"
#include "storage/ls/ob_freezer.h"
#include "storage/ls/ob_ls_tx_service.h"
@ -389,6 +390,8 @@ ObFreezer::ObFreezer()
enable_(false),
is_inited_(false),
is_async_tablet_freeze_task_running_(false),
throttle_is_skipping_(false),
tenant_replay_is_pending_(false),
async_freeze_tablets_() {}
ObFreezer::ObFreezer(ObLS *ls)
@ -406,6 +409,8 @@ ObFreezer::ObFreezer(ObLS *ls)
enable_(false),
is_inited_(false),
is_async_tablet_freeze_task_running_(false),
throttle_is_skipping_(false),
tenant_replay_is_pending_(false),
async_freeze_tablets_() {}
ObFreezer::~ObFreezer()
@ -429,6 +434,8 @@ void ObFreezer::reset()
async_freeze_tablets_.reuse();
is_inited_ = false;
is_async_tablet_freeze_task_running_ = false;
throttle_is_skipping_ = false;
tenant_replay_is_pending_ = false;
}
int ObFreezer::init(ObLS *ls)
@ -452,6 +459,8 @@ int ObFreezer::init(ObLS *ls)
pend_replay_cnt_ = 0;
need_resubmit_log_ = false;
is_async_tablet_freeze_task_running_ = false;
throttle_is_skipping_ = false;
tenant_replay_is_pending_ = false;
is_inited_ = true;
}
@ -569,50 +578,82 @@ void ObFreezer::submit_checkpoint_task()
int ObFreezer::wait_ls_freeze_finish()
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObClockGenerator::getClock();
int64_t last_submit_log_time = start;
const share::ObLSID ls_id = get_ls_id();
const int64_t start_time = ObClockGenerator::getClock();
uint32_t freeze_clock = get_freeze_clock();
PendTenantReplayGuard pend_replay_guard;
TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ls_id), K(freeze_clock));
{
PendTenantReplayHelper pend_replay_helper(*this, ls_);
(void)pend_replay_helper.set_skip_throttle_flag();
// wait till all memtables are moved from frozen_list to prepare_list
// this means that all memtables can be dumped
while (!get_ls_data_checkpoint()->ls_freeze_finished()) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log() ||
// In order to prevent the txn has already passed the try_submit test
// while failing to submit some logs due to an unexpected bug, we need
// retry to submit the log to go around the above case
(ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) {
last_submit_log_time = ObTimeUtility::current_time();
TRANS_LOG(INFO,
"[Freezer] wait freeze : Logstream ",
K(ls_id),
K(freeze_clock),
K(throttle_is_skipping_),
K(tenant_replay_is_pending_));
(void)submit_log_for_freeze(false/*tablet freeze*/, false/*try*/);
TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id));
}
const int64_t cost_time = ObClockGenerator::getClock() - start;
if (cost_time > 5 * 1000 * 1000) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] finish ls_freeze costs too much time",
K(ls_id), K(cost_time));
stat_.add_diagnose_info("finish ls_freeze costs too much time");
// wait till all memtables are moved from frozen_list to prepare_list
// this means that all memtables can be dumped
int64_t time_counter = 0;
while (!get_ls_data_checkpoint()->ls_freeze_finished()) {
if (TC_REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) {
++time_counter;
// check pend condition every second
(void)pend_replay_helper.check_pend_condition_once();
// check resubmit log condition and report some debug info every 5 seconds
if (time_counter >= 5 && time_counter % 5 == 0) {
(void)resubmit_log_if_needed_(start_time, false /* is_tablet_freeze */, false /* is_try */);
}
}
ob_usleep(100);
}
ob_usleep(100);
stat_.add_diagnose_info("logstream_freeze success");
stat_.end_set_freeze_stat(ObFreezeState::FINISH, ObClockGenerator::getClock(), ret);
unset_freeze_();
const int64_t wait_freeze_finish_spend_time = ObClockGenerator::getClock() - start_time;
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock), K(wait_freeze_finish_spend_time));
}
stat_.add_diagnose_info("logstream_freeze success");
stat_.end_set_freeze_stat(ObFreezeState::FINISH, ObClockGenerator::getClock(), ret);
unset_freeze_();
const int64_t wait_freeze_finish_spend_time = ObClockGenerator::getClock() - start;
FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock), K(wait_freeze_finish_spend_time));
// freeze tx data out of PendTenantReplayGuard
(void)try_freeze_tx_data_();
return ret;
}
// Resubmit the freeze log if required, then report that waiting for the freeze takes too long.
//
// The log is resubmitted when need_resubmit_log() is set, or when at least one minute has passed
// since start_time. A WARN log and a diagnose info are always recorded by this call.
//
// @param [in] start_time        clock value taken when the caller started waiting
// @param [in] is_tablet_freeze  forwarded to submit_log_for_freeze() and printed in the log
// @param [in] is_try            forwarded to submit_log_for_freeze() and printed in the log
// @param [in] memtable          only printed in the log
void ObFreezer::resubmit_log_if_needed_(const int64_t start_time,
                                        const bool is_tablet_freeze,
                                        const bool is_try,
                                        const ObITabletMemtable *memtable)
{
  const share::ObLSID ls_id = get_ls_id();

  // NOTE(review): no "last submit time" survives between calls, so the one-minute fallback is
  // measured from start_time. Once the wait exceeds one minute the log is resubmitted on every
  // call instead of once per minute. Rate limiting would need state owned by the caller.
  if (need_resubmit_log() ||
      // In order to prevent the txn has already passed the try_submit test
      // while failing to submit some logs due to an unexpected bug, we need
      // retry to submit the log to go around the above case
      (ObClockGenerator::getClock() - start_time >= 1_min)) {
    (void)submit_log_for_freeze(is_tablet_freeze, is_try);
  }

  const int64_t cost_time = ObClockGenerator::getClock() - start_time;
  TRANS_LOG_RET(WARN,
                OB_ERR_TOO_MUCH_TIME,
                "[Freezer] wait freeze finish costs too much time",
                K(ls_id),
                K(is_tablet_freeze),
                K(is_try),
                K(cost_time),
                K(throttle_is_skipping_),
                K(tenant_replay_is_pending_),
                KPC(memtable));
  stat_.add_diagnose_info("wait freeze finish costs too much time");
}
// Define a functor to avoid using lambda
struct AsyncFreezeFunctor {
const int64_t trace_id_;
@ -1178,7 +1219,7 @@ int ObFreezer::wait_data_memtable_freeze_finish_(ObITabletMemtable *tablet_memta
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
ObMemtable *memtable = static_cast<ObMemtable*>(tablet_memtable);
if (OB_FAIL(wait_memtable_ready_for_flush_with_ls_lock(memtable))) {
if (OB_FAIL(wait_memtable_ready_for_flush_(memtable))) {
TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id));
} else {
int64_t read_lock = LSLOCKALL;
@ -1219,64 +1260,47 @@ int ObFreezer::wait_direct_load_memtable_freeze_finish_(ObITabletMemtable *table
return ret;
}
int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable)
int ObFreezer::wait_memtable_ready_for_flush_(ObITabletMemtable *tablet_memtable)
{
share::ObLSID ls_id = get_ls_id();
const int64_t start = ObClockGenerator::getClock();
int64_t last_submit_log_time = start;
int ret = OB_SUCCESS;
share::ObLSID ls_id = get_ls_id();
const int64_t start_time = ObClockGenerator::getClock();
bool ready_for_flush = false;
bool is_force_released = false;
do {
if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable,
ready_for_flush,
is_force_released,
start,
last_submit_log_time))) {
TRANS_LOG(WARN, "[Freezer] memtable is not ready_for_flush", K(ret));
}
} while (OB_SUCC(ret) && !ready_for_flush && !is_force_released);
{
PendTenantReplayHelper pend_replay_helper(*this, ls_);
(void)pend_replay_helper.set_skip_throttle_flag();
return ret;
}
TRANS_LOG(INFO,
"[Freezer] wait freeze : Tablet",
K(ls_id),
KP(tablet_memtable),
K(throttle_is_skipping_),
K(tenant_replay_is_pending_));
int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable,
bool &ready_for_flush,
bool &is_force_released,
const int64_t start,
int64_t &last_submit_log_time)
{
int ret = OB_SUCCESS;
int64_t read_lock = LSLOCKALL;
int64_t write_lock = 0;
int64_t time_counter = 0;
do {
if (OB_ISNULL(tablet_memtable)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret));
} else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) {
} else if (FALSE_IT(is_force_released = tablet_memtable->is_force_released())) {
} else if (!ready_for_flush && !is_force_released) {
if (TC_REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) {
++time_counter;
if (OB_ISNULL(tablet_memtable)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret));
} else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) {
} else if (FALSE_IT(is_force_released = tablet_memtable->is_force_released())) {
} else if (!ready_for_flush && !is_force_released) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
if (need_resubmit_log() ||
// In order to prevent the txn has already passed the try_submit test
// while failing to submit some logs due to an unexpected bug, we need
// retry to submit the log to go around the above case
(ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) {
last_submit_log_time = ObTimeUtility::current_time();
(void)submit_log_for_freeze(true/*tablet freeze*/, false/*try*/);
TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret));
// check pend condition every second
(void)pend_replay_helper.check_pend_condition_once();
// check resubmit log condition and report some debug info every 5 seconds
if (time_counter >= 5 && time_counter % 5 == 0) {
(void)resubmit_log_if_needed_(start_time, true /* is_tablet_freeze */, false /* is_try */, tablet_memtable);
}
}
ob_usleep(100);
}
const int64_t cost_time = ObClockGenerator::getClock() - start;
if (cost_time > 5 * 1000 * 1000) {
TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time",
K(cost_time), KPC(tablet_memtable));
stat_.add_diagnose_info("ready_for_flush costs too much time");
}
}
ob_usleep(100);
} while (OB_SUCC(ret) && !ready_for_flush && !is_force_released);
}
return ret;
@ -1455,9 +1479,6 @@ void ObFreezer::unset_freeze_()
/* public function about freeze_flag */
bool ObFreezer::is_freeze(uint32_t freeze_flag) const
{
if (freeze_flag == UINT32_MAX) {
freeze_flag = (ATOMIC_LOAD(&freeze_flag_));
}
return 1 == (freeze_flag >> 31);
}
@ -1626,11 +1647,6 @@ void ObFreezer::set_tablet_freeze_begin_()
}
}
void ObFreezer::set_tablet_freeze_end_()
{
ATOMIC_DEC(&low_priority_freeze_cnt_);
}
void ObFreezer::set_ls_freeze_begin_()
{
const int64_t SLEEP_INTERVAL = 100 * 1000; // 100 ms
@ -1646,11 +1662,6 @@ void ObFreezer::set_ls_freeze_begin_()
}
}
void ObFreezer::set_ls_freeze_end_()
{
ATOMIC_DEC(&high_priority_freeze_cnt_);
}
int ObFreezer::pend_ls_replay()
{
int ret = OB_SUCCESS;
@ -1823,45 +1834,136 @@ int ObFreezer::ObTabletFreezeGuard::try_set_tablet_freeze_begin()
return ret;
}
ObFreezer::PendTenantReplayGuard::PendTenantReplayGuard()
void ObFreezer::PendTenantReplayHelper::set_skip_throttle_flag()
{
int ret = OB_SUCCESS;
common::ObSharedGuard<ObLSIterator> iter;
ObLSService *ls_srv = MTL(ObLSService *);
if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::STORAGE_MOD))) {
STORAGE_LOG(WARN, "[ObFreezer] fail to get ls iterator", KR(ret));
if (OB_ISNULL(current_freeze_ls_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "invalid ls pointer", KP(current_freeze_ls_));
} else if (current_ls_is_leader_()){
// leader do not need skip throttle
} else {
ObLS *ls = nullptr;
ls_handle_array_.reuse();
while (OB_SUCC(iter->get_next(ls))) {
int tmp_ret = OB_SUCCESS;
ObLSHandle ls_handle;
if (OB_TMP_FAIL(ls_srv->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::STORAGE_MOD))) {
STORAGE_LOG(WARN, "[ObFreezer] get ls handle failed", KR(tmp_ret), KP(ls));
} else if (OB_TMP_FAIL(ls_handle_array_.push_back(ls_handle))) {
STORAGE_LOG(WARN, "[ObFreezer] push back ls handle failed", KR(tmp_ret), KP(ls));
} else if (OB_TMP_FAIL(ls->get_freezer()->pend_ls_replay())) {
STORAGE_LOG(WARN, "[ObFreezer] pend replay failed", KR(tmp_ret), KPC(ls));
(void)ls_handle_array_.pop_back();
// pend replay before set_throttle_is_skipping to avoid skipping too much replay
if (remain_memory_is_exhausting_()) {
(void)pend_tenant_replay_();
}
host_.set_throttle_is_skipping();
}
}
// Ask the log handler of the freezing ls for its role.
// Returns true for a strong leader, and also when the role cannot be fetched.
bool ObFreezer::PendTenantReplayHelper::current_ls_is_leader_()
{
  int ret = OB_SUCCESS;
  ObRole role;
  int64_t proposal_id = 0;
  // "leader" is the fallback answer, because a leader does not skip throttle
  bool is_leader = true;

  if (OB_FAIL(current_freeze_ls_->get_log_handler()->get_role(role, proposal_id))) {
    LOG_WARN("get ls role failed", KR(ret), K(current_freeze_ls_->get_ls_id()));
  } else {
    is_leader = common::is_strong_leader(role) ? true : false;
  }
  return is_leader;
}
void ObFreezer::PendTenantReplayHelper::check_pend_condition_once()
{
// only check pend condition when throttle is skipping
if (host_.throttle_is_skipping()) {
if (host_.tenant_replay_is_pending()) {
if (!remain_memory_is_exhausting_()) {
(void)restore_tenant_replay_();
} else {
// keep pending replay
}
} else {
// tenant replay is not pending
if (remain_memory_is_exhausting_()) {
(void)pend_tenant_replay_();
} else {
// keep not pending replay
}
}
}
}
ObFreezer::PendTenantReplayGuard::~PendTenantReplayGuard()
// Returns true when the share throttle tool reports that throttle has been triggered for
// ObMemstoreAllocator, or otherwise when the tenant freezer reports that the remaining memstore
// memory is exhausting. The tenant freezer is only asked when the first check is false.
bool ObFreezer::PendTenantReplayHelper::remain_memory_is_exhausting_()
{
  TxShareThrottleTool &throttle_tool = (MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
  const bool has_triggered_throttle = throttle_tool.has_triggered_throttle<ObMemstoreAllocator>();

  bool remain_memory_is_exhausting = has_triggered_throttle;
  if (!remain_memory_is_exhausting) {
    remain_memory_is_exhausting = MTL(ObTenantFreezer *)->memstore_remain_memory_is_exhausting();
  }

  STORAGE_LOG(INFO, "finish check remain memory", K(has_triggered_throttle), K(remain_memory_is_exhausting));
  return remain_memory_is_exhausting;
}
void ObFreezer::PendTenantReplayHelper::pend_tenant_replay_()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; i < ls_handle_array_.count(); i++) {
int tmp_ret = OB_SUCCESS;
ObLSService *ls_srv = MTL(ObLSService *);
common::ObSharedGuard<ObLSIterator> iter;
if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::STORAGE_MOD))) {
STORAGE_LOG(WARN, "[ObFreezer] fail to get ls iterator", KR(ret));
} else {
ObLS *ls = nullptr;
if (OB_ISNULL(ls = ls_handle_array_.at(i).get_ls())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "[ObFreezer] invalid ls handle", KR(ret), KPC(ls));
} else if (OB_TMP_FAIL(ls->get_freezer()->restore_ls_replay())) {
STORAGE_LOG(WARN, "[ObFreezer] restore replay failed", KR(ret), KPC(ls));
ls_handle_array_.reuse();
int64_t iterate_ls_count = 0;
int64_t pend_ls_replay_count = 0;
// Here we need to perform a pend_ls_replay on all ls, because if a switch_leader occurs for any ls during the
// freezing process, the absence of pend_ls_replay might result in an unthrottled memory usage
while (OB_SUCC(iter->get_next(ls))) {
ObLSHandle ls_handle;
iterate_ls_count++;
if (OB_FAIL(ls_srv->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::STORAGE_MOD))) {
STORAGE_LOG(WARN, "[ObFreezer] get ls handle failed", KR(ret), KP(ls));
} else if (OB_FAIL(ls_handle_array_.push_back(ls_handle))) {
STORAGE_LOG(WARN, "[ObFreezer] push back ls handle failed", KR(ret), KP(ls));
} else if (OB_FAIL(ls->get_freezer()->pend_ls_replay())) {
STORAGE_LOG(WARN, "[ObFreezer] pend replay failed", KR(ret), KPC(ls));
(void)ls_handle_array_.pop_back();
} else {
pend_ls_replay_count++;
}
}
// only skip throttle when pend all ls replay successfully, or reset this guard to restore replay
if (iterate_ls_count == pend_ls_replay_count) {
(void)host_.set_tenant_replay_is_pending();
STORAGE_LOG(INFO,
"pend replay finish",
K(pend_ls_replay_count),
K(host_.throttle_is_skipping()),
K(host_.tenant_replay_is_pending()));
} else {
(void)restore_tenant_replay_();
}
}
}
void ObFreezer::PendTenantReplayHelper::restore_tenant_replay_()
{
for (int64_t i = 0; i < ls_handle_array_.count(); i++) {
int ret = OB_SUCCESS;
ObLS *ls = nullptr;
if (OB_ISNULL(ls = ls_handle_array_.at(i).get_ls())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "[ObFreezer] invalid ls handle", KR(ret), KPC(ls));
} else if (OB_FAIL(ls->get_freezer()->restore_ls_replay())) {
STORAGE_LOG(ERROR, "[ObFreezer] restore replay failed", KR(ret), KPC(ls));
}
}
host_.unset_tenant_replay_is_pending();
STORAGE_LOG(INFO,
"restore tenant replay",
K(ls_handle_array_.count()),
K(host_.tenant_replay_is_pending()),
K(host_.throttle_is_skipping()));
ls_handle_array_.reuse();
}
} // namespace storage
} // namespace oceanbase

View File

@ -222,7 +222,8 @@ public:
/********************** freeze **********************/
/* freeze_flag */
bool is_freeze(uint32_t is_freeze=UINT32_MAX) const;
bool is_freeze(uint32_t freeze_flag) const;
bool is_ls_freeze_running() const { return 0 < ATOMIC_LOAD(&high_priority_freeze_cnt_); }
uint32_t get_freeze_flag() const { return ATOMIC_LOAD(&freeze_flag_); };
uint32_t get_freeze_clock() { return ATOMIC_LOAD(&freeze_flag_) & (~(1 << 31)); }
@ -247,6 +248,13 @@ public:
void print_freezer_statistics();
/* others */
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
void set_throttle_is_skipping() { throttle_is_skipping_ = true; }
void unset_throttle_is_skipping() { throttle_is_skipping_ = false; }
bool throttle_is_skipping() { return throttle_is_skipping_; }
void set_tenant_replay_is_pending() { tenant_replay_is_pending_ = true; }
void unset_tenant_replay_is_pending() { tenant_replay_is_pending_ = false; }
bool tenant_replay_is_pending() const { return tenant_replay_is_pending_; }
// get consequent callbacked log_ts right boundary
virtual int get_max_consequent_callbacked_scn(share::SCN &max_consequent_callbacked_scn);
// to set snapshot version when memtables meet ready_for_flush
@ -258,7 +266,6 @@ public:
int get_newest_snapshot_version(const ObTabletID &tablet_id,
share::SCN &snapshot_version);
ObFreezerStat& get_stat() { return stat_; }
bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); }
void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); }
int pend_ls_replay();
int restore_ls_replay();
@ -282,11 +289,26 @@ private:
bool need_release_;
ObFreezer &parent_;
};
class PendTenantReplayGuard {
public:
PendTenantReplayGuard();
~PendTenantReplayGuard();
// Scoped helper used while waiting for a freeze to finish.
// Its destructor clears the host freezer's throttle_is_skipping flag and restores replay on
// every ls recorded in ls_handle_array_. Because of that side effect the helper must not be
// copied, otherwise the same restore would run once per copy.
class PendTenantReplayHelper {
public:
  // host              : the freezer whose flags are maintained by this helper
  // current_freeze_ls : the ls being frozen, used to query its role
  PendTenantReplayHelper(ObFreezer &host, ObLS *current_freeze_ls)
    : host_(host), current_freeze_ls_(current_freeze_ls) {}
  ~PendTenantReplayHelper() { reset_pend_status_(); }
  PendTenantReplayHelper(const PendTenantReplayHelper &) = delete;
  PendTenantReplayHelper &operator=(const PendTenantReplayHelper &) = delete;
  void set_skip_throttle_flag();
  // Intended to be called periodically while waiting, see the definition for the exact rules.
  void check_pend_condition_once();
private:
  bool current_ls_is_leader_();
  bool remain_memory_is_exhausting_();
  void pend_tenant_replay_();
  void restore_tenant_replay_();
  // Undo everything this helper may have set : the skip-throttle flag first, then pended replay.
  void reset_pend_status_()
  {
    (void)host_.unset_throttle_is_skipping();
    (void)restore_tenant_replay_();
  }
private:
  ObFreezer &host_;
  ObLS *current_freeze_ls_;
  // handles of the ls whose replay has been pended by this helper
  ObSEArray<ObLSHandle, 16> ls_handle_array_;
};
@ -299,10 +321,13 @@ private:
void try_freeze_tx_data_();
/* inner subfunctions for freeze process */
int inner_logstream_freeze();
int submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try);
void submit_log_if_needed_(ObIArray<ObTableHandleV2> &frozen_memtable_handles);
void try_submit_log_for_freeze_(const bool is_tablet_freeze);
void resubmit_log_if_needed_(const int64_t start_time,
const bool is_tablet_freeze,
const bool is_try,
const ObITabletMemtable *freeze_memtable = nullptr /* used for tablet freeze */);
int wait_data_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable);
int wait_direct_load_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable);
int set_tablet_freeze_flag_(const int64_t trace_id,
@ -329,12 +354,12 @@ private:
ObTabletID &record_tablet_id,
bool &try_guard);
int submit_wait_freeze_finish_task_(const bool is_ls_freeze, ObFuture<int> *result, ObTableHandleV2 &handle);
int wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable);
int wait_memtable_ready_for_flush_(ObITabletMemtable *tablet_memtable);
int try_set_tablet_freeze_begin_();
void set_tablet_freeze_begin_();
void set_tablet_freeze_end_();
void set_tablet_freeze_end_() { ATOMIC_DEC(&low_priority_freeze_cnt_); }
void set_ls_freeze_begin_();
void set_ls_freeze_end_();
void set_ls_freeze_end_() { ATOMIC_DEC(&high_priority_freeze_cnt_); }
int check_ls_state(); // must be used under the protection of ls_lock
int tablet_freeze_(const int64_t trace_id,
const ObIArray<ObTabletID> &tablet_ids,
@ -343,11 +368,6 @@ private:
ObIArray<ObTableHandleV2> &frozen_memtable_handles,
ObIArray<ObTabletID> &freeze_failed_tablets);
int inner_wait_memtable_freeze_finish_(ObTableHandleV2 &memtable_handle);
int try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable,
bool &ready_for_flush,
bool &is_force_released,
const int64_t start,
int64_t &last_submit_log_time);
void submit_checkpoint_task();
private:
// flag whether the logsteram is freezing
@ -375,6 +395,8 @@ private:
bool enable_; // whether we can do freeze now
bool is_inited_;
bool is_async_tablet_freeze_task_running_;
bool throttle_is_skipping_;
bool tenant_replay_is_pending_;
common::hash::ObHashSet<AsyncFreezeTabletInfo> async_freeze_tablets_;
};

View File

@ -1329,12 +1329,16 @@ int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const ui
int ret = OB_SUCCESS;
RLockGuard guard(rwlock_);
ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG, freeze_clock);
if (!is_follower_() && OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
if (is_follower_()) {
// quit submit log because this is a follower
} else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
if (OB_SUCCESS != fn.get_result()) {
// get real ret code
ret = fn.get_result();
}
TRANS_LOG(WARN, "failed to submit log", K(ret));
} else {
TRANS_LOG(INFO, "traverse tx to submit redo log finish", K(ret), K(freeze_clock));
}
fail_tx_id = fn.get_fail_tx_id();

View File

@ -46,10 +46,10 @@ ObTenantFreezer::ObTenantFreezer()
rs_mgr_(nullptr),
freeze_thread_pool_(),
freeze_thread_pool_lock_(common::ObLatchIds::FREEZE_THREAD_POOL_LOCK),
exist_ls_freezing_(false),
last_update_ts_(0),
freezer_stat_(),
freezer_history_()
freezer_history_(),
throttle_is_skipping_cache_(),
memstore_remain_memory_is_exhausting_cache_()
{
freezer_stat_.reset();
}
@ -63,13 +63,14 @@ void ObTenantFreezer::destroy()
{
freeze_trigger_timer_.destroy();
is_freezing_tx_data_ = false;
exist_ls_freezing_ = false;
self_.reset();
svr_rpc_proxy_ = nullptr;
common_rpc_proxy_ = nullptr;
rs_mgr_ = nullptr;
freezer_stat_.reset();
freezer_history_.reset();
throttle_is_skipping_cache_.reset();
memstore_remain_memory_is_exhausting_cache_.reset();
is_inited_ = false;
}
@ -160,12 +161,46 @@ void ObTenantFreezer::wait()
bool ObTenantFreezer::exist_ls_freezing()
{
int64_t cur_ts = ObTimeUtility::fast_current_time();
int64_t old_ts = last_update_ts_;
int ret = OB_SUCCESS;
bool exist_ls_freezing = false;
common::ObSharedGuard<ObLSIterator> iter;
ObLSService *ls_srv = MTL(ObLSService *);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("[TenantFreezer] tenant freezer not inited", KR(ret));
} else if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::TXSTORAGE_MOD))) {
LOG_WARN("[TenantFreezer] fail to get log stream iterator", KR(ret));
} else {
ObLS *ls = nullptr;
while (OB_SUCC(iter->get_next(ls))) {
if (ls->get_freezer()->is_ls_freeze_running()) {
exist_ls_freezing = true;
break;
}
}
if (ret == OB_ITER_END) {
ret = OB_SUCCESS;
}
if (OB_FAIL(ret)) {
LOG_WARN("[TenantFreezer] iter ls failed", K(ret));
}
}
return exist_ls_freezing;
}
bool ObTenantFreezer::exist_ls_throttle_is_skipping()
{
int ret = OB_SUCCESS;
int64_t cur_ts = ObClockGenerator::getClock();
int64_t last_update_ts = throttle_is_skipping_cache_.update_ts_;
if ((cur_ts - last_update_ts > UPDATE_INTERVAL) &&
ATOMIC_BCAS(&throttle_is_skipping_cache_.update_ts_, last_update_ts, cur_ts)) {
bool exist_ls_throttle_is_skipping = false;
if ((cur_ts - last_update_ts_ > UPDATE_INTERVAL) &&
old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
int ret = OB_SUCCESS;
common::ObSharedGuard<ObLSIterator> iter;
ObLSService *ls_srv = MTL(ObLSService *);
if (IS_NOT_INIT) {
@ -175,31 +210,71 @@ bool ObTenantFreezer::exist_ls_freezing()
LOG_WARN("[TenantFreezer] fail to get log stream iterator", KR(ret));
} else {
ObLS *ls = nullptr;
int ls_cnt = 0;
int exist_ls_freezing = false;
for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
int tmp_ret = OB_SUCCESS;
ObRole role;
int64_t proposal_id = 0;
if (OB_TMP_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) {
LOG_WARN("get ls role failed", KR(tmp_ret), K(ls->get_ls_id()));
} else if (common::is_strong_leader(role)) {
// skip check leader logstream
} else if (ls->get_freezer()->is_freeze()) {
exist_ls_freezing = true;
while (OB_SUCC(iter->get_next(ls))) {
if (ls->get_freezer()->throttle_is_skipping()) {
exist_ls_throttle_is_skipping = true;
break;
}
}
exist_ls_freezing_ = exist_ls_freezing;
if (ret == OB_ITER_END) {
ret = OB_SUCCESS;
} else {
LOG_WARN("[TenantFreezer] iter ls failed", K(ret));
}
if (OB_FAIL(ret)) {
LOG_WARN("[TenantFreezer] iter ls failed", K(ret));
}
}
// assign need_skip_throttle here because if some error happened, the value can be reset to false
throttle_is_skipping_cache_.value_ = exist_ls_throttle_is_skipping;
}
return exist_ls_freezing_;
return throttle_is_skipping_cache_.value_;
}
bool ObTenantFreezer::memstore_remain_memory_is_exhausting()
{
int ret = OB_SUCCESS;
int64_t cur_ts = ObClockGenerator::getClock();
int64_t last_update_ts = memstore_remain_memory_is_exhausting_cache_.update_ts_;
if ((cur_ts - last_update_ts > UPDATE_INTERVAL) &&
ATOMIC_BCAS(&memstore_remain_memory_is_exhausting_cache_.update_ts_, last_update_ts, cur_ts)) {
bool remain_mem_exhausting = false;
if (false == tenant_info_.is_loaded_) {
LOG_INFO("[TenantFreezer] This tenant not exist", KR(ret));
} else {
const int64_t MEMORY_IS_EXHAUSTING_PERCENTAGE = 10;
// tenant memory condition
const int64_t tenant_memory_limit = get_tenant_memory_limit(MTL_ID());
const int64_t tenant_memory_remain = get_tenant_memory_remain(MTL_ID());
const bool tenant_memory_exhausting =
tenant_memory_remain < (tenant_memory_limit * MEMORY_IS_EXHAUSTING_PERCENTAGE / 100);
// memstore memory condition
const int64_t memstore_limit = tenant_info_.get_memstore_limit();
const int64_t memstore_remain = (memstore_limit - get_tenant_memory_hold(MTL_ID(), ObCtxIds::MEMSTORE_CTX_ID));
const bool memstore_memory_exhausting = memstore_remain < (memstore_limit * MEMORY_IS_EXHAUSTING_PERCENTAGE / 100);
remain_mem_exhausting = tenant_memory_exhausting || memstore_memory_exhausting;
if (remain_mem_exhausting && REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) {
STORAGE_LOG(INFO,
"[TenantFreezer] memstore remain memory is exhausting",
K(tenant_memory_limit),
K(tenant_memory_remain),
K(tenant_memory_exhausting),
K(memstore_limit),
K(memstore_remain),
K(memstore_memory_exhausting));
}
}
memstore_remain_memory_is_exhausting_cache_.value_ = remain_mem_exhausting;
}
return memstore_remain_memory_is_exhausting_cache_.value_;
}
int ObTenantFreezer::ls_freeze_(ObLS *ls, const bool is_sync, const int64_t abs_timeout_ts)

View File

@ -102,6 +102,16 @@ class ObTenantFreezer
{
friend ObTenantTxDataFreezeGuard;
friend class ObFreezer;
// A boolean value together with the timestamp at which it was last refreshed.
// A freshly constructed or reset cache holds `false` and timestamp 0.
struct PeriodicalUpdateValueCache {
  PeriodicalUpdateValueCache() { reset(); }
  // Put the cache back into its initial state.
  void reset()
  {
    update_ts_ = 0;
    value_ = false;
  }
  bool value_;         // the cached value
  int64_t update_ts_;  // timestamp of the last refresh, 0 when never refreshed
};
public:
const static int64_t TIME_WHEEL_PRECISION = 100_ms;
@ -218,6 +228,8 @@ public:
}
static int64_t get_freeze_trigger_interval() { return FREEZE_TRIGGER_INTERVAL; }
bool exist_ls_freezing();
bool exist_ls_throttle_is_skipping();
bool memstore_remain_memory_is_exhausting();
// freezer stat collector and generator
void add_merge_event(const compaction::ObMergeType type, const int64_t cost)
@ -305,13 +317,13 @@ private:
common::ObOccamTimerTaskRAIIHandle timer_handle_;
common::ObOccamThreadPool freeze_thread_pool_;
ObSpinLock freeze_thread_pool_lock_;
bool exist_ls_freezing_;
int64_t last_update_ts_;
// diagnose only, we capture the freeze stats every 30 minutes
ObTenantFreezerStat freezer_stat_;
// diagnose only, we capture the freeze history in one monthes
ObTenantFreezerStatHistory freezer_history_;
PeriodicalUpdateValueCache throttle_is_skipping_cache_;
PeriodicalUpdateValueCache memstore_remain_memory_is_exhausting_cache_;
};
class ObTenantTxDataFreezeGuard

View File

@ -37,9 +37,7 @@ OB_SERIALIZE_MEMBER(ObTenantFreezeArg,
try_frozen_scn_);
ObTenantFreezeCtx::ObTenantFreezeCtx()
: mem_lower_limit_(0),
mem_upper_limit_(0),
mem_memstore_limit_(0),
: mem_memstore_limit_(0),
memstore_freeze_trigger_(0),
max_mem_memstore_can_get_now_(0),
active_memstore_used_(0),
@ -52,8 +50,6 @@ ObTenantFreezeCtx::ObTenantFreezeCtx()
void ObTenantFreezeCtx::reset()
{
mem_lower_limit_ = 0;
mem_upper_limit_ = 0;
mem_memstore_limit_ = 0;
memstore_freeze_trigger_ = 0;
max_mem_memstore_can_get_now_ = 0;
@ -181,8 +177,6 @@ bool ObTenantInfo::is_memstore_limit_changed(const int64_t curr_memstore_limit_p
void ObTenantInfo::get_freeze_ctx(ObTenantFreezeCtx &ctx) const
{
SpinRLockGuard guard(lock_);
ctx.mem_lower_limit_ = mem_lower_limit_;
ctx.mem_upper_limit_ = mem_upper_limit_;
ctx.mem_memstore_limit_ = mem_memstore_limit_;
}

View File

@ -61,8 +61,6 @@ public:
void reset();
public:
// snapshot of tenant_info
int64_t mem_lower_limit_;
int64_t mem_upper_limit_;
int64_t mem_memstore_limit_;
// running data