diff --git a/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp b/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp index ac3950362d..70d469b3a0 100644 --- a/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp +++ b/mittest/multi_replica/test_mds_replay_from_ctx_table.cpp @@ -209,7 +209,8 @@ TEST_F(GET_RESTART_ZONE_TEST_CLASS_NAME(2, 1), restart_zone2_from_tx_ctx_table) ASSERT_EQ(ls_handle.is_valid(), true); ASSERT_EQ(OB_SUCCESS, ls_handle.get_ls()->get_tx_ctx(register_succ_arg.tx_id_, true, tx_ctx)); - RETRY_UNTIL_TIMEOUT(tx_ctx->start_recover_ts_ == register_succ_arg.register_scn2_, + RETRY_UNTIL_TIMEOUT(tx_ctx->ctx_source_ == PartCtxSource::RECOVER + && tx_ctx->create_ctx_scn_ == register_succ_arg.register_scn2_, 5 * 1000 * 1000, 5000); // tx_ctx->print_trace_log(); // TRANS_LOG(INFO, "after restart, print tx ctx",K(*tx_ctx)); diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index 4a16441c4d..4a3586fd0c 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -407,7 +407,7 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg, if (is_tx_blocked_()) { block = true; } else if (is_normal_blocked_()) { - if (!arg.for_special_tx_) { + if (arg.ctx_source_ != PartCtxSource::REGISTER_MDS) { block = true; } } @@ -464,6 +464,7 @@ int ObLSTxCtxMgr::create_tx_ctx_(const ObTxCreateArg &arg, epoch_v, this, arg.for_replay_, + arg.ctx_source_, arg.xid_))) { // when transfer move active tx ctx, we will create tx ctx when dest_ls has no this tx // we want to promise the created ctx state new enouth before insert to dest_ls ctx_map @@ -2687,7 +2688,7 @@ int ObLSTxCtxMgr::move_tx_op(const ObTransferMoveTxParam &move_tx_param, TRANS_LOG(WARN, "tx ctx not exist", K(ls_id_), K(move_tx_param), K(arg)); } ObTxCreateArg create_arg(!is_master(), - false, + PartCtxSource::TRANSFER, tenant_id_, arg.tx_id_, ls_id_, diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index adb818917c..cbefa7e4d3 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -92,7 +92,7 @@ typedef common::LinkHashValue ObLSTxCtxMgrHashValue; struct ObTxCreateArg { ObTxCreateArg(const bool for_replay, - const bool for_special_tx, + const PartCtxSource ctx_source, const uint64_t tenant_id, const ObTransID &trans_id, const share::ObLSID &ls_id, @@ -106,7 +106,7 @@ struct ObTxCreateArg int64_t epoch = -1, const ObTxCtxMoveArg *move_arg = NULL) : for_replay_(for_replay), - for_special_tx_(for_special_tx), + ctx_source_(ctx_source), tenant_id_(tenant_id), tx_id_(trans_id), ls_id_(ls_id), @@ -126,13 +126,13 @@ struct ObTxCreateArg && trans_expired_time_ > 0 && NULL != trans_service_; } - TO_STRING_KV(K_(for_replay), K_(for_special_tx), + TO_STRING_KV(K_(for_replay), "ctx_source_", to_str(ctx_source_), K_(tenant_id), K_(tx_id), K_(ls_id), K_(cluster_id), K_(cluster_version), K_(session_id), K_(scheduler), K_(trans_expired_time), KP_(trans_service), K_(epoch), K_(xid)); bool for_replay_; - bool for_special_tx_; + PartCtxSource ctx_source_; uint64_t tenant_id_; ObTransID tx_id_; share::ObLSID ls_id_; diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 774b86c57b..22ce3bab7a 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -692,6 +692,47 @@ DEF_TO_STRING(ObLockForReadArg) DEFINE_TO_STRING_AND_YSON(ObTransKey, OB_ID(hash), hash_val_, OB_ID(trans_id), trans_id_); +bool is_transfer_ctx(PartCtxSource ctx_source) +{ + return PartCtxSource::TRANSFER == ctx_source || PartCtxSource::TRANSFER_RECOVER == ctx_source; +} + +const char *to_str(PartCtxSource src) +{ + const char *str = "INVALID"; + switch (src) { + case PartCtxSource::UNKOWN: { + str = "UNKOWN"; + break; + } + case PartCtxSource::MVCC_WRITE: { + str = "MVCC_WRITE"; + break; + } + case PartCtxSource::REGISTER_MDS: { + str = "REGISTER_MDS"; + break; + } + case PartCtxSource::REPLAY: { + str = "REPLAY"; + break; + } + case PartCtxSource::RECOVER: { + str = "RECOVER"; + break; + } + case PartCtxSource::TRANSFER: { + str = "TRANSFER"; + break; + } + case PartCtxSource::TRANSFER_RECOVER: { + str = "TRANSFER_RECOVER"; + break; + } + } + return str; +} + void ObTxExecInfo::reset() { state_ = ObTxState::INIT; diff --git a/src/storage/tx/ob_trans_define.h b/src/storage/tx/ob_trans_define.h index 08c6e56701..befeaae254 100644 --- a/src/storage/tx/ob_trans_define.h +++ b/src/storage/tx/ob_trans_define.h @@ -1758,6 +1758,21 @@ private: ObBlockedTransArray blocked_trans_ids_; }; +enum class PartCtxSource +{ + UNKOWN = 0, + MVCC_WRITE = 1, + REGISTER_MDS = 2, + REPLAY = 3, + RECOVER = 4, + TRANSFER = 5, + TRANSFER_RECOVER = 6, +}; + +bool is_transfer_ctx(PartCtxSource ctx_source); + +const char *to_str(PartCtxSource src); + enum class RetainCause : int16_t { UNKOWN = -1, diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index c2ec9f3dff..a3e727939a 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -87,6 +87,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id, const int64_t epoch, ObLSTxCtxMgr *ls_ctx_mgr, const bool for_replay, + const PartCtxSource ctx_source, ObXATransID xid) { int ret = OB_SUCCESS; @@ -135,6 +136,7 @@ int ObPartTransCtx::init(const uint64_t tenant_id, trans_id_ = trans_id; trans_expired_time_ = trans_expired_time; ctx_create_time_ = ObClockGenerator::getClock(); + ctx_source_ = ctx_source; cluster_version_accurate_ = cluster_version > 0; cluster_version_ = cluster_version ?: LAST_BARRIER_DATA_VERSION; part_trans_action_ = ObPartTransAction::INIT; @@ -286,8 +288,8 @@ void ObPartTransCtx::destroy() if (mds_cache_.is_mem_leak()) { TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mds memory leak!", K(trans_id_), K(ls_id_), - K(mds_cache_), K(exec_info_), K(ctx_tx_data_), K(start_replay_ts_), - K(start_recover_ts_), K(ctx_create_time_)); + K(mds_cache_), K(exec_info_), K(ctx_tx_data_), K(create_ctx_scn_), + K(ctx_source_), K(ctx_create_time_)); FORCE_PRINT_TRACE(tlog_, "[check mds mem leak] "); } @@ -354,8 +356,8 @@ void ObPartTransCtx::default_init_() is_ctx_table_merged_ = false; mds_cache_.reset(); retain_ctx_func_ptr_ = nullptr; - start_replay_ts_.reset(); - start_recover_ts_.reset(); + create_ctx_scn_.reset(); + ctx_source_ = PartCtxSource::UNKOWN; replay_completeness_.reset(); is_submitting_redo_log_for_freeze_ = false; start_working_log_ts_ = SCN::min_scn(); @@ -1685,7 +1687,11 @@ int ObPartTransCtx::recover_tx_ctx_table_info(ObTxCtxTableInfo &ctx_info) ->get_tx_version_mgr() .update_max_commit_ts(ctx_tx_data_.get_commit_version(), false); } - start_recover_ts_ = exec_info_.max_applying_log_ts_; + create_ctx_scn_ = exec_info_.max_applying_log_ts_; + + if(!exec_info_.transfer_parts_.empty()) { + ctx_source_ = PartCtxSource::TRANSFER_RECOVER; + } } TRANS_LOG(INFO, "recover tx ctx table info succeed", K(ret), KPC(this), K(ctx_info)); } @@ -3165,12 +3171,13 @@ int ObPartTransCtx::submit_prepare_log_() if (OB_SUCC(ret)) { ObTxLogCb *log_cb = NULL; + ObTxPrevLogType prev_log_type(ObTxPrevLogType::TypeEnum::COMMIT_INFO); - if (OB_FAIL(get_prev_log_lsn_(log_block, ObTxLogType::TX_COMMIT_INFO_LOG, prev_lsn))) { + if (OB_FAIL(get_prev_log_lsn_(log_block, prev_log_type, prev_lsn))) { TRANS_LOG(WARN, "get prev log lsn failed", K(ret), K(*this)); } - ObTxPrepareLog prepare_log(exec_info_.incremental_participants_, prev_lsn); + ObTxPrepareLog prepare_log(exec_info_.incremental_participants_, prev_lsn, prev_log_type); if (OB_FAIL(ret)) { // do nothing @@ -3330,20 +3337,20 @@ int ObPartTransCtx::submit_commit_log_() if (OB_SUCC(ret)) { SCN log_commit_version; ObSEArray checksum_arr; + ObTxPrevLogType prev_log_type; if (exec_info_.need_checksum_ && replay_completeness_.is_complete() && OB_FAIL(mt_ctx_.calc_checksum_all(checksum_arr))) { TRANS_LOG(WARN, "calc checksum failed", K(ret)); } else if (!local_tx) { log_commit_version = ctx_tx_data_.get_commit_version(); - if (OB_FAIL(get_prev_log_lsn_(log_block, ObTxLogType::TX_PREPARE_LOG, prev_lsn))) { + prev_log_type.set_prepare(); + if (OB_FAIL(get_prev_log_lsn_(log_block, prev_log_type, prev_lsn))) { TRANS_LOG(WARN, "get prev log lsn failed", K(ret), K(*this)); - } else if (!prev_lsn.is_valid()) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "unexpected prev lsn in 2pc commit log", K(ret), KPC(this)); } } else { - if (OB_FAIL(get_prev_log_lsn_(log_block, ObTxLogType::TX_COMMIT_INFO_LOG, prev_lsn))) { + prev_log_type.set_commit_info(); + if (OB_FAIL(get_prev_log_lsn_(log_block, prev_log_type, prev_lsn))) { TRANS_LOG(WARN, "get prev log lsn failed", K(ret), K(*this)); } } @@ -3356,7 +3363,8 @@ int ObPartTransCtx::submit_commit_log_() checksum_sig, exec_info_.incremental_participants_, multi_source_data, exec_info_.trans_type_, prev_lsn, - coord_prepare_info_arr_); + coord_prepare_info_arr_, + prev_log_type); ObTxLogCb *log_cb = NULL; bool redo_log_submitted = false; LogBarrierType commit_log_barrier_type = LogBarrierType::NO_NEED_BARRIER; @@ -4225,12 +4233,16 @@ int ObPartTransCtx::get_max_submitting_log_info_(palf::LSN &lsn, SCN &log_ts) lsn = log_cb->get_lsn(); log_ts = log_cb->get_log_ts(); } + } else + { + lsn.reset(); } + return ret; } int ObPartTransCtx::get_prev_log_lsn_(const ObTxLogBlock &log_block, - ObTxLogType prev_log_type, + ObTxPrevLogType &prev_log_type, palf::LSN &lsn) { int ret = OB_SUCCESS; @@ -4238,10 +4250,13 @@ int ObPartTransCtx::get_prev_log_lsn_(const ObTxLogBlock &log_block, SCN tmp_log_ts; bool in_same_block = false; - if (is_contain(log_block.get_cb_arg_array(), prev_log_type)) { + if (!prev_log_type.is_normal_log() || !log_block.is_inited()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(INFO, "invalid arguments", K(ret), K(prev_log_type), K(log_block)); + } else if (is_contain(log_block.get_cb_arg_array(), prev_log_type.convert_to_tx_log_type())) { // invalid lsn lsn.reset(); - in_same_block = true; + prev_log_type.set_self(); } else if (OB_FAIL(get_max_submitting_log_info_(tmp_lsn, tmp_log_ts))) { } else if (tmp_lsn.is_valid()) { if (exec_info_.max_durable_lsn_.is_valid() && exec_info_.max_durable_lsn_ > tmp_lsn) { @@ -4252,9 +4267,14 @@ int ObPartTransCtx::get_prev_log_lsn_(const ObTxLogBlock &log_block, lsn = exec_info_.max_durable_lsn_; } - if (!in_same_block && !lsn.is_valid()) { + if (OB_SUCC(ret) && !lsn.is_valid() && is_transfer_ctx(ctx_source_) /*is_transfer*/) { + prev_log_type.set_tranfer_in(); + } + + if (!prev_log_type.is_valid() || (prev_log_type.is_normal_log() && !lsn.is_valid())) { ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "unexpected prev lsn", K(ret), K(log_block), K(prev_log_type), K(lsn), KPC(this)); + TRANS_LOG(WARN, "unexpected prev lsn", K(ret), K(log_block), K(prev_log_type), K(lsn), + KPC(this)); } return ret; } @@ -4429,8 +4449,8 @@ int ObPartTransCtx::check_replay_avaliable_(const palf::LSN &offset, } if (OB_SUCC(ret)) { - if (need_replay && !start_replay_ts_.is_valid()) { - start_replay_ts_ = timestamp; + if (need_replay && !create_ctx_scn_.is_valid()) { + create_ctx_scn_ = timestamp; } if (need_replay) { @@ -4605,7 +4625,7 @@ int ObPartTransCtx::check_trans_type_for_replay_(const int32_t &trans_type, { int ret = OB_SUCCESS; - if (start_replay_ts_ == commit_log_ts) { + if (create_ctx_scn_ == commit_log_ts) { // TRANS_LOG(INFO, "start replay from commit log", K(trans_type), K(commit_log_ts)); exec_info_.trans_type_ = trans_type; } else if (exec_info_.trans_type_ != trans_type) { diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index d777d803f9..c1c0bc15ce 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -183,6 +183,7 @@ public: const int64_t epoch, ObLSTxCtxMgr *ls_ctx_mgr, const bool for_replay, + const PartCtxSource ctx_source, ObXATransID xid); void reset() { } int construct_context(const ObTransMsg &msg); @@ -289,8 +290,8 @@ private: K(final_log_cb_), K(ctx_tx_data_), K(role_state_), - K(start_replay_ts_), - K(start_recover_ts_), + K(create_ctx_scn_), + "ctx_source", ctx_source_, K(epoch_), K(replay_completeness_), K(mt_ctx_), @@ -665,7 +666,7 @@ private: int get_log_cb_(const bool need_final_cb, ObTxLogCb *&log_cb); int return_log_cb_(ObTxLogCb *log_cb, bool release_final_cb = false); int get_max_submitting_log_info_(palf::LSN &lsn, share::SCN &log_ts); - int get_prev_log_lsn_(const ObTxLogBlock &log_block, ObTxLogType prev_log_type, palf::LSN &lsn); + int get_prev_log_lsn_(const ObTxLogBlock &log_block, ObTxPrevLogType &prev_log_type, palf::LSN &lsn); int set_start_scn_in_commit_log_(ObTxCommitLog &commit_log); // int init_tx_data_(const share::ObLSID&ls_id, const ObTransID &tx_id); @@ -1031,8 +1032,8 @@ private: } replay_completeness_; // set true when submitting redo log for freezing and reset after freezing bool is_submitting_redo_log_for_freeze_; - share::SCN start_replay_ts_; // replay debug - share::SCN start_recover_ts_; // recover debug + share::SCN create_ctx_scn_; // replay or recover debug + PartCtxSource ctx_source_; // For CDC - prev_lsn share::SCN start_working_log_ts_; diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index a8057c0199..87435e23ee 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1282,8 +1282,12 @@ int ObTransService::create_tx_ctx_(const share::ObLSID &ls_id, int ret = OB_SUCCESS; bool existed = false; int64_t epoch = 0; + PartCtxSource ctx_source = PartCtxSource::MVCC_WRITE; + if(special) { + ctx_source = PartCtxSource::REGISTER_MDS; + } ObTxCreateArg arg(false, /* for_replay */ - special, /* speclial tx not blocked when in block_normal state */ + ctx_source, /* speclial tx not blocked when in block_normal state */ tx.tenant_id_, tx.tx_id_, ls_id, diff --git a/src/storage/tx/ob_tx_log.cpp b/src/storage/tx/ob_tx_log.cpp index 89636412b1..51d6b56998 100644 --- a/src/storage/tx/ob_tx_log.cpp +++ b/src/storage/tx/ob_tx_log.cpp @@ -106,6 +106,45 @@ int ObTxLogTypeChecker::decide_final_barrier_type( return ret; } +ObTxLogType ObTxPrevLogType::convert_to_tx_log_type() +{ + ObTxLogType tx_log_type = ObTxLogType::UNKNOWN; + if (TypeEnum::COMMIT_INFO == prev_log_type_) { + tx_log_type = ObTxLogType::TX_COMMIT_INFO_LOG; + } else if (TypeEnum::PREPARE == prev_log_type_) { + tx_log_type = ObTxLogType::TX_PREPARE_LOG; + } + return tx_log_type; +} + +int ObTxPrevLogType::serialize(char *buf, const int64_t buf_len, int64_t &pos) const +{ + int8_t prev_val; + memcpy(&prev_val, &prev_log_type_, 1); + return serialization::encode_i8(buf, buf_len, pos, prev_val); +} + +int ObTxPrevLogType::deserialize(const char *buf, const int64_t data_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + int8_t prev_val = 0; + int64_t tmp_pos = pos; + + if (OB_SUCC(serialization::decode_i8(buf, data_len, pos, &prev_val))) { + memcpy(&prev_log_type_, &prev_val, 1); + pos = tmp_pos; + } + return ret; +} + +int64_t ObTxPrevLogType::get_serialize_size(void) const +{ + int8_t prev_val; + memcpy(&prev_val, &prev_log_type_, 1); + return serialization::encoded_length_i8(prev_val); +} + + // ============================== Tx Log Header ============================= DEFINE_SERIALIZE(ObTxLogHeader) @@ -280,7 +319,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxCommitInfoLog, OB_TX_SERIALIZE_MEMBER(ObTxPrepareLog, compat_bytes_, /* 1 */ incremental_participants_, - /* 2 */ prev_lsn_); + /* 2 */ prev_lsn_, + /* 3 */ prev_log_type_); OB_TX_SERIALIZE_MEMBER(ObTxCommitLog, compat_bytes_, @@ -292,7 +332,8 @@ OB_TX_SERIALIZE_MEMBER(ObTxCommitLog, /* 6 */ tx_data_backup_, /* 7 */ prev_lsn_, /* 8 */ ls_log_info_arr_, - /* 9 */ checksum_sig_serde_); + /* 9 */ checksum_sig_serde_, + /* 10 */ prev_log_type_); OB_TX_SERIALIZE_MEMBER(ObTxClearLog, compat_bytes_, /* 1 */ incremental_participants_); @@ -392,7 +433,7 @@ int ObTxPrepareLog::before_serialize() TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret)); } } else { - if (OB_FAIL(compat_bytes_.init(2))) { + if (OB_FAIL(compat_bytes_.init(3))) { TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret)); } } @@ -400,6 +441,7 @@ int ObTxPrepareLog::before_serialize() if (OB_SUCC(ret)) { TX_NO_NEED_SER(incremental_participants_.empty(), 1, compat_bytes_); TX_NO_NEED_SER(prev_lsn_.is_valid() == false, 2, compat_bytes_); + TX_NO_NEED_SER(prev_log_type_.is_valid() == false, 3, compat_bytes_); } return ret; } @@ -413,7 +455,7 @@ int ObTxCommitLog::before_serialize() TRANS_LOG(WARN, "reset all compat_bytes_ valid failed", K(ret)); } } else { - if (OB_FAIL(compat_bytes_.init(9))) { + if (OB_FAIL(compat_bytes_.init(10))) { TRANS_LOG(WARN, "init compat_bytes_ failed", K(ret)); } } @@ -428,6 +470,7 @@ int ObTxCommitLog::before_serialize() TX_NO_NEED_SER(prev_lsn_.is_valid() == false, 7, compat_bytes_); TX_NO_NEED_SER(ls_log_info_arr_.empty(), 8, compat_bytes_); TX_NO_NEED_SER(checksum_sig_.count() == 0, 9, compat_bytes_); + TX_NO_NEED_SER(prev_log_type_.is_valid() == false, 10, compat_bytes_); } return ret; } diff --git a/src/storage/tx/ob_tx_log.h b/src/storage/tx/ob_tx_log.h index 2d1fe6f7df..1044d887c9 100644 --- a/src/storage/tx/ob_tx_log.h +++ b/src/storage/tx/ob_tx_log.h @@ -190,6 +190,46 @@ inline bool is_contain_stat_log(const ObTxCbArgArray &array) return bool_ret; } +class ObTxPrevLogType +{ +public: + enum TypeEnum : uint8_t + { + UNKOWN = 0, + SELF = 1, + COMMIT_INFO = 2, + PREPARE = 3, + TRANSFER_IN = 10, + }; + +public: + NEED_SERIALIZE_AND_DESERIALIZE; + ObTxPrevLogType() { reset(); } + ObTxPrevLogType(const TypeEnum prev_log_type) : prev_log_type_(prev_log_type) {} + + bool is_valid() { return prev_log_type_ > 0; } + void set_self() { prev_log_type_ = TypeEnum::SELF; } + bool is_self() { return TypeEnum::SELF == prev_log_type_; } + void set_tranfer_in() { prev_log_type_ = TypeEnum::TRANSFER_IN; } + bool is_transfer_in() { return TypeEnum::TRANSFER_IN == prev_log_type_; } + + void set_prepare() { prev_log_type_ = TypeEnum::PREPARE; } + void set_commit_info() { prev_log_type_ = TypeEnum::COMMIT_INFO; } + bool is_normal_log() + { + return TypeEnum::COMMIT_INFO == prev_log_type_ || TypeEnum::PREPARE == prev_log_type_; + } + + ObTxLogType convert_to_tx_log_type(); + + void reset() { prev_log_type_ = TypeEnum::UNKOWN; } + + TO_STRING_KV("val", prev_log_type_); + +private: + TypeEnum prev_log_type_; +}; + // ============================== Tx Log Header ============================== class ObTxLogHeader { @@ -598,23 +638,24 @@ class ObTxPrepareLog public: ObTxPrepareLog(ObTxPrepareLogTempRef &temp_ref) - : incremental_participants_(temp_ref.incremental_participants_), prev_lsn_() + : incremental_participants_(temp_ref.incremental_participants_), prev_lsn_(), prev_log_type_() { before_serialize(); }; - ObTxPrepareLog(share::ObLSArray &incremental_participants, LogOffSet &commit_info_lsn) - : incremental_participants_(incremental_participants), prev_lsn_(commit_info_lsn) + ObTxPrepareLog(share::ObLSArray &incremental_participants, LogOffSet &commit_info_lsn, ObTxPrevLogType prev_log_type) + : incremental_participants_(incremental_participants), prev_lsn_(commit_info_lsn), prev_log_type_(prev_log_type) { before_serialize(); }; const share::ObLSArray &get_incremental_participants() const { return incremental_participants_; } const LogOffSet &get_prev_lsn() { return prev_lsn_; } + const ObTxPrevLogType &get_prev_log_type() const { return prev_log_type_; } void set_prev_lsn(const LogOffSet &lsn) { prev_lsn_ = lsn; } int ob_admin_dump(share::ObAdminMutatorStringArg &arg); static const ObTxLogType LOG_TYPE; - TO_STRING_KV(K(LOG_TYPE), K(incremental_participants_), K(prev_lsn_)); + TO_STRING_KV(K(LOG_TYPE), K(incremental_participants_), K(prev_lsn_), K(prev_log_type_)); public: int before_serialize(); @@ -625,6 +666,7 @@ private: //--------- for liboblog ----------- LogOffSet prev_lsn_; + ObTxPrevLogType prev_log_type_; }; class ObTxDataBackup @@ -670,7 +712,7 @@ public: checksum_sig_serde_(checksum_sig_), incremental_participants_(temp_ref.incremental_participants_), multi_source_data_(temp_ref.multi_source_data_), trans_type_(TransType::SP_TRANS), - tx_data_backup_(), prev_lsn_(), ls_log_info_arr_(temp_ref.ls_log_info_arr_) + tx_data_backup_(), prev_lsn_(), ls_log_info_arr_(temp_ref.ls_log_info_arr_), prev_log_type_() { before_serialize(); } @@ -681,10 +723,11 @@ public: ObTxBufferNodeArray &multi_source_data, int32_t trans_type, LogOffSet prev_lsn, - ObLSLogInfoArray &ls_log_info_arr) + ObLSLogInfoArray &ls_log_info_arr, + ObTxPrevLogType prev_log_type) : checksum_(checksum), checksum_sig_(checksum_sig), checksum_sig_serde_(checksum_sig_), incremental_participants_(incremental_participants), multi_source_data_(multi_source_data), - trans_type_(trans_type), prev_lsn_(prev_lsn), ls_log_info_arr_(ls_log_info_arr) + trans_type_(trans_type), prev_lsn_(prev_lsn), ls_log_info_arr_(ls_log_info_arr), prev_log_type_(prev_log_type) { commit_version_ = commit_version; before_serialize(); @@ -698,6 +741,7 @@ public: const int32_t &get_trans_type() const { return trans_type_; } const ObLSLogInfoArray &get_ls_log_info_arr() const { return ls_log_info_arr_; } const LogOffSet &get_prev_lsn() const { return prev_lsn_; } + const ObTxPrevLogType &get_prev_log_type() const { return prev_log_type_; } void set_prev_lsn(const LogOffSet &lsn) { prev_lsn_ = lsn; } const share::SCN get_backup_start_scn() { return tx_data_backup_.get_start_log_ts(); } @@ -714,7 +758,8 @@ public: K(trans_type_), K(tx_data_backup_), K(prev_lsn_), - K(ls_log_info_arr_)); + K(ls_log_info_arr_), + K(prev_log_type_)); public: int before_serialize(); @@ -733,9 +778,10 @@ private: ObTxDataBackup tx_data_backup_; - //--------- fo r liboblog ----------- + //--------- for liboblog ----------- LogOffSet prev_lsn_; ObLSLogInfoArray &ls_log_info_arr_; + ObTxPrevLogType prev_log_type_; }; class ObTxClearLogTempRef diff --git a/src/storage/tx/ob_tx_replay_executor.cpp b/src/storage/tx/ob_tx_replay_executor.cpp index 6bf8fec5e2..0be823dc22 100644 --- a/src/storage/tx/ob_tx_replay_executor.cpp +++ b/src/storage/tx/ob_tx_replay_executor.cpp @@ -277,7 +277,7 @@ int ObTxReplayExecutor::try_get_tx_ctx_() // since 4.3, cluster version in log_block_header const uint64_t cluster_version = log_block_.get_header().get_cluster_version(); ObTxCreateArg arg(true, /* for_replay */ - false, /* for_special_tx */ + PartCtxSource::REPLAY, tenant_id_, tx_id, ls_id_, diff --git a/src/storage/tx_table/ob_tx_ctx_table.cpp b/src/storage/tx_table/ob_tx_ctx_table.cpp index 1d91bfd729..b9da91eb75 100644 --- a/src/storage/tx_table/ob_tx_ctx_table.cpp +++ b/src/storage/tx_table/ob_tx_ctx_table.cpp @@ -107,7 +107,7 @@ int ObTxCtxTableRecoverHelper::recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls // since 4.3 cluster_version in ctx_info uint64_t cluster_version = ctx_info.cluster_version_; transaction::ObTxCreateArg arg(true, /* for_replay */ - false, + PartCtxSource::RECOVER, MTL_ID(), ctx_info.tx_id_, ctx_info.ls_id_, diff --git a/unittest/libobcdc/log_generator.h b/unittest/libobcdc/log_generator.h index 8f018440ba..c3ade58167 100644 --- a/unittest/libobcdc/log_generator.h +++ b/unittest/libobcdc/log_generator.h @@ -300,7 +300,8 @@ void ObTxLogGenerator::gen_prepare_log() { share::ObLSArray inc_ls_arr; transaction::LogOffSet prev_lsn = last_lsn_(); - ObTxPrepareLog prepare_log(inc_ls_arr, prev_lsn); + ObTxPrevLogType prev_log_type(ObTxPrevLogType::TypeEnum::COMMIT_INFO); + ObTxPrepareLog prepare_log(inc_ls_arr, prev_lsn, prev_log_type); LOG_DEBUG("gen prepare_log", K(prepare_log)); EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(prepare_log)); trans_type_ = transaction::TransType::DIST_TRANS; // dist trans. @@ -323,6 +324,7 @@ void ObTxLogGenerator::gen_commit_log() EXPECT_EQ(OB_SUCCESS, ls_info_arr.push_back(ls_info2)); } ObArray checksum_signature; + ObTxPrevLogType prev_log_type(ObTxPrevLogType::TypeEnum::PREPARE); ObTxCommitLog commit_log( commit_version, checksum, @@ -331,7 +333,8 @@ void ObTxLogGenerator::gen_commit_log() mds_arr, trans_type_, last_lsn_(), - ls_info_arr); + ls_info_arr, + prev_log_type); LOG_DEBUG("gen commit_log", K(commit_log)); EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(commit_log)); } diff --git a/unittest/storage/tx/test_ob_tx_log.cpp b/unittest/storage/tx/test_ob_tx_log.cpp index cfc9f1b544..3e01c21c3a 100644 --- a/unittest/storage/tx/test_ob_tx_log.cpp +++ b/unittest/storage/tx/test_ob_tx_log.cpp @@ -62,6 +62,7 @@ ObTxSEQ TEST_MAX_SUBMITTED_SEQ_NO = ObTxSEQ(12345, 0); ObTxSEQ TEST_SERIAL_FINAL_SEQ_NO = ObTxSEQ(12346, 0); LSKey TEST_LS_KEY; ObXATransID TEST_XID; +ObTxPrevLogType TEST_PREV_LOG_TYPE(ObTxPrevLogType::TypeEnum::TRANSFER_IN); struct OldTestLog @@ -215,7 +216,7 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) TEST_CLUSTER_VERSION, TEST_XID, TEST_SERIAL_FINAL_SEQ_NO); - ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET); + ObTxPrepareLog filll_prepare(TEST_LS_ARRAY, TEST_LOG_OFFSET, TEST_PREV_LOG_TYPE); ObTxCommitLog fill_commit(share::SCN::base_scn(), TEST_CHECKSUM, TEST_CHECKSUM_SIGNATURE_ARRAY, @@ -223,7 +224,8 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) TEST_TX_BUFFER_NODE_ARRAY, TEST_TRANS_TYPE, TEST_LOG_OFFSET, - TEST_INFO_ARRAY); + TEST_INFO_ARRAY, + TEST_PREV_LOG_TYPE); ObTxClearLog fill_clear(TEST_LS_ARRAY); ObTxAbortLog fill_abort(TEST_TX_BUFFER_NODE_ARRAY); ObTxRecordLog fill_record(TEST_LOG_OFFSET, TEST_LOG_OFFSET_ARRY); @@ -282,12 +284,14 @@ TEST_F(TestObTxLog, tx_log_body_except_redo) ASSERT_EQ(OB_SUCCESS, replay_block.get_next_log(tx_log_header)); EXPECT_EQ(ObTxLogType::TX_PREPARE_LOG, tx_log_header.get_tx_log_type()); ASSERT_EQ(OB_SUCCESS, replay_block.deserialize_log_body(replay_prepare)); + EXPECT_EQ(TEST_PREV_LOG_TYPE.prev_log_type_, replay_prepare.get_prev_log_type().prev_log_type_); ObTxCommitLogTempRef commit_temp_ref; ObTxCommitLog replay_commit(commit_temp_ref); ASSERT_EQ(OB_SUCCESS, replay_block.get_next_log(tx_log_header)); EXPECT_EQ(ObTxLogType::TX_COMMIT_LOG, tx_log_header.get_tx_log_type()); ASSERT_EQ(OB_SUCCESS, replay_block.deserialize_log_body(replay_commit)); + EXPECT_EQ(TEST_PREV_LOG_TYPE.prev_log_type_, replay_commit.get_prev_log_type().prev_log_type_); ObTxClearLogTempRef clear_temp_ref; ObTxClearLog replay_clear(clear_temp_ref); @@ -347,7 +351,8 @@ TEST_F(TestObTxLog, tx_log_body_redo) TEST_TX_BUFFER_NODE_ARRAY, TEST_TRANS_TYPE, TEST_LOG_OFFSET, - TEST_INFO_ARRAY); + TEST_INFO_ARRAY, + TEST_PREV_LOG_TYPE); ObTxLogBlockHeader &fill_block_header = fill_block.get_header(); fill_block_header.init(TEST_ORG_CLUSTER_ID, TEST_CLUSTER_VERSION, TEST_LOG_ENTRY_NO, ObTransID(TEST_TX_ID), TEST_ADDR); @@ -645,6 +650,8 @@ TEST_F(TestObTxLog, test_default_log_deserialize) replay_member_cnt++; EXPECT_EQ(fill_prepare.get_prev_lsn(), replay_prepare.get_prev_lsn()); replay_member_cnt++; + EXPECT_EQ(fill_prepare.get_prev_log_type().prev_log_type_, replay_prepare.get_prev_log_type().prev_log_type_); + replay_member_cnt++; EXPECT_EQ(replay_member_cnt, fill_member_cnt); ObTxCommitLogTempRef commit_temp_ref; @@ -870,7 +877,8 @@ TEST_F(TestObTxLog, test_commit_log_with_checksum_signature) tx_buffer_node_array, 1, LogOffSet(100), - ls_info_array); + ls_info_array, + TEST_PREV_LOG_TYPE); int64_t size = log0.get_serialize_size(); char *buf = new char[size]; int64_t pos = 0;