[FEAT MERGE] transfer without kill tx

This commit is contained in:
Handora
2024-02-07 14:20:47 +00:00
committed by ob-robot
parent 233bf43b61
commit 46b64790bc
125 changed files with 10809 additions and 1109 deletions

View File

@ -22,7 +22,7 @@ namespace oceanbase
namespace storage
{
int build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table_id)
int __attribute__((weak)) build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table_id)
{
int ret = OB_SUCCESS;
ObColumnSchemaV2 column;
@ -47,7 +47,7 @@ int build_test_schema(share::schema::ObTableSchema &table_schema, uint64_t table
return ret;
}
int gen_create_ls_arg(const int64_t tenant_id,
int __attribute__((weak)) gen_create_ls_arg(const int64_t tenant_id,
const share::ObLSID &ls_id,
obrpc::ObCreateLSArg &arg)
{
@ -67,7 +67,7 @@ int gen_create_ls_arg(const int64_t tenant_id,
return ret;
}
int gen_create_tablet_arg(const int64_t tenant_id,
int __attribute__((weak)) gen_create_tablet_arg(const int64_t tenant_id,
const share::ObLSID &ls_id,
const ObTabletID &tablet_id,
obrpc::ObBatchCreateTabletArg &arg,

View File

@ -30,6 +30,7 @@ endfunction()
c2pc_unittest(test_simple_tx_commit)
c2pc_unittest(test_dup_msg_tx_commit)
c2pc_unittest(test_cycle_commit)
tx_unittest(test_simple_tx_ctx)
tx_unittest(test_ls_log_writer)
tx_unittest(test_ob_trans_hashmap)
@ -41,6 +42,7 @@ storage_unittest(test_ob_trans_rpc)
storage_unittest(test_ob_tx_msg)
storage_unittest(test_ob_id_meta)
storage_unittest(test_ob_standby_read)
storage_unittest(test_ob_standby_read_transfer)
storage_unittest(test_ob_trans_tlog)
add_subdirectory(it)

View File

@ -103,6 +103,57 @@ TEST_F(ObTestTx, basic)
COMMIT_TX(n1, tx, 500 * 1000);
}
TEST_F(ObTestTx, tx_2pc_blocking_and_get_gts_callback_concurrent_problem)
{
GCONF._ob_trans_rpc_timeout = 50;
ObTxNode::reset_localtion_adapter();
START_ONE_TX_NODE(n1);
PREPARE_TX(n1, tx);
PREPARE_TX_PARAM(tx_param);
GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot);
ASSERT_EQ(OB_SUCCESS, n1->start_tx(tx, tx_param));
ASSERT_EQ(OB_SUCCESS, n1->write(tx, snapshot, 100, 112));
ObPartTransCtx *part_ctx = NULL;
ObLSID ls_id(1);
ASSERT_EQ(OB_SUCCESS, n1->get_tx_ctx(ls_id, tx.tx_id_, part_ctx));
// mock gts waiting
part_ctx->sub_state_.set_gts_waiting();
// mock transfer
part_ctx->sub_state_.set_transfer_blocking();
ObMonotonicTs stc(99);
ObMonotonicTs srr(100);
ObMonotonicTs rgt(100);
share::SCN scn;
scn.convert_for_gts(100);
part_ctx->stc_ = stc;
part_ctx->part_trans_action_ = ObPartTransAction::COMMIT;
EXPECT_EQ(OB_SUCCESS, part_ctx->get_gts_callback(srr, scn, rgt));
EXPECT_EQ(true, part_ctx->ctx_tx_data_.get_commit_version() >= scn);
ObLSID dst_ls_id(2);
share::SCN start_scn;
share::SCN end_scn;
start_scn.convert_for_gts(888);
end_scn.convert_for_gts(1000);
part_ctx->ctx_tx_data_.set_start_log_ts(start_scn);
ObSEArray<ObTabletID, 8> array;
ObTxCtxMoveArg arg;
bool is_collected;
TRANS_LOG(INFO, "qc debug");
ASSERT_EQ(OB_SUCCESS, part_ctx->collect_tx_ctx(dst_ls_id,
end_scn,
array,
arg,
is_collected));
ASSERT_EQ(true, is_collected);
n1->get_ts_mgr_().repair_get_gts_error();
}
TEST_F(ObTestTx, start_trans_expired)
{
GCONF._ob_trans_rpc_timeout = 50;

View File

@ -13,13 +13,15 @@
#ifndef OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX
#define OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX
#include <cstring>
#include <deque>
#include <map>
#include <set>
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/utility/ob_print_utils.h"
#include "storage/tx/ob_committer_define.h"
#include "storage/tx/ob_tx_msg.h"
namespace oceanbase
{
@ -79,6 +81,26 @@ public:
std::memcpy((void*)mail_, (void*)(other.mail_), size_);
return *this;
}
/* ObMail operator=(const ObMail& other) */
/* { */
/* if (NULL != mail_) { */
/* std::free(mail_); */
/* } */
/* from_ = other.from_; */
/* to_ = other.to_; */
/* size_ = other.size_; */
/* mail_ = (MailType*)std::malloc(size_); */
/* std::memcpy((void*)mail_, (void*)(other.mail_), size_); */
/* return *this; */
/* } */
bool operator<(const ObMail& other) const
{
return from_ < other.from_
|| to_ < other.to_
|| size_ < other.size_
|| (size_ == other.size_ && memcmp((void*)mail_, (void*)other.mail_, size_) < 0);
}
/* ObMail& operator=(const ObMail &other) */
/* { */
/* from_ = other.from_; */
@ -117,6 +139,7 @@ public:
{
mailbox_.clear();
}
bool empty() { return mailbox_.empty(); }
int init(int64_t addr,
ObMailBoxMgr<MailType> *mailbox_mgr,
ObMailHandler<MailType> *ctx);
@ -136,6 +159,7 @@ class ObMailBoxMgr
public:
int64_t counter_ = 0;
std::map<int64_t, ObMailBox<MailType>*> mgr_;
std::set<ObMail<MailType>> cache_msg_;
int register_mailbox(int64_t &addr,
ObMailBox<MailType> &mailbox,
ObMailHandler<MailType> *ctx);
@ -143,6 +167,7 @@ public:
const int64_t receive);
int send_to_head(const ObMail<MailType>& mail,
const int64_t receive);
bool random_dup_and_send();
void reset();
};
@ -269,6 +294,7 @@ void ObMailBoxMgr<MailType>::reset()
{
counter_ = 0;
mgr_.clear();
cache_msg_.clear();
TRANS_LOG(INFO, "reset mailbox",K(this));
}
@ -279,6 +305,7 @@ int ObMailBoxMgr<MailType>::send(const ObMail<MailType>& mail,
int ret = OB_SUCCESS;
if (mgr_.count(mail.to_) != 0) {
cache_msg_.insert(mail);
mgr_[receiver]->mailbox_.push_back(mail);
TRANS_LOG(INFO, "send mailbox success", K(ret), K(mail),
K(*mgr_[receiver]));
@ -294,6 +321,7 @@ int ObMailBoxMgr<MailType>::send_to_head(const ObMail<MailType>& mail,
int ret = OB_SUCCESS;
if (mgr_.count(mail.to_) != 0) {
cache_msg_.insert(mail);
mgr_[receiver]->mailbox_.push_front(mail);
TRANS_LOG(INFO, "send to mailbox front success", K(ret), K(mail),
K(*mgr_[receiver]));
@ -302,6 +330,37 @@ int ObMailBoxMgr<MailType>::send_to_head(const ObMail<MailType>& mail,
return ret;
}
template <typename MailType>
bool ObMailBoxMgr<MailType>::random_dup_and_send()
{
int64_t idx = ObRandom::rand(0, cache_msg_.size() - 1);
if (idx >= 0 && cache_msg_.size() >= 0) {
int i = 0;
bool found = false;
ObMail<MailType> mail;
for (auto iter = cache_msg_.begin();
iter != cache_msg_.end();
iter++) {
if (idx == i) {
mail = *iter;
found = true;
break;
}
i++;
}
if (!found) {
ob_abort();
}
mgr_[mail.to_]->mailbox_.push_front(mail);
TRANS_LOG(INFO, "random_dup_and_send success", K(idx), K(cache_msg_.size()),
K(mail));
return true;
} else {
return false;
}
}
} // namespace transaction
} // namespace oceanbase

View File

@ -59,7 +59,9 @@ int MockOb2pcCtx::init(ObMailBoxMgr<ObTwoPhaseCommitMsgType> *mgr)
tx_state_ = ObTxState::INIT;
log_queue_.clear();
participants_.clear();
intermediate_participants_.clear();
coordinator_ = -1;
sender_ = -1;
mailbox_mgr_ = mgr;
if (OB_FAIL(mailbox_mgr_->register_mailbox(addr_, mailbox_, this))) {
TRANS_LOG(ERROR, "mock ctx register mailbox failed");
@ -72,6 +74,7 @@ int MockOb2pcCtx::commit(const MockObParticipants& participants)
{
ObLockGuard<ObSpinLock> lock_guard(latch_);
participants_.assign(participants.begin(), participants.end());
coordinator_ = addr_;
return two_phase_commit();
}
@ -131,12 +134,13 @@ int MockOb2pcCtx::on_clear()
Ob2PCRole MockOb2pcCtx::get_2pc_role() const
{
Ob2PCRole role;
if (participants_.size()!=0) {
if (coordinator_ == -1) {
role = Ob2PCRole::UNKNOWN;
} else if (addr_ == coordinator_) {
role = Ob2PCRole::ROOT;
} else if (participants_.size()==0) {
} else if (0 == participants_.size()) {
// not root & downstream is empty
role = Ob2PCRole::LEAF;
} else {
@ -180,21 +184,33 @@ int MockOb2pcCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
from = mailbox_.addr_;
if (participant == OB_C2PC_UPSTREAM_ID) {
to = coordinator_;
} else if (participant == OB_C2PC_SENDER_ID) {
if (-1 != sender_) {
to = sender_;
} else if (-1 != coordinator_) {
to = coordinator_;
} else {
to = -1;
}
} else {
to = participants_[participant];
}
mail.init(from, to, sizeof(ObTwoPhaseCommitMsgType), msg_type);
if (-1 == coordinator_
if (-1 == to
&& participant == OB_C2PC_UPSTREAM_ID
&& ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP == msg_type) {
TRANS_LOG(INFO, "self decide abort", K(ret), K(msg_type), K(participant),
K(mail));
} else if (-1 == coordinator_
} else if (-1 == to
&& participant == OB_C2PC_UPSTREAM_ID) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(ERROR, "invalid dst", K(ret), K(msg_type), K(participant),
K(mail));
} else if (-1 == to
&& participant == OB_C2PC_SENDER_ID) {
TRANS_LOG(INFO, "new transfer without sender", K(ret), K(msg_type), K(participant),
K(mail), K(to));
} else if (mail.from_ != mail.to_) {
if (OB_FAIL(mailbox_mgr_->send(mail, mail.to_))) {
TRANS_LOG(WARN, "send mailbox failed", K(ret), K(msg_type), K(participant));
@ -234,11 +250,14 @@ int MockOb2pcCtx::handle(const ObMail<ObTwoPhaseCommitMsgType> &mail)
ObTwoPhaseCommitMsgType type = *mail.mail_;
ObLockGuard<ObSpinLock> lock_guard(latch_);
sender_ = mail.from_;
if ((participant_id = find_participant_id(mail.from_)) == -1
&& is_2pc_response_msg(type)) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(ERROR, "2pc request with wrong participant id", K(ret));
} else if (is_2pc_request_msg(type)
&& -1 == coordinator_
&& FALSE_IT(coordinator_ = mail.from_)) {
} else if (is_2pc_request_msg(type)
&& OB_FAIL(handle_2pc_req(type))) {
@ -250,6 +269,8 @@ int MockOb2pcCtx::handle(const ObMail<ObTwoPhaseCommitMsgType> &mail)
TRANS_LOG(INFO, "handle msg success", K(addr_), K(mail), K(*this));
}
sender_ = -1;
return ret;
}
@ -416,5 +437,103 @@ bool MockOb2pcCtx::is_sub2pc() const
return false;
}
int MockOb2pcCtx::merge_intermediate_participants()
{
int ret = OB_SUCCESS;
bool exist = false;
for (int64_t i = 0; i < intermediate_participants_.size(); i++) {
exist = false;
for (int64_t j = 0; !exist && j < participants_.size(); j++) {
if (participants_[j] == intermediate_participants_[i]) {
exist = true;
}
}
if (!exist) {
participants_.push_back(intermediate_participants_[i]);
}
}
intermediate_participants_.clear();
return ret;
}
void MockOb2pcCtx::add_intermediate_participants(const int64_t ls_id)
{
bool exist = false;
for (int64_t i = 0; !exist && i < intermediate_participants_.size(); i++) {
if (intermediate_participants_[i] == ls_id) {
exist = true;
}
}
if (!exist) {
intermediate_participants_.push_back(ls_id);
}
}
void MockOb2pcCtx::print_downstream()
{
TRANS_LOG(INFO, "[TREE_COMMIT_PRINT]", K(addr_));
for (int64_t i = 0; i < participants_.size(); i++) {
TRANS_LOG(INFO, "[TREE_COMMIT_PRINT] ", K(participants_[i]));
}
}
bool MockOb2pcCtx::is_real_upstream()
{
bool bret = false;
if (-1 == sender_) {
bret = true;
} else {
bret = sender_ == coordinator_;
}
return bret;
}
bool MockOb2pcCtx::need_to_advance()
{
Ob2PCRole role = get_2pc_role();
if (role == Ob2PCRole::ROOT) {
if (!all_downstream_collected_()) {
return true;
} else {
return false;
}
} else if (role == Ob2PCRole::INTERNAL) {
if (!all_downstream_collected_()) {
return true;
} else {
return false;
}
} else {
return false;
}
return false;
}
// bool MockOb2pcCtx::is_downstream_of(const int64_t ls_id)
// {
// for (int64_t i = 0; i < participants_.size(); i++) {
// if (participants_[i] == ls_id) {
// return true;
// }
// }
// for (int64_t i = 0; i < incremental_participants_.size(); i++) {
// if (intermediate_participants_[i] == ls_id) {
// return true;
// }
// }
// return false;
// }
} // end namespace transaction
} // end namespace oceanbase

View File

@ -73,6 +73,10 @@ public:
// commit(with one consensus round and 2*H transport round latency)
int commit(const MockObParticipants& participants);
int64_t get_coordinator() { return coordinator_; }
bool is_real_downstream() { return true; }
INHERIT_TO_STRING_KV("ObTxCycleTwoPhaseCommitter",
ObTxCycleTwoPhaseCommitter,
K_(addr),
@ -82,6 +86,7 @@ public:
K_(tx_state),
K_(log_queue),
K_(participants),
K_(intermediate_participants),
K_(coordinator),
K_(sender));
protected:
@ -120,9 +125,19 @@ protected:
virtual ObTxState get_upstream_state() const override;
virtual int set_upstream_state(const ObTxState state) override;
virtual bool is_2pc_logging() const override;
virtual bool is_2pc_blocking() const { return false; };
// for xa
virtual bool is_sub2pc() const override;
virtual int merge_intermediate_participants() override;
void add_intermediate_participants(const int64_t ls_id);
void print_downstream();
virtual bool is_real_upstream() override;
bool need_to_advance();
// Oceanbase's optimized log handler, if it returns success, the log is definitely proposed
// to the consensus layer and we can rely on its sequential commitment to submit the log
@ -138,7 +153,7 @@ private:
int64_t find_participant_id(int64_t participant_key);
virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) override;
private:
public:
common::ObSpinLock latch_;
int64_t addr_;
ObMailBox<ObTwoPhaseCommitMsgType> mailbox_;
@ -150,6 +165,7 @@ private:
int64_t coordinator_;
int64_t sender_;
MockObParticipants participants_;
MockObParticipants intermediate_participants_;
ObMailBoxMgr<ObTwoPhaseCommitMsgType>* mailbox_mgr_;
};

View File

@ -118,6 +118,7 @@ void MockObTxCtx::destroy()
int MockObTxCtx::submit_log(const ObTwoPhaseCommitLogType& log_type)
{
merge_intermediate_participants();
log_queue_.push_back(log_type);
TRANS_LOG(INFO, "submit log success", K(log_type), KPC(this));
return OB_SUCCESS;
@ -259,12 +260,20 @@ int MockObTxCtx::apply()
TRANS_LOG(ERROR, "log_queue is empty", KPC(this));
ob_abort();
} else {
ObLSLogInfo info((ObLSID(addr_)), palf::LSN());
ObTwoPhaseCommitLogType log_type = log_queue_.front();
log_queue_.pop_front();
if (ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE == log_type) {
merge_prepare_log_info_(info);
}
ret = ObTxCycleTwoPhaseCommitter::apply_log(log_type);
if (OB_FAIL(ret)) {
TRANS_LOG(ERROR, "apply log success", K(ret), K(log_type), KPC(this));
TRANS_LOG(ERROR, "apply log failed", K(ret), K(log_type), KPC(this));
ob_abort();
} else {
TRANS_LOG(INFO, "apply log success", K(ret), K(log_type), KPC(this), K(info));
}
}
@ -301,7 +310,7 @@ int MockObTxCtx::handle(const ObMail<ObTxMsg>& mail)
case TX_COMMIT: {
const ObTxCommitMsg *msg = dynamic_cast<const ObTxCommitMsg*>(mail.mail_);
scheduler_addr_ = mail.from_;
ret = commit(msg->parts_,
ret = commit(msg->commit_parts_,
MonotonicTs::current_time(),
msg->expire_ts_,
msg->app_trace_info_,
@ -315,7 +324,10 @@ int MockObTxCtx::handle(const ObMail<ObTxMsg>& mail)
}
case TX_2PC_PREPARE_RESP: {
const Ob2pcPrepareRespMsg *prepare_resp = dynamic_cast<const Ob2pcPrepareRespMsg*>(mail.mail_);
ret = handle_tx_2pc_prepare_resp(*prepare_resp);
Ob2pcPrepareRespMsg prepare_resp2 = *prepare_resp;
prepare_resp2.prepare_info_array_.reset();
prepare_resp2.prepare_info_array_.push_back(ObLSLogInfo((ObLSID(mail.from_)), palf::LSN()));
ret = handle_tx_2pc_prepare_resp(prepare_resp2);
break;
}
case TX_2PC_PRE_COMMIT_REQ: {
@ -434,6 +446,35 @@ void MockObTxCtx::set_exiting_()
is_exiting_ = true;
}
bool MockObTxCtx::check_status_valid(const bool should_commit)
{
bool bret = true;
// check commit or abort
if (bret) {
ObTxData *tx_data_ptr = NULL;
ctx_tx_data_.get_tx_data_ptr(tx_data_ptr);
const int32_t state = (*tx_data_ptr).state_;
if (should_commit) {
bret = ObTxData::COMMIT == state;
} else {
bret = ObTxData::ABORT == state;
}
}
// check clear
if (bret) {
bret = ObTxState::CLEAR == exec_info_.state_;
}
if (!bret) {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "state is not match", K(*this), K(should_commit));
}
return bret;
}
} // end namespace transaction
} // end namespace oceanbase

View File

@ -44,10 +44,11 @@ public:
const std::vector<share::ObLSID> &participants,
ObTxCommitMsg &msg);
static int build_scheduler_mailbox(ObMailBoxMgr<ObTxMsg>* mailbox_mgr);
/* static int check_mail(ObMailBox<ObTxMsg> mailbox, */
/* int64_t from, */
/* int64_t to, */
/* int64_t type); */
static int check_mail(ObMailBox<ObTxMsg> mailbox,
int64_t from,
int64_t to,
int64_t type);
bool check_status_valid(const bool should_commit);
void destroy();
void set_exiting_();
virtual int register_timeout_task_(const int64_t interval_us);
@ -56,7 +57,7 @@ public:
K_(addr), K_(mailbox), K_(log_queue), K_(collected));
public:
int64_t scheduler_addr_ = 0;
/* static ObMailBox<ObTxMsg> scheduler_mailbox_; */
static ObMailBox<ObTxMsg> scheduler_mailbox_;
protected:
virtual int post_msg_(const share::ObLSID &receiver, ObTxMsg &msg) override;
virtual int post_msg_(const ObAddr &receiver, ObTxMsg &msg) override;

View File

@ -0,0 +1,479 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <vector>
#define private public
#define protected public
#include "ob_mock_2pc_ctx.h"
#include "lib/random/ob_random.h"
#include "lib/function/ob_function.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
namespace unittest
{
class TestCycleCtx : public ::testing::Test
{
protected:
virtual void SetUp() override
{
mailbox_mgr_.reset();
}
virtual void TearDown() override
{
mailbox_mgr_.reset();
}
public:
ObMailBoxMgr<ObTwoPhaseCommitMsgType> mailbox_mgr_;
};
TEST_F(TestCycleCtx, test_basic_cycle_commit)
{
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
// incremental participants
MockOb2pcCtx ctx3;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
ctx3.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
auto addr2 = ctx2.get_addr();
auto addr3 = ctx3.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
ctx2.add_intermediate_participants(addr1);
ctx1.add_intermediate_participants(addr2);
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
ctx1.commit(participants);
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ctx1 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit pre commit Phase ======
// ctx2 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit commit Phase ==========
// ctx2 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ctx1 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit clear Phase ==========
// ctx2 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Check Test Valid ==========
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, ctx1.get_2pc_role());
EXPECT_EQ(Ob2PCRole::INTERNAL, ctx2.get_2pc_role());
}
TEST_F(TestCycleCtx, test_random_cycle_commit)
{
// root coordinator
MockOb2pcCtx root_ctx;
root_ctx.init(&mailbox_mgr_);
int64_t root_addr = root_ctx.get_addr();
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
int64_t addr1 = ctx1.get_addr();
int64_t addr2 = ctx2.get_addr();
// incremental participants
const int64_t MAX_INC_CTX_COUNT = 100;
const int64_t MAX_OLD_CTX_COUNT = 100;
MockOb2pcCtx inc_ctx[MAX_INC_CTX_COUNT];
int64_t inc_addr[MAX_INC_CTX_COUNT];
int64_t inc_index = 0;
int64_t old_index = 0;
for (int i = 0; i < MAX_INC_CTX_COUNT; i++) {
inc_ctx[i].init(&mailbox_mgr_);
inc_addr[i] = inc_ctx[i].get_addr();
}
ObFunction<MockOb2pcCtx *(const int64_t participant)> get_ctx_op =
[&](const int64_t participant) -> MockOb2pcCtx * {
if (participant == root_addr) {
return &root_ctx;
} else if (participant == addr1) {
return &ctx1;
} else if (participant == addr2) {
return &ctx2;
} else {
for (int64_t i = 0; i < inc_index; i++) {
if (participant == inc_addr[i]) {
return &inc_ctx[i];
}
}
}
return NULL;
};
ObFunction<bool()> transfer_to_new_op =
[&]() -> bool {
if (inc_index >= MAX_INC_CTX_COUNT) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Limited", K(inc_index));
return false;
}
int64_t src_ctx_idx = ObRandom::rand(0, inc_index + 2);
int64_t dst_ctx_idx = inc_index;
MockOb2pcCtx *ctx;
if (src_ctx_idx == 0) {
ctx = &root_ctx;
} else if (src_ctx_idx == 1) {
ctx = &ctx1;
} else if (src_ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[src_ctx_idx - 3];
}
if (ctx->is_2pc_logging()) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Failed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx));
return false;
}
inc_ctx[dst_ctx_idx].downstream_state_ = ctx->downstream_state_;
inc_ctx[dst_ctx_idx].upstream_state_ = ctx->upstream_state_;
inc_ctx[dst_ctx_idx].tx_state_ = ctx->tx_state_;
ctx->add_intermediate_participants(inc_addr[dst_ctx_idx]);
inc_index++;
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Succeed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx), K(inc_ctx[dst_ctx_idx]));
return true;
};
ObFunction<bool()> transfer_to_old_op =
[&]() -> bool {
if (old_index >= MAX_OLD_CTX_COUNT) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Limited", K(old_index));
return false;
}
int64_t src_ctx_idx = 0;
int64_t dst_ctx_idx = 0;
while (src_ctx_idx == dst_ctx_idx) {
src_ctx_idx = ObRandom::rand(0, inc_index + 2);
dst_ctx_idx = ObRandom::rand(0, inc_index + 2);
}
EXPECT_NE(src_ctx_idx, dst_ctx_idx);
MockOb2pcCtx *src_ctx = NULL;
MockOb2pcCtx *dst_ctx = NULL;
if (src_ctx_idx == 0) {
src_ctx = &root_ctx;
} else if (src_ctx_idx == 1) {
src_ctx = &ctx1;
} else if (src_ctx_idx == 2) {
src_ctx = &ctx2;
} else {
src_ctx = &inc_ctx[src_ctx_idx - 3];
}
if (dst_ctx_idx == 0) {
dst_ctx = &root_ctx;
} else if (dst_ctx_idx == 1) {
dst_ctx = &ctx1;
} else if (dst_ctx_idx == 2) {
dst_ctx = &ctx2;
} else {
dst_ctx = &inc_ctx[dst_ctx_idx - 3];
}
if (src_ctx->is_2pc_logging()) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Failed", K(src_ctx_idx), K(dst_ctx_idx), KPC(src_ctx));
return false;
}
int64_t dst_addr = 0;
if (dst_ctx_idx == 0) {
dst_addr = root_addr;
} else if (dst_ctx_idx == 1) {
dst_addr = addr1;
} else if (dst_ctx_idx == 2) {
dst_addr = addr2;
} else {
dst_addr = inc_addr[dst_ctx_idx - 3];
}
EXPECT_EQ(dst_addr, dst_ctx->get_addr());
src_ctx->add_intermediate_participants(dst_addr);
old_index++;
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Succeed", K(src_ctx_idx), K(dst_ctx_idx), KPC(src_ctx), KPC(dst_ctx), K(dst_addr));
return true;
};
ObFunction<MockOb2pcCtx *()> get_advancing_ctx =
[&]() -> MockOb2pcCtx * {
if (!root_ctx.mailbox_.empty() || !root_ctx.log_queue_.empty()) {
return &root_ctx;
} else if (!ctx1.mailbox_.empty() || !ctx1.log_queue_.empty()) {
return &ctx1;
} else if (!ctx2.mailbox_.empty() || !ctx2.log_queue_.empty()) {
return &ctx2;
}
for (int i = 0; i < inc_index; i++) {
if (!inc_ctx[i].mailbox_.empty() || !inc_ctx[i].log_queue_.empty()) {
return &inc_ctx[i];
}
}
return NULL;
};
ObFunction<MockOb2pcCtx *()> get_to_advance_ctx =
[&]() -> MockOb2pcCtx * {
if (root_ctx.need_to_advance()) {
return &root_ctx;
} else if (ctx1.need_to_advance()) {
return &ctx1;
} else if (ctx2.need_to_advance()) {
return &ctx2;
}
for (int i = 0; i < inc_index; i++) {
if (inc_ctx[i].need_to_advance()) {
return &inc_ctx[i];
}
}
return NULL;
};
ObFunction<bool()> drive_op =
[&]() -> bool {
int64_t ctx_idx = ObRandom::rand(0, inc_index + 2);
MockOb2pcCtx *ctx;
if (ctx_idx == 0) {
ctx = &root_ctx;
} else if (ctx_idx == 1) {
ctx = &ctx1;
} else if (ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[ctx_idx - 3];
}
bool is_mail_empty = ctx->mailbox_.empty();
bool is_log_empty = ctx->log_queue_.empty();
int64_t job = 0;
if (is_mail_empty && is_log_empty) {
MockOb2pcCtx *advancing_ctx = get_advancing_ctx();
if (NULL == advancing_ctx) {
ctx = get_to_advance_ctx();
bool is_ok = ctx->need_to_advance();
ctx->handle_timeout();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Handle Timeout Op Succeed", KPC(ctx), K(is_ok));
return true;
} else {
ctx = advancing_ctx;
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Op Asing advancing Ctx", KPC(ctx));
}
}
is_mail_empty = ctx->mailbox_.empty();
is_log_empty = ctx->log_queue_.empty();
if (is_mail_empty && is_log_empty) {
ob_abort();
} else if (!is_mail_empty && !is_log_empty) {
job = ObRandom::rand(0, 1);
} else if (!is_mail_empty) {
job = 0;
} else if (!is_log_empty) {
job = 1;
}
if (job == 0) {
// has mail to drive
ctx->handle();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Mail Op Succeed", KPC(ctx));
return true;
} else if (job == 1) {
// has log to drive
ctx->apply();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Log Op Succeed", KPC(ctx));
return true;
}
ob_abort();
return false;
};
ObFunction<bool()> is_all_released =
[&]() -> bool {
if (root_ctx.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx1.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx2.downstream_state_ != ObTxState::CLEAR) {
return false;
}
for (int i = 0; i < inc_index; i++) {
if (inc_ctx[i].downstream_state_ != ObTxState::CLEAR) {
return false;
}
}
return true;
};
ObFunction<bool()> print_tree =
[&]() -> bool {
root_ctx.print_downstream();
ctx1.print_downstream();
ctx2.print_downstream();
for (int i = 0; i < inc_index; i++) {
inc_ctx[i].print_downstream();
}
return true;
};
ObFunction<bool()> validate_tree =
[&]() -> bool {
for (int i = 0; i < inc_index; i++) {
int64_t upstream = inc_ctx[i].coordinator_;
MockOb2pcCtx *upstream_ctx = NULL;
if (upstream == root_addr) {
upstream_ctx = &root_ctx;
} else if (upstream == addr1) {
upstream_ctx = &ctx1;
} else if (upstream == addr2) {
upstream_ctx = &ctx2;
} else {
for (int i = 0; i < inc_index; i++) {
if (inc_addr[i] == upstream) {
upstream_ctx = &inc_ctx[i];
break;
}
}
}
bool found = false;
for (int j = 0; j < upstream_ctx->participants_.size(); j++) {
if (upstream_ctx->participants_[j] == inc_ctx[i].get_addr()) {
found = true;
break;
}
}
EXPECT_EQ(true, found);
}
return true;
};
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
participants.push_back(root_addr);
// ctx start to commit
root_ctx.commit(participants);
while (!is_all_released()) {
bool enable = false;
int64_t job = ObRandom::rand(0, 2);
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Decide Job", K(job));
if (0 == job) {
enable = transfer_to_new_op();
} else if (1 == job) {
enable = transfer_to_old_op();
} else {
enable = drive_op();
}
}
// ========== Check Test Valid ==========
EXPECT_EQ(true, root_ctx.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, root_ctx.get_2pc_role());
print_tree();
validate_tree();
}
} // namespace transaction
} // namespace oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_simple_tx_commit.log*");
OB_LOGGER.set_file_name("test_simple_tx_commit.log");
OB_LOGGER.set_log_level("INFO");
STORAGE_LOG(INFO, "begin unittest: test simple mock ob tx ctx");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -12,7 +12,12 @@
#include <gtest/gtest.h>
#include <vector>
#define private public
#define protected public
#include "ob_mock_2pc_ctx.h"
#include "lib/random/ob_random.h"
#include "lib/function/ob_function.h"
// You can grep [DUP_MSG] to find all duplicated msg for testing
@ -520,7 +525,6 @@ TEST_F(TestDupMsgMockOb2pcCtx, test_dup_2pc_commit_response2)
// // [DUP_MSG]: ctx1 handle duplicated commit response
// EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_commit_mail, &ctx1));
TRANS_LOG(INFO, "qc debug");
// [DUP_MSG]: ctx1 handle duplicated abort response
EXPECT_EQ(OB_SUCCESS, dup_and_handle_msg(dup_abort_mail, &ctx1));
@ -610,6 +614,219 @@ TEST_F(TestDupMsgMockOb2pcCtx, test_dup_2pc_clear_request)
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
}
TEST_F(TestDupMsgMockOb2pcCtx, test_random_dup_tree_commit)
{
// root coordinator
MockOb2pcCtx root_ctx;
root_ctx.init(&mailbox_mgr_);
int64_t root_addr = root_ctx.get_addr();
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
int64_t addr1 = ctx1.get_addr();
int64_t addr2 = ctx2.get_addr();
// incremental participants
const int64_t MAX_INC_CTX_COUNT = 100;
MockOb2pcCtx inc_ctx[MAX_INC_CTX_COUNT];
int64_t inc_addr[MAX_INC_CTX_COUNT];
int64_t inc_index = 0;
for (int i = 0; i < MAX_INC_CTX_COUNT; i++) {
inc_ctx[i].init(&mailbox_mgr_);
inc_addr[i] = inc_ctx[i].get_addr();
}
ObFunction<MockOb2pcCtx *(const int64_t participant)> get_ctx_op =
[&](const int64_t participant) -> MockOb2pcCtx * {
if (participant == root_addr) {
return &root_ctx;
} else if (participant == addr1) {
return &ctx1;
} else if (participant == addr2) {
return &ctx2;
} else {
for (int64_t i = 0; i < inc_index; i++) {
if (participant == inc_addr[i]) {
return &inc_ctx[i];
}
}
}
return NULL;
};
ObFunction<bool()> transfer_op =
[&]() -> bool {
if (inc_index >= MAX_INC_CTX_COUNT) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Limited", K(inc_index));
return false;
}
int64_t src_ctx_idx = ObRandom::rand(0, inc_index + 2);
int64_t dst_ctx_idx = inc_index;
MockOb2pcCtx *ctx;
if (src_ctx_idx == 0) {
ctx = &root_ctx;
} else if (src_ctx_idx == 1) {
ctx = &ctx1;
} else if (src_ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[src_ctx_idx - 3];
}
if (ctx->is_2pc_logging()) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Failed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx));
return false;
}
inc_ctx[dst_ctx_idx].downstream_state_ = ctx->downstream_state_;
inc_ctx[dst_ctx_idx].upstream_state_ = ctx->upstream_state_;
inc_ctx[dst_ctx_idx].tx_state_ = ctx->tx_state_;
inc_ctx[dst_ctx_idx].coordinator_ = ctx->get_addr();
ctx->add_intermediate_participants(inc_addr[dst_ctx_idx]);
inc_index++;
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Succeed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx), K(inc_ctx[dst_ctx_idx]));
return true;
};
ObFunction<bool()> dup_msg_op =
[&]() -> bool {
bool ret = mailbox_mgr_.random_dup_and_send();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Dup Op Succeed", K(ret));
return ret;
};
ObFunction<bool()> drive_op =
[&]() -> bool {
int64_t ctx_idx = ObRandom::rand(0, inc_index + 2);
MockOb2pcCtx *ctx;
if (ctx_idx == 0) {
ctx = &root_ctx;
} else if (ctx_idx == 1) {
ctx = &ctx1;
} else if (ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[ctx_idx - 3];
}
bool is_mail_empty = ctx->mailbox_.empty();
bool is_log_empty = ctx->log_queue_.empty();
int64_t job = 0;
if (is_mail_empty && is_log_empty) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Op Failed", KPC(ctx));
return false;
} else if (!is_mail_empty && !is_log_empty) {
job = ObRandom::rand(0, 1);
} else if (!is_mail_empty) {
job = 0;
} else if (!is_log_empty) {
job = 1;
}
if (job == 0) {
// has mail to drive
ctx->handle();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Mail Op Succeed", KPC(ctx));
return true;
} else if (job == 1) {
// has log to drive
ctx->apply();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Log Op Succeed", KPC(ctx));
return true;
}
ob_abort();
return false;
};
ObFunction<bool()> is_all_released =
[&]() -> bool {
if (root_ctx.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx1.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx2.downstream_state_ != ObTxState::CLEAR) {
return false;
}
for (int i = 0; i < inc_index; i++) {
if (inc_ctx[i].downstream_state_ != ObTxState::CLEAR) {
return false;
}
}
return true;
};
ObFunction<bool()> print_tree =
[&]() -> bool {
root_ctx.print_downstream();
ctx1.print_downstream();
ctx2.print_downstream();
for (int i = 0; i < inc_index; i++) {
inc_ctx[i].print_downstream();
}
return true;
};
ObFunction<bool()> validate_tree =
[&]() -> bool {
for (int i = 0; i < inc_index; i++) {
for (int j = 0; j < inc_ctx[i].participants_.size(); j++) {
int64_t participant = inc_ctx[i].participants_[j];
EXPECT_EQ(inc_ctx[i].addr_, get_ctx_op(participant)->get_coordinator());
}
}
return true;
};
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
participants.push_back(root_addr);
// ctx start to commit
root_ctx.commit(participants);
while (!is_all_released()) {
bool enable = false;
int64_t job = ObRandom::rand(0, 4);
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Decide Job", K(job));
if (0 == job || 1 == job) {
transfer_op();
} else if (2 == job || 3 == job) {
drive_op();
} else {
dup_msg_op();
}
}
// ========== Check Test Valid ==========
EXPECT_EQ(true, root_ctx.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, root_ctx.get_2pc_role());
print_tree();
validate_tree();
}
}
}

View File

@ -39,14 +39,12 @@ class MockLockForReadFunctor
public :
MockLockForReadFunctor(const int64_t snapshot) :
snapshot(snapshot), can_read(false),
trans_version(OB_INVALID_TIMESTAMP),
is_determined_state(false)
trans_version(OB_INVALID_TIMESTAMP)
{}
~MockLockForReadFunctor() {}
int64_t snapshot;
bool can_read;
int64_t trans_version;
bool is_determined_state;
};
class MockObPartTransCtx : public transaction::ObPartTransCtx
@ -61,8 +59,9 @@ public :
is_inited_ = true;
}
~MockObPartTransCtx() {}
int check_for_standby(const SCN &snapshot, bool &can_read,
SCN &trans_version, bool &is_determined_state)
int check_for_standby(const SCN &snapshot,
bool &can_read,
SCN &trans_version)
{
int ret = OB_ERR_SHARED_LOCK_CONFLICT;
SCN min_snapshot = SCN::max_scn();
@ -86,7 +85,6 @@ public :
if (tmp_state_info.version_ > snapshot) {
can_read = false;
trans_version.set_min();
is_determined_state = false;
ret = OB_SUCCESS;
} else {
version = MAX(version, tmp_state_info.version_);
@ -96,7 +94,6 @@ public :
case ObTxState::ABORT: {
can_read = false;
trans_version.set_min();
is_determined_state = true;
ret = OB_SUCCESS;
break;
}
@ -108,7 +105,6 @@ public :
can_read = false;
}
trans_version = tmp_state_info.version_;
is_determined_state = true;
ret = OB_SUCCESS;
break;
}
@ -119,7 +115,6 @@ public :
if (count != 0 && OB_ERR_SHARED_LOCK_CONFLICT == ret && state == ObTxState::PREPARE && version <= snapshot) {
can_read = true;
trans_version = version;
is_determined_state = true;
ret = OB_SUCCESS;
}
if (count == 0 || (OB_ERR_SHARED_LOCK_CONFLICT == ret && min_snapshot < snapshot)) {
@ -237,7 +232,6 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
max_decided_scn.convert_for_tx(10);
bool can_read = false;
SCN trans_version = SCN::min_scn();
bool is_determined_state = false;
ObStateInfo state_info;
ObAskStateRespMsg resp;
share::ObLSID coord_ls = share::ObLSID(1);
@ -246,11 +240,11 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
share::ObLSID part3_ls = share::ObLSID(1003);
MockObPartTransCtx coord(coord_ls);
MockObPartTransCtx part1(part1_ls), part2(part2_ls), part3(part3_ls);
share::ObLSArray parts;
ASSERT_EQ(OB_SUCCESS, parts.push_back(coord_ls));
ASSERT_EQ(OB_SUCCESS, parts.push_back(part1_ls));
ASSERT_EQ(OB_SUCCESS, parts.push_back(part2_ls));
ASSERT_EQ(OB_SUCCESS, parts.push_back(part3_ls));
ObTxCommitParts parts;
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(coord_ls, coord.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part1_ls, part1.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part2_ls, part2.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part3_ls, part3.epoch_, 0)));
part1.set_2pc_upstream_(coord_ls);
part2.set_2pc_upstream_(coord_ls);
part3.set_2pc_upstream_(coord_ls);
@ -265,10 +259,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part1.exec_info_.prepare_version_.set_min();
part2.set_downstream_state(ObTxState::INIT);
part3.set_downstream_state(ObTxState::UNKNOWN);
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -278,7 +272,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
TRANS_LOG(INFO, "test2:can read = false with upper prepare version");
coord.set_downstream_state(ObTxState::PREPARE);
@ -290,10 +284,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
can_read = true;
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -303,7 +297,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
TRANS_LOG(INFO, "test3:can read = true with commit");
@ -324,10 +318,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
can_read = false;
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -337,7 +331,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(true, can_read);
TRANS_LOG(INFO, "test4:can read = false with commit");
@ -353,10 +347,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
can_read = true;
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -366,7 +360,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
TRANS_LOG(INFO, "test5:can read = true with all prepare");
@ -383,10 +377,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
can_read = false;
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
@ -398,7 +392,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(true, can_read);
TRANS_LOG(INFO, "test6:can read = false with all prepare");
@ -415,10 +409,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
can_read = true;
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
@ -430,7 +424,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
TRANS_LOG(INFO, "test7:OB_ERR_SHARED_LOCK_CONFLICT with unknown state");
@ -445,10 +439,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part3.set_downstream_state(ObTxState::UNKNOWN);
part1.state_info_array_.reset();
coord.state_info_array_.reset();
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -458,7 +452,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
TRANS_LOG(INFO, "test8:can read = false with abort");
snapshot.convert_for_tx(300);
@ -473,10 +467,10 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
part1.state_info_array_.reset();
coord.state_info_array_.reset();
can_read = true;
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(state_info, max_decided_scn));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(state_info));
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(state_info, max_decided_scn));
@ -486,7 +480,7 @@ TEST_F(TestObStandbyRead, trans_check_for_standby)
resp.state_info_array_.reset();
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version, is_determined_state));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(false, can_read);
}

View File

@ -0,0 +1,289 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include "share/ob_errno.h"
#include "lib/oblog/ob_log.h"
#define private public
#define protected public
#include "src/storage/tx/ob_trans_part_ctx.h"
#include "src/storage/tx/ob_tx_msg.h"
namespace oceanbase
{
using namespace common;
using namespace share;
using namespace transaction;
using namespace obrpc;
namespace unittest
{
class TestObStandbyReadTransfer : public ::testing::Test
{
public :
virtual void SetUp() {}
virtual void TearDown() {}
};
class MockLockForReadFunctor
{
public :
MockLockForReadFunctor(const int64_t snapshot) :
snapshot(snapshot), can_read(false),
trans_version(OB_INVALID_TIMESTAMP)
{}
~MockLockForReadFunctor() {}
int64_t snapshot;
bool can_read;
int64_t trans_version;
};
class MockObPartTransCtx : public transaction::ObPartTransCtx
{
public :
MockObPartTransCtx(const share::ObLSID &ls_id)
{
default_init_();
ls_id_ = ls_id;
lastest_snapshot_.reset();
standby_part_collected_.reset();
is_inited_ = true;
}
~MockObPartTransCtx() {}
int check_for_standby(const SCN &snapshot,
bool &can_read,
SCN &trans_version)
{
int ret = OB_ERR_SHARED_LOCK_CONFLICT;
SCN min_snapshot = SCN::max_scn();
ObStateInfo tmp_state_info;
// for all parts has been prepared
ObTxState state = ObTxState::PREPARE;
SCN version = SCN::min_scn();
int count = state_info_array_.count();
ARRAY_FOREACH_NORET(state_info_array_, i) {
tmp_state_info = state_info_array_.at(i);
min_snapshot = MIN(tmp_state_info.snapshot_version_, min_snapshot);
if (tmp_state_info.state_ != ObTxState::PREPARE) {
state = tmp_state_info.state_;
}
switch (tmp_state_info.state_) {
case ObTxState::UNKNOWN:
break;
case ObTxState::INIT:
case ObTxState::REDO_COMPLETE:
case ObTxState::PREPARE: {
if (tmp_state_info.version_ > snapshot) {
can_read = false;
trans_version.set_min();
ret = OB_SUCCESS;
} else {
version = MAX(version, tmp_state_info.version_);
}
break;
}
case ObTxState::ABORT: {
can_read = false;
trans_version.set_min();
ret = OB_SUCCESS;
break;
}
case ObTxState::COMMIT:
case ObTxState::CLEAR: {
if (tmp_state_info.version_ <= snapshot) {
can_read = true;
} else {
can_read = false;
}
trans_version = tmp_state_info.version_;
ret = OB_SUCCESS;
break;
}
default:
ret = OB_ERR_UNEXPECTED;
}
}
if (count != 0 && OB_ERR_SHARED_LOCK_CONFLICT == ret && state == ObTxState::PREPARE && version <= snapshot) {
can_read = true;
trans_version = version;
ret = OB_SUCCESS;
}
if (count == 0 || (OB_ERR_SHARED_LOCK_CONFLICT == ret && min_snapshot < snapshot)) {
if (REACH_TIME_INTERVAL(100 * 1000)) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = build_and_post_ask_state_msg(snapshot))) {
TRANS_LOG(WARN, "ask state from coord fail", K(ret), K(snapshot), KPC(this));
}
}
}
TRANS_LOG(INFO, "check for standby", K(ret), K(can_read), K(trans_version), KPC(this));
return ret;
}
int build_and_post_ask_state_msg(const SCN &snapshot)
{
int ret = OB_SUCCESS;
if (is_root()) {
build_and_post_collect_state_msg(snapshot);
}
return ret;
}
int handle_trans_ask_state(const SCN &snapshot, ObAskStateRespMsg &resp)
{
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
build_and_post_collect_state_msg(snapshot);
if (OB_FAIL(resp.state_info_array_.assign(state_info_array_))) {
TRANS_LOG(WARN, "build ObAskStateRespMsg fail", K(ret), K(snapshot), KPC(this));
}
TRANS_LOG(INFO, "handle trans ask state", K(ret), K(resp), KPC(this));
return ret;
}
void build_and_post_collect_state_msg(const SCN &snapshot)
{
int ret = OB_SUCCESS;
if (state_info_array_.empty() && OB_FAIL(set_state_info_array_())) {
TRANS_LOG(WARN, "merge participants fail", K(ret));
}
TRANS_LOG(INFO, "build and post collect state", K(ret), K(state_info_array_), K(lastest_snapshot_));
}
};
TEST_F(TestObStandbyReadTransfer, trans_check_for_standby_transfer)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
SCN snapshot;
snapshot.convert_for_tx(100);
SCN compute_prepare_version;
SCN max_decided_scn;
max_decided_scn.convert_for_tx(10);
bool can_read = false;
SCN trans_version = SCN::min_scn();
ObStateInfo state_info;
ObAskStateRespMsg resp;
share::ObLSID coord_ls = share::ObLSID(1);
share::ObLSID part1_ls = share::ObLSID(1001);
share::ObLSID part2_ls = share::ObLSID(1002);
share::ObLSID part3_ls = share::ObLSID(1003);
MockObPartTransCtx coord(coord_ls);
MockObPartTransCtx part1(part1_ls), part2(part2_ls), part3(part3_ls);
ObTxCommitParts parts;
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(coord_ls, coord.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part1_ls, part1.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part2_ls, part2.epoch_, 0)));
ASSERT_EQ(OB_SUCCESS, parts.push_back(ObTxExecPart(part3_ls, part3.epoch_, 0)));
coord.set_2pc_participants_(parts);
part1.set_2pc_upstream_(coord_ls);
part2.set_2pc_upstream_(coord_ls);
part3.set_2pc_upstream_(coord_ls);
share::ObLSID part_transfer_ls = share::ObLSID(1004);
MockObPartTransCtx transfer_part(part_transfer_ls);
transfer_part.set_2pc_upstream_(part1_ls);
ASSERT_EQ(OB_SUCCESS, transfer_part.exec_info_.transfer_parts_.push_back(ObTxExecPart(part1_ls, -1, 1)));
ObTxCommitParts transfer_parts;
ASSERT_EQ(OB_SUCCESS, transfer_parts.push_back(ObTxExecPart(part_transfer_ls, -1, 1)));
part1.set_2pc_participants_(transfer_parts);
TRANS_LOG(INFO, "test1:OB_ERR_SHARED_LOCK_CONFLICT with unknown prepare version");
state_info.snapshot_version_ = snapshot;
coord.set_downstream_state(ObTxState::PREPARE);
coord.exec_info_.prepare_version_.convert_for_tx(10);
part1.set_downstream_state(ObTxState::PREPARE);
part1.exec_info_.prepare_version_.convert_for_tx(20);
part2.set_downstream_state(ObTxState::PREPARE);
part2.exec_info_.prepare_version_.convert_for_tx(30);
part3.set_downstream_state(ObTxState::PREPARE);
part3.exec_info_.prepare_version_.convert_for_tx(40);
transfer_part.set_downstream_state(ObTxState::PREPARE);
transfer_part.exec_info_.prepare_version_.convert_for_tx(50);
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ObCollectStateMsg collect_state_req;
ObCollectStateRespMsg collect_state_resp;
collect_state_req.check_info_ = coord.state_info_array_.at(0).check_info_;
collect_state_resp.sender_ = coord_ls;
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state(collect_state_resp, collect_state_req));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(4, coord.state_info_array_.count());
collect_state_resp.transfer_parts_.reset();
collect_state_req.check_info_ = coord.state_info_array_.at(1).check_info_;
collect_state_resp.sender_ = part1_ls;
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_collect_state(collect_state_resp, collect_state_req));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
collect_state_resp.transfer_parts_.reset();
collect_state_req.check_info_ = coord.state_info_array_.at(2).check_info_;
collect_state_resp.sender_ = part2_ls;
ASSERT_EQ(OB_SUCCESS, part2.handle_trans_collect_state(collect_state_resp, collect_state_req));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_ERR_SHARED_LOCK_CONFLICT, part1.check_for_standby(snapshot, can_read, trans_version));
collect_state_req.check_info_ = coord.state_info_array_.at(3).check_info_;
collect_state_resp.sender_ = part3_ls;
ASSERT_EQ(OB_SUCCESS, part3.handle_trans_collect_state(collect_state_resp, collect_state_req));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
collect_state_req.check_info_ = coord.state_info_array_.at(4).check_info_;
collect_state_resp.sender_ = part_transfer_ls;
ASSERT_EQ(OB_SUCCESS, transfer_part.handle_trans_collect_state(collect_state_resp, collect_state_req));
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_collect_state_resp(collect_state_resp));
ASSERT_EQ(5, coord.state_info_array_.count());
ASSERT_EQ(OB_SUCCESS, coord.handle_trans_ask_state(snapshot, resp));
ASSERT_EQ(OB_SUCCESS, part1.handle_trans_ask_state_resp(resp));
ASSERT_EQ(OB_SUCCESS, part1.check_for_standby(snapshot, can_read, trans_version));
ASSERT_EQ(true, can_read);
compute_prepare_version.convert_for_sql(50);
ASSERT_EQ(compute_prepare_version, trans_version);
}
}//end of unittest
}//end of oceanbase
using namespace oceanbase;
using namespace oceanbase::common;
int main(int argc, char **argv)
{
int ret = 1;
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_ob_standby_read_transfer.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}

View File

@ -151,6 +151,8 @@ TEST_F(TestObTxLog, tx_log_body_except_redo)
ObLSArray TEST_LS_ARRAY;
TEST_LS_ARRAY.push_back(LSKey());
ObTxCommitParts TEST_COMMIT_PARTS;
TEST_COMMIT_PARTS.push_back(ObTxExecPart(TEST_LS_KEY, 0, 0));
ObRedoLSNArray TEST_LOG_OFFSET_ARRY;
TEST_LOG_OFFSET_ARRY.push_back(TEST_LOG_OFFSET);
ObLSLogInfoArray TEST_INFO_ARRAY;
@ -173,7 +175,9 @@ TEST_F(TestObTxLog, tx_log_body_except_redo)
TEST_LOG_OFFSET_ARRY,
TEST_LS_ARRAY,
TEST_CLUSTER_VERSION,
TEST_XID);
TEST_XID,
TEST_COMMIT_PARTS,
TEST_EPOCH);
// ASSERT_EQ(OB_SUCCESS, fill_commit_state.before_serialize());
ObTxActiveInfoLog fill_active_state(TEST_ADDR,
TEST_TRANS_TYPE,
@ -289,6 +293,8 @@ TEST_F(TestObTxLog, tx_log_body_redo)
ObLSArray TEST_LS_ARRAY;
TEST_LS_ARRAY.push_back(LSKey());
ObTxCommitParts TEST_COMMIT_PARTS;
TEST_COMMIT_PARTS.push_back(ObTxExecPart(TEST_LS_KEY, 0, 0));
ObRedoLSNArray TEST_LOG_OFFSET_ARRY;
TEST_LOG_OFFSET_ARRY.push_back(TEST_LOG_OFFSET);
ObLSLogInfoArray TEST_INFO_ARRAY;
@ -311,7 +317,9 @@ TEST_F(TestObTxLog, tx_log_body_redo)
TEST_LOG_OFFSET_ARRY,
TEST_LS_ARRAY,
TEST_CLUSTER_VERSION,
TEST_XID);
TEST_XID,
TEST_COMMIT_PARTS,
TEST_EPOCH);
ObTxCommitLog fill_commit(share::SCN::base_scn(),
TEST_CHECKSUM,
TEST_LS_ARRAY,
@ -407,6 +415,8 @@ TEST_F(TestObTxLog, test_compat_bytes)
{
ObLSArray TEST_LS_ARRAY;
TEST_LS_ARRAY.push_back(LSKey());
ObTxCommitParts TEST_COMMIT_PARTS;
TEST_COMMIT_PARTS.push_back(ObTxExecPart(TEST_LS_KEY, 0, 0));
ObRedoLSNArray TEST_LOG_OFFSET_ARRY;
TEST_LOG_OFFSET_ARRY.push_back(TEST_LOG_OFFSET);
ObLSLogInfoArray TEST_INFO_ARRAY;
@ -429,7 +439,9 @@ TEST_F(TestObTxLog, test_compat_bytes)
TEST_LOG_OFFSET_ARRY,
TEST_LS_ARRAY,
TEST_CLUSTER_VERSION,
TEST_XID);
TEST_XID,
TEST_COMMIT_PARTS,
TEST_EPOCH);
ObTxCommitInfoLogTempRef commit_info_temp_ref;
ObTxCommitInfoLog replay_commit_info(commit_info_temp_ref);
@ -589,6 +601,10 @@ TEST_F(TestObTxLog, test_default_log_deserialize)
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_xid(), replay_commit_state.get_xid());
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_commit_parts().count(), replay_commit_state.get_commit_parts().count());
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_epoch(), replay_commit_state.get_epoch());
replay_member_cnt++;
EXPECT_EQ(replay_member_cnt, fill_member_cnt);
ObTxPrepareLogTempRef prepare_temp_ref;
@ -676,6 +692,8 @@ void test_big_commit_info_log(int64_t log_size)
ObLSArray TEST_LS_ARRAY;
TEST_LS_ARRAY.push_back(LSKey());
ObTxCommitParts TEST_COMMIT_PARTS;
TEST_COMMIT_PARTS.push_back(ObTxExecPart(TEST_LS_KEY, 0, 0));
ObRedoLSNArray TEST_BIG_REDO_LSN_ARRAY;
for (int i = 0; i < log_size / sizeof(palf::LSN); i++) {
TEST_BIG_REDO_LSN_ARRAY.push_back(palf::LSN(i));
@ -685,7 +703,7 @@ void test_big_commit_info_log(int64_t log_size)
ObTxCommitInfoLog fill_commit_state(TEST_ADDR, TEST_LS_ARRAY, TEST_LS_KEY, TEST_IS_SUB2PC,
TEST_IS_DUP, TEST_CAN_ELR, TEST_TRACE_ID_STR, TEST_TRCE_INFO,
TEST_LOG_OFFSET, TEST_BIG_REDO_LSN_ARRAY, TEST_LS_ARRAY,
TEST_CLUSTER_VERSION, TEST_XID);
TEST_CLUSTER_VERSION, TEST_XID, TEST_COMMIT_PARTS, TEST_EPOCH);
ObTxLogBlockHeader
fill_block_header(TEST_ORG_CLUSTER_ID, TEST_LOG_ENTRY_NO, ObTransID(TEST_TX_ID), TEST_ADDR);
ASSERT_EQ(OB_SUCCESS, fill_block.init(TEST_TX_ID, fill_block_header));
@ -786,6 +804,10 @@ void test_big_commit_info_log(int64_t log_size)
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_xid(), replay_commit_state.get_xid());
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_commit_parts().count(), replay_commit_state.get_commit_parts().count());
replay_member_cnt++;
EXPECT_EQ(fill_commit_state.get_epoch(), replay_commit_state.get_epoch());
replay_member_cnt++;
EXPECT_EQ(replay_member_cnt, fill_member_cnt);
}

View File

@ -15,6 +15,8 @@
#define private public
#define protected public
#include "ob_mock_2pc_ctx.h"
#include "lib/random/ob_random.h"
#include "lib/function/ob_function.h"
namespace oceanbase
{
@ -241,6 +243,7 @@ TEST_F(TestMockOb2pcCtx, test_single_participants_prepare)
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start prepare state
ctx1.coordinator_ = addr1;
ctx1.downstream_state_ = ObTxState::PREPARE;
ctx1.set_upstream_state(ObTxState::PREPARE);
ctx1.handle_timeout();
@ -258,6 +261,7 @@ TEST_F(TestMockOb2pcCtx, test_single_participants_precommit)
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.coordinator_ = addr1;
ctx1.downstream_state_ = ObTxState::PREPARE;
ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
ctx1.handle_timeout();
@ -275,6 +279,7 @@ TEST_F(TestMockOb2pcCtx, test_single_participants_precommit2)
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.coordinator_ = addr1;
ctx1.downstream_state_ = ObTxState::PRE_COMMIT;
ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
ctx1.handle_timeout();
@ -295,6 +300,7 @@ TEST_F(TestMockOb2pcCtx, test_single_participants_commit)
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.coordinator_ = addr1;
ctx1.downstream_state_ = ObTxState::COMMIT;
ctx1.set_upstream_state(ObTxState::COMMIT);
ctx1.handle_timeout();
@ -305,6 +311,379 @@ TEST_F(TestMockOb2pcCtx, test_single_participants_commit)
EXPECT_EQ(ObTxState::CLEAR, ctx1.get_downstream_state());
}
TEST_F(TestMockOb2pcCtx, test_basic_tree_commit)
{
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
// incremental participants
MockOb2pcCtx ctx3;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
ctx3.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
auto addr2 = ctx2.get_addr();
auto addr3 = ctx3.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
ctx2.add_intermediate_participants(addr3);
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
ctx1.commit(participants);
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx3 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx2 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Two Phase Commit pre commit Phase ======
// ctx2 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx2 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit commit Phase ==========
// ctx2 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx2 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Two Phase Commit clear Phase ==========
// ctx2 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx2 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Check Test Valid ==========
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, ctx1.get_2pc_role());
EXPECT_EQ(Ob2PCRole::INTERNAL, ctx2.get_2pc_role());
EXPECT_EQ(Ob2PCRole::LEAF, ctx3.get_2pc_role());
}
TEST_F(TestMockOb2pcCtx, test_basic_cycle_commit)
{
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
// incremental participants
MockOb2pcCtx ctx3;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
ctx3.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
auto addr2 = ctx2.get_addr();
auto addr3 = ctx3.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
ctx2.add_intermediate_participants(addr1);
ctx1.add_intermediate_participants(addr2);
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
ctx1.commit(participants);
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ctx1 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit pre commit Phase ======
// ctx2 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre-commit request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre-commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit commit Phase ==========
// ctx2 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ctx1 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ========== Two Phase Commit clear Phase ==========
// ctx2 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx2 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Check Test Valid ==========
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, ctx1.get_2pc_role());
EXPECT_EQ(Ob2PCRole::INTERNAL, ctx2.get_2pc_role());
}
TEST_F(TestMockOb2pcCtx, test_random_tree_commit)
{
// root coordinator
MockOb2pcCtx root_ctx;
root_ctx.init(&mailbox_mgr_);
int64_t root_addr = root_ctx.get_addr();
// normal participants
MockOb2pcCtx ctx1;
MockOb2pcCtx ctx2;
ctx1.init(&mailbox_mgr_);
ctx2.init(&mailbox_mgr_);
int64_t addr1 = ctx1.get_addr();
int64_t addr2 = ctx2.get_addr();
// incremental participants
const int64_t MAX_INC_CTX_COUNT = 100;
MockOb2pcCtx inc_ctx[MAX_INC_CTX_COUNT];
int64_t inc_addr[MAX_INC_CTX_COUNT];
int64_t inc_index = 0;
for (int i = 0; i < MAX_INC_CTX_COUNT; i++) {
inc_ctx[i].init(&mailbox_mgr_);
inc_addr[i] = inc_ctx[i].get_addr();
}
ObFunction<MockOb2pcCtx *(const int64_t participant)> get_ctx_op =
[&](const int64_t participant) -> MockOb2pcCtx * {
if (participant == root_addr) {
return &root_ctx;
} else if (participant == addr1) {
return &ctx1;
} else if (participant == addr2) {
return &ctx2;
} else {
for (int64_t i = 0; i < inc_index; i++) {
if (participant == inc_addr[i]) {
return &inc_ctx[i];
}
}
}
return NULL;
};
ObFunction<bool()> transfer_op =
[&]() -> bool {
if (inc_index >= MAX_INC_CTX_COUNT) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Limited", K(inc_index));
return false;
}
int64_t src_ctx_idx = ObRandom::rand(0, inc_index + 2);
int64_t dst_ctx_idx = inc_index;
MockOb2pcCtx *ctx;
if (src_ctx_idx == 0) {
ctx = &root_ctx;
} else if (src_ctx_idx == 1) {
ctx = &ctx1;
} else if (src_ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[src_ctx_idx - 3];
}
if (ctx->is_2pc_logging()) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Failed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx));
return false;
}
inc_ctx[dst_ctx_idx].downstream_state_ = ctx->downstream_state_;
inc_ctx[dst_ctx_idx].upstream_state_ = ctx->upstream_state_;
inc_ctx[dst_ctx_idx].tx_state_ = ctx->tx_state_;
inc_ctx[dst_ctx_idx].coordinator_ = ctx->get_addr();
ctx->add_intermediate_participants(inc_addr[dst_ctx_idx]);
inc_index++;
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Transfer Op Succeed", K(src_ctx_idx), K(dst_ctx_idx), KPC(ctx), K(inc_ctx[dst_ctx_idx]));
return true;
};
ObFunction<bool()> drive_op =
[&]() -> bool {
int64_t ctx_idx = ObRandom::rand(0, inc_index + 2);
MockOb2pcCtx *ctx;
if (ctx_idx == 0) {
ctx = &root_ctx;
} else if (ctx_idx == 1) {
ctx = &ctx1;
} else if (ctx_idx == 2) {
ctx = &ctx2;
} else {
ctx = &inc_ctx[ctx_idx - 3];
}
bool is_mail_empty = ctx->mailbox_.empty();
bool is_log_empty = ctx->log_queue_.empty();
int64_t job = 0;
if (is_mail_empty && is_log_empty) {
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Op Failed", KPC(ctx));
return false;
} else if (!is_mail_empty && !is_log_empty) {
job = ObRandom::rand(0, 1);
} else if (!is_mail_empty) {
job = 0;
} else if (!is_log_empty) {
job = 1;
}
if (job == 0) {
// has mail to drive
ctx->handle();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Mail Op Succeed", KPC(ctx));
return true;
} else if (job == 1) {
// has log to drive
ctx->apply();
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Drive Log Op Succeed", KPC(ctx));
return true;
}
ob_abort();
return false;
};
ObFunction<bool()> is_all_released =
[&]() -> bool {
if (root_ctx.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx1.downstream_state_ != ObTxState::CLEAR) {
return false;
} else if (ctx2.downstream_state_ != ObTxState::CLEAR) {
return false;
}
for (int i = 0; i < inc_index; i++) {
if (inc_ctx[i].downstream_state_ != ObTxState::CLEAR) {
return false;
}
}
return true;
};
ObFunction<bool()> print_tree =
[&]() -> bool {
root_ctx.print_downstream();
ctx1.print_downstream();
ctx2.print_downstream();
for (int i = 0; i < inc_index; i++) {
inc_ctx[i].print_downstream();
}
return true;
};
ObFunction<bool()> validate_tree =
[&]() -> bool {
for (int i = 0; i < inc_index; i++) {
for (int j = 0; j < inc_ctx[i].participants_.size(); j++) {
int64_t participant = inc_ctx[i].participants_[j];
EXPECT_EQ(inc_ctx[i].addr_, get_ctx_op(participant)->get_coordinator());
}
}
return true;
};
MockObParticipants participants;
participants.push_back(addr1);
participants.push_back(addr2);
participants.push_back(root_addr);
// ctx start to commit
root_ctx.commit(participants);
while (!is_all_released()) {
bool enable = false;
int64_t job = ObRandom::rand(0, 1);
TRANS_LOG(INFO, "[TREE_COMMIT_GEAR] Decide Job", K(job));
if (0 == job) {
transfer_op();
} else {
drive_op();
}
}
// ========== Check Test Valid ==========
EXPECT_EQ(true, root_ctx.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(Ob2PCRole::ROOT, root_ctx.get_2pc_role());
print_tree();
validate_tree();
}
}
}

View File

@ -103,10 +103,6 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1)
EXPECT_EQ(OB_SUCCESS, ctx1.init(ls_id1, trans_id1, nullptr, &data1, &mailbox_mgr_));
EXPECT_EQ(OB_SUCCESS, ctx2.init(ls_id2, trans_id1, nullptr, &data2, &mailbox_mgr_));
std::vector<ObLSID> participants;
participants.push_back(ls_id1);
participants.push_back(ls_id2);
EXPECT_EQ(OB_SUCCESS, build_scheduler_mailbox());
ctx1.addr_memo_[ls_id2] = ctx2.addr_;
@ -118,58 +114,42 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1)
ctx1.set_downstream_state(ObTxState::REDO_COMPLETE);
ctx1.exec_info_.participants_.push_back(ls_id1);
ctx1.exec_info_.participants_.push_back(ls_id2);
ctx1.exec_info_.commit_parts_.push_back(ObTxExecPart(ls_id1, ctx1.epoch_, 0));
ctx1.exec_info_.commit_parts_.push_back(ObTxExecPart(ls_id2, ctx2.epoch_, 0));
ctx1.scheduler_addr_ = scheduler_mailbox_.addr_;
ctx2.addr_memo_[ls_id1] = ctx1.addr_;
ctx2.ls_memo_[ctx1.addr_] = ls_id1;
ctx2.set_trans_type_(TransType::DIST_TRANS);
ctx2.upstream_state_ = ObTxState::PREPARE;
ctx2.exec_info_.upstream_ = ls_id1;
ctx2.log_queue_.push_back(ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE);
bool unused;
EXPECT_EQ(OB_SUCCESS, ctx2.do_prepare(unused));
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
EXPECT_EQ(OB_SUCCESS, ctx2.handle_timeout(100000));
// ctx2.addr_memo_[ls_id1] = ctx1.addr_;
// ctx2.ls_memo_[ctx1.addr_] = ls_id1;
// ctx2.set_trans_type_(TransType::DIST_TRANS);
// ctx2.upstream_state_ = ObTxState::PREPARE;
// ctx2.exec_info_.upstream_ = ls_id1;
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.two_phase_commit());
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
EXPECT_NE(share::SCN::max_scn(), ctx1.mt_ctx_.trans_version_);
// EXPECT_NE(share::SCN::max_scn(), ctx1.mt_ctx_.trans_version_);
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
// mailbox_mgr_.send_to_head(tx_commit_mail, tx_commit_mail.to_);
// EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.two_phase_commit());
// // ctx2 handle prepare request
// EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// // ctx2 handle prepare request
// EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// // ctx1 handle prepare response
// EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// // ctx1 apply prepare log
// EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// TODO shanyan.g
/*
// ========== Two Phase Commit pre commit Phase ======
// ctx2 handle pre commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
*/
//EXPECT_EQ(OB_SUCCESS, check_mail(scheduler_mailbox_,
// ctx1.get_mailbox_addr() /*from*/,
// scheduler_addr_,
// TX_COMMIT_RESP));
EXPECT_EQ(OB_SUCCESS, check_mail(scheduler_mailbox_,
ctx1.get_mailbox_addr() /*from*/,
scheduler_addr_,
TX_COMMIT_RESP));
/*
// ========== Two Phase Commit commit Phase ==========
// ctx2 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
@ -187,13 +167,140 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1)
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx1 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
*/
// ========== Check Test Valid ==========
// EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
// EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(2, ctx1.coord_prepare_info_arr_.count());
int64_t id1 = ctx1.coord_prepare_info_arr_[0].id_.id_;
int64_t id2 = ctx1.coord_prepare_info_arr_[1].id_.id_;
EXPECT_EQ(true, id1 == 1 || id1 == 2);
EXPECT_EQ(true, id2 == 1 || id2 == 2);
TRANS_LOG(INFO, "qianchen debug", K(id1), K(id2), K(ctx1));
}
TEST_F(TestMockObTxCtx, test_transfer_tx_ctx)
{
int64_t ls_id_gen = ObLSID::MIN_USER_LS_ID;
MockObTxCtx ctx1;
MockObTxCtx ctx2;
MockObTxCtx ctx3;
ObTransID trans_id1(1);
ObLSID ls_id0(++ls_id_gen);
ObLSID ls_id1(++ls_id_gen);
ObLSID ls_id2(++ls_id_gen);
ObLSID ls_id3(++ls_id_gen);
ObTxData data1;
ObTxData data2;
ObTxData data3;
ctx1.change_to_leader();
ctx2.change_to_leader();
ctx3.change_to_leader();
EXPECT_EQ(OB_SUCCESS, ctx1.init(ls_id1, trans_id1, nullptr, &data1, &mailbox_mgr_));
EXPECT_EQ(OB_SUCCESS, ctx2.init(ls_id2, trans_id1, nullptr, &data2, &mailbox_mgr_));
EXPECT_EQ(OB_SUCCESS, ctx3.init(ls_id3, trans_id1, nullptr, &data3, &mailbox_mgr_));
EXPECT_EQ(OB_SUCCESS, build_scheduler_mailbox());
ctx1.addr_memo_[ls_id2] = ctx2.addr_;
ctx1.ls_memo_[ctx2.addr_] = ls_id2;
ctx2.addr_memo_[ls_id3] = ctx3.addr_;
ctx2.ls_memo_[ctx3.addr_] = ls_id3;
ctx1.set_trans_type_(TransType::DIST_TRANS);
ctx1.upstream_state_ = ObTxState::INIT;
// set self to root
ctx1.exec_info_.upstream_ = ls_id1;
ctx1.set_downstream_state(ObTxState::REDO_COMPLETE);
ctx1.exec_info_.participants_.push_back(ls_id1);
ctx1.exec_info_.participants_.push_back(ls_id2);
ctx1.scheduler_addr_ = scheduler_mailbox_.addr_;
ctx1.exec_info_.commit_parts_.push_back(ObTxExecPart(ls_id1, ctx1.epoch_, 0));
ctx1.exec_info_.commit_parts_.push_back(ObTxExecPart(ls_id2, ctx2.epoch_, 0));
// ctx2.addr_memo_[ls_id1] = ctx1.addr_;
// ctx2.ls_memo_[ctx1.addr_] = ls_id1;
// ctx2.set_trans_type_(TransType::DIST_TRANS);
// ctx2.upstream_state_ = ObTxState::PREPARE;
// ctx2.exec_info_.upstream_ = ls_id1;
// EXPECT_NE(share::SCN::max_scn(), ctx1.mt_ctx_.trans_version_);
EXPECT_EQ(OB_SUCCESS, ctx2.add_intermediate_participants(ls_id3, 1000));
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start to commit
EXPECT_EQ(OB_SUCCESS, ctx1.two_phase_commit());
// ctx2 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx3 handle prepare request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx2 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle prepare response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply prepare log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Two Phase Commit pre commit Phase ======
// ctx2 handle pre commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx3 handle pre commit request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx2 handle pre commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle pre commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
EXPECT_EQ(OB_SUCCESS, check_mail(scheduler_mailbox_,
ctx1.get_mailbox_addr() /*from*/,
scheduler_addr_,
TX_COMMIT_RESP));
// ========== Two Phase Commit commit Phase ==========
// ctx2 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx3 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx2 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx1 handle commit response
EXPECT_EQ(OB_SUCCESS, ctx1.handle());
// ctx1 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Two Phase Commit clear Phase ==========
// ctx2 handle clear request
EXPECT_EQ(OB_SUCCESS, ctx2.handle());
// ctx2 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx2.apply());
// ctx3 handle commit request
EXPECT_EQ(OB_SUCCESS, ctx3.handle());
// ctx3 apply commit log
EXPECT_EQ(OB_SUCCESS, ctx3.apply());
// ctx1 apply clear log
EXPECT_EQ(OB_SUCCESS, ctx1.apply());
// ========== Check Test Valid ==========
EXPECT_EQ(true, ctx1.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx2.check_status_valid(true/*should commit*/));
EXPECT_EQ(true, ctx3.check_status_valid(true/*should commit*/));
EXPECT_EQ(2, ctx1.coord_prepare_info_arr_.count());
int64_t id1 = ctx1.coord_prepare_info_arr_[0].id_.id_;
int64_t id2 = ctx1.coord_prepare_info_arr_[1].id_.id_;
EXPECT_EQ(true, id1 == 1 || id1 == 2 || id1 == 3);
EXPECT_EQ(true, id2 == 1 || id2 == 2 || id2 == 3);
TRANS_LOG(INFO, "qianchen debug", K(id1), K(id2), K(ctx1));
}
}
namespace transaction
@ -201,6 +308,27 @@ namespace transaction
void ObTransCtx::after_unlock(CtxLockArg &)
{
}
void ObTransCtx::set_exiting_()
{
int tmp_ret = OB_SUCCESS;
if (!is_exiting_) {
is_exiting_ = true;
print_trace_log_if_necessary_();
const int64_t ctx_ref = get_ref();
if (NULL == ls_tx_ctx_mgr_) {
TRANS_LOG_RET(ERROR, tmp_ret, "ls_tx_ctx_mgr_ is null, unexpected error", KP(ls_tx_ctx_mgr_), "context", *this);
} else {
ls_tx_ctx_mgr_->dec_active_tx_count();
ls_tx_ctx_mgr_->del_tx_ctx(this);
TRANS_LOG(DEBUG, "transaction exiting", "context", *this, K(lbt()));
REC_TRANS_TRACE_EXT2(tlog_, exiting, OB_ID(ctx_ref), ctx_ref, OB_ID(arg1), session_id_);
}
}
}
}
}

View File

@ -1 +1,2 @@
storage_unittest(test_tx_ctx_table)
storage_unittest(test_tx_table_guards)

View File

@ -0,0 +1,242 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define protected public
#define private public
#define UNITTEST
#include "storage/tx/ob_tx_data_define.h"
#include "storage/tx_table/ob_tx_table.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace storage;
using namespace blocksstable;
using namespace share;
static int turn = 0;
static ObTxData src_tx_data;
static ObTxData dst_tx_data;
namespace storage {
int ObTxTable::check_with_tx_data(ObReadTxDataArg &read_tx_data_arg,
ObITxDataCheckFunctor &fn)
{
TRANS_LOG(INFO, "turn is", K(turn));
if (turn == 0) {
// use dst
turn++;
return fn(dst_tx_data, NULL);
} else if (turn == 1) {
// use src
turn++;
return fn(src_tx_data, NULL);
} else {
return OB_SUCCESS;
}
}
}
namespace unittest
{
class TestTxTableGuards : public ::testing::Test
{
public:
TestTxTableGuards() {}
virtual void SetUp() override
{
turn = 0;
src_tx_data.reset();
dst_tx_data.reset();
TRANS_LOG(INFO, "setup success");
}
virtual void TearDown() override
{
turn = 0;
src_tx_data.reset();
dst_tx_data.reset();
TRANS_LOG(INFO, "teardown success");
}
static void SetUpTestCase()
{
turn = 0;
src_tx_data.reset();
dst_tx_data.reset();
TRANS_LOG(INFO, "SetUpTestCase");
}
static void TearDownTestCase()
{
turn = 0;
src_tx_data.reset();
dst_tx_data.reset();
TRANS_LOG(INFO, "TearDownTestCase");
}
};
TEST_F(TestTxTableGuards, check_on_single_dest_1) {
ObTxTable dst_tx_table;
ObTxTableGuards guards;
share::SCN scn;
int64_t state;
share::SCN trans_version;
scn.convert_from_ts(100);
guards.tx_table_guard_.tx_table_ = &dst_tx_table;
dst_tx_data.commit_version_.convert_from_ts(1);
dst_tx_data.end_scn_.convert_from_ts(90);
dst_tx_data.state_ = ObTxData::COMMIT;
EXPECT_EQ(OB_SUCCESS, guards.get_tx_state_with_scn(ObTransID(1),
scn,
state,
trans_version));
EXPECT_EQ(dst_tx_data.commit_version_, trans_version);
EXPECT_EQ(dst_tx_data.state_, ObTxData::COMMIT);
}
TEST_F(TestTxTableGuards, check_on_single_dest_2) {
ObTxTable dst_tx_table;
ObTxTableGuards guards;
share::SCN scn;
int64_t state;
share::SCN trans_version;
scn.convert_from_ts(80);
guards.tx_table_guard_.tx_table_ = &dst_tx_table;
dst_tx_data.commit_version_.convert_from_ts(1);
dst_tx_data.end_scn_.convert_from_ts(90);
dst_tx_data.state_ = ObTxData::COMMIT;
EXPECT_EQ(OB_SUCCESS, guards.get_tx_state_with_scn(ObTransID(1),
scn,
state,
trans_version));
EXPECT_EQ(trans_version, SCN::max_scn());
EXPECT_EQ(state, ObTxData::RUNNING);
}
TEST_F(TestTxTableGuards, check_on_dest_and_src_1) {
ObTxTable dst_tx_table;
ObTxTable src_tx_table;
ObTxTableGuards guards;
share::SCN scn;
int64_t state;
share::SCN trans_version;
scn.convert_from_ts(100);
guards.tx_table_guard_.tx_table_ = &dst_tx_table;
guards.src_tx_table_guard_.tx_table_ = &src_tx_table;
dst_tx_data.commit_version_.convert_from_ts(1);
dst_tx_data.end_scn_.convert_from_ts(90);
dst_tx_data.state_ = ObTxData::COMMIT;
src_tx_data.commit_version_.convert_from_ts(2);
src_tx_data.end_scn_.convert_from_ts(90);
src_tx_data.state_ = ObTxData::COMMIT;
EXPECT_EQ(OB_SUCCESS, guards.get_tx_state_with_scn(ObTransID(1),
scn,
state,
trans_version));
EXPECT_EQ(dst_tx_data.commit_version_, trans_version);
EXPECT_EQ(dst_tx_data.state_, ObTxData::COMMIT);
}
TEST_F(TestTxTableGuards, check_on_dest_and_src_2) {
ObTxTable dst_tx_table;
ObTxTable src_tx_table;
ObTxTableGuards guards;
share::SCN scn;
int64_t state;
share::SCN trans_version;
scn.convert_from_ts(100);
guards.tx_table_guard_.tx_table_ = &dst_tx_table;
guards.src_tx_table_guard_.tx_table_ = &src_tx_table;
dst_tx_data.commit_version_.convert_from_ts(1);
dst_tx_data.end_scn_.convert_from_ts(110);
dst_tx_data.state_ = ObTxData::ABORT;
src_tx_data.commit_version_.convert_from_ts(2);
src_tx_data.end_scn_.convert_from_ts(90);
src_tx_data.state_ = ObTxData::COMMIT;
EXPECT_EQ(OB_SUCCESS, guards.get_tx_state_with_scn(ObTransID(1),
scn,
state,
trans_version));
EXPECT_EQ(SCN::max_scn(), trans_version);
EXPECT_EQ(state, ObTxData::RUNNING);
}
TEST_F(TestTxTableGuards, check_on_dest_and_src_3) {
ObTxTable dst_tx_table;
ObTxTable src_tx_table;
ObTxTableGuards guards;
share::SCN scn;
int64_t state;
share::SCN trans_version;
scn.convert_from_ts(100);
guards.tx_table_guard_.tx_table_ = &dst_tx_table;
guards.src_tx_table_guard_.tx_table_ = &src_tx_table;
dst_tx_data.commit_version_.convert_from_ts(1);
dst_tx_data.end_scn_.convert_from_ts(110);
dst_tx_data.state_ = ObTxData::COMMIT;
src_tx_data.commit_version_.convert_from_ts(2);
src_tx_data.end_scn_.convert_from_ts(90);
src_tx_data.state_ = ObTxData::ABORT;
EXPECT_EQ(OB_SUCCESS, guards.get_tx_state_with_scn(ObTransID(1),
scn,
state,
trans_version));
EXPECT_EQ(SCN::min_scn(), trans_version);
EXPECT_EQ(src_tx_data.state_, state);
}
} // namespace unittest
} // namespace oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_tx_table_guards.log*");
OB_LOGGER.set_file_name("test_tx_table_guards.log");
OB_LOGGER.set_log_level("DEBUG");
STORAGE_LOG(INFO, "begin unittest: test tx table guards");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}