From 905a5e35fa23f240846dc27eb5a9001908c73d36 Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Mon, 17 Jun 2024 11:10:10 +0000 Subject: [PATCH] Print logs for the tx_ctx member as needed --- src/storage/CMakeLists.txt | 1 + src/storage/tx/ob_committer_define.h | 34 ++++ src/storage/tx/ob_multi_data_source.h | 60 ++++++- src/storage/tx/ob_trans_ctx.h | 14 +- src/storage/tx/ob_trans_ctx_lock.cpp | 3 +- src/storage/tx/ob_trans_ctx_lock.h | 1 + src/storage/tx/ob_trans_ctx_mgr_v4.h | 2 +- src/storage/tx/ob_trans_define.cpp | 36 ---- src/storage/tx/ob_trans_define.h | 172 ++++++++++--------- src/storage/tx/ob_trans_part_ctx.cpp | 108 +++++++++--- src/storage/tx/ob_trans_part_ctx.h | 58 +++---- src/storage/tx/ob_tx_log.cpp | 2 +- src/storage/tx/ob_tx_log_operator.h | 23 ++- src/storage/tx/ob_tx_on_demand_print.cpp | 130 ++++++++++++++ src/storage/tx/ob_tx_on_demand_print.h | 72 ++++++++ unittest/storage/tx/it/test_register_mds.cpp | 1 + 16 files changed, 519 insertions(+), 198 deletions(-) create mode 100644 src/storage/tx/ob_tx_on_demand_print.cpp create mode 100644 src/storage/tx/ob_tx_on_demand_print.h diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index 07298df63..c01f31510 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -376,6 +376,7 @@ ob_set_subtarget(ob_storage tx tx/ob_tx_big_segment_buf.cpp tx/ob_tx_ls_log_writer.cpp tx/ob_tx_msg.cpp + tx/ob_tx_on_demand_print.cpp tx/ob_tx_replay_executor.cpp tx/ob_dup_table_util.cpp tx/ob_dup_table_lease.cpp diff --git a/src/storage/tx/ob_committer_define.h b/src/storage/tx/ob_committer_define.h index 613622c20..73b4267c6 100644 --- a/src/storage/tx/ob_committer_define.h +++ b/src/storage/tx/ob_committer_define.h @@ -81,6 +81,40 @@ enum class ObTxState : uint8_t const int64_t OB_C2PC_UPSTREAM_ID = INT64_MAX - 1; const int64_t OB_C2PC_SENDER_ID = INT64_MAX - 2; +#define TRX_ENUM_CASE_TO_STR(class_name, src) \ + case class_name::src: \ + str = #src; \ + break; + +static const char *to_str_2pc_role(Ob2PCRole role) +{ + const char *str = "INVALID"; + switch (role) { + TRX_ENUM_CASE_TO_STR(Ob2PCRole, UNKNOWN) + TRX_ENUM_CASE_TO_STR(Ob2PCRole, ROOT) + TRX_ENUM_CASE_TO_STR(Ob2PCRole, INTERNAL) + TRX_ENUM_CASE_TO_STR(Ob2PCRole, LEAF) + }; + return str; +} + +static const char *to_str_tx_state(ObTxState state) +{ + const char *str = "INVALID"; + switch (state) { + TRX_ENUM_CASE_TO_STR(ObTxState, UNKNOWN) + TRX_ENUM_CASE_TO_STR(ObTxState, INIT) + TRX_ENUM_CASE_TO_STR(ObTxState, REDO_COMPLETE) + TRX_ENUM_CASE_TO_STR(ObTxState, PREPARE) + TRX_ENUM_CASE_TO_STR(ObTxState, PRE_COMMIT) + TRX_ENUM_CASE_TO_STR(ObTxState, COMMIT) + TRX_ENUM_CASE_TO_STR(ObTxState, ABORT) + TRX_ENUM_CASE_TO_STR(ObTxState, CLEAR) + TRX_ENUM_CASE_TO_STR(ObTxState, MAX) + }; + return str; +} + /* // ObITxCommitter provides method to commit the transaction with user provided callbacks. */ /* // The interface need guarantee the atomicity of the transaction. */ /* class ObITxCommitter */ diff --git a/src/storage/tx/ob_multi_data_source.h b/src/storage/tx/ob_multi_data_source.h index 2d7e7f027..c5a8bd22b 100644 --- a/src/storage/tx/ob_multi_data_source.h +++ b/src/storage/tx/ob_multi_data_source.h @@ -66,6 +66,41 @@ enum class ObTxDataSourceType : int64_t MAX_TYPE = 100 }; +static const char * to_str_mds_type(const ObTxDataSourceType & mds_type ) +{ + const char * str = "INVALID"; + switch(mds_type) + { + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, UNKNOWN); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, MEM_TABLE); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TABLE_LOCK); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, LS_TABLE); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DDL_BARRIER); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DDL_TRANS); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, STANDBY_UPGRADE); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, BEFORE_VERSION_4_1); + + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST1); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST2); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST3); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, CREATE_TABLET_NEW_MDS); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DELETE_TABLET_NEW_MDS); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, UNBIND_TABLET_NEW_MDS); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_IN); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, FINISH_TRANSFER_OUT); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, FINISH_TRANSFER_IN); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_TASK); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT_PREPARE); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT_V2); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_MOVE_TX_CTX); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_DEST_PREPARE); + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, CHANGE_TABLET_TO_TABLE_MDS); + + TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, MAX_TYPE); + } + return str; +} enum class NotifyType : int64_t { @@ -78,6 +113,22 @@ enum class NotifyType : int64_t ON_ABORT = 5 }; +static const char * to_str_notify_type(const NotifyType & notify_type) +{ + const char * str = "INVALID"; + switch(notify_type) + { + TRX_ENUM_CASE_TO_STR(NotifyType, UNKNOWN); + TRX_ENUM_CASE_TO_STR(NotifyType, REGISTER_SUCC); + TRX_ENUM_CASE_TO_STR(NotifyType, ON_REDO); + TRX_ENUM_CASE_TO_STR(NotifyType, TX_END); + TRX_ENUM_CASE_TO_STR(NotifyType, ON_PREPARE); + TRX_ENUM_CASE_TO_STR(NotifyType, ON_COMMIT); + TRX_ENUM_CASE_TO_STR(NotifyType, ON_ABORT); + } + return str; +} + class ObTxBufferNode { friend class ObPartTransCtx; @@ -144,7 +195,12 @@ public: } storage::mds::BufferCtxNode &get_buffer_ctx_node() const { return buffer_ctx_node_; } - TO_STRING_KV(K(register_no_), K(has_submitted_), K(has_synced_), K_(type), K(data_.length())); + TO_STRING_KV(K(register_no_), + K(has_submitted_), + K(has_synced_), + "type", + to_str_mds_type(type_), + K(data_.length())); private: uint64_t register_no_; @@ -261,7 +317,7 @@ public: const share::ObLSID &get_ls_id() { return ls_id_; } const ObRegisterMdsFlag &get_register_flag() { return register_flag_; } - TO_STRING_KV(K(mds_str_), K(type_), K(ls_id_), K(register_flag_)); + TO_STRING_KV(K(mds_str_), "type_", to_str_mds_type(type_), K(ls_id_), K(register_flag_)); private: // const char *msd_buf_; diff --git a/src/storage/tx/ob_trans_ctx.h b/src/storage/tx/ob_trans_ctx.h index b00a51238..3ffcb2558 100644 --- a/src/storage/tx/ob_trans_ctx.h +++ b/src/storage/tx/ob_trans_ctx.h @@ -35,6 +35,7 @@ #include "share/rc/ob_context.h" #include "share/ob_light_hashmap.h" #include "ob_tx_elr_handler.h" +#include "storage/tx/ob_tx_on_demand_print.h" namespace oceanbase { @@ -64,19 +65,6 @@ class KillTransArg; class ObLSTxCtxMgr; } -#define TO_STRING_KV_(args...) DEFINE_TO_STRING_(J_KV(args)) - -#define DEFINE_TO_STRING_(body) DECLARE_TO_STRING_ \ - { \ - int64_t pos = 0; \ - J_OBJ_START(); \ - body; \ - J_OBJ_END(); \ - return pos; \ - } - -#define DECLARE_TO_STRING_ int64_t to_string_(char* buf, const int64_t buf_len) const - namespace transaction { static inline void protocol_error(const int64_t state, const int64_t msg_type) diff --git a/src/storage/tx/ob_trans_ctx_lock.cpp b/src/storage/tx/ob_trans_ctx_lock.cpp index 7da43e1cd..6dbafe3b2 100644 --- a/src/storage/tx/ob_trans_ctx_lock.cpp +++ b/src/storage/tx/ob_trans_ctx_lock.cpp @@ -152,7 +152,8 @@ void CtxLock::unlock_ctx() if (lock_start_ts > 0) { const int64_t lock_ts = ObClockGenerator::getClock() - lock_start_ts; if (lock_ts > WARN_LOCK_TS) { - TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "ctx lock too much time", K(arg.trans_id_), K(lock_ts), K(lbt())); + TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "ctx lock too much time", K(arg.trans_id_), + K(arg.ls_id_), K(lock_ts), K(lbt())); } } after_unlock(arg); diff --git a/src/storage/tx/ob_trans_ctx_lock.h b/src/storage/tx/ob_trans_ctx_lock.h index c44c19293..bffc8dc4b 100644 --- a/src/storage/tx/ob_trans_ctx_lock.h +++ b/src/storage/tx/ob_trans_ctx_lock.h @@ -131,6 +131,7 @@ public: ~CtxLockGuard(); void set(CtxLock &lock, uint8_t mode = MODE::ALL); void reset(); + int64_t get_hold_ts() { return hold_ts_; } int64_t get_lock_acquire_used_time() const { return hold_ts_ - request_ts_; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 282ef9e68..e88fc7d21 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -128,7 +128,7 @@ struct ObTxCreateArg && trans_expired_time_ > 0 && NULL != trans_service_; } - TO_STRING_KV(K_(for_replay), "ctx_source_", to_str(ctx_source_), + TO_STRING_KV(K_(for_replay), "ctx_source", to_str_ctx_source(ctx_source_), K_(tenant_id), K_(tx_id), K_(ls_id), K_(cluster_id), K_(cluster_version), K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service), diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 4bfef44e4..b8b3b5abe 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -697,42 +697,6 @@ bool is_transfer_ctx(PartCtxSource ctx_source) return PartCtxSource::TRANSFER == ctx_source || PartCtxSource::TRANSFER_RECOVER == ctx_source; } -const char *to_str(PartCtxSource src) -{ - const char *str = "INVALID"; - switch (src) { - case PartCtxSource::UNKOWN: { - str = "UNKOWN"; - break; - } - case PartCtxSource::MVCC_WRITE: { - str = "MVCC_WRITE"; - break; - } - case PartCtxSource::REGISTER_MDS: { - str = "REGISTER_MDS"; - break; - } - case PartCtxSource::REPLAY: { - str = "REPLAY"; - break; - } - case PartCtxSource::RECOVER: { - str = "RECOVER"; - break; - } - case PartCtxSource::TRANSFER: { - str = "TRANSFER"; - break; - } - case PartCtxSource::TRANSFER_RECOVER: { - str = "TRANSFER_RECOVER"; - break; - } - } - return str; -} - void ObTxExecInfo::reset() { state_ = ObTxState::INIT; diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index dd03b2f14..e715f13ec 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -32,6 +32,7 @@ #include "storage/tx/ob_trans_result.h" #include "storage/tx/ob_xa_define.h" #include "storage/tx/ob_direct_load_tx_ctx_define.h" +#include "storage/tx/ob_tx_on_demand_print.h" #include "ob_multi_data_source.h" #include "share/scn.h" #include "storage/tx/ob_tx_seq.h" @@ -968,96 +969,86 @@ private: class ObTxSubState { public: - ObTxSubState() : flag_(0) {} + ObTxSubState() : flag_() {} ~ObTxSubState() {} - void reset() { flag_ = 0; } + void reset() { flag_.reset(); } - bool is_info_log_submitted() const - { return flag_ & INFO_LOG_SUBMITTED_BIT; } - void set_info_log_submitted() - { flag_ |= INFO_LOG_SUBMITTED_BIT; } - void clear_info_log_submitted() - { flag_ &= ~INFO_LOG_SUBMITTED_BIT; } + bool is_info_log_submitted() const { return flag_.info_log_submitted_; } + void set_info_log_submitted() { flag_.info_log_submitted_ = 1; } + void clear_info_log_submitted() { flag_.info_log_submitted_ = 0; } - bool is_gts_waiting() const - { return flag_ & GTS_WAITING_BIT; } - void set_gts_waiting() - { flag_ |= GTS_WAITING_BIT; } - void clear_gts_waiting() - { flag_ &= ~GTS_WAITING_BIT; } + bool is_gts_waiting() const { return flag_.gts_waiting_; } + void set_gts_waiting() { flag_.gts_waiting_ = 1; } + void clear_gts_waiting() { flag_.gts_waiting_ = 0; } - bool is_state_log_submitting() const - { return flag_ & STATE_LOG_SUBMITTING_BIT; } - void set_state_log_submitting() - { flag_ |= STATE_LOG_SUBMITTING_BIT; } - void clear_state_log_submitting() - { flag_ &= ~STATE_LOG_SUBMITTING_BIT; } + bool is_state_log_submitting() const { return flag_.state_log_submitting_; } + void set_state_log_submitting() { flag_.state_log_submitting_ = 1; } + void clear_state_log_submitting() { flag_.state_log_submitting_ = 0; } - bool is_state_log_submitted() const - { return flag_ & STATE_LOG_SUBMITTED_BIT; } - void set_state_log_submitted() - { flag_ |= STATE_LOG_SUBMITTED_BIT; } - void clear_state_log_submitted() - { flag_ &= ~STATE_LOG_SUBMITTED_BIT; } + bool is_state_log_submitted() const { return flag_.state_log_submitted_; } + void set_state_log_submitted() { flag_.state_log_submitted_ = 1; } + void clear_state_log_submitted() { flag_.state_log_submitted_ = 0; } - bool is_prepare_notified() const - { return flag_ & PREPARE_NOTIFY_BIT; } - void set_prepare_notified() - { flag_ |= PREPARE_NOTIFY_BIT; } - void clear_prepare_notified() - { flag_ &= ~PREPARE_NOTIFY_BIT; } + bool is_prepare_notified() const { return flag_.prepare_notify_; } + void set_prepare_notified() { flag_.prepare_notify_ = 1; } + void clear_prepare_notified() { flag_.prepare_notify_ = 0; } - bool is_force_abort() const - { return flag_ & FORCE_ABORT_BIT; } - void set_force_abort() - { flag_ |= FORCE_ABORT_BIT; } - void clear_force_abort() - { flag_ &= ~FORCE_ABORT_BIT; } + bool is_force_abort() const { return flag_.force_abort_; } + void set_force_abort() { flag_.force_abort_ = 1; } + void clear_force_abort() { flag_.force_abort_ = 0; } - bool is_transfer_blocking() const - { return flag_ & TRANSFER_BLOCKING_BIT; } - void set_transfer_blocking() - { flag_ |= TRANSFER_BLOCKING_BIT; } - void clear_transfer_blocking() - { flag_ &= ~TRANSFER_BLOCKING_BIT; } + bool is_transfer_blocking() const { return flag_.transfer_blocking_; } + void set_transfer_blocking() { flag_.transfer_blocking_ = 1; } + void clear_transfer_blocking() { flag_.transfer_blocking_ = 0; } - // bool is_prepare_log_submitted() const - // { return flag_ & PREPARE_LOG_SUBMITTED_BIT; } - // void set_prepare_log_submitted() - // { flag_ |= PREPARE_LOG_SUBMITTED_BIT; } + DECLARE_ON_DEMAND_TO_STRING + TO_STRING_KV("info_log_submitted", + flag_.info_log_submitted_, + "gts_waiting", + flag_.gts_waiting_, + "state_log_submitting", + flag_.state_log_submitting_, + "state_log_submitted", + flag_.state_log_submitted_, + // "prepare_notify", + // flag_.prepare_notify_, + "force_abort", + flag_.force_abort_, + "transfer_blocking", + flag_.transfer_blocking_); - // bool is_commit_log_submitted() const - // { return flag_ & COMMIT_LOG_SUBMITTED_BIT; } - // void set_commit_log_submitted() - // { return flag_ |= COMMIT_LOG_SUBMITTED_BIT; } - - // bool is_abort_log_submitted() const - // { return flag_ & ABORT_LOG_SUBMITTED_BIT; } - // void set_abort_log_submitted() - // { flag_ |= ABORT_LOG_SUBMITTED_BIT; } - - // bool is_clear_log_submitted() const - // { return flag_ & CLEAR_LOG_SUBMITTED_BIT; } - // void set_clear_log_submitted() - // { flag_ |= CLEAR_LOG_SUBMITTED_BIT; } - TO_STRING_KV(K_(flag)); + bool is_valid() const { return flag_.is_valid(); } private: - static const int64_t INIT = 0; - static const int64_t INFO_LOG_SUBMITTED_BIT = 1UL << 1; - static const int64_t GTS_WAITING_BIT = 1UL << 2; - // static const int64_t GTS_RECEIVED = 3; - static const int64_t STATE_LOG_SUBMITTING_BIT = 1UL << 3; - static const int64_t STATE_LOG_SUBMITTED_BIT = 1UL << 4; - // static const int64_t PREPARE_LOG_SUBMITTED_BIT = 1UL << 4; - // static const int64_t COMMIT_LOG_SUBMITTED_BIT = 1UL << 5; - // static const int64_t ABORT_LOG_SUBMITTED_BIT = 1UL << 6; - // static const int64_t CLEAR_LOG_SUBMITTED_BIT = 1UL << 7; - // indicate whether notified multi data source to prepare - static const int64_t PREPARE_NOTIFY_BIT = 1UL << 5; - static const int64_t FORCE_ABORT_BIT = 1UL << 6; - static const int64_t TRANSFER_BLOCKING_BIT = 1UL << 7; -private: - int64_t flag_; + struct BitFlag + { + unsigned int info_log_submitted_ : 1; + unsigned int gts_waiting_ : 1; + unsigned int state_log_submitting_ : 1; + unsigned int state_log_submitted_ : 1; + unsigned int prepare_notify_ : 1; + unsigned int force_abort_ : 1; + unsigned int transfer_blocking_ : 1; + + void reset() + { + info_log_submitted_ = 0; + gts_waiting_ = 0; + state_log_submitting_ = 0; + state_log_submitted_ = 0; + prepare_notify_ = 0; + force_abort_ = 0; + transfer_blocking_ = 0; + } + + bool is_valid() const + { + return info_log_submitted_ > 0 || gts_waiting_ > 0 || state_log_submitted_ > 0 + || state_log_submitting_ > 0 || prepare_notify_ > 0 || force_abort_ > 0 + || transfer_blocking_ > 0; + } + + BitFlag() { reset(); } + } flag_; }; class Ob2PCPrepareState @@ -1649,7 +1640,7 @@ private: enum class PartCtxSource { - UNKOWN = 0, + UNKNOWN = 0, MVCC_WRITE = 1, REGISTER_MDS = 2, REPLAY = 3, @@ -1658,9 +1649,24 @@ enum class PartCtxSource TRANSFER_RECOVER = 6, }; +static const char * to_str_ctx_source(const PartCtxSource & ctx_src) +{ + const char * str = "INVALID"; + switch(ctx_src) + { + TRX_ENUM_CASE_TO_STR(PartCtxSource, UNKNOWN); + TRX_ENUM_CASE_TO_STR(PartCtxSource, MVCC_WRITE); + TRX_ENUM_CASE_TO_STR(PartCtxSource, REGISTER_MDS); + TRX_ENUM_CASE_TO_STR(PartCtxSource, REPLAY); + TRX_ENUM_CASE_TO_STR(PartCtxSource, RECOVER); + TRX_ENUM_CASE_TO_STR(PartCtxSource, TRANSFER); + TRX_ENUM_CASE_TO_STR(PartCtxSource, TRANSFER_RECOVER); + } + return str; +} + bool is_transfer_ctx(PartCtxSource ctx_source); -const char *to_str(PartCtxSource src); enum class RetainCause : int16_t { @@ -1722,6 +1728,8 @@ private: const ObTxCommitParts &commit_parts); public: + DECLARE_ON_DEMAND_TO_STRING + DECLARE_TO_STRING { // const_cast>(checksum_).set_max_print_count(512); // const_cast>(checksum_scn_).set_max_print_count(512); @@ -1760,7 +1768,7 @@ public: K_(exec_epoch), K_(serial_final_scn), K_(serial_final_seq_no), - K_(dli_batch_set)); + K(dli_batch_set_.size())); return pos; } ObTxState state_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 3fc528f8c..6fa5324d2 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -368,7 +368,7 @@ void ObPartTransCtx::default_init_() mds_cache_.reset(); retain_ctx_func_ptr_ = nullptr; create_ctx_scn_.reset(); - ctx_source_ = PartCtxSource::UNKOWN; + ctx_source_ = PartCtxSource::UNKNOWN; replay_completeness_.reset(); is_submitting_redo_log_for_freeze_ = false; start_working_log_ts_ = SCN::min_scn(); @@ -2211,6 +2211,21 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; const int64_t cur_ts = ObTimeUtility::current_time(); + + const int64_t LOG_CB_ON_SUCC_TIME_LIMIT = 100 * 1000; + share::SCN max_cost_cb_scn; + int64_t max_cost_cb_time = 0; + int64_t skip_on_succ_cnt = 0; + int64_t invoke_on_succ_cnt = 0; + int64_t invoke_on_succ_time = 0; + int64_t submit_record_log_time = 0; + int64_t fast_commit_time = 0; + int64_t on_succ_ctx_lock_hold_time = 0; + int64_t try_submit_next_log_cost_time = 0; + + int64_t log_sync_used_time = 0; + int64_t ctx_lock_wait_time = 0; + bool handle_fast_commit = false; bool try_submit_next_log = false; bool need_return_log_cb = false; @@ -2218,10 +2233,10 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) // allow fill redo concurrently with log callback CtxLockGuard guard(lock_, is_committing_() ? CtxLockGuard::MODE::ALL : CtxLockGuard::MODE::CTX); - const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts(); + log_sync_used_time = cur_ts - log_cb->get_submit_ts(); ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time); ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1); - const int64_t ctx_lock_wait_time = guard.get_lock_acquire_used_time(); + ctx_lock_wait_time = guard.get_lock_acquire_used_time(); if (log_sync_used_time + ctx_lock_wait_time >= ObServerConfig::get_instance().clog_sync_time_warn_threshold) { TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "transaction log sync use too much time", KPC(log_cb), K(log_sync_used_time), K(ctx_lock_wait_time)); @@ -2236,12 +2251,14 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) #endif } if (log_cb->is_callbacked()) { + skip_on_succ_cnt++; #ifndef NDEBUG TRANS_LOG(INFO, "cb has been callbacked", KPC(log_cb)); #endif busy_cbs_.remove(log_cb); return_log_cb_(log_cb); } else if (is_exiting_) { + skip_on_succ_cnt++; // the TxCtx maybe has been killed forcedly by background GC thread // the log_cb process has been skipped if (sub_state_.is_force_abort()) { @@ -2264,8 +2281,11 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) // process all preceding log_cbs for (int64_t i = 0; i < busy_cbs_.get_size(); i++) { if (cur_cb->is_callbacked()) { + skip_on_succ_cnt++; // do nothing } else { + invoke_on_succ_cnt++; + const int64_t before_invoke_ts = ObTimeUtility::fast_current_time(); if (OB_FAIL(on_success_ops_(cur_cb))) { TRANS_LOG(ERROR, "invoke on_success_ops failed", K(ret), K(*this), K(*cur_cb)); if (OB_SUCCESS == save_ret) { @@ -2279,6 +2299,16 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) } // ignore ret and set cur_cb callbacked cur_cb->set_callbacked(); + const int64_t after_invoke_ts = ObTimeUtility::fast_current_time(); + if (after_invoke_ts - before_invoke_ts > max_cost_cb_time) { + max_cost_cb_time = after_invoke_ts - before_invoke_ts; + max_cost_cb_scn = log_cb->get_log_ts(); + } + if (after_invoke_ts - before_invoke_ts > LOG_CB_ON_SUCC_TIME_LIMIT) { + TRANS_LOG(WARN, "invoke on_succ cost too much time", K(ret), K(trans_id_), K(ls_id_), K(cur_cb), + K(log_cb)); + } + invoke_on_succ_time += (after_invoke_ts - before_invoke_ts); } if (cur_cb == log_cb) { break; @@ -2297,23 +2327,29 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) // try submit record log under CtxLock if (need_record_log_()) { // ignore error + const int64_t before_submit_record_ts = ObTimeUtility::fast_current_time(); if (OB_SUCCESS != (tmp_ret = submit_record_log_())) { TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this)); } + submit_record_log_time = ObTimeUtility::fast_current_time() - before_submit_record_ts; } handle_fast_commit = !(sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0); try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_(); busy_cbs_.remove(log_cb); need_return_log_cb = true; } + + on_succ_ctx_lock_hold_time = ObTimeUtility::fast_current_time() - guard.get_hold_ts(); } // let fast commit out of ctx's lock, because it is time consuming in calculating checksum if (handle_fast_commit) { // acquire REDO_FLUSH_READ LOCK, which allow other thread flush redo // but disable other manage operation on ctx // FIXME: acquire CTX's READ lock maybe better + const int64_t before_fast_commit_ts = ObTimeUtility::fast_current_time(); CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); mt_ctx_.remove_callbacks_for_fast_commit(log_cb->get_callbacks()); + fast_commit_time = ObTimeUtility::fast_current_time() - before_fast_commit_ts; } if (need_return_log_cb) { return_log_cb_(log_cb); @@ -2327,6 +2363,14 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) if (OB_SUCCESS != (tmp_ret = ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this))) { TRANS_LOG(ERROR, "release ctx ref failed", KR(tmp_ret)); } + + if (ObTimeUtility::fast_current_time() - cur_ts > LOG_CB_ON_SUCC_TIME_LIMIT) { + TRANS_LOG(WARN, "on_success cost too much time", K(ret), K(trans_id_), K(ls_id_), + K(max_cost_cb_scn), K(max_cost_cb_time), K(skip_on_succ_cnt), K(invoke_on_succ_cnt), + K(invoke_on_succ_time), K(submit_record_log_time), K(fast_commit_time), + K(on_succ_ctx_lock_hold_time), K(try_submit_next_log_cost_time), K(log_sync_used_time), + K(ctx_lock_wait_time)); + } return ret; } @@ -3998,12 +4042,19 @@ int ObPartTransCtx::submit_direct_load_inc_log_( TRANS_LOG(WARN, "serialize direct load log failed", K(ret), K(dli_log), K(replay_hint), KPC(this)); } else if (OB_FAIL(dli_log_op(ObTxLogOpType::SUBMIT))) { - TRANS_LOG(WARN, "try to submit direct load inc log failed", K(ret), KPC(this)); + if (ret == OB_TX_NOLOGCB) { + if (REACH_COUNT_PER_SEC(10) && REACH_TIME_INTERVAL(100 * 1000)) { + TRANS_LOG(INFO, "no log cb with dli log", KR(ret), K(dli_log_type), K(batch_key), + KPC(busy_cbs_.get_first())); + } + } else { + TRANS_LOG(WARN, "try to submit direct load inc log failed", K(ret), KPC(this)); + } } else { scn = dli_log_op.get_scn(); } - TRANS_LOG(INFO, " submit direct load inc log", K(ret), K(get_trans_id()), + TRANS_LOG(DEBUG, " submit direct load inc log", K(ret), K(get_trans_id()), K(get_ls_id()), K(dli_log), K(dli_log_type), K(batch_key), K(replay_barrier), K(replay_hint), K(scn)); return ret; @@ -4048,18 +4099,22 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; + const int64_t start_ts = ObTimeUtility::fast_current_time(); if (exec_info_.dli_batch_set_.size() > 0) { int64_t need_ddl_end_count = 0; + int64_t compensate_ddl_end_fail_cnt = 0; + const int64_t MAX_COMPENSATE_FAIL_CNT = 100; ObTxDirectLoadBatchKeyArray unused_batch_key_array; - for (ObDLIBatchSet::iterator iter = exec_info_.dli_batch_set_.begin(); iter != exec_info_.dli_batch_set_.end(); - iter++) { + for (ObDLIBatchSet::iterator iter = exec_info_.dli_batch_set_.begin(); + iter != exec_info_.dli_batch_set_.end(); iter++) { if (iter->first.need_compensate_ddl_end()) { - TRANS_LOG(INFO, " need compensate ddl end log", K(ret), K(tmp_ret), + TRANS_LOG(DEBUG, " need compensate ddl end log", K(ret), K(tmp_ret), K(submit_log_type), K(trans_id_), K(ls_id_), K(iter->first)); - if (ObTxLogType::TX_ABORT_LOG == submit_log_type - || ObTxLogType::TX_COMMIT_INFO_LOG == submit_log_type) { + if (compensate_ddl_end_fail_cnt <= MAX_COMPENSATE_FAIL_CNT + && (ObTxLogType::TX_ABORT_LOG == submit_log_type + || ObTxLogType::TX_COMMIT_INFO_LOG == submit_log_type)) { ObDDLIncCommitLog inc_commit_log; ObDDLIncCommitClogCb *extra_cb = static_cast( mtl_malloc(sizeof(ObDDLIncCommitClogCb), "TxExtraCb")); @@ -4076,7 +4131,8 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type) if (OB_TMP_FAIL(ret)) { // do nothing } else if (OB_TMP_FAIL(extra_cb->init(ls_id_, iter->first.get_batch_key()))) { - TRANS_LOG(WARN, "init extra cb failed", K(ret), K(iter->first), K(trans_id_), K(ls_id_)); + TRANS_LOG(WARN, "init extra cb failed", K(ret), K(iter->first), K(trans_id_), + K(ls_id_)); extra_cb->~ObDDLIncCommitClogCb(); mtl_free(extra_cb); } else if (OB_TMP_FAIL(inc_commit_log.init(iter->first.get_batch_key()))) { @@ -4084,23 +4140,29 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type) K(trans_id_), K(ls_id_)); extra_cb->~ObDDLIncCommitClogCb(); mtl_free(extra_cb); - } else if (OB_TMP_FAIL( - submit_direct_load_inc_commit_log(inc_commit_log, extra_cb, submitted_scn, true))) { - TRANS_LOG(WARN, "submit direct load inc commit log failed", K(ret), K(inc_commit_log), - KPC(extra_cb), K(submitted_scn), K(trans_id_), K(ls_id_)); + } else if (OB_TMP_FAIL(submit_direct_load_inc_commit_log(inc_commit_log, extra_cb, + submitted_scn, true))) { + if (tmp_ret != OB_TX_NOLOGCB) { + TRANS_LOG(WARN, "submit direct load inc commit log failed", K(ret), K(inc_commit_log), + KPC(extra_cb), K(submitted_scn), K(trans_id_), K(ls_id_)); + } extra_cb->~ObDDLIncCommitClogCb(); mtl_free(extra_cb); } else { } } + if (OB_TMP_FAIL(tmp_ret)) { + compensate_ddl_end_fail_cnt += 1; + } + if (iter->first.need_compensate_ddl_end()) { need_ddl_end_count += 1; } } else if (iter->first.is_ddl_start_logging()) { need_ddl_end_count += 1; - TRANS_LOG(INFO, "the DDL inc start is logging", K(ret), K(tmp_ret), K(iter->first), K(trans_id_), - K(ls_id_)); + TRANS_LOG(INFO, "the DDL inc start is logging", K(ret), K(tmp_ret), K(iter->first), + K(trans_id_), K(ls_id_)); } else if (iter->first.is_ddl_end_logging()) { TRANS_LOG(INFO, "the DDL inc end is logging", K(ret), K(tmp_ret), K(iter->first), K(trans_id_), K(ls_id_)); @@ -4125,16 +4187,17 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type) && (get_downstream_state() >= ObTxState::REDO_COMPLETE || submit_log_type != ObTxLogType::TX_CLEAR_LOG)) { ret = OB_TRANS_NEED_ROLLBACK; - TRANS_LOG(ERROR, " incompleted direct load inc batch info with state log", + TRANS_LOG(ERROR, + " incompleted direct load inc batch info with state log", K(ret), K(tmp_ret), K(submit_log_type), K(trans_id_), K(ls_id_), - K(need_ddl_end_count)); + K(need_ddl_end_count), K(ObTimeUtility::fast_current_time() - start_ts)); } else { ret = OB_TRANS_CANNOT_BE_KILLED; TRANS_LOG(WARN, " The trx can not be finished because of a incompleted " "direct load inc batch info", K(ret), K(tmp_ret), K(submit_log_type), K(trans_id_), K(ls_id_), - K(need_ddl_end_count)); + K(need_ddl_end_count), K(ObTimeUtility::fast_current_time() - start_ts)); } } } @@ -7541,8 +7604,9 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type, TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg)); } if (notify_array.count() > 0) { - TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts), - K(notify_array.count()), K(notify_array), K(total_time)); + TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), "notify_type", + to_str_notify_type(notify_type), K(log_ts), K(notify_array.count()), K(notify_array), + K(total_time)); } } return ret; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 032339538..dc37b6b0d 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -287,36 +287,32 @@ public: int64_t to_string(char* buf, const int64_t buf_len) const; private: // thread unsafe - TO_STRING_KV_(K_(ls_id), - K_(session_id), - K_(part_trans_action), - K_(pending_write), - K_(exec_info), - K_(sub_state), - K(is_leaf()), - K(is_root()), - K(busy_cbs_.get_size()), - K(final_log_cb_), - K(ctx_tx_data_), - K(role_state_), - K(create_ctx_scn_), - "ctx_source", ctx_source_, - K(epoch_), - K(replay_completeness_), - K(mt_ctx_), - K(coord_prepare_info_arr_), - K_(upstream_state), - K_(retain_cause), - "2pc_role", - get_2pc_role(), - K_(collected), - K_(rec_log_ts), - K_(prev_rec_log_ts), - K_(lastest_snapshot), - K_(state_info_array), - K_(last_request_ts), - KP_(block_frozen_memtable), - K_(max_2pc_commit_scn)); + ON_DEMAND_TO_STRING_KV_("self_ls_id", + ls_id_, + K_(session_id), + K_(part_trans_action), + K_(pending_write), + "2pc_role", + to_str_2pc_role(get_2pc_role()), + K(ctx_tx_data_), + K(role_state_), + K(create_ctx_scn_), + "ctx_source", + ctx_source_, + K(epoch_), + K(replay_completeness_), + "upstream_state", + to_str_tx_state(upstream_state_), + K_(collected), + K_(rec_log_ts), + K_(prev_rec_log_ts), + K_(lastest_snapshot), + K_(last_request_ts), + KP_(block_frozen_memtable), + K_(max_2pc_commit_scn), + K(mt_ctx_)); + + public: static const int64_t OP_LOCAL_NUM = 16; static const int64_t RESERVED_MEM_SIZE = 256; @@ -508,7 +504,7 @@ public: ATOMIC_CAS(&retain_cause_, static_cast(RetainCause::UNKOWN), static_cast(cause)); } - RetainCause get_retain_cause() { return static_cast(ATOMIC_LOAD(&retain_cause_)); }; + RetainCause get_retain_cause() const { return static_cast(ATOMIC_LOAD(&retain_cause_)); }; int del_retain_ctx(); diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index d2301e3d8..0d8776615 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -1179,7 +1179,7 @@ int ObTxMultiDataSourceLog::ob_admin_dump(ObAdminMutatorStringArg &arg) arg.writer_ptr_->start_object(); for (int64_t i = 0; i < data_.count(); i++) { arg.writer_ptr_->dump_key("type"); - arg.writer_ptr_->dump_string(to_cstring(static_cast(data_[i].get_data_source_type()))); + arg.writer_ptr_->dump_string(to_str_mds_type(data_[i].get_data_source_type())); arg.writer_ptr_->dump_key("buf_len"); arg.writer_ptr_->dump_string(to_cstring(data_[i].get_data_size())); arg.writer_ptr_->dump_key("content"); diff --git a/src/storage/tx/ob_tx_log_operator.h b/src/storage/tx/ob_tx_log_operator.h index 9e9887cfc..57170116f 100644 --- a/src/storage/tx/ob_tx_log_operator.h +++ b/src/storage/tx/ob_tx_log_operator.h @@ -435,7 +435,9 @@ OB_INLINE int ObTxCtxLogOperator::operator()(const ObTxLogOpType op_type) if (OB_FAIL(prepare_special_resource_())) { TRANS_LOG(WARN, "prepare special resource failed", K(ret), K(T::LOG_TYPE), KPC(this)); } else if (OB_FAIL(prepare_generic_resource_())) { - TRANS_LOG(WARN, "prepare generic resource failed", K(ret), K(T::LOG_TYPE), KPC(this)); + if (OB_TX_NOLOGCB != ret) { + TRANS_LOG(WARN, "prepare generic resource failed", K(ret), K(T::LOG_TYPE), KPC(this)); + } } else if (OB_FAIL(construct_log_object_())) { TRANS_LOG(WARN, "construct log object failed", K(ret), K(T::LOG_TYPE), KPC(this)); } else if (OB_FAIL(insert_into_log_block_())) { @@ -560,7 +562,7 @@ OB_INLINE void ObTxCtxLogOperator::after_submit_log_succ_( log_op_arg_.submit_arg_.log_cb_->set_ddl_batch_key(construct_arg_->batch_key_); log_op_arg_.submit_arg_.log_cb_->get_extra_cb()->__set_scn(scn_); } - TRANS_LOG(INFO, " after submit log succ", K(ret), KPC(this)); + TRANS_LOG(DEBUG, " after submit log succ", K(ret), KPC(this)); } template <> @@ -590,7 +592,7 @@ OB_INLINE int ObTxCtxLogOperator::log_sync_succ_() TRANS_LOG(WARN, "invoke the on_success of a extra_cb_ failed", K(ret), KPC(log_op_arg_.submit_arg_.log_cb_)); } else { - TRANS_LOG(INFO, " sync log succ", K(ret), KPC(this)); + TRANS_LOG(DEBUG, " sync log succ", K(ret), KPC(this)); } return ret; @@ -691,9 +693,10 @@ OB_INLINE int ObTxCtxLogOperator::replay_out_ctx_() } } - TRANS_LOG(INFO, " replay out ctx", K(ret), KPC(this), K(log_object_ptr_), - K(log_op_arg_.replay_arg_)); - + if (OB_FAIL(ret)) { + TRANS_LOG(INFO, " replay out ctx", K(ret), KPC(this), K(log_object_ptr_), + K(log_op_arg_.replay_arg_)); + } return ret; } @@ -703,7 +706,7 @@ OB_INLINE int ObTxCtxLogOperator::replay_fail_out_ctx_() int ret = OB_SUCCESS; // TODO direct_load_inc // replay data into ddl kv? - TRANS_LOG(INFO, " replay fail out ctx", K(ret)); + TRANS_LOG(DEBUG, " replay fail out ctx", K(ret)); return ret; } @@ -767,8 +770,10 @@ OB_INLINE int ObTxCtxLogOperator::replay_in_ctx_() } } - TRANS_LOG(INFO, " replay in ctx", K(ret), KPC(this), K(log_object_ptr_), - K(log_op_arg_.replay_arg_)); + if (OB_FAIL(ret)) { + TRANS_LOG(INFO, " replay in ctx", K(ret), KPC(this), K(log_object_ptr_), + K(log_op_arg_.replay_arg_)); + } return ret; } diff --git a/src/storage/tx/ob_tx_on_demand_print.cpp b/src/storage/tx/ob_tx_on_demand_print.cpp new file mode 100644 index 000000000..f8627d18d --- /dev/null +++ b/src/storage/tx/ob_tx_on_demand_print.cpp @@ -0,0 +1,130 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "storage/tx/ob_trans_part_ctx.h" + +namespace oceanbase +{ + +namespace transaction +{ + +IMPL_ON_DEMAND_PRINT_FUNC(ObTxSubState) +{ + int ret = OB_SUCCESS; + int tmp_pos = 0; + + // if (flag_.is_valid()) { + ON_DEMAND_START_PRINT(SubState); + + TX_KV_PRINT_WITH_ERR(flag_.info_log_submitted_ > 0, info_log_submitted, + flag_.info_log_submitted_, " "); + TX_KV_PRINT_WITH_ERR(flag_.gts_waiting_ > 0, gts_waiting, flag_.gts_waiting_, " "); + TX_KV_PRINT_WITH_ERR(flag_.state_log_submitting_ > 0, state_log_submitting, + flag_.state_log_submitting_, " "); + TX_KV_PRINT_WITH_ERR(flag_.state_log_submitted_ > 0, state_log_submitted, + flag_.state_log_submitted_, " "); + TX_KV_PRINT_WITH_ERR(flag_.prepare_notify_ > 0, prepare_notify, flag_.prepare_notify_, " "); + TX_KV_PRINT_WITH_ERR(flag_.force_abort_ > 0, force_abort, flag_.force_abort_, " "); + TX_KV_PRINT_WITH_ERR(flag_.transfer_blocking_ > 0, transfer_blocking, flag_.transfer_blocking_, " "); + + ON_DEMAND_END_PRINT(SubState); + // } + return ret; +} + +IMPL_ON_DEMAND_PRINT_FUNC(ObTxExecInfo) +{ + int ret = OB_SUCCESS; + int tmp_pos = 0; + + ON_DEMAND_START_PRINT(ExecInfo); + + TX_KV_PRINT_WITH_ERR(true, downstream_state, state_, ", "); + TX_KV_PRINT_WITH_ERR(true, upstream, upstream_, ", "); + TX_KV_PRINT_WITH_ERR(true, participants,participants_, ", "); + TX_KV_PRINT_WITH_ERR(true, redo_log_no, redo_lsns_.count(), ", "); + TX_KV_PRINT_WITH_ERR(true, scheduler, scheduler_, ", "); + TX_KV_PRINT_WITH_ERR(true, prepare_version, prepare_version_, ", "); + TX_KV_PRINT_WITH_ERR(true, trans_type, trans_type_, ", "); + TX_KV_PRINT_WITH_ERR(true, next_log_entry_no, next_log_entry_no_, ", "); + TX_KV_PRINT_WITH_ERR(true, max_applied_log_ts, max_applied_log_ts_, ", "); + TX_KV_PRINT_WITH_ERR(true, max_appling_log_ts, max_applying_log_ts_, ", "); + TX_KV_PRINT_WITH_ERR(true, max_applying_part_log_no, max_applying_part_log_no_, ", "); + TX_KV_PRINT_WITH_ERR(true, max_submitted_seq_no, max_submitted_seq_no_, ", "); + TX_KV_PRINT_WITH_ERR(true, checksum, checksum_, ", "); + TX_KV_PRINT_WITH_ERR(true, checksum_scn, checksum_scn_, ", "); + TX_KV_PRINT_WITH_ERR(true, need_checksum, need_checksum_, ", "); + TX_KV_PRINT_WITH_ERR(true, data_complete, data_complete_, ", "); + TX_KV_PRINT_WITH_ERR(true, is_dup_tx, is_dup_tx_, ", "); + TX_KV_PRINT_WITH_ERR(true, exec_epoch, exec_epoch_, ", "); + + + TX_KV_PRINT_WITH_ERR(!incremental_participants_.empty(),incremental_participants, incremental_participants_, ", "); + TX_KV_PRINT_WITH_ERR(!intermediate_participants_.empty(), intermediate_participants, intermediate_participants_, ", "); + TX_KV_PRINT_WITH_ERR(prev_record_lsn_.is_valid(), prev_record_lsn, prev_record_lsn_, ", "); + TX_KV_PRINT_WITH_ERR(!redo_lsns_.empty(), redo_lsns, redo_lsns_, ", "); + TX_KV_PRINT_WITH_ERR(!multi_data_source_.empty(), multi_data_source, multi_data_source_, ", "); + TX_KV_PRINT_WITH_ERR(max_durable_lsn_.is_valid(),max_durable_lsn , max_durable_lsn_, ", "); + TX_KV_PRINT_WITH_ERR(!prepare_log_info_arr_.empty(),prepare_log_info_arr , prepare_log_info_arr_, ", "); + TX_KV_PRINT_WITH_ERR(!xid_.empty(), xid , xid_, ", "); + TX_KV_PRINT_WITH_ERR(is_sub2pc_, is_sub2pc , is_sub2pc_, ", "); + TX_KV_PRINT_WITH_ERR(is_transfer_blocking_, is_transfer_blocking , is_transfer_blocking_, ", "); + TX_KV_PRINT_WITH_ERR(!commit_parts_.empty(), commit_parts , commit_parts_, ", "); + TX_KV_PRINT_WITH_ERR(!transfer_parts_.empty(), transfer_parts, transfer_parts_, ", "); + TX_KV_PRINT_WITH_ERR(is_empty_ctx_created_by_transfer_, is_empty_ctx_created_by_transfer, is_empty_ctx_created_by_transfer_, ", "); + TX_KV_PRINT_WITH_ERR(serial_final_scn_.is_valid(), serial_final_scn, serial_final_scn_, ", "); + TX_KV_PRINT_WITH_ERR(serial_final_seq_no_.is_valid(), serial_final_seq_no,serial_final_seq_no_, ", "); + + TX_KV_PRINT_WITH_ERR(!dli_batch_set_.empty(), dli_batch_count, dli_batch_set_.size(), ", "); + TX_KV_PRINT_WITH_ERR(!dli_batch_set_.empty(), dli_batch_set, dli_batch_set_, " "); + + ON_DEMAND_END_PRINT(ExecInfo); + + return ret; +} + +IMPL_ON_DEMAND_PRINT_FUNC(ObPartTransCtx) +{ + int ret = OB_SUCCESS; + int tmp_pos = 0; + + ON_DEMAND_START_PRINT(TxCtxExtra); + + TX_KV_PRINT_WITH_ERR(!busy_cbs_.is_empty(), busy_cbs_cnt, busy_cbs_.get_size(), ", "); + TX_KV_PRINT_WITH_ERR(!busy_cbs_.is_empty(), oldest_busy_cb, busy_cbs_.get_first(), ", "); + TX_KV_PRINT_WITH_ERR(final_log_cb_.is_valid() && !final_log_cb_.is_callbacked(), final_log_cb, + final_log_cb_, ", "); + + TX_PRINT_FUNC_WITH_ERR(sub_state_.is_valid(), sub_state_.on_demand_print_, ", "); + + + TX_KV_PRINT_WITH_ERR(!coord_prepare_info_arr_.empty(), coord_prepare_info_arr_, + coord_prepare_info_arr_, ", "); + + TX_KV_PRINT_WITH_ERR(get_retain_cause() != RetainCause::UNKOWN, retain_cause, retain_cause_, ", "); + + TX_KV_PRINT_WITH_ERR(!state_info_array_.empty(), state_info_array, state_info_array_, ", "); + + // TX_KV_PRINT_WITH_ERR(OB_NOT_NULL(block_frozen_memtable_), block_frozen_memtable, + // block_frozen_memtable_, ", "); + // + TX_PRINT_FUNC_WITH_ERR(true, + exec_info_.on_demand_print_, " "); + ON_DEMAND_END_PRINT(TxCtxExtra); + + return ret; +} + +} // namespace transaction + +} // namespace oceanbase diff --git a/src/storage/tx/ob_tx_on_demand_print.h b/src/storage/tx/ob_tx_on_demand_print.h new file mode 100644 index 000000000..a1b9b450e --- /dev/null +++ b/src/storage/tx/ob_tx_on_demand_print.h @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_TRANSACTION_OB_TX_ON_DEMAND_PRINT_HEADER +#define OCEANBASE_TRANSACTION_OB_TX_ON_DEMAND_PRINT_HEADER + +namespace oceanbase +{ +namespace transaction +{ + +#define IMPL_ON_DEMAND_PRINT_FUNC(ClassName) \ + int ClassName::on_demand_print_(char *buf, const int64_t buf_len, int64_t &pos) const + +#define DECLARE_ON_DEMAND_TO_STRING \ + int on_demand_print_(char *buf, const int64_t buf_len, int64_t &pos) const; + +#define ON_DEMAND_TO_STRING_KV_(args...) DEFINE_ON_DEMAND_TO_STRING_(J_KV(args)) + +#define DEFINE_ON_DEMAND_TO_STRING_(body) \ + DECLARE_ON_DEMAND_TO_STRING \ + DECLARE_TO_STRING_ \ + { \ + int64_t pos = 0; \ + J_OBJ_START(); \ + body; \ + on_demand_print_(buf, buf_len, pos); \ + J_OBJ_END(); \ + return pos; \ + } + +#define DECLARE_TO_STRING_ int64_t to_string_(char *buf, const int64_t buf_len) const +#define OBJ_TO_STR(obj) #obj + +#define ON_DEMAND_START_PRINT(prefix_name) \ + BUF_PRINTF(" " OBJ_TO_STR(-{)) + +#define ON_DEMAND_END_PRINT(postfix_name) \ + BUF_PRINTF(OBJ_TO_STR(}-)) + +#define TX_KV_PRINT_WITH_ERR(print_condition, name, obj, separator) \ + if (print_condition) { \ + int tmp_ret = OB_SUCCESS; \ + int tmp_pos = pos; \ + if (OB_TMP_FAIL(common::databuff_print_json_kv(buf, buf_len, pos, #name, obj))) { \ + (void)common::databuff_print_kv(buf, buf_len, pos, #name, tmp_ret); \ + } \ + BUF_PRINTF(separator); \ + } + +#define TX_PRINT_FUNC_WITH_ERR(print_condition, func, separator) \ + if (print_condition) { \ + int tmp_ret = OB_SUCCESS; \ + int tmp_pos = pos; \ + if (OB_TMP_FAIL(func(buf, buf_len, pos))) { \ + (void)common::databuff_print_kv(buf, buf_len, pos, #func, tmp_ret); \ + } \ + BUF_PRINTF(separator); \ + } +} // namespace transaction +} // namespace oceanbase + +#endif diff --git a/unittest/storage/tx/it/test_register_mds.cpp b/unittest/storage/tx/it/test_register_mds.cpp index 4ad59a8f2..58d07b4ca 100644 --- a/unittest/storage/tx/it/test_register_mds.cpp +++ b/unittest/storage/tx/it/test_register_mds.cpp @@ -14,6 +14,7 @@ #include #define private public #define protected public +#include "storage/tx/ob_committer_define.h" #include "storage/tx/ob_multi_data_source.h" #include "storage/tx/ob_trans_define.h" #include "storage/tx/ob_trans_part_ctx.h"