[master] fix session sync verify for txn state
This commit is contained in:
@ -718,6 +718,8 @@ public:
|
||||
int decode_##name##_state(const char *buf, const int64_t len, int64_t &pos); \
|
||||
int64_t name##_state_encoded_length(); \
|
||||
static int display_##name##_state(const char *buf, const int64_t len, int64_t &pos); \
|
||||
int encode_##name##_state_for_verify(char *buf, const int64_t len, int64_t &pos); \
|
||||
int64_t name##_state_encoded_length_for_verify(); \
|
||||
int64_t est_##name##_size__()
|
||||
#define DEF_FREE_ROUTE_DECODE(name) DEF_FREE_ROUTE_DECODE_(name)
|
||||
LST_DO(DEF_FREE_ROUTE_DECODE, (;), static, dynamic, parts, extra);
|
||||
|
||||
@ -1276,7 +1276,8 @@ int64_t ObTransService::txn_free_route__get_##name##_state_size(ObTxDesc *tx) \
|
||||
l += encoded_length_bool(true); \
|
||||
l += encoded_length_i64(1024); \
|
||||
if (tx->tx_id_.is_valid()) { \
|
||||
l += tx->name##_state_encoded_length(); \
|
||||
ObSpinLockGuard guard(tx->lock_); \
|
||||
l += tx->name##_state_encoded_length_for_verify(); \
|
||||
} \
|
||||
} \
|
||||
return l; \
|
||||
@ -1288,7 +1289,9 @@ int ObTransService::txn_free_route__get_##name##_state(ObTxDesc *tx, const ObTxn
|
||||
} else if (OB_NOT_NULL(tx)) { \
|
||||
if (OB_FAIL(encode_bool(buf, len, pos, tx->in_tx_for_free_route()))) { \
|
||||
} else if (OB_FAIL(encode_i64(buf, len, pos, ctx.global_version_))) { \
|
||||
} else if (tx->tx_id_.is_valid() && OB_FAIL(tx->encode_##name##_state(buf, len, pos))) { \
|
||||
} else if (tx->tx_id_.is_valid()) { \
|
||||
ObSpinLockGuard guard(tx->lock_); \
|
||||
ret = tx->encode_##name##_state_for_verify(buf, len, pos); \
|
||||
} \
|
||||
} \
|
||||
return ret; \
|
||||
@ -1300,16 +1303,16 @@ int ObTransService::txn_free_route__cmp_##name##_state(const char* cur_buf, int6
|
||||
ObTransID cur_tx_id, last_tx_id; \
|
||||
{ \
|
||||
int64_t tx_id = 0; \
|
||||
if (OB_FAIL(decode_i64(cur_buf, cur_len, cur_pos, &tx_id))) { \
|
||||
if (OB_FAIL(decode_i64(cur_buf, cur_len, cur_pos, &tx_id))) { \
|
||||
} else { cur_tx_id = ObTransID(tx_id); } \
|
||||
if (OB_SUCC(ret) && OB_FAIL(decode_i64(last_buf, last_len, last_pos, &tx_id))) { \
|
||||
} else { last_tx_id = ObTransID(tx_id); } \
|
||||
} \
|
||||
if (OB_SUCC(ret) && cur_tx_id != last_tx_id) { \
|
||||
if (OB_SUCC(ret) && cur_tx_id != last_tx_id && last_tx_id.is_valid()) { \
|
||||
ret = OB_ERR_UNEXPECTED; \
|
||||
TRANS_LOG(WARN, "tx_id not equals", K(cur_tx_id), K(last_tx_id)); \
|
||||
} \
|
||||
if (OB_SUCC(ret) && cur_tx_id.is_valid()) { \
|
||||
if (OB_SUCC(ret) && cur_tx_id == last_tx_id && cur_tx_id.is_valid()) { \
|
||||
if (cur_len != last_len) { \
|
||||
ret = OB_ERR_UNEXPECTED; \
|
||||
TRANS_LOG(WARN, "state len not equals", K(cur_len), K(last_len)); \
|
||||
|
||||
@ -14,12 +14,17 @@
|
||||
|
||||
namespace oceanbase {
|
||||
namespace transaction {
|
||||
// override this.flags_, because different on each node
|
||||
// override can_elr_ because it is useless and not synced between node
|
||||
#define PRE_ENCODE_DYNAMIC_FOR_VERIFY \
|
||||
FLAG flags_ = { .v_ = 0 }; \
|
||||
bool can_elr_ = false;
|
||||
#define PRE_STATIC_DECODE
|
||||
#define POST_STATIC_DECODE
|
||||
// bookkeep the original before update, then after receive the update,
|
||||
// recover flags which should not be overwriten
|
||||
#define PRE_DYNAMIC_DECODE \
|
||||
auto save_flags = flags_;
|
||||
FLAG save_flags = flags_;
|
||||
#define POST_DYNAMIC_DECODE \
|
||||
flags_ = save_flags.update_with(flags_);
|
||||
|
||||
@ -32,16 +37,24 @@ template<>
|
||||
int64_t SIZE_OF_<ObTxPartList>::get_size(ObTxPartList &x) { return x.count() * sizeof(ObTxPart); }
|
||||
#define SIZE_OF(x) SIZE_OF_<typeof(x)>::get_size(x)
|
||||
#define TXN_UNIS_DECODE(x, idx) OB_UNIS_DECODE(_member_##idx)
|
||||
#define TXN_STATE_K(x, idx) #x, _member_##idx
|
||||
#define TXN_STATE_K_(x, idx) #x, _member_##idx
|
||||
#define TXN_STATE_K(x, idx) TXN_STATE_K_(x, idx)
|
||||
#define DEF_MEMBER_(m, idx) decltype(ObTxDesc::m) _member_ ##idx
|
||||
#define DEF_MEMBER(m, idx) DEF_MEMBER_(m, idx)
|
||||
#define TXN_FREE_ROUTE_MEMBERS(name, PRE_DECODE_HANDLER, POST_DECODE_HANDLER, ...) \
|
||||
#define TXN_FREE_ROUTE_MEMBERS(name, PRE_ENCODE_FOR_VERIFY_HANDLER, PRE_DECODE_HANDLER, POST_DECODE_HANDLER, ...) \
|
||||
int ObTxDesc::encode_##name##_state(char *buf, const int64_t buf_len, int64_t &pos) \
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
LST_DO_CODE(OB_UNIS_ENCODE, ##__VA_ARGS__); \
|
||||
return ret; \
|
||||
} \
|
||||
int ObTxDesc::encode_##name##_state_for_verify(char *buf, const int64_t buf_len, int64_t &pos) \
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
PRE_ENCODE_FOR_VERIFY_HANDLER; \
|
||||
LST_DO_CODE(OB_UNIS_ENCODE, ##__VA_ARGS__); \
|
||||
return ret; \
|
||||
} \
|
||||
int ObTxDesc::decode_##name##_state(const char *buf, const int64_t data_len, int64_t &pos) \
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
@ -58,6 +71,13 @@ int64_t ObTxDesc::name##_state_encoded_length() \
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN, ##__VA_ARGS__); \
|
||||
return len; \
|
||||
} \
|
||||
int64_t ObTxDesc::name##_state_encoded_length_for_verify() \
|
||||
{ \
|
||||
int64_t len = 0; \
|
||||
PRE_ENCODE_FOR_VERIFY_HANDLER; \
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN, ##__VA_ARGS__); \
|
||||
return len; \
|
||||
} \
|
||||
inline int64_t ObTxDesc::est_##name##_size__() { return LST_DO(SIZE_OF, (+), ##__VA_ARGS__); } \
|
||||
int ObTxDesc::display_##name##_state(const char* buf, const int64_t len, int64_t &pos) \
|
||||
{ \
|
||||
@ -82,7 +102,7 @@ int ObTxDesc::display_##name##_state(const char* buf, const int64_t len, int64_t
|
||||
return ret; \
|
||||
}
|
||||
|
||||
TXN_FREE_ROUTE_MEMBERS(static, PRE_STATIC_DECODE, POST_STATIC_DECODE,
|
||||
TXN_FREE_ROUTE_MEMBERS(static, , PRE_STATIC_DECODE, POST_STATIC_DECODE,
|
||||
tenant_id_,
|
||||
cluster_id_,
|
||||
cluster_version_,
|
||||
@ -96,7 +116,7 @@ TXN_FREE_ROUTE_MEMBERS(static, PRE_STATIC_DECODE, POST_STATIC_DECODE,
|
||||
sess_id_,
|
||||
timeout_us_,
|
||||
expire_ts_);
|
||||
TXN_FREE_ROUTE_MEMBERS(dynamic,PRE_DYNAMIC_DECODE, POST_DYNAMIC_DECODE,
|
||||
TXN_FREE_ROUTE_MEMBERS(dynamic, PRE_ENCODE_DYNAMIC_FOR_VERIFY, PRE_DYNAMIC_DECODE, POST_DYNAMIC_DECODE,
|
||||
op_sn_,
|
||||
state_,
|
||||
flags_.compat_for_tx_route_,
|
||||
@ -105,12 +125,12 @@ TXN_FREE_ROUTE_MEMBERS(dynamic,PRE_DYNAMIC_DECODE, POST_DYNAMIC_DECODE,
|
||||
abort_cause_,
|
||||
can_elr_,
|
||||
flags_.for_serialize_v_);
|
||||
TXN_FREE_ROUTE_MEMBERS(parts,,,
|
||||
TXN_FREE_ROUTE_MEMBERS(parts,,,,
|
||||
parts_);
|
||||
// the fields 'dup with static' are required when preceding of txn is of query like
|
||||
// savepoint or read only stmt with isolation of SERIALIZABLE / REPEATABLE READ
|
||||
// because such type of query caused the txn into 'start' in perspective of proxy
|
||||
TXN_FREE_ROUTE_MEMBERS(extra, PRE_EXTRA_DECODE, POST_EXTRA_DECODE,
|
||||
TXN_FREE_ROUTE_MEMBERS(extra, , PRE_EXTRA_DECODE, POST_EXTRA_DECODE,
|
||||
tx_id_, // dup with static
|
||||
sess_id_, // dup with static
|
||||
addr_, // dup with static
|
||||
|
||||
Reference in New Issue
Block a user