From f03ab8c18670a5e272e6637c02e1e7fe9f68523b Mon Sep 17 00:00:00 2001 From: jw-guo Date: Thu, 28 Mar 2024 08:45:30 +0000 Subject: [PATCH] [CP] [xa/dblink] add statistics for xa and dblink --- src/sql/dblink/ob_tm_service.cpp | 33 +- src/sql/engine/cmd/ob_xa_executor.cpp | 56 +- src/storage/CMakeLists.txt | 1 + src/storage/tx/ob_dblink_client.cpp | 83 ++- src/storage/tx/ob_dblink_client.h | 8 +- src/storage/tx/ob_xa_ctx.cpp | 23 +- src/storage/tx/ob_xa_ctx.h | 2 + src/storage/tx/ob_xa_ctx_mgr.cpp | 4 +- src/storage/tx/ob_xa_dblink_service.cpp | 10 +- src/storage/tx/ob_xa_rpc.cpp | 43 ++ src/storage/tx/ob_xa_rpc.h | 27 +- src/storage/tx/ob_xa_service.cpp | 148 +++- src/storage/tx/ob_xa_service.h | 180 ++++- src/storage/tx/ob_xa_trans_event.cpp | 704 ++++++++++++++++++ src/storage/tx/ob_xa_trans_event.h | 281 +++++++ .../tx/ob_xa_trans_heartbeat_worker.cpp | 3 +- 16 files changed, 1530 insertions(+), 76 deletions(-) create mode 100644 src/storage/tx/ob_xa_trans_event.cpp create mode 100644 src/storage/tx/ob_xa_trans_event.h diff --git a/src/sql/dblink/ob_tm_service.cpp b/src/sql/dblink/ob_tm_service.cpp index 7db6deb626..5c0b50d3b6 100644 --- a/src/sql/dblink/ob_tm_service.cpp +++ b/src/sql/dblink/ob_tm_service.cpp @@ -138,6 +138,16 @@ int ObTMService::tm_rm_start(ObExecContext &exec_ctx, tx_id = tx_desc->tid(); } my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); + // for statistics + if (need_start || need_promote) { + DBLINK_STAT_ADD_TRANS_COUNT(); + if (need_promote) { + DBLINK_STAT_ADD_TRANS_PROMOTION_COUNT(); + } + if (OB_SUCCESS != ret) { + DBLINK_STAT_ADD_TRANS_FAIL_COUNT(); + } + } LOG_INFO("tm rm start", K(ret), K(tx_id), K(remote_xid), K(need_start), K(need_promote)); // if fail, the trans should be rolled back by client @@ -159,6 +169,8 @@ int ObTMService::tm_commit(ObExecContext &exec_ctx, ObSQLSessionInfo *my_session = GET_MY_SESSION(exec_ctx); ObTxDesc *&tx_desc = my_session->get_tx_desc(); ObXAService *xa_service = MTL(ObXAService*); + const int64_t start_ts = ObTimeUtility::current_time(); + if (NULL == xa_service || NULL == my_session || NULL == tx_desc) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc)); @@ -181,7 +193,14 @@ int ObTMService::tm_commit(ObExecContext &exec_ctx, my_session->reset_tx_variable(); my_session->disassociate_xa(); } - LOG_INFO("tm commit for dblink trans", K(tx_id)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + DBLINK_STAT_ADD_TRANS_COMMIT_COUNT(); + DBLINK_STAT_ADD_TRANS_COMMIT_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + DBLINK_STAT_ADD_TRANS_COMMIT_FAIL_COUNT(); + } + LOG_INFO("tm commit for dblink trans", K(tx_id), K(used_time_us)); return ret; } @@ -197,6 +216,8 @@ int ObTMService::tm_rollback(ObExecContext &exec_ctx, ObSQLSessionInfo *my_session = GET_MY_SESSION(exec_ctx); ObTxDesc *&tx_desc = my_session->get_tx_desc(); ObXAService *xa_service = MTL(ObXAService*); + const int64_t start_ts = ObTimeUtility::current_time(); + if (NULL == xa_service || NULL == my_session || NULL == tx_desc) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected param", K(ret), KP(xa_service), KP(my_session), KP(tx_desc)); @@ -216,7 +237,14 @@ int ObTMService::tm_rollback(ObExecContext &exec_ctx, my_session->reset_tx_variable(); my_session->disassociate_xa(); } - LOG_INFO("tm rollback for dblink trans", K(tx_id)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + DBLINK_STAT_ADD_TRANS_ROLLBACK_COUNT(); + DBLINK_STAT_ADD_TRANS_ROLLBACK_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + DBLINK_STAT_ADD_TRANS_ROLLBACK_FAIL_COUNT(); + } + LOG_INFO("tm rollback for dblink trans", K(tx_id), K(used_time_us)); return ret; } @@ -247,6 +275,7 @@ int ObTMService::recover_tx_for_callback(const ObTransID &tx_id, my_session->associate_xa(tx_desc->get_xid()); } } + DBLINK_STAT_ADD_TRANS_CALLBACK_COUNT(); LOG_INFO("recover tx for dblink callback", K(ret), K(tx_id)); return ret; } diff --git a/src/sql/engine/cmd/ob_xa_executor.cpp b/src/sql/engine/cmd/ob_xa_executor.cpp index 4687f18575..88773e3589 100644 --- a/src/sql/engine/cmd/ob_xa_executor.cpp +++ b/src/sql/engine/cmd/ob_xa_executor.cpp @@ -49,6 +49,8 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) int64_t org_cluster_id = OB_INVALID_ORG_CLUSTER_ID; int64_t tx_timeout = 0; uint64_t tenant_id = 0; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; @@ -111,7 +113,14 @@ int ObPlXaStartExecutor::execute(ObExecContext &ctx, ObXaStartStmt &stmt) my_session->get_raw_audit_record().trans_id_ = my_session->get_tx_id(); } } - LOG_INFO("xa start execute", K(stmt)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_START_TOTAL_COUNT(); + XA_STAT_ADD_XA_START_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_START_FAIL_COUNT(); + } + LOG_INFO("xa start", K(ret), K(stmt), K(xid), K(used_time_us)); return ret; } @@ -182,6 +191,8 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx); ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); ObXATransID xid; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; @@ -226,7 +237,14 @@ int ObPlXaEndExecutor::execute(ObExecContext &ctx, ObXaEndStmt &stmt) ctx.set_need_disconnect(false); } } - LOG_DEBUG("xa end execute", K(stmt)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_END_TOTAL_COUNT(); + XA_STAT_ADD_XA_END_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_END_FAIL_COUNT(); + } + LOG_DEBUG("xa end", K(ret), K(stmt), K(xid), K(used_time_us)); return ret; } @@ -280,6 +298,8 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) ObTaskExecutorCtx &task_exec_ctx = ctx.get_task_exec_ctx(); ObXATransID xid; ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; @@ -304,8 +324,14 @@ int ObPlXaPrepareExecutor::execute(ObExecContext &ctx, ObXaPrepareStmt &stmt) } my_session->get_raw_audit_record().trans_id_ = tx_id; } - - LOG_INFO("xa prepare execute", K(ret), K(stmt), K(xid), K(tx_id)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_PREPARE_TOTAL_COUNT(); + XA_STAT_ADD_XA_PREPARE_TOTAL_USED_TIME(used_time_us); + if (OB_SUCCESS != ret && OB_TRANS_XA_RDONLY != ret) { + XA_STAT_ADD_XA_PREPARE_FAIL_COUNT(); + } + LOG_INFO("xa prepare", K(ret), K(xid), K(tx_id), K(used_time_us)); return ret; } @@ -410,6 +436,8 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) ObXATransID xid; ObTransID tx_id; bool has_tx_level_temp_table = false; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); if (!stmt.is_valid_oracle_xid()) { ret = OB_TRANS_XA_INVAL; LOG_WARN("invalid xid for oracle mode", K(ret), K(stmt)); @@ -436,7 +464,14 @@ int ObPlXaCommitExecutor::execute(ObExecContext &ctx, ObXaCommitStmt &stmt) LOG_WARN("trx level temporary table clean failed", KR(temp_ret)); } } - LOG_INFO("xa commit", K(ret), K(stmt), K(xid), K(tx_id)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_COMMIT_TOTAL_COUNT(); + XA_STAT_ADD_XA_COMMIT_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_COMMIT_FAIL_COUNT(); + } + LOG_INFO("xa commit", K(ret), K(stmt), K(xid), K(tx_id), K(used_time_us)); return ret; } @@ -448,6 +483,8 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) int64_t xa_timeout_seconds = my_session->get_xa_end_timeout_seconds(); ObXATransID xid; ObTransID tx_id; + const int64_t start_ts = ObTimeUtility::current_time(); + ObXAStmtGuard xa_stmt_guard(start_ts); if (!stmt.is_valid_oracle_xid()) { ret = OB_TRANS_XA_INVAL; LOG_WARN("invalid xid for oracle mode", K(ret), K(stmt)); @@ -467,7 +504,14 @@ int ObPlXaRollbackExecutor::execute(ObExecContext &ctx, ObXaRollBackStmt &stmt) } my_session->get_raw_audit_record().trans_id_ = tx_id; } - LOG_INFO("xa rollback", K(ret), K(stmt), K(xid), K(tx_id)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + XA_STAT_ADD_XA_ROLLBACK_TOTAL_COUNT(); + XA_STAT_ADD_XA_ROLLBACK_TOTAL_USED_TIME(used_time_us); + if (OB_FAIL(ret)) { + XA_STAT_ADD_XA_ROLLBACK_FAIL_COUNT(); + } + LOG_INFO("xa rollback", K(ret), K(stmt), K(xid), K(tx_id), K(used_time_us)); return ret; } diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 03bb9df1fe..9a20b7bbfa 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -376,6 +376,7 @@ ob_set_subtarget(ob_storage tx tx/ob_xa_query.cpp tx/ob_xa_rpc.cpp tx/ob_xa_service.cpp + tx/ob_xa_trans_event.cpp tx/ob_xa_trans_heartbeat_worker.cpp tx/ob_tx_ctx_mds.cpp tx/ob_multi_data_source.cpp diff --git a/src/storage/tx/ob_dblink_client.cpp b/src/storage/tx/ob_dblink_client.cpp index ffeadf7dd9..b19c17a5c8 100644 --- a/src/storage/tx/ob_dblink_client.cpp +++ b/src/storage/tx/ob_dblink_client.cpp @@ -38,13 +38,16 @@ void ObDBLinkClient::reset() mtl_free(impl_); impl_ = NULL; } + tx_timeout_us_ = -1; + dblink_statistics_ = NULL; is_inited_ = false; } int ObDBLinkClient::init(const uint32_t index, const DblinkDriverProto dblink_type, const int64_t tx_timeout_us, - ObISQLConnection *dblink_conn) + ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics) { int ret = OB_SUCCESS; if (is_inited_) { @@ -53,17 +56,19 @@ int ObDBLinkClient::init(const uint32_t index, } else if (DblinkDriverProto::DBLINK_UNKNOWN == dblink_type || NULL == dblink_conn || 0 > tx_timeout_us - || 0 == index) { - ret = OB_ERR_UNEXPECTED; + || 0 == index + || NULL == dblink_statistics) { + ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid arguments", K(ret), K(dblink_type), KP(dblink_conn), - K(tx_timeout_us), K(index)); + K(tx_timeout_us), K(index), KP(dblink_statistics)); } else { index_ = index; dblink_conn_ = dblink_conn; dblink_type_ = dblink_type; tx_timeout_us_ = tx_timeout_us; + dblink_statistics_ = dblink_statistics; is_inited_ = true; - TRANS_LOG(INFO, "init", K(*this)); + TRANS_LOG(INFO, "dblink client init", K(*this)); } return ret; } @@ -76,10 +81,14 @@ int ObDBLinkClient::rm_xa_start(const ObXATransID &xid, const ObTxIsolationLevel { int ret = OB_SUCCESS; ObSpinLockGuard guard(lock_); + const int64_t start_ts = ObTimeUtility::current_time(); if (!is_inited_) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(xid), K(*this)); + } else if (NULL == dblink_statistics_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(xid), K(*this), KP_(dblink_statistics)); } else if (!xid.is_valid() || xid.empty() || ObTxIsolationLevel::INVALID == isolation) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), K(xid), K(isolation)); @@ -110,6 +119,13 @@ int ObDBLinkClient::rm_xa_start(const ObXATransID &xid, const ObTxIsolationLevel } TRANS_LOG(INFO, "rm xa start for dblink", K(ret), K(xid), K(isolation), K(flag)); } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + dblink_statistics_->inc_dblink_trans_xa_start_count(); + dblink_statistics_->add_dblink_trans_xa_start_used_time(used_time_us); + if (OB_FAIL(ret)) { + dblink_statistics_->inc_dblink_trans_xa_start_fail_count(); + } return ret; } @@ -123,6 +139,9 @@ int ObDBLinkClient::rm_xa_end() if (!is_inited_) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(*this)); + } else if (NULL == dblink_statistics_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this), KP_(dblink_statistics)); } else if (!xid_.is_valid() || xid_.empty()) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "invalid xid", K(ret), K(*this)); @@ -149,6 +168,9 @@ int ObDBLinkClient::rm_xa_prepare() if (!is_inited_) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(*this)); + } else if (NULL == dblink_statistics_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this), KP_(dblink_statistics)); } else if (!xid_.is_valid() || xid_.empty()) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "invalid xid", K(ret), K(*this)); @@ -173,6 +195,7 @@ int ObDBLinkClient::rm_xa_prepare() TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this)); } } else { + const int64_t start_ts = ObTimeUtility::current_time(); state_ = ObDBLinkClientState::PREPARING; if (OB_FAIL(impl_->xa_prepare(xid_))) { if (OB_TRANS_XA_RDONLY != ret) { @@ -186,7 +209,14 @@ int ObDBLinkClient::rm_xa_prepare() } else { // TODO, handle exceptions } - TRANS_LOG(INFO, "rm xa prepare for dblink", K(ret), K_(xid)); + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + dblink_statistics_->inc_dblink_trans_xa_prepare_count(); + dblink_statistics_->add_dblink_trans_xa_prepare_used_time(used_time_us); + if (OB_FAIL(ret) && OB_TRANS_XA_RDONLY != ret) { + dblink_statistics_->inc_dblink_trans_xa_prepare_fail_count(); + } + TRANS_LOG(INFO, "rm xa prepare for dblink", K(ret), K_(xid), K(used_time_us)); } return ret; } @@ -199,10 +229,12 @@ int ObDBLinkClient::rm_xa_commit() { int ret = OB_SUCCESS; ObSpinLockGuard guard(lock_); - ObSqlString sql; if (!is_inited_) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(*this)); + } else if (NULL == dblink_statistics_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this), KP_(dblink_statistics)); } else if (!xid_.is_valid() || xid_.empty()) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "invalid xid", K(ret), K(xid_)); @@ -220,6 +252,7 @@ int ObDBLinkClient::rm_xa_commit() } } else { // two phase commit + const int64_t start_ts = ObTimeUtility::current_time(); const int64_t flags = ObXAFlag::OBTMNOFLAGS; state_ = ObDBLinkClientState::COMMITTING; if (OB_FAIL(impl_->xa_commit(xid_, flags))) { @@ -227,10 +260,14 @@ int ObDBLinkClient::rm_xa_commit() } else { state_ = ObDBLinkClientState::COMMITTED; } - if (OB_SUCCESS != ret) { - // TODO, handle exceptions + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + dblink_statistics_->inc_dblink_trans_xa_commit_count(); + dblink_statistics_->add_dblink_trans_xa_commit_used_time(used_time_us); + if (OB_FAIL(ret)) { + dblink_statistics_->inc_dblink_trans_xa_commit_fail_count(); } - TRANS_LOG(INFO, "rm xa commit for dblink", K(ret), K_(xid)); + TRANS_LOG(INFO, "rm xa commit for dblink", K(ret), K_(xid), K(used_time_us)); } return ret; } @@ -244,12 +281,14 @@ int ObDBLinkClient::rm_xa_rollback() { int ret = OB_SUCCESS; ObSpinLockGuard guard(lock_); - ObSqlString sql; // step 1, execute xa end if necessary if (!is_inited_) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "dblink client is not inited", K(ret), K(*this)); + } else if (NULL == dblink_statistics_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this), KP_(dblink_statistics)); } else if (!xid_.is_valid() || xid_.empty()) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "invalid xid", K(ret), K(xid_)); @@ -276,16 +315,21 @@ int ObDBLinkClient::rm_xa_rollback() TRANS_LOG(WARN, "unexpected dblink client", K(ret), K(*this)); } } else { + const int64_t start_ts = ObTimeUtility::current_time(); state_ = ObDBLinkClientState::ROLLBACKING; if (OB_FAIL(impl_->xa_rollback(xid_))) { TRANS_LOG(WARN, "fail to execute query", K(ret), K(*this)); } else { state_ = ObDBLinkClientState::ROLLBACKED; } - if (OB_SUCCESS != ret) { - // TODO, handle exceptions + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + dblink_statistics_->inc_dblink_trans_xa_rollback_count(); + dblink_statistics_->add_dblink_trans_xa_rollback_used_time(used_time_us); + if (OB_FAIL(ret)) { + dblink_statistics_->inc_dblink_trans_xa_rollback_fail_count(); } - TRANS_LOG(INFO, "rm xa rollback for dblink", K(ret), K_(xid)); + TRANS_LOG(INFO, "rm xa rollback for dblink", K(ret), K_(xid), K(used_time_us)); } return ret; } @@ -293,7 +337,7 @@ int ObDBLinkClient::rm_xa_rollback() int ObDBLinkClient::rm_xa_end_() { int ret = OB_SUCCESS; - ObSqlString sql; + const int64_t start_ts = ObTimeUtility::current_time(); if (ObDBLinkClientState::START != state_) { if (ObDBLinkClientState::END == state_) { // return OB_SUCCESS @@ -310,8 +354,15 @@ int ObDBLinkClient::rm_xa_end_() if (OB_SUCCESS != ret) { // TODO, handle exceptions } - TRANS_LOG(INFO, "rm xa end for dblink", K(ret), K_(xid)); } + // for statistics + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts; + dblink_statistics_->inc_dblink_trans_xa_end_count(); + dblink_statistics_->add_dblink_trans_xa_end_used_time(used_time_us); + if (OB_FAIL(ret)) { + dblink_statistics_->inc_dblink_trans_xa_end_fail_count(); + } + TRANS_LOG(INFO, "rm xa end for dblink", K(ret), K_(xid), K(used_time_us)); return ret; } diff --git a/src/storage/tx/ob_dblink_client.h b/src/storage/tx/ob_dblink_client.h index 83046feea7..1eadbe733b 100644 --- a/src/storage/tx/ob_dblink_client.h +++ b/src/storage/tx/ob_dblink_client.h @@ -14,6 +14,7 @@ #include "lib/mysqlclient/ob_isql_connection_pool.h" #include "storage/tx/ob_trans_define.h" #include "storage/tx/ob_xa_query.h" +#include "ob_xa_trans_event.h" namespace oceanbase { @@ -54,7 +55,8 @@ public: explicit ObDBLinkClient() : lock_(), is_inited_(false), index_(0), xid_(), state_(ObDBLinkClientState::IDLE), dblink_type_(common::sqlclient::DblinkDriverProto::DBLINK_UNKNOWN), dblink_conn_(NULL), - impl_(NULL), tx_timeout_us_(-1) + impl_(NULL), tx_timeout_us_(-1), + dblink_statistics_(NULL) {} ~ObDBLinkClient() { destroy(); } void reset(); @@ -62,7 +64,8 @@ public: int init(const uint32_t index, const common::sqlclient::DblinkDriverProto dblink_type, const int64_t tx_timeout_us, - common::sqlclient::ObISQLConnection *dblink_conn); + common::sqlclient::ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics); public: int rm_xa_start(const transaction::ObXATransID &xid, const ObTxIsolationLevel isolation); int rm_xa_end(); @@ -93,6 +96,7 @@ protected: common::sqlclient::ObISQLConnection *dblink_conn_; transaction::ObXAQuery *impl_; int64_t tx_timeout_us_; + ObDBLinkTransStatistics *dblink_statistics_; private: DISALLOW_COPY_AND_ASSIGN(ObDBLinkClient); }; diff --git a/src/storage/tx/ob_xa_ctx.cpp b/src/storage/tx/ob_xa_ctx.cpp index 4180c717c4..edf3b4f159 100644 --- a/src/storage/tx/ob_xa_ctx.cpp +++ b/src/storage/tx/ob_xa_ctx.cpp @@ -1090,13 +1090,14 @@ int ObXACtx::xa_start_for_dblink(const ObXATransID &xid, // @param[out] client int ObXACtx::get_dblink_client(const DblinkDriverProto dblink_type, ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics, ObDBLinkClient *&client) { int ret = OB_SUCCESS; ObLatchWGuard guard(lock_, common::ObLatchIds::XA_CTX_LOCK); - if (OB_ISNULL(dblink_conn)) { + if (OB_ISNULL(dblink_conn) || OB_ISNULL(dblink_statistics)) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "invalid argument", K(ret), KP(dblink_conn)); + TRANS_LOG(WARN, "invalid argument", K(ret), KP(dblink_conn), KP(dblink_statistics)); } else if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "xa ctx not inited", K(ret), K(*this)); @@ -1105,7 +1106,7 @@ int ObXACtx::get_dblink_client(const DblinkDriverProto dblink_type, TRANS_LOG(WARN, "xa trans is exiting", K(ret), K(*this)); } else { ObDBLinkClient *tmp_client = NULL; - if (OB_FAIL(get_dblink_client_(dblink_type, dblink_conn, tmp_client))) { + if (OB_FAIL(get_dblink_client_(dblink_type, dblink_conn, dblink_statistics, tmp_client))) { TRANS_LOG(WARN, "fail to get dblink client", K(ret), K(*this)); } else if (NULL == tmp_client) { ret = OB_ERR_UNEXPECTED; @@ -1124,6 +1125,7 @@ int ObXACtx::get_dblink_client(const DblinkDriverProto dblink_type, // @param[out] dblink_client int ObXACtx::get_dblink_client_(const DblinkDriverProto dblink_type, ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics, ObDBLinkClient *&dblink_client) { int ret = OB_SUCCESS; @@ -1151,7 +1153,7 @@ int ObXACtx::get_dblink_client_(const DblinkDriverProto dblink_type, client = new(ptr) ObDBLinkClient(); const int64_t timeout_us = tx_desc_->get_timeout_us(); if (OB_FAIL(client->init(client_max_index + 1, dblink_type, timeout_us, - dblink_conn))) { + dblink_conn, dblink_statistics))) { TRANS_LOG(WARN, "fail to init dblink client", K(ret), K(*this)); } else if (OB_FAIL(dblink_client_array_.push_back(client))) { TRANS_LOG(WARN, "fail to push dblink client to array", K(ret), K(*this)); @@ -1578,6 +1580,8 @@ int ObXACtx::xa_start_remote_first_(const ObXATransID &xid, tx_desc_->set_xid(xid); tx_desc_->set_xa_ctx(this); tx_desc = tx_desc_; + // for statistics + XA_STAT_ADD_XA_START_REMOTE_COUNT(); } notify_xa_start_complete_(ret); @@ -1658,6 +1662,8 @@ int ObXACtx::xa_start_remote_second_(const ObXATransID &xid, ++xa_ref_count_; // set tx_desc in session tx_desc = tx_desc_; + // for statistics + XA_STAT_ADD_XA_START_REMOTE_COUNT(); } return ret; } @@ -2208,6 +2214,10 @@ int ObXACtx::xa_end_loose_remote_(const ObXATransID &xid, ret = result; } } + // for statistics + if (OB_SUCC(ret)) { + XA_STAT_ADD_XA_END_REMOTE_COUNT(); + } return ret; } @@ -2270,6 +2280,10 @@ int ObXACtx::xa_end_tight_remote_(const ObXATransID &xid, ret = result; } } + // for statistics + if (OB_SUCC(ret)) { + XA_STAT_ADD_XA_END_REMOTE_COUNT(); + } return ret; } @@ -2869,6 +2883,7 @@ int ObXACtx::xa_prepare_(const ObXATransID &xid, const int64_t timeout_us, bool uint64_t data_version = 0; if (OB_FAIL(MTL(ObTransService*)->prepare_tx_coord(*tx_desc_, coord))) { if (OB_ERR_READ_ONLY_TRANSACTION == ret) { + XA_STAT_ADD_XA_READ_ONLY_TRANS_TOTAL_COUNT(); TRANS_LOG(INFO, "xa is read only", K(ret), K(*this)); } else { TRANS_LOG(WARN, "fail to prepare tx coord", K(ret), K(*this)); diff --git a/src/storage/tx/ob_xa_ctx.h b/src/storage/tx/ob_xa_ctx.h index 0aee385cfe..fb6f4823e4 100644 --- a/src/storage/tx/ob_xa_ctx.h +++ b/src/storage/tx/ob_xa_ctx.h @@ -155,6 +155,7 @@ public: ObTxDesc *tx_desc); int get_dblink_client(const common::sqlclient::DblinkDriverProto dblink_type, common::sqlclient::ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics, ObDBLinkClient *&client); int remove_dblink_client(ObDBLinkClient *client); ObDBLinkClientArray &get_dblink_client_array() { return dblink_client_array_; } @@ -257,6 +258,7 @@ private: // for 4.0 dblink int get_dblink_client_(const common::sqlclient::DblinkDriverProto dblink_type, common::sqlclient::ObISQLConnection *dblink_conn, + ObDBLinkTransStatistics *dblink_statistics, ObDBLinkClient *&dblink_client); private: static const int MIN_TX_REF_COUNT = 3; diff --git a/src/storage/tx/ob_xa_ctx_mgr.cpp b/src/storage/tx/ob_xa_ctx_mgr.cpp index 1096d573c6..d6c9e4a0f5 100644 --- a/src/storage/tx/ob_xa_ctx_mgr.cpp +++ b/src/storage/tx/ob_xa_ctx_mgr.cpp @@ -28,7 +28,7 @@ void XACtxAlloc::free_value(ObXACtx* ctx) ctx->destroy(); op_reclaim_free(ctx); ctx = NULL; - MTL(ObXAService *)->get_xa_statistics().dec_ctx_count(); + XA_ACTIVE_DECREMENT_XA_CTX_COUNT(); } } @@ -183,7 +183,7 @@ int ObXACtxMgr::get_xa_ctx_(const ObTransID &trans_id, bool &alloc, ObXACtx*& ct } else { ctx = tmp_ctx; inc_total_ctx_count(); - MTL(ObXAService *)->get_xa_statistics().inc_ctx_count(); + XA_ACTIVE_INCREMENT_XA_CTX_COUNT(); } } diff --git a/src/storage/tx/ob_xa_dblink_service.cpp b/src/storage/tx/ob_xa_dblink_service.cpp index d1b990a618..c1b716295c 100644 --- a/src/storage/tx/ob_xa_dblink_service.cpp +++ b/src/storage/tx/ob_xa_dblink_service.cpp @@ -135,10 +135,10 @@ int ObXAService::xa_start_for_tm_promotion(const int64_t flags, if (OB_FAIL(ret)) { TRANS_LOG(WARN, "xa start for dblink promotion failed", K(ret), K(xid), K(flags)); - xa_statistics_.inc_failure_dblink_promotion(); + // xa_statistics_.inc_failure_dblink_promotion(); } else { TRANS_LOG(INFO, "xa start for dblink promtion", K(ret), K(xid), K(flags)); - xa_statistics_.inc_success_dblink_promotion(); + // xa_statistics_.inc_success_dblink_promotion(); } return ret; @@ -270,10 +270,10 @@ int ObXAService::xa_start_for_tm(const int64_t flags, if (OB_FAIL(ret)) { TRANS_LOG(WARN, "xa start for dblink failed", K(ret), K(xid), K(flags)); - xa_statistics_.inc_failure_dblink(); + // xa_statistics_.inc_failure_dblink(); } else { TRANS_LOG(INFO, "xa start for dblink", K(ret), K(xid), K(flags)); - xa_statistics_.inc_success_dblink(); + // xa_statistics_.inc_success_dblink(); } return ret; @@ -455,7 +455,7 @@ int ObXAService::xa_start_for_dblink_client(const DblinkDriverProto dblink_type, if (NULL == xa_ctx) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected xa context", K(ret), K(xid), K(tx_id)); - } else if (OB_FAIL(xa_ctx->get_dblink_client(dblink_type, dblink_conn, client))) { + } else if (OB_FAIL(xa_ctx->get_dblink_client(dblink_type, dblink_conn, &dblink_statistics_, client))) { TRANS_LOG(WARN, "fail to preapre xa start for dblink client", K(ret), K(xid), K(tx_id)); } else if (NULL == client) { ret = OB_ERR_UNEXPECTED; diff --git a/src/storage/tx/ob_xa_rpc.cpp b/src/storage/tx/ob_xa_rpc.cpp index f7e81aaff5..8ad90a17dc 100644 --- a/src/storage/tx/ob_xa_rpc.cpp +++ b/src/storage/tx/ob_xa_rpc.cpp @@ -686,6 +686,49 @@ int ObXAHbRespP::process() return ret; } +template +void ObXARPCCB::statistics() +{ + int ret = OB_SUCCESS; + if (is_valid_tenant_id(tenant_id_) && 0 < start_ts_) { + MTL_SWITCH(tenant_id_) { + ObXAService *xa_service = MTL(ObXAService *); + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts_; + if (NULL != xa_service && 0 < used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_total_count(); + xa_service->get_statistics().add_xa_inner_rpc_total_used_time(used_time_us); + if (10000 <= used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_ten_ms_total_count(); + } + if (20000 <= used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_twenty_ms_total_count(); + } + } + } // MTL_SWITCH + } +} + +void ObXACommitRPCCB::statistics() +{ + int ret = OB_SUCCESS; + ObXAService *xa_service = MTL(ObXAService *); + if (is_valid_tenant_id(tenant_id_) && 0 < start_ts_) { + MTL_SWITCH(tenant_id_) { + ObXAService *xa_service = MTL(ObXAService *); + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts_; + if (NULL != xa_service && 0 < used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_total_count(); + xa_service->get_statistics().add_xa_inner_rpc_total_used_time(used_time_us); + if (10000 <= used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_ten_ms_total_count(); + } + if (20000 <= used_time_us) { + xa_service->get_statistics().inc_xa_inner_rpc_twenty_ms_total_count(); + } + } + } + } +} }//obrpc using namespace obrpc; diff --git a/src/storage/tx/ob_xa_rpc.h b/src/storage/tx/ob_xa_rpc.h index 3f02183450..fab00e73dd 100644 --- a/src/storage/tx/ob_xa_rpc.h +++ b/src/storage/tx/ob_xa_rpc.h @@ -460,7 +460,7 @@ template class ObXARPCCB : public ObXARpcProxy::AsyncCB { public: - ObXARPCCB() : is_inited_(false), cond_(NULL) {} + ObXARPCCB() : is_inited_(false), cond_(NULL), start_ts_(-1), tenant_id_(OB_INVALID_TENANT_ID) {} ~ObXARPCCB() {} int init(transaction::ObTransCond *cond) { @@ -470,6 +470,8 @@ public: TRANS_LOG(WARN, "ObXARPCCB init twice", KR(ret)); } else { cond_ = cond; + start_ts_ = ObTimeUtility::current_time(); + tenant_id_ = MTL_ID(); is_inited_ = true; } return ret; @@ -477,6 +479,8 @@ public: void reset() { cond_ = NULL; + start_ts_ = -1; + tenant_id_ = OB_INVALID_TENANT_ID; is_inited_ = false; } void set_args(const typename ObXARpcProxy::AsyncCB::Request &args) @@ -490,6 +494,8 @@ public: if (NULL != buf) { newcb = new (buf) ObXARPCCB(); newcb->cond_ = cond_; + newcb->start_ts_ = start_ts_; + newcb->tenant_id_ = tenant_id_; newcb->is_inited_ = true; } return newcb; @@ -511,6 +517,7 @@ public: cond_->notify(result); } } + statistics(); return OB_SUCCESS; } void on_timeout() @@ -523,16 +530,21 @@ public: if (OB_NOT_NULL(cond_)) { cond_->notify(ret); } + statistics(); } + void statistics(); private: bool is_inited_; transaction::ObTransCond *cond_; + int64_t start_ts_; + uint64_t tenant_id_; }; class ObXACommitRPCCB : public ObXARpcProxy::AsyncCB { public: - ObXACommitRPCCB() : is_inited_(false), cond_(NULL), has_tx_level_temp_table_(NULL) {} + ObXACommitRPCCB() : is_inited_(false), cond_(NULL), has_tx_level_temp_table_(NULL), + start_ts_(-1), tenant_id_(OB_INVALID_TENANT_ID) {} ~ObXACommitRPCCB() {} int init(transaction::ObTransCond *cond, bool *has_tx_level_temp_table) { @@ -543,6 +555,8 @@ public: } else { has_tx_level_temp_table_ = has_tx_level_temp_table; cond_ = cond; + start_ts_ = ObTimeUtility::current_time(); + tenant_id_ = MTL_ID(); is_inited_ = true; } return ret; @@ -551,6 +565,8 @@ public: { has_tx_level_temp_table_ = NULL; cond_ = NULL; + start_ts_ = -1; + tenant_id_ = OB_INVALID_TENANT_ID; is_inited_ = false; } void set_args(const typename ObXARpcProxy::AsyncCB::Request &args) @@ -565,6 +581,8 @@ public: newcb = new (buf) ObXACommitRPCCB(); newcb->has_tx_level_temp_table_ = has_tx_level_temp_table_; newcb->cond_ = cond_; + newcb->start_ts_ = start_ts_; + newcb->tenant_id_ = tenant_id_; newcb->is_inited_ = true; } return newcb; @@ -589,6 +607,7 @@ public: cond_->notify(result.status_); } } + statistics(); return OB_SUCCESS; } void on_timeout() @@ -601,11 +620,15 @@ public: if (OB_NOT_NULL(cond_)) { cond_->notify(ret); } + statistics(); } + void statistics(); private: bool is_inited_; transaction::ObTransCond *cond_; bool *has_tx_level_temp_table_; + int64_t start_ts_; + uint64_t tenant_id_; }; }//obrpc diff --git a/src/storage/tx/ob_xa_service.cpp b/src/storage/tx/ob_xa_service.cpp index 0f5c4369e0..8d1b80cf40 100644 --- a/src/storage/tx/ob_xa_service.cpp +++ b/src/storage/tx/ob_xa_service.cpp @@ -64,9 +64,14 @@ int ObXAService::init(const ObAddr &self_addr, TRANS_LOG(ERROR, "xa trans relocate worker init error", KR(ret)); } else if (OB_FAIL(xa_inner_table_gc_worker_.init(this))) { TRANS_LOG(WARN, "xa inner table gc worker init error", KR(ret)); + } else if (OB_FAIL(xa_statistics_v2_.init(MTL_ID()))) { + TRANS_LOG(WARN, "xa statistics init error", KR(ret)); + } else if (OB_FAIL(dblink_statistics_.init(MTL_ID()))) { + TRANS_LOG(WARN, "dblink statistics init error", KR(ret)); } else { is_inited_ = true; } + TRANS_LOG(INFO, "xa service init", K(ret)); return ret; } @@ -85,6 +90,8 @@ void ObXAService::destroy() timer_.destroy(); xa_rpc_.destroy(); xa_proxy_.destroy(); + xa_statistics_v2_.destroy(); + dblink_statistics_.destroy(); is_inited_ = false; } TRANS_LOG(INFO, "xa service destroy"); @@ -93,7 +100,7 @@ void ObXAService::destroy() int ObXAService::start() { int ret = OB_SUCCESS; - + if (IS_NOT_INIT) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "xa service is not inited", K(ret)); @@ -111,6 +118,7 @@ int ObXAService::start() } else { is_running_ = true; } + TRANS_LOG(INFO, "xa service start", KR(ret)); return ret; } @@ -202,6 +210,11 @@ int ObXAService::remote_one_phase_xa_commit_(const ObXATransID &xid, ret = result; } + // for statistics + if (OB_SUCC(ret)) { + XA_STAT_ADD_XA_COMMIT_REMOTE_COUNT(); + } + return ret; } @@ -302,7 +315,9 @@ int ObXAService::insert_record_for_standby(const uint64_t tenant_id, int64_t bqual_len = 0; const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty() || !trans_id.is_valid()) { ret = OB_INVALID_ARGUMENT; @@ -329,7 +344,9 @@ int ObXAService::insert_record_for_standby(const uint64_t tenant_id, } else if (OB_FAIL(mysql_proxy->write(exec_tenant_id, sql.ptr(), affected_rows))) { TRANS_LOG(WARN, "execute insert record sql failed", KR(ret), K(exec_tenant_id), K(tenant_id)); } else { - xa_statistics_.inc_cleanup_tx_count(); + // xa_statistics_.inc_cleanup_tx_count(); + // XA_INNER_INCREMENT_COMPENSATE_COUNT + xa_statistics_v2_.inc_compensate_record_count(); TRANS_LOG(INFO, "execute insert record sql success", K(exec_tenant_id), K(tenant_id), K(sql), K(affected_rows)); } @@ -359,7 +376,9 @@ int ObXAService::insert_xa_lock(ObISQLClient &client, const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -476,7 +495,9 @@ int ObXAService::insert_xa_record(ObISQLClient &client, const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty() || !trans_id.is_valid()) { ret = OB_INVALID_ARGUMENT; @@ -534,7 +555,9 @@ int ObXAService::delete_xa_record(const uint64_t tenant_id, int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -602,7 +625,9 @@ int ObXAService::delete_xa_all_tightly_branch(const uint64_t tenant_id, int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -653,8 +678,10 @@ int ObXAService::delete_xa_branch(const uint64_t tenant_id, char bqual_str[128] = {0}; int64_t bqual_len = 0; int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -750,7 +777,9 @@ int ObXAService::query_xa_scheduler_trans_id(const uint64_t tenant_id, int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -1028,11 +1057,12 @@ int ObXAService::xa_start(const ObXATransID &xid, tx_desc->set_xa_start_addr(GCONF.self_addr_); } if (OB_FAIL(ret)) { - xa_statistics_.inc_failure_xa_start(); + // xa_statistics_.inc_failure_xa_start(); TRANS_LOG(WARN, "xa start failed", K(ret), K(xid), K(flags), K(timeout_seconds)); } else { - xa_statistics_.inc_success_xa_start(); - TRANS_LOG(INFO, "xa start", K(ret), K(xid), K(flags), K(timeout_seconds), "tx_id", tx_desc->get_tx_id(), KPC(tx_desc)); + // xa_statistics_.inc_success_xa_start(); + TRANS_LOG(INFO, "xa start", K(ret), K(xid), K(flags), K(timeout_seconds), + "tx_id", tx_desc->get_tx_id(), KPC(tx_desc)); } return ret; @@ -1063,8 +1093,17 @@ int ObXAService::xa_start_(const ObXATransID &xid, // step 1: if tightly coupled, insert lock record first. if (OB_FAIL(MTL(transaction::ObTransService *)->gen_trans_id(trans_id))) { TRANS_LOG(WARN, "gen trans id fail", K(ret), K(exec_tenant_id), K(xid)); - } else if (OB_FAIL(trans.start(MTL(ObTransService *)->get_mysql_proxy(), exec_tenant_id))) { - TRANS_LOG(WARN, "trans start failed", K(ret), K(exec_tenant_id), K(xid)); + } + if (OB_FAIL(ret)) { + // do nothing + } else { + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); + if (OB_FAIL(trans.start(MTL(ObTransService *)->get_mysql_proxy(), exec_tenant_id))) { + TRANS_LOG(WARN, "trans start failed", K(ret), K(exec_tenant_id), K(xid)); + } + } + if (OB_FAIL(ret)) { + // do nothing } else { if (is_tightly_coupled) { // tightly couple @@ -1096,6 +1135,7 @@ int ObXAService::xa_start_(const ObXATransID &xid, } // step 2: if first xa start, alloc tx_desc if (OB_FAIL(ret)) { + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); (void)trans.end(false); TRANS_LOG(WARN, "insert or query xa lock record failed", K(ret), K(trans_id), K(xid)); } else { @@ -1128,8 +1168,11 @@ int ObXAService::xa_start_(const ObXATransID &xid, } else { TRANS_LOG(WARN, "insert xa trans into inner table error", K(ret), K(trans_id), K(xid)); } - if (OB_SUCCESS != (tmp_ret = trans.end(false))) { - TRANS_LOG(WARN, "rollback lock record failed", K(tmp_ret), K(xid)); + { // for inner sql statistics guard + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); + if (OB_SUCCESS != (tmp_ret = trans.end(false))) { + TRANS_LOG(WARN, "rollback lock record failed", K(tmp_ret), K(xid)); + } } if (is_first_xa_start) { if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc, @@ -1163,8 +1206,13 @@ int ObXAService::xa_start_(const ObXATransID &xid, if (OB_SUCC(ret)) { //commit record - if (OB_FAIL(trans.end(true))) { - TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid)); + { // for inner sql statistics guard + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); + if (OB_FAIL(trans.end(true))) { + TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid)); + } + } + if (OB_FAIL(ret)) { const bool need_decrease_ref = true; if (OB_SUCCESS != (tmp_ret = MTL(ObTransService*)->abort_tx(*tx_desc, ObTxAbortCause::IMPLICIT_ROLLBACK))) { @@ -1173,11 +1221,16 @@ int ObXAService::xa_start_(const ObXATransID &xid, xa_ctx->try_exit(need_decrease_ref); xa_ctx_mgr_.revert_xa_ctx(xa_ctx); tx_desc = NULL; + } else { + XA_STAT_ADD_XA_TRANS_START_COUNT(); } } else { //rollback record - if (OB_SUCCESS != (tmp_ret = trans.end(false))) { - TRANS_LOG(WARN, "rollback inner table trans failed", K(tmp_ret), K(xid)); + { // for inner sql statistics guard + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); + if (OB_SUCCESS != (tmp_ret = trans.end(false))) { + TRANS_LOG(WARN, "rollback inner table trans failed", K(tmp_ret), K(xid)); + } } if (OB_NOT_NULL(xa_ctx)) { xa_ctx_mgr_.erase_xa_ctx(trans_id); @@ -1196,8 +1249,14 @@ int ObXAService::xa_start_(const ObXATransID &xid, // tightly coupled mode, xa start noflags // this xa start is not the first for this xa trans // therefore the tx_id is from the inner table - if (OB_FAIL(trans.end(true))) { - TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid)); + { // for inner sql statistics guard + ObXAInnerSqlStatGuard stat_guard(ObTimeUtility::current_time()); + if (OB_FAIL(trans.end(true))) { + TRANS_LOG(WARN, "commit inner table trans failed", K(ret), K(xid)); + } + } + if (OB_FAIL(ret)) { + // do nothing } else { alloc = (GCTX.self_addr() == sche_addr) ? false : true; bool need_retry = false; @@ -1485,7 +1544,7 @@ int ObXAService::xa_commit(const ObXATransID &xid, { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - const int64_t request_id = ObTimeUtility::current_time(); + const int64_t start_ts = ObTimeUtility::current_time(); if (OB_UNLIKELY(!xid.is_valid()) || OB_UNLIKELY(!ObXAFlag::is_valid(flags, ObXAReqType::XA_COMMIT)) @@ -1497,21 +1556,22 @@ int ObXAService::xa_commit(const ObXATransID &xid, TRANS_LOG(WARN, "xa service not inited", K(ret), K(xid)); } else { const int64_t timeout_us = xa_timeout_seconds * 1000000; + const int64_t request_id = start_ts; if (ObXAFlag::is_tmnoflags(flags, ObXAReqType::XA_COMMIT)) { if (OB_FAIL(two_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table, tx_id))) { TRANS_LOG(WARN, "two phase xa commit failed", K(ret), K(xid)); - xa_statistics_.inc_failure_xa_2pc_commit(); + // xa_statistics_.inc_failure_xa_2pc_commit(); } else { - xa_statistics_.inc_success_xa_2pc_commit(); + // xa_statistics_.inc_success_xa_2pc_commit(); } } else if (ObXAFlag::is_tmonephase(flags)) { if (OB_FAIL(one_phase_xa_commit_(xid, timeout_us, request_id, has_tx_level_temp_table, tx_id))) { TRANS_LOG(WARN, "one phase xa commit failed", K(ret), K(xid)); - xa_statistics_.inc_failure_xa_1pc_commit(); + // xa_statistics_.inc_failure_xa_1pc_commit(); } else { - xa_statistics_.inc_success_xa_1pc_commit(); + // xa_statistics_.inc_success_xa_1pc_commit(); } } else { ret = OB_TRANS_XA_INVAL; @@ -1600,6 +1660,8 @@ int ObXAService::one_phase_xa_commit_(const ObXATransID &xid, } } } + // for statistics + XA_STAT_ADD_XA_ONE_PHASE_COMMIT_TOTAL_COUNT(); return ret; } @@ -1666,7 +1728,7 @@ int ObXAService::xa_rollback(const ObXATransID &xid, } } TRANS_LOG(INFO, "xa rollback", K(ret), K(xid), K(xa_timeout_seconds)); - xa_statistics_.inc_xa_rollback(); + // xa_statistics_.inc_xa_rollback(); return ret; } @@ -1944,6 +2006,10 @@ int ObXAService::xa_rollback_remote_(const ObXATransID &xid, result = OB_TRANS_CTX_NOT_EXIST; } #endif + // for statistics + if (OB_SUCC(ret)) { + XA_STAT_ADD_XA_ROLLBACK_REMOTE_COUNT(); + } return ret; } @@ -2190,9 +2256,9 @@ int ObXAService::xa_prepare(const ObXATransID &xid, } if (OB_FAIL(ret) && OB_TRANS_XA_RDONLY != ret) { - xa_statistics_.inc_failure_xa_prepare(); + // xa_statistics_.inc_failure_xa_prepare(); } else { - xa_statistics_.inc_success_xa_prepare(); + // xa_statistics_.inc_success_xa_prepare(); } return ret; } @@ -2344,6 +2410,10 @@ int ObXAService::remote_xa_prepare_(const ObXATransID &xid, ret = result; } } + // for statistics + if (OB_SUCC(ret)) { + XA_STAT_ADD_XA_PREPARE_REMOTE_COUNT(); + } return ret; } @@ -2549,7 +2619,9 @@ int ObXAService::query_xa_coord_from_tableone(const uint64_t tenant_id, int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -2642,7 +2714,9 @@ int ObXAService::query_sche_and_coord(const uint64_t tenant_id, int64_t original_timeout_us = THIS_WORKER.get_timeout_ts(); const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty()) { ret = OB_INVALID_ARGUMENT; @@ -2745,7 +2819,9 @@ int ObXAService::update_coord(const uint64_t tenant_id, mask = ObXAFlag::OBTEMPTABLE; } - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + XA_INNER_TABLE_TIMEOUT); + const int64_t start_ts = ObTimeUtility::current_time(); + THIS_WORKER.set_timeout_ts(start_ts + XA_INNER_TABLE_TIMEOUT); + ObXAInnerSqlStatGuard stat_guard(start_ts); if (!is_valid_tenant_id(tenant_id) || xid.empty() @@ -2946,6 +3022,12 @@ void ObXACache::clean_prepare_cache_item_(ObXACacheItem &item) { item.reset(); } +void ObXAService::try_print_statistics() +{ + xa_statistics_v2_.try_print_xa_statistics(); + dblink_statistics_.try_print_dblink_statistics(); +} + }//transaction diff --git a/src/storage/tx/ob_xa_service.h b/src/storage/tx/ob_xa_service.h index 819a1cf931..d23e5fcba2 100644 --- a/src/storage/tx/ob_xa_service.h +++ b/src/storage/tx/ob_xa_service.h @@ -20,6 +20,7 @@ #include "ob_xa_define.h" #include "ob_xa_inner_table_gc_worker.h" #include "ob_xa_trans_heartbeat_worker.h" +#include "ob_xa_trans_event.h" namespace oceanbase { @@ -91,7 +92,10 @@ private: class ObXAService { public: - ObXAService() : is_running_(false), is_inited_(false) {} + ObXAService() + : xa_ctx_mgr_(), xa_proxy_(), xa_rpc_(), timer_(), xa_trans_heartbeat_worker_(), + xa_inner_table_gc_worker_(), is_running_(false), is_inited_(false), + xa_statistics_v2_(), dblink_statistics_() {} virtual ~ObXAService() { destroy(); } int init(const ObAddr &self_addr, rpc::frame::ObReqTransport *req_transport); @@ -100,6 +104,8 @@ public: void stop(); void wait(); void destroy(); +public: + void try_print_statistics(); public: int xa_start(const ObXATransID &xid, const int64_t flags, @@ -230,7 +236,9 @@ public: const ObTransID &trans_id, const share::ObLSID &coordinator, const ObAddr &sche_addr); - ObXAStatistics &get_xa_statistics() { return xa_statistics_; } + // ObXAStatistics &get_xa_statistics() { return xa_statistics_; } + ObXATransStatistics &get_statistics() { return xa_statistics_v2_; } + ObDBLinkTransStatistics &get_dblink_statistics() { return dblink_statistics_; } private: int local_one_phase_xa_commit_ (const ObXATransID &xid, const ObTransID &trans_id, @@ -336,10 +344,176 @@ private: ObXAInnerTableGCWorker xa_inner_table_gc_worker_; bool is_running_; bool is_inited_; - ObXAStatistics xa_statistics_; ObXACache xa_cache_; + // ObXAStatistics xa_statistics_; + ObXATransStatistics xa_statistics_v2_; + ObDBLinkTransStatistics dblink_statistics_; }; + +class ObXAInnerSqlStatGuard +{ +public: + explicit ObXAInnerSqlStatGuard(const int64_t start_ts) + : start_ts_(start_ts) + { + if (0 > start_ts_) { + start_ts_ = 0; + } + } + ~ObXAInnerSqlStatGuard() + { + ObXATransStatistics &statistics = MTL(ObXAService*)->get_statistics(); + if (0 < start_ts_) { + const int64_t used_time_us = ObTimeUtility::current_time() - start_ts_; + if (0 < used_time_us) { + statistics.inc_xa_inner_sql_total_count(); + statistics.add_xa_inner_sql_total_used_time(used_time_us); + if (10000 < used_time_us) { + statistics.inc_xa_inner_sql_ten_ms_total_count(); + } + if (20000 < used_time_us) { + statistics.inc_xa_inner_sql_twenty_ms_total_count(); + } + } + } + } +private: + int64_t start_ts_; +}; + +class ObXAStmtGuard +{ +public: + explicit ObXAStmtGuard(const int64_t start_ts) + : start_ts_(start_ts) + { + ObXATransStatistics &statistics = MTL(ObXAService*)->get_statistics(); + statistics.inc_active_xa_stmt_count(); + statistics.try_print_active(start_ts); + } + ~ObXAStmtGuard() + { + ObXATransStatistics &statistics = MTL(ObXAService*)->get_statistics(); + statistics.dec_active_xa_stmt_count(); + } +private: + int64_t start_ts_; +}; + +/////// statistics of xa stmt +#define XA_STAT_ADD_XA_START_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_start_total_count();} \ + +#define XA_STAT_ADD_XA_START_REMOTE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_start_remote_count();} \ + +#define XA_STAT_ADD_XA_START_FAIL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_start_fail_count();} \ + +#define XA_STAT_ADD_XA_END_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_end_total_count();} \ + +#define XA_STAT_ADD_XA_END_REMOTE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_end_remote_count();} \ + +#define XA_STAT_ADD_XA_END_FAIL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_end_fail_count();} \ + +#define XA_STAT_ADD_XA_PREPARE_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_prepare_total_count();} \ + +#define XA_STAT_ADD_XA_PREPARE_REMOTE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_prepare_remote_count();} \ + +#define XA_STAT_ADD_XA_PREPARE_FAIL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_prepare_fail_count();} \ + +#define XA_STAT_ADD_XA_COMMIT_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_commit_total_count();} \ + +#define XA_STAT_ADD_XA_COMMIT_REMOTE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_commit_remote_count();} \ + +#define XA_STAT_ADD_XA_COMMIT_FAIL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_commit_fail_count();} \ + +#define XA_STAT_ADD_XA_ROLLBACK_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_rollback_total_count();} \ + +#define XA_STAT_ADD_XA_ROLLBACK_REMOTE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_rollback_remote_count();} \ + +#define XA_STAT_ADD_XA_ROLLBACK_FAIL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_rollback_fail_count();} \ + +#define XA_STAT_ADD_XA_START_TOTAL_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_statistics().add_xa_start_total_used_time(used_time);} \ + +#define XA_STAT_ADD_XA_END_TOTAL_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_statistics().add_xa_end_total_used_time(used_time);} \ + +#define XA_STAT_ADD_XA_PREPARE_TOTAL_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_statistics().add_xa_prepare_total_used_time(used_time);} \ + +#define XA_STAT_ADD_XA_COMMIT_TOTAL_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_statistics().add_xa_commit_total_used_time(used_time);} \ + +#define XA_STAT_ADD_XA_ROLLBACK_TOTAL_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_statistics().add_xa_rollback_total_used_time(used_time);} \ + +/////// statistics of xa trans +#define XA_STAT_ADD_XA_TRANS_START_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_trans_start_count();} \ + +#define XA_STAT_ADD_XA_READ_ONLY_TRANS_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_read_only_trans_total_count();} \ + +#define XA_STAT_ADD_XA_ONE_PHASE_COMMIT_TOTAL_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_xa_one_phase_commit_total_count();} \ + +/////// statistics of active xa info +#define XA_ACTIVE_INCREMENT_XA_CTX_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_active_xa_ctx_count();} \ + +#define XA_ACTIVE_DECREMENT_XA_CTX_COUNT() \ + { MTL(ObXAService*)->get_statistics().dec_active_xa_ctx_count();} \ + +/////// statistics of xa inner logic +#define XA_INNER_INCREMENT_COMPENSATE_COUNT() \ + { MTL(ObXAService*)->get_statistics().inc_compensate_record_count();} \ + +/////// statistics of dblink trans +#define DBLINK_STAT_ADD_TRANS_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_count();} \ + +#define DBLINK_STAT_ADD_TRANS_FAIL_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_fail_count();} \ + +#define DBLINK_STAT_ADD_TRANS_PROMOTION_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_promotion_count();} \ + +#define DBLINK_STAT_ADD_TRANS_CALLBACK_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_callback_count();} \ + +#define DBLINK_STAT_ADD_TRANS_COMMIT_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_commit_count();} \ + +#define DBLINK_STAT_ADD_TRANS_COMMIT_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_dblink_statistics().add_dblink_trans_commit_used_time(used_time);} \ + +#define DBLINK_STAT_ADD_TRANS_COMMIT_FAIL_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_commit_fail_count();} \ + +#define DBLINK_STAT_ADD_TRANS_ROLLBACK_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_rollback_count();} \ + +#define DBLINK_STAT_ADD_TRANS_ROLLBACK_USED_TIME(used_time) \ + { MTL(ObXAService*)->get_dblink_statistics().add_dblink_trans_rollback_used_time(used_time);} \ + +#define DBLINK_STAT_ADD_TRANS_ROLLBACK_FAIL_COUNT() \ + { MTL(ObXAService*)->get_dblink_statistics().inc_dblink_trans_rollback_fail_count();} \ + }//transaction diff --git a/src/storage/tx/ob_xa_trans_event.cpp b/src/storage/tx/ob_xa_trans_event.cpp new file mode 100644 index 0000000000..9163042453 --- /dev/null +++ b/src/storage/tx/ob_xa_trans_event.cpp @@ -0,0 +1,704 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX TRANS +#include "ob_xa_trans_event.h" +#include "lib/statistic_event/ob_stat_event.h" +#include "lib/stat/ob_diagnose_info.h" +#include "lib/stat/ob_session_stat.h" +#include "lib/lock/ob_spin_lock.h" +#include "share/ob_force_print_log.h" + +namespace oceanbase +{ +using namespace common; + +namespace transaction +{ + +int ObXATransStatistics::init(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + TRANS_LOG(WARN, "xa trans statistics init twice", K(ret)); + } else if (!is_valid_tenant_id(tenant_id)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id)); + } else { + tenant_id_ = tenant_id; + is_inited_ = true; + } + return ret; +} + +void ObXATransStatistics::reset() +{ + tenant_id_ = OB_INVALID_TENANT_ID; + last_print_stat_ts_ = 0; + last_print_active_ts_ = 0; + active_xa_stmt_count_ = 0; + active_xa_ctx_count_ = 0; + is_inited_ = false; +} + +void ObXATransStatistics::inc_xa_start_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_START_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_start_count_); +} + +void ObXATransStatistics::add_xa_start_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_START_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_start_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_start_remote_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_START_REMOTE_COUNT, 1); + ATOMIC_INC(&xa_start_remote_count_); +} + +void ObXATransStatistics::inc_xa_start_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_START_FAIL_COUNT, 1); + ATOMIC_INC(&xa_start_fail_count_); +} + +void ObXATransStatistics::inc_xa_end_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_END_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_end_count_); +} + +void ObXATransStatistics::add_xa_end_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_END_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_end_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_end_remote_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_END_REMOTE_COUNT, 1); + ATOMIC_INC(&xa_end_remote_count_); +} + +void ObXATransStatistics::inc_xa_end_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_END_FAIL_COUNT, 1); + ATOMIC_INC(&xa_end_fail_count_); +} + +void ObXATransStatistics::inc_xa_prepare_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_PREPARE_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_prepare_count_); +} + +void ObXATransStatistics::add_xa_prepare_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_PREPARE_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_prepare_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_prepare_remote_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_PREPARE_REMOTE_COUNT, 1); + ATOMIC_INC(&xa_prepare_remote_count_); +} + +void ObXATransStatistics::inc_xa_prepare_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_PREPARE_FAIL_COUNT, 1); + ATOMIC_INC(&xa_prepare_fail_count_); +} + +void ObXATransStatistics::inc_xa_commit_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_COMMIT_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_commit_count_); +} + +void ObXATransStatistics::add_xa_commit_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_COMMIT_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_commit_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_commit_remote_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_COMMIT_REMOTE_COUNT, 1); + ATOMIC_INC(&xa_commit_remote_count_); +} + +void ObXATransStatistics::inc_xa_commit_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_COMMIT_FAIL_COUNT, 1); + ATOMIC_INC(&xa_commit_fail_count_); +} + +void ObXATransStatistics::inc_xa_rollback_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_ROLLBACK_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_rollback_count_); +} + +void ObXATransStatistics::add_xa_rollback_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_ROLLBACK_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_rollback_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_rollback_remote_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_ROLLBACK_REMOTE_COUNT, 1); + ATOMIC_INC(&xa_rollback_remote_count_); +} + +void ObXATransStatistics::inc_xa_rollback_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_ROLLBACK_FAIL_COUNT, 1); + ATOMIC_INC(&xa_rollback_fail_count_); +} + +void ObXATransStatistics::inc_xa_trans_start_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_TRANS_START_COUNT, 1); + ATOMIC_INC(&xa_trans_start_count_); +} + +void ObXATransStatistics::inc_xa_read_only_trans_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_READ_ONLY_TRANS_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_read_only_trans_count_); +} + +void ObXATransStatistics::inc_xa_one_phase_commit_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_ONE_PHASE_COMMIT_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_one_phase_commit_count_); +} + +void ObXATransStatistics::inc_xa_inner_sql_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_SQL_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_inner_sql_count_); +} + +void ObXATransStatistics::add_xa_inner_sql_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_SQL_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_inner_sql_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_inner_rpc_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_RPC_TOTAL_COUNT, 1); + ATOMIC_INC(&xa_inner_rpc_count_); +} + +void ObXATransStatistics::add_xa_inner_rpc_total_used_time(const int64_t value) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_RPC_TOTAL_USED_TIME, value); + ATOMIC_FAA(&xa_inner_rpc_used_time_us_, value); +} + +void ObXATransStatistics::inc_xa_inner_sql_ten_ms_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_SQL_TEN_MS_COUNT, 1); + ATOMIC_INC(&xa_inner_sql_ten_ms_count_); +} + +void ObXATransStatistics::inc_xa_inner_sql_twenty_ms_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_SQL_TWENTY_MS_COUNT, 1); + ATOMIC_INC(&xa_inner_sql_twenty_ms_count_); +} + +void ObXATransStatistics::inc_xa_inner_rpc_ten_ms_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_RPC_TEN_MS_COUNT, 1); + ATOMIC_INC(&xa_inner_rpc_ten_ms_count_); +} + +void ObXATransStatistics::inc_xa_inner_rpc_twenty_ms_total_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(XA_INNER_RPC_TWENTY_MS_COUNT, 1); + ATOMIC_INC(&xa_inner_rpc_twenty_ms_count_); +} + +void ObXATransStatistics::inc_active_xa_stmt_count() +{ + ATOMIC_INC(&active_xa_stmt_count_); +} + +void ObXATransStatistics::dec_active_xa_stmt_count() +{ + ATOMIC_DEC(&active_xa_stmt_count_); +} + +void ObXATransStatistics::inc_active_xa_ctx_count() +{ + ATOMIC_INC(&active_xa_ctx_count_); +} + +void ObXATransStatistics::dec_active_xa_ctx_count() +{ + ATOMIC_DEC(&active_xa_ctx_count_); +} + +void ObXATransStatistics::inc_compensate_record_count() +{ + ATOMIC_INC(&xa_compensate_record_count_); +} + +void ObXATransStatistics::try_print_xa_statistics() +{ + static const int64_t STAT_INTERVAL = 9000000; // 9 seconds + const int64_t current_ts = ObTimeUtility::current_time(); + const int64_t last_print_stat_ts = ATOMIC_LOAD(&last_print_stat_ts_); + if (current_ts - last_print_stat_ts >= STAT_INTERVAL) { + if (ATOMIC_BCAS(&last_print_stat_ts_, last_print_stat_ts, current_ts)) { + int64_t xa_start_count = ATOMIC_LOAD(&xa_start_count_); + int64_t xa_end_count = ATOMIC_LOAD(&xa_end_count_); + int64_t xa_prepare_count = ATOMIC_LOAD(&xa_prepare_count_); + int64_t xa_commit_count = ATOMIC_LOAD(&xa_commit_count_); + int64_t xa_rollback_count = ATOMIC_LOAD(&xa_rollback_count_); + int64_t xa_inner_sql_count = ATOMIC_LOAD(&xa_inner_sql_count_); + if (0 != xa_inner_sql_count || 0 != xa_start_count || 0 != xa_end_count + || 0 != xa_prepare_count || 0 != xa_commit_count || 0 != xa_rollback_count) { + // for xa stmt + FLOG_INFO("xa stmt statistics", + "xa_start_count", xa_start_count, + "xa_start_latency", (0 == xa_start_count) ? 0 : ATOMIC_LOAD(&xa_start_used_time_us_) / xa_start_count, + "xa_start_remote_count", ATOMIC_LOAD(&xa_start_remote_count_), + "xa_start_fail_count", ATOMIC_LOAD(&xa_start_fail_count_), + "xa_end_count", xa_end_count, + "xa_end_latency", (0 == xa_end_count) ? 0 : ATOMIC_LOAD(&xa_end_used_time_us_) / xa_end_count, + "xa_end_remote_count", ATOMIC_LOAD(&xa_end_remote_count_), + "xa_end_fail_count", ATOMIC_LOAD(&xa_end_fail_count_), + "xa_prepare_count", xa_prepare_count, + "xa_prepare_latency", (0 == xa_prepare_count) ? 0 : ATOMIC_LOAD(&xa_prepare_used_time_us_) / xa_prepare_count, + "xa_prepare_remote_count", ATOMIC_LOAD(&xa_prepare_remote_count_), + "xa_prepare_fail_count", ATOMIC_LOAD(&xa_prepare_fail_count_), + "xa_commit_count", xa_commit_count, + "xa_commit_latency", (0 == xa_commit_count) ? 0 : ATOMIC_LOAD(&xa_commit_used_time_us_) / xa_commit_count, + "xa_commit_remote_count", ATOMIC_LOAD(&xa_commit_remote_count_), + "xa_commit_fail_count", ATOMIC_LOAD(&xa_commit_fail_count_), + "xa_rollback_count", xa_rollback_count, + "xa_rollback_latency", (0 == xa_rollback_count) ? 0 : ATOMIC_LOAD(&xa_rollback_used_time_us_) / xa_rollback_count, + "xa_rollback_remote_count", ATOMIC_LOAD(&xa_rollback_remote_count_), + "xa_rollback_fail_count", ATOMIC_LOAD(&xa_rollback_fail_count_), + K(last_print_stat_ts)); + // for xa trans + xa_inner_sql_count = ATOMIC_LOAD(&xa_inner_sql_count_); + int64_t xa_trans_start_count = ATOMIC_LOAD(&xa_trans_start_count_); + int64_t xa_inner_rpc_count = ATOMIC_LOAD(&xa_inner_rpc_count_); + FLOG_INFO("xa trans statistics", + "xa_trans_start_count", xa_trans_start_count, + "xa_read_only_trans_count", ATOMIC_LOAD(&xa_read_only_trans_count_), + "xa_one_phase_commit_count", ATOMIC_LOAD(&xa_one_phase_commit_count_), + "xa_inner_sql_count", xa_inner_sql_count, + "xa_inner_sql_latency", (0 == xa_inner_sql_count) ? 0 : ATOMIC_LOAD(&xa_inner_sql_used_time_us_) / xa_inner_sql_count, + "xa_inner_sql_ten_ms_count", ATOMIC_LOAD(&xa_inner_sql_ten_ms_count_), + "xa_inner_sql_twenty_ms_count", ATOMIC_LOAD(&xa_inner_sql_twenty_ms_count_), + "xa_inner_rpc_count", xa_inner_rpc_count, + "xa_inner_rpc_latency", (0 == xa_inner_rpc_count) ? 0 : ATOMIC_LOAD(&xa_inner_rpc_used_time_us_) / xa_inner_rpc_count, + "xa_inner_rpc_ten_ms_count", ATOMIC_LOAD(&xa_inner_rpc_ten_ms_count_), + "xa_inner_rpc_twenty_ms_count", ATOMIC_LOAD(&xa_inner_rpc_twenty_ms_count_), + "duration", (current_ts - last_print_stat_ts)); + } + // for inner logic + int64_t xa_compensate_record_count = ATOMIC_LOAD(&xa_compensate_record_count_); + if (0 != xa_compensate_record_count) { + FLOG_INFO("xa statistics of inner logic", + "xa_compensate_record_count", xa_compensate_record_count); + } + // reset + // NOTE that the active info should not be reset + ATOMIC_STORE(&xa_start_count_, 0); + ATOMIC_STORE(&xa_start_used_time_us_, 0); + ATOMIC_STORE(&xa_start_remote_count_, 0); + ATOMIC_STORE(&xa_start_fail_count_, 0); + ATOMIC_STORE(&xa_end_count_, 0); + ATOMIC_STORE(&xa_end_used_time_us_, 0); + ATOMIC_STORE(&xa_end_remote_count_, 0); + ATOMIC_STORE(&xa_end_fail_count_, 0); + ATOMIC_STORE(&xa_prepare_count_, 0); + ATOMIC_STORE(&xa_prepare_used_time_us_, 0); + ATOMIC_STORE(&xa_prepare_remote_count_, 0); + ATOMIC_STORE(&xa_prepare_fail_count_, 0); + ATOMIC_STORE(&xa_commit_count_, 0); + ATOMIC_STORE(&xa_commit_used_time_us_, 0); + ATOMIC_STORE(&xa_commit_remote_count_, 0); + ATOMIC_STORE(&xa_commit_fail_count_, 0); + ATOMIC_STORE(&xa_rollback_count_, 0); + ATOMIC_STORE(&xa_rollback_used_time_us_, 0); + ATOMIC_STORE(&xa_rollback_remote_count_, 0); + ATOMIC_STORE(&xa_rollback_fail_count_, 0); + ATOMIC_STORE(&xa_trans_start_count_, 0); + ATOMIC_STORE(&xa_read_only_trans_count_, 0); + ATOMIC_STORE(&xa_one_phase_commit_count_, 0); + ATOMIC_STORE(&xa_inner_sql_count_, 0); + ATOMIC_STORE(&xa_inner_sql_used_time_us_, 0); + ATOMIC_STORE(&xa_inner_rpc_count_, 0); + ATOMIC_STORE(&xa_inner_rpc_used_time_us_, 0); + ATOMIC_STORE(&xa_inner_sql_ten_ms_count_, 0); + ATOMIC_STORE(&xa_inner_sql_twenty_ms_count_, 0); + ATOMIC_STORE(&xa_inner_rpc_ten_ms_count_, 0); + ATOMIC_STORE(&xa_inner_rpc_twenty_ms_count_, 0); + ATOMIC_STORE(&xa_compensate_record_count_, 0); + } + // for active info + int64_t active_xa_stmt_count = ATOMIC_LOAD(&active_xa_stmt_count_); + int64_t active_xa_ctx_count = ATOMIC_LOAD(&active_xa_ctx_count_); + if (0 != active_xa_stmt_count || 0 != active_xa_ctx_count) { + FLOG_INFO("active xa statistics", + "active_xa_stmt_count", active_xa_stmt_count, + "active_xa_ctx_count", active_xa_ctx_count); + } + } +} + +void ObXATransStatistics::try_print_active(const int64_t current_ts) +{ + static const int64_t STAT_INTERVAL = 1000000; // 1 seconds + static const int64_t ACTIVE_STMT_THRESHOLD = 40; + const int64_t last_print_ts = ATOMIC_LOAD(&last_print_active_ts_); + const int64_t active_xa_stmt_count = ATOMIC_LOAD(&active_xa_stmt_count_); + if ((current_ts - last_print_ts >= STAT_INTERVAL) + && active_xa_stmt_count > ACTIVE_STMT_THRESHOLD) { + if (ATOMIC_BCAS(&last_print_active_ts_, last_print_ts, current_ts)) { + FLOG_INFO("active xa statistics", + "active_xa_stmt_count", active_xa_stmt_count, + "active_xa_ctx_count", ATOMIC_LOAD(&active_xa_ctx_count_), + K(last_print_ts)); + } + } +} + +int ObDBLinkTransStatistics::init(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + TRANS_LOG(WARN, "xa trans statistics init twice", K(ret)); + } else if (!is_valid_tenant_id(tenant_id)) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), K(tenant_id)); + } else { + tenant_id_ = tenant_id; + is_inited_ = true; + } + return ret; +} + +void ObDBLinkTransStatistics::reset() +{ + tenant_id_ = OB_INVALID_TENANT_ID; + last_print_stat_ts_ = 0; + is_inited_ = false; +} + +void ObDBLinkTransStatistics::inc_dblink_trans_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_COUNT, 1); + ATOMIC_INC(&dblink_trans_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_callback_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_CALLBACK_COUNT, 1); + ATOMIC_INC(&dblink_trans_callback_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_promotion_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_PROMOTION_COUNT, 1); + ATOMIC_INC(&dblink_trans_promotion_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_commit_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_COMMIT_COUNT, 1); + ATOMIC_INC(&dblink_trans_commit_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_commit_used_time(const int64_t used_time) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_COMMIT_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_commit_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_commit_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_COMMIT_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_commit_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_rollback_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_ROLLBACK_COUNT, 1); + ATOMIC_INC(&dblink_trans_rollback_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_rollback_used_time(const int64_t used_time) +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_ROLLBACK_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_rollback_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_rollback_fail_count() +{ + common::ObTenantStatEstGuard guard(tenant_id_); + EVENT_ADD(DBLINK_TRANS_ROLLBACK_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_rollback_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_start_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_START_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_start_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_xa_start_used_time(const int64_t used_time) +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_START_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_xa_start_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_start_fail_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_START_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_start_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_end_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_END_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_end_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_xa_end_used_time(const int64_t used_time) +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_END_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_xa_end_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_end_fail_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_END_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_end_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_prepare_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_PREPARE_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_prepare_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_xa_prepare_used_time(const int64_t used_time) +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_PREPARE_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_xa_prepare_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_prepare_fail_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_PREPARE_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_prepare_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_commit_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_COMMIT_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_commit_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_xa_commit_used_time(const int64_t used_time) +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_COMMIT_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_xa_commit_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_commit_fail_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_COMMIT_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_commit_fail_count_); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_rollback_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_ROLLBACK_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_rollback_count_); +} + +void ObDBLinkTransStatistics::add_dblink_trans_xa_rollback_used_time(const int64_t used_time) +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_ROLLBACK_USED_TIME, used_time); + ATOMIC_FAA(&dblink_trans_xa_rollback_used_time_, used_time); +} + +void ObDBLinkTransStatistics::inc_dblink_trans_xa_rollback_fail_count() +{ + // common::ObTenantStatEstGuard guard(tenant_id_); + // EVENT_ADD(DBLINK_TRANS_XA_ROLLBACK_FAIL_COUNT, 1); + ATOMIC_INC(&dblink_trans_xa_rollback_fail_count_); +} + +void ObDBLinkTransStatistics::try_print_dblink_statistics() +{ + static const int64_t STAT_INTERVAL = 9000000; // 9 seconds + const int64_t current_ts = ObTimeUtility::current_time(); + const int64_t last_print_stat_ts = ATOMIC_LOAD(&last_print_stat_ts_); + if (current_ts - last_print_stat_ts >= STAT_INTERVAL) { + if (ATOMIC_BCAS(&last_print_stat_ts_, last_print_stat_ts, current_ts)) { + int64_t dblink_trans_count = ATOMIC_LOAD(&dblink_trans_count_); + int64_t dblink_trans_commit_count = ATOMIC_LOAD(&dblink_trans_commit_count_); + int64_t dblink_trans_rollback_count = ATOMIC_LOAD(&dblink_trans_rollback_count_); + if (0 != dblink_trans_count + || 0 != dblink_trans_commit_count + || 0 != dblink_trans_rollback_count) { + FLOG_INFO("dblink trans statistics", + "dblink_trans_count", dblink_trans_count, + "dblink_trans_fail_count", ATOMIC_LOAD(&dblink_trans_fail_count_), + "dblink_trans_promotion_count", ATOMIC_LOAD(&dblink_trans_promotion_count_), + "dblink_trans_callback_count", ATOMIC_LOAD(&dblink_trans_callback_count_), + "dblink_trans_commit_count", dblink_trans_commit_count, + "dblink_trans_commit_latency", (0 == dblink_trans_commit_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_commit_used_time_) / dblink_trans_commit_count, + "dblink_trans_commit_fail_count", ATOMIC_LOAD(&dblink_trans_commit_fail_count_), + "dblink_trans_rollback_count", dblink_trans_rollback_count, + "dblink_trans_rollback_latency", (0 == dblink_trans_rollback_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_rollback_used_time_) / dblink_trans_rollback_count, + "dblink_trans_rollback_fail_count", ATOMIC_LOAD(&dblink_trans_rollback_fail_count_), + "duration", (current_ts - last_print_stat_ts)); + int64_t dblink_trans_xa_start_count = ATOMIC_LOAD(&dblink_trans_xa_start_count_); + int64_t dblink_trans_xa_end_count = ATOMIC_LOAD(&dblink_trans_xa_end_count_); + int64_t dblink_trans_xa_prepare_count = ATOMIC_LOAD(&dblink_trans_xa_prepare_count_); + int64_t dblink_trans_xa_commit_count = ATOMIC_LOAD(&dblink_trans_xa_commit_count_); + int64_t dblink_trans_xa_rollback_count = ATOMIC_LOAD(&dblink_trans_xa_rollback_count_); + FLOG_INFO("dblink trans xa statistics", + "dblink_trans_xa_start_count", dblink_trans_xa_start_count, + "dblink_trans_xa_start_latency", (0 == dblink_trans_xa_start_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_xa_start_used_time_) / dblink_trans_xa_start_count, + "dblink_trans_xa_start_fail_count", ATOMIC_LOAD(&dblink_trans_xa_start_fail_count_), + "dblink_trans_xa_end_count", dblink_trans_xa_end_count, + "dblink_trans_xa_end_latency", (0 == dblink_trans_xa_end_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_xa_end_used_time_) / dblink_trans_xa_end_count, + "dblink_trans_xa_end_fail_count", ATOMIC_LOAD(&dblink_trans_xa_end_fail_count_), + "dblink_trans_xa_prepare_count", dblink_trans_xa_prepare_count, + "dblink_trans_xa_prepare_latency", (0 == dblink_trans_xa_prepare_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_xa_prepare_used_time_) / dblink_trans_xa_prepare_count, + "dblink_trans_xa_prepare_fail_count", ATOMIC_LOAD(&dblink_trans_xa_prepare_fail_count_), + "dblink_trans_xa_commit_count", dblink_trans_xa_commit_count, + "dblink_trans_xa_commit_latency", (0 == dblink_trans_xa_commit_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_xa_commit_used_time_) / dblink_trans_xa_commit_count, + "dblink_trans_xa_commit_fail_count", ATOMIC_LOAD(&dblink_trans_xa_commit_fail_count_), + "dblink_trans_xa_rollback_count", dblink_trans_xa_rollback_count, + "dblink_trans_xa_rollback_latency", (0 == dblink_trans_xa_rollback_count) + ? 0 : ATOMIC_LOAD(&dblink_trans_xa_rollback_used_time_) / dblink_trans_xa_rollback_count, + "dblink_trans_xa_rollback_fail_count", ATOMIC_LOAD(&dblink_trans_xa_rollback_fail_count_), + K(last_print_stat_ts)); + } + ATOMIC_STORE(&dblink_trans_count_, 0); + ATOMIC_STORE(&dblink_trans_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_promotion_count_, 0); + ATOMIC_STORE(&dblink_trans_commit_count_, 0); + ATOMIC_STORE(&dblink_trans_commit_used_time_, 0); + ATOMIC_STORE(&dblink_trans_commit_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_rollback_count_, 0); + ATOMIC_STORE(&dblink_trans_rollback_used_time_, 0); + ATOMIC_STORE(&dblink_trans_rollback_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_start_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_start_used_time_, 0); + ATOMIC_STORE(&dblink_trans_xa_start_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_end_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_end_used_time_, 0); + ATOMIC_STORE(&dblink_trans_xa_end_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_prepare_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_prepare_used_time_, 0); + ATOMIC_STORE(&dblink_trans_xa_prepare_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_commit_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_commit_used_time_, 0); + ATOMIC_STORE(&dblink_trans_xa_commit_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_rollback_count_, 0); + ATOMIC_STORE(&dblink_trans_xa_rollback_used_time_, 0); + ATOMIC_STORE(&dblink_trans_xa_rollback_fail_count_, 0); + ATOMIC_STORE(&dblink_trans_callback_count_, 0); + } + } +} + +} // transaction +} // oceanbase diff --git a/src/storage/tx/ob_xa_trans_event.h b/src/storage/tx/ob_xa_trans_event.h new file mode 100644 index 0000000000..0325355fa3 --- /dev/null +++ b/src/storage/tx/ob_xa_trans_event.h @@ -0,0 +1,281 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_TRANSACTION_OB_XA_TRANS_EVENT_ +#define OCEANBASE_TRANSACTION_OB_XA_TRANS_EVENT_ + +#include "ob_trans_define.h" + +namespace oceanbase +{ +namespace transaction +{ +class ObXATransStatistics +{ +public: + ObXATransStatistics() + : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), last_print_stat_ts_(0), last_print_active_ts_(0), + xa_start_count_(0), xa_start_remote_count_(0), xa_start_fail_count_(0), xa_start_used_time_us_(0), + xa_end_count_(0), xa_end_remote_count_(0), xa_end_fail_count_(0), xa_end_used_time_us_(0), + xa_prepare_count_(0), xa_prepare_remote_count_(0), xa_prepare_fail_count_(0), xa_prepare_used_time_us_(0), + xa_rollback_count_(0), xa_rollback_remote_count_(0), xa_rollback_fail_count_(0), xa_rollback_used_time_us_(0), + xa_commit_count_(0), xa_commit_remote_count_(0), xa_commit_fail_count_(0), xa_commit_used_time_us_(0), + xa_trans_start_count_(0), xa_read_only_trans_count_(0), xa_one_phase_commit_count_(0), + xa_inner_sql_count_(0), xa_inner_sql_used_time_us_(0), xa_inner_rpc_count_(0), xa_inner_rpc_used_time_us_(0), + xa_inner_sql_ten_ms_count_(0), xa_inner_sql_twenty_ms_count_(0), + xa_inner_rpc_ten_ms_count_(0), xa_inner_rpc_twenty_ms_count_(0), + active_xa_stmt_count_(0), active_xa_ctx_count_(0), xa_compensate_record_count_(0) {} + ~ObXATransStatistics() { destroy(); } + int init(const uint64_t tenant_id); + void reset(); + void destroy() { reset(); } + + // total count of xa start + void inc_xa_start_total_count(); + // total used time of xa start + void add_xa_start_total_used_time(const int64_t value); + // total count of remote xa start executed successfully + void inc_xa_start_remote_count(); + // total count of failed xa start + void inc_xa_start_fail_count(); + // total count of xa end + void inc_xa_end_total_count(); + // total used time of xa end + void add_xa_end_total_used_time(const int64_t value); + // total count of remote xa end executed successfully + void inc_xa_end_remote_count(); + // total count of failed xa end + void inc_xa_end_fail_count(); + // total count of xa prepare + void inc_xa_prepare_total_count(); + // total used time of xa prepare + void add_xa_prepare_total_used_time(const int64_t value); + // total count of remote xa prepare executed successfully + void inc_xa_prepare_remote_count(); + // total count of failed xa prepare + void inc_xa_prepare_fail_count(); + // total count of xa rollback + void inc_xa_rollback_total_count(); + // total used time of xa rollback + void add_xa_rollback_total_used_time(const int64_t value); + // total count of remote xa rollback + void inc_xa_rollback_remote_count(); + // total count of failed xa rollback + void inc_xa_rollback_fail_count(); + // total count of xa commit + void inc_xa_commit_total_count(); + // total used time of xa commit + void add_xa_commit_total_used_time(const int64_t value); + // total count of remote xa commit + void inc_xa_commit_remote_count(); + // total count of failed xa commit + void inc_xa_commit_fail_count(); + // total count xa trans has been started + void inc_xa_trans_start_count(); + // total count of read only xa trans + void inc_xa_read_only_trans_total_count(); + // total count of xa trans with one phase commit + void inc_xa_one_phase_commit_total_count(); + // total count of inner sql in xa stmt + void inc_xa_inner_sql_total_count(); + // total used time of inner sql in xa stmt + void add_xa_inner_sql_total_used_time(const int64_t value); + // total count of inner rpc in xa stmt + void inc_xa_inner_rpc_total_count(); + // total used time of inner rpc in xa stmt + void add_xa_inner_rpc_total_used_time(const int64_t value); + // increment active xa stmt count + void inc_active_xa_stmt_count(); + // decrement active xa stmt count + void dec_active_xa_stmt_count(); + // increase total count of inner sql whose latency is greater than 10ms + void inc_xa_inner_sql_ten_ms_total_count(); + // increase total count of inner sql whose latency is greater than 20ms + void inc_xa_inner_sql_twenty_ms_total_count(); + // increase total count of inner rpc whose latency is greater than 10ms + void inc_xa_inner_rpc_ten_ms_total_count(); + // increase total count of inner rpc whose latency is greater than 20ms + void inc_xa_inner_rpc_twenty_ms_total_count(); + // increment active xa ctx count + void inc_active_xa_ctx_count(); + // decrement active xa ctx count + void dec_active_xa_ctx_count(); + // increment compensate record count + void inc_compensate_record_count(); + +public: + void try_print_xa_statistics(); + void try_print_active(const int64_t start_ts); + +private: + DISALLOW_COPY_AND_ASSIGN(ObXATransStatistics); +private: + bool is_inited_; + uint64_t tenant_id_; + int64_t last_print_stat_ts_; + int64_t last_print_active_ts_; + // for xa statistics + int64_t xa_start_count_; + int64_t xa_start_remote_count_; + int64_t xa_start_fail_count_; + int64_t xa_start_used_time_us_; + int64_t xa_end_count_; + int64_t xa_end_remote_count_; + int64_t xa_end_fail_count_; + int64_t xa_end_used_time_us_; + int64_t xa_prepare_count_; + int64_t xa_prepare_remote_count_; + int64_t xa_prepare_fail_count_; + int64_t xa_prepare_used_time_us_; + int64_t xa_rollback_count_; + int64_t xa_rollback_remote_count_; + int64_t xa_rollback_fail_count_; + int64_t xa_rollback_used_time_us_; + int64_t xa_commit_count_; + int64_t xa_commit_remote_count_; + int64_t xa_commit_fail_count_; + int64_t xa_commit_used_time_us_; + int64_t xa_trans_start_count_; + int64_t xa_read_only_trans_count_; + int64_t xa_one_phase_commit_count_; + int64_t xa_inner_sql_count_; + int64_t xa_inner_sql_used_time_us_; + int64_t xa_inner_rpc_count_; + int64_t xa_inner_rpc_used_time_us_; + int64_t xa_inner_sql_ten_ms_count_; + int64_t xa_inner_sql_twenty_ms_count_; + int64_t xa_inner_rpc_ten_ms_count_; + int64_t xa_inner_rpc_twenty_ms_count_; + // active info + int64_t active_xa_stmt_count_; + int64_t active_xa_ctx_count_; + // inner logic + int64_t xa_compensate_record_count_; +}; + +class ObDBLinkTransStatistics +{ +public: + ObDBLinkTransStatistics() + : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), last_print_stat_ts_(0), + dblink_trans_count_(0), dblink_trans_fail_count_(0), + dblink_trans_promotion_count_(0), dblink_trans_callback_count_(0), + dblink_trans_commit_count_(0), dblink_trans_commit_used_time_(0), + dblink_trans_commit_fail_count_(0), + dblink_trans_rollback_count_(0), dblink_trans_rollback_used_time_(0), + dblink_trans_rollback_fail_count_(0), + dblink_trans_xa_start_count_(0), dblink_trans_xa_start_used_time_(0), + dblink_trans_xa_start_fail_count_(0), + dblink_trans_xa_end_count_(0), dblink_trans_xa_end_used_time_(0), + dblink_trans_xa_end_fail_count_(0), + dblink_trans_xa_prepare_count_(0), dblink_trans_xa_prepare_used_time_(0), + dblink_trans_xa_prepare_fail_count_(0), + dblink_trans_xa_commit_count_(0), dblink_trans_xa_commit_used_time_(0), + dblink_trans_xa_commit_fail_count_(0), + dblink_trans_xa_rollback_count_(0), dblink_trans_xa_rollback_used_time_(0), + dblink_trans_xa_rollback_fail_count_(0) {} + ~ObDBLinkTransStatistics() { destroy(); } + int init(const uint64_t tenant_id); + void reset(); + void destroy() { reset(); } +public: + // increment dblink trans count + void inc_dblink_trans_count(); + // increment failed dblink trans count + void inc_dblink_trans_fail_count(); + // increment dblink trans promotion count + void inc_dblink_trans_promotion_count(); + // increment dblink trans callback count + void inc_dblink_trans_callback_count(); + // increment dblink trans commit count + void inc_dblink_trans_commit_count(); + // add dblink trans commit used time + void add_dblink_trans_commit_used_time(const int64_t used_time); + // increment failed dblink trans commit count + void inc_dblink_trans_commit_fail_count(); + // increment dblink trans rollback count + void inc_dblink_trans_rollback_count(); + // add dblink trans rollback used time + void add_dblink_trans_rollback_used_time(const int64_t used_time); + // increment failed dblink trans rollback count + void inc_dblink_trans_rollback_fail_count(); + // increment dblink trans xa_start count + void inc_dblink_trans_xa_start_count(); + // add dblink trans xa_start used time + void add_dblink_trans_xa_start_used_time(const int64_t used_time); + // increment failed dblink trans xa_start count + void inc_dblink_trans_xa_start_fail_count(); + // increment dblink trans xa_end count + void inc_dblink_trans_xa_end_count(); + // add dblink trans xa_end used time + void add_dblink_trans_xa_end_used_time(const int64_t used_time); + // increment failed dblink trans xa_end count + void inc_dblink_trans_xa_end_fail_count(); + // increment dblink trans xa_prepare count + void inc_dblink_trans_xa_prepare_count(); + // add dblink trans xa_prepare used time + void add_dblink_trans_xa_prepare_used_time(const int64_t used_time); + // increment failed dblink trans xa_prepare count + void inc_dblink_trans_xa_prepare_fail_count(); + // increment dblink trans xa_commit count + void inc_dblink_trans_xa_commit_count(); + // add dblink trans xa_commit used time + void add_dblink_trans_xa_commit_used_time(const int64_t used_time); + // increment failed dblink trans xa_commit count + void inc_dblink_trans_xa_commit_fail_count(); + // increment dblink trans xa_rollback count + void inc_dblink_trans_xa_rollback_count(); + // add dblink trans xa_rollback used time + void add_dblink_trans_xa_rollback_used_time(const int64_t used_time); + // increment failed dblink trans xa_rollback count + void inc_dblink_trans_xa_rollback_fail_count(); +public: + void try_print_dblink_statistics(); +private: + DISALLOW_COPY_AND_ASSIGN(ObDBLinkTransStatistics); +private: + bool is_inited_; + uint64_t tenant_id_; + int64_t last_print_stat_ts_; + // for statistics + int64_t dblink_trans_count_; + int64_t dblink_trans_fail_count_; + int64_t dblink_trans_promotion_count_; + int64_t dblink_trans_callback_count_; + int64_t dblink_trans_commit_count_; + int64_t dblink_trans_commit_used_time_; + int64_t dblink_trans_commit_fail_count_; + int64_t dblink_trans_rollback_count_; + int64_t dblink_trans_rollback_used_time_; + int64_t dblink_trans_rollback_fail_count_; + int64_t dblink_trans_xa_start_count_; + int64_t dblink_trans_xa_start_used_time_; + int64_t dblink_trans_xa_start_fail_count_; + int64_t dblink_trans_xa_end_count_; + int64_t dblink_trans_xa_end_used_time_; + int64_t dblink_trans_xa_end_fail_count_; + int64_t dblink_trans_xa_prepare_count_; + int64_t dblink_trans_xa_prepare_used_time_; + int64_t dblink_trans_xa_prepare_fail_count_; + int64_t dblink_trans_xa_commit_count_; + int64_t dblink_trans_xa_commit_used_time_; + int64_t dblink_trans_xa_commit_fail_count_; + int64_t dblink_trans_xa_rollback_count_; + int64_t dblink_trans_xa_rollback_used_time_; + int64_t dblink_trans_xa_rollback_fail_count_; +}; + +} // transaction +} // oceanbase + + +#endif // OCEANABAE_TRANSACTION_OB_XA_TRANS_EVENT_ diff --git a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp index b057874801..e26be94542 100644 --- a/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp +++ b/src/storage/tx/ob_xa_trans_heartbeat_worker.cpp @@ -64,7 +64,8 @@ void ObXATransHeartbeatWorker::run1() while (!has_set_stop()) { int64_t start_time = ObTimeUtility::current_time(); loop_count++; - MTL(ObXAService *)->get_xa_statistics().print_statistics(start_time); + // MTL(ObXAService *)->get_xa_statistics().print_statistics(start_time); + MTL(ObXAService *)->try_print_statistics(); if (OB_UNLIKELY(!is_inited_)) { TRANS_LOG(WARN, "xa trans heartbeat not init");