[4.0] replace log_ts with SCN(src/storage/ls) 2
This commit is contained in:
parent
11485d6a98
commit
fd4a35f42a
@ -1244,11 +1244,11 @@ int ObLogHandler::disable_replay()
|
||||
int ObLogHandler::get_max_decided_log_ts_ns(int64_t &log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN log_scn;
|
||||
if (OB_FAIL(get_max_decided_log_scn(log_scn))) {
|
||||
SCN scn;
|
||||
if (OB_FAIL(get_max_decided_scn(scn))) {
|
||||
CLOG_LOG(WARN, "failed to get_max_decided_log_ts_ns", K(ret));
|
||||
} else {
|
||||
log_ts = log_scn.get_val_for_lsn_allocator();
|
||||
log_ts = scn.get_val_for_lsn_allocator();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1301,11 +1301,11 @@ bool ObLogHandler::is_replay_enabled() const
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
int ObLogHandler::get_max_decided_log_scn(SCN &log_scn)
|
||||
int ObLogHandler::get_max_decided_scn(SCN &scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
SCN min_unreplay_log_scn;
|
||||
SCN min_unapply_log_scn;
|
||||
SCN min_unreplay_scn;
|
||||
SCN min_unapply_scn;
|
||||
share::ObLSID id;
|
||||
RLockGuard guard(lock_);
|
||||
if (IS_NOT_INIT) {
|
||||
@ -1314,36 +1314,36 @@ int ObLogHandler::get_max_decided_log_scn(SCN &log_scn)
|
||||
//和replay service统一返回4109
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
} else if (FALSE_IT(id = id_)) {
|
||||
} else if (OB_FAIL(apply_service_->get_min_unapplied_log_scn(id, min_unapply_log_scn))) {
|
||||
} else if (OB_FAIL(apply_service_->get_min_unapplied_log_scn(id, min_unapply_scn))) {
|
||||
CLOG_LOG(WARN, "failed to get_min_unapplied_log_scn", K(ret), K(id));
|
||||
} else if (OB_FAIL(replay_service_->get_min_unreplayed_log_scn(id, min_unreplay_log_scn))) {
|
||||
} else if (OB_FAIL(replay_service_->get_min_unreplayed_log_scn(id, min_unreplay_scn))) {
|
||||
if (OB_STATE_NOT_MATCH != ret) {
|
||||
CLOG_LOG(WARN, "failed to get_min_unreplayed_log_scn", K(ret), K(id));
|
||||
} else if (palf_reach_time_interval(1000 * 1000, get_max_decided_log_ts_ns_debug_time_)) {
|
||||
CLOG_LOG(WARN, "failed to get_min_unreplayed_log_ts_ns, replay status is not enabled", K(ret), K(id));
|
||||
}
|
||||
if (OB_STATE_NOT_MATCH == ret && min_unapply_log_scn.is_valid()) {
|
||||
if (OB_STATE_NOT_MATCH == ret && min_unapply_scn.is_valid()) {
|
||||
//回放尚未enable,但是apply service中拿到的最大连续回调位点合法
|
||||
ret = OB_SUCCESS;
|
||||
if (min_unapply_log_scn > SCN::base_scn()) {
|
||||
if (min_unapply_scn > SCN::base_scn()) {
|
||||
//TODO(scn):yaoying.yyy
|
||||
log_scn.convert_for_gts(min_unapply_log_scn.get_val_for_lsn_allocator() - 1);
|
||||
scn.convert_for_gts(min_unapply_scn.get_val_for_lsn_allocator() - 1);
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
log_scn.set_min();
|
||||
scn.set_min();
|
||||
}
|
||||
CLOG_LOG(INFO, "replay is not enabled, get_max_decided_log_scn from apply", K(ret), K(id),
|
||||
K(min_unreplay_log_scn), K(min_unapply_log_scn), K(log_scn));
|
||||
CLOG_LOG(INFO, "replay is not enabled, get_max_decided_scn from apply", K(ret), K(id),
|
||||
K(min_unreplay_scn), K(min_unapply_scn), K(scn));
|
||||
}
|
||||
} else {
|
||||
SCN tmp_scn = SCN::max(min_unreplay_log_scn, min_unapply_log_scn);
|
||||
SCN tmp_scn = SCN::max(min_unreplay_scn, min_unapply_scn);
|
||||
if (tmp_scn > SCN::base_scn()) {
|
||||
//TODO(scn):yaoying.yyy
|
||||
log_scn.convert_for_gts(tmp_scn.get_val_for_lsn_allocator() - 1);
|
||||
scn.convert_for_gts(tmp_scn.get_val_for_lsn_allocator() - 1);
|
||||
} else {
|
||||
log_scn.set_min();
|
||||
scn.set_min();
|
||||
}
|
||||
CLOG_LOG(TRACE, "get_max_decided_log_scn", K(ret), K(id), K(min_unreplay_log_scn), K(min_unapply_log_scn), K(log_scn));
|
||||
CLOG_LOG(TRACE, "get_max_decided_scn", K(ret), K(id), K(min_unreplay_scn), K(min_unapply_scn), K(scn));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ public:
|
||||
virtual int enable_replay(const palf::LSN &initial_lsn, const palf::SCN &initial_log_scn) = 0;
|
||||
virtual int enable_replay(const palf::LSN &initial_lsn, const int64_t &initial_log_ts) = 0;
|
||||
virtual int disable_replay() = 0;
|
||||
virtual int get_max_decided_log_scn(palf::SCN &scn) = 0;
|
||||
virtual int get_max_decided_scn(palf::SCN &scn) = 0;
|
||||
virtual int get_max_decided_log_ts_ns(int64_t &log_ts) = 0;
|
||||
virtual int pend_submit_replay_log() = 0;
|
||||
virtual int restore_submit_replay_log() = 0;
|
||||
@ -531,7 +531,7 @@ public:
|
||||
bool is_replay_enabled() const override final;
|
||||
// @brief, get max decided log ts considering both apply and replay.
|
||||
// @param[out] int64_t&, max decided log ts ns.
|
||||
int get_max_decided_log_scn(palf::SCN &scn) override final;
|
||||
int get_max_decided_scn(palf::SCN &scn) override final;
|
||||
int get_max_decided_log_ts_ns(int64_t &log_ts) override final;
|
||||
// @brief: store a persistent flag which means this paxos replica
|
||||
// can not reply ack when receiving logs.
|
||||
|
@ -242,12 +242,12 @@ int ObAllVirtualMinorFreezeInfo::generate_memtables_info()
|
||||
// tablet_id
|
||||
strcat(memtables_info_string_, "tablet_id:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].tablet_id_.id()));
|
||||
// start_log_scn
|
||||
strcat(memtables_info_string_, ", start_log_scn:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].start_log_scn_));
|
||||
// end_log_scn
|
||||
strcat(memtables_info_string_, ", end_log_scn:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].end_log_scn_));
|
||||
// start_scn
|
||||
strcat(memtables_info_string_, ", start_scn:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].start_scn_));
|
||||
// end_scn
|
||||
strcat(memtables_info_string_, ", end_scn:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].end_scn_));
|
||||
// write_ref_cnt
|
||||
strcat(memtables_info_string_, ", write_ref_cnt:");
|
||||
strcat(memtables_info_string_, to_cstring(memtables_info_[i].write_ref_cnt_));
|
||||
|
@ -798,7 +798,7 @@ int ObStartPrepareMigrationTask::deal_with_local_ls_()
|
||||
} else if (OB_FAIL(ls->get_saved_info(saved_info))) {
|
||||
LOG_WARN("failed to get saved info", K(ret), KPC(ls));
|
||||
} else if (!saved_info.is_empty()) {
|
||||
ctx_->log_sync_scn_ = saved_info.clog_checkpoint_ts_;
|
||||
ctx_->log_sync_scn_ = saved_info.clog_checkpoint_scn_.get_val_for_lsn_allocator();
|
||||
} else if (OB_FAIL(ls->get_end_ts_ns(ctx_->log_sync_scn_))) {
|
||||
LOG_WARN("failed to get end ts ns", K(ret), KPC(ctx_));
|
||||
}
|
||||
|
@ -31,8 +31,8 @@ namespace storage
|
||||
{
|
||||
ObFrozenMemtableInfo::ObFrozenMemtableInfo()
|
||||
: tablet_id_(),
|
||||
start_log_scn_(share::ObScnRange::MIN_SCN),
|
||||
end_log_scn_(share::ObScnRange::MIN_SCN),
|
||||
start_scn_(share::ObScnRange::MIN_SCN),
|
||||
end_scn_(share::ObScnRange::MIN_SCN),
|
||||
write_ref_cnt_(0),
|
||||
unsubmitted_cnt_(0),
|
||||
unsynced_cnt_(0),
|
||||
@ -40,15 +40,15 @@ ObFrozenMemtableInfo::ObFrozenMemtableInfo()
|
||||
{}
|
||||
|
||||
ObFrozenMemtableInfo::ObFrozenMemtableInfo(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
const int64_t current_right_boundary)
|
||||
: tablet_id_(tablet_id),
|
||||
start_log_scn_(start_log_scn),
|
||||
end_log_scn_(end_log_scn),
|
||||
start_scn_(start_scn),
|
||||
end_scn_(end_scn),
|
||||
write_ref_cnt_(write_ref_cnt),
|
||||
unsubmitted_cnt_(unsubmitted_cnt),
|
||||
unsynced_cnt_(unsynced_cnt),
|
||||
@ -63,8 +63,8 @@ ObFrozenMemtableInfo::~ObFrozenMemtableInfo()
|
||||
void ObFrozenMemtableInfo::reset()
|
||||
{
|
||||
tablet_id_.reset();
|
||||
start_log_scn_ = share::ObScnRange::MIN_SCN;
|
||||
end_log_scn_ = share::ObScnRange::MIN_SCN;
|
||||
start_scn_ = share::ObScnRange::MIN_SCN;
|
||||
end_scn_ = share::ObScnRange::MIN_SCN;
|
||||
write_ref_cnt_ = 0;
|
||||
unsubmitted_cnt_ = 0;
|
||||
unsynced_cnt_ = 0;
|
||||
@ -72,16 +72,16 @@ void ObFrozenMemtableInfo::reset()
|
||||
}
|
||||
|
||||
void ObFrozenMemtableInfo::set(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
const int64_t current_right_boundary)
|
||||
{
|
||||
tablet_id_ = tablet_id;
|
||||
start_log_scn_ = start_log_scn;
|
||||
end_log_scn_ = end_log_scn;
|
||||
start_scn_ = start_scn;
|
||||
end_scn_ = end_scn;
|
||||
write_ref_cnt_ = write_ref_cnt;
|
||||
unsubmitted_cnt_ = unsubmitted_cnt;
|
||||
unsynced_cnt_ = unsynced_cnt;
|
||||
@ -90,7 +90,7 @@ void ObFrozenMemtableInfo::set(const ObTabletID &tablet_id,
|
||||
|
||||
bool ObFrozenMemtableInfo::is_valid()
|
||||
{
|
||||
return tablet_id_.is_valid() && start_log_scn_ > share::ObScnRange::MIN_SCN && end_log_scn_ > share::ObScnRange::MIN_SCN;
|
||||
return tablet_id_.is_valid() && start_scn_ > share::ObScnRange::MIN_SCN && end_scn_ > share::ObScnRange::MIN_SCN;
|
||||
}
|
||||
|
||||
ObFreezerStat::ObFreezerStat()
|
||||
@ -131,8 +131,8 @@ bool ObFreezerStat::is_valid()
|
||||
}
|
||||
|
||||
int ObFreezerStat::add_memtable_info(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
@ -143,8 +143,8 @@ int ObFreezerStat::add_memtable_info(const ObTabletID &tablet_id,
|
||||
ObSpinLockGuard guard(memtables_info_lock_);
|
||||
if (memtables_info_.count() < FROZEN_MEMTABLE_INFO_CNT) {
|
||||
ObFrozenMemtableInfo memtable_info(tablet_id,
|
||||
start_log_scn,
|
||||
end_log_scn,
|
||||
start_scn,
|
||||
end_scn,
|
||||
write_ref_cnt,
|
||||
unsubmitted_cnt,
|
||||
unsynced_cnt,
|
||||
@ -193,7 +193,7 @@ void ObFreezerStat::add_diagnose_info(const ObString &str)
|
||||
ObFreezer::ObFreezer()
|
||||
: freeze_flag_(0),
|
||||
freeze_snapshot_version_(),
|
||||
max_decided_log_scn_(),
|
||||
max_decided_scn_(),
|
||||
ls_wrs_handler_(nullptr),
|
||||
ls_tx_svr_(nullptr),
|
||||
ls_tablet_svr_(nullptr),
|
||||
@ -213,7 +213,7 @@ ObFreezer::ObFreezer(ObLSWRSHandler *ls_loop_worker,
|
||||
const share::ObLSID &ls_id)
|
||||
: freeze_flag_(0),
|
||||
freeze_snapshot_version_(),
|
||||
max_decided_log_scn_(),
|
||||
max_decided_scn_(),
|
||||
ls_wrs_handler_(ls_loop_worker),
|
||||
ls_tx_svr_(ls_tx_svr),
|
||||
ls_tablet_svr_(ls_tablet_svr),
|
||||
@ -240,7 +240,7 @@ void ObFreezer::set(ObLSWRSHandler *ls_loop_worker,
|
||||
{
|
||||
freeze_flag_ = freeze_flag;
|
||||
freeze_snapshot_version_.reset();
|
||||
max_decided_log_scn_.reset();
|
||||
max_decided_scn_.reset();
|
||||
ls_wrs_handler_ = ls_loop_worker;
|
||||
ls_tx_svr_ = ls_tx_svr;
|
||||
ls_tablet_svr_ = ls_tablet_svr;
|
||||
@ -255,7 +255,7 @@ void ObFreezer::reset()
|
||||
{
|
||||
freeze_flag_ = 0;
|
||||
freeze_snapshot_version_.reset();
|
||||
max_decided_log_scn_.reset();
|
||||
max_decided_scn_.reset();
|
||||
ls_wrs_handler_ = nullptr;
|
||||
ls_tx_svr_ = nullptr;
|
||||
data_checkpoint_ = nullptr;
|
||||
@ -296,7 +296,7 @@ int ObFreezer::logstream_freeze()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
palf::SCN freeze_snapshot_version;
|
||||
palf::SCN max_decided_log_scn;
|
||||
palf::SCN max_decided_scn;
|
||||
FLOG_INFO("[Freezer] logstream_freeze start", K(ret), K_(ls_id));
|
||||
stat_.reset();
|
||||
stat_.start_time_ = ObTimeUtility::current_time();
|
||||
@ -305,7 +305,7 @@ int ObFreezer::logstream_freeze()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
|
||||
} else if (OB_FAIL(decide_max_decided_log_scn(max_decided_log_scn))) {
|
||||
} else if (OB_FAIL(decide_max_decided_scn(max_decided_scn))) {
|
||||
TRANS_LOG(WARN, "[Freezer] decide max decided log ts failure", K(ret), K_(ls_id));
|
||||
} else if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) {
|
||||
TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id));
|
||||
@ -315,7 +315,7 @@ int ObFreezer::logstream_freeze()
|
||||
LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id), K(freeze_snapshot_version));
|
||||
} else if (OB_FAIL(set_freeze_flag())) {
|
||||
FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id));
|
||||
} else if (FALSE_IT(max_decided_log_scn_ = max_decided_log_scn)) {
|
||||
} else if (FALSE_IT(max_decided_scn_ = max_decided_scn)) {
|
||||
} else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) {
|
||||
} else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) {
|
||||
} else if (OB_FAIL(inner_logstream_freeze())) {
|
||||
@ -721,8 +721,8 @@ void ObFreezer::unset_freeze_()
|
||||
// Step1: unset freeze_snapshot_version to invalid value
|
||||
freeze_snapshot_version_.reset();
|
||||
|
||||
// Step2: unset max_decided_log_scn to invalid value
|
||||
max_decided_log_scn_.reset();
|
||||
// Step2: unset max_decided_scn to invalid value
|
||||
max_decided_scn_.reset();
|
||||
|
||||
// Step3: unset freeze_flag to flag the end of freeze
|
||||
// set the first bit 0
|
||||
@ -740,8 +740,8 @@ void ObFreezer::undo_freeze_()
|
||||
// Step1: unset freeze_snapshot_version to invalid value
|
||||
freeze_snapshot_version_.reset();
|
||||
|
||||
// Step2: unset max_decided_log_scn to invalid value
|
||||
max_decided_log_scn_.reset();
|
||||
// Step2: unset max_decided_scn to invalid value
|
||||
max_decided_scn_.reset();
|
||||
|
||||
// Step3: unset freeze_flag and dec freeze_clock
|
||||
// used when freeze fails
|
||||
@ -781,25 +781,25 @@ int ObFreezer::get_max_consequent_callbacked_log_ts(int64_t &max_decided_log_ts)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezer::decide_max_decided_log_scn(palf::SCN &max_decided_log_scn)
|
||||
int ObFreezer::decide_max_decided_scn(palf::SCN &max_decided_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
|
||||
} else if (OB_FAIL(loghandler_->get_max_decided_log_scn(max_decided_log_scn))) {
|
||||
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) {
|
||||
if (OB_STATE_NOT_MATCH == ret) {
|
||||
max_decided_log_scn.reset();
|
||||
max_decided_scn.reset();
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "[Freezer] fail to get max_decided_log_scn", K(ret), K_(ls_id),
|
||||
K(max_decided_log_scn));
|
||||
TRANS_LOG(WARN, "[Freezer] fail to get max_decided_scn", K(ret), K_(ls_id),
|
||||
K(max_decided_scn));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K_(ls_id), K(max_decided_log_scn));
|
||||
TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K_(ls_id), K(max_decided_scn));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -810,7 +810,7 @@ int ObFreezer::get_max_consequent_callbacked_scn(palf::SCN &max_consequent_callb
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
|
||||
} else if (OB_FAIL(loghandler_->get_max_decided_log_scn(max_consequent_callbacked_scn))) {
|
||||
} else if (OB_FAIL(loghandler_->get_max_decided_scn(max_consequent_callbacked_scn))) {
|
||||
if (OB_STATE_NOT_MATCH == ret) {
|
||||
max_consequent_callbacked_scn.set_min();
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -107,8 +107,8 @@ class ObFrozenMemtableInfo
|
||||
public:
|
||||
ObFrozenMemtableInfo();
|
||||
ObFrozenMemtableInfo(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn_,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn_,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
@ -117,8 +117,8 @@ public:
|
||||
|
||||
void reset();
|
||||
void set(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
@ -127,13 +127,13 @@ public:
|
||||
|
||||
public:
|
||||
ObTabletID tablet_id_;
|
||||
palf::SCN start_log_scn_;
|
||||
palf::SCN end_log_scn_;
|
||||
palf::SCN start_scn_;
|
||||
palf::SCN end_scn_;
|
||||
int64_t write_ref_cnt_;
|
||||
int64_t unsubmitted_cnt_;
|
||||
int64_t unsynced_cnt_;
|
||||
int64_t current_right_boundary_;
|
||||
TO_STRING_KV(K_(tablet_id), K_(start_log_scn), K_(end_log_scn), K_(write_ref_cnt),
|
||||
TO_STRING_KV(K_(tablet_id), K_(start_scn), K_(end_scn), K_(write_ref_cnt),
|
||||
K_(unsubmitted_cnt), K_(unsynced_cnt), K_(current_right_boundary));
|
||||
};
|
||||
|
||||
@ -151,8 +151,8 @@ public:
|
||||
|
||||
public:
|
||||
int add_memtable_info(const ObTabletID &tablet_id,
|
||||
const palf::SCN &start_log_scn,
|
||||
const palf::SCN &end_log_scn,
|
||||
const palf::SCN &start_scn,
|
||||
const palf::SCN &end_scn,
|
||||
const int64_t write_ref_cnt,
|
||||
const int64_t unsubmitted_cnt,
|
||||
const int64_t unsynced_cnt,
|
||||
@ -226,8 +226,8 @@ public:
|
||||
/* freeze_snapshot_version */
|
||||
palf::SCN get_freeze_snapshot_version() { return freeze_snapshot_version_; }
|
||||
|
||||
/* max_decided_log_scn */
|
||||
palf::SCN get_max_decided_log_scn() { return max_decided_log_scn_; }
|
||||
/* max_decided_scn */
|
||||
palf::SCN get_max_decided_scn() { return max_decided_scn_; }
|
||||
|
||||
/* statistics*/
|
||||
void inc_empty_memtable_cnt();
|
||||
@ -241,7 +241,7 @@ public:
|
||||
virtual int get_max_consequent_callbacked_scn(palf::SCN &max_consequent_callbacked_scn);
|
||||
// to set snapshot version when memtables meet ready_for_flush
|
||||
int get_ls_weak_read_scn(palf::SCN &weak_read_scn);
|
||||
int decide_max_decided_log_scn(palf::SCN &max_decided_log_scn);
|
||||
int decide_max_decided_scn(palf::SCN &max_decided_scn);
|
||||
// to resolve concurrency problems about multi-version tablet
|
||||
int get_newest_clog_checkpoint_scn(const ObTabletID &tablet_id,
|
||||
palf::SCN &clog_checkpoint_scn);
|
||||
@ -274,9 +274,9 @@ private:
|
||||
// which all transaction has been saved into the memtable and the memtables
|
||||
// before it.
|
||||
palf::SCN freeze_snapshot_version_;
|
||||
// max_decided_log_scn saved for memtable when freeze happen, which means the
|
||||
// max_decided_scn saved for memtable when freeze happen, which means the
|
||||
// log ts before which will be smaller than the log ts in the latter memtables
|
||||
palf::SCN max_decided_log_scn_;
|
||||
palf::SCN max_decided_scn_;
|
||||
|
||||
ObLSWRSHandler *ls_wrs_handler_;
|
||||
ObLSTxService *ls_tx_svr_;
|
||||
|
@ -76,7 +76,7 @@ int ObLS::init(const share::ObLSID &ls_id,
|
||||
const ObReplicaType replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn,
|
||||
const palf::SCN &create_scn,
|
||||
observer::ObIMetaReport *reporter)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -950,25 +950,25 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id,
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTabletMapKey key(ls_meta_.ls_id_, tablet_id);
|
||||
const palf::SCN tablet_change_checkpoint_scn = ls_meta_.get_tablet_change_checkpoint_scn();
|
||||
palf::SCN log_scn;
|
||||
palf::SCN scn;
|
||||
ObTabletHandle tablet_handle;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ls is not inited", K(ret));
|
||||
} else if (OB_FAIL(log_scn.convert_for_lsn_allocator(log_ts))) {
|
||||
STORAGE_LOG(WARN, "fail to convert to scn", KR(ret), K(log_scn));
|
||||
} else if (OB_UNLIKELY(!tablet_id.is_valid() || !log_scn.is_valid())) {
|
||||
} else if (OB_FAIL(scn.convert_for_lsn_allocator(log_ts))) {
|
||||
STORAGE_LOG(WARN, "fail to convert to scn", KR(ret), K(scn));
|
||||
} else if (OB_UNLIKELY(!tablet_id.is_valid() || !scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(tablet_id), K(log_scn));
|
||||
LOG_WARN("invalid args", K(ret), K(tablet_id), K(scn));
|
||||
} else if (OB_FAIL(ObTabletCreateDeleteHelper::get_tablet(key, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST != ret) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(key));
|
||||
} else if (log_scn < tablet_change_checkpoint_scn) {
|
||||
LOG_WARN("tablet already deleted", K(ret), K(key), K(log_scn), K(tablet_change_checkpoint_scn));
|
||||
} else if (scn < tablet_change_checkpoint_scn) {
|
||||
LOG_WARN("tablet already deleted", K(ret), K(key), K(scn), K(tablet_change_checkpoint_scn));
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("tablet does not exist, but need retry", K(ret), K(key), K(log_scn), K(tablet_change_checkpoint_scn));
|
||||
LOG_INFO("tablet does not exist, but need retry", K(ret), K(key), K(scn), K(tablet_change_checkpoint_scn));
|
||||
}
|
||||
} else {
|
||||
ObTabletTxMultiSourceDataUnit tx_data;
|
||||
@ -976,17 +976,17 @@ int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id,
|
||||
LOG_WARN("failed to get tablet tx data", K(ret), K(tablet_handle));
|
||||
} else if (ObTabletStatus::CREATING == tx_data.tablet_status_) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("tablet is CREATING, need retry", K(ret), K(key), K(tx_data), K(log_scn));
|
||||
LOG_INFO("tablet is CREATING, need retry", K(ret), K(key), K(tx_data), K(scn));
|
||||
} else if (ObTabletStatus::NORMAL == tx_data.tablet_status_) {
|
||||
// do nothing
|
||||
} else if (ObTabletStatus::DELETING == tx_data.tablet_status_) {
|
||||
LOG_INFO("tablet is DELETING, just continue", K(ret), K(key), K(tx_data), K(log_scn));
|
||||
LOG_INFO("tablet is DELETING, just continue", K(ret), K(key), K(tx_data), K(scn));
|
||||
} else if (ObTabletStatus::DELETED == tx_data.tablet_status_) {
|
||||
ret = OB_TABLET_NOT_EXIST;
|
||||
LOG_INFO("tablet is already deleted", K(ret), K(key), K(tx_data), K(log_scn));
|
||||
LOG_INFO("tablet is already deleted", K(ret), K(key), K(tx_data), K(scn));
|
||||
} else {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("tablet may be in creating procedure", K(ret), K(key), K(tx_data), K(log_scn));
|
||||
LOG_INFO("tablet may be in creating procedure", K(ret), K(key), K(tx_data), K(scn));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1288,7 +1288,7 @@ int ObLS::set_tablet_change_checkpoint_scn(const palf::SCN &scn)
|
||||
|
||||
int ObLS::update_id_meta_with_writing_slog(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn)
|
||||
const palf::SCN &latest_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t read_lock = LSLOCKLS;
|
||||
@ -1304,8 +1304,8 @@ int ObLS::update_id_meta_with_writing_slog(const int64_t service_type,
|
||||
} else if (!can_update_ls_meta(ls_meta_.ls_create_status_)) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
STORAGE_LOG(WARN, "state not match, cannot update ls meta", K(ret), K(ls_meta_));
|
||||
} else if (OB_FAIL(ls_meta_.update_id_meta(service_type, limited_id, latest_log_scn, true))) {
|
||||
STORAGE_LOG(WARN, "update id meta with slog fail", K(ret), K_(ls_meta), K(service_type), K(limited_id), K(latest_log_scn));
|
||||
} else if (OB_FAIL(ls_meta_.update_id_meta(service_type, limited_id, latest_scn, true))) {
|
||||
STORAGE_LOG(WARN, "update id meta with slog fail", K(ret), K_(ls_meta), K(service_type), K(limited_id), K(latest_scn));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
@ -1315,7 +1315,7 @@ int ObLS::update_id_meta_with_writing_slog(const int64_t service_type,
|
||||
|
||||
int ObLS::update_id_meta_without_writing_slog(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn)
|
||||
const palf::SCN &latest_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -1325,8 +1325,8 @@ int ObLS::update_id_meta_without_writing_slog(const int64_t service_type,
|
||||
} else if (OB_UNLIKELY(is_stopped_)) {
|
||||
ret = OB_NOT_RUNNING;
|
||||
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_meta));
|
||||
} else if (OB_FAIL(ls_meta_.update_id_meta(service_type, limited_id, latest_log_scn, false))) {
|
||||
STORAGE_LOG(WARN, "update id meta fail", K(ret), K_(ls_meta), K(service_type), K(limited_id), K(latest_log_scn));
|
||||
} else if (OB_FAIL(ls_meta_.update_id_meta(service_type, limited_id, latest_scn, false))) {
|
||||
STORAGE_LOG(WARN, "update id meta fail", K(ret), K_(ls_meta), K(service_type), K(limited_id), K(latest_scn));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ public:
|
||||
const ObReplicaType replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const share::ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn,
|
||||
const palf::SCN &create_scn,
|
||||
observer::ObIMetaReport *reporter);
|
||||
// I am ready to work now.
|
||||
int start();
|
||||
@ -291,17 +291,17 @@ public:
|
||||
// ObLSMeta interface:
|
||||
int update_id_meta_with_writing_slog(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn);
|
||||
const palf::SCN &latest_scn);
|
||||
int update_id_meta_without_writing_slog(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn);
|
||||
const palf::SCN &latest_scn);
|
||||
// int set_ls_rebuild();
|
||||
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_ls_rebuild);
|
||||
// protect in ls lock
|
||||
// int set_gc_state(const logservice::LSGCState &gc_state);
|
||||
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_gc_state);
|
||||
// int set_clog_checkpoint(const palf::LSN &clog_checkpoint_lsn,
|
||||
// const int64_t clog_checkpoint_ts,
|
||||
// const palf::SCN &clog_checkpoint_scn,
|
||||
// const bool write_slog = true);
|
||||
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_clog_checkpoint);
|
||||
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_clog_checkpoint);
|
||||
@ -364,8 +364,8 @@ public:
|
||||
// int get_ls_replayable_point(int64_t &replayable_point);
|
||||
DELEGATE_WITH_RET(ls_meta_, get_ls_replayable_point, int);
|
||||
// set tablet_change_checkpoint_scn, add write lock of LSLOCKLOGMETA.
|
||||
// @param [in] log_scn
|
||||
int set_tablet_change_checkpoint_scn(const palf::SCN &log_scn);
|
||||
// @param [in] scn
|
||||
int set_tablet_change_checkpoint_scn(const palf::SCN &scn);
|
||||
// get ls_meta_package and unsorted tablet_ids, add read lock of LSLOCKLOGMETA.
|
||||
// @param [out] meta_package
|
||||
// @param [out] tablet_ids
|
||||
@ -491,7 +491,7 @@ public:
|
||||
DELEGATE_WITH_RET(log_handler_, get_max_decided_log_ts_ns, int);
|
||||
// @brief, get max decided log scn considering both apply and replay.
|
||||
// @param[out] palf::SCN&, max decided log scn.
|
||||
DELEGATE_WITH_RET(log_handler_, get_max_decided_log_scn, int);
|
||||
DELEGATE_WITH_RET(log_handler_, get_max_decided_scn, int);
|
||||
// @breif, check request server is in self member list
|
||||
// @param[in] const common::ObAddr, request server.
|
||||
// @param[out] bool&, whether in self member list.
|
||||
|
@ -49,7 +49,7 @@ ObLSMeta::ObLSMeta()
|
||||
ls_id_(),
|
||||
replica_type_(REPLICA_TYPE_MAX),
|
||||
ls_create_status_(ObInnerLSStatus::CREATING),
|
||||
clog_checkpoint_scn_(),
|
||||
clog_checkpoint_scn_(ObScnRange::MIN_SCN),
|
||||
clog_base_lsn_(PALF_INITIAL_LSN_VAL),
|
||||
rebuild_seq_(0),
|
||||
migration_status_(ObMigrationStatus::OB_MIGRATION_STATUS_MAX),
|
||||
@ -113,7 +113,7 @@ void ObLSMeta::reset()
|
||||
ls_id_.reset();
|
||||
replica_type_ = REPLICA_TYPE_MAX;
|
||||
clog_base_lsn_.reset();
|
||||
clog_checkpoint_scn_.reset();
|
||||
clog_checkpoint_scn_ = ObScnRange::MIN_SCN;
|
||||
rebuild_seq_ = 0;
|
||||
migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_MAX;
|
||||
gc_state_ = LSGCState::INVALID_LS_GC_STATE;
|
||||
@ -143,7 +143,7 @@ int64_t ObLSMeta::get_clog_checkpoint_ts() const
|
||||
}
|
||||
|
||||
int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn,
|
||||
const SCN clog_checkpoint_scn,
|
||||
const SCN &clog_checkpoint_scn,
|
||||
const bool write_slog)
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_);
|
||||
@ -541,7 +541,7 @@ int ObLSMeta::build_saved_info()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("saved info is not empty, can not build saved info", K(ret), K(*this));
|
||||
} else {
|
||||
saved_info.clog_checkpoint_ts_ = clog_checkpoint_scn_.convert_to_ts();
|
||||
saved_info.clog_checkpoint_scn_ = clog_checkpoint_scn_;
|
||||
saved_info.clog_base_lsn_ = clog_base_lsn_;
|
||||
saved_info.tablet_change_checkpoint_scn_ = tablet_change_checkpoint_scn_;
|
||||
ObLSMeta tmp(*this);
|
||||
@ -583,7 +583,7 @@ int ObLSMeta::init(
|
||||
const ObReplicaType &replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const share::ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn)
|
||||
const palf::SCN &create_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_INVALID_ID == tenant_id || !ls_id.is_valid()
|
||||
@ -598,7 +598,7 @@ int ObLSMeta::init(
|
||||
ls_id_ = ls_id;
|
||||
replica_type_ = replica_type;
|
||||
ls_create_status_ = ObInnerLSStatus::CREATING;
|
||||
clog_checkpoint_scn_.convert_for_lsn_allocator(create_scn);
|
||||
clog_checkpoint_scn_ = create_scn;
|
||||
clog_base_lsn_.val_ = PALF_INITIAL_LSN_VAL;
|
||||
rebuild_seq_ = 0;
|
||||
migration_status_ = migration_status;
|
||||
@ -615,20 +615,20 @@ void ObLSMeta::set_write_slog_func_(WriteSlog write_slog)
|
||||
|
||||
int ObLSMeta::update_id_meta(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn,
|
||||
const palf::SCN &latest_scn,
|
||||
const bool write_slog)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
ObSpinLockTimeGuard guard(lock_);
|
||||
all_id_meta_.update_id_meta(service_type, limited_id, latest_log_scn);
|
||||
all_id_meta_.update_id_meta(service_type, limited_id, latest_scn);
|
||||
guard.click();
|
||||
if (write_slog) {
|
||||
if (OB_FAIL(write_slog_(*this))) {
|
||||
LOG_WARN("id service flush write slog failed", K(ret));
|
||||
}
|
||||
}
|
||||
LOG_INFO("update id meta", K(ret), K(service_type), K(limited_id), K(latest_log_scn),
|
||||
LOG_INFO("update id meta", K(ret), K(service_type), K(limited_id), K(latest_scn),
|
||||
K(*this));
|
||||
|
||||
return ret;
|
||||
|
@ -50,7 +50,7 @@ public:
|
||||
int64_t get_clog_checkpoint_ts() const;
|
||||
palf::LSN &get_clog_base_lsn();
|
||||
int set_clog_checkpoint(const palf::LSN &clog_checkpoint_lsn,
|
||||
const palf::SCN clog_checkpoint_scn,
|
||||
const palf::SCN &clog_checkpoint_scn,
|
||||
const bool write_slog = true);
|
||||
void reset();
|
||||
bool is_valid() const;
|
||||
@ -80,7 +80,7 @@ public:
|
||||
int set_tablet_change_checkpoint_scn(const palf::SCN &tablet_change_checkpoint_scn);
|
||||
int update_id_meta(const int64_t service_type,
|
||||
const int64_t limited_id,
|
||||
const palf::SCN &latest_log_scn,
|
||||
const palf::SCN &latest_scn,
|
||||
const bool write_slog);
|
||||
int update_id_service()
|
||||
{
|
||||
@ -96,7 +96,7 @@ public:
|
||||
const ObReplicaType &replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const share::ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn);
|
||||
const palf::SCN &create_scn);
|
||||
class ObSpinLockTimeGuard
|
||||
{
|
||||
public:
|
||||
@ -132,7 +132,7 @@ private:
|
||||
palf::SCN clog_checkpoint_scn_;
|
||||
// clog_base_lsn_, meaning:
|
||||
// 1. all clog entries which lsn are smaller than clog_base_lsn_ have been recycled
|
||||
// 2. log_ts of log entry that clog_base_lsn_ points to is smaller than/equal to clog_checkpoint_ts_
|
||||
// 2. log_scn of log entry that clog_base_lsn_ points to is smaller than/equal to clog_checkpoint_scn_
|
||||
// 3. clog starts to replay log entries from clog_base_lsn_ on crash recovery
|
||||
palf::LSN clog_base_lsn_;
|
||||
int64_t rebuild_seq_;
|
||||
|
@ -13,6 +13,7 @@
|
||||
#define USING_LOG_PREFIX STORAGE
|
||||
|
||||
#include "ob_ls_saved_info.h"
|
||||
#include "share/ob_table_range.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -20,7 +21,7 @@ namespace storage
|
||||
{
|
||||
|
||||
ObLSSavedInfo::ObLSSavedInfo()
|
||||
: clog_checkpoint_ts_(0),
|
||||
: clog_checkpoint_scn_(share::ObScnRange::MIN_SCN),
|
||||
clog_base_lsn_(palf::PALF_INITIAL_LSN_VAL),
|
||||
replayable_point_(0),
|
||||
tablet_change_checkpoint_scn_(palf::SCN::min_scn())
|
||||
@ -29,7 +30,7 @@ ObLSSavedInfo::ObLSSavedInfo()
|
||||
|
||||
void ObLSSavedInfo::reset()
|
||||
{
|
||||
clog_checkpoint_ts_ = 0;
|
||||
clog_checkpoint_scn_ = share::ObScnRange::MIN_SCN;
|
||||
clog_base_lsn_ = palf::PALF_INITIAL_LSN_VAL;
|
||||
replayable_point_ = 0;
|
||||
tablet_change_checkpoint_scn_ = palf::SCN::min_scn();
|
||||
@ -37,7 +38,8 @@ void ObLSSavedInfo::reset()
|
||||
|
||||
bool ObLSSavedInfo::is_valid() const
|
||||
{
|
||||
return clog_checkpoint_ts_ >= 0
|
||||
return clog_checkpoint_scn_ >= share::ObScnRange::MIN_SCN
|
||||
&& clog_checkpoint_scn_.is_valid()
|
||||
&& clog_base_lsn_.is_valid()
|
||||
&& replayable_point_ >= 0
|
||||
&& tablet_change_checkpoint_scn_.is_valid();
|
||||
@ -45,13 +47,13 @@ bool ObLSSavedInfo::is_valid() const
|
||||
|
||||
bool ObLSSavedInfo::is_empty() const
|
||||
{
|
||||
return 0 == clog_checkpoint_ts_
|
||||
return share::ObScnRange::MIN_SCN == clog_checkpoint_scn_
|
||||
&& palf::PALF_INITIAL_LSN_VAL == clog_base_lsn_
|
||||
&& 0 == replayable_point_
|
||||
&& !tablet_change_checkpoint_scn_.is_valid();
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObLSSavedInfo, clog_checkpoint_ts_, clog_base_lsn_, replayable_point_, tablet_change_checkpoint_scn_);
|
||||
OB_SERIALIZE_MEMBER(ObLSSavedInfo, clog_checkpoint_scn_, clog_base_lsn_, replayable_point_, tablet_change_checkpoint_scn_);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -32,9 +32,9 @@ public:
|
||||
void reset();
|
||||
bool is_empty() const;
|
||||
|
||||
TO_STRING_KV(K_(clog_checkpoint_ts), K_(clog_base_lsn), K_(replayable_point), K_(tablet_change_checkpoint_scn));
|
||||
TO_STRING_KV(K_(clog_checkpoint_scn), K_(clog_base_lsn), K_(replayable_point), K_(tablet_change_checkpoint_scn));
|
||||
|
||||
int64_t clog_checkpoint_ts_;
|
||||
palf::SCN clog_checkpoint_scn_;
|
||||
palf::LSN clog_base_lsn_;
|
||||
int64_t replayable_point_;
|
||||
palf::SCN tablet_change_checkpoint_scn_;
|
||||
|
@ -4740,7 +4740,7 @@ int ObLSTabletService::prepare_dml_running_ctx(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTabletService::get_ls_min_end_log_ts_in_old_tablets(int64_t &end_log_ts)
|
||||
int ObLSTabletService::get_ls_min_end_scn_in_old_tablets(palf::SCN &end_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObLSID &ls_id = ls_->get_ls_id();
|
||||
@ -4748,7 +4748,7 @@ int ObLSTabletService::get_ls_min_end_log_ts_in_old_tablets(int64_t &end_log_ts)
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret), K_(is_inited));
|
||||
} else if (OB_FAIL(t3m->get_min_end_log_ts_for_ls(ls_id, end_log_ts))) {
|
||||
} else if (OB_FAIL(t3m->get_min_end_scn_for_ls(ls_id, end_scn))) {
|
||||
LOG_WARN("fail to get ls min end log ts in all of old tablets", K(ret), K(ls_id));
|
||||
}
|
||||
return ret;
|
||||
|
@ -187,7 +187,7 @@ public:
|
||||
ObTabletHandle &handle,
|
||||
const int64_t timeout_us = ObTabletCommon::DEFAULT_GET_TABLET_TIMEOUT_US);
|
||||
int remove_tablets(const common::ObIArray<common::ObTabletID> &tablet_id_array);
|
||||
int get_ls_min_end_log_ts_in_old_tablets(int64_t &end_log_ts);
|
||||
int get_ls_min_end_scn_in_old_tablets(palf::SCN &end_scn);
|
||||
int get_tx_data_memtable_mgr(ObMemtableMgrHandle &mgr_handle);
|
||||
int get_tx_ctx_memtable_mgr(ObMemtableMgrHandle &mgr_handle);
|
||||
int get_lock_memtable_mgr(ObMemtableMgrHandle &mgr_handle);
|
||||
|
@ -1794,7 +1794,7 @@ int ObMemtable::resolve_max_end_log_ts_()
|
||||
if (OB_ISNULL(freezer_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "freezer should not be null", K(ret));
|
||||
} else if (FALSE_IT(max_decided_log_ts = freezer_->get_max_decided_log_scn().get_val_for_lsn_allocator())) {
|
||||
} else if (FALSE_IT(max_decided_log_ts = freezer_->get_max_decided_scn().get_val_for_lsn_allocator())) {
|
||||
TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret));
|
||||
} else if (OB_INVALID_TIMESTAMP == max_decided_log_ts) {
|
||||
// Pass if not necessary
|
||||
|
@ -365,10 +365,10 @@ void ObTenantMetaMemMgr::gc_sstable(ObSSTable *sstable)
|
||||
}
|
||||
}
|
||||
|
||||
int ObTenantMetaMemMgr::get_min_end_log_ts_for_ls(const share::ObLSID &ls_id, int64_t &end_log_ts)
|
||||
int ObTenantMetaMemMgr::get_min_end_scn_for_ls(const share::ObLSID &ls_id, palf::SCN &end_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
end_log_ts = INT64_MAX;
|
||||
end_scn = ObScnRange::MAX_SCN;
|
||||
if (OB_UNLIKELY(!ls_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(ls_id));
|
||||
@ -382,8 +382,8 @@ int ObTenantMetaMemMgr::get_min_end_log_ts_for_ls(const share::ObLSID &ls_id, in
|
||||
} else if (OB_UNLIKELY(!info.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, info is invalid", K(ret), K(info));
|
||||
} else if (info.sstable_handle_.get_table()->get_end_log_ts() < end_log_ts) {
|
||||
end_log_ts = info.sstable_handle_.get_table()->get_end_log_ts();
|
||||
} else if (info.sstable_handle_.get_table()->get_end_scn() < end_scn) {
|
||||
end_scn = info.sstable_handle_.get_table()->get_end_scn();
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
|
@ -123,7 +123,7 @@ public:
|
||||
|
||||
// TIPS:
|
||||
// - only for tx data table to find min log ts.
|
||||
int get_min_end_log_ts_for_ls(const share::ObLSID &ls_id, int64_t &end_log_ts);
|
||||
int get_min_end_scn_for_ls(const share::ObLSID &ls_id, palf::SCN &end_scn);
|
||||
|
||||
// garbage collector for sstable and memtable.
|
||||
int push_table_into_gc_queue(ObITable *table, const ObITable::TableType table_type);
|
||||
|
@ -245,7 +245,7 @@ int ObLSService::inner_create_ls_(const share::ObLSID &lsid,
|
||||
const ObReplicaType replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn,
|
||||
const palf::SCN &create_scn,
|
||||
ObLS *&ls)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -421,7 +421,7 @@ int ObLSService::create_ls(const obrpc::ObCreateLSArg &arg)
|
||||
(is_ls_to_restore_(arg) ?
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_START) :
|
||||
ObLSRestoreStatus(ObLSRestoreStatus::RESTORE_NONE)),
|
||||
create_scn.get_val_for_lsn_allocator(),
|
||||
create_scn,
|
||||
ls))) {
|
||||
LOG_WARN("inner create log stream failed.", K(ret), K(arg), K(migration_status));
|
||||
} else {
|
||||
@ -779,7 +779,7 @@ int ObLSService::replay_create_ls_(const ObLSMeta &ls_meta)
|
||||
ls_meta.replica_type_,
|
||||
migration_status,
|
||||
restore_status,
|
||||
ls_meta.get_clog_checkpoint_ts(),
|
||||
ls_meta.get_clog_checkpoint_scn(),
|
||||
ls))) {
|
||||
LOG_WARN("fail to inner create ls", K(ret), K(ls_meta.ls_id_));
|
||||
} else if (FALSE_IT(state = ObLSCreateState::CREATE_STATE_INNER_CREATED)) {
|
||||
@ -998,7 +998,7 @@ int ObLSService::create_ls_for_ha(
|
||||
arg.dst_.get_replica_type(),
|
||||
migration_status,
|
||||
restore_status,
|
||||
0, /* create scn */
|
||||
ObScnRange::MIN_SCN, /* create scn */
|
||||
ls))) {
|
||||
LOG_WARN("create ls failed", K(ret), K(arg), K(task_id));
|
||||
} else {
|
||||
|
@ -157,7 +157,7 @@ private:
|
||||
const ObReplicaType replica_type,
|
||||
const ObMigrationStatus &migration_status,
|
||||
const share::ObLSRestoreStatus &restore_status,
|
||||
const int64_t create_scn,
|
||||
const palf::SCN &create_scn,
|
||||
ObLS *&ls);
|
||||
int inner_del_ls_(ObLS *&ls);
|
||||
int add_ls_to_map_(ObLS *ls);
|
||||
|
@ -146,7 +146,7 @@ void ObTabletGCService::ObTabletGCTask::runTimerTask()
|
||||
STORAGE_LOG(WARN, "freezer should not null", K(ls->get_ls_id()), KR(ret));
|
||||
}
|
||||
// 1. get minor merge point
|
||||
else if (OB_FAIL(freezer->decide_max_decided_log_scn(checkpoint_scn))) {
|
||||
else if (OB_FAIL(freezer->decide_max_decided_scn(checkpoint_scn))) {
|
||||
need_retry = true;
|
||||
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", KR(ret), K(freezer->get_ls_id()));
|
||||
} else if (!checkpoint_scn.is_valid()
|
||||
|
@ -688,7 +688,6 @@ int ObTxDataTable::get_recycle_scn(SCN &recycle_scn)
|
||||
SCN min_end_scn = SCN::max_scn();
|
||||
SCN min_end_scn_from_old_tablets = SCN::max_scn();
|
||||
SCN min_end_scn_from_latest_tablets = SCN::max_scn();
|
||||
int64_t min_end_ts_from_old_tablets = INT64_MAX;
|
||||
|
||||
// set recycle_scn = SCN::min_scn() as default which means clear nothing
|
||||
recycle_scn.set_min();
|
||||
@ -704,12 +703,9 @@ int ObTxDataTable::get_recycle_scn(SCN &recycle_scn)
|
||||
} else if (OB_FAIL(get_ls_min_end_scn_in_latest_tablets_(min_end_scn_from_latest_tablets))) {
|
||||
// get_ls_min_end_log_ts_in_latest_tablets must before get_ls_min_end_log_ts_in_old_tablets
|
||||
STORAGE_LOG(WARN, "fail to get ls min end log ts in all of latest tablets", KR(ret));
|
||||
} else if (OB_FAIL(ls_tablet_svr_->get_ls_min_end_log_ts_in_old_tablets(min_end_ts_from_old_tablets))) {
|
||||
} else if (OB_FAIL(ls_tablet_svr_->get_ls_min_end_scn_in_old_tablets(min_end_scn_from_old_tablets))) {
|
||||
STORAGE_LOG(WARN, "fail to get ls min end log ts in all of old tablets", KR(ret));
|
||||
} else {
|
||||
if (INT64_MAX != min_end_ts_from_old_tablets) {
|
||||
min_end_scn_from_old_tablets.convert_for_lsn_allocator(min_end_ts_from_old_tablets);
|
||||
}
|
||||
min_end_scn = std::min(min_end_scn_from_old_tablets, min_end_scn_from_latest_tablets);
|
||||
if (!min_end_scn.is_max()) {
|
||||
recycle_scn = min_end_scn;
|
||||
@ -874,14 +870,14 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn)
|
||||
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
|
||||
int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_min_start_scn_ts_);
|
||||
SCN min_start_scn_in_tx_data_memtable = SCN::max_scn();
|
||||
SCN max_decided_log_scn = SCN::min_scn();
|
||||
SCN max_decided_scn = SCN::min_scn();
|
||||
|
||||
// make sure the max decided log ts is greater than sstable_end_log_ts
|
||||
if (OB_FAIL(ls_->get_max_decided_log_scn(max_decided_log_scn))) {
|
||||
if (OB_FAIL(ls_->get_max_decided_scn(max_decided_scn))) {
|
||||
STORAGE_LOG(WARN, "get max decided log ts failed", KR(ret), "ls_id", get_ls_id().id());
|
||||
} else if (max_decided_log_scn < sstable_end_scn) {
|
||||
} else if (max_decided_scn < sstable_end_scn) {
|
||||
need_skip = true;
|
||||
STORAGE_LOG(INFO, "skip calc upper trans version once", K(max_decided_log_scn), K(sstable_end_scn));
|
||||
STORAGE_LOG(INFO, "skip calc upper trans version once", K(max_decided_scn), K(sstable_end_scn));
|
||||
}
|
||||
|
||||
// If the min_start_log_ts_in_ctx has not been updated for more than 30 seconds,
|
||||
|
@ -394,9 +394,9 @@ public:
|
||||
{
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int get_max_decided_log_scn(palf::SCN &log_scn)
|
||||
int get_max_decided_scn(palf::SCN &scn)
|
||||
{
|
||||
log_scn.set_max();
|
||||
scn.set_max();
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
int get_max_decided_log_ts_ns(int64_t &log_ts)
|
||||
|
Loading…
x
Reference in New Issue
Block a user