819 lines
24 KiB
C++
819 lines
24 KiB
C++
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <vector>
|
|
#include <algorithm>
|
|
|
|
#include "share/ob_define.h"
|
|
#include "storage/ob_storage_log_type.h"
|
|
#include "storage/transaction/ob_trans_log.h"
|
|
|
|
#include "obcdc/src/ob_log_instance.h"
|
|
#include "obcdc/src/ob_log_parser.h"
|
|
|
|
#include "ob_log_utils.h" // get_timestamp
|
|
#include "obcdc/src/ob_map_queue.h"
|
|
#include "lib/oblog/ob_log_module.h"
|
|
|
|
using namespace oceanbase;
|
|
using namespace common;
|
|
using namespace liboblog;
|
|
using namespace transaction;
|
|
using namespace storage;
|
|
using namespace clog;
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace unittest
|
|
{
|
|
// prepare log
|
|
static const int64_t PREPARE_TIMESTAMP = 10 * 1000 * 1000;
|
|
// commit log
|
|
static const int64_t GLOBAL_TRANS_VERSION = 100;
|
|
|
|
static const int64_t FIXED_PART_COUNT = 6;
|
|
static const ObPartitionLogInfo FIXED_PART_INFO[FIXED_PART_COUNT] =
|
|
{
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 0, 6), 100, PREPARE_TIMESTAMP),
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 1, 6), 100, PREPARE_TIMESTAMP),
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 2, 6), 100, PREPARE_TIMESTAMP),
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 3, 6), 100, PREPARE_TIMESTAMP),
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 4, 6), 100, PREPARE_TIMESTAMP),
|
|
ObPartitionLogInfo(ObPartitionKey(1000U, 5, 6), 100, PREPARE_TIMESTAMP)
|
|
};
|
|
|
|
/*
 * TransLog Generator 1.
 * Generates the transaction logs of a single-partition transaction.
 * Supports fetching the transaction logs in the CORRECT order.
 * Usage:
 * - Call next_trans() to specify the transaction parameters.
 * - Fetch the logs in the correct order: redo, redo, ..., prepare, commit/abort.
 */
|
|
struct TransParam1
|
|
{
|
|
// Params used in trans log.
|
|
ObPartitionKey pkey_;
|
|
ObTransID trans_id_;
|
|
ObAddr scheduler_;
|
|
ObPartitionKey coordinator_;
|
|
ObPartitionArray participants_;
|
|
ObStartTransParam trans_param_;
|
|
};
|
|
|
|
class TransLogGenerator1
|
|
{
|
|
public:
|
|
TransLogGenerator1()
|
|
: param_(),
|
|
redo_(),
|
|
prepare_(),
|
|
commit_(),
|
|
abort_()
|
|
{ }
|
|
virtual ~TransLogGenerator1() { }
|
|
public:
|
|
void next_trans(const TransParam1 ¶m)
|
|
{
|
|
param_ = param;
|
|
}
|
|
const ObTransRedoLog& next_redo(const uint64_t log_id)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
uint64_t tenant_id = 100;
|
|
const uint64_t cluster_id = 1000;
|
|
redo_.reset();
|
|
ObVersion active_memstore_version(1);
|
|
err = redo_.init(OB_LOG_TRANS_REDO, param_.pkey_, param_.trans_id_,
|
|
tenant_id, log_id, param_.scheduler_, param_.coordinator_,
|
|
param_.participants_, param_.trans_param_, cluster_id, active_memstore_version);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
ObTransMutator &mutator = redo_.get_mutator();
|
|
if (NULL == mutator.get_mutator_buf()) {
|
|
mutator.init(true);
|
|
}
|
|
const char *data = "fly";
|
|
char *buf = static_cast<char*>(mutator.alloc(strlen(data)));
|
|
strcpy(buf, data);
|
|
return redo_;
|
|
}
|
|
const ObTransPrepareLog& next_prepare(const ObRedoLogIdArray &all_redos)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
uint64_t tenant_id = 100;
|
|
const uint64_t cluster_id = 1000;
|
|
ObString trace_id;
|
|
prepare_.reset();
|
|
ObVersion active_memstore_version(1);
|
|
err = prepare_.init(OB_LOG_TRANS_PREPARE, param_.pkey_, param_.trans_id_,
|
|
tenant_id, param_.scheduler_, param_.coordinator_,
|
|
param_.participants_, param_.trans_param_,
|
|
OB_SUCCESS, all_redos, 0, cluster_id, active_memstore_version, trace_id);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
return prepare_;
|
|
}
|
|
const ObTransCommitLog& next_commit(const uint64_t prepare_log_id)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
const uint64_t cluster_id = 1000;
|
|
PartitionLogInfoArray ptl_ids;
|
|
|
|
ObPartitionLogInfo ptl_id(param_.pkey_, prepare_log_id, PREPARE_TIMESTAMP);
|
|
err = ptl_ids.push_back(ptl_id);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
|
|
// push Fixed participant information
|
|
for (int64_t idx = 0; idx < FIXED_PART_COUNT; ++idx) {
|
|
err = ptl_ids.push_back(FIXED_PART_INFO[idx]);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
}
|
|
|
|
commit_.reset();
|
|
err = commit_.init(OB_LOG_TRANS_COMMIT, param_.pkey_, param_.trans_id_,
|
|
ptl_ids, GLOBAL_TRANS_VERSION, 0, cluster_id);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
return commit_;
|
|
}
|
|
const ObTransAbortLog& next_abort()
|
|
{
|
|
int err = OB_SUCCESS;
|
|
const uint64_t cluster_id = 1000;
|
|
PartitionLogInfoArray array;
|
|
abort_.reset();
|
|
err = abort_.init(OB_LOG_TRANS_ABORT, param_.pkey_, param_.trans_id_, array, cluster_id);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
return abort_;
|
|
}
|
|
private:
|
|
TransParam1 param_;
|
|
ObTransRedoLog redo_;
|
|
ObTransPrepareLog prepare_;
|
|
ObTransCommitLog commit_;
|
|
ObTransAbortLog abort_;
|
|
};
|
|
|
|
/*
 * Transaction log entry generator base.
 * Generates the log entries of transactions.
 */
|
|
class TransLogEntryGeneratorBase
|
|
{
|
|
static const ObAddr SCHEDULER;
|
|
public:
|
|
// Pass in the ObTransID, which can be used to specify different transactions for the same partition
|
|
TransLogEntryGeneratorBase(const ObPartitionKey &pkey, const ObTransID &trans_id)
|
|
: pkey_(pkey),
|
|
log_id_(0),
|
|
remain_log_cnt_(0),
|
|
is_commit_(false),
|
|
param_(),
|
|
trans_log_gen_(),
|
|
prepare_id_(0),
|
|
redos_(),
|
|
redo_cnt_(0),
|
|
data_len_(0)
|
|
{
|
|
param_.pkey_ = pkey_;
|
|
param_.trans_id_ = trans_id;
|
|
param_.scheduler_ = SCHEDULER;
|
|
param_.coordinator_ = pkey_;
|
|
int err = param_.participants_.push_back(pkey_);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
param_.trans_param_.set_access_mode(ObTransAccessMode::READ_WRITE);
|
|
param_.trans_param_.set_isolation(ObTransIsolation::READ_COMMITED);
|
|
param_.trans_param_.set_type(ObTransType::TRANS_NORMAL);
|
|
|
|
buf_ = new char[buf_len_];
|
|
EXPECT_TRUE(NULL != buf_);
|
|
}
|
|
|
|
virtual ~TransLogEntryGeneratorBase()
|
|
{
|
|
delete[] buf_;
|
|
}
|
|
|
|
// Generate normal trans. redo, redo...prepare, commit/abort
|
|
// Start a new trans.
|
|
// Specify the number of redo entries and call next_log_entry to get them in order
|
|
void next_trans(const int64_t redo_cnt, bool is_commit)
|
|
{
|
|
// total log count of normal trans = redo log count + 2(prepare log + commit/abort log)
|
|
remain_log_cnt_ = redo_cnt + 2;
|
|
is_commit_ = is_commit;
|
|
redos_.reset();
|
|
redo_cnt_ = redo_cnt;
|
|
trans_log_gen_.next_trans(param_);
|
|
}
|
|
|
|
// Generate: redo, redo... redo-prepare, commit/abort
|
|
// Start a new trans.
|
|
// redo and prepare logs in the same log entry
|
|
void next_trans_with_redo_prepare(const int64_t redo_cnt, bool is_commit)
|
|
{
|
|
next_trans(redo_cnt, is_commit);
|
|
// redo and prepare in the same log_entry
|
|
remain_log_cnt_ = redo_cnt + 1;
|
|
}
|
|
|
|
// Get next log entry.
|
|
// normal trans: get redo, redo...prepare, commit/abort by sequence
|
|
int next_log_entry(clog::ObLogEntry &log_entry)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (2 < remain_log_cnt_) {
|
|
next_redo_(log_id_, log_entry);
|
|
// Store redo id.
|
|
int err = redos_.push_back(log_id_);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (2 == remain_log_cnt_) {
|
|
next_prepare_(log_id_, log_entry);
|
|
prepare_id_ = log_id_;
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (1 == remain_log_cnt_ && is_commit_) {
|
|
next_commit_(log_entry);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (1 == remain_log_cnt_ && !is_commit_) {
|
|
next_abort_(log_entry);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else {
|
|
ret = OB_ITER_END;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
// Get next log entry.
|
|
// trans log with redo-prepare: get by order as follows: redo, redo...redo-prepare, commit/abort
|
|
int next_log_entry_with_redo_prepare(clog::ObLogEntry &log_entry)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (2 < remain_log_cnt_) {
|
|
next_redo_(log_id_, log_entry);
|
|
// Store redo id.
|
|
int err = redos_.push_back(log_id_);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (2 == remain_log_cnt_) {
|
|
// redo-prepare
|
|
next_redo_with_prepare_(log_id_, log_entry);
|
|
prepare_id_ = log_id_;
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (1 == remain_log_cnt_ && is_commit_) {
|
|
next_commit_(log_entry);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else if (1 == remain_log_cnt_ && !is_commit_) {
|
|
next_abort_(log_entry);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
} else {
|
|
ret = OB_ITER_END;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
public:
|
|
uint64_t get_log_id()
|
|
{
|
|
return log_id_;
|
|
}
|
|
protected:
|
|
// return specified log_id and redo log
|
|
void next_redo_(const uint64_t redo_log_id, clog::ObLogEntry &log_entry)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
// Gen trans log.
|
|
const ObTransRedoLog &redo = trans_log_gen_.next_redo(redo_log_id);
|
|
int64_t pos = 0;
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_REDO);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = redo.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
data_len_ = pos;
|
|
// Gen entry header.
|
|
ObLogEntryHeader header;
|
|
header.generate_header(OB_LOG_SUBMIT, pkey_, redo_log_id, buf_,
|
|
data_len_, get_timestamp(), get_timestamp(),
|
|
ObProposalID(), get_timestamp(), ObVersion(0));
|
|
// Gen log entry.
|
|
log_entry.generate_entry(header, buf_);
|
|
}
|
|
// Returns the prepare log with the specified log_id
|
|
void next_prepare_(const uint64_t prepare_log_id, clog::ObLogEntry &log_entry)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
// Gen trans log.
|
|
const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
|
|
int64_t pos = 0;
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_PREPARE);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = prepare.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
data_len_ = pos;
|
|
// Gen entry header.
|
|
ObLogEntryHeader header;
|
|
header.generate_header(OB_LOG_SUBMIT, pkey_, prepare_log_id, buf_,
|
|
data_len_, get_timestamp(), get_timestamp(),
|
|
ObProposalID(), PREPARE_TIMESTAMP, ObVersion(0));
|
|
// Gen log entry.
|
|
log_entry.generate_entry(header, buf_);
|
|
}
|
|
void next_commit_(clog::ObLogEntry &log_entry)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
// Gen trans log.
|
|
const ObTransCommitLog &commit = trans_log_gen_.next_commit(prepare_id_);
|
|
int64_t pos = 0;
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_COMMIT);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = commit.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
data_len_ = pos;
|
|
// Gen entry header.
|
|
ObLogEntryHeader header;
|
|
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
|
|
data_len_, get_timestamp(), get_timestamp(),
|
|
ObProposalID(), get_timestamp(), ObVersion(0));
|
|
// Gen log entry.
|
|
log_entry.generate_entry(header, buf_);
|
|
}
|
|
void next_abort_(clog::ObLogEntry &log_entry)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
// Gen trans log.
|
|
const ObTransAbortLog &abort = trans_log_gen_.next_abort();
|
|
int64_t pos = 0;
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_ABORT);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = abort.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
data_len_ = pos;
|
|
// Gen entry header.
|
|
ObLogEntryHeader header;
|
|
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
|
|
data_len_, get_timestamp(), get_timestamp(),
|
|
ObProposalID(), get_timestamp(), ObVersion(0));
|
|
// Gen log entry.
|
|
log_entry.generate_entry(header, buf_);
|
|
}
|
|
void next_redo_with_prepare_(const uint64_t prepare_log_id, clog::ObLogEntry &log_entry)
|
|
{
|
|
int err = OB_SUCCESS;
|
|
// Gen trans log.
|
|
const ObTransRedoLog &redo = trans_log_gen_.next_redo(prepare_log_id);
|
|
const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
|
|
int64_t pos = 0;
|
|
err = serialization::encode_i64(buf_, buf_len_,
|
|
pos, OB_LOG_TRANS_REDO_WITH_PREPARE);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
|
|
err = redo.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
err = prepare.serialize(buf_, buf_len_, pos);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
|
|
data_len_ = pos;
|
|
// Gen entry header.
|
|
ObLogEntryHeader header;
|
|
header.generate_header(OB_LOG_SUBMIT, pkey_, prepare_log_id, buf_,
|
|
data_len_, get_timestamp(), get_timestamp(),
|
|
ObProposalID(), PREPARE_TIMESTAMP, ObVersion(0));
|
|
// Gen log entry.
|
|
log_entry.generate_entry(header, buf_);
|
|
}
|
|
protected:
|
|
// Params.
|
|
ObPartitionKey pkey_;
|
|
uint64_t log_id_;
|
|
int64_t remain_log_cnt_;
|
|
// mark current trans is commit or not
|
|
bool is_commit_;
|
|
// Gen.
|
|
TransParam1 param_;
|
|
TransLogGenerator1 trans_log_gen_;
|
|
uint64_t prepare_id_;
|
|
ObRedoLogIdArray redos_;
|
|
int64_t redo_cnt_;
|
|
// Buf.
|
|
int64_t data_len_;
|
|
static const int64_t buf_len_ = 2 * _M_;
|
|
char *buf_;
|
|
};
|
|
|
|
// Scheduler address stored in the transaction parameters of every
// TransLogEntryGeneratorBase instance.
const ObAddr TransLogEntryGeneratorBase::SCHEDULER(ObAddr::IPV4, "127.0.0.1", 5566);
|
|
|
|
/*
 * Test helper for missing redo logs: when the prepare log is read, the
 * missing redo logs can be detected.
 *
 * Two cases:
 * 1. redo, redo, redo...prepare, commit/abort
 * 2. redo, redo, redo...redo-prepare, commit/abort
 */
|
|
// Shape of the generated transaction log sequence.
enum CaseType
{
  // redo, redo, ..., prepare, commit/abort
  NORMAL_TRAN = 0,
  // redo, redo, ..., redo-prepare (one combined entry), commit/abort
  REDO_WITH_PREPARE_TRAN = 1
};
|
|
class TransLogEntryGenerator1 : public TransLogEntryGeneratorBase
|
|
{
|
|
public:
|
|
TransLogEntryGenerator1(const ObPartitionKey &pkey, const ObTransID &trans_id)
|
|
: TransLogEntryGeneratorBase(pkey, trans_id),
|
|
is_first(false),
|
|
miss_redo_cnt_(0)
|
|
{}
|
|
~TransLogEntryGenerator1() {}
|
|
public:
|
|
// Specify the number of redo logs in redo_cnt, and the number of missing redo logs
|
|
void next_trans_with_miss_redo(const int64_t redo_cnt, const int64_t miss_redo_cnt,
|
|
bool is_commit, CaseType type)
|
|
{
|
|
if (NORMAL_TRAN == type) {
|
|
next_trans(redo_cnt, is_commit);
|
|
} else if(REDO_WITH_PREPARE_TRAN == type) {
|
|
next_trans_with_redo_prepare(redo_cnt, is_commit);
|
|
} else {
|
|
}
|
|
miss_redo_cnt_ = miss_redo_cnt;
|
|
is_first = true;
|
|
}
|
|
|
|
int next_log_entry_missing_redo(CaseType type, clog::ObLogEntry &log_entry)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
// miss_redo_cnt_bars before miss, no return but add redos
|
|
if (is_first) {
|
|
for (int64_t idx = 0; idx < miss_redo_cnt_; idx++) {
|
|
next_redo_(log_id_, log_entry);
|
|
// Store redo id.
|
|
int err = redos_.push_back(log_id_);
|
|
EXPECT_EQ(OB_SUCCESS, err);
|
|
log_id_ += 1;
|
|
remain_log_cnt_ -= 1;
|
|
}
|
|
is_first = false;
|
|
}
|
|
|
|
if (NORMAL_TRAN == type) {
|
|
ret = next_log_entry(log_entry);
|
|
} else if(REDO_WITH_PREPARE_TRAN == type) {
|
|
ret = next_log_entry_with_redo_prepare(log_entry);
|
|
} else {
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
int next_miss_log_entry(const uint64_t miss_log_id, clog::ObLogEntry &miss_log_entry)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
next_redo_(miss_log_id, miss_log_entry);
|
|
|
|
return ret;
|
|
}
|
|
|
|
int get_prepare_log_entry(CaseType type, clog::ObLogEntry &log_entry)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (NORMAL_TRAN == type) {
|
|
next_prepare_(prepare_id_, log_entry);
|
|
} else if(REDO_WITH_PREPARE_TRAN == type) {
|
|
next_redo_with_prepare_(prepare_id_, log_entry);
|
|
} else {
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
private:
|
|
bool is_first;
|
|
int64_t miss_redo_cnt_;
|
|
};
|
|
|
|
struct TransLogInfo
|
|
{
|
|
// redo info
|
|
int64_t redo_log_cnt_;
|
|
ObLogIdArray redo_log_ids_;
|
|
|
|
// prepare info
|
|
int64_t seq_;
|
|
common::ObPartitionKey partition_;
|
|
int64_t prepare_timestamp_;
|
|
ObTransID trans_id_;
|
|
uint64_t prepare_log_id_;
|
|
uint64_t cluster_id_;
|
|
|
|
// commit info
|
|
int64_t global_trans_version_;
|
|
PartitionLogInfoArray participants_;
|
|
|
|
void reset()
|
|
{
|
|
redo_log_cnt_ = -1;
|
|
redo_log_ids_.reset();
|
|
seq_ = -1;
|
|
partition_.reset();
|
|
prepare_timestamp_ = -1;
|
|
trans_id_.reset();
|
|
prepare_log_id_ = -1;
|
|
cluster_id_ = -1;
|
|
global_trans_version_ = -1;
|
|
participants_.reset();
|
|
}
|
|
|
|
void reset(int64_t redo_cnt, ObLogIdArray &redo_log_ids,
|
|
int64_t seq, const ObPartitionKey partition, int64_t prepare_timestamp,
|
|
ObTransID &trans_id, uint64_t prepare_log_id, uint64_t cluster_id,
|
|
uint64_t global_trans_version, PartitionLogInfoArray &participants)
|
|
{
|
|
reset();
|
|
|
|
// redo
|
|
redo_log_cnt_ = redo_cnt;
|
|
redo_log_ids_ = redo_log_ids;
|
|
// prepare
|
|
seq_ = seq;
|
|
partition_ = partition;
|
|
prepare_timestamp_ = prepare_timestamp;
|
|
trans_id_ = trans_id;
|
|
prepare_log_id_ = prepare_log_id;
|
|
cluster_id_ = cluster_id;
|
|
|
|
// commmit
|
|
global_trans_version_ = global_trans_version;
|
|
participants_ = participants;
|
|
}
|
|
};
|
|
|
|
/*
 * Mock Parser 1.
 * Reads each task, checks it against the expected TransLogInfo, reverts it
 * immediately, and counts the tasks.
 */
|
|
class MockParser1 : public IObLogParser
|
|
{
|
|
public:
|
|
MockParser1() : commit_trans_cnt_(0), abort_trans_cnt_(0), info_queue_(), res_queue_() {}
|
|
virtual ~MockParser1()
|
|
{
|
|
info_queue_.destroy();
|
|
res_queue_.destroy();
|
|
}
|
|
|
|
int init()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (OB_FAIL(info_queue_.init(MOD_ID))) {
|
|
} else if (OB_FAIL(res_queue_.init(MOD_ID))) {
|
|
} else {
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
virtual int start() { return OB_SUCCESS; }
|
|
virtual void stop() { }
|
|
virtual void mark_stop_flag() { }
|
|
virtual int push(PartTransTask *task, const int64_t timeout)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
UNUSED(timeout);
|
|
|
|
if (OB_ISNULL(task)) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
TransLogInfo *trans_log_info = NULL;
|
|
int tmp_ret = OB_SUCCESS;
|
|
|
|
if (OB_SUCCESS != (tmp_ret = info_queue_.pop(trans_log_info))) {
|
|
// pop error
|
|
} else if(NULL == trans_log_info){
|
|
tmp_ret = OB_ERR_UNEXPECTED;
|
|
} else {
|
|
// do nothing
|
|
}
|
|
|
|
bool check_result;
|
|
if (task->is_normal_trans()) {
|
|
// Verify correct data for partitioning tasks
|
|
if (OB_SUCCESS == tmp_ret) {
|
|
check_result = check_nomal_tran(*task, *trans_log_info);
|
|
} else {
|
|
check_result = false;
|
|
}
|
|
task->revert();
|
|
commit_trans_cnt_ += 1;
|
|
} else if (task->is_heartbeat()) {
|
|
// Verify correct data for partitioning tasks
|
|
if (OB_SUCCESS == tmp_ret) {
|
|
check_result = check_abort_tran(*task, *trans_log_info);
|
|
} else {
|
|
check_result = false;
|
|
}
|
|
task->revert();
|
|
abort_trans_cnt_ += 1;
|
|
}
|
|
|
|
// Save the validation result, no need to handle a failed push, the pop result will be validated
|
|
if (OB_SUCCESS != (tmp_ret = res_queue_.push(check_result))) {
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
virtual int get_pending_task_count(int64_t &task_count)
|
|
{
|
|
UNUSED(task_count);
|
|
|
|
return OB_SUCCESS;
|
|
}
|
|
int64_t get_commit_trans_cnt() const { return commit_trans_cnt_; }
|
|
int64_t get_abort_trans_cnt() const { return abort_trans_cnt_; }
|
|
int push_into_queue(TransLogInfo *trans_log_info)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (OB_FAIL(info_queue_.push(trans_log_info))) {
|
|
} else {
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
int get_check_result(bool &result)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (OB_FAIL(res_queue_.pop(result))) {
|
|
} else {
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
private:
|
|
// for PartTransTask correctness validation
|
|
// for commit transactions, validate redo/prepare/commit info
|
|
// For abort transactions, since they are converted to heartbeat information, only seq_, partition_, prepare_timestamp_ need to be validated
|
|
bool check_nomal_tran(PartTransTask &task, TransLogInfo &trans_log_info)
|
|
{
|
|
bool bool_ret = true;
|
|
|
|
// redo info
|
|
const SortedRedoLogList &redo_list = task.get_sorted_redo_list();
|
|
if (redo_list.log_num_ != trans_log_info.redo_log_cnt_) {
|
|
bool_ret = false;
|
|
} else {
|
|
RedoLogNode *redo_node = redo_list.head_;
|
|
for (int64_t idx = 0; true == bool_ret && idx < trans_log_info.redo_log_cnt_; ++idx) {
|
|
if (redo_node->start_log_id_ == trans_log_info.redo_log_ids_[idx]) {
|
|
// do nothing
|
|
} else {
|
|
bool_ret = false;
|
|
}
|
|
|
|
redo_node = redo_node->next_;
|
|
}
|
|
}
|
|
|
|
// prepare info
|
|
if (bool_ret) {
|
|
if (trans_log_info.seq_ == task.get_seq()
|
|
&& trans_log_info.partition_ == task.get_partition()
|
|
&& trans_log_info.prepare_timestamp_ == task.get_timestamp()
|
|
&& trans_log_info.trans_id_ == task.get_trans_id()
|
|
&& trans_log_info.prepare_log_id_ == task.get_prepare_log_id()
|
|
&& trans_log_info.cluster_id_ == task.get_cluster_id()) {
|
|
} else {
|
|
bool_ret = false;
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.seq_), K(task.get_seq()));
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.partition_), K(task.get_partition()));
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.prepare_timestamp_), K(task.get_timestamp()));
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.trans_id_), K(task.get_trans_id()));
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.prepare_log_id_), K(task.get_prepare_log_id()));
|
|
OBLOG_LOG(INFO, "compare", K(trans_log_info.cluster_id_), K(task.get_cluster_id()));
|
|
}
|
|
}
|
|
|
|
//// commit info
|
|
if (bool_ret) {
|
|
if (trans_log_info.global_trans_version_ != task.get_global_trans_version()) {
|
|
bool_ret = false;
|
|
} else {
|
|
const ObPartitionLogInfo *part = task.get_participants();
|
|
const int64_t part_cnt = task.get_participant_count();
|
|
|
|
if (trans_log_info.participants_.count() != part_cnt) {
|
|
bool_ret = false;
|
|
} else {
|
|
const ObPartitionLogInfo *pinfo1 = NULL;
|
|
const ObPartitionLogInfo *pinfo2 = NULL;
|
|
|
|
for (int64_t idx = 0; true == bool_ret && idx < part_cnt; ++idx) {
|
|
pinfo1 = &trans_log_info.participants_.at(idx);
|
|
pinfo2 = part + idx;
|
|
|
|
if (pinfo1->get_partition() == pinfo2->get_partition()
|
|
&& pinfo1->get_log_id() == pinfo2->get_log_id()
|
|
&& pinfo1->get_log_timestamp() == pinfo2->get_log_timestamp()) {
|
|
// do nothing
|
|
} else {
|
|
bool_ret = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return bool_ret;
|
|
}
|
|
|
|
bool check_abort_tran(PartTransTask &task, TransLogInfo &trans_log_info)
|
|
{
|
|
bool bool_ret = true;
|
|
|
|
if (trans_log_info.seq_ == task.get_seq()
|
|
&& trans_log_info.partition_ == task.get_partition()
|
|
&& trans_log_info.prepare_timestamp_ == task.get_timestamp()) {
|
|
} else {
|
|
bool_ret = false;
|
|
}
|
|
|
|
return bool_ret;
|
|
}
|
|
private:
|
|
static const int64_t MOD_ID = 1;
|
|
private:
|
|
int64_t commit_trans_cnt_;
|
|
int64_t abort_trans_cnt_;
|
|
// save TransLogInfo
|
|
ObMapQueue<TransLogInfo*> info_queue_;
|
|
// save verify result
|
|
ObMapQueue<bool> res_queue_;
|
|
};
|
|
|
|
class MockParser2 : public IObLogParser
|
|
{
|
|
public:
|
|
MockParser2() : commit_trans_cnt_(0) {}
|
|
virtual ~MockParser2() { }
|
|
|
|
virtual int start() { return OB_SUCCESS; }
|
|
virtual void stop() { }
|
|
virtual void mark_stop_flag() { }
|
|
virtual int push(PartTransTask *task, const int64_t timeout)
|
|
{
|
|
UNUSED(timeout);
|
|
if (OB_ISNULL(task)) {
|
|
} else if (task->is_normal_trans()) {
|
|
task->revert();
|
|
commit_trans_cnt_ += 1;
|
|
}
|
|
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int get_pending_task_count(int64_t &task_count)
|
|
{
|
|
UNUSED(task_count);
|
|
|
|
return OB_SUCCESS;
|
|
}
|
|
int64_t get_commit_trans_cnt() const { return commit_trans_cnt_; }
|
|
private:
|
|
int64_t commit_trans_cnt_;
|
|
};
|
|
|
|
|
|
}
|
|
}
|