Modify archive memory usage strategy

This commit is contained in:
obdev
2023-04-13 06:35:44 +00:00
committed by ob-robot
parent 7b958a4535
commit bb01a2fb4e

View File

@ -433,12 +433,17 @@ int ObArchiveFetcher::get_max_lsn_scn_(const ObLSID &id, palf::LSN &lsn, SCN &sc
return ret; return ret;
} }
// 消费LogFetchTask任务条件必须满足条件: // Archive Memory usage limit function is reached mainly in this function,
// 1. 日志量足够压缩 // so the basic rule to check if need delay based on the send_task count and send_task_status count.
// 2. 该日志流归档任务有效(leader/backup_zone) // 1) send_task count means total task count for single ls;
// 以及以上条件其中一条: // 2) send_task_status means total ls count. For archive, a ls leader gone and backed,
// 3. 日志流delay超过一定阈值 // it has different archive context and different send_task_status.
// 4. 可以塞满任务 //
// If any one reach the upper limit, just need delay.
//
// The archive progress lag is smaller than target, just need delay.
//
// The buffer to archive is not enough and not reach the block end, just need delay.
int ObArchiveFetcher::check_need_delay_(const ObLSID &id, int ObArchiveFetcher::check_need_delay_(const ObLSID &id,
const ArchiveWorkStation &station, const ArchiveWorkStation &station,
const LSN &cur_lsn, const LSN &cur_lsn,
@ -457,33 +462,37 @@ int ObArchiveFetcher::check_need_delay_(const ObLSID &id,
int64_t ls_archive_task_count = 0; int64_t ls_archive_task_count = 0;
int64_t send_task_status_count = 0; int64_t send_task_status_count = 0;
if (FALSE_IT(check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full))) { GET_LS_TASK_CTX(ls_mgr_, id) {
} else if (data_full) { if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn))) {
// data is full, do archive ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station));
} else if (! data_enough) { } else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) {
// data not enough to fill unit, just wait ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station));
need_delay = true; } else if (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT) {
ARCHIVE_LOG(TRACE, "data not enough, need delay", K(id), K(station), K(commit_lsn), need_delay = true;
K(cur_lsn), K(end_lsn), K(data_enough)); ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay",
} else { K(id), K(station), K(send_task_count));
GET_LS_TASK_CTX(ls_mgr_, id) { } else if (! check_scn_enough_(fetch_scn, commit_scn)) {
if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn))) { need_delay = true;
ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station)); ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id),
} else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) { K(station), K(fetch_scn), K(commit_scn));
ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station)); } else {
} else if (true == (need_delay = (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT))) { ls_archive_task_count = ls_mgr_->get_ls_task_count();
ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay", send_task_status_count = archive_sender_->get_send_task_status_count();
K(id), K(station), K(send_task_count)); if (ls_archive_task_count < send_task_status_count) {
} else if (true == (need_delay = ! check_scn_enough_(fetch_scn, commit_scn))) { need_delay = true;
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id), ARCHIVE_LOG(TRACE, "archive_sender_ task status count more than ls archive task count, just wait",
K(station), K(fetch_scn), K(commit_scn)); K(ls_archive_task_count), K(send_task_status_count), K(need_delay));
} else { } else {
ls_archive_task_count = ls_mgr_->get_ls_task_count(); check_capacity_enough_(commit_lsn, cur_lsn, end_lsn, data_enough, data_full);
send_task_status_count = archive_sender_->get_send_task_status_count(); if (data_full) {
if (ls_archive_task_count < send_task_status_count) { // although data buffer not enough, but data reaches the end of the block, do archive
ARCHIVE_LOG(TRACE, "data buffer reach clog block end, do archive",
K(id), K(station), K(end_lsn), K(commit_lsn));
} else if (! data_enough) {
// data not enough to fill unit, just wait
need_delay = true; need_delay = true;
ARCHIVE_LOG(TRACE, "archive_sender_ task status count more than ls archive task count, just wait", ARCHIVE_LOG(TRACE, "data not enough, need delay", K(id), K(station), K(commit_lsn),
K(ls_archive_task_count), K(send_task_status_count), K(need_delay)); K(cur_lsn), K(end_lsn), K(data_enough));
} }
} }
} }