From 4ea455b9f9df59b208d431f9ff7116d7f1fc8b40 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Thu, 25 May 2023 08:11:51 +0000 Subject: [PATCH] [tx-route] support receive duplicate state from proxy --- deps/oblib/src/rpc/obmysql/ob_mysql_packet.h | 4 +- src/observer/mysql/obmp_connect.cpp | 1 + src/observer/mysql/obmp_utils.cpp | 2 +- src/sql/session/ob_sql_session_info.cpp | 2 + src/storage/tx/ob_tx_free_route.cpp | 309 ++++++++++++------ src/storage/tx/ob_tx_free_route.h | 87 ++++- unittest/storage/tx/it/test_tx_free_route.cpp | 8 +- 7 files changed, 295 insertions(+), 118 deletions(-) diff --git a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h index b002ff775d..184504974b 100644 --- a/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h +++ b/deps/oblib/src/rpc/obmysql/ob_mysql_packet.h @@ -192,7 +192,9 @@ union ObProxyCapabilityFlags uint64_t OB_CAP_PROXY_SESSION_VAR_SYNC: 1; uint64_t OB_CAP_PROXY_WEAK_STALE_FEEDBACK: 1; uint64_t OB_CAP_PROXY_FULL_LINK_TRACING_EXT: 1; - uint64_t OB_CAP_RESERVED_NOT_USE: 45; + // duplicate session_info sync of transaction type + uint64_t OB_CAP_SERVER_DUP_SESS_INFO_SYNC: 1; + uint64_t OB_CAP_RESERVED_NOT_USE: 44; } cap_flags_; }; diff --git a/src/observer/mysql/obmp_connect.cpp b/src/observer/mysql/obmp_connect.cpp index dd6025c079..eccdbaa69c 100644 --- a/src/observer/mysql/obmp_connect.cpp +++ b/src/observer/mysql/obmp_connect.cpp @@ -1487,6 +1487,7 @@ int ObMPConnect::check_update_proxy_capability(ObSMConnection &conn) const } server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 1; server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING_EXT = 1; + server_proxy_cap_flag.cap_flags_.OB_CAP_SERVER_DUP_SESS_INFO_SYNC = 1; conn.proxy_cap_flags_.capability_ = (server_proxy_cap_flag.capability_ & client_proxy_cap);//if old java client, set it 0 LOG_DEBUG("Negotiated capability", diff --git a/src/observer/mysql/obmp_utils.cpp b/src/observer/mysql/obmp_utils.cpp index dafaebd696..257441c128 100644 --- a/src/observer/mysql/obmp_utils.cpp +++ b/src/observer/mysql/obmp_utils.cpp @@ -183,7 +183,7 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt } // phase 2: handle txn relative types in order if (OB_SUCC(ret) && has_txn_type) { - for(int info_type = min; info_type <= max; info_type++) { + for(int info_type = min; OB_SUCC(ret) && info_type <= max; info_type++) { auto &info = txn_type_infos[info_type - min]; if (info.has) { if (OB_FAIL(sess.update_sess_sync_info((sql::SessionSyncInfoType)info_type, buf, info.pos + info.len, info.pos))) { diff --git a/src/sql/session/ob_sql_session_info.cpp b/src/sql/session/ob_sql_session_info.cpp index 1807b46de5..7f29750088 100644 --- a/src/sql/session/ob_sql_session_info.cpp +++ b/src/sql/session/ob_sql_session_info.cpp @@ -201,6 +201,7 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid, static const int64_t PS_BUCKET_NUM = 64; if (OB_FAIL(ObBasicSessionInfo::init(sessid, proxy_sessid, bucket_allocator, tz_info))) { LOG_WARN("fail to init basic session info", K(ret)); + } else if (FALSE_IT(txn_free_route_ctx_.set_sessid(sessid))) { } else if (!is_acquire_from_pool() && OB_FAIL(package_state_map_.create(hash::cal_next_prime(4), "PackStateMap", @@ -243,6 +244,7 @@ int ObSQLSessionInfo::test_init(uint32_t version, uint32_t sessid, uint64_t prox UNUSED(version); if (OB_FAIL(ObBasicSessionInfo::test_init(sessid, proxy_sessid, bucket_allocator))) { LOG_WARN("fail to init basic session info", K(ret)); + } else if (FALSE_IT(txn_free_route_ctx_.set_sessid(sessid))) { } else { is_inited_ = true; } diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 2739f9f430..eb58cf4fd7 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -42,7 +42,6 @@ bool ObTxnFreeRouteCtx::is_temp(const ObTxDesc &tx) const void ObTxnFreeRouteCtx::init_before_update_state(bool proxy_support) { is_proxy_support_ = proxy_support; - global_version_water_mark_ = global_version_; is_txn_switch_ = false; } @@ -135,76 +134,173 @@ int ObTransService::txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFr return ret; } -inline int ObTxnFreeRouteCtx::state_update_verify_by_version(const int64_t version) const +inline int ObTxnFreeRouteCtx::state_update_verify_by_version(const TxnFreeRouteState state, + const int64_t version, + const uint32_t backend_sess_id, + bool &dup_sync) const { int ret = OB_SUCCESS; // if ctx is switch to new txn in this request // water_mark was established by static state // the following state (dyn, parts, extra) should be >= water_mark - if (is_txn_switch_) { - if (global_version_water_mark_ > version) { + if (is_txn_switch_ && global_version_water_mark_ > version) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "the state is stale", K(ret)); + } + dup_sync = false; + auto &sync_info = state_sync_infos_[state]; + if (sync_info.last_version_ > version) { + // stale + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "receive stale state", K(ret)); + } else if (sync_info.last_version_ == version) { + if (backend_sess_id > 0 + && sync_info.last_backend_sess_id_ > 0 + && sync_info.last_backend_sess_id_ != backend_sess_id) { + // invalid, state of same version from diff backend_session ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "the state is stale", K(ret), K(version)); + TRANS_LOG(ERROR, "receive diverged state", K(ret)); + } else { + // duplicate + dup_sync = true; + TRANS_LOG(INFO, "receive duplicate state", K(ret), K(state)); } - // otherwise, the new state's version should be > water_mark - } else if (global_version_water_mark_ == version) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "duplicated state sync", K(ret), K(version)); - } else if (global_version_water_mark_ > version) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "the state is stale", K(ret), K(version)); + } else { + // pass } return ret; } -#define DECODE_HEADER_BASE() \ - ObTxnFreeRouteFlag flag; \ - ObTransID tx_id; \ - int64_t global_version; \ - { \ - int64_t tmp_tx_id = 0; \ - if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR, session_id) OB_SUCCESS)) { \ - TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \ - } else if (OB_FAIL(decode_i64(buf, len, pos, &tmp_tx_id))) { \ - TRANS_LOG(ERROR, "decode tx_id fail", K(ret)); \ - } else if (FALSE_IT(tx_id = ObTransID(tmp_tx_id))) { \ - } else if (OB_FAIL(decode_i64(buf, len, pos, &global_version))) { \ - TRANS_LOG(ERROR, "decode global_version fail", K(ret)); \ - } else if (OB_FAIL(decode_i8(buf, len, pos, &flag.v_))) { \ - TRANS_LOG(ERROR, "decode flag fail", K(ret)); \ - } \ +struct TxStateHeader { + uint8_t compat_ver_; + ObTransID tx_id_; + int64_t global_version_; + ObTxnFreeRouteFlag flag_; + uint32_t backend_sess_id_; + static const uint8_t VER_0 = 0; + static const uint8_t VER_1 = 1; + static const uint8_t VERSION = VER_1; +private: + static bool with_version_() { return GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_1; } +public: + TxStateHeader(): tx_id_(), global_version_(0), flag_(), backend_sess_id_(0) {} + TO_STRING_KV(K_(compat_ver), K_(tx_id), K_(global_version), K_(flag), K_(backend_sess_id)); + int encode(char* buf, const int64_t len, int64_t &pos); + static int64_t encode_length(); + int decode(const char* buf, const int64_t len, int64_t &pos); +}; + +int64_t TxStateHeader::encode_length() +{ + int64_t l = encoded_length_i64(1) + + encoded_length_i64(1) + + encoded_length_i8(1); + if (with_version_()) { + l += encoded_length_i16(100); // length + l += encoded_length_i8(1); // version + l += encoded_length_i32(1); // backend_sess_id } + return l; +} -#define DECODE_HEADER() \ - DECODE_HEADER_BASE() \ - if (OB_FAIL(ret)) { \ - } else if (OB_FAIL(ctx.state_update_verify_by_version(global_version))) { \ - } else if (!tx_id.is_valid()) { \ - ret = OB_ERR_UNEXPECTED; \ - TRANS_LOG(ERROR, "tx id is invalid", K(ret)); \ - } else if (ctx.global_version_ < global_version) { \ - ctx.global_version_ = global_version; \ - } \ - -#define ENCODE_HEADER() \ - auto tx_id = ctx.prev_tx_id_.is_valid() ? ctx.prev_tx_id_ : ctx.tx_id_; \ - if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \ - TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \ - } else if (!ctx.tx_id_.is_valid()) { \ - ret = OB_ERR_UNEXPECTED; \ - TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); \ - } else if (OB_FAIL(encode_i64(buf, len, pos, tx_id.get_id()))) { \ - TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \ - } else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \ - TRANS_LOG(WARN, "encode global_version fail", K(ret)); \ - } else if (OB_FAIL(encode_i8(buf, len, pos, ctx.flag_.v_))) { \ - TRANS_LOG(WARN, "encode flag fail", K(ret)); \ +int TxStateHeader::encode(char* buf, const int64_t len, int64_t &pos) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(encode_i64(buf, len, pos, tx_id_.get_id()))) { + TRANS_LOG(WARN, "encode tx_id fail", K(ret)); + } else if (OB_FAIL(encode_i64(buf, len, pos, global_version_))) { + TRANS_LOG(WARN, "encode global_version fail", K(ret)); + } else { + const bool with_version = with_version_(); + flag_.set_with_version(with_version); + if (OB_FAIL(encode_i8(buf, len, pos, flag_.v_))) { + TRANS_LOG(WARN, "encode flag fail", K(ret)); + } else if (with_version) { + if (OB_FAIL(encode_i16(buf, len, pos, encode_length()))) { + TRANS_LOG(WARN, "encode header len fail", K(ret)); + } else if (OB_FAIL(encode_i8(buf, len, pos, (int)VERSION))) { + TRANS_LOG(WARN, "encode version fail", K(ret)); + } else if (OB_FAIL(encode_i32(buf, len, pos, backend_sess_id_))) { + TRANS_LOG(WARN, "encode backend_sess_id fail", K(ret)); + } + } } + return ret; +} -#define ENCODE_HEADER_LENGTH() \ - int64_t l = encoded_length_i64(ctx.tx_id_.get_id()) \ - + encoded_length_i64(ctx.global_version_) \ - + encoded_length_i8(ctx.flag_.v_) +int TxStateHeader::decode(const char* buf, const int64_t len, int64_t &pos) +{ + int ret = OB_SUCCESS; + int64_t tmp_tx_id = 0, pos0 = pos; + if (OB_FAIL(decode_i64(buf, len, pos, &tmp_tx_id))) { + TRANS_LOG(ERROR, "decode tx_id fail", K(ret)); + } else if (FALSE_IT(tx_id_ = ObTransID(tmp_tx_id))) { + } else if (OB_FAIL(decode_i64(buf, len, pos, &global_version_))) { + TRANS_LOG(ERROR, "decode global_version fail", K(ret)); + } else if (OB_FAIL(decode_i8(buf, len, pos, &flag_.v_))) { + TRANS_LOG(ERROR, "decode flag fail", K(ret)); + } + if (OB_SUCC(ret) && flag_.is_with_version()) { + int16_t header_len = 0; + if (OB_FAIL(decode_i16(buf, len, pos, &header_len))) { + TRANS_LOG(ERROR, "decode header len fail", K(ret)); + } else if (OB_FAIL(decode_i8(buf, pos0 + header_len, pos, (int8_t*)&compat_ver_))) { + TRANS_LOG(ERROR, "decode version fail", K(ret)); + } else { + if (compat_ver_ >= VER_1 && OB_FAIL(decode_i32(buf, pos0 + header_len , pos, (int32_t*)&backend_sess_id_))) { + TRANS_LOG(ERROR, "decode backend_sess_id fail", K(ret)); + } + if (OB_SUCC(ret)) { + pos = pos0 + header_len; + } + } + } + return ret; +} + +static int process_header_(TxStateHeader &header, + ObTxnFreeRouteCtx &ctx, + const TxnFreeRouteState cur_state, + const char* buf, + const int64_t len, + int64_t &pos, + bool &dup_sync) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR, ctx.get_session_id()) OB_SUCCESS)) { + TRANS_LOG(ERROR, "inject failure", K(ret), K(ctx)); + } else if (OB_FAIL(header.decode(buf, len, pos))) { + TRANS_LOG(ERROR, "decode header fail", K(ret)); + } else if (OB_FAIL(ctx.state_update_verify_by_version(cur_state, header.global_version_, header.backend_sess_id_, dup_sync))) { + TRANS_LOG(WARN, "version verify failed", K(ret), K(header)); + } else if (!header.tx_id_.is_valid()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx id is invalid", K(ret)); + } + return ret; +} + +static int encode_header_(const ObTxnFreeRouteCtx &ctx, char* buf, const int64_t len, int64_t &pos) +{ + int ret = OB_SUCCESS; + TxStateHeader header; + auto &tx_id = ctx.get_prev_tx_id().is_valid() ? ctx.get_prev_tx_id() : ctx.get_tx_id(); + if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, ctx.get_session_id()) OB_SUCCESS)) { + TRANS_LOG(ERROR, "inject failure", K(ret), K(ctx)); + } else if (!tx_id.is_valid()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); + } else { + header.tx_id_ = tx_id; + header.global_version_ = ctx.get_global_version(); + header.flag_ = ctx.get_flag(); + header.backend_sess_id_ = ctx.get_session_id(); + if (OB_FAIL(header.encode(buf, len, pos))) { + TRANS_LOG(WARN, "encode header fail", K(ret)); + } + } + return ret; +} int ObTransService::txn_free_route__kill_session_(const uint32_t session_id) { @@ -266,6 +362,16 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT return ret; } +#define TXN_FREE_ROUTE_PROCESS_HEADER(state_type) \ + TxStateHeader header; \ + bool dup_sync = false; \ + if (OB_FAIL(process_header_(header, ctx, state_type, buf, len, pos, dup_sync))) { \ + TRANS_LOG(WARN, "process header fail", K(ret)); \ + } else if (dup_sync) { \ + TRANS_LOG(INFO, "duplicate sync", K(state_type), K(ctx), K(header)); \ + return OB_SUCCESS; \ + } + int ObTransService::txn_free_route__update_static_state(const uint32_t session_id, ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, @@ -278,25 +384,21 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i auto &audit_record = ctx.audit_record_; audit_record.upd_static_ = true; auto before_tx_id = OB_NOT_NULL(tx) ? tx->tx_id_ : ObTransID(); - DECODE_HEADER(); - if (OB_SUCC(ret)) { - ctx.is_txn_switch_ = true; - ctx.global_version_water_mark_ = global_version; - } + TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::STATIC); if (OB_FAIL(ret)) { - } else if (flag.is_tx_terminated_) { + } else if (header.flag_.is_tx_terminated()) { audit_record.upd_term_ = true; audit_record.upd_clean_tx_ = OB_NOT_NULL(tx); - if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, tx_id))) { - TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx_id), K(tx)); + if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, header.tx_id_))) { + TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx)); } - } else if (flag.is_fallback_) { + } else if (header.flag_.is_fallback()) { audit_record.upd_fallback_ = true; ret = txn_free_route__sanity_check_fallback_(tx, ctx); } else { if (OB_ISNULL(tx)) { - if (OB_FAIL(txn_free_route__handle_tx_exist_(tx_id, audit_record, tx))) { - TRANS_LOG(WARN, "handle tx exist fail", K(ret), K(tx_id)); + if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx))) { + TRANS_LOG(WARN, "handle tx exist fail", K(ret)); } else if (OB_ISNULL(tx)) { audit_record.alloc_tx_ = true; if (OB_FAIL(acquire_tx(tx, session_id))) { @@ -308,7 +410,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i // reuse, overwrite need_add_tx = true; audit_record.reuse_tx_ = true; - } else if (tx->tx_id_ != tx_id) { + } else if (tx->tx_id_ != header.tx_id_) { // replace audit_record.replace_tx_ = true; tx_desc_mgr_.remove(*tx); @@ -346,7 +448,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i auto elapsed_us = ObTimeUtility::current_time() - start_ts; ObTransTraceLog &tlog = tx->get_tlog(); REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_static, OB_Y(ret), - OB_ID(txid), tx_id.get_id(), + OB_ID(txid), header.tx_id_.get_id(), OB_ID(from), before_tx_id.get_id(), OB_ID(time_used), elapsed_us, OB_ID(length), len, @@ -356,10 +458,13 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i } } #ifndef NDEBUG - TRANS_LOG(INFO, "update-static", K(tx_id), K(flag)); + TRANS_LOG(INFO, "update-static", K(header)); #endif + if (OB_SUCC(ret)) { + ctx.update_last_synced_state(TxnFreeRouteState::STATIC, header.backend_sess_id_, header.global_version_); + } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(before_tx_id), K(tx_id), + TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(before_tx_id), K(session_id), K(ctx), KP(tx)); } return ret; @@ -395,20 +500,20 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_ auto &audit_record = ctx.audit_record_; audit_record.upd_dyn_ = true; int64_t logic_clock = 0; - DECODE_HEADER(); + TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::DYNAMIC); if (OB_FAIL(ret)) { - } else if (flag.is_tx_terminated_) { + } else if (header.flag_.is_tx_terminated()) { audit_record.upd_term_ = true; if (OB_NOT_NULL(tx)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(ERROR, "tx should be null: released in static state update", K(ret), K(tx->tx_id_)); } - } else if (flag.is_fallback_) { + } else if (header.flag_.is_fallback()) { audit_record.upd_fallback_ = true; ret = txn_free_route__sanity_check_fallback_(tx, ctx); } else if (OB_ISNULL(tx)) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "tx should not be null", K(ret), K(tx_id), K(flag), K(session_id)); + TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id)); } else { auto start_ts = ObTimeUtility::current_time(); ObSpinLockGuard guard(tx->lock_); @@ -427,14 +532,17 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_ ObTransTraceLog &tlog = tx->get_tlog(); REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_dynamic, OB_Y(ret), OB_ID(time_used), elapsed_us, - OB_ID(txid), tx_id.get_id(), + OB_ID(txid), header.tx_id_.get_id(), OB_ID(logic_clock), logic_clock, OB_ID(length), len, OB_ID(ref), tx->get_ref(), OB_ID(thread_id), GETTID()); } + if (OB_SUCC(ret)) { + ctx.update_last_synced_state(TxnFreeRouteState::DYNAMIC, header.backend_sess_id_, header.global_version_); + } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(logic_clock), + TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock), K(session_id), K(ctx), KP(tx)); } return ret; @@ -450,9 +558,9 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id int ret = OB_SUCCESS; auto &audit_record = ctx.audit_record_; audit_record.upd_parts_ = true; - DECODE_HEADER(); + TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::PARTICIPANT); if (OB_FAIL(ret)) { - } else if (flag.is_tx_terminated_) { + } else if (header.flag_.is_tx_terminated()) { audit_record.upd_term_ = true; // [prev req] : [action] // : do nothing @@ -463,33 +571,36 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id ObSpinLockGuard guard(tx->lock_); tx->parts_.reset(); } - } else if (flag.is_fallback_) { + } else if (header.flag_.is_fallback()) { audit_record.upd_fallback_ = true; ret = txn_free_route__sanity_check_fallback_(tx, ctx); } else if (OB_ISNULL(tx)) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "tx should not be null", K(ret), K(tx_id), K(flag), K(session_id)); + TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id)); } else { auto start_ts = ObTimeUtility::current_time(); ObSpinLockGuard guard(tx->lock_); if (!tx->tx_id_.is_valid()) { // bug, dynamic state exist, txn should be active ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "tx id should be active", K(ret), K(tx_id), K(tx->tx_id_)); + TRANS_LOG(ERROR, "tx id should be active", K(ret), K(tx->tx_id_)); } else if (OB_FAIL(tx->decode_parts_state(buf, len, pos))) { TRANS_LOG(WARN, "decode participants fail", K(ret)); } auto elapsed_us = ObTimeUtility::current_time() - start_ts; ObTransTraceLog &tlog = tx->get_tlog(); REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_participants, OB_Y(ret), - OB_ID(txid), tx_id.get_id(), + OB_ID(txid), header.tx_id_.get_id(), OB_ID(time_used), elapsed_us, OB_ID(length), len, OB_ID(ref), tx->get_ref(), OB_ID(thread_id), GETTID()); } + if (OB_SUCC(ret)) { + ctx.update_last_synced_state(TxnFreeRouteState::PARTICIPANT, header.backend_sess_id_, header.global_version_); + } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(session_id), K(ctx), KP(tx)); + TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(session_id), K(ctx), KP(tx)); } return ret; } @@ -505,9 +616,9 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id int64_t logic_clock = 0; auto &audit_record = ctx.audit_record_; audit_record.upd_extra_ = true; - DECODE_HEADER(); + TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::EXTRA); if (OB_FAIL(ret)) { - } else if (flag.is_tx_terminated_) { + } else if (header.flag_.is_tx_terminated()) { audit_record.upd_term_ = true; // [prev req] : [action] // : cleanup snapshot_version_, snapshot_scn @@ -520,12 +631,12 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id tx->snapshot_version_.reset(); tx->snapshot_scn_ = 0; } - } else if (flag.is_fallback_) { + } else if (header.flag_.is_fallback()) { audit_record.upd_fallback_ = true; ret = txn_free_route__sanity_check_fallback_(tx, ctx); } else { bool add_tx = OB_ISNULL(tx); - bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != tx_id; + bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != header.tx_id_; auto before_tx_id = OB_NOT_NULL(tx) ? tx->tx_id_ : ObTransID(); audit_record.replace_tx_ = replace_tx; audit_record.alloc_tx_ = add_tx; @@ -542,7 +653,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id if (OB_SUCC(ret) && replace_tx && tx->tx_id_.is_valid()) { if (OB_UNLIKELY(tx->in_tx_for_free_route())) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "try overwrite tx which is active", K(ret), K(tx_id), K(tx->tx_id_)); + TRANS_LOG(ERROR, "try overwrite tx which is active", K(ret), K(tx->tx_id_)); } else if (OB_FAIL(tx_desc_mgr_.remove(*tx))) { TRANS_LOG(WARN, "unregister old tx fail", K(ret), K(tx->tx_id_)); } @@ -563,7 +674,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id auto elapsed_us = ObTimeUtility::current_time() - start_ts; ObTransTraceLog &tlog = tx->get_tlog(); REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret), - OB_ID(txid), tx_id.get_id(), + OB_ID(txid), header.tx_id_.get_id(), OB_ID(from), before_tx_id.get_id(), OB_ID(time_used), elapsed_us, OB_ID(logic_clock), logic_clock, @@ -575,8 +686,11 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id } } } + if (OB_SUCC(ret)) { + ctx.update_last_synced_state(TxnFreeRouteState::EXTRA, header.backend_sess_id_, header.global_version_); + } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(logic_clock), + TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock), K(session_id), K(ctx), KP(tx)); if (OB_NOT_NULL(tx)) { ObSpinLockGuard guard(tx->lock_); @@ -631,13 +745,13 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id DEF_TXN_FREE_ROUTE_SERIALIZE(type) \ { \ int ret = OB_SUCCESS; \ - ENCODE_HEADER(); \ + ret = encode_header_(ctx, buf, len, pos); \ TXN_ENCODE_NORMAL_STATE_X(type, ##__VA_ARGS__); \ return ret; \ } \ - DEF_TXN_FREE_ROUTE_SERIALIZE_LENGTH(type) \ + DEF_TXN_FREE_ROUTE_SERIALIZE_LENGTH(type) \ { \ - ENCODE_HEADER_LENGTH(); \ + int64_t l = TxStateHeader::encode_length(); \ ENCODE_NORMAL_STATE_LENGTH(type, ##__VA_ARGS__); \ return l; \ } @@ -884,7 +998,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) } if (return_fallback_state) { audit_record.ret_fallback_ = true; - ctx.flag_.is_fallback_ = true; + ctx.flag_.set_fallback(); ctx.static_changed_ = true; ctx.dynamic_changed_ = true; ctx.parts_changed_ = true; @@ -892,7 +1006,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) } if (return_terminated_state) { audit_record.ret_term_ = true; - ctx.flag_.is_tx_terminated_ = true; + ctx.flag_.set_tx_terminated(); ctx.static_changed_ = true; ctx.dynamic_changed_ = true; ctx.parts_changed_ = true; @@ -908,6 +1022,9 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx) } if (ctx.is_changed()) { ctx.inc_global_version(); + if (ctx.static_changed_) { + ctx.global_version_water_mark_ = ctx.global_version_; + } } ctx.set_calculated(); // audit record diff --git a/src/storage/tx/ob_tx_free_route.h b/src/storage/tx/ob_tx_free_route.h index c1f49c4a9a..42d4e7d824 100644 --- a/src/storage/tx/ob_tx_free_route.h +++ b/src/storage/tx/ob_tx_free_route.h @@ -16,16 +16,26 @@ namespace transaction { class ObTxDesc; union ObTxnFreeRouteFlag { int8_t v_; - struct { - // it is terminated (committed or rollbacked) - bool is_tx_terminated_ : 1; - // it is fallbacked to fixed route - bool is_fallback_ : 1; + static const int TX_TERMINATED_OFFSET = 0; + static const int FALLBACK_OFFSET = 1; + static const int IDLE_RELEASED_OFFSET = 2; + static const int STATE_MASK = ~(1 << 7); + static const int WITH_VERSION_OFFSET = 7; + // it is terminated (committed or rollbacked) + bool is_tx_terminated() const { return (v_ & (1 << TX_TERMINATED_OFFSET)) != 0; } + void set_tx_terminated() { v_ |= (1 << TX_TERMINATED_OFFSET); } + // it is fallbacked to fixed route + bool is_fallback() const { return (v_ & (1 << FALLBACK_OFFSET)) !=0; } + void set_fallback() { v_ |= (1 << FALLBACK_OFFSET); } // it is released during session idle, by doing check alive - bool is_idle_released_ : 1; - }; - bool is_return_normal_state() const { return v_ == 0; } - TO_STRING_KV(K_(is_tx_terminated), K_(is_fallback), K_(is_idle_released)); + bool is_idle_released() const { return (v_ & (1 << IDLE_RELEASED_OFFSET)) !=0; } + void set_idle_released() { v_ |= (1 << IDLE_RELEASED_OFFSET); } + // identify new Pkt format : Header part has version + bool is_with_version() const { return (v_ & (1 << WITH_VERSION_OFFSET)) !=0; } + void set_with_version(bool b) { if (b) { v_ |= (1 << WITH_VERSION_OFFSET); } else { v_ &= ~(1 << WITH_VERSION_OFFSET); } } + bool is_return_normal_state() const { return (v_ & STATE_MASK) == 0; } + void reset() { v_ = 0; } + TO_STRING_KV(K_(v)); }; union ObTxnFreeRouteAuditRecord @@ -66,6 +76,10 @@ union ObTxnFreeRouteAuditRecord }; }; +enum TxnFreeRouteState { + STATIC = 0, DYNAMIC = 1, PARTICIPANT = 2, EXTRA = 3, _CNT_VAL = 4 +}; + struct ObTxnFreeRouteCtx { friend class ObTransService; ObTxnFreeRouteCtx() { reset(); } @@ -82,9 +96,11 @@ struct ObTxnFreeRouteCtx { in_txn_before_handle_request_ = false; can_free_route_ = false; is_fallbacked_ = false; + MEMSET(state_sync_infos_, 0, sizeof(state_sync_infos_)); reset_changed_(); audit_record_.reset(); } + void set_sessid(const uint32_t sessid) { session_id_ = sessid; } void init_before_update_state(bool proxy_support); void init_before_handle_request(ObTxDesc *txdesc); bool is_temp(const ObTxDesc &tx) const; @@ -94,22 +110,42 @@ struct ObTxnFreeRouteCtx { bool is_dynamic_changed() const { return dynamic_changed_; } bool is_parts_changed() const { return parts_changed_; } bool is_extra_changed() const { return extra_changed_; } - void set_idle_released() { flag_.is_idle_released_ = true; } - bool is_idle_released() const { return flag_.is_idle_released_; } + void set_idle_released() { flag_.set_idle_released(); } + bool is_idle_released() const { return flag_.is_idle_released(); } bool has_calculated() const { return calculated_; } void set_calculated() { calculated_ = true; } int64_t get_local_version() const { return local_version_; } int64_t get_global_version() const { return global_version_; } + void inc_update_global_version(const int64_t v) { if (global_version_ < v) { global_version_ = v; } } void inc_global_version() { ++global_version_; } void reset_audit_record() { audit_record_.reset(); } + const ObTransID &get_prev_tx_id() const { return prev_tx_id_; } + const ObTransID &get_tx_id() const { return tx_id_; } + const ObTxnFreeRouteFlag &get_flag() const { return flag_; } + uint32_t get_session_id() const { return session_id_; } uint64_t get_audit_record() const { return audit_record_.v_; } - int state_update_verify_by_version(const int64_t version) const; + int state_update_verify_by_version(const TxnFreeRouteState state, + const int64_t version, + const uint32_t backend_sess_id, + bool &dup) const; + void update_last_synced_state(const TxnFreeRouteState state, uint32_t backend_sess_id, const int64_t version) + { + state_sync_infos_[state].last_backend_sess_id_ = backend_sess_id; + state_sync_infos_[state].last_version_ = version; + inc_update_global_version(version); + if (TxnFreeRouteState::STATIC == state) { + is_txn_switch_ = true; + global_version_water_mark_ = version; + } + } private: void reset_changed_() { _changed_ = false; - flag_.v_ = 0; + flag_.reset(); calculated_ = false; } + // the session this ctx belongs to + uint32_t session_id_; // the local_version updated when session handle a request // from proxy which caused txn state synced // it is used as request id for checkAlive request @@ -120,8 +156,7 @@ private: // when they update txn state and propagated in txn state // sync via OBProxy int64_t global_version_; - // used to mark the safe global version and verify the - // update's version in order to discover stale or dup + // the txn left boundary version, it's updated when txn started int64_t global_version_water_mark_; // remember txn is switched by sync 'static' state bool is_txn_switch_; @@ -159,7 +194,13 @@ private: // reset pre handle request // setup post handle request, remember fallback decision bool is_fallbacked_; - + // record each state's synced info, used to reject stale and duplicate sync + struct StateSyncInfo { + StateSyncInfo(): last_backend_sess_id_(0), last_version_(0) {} + uint32_t last_backend_sess_id_; + int64_t last_version_; + TO_STRING_KV(K_(last_backend_sess_id), K_(last_version)); + } state_sync_infos_[TxnFreeRouteState::_CNT_VAL]; // following are changed after request process // used to mark state changed and special state // need to return to proxy @@ -181,6 +222,19 @@ private: // reset before handle request ObTxnFreeRouteFlag flag_; ObTxnFreeRouteAuditRecord audit_record_; +private: + template + struct _ForRawArrayDisplay { + _ForRawArrayDisplay(const T (&a)[N]): a_(a) {} + const T (&a_)[N]; + DEFINE_TO_STRING({ + J_ARRAY_START(); + for(int i = 0; i < N; i++) { BUF_PRINTO(a_[i]); J_COMMA(); } + J_ARRAY_END(); + }); + }; + template + const _ForRawArrayDisplay for_display_(const T (&a)[N]) const { return _ForRawArrayDisplay(a); } public: TO_STRING_KV(K_(tx_id), K_(txn_addr), @@ -197,6 +251,7 @@ public: K_(local_version), K_(global_version), K_(global_version_water_mark), + "state_sync_infos", for_display_(state_sync_infos_), "audit_record", audit_record_.v_); }; } diff --git a/unittest/storage/tx/it/test_tx_free_route.cpp b/unittest/storage/tx/it/test_tx_free_route.cpp index ccdaf5324e..7a9f8c1b43 100644 --- a/unittest/storage/tx/it/test_tx_free_route.cpp +++ b/unittest/storage/tx/it/test_tx_free_route.cpp @@ -946,8 +946,8 @@ TEST_F(ObTestTxFreeRoute, sample) A_T(txn_free_route_ctx.dynamic_changed_), A_T(txn_free_route_ctx.parts_changed_), A_T(txn_free_route_ctx.extra_changed_), - A_F(txn_free_route_ctx.flag_.is_tx_terminated_), - A_F(txn_free_route_ctx.flag_.is_fallback_)); + A_F(txn_free_route_ctx.flag_.is_tx_terminated()), + A_F(txn_free_route_ctx.flag_.is_fallback())); EX_START_TX(1); RESET_HOOKS_2(); EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server2)); @@ -1002,7 +1002,7 @@ TEST_F(ObTestTxFreeRoute, sample) A_F(txn_free_route_ctx.dynamic_changed_), A_F(txn_free_route_ctx.parts_changed_), A_F(txn_free_route_ctx.extra_changed_), - A_F(txn_free_route_ctx.flag_.is_tx_terminated_)); + A_F(txn_free_route_ctx.flag_.is_tx_terminated())); EX_DUMMY_WRITE(201,100); // step2 RESET_HOOKS_2(); @@ -1014,7 +1014,7 @@ TEST_F(ObTestTxFreeRoute, sample) A_F(txn_free_route_ctx.dynamic_changed_), A_F(txn_free_route_ctx.parts_changed_), A_T(txn_free_route_ctx.extra_changed_), - A_F(txn_free_route_ctx.flag_.is_tx_terminated_)); + A_F(txn_free_route_ctx.flag_.is_tx_terminated())); EX_SAVEPOINT(202, 102); // step3 RESET_HOOKS_2();