Fix Add LS Archive Task

This commit is contained in:
obdev
2023-02-21 13:44:31 +00:00
committed by ob-robot
parent 0aa6a494c8
commit 6d8e0f1191
6 changed files with 43 additions and 26 deletions

View File

@ -12,6 +12,7 @@
#include "ob_archive_sequencer.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "ob_ls_mgr.h" // ObArchiveLSMgr
#include "logservice/ob_log_service.h" // ObLogService
@ -201,6 +202,9 @@ bool GenFetchTaskFunctor::operator()(const ObLSID &id, ObLSArchiveTask *ls_archi
ARCHIVE_LOG(WARN, "get sequence progress failed", K(ret), K(id), KPC(ls_archive_task));
} else if (OB_FAIL(ls_archive_task->get_archive_progress(station, unused_file_id, unused_file_offset, archive_tuple))) {
ARCHIVE_LOG(WARN, "get archive progress failed", K(ret), K(id), KPC(ls_archive_task));
} else if (OB_UNLIKELY(seq_lsn < archive_tuple.get_lsn())) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "seq_lsn smaller than archive progress lsn", K(id), K(seq_lsn), K(archive_tuple));
} else if (seq_lsn - archive_tuple.get_lsn() >= MAX_LS_ARCHIVE_MEMORY_LIMIT) {
// just skip
ARCHIVE_LOG(TRACE, "cache sequenced log size reach limit, just wait", K(id), K(seq_lsn), K(archive_tuple));

View File

@ -15,6 +15,7 @@
#include "lib/ob_define.h"
#include "lib/time/ob_time_utility.h"
#include "share/backup/ob_backup_struct.h" // ObBackupPathString
#include "share/ob_debug_sync.h" // DEBUG
#include "storage/ls/ob_ls.h" // ObLS
#include "storage/tx_storage/ob_ls_map.h" // ObLSIterator
#include "storage/tx_storage/ob_ls_service.h" // ObLSService
@ -345,6 +346,8 @@ void ObArchiveLSMgr::do_thread_task_()
const bool is_in_doing = state.is_doing() || state.is_interrupted() || state.is_suspend();
gc_stale_ls_task_(key, is_in_doing);
DEBUG_SYNC(BEFORE_ARCHIVE_ADD_LS_TASK);
if (is_in_doing) {
add_ls_task_();
}

View File

@ -54,7 +54,7 @@ int ObLSArchiveTask::init(const StartArchiveHelper &helper, ObArchiveAllocator *
ARCHIVE_LOG(WARN, "invalid argument", K(ret), K(helper), K(allocator));
} else {
allocator_ = allocator;
update_unlock_(helper, allocator_);
ret = update_unlock_(helper, allocator_);
}
return ret;
}
@ -69,7 +69,7 @@ int ObLSArchiveTask::update_ls_task(const StartArchiveHelper &helper)
} else if (OB_UNLIKELY(! is_task_stale_(helper.get_station()))) {
ARCHIVE_LOG(INFO, "ls archive task exist, skip it", K(ret), K(helper));
} else {
update_unlock_(helper, allocator_);
ret = update_unlock_(helper, allocator_);
}
return ret;
}
@ -450,18 +450,20 @@ int ObLSArchiveTask::mark_error(const ArchiveKey &key)
return ret;
}
// NOTE(review): this text is a rendered diff whose +/- markers were stripped. Where two
// near-identical lines follow each other (the `void`/`int` signatures, the two dest_.init
// calls), the first appears to be the removed line and the second its replacement —
// confirm against the actual commit.
//
// update_unlock_ copies the LS id, tenant id, work station and round start SCN from
// `helper` into this task, then initialises dest_ from the helper's piece / LSN / file
// progress. In the `int` form it returns whatever dest_.init() returned.
void ObLSArchiveTask::update_unlock_(const StartArchiveHelper &helper,
int ObLSArchiveTask::update_unlock_(const StartArchiveHelper &helper,
ObArchiveAllocator *allocator)
{
int ret = OB_SUCCESS;
id_ = helper.get_ls_id();
tenant_id_ = helper.get_tenant_id();
station_ = helper.get_station();
round_start_scn_ = helper.get_round_start_scn();
// Apparent pre-change call: the result of dest_.init() is not captured.
dest_.init(helper.get_piece_min_lsn(), helper.get_offset(),
helper.get_file_id(), helper.get_file_offset(),
helper.get_piece(), helper.get_max_archived_scn(),
// Apparent post-change call: the status of dest_.init() is stored in ret.
ret = dest_.init(helper.get_piece_min_lsn(), helper.get_offset(),
helper.get_file_id(), helper.get_file_offset(),
helper.get_piece(), helper.get_max_archived_scn(),
helper.is_log_gap_exist(), allocator);
// Logged unconditionally, i.e. also when dest_.init() reported an error.
ARCHIVE_LOG(INFO, "update_unlock_", KPC(this), K(helper));
return ret;
}
bool ObLSArchiveTask::is_task_stale_(const ArchiveWorkStation &station) const
@ -488,7 +490,6 @@ ObLSArchiveTask::ArchiveDest::ArchiveDest() :
max_fetch_info_(),
wait_send_task_array_(),
wait_send_task_count_(0),
seq_no_(0),
send_task_queue_(NULL),
allocator_(NULL)
{}
@ -515,7 +516,7 @@ void ObLSArchiveTask::ArchiveDest::destroy()
allocator_ = NULL;
}
// NOTE(review): this text is a rendered diff whose +/- markers were stripped, so removed
// and added lines are interleaved; the hunk-header line inside the parameter list also
// hides two parameters (the header declaration lists `piece` and `max_archived_scn`
// there). Comments below marked "appears removed" are a reading of the diff — confirm
// against the actual commit.
//
// ArchiveDest::init loads archive progress (piece min LSN, LSN, file id/offset, piece,
// max archived SCN, log-gap flag) into this dest and clears pending fetch/send state.
// In the `int` form it returns OB_SUCCESS, or OB_ERR_UNEXPECTED when the cached file
// position is ahead of the incoming one although the cached progress is behind.
void ObLSArchiveTask::ArchiveDest::init(const LSN &piece_min_lsn,
int ObLSArchiveTask::ArchiveDest::init(const LSN &piece_min_lsn,
const LSN &lsn,
const int64_t file_id,
const int64_t file_offset,
@ -524,28 +525,37 @@ void ObLSArchiveTask::ArchiveDest::init(const LSN &piece_min_lsn,
const bool is_log_gap_exist,
ObArchiveAllocator *allocator)
{
int ret = OB_SUCCESS;
// NOTE(review): the cur_piece check below appears removed; piece_min_lsn_ is assigned
// inside the renew branch further down instead.
const ObArchivePiece &cur_piece = max_archived_info_.get_piece();
if (! cur_piece.is_valid() || piece != cur_piece) {
piece_min_lsn_ = piece_min_lsn;
}
// Incoming progress, built from the caller-supplied lsn / max_archived_scn / piece.
LogFileTuple tmp_tuple(lsn, max_archived_scn, piece);
// NOTE(review): `tuple` and the std::max() selection appear removed.
LogFileTuple tuple;
if (max_archived_info_.is_valid()) {
tuple = std::max(tmp_tuple, max_archived_info_);
// Adopt the incoming values only when there is no valid cached progress, or the cached
// progress is strictly behind the incoming tuple.
const bool renew_context = (!max_archived_info_.is_valid()) || max_archived_info_ < tmp_tuple;
if (renew_context) {
// Cached file id/offset ahead of the incoming ones contradicts the cached progress
// being behind: flag it and leave the cached progress fields unchanged.
if (archive_file_id_ > file_id || (archive_file_id_ == file_id && archive_file_offset_ > file_offset)) {
ret = OB_ERR_UNEXPECTED;
ARCHIVE_LOG(ERROR, "local cache archive progress is old, but file info is bigger", K(piece_min_lsn),
K(tmp_tuple), K(piece), K(file_id), K(file_offset), KPC(this));
} else {
// Overwrite every cached progress field with the incoming values.
piece_min_lsn_ = piece_min_lsn;
max_archived_info_ = tmp_tuple;
archive_file_id_ = file_id;
archive_file_offset_ = file_offset;
has_encount_error_ = is_log_gap_exist;
max_seq_log_offset_ = lsn;
max_fetch_info_ = tmp_tuple;
ARCHIVE_LOG(INFO, "update archive dest with remote info", K(piece_min_lsn),
K(tmp_tuple), K(piece), K(file_id), K(file_offset), KPC(this));
}
} else {
// NOTE(review): `tuple = tmp_tuple;` appears to be the removed pre-change else-body.
tuple = tmp_tuple;
// Cached progress is valid and not behind the incoming tuple: keep it, and reset
// max_seq_log_offset_ / max_fetch_info_ to the cached archived position.
max_seq_log_offset_ = max_archived_info_.get_lsn();
max_fetch_info_ = max_archived_info_;
ARCHIVE_LOG(INFO, "update archive dest with local archive progress", K(piece_min_lsn),
K(tmp_tuple), K(piece), K(file_id), K(file_offset), KPC(this));
}
// NOTE(review): the six assignments below appear to be the removed unconditional update,
// superseded by the renew branch above.
has_encount_error_ = is_log_gap_exist;
max_archived_info_ = tuple;
archive_file_id_ = file_id;
archive_file_offset_ = file_offset;
max_seq_log_offset_ = lsn;
max_fetch_info_ = tuple;
// The resets from here on are not guarded by ret, so they run on every path.
wait_send_task_count_ = 0;
// NOTE(review): seq_no_ appears removed by this commit (its member declaration and
// constructor initialiser sit in hunks of this diff that shrink by one line).
seq_no_ = 0;
free_fetch_log_tasks_();
free_send_task_status_();
allocator_ = allocator;
return ret;
}
void ObLSArchiveTask::ArchiveDest::get_sequencer_progress(LSN &offset) const

View File

@ -150,7 +150,7 @@ private:
~ArchiveDest();
public:
void init(const LSN &piece_min_lsn, const LSN &lsn, const int64_t file_id,
int init(const LSN &piece_min_lsn, const LSN &lsn, const int64_t file_id,
const int64_t file_offset, const share::ObArchivePiece &piece,
const share::SCN &max_archived_scn, const bool is_log_gap_exist,
ObArchiveAllocator *allocator);
@ -192,7 +192,6 @@ private:
LogFileTuple max_fetch_info_;
ObArchiveLogFetchTask *wait_send_task_array_[MAX_FETCH_TASK_NUM];
int64_t wait_send_task_count_;
int64_t seq_no_;
ObArchiveTaskStatus *send_task_queue_;
ObArchiveAllocator *allocator_;
@ -203,7 +202,7 @@ private:
private:
bool is_task_stale_(const ArchiveWorkStation &station) const;
void update_unlock_(const StartArchiveHelper &helper, ObArchiveAllocator *allocator);
int update_unlock_(const StartArchiveHelper &helper, ObArchiveAllocator *allocator);
private:
ObLSID id_;

View File

@ -138,7 +138,7 @@ void ObLogRestoreService::run1()
lib::set_thread_name("LogRessvr");
ObCurTraceId::init(GCONF.self_addr_);
const int64_t THREAD_RUN_INTERVAL = 5 * 1000 * 1000L;
const int64_t THREAD_RUN_INTERVAL = 1 * 1000 * 1000L;
if (OB_UNLIKELY(! inited_)) {
LOG_ERROR_RET(OB_NOT_INIT, "ObLogRestoreService not init", "tenant_id", MTL_ID());
} else {

View File

@ -451,6 +451,7 @@ class ObString;
ACT(BEFORE_RESTORE_HANDLE_FETCH_LOG_TASK,)\
ACT(BEFORE_DATA_TABLETS_MIGRATION_TASK,)\
ACT(AFTER_LS_GC_DELETE_ALL_TABLETS,)\
ACT(BEFORE_ARCHIVE_ADD_LS_TASK,)\
ACT(MAX_DEBUG_SYNC_POINT,)
DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF);