[SCN Refactor] replace log_ts with scn in tx_table phase 2

This commit is contained in:
ZenoWang 2022-11-28 02:05:14 +00:00 committed by ob-robot
parent e77375018e
commit cdf5734cac
37 changed files with 288 additions and 340 deletions

View File

@ -169,15 +169,14 @@ SCN SCN::minus(const SCN &ref, uint64_t delta)
{
int ret = OB_SUCCESS;
SCN result;
uint64_t new_val = OB_INVALID_SCN_VAL;
if (OB_UNLIKELY(delta >= OB_MAX_SCN_TS_NS || !ref.is_valid())) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument", K(delta), K(ref), K(ret));
} else if (OB_UNLIKELY((new_val = ref.val_ - delta) > ref.val_)) {
} else if (OB_UNLIKELY(ref.val_ < delta)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR , "new_val is not valid", K(new_val), K(ref), K(delta), K(ret));
PALF_LOG(ERROR , "new_val is not valid", K(ref), K(delta), K(ret));
} else {
result.val_ = new_val;
result.val_ = ref.val_ - delta;
}
return result;
}
@ -319,6 +318,11 @@ uint64_t SCN::get_val_for_lsn_allocator() const
return val_;
}
uint64_t SCN::get_val_for_row_cell() const
{
return val_;
}
int SCN::convert_for_tx(int64_t val)
{
int ret = OB_SUCCESS;

View File

@ -78,6 +78,7 @@ public:
//only for gts use
uint64_t get_val_for_gts() const;
uint64_t get_val_for_lsn_allocator() const;
uint64_t get_val_for_row_cell() const;
// @param[in] convert for tx
int convert_for_tx(int64_t commit_trans_version);

View File

@ -125,10 +125,10 @@ int ObAllVirtualTxDataTable::process_curr_tenant(common::ObNewRow *&row)
cur_row_.cells_[i].set_int(row_data.tx_data_count_);
break;
case MIN_TX_SCN_COL:
cur_row_.cells_[i].set_uint64(row_data.min_tx_scn_.get_val_for_lsn_allocator());
cur_row_.cells_[i].set_uint64(row_data.min_tx_scn_.get_val_for_inner_table_field());
break;
case MAX_TX_SCN_COL:
cur_row_.cells_[i].set_uint64(row_data.max_tx_scn_.get_val_for_lsn_allocator());
cur_row_.cells_[i].set_uint64(row_data.max_tx_scn_.get_val_for_inner_table_field());
break;
default:
ret = OB_ERR_UNEXPECTED;

View File

@ -24,6 +24,8 @@
namespace oceanbase
{
using namespace storage;
using namespace palf;
namespace blocksstable
{
ObIMicroBlockRowScanner::ObIMicroBlockRowScanner(common::ObIAllocator &allocator)
@ -1272,9 +1274,12 @@ int ObMultiVersionMicroBlockRowScanner::lock_for_read(
int ret = OB_SUCCESS;
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
int64_t read_epoch = tx_table_guard.epoch();
SCN scn_trans_version = SCN::invalid_scn();
if (OB_FAIL(tx_table_guard.get_tx_table()->lock_for_read(
lock_for_read_arg, read_epoch, can_read, trans_version, is_determined_state))) {
lock_for_read_arg, read_epoch, can_read, scn_trans_version, is_determined_state))) {
LOG_WARN("failed to check transaction status", K(ret));
} else {
trans_version = scn_trans_version.get_val_for_tx();
}
return ret;
}
@ -1989,12 +1994,16 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(
{
int ret = OB_SUCCESS;
//get trans status & committed_trans_version_
commit_trans_version = INT64_MAX;
SCN scn_commit_trans_version = SCN::max_scn();
SCN merge_scn;
merge_scn.convert_for_lsn_allocator(context_->merge_log_ts_);
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
int64_t read_epoch = tx_table_guard.epoch();;
if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_log_ts(
trans_id, context_->merge_log_ts_, read_epoch, state, commit_trans_version))) {
if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_scn(
trans_id, merge_scn, read_epoch, state, scn_commit_trans_version))) {
LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state));
} else {
commit_trans_version = scn_commit_trans_version.get_val_for_tx();
}
return ret;
}

View File

@ -627,7 +627,7 @@ public:
// ObTxTable interface
DELEGATE_WITH_RET(tx_table_, get_tx_table_guard, int);
DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_log_ts, int);
DELEGATE_WITH_RET(tx_table_, get_upper_trans_version_before_given_scn, int);
DELEGATE_WITH_RET(tx_table_, dump_single_tx_data_2_text, int);
// ObCheckpointExecutor interface:

View File

@ -30,6 +30,8 @@ using namespace storage;
using namespace transaction;
using namespace common;
using namespace blocksstable;
using namespace palf;
namespace memtable
{
@ -263,14 +265,14 @@ int ObMultiVersionValueIterator::get_trans_status(
{
UNUSED(cluster_version);
int ret = OB_SUCCESS;
int64_t trans_version = INT64_MAX;
SCN trans_version = SCN::max_scn();
ObTxTable *tx_table = ctx_->get_tx_table_guard().get_tx_table();
int64_t read_epoch = ctx_->get_tx_table_guard().epoch();
if (OB_ISNULL(tx_table)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx_table_ is null", K(ret), KPC_(ctx), K(merge_scn_), K(trans_id));
} else if (OB_FAIL(tx_table->get_tx_state_with_log_ts(trans_id,
merge_scn_.get_val_for_lsn_allocator(),
} else if (OB_FAIL(tx_table->get_tx_state_with_scn(trans_id,
merge_scn_,
read_epoch,
state,
trans_version))) {

View File

@ -198,8 +198,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag,
// is_delay_cleanout() to check the state and we only cleanout it
// when data is delay cleanout
bool can_read = false;
// palf::SCN data_scn = palf::SCN::max_scn();
int64_t data_version = INT64_MAX;
palf::SCN data_version = palf::SCN::max_scn();
bool is_determined_state = false;
// Opt3: we only cleanout tx node who is delay cleanout
ObCleanoutOp cleanout_op;
@ -225,7 +224,7 @@ int ObMvccValueIterator::lock_for_read_inner_(const ObQueryFlag &flag,
cleanout_op,
recheck_op))) {
TRANS_LOG(WARN, "lock for read failed", KPC(iter), K(lock_for_read_arg));
} else if (can_read && snapshot_version.get_val_for_lsn_allocator() >= data_version) {
} else if (can_read && snapshot_version >= data_version) {
// Case 5.1: data is cleanout by lock for read and can be read by reader's
// snapshot
version_iter_ = iter;

View File

@ -59,9 +59,6 @@ public:
ObFreezer *freezer,
ObTenantMetaMemMgr *t3m,
ObTabletDDLKvMgr *ddl_kv_mgr);
virtual int create_memtable(const int64_t clog_checkpoint_ts,
const int64_t schema_version,
const bool for_replay=false) = 0;
virtual int create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay = false)

View File

@ -77,12 +77,11 @@ void ObLockMemtableMgr::reset()
is_inited_ = false;
}
int ObLockMemtableMgr::create_memtable(
const int64_t last_replay_log_ts,
const int64_t schema_version,
const bool for_replay)
int ObLockMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay)
{
UNUSED(last_replay_log_ts);
UNUSED(clog_checkpoint_scn);
UNUSED(schema_version);
UNUSED(for_replay);

View File

@ -63,9 +63,9 @@ public:
storage::ObTabletDDLKvMgr *ddl_kv_mgr) override;
virtual void destroy() override;
virtual int create_memtable(const int64_t last_replay_log_ts,
virtual int create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay=false) override;
const bool for_replay = false) override;
DECLARE_TO_STRING;
private:

View File

@ -1108,9 +1108,11 @@ int ObTablet::update_upper_trans_version(ObLS &ls, bool &is_updated)
LOG_WARN("sstable must not be null", K(ret), K(i), K(minor_tables));
} else if (INT64_MAX == sstable->get_upper_trans_version()) {
int64_t max_trans_version = INT64_MAX;
if (OB_FAIL(ls.get_upper_trans_version_before_given_log_ts(
sstable->get_end_log_ts(), max_trans_version))) {
SCN tmp_scn = SCN::max_scn();
if (OB_FAIL(ls.get_upper_trans_version_before_given_scn(
sstable->get_end_scn(), tmp_scn))) {
LOG_WARN("failed to get upper trans version before given log ts", K(ret), KPC(sstable));
} else if (FALSE_IT(max_trans_version = tmp_scn.is_max() ? INT64_MAX : tmp_scn.get_val_for_lsn_allocator())) {
} else if (0 == max_trans_version) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("max trans version should not be 0", KPC(sstable));
@ -1641,7 +1643,7 @@ int ObTablet::inner_create_memtable(
LOG_WARN("invalid args", K(ret), K(clog_checkpoint_scn), K(schema_version));
} else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
LOG_WARN("failed to get memtable mgr", K(ret));
} else if (OB_FAIL(memtable_mgr->create_memtable(clog_checkpoint_scn.get_val_for_gts(), schema_version, for_replay))) {
} else if (OB_FAIL(memtable_mgr->create_memtable(clog_checkpoint_scn, schema_version, for_replay))) {
if (OB_ENTRY_EXIST != ret && OB_MINOR_FREEZE_NOT_ALLOW != ret) {
LOG_WARN("failed to create memtable for tablet", K(ret),
K(clog_checkpoint_scn), K(schema_version), K(for_replay));

View File

@ -129,7 +129,7 @@ int ObTabletMemtableMgr::init_storage_schema_recorder(
// There are two cases:
// 1. create the first memtable for tablet
// 2. create the new memtable after freezing the old memtable
int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
int ObTabletMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay)
{
@ -168,7 +168,7 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
ObITable::TableKey table_key;
table_key.table_type_ = ObITable::DATA_MEMTABLE;
table_key.tablet_id_ = tablet_id_;
table_key.scn_range_.start_scn_.convert_for_gts(clog_checkpoint_ts); //TODO(SCN) fix clog_checkpoint_ts with SCN
table_key.scn_range_.start_scn_ = clog_checkpoint_scn;
table_key.scn_range_.end_scn_.set_max();
memtable::ObMemtable *memtable = NULL;
@ -213,10 +213,10 @@ int ObTabletMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
}
}
// there is no frozen memtable and new sstable will not be generated,
// meaning that clog_checkpoint_ts will not be updated now,
// so get newest clog_checkpoint_ts to set left boundary
// meaning that clog_checkpoint_scn will not be updated now,
// so get newest clog_checkpoint_scn to set left boundary
} else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))){
LOG_WARN("failed to get newest clog_checkpoint_ts", K(ret), K(ls_id), K(tablet_id_),
LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_),
K(new_clog_checkpoint_scn));
} else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){
LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version));

View File

@ -48,9 +48,6 @@ public:
ObTenantMetaMemMgr *t3m,
ObTabletDDLKvMgr *ddl_kv_mgr) override;
virtual int create_memtable(const int64_t clog_checkpoint_ts,
const int64_t schema_version,
const bool for_replay=false) override;
virtual int get_active_memtable(ObTableHandleV2 &handle) const override;
virtual int get_all_memtables(ObTableHdlArray &handle) override;
virtual void destroy() override;
@ -70,6 +67,9 @@ public:
memtable::ObMemtable *&memtable,
const memtable::MultiSourceDataUnitType type) const override;
int release_tail_memtable(memtable::ObIMemtable *memtable);
int create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay);
int get_memtables(
ObTableHdlArray &handle,
const bool reset_handle = true,

View File

@ -280,7 +280,7 @@ int ObCtxTxData::set_commit_version(const palf::SCN &commit_version)
if (OB_FAIL(check_tx_data_writable_())) {
TRANS_LOG(WARN, "tx data is not writeable", K(ret), K(*this));
} else {
tx_data_->commit_scn_ = commit_version;
tx_data_->commit_version_ = commit_version;
}
return ret;
@ -324,7 +324,7 @@ int32_t ObCtxTxData::get_state() const
const palf::SCN ObCtxTxData::get_commit_version() const
{
RLockGuard guard(lock_);
SCN commit_version = (NULL != tx_data_ ? tx_data_->commit_scn_ : tx_commit_data_.commit_scn_);
SCN commit_version = (NULL != tx_data_ ? tx_data_->commit_version_ : tx_commit_data_.commit_version_);
return commit_version;
}

View File

@ -1612,20 +1612,16 @@ int ObTransService::get_tx_state_from_tx_table_(const share::ObLSID &lsid,
ObTxTable *tx_table = NULL;
int64_t _state = 0;
int64_t read_epoch = ObTxTable::INVALID_READ_EPOCH;
int64_t commit_ts = 0;
if (OB_FAIL(get_tx_table_guard_(NULL, lsid, tx_table_guard))) {
TRANS_LOG(WARN, "get tx table guard failed", KR(ret), K(lsid), KPC(this));
} else if (!tx_table_guard.is_valid()) {
TRANS_LOG(WARN, "tx table is null", KR(ret), K(lsid), KPC(this));
} else if (FALSE_IT(tx_table = tx_table_guard.get_tx_table())) {
} else if (FALSE_IT(read_epoch = tx_table_guard.epoch())) {
} else if (OB_FAIL(tx_table->try_get_tx_state(tx_id, read_epoch, _state, commit_ts))) {
} else if (OB_FAIL(tx_table->try_get_tx_state(tx_id, read_epoch, _state, commit_version))) {
TRANS_LOG(WARN, "get tx state failed", KR(ret), K(lsid), K(tx_id), KPC(this));
} else {
state = (int)_state;
if (OB_FAIL(commit_version.convert_for_lsn_allocator(commit_ts))) {
TRANS_LOG(WARN, "convert for lsn fail", K(commit_ts));
}
}
return ret;
}

View File

@ -217,7 +217,7 @@ void ObTxCommitData::reset()
{
tx_id_ = INT64_MAX;
state_ = RUNNING;
commit_scn_.reset();
commit_version_.reset();
start_scn_.reset();
end_scn_.reset();
is_in_tx_data_table_ = false;
@ -257,18 +257,18 @@ int ObTxData::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
int ObTxData::serialize_(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
// LST_DO_CODE(OB_UNIS_ENCODE, state_, commit_scn_, start_scn_, end_scn_);
// LST_DO_CODE(OB_UNIS_ENCODE, state_, commit_version_, start_scn_, end_scn_);
if (OB_FAIL(tx_id_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize tx_id fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(serialization::encode_vi32(buf, buf_len, pos, state_))) {
STORAGE_LOG(WARN, "serialize state fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(commit_scn_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize commit_scn fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(commit_version_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize commit_version fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(start_scn_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize start_log_ts fail.", KR(ret), K(pos), K(buf_len));
STORAGE_LOG(WARN, "serialize start_scn fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(end_scn_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize end_log_ts fail.", KR(ret), K(pos), K(buf_len));
STORAGE_LOG(WARN, "serialize end_scn fail.", KR(ret), K(pos), K(buf_len));
} else if (OB_FAIL(undo_status_list_.serialize(buf, buf_len, pos))) {
STORAGE_LOG(WARN, "serialize undo_status_list fail.", KR(ret), K(pos), K(buf_len));
}
@ -289,10 +289,10 @@ int64_t ObTxData::get_serialize_size() const
int64_t ObTxData::get_serialize_size_() const
{
int64_t len = 0;
// LST_DO_CODE(OB_UNIS_ADD_LEN, state_, commit_scn_, start_scn_, end_scn_);
// LST_DO_CODE(OB_UNIS_ADD_LEN, state_, commit_version_, start_scn_, end_scn_);
len += tx_id_.get_serialize_size();
len += serialization::encoded_length_vi32(state_);
len += commit_scn_.get_serialize_size();
len += commit_version_.get_serialize_size();
len += start_scn_.get_serialize_size();
len += end_scn_.get_serialize_size();
len += undo_status_list_.get_serialize_size();
@ -339,12 +339,12 @@ int ObTxData::deserialize_(const char *buf,
STORAGE_LOG(WARN, "deserialize tx_id fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(serialization::decode_vi32(buf, data_len, pos, &state_))) {
STORAGE_LOG(WARN, "deserialize state fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(commit_scn_.deserialize(buf, data_len, pos))) {
STORAGE_LOG(WARN, "deserialize commit_scn fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(commit_version_.deserialize(buf, data_len, pos))) {
STORAGE_LOG(WARN, "deserialize commit_version fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(start_scn_.deserialize(buf, data_len, pos))) {
STORAGE_LOG(WARN, "deserialize start_log_ts fail.", KR(ret), K(pos), K(data_len));
STORAGE_LOG(WARN, "deserialize start_scn fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(end_scn_.deserialize(buf, data_len, pos))) {
STORAGE_LOG(WARN, "deserialize end_log_ts fail.", KR(ret), K(pos), K(data_len));
STORAGE_LOG(WARN, "deserialize end_scn fail.", KR(ret), K(pos), K(data_len));
} else if (OB_FAIL(undo_status_list_.deserialize(buf, data_len, pos, slice_allocator))) {
STORAGE_LOG(WARN, "deserialize undo_status_list fail.", KR(ret), K(pos), K(data_len));
}
@ -367,7 +367,7 @@ ObTxData &ObTxData::operator=(const ObTxData &rhs)
{
tx_id_ = rhs.tx_id_;
state_ = rhs.state_;
commit_scn_ = rhs.commit_scn_;
commit_version_ = rhs.commit_version_;
start_scn_ = rhs.start_scn_;
end_scn_ = rhs.end_scn_;
undo_status_list_ = rhs.undo_status_list_;
@ -379,7 +379,7 @@ ObTxData &ObTxData::operator=(const ObTxCommitData &rhs)
{
tx_id_ = rhs.tx_id_;
state_ = rhs.state_;
commit_scn_ = rhs.commit_scn_;
commit_version_ = rhs.commit_version_;
start_scn_ = rhs.start_scn_;
end_scn_ = rhs.end_scn_;
is_in_tx_data_table_ = rhs.is_in_tx_data_table_;
@ -406,16 +406,16 @@ bool ObTxData::is_valid_in_tx_data_table() const
STORAGE_LOG(ERROR, "tx data state is invalid", KPC(this));
} else if (!start_scn_.is_valid()) {
bool_ret = false;
STORAGE_LOG(ERROR, "tx data start_log_ts is invalid", KPC(this));
STORAGE_LOG(ERROR, "tx data start_scn is invalid", KPC(this));
} else if (!end_scn_.is_valid()) {
bool_ret = false;
STORAGE_LOG(ERROR, "tx data end_log_ts is invalid", KPC(this));
STORAGE_LOG(ERROR, "tx data end_scn is invalid", KPC(this));
} else if (end_scn_ < start_scn_) {
bool_ret = false;
STORAGE_LOG(ERROR, "tx data end_log_ts is less than start_log_ts", KPC(this));
} else if (!commit_scn_.is_valid() && state_ != RUNNING && state_ != ABORT) {
STORAGE_LOG(ERROR, "tx data end_scn is less than start_scn", KPC(this));
} else if (!commit_version_.is_valid() && state_ != RUNNING && state_ != ABORT) {
bool_ret = false;
STORAGE_LOG(ERROR, "tx data commit_scn is invalid but state is not running or abort",
STORAGE_LOG(ERROR, "tx data commit_version is invalid but state is not running or abort",
KPC(this));
}
@ -511,15 +511,15 @@ bool ObTxData::equals_(ObTxData &rhs)
} else if (state_ != rhs.state_) {
bool_ret = false;
STORAGE_LOG(INFO, "state is not equal.");
} else if (commit_scn_ != rhs.commit_scn_) {
} else if (commit_version_ != rhs.commit_version_) {
bool_ret = false;
STORAGE_LOG(INFO, "commit_scn is not equal.");
STORAGE_LOG(INFO, "commit_version is not equal.");
} else if (start_scn_ != rhs.start_scn_) {
bool_ret = false;
STORAGE_LOG(INFO, "start_log_ts is not equal.");
STORAGE_LOG(INFO, "start_scn is not equal.");
} else if (end_scn_ != rhs.end_scn_) {
bool_ret = false;
STORAGE_LOG(INFO, "end_log_ts is not equal.");
STORAGE_LOG(INFO, "end_scn is not equal.");
} else if (undo_status_list_.undo_node_cnt_ != rhs.undo_status_list_.undo_node_cnt_) {
bool_ret = false;
STORAGE_LOG(INFO, "undo_node_cnt is not equal.");
@ -565,13 +565,13 @@ bool ObTxData::equals_(ObTxData &rhs)
void ObTxData::print_to_stderr(const ObTxData &tx_data)
{
fprintf(stderr,
"TX_DATA:{tx_id=%-20ld in_tx_data_table=%-6s start_log_scn=%-20s end_log_scn=%-20s commit_scn=%-20s "
"TX_DATA:{tx_id=%-20ld in_tx_data_table=%-6s start_log_scn=%-20s end_log_scn=%-20s commit_version=%-20s "
"state=%s",
tx_data.tx_id_.get_id(),
tx_data.is_in_tx_data_table_ ? "True" : "False",
to_cstring(tx_data.start_scn_),
to_cstring(tx_data.end_scn_),
to_cstring(tx_data.commit_scn_),
to_cstring(tx_data.commit_version_),
get_state_string(tx_data.state_));
tx_data.undo_status_list_.dump_2_text(stderr);
@ -585,12 +585,12 @@ void ObTxData::dump_2_text(FILE *fd) const
fprintf(fd,
"TX_DATA:\n{\n tx_id=%-20ld\n in_tx_data_table=%-6s\n start_log_scn=%-20s\n end_log_scn=%-20s\n "
" commit_scn=%-20s\n state=%s\n",
" commit_version=%-20s\n state=%s\n",
tx_id_.get_id(),
is_in_tx_data_table_ ? "True" : "False",
to_cstring(start_scn_),
to_cstring(end_scn_),
to_cstring(commit_scn_),
to_cstring(commit_version_),
get_state_string(state_));
undo_status_list_.dump_2_text(fd);
@ -605,7 +605,7 @@ DEF_TO_STRING(ObTxData)
J_KV(K_(tx_id),
"state", get_state_string(state_),
"in_tx_data_table", is_in_tx_data_table_ ? "True" : "False",
K_(commit_scn),
K_(commit_version),
K_(start_scn),
K_(end_scn),
K_(undo_status_list));

View File

@ -207,7 +207,6 @@ public:
int64_t prepare_version_;
};
// FIXME : @gengli remove log ts
class ObTxCommitData
{
public:
@ -216,7 +215,7 @@ public:
TO_STRING_KV(K_(tx_id),
K_(state),
K_(is_in_tx_data_table),
K_(commit_scn),
K_(commit_version),
K_(start_scn),
K_(end_scn));
@ -235,12 +234,11 @@ public:
transaction::ObTransID tx_id_;
int32_t state_;
bool is_in_tx_data_table_;
palf::SCN commit_scn_;
palf::SCN commit_version_;
palf::SCN start_scn_;
palf::SCN end_scn_;
};
// DONT : Modify this definition
class ObTxData : public ObTxCommitData, public TxDataHashValue
{
friend TxDataHashMapAllocHandle;

View File

@ -20,6 +20,9 @@
namespace oceanbase
{
using namespace palf;
namespace storage
{
@ -88,7 +91,7 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc
// Case 1: data is committed, so the lock is locked by the data and we
// also need return the commit version for tsc check
lock_state_.is_locked_ = false;
lock_state_.trans_version_ = tx_data.commit_scn_;
lock_state_.trans_version_ = tx_data.commit_version_;
break;
}
case ObTxData::RUNNING: {
@ -128,7 +131,7 @@ int CheckRowLockedFunctor::operator() (const ObTxData &tx_data, ObTxCCCtx *tx_cc
}
int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx)
int GetTxStateWithSCNFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx)
{
UNUSED(tx_cc_ctx);
int ret = OB_SUCCESS;
@ -140,22 +143,22 @@ int GetTxStateWithLogTSFunctor::operator()(const ObTxData &tx_data, ObTxCCCtx *t
// Case 1: data is during execution, so we return the running state with
// INT64_MAX as version
state_ = ObTxData::RUNNING;
trans_version_ = INT64_MAX;
} else if (log_ts_ < tx_data.end_scn_.get_val_for_lsn_allocator()) {
trans_version_ = SCN::max_scn();
} else if (scn_ < tx_data.end_scn_) {
// Case 2: data is decided while the required state is before the merge log
// ts, so we return the running state with INT64_MAX as txn version
state_ = ObTxData::RUNNING;
trans_version_ = INT64_MAX;
trans_version_ = SCN::max_scn();
} else if (ObTxData::COMMIT == tx_data.state_) {
// Case 3: data is committed and the required state is after the merge log
// ts, so we return the commit state with commit version as txn version
state_ = ObTxData::COMMIT;
trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator();
trans_version_ = tx_data.commit_version_;
} else if (ObTxData::ABORT == tx_data.state_) {
// Case 4: data is aborted and the required state is after the merge log
// ts, so we return the abort state with 0 as txn version
state_ = ObTxData::ABORT;
trans_version_ = 0;
trans_version_ = SCN::min_scn();
} else {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected transaction state_", K(ret), K(tx_data));
@ -169,7 +172,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
{
int ret = OB_SUCCESS;
can_read_ = false;
trans_version_ = OB_INVALID_VERSION;
trans_version_ = SCN::invalid_scn();
is_determined_state_ = false;
auto &snapshot = lock_for_read_arg_.mvcc_acc_ctx_.snapshot_;
auto snapshot_version = snapshot.version_;
@ -186,13 +189,13 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
// depends on whether undo status contains the data. Then we return the commit
// version as data version.
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator();
trans_version_ = tx_data.commit_version_;
is_determined_state_ = true;
break;
}
case ObTxData::ELR_COMMIT: {
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = tx_data.commit_scn_.get_val_for_lsn_allocator();
trans_version_ = tx_data.commit_version_;
is_determined_state_ = false;
break;
}
@ -201,7 +204,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
if (read_latest && reader_tx_id == data_tx_id) {
// Case 2.0: read the latest written of current txn
can_read_ = !tx_data.undo_status_list_.is_contain(data_sql_sequence);
trans_version_ = 0;
trans_version_ = SCN::min_scn();
} else if (snapshot_tx_id == data_tx_id) {
// Case 2.1: data is owned by the read txn
bool tmp_can_read = false;
@ -219,7 +222,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
can_read_ = tmp_can_read &&
!tx_data.undo_status_list_.is_contain(data_sql_sequence);
// Tip 2.1.2: trans version is unnecessary for the running txn
trans_version_ = 0;
trans_version_ = SCN::min_scn();
} else {
// Case 2.2: data is not owned by the read txn
// NB: we need pay attention to the choice condition when issuing the
@ -234,7 +237,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
// snapshot version, so we cannot read it and trans version is
// unnecessary for the running txn
can_read_ = false;
trans_version_ = 0;
trans_version_ = SCN::min_scn();
} else if (tx_cc_ctx->prepare_version_ > snapshot_version.get_val_for_lsn_allocator()) {
// Case 2.2.2: data is at least in prepare state and the prepare
// version is bigger than the read txn's snapshot version, then the
@ -242,7 +245,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
// version, so we cannot read it and trans version is unnecessary for
// the running txn
can_read_ = false;
trans_version_ = 0;
trans_version_ = SCN::min_scn();
} else {
// Case 2.2.3: data is in prepare state and the prepare version is
// smaller than the read txn's snapshot version, then the data's
@ -264,7 +267,7 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
// Case 3: data is aborted, so the state is decided, then we can not read
// the data and the trans version is unnecessary for the aborted txn
can_read_ = false;
trans_version_ = 0;
trans_version_ = SCN::min_scn();
is_determined_state_ = true;
break;
}
@ -331,7 +334,7 @@ bool ObReCheckTxNodeForLockForReadOperation::operator()()
if (tnode_.is_aborted()) {
can_read_ = false;
trans_version_ = 0;
trans_version_ = SCN::min_scn();
is_determined_state_ = true;
ret = true;
}
@ -381,9 +384,9 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx
}
} else if (ObTxData::COMMIT == tx_data.state_) {
// Case 4: data is committed, so we should write back the commit state
if (OB_FAIL(value_.trans_commit(tx_data.commit_scn_, tnode_))) {
if (OB_FAIL(value_.trans_commit(tx_data.commit_version_, tnode_))) {
TRANS_LOG(WARN, "mvcc trans ctx trans commit error", K(ret), K(value_), K(tnode_));
} else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_scn_, tx_data.end_scn_))) {
} else if (FALSE_IT(tnode_.trans_commit(tx_data.commit_version_, tx_data.end_scn_))) {
} else if (blocksstable::ObDmlFlag::DF_LOCK == tnode_.get_dml_flag()
&& OB_FAIL(value_.unlink_trans_node(tnode_))) {
TRANS_LOG(WARN, "unlink lock node failed", K(ret), K(value_), K(tnode_));

View File

@ -33,7 +33,7 @@ class ObReCheckTxNodeForLockForReadOperation
public:
ObReCheckTxNodeForLockForReadOperation(memtable::ObMvccTransNode &tnode,
bool &can_read,
int64_t &trans_version,
palf::SCN &trans_version,
bool &is_determined_state)
: tnode_(tnode),
can_read_(can_read),
@ -45,7 +45,7 @@ private:
memtable::ObMvccTransNode &tnode_;
bool &can_read_;
bool &is_determined_state_;
int64_t &trans_version_;
palf::SCN &trans_version_;
};
class ObReCheckNothingOperation
@ -127,24 +127,24 @@ public:
ObStoreRowLockState &lock_state_;
};
// fetch the state of txn DATA_TRANS_ID when replaying to LOG_TS
// fetch the state of txn DATA_TRANS_ID when replaying to SCN
// the requirement can be seen from https://yuque.antfin-inc.com/ob/storage/adk6yx
// return the txn state and commit version if committed, INT64_MAX if running
// and 0 if rollbacked when replaying to LOG_ID
class GetTxStateWithLogTSFunctor : public ObITxDataCheckFunctor
class GetTxStateWithSCNFunctor : public ObITxDataCheckFunctor
{
public:
GetTxStateWithLogTSFunctor(const int64_t log_ts,
GetTxStateWithSCNFunctor(const palf::SCN scn,
int64_t &state,
int64_t &trans_version)
: log_ts_(log_ts), state_(state), trans_version_(trans_version) {}
virtual ~GetTxStateWithLogTSFunctor() {}
palf::SCN &trans_version)
: scn_(scn), state_(state), trans_version_(trans_version) {}
virtual ~GetTxStateWithSCNFunctor() {}
virtual int operator()(const ObTxData &tx_data, ObTxCCCtx *tx_cc_ctx = nullptr) override;
TO_STRING_KV(K(log_ts_), K(state_), K(trans_version_));
TO_STRING_KV(K(scn_), K(state_), K(trans_version_));
public:
const int64_t log_ts_;
const palf::SCN scn_;
int64_t &state_;
int64_t &trans_version_;
palf::SCN &trans_version_;
};
// the txn READ_TRANS_ID use SNAPSHOT_VERSION to read the data,
@ -158,7 +158,7 @@ class LockForReadFunctor : public ObITxDataCheckFunctor
public:
LockForReadFunctor(const transaction::ObLockForReadArg &lock_for_read_arg,
bool &can_read,
int64_t &trans_version,
palf::SCN &trans_version,
bool &is_determined_state,
const ObCleanoutOp &cleanout_op = ObCleanoutNothingOperation(),
const ObReCheckOp &recheck_op = ObReCheckNothingOperation())
@ -180,7 +180,7 @@ public:
const transaction::ObLockForReadArg &lock_for_read_arg_;
bool &can_read_;
bool &is_determined_state_;
int64_t &trans_version_;
palf::SCN &trans_version_;
// Cleanout the tx node if necessary
ObCleanoutOp cleanout_op_;
// ReCheck whether tx node is valid.

View File

@ -19,6 +19,8 @@
namespace oceanbase
{
using namespace share;
using namespace palf;
namespace storage
{
@ -225,9 +227,9 @@ palf::SCN ObTxCtxMemtable::get_rec_scn()
palf::SCN rec_scn;
if (OB_FAIL(get_ls_tx_ctx_mgr()->get_rec_scn(rec_scn))) {
TRANS_LOG(WARN, "get rec log ts failed", K(ret));
TRANS_LOG(WARN, "get rec scn failed", K(ret));
} else {
TRANS_LOG(INFO, "tx ctx memtable get rec log ts", KPC(this), K(rec_scn));
TRANS_LOG(INFO, "tx ctx memtable get rec scn", KPC(this), K(rec_scn));
}
return rec_scn;
@ -265,7 +267,7 @@ int ObTxCtxMemtable::flush(palf::SCN recycle_scn, bool need_freeze)
ObSpinLockGuard guard(flush_lock_);
if (need_freeze) {
palf::SCN rec_scn = get_rec_scn();
SCN rec_scn = get_rec_scn();
if (rec_scn >= recycle_scn) {
TRANS_LOG(INFO, "no need to freeze", K(rec_scn), K(recycle_scn));
} else if (is_active_memtable()) {

View File

@ -70,7 +70,6 @@ public:
virtual int flush(palf::SCN recycle_scn, bool need_freeze = true);
virtual ObTabletID get_tablet_id() const override;
virtual bool is_flushing() const override;
// ================ NOT SUPPORTED INTERFACE ===============

View File

@ -21,6 +21,7 @@
namespace oceanbase
{
using namespace share;
using namespace palf;
namespace storage
{
@ -65,11 +66,11 @@ void ObTxCtxMemtableMgr::reset()
is_inited_ = false;
}
int ObTxCtxMemtableMgr::create_memtable(const int64_t last_replay_log_ts,
int ObTxCtxMemtableMgr::create_memtable(const SCN last_replay_scn,
const int64_t schema_version,
const bool for_replay)
{
UNUSED(last_replay_log_ts);
UNUSED(last_replay_scn);
UNUSED(schema_version);
UNUSED(for_replay);

View File

@ -58,7 +58,7 @@ public:
virtual void destroy() override;
// create_memtable is used for creating the only memtable for CheckpointMgr
virtual int create_memtable(const int64_t last_replay_log_ts,
virtual int create_memtable(const palf::SCN last_replay_scn,
const int64_t schema_version,
const bool for_replay=false) override;

View File

@ -418,7 +418,7 @@ bool ObTxDataMemtable::ready_for_flush()
bool_ret = true;
STORAGE_LOG(INFO, "memtable is frozen yet.", KP(this));
} else if (OB_FAIL(freezer_->get_max_consequent_callbacked_scn(max_consequent_callbacked_scn))) {
STORAGE_LOG(WARN, "get_max_consequent_callbacked_log_ts failed", K(ret), K(freezer_->get_ls_id()));
STORAGE_LOG(WARN, "get_max_consequent_callbacked_scn failed", K(ret), K(freezer_->get_ls_id()));
} else if (max_consequent_callbacked_scn >= key_.scn_range_.end_scn_) {
state_ = ObTxDataMemtable::State::FROZEN;
set_snapshot_version(min_tx_scn_);
@ -463,7 +463,7 @@ int ObTxDataMemtable::do_sort_by_tx_id_()
int64_t get_start_ts_(const ObTxData &tx_data)
{
return tx_data.start_scn_.get_val_for_lsn_allocator();
return tx_data.start_scn_.get_val_for_tx();
}
int ObTxDataMemtable::do_sort_by_start_scn_()

View File

@ -239,7 +239,6 @@ public: /* derived from ObIMemtable */
const blocksstable::ObDatumRowkey &rowkey) override;
public: // checkpoint
int64_t get_rec_log_ts();
palf::SCN get_rec_scn();
// int freeze();
@ -277,7 +276,6 @@ public: // getter && setter
void reset_is_iterating() { ATOMIC_STORE(&is_iterating_, false); }
// FIXME : @gengli remove
palf::SCN get_end_scn() { return key_.scn_range_.end_scn_;}
@ -309,13 +307,13 @@ private: // ObTxDataMemtable
bool is_iterating_;
bool has_constructed_list_;
// the minimum log ts of commit_scn in this tx data memtable
// the minimum scn of commit_version in this tx data memtable
palf::SCN min_tx_scn_;
// the maximum log ts in this tx data memtable
// the maximum scn in this tx data memtable
palf::SCN max_tx_scn_;
// the minimum start log ts in this tx data memtable
// the minimum start scn in this tx data memtable
palf::SCN min_start_scn_;
int64_t inserted_cnt_;
@ -380,15 +378,15 @@ public:
UNUSED(key);
// printf basic info
fprintf(fd_,
"ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19ld "
"end_scn=%-19ld "
"commit_scn=%-19ld ",
"ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19s "
"end_scn=%-19s "
"commit_version=%-19s ",
tx_data->tx_id_.get_id(),
tx_data->is_in_tx_data_table_,
ObTxData::get_state_string(tx_data->state_),
tx_data->start_scn_.get_val_for_lsn_allocator(),
tx_data->end_scn_.get_val_for_lsn_allocator(),
tx_data->commit_scn_.get_val_for_lsn_allocator());
to_cstring(tx_data->start_scn_),
to_cstring(tx_data->end_scn_),
to_cstring(tx_data->commit_version_));
// printf undo status list
fprintf(fd_, "Undo Actions : {");
@ -409,17 +407,6 @@ private:
FILE *fd_;
};
OB_INLINE int64_t ObTxDataMemtable::get_rec_log_ts()
{
// TODO : @gengli
// rec_scn changes constantly. The rec_scn obtained by checkpoint mgr
// may be greater than the actual checkpoint of tx_data_memtable because the
// callback functions are not sequential. The checkpoint is determined both on
// the max-sequential callback point of the log and the rec_scn.
return min_tx_scn_.get_val_for_lsn_allocator();
}
OB_INLINE palf::SCN ObTxDataMemtable::get_rec_scn()
{
// TODO : @gengli

View File

@ -146,17 +146,6 @@ int ObTxDataMemtableMgr::create_memtable(const palf::SCN clog_checkpoint_scn,
return ret;
}
int ObTxDataMemtableMgr::create_memtable(const int64_t clog_checkpoint_ts,
const int64_t schema_version,
const bool for_replay)
{
UNUSED(for_replay);
SCN clog_checkpoint_scn;
clog_checkpoint_scn.convert_for_lsn_allocator(clog_checkpoint_ts);
return create_memtable(clog_checkpoint_scn, schema_version);
}
int ObTxDataMemtableMgr::create_memtable_(const palf::SCN clog_checkpoint_scn, int64_t schema_version)
{
UNUSED(schema_version);

View File

@ -63,13 +63,10 @@ public: // ObTxDataMemtableMgr
* @brief Using to create a new active tx data memtable
*
* @param[in] clog_checkpoint_ts clog_checkpoint_ts, using to init multiversion_start,
* base_version and start_log_ts. The start_log_ts will be modified if this function is called by
* base_version and start_scn. The start_scn will be modified if this function is called by
* freeze().
* @param[in] schema_version schema_version, not used
*/
virtual int create_memtable(const int64_t clog_checkpoint_ts,
const int64_t schema_version,
const bool for_replay=false) override;
virtual int create_memtable(const palf::SCN clog_checkpoint_scn,
const int64_t schema_version,
const bool for_replay=false) override;

View File

@ -179,7 +179,7 @@ void ObTxDataTable::stop()
void ObTxDataTable::reset()
{
min_start_scn_in_ctx_.reset();
last_update_min_start_scn_ts_ = 0;
last_update_ts_ = 0;
tablet_id_ = 0;
ls_ = nullptr;
ls_tablet_svr_ = nullptr;
@ -383,7 +383,7 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write
tg.click();
// If this tx data can not be inserted into all memtables, check if it should be filtered.
// We use the start log ts of the first memtable as the filtering time stamp
// We use the start scn of the first memtable as the filtering time stamp
if (OB_SUCC(ret) && OB_NOT_NULL(tx_data) && OB_NOT_NULL(tx_data_memtable)) {
SCN clog_checkpoint_scn = tx_data_memtable->get_key().get_start_scn();
if (tx_data->end_scn_ <= clog_checkpoint_scn) {
@ -407,7 +407,7 @@ int ObTxDataTable::insert_(ObTxData *&tx_data, ObTxDataMemtableWriteGuard &write
return ret;
}
// A tx data with lesser end_log_ts may be inserted after a tx data with greater end_log_ts due to
// A tx data with lesser end_scn may be inserted after a tx data with greater end_scn due to
// the out-of-order callback of log. This function will retain the newer tx data and delete the
// older one.
int ObTxDataTable::insert_into_memtable_(ObTxDataMemtable *tx_data_memtable, ObTxData *&tx_data)
@ -438,7 +438,7 @@ int ObTxDataTable::insert_into_memtable_(ObTxDataMemtable *tx_data_memtable, ObT
// successfully remove
}
} else {
// The end_log_ts of tx data in memtable is greater than or equal to the end_log_ts of current
// The end_scn of tx data in memtable is greater than or equal to the end_scn of current
// tx data, which means the tx data waiting to insert is an older one. So we set need_insert =
// false to skip inserting this tx data
need_insert = false;
@ -735,7 +735,7 @@ int ObTxDataTable::get_ls_min_end_scn_in_latest_tablets_(SCN &min_end_scn)
while (OB_SUCC(iterator.get_next_tablet(tablet_handle))) {
SCN end_scn_from_single_tablet = SCN::max_scn();
if (OB_FAIL(get_min_end_scn_from_single_tablet_(tablet_handle, end_scn_from_single_tablet))) {
STORAGE_LOG(WARN, "get min end_log_ts from a single tablet failed.", KR(ret));
STORAGE_LOG(WARN, "get min end_scn from a single tablet failed.", KR(ret));
} else if (end_scn_from_single_tablet < min_end_scn) {
min_end_scn = end_scn_from_single_tablet;
}
@ -792,27 +792,13 @@ int ObTxDataTable::self_freeze_task()
return ret;
}
// The main steps in calculating upper_trans_scn. For more details, see :
// The main steps in calculating upper_trans_version. For more details, see :
// https://yuque.antfin-inc.com/ob/transaction/lurtok
int ObTxDataTable::get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts,
int64_t &upper_trans_version)
{
int ret = OB_SUCCESS;
SCN sstable_end_scn;
sstable_end_scn.convert_for_lsn_allocator(sstable_end_log_ts);
SCN tmp_upper_trans_version;
if (OB_FAIL(get_upper_trans_scn_before_given_scn(sstable_end_scn, tmp_upper_trans_version))) {
} else {
upper_trans_version = tmp_upper_trans_version.get_val_for_lsn_allocator();
}
return ret;
}
int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_scn)
int ObTxDataTable::get_upper_trans_version_before_given_scn(const SCN sstable_end_scn, SCN &upper_trans_version)
{
int ret = OB_SUCCESS;
bool skip_calc = false;
upper_trans_scn.set_max();
upper_trans_version.set_max();
STORAGE_LOG(DEBUG, "start get upper trans version", K(get_ls_id()));
@ -820,9 +806,9 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc
ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "The tx data table is not inited.", KR(ret));
} else if (true == (skip_calc = skip_this_sstable_end_scn_(sstable_end_scn))) {
// there is a start_log_ts of running transactions is smaller than the sstable_end_log_ts
// there is a start_scn of running transactions is smaller than the sstable_end_scn
} else {
TCWLockGuard lock_guard(calc_upper_trans_scn_cache_.lock_);
TCWLockGuard lock_guard(calc_upper_trans_version_cache_.lock_);
if (OB_FAIL(update_cache_if_needed_(skip_calc))) {
STORAGE_LOG(WARN, "update cache failed.", KR(ret));
}
@ -830,14 +816,14 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc
if (OB_FAIL(ret)) {
} else if (skip_calc) {
} else if (0 == calc_upper_trans_scn_cache_.commit_scns_.array_.count()) {
STORAGE_LOG(ERROR, "Unexpected empty array.", K(calc_upper_trans_scn_cache_));
} else if (0 == calc_upper_trans_version_cache_.commit_scns_.array_.count()) {
STORAGE_LOG(ERROR, "Unexpected empty array.", K(calc_upper_trans_version_cache_));
} else {
TCRLockGuard lock_guard(calc_upper_trans_scn_cache_.lock_);
if (!calc_upper_trans_scn_cache_.commit_scns_.is_valid()) {
TCRLockGuard lock_guard(calc_upper_trans_version_cache_.lock_);
if (!calc_upper_trans_version_cache_.commit_scns_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "invalid cache for upper trans version calculation", KR(ret));
} else if (OB_FAIL(calc_upper_trans_scn_(sstable_end_scn, upper_trans_scn))) {
} else if (OB_FAIL(calc_upper_trans_scn_(sstable_end_scn, upper_trans_version))) {
STORAGE_LOG(WARN, "calc upper trans version failed", KR(ret), "ls_id", get_ls_id());
} else {
FLOG_INFO("get upper trans version finish.",
@ -845,7 +831,7 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc
K(skip_calc),
"ls_id", get_ls_id(),
K(sstable_end_scn),
K(upper_trans_scn));
K(upper_trans_version));
}
}
return ret;
@ -855,20 +841,20 @@ int ObTxDataTable::get_upper_trans_scn_before_given_scn(const SCN sstable_end_sc
// calculation.
//
// 1. Accuracy :
// If there are some running transactions with start_log_ts which is less than or equal to
// sstable_end_log_ts, we skip this upper_trans_version calculation because it is currently
// If there are some running transactions with start_scn which is less than or equal to
// sstable_end_scn, we skip this upper_trans_version calculation because it is currently
// undetermined.
//
// 2. Performance :
// If there are some commited transactions with start_log_ts which is less than or equal to
// sstable_end_log_ts stil in tx data memtable, we skip this upper_trans_version calculation because
// If there are some commited transactions with start_scn which is less than or equal to
// sstable_end_scn stil in tx data memtable, we skip this upper_trans_version calculation because
// calculating upper_trans_version in tx data memtable is very slow.
bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn)
{
int ret = OB_SUCCESS;
bool need_skip = false;
int64_t cur_ts = common::ObTimeUtility::fast_current_time();
int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_min_start_scn_ts_);
int64_t tmp_update_ts = ATOMIC_LOAD(&last_update_ts_);
SCN min_start_scn_in_tx_data_memtable = SCN::max_scn();
SCN max_decided_scn = SCN::min_scn();
@ -880,13 +866,13 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn)
STORAGE_LOG(INFO, "skip calc upper trans version once", K(max_decided_scn), K(sstable_end_scn));
}
// If the min_start_log_ts_in_ctx has not been updated for more than 30 seconds,
// If the min_start_scn_in_ctx has not been updated for more than 30 seconds,
if (need_skip) {
} else if (cur_ts - tmp_update_ts > 30_s
&& tmp_update_ts == ATOMIC_CAS(&last_update_min_start_scn_ts_, tmp_update_ts, cur_ts)) {
// update last_update_min_start_log_ts
&& tmp_update_ts == ATOMIC_CAS(&last_update_ts_, tmp_update_ts, cur_ts)) {
// update last_update_min_start_scn
if (OB_FAIL(tx_ctx_table_->get_min_start_scn(min_start_scn_in_ctx_))) {
STORAGE_LOG(WARN, "get min start log ts from tx ctx table failed.", KR(ret));
STORAGE_LOG(WARN, "get min start scn from tx ctx table failed.", KR(ret));
}
}
@ -894,7 +880,7 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn)
} else if (OB_FAIL(ret)) {
need_skip = true;
} else if (min_start_scn_in_ctx_ <= sstable_end_scn) {
// skip this sstable end log ts calculation
// skip this sstable end scn calculation
need_skip = true;
} else if (OB_FAIL(update_memtables_cache())) {
STORAGE_LOG(WARN, "update memtables fail.", KR(ret));
@ -918,7 +904,7 @@ bool ObTxDataTable::skip_this_sstable_end_scn_(SCN sstable_end_scn)
= std::min(min_start_scn_in_tx_data_memtable,
tx_data_memtable->get_min_start_scn()))) {
} else if (sstable_end_scn >= min_start_scn_in_tx_data_memtable) {
// there is a min_start_log_ts in tx_data_memtable less than sstable_end_log_ts, skip this
// there is a min_start_scn in tx_data_memtable less than sstable_end_scn, skip this
// calculation
need_skip = true;
break;
@ -949,13 +935,13 @@ int ObTxDataTable::update_cache_if_needed_(bool &skip_calc)
if (nullptr == table) {
skip_calc = true;
} else if (!calc_upper_trans_scn_cache_.is_inited_ ||
calc_upper_trans_scn_cache_.cache_version_scn_.get_val_for_lsn_allocator() < table->get_end_log_ts()) {
ret = update_calc_upper_trans_scn_cache_(table);
} else if (calc_upper_trans_scn_cache_.cache_version_scn_.get_val_for_lsn_allocator() > table->get_end_log_ts()) {
} else if (!calc_upper_trans_version_cache_.is_inited_ ||
calc_upper_trans_version_cache_.cache_version_ < table->get_end_scn()) {
ret = update_calc_upper_trans_version_cache_(table);
} else if (calc_upper_trans_version_cache_.cache_version_ > table->get_end_scn()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR,
"the end log ts of the latest sstable is unexpected smaller",
"the end scn of the latest sstable is unexpected smaller",
KR(ret),
KPC(tablet_handle.get_obj()),
KPC(table));
@ -965,7 +951,7 @@ int ObTxDataTable::update_cache_if_needed_(bool &skip_calc)
return ret;
}
int ObTxDataTable::update_calc_upper_trans_scn_cache_(ObITable *table)
int ObTxDataTable::update_calc_upper_trans_version_cache_(ObITable *table)
{
int ret = OB_SUCCESS;
STORAGE_LOG(DEBUG, "update calc upper trans version cache once.");
@ -982,26 +968,26 @@ int ObTxDataTable::update_calc_upper_trans_scn_cache_(ObITable *table)
LOG_WARN("invalid tablet handle", KR(ret), K(tablet_handle), K(tablet_id_));
} else {
ObCommitVersionsGetter getter(iter_param, table);
if (OB_FAIL(getter.get_next_row(calc_upper_trans_scn_cache_.commit_scns_))) {
if (OB_FAIL(getter.get_next_row(calc_upper_trans_version_cache_.commit_scns_))) {
STORAGE_LOG(WARN, "update calc_upper_trans_trans_version_cache failed.", KR(ret), KPC(table));
} else {
calc_upper_trans_scn_cache_.is_inited_ = true;
calc_upper_trans_scn_cache_.cache_version_scn_.convert_for_lsn_allocator(table->get_end_log_ts());
calc_upper_trans_version_cache_.is_inited_ = true;
calc_upper_trans_version_cache_.cache_version_ = table->get_end_scn();
// update calc_upper_trans_scn_cache succeed.
}
}
return ret;
}
int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_trans_scn)
int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_trans_version)
{
int ret = OB_SUCCESS;
const auto &array = calc_upper_trans_scn_cache_.commit_scns_.array_;
const auto &array = calc_upper_trans_version_cache_.commit_scns_.array_;
int l = 0;
int r = array.count() - 1;
// Binary find the first start_log_ts that is greater than or equal to sstable_end_log_ts
// Binary find the first start_scn that is greater than or equal to sstable_end_scn
while (l < r) {
int mid = (l + r) >> 1;
if (array.at(mid).start_scn_ < sstable_end_scn) {
@ -1011,21 +997,21 @@ int ObTxDataTable::calc_upper_trans_scn_(const SCN sstable_end_scn, SCN &upper_t
}
}
// Check if the start_log_ts is greater than or equal to the sstable_end_log_ts. If not, delay the
// Check if the start_scn is greater than or equal to the sstable_end_scn. If not, delay the
// upper_trans_version calculation to the next time.
if (0 == array.count() || !array.at(l).commit_scn_.is_valid()) {
upper_trans_scn.set_max();
if (0 == array.count() || !array.at(l).commit_version_.is_valid()) {
upper_trans_version.set_max();
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected array count or commit version", KR(ret), K(array.count()), K(array.at(l)));
} else {
upper_trans_scn = array.at(l).commit_scn_;
upper_trans_version = array.at(l).commit_version_;
}
STORAGE_LOG(INFO,
"calculate upper trans version finish",
K(sstable_end_scn),
K(upper_trans_scn),
K(calc_upper_trans_scn_cache_),
K(upper_trans_version),
K(calc_upper_trans_version_cache_),
"ls_id", get_ls_id(),
"array_count", array.count(),
"chose_idx", l);

View File

@ -105,7 +105,7 @@ public: // ObTxDataTable
: is_inited_(false),
is_started_(false),
min_start_scn_in_ctx_(),
last_update_min_start_scn_ts_(0),
last_update_ts_(0),
tablet_id_(0),
mem_attr_(),
slice_allocator_(),
@ -186,11 +186,9 @@ public: // ObTxDataTable
int get_recycle_scn(palf::SCN &recycle_scn);
/**
* @brief see ObTxTable::get_upper_trans_version_before_given_log_ts()
* @brief see ObTxTable::get_upper_trans_version_before_given_scn()
*/
int get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts,
int64_t &upper_trans_version);
int get_upper_trans_scn_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_scn);
int get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version);
/**
* @brief see ObTxTable::supplement_undo_actions_if_exist
@ -216,7 +214,7 @@ public: // ObTxDataTable
K_(is_inited),
K_(is_started),
K_(min_start_scn_in_ctx),
K_(last_update_min_start_scn_ts),
K_(last_update_ts),
K_(tablet_id),
KP_(ls),
KP_(ls_tablet_svr),
@ -258,7 +256,7 @@ private:
int update_cache_if_needed_(bool &skip_calc);
int update_calc_upper_trans_scn_cache_(ObITable *table);
int update_calc_upper_trans_version_cache_(ObITable *table);
int calc_upper_trans_scn_(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version);
@ -274,15 +272,6 @@ private:
int dump_tx_data_in_memtable_2_text_(const transaction::ObTransID tx_id, FILE *fd);
int dump_tx_data_in_sstable_2_text_(const transaction::ObTransID tx_id, FILE *fd);
// int DEBUG_slowly_calc_upper_trans_scn_(const int64_t sstable_end_log_ts,
// int64_t &tmp_upper_trans_version);
// int DEBUG_calc_with_all_sstables_(ObTableAccessContext &access_context,
// const int64_t sstable_end_log_ts,
// int64_t &tmp_upper_trans_version);
// int DEBUG_calc_with_row_iter_(ObStoreRowIterator *row_iter,
// const int64_t sstable_end_log_ts,
// int64_t &tmp_upper_trans_version);
bool skip_this_sstable_end_scn_(palf::SCN sstable_end_scn);
void print_alloc_size_for_test_();
@ -316,7 +305,7 @@ private:
bool is_inited_;
bool is_started_;
palf::SCN min_start_scn_in_ctx_;
int64_t last_update_min_start_scn_ts_;
int64_t last_update_ts_;
ObTabletID tablet_id_;
ObMemAttr mem_attr_;
// Allocator to allocate ObTxData and ObUndoStatus
@ -329,7 +318,7 @@ private:
ObTxDataMemtableMgr *memtable_mgr_;
ObTxCtxTable *tx_ctx_table_;
TxDataReadSchema read_schema_;
CalcUpperTransSCNCache calc_upper_trans_scn_cache_;
CalcUpperTransSCNCache calc_upper_trans_version_cache_;
MemtableHandlesCache memtables_cache_;
}; // tx_table

View File

@ -52,7 +52,7 @@ int ObTxTable::init(ObLS *ls)
} else {
ls_ = ls;
epoch_ = 0;
max_tablet_clog_checkpoint_ = 0;
max_tablet_clog_checkpoint_ = SCN::min_scn();
state_ = TxTableState::ONLINE;
LOG_INFO("init tx table successfully", K(ret), K(ls->get_ls_id()));
is_inited_ = true;
@ -170,8 +170,8 @@ int ObTxTable::offline()
int ObTxTable::prepare_online()
{
int ret = OB_SUCCESS;
int64_t tmp_max_tablet_clog_checkpiont = -1;
max_tablet_clog_checkpoint_ = INT64_MAX;
SCN tmp_max_tablet_clog_checkpiont = SCN::min_scn();
max_tablet_clog_checkpoint_ = SCN::invalid_scn();
ATOMIC_INC(&epoch_);
@ -197,23 +197,23 @@ int ObTxTable::prepare_online()
int ObTxTable::check_and_online()
{
int ret = OB_SUCCESS;
int64_t max_consequent_callbacked_log_ts = 0;
SCN max_consequent_callbacked_scn = SCN::min_scn();
if (OB_FAIL(ls_->get_max_decided_log_ts_ns(max_consequent_callbacked_log_ts))) {
LOG_WARN("get max decided log ts from ls failed", KR(ret), "ls_id", ls_->get_ls_id());
} else if (max_consequent_callbacked_log_ts >= max_tablet_clog_checkpoint_) {
if (OB_FAIL(ls_->get_max_decided_scn(max_consequent_callbacked_scn))) {
LOG_WARN("get max decided scn from ls failed", KR(ret), "ls_id", ls_->get_ls_id());
} else if (max_consequent_callbacked_scn >= max_tablet_clog_checkpoint_) {
ATOMIC_STORE(&state_, TxTableState::ONLINE);
LOG_INFO("tx table online finish",
"ls_id",
ls_->get_ls_id(),
K(max_consequent_callbacked_log_ts),
K(max_consequent_callbacked_scn),
K(max_tablet_clog_checkpoint_));
} else {
int64_t current_ts = ObTimeUtil::current_time();
int64_t time_after_prepare_online_ms = (current_ts - prepare_online_ts_) / 1000LL;
LOG_INFO("tx table is PREPARE_ONLINE but not ONLINE yet",
"ls_id", ls_->get_ls_id(),
K(max_consequent_callbacked_log_ts),
K(max_consequent_callbacked_scn),
K(max_tablet_clog_checkpoint_),
K(time_after_prepare_online_ms),
KTIME(current_ts),
@ -425,7 +425,7 @@ int ObTxTable::get_data_table_schema_(const uint64_t tenant_id,
const char *const TX_ID_NAME = "tx_id";
const char *const IDX_NAME = "idx";
const char *const TOTAL_ROW_CNT_NAME = "total_row_cnt";
const char *const END_TS_NAME = "end_log_ts";
const char *const END_SCN_NAME = "end_scn";
const char *const VALUE_NAME = "tx_info";
const char *const TABLE_NAME = "tx_data_table";
const int64_t SCHEMA_VERSION = 1;
@ -497,8 +497,8 @@ int ObTxTable::get_data_table_schema_(const uint64_t tenant_id,
LOG_WARN("failed to set column name", KR(ret), K(IDX_NAME));
} else if (OB_FAIL(total_row_cnt_column.set_column_name(TOTAL_ROW_CNT_NAME))) {
LOG_WARN("failed to set column name", KR(ret), K(TOTAL_ROW_CNT_NAME));
} else if (OB_FAIL(end_ts_column.set_column_name(END_TS_NAME))) {
LOG_WARN("failed to set column name", KR(ret), K(END_TS_NAME));
} else if (OB_FAIL(end_ts_column.set_column_name(END_SCN_NAME))) {
LOG_WARN("failed to set column name", KR(ret), K(END_SCN_NAME));
} else if (OB_FAIL(value_column.set_column_name(VALUE_NAME))) {
LOG_WARN("failed to set column name", KR(ret), K(VALUE_NAME));
} else if (OB_FAIL(schema.set_table_name(TABLE_NAME))) {
@ -648,7 +648,7 @@ int ObTxTable::load_tx_data_table_()
return ret;
}
int ObTxTable::get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoint)
int ObTxTable::get_max_tablet_clog_checkpoint_(SCN &max_tablet_clog_checkpoint)
{
int ret = OB_SUCCESS;
ObLSTabletIterator tablet_iter(ObTabletCommon::DIRECT_GET_COMMITTED_TABLET_TIMEOUT_US);
@ -669,7 +669,7 @@ int ObTxTable::get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoi
} else if (tablet_handle.get_obj()->get_tablet_meta().tablet_id_.is_inner_tablet()) {
// skip inner tablet
} else {
int64_t tmp_clog_checkpoint = tablet_handle.get_obj()->get_clog_checkpoint_ts();
SCN tmp_clog_checkpoint = tablet_handle.get_obj()->get_clog_checkpoint_scn();
if (tmp_clog_checkpoint > max_tablet_clog_checkpoint) {
max_tablet_clog_checkpoint = tmp_clog_checkpoint;
}
@ -951,26 +951,26 @@ int ObTxTable::check_sql_sequence_can_read(const transaction::ObTransID &data_tr
return ret;
}
int ObTxTable::get_tx_state_with_log_ts(const transaction::ObTransID &data_trans_id,
const int64_t log_ts,
int ObTxTable::get_tx_state_with_scn(const transaction::ObTransID &data_trans_id,
const SCN scn,
const int64_t read_epoch,
int64_t &state,
int64_t &trans_version)
SCN &trans_version)
{
GetTxStateWithLogTSFunctor fn(log_ts, state, trans_version);
GetTxStateWithSCNFunctor fn(scn, state, trans_version);
int ret = check_with_tx_data(data_trans_id, fn, read_epoch);
// TODO(handora.qc): remove it
LOG_DEBUG("finish get tx state with log ts", K(data_trans_id), K(log_ts), K(state), K(trans_version));
LOG_DEBUG("finish get tx state with scn", K(data_trans_id), K(scn), K(state), K(trans_version));
return ret;
}
int ObTxTable::try_get_tx_state(const transaction::ObTransID tx_id,
const int64_t read_epoch,
int64_t &state,
int64_t &trans_version)
SCN &trans_version)
{
int ret = OB_SUCCESS;
GetTxStateWithLogTSFunctor fn(INT64_MAX, state, trans_version);
GetTxStateWithSCNFunctor fn(SCN::max_scn(), state, trans_version);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("tx table is not init.", KR(ret), K(tx_id));
@ -988,7 +988,7 @@ int ObTxTable::try_get_tx_state(const transaction::ObTransID tx_id,
int ObTxTable::lock_for_read(const transaction::ObLockForReadArg &lock_for_read_arg,
const int64_t read_epoch,
bool &can_read,
int64_t &trans_version,
SCN &trans_version,
bool &is_determined_state,
const ObCleanoutOp &cleanout_op,
const ObReCheckOp &recheck_op)
@ -1017,21 +1017,9 @@ int ObTxTable::get_recycle_scn(SCN &recycle_scn)
return ret;
}
int ObTxTable::get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, int64_t &upper_trans_version)
int ObTxTable::get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version)
{
return tx_data_table_.get_upper_trans_version_before_given_log_ts(sstable_end_log_ts, upper_trans_version);
}
int ObTxTable::get_start_tx_log_ts(int64_t &start_tx_log_ts)
{
int ret = OB_SUCCESS;
SCN tmp_start_scn;
if (OB_FAIL(get_start_tx_scn(tmp_start_scn))) {
STORAGE_LOG(WARN, "get start tx scn failed", KR(ret));
} else {
start_tx_log_ts = tmp_start_scn.get_val_for_lsn_allocator();
}
return ret;
return tx_data_table_.get_upper_trans_version_before_given_scn(sstable_end_scn, upper_trans_version);
}
int ObTxTable::get_start_tx_scn(SCN &start_tx_scn)

View File

@ -54,15 +54,16 @@ public:
ObTxTable()
: is_inited_(false),
epoch_(INVALID_READ_EPOCH),
max_tablet_clog_checkpoint_(0),
max_tablet_clog_checkpoint_(),
prepare_online_ts_(0),
state_(OFFLINE),
ls_(nullptr),
tx_data_table_(default_tx_data_table_) {}
ObTxTable(ObTxDataTable &tx_data_table)
: is_inited_(false),
epoch_(INVALID_READ_EPOCH),
max_tablet_clog_checkpoint_(0),
max_tablet_clog_checkpoint_(),
prepare_online_ts_(0),
state_(OFFLINE),
ls_(nullptr),
@ -144,16 +145,16 @@ public:
* https://yuque.antfin-inc.com/ob/storage/adk6yx
*
* @param[in] data_trans_id
* @param[in] log_ts
* @param[in] scn
* @param[in] read_epoch
* @param[out] state
* @param[out] trans_version
*/
int get_tx_state_with_log_ts(const transaction::ObTransID &data_trans_id,
const int64_t log_ts,
int get_tx_state_with_scn(const transaction::ObTransID &data_trans_id,
const palf::SCN scn,
const int64_t read_epoch,
int64_t &state,
int64_t &trans_version);
palf::SCN &trans_version);
/**
* @brief Try to get a tx data from tx_data_table. This function used in special situation when the trans service do
@ -167,7 +168,7 @@ public:
int try_get_tx_state(const transaction::ObTransID tx_id,
const int64_t read_epoch,
int64_t &state,
int64_t &trans_version);
palf::SCN &trans_version);
/**
* @brief the txn READ_TRANS_ID use SNAPSHOT_VERSION to read the data, and check whether the data is locked, readable or unreadable by txn DATA_TRANS_ID. READ_LATEST is used to check whether read the data belong to the same txn
@ -182,7 +183,7 @@ public:
int lock_for_read(const transaction::ObLockForReadArg &lock_for_read_arg,
const int64_t read_epoch,
bool &can_read,
int64_t &trans_version,
palf::SCN &trans_version,
bool &is_determined_state,
const ObCleanoutOp &cleanout_op = ObCleanoutNothingOperation(),
const ObReCheckOp &recheck_op = ObReCheckNothingOperation());
@ -208,7 +209,7 @@ public:
* @brief The tx data sstables need to be cleared periodically. This function returns a recycle_scn
* to decide which tx data should be cleared.
*
* @param[out] recycle_scn the tx data whose end_log_ts is smaller or equals to the recycle_scn can
* @param[out] recycle_scn the tx data whose end_scn is smaller or equals to the recycle_scn can
* be cleared.
*/
int get_recycle_scn(palf::SCN &recycle_scn);
@ -222,12 +223,12 @@ public:
}
/**
* @brief Get the upper trans version for each given end_log_ts
* @brief Get the upper trans version for each given end_scn
*
* @param[in] sstable_end_log_ts the end_log_ts of the data sstable which is waitting to get the upper_trans_version
* @param[in] sstable_end_scn the end_scn of the data sstable which is waitting to get the upper_trans_version
* @param[out] upper_trans_version the upper_trans_version
*/
int get_upper_trans_version_before_given_log_ts(const int64_t sstable_end_log_ts, int64_t &upper_trans_version);
int get_upper_trans_version_before_given_scn(const palf::SCN sstable_end_scn, palf::SCN &upper_trans_version);
/**
* @brief When a transaction is replayed in the middle, it will read tx data from tx data sstable
@ -248,7 +249,6 @@ public:
*
* @param[out] start_tx_scn
*/
int get_start_tx_log_ts(int64_t &start_log_ts);
int get_start_tx_scn(palf::SCN &start_tx_scn);
int dump_single_tx_data_2_text(const int64_t tx_id_int, const char *fname);
@ -303,7 +303,7 @@ private:
int load_tx_data_table_();
int offline_tx_ctx_table_();
int offline_tx_data_table_();
int get_max_tablet_clog_checkpoint_(int64_t &max_tablet_clog_checkpoint);
int get_max_tablet_clog_checkpoint_(palf::SCN &max_tablet_clog_checkpoint);
void check_state_and_epoch_(const transaction::ObTransID tx_id,
const int64_t read_epoch,
@ -316,7 +316,7 @@ private:
static const int64_t LS_TX_CTX_SCHEMA_COLUMN_CNT = 3;
bool is_inited_;
int64_t epoch_ CACHE_ALIGNED;
int64_t max_tablet_clog_checkpoint_;
palf::SCN max_tablet_clog_checkpoint_;
int64_t prepare_online_ts_;
TxTableState state_ CACHE_ALIGNED;
ObLS *ls_;

View File

@ -275,7 +275,7 @@ DEF_TO_STRING(ObCommitSCNsArray::Node)
{
int64_t pos = 0;
J_KV(K_(start_scn),
K_(commit_scn));
K_(commit_version));
return pos;
}
@ -353,7 +353,7 @@ int ObCommitSCNsArray::serialize_(char *buf, const int64_t buf_len, int64_t &pos
{
int ret = OB_SUCCESS;
for (int i = 0; OB_SUCC(ret) && i < array_.count(); i++) {
LST_DO_CODE(OB_UNIS_ENCODE, array_.at(i).start_scn_, array_.at(i).commit_scn_);
LST_DO_CODE(OB_UNIS_ENCODE, array_.at(i).start_scn_, array_.at(i).commit_version_);
}
return ret;
}
@ -364,7 +364,7 @@ int ObCommitSCNsArray::deserialize_(const char *buf, const int64_t data_len, int
ObCommitSCNsArray::Node node;
while (OB_SUCC(ret) && pos < data_len) {
LST_DO_CODE(OB_UNIS_DECODE, node.start_scn_, node.commit_scn_);
LST_DO_CODE(OB_UNIS_DECODE, node.start_scn_, node.commit_version_);
array_.push_back(node);
}
@ -375,7 +375,7 @@ int64_t ObCommitSCNsArray::get_serialize_size_() const
{
int64_t len = 0;
for (int i = 0; i < array_.count(); i++) {
LST_DO_CODE(OB_UNIS_ADD_LEN, array_.at(i).start_scn_, array_.at(i).commit_scn_);
LST_DO_CODE(OB_UNIS_ADD_LEN, array_.at(i).start_scn_, array_.at(i).commit_version_);
}
return len;
}
@ -385,9 +385,9 @@ bool ObCommitSCNsArray::is_valid()
bool bool_ret = true;
for (int i = 0; i < array_.count() - 1; i++) {
if (!array_.at(i).start_scn_.is_valid() ||
!array_.at(i).commit_scn_.is_valid() ||
!array_.at(i).commit_version_.is_valid() ||
array_.at(i).start_scn_ > array_.at(i + 1).start_scn_ ||
array_.at(i).start_scn_ > array_.at(i).commit_scn_) {
array_.at(i).start_scn_ > array_.at(i).commit_version_) {
bool_ret = false;
STORAGE_LOG(ERROR, "this commit version array is invalid", K(array_.at(i)),
K(array_.at(i + 1)));

View File

@ -198,18 +198,18 @@ private:
public:
struct Node {
palf::SCN start_scn_;
palf::SCN commit_scn_;
palf::SCN commit_version_;
Node() : start_scn_(), commit_scn_() {}
Node() : start_scn_(), commit_version_() {}
Node(const palf::SCN start_scn, const palf::SCN commit_scn)
: start_scn_(start_scn), commit_scn_(commit_scn) {}
Node(const palf::SCN start_scn, const palf::SCN commit_version)
: start_scn_(start_scn), commit_version_(commit_version) {}
bool operator==(const Node &rhs) const
{
bool is_equal = true;
if (this->start_scn_ != rhs.start_scn_
|| this->commit_scn_ != rhs.commit_scn_) {
|| this->commit_version_ != rhs.commit_version_) {
is_equal = false;
}
return is_equal;
@ -243,9 +243,9 @@ public:
if (i % 3 == 0) {
fprintf(stderr, "\n ");
}
fprintf(stderr, "(start_log_ts=%-20ld, commit_version=%-20ld) ",
commit_versions.array_.at(i).start_scn_.get_val_for_lsn_allocator(),
commit_versions.array_.at(i).commit_scn_.get_val_for_lsn_allocator());
fprintf(stderr, "(start_scn=%-20s, commit_version=%-20s) ",
to_cstring(commit_versions.array_.at(i).start_scn_),
to_cstring(commit_versions.array_.at(i).commit_version_));
}
fprintf(stderr, "\npre-process data end.\n");
}
@ -264,22 +264,22 @@ public:
class CalcUpperTransSCNCache
{
public:
CalcUpperTransSCNCache() : is_inited_(false), cache_version_scn_(), commit_scns_() {}
CalcUpperTransSCNCache() : is_inited_(false), cache_version_(), commit_scns_() {}
void reset()
{
is_inited_ = false;
cache_version_scn_.reset();
cache_version_.reset();
commit_scns_.reset();
}
TO_STRING_KV(K_(is_inited), K_(cache_version_scn), K_(commit_scns));
TO_STRING_KV(K_(is_inited), K_(cache_version), K_(commit_scns));
public:
bool is_inited_;
// The end_log_ts of the sstable will be used as the cache_version
palf::SCN cache_version_scn_;
// The end_scn of the sstable will be used as the cache_version
palf::SCN cache_version_;
mutable common::TCRWLock lock_;

View File

@ -96,7 +96,7 @@ int ObTxDataMemtableScanIterator::init(ObTxDataMemtable *tx_data_memtable)
// cur_node_ point to the next tx data
cur_node_ = cur_node_->next_;
tx_data_memtable_ = tx_data_memtable;
cur_max_commit_scn_.set_min();
cur_max_commit_version_.set_min();
pre_start_scn_.set_min();
tx_data_row_cnt_ = 0;
pre_tx_data_ = nullptr;
@ -113,7 +113,7 @@ void ObTxDataMemtableScanIterator::reset()
tx_data_memtable_->reset_is_iterating();
}
dump_tx_data_done_ = false;
cur_max_commit_scn_.set_min();
cur_max_commit_version_.set_min();
pre_start_scn_.set_min();
tx_data_row_cnt_ = 0;
pre_tx_data_ = nullptr;
@ -215,7 +215,7 @@ int ObTxDataMemtableScanIterator::get_next_tx_data_row_(const blocksstable::ObDa
int64_t end_ts_column = TX_DATA_END_TS_COLUMN + SSTABLE_HIDDEN_COLUMN_CNT;
int64_t value_column = TX_DATA_VAL_COLUMN + SSTABLE_HIDDEN_COLUMN_CNT;
row_.storage_datums_[total_row_cnt_column].set_int(1);
row_.storage_datums_[end_ts_column].set_int(tx_data->end_scn_.get_val_for_tx());
row_.storage_datums_[end_ts_column].set_int(tx_data->end_scn_.get_val_for_row_cell());
row_.storage_datums_[value_column].set_string(ObString(serialize_size, buf_.get_ptr()));
row_.set_first_multi_version_row();
row_.set_last_multi_version_row();
@ -234,8 +234,8 @@ int ObTxDataMemtableScanIterator::get_next_tx_data_row_(const blocksstable::ObDa
// This function is called after sorting tx_data by start_scn and the following steps is
// executed:
// 1. Select (start_scn, commit_scn) point per second and push them into an array.
// 2. Read (start_scn, commit_scn) array from the latest tx data sstable.
// 1. Select (start_scn, commit_version) point per second and push them into an array.
// 2. Read (start_scn, commit_version) array from the latest tx data sstable.
// 3. Get the recycle_scn to filtrate the point which is not needed any more.
// 4. Merge the arrays above. This procedure should filtrate the points are not needed and keep the
// commit versions monotonically increasing.
@ -298,7 +298,7 @@ int ObTxDataMemtableScanIterator::DEBUG_try_calc_upper_and_check_(ObCommitSCNsAr
SCN upper_trans_version = SCN::min_scn();
if (OB_FAIL(DEBUG_fake_calc_upper_trans_version(tx_data->start_scn_, upper_trans_version, merged_commit_versions))) {
STORAGE_LOG(ERROR, "invalid upper trans version", KR(ret));
} else if (upper_trans_version < tx_data->commit_scn_) {
} else if (upper_trans_version < tx_data->commit_version_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "invalid upper trans version", KR(ret), K(upper_trans_version), KPC(tx_data));
}
@ -326,7 +326,7 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN
int l = 0;
int r = array.count() - 1;
// Binary find the first start_scn that is greater than or equal to sstable_end_log_ts
// Binary find the first start_scn that is greater than or equal to sstable_end_scn
while (l < r) {
int mid = (l + r) >> 1;
if (array.at(mid).start_scn_ < sstable_end_scn) {
@ -336,14 +336,14 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN
}
}
// Check if the start_scn is greater than or equal to the sstable_end_log_ts. If not, delay the
// Check if the start_scn is greater than or equal to the sstable_end_scn. If not, delay the
// upper_trans_version calculation to the next time.
if (0 == array.count() || !array.at(l).commit_scn_.is_valid()) {
if (0 == array.count() || !array.at(l).commit_version_.is_valid()) {
upper_trans_version.set_max();
ret = OB_ERR_UNDEFINED;
STORAGE_LOG(WARN, "unexpected array count or commit version", K(array.count()), K(array.at(l)));
} else {
upper_trans_version = array.at(l).commit_scn_;
upper_trans_version = array.at(l).commit_version_;
}
return ret;
@ -352,7 +352,7 @@ int ObTxDataMemtableScanIterator::DEBUG_fake_calc_upper_trans_version(const SCN
void ObTxDataMemtableScanIterator::DEBUG_print_start_scn_list_()
{
int ret = OB_SUCCESS;
const char *real_fname = "tx_data_start_log_ts_list";
const char *real_fname = "tx_data_start_scn_list";
FILE *fd = NULL;
if (NULL == (fd = fopen(real_fname, "w"))) {
@ -369,18 +369,18 @@ void ObTxDataMemtableScanIterator::DEBUG_print_start_scn_list_()
fprintf(fd,
"ObTxData : tx_id=%-19ld is_in_memtable=%-3d state=%-8s start_scn=%-19s "
"end_scn=%-19s "
"commit_scn=%-19s\n",
"commit_version=%-19s\n",
tx_data->tx_id_.get_id(),
tx_data->is_in_tx_data_table_,
ObTxData::get_state_string(tx_data->state_),
to_cstring(tx_data->start_scn_),
to_cstring(tx_data->end_scn_),
to_cstring(tx_data->commit_scn_));
to_cstring(tx_data->commit_version_));
}
}
if (NULL != fd) {
fprintf(fd, "end of start log ts list\n");
fprintf(fd, "end of start scn list\n");
fclose(fd);
fd = NULL;
}
@ -405,9 +405,9 @@ void ObTxDataMemtableScanIterator::DEBUG_print_merged_commit_versions_(ObCommitS
for (int i = 0; i < array.count(); i++) {
fprintf(fd,
"start_scn=%-19s "
"commit_scn=%-19s\n",
"commit_version=%-19s\n",
to_cstring(array.at(i).start_scn_),
to_cstring(array.at(i).commit_scn_));
to_cstring(array.at(i).commit_version_));
}
}
@ -490,19 +490,19 @@ int ObTxDataMemtableScanIterator::periodical_get_next_commit_scn_(ObCommitSCNsAr
if (DEBUG_last_start_scn_ > tx_data->start_scn_) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "unexpected start log ts order", K(DEBUG_last_start_scn_), KPC(tx_data));
STORAGE_LOG(ERROR, "unexpected start scn order", K(DEBUG_last_start_scn_), KPC(tx_data));
break;
} else {
DEBUG_last_start_scn_ = tx_data->start_scn_;
}
// update pre_commit_version
if (tx_data->commit_scn_ > cur_max_commit_scn_) {
cur_max_commit_scn_ = tx_data->commit_scn_;
if (tx_data->commit_version_ > cur_max_commit_version_) {
cur_max_commit_version_ = tx_data->commit_version_;
}
// If this tx data is the first tx data in sorted list or its start_scn is 1_s larger than
// the pre_start_log_ts, we use this start_scn to calculate upper_trans_version
// the pre_start_scn, we use this start_scn to calculate upper_trans_version
if (pre_start_scn_.is_min() ||
tx_data->start_scn_ >= SCN::plus(pre_start_scn_, PERIODICAL_SELECT_INTERVAL_NS)/*1s*/) {
pre_start_scn_ = tx_data->start_scn_;
@ -513,7 +513,7 @@ int ObTxDataMemtableScanIterator::periodical_get_next_commit_scn_(ObCommitSCNsAr
if (nullptr != tx_data) {
node.start_scn_ = tx_data->start_scn_;
// use cur_max_commit_version_ to keep the commit versions monotonically increasing
node.commit_scn_ = cur_max_commit_scn_;
node.commit_version_ = cur_max_commit_version_;
// STORAGE_LOG(INFO, "GENGLI ", K(iter_cnt), K(PERIODICAL_SELECT_INTERVAL_NS), K(node));
tx_data = nullptr;
} else if (nullptr == cur_node_) {
@ -591,12 +591,12 @@ int ObTxDataMemtableScanIterator::merge_cur_and_past_commit_verisons_(const palf
// array whose start_scn is larger than the minimum start_scn in current array will be dropped. The reason is in this
// issue: https://work.aone.alibaba-inc.com/issue/43389863
palf::SCN cur_min_start_scn = cur_arr.count() > 0 ? cur_arr.at(0).start_scn_ : palf::SCN::max_scn();
palf::SCN max_commit_scn = palf::SCN::min_scn();
palf::SCN max_commit_version = palf::SCN::min_scn();
if (OB_FAIL(
merge_pre_process_node_(step_len, cur_min_start_scn, recycle_scn, past_arr, max_commit_scn, merged_arr))) {
merge_pre_process_node_(step_len, cur_min_start_scn, recycle_scn, past_arr, max_commit_version, merged_arr))) {
STORAGE_LOG(WARN, "merge past commit versions failed.", KR(ret), K(past_arr), KPC(tx_data_memtable_));
} else if (OB_FAIL(
merge_pre_process_node_(step_len, SCN::max_scn(), recycle_scn, cur_arr, max_commit_scn, merged_arr))) {
merge_pre_process_node_(step_len, SCN::max_scn(), recycle_scn, cur_arr, max_commit_version, merged_arr))) {
STORAGE_LOG(WARN, "merge current commit versions failed.", KR(ret), K(cur_arr), KPC(tx_data_memtable_));
} else if (0 == merged_arr.count()) {
if (OB_FAIL(merged_arr.push_back(ObCommitSCNsArray::Node(SCN::max_scn(), SCN::max_scn())))) {
@ -621,7 +621,7 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len
const palf::SCN start_scn_limit,
const palf::SCN recycle_scn,
const ObIArray<ObCommitSCNsArray::Node> &data_arr,
palf::SCN &max_commit_scn,
palf::SCN &max_commit_version,
ObIArray<ObCommitSCNsArray::Node> &merged_arr)
{
int ret = OB_SUCCESS;
@ -635,9 +635,9 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len
if (data_arr.at(i).start_scn_ >= start_scn_limit) {
break;
}
max_commit_scn = std::max(max_commit_scn, data_arr.at(i).commit_scn_);
ObCommitSCNsArray::Node new_node(data_arr.at(i).start_scn_, max_commit_scn);
if (new_node.commit_scn_ <= recycle_scn) {
max_commit_version = std::max(max_commit_version, data_arr.at(i).commit_version_);
ObCommitSCNsArray::Node new_node(data_arr.at(i).start_scn_, max_commit_version);
if (new_node.commit_version_ <= recycle_scn) {
// this tx data should be recycled
// do nothing
} else if (OB_FAIL(merged_arr.push_back(new_node))) {
@ -646,9 +646,9 @@ int ObTxDataMemtableScanIterator::merge_pre_process_node_(const int64_t step_len
}
// push back the last pre-process node
max_commit_scn = std::max(max_commit_scn, data_arr.at(arr_len - 1).commit_scn_);
max_commit_version = std::max(max_commit_version, data_arr.at(arr_len - 1).commit_version_);
if (OB_SUCC(ret) && data_arr.at(arr_len - 1).start_scn_ < start_scn_limit) {
ObCommitSCNsArray::Node new_node(data_arr.at(arr_len - 1).start_scn_, max_commit_scn);
ObCommitSCNsArray::Node new_node(data_arr.at(arr_len - 1).start_scn_, max_commit_version);
if (OB_FAIL(merged_arr.push_back(new_node))) {
STORAGE_LOG(WARN, "push back commit version node failed.", KR(ret), KPC(tx_data_memtable_));
}
@ -888,8 +888,8 @@ int ObTxCtxMemtableScanIterator::init(ObTxCtxMemtable *tx_ctx_memtable)
STORAGE_LOG(WARN, "Failed to reserve tx ctx buffer", K(ret));
} else if (OB_FAIL(meta_buf_.reserve(TX_CTX_META_BUF_LENGTH))) {
STORAGE_LOG(WARN, "Failed to reserve tx ctx meta buffer", K(ret));
// NB: We must first prepare the rec_log_ts for ObLSTxCtxMgr and then
// prepare the rec_log_ts for tx ctx
// NB: We must first prepare the rec_scn for ObLSTxCtxMgr and then
// prepare the rec_scn for tx ctx
} else if (OB_FAIL(ls_tx_ctx_mgr->refresh_aggre_rec_log_ts())) {
STORAGE_LOG(WARN, "Failed to prepare for dump tx ctx", K(ret));
} else if (OB_FAIL(ls_tx_ctx_iter_.set_ready(ls_tx_ctx_mgr))) {

View File

@ -90,7 +90,7 @@ public:
: is_inited_(false),
iter_param_(iter_param),
dump_tx_data_done_(false),
cur_max_commit_scn_(),
cur_max_commit_version_(),
pre_start_scn_(),
tx_data_row_cnt_(0),
pre_tx_data_(nullptr),
@ -157,7 +157,7 @@ private:
const palf::SCN start_scn_limit,
const palf::SCN recycle_scn,
const ObIArray<ObCommitSCNsArray::Node> &data_arr,
palf::SCN &max_commit_scn,
palf::SCN &max_commit_version,
ObIArray<ObCommitSCNsArray::Node> &merged_arr);
int set_row_with_merged_commit_scns_(ObCommitSCNsArray &merged_commit_scns,
@ -179,7 +179,7 @@ private:
bool is_inited_;
const ObTableIterParam &iter_param_;
bool dump_tx_data_done_;
palf::SCN cur_max_commit_scn_;
palf::SCN cur_max_commit_version_;
palf::SCN pre_start_scn_;
int64_t tx_data_row_cnt_;
ObTxData *pre_tx_data_;

View File

@ -287,8 +287,8 @@ void ObTxLogGenerator::gen_prepare_log()
void ObTxLogGenerator::gen_commit_log()
{
int64_t commit_ts = get_timestamp();
palf::SCN commit_scn;
commit_scn.convert_for_lsn_allocator(commit_ts);
palf::SCN commit_version;
commit_version.convert_for_lsn_allocator(commit_ts);
uint64_t checksum = 0;
share::ObLSArray inc_ls_arr;
ObTxBufferNodeArray mds_arr;
@ -302,7 +302,7 @@ void ObTxLogGenerator::gen_commit_log()
}
ObTxCommitLog commit_log(
commit_scn,
commit_version,
checksum,
inc_ls_arr,
mds_arr,

View File

@ -161,7 +161,7 @@ int64_t TestTxCtxTable::ref_count_;
TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr)
{
EXPECT_EQ(0, TestTxCtxTable::ref_count_);
EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(0, /*last_replay_log_ts*/
EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(SCN::min_scn(), /*last_replay_log_ts*/
0 /*schema_version*/));
EXPECT_EQ(1, TestTxCtxTable::ref_count_);