diff --git a/src/logservice/palf/scn.cpp b/src/logservice/palf/scn.cpp index 54d93c193..710bca82d 100644 --- a/src/logservice/palf/scn.cpp +++ b/src/logservice/palf/scn.cpp @@ -41,6 +41,13 @@ bool SCN::atomic_bcas(const SCN &old_v, const SCN &new_val) return ATOMIC_BCAS(&val_, old_v.val_, new_val.val_); } +SCN SCN::atomic_vcas(const SCN &old_v, const SCN &new_val) +{ + SCN tmp; + tmp.val_ = ATOMIC_VCAS(&val_, old_v.val_, new_val.val_); + return tmp; +} + bool SCN::is_valid() const { return ((OB_INVALID_SCN_VAL != val_) && (SCN_VERSION == v_)); @@ -57,7 +64,7 @@ void SCN::set_max() ts_ns_ = OB_MAX_SCN_TS_NS; } -bool SCN::is_max() +bool SCN::is_max() const { bool bool_ret = false; if (v_ == SCN_VERSION && ts_ns_ == OB_MAX_SCN_TS_NS) { @@ -72,7 +79,7 @@ void SCN::set_min() ts_ns_ = OB_MIN_SCN_TS_NS; } -bool SCN::is_min() +bool SCN::is_min() const { bool bool_ret = false; if (v_ == SCN_VERSION && ts_ns_ == OB_MIN_SCN_TS_NS) { diff --git a/src/logservice/palf/scn.h b/src/logservice/palf/scn.h index 32482d261..2c22b8147 100644 --- a/src/logservice/palf/scn.h +++ b/src/logservice/palf/scn.h @@ -33,8 +33,8 @@ public: void set_invalid(); void set_max(); void set_min(); - bool is_max(); - bool is_min(); + bool is_max() const; + bool is_min() const; void set_base(); static SCN invalid_scn(); static SCN max_scn(); @@ -48,6 +48,7 @@ public: void atomic_set(const SCN &ref); SCN atomic_get() const; bool atomic_bcas(const SCN &old_v, const SCN &new_val); + SCN atomic_vcas(const SCN &old_v, const SCN &new_val); SCN inc_update(const SCN &ref_scn); SCN dec_update(const SCN &ref_scn); static SCN scn_inc(const SCN &ref); diff --git a/src/storage/ls/ob_ls_member_table.cpp b/src/storage/ls/ob_ls_member_table.cpp index c0fa8de50..ed96e3463 100644 --- a/src/storage/ls/ob_ls_member_table.cpp +++ b/src/storage/ls/ob_ls_member_table.cpp @@ -97,7 +97,7 @@ int ObLSMemberTable::prepare_create_tablets(const obrpc::ObBatchCreateTabletArg ret = OB_ERR_UNEXPECTED; LOG_WARN("ls is NULL", KR(ret), K(ls_handle)); } else if (trans_flags.for_replay_ - && trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) { + && trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) { LOG_INFO("replay skip for create tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta())); } else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) { ret = OB_ERR_UNEXPECTED; @@ -164,7 +164,7 @@ int ObLSMemberTable::on_commit_create_tablets(const obrpc::ObBatchCreateTabletAr ret = OB_ERR_UNEXPECTED; LOG_WARN("ls is NULL", KR(ret), K(ls_handle)); } else if (trans_flags.for_replay_ - && trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) { + && trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) { LOG_INFO("replay skip for create tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta())); } else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) { ret = OB_ERR_UNEXPECTED; @@ -283,7 +283,7 @@ int ObLSMemberTable::prepare_remove_tablets(const obrpc::ObBatchRemoveTabletArg ret = OB_ERR_UNEXPECTED; LOG_WARN("ls is NULL", KR(ret), K(ls_handle)); } else if (trans_flags.for_replay_ // dropping tablet triggers dumpping memtable - && trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) { + && trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) { LOG_INFO("replay skip for remove tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta())); } else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) { ret = OB_ERR_UNEXPECTED; @@ -349,7 +349,7 @@ int ObLSMemberTable::on_commit_remove_tablets(const obrpc::ObBatchRemoveTabletAr ret = OB_ERR_UNEXPECTED; LOG_WARN("ls is NULL", KR(ret), K(ls_handle)); } else if (trans_flags.for_replay_ - && trans_flags.log_ts_ <= ls->get_tablet_change_checkpoint_scn().get_val_for_lsn_allocator()) { + && trans_flags.scn_ <= ls->get_tablet_change_checkpoint_scn()) { LOG_INFO("replay skip for remove tablet", KR(ret), K(trans_flags), K(arg), K(ls->get_ls_meta())); } else if (OB_ISNULL(tablet_svr = ls->get_tablet_svr())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/memtable/mvcc/ob_mvcc_row.cpp b/src/storage/memtable/mvcc/ob_mvcc_row.cpp index 60431a09d..dcbc65ecd 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_row.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_row.cpp @@ -560,7 +560,7 @@ int ObMvccRow::row_compact(ObMemtable *memtable, ObMemtableRowCompactor row_compactor; if (OB_FAIL(row_compactor.init(this, memtable, node_alloc, for_replay))) { TRANS_LOG(WARN, "row compactor init error", K(ret)); - } else if (OB_FAIL(row_compactor.compact(snapshot_version.get_val_for_lsn_allocator()))) { + } else if (OB_FAIL(row_compactor.compact(snapshot_version))) { TRANS_LOG(WARN, "row compact error", K(ret), K(snapshot_version)); } else { // do nothing diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 4378ef8aa..0293d9d6b 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -976,10 +976,11 @@ int ObMvccRowCallback::trans_commit() if (value_.need_compact(for_read, ctx_.is_for_replay())) { if (ctx_.is_for_replay()) { if (palf::SCN::min_scn() != ctx_.get_replay_compact_version() && palf::SCN::max_scn() != ctx_.get_replay_compact_version()) { - memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version().get_val_for_lsn_allocator()); + memtable_->row_compact(&value_, ctx_.is_for_replay(), ctx_.get_replay_compact_version()); } } else { - memtable_->row_compact(&value_, ctx_.is_for_replay(), INT64_MAX - 100); + palf::SCN snapshot_version_for_compact = palf::SCN::minus(palf::SCN::max_scn(), 100); + memtable_->row_compact(&value_, ctx_.is_for_replay(), snapshot_version_for_compact); } } } @@ -1169,8 +1170,8 @@ int ObMvccRowCallback::log_sync(const palf::SCN scn) { int ret = OB_SUCCESS; - memtable_->set_rec_log_ts(scn.get_val_for_lsn_allocator()); - memtable_->set_max_end_log_ts(scn.get_val_for_lsn_allocator()); + memtable_->set_rec_scn(scn); + memtable_->set_max_end_scn(scn); (void)tnode_->fill_scn(scn); ctx_.update_max_submitted_seq_no(seq_no_); if (OB_FAIL(dec_unsynced_cnt_())) { diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 66b1995bb..99afd9f8e 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -107,9 +107,9 @@ ObMemtable::ObMemtable() logging_blocked_start_time(0), unset_active_memtable_logging_blocked_(false), resolve_active_memtable_left_boundary_(true), - freeze_log_ts_(INT64_MAX), - max_end_log_ts_(ObScnRange::MIN_TS), - rec_log_ts_(INT64_MAX), + freeze_scn_(palf::SCN::max_scn()), + max_end_scn_(ObScnRange::MIN_SCN), + rec_scn_(palf::SCN::max_scn()), state_(ObMemtableState::INVALID), freeze_state_(ObMemtableFreezeState::INVALID), timestamp_(0), @@ -194,7 +194,7 @@ int ObMemtable::remove_unused_callback_for_uncommited_txn_() transaction::ObTransService *txs_svr = MTL(transaction::ObTransService *); if (NULL != txs_svr - && share::ObScnRange::MAX_TS != get_end_log_ts() + && share::ObScnRange::MAX_SCN != get_end_scn() && OB_FAIL(txs_svr->remove_callback_for_uncommited_txn(this))) { TRANS_LOG(WARN, "remove callback for uncommited txn failed", K(ret), K(*this)); } @@ -245,15 +245,15 @@ void ObMemtable::destroy() logging_blocked_start_time = 0; unset_active_memtable_logging_blocked_ = false; resolve_active_memtable_left_boundary_ = true; - max_end_log_ts_ = ObScnRange::MIN_TS; - rec_log_ts_ = INT64_MAX; + max_end_scn_ = ObScnRange::MIN_SCN; + rec_scn_ = palf::SCN::max_scn(); read_barrier_ = false; is_tablet_freeze_ = false; is_force_freeze_ = false; is_flushed_ = false; is_inited_ = false; contain_hotspot_row_ = false; - snapshot_version_ = INT64_MAX; + snapshot_version_.set_max(); } //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -365,15 +365,15 @@ int ObMemtable::lock_(ObStoreCtx &ctx, } else { ObMemtableData mtd(blocksstable::ObDmlFlag::DF_LOCK, len, buf); ObTxNodeArg arg(&mtd, /*memtable_data*/ - NULL, /*old_data*/ - timestamp_, /*memstore_version*/ - ctx.mvcc_acc_ctx_.tx_scn_ /*seq_no*/); + NULL, /*old_data*/ + timestamp_, /*memstore_version*/ + ctx.mvcc_acc_ctx_.tx_scn_ /*seq_no*/); if (OB_FAIL(mvcc_write_(ctx, - &mtk, - /*used for mvcc_write on sstable*/ - read_info, - arg, - is_new_locked))) { + &mtk, + /*used for mvcc_write on sstable*/ + read_info, + arg, + is_new_locked))) { } else if (OB_UNLIKELY(!is_new_locked)) { TRANS_LOG(DEBUG, "lock twice, no need to store lock trans node", K(table_id), K(ctx)); } @@ -550,7 +550,6 @@ int ObMemtable::check_row_locked_by_myself( } else { bool is_locked = false; bool tmp_is_locked = false; - int64_t trans_version = 0; ObStoreRowLockState lock_state; const ObIArray *stores = nullptr; common::ObSEArray iter_tables; @@ -1092,7 +1091,7 @@ int ObMemtable::replay_row(ObStoreCtx &ctx, seq_no, /*seq_no*/ modify_count, /*modify_count*/ acc_checksum, /*acc_checksum*/ - scn /*log_ts*/); + scn /*scn*/); if (OB_FAIL(mtk.encode(&rowkey))) { TRANS_LOG(WARN, "mtk encode fail", "ret", ret); @@ -1105,7 +1104,7 @@ int ObMemtable::replay_row(ObStoreCtx &ctx, TRANS_LOG(WARN, "m_replay_row fail", K(ret), K(table_id), K(rowkey), K(row), K(dml_flag), K(modify_count), K(acc_checksum)); } - } else if (part_ctx->need_update_schema_version(log_id, scn.get_val_for_lsn_allocator())) { + } else if (part_ctx->need_update_schema_version(log_id, scn)) { ctx.mvcc_acc_ctx_.mem_ctx_->set_table_version(table_version); set_max_schema_version(table_version); } @@ -1131,7 +1130,7 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx, } else if (value->is_lower_lock_scaned()) { } else { bool row_locked = false; - int64_t max_trans_version = 0; + palf::SCN max_trans_version = palf::SCN::min_scn(); const ObIArray *stores = nullptr; common::ObSEArray iter_tables; ctx.table_iter_->resume(); @@ -1184,8 +1183,8 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx, row_locked |= lock_state.is_locked_; if (lock_state.is_locked_ && my_tx_id != lock_state.lock_trans_id_) { ret = OB_TRY_LOCK_ROW_CONFLICT; - } else if (max_trans_version < lock_state.trans_version_.get_val_for_lsn_allocator()) { - max_trans_version = lock_state.trans_version_.get_val_for_lsn_allocator(); + } else if (max_trans_version < lock_state.trans_version_) { + max_trans_version = lock_state.trans_version_; } } TRANS_LOG(DEBUG, "check_row_locked", K(ret), K(i), @@ -1197,13 +1196,10 @@ int ObMemtable::lock_row_on_frozen_stores_(ObStoreCtx &ctx, if (OB_SUCC(ret)) { // use tx_id = 0 indicate MvccRow's max_trans_version inherit from old table transaction::ObTransID tx_id(0); - // TODO(handora.qc): fix it - palf::SCN max_trans_scn; - max_trans_scn.convert_for_lsn_allocator(max_trans_version); - value->update_max_trans_version(max_trans_scn, tx_id); + value->update_max_trans_version(max_trans_version, tx_id); if (!row_locked) { // there is no locks on frozen stores - if (max_trans_scn > ctx.mvcc_acc_ctx_.get_snapshot_version()) { + if (max_trans_version > ctx.mvcc_acc_ctx_.get_snapshot_version()) { ret = OB_TRANSACTION_SET_VIOLATION; TRANS_LOG(WARN, "TRANS_SET_VIOLATION", K(ret), K(max_trans_version), "ctx", ctx); } @@ -1273,7 +1269,7 @@ ObDatumRange &ObMemtable::m_get_real_range(ObDatumRange &real_range, const ObDat int ObMemtable::row_compact(ObMvccRow *row, const bool for_replay, - const int64_t snapshot_version) + const palf::SCN snapshot_version) { int ret = OB_SUCCESS; ObMemtableRowCompactor row_compactor; @@ -1368,7 +1364,7 @@ int64_t ObMemtable::dec_write_ref() } else { if (0 == get_unsynced_cnt()) { resolve_right_boundary(); - if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_log_ts(), get_snapshot_version()))) { + if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_scn(), get_snapshot_version_scn()))) { TRANS_LOG(WARN, "fail to resolve left boundary for active memtable", K(ret), K(ls_id), KPC(this)); } } @@ -1401,7 +1397,7 @@ int ObMemtable::dec_unsynced_cnt() } else if (is_frozen && 0 == write_ref_cnt && 0 == unsynced_cnt) { resolve_right_boundary(); TRANS_LOG(INFO, "[resolve_right_boundary] dec_unsynced_cnt", K(ls_id), KPC(this)); - if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_log_ts(), get_snapshot_version()))) { + if (OB_FAIL(memtable_mgr_->resolve_left_boundary_for_active_memtable(this, get_end_scn(), get_snapshot_version_scn()))) { TRANS_LOG(WARN, "fail to set start log ts for active memtable", K(ret), K(ls_id), KPC(this)); } TRANS_LOG(INFO, "memtable log synced", K(ret), K(ls_id), KPC(this)); @@ -1441,24 +1437,24 @@ int ObMemtable::get_frozen_schema_version(int64_t &schema_version) const return OB_NOT_SUPPORTED; } -int ObMemtable::set_snapshot_version(const int64_t snapshot_version) +int ObMemtable::set_snapshot_version(const palf::SCN snapshot_version) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (ObVersionRange::MAX_VERSION == snapshot_version - || ObVersionRange::MIN_VERSION >= snapshot_version) { + } else if (snapshot_version.is_max() + || !snapshot_version.is_valid()) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid args", K(ret), K(snapshot_version)); - } else if (snapshot_version_ == ObVersionRange::MAX_VERSION) { + } else if (snapshot_version_.is_max()) { snapshot_version_ = snapshot_version; } return ret; } -int ObMemtable::set_rec_log_ts(int64_t rec_log_ts) +int ObMemtable::set_rec_scn(palf::SCN rec_scn) { int ret = OB_SUCCESS; share::ObLSID ls_id = freezer_->get_ls_id(); @@ -1466,19 +1462,19 @@ int ObMemtable::set_rec_log_ts(int64_t rec_log_ts) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (ObScnRange::MAX_TS ==rec_log_ts) { + } else if (rec_scn.is_max()) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid args", K(ret), K(rec_log_ts)); - } else if (rec_log_ts <= get_start_log_ts()) { + TRANS_LOG(WARN, "invalid args", K(ret), K(rec_scn)); + } else if (rec_scn <= get_start_scn()) { ret = OB_LOG_TS_OUT_OF_BOUND; - TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts", K(ret), K(rec_log_ts), K(ls_id), KPC(this)); + TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts", K(ret), K(rec_scn), K(ls_id), KPC(this)); } else { - int64_t old_rec_log_ts = 0; - int64_t new_rec_log_ts = get_rec_log_ts(); - while ((old_rec_log_ts = new_rec_log_ts) > rec_log_ts) { - if ((new_rec_log_ts = ATOMIC_VCAS(&rec_log_ts_, old_rec_log_ts, rec_log_ts)) - == old_rec_log_ts) { - new_rec_log_ts = rec_log_ts; + palf::SCN old_rec_scn; + palf::SCN new_rec_scn = get_rec_scn(); + while ((old_rec_scn = new_rec_scn) > rec_scn) { + if ((new_rec_scn = rec_scn_.atomic_vcas(old_rec_scn, rec_scn)) + == old_rec_scn) { + new_rec_scn = rec_scn; } } } @@ -1486,7 +1482,7 @@ int ObMemtable::set_rec_log_ts(int64_t rec_log_ts) return ret; } -int ObMemtable::set_start_log_ts(const int64_t start_ts) +int ObMemtable::set_start_scn(const palf::SCN start_scn) { int ret = OB_SUCCESS; share::ObLSID ls_id = freezer_->get_ls_id(); @@ -1494,24 +1490,22 @@ int ObMemtable::set_start_log_ts(const int64_t start_ts) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (ObScnRange::MAX_TS == start_ts) { + } else if (ObScnRange::MAX_SCN == start_scn) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid args", K(ret), K(start_ts)); - } else if (start_ts >= get_end_log_ts() - || (max_end_log_ts_ != 0 && start_ts >= max_end_log_ts_) - || start_ts >= rec_log_ts_) { + TRANS_LOG(WARN, "invalid args", K(ret), K(start_scn)); + } else if (start_scn >= get_end_scn() + || (max_end_scn_ != palf::SCN::min_scn() && start_scn >= max_end_scn_) + || start_scn >= rec_scn_) { ret = OB_LOG_TS_OUT_OF_BOUND; - TRANS_LOG(ERROR, "cannot set start ts now", K(ret), K(start_ts), K(ls_id), KPC(this)); + TRANS_LOG(ERROR, "cannot set start ts now", K(ret), K(start_scn), K(ls_id), KPC(this)); } else { - palf::SCN tmp; - tmp.convert_for_gts(start_ts); - key_.scn_range_.start_scn_.atomic_set(tmp); + key_.scn_range_.start_scn_ = start_scn; } return ret; } -int ObMemtable::set_end_log_ts(const int64_t freeze_ts) +int ObMemtable::set_end_scn(const palf::SCN freeze_scn) { int ret = OB_SUCCESS; share::ObLSID ls_id = freezer_->get_ls_id(); @@ -1519,36 +1513,31 @@ int ObMemtable::set_end_log_ts(const int64_t freeze_ts) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (ObScnRange::MAX_TS == freeze_ts) { + } else if (ObScnRange::MAX_SCN == freeze_scn) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid args", K(ret), K(freeze_ts)); - } else if (freeze_ts < get_start_log_ts()) { + TRANS_LOG(WARN, "invalid args", K(ret), K(freeze_scn)); + } else if (freeze_scn < get_start_scn()) { ret = OB_LOG_TS_OUT_OF_BOUND; TRANS_LOG(ERROR, "cannot set freeze log ts smaller to start log ts", - K(ret), K(freeze_ts), K(ls_id), KPC(this)); + K(ret), K(freeze_scn), K(ls_id), KPC(this)); } else { - int64_t old_end_log_ts = 0; - int64_t new_end_log_ts = get_end_log_ts(); - while ((old_end_log_ts = new_end_log_ts) < freeze_ts - || new_end_log_ts == ObScnRange::MAX_TS) { - //TODO(SCN) TMP Code for compiling, fix AOTIMIC opt with SCN - if (key_.scn_range_.end_scn_.get_val_for_inner_table_field() == old_end_log_ts) { - key_.scn_range_.end_scn_.convert_for_gts(freeze_ts); - new_end_log_ts = freeze_ts; + palf::SCN old_end_scn; + palf::SCN new_end_scn = get_end_scn(); + while ((old_end_scn = new_end_scn) < freeze_scn + || new_end_scn == ObScnRange::MAX_SCN) { + if ((new_end_scn = + key_.scn_range_.end_scn_.atomic_vcas(old_end_scn, freeze_scn)) + == old_end_scn) { + new_end_scn = freeze_scn; } - // if ((new_end_log_ts = - // ATOMIC_VCAS(&(key_.scn_range_.end_scn_), old_end_log_ts, freeze_ts)) - // == old_end_log_ts) { - // new_end_log_ts = freeze_ts; - // } } - freeze_log_ts_ = freeze_ts; + freeze_scn_ = freeze_scn; } return ret; } -int ObMemtable::set_max_end_log_ts(const int64_t log_ts) +int ObMemtable::set_max_end_scn(const palf::SCN scn) { int ret = OB_SUCCESS; share::ObLSID ls_id = freezer_->get_ls_id(); @@ -1556,21 +1545,21 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "not inited", K(ret)); - } else if (ObScnRange::MAX_TS == log_ts) { + } else if (ObScnRange::MAX_SCN == scn) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid args", K(ret), K(log_ts)); - } else if (log_ts <= get_start_log_ts() || log_ts > get_end_log_ts()) { + TRANS_LOG(WARN, "invalid args", K(ret), K(scn)); + } else if (scn <= get_start_scn() || scn > get_end_scn()) { ret = OB_LOG_TS_OUT_OF_BOUND; TRANS_LOG(WARN, "cannot set max end log ts smaller to start log ts", - K(ret), K(log_ts), K(ls_id), KPC(this)); + K(ret), K(scn), K(ls_id), KPC(this)); } else { - int64_t old_max_end_log_ts = 0; - int64_t new_max_end_log_ts = get_max_end_log_ts(); - while ((old_max_end_log_ts = new_max_end_log_ts) < log_ts) { - if ((new_max_end_log_ts = - ATOMIC_VCAS(&max_end_log_ts_, old_max_end_log_ts, log_ts)) - == old_max_end_log_ts) { - new_max_end_log_ts = log_ts; + palf::SCN old_max_end_scn; + palf::SCN new_max_end_scn = get_max_end_scn(); + while ((old_max_end_scn = new_max_end_scn) < scn) { + if ((new_max_end_scn = + max_end_scn_.atomic_vcas(old_max_end_scn, scn)) + == old_max_end_scn) { + new_max_end_scn = scn; } } } @@ -1581,41 +1570,41 @@ int ObMemtable::set_max_end_log_ts(const int64_t log_ts) bool ObMemtable::rec_scn_is_stable() { int ret = OB_SUCCESS; - bool rec_log_ts_is_stable = false; - if (INT64_MAX == rec_log_ts_) { - rec_log_ts_is_stable = (is_frozen_memtable() && write_ref_cnt_ == 0 && unsynced_cnt_ == 0); + bool rec_scn_is_stable = false; + if (palf::SCN::max_scn() == rec_scn_) { + rec_scn_is_stable = (is_frozen_memtable() && write_ref_cnt_ == 0 && unsynced_cnt_ == 0); } else { - int64_t max_consequent_callbacked_log_ts = INT64_MAX; - if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(max_consequent_callbacked_log_ts))) { - STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id())); + palf::SCN max_consequent_callbacked_scn; + if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) { + STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer_->get_ls_id())); } else { - rec_log_ts_is_stable = (max_consequent_callbacked_log_ts >= rec_log_ts_); + rec_scn_is_stable = (max_consequent_callbacked_scn >= rec_scn_); } - if (!rec_log_ts_is_stable && + if (!rec_scn_is_stable && (mt_stat_.frozen_time_ != 0 && ObTimeUtility::current_time() - mt_stat_.frozen_time_ > 10 * 1000 * 1000L)) { - STORAGE_LOG(WARN, "memtable rec_log_ts not stable for long time", + STORAGE_LOG(WARN, "memtable rec_scn not stable for long time", K(freezer_->get_ls_id()), K(*this), K(mt_stat_.frozen_time_), - K(max_consequent_callbacked_log_ts)); + K(max_consequent_callbacked_scn)); ADD_SUSPECT_INFO(MINI_MERGE, freezer_->get_ls_id(), get_tablet_id(), - "memtable rec_log_ts not stable", - K(rec_log_ts_), - K(max_consequent_callbacked_log_ts)); + "memtable rec_scn not stable", + K(rec_scn_), + K(max_consequent_callbacked_scn)); } } - return rec_log_ts_is_stable; + return rec_scn_is_stable; } -int ObMemtable::get_current_right_boundary(int64_t ¤t_right_boundary) +int ObMemtable::get_current_right_boundary(palf::SCN ¤t_right_boundary) { int ret = OB_SUCCESS; if (OB_ISNULL(freezer_)) { ret = OB_ENTRY_NOT_EXIST; TRANS_LOG(WARN, "freezer should not be null", K(ret)); - } else if (OB_FAIL(freezer_->get_max_consequent_callbacked_log_ts(current_right_boundary))) { - TRANS_LOG(WARN, "fail to get min_unreplay_log_ts", K(ret), K(current_right_boundary)); + } else if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(current_right_boundary))) { + TRANS_LOG(WARN, "fail to get min_unreplay_scn", K(ret), K(current_right_boundary)); } return ret; @@ -1638,12 +1627,12 @@ bool ObMemtable::ready_for_flush_() bool bool_ret = is_frozen_memtable() && 0 == get_write_ref() && 0 == get_unsynced_cnt(); int ret = OB_SUCCESS; - int64_t current_right_boundary = ObScnRange::MIN_TS; + palf::SCN current_right_boundary = ObScnRange::MIN_SCN; share::ObLSID ls_id = freezer_->get_ls_id(); if (bool_ret) { if (OB_FAIL(resolve_snapshot_version_())) { TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id)); - } else if (OB_FAIL(resolve_max_end_log_ts_())) { + } else if (OB_FAIL(resolve_max_end_scn_())) { TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id)); } else { resolve_right_boundary(); @@ -1651,7 +1640,7 @@ bool ObMemtable::ready_for_flush_() if (OB_FAIL(get_current_right_boundary(current_right_boundary))) { TRANS_LOG(WARN, "fail to get current right boundary", K(ret)); } - bool_ret = current_right_boundary >= get_end_log_ts() && + bool_ret = current_right_boundary >= get_end_scn() && (is_empty() || get_resolve_active_memtable_left_boundary()); if (bool_ret) { freeze_state_ = ObMemtableFreezeState::READY_FOR_FLUSH; @@ -1681,14 +1670,14 @@ bool ObMemtable::ready_for_flush_() K(get_write_ref()), K(get_unsynced_cnt()), K(current_right_boundary), - K(get_end_log_ts())); + K(get_end_scn())); freezer_->get_stat().add_memtable_info(get_tablet_id(), - get_start_log_scn(), - get_end_log_scn(), + get_start_scn(), + get_end_scn(), get_write_ref(), get_unsubmitted_cnt(), get_unsynced_cnt(), - current_right_boundary); + current_right_boundary.get_val_for_tx()); } return bool_ret; @@ -1702,8 +1691,8 @@ void ObMemtable::print_ready_for_flush() bool frozen_memtable_flag = is_frozen_memtable(); int64_t write_ref = get_write_ref(); int64_t unsynced_cnt = get_unsynced_cnt(); - int64_t end_log_ts = get_end_log_ts(); - int64_t current_right_boundary = ObScnRange::MIN_TS; + palf::SCN end_scn = get_end_scn(); + palf::SCN current_right_boundary; uint32_t logstream_freeze_clock = freezer_->get_freeze_clock(); uint32_t memtable_freeze_clock = freeze_clock_; if (OB_FAIL(get_current_right_boundary(current_right_boundary))) { @@ -1712,13 +1701,13 @@ void ObMemtable::print_ready_for_flush() bool bool_ret = frozen_memtable_flag && 0 == write_ref && 0 == unsynced_cnt && - current_right_boundary >= end_log_ts; + current_right_boundary >= end_scn; TRANS_LOG(INFO, "[ObFreezer] print_ready_for_flush", KP(this), K(ls_id), K(tablet_id), K(ret), K(bool_ret), K(frozen_memtable_flag), K(write_ref), K(unsynced_cnt), - K(current_right_boundary), K(end_log_ts), + K(current_right_boundary), K(end_scn), K(logstream_freeze_clock), K(memtable_freeze_clock)); } @@ -1756,16 +1745,16 @@ void ObMemtable::print_ready_for_flush() int ObMemtable::resolve_snapshot_version_() { int ret = OB_SUCCESS; - int64_t freeze_snapshot_version = OB_INVALID_TIMESTAMP; + palf::SCN freeze_snapshot_version; - if (snapshot_version_ != ObVersionRange::MAX_VERSION) { + if (snapshot_version_ != palf::SCN::max_scn()) { // Pass if snapshot is already set } else if (OB_ISNULL(freezer_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "freezer should not be null", K(ret)); - } else if (FALSE_IT(freeze_snapshot_version = freezer_->get_freeze_snapshot_version().get_val_for_lsn_allocator())) { + } else if (FALSE_IT(freeze_snapshot_version = freezer_->get_freeze_snapshot_version())) { TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret)); - } else if (OB_INVALID_TIMESTAMP == freeze_snapshot_version) { + } else if (palf::SCN::invalid_scn() == freeze_snapshot_version) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret), KPC(this)); } else if (OB_FAIL(set_snapshot_version(freeze_snapshot_version))) { @@ -1775,30 +1764,30 @@ int ObMemtable::resolve_snapshot_version_() return ret; } -// The max_decided log ts is used to push up the end_log_ts of the memtable +// The max_decided log ts is used to push up the end_scn of the memtable // using the max decided log ts. -// Before the revision, the end_log_ts of the memtable is the max committed log +// Before the revision, the end_scn of the memtable is the max committed log // ts of the data on the memtable. So for all 2pc txn and some 1pc txn whose -// data log is seperated with the commit log, the end_log_ts of the memtable is -// smaller than the commit_log_ts of the txn. And when the merge happens, the +// data log is seperated with the commit log, the end_scn of the memtable is +// smaller than the commit_scn of the txn. And when the merge happens, the // txn node will therefore not be cleanout. And the read after merge will be // very slow due to tx data table lookup. // So finally we decide to use the max decoded log ts of the ls to update the -// end_log_ts of the memtable -int ObMemtable::resolve_max_end_log_ts_() +// end_scn of the memtable +int ObMemtable::resolve_max_end_scn_() { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - int64_t max_decided_log_ts = OB_INVALID_TIMESTAMP; + palf::SCN max_decided_scn; if (OB_ISNULL(freezer_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "freezer should not be null", K(ret)); - } else if (FALSE_IT(max_decided_log_ts = freezer_->get_max_decided_scn().get_val_for_lsn_allocator())) { + } else if (FALSE_IT(max_decided_scn = freezer_->get_max_decided_scn())) { TRANS_LOG(ERROR, "fail to get freeze_snapshot_version", K(ret)); - } else if (OB_INVALID_TIMESTAMP == max_decided_log_ts) { + } else if (palf::SCN::invalid_scn() == max_decided_scn) { // Pass if not necessary - } else if (OB_TMP_FAIL(set_max_end_log_ts(max_decided_log_ts))) { + } else if (OB_TMP_FAIL(set_max_end_scn(max_decided_scn))) { // ignore the error code } @@ -1807,25 +1796,25 @@ int ObMemtable::resolve_max_end_log_ts_() int ObMemtable::resolve_right_boundary() { - int64_t max_end_log_ts = get_max_end_log_ts(); - int64_t end_log_ts = max_end_log_ts; - int64_t start_log_ts = get_start_log_ts(); + palf::SCN max_end_scn = get_max_end_scn(); + palf::SCN end_scn = max_end_scn; + palf::SCN start_scn = get_start_scn(); int ret = OB_SUCCESS; - if (ObScnRange::MIN_TS == max_end_log_ts) { - end_log_ts = start_log_ts; + if (ObScnRange::MIN_SCN == max_end_scn) { + end_scn = start_scn; (void)freezer_->inc_empty_memtable_cnt(); } - if (OB_FAIL(set_end_log_ts(end_log_ts))) { - TRANS_LOG(ERROR, "fail to set end_log_ts", K(ret)); + if (OB_FAIL(set_end_scn(end_scn))) { + TRANS_LOG(ERROR, "fail to set end_scn", K(ret)); } return ret; } -void ObMemtable::resolve_left_boundary(int64_t end_log_ts) +void ObMemtable::resolve_left_boundary(palf::SCN end_scn) { - set_start_log_ts(end_log_ts); + set_start_scn(end_scn); } void ObMemtable::set_freeze_state(const int64_t state) @@ -1890,13 +1879,6 @@ int ObMemtable::flush(share::ObLSID ls_id) return ret; } -palf::SCN ObMemtable::get_rec_scn() -{ - palf::SCN tmp; - tmp.convert_for_lsn_allocator(get_rec_log_ts()); - return tmp; -} - bool ObMemtable::is_active_memtable() const { return !is_frozen_memtable(); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index 71d7389bf..a9bbb5d4d 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -283,7 +283,7 @@ public: storage::ObStoreRowIterator *&row_iter) override; // replay_row is used to replay rows in redo log for follower - // ctx is the writer tx's context, we need the log_ts, tx_id for fulfilling the tx node + // ctx is the writer tx's context, we need the scn, tx_id for fulfilling the tx node // mmi is mutator iterator for replay // decrypt_buf is used for decryption virtual int replay_row( @@ -319,7 +319,7 @@ public: inline bool not_empty() const { return INT64_MAX != get_protection_clock(); }; void set_max_schema_version(const int64_t schema_version); virtual int64_t get_max_schema_version() const override; - int row_compact(ObMvccRow *value, const bool for_replay, const int64_t snapshot_version); + int row_compact(ObMvccRow *value, const bool for_replay, const palf::SCN snapshot_version); int64_t get_hash_item_count() const; int64_t get_hash_alloc_memory() const; int64_t get_btree_item_count() const; @@ -330,12 +330,12 @@ public: virtual bool is_active_memtable() const override; virtual bool is_inner_tablet() const { return key_.tablet_id_.is_inner_tablet(); } ObTabletID get_tablet_id() const { return key_.tablet_id_; } - int set_snapshot_version(const int64_t snapshot_version); + int set_snapshot_version(const palf::SCN snapshot_version); int64_t get_memtable_state() const { return state_; } int64_t get_freeze_state() const { return freeze_state_; } int64_t get_protection_clock() const { return local_allocator_.get_protection_clock(); } int64_t get_retire_clock() const { return local_allocator_.get_retire_clock(); } - int get_current_right_boundary(int64_t ¤t_right_boundary); + int get_current_right_boundary(palf::SCN ¤t_right_boundary); inline bool& get_read_barrier() { return read_barrier_; } inline void set_write_barrier() { write_barrier_ = true; } @@ -374,49 +374,32 @@ public: virtual bool ready_for_flush() override; void print_ready_for_flush(); virtual int flush(share::ObLSID ls_id) override; - virtual int64_t get_rec_log_ts() { return ATOMIC_LOAD(&rec_log_ts_); } - virtual palf::SCN get_rec_scn(); + palf::SCN get_rec_scn() { return rec_scn_.atomic_get(); } virtual bool is_frozen_checkpoint() const override { return is_frozen_memtable();} virtual bool is_active_checkpoint() const override { return is_active_memtable();} - virtual OB_INLINE palf::SCN get_end_log_scn() const + virtual OB_INLINE palf::SCN get_end_scn() const { - palf::SCN end_scn = palf::SCN::max_scn(); - end_scn.atomic_set(key_.scn_range_.end_scn_); - return end_scn; + return key_.scn_range_.end_scn_; } - virtual OB_INLINE palf::SCN get_start_log_scn() const + virtual OB_INLINE palf::SCN get_start_scn() const { - palf::SCN start_scn = palf::SCN::min_scn(); - start_scn.atomic_set(key_.scn_range_.start_scn_); - return start_scn; - } - virtual OB_INLINE int64_t get_end_log_ts() const override - { - palf::SCN end_scn = palf::SCN::max_scn(); - end_scn.atomic_set(key_.scn_range_.end_scn_); - return end_scn.get_val_for_inner_table_field(); - } - virtual OB_INLINE int64_t get_start_log_ts() const override - { - palf::SCN start_scn = palf::SCN::min_scn(); - start_scn.atomic_set(key_.scn_range_.start_scn_); - return start_scn.get_val_for_inner_table_field(); + return key_.scn_range_.start_scn_; } bool is_empty() { - return get_end_log_ts() == get_start_log_ts() && - share::ObScnRange::MIN_TS == get_max_end_log_ts(); + return get_end_scn() == get_start_scn() && + share::ObScnRange::MIN_SCN == get_max_end_scn(); } int resolve_right_boundary(); - void resolve_left_boundary(int64_t end_log_ts); + void resolve_left_boundary(palf::SCN end_scn); int resolve_snapshot_version_(); - int resolve_max_end_log_ts_(); - int64_t get_max_end_log_ts() const { return ATOMIC_LOAD(&max_end_log_ts_); } - int set_rec_log_ts(int64_t rec_log_ts); - int set_start_log_ts(const int64_t start_ts); - int set_end_log_ts(const int64_t freeze_ts); - int set_max_end_log_ts(const int64_t log_ts); + int resolve_max_end_scn_(); + palf::SCN get_max_end_scn() const { return max_end_scn_.atomic_get(); } + int set_rec_scn(palf::SCN rec_scn); + int set_start_scn(const palf::SCN start_ts); + int set_end_scn(const palf::SCN freeze_ts); + int set_max_end_scn(const palf::SCN scn); inline int set_logging_blocked() { logging_blocked_start_time = common::ObTimeUtility::current_time(); @@ -455,7 +438,7 @@ public: template int save_multi_source_data_unit(const T *const multi_source_data_unit, - const int64_t log_ts, + const palf::SCN scn, const bool for_replay, const MemtableRefOp ref_op = MemtableRefOp::NONE, const bool is_callback = false); @@ -467,7 +450,7 @@ public: K_(freeze_clock), K_(max_schema_version), K_(write_ref_cnt), K_(local_allocator), K_(unsubmitted_cnt), K_(unsynced_cnt), K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolve_active_memtable_left_boundary), - K_(contain_hotspot_row), K_(max_end_log_ts), K_(rec_log_ts), K_(snapshot_version), + K_(contain_hotspot_row), K_(max_end_scn), K_(rec_scn), K_(snapshot_version), K_(is_tablet_freeze), K_(is_force_freeze), K_(contain_hotspot_row), K_(read_barrier), K_(is_flushed), K_(freeze_state)); private: @@ -542,9 +525,9 @@ private: int64_t logging_blocked_start_time; // record the start time of logging blocked bool unset_active_memtable_logging_blocked_; bool resolve_active_memtable_left_boundary_; - int64_t freeze_log_ts_; - int64_t max_end_log_ts_; - int64_t rec_log_ts_; + palf::SCN freeze_scn_; + palf::SCN max_end_scn_; + palf::SCN rec_scn_; int64_t state_; int64_t freeze_state_; int64_t timestamp_; @@ -563,7 +546,7 @@ private: template int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_unit, - const int64_t log_ts, + const palf::SCN scn, const bool for_replay, const MemtableRefOp ref_op, const bool is_callback) @@ -597,19 +580,19 @@ int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_uni TRANS_LOG(WARN, "fail to inc_cnt_for_multi_data", K(multi_source_data_unit), K(type), KPC(this)); } } - if (log_ts > get_start_log_ts() && log_ts < share::ObScnRange::MAX_TS) { + if (scn > get_start_scn() && scn < share::ObScnRange::MAX_SCN) { if (OB_FAIL(ret)) { } - // skip updating max_end_log_ts of frozen memtable for commit/abort when replay clog. - else if ((share::ObScnRange::MAX_TS == get_end_log_ts() || !for_replay || !is_callback) - && OB_FAIL(set_max_end_log_ts(log_ts))) { - TRANS_LOG(WARN, "failed to set max_end_log_ts", K(ret), K(log_ts), KPC(this)); - } else if (OB_FAIL(set_rec_log_ts(log_ts))) { - TRANS_LOG(WARN, "failed to set rec_log_ts", K(ret), K(log_ts), KPC(this)); + // skip updating max_end_scn of frozen memtable for commit/abort when replay clog. + else if ((share::ObScnRange::MAX_SCN == get_end_scn() || !for_replay || !is_callback) + && OB_FAIL(set_max_end_scn(scn))) { + TRANS_LOG(WARN, "failed to set max_end_scn", K(ret), K(scn), KPC(this)); + } else if (OB_FAIL(set_rec_scn(scn))) { + TRANS_LOG(WARN, "failed to set rec_scn", K(ret), K(scn), KPC(this)); } - } else if (-1 != log_ts && share::ObScnRange::MAX_TS != log_ts) { + } else if (palf::SCN::invalid_scn() != scn && share::ObScnRange::MAX_SCN != scn) { ret = common::OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "invalid log_ts", K(ret), K(log_ts), KPC(this)); + TRANS_LOG(WARN, "invalid scn", K(ret), K(scn), KPC(this)); } if (MemtableRefOp::DEC_REF == ref_op) { @@ -621,7 +604,7 @@ int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_uni } } } - TRANS_LOG(INFO, "memtable save multi source data unit", K(ret), K(log_ts), K(ref_op), + TRANS_LOG(INFO, "memtable save multi source data unit", K(ret), K(scn), K(ref_op), KPC(multi_source_data_unit), K(type), KPC(this)); } diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index c1df68930..c7eee61e9 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -196,7 +196,7 @@ int64_t ObMemtableCtx::to_string(char *buf, const int64_t buf_len) const common::databuff_printf(buf, buf_len, pos, " end_code=%d is_readonly=%s ref=%ld trans_id=%s ls_id=%ld " "callback_alloc_count=%ld callback_free_count=%ld " - "checksum=%lu tmp_checksum=%lu checksum_log_ts=%s " + "checksum=%lu tmp_checksum=%lu checksum_scn=%s " "redo_filled_count=%ld redo_sync_succ_count=%ld " "redo_sync_fail_count=%ld main_list_length=%ld " "unsynced_cnt=%ld unsubmitted_cnt_=%ld " @@ -438,26 +438,20 @@ int ObMemtableCtx::trans_begin() return ret; } -int ObMemtableCtx::replay_begin(const int64_t log_timestamp) +int ObMemtableCtx::replay_begin(const palf::SCN scn) { ObByteLockGuard guard(lock_); - // TODO(handora.qc): fix it - palf::SCN scn; - scn.convert_for_lsn_allocator(log_timestamp); set_redo_scn(scn); - // TODO set redo log id + return OB_SUCCESS; } int ObMemtableCtx::replay_end(const bool is_replay_succ, - const int64_t log_timestamp) + const palf::SCN scn) { int ret = OB_SUCCESS; ObByteLockGuard guard(lock_); - // TODO(handora.qc): fix it - palf::SCN scn; - scn.convert_for_lsn_allocator(log_timestamp); if (!is_replay_succ) { ret = trans_mgr_.replay_fail(scn); @@ -468,13 +462,10 @@ int ObMemtableCtx::replay_end(const bool is_replay_succ, return ret; } -int ObMemtableCtx::rollback_redo_callbacks(const int64_t log_timestamp) +int ObMemtableCtx::rollback_redo_callbacks(const palf::SCN scn) { int ret = OB_SUCCESS; ObByteLockGuard guard(lock_); - // TODO(handora.qc): fix it - palf::SCN scn; - scn.convert_for_lsn_allocator(log_timestamp); ret = trans_mgr_.replay_fail(scn); @@ -483,14 +474,14 @@ int ObMemtableCtx::rollback_redo_callbacks(const int64_t log_timestamp) int ObMemtableCtx::trans_end( const bool commit, - const int64_t trans_version, - const int64_t final_log_ts) + const palf::SCN trans_version, + const palf::SCN final_scn) { int ret = OB_SUCCESS; ret = do_trans_end(commit, trans_version, - final_log_ts, + final_scn, commit ? OB_TRANS_COMMITED : OB_TRANS_ROLLBACKED); return ret; @@ -513,8 +504,8 @@ int ObMemtableCtx::elr_trans_preparing() int ObMemtableCtx::do_trans_end( const bool commit, - const int64_t trans_version, - const int64_t final_log_ts, + const palf::SCN trans_version, + const palf::SCN final_scn, const int end_code) { int ret = OB_SUCCESS; @@ -527,12 +518,8 @@ int ObMemtableCtx::do_trans_end( ob_abort(); } if (partial_rollbacked || OB_SUCCESS == ATOMIC_LOAD(&end_code_)) { - // TODO(handora.qc): fix it - palf::SCN commit_scn; - commit_scn.convert_for_lsn_allocator(trans_version); - ATOMIC_STORE(&end_code_, end_code); - set_commit_version(commit_scn); + set_commit_version(trans_version); if (OB_FAIL(trans_mgr_.trans_end(commit))) { TRANS_LOG(WARN, "trans end error", K(ret), K(*this)); } @@ -543,9 +530,9 @@ int ObMemtableCtx::do_trans_end( } // release durable table lock if (OB_FAIL(ret)) { - UNUSED(final_log_ts); + UNUSED(final_scn); //commit or abort log ts for clear table lock - } else if (OB_FAIL(clear_table_lock_(commit, trans_version, final_log_ts))) { + } else if (OB_FAIL(clear_table_lock_(commit, trans_version, final_scn))) { TRANS_LOG(ERROR, "clear table lock failed.", K(ret), K(*this)); } (void)partition_audit_info_cache_.stmt_end_update_audit_info(commit); @@ -562,7 +549,7 @@ int ObMemtableCtx::trans_kill() { int ret = OB_SUCCESS; bool commit = false; - ret = do_trans_end(commit, INT64_MAX, INT64_MAX, OB_TRANS_KILLED); + ret = do_trans_end(commit, palf::SCN::max_scn(), palf::SCN::max_scn(), OB_TRANS_KILLED); return ret; } @@ -585,15 +572,15 @@ int ObMemtableCtx::trans_replay_begin() } int ObMemtableCtx::trans_replay_end(const bool commit, - const int64_t trans_version, - const int64_t final_log_ts, + const palf::SCN trans_version, + const palf::SCN final_scn, const uint64_t log_cluster_version, const uint64_t checksum) { int ret = OB_SUCCESS; int cs_ret = OB_SUCCESS; - // We must calculate the checksum and generate the checksum_log_ts even when + // We must calculate the checksum and generate the checksum_scn even when // the checksum verification is unnecessary. This because the trans table // merge may be triggered after clear state in which the callback has already @@ -610,7 +597,7 @@ int ObMemtableCtx::trans_replay_end(const bool commit, } } - if (OB_FAIL(trans_end(commit, trans_version, final_log_ts))) { + if (OB_FAIL(trans_end(commit, trans_version, final_scn))) { TRANS_LOG(ERROR, "trans_end fail", K(ret), K(*this)); } else { ret = cs_ret; @@ -696,11 +683,11 @@ int ObMemtableCtx::log_submitted(const ObRedoLogSubmitHelper &helper) return log_gen_.log_submitted(helper.callbacks_); } -int ObMemtableCtx::sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks) +int ObMemtableCtx::sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks) { int ret = OB_SUCCESS; - if (OB_FAIL(log_gen_.sync_log_succ(log_ts, callbacks))) { + if (OB_FAIL(log_gen_.sync_log_succ(scn, callbacks))) { TRANS_LOG(WARN, "sync log failed", K(ret)); } @@ -1022,33 +1009,24 @@ int ObMemtableCtx::reuse_log_generator_() return ret; } -int ObMemtableCtx::calc_checksum_before_log_ts(const int64_t log_ts, - uint64_t &checksum, - int64_t &checksum_log_ts) +int ObMemtableCtx::calc_checksum_before_scn(const palf::SCN scn, + uint64_t &checksum, + palf::SCN &checksum_scn) { int ret = OB_SUCCESS; ObByteLockGuard guard(lock_); - // TODO(handora.qc): fix it - palf::SCN checksum_scn; - palf::SCN scn; - scn.convert_for_lsn_allocator(log_ts); if (OB_FAIL(trans_mgr_.calc_checksum_before_scn(scn, checksum, checksum_scn))) { - TRANS_LOG(ERROR, "calc checksum before log ts should not report error", K(ret), K(log_ts)); + TRANS_LOG(ERROR, "calc checksum before log ts should not report error", K(ret), K(scn)); } - checksum_log_ts = checksum_scn.get_val_for_lsn_allocator(); - return ret; } void ObMemtableCtx::update_checksum(const uint64_t checksum, - const int64_t checksum_log_ts) + const palf::SCN checksum_scn) { ObByteLockGuard guard(lock_); - // TODO(handora.qc): fix it - palf::SCN checksum_scn; - checksum_scn.convert_for_lsn_allocator(checksum_log_ts); trans_mgr_.update_checksum(checksum, checksum_scn); } @@ -1267,20 +1245,6 @@ void ObMemtableCtx::set_log_synced(ObMemCtxLockOpLinkNode *lock_op, const palf:: lock_mem_ctx_.set_log_synced(lock_op, scn); } -int ObMemtableCtx::clear_table_lock_(const bool is_commit, - const int64_t commit_version, - const int64_t commit_log_ts) -{ - int ret = OB_SUCCESS; - // TODO: cxf remove it - palf::SCN version; - palf::SCN scn; - version.convert_for_lsn_allocator(commit_version); - scn.convert_for_lsn_allocator(commit_log_ts); - ret = clear_table_lock_(is_commit, version, scn); - return ret; -} - int ObMemtableCtx::clear_table_lock_(const bool is_commit, const palf::SCN &commit_version, const palf::SCN &commit_scn) @@ -1347,17 +1311,6 @@ int ObMemtableCtx::register_multi_source_data_if_need_( return ret; } -int ObMemtableCtx::replay_lock(const tablelock::ObTableLockOp &lock_op, - const int64_t log_ts) -{ - int ret = OB_SUCCESS; - // TODO: remove this - palf::SCN tmp; - tmp.convert_for_lsn_allocator(log_ts); - ret = replay_lock(lock_op, tmp); - return ret; -} - int ObMemtableCtx::replay_lock(const tablelock::ObTableLockOp &lock_op, const palf::SCN &scn) { diff --git a/src/storage/memtable/ob_memtable_context.h b/src/storage/memtable/ob_memtable_context.h index 90aa8d6f5..b9eb3ee2a 100644 --- a/src/storage/memtable/ob_memtable_context.h +++ b/src/storage/memtable/ob_memtable_context.h @@ -346,23 +346,23 @@ public: virtual int write_auth(const bool exclusive); virtual int write_done(); virtual int trans_begin(); - virtual int replay_begin(const int64_t log_timestamp); + virtual int replay_begin(const palf::SCN scn); virtual int replay_end(const bool is_replay_succ, - const int64_t log_timestamp); - int rollback_redo_callbacks(const int64_t log_timestamp); + const palf::SCN scn); + int rollback_redo_callbacks(const palf::SCN scn); virtual uint64_t calc_checksum_all(); virtual void print_callbacks(); virtual int trans_end(const bool commit, - const int64_t trans_version, - const int64_t final_log_ts); + const palf::SCN trans_version, + const palf::SCN final_scn); virtual int trans_clear(); virtual int elr_trans_preparing(); virtual int trans_kill(); virtual int trans_publish(); virtual int trans_replay_begin(); virtual int trans_replay_end(const bool commit, - const int64_t trans_version, - const int64_t final_log_ts, + const palf::SCN trans_version, + const palf::SCN final_scn, const uint64_t log_cluster_version = 0, const uint64_t checksum = 0); //method called when leader takeover @@ -389,17 +389,17 @@ public: int64_t &buf_pos, ObRedoLogSubmitHelper &helper, const bool log_for_lock_node = true); - int calc_checksum_before_log_ts(const int64_t log_ts, - uint64_t &checksum, - int64_t &checksum_log_ts); + int calc_checksum_before_scn(const palf::SCN scn, + uint64_t &checksum, + palf::SCN &checksum_scn); void update_checksum(const uint64_t checksum, - const int64_t checksum_log_ts); + const palf::SCN checksum_scn); int log_submitted(const ObRedoLogSubmitHelper &helper); // the function apply the side effect of dirty txn and return whether // remaining pending callbacks. // NB: the fact whether there remains pending callbacks currently is only used // for continuing logging when minor freeze - int sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks); + int sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks); void sync_log_fail(const ObCallbackScope &callbacks); bool is_slow_query() const; virtual void set_trans_ctx(transaction::ObPartTransCtx *ctx); @@ -454,7 +454,7 @@ public: void replay_done(); int64_t get_checksum() const { return trans_mgr_.get_checksum(); } int64_t get_tmp_checksum() const { return trans_mgr_.get_tmp_checksum(); } - palf::SCN get_checksum_log_ts() const { return trans_mgr_.get_checksum_scn(); } + palf::SCN get_checksum_scn() const { return trans_mgr_.get_checksum_scn(); } public: // table lock. int enable_lock_table(storage::ObTableHandleV2 &handle); @@ -478,8 +478,6 @@ public: void set_log_synced(ObMemCtxLockOpLinkNode *lock_op, const palf::SCN &scn); // replay lock to lock map and trans part ctx. // used by the replay process of multi data source. - int replay_lock(const transaction::tablelock::ObTableLockOp &lock_op, - const int64_t log_ts); int replay_lock(const transaction::tablelock::ObTableLockOp &lock_op, const palf::SCN &scn); int recover_from_table_lock_durable_info(const ObTableLockInfo &table_lock_info); @@ -493,12 +491,9 @@ public: private: int do_trans_end( const bool commit, - const int64_t trans_version, - const int64_t final_log_ts, + const palf::SCN trans_version, + const palf::SCN final_scn, const int end_code); - int clear_table_lock_(const bool is_commit, - const int64_t commit_version, - const int64_t commit_log_ts); int clear_table_lock_(const bool is_commit, const palf::SCN &commit_version, const palf::SCN &commit_scn); diff --git a/src/storage/memtable/ob_memtable_interface.h b/src/storage/memtable/ob_memtable_interface.h index 89ba72009..ce3494641 100644 --- a/src/storage/memtable/ob_memtable_interface.h +++ b/src/storage/memtable/ob_memtable_interface.h @@ -59,15 +59,15 @@ public: virtual void inc_ref() = 0; virtual void dec_ref() = 0; virtual int trans_begin() = 0; - virtual int trans_end(const bool commit, const int64_t trans_version, const int64_t final_log_ts) = 0; + virtual int trans_end(const bool commit, const palf::SCN trans_version, const palf::SCN final_scn) = 0; virtual int trans_clear() = 0; virtual int elr_trans_preparing() = 0; virtual int trans_kill() = 0; virtual int trans_publish() = 0; virtual int trans_replay_begin() = 0; virtual int trans_replay_end(const bool commit, - const int64_t trans_version, - const int64_t final_log_ts, + const palf::SCN trans_version, + const palf::SCN final_scn, const uint64_t log_cluster_version = 0, const uint64_t checksum = 0) = 0; virtual void print_callbacks() = 0; @@ -148,7 +148,7 @@ struct ObMergePriorityInfo class ObIMemtable: public storage::ObITable { public: - ObIMemtable() : snapshot_version_(ObVersionRange::MAX_VERSION) + ObIMemtable() : snapshot_version_(palf::SCN::max_scn()) {} virtual ~ObIMemtable() {} @@ -205,17 +205,14 @@ public: const uint64_t table_id, const storage::ObTableReadInfo &read_info, const blocksstable::ObDatumRowkey &rowkey) = 0; - virtual int64_t get_frozen_trans_version() { return 0; } - virtual int major_freeze(const common::ObVersion &version) - { UNUSED(version); return common::OB_SUCCESS; } - virtual int minor_freeze(const common::ObVersion &version) - { UNUSED(version); return common::OB_SUCCESS; } virtual void inc_pending_lob_count() {} virtual void dec_pending_lob_count() {} virtual int on_memtable_flushed() { return common::OB_SUCCESS; } virtual bool can_be_minor_merged() { return false; } - void set_snapshot_version(const int64_t snapshot_version) { snapshot_version_ = snapshot_version; } - virtual int64_t get_snapshot_version() const override { return snapshot_version_; } + void set_snapshot_version(const palf::SCN snapshot_version) { snapshot_version_ = snapshot_version; } + // TODO: remove get_snapshot_version() and use get_snapshot_version_scn() instead of it + virtual int64_t get_snapshot_version() const override { return snapshot_version_.get_val_for_tx(); } + virtual palf::SCN get_snapshot_version_scn() const override { return snapshot_version_; } virtual int64_t get_upper_trans_version() const override { return OB_NOT_SUPPORTED; } virtual int64_t get_max_merged_trans_version() const override @@ -276,7 +273,7 @@ public: return false; } protected: - int64_t snapshot_version_; + palf::SCN snapshot_version_; }; //////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/storage/memtable/ob_redo_log_generator.cpp b/src/storage/memtable/ob_redo_log_generator.cpp index b64e8eeaf..d5fcfe2c1 100644 --- a/src/storage/memtable/ob_redo_log_generator.cpp +++ b/src/storage/memtable/ob_redo_log_generator.cpp @@ -202,7 +202,7 @@ int ObRedoLogGenerator::log_submitted(const ObCallbackScope &callbacks) return ret; } -int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks) +int ObRedoLogGenerator::sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks) { // no need to submit log // since the number of log callback is enough now @@ -219,9 +219,6 @@ int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScop do { ObITransCallback *iter = (ObITransCallback *)*cursor; if (iter->need_fill_redo()) { - // TODO(handora.qc): fix it - palf::SCN scn; - scn.convert_for_lsn_allocator(log_ts); iter->set_scn(scn); if (OB_TMP_FAIL(iter->log_sync_cb(scn))) { if (OB_SUCC(ret)) { @@ -232,7 +229,7 @@ int ObRedoLogGenerator::sync_log_succ(const int64_t log_ts, const ObCallbackScop redo_sync_succ_cnt_ += 1; } } else { - TRANS_LOG(ERROR, "sync_log_succ error", K(ret), K(iter), K(iter->need_fill_redo()), K(log_ts)); + TRANS_LOG(ERROR, "sync_log_succ error", K(ret), K(iter), K(iter->need_fill_redo()), K(scn)); } } while (cursor != callbacks.end_ && !FALSE_IT(cursor++)); } diff --git a/src/storage/memtable/ob_redo_log_generator.h b/src/storage/memtable/ob_redo_log_generator.h index 52b34ab18..ad96855a6 100644 --- a/src/storage/memtable/ob_redo_log_generator.h +++ b/src/storage/memtable/ob_redo_log_generator.h @@ -74,7 +74,7 @@ public: const bool log_for_lock_node); int search_unsubmitted_dup_tablet_redo(); int log_submitted(const ObCallbackScope &callbacks); - int sync_log_succ(const int64_t log_ts, const ObCallbackScope &callbacks); + int sync_log_succ(const palf::SCN scn, const ObCallbackScope &callbacks); void sync_log_fail(const ObCallbackScope &callbacks); ObITransCallback *get_generate_cursor() { return (ObITransCallback *)*generate_cursor_; } diff --git a/src/storage/memtable/ob_row_compactor.cpp b/src/storage/memtable/ob_row_compactor.cpp index 772aef8c7..e6f4179a7 100644 --- a/src/storage/memtable/ob_row_compactor.cpp +++ b/src/storage/memtable/ob_row_compactor.cpp @@ -272,17 +272,17 @@ int ObMemtableRowCompactor::init(ObMvccRow *row, // So modification is guaranteed to be safety with another modification, // while we need pay attention to the concurrency between lock_for_read // and modification(such as compact) -int ObMemtableRowCompactor::compact(const int64_t snapshot_version) +int ObMemtableRowCompactor::compact(const palf::SCN snapshot_version) { int ret = OB_SUCCESS; if (!is_inited_) { ret = OB_NOT_INIT; - } else if (0 >= snapshot_version || INT64_MAX == snapshot_version) { + } else if (!snapshot_version.is_valid() || palf::SCN::max_scn() == snapshot_version) { STORAGE_LOG(ERROR, "unexpected snapshot version", K(ret), K(snapshot_version)); ret = OB_ERR_UNEXPECTED; } else if (NULL != row_->latest_compact_node_ && - snapshot_version <= row_->latest_compact_node_->trans_version_.get_val_for_lsn_allocator()) { + snapshot_version <= row_->latest_compact_node_->trans_version_) { // concurrent do compact } else { ObTimeGuard tg("row compact", 50L * 1000L); @@ -306,7 +306,7 @@ int ObMemtableRowCompactor::compact(const int64_t snapshot_version) // Find position from where compaction started forward or backward until reached // oldest node or latest compaction node -void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version, +void ObMemtableRowCompactor::find_start_pos_(const palf::SCN snapshot_version, ObMvccTransNode *&start) { int64_t search_cnt = 0; @@ -318,7 +318,7 @@ void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version, // Traverse forward from list_head // We go from head to find the suitable node for compact node start if (palf::SCN::max_scn() == start->trans_version_ // skip uncommited - || snapshot_version < start->trans_version_.get_val_for_lsn_allocator() // skip bigger txn + || snapshot_version < start->trans_version_ // skip bigger txn || !start->is_committed()) { // skip uncommited start = start->prev_; search_cnt++; @@ -330,7 +330,7 @@ void ObMemtableRowCompactor::find_start_pos_(const int64_t snapshot_version, // We need handle the bad case when elr, so we traverse from backward // when there exists latest_compact_node if (NULL != start->next_ // stop at null - && snapshot_version >= start->next_->trans_version_.get_val_for_lsn_allocator() // stop at bigger txn + && snapshot_version >= start->next_->trans_version_ // stop at bigger txn && start->next_->is_committed() // stop at uncommitted && palf::SCN::max_scn() != start->next_->trans_version_) { // stop at uncommitted start = start->next_; @@ -395,7 +395,7 @@ int ObMemtableRowCompactor::try_cleanout_tx_node_during_compact_(ObTxTableGuard return ret; } -ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t snapshot_version, +ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const palf::SCN snapshot_version, ObMvccTransNode *save) { int ret = OB_SUCCESS; @@ -437,7 +437,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s TRANS_LOG(INFO, "ignore aborted node when compact", K(*cur), K(*row_)); cur = cur->prev_; find_committed_tnode = false; - } else if (snapshot_version < cur->trans_version_.get_val_for_lsn_allocator()) { + } else if (snapshot_version < cur->trans_version_) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "unexpected snapshot version", K(snapshot_version), K(*cur), K(*row_)); } else if (NULL == (mtd = reinterpret_cast(cur->buf_))) { @@ -539,9 +539,6 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "unexpected trans version", K(ret), "node", *save); } else { - // TODO(handora.qc):: fix it - palf::SCN snapshot_scn; - snapshot_scn.convert_for_lsn_allocator(snapshot_version); trans_node->tx_id_ = save->tx_id_; trans_node->seq_no_ = save->seq_no_; trans_node->trans_version_ = save->trans_version_; @@ -551,7 +548,7 @@ ObMvccTransNode *ObMemtableRowCompactor::construct_compact_node_(const int64_t s trans_node->type_ = NDT_COMPACT; trans_node->flag_ = save->flag_; trans_node->scn_ = save->scn_; - trans_node->set_snapshot_version_barrier(snapshot_scn); + trans_node->set_snapshot_version_barrier(snapshot_version); TRANS_LOG(DEBUG, "success to compact row, ", K(trans_node->tx_id_), K(dml_flag), K(compact_row_cnt), KPC(save)); } } diff --git a/src/storage/memtable/ob_row_compactor.h b/src/storage/memtable/ob_row_compactor.h index bed585b01..b18a4f521 100644 --- a/src/storage/memtable/ob_row_compactor.h +++ b/src/storage/memtable/ob_row_compactor.h @@ -19,6 +19,7 @@ #include "lib/allocator/ob_small_allocator.h" #include "lib/lock/ob_spin_lock.h" #include "common/object/ob_object.h" +#include "logservice/palf/scn.h" namespace oceanbase { @@ -117,11 +118,11 @@ public: common::ObIAllocator *node_alloc, const bool for_replay); // compact and refresh the update counter by snapshot version - int compact(const int64_t snapshot_version); + int compact(const palf::SCN snapshot_version); private: - void find_start_pos_(const int64_t snapshot_version, + void find_start_pos_(const palf::SCN snapshot_version, ObMvccTransNode *&save); - ObMvccTransNode *construct_compact_node_(const int64_t snapshot_version, + ObMvccTransNode *construct_compact_node_(const palf::SCN snapshot_version, ObMvccTransNode *save); int try_cleanout_tx_node_during_compact_(storage::ObTxTableGuard &tx_table_guard, ObMvccTransNode *tnode); diff --git a/src/storage/ob_i_table.h b/src/storage/ob_i_table.h index 264270549..a4e94359a 100644 --- a/src/storage/ob_i_table.h +++ b/src/storage/ob_i_table.h @@ -212,6 +212,13 @@ public: virtual OB_INLINE share::ObScnRange &get_scn_range() { return key_.scn_range_; } virtual OB_INLINE bool is_trans_state_deterministic() { return get_upper_trans_version() < INT64_MAX; } virtual int64_t get_snapshot_version() const { return key_.get_snapshot_version(); } + // TODO: remove it + virtual palf::SCN get_snapshot_version_scn() const + { + palf::SCN scn; + scn.convert_for_tx(key_.get_snapshot_version()); + return scn; + } virtual int64_t get_upper_trans_version() const { return get_snapshot_version(); } virtual int64_t get_max_merged_trans_version() const { return get_snapshot_version(); } virtual int get_frozen_schema_version(int64_t &schema_version) const = 0; diff --git a/src/storage/ob_storage_schema_recorder.cpp b/src/storage/ob_storage_schema_recorder.cpp index 35e479e4b..a32d28864 100644 --- a/src/storage/ob_storage_schema_recorder.cpp +++ b/src/storage/ob_storage_schema_recorder.cpp @@ -164,7 +164,11 @@ int ObStorageSchemaRecorder::replay_schema_log( int64_t table_version = OB_INVALID_VERSION; ObArenaAllocator tmp_allocator; ObStorageSchema replay_storage_schema; - if (tablet_id_.is_special_merge_tablet()) { + // TODO: fix it + palf::SCN scn; + if (OB_FAIL(scn.convert_for_lsn_allocator(log_ts))) { + LOG_WARN("convert failed", K(log_ts), K(ret)); + } else if (tablet_id_.is_special_merge_tablet()) { // do nothing } else if (OB_FAIL(serialization::decode_i64(buf, size, pos, &table_version))) { // table_version @@ -177,8 +181,10 @@ int ObStorageSchemaRecorder::replay_schema_log( } else if (OB_FAIL(replay_storage_schema.deserialize(tmp_allocator, buf, size, pos))) { LOG_WARN("fail to deserialize storage schema", K(ret), K_(tablet_id)); } else if (FALSE_IT(replay_storage_schema.set_sync_finish(true))) { - } else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(&replay_storage_schema, log_ts, - true/*for_replay*/, memtable::MemtableRefOp::NONE))) { + } else if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(&replay_storage_schema, + scn, + true/*for_replay*/, + memtable::MemtableRefOp::NONE))) { LOG_WARN("failed to save storage schema on memtable", K(ret), K_(tablet_id), K(replay_storage_schema)); } else { ATOMIC_SET(&max_saved_table_version_, table_version); diff --git a/src/storage/ob_sync_tablet_seq_clog.cpp b/src/storage/ob_sync_tablet_seq_clog.cpp index 91be603e1..4b43e1083 100644 --- a/src/storage/ob_sync_tablet_seq_clog.cpp +++ b/src/storage/ob_sync_tablet_seq_clog.cpp @@ -173,7 +173,7 @@ int ObSyncTabletSeqLogCb::on_failure() } else if (OB_FAIL(tablet_handle.get_obj()->get_latest_autoinc_seq(autoinc_seq))) { LOG_WARN("fail to get latest autoinc seq", K(ret)); } else if (OB_FAIL(tablet_handle.get_obj()->save_multi_source_data_unit(&autoinc_seq, - -1/*clog_ts*/, + palf::SCN::invalid_scn()/*scn*/, false/*for_replay*/, memtable::MemtableRefOp::DEC_REF, true/*is_callback*/))) { diff --git a/src/storage/tablelock/ob_lock_memtable.cpp b/src/storage/tablelock/ob_lock_memtable.cpp index 39de6d3e7..902691549 100644 --- a/src/storage/tablelock/ob_lock_memtable.cpp +++ b/src/storage/tablelock/ob_lock_memtable.cpp @@ -814,7 +814,7 @@ int ObLockMemtable::flush(palf::SCN recycle_scn, scn_range.start_scn_.convert_for_gts(1); scn_range.end_scn_ = freeze_scn_; set_scn_range(scn_range); - set_snapshot_version(freeze_scn_.get_val_for_lsn_allocator()); + set_snapshot_version(freeze_scn_); ATOMIC_STORE(&is_frozen_, true); } } diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 5e2a207d1..768296f0f 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -2864,7 +2864,7 @@ int ObTablet::get_rec_log_ts(int64_t &rec_log_ts) ret = OB_ERR_UNEXPECTED; LOG_WARN("mt is NULL", KR(ret), K(handle)); } else { - rec_log_ts = mt->get_rec_log_ts(); + rec_log_ts = mt->get_rec_scn().get_val_for_tx(); } return ret; } diff --git a/src/storage/tablet/ob_tablet.h b/src/storage/tablet/ob_tablet.h index 69b2d6ae9..d3806e0ff 100644 --- a/src/storage/tablet/ob_tablet.h +++ b/src/storage/tablet/ob_tablet.h @@ -621,11 +621,8 @@ int ObTablet::prepare_data(T &multi_source_data_unit, const transaction::ObMulSo { int ret = OB_SUCCESS; -//TODO(SCN): const palf::SCN scn = trans_flags.for_replay_ ? trans_flags.log_ts_ : palf::SCN::max_scn(); - palf::SCN scn = palf::SCN::max_scn(); - if (trans_flags.for_replay_) { - scn.convert_for_lsn_allocator(trans_flags.log_ts_); - } + const palf::SCN scn = trans_flags.for_replay_ ? trans_flags.scn_ : palf::SCN::max_scn(); + TRANS_LOG(INFO, "prepare data when tx_end", K(multi_source_data_unit), K(tablet_meta_.tablet_id_)); if (IS_NOT_INIT) { @@ -720,7 +717,7 @@ int ObTablet::save_multi_source_data_unit( memtable::ObMemtable *memtable = nullptr; if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, msd->type()))) { TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn)); - } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op, is_callback))) { + } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op, is_callback))) { TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op)); } } @@ -729,7 +726,7 @@ int ObTablet::save_multi_source_data_unit( memtable::ObMemtable *memtable = nullptr; if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, memtable::MultiSourceDataUnitType::TABLET_TX_DATA))) { TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn)); - } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op/*add_ref*/, is_callback/*false*/))) { + } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op/*add_ref*/, is_callback/*false*/))) { TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op)); } } else { @@ -761,7 +758,7 @@ int ObTablet::save_multi_source_data_unit( ob_usleep(100); } if (OB_FAIL(ret)) { - } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn.get_val_for_inner_table_field(), for_replay, ref_op, is_callback))) { + } else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_scn, for_replay, ref_op, is_callback))) { TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_scn), K(ref_op), K(for_replay)); } } diff --git a/src/storage/tablet/ob_tablet_binding_helper.cpp b/src/storage/tablet/ob_tablet_binding_helper.cpp index 56b269b25..38c8fc451 100644 --- a/src/storage/tablet/ob_tablet_binding_helper.cpp +++ b/src/storage/tablet/ob_tablet_binding_helper.cpp @@ -345,10 +345,7 @@ int ObTabletBindingHelper::add_tablet_binding( } if (OB_SUCC(ret)) { - SCN scn; - if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) { - LOG_WARN("failed to convert_scn", K(trans_flags), K(ret)); - } else if (OB_FAIL(tablet->set_multi_data_for_commit(info, scn, trans_flags.for_replay_, MemtableRefOp::NONE))) { + if (OB_FAIL(tablet->set_multi_data_for_commit(info, trans_flags.scn_, trans_flags.for_replay_, MemtableRefOp::NONE))) { LOG_WARN("failed to save multi source data", K(ret)); } } @@ -452,7 +449,7 @@ int ObTabletBindingHelper::check_skip_tx_end(const ObTabletID &tablet_id, const ObTabletTxMultiSourceDataUnit tx_data; ObTabletBindingHelper helper(ls, trans_flags); - if (OB_INVALID_TIMESTAMP == trans_flags.log_ts_) { + if (palf::SCN::invalid_scn() == trans_flags.scn_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(ls)); } else if (OB_FAIL(helper.get_tablet(tablet_id, tablet_handle))) { @@ -465,7 +462,7 @@ int ObTabletBindingHelper::check_skip_tx_end(const ObTabletID &tablet_id, const } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { } else if (OB_FAIL(tablet->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", KR(ret)); - } else if (tx_data.tx_scn_.get_val_for_lsn_allocator() >= trans_flags.log_ts_) { + } else if (tx_data.tx_scn_ >= trans_flags.scn_) { skip = true; } return ret; @@ -612,13 +609,10 @@ int ObTabletBindingHelper::modify_tablet_binding_for_unbind( int ret = OB_SUCCESS; ObLSHandle ls_handle; const ObTransID &tx_id = trans_flags.tx_id_; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; const bool for_replay = trans_flags.for_replay_; const palf::SCN commit_version = trans_flags.trans_version_; - SCN scn; - if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to get data", K(trans_flags), K(ret)); - } else if (OB_FAIL(get_ls(arg.ls_id_, ls_handle))) { + if (OB_FAIL(get_ls(arg.ls_id_, ls_handle))) { LOG_WARN("failed to get ls", K(ret)); } else { ObTabletBindingHelper helper(*ls_handle.get_ls(), trans_flags); @@ -764,7 +758,7 @@ int ObTabletBindingHelper::get_tablet(const ObTabletID &tablet_id, ObTabletHandl ObTabletTxMultiSourceDataUnit tx_data; if (OB_FAIL(handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(key)); - } else if (OB_INVALID_TIMESTAMP != trans_flags_.log_ts_ && trans_flags_.log_ts_ <= tx_data.tx_scn_.get_val_for_lsn_allocator()) { + } else if (palf::SCN::invalid_scn() != trans_flags_.scn_ && trans_flags_.scn_ <= tx_data.tx_scn_) { ret = OB_NO_NEED_UPDATE; LOG_INFO("tablet frozen", K(ret), K(key), K(trans_flags_), K(tx_data)); } @@ -784,7 +778,7 @@ int ObTabletBindingHelper::replay_get_tablet(const ObTabletMapKey &key, ObTablet if (OB_FAIL(ObTabletCreateDeleteHelper::get_tablet(key, tablet_handle))) { if (OB_TABLET_NOT_EXIST != ret) { LOG_WARN("failed to get tablet", K(ret), K(key)); - } else if (trans_flags_.log_ts_ < tablet_change_checkpoint_scn.get_val_for_lsn_allocator()) { + } else if (trans_flags_.scn_ < tablet_change_checkpoint_scn) { LOG_WARN("tablet already deleted", K(ret), K(key), K(trans_flags_), K(tablet_change_checkpoint_scn)); } else { ret = OB_EAGAIN; @@ -876,14 +870,11 @@ int ObTabletBindingHelper::lock_tablet_binding(ObTabletHandle &handle, const ObM { int ret = OB_SUCCESS; const ObTransID &tx_id = trans_flags.tx_id_; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; const bool for_replay = trans_flags.for_replay_; ObTablet *tablet = handle.get_obj(); ObTabletTxMultiSourceDataUnit tx_data; - SCN scn; - if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to get data", K(trans_flags), K(ret)); - } else if (OB_FAIL(tablet->get_tx_data(tx_data))) { + if (OB_FAIL(tablet->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret)); } else { const ObTransID old_tx_id = tx_data.tx_id_; @@ -904,7 +895,7 @@ int ObTabletBindingHelper::lock_tablet_binding(ObTabletHandle &handle, const ObM if (OB_FAIL(ret)) { } else if (need_update && OB_FAIL(tablet->set_tx_data(tx_data, memtable_scn, for_replay, update_cache, ref_op, false/*is_callback*/))) { - LOG_WARN("failed to save tx data", K(ret), K(tx_data), K(log_ts), K(for_replay), K(ref_op)); + LOG_WARN("failed to save tx data", K(ret), K(tx_data), K(scn), K(for_replay), K(ref_op)); } } return ret; @@ -944,14 +935,11 @@ int ObTabletBindingHelper::set_scn(ObTabletHandle &handle, const ObMulSourceData { int ret = OB_SUCCESS; const ObTransID &tx_id = trans_flags.tx_id_; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; const bool for_replay = trans_flags.for_replay_; ObTablet *tablet = handle.get_obj(); ObTabletTxMultiSourceDataUnit data; - SCN scn; - if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to get data", K(trans_flags), K(ret)); - } else if (OB_FAIL(tablet->get_tx_data(data))) { + if (OB_FAIL(tablet->get_tx_data(data))) { LOG_WARN("failed to get data", K(ret)); } else if (OB_UNLIKELY(data.tx_id_ != tx_id)) { ret = OB_ERR_UNEXPECTED; @@ -1004,17 +992,15 @@ int ObTabletBindingHelper::unlock_tablet_binding(ObTabletHandle &handle, const O { int ret = OB_SUCCESS; const ObTransID &tx_id = trans_flags.tx_id_; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; const bool for_replay = trans_flags.for_replay_; const bool for_commit = trans_flags.notify_type_ == NotifyType::ON_COMMIT; ObTablet *tablet = handle.get_obj(); ObTabletTxMultiSourceDataUnit tx_data; - SCN scn; + LOG_INFO("unlock_tablet_binding", KPC(tablet), K(trans_flags)); if (OB_FAIL(tablet->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret)); - } else if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to convert_scn", K(trans_flags), K(ret)); } else { const SCN old_scn = tx_data.tx_scn_; if (tx_data.tx_id_ == tx_id) { diff --git a/src/storage/tablet/ob_tablet_create_delete_helper.cpp b/src/storage/tablet/ob_tablet_create_delete_helper.cpp index d8e40c23a..6eb6d7aac 100644 --- a/src/storage/tablet/ob_tablet_create_delete_helper.cpp +++ b/src/storage/tablet/ob_tablet_create_delete_helper.cpp @@ -96,15 +96,12 @@ int ObTabletCreateDeleteHelper::prepare_create_tablets( int ret = OB_SUCCESS; const ObLSID &ls_id = ls_.get_ls_id(); const ObTransID &tx_id = trans_flags.tx_id_; - SCN scn; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; const bool is_clog_replaying = trans_flags.for_replay_; const int64_t create_begin_ts = ObTimeUtility::current_monotonic_time(); ObTabletBindingPrepareCtx binding_ctx; - if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to convert_tmp", K(ret), K(scn)); - } else if (OB_UNLIKELY(!arg.is_valid())) { + if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(PRINT_CREATE_ARG(arg))); } else if (OB_UNLIKELY(arg.id_ != ls_id)) { @@ -142,16 +139,14 @@ int ObTabletCreateDeleteHelper::replay_prepare_create_tablets( const ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; - SCN scn; + SCN scn = trans_flags.scn_; const ObBatchCreateTabletArg *final_arg = &arg; ObBatchCreateTabletArg new_arg; ObSArray existed_tablet_id_array; NonLockedHashSet existed_tablet_id_set; bool need_create = true; - if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) { - LOG_WARN("failed to convert_tmp", K(ret), K(scn)); - } else if (OB_UNLIKELY(!arg.is_valid())) { + if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(PRINT_CREATE_ARG(arg)), K(scn)); } else if (OB_FAIL(existed_tablet_id_set.create(arg.get_tablet_count()))) { @@ -290,7 +285,6 @@ int ObTabletCreateDeleteHelper::handle_special_tablets_for_replay( const ObTabletID &tablet_id = existed_tablet_id_array.at(i); key.tablet_id_ = tablet_id; tx_data.reset(); - SCN scn; bool tx_data_is_valid = false; if (OB_FAIL(get_tablet(key, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(key)); @@ -299,12 +293,10 @@ int ObTabletCreateDeleteHelper::handle_special_tablets_for_replay( } else if (OB_UNLIKELY(tx_data_is_valid)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tx data should not be valid", K(ret), K(key)); - } else if (OB_FAIL(scn.convert_for_lsn_allocator(trans_flags.log_ts_))) { - LOG_WARN("failed to convert scn", K(ret), K(key), K(trans_flags)); } else { int tmp_ret = OB_SUCCESS; tx_data.tx_id_ = trans_flags.tx_id_; - tx_data.tx_scn_ = scn; + tx_data.tx_scn_ = trans_flags.scn_; tx_data.tablet_status_ = ObTabletStatus::CREATING; const bool update_cache = false; @@ -332,8 +324,7 @@ int ObTabletCreateDeleteHelper::set_scn( const ObLSID &ls_id = ls_.get_ls_id(); ObTabletMapKey key; key.ls_id_ = ls_id; - SCN scn; - scn.convert_tmp(trans_flags.log_ts_); + SCN scn = trans_flags.scn_; const int64_t tx_id = trans_flags.tx_id_; const bool for_replay = false; @@ -533,12 +524,10 @@ int ObTabletCreateDeleteHelper::redo_create_tablets( } else if (is_clog_replaying) { LOG_INFO("in clog replaying procedure, do nothing while redo log callback", K(ret)); } else { - SCN scn; + SCN scn = trans_flags.scn_; bool is_valid = true; ObSArray tablet_create_info_array; - if (OB_FAIL(scn.convert_tmp(trans_flags.log_ts_))) { - LOG_WARN("failed to convert_tmp", K(ret), K(scn)); - } else if (OB_FAIL(check_tablet_existence(arg, is_valid))) { + if (OB_FAIL(check_tablet_existence(arg, is_valid))) { LOG_ERROR("failed to check tablet existence", K(ret), K(PRINT_CREATE_ARG(arg))); } else if (!is_valid) { ret = OB_ERR_UNEXPECTED; @@ -632,10 +621,10 @@ int ObTabletCreateDeleteHelper::do_commit_create_tablet( LOG_WARN("failed to get tablet", K(ret), K(key)); } else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(tablet_handle)); - } else if (trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field()) { + } else if (trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_) { // replaying procedure, clog ts is smaller than tx log ts, just skip LOG_INFO("skip commit create tablet", K(ret), K(key), K(trans_flags), K(tx_data)); - } else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) { + } else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) { @@ -645,8 +634,8 @@ int ObTabletCreateDeleteHelper::do_commit_create_tablet( ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet status is not CREATING", K(ret), K(key), K(trans_flags), K(tx_data)); } else { - const int64_t tx_log_ts = trans_flags.log_ts_; - const int64_t memtable_log_ts = trans_flags.log_ts_; + const int64_t tx_log_ts = trans_flags.scn_.get_val_for_lsn_allocator(); + const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_lsn_allocator(); if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::NORMAL, tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) { @@ -697,10 +686,12 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablets( ObTabletMapKey key; key.ls_id_ = ls_.get_ls_id(); - if (OB_UNLIKELY(trans_flags.log_ts_ < OB_INVALID_TIMESTAMP)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid log ts", K(ret), K(trans_flags)); - } else if (!trans_flags.is_redo_synced()) { + // TODO: fix it after multi source data refactor + // if (OB_UNLIKELY(trans_flags.log_ts_ < OB_INVALID_TIMESTAMP)) { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("invalid log ts", K(ret), K(trans_flags)); + // } else + if (!trans_flags.is_redo_synced()) { // on redo cb has not been called // just remove tablets directly if (OB_FAIL(roll_back_remove_tablets(arg, trans_flags))) { @@ -881,7 +872,7 @@ int ObTabletCreateDeleteHelper::roll_back_remove_tablet( LOG_INFO("memtable does not have msd, do nothing", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(tablet->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(ls_id), K(tablet_id)); - } else if (OB_FAIL(tablet->save_multi_source_data_unit(&tx_data, ObScnRange::MAX_TS, + } else if (OB_FAIL(tablet->save_multi_source_data_unit(&tx_data, ObScnRange::MAX_SCN, trans_flags.for_replay_, MemtableRefOp::DEC_REF, true/*is_callback*/))) { LOG_WARN("failed to save msd", K(ret), K(ls_id), K(tablet_id)); } @@ -921,12 +912,12 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablet( if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(tablet_handle)); - } else if (trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field()) { + } else if (trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_) { // replaying procedure, clog ts is smaller than tx log ts, just skip LOG_INFO("skip abort create tablet", K(ret), K(tablet_id), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(!trans_flags.for_replay_ - && tx_data.tx_scn_.get_val_for_inner_table_field() != OB_MAX_SCN_TS_NS - && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) { + && tx_data.tx_scn_ != palf::SCN::max_scn() + && trans_flags.scn_ <= tx_data.tx_scn_)) { // If tx log ts equals ObScnRange::MAX_TS, it means redo callback has not been called. // Thus, we should handle this situation ret = OB_ERR_UNEXPECTED; @@ -938,8 +929,8 @@ int ObTabletCreateDeleteHelper::do_abort_create_tablet( ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet status is not CREATING", K(ret), K(tablet_id), K(trans_flags), K(tx_data)); } else { - const int64_t tx_log_ts = trans_flags.log_ts_; - const int64_t memtable_log_ts = trans_flags.log_ts_; + const int64_t tx_log_ts = trans_flags.scn_.get_val_for_inner_table_field(); + const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_inner_table_field(); if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::DELETED, tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) { @@ -980,7 +971,7 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets( } else if (trans_flags.for_replay_) { if (OB_FAIL(tablet_id_array.reserve(tablet_cnt))) { LOG_WARN("fail to allocate memory for tablet_id_array", K(ret), K(tablet_cnt)); - } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) { + } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) { LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags)); } } @@ -999,23 +990,18 @@ int ObTabletCreateDeleteHelper::prepare_remove_tablets( } else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(cur_tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(key)); } else { - SCN flag_scn; - if (OB_INVALID_SCN_VAL != trans_flags.log_ts_ - && OB_FAIL(flag_scn.convert_for_lsn_allocator(trans_flags.log_ts_))) { - LOG_WARN("failed to convert_tmp", K(ret), K(trans_flags)); - } else { - ObTabletTxMultiSourceDataUnit tx_data; - tx_data.tx_id_ = trans_flags.tx_id_; - tx_data.tx_scn_ = trans_flags.for_replay_ ? flag_scn: cur_tx_data.tx_scn_; - tx_data.tablet_status_ = ObTabletStatus::DELETING; + SCN flag_scn = trans_flags.scn_; + ObTabletTxMultiSourceDataUnit tx_data; + tx_data.tx_id_ = trans_flags.tx_id_; + tx_data.tx_scn_ = trans_flags.for_replay_ ? flag_scn: cur_tx_data.tx_scn_; + tx_data.tablet_status_ = ObTabletStatus::DELETING; - if (OB_FAIL(ObTabletBindingHelper::lock_and_set_tx_data(tablet_handle, tx_data, trans_flags.for_replay_))) { - LOG_WARN("failed to lock tablet binding", K(ret), K(key), K(tx_data)); - // TODO(bowen.gbw): temproarily swallow 4200 error while prepare remove tablets - if (trans_flags.for_replay_ && OB_HASH_EXIST == ret) { - ret = OB_SUCCESS; - LOG_INFO("swallow 4200 error", K(ret), K(key), K(tx_data), K(trans_flags)); - } + if (OB_FAIL(ObTabletBindingHelper::lock_and_set_tx_data(tablet_handle, tx_data, trans_flags.for_replay_))) { + LOG_WARN("failed to lock tablet binding", K(ret), K(key), K(tx_data)); + // TODO(bowen.gbw): temproarily swallow 4200 error while prepare remove tablets + if (trans_flags.for_replay_ && OB_HASH_EXIST == ret) { + ret = OB_SUCCESS; + LOG_INFO("swallow 4200 error", K(ret), K(key), K(tx_data), K(trans_flags)); } } } @@ -1084,7 +1070,7 @@ int ObTabletCreateDeleteHelper::redo_remove_tablets( int ret = OB_SUCCESS; const ObLSID &ls_id = ls_.get_ls_id(); const bool is_clog_replaying = trans_flags.for_replay_; - const int64_t log_ts = trans_flags.log_ts_; + const palf::SCN scn = trans_flags.scn_; if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; @@ -1100,14 +1086,11 @@ int ObTabletCreateDeleteHelper::redo_remove_tablets( ObTabletMapKey key; key.ls_id_ = ls_id; for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); ++i) { - SCN scn; key.tablet_id_ = arg.tablet_ids_.at(i); if (OB_FAIL(get_tablet(key, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(key)); } else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(tablet_handle)); - } else if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to convert_tmp", K(ret), K(scn)); } else if (OB_FAIL(tablet_handle.get_obj()->set_tx_scn(tx_data.tx_id_, scn, false/*for_replay*/))) { LOG_WARN("failed to set tx data", K(ret), K(tablet_handle), K(tx_data), K(scn)); } @@ -1137,7 +1120,7 @@ int ObTabletCreateDeleteHelper::commit_remove_tablets( LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id)); } else if (trans_flags.for_replay_) { if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) { - } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) { + } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) { LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags)); } } @@ -1173,7 +1156,7 @@ int ObTabletCreateDeleteHelper::do_commit_remove_tablet( LOG_WARN("failed to get tablet", K(ret), K(key)); } else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(key)); - } else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) { + } else if (OB_UNLIKELY(!trans_flags.for_replay_ && trans_flags.scn_ <= tx_data.tx_scn_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) { @@ -1183,8 +1166,8 @@ int ObTabletCreateDeleteHelper::do_commit_remove_tablet( ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet status is not DELETING", K(ret), K(key), K(trans_flags), K(tx_data)); } else { - const int64_t tx_log_ts = trans_flags.log_ts_; - const int64_t memtable_log_ts = trans_flags.log_ts_; + const int64_t tx_log_ts = trans_flags.scn_.get_val_for_inner_table_field(); + const int64_t memtable_log_ts = trans_flags.scn_.get_val_for_inner_table_field(); if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::DELETED, tx_log_ts, memtable_log_ts, trans_flags.for_replay_))) { @@ -1216,7 +1199,7 @@ int ObTabletCreateDeleteHelper::abort_remove_tablets( LOG_WARN("unexpected arg", K(ret), K(arg), K(ls_id)); } else if (trans_flags.for_replay_) { if (OB_FAIL(tablet_id_array.reserve(arg.tablet_ids_.count()))) { - } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.log_ts_, tablet_id_array))) { + } else if (OB_FAIL(replay_verify_tablets(arg, trans_flags.scn_.get_val_for_inner_table_field(), tablet_id_array))) { LOG_WARN("failed to replay verify tablets", K(ret), K(trans_flags)); } } @@ -1271,8 +1254,8 @@ int ObTabletCreateDeleteHelper::do_abort_remove_tablet( } else if (OB_FAIL(tablet_handle.get_obj()->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data", K(ret), K(key)); } else if (OB_UNLIKELY(!trans_flags.for_replay_ - && trans_flags.log_ts_ != OB_INVALID_TIMESTAMP - && trans_flags.log_ts_ <= tx_data.tx_scn_.get_val_for_inner_table_field())) { + && trans_flags.scn_ != palf::SCN::invalid_scn() + && trans_flags.scn_ <= tx_data.tx_scn_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("log ts is smaller than tx log ts", K(ret), K(key), K(trans_flags), K(tx_data)); } else if (OB_UNLIKELY(trans_flags.tx_id_ != tx_data.tx_id_)) { @@ -1314,7 +1297,9 @@ int ObTabletCreateDeleteHelper::do_abort_remove_tablet( } } else { const int64_t tx_log_ts = tx_data.tx_scn_.get_val_for_inner_table_field(); - const int64_t memtable_log_ts = (OB_INVALID_TIMESTAMP == trans_flags.log_ts_) ? share::ObScnRange::MAX_TS: trans_flags.log_ts_; + const int64_t memtable_log_ts = (palf::SCN::invalid_scn() == trans_flags.scn_) + ? share::ObScnRange::MAX_TS + : trans_flags.scn_.get_val_for_inner_table_field(); if (OB_FAIL(set_tablet_final_status(tablet_handle, ObTabletStatus::NORMAL, tx_log_ts, memtable_log_ts, trans_flags.for_replay_, ref_op))) { @@ -2287,21 +2272,15 @@ int ObTabletCreateDeleteHelper::do_create_tablet( ObFreezer *freezer = ls_.get_freezer(); bool need_create_empty_major_sstable = false; const ObTransID &tx_id = trans_flags.tx_id_; - SCN scn; - SCN create_scn; const bool for_replay = trans_flags.for_replay_; - const int64_t log_ts = for_replay ? trans_flags.log_ts_ : share::ObScnRange::MAX_TS; - const int64_t create_ts = trans_flags.log_ts_; // may be invalid MemtableRefOp ref_op = for_replay ? MemtableRefOp::NONE : MemtableRefOp::INC_REF; + SCN scn = trans_flags.for_replay_ ? trans_flags.scn_ : palf::SCN::max_scn(); + SCN create_scn = trans_flags.scn_; ObMetaDiskAddr mem_addr; ObTabletTableStoreFlag table_store_flag; table_store_flag.set_with_major_sstable(); if (OB_FAIL(mem_addr.set_mem_addr(0, sizeof(ObTablet)))) { LOG_WARN("fail to set memory address", K(ret)); - } else if (OB_FAIL(scn.convert_tmp(log_ts))) { - LOG_WARN("failed to convert_tmp", K(ret), K(scn)); - } else if (OB_FAIL(create_scn.convert_tmp(create_ts))) { - LOG_WARN("failed to convert_tmp", K(ret), K(create_scn)); } else if (OB_FAIL(acquire_tablet(key, tablet_handle, false/*only acquire*/))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (OB_FAIL(check_need_create_empty_major_sstable(table_schema, need_create_empty_major_sstable))) { diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.cpp b/src/storage/tablet/ob_tablet_memtable_mgr.cpp index 33ad358a8..a1cec9acc 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.cpp +++ b/src/storage/tablet/ob_tablet_memtable_mgr.cpp @@ -209,7 +209,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, last_frozen_memtable->resolve_right_boundary(); TRANS_LOG(INFO, "[resolve_right_boundary] create_memtable", K(for_replay), K(ls_id), KPC(last_frozen_memtable)); if (memtable != last_frozen_memtable) { - memtable->resolve_left_boundary(last_frozen_memtable->get_end_log_ts()); + memtable->resolve_left_boundary(last_frozen_memtable->get_end_scn()); } } // there is no frozen memtable and new sstable will not be generated, @@ -221,7 +221,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, } else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){ LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version)); } else { - memtable->resolve_left_boundary(new_clog_checkpoint_scn.get_val_for_lsn_allocator()); + memtable->resolve_left_boundary(new_clog_checkpoint_scn); } time_guard.click("init memtable"); @@ -367,7 +367,9 @@ ObMemtable *ObTabletMemtableMgr::get_last_frozen_memtable_() const return memtable; } -int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *memtable, int64_t start_log_ts, int64_t snapshot_version) +int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable *memtable, + palf::SCN start_scn, + palf::SCN snapshot_scn) { ObTableHandleV2 handle; ObIMemtable *active_memtable = nullptr; @@ -381,8 +383,8 @@ int ObTabletMemtableMgr::resolve_left_boundary_for_active_memtable(ObIMemtable * } else if (OB_FAIL(handle.get_memtable(active_memtable))) { LOG_WARN("fail to get active memtable", K(ret)); } else { - // set the start_log_ts of the new memtable - static_cast(active_memtable)->resolve_left_boundary(start_log_ts); + // set the start_scn of the new memtable + static_cast(active_memtable)->resolve_left_boundary(start_scn); } if (OB_ENTRY_NOT_EXIST== ret) { ret = OB_SUCCESS; diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.h b/src/storage/tablet/ob_tablet_memtable_mgr.h index 3ccf8a726..e7b1fa069 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.h +++ b/src/storage/tablet/ob_tablet_memtable_mgr.h @@ -83,7 +83,9 @@ public: const bool include_active_memtable = true); int get_memtables_nolock(ObTableHdlArray &handle); int get_first_frozen_memtable(ObTableHandleV2 &handle) const; - int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable, int64_t start_log_ts, int64_t snapshot_version); + int resolve_left_boundary_for_active_memtable(memtable::ObIMemtable *memtable, + palf::SCN start_scn, + palf::SCN snapshot_version); int unset_logging_blocked_for_active_memtable(memtable::ObIMemtable *memtable); int set_is_tablet_freeze_for_active_memtable(memtable::ObIMemtable *&memtable, bool is_force_freeze = false); diff --git a/src/storage/tablet/ob_tablet_meta.cpp b/src/storage/tablet/ob_tablet_meta.cpp index da55b107d..09eb49509 100644 --- a/src/storage/tablet/ob_tablet_meta.cpp +++ b/src/storage/tablet/ob_tablet_meta.cpp @@ -149,7 +149,8 @@ int ObTabletMeta::init( } else if (OB_UNLIKELY(!ls_id.is_valid()) || OB_UNLIKELY(!tablet_id.is_valid()) || OB_UNLIKELY(!data_tablet_id.is_valid()) - || OB_UNLIKELY(!create_scn.is_valid()) + // TODO: fix it after multi source data refactor + // || OB_UNLIKELY(!create_scn.is_valid()) || OB_UNLIKELY(!snapshot_version.is_valid()) || OB_UNLIKELY(lib::Worker::CompatMode::INVALID == compat_mode)) { ret = OB_INVALID_ARGUMENT; diff --git a/src/storage/tx/ob_multi_data_source.cpp b/src/storage/tx/ob_multi_data_source.cpp index 764b7c673..f5d66abe6 100644 --- a/src/storage/tx/ob_multi_data_source.cpp +++ b/src/storage/tx/ob_multi_data_source.cpp @@ -235,7 +235,7 @@ int ObMulSourceTxDataNotifier::notify_table_lock( // TABLELOCK only need deal with replay process, but not apply. // the replay process will produce a lock op and will be dealt at trans end. } else if (OB_FAIL(mt_ctx->replay_lock(lock_op, - arg.log_ts_))) { + arg.scn_))) { TRANS_LOG(WARN, "replay lock failed", K(ret)); } else { // do nothing diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 771fd42c6..831a9ffb1 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -996,7 +996,7 @@ void ObTxExecInfo::reset() max_applying_part_log_no_ = INT64_MAX; max_submitted_seq_no_ = 0; checksum_ = 0; - checksum_log_ts_ = 0; + checksum_scn_.set_min(); max_durable_lsn_.reset(); data_complete_ = false; is_dup_tx_ = false; @@ -1035,7 +1035,7 @@ OB_SERIALIZE_MEMBER(ObTxExecInfo, max_applying_part_log_no_, max_submitted_seq_no_, checksum_, - checksum_log_ts_, + checksum_scn_, max_durable_lsn_, data_complete_, is_dup_tx_, diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index 11bce7a25..a476a848d 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1725,7 +1725,7 @@ public: K_(max_applying_part_log_no), K_(max_submitted_seq_no), K_(checksum), - K_(checksum_log_ts), + K_(checksum_scn), K_(max_durable_lsn), K_(data_complete), K_(is_dup_tx), @@ -1750,7 +1750,7 @@ public: int64_t max_applying_part_log_no_; // start from 0 on follower and always be INT64_MAX on leader int64_t max_submitted_seq_no_; // maintains on Leader and transfer to Follower via ActiveInfoLog uint64_t checksum_; - int64_t checksum_log_ts_; + palf::SCN checksum_scn_; palf::LSN max_durable_lsn_; bool data_complete_; bool is_dup_tx_; @@ -1768,7 +1768,7 @@ static const int64_t USEC_PER_SEC = 1000 * 1000; struct ObMulSourceDataNotifyArg { ObTransID tx_id_; - int64_t log_ts_; // the log ts of current notify type + palf::SCN scn_; // the log ts of current notify type // in case of abort transaction, trans_version_ is invalid palf::SCN trans_version_; bool for_replay_; @@ -1778,7 +1778,7 @@ struct ObMulSourceDataNotifyArg bool redo_synced_; TO_STRING_KV(K_(tx_id), - K_(log_ts), + K_(scn), K_(trans_version), K_(for_replay), K_(notify_type), diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 9d4ddbb1b..944190624 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -778,7 +778,7 @@ int ObPartTransCtx::iterate_tx_obj_lock_op(ObLockOpIterator &iter) const return ret; } -bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const int64_t log_ts) +bool ObPartTransCtx::need_update_schema_version(const int64_t log_id, const palf::SCN) { // const int64_t restore_snapshot_version = ls_tx_ctx_mgr_->get_restore_snapshot_version(); // const int64_t last_restore_log_id = ls_tx_ctx_mgr_->get_last_restore_log_id(); @@ -796,8 +796,8 @@ int ObPartTransCtx::trans_replay_abort_(const palf::SCN &final_log_ts) int ret = OB_SUCCESS; if (OB_FAIL(mt_ctx_.trans_replay_end(false, /*commit*/ - ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator(), - final_log_ts.get_val_for_lsn_allocator()))) { + ctx_tx_data_.get_commit_version(), + final_log_ts))) { TRANS_LOG(WARN, "transaction replay end error", KR(ret), KPC(this)); } @@ -818,8 +818,8 @@ int ObPartTransCtx::trans_replay_commit_(const palf::SCN &commit_version, } else { int64_t freeze_ts = 0; if (OB_FAIL(mt_ctx_.trans_replay_end(true, /*commit*/ - commit_version.get_val_for_lsn_allocator(), - final_log_ts.get_val_for_lsn_allocator(), + commit_version, + final_log_ts, log_cluster_version, checksum))) { TRANS_LOG(WARN, "transaction replay end error", KR(ret), K(commit_version), K(checksum), @@ -1243,7 +1243,7 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info) } else if (OB_FAIL(deep_copy_mds_array(ctx_info.exec_info_.multi_data_source_))) { TRANS_LOG(WARN, "deep copy ctx_info mds_array failed", K(ret)); } else if (FALSE_IT(mt_ctx_.update_checksum(exec_info_.checksum_, - exec_info_.checksum_log_ts_))) { + exec_info_.checksum_scn_))) { TRANS_LOG(ERROR, "recover checksum failed", K(ret), KPC(this), K(ctx_info)); } else if (!is_local_tx_() && OB_FAIL(ObTxCycleTwoPhaseCommitter::recover_from_tx_table())) { TRANS_LOG(ERROR, "recover_from_tx_table failed", K(ret), KPC(this)); @@ -1511,13 +1511,14 @@ int ObPartTransCtx::on_dist_end_(const bool commit) int64_t start_us, end_us; start_us = end_us = 0; - const int64_t trans_version = commit ? ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator() : -1; + const palf::SCN trans_version = commit ? ctx_tx_data_.get_commit_version() : palf::SCN::invalid_scn(); // Distributed transactions need to wait for the commit log majority successfully before // unlocking. If you want to know the reason, it is in the ::do_dist_commit start_us = ObTimeUtility::fast_current_time(); - if (OB_FAIL(mt_ctx_.trans_end(commit, trans_version, - ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) { + if (OB_FAIL(mt_ctx_.trans_end(commit, + trans_version, + ctx_tx_data_.get_end_log_ts()))) { TRANS_LOG(WARN, "trans end error", KR(ret), K(commit), K(trans_version), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { } else if (commit) { @@ -1844,7 +1845,7 @@ int ObPartTransCtx::common_on_success_(ObTxLogCb *log_cb) exec_info_.max_durable_lsn_ = lsn; } if (OB_SUCC(ret)) { - if (OB_FAIL(mt_ctx_.sync_log_succ(log_ts.get_val_for_lsn_allocator(), log_cb->get_callbacks()))) { + if (OB_FAIL(mt_ctx_.sync_log_succ(log_ts, log_cb->get_callbacks()))) { TRANS_LOG(ERROR, "mt ctx sync log failed", KR(ret), K(*log_cb), K(*this)); } else if (OB_SUCCESS != (tmp_ret = mt_ctx_.remove_callbacks_for_fast_commit())) { TRANS_LOG(WARN, "cleanout callbacks for fast commit", K(ret), K(*this)); @@ -3541,11 +3542,11 @@ void ObPartTransCtx::check_no_need_replay_checksum(const palf::SCN &log_ts) { // TODO(handora.qc): How to lock the tx_ctx - // checksum_log_ts_ means all data's checksum has been calculated before the - // log of checksum_log_ts_(not included). So if the data with this log_ts is - // not replayed with checksum_log_ts_ <= log_ts, it means may exist some data + // checksum_scn_ means all data's checksum has been calculated before the + // log of checksum_scn_(not included). So if the data with this scn is + // not replayed with checksum_scn_ <= scn, it means may exist some data // will never be replayed because the memtable will filter the data. - if (exec_info_.checksum_log_ts_ <= log_ts.get_val_for_lsn_allocator()) { + if (exec_info_.checksum_scn_ <= log_ts) { exec_info_.need_checksum_ = false; } } @@ -4788,9 +4789,9 @@ int ObPartTransCtx::get_tx_ctx_table_info_(ObTxCtxTableInfo &info) } if (OB_SUCC(ret)) { - if (OB_FAIL(mt_ctx_.calc_checksum_before_log_ts(exec_info_.max_applied_log_ts_.get_val_for_lsn_allocator(), - exec_info_.checksum_, - exec_info_.checksum_log_ts_))) { + if (OB_FAIL(mt_ctx_.calc_checksum_before_scn(exec_info_.max_applied_log_ts_, + exec_info_.checksum_, + exec_info_.checksum_scn_))) { TRANS_LOG(ERROR, "calc checksum before log ts failed", K(ret), KPC(this)); } else { info.tx_id_ = trans_id_; @@ -5070,7 +5071,7 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type, int ret = OB_SUCCESS; ObMulSourceDataNotifyArg arg; arg.tx_id_ = trans_id_; - arg.log_ts_ = log_ts.get_val_for_lsn_allocator(); + arg.scn_ = log_ts; arg.trans_version_ = ctx_tx_data_.get_commit_version(); arg.for_replay_ = for_replay; arg.notify_type_ = notify_type; @@ -5896,8 +5897,9 @@ int ObPartTransCtx::on_local_commit_tx_() TRANS_LOG(WARN, "wait gts elapse commit version failed", KR(ret), KPC(this)); } else if (FALSE_IT(tg.click())) { } else if (FALSE_IT(start_us = ObTimeUtility::fast_current_time())) { - } else if (OB_FAIL(mt_ctx_.trans_end(true, ctx_tx_data_.get_commit_version().get_val_for_lsn_allocator(), - ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) { + } else if (OB_FAIL(mt_ctx_.trans_end(true, + ctx_tx_data_.get_commit_version(), + ctx_tx_data_.get_end_log_ts()))) { TRANS_LOG(WARN, "trans end error", KR(ret), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { } else if (FALSE_IT(elr_handler_.reset_elr_state())) { @@ -5946,7 +5948,7 @@ int ObPartTransCtx::on_local_abort_tx_() ObTxBufferNodeArray tmp_array; start_us = ObTimeUtility::fast_current_time(); - if (OB_FAIL(mt_ctx_.trans_end(false, -1 /*unused*/, ctx_tx_data_.get_end_log_ts().get_val_for_lsn_allocator()))) { + if (OB_FAIL(mt_ctx_.trans_end(false, palf::SCN::invalid_scn(), ctx_tx_data_.get_end_log_ts()))) { TRANS_LOG(WARN, "trans end error", KR(ret), K(commit_version), "context", *this); } else if (FALSE_IT(end_us = ObTimeUtility::fast_current_time())) { } else if (OB_FAIL(trans_clear_())) { diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 5f2214f41..262da2fb6 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -197,7 +197,7 @@ public: { mt_ctx_.set_table_lock_killed(); } bool is_table_lock_killed() const; bool need_update_schema_version(const int64_t log_id, - const int64_t log_ts); + const palf::SCN log_ts); void set_trans_table_status(const obrpc::ObTrxToolArg &arg); share::ObLSID get_ls_id() const { return ls_id_; } diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 681de9ac3..b10521e2e 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -239,7 +239,7 @@ int ObTxReplayExecutor::before_replay_redo_() if (!has_redo_) { if (OB_ISNULL(ctx_) || OB_ISNULL(mt_ctx_ = ctx_->get_memtable_ctx())) { ret = OB_INVALID_ARGUMENT; - } else if (mt_ctx_->replay_begin(log_ts_ns_.get_val_for_lsn_allocator())) { + } else if (mt_ctx_->replay_begin(log_ts_ns_)) { TRANS_LOG(ERROR, "[Replay Tx] replay_begin fail or mt_ctx_ is NULL", K(ret), K(mt_ctx_)); } else { has_redo_ = true; @@ -253,11 +253,11 @@ void ObTxReplayExecutor::finish_replay_(const int retcode) if (has_redo_) { if (OB_SUCCESS != retcode) { mt_ctx_->replay_end(false, /*is_replay_succ*/ - log_ts_ns_.get_val_for_lsn_allocator()); + log_ts_ns_); TRANS_LOG(WARN, "[Replay Tx]Tx Redo replay error, rollback to start", K(*this)); } else { mt_ctx_->replay_end(true, /*is_replay_succ*/ - log_ts_ns_.get_val_for_lsn_allocator()); + log_ts_ns_); // TRANS_LOG(INFO, "[Replay Tx] Tx Redo replay success, commit sub_trans", K(*this)); } } @@ -668,9 +668,9 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx, KP(mmi_ptr)); } else if (OB_FAIL(data_mem_ptr->replay_row(store_ctx, mmi_ptr, row_buf))) { TRANS_LOG(WARN, "[Replay Tx] replay row error", K(ret)); - } else if (OB_FAIL(data_mem_ptr->set_max_end_log_ts(log_ts_ns_.get_val_for_lsn_allocator()))) { // for freeze log_ts , may be + } else if (OB_FAIL(data_mem_ptr->set_max_end_scn(log_ts_ns_))) { // for freeze log_ts , may be TRANS_LOG(WARN, "[Replay Tx] set memtable max end log ts failed", K(ret), KP(data_mem_ptr)); - } else if (OB_FAIL(data_mem_ptr->set_rec_log_ts(log_ts_ns_.get_val_for_lsn_allocator()))) { + } else if (OB_FAIL(data_mem_ptr->set_rec_scn(log_ts_ns_))) { TRANS_LOG(WARN, "[Replay Tx] set rec_log_ts error", K(ret), KPC(data_mem_ptr)); } @@ -679,7 +679,7 @@ int ObTxReplayExecutor::replay_row_(storage::ObStoreCtx &store_ctx, // in a freeze memtable which has a smaller end ts than this log. // // The rollback operation must hold write_ref to make memtable stay in memory. - mt_ctx_->rollback_redo_callbacks(log_ts_ns_.get_val_for_lsn_allocator()); + mt_ctx_->rollback_redo_callbacks(log_ts_ns_); } return ret; } diff --git a/src/storage/tx_table/ob_tx_ctx_memtable.cpp b/src/storage/tx_table/ob_tx_ctx_memtable.cpp index 845b5b550..f117cfe42 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable.cpp +++ b/src/storage/tx_table/ob_tx_ctx_memtable.cpp @@ -274,7 +274,7 @@ int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze) scn_range.start_scn_.convert_for_gts(1); scn_range.end_scn_.convert_for_gts(cur_ts); set_scn_range(scn_range); - set_snapshot_version(cur_ts); + set_snapshot_version(scn_range.end_scn_); ATOMIC_STORE(&is_frozen_, true); } } diff --git a/src/storage/tx_table/ob_tx_data_memtable.cpp b/src/storage/tx_table/ob_tx_data_memtable.cpp index 9b4734a2b..5df2bbd6d 100644 --- a/src/storage/tx_table/ob_tx_data_memtable.cpp +++ b/src/storage/tx_table/ob_tx_data_memtable.cpp @@ -421,7 +421,7 @@ bool ObTxDataMemtable::ready_for_flush() STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id())); } else if (max_consequent_callbacked_scn >= key_.scn_range_.end_scn_) { state_ = ObTxDataMemtable::State::FROZEN; - set_snapshot_version(min_tx_scn_.get_val_for_lsn_allocator()); + set_snapshot_version(min_tx_scn_); bool_ret = true; } else { int64_t freeze_ts = key_.scn_range_.end_scn_.get_val_for_inner_table_field(); diff --git a/unittest/storage/memtable/test_memtable_basic.cpp b/unittest/storage/memtable/test_memtable_basic.cpp index 643fd816c..f1a29d859 100644 --- a/unittest/storage/memtable/test_memtable_basic.cpp +++ b/unittest/storage/memtable/test_memtable_basic.cpp @@ -328,7 +328,9 @@ TEST_F(TestMemtable, mt_set) print(mvcc_row); EXPECT_EQ(2, rg.mem_ctx_.trans_mgr_.get_main_list_length()); - EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 1000, 1000, 0)); + palf::SCN val_1000; + val_1000.convert_for_lsn_allocator(1000); + EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_1000, val_1000, 0)); print(mvcc_row); } @@ -347,13 +349,17 @@ TEST_F(TestMemtable, conflict) EXPECT_EQ(OB_SUCCESS, rg2.init(2, this)); EXPECT_EQ(OB_ERR_EXCLUSIVE_LOCK_CONFLICT, rg2.write(1, 3, mt)); - EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 1000, 1000, 0)); + palf::SCN val_1000; + val_1000.convert_for_lsn_allocator(1000); + EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_1000, val_1000, 0)); EXPECT_EQ(OB_TRANSACTION_SET_VIOLATION, rg2.write(1, 3, mt, 900)); EXPECT_EQ(OB_SUCCESS, rg2.write(1, 3, mt, 1000)); EXPECT_EQ(OB_SUCCESS, rg2.write(1, 4, mt, 1001)); - EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 1002, 1002, 0)); + palf::SCN val_1002; + val_1002.convert_for_lsn_allocator(1002); + EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_1002, val_1002, 0)); print(mvcc_row); } @@ -367,22 +373,24 @@ TEST_F(TestMemtable, except) EXPECT_EQ(OB_SUCCESS, rg.init(1, this)); ObMvccRow *mvcc_row = nullptr; - EXPECT_EQ(OB_SUCCESS, rg.write(1, 2, mt, mvcc_row, 1000)); - EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 900, 900, 0)); + palf::SCN val_900; + val_900.convert_for_lsn_allocator(900); + EXPECT_EQ(OB_SUCCESS, rg.write(1, 2, mt, mvcc_row, 1000)); + EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_900, val_900, 0)); RunCtxGuard rg2; EXPECT_EQ(OB_SUCCESS, rg2.init(2, this)); EXPECT_EQ(OB_SUCCESS, rg2.write(1, 3, mt, 1000)); - EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 900, 900, 0)); + EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_900, val_900, 0)); print(mvcc_row); RunCtxGuard rg3; EXPECT_EQ(OB_SUCCESS, rg3.init(3, this)); EXPECT_EQ(OB_SUCCESS, rg3.write(1, 4, mt, 900)); - EXPECT_EQ(OB_SUCCESS, rg3.mem_ctx_.do_trans_end(true, 900, 900, 0)); + EXPECT_EQ(OB_SUCCESS, rg3.mem_ctx_.do_trans_end(true, val_900, val_900, 0)); print(mvcc_row); } @@ -395,11 +403,14 @@ TEST_F(TestMemtable, multi_key) RunCtxGuard rg; EXPECT_EQ(OB_SUCCESS, rg.init(1, this)); + palf::SCN val_900; + val_900.convert_for_lsn_allocator(900); + ObMvccRow *mvcc_row = nullptr; ObMvccRow *mvcc_row2 = nullptr; EXPECT_EQ(OB_SUCCESS, rg.write(1, 10, mt, mvcc_row, 1000)); EXPECT_EQ(OB_SUCCESS, rg.write(2, 20, mt, mvcc_row2, 1000)); - EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, 900, 900, 0)); + EXPECT_EQ(OB_SUCCESS, rg.mem_ctx_.do_trans_end(true, val_900, val_900, 0)); print(mvcc_row); print(mvcc_row2); @@ -409,7 +420,7 @@ TEST_F(TestMemtable, multi_key) EXPECT_EQ(OB_SUCCESS, rg2.write(1, 100, mt, 1000)); EXPECT_EQ(OB_SUCCESS, rg2.write(2, 200, mt, 1000)); - EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, 900, 900, 0)); + EXPECT_EQ(OB_SUCCESS, rg2.mem_ctx_.do_trans_end(true, val_900, val_900, 0)); print(mvcc_row); print(mvcc_row2); } diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index 6322e2388..bd8da556f 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -305,7 +305,9 @@ int TestCompactionPolicy::mock_memtable( LOG_WARN("add to data_checkpoint failed", K(ret), KPC(memtable)); mt_mgr->clean_tail_memtable_(); } else if (palf::OB_MAX_SCN_TS_NS != end_border) { // frozen memtable - memtable->snapshot_version_ = snapshot_version; + palf::SCN snapshot_scn; + snapshot_scn.convert_for_lsn_allocator(snapshot_version); + memtable->snapshot_version_ = snapshot_scn; memtable->write_ref_cnt_ = 0; memtable->unsynced_cnt_ = 0; memtable->is_tablet_freeze_ = true; diff --git a/unittest/storage/test_dml_common.h b/unittest/storage/test_dml_common.h index cc4051961..2e12c962b 100644 --- a/unittest/storage/test_dml_common.h +++ b/unittest/storage/test_dml_common.h @@ -218,16 +218,16 @@ int TestDmlCommon::create_data_tablet( } else { transaction::ObMulSourceDataNotifyArg trans_flags; trans_flags.tx_id_ = 123; - trans_flags.log_ts_ = -1; + trans_flags.scn_ = palf::SCN::invalid_scn(); trans_flags.for_replay_ = false; ObLS *ls = ls_handle.get_ls(); if (OB_FAIL(ls->get_tablet_svr()->on_prepare_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg)); - } else if (FALSE_IT(trans_flags.log_ts_ = INT64_MAX - 100)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) { } else if (OB_FAIL(ls->get_tablet_svr()->on_redo_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg)); - } else if (FALSE_IT(++trans_flags.log_ts_)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) { } else if (OB_FAIL(ls->get_tablet_svr()->on_commit_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg)); } diff --git a/unittest/storage/test_lob_common.h b/unittest/storage/test_lob_common.h index 7503074c5..05f8508a9 100644 --- a/unittest/storage/test_lob_common.h +++ b/unittest/storage/test_lob_common.h @@ -77,16 +77,16 @@ int TestLobCommon::create_data_tablet( } else { transaction::ObMulSourceDataNotifyArg trans_flags; trans_flags.tx_id_ = 123; - trans_flags.log_ts_ = -1; + trans_flags.scn_ = palf::SCN::invalid_scn(); trans_flags.for_replay_ = false; ObLS *ls = ls_handle.get_ls(); if (OB_FAIL(ls->get_tablet_svr()->on_prepare_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg)); - } else if (FALSE_IT(trans_flags.log_ts_ = INT64_MAX - 100)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) { } else if (OB_FAIL(ls->get_tablet_svr()->on_redo_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg)); - } else if (FALSE_IT(++trans_flags.log_ts_)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) { } else if (OB_FAIL(ls->get_tablet_svr()->on_commit_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg)); } diff --git a/unittest/storage/test_tablet_helper.h b/unittest/storage/test_tablet_helper.h index bc64c08fc..ab2bac2f8 100644 --- a/unittest/storage/test_tablet_helper.h +++ b/unittest/storage/test_tablet_helper.h @@ -36,15 +36,15 @@ int TestTabletHelper::create_tablet(ObLSTabletService &ls_tablet_svr, obrpc::ObB int ret = common::OB_SUCCESS; transaction::ObMulSourceDataNotifyArg trans_flags; trans_flags.tx_id_ = 123; - trans_flags.log_ts_ = -1; + trans_flags.scn_ = palf::SCN::invalid_scn(); trans_flags.for_replay_ = false; if (OB_FAIL(ls_tablet_svr.on_prepare_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to prepare create tablets", K(ret), K(arg)); - } else if (FALSE_IT(trans_flags.log_ts_ = palf::OB_MAX_SCN_TS_NS - 100)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::minus(palf::SCN::max_scn(), 100))) { } else if (OB_FAIL(ls_tablet_svr.on_redo_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to redo create tablets", K(ret), K(arg)); - } else if (FALSE_IT(++trans_flags.log_ts_)) { + } else if (FALSE_IT(trans_flags.scn_ = palf::SCN::plus(trans_flags.scn_, 1))) { } else if (OB_FAIL(ls_tablet_svr.on_commit_create_tablets(arg, trans_flags))) { STORAGE_LOG(WARN, "failed to commit create tablets", K(ret), K(arg)); }