diff --git a/src/observer/mysql/obmp_utils.cpp b/src/observer/mysql/obmp_utils.cpp index 722533bd89..dafaebd696 100644 --- a/src/observer/mysql/obmp_utils.cpp +++ b/src/observer/mysql/obmp_utils.cpp @@ -140,6 +140,12 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt // decode sess_info if (NULL != sess_infos.ptr() && !sess.get_is_in_retry()) { common::ObFixedBitSet succ_info_types; + struct TxnTypeInfo { bool has = false; int64_t pos; int32_t len; }; + int min = 0; int max = 0; + SessSyncTxnTypeSet::get_instance().type_range(min, max); + TxnTypeInfo txn_type_infos[max - min + 1]; + bool has_txn_type = false; + // phase 1: iterate all types and do sync if required while (OB_SUCC(ret) && pos < len) { int16_t info_type = 0; int32_t info_len = 0; @@ -156,6 +162,14 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt } else if (info_type >= SESSION_SYNC_MAX_TYPE) { pos += info_len; } else if (FALSE_IT(pos0 = pos)) { + } else if (SessSyncTxnTypeSet::get_instance().is_contain(info_type)) { + // need order defer to next phase + auto &info = txn_type_infos[info_type - min]; + info.has = true; + info.pos = pos0; + info.len = info_len; + has_txn_type = true; + pos += info_len; // skip } else if (OB_FAIL(sess.update_sess_sync_info( (oceanbase::sql::SessionSyncInfoType)(info_type), buf, (int64_t)info_len + pos0, pos0))) { @@ -163,10 +177,24 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt K(ret), K(info_type), K(sess.get_sessid()), K(succ_info_types), K(pos), K(info_len), K(info_len+pos)); } else { pos += info_len; + succ_info_types.add_member(info_type); } - succ_info_types.add_member(info_type); LOG_DEBUG("sync-session-info", K(info_type), K(info_len)); } + // phase 2: handle txn relative types in order + if (OB_SUCC(ret) && has_txn_type) { + for(int info_type = min; info_type <= max; info_type++) { + auto &info = txn_type_infos[info_type - min]; + if (info.has) { + if (OB_FAIL(sess.update_sess_sync_info((sql::SessionSyncInfoType)info_type, buf, info.pos + info.len, info.pos))) { + LOG_WARN("failed to update txn session sync info", + K(ret), K(info_type), K(sess.get_sessid()), K(succ_info_types), K(info.pos), K(info.len)); + } else { + succ_info_types.add_member(info_type); + } + } + } + } if (OB_FAIL(ret)) { sess.post_sync_session_info(); } else { diff --git a/src/sql/session/ob_sql_session_info.h b/src/sql/session/ob_sql_session_info.h index 15fdb4df3a..4093cf4e89 100644 --- a/src/sql/session/ob_sql_session_info.h +++ b/src/sql/session/ob_sql_session_info.h @@ -216,6 +216,26 @@ enum SessionSyncInfoType { SESSION_SYNC_MAX_TYPE, }; + +struct SessSyncTxnTypeSet { + SessSyncTxnTypeSet() { + types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_STATIC_INFO); + types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_DYNAMIC_INFO); + types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_PARTICIPANTS_INFO); + types_.add_member(SessionSyncInfoType::SESSION_SYNC_TXN_EXTRA_INFO); + }; + common::ObFixedBitSet types_; + bool is_contain(const int t) { return types_.has_member(t); } + void type_range(int &min, int &max) { + min = SessionSyncInfoType::SESSION_SYNC_TXN_STATIC_INFO; + max = SessionSyncInfoType::SESSION_SYNC_TXN_EXTRA_INFO; + } + static SessSyncTxnTypeSet &get_instance() { + static SessSyncTxnTypeSet instance; + return instance; + } +}; + class ObSessInfoEncoder { public: ObSessInfoEncoder() : is_changed_(false) {}