diff --git a/mittest/simple_server/test_transfer_between_rollback_to.cpp b/mittest/simple_server/test_transfer_between_rollback_to.cpp index 2727428375..548c91624d 100644 --- a/mittest/simple_server/test_transfer_between_rollback_to.cpp +++ b/mittest/simple_server/test_transfer_between_rollback_to.cpp @@ -131,7 +131,6 @@ public: WRITE_SQL_BY_CONN(connection, "set GLOBAL ob_query_timeout = 10000000000"); WRITE_SQL_BY_CONN(connection, "alter system set enable_early_lock_release = False;"); WRITE_SQL_BY_CONN(connection, "alter system set undo_retention = 1800;"); - WRITE_SQL_BY_CONN(connection, "alter system set partition_balance_schedule_interval = '10s';"); sleep(5); } @@ -213,6 +212,7 @@ public: MTL_SWITCH(tenant_id) { TRANS_LOG(INFO, "worker to do partition_balance"); auto b_svr = MTL(rootserver::ObTenantBalanceService*); + b_svr->stop(); b_svr->reset(); int64_t job_cnt = 0; int64_t start_time = OB_INVALID_TIMESTAMP, finish_time = OB_INVALID_TIMESTAMP; @@ -410,6 +410,22 @@ TEST_F(ObTransferBetweenRollbackTo, transfer_between_rollback_to) } ASSERT_EQ(loc1, loc2); + while (true) { + int64_t transfer_task_count = 0; + if (OB_FAIL(SSH::g_select_int64(tenant_id, "select count(*) as val from __all_transfer_task", transfer_task_count))) { + TRANS_LOG(WARN, "fail to wait for transfer task"); + } else if (transfer_task_count == 0) { + fprintf(stdout, "succeed wait for balancer v2\n"); + break; + } else if (ObTimeUtility::current_time() - begin_time > 300 * 1000 * 1000) { + fprintf(stdout, "ERROR: fail to wait for balancer\n"); + break; + } else { + fprintf(stdout, "wait for balancer v2\n"); + ob_usleep(200 * 1000); + } + } + usleep(1000 * 1000); global_ls_id.reset(); diff --git a/src/storage/tx/ob_trans_define.cpp b/src/storage/tx/ob_trans_define.cpp index 5ed129c77b..667416e83e 100644 --- a/src/storage/tx/ob_trans_define.cpp +++ b/src/storage/tx/ob_trans_define.cpp @@ -1007,7 +1007,9 @@ int RollbackMaskSet::merge_part(const share::ObLSID add_ls_id, const int64_t exe break; } } - if (!is_exist && OB_FAIL(rollback_parts_->push_back(ObTxExecPart(add_ls_id, exec_epoch, transfer_epoch)))) { + if (!is_exist && OB_FAIL(rollback_parts_->push_back(ObTxExecPart(add_ls_id, + exec_epoch, + transfer_epoch)))) { TRANS_LOG(WARN, "push part to array failed", KR(ret), K(add_ls_id)); } } @@ -1016,6 +1018,7 @@ int RollbackMaskSet::merge_part(const share::ObLSID add_ls_id, const int64_t exe int RollbackMaskSet::find_part(const share::ObLSID ls_id, const int64_t orig_epoch, + const int64_t transfer_epoch, ObTxExecPart &part) { int ret = OB_SUCCESS; @@ -1031,6 +1034,8 @@ int RollbackMaskSet::find_part(const share::ObLSID ls_id, ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "check rollback part failed", K(ret), K(rollback_parts_), K(orig_epoch)); } else { + rollback_parts_->at(idx).transfer_epoch_ = + MAX(transfer_epoch, rollback_parts_->at(idx).transfer_epoch_); part = rollback_parts_->at(idx); is_exist = true; } @@ -1042,7 +1047,7 @@ int RollbackMaskSet::find_part(const share::ObLSID ls_id, ret = OB_ENTRY_NOT_EXIST; } if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "find part", K(ret), K(ls_id), K(orig_epoch), K(rollback_parts_)); + TRANS_LOG(WARN, "find part", K(ret), K(ls_id), K(orig_epoch), K(rollback_parts_), K(transfer_epoch)); } return ret; } diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index a52cc3470d..3d07a4cc24 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -432,6 +432,7 @@ public: const int64_t transfer_epoch); int find_part(const share::ObLSID ls_id, const int64_t orig_epoch, + const int64_t transfer_epoch, ObTxExecPart &part); private: ObSpinLock lock_; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 4f24155885..0a44d31375 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -318,10 +318,6 @@ void ObPartTransCtx::destroy() trace_info_.reset(); block_frozen_memtable_ = nullptr; - last_rollback_to_request_id_ = 0; - last_rollback_to_timestamp_ = 0; - last_transfer_in_timestamp_ = 0; - is_inited_ = false; } } @@ -386,9 +382,6 @@ void ObPartTransCtx::default_init_() standby_part_collected_.reset(); trace_log_.reset(); transfer_deleted_ = false; - last_rollback_to_request_id_ = 0; - last_rollback_to_timestamp_ = 0; - last_transfer_in_timestamp_ = 0; } int ObPartTransCtx::init_log_cbs_(const ObLSID &ls_id, const ObTransID &tx_id) @@ -8627,6 +8620,18 @@ int ObPartTransCtx::end_access() return ret; } +int64_t ObPartTransCtx::get_max_transfer_epoch_() +{ + int64_t max_transfer_epoch = 0; + + for (int64_t i = 0; i < exec_info_.transfer_parts_.count(); i++) { + max_transfer_epoch = MAX(max_transfer_epoch, + exec_info_.transfer_parts_[i].transfer_epoch_); + } + + return max_transfer_epoch; +} + /* * rollback_to_savepoint - rollback to savepoint * @@ -8657,7 +8662,8 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, ObTxSEQ from_scn, const ObTxSEQ to_scn, const int64_t seq_base, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts) { int ret = OB_SUCCESS; @@ -8697,25 +8703,16 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, if (OB_SUCC(ret)) { bool need_downstream = true; - int64_t current_rollback_to_timestamp = ObTimeUtility::current_time(); - if (request_id != 0 && // come from rollback to request - request_id == last_rollback_to_request_id_) { // the same rollback to with the last one - if (last_transfer_in_timestamp_ != 0 && - last_rollback_to_timestamp_ != 0 && - // encounter transfer between two same rollback to - last_transfer_in_timestamp_ > last_rollback_to_timestamp_) { - need_downstream = true; - TRANS_LOG(INFO, "transfer between rollback to happened", K(ret), K(request_id), - K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_), - K(last_rollback_to_request_id_), KPC(this)); - } else { - need_downstream = false; - TRANS_LOG(INFO, "no transfer between rollback to happened", K(ret), K(request_id), - K(last_rollback_to_timestamp_), K(last_transfer_in_timestamp_), - K(last_rollback_to_request_id_), KPC(this)); - } - } else { + output_transfer_epoch = get_max_transfer_epoch_(); + + if (input_transfer_epoch != output_transfer_epoch) { need_downstream = true; + TRANS_LOG(INFO, "transfer between rollback to happened", K(ret), + K(input_transfer_epoch), K(output_transfer_epoch), KPC(this)); + } else { + need_downstream = false; + TRANS_LOG(INFO, "no transfer between rollback to happened", K(ret), + K(input_transfer_epoch), K(output_transfer_epoch), KPC(this)); } // must add downstream parts when return success @@ -8730,11 +8727,6 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn, TRANS_LOG(WARN, "push parts to array failed", K(ret), KPC(this)); } } - - if (OB_SUCC(ret)) { - last_rollback_to_request_id_ = request_id; - last_rollback_to_timestamp_ = current_rollback_to_timestamp; - } } REC_TRANS_TRACE_EXT(tlog_, rollback_savepoint, @@ -10266,21 +10258,22 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, ctx_tx_data_.set_state(ObTxData::ABORT); } - if (!arg.happened_before_) { - bool epoch_exist = false; - for (int64_t idx = 0; idx < exec_info_.transfer_parts_.count(); idx++) { - if (exec_info_.transfer_parts_.at(idx).ls_id_ == move_tx_param.src_ls_id_ && - exec_info_.transfer_parts_.at(idx).transfer_epoch_ == move_tx_param.transfer_epoch_) { - epoch_exist = true; - break; - } - } - if (!epoch_exist) { - if (OB_FAIL(exec_info_.transfer_parts_.push_back(ObTxExecPart(move_tx_param.src_ls_id_, -1, move_tx_param.transfer_epoch_)))) { - TRANS_LOG(WARN, "epochs push failed", K(ret)); - } + bool epoch_exist = false; + for (int64_t idx = 0; idx < exec_info_.transfer_parts_.count(); idx++) { + if (exec_info_.transfer_parts_.at(idx).ls_id_ == move_tx_param.src_ls_id_ && + exec_info_.transfer_parts_.at(idx).transfer_epoch_ == move_tx_param.transfer_epoch_) { + epoch_exist = true; + break; } } + if (!epoch_exist) { + if (OB_FAIL(exec_info_.transfer_parts_.push_back(ObTxExecPart(move_tx_param.src_ls_id_, + -1, /*exec_epoch*/ + move_tx_param.transfer_epoch_)))) { + TRANS_LOG(WARN, "epochs push failed", K(ret)); + } + } + if (OB_FAIL(ret)) { } else if (exec_info_.state_ < ObTxState::COMMIT && OB_FAIL(mt_ctx_.recover_from_table_lock_durable_info(arg.table_lock_info_, true))) { @@ -10296,8 +10289,6 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, exec_info_.is_transfer_blocking_ = false; if (OB_FAIL(transfer_op_log_cb_(move_tx_param.op_scn_, move_tx_param.op_type_))) { TRANS_LOG(WARN, "transfer op loc_cb failed", KR(ret), K(move_tx_param)); - } else { - last_transfer_in_timestamp_ = ObTimeUtility::current_time(); } } } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 3af85819a5..96516d9c51 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -173,10 +173,7 @@ public: coord_prepare_info_arr_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(reserve_allocator_, "PREPARE_INFO")), standby_part_collected_(), ask_state_info_interval_(100 * 1000), refresh_state_info_interval_(100 * 1000), - transfer_deleted_(false), - last_rollback_to_request_id_(0), - last_rollback_to_timestamp_(0), - last_transfer_in_timestamp_(0) + transfer_deleted_(false) { /*reset();*/ } ~ObPartTransCtx() { destroy(); } void destroy(); @@ -739,6 +736,8 @@ private: int check_is_aborted_in_tx_data_(const ObTransID tx_id, bool &is_aborted); + int64_t get_max_transfer_epoch_(); + // ======================================================== // ======================== C2PC MSG HANDLER BEGIN ======================== @@ -921,7 +920,8 @@ public: ObTxSEQ from_seq, const ObTxSEQ to_seq, const int64_t seq_base, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts); bool is_xa_trans() const { return !exec_info_.xid_.empty(); } bool is_transfer_deleted() const { return transfer_deleted_; } @@ -1120,11 +1120,6 @@ private: // for transfer move tx ctx to clean for abort bool transfer_deleted_; - - // TODO(handora.qc): remove after fix the transfer bwteen rollback_to bug - int64_t last_rollback_to_request_id_; - int64_t last_rollback_to_timestamp_; - int64_t last_transfer_in_timestamp_; // ======================================================== }; diff --git a/src/storage/tx/ob_trans_rpc.cpp b/src/storage/tx/ob_trans_rpc.cpp index 9692cf67e0..8ecc5f005a 100644 --- a/src/storage/tx/ob_trans_rpc.cpp +++ b/src/storage/tx/ob_trans_rpc.cpp @@ -30,7 +30,8 @@ using namespace share; namespace obrpc { OB_SERIALIZE_MEMBER(ObTransRpcResult, status_, send_timestamp_, private_data_); -OB_SERIALIZE_MEMBER(ObTxRpcRollbackSPResult, status_, send_timestamp_, addr_, born_epoch_, ignore_, downstream_parts_); +OB_SERIALIZE_MEMBER(ObTxRpcRollbackSPResult, status_, send_timestamp_, addr_, + born_epoch_, ignore_, downstream_parts_, output_transfer_epoch_); bool need_refresh_location_cache_(const int ret) { @@ -79,8 +80,14 @@ int handle_sp_rollback_resp(const share::ObLSID &receiver_ls_id, return OB_SUCCESS; } return MTL(ObTransService *)->handle_sp_rollback_resp(receiver_ls_id, - epoch, tx_id, status, request_id, result.born_epoch_, result.addr_, - result.downstream_parts_); + epoch, + tx_id, + status, + request_id, + result.born_epoch_, + result.addr_, + result.output_transfer_epoch_, + result.downstream_parts_); } void ObTransRpcResult::reset() diff --git a/src/storage/tx/ob_trans_rpc.h b/src/storage/tx/ob_trans_rpc.h index 425500390c..0d332ddfbb 100644 --- a/src/storage/tx/ob_trans_rpc.h +++ b/src/storage/tx/ob_trans_rpc.h @@ -79,9 +79,12 @@ public: // use this field to indicate handler ignore handle by this msg bool ignore_; ObSEArray downstream_parts_; + // used for transfer info during rollback + int64_t output_transfer_epoch_; public: int get_status() const { return status_; } - TO_STRING_KV(K_(status), K_(send_timestamp), K_(born_epoch), K_(addr), K_(ignore), K_(downstream_parts)); + TO_STRING_KV(K_(status), K_(send_timestamp), K_(born_epoch), K_(addr), + K_(ignore), K_(output_transfer_epoch), K_(downstream_parts)); }; class ObTransRpcProxy : public obrpc::ObRpcProxy diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index b74f9d4450..4e07360721 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1867,6 +1867,7 @@ int ObTransService::batch_post_rollback_savepoint_msg_(ObTxDesc &tx, if (p.exec_epoch_ <= 0 && p.transfer_epoch_ > 0) { msg.set_for_transfer(); } + msg.input_transfer_epoch_ = p.transfer_epoch_; if (OB_FAIL(rpc_->post_msg(msg.receiver_, msg))) { if (OB_LS_IS_DELETED == ret) { ObSpinLockGuard lock(tx.lock_); @@ -2109,7 +2110,8 @@ int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg, msg.tx_ptr_, msg.for_transfer(), msg.specified_from_scn_, - msg.request_id_, + msg.input_transfer_epoch_, + result.output_transfer_epoch_, result.downstream_parts_); if (msg.use_async_resp()) { ObTxRollbackSPRespMsg resp; @@ -2124,6 +2126,7 @@ int ObTransService::handle_sp_rollback_request(ObTxRollbackSPMsg &msg, resp.ret_ = ret; resp.orig_epoch_ = msg.epoch_, resp.epoch_ = ctx_born_epoch; + resp.output_transfer_epoch_ = result.output_transfer_epoch_; int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(resp.downstream_parts_.assign(result.downstream_parts_))) { TRANS_LOG(WARN, "parts assign failed", K(tmp_ret), K(resp)); @@ -2157,6 +2160,7 @@ int ObTransService::handle_sp_rollback_response(ObTxRollbackSPRespMsg &msg, msg.request_id_, msg.epoch_, msg.sender_addr_, + msg.output_transfer_epoch_, msg.downstream_parts_); result.reset(); result.init(ret, msg.get_timestamp()); @@ -2319,7 +2323,9 @@ int ObTransService::merge_rollback_downstream_parts_(ObTxDesc &tx, const ObIArra int ret = OB_SUCCESS; for (int64_t idx = 0; OB_SUCC(ret) && idx < downstream_parts.count(); idx++) { ObLSID add_ls_id = downstream_parts.at(idx).left_; - if (OB_FAIL(tx.brpc_mask_set_.merge_part(add_ls_id, 0, downstream_parts.at(idx).right_))) { + if (OB_FAIL(tx.brpc_mask_set_.merge_part(add_ls_id, + 0/*exec_epoch*/, + -1/*transfer_epoch*/))) { TRANS_LOG(WARN, "merge part failed", KR(ret), K(tx.tx_id_), K(add_ls_id)); } else { TRANS_LOG(INFO, "merge rollback parts", K(tx.tx_id_), K(add_ls_id)); @@ -2335,10 +2341,12 @@ int ObTransService::handle_sp_rollback_resp(const share::ObLSID &ls_id, const int64_t request_id, const int64_t ret_epoch, const ObAddr &ret_addr, + const int64_t transfer_epoch, const ObIArray &downstream_parts) { int ret = OB_SUCCESS; - TRANS_LOG(INFO, "handle_sp_rollback_resp", K(tx_id), K(ls_id), K(status), K(downstream_parts)); + TRANS_LOG(INFO, "handle_sp_rollback_resp", K(tx_id), K(ls_id), K(status), + K(transfer_epoch), K(downstream_parts)); ObRollbackSPMsgGuard *rollback_sp_msg_guard = NULL; ObTxDesc *tx = NULL; // find tx_msg by request_id @@ -2368,7 +2376,7 @@ int ObTransService::handle_sp_rollback_resp(const share::ObLSID &ls_id, ObTxExecPart p; if (downstream_parts.count() > 0 && OB_FAIL(merge_rollback_downstream_parts_(*tx, downstream_parts))) { TRANS_LOG(WARN, "merge rollback downstream parts failed", K(ret), K(tx_id), K(downstream_parts)); - } else if (OB_FAIL(tx->brpc_mask_set_.find_part(ls_id, orig_epoch, p))) { + } else if (OB_FAIL(tx->brpc_mask_set_.find_part(ls_id, orig_epoch, transfer_epoch, p))) { TRANS_LOG(WARN, "find part failed", K(ret), K(ls_id), K(tx_id)); } else { // find rollback part by ls_id diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 59c5d6a1c3..f6c6f53320 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ b/src/storage/tx/ob_trans_service_v4.h @@ -107,6 +107,7 @@ int handle_sp_rollback_resp(const share::ObLSID &ls_id, const int64_t request_id, const int64_t ret_epoch, const ObAddr &ret_addr, + const int64_t transfer_epoch, const ObIArray &downstream_parts); int handle_trans_msg_callback(const share::ObLSID &sender_ls_id, const share::ObLSID &receiver_ls_id, @@ -369,7 +370,8 @@ int ls_rollback_to_savepoint_(const ObTransID &tx_id, const ObTxDesc *tx, const bool for_transfer, const ObTxSEQ from_scn, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts, int64_t expire_ts = -1); int sync_rollback_savepoint__(ObTxDesc &tx, @@ -397,7 +399,8 @@ int ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, const int64_t tx_seq_base, const int64_t expire_ts, const ObTxSEQ specified_from_scn, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts); void tx_post_terminate_(ObTxDesc &tx); int start_epoch_(ObTxDesc &tx); diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index 9537d53fa6..14bc7c861d 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1132,6 +1132,7 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx, ARRAY_FOREACH(parts, i) { ObPartTransCtx *ctx = NULL; ObTxPart &p = parts[i]; + int64_t output_transfer_epoch = 0; ObSEArray downstream_parts; if (OB_FAIL(get_tx_ctx_(p.id_, tx.tx_id_, ctx))) { TRANS_LOG(WARN, "get tx ctx fail", K(ret), K_(p.id), K(tx)); @@ -1143,13 +1144,16 @@ int ObTransService::rollback_to_local_implicit_savepoint_(ObTxDesc &tx, tx.seq_base_, expire_ts, from_scn, - 0, /*request_id, only used for request*/ + -1, /*input_transfer_epoch*/ + output_transfer_epoch, downstream_parts))) { TRANS_LOG(WARN, "LS rollback savepoint fail", K(ret), K(savepoint), K(tx)); } else { // merge find new downstream to tx.rollback parts for (int64_t idx = 0; OB_SUCC(ret) && idx < downstream_parts.count(); idx++) { - if (OB_FAIL(rollback_parts.push_back(ObTxExecPart(downstream_parts.at(idx).left_, 0, downstream_parts.at(idx).right_)))) { + if (OB_FAIL(rollback_parts.push_back(ObTxExecPart(downstream_parts.at(idx).left_, + 0, /*exec_epoch*/ + -1 /*transfer_epoch*/)))) { TRANS_LOG(WARN, "push part to array failed", K(ret), K(tx)); } } @@ -1308,7 +1312,8 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, const int64_t tx_seq_base, const int64_t expire_ts, const ObTxSEQ specified_from_scn, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts) { int ret = OB_SUCCESS; @@ -1319,7 +1324,8 @@ int ObTransService::ls_sync_rollback_savepoint__(ObPartTransCtx *part_ctx, specified_from_scn, savepoint, tx_seq_base, - request_id, + input_transfer_epoch, + output_transfer_epoch, downstream_parts); if ((OB_NEED_RETRY == ret || OB_EAGAIN == ret) && blockable) { if (ObTimeUtility::current_time() >= expire_ts) { @@ -1537,6 +1543,7 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, ObTxPart &p = parts[0]; int64_t born_epoch = 0; ObSEArray downstream_parts; + int64_t output_transfer_epoch = 0; if (OB_FAIL(ls_rollback_to_savepoint_(tx.tx_id_, p.id_, p.epoch_, @@ -1547,7 +1554,8 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, &tx, false,/*for transfer*/ ObTxSEQ::INVL(), - 0, /*request_id, only for rollback_to request*/ + -1,/*input_transfer_epoch*/ + output_transfer_epoch, downstream_parts, -1/*non-blocking*/))) { if (common_retryable_error_(ret)) { @@ -1571,7 +1579,9 @@ int ObTransService::rollback_savepoint_(ObTxDesc &tx, TRANS_LOG(WARN, "reserve space fail", K(ret), K(parts), K(tx)); } else { ARRAY_FOREACH(parts, i) { - rollback_parts.push_back(ObTxExecPart(parts[i].id_, parts[i].epoch_, 0)); + rollback_parts.push_back(ObTxExecPart(parts[i].id_, + parts[i].epoch_, + -1 /*transfer_epoch*/)); } } if (FAILEDx(rollback_savepoint_slowpath_(tx, @@ -1618,7 +1628,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, const ObTxDesc *tx, const bool for_transfer, const ObTxSEQ from_scn, - const int64_t request_id, + const int64_t input_transfer_epoch, + int64_t &output_transfer_epoch, ObIArray &downstream_parts, int64_t expire_ts) { @@ -1682,7 +1693,8 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, tx_seq_base, expire_ts, from_scn, - request_id, + input_transfer_epoch, + output_transfer_epoch, downstream_parts))) { TRANS_LOG(WARN, "LS rollback to savepoint fail", K(ret), K(tx_id), K(ls), K(op_sn), K(savepoint), KPC(ctx)); } @@ -1724,6 +1736,7 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx, msg.epoch_ = -1; msg.request_id_ = tx_msg_id; msg.specified_from_scn_ = specified_from_scn; + msg.input_transfer_epoch_ = -1; // prepare msg.tx_ptr_ if required // TODO(yunxing.cyx) : in 4.1 rework here, won't serialize txDesc ObTxDesc *tmp_tx_desc = NULL; diff --git a/src/storage/tx/ob_tx_msg.cpp b/src/storage/tx/ob_tx_msg.cpp index f208e1bbf0..2d3c2d3f75 100644 --- a/src/storage/tx/ob_tx_msg.cpp +++ b/src/storage/tx/ob_tx_msg.cpp @@ -127,7 +127,7 @@ OB_TX_MSG_SERDE(ObAskStateMsg, ObTxMsg, snapshot_, ori_ls_id_, ori_addr_); OB_TX_MSG_SERDE(ObAskStateRespMsg, ObTxMsg, state_info_array_); OB_TX_MSG_SERDE(ObCollectStateMsg, ObTxMsg, snapshot_, check_info_); OB_TX_MSG_SERDE(ObCollectStateRespMsg, ObTxMsg, state_info_, transfer_parts_); -OB_SERIALIZE_MEMBER((ObTxRollbackSPRespMsg, ObTxMsg), ret_, orig_epoch_, downstream_parts_); +OB_SERIALIZE_MEMBER((ObTxRollbackSPRespMsg, ObTxMsg), ret_, orig_epoch_, downstream_parts_, output_transfer_epoch_); OB_DEF_SERIALIZE_SIZE(ObTxRollbackSPMsg) { @@ -142,6 +142,7 @@ OB_DEF_SERIALIZE_SIZE(ObTxRollbackSPMsg) } OB_UNIS_ADD_LEN(flag_); OB_UNIS_ADD_LEN(specified_from_scn_); + OB_UNIS_ADD_LEN(input_transfer_epoch_); return len; } @@ -158,6 +159,7 @@ OB_DEF_SERIALIZE(ObTxRollbackSPMsg) } OB_UNIS_ENCODE(flag_); OB_UNIS_ENCODE(specified_from_scn_); + OB_UNIS_ENCODE(input_transfer_epoch_); } return ret; } @@ -181,6 +183,7 @@ OB_DEF_DESERIALIZE(ObTxRollbackSPMsg) } OB_UNIS_DECODE(flag_); OB_UNIS_DECODE(specified_from_scn_); + OB_UNIS_DECODE(input_transfer_epoch_); } return ret; } diff --git a/src/storage/tx/ob_tx_msg.h b/src/storage/tx/ob_tx_msg.h index 64cdacfea5..b00942b7bc 100644 --- a/src/storage/tx/ob_tx_msg.h +++ b/src/storage/tx/ob_tx_msg.h @@ -249,7 +249,8 @@ namespace transaction tx_seq_base_(0), tx_ptr_(NULL), flag_(USE_ASYNC_RESP), - specified_from_scn_() + specified_from_scn_(), + input_transfer_epoch_(-1) {} ~ObTxRollbackSPMsg() { if (OB_NOT_NULL(tx_ptr_)) { @@ -258,6 +259,7 @@ namespace transaction tx_ptr_ = NULL; } specified_from_scn_.reset(); + input_transfer_epoch_ = -1; } ObTxSEQ savepoint_; int64_t op_sn_; @@ -265,6 +267,7 @@ namespace transaction const ObTxDesc *tx_ptr_; uint8_t flag_; ObTxSEQ specified_from_scn_; + int64_t input_transfer_epoch_; bool use_async_resp() const { return (flag_ & USE_ASYNC_RESP) !=0; } void set_for_transfer() { flag_ |= ROLLBACK_FOR_TRANSFER; } bool for_transfer() const { return (flag_ & ROLLBACK_FOR_TRANSFER) !=0; } @@ -273,24 +276,30 @@ namespace transaction bool is_valid() const; INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(savepoint), K_(op_sn), K_(tx_seq_base), K_(flag), - K_(specified_from_scn), KP_(tx_ptr)); + K_(specified_from_scn), KP_(tx_ptr), K_(input_transfer_epoch)); OB_UNIS_VERSION(1); }; struct ObTxRollbackSPRespMsg : public ObTxMsg { ObTxRollbackSPRespMsg() : - ObTxMsg(ROLLBACK_SAVEPOINT_RESP), - ret_(-1), - orig_epoch_(0) + ObTxMsg(ROLLBACK_SAVEPOINT_RESP), + ret_(-1), + orig_epoch_(0), + downstream_parts_(), + output_transfer_epoch_(-1) {} ~ObTxRollbackSPRespMsg() { ret_ = -1; orig_epoch_ = 0; + output_transfer_epoch_ = -1; } int ret_; int64_t orig_epoch_; ObSEArray downstream_parts_; - INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(ret), K_(orig_epoch), K_(downstream_parts)); + int64_t output_transfer_epoch_; + + INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(ret), K_(orig_epoch), + K_(output_transfer_epoch), K_(downstream_parts)); OB_UNIS_VERSION(1); };