diff --git a/src/sql/ob_spi.cpp b/src/sql/ob_spi.cpp index 3ee255d770..cdbbbe7490 100644 --- a/src/sql/ob_spi.cpp +++ b/src/sql/ob_spi.cpp @@ -4143,22 +4143,38 @@ int ObSPIService::do_cursor_fetch(ObPLExecCtx *ctx, if (OB_FAIL(ret)) { } else if (cursor->is_need_check_snapshot()) { /* case: select * from dual, snapshot do not initilize, so it's invalid */ + /* 流式场景: + 非for update skip lock:isvalid=false,或者commited=true,则报错 + for update skip lock检查逻辑和for update一致 + 非流式场景: + forupdate:isvalid=false,或者commited=true,则报错,否则继续检查事务状态 + 非forupdate,直接看isvalid是否为true */ if (lib::is_oracle_mode()) { - if (!cursor->get_snapshot().valid_) { + if (cursor->is_for_update()) { + if (!cursor->get_snapshot().is_valid() || cursor->get_snapshot().is_committed()) { + ret = OB_ERR_FETCH_OUT_SEQUENCE; + LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret)); + } else { + transaction::ObTransID tx_id = cursor->get_trans_id(); + transaction::ObTransService* txs = MTL(transaction::ObTransService*); + bool tx_active = false; + CK (OB_NOT_NULL(txs)); + CK (tx_id.is_valid()); + OZ (txs->is_tx_active(tx_id, tx_active), tx_id); + if (OB_SUCC(ret) && !tx_active) { + ret = OB_ERR_FETCH_OUT_SEQUENCE; + LOG_WARN("cursor has been closed because of txn was terminated", + K(ret), K(tx_id), K(cursor->get_snapshot())); + } + } + } else if (cursor->is_streaming()) { + if (!cursor->get_snapshot().is_valid() || cursor->get_snapshot().is_committed()) { + ret = OB_ERR_FETCH_OUT_SEQUENCE; + LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret)); + } + } else if (!cursor->get_snapshot().is_valid()) { ret = OB_ERR_FETCH_OUT_SEQUENCE; LOG_WARN("snapshot is invalid", K(cursor->get_snapshot()), K(ret)); - } else if (cursor->is_for_update()) { - transaction::ObTransID tx_id = cursor->get_trans_id(); - transaction::ObTransService* txs = MTL(transaction::ObTransService*); - bool tx_active = false; - CK (OB_NOT_NULL(txs)); - CK (tx_id.is_valid()); - OZ (txs->is_tx_active(tx_id, tx_active), tx_id); - if (OB_SUCC(ret) && !tx_active) { - ret = OB_ERR_FETCH_OUT_SEQUENCE; - LOG_WARN("cursor has been closed because of txn was terminated", - K(ret), K(tx_id), K(cursor->get_snapshot())); - } } } } diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index 0e12770b95..c8b42bdd14 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -192,7 +192,8 @@ OB_SERIALIZE_MEMBER(ObTxReadSnapshot, uncertain_bound_, snapshot_lsid_, parts_, - snapshot_ls_role_); + snapshot_ls_role_, + committed_); OB_SERIALIZE_MEMBER(ObTxPart, id_, addr_, epoch_, first_scn_, last_scn_); DEFINE_SERIALIZE(ObTxDesc::FLAG::FOR_FIXED_SER_VAL) @@ -1160,6 +1161,7 @@ ObTxSnapshot &ObTxSnapshot::operator=(const ObTxSnapshot &r) ObTxReadSnapshot::ObTxReadSnapshot() : valid_(false), + committed_(false), core_(), source_(SRC::INVL), snapshot_lsid_(), @@ -1171,6 +1173,7 @@ ObTxReadSnapshot::ObTxReadSnapshot() ObTxReadSnapshot::~ObTxReadSnapshot() { valid_ = false; + committed_ = false; source_ = SRC::INVL; snapshot_ls_role_ = common::INVALID_ROLE; uncertain_bound_ = 0; @@ -1179,6 +1182,7 @@ ObTxReadSnapshot::~ObTxReadSnapshot() void ObTxReadSnapshot::reset() { valid_ = false; + committed_ = false; core_.reset(); source_ = SRC::INVL; snapshot_lsid_.reset(); @@ -1191,6 +1195,7 @@ int ObTxReadSnapshot::assign(const ObTxReadSnapshot &from) { int ret = OB_SUCCESS; valid_ = from.valid_; + committed_ = from.committed_; core_ = from.core_; source_ = from.source_; snapshot_lsid_ = from.snapshot_lsid_; diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index c6af0a497b..4348e0bd8c 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -248,7 +248,8 @@ struct ObTxSnapshot // snapshot used to consistency read struct ObTxReadSnapshot { - bool valid_; + bool valid_; // used by cursor check snapshot state + bool committed_; // used by cursor check snapshot state ObTxSnapshot core_; enum class SRC { INVL = 0, @@ -275,6 +276,8 @@ struct ObTxReadSnapshot bool is_none_read() const { return SRC::NONE == source_; } bool is_special() const { return SRC::SPECIAL == source_; } bool is_ls_snapshot() const { return SRC::LS == source_; } + bool is_valid() const { return valid_; } + bool is_committed() const { return committed_; } int format_source_for_display(char *buf, const int64_t buf_len) const; void reset(); int assign(const ObTxReadSnapshot &); @@ -287,7 +290,8 @@ struct ObTxReadSnapshot K_(uncertain_bound), K_(snapshot_lsid), K_(snapshot_ls_role), - K_(parts)); + K_(parts), + K_(committed)); OB_UNIS_VERSION(1); }; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 79f7304811..2138bae116 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -628,13 +628,17 @@ void ObTransService::invalid_registered_snapshot_(ObTxDesc &tx) } } -void ObTransService::registered_snapshot_clear_part_(ObTxDesc &tx) +void ObTransService::process_registered_snapshot_on_commit_(ObTxDesc &tx) { + // cleanup snapshot's participant info, so that they will skip + // verify participant txn ctx, which cause false negative, + // because txn ctx has quit when txn committed. int ret = OB_SUCCESS; ARRAY_FOREACH(tx.savepoints_, i) { ObTxSavePoint &p = tx.savepoints_[i]; if (p.is_snapshot() && p.snapshot_->valid_) { p.snapshot_->parts_.reset(); + p.snapshot_->committed_ = true; } } } diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 3ef7af9de0..b834c7777b 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -347,7 +347,7 @@ int update_user_savepoint_(ObTxDesc &tx, const ObTxSavePointList &savepoints); private: ObTxCtxMgr tx_ctx_mgr_; void invalid_registered_snapshot_(ObTxDesc &tx); -void registered_snapshot_clear_part_(ObTxDesc &tx); +void process_registered_snapshot_on_commit_(ObTxDesc &tx); int ls_rollback_to_savepoint_(const ObTransID &tx_id, const share::ObLSID &ls, const int64_t verify_epoch, diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index c12086155b..232d689830 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1907,20 +1907,10 @@ int ObTransService::add_tx_exec_result(ObTxDesc &tx, const ObTxExecResult &exec_ void ObTransService::tx_post_terminate_(ObTxDesc &tx) { // invalid registered snapshot - if (tx.state_ == ObTxDesc::State::ABORTED || - tx.is_commit_unsucc() - // FIXME: (yunxing.cyx) - // bellow line is temproary, when storage layer support - // record txn-id in SSTable's MvccRow, we can remove this - // (the support was planed in v4.1) - || tx.state_ == ObTxDesc::State::COMMITTED - ) { + if (tx.state_ == ObTxDesc::State::ABORTED || tx.is_commit_unsucc()) { invalid_registered_snapshot_(tx); } else if (tx.state_ == ObTxDesc::State::COMMITTED) { - // cleanup snapshot's participant info, so that they will skip - // verify participant txn ctx, which cause false negative, - // because txn ctx has quit when txn committed. - registered_snapshot_clear_part_(tx); + process_registered_snapshot_on_commit_(tx); } // statistic if (tx.is_tx_end()) {