diff --git a/src/logservice/palf/scn.cpp b/src/logservice/palf/scn.cpp index 710bca82d..66a4c50a4 100644 --- a/src/logservice/palf/scn.cpp +++ b/src/logservice/palf/scn.cpp @@ -169,15 +169,14 @@ SCN SCN::minus(const SCN &ref, uint64_t delta) { int ret = OB_SUCCESS; SCN result; - uint64_t new_val = OB_INVALID_SCN_VAL; if (OB_UNLIKELY(delta >= OB_MAX_SCN_TS_NS || !ref.is_valid())) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "invalid argument", K(delta), K(ref), K(ret)); - } else if (OB_UNLIKELY((new_val = ref.val_ - delta) > ref.val_)) { + } else if (OB_UNLIKELY(ref.val_ < delta)) { ret = OB_INVALID_ARGUMENT; - PALF_LOG(ERROR , "new_val is not valid", K(new_val), K(ref), K(delta), K(ret)); + PALF_LOG(ERROR , "new_val is not valid", K(ref), K(delta), K(ret)); } else { - result.val_ = new_val; + result.val_ = ref.val_ - delta; } return result; } @@ -319,6 +318,11 @@ uint64_t SCN::get_val_for_lsn_allocator() const return val_; } +uint64_t SCN::get_val_for_row_cell() const +{ + return val_; +} + int SCN::convert_for_tx(int64_t val) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/scn.h b/src/logservice/palf/scn.h index 2c22b8147..3c7710b8c 100644 --- a/src/logservice/palf/scn.h +++ b/src/logservice/palf/scn.h @@ -78,6 +78,7 @@ public: //only for gts use uint64_t get_val_for_gts() const; uint64_t get_val_for_lsn_allocator() const; + uint64_t get_val_for_row_cell() const; // @param[in] convert for tx int convert_for_tx(int64_t commit_trans_version); diff --git a/src/observer/virtual_table/ob_all_virtual_tx_data_table.cpp b/src/observer/virtual_table/ob_all_virtual_tx_data_table.cpp index 2727439e7..6ef635ae7 100644 --- a/src/observer/virtual_table/ob_all_virtual_tx_data_table.cpp +++ b/src/observer/virtual_table/ob_all_virtual_tx_data_table.cpp @@ -125,10 +125,10 @@ int ObAllVirtualTxDataTable::process_curr_tenant(common::ObNewRow *&row) cur_row_.cells_[i].set_int(row_data.tx_data_count_); break; case MIN_TX_SCN_COL: - cur_row_.cells_[i].set_uint64(row_data.min_tx_scn_.get_val_for_lsn_allocator()); + cur_row_.cells_[i].set_uint64(row_data.min_tx_scn_.get_val_for_inner_table_field()); break; case MAX_TX_SCN_COL: - cur_row_.cells_[i].set_uint64(row_data.max_tx_scn_.get_val_for_lsn_allocator()); + cur_row_.cells_[i].set_uint64(row_data.max_tx_scn_.get_val_for_inner_table_field()); break; default: ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp index 9e5d295e1..353f4d1cd 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp @@ -24,6 +24,8 @@ namespace oceanbase { using namespace storage; +using namespace palf; + namespace blocksstable { ObIMicroBlockRowScanner::ObIMicroBlockRowScanner(common::ObIAllocator &allocator) @@ -1272,9 +1274,12 @@ int ObMultiVersionMicroBlockRowScanner::lock_for_read( int ret = OB_SUCCESS; auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard(); int64_t read_epoch = tx_table_guard.epoch(); + SCN scn_trans_version = SCN::invalid_scn(); if (OB_FAIL(tx_table_guard.get_tx_table()->lock_for_read( - lock_for_read_arg, read_epoch, can_read, trans_version, is_determined_state))) { + lock_for_read_arg, read_epoch, can_read, scn_trans_version, is_determined_state))) { LOG_WARN("failed to check transaction status", K(ret)); + } else { + trans_version = scn_trans_version.get_val_for_tx(); } return ret; } @@ -1989,12 +1994,16 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state( { int ret = OB_SUCCESS; //get trans status & committed_trans_version_ - commit_trans_version = INT64_MAX; + SCN scn_commit_trans_version = SCN::max_scn(); + SCN merge_scn; + merge_scn.convert_for_lsn_allocator(context_->merge_log_ts_); auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard(); int64_t read_epoch = tx_table_guard.epoch();; - if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_log_ts( - trans_id, context_->merge_log_ts_, read_epoch, state, commit_trans_version))) { + if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_scn( + trans_id, merge_scn, read_epoch, state, scn_commit_trans_version))) { LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state)); + } else { + commit_trans_version = scn_commit_trans_version.get_val_for_tx(); } return ret; } diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 634baa0d4..3d66351d9 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -627,7 +627,7 @@ public: // ObTxTable interface DELEGATE_WITH_RET(tx_table_, get_tx_table_guard, int); - DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_log_ts, int); + DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_scn, int); DELEGATE_WITH_RET(tx_table_, dump_single_tx_data_2_text, int); // ObCheckpointExecutor interface: diff --git a/src/storage/memtable/mvcc/ob_multi_version_iterator.cpp b/src/storage/memtable/mvcc/ob_multi_version_iterator.cpp index 23a257046..2d5355e26 100644 --- a/src/storage/memtable/mvcc/ob_multi_version_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_multi_version_iterator.cpp @@ -30,6 +30,8 @@ using namespace storage; using namespace transaction; using namespace common; using namespace blocksstable; +using namespace palf; + namespace memtable { @@ -263,14 +265,14 @@ int ObMultiVersionValueIterator::get_trans_status( { UNUSED(cluster_version); int ret = OB_SUCCESS; - int64_t trans_version = INT64_MAX; + SCN trans_version = SCN::max_scn(); ObTxTable *tx_table = ctx_->get_tx_table_guard().get_tx_table(); int64_t read_epoch = ctx_->get_tx_table_guard().epoch(); if (OB_ISNULL(tx_table)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "tx_table_ is null", K(ret), KPC_(ctx), K(merge_scn_), K(trans_id)); - } else if (OB_FAIL(tx_table->get_tx_state_with_log_ts(trans_id, - merge_scn_.get_val_for_lsn_allocator(), + } else if (OB_FAIL(tx_table->get_tx_state_with_scn(trans_id, + merge_scn_, read_epoch, state, trans_version))) { diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index 0d1176cd2..3e42dbd81 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -198,8 +198,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag, // is_delay_cleanout() to check the state and we only cleanout it // when data is delay cleanout bool can_read = false; - // palf::SCN data_scn = palf::SCN::max_scn(); - int64_t data_version = INT64_MAX; + palf::SCN data_version = palf::SCN::max_scn(); bool is_determined_state = false; // Opt3: we only cleanout tx node who is delay cleanout ObCleanoutOp cleanout_op; @@ -225,7 +224,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag, cleanout_op, recheck_op))) { TRANS_LOG(WARN, "lock for read failed", KPC(iter), K(lock_for_read_arg)); - } else if (can_read && snapshot_version.get_val_for_lsn_allocator() >= data_version) { + } else if (can_read && snapshot_version >= data_version) { // Case 5.1: data is cleanout by lock for read and can be read by reader's // snapshot version_iter_ = iter; diff --git a/src/storage/ob_i_memtable_mgr.h b/src/storage/ob_i_memtable_mgr.h index 374a4efa9..ff741a906 100644 --- a/src/storage/ob_i_memtable_mgr.h +++ b/src/storage/ob_i_memtable_mgr.h @@ -59,9 +59,6 @@ public: ObFreezer *freezer, ObTenantMetaMemMgr *t3m, ObTabletDDLKvMgr *ddl_kv_mgr); - virtual int create_memtable(const int64_t clog_checkpoint_ts, - const int64_t schema_version, - const bool for_replay=false) = 0; virtual int create_memtable(const palf::SCN clog_checkpoint_scn, const int64_t schema_version, const bool for_replay = false) diff --git a/src/storage/tablelock/ob_lock_memtable_mgr.cpp b/src/storage/tablelock/ob_lock_memtable_mgr.cpp index ed7182e80..94def1cff 100644 --- a/src/storage/tablelock/ob_lock_memtable_mgr.cpp +++ b/src/storage/tablelock/ob_lock_memtable_mgr.cpp @@ -77,12 +77,11 @@ void ObLockMemtableMgr::reset() is_inited_ = false; } -int ObLockMemtableMgr::create_memtable( - const int64_t last_replay_log_ts, - const int64_t schema_version, - const bool for_replay) +int ObLockMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn, + const int64_t schema_version, + const bool for_replay) { - UNUSED(last_replay_log_ts); + UNUSED(clog_checkpoint_scn); UNUSED(schema_version); UNUSED(for_replay); diff --git a/src/storage/tablelock/ob_lock_memtable_mgr.h b/src/storage/tablelock/ob_lock_memtable_mgr.h index a78e318a8..0036d9f11 100644 --- a/src/storage/tablelock/ob_lock_memtable_mgr.h +++ b/src/storage/tablelock/ob_lock_memtable_mgr.h @@ -63,9 +63,9 @@ public: storage::ObTabletDDLKvMgr *ddl_kv_mgr) override; virtual void destroy() override; - virtual int create_memtable(const int64_t last_replay_log_ts, + virtual int create_memtable(const palf::SCN clog_checkpoint_scn, const int64_t schema_version, - const bool for_replay=false) override; + const bool for_replay = false) override; DECLARE_TO_STRING; private: diff --git a/src/storage/tablet/ob_tablet.cpp b/src/storage/tablet/ob_tablet.cpp index 768296f0f..c01523063 100644 --- a/src/storage/tablet/ob_tablet.cpp +++ b/src/storage/tablet/ob_tablet.cpp @@ -1108,9 +1108,11 @@ int ObTablet::update_upper_trans_version(ObLS &ls, bool &is_updated) LOG_WARN("sstable must not be null", K(ret), K(i), K(minor_tables)); } else if (INT64_MAX == sstable->get_upper_trans_version()) { int64_t max_trans_version = INT64_MAX; - if (OB_FAIL(ls.get_upper_trans_version_before_given_log_ts( - sstable->get_end_log_ts(), max_trans_version))) { + SCN tmp_scn = SCN::max_scn(); + if (OB_FAIL(ls.get_upper_trans_version_before_given_scn( + sstable->get_end_scn(), tmp_scn))) { LOG_WARN("failed to get upper trans version before given log ts", K(ret), KPC(sstable)); + } else if (FALSE_IT(max_trans_version = tmp_scn.is_max() ? INT64_MAX : tmp_scn.get_val_for_lsn_allocator())) { } else if (0 == max_trans_version) { ret = OB_ERR_UNEXPECTED; LOG_WARN("max trans version should not be 0", KPC(sstable)); @@ -1641,7 +1643,7 @@ int ObTablet::inner_create_memtable( LOG_WARN("invalid args", K(ret), K(clog_checkpoint_scn), K(schema_version)); } else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) { LOG_WARN("failed to get memtable mgr", K(ret)); - } else if (OB_FAIL(memtable_mgr->create_memtable(clog_checkpoint_scn.get_val_for_gts(), schema_version, for_replay))) { + } else if (OB_FAIL(memtable_mgr->create_memtable(clog_checkpoint_scn, schema_version, for_replay))) { if (OB_ENTRY_EXIST != ret && OB_MINOR_FREEZE_NOT_ALLOW != ret) { LOG_WARN("failed to create memtable for tablet", K(ret), K(clog_checkpoint_scn), K(schema_version), K(for_replay)); diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.cpp b/src/storage/tablet/ob_tablet_memtable_mgr.cpp index a1cec9acc..d03d72dbf 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.cpp +++ b/src/storage/tablet/ob_tablet_memtable_mgr.cpp @@ -129,7 +129,7 @@ int ObTabletMemtableMgr::init_storage_schema_recorder( // There are two cases: // 1. create the first memtable for tablet // 2. create the new memtable after freezing the old memtable -int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, +int ObTabletMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn, const int64_t schema_version, const bool for_replay) { @@ -168,7 +168,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, ObITable::TableKey table_key; table_key.table_type_ = ObITable::DATA_MEMTABLE; table_key.tablet_id_ = tablet_id_; - table_key.scn_range_.start_scn_.convert_for_gts(clog_checkpoint_ts); //TODO(SCN) fix clog_checkpoint_ts with SCN + table_key.scn_range_.start_scn_ = clog_checkpoint_scn; table_key.scn_range_.end_scn_.set_max(); memtable::ObMemtable *memtable = NULL; @@ -213,10 +213,10 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, } } // there is no frozen memtable and new sstable will not be generated, - // meaning that clog_checkpoint_ts will not be updated now, - // so get newest clog_checkpoint_ts to set left boundary + // meaning that clog_checkpoint_scn will not be updated now, + // so get newest clog_checkpoint_scn to set left boundary } else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))){ - LOG_WARN("failed to get newest clog_checkpoint_ts", K(ret), K(ls_id), K(tablet_id_), + LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_), K(new_clog_checkpoint_scn)); } else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){ LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version)); diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.h b/src/storage/tablet/ob_tablet_memtable_mgr.h index e7b1fa069..678bec82c 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.h +++ b/src/storage/tablet/ob_tablet_memtable_mgr.h @@ -48,9 +48,6 @@ public: ObTenantMetaMemMgr *t3m, ObTabletDDLKvMgr *ddl_kv_mgr) override; - virtual int create_memtable(const int64_t clog_checkpoint_ts, - const int64_t schema_version, - const bool for_replay=false) override; virtual int get_active_memtable(ObTableHandleV2 &handle) const override; virtual int get_all_memtables(ObTableHdlArray &handle) override; virtual void destroy() override; @@ -70,6 +67,9 @@ public: memtable::ObMemtable *&memtable, const memtable::MultiSourceDataUnitType type) const override; int release_tail_memtable(memtable::ObIMemtable *memtable); + int create_memtable(const palf::SCN clog_checkpoint_scn, + const int64_t schema_version, + const bool for_replay); int get_memtables( ObTableHdlArray &handle, const bool reset_handle = true, diff --git a/src/storage/tx/ob_ctx_tx_data.cpp b/src/storage/tx/ob_ctx_tx_data.cpp index b3cb25cc6..60de6ba91 100644 --- a/src/storage/tx/ob_ctx_tx_data.cpp +++ b/src/storage/tx/ob_ctx_tx_data.cpp @@ -280,7 +280,7 @@ int ObCtxTxData::set_commit_version(const palf::SCN &commit_version) if (OB_FAIL(check_tx_data_writable_())) { TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this)); } else { - tx_data_->commit_scn_ = commit_version; + tx_data_->commit_version_ = commit_version; } return ret; @@ -324,7 +324,7 @@ int32_t ObCtxTxData::get_state() const const palf::SCN ObCtxTxData::get_commit_version() const { RLockGuard guard(lock_); - SCN commit_version = (NULL != tx_data_ ? tx_data_->commit_scn_ : tx_commit_data_.commit_scn_); + SCN commit_version = (NULL != tx_data_ ? tx_data_->commit_version_ : tx_commit_data_.commit_version_); return commit_version; } diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index eec61b0f6..bf4317b9d 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1612,20 +1612,16 @@ int ObTransService::get_tx_state_from_tx_table_(const share::ObLSID &lsid, ObTxTable *tx_table = NULL; int64_t _state = 0; int64_t read_epoch = ObTxTable::INVALID_READ_EPOCH; - int64_t commit_ts = 0; if (OB_FAIL(get_tx_table_guard_(NULL, lsid, tx_table_guard))) { TRANS_LOG(WARN, "get tx table guard failed", KR(ret), K(lsid), KPC(this)); } else if (!tx_table_guard.is_valid()) { TRANS_LOG(WARN, "tx table is null", KR(ret), K(lsid), KPC(this)); } else if (FALSE_IT(tx_table = tx_table_guard.get_tx_table())) { } else if (FALSE_IT(read_epoch = tx_table_guard.epoch())) { - } else if (OB_FAIL(tx_table->try_get_tx_state(tx_id, read_epoch, _state, commit_ts))) { + } else if (OB_FAIL(tx_table->try_get_tx_state(tx_id, read_epoch, _state, commit_version))) { TRANS_LOG(WARN, "get tx state failed", KR(ret), K(lsid), K(tx_id), KPC(this)); } else { state = (int)_state; - if (OB_FAIL(commit_version.convert_for_lsn_allocator(commit_ts))) { - TRANS_LOG(WARN, "convert for lsn fail", K(commit_ts)); - } } return ret; } diff --git a/src/storage/tx/ob_tx_data_define.cpp b/src/storage/tx/ob_tx_data_define.cpp index ea3746be4..12419da14 100644 --- a/src/storage/tx/ob_tx_data_define.cpp +++ b/src/storage/tx/ob_tx_data_define.cpp @@ -217,7 +217,7 @@ void ObTxCommitData::reset() { tx_id_ = INT64_MAX; state_ = RUNNING; - commit_scn_.reset(); + commit_version_.reset(); start_scn_.reset(); end_scn_.reset(); is_in_tx_data_table_ = false; @@ -257,18 +257,18 @@ int ObTxData::serialize(char *buf, const int64_t buf_len, int64_t &pos) const int ObTxData::serialize_(char *buf, const int64_t buf_len, int64_t &pos) const { int ret = OB_SUCCESS; - // LST_DO_CODE(OB_UNIS_ENCODE, state_, commit_scn_, start_scn_, end_scn_); + // LST_DO_CODE(OB_UNIS_ENCODE, state_, commit_version_, start_scn_, end_scn_); if (OB_FAIL(tx_id_.serialize(buf, buf_len, pos))) { STORAGE_LOG(WARN, "serialize tx_id fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(serialization::encode_vi32(buf, buf_len, pos, state_))) { STORAGE_LOG(WARN, "serialize state fail.", KR(ret), K(pos), K(buf_len)); - } else if (OB_FAIL(commit_scn_.serialize(buf, buf_len, pos))) { - STORAGE_LOG(WARN, "serialize commit_scn fail.", KR(ret), K(pos), K(buf_len)); + } else if (OB_FAIL(commit_version_.serialize(buf, buf_len, pos))) { + STORAGE_LOG(WARN, "serialize commit_version fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(start_scn_.serialize(buf, buf_len, pos))) { - STORAGE_LOG(WARN, "serialize start_log_ts fail.", KR(ret), K(pos), K(buf_len)); + STORAGE_LOG(WARN, "serialize start_scn fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(end_scn_.serialize(buf, buf_len, pos))) { - STORAGE_LOG(WARN, "serialize end_log_ts fail.", KR(ret), K(pos), K(buf_len)); + STORAGE_LOG(WARN, "serialize end_scn fail.", KR(ret), K(pos), K(buf_len)); } else if (OB_FAIL(undo_status_list_.serialize(buf, buf_len, pos))) { STORAGE_LOG(WARN, "serialize undo_status_list fail.", KR(ret), K(pos), K(buf_len)); } @@ -289,10 +289,10 @@ int64_t ObTxData::get_serialize_size() const int64_t ObTxData::get_serialize_size_() const { int64_t len = 0; - // LST_DO_CODE(OB_UNIS_ADD_LEN, state_, commit_scn_, start_scn_, end_scn_); + // LST_DO_CODE(OB_UNIS_ADD_LEN, state_, commit_version_, start_scn_, end_scn_); len += tx_id_.get_serialize_size(); len += serialization::encoded_length_vi32(state_); - len += commit_scn_.get_serialize_size(); + len += commit_version_.get_serialize_size(); len += start_scn_.get_serialize_size(); len += end_scn_.get_serialize_size(); len += undo_status_list_.get_serialize_size(); @@ -339,12 +339,12 @@ int ObTxData::deserialize_(const char *buf, STORAGE_LOG(WARN, "deserialize tx_id fail.", KR(ret), K(pos), K(data_len)); } else if (OB_FAIL(serialization::decode_vi32(buf, data_len, pos, &state_))) { STORAGE_LOG(WARN, "deserialize state fail.", KR(ret), K(pos), K(data_len)); - } else if (OB_FAIL(commit_scn_.deserialize(buf, data_len, pos))) { - STORAGE_LOG(WARN, "deserialize commit_scn fail.", KR(ret), K(pos), K(data_len)); + } else if (OB_FAIL(commit_version_.deserialize(buf, data_len, pos))) { + STORAGE_LOG(WARN, "deserialize commit_version fail.", KR(ret), K(pos), K(data_len)); } else if (OB_FAIL(start_scn_.deserialize(buf, data_len, pos))) { - STORAGE_LOG(WARN, "deserialize start_log_ts fail.", KR(ret), K(pos), K(data_len)); + STORAGE_LOG(WARN, "deserialize start_scn fail.", KR(ret), K(pos), K(data_len)); } else if (OB_FAIL(end_scn_.deserialize(buf, data_len, pos))) { - STORAGE_LOG(WARN, "deserialize end_log_ts fail.", KR(ret), K(pos), K(data_len)); + STORAGE_LOG(WARN, "deserialize end_scn fail.", KR(ret), K(pos), K(data_len)); } else if (OB_FAIL(undo_status_list_.deserialize(buf, data_len, pos, slice_allocator))) { STORAGE_LOG(WARN, "deserialize undo_status_list fail.", KR(ret), K(pos), K(data_len)); } @@ -367,7 +367,7 @@ ObTxData &ObTxData::operator=(const ObTxData &rhs) { tx_id_ = rhs.tx_id_; state_ = rhs.state_; - commit_scn_ = rhs.commit_scn_; + commit_version_ = rhs.commit_version_; start_scn_ = rhs.start_scn_; end_scn_ = rhs.end_scn_; undo_status_list_ = rhs.undo_status_list_; @@ -379,7 +379,7 @@ ObTxData &ObTxData::operator=(const ObTxCommitData &rhs) { tx_id_ = rhs.tx_id_; state_ = rhs.state_; - commit_scn_ = rhs.commit_scn_; + commit_version_ = rhs.commit_version_; start_scn_ = rhs.start_scn_; end_scn_ = rhs.end_scn_; is_in_tx_data_table_ = rhs.is_in_tx_data_table_; @@ -406,16 +406,16 @@ bool ObTxData::is_valid_in_tx_data_table() const STORAGE_LOG(ERROR, "tx data state is invalid", KPC(this)); } else if (!start_scn_.is_valid()) { bool_ret = false; - STORAGE_LOG(ERROR, "tx data start_log_ts is invalid", KPC(this)); + STORAGE_LOG(ERROR, "tx data start_scn is invalid", KPC(this)); } else if (!end_scn_.is_valid()) { bool_ret = false; - STORAGE_LOG(ERROR, "tx data end_log_ts is invalid", KPC(this)); + STORAGE_LOG(ERROR, "tx data end_scn is invalid", KPC(this)); } else if (end_scn_ < start_scn_) { bool_ret = false; - STORAGE_LOG(ERROR, "tx data end_log_ts is less than start_log_ts", KPC(this)); - } else if (!commit_scn_.is_valid() && state_ != RUNNING && state_ != ABORT) { + STORAGE_LOG(ERROR, "tx data end_scn is less than start_scn", KPC(this)); + } else if (!commit_version_.is_valid() && state_ != RUNNING && state_ != ABORT) { bool_ret = false; - STORAGE_LOG(ERROR, "tx data commit_scn is invalid but state is not running or abort", + STORAGE_LOG(ERROR, "tx data commit_version is invalid but state is not running or abort", KPC(this)); } @@ -511,15 +511,15 @@ bool ObTxData::equals_(ObTxData &rhs) } else if (state_ != rhs.state_) { bool_ret = false; STORAGE_LOG(INFO, "state is not equal."); - } else if (commit_scn_ != rhs.commit_scn_) { + } else if (commit_version_ != rhs.commit_version_) { bool_ret = false; - STORAGE_LOG(INFO, "commit_scn is not equal."); + STORAGE_LOG(INFO, "commit_version is not equal."); } else if (start_scn_ != rhs.start_scn_) { bool_ret = false; - STORAGE_LOG(INFO, "start_log_ts is not equal."); + STORAGE_LOG(INFO, "start_scn is not equal."); } else if (end_scn_ != rhs.end_scn_) { bool_ret = false; - STORAGE_LOG(INFO, "end_log_ts is not equal."); + STORAGE_LOG(INFO, "end_scn is not equal."); } else if (undo_status_list_.undo_node_cnt_ != rhs.undo_status_list_.undo_node_cnt_) { bool_ret = false; STORAGE_LOG(INFO, "undo_node_cnt is not equal."); @@ -565,13 +565,13 @@ bool ObTxData::equals_(ObTxData &rhs) void ObTxData::print_to_stderr(const ObTxData &tx_data) { fprintf(stderr, - "TX_DATA:{tx_id=%-20ld in_tx_data_table=%-6s start_log_scn=%-20s end_log_scn=%-20s commit_scn=%-20s " + "TX_DATA:{tx_id=%-20ld in_tx_data_table=%-6s start_log_scn=%-20s end_log_scn=%-20s commit_version=%-20s " "state=%s", tx_data.tx_id_.get_id(), tx_data.is_in_tx_data_table_ ? "True" : "False", to_cstring(tx_data.start_scn_), to_cstring(tx_data.end_scn_), - to_cstring(tx_data.commit_scn_), + to_cstring(tx_data.commit_version_), get_state_string(tx_data.state_)); tx_data.undo_status_list_.dump_2_text(stderr); @@ -585,12 +585,12 @@ void ObTxData::dump_2_text(FILE *fd) const fprintf(fd, "TX_DATA:\n{\n tx_id=%-20ld\n in_tx_data_table=%-6s\n start_log_scn=%-20s\n end_log_scn=%-20s\n " - " commit_scn=%-20s\n state=%s\n", + " commit_version=%-20s\n state=%s\n", tx_id_.get_id(), is_in_tx_data_table_ ? "True" : "False", to_cstring(start_scn_), to_cstring(end_scn_), - to_cstring(commit_scn_), + to_cstring(commit_version_), get_state_string(state_)); undo_status_list_.dump_2_text(fd); @@ -605,7 +605,7 @@ DEF_TO_STRING(ObTxData) J_KV(K_(tx_id), "state", get_state_string(state_), "in_tx_data_table", is_in_tx_data_table_ ? "True" : "False", - K_(commit_scn), + K_(commit_version), K_(start_scn), K_(end_scn), K_(undo_status_list)); diff --git a/src/storage/tx/ob_tx_data_define.h b/src/storage/tx/ob_tx_data_define.h index 9637a3986..7ca3ac617 100644 --- a/src/storage/tx/ob_tx_data_define.h +++ b/src/storage/tx/ob_tx_data_define.h @@ -207,7 +207,6 @@ public: int64_t prepare_version_; }; -// FIXME : @gengli remove log ts class ObTxCommitData { public: @@ -216,7 +215,7 @@ public: TO_STRING_KV(K_(tx_id), K_(state), K_(is_in_tx_data_table), - K_(commit_scn), + K_(commit_version), K_(start_scn), K_(end_scn)); @@ -235,12 +234,11 @@ public: transaction::ObTransID tx_id_; int32_t state_; bool is_in_tx_data_table_; - palf::SCN commit_scn_; + palf::SCN commit_version_; palf::SCN start_scn_; palf::SCN end_scn_; }; -// DONT : Modify this definition class ObTxData : public ObTxCommitData, public TxDataHashValue { friend TxDataHashMapAllocHandle; diff --git a/src/storage/tx/ob_tx_data_functor.cpp b/src/storage/tx/ob_tx_data_functor.cpp index 38026c749..9e183521f 100644 --- a/src/storage/tx/ob_tx_data_functor.cpp +++ b/src/storage/tx/ob_tx_data_functor.cpp @@ -20,6 +20,9 @@ namespace oceanbase { + +using namespace palf; + namespace storage { @@ -88,7 +91,7 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc // Case 1: data is committed, so the lock is locked by the data and we // also need return the commit version for tsc check lock_state_.is_locked_ = false; - lock_state_.trans_version_ = tx_data.commit_scn_; + lock_state_.trans_version_ = tx_data.commit_version_; break; } case ObTxData::RUNNING: { @@ -128,7 +131,7 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc } -int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) +int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx) { UNUSED(tx_cc_ctx); int ret = OB_SUCCESS; @@ -140,22 +143,22 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t // Case 1: data is during execution, so we return the running state with // INT64_MAX as version state_ = ObTxData::RUNNING; - trans_version_ = INT64_MAX; - } else if (log_ts_ < tx_data.end_scn_.get_val_for_lsn_allocator()) { + trans_version_ = SCN::max_scn(); + } else if (scn_ < tx_data.end_scn_) { // Case 2: data is decided while the required state is before the merge log // ts, so we return the running state with INT64_MAX as txn version state_ = ObTxData::RUNNING; - trans_version_ = INT64_MAX; + trans_version_ = SCN::max_scn(); } else if (ObTxData::COMMIT == tx_data.state_) { // Case 3: data is committed and the required state is after the merge log // ts, so we return the commit state with commit version as txn version state_ = ObTxData::COMMIT; - trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator(); + trans_version_ = tx_data.commit_version_; } else if (ObTxData::ABORT == tx_data.state_) { // Case 4: data is aborted and the required state is after the merge log // ts, so we return the abort state with 0 as txn version state_ = ObTxData::ABORT; - trans_version_ = 0; + trans_version_ = SCN::min_scn(); } else { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected transaction state_", K(ret), K(tx_data)); @@ -169,7 +172,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * { int ret = OB_SUCCESS; can_read_ = false; - trans_version_ = OB_INVALID_VERSION; + trans_version_ = SCN::invalid_scn(); is_determined_state_ = false; auto &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_; auto snapshot_version = snapshot.version_; @@ -186,13 +189,13 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * // depends on whether undo status contains the data. Then we return the commit // version as data version. can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator(); + trans_version_ = tx_data.commit_version_; is_determined_state_ = true; break; } case ObTxData::ELR_COMMIT: { can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator(); + trans_version_ = tx_data.commit_version_; is_determined_state_ = false; break; } @@ -201,7 +204,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * if (read_latest && reader_tx_id == data_tx_id) { // Case 2.0: read the latest written of current txn can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence); - trans_version_ = 0; + trans_version_ = SCN::min_scn(); } else if (snapshot_tx_id == data_tx_id) { // Case 2.1: data is owned by the read txn bool tmp_can_read = false; @@ -219,7 +222,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * can_read_ = tmp_can_read && !tx_data.undo_status_list_.is_contain(data_sql_sequence); // Tip 2.1.2: trans version is unnecessary for the running txn - trans_version_ = 0; + trans_version_ = SCN::min_scn(); } else { // Case 2.2: data is not owned by the read txn // NB: we need pay attention to the choice condition when issuing the @@ -234,7 +237,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * // snapshot version, so we cannot read it and trans version is // unnecessary for the running txn can_read_ = false; - trans_version_ = 0; + trans_version_ = SCN::min_scn(); } else if (tx_cc_ctx->prepare_version_ > snapshot_version.get_val_for_lsn_allocator()) { // Case 2.2.2: data is at least in prepare state and the prepare // version is bigger than the read txn's snapshot version, then the @@ -242,7 +245,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * // version, so we cannot read it and trans version is unnecessary for // the running txn can_read_ = false; - trans_version_ = 0; + trans_version_ = SCN::min_scn(); } else { // Case 2.2.3: data is in prepare state and the prepare version is // smaller than the read txn's snapshot version, then the data's @@ -264,7 +267,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx * // Case 3: data is aborted, so the state is decided, then we can not read // the data and the trans version is unnecessary for the aborted txn can_read_ = false; - trans_version_ = 0; + trans_version_ = SCN::min_scn(); is_determined_state_ = true; break; } @@ -331,7 +334,7 @@ bool ObReCheckTxNodeForLockForReadOperation::operator()() if (tnode_.is_aborted()) { can_read_ = false; - trans_version_ = 0; + trans_version_ = SCN::min_scn(); is_determined_state_ = true; ret = true; } @@ -381,9 +384,9 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx } } else if (ObTxData::COMMIT == tx_data.state_) { // Case 4: data is committed, so we should write back the commit state - if (OB_FAIL(value_.trans_commit(tx_data.commit_scn_, tnode_))) { + if (OB_FAIL(value_.trans_commit(tx_data.commit_version_, tnode_))) { TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_)); - } else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_scn_, tx_data.end_scn_))) { + } else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_version_, tx_data.end_scn_))) { } else if (blocksstable::ObDmlFlag::DF_LOCK == tnode_.get_dml_flag() && OB_FAIL(value_.unlink_trans_node(tnode_))) { TRANS_LOG(WARN, "unlink lock node failed", K(ret), K(value_), K(tnode_)); diff --git a/src/storage/tx/ob_tx_data_functor.h b/src/storage/tx/ob_tx_data_functor.h index dba6e523e..6164f7c3b 100644 --- a/src/storage/tx/ob_tx_data_functor.h +++ b/src/storage/tx/ob_tx_data_functor.h @@ -33,7 +33,7 @@ class ObReCheckTxNodeForLockForReadOperation public: ObReCheckTxNodeForLockForReadOperation(memtable::ObMvccTransNode &tnode, bool &can_read, - int64_t &trans_version, + palf::SCN &trans_version, bool &is_determined_state) : tnode_(tnode), can_read_(can_read), @@ -45,7 +45,7 @@ private: memtable::ObMvccTransNode &tnode_; bool &can_read_; bool &is_determined_state_; - int64_t &trans_version_; + palf::SCN &trans_version_; }; class ObReCheckNothingOperation @@ -127,24 +127,24 @@ public: ObStoreRowLockState &lock_state_; }; -// fetch the state of txn DATA_TRANS_ID when replaying to LOG_TS +// fetch the state of txn DATA_TRANS_ID when replaying to SCN // the requirement can be seen from https://yuque.antfin-inc.com/ob/storage/adk6yx // return the txn state and commit version if committed, INT64_MAX if running // and 0 if rollbacked when replaying to LOG_ID -class GetTxStateWithLogTSFunctor : public ObITxDataCheckFunctor +class GetTxStateWithSCNFunctor : public ObITxDataCheckFunctor { public: - GetTxStateWithLogTSFunctor(const int64_t log_ts, + GetTxStateWithSCNFunctor(const palf::SCN scn, int64_t &state, - int64_t &trans_version) - : log_ts_(log_ts), state_(state), trans_version_(trans_version) {} - virtual ~GetTxStateWithLogTSFunctor() {} + palf::SCN &trans_version) + : scn_(scn), state_(state), trans_version_(trans_version) {} + virtual ~GetTxStateWithSCNFunctor() {} virtual int operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx = nullptr) override; - TO_STRING_KV(K(log_ts_), K(state_), K(trans_version_)); + TO_STRING_KV(K(scn_), K(state_), K(trans_version_)); public: - const int64_t log_ts_; + const palf::SCN scn_; int64_t &state_; - int64_t &trans_version_; + palf::SCN &trans_version_; }; // the txn READ_TRANS_ID use SNAPSHOT_VERSION to read the data, @@ -158,7 +158,7 @@ class LockForReadFunctor : public ObITxDataCheckFunctor public: LockForReadFunctor(const transaction::ObLockForReadArg &lock_for_read_arg, bool &can_read, - int64_t &trans_version, + palf::SCN &trans_version, bool &is_determined_state, const ObCleanoutOp &cleanout_op = ObCleanoutNothingOperation(), const ObReCheckOp &recheck_op = ObReCheckNothingOperation()) @@ -180,7 +180,7 @@ public: const transaction::ObLockForReadArg &lock_for_read_arg_; bool &can_read_; bool &is_determined_state_; - int64_t &trans_version_; + palf::SCN &trans_version_; // Cleanout the tx node if necessary ObCleanoutOp cleanout_op_; // ReCheck whether tx node is valid. diff --git a/src/storage/tx_table/ob_tx_ctx_memtable.cpp b/src/storage/tx_table/ob_tx_ctx_memtable.cpp index f117cfe42..b62f43896 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable.cpp +++ b/src/storage/tx_table/ob_tx_ctx_memtable.cpp @@ -19,6 +19,8 @@ namespace oceanbase { using namespace share; +using namespace palf; + namespace storage { @@ -225,9 +227,9 @@ palf::SCN ObTxCtxMemtable::get_rec_scn() palf::SCN rec_scn; if (OB_FAIL(get_ls_tx_ctx_mgr()->get_rec_scn(rec_scn))) { - TRANS_LOG(WARN, "get rec log ts failed", K(ret)); + TRANS_LOG(WARN, "get rec scn failed", K(ret)); } else { - TRANS_LOG(INFO, "tx ctx memtable get rec log ts", KPC(this), K(rec_scn)); + TRANS_LOG(INFO, "tx ctx memtable get rec scn", KPC(this), K(rec_scn)); } return rec_scn; @@ -265,7 +267,7 @@ int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze) ObSpinLockGuard guard(flush_lock_); if (need_freeze) { - palf::SCN rec_scn = get_rec_scn(); + SCN rec_scn = get_rec_scn(); if (rec_scn >= recycle_scn) { TRANS_LOG(INFO, "no need to freeze", K(rec_scn), K(recycle_scn)); } else if (is_active_memtable()) { diff --git a/src/storage/tx_table/ob_tx_ctx_memtable.h b/src/storage/tx_table/ob_tx_ctx_memtable.h index a91478093..7f2d9eaf1 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable.h +++ b/src/storage/tx_table/ob_tx_ctx_memtable.h @@ -70,7 +70,6 @@ public: virtual int flush(palf::SCN recycle_scn, bool need_freeze = true); virtual ObTabletID get_tablet_id() const override; - virtual bool is_flushing() const override; // ================ NOT SUPPORTED INTERFACE =============== diff --git a/src/storage/tx_table/ob_tx_ctx_memtable_mgr.cpp b/src/storage/tx_table/ob_tx_ctx_memtable_mgr.cpp index c54904877..40b4209b8 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable_mgr.cpp +++ b/src/storage/tx_table/ob_tx_ctx_memtable_mgr.cpp @@ -21,6 +21,7 @@ namespace oceanbase { using namespace share; +using namespace palf; namespace storage { @@ -65,11 +66,11 @@ void ObTxCtxMemtableMgr::reset() is_inited_ = false; } -int ObTxCtxMemtableMgr::create_memtable(const int64_t last_replay_log_ts, +int ObTxCtxMemtableMgr::create_memtable(const SCN last_replay_scn, const int64_t schema_version, const bool for_replay) { - UNUSED(last_replay_log_ts); + UNUSED(last_replay_scn); UNUSED(schema_version); UNUSED(for_replay); diff --git a/src/storage/tx_table/ob_tx_ctx_memtable_mgr.h b/src/storage/tx_table/ob_tx_ctx_memtable_mgr.h index b9982a419..e058f3da6 100644 --- a/src/storage/tx_table/ob_tx_ctx_memtable_mgr.h +++ b/src/storage/tx_table/ob_tx_ctx_memtable_mgr.h @@ -58,7 +58,7 @@ public: virtual void destroy() override; // create_memtable is used for creating the only memtable for CheckpointMgr - virtual int create_memtable(const int64_t last_replay_log_ts, + virtual int create_memtable(const palf::SCN last_replay_scn, const int64_t schema_version, const bool for_replay=false) override; diff --git a/src/storage/tx_table/ob_tx_data_memtable.cpp b/src/storage/tx_table/ob_tx_data_memtable.cpp index 5df2bbd6d..aff167904 100644 --- a/src/storage/tx_table/ob_tx_data_memtable.cpp +++ b/src/storage/tx_table/ob_tx_data_memtable.cpp @@ -418,7 +418,7 @@ bool ObTxDataMemtable::ready_for_flush() bool_ret = true; STORAGE_LOG(INFO, "memtable is frozen yet.", KP(this)); } else if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) { - STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id())); + STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer_->get_ls_id())); } else if (max_consequent_callbacked_scn >= key_.scn_range_.end_scn_) { state_ = ObTxDataMemtable::State::FROZEN; set_snapshot_version(min_tx_scn_); @@ -463,7 +463,7 @@ int ObTxDataMemtable::do_sort_by_tx_id_() int64_t get_start_ts_(const ObTxData &tx_data) { - return tx_data.start_scn_.get_val_for_lsn_allocator(); + return tx_data.start_scn_.get_val_for_tx(); } int ObTxDataMemtable::do_sort_by_start_scn_() diff --git a/src/storage/tx_table/ob_tx_data_memtable.h b/src/storage/tx_table/ob_tx_data_memtable.h index 93c21f362..a8b1a44a5 100644 --- a/src/storage/tx_table/ob_tx_data_memtable.h +++ b/src/storage/tx_table/ob_tx_data_memtable.h @@ -239,7 +239,6 @@ public: /* derived from ObIMemtable */ const blocksstable::ObDatumRowkey &rowkey) override; public: // checkpoint - int64_t get_rec_log_ts(); palf::SCN get_rec_scn(); // int freeze(); @@ -277,7 +276,6 @@ public: // getter && setter void reset_is_iterating() { ATOMIC_STORE(&is_iterating_, false); } - // FIXME : @gengli remove palf::SCN get_end_scn() { return key_.scn_range_.end_scn_;} @@ -309,13 +307,13 @@ private: // ObTxDataMemtable bool is_iterating_; bool has_constructed_list_; - // the minimum log ts of commit_scn in this tx data memtable + // the minimum scn of commit_version in this tx data memtable palf::SCN min_tx_scn_; - // the maximum log ts in this tx data memtable + // the maximum scn in this tx data memtable palf::SCN max_tx_scn_; - // the minimum start log ts in this tx data memtable + // the minimum start scn in this tx data memtable palf::SCN min_start_scn_; int64_t inserted_cnt_; @@ -380,15 +378,15 @@ public: UNUSED(key); // printf basic info fprintf(fd_, - "ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19ld " - "end_scn=%-19ld " - "commit_scn=%-19ld ", + "ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19s " + "end_scn=%-19s " + "commit_version=%-19s ", tx_data->tx_id_.get_id(), tx_data->is_in_tx_data_table_, ObTxData::get_state_string(tx_data->state_), - tx_data->start_scn_.get_val_for_lsn_allocator(), - tx_data->end_scn_.get_val_for_lsn_allocator(), - tx_data->commit_scn_.get_val_for_lsn_allocator()); + to_cstring(tx_data->start_scn_), + to_cstring(tx_data->end_scn_), + to_cstring(tx_data->commit_version_)); // printf undo status list fprintf(fd_, "Undo Actions : {"); @@ -409,17 +407,6 @@ private: FILE *fd_; }; - -OB_INLINE int64_t ObTxDataMemtable::get_rec_log_ts() -{ - // TODO : @gengli - // rec_scn changes constantly. The rec_scn obtained by checkpoint mgr - // may be greater than the actual checkpoint of tx_data_memtable because the - // callback functions are not sequential. The checkpoint is determined both on - // the max-sequential callback point of the log and the rec_scn. - return min_tx_scn_.get_val_for_lsn_allocator(); -} - OB_INLINE palf::SCN ObTxDataMemtable::get_rec_scn() { // TODO : @gengli diff --git a/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp b/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp index d83072d2f..90146c2af 100644 --- a/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp +++ b/src/storage/tx_table/ob_tx_data_memtable_mgr.cpp @@ -146,17 +146,6 @@ int ObTxDataMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn, return ret; } -int ObTxDataMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts, - const int64_t schema_version, - const bool for_replay) -{ - UNUSED(for_replay); - SCN clog_checkpoint_scn; - clog_checkpoint_scn.convert_for_lsn_allocator(clog_checkpoint_ts); - - return create_memtable(clog_checkpoint_scn, schema_version); -} - int ObTxDataMemtableMgr::create_memtable_(const palf::SCN clog_checkpoint_scn, int64_t schema_version) { UNUSED(schema_version); diff --git a/src/storage/tx_table/ob_tx_data_memtable_mgr.h b/src/storage/tx_table/ob_tx_data_memtable_mgr.h index faf4f1ae9..672008ca2 100644 --- a/src/storage/tx_table/ob_tx_data_memtable_mgr.h +++ b/src/storage/tx_table/ob_tx_data_memtable_mgr.h @@ -63,13 +63,10 @@ public: // ObTxDataMemtableMgr * @brief Using to create a new active tx data memtable * * @param[in] clog_checkpoint_ts clog_checkpoint_ts, using to init multiversion_start, - * base_version and start_log_ts. The start_log_ts will be modified if this function is called by + * base_version and start_scn. The start_scn will be modified if this function is called by * freeze(). * @param[in] schema_version schema_version, not used */ - virtual int create_memtable(const int64_t clog_checkpoint_ts, - const int64_t schema_version, - const bool for_replay=false) override; virtual int create_memtable(const palf::SCN clog_checkpoint_scn, const int64_t schema_version, const bool for_replay=false) override; diff --git a/src/storage/tx_table/ob_tx_data_table.cpp b/src/storage/tx_table/ob_tx_data_table.cpp index 9e5fe0a06..532aeeaed 100644 --- a/src/storage/tx_table/ob_tx_data_table.cpp +++ b/src/storage/tx_table/ob_tx_data_table.cpp @@ -179,7 +179,7 @@ void ObTxDataTable::stop() void ObTxDataTable::reset() { min_start_scn_in_ctx_.reset(); - last_update_min_start_scn_ts_ = 0; + last_update_ts_ = 0; tablet_id_ = 0; ls_ = nullptr; ls_tablet_svr_ = nullptr; @@ -383,7 +383,7 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write tg.click(); // If this tx data can not be inserted into all memtables, check if it should be filtered. - // We use the start log ts of the first memtable as the filtering time stamp + // We use the start scn of the first memtable as the filtering time stamp if (OB_SUCC(ret) && OB_NOT_NULL(tx_data) && OB_NOT_NULL(tx_data_memtable)) { SCN clog_checkpoint_scn = tx_data_memtable->get_key().get_start_scn(); if (tx_data->end_scn_ <= clog_checkpoint_scn) { @@ -407,7 +407,7 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write return ret; } -// A tx data with lesser end_log_ts may be inserted after a tx data with greater end_log_ts due to +// A tx data with lesser end_scn may be inserted after a tx data with greater end_scn due to // the out-of-order callback of log. This function will retain the newer tx data and delete the // older one. int ObTxDataTable::insert_into_memtable_(ObTxDataMemtable *tx_data_memtable, ObTxData *&tx_data) @@ -438,7 +438,7 @@ int ObTxDataTable::insert_into_memtable_(ObTxDataMemtable *tx_data_memtable, ObT // successfully remove } } else { - // The end_log_ts of tx data in memtable is greater than or equal to the end_log_ts of current + // The end_scn of tx data in memtable is greater than or equal to the end_scn of current // tx data, which means the tx data waiting to insert is an older one. So we set need_insert = // false to skip inserting this tx data need_insert = false; @@ -735,7 +735,7 @@ int ObTxDataTable::get_ls_min_end_scn_in_latest_tablets_(SCN &min_end_scn) while (OB_SUCC(iterator.get_next_tablet(tablet_handle))) { SCN end_scn_from_single_tablet = SCN::max_scn(); if (OB_FAIL(get_min_end_scn_from_single_tablet_(tablet_handle, end_scn_from_single_tablet))) { - STORAGE_LOG(WARN, "get min end_log_ts from a single tablet failed.", KR(ret)); + STORAGE_LOG(WARN, "get min end_scn from a single tablet failed.", KR(ret)); } else if (end_scn_from_single_tablet < min_end_scn) { min_end_scn = end_scn_from_single_tablet; } @@ -792,27 +792,13 @@ int ObTxDataTable::self_freeze_task() return ret; } -// The main steps in calculating upper_trans_scn. For more details, see : +// The main steps in calculating upper_trans_version. For more details, see : // https://yuque.antfin-inc.com/ob/transaction/lurtok -int ObTxDataTable::get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, - int64_t &upper_trans_version) -{ - int ret = OB_SUCCESS; - SCN sstable_end_scn; - sstable_end_scn.convert_for_lsn_allocator(sstable_end_log_ts); - SCN tmp_upper_trans_version; - if (OB_FAIL(get_upper_trans_scn_before_given_scn(sstable_end_scn, tmp_upper_trans_version))) { - } else { - upper_trans_version = tmp_upper_trans_version.get_val_for_lsn_allocator(); - } - return ret; -} - -int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_scn) +int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_version) { int ret = OB_SUCCESS; bool skip_calc = false; - upper_trans_scn.set_max(); + upper_trans_version.set_max(); STORAGE_LOG(DEBUG, "start get upper trans version", K(get_ls_id())); @@ -820,9 +806,9 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc ret = OB_NOT_INIT; STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret)); } else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) { - // there is a start_log_ts of running transactions is smaller than the sstable_end_log_ts + // there is a start_scn of running transactions is smaller than the sstable_end_scn } else { - TCWLockGuard lock_guard(calc_upper_trans_scn_cache_.lock_); + TCWLockGuard lock_guard(calc_upper_trans_version_cache_.lock_); if (OB_FAIL(update_cache_if_needed_(skip_calc))) { STORAGE_LOG(WARN, "update cache failed.", KR(ret)); } @@ -830,14 +816,14 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc if (OB_FAIL(ret)) { } else if (skip_calc) { - } else if (0 == calc_upper_trans_scn_cache_.commit_scns_.array_.count()) { - STORAGE_LOG(ERROR, "Unexpected empty array.", K(calc_upper_trans_scn_cache_)); + } else if (0 == calc_upper_trans_version_cache_.commit_scns_.array_.count()) { + STORAGE_LOG(ERROR, "Unexpected empty array.", K(calc_upper_trans_version_cache_)); } else { - TCRLockGuard lock_guard(calc_upper_trans_scn_cache_.lock_); - if (!calc_upper_trans_scn_cache_.commit_scns_.is_valid()) { + TCRLockGuard lock_guard(calc_upper_trans_version_cache_.lock_); + if (!calc_upper_trans_version_cache_.commit_scns_.is_valid()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "invalid cache for upper trans version calculation", KR(ret)); - } else if (OB_FAIL(calc_upper_trans_scn_(sstable_end_scn, upper_trans_scn))) { + } else if (OB_FAIL(calc_upper_trans_scn_(sstable_end_scn, upper_trans_version))) { STORAGE_LOG(WARN, "calc upper trans version failed", KR(ret), "ls_id", get_ls_id()); } else { FLOG_INFO("get upper trans version finish.", @@ -845,7 +831,7 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc K(skip_calc), "ls_id", get_ls_id(), K(sstable_end_scn), - K(upper_trans_scn)); + K(upper_trans_version)); } } return ret; @@ -855,20 +841,20 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc // calculation. // // 1. Accuracy : -// If there are some running transactions with start_log_ts which is less than or equal to -// sstable_end_log_ts, we skip this upper_trans_version calculation because it is currently +// If there are some running transactions with start_scn which is less than or equal to +// sstable_end_scn, we skip this upper_trans_version calculation because it is currently // undetermined. // // 2. Performance : -// If there are some commited transactions with start_log_ts which is less than or equal to -// sstable_end_log_ts stil in tx data memtable, we skip this upper_trans_version calculation because +// If there are some commited transactions with start_scn which is less than or equal to +// sstable_end_scn stil in tx data memtable, we skip this upper_trans_version calculation because // calculating upper_trans_version in tx data memtable is very slow. bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn) { int ret = OB_SUCCESS; bool need_skip = false; int64_t cur_ts = common::ObTimeUtility::fast_current_time(); - int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_min_start_scn_ts_); + int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_ts_); SCN min_start_scn_in_tx_data_memtable = SCN::max_scn(); SCN max_decided_scn = SCN::min_scn(); @@ -880,13 +866,13 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn) STORAGE_LOG(INFO, "skip calc upper trans version once", K(max_decided_scn), K(sstable_end_scn)); } - // If the min_start_log_ts_in_ctx has not been updated for more than 30 seconds, + // If the min_start_scn_in_ctx has not been updated for more than 30 seconds, if (need_skip) { } else if (cur_ts - tmp_update_ts > 30_s - && tmp_update_ts == ATOMIC_CAS(&last_update_min_start_scn_ts_, tmp_update_ts, cur_ts)) { - // update last_update_min_start_log_ts + && tmp_update_ts == ATOMIC_CAS(&last_update_ts_, tmp_update_ts, cur_ts)) { + // update last_update_min_start_scn if (OB_FAIL(tx_ctx_table_->get_min_start_scn(min_start_scn_in_ctx_))) { - STORAGE_LOG(WARN, "get min start log ts from tx ctx table failed.", KR(ret)); + STORAGE_LOG(WARN, "get min start scn from tx ctx table failed.", KR(ret)); } } @@ -894,7 +880,7 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn) } else if (OB_FAIL(ret)) { need_skip = true; } else if (min_start_scn_in_ctx_ <= sstable_end_scn) { - // skip this sstable end log ts calculation + // skip this sstable end scn calculation need_skip = true; } else if (OB_FAIL(update_memtables_cache())) { STORAGE_LOG(WARN, "update memtables fail.", KR(ret)); @@ -918,7 +904,7 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn) = std::min(min_start_scn_in_tx_data_memtable, tx_data_memtable->get_min_start_scn()))) { } else if (sstable_end_scn >= min_start_scn_in_tx_data_memtable) { - // there is a min_start_log_ts in tx_data_memtable less than sstable_end_log_ts, skip this + // there is a min_start_scn in tx_data_memtable less than sstable_end_scn, skip this // calculation need_skip = true; break; @@ -949,13 +935,13 @@ int ObTxDataTable::update_cache_if_needed_(bool &skip_calc) if (nullptr == table) { skip_calc = true; - } else if (!calc_upper_trans_scn_cache_.is_inited_ || - calc_upper_trans_scn_cache_.cache_version_scn_.get_val_for_lsn_allocator() < table->get_end_log_ts()) { - ret = update_calc_upper_trans_scn_cache_(table); - } else if (calc_upper_trans_scn_cache_.cache_version_scn_.get_val_for_lsn_allocator() > table->get_end_log_ts()) { + } else if (!calc_upper_trans_version_cache_.is_inited_ || + calc_upper_trans_version_cache_.cache_version_ < table->get_end_scn()) { + ret = update_calc_upper_trans_version_cache_(table); + } else if (calc_upper_trans_version_cache_.cache_version_ > table->get_end_scn()) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, - "the end log ts of the latest sstable is unexpected smaller", + "the end scn of the latest sstable is unexpected smaller", KR(ret), KPC(tablet_handle.get_obj()), KPC(table)); @@ -965,7 +951,7 @@ int ObTxDataTable::update_cache_if_needed_(bool &skip_calc) return ret; } -int ObTxDataTable::update_calc_upper_trans_scn_cache_(ObITable *table) +int ObTxDataTable::update_calc_upper_trans_version_cache_(ObITable *table) { int ret = OB_SUCCESS; STORAGE_LOG(DEBUG, "update calc upper trans version cache once."); @@ -982,26 +968,26 @@ int ObTxDataTable::update_calc_upper_trans_scn_cache_(ObITable *table) LOG_WARN("invalid tablet handle", KR(ret), K(tablet_handle), K(tablet_id_)); } else { ObCommitVersionsGetter getter(iter_param, table); - if (OB_FAIL(getter.get_next_row(calc_upper_trans_scn_cache_.commit_scns_))) { + if (OB_FAIL(getter.get_next_row(calc_upper_trans_version_cache_.commit_scns_))) { STORAGE_LOG(WARN, "update calc_upper_trans_trans_version_cache failed.", KR(ret), KPC(table)); } else { - calc_upper_trans_scn_cache_.is_inited_ = true; - calc_upper_trans_scn_cache_.cache_version_scn_.convert_for_lsn_allocator(table->get_end_log_ts()); + calc_upper_trans_version_cache_.is_inited_ = true; + calc_upper_trans_version_cache_.cache_version_ = table->get_end_scn(); // update calc_upper_trans_scn_cache succeed. } } return ret; } -int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_trans_scn) +int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_trans_version) { int ret = OB_SUCCESS; - const auto &array = calc_upper_trans_scn_cache_.commit_scns_.array_; + const auto &array = calc_upper_trans_version_cache_.commit_scns_.array_; int l = 0; int r = array.count() - 1; - // Binary find the first start_log_ts that is greater than or equal to sstable_end_log_ts + // Binary find the first start_scn that is greater than or equal to sstable_end_scn while (l < r) { int mid = (l + r) >> 1; if (array.at(mid).start_scn_ < sstable_end_scn) { @@ -1011,21 +997,21 @@ int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_t } } - // Check if the start_log_ts is greater than or equal to the sstable_end_log_ts. If not, delay the + // Check if the start_scn is greater than or equal to the sstable_end_scn. If not, delay the // upper_trans_version calculation to the next time. - if (0 == array.count() || !array.at(l).commit_scn_.is_valid()) { - upper_trans_scn.set_max(); + if (0 == array.count() || !array.at(l).commit_version_.is_valid()) { + upper_trans_version.set_max(); ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected array count or commit version", KR(ret), K(array.count()), K(array.at(l))); } else { - upper_trans_scn = array.at(l).commit_scn_; + upper_trans_version = array.at(l).commit_version_; } STORAGE_LOG(INFO, "calculate upper trans version finish", K(sstable_end_scn), - K(upper_trans_scn), - K(calc_upper_trans_scn_cache_), + K(upper_trans_version), + K(calc_upper_trans_version_cache_), "ls_id", get_ls_id(), "array_count", array.count(), "chose_idx", l); diff --git a/src/storage/tx_table/ob_tx_data_table.h b/src/storage/tx_table/ob_tx_data_table.h index 4dca16239..10276a20f 100644 --- a/src/storage/tx_table/ob_tx_data_table.h +++ b/src/storage/tx_table/ob_tx_data_table.h @@ -105,7 +105,7 @@ public: // ObTxDataTable : is_inited_(false), is_started_(false), min_start_scn_in_ctx_(), - last_update_min_start_scn_ts_(0), + last_update_ts_(0), tablet_id_(0), mem_attr_(), slice_allocator_(), @@ -186,11 +186,9 @@ public: // ObTxDataTable int get_recycle_scn(palf::SCN &recycle_scn); /** - * @brief see ObTxTable::get_upper_trans_version_before_given_log_ts() + * @brief see ObTxTable::get_upper_trans_version_before_given_scn() */ - int get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, - int64_t &upper_trans_version); - int get_upper_trans_scn_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_scn); + int get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version); /** * @brief see ObTxTable::supplement_undo_actions_if_exist @@ -216,7 +214,7 @@ public: // ObTxDataTable K_(is_inited), K_(is_started), K_(min_start_scn_in_ctx), - K_(last_update_min_start_scn_ts), + K_(last_update_ts), K_(tablet_id), KP_(ls), KP_(ls_tablet_svr), @@ -258,7 +256,7 @@ private: int update_cache_if_needed_(bool &skip_calc); - int update_calc_upper_trans_scn_cache_(ObITable *table); + int update_calc_upper_trans_version_cache_(ObITable *table); int calc_upper_trans_scn_(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version); @@ -274,15 +272,6 @@ private: int dump_tx_data_in_memtable_2_text_(const transaction::ObTransID tx_id, FILE *fd); int dump_tx_data_in_sstable_2_text_(const transaction::ObTransID tx_id, FILE *fd); - // int DEBUG_slowly_calc_upper_trans_scn_(const int64_t sstable_end_log_ts, - // int64_t &tmp_upper_trans_version); - - // int DEBUG_calc_with_all_sstables_(ObTableAccessContext &access_context, - // const int64_t sstable_end_log_ts, - // int64_t &tmp_upper_trans_version); - // int DEBUG_calc_with_row_iter_(ObStoreRowIterator *row_iter, - // const int64_t sstable_end_log_ts, - // int64_t &tmp_upper_trans_version); bool skip_this_sstable_end_scn_(palf::SCN sstable_end_scn); void print_alloc_size_for_test_(); @@ -316,7 +305,7 @@ private: bool is_inited_; bool is_started_; palf::SCN min_start_scn_in_ctx_; - int64_t last_update_min_start_scn_ts_; + int64_t last_update_ts_; ObTabletID tablet_id_; ObMemAttr mem_attr_; // Allocator to allocate ObTxData and ObUndoStatus @@ -329,7 +318,7 @@ private: ObTxDataMemtableMgr *memtable_mgr_; ObTxCtxTable *tx_ctx_table_; TxDataReadSchema read_schema_; - CalcUpperTransSCNCache calc_upper_trans_scn_cache_; + CalcUpperTransSCNCache calc_upper_trans_version_cache_; MemtableHandlesCache memtables_cache_; }; // tx_table diff --git a/src/storage/tx_table/ob_tx_table.cpp b/src/storage/tx_table/ob_tx_table.cpp index e8be9ebfd..a92aea027 100644 --- a/src/storage/tx_table/ob_tx_table.cpp +++ b/src/storage/tx_table/ob_tx_table.cpp @@ -52,7 +52,7 @@ int ObTxTable::init(ObLS *ls) } else { ls_ = ls; epoch_ = 0; - max_tablet_clog_checkpoint_ = 0; + max_tablet_clog_checkpoint_ = SCN::min_scn(); state_ = TxTableState::ONLINE; LOG_INFO("init tx table successfully", K(ret), K(ls->get_ls_id())); is_inited_ = true; @@ -170,8 +170,8 @@ int ObTxTable::offline() int ObTxTable::prepare_online() { int ret = OB_SUCCESS; - int64_t tmp_max_tablet_clog_checkpiont = -1; - max_tablet_clog_checkpoint_ = INT64_MAX; + SCN tmp_max_tablet_clog_checkpiont = SCN::min_scn(); + max_tablet_clog_checkpoint_ = SCN::invalid_scn(); ATOMIC_INC(&epoch_); @@ -197,23 +197,23 @@ int ObTxTable::prepare_online() int ObTxTable::check_and_online() { int ret = OB_SUCCESS; - int64_t max_consequent_callbacked_log_ts = 0; + SCN max_consequent_callbacked_scn = SCN::min_scn(); - if (OB_FAIL(ls_->get_max_decided_log_ts_ns(max_consequent_callbacked_log_ts))) { - LOG_WARN("get max decided log ts from ls failed", KR(ret), "ls_id", ls_->get_ls_id()); - } else if (max_consequent_callbacked_log_ts >= max_tablet_clog_checkpoint_) { + if (OB_FAIL(ls_->get_max_decided_scn(max_consequent_callbacked_scn))) { + LOG_WARN("get max decided scn from ls failed", KR(ret), "ls_id", ls_->get_ls_id()); + } else if (max_consequent_callbacked_scn >= max_tablet_clog_checkpoint_) { ATOMIC_STORE(&state_, TxTableState::ONLINE); LOG_INFO("tx table online finish", "ls_id", ls_->get_ls_id(), - K(max_consequent_callbacked_log_ts), + K(max_consequent_callbacked_scn), K(max_tablet_clog_checkpoint_)); } else { int64_t current_ts = ObTimeUtil::current_time(); int64_t time_after_prepare_online_ms = (current_ts - prepare_online_ts_) / 1000LL; LOG_INFO("tx table is PREPARE_ONLINE but not ONLINE yet", "ls_id", ls_->get_ls_id(), - K(max_consequent_callbacked_log_ts), + K(max_consequent_callbacked_scn), K(max_tablet_clog_checkpoint_), K(time_after_prepare_online_ms), KTIME(current_ts), @@ -425,7 +425,7 @@ int ObTxTable::get_data_table_schema_(const uint64_t tenant_id, const char *const TX_ID_NAME = "tx_id"; const char *const IDX_NAME = "idx"; const char *const TOTAL_ROW_CNT_NAME = "total_row_cnt"; - const char *const END_TS_NAME = "end_log_ts"; + const char *const END_SCN_NAME = "end_scn"; const char *const VALUE_NAME = "tx_info"; const char *const TABLE_NAME = "tx_data_table"; const int64_t SCHEMA_VERSION = 1; @@ -497,8 +497,8 @@ int ObTxTable::get_data_table_schema_(const uint64_t tenant_id, LOG_WARN("failed to set column name", KR(ret), K(IDX_NAME)); } else if (OB_FAIL(total_row_cnt_column.set_column_name(TOTAL_ROW_CNT_NAME))) { LOG_WARN("failed to set column name", KR(ret), K(TOTAL_ROW_CNT_NAME)); - } else if (OB_FAIL(end_ts_column.set_column_name(END_TS_NAME))) { - LOG_WARN("failed to set column name", KR(ret), K(END_TS_NAME)); + } else if (OB_FAIL(end_ts_column.set_column_name(END_SCN_NAME))) { + LOG_WARN("failed to set column name", KR(ret), K(END_SCN_NAME)); } else if (OB_FAIL(value_column.set_column_name(VALUE_NAME))) { LOG_WARN("failed to set column name", KR(ret), K(VALUE_NAME)); } else if (OB_FAIL(schema.set_table_name(TABLE_NAME))) { @@ -648,7 +648,7 @@ int ObTxTable::load_tx_data_table_() return ret; } -int ObTxTable::get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoint) +int ObTxTable::get_max_tablet_clog_checkpoint_(SCN &max_tablet_clog_checkpoint) { int ret = OB_SUCCESS; ObLSTabletIterator tablet_iter(ObTabletCommon::DIRECT_GET_COMMITTED_TABLET_TIMEOUT_US); @@ -669,7 +669,7 @@ int ObTxTable::get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoi } else if (tablet_handle.get_obj()->get_tablet_meta().tablet_id_.is_inner_tablet()) { // skip inner tablet } else { - int64_t tmp_clog_checkpoint = tablet_handle.get_obj()->get_clog_checkpoint_ts(); + SCN tmp_clog_checkpoint = tablet_handle.get_obj()->get_clog_checkpoint_scn(); if (tmp_clog_checkpoint > max_tablet_clog_checkpoint) { max_tablet_clog_checkpoint = tmp_clog_checkpoint; } @@ -951,26 +951,26 @@ int ObTxTable::check_sql_sequence_can_read(const transaction::ObTransID &data_tr return ret; } -int ObTxTable::get_tx_state_with_log_ts(const transaction::ObTransID &data_trans_id, - const int64_t log_ts, +int ObTxTable::get_tx_state_with_scn(const transaction::ObTransID &data_trans_id, + const SCN scn, const int64_t read_epoch, int64_t &state, - int64_t &trans_version) + SCN &trans_version) { - GetTxStateWithLogTSFunctor fn(log_ts, state, trans_version); + GetTxStateWithSCNFunctor fn(scn, state, trans_version); int ret = check_with_tx_data(data_trans_id, fn, read_epoch); // TODO(handora.qc): remove it - LOG_DEBUG("finish get tx state with log ts", K(data_trans_id), K(log_ts), K(state), K(trans_version)); + LOG_DEBUG("finish get tx state with scn", K(data_trans_id), K(scn), K(state), K(trans_version)); return ret; } int ObTxTable::try_get_tx_state(const transaction::ObTransID tx_id, const int64_t read_epoch, int64_t &state, - int64_t &trans_version) + SCN &trans_version) { int ret = OB_SUCCESS; - GetTxStateWithLogTSFunctor fn(INT64_MAX, state, trans_version); + GetTxStateWithSCNFunctor fn(SCN::max_scn(), state, trans_version); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tx table is not init.", KR(ret), K(tx_id)); @@ -988,7 +988,7 @@ int ObTxTable::try_get_tx_state(const transaction::ObTransID tx_id, int ObTxTable::lock_for_read(const transaction::ObLockForReadArg &lock_for_read_arg, const int64_t read_epoch, bool &can_read, - int64_t &trans_version, + SCN &trans_version, bool &is_determined_state, const ObCleanoutOp &cleanout_op, const ObReCheckOp &recheck_op) @@ -1017,21 +1017,9 @@ int ObTxTable::get_recycle_scn(SCN &recycle_scn) return ret; } -int ObTxTable::get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, int64_t &upper_trans_version) +int ObTxTable::get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version) { - return tx_data_table_.get_upper_trans_version_before_given_log_ts(sstable_end_log_ts, upper_trans_version); -} - -int ObTxTable::get_start_tx_log_ts(int64_t &start_tx_log_ts) -{ - int ret = OB_SUCCESS; - SCN tmp_start_scn; - if (OB_FAIL(get_start_tx_scn(tmp_start_scn))) { - STORAGE_LOG(WARN, "get start tx scn failed", KR(ret)); - } else { - start_tx_log_ts = tmp_start_scn.get_val_for_lsn_allocator(); - } - return ret; + return tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version); } int ObTxTable::get_start_tx_scn(SCN &start_tx_scn) diff --git a/src/storage/tx_table/ob_tx_table.h b/src/storage/tx_table/ob_tx_table.h index 6676863d0..7864439c4 100644 --- a/src/storage/tx_table/ob_tx_table.h +++ b/src/storage/tx_table/ob_tx_table.h @@ -54,15 +54,16 @@ public: ObTxTable() : is_inited_(false), epoch_(INVALID_READ_EPOCH), - max_tablet_clog_checkpoint_(0), + max_tablet_clog_checkpoint_(), prepare_online_ts_(0), state_(OFFLINE), ls_(nullptr), tx_data_table_(default_tx_data_table_) {} + ObTxTable(ObTxDataTable &tx_data_table) : is_inited_(false), epoch_(INVALID_READ_EPOCH), - max_tablet_clog_checkpoint_(0), + max_tablet_clog_checkpoint_(), prepare_online_ts_(0), state_(OFFLINE), ls_(nullptr), @@ -144,16 +145,16 @@ public: * https://yuque.antfin-inc.com/ob/storage/adk6yx * * @param[in] data_trans_id - * @param[in] log_ts + * @param[in] scn * @param[in] read_epoch * @param[out] state * @param[out] trans_version */ - int get_tx_state_with_log_ts(const transaction::ObTransID &data_trans_id, - const int64_t log_ts, + int get_tx_state_with_scn(const transaction::ObTransID &data_trans_id, + const palf::SCN scn, const int64_t read_epoch, int64_t &state, - int64_t &trans_version); + palf::SCN &trans_version); /** * @brief Try to get a tx data from tx_data_table. This function used in special situation when the trans service do @@ -167,7 +168,7 @@ public: int try_get_tx_state(const transaction::ObTransID tx_id, const int64_t read_epoch, int64_t &state, - int64_t &trans_version); + palf::SCN &trans_version); /** * @brief the txn READ_TRANS_ID use SNAPSHOT_VERSION to read the data, and check whether the data is locked, readable or unreadable by txn DATA_TRANS_ID. READ_LATEST is used to check whether read the data belong to the same txn @@ -182,7 +183,7 @@ public: int lock_for_read(const transaction::ObLockForReadArg &lock_for_read_arg, const int64_t read_epoch, bool &can_read, - int64_t &trans_version, + palf::SCN &trans_version, bool &is_determined_state, const ObCleanoutOp &cleanout_op = ObCleanoutNothingOperation(), const ObReCheckOp &recheck_op = ObReCheckNothingOperation()); @@ -208,7 +209,7 @@ public: * @brief The tx data sstables need to be cleared periodically. This function returns a recycle_scn * to decide which tx data should be cleared. * - * @param[out] recycle_scn the tx data whose end_log_ts is smaller or equals to the recycle_scn can + * @param[out] recycle_scn the tx data whose end_scn is smaller or equals to the recycle_scn can * be cleared. */ int get_recycle_scn(palf::SCN &recycle_scn); @@ -222,12 +223,12 @@ public: } /** - * @brief Get the upper trans version for each given end_log_ts + * @brief Get the upper trans version for each given end_scn * - * @param[in] sstable_end_log_ts the end_log_ts of the data sstable which is waitting to get the upper_trans_version + * @param[in] sstable_end_scn the end_scn of the data sstable which is waitting to get the upper_trans_version * @param[out] upper_trans_version the upper_trans_version */ - int get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, int64_t &upper_trans_version); + int get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version); /** * @brief When a transaction is replayed in the middle, it will read tx data from tx data sstable @@ -248,7 +249,6 @@ public: * * @param[out] start_tx_scn */ - int get_start_tx_log_ts(int64_t &start_log_ts); int get_start_tx_scn(palf::SCN &start_tx_scn); int dump_single_tx_data_2_text(const int64_t tx_id_int, const char *fname); @@ -303,7 +303,7 @@ private: int load_tx_data_table_(); int offline_tx_ctx_table_(); int offline_tx_data_table_(); - int get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoint); + int get_max_tablet_clog_checkpoint_(palf::SCN &max_tablet_clog_checkpoint); void check_state_and_epoch_(const transaction::ObTransID tx_id, const int64_t read_epoch, @@ -316,7 +316,7 @@ private: static const int64_t LS_TX_CTX_SCHEMA_COLUMN_CNT = 3; bool is_inited_; int64_t epoch_ CACHE_ALIGNED; - int64_t max_tablet_clog_checkpoint_; + palf::SCN max_tablet_clog_checkpoint_; int64_t prepare_online_ts_; TxTableState state_ CACHE_ALIGNED; ObLS *ls_; diff --git a/src/storage/tx_table/ob_tx_table_define.cpp b/src/storage/tx_table/ob_tx_table_define.cpp index 46acab0b8..741548af1 100644 --- a/src/storage/tx_table/ob_tx_table_define.cpp +++ b/src/storage/tx_table/ob_tx_table_define.cpp @@ -275,7 +275,7 @@ DEF_TO_STRING(ObCommitSCNsArray::Node) { int64_t pos = 0; J_KV(K_(start_scn), - K_(commit_scn)); + K_(commit_version)); return pos; } @@ -353,7 +353,7 @@ int ObCommitSCNsArray::serialize_(char *buf, const int64_t buf_len, int64_t &pos { int ret = OB_SUCCESS; for (int i = 0; OB_SUCC(ret) && i < array_.count(); i++) { - LST_DO_CODE(OB_UNIS_ENCODE, array_.at(i).start_scn_, array_.at(i).commit_scn_); + LST_DO_CODE(OB_UNIS_ENCODE, array_.at(i).start_scn_, array_.at(i).commit_version_); } return ret; } @@ -364,7 +364,7 @@ int ObCommitSCNsArray::deserialize_(const char *buf, const int64_t data_len, int ObCommitSCNsArray::Node node; while (OB_SUCC(ret) && pos < data_len) { - LST_DO_CODE(OB_UNIS_DECODE, node.start_scn_, node.commit_scn_); + LST_DO_CODE(OB_UNIS_DECODE, node.start_scn_, node.commit_version_); array_.push_back(node); } @@ -375,7 +375,7 @@ int64_t ObCommitSCNsArray::get_serialize_size_() const { int64_t len = 0; for (int i = 0; i < array_.count(); i++) { - LST_DO_CODE(OB_UNIS_ADD_LEN, array_.at(i).start_scn_, array_.at(i).commit_scn_); + LST_DO_CODE(OB_UNIS_ADD_LEN, array_.at(i).start_scn_, array_.at(i).commit_version_); } return len; } @@ -385,9 +385,9 @@ bool ObCommitSCNsArray::is_valid() bool bool_ret = true; for (int i = 0; i < array_.count() - 1; i++) { if (!array_.at(i).start_scn_.is_valid() || - !array_.at(i).commit_scn_.is_valid() || + !array_.at(i).commit_version_.is_valid() || array_.at(i).start_scn_ > array_.at(i + 1).start_scn_ || - array_.at(i).start_scn_ > array_.at(i).commit_scn_) { + array_.at(i).start_scn_ > array_.at(i).commit_version_) { bool_ret = false; STORAGE_LOG(ERROR, "this commit version array is invalid", K(array_.at(i)), K(array_.at(i + 1))); diff --git a/src/storage/tx_table/ob_tx_table_define.h b/src/storage/tx_table/ob_tx_table_define.h index c46509865..b2ed98891 100644 --- a/src/storage/tx_table/ob_tx_table_define.h +++ b/src/storage/tx_table/ob_tx_table_define.h @@ -198,18 +198,18 @@ private: public: struct Node { palf::SCN start_scn_; - palf::SCN commit_scn_; + palf::SCN commit_version_; - Node() : start_scn_(), commit_scn_() {} + Node() : start_scn_(), commit_version_() {} - Node(const palf::SCN start_scn, const palf::SCN commit_scn) - : start_scn_(start_scn), commit_scn_(commit_scn) {} + Node(const palf::SCN start_scn, const palf::SCN commit_version) + : start_scn_(start_scn), commit_version_(commit_version) {} bool operator==(const Node &rhs) const { bool is_equal = true; if (this->start_scn_ != rhs.start_scn_ - || this->commit_scn_ != rhs.commit_scn_) { + || this->commit_version_ != rhs.commit_version_) { is_equal = false; } return is_equal; @@ -243,9 +243,9 @@ public: if (i % 3 == 0) { fprintf(stderr, "\n "); } - fprintf(stderr, "(start_log_ts=%-20ld, commit_version=%-20ld) ", - commit_versions.array_.at(i).start_scn_.get_val_for_lsn_allocator(), - commit_versions.array_.at(i).commit_scn_.get_val_for_lsn_allocator()); + fprintf(stderr, "(start_scn=%-20s, commit_version=%-20s) ", + to_cstring(commit_versions.array_.at(i).start_scn_), + to_cstring(commit_versions.array_.at(i).commit_version_)); } fprintf(stderr, "\npre-process data end.\n"); } @@ -264,22 +264,22 @@ public: class CalcUpperTransSCNCache { public: - CalcUpperTransSCNCache() : is_inited_(false), cache_version_scn_(), commit_scns_() {} + CalcUpperTransSCNCache() : is_inited_(false), cache_version_(), commit_scns_() {} void reset() { is_inited_ = false; - cache_version_scn_.reset(); + cache_version_.reset(); commit_scns_.reset(); } - TO_STRING_KV(K_(is_inited), K_(cache_version_scn), K_(commit_scns)); + TO_STRING_KV(K_(is_inited), K_(cache_version), K_(commit_scns)); public: bool is_inited_; - // The end_log_ts of the sstable will be used as the cache_version - palf::SCN cache_version_scn_; + // The end_scn of the sstable will be used as the cache_version + palf::SCN cache_version_; mutable common::TCRWLock lock_; diff --git a/src/storage/tx_table/ob_tx_table_iterator.cpp b/src/storage/tx_table/ob_tx_table_iterator.cpp index f89b69715..1bfd24cbb 100644 --- a/src/storage/tx_table/ob_tx_table_iterator.cpp +++ b/src/storage/tx_table/ob_tx_table_iterator.cpp @@ -96,7 +96,7 @@ int ObTxDataMemtableScanIterator::init(ObTxDataMemtable *tx_data_memtable) // cur_node_ point to the next tx data cur_node_ = cur_node_->next_; tx_data_memtable_ = tx_data_memtable; - cur_max_commit_scn_.set_min(); + cur_max_commit_version_.set_min(); pre_start_scn_.set_min(); tx_data_row_cnt_ = 0; pre_tx_data_ = nullptr; @@ -113,7 +113,7 @@ void ObTxDataMemtableScanIterator::reset() tx_data_memtable_->reset_is_iterating(); } dump_tx_data_done_ = false; - cur_max_commit_scn_.set_min(); + cur_max_commit_version_.set_min(); pre_start_scn_.set_min(); tx_data_row_cnt_ = 0; pre_tx_data_ = nullptr; @@ -215,7 +215,7 @@ int ObTxDataMemtableScanIterator::get_next_tx_data_row_(const blocksstable::ObDa int64_t end_ts_column = TX_DATA_END_TS_COLUMN + SSTABLE_HIDDEN_COLUMN_CNT; int64_t value_column = TX_DATA_VAL_COLUMN + SSTABLE_HIDDEN_COLUMN_CNT; row_.storage_datums_[total_row_cnt_column].set_int(1); - row_.storage_datums_[end_ts_column].set_int(tx_data->end_scn_.get_val_for_tx()); + row_.storage_datums_[end_ts_column].set_int(tx_data->end_scn_.get_val_for_row_cell()); row_.storage_datums_[value_column].set_string(ObString(serialize_size, buf_.get_ptr())); row_.set_first_multi_version_row(); row_.set_last_multi_version_row(); @@ -234,8 +234,8 @@ int ObTxDataMemtableScanIterator::get_next_tx_data_row_(const blocksstable::ObDa // This function is called after sorting tx_data by start_scn and the following steps is // executed: -// 1. Select (start_scn, commit_scn) point per second and push them into an array. -// 2. Read (start_scn, commit_scn) array from the latest tx data sstable. +// 1. Select (start_scn, commit_version) point per second and push them into an array. +// 2. Read (start_scn, commit_version) array from the latest tx data sstable. // 3. Get the recycle_scn to filtrate the point which is not needed any more. // 4. Merge the arrays above. This procedure should filtrate the points are not needed and keep the // commit versions monotonically increasing. @@ -298,7 +298,7 @@ int ObTxDataMemtableScanIterator::DEBUG_try_calc_upper_and_check_(ObCommitSCNsAr SCN upper_trans_version = SCN::min_scn(); if (OB_FAIL(DEBUG_fake_calc_upper_trans_version(tx_data->start_scn_, upper_trans_version, merged_commit_versions))) { STORAGE_LOG(ERROR, "invalid upper trans version", KR(ret)); - } else if (upper_trans_version < tx_data->commit_scn_) { + } else if (upper_trans_version < tx_data->commit_version_) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "invalid upper trans version", KR(ret), K(upper_trans_version), KPC(tx_data)); } @@ -326,7 +326,7 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN int l = 0; int r = array.count() - 1; - // Binary find the first start_scn that is greater than or equal to sstable_end_log_ts + // Binary find the first start_scn that is greater than or equal to sstable_end_scn while (l < r) { int mid = (l + r) >> 1; if (array.at(mid).start_scn_ < sstable_end_scn) { @@ -336,14 +336,14 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN } } - // Check if the start_scn is greater than or equal to the sstable_end_log_ts. If not, delay the + // Check if the start_scn is greater than or equal to the sstable_end_scn. If not, delay the // upper_trans_version calculation to the next time. - if (0 == array.count() || !array.at(l).commit_scn_.is_valid()) { + if (0 == array.count() || !array.at(l).commit_version_.is_valid()) { upper_trans_version.set_max(); ret = OB_ERR_UNDEFINED; STORAGE_LOG(WARN, "unexpected array count or commit version", K(array.count()), K(array.at(l))); } else { - upper_trans_version = array.at(l).commit_scn_; + upper_trans_version = array.at(l).commit_version_; } return ret; @@ -352,7 +352,7 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN void ObTxDataMemtableScanIterator::DEBUG_print_start_scn_list_() { int ret = OB_SUCCESS; - const char *real_fname = "tx_data_start_log_ts_list"; + const char *real_fname = "tx_data_start_scn_list"; FILE *fd = NULL; if (NULL == (fd = fopen(real_fname, "w"))) { @@ -369,18 +369,18 @@ void ObTxDataMemtableScanIterator::DEBUG_print_start_scn_list_() fprintf(fd, "ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19s " "end_scn=%-19s " - "commit_scn=%-19s\n", + "commit_version=%-19s\n", tx_data->tx_id_.get_id(), tx_data->is_in_tx_data_table_, ObTxData::get_state_string(tx_data->state_), to_cstring(tx_data->start_scn_), to_cstring(tx_data->end_scn_), - to_cstring(tx_data->commit_scn_)); + to_cstring(tx_data->commit_version_)); } } if (NULL != fd) { - fprintf(fd, "end of start log ts list\n"); + fprintf(fd, "end of start scn list\n"); fclose(fd); fd = NULL; } @@ -405,9 +405,9 @@ void ObTxDataMemtableScanIterator::DEBUG_print_merged_commit_versions_(ObCommitS for (int i = 0; i < array.count(); i++) { fprintf(fd, "start_scn=%-19s " - "commit_scn=%-19s\n", + "commit_version=%-19s\n", to_cstring(array.at(i).start_scn_), - to_cstring(array.at(i).commit_scn_)); + to_cstring(array.at(i).commit_version_)); } } @@ -490,19 +490,19 @@ int ObTxDataMemtableScanIterator::periodical_get_next_commit_scn_(ObCommitSCNsAr if (DEBUG_last_start_scn_ > tx_data->start_scn_) { ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "unexpected start log ts order", K(DEBUG_last_start_scn_), KPC(tx_data)); + STORAGE_LOG(ERROR, "unexpected start scn order", K(DEBUG_last_start_scn_), KPC(tx_data)); break; } else { DEBUG_last_start_scn_ = tx_data->start_scn_; } // update pre_commit_version - if (tx_data->commit_scn_ > cur_max_commit_scn_) { - cur_max_commit_scn_ = tx_data->commit_scn_; + if (tx_data->commit_version_ > cur_max_commit_version_) { + cur_max_commit_version_ = tx_data->commit_version_; } // If this tx data is the first tx data in sorted list or its start_scn is 1_s larger than - // the pre_start_log_ts, we use this start_scn to calculate upper_trans_version + // the pre_start_scn, we use this start_scn to calculate upper_trans_version if (pre_start_scn_.is_min() || tx_data->start_scn_ >= SCN::plus(pre_start_scn_, PERIODICAL_SELECT_INTERVAL_NS)/*1s*/) { pre_start_scn_ = tx_data->start_scn_; @@ -513,7 +513,7 @@ int ObTxDataMemtableScanIterator::periodical_get_next_commit_scn_(ObCommitSCNsAr if (nullptr != tx_data) { node.start_scn_ = tx_data->start_scn_; // use cur_max_commit_version_ to keep the commit versions monotonically increasing - node.commit_scn_ = cur_max_commit_scn_; + node.commit_version_ = cur_max_commit_version_; // STORAGE_LOG(INFO, "GENGLI ", K(iter_cnt), K(PERIODICAL_SELECT_INTERVAL_NS), K(node)); tx_data = nullptr; } else if (nullptr == cur_node_) { @@ -591,12 +591,12 @@ int ObTxDataMemtableScanIterator::merge_cur_and_past_commit_verisons_(const palf // array whose start_scn is larger than the minimum start_scn in current array will be dropped. The reason is in this // issue: https://work.aone.alibaba-inc.com/issue/43389863 palf::SCN cur_min_start_scn = cur_arr.count() > 0 ? cur_arr.at(0).start_scn_ : palf::SCN::max_scn(); - palf::SCN max_commit_scn = palf::SCN::min_scn(); + palf::SCN max_commit_version = palf::SCN::min_scn(); if (OB_FAIL( - merge_pre_process_node_(step_len, cur_min_start_scn, recycle_scn, past_arr, max_commit_scn, merged_arr))) { + merge_pre_process_node_(step_len, cur_min_start_scn, recycle_scn, past_arr, max_commit_version, merged_arr))) { STORAGE_LOG(WARN, "merge past commit versions failed.", KR(ret), K(past_arr), KPC(tx_data_memtable_)); } else if (OB_FAIL( - merge_pre_process_node_(step_len, SCN::max_scn(), recycle_scn, cur_arr, max_commit_scn, merged_arr))) { + merge_pre_process_node_(step_len, SCN::max_scn(), recycle_scn, cur_arr, max_commit_version, merged_arr))) { STORAGE_LOG(WARN, "merge current commit versions failed.", KR(ret), K(cur_arr), KPC(tx_data_memtable_)); } else if (0 == merged_arr.count()) { if (OB_FAIL(merged_arr.push_back(ObCommitSCNsArray::Node(SCN::max_scn(), SCN::max_scn())))) { @@ -621,7 +621,7 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len const palf::SCN start_scn_limit, const palf::SCN recycle_scn, const ObIArray &data_arr, - palf::SCN &max_commit_scn, + palf::SCN &max_commit_version, ObIArray &merged_arr) { int ret = OB_SUCCESS; @@ -635,9 +635,9 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len if (data_arr.at(i).start_scn_ >= start_scn_limit) { break; } - max_commit_scn = std::max(max_commit_scn, data_arr.at(i).commit_scn_); - ObCommitSCNsArray::Node new_node(data_arr.at(i).start_scn_, max_commit_scn); - if (new_node.commit_scn_ <= recycle_scn) { + max_commit_version = std::max(max_commit_version, data_arr.at(i).commit_version_); + ObCommitSCNsArray::Node new_node(data_arr.at(i).start_scn_, max_commit_version); + if (new_node.commit_version_ <= recycle_scn) { // this tx data should be recycled // do nothing } else if (OB_FAIL(merged_arr.push_back(new_node))) { @@ -646,9 +646,9 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len } // push back the last pre-process node - max_commit_scn = std::max(max_commit_scn, data_arr.at(arr_len - 1).commit_scn_); + max_commit_version = std::max(max_commit_version, data_arr.at(arr_len - 1).commit_version_); if (OB_SUCC(ret) && data_arr.at(arr_len - 1).start_scn_ < start_scn_limit) { - ObCommitSCNsArray::Node new_node(data_arr.at(arr_len - 1).start_scn_, max_commit_scn); + ObCommitSCNsArray::Node new_node(data_arr.at(arr_len - 1).start_scn_, max_commit_version); if (OB_FAIL(merged_arr.push_back(new_node))) { STORAGE_LOG(WARN, "push back commit version node failed.", KR(ret), KPC(tx_data_memtable_)); } @@ -888,8 +888,8 @@ int ObTxCtxMemtableScanIterator::init(ObTxCtxMemtable *tx_ctx_memtable) STORAGE_LOG(WARN, "Failed to reserve tx ctx buffer", K(ret)); } else if (OB_FAIL(meta_buf_.reserve(TX_CTX_META_BUF_LENGTH))) { STORAGE_LOG(WARN, "Failed to reserve tx ctx meta buffer", K(ret)); - // NB: We must first prepare the rec_log_ts for ObLSTxCtxMgr and then - // prepare the rec_log_ts for tx ctx + // NB: We must first prepare the rec_scn for ObLSTxCtxMgr and then + // prepare the rec_scn for tx ctx } else if (OB_FAIL(ls_tx_ctx_mgr->refresh_aggre_rec_log_ts())) { STORAGE_LOG(WARN, "Failed to prepare for dump tx ctx", K(ret)); } else if (OB_FAIL(ls_tx_ctx_iter_.set_ready(ls_tx_ctx_mgr))) { diff --git a/src/storage/tx_table/ob_tx_table_iterator.h b/src/storage/tx_table/ob_tx_table_iterator.h index 9e3f83928..58e15f2a1 100644 --- a/src/storage/tx_table/ob_tx_table_iterator.h +++ b/src/storage/tx_table/ob_tx_table_iterator.h @@ -90,7 +90,7 @@ public: : is_inited_(false), iter_param_(iter_param), dump_tx_data_done_(false), - cur_max_commit_scn_(), + cur_max_commit_version_(), pre_start_scn_(), tx_data_row_cnt_(0), pre_tx_data_(nullptr), @@ -157,7 +157,7 @@ private: const palf::SCN start_scn_limit, const palf::SCN recycle_scn, const ObIArray &data_arr, - palf::SCN &max_commit_scn, + palf::SCN &max_commit_version, ObIArray &merged_arr); int set_row_with_merged_commit_scns_(ObCommitSCNsArray &merged_commit_scns, @@ -179,7 +179,7 @@ private: bool is_inited_; const ObTableIterParam &iter_param_; bool dump_tx_data_done_; - palf::SCN cur_max_commit_scn_; + palf::SCN cur_max_commit_version_; palf::SCN pre_start_scn_; int64_t tx_data_row_cnt_; ObTxData *pre_tx_data_; diff --git a/unittest/libobcdc/log_generator.h b/unittest/libobcdc/log_generator.h index bccaf3c56..f7bb2b634 100644 --- a/unittest/libobcdc/log_generator.h +++ b/unittest/libobcdc/log_generator.h @@ -287,8 +287,8 @@ void ObTxLogGenerator::gen_prepare_log() void ObTxLogGenerator::gen_commit_log() { int64_t commit_ts = get_timestamp(); - palf::SCN commit_scn; - commit_scn.convert_for_lsn_allocator(commit_ts); + palf::SCN commit_version; + commit_version.convert_for_lsn_allocator(commit_ts); uint64_t checksum = 0; share::ObLSArray inc_ls_arr; ObTxBufferNodeArray mds_arr; @@ -302,7 +302,7 @@ void ObTxLogGenerator::gen_commit_log() } ObTxCommitLog commit_log( - commit_scn, + commit_version, checksum, inc_ls_arr, mds_arr, diff --git a/unittest/storage/tx_table/test_tx_ctx_table.cpp b/unittest/storage/tx_table/test_tx_ctx_table.cpp index af4bf32d4..6388e4368 100644 --- a/unittest/storage/tx_table/test_tx_ctx_table.cpp +++ b/unittest/storage/tx_table/test_tx_ctx_table.cpp @@ -161,7 +161,7 @@ int64_t TestTxCtxTable::ref_count_; TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr) { EXPECT_EQ(0, TestTxCtxTable::ref_count_); - EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(0, /*last_replay_log_ts*/ + EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(SCN::min_scn(), /*last_replay_log_ts*/ 0 /*schema_version*/)); EXPECT_EQ(1, TestTxCtxTable::ref_count_);