[master][tx-route][xa] xa-tightly-couple will be enabled free-route incorrectly
This commit is contained in:
@ -68,6 +68,7 @@ void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx)
|
|||||||
txn_addr_.reset();
|
txn_addr_.reset();
|
||||||
tx_id_.reset();
|
tx_id_.reset();
|
||||||
}
|
}
|
||||||
|
prev_tx_id_.reset();
|
||||||
reset_changed_();
|
reset_changed_();
|
||||||
++local_version_;
|
++local_version_;
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
@ -184,12 +185,13 @@ inline int ObTransService::txn_state_update_verify_by_version_(const ObTxnFreeRo
|
|||||||
|
|
||||||
|
|
||||||
#define ENCODE_HEADER() \
|
#define ENCODE_HEADER() \
|
||||||
|
auto tx_id = ctx.prev_tx_id_.is_valid() ? ctx.prev_tx_id_ : ctx.tx_id_; \
|
||||||
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \
|
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \
|
||||||
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
|
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
|
||||||
} else if (!ctx.tx_id_.is_valid()) { \
|
} else if (!ctx.tx_id_.is_valid()) { \
|
||||||
ret = OB_ERR_UNEXPECTED; \
|
ret = OB_ERR_UNEXPECTED; \
|
||||||
TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); \
|
TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); \
|
||||||
} else if (OB_FAIL(encode_i64(buf, len, pos, ctx.tx_id_.get_id()))) { \
|
} else if (OB_FAIL(encode_i64(buf, len, pos, tx_id.get_id()))) { \
|
||||||
TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \
|
TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \
|
||||||
} else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \
|
} else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \
|
||||||
TRANS_LOG(WARN, "encode global_version fail", K(ret)); \
|
TRANS_LOG(WARN, "encode global_version fail", K(ret)); \
|
||||||
@ -735,17 +737,14 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
bool is_tx_start = !prev_in_txn && in_txn; // IDLE => ACTIVE
|
bool is_tx_start = !prev_in_txn && in_txn; // IDLE => ACTIVE
|
||||||
bool is_tx_terminated = prev_in_txn && !in_txn; // ACTIVE => ROLLBACK/COMMIT/IDLE
|
bool is_tx_terminated = prev_in_txn && !in_txn; // ACTIVE => ROLLBACK/COMMIT/IDLE
|
||||||
bool is_tx_active_to_active = prev_in_txn && in_txn; // ACTIVE => ACTIVE
|
bool is_tx_active_to_active = prev_in_txn && in_txn; // ACTIVE => ACTIVE
|
||||||
bool support_free_route = false;
|
bool is_tx_switch = is_tx_active_to_active && ctx.tx_id_ != tx->tx_id_; // TXN1 -> TXN2
|
||||||
|
bool support_free_route = false, fallback_happened = false;
|
||||||
|
|
||||||
bool return_normal_state = false, return_terminated_state = false, return_fallback_state = false;
|
bool return_normal_state = false, return_terminated_state = false, return_fallback_state = false;
|
||||||
int64_t state_size = 0;
|
int64_t state_size = 0;
|
||||||
|
|
||||||
if (is_tx_start) {
|
// decide free-route flag for newly started txn
|
||||||
audit_record.tx_start_ = true;
|
if (is_tx_start || is_tx_switch) {
|
||||||
ctx.can_free_route_ = false;
|
|
||||||
ctx.is_fallbacked_ = false;
|
|
||||||
ctx.tx_id_ = tx->tx_id_;
|
|
||||||
ctx.txn_addr_ = self_;
|
|
||||||
if (proxy_support) {
|
if (proxy_support) {
|
||||||
if (!is_xa_tightly_couple) {
|
if (!is_xa_tightly_couple) {
|
||||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
|
||||||
@ -756,10 +755,8 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
TRANS_LOG(DEBUG, "observer not upgrade to 4_1_0_0");
|
TRANS_LOG(DEBUG, "observer not upgrade to 4_1_0_0");
|
||||||
} else if (!need_fallback_(*tx, state_size)) {
|
} else if (!need_fallback_(*tx, state_size)) {
|
||||||
support_free_route = true;
|
support_free_route = true;
|
||||||
ctx.can_free_route_ = true;
|
|
||||||
return_normal_state = true;
|
|
||||||
} else {
|
} else {
|
||||||
ctx.is_fallbacked_ = true;
|
fallback_happened = true;
|
||||||
TRANS_LOG(TRACE, "txn free route is enabled but need fallback", K(state_size));
|
TRANS_LOG(TRACE, "txn free route is enabled but need fallback", K(state_size));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -769,6 +766,33 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (is_tx_start) {
|
||||||
|
ctx.can_free_route_ = support_free_route;
|
||||||
|
ctx.is_fallbacked_ = fallback_happened;
|
||||||
|
ctx.tx_id_ = tx->tx_id_;
|
||||||
|
ctx.txn_addr_ = self_;
|
||||||
|
if (support_free_route) {
|
||||||
|
return_normal_state = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is_tx_switch) {
|
||||||
|
// if previouse tx is free-routed, need to terminated it
|
||||||
|
if (!support_free_route && ctx.can_free_route_) {
|
||||||
|
ctx.prev_tx_id_ = ctx.tx_id_;
|
||||||
|
audit_record.tx_term_ = true; // terminated prev
|
||||||
|
return_terminated_state = true;
|
||||||
|
}
|
||||||
|
ctx.can_free_route_ = support_free_route;
|
||||||
|
ctx.is_fallbacked_ = fallback_happened;
|
||||||
|
ctx.tx_id_ = tx->tx_id_;
|
||||||
|
ctx.txn_addr_ = self_;
|
||||||
|
if (support_free_route) {
|
||||||
|
return_normal_state = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (is_tx_terminated) {
|
if (is_tx_terminated) {
|
||||||
audit_record.tx_term_ = true;
|
audit_record.tx_term_ = true;
|
||||||
// current node should be the txn's start node
|
// current node should be the txn's start node
|
||||||
@ -786,15 +810,11 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
return_terminated_state = true;
|
return_terminated_state = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bool fallback_happened = false, fallback_state_synced = false;
|
bool fallback_state_synced = false;
|
||||||
if (is_tx_active_to_active) {
|
if (is_tx_active_to_active && !is_tx_switch) {
|
||||||
// if on txn start node, and if free_route is open,
|
// if on txn start node, and if free_route is open,
|
||||||
// refer proxy switch to do fallback
|
// refer proxy switch to do fallback
|
||||||
if (self_ == ctx.txn_addr_) {
|
if (self_ == ctx.txn_addr_) {
|
||||||
if (ctx.tx_id_ != tx->tx_id_) {
|
|
||||||
// implicit commit and start new tx
|
|
||||||
ctx.tx_id_ = tx->tx_id_;
|
|
||||||
}
|
|
||||||
if (ctx.can_free_route_ && !ctx.is_fallbacked_) {
|
if (ctx.can_free_route_ && !ctx.is_fallbacked_) {
|
||||||
if (!proxy_support || need_fallback_(*tx, state_size)) {
|
if (!proxy_support || need_fallback_(*tx, state_size)) {
|
||||||
ctx.is_fallbacked_ = true;
|
ctx.is_fallbacked_ = true;
|
||||||
@ -884,6 +904,8 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
ctx.set_calculated();
|
ctx.set_calculated();
|
||||||
// audit record
|
// audit record
|
||||||
audit_record.calculated_ = true;
|
audit_record.calculated_ = true;
|
||||||
|
audit_record.tx_start_ = is_tx_start;
|
||||||
|
audit_record.tx_switch_ = is_tx_switch;
|
||||||
audit_record.free_route_ = ctx.can_free_route_;
|
audit_record.free_route_ = ctx.can_free_route_;
|
||||||
audit_record.fallback_ = ctx.is_fallbacked_;
|
audit_record.fallback_ = ctx.is_fallbacked_;
|
||||||
audit_record.chg_static_ = ctx.static_changed_;
|
audit_record.chg_static_ = ctx.static_changed_;
|
||||||
@ -896,7 +918,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
|||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
ObTransID tx_id = tx ? tx->tx_id_ : ObTransID();
|
ObTransID tx_id = tx ? tx->tx_id_ : ObTransID();
|
||||||
TRANS_LOG(INFO, "[tx free route] calc tx free route properities done", K(ret),
|
TRANS_LOG(INFO, "[tx free route] calc tx free route properities done", K(ret),
|
||||||
K(is_tx_start), K(is_tx_terminated), K(is_tx_active_to_active),
|
K(is_tx_start), K(is_tx_switch), K(is_tx_terminated), K(is_tx_active_to_active),
|
||||||
K(prev_in_txn), K(is_xa), K(is_xa_tightly_couple), K(proxy_support),
|
K(prev_in_txn), K(is_xa), K(is_xa_tightly_couple), K(proxy_support),
|
||||||
K(support_free_route),
|
K(support_free_route),
|
||||||
K(fallback_happened), K(fallback_state_synced),
|
K(fallback_happened), K(fallback_state_synced),
|
||||||
|
|||||||
@ -40,28 +40,29 @@ union ObTxnFreeRouteAuditRecord
|
|||||||
bool tx_term_: 1; // 5
|
bool tx_term_: 1; // 5
|
||||||
bool free_route_: 1; // 6
|
bool free_route_: 1; // 6
|
||||||
bool fallback_: 1; // 7
|
bool fallback_: 1; // 7
|
||||||
bool upd_static_: 1; // 8
|
bool tx_switch_: 1; // 8
|
||||||
bool upd_parts_: 1; // 9
|
bool upd_static_: 1; // 9
|
||||||
bool upd_dyn_: 1; // 10
|
bool upd_parts_: 1; // 10
|
||||||
bool upd_extra_: 1; // 11
|
bool upd_dyn_: 1; // 11
|
||||||
bool upd_term_: 1; // 12
|
bool upd_extra_: 1; // 12
|
||||||
bool upd_fallback_: 1; // 13
|
bool upd_term_: 1; // 13
|
||||||
bool upd_clean_tx_: 1; // 14
|
bool upd_fallback_: 1; // 14
|
||||||
bool upd_reset_snapshot_: 1; // 15
|
bool upd_clean_tx_: 1; // 15
|
||||||
bool chg_static_: 1; // 16
|
bool upd_reset_snapshot_: 1; // 16
|
||||||
bool chg_dyn_: 1; // 17
|
bool chg_static_: 1; // 17
|
||||||
bool chg_parts_: 1; // 18
|
bool chg_dyn_: 1; // 18
|
||||||
bool chg_extra_: 1; // 19
|
bool chg_parts_: 1; // 19
|
||||||
bool start_node_: 1; // 20
|
bool chg_extra_: 1; // 20
|
||||||
bool push_state_: 1; // 21
|
bool start_node_: 1; // 21
|
||||||
bool ret_fallback_: 1; // 22
|
bool push_state_: 1; // 22
|
||||||
bool ret_term_: 1; // 23
|
bool ret_fallback_: 1; // 23
|
||||||
bool xa_: 1; // 24
|
bool ret_term_: 1; // 24
|
||||||
bool xa_tightly_couple_: 1; // 25
|
bool xa_: 1; // 25
|
||||||
bool assoc_xa_orig_ :1; // 26
|
bool xa_tightly_couple_: 1; // 26
|
||||||
bool alloc_tx_ :1; // 27
|
bool assoc_xa_orig_ :1; // 27
|
||||||
bool reuse_tx_ :1; // 28
|
bool alloc_tx_ :1; // 28
|
||||||
bool replace_tx_ :1; // 29
|
bool reuse_tx_ :1; // 29
|
||||||
|
bool replace_tx_ :1; // 30
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -76,6 +77,7 @@ struct ObTxnFreeRouteCtx {
|
|||||||
is_txn_switch_ = false;
|
is_txn_switch_ = false;
|
||||||
txn_addr_.reset();
|
txn_addr_.reset();
|
||||||
tx_id_.reset();
|
tx_id_.reset();
|
||||||
|
prev_tx_id_.reset();
|
||||||
is_proxy_support_ = false;
|
is_proxy_support_ = false;
|
||||||
in_txn_before_handle_request_ = false;
|
in_txn_before_handle_request_ = false;
|
||||||
can_free_route_ = false;
|
can_free_route_ = false;
|
||||||
@ -126,7 +128,15 @@ private:
|
|||||||
// updated when receive request
|
// updated when receive request
|
||||||
// if no txn alive, set to 0.0.0.0
|
// if no txn alive, set to 0.0.0.0
|
||||||
common::ObAddr txn_addr_;
|
common::ObAddr txn_addr_;
|
||||||
|
// current tx_id which setup before handle request
|
||||||
|
// and updated after handle request
|
||||||
|
// if `switch txn` happended, prev_tx_id_ is used to
|
||||||
|
// save the before one
|
||||||
ObTransID tx_id_;
|
ObTransID tx_id_;
|
||||||
|
// the tx_id of previouse txn which implicit committed
|
||||||
|
// by current request/stmt/command and create a new txn
|
||||||
|
// also names with `switch txn` (prev_tx_id_ -> tx_id_)
|
||||||
|
ObTransID prev_tx_id_;
|
||||||
// proxy's hint of support future free route
|
// proxy's hint of support future free route
|
||||||
// used to fallback on txn start node
|
// used to fallback on txn start node
|
||||||
// used to decide free route when txn start
|
// used to decide free route when txn start
|
||||||
|
|||||||
Reference in New Issue
Block a user