[4.0] replace log_ts with SCN(src/storage/ls) 3
This commit is contained in:
parent
caf3220c8c
commit
bee212d8a9
@ -761,26 +761,6 @@ bool ObFreezer::is_freeze(uint32_t freeze_flag) const
|
||||
}
|
||||
|
||||
/* other public functions */
|
||||
int ObFreezer::get_max_consequent_callbacked_log_ts(int64_t &max_decided_log_ts)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id));
|
||||
} else if (OB_FAIL(loghandler_->get_max_decided_log_ts_ns(max_decided_log_ts))) {
|
||||
if (OB_STATE_NOT_MATCH == ret) {
|
||||
max_decided_log_ts = 0;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "[Freezer] fail to get min_unreplay_log_ts", K(ret), K_(ls_id),
|
||||
K(max_decided_log_ts));
|
||||
}
|
||||
} else {
|
||||
TRANS_LOG(TRACE, "[Freezer] get_max_decided_log_ts", K(ret), K_(ls_id), K(max_decided_log_ts));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezer::decide_max_decided_scn(palf::SCN &max_decided_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -237,7 +237,6 @@ public:
|
||||
|
||||
/* others */
|
||||
// get consequent callbacked log_ts right boundary
|
||||
virtual int get_max_consequent_callbacked_log_ts(int64_t &max_consequent_callbacked_log_ts);
|
||||
virtual int get_max_consequent_callbacked_scn(palf::SCN &max_consequent_callbacked_scn);
|
||||
// to set snapshot version when memtables meet ready_for_flush
|
||||
int get_ls_weak_read_scn(palf::SCN &weak_read_scn);
|
||||
|
@ -944,20 +944,17 @@ int ObLS::finish_slog_replay()
|
||||
}
|
||||
|
||||
int ObLS::replay_get_tablet(const common::ObTabletID &tablet_id,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
ObTabletHandle &handle) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObTabletMapKey key(ls_meta_.ls_id_, tablet_id);
|
||||
const palf::SCN tablet_change_checkpoint_scn = ls_meta_.get_tablet_change_checkpoint_scn();
|
||||
palf::SCN scn;
|
||||
ObTabletHandle tablet_handle;
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ls is not inited", K(ret));
|
||||
} else if (OB_FAIL(scn.convert_for_lsn_allocator(log_ts))) {
|
||||
STORAGE_LOG(WARN, "fail to convert to scn", KR(ret), K(scn));
|
||||
} else if (OB_UNLIKELY(!tablet_id.is_valid() || !scn.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(tablet_id), K(scn));
|
||||
|
@ -273,7 +273,7 @@ public:
|
||||
|
||||
// get tablet while replaying clog
|
||||
int replay_get_tablet(const common::ObTabletID &tablet_id,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
ObTabletHandle &handle) const;
|
||||
|
||||
int flush_if_need(const bool need_flush);
|
||||
|
@ -64,7 +64,7 @@ int ObLSSyncTabletSeqHandler::replay(const void *buffer,
|
||||
LOG_WARN("log base header deserialize error", K(ret));
|
||||
} else if (OB_FAIL(log.deserialize(log_buf, nbytes, tmp_pos))) {
|
||||
LOG_WARN("ObSyncTabletSeqLog deserialize error", K(ret));
|
||||
} else if (OB_FAIL(ls_->replay_get_tablet(log.get_tablet_id(), scn.get_val_for_inner_table_field(), tablet_handle))) {
|
||||
} else if (OB_FAIL(ls_->replay_get_tablet(log.get_tablet_id(), scn, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
LOG_INFO("tablet may be deleted, skip this log", K(ret), "tablet_id", log.get_tablet_id(), K(scn));
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -318,10 +318,13 @@ int ObStorageSchemaRecorder::replay_get_tablet_handle(const int64_t log_ts, ObTa
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSHandle ls_handle;
|
||||
palf::SCN scn;
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->replay_get_tablet(tablet_id_, log_ts, tablet_handle))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K_(ls_id), K_(tablet_id), K(log_ts));
|
||||
} else if (OB_FAIL(scn.convert_for_lsn_allocator(log_ts))) {
|
||||
LOG_WARN("convert failed", K(log_ts), K(ret));
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->replay_get_tablet(tablet_id_, scn, tablet_handle))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K_(ls_id), K_(tablet_id), K(scn));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -806,7 +806,7 @@ int ObLockMemtable::flush(palf::SCN recycle_scn,
|
||||
K(is_frozen_), K(ls_id_));
|
||||
} else if (is_active_memtable()) {
|
||||
if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(freeze_scn_))) {
|
||||
LOG_WARN("get_max_consequent_callbacked_log_ts failed", K(ret), K(ls_id_));
|
||||
LOG_WARN("get_max_consequent_callbacked_log_scn failed", K(ret), K(ls_id_));
|
||||
} else if (flushed_scn_ >= freeze_scn_) {
|
||||
LOG_INFO("skip freeze because of flushed", K_(ls_id), K_(flushed_scn), K_(freeze_scn));
|
||||
} else {
|
||||
|
@ -971,7 +971,7 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets(
|
||||
} else if (trans_flags.for_replay_) {
|
||||
if (OB_FAIL(tablet_id_array.reserve(tablet_cnt))) {
|
||||
LOG_WARN("fail to allocate memory for tablet_id_array", K(ret), K(tablet_cnt));
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_, tablet_id_array))) {
|
||||
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
|
||||
}
|
||||
}
|
||||
@ -1027,7 +1027,7 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets(
|
||||
|
||||
int ObTabletCreateDeleteHelper::replay_verify_tablets(
|
||||
const ObBatchRemoveTabletArg &arg,
|
||||
const int64_t log_ts,
|
||||
const SCN &scn,
|
||||
ObIArray<ObTabletID> &tablet_id_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1035,22 +1035,19 @@ int ObTabletCreateDeleteHelper::replay_verify_tablets(
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); ++i) {
|
||||
const ObTabletID &tablet_id = arg.tablet_ids_[i];
|
||||
if (OB_FAIL(ls_.replay_get_tablet(tablet_id, log_ts, tablet_handle))) {
|
||||
if (OB_FAIL(ls_.replay_get_tablet(tablet_id, scn, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
LOG_INFO("tablet already deleted", K(ret), K(tablet_id), K(log_ts));
|
||||
LOG_INFO("tablet already deleted", K(ret), K(tablet_id), K(scn));
|
||||
ret = OB_SUCCESS; // tablet already deleted, won't regard it as error
|
||||
} else if (OB_EAGAIN == ret) {
|
||||
// retry again
|
||||
} else {
|
||||
LOG_WARN("failed to replay get tablet", K(ret), K(tablet_id), K(log_ts));
|
||||
LOG_WARN("failed to replay get tablet", K(ret), K(tablet_id), K(scn));
|
||||
}
|
||||
} else {
|
||||
ObTabletTxMultiSourceDataUnit tx_data;
|
||||
SCN scn;
|
||||
if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
|
||||
LOG_WARN("failed to get tx data", K(ret), K(tablet_handle));
|
||||
} else if (OB_FAIL(scn.convert_tmp(log_ts))) {
|
||||
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
|
||||
} else if (scn <= tx_data.tx_scn_) {
|
||||
FLOG_INFO("clog ts is no bigger than tx log ts in tx data, should skip this clog",
|
||||
K(tablet_id), K(scn), K(tx_data));
|
||||
@ -1120,7 +1117,7 @@ int ObTabletCreateDeleteHelper::commit_remove_tablets(
|
||||
LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id));
|
||||
} else if (trans_flags.for_replay_) {
|
||||
if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) {
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_, tablet_id_array))) {
|
||||
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
|
||||
}
|
||||
}
|
||||
@ -1199,7 +1196,7 @@ int ObTabletCreateDeleteHelper::abort_remove_tablets(
|
||||
LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id));
|
||||
} else if (trans_flags.for_replay_) {
|
||||
if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) {
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
|
||||
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_, tablet_id_array))) {
|
||||
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
|
||||
}
|
||||
}
|
||||
@ -1785,7 +1782,7 @@ int ObTabletCreateDeleteHelper::ensure_skip_create_all_tablets_safe(
|
||||
const ObTabletID &data_tablet_id = info.data_tablet_id_;
|
||||
if (is_mixed_tablets(info) || is_pure_data_tablets(info)) {
|
||||
// data tablet already checked, can ignore
|
||||
} else if (OB_FAIL(ls_.replay_get_tablet(data_tablet_id, scn.get_val_for_inner_table_field(), tablet_handle))) {
|
||||
} else if (OB_FAIL(ls_.replay_get_tablet(data_tablet_id, scn, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST != ret && OB_EAGAIN != ret) {
|
||||
LOG_WARN("failed to replay get tablet", K(ret), K(data_tablet_id), K(scn));
|
||||
} else if (OB_TABLET_NOT_EXIST == ret) {
|
||||
|
@ -302,7 +302,7 @@ private:
|
||||
const transaction::ObMulSourceDataNotifyArg &trans_flags);
|
||||
int replay_verify_tablets(
|
||||
const obrpc::ObBatchRemoveTabletArg &arg,
|
||||
const int64_t log_ts,
|
||||
const palf::SCN &scn,
|
||||
common::ObIArray<common::ObTabletID> &tablet_id_array);
|
||||
private:
|
||||
ObLS &ls_;
|
||||
|
@ -92,7 +92,7 @@ int ObTabletServiceClogReplayExecutor::replay_update_storage_schema(
|
||||
LOG_WARN("invalid args", K(ret), KP(buf), K(buf_size), K(pos));
|
||||
} else if (OB_FAIL(tablet_id.deserialize(buf, buf_size, tmp_pos))) {
|
||||
LOG_WARN("fail to deserialize tablet id", K(ret), K(buf_size), K(pos), K(tablet_id));
|
||||
} else if (OB_FAIL(ls_->replay_get_tablet(tablet_id, scn.get_val_for_lsn_allocator(), handle))) {
|
||||
} else if (OB_FAIL(ls_->replay_get_tablet(tablet_id, scn, handle))) {
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS; // TODO (bowen.gbw): unify multi data replay logic
|
||||
LOG_INFO("tablet does not exist, skip", K(ret), K(tablet_id));
|
||||
|
@ -558,7 +558,7 @@ int ObTxReplayExecutor::replay_one_row_in_memtable_(ObMutatorRowHeader &row_head
|
||||
lib::Worker::CompatMode mode;
|
||||
ObTabletHandle tablet_handle;
|
||||
|
||||
if (OB_FAIL(ls_->replay_get_tablet(row_head.tablet_id_, log_ts_ns_.get_val_for_lsn_allocator(), tablet_handle))) {
|
||||
if (OB_FAIL(ls_->replay_get_tablet(row_head.tablet_id_, log_ts_ns_, tablet_handle))) {
|
||||
if (OB_TABLET_NOT_EXIST == ret) {
|
||||
ctx_->force_no_need_replay_checksum();
|
||||
ret = OB_SUCCESS;
|
||||
|
@ -148,7 +148,7 @@ void ObTabletGCService::ObTabletGCTask::runTimerTask()
|
||||
// 1. get minor merge point
|
||||
else if (OB_FAIL(freezer->decide_max_decided_scn(checkpoint_scn))) {
|
||||
need_retry = true;
|
||||
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", KR(ret), K(freezer->get_ls_id()));
|
||||
STORAGE_LOG(WARN, "decide_max_decided_scn failed", KR(ret), K(freezer->get_ls_id()));
|
||||
} else if (!checkpoint_scn.is_valid()
|
||||
|| SCN::min_scn() == checkpoint_scn
|
||||
|| checkpoint_scn <= ls->get_tablet_change_checkpoint_scn()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user