Files
oceanbase/unittest/storage/tx_table/test_tx_ctx_table.cpp

438 lines
14 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define protected public
#define private public
#define UNITTEST
#include <vector>
#include "storage/meta_mem/ob_tenant_meta_mem_mgr.h"
#include "storage/tx/ob_trans_ctx_mgr.h"
#include "storage/tx/ob_trans_part_ctx.h"
#include "storage/tx_table/ob_tx_ctx_memtable_mgr.h"
#include "storage/tx/ob_tx_log_adapter.h"
#include "storage/checkpoint/ob_data_checkpoint.h"
#include "storage/mock_ob_log_handler.h"
#include "storage/ls/ob_ls_tx_service.h"
#include "storage/ls/ob_ls.h"
#include "logservice/ob_log_handler.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace storage;
using namespace blocksstable;
using namespace share;
namespace storage
{
int64_t ObTenantMetaMemMgr::cal_adaptive_bucket_num()
{
return 1000;
}
}
namespace unittest
{
class ObTxCtxTableRecoverHelperUT : public ObTxCtxTableRecoverHelper
{
public:
virtual int recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr,
ObTxCtxTableInfo& ctx_info)
{
TRANS_LOG(INFO, "recover_one_tx_ctx_ called", K(ctx_info));
recover_tx_id_arr_.push_back(ctx_info.tx_id_);
return 0;
}
bool tx_id_recovered(transaction::ObTransID tx_id)
{
bool exist = false;
for (int64_t i = 0; i < recover_tx_id_arr_.size(); ++i) {
if (recover_tx_id_arr_[i] == tx_id) {
exist = true;
break;
}
}
return exist;
}
private:
std::vector<transaction::ObTransID> recover_tx_id_arr_;
};
class TestTxCtxTable : public ::testing::Test
{
public:
TestTxCtxTable():
ls_(),
ls_tx_service_(&ls_),
ls_data_checkpoint_(),
log_handler_(),
tablet_id_(LS_TX_DATA_TABLET),
ls_id_(1),
tenant_id_(1),
freezer_(&ls_),
t3m_(common::OB_SERVER_TENANT_ID),
mt_mgr_(nullptr),
ctx_mt_mgr_(nullptr),
tenant_base_(tenant_id_)
{
observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff;
ObLSTabletService *tablet_svr = ls_.get_tablet_svr();
tablet_svr->init(&ls_, fake_reporter);
}
public:
static ObLSTxCtxMgr ls_tx_ctx_mgr_;
static ObLSTxCtxMgr ls_tx_ctx_mgr2_;
ObLS ls_;
ObLSWRSHandler ls_loop_worker_;
ObLSTxService ls_tx_service_;
ObLSTabletService ls_tablet_service_;
checkpoint::ObDataCheckpoint ls_data_checkpoint_;
MockObLogHandler log_handler_;
static int64_t ref_count_;
protected:
virtual void SetUp() override
{
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
freezer_.init(&ls_);
EXPECT_EQ(OB_SUCCESS, t3m_.init());
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/
ls_id_,
&ls_.tx_table_,
ls_.get_lock_table(),
(ObITsMgr *)(0x01),
MTL(transaction::ObTransService*),
&palf_param,
nullptr));
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr2_.init(tenant_id_, /*tenant_id*/
ls_id_,
&ls_.tx_table_,
ls_.get_lock_table(),
(ObITsMgr *)(0x01),
MTL(transaction::ObTransService*),
&palf_param,
nullptr));
ref_count_ = 0;
ctx_mt_mgr_ = new ObTxCtxMemtableMgr();
EXPECT_EQ(OB_SUCCESS, ctx_mt_mgr_->init(tablet_id_,
ls_id_,
&freezer_,
&t3m_));
mt_mgr_ = ctx_mt_mgr_;
// tenant_base_.set(t3m_);
ObTenantEnv::set_tenant(&tenant_base_);
ASSERT_EQ(OB_SUCCESS, tenant_base_.init());
}
virtual void TearDown() override
{
ctx_mt_mgr_->reset();
t3m_.destroy();
ls_tx_ctx_mgr_.reset();
delete mt_mgr_;
mt_mgr_ = NULL;
ctx_mt_mgr_ = NULL;
ASSERT_EQ(0, ref_count_);
tenant_base_.destroy();
ObTenantEnv::set_tenant(nullptr);
}
public:
ObTabletID tablet_id_;
ObLSID ls_id_;
int64_t tenant_id_;
ObFreezer freezer_;
ObTenantMetaMemMgr t3m_;
ObIMemtableMgr *mt_mgr_;
ObTxCtxMemtableMgr *ctx_mt_mgr_;
ObTenantBase tenant_base_;
};
ObLSTxCtxMgr TestTxCtxTable::ls_tx_ctx_mgr_;
ObLSTxCtxMgr TestTxCtxTable::ls_tx_ctx_mgr2_;
int64_t TestTxCtxTable::ref_count_;
TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr)
{
EXPECT_EQ(0, TestTxCtxTable::ref_count_);
EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(SCN::min_scn(), /*last_replay_log_ts*/
0 /*schema_version*/));
EXPECT_EQ(1, TestTxCtxTable::ref_count_);
ObTableHandleV2 handle1;
ObTxCtxMemtable *memtable1 = NULL;
EXPECT_EQ(OB_SUCCESS, mt_mgr_->get_active_memtable(handle1));
EXPECT_EQ(true, handle1.get_table()->is_tx_ctx_memtable());
EXPECT_EQ(OB_SUCCESS, handle1.get_tx_ctx_memtable(memtable1));
ObSEArray<ObTableHandleV2, 5> handles2;
ObTableHandleV2 handle2;
ObTxCtxMemtable *memtable2 = NULL;
EXPECT_EQ(OB_SUCCESS, mt_mgr_->get_all_memtables(handles2));
EXPECT_EQ(1, handles2.count());
handle2 = handles2[0];
EXPECT_EQ(true, handle2.get_table()->is_tx_ctx_memtable());
EXPECT_EQ(OB_SUCCESS, handle2.get_tx_ctx_memtable(memtable2));
EXPECT_EQ(memtable1, memtable2);
handle2.reset();
handles2.reset();
EXPECT_EQ(1, TestTxCtxTable::ref_count_);
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx memtable mgr test successfully", KPC(ctx_mt_mgr_), K(TestTxCtxTable::ref_count_));
ObArenaAllocator allocator;
ObTableReadInfo read_info;
ObTableIterParam param;
ObArray<ObColDesc> columns_;
columns_.reset();
ObColDesc col_desc;
columns_.push_back(col_desc);
param.reset();
read_info.init(allocator, 16000, 1, lib::is_oracle_mode(), columns_);
param.tablet_id_ = tablet_id_;
param.read_info_ = &read_info;
param.is_multi_version_minor_merge_ = true;
ObTableAccessContext context;
ObStoreCtx store_ctx;
context.reset();
context.store_ctx_ = &store_ctx;
context.allocator_ = &allocator;
context.stmt_allocator_ = &allocator;
context.is_inited_ = true;
ObDatumRange key_range;
ObStoreRowIterator *row_iter = NULL;
const ObDatumRow *row = NULL;
ObDatumRow row_copy;
EXPECT_EQ(OB_SUCCESS, memtable1->scan(param,
context,
key_range,
row_iter));
EXPECT_EQ(OB_ITER_END, row_iter->get_next_row(row));
row_iter->reset();
row_iter = NULL;
row = NULL;
ObTransID id1(1);
ObLSID ls_id(1);
static ObPartTransCtx ctx1;
ctx1.tenant_id_ = 1;
ctx1.trans_id_ = id1;
ctx1.is_inited_ = true;
ctx1.ls_id_ = ls_id;
ctx1.exec_info_.max_applying_log_ts_.convert_from_ts(1);
ObTxData data1;
// ctx1.tx_data_ = &data1;
ctx1.ctx_tx_data_.test_init(data1, &ls_tx_ctx_mgr_);
ObTransID id2(2);
static ObPartTransCtx ctx2;
ctx2.tenant_id_ = 1;
ctx2.trans_id_ = id2;
ctx2.is_inited_ = true;
ctx2.ls_id_ = ls_id;
ctx2.exec_info_.max_applying_log_ts_.convert_from_ts(2);
ObTxData data2;
// ctx2.tx_data_ = &data2;
ctx2.ctx_tx_data_.test_init(data2, &ls_tx_ctx_mgr_);
ObTransCtx *ctx = NULL;
EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(id1, &ctx1, &ctx));
EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(id2, &ctx2, &ctx));
int64_t idx = 0;
// ObTransSSTableDurableCtxInfo ctx_info;
ObTxCtxTableInfo ctx_info;
ObSliceAlloc slice_allocator;
ObTxDataTable tx_data_table;
ObMemAttr attr;
attr.tenant_id_ = MTL_ID();
tx_data_table.slice_allocator_.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr);
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
ObTxCtxTableRecoverHelperUT recover_helper;
ObLSTxCtxMgr* ls_tx_ctx_mgr_recover = &unittest::TestTxCtxTable::ls_tx_ctx_mgr2_;
EXPECT_EQ(OB_SUCCESS, memtable1->scan(param,
context,
key_range,
row_iter));
recover_helper.reset();
for (int64_t ctx_idx = 0; ctx_idx < 2; ++ctx_idx) {
ls_tx_ctx_mgr_recover->reset();
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr_recover->init(TestTxCtxTable::tenant_id_, /*tenant_id*/
TestTxCtxTable::ls_id_,
&ls_.tx_table_,
&ls_.lock_table_,
(ObITsMgr *)(0x01),
MTL(transaction::ObTransService*),
&palf_param,
nullptr));
ObTxCtxMemtableScanIterator* tx_ctx_memtable_iter = dynamic_cast<ObTxCtxMemtableScanIterator*>(row_iter);
ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable_iter->TEST_set_max_value_length(32));
do {
EXPECT_EQ(OB_SUCCESS, row_iter->get_next_row(row));
TRANS_LOG(INFO, "row_info", K(*row));
int64_t meta_col = TX_CTX_TABLE_META_COLUMN +
ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
int64_t value_col = meta_col + 1;
row_copy.reset();
row_copy.init(allocator, 8);
row_copy.deep_copy(*row, allocator);
row_copy.storage_datums_[TX_CTX_TABLE_META_COLUMN] = row_copy.storage_datums_[meta_col];
row_copy.storage_datums_[TX_CTX_TABLE_VAL_COLUMN] = row_copy.storage_datums_[value_col];
TRANS_LOG(INFO, "row_info projected", K(row_copy));
ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, tx_data_table, ls_tx_ctx_mgr_recover));
} while (tx_ctx_memtable_iter->has_unmerged_buf_);
}
EXPECT_EQ(true, recover_helper.tx_id_recovered(id1));
EXPECT_EQ(true, recover_helper.tx_id_recovered(id2));
TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover");
/*
TRANS_LOG(INFO, "[TX_CTX_TABLE] get next row return", KPC(row));
EXPECT_EQ(OB_SUCCESS, ObTxCtxTable::TEST_recover(*row, idx, ctx_info, slice_allocator));
EXPECT_EQ(0, idx);
EXPECT_EQ(id1, ctx_info.tx_id_);
TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover", K(ctx_info));
EXPECT_EQ(OB_SUCCESS, row_iter->get_next_row(row));
TRANS_LOG(INFO, "[TX_CTX_TABLE] get next row return", KPC(row));
EXPECT_EQ(OB_SUCCESS, ObTxCtxTable::TEST_recover(*row, idx, ctx_info, slice_allocator));
EXPECT_EQ(1, idx);
EXPECT_EQ(id2, ctx_info.tx_id_);
TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover", K(ctx_info));
EXPECT_EQ(OB_ITER_END, row_iter->get_next_row(row));
EXPECT_EQ(1, TestTxCtxTable::ref_count_);
*/
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx memtable test successfully", KPC(memtable1), K(TestTxCtxTable::ref_count_));
ctx1.is_inited_ = false;
ctx2.is_inited_ = false;
}
} // namspace unittest
namespace storage
{
int ObTxCtxTable::acquire_ref_(const ObLSID& ls_id)
{
int ret = OB_SUCCESS;
ls_tx_ctx_mgr_ = &unittest::TestTxCtxTable::ls_tx_ctx_mgr_;
unittest::TestTxCtxTable::ref_count_++;
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table acquire ref", K(ls_id), K(unittest::TestTxCtxTable::ref_count_), K(this));
return ret;
}
int ObTxCtxTable::release_ref_()
{
int ret = OB_SUCCESS;
ls_tx_ctx_mgr_ = NULL;
unittest::TestTxCtxTable::ref_count_--;
TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table release ref", K(unittest::TestTxCtxTable::ref_count_), K(this));
return ret;
}
} // namespace storage
namespace transaction
{
int ObLSTxCtxMgr::init(const int64_t tenant_id,
const ObLSID &ls_id,
ObTxTable *tx_table,
ObLockTable *lock_table,
ObITsMgr *ts_mgr,
ObTransService *txs,
ObITxLogParam *param,
ObITxLogAdapter *log_adapter)
{
int ret = OB_SUCCESS;
UNUSED(log_adapter);
if (is_inited_) {
TRANS_LOG(WARN, "ObLSTxCtxMgr inited twice");
ret = OB_INIT_TWICE;
} else {
//if (ObTransCtxType::PARTICIPANT == ctx_type) {
// bool is_dup_table = false;
// // FIXME. xiaoshi.xjl
// //} else if (OB_FAIL(txs->check_duplicated_partition(ls_id_, is_dup_table))) {
// // TRANS_LOG(WARN, "check duplicated partition serving error", KR(ret), K(ls_id_));
// if (is_dup_table && OB_FAIL(init_dup_table_mgr())) {
// TRANS_LOG(WARN, "failed to init dup table", K(ret), K(ls_id));
// } else {
// // do nothing
// }
//}
//if (OB_SUCC(ret) && (ObTransCtxType::PARTICIPANT == ctx_type)) {
// if (OB_ISNULL(core_local_partition_audit_info_ = ObCoreLocalPartitionAuditInfoFactory::alloc())) {
// ret = OB_ALLOCATE_MEMORY_FAILED;
// TRANS_LOG(WARN, "alloc partition audit info error", KR(ret), K(ls_id));
// } else if (OB_FAIL(core_local_partition_audit_info_->init(OB_PARTITION_AUDIT_LOCAL_STORAGE_COUNT))) {
// TRANS_LOG(WARN, "ls_id audit info init error", KR(ret), K(ls_id));
// } else {
// // do nothing
// }
//}
if (OB_FAIL(ls_tx_ctx_map_.init(lib::ObMemAttr(tenant_id, "LSTxCtxMgr")))) {
TRANS_LOG(WARN, "ls_tx_ctx_map_ init fail", KR(ret));
} else {
is_inited_ = true;
state_ = State::F_WORKING;
tenant_id_ = tenant_id;
ls_id_ = ls_id;
tx_table_ = tx_table;
lock_table_ = lock_table,
txs_ = txs;
ts_mgr_ = ts_mgr;
TRANS_LOG(INFO, "ObLSTxCtxMgr inited success", KP(this), K(ls_id));
}
}
return ret;
}
}
} // namespace oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_tx_ctx_table.log*");
OB_LOGGER.set_file_name("test_tx_ctx_table.log");
OB_LOGGER.set_log_level("INFO");
STORAGE_LOG(INFO, "begin unittest: test tx ctx table");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}