[master] tx_desc used by px_worker relese without lock protected

This commit is contained in:
chinaxing 2023-07-17 13:12:20 +00:00 committed by ob-robot
parent a1d13b8c78
commit b3db035771
8 changed files with 20 additions and 6 deletions

View File

@ -168,7 +168,10 @@ ObInnerSQLConnection::~ObInnerSQLConnection()
if (OB_SUCC(guard.switch_to(inner_session_.get_tx_desc()->get_tenant_id(), false))) {
MTL(transaction::ObTransService*)->release_tx(*inner_session_.get_tx_desc());
}
inner_session_.get_tx_desc() = NULL;
{
ObSQLSessionInfo::LockGuard guard(inner_session_.get_thread_data_lock());
inner_session_.get_tx_desc() = NULL;
}
}
}

View File

@ -1024,7 +1024,9 @@ int ObTableCtx::init_trans(transaction::ObTxDesc *trans_desc,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("trans desc is null", K(ret));
} else {
get_session_info().get_tx_desc() = trans_desc;
sql::ObSQLSessionInfo &session = get_session_info();
sql::ObSQLSessionInfo::LockGuard guard(session.get_thread_data_lock());
session.get_tx_desc() = trans_desc;
exec_ctx_.get_das_ctx().set_snapshot(tx_snapshot);
}

View File

@ -189,7 +189,10 @@ public:
sql::ObSQLSessionInfo& get_sess_info() { return sess_info_; }
const sql::ObSQLSessionInfo& get_sess_info() const { return sess_info_; }
int init_sess_info();
void reset_tx_desc() { sess_info_.get_tx_desc() = nullptr; } // 防止异步提交场景在 session 析构的时候 rollback 事务
void reset_tx_desc() { // 防止异步提交场景在 session 析构的时候 rollback 事务
sql::ObSQLSessionInfo::LockGuard guard(sess_info_.get_thread_data_lock());
sess_info_.get_tx_desc() = nullptr;
}
void give_back_to_free_list();
private:
common::ObArenaAllocator allocator_;
@ -391,4 +394,4 @@ private:
} // end namespace table
} // end namespace oceanbase
#endif /* OCEANBASE_OBSERVER_OB_TABLE_SESSION_POOL_H_ */
#endif /* OCEANBASE_OBSERVER_OB_TABLE_SESSION_POOL_H_ */

View File

@ -62,7 +62,7 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt)
stmt.get_format_id()))) {
LOG_WARN("set xid error", K(ret), K(stmt));
} else if (my_session->get_in_transaction()) {
ObTxDesc *&tx_desc = my_session->get_tx_desc();
auto tx_desc = my_session->get_tx_desc();
ret = OB_TRANS_XA_OUTSIDE;
LOG_WARN("already start trans", K(ret), K(xid), K(tx_desc->tid()),
K(tx_desc->get_xid()));

View File

@ -467,6 +467,7 @@ int ObPxSQCProxy::report(int end_ret) const
(void)MTL(transaction::ObTransService*)->merge_tx_state(*sqc_tx_desc, *task_tx_desc);
(void)MTL(transaction::ObTransService*)->release_tx(*task_tx_desc);
} else {
sql::ObSQLSessionInfo::LockGuard guard(session->get_thread_data_lock());
sqc_tx_desc = task_tx_desc;
}
task_tx_desc = NULL;

View File

@ -544,6 +544,8 @@ int ObPxTaskProcess::record_tx_desc()
CK (OB_NOT_NULL(cur_exec_ctx = arg_.exec_ctx_));
CK (OB_NOT_NULL(cur_session = cur_exec_ctx->get_my_session()));
if (OB_SUCC(ret) && !arg_.sqc_task_ptr_->is_use_local_thread()) {
// move session's tx_desc to task, accumulate when sqc report
ObSQLSessionInfo::LockGuard guard(cur_session->get_thread_data_lock());
transaction::ObTxDesc *&cur_tx_desc = cur_session->get_tx_desc();
if (OB_NOT_NULL(cur_tx_desc)) {
transaction::ObTxDesc *&task_tx_desc = arg_.sqc_task_ptr_->get_tx_desc();

View File

@ -286,6 +286,7 @@ void ObBasicSessionInfo::clean_status()
sql_scope_flags_.reset();
trans_spec_status_ = TRANS_SPEC_NOT_SET;
if (OB_NOT_NULL(tx_desc_)) {
LockGuard lock_guard(thread_data_mutex_);
int ret = OB_SUCCESS;
MAKE_TENANT_SWITCH_SCOPE_GUARD(guard);
if (OB_SUCC(guard.switch_to(tx_desc_->get_tenant_id(), false))) {
@ -3729,7 +3730,7 @@ int64_t ObBasicSessionInfo::to_string(char *buf, const int64_t buf_len) const
bool ac = false;
get_autocommit(ac),
J_OBJ_START();
J_KV(KP(this), "id", sessid_,
J_KV(KP(this), "id", sessid_, "deser", is_deserialized_,
N_TENANT, get_tenant_name(), "tenant_id", tenant_id_,
N_EFFECTIVE_TENANT, get_effective_tenant_name(), "effective_tenant_id", effective_tenant_id_,
N_DATABASE, get_database_name(),

View File

@ -1344,6 +1344,8 @@ bool GetMinActiveSnapshotVersionFunctor::operator()(sql::ObSQLSessionMgr::Key ke
} else if (false == sess_info->is_valid()) {
ret = OB_INVALID_ARGUMENT;
MVCC_LOG(WARN, "session info is not valid", K(ret));
} else if (sess_info->get_is_deserialized()) {
// skip deserialized session, only visit the original
} else if (MTL_ID() == sess_info->get_effective_tenant_id()) {
sql::ObSQLSessionInfo::LockGuard data_lock_guard(sess_info->get_thread_data_lock());
share::SCN snapshot_version(share::SCN::max_scn());