diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 5b346b611..0d2599aba 100644 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -1884,7 +1884,8 @@ int ObTransService::handle_tx_batch_req(int msg_type, } else if (OB_FAIL(get_tx_ctx_(msg.get_receiver(), msg.get_trans_id(), ctx))) { \ TRANS_LOG(WARN, "get tx context fail", K(ret), K(msg)); \ if (OB_TRANS_CTX_NOT_EXIST == ret || \ - OB_PARTITION_NOT_EXIST == ret) { \ + OB_PARTITION_NOT_EXIST == ret || \ + OB_LS_NOT_EXIST == ret) { \ /* need_check_leader : just for unittest case*/ \ handle_orphan_2pc_msg_(msg, need_check_leader); \ } \ @@ -2085,11 +2086,18 @@ void ObTransService::handle_orphan_2pc_msg_(const ObTxMsg &msg, const bool need_ bool leader = false; if (need_check_leader && OB_FAIL(check_ls_status_(msg.get_receiver(), leader))) { - TRANS_LOG(WARN, "check ls status error", K(ret), K(msg)); + if (OB_LS_NOT_EXIST == ret) { + ret = OB_SUCCESS; + TRANS_LOG(INFO, "check ls status with ls not exist", K(ret), K(msg), K(need_check_leader)); + } else { + TRANS_LOG(WARN, "check ls status error", K(ret), K(msg), K(need_check_leader)); + } } else if (need_check_leader && !leader) { ret = OB_NOT_MASTER; TRANS_LOG(WARN, "receiver not master", K(ret), K(msg)); - } else if (OB_FAIL(ObPartTransCtx::handle_tx_orphan_2pc_msg(msg, get_server(), get_trans_rpc()))) { + } + + if (OB_SUCC(ret) && OB_FAIL(ObPartTransCtx::handle_tx_orphan_2pc_msg(msg, get_server(), get_trans_rpc()))) { TRANS_LOG(WARN, "handle tx orphan 2pc msg failed", K(ret), K(msg)); } else { // do nothing diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index b8c16ae9d..7dc5e2a3c 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -472,15 +472,18 @@ int ObPartTransCtx::set_2pc_commit_version_(const SCN &commit_version) int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) { int ret = OB_SUCCESS; + ObTwoPhaseCommitMsgType cache_msg_type = switch_msg_type_(msg_2pc_cache_->type_); if (OB_ISNULL(msg_2pc_cache_)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "empty 2pc msg", K(ret)); - } else if (switch_msg_type_(msg_2pc_cache_->type_) != msg_type) { + } else if (cache_msg_type != msg_type + && (ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP != msg_type + || ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP != cache_msg_type)) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "unexpected 2pc msg type", K(ret), K(msg_type), KPC(msg_2pc_cache_)); } else { - switch (msg_type) { + switch (cache_msg_type) { case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ: { const Ob2pcPrepareRedoReqMsg &msg = *(static_cast(msg_2pc_cache_)); if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { diff --git a/unittest/storage/tx/test_dup_msg_tx_commit.cpp b/unittest/storage/tx/test_dup_msg_tx_commit.cpp index eadbf4e28..f12fde83b 100644 --- a/unittest/storage/tx/test_dup_msg_tx_commit.cpp +++ b/unittest/storage/tx/test_dup_msg_tx_commit.cpp @@ -458,6 +458,93 @@ TEST_F(TestDupMsgMockOb2pcCtx, test_dup_2pc_commit_response) EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/)); } +TEST_F(TestDupMsgMockOb2pcCtx, test_dup_2pc_commit_response2) +{ + MockOb2pcCtx ctx1; + MockOb2pcCtx ctx2; + ctx1.init(&mailbox_mgr_); + ctx2.init(&mailbox_mgr_); + auto addr1 = ctx1.get_addr(); + auto addr2 = ctx2.get_addr(); + MockObParticipants participants; + participants.push_back(addr1); + participants.push_back(addr2); + + // ========== Prepare Duplicated Messages ========== + // Mock duplicated 2pc commit mail + ObMail dup_commit_mail; + EXPECT_EQ(OB_SUCCESS, dup_commit_mail.init(addr2 /*from*/, + addr1 /*to*/, + sizeof(ObTwoPhaseCommitMsgType), + ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP)); + // Mock duplicated 2pc abort mail + ObMail dup_abort_mail; + EXPECT_EQ(OB_SUCCESS, dup_abort_mail.init(addr2 /*from*/, + addr1 /*to*/, + sizeof(ObTwoPhaseCommitMsgType), + ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP)); + + // ========== Two Phase Commit prepare Phase ========== + // ctx1 start to commit + ctx1.commit(participants); + // ctx2 handle prepare request + EXPECT_EQ(OB_SUCCESS, ctx2.handle_all()); + // ctx2 handle prepare request + EXPECT_EQ(OB_SUCCESS, ctx2.apply()); + // ctx1 handle prepare response + EXPECT_EQ(OB_SUCCESS, ctx1.handle_all()); + // ctx1 apply prepare log + EXPECT_EQ(OB_SUCCESS, ctx1.apply()); + + // ========== Two Phase Commit pre commit Phase ====== + // ctx2 handle commit request + EXPECT_EQ(OB_SUCCESS, ctx2.handle_all()); + // ctx1 handle commit response + EXPECT_EQ(OB_SUCCESS, ctx1.handle_all()); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + + // ========== Two Phase Commit commit Phase ========== + // ctx2 handle commit request + EXPECT_EQ(OB_SUCCESS, ctx2.handle_all()); + // ctx2 apply commit log + EXPECT_EQ(OB_SUCCESS, ctx2.apply()); + // ctx1 handle commit response + EXPECT_EQ(OB_SUCCESS, ctx1.handle_all()); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + // ctx1 apply commit log + EXPECT_EQ(OB_SUCCESS, ctx1.apply()); + // // [DUP_MSG]: ctx1 handle duplicated commit response + // EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + + TRANS_LOG(INFO, "qc debug"); + // [DUP_MSG]: ctx1 handle duplicated abort response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_abort_mail, &ctx1)); + + // ========== Two Phase Commit clear Phase ========== + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + // ctx1 apply clear log + EXPECT_EQ(OB_SUCCESS, ctx1.apply()); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + // ctx2 handle clear request + EXPECT_EQ(OB_SUCCESS, ctx2.handle_all()); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + // ctx2 apply clear log + EXPECT_EQ(OB_SUCCESS, ctx2.apply()); + // [DUP_MSG]: ctx1 handle duplicated commit response + EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1)); + + // ========== Check Test Valid ========== + EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/)); + EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/)); +} + TEST_F(TestDupMsgMockOb2pcCtx, test_dup_2pc_clear_request) { MockOb2pcCtx ctx1;