Files
oceanbase/unittest/storage/transaction/test_ob_trans_msg.cpp
gm 4a92b6d7df reformat source code
according to code styles, 'AccessModifierOffset' should be -2.
2021-06-17 10:40:36 +08:00

913 lines
27 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "storage/transaction/ob_trans_msg.h"
#include "storage/transaction/ob_trans_msg_type.h"
#include <gtest/gtest.h>
#include "share/ob_errno.h"
#include "lib/oblog/ob_log.h"
#include "common/ob_partition_key.h"
#include "common/ob_clock_generator.h"
namespace oceanbase {
using namespace common;
using namespace transaction;
namespace unittest {
class TestObTransMsg : public ::testing::Test {
public:
virtual void SetUp()
{
init_();
}
virtual void TearDown()
{}
private:
int init_();
public:
// valid partition parameters
static const int64_t VALID_TABLE_ID = 1;
static const int32_t VALID_PARTITION_ID = 1;
static const int32_t VALID_PARTITION_COUNT = 100;
// invalid partition parameters
static const int64_t INVALID_TABLE_ID = -1;
static const int32_t INVALID_PARTITION_ID = -1;
static const int32_t INVALID_PARTITION_COUNT = -100;
static const int32_t PORT = 8080;
static const ObAddr::VER IP_TYPE = ObAddr::IPV4;
static const char* LOCAL_IP;
static const uint64_t TENANT_ID = 1001;
public:
common::ObAddr observer_;
};
const char* TestObTransMsg::LOCAL_IP = "127.0.0.1";
int TestObTransMsg::init_()
{
int ret = OB_SUCCESS;
observer_ = ObAddr(TestObTransMsg::IP_TYPE, TestObTransMsg::LOCAL_IP, TestObTransMsg::PORT);
return ret;
}
//////////////////////basic function test//////////////////////////////////////////
// test the init of ObTransMsg
TEST_F(TestObTransMsg, trans_msg_init_valid)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
const int64_t PARTICIPANT_NUM = 10;
// create an object of ObTtransID
ObTransID trans_id(observer_);
// create all participants
ObPartitionArray participants;
for (int64_t i = 0; i < PARTICIPANT_NUM; ++i) {
ObPartitionKey partition_key(VALID_TABLE_ID + i, VALID_PARTITION_ID, VALID_PARTITION_COUNT);
ASSERT_EQ(OB_SUCCESS, participants.push_back(partition_key));
}
// create scheduler and coordinator
ObAddr& scheduler = observer_;
const ObPartitionKey& coordinator = participants.at(0);
// create sender and receiver
const ObPartitionKey& sender = participants.at(0);
const ObPartitionKey& receiver = participants.at(1);
// create an object of ObStartTransParm
ObStartTransParam parms;
parms.set_access_mode(ObTransAccessMode::READ_ONLY);
parms.set_type(ObTransType::TRANS_USER);
parms.set_isolation(ObTransIsolation::READ_COMMITED);
int64_t msg_type = OB_TRANS_COMMIT_REQUEST;
int64_t trans_time = 9870;
int64_t sql_no = 1;
int32_t status = 1;
int64_t state = Ob2PCState::COMMIT;
const int64_t trans_version = 1;
const int64_t request_id = ObClockGenerator::getClock();
const ObStmtRollbackInfo stmt_rollback_info;
const ObString trace_id = "trance_id=xxx";
ObTransMsg msg;
ASSERT_EQ(OB_SUCCESS,
msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
sql_no,
status,
state,
trans_version,
request_id,
MonotonicTs(0)));
ASSERT_TRUE(msg.is_valid());
// test the serialization of ObTransMsg
int64_t pos = 0;
const int64_t BUFFER_SIZE = 10240;
char buffer[BUFFER_SIZE];
ASSERT_EQ(OB_SUCCESS, msg.serialize(buffer, BUFFER_SIZE, pos));
// test the deserialization of ObTransMsg
ObTransMsg msg1;
int64_t start_index = 0;
ASSERT_EQ(OB_SUCCESS, msg1.deserialize(buffer, pos, start_index));
EXPECT_EQ(trans_id, msg1.get_trans_id());
EXPECT_EQ(msg_type, msg1.get_msg_type());
EXPECT_EQ(trans_time, msg1.get_trans_time());
EXPECT_EQ(sender, msg1.get_sender());
EXPECT_EQ(receiver, msg1.get_receiver());
EXPECT_EQ(scheduler, msg1.get_scheduler());
EXPECT_EQ(coordinator, msg1.get_coordinator());
EXPECT_EQ(sql_no, msg1.get_sql_no());
EXPECT_EQ(status, msg1.get_status());
EXPECT_EQ(observer_, msg1.get_sender_addr());
EXPECT_EQ(state, msg1.get_state());
EXPECT_EQ(trans_version, msg1.get_trans_version());
EXPECT_EQ(request_id, msg1.get_request_id());
// results of deserializing ObStartTransParam
ObStartTransParam start_trans_param = msg1.get_trans_param();
EXPECT_EQ(parms.get_access_mode(), start_trans_param.get_access_mode());
EXPECT_EQ(parms.get_type(), start_trans_param.get_type());
EXPECT_EQ(parms.get_isolation(), start_trans_param.get_isolation());
// results of deserializing participants
EXPECT_EQ(participants.count(), msg1.get_participants().count());
// operations of serializing and deserializing ObPartitionToLogId
ObPartitionKey partition(VALID_TABLE_ID, VALID_PARTITION_ID, VALID_PARTITION_COUNT);
int64_t log_id = 1;
ObPartitionLogInfo partition_log_info(partition, log_id, ObClockGenerator::getClock());
pos = 0;
start_index = 0;
ASSERT_EQ(OB_SUCCESS, partition_log_info.serialize(buffer, BUFFER_SIZE, pos));
// test the reulst of deserialization
ObPartitionLogInfo partition_log_info1;
ASSERT_EQ(OB_SUCCESS, partition_log_info1.deserialize(buffer, pos, start_index));
EXPECT_EQ(partition, partition_log_info1.get_partition());
EXPECT_EQ(log_id, partition_log_info1.get_log_id());
// invalid input parameters of init
ObStartTransParam parms1;
ObTransMsg msg2;
EXPECT_EQ(OB_INVALID_ARGUMENT,
msg2.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms1,
observer_,
sql_no,
status,
state,
trans_version,
ObClockGenerator::getClock(),
MonotonicTs(0)));
PartitionLogInfoArray log_arr;
EXPECT_EQ(OB_SUCCESS, log_arr.push_back(partition_log_info));
EXPECT_EQ(OB_NOT_INIT, msg2.set_partition_log_info_arr(log_arr));
EXPECT_EQ(OB_NOT_INIT, msg2.set_prepare_log_id(log_id));
EXPECT_FALSE(msg2.is_valid());
// repeated init
EXPECT_EQ(OB_SUCCESS,
msg2.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
sql_no,
status,
state,
trans_version,
ObClockGenerator::getClock(),
MonotonicTs(0)));
EXPECT_TRUE(msg2.is_valid());
EXPECT_EQ(OB_INIT_TWICE,
msg2.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
sql_no,
status,
state,
trans_version,
ObClockGenerator::getClock(),
MonotonicTs(0)));
}
// OB_TRANS_COMMIT_REQUEST/RESPONSE
// OB_TRANS_ABORT_REQUEST/RESPONSE
TEST_F(TestObTransMsg, trans_commit_abort_request_response_msg)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
const int64_t PARTICIPANT_NUM = 10;
ObTransID trans_id(observer_);
ObPartitionArray participants;
ObTransLocationCache cache;
for (int64_t i = 0; i < PARTICIPANT_NUM; ++i) {
ObPartitionKey partition_key(VALID_TABLE_ID + i, VALID_PARTITION_ID, VALID_PARTITION_COUNT);
ASSERT_EQ(OB_SUCCESS, participants.push_back(partition_key));
ObPartitionLeaderInfo info(partition_key, observer_);
ASSERT_EQ(OB_SUCCESS, cache.push_back(info));
}
const ObPartitionKey& sender = participants.at(0);
const ObPartitionKey& receiver = participants.at(1);
const ObAddr& scheduler = observer_;
const ObPartitionKey coordinator = participants.at(0);
// create an object of ObStartTransParm
ObStartTransParam parms;
parms.set_access_mode(ObTransAccessMode::READ_ONLY);
parms.set_type(ObTransType::TRANS_USER);
parms.set_isolation(ObTransIsolation::READ_COMMITED);
int64_t trans_time = 9870;
const int32_t status = OB_SUCCESS;
// OB_TRANS_COMMIT_REQUEST
int64_t msg_type = OB_TRANS_STMT_ROLLBACK_REQUEST;
const int64_t commit_times = 1;
ObTransMsg trans_commit_request_msg;
const int64_t need_wait_interval_us = 0;
const ObStmtRollbackInfo stmt_rollback_info;
const ObString trace_id = "trance_id=xxx";
const ObXATransID xid;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_commit_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
msg_type = OB_TRANS_COMMIT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
trans_commit_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
EXPECT_EQ(OB_INIT_TWICE,
trans_commit_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
EXPECT_EQ(true, trans_commit_request_msg.is_valid());
// OB_TRANS_COMMIT_RESPONSE
ObTransMsg trans_commit_response_msg;
msg_type = OB_TRANS_COMMIT_RESPONSE;
trans_time = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_commit_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
trans_time = 1;
EXPECT_EQ(OB_SUCCESS,
trans_commit_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
EXPECT_EQ(OB_INIT_TWICE,
trans_commit_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
EXPECT_EQ(true, trans_commit_response_msg.is_valid());
// OB_TRANS_ABORT_REQUEST
msg_type = OB_TRANS_STMT_ROLLBACK_REQUEST;
ObTransMsg trans_abort_request_msg;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_abort_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
msg_type = OB_TRANS_ABORT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
trans_abort_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
EXPECT_EQ(OB_INIT_TWICE,
trans_abort_request_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
cache,
commit_times,
MonotonicTs(0),
false,
stmt_rollback_info,
trace_id,
xid));
EXPECT_EQ(true, trans_abort_request_msg.is_valid());
// OB_TRANS_ABORT_RESPONSE
ObTransMsg trans_abort_response_msg;
msg_type = OB_TRANS_ABORT_RESPONSE;
trans_time = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_abort_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
trans_time = 1;
EXPECT_EQ(OB_SUCCESS,
trans_abort_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
EXPECT_EQ(OB_INIT_TWICE,
trans_abort_response_msg.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
parms,
observer_,
status,
commit_times,
need_wait_interval_us));
EXPECT_EQ(true, trans_abort_response_msg.is_valid());
}
// test OB_TRANS_ERROR_MSG
TEST_F(TestObTransMsg, trans_error_msg)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
int64_t msg_type = OB_TRANS_MSG_UNKNOWN;
int64_t error_msg_type = OB_TRANS_MSG_UNKNOWN;
ObTransID trans_id(observer_);
ObPartitionKey partition_key(VALID_TABLE_ID, VALID_PARTITION_ID, VALID_PARTITION_COUNT);
const ObPartitionKey& sender = partition_key;
const int32_t status = OB_SUCCESS;
const ObStmtRollbackInfo stmt_rollback_info;
ObTransMsg error_msg;
EXPECT_EQ(OB_INVALID_ARGUMENT,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
msg_type = OB_TRANS_ERROR_MSG;
// error_msg_type = OB_TRANS_MSG_TYPE_UNKNOWN
EXPECT_EQ(OB_INVALID_ARGUMENT,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
ASSERT_FALSE(error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_STMT_CREATE_CTX_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_STMT_CREATE_CTX_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_STMT_ROLLBACK_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_STMT_ROLLBACK_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_COMMIT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_COMMIT_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_ABORT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_ABORT_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_PREPARE_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_PREPARE_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_COMMIT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_COMMIT_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_ABORT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_ABORT_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_CLEAR_REQUEST;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
error_msg_type = OB_TRANS_2PC_CLEAR_RESPONSE;
EXPECT_EQ(OB_SUCCESS,
error_msg.init(msg_type, error_msg_type, TestObTransMsg::TENANT_ID, trans_id, sender, observer_, status, 1, 1));
EXPECT_EQ(true, error_msg.is_valid());
error_msg.reset();
}
// OB_TRANS_2PC_PREPARE_REQUEST/RESPONSE
// OB_TRANS_2PC_COMMIT_REQUEST/RESPONSE
TEST_F(TestObTransMsg, trans_2pc_prepare_commit_request_response_msg)
{
TRANS_LOG(INFO, "called", "func", test_info_->name());
const int64_t PARTICIPANT_NUM = 10;
ObTransID trans_id(observer_);
ObPartitionArray participants;
for (int64_t i = 0; i < PARTICIPANT_NUM; ++i) {
ObPartitionKey partition_key(VALID_TABLE_ID + i, VALID_PARTITION_ID, VALID_PARTITION_COUNT);
ASSERT_EQ(OB_SUCCESS, participants.push_back(partition_key));
}
const ObPartitionKey& sender = participants.at(0);
const ObPartitionKey& receiver = participants.at(1);
const ObAddr& scheduler = observer_;
const ObPartitionKey coordinator = participants.at(0);
// create an object of ObStartTransParm
ObStartTransParam parms;
parms.set_access_mode(ObTransAccessMode::READ_ONLY);
parms.set_type(ObTransType::TRANS_USER);
parms.set_isolation(ObTransIsolation::READ_COMMITED);
int64_t trans_time = 9870;
const int32_t status = OB_SUCCESS;
const int64_t request_id = ObClockGenerator::getClock();
const int64_t prepare_log_id = 100;
const int64_t trans_version = ObClockGenerator::getClock();
int64_t state = Ob2PCState::PREPARE;
const PartitionLogInfoArray arr;
const ObStmtRollbackInfo stmt_rollback_info;
const ObString trace_id = "trance_id=xxx";
const ObXATransID xid;
const bool is_xa_prepare = false;
// OB_TRANS_2PC_PREPARE_REQUEST
int64_t msg_type = OB_TRANS_STMT_ROLLBACK_REQUEST;
ObTransMsg trans_2pc_prepare_request;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_2pc_prepare_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
status,
request_id,
MonotonicTs(0),
arr,
stmt_rollback_info,
trace_id,
xid,
is_xa_prepare));
msg_type = OB_TRANS_2PC_PREPARE_REQUEST;
EXPECT_EQ(OB_SUCCESS,
trans_2pc_prepare_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
status,
request_id,
MonotonicTs(0),
arr,
stmt_rollback_info,
trace_id,
xid,
is_xa_prepare));
EXPECT_EQ(OB_INIT_TWICE,
trans_2pc_prepare_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
observer_,
status,
request_id,
MonotonicTs(0),
arr,
stmt_rollback_info,
trace_id,
xid,
is_xa_prepare));
EXPECT_EQ(true, trans_2pc_prepare_request.is_valid());
// OB_TRANS_2PC_PREPARE_RESPONSE
ObTransMsg trans_2pc_prepare_response;
msg_type = OB_TRANS_2PC_PREPARE_RESPONSE;
PartitionLogInfoArray tmp_array;
trans_time = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_2pc_prepare_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
prepare_log_id,
ObClockGenerator::getClock(),
observer_,
status,
state,
trans_version,
request_id,
tmp_array,
0,
trace_id,
xid,
is_xa_prepare));
trans_time = 1;
EXPECT_EQ(OB_SUCCESS,
trans_2pc_prepare_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
prepare_log_id,
ObClockGenerator::getClock(),
observer_,
status,
state,
trans_version,
request_id,
tmp_array,
0,
trace_id,
xid,
is_xa_prepare));
EXPECT_EQ(OB_INIT_TWICE,
trans_2pc_prepare_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
prepare_log_id,
ObClockGenerator::getClock(),
observer_,
status,
state,
trans_version,
request_id,
tmp_array,
0,
trace_id,
xid,
is_xa_prepare));
// OB_TRANS_2PC_COMMIT_REQUEST
ObPartitionLogInfo partition_log_info(receiver, prepare_log_id, ObClockGenerator::getClock());
PartitionLogInfoArray partition_log_info_arr;
EXPECT_EQ(OB_SUCCESS, partition_log_info_arr.push_back(partition_log_info));
msg_type = OB_TRANS_ABORT_REQUEST;
ObTransMsg trans_2pc_commit_request;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_2pc_commit_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
msg_type = OB_TRANS_2PC_COMMIT_REQUEST;
EXPECT_EQ(OB_SUCCESS,
trans_2pc_commit_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
EXPECT_EQ(OB_INIT_TWICE,
trans_2pc_commit_request.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
EXPECT_EQ(true, trans_2pc_commit_request.is_valid());
// OB_TRANS_2PC_COMMIT_RESPONSE
ObTransMsg trans_2pc_commit_response;
msg_type = OB_TRANS_2PC_COMMIT_RESPONSE;
trans_time = -1;
EXPECT_EQ(OB_INVALID_ARGUMENT,
trans_2pc_commit_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
trans_time = 1;
EXPECT_EQ(OB_SUCCESS,
trans_2pc_commit_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
EXPECT_EQ(OB_INIT_TWICE,
trans_2pc_commit_response.init(TestObTransMsg::TENANT_ID,
trans_id,
msg_type,
trans_time,
sender,
receiver,
scheduler,
coordinator,
participants,
parms,
trans_version,
observer_,
request_id,
partition_log_info_arr,
OB_SUCCESS));
EXPECT_EQ(true, trans_2pc_commit_response.is_valid());
}
} // namespace unittest
} // namespace oceanbase
using namespace oceanbase;
using namespace oceanbase::common;
int main(int argc, char** argv)
{
int ret = 1;
ObLogger& logger = ObLogger::get_logger();
logger.set_file_name("test_ob_trans_msg.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc, argv);
ret = RUN_ALL_TESTS();
return ret;
}