[tx-route] support receive duplicate state from proxy

This commit is contained in:
chinaxing
2023-05-25 08:11:51 +00:00
committed by ob-robot
parent 5663062a6f
commit 4ea455b9f9
7 changed files with 295 additions and 118 deletions

View File

@ -192,7 +192,9 @@ union ObProxyCapabilityFlags
uint64_t OB_CAP_PROXY_SESSION_VAR_SYNC: 1;
uint64_t OB_CAP_PROXY_WEAK_STALE_FEEDBACK: 1;
uint64_t OB_CAP_PROXY_FULL_LINK_TRACING_EXT: 1;
uint64_t OB_CAP_RESERVED_NOT_USE: 45;
// duplicate session_info sync of transaction type
uint64_t OB_CAP_SERVER_DUP_SESS_INFO_SYNC: 1;
uint64_t OB_CAP_RESERVED_NOT_USE: 44;
} cap_flags_;
};

View File

@ -1487,6 +1487,7 @@ int ObMPConnect::check_update_proxy_capability(ObSMConnection &conn) const
}
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_SESSION_VAR_SYNC = 1;
server_proxy_cap_flag.cap_flags_.OB_CAP_PROXY_FULL_LINK_TRACING_EXT = 1;
server_proxy_cap_flag.cap_flags_.OB_CAP_SERVER_DUP_SESS_INFO_SYNC = 1;
conn.proxy_cap_flags_.capability_ = (server_proxy_cap_flag.capability_ & client_proxy_cap);//if old java client, set it 0
LOG_DEBUG("Negotiated capability",

View File

@ -183,7 +183,7 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
}
// phase 2: handle txn relative types in order
if (OB_SUCC(ret) && has_txn_type) {
for(int info_type = min; info_type <= max; info_type++) {
for(int info_type = min; OB_SUCC(ret) && info_type <= max; info_type++) {
auto &info = txn_type_infos[info_type - min];
if (info.has) {
if (OB_FAIL(sess.update_sess_sync_info((sql::SessionSyncInfoType)info_type, buf, info.pos + info.len, info.pos))) {

View File

@ -201,6 +201,7 @@ int ObSQLSessionInfo::init(uint32_t sessid, uint64_t proxy_sessid,
static const int64_t PS_BUCKET_NUM = 64;
if (OB_FAIL(ObBasicSessionInfo::init(sessid, proxy_sessid, bucket_allocator, tz_info))) {
LOG_WARN("fail to init basic session info", K(ret));
} else if (FALSE_IT(txn_free_route_ctx_.set_sessid(sessid))) {
} else if (!is_acquire_from_pool() &&
OB_FAIL(package_state_map_.create(hash::cal_next_prime(4),
"PackStateMap",
@ -243,6 +244,7 @@ int ObSQLSessionInfo::test_init(uint32_t version, uint32_t sessid, uint64_t prox
UNUSED(version);
if (OB_FAIL(ObBasicSessionInfo::test_init(sessid, proxy_sessid, bucket_allocator))) {
LOG_WARN("fail to init basic session info", K(ret));
} else if (FALSE_IT(txn_free_route_ctx_.set_sessid(sessid))) {
} else {
is_inited_ = true;
}

View File

@ -42,7 +42,6 @@ bool ObTxnFreeRouteCtx::is_temp(const ObTxDesc &tx) const
void ObTxnFreeRouteCtx::init_before_update_state(bool proxy_support)
{
is_proxy_support_ = proxy_support;
global_version_water_mark_ = global_version_;
is_txn_switch_ = false;
}
@ -135,76 +134,173 @@ int ObTransService::txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFr
return ret;
}
inline int ObTxnFreeRouteCtx::state_update_verify_by_version(const int64_t version) const
inline int ObTxnFreeRouteCtx::state_update_verify_by_version(const TxnFreeRouteState state,
const int64_t version,
const uint32_t backend_sess_id,
bool &dup_sync) const
{
int ret = OB_SUCCESS;
// if ctx is switch to new txn in this request
// water_mark was established by static state
// the following state (dyn, parts, extra) should be >= water_mark
if (is_txn_switch_) {
if (global_version_water_mark_ > version) {
if (is_txn_switch_ && global_version_water_mark_ > version) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "the state is stale", K(ret), K(version));
TRANS_LOG(ERROR, "the state is stale", K(ret));
}
// otherwise, the new state's version should be > water_mark
} else if (global_version_water_mark_ == version) {
dup_sync = false;
auto &sync_info = state_sync_infos_[state];
if (sync_info.last_version_ > version) {
// stale
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "duplicated state sync", K(ret), K(version));
} else if (global_version_water_mark_ > version) {
TRANS_LOG(ERROR, "receive stale state", K(ret));
} else if (sync_info.last_version_ == version) {
if (backend_sess_id > 0
&& sync_info.last_backend_sess_id_ > 0
&& sync_info.last_backend_sess_id_ != backend_sess_id) {
// invalid, state of same version from diff backend_session
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "the state is stale", K(ret), K(version));
TRANS_LOG(ERROR, "receive diverged state", K(ret));
} else {
// duplicate
dup_sync = true;
TRANS_LOG(INFO, "receive duplicate state", K(ret), K(state));
}
} else {
// pass
}
return ret;
}
#define DECODE_HEADER_BASE() \
ObTxnFreeRouteFlag flag; \
ObTransID tx_id; \
int64_t global_version; \
{ \
int64_t tmp_tx_id = 0; \
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR, session_id) OB_SUCCESS)) { \
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
} else if (OB_FAIL(decode_i64(buf, len, pos, &tmp_tx_id))) { \
TRANS_LOG(ERROR, "decode tx_id fail", K(ret)); \
} else if (FALSE_IT(tx_id = ObTransID(tmp_tx_id))) { \
} else if (OB_FAIL(decode_i64(buf, len, pos, &global_version))) { \
TRANS_LOG(ERROR, "decode global_version fail", K(ret)); \
} else if (OB_FAIL(decode_i8(buf, len, pos, &flag.v_))) { \
TRANS_LOG(ERROR, "decode flag fail", K(ret)); \
} \
struct TxStateHeader {
uint8_t compat_ver_;
ObTransID tx_id_;
int64_t global_version_;
ObTxnFreeRouteFlag flag_;
uint32_t backend_sess_id_;
static const uint8_t VER_0 = 0;
static const uint8_t VER_1 = 1;
static const uint8_t VERSION = VER_1;
private:
static bool with_version_() { return GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_1; }
public:
TxStateHeader(): tx_id_(), global_version_(0), flag_(), backend_sess_id_(0) {}
TO_STRING_KV(K_(compat_ver), K_(tx_id), K_(global_version), K_(flag), K_(backend_sess_id));
int encode(char* buf, const int64_t len, int64_t &pos);
static int64_t encode_length();
int decode(const char* buf, const int64_t len, int64_t &pos);
};
int64_t TxStateHeader::encode_length()
{
int64_t l = encoded_length_i64(1)
+ encoded_length_i64(1)
+ encoded_length_i8(1);
if (with_version_()) {
l += encoded_length_i16(100); // length
l += encoded_length_i8(1); // version
l += encoded_length_i32(1); // backend_sess_id
}
return l;
}
#define DECODE_HEADER() \
DECODE_HEADER_BASE() \
if (OB_FAIL(ret)) { \
} else if (OB_FAIL(ctx.state_update_verify_by_version(global_version))) { \
} else if (!tx_id.is_valid()) { \
ret = OB_ERR_UNEXPECTED; \
TRANS_LOG(ERROR, "tx id is invalid", K(ret)); \
} else if (ctx.global_version_ < global_version) { \
ctx.global_version_ = global_version; \
} \
#define ENCODE_HEADER() \
auto tx_id = ctx.prev_tx_id_.is_valid() ? ctx.prev_tx_id_ : ctx.tx_id_; \
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
} else if (!ctx.tx_id_.is_valid()) { \
ret = OB_ERR_UNEXPECTED; \
TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx)); \
} else if (OB_FAIL(encode_i64(buf, len, pos, tx_id.get_id()))) { \
TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \
} else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \
TRANS_LOG(WARN, "encode global_version fail", K(ret)); \
} else if (OB_FAIL(encode_i8(buf, len, pos, ctx.flag_.v_))) { \
TRANS_LOG(WARN, "encode flag fail", K(ret)); \
int TxStateHeader::encode(char* buf, const int64_t len, int64_t &pos)
{
int ret = OB_SUCCESS;
if (OB_FAIL(encode_i64(buf, len, pos, tx_id_.get_id()))) {
TRANS_LOG(WARN, "encode tx_id fail", K(ret));
} else if (OB_FAIL(encode_i64(buf, len, pos, global_version_))) {
TRANS_LOG(WARN, "encode global_version fail", K(ret));
} else {
const bool with_version = with_version_();
flag_.set_with_version(with_version);
if (OB_FAIL(encode_i8(buf, len, pos, flag_.v_))) {
TRANS_LOG(WARN, "encode flag fail", K(ret));
} else if (with_version) {
if (OB_FAIL(encode_i16(buf, len, pos, encode_length()))) {
TRANS_LOG(WARN, "encode header len fail", K(ret));
} else if (OB_FAIL(encode_i8(buf, len, pos, (int)VERSION))) {
TRANS_LOG(WARN, "encode version fail", K(ret));
} else if (OB_FAIL(encode_i32(buf, len, pos, backend_sess_id_))) {
TRANS_LOG(WARN, "encode backend_sess_id fail", K(ret));
}
}
}
return ret;
}
#define ENCODE_HEADER_LENGTH() \
int64_t l = encoded_length_i64(ctx.tx_id_.get_id()) \
+ encoded_length_i64(ctx.global_version_) \
+ encoded_length_i8(ctx.flag_.v_)
int TxStateHeader::decode(const char* buf, const int64_t len, int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t tmp_tx_id = 0, pos0 = pos;
if (OB_FAIL(decode_i64(buf, len, pos, &tmp_tx_id))) {
TRANS_LOG(ERROR, "decode tx_id fail", K(ret));
} else if (FALSE_IT(tx_id_ = ObTransID(tmp_tx_id))) {
} else if (OB_FAIL(decode_i64(buf, len, pos, &global_version_))) {
TRANS_LOG(ERROR, "decode global_version fail", K(ret));
} else if (OB_FAIL(decode_i8(buf, len, pos, &flag_.v_))) {
TRANS_LOG(ERROR, "decode flag fail", K(ret));
}
if (OB_SUCC(ret) && flag_.is_with_version()) {
int16_t header_len = 0;
if (OB_FAIL(decode_i16(buf, len, pos, &header_len))) {
TRANS_LOG(ERROR, "decode header len fail", K(ret));
} else if (OB_FAIL(decode_i8(buf, pos0 + header_len, pos, (int8_t*)&compat_ver_))) {
TRANS_LOG(ERROR, "decode version fail", K(ret));
} else {
if (compat_ver_ >= VER_1 && OB_FAIL(decode_i32(buf, pos0 + header_len , pos, (int32_t*)&backend_sess_id_))) {
TRANS_LOG(ERROR, "decode backend_sess_id fail", K(ret));
}
if (OB_SUCC(ret)) {
pos = pos0 + header_len;
}
}
}
return ret;
}
static int process_header_(TxStateHeader &header,
ObTxnFreeRouteCtx &ctx,
const TxnFreeRouteState cur_state,
const char* buf,
const int64_t len,
int64_t &pos,
bool &dup_sync)
{
int ret = OB_SUCCESS;
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR, ctx.get_session_id()) OB_SUCCESS)) {
TRANS_LOG(ERROR, "inject failure", K(ret), K(ctx));
} else if (OB_FAIL(header.decode(buf, len, pos))) {
TRANS_LOG(ERROR, "decode header fail", K(ret));
} else if (OB_FAIL(ctx.state_update_verify_by_version(cur_state, header.global_version_, header.backend_sess_id_, dup_sync))) {
TRANS_LOG(WARN, "version verify failed", K(ret), K(header));
} else if (!header.tx_id_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx id is invalid", K(ret));
}
return ret;
}
static int encode_header_(const ObTxnFreeRouteCtx &ctx, char* buf, const int64_t len, int64_t &pos)
{
int ret = OB_SUCCESS;
TxStateHeader header;
auto &tx_id = ctx.get_prev_tx_id().is_valid() ? ctx.get_prev_tx_id() : ctx.get_tx_id();
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, ctx.get_session_id()) OB_SUCCESS)) {
TRANS_LOG(ERROR, "inject failure", K(ret), K(ctx));
} else if (!tx_id.is_valid()) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx_id is invalid", K(ret), K(ctx));
} else {
header.tx_id_ = tx_id;
header.global_version_ = ctx.get_global_version();
header.flag_ = ctx.get_flag();
header.backend_sess_id_ = ctx.get_session_id();
if (OB_FAIL(header.encode(buf, len, pos))) {
TRANS_LOG(WARN, "encode header fail", K(ret));
}
}
return ret;
}
int ObTransService::txn_free_route__kill_session_(const uint32_t session_id)
{
@ -266,6 +362,16 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT
return ret;
}
#define TXN_FREE_ROUTE_PROCESS_HEADER(state_type) \
TxStateHeader header; \
bool dup_sync = false; \
if (OB_FAIL(process_header_(header, ctx, state_type, buf, len, pos, dup_sync))) { \
TRANS_LOG(WARN, "process header fail", K(ret)); \
} else if (dup_sync) { \
TRANS_LOG(INFO, "duplicate sync", K(state_type), K(ctx), K(header)); \
return OB_SUCCESS; \
}
int ObTransService::txn_free_route__update_static_state(const uint32_t session_id,
ObTxDesc *&tx,
ObTxnFreeRouteCtx &ctx,
@ -278,25 +384,21 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
auto &audit_record = ctx.audit_record_;
audit_record.upd_static_ = true;
auto before_tx_id = OB_NOT_NULL(tx) ? tx->tx_id_ : ObTransID();
DECODE_HEADER();
if (OB_SUCC(ret)) {
ctx.is_txn_switch_ = true;
ctx.global_version_water_mark_ = global_version;
}
TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::STATIC);
if (OB_FAIL(ret)) {
} else if (flag.is_tx_terminated_) {
} else if (header.flag_.is_tx_terminated()) {
audit_record.upd_term_ = true;
audit_record.upd_clean_tx_ = OB_NOT_NULL(tx);
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, tx_id))) {
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx_id), K(tx));
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, header.tx_id_))) {
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx));
}
} else if (flag.is_fallback_) {
} else if (header.flag_.is_fallback()) {
audit_record.upd_fallback_ = true;
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
} else {
if (OB_ISNULL(tx)) {
if (OB_FAIL(txn_free_route__handle_tx_exist_(tx_id, audit_record, tx))) {
TRANS_LOG(WARN, "handle tx exist fail", K(ret), K(tx_id));
if (OB_FAIL(txn_free_route__handle_tx_exist_(header.tx_id_, audit_record, tx))) {
TRANS_LOG(WARN, "handle tx exist fail", K(ret));
} else if (OB_ISNULL(tx)) {
audit_record.alloc_tx_ = true;
if (OB_FAIL(acquire_tx(tx, session_id))) {
@ -308,7 +410,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
// reuse, overwrite
need_add_tx = true;
audit_record.reuse_tx_ = true;
} else if (tx->tx_id_ != tx_id) {
} else if (tx->tx_id_ != header.tx_id_) {
// replace
audit_record.replace_tx_ = true;
tx_desc_mgr_.remove(*tx);
@ -346,7 +448,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
auto elapsed_us = ObTimeUtility::current_time() - start_ts;
ObTransTraceLog &tlog = tx->get_tlog();
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_static, OB_Y(ret),
OB_ID(txid), tx_id.get_id(),
OB_ID(txid), header.tx_id_.get_id(),
OB_ID(from), before_tx_id.get_id(),
OB_ID(time_used), elapsed_us,
OB_ID(length), len,
@ -356,10 +458,13 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
}
}
#ifndef NDEBUG
TRANS_LOG(INFO, "update-static", K(tx_id), K(flag));
TRANS_LOG(INFO, "update-static", K(header));
#endif
if (OB_SUCC(ret)) {
ctx.update_last_synced_state(TxnFreeRouteState::STATIC, header.backend_sess_id_, header.global_version_);
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(before_tx_id), K(tx_id),
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(before_tx_id),
K(session_id), K(ctx), KP(tx));
}
return ret;
@ -395,20 +500,20 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
auto &audit_record = ctx.audit_record_;
audit_record.upd_dyn_ = true;
int64_t logic_clock = 0;
DECODE_HEADER();
TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::DYNAMIC);
if (OB_FAIL(ret)) {
} else if (flag.is_tx_terminated_) {
} else if (header.flag_.is_tx_terminated()) {
audit_record.upd_term_ = true;
if (OB_NOT_NULL(tx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx should be null: released in static state update", K(ret), K(tx->tx_id_));
}
} else if (flag.is_fallback_) {
} else if (header.flag_.is_fallback()) {
audit_record.upd_fallback_ = true;
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
} else if (OB_ISNULL(tx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(tx_id), K(flag), K(session_id));
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id));
} else {
auto start_ts = ObTimeUtility::current_time();
ObSpinLockGuard guard(tx->lock_);
@ -427,14 +532,17 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
ObTransTraceLog &tlog = tx->get_tlog();
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_dynamic, OB_Y(ret),
OB_ID(time_used), elapsed_us,
OB_ID(txid), tx_id.get_id(),
OB_ID(txid), header.tx_id_.get_id(),
OB_ID(logic_clock), logic_clock,
OB_ID(length), len,
OB_ID(ref), tx->get_ref(),
OB_ID(thread_id), GETTID());
}
if (OB_SUCC(ret)) {
ctx.update_last_synced_state(TxnFreeRouteState::DYNAMIC, header.backend_sess_id_, header.global_version_);
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(logic_clock),
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
K(session_id), K(ctx), KP(tx));
}
return ret;
@ -450,9 +558,9 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id
int ret = OB_SUCCESS;
auto &audit_record = ctx.audit_record_;
audit_record.upd_parts_ = true;
DECODE_HEADER();
TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::PARTICIPANT);
if (OB_FAIL(ret)) {
} else if (flag.is_tx_terminated_) {
} else if (header.flag_.is_tx_terminated()) {
audit_record.upd_term_ = true;
// [prev req] : [action]
// <commit> : do nothing
@ -463,33 +571,36 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id
ObSpinLockGuard guard(tx->lock_);
tx->parts_.reset();
}
} else if (flag.is_fallback_) {
} else if (header.flag_.is_fallback()) {
audit_record.upd_fallback_ = true;
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
} else if (OB_ISNULL(tx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(tx_id), K(flag), K(session_id));
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id));
} else {
auto start_ts = ObTimeUtility::current_time();
ObSpinLockGuard guard(tx->lock_);
if (!tx->tx_id_.is_valid()) {
// bug, dynamic state exist, txn should be active
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tx id should be active", K(ret), K(tx_id), K(tx->tx_id_));
TRANS_LOG(ERROR, "tx id should be active", K(ret), K(tx->tx_id_));
} else if (OB_FAIL(tx->decode_parts_state(buf, len, pos))) {
TRANS_LOG(WARN, "decode participants fail", K(ret));
}
auto elapsed_us = ObTimeUtility::current_time() - start_ts;
ObTransTraceLog &tlog = tx->get_tlog();
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_participants, OB_Y(ret),
OB_ID(txid), tx_id.get_id(),
OB_ID(txid), header.tx_id_.get_id(),
OB_ID(time_used), elapsed_us,
OB_ID(length), len,
OB_ID(ref), tx->get_ref(),
OB_ID(thread_id), GETTID());
}
if (OB_SUCC(ret)) {
ctx.update_last_synced_state(TxnFreeRouteState::PARTICIPANT, header.backend_sess_id_, header.global_version_);
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(session_id), K(ctx), KP(tx));
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(session_id), K(ctx), KP(tx));
}
return ret;
}
@ -505,9 +616,9 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
int64_t logic_clock = 0;
auto &audit_record = ctx.audit_record_;
audit_record.upd_extra_ = true;
DECODE_HEADER();
TXN_FREE_ROUTE_PROCESS_HEADER(TxnFreeRouteState::EXTRA);
if (OB_FAIL(ret)) {
} else if (flag.is_tx_terminated_) {
} else if (header.flag_.is_tx_terminated()) {
audit_record.upd_term_ = true;
// [prev req] : [action]
// <start_tx> : cleanup snapshot_version_, snapshot_scn
@ -520,12 +631,12 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
tx->snapshot_version_.reset();
tx->snapshot_scn_ = 0;
}
} else if (flag.is_fallback_) {
} else if (header.flag_.is_fallback()) {
audit_record.upd_fallback_ = true;
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
} else {
bool add_tx = OB_ISNULL(tx);
bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != tx_id;
bool replace_tx = OB_NOT_NULL(tx) && tx->tx_id_ != header.tx_id_;
auto before_tx_id = OB_NOT_NULL(tx) ? tx->tx_id_ : ObTransID();
audit_record.replace_tx_ = replace_tx;
audit_record.alloc_tx_ = add_tx;
@ -542,7 +653,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
if (OB_SUCC(ret) && replace_tx && tx->tx_id_.is_valid()) {
if (OB_UNLIKELY(tx->in_tx_for_free_route())) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "try overwrite tx which is active", K(ret), K(tx_id), K(tx->tx_id_));
TRANS_LOG(ERROR, "try overwrite tx which is active", K(ret), K(tx->tx_id_));
} else if (OB_FAIL(tx_desc_mgr_.remove(*tx))) {
TRANS_LOG(WARN, "unregister old tx fail", K(ret), K(tx->tx_id_));
}
@ -563,7 +674,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
auto elapsed_us = ObTimeUtility::current_time() - start_ts;
ObTransTraceLog &tlog = tx->get_tlog();
REC_TRANS_TRACE_EXT(&tlog, tx_free_route_update_extra, OB_Y(ret),
OB_ID(txid), tx_id.get_id(),
OB_ID(txid), header.tx_id_.get_id(),
OB_ID(from), before_tx_id.get_id(),
OB_ID(time_used), elapsed_us,
OB_ID(logic_clock), logic_clock,
@ -575,8 +686,11 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
}
}
}
if (OB_SUCC(ret)) {
ctx.update_last_synced_state(TxnFreeRouteState::EXTRA, header.backend_sess_id_, header.global_version_);
}
if (OB_FAIL(ret)) {
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(flag), K(tx_id), K(logic_clock),
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
K(session_id), K(ctx), KP(tx));
if (OB_NOT_NULL(tx)) {
ObSpinLockGuard guard(tx->lock_);
@ -631,13 +745,13 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
DEF_TXN_FREE_ROUTE_SERIALIZE(type) \
{ \
int ret = OB_SUCCESS; \
ENCODE_HEADER(); \
ret = encode_header_(ctx, buf, len, pos); \
TXN_ENCODE_NORMAL_STATE_X(type, ##__VA_ARGS__); \
return ret; \
} \
DEF_TXN_FREE_ROUTE_SERIALIZE_LENGTH(type) \
{ \
ENCODE_HEADER_LENGTH(); \
int64_t l = TxStateHeader::encode_length(); \
ENCODE_NORMAL_STATE_LENGTH(type, ##__VA_ARGS__); \
return l; \
}
@ -884,7 +998,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
}
if (return_fallback_state) {
audit_record.ret_fallback_ = true;
ctx.flag_.is_fallback_ = true;
ctx.flag_.set_fallback();
ctx.static_changed_ = true;
ctx.dynamic_changed_ = true;
ctx.parts_changed_ = true;
@ -892,7 +1006,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
}
if (return_terminated_state) {
audit_record.ret_term_ = true;
ctx.flag_.is_tx_terminated_ = true;
ctx.flag_.set_tx_terminated();
ctx.static_changed_ = true;
ctx.dynamic_changed_ = true;
ctx.parts_changed_ = true;
@ -908,6 +1022,9 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
}
if (ctx.is_changed()) {
ctx.inc_global_version();
if (ctx.static_changed_) {
ctx.global_version_water_mark_ = ctx.global_version_;
}
}
ctx.set_calculated();
// audit record

View File

@ -16,16 +16,26 @@ namespace transaction {
class ObTxDesc;
union ObTxnFreeRouteFlag {
int8_t v_;
struct {
static const int TX_TERMINATED_OFFSET = 0;
static const int FALLBACK_OFFSET = 1;
static const int IDLE_RELEASED_OFFSET = 2;
static const int STATE_MASK = ~(1 << 7);
static const int WITH_VERSION_OFFSET = 7;
// it is terminated (committed or rollbacked)
bool is_tx_terminated_ : 1;
bool is_tx_terminated() const { return (v_ & (1 << TX_TERMINATED_OFFSET)) != 0; }
void set_tx_terminated() { v_ |= (1 << TX_TERMINATED_OFFSET); }
// it is fallbacked to fixed route
bool is_fallback_ : 1;
bool is_fallback() const { return (v_ & (1 << FALLBACK_OFFSET)) !=0; }
void set_fallback() { v_ |= (1 << FALLBACK_OFFSET); }
// it is released during session idle, by doing check alive
bool is_idle_released_ : 1;
};
bool is_return_normal_state() const { return v_ == 0; }
TO_STRING_KV(K_(is_tx_terminated), K_(is_fallback), K_(is_idle_released));
bool is_idle_released() const { return (v_ & (1 << IDLE_RELEASED_OFFSET)) !=0; }
void set_idle_released() { v_ |= (1 << IDLE_RELEASED_OFFSET); }
// identify new Pkt format : Header part has version
bool is_with_version() const { return (v_ & (1 << WITH_VERSION_OFFSET)) !=0; }
void set_with_version(bool b) { if (b) { v_ |= (1 << WITH_VERSION_OFFSET); } else { v_ &= ~(1 << WITH_VERSION_OFFSET); } }
bool is_return_normal_state() const { return (v_ & STATE_MASK) == 0; }
void reset() { v_ = 0; }
TO_STRING_KV(K_(v));
};
union ObTxnFreeRouteAuditRecord
@ -66,6 +76,10 @@ union ObTxnFreeRouteAuditRecord
};
};
enum TxnFreeRouteState {
STATIC = 0, DYNAMIC = 1, PARTICIPANT = 2, EXTRA = 3, _CNT_VAL = 4
};
struct ObTxnFreeRouteCtx {
friend class ObTransService;
ObTxnFreeRouteCtx() { reset(); }
@ -82,9 +96,11 @@ struct ObTxnFreeRouteCtx {
in_txn_before_handle_request_ = false;
can_free_route_ = false;
is_fallbacked_ = false;
MEMSET(state_sync_infos_, 0, sizeof(state_sync_infos_));
reset_changed_();
audit_record_.reset();
}
void set_sessid(const uint32_t sessid) { session_id_ = sessid; }
void init_before_update_state(bool proxy_support);
void init_before_handle_request(ObTxDesc *txdesc);
bool is_temp(const ObTxDesc &tx) const;
@ -94,22 +110,42 @@ struct ObTxnFreeRouteCtx {
bool is_dynamic_changed() const { return dynamic_changed_; }
bool is_parts_changed() const { return parts_changed_; }
bool is_extra_changed() const { return extra_changed_; }
void set_idle_released() { flag_.is_idle_released_ = true; }
bool is_idle_released() const { return flag_.is_idle_released_; }
void set_idle_released() { flag_.set_idle_released(); }
bool is_idle_released() const { return flag_.is_idle_released(); }
bool has_calculated() const { return calculated_; }
void set_calculated() { calculated_ = true; }
int64_t get_local_version() const { return local_version_; }
int64_t get_global_version() const { return global_version_; }
void inc_update_global_version(const int64_t v) { if (global_version_ < v) { global_version_ = v; } }
void inc_global_version() { ++global_version_; }
void reset_audit_record() { audit_record_.reset(); }
const ObTransID &get_prev_tx_id() const { return prev_tx_id_; }
const ObTransID &get_tx_id() const { return tx_id_; }
const ObTxnFreeRouteFlag &get_flag() const { return flag_; }
uint32_t get_session_id() const { return session_id_; }
uint64_t get_audit_record() const { return audit_record_.v_; }
int state_update_verify_by_version(const int64_t version) const;
int state_update_verify_by_version(const TxnFreeRouteState state,
const int64_t version,
const uint32_t backend_sess_id,
bool &dup) const;
void update_last_synced_state(const TxnFreeRouteState state, uint32_t backend_sess_id, const int64_t version)
{
state_sync_infos_[state].last_backend_sess_id_ = backend_sess_id;
state_sync_infos_[state].last_version_ = version;
inc_update_global_version(version);
if (TxnFreeRouteState::STATIC == state) {
is_txn_switch_ = true;
global_version_water_mark_ = version;
}
}
private:
void reset_changed_() {
_changed_ = false;
flag_.v_ = 0;
flag_.reset();
calculated_ = false;
}
// the session this ctx belongs to
uint32_t session_id_;
// the local_version updated when session handle a request
// from proxy which caused txn state synced
// it is used as request id for checkAlive request
@ -120,8 +156,7 @@ private:
// when they update txn state and propagated in txn state
// sync via OBProxy
int64_t global_version_;
// used to mark the safe global version and verify the
// update's version in order to discover stale or dup
// the txn left boundary version, it's updated when txn started
int64_t global_version_water_mark_;
// remember txn is switched by sync 'static' state
bool is_txn_switch_;
@ -159,7 +194,13 @@ private:
// reset pre handle request
// setup post handle request, remember fallback decision
bool is_fallbacked_;
// record each state's synced info, used to reject stale and duplicate sync
struct StateSyncInfo {
StateSyncInfo(): last_backend_sess_id_(0), last_version_(0) {}
uint32_t last_backend_sess_id_;
int64_t last_version_;
TO_STRING_KV(K_(last_backend_sess_id), K_(last_version));
} state_sync_infos_[TxnFreeRouteState::_CNT_VAL];
// following are changed after request process
// used to mark state changed and special state
// need to return to proxy
@ -181,6 +222,19 @@ private:
// reset before handle request
ObTxnFreeRouteFlag flag_;
ObTxnFreeRouteAuditRecord audit_record_;
private:
template<typename T, int N>
struct _ForRawArrayDisplay {
_ForRawArrayDisplay(const T (&a)[N]): a_(a) {}
const T (&a_)[N];
DEFINE_TO_STRING({
J_ARRAY_START();
for(int i = 0; i < N; i++) { BUF_PRINTO(a_[i]); J_COMMA(); }
J_ARRAY_END();
});
};
template<typename T, int N>
const _ForRawArrayDisplay<T, N> for_display_(const T (&a)[N]) const { return _ForRawArrayDisplay<T, N>(a); }
public:
TO_STRING_KV(K_(tx_id),
K_(txn_addr),
@ -197,6 +251,7 @@ public:
K_(local_version),
K_(global_version),
K_(global_version_water_mark),
"state_sync_infos", for_display_(state_sync_infos_),
"audit_record", audit_record_.v_);
};
}

View File

@ -946,8 +946,8 @@ TEST_F(ObTestTxFreeRoute, sample)
A_T(txn_free_route_ctx.dynamic_changed_),
A_T(txn_free_route_ctx.parts_changed_),
A_T(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated_),
A_F(txn_free_route_ctx.flag_.is_fallback_));
A_F(txn_free_route_ctx.flag_.is_tx_terminated()),
A_F(txn_free_route_ctx.flag_.is_fallback()));
EX_START_TX(1);
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server2));
@ -1002,7 +1002,7 @@ TEST_F(ObTestTxFreeRoute, sample)
A_F(txn_free_route_ctx.dynamic_changed_),
A_F(txn_free_route_ctx.parts_changed_),
A_F(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated_));
A_F(txn_free_route_ctx.flag_.is_tx_terminated()));
EX_DUMMY_WRITE(201,100);
// step2
RESET_HOOKS_2();
@ -1014,7 +1014,7 @@ TEST_F(ObTestTxFreeRoute, sample)
A_F(txn_free_route_ctx.dynamic_changed_),
A_F(txn_free_route_ctx.parts_changed_),
A_T(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated_));
A_F(txn_free_route_ctx.flag_.is_tx_terminated()));
EX_SAVEPOINT(202, 102);
// step3
RESET_HOOKS_2();