[master] handle txn info sync in asc order
This commit is contained in:
@ -140,6 +140,12 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
|
||||
// decode sess_info
|
||||
if (NULL != sess_infos.ptr() && !sess.get_is_in_retry()) {
|
||||
common::ObFixedBitSet<oceanbase::sql::SessionSyncInfoType::SESSION_SYNC_MAX_TYPE> succ_info_types;
|
||||
struct TxnTypeInfo { bool has = false; int64_t pos; int32_t len; };
|
||||
int min = 0; int max = 0;
|
||||
SessSyncTxnTypeSet::get_instance().type_range(min, max);
|
||||
TxnTypeInfo txn_type_infos[max - min + 1];
|
||||
bool has_txn_type = false;
|
||||
// phase 1: iterate all types and do sync if required
|
||||
while (OB_SUCC(ret) && pos < len) {
|
||||
int16_t info_type = 0;
|
||||
int32_t info_len = 0;
|
||||
@ -156,6 +162,14 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
|
||||
} else if (info_type >= SESSION_SYNC_MAX_TYPE) {
|
||||
pos += info_len;
|
||||
} else if (FALSE_IT(pos0 = pos)) {
|
||||
} else if (SessSyncTxnTypeSet::get_instance().is_contain(info_type)) {
|
||||
// need order defer to next phase
|
||||
auto &info = txn_type_infos[info_type - min];
|
||||
info.has = true;
|
||||
info.pos = pos0;
|
||||
info.len = info_len;
|
||||
has_txn_type = true;
|
||||
pos += info_len; // skip
|
||||
} else if (OB_FAIL(sess.update_sess_sync_info(
|
||||
(oceanbase::sql::SessionSyncInfoType)(info_type),
|
||||
buf, (int64_t)info_len + pos0, pos0))) {
|
||||
@ -163,10 +177,24 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
|
||||
K(ret), K(info_type), K(sess.get_sessid()), K(succ_info_types), K(pos), K(info_len), K(info_len+pos));
|
||||
} else {
|
||||
pos += info_len;
|
||||
succ_info_types.add_member(info_type);
|
||||
}
|
||||
succ_info_types.add_member(info_type);
|
||||
LOG_DEBUG("sync-session-info", K(info_type), K(info_len));
|
||||
}
|
||||
// phase 2: handle txn relative types in order
|
||||
if (OB_SUCC(ret) && has_txn_type) {
|
||||
for(int info_type = min; info_type <= max; info_type++) {
|
||||
auto &info = txn_type_infos[info_type - min];
|
||||
if (info.has) {
|
||||
if (OB_FAIL(sess.update_sess_sync_info((sql::SessionSyncInfoType)info_type, buf, info.pos + info.len, info.pos))) {
|
||||
LOG_WARN("failed to update txn session sync info",
|
||||
K(ret), K(info_type), K(sess.get_sessid()), K(succ_info_types), K(info.pos), K(info.len));
|
||||
} else {
|
||||
succ_info_types.add_member(info_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
sess.post_sync_session_info();
|
||||
} else {
|
||||
|
||||
@ -216,6 +216,26 @@ enum SessionSyncInfoType {
|
||||
SESSION_SYNC_MAX_TYPE,
|
||||
};
|
||||
|
||||
|
||||
struct SessSyncTxnTypeSet {
|
||||
SessSyncTxnTypeSet() {
|
||||
types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_STATIC_INFO);
|
||||
types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_DYNAMIC_INFO);
|
||||
types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_PARTICIPANTS_INFO);
|
||||
types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_EXTRA_INFO);
|
||||
};
|
||||
common::ObFixedBitSet<oceanbase::sql::SessionSyncInfoType::SESSION_SYNC_MAX_TYPE> types_;
|
||||
bool is_contain(const int t) { return types_.has_member(t); }
|
||||
void type_range(int &min, int &max) {
|
||||
min = SessionSyncInfoType::SESSION_SYNC_TXN_STATIC_INFO;
|
||||
max = SessionSyncInfoType::SESSION_SYNC_TXN_EXTRA_INFO;
|
||||
}
|
||||
static SessSyncTxnTypeSet &get_instance() {
|
||||
static SessSyncTxnTypeSet instance;
|
||||
return instance;
|
||||
}
|
||||
};
|
||||
|
||||
class ObSessInfoEncoder {
|
||||
public:
|
||||
ObSessInfoEncoder() : is_changed_(false) {}
|
||||
|
||||
Reference in New Issue
Block a user