Print logs for the tx_ctx member as needed

This commit is contained in:
KyrielightWei 2024-06-17 11:10:10 +00:00 committed by ob-robot
parent 6dfa79db04
commit 905a5e35fa
16 changed files with 519 additions and 198 deletions

View File

@ -376,6 +376,7 @@ ob_set_subtarget(ob_storage tx
tx/ob_tx_big_segment_buf.cpp
tx/ob_tx_ls_log_writer.cpp
tx/ob_tx_msg.cpp
tx/ob_tx_on_demand_print.cpp
tx/ob_tx_replay_executor.cpp
tx/ob_dup_table_util.cpp
tx/ob_dup_table_lease.cpp

View File

@ -81,6 +81,40 @@ enum class ObTxState : uint8_t
const int64_t OB_C2PC_UPSTREAM_ID = INT64_MAX - 1;
const int64_t OB_C2PC_SENDER_ID = INT64_MAX - 2;
#define TRX_ENUM_CASE_TO_STR(class_name, src) \
case class_name::src: \
str = #src; \
break;
static const char *to_str_2pc_role(Ob2PCRole role)
{
const char *str = "INVALID";
switch (role) {
TRX_ENUM_CASE_TO_STR(Ob2PCRole, UNKNOWN)
TRX_ENUM_CASE_TO_STR(Ob2PCRole, ROOT)
TRX_ENUM_CASE_TO_STR(Ob2PCRole, INTERNAL)
TRX_ENUM_CASE_TO_STR(Ob2PCRole, LEAF)
};
return str;
}
static const char *to_str_tx_state(ObTxState state)
{
const char *str = "INVALID";
switch (state) {
TRX_ENUM_CASE_TO_STR(ObTxState, UNKNOWN)
TRX_ENUM_CASE_TO_STR(ObTxState, INIT)
TRX_ENUM_CASE_TO_STR(ObTxState, REDO_COMPLETE)
TRX_ENUM_CASE_TO_STR(ObTxState, PREPARE)
TRX_ENUM_CASE_TO_STR(ObTxState, PRE_COMMIT)
TRX_ENUM_CASE_TO_STR(ObTxState, COMMIT)
TRX_ENUM_CASE_TO_STR(ObTxState, ABORT)
TRX_ENUM_CASE_TO_STR(ObTxState, CLEAR)
TRX_ENUM_CASE_TO_STR(ObTxState, MAX)
};
return str;
}
/* // ObITxCommitter provides method to commit the transaction with user provided callbacks. */
/* // The interface need guarantee the atomicity of the transaction. */
/* class ObITxCommitter */

View File

@ -66,6 +66,41 @@ enum class ObTxDataSourceType : int64_t
MAX_TYPE = 100
};
static const char * to_str_mds_type(const ObTxDataSourceType & mds_type )
{
const char * str = "INVALID";
switch(mds_type)
{
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, UNKNOWN);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, MEM_TABLE);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TABLE_LOCK);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, LS_TABLE);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DDL_BARRIER);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DDL_TRANS);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, STANDBY_UPGRADE);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, BEFORE_VERSION_4_1);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST1);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST2);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TEST3);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, CREATE_TABLET_NEW_MDS);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, DELETE_TABLET_NEW_MDS);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, UNBIND_TABLET_NEW_MDS);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_IN);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, FINISH_TRANSFER_OUT);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, FINISH_TRANSFER_IN);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_TASK);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT_PREPARE);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, START_TRANSFER_OUT_V2);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_MOVE_TX_CTX);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, TRANSFER_DEST_PREPARE);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, CHANGE_TABLET_TO_TABLE_MDS);
TRX_ENUM_CASE_TO_STR(ObTxDataSourceType, MAX_TYPE);
}
return str;
}
enum class NotifyType : int64_t
{
@ -78,6 +113,22 @@ enum class NotifyType : int64_t
ON_ABORT = 5
};
static const char * to_str_notify_type(const NotifyType & notify_type)
{
const char * str = "INVALID";
switch(notify_type)
{
TRX_ENUM_CASE_TO_STR(NotifyType, UNKNOWN);
TRX_ENUM_CASE_TO_STR(NotifyType, REGISTER_SUCC);
TRX_ENUM_CASE_TO_STR(NotifyType, ON_REDO);
TRX_ENUM_CASE_TO_STR(NotifyType, TX_END);
TRX_ENUM_CASE_TO_STR(NotifyType, ON_PREPARE);
TRX_ENUM_CASE_TO_STR(NotifyType, ON_COMMIT);
TRX_ENUM_CASE_TO_STR(NotifyType, ON_ABORT);
}
return str;
}
class ObTxBufferNode
{
friend class ObPartTransCtx;
@ -144,7 +195,12 @@ public:
}
storage::mds::BufferCtxNode &get_buffer_ctx_node() const { return buffer_ctx_node_; }
TO_STRING_KV(K(register_no_), K(has_submitted_), K(has_synced_), K_(type), K(data_.length()));
TO_STRING_KV(K(register_no_),
K(has_submitted_),
K(has_synced_),
"type",
to_str_mds_type(type_),
K(data_.length()));
private:
uint64_t register_no_;
@ -261,7 +317,7 @@ public:
const share::ObLSID &get_ls_id() { return ls_id_; }
const ObRegisterMdsFlag &get_register_flag() { return register_flag_; }
TO_STRING_KV(K(mds_str_), K(type_), K(ls_id_), K(register_flag_));
TO_STRING_KV(K(mds_str_), "type_", to_str_mds_type(type_), K(ls_id_), K(register_flag_));
private:
// const char *msd_buf_;

View File

@ -35,6 +35,7 @@
#include "share/rc/ob_context.h"
#include "share/ob_light_hashmap.h"
#include "ob_tx_elr_handler.h"
#include "storage/tx/ob_tx_on_demand_print.h"
namespace oceanbase
{
@ -64,19 +65,6 @@ class KillTransArg;
class ObLSTxCtxMgr;
}
#define TO_STRING_KV_(args...) DEFINE_TO_STRING_(J_KV(args))
#define DEFINE_TO_STRING_(body) DECLARE_TO_STRING_ \
{ \
int64_t pos = 0; \
J_OBJ_START(); \
body; \
J_OBJ_END(); \
return pos; \
}
#define DECLARE_TO_STRING_ int64_t to_string_(char* buf, const int64_t buf_len) const
namespace transaction
{
static inline void protocol_error(const int64_t state, const int64_t msg_type)

View File

@ -152,7 +152,8 @@ void CtxLock::unlock_ctx()
if (lock_start_ts > 0) {
const int64_t lock_ts = ObClockGenerator::getClock() - lock_start_ts;
if (lock_ts > WARN_LOCK_TS) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "ctx lock too much time", K(arg.trans_id_), K(lock_ts), K(lbt()));
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "ctx lock too much time", K(arg.trans_id_),
K(arg.ls_id_), K(lock_ts), K(lbt()));
}
}
after_unlock(arg);

View File

@ -131,6 +131,7 @@ public:
~CtxLockGuard();
void set(CtxLock &lock, uint8_t mode = MODE::ALL);
void reset();
int64_t get_hold_ts() { return hold_ts_; }
int64_t get_lock_acquire_used_time() const
{
return hold_ts_ - request_ts_;

View File

@ -128,7 +128,7 @@ struct ObTxCreateArg
&& trans_expired_time_ > 0
&& NULL != trans_service_;
}
TO_STRING_KV(K_(for_replay), "ctx_source_", to_str(ctx_source_),
TO_STRING_KV(K_(for_replay), "ctx_source", to_str_ctx_source(ctx_source_),
K_(tenant_id), K_(tx_id),
K_(ls_id), K_(cluster_id), K_(cluster_version),
K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service),

View File

@ -697,42 +697,6 @@ bool is_transfer_ctx(PartCtxSource ctx_source)
return PartCtxSource::TRANSFER == ctx_source || PartCtxSource::TRANSFER_RECOVER == ctx_source;
}
const char *to_str(PartCtxSource src)
{
const char *str = "INVALID";
switch (src) {
case PartCtxSource::UNKOWN: {
str = "UNKOWN";
break;
}
case PartCtxSource::MVCC_WRITE: {
str = "MVCC_WRITE";
break;
}
case PartCtxSource::REGISTER_MDS: {
str = "REGISTER_MDS";
break;
}
case PartCtxSource::REPLAY: {
str = "REPLAY";
break;
}
case PartCtxSource::RECOVER: {
str = "RECOVER";
break;
}
case PartCtxSource::TRANSFER: {
str = "TRANSFER";
break;
}
case PartCtxSource::TRANSFER_RECOVER: {
str = "TRANSFER_RECOVER";
break;
}
}
return str;
}
void ObTxExecInfo::reset()
{
state_ = ObTxState::INIT;

View File

@ -32,6 +32,7 @@
#include "storage/tx/ob_trans_result.h"
#include "storage/tx/ob_xa_define.h"
#include "storage/tx/ob_direct_load_tx_ctx_define.h"
#include "storage/tx/ob_tx_on_demand_print.h"
#include "ob_multi_data_source.h"
#include "share/scn.h"
#include "storage/tx/ob_tx_seq.h"
@ -968,96 +969,86 @@ private:
class ObTxSubState
{
public:
ObTxSubState() : flag_(0) {}
ObTxSubState() : flag_() {}
~ObTxSubState() {}
void reset() { flag_ = 0; }
void reset() { flag_.reset(); }
bool is_info_log_submitted() const
{ return flag_ & INFO_LOG_SUBMITTED_BIT; }
void set_info_log_submitted()
{ flag_ |= INFO_LOG_SUBMITTED_BIT; }
void clear_info_log_submitted()
{ flag_ &= ~INFO_LOG_SUBMITTED_BIT; }
bool is_info_log_submitted() const { return flag_.info_log_submitted_; }
void set_info_log_submitted() { flag_.info_log_submitted_ = 1; }
void clear_info_log_submitted() { flag_.info_log_submitted_ = 0; }
bool is_gts_waiting() const
{ return flag_ & GTS_WAITING_BIT; }
void set_gts_waiting()
{ flag_ |= GTS_WAITING_BIT; }
void clear_gts_waiting()
{ flag_ &= ~GTS_WAITING_BIT; }
bool is_gts_waiting() const { return flag_.gts_waiting_; }
void set_gts_waiting() { flag_.gts_waiting_ = 1; }
void clear_gts_waiting() { flag_.gts_waiting_ = 0; }
bool is_state_log_submitting() const
{ return flag_ & STATE_LOG_SUBMITTING_BIT; }
void set_state_log_submitting()
{ flag_ |= STATE_LOG_SUBMITTING_BIT; }
void clear_state_log_submitting()
{ flag_ &= ~STATE_LOG_SUBMITTING_BIT; }
bool is_state_log_submitting() const { return flag_.state_log_submitting_; }
void set_state_log_submitting() { flag_.state_log_submitting_ = 1; }
void clear_state_log_submitting() { flag_.state_log_submitting_ = 0; }
bool is_state_log_submitted() const
{ return flag_ & STATE_LOG_SUBMITTED_BIT; }
void set_state_log_submitted()
{ flag_ |= STATE_LOG_SUBMITTED_BIT; }
void clear_state_log_submitted()
{ flag_ &= ~STATE_LOG_SUBMITTED_BIT; }
bool is_state_log_submitted() const { return flag_.state_log_submitted_; }
void set_state_log_submitted() { flag_.state_log_submitted_ = 1; }
void clear_state_log_submitted() { flag_.state_log_submitted_ = 0; }
bool is_prepare_notified() const
{ return flag_ & PREPARE_NOTIFY_BIT; }
void set_prepare_notified()
{ flag_ |= PREPARE_NOTIFY_BIT; }
void clear_prepare_notified()
{ flag_ &= ~PREPARE_NOTIFY_BIT; }
bool is_prepare_notified() const { return flag_.prepare_notify_; }
void set_prepare_notified() { flag_.prepare_notify_ = 1; }
void clear_prepare_notified() { flag_.prepare_notify_ = 0; }
bool is_force_abort() const
{ return flag_ & FORCE_ABORT_BIT; }
void set_force_abort()
{ flag_ |= FORCE_ABORT_BIT; }
void clear_force_abort()
{ flag_ &= ~FORCE_ABORT_BIT; }
bool is_force_abort() const { return flag_.force_abort_; }
void set_force_abort() { flag_.force_abort_ = 1; }
void clear_force_abort() { flag_.force_abort_ = 0; }
bool is_transfer_blocking() const
{ return flag_ & TRANSFER_BLOCKING_BIT; }
void set_transfer_blocking()
{ flag_ |= TRANSFER_BLOCKING_BIT; }
void clear_transfer_blocking()
{ flag_ &= ~TRANSFER_BLOCKING_BIT; }
bool is_transfer_blocking() const { return flag_.transfer_blocking_; }
void set_transfer_blocking() { flag_.transfer_blocking_ = 1; }
void clear_transfer_blocking() { flag_.transfer_blocking_ = 0; }
// bool is_prepare_log_submitted() const
// { return flag_ & PREPARE_LOG_SUBMITTED_BIT; }
// void set_prepare_log_submitted()
// { flag_ |= PREPARE_LOG_SUBMITTED_BIT; }
DECLARE_ON_DEMAND_TO_STRING
TO_STRING_KV("info_log_submitted",
flag_.info_log_submitted_,
"gts_waiting",
flag_.gts_waiting_,
"state_log_submitting",
flag_.state_log_submitting_,
"state_log_submitted",
flag_.state_log_submitted_,
// "prepare_notify",
// flag_.prepare_notify_,
"force_abort",
flag_.force_abort_,
"transfer_blocking",
flag_.transfer_blocking_);
// bool is_commit_log_submitted() const
// { return flag_ & COMMIT_LOG_SUBMITTED_BIT; }
// void set_commit_log_submitted()
// { return flag_ |= COMMIT_LOG_SUBMITTED_BIT; }
// bool is_abort_log_submitted() const
// { return flag_ & ABORT_LOG_SUBMITTED_BIT; }
// void set_abort_log_submitted()
// { flag_ |= ABORT_LOG_SUBMITTED_BIT; }
// bool is_clear_log_submitted() const
// { return flag_ & CLEAR_LOG_SUBMITTED_BIT; }
// void set_clear_log_submitted()
// { flag_ |= CLEAR_LOG_SUBMITTED_BIT; }
TO_STRING_KV(K_(flag));
bool is_valid() const { return flag_.is_valid(); }
private:
static const int64_t INIT = 0;
static const int64_t INFO_LOG_SUBMITTED_BIT = 1UL << 1;
static const int64_t GTS_WAITING_BIT = 1UL << 2;
// static const int64_t GTS_RECEIVED = 3;
static const int64_t STATE_LOG_SUBMITTING_BIT = 1UL << 3;
static const int64_t STATE_LOG_SUBMITTED_BIT = 1UL << 4;
// static const int64_t PREPARE_LOG_SUBMITTED_BIT = 1UL << 4;
// static const int64_t COMMIT_LOG_SUBMITTED_BIT = 1UL << 5;
// static const int64_t ABORT_LOG_SUBMITTED_BIT = 1UL << 6;
// static const int64_t CLEAR_LOG_SUBMITTED_BIT = 1UL << 7;
// indicate whether notified multi data source to prepare
static const int64_t PREPARE_NOTIFY_BIT = 1UL << 5;
static const int64_t FORCE_ABORT_BIT = 1UL << 6;
static const int64_t TRANSFER_BLOCKING_BIT = 1UL << 7;
private:
int64_t flag_;
struct BitFlag
{
unsigned int info_log_submitted_ : 1;
unsigned int gts_waiting_ : 1;
unsigned int state_log_submitting_ : 1;
unsigned int state_log_submitted_ : 1;
unsigned int prepare_notify_ : 1;
unsigned int force_abort_ : 1;
unsigned int transfer_blocking_ : 1;
void reset()
{
info_log_submitted_ = 0;
gts_waiting_ = 0;
state_log_submitting_ = 0;
state_log_submitted_ = 0;
prepare_notify_ = 0;
force_abort_ = 0;
transfer_blocking_ = 0;
}
bool is_valid() const
{
return info_log_submitted_ > 0 || gts_waiting_ > 0 || state_log_submitted_ > 0
|| state_log_submitting_ > 0 || prepare_notify_ > 0 || force_abort_ > 0
|| transfer_blocking_ > 0;
}
BitFlag() { reset(); }
} flag_;
};
class Ob2PCPrepareState
@ -1649,7 +1640,7 @@ private:
enum class PartCtxSource
{
UNKOWN = 0,
UNKNOWN = 0,
MVCC_WRITE = 1,
REGISTER_MDS = 2,
REPLAY = 3,
@ -1658,9 +1649,24 @@ enum class PartCtxSource
TRANSFER_RECOVER = 6,
};
static const char * to_str_ctx_source(const PartCtxSource & ctx_src)
{
const char * str = "INVALID";
switch(ctx_src)
{
TRX_ENUM_CASE_TO_STR(PartCtxSource, UNKNOWN);
TRX_ENUM_CASE_TO_STR(PartCtxSource, MVCC_WRITE);
TRX_ENUM_CASE_TO_STR(PartCtxSource, REGISTER_MDS);
TRX_ENUM_CASE_TO_STR(PartCtxSource, REPLAY);
TRX_ENUM_CASE_TO_STR(PartCtxSource, RECOVER);
TRX_ENUM_CASE_TO_STR(PartCtxSource, TRANSFER);
TRX_ENUM_CASE_TO_STR(PartCtxSource, TRANSFER_RECOVER);
}
return str;
}
bool is_transfer_ctx(PartCtxSource ctx_source);
const char *to_str(PartCtxSource src);
enum class RetainCause : int16_t
{
@ -1722,6 +1728,8 @@ private:
const ObTxCommitParts &commit_parts);
public:
DECLARE_ON_DEMAND_TO_STRING
DECLARE_TO_STRING {
// const_cast<ObIArray<uint64_t>>(checksum_).set_max_print_count(512);
// const_cast<ObIArray<share::SCN>>(checksum_scn_).set_max_print_count(512);
@ -1760,7 +1768,7 @@ public:
K_(exec_epoch),
K_(serial_final_scn),
K_(serial_final_seq_no),
K_(dli_batch_set));
K(dli_batch_set_.size()));
return pos;
}
ObTxState state_;

View File

@ -368,7 +368,7 @@ void ObPartTransCtx::default_init_()
mds_cache_.reset();
retain_ctx_func_ptr_ = nullptr;
create_ctx_scn_.reset();
ctx_source_ = PartCtxSource::UNKOWN;
ctx_source_ = PartCtxSource::UNKNOWN;
replay_completeness_.reset();
is_submitting_redo_log_for_freeze_ = false;
start_working_log_ts_ = SCN::min_scn();
@ -2211,6 +2211,21 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t cur_ts = ObTimeUtility::current_time();
const int64_t LOG_CB_ON_SUCC_TIME_LIMIT = 100 * 1000;
share::SCN max_cost_cb_scn;
int64_t max_cost_cb_time = 0;
int64_t skip_on_succ_cnt = 0;
int64_t invoke_on_succ_cnt = 0;
int64_t invoke_on_succ_time = 0;
int64_t submit_record_log_time = 0;
int64_t fast_commit_time = 0;
int64_t on_succ_ctx_lock_hold_time = 0;
int64_t try_submit_next_log_cost_time = 0;
int64_t log_sync_used_time = 0;
int64_t ctx_lock_wait_time = 0;
bool handle_fast_commit = false;
bool try_submit_next_log = false;
bool need_return_log_cb = false;
@ -2218,10 +2233,10 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
// allow fill redo concurrently with log callback
CtxLockGuard guard(lock_, is_committing_() ? CtxLockGuard::MODE::ALL : CtxLockGuard::MODE::CTX);
const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts();
log_sync_used_time = cur_ts - log_cb->get_submit_ts();
ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time);
ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1);
const int64_t ctx_lock_wait_time = guard.get_lock_acquire_used_time();
ctx_lock_wait_time = guard.get_lock_acquire_used_time();
if (log_sync_used_time + ctx_lock_wait_time >= ObServerConfig::get_instance().clog_sync_time_warn_threshold) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "transaction log sync use too much time", KPC(log_cb),
K(log_sync_used_time), K(ctx_lock_wait_time));
@ -2236,12 +2251,14 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
#endif
}
if (log_cb->is_callbacked()) {
skip_on_succ_cnt++;
#ifndef NDEBUG
TRANS_LOG(INFO, "cb has been callbacked", KPC(log_cb));
#endif
busy_cbs_.remove(log_cb);
return_log_cb_(log_cb);
} else if (is_exiting_) {
skip_on_succ_cnt++;
// the TxCtx maybe has been killed forcedly by background GC thread
// the log_cb process has been skipped
if (sub_state_.is_force_abort()) {
@ -2264,8 +2281,11 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
// process all preceding log_cbs
for (int64_t i = 0; i < busy_cbs_.get_size(); i++) {
if (cur_cb->is_callbacked()) {
skip_on_succ_cnt++;
// do nothing
} else {
invoke_on_succ_cnt++;
const int64_t before_invoke_ts = ObTimeUtility::fast_current_time();
if (OB_FAIL(on_success_ops_(cur_cb))) {
TRANS_LOG(ERROR, "invoke on_success_ops failed", K(ret), K(*this), K(*cur_cb));
if (OB_SUCCESS == save_ret) {
@ -2279,6 +2299,16 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
}
// ignore ret and set cur_cb callbacked
cur_cb->set_callbacked();
const int64_t after_invoke_ts = ObTimeUtility::fast_current_time();
if (after_invoke_ts - before_invoke_ts > max_cost_cb_time) {
max_cost_cb_time = after_invoke_ts - before_invoke_ts;
max_cost_cb_scn = log_cb->get_log_ts();
}
if (after_invoke_ts - before_invoke_ts > LOG_CB_ON_SUCC_TIME_LIMIT) {
TRANS_LOG(WARN, "invoke on_succ cost too much time", K(ret), K(trans_id_), K(ls_id_), K(cur_cb),
K(log_cb));
}
invoke_on_succ_time += (after_invoke_ts - before_invoke_ts);
}
if (cur_cb == log_cb) {
break;
@ -2297,23 +2327,29 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
// try submit record log under CtxLock
if (need_record_log_()) {
// ignore error
const int64_t before_submit_record_ts = ObTimeUtility::fast_current_time();
if (OB_SUCCESS != (tmp_ret = submit_record_log_())) {
TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this));
}
submit_record_log_time = ObTimeUtility::fast_current_time() - before_submit_record_ts;
}
handle_fast_commit = !(sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0);
try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_();
busy_cbs_.remove(log_cb);
need_return_log_cb = true;
}
on_succ_ctx_lock_hold_time = ObTimeUtility::fast_current_time() - guard.get_hold_ts();
}
// let fast commit out of ctx's lock, because it is time consuming in calculating checksum
if (handle_fast_commit) {
// acquire REDO_FLUSH_READ LOCK, which allow other thread flush redo
// but disable other manage operation on ctx
// FIXME: acquire CTX's READ lock maybe better
const int64_t before_fast_commit_ts = ObTimeUtility::fast_current_time();
CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R);
mt_ctx_.remove_callbacks_for_fast_commit(log_cb->get_callbacks());
fast_commit_time = ObTimeUtility::fast_current_time() - before_fast_commit_ts;
}
if (need_return_log_cb) {
return_log_cb_(log_cb);
@ -2327,6 +2363,14 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
if (OB_SUCCESS != (tmp_ret = ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this))) {
TRANS_LOG(ERROR, "release ctx ref failed", KR(tmp_ret));
}
if (ObTimeUtility::fast_current_time() - cur_ts > LOG_CB_ON_SUCC_TIME_LIMIT) {
TRANS_LOG(WARN, "on_success cost too much time", K(ret), K(trans_id_), K(ls_id_),
K(max_cost_cb_scn), K(max_cost_cb_time), K(skip_on_succ_cnt), K(invoke_on_succ_cnt),
K(invoke_on_succ_time), K(submit_record_log_time), K(fast_commit_time),
K(on_succ_ctx_lock_hold_time), K(try_submit_next_log_cost_time), K(log_sync_used_time),
K(ctx_lock_wait_time));
}
return ret;
}
@ -3998,12 +4042,19 @@ int ObPartTransCtx::submit_direct_load_inc_log_(
TRANS_LOG(WARN, "serialize direct load log failed", K(ret), K(dli_log), K(replay_hint),
KPC(this));
} else if (OB_FAIL(dli_log_op(ObTxLogOpType::SUBMIT))) {
TRANS_LOG(WARN, "try to submit direct load inc log failed", K(ret), KPC(this));
if (ret == OB_TX_NOLOGCB) {
if (REACH_COUNT_PER_SEC(10) && REACH_TIME_INTERVAL(100 * 1000)) {
TRANS_LOG(INFO, "no log cb with dli log", KR(ret), K(dli_log_type), K(batch_key),
KPC(busy_cbs_.get_first()));
}
} else {
TRANS_LOG(WARN, "try to submit direct load inc log failed", K(ret), KPC(this));
}
} else {
scn = dli_log_op.get_scn();
}
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> submit direct load inc log", K(ret), K(get_trans_id()),
TRANS_LOG(DEBUG, "<ObTxDirectLoadIncLog> submit direct load inc log", K(ret), K(get_trans_id()),
K(get_ls_id()), K(dli_log), K(dli_log_type), K(batch_key), K(replay_barrier),
K(replay_hint), K(scn));
return ret;
@ -4048,18 +4099,22 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
const int64_t start_ts = ObTimeUtility::fast_current_time();
if (exec_info_.dli_batch_set_.size() > 0) {
int64_t need_ddl_end_count = 0;
int64_t compensate_ddl_end_fail_cnt = 0;
const int64_t MAX_COMPENSATE_FAIL_CNT = 100;
ObTxDirectLoadBatchKeyArray unused_batch_key_array;
for (ObDLIBatchSet::iterator iter = exec_info_.dli_batch_set_.begin(); iter != exec_info_.dli_batch_set_.end();
iter++) {
for (ObDLIBatchSet::iterator iter = exec_info_.dli_batch_set_.begin();
iter != exec_info_.dli_batch_set_.end(); iter++) {
if (iter->first.need_compensate_ddl_end()) {
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> need compensate ddl end log", K(ret), K(tmp_ret),
TRANS_LOG(DEBUG, "<ObTxDirectLoadIncLog> need compensate ddl end log", K(ret), K(tmp_ret),
K(submit_log_type), K(trans_id_), K(ls_id_), K(iter->first));
if (ObTxLogType::TX_ABORT_LOG == submit_log_type
|| ObTxLogType::TX_COMMIT_INFO_LOG == submit_log_type) {
if (compensate_ddl_end_fail_cnt <= MAX_COMPENSATE_FAIL_CNT
&& (ObTxLogType::TX_ABORT_LOG == submit_log_type
|| ObTxLogType::TX_COMMIT_INFO_LOG == submit_log_type)) {
ObDDLIncCommitLog inc_commit_log;
ObDDLIncCommitClogCb *extra_cb = static_cast<ObDDLIncCommitClogCb *>(
mtl_malloc(sizeof(ObDDLIncCommitClogCb), "TxExtraCb"));
@ -4076,7 +4131,8 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type)
if (OB_TMP_FAIL(ret)) {
// do nothing
} else if (OB_TMP_FAIL(extra_cb->init(ls_id_, iter->first.get_batch_key()))) {
TRANS_LOG(WARN, "init extra cb failed", K(ret), K(iter->first), K(trans_id_), K(ls_id_));
TRANS_LOG(WARN, "init extra cb failed", K(ret), K(iter->first), K(trans_id_),
K(ls_id_));
extra_cb->~ObDDLIncCommitClogCb();
mtl_free(extra_cb);
} else if (OB_TMP_FAIL(inc_commit_log.init(iter->first.get_batch_key()))) {
@ -4084,23 +4140,29 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type)
K(trans_id_), K(ls_id_));
extra_cb->~ObDDLIncCommitClogCb();
mtl_free(extra_cb);
} else if (OB_TMP_FAIL(
submit_direct_load_inc_commit_log(inc_commit_log, extra_cb, submitted_scn, true))) {
TRANS_LOG(WARN, "submit direct load inc commit log failed", K(ret), K(inc_commit_log),
KPC(extra_cb), K(submitted_scn), K(trans_id_), K(ls_id_));
} else if (OB_TMP_FAIL(submit_direct_load_inc_commit_log(inc_commit_log, extra_cb,
submitted_scn, true))) {
if (tmp_ret != OB_TX_NOLOGCB) {
TRANS_LOG(WARN, "submit direct load inc commit log failed", K(ret), K(inc_commit_log),
KPC(extra_cb), K(submitted_scn), K(trans_id_), K(ls_id_));
}
extra_cb->~ObDDLIncCommitClogCb();
mtl_free(extra_cb);
} else {
}
}
if (OB_TMP_FAIL(tmp_ret)) {
compensate_ddl_end_fail_cnt += 1;
}
if (iter->first.need_compensate_ddl_end()) {
need_ddl_end_count += 1;
}
} else if (iter->first.is_ddl_start_logging()) {
need_ddl_end_count += 1;
TRANS_LOG(INFO, "the DDL inc start is logging", K(ret), K(tmp_ret), K(iter->first), K(trans_id_),
K(ls_id_));
TRANS_LOG(INFO, "the DDL inc start is logging", K(ret), K(tmp_ret), K(iter->first),
K(trans_id_), K(ls_id_));
} else if (iter->first.is_ddl_end_logging()) {
TRANS_LOG(INFO, "the DDL inc end is logging", K(ret), K(tmp_ret), K(iter->first),
K(trans_id_), K(ls_id_));
@ -4125,16 +4187,17 @@ int ObPartTransCtx::check_dli_batch_completed_(ObTxLogType submit_log_type)
&& (get_downstream_state() >= ObTxState::REDO_COMPLETE
|| submit_log_type != ObTxLogType::TX_CLEAR_LOG)) {
ret = OB_TRANS_NEED_ROLLBACK;
TRANS_LOG(ERROR, "<ObTxDirectLoadIncLog> incompleted direct load inc batch info with state log",
TRANS_LOG(ERROR,
"<ObTxDirectLoadIncLog> incompleted direct load inc batch info with state log",
K(ret), K(tmp_ret), K(submit_log_type), K(trans_id_), K(ls_id_),
K(need_ddl_end_count));
K(need_ddl_end_count), K(ObTimeUtility::fast_current_time() - start_ts));
} else {
ret = OB_TRANS_CANNOT_BE_KILLED;
TRANS_LOG(WARN,
"<ObTxDirectLoadIncLog> The trx can not be finished because of a incompleted "
"direct load inc batch info",
K(ret), K(tmp_ret), K(submit_log_type), K(trans_id_), K(ls_id_),
K(need_ddl_end_count));
K(need_ddl_end_count), K(ObTimeUtility::fast_current_time() - start_ts));
}
}
}
@ -7541,8 +7604,9 @@ int ObPartTransCtx::notify_data_source_(const NotifyType notify_type,
TRANS_LOG(WARN, "notify data source failed", K(ret), K(arg));
}
if (notify_array.count() > 0) {
TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), K(notify_type), K(log_ts),
K(notify_array.count()), K(notify_array), K(total_time));
TRANS_LOG(INFO, "notify MDS", K(ret), K(trans_id_), K(ls_id_), "notify_type",
to_str_notify_type(notify_type), K(log_ts), K(notify_array.count()), K(notify_array),
K(total_time));
}
}
return ret;

View File

@ -287,36 +287,32 @@ public:
int64_t to_string(char* buf, const int64_t buf_len) const;
private:
// thread unsafe
TO_STRING_KV_(K_(ls_id),
K_(session_id),
K_(part_trans_action),
K_(pending_write),
K_(exec_info),
K_(sub_state),
K(is_leaf()),
K(is_root()),
K(busy_cbs_.get_size()),
K(final_log_cb_),
K(ctx_tx_data_),
K(role_state_),
K(create_ctx_scn_),
"ctx_source", ctx_source_,
K(epoch_),
K(replay_completeness_),
K(mt_ctx_),
K(coord_prepare_info_arr_),
K_(upstream_state),
K_(retain_cause),
"2pc_role",
get_2pc_role(),
K_(collected),
K_(rec_log_ts),
K_(prev_rec_log_ts),
K_(lastest_snapshot),
K_(state_info_array),
K_(last_request_ts),
KP_(block_frozen_memtable),
K_(max_2pc_commit_scn));
ON_DEMAND_TO_STRING_KV_("self_ls_id",
ls_id_,
K_(session_id),
K_(part_trans_action),
K_(pending_write),
"2pc_role",
to_str_2pc_role(get_2pc_role()),
K(ctx_tx_data_),
K(role_state_),
K(create_ctx_scn_),
"ctx_source",
ctx_source_,
K(epoch_),
K(replay_completeness_),
"upstream_state",
to_str_tx_state(upstream_state_),
K_(collected),
K_(rec_log_ts),
K_(prev_rec_log_ts),
K_(lastest_snapshot),
K_(last_request_ts),
KP_(block_frozen_memtable),
K_(max_2pc_commit_scn),
K(mt_ctx_));
public:
static const int64_t OP_LOCAL_NUM = 16;
static const int64_t RESERVED_MEM_SIZE = 256;
@ -508,7 +504,7 @@ public:
ATOMIC_CAS(&retain_cause_, static_cast<int16_t>(RetainCause::UNKOWN),
static_cast<int16_t>(cause));
}
RetainCause get_retain_cause() { return static_cast<RetainCause>(ATOMIC_LOAD(&retain_cause_)); };
RetainCause get_retain_cause() const { return static_cast<RetainCause>(ATOMIC_LOAD(&retain_cause_)); };
int del_retain_ctx();

View File

@ -1179,7 +1179,7 @@ int ObTxMultiDataSourceLog::ob_admin_dump(ObAdminMutatorStringArg &arg)
arg.writer_ptr_->start_object();
for (int64_t i = 0; i < data_.count(); i++) {
arg.writer_ptr_->dump_key("type");
arg.writer_ptr_->dump_string(to_cstring(static_cast<int64_t>(data_[i].get_data_source_type())));
arg.writer_ptr_->dump_string(to_str_mds_type(data_[i].get_data_source_type()));
arg.writer_ptr_->dump_key("buf_len");
arg.writer_ptr_->dump_string(to_cstring(data_[i].get_data_size()));
arg.writer_ptr_->dump_key("content");

View File

@ -435,7 +435,9 @@ OB_INLINE int ObTxCtxLogOperator<T>::operator()(const ObTxLogOpType op_type)
if (OB_FAIL(prepare_special_resource_())) {
TRANS_LOG(WARN, "prepare special resource failed", K(ret), K(T::LOG_TYPE), KPC(this));
} else if (OB_FAIL(prepare_generic_resource_())) {
TRANS_LOG(WARN, "prepare generic resource failed", K(ret), K(T::LOG_TYPE), KPC(this));
if (OB_TX_NOLOGCB != ret) {
TRANS_LOG(WARN, "prepare generic resource failed", K(ret), K(T::LOG_TYPE), KPC(this));
}
} else if (OB_FAIL(construct_log_object_())) {
TRANS_LOG(WARN, "construct log object failed", K(ret), K(T::LOG_TYPE), KPC(this));
} else if (OB_FAIL(insert_into_log_block_())) {
@ -560,7 +562,7 @@ OB_INLINE void ObTxCtxLogOperator<ObTxDirectLoadIncLog>::after_submit_log_succ_(
log_op_arg_.submit_arg_.log_cb_->set_ddl_batch_key(construct_arg_->batch_key_);
log_op_arg_.submit_arg_.log_cb_->get_extra_cb()->__set_scn(scn_);
}
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> after submit log succ", K(ret), KPC(this));
TRANS_LOG(DEBUG, "<ObTxDirectLoadIncLog> after submit log succ", K(ret), KPC(this));
}
template <>
@ -590,7 +592,7 @@ OB_INLINE int ObTxCtxLogOperator<ObTxDirectLoadIncLog>::log_sync_succ_()
TRANS_LOG(WARN, "invoke the on_success of a extra_cb_ failed", K(ret),
KPC(log_op_arg_.submit_arg_.log_cb_));
} else {
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> sync log succ", K(ret), KPC(this));
TRANS_LOG(DEBUG, "<ObTxDirectLoadIncLog> sync log succ", K(ret), KPC(this));
}
return ret;
@ -691,9 +693,10 @@ OB_INLINE int ObTxCtxLogOperator<ObTxDirectLoadIncLog>::replay_out_ctx_()
}
}
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> replay out ctx", K(ret), KPC(this), K(log_object_ptr_),
K(log_op_arg_.replay_arg_));
if (OB_FAIL(ret)) {
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> replay out ctx", K(ret), KPC(this), K(log_object_ptr_),
K(log_op_arg_.replay_arg_));
}
return ret;
}
@ -703,7 +706,7 @@ OB_INLINE int ObTxCtxLogOperator<ObTxDirectLoadIncLog>::replay_fail_out_ctx_()
int ret = OB_SUCCESS;
// TODO direct_load_inc
// replay data into ddl kv?
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> replay fail out ctx", K(ret));
TRANS_LOG(DEBUG, "<ObTxDirectLoadIncLog> replay fail out ctx", K(ret));
return ret;
}
@ -767,8 +770,10 @@ OB_INLINE int ObTxCtxLogOperator<ObTxDirectLoadIncLog>::replay_in_ctx_()
}
}
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> replay in ctx", K(ret), KPC(this), K(log_object_ptr_),
K(log_op_arg_.replay_arg_));
if (OB_FAIL(ret)) {
TRANS_LOG(INFO, "<ObTxDirectLoadIncLog> replay in ctx", K(ret), KPC(this), K(log_object_ptr_),
K(log_op_arg_.replay_arg_));
}
return ret;
}

View File

@ -0,0 +1,130 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "storage/tx/ob_trans_part_ctx.h"
namespace oceanbase
{
namespace transaction
{
IMPL_ON_DEMAND_PRINT_FUNC(ObTxSubState)
{
int ret = OB_SUCCESS;
int tmp_pos = 0;
// if (flag_.is_valid()) {
ON_DEMAND_START_PRINT(SubState);
TX_KV_PRINT_WITH_ERR(flag_.info_log_submitted_ > 0, info_log_submitted,
flag_.info_log_submitted_, " ");
TX_KV_PRINT_WITH_ERR(flag_.gts_waiting_ > 0, gts_waiting, flag_.gts_waiting_, " ");
TX_KV_PRINT_WITH_ERR(flag_.state_log_submitting_ > 0, state_log_submitting,
flag_.state_log_submitting_, " ");
TX_KV_PRINT_WITH_ERR(flag_.state_log_submitted_ > 0, state_log_submitted,
flag_.state_log_submitted_, " ");
TX_KV_PRINT_WITH_ERR(flag_.prepare_notify_ > 0, prepare_notify, flag_.prepare_notify_, " ");
TX_KV_PRINT_WITH_ERR(flag_.force_abort_ > 0, force_abort, flag_.force_abort_, " ");
TX_KV_PRINT_WITH_ERR(flag_.transfer_blocking_ > 0, transfer_blocking, flag_.transfer_blocking_, " ");
ON_DEMAND_END_PRINT(SubState);
// }
return ret;
}
IMPL_ON_DEMAND_PRINT_FUNC(ObTxExecInfo)
{
int ret = OB_SUCCESS;
int tmp_pos = 0;
ON_DEMAND_START_PRINT(ExecInfo);
TX_KV_PRINT_WITH_ERR(true, downstream_state, state_, ", ");
TX_KV_PRINT_WITH_ERR(true, upstream, upstream_, ", ");
TX_KV_PRINT_WITH_ERR(true, participants,participants_, ", ");
TX_KV_PRINT_WITH_ERR(true, redo_log_no, redo_lsns_.count(), ", ");
TX_KV_PRINT_WITH_ERR(true, scheduler, scheduler_, ", ");
TX_KV_PRINT_WITH_ERR(true, prepare_version, prepare_version_, ", ");
TX_KV_PRINT_WITH_ERR(true, trans_type, trans_type_, ", ");
TX_KV_PRINT_WITH_ERR(true, next_log_entry_no, next_log_entry_no_, ", ");
TX_KV_PRINT_WITH_ERR(true, max_applied_log_ts, max_applied_log_ts_, ", ");
TX_KV_PRINT_WITH_ERR(true, max_appling_log_ts, max_applying_log_ts_, ", ");
TX_KV_PRINT_WITH_ERR(true, max_applying_part_log_no, max_applying_part_log_no_, ", ");
TX_KV_PRINT_WITH_ERR(true, max_submitted_seq_no, max_submitted_seq_no_, ", ");
TX_KV_PRINT_WITH_ERR(true, checksum, checksum_, ", ");
TX_KV_PRINT_WITH_ERR(true, checksum_scn, checksum_scn_, ", ");
TX_KV_PRINT_WITH_ERR(true, need_checksum, need_checksum_, ", ");
TX_KV_PRINT_WITH_ERR(true, data_complete, data_complete_, ", ");
TX_KV_PRINT_WITH_ERR(true, is_dup_tx, is_dup_tx_, ", ");
TX_KV_PRINT_WITH_ERR(true, exec_epoch, exec_epoch_, ", ");
TX_KV_PRINT_WITH_ERR(!incremental_participants_.empty(),incremental_participants, incremental_participants_, ", ");
TX_KV_PRINT_WITH_ERR(!intermediate_participants_.empty(), intermediate_participants, intermediate_participants_, ", ");
TX_KV_PRINT_WITH_ERR(prev_record_lsn_.is_valid(), prev_record_lsn, prev_record_lsn_, ", ");
TX_KV_PRINT_WITH_ERR(!redo_lsns_.empty(), redo_lsns, redo_lsns_, ", ");
TX_KV_PRINT_WITH_ERR(!multi_data_source_.empty(), multi_data_source, multi_data_source_, ", ");
TX_KV_PRINT_WITH_ERR(max_durable_lsn_.is_valid(),max_durable_lsn , max_durable_lsn_, ", ");
TX_KV_PRINT_WITH_ERR(!prepare_log_info_arr_.empty(),prepare_log_info_arr , prepare_log_info_arr_, ", ");
TX_KV_PRINT_WITH_ERR(!xid_.empty(), xid , xid_, ", ");
TX_KV_PRINT_WITH_ERR(is_sub2pc_, is_sub2pc , is_sub2pc_, ", ");
TX_KV_PRINT_WITH_ERR(is_transfer_blocking_, is_transfer_blocking , is_transfer_blocking_, ", ");
TX_KV_PRINT_WITH_ERR(!commit_parts_.empty(), commit_parts , commit_parts_, ", ");
TX_KV_PRINT_WITH_ERR(!transfer_parts_.empty(), transfer_parts, transfer_parts_, ", ");
TX_KV_PRINT_WITH_ERR(is_empty_ctx_created_by_transfer_, is_empty_ctx_created_by_transfer, is_empty_ctx_created_by_transfer_, ", ");
TX_KV_PRINT_WITH_ERR(serial_final_scn_.is_valid(), serial_final_scn, serial_final_scn_, ", ");
TX_KV_PRINT_WITH_ERR(serial_final_seq_no_.is_valid(), serial_final_seq_no,serial_final_seq_no_, ", ");
TX_KV_PRINT_WITH_ERR(!dli_batch_set_.empty(), dli_batch_count, dli_batch_set_.size(), ", ");
TX_KV_PRINT_WITH_ERR(!dli_batch_set_.empty(), dli_batch_set, dli_batch_set_, " ");
ON_DEMAND_END_PRINT(ExecInfo);
return ret;
}
IMPL_ON_DEMAND_PRINT_FUNC(ObPartTransCtx)
{
int ret = OB_SUCCESS;
int tmp_pos = 0;
ON_DEMAND_START_PRINT(TxCtxExtra);
TX_KV_PRINT_WITH_ERR(!busy_cbs_.is_empty(), busy_cbs_cnt, busy_cbs_.get_size(), ", ");
TX_KV_PRINT_WITH_ERR(!busy_cbs_.is_empty(), oldest_busy_cb, busy_cbs_.get_first(), ", ");
TX_KV_PRINT_WITH_ERR(final_log_cb_.is_valid() && !final_log_cb_.is_callbacked(), final_log_cb,
final_log_cb_, ", ");
TX_PRINT_FUNC_WITH_ERR(sub_state_.is_valid(), sub_state_.on_demand_print_, ", ");
TX_KV_PRINT_WITH_ERR(!coord_prepare_info_arr_.empty(), coord_prepare_info_arr_,
coord_prepare_info_arr_, ", ");
TX_KV_PRINT_WITH_ERR(get_retain_cause() != RetainCause::UNKOWN, retain_cause, retain_cause_, ", ");
TX_KV_PRINT_WITH_ERR(!state_info_array_.empty(), state_info_array, state_info_array_, ", ");
// TX_KV_PRINT_WITH_ERR(OB_NOT_NULL(block_frozen_memtable_), block_frozen_memtable,
// block_frozen_memtable_, ", ");
//
TX_PRINT_FUNC_WITH_ERR(true,
exec_info_.on_demand_print_, " ");
ON_DEMAND_END_PRINT(TxCtxExtra);
return ret;
}
} // namespace transaction
} // namespace oceanbase

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_TRANSACTION_OB_TX_ON_DEMAND_PRINT_HEADER
#define OCEANBASE_TRANSACTION_OB_TX_ON_DEMAND_PRINT_HEADER
namespace oceanbase
{
namespace transaction
{
#define IMPL_ON_DEMAND_PRINT_FUNC(ClassName) \
int ClassName::on_demand_print_(char *buf, const int64_t buf_len, int64_t &pos) const
#define DECLARE_ON_DEMAND_TO_STRING \
int on_demand_print_(char *buf, const int64_t buf_len, int64_t &pos) const;
#define ON_DEMAND_TO_STRING_KV_(args...) DEFINE_ON_DEMAND_TO_STRING_(J_KV(args))
#define DEFINE_ON_DEMAND_TO_STRING_(body) \
DECLARE_ON_DEMAND_TO_STRING \
DECLARE_TO_STRING_ \
{ \
int64_t pos = 0; \
J_OBJ_START(); \
body; \
on_demand_print_(buf, buf_len, pos); \
J_OBJ_END(); \
return pos; \
}
#define DECLARE_TO_STRING_ int64_t to_string_(char *buf, const int64_t buf_len) const
#define OBJ_TO_STR(obj) #obj
#define ON_DEMAND_START_PRINT(prefix_name) \
BUF_PRINTF(" " OBJ_TO_STR(<prefix_name>-{))
#define ON_DEMAND_END_PRINT(postfix_name) \
BUF_PRINTF(OBJ_TO_STR(}-<postfix_name>))
#define TX_KV_PRINT_WITH_ERR(print_condition, name, obj, separator) \
if (print_condition) { \
int tmp_ret = OB_SUCCESS; \
int tmp_pos = pos; \
if (OB_TMP_FAIL(common::databuff_print_json_kv(buf, buf_len, pos, #name, obj))) { \
(void)common::databuff_print_kv(buf, buf_len, pos, #name, tmp_ret); \
} \
BUF_PRINTF(separator); \
}
#define TX_PRINT_FUNC_WITH_ERR(print_condition, func, separator) \
if (print_condition) { \
int tmp_ret = OB_SUCCESS; \
int tmp_pos = pos; \
if (OB_TMP_FAIL(func(buf, buf_len, pos))) { \
(void)common::databuff_print_kv(buf, buf_len, pos, #func, tmp_ret); \
} \
BUF_PRINTF(separator); \
}
} // namespace transaction
} // namespace oceanbase
#endif

View File

@ -14,6 +14,7 @@
#include <thread>
#define private public
#define protected public
#include "storage/tx/ob_committer_define.h"
#include "storage/tx/ob_multi_data_source.h"
#include "storage/tx/ob_trans_define.h"
#include "storage/tx/ob_trans_part_ctx.h"