// File: oceanbase/unittest/liboblog/test_log_fetcher_common_utils.h
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <vector>
#include <algorithm>
#include "share/ob_define.h"
#include "storage/ob_storage_log_type.h"
#include "storage/transaction/ob_trans_log.h"
#include "liboblog/src/ob_log_instance.h"
#include "liboblog/src/ob_log_fetcher_stream.h"
#include "liboblog/src/ob_log_fetcher_part_stream.h"
#include "ob_log_utils.h" // get_timestamp
using namespace oceanbase;
using namespace common;
using namespace liboblog;
using namespace fetcher;
using namespace transaction;
using namespace storage;
using namespace clog;
namespace oceanbase
{
namespace unittest
{
/*
* Utils.
*/
typedef std::vector<ObAddr> Svrs;
typedef std::vector<ObPartitionKey> PKeys;
typedef std::vector<uint64_t> LogIds;
typedef std::vector<int64_t> Tstamps;
/*
* Mock Rpc Interface 1.
* It owns N partitions, each of which has M log entries.
* It returns at most L log entries in each fetch_log() call.
* Log entries carry no meaningful payload.
* Used to test:
* - add partition into Stream.
* - update file id & offset.
* - fetch log.
* - kick out offline partitions.
* - discard partitions.
*/
class MockRpcInterface1 : public IFetcherRpcInterface
{
struct Entry
{
file_id_t file_id_;
offset_t offset_;
uint64_t log_id_;
ObPartitionKey pkey_;
};
// Use offset_ as index of Entry in EntryVec.
typedef std::vector<Entry> EntryVec;
public:
MockRpcInterface1(const PKeys &pkeys,
const int64_t log_entry_per_p,
const int64_t log_entry_per_call)
{
log_entry_per_call_ = log_entry_per_call;
log_entry_per_p_ = log_entry_per_p;
addr_ = ObAddr(ObAddr::IPV4, "127.0.0.1", 5999);
// Gen entries.
int64_t log_entry_cnt = 0;
for (int64_t log_id = 1; log_id < log_entry_per_p + 1; ++log_id) {
for (int64_t pidx = 0, cnt = pkeys.size(); pidx < cnt; ++pidx) {
// Gen entry.
Entry entry;
entry.pkey_ = pkeys.at(pidx);
entry.file_id_ = 1;
entry.offset_ = static_cast<offset_t>(log_entry_cnt++);
entry.log_id_ = log_id;
// Save it.
entries_.push_back(entry);
}
}
}
virtual ~MockRpcInterface1() { }
virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); }
virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; }
virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
virtual int req_start_log_id_by_ts(
const obrpc::ObLogReqStartLogIdByTsRequest& req,
obrpc::ObLogReqStartLogIdByTsResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id(
const obrpc::ObLogReqStartPosByLogIdRequest& req,
obrpc::ObLogReqStartPosByLogIdResponse& res)
{
/*
* Errors supported:
* - OB_SUCCESS
* - OB_ENTRY_NOT_EXIST: partition exists without any log
* - OB_ERR_OUT_OF_LOWER_BOUND: log id beyond lower bound
* - OB_ERR_OUT_OF_UPPER_BOUND: log id beyond upper bound
*/
typedef obrpc::ObLogReqStartPosByLogIdRequest::Param Param;
typedef obrpc::ObLogReqStartPosByLogIdResponse::Result Result;
for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
// Locate for a partition.
const Param &param = req.get_params().at(idx);
const ObPartitionKey &pkey = param.pkey_;
const uint64_t start_log_id = param.start_log_id_;
Result result;
result.reset();
// Search.
bool done = false;
bool partition_exist = false;
for (int64_t entry_idx = 0, entry_cnt = entries_.size();
_SUCC_(result.err_) && !done && entry_idx < entry_cnt;
++entry_idx) {
const Entry &entry = entries_.at(entry_idx);
if (entry.pkey_ == pkey) {
partition_exist = true;
// Got it.
if (start_log_id == entry.log_id_) {
result.err_ = OB_SUCCESS;
result.file_id_ = 1;
result.offset_ = entry.offset_;
done = true;
}
// Too small log id.
else if (start_log_id < entry.log_id_) {
result.err_ = OB_ERR_OUT_OF_LOWER_BOUND;
}
}
}
if (!done && _SUCC_(result.err_)) {
// No log entry.
if (!partition_exist) {
result.err_ = OB_ENTRY_NOT_EXIST;
}
// Too large log id.
else {
result.err_ = OB_ERR_OUT_OF_UPPER_BOUND;
}
}
res.append_result(result);
}
_D_("mock rpc 1 req pos", K(req), K(res));
return OB_SUCCESS;
}
virtual int fetch_log(
const obrpc::ObLogExternalFetchLogRequest& req,
obrpc::ObLogExternalFetchLogResponse& res)
{
typedef obrpc::ObLogExternalFetchLogRequest::Param Param;
typedef obrpc::ObLogExternalFetchLogResponse::OfflinePartition OP;
// Fetch log.
const offset_t offset = req.get_offset();
if (offset < 0) {
return OB_INVALID_ARGUMENT;
}
offset_t ret_offset = offset;
// Scan.
for (int64_t idx = static_cast<int64_t>(offset), cnt = entries_.size();
idx < cnt && res.get_log_num() < log_entry_per_call_;
++idx) {
const Entry &entry = entries_.at(idx);
bool fetch = false;
for (int64_t pidx = 0, pcnt = req.get_params().count();
!fetch && pidx < pcnt;
++pidx) {
const Param &param = req.get_params().at(pidx);
if (entry.pkey_ == param.pkey_
&& param.start_log_id_ <= entry.log_id_
&& entry.log_id_ <= param.last_log_id_) {
fetch = true;
}
}
if (fetch) {
ret_offset = static_cast<offset_t>(entry.offset_);
// Gen header.
int64_t ts = get_timestamp();
ObProposalID proposal_id;
proposal_id.addr_ = addr_;
proposal_id.ts_ = ts;
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, entry.pkey_,
entry.log_id_, mock_load_, mock_load_len_,
ts, ts, proposal_id, ts, ObVersion(1));
ObLogEntry log_entry;
log_entry.generate_entry(header, mock_load_);
res.append_log(log_entry);
}
}
res.set_file_id_offset(1, ret_offset + 1);
// Handle offline partition.
// Here, if a partition reaches its last log, it is offline.
for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
const Param &param = req.get_params().at(idx);
const uint64_t last_log_id = log_entry_per_p_;
if (last_log_id < param.start_log_id_) {
OP op;
op.pkey_ = param.pkey_;
// op.last_log_id_ = last_log_id;
op.sync_ts_ = last_log_id;
res.append_offline_partition(op);
}
}
_D_("mock rpc 1 fetch log", K(req), K(res));
return OB_SUCCESS;
}
virtual int req_heartbeat_info(
const obrpc::ObLogReqHeartbeatInfoRequest& req,
obrpc::ObLogReqHeartbeatInfoResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_leader_heartbeat(
const obrpc::ObLogLeaderHeartbeatReq &req,
obrpc::ObLogLeaderHeartbeatResp &res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
obrpc::ObLogOpenStreamResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
obrpc::ObLogStreamFetchLogResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
{
UNUSED(feedback);
return OB_SUCCESS;
}
private:
ObAddr addr_;
int64_t log_entry_per_call_;
int64_t log_entry_per_p_;
EntryVec entries_;
static const int64_t mock_load_len_ = 8;
char mock_load_[mock_load_len_];
};
/*
* Factory.
*/
class MockRpcInterface1Factory : public IFetcherRpcInterfaceFactory
{
public:
MockRpcInterface1Factory(const PKeys &pkeys,
const int64_t log_entry_per_p,
const int64_t log_entry_per_call)
: pkeys_(pkeys),
log_entry_per_p_(log_entry_per_p),
log_entry_per_call_(log_entry_per_call)
{ }
virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
{
rpc = new MockRpcInterface1(pkeys_, log_entry_per_p_, log_entry_per_call_);
return OB_SUCCESS;
}
virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
{
delete rpc;
return OB_SUCCESS;
}
private:
PKeys pkeys_;
int64_t log_entry_per_p_;
int64_t log_entry_per_call_;
};
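/*
* Illustrative usage sketch (not from the original tests; the partition key
* values and counts are arbitrary example assumptions): constructing the
* factory above for a single partition.
*
*   PKeys pkeys;
*   ObPartitionKey pkey;
*   pkey.init(1001, 0, 1);   // table_id, partition_id, partition_cnt
*   pkeys.push_back(pkey);
*   // 100 log entries per partition, at most 10 returned per fetch_log().
*   MockRpcInterface1Factory factory(pkeys, 100, 10);
*/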
/*
* Mock Rpc Interface 2.
* It owns N servers, each holding some partitions; one of the servers is
* the leader. When the start log id is requested, a preset value is returned.
* Used to test:
* - fetch partition
* - locate start log id
* - activate partition stream
* - discard partition stream
*/
class MockRpcInterface2 : public IFetcherRpcInterface
{
struct Partition
{
ObPartitionKey pkey_;
uint64_t start_log_id_;
bool is_leader_;
};
struct Svr
{
ObAddr svr_;
std::vector<Partition> partitions_;
bool operator==(const Svr &other) const
{
return svr_ == other.svr_;
}
bool operator<(const Svr &other) const
{
return svr_ < other.svr_;
}
};
public:
/*
* Set the static result set. The replica on the first server in svrs is the leader.
*/
static void add_partition(const ObPartitionKey &pkey,
uint64_t start_log_id,
std::vector<ObAddr> svrs)
{
EXPECT_NE(0, svrs.size());
Partition pt = { pkey, start_log_id, false };
for (int64_t idx = 0, cnt = svrs.size();
idx < cnt;
++idx) {
pt.is_leader_ = (0 == idx);
Svr target;
target.svr_ = svrs.at(idx);
std::vector<Svr>::iterator itor =
std::find(svrs_.begin(), svrs_.end(), target);
if (svrs_.end() == itor) {
target.partitions_.push_back(pt);
svrs_.push_back(target);
std::sort(svrs_.begin(), svrs_.end());
}
else {
(*itor).partitions_.push_back(pt);
}
}
}
/*
* Clear static result set.
*/
static void clear_result_set()
{
svrs_.clear();
}
public:
virtual void set_svr(const common::ObAddr& svr) { svr_ = svr; }
virtual const ObAddr& get_svr() const { return svr_; }
virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
virtual int req_start_log_id_by_ts(
const obrpc::ObLogReqStartLogIdByTsRequest& req,
obrpc::ObLogReqStartLogIdByTsResponse& res)
{
bool svr_exist = false;
for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
const ObPartitionKey &pkey = req.get_params().at(idx).pkey_;
bool done = false;
for (int64_t svr_idx = 0, svr_cnt = svrs_.size();
svr_idx < svr_cnt;
++svr_idx) {
// Simulating sending rpc to svr.
if (svr_ == svrs_.at(svr_idx).svr_) {
svr_exist = true;
const Svr &svr = svrs_.at(svr_idx);
for (int64_t pidx = 0, pcnt = svr.partitions_.size();
pidx < pcnt;
++pidx) {
const Partition &p = svr.partitions_.at(pidx);
if (pkey == p.pkey_) {
done = true;
typedef obrpc::ObLogReqStartLogIdByTsResponse::Result Result;
Result result = { OB_SUCCESS, p.start_log_id_, false};
res.append_result(result);
}
}
if (!done) {
res.set_err(OB_PARTITION_NOT_EXIST);
}
}
} // End for.
}
_D_("mock rpc req start log id", K(req), K(res));
return (svr_exist) ? OB_SUCCESS : OB_TIMEOUT;
}
virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id(
const obrpc::ObLogReqStartPosByLogIdRequest& req,
obrpc::ObLogReqStartPosByLogIdResponse& res)
{
UNUSED(req);
UNUSED(res);
// Timeout.
_D_("mock rpc req pos by log id", K(req), K(res));
return OB_TIMEOUT;
}
virtual int fetch_log(
const obrpc::ObLogExternalFetchLogRequest& req,
obrpc::ObLogExternalFetchLogResponse& res)
{
UNUSED(req);
UNUSED(res);
// Timeout.
_D_("mock rpc req pos by log id", K(req), K(res));
return OB_TIMEOUT;
}
virtual int req_heartbeat_info(
const obrpc::ObLogReqHeartbeatInfoRequest& req,
obrpc::ObLogReqHeartbeatInfoResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_leader_heartbeat(
const obrpc::ObLogLeaderHeartbeatReq &req,
obrpc::ObLogLeaderHeartbeatResp &res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
obrpc::ObLogOpenStreamResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
obrpc::ObLogStreamFetchLogResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id,
ObSvrs& svrs, int& leader_cnt)
{
UNUSED(log_id);
leader_cnt = 0;
for (int64_t svr_idx = 0, svr_cnt = svrs_.size();
svr_idx < svr_cnt;
++svr_idx) {
Svr &svr = svrs_.at(svr_idx);
for (int64_t pidx = 0, pcnt = svr.partitions_.size();
pidx < pcnt;
++pidx) {
const Partition &p = svr.partitions_.at(pidx);
if (pkey == p.pkey_) {
svrs.push_back(svr.svr_);
if (p.is_leader_) {
std::swap(svrs.at(leader_cnt), svrs.at(svrs.count() - 1));
leader_cnt += 1;
}
}
}
}
_D_("mock rpc req log servers", K(pkey), K(svrs), K(leader_cnt));
return OB_SUCCESS;
}
virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
{
UNUSED(feedback);
return OB_SUCCESS;
}
private:
// Target svr.
ObAddr svr_;
// Data set.
static std::vector<Svr> svrs_;
};
// Static data set. So all instances could access it.
std::vector<MockRpcInterface2::Svr> MockRpcInterface2::svrs_;
/*
* Factory.
*/
class MockRpcInterface2Factory : public IFetcherRpcInterfaceFactory
{
public:
virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
{
rpc = new MockRpcInterface2();
return OB_SUCCESS;
}
virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
{
delete rpc;
return OB_SUCCESS;
}
};
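/*
* Illustrative usage sketch (assumption, not from the original tests): the
* static result set of MockRpcInterface2 is populated before a test runs.
* The addresses, ports and start log id below are arbitrary examples, and
* pkey is assumed to be an already-initialized partition key.
*
*   MockRpcInterface2::clear_result_set();
*   Svrs svrs;
*   svrs.push_back(ObAddr(ObAddr::IPV4, "127.0.0.1", 6001));  // leader replica
*   svrs.push_back(ObAddr(ObAddr::IPV4, "127.0.0.1", 6002));
*   MockRpcInterface2::add_partition(pkey, 1, svrs);  // preset start log id = 1
*/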
/*
* Mock Rpc Interface 3.
* It owns some servers and partitions, and can return
* server addresses and heartbeat timestamps.
* Note: the user sets <svr, partition key, log id, tstamp> tuples
* as the result set.
* Used to test:
* - Heartbeat facilities.
*/
class MockRpcInterface3 : public IFetcherRpcInterface
{
struct Entry
{
ObAddr svr_;
ObPartitionKey pkey_;
uint64_t log_id_;
int64_t tstamp_;
bool operator<(const Entry &other)
{
return log_id_ < other.log_id_;
}
};
typedef std::vector<Entry> EntryVec;
public:
static void clear_result()
{
entry_vec_.clear();
}
static void add_result(const ObAddr &svr, const ObPartitionKey &pkey,
const uint64_t log_id, const int64_t tstamp)
{
Entry entry = { svr, pkey, log_id, tstamp };
entry_vec_.push_back(entry);
}
public:
virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); }
virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; }
virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
virtual int req_start_log_id_by_ts(
const obrpc::ObLogReqStartLogIdByTsRequest& req,
obrpc::ObLogReqStartLogIdByTsResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_start_pos_by_log_id(
const obrpc::ObLogReqStartPosByLogIdRequest& req,
obrpc::ObLogReqStartPosByLogIdResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int fetch_log(
const obrpc::ObLogExternalFetchLogRequest& req,
obrpc::ObLogExternalFetchLogResponse& res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int req_heartbeat_info(
const obrpc::ObLogReqHeartbeatInfoRequest& req,
obrpc::ObLogReqHeartbeatInfoResponse& res)
{
typedef obrpc::ObLogReqHeartbeatInfoRequest::Param Param;
typedef obrpc::ObLogReqHeartbeatInfoResponse::Result Result;
// Iterate over params.
for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
const Param &param = req.get_params().at(idx);
// Find result.
bool done = false;
for (int64_t idx2 = 0, cnt2 = entry_vec_.size();
!done && idx2 < cnt2;
++idx2) {
const Entry &entry = entry_vec_[idx2];
if (param.pkey_ == entry.pkey_
&& param.log_id_ == entry.log_id_) {
done = true;
Result result;
result.err_ = OB_SUCCESS;
result.tstamp_ = entry.tstamp_;
res.append_result(result);
}
}
if (!done) {
Result result;
result.err_ = OB_NEED_RETRY;
res.append_result(result);
}
}
_D_("mock rpc: req heartbeat", K(req), K(res));
return OB_SUCCESS;
}
virtual int req_leader_heartbeat(
const obrpc::ObLogLeaderHeartbeatReq &req,
obrpc::ObLogLeaderHeartbeatResp &res)
{
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
obrpc::ObLogOpenStreamResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
obrpc::ObLogStreamFetchLogResp &res) {
UNUSED(req);
UNUSED(res);
return OB_NOT_IMPLEMENT;
}
virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id,
ObSvrs& svrs, int& leader_cnt)
{
// TODO: in this version a single result is enough; log_id is not used.
UNUSED(log_id);
UNUSED(leader_cnt);
for (int64_t idx = 0, cnt = entry_vec_.size(); idx < cnt; ++idx) {
const Entry &entry = entry_vec_[idx];
if (pkey == entry.pkey_) {
svrs.push_back(entry.svr_);
break;
}
}
return OB_SUCCESS;
}
virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
{
UNUSED(feedback);
return OB_SUCCESS;
}
private:
static EntryVec entry_vec_;
};
MockRpcInterface3::EntryVec MockRpcInterface3::entry_vec_;
/*
* Factory.
*/
class MockRpcInterface3Factory : public IFetcherRpcInterfaceFactory
{
public:
virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
{
rpc = new MockRpcInterface3();
return OB_SUCCESS;
}
virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
{
delete rpc;
return OB_SUCCESS;
}
};
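/*
* Illustrative usage sketch (assumption): heartbeat results are preset as
* <svr, pkey, next log id, timestamp> tuples before running the test; the
* values below are arbitrary examples and pkey is assumed to be defined.
*
*   MockRpcInterface3::clear_result();
*   ObAddr svr(ObAddr::IPV4, "127.0.0.1", 6001);
*   MockRpcInterface3::add_result(svr, pkey, 100, get_timestamp());
*/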
/*
* TransLog Generator 1.
* Generate single partition transaction logs.
* Supports getting trans logs in the CORRECT order.
* Usage:
* - Call next_trans(), specify trans params.
* - Get logs in correct order: redo, redo, ..., prepare, commit/abort.
*/
struct TransParam1
{
// Params used in trans log.
ObPartitionKey pkey_;
ObTransID trans_id_;
ObAddr scheduler_;
ObPartitionKey coordinator_;
ObPartitionArray participants_;
ObStartTransParam trans_param_;
};
class TransLogGenerator1
{
public:
TransLogGenerator1()
: param_(),
redo_(),
prepare_(),
commit_(),
abort_()
{ }
virtual ~TransLogGenerator1() { }
public:
void next_trans(const TransParam1 &param)
{
param_ = param;
}
const ObTransRedoLog& next_redo(const uint64_t log_id)
{
int err = OB_SUCCESS;
uint64_t tenant_id = 100;
const uint64_t cluster_id = 1000;
redo_.reset();
ObVersion active_memstore_version(1);
err = redo_.init(OB_LOG_TRANS_REDO, param_.pkey_, param_.trans_id_,
tenant_id, log_id, param_.scheduler_, param_.coordinator_,
param_.participants_, param_.trans_param_, cluster_id, active_memstore_version);
EXPECT_EQ(OB_SUCCESS, err);
ObTransMutator &mutator = redo_.get_mutator();
if (NULL == mutator.get_mutator_buf()) {
mutator.init(true);
}
const char *data = "fly";
// Allocate one extra byte for the terminating '\0' written by strcpy.
char *buf = static_cast<char*>(mutator.alloc(strlen(data) + 1));
EXPECT_TRUE(NULL != buf);
strcpy(buf, data);
return redo_;
}
const ObTransPrepareLog& next_prepare(const ObRedoLogIdArray &all_redos)
{
int err = OB_SUCCESS;
uint64_t tenant_id = 100;
const uint64_t cluster_id = 1000;
prepare_.reset();
ObVersion active_memstore_version(1);
err = prepare_.init(OB_LOG_TRANS_PREPARE, param_.pkey_, param_.trans_id_,
tenant_id, param_.scheduler_, param_.coordinator_,
param_.participants_, param_.trans_param_,
OB_SUCCESS, all_redos, 0, cluster_id, active_memstore_version);
EXPECT_EQ(OB_SUCCESS, err);
return prepare_;
}
const ObTransCommitLog& next_commit(const uint64_t prepare_log_id)
{
int err = OB_SUCCESS;
const uint64_t cluster_id = 1000;
ObPartitionLogInfo ptl_id(param_.pkey_, prepare_log_id, get_timestamp());
PartitionLogInfoArray ptl_ids;
if (OB_INVALID_ID == prepare_log_id) {
// Pass. For prepare-commit trans log.
}
else {
err = ptl_ids.push_back(ptl_id);
EXPECT_EQ(OB_SUCCESS, err);
}
commit_.reset();
err = commit_.init(OB_LOG_TRANS_COMMIT, param_.pkey_, param_.trans_id_,
ptl_ids, 1, 0, cluster_id);
EXPECT_EQ(OB_SUCCESS, err);
return commit_;
}
const ObTransAbortLog& next_abort()
{
int err = OB_SUCCESS;
const uint64_t cluster_id = 1000;
PartitionLogInfoArray array;
abort_.reset();
err = abort_.init(OB_LOG_TRANS_ABORT, param_.pkey_, param_.trans_id_, array, cluster_id);
EXPECT_EQ(OB_SUCCESS, err);
return abort_;
}
private:
TransParam1 param_;
ObTransRedoLog redo_;
ObTransPrepareLog prepare_;
ObTransCommitLog commit_;
ObTransAbortLog abort_;
};
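/*
* Illustrative usage sketch (assumption): generating one committed
* transaction with a single redo log. The log ids are arbitrary examples,
* and param is assumed to be a fully filled TransParam1 (see
* TransLogEntryGenerator1's constructor below for how its fields are set).
*
*   TransLogGenerator1 gen;
*   gen.next_trans(param);
*   const ObTransRedoLog &redo = gen.next_redo(1);      // use before next call
*   ObRedoLogIdArray redos;
*   redos.push_back(1);
*   const ObTransPrepareLog &prepare = gen.next_prepare(redos);
*   const ObTransCommitLog &commit = gen.next_commit(2); // prepare log id = 2
*
* Note: the returned references point at members that are reused by later calls.
*/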
/*
* Transaction Log Entry Generator 1.
* Generate log entries of transactions.
*/
class TransLogEntryGenerator1
{
public:
TransLogEntryGenerator1(const ObPartitionKey &pkey)
: pkey_(pkey),
log_id_(0),
remain_log_cnt_(0),
commit_(false),
param_(),
trans_log_gen_(),
prepare_id_(0),
redos_(),
data_len_(0)
{
ObAddr addr(ObAddr::IPV4, "127.0.0.1", 5566);
param_.pkey_ = pkey_;
param_.trans_id_ = ObTransID(addr);
param_.scheduler_ = addr;
param_.coordinator_ = pkey_;
int err = param_.participants_.push_back(pkey_);
EXPECT_EQ(OB_SUCCESS, err);
param_.trans_param_.set_access_mode(ObTransAccessMode::READ_WRITE);
param_.trans_param_.set_isolation(ObTransIsolation::READ_COMMITED);
param_.trans_param_.set_type(ObTransType::TRANS_NORMAL);
buf_ = new char[buf_len_];
EXPECT_TRUE(NULL != buf_);
}
virtual ~TransLogEntryGenerator1()
{
delete[] buf_;
}
// Generate normal trans.
// Start a new trans.
void next_trans(const int64_t redo_cnt, bool commit)
{
remain_log_cnt_ = 2 + redo_cnt;
commit_ = commit;
redos_.reset();
trans_log_gen_.next_trans(param_);
}
// Get next log entry.
int next_log_entry(ObLogEntry &log_entry)
{
int ret = OB_SUCCESS;
if (2 < remain_log_cnt_) {
next_redo_(log_entry);
// Store redo id.
int err = redos_.push_back(log_id_);
EXPECT_EQ(OB_SUCCESS, err);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else if (2 == remain_log_cnt_) {
next_prepare_(log_entry);
prepare_id_ = log_id_;
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else if (1 == remain_log_cnt_ && commit_) {
next_commit_(log_entry);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else if (1 == remain_log_cnt_ && !commit_) {
next_abort_(log_entry);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else {
ret = OB_ITER_END;
}
return ret;
}
// Generate: redo, redo, redo, prepare-commit.
int next_log_entry_2(ObLogEntry &log_entry)
{
int ret = OB_SUCCESS;
if (2 < remain_log_cnt_) {
next_redo_(log_entry);
// Store redo id.
int err = redos_.push_back(log_id_);
EXPECT_EQ(OB_SUCCESS, err);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else if (2 == remain_log_cnt_ && commit_) {
next_prepare_with_commit(log_entry);
log_id_ += 1;
remain_log_cnt_ -= 2;
}
else if (2 == remain_log_cnt_ && !commit_) {
next_prepare_(log_entry);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else if (1 == remain_log_cnt_ && !commit_) {
next_abort_(log_entry);
log_id_ += 1;
remain_log_cnt_ -= 1;
}
else {
ret = OB_ITER_END;
}
return ret;
}
private:
void next_redo_(ObLogEntry &log_entry)
{
int err = OB_SUCCESS;
// Gen trans log.
const ObTransRedoLog &redo = trans_log_gen_.next_redo(log_id_);
int64_t pos = 0;
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_REDO);
EXPECT_EQ(OB_SUCCESS, err);
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
EXPECT_EQ(OB_SUCCESS, err);
err = redo.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
data_len_ = pos;
// Gen entry header.
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
data_len_, get_timestamp(), get_timestamp(),
ObProposalID(), get_timestamp(), ObVersion(0));
// Gen log entry.
log_entry.generate_entry(header, buf_);
}
void next_prepare_(ObLogEntry &log_entry)
{
int err = OB_SUCCESS;
// Gen trans log.
const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
int64_t pos = 0;
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_PREPARE);
EXPECT_EQ(OB_SUCCESS, err);
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
EXPECT_EQ(OB_SUCCESS, err);
err = prepare.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
data_len_ = pos;
// Gen entry header.
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
data_len_, get_timestamp(), get_timestamp(),
ObProposalID(), get_timestamp(), ObVersion(0));
// Gen log entry.
log_entry.generate_entry(header, buf_);
}
void next_commit_(ObLogEntry &log_entry)
{
int err = OB_SUCCESS;
// Gen trans log.
const ObTransCommitLog &commit = trans_log_gen_.next_commit(prepare_id_);
int64_t pos = 0;
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_COMMIT);
EXPECT_EQ(OB_SUCCESS, err);
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
EXPECT_EQ(OB_SUCCESS, err);
err = commit.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
data_len_ = pos;
// Gen entry header.
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
data_len_, get_timestamp(), get_timestamp(),
ObProposalID(), get_timestamp(), ObVersion(0));
// Gen log entry.
log_entry.generate_entry(header, buf_);
}
void next_abort_(ObLogEntry &log_entry)
{
int err = OB_SUCCESS;
// Gen trans log.
const ObTransAbortLog &abort = trans_log_gen_.next_abort();
int64_t pos = 0;
err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_ABORT);
EXPECT_EQ(OB_SUCCESS, err);
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
EXPECT_EQ(OB_SUCCESS, err);
err = abort.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
data_len_ = pos;
// Gen entry header.
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
data_len_, get_timestamp(), get_timestamp(),
ObProposalID(), get_timestamp(), ObVersion(0));
// Gen log entry.
log_entry.generate_entry(header, buf_);
}
void next_prepare_with_commit(ObLogEntry &log_entry)
{
int err = OB_SUCCESS;
// Gen trans log.
const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
const ObTransCommitLog &commit = trans_log_gen_.next_commit(OB_INVALID_ID);
int64_t pos = 0;
err = serialization::encode_i64(buf_, buf_len_,
pos, OB_LOG_TRANS_PREPARE_WITH_COMMIT);
EXPECT_EQ(OB_SUCCESS, err);
err = serialization::encode_i64(buf_, buf_len_, pos, 0);
EXPECT_EQ(OB_SUCCESS, err);
err = prepare.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
err = commit.serialize(buf_, buf_len_, pos);
EXPECT_EQ(OB_SUCCESS, err);
data_len_ = pos;
// Gen entry header.
ObLogEntryHeader header;
header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
data_len_, get_timestamp(), get_timestamp(),
ObProposalID(), get_timestamp(), ObVersion(0));
// Gen log entry.
log_entry.generate_entry(header, buf_);
}
private:
// Params.
ObPartitionKey pkey_;
uint64_t log_id_;
int64_t remain_log_cnt_;
bool commit_;
// Gen.
TransParam1 param_;
TransLogGenerator1 trans_log_gen_;
uint64_t prepare_id_;
ObRedoLogIdArray redos_;
// Buf.
int64_t data_len_;
static const int64_t buf_len_ = 2 * _M_;
char *buf_;
};
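/*
* Illustrative usage sketch (assumption): draining all log entries of one
* generated transaction. The redo count is an arbitrary example and pkey is
* assumed to be an already-initialized partition key.
*
*   TransLogEntryGenerator1 gen(pkey);
*   gen.next_trans(3, true);   // 3 redo logs, then prepare + commit
*   ObLogEntry entry;
*   while (OB_SUCCESS == gen.next_log_entry(entry)) {
*     // hand entry to the code under test
*   }
*/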
/*
* Mock Parser 1.
* Reads each Task, reverts it immediately, and counts the number of Tasks.
*/
class MockParser1 : public IObLogParser
{
public:
MockParser1() : trans_cnt_(0) { }
virtual ~MockParser1() { }
virtual int start() { return OB_SUCCESS; }
virtual void stop() { }
virtual void mark_stop_flag() { }
virtual int push(PartTransTask* task, const int64_t timeout)
{
UNUSED(timeout);
if (NULL != task && task->is_normal_trans()) {
task->revert();
trans_cnt_ += 1;
// Debug.
// _I_(">>> push parser", "req", task->get_seq());
}
return OB_SUCCESS;
}
int64_t get_trans_cnt() const { return trans_cnt_; }
private:
int64_t trans_cnt_;
};
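/*
* Illustrative usage sketch (assumption): a test usually plugs MockParser1
* into the component under test and then polls get_trans_cnt() until the
* expected number of transactions has been observed.
*
*   MockParser1 parser;
*   // ... wire parser into the fetcher under test ...
*   while (parser.get_trans_cnt() < expected_trans_cnt) {
*     usleep(100 * 1000);
*   }
*/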
/*
* Mock Fetcher Error Handler.
*/
class MockFetcherErrHandler1 : public IErrHandler
{
public:
virtual ~MockFetcherErrHandler1() { }
public:
virtual void handle_err(int err_no, const char* fmt, ...)
{
UNUSED(err_no);
va_list ap;
va_start(ap, fmt);
__E__(fmt, ap);
va_end(ap);
abort();
}
};
/*
* Mock Liboblog Error Handler.
*/
class MockLiboblogErrHandler1 : public IObLogErrHandler
{
public:
virtual void handle_error(int err_no, const char* fmt, ...)
{
UNUSED(err_no);
va_list ap;
va_start(ap, fmt);
__E__(fmt, ap);
va_end(ap);
}
};
/*
* Mock SvrProvider.
* The user adds servers into it.
*/
class MockSvrProvider1 : public sqlclient::ObMySQLServerProvider
{
public:
virtual ~MockSvrProvider1() { }
void add_svr(const ObAddr &svr) { svrs_.push_back(svr); }
public:
virtual int get_cluster_list(common::ObIArray<int64_t> &cluster_list)
{
int ret = OB_SUCCESS;
if (svrs_.size() > 0) {
if (OB_FAIL(cluster_list.push_back(common::OB_INVALID_ID))) {
LOG_WARN("fail to push back cluster_id", K(ret));
}
}
return ret;
}
virtual int get_server(const int64_t cluster_id, const int64_t svr_idx, ObAddr& server)
{
UNUSED(cluster_id);
int ret = OB_SUCCESS;
if (0 <= svr_idx && svr_idx < static_cast<int64_t>(svrs_.size())) {
server = svrs_[svr_idx];
}
else {
ret = OB_ERR_UNEXPECTED;
}
return ret;
}
virtual int64_t get_cluster_count() const
{
return svrs_.size() > 0 ? 1 : 0;
}
virtual int64_t get_server_count(const int64_t cluster_id) const
{
UNUSED(cluster_id);
return static_cast<int64_t>(svrs_.size());
}
virtual int refresh_server_list() { return OB_SUCCESS; }
private:
std::vector<ObAddr> svrs_;
};
/*
* Test Dataset Generator.
*/
/*
* Svr Config.
* Holds the server address and MySQL connection settings.
*/
struct SvrCfg
{
// Svr.
const char *svr_addr_;
int internal_port_;
// Mysql.
int mysql_port_;
const char *mysql_user_;
const char *mysql_password_;
const char *mysql_db_;
int64_t mysql_timeout_;
};
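/*
* Illustrative example values (assumptions; adjust to the local deployment,
* and the timeout unit follows whatever ObLogMySQLConnector expects):
*
*   SvrCfg svr_cfg;
*   svr_cfg.svr_addr_ = "127.0.0.1";
*   svr_cfg.internal_port_ = 2882;
*   svr_cfg.mysql_port_ = 2881;
*   svr_cfg.mysql_user_ = "root";
*   svr_cfg.mysql_password_ = "";
*   svr_cfg.mysql_db_ = "oceanbase";
*   svr_cfg.mysql_timeout_ = 10L * 1000L * 1000L;
*/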
/*
* Configuration for mysql connector.
*/
inline ConnectorConfig prepare_cfg_1(const SvrCfg &svr_cfg)
{
ConnectorConfig cfg;
cfg.mysql_addr_ = svr_cfg.svr_addr_;
cfg.mysql_port_ = svr_cfg.mysql_port_;
cfg.mysql_user_ = svr_cfg.mysql_user_;
cfg.mysql_password_ = svr_cfg.mysql_password_;
cfg.mysql_db_ = svr_cfg.mysql_db_;
cfg.mysql_timeout_ = svr_cfg.mysql_timeout_;
return cfg;
}
/*
* Build table names.
*/
inline const char** prepare_table_name_1()
{
static const char* tnames[] = {
"table1",
"table2",
"table3",
"table4",
"table5",
"table6",
"table7",
"table8",
"table9",
"table10",
"table11",
"table12",
"table13",
"table14",
"table15",
"table16"
};
return tnames;
}
/*
* Build table schema.
*/
inline const char* prepare_table_schema_1()
{
return "c1 int primary key";
}
/*
* Create table.
*/
class CreateTable : public MySQLQueryBase
{
public:
CreateTable(const char *tname, const char *schema)
{
snprintf(buf_, 512, "create table %s(%s)", tname, schema);
sql_ = buf_;
sql_len_ = strlen(sql_);
}
private:
char buf_[512];
};
/*
* Drop table.
*/
class DropTable : public MySQLQueryBase
{
public:
DropTable(const char *tname)
{
snprintf(buf_, 512, "drop table if exists %s", tname);
sql_ = buf_;
sql_len_ = strlen(sql_);
}
private:
char buf_[512];
};
/*
* Get table id.
*/
class GetTableId : public MySQLQueryBase
{
public:
GetTableId(const char *tname)
{
snprintf(buf_, 512, "select table_id "
"from __all_table where table_name='%s'", tname);
sql_ = buf_;
sql_len_ = strlen(sql_);
}
int get_tid(uint64_t &tid)
{
int ret = common::OB_SUCCESS;
while (common::OB_SUCCESS == (ret = next_row())) {
uint64_t table_id = 0;
if (OB_SUCC(ret)) {
if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) {
OBLOG_LOG(WARN, "err get uint", K(ret));
}
}
tid = table_id;
}
ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret;
return ret;
}
private:
char buf_[512];
};
/*
* Get partition keys by table id from the system table.
*/
class GetPartitionKey : public MySQLQueryBase
{
public:
GetPartitionKey(const uint64_t tid)
{
snprintf(buf_, 512, "select table_id, partition_id, partition_cnt "
"from __all_meta_table where table_id=%lu", tid);
sql_ = buf_;
sql_len_ = strlen(sql_);
}
int get_pkeys(ObArray<ObPartitionKey> &pkeys)
{
int ret = common::OB_SUCCESS;
while (common::OB_SUCCESS == (ret = next_row())) {
uint64_t table_id = 0;
int32_t partition_id = 0;
int32_t partition_cnt = 0;
if (OB_SUCC(ret)) {
if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) {
OBLOG_LOG(WARN, "err get uint", K(ret));
}
}
if (OB_SUCC(ret)) {
int64_t val = 0;
if (common::OB_SUCCESS != (ret = get_int(1, val))) {
OBLOG_LOG(WARN, "err get int", K(ret));
} else {
partition_id = static_cast<int32_t>(val);
}
}
if (OB_SUCC(ret)) {
int64_t val = 0;
if (common::OB_SUCCESS != (ret = get_int(2, val))) {
OBLOG_LOG(WARN, "err get int", K(ret));
} else {
partition_cnt = static_cast<int32_t>(val);
}
}
ObPartitionKey pkey;
pkey.init(table_id, partition_id, partition_cnt);
pkeys.push_back(pkey);
}
ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret;
return ret;
}
private:
char buf_[512];
};
/*
* Create tables and return their partition keys.
*/
inline void prepare_table_1(const SvrCfg& svr_cfg,
const char** tnames,
const int64_t tcnt,
const char* schema,
ObArray<ObPartitionKey>& pkeys)
{
ObLogMySQLConnector conn;
ConnectorConfig cfg = prepare_cfg_1(svr_cfg);
int ret = conn.init(cfg);
EXPECT_EQ(OB_SUCCESS, ret);
// Prepare tables.
for (int64_t idx = 0; idx < tcnt; ++idx) {
// Drop.
DropTable drop_table(tnames[idx]);
ret = conn.exec(drop_table);
EXPECT_EQ(OB_SUCCESS, ret);
// Create.
CreateTable create_table(tnames[idx], schema);
ret = conn.exec(create_table);
EXPECT_EQ(OB_SUCCESS, ret);
// Get tid.
GetTableId get_tid(tnames[idx]);
ret = conn.query(get_tid);
EXPECT_EQ(OB_SUCCESS, ret);
uint64_t tid = OB_INVALID_ID;
ret = get_tid.get_tid(tid);
EXPECT_EQ(OB_SUCCESS, ret);
// Get pkeys.
GetPartitionKey get_pkey(tid);
ret = conn.query(get_pkey);
EXPECT_EQ(OB_SUCCESS, ret);
ret = get_pkey.get_pkeys(pkeys);
EXPECT_EQ(OB_SUCCESS, ret);
}
ret = conn.destroy();
EXPECT_EQ(OB_SUCCESS, ret);
}
/*
* Data generator.
* Inserts a batch of rows into the server.
* For schema 1.
*/
class DataGenerator1 : public Runnable
{
class Inserter : public MySQLQueryBase
{
public:
void set_data(const char *tname, const int64_t data)
{
reuse();
snprintf(buf_, 512, "insert into %s (c1) values (%ld)", tname, data);
sql_ = buf_;
sql_len_ = strlen(sql_);
}
private:
char buf_[512];
};
public:
DataGenerator1(const ConnectorConfig &cfg) :
conn_(), cfg_(cfg), tname_(NULL), start_(0), end_(0)
{
int err = conn_.init(cfg_);
EXPECT_EQ(OB_SUCCESS, err);
}
~DataGenerator1()
{
conn_.destroy();
}
void insert(const char *tname, const int64_t start, const int64_t end)
{
tname_ = tname;
start_ = start;
end_ = end;
create();
}
private:
int routine()
{
Inserter inserter;
int err = OB_SUCCESS;
for (int64_t cur = start_;
OB_SUCCESS == err && cur < end_;
cur++) {
inserter.set_data(tname_, cur);
err = conn_.exec(inserter);
EXPECT_EQ(OB_SUCCESS, err);
}
return OB_SUCCESS;
}
private:
ObLogMySQLConnector conn_;
ConnectorConfig cfg_;
const char *tname_;
int64_t start_;
int64_t end_;
};
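/*
* Illustrative usage sketch (assumption): insert rows [0, 100) into table1
* from a background thread. How the caller waits for the generator to finish
* depends on the Runnable base class and is omitted here.
*
*   ConnectorConfig cfg = prepare_cfg_1(svr_cfg);
*   DataGenerator1 data_gen(cfg);
*   data_gen.insert("table1", 0, 100);
*/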
/*
* End of Test Dataset Generator.
*/
}
}