Fix archive log consumption with larger piece
This commit is contained in:
@ -111,6 +111,7 @@ int FakeArchivePieceContext::init(const share::ObLSID &id, FakeRounds *rounds)
|
||||
} else {
|
||||
id_ = id;
|
||||
rounds_ = rounds;
|
||||
archive_dest_.set("file:///data/1/");
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
@ -146,6 +147,7 @@ int FakeArchivePieceContext::get_round_(const share::SCN &start_scn)
|
||||
int FakeArchivePieceContext::get_round_range_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
dest_id_ = 1;
|
||||
min_round_id_ = rounds_->array_.at(0).round_id_;
|
||||
max_round_id_ = rounds_->array_.at(rounds_->array_.count() - 1).round_id_;
|
||||
return ret;
|
||||
|
||||
@ -11,6 +11,7 @@
|
||||
*/
|
||||
|
||||
#include "lib/ob_errno.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include "share/ob_errno.h"
|
||||
#include "share/ob_ls_id.h"
|
||||
#include <cstdint>
|
||||
@ -218,7 +219,7 @@ TEST(FakeArchivePieceContext, get_piece)
|
||||
lsn = LSN(10 * 64 * ONE_MB);
|
||||
CLOG_LOG(INFO, "print get piece 8", K(lsn));
|
||||
ret = archive_context.get_piece(scn, lsn, dest_id, round_id, piece_id, cur_file_id, offset, max_lsn, to_newest);
|
||||
EXPECT_EQ(OB_ITER_END, ret);
|
||||
EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, ret);
|
||||
|
||||
archive_context.reset_locate_info();
|
||||
log_ts = 1010;
|
||||
|
||||
@ -175,6 +175,7 @@ int ObArchiveFetcher::set_archive_info(
|
||||
const ObCompressorType type,
|
||||
const bool need_encrypt)
|
||||
{
|
||||
UNUSED(unit_size);
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(interval_us <= 0 || !genesis_scn.is_valid() || base_piece_id < 1 || unit_size <= 0)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
@ -186,7 +187,6 @@ int ObArchiveFetcher::set_archive_info(
|
||||
UNUSED(need_encrypt);
|
||||
genesis_scn_ = genesis_scn;
|
||||
base_piece_id_ = base_piece_id;
|
||||
unit_size_ = unit_size;
|
||||
unit_size_ = 1;
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -208,6 +208,19 @@ int ObLogArchivePieceContext::init(const share::ObLSID &id,
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObLogArchivePieceContext::is_valid() const
|
||||
{
|
||||
return is_inited_
|
||||
&& locate_round_
|
||||
&& id_.is_valid()
|
||||
&& dest_id_ > 0
|
||||
&& min_round_id_ > 0
|
||||
&& max_round_id_ >= min_round_id_
|
||||
&& round_context_.is_valid()
|
||||
&& inner_piece_context_.is_valid()
|
||||
&& archive_dest_.is_valid();
|
||||
}
|
||||
|
||||
int ObLogArchivePieceContext::get_piece(const SCN &pre_scn,
|
||||
const palf::LSN &start_lsn,
|
||||
int64_t &dest_id,
|
||||
@ -219,6 +232,11 @@ int ObLogArchivePieceContext::get_piece(const SCN &pre_scn,
|
||||
bool &to_newest)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// if piece context not valid, reset it
|
||||
if (! is_valid()) {
|
||||
reset_locate_info();
|
||||
}
|
||||
|
||||
file_id = cal_archive_file_id_(start_lsn);
|
||||
if (OB_UNLIKELY(! pre_scn.is_valid() || ! start_lsn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
@ -485,7 +503,7 @@ int ObLogArchivePieceContext::switch_round_if_need_(const SCN &scn, const palf::
|
||||
int ret = OB_SUCCESS;
|
||||
RoundOp op = RoundOp::NONE;
|
||||
RoundContext pre_round = round_context_;
|
||||
check_if_switch_round_(lsn, op);
|
||||
check_if_switch_round_(scn, lsn, op);
|
||||
switch (op) {
|
||||
case RoundOp::NONE:
|
||||
break;
|
||||
@ -512,7 +530,7 @@ int ObLogArchivePieceContext::switch_round_if_need_(const SCN &scn, const palf::
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObLogArchivePieceContext::check_if_switch_round_(const palf::LSN &lsn, RoundOp &op)
|
||||
void ObLogArchivePieceContext::check_if_switch_round_(const share::SCN &scn, const palf::LSN &lsn, RoundOp &op)
|
||||
{
|
||||
op = RoundOp::NONE;
|
||||
if (min_round_id_ == 0 || max_round_id_ == 0
|
||||
@ -525,7 +543,7 @@ void ObLogArchivePieceContext::check_if_switch_round_(const palf::LSN &lsn, Roun
|
||||
op = RoundOp::BACKWARD;
|
||||
} else if (need_forward_round_(lsn)/*确定当前round日志全部小于需要消费日志, 并且当前round小于最大round id*/) {
|
||||
op = RoundOp::FORWARD;
|
||||
} else if (need_load_round_info_(lsn)/*当前round能访问到的最大piece已经STOP, 并且当前round还是ACTIVE的*/) {
|
||||
} else if (need_load_round_info_(scn, lsn)/*当前round能访问到的最大piece已经STOP, 并且当前round还是ACTIVE的*/) {
|
||||
op = RoundOp::LOAD;
|
||||
}
|
||||
|
||||
@ -607,7 +625,7 @@ bool ObLogArchivePieceContext::need_forward_round_(const palf::LSN &lsn) const
|
||||
// 前提: 当前round_context状态非STOP
|
||||
// 1. 当前round_context信息是无效的
|
||||
// 2. 当前round最大piece不包含需要消费的日志
|
||||
bool ObLogArchivePieceContext::need_load_round_info_(const palf::LSN &lsn) const
|
||||
bool ObLogArchivePieceContext::need_load_round_info_(const share::SCN &scn, const palf::LSN &lsn) const
|
||||
{
|
||||
bool bret = false;
|
||||
if (round_context_.is_in_stop_state()) {
|
||||
@ -620,6 +638,8 @@ bool ObLogArchivePieceContext::need_load_round_info_(const palf::LSN &lsn) const
|
||||
&& inner_piece_context_.is_fronze_()
|
||||
&& lsn >= inner_piece_context_.max_lsn_in_piece_) {
|
||||
bret = true;
|
||||
} else {
|
||||
bret = cal_piece_id_(scn) > round_context_.max_piece_id_;
|
||||
}
|
||||
return bret;
|
||||
}
|
||||
|
||||
@ -52,6 +52,8 @@ public:
|
||||
|
||||
void reset();
|
||||
|
||||
bool is_valid() const;
|
||||
|
||||
int get_piece(const share::SCN &pre_scn,
|
||||
const palf::LSN &start_lsn,
|
||||
int64_t &dest_id,
|
||||
@ -230,11 +232,11 @@ private:
|
||||
// 如果round不满足数据需求, 支持切round
|
||||
int switch_round_if_need_(const share::SCN &scn, const palf::LSN &lsn);
|
||||
|
||||
void check_if_switch_round_(const palf::LSN &lsn, RoundOp &op);
|
||||
void check_if_switch_round_(const share::SCN &scn, const palf::LSN &lsn, RoundOp &op);
|
||||
bool is_max_round_done_(const palf::LSN &lsn) const;
|
||||
bool need_backward_round_(const palf::LSN &lsn) const;
|
||||
bool need_forward_round_(const palf::LSN &lsn) const;
|
||||
bool need_load_round_info_(const palf::LSN &lsn) const;
|
||||
bool need_load_round_info_(const share::SCN &scn, const palf::LSN &lsn) const;
|
||||
|
||||
// 获取指定round元信息
|
||||
virtual int load_round_(const int64_t round_id, RoundContext &round_context, bool &exist);
|
||||
|
||||
@ -399,6 +399,7 @@ RawPathDataGenerator::RawPathDataGenerator(const uint64_t tenant_id,
|
||||
RemoteDataGenerator(tenant_id, id, start_lsn, end_lsn, end_scn),
|
||||
array_(array),
|
||||
data_len_(0),
|
||||
file_id_(0),
|
||||
base_lsn_(),
|
||||
index_(piece_index),
|
||||
min_file_id_(min_file_id),
|
||||
|
||||
@ -222,6 +222,8 @@ int ObRemoteLocationParent::update_locate_info(ObRemoteLogParent &source)
|
||||
} else if (dst.root_path_ != root_path_) {
|
||||
// parent changed
|
||||
CLOG_LOG(WARN, "parent changed, just skip", K(dst), KPC(this));
|
||||
} else if (OB_UNLIKELY(! dst.piece_context_.is_valid())) {
|
||||
CLOG_LOG(TRACE, "piece_context not valid, just skip", K(dst));
|
||||
} else if (OB_FAIL(dst.piece_context_.deep_copy_to(piece_context_))) {
|
||||
CLOG_LOG(WARN, "deep copy to piece context failed", K(ret));
|
||||
piece_context_.reset_locate_info();
|
||||
|
||||
Reference in New Issue
Block a user