Files
oceanbase/unittest/obcdc/test_ob_log_part_trans_resolver.cpp
2022-03-25 18:10:38 +08:00

967 lines
30 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG_FETCHER
#include "gtest/gtest.h"
#include "share/ob_define.h"
#include "storage/ob_storage_log_type.h"
#include "storage/transaction/ob_trans_log.h"
#include "ob_log_fetch_stat_info.h"
#define private public
#include "obcdc/src/ob_log_part_trans_resolver.h"
#include "test_trans_log_generator.h"
#include "test_sp_trans_log_generator.h"
using namespace oceanbase;
using namespace common;
using namespace liboblog;
using namespace transaction;
using namespace storage;
using namespace clog;
namespace oceanbase
{
namespace unittest
{
// Task Pool
static const int64_t PREALLOC_POOL_SIZE = 10 * 1024;
static const int64_t TRANS_TASK_PAGE_SIZE = 1024;
static const int64_t TRANS_TASK_BLOCK_SIZE = 4 * 1024 *1024;
static const int64_t PREALLOC_PAGE_COUNT = 1024;
// For task pool init
ObConcurrentFIFOAllocator fifo_allocator;
// test trans count
static const int64_t TRANS_COUNT = 100;
// redo log count
static const int64_t TRANS_REDO_LOG_COUNT = 100;
int init_task_pool(ObLogTransTaskPool<PartTransTask> &task_pool)
{
int ret = OB_SUCCESS;
ret = fifo_allocator.init(16 * _G_, 16 * _M_, OB_MALLOC_NORMAL_BLOCK_SIZE);
EXPECT_EQ(OB_SUCCESS, ret);
ret = task_pool.init(&fifo_allocator, PREALLOC_POOL_SIZE, TRANS_TASK_PAGE_SIZE,
TRANS_TASK_BLOCK_SIZE, true, PREALLOC_PAGE_COUNT);
EXPECT_EQ(OB_SUCCESS, ret);
return ret;
}
/*
* Test scenario.
* For N transactions, half of which commit, half of which abort
* Each transaction has a random redo log
*
* Log sequence: redo, redo, ... redo, prepare, commit/abort
*
* // redo info
* redo_log_cnt
* ObLogIdArray redo_log_ids;
*
* // prepare info
* int64_t seq;
* common::ObPartitionKey partition;
* int64_t prepare_timestamp;
* ObTransID trans_id;
* uint64_t prepare_log_id;
* uint64_t cluster_id;
*
* // commit info
* int64_t global_trans_version;
* PartitionLogInfoArray *participants;
*
*/
TEST(PartTransResolver, BasicTest1)
{
int err = OB_SUCCESS;
// Commit half trans, whose has even idx.
const int64_t trans_cnt = TRANS_COUNT;
const int64_t commit_trans_cnt = trans_cnt / 2;
const int64_t abort_trans_cnt = trans_cnt - commit_trans_cnt;
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
TransLogInfo trans_log_info;
// redo info
int64_t redo_cnt = 0;
ObLogIdArray redo_log_ids;
// prepare info
int64_t seq = 0;
ObPartitionKey pkey(1000U, 1, 1);
int64_t prepare_timestamp = PREPARE_TIMESTAMP;
ObTransID trans_id(addr);
uint64_t prepare_log_id = 0;
uint64_t CLOUSTER_ID = 1000;
// commit info
int64_t global_trans_version = GLOBAL_TRANS_VERSION;
PartitionLogInfoArray ptl_ids;
// Log gen.
TransLogEntryGeneratorBase log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser1 parser;
EXPECT_EQ(OB_SUCCESS, parser.init());
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
volatile bool stop_flag = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 1;
redo_log_ids.reset();
for (int64_t cnt = 0; cnt < redo_cnt; ++cnt) {
EXPECT_EQ(OB_SUCCESS, redo_log_ids.push_back(log_gen.get_log_id() + cnt));
}
prepare_log_id = log_gen.get_log_id() + redo_cnt;
ptl_ids.reset();
ObPartitionLogInfo ptl_id(pkey, prepare_log_id, PREPARE_TIMESTAMP);
err = ptl_ids.push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
// push fixed participant information
for (int64_t idx = 0; idx < FIXED_PART_COUNT; ++idx) {
err = ptl_ids.push_back(FIXED_PART_INFO[idx]);
EXPECT_EQ(OB_SUCCESS, err);
}
trans_log_info.reset(redo_cnt, redo_log_ids, seq, pkey, prepare_timestamp,
trans_id, prepare_log_id, CLOUSTER_ID, global_trans_version, ptl_ids);
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info));
seq++;
// Commit trans with even idx.
log_gen.next_trans(redo_cnt, (0 == idx % 2));
clog::ObLogEntry log_entry;
while (OB_SUCCESS == log_gen.next_log_entry(log_entry)) {
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
// Verify the correctness of partition task data
bool check_result;
EXPECT_EQ(OB_SUCCESS, parser.get_check_result(check_result));
EXPECT_TRUE(check_result);
LOG_DEBUG("debug", K(idx));
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
EXPECT_EQ(abort_trans_cnt, parser.get_abort_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* Test scenario.
* For N transactions, half of which commit, half of which abort
* Each transaction has a random redo log
* Log sequence: redo, redo... redo-prepare, commit/abort
* redo-prepare in a log entry
*
*/
TEST(PartTransResolver, BasicTest2)
{
int err = OB_SUCCESS;
// Commit half trans, whose has even idx.
const int64_t trans_cnt = TRANS_COUNT;
const int64_t commit_trans_cnt = trans_cnt / 2;
const int64_t abort_trans_cnt = trans_cnt - commit_trans_cnt;
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
TransLogInfo trans_log_info;
// redo info
int64_t redo_cnt = 0;
ObLogIdArray redo_log_ids;
// prepare info
int64_t seq = 0;
ObPartitionKey pkey(1000U, 1, 1);
int64_t prepare_timestamp = PREPARE_TIMESTAMP;
ObTransID trans_id(addr);
uint64_t prepare_log_id = 0;
uint64_t CLOUSTER_ID = 1000;
// commit info
int64_t global_trans_version = GLOBAL_TRANS_VERSION;
PartitionLogInfoArray ptl_ids;
// Log gen.
TransLogEntryGeneratorBase log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser1 parser;
EXPECT_EQ(OB_SUCCESS, parser.init());
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
bool stop_flag = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 2;
redo_log_ids.reset();
for (int64_t cnt = 0; cnt < redo_cnt; ++cnt) {
EXPECT_EQ(OB_SUCCESS, redo_log_ids.push_back(log_gen.get_log_id() + cnt));
}
prepare_log_id = log_gen.get_log_id() + redo_cnt - 1;
ptl_ids.reset();
ObPartitionLogInfo ptl_id(pkey, prepare_log_id, PREPARE_TIMESTAMP);
err = ptl_ids.push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
// push fixed participant information
for (int64_t idx = 0; idx < FIXED_PART_COUNT; ++idx) {
err = ptl_ids.push_back(FIXED_PART_INFO[idx]);
EXPECT_EQ(OB_SUCCESS, err);
}
trans_log_info.reset(redo_cnt, redo_log_ids, seq, pkey, prepare_timestamp,
trans_id, prepare_log_id, CLOUSTER_ID, global_trans_version, ptl_ids);
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info));
seq++;
// Commit trans with even idx.
log_gen.next_trans_with_redo_prepare(redo_cnt, (0 == idx % 2));
clog::ObLogEntry log_entry;
// read redo, redo... redo-prepare
for (int64_t log_cnt = 0; log_cnt < redo_cnt; log_cnt++) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_with_redo_prepare(log_entry));
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
// read commit/abort log
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_with_redo_prepare(log_entry));
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
// Verify the correctness of partition task data
bool check_result;
EXPECT_EQ(OB_SUCCESS, parser.get_check_result(check_result));
EXPECT_TRUE(check_result);
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
EXPECT_EQ(abort_trans_cnt, parser.get_abort_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* Test scenario.
* Parse to prepare log, find redo log missing, need to read miss log
* For N transactions, half of them commit, half of them abort
* Each transaction has a random redo log
* Two cases.
* 1. redo, redo, redo...prepare, commit/abort
* 2. redo, redo, redo...redo-prepare, commit/abort
*
*/
TEST(PartTransResolver, BasicTest3)
{
int err = OB_SUCCESS;
// Commit half trans, whose has even idx.
const int64_t trans_cnt = TRANS_COUNT;
const int64_t commit_trans_cnt = trans_cnt / 2;
int64_t redo_cnt = 0;
int64_t miss_redo_cnt = 0;
int64_t can_read_redo_cnt = 0;
// Pkey.
ObPartitionKey pkey(1000U, 1, 1);
// addr
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
ObTransID trans_id(addr);
// Log gen.
TransLogEntryGenerator1 log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser2 parser;
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
bool stop_flag = false;
// case 1: redo, redo, redo...prepare, commit/abort
// case 2: redo, redo, redo...redo-prepare, commit/abort
bool is_normal_trans = false;
bool is_redo_with_prapare_trans = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
if (idx < trans_cnt / 2) {
is_normal_trans = true;
} else {
is_redo_with_prapare_trans = true;
}
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 1;
if (is_normal_trans) {
miss_redo_cnt = get_timestamp() % redo_cnt + 1;
can_read_redo_cnt = redo_cnt - miss_redo_cnt;
} else if (is_redo_with_prapare_trans){
miss_redo_cnt = get_timestamp() % redo_cnt;
can_read_redo_cnt = redo_cnt - miss_redo_cnt - 1;
} else {
}
// Commit trans with even idx.
if (is_normal_trans) {
log_gen.next_trans_with_miss_redo(redo_cnt, miss_redo_cnt, (0 == idx % 2), NORMAL_TRAN);
} else if (is_redo_with_prapare_trans){
log_gen.next_trans_with_miss_redo(redo_cnt, miss_redo_cnt, (0 == idx % 2), REDO_WITH_PREPARE_TRAN);
} else {
}
uint64_t start_redo_log_id = log_gen.get_log_id();
clog::ObLogEntry log_entry;
// First read the can_read_redo_cnt redo log
for (int64_t log_cnt = 0; log_cnt < can_read_redo_cnt; ++log_cnt) {
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(NORMAL_TRAN, log_entry));
} else if (is_redo_with_prapare_trans){
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(REDO_WITH_PREPARE_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
// Read prepare log and find miss redo log
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(NORMAL_TRAN, log_entry));
} else if (is_redo_with_prapare_trans){
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(REDO_WITH_PREPARE_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_ITEM_NOT_SETTED, err);
// Verify the misses array and read the misses redo log
const int64_t miss_array_cnt = missing.count();
EXPECT_EQ(miss_redo_cnt, miss_array_cnt);
for (int64_t log_cnt = 0; log_cnt < miss_array_cnt; ++log_cnt) {
LOG_DEBUG("miss", K(missing[log_cnt]));
EXPECT_EQ(start_redo_log_id, missing[log_cnt]);
start_redo_log_id++;
clog::ObLogEntry miss_log_entry;
EXPECT_EQ(OB_SUCCESS, log_gen.next_miss_log_entry(missing[log_cnt], miss_log_entry));
err = pr.read_missing_redo(miss_log_entry);
EXPECT_EQ(OB_SUCCESS, err);
}
// After reading the missing redo log, read the prepare log again to advance the partitioning task
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.get_prepare_log_entry(NORMAL_TRAN, log_entry));
} else if (is_redo_with_prapare_trans){
EXPECT_EQ(OB_SUCCESS, log_gen.get_prepare_log_entry(REDO_WITH_PREPARE_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// read commit/abort log
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(NORMAL_TRAN, log_entry));
} else if (is_redo_with_prapare_trans){
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(REDO_WITH_PREPARE_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* r stands for redo, p stands for prepare, c stands for commit, a stands for abort)
* The numbers after r/p/c/a represent the different transactions
* Log sequence:
* r1 r2 r2 r2 p2 p1 c1 c2 r3 p3 c3
* Verifying the correctness of parsing multiple transactions, i.e. constructing different partitioned transaction tasks based on different transaction IDs
* Verify the output order of transactions: transaction 2 -> transaction 1 -> transaction 3
*/
TEST(PartTransResolver, BasicTest4)
{
int err = OB_SUCCESS;
ObPartitionKey pkey(1000U, 1, 1);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser1 parser;
EXPECT_EQ(OB_SUCCESS, parser.init());
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
const int64_t commit_trans_cnt = 3;
// redo info
int64_t redo_cnt_array[3] = {1, 3, 1};
ObLogIdArray redo_log_ids_array[3];
for (int64_t i = 0; i < 3; ++i) {
for (int64_t j = 0; j < redo_cnt_array[i]; ++j) {
EXPECT_EQ(OB_SUCCESS, redo_log_ids_array[i].push_back(j));
}
}
// prepare info
// trans 2 - trans 1 - trans 3->seq: 0, 1, 2
int64_t seq_array[3] = {1, 0, 2};
int64_t prepare_timestamp = PREPARE_TIMESTAMP;
ObAddr addr_array[3];
for (int64_t idx = 0; idx < 3; idx++) {
addr_array[idx] = ObAddr(ObAddr::IPV4, "127.0.0.1", static_cast<int32_t>(8888 + idx));
}
// trans ID
ObTransID trans_id_array[3] = {
ObTransID(addr_array[0]), ObTransID(addr_array[1]), ObTransID(addr_array[2])
};
uint64_t prepare_log_id_array[3] = {1, 3, 1};
uint64_t CLOUSTER_ID = 1000;
// commit info
int64_t global_trans_version = GLOBAL_TRANS_VERSION;
PartitionLogInfoArray ptl_ids_array[3];
for (int64_t i = 0; i < 3; ++i) {
ptl_ids_array[i].reset();
ObPartitionLogInfo ptl_id(pkey, prepare_log_id_array[i], PREPARE_TIMESTAMP);
err = ptl_ids_array[i].push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
// push fixed participant information
for (int64_t j = 0; j < FIXED_PART_COUNT; ++j) {
err = ptl_ids_array[i].push_back(FIXED_PART_INFO[j]);
EXPECT_EQ(OB_SUCCESS, err);
}
}
TransLogInfo trans_log_info_array[3];
for (int64_t i = 0; i < 3; ++i) {
trans_log_info_array[i].reset(redo_cnt_array[i], redo_log_ids_array[i], seq_array[i], pkey, prepare_timestamp,
trans_id_array[i], prepare_log_id_array[i],
CLOUSTER_ID, global_trans_version, ptl_ids_array[i]);
}
// Push in the order of transaction 2 - transaction 1 - transaction 3 for subsequent validation of the transaction output order
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info_array[1]));
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info_array[0]));
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info_array[2]));
// Log gen. Generate logs for transactions 1, 2 and 3 respectively
TransLogEntryGeneratorBase log_gen_1(pkey, trans_id_array[0]);
TransLogEntryGeneratorBase log_gen_2(pkey, trans_id_array[1]);
TransLogEntryGeneratorBase log_gen_3(pkey, trans_id_array[2]);
log_gen_1.next_trans(redo_cnt_array[0], true);
log_gen_2.next_trans(redo_cnt_array[1], true);
log_gen_3.next_trans(redo_cnt_array[2], true);
// Read logs.
ObLogIdArray missing1;
ObLogIdArray missing2;
ObLogIdArray missing3;
TransStatInfo tsi;
volatile bool stop_flag = false;
// log seq:
// r1 r2 r2 r2 p2 p1 c1 c2 r3 p3 c3
clog::ObLogEntry log_entry;
EXPECT_EQ(0, pr.task_seq_);
EXPECT_EQ(0, pr.prepare_seq_);
// r1
EXPECT_EQ(OB_SUCCESS, log_gen_1.next_log_entry(log_entry));
err = pr.read(log_entry, missing1, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// r2
EXPECT_EQ(OB_SUCCESS, log_gen_2.next_log_entry(log_entry));
err = pr.read(log_entry, missing2, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// r2
EXPECT_EQ(OB_SUCCESS, log_gen_2.next_log_entry(log_entry));
err = pr.read(log_entry, missing2, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// r2
EXPECT_EQ(OB_SUCCESS, log_gen_2.next_log_entry(log_entry));
err = pr.read(log_entry, missing2, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// p2
EXPECT_EQ(OB_SUCCESS, log_gen_2.next_log_entry(log_entry));
err = pr.read(log_entry, missing2, tsi);
EXPECT_EQ(OB_SUCCESS, err);
EXPECT_EQ(1, pr.prepare_seq_);
// p1
EXPECT_EQ(OB_SUCCESS, log_gen_1.next_log_entry(log_entry));
err = pr.read(log_entry, missing1, tsi);
EXPECT_EQ(OB_SUCCESS, err);
EXPECT_EQ(2, pr.prepare_seq_);
// c1
EXPECT_EQ(OB_SUCCESS, log_gen_1.next_log_entry(log_entry));
err = pr.read(log_entry, missing1, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// c2
EXPECT_EQ(OB_SUCCESS, log_gen_2.next_log_entry(log_entry));
err = pr.read(log_entry, missing2, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// r3
EXPECT_EQ(OB_SUCCESS, log_gen_3.next_log_entry(log_entry));
err = pr.read(log_entry, missing3, tsi);
EXPECT_EQ(OB_SUCCESS, err);
// p3
EXPECT_EQ(OB_SUCCESS, log_gen_3.next_log_entry(log_entry));
err = pr.read(log_entry, missing3, tsi);
EXPECT_EQ(OB_SUCCESS, err);
EXPECT_EQ(3, pr.prepare_seq_);
// c3
EXPECT_EQ(OB_SUCCESS, log_gen_3.next_log_entry(log_entry));
err = pr.read(log_entry, missing3, tsi);
EXPECT_EQ(OB_SUCCESS, err);
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
EXPECT_EQ(3, pr.task_seq_);
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
// Verify the correctness of partition task data
for (int64_t idx = 0; idx < 3; ++idx) {
bool check_result;
EXPECT_EQ(OB_SUCCESS, parser.get_check_result(check_result));
EXPECT_TRUE(check_result);
}
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* Test scenario:
* For N Sp transactions, half of them commit, half of them abort
* Each Sp transaction has a random redo log
*
* log seq: redo, redo, ... redo, commit/abort
*
* // redo info
* redo_log_cnt
* ObLogIdArray redo_log_ids;
*
* // prepare info
* int64_t seq;
* common::ObPartitionKey partition;
* int64_t prepare_timestamp;
* ObTransID trans_id;
* uint64_t prepare_log_id;
* uint64_t cluster_id;
*
* // commit info
* int64_t global_trans_version;
* PartitionLogInfoArray *participants;
*
*/
TEST(PartTransResolver, BasicTest5)
{
int err = OB_SUCCESS;
// Commit half trans, whose has even idx.
const int64_t trans_cnt = TRANS_COUNT;
const int64_t commit_trans_cnt = trans_cnt / 2;
const int64_t abort_trans_cnt = trans_cnt - commit_trans_cnt;
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
TransLogInfo trans_log_info;
// redo info
int64_t redo_cnt = 0;
ObLogIdArray redo_log_ids;
// prepare info
int64_t seq = 0;
ObPartitionKey pkey(1000U, 1, 1);
int64_t prepare_timestamp = SP_PREPARE_TIMESTAMP;
ObTransID trans_id(addr);
uint64_t prepare_log_id = 0;
uint64_t CLOUSTER_ID = 1000;
// commit info
int64_t global_trans_version = SP_GLOBAL_TRANS_VERSION;
PartitionLogInfoArray ptl_ids;
// Log gen.
SpTransLogEntryGeneratorBase log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser1 parser;
EXPECT_EQ(OB_SUCCESS, parser.init());
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
volatile bool stop_flag = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 1;
redo_log_ids.reset();
for (int64_t cnt = 0; cnt < redo_cnt; ++cnt) {
EXPECT_EQ(OB_SUCCESS, redo_log_ids.push_back(log_gen.get_log_id() + cnt));
}
prepare_log_id = log_gen.get_log_id() + redo_cnt;
ptl_ids.reset();
ObPartitionLogInfo ptl_id(pkey, prepare_log_id, PREPARE_TIMESTAMP);
err = ptl_ids.push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
trans_log_info.reset(redo_cnt, redo_log_ids, seq, pkey, prepare_timestamp,
trans_id, prepare_log_id, CLOUSTER_ID, global_trans_version, ptl_ids);
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info));
seq++;
// Commit trans with even idx.
log_gen.next_trans(redo_cnt, (0 == idx % 2));
clog::ObLogEntry log_entry;
while (OB_SUCCESS == log_gen.next_log_entry(log_entry)) {
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
// Verify the correctness of partition task data
bool check_result;
EXPECT_EQ(OB_SUCCESS, parser.get_check_result(check_result));
EXPECT_TRUE(check_result);
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
EXPECT_EQ(abort_trans_cnt, parser.get_abort_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* Test scenario:
* For N Sp transactions, redo and commit in the same log entry
* Each Sp transaction has a random redo log
*
* log seq: redo, redo, ... redo, redo-commit
*
*/
TEST(PartTransResolver, BasicTest6)
{
int err = OB_SUCCESS;
const int64_t trans_cnt = TRANS_COUNT;
const int64_t commit_trans_cnt = trans_cnt;
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
TransLogInfo trans_log_info;
// redo info
int64_t redo_cnt = 0;
ObLogIdArray redo_log_ids;
// prepare info
int64_t seq = 0;
ObPartitionKey pkey(1000U, 1, 1);
int64_t prepare_timestamp = SP_PREPARE_TIMESTAMP;
ObTransID trans_id(addr);
uint64_t prepare_log_id = 0;
uint64_t CLOUSTER_ID = 1000;
// commit info
int64_t global_trans_version = SP_GLOBAL_TRANS_VERSION;
PartitionLogInfoArray ptl_ids;
// Log gen.
SpTransLogEntryGeneratorBase log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser1 parser;
//MockParser2 parser;
EXPECT_EQ(OB_SUCCESS, parser.init());
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
volatile bool stop_flag = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 1;
// First test, if redo_cnt=1, only one redo-commit, prepare_log_id=0, illegal
if (0 == idx && 1 == redo_cnt) {
redo_cnt++;
}
redo_log_ids.reset();
for (int64_t cnt = 0; cnt < redo_cnt; ++cnt) {
EXPECT_EQ(OB_SUCCESS, redo_log_ids.push_back(log_gen.get_log_id() + cnt));
}
// sp transaction does not have prepare log, prepare log id is the same as commit log id
prepare_log_id = log_gen.get_log_id() + redo_cnt - 1;
ptl_ids.reset();
ObPartitionLogInfo ptl_id(pkey, prepare_log_id, PREPARE_TIMESTAMP);
err = ptl_ids.push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
trans_log_info.reset(redo_cnt, redo_log_ids, seq, pkey, prepare_timestamp,
trans_id, prepare_log_id, CLOUSTER_ID, global_trans_version, ptl_ids);
EXPECT_EQ(OB_SUCCESS, parser.push_into_queue(&trans_log_info));
seq++;
log_gen.next_trans_with_redo_commit(redo_cnt);
clog::ObLogEntry log_entry;
while (OB_SUCCESS == log_gen.next_log_entry_with_redo_commit(log_entry)) {
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
// Verify the correctness of partition task data
bool check_result;
EXPECT_EQ(OB_SUCCESS, parser.get_check_result(check_result));
EXPECT_TRUE(check_result);
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
/*
* Test scenario:
* For N Sp transactions, redo and commit in the same log entry
* Each Sp transaction has a random redo log
*
* Log sequence: redo, redo, ... redo, redo-commit
* Read to redo-commit and find redo log missing, need to read miss log
*
*/
TEST(PartTransResolver, BasicTest7)
{
int err = OB_SUCCESS;
const int64_t trans_cnt = TRANS_COUNT;
//const int64_t trans_cnt = 2;
const int64_t commit_trans_cnt = trans_cnt;
int64_t redo_cnt = 0;
int64_t miss_redo_cnt = 0;
int64_t can_read_redo_cnt = 0;
// Pkey.
ObPartitionKey pkey(1000U, 1, 1);
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 8888);
ObTransID trans_id(addr);
// Log gen.
SpTransLogEntryGenerator1 log_gen(pkey, trans_id);
// Task Pool.
ObLogTransTaskPool<PartTransTask> task_pool;
EXPECT_EQ(OB_SUCCESS, init_task_pool(task_pool));
// Parser.
MockParser2 parser;
// Partitioned Transaction Parser
PartTransResolver pr;
err = pr.init(pkey, parser, task_pool);
EXPECT_EQ(OB_SUCCESS, err);
// Read logs.
ObLogIdArray missing;
TransStatInfo tsi;
volatile bool stop_flag = false;
// case 1: redo, redo, redo, ... redo, commit
// case 2: redo, redo, redo, ... redo, redo-commit
bool is_normal_trans = false;
bool is_redo_with_commit_trans = false;
for (int64_t idx = 0; idx < trans_cnt; ++idx) {
if (idx < trans_cnt / 2) {
is_normal_trans = true;
} else {
is_redo_with_commit_trans = true;
}
redo_cnt = get_timestamp() % TRANS_REDO_LOG_COUNT + 1;
//redo_cnt = 2;
if (is_normal_trans) {
miss_redo_cnt = get_timestamp() % redo_cnt + 1;
can_read_redo_cnt = redo_cnt - miss_redo_cnt;
} else if (is_redo_with_commit_trans){
miss_redo_cnt = get_timestamp() % redo_cnt;
can_read_redo_cnt = redo_cnt - miss_redo_cnt - 1;
} else {
}
if (is_normal_trans) {
log_gen.next_trans_with_miss_redo(redo_cnt, miss_redo_cnt, SP_NORMAL_TRAN);
} else if (is_redo_with_commit_trans){
log_gen.next_trans_with_miss_redo(redo_cnt, miss_redo_cnt, SP_REDO_WITH_COMMIT_TRAN);
} else {
}
uint64_t start_redo_log_id = log_gen.get_log_id();
clog::ObLogEntry log_entry;
// First read the can_read_redo_cnt redo log
for (int64_t log_cnt = 0; log_cnt < can_read_redo_cnt; ++log_cnt) {
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(SP_NORMAL_TRAN, log_entry));
} else if (is_redo_with_commit_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(SP_REDO_WITH_COMMIT_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
}
// read commit log. found miss redo log,
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(SP_NORMAL_TRAN, log_entry));
} else if (is_redo_with_commit_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.next_log_entry_missing_redo(SP_REDO_WITH_COMMIT_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_ITEM_NOT_SETTED, err);
// Verify the misses array and read the misses redo log
const int64_t miss_array_cnt = missing.count();
EXPECT_EQ(miss_redo_cnt, miss_array_cnt);
for (int64_t log_cnt = 0; log_cnt < miss_array_cnt; ++log_cnt) {
LOG_DEBUG("miss", K(missing[log_cnt]));
EXPECT_EQ(start_redo_log_id, missing[log_cnt]);
start_redo_log_id++;
clog::ObLogEntry miss_log_entry;
EXPECT_EQ(OB_SUCCESS, log_gen.next_miss_log_entry(missing[log_cnt], miss_log_entry));
err = pr.read_missing_redo(miss_log_entry);
EXPECT_EQ(OB_SUCCESS, err);
}
// After reading the missing redo log, read the commit log again to advance the partitioning task and free up commit_log_entry memory
if (is_normal_trans) {
EXPECT_EQ(OB_SUCCESS, log_gen.get_commit_log_entry(SP_NORMAL_TRAN, log_entry));
} else if (is_redo_with_commit_trans){
EXPECT_EQ(OB_SUCCESS, log_gen.get_commit_log_entry(SP_REDO_WITH_COMMIT_TRAN, log_entry));
} else {
}
err = pr.read(log_entry, missing, tsi);
EXPECT_EQ(OB_SUCCESS, err);
err = pr.flush(stop_flag);
EXPECT_EQ(OB_SUCCESS, err);
}
// Check.
EXPECT_EQ(commit_trans_cnt, parser.get_commit_trans_cnt());
// Destroy.
pr.destroy();
task_pool.destroy();
fifo_allocator.destroy();
}
}
}
int main(int argc, char **argv)
{
//ObLogger::get_logger().set_mod_log_levels("ALL.*:DEBUG, TLOG.*:DEBUG");
// testing::FLAGS_gtest_filter = "DO_NOT_RUN";
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_ob_log_part_trans_resolver.log", true);
logger.set_log_level(OB_LOG_LEVEL_INFO);
testing::InitGoogleTest(&argc,argv);
return RUN_ALL_TESTS();
}