[Archive] add parameter archive_lag_target

This commit is contained in:
taoshuning
2023-07-11 07:18:23 +00:00
committed by ob-robot
parent 2f11cd7f67
commit 1f46662883
15 changed files with 157 additions and 39 deletions

View File

@ -14,6 +14,7 @@
#include "lib/oblog/ob_log_module.h" #include "lib/oblog/ob_log_module.h"
#include "lib/time/ob_time_utility.h" #include "lib/time/ob_time_utility.h"
#include "lib/utility/ob_macro_utils.h" #include "lib/utility/ob_macro_utils.h"
#include "logservice/palf/lsn.h"
#include "ob_archive_fetcher.h" #include "ob_archive_fetcher.h"
#include "lib/ob_define.h" #include "lib/ob_define.h"
#include "lib/ob_errno.h" #include "lib/ob_errno.h"
@ -31,10 +32,14 @@
#include "ob_archive_util.h" #include "ob_archive_util.h"
#include "ob_archive_sequencer.h" // ObArchivesSequencer #include "ob_archive_sequencer.h" // ObArchivesSequencer
#include "objit/common/ob_item_type.h" // print #include "objit/common/ob_item_type.h" // print
#include "observer/omt/ob_tenant_config_mgr.h"
#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader
#include "share/ob_debug_sync.h" // DEBUG_SYNC #include "share/ob_debug_sync.h" // DEBUG_SYNC
#include "share/ob_debug_sync_point.h" // LOG_ARCHIVE_PUSH_LOG #include "share/ob_debug_sync_point.h" // LOG_ARCHIVE_PUSH_LOG
#include "share/ob_errno.h" #include "share/ob_errno.h"
#include "share/ob_ls_id.h" #include "share/ob_ls_id.h"
#include "share/ob_tenant_info_proxy.h" // ObAllTenantInfo
#include "share/scn.h"
namespace oceanbase namespace oceanbase
{ {
@ -362,8 +367,8 @@ int ObArchiveFetcher::handle_log_fetch_task_(ObArchiveLogFetchTask &task)
const ObLSID id = task.get_ls_id(); const ObLSID id = task.get_ls_id();
const ArchiveWorkStation &station = task.get_station(); const ArchiveWorkStation &station = task.get_station();
ArchiveKey key = station.get_round(); ArchiveKey key = station.get_round();
LSN commit_lsn;
SCN commit_scn; SCN commit_scn;
LSN commit_lsn;
DEBUG_SYNC(BEFORE_ARCHIVE_FETCH_LOG); DEBUG_SYNC(BEFORE_ARCHIVE_FETCH_LOG);
@ -376,12 +381,11 @@ int ObArchiveFetcher::handle_log_fetch_task_(ObArchiveLogFetchTask &task)
ARCHIVE_LOG(WARN, "invalid task", K(ret), K(task)); ARCHIVE_LOG(WARN, "invalid task", K(ret), K(task));
} else if (OB_FAIL(get_max_lsn_scn_(id, commit_lsn, commit_scn))) { } else if (OB_FAIL(get_max_lsn_scn_(id, commit_lsn, commit_scn))) {
ARCHIVE_LOG(WARN, "get max lsn scn failed", K(ret), K(id)); ARCHIVE_LOG(WARN, "get max lsn scn failed", K(ret), K(id));
} else if (OB_FAIL(check_need_delay_(id, station, task.get_cur_offset(), } else if (OB_FAIL(check_need_delay_(task, commit_lsn, need_delay))) {
task.get_end_offset(), commit_lsn, commit_scn, need_delay))) { ARCHIVE_LOG(WARN, "check need delay failed", K(ret), K(commit_lsn), K(task));
ARCHIVE_LOG(WARN, "check need delay failed", K(ret), K(commit_lsn), K(commit_scn), K(task));
} else if (need_delay) { } else if (need_delay) {
// just skip // just skip
ARCHIVE_LOG(INFO, "need delay", K(task), K(need_delay)); ARCHIVE_LOG(TRACE, "need delay", K(task), K(need_delay));
} else if (OB_FAIL(init_helper_(task, commit_lsn, helper))) { } else if (OB_FAIL(init_helper_(task, commit_lsn, helper))) {
ARCHIVE_LOG(WARN, "init helper failed", K(ret), K(task)); ARCHIVE_LOG(WARN, "init helper failed", K(ret), K(task));
} else if (OB_FAIL(init_iterator_(task.get_ls_id(), helper, palf_handle_guard, iter))) { } else if (OB_FAIL(init_iterator_(task.get_ls_id(), helper, palf_handle_guard, iter))) {
@ -428,7 +432,7 @@ int ObArchiveFetcher::get_max_lsn_scn_(const ObLSID &id, palf::LSN &lsn, SCN &sc
} else if (OB_FAIL(palf_handle_guard.get_end_scn(scn))) { } else if (OB_FAIL(palf_handle_guard.get_end_scn(scn))) {
ARCHIVE_LOG(WARN, "get end ts ns failed", K(ret), K(id)); ARCHIVE_LOG(WARN, "get end ts ns failed", K(ret), K(id));
} else { } else {
ARCHIVE_LOG(INFO, "get end lsn scn succ", K(ret), K(id), K(lsn), K(scn)); ARCHIVE_LOG(TRACE, "get end lsn scn succ", K(ret), K(id), K(lsn), K(scn));
} }
return ret; return ret;
} }
@ -444,37 +448,39 @@ int ObArchiveFetcher::get_max_lsn_scn_(const ObLSID &id, palf::LSN &lsn, SCN &sc
// The archive progress lag is smaller than target, just need delay. // The archive progress lag is smaller than target, just need delay.
// //
// The buffer to archive is not enough and not reach the block end, just need delay. // The buffer to archive is not enough and not reach the block end, just need delay.
int ObArchiveFetcher::check_need_delay_(const ObLSID &id, int ObArchiveFetcher::check_need_delay_(const ObArchiveLogFetchTask &task,
const ArchiveWorkStation &station,
const LSN &cur_lsn,
const LSN &end_lsn,
const LSN &commit_lsn, const LSN &commit_lsn,
const share::SCN &commit_scn,
bool &need_delay) bool &need_delay)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool data_enough = false; bool data_enough = false;
bool data_full = false; bool data_full = false;
const ObLSID &id = task.get_ls_id();
const ArchiveWorkStation &station = task.get_station();
const share::SCN &base_scn = task.get_base_scn();
const LSN &start_lsn = task.get_start_offset();
const LSN &cur_lsn = task.get_cur_offset();
const LSN &end_lsn = task.get_end_offset();
LSN offset; LSN offset;
SCN fetch_scn; SCN fetch_scn;
need_delay = false; need_delay = false;
int64_t send_task_count = 0; int64_t send_task_count = 0;
int64_t ls_archive_task_count = 0; int64_t ls_archive_task_count = 0;
int64_t send_task_status_count = 0; int64_t send_task_status_count = 0;
const bool new_block = start_lsn == cur_lsn;
palf::LSN max_no_limit_lsn;
GET_LS_TASK_CTX(ls_mgr_, id) { GET_LS_TASK_CTX(ls_mgr_, id) {
if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn))) { if (OB_FAIL(ls_archive_task->get_fetcher_progress(station, offset, fetch_scn))) {
ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station)); ARCHIVE_LOG(WARN, "get fetch progress failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) { } else if (OB_FAIL(ls_archive_task->get_send_task_count(station, send_task_count))) {
ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station)); ARCHIVE_LOG(WARN, "get send task count failed", K(ret), K(id), K(station));
} else if (OB_FAIL(ls_archive_task->get_max_no_limit_lsn(station, max_no_limit_lsn))) {
ARCHIVE_LOG(WARN, "get max_no_limit_lsn failed", K(id), K(station));
} else if (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT) { } else if (send_task_count >= MAX_LS_SEND_TASK_COUNT_LIMIT) {
need_delay = true; need_delay = true;
ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay", ARCHIVE_LOG(TRACE, "send_task_count exceed threshold, need delay",
K(id), K(station), K(send_task_count)); K(id), K(station), K(send_task_count));
} else if (! check_scn_enough_(fetch_scn, commit_scn)) {
need_delay = true;
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id),
K(station), K(fetch_scn), K(commit_scn));
} else { } else {
ls_archive_task_count = ls_mgr_->get_ls_task_count(); ls_archive_task_count = ls_mgr_->get_ls_task_count();
send_task_status_count = archive_sender_->get_send_task_status_count(); send_task_status_count = archive_sender_->get_send_task_status_count();
@ -488,6 +494,10 @@ int ObArchiveFetcher::check_need_delay_(const ObLSID &id,
// although data buffer not enough, but data reaches the end of the block, do archive // although data buffer not enough, but data reaches the end of the block, do archive
ARCHIVE_LOG(TRACE, "data buffer reach clog block end, do archive", ARCHIVE_LOG(TRACE, "data buffer reach clog block end, do archive",
K(id), K(station), K(end_lsn), K(commit_lsn)); K(id), K(station), K(end_lsn), K(commit_lsn));
} else if (! check_scn_enough_(id, new_block, cur_lsn, max_no_limit_lsn, base_scn, fetch_scn)) {
need_delay = true;
ARCHIVE_LOG(TRACE, "scn not enough, need delay", K(id), K(station), K(new_block), K(cur_lsn),
K(max_no_limit_lsn), K(base_scn), K(fetch_scn));
} else if (! data_enough) { } else if (! data_enough) {
// data not enough to fill unit, just wait // data not enough to fill unit, just wait
need_delay = true; need_delay = true;
@ -511,9 +521,30 @@ void ObArchiveFetcher::check_capacity_enough_(const LSN &commit_lsn,
data_enough = data_full || commit_lsn >= cur_lsn + unit_size_; data_enough = data_full || commit_lsn >= cur_lsn + unit_size_;
} }
bool ObArchiveFetcher::check_scn_enough_(const SCN &fetch_scn, const SCN &end_scn) const bool ObArchiveFetcher::check_scn_enough_(const share::ObLSID &id,
const bool new_block,
const palf::LSN &lsn,
const palf::LSN &max_no_limit_lsn,
const SCN &base_scn,
const SCN &fetch_scn)
{ {
return true; int ret = OB_SUCCESS;
bool bret = false; // archive limit default
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
const int64_t lag_target = tenant_config.is_valid() ? tenant_config->archive_lag_target : 0L;
share::SCN replayable_scn;
if (lsn <= max_no_limit_lsn || 0 == lag_target) {
bret = true;
// when ls archive task init or update, the max_no_limit_lsn set
// logs whose lsn smaller than the max_no_limit_lsn will ignore the archive_lag_target limit
} else if (OB_FAIL(get_max_archive_point_(replayable_scn))) {
ARCHIVE_LOG(WARN, "get replayable_scn failed", K(id));
} else if (new_block) {
bret = ((replayable_scn.convert_to_ts() - base_scn.convert_to_ts()) >= lag_target);
} else {
bret = ((replayable_scn.convert_to_ts() - fetch_scn.convert_to_ts()) >= lag_target);
}
return bret;
} }
int ObArchiveFetcher::init_helper_(ObArchiveLogFetchTask &task, const LSN &commit_lsn, TmpMemoryHelper &helper) int ObArchiveFetcher::init_helper_(ObArchiveLogFetchTask &task, const LSN &commit_lsn, TmpMemoryHelper &helper)

View File

@ -135,15 +135,15 @@ private:
int get_max_lsn_scn_(const ObLSID &id, palf::LSN &lsn, share::SCN &scn); int get_max_lsn_scn_(const ObLSID &id, palf::LSN &lsn, share::SCN &scn);
// 1.1 检查任务是否delay处理 // 1.1 检查任务是否delay处理
int check_need_delay_(const ObLSID &id, const ArchiveWorkStation &station, const LSN &cur_lsn, int check_need_delay_(const ObArchiveLogFetchTask &task, const LSN &commit_lsn, bool &need_delay);
const LSN &end_lsn, const LSN &commit_lsn, const share::SCN &commit_scn, bool &need_delay);
// 1.1.1 检查ob日志是否有产生满足处理单元大小的数据 // 1.1.1 检查ob日志是否有产生满足处理单元大小的数据
void check_capacity_enough_(const LSN &commit_lsn, const LSN &cur_lsn, void check_capacity_enough_(const LSN &commit_lsn, const LSN &cur_lsn,
const LSN &end_offset, bool &data_enough, bool &data_full); const LSN &end_offset, bool &data_enough, bool &data_full);
// 1.1.2 检查日志流落后程度是否需要触发归档 // 1.1.2 检查日志流落后程度是否需要触发归档
bool check_scn_enough_(const share::SCN &fetch_scn, const share::SCN &end_scn) const; bool check_scn_enough_(const share::ObLSID &id, const bool new_block, const palf::LSN &lsn,
const palf::LSN &max_no_limit_lsn, const share::SCN &base_scn, const share::SCN &fetch_scn);
// 1.2 初始化TmpMemoryHelper // 1.2 初始化TmpMemoryHelper
int init_helper_(ObArchiveLogFetchTask &task, const LSN &commit_lsn, TmpMemoryHelper &helper); int init_helper_(ObArchiveLogFetchTask &task, const LSN &commit_lsn, TmpMemoryHelper &helper);

View File

@ -29,7 +29,8 @@ int ObArchiveIO::push_log(const ObString &uri,
const share::ObBackupStorageInfo *storage_info, const share::ObBackupStorageInfo *storage_info,
char *data, char *data,
const int64_t data_len, const int64_t data_len,
const int64_t offset) const int64_t offset,
const bool is_full_file)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObBackupIoAdapter util; ObBackupIoAdapter util;
@ -48,14 +49,28 @@ int ObArchiveIO::push_log(const ObString &uri,
} else if (OB_UNLIKELY(NULL == data || data_len < 0 || offset < 0)) { } else if (OB_UNLIKELY(NULL == data || data_len < 0 || offset < 0)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(data), K(data_len)); ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(data), K(data_len));
} else if (OB_FAIL(util.open_with_access_type(device_handle, fd, storage_info, uri, } else {
common::ObStorageAccessType::OB_STORAGE_ACCESS_RANDOMWRITER))) { if (is_full_file) {
ARCHIVE_LOG(INFO, "open_with_access_type failed", K(ret), K(uri), KP(storage_info)); if (OB_FAIL(util.open_with_access_type(device_handle, fd, storage_info, uri,
} else if (OB_ISNULL(device_handle)) { common::ObStorageAccessType::OB_STORAGE_ACCESS_OVERWRITER))) {
ret = OB_ERR_UNEXPECTED; ARCHIVE_LOG(INFO, "open_with_access_type failed", K(ret), K(uri), KP(storage_info));
ARCHIVE_LOG(ERROR, "device_handle is NULL", K(ret), K(device_handle), K(uri)); } else if (OB_ISNULL(device_handle)) {
} else if (OB_FAIL(device_handle->pwrite(fd, offset, data_len, data, write_size))) { ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(WARN, "fail to write file", K(ret), K(uri), KP(storage_info), K(data), K(data_len)); ARCHIVE_LOG(ERROR, "device_handle is NULL", K(ret), K(device_handle), K(uri));
} else if (OB_FAIL(device_handle->write(fd, data, data_len, write_size))) {
ARCHIVE_LOG(WARN, "fail to write file", K(ret), K(uri), KP(storage_info), K(data), K(data_len));
}
} else {
if (OB_FAIL(util.open_with_access_type(device_handle, fd, storage_info, uri,
common::ObStorageAccessType::OB_STORAGE_ACCESS_RANDOMWRITER))) {
ARCHIVE_LOG(INFO, "open_with_access_type failed", K(ret), K(uri), KP(storage_info));
} else if (OB_ISNULL(device_handle)) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "device_handle is NULL", K(ret), K(device_handle), K(uri));
} else if (OB_FAIL(device_handle->pwrite(fd, offset, data_len, data, write_size))) {
ARCHIVE_LOG(WARN, "fail to write file", K(ret), K(uri), KP(storage_info), K(data), K(data_len));
}
}
} }
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;

View File

@ -32,7 +32,8 @@ public:
const share::ObBackupStorageInfo *storage_info, const share::ObBackupStorageInfo *storage_info,
char *data, char *data,
const int64_t data_len, const int64_t data_len,
const int64_t offset); const int64_t offset,
const bool is_full_file);
int mkdir(const ObString &uri, int mkdir(const ObString &uri,
const share::ObBackupStorageInfo *storage_info); const share::ObBackupStorageInfo *storage_info);

View File

@ -587,6 +587,7 @@ int ObArchiveSender::archive_log_(const ObBackupDest &backup_dest,
int64_t origin_data_len = 0; int64_t origin_data_len = 0;
char *filled_data = NULL; char *filled_data = NULL;
int64_t filled_data_len = 0; int64_t filled_data_len = 0;
const bool is_full_file = (task.get_end_lsn() - task.get_start_lsn()) == MAX_ARCHIVE_FILE_SIZE;
const int64_t start_ts = common::ObTimeUtility::current_time(); const int64_t start_ts = common::ObTimeUtility::current_time();
// 1. decide archive file // 1. decide archive file
if (OB_FAIL(decide_archive_file_(task, arg.cur_file_id_, arg.cur_file_offset_, if (OB_FAIL(decide_archive_file_(task, arg.cur_file_id_, arg.cur_file_offset_,
@ -616,7 +617,7 @@ int ObArchiveSender::archive_log_(const ObBackupDest &backup_dest,
ARCHIVE_LOG(WARN, "fill file header if needed failed", K(ret)); ARCHIVE_LOG(WARN, "fill file header if needed failed", K(ret));
} }
// 6. push log // 6. push log
else if (OB_FAIL(push_log_(id, path.get_obstr(), backup_dest.get_storage_info(), new_file ? else if (OB_FAIL(push_log_(id, path.get_obstr(), backup_dest.get_storage_info(), is_full_file, new_file ?
file_offset : file_offset + ARCHIVE_FILE_HEADER_SIZE, file_offset : file_offset + ARCHIVE_FILE_HEADER_SIZE,
new_file ? filled_data : origin_data, new_file ? filled_data_len : origin_data_len))) { new_file ? filled_data : origin_data, new_file ? filled_data_len : origin_data_len))) {
ARCHIVE_LOG(WARN, "push log failed", K(ret), K(task)); ARCHIVE_LOG(WARN, "push log failed", K(ret), K(task));
@ -726,6 +727,7 @@ int ObArchiveSender::fill_file_header_if_needed_(const ObArchiveSendTask &task,
int ObArchiveSender::push_log_(const ObLSID &id, int ObArchiveSender::push_log_(const ObLSID &id,
const ObString &uri, const ObString &uri,
const share::ObBackupStorageInfo *storage_info, const share::ObBackupStorageInfo *storage_info,
const bool is_full_file,
const int64_t offset, const int64_t offset,
char *data, char *data,
const int64_t data_len) const int64_t data_len)
@ -733,7 +735,7 @@ int ObArchiveSender::push_log_(const ObLSID &id,
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObArchiveIO archive_io; ObArchiveIO archive_io;
if (OB_FAIL(archive_io.push_log(uri, storage_info, data, data_len, offset))) { if (OB_FAIL(archive_io.push_log(uri, storage_info, data, data_len, offset, is_full_file))) {
ARCHIVE_LOG(WARN, "push log failed", K(ret)); ARCHIVE_LOG(WARN, "push log failed", K(ret));
} else { } else {
ARCHIVE_LOG(INFO, "push log succ", K(id)); ARCHIVE_LOG(INFO, "push log succ", K(id));

View File

@ -159,6 +159,7 @@ private:
int push_log_(const share::ObLSID &id, int push_log_(const share::ObLSID &id,
const ObString &uri, const ObString &uri,
const share::ObBackupStorageInfo *storage_info, const share::ObBackupStorageInfo *storage_info,
const bool is_full_file,
const int64_t offset, const int64_t offset,
char *data, char *data,
const int64_t data_len); const int64_t data_len);

View File

@ -284,12 +284,18 @@ int GenFetchTaskFunctor::generate_log_fetch_task_(const ObLSID &id,
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObArchiveLogFetchTask *tmp_task = NULL; ObArchiveLogFetchTask *tmp_task = NULL;
palf::PalfHandleGuard palf_handle;
share::SCN scn;
task = NULL; task = NULL;
if (OB_ISNULL(tmp_task = archive_fetcher_->alloc_log_fetch_task())) { if (OB_ISNULL(tmp_task = archive_fetcher_->alloc_log_fetch_task())) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
ARCHIVE_LOG(WARN, "alloc log fetch task failed", K(ret), K(id)); ARCHIVE_LOG(WARN, "alloc log fetch task failed", K(ret), K(id));
} else if (OB_FAIL(tmp_task->init(tenant_id_, id, station, start_lsn, end_lsn))) { } else if (OB_FAIL(log_service_->open_palf(id, palf_handle))) {
ARCHIVE_LOG(WARN, "open_palf failed", K(id));
} else if (OB_FAIL(palf_handle.locate_by_lsn_coarsely(start_lsn, scn))) {
ARCHIVE_LOG(WARN, "locate by lsn failed", K(id), K(start_lsn));
} else if (OB_FAIL(tmp_task->init(tenant_id_, id, station, scn, start_lsn, end_lsn))) {
ARCHIVE_LOG(WARN, "log fetch task init failed", K(ret), K(id), K(station)); ARCHIVE_LOG(WARN, "log fetch task init failed", K(ret), K(id), K(station));
} else { } else {
task = tmp_task; task = tmp_task;

View File

@ -57,6 +57,7 @@ ObArchiveLogFetchTask::~ObArchiveLogFetchTask()
int ObArchiveLogFetchTask::init(const uint64_t tenant_id, int ObArchiveLogFetchTask::init(const uint64_t tenant_id,
const ObLSID &id, const ObLSID &id,
const ArchiveWorkStation &station, const ArchiveWorkStation &station,
const share::SCN &base_scn,
const LSN &start_lsn, const LSN &start_lsn,
const LSN &end_lsn) const LSN &end_lsn)
{ {
@ -64,16 +65,18 @@ int ObArchiveLogFetchTask::init(const uint64_t tenant_id,
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|| !id.is_valid() || !id.is_valid()
|| !station.is_valid() || !station.is_valid()
|| !base_scn.is_valid()
|| !start_lsn.is_valid() || !start_lsn.is_valid()
|| ! end_lsn.is_valid() || ! end_lsn.is_valid()
|| end_lsn <= start_lsn)) { || end_lsn <= start_lsn)) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(tenant_id), ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(tenant_id),
K(id), K(station), K(start_lsn), K(end_lsn)); K(id), K(station), K(base_scn), K(start_lsn), K(end_lsn));
} else { } else {
tenant_id_ = tenant_id; tenant_id_ = tenant_id;
id_ = id; id_ = id;
station_ = station; station_ = station;
base_scn_ = base_scn;
start_offset_ = start_lsn; start_offset_ = start_lsn;
cur_offset_ = start_lsn; cur_offset_ = start_lsn;
end_offset_ = end_lsn; end_offset_ = end_lsn;

View File

@ -68,11 +68,13 @@ public:
int init(const uint64_t tenant_id, int init(const uint64_t tenant_id,
const ObLSID &id, const ObLSID &id,
const ArchiveWorkStation &station, const ArchiveWorkStation &station,
const share::SCN &base_scn,
const LSN &start_lsn, const LSN &start_lsn,
const LSN &end_lsn); const LSN &end_lsn);
uint64_t get_tenant_id() const { return tenant_id_; } uint64_t get_tenant_id() const { return tenant_id_; }
ObLSID get_ls_id() const { return id_; } ObLSID get_ls_id() const { return id_; }
const ArchiveWorkStation &get_station() { return station_; } const ArchiveWorkStation &get_station() const { return station_; }
const share::SCN &get_base_scn() const { return base_scn_; }
const LSN &get_start_offset() const { return start_offset_; } const LSN &get_start_offset() const { return start_offset_; }
const LSN &get_cur_offset() const { return cur_offset_; } const LSN &get_cur_offset() const { return cur_offset_; }
const LSN &get_end_offset() const { return end_offset_; } const LSN &get_end_offset() const { return end_offset_; }
@ -97,6 +99,7 @@ public:
K_(station), K_(station),
K_(cur_piece), K_(cur_piece),
K_(next_piece), K_(next_piece),
K_(base_scn),
K_(start_offset), K_(start_offset),
K_(end_offset), K_(end_offset),
K_(cur_offset), K_(cur_offset),
@ -111,6 +114,7 @@ private:
ArchiveWorkStation station_; ArchiveWorkStation station_;
ObArchivePiece cur_piece_; // 该份数据属于ObArchivePiece目录 ObArchivePiece cur_piece_; // 该份数据属于ObArchivePiece目录
ObArchivePiece next_piece_; ObArchivePiece next_piece_;
share::SCN base_scn_;
LSN start_offset_; // 起始拉日志文件起始offset LSN start_offset_; // 起始拉日志文件起始offset
LSN end_offset_; // 拉取日志文件结束offset(如果是拉取完整文件任务,该值为infoblock起点) LSN end_offset_; // 拉取日志文件结束offset(如果是拉取完整文件任务,该值为infoblock起点)
LSN cur_offset_; LSN cur_offset_;

View File

@ -378,6 +378,19 @@ int ObLSArchiveTask::update_archive_progress(const ArchiveWorkStation &station,
return ret; return ret;
} }
int ObLSArchiveTask::get_max_no_limit_lsn(const ArchiveWorkStation &station, LSN &lsn)
{
int ret = OB_SUCCESS;
RLockGuard guard(rwlock_);
if (OB_UNLIKELY(station != station_)) {
ret = OB_LOG_ARCHIVE_LEADER_CHANGED;
ARCHIVE_LOG(INFO, "stale task, just skip it", K(ret), K(station), K(station_), K(id_));
} else {
dest_.get_max_no_limit_lsn(lsn);
}
return ret;
}
int ObLSArchiveTask::print_self() int ObLSArchiveTask::print_self()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -392,6 +405,7 @@ int64_t ObLSArchiveTask::ArchiveDest::to_string(char *buf, const int64_t buf_len
J_OBJ_START(); J_OBJ_START();
J_KV(K_(has_encount_error), J_KV(K_(has_encount_error),
K_(is_worm), K_(is_worm),
K_(max_no_limit_lsn),
K_(max_archived_info), K_(max_archived_info),
K_(max_seq_log_offset), K_(max_seq_log_offset),
K_(max_fetch_info), K_(max_fetch_info),
@ -458,7 +472,8 @@ int ObLSArchiveTask::update_unlock_(const StartArchiveHelper &helper,
tenant_id_ = helper.get_tenant_id(); tenant_id_ = helper.get_tenant_id();
station_ = helper.get_station(); station_ = helper.get_station();
round_start_scn_ = helper.get_round_start_scn(); round_start_scn_ = helper.get_round_start_scn();
ret = dest_.init(helper.get_piece_min_lsn(), helper.get_offset(), ret = dest_.init(helper.get_max_no_limit_lsn(),
helper.get_piece_min_lsn(), helper.get_offset(),
helper.get_file_id(), helper.get_file_offset(), helper.get_file_id(), helper.get_file_offset(),
helper.get_piece(), helper.get_max_archived_scn(), helper.get_piece(), helper.get_max_archived_scn(),
helper.is_log_gap_exist(), allocator); helper.is_log_gap_exist(), allocator);
@ -481,6 +496,7 @@ void ObLSArchiveTask::mock_init(const ObLSID &id, ObArchiveAllocator *allocator)
ObLSArchiveTask::ArchiveDest::ArchiveDest() : ObLSArchiveTask::ArchiveDest::ArchiveDest() :
has_encount_error_(false), has_encount_error_(false),
is_worm_(false), is_worm_(false),
max_no_limit_lsn_(),
piece_min_lsn_(), piece_min_lsn_(),
max_archived_info_(), max_archived_info_(),
archive_file_id_(OB_INVALID_ARCHIVE_FILE_ID), archive_file_id_(OB_INVALID_ARCHIVE_FILE_ID),
@ -503,6 +519,7 @@ void ObLSArchiveTask::ArchiveDest::destroy()
{ {
has_encount_error_ = false; has_encount_error_ = false;
is_worm_ = false; is_worm_ = false;
max_no_limit_lsn_.reset();
piece_min_lsn_.reset(); piece_min_lsn_.reset();
max_archived_info_.reset(); max_archived_info_.reset();
archive_file_id_ = OB_INVALID_ARCHIVE_FILE_ID; archive_file_id_ = OB_INVALID_ARCHIVE_FILE_ID;
@ -516,7 +533,8 @@ void ObLSArchiveTask::ArchiveDest::destroy()
allocator_ = NULL; allocator_ = NULL;
} }
int ObLSArchiveTask::ArchiveDest::init(const LSN &piece_min_lsn, int ObLSArchiveTask::ArchiveDest::init(const LSN &max_no_limit_lsn,
const LSN &piece_min_lsn,
const LSN &lsn, const LSN &lsn,
const int64_t file_id, const int64_t file_id,
const int64_t file_offset, const int64_t file_offset,
@ -551,6 +569,7 @@ int ObLSArchiveTask::ArchiveDest::init(const LSN &piece_min_lsn,
ARCHIVE_LOG(INFO, "update archive dest with local archive progress", K(piece_min_lsn), ARCHIVE_LOG(INFO, "update archive dest with local archive progress", K(piece_min_lsn),
K(tmp_tuple), K(piece), K(file_id), K(file_offset), KPC(this)); K(tmp_tuple), K(piece), K(file_id), K(file_offset), KPC(this));
} }
max_no_limit_lsn_ = max_no_limit_lsn;
wait_send_task_count_ = 0; wait_send_task_count_ = 0;
free_fetch_log_tasks_(); free_fetch_log_tasks_();
free_send_task_status_(); free_send_task_status_();
@ -782,6 +801,11 @@ void ObLSArchiveTask::ArchiveDest::get_archive_send_arg(ObArchiveSendDestArg &ar
arg.piece_dir_exist_ = piece_dir_exist_; arg.piece_dir_exist_ = piece_dir_exist_;
} }
void ObLSArchiveTask::ArchiveDest::get_max_no_limit_lsn(LSN &lsn)
{
lsn = max_no_limit_lsn_;
}
void ObLSArchiveTask::ArchiveDest::mark_error() void ObLSArchiveTask::ArchiveDest::mark_error()
{ {
has_encount_error_ = true; has_encount_error_ = true;

View File

@ -125,6 +125,8 @@ public:
int get_max_archive_info(const ArchiveKey &key, int get_max_archive_info(const ArchiveKey &key,
ObLSArchivePersistInfo &info); ObLSArchivePersistInfo &info);
int get_max_no_limit_lsn(const ArchiveWorkStation &station, LSN &lsn);
int mark_error(const ArchiveKey &key); int mark_error(const ArchiveKey &key);
int print_self(); int print_self();
@ -150,7 +152,8 @@ private:
~ArchiveDest(); ~ArchiveDest();
public: public:
int init(const LSN &piece_min_lsn, const LSN &lsn, const int64_t file_id, int init(const LSN &max_no_limit_lsn,
const LSN &piece_min_lsn, const LSN &lsn, const int64_t file_id,
const int64_t file_offset, const share::ObArchivePiece &piece, const int64_t file_offset, const share::ObArchivePiece &piece,
const share::SCN &max_archived_scn, const bool is_log_gap_exist, const share::SCN &max_archived_scn, const bool is_log_gap_exist,
ObArchiveAllocator *allocator); ObArchiveAllocator *allocator);
@ -170,6 +173,7 @@ private:
void get_archive_progress(int64_t &file_id, int64_t &file_offset, LogFileTuple &tuple); void get_archive_progress(int64_t &file_id, int64_t &file_offset, LogFileTuple &tuple);
void get_send_task_count(int64_t &count); void get_send_task_count(int64_t &count);
void get_archive_send_arg(ObArchiveSendDestArg &arg); void get_archive_send_arg(ObArchiveSendDestArg &arg);
void get_max_no_limit_lsn(LSN &lsn);
void mark_error(); void mark_error();
void print_tasks_(); void print_tasks_();
int64_t to_string(char *buf, const int64_t buf_len) const; int64_t to_string(char *buf, const int64_t buf_len) const;
@ -181,6 +185,8 @@ private:
private: private:
bool has_encount_error_; bool has_encount_error_;
bool is_worm_; bool is_worm_;
// archive_lag_target with noneffective for logs whose lsn smaller than this lsn
palf::LSN max_no_limit_lsn_;
palf::LSN piece_min_lsn_; palf::LSN piece_min_lsn_;
// archived log description // archived log description
LogFileTuple max_archived_info_; LogFileTuple max_archived_info_;

View File

@ -13,7 +13,7 @@
#include "ob_start_archive_helper.h" #include "ob_start_archive_helper.h"
#include "lib/ob_define.h" // OB_INVALID_FILE_ID #include "lib/ob_define.h" // OB_INVALID_FILE_ID
#include "lib/ob_errno.h" #include "lib/ob_errno.h"
#include "logservice/archiveservice/ob_archive_define.h" #include "ob_archive_define.h"
#include "logservice/ob_log_handler.h" #include "logservice/ob_log_handler.h"
#include "logservice/ob_log_service.h" // ObLogService #include "logservice/ob_log_service.h" // ObLogService
#include "logservice/palf/log_define.h" #include "logservice/palf/log_define.h"
@ -47,6 +47,7 @@ StartArchiveHelper::StartArchiveHelper(const ObLSID &id,
piece_interval_(piece_interval), piece_interval_(piece_interval),
genesis_scn_(genesis_scn), genesis_scn_(genesis_scn),
base_piece_id_(base_piece_id), base_piece_id_(base_piece_id),
max_offset_(),
start_offset_(), start_offset_(),
archive_file_id_(OB_INVALID_ARCHIVE_FILE_ID), archive_file_id_(OB_INVALID_ARCHIVE_FILE_ID),
archive_file_offset_(OB_INVALID_ARCHIVE_FILE_OFFSET), archive_file_offset_(OB_INVALID_ARCHIVE_FILE_OFFSET),
@ -65,6 +66,7 @@ StartArchiveHelper::~StartArchiveHelper()
piece_interval_ = 0; piece_interval_ = 0;
genesis_scn_.reset(); genesis_scn_.reset();
base_piece_id_ = 0; base_piece_id_ = 0;
max_offset_.reset();
start_offset_.reset(); start_offset_.reset();
archive_file_id_ = OB_INVALID_ARCHIVE_FILE_ID; archive_file_id_ = OB_INVALID_ARCHIVE_FILE_ID;
archive_file_offset_ = OB_INVALID_ARCHIVE_FILE_OFFSET; archive_file_offset_ = OB_INVALID_ARCHIVE_FILE_OFFSET;
@ -80,6 +82,7 @@ bool StartArchiveHelper::is_valid() const
&& station_.is_valid() && station_.is_valid()
&& piece_.is_valid() && piece_.is_valid()
&& max_archived_scn_.is_valid() && max_archived_scn_.is_valid()
&& max_offset_.is_valid()
&& (log_gap_exist_ && (log_gap_exist_
|| (start_offset_.is_valid() || (start_offset_.is_valid()
&& OB_INVALID_ARCHIVE_FILE_ID != archive_file_id_ && OB_INVALID_ARCHIVE_FILE_ID != archive_file_id_
@ -105,6 +108,14 @@ int StartArchiveHelper::handle()
ARCHIVE_LOG(WARN, "locate round start archive point failed", K(ret)); ARCHIVE_LOG(WARN, "locate round start archive point failed", K(ret));
} }
if (OB_SUCC(ret)) {
palf::PalfHandleGuard guard;
if (OB_FAIL(MTL(ObLogService*)->open_palf(id_, guard))) {
ARCHIVE_LOG(WARN, "open_palf failed", K(id_));
} else if (OB_FAIL(guard.get_end_lsn(max_offset_))) {
ARCHIVE_LOG(WARN, "get end_lsn failed", K(id_));
}
}
return ret; return ret;
} }

View File

@ -63,6 +63,7 @@ public:
uint64_t get_tenant_id() const { return tenant_id_; } uint64_t get_tenant_id() const { return tenant_id_; }
const ArchiveWorkStation &get_station() const { return station_; } const ArchiveWorkStation &get_station() const { return station_; }
const LSN &get_piece_min_lsn() const { return piece_min_lsn_; } const LSN &get_piece_min_lsn() const { return piece_min_lsn_; }
const LSN &get_max_no_limit_lsn() const { return max_offset_; }
const LSN &get_offset() const { return start_offset_; } const LSN &get_offset() const { return start_offset_; }
int64_t get_file_id() const { return archive_file_id_; } int64_t get_file_id() const { return archive_file_id_; }
int64_t get_file_offset() const { return archive_file_offset_; } int64_t get_file_offset() const { return archive_file_offset_; }
@ -78,6 +79,7 @@ public:
K_(genesis_scn), K_(genesis_scn),
K_(base_piece_id), K_(base_piece_id),
K_(piece_min_lsn), K_(piece_min_lsn),
K_(max_offset),
K_(start_offset), K_(start_offset),
K_(archive_file_id), K_(archive_file_id),
K_(archive_file_offset), K_(archive_file_offset),
@ -102,6 +104,7 @@ private:
share::SCN genesis_scn_; share::SCN genesis_scn_;
int64_t base_piece_id_; int64_t base_piece_id_;
LSN piece_min_lsn_; LSN piece_min_lsn_;
LSN max_offset_; // archive_lag_target with noneffective smaller than this lsn
LSN start_offset_; LSN start_offset_;
int64_t archive_file_id_; int64_t archive_file_id_;
int64_t archive_file_offset_; int64_t archive_file_offset_;

View File

@ -603,6 +603,16 @@ DEF_TIME(standby_db_fetch_log_rpc_timeout, OB_TENANT_PARAMETER, "15s",
"When the rpc timeout, the log transport service switches to another server of the log restore source tenant to fetch logs. " "When the rpc timeout, the log transport service switches to another server of the log restore source tenant to fetch logs. "
"Range: [2s, +∞)", "Range: [2s, +∞)",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(archive_lag_target, OB_TENANT_PARAMETER, "120s",
"[0ms,7200s]",
"The lag target of the log archive. The log archive target affects not only the backup availability, "
"but also the lag of the standby database based on archive. Values larger than 7200s are not reasonable lag. "
"The typical value is 120s. Extremely low values can result in high IOPS, which is not optimal for object storage; "
"such values can also affect the performance of the database. The value 0ms means to archive as soon as possible. "
"Range: [0ms,7200s]",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3", DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3",
"[1,8]", "[1,8]",
"the number of parallel log writer threads that can be used to write redo log entries to disk. ", "the number of parallel log writer threads that can be used to write redo log entries to disk. ",

View File

@ -23,6 +23,7 @@ select name from __all_virtual_sys_parameter_stat where name not like "module_te
name name
all_server_list all_server_list
arbitration_timeout arbitration_timeout
archive_lag_target
audit_sys_operations audit_sys_operations
audit_trail audit_trail
autoinc_cache_refresh_interval autoinc_cache_refresh_interval