[CP] [FIX] adjust checkpoint executor logic

This commit is contained in:
ZenoWang
2024-02-28 10:44:42 +00:00
committed by ob-robot
parent a699639ddb
commit 17d0754077
6 changed files with 149 additions and 79 deletions

View File

@ -30,7 +30,10 @@ namespace checkpoint
ObCheckpointExecutor::ObCheckpointExecutor() ObCheckpointExecutor::ObCheckpointExecutor()
: rwlock_(common::ObLatchIds::CLOG_CKPT_RWLOCK), : rwlock_(common::ObLatchIds::CLOG_CKPT_RWLOCK),
update_checkpoint_enabled_(false) update_checkpoint_enabled_(false),
reuse_recycle_scn_times_(0),
prev_clog_checkpoint_lsn_(),
prev_recycle_scn_()
{ {
reset(); reset();
} }
@ -47,6 +50,9 @@ void ObCheckpointExecutor::reset()
} }
ls_ = NULL; ls_ = NULL;
loghandler_ = NULL; loghandler_ = NULL;
reuse_recycle_scn_times_ = 0;
prev_clog_checkpoint_lsn_.reset();
prev_recycle_scn_.set_invalid();
} }
int ObCheckpointExecutor::register_handler(const ObLogBaseType &type, int ObCheckpointExecutor::register_handler(const ObLogBaseType &type,
@ -196,71 +202,112 @@ int ObCheckpointExecutor::update_clog_checkpoint()
return ret; return ret;
} }
int ObCheckpointExecutor::advance_checkpoint_by_flush( int ObCheckpointExecutor::advance_checkpoint_by_flush(SCN recycle_scn)
SCN recycle_scn)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const ObLSID ls_id = ls_->get_ls_id();
const LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn();
const SCN clog_checkpoint_scn = ls_->get_clog_checkpoint_scn();
RLockGuard guard(rwlock_); RLockGuard guard(rwlock_);
if (update_checkpoint_enabled_) { if (update_checkpoint_enabled_) {
int tmp_ret = OB_SUCCESS; // calculate recycle_scn if it is invalid(called by clog disk full situation)
SCN max_decided_scn; if (!recycle_scn.is_valid() &&
OB_FAIL(calculate_recycle_scn_(clog_checkpoint_lsn, clog_checkpoint_scn, recycle_scn))) {
// calcu recycle_scn according to clog disk situation STORAGE_LOG(WARN, "calculate recycle scn failed", KR(ret));
if (!recycle_scn.is_valid()) {
LSN end_lsn;
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
} else {
LSN clog_checkpoint_lsn = ls_->get_clog_base_lsn();
LSN calcu_recycle_lsn = clog_checkpoint_lsn
+ ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100);
if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_scn))) {
STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(calcu_recycle_lsn),
K(recycle_scn), K(ls_->get_ls_id()));
} else {
STORAGE_LOG(INFO, "advance checkpoint by flush to avoid clog disk full",
K(recycle_scn), K(end_lsn), K(clog_checkpoint_lsn),
K(calcu_recycle_lsn), K(ls_->get_ls_id()));
}
}
} else {
if (recycle_scn.is_max()) {
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) {
STORAGE_LOG(WARN, "failed to get_max_decided_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else if (recycle_scn > max_decided_scn) {
ret = OB_EAGAIN;
STORAGE_LOG(WARN, "recycle_scn is more than max_decided_scn",
KR(ret), K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()), K(max_decided_scn));
}
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (recycle_scn < ls_->get_clog_checkpoint_scn()) { } else if (OB_FAIL(check_need_flush_(clog_checkpoint_scn, recycle_scn))) {
ret = OB_NO_NEED_UPDATE; STORAGE_LOG(WARN, "no need flush");
STORAGE_LOG(WARN, "recycle_scn should not smaller than checkpoint_log_scn",
K(recycle_scn), K(ls_->get_clog_checkpoint_scn()), K(ls_->get_ls_id()));
} else { } else {
STORAGE_LOG(INFO, "start flush", STORAGE_LOG(INFO, "start flush", K(recycle_scn), K(clog_checkpoint_scn), K(ls_id));
K(recycle_scn),
K(ls_->get_clog_checkpoint_scn()),
K(ls_->get_ls_id()));
for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) { for (int i = 1; i < ObLogBaseType::MAX_LOG_BASE_TYPE; i++) {
if (OB_NOT_NULL(handlers_[i]) int tmp_ret = OB_SUCCESS;
&& OB_SUCCESS != (tmp_ret = (handlers_[i]->flush(recycle_scn)))) { if (OB_NOT_NULL(handlers_[i]) && OB_TMP_FAIL(handlers_[i]->flush(recycle_scn))) {
STORAGE_LOG(WARN, "handler flush failed", K(recycle_scn), K(tmp_ret), STORAGE_LOG(WARN, "handler flush failed", KR(tmp_ret), K(recycle_scn), K(tmp_ret), K(i), K(ls_id));
K(i), K(ls_->get_ls_id()));
} }
} }
} }
} else { } else {
STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_->get_ls_id())); STORAGE_LOG(WARN, "update_checkpoint is not enabled", K(ls_id));
} }
return ret; return ret;
} }
/**
 * Computes the recycle_scn to flush towards when the caller did not supply a valid one.
 *
 * Two paths:
 *  - Reuse: if the clog checkpoint LSN equals the one recorded by the last successful
 *    calculation and the cached recycle_scn is still greater than clog_checkpoint_scn,
 *    the cached recycle_scn is returned. On every 1000th consecutive reuse recycle_scn
 *    is set to max instead.
 *  - Recalculate: otherwise a target LSN is placed CLOG_GC_PERCENT percent of the way
 *    from clog_checkpoint_lsn to the log end LSN and converted to an SCN with
 *    locate_by_lsn_coarsely(). On success the result is cached and the reuse counter
 *    is reset to 0.
 *
 * @param clog_checkpoint_lsn  current clog checkpoint (base) LSN of the log stream
 * @param clog_checkpoint_scn  current clog checkpoint SCN of the log stream
 * @param recycle_scn          [out] the recycle SCN chosen by this call
 * @return OB_SUCCESS, or the error returned by get_end_lsn() / locate_by_lsn_coarsely()
 *
 * NOTE(review): this function mutates reuse_recycle_scn_times_, prev_clog_checkpoint_lsn_
 * and prev_recycle_scn_, while the visible caller holds only a read lock on rwlock_ —
 * confirm that callers are serialized.
 */
int ObCheckpointExecutor::calculate_recycle_scn_(const LSN clog_checkpoint_lsn,
                                                 const SCN clog_checkpoint_scn,
                                                 SCN &recycle_scn)
{
  int ret = OB_SUCCESS;
  LSN end_lsn;
  // locate_by_lsn_coarsely may return a recycle_scn that is less than the checkpoint scn,
  // so a cached value with prev_recycle_scn_ <= clog_checkpoint_scn cannot be reused and
  // the recycle_scn has to be calculated again.
  if (prev_clog_checkpoint_lsn_.is_valid() && (prev_clog_checkpoint_lsn_ == clog_checkpoint_lsn) &&
      prev_recycle_scn_.is_valid() && (prev_recycle_scn_ > clog_checkpoint_scn)) {
    recycle_scn = prev_recycle_scn_;
    reuse_recycle_scn_times_++;
    if (reuse_recycle_scn_times_ % 1000 == 0) {
      // the checkpoint did not move after many retries with the same target: ask for max
      STORAGE_LOG_RET(WARN, 0, "attention! clog checkpoint has not changed for a long time");
      recycle_scn.set_max();
    }
    STORAGE_LOG(INFO,
                "clog checkpoint has not changed yet. use previous recycle_scn to advance checkpoint",
                K(reuse_recycle_scn_times_),
                K(clog_checkpoint_lsn),
                K(recycle_scn));
  } else if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
    STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
  } else {
    LSN calcu_recycle_lsn = clog_checkpoint_lsn + ((end_lsn - clog_checkpoint_lsn) * CLOG_GC_PERCENT / 100);
    if (OB_FAIL(loghandler_->locate_by_lsn_coarsely(calcu_recycle_lsn, recycle_scn))) {
      // include the error code so the failure can be diagnosed from the log
      STORAGE_LOG(WARN, "locate_by_lsn_coarsely failed", K(ret), K(calcu_recycle_lsn), K(recycle_scn),
                  K(ls_->get_ls_id()));
    } else {
      // remember this result so the next call can reuse it while the checkpoint LSN is unchanged
      prev_clog_checkpoint_lsn_ = clog_checkpoint_lsn;
      prev_recycle_scn_ = recycle_scn;
      reuse_recycle_scn_times_ = 0;
      STORAGE_LOG(INFO,
                  "advance checkpoint by flush to avoid clog disk full",
                  K(recycle_scn),
                  K(end_lsn),
                  K(clog_checkpoint_lsn),
                  K(calcu_recycle_lsn),
                  K(ls_->get_ls_id()));
    }
  }
  return ret;
}
/**
 * Decides whether a flush towards recycle_scn should be performed.
 *
 * @param clog_checkpoint_scn  current clog checkpoint SCN of the log stream
 * @param recycle_scn          the SCN the caller wants the checkpoint to reach
 * @return OB_SUCCESS           flush should proceed (always the case when recycle_scn is max)
 *         OB_NO_NEED_UPDATE    recycle_scn is smaller than clog_checkpoint_scn
 *         OB_EAGAIN            recycle_scn is larger than the max decided scn
 *         other                error returned by get_max_decided_scn()
 */
int ObCheckpointExecutor::check_need_flush_(const SCN clog_checkpoint_scn, const SCN recycle_scn)
{
  int ret = OB_SUCCESS;
  SCN max_decided_scn;
  const ObLSID ls_id = ls_->get_ls_id();

  if (recycle_scn.is_max()) {
    // must do flush
  } else if (recycle_scn < clog_checkpoint_scn) {
    ret = OB_NO_NEED_UPDATE;
    STORAGE_LOG(WARN,
                "recycle_scn should not smaller than checkpoint_log_scn",
                K(ret),
                K(recycle_scn),
                K(clog_checkpoint_scn),
                K(ls_id));
  } else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) {
    // print the error code: without it this failure cannot be diagnosed from the log
    STORAGE_LOG(WARN, "failed to get_max_decided_scn", K(ret), K(recycle_scn), K(clog_checkpoint_scn), K(ls_id));
  } else if (recycle_scn > max_decided_scn) {
    ret = OB_EAGAIN;
    STORAGE_LOG(WARN,
                "recycle_scn is larger than max_decided_scn",
                K(ret),
                K(recycle_scn),
                K(clog_checkpoint_scn),
                K(ls_id),
                K(max_decided_scn));
  }
  return ret;
}
int ObCheckpointExecutor::get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array) int ObCheckpointExecutor::get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -283,9 +330,8 @@ int64_t ObCheckpointExecutor::get_cannot_recycle_log_size()
LSN end_lsn; LSN end_lsn;
if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) { if (OB_FAIL(loghandler_->get_end_lsn(end_lsn))) {
STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id())); STORAGE_LOG(WARN, "get end lsn failed", K(ret), K(ls_->get_ls_id()));
} else if (!ls_->get_data_checkpoint()->is_flushing()) { } else {
cannot_recycle_log_size = cannot_recycle_log_size = end_lsn.val_ - ls_->get_clog_base_lsn().val_;
end_lsn.val_ - ls_->get_clog_base_lsn().val_;
} }
return cannot_recycle_log_size; return cannot_recycle_log_size;
} }

View File

@ -75,8 +75,8 @@ public:
// the service will flush and advance checkpoint // the service will flush and advance checkpoint
// after flush, checkpoint_scn will be equal or greater than recycle_scn // after flush, checkpoint_scn will be equal or greater than recycle_scn
int advance_checkpoint_by_flush( int advance_checkpoint_by_flush(share::SCN recycle_scn = share::SCN::invalid_scn());
share::SCN recycle_scn = share::SCN::invalid_scn());
// for __all_virtual_checkpoint // for __all_virtual_checkpoint
int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array); int get_checkpoint_info(ObIArray<ObCheckpointVTInfo> &checkpoint_array);
@ -88,6 +88,9 @@ public:
int diagnose(CheckpointDiagnoseInfo &diagnose_info) const; int diagnose(CheckpointDiagnoseInfo &diagnose_info) const;
int traversal_flush() const; int traversal_flush() const;
private:
int check_need_flush_(const SCN clog_checkpoint_scn, const SCN recycle_scn);
// Computes the recycle_scn (output parameter) used when the caller did not pass a valid
// one: either reuses the previously calculated value or derives a new one from the
// clog checkpoint LSN and the log end LSN.
int calculate_recycle_scn_(const palf::LSN clog_checkpoint_lsn,
                           const share::SCN clog_checkpoint_scn,
                           share::SCN &recycle_scn);
private: private:
static const int64_t CLOG_GC_PERCENT = 60; static const int64_t CLOG_GC_PERCENT = 60;
@ -105,8 +108,13 @@ private:
RWLock rwlock_for_update_clog_checkpoint_; RWLock rwlock_for_update_clog_checkpoint_;
bool update_checkpoint_enabled_; bool update_checkpoint_enabled_;
int64_t reuse_recycle_scn_times_;
palf::LSN prev_clog_checkpoint_lsn_;
share::SCN prev_recycle_scn_;
}; };
} // namespace checkpoint } // namespace checkpoint
} // namespace storage } // namespace storage
} // namespace oceanbase } // namespace oceanbase

View File

@ -241,11 +241,32 @@ SCN ObDataCheckpoint::get_rec_scn()
return min_rec_scn; return min_rec_scn;
} }
/**
 * Returns the smallest rec_scn found in the new-create list and the active list.
 *
 * The result starts at SCN::max_scn() and is only lowered by a list whose minimum is
 * strictly smaller, so SCN::max_scn() is returned when neither list reports a smaller value.
 * The lock is taken through RLOCK with the NEW_CREATE | ACTIVE flags.
 *
 * NOTE(review): the meaning of the `false` argument passed for the new-create list is
 * defined by get_min_rec_scn_in_list(), which is not visible here.
 */
SCN ObDataCheckpoint::get_active_rec_scn()
{
  RLOCK(NEW_CREATE | ACTIVE);
  SCN min_active_rec_scn = SCN::max_scn();

  // new-create list first, then the active list (same order as the lock flags)
  const SCN new_create_min_scn = new_create_list_.get_min_rec_scn_in_list(false);
  if (new_create_min_scn < min_active_rec_scn) {
    min_active_rec_scn = new_create_min_scn;
  }

  const SCN active_min_scn = active_list_.get_min_rec_scn_in_list();
  if (active_min_scn < min_active_rec_scn) {
    min_active_rec_scn = active_min_scn;
  }

  return min_active_rec_scn;
}
int ObDataCheckpoint::flush(SCN recycle_scn, bool need_freeze) int ObDataCheckpoint::flush(SCN recycle_scn, bool need_freeze)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (need_freeze) { if (need_freeze) {
if (OB_FAIL(freeze_base_on_needs_(recycle_scn))) { SCN active_rec_scn = get_active_rec_scn();
if (active_rec_scn > recycle_scn) {
STORAGE_LOG(INFO,
"skip flush data checkpoint cause active_rec_scn is larger than recycle_scn",
K(active_rec_scn),
K(recycle_scn));
} else if (OB_FAIL(freeze_base_on_needs_(recycle_scn))) {
STORAGE_LOG(WARN, "freeze_base_on_needs failed", STORAGE_LOG(WARN, "freeze_base_on_needs failed",
K(ret), K(ls_->get_ls_id()), K(recycle_scn)); K(ret), K(ls_->get_ls_id()), K(recycle_scn));
} }
@ -868,31 +889,25 @@ int ObDataCheckpoint::get_need_flush_tablets_(const share::SCN recycle_scn,
int ObDataCheckpoint::freeze_base_on_needs_(share::SCN recycle_scn) int ObDataCheckpoint::freeze_base_on_needs_(share::SCN recycle_scn)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (get_rec_scn() <= recycle_scn) { if (is_tenant_freeze() || !is_flushing()) {
if (is_tenant_freeze() || (!is_flushing() && prepare_list_.is_empty())) { int64_t wait_flush_num = new_create_list_.checkpoint_list_.get_size() + active_list_.checkpoint_list_.get_size();
int64_t wait_flush_num = bool logstream_freeze = true;
new_create_list_.checkpoint_list_.get_size() ObSArray<ObTabletID> need_flush_tablets;
+ active_list_.checkpoint_list_.get_size(); if (wait_flush_num > MAX_FREEZE_CHECKPOINT_NUM) {
bool logstream_freeze = true; if (OB_FAIL(get_need_flush_tablets_(recycle_scn, need_flush_tablets))) {
ObSArray<ObTabletID> need_flush_tablets; // do nothing
if (wait_flush_num > MAX_FREEZE_CHECKPOINT_NUM) { } else {
if (OB_FAIL(get_need_flush_tablets_(recycle_scn, need_flush_tablets))) { int need_flush_num = need_flush_tablets.count();
// do nothing logstream_freeze = need_flush_num * 100 / wait_flush_num > TABLET_FREEZE_PERCENT;
} else {
int need_flush_num = need_flush_tablets.count();
logstream_freeze =
need_flush_num * 100 / wait_flush_num > TABLET_FREEZE_PERCENT;
}
} }
}
if (logstream_freeze) { if (logstream_freeze) {
if (OB_FAIL(ls_->logstream_freeze(false /* !is_sync */))) { if (OB_FAIL(ls_->logstream_freeze(false /* !is_sync */))) {
STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id())); STORAGE_LOG(WARN, "minor freeze failed", K(ret), K(ls_->get_ls_id()));
}
} else if (OB_FAIL(ls_->batch_tablet_freeze(need_flush_tablets, false /* !is_sync */))) {
STORAGE_LOG(WARN, "batch tablet freeze failed",
K(ret), K(ls_->get_ls_id()), K(need_flush_tablets));
} }
} else if (OB_FAIL(ls_->batch_tablet_freeze(need_flush_tablets, false /* !is_sync */))) {
STORAGE_LOG(WARN, "batch tablet freeze failed", K(ret), K(ls_->get_ls_id()), K(need_flush_tablets));
} }
} }
return ret; return ret;

View File

@ -114,6 +114,7 @@ public:
} }
share::SCN get_rec_scn(); share::SCN get_rec_scn();
share::SCN get_active_rec_scn();
// if min_rec_scn <= the input rec_scn // if min_rec_scn <= the input rec_scn
// logstream freeze // logstream freeze
int flush(share::SCN recycle_scn, bool need_freeze = true); int flush(share::SCN recycle_scn, bool need_freeze = true);

View File

@ -265,7 +265,7 @@ bool ObCheckPointService::cannot_recycle_log_over_threshold_(const int64_t thres
return cannot_recycle_log_over_threshold; return cannot_recycle_log_over_threshold;
} }
int ObCheckPointService::flush_if_need_(bool need_flush) int ObCheckPointService::flush_if_need_()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
@ -284,7 +284,7 @@ int ObCheckPointService::flush_if_need_(bool need_flush)
int64_t ls_cnt = 0; int64_t ls_cnt = 0;
int64_t succ_ls_cnt = 0; int64_t succ_ls_cnt = 0;
for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) { for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) {
if (OB_TMP_FAIL(ls->flush_if_need(need_flush))) { if (OB_TMP_FAIL(ls->flush_if_need(true))) {
STORAGE_LOG(WARN, "flush ls failed", KR(tmp_ret), KPC(ls)); STORAGE_LOG(WARN, "flush ls failed", KR(tmp_ret), KPC(ls));
tmp_ret = OB_SUCCESS; tmp_ret = OB_SUCCESS;
} else { } else {
@ -361,7 +361,7 @@ void ObCheckPointService::ObCheckClogDiskUsageTask::runTimerTask()
} }
} }
if (need_flush && OB_FAIL(checkpoint_service_.flush_if_need_(need_flush))) { if (need_flush && OB_FAIL(checkpoint_service_.flush_if_need_())) {
STORAGE_LOG(ERROR, "flush if needed failed", K(ret), K(need_flush)); STORAGE_LOG(ERROR, "flush if needed failed", K(ret), K(need_flush));
} }
} }

View File

@ -60,7 +60,7 @@ private:
bool get_disk_usage_threshold_(int64_t &threshold); bool get_disk_usage_threshold_(int64_t &threshold);
bool cannot_recycle_log_over_threshold_(const int64_t threshold, const bool need_update_checkpoint_scn); bool cannot_recycle_log_over_threshold_(const int64_t threshold, const bool need_update_checkpoint_scn);
int flush_if_need_(bool need_flush); int flush_if_need_();
// reduce the risk of clog full due to checkpoint long interval // reduce the risk of clog full due to checkpoint long interval
static int64_t CHECK_CLOG_USAGE_INTERVAL; static int64_t CHECK_CLOG_USAGE_INTERVAL;
static int64_t CHECKPOINT_INTERVAL; static int64_t CHECKPOINT_INTERVAL;