Optimize get max archive log info

This commit is contained in:
obdev
2023-02-27 07:11:22 +00:00
committed by ob-robot
parent 640e418beb
commit 45a7245eb4
3 changed files with 142 additions and 61 deletions

View File

@ -16,6 +16,7 @@
#include "lib/ob_errno.h"
#include "lib/string/ob_string.h" // ObString
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/utility.h"
#include "logservice/archiveservice/ob_archive_define.h"
#include "logservice/archiveservice/ob_archive_file_utils.h" // ObArchiveFileUtils
#include "logservice/archiveservice/ob_archive_util.h"
@ -279,13 +280,15 @@ int ObLogArchivePieceContext::update_file_info(const int64_t dest_id,
return ret;
}
//TODO 可以依赖当前piece context优化获取速度
int ObLogArchivePieceContext::get_max_archive_log(palf::LSN &lsn, SCN &scn)
{
int ret = OB_SUCCESS;
if (OB_FAIL(get_round_range_())) {
ObLogArchivePieceContext orign_context;
if (OB_FAIL(deep_copy_to(orign_context))) {
CLOG_LOG(WARN, "piece context deep copy failed", KPC(this));
} else if (OB_FAIL(get_round_range_())) {
CLOG_LOG(WARN, "get round range failed", K(ret), KPC(this));
} else if (OB_FAIL(get_max_archive_log_(lsn, scn))) {
} else if (OB_FAIL(get_max_archive_log_(orign_context, lsn, scn))) {
CLOG_LOG(WARN, "get max archive log failed", K(ret), KPC(this));
} else {
CLOG_LOG(INFO, "get max archive log succ", K(ret), K(lsn), K(scn), KPC(this));
@ -310,7 +313,7 @@ int ObLogArchivePieceContext::get_ls_meta_data(
const SCN &timestamp,
char *buf,
const int64_t buf_size,
int64_t &real_size,
int64_t &read_size,
const bool fuzzy_match)
{
int ret = OB_SUCCESS;
@ -320,7 +323,7 @@ int ObLogArchivePieceContext::get_ls_meta_data(
|| !timestamp.is_valid())) {
ret = OB_INVALID_ARGUMENT;
} else {
ret = get_ls_meta_data_(meta_type, timestamp, fuzzy_match, buf, buf_size, real_size);
ret = get_ls_meta_data_(meta_type, timestamp, fuzzy_match, buf, buf_size, read_size);
}
return ret;
}
@ -1062,6 +1065,7 @@ int ObLogArchivePieceContext::get_min_lsn_in_piece_()
int ret = OB_SUCCESS;
char *buf = NULL;
const int64_t buf_size = archive::ARCHIVE_FILE_HEADER_SIZE;
const int64_t file_offset = 0;
int64_t read_size = 0;
palf::LSN base_lsn;
if (inner_piece_context_.is_empty_()
@ -1073,8 +1077,10 @@ int ObLogArchivePieceContext::get_min_lsn_in_piece_()
CLOG_LOG(WARN, "alloc memory failed", K(ret));
} else if (OB_FAIL(read_part_file_(inner_piece_context_.round_id_,
inner_piece_context_.piece_id_, inner_piece_context_.min_file_id_,
buf, buf_size, read_size, base_lsn))) {
file_offset, buf, buf_size, read_size))) {
CLOG_LOG(WARN, "read part file failed", K(ret));
} else if (OB_FAIL(extract_file_base_lsn_(buf, read_size, base_lsn))) {
CLOG_LOG(WARN, "extract base_lsn failed", KPC(this));
} else {
inner_piece_context_.min_lsn_in_piece_ = base_lsn;
CLOG_LOG(INFO, "get min lsn in piece succ", K(ret), K(base_lsn), KPC(this));
@ -1180,13 +1186,13 @@ int ObLogArchivePieceContext::get_(const palf::LSN &lsn,
return ret;
}
int ObLogArchivePieceContext::get_max_archive_log_(palf::LSN &lsn, SCN &scn)
int ObLogArchivePieceContext::get_max_archive_log_(const ObLogArchivePieceContext &origin, palf::LSN &lsn, SCN &scn)
{
int ret = OB_SUCCESS;
bool done = false;
int64_t round_id = max_round_id_;
while (!done && OB_SUCC(ret) && round_id >= min_round_id_) {
ret = get_max_log_in_round_(round_id, lsn, scn, done);
ret = get_max_log_in_round_(origin, round_id, lsn, scn, done);
round_id--;
}
if (OB_SUCC(ret) && !done) {
@ -1196,12 +1202,17 @@ int ObLogArchivePieceContext::get_max_archive_log_(palf::LSN &lsn, SCN &scn)
return ret;
}
int ObLogArchivePieceContext::get_max_log_in_round_(const int64_t round_id, palf::LSN &lsn, SCN &scn, bool &exist)
int ObLogArchivePieceContext::get_max_log_in_round_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
palf::LSN &lsn,
SCN &scn,
bool &exist)
{
int ret = OB_SUCCESS;
round_context_.reset();
inner_piece_context_.reset();
round_context_.round_id_ = round_id;
common::ObTimeGuard guard("get_max_log_in_piece_", 1000 * 1000L);
if (OB_FAIL(load_round_info_())) {
ARCHIVE_LOG(WARN, "load round info failed", K(ret), K_(id), K(round_id));
} else if (! round_context_.is_valid()) {
@ -1212,22 +1223,29 @@ int ObLogArchivePieceContext::get_max_log_in_round_(const int64_t round_id, palf
} else if (round_context_.is_in_active_state() && round_context_.max_piece_id_ == 0) {
ARCHIVE_LOG(INFO, "no piece exist, just skip", K(ret), K_(id), K_(round_context));
} else {
guard.click("load_round_info");
int64_t piece_id = round_context_.max_piece_id_;
while (!exist && OB_SUCC(ret) && piece_id >= round_context_.min_piece_id_) {
ret = get_max_log_in_piece_(round_id, piece_id, lsn, scn, exist);
ret = get_max_log_in_piece_(origin, round_id, piece_id, lsn, scn, exist);
piece_id--;
}
guard.click("get_max_log_in_pieces");
}
return ret;
}
int ObLogArchivePieceContext::get_max_log_in_piece_(const int64_t round_id,
const int64_t piece_id, palf::LSN &lsn, SCN &scn, bool &exist)
int ObLogArchivePieceContext::get_max_log_in_piece_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
const int64_t piece_id,
palf::LSN &lsn,
SCN &scn,
bool &exist)
{
int ret = OB_SUCCESS;
inner_piece_context_.reset();
inner_piece_context_.round_id_ = round_id;
inner_piece_context_.piece_id_ = piece_id;
common::ObTimeGuard guard("get_max_log_in_piece_", 1000 * 1000L);
if (OB_FAIL(get_piece_meta_info_(piece_id))) {
ARCHIVE_LOG(WARN, "get piece meta info failed", K(ret), K_(id), K_(round_context), K(piece_id));
} else if (OB_FAIL(get_piece_file_range_())) {
@ -1235,12 +1253,14 @@ int ObLogArchivePieceContext::get_max_log_in_piece_(const int64_t round_id,
} else if (inner_piece_context_.is_empty_() || inner_piece_context_.max_file_id_ == 0) {
ARCHIVE_LOG(INFO, "no file exist in piece, just skip", K(ret), K_(id), K_(round_context), K_(inner_piece_context));
} else {
ret = get_max_log_in_file_(round_id, piece_id, inner_piece_context_.max_file_id_, lsn, scn, exist);
guard.click("get_piece_meta_info");
ret = get_max_log_in_file_(origin, round_id, piece_id, inner_piece_context_.max_file_id_, lsn, scn, exist);
}
return ret;
}
int ObLogArchivePieceContext::get_max_log_in_file_(const int64_t round_id,
int ObLogArchivePieceContext::get_max_log_in_file_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
const int64_t piece_id,
const int64_t file_id,
palf::LSN &lsn,
@ -1252,43 +1272,75 @@ int ObLogArchivePieceContext::get_max_log_in_file_(const int64_t round_id,
const int64_t buf_size = archive::ARCHIVE_FILE_DATA_BUF_SIZE;
const int64_t header_size = archive::ARCHIVE_FILE_HEADER_SIZE;
int64_t read_size = 0;
palf::LSN base_lsn;
palf::MemoryStorage mem_storage;
palf::MemPalfGroupBufferIterator iter;
// if get max log context match origin piece context
// only need to read data not restored
const bool context_match = (dest_id_ == origin.dest_id_
&& round_id == origin.round_context_.round_id_
&& round_id == origin.inner_piece_context_.round_id_
&& piece_id == origin.inner_piece_context_.piece_id_
&& file_id == origin.inner_piece_context_.file_id_
&& origin.inner_piece_context_.file_offset_ > 0);
const int64_t file_offset = context_match ? origin.inner_piece_context_.file_offset_ : 0;
// if context match, use max_lsn in origin piece context
palf::LSN base_lsn = context_match ? origin.inner_piece_context_.max_lsn_ : palf::LSN(palf::LOG_INVALID_LSN_VAL);
common::ObTimeGuard guard("get_max_log_in_file", 1000 * 1000L);
if (OB_ISNULL(buf = (char *)mtl_malloc(buf_size, "ArcFile"))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "alloc memory failed", K(ret), K_(id));
} else if (OB_FAIL(read_part_file_(round_id, piece_id, file_id,
buf, buf_size, read_size, base_lsn))) {
file_offset, buf, buf_size, read_size))) {
CLOG_LOG(WARN, "read part file failed", K(ret), K_(id));
} else if (OB_FAIL(mem_storage.init(base_lsn))) {
CLOG_LOG(WARN, "MemoryStorage init failed", K(ret), K(base_lsn), KPC(this));
} else if (OB_FAIL(mem_storage.append(buf + header_size, read_size - header_size))) {
CLOG_LOG(WARN, "MemoryStorage append failed", K(ret));
} else if (OB_FAIL(iter.init(base_lsn, [](){ return palf::LSN(palf::LOG_MAX_LSN_VAL); }, &mem_storage))) {
CLOG_LOG(WARN, "iter init failed", K(ret));
} else if (0 == read_size && context_match) {
lsn = base_lsn;
exist = true;
CLOG_LOG(INFO, "origin piece context match and no more archive log exists to restore",
K(context_match), K(base_lsn), K(origin), KPC(this));
} else if (!context_match && OB_FAIL(extract_file_base_lsn_(buf, buf_size, base_lsn))) {
// if contex not match, extract base_lsn in the archive file
CLOG_LOG(WARN, "extract base_lsn failed", KPC(this));
} else {
palf::LogGroupEntry entry;
while (OB_SUCC(ret)) {
if (OB_FAIL(iter.next())) {
if (OB_ITER_END != ret) {
CLOG_LOG(WARN, "iter next failed", K(ret), KPC(this), K(iter));
guard.click("read_data");
const char *log_buf = context_match ? buf : buf + header_size;
const int64_t log_buf_size = context_match ? read_size : read_size - header_size;
if (OB_FAIL(mem_storage.init(base_lsn))) {
CLOG_LOG(WARN, "MemoryStorage init failed", K(ret), K(base_lsn), KPC(this));
} else if (OB_FAIL(mem_storage.append(log_buf, log_buf_size))) {
CLOG_LOG(WARN, "MemoryStorage append failed", K(log_buf), K(log_buf_size),
K(context_match), K(file_id), K(file_offset), K(id_));
} else if (OB_FAIL(iter.init(base_lsn, [](){ return palf::LSN(palf::LOG_MAX_LSN_VAL); }, &mem_storage))) {
CLOG_LOG(WARN, "iter init failed", K(id_), K(base_lsn), K(log_buf), K(log_buf_size));
} else {
palf::LogGroupEntry entry;
while (OB_SUCC(ret)) {
if (OB_FAIL(iter.next())) {
if (OB_ITER_END != ret) {
CLOG_LOG(WARN, "iter next failed", K(ret), KPC(this), K(iter));
}
} else if (OB_FAIL(iter.get_entry(entry, lsn))) {
CLOG_LOG(WARN, "get entry failed", K(ret));
} else if (! entry.check_integrity()) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "invalid data", K(ret), KPC(this), K(iter), K(entry));
} else {
lsn = lsn + entry.get_serialize_size();
scn = entry.get_scn();
exist = true;
}
} else if (OB_FAIL(iter.get_entry(entry, lsn))) {
CLOG_LOG(WARN, "get entry failed", K(ret));
} else if (! entry.check_integrity()) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "invalid data", K(ret), KPC(this), K(iter), K(entry));
} else {
scn = entry.get_scn();
exist = true;
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
if (exist) {
CLOG_LOG(INFO, "get max archive log through log iteration from archive log", K(context_match),
K(lsn), K(scn), K(base_lsn), K(origin), KPC(this));
}
}
guard.click("iterate_log");
}
}
if (NULL != buf) {
mtl_free(buf);
buf = NULL;
@ -1347,6 +1399,7 @@ int ObLogArchivePieceContext::seek_in_file_(const int64_t file_id, const SCN &sc
char *buf = NULL;
const int64_t buf_size = archive::ARCHIVE_FILE_DATA_BUF_SIZE;
const int64_t header_size = archive::ARCHIVE_FILE_HEADER_SIZE;
const int64_t file_offset = 0;
int64_t read_size = 0;
palf::LSN base_lsn;
palf::MemoryStorage mem_storage;
@ -1355,8 +1408,10 @@ int ObLogArchivePieceContext::seek_in_file_(const int64_t file_id, const SCN &sc
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "alloc memory failed", K(ret));
} else if (OB_FAIL(read_part_file_(inner_piece_context_.round_id_,
inner_piece_context_.piece_id_, file_id, buf, buf_size, read_size, base_lsn))) {
inner_piece_context_.piece_id_, file_id, file_offset, buf, buf_size, read_size))) {
CLOG_LOG(WARN, "read part file failed", K(ret), K(file_id), KPC(this));
} else if (OB_FAIL(extract_file_base_lsn_(buf, buf_size, base_lsn))) {
CLOG_LOG(WARN, "extract base_lsn failed", KPC(this));
} else if (OB_FAIL(mem_storage.init(base_lsn))) {
CLOG_LOG(WARN, "MemoryStorage init failed", K(ret), K_(id), K(base_lsn), KPC(this));
} else if (OB_FAIL(mem_storage.append(buf + header_size, read_size - header_size))) {
@ -1406,10 +1461,10 @@ int ObLogArchivePieceContext::seek_in_file_(const int64_t file_id, const SCN &sc
int ObLogArchivePieceContext::read_part_file_(const int64_t round_id,
const int64_t piece_id,
const int64_t file_id,
const int64_t file_offset,
char *buf,
const int64_t buf_size,
int64_t &read_size,
palf::LSN &base_lsn)
int64_t &read_size)
{
int ret = OB_SUCCESS;
share::ObBackupPath path;
@ -1419,16 +1474,27 @@ int ObLogArchivePieceContext::read_part_file_(const int64_t round_id,
round_id, piece_id, id_, file_id, path))) {
CLOG_LOG(WARN, "get ls archive file path failed", K(ret), KPC(this));
} else if (OB_FAIL(archive::ObArchiveFileUtils::range_read(path.get_ptr(),
archive_dest_.get_storage_info(), buf, buf_size, 0, read_size))) {
archive_dest_.get_storage_info(), buf, buf_size, file_offset, read_size))) {
CLOG_LOG(WARN, "range read failed", K(ret), K(path));
} else if (OB_UNLIKELY(read_size < archive::ARCHIVE_FILE_HEADER_SIZE)) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "invalid data", K(ret), K(read_size), K(buf_size), K(path));
}
return ret;
}
int ObLogArchivePieceContext::extract_file_base_lsn_(const char *buf,
const int64_t buf_size,
palf::LSN &base_lsn)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
archive::ObArchiveFileHeader header;
if (OB_ISNULL(buf) || OB_UNLIKELY(buf_size < archive::ARCHIVE_FILE_HEADER_SIZE)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid buffer", K(buf), K(buf_size), K(id_));
} else if (OB_FAIL(header.deserialize(buf, buf_size, pos))) {
CLOG_LOG(WARN, "archive file header deserialize failed", K(ret));
CLOG_LOG(WARN, "archive file header deserialize failed", K(buf), K(buf_size));
} else if (OB_UNLIKELY(! header.is_valid())) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "archive file header not valid", K(ret), K(header), K(path));
CLOG_LOG(WARN, "archive file header not valid", K(header), K(buf), K(buf_size));
} else {
base_lsn = palf::LSN(header.start_lsn_);
}
@ -1441,7 +1507,7 @@ int ObLogArchivePieceContext::get_ls_meta_data_(
const bool fuzzy_match,
char *buf,
const int64_t buf_size,
int64_t &real_size)
int64_t &read_size)
{
int ret = OB_SUCCESS;
int64_t piece_id = 0;
@ -1455,7 +1521,7 @@ int ObLogArchivePieceContext::get_ls_meta_data_(
} else if (FALSE_IT(piece_id = cal_piece_id_(timestamp))) {
} else {
piece_id = min(round_context_.max_piece_id_, piece_id);
ret = get_ls_meta_in_piece_(meta_type, timestamp, fuzzy_match, piece_id, buf, buf_size, real_size);
ret = get_ls_meta_in_piece_(meta_type, timestamp, fuzzy_match, piece_id, buf, buf_size, read_size);
}
return ret;
}
@ -1467,7 +1533,7 @@ int ObLogArchivePieceContext::get_ls_meta_in_piece_(
const int64_t base_piece_id,
char *buf,
const int64_t buf_size,
int64_t &real_size)
int64_t &read_size)
{
int ret = OB_SUCCESS;
bool done = false;
@ -1508,7 +1574,7 @@ int ObLogArchivePieceContext::get_ls_meta_in_piece_(
round_context_.round_id_, piece_id, id_, meta_type, file_id, path))) {
CLOG_LOG(WARN, "ger ls meta record prefix failed", K(ret), KPC(this));
} else if (OB_FAIL(archive::ObArchiveFileUtils::read_file(path.get_obstr(),
archive_dest_.get_storage_info(), buf, buf_size, real_size))) {
archive_dest_.get_storage_info(), buf, buf_size, read_size))) {
CLOG_LOG(WARN, "read_file failed", K(ret), K(path), KPC(this));
}
}

View File

@ -279,20 +279,23 @@ private:
bool &done,
bool &to_newest);
int get_max_archive_log_(palf::LSN &lsn, share::SCN &scn);
int get_max_archive_log_(const ObLogArchivePieceContext &origin, palf::LSN &lsn, share::SCN &scn);
int get_max_log_in_round_(const int64_t round_id,
int get_max_log_in_round_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
palf::LSN &lsn,
share::SCN &scn,
bool &exist);
int get_max_log_in_piece_(const int64_t round_id,
int get_max_log_in_piece_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
const int64_t piece_id,
palf::LSN &lsn,
share::SCN &scn,
bool &exist);
int get_max_log_in_file_(const int64_t round_id,
int get_max_log_in_file_(const ObLogArchivePieceContext &origin,
const int64_t round_id,
const int64_t piece_id,
const int64_t file_id,
palf::LSN &lsn,
@ -306,11 +309,14 @@ private:
int read_part_file_(const int64_t round_id,
const int64_t piece_id,
const int64_t file_id,
const int64_t file_offset,
char *buf,
const int64_t buf_size,
int64_t &read_size,
palf::LSN &base_lsn);
int64_t &read_size);
int extract_file_base_lsn_(const char *buf,
const int64_t buf_size,
palf::LSN &base_lsn);
int get_ls_meta_data_(const share::ObArchiveLSMetaType &meta_type,
const share::SCN &timestamp,
const bool fuzzy_match,

View File

@ -568,6 +568,7 @@ int ObLogRestoreHandler::check_restore_to_newest(share::SCN &end_scn, share::SCN
end_scn = SCN::min_scn();
archive_scn = SCN::min_scn();
SCN restore_scn = SCN::min_scn();
CLOG_LOG(INFO, "start check restore to newest", K(id_));
{
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -591,11 +592,19 @@ int ObLogRestoreHandler::check_restore_to_newest(share::SCN &end_scn, share::SCN
if (OB_SUCC(ret) && NULL != piece_context) {
palf::LSN archive_lsn;
if (OB_FAIL(palf_handle_.get_end_scn(end_scn))) {
CLOG_LOG(WARN, "get end scn failed", K(ret), K(id_));
palf::LSN end_lsn;
if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get end lsn failed", K(id_));
} else if (OB_FAIL(palf_handle_.get_end_scn(end_scn))) {
CLOG_LOG(WARN, "get end scn failed", K(id_));
} else if (OB_FAIL(piece_context->get_max_archive_log(archive_lsn, archive_scn))) {
CLOG_LOG(WARN, "get max archive log failed", K(ret), K(id_));
CLOG_LOG(WARN, "get max archive log failed", K(id_));
} else {
if (archive_lsn == end_lsn && archive_scn == SCN::min_scn()) {
archive_scn = end_scn;
CLOG_LOG(INFO, "rewrite archive_scn while end_lsn equals to archive_lsn and archive_scn not got",
K(id_), K(archive_lsn), K(archive_scn), K(end_lsn), K(end_scn));
}
CLOG_LOG(INFO, "check_restore_to_newest succ", K(id_), K(archive_scn), K(end_scn));
}
}