[scn] scn for memtable

This commit is contained in:
Handora 2022-11-28 02:02:38 +00:00 committed by ob-robot
parent a4c9f3e4d4
commit e77375018e
38 changed files with 416 additions and 507 deletions

View File

@ -41,6 +41,13 @@ bool SCN::atomic_bcas(const SCN &old_v, const SCN &new_val)
return ATOMIC_BCAS(&val_, old_v.val_, new_val.val_);
}
SCN SCN::atomic_vcas(const SCN &old_v, const SCN &new_val)
{
SCN tmp;
tmp.val_ = ATOMIC_VCAS(&val_, old_v.val_, new_val.val_);
return tmp;
}
bool SCN::is_valid() const
{
return ((OB_INVALID_SCN_VAL != val_) && (SCN_VERSION == v_));
@ -57,7 +64,7 @@ void SCN::set_max()
ts_ns_ = OB_MAX_SCN_TS_NS;
}
bool SCN::is_max()
bool SCN::is_max() const
{
bool bool_ret = false;
if (v_ == SCN_VERSION && ts_ns_ == OB_MAX_SCN_TS_NS) {
@ -72,7 +79,7 @@ void SCN::set_min()
ts_ns_ = OB_MIN_SCN_TS_NS;
}
bool SCN::is_min()
bool SCN::is_min() const
{
bool bool_ret = false;
if (v_ == SCN_VERSION && ts_ns_ == OB_MIN_SCN_TS_NS) {

View File

@ -33,8 +33,8 @@ public:
void set_invalid();
void set_max();
void set_min();
bool is_max();
bool is_min();
bool is_max() const;
bool is_min() const;
void set_base();
static SCN invalid_scn();
static SCN max_scn();
@ -48,6 +48,7 @@ public:
void atomic_set(const SCN &ref);
SCN atomic_get() const;
bool atomic_bcas(const SCN &old_v, const SCN &new_val);
SCN atomic_vcas(const SCN &old_v, const SCN &new_val);
SCN inc_update(const SCN &ref_scn);
SCN dec_update(const SCN &ref_scn);
static SCN scn_inc(const SCN &ref);

View File

@ -97,7 +97,7 @@ int ObLSMemberTable::prepare_create_tablets(const obrpc::ObBatchCreateTabletArg
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is NULL", KR(ret), K(ls_handle));
} else if (trans_flags.for_replay_
&& trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) {
&& trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) {
LOG_INFO("replay skip for create tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta()));
} else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;
@ -164,7 +164,7 @@ int ObLSMemberTable::on_commit_create_tablets(const obrpc::ObBatchCreateTabletAr
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is NULL", KR(ret), K(ls_handle));
} else if (trans_flags.for_replay_
&& trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) {
&& trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) {
LOG_INFO("replay skip for create tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta()));
} else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;
@ -283,7 +283,7 @@ int ObLSMemberTable::prepare_remove_tablets(const obrpc::ObBatchRemoveTabletArg
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is NULL", KR(ret), K(ls_handle));
} else if (trans_flags.for_replay_ // dropping tablet triggers dumpping memtable
&& trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) {
&& trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) {
LOG_INFO("replay skip for remove tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta()));
} else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;
@ -349,7 +349,7 @@ int ObLSMemberTable::on_commit_remove_tablets(const obrpc::ObBatchRemoveTabletAr
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is NULL", KR(ret), K(ls_handle));
} else if (trans_flags.for_replay_
&& trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) {
&& trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) {
LOG_INFO("replay skip for remove tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta()));
} else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) {
ret = OB_ERR_UNEXPECTED;

View File

@ -560,7 +560,7 @@ int ObMvccRow::row_compact(ObMemtable *memtable,
ObMemtableRowCompactor row_compactor;
if (OB_FAIL(row_compactor.init(this, memtable, node_alloc, for_replay))) {
TRANS_LOG(WARN, "row compactor init error", K(ret));
} else if (OB_FAIL(row_compactor.compact(snapshot_version.get_val_for_lsn_allocator()))) {
} else if (OB_FAIL(row_compactor.compact(snapshot_version))) {
TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version));
} else {
// do nothing

View File

@ -976,10 +976,11 @@ int ObMvccRowCallback::trans_commit()
if (value_.need_compact(for_read, ctx_.is_for_replay())) {
if (ctx_.is_for_replay()) {
if (palf::SCN::min_scn() != ctx_.get_replay_compact_version() && palf::SCN::max_scn() != ctx_.get_replay_compact_version()) {
memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version().get_val_for_lsn_allocator());
memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version());
}
} else {
memtable_->row_compact(&value_, ctx_.is_for_replay(), INT64_MAX - 100);
palf::SCN snapshot_version_for_compact = palf::SCN::minus(palf::SCN::max_scn(), 100);
memtable_->row_compact(&value_, ctx_.is_for_replay(), snapshot_version_for_compact);
}
}
}
@ -1169,8 +1170,8 @@ int ObMvccRowCallback::log_sync(const palf::SCN scn)
{
int ret = OB_SUCCESS;
memtable_->set_rec_log_ts(scn.get_val_for_lsn_allocator());
memtable_->set_max_end_log_ts(scn.get_val_for_lsn_allocator());
memtable_->set_rec_scn(scn);
memtable_->set_max_end_scn(scn);
(void)tnode_->fill_scn(scn);
ctx_.update_max_submitted_seq_no(seq_no_);
if (OB_FAIL(dec_unsynced_cnt_())) {

View File

@ -107,9 +107,9 @@ ObMemtable::ObMemtable()
logging_blocked_start_time(0),
unset_active_memtable_logging_blocked_(false),
resolve_active_memtable_left_boundary_(true),
freeze_log_ts_(INT64_MAX),
max_end_log_ts_(ObScnRange::MIN_TS),
rec_log_ts_(INT64_MAX),
freeze_scn_(palf::SCN::max_scn()),
max_end_scn_(ObScnRange::MIN_SCN),
rec_scn_(palf::SCN::max_scn()),
state_(ObMemtableState::INVALID),
freeze_state_(ObMemtableFreezeState::INVALID),
timestamp_(0),
@ -194,7 +194,7 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_()
transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *);
if (NULL != txs_svr
&& share::ObScnRange::MAX_TS != get_end_log_ts()
&& share::ObScnRange::MAX_SCN != get_end_scn()
&& OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) {
TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this));
}
@ -245,15 +245,15 @@ void ObMemtable::destroy()
logging_blocked_start_time = 0;
unset_active_memtable_logging_blocked_ = false;
resolve_active_memtable_left_boundary_ = true;
max_end_log_ts_ = ObScnRange::MIN_TS;
rec_log_ts_ = INT64_MAX;
max_end_scn_ = ObScnRange::MIN_SCN;
rec_scn_ = palf::SCN::max_scn();
read_barrier_ = false;
is_tablet_freeze_ = false;
is_force_freeze_ = false;
is_flushed_ = false;
is_inited_ = false;
contain_hotspot_row_ = false;
snapshot_version_ = INT64_MAX;
snapshot_version_.set_max();
}
////////////////////////////////////////////////////////////////////////////////////////////////////
@ -365,15 +365,15 @@ int ObMemtable::lock_(ObStoreCtx &ctx,
} else {
ObMemtableData mtd(blocksstable::ObDmlFlag::DF_LOCK, len, buf);
ObTxNodeArg arg(&mtd, /*memtable_data*/
NULL, /*old_data*/
timestamp_, /*memstore_version*/
ctx.mvcc_acc_ctx_.tx_scn_ /*seq_no*/);
NULL, /*old_data*/
timestamp_, /*memstore_version*/
ctx.mvcc_acc_ctx_.tx_scn_ /*seq_no*/);
if (OB_FAIL(mvcc_write_(ctx,
&mtk,
/*used for mvcc_write on sstable*/
read_info,
arg,
is_new_locked))) {
&mtk,
/*used for mvcc_write on sstable*/
read_info,
arg,
is_new_locked))) {
} else if (OB_UNLIKELY(!is_new_locked)) {
TRANS_LOG(DEBUG, "lock twice, no need to store lock trans node", K(table_id), K(ctx));
}
@ -550,7 +550,6 @@ int ObMemtable::check_row_locked_by_myself(
} else {
bool is_locked = false;
bool tmp_is_locked = false;
int64_t trans_version = 0;
ObStoreRowLockState lock_state;
const ObIArray<ObITable *> *stores = nullptr;
common::ObSEArray<ObITable *, 4> iter_tables;
@ -1092,7 +1091,7 @@ int ObMemtable::replay_row(ObStoreCtx &ctx,
seq_no, /*seq_no*/
modify_count, /*modify_count*/
acc_checksum, /*acc_checksum*/
scn /*log_ts*/);
scn /*scn*/);
if (OB_FAIL(mtk.encode(&rowkey))) {
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
@ -1105,7 +1104,7 @@ int ObMemtable::replay_row(ObStoreCtx &ctx,
TRANS_LOG(WARN, "m_replay_row fail", K(ret), K(table_id), K(rowkey), K(row), K(dml_flag),
K(modify_count), K(acc_checksum));
}
} else if (part_ctx->need_update_schema_version(log_id, scn.get_val_for_lsn_allocator())) {
} else if (part_ctx->need_update_schema_version(log_id, scn)) {
ctx.mvcc_acc_ctx_.mem_ctx_->set_table_version(table_version);
set_max_schema_version(table_version);
}
@ -1131,7 +1130,7 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx,
} else if (value->is_lower_lock_scaned()) {
} else {
bool row_locked = false;
int64_t max_trans_version = 0;
palf::SCN max_trans_version = palf::SCN::min_scn();
const ObIArray<ObITable *> *stores = nullptr;
common::ObSEArray<ObITable *, 4> iter_tables;
ctx.table_iter_->resume();
@ -1184,8 +1183,8 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx,
row_locked |= lock_state.is_locked_;
if (lock_state.is_locked_ && my_tx_id != lock_state.lock_trans_id_) {
ret = OB_TRY_LOCK_ROW_CONFLICT;
} else if (max_trans_version < lock_state.trans_version_.get_val_for_lsn_allocator()) {
max_trans_version = lock_state.trans_version_.get_val_for_lsn_allocator();
} else if (max_trans_version < lock_state.trans_version_) {
max_trans_version = lock_state.trans_version_;
}
}
TRANS_LOG(DEBUG, "check_row_locked", K(ret), K(i),
@ -1197,13 +1196,10 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx,
if (OB_SUCC(ret)) {
// use tx_id = 0 indicate MvccRow's max_trans_version inherit from old table
transaction::ObTransID tx_id(0);
// TODO(handora.qc): fix it
palf::SCN max_trans_scn;
max_trans_scn.convert_for_lsn_allocator(max_trans_version);
value->update_max_trans_version(max_trans_scn, tx_id);
value->update_max_trans_version(max_trans_version, tx_id);
if (!row_locked) {
// there is no locks on frozen stores
if (max_trans_scn > ctx.mvcc_acc_ctx_.get_snapshot_version()) {
if (max_trans_version > ctx.mvcc_acc_ctx_.get_snapshot_version()) {
ret = OB_TRANSACTION_SET_VIOLATION;
TRANS_LOG(WARN, "TRANS_SET_VIOLATION", K(ret), K(max_trans_version), "ctx", ctx);
}
@ -1273,7 +1269,7 @@ ObDatumRange &ObMemtable::m_get_real_range(ObDatumRange &real_range, const ObDat
int ObMemtable::row_compact(ObMvccRow *row,
const bool for_replay,
const int64_t snapshot_version)
const palf::SCN snapshot_version)
{
int ret = OB_SUCCESS;
ObMemtableRowCompactor row_compactor;
@ -1368,7 +1364,7 @@ int64_t ObMemtable::dec_write_ref()
} else {
if (0 == get_unsynced_cnt()) {
resolve_right_boundary();
if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_log_ts(), get_snapshot_version()))) {
if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_scn(), get_snapshot_version_scn()))) {
TRANS_LOG(WARN, "fail to resolve left boundary for active memtable", K(ret), K(ls_id), KPC(this));
}
}
@ -1401,7 +1397,7 @@ int ObMemtable::dec_unsynced_cnt()
} else if (is_frozen && 0 == write_ref_cnt && 0 == unsynced_cnt) {
resolve_right_boundary();
TRANS_LOG(INFO, "[resolve_right_boundary] dec_unsynced_cnt", K(ls_id), KPC(this));
if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_log_ts(), get_snapshot_version()))) {
if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_scn(), get_snapshot_version_scn()))) {
TRANS_LOG(WARN, "fail to set start log ts for active memtable", K(ret), K(ls_id), KPC(this));
}
TRANS_LOG(INFO, "memtable log synced", K(ret), K(ls_id), KPC(this));
@ -1441,24 +1437,24 @@ int ObMemtable::get_frozen_schema_version(int64_t &schema_version) const
return OB_NOT_SUPPORTED;
}
int ObMemtable::set_snapshot_version(const int64_t snapshot_version)
int ObMemtable::set_snapshot_version(const palf::SCN snapshot_version)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else if (ObVersionRange::MAX_VERSION == snapshot_version
|| ObVersionRange::MIN_VERSION >= snapshot_version) {
} else if (snapshot_version.is_max()
|| !snapshot_version.is_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(snapshot_version));
} else if (snapshot_version_ == ObVersionRange::MAX_VERSION) {
} else if (snapshot_version_.is_max()) {
snapshot_version_ = snapshot_version;
}
return ret;
}
int ObMemtable::set_rec_log_ts(int64_t rec_log_ts)
int ObMemtable::set_rec_scn(palf::SCN rec_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = freezer_->get_ls_id();
@ -1466,19 +1462,19 @@ int ObMemtable::set_rec_log_ts(int64_t rec_log_ts)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else if (ObScnRange::MAX_TS ==rec_log_ts) {
} else if (rec_scn.is_max()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(rec_log_ts));
} else if (rec_log_ts <= get_start_log_ts()) {
TRANS_LOG(WARN, "invalid args", K(ret), K(rec_scn));
} else if (rec_scn <= get_start_scn()) {
ret = OB_LOG_TS_OUT_OF_BOUND;
TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts", K(ret), K(rec_log_ts), K(ls_id), KPC(this));
TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts", K(ret), K(rec_scn), K(ls_id), KPC(this));
} else {
int64_t old_rec_log_ts = 0;
int64_t new_rec_log_ts = get_rec_log_ts();
while ((old_rec_log_ts = new_rec_log_ts) > rec_log_ts) {
if ((new_rec_log_ts = ATOMIC_VCAS(&rec_log_ts_, old_rec_log_ts, rec_log_ts))
== old_rec_log_ts) {
new_rec_log_ts = rec_log_ts;
palf::SCN old_rec_scn;
palf::SCN new_rec_scn = get_rec_scn();
while ((old_rec_scn = new_rec_scn) > rec_scn) {
if ((new_rec_scn = rec_scn_.atomic_vcas(old_rec_scn, rec_scn))
== old_rec_scn) {
new_rec_scn = rec_scn;
}
}
}
@ -1486,7 +1482,7 @@ int ObMemtable::set_rec_log_ts(int64_t rec_log_ts)
return ret;
}
int ObMemtable::set_start_log_ts(const int64_t start_ts)
int ObMemtable::set_start_scn(const palf::SCN start_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = freezer_->get_ls_id();
@ -1494,24 +1490,22 @@ int ObMemtable::set_start_log_ts(const int64_t start_ts)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else if (ObScnRange::MAX_TS == start_ts) {
} else if (ObScnRange::MAX_SCN == start_scn) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(start_ts));
} else if (start_ts >= get_end_log_ts()
|| (max_end_log_ts_ != 0 && start_ts >= max_end_log_ts_)
|| start_ts >= rec_log_ts_) {
TRANS_LOG(WARN, "invalid args", K(ret), K(start_scn));
} else if (start_scn >= get_end_scn()
|| (max_end_scn_ != palf::SCN::min_scn() && start_scn >= max_end_scn_)
|| start_scn >= rec_scn_) {
ret = OB_LOG_TS_OUT_OF_BOUND;
TRANS_LOG(ERROR, "cannot set start ts now", K(ret), K(start_ts), K(ls_id), KPC(this));
TRANS_LOG(ERROR, "cannot set start ts now", K(ret), K(start_scn), K(ls_id), KPC(this));
} else {
palf::SCN tmp;
tmp.convert_for_gts(start_ts);
key_.scn_range_.start_scn_.atomic_set(tmp);
key_.scn_range_.start_scn_ = start_scn;
}
return ret;
}
int ObMemtable::set_end_log_ts(const int64_t freeze_ts)
int ObMemtable::set_end_scn(const palf::SCN freeze_scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = freezer_->get_ls_id();
@ -1519,36 +1513,31 @@ int ObMemtable::set_end_log_ts(const int64_t freeze_ts)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else if (ObScnRange::MAX_TS == freeze_ts) {
} else if (ObScnRange::MAX_SCN == freeze_scn) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(freeze_ts));
} else if (freeze_ts < get_start_log_ts()) {
TRANS_LOG(WARN, "invalid args", K(ret), K(freeze_scn));
} else if (freeze_scn < get_start_scn()) {
ret = OB_LOG_TS_OUT_OF_BOUND;
TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts",
K(ret), K(freeze_ts), K(ls_id), KPC(this));
K(ret), K(freeze_scn), K(ls_id), KPC(this));
} else {
int64_t old_end_log_ts = 0;
int64_t new_end_log_ts = get_end_log_ts();
while ((old_end_log_ts = new_end_log_ts) < freeze_ts
|| new_end_log_ts == ObScnRange::MAX_TS) {
//TODO(SCN) TMP Code for compiling, fix AOTIMIC opt with SCN
if (key_.scn_range_.end_scn_.get_val_for_inner_table_field() == old_end_log_ts) {
key_.scn_range_.end_scn_.convert_for_gts(freeze_ts);
new_end_log_ts = freeze_ts;
palf::SCN old_end_scn;
palf::SCN new_end_scn = get_end_scn();
while ((old_end_scn = new_end_scn) < freeze_scn
|| new_end_scn == ObScnRange::MAX_SCN) {
if ((new_end_scn =
key_.scn_range_.end_scn_.atomic_vcas(old_end_scn, freeze_scn))
== old_end_scn) {
new_end_scn = freeze_scn;
}
// if ((new_end_log_ts =
// ATOMIC_VCAS(&(key_.scn_range_.end_scn_), old_end_log_ts, freeze_ts))
// == old_end_log_ts) {
// new_end_log_ts = freeze_ts;
// }
}
freeze_log_ts_ = freeze_ts;
freeze_scn_ = freeze_scn;
}
return ret;
}
int ObMemtable::set_max_end_log_ts(const int64_t log_ts)
int ObMemtable::set_max_end_scn(const palf::SCN scn)
{
int ret = OB_SUCCESS;
share::ObLSID ls_id = freezer_->get_ls_id();
@ -1556,21 +1545,21 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
TRANS_LOG(WARN, "not inited", K(ret));
} else if (ObScnRange::MAX_TS == log_ts) {
} else if (ObScnRange::MAX_SCN == scn) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid args", K(ret), K(log_ts));
} else if (log_ts <= get_start_log_ts() || log_ts > get_end_log_ts()) {
TRANS_LOG(WARN, "invalid args", K(ret), K(scn));
} else if (scn <= get_start_scn() || scn > get_end_scn()) {
ret = OB_LOG_TS_OUT_OF_BOUND;
TRANS_LOG(WARN, "cannot set max end log ts smaller to start log ts",
K(ret), K(log_ts), K(ls_id), KPC(this));
K(ret), K(scn), K(ls_id), KPC(this));
} else {
int64_t old_max_end_log_ts = 0;
int64_t new_max_end_log_ts = get_max_end_log_ts();
while ((old_max_end_log_ts = new_max_end_log_ts) < log_ts) {
if ((new_max_end_log_ts =
ATOMIC_VCAS(&max_end_log_ts_, old_max_end_log_ts, log_ts))
== old_max_end_log_ts) {
new_max_end_log_ts = log_ts;
palf::SCN old_max_end_scn;
palf::SCN new_max_end_scn = get_max_end_scn();
while ((old_max_end_scn = new_max_end_scn) < scn) {
if ((new_max_end_scn =
max_end_scn_.atomic_vcas(old_max_end_scn, scn))
== old_max_end_scn) {
new_max_end_scn = scn;
}
}
}
@ -1581,41 +1570,41 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts)
bool ObMemtable::rec_scn_is_stable()
{
int ret = OB_SUCCESS;
bool rec_log_ts_is_stable = false;
if (INT64_MAX == rec_log_ts_) {
rec_log_ts_is_stable = (is_frozen_memtable() && write_ref_cnt_ == 0 && unsynced_cnt_ == 0);
bool rec_scn_is_stable = false;
if (palf::SCN::max_scn() == rec_scn_) {
rec_scn_is_stable = (is_frozen_memtable() && write_ref_cnt_ == 0 && unsynced_cnt_ == 0);
} else {
int64_t max_consequent_callbacked_log_ts = INT64_MAX;
if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(max_consequent_callbacked_log_ts))) {
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id()));
palf::SCN max_consequent_callbacked_scn;
if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) {
STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer_->get_ls_id()));
} else {
rec_log_ts_is_stable = (max_consequent_callbacked_log_ts >= rec_log_ts_);
rec_scn_is_stable = (max_consequent_callbacked_scn >= rec_scn_);
}
if (!rec_log_ts_is_stable &&
if (!rec_scn_is_stable &&
(mt_stat_.frozen_time_ != 0 &&
ObTimeUtility::current_time() - mt_stat_.frozen_time_ > 10 * 1000 * 1000L)) {
STORAGE_LOG(WARN, "memtable rec_log_ts not stable for long time",
STORAGE_LOG(WARN, "memtable rec_scn not stable for long time",
K(freezer_->get_ls_id()), K(*this), K(mt_stat_.frozen_time_),
K(max_consequent_callbacked_log_ts));
K(max_consequent_callbacked_scn));
ADD_SUSPECT_INFO(MINI_MERGE,
freezer_->get_ls_id(), get_tablet_id(),
"memtable rec_log_ts not stable",
K(rec_log_ts_),
K(max_consequent_callbacked_log_ts));
"memtable rec_scn not stable",
K(rec_scn_),
K(max_consequent_callbacked_scn));
}
}
return rec_log_ts_is_stable;
return rec_scn_is_stable;
}
int ObMemtable::get_current_right_boundary(int64_t &current_right_boundary)
int ObMemtable::get_current_right_boundary(palf::SCN &current_right_boundary)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(freezer_)) {
ret = OB_ENTRY_NOT_EXIST;
TRANS_LOG(WARN, "freezer should not be null", K(ret));
} else if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(current_right_boundary))) {
TRANS_LOG(WARN, "fail to get min_unreplay_log_ts", K(ret), K(current_right_boundary));
} else if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(current_right_boundary))) {
TRANS_LOG(WARN, "fail to get min_unreplay_scn", K(ret), K(current_right_boundary));
}
return ret;
@ -1638,12 +1627,12 @@ bool ObMemtable::ready_for_flush_()
bool bool_ret = is_frozen_memtable() && 0 == get_write_ref() && 0 == get_unsynced_cnt();
int ret = OB_SUCCESS;
int64_t current_right_boundary = ObScnRange::MIN_TS;
palf::SCN current_right_boundary = ObScnRange::MIN_SCN;
share::ObLSID ls_id = freezer_->get_ls_id();
if (bool_ret) {
if (OB_FAIL(resolve_snapshot_version_())) {
TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id));
} else if (OB_FAIL(resolve_max_end_log_ts_())) {
} else if (OB_FAIL(resolve_max_end_scn_())) {
TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id));
} else {
resolve_right_boundary();
@ -1651,7 +1640,7 @@ bool ObMemtable::ready_for_flush_()
if (OB_FAIL(get_current_right_boundary(current_right_boundary))) {
TRANS_LOG(WARN, "fail to get current right boundary", K(ret));
}
bool_ret = current_right_boundary >= get_end_log_ts() &&
bool_ret = current_right_boundary >= get_end_scn() &&
(is_empty() || get_resolve_active_memtable_left_boundary());
if (bool_ret) {
freeze_state_ = ObMemtableFreezeState::READY_FOR_FLUSH;
@ -1681,14 +1670,14 @@ bool ObMemtable::ready_for_flush_()
K(get_write_ref()),
K(get_unsynced_cnt()),
K(current_right_boundary),
K(get_end_log_ts()));
K(get_end_scn()));
freezer_->get_stat().add_memtable_info(get_tablet_id(),
get_start_log_scn(),
get_end_log_scn(),
get_start_scn(),
get_end_scn(),
get_write_ref(),
get_unsubmitted_cnt(),
get_unsynced_cnt(),
current_right_boundary);
current_right_boundary.get_val_for_tx());
}
return bool_ret;
@ -1702,8 +1691,8 @@ void ObMemtable::print_ready_for_flush()
bool frozen_memtable_flag = is_frozen_memtable();
int64_t write_ref = get_write_ref();
int64_t unsynced_cnt = get_unsynced_cnt();
int64_t end_log_ts = get_end_log_ts();
int64_t current_right_boundary = ObScnRange::MIN_TS;
palf::SCN end_scn = get_end_scn();
palf::SCN current_right_boundary;
uint32_t logstream_freeze_clock = freezer_->get_freeze_clock();
uint32_t memtable_freeze_clock = freeze_clock_;
if (OB_FAIL(get_current_right_boundary(current_right_boundary))) {
@ -1712,13 +1701,13 @@ void ObMemtable::print_ready_for_flush()
bool bool_ret = frozen_memtable_flag &&
0 == write_ref &&
0 == unsynced_cnt &&
current_right_boundary >= end_log_ts;
current_right_boundary >= end_scn;
TRANS_LOG(INFO, "[ObFreezer] print_ready_for_flush",
KP(this), K(ls_id), K(tablet_id),
K(ret), K(bool_ret),
K(frozen_memtable_flag), K(write_ref), K(unsynced_cnt),
K(current_right_boundary), K(end_log_ts),
K(current_right_boundary), K(end_scn),
K(logstream_freeze_clock), K(memtable_freeze_clock));
}
@ -1756,16 +1745,16 @@ void ObMemtable::print_ready_for_flush()
int ObMemtable::resolve_snapshot_version_()
{
int ret = OB_SUCCESS;
int64_t freeze_snapshot_version = OB_INVALID_TIMESTAMP;
palf::SCN freeze_snapshot_version;
if (snapshot_version_ != ObVersionRange::MAX_VERSION) {
if (snapshot_version_ != palf::SCN::max_scn()) {
// Pass if snapshot is already set
} else if (OB_ISNULL(freezer_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "freezer should not be null", K(ret));
} else if (FALSE_IT(freeze_snapshot_version = freezer_->get_freeze_snapshot_version().get_val_for_lsn_allocator())) {
} else if (FALSE_IT(freeze_snapshot_version = freezer_->get_freeze_snapshot_version())) {
TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret));
} else if (OB_INVALID_TIMESTAMP == freeze_snapshot_version) {
} else if (palf::SCN::invalid_scn() == freeze_snapshot_version) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret), KPC(this));
} else if (OB_FAIL(set_snapshot_version(freeze_snapshot_version))) {
@ -1775,30 +1764,30 @@ int ObMemtable::resolve_snapshot_version_()
return ret;
}
// The max_decided log ts is used to push up the end_log_ts of the memtable
// The max_decided log ts is used to push up the end_scn of the memtable
// using the max decided log ts.
// Before the revision, the end_log_ts of the memtable is the max committed log
// Before the revision, the end_scn of the memtable is the max committed log
// ts of the data on the memtable. So for all 2pc txn and some 1pc txn whose
// data log is seperated with the commit log, the end_log_ts of the memtable is
// smaller than the commit_log_ts of the txn. And when the merge happens, the
// data log is seperated with the commit log, the end_scn of the memtable is
// smaller than the commit_scn of the txn. And when the merge happens, the
// txn node will therefore not be cleanout. And the read after merge will be
// very slow due to tx data table lookup.
// So finally we decide to use the max decoded log ts of the ls to update the
// end_log_ts of the memtable
int ObMemtable::resolve_max_end_log_ts_()
// end_scn of the memtable
int ObMemtable::resolve_max_end_scn_()
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
int64_t max_decided_log_ts = OB_INVALID_TIMESTAMP;
palf::SCN max_decided_scn;
if (OB_ISNULL(freezer_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "freezer should not be null", K(ret));
} else if (FALSE_IT(max_decided_log_ts = freezer_->get_max_decided_scn().get_val_for_lsn_allocator())) {
} else if (FALSE_IT(max_decided_scn = freezer_->get_max_decided_scn())) {
TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret));
} else if (OB_INVALID_TIMESTAMP == max_decided_log_ts) {
} else if (palf::SCN::invalid_scn() == max_decided_scn) {
// Pass if not necessary
} else if (OB_TMP_FAIL(set_max_end_log_ts(max_decided_log_ts))) {
} else if (OB_TMP_FAIL(set_max_end_scn(max_decided_scn))) {
// ignore the error code
}
@ -1807,25 +1796,25 @@ int ObMemtable::resolve_max_end_log_ts_()
int ObMemtable::resolve_right_boundary()
{
int64_t max_end_log_ts = get_max_end_log_ts();
int64_t end_log_ts = max_end_log_ts;
int64_t start_log_ts = get_start_log_ts();
palf::SCN max_end_scn = get_max_end_scn();
palf::SCN end_scn = max_end_scn;
palf::SCN start_scn = get_start_scn();
int ret = OB_SUCCESS;
if (ObScnRange::MIN_TS == max_end_log_ts) {
end_log_ts = start_log_ts;
if (ObScnRange::MIN_SCN == max_end_scn) {
end_scn = start_scn;
(void)freezer_->inc_empty_memtable_cnt();
}
if (OB_FAIL(set_end_log_ts(end_log_ts))) {
TRANS_LOG(ERROR, "fail to set end_log_ts", K(ret));
if (OB_FAIL(set_end_scn(end_scn))) {
TRANS_LOG(ERROR, "fail to set end_scn", K(ret));
}
return ret;
}
void ObMemtable::resolve_left_boundary(int64_t end_log_ts)
void ObMemtable::resolve_left_boundary(palf::SCN end_scn)
{
set_start_log_ts(end_log_ts);
set_start_scn(end_scn);
}
void ObMemtable::set_freeze_state(const int64_t state)
@ -1890,13 +1879,6 @@ int ObMemtable::flush(share::ObLSID ls_id)
return ret;
}
palf::SCN ObMemtable::get_rec_scn()
{
palf::SCN tmp;
tmp.convert_for_lsn_allocator(get_rec_log_ts());
return tmp;
}
bool ObMemtable::is_active_memtable() const
{
return !is_frozen_memtable();

View File

@ -283,7 +283,7 @@ public:
storage::ObStoreRowIterator *&row_iter) override;
// replay_row is used to replay rows in redo log for follower
// ctx is the writer tx's context, we need the log_ts, tx_id for fulfilling the tx node
// ctx is the writer tx's context, we need the scn, tx_id for fulfilling the tx node
// mmi is mutator iterator for replay
// decrypt_buf is used for decryption
virtual int replay_row(
@ -319,7 +319,7 @@ public:
inline bool not_empty() const { return INT64_MAX != get_protection_clock(); };
void set_max_schema_version(const int64_t schema_version);
virtual int64_t get_max_schema_version() const override;
int row_compact(ObMvccRow *value, const bool for_replay, const int64_t snapshot_version);
int row_compact(ObMvccRow *value, const bool for_replay, const palf::SCN snapshot_version);
int64_t get_hash_item_count() const;
int64_t get_hash_alloc_memory() const;
int64_t get_btree_item_count() const;
@ -330,12 +330,12 @@ public:
virtual bool is_active_memtable() const override;
virtual bool is_inner_tablet() const { return key_.tablet_id_.is_inner_tablet(); }
ObTabletID get_tablet_id() const { return key_.tablet_id_; }
int set_snapshot_version(const int64_t snapshot_version);
int set_snapshot_version(const palf::SCN snapshot_version);
int64_t get_memtable_state() const { return state_; }
int64_t get_freeze_state() const { return freeze_state_; }
int64_t get_protection_clock() const { return local_allocator_.get_protection_clock(); }
int64_t get_retire_clock() const { return local_allocator_.get_retire_clock(); }
int get_current_right_boundary(int64_t &current_right_boundary);
int get_current_right_boundary(palf::SCN &current_right_boundary);
inline bool& get_read_barrier() { return read_barrier_; }
inline void set_write_barrier() { write_barrier_ = true; }
@ -374,49 +374,32 @@ public:
virtual bool ready_for_flush() override;
void print_ready_for_flush();
virtual int flush(share::ObLSID ls_id) override;
virtual int64_t get_rec_log_ts() { return ATOMIC_LOAD(&rec_log_ts_); }
virtual palf::SCN get_rec_scn();
palf::SCN get_rec_scn() { return rec_scn_.atomic_get(); }
virtual bool is_frozen_checkpoint() const override { return is_frozen_memtable();}
virtual bool is_active_checkpoint() const override { return is_active_memtable();}
virtual OB_INLINE palf::SCN get_end_log_scn() const
virtual OB_INLINE palf::SCN get_end_scn() const
{
palf::SCN end_scn = palf::SCN::max_scn();
end_scn.atomic_set(key_.scn_range_.end_scn_);
return end_scn;
return key_.scn_range_.end_scn_;
}
virtual OB_INLINE palf::SCN get_start_log_scn() const
virtual OB_INLINE palf::SCN get_start_scn() const
{
palf::SCN start_scn = palf::SCN::min_scn();
start_scn.atomic_set(key_.scn_range_.start_scn_);
return start_scn;
}
virtual OB_INLINE int64_t get_end_log_ts() const override
{
palf::SCN end_scn = palf::SCN::max_scn();
end_scn.atomic_set(key_.scn_range_.end_scn_);
return end_scn.get_val_for_inner_table_field();
}
virtual OB_INLINE int64_t get_start_log_ts() const override
{
palf::SCN start_scn = palf::SCN::min_scn();
start_scn.atomic_set(key_.scn_range_.start_scn_);
return start_scn.get_val_for_inner_table_field();
return key_.scn_range_.start_scn_;
}
bool is_empty()
{
return get_end_log_ts() == get_start_log_ts() &&
share::ObScnRange::MIN_TS == get_max_end_log_ts();
return get_end_scn() == get_start_scn() &&
share::ObScnRange::MIN_SCN == get_max_end_scn();
}
int resolve_right_boundary();
void resolve_left_boundary(int64_t end_log_ts);
void resolve_left_boundary(palf::SCN end_scn);
int resolve_snapshot_version_();
int resolve_max_end_log_ts_();
int64_t get_max_end_log_ts() const { return ATOMIC_LOAD(&max_end_log_ts_); }
int set_rec_log_ts(int64_t rec_log_ts);
int set_start_log_ts(const int64_t start_ts);
int set_end_log_ts(const int64_t freeze_ts);
int set_max_end_log_ts(const int64_t log_ts);
int resolve_max_end_scn_();
palf::SCN get_max_end_scn() const { return max_end_scn_.atomic_get(); }
int set_rec_scn(palf::SCN rec_scn);
int set_start_scn(const palf::SCN start_ts);
int set_end_scn(const palf::SCN freeze_ts);
int set_max_end_scn(const palf::SCN scn);
inline int set_logging_blocked()
{
logging_blocked_start_time = common::ObTimeUtility::current_time();
@ -455,7 +438,7 @@ public:
template<class T>
int save_multi_source_data_unit(const T *const multi_source_data_unit,
const int64_t log_ts,
const palf::SCN scn,
const bool for_replay,
const MemtableRefOp ref_op = MemtableRefOp::NONE,
const bool is_callback = false);
@ -467,7 +450,7 @@ public:
K_(freeze_clock), K_(max_schema_version), K_(write_ref_cnt), K_(local_allocator),
K_(unsubmitted_cnt), K_(unsynced_cnt),
K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolve_active_memtable_left_boundary),
K_(contain_hotspot_row), K_(max_end_log_ts), K_(rec_log_ts), K_(snapshot_version),
K_(contain_hotspot_row), K_(max_end_scn), K_(rec_scn), K_(snapshot_version),
K_(is_tablet_freeze), K_(is_force_freeze), K_(contain_hotspot_row),
K_(read_barrier), K_(is_flushed), K_(freeze_state));
private:
@ -542,9 +525,9 @@ private:
int64_t logging_blocked_start_time; // record the start time of logging blocked
bool unset_active_memtable_logging_blocked_;
bool resolve_active_memtable_left_boundary_;
int64_t freeze_log_ts_;
int64_t max_end_log_ts_;
int64_t rec_log_ts_;
palf::SCN freeze_scn_;
palf::SCN max_end_scn_;
palf::SCN rec_scn_;
int64_t state_;
int64_t freeze_state_;
int64_t timestamp_;
@ -563,7 +546,7 @@ private:
template<class T>
int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_unit,
const int64_t log_ts,
const palf::SCN scn,
const bool for_replay,
const MemtableRefOp ref_op,
const bool is_callback)
@ -597,19 +580,19 @@ int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_uni
TRANS_LOG(WARN, "fail to inc_cnt_for_multi_data", K(multi_source_data_unit), K(type), KPC(this));
}
}
if (log_ts > get_start_log_ts() && log_ts < share::ObScnRange::MAX_TS) {
if (scn > get_start_scn() && scn < share::ObScnRange::MAX_SCN) {
if (OB_FAIL(ret)) {
}
// skip updating max_end_log_ts of frozen memtable for commit/abort when replay clog.
else if ((share::ObScnRange::MAX_TS == get_end_log_ts() || !for_replay || !is_callback)
&& OB_FAIL(set_max_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "failed to set max_end_log_ts", K(ret), K(log_ts), KPC(this));
} else if (OB_FAIL(set_rec_log_ts(log_ts))) {
TRANS_LOG(WARN, "failed to set rec_log_ts", K(ret), K(log_ts), KPC(this));
// skip updating max_end_scn of frozen memtable for commit/abort when replay clog.
else if ((share::ObScnRange::MAX_SCN == get_end_scn() || !for_replay || !is_callback)
&& OB_FAIL(set_max_end_scn(scn))) {
TRANS_LOG(WARN, "failed to set max_end_scn", K(ret), K(scn), KPC(this));
} else if (OB_FAIL(set_rec_scn(scn))) {
TRANS_LOG(WARN, "failed to set rec_scn", K(ret), K(scn), KPC(this));
}
} else if (-1 != log_ts && share::ObScnRange::MAX_TS != log_ts) {
} else if (palf::SCN::invalid_scn() != scn && share::ObScnRange::MAX_SCN != scn) {
ret = common::OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "invalid log_ts", K(ret), K(log_ts), KPC(this));
TRANS_LOG(WARN, "invalid scn", K(ret), K(scn), KPC(this));
}
if (MemtableRefOp::DEC_REF == ref_op) {
@ -621,7 +604,7 @@ int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_uni
}
}
}
TRANS_LOG(INFO, "memtable save multi source data unit", K(ret), K(log_ts), K(ref_op),
TRANS_LOG(INFO, "memtable save multi source data unit", K(ret), K(scn), K(ref_op),
KPC(multi_source_data_unit), K(type), KPC(this));
}

View File

@ -196,7 +196,7 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const
common::databuff_printf(buf, buf_len, pos,
" end_code=%d is_readonly=%s ref=%ld trans_id=%s ls_id=%ld "
"callback_alloc_count=%ld callback_free_count=%ld "
"checksum=%lu tmp_checksum=%lu checksum_log_ts=%s "
"checksum=%lu tmp_checksum=%lu checksum_scn=%s "
"redo_filled_count=%ld redo_sync_succ_count=%ld "
"redo_sync_fail_count=%ld main_list_length=%ld "
"unsynced_cnt=%ld unsubmitted_cnt_=%ld "
@ -438,26 +438,20 @@ int ObMemtableCtx::trans_begin()
return ret;
}
int ObMemtableCtx::replay_begin(const int64_t log_timestamp)
int ObMemtableCtx::replay_begin(const palf::SCN scn)
{
ObByteLockGuard guard(lock_);
// TODO(handora.qc): fix it
palf::SCN scn;
scn.convert_for_lsn_allocator(log_timestamp);
set_redo_scn(scn);
// TODO set redo log id
return OB_SUCCESS;
}
int ObMemtableCtx::replay_end(const bool is_replay_succ,
const int64_t log_timestamp)
const palf::SCN scn)
{
int ret = OB_SUCCESS;
ObByteLockGuard guard(lock_);
// TODO(handora.qc): fix it
palf::SCN scn;
scn.convert_for_lsn_allocator(log_timestamp);
if (!is_replay_succ) {
ret = trans_mgr_.replay_fail(scn);
@ -468,13 +462,10 @@ int ObMemtableCtx::replay_end(const bool is_replay_succ,
return ret;
}
int ObMemtableCtx::rollback_redo_callbacks(const int64_t log_timestamp)
int ObMemtableCtx::rollback_redo_callbacks(const palf::SCN scn)
{
int ret = OB_SUCCESS;
ObByteLockGuard guard(lock_);
// TODO(handora.qc): fix it
palf::SCN scn;
scn.convert_for_lsn_allocator(log_timestamp);
ret = trans_mgr_.replay_fail(scn);
@ -483,14 +474,14 @@ int ObMemtableCtx::rollback_redo_callbacks(const int64_t log_timestamp)
int ObMemtableCtx::trans_end(
const bool commit,
const int64_t trans_version,
const int64_t final_log_ts)
const palf::SCN trans_version,
const palf::SCN final_scn)
{
int ret = OB_SUCCESS;
ret = do_trans_end(commit,
trans_version,
final_log_ts,
final_scn,
commit ? OB_TRANS_COMMITED : OB_TRANS_ROLLBACKED);
return ret;
@ -513,8 +504,8 @@ int ObMemtableCtx::elr_trans_preparing()
int ObMemtableCtx::do_trans_end(
const bool commit,
const int64_t trans_version,
const int64_t final_log_ts,
const palf::SCN trans_version,
const palf::SCN final_scn,
const int end_code)
{
int ret = OB_SUCCESS;
@ -527,12 +518,8 @@ int ObMemtableCtx::do_trans_end(
ob_abort();
}
if (partial_rollbacked || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) {
// TODO(handora.qc): fix it
palf::SCN commit_scn;
commit_scn.convert_for_lsn_allocator(trans_version);
ATOMIC_STORE(&end_code_, end_code);
set_commit_version(commit_scn);
set_commit_version(trans_version);
if (OB_FAIL(trans_mgr_.trans_end(commit))) {
TRANS_LOG(WARN, "trans end error", K(ret), K(*this));
}
@ -543,9 +530,9 @@ int ObMemtableCtx::do_trans_end(
}
// release durable table lock
if (OB_FAIL(ret)) {
UNUSED(final_log_ts);
UNUSED(final_scn);
//commit or abort log ts for clear table lock
} else if (OB_FAIL(clear_table_lock_(commit, trans_version, final_log_ts))) {
} else if (OB_FAIL(clear_table_lock_(commit, trans_version, final_scn))) {
TRANS_LOG(ERROR, "clear table lock failed.", K(ret), K(*this));
}
(void)partition_audit_info_cache_.stmt_end_update_audit_info(commit);
@ -562,7 +549,7 @@ int ObMemtableCtx::trans_kill()
{
int ret = OB_SUCCESS;
bool commit = false;
ret = do_trans_end(commit, INT64_MAX, INT64_MAX, OB_TRANS_KILLED);
ret = do_trans_end(commit, palf::SCN::max_scn(), palf::SCN::max_scn(), OB_TRANS_KILLED);
return ret;
}
@ -585,15 +572,15 @@ int ObMemtableCtx::trans_replay_begin()
}
int ObMemtableCtx::trans_replay_end(const bool commit,
const int64_t trans_version,
const int64_t final_log_ts,
const palf::SCN trans_version,
const palf::SCN final_scn,
const uint64_t log_cluster_version,
const uint64_t checksum)
{
int ret = OB_SUCCESS;
int cs_ret = OB_SUCCESS;
// We must calculate the checksum and generate the checksum_log_ts even when
// We must calculate the checksum and generate the checksum_scn even when
// the checksum verification is unnecessary. This because the trans table
// merge may be triggered after clear state in which the callback has already
@ -610,7 +597,7 @@ int ObMemtableCtx::trans_replay_end(const bool commit,
}
}
if (OB_FAIL(trans_end(commit, trans_version, final_log_ts))) {
if (OB_FAIL(trans_end(commit, trans_version, final_scn))) {
TRANS_LOG(ERROR, "trans_end fail", K(ret), K(*this));
} else {
ret = cs_ret;
@ -696,11 +683,11 @@ int ObMemtableCtx::log_submitted(const ObRedoLogSubmitHelper &helper)
return log_gen_.log_submitted(helper.callbacks_);
}
int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks)
int ObMemtableCtx::sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks)
{
int ret = OB_SUCCESS;
if (OB_FAIL(log_gen_.sync_log_succ(log_ts, callbacks))) {
if (OB_FAIL(log_gen_.sync_log_succ(scn, callbacks))) {
TRANS_LOG(WARN, "sync log failed", K(ret));
}
@ -1022,33 +1009,24 @@ int ObMemtableCtx::reuse_log_generator_()
return ret;
}
int ObMemtableCtx::calc_checksum_before_log_ts(const int64_t log_ts,
uint64_t &checksum,
int64_t &checksum_log_ts)
int ObMemtableCtx::calc_checksum_before_scn(const palf::SCN scn,
uint64_t &checksum,
palf::SCN &checksum_scn)
{
int ret = OB_SUCCESS;
ObByteLockGuard guard(lock_);
// TODO(handora.qc): fix it
palf::SCN checksum_scn;
palf::SCN scn;
scn.convert_for_lsn_allocator(log_ts);
if (OB_FAIL(trans_mgr_.calc_checksum_before_scn(scn, checksum, checksum_scn))) {
TRANS_LOG(ERROR, "calc checksum before log ts should not report error", K(ret), K(log_ts));
TRANS_LOG(ERROR, "calc checksum before log ts should not report error", K(ret), K(scn));
}
checksum_log_ts = checksum_scn.get_val_for_lsn_allocator();
return ret;
}
void ObMemtableCtx::update_checksum(const uint64_t checksum,
const int64_t checksum_log_ts)
const palf::SCN checksum_scn)
{
ObByteLockGuard guard(lock_);
// TODO(handora.qc): fix it
palf::SCN checksum_scn;
checksum_scn.convert_for_lsn_allocator(checksum_log_ts);
trans_mgr_.update_checksum(checksum, checksum_scn);
}
@ -1267,20 +1245,6 @@ void ObMemtableCtx::set_log_synced(ObMemCtxLockOpLinkNode *lock_op, const palf::
lock_mem_ctx_.set_log_synced(lock_op, scn);
}
int ObMemtableCtx::clear_table_lock_(const bool is_commit,
const int64_t commit_version,
const int64_t commit_log_ts)
{
int ret = OB_SUCCESS;
// TODO: cxf remove it
palf::SCN version;
palf::SCN scn;
version.convert_for_lsn_allocator(commit_version);
scn.convert_for_lsn_allocator(commit_log_ts);
ret = clear_table_lock_(is_commit, version, scn);
return ret;
}
int ObMemtableCtx::clear_table_lock_(const bool is_commit,
const palf::SCN &commit_version,
const palf::SCN &commit_scn)
@ -1347,17 +1311,6 @@ int ObMemtableCtx::register_multi_source_data_if_need_(
return ret;
}
int ObMemtableCtx::replay_lock(const tablelock::ObTableLockOp &lock_op,
const int64_t log_ts)
{
int ret = OB_SUCCESS;
// TODO: remove this
palf::SCN tmp;
tmp.convert_for_lsn_allocator(log_ts);
ret = replay_lock(lock_op, tmp);
return ret;
}
int ObMemtableCtx::replay_lock(const tablelock::ObTableLockOp &lock_op,
const palf::SCN &scn)
{

View File

@ -346,23 +346,23 @@ public:
virtual int write_auth(const bool exclusive);
virtual int write_done();
virtual int trans_begin();
virtual int replay_begin(const int64_t log_timestamp);
virtual int replay_begin(const palf::SCN scn);
virtual int replay_end(const bool is_replay_succ,
const int64_t log_timestamp);
int rollback_redo_callbacks(const int64_t log_timestamp);
const palf::SCN scn);
int rollback_redo_callbacks(const palf::SCN scn);
virtual uint64_t calc_checksum_all();
virtual void print_callbacks();
virtual int trans_end(const bool commit,
const int64_t trans_version,
const int64_t final_log_ts);
const palf::SCN trans_version,
const palf::SCN final_scn);
virtual int trans_clear();
virtual int elr_trans_preparing();
virtual int trans_kill();
virtual int trans_publish();
virtual int trans_replay_begin();
virtual int trans_replay_end(const bool commit,
const int64_t trans_version,
const int64_t final_log_ts,
const palf::SCN trans_version,
const palf::SCN final_scn,
const uint64_t log_cluster_version = 0,
const uint64_t checksum = 0);
//method called when leader takeover
@ -389,17 +389,17 @@ public:
int64_t &buf_pos,
ObRedoLogSubmitHelper &helper,
const bool log_for_lock_node = true);
int calc_checksum_before_log_ts(const int64_t log_ts,
uint64_t &checksum,
int64_t &checksum_log_ts);
int calc_checksum_before_scn(const palf::SCN scn,
uint64_t &checksum,
palf::SCN &checksum_scn);
void update_checksum(const uint64_t checksum,
const int64_t checksum_log_ts);
const palf::SCN checksum_scn);
int log_submitted(const ObRedoLogSubmitHelper &helper);
// the function apply the side effect of dirty txn and return whether
// remaining pending callbacks.
// NB: the fact whether there remains pending callbacks currently is only used
// for continuing logging when minor freeze
int sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks);
int sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks);
void sync_log_fail(const ObCallbackScope &callbacks);
bool is_slow_query() const;
virtual void set_trans_ctx(transaction::ObPartTransCtx *ctx);
@ -454,7 +454,7 @@ public:
void replay_done();
int64_t get_checksum() const { return trans_mgr_.get_checksum(); }
int64_t get_tmp_checksum() const { return trans_mgr_.get_tmp_checksum(); }
palf::SCN get_checksum_log_ts() const { return trans_mgr_.get_checksum_scn(); }
palf::SCN get_checksum_scn() const { return trans_mgr_.get_checksum_scn(); }
public:
// table lock.
int enable_lock_table(storage::ObTableHandleV2 &handle);
@ -478,8 +478,6 @@ public:
void set_log_synced(ObMemCtxLockOpLinkNode *lock_op, const palf::SCN &scn);
// replay lock to lock map and trans part ctx.
// used by the replay process of multi data source.
int replay_lock(const transaction::tablelock::ObTableLockOp &lock_op,
const int64_t log_ts);
int replay_lock(const transaction::tablelock::ObTableLockOp &lock_op,
const palf::SCN &scn);
int recover_from_table_lock_durable_info(const ObTableLockInfo &table_lock_info);
@ -493,12 +491,9 @@ public:
private:
int do_trans_end(
const bool commit,
const int64_t trans_version,
const int64_t final_log_ts,
const palf::SCN trans_version,
const palf::SCN final_scn,
const int end_code);
int clear_table_lock_(const bool is_commit,
const int64_t commit_version,
const int64_t commit_log_ts);
int clear_table_lock_(const bool is_commit,
const palf::SCN &commit_version,
const palf::SCN &commit_scn);

View File

@ -59,15 +59,15 @@ public:
virtual void inc_ref() = 0;
virtual void dec_ref() = 0;
virtual int trans_begin() = 0;
virtual int trans_end(const bool commit, const int64_t trans_version, const int64_t final_log_ts) = 0;
virtual int trans_end(const bool commit, const palf::SCN trans_version, const palf::SCN final_scn) = 0;
virtual int trans_clear() = 0;
virtual int elr_trans_preparing() = 0;
virtual int trans_kill() = 0;
virtual int trans_publish() = 0;
virtual int trans_replay_begin() = 0;
virtual int trans_replay_end(const bool commit,
const int64_t trans_version,
const int64_t final_log_ts,
const palf::SCN trans_version,
const palf::SCN final_scn,
const uint64_t log_cluster_version = 0,
const uint64_t checksum = 0) = 0;
virtual void print_callbacks() = 0;
@ -148,7 +148,7 @@ struct ObMergePriorityInfo
class ObIMemtable: public storage::ObITable
{
public:
ObIMemtable() : snapshot_version_(ObVersionRange::MAX_VERSION)
ObIMemtable() : snapshot_version_(palf::SCN::max_scn())
{}
virtual ~ObIMemtable() {}
@ -205,17 +205,14 @@ public:
const uint64_t table_id,
const storage::ObTableReadInfo &read_info,
const blocksstable::ObDatumRowkey &rowkey) = 0;
virtual int64_t get_frozen_trans_version() { return 0; }
virtual int major_freeze(const common::ObVersion &version)
{ UNUSED(version); return common::OB_SUCCESS; }
virtual int minor_freeze(const common::ObVersion &version)
{ UNUSED(version); return common::OB_SUCCESS; }
virtual void inc_pending_lob_count() {}
virtual void dec_pending_lob_count() {}
virtual int on_memtable_flushed() { return common::OB_SUCCESS; }
virtual bool can_be_minor_merged() { return false; }
void set_snapshot_version(const int64_t snapshot_version) { snapshot_version_ = snapshot_version; }
virtual int64_t get_snapshot_version() const override { return snapshot_version_; }
void set_snapshot_version(const palf::SCN snapshot_version) { snapshot_version_ = snapshot_version; }
// TODO: remove get_snapshot_version() and use get_snapshot_version_scn() instead of it
virtual int64_t get_snapshot_version() const override { return snapshot_version_.get_val_for_tx(); }
virtual palf::SCN get_snapshot_version_scn() const override { return snapshot_version_; }
virtual int64_t get_upper_trans_version() const override
{ return OB_NOT_SUPPORTED; }
virtual int64_t get_max_merged_trans_version() const override
@ -276,7 +273,7 @@ public:
return false;
}
protected:
int64_t snapshot_version_;
palf::SCN snapshot_version_;
};
////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -202,7 +202,7 @@ int ObRedoLogGenerator::log_submitted(const ObCallbackScope &callbacks)
return ret;
}
int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks)
int ObRedoLogGenerator::sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks)
{
// no need to submit log
// since the number of log callback is enough now
@ -219,9 +219,6 @@ int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScop
do {
ObITransCallback *iter = (ObITransCallback *)*cursor;
if (iter->need_fill_redo()) {
// TODO(handora.qc): fix it
palf::SCN scn;
scn.convert_for_lsn_allocator(log_ts);
iter->set_scn(scn);
if (OB_TMP_FAIL(iter->log_sync_cb(scn))) {
if (OB_SUCC(ret)) {
@ -232,7 +229,7 @@ int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScop
redo_sync_succ_cnt_ += 1;
}
} else {
TRANS_LOG(ERROR, "sync_log_succ error", K(ret), K(iter), K(iter->need_fill_redo()), K(log_ts));
TRANS_LOG(ERROR, "sync_log_succ error", K(ret), K(iter), K(iter->need_fill_redo()), K(scn));
}
} while (cursor != callbacks.end_ && !FALSE_IT(cursor++));
}

View File

@ -74,7 +74,7 @@ public:
const bool log_for_lock_node);
int search_unsubmitted_dup_tablet_redo();
int log_submitted(const ObCallbackScope &callbacks);
int sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks);
int sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks);
void sync_log_fail(const ObCallbackScope &callbacks);
ObITransCallback *get_generate_cursor() { return (ObITransCallback *)*generate_cursor_; }

View File

@ -272,17 +272,17 @@ int ObMemtableRowCompactor::init(ObMvccRow *row,
// So modification is guaranteed to be safety with another modification,
// while we need pay attention to the concurrency between lock_for_read
// and modification(such as compact)
int ObMemtableRowCompactor::compact(const int64_t snapshot_version)
int ObMemtableRowCompactor::compact(const palf::SCN snapshot_version)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
} else if (0 >= snapshot_version || INT64_MAX == snapshot_version) {
} else if (!snapshot_version.is_valid() || palf::SCN::max_scn() == snapshot_version) {
STORAGE_LOG(ERROR, "unexpected snapshot version", K(ret), K(snapshot_version));
ret = OB_ERR_UNEXPECTED;
} else if (NULL != row_->latest_compact_node_ &&
snapshot_version <= row_->latest_compact_node_->trans_version_.get_val_for_lsn_allocator()) {
snapshot_version <= row_->latest_compact_node_->trans_version_) {
// concurrent do compact
} else {
ObTimeGuard tg("row compact", 50L * 1000L);
@ -306,7 +306,7 @@ int ObMemtableRowCompactor::compact(const int64_t snapshot_version)
// Find position from where compaction started forward or backward until reached
// oldest node or latest compaction node
void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version,
void ObMemtableRowCompactor::find_start_pos_(const palf::SCN snapshot_version,
ObMvccTransNode *&start)
{
int64_t search_cnt = 0;
@ -318,7 +318,7 @@ void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version,
// Traverse forward from list_head
// We go from head to find the suitable node for compact node start
if (palf::SCN::max_scn() == start->trans_version_ // skip uncommited
|| snapshot_version < start->trans_version_.get_val_for_lsn_allocator() // skip bigger txn
|| snapshot_version < start->trans_version_ // skip bigger txn
|| !start->is_committed()) { // skip uncommited
start = start->prev_;
search_cnt++;
@ -330,7 +330,7 @@ void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version,
// We need handle the bad case when elr, so we traverse from backward
// when there exists latest_compact_node
if (NULL != start->next_ // stop at null
&& snapshot_version >= start->next_->trans_version_.get_val_for_lsn_allocator() // stop at bigger txn
&& snapshot_version >= start->next_->trans_version_ // stop at bigger txn
&& start->next_->is_committed() // stop at uncommitted
&& palf::SCN::max_scn() != start->next_->trans_version_) { // stop at uncommitted
start = start->next_;
@ -395,7 +395,7 @@ int ObMemtableRowCompactor::try_cleanout_tx_node_during_compact_(ObTxTableGuard
return ret;
}
ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t snapshot_version,
ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const palf::SCN snapshot_version,
ObMvccTransNode *save)
{
int ret = OB_SUCCESS;
@ -437,7 +437,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s
TRANS_LOG(INFO, "ignore aborted node when compact", K(*cur), K(*row_));
cur = cur->prev_;
find_committed_tnode = false;
} else if (snapshot_version < cur->trans_version_.get_val_for_lsn_allocator()) {
} else if (snapshot_version < cur->trans_version_) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected snapshot version", K(snapshot_version), K(*cur), K(*row_));
} else if (NULL == (mtd = reinterpret_cast<const ObMemtableDataHeader *>(cur->buf_))) {
@ -539,9 +539,6 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected trans version", K(ret), "node", *save);
} else {
// TODO(handora.qc):: fix it
palf::SCN snapshot_scn;
snapshot_scn.convert_for_lsn_allocator(snapshot_version);
trans_node->tx_id_ = save->tx_id_;
trans_node->seq_no_ = save->seq_no_;
trans_node->trans_version_ = save->trans_version_;
@ -551,7 +548,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s
trans_node->type_ = NDT_COMPACT;
trans_node->flag_ = save->flag_;
trans_node->scn_ = save->scn_;
trans_node->set_snapshot_version_barrier(snapshot_scn);
trans_node->set_snapshot_version_barrier(snapshot_version);
TRANS_LOG(DEBUG, "success to compact row, ", K(trans_node->tx_id_), K(dml_flag), K(compact_row_cnt), KPC(save));
}
}

View File

@ -19,6 +19,7 @@
#include "lib/allocator/ob_small_allocator.h"
#include "lib/lock/ob_spin_lock.h"
#include "common/object/ob_object.h"
#include "logservice/palf/scn.h"
namespace oceanbase
{
@ -117,11 +118,11 @@ public:
common::ObIAllocator *node_alloc,
const bool for_replay);
// compact and refresh the update counter by snapshot version
int compact(const int64_t snapshot_version);
int compact(const palf::SCN snapshot_version);
private:
void find_start_pos_(const int64_t snapshot_version,
void find_start_pos_(const palf::SCN snapshot_version,
ObMvccTransNode *&save);
ObMvccTransNode *construct_compact_node_(const int64_t snapshot_version,
ObMvccTransNode *construct_compact_node_(const palf::SCN snapshot_version,
ObMvccTransNode *save);
int try_cleanout_tx_node_during_compact_(storage::ObTxTableGuard &tx_table_guard,
ObMvccTransNode *tnode);

View File

@ -212,6 +212,13 @@ public:
virtual OB_INLINE share::ObScnRange &get_scn_range() { return key_.scn_range_; }
virtual OB_INLINE bool is_trans_state_deterministic() { return get_upper_trans_version() < INT64_MAX; }
virtual int64_t get_snapshot_version() const { return key_.get_snapshot_version(); }
// TODO: remove it
virtual palf::SCN get_snapshot_version_scn() const
{
palf::SCN scn;
scn.convert_for_tx(key_.get_snapshot_version());
return scn;
}
virtual int64_t get_upper_trans_version() const { return get_snapshot_version(); }
virtual int64_t get_max_merged_trans_version() const { return get_snapshot_version(); }
virtual int get_frozen_schema_version(int64_t &schema_version) const = 0;

View File

@ -164,7 +164,11 @@ int ObStorageSchemaRecorder::replay_schema_log(
int64_t table_version = OB_INVALID_VERSION;
ObArenaAllocator tmp_allocator;
ObStorageSchema replay_storage_schema;
if (tablet_id_.is_special_merge_tablet()) {
// TODO: fix it
palf::SCN scn;
if (OB_FAIL(scn.convert_for_lsn_allocator(log_ts))) {
LOG_WARN("convert failed", K(log_ts), K(ret));
} else if (tablet_id_.is_special_merge_tablet()) {
// do nothing
} else if (OB_FAIL(serialization::decode_i64(buf, size, pos, &table_version))) {
// table_version
@ -177,8 +181,10 @@ int ObStorageSchemaRecorder::replay_schema_log(
} else if (OB_FAIL(replay_storage_schema.deserialize(tmp_allocator, buf, size, pos))) {
LOG_WARN("fail to deserialize storage schema", K(ret), K_(tablet_id));
} else if (FALSE_IT(replay_storage_schema.set_sync_finish(true))) {
} else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(&replay_storage_schema, log_ts,
true/*for_replay*/, memtable::MemtableRefOp::NONE))) {
} else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(&replay_storage_schema,
scn,
true/*for_replay*/,
memtable::MemtableRefOp::NONE))) {
LOG_WARN("failed to save storage schema on memtable", K(ret), K_(tablet_id), K(replay_storage_schema));
} else {
ATOMIC_SET(&max_saved_table_version_, table_version);

View File

@ -173,7 +173,7 @@ int ObSyncTabletSeqLogCb::on_failure()
} else if (OB_FAIL(tablet_handle.get_obj()->get_latest_autoinc_seq(autoinc_seq))) {
LOG_WARN("fail to get latest autoinc seq", K(ret));
} else if (OB_FAIL(tablet_handle.get_obj()->save_multi_source_data_unit(&autoinc_seq,
-1/*clog_ts*/,
palf::SCN::invalid_scn()/*scn*/,
false/*for_replay*/,
memtable::MemtableRefOp::DEC_REF,
true/*is_callback*/))) {

View File

@ -814,7 +814,7 @@ int ObLockMemtable::flush(palf::SCN recycle_scn,
scn_range.start_scn_.convert_for_gts(1);
scn_range.end_scn_ = freeze_scn_;
set_scn_range(scn_range);
set_snapshot_version(freeze_scn_.get_val_for_lsn_allocator());
set_snapshot_version(freeze_scn_);
ATOMIC_STORE(&is_frozen_, true);
}
}

View File

@ -2864,7 +2864,7 @@ int ObTablet::get_rec_log_ts(int64_t &rec_log_ts)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mt is NULL", KR(ret), K(handle));
} else {
rec_log_ts = mt->get_rec_log_ts();
rec_log_ts = mt->get_rec_scn().get_val_for_tx();
}
return ret;
}

View File

@ -621,11 +621,8 @@ int ObTablet::prepare_data(T &multi_source_data_unit, const transaction::ObMulSo
{
int ret = OB_SUCCESS;
//TODO(SCN): const palf::SCN scn = trans_flags.for_replay_ ? trans_flags.log_ts_ : palf::SCN::max_scn();
palf::SCN scn = palf::SCN::max_scn();
if (trans_flags.for_replay_) {
scn.convert_for_lsn_allocator(trans_flags.log_ts_);
}
const palf::SCN scn = trans_flags.for_replay_ ? trans_flags.scn_ : palf::SCN::max_scn();
TRANS_LOG(INFO, "prepare data when tx_end", K(multi_source_data_unit), K(tablet_meta_.tablet_id_));
if (IS_NOT_INIT) {
@ -720,7 +717,7 @@ int ObTablet::save_multi_source_data_unit(
memtable::ObMemtable *memtable = nullptr;
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, msd->type()))) {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn));
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op, is_callback))) {
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op, is_callback))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op));
}
}
@ -729,7 +726,7 @@ int ObTablet::save_multi_source_data_unit(
memtable::ObMemtable *memtable = nullptr;
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, memtable::MultiSourceDataUnitType::TABLET_TX_DATA))) {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn));
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op/*add_ref*/, is_callback/*false*/))) {
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op/*add_ref*/, is_callback/*false*/))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op));
}
} else {
@ -761,7 +758,7 @@ int ObTablet::save_multi_source_data_unit(
ob_usleep(100);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op, is_callback))) {
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op, is_callback))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op), K(for_replay));
}
}

View File

@ -345,10 +345,7 @@ int ObTabletBindingHelper::add_tablet_binding(
}
if (OB_SUCC(ret)) {
SCN scn;
if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) {
LOG_WARN("failed to convert_scn", K(trans_flags), K(ret));
} else if (OB_FAIL(tablet->set_multi_data_for_commit(info, scn, trans_flags.for_replay_, MemtableRefOp::NONE))) {
if (OB_FAIL(tablet->set_multi_data_for_commit(info, trans_flags.scn_, trans_flags.for_replay_, MemtableRefOp::NONE))) {
LOG_WARN("failed to save multi source data", K(ret));
}
}
@ -452,7 +449,7 @@ int ObTabletBindingHelper::check_skip_tx_end(const ObTabletID &tablet_id, const
ObTabletTxMultiSourceDataUnit tx_data;
ObTabletBindingHelper helper(ls, trans_flags);
if (OB_INVALID_TIMESTAMP == trans_flags.log_ts_) {
if (palf::SCN::invalid_scn() == trans_flags.scn_) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(tablet_id), K(ls));
} else if (OB_FAIL(helper.get_tablet(tablet_id, tablet_handle))) {
@ -465,7 +462,7 @@ int ObTabletBindingHelper::check_skip_tx_end(const ObTabletID &tablet_id, const
} else if (FALSE_IT(tablet = tablet_handle.get_obj())) {
} else if (OB_FAIL(tablet->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", KR(ret));
} else if (tx_data.tx_scn_.get_val_for_lsn_allocator() >= trans_flags.log_ts_) {
} else if (tx_data.tx_scn_ >= trans_flags.scn_) {
skip = true;
}
return ret;
@ -612,13 +609,10 @@ int ObTabletBindingHelper::modify_tablet_binding_for_unbind(
int ret = OB_SUCCESS;
ObLSHandle ls_handle;
const ObTransID &tx_id = trans_flags.tx_id_;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
const bool for_replay = trans_flags.for_replay_;
const palf::SCN commit_version = trans_flags.trans_version_;
SCN scn;
if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to get data", K(trans_flags), K(ret));
} else if (OB_FAIL(get_ls(arg.ls_id_, ls_handle))) {
if (OB_FAIL(get_ls(arg.ls_id_, ls_handle))) {
LOG_WARN("failed to get ls", K(ret));
} else {
ObTabletBindingHelper helper(*ls_handle.get_ls(), trans_flags);
@ -764,7 +758,7 @@ int ObTabletBindingHelper::get_tablet(const ObTabletID &tablet_id, ObTabletHandl
ObTabletTxMultiSourceDataUnit tx_data;
if (OB_FAIL(handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(key));
} else if (OB_INVALID_TIMESTAMP != trans_flags_.log_ts_ && trans_flags_.log_ts_ <= tx_data.tx_scn_.get_val_for_lsn_allocator()) {
} else if (palf::SCN::invalid_scn() != trans_flags_.scn_ && trans_flags_.scn_ <= tx_data.tx_scn_) {
ret = OB_NO_NEED_UPDATE;
LOG_INFO("tablet frozen", K(ret), K(key), K(trans_flags_), K(tx_data));
}
@ -784,7 +778,7 @@ int ObTabletBindingHelper::replay_get_tablet(const ObTabletMapKey &key, ObTablet
if (OB_FAIL(ObTabletCreateDeleteHelper::get_tablet(key, tablet_handle))) {
if (OB_TABLET_NOT_EXIST != ret) {
LOG_WARN("failed to get tablet", K(ret), K(key));
} else if (trans_flags_.log_ts_ < tablet_change_checkpoint_scn.get_val_for_lsn_allocator()) {
} else if (trans_flags_.scn_ < tablet_change_checkpoint_scn) {
LOG_WARN("tablet already deleted", K(ret), K(key), K(trans_flags_), K(tablet_change_checkpoint_scn));
} else {
ret = OB_EAGAIN;
@ -876,14 +870,11 @@ int ObTabletBindingHelper::lock_tablet_binding(ObTabletHandle &handle, const ObM
{
int ret = OB_SUCCESS;
const ObTransID &tx_id = trans_flags.tx_id_;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
const bool for_replay = trans_flags.for_replay_;
ObTablet *tablet = handle.get_obj();
ObTabletTxMultiSourceDataUnit tx_data;
SCN scn;
if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to get data", K(trans_flags), K(ret));
} else if (OB_FAIL(tablet->get_tx_data(tx_data))) {
if (OB_FAIL(tablet->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret));
} else {
const ObTransID old_tx_id = tx_data.tx_id_;
@ -904,7 +895,7 @@ int ObTabletBindingHelper::lock_tablet_binding(ObTabletHandle &handle, const ObM
if (OB_FAIL(ret)) {
} else if (need_update && OB_FAIL(tablet->set_tx_data(tx_data, memtable_scn, for_replay,
update_cache, ref_op, false/*is_callback*/))) {
LOG_WARN("failed to save tx data", K(ret), K(tx_data), K(log_ts), K(for_replay), K(ref_op));
LOG_WARN("failed to save tx data", K(ret), K(tx_data), K(scn), K(for_replay), K(ref_op));
}
}
return ret;
@ -944,14 +935,11 @@ int ObTabletBindingHelper::set_scn(ObTabletHandle &handle, const ObMulSourceData
{
int ret = OB_SUCCESS;
const ObTransID &tx_id = trans_flags.tx_id_;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
const bool for_replay = trans_flags.for_replay_;
ObTablet *tablet = handle.get_obj();
ObTabletTxMultiSourceDataUnit data;
SCN scn;
if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to get data", K(trans_flags), K(ret));
} else if (OB_FAIL(tablet->get_tx_data(data))) {
if (OB_FAIL(tablet->get_tx_data(data))) {
LOG_WARN("failed to get data", K(ret));
} else if (OB_UNLIKELY(data.tx_id_ != tx_id)) {
ret = OB_ERR_UNEXPECTED;
@ -1004,17 +992,15 @@ int ObTabletBindingHelper::unlock_tablet_binding(ObTabletHandle &handle, const O
{
int ret = OB_SUCCESS;
const ObTransID &tx_id = trans_flags.tx_id_;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
const bool for_replay = trans_flags.for_replay_;
const bool for_commit = trans_flags.notify_type_ == NotifyType::ON_COMMIT;
ObTablet *tablet = handle.get_obj();
ObTabletTxMultiSourceDataUnit tx_data;
SCN scn;
LOG_INFO("unlock_tablet_binding", KPC(tablet), K(trans_flags));
if (OB_FAIL(tablet->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret));
} else if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to convert_scn", K(trans_flags), K(ret));
} else {
const SCN old_scn = tx_data.tx_scn_;
if (tx_data.tx_id_ == tx_id) {

View File

@ -96,15 +96,12 @@ int ObTabletCreateDeleteHelper::prepare_create_tablets(
int ret = OB_SUCCESS;
const ObLSID &ls_id = ls_.get_ls_id();
const ObTransID &tx_id = trans_flags.tx_id_;
SCN scn;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
const bool is_clog_replaying = trans_flags.for_replay_;
const int64_t create_begin_ts = ObTimeUtility::current_monotonic_time();
ObTabletBindingPrepareCtx binding_ctx;
if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
} else if (OB_UNLIKELY(!arg.is_valid())) {
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(PRINT_CREATE_ARG(arg)));
} else if (OB_UNLIKELY(arg.id_ != ls_id)) {
@ -142,16 +139,14 @@ int ObTabletCreateDeleteHelper::replay_prepare_create_tablets(
const ObMulSourceDataNotifyArg &trans_flags)
{
int ret = OB_SUCCESS;
SCN scn;
SCN scn = trans_flags.scn_;
const ObBatchCreateTabletArg *final_arg = &arg;
ObBatchCreateTabletArg new_arg;
ObSArray<ObTabletID> existed_tablet_id_array;
NonLockedHashSet existed_tablet_id_set;
bool need_create = true;
if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) {
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
} else if (OB_UNLIKELY(!arg.is_valid())) {
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(PRINT_CREATE_ARG(arg)), K(scn));
} else if (OB_FAIL(existed_tablet_id_set.create(arg.get_tablet_count()))) {
@ -290,7 +285,6 @@ int ObTabletCreateDeleteHelper::handle_special_tablets_for_replay(
const ObTabletID &tablet_id = existed_tablet_id_array.at(i);
key.tablet_id_ = tablet_id;
tx_data.reset();
SCN scn;
bool tx_data_is_valid = false;
if (OB_FAIL(get_tablet(key, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(key));
@ -299,12 +293,10 @@ int ObTabletCreateDeleteHelper::handle_special_tablets_for_replay(
} else if (OB_UNLIKELY(tx_data_is_valid)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tx data should not be valid", K(ret), K(key));
} else if (OB_FAIL(scn.convert_for_lsn_allocator(trans_flags.log_ts_))) {
LOG_WARN("failed to convert scn", K(ret), K(key), K(trans_flags));
} else {
int tmp_ret = OB_SUCCESS;
tx_data.tx_id_ = trans_flags.tx_id_;
tx_data.tx_scn_ = scn;
tx_data.tx_scn_ = trans_flags.scn_;
tx_data.tablet_status_ = ObTabletStatus::CREATING;
const bool update_cache = false;
@ -332,8 +324,7 @@ int ObTabletCreateDeleteHelper::set_scn(
const ObLSID &ls_id = ls_.get_ls_id();
ObTabletMapKey key;
key.ls_id_ = ls_id;
SCN scn;
scn.convert_tmp(trans_flags.log_ts_);
SCN scn = trans_flags.scn_;
const int64_t tx_id = trans_flags.tx_id_;
const bool for_replay = false;
@ -533,12 +524,10 @@ int ObTabletCreateDeleteHelper::redo_create_tablets(
} else if (is_clog_replaying) {
LOG_INFO("in clog replaying procedure, do nothing while redo log callback", K(ret));
} else {
SCN scn;
SCN scn = trans_flags.scn_;
bool is_valid = true;
ObSArray<ObTabletCreateInfo> tablet_create_info_array;
if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) {
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
} else if (OB_FAIL(check_tablet_existence(arg, is_valid))) {
if (OB_FAIL(check_tablet_existence(arg, is_valid))) {
LOG_ERROR("failed to check tablet existence", K(ret), K(PRINT_CREATE_ARG(arg)));
} else if (!is_valid) {
ret = OB_ERR_UNEXPECTED;
@ -632,10 +621,10 @@ int ObTabletCreateDeleteHelper::do_commit_create_tablet(
LOG_WARN("failed to get tablet", K(ret), K(key));
} else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(tablet_handle));
} else if (trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field()) {
} else if (trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_) {
// replaying procedure, clog ts is smaller than tx log ts, just skip
LOG_INFO("skip commit create tablet", K(ret), K(key), K(trans_flags), K(tx_data));
} else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) {
} else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data));
} else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) {
@ -645,8 +634,8 @@ int ObTabletCreateDeleteHelper::do_commit_create_tablet(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet status is not CREATING", K(ret), K(key), K(trans_flags), K(tx_data));
} else {
const int64_t tx_log_ts = trans_flags.log_ts_;
const int64_t memtable_log_ts = trans_flags.log_ts_;
const int64_t tx_log_ts = trans_flags.scn_.get_val_for_lsn_allocator();
const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_lsn_allocator();
if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::NORMAL,
tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) {
@ -697,10 +686,12 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablets(
ObTabletMapKey key;
key.ls_id_ = ls_.get_ls_id();
if (OB_UNLIKELY(trans_flags.log_ts_ < OB_INVALID_TIMESTAMP)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid log ts", K(ret), K(trans_flags));
} else if (!trans_flags.is_redo_synced()) {
// TODO: fix it after multi source data refactor
// if (OB_UNLIKELY(trans_flags.log_ts_ < OB_INVALID_TIMESTAMP)) {
// ret = OB_ERR_UNEXPECTED;
// LOG_WARN("invalid log ts", K(ret), K(trans_flags));
// } else
if (!trans_flags.is_redo_synced()) {
// on redo cb has not been called
// just remove tablets directly
if (OB_FAIL(roll_back_remove_tablets(arg, trans_flags))) {
@ -881,7 +872,7 @@ int ObTabletCreateDeleteHelper::roll_back_remove_tablet(
LOG_INFO("memtable does not have msd, do nothing", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(tablet->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(tablet->save_multi_source_data_unit(&tx_data, ObScnRange::MAX_TS,
} else if (OB_FAIL(tablet->save_multi_source_data_unit(&tx_data, ObScnRange::MAX_SCN,
trans_flags.for_replay_, MemtableRefOp::DEC_REF, true/*is_callback*/))) {
LOG_WARN("failed to save msd", K(ret), K(ls_id), K(tablet_id));
}
@ -921,12 +912,12 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablet(
if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(tablet_handle));
} else if (trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field()) {
} else if (trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_) {
// replaying procedure, clog ts is smaller than tx log ts, just skip
LOG_INFO("skip abort create tablet", K(ret), K(tablet_id), K(trans_flags), K(tx_data));
} else if (OB_UNLIKELY(!trans_flags.for_replay_
&& tx_data.tx_scn_.get_val_for_inner_table_field() != OB_MAX_SCN_TS_NS
&& trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) {
&& tx_data.tx_scn_ != palf::SCN::max_scn()
&& trans_flags.scn_ <= tx_data.tx_scn_)) {
// If tx log ts equals ObScnRange::MAX_TS, it means redo callback has not been called.
// Thus, we should handle this situation
ret = OB_ERR_UNEXPECTED;
@ -938,8 +929,8 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablet(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet status is not CREATING", K(ret), K(tablet_id), K(trans_flags), K(tx_data));
} else {
const int64_t tx_log_ts = trans_flags.log_ts_;
const int64_t memtable_log_ts = trans_flags.log_ts_;
const int64_t tx_log_ts = trans_flags.scn_.get_val_for_inner_table_field();
const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_inner_table_field();
if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::DELETED,
tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) {
@ -980,7 +971,7 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets(
} else if (trans_flags.for_replay_) {
if (OB_FAIL(tablet_id_array.reserve(tablet_cnt))) {
LOG_WARN("fail to allocate memory for tablet_id_array", K(ret), K(tablet_cnt));
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) {
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
}
}
@ -999,23 +990,18 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets(
} else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(cur_tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(key));
} else {
SCN flag_scn;
if (OB_INVALID_SCN_VAL != trans_flags.log_ts_
&& OB_FAIL(flag_scn.convert_for_lsn_allocator(trans_flags.log_ts_))) {
LOG_WARN("failed to convert_tmp", K(ret), K(trans_flags));
} else {
ObTabletTxMultiSourceDataUnit tx_data;
tx_data.tx_id_ = trans_flags.tx_id_;
tx_data.tx_scn_ = trans_flags.for_replay_ ? flag_scn: cur_tx_data.tx_scn_;
tx_data.tablet_status_ = ObTabletStatus::DELETING;
SCN flag_scn = trans_flags.scn_;
ObTabletTxMultiSourceDataUnit tx_data;
tx_data.tx_id_ = trans_flags.tx_id_;
tx_data.tx_scn_ = trans_flags.for_replay_ ? flag_scn: cur_tx_data.tx_scn_;
tx_data.tablet_status_ = ObTabletStatus::DELETING;
if (OB_FAIL(ObTabletBindingHelper::lock_and_set_tx_data(tablet_handle, tx_data, trans_flags.for_replay_))) {
LOG_WARN("failed to lock tablet binding", K(ret), K(key), K(tx_data));
// TODO(bowen.gbw): temproarily swallow 4200 error while prepare remove tablets
if (trans_flags.for_replay_ && OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
LOG_INFO("swallow 4200 error", K(ret), K(key), K(tx_data), K(trans_flags));
}
if (OB_FAIL(ObTabletBindingHelper::lock_and_set_tx_data(tablet_handle, tx_data, trans_flags.for_replay_))) {
LOG_WARN("failed to lock tablet binding", K(ret), K(key), K(tx_data));
// TODO(bowen.gbw): temproarily swallow 4200 error while prepare remove tablets
if (trans_flags.for_replay_ && OB_HASH_EXIST == ret) {
ret = OB_SUCCESS;
LOG_INFO("swallow 4200 error", K(ret), K(key), K(tx_data), K(trans_flags));
}
}
}
@ -1084,7 +1070,7 @@ int ObTabletCreateDeleteHelper::redo_remove_tablets(
int ret = OB_SUCCESS;
const ObLSID &ls_id = ls_.get_ls_id();
const bool is_clog_replaying = trans_flags.for_replay_;
const int64_t log_ts = trans_flags.log_ts_;
const palf::SCN scn = trans_flags.scn_;
if (OB_UNLIKELY(!arg.is_valid())) {
ret = OB_INVALID_ARGUMENT;
@ -1100,14 +1086,11 @@ int ObTabletCreateDeleteHelper::redo_remove_tablets(
ObTabletMapKey key;
key.ls_id_ = ls_id;
for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); ++i) {
SCN scn;
key.tablet_id_ = arg.tablet_ids_.at(i);
if (OB_FAIL(get_tablet(key, tablet_handle))) {
LOG_WARN("failed to get tablet", K(ret), K(key));
} else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(tablet_handle));
} else if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
} else if (OB_FAIL(tablet_handle.get_obj()->set_tx_scn(tx_data.tx_id_, scn, false/*for_replay*/))) {
LOG_WARN("failed to set tx data", K(ret), K(tablet_handle), K(tx_data), K(scn));
}
@ -1137,7 +1120,7 @@ int ObTabletCreateDeleteHelper::commit_remove_tablets(
LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id));
} else if (trans_flags.for_replay_) {
if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) {
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) {
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
}
}
@ -1173,7 +1156,7 @@ int ObTabletCreateDeleteHelper::do_commit_remove_tablet(
LOG_WARN("failed to get tablet", K(ret), K(key));
} else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(key));
} else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) {
} else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data));
} else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) {
@ -1183,8 +1166,8 @@ int ObTabletCreateDeleteHelper::do_commit_remove_tablet(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet status is not DELETING", K(ret), K(key), K(trans_flags), K(tx_data));
} else {
const int64_t tx_log_ts = trans_flags.log_ts_;
const int64_t memtable_log_ts = trans_flags.log_ts_;
const int64_t tx_log_ts = trans_flags.scn_.get_val_for_inner_table_field();
const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_inner_table_field();
if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::DELETED,
tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) {
@ -1216,7 +1199,7 @@ int ObTabletCreateDeleteHelper::abort_remove_tablets(
LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id));
} else if (trans_flags.for_replay_) {
if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) {
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) {
} else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) {
LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags));
}
}
@ -1271,8 +1254,8 @@ int ObTabletCreateDeleteHelper::do_abort_remove_tablet(
} else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data", K(ret), K(key));
} else if (OB_UNLIKELY(!trans_flags.for_replay_
&& trans_flags.log_ts_ != OB_INVALID_TIMESTAMP
&& trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) {
&& trans_flags.scn_ != palf::SCN::invalid_scn()
&& trans_flags.scn_ <= tx_data.tx_scn_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data));
} else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) {
@ -1314,7 +1297,9 @@ int ObTabletCreateDeleteHelper::do_abort_remove_tablet(
}
} else {
const int64_t tx_log_ts = tx_data.tx_scn_.get_val_for_inner_table_field();
const int64_t memtable_log_ts = (OB_INVALID_TIMESTAMP == trans_flags.log_ts_) ? share::ObScnRange::MAX_TS: trans_flags.log_ts_;
const int64_t memtable_log_ts = (palf::SCN::invalid_scn() == trans_flags.scn_)
? share::ObScnRange::MAX_TS
: trans_flags.scn_.get_val_for_inner_table_field();
if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::NORMAL,
tx_log_ts, memtable_log_ts, trans_flags.for_replay_, ref_op))) {
@ -2287,21 +2272,15 @@ int ObTabletCreateDeleteHelper::do_create_tablet(
ObFreezer *freezer = ls_.get_freezer();
bool need_create_empty_major_sstable = false;
const ObTransID &tx_id = trans_flags.tx_id_;
SCN scn;
SCN create_scn;
const bool for_replay = trans_flags.for_replay_;
const int64_t log_ts = for_replay ? trans_flags.log_ts_ : share::ObScnRange::MAX_TS;
const int64_t create_ts = trans_flags.log_ts_; // may be invalid
MemtableRefOp ref_op = for_replay ? MemtableRefOp::NONE : MemtableRefOp::INC_REF;
SCN scn = trans_flags.for_replay_ ? trans_flags.scn_ : palf::SCN::max_scn();
SCN create_scn = trans_flags.scn_;
ObMetaDiskAddr mem_addr;
ObTabletTableStoreFlag table_store_flag;
table_store_flag.set_with_major_sstable();
if (OB_FAIL(mem_addr.set_mem_addr(0, sizeof(ObTablet)))) {
LOG_WARN("fail to set memory address", K(ret));
} else if (OB_FAIL(scn.convert_tmp(log_ts))) {
LOG_WARN("failed to convert_tmp", K(ret), K(scn));
} else if (OB_FAIL(create_scn.convert_tmp(create_ts))) {
LOG_WARN("failed to convert_tmp", K(ret), K(create_scn));
} else if (OB_FAIL(acquire_tablet(key, tablet_handle, false/*only acquire*/))) {
LOG_WARN("failed to acquire tablet", K(ret), K(key));
} else if (OB_FAIL(check_need_create_empty_major_sstable(table_schema, need_create_empty_major_sstable))) {

View File

@ -209,7 +209,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
last_frozen_memtable->resolve_right_boundary();
TRANS_LOG(INFO, "[resolve_right_boundary] create_memtable", K(for_replay), K(ls_id), KPC(last_frozen_memtable));
if (memtable != last_frozen_memtable) {
memtable->resolve_left_boundary(last_frozen_memtable->get_end_log_ts());
memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn());
}
}
// there is no frozen memtable and new sstable will not be generated,
@ -221,7 +221,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
} else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){
LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version));
} else {
memtable->resolve_left_boundary(new_clog_checkpoint_scn.get_val_for_lsn_allocator());
memtable->resolve_left_boundary(new_clog_checkpoint_scn);
}
time_guard.click("init memtable");
@ -367,7 +367,9 @@ ObMemtable *ObTabletMemtableMgr::get_last_frozen_memtable_() const
return memtable;
}
int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *memtable, int64_t start_log_ts, int64_t snapshot_version)
int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *memtable,
palf::SCN start_scn,
palf::SCN snapshot_scn)
{
ObTableHandleV2 handle;
ObIMemtable *active_memtable = nullptr;
@ -381,8 +383,8 @@ int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *
} else if (OB_FAIL(handle.get_memtable(active_memtable))) {
LOG_WARN("fail to get active memtable", K(ret));
} else {
// set the start_log_ts of the new memtable
static_cast<ObMemtable*>(active_memtable)->resolve_left_boundary(start_log_ts);
// set the start_scn of the new memtable
static_cast<ObMemtable*>(active_memtable)->resolve_left_boundary(start_scn);
}
if (OB_ENTRY_NOT_EXIST== ret) {
ret = OB_SUCCESS;

View File

@ -83,7 +83,9 @@ public:
const bool include_active_memtable = true);
int get_memtables_nolock(ObTableHdlArray &handle);
int get_first_frozen_memtable(ObTableHandleV2 &handle) const;
int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable, int64_t start_log_ts, int64_t snapshot_version);
int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable,
palf::SCN start_scn,
palf::SCN snapshot_version);
int unset_logging_blocked_for_active_memtable(memtable::ObIMemtable *memtable);
int set_is_tablet_freeze_for_active_memtable(memtable::ObIMemtable *&memtable,
bool is_force_freeze = false);

View File

@ -149,7 +149,8 @@ int ObTabletMeta::init(
} else if (OB_UNLIKELY(!ls_id.is_valid())
|| OB_UNLIKELY(!tablet_id.is_valid())
|| OB_UNLIKELY(!data_tablet_id.is_valid())
|| OB_UNLIKELY(!create_scn.is_valid())
// TODO: fix it after multi source data refactor
// || OB_UNLIKELY(!create_scn.is_valid())
|| OB_UNLIKELY(!snapshot_version.is_valid())
|| OB_UNLIKELY(lib::Worker::CompatMode::INVALID == compat_mode)) {
ret = OB_INVALID_ARGUMENT;

View File

@ -235,7 +235,7 @@ int ObMulSourceTxDataNotifier::notify_table_lock(
// TABLELOCK only need deal with replay process, but not apply.
// the replay process will produce a lock op and will be dealt at trans end.
} else if (OB_FAIL(mt_ctx->replay_lock(lock_op,
arg.log_ts_))) {
arg.scn_))) {
TRANS_LOG(WARN, "replay lock failed", K(ret));
} else {
// do nothing

View File

@ -996,7 +996,7 @@ void ObTxExecInfo::reset()
max_applying_part_log_no_ = INT64_MAX;
max_submitted_seq_no_ = 0;
checksum_ = 0;
checksum_log_ts_ = 0;
checksum_scn_.set_min();
max_durable_lsn_.reset();
data_complete_ = false;
is_dup_tx_ = false;
@ -1035,7 +1035,7 @@ OB_SERIALIZE_MEMBER(ObTxExecInfo,
max_applying_part_log_no_,
max_submitted_seq_no_,
checksum_,
checksum_log_ts_,
checksum_scn_,
max_durable_lsn_,
data_complete_,
is_dup_tx_,

View File

@ -1725,7 +1725,7 @@ public:
K_(max_applying_part_log_no),
K_(max_submitted_seq_no),
K_(checksum),
K_(checksum_log_ts),
K_(checksum_scn),
K_(max_durable_lsn),
K_(data_complete),
K_(is_dup_tx),
@ -1750,7 +1750,7 @@ public:
int64_t max_applying_part_log_no_; // start from 0 on follower and always be INT64_MAX on leader
int64_t max_submitted_seq_no_; // maintains on Leader and transfer to Follower via ActiveInfoLog
uint64_t checksum_;
int64_t checksum_log_ts_;
palf::SCN checksum_scn_;
palf::LSN max_durable_lsn_;
bool data_complete_;
bool is_dup_tx_;
@ -1768,7 +1768,7 @@ static const int64_t USEC_PER_SEC = 1000 * 1000;
struct ObMulSourceDataNotifyArg
{
ObTransID tx_id_;
int64_t log_ts_; // the log ts of current notify type
palf::SCN scn_; // the log ts of current notify type
// in case of abort transaction, trans_version_ is invalid
palf::SCN trans_version_;
bool for_replay_;
@ -1778,7 +1778,7 @@ struct ObMulSourceDataNotifyArg
bool redo_synced_;
TO_STRING_KV(K_(tx_id),
K_(log_ts),
K_(scn),
K_(trans_version),
K_(for_replay),
K_(notify_type),

View File

@ -778,7 +778,7 @@ int ObPartTransCtx::iterate_tx_obj_lock_op(ObLockOpIterator &iter) const
return ret;
}
bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const int64_t log_ts)
bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const palf::SCN)
{
// const int64_t restore_snapshot_version = ls_tx_ctx_mgr_->get_restore_snapshot_version();
// const int64_t last_restore_log_id = ls_tx_ctx_mgr_->get_last_restore_log_id();
@ -796,8 +796,8 @@ int ObPartTransCtx::trans_replay_abort_(const palf::SCN &final_log_ts)
int ret = OB_SUCCESS;
if (OB_FAIL(mt_ctx_.trans_replay_end(false, /*commit*/
ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator(),
final_log_ts.get_val_for_lsn_allocator()))) {
ctx_tx_data_.get_commit_version(),
final_log_ts))) {
TRANS_LOG(WARN, "transaction replay end error", KR(ret), KPC(this));
}
@ -818,8 +818,8 @@ int ObPartTransCtx::trans_replay_commit_(const palf::SCN &commit_version,
} else {
int64_t freeze_ts = 0;
if (OB_FAIL(mt_ctx_.trans_replay_end(true, /*commit*/
commit_version.get_val_for_lsn_allocator(),
final_log_ts.get_val_for_lsn_allocator(),
commit_version,
final_log_ts,
log_cluster_version,
checksum))) {
TRANS_LOG(WARN, "transaction replay end error", KR(ret), K(commit_version), K(checksum),
@ -1243,7 +1243,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info)
} else if (OB_FAIL(deep_copy_mds_array(ctx_info.exec_info_.multi_data_source_))) {
TRANS_LOG(WARN, "deep copy ctx_info mds_array failed", K(ret));
} else if (FALSE_IT(mt_ctx_.update_checksum(exec_info_.checksum_,
exec_info_.checksum_log_ts_))) {
exec_info_.checksum_scn_))) {
TRANS_LOG(ERROR, "recover checksum failed", K(ret), KPC(this), K(ctx_info));
} else if (!is_local_tx_() && OB_FAIL(ObTxCycleTwoPhaseCommitter::recover_from_tx_table())) {
TRANS_LOG(ERROR, "recover_from_tx_table failed", K(ret), KPC(this));
@ -1511,13 +1511,14 @@ int ObPartTransCtx::on_dist_end_(const bool commit)
int64_t start_us, end_us;
start_us = end_us = 0;
const int64_t trans_version = commit ? ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator() : -1;
const palf::SCN trans_version = commit ? ctx_tx_data_.get_commit_version() : palf::SCN::invalid_scn();
// Distributed transactions need to wait for the commit log majority successfully before
// unlocking. If you want to know the reason, it is in the ::do_dist_commit
start_us = ObTimeUtility::fast_current_time();
if (OB_FAIL(mt_ctx_.trans_end(commit, trans_version,
ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) {
if (OB_FAIL(mt_ctx_.trans_end(commit,
trans_version,
ctx_tx_data_.get_end_log_ts()))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), K(trans_version), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
} else if (commit) {
@ -1844,7 +1845,7 @@ int ObPartTransCtx::common_on_success_(ObTxLogCb *log_cb)
exec_info_.max_durable_lsn_ = lsn;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(mt_ctx_.sync_log_succ(log_ts.get_val_for_lsn_allocator(), log_cb->get_callbacks()))) {
if (OB_FAIL(mt_ctx_.sync_log_succ(log_ts, log_cb->get_callbacks()))) {
TRANS_LOG(ERROR, "mt ctx sync log failed", KR(ret), K(*log_cb), K(*this));
} else if (OB_SUCCESS != (tmp_ret = mt_ctx_.remove_callbacks_for_fast_commit())) {
TRANS_LOG(WARN, "cleanout callbacks for fast commit", K(ret), K(*this));
@ -3541,11 +3542,11 @@ void ObPartTransCtx::check_no_need_replay_checksum(const palf::SCN &log_ts)
{
// TODO(handora.qc): How to lock the tx_ctx
// checksum_log_ts_ means all data's checksum has been calculated before the
// log of checksum_log_ts_(not included). So if the data with this log_ts is
// not replayed with checksum_log_ts_ <= log_ts, it means may exist some data
// checksum_scn_ means all data's checksum has been calculated before the
// log of checksum_scn_(not included). So if the data with this scn is
// not replayed with checksum_scn_ <= scn, it means may exist some data
// will never be replayed because the memtable will filter the data.
if (exec_info_.checksum_log_ts_ <= log_ts.get_val_for_lsn_allocator()) {
if (exec_info_.checksum_scn_ <= log_ts) {
exec_info_.need_checksum_ = false;
}
}
@ -4788,9 +4789,9 @@ int ObPartTransCtx::get_tx_ctx_table_info_(ObTxCtxTableInfo &info)
}
if (OB_SUCC(ret)) {
if (OB_FAIL(mt_ctx_.calc_checksum_before_log_ts(exec_info_.max_applied_log_ts_.get_val_for_lsn_allocator(),
exec_info_.checksum_,
exec_info_.checksum_log_ts_))) {
if (OB_FAIL(mt_ctx_.calc_checksum_before_scn(exec_info_.max_applied_log_ts_,
exec_info_.checksum_,
exec_info_.checksum_scn_))) {
TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this));
} else {
info.tx_id_ = trans_id_;
@ -5070,7 +5071,7 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
int ret = OB_SUCCESS;
ObMulSourceDataNotifyArg arg;
arg.tx_id_ = trans_id_;
arg.log_ts_ = log_ts.get_val_for_lsn_allocator();
arg.scn_ = log_ts;
arg.trans_version_ = ctx_tx_data_.get_commit_version();
arg.for_replay_ = for_replay;
arg.notify_type_ = notify_type;
@ -5896,8 +5897,9 @@ int ObPartTransCtx::on_local_commit_tx_()
TRANS_LOG(WARN, "wait gts elapse commit version failed", KR(ret), KPC(this));
} else if (FALSE_IT(tg.click())) {
} else if (FALSE_IT(start_us = ObTimeUtility::fast_current_time())) {
} else if (OB_FAIL(mt_ctx_.trans_end(true, ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator(),
ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) {
} else if (OB_FAIL(mt_ctx_.trans_end(true,
ctx_tx_data_.get_commit_version(),
ctx_tx_data_.get_end_log_ts()))) {
TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
} else if (FALSE_IT(elr_handler_.reset_elr_state())) {
@ -5946,7 +5948,7 @@ int ObPartTransCtx::on_local_abort_tx_()
ObTxBufferNodeArray tmp_array;
start_us = ObTimeUtility::fast_current_time();
if (OB_FAIL(mt_ctx_.trans_end(false, -1 /*unused*/, ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) {
if (OB_FAIL(mt_ctx_.trans_end(false, palf::SCN::invalid_scn(), ctx_tx_data_.get_end_log_ts()))) {
TRANS_LOG(WARN, "trans end error", KR(ret), K(commit_version), "context", *this);
} else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) {
} else if (OB_FAIL(trans_clear_())) {

View File

@ -197,7 +197,7 @@ public:
{ mt_ctx_.set_table_lock_killed(); }
bool is_table_lock_killed() const;
bool need_update_schema_version(const int64_t log_id,
const int64_t log_ts);
const palf::SCN log_ts);
void set_trans_table_status(const obrpc::ObTrxToolArg &arg);
share::ObLSID get_ls_id() const { return ls_id_; }

View File

@ -239,7 +239,7 @@ int ObTxReplayExecutor::before_replay_redo_()
if (!has_redo_) {
if (OB_ISNULL(ctx_) || OB_ISNULL(mt_ctx_ = ctx_->get_memtable_ctx())) {
ret = OB_INVALID_ARGUMENT;
} else if (mt_ctx_->replay_begin(log_ts_ns_.get_val_for_lsn_allocator())) {
} else if (mt_ctx_->replay_begin(log_ts_ns_)) {
TRANS_LOG(ERROR, "[Replay Tx] replay_begin fail or mt_ctx_ is NULL", K(ret), K(mt_ctx_));
} else {
has_redo_ = true;
@ -253,11 +253,11 @@ void ObTxReplayExecutor::finish_replay_(const int retcode)
if (has_redo_) {
if (OB_SUCCESS != retcode) {
mt_ctx_->replay_end(false, /*is_replay_succ*/
log_ts_ns_.get_val_for_lsn_allocator());
log_ts_ns_);
TRANS_LOG(WARN, "[Replay Tx]Tx Redo replay error, rollback to start", K(*this));
} else {
mt_ctx_->replay_end(true, /*is_replay_succ*/
log_ts_ns_.get_val_for_lsn_allocator());
log_ts_ns_);
// TRANS_LOG(INFO, "[Replay Tx] Tx Redo replay success, commit sub_trans", K(*this));
}
}
@ -668,9 +668,9 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx,
KP(mmi_ptr));
} else if (OB_FAIL(data_mem_ptr->replay_row(store_ctx, mmi_ptr, row_buf))) {
TRANS_LOG(WARN, "[Replay Tx] replay row error", K(ret));
} else if (OB_FAIL(data_mem_ptr->set_max_end_log_ts(log_ts_ns_.get_val_for_lsn_allocator()))) { // for freeze log_ts , may be
} else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) { // for freeze log_ts , may be
TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr));
} else if (OB_FAIL(data_mem_ptr->set_rec_log_ts(log_ts_ns_.get_val_for_lsn_allocator()))) {
} else if (OB_FAIL(data_mem_ptr->set_rec_scn(log_ts_ns_))) {
TRANS_LOG(WARN, "[Replay Tx] set rec_log_ts error", K(ret), KPC(data_mem_ptr));
}
@ -679,7 +679,7 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx,
// in a freeze memtable which has a smaller end ts than this log.
//
// The rollback operation must hold write_ref to make memtable stay in memory.
mt_ctx_->rollback_redo_callbacks(log_ts_ns_.get_val_for_lsn_allocator());
mt_ctx_->rollback_redo_callbacks(log_ts_ns_);
}
return ret;
}

View File

@ -274,7 +274,7 @@ int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze)
scn_range.start_scn_.convert_for_gts(1);
scn_range.end_scn_.convert_for_gts(cur_ts);
set_scn_range(scn_range);
set_snapshot_version(cur_ts);
set_snapshot_version(scn_range.end_scn_);
ATOMIC_STORE(&is_frozen_, true);
}
}

View File

@ -421,7 +421,7 @@ bool ObTxDataMemtable::ready_for_flush()
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id()));
} else if (max_consequent_callbacked_scn >= key_.scn_range_.end_scn_) {
state_ = ObTxDataMemtable::State::FROZEN;
set_snapshot_version(min_tx_scn_.get_val_for_lsn_allocator());
set_snapshot_version(min_tx_scn_);
bool_ret = true;
} else {
int64_t freeze_ts = key_.scn_range_.end_scn_.get_val_for_inner_table_field();

View File

@ -328,7 +328,9 @@ TEST_F(TestMemtable, mt_set)
print(mvcc_row);
EXPECT_EQ(2, rg.mem_ctx_.trans_mgr_.get_main_list_length());
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 1000, 1000, 0));
palf::SCN val_1000;
val_1000.convert_for_lsn_allocator(1000);
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_1000, val_1000, 0));
print(mvcc_row);
}
@ -347,13 +349,17 @@ TEST_F(TestMemtable, conflict)
EXPECT_EQ(OB_SUCCESS, rg2.init(2, this));
EXPECT_EQ(OB_ERR_EXCLUSIVE_LOCK_CONFLICT, rg2.write(1, 3, mt));
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 1000, 1000, 0));
palf::SCN val_1000;
val_1000.convert_for_lsn_allocator(1000);
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_1000, val_1000, 0));
EXPECT_EQ(OB_TRANSACTION_SET_VIOLATION, rg2.write(1, 3, mt, 900));
EXPECT_EQ(OB_SUCCESS, rg2.write(1, 3, mt, 1000));
EXPECT_EQ(OB_SUCCESS, rg2.write(1, 4, mt, 1001));
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 1002, 1002, 0));
palf::SCN val_1002;
val_1002.convert_for_lsn_allocator(1002);
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_1002, val_1002, 0));
print(mvcc_row);
}
@ -367,22 +373,24 @@ TEST_F(TestMemtable, except)
EXPECT_EQ(OB_SUCCESS, rg.init(1, this));
ObMvccRow *mvcc_row = nullptr;
EXPECT_EQ(OB_SUCCESS, rg.write(1, 2, mt, mvcc_row, 1000));
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 900, 900, 0));
palf::SCN val_900;
val_900.convert_for_lsn_allocator(900);
EXPECT_EQ(OB_SUCCESS, rg.write(1, 2, mt, mvcc_row, 1000));
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_900, val_900, 0));
RunCtxGuard rg2;
EXPECT_EQ(OB_SUCCESS, rg2.init(2, this));
EXPECT_EQ(OB_SUCCESS, rg2.write(1, 3, mt, 1000));
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 900, 900, 0));
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_900, val_900, 0));
print(mvcc_row);
RunCtxGuard rg3;
EXPECT_EQ(OB_SUCCESS, rg3.init(3, this));
EXPECT_EQ(OB_SUCCESS, rg3.write(1, 4, mt, 900));
EXPECT_EQ(OB_SUCCESS, rg3.mem_ctx_.do_trans_end(true, 900, 900, 0));
EXPECT_EQ(OB_SUCCESS, rg3.mem_ctx_.do_trans_end(true, val_900, val_900, 0));
print(mvcc_row);
}
@ -395,11 +403,14 @@ TEST_F(TestMemtable, multi_key)
RunCtxGuard rg;
EXPECT_EQ(OB_SUCCESS, rg.init(1, this));
palf::SCN val_900;
val_900.convert_for_lsn_allocator(900);
ObMvccRow *mvcc_row = nullptr;
ObMvccRow *mvcc_row2 = nullptr;
EXPECT_EQ(OB_SUCCESS, rg.write(1, 10, mt, mvcc_row, 1000));
EXPECT_EQ(OB_SUCCESS, rg.write(2, 20, mt, mvcc_row2, 1000));
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 900, 900, 0));
EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_900, val_900, 0));
print(mvcc_row);
print(mvcc_row2);
@ -409,7 +420,7 @@ TEST_F(TestMemtable, multi_key)
EXPECT_EQ(OB_SUCCESS, rg2.write(1, 100, mt, 1000));
EXPECT_EQ(OB_SUCCESS, rg2.write(2, 200, mt, 1000));
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 900, 900, 0));
EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_900, val_900, 0));
print(mvcc_row);
print(mvcc_row2);
}

View File

@ -305,7 +305,9 @@ int TestCompactionPolicy::mock_memtable(
LOG_WARN("add to data_checkpoint failed", K(ret), KPC(memtable));
mt_mgr->clean_tail_memtable_();
} else if (palf::OB_MAX_SCN_TS_NS != end_border) { // frozen memtable
memtable->snapshot_version_ = snapshot_version;
palf::SCN snapshot_scn;
snapshot_scn.convert_for_lsn_allocator(snapshot_version);
memtable->snapshot_version_ = snapshot_scn;
memtable->write_ref_cnt_ = 0;
memtable->unsynced_cnt_ = 0;
memtable->is_tablet_freeze_ = true;

View File

@ -218,16 +218,16 @@ int TestDmlCommon::create_data_tablet(
} else {
transaction::ObMulSourceDataNotifyArg trans_flags;
trans_flags.tx_id_ = 123;
trans_flags.log_ts_ = -1;
trans_flags.scn_ = palf::SCN::invalid_scn();
trans_flags.for_replay_ = false;
ObLS *ls = ls_handle.get_ls();
if (OB_FAIL(ls->get_tablet_svr()->on_prepare_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg));
} else if (FALSE_IT(trans_flags.log_ts_ = INT64_MAX - 100)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) {
} else if (OB_FAIL(ls->get_tablet_svr()->on_redo_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg));
} else if (FALSE_IT(++trans_flags.log_ts_)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) {
} else if (OB_FAIL(ls->get_tablet_svr()->on_commit_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg));
}

View File

@ -77,16 +77,16 @@ int TestLobCommon::create_data_tablet(
} else {
transaction::ObMulSourceDataNotifyArg trans_flags;
trans_flags.tx_id_ = 123;
trans_flags.log_ts_ = -1;
trans_flags.scn_ = palf::SCN::invalid_scn();
trans_flags.for_replay_ = false;
ObLS *ls = ls_handle.get_ls();
if (OB_FAIL(ls->get_tablet_svr()->on_prepare_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg));
} else if (FALSE_IT(trans_flags.log_ts_ = INT64_MAX - 100)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) {
} else if (OB_FAIL(ls->get_tablet_svr()->on_redo_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg));
} else if (FALSE_IT(++trans_flags.log_ts_)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) {
} else if (OB_FAIL(ls->get_tablet_svr()->on_commit_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg));
}

View File

@ -36,15 +36,15 @@ int TestTabletHelper::create_tablet(ObLSTabletService &ls_tablet_svr, obrpc::ObB
int ret = common::OB_SUCCESS;
transaction::ObMulSourceDataNotifyArg trans_flags;
trans_flags.tx_id_ = 123;
trans_flags.log_ts_ = -1;
trans_flags.scn_ = palf::SCN::invalid_scn();
trans_flags.for_replay_ = false;
if (OB_FAIL(ls_tablet_svr.on_prepare_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg));
} else if (FALSE_IT(trans_flags.log_ts_ = palf::OB_MAX_SCN_TS_NS - 100)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) {
} else if (OB_FAIL(ls_tablet_svr.on_redo_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg));
} else if (FALSE_IT(++trans_flags.log_ts_)) {
} else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) {
} else if (OB_FAIL(ls_tablet_svr.on_commit_create_tablets(arg, trans_flags))) {
STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg));
}