diff --git a/deps/oblib/src/lib/ob_name_id_def.h b/deps/oblib/src/lib/ob_name_id_def.h index e0818b3ac2..0640a0edf1 100644 --- a/deps/oblib/src/lib/ob_name_id_def.h +++ b/deps/oblib/src/lib/ob_name_id_def.h @@ -229,7 +229,7 @@ DEF_NAME(id, "id") DEF_NAME(submit_log_cb, "submit log callback") DEF_NAME(submit_commit, "submit commit") DEF_NAME(submit_abort, "submit abort") - DEF_NAME(update_trans_version, "update transaction version") + DEF_NAME(push_max_commit_version, "push max commit version") DEF_NAME(prepare, "prepare") DEF_NAME(ctx_ref, "transaction context reference") DEF_NAME(on_submit_log_succ_cb, "on submit log succ cb") diff --git a/src/storage/memtable/ob_memtable_context.cpp b/src/storage/memtable/ob_memtable_context.cpp index 63fc9c9289..9e203a2c54 100644 --- a/src/storage/memtable/ob_memtable_context.cpp +++ b/src/storage/memtable/ob_memtable_context.cpp @@ -247,16 +247,16 @@ int ObMemtableCtx::write_auth(const bool exclusive) do { if (ATOMIC_LOAD(&is_read_only_)) { ret = OB_ERR_READ_ONLY_TRANSACTION; - TRANS_LOG(ERROR, "WriteAuth fail: readonly trans not support update operation", - "trans_id", NULL == ctx_ ? "" : S(ctx_->get_trans_id()), K(ret)); + TRANS_LOG(ERROR, "WriteAuth: readonly trans not support update operation", + "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K(ret)); } else if (OB_SUCCESS != ATOMIC_LOAD(&end_code_)) { ret = get_trans_status_retcode(); TRANS_LOG(WARN, "WriteAuth: trans is already end", K(ret), - "trans_id", NULL == ctx_ ? "" : S(ctx_->get_trans_id()), K_(end_code)); + "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K_(end_code)); } else if (!ATOMIC_LOAD(&is_master_)) { ret = OB_NOT_MASTER; TRANS_LOG(WARN, "WriteAuth: trans is already not master", - "trans_id", NULL == ctx_ ? "" : S(ctx_->get_trans_id()), K(ret)); + "trans_id", ctx_->get_trans_id(), "ls_id", ctx_->get_ls_id(), K(ret)); } else if (lock_succ) { // all check passed after lock succ break; diff --git a/src/storage/tx/ob_trans_ctx.cpp b/src/storage/tx/ob_trans_ctx.cpp index a33519cd39..43f8340219 100644 --- a/src/storage/tx/ob_trans_ctx.cpp +++ b/src/storage/tx/ob_trans_ctx.cpp @@ -132,9 +132,9 @@ void ObTransCtx::print_trace_log_if_necessary_() } if (is_slow_query_()) { - static ObMiniStat::ObStatItem item("slow trans statistics", 60 * 1000 * 1000); + static ObMiniStat::ObStatItem item("long trans statistics", 60 * 1000 * 1000); ObMiniStat::stat(item); - FORCE_PRINT_TRACE(tlog_, "[slow trans] "); + FORCE_PRINT_TRACE(tlog_, "[long trans] "); } else if (OB_UNLIKELY(trans_id_ % 128 == 1)) { FORCE_PRINT_TRACE(tlog_, "[trans sampling] "); } else { @@ -197,7 +197,15 @@ ObITsMgr *ObTransCtx::get_ts_mgr_() return trans_service_->get_ts_mgr(); } -int ObTransCtx::defer_commit_callback_(const int retcode, const int64_t commit_version) + +bool ObTransCtx::has_callback_scheduler_() +{ + return !commit_cb_.is_enabled() // callback scheduler has accomplished by others + || commit_cb_.is_inited(); // callback has been defered +} + +// callback scheduler commit result +int ObTransCtx::defer_callback_scheduler_(const int retcode, const int64_t commit_version) { int ret = OB_SUCCESS; if (!commit_cb_.is_enabled()) { @@ -250,9 +258,10 @@ int ObTransCtx::register_timeout_task_(const int64_t interval_us) (void)ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this); } } - REC_TRANS_TRACE_EXT2(tlog_, register_timeout_task, OB_ID(ret), ret, - OB_ID(ctx_ref), get_ref()); - + if (OB_FAIL(ret)) { + REC_TRANS_TRACE_EXT2(tlog_, register_timeout_task, OB_ID(ret), ret, + OB_ID(ctx_ref), get_ref()); + } return ret; } @@ -277,9 +286,10 @@ int ObTransCtx::unregister_timeout_task_() (void)ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this); } } - REC_TRANS_TRACE_EXT2(tlog_, unregister_timeout_task, OB_ID(ret), ret, - OB_ID(ctx_ref), get_ref()); - + if (OB_FAIL(ret)) { + REC_TRANS_TRACE_EXT2(tlog_, unregister_timeout_task, OB_ID(ret), ret, + OB_ID(ctx_ref), get_ref()); + } return ret; } diff --git a/src/storage/tx/ob_trans_ctx.h b/src/storage/tx/ob_trans_ctx.h index abb4d9dc53..4510c5072c 100644 --- a/src/storage/tx/ob_trans_ctx.h +++ b/src/storage/tx/ob_trans_ctx.h @@ -115,7 +115,7 @@ public: void get_ctx_guard(CtxLockGuard &guard); void print_trace_log(); // ATTENTION! There is no lock protect - bool is_too_slow_transaction() const + bool is_too_long_transaction() const { return ObClockGenerator::getRealClock() >= ctx_create_time_ + OB_TRANS_WARN_USE_TIME; } bool is_readonly() const { return false; } void set_for_replay(const bool for_replay) { for_replay_ = for_replay; } @@ -196,7 +196,8 @@ protected: void set_stc_by_now_(); MonotonicTs get_stc_(); ObITsMgr *get_ts_mgr_(); - int defer_commit_callback_(const int ret, const int64_t commit_version); + bool has_callback_scheduler_(); + int defer_callback_scheduler_(const int ret, const int64_t commit_version); int64_t get_remaining_wait_interval_us_() { return trans_need_wait_wrap_.get_remaining_wait_interval_us(); diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index 4646605cfe..b7e8c177f9 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -717,7 +717,7 @@ public: if (ObTxState::INIT < tx_ctx->exec_info_.state_) { has_decided = true; } - if (tx_ctx->is_too_slow_transaction()) { + if (tx_ctx->is_too_long_transaction()) { // If the transaction has not completed in 600 seconds, print its trace log tx_ctx->print_trace_log(); } diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 9b86669319..e1f7b670fe 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -635,6 +635,7 @@ int ObPartTransCtx::commit(const ObLSArray &parts, case ObTxState::ABORT: ret = OB_TRANS_KILLED; break; + case ObTxState::PRE_COMMIT: case ObTxState::COMMIT: case ObTxState::CLEAR: ret = OB_TRANS_COMMITED; @@ -866,7 +867,7 @@ int ObPartTransCtx::update_publish_version_(const int64_t publish_version, const TRANS_LOG(WARN, "set commit version failed", K(ret)); } else { trans_service_->get_tx_version_mgr().update_max_commit_ts(publish_version, false); - REC_TRANS_TRACE_EXT2(tlog_, update_trans_version, OB_ID(trans_version), publish_version, + REC_TRANS_TRACE_EXT2(tlog_, push_max_commit_version, OB_ID(trans_version), publish_version, OB_ID(ctx_ref), get_ref()); } @@ -1846,7 +1847,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb) } REC_TRANS_TRACE_EXT(tlog_, log_sync_succ_cb, OB_ID(ret), ret, - Y(log_type), + OB_ID(log_type), (void*)log_type, OB_ID(t), log_ts, OB_ID(offset), log_lsn, OB_ID(ctx_ref), get_ref()); @@ -1974,7 +1975,7 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb) } if (need_callback_scheduler_()) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(defer_commit_callback_(OB_TRANS_KILLED, -1))) { + if (OB_TMP_FAIL(defer_callback_scheduler_(OB_TRANS_KILLED, -1))) { TRANS_LOG(WARN, "notify scheduler txn killed fail", K(tmp_ret), K_(trans_id)); } else { commit_cb_.disable(); @@ -1985,7 +1986,7 @@ int ObPartTransCtx::on_failure(ObTxLogCb *log_cb) } REC_TRANS_TRACE_EXT(tlog_, on_fail_cb, OB_ID(ret), ret, - Y(log_type), + OB_ID(log_type), (void*)log_type, OB_ID(t), log_ts, OB_ID(ctx_ref), get_ref()); TRANS_LOG(INFO, "ObPartTransCtx::on_failure end", KR(ret), K(*this), KPC(log_cb)); diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index a17e40fb98..f3dd76b555 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1047,13 +1047,13 @@ int ObTransService::acquire_tx_ctx(const share::ObLSID &ls_id, const ObTxDesc &t TX_PARTS_CONTAIN_(tx.parts_, id_, ls_id, exist); if (exist) { if (OB_FAIL(get_tx_ctx_(ls_id, ls, tx.tx_id_, ctx))) { - TRANS_LOG(WARN, "get tx ctx fail", K(ret), K(tx)); + TRANS_LOG(WARN, "get tx ctx fail", K(ret), K(ls_id), K(tx)); if (ret == OB_TRANS_CTX_NOT_EXIST) { - TRANS_LOG(WARN, "participant lost update", K_(tx.tx_id)); + TRANS_LOG(WARN, "participant lost update", K(ls_id), K_(tx.tx_id)); } } } else if (OB_FAIL(create_tx_ctx_(ls_id, ls, tx, ctx))) { - TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(tx)); + TRANS_LOG(WARN, "create tx ctx fail", K(ret), K(ls_id), K(tx)); } TRANS_LOG(TRACE, "acquire tx ctx", K(ret), K(*this), K(ls_id), K(tx), KP(ctx)); return ret; diff --git a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp index cbba00ef08..0246d237ff 100644 --- a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp +++ b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp @@ -127,7 +127,7 @@ int ObPartTransCtx::do_pre_commit(bool &need_wait) } } - if (OB_FAIL(update_local_max_commit_version_(ctx_tx_data_.get_commit_version()))) { + if (!need_wait && OB_FAIL(update_local_max_commit_version_(ctx_tx_data_.get_commit_version()))) { TRANS_LOG(ERROR, "update publish version failed", KR(ret), KPC(this)); } if (OB_SUCC(ret) && OB_FAIL(restart_2pc_trans_timer_())) { diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index d08d47deca..19cd3e8152 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -846,18 +846,20 @@ ObTwoPhaseCommitMsgType ObPartTransCtx::switch_msg_type_(const int16_t msg_type) int ObPartTransCtx::post_tx_commit_resp_(const int status) { int ret = OB_SUCCESS; + bool has_skip = false, use_rpc = true; const auto commit_version = ctx_tx_data_.get_commit_version(); // scheduler on this server, direct call if (exec_info_.scheduler_ == addr_) { - if (need_callback_scheduler_()) { - if (OB_FAIL(defer_commit_callback_(status, commit_version))) { + use_rpc = false; + if (!has_callback_scheduler_()) { + if (OB_FAIL(defer_callback_scheduler_(status, commit_version))) { TRANS_LOG(WARN, "report tx commit result fail", K(ret), K(status), KPC(this)); } else { #ifndef NDEBUG TRANS_LOG(INFO, "report tx commit result to local scheduler succeed", K(status), KP(this)); #endif } - } + } else { has_skip = true; } } else { ObTxCommitRespMsg msg; build_tx_common_msg_(SCHEDULER_LS, msg); @@ -872,7 +874,10 @@ int ObPartTransCtx::post_tx_commit_resp_(const int status) #endif } } - REC_TRANS_TRACE_EXT(tlog_, response_scheduler, OB_ID(ret), ret, + REC_TRANS_TRACE_EXT(tlog_, response_scheduler, + OB_ID(ret), ret, + OB_ID(tag1), has_skip, + OB_ID(tag2), use_rpc, OB_ID(status), status, OB_ID(commit_version), commit_version); return ret; diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 44fa54231b..8e8e7fb69e 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -964,7 +964,7 @@ int ObTransService::rollback_to_global_implicit_savepoint_(ObTxDesc &tx, tx.inc_op_sn(); bool reset_tx = false, normal_rollback = false; // merge extra touched ls - if (OB_NOT_NULL(extra_touched_ls)) { + if (OB_NOT_NULL(extra_touched_ls) && !extra_touched_ls->empty()) { if (OB_FAIL(tx.update_parts(*extra_touched_ls))) { TRANS_LOG(WARN, "add tx part with extra_touched_ls fail", K(ret), K(tx), KPC(extra_touched_ls)); abort_tx_(tx, ret); diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index fc7d80a9a1..db8f726364 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -541,6 +541,7 @@ int ObTxReplayExecutor::replay_redo_in_memtable_(ObTxRedoLog &redo) TRANS_LOG(INFO, "[Replay Tx] Replay redo in MemTable cost too much time", K(ret), + K(timeguard.get_diff()), K(log_ts_ns_), K(ctx_->get_trans_id()), K(ctx_->get_ls_id()),