From f27d2efc83fd54902c2e760838422e831f4e287d Mon Sep 17 00:00:00 2001 From: obdev Date: Sat, 28 Jan 2023 18:05:52 +0800 Subject: [PATCH] [FEAT MERGE] productization of xa trans --- src/diagnose/lua/ob_lua_api.cpp | 209 ++++++++++-------- src/diagnose/lua/test.lua | 7 +- .../virtual_table/ob_all_virtual_tx_stat.cpp | 34 +++ .../virtual_table/ob_all_virtual_tx_stat.h | 6 + .../ob_inner_table_schema.11001_11050.cpp | 80 +++++++ .../ob_inner_table_schema.15201_15250.cpp | 75 +++++++ .../ob_inner_table_schema.21201_21250.cpp | 2 +- .../ob_inner_table_schema.28101_28150.cpp | 2 +- .../inner_table/ob_inner_table_schema_def.py | 19 +- src/storage/tx/ob_trans_define.cpp | 4 +- src/storage/tx/ob_trans_define.h | 4 +- src/storage/tx/ob_trans_define_v4.cpp | 6 +- src/storage/tx/ob_trans_functor.h | 5 +- src/storage/tx/ob_trans_part_ctx.cpp | 17 +- src/storage/tx/ob_trans_part_ctx.h | 9 +- src/storage/tx/ob_trans_service_v4.cpp | 9 +- .../tx/ob_two_phase_upstream_committer.cpp | 1 + src/storage/tx/ob_tx_2pc_msg_handler.cpp | 1 + src/storage/tx/ob_tx_log.cpp | 6 +- src/storage/tx/ob_tx_log.h | 16 +- src/storage/tx/ob_tx_stat.cpp | 13 +- src/storage/tx/ob_tx_stat.h | 10 +- src/storage/tx/ob_xa_ctx.cpp | 40 ++-- src/storage/tx/ob_xa_ctx_mgr.cpp | 1 + src/storage/tx/ob_xa_ctx_mgr.h | 1 + src/storage/tx/ob_xa_define.cpp | 10 + src/storage/tx/ob_xa_define.h | 25 +++ src/storage/tx/ob_xa_service.cpp | 62 +++--- .../tx/ob_xa_trans_heartbeat_worker.cpp | 1 + unittest/storage/tx/test_ob_tx_log.cpp | 5 +- 30 files changed, 500 insertions(+), 180 deletions(-) diff --git a/src/diagnose/lua/ob_lua_api.cpp b/src/diagnose/lua/ob_lua_api.cpp index 415f18ecd2..1c5dfddd73 100644 --- a/src/diagnose/lua/ob_lua_api.cpp +++ b/src/diagnose/lua/ob_lua_api.cpp @@ -1012,10 +1012,8 @@ int select_trans_stat(lua_State *L) int ret = OB_SUCCESS; int argc = lua_gettop(L); ObTransService *trans_service = nullptr; - ObTxStatIterator iter; ObArray tenant_ids; ObTxStat tx_stat; - iter.reset(); if (argc > 1) { OB_LOG(ERROR, "call select_trans_stat() failed, bad arguments count, should be 0."); lua_pushnil(L); @@ -1026,104 +1024,125 @@ int select_trans_stat(lua_State *L) OB_LOG(ERROR, "failed to get tenant_ids", K(ret)); lua_pushnil(L); } else { - for (int i = 0; i < tenant_ids.count() && OB_SUCC(ret); ++i) { - uint64_t tenant_id = tenant_ids.at(i); - MTL_SWITCH(tenant_id) { - auto* txs = MTL(transaction::ObTransService*); - if (OB_FAIL(txs->iterate_all_observer_tx_stat(iter))) { - OB_LOG(ERROR, "iterate transaction stat failed", K(ret)); + HEAP_VAR(ObTxStatIterator, iter) { + iter.reset(); + for (int i = 0; i < tenant_ids.count() && OB_SUCC(ret); ++i) { + uint64_t tenant_id = tenant_ids.at(i); + MTL_SWITCH(tenant_id) { + auto* txs = MTL(transaction::ObTransService*); + if (OB_FAIL(txs->iterate_all_observer_tx_stat(iter))) { + OB_LOG(ERROR, "iterate transaction stat failed", K(ret)); + } } } - } - if (OB_FAIL(ret)) { - lua_pushnil(L); - } else if (OB_FAIL(iter.set_ready())) { - OB_LOG(ERROR, "ObTxStatIterator set ready error", K(ret)); - lua_pushnil(L); - } else { - ret = iter.get_next(tx_stat); - } - static constexpr int64_t OB_MAX_BUFFER_SIZE = 512; - static constexpr int64_t OB_MIN_BUFFER_SIZE = 128; - std::vector columns = { - "tenant_id", - "session_id", - "scheduler_addr", - "trans_type", - "trans_id", - "has_decided", - "ls_id", - "participants", - "ctx_create_time", - "expired_time", - "ref_cnt", - "last_op_sn", - "pending_write", - "state", - "part_trans_action", - "trans_ctx_addr", - "mem_ctx_id", - "pending_log_size", - "flushed_log_size", - "role" - }; - LuaVtableGenerator gen(L, columns); - while (OB_SUCC(ret) && !gen.is_end()) { - gen.next_row(); - // tenant_id - gen.next_column(tx_stat.tenant_id_); - // session_id - gen.next_column(tx_stat.session_id_); - // scheduler_addr - char addr_buf[32]; - tx_stat.scheduler_addr_.to_string(addr_buf, 32); - gen.next_column(addr_buf); - // trans_type - gen.next_column(tx_stat.tx_type_); - // trans_id - gen.next_column(tx_stat.tx_id_.get_id()); - // has_decided - gen.next_column(tx_stat.has_decided_); - // ls_id - gen.next_column(tx_stat.ls_id_.id()); - // participants - if (0 < tx_stat.participants_.count()) { - char participants_buffer[OB_MAX_BUFFER_SIZE]; - tx_stat.participants_.to_string(participants_buffer, OB_MAX_BUFFER_SIZE); - gen.next_column(participants_buffer); + if (OB_FAIL(ret)) { + lua_pushnil(L); + } else if (OB_FAIL(iter.set_ready())) { + OB_LOG(ERROR, "ObTxStatIterator set ready error", K(ret)); + lua_pushnil(L); } else { - gen.next_column("NULL"); + ret = iter.get_next(tx_stat); } - // ctx_create_time - gen.next_column(tx_stat.tx_ctx_create_time_); - // expired_time - gen.next_column(tx_stat.tx_expired_time_); - // refer - gen.next_column(tx_stat.ref_cnt_); - // last_op_sn - gen.next_column(tx_stat.last_op_sn_); - // pending_write - gen.next_column(tx_stat.pending_write_); - // state - gen.next_column(tx_stat.state_); - // part_trans_action - gen.next_column(tx_stat.part_tx_action_); - // trans_ctx_addr - gen.next_column((int64_t)tx_stat.tx_ctx_addr_); - // mem_ctx_id - lua_pushinteger(L, 0); - // pending_log_size - lua_pushinteger(L, tx_stat.pending_log_size_); - // flushed_log_size - lua_pushinteger(L, tx_stat.flushed_log_size_); - // role - gen.next_column(tx_stat.role_state_); + static constexpr int64_t OB_MAX_BUFFER_SIZE = 512; + static constexpr int64_t OB_MIN_BUFFER_SIZE = 128; + std::vector columns = { + "tenant_id", + "session_id", + "scheduler_addr", + "trans_type", + "trans_id", + "has_decided", + "ls_id", + "participants", + "ctx_create_time", + "expired_time", + "ref_cnt", + "last_op_sn", + "pending_write", + "state", + "part_trans_action", + "trans_ctx_addr", + "mem_ctx_id", + "pending_log_size", + "flushed_log_size", + "role", + "is_exiting", + "coord", + "last_request_time", + "gtrid", + "bqual", + "format_id" + }; + LuaVtableGenerator gen(L, columns); + while (OB_SUCC(ret) && !gen.is_end()) { + gen.next_row(); + // tenant_id + gen.next_column(tx_stat.tenant_id_); + // session_id + gen.next_column(tx_stat.session_id_); + // scheduler_addr + char addr_buf[32]; + tx_stat.scheduler_addr_.to_string(addr_buf, 32); + gen.next_column(addr_buf); + // trans_type + gen.next_column(tx_stat.tx_type_); + // trans_id + gen.next_column(tx_stat.tx_id_.get_id()); + // has_decided + gen.next_column(tx_stat.has_decided_); + // ls_id + gen.next_column(tx_stat.ls_id_.id()); + // participants + if (0 < tx_stat.participants_.count()) { + char participants_buffer[OB_MAX_BUFFER_SIZE]; + tx_stat.participants_.to_string(participants_buffer, OB_MAX_BUFFER_SIZE); + gen.next_column(participants_buffer); + } else { + gen.next_column("NULL"); + } + // ctx_create_time + gen.next_column(tx_stat.tx_ctx_create_time_); + // expired_time + gen.next_column(tx_stat.tx_expired_time_); + // refer + gen.next_column(tx_stat.ref_cnt_); + // last_op_sn + gen.next_column(tx_stat.last_op_sn_); + // pending_write + gen.next_column(tx_stat.pending_write_); + // state + gen.next_column(tx_stat.state_); + // part_trans_action + gen.next_column(tx_stat.part_tx_action_); + // trans_ctx_addr + gen.next_column((int64_t)tx_stat.tx_ctx_addr_); + // mem_ctx_id + lua_pushinteger(L, 0); + // pending_log_size + lua_pushinteger(L, tx_stat.pending_log_size_); + // flushed_log_size + lua_pushinteger(L, tx_stat.flushed_log_size_); + // role + gen.next_column(tx_stat.role_state_); + // is_exiting + gen.next_column(tx_stat.is_exiting_); + // coord + gen.next_column(tx_stat.coord_.id()); + // last_request_ts + gen.next_column(tx_stat.last_request_ts_); + // gtrid + gen.next_column(tx_stat.xid_.get_gtrid_str()); + // bqual + gen.next_column(tx_stat.xid_.get_bqual_str()); + // format_id + gen.next_column(tx_stat.xid_.get_format_id()); - gen.row_end(); - ret = iter.get_next(tx_stat); - } - if (OB_ITER_END != ret) { - OB_LOG(ERROR, "iter failed", K(ret)); + gen.row_end(); + ret = iter.get_next(tx_stat); + } + if (OB_ITER_END != ret) { + OB_LOG(ERROR, "iter failed", K(ret)); + } } } return 1; diff --git a/src/diagnose/lua/test.lua b/src/diagnose/lua/test.lua index a458432163..1afd3833e9 100644 --- a/src/diagnose/lua/test.lua +++ b/src/diagnose/lua/test.lua @@ -112,8 +112,13 @@ for i=1,#trans_stat do trans_stat[i]['trans_ctx_id'], trans_stat[i]['pending_log_size'], trans_stat[i]['flushed_log_size'], + trans_stat[i]['role'], trans_stat[i]['is_exiting'], - trans_stat[i]['role']) + trans_stat[i]['coord'], + trans_stat[i]['last_request_time'], + trans_stat[i]['gtrid'], + trans_stat[i]['bqual'], + trans_stat[i]['format_id']) end para["select"] = {"tenant_id", "trans_id", "session_id"} diff --git a/src/observer/virtual_table/ob_all_virtual_tx_stat.cpp b/src/observer/virtual_table/ob_all_virtual_tx_stat.cpp index aaa46acb3b..5eb4c24063 100644 --- a/src/observer/virtual_table/ob_all_virtual_tx_stat.cpp +++ b/src/observer/virtual_table/ob_all_virtual_tx_stat.cpp @@ -31,6 +31,7 @@ void ObGVTxStat::reset() ObVirtualTableScannerIterator::reset(); all_tenants_.reset(); + xid_.reset(); init_ = false; } @@ -43,6 +44,7 @@ void ObGVTxStat::destroy() ObVirtualTableScannerIterator::reset(); all_tenants_.reset(); + xid_.reset(); init_ = false; } @@ -151,6 +153,7 @@ int ObGVTxStat::inner_get_next_row(ObNewRow *&row) } } else { const int64_t col_count = output_column_ids_.count(); + xid_ = tx_stat.xid_; for (int64_t i = 0; OB_SUCC(ret) && i < col_count; ++i) { uint64_t col_id = output_column_ids_.at(i); switch (col_id) { @@ -246,6 +249,37 @@ int ObGVTxStat::inner_get_next_row(ObNewRow *&row) case IS_EXITING: cur_row_.cells_[i].set_int(tx_stat.is_exiting_); break; + case COORD: + cur_row_.cells_[i].set_int(tx_stat.coord_.id()); + break; + case LAST_REQUEST_TS: + cur_row_.cells_[i].set_timestamp(tx_stat.last_request_ts_); + break; + case GTRID: + if (!xid_.empty()) { + cur_row_.cells_[i].set_varchar(xid_.get_gtrid_str()); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + } else { + // use default value NULL + cur_row_.cells_[i].reset(); + } + break; + case BQUAL: + if (!xid_.empty()) { + cur_row_.cells_[i].set_varchar(xid_.get_bqual_str()); + cur_row_.cells_[i].set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); + } else { + // use default value NULL + cur_row_.cells_[i].reset(); + } + break; + case FORMAT_ID: + if (!xid_.empty()) { + cur_row_.cells_[i].set_int(xid_.get_format_id()); + } else { + cur_row_.cells_[i].set_int(-1); + } + break; default: ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid coloum_id", K(ret), K(col_id)); diff --git a/src/observer/virtual_table/ob_all_virtual_tx_stat.h b/src/observer/virtual_table/ob_all_virtual_tx_stat.h index 8e393a3d39..2b0322b3ec 100644 --- a/src/observer/virtual_table/ob_all_virtual_tx_stat.h +++ b/src/observer/virtual_table/ob_all_virtual_tx_stat.h @@ -80,6 +80,11 @@ private: FLUSHED_LOG_SIZE, ROLE_STATE, IS_EXITING, + COORD, + LAST_REQUEST_TS, + GTRID, + BQUAL, + FORMAT_ID, }; static const int64_t OB_MAX_BUFFER_SIZE = 1024; @@ -93,6 +98,7 @@ private: bool init_; transaction::ObTxStatIterator tx_stat_iter_; common::ObArray all_tenants_; + transaction::ObXATransID xid_; private: DISALLOW_COPY_AND_ASSIGN(ObGVTxStat); }; diff --git a/src/share/inner_table/ob_inner_table_schema.11001_11050.cpp b/src/share/inner_table/ob_inner_table_schema.11001_11050.cpp index 3f03ac19ef..b04bde09d6 100644 --- a/src/share/inner_table/ob_inner_table_schema.11001_11050.cpp +++ b/src/share/inner_table/ob_inner_table_schema.11001_11050.cpp @@ -8646,6 +8646,86 @@ int ObInnerTableSchema::all_virtual_trans_stat_schema(ObTableSchema &table_schem false, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("coordinator", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA_TS("last_request_time", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObTimestampType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(ObPreciseDateTime), //column_length + -1, //column_precision + -1, //column_scale + true, //is_nullable + false, //is_autoincrement + false); //is_on_update_for_timestamp + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("gtrid", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_BINARY, //column_collation_type + 128, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("bqual", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_BINARY, //column_collation_type + 128, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ObObj format_id_default; + format_id_default.set_int(1); + ADD_COLUMN_SCHEMA_T("format_id", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObIntType, //column_type + CS_TYPE_INVALID, //column_collation_type + sizeof(int64_t), //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false, //is_autoincrement + format_id_default, + format_id_default); //default_value + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_num(1); table_schema.set_part_level(PARTITION_LEVEL_ONE); diff --git a/src/share/inner_table/ob_inner_table_schema.15201_15250.cpp b/src/share/inner_table/ob_inner_table_schema.15201_15250.cpp index 3a9cefa84f..f8dc5c9118 100644 --- a/src/share/inner_table/ob_inner_table_schema.15201_15250.cpp +++ b/src/share/inner_table/ob_inner_table_schema.15201_15250.cpp @@ -2136,6 +2136,81 @@ int ObInnerTableSchema::all_virtual_trans_stat_ora_schema(ObTableSchema &table_s false, //is_nullable false); //is_autoincrement } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("COORDINATOR", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObNumberType, //column_type + CS_TYPE_INVALID, //column_collation_type + 38, //column_length + 38, //column_precision + 0, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("LAST_REQUEST_TIME", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObTimestampLTZType, //column_type + CS_TYPE_INVALID, //column_collation_type + 0, //column_length + -1, //column_precision + -1, //column_scale + true, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("GTRID", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_BINARY, //column_collation_type + 128, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("BQUAL", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObVarcharType, //column_type + CS_TYPE_BINARY, //column_collation_type + 128, //column_length + -1, //column_precision + -1, //column_scale + false, //is_nullable + false); //is_autoincrement + } + + if (OB_SUCC(ret)) { + ADD_COLUMN_SCHEMA("FORMAT_ID", //column_name + ++column_id, //column_id + 0, //rowkey_id + 0, //index_id + 0, //part_key_pos + ObNumberType, //column_type + CS_TYPE_INVALID, //column_collation_type + 38, //column_length + 38, //column_precision + 0, //column_scale + false, //is_nullable + false); //is_autoincrement + } if (OB_SUCC(ret)) { table_schema.get_part_option().set_part_num(1); table_schema.set_part_level(PARTITION_LEVEL_ONE); diff --git a/src/share/inner_table/ob_inner_table_schema.21201_21250.cpp b/src/share/inner_table/ob_inner_table_schema.21201_21250.cpp index 0a2ebe9a1d..58f545e280 100644 --- a/src/share/inner_table/ob_inner_table_schema.21201_21250.cpp +++ b/src/share/inner_table/ob_inner_table_schema.21201_21250.cpp @@ -1259,7 +1259,7 @@ int ObInnerTableSchema::gv_ob_transaction_participants_schema(ObTableSchema &tab table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); if (OB_SUCC(ret)) { - if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT tenant_id AS TENANT_ID, svr_ip AS SVR_IP, svr_port AS SVR_PORT, session_id AS SESSION_ID, scheduler_addr AS SCHEDULER_ADDR, CASE WHEN part_trans_action >= 3 AND trans_type = 0 THEN 'LOCAL' WHEN part_trans_action >= 3 AND trans_type = 2 THEN 'DISTRIBUTED' WHEN trans_type = 0 and state = 10 THEN 'UNDECIDED' WHEN trans_type = 0 THEN 'LOCAL' ELSE 'DISTRIBUTED' END AS TX_TYPE, trans_id AS TX_ID, ls_id AS LS_ID, participants AS PARTICIPANTS, ctx_create_time AS CTX_CREATE_TIME, expired_time AS TX_EXPIRED_TIME, CASE WHEN state = 0 THEN 'UNKNOWN' WHEN state = 10 THEN 'ACTIVE' WHEN state = 20 THEN 'REDO COMPLETE' WHEN state = 30 THEN 'PREPARE' WHEN state = 40 THEN 'PRECOMMIT' WHEN state = 50 THEN 'COMMIT' WHEN state = 60 THEN 'ABORT' WHEN state = 70 THEN 'CLEAR' ELSE 'UNDEFINED' END AS STATE, CAST (CASE WHEN part_trans_action = 1 THEN 'NULL' WHEN part_trans_action = 2 THEN 'START' WHEN part_trans_action = 3 THEN 'COMMIT' WHEN part_trans_action = 4 THEN 'ABORT' WHEN part_trans_action = 5 THEN 'DIED' WHEN part_trans_action = 6 THEN 'END' ELSE 'UNKNOWN' END AS CHAR(10)) AS ACTION, pending_log_size AS PENDING_LOG_SIZE, flushed_log_size AS FLUSHED_LOG_SIZE, CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' END AS ROLE FROM oceanbase.__all_virtual_trans_stat WHERE is_exiting = 0 )__"))) { + if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT tenant_id AS TENANT_ID, svr_ip AS SVR_IP, svr_port AS SVR_PORT, session_id AS SESSION_ID, scheduler_addr AS SCHEDULER_ADDR, CASE WHEN part_trans_action >= 3 AND trans_type = 0 THEN 'LOCAL' WHEN part_trans_action >= 3 AND trans_type = 2 THEN 'DISTRIBUTED' WHEN trans_type = 0 and state = 10 THEN 'UNDECIDED' WHEN trans_type = 0 THEN 'LOCAL' ELSE 'DISTRIBUTED' END AS TX_TYPE, trans_id AS TX_ID, ls_id AS LS_ID, participants AS PARTICIPANTS, ctx_create_time AS CTX_CREATE_TIME, expired_time AS TX_EXPIRED_TIME, CASE WHEN state = 0 THEN 'UNKNOWN' WHEN state = 10 THEN 'ACTIVE' WHEN state = 20 THEN 'REDO COMPLETE' WHEN state = 30 THEN 'PREPARE' WHEN state = 40 THEN 'PRECOMMIT' WHEN state = 50 THEN 'COMMIT' WHEN state = 60 THEN 'ABORT' WHEN state = 70 THEN 'CLEAR' ELSE 'UNDEFINED' END AS STATE, CAST (CASE WHEN part_trans_action = 1 THEN 'NULL' WHEN part_trans_action = 2 THEN 'START' WHEN part_trans_action = 3 THEN 'COMMIT' WHEN part_trans_action = 4 THEN 'ABORT' WHEN part_trans_action = 5 THEN 'DIED' WHEN part_trans_action = 6 THEN 'END' ELSE 'UNKNOWN' END AS CHAR(10)) AS ACTION, pending_log_size AS PENDING_LOG_SIZE, flushed_log_size AS FLUSHED_LOG_SIZE, CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' END AS ROLE, COORDINATOR AS COORD, LAST_REQUEST_TIME, FORMAT_ID AS FORMATID, HEX(GTRID) AS GLOBALID, HEX(BQUAL) AS BRANCHID FROM oceanbase.__all_virtual_trans_stat WHERE is_exiting = 0 )__"))) { LOG_ERROR("fail to set view_definition", K(ret)); } } diff --git a/src/share/inner_table/ob_inner_table_schema.28101_28150.cpp b/src/share/inner_table/ob_inner_table_schema.28101_28150.cpp index 219f27f8b8..52a8b542ed 100644 --- a/src/share/inner_table/ob_inner_table_schema.28101_28150.cpp +++ b/src/share/inner_table/ob_inner_table_schema.28101_28150.cpp @@ -1159,7 +1159,7 @@ int ObInnerTableSchema::gv_ob_transaction_participants_ora_schema(ObTableSchema table_schema.set_collation_type(ObCharset::get_default_collation(ObCharset::get_default_charset())); if (OB_SUCC(ret)) { - if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT tenant_id AS TENANT_ID, svr_ip AS SVR_IP, svr_port AS SVR_PORT, session_id AS SESSION_ID, scheduler_addr AS SCHEDULER_ADDR, CAST (CASE WHEN part_trans_action >= 3 AND trans_type = 0 THEN 'LOCAL' WHEN part_trans_action >= 3 AND trans_type = 2 THEN 'DISTRIBUTED' WHEN trans_type = 0 AND state = 10 THEN 'UNDECIDED' WHEN trans_type = 0 THEN 'LOCAL' ELSE 'DISTRIBUTED' END AS VARCHAR2(11)) AS TX_TYPE, trans_id AS TX_ID, ls_id AS LS_ID, participants AS PARTICIPANTS, ctx_create_time AS CTX_CREATE_TIME, expired_time AS TX_EXPIRED_TIME, CAST (CASE WHEN state = 0 THEN 'UNKNOWN' WHEN state = 10 THEN 'ACTIVE' WHEN state = 20 THEN 'REDO COMPLETE' WHEN state = 30 THEN 'PREPARE' WHEN state = 40 THEN 'PRECOMMIT' WHEN state = 50 THEN 'COMMIT' WHEN state = 60 THEN 'ABORT' WHEN state = 70 THEN 'CLEAR' ELSE 'UNDEFINED' END AS VARCHAR2(13)) AS STATE, CAST (CASE WHEN part_trans_action = 1 THEN 'NULL' WHEN part_trans_action = 2 THEN 'START' WHEN part_trans_action = 3 THEN 'COMMIT' WHEN part_trans_action = 4 THEN 'ABORT' WHEN part_trans_action = 5 THEN 'DIED' WHEN part_trans_action = 6 THEN 'END' ELSE 'UNKNOWN' END AS VARCHAR2(10)) AS ACTION, pending_log_size AS PENDING_LOG_SIZE, flushed_log_size AS FLUSHED_LOG_SIZE, CAST (CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' END AS VARCHAR2(8)) AS ROLE FROM SYS.ALL_VIRTUAL_TRANS_STAT WHERE is_exiting = 0 )__"))) { + if (OB_FAIL(table_schema.set_view_definition(R"__(SELECT tenant_id AS TENANT_ID, svr_ip AS SVR_IP, svr_port AS SVR_PORT, session_id AS SESSION_ID, scheduler_addr AS SCHEDULER_ADDR, CAST (CASE WHEN part_trans_action >= 3 AND trans_type = 0 THEN 'LOCAL' WHEN part_trans_action >= 3 AND trans_type = 2 THEN 'DISTRIBUTED' WHEN trans_type = 0 AND state = 10 THEN 'UNDECIDED' WHEN trans_type = 0 THEN 'LOCAL' ELSE 'DISTRIBUTED' END AS VARCHAR2(11)) AS TX_TYPE, trans_id AS TX_ID, ls_id AS LS_ID, participants AS PARTICIPANTS, ctx_create_time AS CTX_CREATE_TIME, expired_time AS TX_EXPIRED_TIME, CAST (CASE WHEN state = 0 THEN 'UNKNOWN' WHEN state = 10 THEN 'ACTIVE' WHEN state = 20 THEN 'REDO COMPLETE' WHEN state = 30 THEN 'PREPARE' WHEN state = 40 THEN 'PRECOMMIT' WHEN state = 50 THEN 'COMMIT' WHEN state = 60 THEN 'ABORT' WHEN state = 70 THEN 'CLEAR' ELSE 'UNDEFINED' END AS VARCHAR2(13)) AS STATE, CAST (CASE WHEN part_trans_action = 1 THEN 'NULL' WHEN part_trans_action = 2 THEN 'START' WHEN part_trans_action = 3 THEN 'COMMIT' WHEN part_trans_action = 4 THEN 'ABORT' WHEN part_trans_action = 5 THEN 'DIED' WHEN part_trans_action = 6 THEN 'END' ELSE 'UNKNOWN' END AS VARCHAR2(10)) AS ACTION, pending_log_size AS PENDING_LOG_SIZE, flushed_log_size AS FLUSHED_LOG_SIZE, CAST (CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' END AS VARCHAR2(8)) AS ROLE, COORDINATOR AS COORD, LAST_REQUEST_TIME, FORMAT_ID AS FORMATID, RAWTOHEX(GTRID) AS GLOBALID, RAWTOHEX(BQUAL) AS BRANCHID FROM SYS.ALL_VIRTUAL_TRANS_STAT WHERE is_exiting = 0 )__"))) { LOG_ERROR("fail to set view_definition", K(ret)); } } diff --git a/src/share/inner_table/ob_inner_table_schema_def.py b/src/share/inner_table/ob_inner_table_schema_def.py index a2e36b5591..c4452b54fc 100644 --- a/src/share/inner_table/ob_inner_table_schema_def.py +++ b/src/share/inner_table/ob_inner_table_schema_def.py @@ -6413,6 +6413,11 @@ def_table_schema( ('flushed_log_size', 'int'), ('role', 'int'), ('is_exiting', 'int'), + ('coordinator', 'int'), + ('last_request_time', 'timestamp', 'true'), + ('gtrid', 'varbinary:128'), + ('bqual', 'varbinary:128'), + ('format_id', 'int', 'false', '1'), ], partition_columns = ['svr_ip', 'svr_port'], vtable_route_policy = 'distributed', @@ -20006,7 +20011,12 @@ def_table_schema( CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' - END AS ROLE + END AS ROLE, + COORDINATOR AS COORD, + LAST_REQUEST_TIME, + FORMAT_ID AS FORMATID, + HEX(GTRID) AS GLOBALID, + HEX(BQUAL) AS BRANCHID FROM oceanbase.__all_virtual_trans_stat WHERE is_exiting = 0 """.replace("\n", " "), @@ -46393,7 +46403,12 @@ def_table_schema( CAST (CASE WHEN role = 0 THEN 'LEADER' ELSE 'FOLLOWER' - END AS VARCHAR2(8)) AS ROLE + END AS VARCHAR2(8)) AS ROLE, + COORDINATOR AS COORD, + LAST_REQUEST_TIME, + FORMAT_ID AS FORMATID, + RAWTOHEX(GTRID) AS GLOBALID, + RAWTOHEX(BQUAL) AS BRANCHID FROM SYS.ALL_VIRTUAL_TRANS_STAT WHERE is_exiting = 0 """.replace("\n", " "), diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 21709f2bdd..34f70ee8ab 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -1005,6 +1005,7 @@ void ObTxExecInfo::reset() prepare_log_info_arr_.reset(); xid_.reset(); need_checksum_ = true; + is_sub2pc_ = false; } void ObTxExecInfo::destroy() @@ -1042,7 +1043,8 @@ OB_SERIALIZE_MEMBER(ObTxExecInfo, // touched_pkeys_, prepare_log_info_arr_, xid_, - need_checksum_); + need_checksum_, + is_sub2pc_); bool ObMulSourceDataNotifyArg::is_redo_submitted() const { return redo_submitted_; } diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index 4c4f4a5a5d..dce53ddb35 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1712,7 +1712,8 @@ public: //K_(touched_pkeys), K_(prepare_log_info_arr), K_(xid), - K_(need_checksum)); + K_(need_checksum), + K_(is_sub2pc)); ObTxState state_; share::ObLSID upstream_; share::ObLSArray participants_; @@ -1740,6 +1741,7 @@ public: // for xa ObXATransID xid_; bool need_checksum_; + bool is_sub2pc_; }; static const int64_t GET_GTS_AHEAD_INTERVAL = 300; diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index faeabe70b3..1bcca887b7 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -151,7 +151,8 @@ OB_SERIALIZE_MEMBER(ObTxDesc, timeout_us_, lock_timeout_us_, active_scn_, - parts_); + parts_, + xid_); OB_SERIALIZE_MEMBER(ObTxParam, timeout_us_, lock_timeout_us_, @@ -1251,7 +1252,8 @@ int ObTxDescMgr::add_with_txid(const ObTransID &tx_id, ObTxDesc &tx_desc) // if fail revert tx_desc.tx_id_ member if (OB_FAIL(ret) && !desc_tx_id.is_valid()) { tx_desc.reset_tx_id(); } } - TRANS_LOG(INFO, "txDescMgr.register trans with txid", K(ret), K(tx_id), K(tx_desc)); + TRANS_LOG(INFO, "txDescMgr.register trans with txid", K(ret), K(tx_id), + K(map_.alloc_cnt())); return ret; } diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index a9aaaaa20b..c6c577e1b5 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -797,7 +797,10 @@ public: tx_ctx->role_state_, tx_ctx->session_id_, tx_ctx->exec_info_.scheduler_, - tx_ctx->is_exiting_))) { + tx_ctx->is_exiting_, + tx_ctx->exec_info_.xid_, + tx_ctx->exec_info_.upstream_, + tx_ctx->last_request_ts_))) { TRANS_LOG(WARN, "ObTxStat init error", K(tmp_ret), KPC(tx_ctx)); } else if (OB_TMP_FAIL(tx_stat_iter_.push(tx_stat))) { TRANS_LOG(WARN, "ObTxStatIterator push trans stat error", K(tmp_ret)); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 8a1a275e21..476fff5de5 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -116,6 +116,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id, cluster_version_ = cluster_version; part_trans_action_ = ObPartTransAction::INIT; trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + last_request_ts_ = ctx_create_time_; exec_info_.scheduler_ = scheduler; exec_info_.trans_type_ = TransType::SP_TRANS; @@ -713,6 +714,7 @@ int ObPartTransCtx::commit(const ObLSArray &parts, } if (OB_SUCC(ret)) { part_trans_action_ = ObPartTransAction::COMMIT; + last_request_ts_ = ObClockGenerator::getClock(); } REC_TRANS_TRACE_EXT2(tlog_, commit, OB_ID(ret), ret, OB_ID(thread_id), GETTID(), @@ -2536,7 +2538,7 @@ int ObPartTransCtx::submit_redo_commit_info_log_(ObTxLogBlock &log_block, } else { ObTxCommitInfoLog commit_info_log( exec_info_.scheduler_, exec_info_.participants_, exec_info_.upstream_, - false, // sub2pc_ + exec_info_.is_sub2pc_, exec_info_.is_dup_tx_, can_elr_, trace_info_.get_app_trace_id(), trace_info_.get_app_trace_info(), exec_info_.prev_record_lsn_, exec_info_.redo_lsns_, exec_info_.incremental_participants_, cluster_version_, exec_info_.xid_); @@ -2617,7 +2619,8 @@ int ObPartTransCtx::submit_redo_active_info_log_() exec_info_.is_dup_tx_, trans_expired_time_, epoch_, last_op_sn_, first_scn_, last_scn_, cur_submitted_seq_no, - cluster_version_); + cluster_version_, + exec_info_.xid_); bool redo_log_submitted = false; ObTxLogCb *log_cb = nullptr; if (OB_FAIL(prepare_log_cb_(!NEED_FINAL_CB, log_cb))) { @@ -3999,7 +4002,8 @@ int ObPartTransCtx::replay_active_info(const ObTxActiveInfoLog &log, // schema_version can_elr_ = log.is_elr(); // cur_query_start_time - // sub2pc + exec_info_.is_sub2pc_ = log.is_sub2pc(); + exec_info_.xid_ = log.get_xid(); epoch_ = log.get_epoch(); last_op_sn_ = log.get_last_op_sn(); first_scn_ = log.get_first_scn(); @@ -4062,6 +4066,7 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log, set_2pc_upstream_(commit_info_log.get_upstream()); } exec_info_.xid_ = commit_info_log.get_xid(); + exec_info_.is_sub2pc_ = commit_info_log.is_sub2pc(); can_elr_ = commit_info_log.is_elr(); cluster_version_ = commit_info_log.get_cluster_version(); sub_state_.set_info_log_submitted(); @@ -4661,6 +4666,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) if (OB_FAIL(ret)) { TRANS_LOG(WARN, "switch to leader failed", KR(ret), K(ret), KPC(this)); } else { + last_request_ts_ = ObClockGenerator::getClock(); #ifndef NDEBUG TRANS_LOG(INFO, "switch to leader succeed", KPC(this)); #endif @@ -5590,12 +5596,14 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts, set_2pc_upstream_(ls_id_); exec_info_.trans_type_ = TransType::DIST_TRANS; exec_info_.xid_ = xid; + exec_info_.is_sub2pc_ = true; // (void)set_sub2pc_coord_state(Ob2PCPrepareState::REDO_PREPARING); if (OB_FAIL(prepare_redo())) { TRANS_LOG(WARN, "fail to execute sub prepare", K(ret), KPC(this)); } else { part_trans_action_ = ObPartTransAction::COMMIT; } + last_request_ts_ = ObClockGenerator::getClock(); TRANS_LOG(INFO, "sub prepare", K(ret), K(xid), KPC(this)); } return ret; @@ -5639,6 +5647,7 @@ int ObPartTransCtx::sub_end_tx(const int64_t &request_id, if (OB_FAIL(continue_execution(is_rollback))) { TRANS_LOG(WARN, "fail to continue execution", KR(ret), KPC(this)); } + last_request_ts_ = ObClockGenerator::getClock(); } TRANS_LOG(INFO, "sub end tx", K(ret), K(xid), K(is_rollback), K(tmp_scheduler), KPC(this)); return ret; @@ -5747,6 +5756,7 @@ int ObPartTransCtx::start_access(const ObTxDesc &tx_desc, mt_ctx_.inc_ref(); mt_ctx_.acquire_callback_list(); } + last_request_ts_ = ObClockGenerator::getClock(); TRANS_LOG(TRACE, "start_access", K(ret), KPC(this)); REC_TRANS_TRACE_EXT(tlog_, start_access, OB_ID(ret), ret, @@ -5972,6 +5982,7 @@ int ObPartTransCtx::abort(const int reason) if (OB_FAIL(abort_(reason))) { TRANS_LOG(WARN, "abort_ failed", KR(ret), K(*this)); } + last_request_ts_ = ObClockGenerator::getClock(); } return ret; } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index e05d21a2ca..07ac50c2ec 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -232,7 +232,8 @@ private: K_(collected), K_(ref), K_(rec_log_ts), - K_(prev_rec_log_ts)); + K_(prev_rec_log_ts), + K_(last_request_ts)); public: static const int64_t OP_LOCAL_NUM = 16; static const int64_t RESERVED_MEM_SIZE = 256; @@ -667,7 +668,7 @@ private: protected: // for xa virtual bool is_sub2pc() const override - { return !exec_info_.xid_.empty(); } + { return exec_info_.is_sub2pc_; } private: DISALLOW_COPY_AND_ASSIGN(ObPartTransCtx); @@ -766,6 +767,10 @@ private: TransModulePageAllocator reserve_allocator_; // tmp scheduler addr is used to post response for the second phase of xa commit/rollback common::ObAddr tmp_scheduler_; + // this is used to denote the time of last request including start_access, commit, rollback + // this is a tempoary variable which is set to now by default + // therefore, if a follower switchs to leader, the variable is set to now + int64_t last_request_ts_; // ======================================================== }; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 77e83ac5bf..434f46aa19 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -695,6 +695,7 @@ int ObTransService::end_two_phase_tx(const ObTransID &tx_id, tx_desc_mgr_.revert(*tx); tx = NULL; } else { + tx->tenant_id_ = MTL_ID(); tx->commit_expire_ts_ = now + timeout_us; tx->coord_id_ = coord; tx->xid_ = xid; @@ -1157,6 +1158,8 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, if (OB_FAIL(ret)) { TRANS_LOG(WARN, "get tx ctx from mgr fail", K(ret), K(tx.tx_id_), K(ls_id), K(tx), K(arg)); ctx = NULL; + } else if (!tx.xid_.empty()) { + ctx->exec_info_.xid_ = tx.xid_; } TRANS_LOG(TRACE, "create tx ctx", K(ret), K(ls_id), K(tx)); return ret; @@ -2241,9 +2244,11 @@ int ObTransService::recover_tx(const ObTxInfo &tx_info, ObTxDesc *&tx) } else if (OB_FAIL(tx->parts_.assign(tx_info.parts_))) { tx_desc_mgr_.revert(*tx); tx = NULL; - TRANS_LOG(WARN, "assgin parts fail", K(ret), K(tx)); + TRANS_LOG(WARN, "assgin parts fail", K(ret)); } else if (OB_FAIL(tx_desc_mgr_.add_with_txid(tx_info.tx_id_, *tx))) { - TRANS_LOG(WARN, "add tx to txMgr fail", K(ret), K(tx)); + tx_desc_mgr_.revert(*tx); + tx = NULL; + TRANS_LOG(WARN, "add tx to txMgr fail", K(ret)); } else { tx->flags_.REPLICA_ = true; tx->flags_.EXPLICIT_ = true; diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 51b5d65804..bc71ea4a5e 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -632,6 +632,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response_impl_(const int64_t pa } else if (is_2pc_logging()) { TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this)); } else if (ObTxState::INIT == get_upstream_state() + || ObTxState::REDO_COMPLETE == get_upstream_state() || ObTxState::PREPARE == get_upstream_state()) { // Abandoned: We should not skip the msg during log synchronization, for example, // one of the participants aborts and response with the abort, and the diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index 1663ae72e0..ba0a1385f4 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -635,6 +635,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_req(const Ob2pcPrepareRedoReqMsg ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type()); exec_info_.trans_type_ = TransType::DIST_TRANS; exec_info_.xid_ = msg.xid_; + exec_info_.is_sub2pc_ = true; if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index a68393bfbf..00c26a18c9 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -188,7 +188,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxActiveInfoLog, /* 14 */ first_scn_, /* 15 */ last_scn_, /* 16 */ cluster_version_, - /* 17 */ max_submitted_seq_no_); + /* 17 */ max_submitted_seq_no_, + /* 18 */ xid_); OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog, compat_bytes_, @@ -246,7 +247,7 @@ int ObTxActiveInfoLog::before_serialize() TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret)); } } else { - if (OB_FAIL(compat_bytes_.init(17))) { + if (OB_FAIL(compat_bytes_.init(18))) { TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret)); } } @@ -269,6 +270,7 @@ int ObTxActiveInfoLog::before_serialize() TX_NO_NEED_SER(last_scn_ == 0, 15, compat_bytes_); TX_NO_NEED_SER(cluster_version_ == 0, 16, compat_bytes_); TX_NO_NEED_SER(max_submitted_seq_no_ == 0, 17, compat_bytes_); + TX_NO_NEED_SER(xid_.empty(), 18, compat_bytes_); } return ret; diff --git a/src/storage/tx/ob_tx_log.h b/src/storage/tx/ob_tx_log.h index 4b5f20048f..c7297279f6 100644 --- a/src/storage/tx/ob_tx_log.h +++ b/src/storage/tx/ob_tx_log.h @@ -309,12 +309,13 @@ private: class ObTxActiveInfoLogTempRef { public: - ObTxActiveInfoLogTempRef() : scheduler_(), app_trace_id_str_(), proposal_leader_() {} + ObTxActiveInfoLogTempRef() : scheduler_(), app_trace_id_str_(), proposal_leader_(), xid_() {} public: common::ObAddr scheduler_; common::ObString app_trace_id_str_; common::ObAddr proposal_leader_; + ObXATransID xid_; }; class ObTxActiveInfoLog @@ -327,7 +328,7 @@ public: app_trace_id_str_(temp_ref.app_trace_id_str_), schema_version_(0), can_elr_(false), proposal_leader_(temp_ref.proposal_leader_), cur_query_start_time_(0), is_sub2pc_(false), is_dup_tx_(false), tx_expired_time_(0), epoch_(0), last_op_sn_(0), first_scn_(0), - last_scn_(0), max_submitted_seq_no_(0), cluster_version_(0) + last_scn_(0), max_submitted_seq_no_(0), cluster_version_(0), xid_(temp_ref.xid_) { before_serialize(); } @@ -347,13 +348,15 @@ public: int64_t first_scn, int64_t last_scn, int64_t max_submitted_seq_no, - uint64_t cluster_version) + uint64_t cluster_version, + const ObXATransID &xid) : scheduler_(scheduler), trans_type_(trans_type), session_id_(session_id), app_trace_id_str_(app_trace_id_str), schema_version_(schema_version), can_elr_(elr), proposal_leader_(proposal_leader), cur_query_start_time_(cur_query_start_time), is_sub2pc_(is_sub2pc), is_dup_tx_(is_dup_tx), tx_expired_time_(tx_expired_time), epoch_(epoch), last_op_sn_(last_op_sn), first_scn_(first_scn), last_scn_(last_scn), - max_submitted_seq_no_(max_submitted_seq_no), cluster_version_(cluster_version) + max_submitted_seq_no_(max_submitted_seq_no), cluster_version_(cluster_version), + xid_(xid) { before_serialize(); }; @@ -375,6 +378,7 @@ public: int64_t get_last_scn() const { return last_scn_; } int64_t get_max_submitted_seq_no() const { return max_submitted_seq_no_; } uint64_t get_cluster_version() const { return cluster_version_; } + const ObXATransID &get_xid() const { return xid_; } // for ob_admin int ob_admin_dump(share::ObAdminMutatorStringArg &arg); @@ -396,7 +400,8 @@ public: K(first_scn_), K(last_scn_), K(max_submitted_seq_no_), - K(cluster_version_)); + K(cluster_version_), + K(xid_)); public: int before_serialize(); @@ -425,6 +430,7 @@ private: int64_t max_submitted_seq_no_; uint64_t cluster_version_; + ObXATransID xid_; }; class ObTxCommitInfoLogTempRef diff --git a/src/storage/tx/ob_tx_stat.cpp b/src/storage/tx/ob_tx_stat.cpp index f7dd4f853b..32bbd4f940 100644 --- a/src/storage/tx/ob_tx_stat.cpp +++ b/src/storage/tx/ob_tx_stat.cpp @@ -44,6 +44,9 @@ void ObTxStat::reset() session_id_ = 0; scheduler_addr_.reset(); is_exiting_ = false; + xid_.reset(); + coord_.reset(); + last_request_ts_ = OB_INVALID_TIMESTAMP; } int ObTxStat::init(const common::ObAddr &addr, const ObTransID &tx_id, const uint64_t tenant_id, const bool has_decided, @@ -56,7 +59,8 @@ int ObTxStat::init(const common::ObAddr &addr, const ObTransID &tx_id, const int64_t pending_log_size, const int64_t flushed_log_size, const int64_t role_state, const int64_t session_id, const common::ObAddr &scheduler, - const bool is_exiting) + const bool is_exiting, const ObXATransID &xid, + const share::ObLSID &coord, const int64_t last_request_ts) { int ret = OB_SUCCESS; if (is_inited_) { @@ -86,6 +90,13 @@ int ObTxStat::init(const common::ObAddr &addr, const ObTransID &tx_id, session_id_ = session_id; scheduler_addr_ = scheduler; is_exiting_ = is_exiting; + xid_ = xid; + if (part_tx_action == ObPartTransAction::COMMIT && !coord.is_valid()) { + coord_ = ls_id; + } else { + coord_ = coord; + } + last_request_ts_ = last_request_ts; } return ret; } diff --git a/src/storage/tx/ob_tx_stat.h b/src/storage/tx/ob_tx_stat.h index 6fccf2eacf..96aee7e997 100644 --- a/src/storage/tx/ob_tx_stat.h +++ b/src/storage/tx/ob_tx_stat.h @@ -37,7 +37,8 @@ struct ObTxStat const int64_t pending_log_size, const int64_t flushed_log_size, const int64_t role_state, const int64_t session_id, const common::ObAddr &scheduler, - const bool is_exiting); + const bool is_exiting, const ObXATransID &xid, + const share::ObLSID &coord, const int64_t last_request_ts); TO_STRING_KV(K_(addr), K_(tx_id), K_(tenant_id), K_(has_decided), K_(ls_id), K_(participants), K_(tx_ctx_create_time), K_(tx_expired_time), K_(ref_cnt), @@ -45,8 +46,8 @@ struct ObTxStat KP_(tx_ctx_addr), K_(pending_log_size), K_(flushed_log_size), K_(role_state), K_(session_id), - K_(scheduler_addr), - K_(is_exiting)); + K_(scheduler_addr), K_(is_exiting), + K_(xid), K_(coord), K_(last_request_ts)); public: bool is_inited_; common::ObAddr addr_; @@ -70,6 +71,9 @@ public: int64_t session_id_; common::ObAddr scheduler_addr_; bool is_exiting_; + ObXATransID xid_; + share::ObLSID coord_; + int64_t last_request_ts_; }; class ObTxLockStat diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index c1eb4a7186..5cc325dd2b 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -162,9 +162,9 @@ int ObXACtx::handle_timeout(const int64_t delay) } } else { set_terminated_(); - if (0 == xa_ref_count_) { - set_exiting_(); - } + } + if (0 == xa_ref_count_) { + set_exiting_(); } timeout_task_.set_running(false); } @@ -1147,10 +1147,11 @@ int ObXACtx::xa_start_(const ObXATransID &xid, if (OB_FAIL(ret)) { tx_desc->set_xa_ctx(NULL); + // if fail, the local variable tx_desc_ should be NULL + tx_desc_ = NULL; //xa_ref_count_ is added only when success is returned if (0 == xa_ref_count_) { - is_exiting_ = true; - xa_ctx_mgr_->erase_xa_ctx(trans_id_); + set_exiting_(); } } @@ -1683,9 +1684,9 @@ int ObXACtx::end_stmt_remote_(const ObXATransID &xid) } if (OB_SUCC(ret)) { if (OB_FAIL(result)) { + set_terminated_(); + ret = OB_TRANS_XA_BRANCH_FAIL; if (OB_TRANS_XA_BRANCH_FAIL == result || OB_TRANS_CTX_NOT_EXIST == result) { - set_terminated_(); - ret = OB_TRANS_XA_BRANCH_FAIL; TRANS_LOG(INFO, "original scheduler has terminated", K(ret), K(xid), K(*this)); } else { TRANS_LOG(WARN, "fail to end stmt remote", K(ret), K(xid), K(*this)); @@ -1888,11 +1889,14 @@ int ObXACtx::set_exiting_() TRANS_LOG(ERROR, "unexpected xa ref count", K(ret), K(xa_ref_count_), K_(xid), K(*this)); } else { is_exiting_ = true; + if (NULL != tx_desc_) { + tx_desc_->reset_for_xa(); + MTL(ObTransService *)->release_tx(*tx_desc_); + tx_desc_ = NULL; + } if (OB_FAIL(xa_ctx_mgr_->erase_xa_ctx(trans_id_))) { TRANS_LOG(WARN, "erase xa ctx failed", K(ret), K_(xid), K(*this)); } - tx_desc_->reset_for_xa(); - MTL(ObTransService *)->release_tx(*tx_desc_); } TRANS_LOG(INFO, "xa ctx set exiting", K(ret), K_(xid), K(*this)); @@ -2459,10 +2463,6 @@ int ObXACtx::wait_xa_prepare(const ObXATransID &xid, const int64_t timeout_us) xa_trans_state_ = ObXATransState::PREPARED; } - if (OB_UNLIKELY(OB_TRANS_UNKNOWN == ret)) { - ret = OB_TRANS_XA_RBROLLBACK; - } - if (OB_LIKELY(!is_exiting_)) { is_exiting_ = true; if (OB_NOT_NULL(xa_ctx_mgr_)) { @@ -2517,12 +2517,8 @@ int ObXACtx::two_phase_end_trans(const ObXATransID &xid, } if (OB_FAIL(ret)) { - if (OB_LIKELY(!is_exiting_)) { - is_exiting_ = true; - if (OB_NOT_NULL(xa_ctx_mgr_)) { - xa_ctx_mgr_->erase_xa_ctx(trans_id_); - } - } + set_exiting_(); + tx_desc_ = NULL; } REC_TRACE_EXT(tlog_, xa_end_trans, OB_Y(ret), OB_ID(is_rollback), is_rollback, @@ -2550,10 +2546,8 @@ int ObXACtx::wait_two_phase_end_trans(const ObXATransID &xid, } if (OB_LIKELY(!is_exiting_)) { - is_exiting_ = true; - if (OB_NOT_NULL(xa_ctx_mgr_)) { - xa_ctx_mgr_->erase_xa_ctx(trans_id_); - } + set_exiting_(); + tx_desc_ = NULL; } return ret; diff --git a/src/storage/tx/ob_xa_ctx_mgr.cpp b/src/storage/tx/ob_xa_ctx_mgr.cpp index fc2b1d2151..9f2f1eda32 100644 --- a/src/storage/tx/ob_xa_ctx_mgr.cpp +++ b/src/storage/tx/ob_xa_ctx_mgr.cpp @@ -172,6 +172,7 @@ int ObXACtxMgr::get_xa_ctx_(const ObTransID &trans_id, bool &alloc, ObXACtx*& ct } else { ctx = tmp_ctx; inc_total_ctx_count(); + ObXAStatistics::get_instance().inc_ctx_count(); } } diff --git a/src/storage/tx/ob_xa_ctx_mgr.h b/src/storage/tx/ob_xa_ctx_mgr.h index 0cc0b84fc2..3b3f922875 100644 --- a/src/storage/tx/ob_xa_ctx_mgr.h +++ b/src/storage/tx/ob_xa_ctx_mgr.h @@ -38,6 +38,7 @@ public: ctx->destroy(); op_reclaim_free(ctx); ctx = NULL; + ObXAStatistics::get_instance().dec_ctx_count(); } } XACtxHashNode* alloc_node(ObXACtx* node) diff --git a/src/storage/tx/ob_xa_define.cpp b/src/storage/tx/ob_xa_define.cpp index e5586508fe..407cc78318 100644 --- a/src/storage/tx/ob_xa_define.cpp +++ b/src/storage/tx/ob_xa_define.cpp @@ -421,6 +421,16 @@ void ObXATimeoutTask::runTimerTask() } } +void ObXAStatistics::print_statistics(int64_t cur_ts) +{ + const int64_t last_stat_ts = ATOMIC_LOAD(&last_stat_ts_); + if (cur_ts - last_stat_ts >= STAT_INTERVAL) { + if (ATOMIC_BCAS(&last_stat_ts_, last_stat_ts, cur_ts)) { + TRANS_LOG(INFO, "xa statistics", K(*this)); + } + } +} + }//transaction }//oceanbase diff --git a/src/storage/tx/ob_xa_define.h b/src/storage/tx/ob_xa_define.h index 94fb13f2c2..5493edec44 100644 --- a/src/storage/tx/ob_xa_define.h +++ b/src/storage/tx/ob_xa_define.h @@ -257,6 +257,31 @@ public: static constexpr const char* OB_XA_TIMEOUT_NAME = "ob_xa_timeout"; }; +class ObXAStatistics +{ +public: + static ObXAStatistics &get_instance() + { + static ObXAStatistics xa_statistics_; + return xa_statistics_; + } + ~ObXAStatistics() {} +public: + void inc_ctx_count() { ATOMIC_INC(&total_active_xa_ctx_count_); } + void dec_ctx_count() { ATOMIC_DEC(&total_active_xa_ctx_count_); } + void print_statistics(int64_t cur_ts); +public: + TO_STRING_KV(K_(total_active_xa_ctx_count)); +private: + ObXAStatistics() : last_stat_ts_(0), total_active_xa_ctx_count_(0) {} + DISALLOW_COPY_AND_ASSIGN(ObXAStatistics); +private: + static const int64_t STAT_INTERVAL = 10 * 1000 * 1000; +private: + int64_t last_stat_ts_; + int64_t total_active_xa_ctx_count_; +}; + }//transaction }//oceanbase diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 6445a61633..09eaf5a68d 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -1107,6 +1107,7 @@ int ObXAService::xa_start_(const ObXATransID &xid, tx_desc = NULL; } } else { + // if enter this branch, tx_desc must be valid if (is_first_xa_start) { if (is_tightly_coupled && OB_FAIL(update_xa_lock(trans, tenant_id, xid, trans_id))) { TRANS_LOG(WARN, "update xa lock record failed", K(ret), K(trans_id), K(xid)); @@ -1143,6 +1144,7 @@ int ObXAService::xa_start_(const ObXATransID &xid, if (OB_FAIL(ret)) { if (OB_NOT_NULL(xa_ctx)) { + xa_ctx->set_exiting(); xa_ctx_mgr_.revert_xa_ctx(xa_ctx); } MTL(ObTransService *)->release_tx(*tx_desc); @@ -1250,6 +1252,10 @@ int ObXAService::xa_start_join_(const ObXATransID &xid, TRANS_LOG(WARN, "invalid arguments from inner table", K(ret), K(xid), K(flags), K(end_flag), K(trans_id), K(scheduler_addr)); } else { + if (NULL != tx_desc) { + MTL(ObTransService *)->release_tx(*tx_desc); + tx_desc = NULL; + } const bool is_tightly_coupled = !ObXAFlag::contain_loosely(end_flag); ObXACtx *xa_ctx = NULL; bool need_retry = false; @@ -1691,6 +1697,7 @@ int ObXAService::two_phase_xa_rollback_(const ObXATransID &xid, TRANS_LOG(INFO, "two phase xa rollback success", K(ret), K(xid), K(tx_id)); } } + xa_ctx_mgr_.revert_xa_ctx(xa_ctx); } return ret; @@ -1767,51 +1774,50 @@ int ObXAService::xa_rollback_remote_(const ObXATransID &xid, int ret = OB_SUCCESS; int result = OB_SUCCESS; int64_t now = ObTimeUtility::current_time(); - obrpc::ObXARollbackRPCRequest req; - obrpc::ObXARPCCB cb; - ObTransCond cond; const int64_t wait_time = (INT64_MAX - now) / 2; const int64_t expire_ts = ObTimeUtility::current_time() + timeout_us; - if (OB_FAIL(cb.init(&cond))) { - TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret)); - } else if (OB_FAIL(req.init(tx_id, xid, timeout_us, request_id))) { - TRANS_LOG(WARN, "fail to init xa rollback rpc request", KR(ret), K(tx_id), K(xid)); - } else { - do { - result = OB_SUCCESS; - if (OB_FAIL(xa_rpc_.xa_rollback(MTL_ID(), sche_addr, req, &cb))) { - TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(req), K(sche_addr)); - } else if (OB_FAIL(cond.wait(wait_time, result))) { - TRANS_LOG(WARN, "wait xa rollback rpc callback failed", KR(ret), K(req), K(sche_addr)); - } else if (OB_SUCCESS != result) { - // do nothing - } - } while (OB_SUCC(ret) && (OB_EAGAIN == result || OB_TIMEOUT == result) && expire_ts > ObTimeUtility::current_time()); - } + do { + obrpc::ObXARollbackRPCRequest req; + obrpc::ObXARPCCB cb; + ObTransCond cond; + result = OB_SUCCESS; + if (OB_FAIL(cb.init(&cond))) { + TRANS_LOG(WARN, "ObXARPCCB init failed", KR(ret)); + } else if (OB_FAIL(req.init(tx_id, xid, timeout_us, request_id))) { + TRANS_LOG(WARN, "fail to init xa rollback rpc request", KR(ret), K(tx_id), K(xid)); + } else if (OB_FAIL(xa_rpc_.xa_rollback(MTL_ID(), sche_addr, req, &cb))) { + TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(req), K(sche_addr)); + } else if (OB_FAIL(cond.wait(wait_time, result))) { + TRANS_LOG(WARN, "wait xa rollback rpc callback failed", KR(ret), K(req), K(sche_addr)); + } else if (OB_SUCCESS != result) { + // do nothing + } + } while (OB_SUCC(ret) && (OB_EAGAIN == result || OB_TIMEOUT == result) && expire_ts > ObTimeUtility::current_time()); + if (OB_SUCC(ret)) { switch (result) { case OB_TRANS_XA_PROTO: case OB_TRANS_COMMITED: { ret = OB_TRANS_XA_PROTO; - TRANS_LOG(WARN, "xa rollback failed", KR(ret), K(result), K(req), K(sche_addr)); + TRANS_LOG(WARN, "xa rollback failed", KR(ret), K(result), K(sche_addr)); break; } case OB_SUCCESS: case OB_TRANS_ROLLBACKED: { ret = OB_SUCCESS; - TRANS_LOG(WARN, "xa rollback success", KR(ret), K(result), K(req), K(sche_addr)); + TRANS_LOG(WARN, "xa rollback success", KR(ret), K(result), K(sche_addr)); break; } case OB_TIMEOUT: case OB_EAGAIN: { ret = OB_TRANS_XA_RMFAIL; - TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(result), K(req), K(sche_addr)); + TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(result), K(sche_addr)); break; } default: { ret = OB_TRANS_XA_PROTO; - TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(result), K(req), K(sche_addr)); + TRANS_LOG(WARN, "xa rollback rpc failed", KR(ret), K(result), K(sche_addr)); break; } } @@ -2217,8 +2223,6 @@ int ObXAService::xa_prepare(const ObXATransID &xid, // 1. tightly coupled, OB_ERR_READ_ONLY_TRANSACTION, delete lock and record // 2. tightly coupled, OB_TRANS_XA_RDONLY, delete record only // 3. loosely coupled, OB_ERR_READ_ONLY_TRANSACTION, delete record - // 4. tightly coupled, OB_TRANS_XA_RBROLLBACK, delete lock and all records - // 5. loosely coupled, OB_TRANS_XA_RBROLLBACK, delete record if (OB_TRANS_XA_RDONLY == ret) { // TODO, verify delete_xa_record_state and ret(tmp_ret) if (OB_SUCCESS != (tmp_ret = delete_xa_record(tenant_id, xid))) { @@ -2230,10 +2234,6 @@ int ObXAService::xa_prepare(const ObXATransID &xid, TRANS_LOG(WARN, "delete xa branch failed", K(tmp_ret), K(xid), K(tenant_id), K(tx_id)); } ret = OB_TRANS_XA_RDONLY; - } else if (OB_TRANS_XA_RBROLLBACK == ret) { - if (OB_SUCCESS != (tmp_ret = delete_xa_branch(tenant_id, xid, is_tightly_coupled))) { - TRANS_LOG(WARN, "delete xa record failed", K(tmp_ret), K(xid), K(tenant_id), K(tx_id)); - } } return ret; @@ -2260,10 +2260,6 @@ int ObXAService::local_xa_prepare_(const ObXATransID &xid, if (OB_FAIL(xa_ctx_mgr_.get_xa_ctx(tx_id, alloc, xa_ctx))) { TRANS_LOG(WARN, "get xa ctx failed", K(ret), K(xid), K(tx_id), KP(xa_ctx)); - // TODO, verify - if (OB_TRANS_CTX_NOT_EXIST == ret) { - ret = OB_TRANS_XA_RBROLLBACK; - } } else if (OB_ISNULL(xa_ctx)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "xa ctx is null", K(ret), K(xid), K(tx_id)); diff --git a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp index 4eb65ad5ec..6d6b6c0df1 100644 --- a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp +++ b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp @@ -64,6 +64,7 @@ void ObXATransHeartbeatWorker::run1() while (!has_set_stop()) { int64_t start_time = ObTimeUtility::current_time(); loop_count++; + ObXAStatistics::get_instance().print_statistics(start_time); if (OB_UNLIKELY(!is_inited_)) { TRANS_LOG(WARN, "xa trans heartbeat not init"); diff --git a/unittest/storage/tx/test_ob_tx_log.cpp b/unittest/storage/tx/test_ob_tx_log.cpp index 34d5d5cf35..75ade3ce29 100644 --- a/unittest/storage/tx/test_ob_tx_log.cpp +++ b/unittest/storage/tx/test_ob_tx_log.cpp @@ -190,7 +190,8 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) TEST_FIRST_SCN, TEST_LAST_SCN, TEST_MAX_SUBMITTED_SEQ_NO, - TEST_CLUSTER_VERSION); + TEST_CLUSTER_VERSION, + TEST_XID); ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET); ObTxCommitLog fill_commit(share::SCN::base_scn(), TEST_CHECKSUM, @@ -548,6 +549,8 @@ TEST_F(TestObTxLog, test_default_log_deserialize) replay_member_cnt++; EXPECT_EQ(fill_active_state.get_max_submitted_seq_no(), replay_active_state.get_max_submitted_seq_no()); replay_member_cnt++; + EXPECT_EQ(fill_active_state.get_xid(), replay_active_state.get_xid()); + replay_member_cnt++; EXPECT_EQ(replay_member_cnt, fill_member_cnt); ObTxCommitInfoLogTempRef commit_state_temp_ref;