From 195359c6f33497ccde3d9da4f9055ceaf1430a64 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Tue, 14 Mar 2023 17:44:03 +0000 Subject: [PATCH] [master][tx-route][xa] xa-tightly-couple will be enabled free-route incorrectly --- src/storage/tx/ob_tx_free_route.cpp | 58 ++++++++++++++++++++--------- src/storage/tx/ob_tx_free_route.h | 54 ++++++++++++++++----------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 9410f6c223..8accd939b2 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -68,6 +68,7 @@ void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx) txn_addr_.reset(); tx_id_.reset(); } + prev_tx_id_.reset(); reset_changed_(); ++local_version_; #ifndef NDEBUG @@ -184,12 +185,13 @@ inline int ObTransService::txn_state_update_verify_by_version_(const ObTxnFreeRo #define ENCODE_HEADER() \ + auto tx_id = ctx.prev_tx_id_.is_valid() ? ctx.prev_tx_id_ : ctx.tx_id_; \ if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \ TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \ } else if (!ctx.tx_id_.is_valid()) { \ ret = OB_ERR_UNEXPECTED; \ TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); \ - } else if (OB_FAIL(encode_i64(buf, len, pos, ctx.tx_id_.get_id()))) { \ + } else if (OB_FAIL(encode_i64(buf, len, pos, tx_id.get_id()))) { \ TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \ } else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \ TRANS_LOG(WARN, "encode global_version fail", K(ret)); \ @@ -735,17 +737,14 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) bool is_tx_start = !prev_in_txn && in_txn; // IDLE => ACTIVE bool is_tx_terminated = prev_in_txn && !in_txn; // ACTIVE => ROLLBACK/COMMIT/IDLE bool is_tx_active_to_active = prev_in_txn && in_txn; // ACTIVE => ACTIVE - bool support_free_route = false; + bool is_tx_switch = is_tx_active_to_active && ctx.tx_id_ != tx->tx_id_; // TXN1 -> TXN2 + bool support_free_route = false, fallback_happened = false; bool return_normal_state = false, return_terminated_state = false, return_fallback_state = false; int64_t state_size = 0; - if (is_tx_start) { - audit_record.tx_start_ = true; - ctx.can_free_route_ = false; - ctx.is_fallbacked_ = false; - ctx.tx_id_ = tx->tx_id_; - ctx.txn_addr_ = self_; + // decide free-route flag for newly started txn + if (is_tx_start || is_tx_switch) { if (proxy_support) { if (!is_xa_tightly_couple) { omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); @@ -756,10 +755,8 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) TRANS_LOG(DEBUG, "observer not upgrade to 4_1_0_0"); } else if (!need_fallback_(*tx, state_size)) { support_free_route = true; - ctx.can_free_route_ = true; - return_normal_state = true; } else { - ctx.is_fallbacked_ = true; + fallback_happened = true; TRANS_LOG(TRACE, "txn free route is enabled but need fallback", K(state_size)); } } else { @@ -769,6 +766,33 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) } } } + + if (is_tx_start) { + ctx.can_free_route_ = support_free_route; + ctx.is_fallbacked_ = fallback_happened; + ctx.tx_id_ = tx->tx_id_; + ctx.txn_addr_ = self_; + if (support_free_route) { + return_normal_state = true; + } + } + + if (is_tx_switch) { + // if previouse tx is free-routed, need to terminated it + if (!support_free_route && ctx.can_free_route_) { + ctx.prev_tx_id_ = ctx.tx_id_; + audit_record.tx_term_ = true; // terminated prev + return_terminated_state = true; + } + ctx.can_free_route_ = support_free_route; + ctx.is_fallbacked_ = fallback_happened; + ctx.tx_id_ = tx->tx_id_; + ctx.txn_addr_ = self_; + if (support_free_route) { + return_normal_state = true; + } + } + if (is_tx_terminated) { audit_record.tx_term_ = true; // current node should be the txn's start node @@ -786,15 +810,11 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) return_terminated_state = true; } } - bool fallback_happened = false, fallback_state_synced = false; - if (is_tx_active_to_active) { + bool fallback_state_synced = false; + if (is_tx_active_to_active && !is_tx_switch) { // if on txn start node, and if free_route is open, // refer proxy switch to do fallback if (self_ == ctx.txn_addr_) { - if (ctx.tx_id_ != tx->tx_id_) { - // implicit commit and start new tx - ctx.tx_id_ = tx->tx_id_; - } if (ctx.can_free_route_ && !ctx.is_fallbacked_) { if (!proxy_support || need_fallback_(*tx, state_size)) { ctx.is_fallbacked_ = true; @@ -884,6 +904,8 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) ctx.set_calculated(); // audit record audit_record.calculated_ = true; + audit_record.tx_start_ = is_tx_start; + audit_record.tx_switch_ = is_tx_switch; audit_record.free_route_ = ctx.can_free_route_; audit_record.fallback_ = ctx.is_fallbacked_; audit_record.chg_static_ = ctx.static_changed_; @@ -896,7 +918,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) #ifndef NDEBUG ObTransID tx_id = tx ? tx->tx_id_ : ObTransID(); TRANS_LOG(INFO, "[tx free route] calc tx free route properities done", K(ret), - K(is_tx_start), K(is_tx_terminated), K(is_tx_active_to_active), + K(is_tx_start), K(is_tx_switch), K(is_tx_terminated), K(is_tx_active_to_active), K(prev_in_txn), K(is_xa), K(is_xa_tightly_couple), K(proxy_support), K(support_free_route), K(fallback_happened), K(fallback_state_synced), diff --git a/src/storage/tx/ob_tx_free_route.h b/src/storage/tx/ob_tx_free_route.h index 96f56183f6..a5f88de8cd 100644 --- a/src/storage/tx/ob_tx_free_route.h +++ b/src/storage/tx/ob_tx_free_route.h @@ -40,28 +40,29 @@ union ObTxnFreeRouteAuditRecord bool tx_term_: 1; // 5 bool free_route_: 1; // 6 bool fallback_: 1; // 7 - bool upd_static_: 1; // 8 - bool upd_parts_: 1; // 9 - bool upd_dyn_: 1; // 10 - bool upd_extra_: 1; // 11 - bool upd_term_: 1; // 12 - bool upd_fallback_: 1; // 13 - bool upd_clean_tx_: 1; // 14 - bool upd_reset_snapshot_: 1; // 15 - bool chg_static_: 1; // 16 - bool chg_dyn_: 1; // 17 - bool chg_parts_: 1; // 18 - bool chg_extra_: 1; // 19 - bool start_node_: 1; // 20 - bool push_state_: 1; // 21 - bool ret_fallback_: 1; // 22 - bool ret_term_: 1; // 23 - bool xa_: 1; // 24 - bool xa_tightly_couple_: 1; // 25 - bool assoc_xa_orig_ :1; // 26 - bool alloc_tx_ :1; // 27 - bool reuse_tx_ :1; // 28 - bool replace_tx_ :1; // 29 + bool tx_switch_: 1; // 8 + bool upd_static_: 1; // 9 + bool upd_parts_: 1; // 10 + bool upd_dyn_: 1; // 11 + bool upd_extra_: 1; // 12 + bool upd_term_: 1; // 13 + bool upd_fallback_: 1; // 14 + bool upd_clean_tx_: 1; // 15 + bool upd_reset_snapshot_: 1; // 16 + bool chg_static_: 1; // 17 + bool chg_dyn_: 1; // 18 + bool chg_parts_: 1; // 19 + bool chg_extra_: 1; // 20 + bool start_node_: 1; // 21 + bool push_state_: 1; // 22 + bool ret_fallback_: 1; // 23 + bool ret_term_: 1; // 24 + bool xa_: 1; // 25 + bool xa_tightly_couple_: 1; // 26 + bool assoc_xa_orig_ :1; // 27 + bool alloc_tx_ :1; // 28 + bool reuse_tx_ :1; // 29 + bool replace_tx_ :1; // 30 }; }; @@ -76,6 +77,7 @@ struct ObTxnFreeRouteCtx { is_txn_switch_ = false; txn_addr_.reset(); tx_id_.reset(); + prev_tx_id_.reset(); is_proxy_support_ = false; in_txn_before_handle_request_ = false; can_free_route_ = false; @@ -126,7 +128,15 @@ private: // updated when receive request // if no txn alive, set to 0.0.0.0 common::ObAddr txn_addr_; + // current tx_id which setup before handle request + // and updated after handle request + // if `switch txn` happended, prev_tx_id_ is used to + // save the before one ObTransID tx_id_; + // the tx_id of previouse txn which implicit committed + // by current request/stmt/command and create a new txn + // also names with `switch txn` (prev_tx_id_ -> tx_id_) + ObTransID prev_tx_id_; // proxy's hint of support future free route // used to fallback on txn start node // used to decide free route when txn start