[CP] to issue<54683599>:adjust snapshot check logic when fetch cursor
This commit is contained in:
@ -4143,22 +4143,38 @@ int ObSPIService::do_cursor_fetch(ObPLExecCtx *ctx,
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (cursor->is_need_check_snapshot()) { /* case: select * from dual, snapshot do not initilize, so it's invalid */
|
||||
/* 流式场景:
|
||||
非for update skip lock:isvalid=false,或者commited=true,则报错
|
||||
for update skip lock检查逻辑和for update一致
|
||||
非流式场景:
|
||||
forupdate:isvalid=false,或者commited=true,则报错,否则继续检查事务状态
|
||||
非forupdate,直接看isvalid是否为true */
|
||||
if (lib::is_oracle_mode()) {
|
||||
if (!cursor->get_snapshot().valid_) {
|
||||
if (cursor->is_for_update()) {
|
||||
if (!cursor->get_snapshot().is_valid() || cursor->get_snapshot().is_committed()) {
|
||||
ret = OB_ERR_FETCH_OUT_SEQUENCE;
|
||||
LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret));
|
||||
} else {
|
||||
transaction::ObTransID tx_id = cursor->get_trans_id();
|
||||
transaction::ObTransService* txs = MTL(transaction::ObTransService*);
|
||||
bool tx_active = false;
|
||||
CK (OB_NOT_NULL(txs));
|
||||
CK (tx_id.is_valid());
|
||||
OZ (txs->is_tx_active(tx_id, tx_active), tx_id);
|
||||
if (OB_SUCC(ret) && !tx_active) {
|
||||
ret = OB_ERR_FETCH_OUT_SEQUENCE;
|
||||
LOG_WARN("cursor has been closed because of txn was terminated",
|
||||
K(ret), K(tx_id), K(cursor->get_snapshot()));
|
||||
}
|
||||
}
|
||||
} else if (cursor->is_streaming()) {
|
||||
if (!cursor->get_snapshot().is_valid() || cursor->get_snapshot().is_committed()) {
|
||||
ret = OB_ERR_FETCH_OUT_SEQUENCE;
|
||||
LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret));
|
||||
}
|
||||
} else if (!cursor->get_snapshot().is_valid()) {
|
||||
ret = OB_ERR_FETCH_OUT_SEQUENCE;
|
||||
LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret));
|
||||
} else if (cursor->is_for_update()) {
|
||||
transaction::ObTransID tx_id = cursor->get_trans_id();
|
||||
transaction::ObTransService* txs = MTL(transaction::ObTransService*);
|
||||
bool tx_active = false;
|
||||
CK (OB_NOT_NULL(txs));
|
||||
CK (tx_id.is_valid());
|
||||
OZ (txs->is_tx_active(tx_id, tx_active), tx_id);
|
||||
if (OB_SUCC(ret) && !tx_active) {
|
||||
ret = OB_ERR_FETCH_OUT_SEQUENCE;
|
||||
LOG_WARN("cursor has been closed because of txn was terminated",
|
||||
K(ret), K(tx_id), K(cursor->get_snapshot()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -192,7 +192,8 @@ OB_SERIALIZE_MEMBER(ObTxReadSnapshot,
|
||||
uncertain_bound_,
|
||||
snapshot_lsid_,
|
||||
parts_,
|
||||
snapshot_ls_role_);
|
||||
snapshot_ls_role_,
|
||||
committed_);
|
||||
OB_SERIALIZE_MEMBER(ObTxPart, id_, addr_, epoch_, first_scn_, last_scn_);
|
||||
|
||||
DEFINE_SERIALIZE(ObTxDesc::FLAG::FOR_FIXED_SER_VAL)
|
||||
@ -1160,6 +1161,7 @@ ObTxSnapshot &ObTxSnapshot::operator=(const ObTxSnapshot &r)
|
||||
|
||||
ObTxReadSnapshot::ObTxReadSnapshot()
|
||||
: valid_(false),
|
||||
committed_(false),
|
||||
core_(),
|
||||
source_(SRC::INVL),
|
||||
snapshot_lsid_(),
|
||||
@ -1171,6 +1173,7 @@ ObTxReadSnapshot::ObTxReadSnapshot()
|
||||
ObTxReadSnapshot::~ObTxReadSnapshot()
|
||||
{
|
||||
valid_ = false;
|
||||
committed_ = false;
|
||||
source_ = SRC::INVL;
|
||||
snapshot_ls_role_ = common::INVALID_ROLE;
|
||||
uncertain_bound_ = 0;
|
||||
@ -1179,6 +1182,7 @@ ObTxReadSnapshot::~ObTxReadSnapshot()
|
||||
void ObTxReadSnapshot::reset()
|
||||
{
|
||||
valid_ = false;
|
||||
committed_ = false;
|
||||
core_.reset();
|
||||
source_ = SRC::INVL;
|
||||
snapshot_lsid_.reset();
|
||||
@ -1191,6 +1195,7 @@ int ObTxReadSnapshot::assign(const ObTxReadSnapshot &from)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
valid_ = from.valid_;
|
||||
committed_ = from.committed_;
|
||||
core_ = from.core_;
|
||||
source_ = from.source_;
|
||||
snapshot_lsid_ = from.snapshot_lsid_;
|
||||
|
||||
@ -248,7 +248,8 @@ struct ObTxSnapshot
|
||||
// snapshot used to consistency read
|
||||
struct ObTxReadSnapshot
|
||||
{
|
||||
bool valid_;
|
||||
bool valid_; // used by cursor check snapshot state
|
||||
bool committed_; // used by cursor check snapshot state
|
||||
ObTxSnapshot core_;
|
||||
enum class SRC {
|
||||
INVL = 0,
|
||||
@ -275,6 +276,8 @@ struct ObTxReadSnapshot
|
||||
bool is_none_read() const { return SRC::NONE == source_; }
|
||||
bool is_special() const { return SRC::SPECIAL == source_; }
|
||||
bool is_ls_snapshot() const { return SRC::LS == source_; }
|
||||
bool is_valid() const { return valid_; }
|
||||
bool is_committed() const { return committed_; }
|
||||
int format_source_for_display(char *buf, const int64_t buf_len) const;
|
||||
void reset();
|
||||
int assign(const ObTxReadSnapshot &);
|
||||
@ -287,7 +290,8 @@ struct ObTxReadSnapshot
|
||||
K_(uncertain_bound),
|
||||
K_(snapshot_lsid),
|
||||
K_(snapshot_ls_role),
|
||||
K_(parts));
|
||||
K_(parts),
|
||||
K_(committed));
|
||||
OB_UNIS_VERSION(1);
|
||||
};
|
||||
|
||||
|
||||
@ -628,13 +628,17 @@ void ObTransService::invalid_registered_snapshot_(ObTxDesc &tx)
|
||||
}
|
||||
}
|
||||
|
||||
void ObTransService::registered_snapshot_clear_part_(ObTxDesc &tx)
|
||||
void ObTransService::process_registered_snapshot_on_commit_(ObTxDesc &tx)
|
||||
{
|
||||
// cleanup snapshot's participant info, so that they will skip
|
||||
// verify participant txn ctx, which cause false negative,
|
||||
// because txn ctx has quit when txn committed.
|
||||
int ret = OB_SUCCESS;
|
||||
ARRAY_FOREACH(tx.savepoints_, i) {
|
||||
ObTxSavePoint &p = tx.savepoints_[i];
|
||||
if (p.is_snapshot() && p.snapshot_->valid_) {
|
||||
p.snapshot_->parts_.reset();
|
||||
p.snapshot_->committed_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -347,7 +347,7 @@ int update_user_savepoint_(ObTxDesc &tx, const ObTxSavePointList &savepoints);
|
||||
private:
|
||||
ObTxCtxMgr tx_ctx_mgr_;
|
||||
void invalid_registered_snapshot_(ObTxDesc &tx);
|
||||
void registered_snapshot_clear_part_(ObTxDesc &tx);
|
||||
void process_registered_snapshot_on_commit_(ObTxDesc &tx);
|
||||
int ls_rollback_to_savepoint_(const ObTransID &tx_id,
|
||||
const share::ObLSID &ls,
|
||||
const int64_t verify_epoch,
|
||||
|
||||
@ -1907,20 +1907,10 @@ int ObTransService::add_tx_exec_result(ObTxDesc &tx, const ObTxExecResult &exec_
|
||||
void ObTransService::tx_post_terminate_(ObTxDesc &tx)
|
||||
{
|
||||
// invalid registered snapshot
|
||||
if (tx.state_ == ObTxDesc::State::ABORTED ||
|
||||
tx.is_commit_unsucc()
|
||||
// FIXME: (yunxing.cyx)
|
||||
// bellow line is temproary, when storage layer support
|
||||
// record txn-id in SSTable's MvccRow, we can remove this
|
||||
// (the support was planed in v4.1)
|
||||
|| tx.state_ == ObTxDesc::State::COMMITTED
|
||||
) {
|
||||
if (tx.state_ == ObTxDesc::State::ABORTED || tx.is_commit_unsucc()) {
|
||||
invalid_registered_snapshot_(tx);
|
||||
} else if (tx.state_ == ObTxDesc::State::COMMITTED) {
|
||||
// cleanup snapshot's participant info, so that they will skip
|
||||
// verify participant txn ctx, which cause false negative,
|
||||
// because txn ctx has quit when txn committed.
|
||||
registered_snapshot_clear_part_(tx);
|
||||
process_registered_snapshot_on_commit_(tx);
|
||||
}
|
||||
// statistic
|
||||
if (tx.is_tx_end()) {
|
||||
|
||||
Reference in New Issue
Block a user