[master] add sql_audit txn-route diagnose info

This commit is contained in:
chinaxing 2023-02-14 04:15:02 +00:00 committed by ob-robot
parent 6d1f508977
commit 092b683a60
16 changed files with 204 additions and 42 deletions

View File

@ -19,7 +19,7 @@
#include "observer/mysql/obmp_utils.h"
#include "rpc/obmysql/ob_2_0_protocol_utils.h"
#include "sql/monitor/flt/ob_flt_control_info_mgr.h"
#include "lib/container/ob_bit_set.h"
namespace oceanbase
{
namespace observer
@ -125,10 +125,12 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
const char *end = buf + len;
int64_t pos = 0;
LOG_TRACE("sync sess_inf", K(sess.get_is_in_retry()), K(sess.get_sessid()), KP(data), K(len), KPHEX(data, len));
LOG_TRACE("sync sess_inf", K(sess.get_is_in_retry()),
K(sess.get_sessid()), KP(data), K(len), KPHEX(data, len));
// decode sess_info
if (NULL != sess_infos.ptr() && !sess.get_is_in_retry()) {
common::ObFixedBitSet<oceanbase::sql::SessionSyncInfoType::SESSION_SYNC_MAX_TYPE> succ_info_types;
while (OB_SUCC(ret) && pos < len) {
int16_t info_type = 0;
int32_t info_len = 0;
@ -149,10 +151,11 @@ int ObMPUtils::sync_session_info(sql::ObSQLSessionInfo &sess, const common::ObSt
(oceanbase::sql::SessionSyncInfoType)(info_type),
buf, (int64_t)info_len + pos0, pos0))) {
LOG_WARN("failed to update session sync info",
K(ret), K(pos), K(info_len), K(info_len+pos));
K(ret), K(info_type), K(succ_info_types), K(pos), K(info_len), K(info_len+pos));
} else {
pos += info_len;
}
succ_info_types.add_member(info_type);
LOG_DEBUG("sync-session-info", K(info_type), K(info_len));
}
}
@ -214,7 +217,7 @@ int ObMPUtils::append_modfied_sess_info(common::ObIAllocator &allocator,
} else if (encoder->is_changed_) {
int16_t info_type = (int16_t)i;
int32_t info_len = sess_size[i];
LOG_INFO("session-info-encode", K(info_type), K(info_len));
LOG_DEBUG("session-info-encode", K(sess.get_sessid()), K(info_type), K(info_len));
if (info_len < 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid session info length", K(info_len), K(info_type), K(ret));

View File

@ -970,13 +970,17 @@ int ObGvSqlAudit::fill_cells(obmysql::ObMySQLRequestRecord &record)
cells[cell_idx].set_collation_type(ObCharset::get_default_collation(
ObCharset::get_default_charset()));
} break;
case TX_FREE_ROUTE_FLAG: {
case TX_INTERNAL_ROUTE_FLAG: {
cells[cell_idx].set_uint64(record.data_.txn_free_route_flag_);
break;
}
case PARTITION_HIT: {
cells[cell_idx].set_bool(record.data_.partition_hit_);
} break;
case TX_INTERNAL_ROUTE_VERSION: {
cells[cell_idx].set_uint64(record.data_.txn_free_route_version_);
break;
}
default: {
ret = OB_ERR_UNEXPECTED;
SERVER_LOG(WARN, "invalid column id", K(ret), K(cell_idx), K(col_id));

View File

@ -161,9 +161,10 @@ private:
PARAMS_VALUE,
RULE_NAME,
PROXY_SESSION_ID,
TX_FREE_ROUTE_FLAG,
TX_INTERNAL_ROUTE_FLAG,
PARTITION_HIT,
TX_INTERNAL_ROUTE_VERSION,
};
const static int64_t PRI_KEY_IP_IDX = 0;

View File

@ -10768,7 +10768,7 @@ int ObInnerTableSchema::all_virtual_sql_audit_schema(ObTableSchema &table_schema
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("tx_free_route_flag", //column_name
ADD_COLUMN_SCHEMA("tx_internal_route_flag", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
@ -10796,6 +10796,21 @@ int ObInnerTableSchema::all_virtual_sql_audit_schema(ObTableSchema &table_schema
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("tx_internal_route_version", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
table_schema.get_part_option().set_part_num(1);
table_schema.set_part_level(PARTITION_LEVEL_ONE);
@ -12461,7 +12476,7 @@ int ObInnerTableSchema::all_virtual_sql_audit_all_virtual_sql_audit_i1_schema(Ob
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("tx_free_route_flag", //column_name
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("tx_internal_route_flag", //column_name
column_id + 95, //column_id
0, //rowkey_id
0, //index_id
@ -12494,7 +12509,24 @@ int ObInnerTableSchema::all_virtual_sql_audit_all_virtual_sql_audit_i1_schema(Ob
true);//is_storing_column
}
table_schema.set_max_used_column_id(column_id + 96);
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("tx_internal_route_version", //column_name
column_id + 97, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObUInt64Type, //column_type
CS_TYPE_INVALID, //column_collation_type
sizeof(uint64_t), //column_length
-1, //column_precision
-1, //column_scale
false,//is_nullable
false,//is_autoincrement
false,//is_hidden
true);//is_storing_column
}
table_schema.set_max_used_column_id(column_id + 97);
return ret;
}

View File

@ -1470,7 +1470,7 @@ int ObInnerTableSchema::all_virtual_sql_audit_ora_schema(ObTableSchema &table_sc
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("TX_FREE_ROUTE_FLAG", //column_name
ADD_COLUMN_SCHEMA("TX_INTERNAL_ROUTE_FLAG", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
@ -1498,6 +1498,21 @@ int ObInnerTableSchema::all_virtual_sql_audit_ora_schema(ObTableSchema &table_sc
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA("TX_INTERNAL_ROUTE_VERSION", //column_name
++column_id, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObNumberType, //column_type
CS_TYPE_INVALID, //column_collation_type
38, //column_length
38, //column_precision
0, //column_scale
false, //is_nullable
false); //is_autoincrement
}
if (OB_SUCC(ret)) {
table_schema.get_part_option().set_part_num(1);
table_schema.set_part_level(PARTITION_LEVEL_ONE);
@ -3163,7 +3178,7 @@ int ObInnerTableSchema::all_virtual_sql_audit_ora_all_virtual_sql_audit_i1_schem
}
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("TX_FREE_ROUTE_FLAG", //column_name
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("TX_INTERNAL_ROUTE_FLAG", //column_name
column_id + 95, //column_id
0, //rowkey_id
0, //index_id
@ -3196,7 +3211,24 @@ int ObInnerTableSchema::all_virtual_sql_audit_ora_all_virtual_sql_audit_i1_schem
true);//is_storing_column
}
table_schema.set_max_used_column_id(column_id + 96);
if (OB_SUCC(ret)) {
ADD_COLUMN_SCHEMA_WITH_COLUMN_FLAGS("TX_INTERNAL_ROUTE_VERSION", //column_name
column_id + 97, //column_id
0, //rowkey_id
0, //index_id
0, //part_key_pos
ObNumberType, //column_type
CS_TYPE_INVALID, //column_collation_type
38, //column_length
38, //column_precision
0, //column_scale
false,//is_nullable
false,//is_autoincrement
false,//is_hidden
true);//is_storing_column
}
table_schema.set_max_used_column_id(column_id + 97);
return ret;
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -6685,9 +6685,10 @@ def_table_schema(
('params_value', 'longtext'),
('rule_name', 'varchar:256'),
('proxy_session_id', 'uint'),
('tx_free_route_flag', 'uint'),
('tx_internal_route_flag', 'uint'),
('partition_hit', 'bool')
('partition_hit', 'bool'),
('tx_internal_route_version', 'uint')
],
partition_columns = ['svr_ip', 'svr_port'],
vtable_route_policy = 'distributed',
@ -12975,7 +12976,10 @@ def_table_schema(
lock_for_read_time as LOCK_FOR_READ_TIME,
params_value as PARAMS_VALUE,
rule_name as RULE_NAME,
partition_hit as PARTITION_HIT
partition_hit as PARTITION_HIT,
case when tx_internal_route_flag & 96 = 32 then 1 else 0 end
as TX_INTERNAL_ROUTING,
tx_internal_route_version as TX_STATE_VERSION
from oceanbase.__all_virtual_sql_audit
""".replace("\n", " "),
@ -43236,7 +43240,10 @@ def_table_schema(
ob_trace_info as OB_TRACE_INFO,
plan_hash as PLAN_HASH,
params_value as PARAMS_VALUE,
rule_name as RULE_NAME
rule_name as RULE_NAME,
case when bitand(tx_internal_route_flag, 96) = 32 then 1 else 0 end
as TX_INTERNAL_ROUTING,
tx_internal_route_version as TX_STATE_VERSION
FROM SYS.ALL_VIRTUAL_SQL_AUDIT
""".replace("\n", " ")
)

View File

@ -433,6 +433,7 @@ struct ObAuditRecordData {
char const* source_; // snapshot's acquire source
} snapshot_; // stmt's tx snapshot
uint64_t txn_free_route_flag_; // flag contains txn free route meta
uint64_t txn_free_route_version_; // the version of txn's state
bool partition_hit_;// flag for need das partition route or not
};

View File

@ -1100,13 +1100,14 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
int64_t ObSqlTransControl::get_txn_##name##_state_serialize_size(ObSQLSessionInfo &session) \
{ \
int ret = OB_SUCCESS; \
int64_t size = -1; \
transaction::ObTransService *txs = NULL; \
MTL_SWITCH(session.get_effective_tenant_id()) { \
OZ (get_tx_service(&session, txs)); \
OZ (txs->txn_free_route__get_##name##_state_serialize_size(session.get_tx_desc(), session.get_txn_free_route_ctx())); \
size = txs->txn_free_route__get_##name##_state_serialize_size(session.get_tx_desc(), session.get_txn_free_route_ctx()); \
} \
LOG_DEBUG("get-serialize-size-txn-state", K(session)); \
return ret; \
return size; \
}
DELEGATE_TO_TXN(static);

View File

@ -1660,6 +1660,7 @@ const ObAuditRecordData &ObSQLSessionInfo::get_final_audit_record(
}
}
audit_record_.txn_free_route_flag_ = txn_free_route_ctx_.get_audit_record();
audit_record_.txn_free_route_version_ = txn_free_route_ctx_.get_global_version();
return audit_record_;
}
@ -2453,7 +2454,7 @@ void ObSQLSessionInfo::post_sync_session_info()
void ObSQLSessionInfo::set_txn_free_route(bool txn_free_route)
{
txn_free_route_ctx_.reset_audit_record();
txn_free_route_ctx_.set_proxy_support(txn_free_route);
txn_free_route_ctx_.init_before_update_state(txn_free_route);
}
bool ObSQLSessionInfo::can_txn_free_route() const

View File

@ -39,6 +39,12 @@ bool ObTxnFreeRouteCtx::is_temp(const ObTxDesc &tx) const
{
return !TX_START_OR_RESUME_LOCAL(&tx);
}
// Re-arm the context's version bookkeeping:
//  - clear the txn-switch marker,
//  - snapshot the current global_version_ as the water mark that
//    incoming state versions are later verified against,
//  - remember whether the proxy supports txn free route.
void ObTxnFreeRouteCtx::init_before_update_state(bool proxy_support)
{
  is_txn_switch_ = false;
  global_version_water_mark_ = global_version_;
  is_proxy_support_ = proxy_support;
}
void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx)
{
@ -61,7 +67,7 @@ void ObTxnFreeRouteCtx::init_before_handle_request(ObTxDesc *tx)
tx_id_.reset();
}
reset_changed_();
++version_;
++local_version_;
#ifndef NDEBUG
TRANS_LOG(INFO, "[tx free route] after sync state and before handle request", KPC(tx), KPC(this));
#endif
@ -119,26 +125,60 @@ int ObTransService::txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFr
return ret;
}
// Check the global version carried by an incoming txn state update against
// the water mark recorded in `ctx`.
//
// When ctx.is_txn_switch_ is set (the ctx switched to a new txn in this
// request and the water mark was established by the static state), the
// following states (dyn, parts, extra) only need version >= water mark.
// Otherwise the new state's version must be strictly greater than the water
// mark: an equal version is reported as a duplicated sync, a smaller one as
// stale.
//
// @param ctx      free-route context holding the water mark and switch marker
// @param version  global version decoded from the incoming state
// @return OB_SUCCESS if the version is acceptable, OB_ERR_UNEXPECTED otherwise
inline int ObTransService::txn_state_update_verify_by_version_(const ObTxnFreeRouteCtx &ctx, const int64_t version)
{
  int ret = OB_SUCCESS;
  // the two predicates are mutually exclusive, so the check order is irrelevant
  const bool behind_water_mark = ctx.global_version_water_mark_ > version;
  const bool repeats_water_mark = !ctx.is_txn_switch_ && ctx.global_version_water_mark_ == version;
  if (repeats_water_mark) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(ERROR, "duplicated state sync", K(ret), K(version));
  } else if (behind_water_mark) {
    ret = OB_ERR_UNEXPECTED;
    TRANS_LOG(ERROR, "the state is stale", K(ret), K(version), K_(ctx.global_version_water_mark));
  }
  return ret;
}
// DECODE_HEADER: decode the common header of a serialized txn state.
//
// Declares three locals in the expanding scope: `flag`, `tx_id` and
// `global_version` (the latter is only assigned once its decode step runs).
// The expansion site must provide `ret`, `buf`, `len`, `pos`, `ctx`, `tx`
// and `session_id`.
//
// Steps, each one skipped once `ret` holds a failure:
//   1. error-injection hook EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR
//   2. decode tx_id, then global_version, then flag.v_ (in this order)
//   3. verify global_version via txn_state_update_verify_by_version_
//   4. raise ctx.global_version_ to global_version when the latter is larger
//
// NOTE: keep comments outside the macro body; a `//` comment on a
// continuation line would swallow the following line.
#define DECODE_HEADER() \
ObTxnFreeRouteFlag flag; \
ObTransID tx_id; \
int64_t global_version; \
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_UPDATE_STATE_ERROR, session_id) OB_SUCCESS)) { \
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
} else if (OB_FAIL(decode(buf, len, pos, tx_id))) { \
TRANS_LOG(ERROR, "decode tx_id fail", K(ret)); \
} else if (OB_FAIL(decode(buf, len, pos, global_version))) { \
TRANS_LOG(ERROR, "decode global_version fail", K(ret)); \
} else if (OB_FAIL(decode(buf, len, pos, flag.v_))) { \
TRANS_LOG(ERROR, "decode flag fail", K(ret)); \
} else if (OB_FAIL(txn_state_update_verify_by_version_(ctx, global_version))) { \
} else if (ctx.global_version_ < global_version) { \
ctx.global_version_ = global_version; \
}
// ENCODE_HEADER: encode the common header of a serialized txn state.
//
// The expansion site must provide `ret`, `buf`, `len`, `pos`, `ctx`, `tx`
// and `session_id`.
//
// After the error-injection hook EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, the
// fields are written in this order: ctx.tx_id_, ctx.global_version_,
// ctx.flag_.v_. The first failing step sets `ret` and the remaining steps
// are skipped.
#define ENCODE_HEADER() \
if (OB_FAIL(OB_E(EventTable::EN_TX_FREE_ROUTE_ENCODE_STATE_ERROR, session_id) OB_SUCCESS)) { \
TRANS_LOG(ERROR, "inject failure", K(ret), KPC(tx), K(session_id)); \
} else if (OB_FAIL(encode(buf, len, pos, ctx.tx_id_))) { \
TRANS_LOG(WARN, "encode tx_id fail", K(ret)); \
} else if (OB_FAIL(encode(buf, len, pos, ctx.global_version_))) { \
TRANS_LOG(WARN, "encode global_version fail", K(ret)); \
} else if (OB_FAIL(encode(buf, len, pos, ctx.flag_.v_))) { \
TRANS_LOG(WARN, "encode flag fail", K(ret)); \
}
#define ENCODE_HEADER_LENGTH() int64_t l = encoded_length(ctx.tx_id_) + encoded_length(ctx.flag_.v_)
// ENCODE_HEADER_LENGTH: declares a local `l` in the expanding scope,
// initialized to the summed encoded length of ctx.tx_id_,
// ctx.global_version_ and ctx.flag_.v_. The expansion site must provide `ctx`.
#define ENCODE_HEADER_LENGTH() \
int64_t l = encoded_length(ctx.tx_id_) \
+ encoded_length(ctx.global_version_) \
+ encoded_length(ctx.flag_.v_)
int ObTransService::txn_free_route__update_static_state(const uint32_t session_id,
ObTxDesc *&tx,
@ -153,6 +193,10 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
audit_record.upd_static_ = true;
auto before_tx_id = OB_NOT_NULL(tx) ? tx->tx_id_ : ObTransID();
DECODE_HEADER();
if (OB_SUCC(ret)) {
ctx.is_txn_switch_ = true;
ctx.global_version_water_mark_ = global_version;
}
if (OB_FAIL(ret)) {
} else if (flag.is_tx_terminated_) {
audit_record.upd_term_ = true;
@ -752,8 +796,10 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
if (!in_txn) {
ctx.can_free_route_ = false;
}
if (ctx.is_changed()) {
ctx.inc_global_version();
}
ctx.set_calculated();
// audit record
audit_record.calculated_ = true;
audit_record.free_route_ = ctx.can_free_route_;
@ -930,7 +976,7 @@ int ObTransService::tx_free_route_check_alive(ObTxnFreeRouteCtx &ctx, const ObTx
if (ctx.txn_addr_.is_valid() && ctx.txn_addr_ != self_ && tx.is_in_tx()) {
common::ObCurTraceId::init(self_);
ObTxFreeRouteCheckAliveMsg m;
m.request_id_ = ctx.version();
m.request_id_ = ctx.get_local_version();
m.tx_id_ = tx.tx_id_;
m.sender_ = self_;
m.receiver_ = ctx.txn_addr_;

View File

@ -70,7 +70,10 @@ struct ObTxnFreeRouteCtx {
ObTxnFreeRouteCtx() { reset(); }
~ObTxnFreeRouteCtx() { reset(); }
void reset() {
version_ = 1;
local_version_ = 1;
global_version_ = 0;
global_version_water_mark_ = 0;
is_txn_switch_ = false;
txn_addr_.reset();
tx_id_.reset();
is_proxy_support_ = false;
@ -80,11 +83,11 @@ struct ObTxnFreeRouteCtx {
reset_changed_();
audit_record_.reset();
}
int64_t version() const { return version_; }
void init_before_update_state(bool proxy_support);
void init_before_handle_request(ObTxDesc *txdesc);
bool is_temp(const ObTxDesc &tx) const;
void set_proxy_support(bool support) { is_proxy_support_ = support; }
bool can_free_route() const { return can_free_route_ && !is_fallbacked_; }
bool is_changed() const { return _changed_ != 0; }
bool is_static_changed() const { return static_changed_; }
bool is_dynamic_changed() const { return dynamic_changed_; }
bool is_parts_changed() const { return parts_changed_; }
@ -93,23 +96,32 @@ struct ObTxnFreeRouteCtx {
bool is_idle_released() const { return flag_.is_idle_released_; }
bool has_calculated() const { return calculated_; }
void set_calculated() { calculated_ = true; }
int64_t get_local_version() const { return local_version_; }
int64_t get_global_version() const { return global_version_; }
void inc_global_version() { ++global_version_; }
void reset_audit_record() { audit_record_.reset(); }
uint64_t get_audit_record() const { return audit_record_.v_; }
private:
void reset_changed_() {
static_changed_ = false;
dynamic_changed_ = false;
parts_changed_ = false;
extra_changed_ = false;
_changed_ = false;
flag_.v_ = 0;
calculated_ = false;
}
// the version updated when session handle a request
// the local_version updated when session handle a request
// from proxy which caused txn state synced
// it is used as request id for checkAlive request
// do prevent stale checkAlive release txn state
// updated by state sync of later request
int64_t version_;
int64_t local_version_;
// the global_version is updated by each backend session
// when it updates txn state, and is propagated with the
// txn state sync via OBProxy
int64_t global_version_;
// marks the safe global version; an incoming update's version is
// verified against it in order to detect stale or duplicated updates
int64_t global_version_water_mark_;
// remember txn is switched by sync 'static' state
bool is_txn_switch_;
// address of where txn started
// updated when receive request
// if no txn alive, set to 0.0.0.0
@ -143,10 +155,15 @@ private:
// these fields will be set each request
// NOTE:
// code should not depends on these before request process
bool static_changed_ : 1;
bool dynamic_changed_ : 1;
bool parts_changed_ : 1;
bool extra_changed_ : 1;
union {
uint8_t _changed_;
struct {
bool static_changed_ : 1;
bool dynamic_changed_ : 1;
bool parts_changed_ : 1;
bool extra_changed_ : 1;
};
};
// used do de-duplicate calculation
bool calculated_ :1;
// set after handle request
@ -154,8 +171,21 @@ private:
ObTxnFreeRouteFlag flag_;
ObTxnFreeRouteAuditRecord audit_record_;
public:
TO_STRING_KV(K_(tx_id), K_(txn_addr), K_(is_proxy_support), K_(in_txn_before_handle_request), K_(can_free_route), K_(is_fallbacked),
K_(static_changed), K_(dynamic_changed), K_(parts_changed), K_(extra_changed), K_(calculated), K_(flag), K_(version),
TO_STRING_KV(K_(tx_id),
K_(txn_addr),
K_(is_proxy_support),
K_(in_txn_before_handle_request),
K_(can_free_route),
K_(is_fallbacked),
K_(static_changed),
K_(dynamic_changed),
K_(parts_changed),
K_(extra_changed),
K_(calculated),
K_(flag),
K_(local_version),
K_(global_version),
K_(global_version_water_mark),
"audit_record", audit_record_.v_);
};
}

View File

@ -17,3 +17,4 @@ static int update_logic_clock_(const int64_t logic_clock);
bool need_fallback_(ObTxDesc &tx, int64_t &state_size);
int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr);
int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);
int txn_state_update_verify_by_version_(const ObTxnFreeRouteCtx &ctx, const int64_t version);

View File

@ -100,8 +100,8 @@ int ObTxFreeRouteCheckAliveRespP::release_session_tx_()
sql::ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
auto &ctx = session->get_txn_free_route_ctx();
auto &tx_desc = session->get_tx_desc();
if (ctx.version() != arg_.request_id_) {
TRANS_LOG(INFO, "skip handle checkAliveResp, staled", K(arg_), K(ctx.version()));
if (ctx.get_local_version() != arg_.request_id_) {
TRANS_LOG(INFO, "skip handle checkAliveResp, staled", K(arg_), K(ctx.get_local_version()));
} else if (OB_NOT_NULL(tx_desc) && tx_desc->get_tx_id() == arg_.tx_id_) {
// mark idle release, if an Query has release query_lock but not send txn state Packet yet,
// it can sens the txn was released in plan (not a surprise)

View File

@ -2319,6 +2319,8 @@ TEST_F(ObTestTx, interrupt_get_read_snapshot)
int main(int argc, char **argv)
{
int64_t tx_id = 21533427;
uint64_t h = murmurhash(&tx_id, sizeof(tx_id), 0);
system("rm -rf test_tx.log*");
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_tx.log", true, false,
@ -2327,5 +2329,6 @@ int main(int argc, char **argv)
"test_tx.log"); // audit
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
::testing::InitGoogleTest(&argc, argv);
TRANS_LOG(INFO, "mmhash:", K(h));
return RUN_ALL_TESTS();
}