From b3db035771a0b232233b0a9657873dc69f6b0ef6 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Mon, 17 Jul 2023 13:12:20 +0000 Subject: [PATCH] [master] tx_desc used by px_worker relese without lock protected --- src/observer/ob_inner_sql_connection.cpp | 5 ++++- src/observer/table/ob_table_context.cpp | 4 +++- src/observer/table/ob_table_session_pool.h | 7 +++++-- src/sql/engine/cmd/ob_xa_executor.cpp | 2 +- src/sql/engine/px/ob_px_sqc_proxy.cpp | 1 + src/sql/engine/px/ob_px_task_process.cpp | 2 ++ src/sql/session/ob_basic_session_info.cpp | 3 ++- .../ob_multi_version_garbage_collector.cpp | 2 ++ 8 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index 621647f6b..e6e80a49c 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -168,7 +168,10 @@ ObInnerSQLConnection::~ObInnerSQLConnection() if (OB_SUCC(guard.switch_to(inner_session_.get_tx_desc()->get_tenant_id(), false))) { MTL(transaction::ObTransService*)->release_tx(*inner_session_.get_tx_desc()); } - inner_session_.get_tx_desc() = NULL; + { + ObSQLSessionInfo::LockGuard guard(inner_session_.get_thread_data_lock()); + inner_session_.get_tx_desc() = NULL; + } } } diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index 60a8c71fb..7d6cea698 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -1024,7 +1024,9 @@ int ObTableCtx::init_trans(transaction::ObTxDesc *trans_desc, ret = OB_ERR_UNEXPECTED; LOG_WARN("trans desc is null", K(ret)); } else { - get_session_info().get_tx_desc() = trans_desc; + sql::ObSQLSessionInfo &session = get_session_info(); + sql::ObSQLSessionInfo::LockGuard guard(session.get_thread_data_lock()); + session.get_tx_desc() = trans_desc; exec_ctx_.get_das_ctx().set_snapshot(tx_snapshot); } diff --git a/src/observer/table/ob_table_session_pool.h b/src/observer/table/ob_table_session_pool.h index c4e7cd817..6389d7455 100644 --- a/src/observer/table/ob_table_session_pool.h +++ b/src/observer/table/ob_table_session_pool.h @@ -189,7 +189,10 @@ public: sql::ObSQLSessionInfo& get_sess_info() { return sess_info_; } const sql::ObSQLSessionInfo& get_sess_info() const { return sess_info_; } int init_sess_info(); - void reset_tx_desc() { sess_info_.get_tx_desc() = nullptr; } // 防止异步提交场景在 session 析构的时候 rollback 事务 + void reset_tx_desc() { // 防止异步提交场景在 session 析构的时候 rollback 事务 + sql::ObSQLSessionInfo::LockGuard guard(sess_info_.get_thread_data_lock()); + sess_info_.get_tx_desc() = nullptr; + } void give_back_to_free_list(); private: common::ObArenaAllocator allocator_; @@ -391,4 +394,4 @@ private: } // end namespace table } // end namespace oceanbase -#endif /* OCEANBASE_OBSERVER_OB_TABLE_SESSION_POOL_H_ */ \ No newline at end of file +#endif /* OCEANBASE_OBSERVER_OB_TABLE_SESSION_POOL_H_ */ diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index eeb65ebd0..382fc0fc7 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -62,7 +62,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) stmt.get_format_id()))) { LOG_WARN("set xid error", K(ret), K(stmt)); } else if (my_session->get_in_transaction()) { - ObTxDesc *&tx_desc = my_session->get_tx_desc(); + auto tx_desc = my_session->get_tx_desc(); ret = OB_TRANS_XA_OUTSIDE; LOG_WARN("already start trans", K(ret), K(xid), K(tx_desc->tid()), K(tx_desc->get_xid())); diff --git a/src/sql/engine/px/ob_px_sqc_proxy.cpp b/src/sql/engine/px/ob_px_sqc_proxy.cpp index 952b0b9d3..54f717486 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_proxy.cpp @@ -467,6 +467,7 @@ int ObPxSQCProxy::report(int end_ret) const (void)MTL(transaction::ObTransService*)->merge_tx_state(*sqc_tx_desc, *task_tx_desc); (void)MTL(transaction::ObTransService*)->release_tx(*task_tx_desc); } else { + sql::ObSQLSessionInfo::LockGuard guard(session->get_thread_data_lock()); sqc_tx_desc = task_tx_desc; } task_tx_desc = NULL; diff --git a/src/sql/engine/px/ob_px_task_process.cpp b/src/sql/engine/px/ob_px_task_process.cpp index 2ea52a7d8..ef1fe746e 100644 --- a/src/sql/engine/px/ob_px_task_process.cpp +++ b/src/sql/engine/px/ob_px_task_process.cpp @@ -544,6 +544,8 @@ int ObPxTaskProcess::record_tx_desc() CK (OB_NOT_NULL(cur_exec_ctx = arg_.exec_ctx_)); CK (OB_NOT_NULL(cur_session = cur_exec_ctx->get_my_session())); if (OB_SUCC(ret) && !arg_.sqc_task_ptr_->is_use_local_thread()) { + // move session's tx_desc to task, accumulate when sqc report + ObSQLSessionInfo::LockGuard guard(cur_session->get_thread_data_lock()); transaction::ObTxDesc *&cur_tx_desc = cur_session->get_tx_desc(); if (OB_NOT_NULL(cur_tx_desc)) { transaction::ObTxDesc *&task_tx_desc = arg_.sqc_task_ptr_->get_tx_desc(); diff --git a/src/sql/session/ob_basic_session_info.cpp b/src/sql/session/ob_basic_session_info.cpp index d819e2461..31a5d7701 100644 --- a/src/sql/session/ob_basic_session_info.cpp +++ b/src/sql/session/ob_basic_session_info.cpp @@ -286,6 +286,7 @@ void ObBasicSessionInfo::clean_status() sql_scope_flags_.reset(); trans_spec_status_ = TRANS_SPEC_NOT_SET; if (OB_NOT_NULL(tx_desc_)) { + LockGuard lock_guard(thread_data_mutex_); int ret = OB_SUCCESS; MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); if (OB_SUCC(guard.switch_to(tx_desc_->get_tenant_id(), false))) { @@ -3729,7 +3730,7 @@ int64_t ObBasicSessionInfo::to_string(char *buf, const int64_t buf_len) const bool ac = false; get_autocommit(ac), J_OBJ_START(); - J_KV(KP(this), "id", sessid_, + J_KV(KP(this), "id", sessid_, "deser", is_deserialized_, N_TENANT, get_tenant_name(), "tenant_id", tenant_id_, N_EFFECTIVE_TENANT, get_effective_tenant_name(), "effective_tenant_id", effective_tenant_id_, N_DATABASE, get_database_name(), diff --git a/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp b/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp index 51db719a0..a0887fec7 100644 --- a/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp +++ b/src/storage/concurrency_control/ob_multi_version_garbage_collector.cpp @@ -1344,6 +1344,8 @@ bool GetMinActiveSnapshotVersionFunctor::operator()(sql::ObSQLSessionMgr::Key ke } else if (false == sess_info->is_valid()) { ret = OB_INVALID_ARGUMENT; MVCC_LOG(WARN, "session info is not valid", K(ret)); + } else if (sess_info->get_is_deserialized()) { + // skip deserialized session, only visit the original } else if (MTL_ID() == sess_info->get_effective_tenant_id()) { sql::ObSQLSessionInfo::LockGuard data_lock_guard(sess_info->get_thread_data_lock()); share::SCN snapshot_version(share::SCN::max_scn());