Add barrier snapshot version for defensive check【4377】
This commit is contained in:
@ -78,7 +78,8 @@ public:
|
|||||||
}
|
}
|
||||||
bool is_valid() const {
|
bool is_valid() const {
|
||||||
switch(type_) {
|
switch(type_) {
|
||||||
case T::READ: return is_read_valid__();
|
case T::STRONG_READ: return is_read_valid__();
|
||||||
|
case T::WEAK_READ: return is_read_valid__();
|
||||||
case T::WRITE: return is_write_valid__();
|
case T::WRITE: return is_write_valid__();
|
||||||
case T::REPLAY: return is_replay_valid__();
|
case T::REPLAY: return is_replay_valid__();
|
||||||
default: return false;
|
default: return false;
|
||||||
@ -109,10 +110,11 @@ public:
|
|||||||
const storage::ObTxTableGuard &tx_table_guard,
|
const storage::ObTxTableGuard &tx_table_guard,
|
||||||
const transaction::ObTxSnapshot &snapshot,
|
const transaction::ObTxSnapshot &snapshot,
|
||||||
const int64_t abs_lock_timeout,
|
const int64_t abs_lock_timeout,
|
||||||
const int64_t tx_lock_timeout)
|
const int64_t tx_lock_timeout,
|
||||||
|
const bool is_weak_read)
|
||||||
{
|
{
|
||||||
reset();
|
reset();
|
||||||
type_ = T::READ;
|
type_ = is_weak_read ? T::WEAK_READ : T::STRONG_READ;
|
||||||
tx_ctx_ = tx_ctx;
|
tx_ctx_ = tx_ctx;
|
||||||
mem_ctx_ = mem_ctx;
|
mem_ctx_ = mem_ctx;
|
||||||
tx_table_guard_ = tx_table_guard;
|
tx_table_guard_ = tx_table_guard;
|
||||||
@ -128,7 +130,7 @@ public:
|
|||||||
{
|
{
|
||||||
transaction::ObTxSnapshot snapshot;
|
transaction::ObTxSnapshot snapshot;
|
||||||
snapshot.version_ = snapshot_version;
|
snapshot.version_ = snapshot_version;
|
||||||
init_read(NULL, NULL, tx_table_guard, snapshot, timeout, tx_lock_timeout);
|
init_read(NULL, NULL, tx_table_guard, snapshot, timeout, tx_lock_timeout, false);
|
||||||
}
|
}
|
||||||
void init_write(transaction::ObPartTransCtx &tx_ctx,
|
void init_write(transaction::ObPartTransCtx &tx_ctx,
|
||||||
ObMemtableCtx &mem_ctx,
|
ObMemtableCtx &mem_ctx,
|
||||||
@ -177,7 +179,8 @@ public:
|
|||||||
ObMemtableCtx *get_mem_ctx() const {
|
ObMemtableCtx *get_mem_ctx() const {
|
||||||
return mem_ctx_;
|
return mem_ctx_;
|
||||||
}
|
}
|
||||||
bool is_read() const { return type_ == T::READ; }
|
bool is_read() const { return type_ == T::STRONG_READ || type_ == T::WEAK_READ; }
|
||||||
|
bool is_weak_read() const { return type_ == T::WEAK_READ; }
|
||||||
bool is_write() const { return type_ == T::WRITE; }
|
bool is_write() const { return type_ == T::WRITE; }
|
||||||
bool is_replay() const { return type_ == T::REPLAY; }
|
bool is_replay() const { return type_ == T::REPLAY; }
|
||||||
int64_t eval_lock_expire_ts(int64_t lock_wait_start_ts = 0) const {
|
int64_t eval_lock_expire_ts(int64_t lock_wait_start_ts = 0) const {
|
||||||
@ -212,7 +215,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
void warn_tx_ctx_leaky_();
|
void warn_tx_ctx_leaky_();
|
||||||
public: // NOTE: those field should only be accessed by txn relative routine
|
public: // NOTE: those field should only be accessed by txn relative routine
|
||||||
enum class T { INVL, READ, WRITE, REPLAY } type_;
|
enum class T { INVL, STRONG_READ, WEAK_READ, WRITE, REPLAY } type_;
|
||||||
// abs_lock_timeout is calculated from the minimum of the wait time of the
|
// abs_lock_timeout is calculated from the minimum of the wait time of the
|
||||||
// select_for_update and timeout in dml_param / scan_param
|
// select_for_update and timeout in dml_param / scan_param
|
||||||
int64_t abs_lock_timeout_;
|
int64_t abs_lock_timeout_;
|
||||||
|
@ -90,6 +90,17 @@ int ObMvccValueIterator::lock_for_read_(const ObQueryFlag &flag)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// add barrier snapshot version for defensive check
|
||||||
|
if (NULL != version_iter_) {
|
||||||
|
if (ctx_->is_weak_read()) {
|
||||||
|
version_iter_->set_safe_read_barrier(ctx_->snapshot_.version_);
|
||||||
|
}
|
||||||
|
if (!flag.is_prewarm()
|
||||||
|
&& !version_iter_->is_elr()) {
|
||||||
|
version_iter_->set_snapshot_version_barrier(ctx_->snapshot_.version_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lock_for_read_end(lock_start_time, ret);
|
lock_for_read_end(lock_start_time, ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -457,6 +457,11 @@ int ObMemtableCtx::trans_end(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
if (commit && INT64_MAX == get_trans_version()) {
|
||||||
|
TRANS_LOG(ERROR, "unexpected prepare version", K(*this));
|
||||||
|
// no retcode
|
||||||
|
}
|
||||||
|
|
||||||
ret = do_trans_end(commit,
|
ret = do_trans_end(commit,
|
||||||
trans_version,
|
trans_version,
|
||||||
final_log_ts,
|
final_log_ts,
|
||||||
|
@ -936,7 +936,8 @@ int ObTransService::get_read_store_ctx(const ObTxReadSnapshot &snapshot,
|
|||||||
tx_table_guard,
|
tx_table_guard,
|
||||||
snapshot.core_,
|
snapshot.core_,
|
||||||
store_ctx.timeout_,
|
store_ctx.timeout_,
|
||||||
lock_timeout
|
lock_timeout,
|
||||||
|
snapshot.is_weak_read()
|
||||||
);
|
);
|
||||||
update_max_read_ts_(tenant_id_, ls_id, snapshot.core_.version_);
|
update_max_read_ts_(tenant_id_, ls_id, snapshot.core_.version_);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user