oceanbase/unittest/libobcdc/log_generator.h
wangzelin.wzl 93a1074b0c patch 4.0
2022-10-24 17:57:12 +08:00

320 lines
9.7 KiB
C++

#include <gtest/gtest.h>
#define USING_LOG_PREFIX OBLOG
#define private public
#include "logservice/palf/log_entry.h"
#include "storage/tx/ob_tx_log.h"
#undef private
#include "storage/memtable/ob_memtable_mutator.h"
#include "storage/memtable/ob_memtable_context.h"
#include "storage/tx/ob_clog_encrypt_info.h"
#include "logservice/libobcdc/src/ob_cdc_define.h"
#include "logservice/libobcdc/src/ob_log_utils.h"
#include "logservice/libobcdc/src/ob_small_arena.h"
#include "logservice/ob_log_base_header.h"
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_entry_header.h"
#include "storage/memtable/utils_rowkey_builder.h"
namespace oceanbase
{
using namespace palf;
using namespace memtable;
using namespace transaction;
using namespace libobcdc;
namespace unittest
{
// Helper that owns one ObTxLogBlock for a single transaction and
// re-initializes it each time a new log entry is started.
class ObTxLogBlockBuilder
{
public:
  // Only records the ids; tx_log_block_ is not initialized until
  // next_log_block() is called.
  ObTxLogBlockBuilder(const int64_t tx_id, const uint64_t cluster_id)
    : tx_id_(tx_id),
      cluster_id_(cluster_id),
      log_entry_no_(0),
      tx_log_block_()
  {}
  ~ObTxLogBlockBuilder() {}
public:
  // Reset tx_log_block_ and init it with a new block header.
  int next_log_block();
  // Append a redo log (with a small fixed mutator payload) to the current block.
  int fill_redo(ObTxRedoLog &redo_log);
  // Append any tx log body that does not need a mutator buffer.
  // Returns whatever ObTxLogBlock::add_new_log() returns.
  template <typename T>
  int fill_tx_log_except_redo(T &tx_log_body)
  {
    return tx_log_block_.add_new_log(tx_log_body);
  }
  // Access the block that is currently being filled.
  ObTxLogBlock &build() { return tx_log_block_; }
  // Value of the entry counter; it is incremented by each successful next_log_block().
  int64_t get_log_entry_no() const { return log_entry_no_; }
private:
  TxID tx_id_;
  // Same type as the constructor parameter (was the non-standard alias `uint64`).
  uint64_t cluster_id_;
  int64_t log_entry_no_;
  ObTxLogBlock tx_log_block_;
};
// Usage example: generate a LogEntry that contains redo + commit_info logs.
//   LogEntry log_entry;
//   palf::LSN lsn;
//   ObTxLogGenerator log_generator(tenant_id, ls_id, tx_id, cluster_id);
//   log_generator.gen_redo_log();
//   log_generator.gen_commit_info_log();
//   log_generator.gen_log_entry(log_entry, lsn);
class ObTxLogGenerator
{
public:
ObTxLogGenerator(const uint64_t tenant_id, const int64_t ls_id, const int64_t tx_id, const uint64_t cluster_id)
: tls_id_(tenant_id, share::ObLSID(ls_id)),
block_builder_(tx_id, cluster_id),
lsn_arr_(),
last_record_lsn_(),
trans_type_(transaction::TransType::SP_TRANS),
addr_(),
cluster_version_(1),
trace_id_str_("obcdc_unittest_trace"),
can_elr_(false),
is_dup_(false),
is_sub2pc_(false),
epoch_(1024),
last_op_scn_(2048),
checksum_(29890209),
allocator_()
{
addr_.set_ip_addr("127.0.0.1", 2881);
block_builder_.next_log_block();
}
~ObTxLogGenerator() { reset(); }
void reset() { allocator_.reset(); }
public:
static const int64_t ENTRY_BUF_SIZE = 1 << 21;
public:
int gen_log_entry(LogEntry &log_entry, LSN &lsn)
{
int ret = OB_SUCCESS;
log_entry.reset();
lsn = last_lsn_();
ObTxLogBlock &tx_log_block = block_builder_.build();
LogEntryHeader entry_header;
char *tx_buf = tx_log_block.get_buf();
int64_t buf_len = tx_log_block.get_size();
char *buf = static_cast<char*>(allocator_.alloc(buf_len));
MEMCPY(buf, tx_buf, buf_len);
LogType log_type = palf::LOG_PADDING;
const bool is_padding_log = (palf::LOG_PADDING == log_type);
int64_t log_entry_header_size = entry_header.get_serialize_size();
int64_t ts = get_timestamp();
if (OB_FAIL(entry_header.generate_header(buf, buf_len, ts))) {
LOG_ERROR("generate_header failed", KR(ret), K(buf), K(buf_len), K(ts));
} else {
log_entry.header_ = entry_header;
log_entry.buf_ = buf;
lsn.val_ += (log_entry.get_serialize_size());
if (OB_UNLIKELY(! log_entry.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("log_entry is not valid", KR(ret));
} else if (OB_UNLIKELY(! lsn.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("lsn is not valid", KR(ret));
} else if (OB_FAIL(block_builder_.next_log_block())) {
LOG_ERROR("next_log_block failed", "log_entry_no", block_builder_.get_log_entry_no());
} else if (OB_FAIL(lsn_arr_.push_back(lsn))) {
LOG_ERROR("push_back lsn to lsn_arr failed", KR(ret), K(lsn));
}
}
return ret;
}
int gen_ls_offline_log_entry(LogEntry &log_entry, LSN &lsn)
{
int ret = OB_SUCCESS;
int64_t pos = 0;
lsn = last_lsn_();
LogEntryHeader entry_header;
const int64_t ts = get_timestamp();
logservice::ObLogBaseHeader log_base_header(logservice::ObLogBaseType::GC_LS_LOG_BASE_TYPE, logservice::ObReplayBarrierType::NO_NEED_BARRIER, 1);
const int64_t serizlize_size = log_base_header.get_serialize_size();
char *buf = static_cast<char*>(allocator_.alloc(serizlize_size));
if (OB_FAIL(log_base_header.serialize(buf, serizlize_size, pos))) {
LOG_ERROR("serialize log_base_header failed", KR(ret), K(buf), K(serizlize_size), K(pos));
} else if (OB_FAIL(entry_header.generate_header(buf, serizlize_size, ts))) {
LOG_ERROR("generate_header for offline log_entry failed", KR(ret));
} else {
log_entry.buf_ = buf;
log_entry.header_ = entry_header;
lsn.val_ += log_entry.get_serialize_size();
if (OB_UNLIKELY(! log_entry.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("offline log_entry is not valid", KR(ret));
}
}
return ret;
}
void gen_redo_log();
void gen_rollback_to_log();
void gen_mds_log();
void gen_record_log();
void gen_commit_info_log();
void gen_prepare_log();
void gen_commit_log();
private:
// lsn of last log_entry, may be invalid if haven't generate any log_entry.
palf::LSN last_lsn_();
private:
TenantLSID tls_id_;
ObTxLogBlockBuilder block_builder_;
ObLogLSNArray lsn_arr_;
LSN last_record_lsn_;
int32_t trans_type_;
ObAddr addr_;
int64_t cluster_version_;
common::ObString trace_id_str_;
bool can_elr_;
bool is_dup_;
bool is_sub2pc_;
int64_t epoch_;
int64_t last_op_scn_;
int64_t checksum_;
common::ObArenaAllocator allocator_;
};
// Start a fresh tx log block for the next log entry.
//
// Builds a block header from cluster_id_, the current log_entry_no_ and tx_id_,
// resets tx_log_block_ and re-initializes it with that header. log_entry_no_ is
// incremented only when the init succeeds.
//
// Declared inline because this definition lives in a header: without it,
// including the header from more than one translation unit yields duplicate
// definitions at link time.
//
// @return OB_SUCCESS, or the error code returned by ObTxLogBlock::init().
inline int ObTxLogBlockBuilder::next_log_block()
{
  int ret = OB_SUCCESS;
  ObTxLogBlockHeader block_header(cluster_id_, log_entry_no_, tx_id_);
  tx_log_block_.reset();
  if (OB_FAIL(tx_log_block_.init(tx_id_, block_header))) {
    LOG_ERROR("init tx_log_block_ failed", KR(ret), K_(tx_id), K(block_header));
  } else {
    log_entry_no_ ++;
  }
  return ret;
}
// Append `redo_log` to the current tx log block with a small fixed payload.
//
// Steps: reserve the mutator buffer in the block, write the literal row buffer
// "obcdc_test" through an ObMutatorWriter pointed at that buffer, serialize the
// writer with the NORMAL_ROW flag, then finish the mutator buffer with the
// length reported by serialize().
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
//
// @param [in,out] redo_log  redo log whose mutator buffer is prepared and filled.
// @return OB_SUCCESS, or the error code of the first failing step.
inline int ObTxLogBlockBuilder::fill_redo(ObTxRedoLog &redo_log)
{
  int ret = OB_SUCCESS;
  uint8_t tmp_flag = memtable::ObTransRowFlag::NORMAL_ROW;
  int64_t mutator_pos = 0;
  common::ObString row_str("obcdc_test");
  if (OB_FAIL(tx_log_block_.prepare_mutator_buf(redo_log))) {
    LOG_ERROR("gen_mutator_buf for tx_log_block_ failed", KR(ret), K_(tx_log_block), K(redo_log));
  } else {
    ObMutatorWriter mmw;
    // The writer serializes directly into the buffer reserved inside the block.
    mmw.set_buffer(redo_log.get_mutator_buf(), redo_log.get_mutator_size());
    if (OB_FAIL(mmw.append_row_buf(row_str.ptr(), row_str.length()))) {
      LOG_ERROR("append_row failed", KR(ret));
    } else if (OB_FAIL(mmw.serialize(tmp_flag, mutator_pos))) {
      LOG_ERROR("serialize memtable_mutator failed", KR(ret));
    } else if (OB_FAIL(tx_log_block_.finish_mutator_buf(redo_log, mutator_pos))) {
      LOG_ERROR("finish_mutator_buf failed", KR(ret), K_(tx_log_block), K(redo_log));
    }
  }
  return ret;
}
// Append one redo log to the current tx log block.
// The redo log is built on a temporary ObTxRedoLogTempRef and filled by
// ObTxLogBlockBuilder::fill_redo(); a failure is reported through EXPECT_EQ.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_redo_log()
{
  ObTxRedoLogTempRef redo_log_ref;
  ObTxRedoLog redo_log(redo_log_ref);
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_redo(redo_log));
}
// Append one rollback-to log to the current tx log block; a failure is
// reported through EXPECT_EQ.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_rollback_to_log()
{
  // NOTE(review): the constructor arguments presumably mean from = 2, to = 1
  // (the previous revision carried commented-out `from_ = 2; to_ = 1;`
  // assignments) -- confirm against ObTxRollbackToLog.
  ObTxRollbackToLog rollback_to_log(2, 1);
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(rollback_to_log));
}
// Placeholder: no log is generated yet; calling this has no effect.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_mds_log()
{
}
// Append one record log to the current tx log block.
//
// The record log references every LSN collected in lsn_arr_ so far (and
// last_record_lsn_ as the previous record LSN when that is valid); lsn_arr_ is
// cleared afterwards. Failures are reported through EXPECT_EQ.
// NOTE(review): nothing in this file assigns last_record_lsn_, so the
// prev_record_lsn_ branch looks unreachable from here -- confirm whether that
// is intended.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_record_log()
{
  ObTxRecordLogTempRef record_log_ref;
  if (last_record_lsn_.is_valid()) {
    record_log_ref.prev_record_lsn_ = last_record_lsn_;
  }
  // int64_t index: matches the type lsn_arr_.count() is stored in elsewhere in this file.
  const int64_t lsn_cnt = lsn_arr_.count();
  for (int64_t i = 0; i < lsn_cnt; i++) {
    EXPECT_EQ(OB_SUCCESS, record_log_ref.redo_lsns_.push_back(lsn_arr_.at(i)));
  }
  ObTxRecordLog record_log(record_log_ref);
  LOG_DEBUG("gen record", K(record_log));
  // The LSNs were copied into record_log_ref above, so the array can be cleared
  // before the log is appended.
  lsn_arr_.reset();
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(record_log));
}
// Append one commit-info log to the current tx log block.
//
// The log references every LSN currently held in lsn_arr_; unlike
// gen_record_log(), lsn_arr_ is left untouched. All other fields keep the
// defaults of ObTxCommitInfoLogTempRef. Failures are reported through EXPECT_EQ.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_commit_info_log()
{
  ObTxCommitInfoLogTempRef commit_info_log_ref;
  // int64_t index: matches the type lsn_arr_.count() is stored in elsewhere in this file.
  const int64_t lsn_cnt = lsn_arr_.count();
  for (int64_t i = 0; i < lsn_cnt; i++) {
    EXPECT_EQ(OB_SUCCESS, commit_info_log_ref.redo_lsns_.push_back(lsn_arr_.at(i)));
  }
  ObTxCommitInfoLog commit_info_log(commit_info_log_ref);
  LOG_DEBUG("gen commit_info_log", K(commit_info_log));
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(commit_info_log));
}
// Append one prepare log to the current tx log block and mark the transaction
// as distributed.
//
// The prepare log is built with an empty LS array and last_lsn_() as the
// previous LSN. trans_type_ is switched to DIST_TRANS afterwards, which makes a
// later gen_commit_log() fill in LS log info. A fill failure is reported
// through EXPECT_EQ.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_prepare_log()
{
  share::ObLSArray inc_ls_arr;
  transaction::LogOffSet prev_lsn = last_lsn_();
  ObTxPrepareLog prepare_log(inc_ls_arr, prev_lsn);
  LOG_DEBUG("gen prepare_log", K(prepare_log));
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(prepare_log));
  trans_type_ = transaction::TransType::DIST_TRANS; // dist trans.
}
// Append one commit log to the current tx log block.
//
// The commit log uses the current timestamp as commit ts, a checksum of 0,
// empty LS and MDS arrays, the current trans_type_ and last_lsn_() as the
// previous LSN. When the transaction is DIST_TRANS (i.e. gen_prepare_log() was
// called before), two fixed participants are added to the LS log info array:
// LS 1 at LSN(190) and LS 1001 at last_lsn_(). Failures are reported through
// EXPECT_EQ.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline void ObTxLogGenerator::gen_commit_log()
{
  int64_t commit_ts = get_timestamp();
  uint64_t checksum = 0;
  share::ObLSArray inc_ls_arr;
  ObTxBufferNodeArray mds_arr;
  transaction::ObLSLogInfoArray ls_info_arr;
  if (transaction::TransType::DIST_TRANS == trans_type_) {
    ObLSLogInfo ls_info1(share::ObLSID(1), LSN(190));
    ObLSLogInfo ls_info2(share::ObLSID(1001), last_lsn_());
    EXPECT_EQ(OB_SUCCESS, ls_info_arr.push_back(ls_info1));
    EXPECT_EQ(OB_SUCCESS, ls_info_arr.push_back(ls_info2));
  }
  ObTxCommitLog commit_log(
      commit_ts,
      checksum,
      inc_ls_arr,
      mds_arr,
      trans_type_,
      last_lsn_(),
      ls_info_arr);
  LOG_DEBUG("gen commit_log", K(commit_log));
  EXPECT_EQ(OB_SUCCESS, block_builder_.fill_tx_log_except_redo(commit_log));
}
// Return the most recently recorded LSN in lsn_arr_, or a default-constructed
// LSN when lsn_arr_ is empty.
//
// Declared inline because this definition lives in a header (avoids duplicate
// definitions when the header is included from several translation units).
inline palf::LSN ObTxLogGenerator::last_lsn_()
{
  palf::LSN lsn;
  const int64_t cnt = lsn_arr_.count();
  if (cnt > 0) {
    lsn = lsn_arr_.at(cnt - 1);
  }
  return lsn;
}
} // namespace unittest
} // namespace oceanbase