[BUG] fix 4138 because unclear session timestamp

This commit is contained in:
Handora
2023-06-06 07:42:32 +00:00
committed by ob-robot
parent f16a8319fc
commit 82b923c9db
4 changed files with 94 additions and 27 deletions

View File

@ -560,6 +560,16 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); LOG_WARN("call sql stmt end hook fail", K(tmp_ret));
} }
} }
if (OB_SUCC(ret)
&& !ObSQLUtils::is_nested_sql(&exec_ctx)
&& das_ctx.get_snapshot().core_.version_.is_valid()) {
// maintain the read snapshot version on session for multi-version garbage
// colloecor. It is maintained for all cases except remote exection with ac
// = 1. So we need carefully design the version for the corner case.
session->set_reserved_snapshot_version(das_ctx.get_snapshot().core_.version_);
}
bool print_log = false; bool print_log = false;
#ifndef NDEBUG #ifndef NDEBUG
print_log = true; print_log = true;
@ -911,6 +921,10 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
ret = COVER_SUCC(tmp_ret); ret = COVER_SUCC(tmp_ret);
} }
if (OB_SUCC(ret) && !ObSQLUtils::is_nested_sql(&exec_ctx)) {
session->reset_reserved_snapshot_version();
}
bool print_log = false; bool print_log = false;
#ifndef NDEBUG #ifndef NDEBUG
print_log = true; print_log = true;

View File

@ -70,7 +70,7 @@ ObBasicSessionInfo::ObBasicSessionInfo()
sys_var_base_version_(OB_INVALID_VERSION), sys_var_base_version_(OB_INVALID_VERSION),
tx_desc_(NULL), tx_desc_(NULL),
tx_result_(), tx_result_(),
unused_read_snapshot_version_(), reserved_read_snapshot_version_(),
xid_(), xid_(),
associated_xa_(false), associated_xa_(false),
cached_tenant_config_version_(0), cached_tenant_config_version_(0),
@ -404,7 +404,7 @@ void ObBasicSessionInfo::reset(bool skip_sys_var)
sys_var_base_version_ = CACHED_SYS_VAR_VERSION; sys_var_base_version_ = CACHED_SYS_VAR_VERSION;
} }
curr_trans_last_stmt_end_time_ = 0; curr_trans_last_stmt_end_time_ = 0;
unused_read_snapshot_version_.reset(); reserved_read_snapshot_version_.reset();
check_sys_variable_ = true; check_sys_variable_ = true;
is_foreign_key_cascade_ = false; is_foreign_key_cascade_ = false;
is_foreign_key_check_exist_ = false; is_foreign_key_check_exist_ = false;
@ -4258,7 +4258,7 @@ OB_DEF_SERIALIZE(ObBasicSessionInfo)
nested_count_, nested_count_,
thread_data_.user_name_, thread_data_.user_name_,
next_tx_isolation_, next_tx_isolation_,
unused_read_snapshot_version_, reserved_read_snapshot_version_,
check_sys_variable_, check_sys_variable_,
unused_weak_read_snapshot_source, unused_weak_read_snapshot_source,
database_id_, database_id_,
@ -4453,7 +4453,7 @@ OB_DEF_DESERIALIZE(ObBasicSessionInfo)
nested_count_, nested_count_,
thread_data_.user_name_, thread_data_.user_name_,
next_tx_isolation_, next_tx_isolation_,
unused_read_snapshot_version_, reserved_read_snapshot_version_,
check_sys_variable_, check_sys_variable_,
unused_weak_read_snapshot_source, unused_weak_read_snapshot_source,
database_id_, database_id_,
@ -4770,7 +4770,7 @@ OB_DEF_SERIALIZE_SIZE(ObBasicSessionInfo)
nested_count_, nested_count_,
thread_data_.user_name_, thread_data_.user_name_,
next_tx_isolation_, next_tx_isolation_,
unused_read_snapshot_version_, reserved_read_snapshot_version_,
check_sys_variable_, check_sys_variable_,
unused_weak_read_snapshot_source, unused_weak_read_snapshot_source,
database_id_, database_id_,

View File

@ -1237,6 +1237,9 @@ public:
void set_is_in_user_scope(bool value) { sql_scope_flags_.set_is_in_user_scope(value); } void set_is_in_user_scope(bool value) { sql_scope_flags_.set_is_in_user_scope(value); }
bool is_in_user_scope() const { return sql_scope_flags_.is_in_user_scope(); } bool is_in_user_scope() const { return sql_scope_flags_.is_in_user_scope(); }
SqlScopeFlags &get_sql_scope_flags() { return sql_scope_flags_; } SqlScopeFlags &get_sql_scope_flags() { return sql_scope_flags_; }
share::SCN get_reserved_snapshot_version() const { return reserved_read_snapshot_version_; }
void set_reserved_snapshot_version(const share::SCN snapshot_version) { reserved_read_snapshot_version_ = snapshot_version; }
void reset_reserved_snapshot_version() { reserved_read_snapshot_version_.reset(); }
bool get_check_sys_variable() { return check_sys_variable_; } bool get_check_sys_variable() { return check_sys_variable_; }
void set_check_sys_variable(bool check_sys_variable) { check_sys_variable_ = check_sys_variable; } void set_check_sys_variable(bool check_sys_variable) { check_sys_variable_ = check_sys_variable; }
@ -1990,7 +1993,12 @@ private:
protected: protected:
transaction::ObTxDesc *tx_desc_; transaction::ObTxDesc *tx_desc_;
transaction::ObTxExecResult tx_result_; // TODO: move to QueryCtx/ExecCtx transaction::ObTxExecResult tx_result_; // TODO: move to QueryCtx/ExecCtx
share::SCN unused_read_snapshot_version_;//serialize compatibility preserved // reserved read snapshot version for current or previous stmt in the txn. And
// it is used for multi-version garbage colloector to collect ative snapshot.
// While it may be empty for the txn with ac = 1 and remote execution whose
// snapshot version is generated from remote server(called by start_stmt). So
// use it only query is active and version is valid.
share::SCN reserved_read_snapshot_version_;
transaction::ObXATransID xid_; transaction::ObXATransID xid_;
bool associated_xa_; // session joined distr-xa-trans by xa-start bool associated_xa_; // session joined distr-xa-trans by xa-start
int64_t cached_tenant_config_version_; int64_t cached_tenant_config_version_;

View File

@ -1347,46 +1347,91 @@ bool GetMinActiveSnapshotVersionFunctor::operator()(sql::ObSQLSessionMgr::Key ke
share::SCN snapshot_version(share::SCN::max_scn()); share::SCN snapshot_version(share::SCN::max_scn());
if (sess_info->is_in_transaction()) { if (sess_info->is_in_transaction()) {
share::SCN desc_snapshot;
transaction::ObTxDesc *tx_desc = nullptr; transaction::ObTxDesc *tx_desc = nullptr;
share::SCN sess_snapshot = sess_info->get_reserved_snapshot_version();
if (OB_ISNULL(tx_desc = sess_info->get_tx_desc())) { if (OB_ISNULL(tx_desc = sess_info->get_tx_desc())) {
MVCC_LOG(WARN, "tx desc is nullptr", KPC(sess_info)); ret = OB_ERR_UNEXPECTED;
MVCC_LOG(ERROR, "tx desc is nullptr", K(ret), KPC(sess_info));
} else if (FALSE_IT(desc_snapshot = tx_desc->get_snapshot_version())) {
} else if (transaction::ObTxIsolationLevel::SERIAL == tx_desc->get_isolation_level() || } else if (transaction::ObTxIsolationLevel::SERIAL == tx_desc->get_isolation_level() ||
transaction::ObTxIsolationLevel::RR == tx_desc->get_isolation_level()) { transaction::ObTxIsolationLevel::RR == tx_desc->get_isolation_level()) {
// Case 1: RR/SI with tx desc exists, so we can get snapshot version from tx desc // Case 1: RR/SI with tx desc exists, it means the snapshot is get from
share::SCN tmp_snapshot_version = tx_desc->get_snapshot_version(); // scheduler and must maintained in the session and tx desc
if (tmp_snapshot_version.is_valid()) { if (desc_snapshot.is_valid()) {
snapshot_version = tmp_snapshot_version; snapshot_version = desc_snapshot;
} }
MVCC_LOG(DEBUG, "RR/SI txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), MVCC_LOG(DEBUG, "RR/SI txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_)); K(snapshot_version), K(min_active_snapshot_version_), K(desc_snapshot),
K(sess_snapshot));
} else if (transaction::ObTxIsolationLevel::RC == tx_desc->get_isolation_level()) { } else if (transaction::ObTxIsolationLevel::RC == tx_desc->get_isolation_level()) {
// Case 2: RC with tx desc exists, while the snapshot version is not // Case 2: RC with tx desc exists, it may exists that snapshot is get from
// maintained, so we use query start time instead // the executor and not maintained in the session and tx desc. So we need
// TODO(handora.qc): use better snapshot version // use session query start time carefully
if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) { if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) {
snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()); if (desc_snapshot.is_valid()) {
snapshot_version = desc_snapshot;
} else if (sess_snapshot.is_valid()) {
snapshot_version = sess_snapshot;
} else {
// We gave a 5 minutes redundancy when get from session query start
// time under the case that local snapshot from tx_desc and session
// is unusable
snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()
- 5L * 1000L * 1000L * 60L);
MVCC_LOG(INFO, "RC txn with tx_desc while from session start time",
K(MTL_ID()), KPC(tx_desc), KPC(sess_info), K(snapshot_version),
K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time()));
}
} }
MVCC_LOG(DEBUG, "RC txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), MVCC_LOG(DEBUG, "RC txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_)); K(snapshot_version), K(min_active_snapshot_version_), K(desc_snapshot),
K(sess_snapshot));
} else { } else {
MVCC_LOG(WARN, "unknown txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), MVCC_LOG(INFO, "unknown txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_)); K(snapshot_version), K(min_active_snapshot_version_));
} }
} else { } else {
share::SCN sess_snapshot = sess_info->get_reserved_snapshot_version();
if (transaction::ObTxIsolationLevel::SERIAL == sess_info->get_tx_isolation() || if (transaction::ObTxIsolationLevel::SERIAL == sess_info->get_tx_isolation() ||
transaction::ObTxIsolationLevel::RR == sess_info->get_tx_isolation()) { transaction::ObTxIsolationLevel::RR == sess_info->get_tx_isolation()) {
// Case 3: RR/SI with tx desc does not exist, it is not for the scheduler // Case 3: RR/SI with tx desc does not exist or not in tx, it is not for
MVCC_LOG(DEBUG, "RR/SI txn with non tx_desc", K(MTL_ID()), KPC(sess_info), // the current running scheduler
K(snapshot_version), K(min_active_snapshot_version_));
} else if (transaction::ObTxIsolationLevel::RC == sess_info->get_tx_isolation()) {
// Case 4: RC with tx desc does not exist, while the snapshot version is not
// maintained, so we use query start time instead
// TODO(handora.qc): use better snapshot version
if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) { if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) {
snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()); if (sess_snapshot.is_valid()) {
snapshot_version = sess_snapshot;
} else {
// We gave a 5 minutes redundancy when get from session query start
// time under the case that local snapshot from tx_desc and session
// is unusable
snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()
- 5L * 1000L * 1000L * 60L);
MVCC_LOG(INFO, "RR/SI txn with non tx_desc while from session start time",
K(MTL_ID()), KPC(sess_info), K(snapshot_version), K(sess_snapshot),
K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time()));
}
}
MVCC_LOG(DEBUG, "RR/SI txn with non tx_desc", K(MTL_ID()), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_), K(sess_snapshot));
} else if (transaction::ObTxIsolationLevel::RC == sess_info->get_tx_isolation()) {
// Case 4: RC with tx desc does not exist, and the snapshot version may not
// maintained, so we use query start time instead
if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) {
if (sess_snapshot.is_valid()) {
snapshot_version = sess_snapshot;
} else {
// We gave a 5 minutes redundancy when get from session query start
// time under the case that local snapshot from tx_desc and session
// is unusable
snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()
- 5L * 1000L * 1000L * 60L);
MVCC_LOG(INFO, "RC txn with non tx_desc while from session start time",
K(MTL_ID()), KPC(sess_info), K(snapshot_version), K(sess_snapshot),
K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time()));
}
} }
MVCC_LOG(DEBUG, "RC txn with non tx_desc", K(MTL_ID()), KPC(sess_info), MVCC_LOG(DEBUG, "RC txn with non tx_desc", K(MTL_ID()), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_)); K(snapshot_version), K(min_active_snapshot_version_), K(sess_snapshot));
} else { } else {
MVCC_LOG(INFO, "unknown txn with non tx_desc", K(MTL_ID()), KPC(sess_info), MVCC_LOG(INFO, "unknown txn with non tx_desc", K(MTL_ID()), KPC(sess_info),
K(snapshot_version), K(min_active_snapshot_version_)); K(snapshot_version), K(min_active_snapshot_version_));