Fix a bad case in archive_lag_target handling

This commit is contained in:
taoshuning
2023-07-12 08:18:17 +00:00
committed by ob-robot
parent ef8f47dc40
commit a5b93de261
5 changed files with 69 additions and 39 deletions

View File

@ -463,46 +463,58 @@ int ObArchiveFetcher::check_need_delay_(const ObArchiveLogFetchTask &task,
const LSN &end_lsn = task.get_end_offset();
LSN offset;
SCN fetch_scn;
int64_t last_fetch_timestamp = OB_INVALID_TIMESTAMP;
need_delay = false;
int64_t send_task_count = 0;
int64_t ls_archive_task_count = 0;
int64_t send_task_status_count = 0;
const bool new_block = start_lsn == cur_lsn;
palf::LSN max_no_limit_lsn;
storage::ObLSHandle handle;
share::SCN offline_scn;
GET_LS_TASK_CTX(ls_mgr_, id) {
if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn))) {
ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) {
ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_max_no_limit_lsn(station, max_no_limit_lsn))) {
ARCHIVE_LOG(WARN, "get max_no_limit_lsn failed", K(id), K(station));
} else if (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT) {
need_delay = true;
ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay",
K(id), K(station), K(send_task_count));
} else {
ls_archive_task_count = ls_mgr_->get_ls_task_count();
send_task_status_count = archive_sender_->get_send_task_status_count();
if (ls_archive_task_count < send_task_status_count) {
if (OB_FAIL(MTL(storage::ObLSService*)->get_ls(id, handle, ObLSGetMod::ARCHIVE_MOD))) {
ARCHIVE_LOG(WARN, "get ls failed", K(id));
} else if (OB_FAIL(handle.get_ls()->get_offline_scn(offline_scn))) {
ARCHIVE_LOG(WARN, "get offline_scn failed", K(id));
} else if (OB_UNLIKELY(offline_scn.is_valid())) {
// if ls is offline, it should be archived as soon as possible
need_delay = false;
} else {
GET_LS_TASK_CTX(ls_mgr_, id) {
if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn, last_fetch_timestamp))) {
ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) {
ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_max_no_limit_lsn(station, max_no_limit_lsn))) {
ARCHIVE_LOG(WARN, "get max_no_limit_lsn failed", K(id), K(station));
} else if (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT) {
need_delay = true;
ARCHIVE_LOG(TRACE, "archive_sender_ task status count more than ls archive task count, just wait",
K(ls_archive_task_count), K(send_task_status_count), K(need_delay));
ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay",
K(id), K(station), K(send_task_count));
} else {
check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full);
if (data_full) {
// although data buffer not enough, but data reaches the end of the block, do archive
ARCHIVE_LOG(TRACE, "data buffer reach clog block end, do archive",
K(id), K(station), K(end_lsn), K(commit_lsn));
} else if (! check_scn_enough_(id, new_block, cur_lsn, max_no_limit_lsn, base_scn, fetch_scn)) {
ls_archive_task_count = ls_mgr_->get_ls_task_count();
send_task_status_count = archive_sender_->get_send_task_status_count();
if (ls_archive_task_count < send_task_status_count) {
need_delay = true;
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id), K(station), K(new_block), K(cur_lsn),
K(max_no_limit_lsn), K(base_scn), K(fetch_scn));
} else if (! data_enough) {
// data not enough to fill unit, just wait
need_delay = true;
ARCHIVE_LOG(TRACE, "data not enough, need delay", K(id), K(station), K(commit_lsn),
K(cur_lsn), K(end_lsn), K(data_enough));
ARCHIVE_LOG(TRACE, "archive_sender_ task status count more than ls archive task count, just wait",
K(ls_archive_task_count), K(send_task_status_count), K(need_delay));
} else {
check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full);
if (data_full) {
// although data buffer not enough, but data reaches the end of the block, do archive
ARCHIVE_LOG(TRACE, "data buffer reach clog block end, do archive",
K(id), K(station), K(end_lsn), K(commit_lsn));
} else if (! check_scn_enough_(id, new_block, cur_lsn, max_no_limit_lsn, base_scn, fetch_scn, last_fetch_timestamp)) {
need_delay = true;
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id), K(station), K(new_block), K(cur_lsn),
K(max_no_limit_lsn), K(base_scn), K(fetch_scn));
} else if (! data_enough) {
// data not enough to fill unit, just wait
need_delay = true;
ARCHIVE_LOG(TRACE, "data not enough, need delay", K(id), K(station), K(commit_lsn),
K(cur_lsn), K(end_lsn), K(data_enough));
}
}
}
}
@ -526,7 +538,8 @@ bool ObArchiveFetcher::check_scn_enough_(const share::ObLSID &id,
const palf::LSN &lsn,
const palf::LSN &max_no_limit_lsn,
const SCN &base_scn,
const SCN &fetch_scn)
const SCN &fetch_scn,
const int64_t last_fetch_timestamp)
{
int ret = OB_SUCCESS;
bool bret = false; // archive limit default
@ -537,6 +550,10 @@ bool ObArchiveFetcher::check_scn_enough_(const share::ObLSID &id,
bret = true;
// when ls archive task init or update, the max_no_limit_lsn set
// logs whose lsn smaller than the max_no_limit_lsn will ignore the archive_lag_target limit
} else if (common::ObTimeUtility::fast_current_time() - last_fetch_timestamp >= lag_target) {
// for standby tenant, sync_scn will stop at the tenant dropping scn X,
// so logs whose scn bigger than (X - archive_lag_target) will be archived according to the condition
bret = true;
} else if (OB_FAIL(get_max_archive_point_(replayable_scn))) {
ARCHIVE_LOG(WARN, "get replayable_scn failed", K(id));
} else if (new_block) {

View File

@ -143,7 +143,8 @@ private:
// 1.1.2 检查日志流落后程度是否需要触发归档
bool check_scn_enough_(const share::ObLSID &id, const bool new_block, const palf::LSN &lsn,
const palf::LSN &max_no_limit_lsn, const share::SCN &base_scn, const share::SCN &fetch_scn);
const palf::LSN &max_no_limit_lsn, const share::SCN &base_scn, const share::SCN &fetch_scn,
const int64_t last_fetch_timestamp);
// 1.2 初始化TmpMemoryHelper
int init_helper_(ObArchiveLogFetchTask &task, const LSN &commit_lsn, TmpMemoryHelper &helper);

View File

@ -195,6 +195,7 @@ bool GenFetchTaskFunctor::operator()(const ObLSID &id, ObLSArchiveTask *ls_archi
LogFileTuple archive_tuple;
int64_t unused_file_id = 0;
int64_t unused_file_offset = 0;
int64_t unused_timestamp = common::OB_INVALID_TIMESTAMP;
if (OB_ISNULL(ls_archive_task)) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "ls_archive_task is NULL", K(ret), K(id), K(ls_archive_task));
@ -210,7 +211,7 @@ bool GenFetchTaskFunctor::operator()(const ObLSID &id, ObLSArchiveTask *ls_archi
ARCHIVE_LOG(TRACE, "cache sequenced log size reach limit, just wait", K(id), K(seq_lsn), K(archive_tuple));
} else if (OB_FAIL(get_commit_index_(id, commit_lsn))) {
ARCHIVE_LOG(WARN, "get commit index failed", K(ret), K(id));
} else if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, fetch_lsn, fetch_scn))) {
} else if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, fetch_lsn, fetch_scn, unused_timestamp))) {
ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(ls_archive_task));
} else {
LSN lsn = seq_lsn;

View File

@ -12,7 +12,9 @@
#include "ob_ls_task.h"
#include <cstdint>
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/time/ob_time_utility.h"
#include "lib/utility/ob_print_utils.h"
#include "ob_archive_define.h" // ArchiveKey
#include "share/backup/ob_archive_piece.h" // ObArchivePiece
@ -146,7 +148,8 @@ int ObLSArchiveTask::push_fetch_log(ObArchiveLogFetchTask &task)
int ObLSArchiveTask::get_fetcher_progress(const ArchiveWorkStation &station,
LSN &offset,
SCN &scn)
SCN &scn,
int64_t &last_fetch_timestamp)
{
int ret = OB_SUCCESS;
RLockGuard guard(rwlock_);
@ -159,7 +162,7 @@ int ObLSArchiveTask::get_fetcher_progress(const ArchiveWorkStation &station,
ARCHIVE_LOG(WARN, "stale task, just skip", K(ret), K(station), KPC(this));
} else {
LogFileTuple tuple;
dest_.get_fetcher_progress(tuple);
dest_.get_fetcher_progress(tuple, last_fetch_timestamp);
offset = tuple.get_lsn();
scn= tuple.get_scn();
}
@ -169,11 +172,12 @@ int ObLSArchiveTask::get_fetcher_progress(const ArchiveWorkStation &station,
int ObLSArchiveTask::get_sorted_fetch_log(ObArchiveLogFetchTask *&task)
{
int ret = OB_SUCCESS;
int64_t unused_timestamp = OB_INVALID_TIMESTAMP;
WLockGuard guard(rwlock_);
ObArchiveLogFetchTask *tmp_task = NULL;
LogFileTuple tuple;
task = NULL;
dest_.get_fetcher_progress(tuple);
dest_.get_fetcher_progress(tuple, unused_timestamp);
const LSN &cur_offset = tuple.get_lsn();
if (OB_FAIL(dest_.get_top_fetch_log(tmp_task))) {
@ -504,6 +508,7 @@ ObLSArchiveTask::ArchiveDest::ArchiveDest() :
piece_dir_exist_(false),
max_seq_log_offset_(),
max_fetch_info_(),
last_fetch_timestamp_(OB_INVALID_TIMESTAMP),
wait_send_task_array_(),
wait_send_task_count_(0),
send_task_queue_(NULL),
@ -527,6 +532,7 @@ void ObLSArchiveTask::ArchiveDest::destroy()
piece_dir_exist_ = false;
max_seq_log_offset_.reset();
max_fetch_info_.reset();
last_fetch_timestamp_ = OB_INVALID_TIMESTAMP;
free_fetch_log_tasks_();
free_send_task_status_();
@ -596,9 +602,11 @@ int ObLSArchiveTask::ArchiveDest::update_sequencer_progress(const int64_t size,
return ret;
}
void ObLSArchiveTask::ArchiveDest::get_fetcher_progress(LogFileTuple &tuple) const
void ObLSArchiveTask::ArchiveDest::get_fetcher_progress(LogFileTuple &tuple,
int64_t &last_fetch_timestamp) const
{
tuple = max_fetch_info_;
last_fetch_timestamp = last_fetch_timestamp_;
}
int ObLSArchiveTask::ArchiveDest::get_top_fetch_log(ObArchiveLogFetchTask *&task)
@ -626,6 +634,7 @@ int ObLSArchiveTask::ArchiveDest::update_fetcher_progress(const SCN &round_start
ARCHIVE_LOG(ERROR, "fetcher progress rollback", K(ret), K(max_fetch_info_), K(tuple));
} else {
max_fetch_info_ = tuple;
last_fetch_timestamp_ = common::ObTimeUtility::fast_current_time();
ARCHIVE_LOG(TRACE, "update fetcher progress succ", K(max_fetch_info_));
}
return ret;

View File

@ -97,7 +97,8 @@ public:
// 获取fetch进度
int get_fetcher_progress(const ArchiveWorkStation &station,
palf::LSN &offset,
share::SCN &scn);
share::SCN &scn,
int64_t &last_fetch_timestamp);
int compensate_piece(const ArchiveWorkStation &station,
const int64_t next_compensate_piece_id);
@ -160,7 +161,7 @@ private:
void destroy();
void get_sequencer_progress(LSN &offset) const;
int update_sequencer_progress(const int64_t size, const LSN &offset);
void get_fetcher_progress(LogFileTuple &tuple) const;
void get_fetcher_progress(LogFileTuple &tuple, int64_t &last_fetch_timestamp) const;
int update_fetcher_progress(const share::SCN &round_start_scn, const LogFileTuple &tuple);
int push_fetch_log(ObArchiveLogFetchTask &task);
int push_send_task(ObArchiveSendTask &task, ObArchiveWorker &worker);
@ -196,6 +197,7 @@ private:
LSN max_seq_log_offset_;
LogFileTuple max_fetch_info_;
int64_t last_fetch_timestamp_;
ObArchiveLogFetchTask *wait_send_task_array_[MAX_FETCH_TASK_NUM];
int64_t wait_send_task_count_;
ObArchiveTaskStatus *send_task_queue_;