diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index cf34f601d5..f51704afda 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -560,6 +560,16 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx) LOG_WARN("call sql stmt end hook fail", K(tmp_ret)); } } + + if (OB_SUCC(ret) + && !ObSQLUtils::is_nested_sql(&exec_ctx) + && das_ctx.get_snapshot().core_.version_.is_valid()) { + // maintain the read snapshot version on session for multi-version garbage + // colloecor. It is maintained for all cases except remote exection with ac + // = 1. So we need carefully design the version for the corner case. + session->set_reserved_snapshot_version(das_ctx.get_snapshot().core_.version_); + } + bool print_log = false; #ifndef NDEBUG print_log = true; @@ -911,6 +921,10 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback) ret = COVER_SUCC(tmp_ret); } + if (OB_SUCC(ret) && !ObSQLUtils::is_nested_sql(&exec_ctx)) { + session->reset_reserved_snapshot_version(); + } + bool print_log = false; #ifndef NDEBUG print_log = true; diff --git a/src/sql/session/ob_basic_session_info.cpp b/src/sql/session/ob_basic_session_info.cpp index 9e5e5f7cd6..bd4e412d91 100644 --- a/src/sql/session/ob_basic_session_info.cpp +++ b/src/sql/session/ob_basic_session_info.cpp @@ -70,7 +70,7 @@ ObBasicSessionInfo::ObBasicSessionInfo() sys_var_base_version_(OB_INVALID_VERSION), tx_desc_(NULL), tx_result_(), - unused_read_snapshot_version_(), + reserved_read_snapshot_version_(), xid_(), associated_xa_(false), cached_tenant_config_version_(0), @@ -404,7 +404,7 @@ void ObBasicSessionInfo::reset(bool skip_sys_var) sys_var_base_version_ = CACHED_SYS_VAR_VERSION; } curr_trans_last_stmt_end_time_ = 0; - unused_read_snapshot_version_.reset(); + reserved_read_snapshot_version_.reset(); check_sys_variable_ = true; is_foreign_key_cascade_ = false; is_foreign_key_check_exist_ = false; @@ -4258,7 +4258,7 @@ OB_DEF_SERIALIZE(ObBasicSessionInfo) nested_count_, thread_data_.user_name_, next_tx_isolation_, - unused_read_snapshot_version_, + reserved_read_snapshot_version_, check_sys_variable_, unused_weak_read_snapshot_source, database_id_, @@ -4453,7 +4453,7 @@ OB_DEF_DESERIALIZE(ObBasicSessionInfo) nested_count_, thread_data_.user_name_, next_tx_isolation_, - unused_read_snapshot_version_, + reserved_read_snapshot_version_, check_sys_variable_, unused_weak_read_snapshot_source, database_id_, @@ -4770,7 +4770,7 @@ OB_DEF_SERIALIZE_SIZE(ObBasicSessionInfo) nested_count_, thread_data_.user_name_, next_tx_isolation_, - unused_read_snapshot_version_, + reserved_read_snapshot_version_, check_sys_variable_, unused_weak_read_snapshot_source, database_id_, diff --git a/src/sql/session/ob_basic_session_info.h b/src/sql/session/ob_basic_session_info.h index bc8d422361..e322e6656c 100644 --- a/src/sql/session/ob_basic_session_info.h +++ b/src/sql/session/ob_basic_session_info.h @@ -1237,6 +1237,9 @@ public: void set_is_in_user_scope(bool value) { sql_scope_flags_.set_is_in_user_scope(value); } bool is_in_user_scope() const { return sql_scope_flags_.is_in_user_scope(); } SqlScopeFlags &get_sql_scope_flags() { return sql_scope_flags_; } + share::SCN get_reserved_snapshot_version() const { return reserved_read_snapshot_version_; } + void set_reserved_snapshot_version(const share::SCN snapshot_version) { reserved_read_snapshot_version_ = snapshot_version; } + void reset_reserved_snapshot_version() { reserved_read_snapshot_version_.reset(); } bool get_check_sys_variable() { return check_sys_variable_; } void set_check_sys_variable(bool check_sys_variable) { check_sys_variable_ = check_sys_variable; } @@ -1990,7 +1993,12 @@ private: protected: transaction::ObTxDesc *tx_desc_; transaction::ObTxExecResult tx_result_; // TODO: move to QueryCtx/ExecCtx - share::SCN unused_read_snapshot_version_;//serialize compatibility preserved + // reserved read snapshot version for current or previous stmt in the txn. And + // it is used for multi-version garbage colloector to collect ative snapshot. + // While it may be empty for the txn with ac = 1 and remote execution whose + // snapshot version is generated from remote server(called by start_stmt). So + // use it only query is active and version is valid. + share::SCN reserved_read_snapshot_version_; transaction::ObXATransID xid_; bool associated_xa_; // session joined distr-xa-trans by xa-start int64_t cached_tenant_config_version_; diff --git a/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp b/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp index d52138e33f..9cf2a26da0 100644 --- a/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp +++ b/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp @@ -1347,46 +1347,91 @@ bool GetMinActiveSnapshotVersionFunctor::operator()(sql::ObSQLSessionMgr::Key ke share::SCN snapshot_version(share::SCN::max_scn()); if (sess_info->is_in_transaction()) { + share::SCN desc_snapshot; transaction::ObTxDesc *tx_desc = nullptr; + share::SCN sess_snapshot = sess_info->get_reserved_snapshot_version(); if (OB_ISNULL(tx_desc = sess_info->get_tx_desc())) { - MVCC_LOG(WARN, "tx desc is nullptr", KPC(sess_info)); + ret = OB_ERR_UNEXPECTED; + MVCC_LOG(ERROR, "tx desc is nullptr", K(ret), KPC(sess_info)); + } else if (FALSE_IT(desc_snapshot = tx_desc->get_snapshot_version())) { } else if (transaction::ObTxIsolationLevel::SERIAL == tx_desc->get_isolation_level() || transaction::ObTxIsolationLevel::RR == tx_desc->get_isolation_level()) { - // Case 1: RR/SI with tx desc exists, so we can get snapshot version from tx desc - share::SCN tmp_snapshot_version = tx_desc->get_snapshot_version(); - if (tmp_snapshot_version.is_valid()) { - snapshot_version = tmp_snapshot_version; + // Case 1: RR/SI with tx desc exists, it means the snapshot is get from + // scheduler and must maintained in the session and tx desc + if (desc_snapshot.is_valid()) { + snapshot_version = desc_snapshot; } MVCC_LOG(DEBUG, "RR/SI txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), - K(snapshot_version), K(min_active_snapshot_version_)); + K(snapshot_version), K(min_active_snapshot_version_), K(desc_snapshot), + K(sess_snapshot)); } else if (transaction::ObTxIsolationLevel::RC == tx_desc->get_isolation_level()) { - // Case 2: RC with tx desc exists, while the snapshot version is not - // maintained, so we use query start time instead - // TODO(handora.qc): use better snapshot version + // Case 2: RC with tx desc exists, it may exists that snapshot is get from + // the executor and not maintained in the session and tx desc. So we need + // use session query start time carefully if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) { - snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()); + if (desc_snapshot.is_valid()) { + snapshot_version = desc_snapshot; + } else if (sess_snapshot.is_valid()) { + snapshot_version = sess_snapshot; + } else { + // We gave a 5 minutes redundancy when get from session query start + // time under the case that local snapshot from tx_desc and session + // is unusable + snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time() + - 5L * 1000L * 1000L * 60L); + MVCC_LOG(INFO, "RC txn with tx_desc while from session start time", + K(MTL_ID()), KPC(tx_desc), KPC(sess_info), K(snapshot_version), + K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time())); + } } MVCC_LOG(DEBUG, "RC txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), - K(snapshot_version), K(min_active_snapshot_version_)); + K(snapshot_version), K(min_active_snapshot_version_), K(desc_snapshot), + K(sess_snapshot)); } else { - MVCC_LOG(WARN, "unknown txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), + MVCC_LOG(INFO, "unknown txn with tx_desc", K(MTL_ID()), KPC(tx_desc), KPC(sess_info), K(snapshot_version), K(min_active_snapshot_version_)); } } else { + share::SCN sess_snapshot = sess_info->get_reserved_snapshot_version(); if (transaction::ObTxIsolationLevel::SERIAL == sess_info->get_tx_isolation() || transaction::ObTxIsolationLevel::RR == sess_info->get_tx_isolation()) { - // Case 3: RR/SI with tx desc does not exist, it is not for the scheduler - MVCC_LOG(DEBUG, "RR/SI txn with non tx_desc", K(MTL_ID()), KPC(sess_info), - K(snapshot_version), K(min_active_snapshot_version_)); - } else if (transaction::ObTxIsolationLevel::RC == sess_info->get_tx_isolation()) { - // Case 4: RC with tx desc does not exist, while the snapshot version is not - // maintained, so we use query start time instead - // TODO(handora.qc): use better snapshot version + // Case 3: RR/SI with tx desc does not exist or not in tx, it is not for + // the current running scheduler if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) { - snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time()); + if (sess_snapshot.is_valid()) { + snapshot_version = sess_snapshot; + } else { + // We gave a 5 minutes redundancy when get from session query start + // time under the case that local snapshot from tx_desc and session + // is unusable + snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time() + - 5L * 1000L * 1000L * 60L); + MVCC_LOG(INFO, "RR/SI txn with non tx_desc while from session start time", + K(MTL_ID()), KPC(sess_info), K(snapshot_version), K(sess_snapshot), + K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time())); + } + } + MVCC_LOG(DEBUG, "RR/SI txn with non tx_desc", K(MTL_ID()), KPC(sess_info), + K(snapshot_version), K(min_active_snapshot_version_), K(sess_snapshot)); + } else if (transaction::ObTxIsolationLevel::RC == sess_info->get_tx_isolation()) { + // Case 4: RC with tx desc does not exist, and the snapshot version may not + // maintained, so we use query start time instead + if (sql::ObSQLSessionState::QUERY_ACTIVE == sess_info->get_session_state()) { + if (sess_snapshot.is_valid()) { + snapshot_version = sess_snapshot; + } else { + // We gave a 5 minutes redundancy when get from session query start + // time under the case that local snapshot from tx_desc and session + // is unusable + snapshot_version.convert_from_ts(sess_info->get_cur_state_start_time() + - 5L * 1000L * 1000L * 60L); + MVCC_LOG(INFO, "RC txn with non tx_desc while from session start time", + K(MTL_ID()), KPC(sess_info), K(snapshot_version), K(sess_snapshot), + K(min_active_snapshot_version_), K(sess_info->get_cur_state_start_time())); + } } MVCC_LOG(DEBUG, "RC txn with non tx_desc", K(MTL_ID()), KPC(sess_info), - K(snapshot_version), K(min_active_snapshot_version_)); + K(snapshot_version), K(min_active_snapshot_version_), K(sess_snapshot)); } else { MVCC_LOG(INFO, "unknown txn with non tx_desc", K(MTL_ID()), KPC(sess_info), K(snapshot_version), K(min_active_snapshot_version_));