From fd54ded76c926d8d06729d9afc9ecf4a11979407 Mon Sep 17 00:00:00 2001 From: felix-w15 <806547150@qq.com> Date: Tue, 27 Feb 2024 09:44:46 +0000 Subject: [PATCH] cache data version in session for performance optimize --- src/observer/table/ob_table_rpc_processor.cpp | 4 ++-- src/sql/dblink/ob_tm_service.cpp | 2 +- src/sql/engine/cmd/ob_xa_executor.cpp | 3 ++- src/sql/ob_sql_trans_control.cpp | 18 +++++++++------ src/sql/ob_sql_trans_control.h | 5 ++++- src/sql/session/ob_sql_session_info.cpp | 7 ++++++ src/sql/session/ob_sql_session_info.h | 8 +++++++ src/storage/tx/ob_trans_service_v4.h | 6 +++-- src/storage/tx/ob_tx_api.cpp | 22 ++++++++++++------- src/storage/tx/ob_tx_api.h | 7 +++--- src/storage/tx/ob_tx_free_route.cpp | 18 +++++++++------ src/storage/tx/ob_tx_free_route_api.h | 2 +- src/storage/tx/ob_xa_dblink_service.cpp | 10 +++++---- src/storage/tx/ob_xa_service.cpp | 10 +++++---- src/storage/tx/ob_xa_service.h | 12 ++++++---- unittest/storage/tx/it/test_tx_free_route.cpp | 2 +- 16 files changed, 90 insertions(+), 46 deletions(-) diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index 7ddb8a701..7d02a8187 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -394,7 +394,7 @@ int ObTableApiProcessorBase::init_read_trans(const ObTableConsistencyLevel consi bool strong_read = ObTableConsistencyLevel::STRONG == consistency_level; transaction::ObTransService *txs = MTL(transaction::ObTransService*); - if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid()))) { + if (OB_FAIL(txs->acquire_tx(trans_desc_, session().get_sessid(), session().get_data_version()))) { LOG_WARN("failed to acquire tx desc", K(ret)); } else if (OB_FAIL(setup_tx_snapshot_(*trans_desc_, tx_snapshot_, strong_read, ls_id, timeout_ts))) { LOG_WARN("setup txn snapshot fail", K(ret), KPC_(trans_desc), K(strong_read), K(ls_id), K(timeout_ts)); @@ -485,7 +485,7 @@ int ObTableApiProcessorBase::start_trans_(bool is_readonly, ret = OB_ERR_UNEXPECTED; LOG_ERROR("start_trans is executed", K(ret)); } else { - if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid()))) { + if (OB_FAIL(txs->acquire_tx(trans_desc, session().get_sessid(), session().get_data_version()))) { LOG_WARN("failed to acquire tx desc", K(ret)); } else if (OB_FAIL(txs->start_tx(*trans_desc, tx_param))) { LOG_WARN("failed to start trans", K(ret), KPC(trans_desc)); diff --git a/src/sql/dblink/ob_tm_service.cpp b/src/sql/dblink/ob_tm_service.cpp index eb0f521aa..7db6deb62 100644 --- a/src/sql/dblink/ob_tm_service.cpp +++ b/src/sql/dblink/ob_tm_service.cpp @@ -106,7 +106,7 @@ int ObTMService::tm_rm_start(ObExecContext &exec_ctx, tx_param.timeout_us_ = tx_timeout; tx_param.lock_timeout_us_ = my_session->get_trx_lock_timeout(); if (OB_FAIL(xa_service->xa_start_for_tm(0, timeout_seconds, my_session->get_sessid(), - tx_param, tx_desc, xid))) { + tx_param, tx_desc, xid, my_session->get_data_version()))) { LOG_WARN("xa start for dblink failed", K(ret), K(tx_param)); // TODO, reset my_session->reset_first_need_txn_stmt_type(); diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index 8ceb8b86f..4687f1857 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -99,7 +99,8 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) my_session->get_xa_end_timeout_seconds(), my_session->get_sessid(), tx_param, - tx_desc))) { + tx_desc, + my_session->get_data_version()))) { LOG_WARN("xa start failed", K(ret), K(tx_param)); my_session->reset_tx_variable(); my_session->set_early_lock_release(false); diff --git a/src/sql/ob_sql_trans_control.cpp b/src/sql/ob_sql_trans_control.cpp index 6046fee42..bd004c412 100644 --- a/src/sql/ob_sql_trans_control.cpp +++ b/src/sql/ob_sql_trans_control.cpp @@ -171,7 +171,7 @@ int ObSqlTransControl::explicit_start_trans(ObExecContext &ctx, const bool read_ ObTxParam &tx_param = plan_ctx->get_trans_param(); OZ (build_tx_param_(session, tx_param, &read_only)); - OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid())); + OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid(), session->get_data_version())); OZ (txs->start_tx(*session->get_tx_desc(), tx_param), tx_param); OX (tx_id = session->get_tx_desc()->get_tx_id()); @@ -1220,7 +1220,10 @@ int ObSqlTransControl::get_trans_result(ObExecContext &exec_ctx) return get_trans_result(exec_ctx, exec_ctx.get_my_session()->get_trans_result()); } -int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session, bool reuse_tx_desc, bool reset_trans_variable) +int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session, + bool reuse_tx_desc, + bool reset_trans_variable, + const uint64_t data_version) { int ret = OB_SUCCESS; LOG_DEBUG("reset session tx state", KPC(session->get_tx_desc()), K(lbt())); @@ -1233,7 +1236,7 @@ int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session, bool transaction::ObTransService *txs = NULL; OZ (get_tx_service(session, txs), *session, tx_desc); if (reuse_tx_desc) { - if (OB_FAIL(txs->reuse_tx(tx_desc))) { + if (OB_FAIL(txs->reuse_tx(tx_desc, data_version))) { LOG_ERROR("reuse txn descriptor fail, will release it", K(ret), KPC(session), K(tx_desc)); OZ (txs->release_tx(tx_desc), tx_id); session->get_tx_desc() = NULL; @@ -1262,7 +1265,8 @@ int ObSqlTransControl::reset_session_tx_state(ObSQLSessionInfo *session, bool re LOG_WARN_RET(temp_ret, "trx level temporary table clean failed", KR(temp_ret)); } } - int ret = reset_session_tx_state(static_cast(session), reuse_tx_desc, reset_trans_variable); + int ret = reset_session_tx_state(static_cast(session), reuse_tx_desc, + reset_trans_variable, session->get_data_version()); return COVER_SUCC(temp_ret); } @@ -1291,7 +1295,7 @@ int ObSqlTransControl::acquire_tx_if_need_(transaction::ObTransService *txs, ObS { int ret = OB_SUCCESS; if (OB_ISNULL(session.get_tx_desc())) { - OZ (txs->acquire_tx(session.get_tx_desc(), session.get_sessid()), session); + OZ (txs->acquire_tx(session.get_tx_desc(), session.get_sessid(), session.get_data_version()), session); } return ret; } @@ -1313,7 +1317,7 @@ int ObSqlTransControl::lock_table(ObExecContext &exec_ctx, OZ (get_tx_service(session, txs)); OZ (get_lock_service(session->get_effective_tenant_id(), lock_service)); if (OB_SUCC(ret) && OB_ISNULL(session->get_tx_desc())) { - OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid()), *session); + OZ (txs->acquire_tx(session->get_tx_desc(), session->get_sessid(), session->get_data_version()), *session); } ObTxParam tx_param; OZ (build_tx_param_(session, tx_param)); @@ -1442,7 +1446,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id, bool has_tx_desc = OB_NOT_NULL(tx_desc); \ transaction::ObTransID prev_tx_id; \ if (has_tx_desc) { prev_tx_id = session.get_tx_id(); } \ - OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos), session); \ + OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos, session.get_data_version()), session); \ if (OB_SUCC(ret) && has_tx_desc && (OB_ISNULL(tx_desc) || tx_desc->get_tx_id() != prev_tx_id)) { \ session.reset_tx_variable(); \ } \ diff --git a/src/sql/ob_sql_trans_control.h b/src/sql/ob_sql_trans_control.h index 24e10db7e..d1d3afc26 100644 --- a/src/sql/ob_sql_trans_control.h +++ b/src/sql/ob_sql_trans_control.h @@ -168,7 +168,10 @@ class ObSqlTransControl { public: static int reset_session_tx_state(ObSQLSessionInfo *session, bool reuse_tx_desc = false, bool active_tx_end = true); - static int reset_session_tx_state(ObBasicSessionInfo *session, bool reuse_tx_desc = false, bool active_tx_end = true); + static int reset_session_tx_state(ObBasicSessionInfo *session, + bool reuse_tx_desc = false, + bool active_tx_end = true, + const uint64_t data_version = 0); static int create_stash_savepoint(ObExecContext &exec_ctx, const ObString &name); static int release_stash_savepoint(ObExecContext &exec_ctx, const ObString &name); static int explicit_start_trans(ObExecContext &exec_ctx, const bool read_only, const ObString hint = ObString()); diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index e99bab3a4..094dcd4ab 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -2791,6 +2791,13 @@ void ObSQLSessionInfo::ObCachedTenantConfigInfo::refresh() LOG_DEBUG("refresh tenant config where tenant changed", K_(saved_tenant_info), K(effective_tenant_id)); ATOMIC_STORE(&saved_tenant_info_, effective_tenant_id); + } + // 缓存data version 用于性能优化 + uint64_t data_version = 0; + if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(effective_tenant_id, data_version))) { + LOG_WARN_RET(tmp_ret, "get data version fail", "ret", tmp_ret, K(effective_tenant_id)); + } else { + ATOMIC_STORE(&data_version_, data_version); } // 1.是否支持外部一致性 is_external_consistent_ = transaction::ObTsMgr::get_instance().is_external_consistent(effective_tenant_id); diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index 7a7ae0f86..7487e4c86 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -689,6 +689,7 @@ public: at_type_(ObAuditTrailType::NONE), sort_area_size_(128*1024*1024), hash_area_size_(128*1024*1024), + data_version_(0), enable_query_response_time_stats_(false), enable_user_defined_rewrite_rules_(false), range_optimizer_max_mem_size_(128*1024*1024), @@ -708,6 +709,7 @@ public: ObAuditTrailType get_at_type() const { return at_type_; } int64_t get_sort_area_size() const { return ATOMIC_LOAD(&sort_area_size_); } int64_t get_hash_area_size() const { return ATOMIC_LOAD(&hash_area_size_); } + uint64_t get_data_version() const { return ATOMIC_LOAD(&data_version_); } bool enable_query_response_time_stats() const { return enable_query_response_time_stats_; } bool enable_udr() const { return ATOMIC_LOAD(&enable_user_defined_rewrite_rules_); } int64_t get_print_sample_ppm() const { return ATOMIC_LOAD(&print_sample_ppm_); } @@ -728,6 +730,7 @@ public: ObAuditTrailType at_type_; int64_t sort_area_size_; int64_t hash_area_size_; + uint64_t data_version_; bool enable_query_response_time_stats_; bool enable_user_defined_rewrite_rules_; int64_t range_optimizer_max_mem_size_; @@ -1280,6 +1283,11 @@ public: cached_tenant_config_info_.refresh(); return cached_tenant_config_info_.get_sort_area_size(); } + uint64_t get_data_version() + { + cached_tenant_config_info_.refresh(); + return cached_tenant_config_info_.get_data_version(); + } bool enable_query_response_time_stats() { cached_tenant_config_info_.refresh(); diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index ed10d87cb..adfa55b98 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -195,8 +195,10 @@ TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this)); private: int check_ls_status_(const share::ObLSID &ls_id, bool &leader); -int init_tx_(ObTxDesc &tx, const uint32_t session_id); -int reinit_tx_(ObTxDesc &tx, const uint32_t session_id); +int init_tx_(ObTxDesc &tx, + const uint32_t session_id, + const uint64_t cluster_version); +int reinit_tx_(ObTxDesc &tx, const uint32_t session_id, const uint64_t cluster_version); int start_tx_(ObTxDesc &tx); int abort_tx_(ObTxDesc &tx, const int cause, bool cleanup = true); void abort_tx__(ObTxDesc &tx, const bool cleanup); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index f2f53b996..2b4e78489 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -60,7 +60,9 @@ using namespace share; namespace transaction { -inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id) +inline int ObTransService::init_tx_(ObTxDesc &tx, + const uint32_t session_id, + const uint64_t cluster_version) { int ret = OB_SUCCESS; tx.tenant_id_ = tenant_id_; @@ -71,7 +73,9 @@ inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id) tx.expire_ts_ = INT64_MAX; tx.op_sn_ = 1; tx.state_ = ObTxDesc::State::IDLE; - if (OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tx.cluster_version_))) { + tx.cluster_version_ = cluster_version; + // cluster_version is invalid, need to get it + if (0 == cluster_version && OB_FAIL(GET_MIN_DATA_VERSION(tenant_id_, tx.cluster_version_))) { TRANS_LOG(WARN, "get min data version fail", K(ret), K(tx)); } else if (tx.cluster_version_ >= DATA_VERSION_4_3_0_0) { tx.seq_base_ = common::ObSequence::get_max_seq_no() - 1; @@ -79,13 +83,15 @@ inline int ObTransService::init_tx_(ObTxDesc &tx, const uint32_t session_id) return ret; } -int ObTransService::acquire_tx(ObTxDesc *&tx, const uint32_t session_id) +int ObTransService::acquire_tx(ObTxDesc *&tx, + const uint32_t session_id, + const uint64_t cluster_version) { int ret = OB_SUCCESS; if (OB_FAIL(tx_desc_mgr_.alloc(tx))) { TRANS_LOG(WARN, "alloc tx fail", K(ret)); } else { - ret = init_tx_(*tx, session_id); + ret = init_tx_(*tx, session_id, cluster_version); } TRANS_LOG(TRACE, "acquire tx", KPC(tx), K(session_id)); if (OB_SUCC(ret)) { @@ -172,7 +178,7 @@ int ObTransService::release_tx(ObTxDesc &tx, const bool is_from_xa) return ret; } -int ObTransService::reuse_tx(ObTxDesc &tx) +int ObTransService::reuse_tx(ObTxDesc &tx, const uint64_t data_version) { int ret = OB_SUCCESS; int spin_cnt = 0; @@ -211,7 +217,7 @@ int ObTransService::reuse_tx(ObTxDesc &tx) #endif } // it is safe to operate tx without lock when not shared - ret = reinit_tx_(tx, tx.sess_id_); + ret = reinit_tx_(tx, tx.sess_id_, data_version); } TRANS_LOG(TRACE, "reuse tx", K(ret), K(orig_tx_id), K(tx)); ObTransTraceLog &tlog = tx.get_tlog(); @@ -225,10 +231,10 @@ int ObTransService::reuse_tx(ObTxDesc &tx) return ret; } -int ObTransService::reinit_tx_(ObTxDesc &tx, const uint32_t session_id) +int ObTransService::reinit_tx_(ObTxDesc &tx, const uint32_t session_id, const uint64_t cluster_version) { tx.reset(); - return init_tx_(tx, session_id); + return init_tx_(tx, session_id, cluster_version); } int ObTransService::stop_tx(ObTxDesc &tx) diff --git a/src/storage/tx/ob_tx_api.h b/src/storage/tx/ob_tx_api.h index c8e569a27..bca7da529 100644 --- a/src/storage/tx/ob_tx_api.h +++ b/src/storage/tx/ob_tx_api.h @@ -10,7 +10,7 @@ * See the Mulan PubL v2 for more details. */ -int acquire_tx(ObTxDesc *&tx, const uint32_t session_id = 0); +int acquire_tx(ObTxDesc *&tx, const uint32_t session_id = 0, const uint64_t data_version = 0); /** * start_tx - explicit start transaction @@ -112,11 +112,12 @@ int release_tx(ObTxDesc &tx, const bool is_from_xa = false); * when txn end, in stead of release txn descriptor, reuse it for * better performance. * - * @tx: the target transaction's descriptor + * @tx: the target transaction's descriptor + * @data_version: tx data_version * * Return: OB_SUCCESS -OK */ -int reuse_tx(ObTxDesc &tx); +int reuse_tx(ObTxDesc &tx, const uint64_t data_version); /** * stop_tx - stop txn immediately (for admin reason) diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 662382b8e..23e201ceb 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -395,7 +395,8 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, - int64_t &pos) + int64_t &pos, + const uint64_t data_version) { int ret = OB_SUCCESS; bool need_add_tx = false; @@ -419,7 +420,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i TRANS_LOG(WARN, "handle tx exist fail", K(ret)); } else if (OB_ISNULL(tx)) { audit_record.alloc_tx_ = true; - if (OB_FAIL(acquire_tx(tx, session_id))) { + if (OB_FAIL(acquire_tx(tx, session_id, data_version))) { // if acquire tx failed, it may retryable: alloc-memory failed TRANS_LOG(WARN, "acquire tx for decode failed", K(ret)); } else { need_add_tx = true; } @@ -435,7 +436,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i tx_desc_mgr_.remove(*tx); } // reset tx to cleanup for new txn - reinit_tx_(*tx, session_id); + reinit_tx_(*tx, session_id, tx->get_cluster_version()); need_add_tx = true; } else { // update @@ -516,7 +517,8 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_ ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, - int64_t &pos) + int64_t &pos, + const uint64_t data_version) { int ret = OB_SUCCESS; ObTxnFreeRouteAuditRecord &audit_record = ctx.audit_record_; @@ -575,7 +577,8 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, - int64_t &pos) + int64_t &pos, + const uint64_t data_version) { int ret = OB_SUCCESS; ObTxnFreeRouteAuditRecord &audit_record = ctx.audit_record_; @@ -632,7 +635,8 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, - int64_t &pos) + int64_t &pos, + const uint64_t data_version) { int ret = OB_SUCCESS; int64_t logic_clock = 0; @@ -667,7 +671,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id } else if (OB_FAIL(update_logic_clock_(logic_clock, NULL, false))) { TRANS_LOG(ERROR, "update logic clock fail", K(ret)); } - if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id))) { + if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id, data_version))) { // only has savepoints or txn scope snapshot, txn not started // acquire tx to hold extra info TRANS_LOG(WARN, "acquire tx fail", K(ret)); diff --git a/src/storage/tx/ob_tx_free_route_api.h b/src/storage/tx/ob_tx_free_route_api.h index 63c29053e..ded334c87 100644 --- a/src/storage/tx/ob_tx_free_route_api.h +++ b/src/storage/tx/ob_tx_free_route_api.h @@ -11,7 +11,7 @@ */ #define DEF_FREE_ROUTE_API_(name) \ - int txn_free_route__update_##name##_state(const uint32_t session_id, ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, int64_t &pos); \ + int txn_free_route__update_##name##_state(const uint32_t session_id, ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const char* buf, const int64_t len, int64_t &pos, const uint64_t data_version); \ int txn_free_route__serialize_##name##_state(const uint32_t session_id, ObTxDesc *tx, ObTxnFreeRouteCtx &ctx, char* buf, const int64_t len, int64_t &pos); \ int64_t txn_free_route__get_##name##_state_serialize_size(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx); \ static int64_t txn_free_route__get_##name##_state_size(ObTxDesc *tx); \ diff --git a/src/storage/tx/ob_xa_dblink_service.cpp b/src/storage/tx/ob_xa_dblink_service.cpp index 335bc1495..d1b990a61 100644 --- a/src/storage/tx/ob_xa_dblink_service.cpp +++ b/src/storage/tx/ob_xa_dblink_service.cpp @@ -244,7 +244,8 @@ int ObXAService::xa_start_for_tm(const int64_t flags, const uint32_t session_id, const ObTxParam &tx_param, ObTxDesc *&tx_desc, - ObXATransID &xid) + ObXATransID &xid, + const uint64_t data_version) { int ret = OB_SUCCESS; @@ -258,7 +259,7 @@ int ObXAService::xa_start_for_tm(const int64_t flags, TRANS_LOG(WARN, "invalid flags for xa start", K(ret), K(xid), K(flags)); } else { if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_START)) { - if (OB_FAIL(xa_start_for_tm_(flags, timeout_seconds, session_id, tx_param, tx_desc, xid))) { + if (OB_FAIL(xa_start_for_tm_(flags, timeout_seconds, session_id, tx_param, tx_desc, xid, data_version))) { TRANS_LOG(WARN, "xa start promotion failed", K(ret), K(flags), K(xid)); } } else { @@ -283,7 +284,8 @@ int ObXAService::xa_start_for_tm_(const int64_t flags, const uint32_t session_id, const ObTxParam &tx_param, ObTxDesc *&tx_desc, - ObXATransID &xid) + ObXATransID &xid, + const uint64_t data_version) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -306,7 +308,7 @@ int ObXAService::xa_start_for_tm_(const int64_t flags, } // the first xa start for xa trans with this xid // therefore tx_desc should be allocated - if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) { + if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id, data_version))) { TRANS_LOG(WARN, "fail acquire trans", K(ret), K(tx_param)); } else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param))) { TRANS_LOG(WARN, "fail start trans", K(ret), KPC(tx_desc)); diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 8210c454b..0f5c4369e 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -997,7 +997,8 @@ int ObXAService::xa_start(const ObXATransID &xid, const int64_t timeout_seconds, const uint32_t session_id, const ObTxParam &tx_param, - ObTxDesc *&tx_desc) + ObTxDesc *&tx_desc, + const uint64_t data_version) { int ret = OB_SUCCESS; @@ -1010,7 +1011,7 @@ int ObXAService::xa_start(const ObXATransID &xid, ret = OB_TRANS_XA_INVAL; TRANS_LOG(WARN, "invalid flags for xa start", K(ret), K(xid), K(flags)); } else if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_START)) { - if (OB_FAIL(xa_start_(xid, flags, timeout_seconds, session_id, tx_param, tx_desc))) { + if (OB_FAIL(xa_start_(xid, flags, timeout_seconds, session_id, tx_param, tx_desc, data_version))) { TRANS_LOG(WARN, "xa start failed", K(ret), K(flags), K(xid)); } } else if (ObXAFlag::is_tmjoin(flags) || ObXAFlag::is_tmresume(flags)) { @@ -1043,7 +1044,8 @@ int ObXAService::xa_start_(const ObXATransID &xid, const int64_t timeout_seconds, const uint32_t session_id, const ObTxParam &tx_param, - ObTxDesc *&tx_desc) + ObTxDesc *&tx_desc, + const uint64_t data_version) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -1105,7 +1107,7 @@ int ObXAService::xa_start_(const ObXATransID &xid, // the first xa start for xa trans with this xid // therefore tx_desc should be allocated // this code may be moved to pl sql level - if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id))) { + if (OB_FAIL(MTL(ObTransService *)->acquire_tx(tx_desc, session_id, data_version))) { TRANS_LOG(WARN, "fail acquire trans", K(ret), K(tx_param)); } else if (OB_FAIL(MTL(ObTransService *)->start_tx(*tx_desc, tx_param, trans_id))) { TRANS_LOG(WARN, "fail start trans", K(ret), KPC(tx_desc)); diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index d0761c5db..819a1cf93 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -106,7 +106,8 @@ public: const int64_t timeout_seconds, const uint32_t session_id, const ObTxParam &tx_param, - ObTxDesc *&tx_desc); + ObTxDesc *&tx_desc, + const uint64_t data_version); int xa_end(const ObXATransID &xid, const int64_t flags, ObTxDesc *&tx_desc); @@ -143,7 +144,8 @@ public: const uint32_t session_id, const ObTxParam &tx_param, ObTxDesc *&tx_desc, - ObXATransID &xid); + ObXATransID &xid, + const uint64_t data_version); int xa_start_for_dblink_client(const common::sqlclient::DblinkDriverProto dblink_type, common::sqlclient::ObISQLConnection *dblink_conn, ObTxDesc *&tx_desc, @@ -247,7 +249,8 @@ private: const int64_t timeout_seconds, const uint32_t session_id, const ObTxParam &tx_param, - ObTxDesc *&tx_desc); + ObTxDesc *&tx_desc, + const uint64_t data_version); int xa_start_join_(const ObXATransID &xid, const int64_t flags, const int64_t timeout_seconds, @@ -318,7 +321,8 @@ private: const uint32_t session_id, const ObTxParam &tx_param, ObTxDesc *&tx_desc, - ObXATransID &xid); + ObXATransID &xid, + const uint64_t data_version); public: int xa_scheduler_hb_req(); int gc_invalid_xa_record(const uint64_t tenant_id); diff --git a/unittest/storage/tx/it/test_tx_free_route.cpp b/unittest/storage/tx/it/test_tx_free_route.cpp index 10696ff89..008e32f22 100644 --- a/unittest/storage/tx/it/test_tx_free_route.cpp +++ b/unittest/storage/tx/it/test_tx_free_route.cpp @@ -471,7 +471,7 @@ int MockObServer::handle(ObReq &req, ObResp &resp) #define TX_STATE_UPDATE__(T, tn) \ case SyncTxState::T: \ if (OB_SUCC(ret) && \ - OB_FAIL(tx_node_.txn_free_route__update_##tn##_state(session_.get_sessid(), tx_desc, free_route_ctx, buf, len, pos))) { \ + OB_FAIL(tx_node_.txn_free_route__update_##tn##_state(session_.get_sessid(), tx_desc, free_route_ctx, buf, len, pos, session_.get_data_version()))) { \ TRANS_LOG(ERROR, "update txn state fail", K(ret), "type", #T); \ } else if (pos != len) { \ TRANS_LOG(WARN, "[maybe] pos != len, consume buffer incomplete", K(ret), K(pos), K(len), "state_type", #T); \