[scn] change scn
This commit is contained in:
parent
5a50ec1939
commit
ac437d2f56
@ -154,9 +154,8 @@ int ObAllVirtualLSInfo::process_curr_tenant(ObNewRow *&row)
|
||||
cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset()));
|
||||
break;
|
||||
case OB_APP_MIN_COLUMN_ID + 9:
|
||||
//TODO: SCN
|
||||
// clog_checkpoint_ts
|
||||
cur_row_.cells_[i].set_uint64(ls_info.checkpoint_ts_ < 0 ? 0 : ls_info.checkpoint_ts_);
|
||||
cur_row_.cells_[i].set_uint64(!ls_info.checkpoint_scn_.is_valid() ? 0 : ls_info.checkpoint_scn_.get_val_for_tx());
|
||||
break;
|
||||
case OB_APP_MIN_COLUMN_ID + 10:
|
||||
// clog_checkpoint_lsn
|
||||
|
@ -680,7 +680,7 @@ int ObSSTable::check_row_locked(ObStoreCtx &ctx,
|
||||
if (!meta_.is_empty()) {
|
||||
// skip referencing upper_trans_version of an empty sstable, which may be greater than the real
|
||||
// committed transaction's version
|
||||
lock_state.trans_version_.convert_for_lsn_allocator(get_upper_trans_version());
|
||||
lock_state.trans_version_.convert_for_tx(get_upper_trans_version());
|
||||
}
|
||||
} else if (NULL == (buf = allocator.alloc(sizeof(ObSSTableRowLockChecker)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
|
@ -779,7 +779,7 @@ int ObLS::get_ls_info(ObLSVTInfo &ls_info)
|
||||
ls_info.tablet_count_ = ls_tablet_svr_.get_tablet_count();
|
||||
ls_info.weak_read_scn_ = ls_wrs_handler_.get_ls_weak_read_ts();
|
||||
ls_info.need_rebuild_ = is_need_rebuild;
|
||||
ls_info.checkpoint_ts_ = ls_meta_.get_clog_checkpoint_ts();
|
||||
ls_info.checkpoint_scn_ = ls_meta_.get_clog_checkpoint_scn();
|
||||
ls_info.checkpoint_lsn_ = ls_meta_.get_clog_base_lsn().val_;
|
||||
}
|
||||
return ret;
|
||||
|
@ -119,8 +119,7 @@ struct ObLSVTInfo
|
||||
int64_t tablet_count_;
|
||||
palf::SCN weak_read_scn_;
|
||||
bool need_rebuild_;
|
||||
//TODO SCN
|
||||
int64_t checkpoint_ts_;
|
||||
palf::SCN checkpoint_scn_;
|
||||
//TODO SCN
|
||||
int64_t checkpoint_lsn_;
|
||||
};
|
||||
@ -304,7 +303,6 @@ public:
|
||||
// const bool write_slog = true);
|
||||
UPDATE_LSMETA_WITH_LOCK(ls_meta_, set_clog_checkpoint);
|
||||
UPDATE_LSMETA_WITHOUT_LOCK(ls_meta_, set_clog_checkpoint);
|
||||
CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_ts, int64_t);
|
||||
CONST_DELEGATE_WITH_RET(ls_meta_, get_clog_checkpoint_scn, palf::SCN);
|
||||
DELEGATE_WITH_RET(ls_meta_, get_clog_base_lsn, palf::LSN &);
|
||||
DELEGATE_WITH_RET(ls_meta_, get_saved_info, int);
|
||||
|
@ -136,12 +136,6 @@ SCN ObLSMeta::get_clog_checkpoint_scn() const
|
||||
return clog_checkpoint_scn_;
|
||||
}
|
||||
|
||||
int64_t ObLSMeta::get_clog_checkpoint_ts() const
|
||||
{
|
||||
ObSpinLockTimeGuard guard(lock_);
|
||||
return clog_checkpoint_scn_.get_val_for_lsn_allocator();
|
||||
}
|
||||
|
||||
int ObLSMeta::set_clog_checkpoint(const LSN &clog_checkpoint_lsn,
|
||||
const SCN &clog_checkpoint_scn,
|
||||
const bool write_slog)
|
||||
|
@ -47,7 +47,6 @@ public:
|
||||
~ObLSMeta() {}
|
||||
ObLSMeta &operator=(const ObLSMeta &other);
|
||||
palf::SCN get_clog_checkpoint_scn() const;
|
||||
int64_t get_clog_checkpoint_ts() const;
|
||||
palf::LSN &get_clog_base_lsn();
|
||||
int set_clog_checkpoint(const palf::LSN &clog_checkpoint_lsn,
|
||||
const palf::SCN &clog_checkpoint_scn,
|
||||
|
@ -89,10 +89,10 @@ int ObMultiVersionValueIterator::init_multi_version_iter()
|
||||
iter = iter->prev_;
|
||||
}
|
||||
|
||||
max_committed_trans_version_ = (NULL != version_iter_) ? version_iter_->trans_version_.get_val_for_lsn_allocator() : -1;
|
||||
max_committed_trans_version_ = (NULL != version_iter_) ? version_iter_->trans_version_.get_val_for_tx() : -1;
|
||||
// NB: It will assign -1 to cur_trans_version_, while it will not
|
||||
// cause any wrong logic, but take care of it
|
||||
cur_trans_version_.convert_for_lsn_allocator(max_committed_trans_version_);
|
||||
cur_trans_version_.convert_for_tx(max_committed_trans_version_);
|
||||
multi_version_iter_ = iter;
|
||||
if (max_committed_trans_version_ <= version_range_.multi_version_start_) {
|
||||
// If the multi-version start version is greater than or equal to the current max committed version
|
||||
@ -285,7 +285,7 @@ int ObMultiVersionValueIterator::get_next_node(const void *&tnode)
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret), KP(this));
|
||||
} else if (OB_ISNULL(version_iter_)
|
||||
|| version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.base_version_) {
|
||||
|| version_iter_->trans_version_.get_val_for_tx() <= version_range_.base_version_) {
|
||||
version_iter_ = NULL;
|
||||
ret = OB_ITER_END;
|
||||
} else {
|
||||
@ -317,7 +317,7 @@ int ObMultiVersionValueIterator::get_next_node_for_compact(const void *&tnode)
|
||||
ret = OB_NOT_INIT;
|
||||
TRANS_LOG(WARN, "not init", K(ret), KP(this));
|
||||
} else if ((OB_NOT_NULL(version_iter_)
|
||||
&& version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.base_version_) ||
|
||||
&& version_iter_->trans_version_.get_val_for_tx() <= version_range_.base_version_) ||
|
||||
OB_ISNULL(version_iter_)) {
|
||||
version_iter_ = nullptr;
|
||||
ret = OB_ITER_END;
|
||||
@ -357,11 +357,11 @@ int ObMultiVersionValueIterator::get_next_multi_version_node(const void *&tnode)
|
||||
const palf::SCN cur_trans_version = multi_version_iter_->trans_version_;
|
||||
ObMvccTransNode *record_node = nullptr;
|
||||
while (OB_SUCC(ret) && OB_NOT_NULL(multi_version_iter_) && OB_ISNULL(record_node)) {
|
||||
if (multi_version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.base_version_) {
|
||||
if (multi_version_iter_->trans_version_.get_val_for_tx() <= version_range_.base_version_) {
|
||||
multi_version_iter_ = NULL;
|
||||
break;
|
||||
} else if (NDT_COMPACT == multi_version_iter_->type_) { // meet compacted node
|
||||
if (multi_version_iter_->trans_version_.get_val_for_lsn_allocator() > version_range_.multi_version_start_) {
|
||||
if (multi_version_iter_->trans_version_.get_val_for_tx() > version_range_.multi_version_start_) {
|
||||
// ignore compact node
|
||||
is_compacted = true;
|
||||
} else { // multi_version_iter_->trans_version_ <= multi_version_start
|
||||
@ -371,7 +371,7 @@ int ObMultiVersionValueIterator::get_next_multi_version_node(const void *&tnode)
|
||||
break;
|
||||
}
|
||||
} else { // not compacted node
|
||||
if (multi_version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.multi_version_start_) {
|
||||
if (multi_version_iter_->trans_version_.get_val_for_tx() <= version_range_.multi_version_start_) {
|
||||
is_node_compacted_ = true;
|
||||
}
|
||||
record_node = multi_version_iter_;
|
||||
@ -443,7 +443,7 @@ void ObMultiVersionValueIterator::reset()
|
||||
bool ObMultiVersionValueIterator::is_multi_version_iter_end() const
|
||||
{
|
||||
return OB_ISNULL(multi_version_iter_)
|
||||
|| multi_version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.base_version_;
|
||||
|| multi_version_iter_->trans_version_.get_val_for_tx() <= version_range_.base_version_;
|
||||
}
|
||||
|
||||
bool ObMultiVersionValueIterator::is_trans_node_iter_null() const
|
||||
@ -454,7 +454,7 @@ bool ObMultiVersionValueIterator::is_trans_node_iter_null() const
|
||||
bool ObMultiVersionValueIterator::is_compact_iter_end() const
|
||||
{
|
||||
return OB_ISNULL(version_iter_)
|
||||
|| version_iter_->trans_version_.get_val_for_lsn_allocator() <= version_range_.base_version_;
|
||||
|| version_iter_->trans_version_.get_val_for_tx() <= version_range_.base_version_;
|
||||
}
|
||||
|
||||
DEF_TO_STRING(ObMultiVersionValueIterator) {
|
||||
@ -498,12 +498,12 @@ int ObMultiVersionRowIterator::init(
|
||||
if (OB_FAIL(query_engine.scan(
|
||||
range.start_key_, !range.border_flag_.inclusive_start(),
|
||||
range.end_key_, !range.border_flag_.inclusive_end(),
|
||||
ctx.snapshot_.version_.get_val_for_lsn_allocator(),
|
||||
ctx.snapshot_.version_.get_val_for_tx(),
|
||||
query_engine_iter_))) {
|
||||
TRANS_LOG(WARN, "query engine scan fail", K(ret));
|
||||
} else {
|
||||
query_engine_ = &query_engine;
|
||||
query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_lsn_allocator());
|
||||
query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_tx());
|
||||
ctx_ = &ctx;
|
||||
version_range_ = version_range;
|
||||
is_inited_ = true;
|
||||
|
@ -331,14 +331,14 @@ int ObMvccRowIterator::init(
|
||||
if (OB_FAIL(query_engine.scan(
|
||||
range.start_key_, !range.border_flag_.inclusive_start(),
|
||||
range.end_key_, !range.border_flag_.inclusive_end(),
|
||||
ctx.snapshot_.version_.get_val_for_lsn_allocator(),
|
||||
ctx.snapshot_.version_.get_val_for_tx(),
|
||||
query_engine_iter_))) {
|
||||
TRANS_LOG(WARN, "query engine scan fail", K(ret));
|
||||
} else {
|
||||
ctx_ = &ctx;
|
||||
query_flag_ = query_flag;
|
||||
query_engine_ = &query_engine;
|
||||
query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_lsn_allocator());
|
||||
query_engine_iter_->set_version(ctx.snapshot_.version_.get_val_for_tx());
|
||||
is_inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
@ -431,7 +431,7 @@ int ObMvccRowIterator::try_purge(const ObTxSnapshot &snapshot_info,
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
} else if (OB_FAIL(query_engine_->check_and_purge(key,
|
||||
row,
|
||||
snapshot_info.version_.get_val_for_lsn_allocator(),
|
||||
snapshot_info.version_.get_val_for_tx(),
|
||||
purged))) {
|
||||
STORAGE_LOG(ERROR, "check_and_purge", K(ret), K(key), K(row), K(snapshot_info));
|
||||
} else if (purged) {
|
||||
@ -447,7 +447,7 @@ int ObMvccRowIterator::get_end_gap_key(const ObTxSnapshot &snapshot_info, const
|
||||
bool is_reverse = iter->is_reverse_scan();
|
||||
return query_engine_->skip_gap(iter->get_key(),
|
||||
key,
|
||||
snapshot_info.version_.get_val_for_lsn_allocator(),
|
||||
snapshot_info.version_.get_val_for_tx(),
|
||||
is_reverse,
|
||||
size);
|
||||
}
|
||||
|
@ -475,7 +475,7 @@ bool ObMvccRow::is_partial(const int64_t version) const
|
||||
// by the version
|
||||
bool_ret = false;
|
||||
} else if (FALSE_IT(is_locked = !(last->is_committed() || last->is_aborted()))) {
|
||||
} else if (!is_locked && version > max_trans_version_.get_val_for_lsn_allocator()) {
|
||||
} else if (!is_locked && version > max_trans_version_.get_val_for_tx()) {
|
||||
// Case2: no data is locked on the memtable row and the max version on the
|
||||
// row is smaller than the version, so the row is completed by the
|
||||
// version
|
||||
@ -484,7 +484,7 @@ bool ObMvccRow::is_partial(const int64_t version) const
|
||||
// Case3: if row is locked or the max trans version on the row is larger
|
||||
// than the version, we mark it as partial, otherwise we mark it as
|
||||
// completed
|
||||
bool_ret = is_locked || (last->trans_version_.get_val_for_lsn_allocator() > version);
|
||||
bool_ret = is_locked || (last->trans_version_.get_val_for_tx() > version);
|
||||
}
|
||||
|
||||
return bool_ret;
|
||||
@ -510,7 +510,7 @@ bool ObMvccRow::is_del(const int64_t version) const
|
||||
// Case3: data on the memtable row is not locked while the last node is not
|
||||
// delete node so the row is not deleted by the version
|
||||
bool_ret = false;
|
||||
} else if (last->trans_version_.get_val_for_lsn_allocator() > version) {
|
||||
} else if (last->trans_version_.get_val_for_tx() > version) {
|
||||
// Case3: data on the memtable row is not locked, the last node is delete
|
||||
// node while the trans version of the last node is larger than the
|
||||
// version so the row may not deleted by the version
|
||||
|
@ -1132,8 +1132,7 @@ void ObMemtableMultiVersionScanIterator::set_flag_and_version_for_compacted_row(
|
||||
{
|
||||
const bool is_committed = reinterpret_cast<const ObMvccTransNode*>(tnode)->is_committed();
|
||||
const int64_t trans_version = is_committed
|
||||
? reinterpret_cast<const ObMvccTransNode*>(tnode)->trans_version_.get_val_for_lsn_allocator()
|
||||
: INT64_MAX;
|
||||
? reinterpret_cast<const ObMvccTransNode*>(tnode)->trans_version_.get_val_for_tx() : INT64_MAX;
|
||||
row.snapshot_version_ = std::max(trans_version, row.snapshot_version_);
|
||||
STORAGE_LOG(DEBUG, "row snapshot version", K(row.snapshot_version_));
|
||||
}
|
||||
@ -1276,7 +1275,7 @@ int ObMemtableMultiVersionScanIterator::iterate_multi_version_row_value_(ObDatum
|
||||
if (row.row_flag_.is_not_exist()) {
|
||||
row.row_flag_.set_flag(mtd->dml_flag_);
|
||||
}
|
||||
compare_trans_version = reinterpret_cast<const ObMvccTransNode *>(tnode)->trans_version_.get_val_for_lsn_allocator();
|
||||
compare_trans_version = reinterpret_cast<const ObMvccTransNode *>(tnode)->trans_version_.get_val_for_tx();
|
||||
if (compare_trans_version <= version_range.base_version_) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "trans version smaller than base version", K(compare_trans_version), K(version_range.base_version_));
|
||||
@ -1362,7 +1361,7 @@ OB_INLINE int ObReadRow::iterate_row_value_(
|
||||
TRANS_LOG(WARN, "transa node value is null", K(ret), KP(tnode), KP(mtd));
|
||||
} else {
|
||||
const bool is_committed = reinterpret_cast<const ObMvccTransNode *>(tnode)->is_committed();
|
||||
const int64_t trans_version = is_committed ? reinterpret_cast<const ObMvccTransNode *>(tnode)->trans_version_.get_val_for_lsn_allocator() : INT64_MAX;
|
||||
const int64_t trans_version = is_committed ? reinterpret_cast<const ObMvccTransNode *>(tnode)->trans_version_.get_val_for_tx() : INT64_MAX;
|
||||
if (row.row_flag_.is_not_exist()) {
|
||||
if (ObDmlFlag::DF_DELETE == mtd->dml_flag_) {
|
||||
row.row_flag_.set_flag(ObDmlFlag::DF_DELETE);
|
||||
|
@ -189,8 +189,8 @@ int ObLSTxCtxMgr::init(const int64_t tenant_id,
|
||||
lock_table_ = lock_table;
|
||||
txs_ = txs;
|
||||
ts_mgr_ = ts_mgr;
|
||||
aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
prev_aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
aggre_rec_scn_.reset();
|
||||
prev_aggre_rec_scn_.reset();
|
||||
online_ts_ = 0;
|
||||
TRANS_LOG(INFO, "ObLSTxCtxMgr inited success", KP(this), K(ls_id));
|
||||
}
|
||||
@ -218,8 +218,8 @@ void ObLSTxCtxMgr::reset()
|
||||
total_tx_ctx_count_ = 0;
|
||||
leader_takeover_ts_.reset();
|
||||
max_replay_commit_version_.reset();
|
||||
aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
prev_aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
aggre_rec_scn_.reset();
|
||||
prev_aggre_rec_scn_.reset();
|
||||
online_ts_ = 0;
|
||||
txs_ = NULL;
|
||||
ts_mgr_ = NULL;
|
||||
@ -232,8 +232,8 @@ void ObLSTxCtxMgr::reset()
|
||||
|
||||
int ObLSTxCtxMgr::offline()
|
||||
{
|
||||
aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
prev_aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
aggre_rec_scn_.reset();
|
||||
prev_aggre_rec_scn_.reset();
|
||||
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
@ -1286,8 +1286,8 @@ int ObLSTxCtxMgr::on_tx_ctx_table_flushed()
|
||||
} else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) {
|
||||
TRANS_LOG(WARN, "for each transaction context error", KR(ret), "manager", *this);
|
||||
} else {
|
||||
// To mark the checkpoint is succeed, we reset the prev_aggre_rec_log_ts
|
||||
prev_aggre_rec_log_ts_ = OB_INVALID_TIMESTAMP;
|
||||
// To mark that the checkpoint has succeeded, we reset prev_aggre_rec_scn
|
||||
prev_aggre_rec_scn_.reset();
|
||||
TRANS_LOG(INFO, "succ to on tx ctx table flushed", K(*this));
|
||||
}
|
||||
}
|
||||
@ -1311,50 +1311,43 @@ int ObLSTxCtxMgr::get_min_start_scn(palf::SCN &min_start_scn)
|
||||
|
||||
palf::SCN ObLSTxCtxMgr::get_aggre_rec_scn_()
|
||||
{
|
||||
// Default means no necessary to recover
|
||||
palf::SCN ret = palf::SCN::max_scn();
|
||||
int64_t prev_aggre_rec_log_ts = ATOMIC_LOAD(&prev_aggre_rec_log_ts_);
|
||||
int64_t aggre_rec_log_ts = ATOMIC_LOAD(&aggre_rec_log_ts_);
|
||||
int64_t ret_ts = INT64_MAX;
|
||||
|
||||
palf::SCN ret;
|
||||
palf::SCN prev_aggre_rec_scn = prev_aggre_rec_scn_.atomic_get();
|
||||
palf::SCN aggre_rec_scn = aggre_rec_scn_.atomic_get();
|
||||
|
||||
// Before the checkpoint of the tx ctx table succeeds, we should still use
|
||||
// the prev_aggre_rec_scn. After a successful checkpoint, we can use the
|
||||
// new aggre_rec_log_ts if exist
|
||||
if (OB_INVALID_TIMESTAMP != prev_aggre_rec_log_ts &&
|
||||
OB_INVALID_TIMESTAMP != aggre_rec_log_ts) {
|
||||
ret_ts = MIN(prev_aggre_rec_log_ts, aggre_rec_log_ts);
|
||||
} else if (OB_INVALID_TIMESTAMP != prev_aggre_rec_log_ts) {
|
||||
ret_ts = prev_aggre_rec_log_ts;
|
||||
} else if (OB_INVALID_TIMESTAMP != aggre_rec_log_ts) {
|
||||
ret_ts = aggre_rec_log_ts;
|
||||
}
|
||||
if (INT64_MAX != ret_ts) {
|
||||
ret.convert_for_lsn_allocator(ret_ts);
|
||||
}
|
||||
if (!ret.is_valid()) {
|
||||
TRANS_LOG(WARN, "convert for lsn fail", K(ret_ts), K(ret));
|
||||
// new aggre_rec_scn if it exists
|
||||
if (prev_aggre_rec_scn.is_valid() &&
|
||||
aggre_rec_scn.is_valid()) {
|
||||
ret = MIN(prev_aggre_rec_scn, aggre_rec_scn);
|
||||
} else if (prev_aggre_rec_scn.is_valid()) {
|
||||
ret = prev_aggre_rec_scn;
|
||||
} else if (aggre_rec_scn.is_valid()) {
|
||||
ret = aggre_rec_scn;
|
||||
} else {
|
||||
ret.set_max();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxCtxMgr::refresh_aggre_rec_log_ts()
|
||||
int ObLSTxCtxMgr::refresh_aggre_rec_scn()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
WLockGuard guard(rwlock_);
|
||||
|
||||
if (OB_INVALID_TIMESTAMP == prev_aggre_rec_log_ts_) {
|
||||
if (!prev_aggre_rec_scn_.is_valid()) {
|
||||
// We should remember the rec_log_ts before the tx ctx table is successfully
|
||||
// checkpointed
|
||||
int64_t old_v = 0;
|
||||
int64_t new_v = 0;
|
||||
palf::SCN old_v;
|
||||
palf::SCN new_v;
|
||||
do {
|
||||
old_v = aggre_rec_log_ts_;
|
||||
new_v = OB_INVALID_TIMESTAMP;
|
||||
} while (ATOMIC_CAS(&aggre_rec_log_ts_, old_v, new_v) != old_v);
|
||||
old_v = aggre_rec_scn_;
|
||||
new_v.reset();
|
||||
} while (aggre_rec_scn_.atomic_vcas(old_v, new_v) != old_v);
|
||||
|
||||
prev_aggre_rec_log_ts_ = old_v;
|
||||
prev_aggre_rec_scn_ = old_v;
|
||||
} else {
|
||||
TRANS_LOG(WARN, "Concurrent merge may be because of previous failure", K(*this));
|
||||
}
|
||||
@ -1362,24 +1355,24 @@ int ObLSTxCtxMgr::refresh_aggre_rec_log_ts()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSTxCtxMgr::update_aggre_log_ts_wo_lock(int64_t rec_log_ts)
|
||||
int ObLSTxCtxMgr::update_aggre_log_ts_wo_lock(palf::SCN rec_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_INVALID_TIMESTAMP != rec_log_ts) {
|
||||
if (rec_scn.is_valid()) {
|
||||
// we cannot lock here, because the lock order must be
|
||||
// ObLSTxCtxMgr -> ObPartTransCtx, otherwise we may be
|
||||
// deadlocked
|
||||
int64_t old_v = 0;
|
||||
int64_t new_v = 0;
|
||||
palf::SCN old_v;
|
||||
palf::SCN new_v;
|
||||
do {
|
||||
old_v = aggre_rec_log_ts_;
|
||||
if (OB_INVALID_TIMESTAMP == old_v) {
|
||||
new_v = rec_log_ts;
|
||||
old_v = aggre_rec_scn_;
|
||||
if (!old_v.is_valid()) {
|
||||
new_v = rec_scn;
|
||||
} else {
|
||||
new_v = MIN(old_v, rec_log_ts);
|
||||
new_v = MIN(old_v, rec_scn);
|
||||
}
|
||||
} while (ATOMIC_CAS(&aggre_rec_log_ts_, old_v, new_v) != old_v);
|
||||
} while (aggre_rec_scn_.atomic_vcas(old_v, new_v) != old_v);
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -469,11 +469,11 @@ public:
|
||||
int64_t get_state() { return get_state_(); }
|
||||
|
||||
// Switch prev_aggre_rec_scn and aggre_rec_scn when a dump starts
|
||||
int refresh_aggre_rec_log_ts();
|
||||
int refresh_aggre_rec_scn();
|
||||
|
||||
// Update aggre_rec_scn without lock, because we cannot lock using the order of
|
||||
// ObPartTransCtx -> ObLSTxCtxMgr; that would deadlock with the normal order.
|
||||
int update_aggre_log_ts_wo_lock(int64_t rec_log_ts);
|
||||
int update_aggre_log_ts_wo_lock(palf::SCN rec_log_ts);
|
||||
|
||||
TO_STRING_KV(KP(this),
|
||||
K_(ls_id),
|
||||
@ -482,8 +482,8 @@ public:
|
||||
State::state_str(state_),
|
||||
K_(total_tx_ctx_count),
|
||||
K_(ls_retain_ctx_mgr),
|
||||
K_(aggre_rec_log_ts),
|
||||
K_(prev_aggre_rec_log_ts),
|
||||
K_(aggre_rec_scn),
|
||||
K_(prev_aggre_rec_scn),
|
||||
"uref",
|
||||
(!is_inited_ ? -1 : get_uref()));
|
||||
private:
|
||||
@ -706,8 +706,8 @@ private:
|
||||
// remembered while must be released soon, in the ctx_mgr.
|
||||
//
|
||||
// [1]: If you are interested in rec_log_ts, see the ARIES paper.
|
||||
int64_t aggre_rec_log_ts_;
|
||||
int64_t prev_aggre_rec_log_ts_;
|
||||
palf::SCN aggre_rec_scn_;
|
||||
palf::SCN prev_aggre_rec_scn_;
|
||||
|
||||
// Online timestamp for ObLSTxCtxMgr
|
||||
int64_t online_ts_;
|
||||
|
@ -374,7 +374,7 @@ int ObPartTransCtx::trans_clear_()
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (is_ctx_table_merged_
|
||||
&& OB_FAIL(ls_tx_ctx_mgr_->update_aggre_log_ts_wo_lock(get_rec_log_ts_().get_val_for_lsn_allocator()))) {
|
||||
&& OB_FAIL(ls_tx_ctx_mgr_->update_aggre_log_ts_wo_lock(get_rec_log_ts_()))) {
|
||||
TRANS_LOG(ERROR, "update aggre log ts wo lock failed", KR(ret), "context", *this);
|
||||
} else {
|
||||
ret = mt_ctx_.trans_clear();
|
||||
@ -1354,30 +1354,6 @@ int ObPartTransCtx::on_tx_ctx_table_flushed()
|
||||
return ret;
|
||||
}
|
||||
|
||||
// for ob admin only
|
||||
void ObPartTransCtx::set_trans_table_status(const ObTrxToolArg &arg)
|
||||
{
|
||||
ObTransTableStatusType status = ObTransTableStatusType(arg.status_);
|
||||
int64_t trans_version = arg.trans_version_;
|
||||
const int64_t end_log_ts = arg.end_log_ts_;
|
||||
//TODO(handora.qc): fix it
|
||||
palf::SCN trans_scn;
|
||||
trans_scn.convert_for_lsn_allocator(trans_version);
|
||||
|
||||
if (ObTransTableStatusType::COMMIT == status) {
|
||||
mt_ctx_.set_commit_version(trans_scn);
|
||||
} else {
|
||||
mt_ctx_.set_trans_version(trans_scn);
|
||||
}
|
||||
|
||||
if (ObTransTableStatusType::COMMIT == status || ObTransTableStatusType::ABORT == status) {
|
||||
end_log_ts_.convert_for_lsn_allocator(end_log_ts);
|
||||
if (!end_log_ts_.is_valid()) {
|
||||
TRANS_LOG(WARN, "convert for lsn fail", K(end_log_ts));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t ObPartTransCtx::to_string(char* buf, const int64_t buf_len) const
|
||||
{
|
||||
int64_t len1 = 0;
|
||||
@ -4690,8 +4666,7 @@ int ObPartTransCtx::check_with_tx_data(ObITxDataCheckFunctor &fn)
|
||||
// NB: we must read the state then the version without lock. If you are interested in the
|
||||
// order, then you can read the comment in ob_tx_data_functor.cpp
|
||||
ObTxState state = exec_info_.state_;
|
||||
const int64_t prepare_version = mt_ctx_.get_trans_version().get_val_for_lsn_allocator();
|
||||
ObTxCCCtx tx_cc_ctx(state, prepare_version);
|
||||
ObTxCCCtx tx_cc_ctx(state, mt_ctx_.get_trans_version());
|
||||
|
||||
if (OB_FAIL(fn(*tx_data_ptr, &tx_cc_ctx))) {
|
||||
TRANS_LOG(WARN, "do data check function fail.", KR(ret), K(*this));
|
||||
|
@ -199,7 +199,6 @@ public:
|
||||
bool need_update_schema_version(const int64_t log_id,
|
||||
const palf::SCN log_ts);
|
||||
|
||||
void set_trans_table_status(const obrpc::ObTrxToolArg &arg);
|
||||
share::ObLSID get_ls_id() const { return ls_id_; }
|
||||
|
||||
// for elr
|
||||
|
@ -191,19 +191,18 @@ public:
|
||||
common::SpinRWLock lock_;
|
||||
};
|
||||
|
||||
// TODO: Redefine it
|
||||
class ObTxCCCtx
|
||||
{
|
||||
public:
|
||||
// For Tx Ctx Table
|
||||
ObTxCCCtx(transaction::ObTxState state, int64_t prepare_version)
|
||||
ObTxCCCtx(transaction::ObTxState state, palf::SCN prepare_version)
|
||||
: state_(state), prepare_version_(prepare_version) {}
|
||||
// For Tx Data Table
|
||||
ObTxCCCtx() : state_(transaction::ObTxState::MAX), prepare_version_(-1) {}
|
||||
ObTxCCCtx() : state_(transaction::ObTxState::MAX), prepare_version_(palf::SCN::invalid_scn()) {}
|
||||
TO_STRING_KV(K_(state), K_(prepare_version));
|
||||
public:
|
||||
transaction::ObTxState state_;
|
||||
int64_t prepare_version_;
|
||||
palf::SCN prepare_version_;
|
||||
};
|
||||
|
||||
class ObTxCommitData
|
||||
|
@ -231,14 +231,14 @@ int LockForReadFunctor::inner_lock_for_read(const ObTxData &tx_data, ObTxCCCtx *
|
||||
// updated as prepared until log is applied and the application is
|
||||
// asynchronous. So we need use version instead of state as judgement and
|
||||
// mark it whenever we submit the commit/prepare log(using before_prepare)
|
||||
if (INT64_MAX == tx_cc_ctx->prepare_version_) {
|
||||
if (tx_cc_ctx->prepare_version_.is_max()) {
|
||||
// Case 2.2.1: data is not in 2pc state, so the prepare version and
|
||||
// commit version of the data must be bigger than the read txn's
|
||||
// snapshot version, so we cannot read it and trans version is
|
||||
// unnecessary for the running txn
|
||||
can_read_ = false;
|
||||
trans_version_ = SCN::min_scn();
|
||||
} else if (tx_cc_ctx->prepare_version_ > snapshot_version.get_val_for_lsn_allocator()) {
|
||||
} else if (tx_cc_ctx->prepare_version_ > snapshot_version) {
|
||||
// Case 2.2.2: data is at least in prepare state and the prepare
|
||||
// version is bigger than the read txn's snapshot version, then the
|
||||
// data's commit version must be bigger than the read txn's snapshot
|
||||
@ -360,7 +360,7 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx
|
||||
// updated as prepared until log is applied and the application is
|
||||
// asynchronous. So we need use version instead of state as judgement and
|
||||
// mark it whenever we submit the commit/prepare log(using before_prepare)
|
||||
&& INT64_MAX == tx_cc_ctx->prepare_version_) {
|
||||
&& tx_cc_ctx->prepare_version_.is_max()) {
|
||||
// Case 1: data is still executing, so we do not need to write back
|
||||
// This is the case for most of the lock for read scenarios, so we need to
|
||||
// mainly optimize it through not latching the row
|
||||
@ -379,7 +379,7 @@ int ObCleanoutTxNodeOperation::operator()(const ObTxData &tx_data, ObTxCCCtx *tx
|
||||
(void)tnode_.trans_abort(tx_data.end_scn_);
|
||||
}
|
||||
} else if (ObTxData::RUNNING == tx_data.state_) {
|
||||
if (INT64_MAX != tx_cc_ctx->prepare_version_) {
|
||||
if (!tx_cc_ctx->prepare_version_.is_max()) {
|
||||
// Case 3: data is prepared, we also do not write back the prepare state
|
||||
}
|
||||
} else if (ObTxData::COMMIT == tx_data.state_) {
|
||||
|
@ -890,7 +890,7 @@ int ObTxCtxMemtableScanIterator::init(ObTxCtxMemtable *tx_ctx_memtable)
|
||||
STORAGE_LOG(WARN, "Failed to reserve tx ctx meta buffer", K(ret));
|
||||
// NB: We must first prepare the rec_scn for ObLSTxCtxMgr and then
|
||||
// prepare the rec_scn for tx ctx
|
||||
} else if (OB_FAIL(ls_tx_ctx_mgr->refresh_aggre_rec_log_ts())) {
|
||||
} else if (OB_FAIL(ls_tx_ctx_mgr->refresh_aggre_rec_scn())) {
|
||||
STORAGE_LOG(WARN, "Failed to prepare for dump tx ctx", K(ret));
|
||||
} else if (OB_FAIL(ls_tx_ctx_iter_.set_ready(ls_tx_ctx_mgr))) {
|
||||
STORAGE_LOG(WARN, "ls_tx_ctx_iter set_ready failed", K(ret));
|
||||
|
Loading…
x
Reference in New Issue
Block a user