From 227a6f35c8516eda51ec9836f293a1841f49e94a Mon Sep 17 00:00:00 2001 From: fengdeyiji <546976189@qq.com> Date: Tue, 9 Jul 2024 11:20:03 +0000 Subject: [PATCH] [DeadLock] add associated_session_id to trans_part_ctx --- src/storage/tablet/ob_tablet_transfer_tx_ctx.h | 1 + src/storage/tx/ob_trans_ctx.h | 3 +++ src/storage/tx/ob_trans_ctx_mgr_v4.cpp | 2 ++ src/storage/tx/ob_trans_ctx_mgr_v4.h | 6 +++++- src/storage/tx/ob_trans_part_ctx.cpp | 5 +++++ src/storage/tx/ob_trans_part_ctx.h | 4 +++- src/storage/tx/ob_trans_service_v4.cpp | 1 + src/storage/tx/ob_tx_log.cpp | 3 ++- src/storage/tx/ob_tx_log.h | 7 ++++++- src/storage/tx/ob_tx_replay_executor.cpp | 1 + src/storage/tx_table/ob_tx_ctx_table.cpp | 1 + unittest/storage/tx/test_ob_tx_log.cpp | 1 + 12 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/storage/tablet/ob_tablet_transfer_tx_ctx.h b/src/storage/tablet/ob_tablet_transfer_tx_ctx.h index a8a4fc8998..ccf48d57ec 100644 --- a/src/storage/tablet/ob_tablet_transfer_tx_ctx.h +++ b/src/storage/tablet/ob_tablet_transfer_tx_ctx.h @@ -31,6 +31,7 @@ public: transaction::ObTransID tx_id_; int64_t epoch_; uint32_t session_id_; + uint32_t associated_session_id_; transaction::ObTxState tx_state_; share::SCN trans_version_; share::SCN prepare_version_; diff --git a/src/storage/tx/ob_trans_ctx.h b/src/storage/tx/ob_trans_ctx.h index 3ffcb2558a..33bd5ecc18 100644 --- a/src/storage/tx/ob_trans_ctx.h +++ b/src/storage/tx/ob_trans_ctx.h @@ -91,6 +91,7 @@ public: trans_service_(NULL), tlog_(NULL), cluster_version_(0), ls_tx_ctx_mgr_(NULL), session_id_(UINT32_MAX), + associated_session_id_(UINT32_MAX), stc_(0), part_trans_action_(ObPartTransAction::UNKNOWN), callback_scheduler_on_clear_(false), pending_callback_param_(common::OB_SUCCESS), p_mt_ctx_(NULL), @@ -137,6 +138,7 @@ public: bool contain(const ObTransID trans_id) { return trans_id_ == trans_id; } int set_session_id(const uint32_t session_id) { session_id_ = session_id; return common::OB_SUCCESS; } uint32_t get_session_id() const { return session_id_; } + uint32_t get_associated_session_id() const { return associated_session_id_; } void before_unlock(CtxLockArg &arg); void after_unlock(CtxLockArg &arg); public: @@ -244,6 +246,7 @@ protected: uint64_t cluster_version_; ObLSTxCtxMgr *ls_tx_ctx_mgr_; uint32_t session_id_; + uint32_t associated_session_id_;// associated session id in distributed scenario // set stc only by set_stc_xxx, and // get stc only by get_stc_() MonotonicTs stc_; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index 8695ead464..15b3a9a46f 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -362,6 +362,7 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg, if (OB_FAIL(tmp->init(arg.tenant_id_, arg.scheduler_, arg.session_id_, + arg.associated_session_id_, arg.tx_id_, arg.trans_expired_time_, arg.ls_id_, @@ -2739,6 +2740,7 @@ int ObLSTxCtxMgr::move_tx_op(const ObTransferMoveTxParam &move_tx_param, arg.cluster_id_, arg.cluster_version_, arg.session_id_, + arg.associated_session_id_, arg.scheduler_, INT64_MAX, // tx expired time txs_, diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index e88fc7d21d..e116f82a9c 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -101,6 +101,7 @@ struct ObTxCreateArg const uint64_t cluster_id, const uint64_t cluster_version, const uint32_t session_id, + const uint32_t associated_session_id, const common::ObAddr &scheduler, const int64_t trans_expired_time, ObTransService *trans_service, @@ -115,6 +116,7 @@ struct ObTxCreateArg cluster_id_(cluster_id), cluster_version_(cluster_version), session_id_(session_id), + associated_session_id_(associated_session_id), scheduler_(scheduler), trans_expired_time_(trans_expired_time), trans_service_(trans_service), @@ -131,7 +133,8 @@ struct ObTxCreateArg TO_STRING_KV(K_(for_replay), "ctx_source", to_str_ctx_source(ctx_source_), K_(tenant_id), K_(tx_id), K_(ls_id), K_(cluster_id), K_(cluster_version), - K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service), + K_(session_id), K_(associated_session_id), + K_(scheduler), K_(trans_expired_time), KP_(trans_service), K_(epoch), K_(xid)); bool for_replay_; PartCtxSource ctx_source_; @@ -141,6 +144,7 @@ struct ObTxCreateArg uint64_t cluster_id_; uint64_t cluster_version_; uint32_t session_id_; + uint32_t associated_session_id_; const common::ObAddr &scheduler_; int64_t trans_expired_time_; ObTransService *trans_service_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 655bee340f..bfbe3a209d 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -84,6 +84,7 @@ static void statistics_for_standby() int ObPartTransCtx::init(const uint64_t tenant_id, const common::ObAddr &scheduler, const uint32_t session_id, + const uint32_t associated_session_id, const ObTransID &trans_id, const int64_t trans_expired_time, const ObLSID &ls_id, @@ -138,6 +139,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id, if (OB_SUCC(ret)) { tenant_id_ = tenant_id; session_id_ = session_id; + associated_session_id_ = associated_session_id; addr_ = trans_service->get_server(); trans_id_ = trans_id; trans_expired_time_ = trans_expired_time; @@ -339,6 +341,7 @@ void ObPartTransCtx::default_init_() request_id_ = OB_INVALID_TIMESTAMP; session_id_ = 0; + associated_session_id_ = 0; timeout_task_.reset(); trace_info_.reset(); can_elr_ = false; @@ -3402,6 +3405,7 @@ int ObPartTransCtx::submit_redo_active_info_log_() TRANS_LOG(WARN, "reuse log block failed", KR(ret), K(*this)); } else { ObTxActiveInfoLog active_info_log(exec_info_.scheduler_, exec_info_.trans_type_, session_id_, + associated_session_id_, trace_info_.get_app_trace_id(), mt_ctx_.get_min_table_version(), can_elr_, addr_, // @@ -9986,6 +9990,7 @@ int ObPartTransCtx::collect_tx_ctx(const ObLSID dest_ls_id, // must differ with src epoch bacause of may transfer back arg.epoch_ = epoch_ | (ObTimeUtility::current_time_ns() & ~(0xFFFFUL << 48)); arg.session_id_ = session_id_; + arg.associated_session_id_ = associated_session_id_; arg.trans_version_ = mt_ctx_.get_trans_version(); arg.prepare_version_ = exec_info_.prepare_version_; arg.commit_version_ = get_commit_version(); diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index d6b1355571..3af85819a5 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -182,7 +182,8 @@ public: void destroy(); int init(const uint64_t tenant_id, const common::ObAddr &scheduler, - const uint32_t session_id_, + const uint32_t session_id, + const uint32_t associated_session_id, const ObTransID &trans_id, const int64_t trans_expired_time, const share::ObLSID &ls_id, @@ -291,6 +292,7 @@ private: ON_DEMAND_TO_STRING_KV_("self_ls_id", ls_id_, K_(session_id), + K_(associated_session_id), K_(part_trans_action), K_(pending_write), "2pc_role", diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 903769ef00..6d6adfea79 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1318,6 +1318,7 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, tx.cluster_id_, tx.cluster_version_, tx.sess_id_, /*session_id*/ + tx.assoc_sess_id_, /*associated_session_id*/ tx.addr_, tx.get_expire_ts(), this, diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index 2954f3eb8c..ea16a266cb 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -299,7 +299,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxActiveInfoLog, /* 16 */ cluster_version_, /* 17 */ max_submitted_seq_no_, /* 18 */ xid_, - /* 19 */ serial_final_seq_no_); + /* 19 */ serial_final_seq_no_, + /* 20 */ associated_session_id_); OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog, compat_bytes_, diff --git a/src/storage/tx/ob_tx_log.h b/src/storage/tx/ob_tx_log.h index 9691df5186..92471bdd27 100644 --- a/src/storage/tx/ob_tx_log.h +++ b/src/storage/tx/ob_tx_log.h @@ -658,7 +658,7 @@ class ObTxActiveInfoLog public: ObTxActiveInfoLog(ObTxActiveInfoLogTempRef &temp_ref) - : scheduler_(temp_ref.scheduler_), trans_type_(TransType::SP_TRANS), session_id_(0), + : scheduler_(temp_ref.scheduler_), trans_type_(TransType::SP_TRANS), session_id_(0), associated_session_id_(0), app_trace_id_str_(temp_ref.app_trace_id_str_), schema_version_(0), can_elr_(false), proposal_leader_(temp_ref.proposal_leader_), cur_query_start_time_(0), is_sub2pc_(false), is_dup_tx_(false), tx_expired_time_(0), epoch_(0), last_op_sn_(0), first_seq_no_(), @@ -670,6 +670,7 @@ public: ObTxActiveInfoLog(common::ObAddr &scheduler, int trans_type, int session_id, + uint32_t associated_session_id, common::ObString &app_trace_id_str, int64_t schema_version, bool elr, @@ -687,6 +688,7 @@ public: const ObXATransID &xid, ObTxSEQ serial_final_seq_no) : scheduler_(scheduler), trans_type_(trans_type), session_id_(session_id), + associated_session_id_(associated_session_id), app_trace_id_str_(app_trace_id_str), schema_version_(schema_version), can_elr_(elr), proposal_leader_(proposal_leader), cur_query_start_time_(cur_query_start_time), is_sub2pc_(is_sub2pc), is_dup_tx_(is_dup_tx), tx_expired_time_(tx_expired_time), @@ -700,6 +702,7 @@ public: const common::ObAddr &get_scheduler() const { return scheduler_; } int get_trans_type() const { return trans_type_; } int get_session_id() const { return session_id_; } + int get_associated_session_id() const { return associated_session_id_; } const common::ObString &get_app_trace_id() const { return app_trace_id_str_; } const int64_t &get_schema_version() { return schema_version_; } bool is_elr() const { return can_elr_; } @@ -724,6 +727,7 @@ public: K(scheduler_), K(trans_type_), K(session_id_), + K(associated_session_id_), K(app_trace_id_str_), K(schema_version_), K(can_elr_), @@ -749,6 +753,7 @@ private: common::ObAddr &scheduler_; int trans_type_; int session_id_; + uint32_t associated_session_id_; common::ObString &app_trace_id_str_; int64_t schema_version_; bool can_elr_; diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 55a46cd664..22dec53465 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -296,6 +296,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_() log_block_.get_header().get_org_cluster_id(), cluster_version, 0, /*session_id*/ + 0, /*associated_session_id*/ scheduler, INT64_MAX, /*trans_expired_time_*/ ls_tx_srv_->get_trans_service()); diff --git a/src/storage/tx_table/ob_tx_ctx_table.cpp b/src/storage/tx_table/ob_tx_ctx_table.cpp index b3c134a097..255b03b929 100644 --- a/src/storage/tx_table/ob_tx_ctx_table.cpp +++ b/src/storage/tx_table/ob_tx_ctx_table.cpp @@ -114,6 +114,7 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls ctx_info.cluster_id_, /* cluster_id */ cluster_version, 0, /*session_id*/ + 0, /*associated_session_id*/ scheduler, INT64_MAX, MTL(ObTransService*)); diff --git a/unittest/storage/tx/test_ob_tx_log.cpp b/unittest/storage/tx/test_ob_tx_log.cpp index a67fb74db4..8eebecc637 100644 --- a/unittest/storage/tx/test_ob_tx_log.cpp +++ b/unittest/storage/tx/test_ob_tx_log.cpp @@ -200,6 +200,7 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) ObTxActiveInfoLog fill_active_state(TEST_ADDR, TEST_TRANS_TYPE, TEST_SESSION_ID, + 0, TEST_TRACE_ID_STR, TEST_SCHEMA_VERSION, TEST_CAN_ELR,