/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define protected public #define private public #define UNITTEST #include #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" #include "storage/tx/ob_trans_ctx_mgr.h" #include "storage/tx/ob_trans_part_ctx.h" #include "storage/tx_table/ob_tx_ctx_memtable_mgr.h" #include "storage/tx/ob_tx_log_adapter.h" #include "storage/checkpoint/ob_data_checkpoint.h" #include "storage/mock_ob_log_handler.h" #include "storage/ls/ob_ls_tx_service.h" #include "storage/ls/ob_ls.h" #include "logservice/ob_log_handler.h" namespace oceanbase { using namespace ::testing; using namespace transaction; using namespace storage; using namespace blocksstable; using namespace share; namespace storage { int64_t ObTenantMetaMemMgr::cal_adaptive_bucket_num() { return 1000; } } namespace unittest { class ObTxCtxTableRecoverHelperUT : public ObTxCtxTableRecoverHelper { public: virtual int recover_one_tx_ctx_(transaction::ObLSTxCtxMgr* ls_tx_ctx_mgr, ObTxCtxTableInfo& ctx_info) { TRANS_LOG(INFO, "recover_one_tx_ctx_ called", K(ctx_info)); recover_tx_id_arr_.push_back(ctx_info.tx_id_); return 0; } bool tx_id_recovered(transaction::ObTransID tx_id) { bool exist = false; for (int64_t i = 0; i < recover_tx_id_arr_.size(); ++i) { if (recover_tx_id_arr_[i] == tx_id) { exist = true; break; } } return exist; } private: std::vector recover_tx_id_arr_; }; class TestTxCtxTable : public ::testing::Test { public: TestTxCtxTable(): ls_(), ls_tx_service_(&ls_), ls_data_checkpoint_(), log_handler_(), tablet_id_(LS_TX_DATA_TABLET), ls_id_(1), tenant_id_(1), freezer_(&ls_), t3m_(common::OB_SERVER_TENANT_ID), mt_mgr_(nullptr), ctx_mt_mgr_(nullptr), tenant_base_(tenant_id_) { observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff; ObLSTabletService *tablet_svr = ls_.get_tablet_svr(); tablet_svr->init(&ls_, fake_reporter); } public: static ObLSTxCtxMgr ls_tx_ctx_mgr_; static ObLSTxCtxMgr ls_tx_ctx_mgr2_; ObLS ls_; ObLSWRSHandler ls_loop_worker_; ObLSTxService ls_tx_service_; ObLSTabletService ls_tablet_service_; checkpoint::ObDataCheckpoint ls_data_checkpoint_; MockObLogHandler log_handler_; static int64_t ref_count_; protected: virtual void SetUp() override { ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01)); freezer_.init(&ls_); EXPECT_EQ(OB_SUCCESS, t3m_.init()); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/ ls_id_, &ls_.tx_table_, ls_.get_lock_table(), (ObITsMgr *)(0x01), MTL(transaction::ObTransService*), &palf_param, nullptr)); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr2_.init(tenant_id_, /*tenant_id*/ ls_id_, &ls_.tx_table_, ls_.get_lock_table(), (ObITsMgr *)(0x01), MTL(transaction::ObTransService*), &palf_param, nullptr)); ref_count_ = 0; ctx_mt_mgr_ = new ObTxCtxMemtableMgr(); EXPECT_EQ(OB_SUCCESS, ctx_mt_mgr_->init(tablet_id_, ls_id_, &freezer_, &t3m_)); mt_mgr_ = ctx_mt_mgr_; // tenant_base_.set(t3m_); ObTenantEnv::set_tenant(&tenant_base_); ASSERT_EQ(OB_SUCCESS, tenant_base_.init()); } virtual void TearDown() override { ctx_mt_mgr_->reset(); t3m_.destroy(); ls_tx_ctx_mgr_.reset(); delete mt_mgr_; mt_mgr_ = NULL; ctx_mt_mgr_ = NULL; ASSERT_EQ(0, ref_count_); tenant_base_.destroy(); ObTenantEnv::set_tenant(nullptr); } public: ObTabletID tablet_id_; ObLSID ls_id_; int64_t tenant_id_; ObFreezer freezer_; ObTenantMetaMemMgr t3m_; ObIMemtableMgr *mt_mgr_; ObTxCtxMemtableMgr *ctx_mt_mgr_; ObTenantBase tenant_base_; }; ObLSTxCtxMgr TestTxCtxTable::ls_tx_ctx_mgr_; ObLSTxCtxMgr TestTxCtxTable::ls_tx_ctx_mgr2_; int64_t TestTxCtxTable::ref_count_; TEST_F(TestTxCtxTable, test_tx_ctx_memtable_mgr) { EXPECT_EQ(0, TestTxCtxTable::ref_count_); EXPECT_EQ(OB_SUCCESS, mt_mgr_->create_memtable(SCN::min_scn(), /*last_replay_log_ts*/ 0 /*schema_version*/)); EXPECT_EQ(1, TestTxCtxTable::ref_count_); ObTableHandleV2 handle1; ObTxCtxMemtable *memtable1 = NULL; EXPECT_EQ(OB_SUCCESS, mt_mgr_->get_active_memtable(handle1)); EXPECT_EQ(true, handle1.get_table()->is_tx_ctx_memtable()); EXPECT_EQ(OB_SUCCESS, handle1.get_tx_ctx_memtable(memtable1)); ObSEArray handles2; ObTableHandleV2 handle2; ObTxCtxMemtable *memtable2 = NULL; EXPECT_EQ(OB_SUCCESS, mt_mgr_->get_all_memtables(handles2)); EXPECT_EQ(1, handles2.count()); handle2 = handles2[0]; EXPECT_EQ(true, handle2.get_table()->is_tx_ctx_memtable()); EXPECT_EQ(OB_SUCCESS, handle2.get_tx_ctx_memtable(memtable2)); EXPECT_EQ(memtable1, memtable2); handle2.reset(); handles2.reset(); EXPECT_EQ(1, TestTxCtxTable::ref_count_); TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx memtable mgr test successfully", KPC(ctx_mt_mgr_), K(TestTxCtxTable::ref_count_)); ObArenaAllocator allocator; ObTableReadInfo read_info; ObTableIterParam param; ObArray columns_; columns_.reset(); ObColDesc col_desc; columns_.push_back(col_desc); param.reset(); read_info.init(allocator, 16000, 1, lib::is_oracle_mode(), columns_); param.tablet_id_ = tablet_id_; param.read_info_ = &read_info; param.is_multi_version_minor_merge_ = true; ObTableAccessContext context; ObStoreCtx store_ctx; context.reset(); context.store_ctx_ = &store_ctx; context.allocator_ = &allocator; context.stmt_allocator_ = &allocator; context.is_inited_ = true; ObDatumRange key_range; ObStoreRowIterator *row_iter = NULL; const ObDatumRow *row = NULL; ObDatumRow row_copy; EXPECT_EQ(OB_SUCCESS, memtable1->scan(param, context, key_range, row_iter)); EXPECT_EQ(OB_ITER_END, row_iter->get_next_row(row)); row_iter->reset(); row_iter = NULL; row = NULL; ObTransID id1(1); ObLSID ls_id(1); static ObPartTransCtx ctx1; ctx1.tenant_id_ = 1; ctx1.trans_id_ = id1; ctx1.is_inited_ = true; ctx1.ls_id_ = ls_id; ctx1.exec_info_.max_applying_log_ts_.convert_from_ts(1); ObTxData data1; // ctx1.tx_data_ = &data1; ctx1.ctx_tx_data_.test_init(data1, &ls_tx_ctx_mgr_); ObTransID id2(2); static ObPartTransCtx ctx2; ctx2.tenant_id_ = 1; ctx2.trans_id_ = id2; ctx2.is_inited_ = true; ctx2.ls_id_ = ls_id; ctx2.exec_info_.max_applying_log_ts_.convert_from_ts(2); ObTxData data2; // ctx2.tx_data_ = &data2; ctx2.ctx_tx_data_.test_init(data2, &ls_tx_ctx_mgr_); ObTransCtx *ctx = NULL; EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(id1, &ctx1, &ctx)); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.ls_tx_ctx_map_.insert_and_get(id2, &ctx2, &ctx)); int64_t idx = 0; // ObTransSSTableDurableCtxInfo ctx_info; ObTxCtxTableInfo ctx_info; ObSliceAlloc slice_allocator; ObTxDataTable tx_data_table; ObMemAttr attr; attr.tenant_id_ = MTL_ID(); tx_data_table.slice_allocator_.init(sizeof(ObTxData), OB_MALLOC_NORMAL_BLOCK_SIZE, common::default_blk_alloc, attr); ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01)); ObTxCtxTableRecoverHelperUT recover_helper; ObLSTxCtxMgr* ls_tx_ctx_mgr_recover = &unittest::TestTxCtxTable::ls_tx_ctx_mgr2_; EXPECT_EQ(OB_SUCCESS, memtable1->scan(param, context, key_range, row_iter)); recover_helper.reset(); for (int64_t ctx_idx = 0; ctx_idx < 2; ++ctx_idx) { ls_tx_ctx_mgr_recover->reset(); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_recover->init(TestTxCtxTable::tenant_id_, /*tenant_id*/ TestTxCtxTable::ls_id_, &ls_.tx_table_, &ls_.lock_table_, (ObITsMgr *)(0x01), MTL(transaction::ObTransService*), &palf_param, nullptr)); ObTxCtxMemtableScanIterator* tx_ctx_memtable_iter = dynamic_cast(row_iter); ASSERT_EQ(OB_SUCCESS, tx_ctx_memtable_iter->TEST_set_max_value_length(32)); do { EXPECT_EQ(OB_SUCCESS, row_iter->get_next_row(row)); TRANS_LOG(INFO, "row_info", K(*row)); int64_t meta_col = TX_CTX_TABLE_META_COLUMN + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); int64_t value_col = meta_col + 1; row_copy.reset(); row_copy.init(allocator, 8); row_copy.deep_copy(*row, allocator); row_copy.storage_datums_[TX_CTX_TABLE_META_COLUMN] = row_copy.storage_datums_[meta_col]; row_copy.storage_datums_[TX_CTX_TABLE_VAL_COLUMN] = row_copy.storage_datums_[value_col]; TRANS_LOG(INFO, "row_info projected", K(row_copy)); ASSERT_EQ(OB_SUCCESS, recover_helper.recover(row_copy, tx_data_table, ls_tx_ctx_mgr_recover)); } while (tx_ctx_memtable_iter->has_unmerged_buf_); } EXPECT_EQ(true, recover_helper.tx_id_recovered(id1)); EXPECT_EQ(true, recover_helper.tx_id_recovered(id2)); TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover"); /* TRANS_LOG(INFO, "[TX_CTX_TABLE] get next row return", KPC(row)); EXPECT_EQ(OB_SUCCESS, ObTxCtxTable::TEST_recover(*row, idx, ctx_info, slice_allocator)); EXPECT_EQ(0, idx); EXPECT_EQ(id1, ctx_info.tx_id_); TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover", K(ctx_info)); EXPECT_EQ(OB_SUCCESS, row_iter->get_next_row(row)); TRANS_LOG(INFO, "[TX_CTX_TABLE] get next row return", KPC(row)); EXPECT_EQ(OB_SUCCESS, ObTxCtxTable::TEST_recover(*row, idx, ctx_info, slice_allocator)); EXPECT_EQ(1, idx); EXPECT_EQ(id2, ctx_info.tx_id_); TRANS_LOG(INFO, "[TX_CTX_TABLE] successfully recover", K(ctx_info)); EXPECT_EQ(OB_ITER_END, row_iter->get_next_row(row)); EXPECT_EQ(1, TestTxCtxTable::ref_count_); */ TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx memtable test successfully", KPC(memtable1), K(TestTxCtxTable::ref_count_)); ctx1.is_inited_ = false; ctx2.is_inited_ = false; } } // namspace unittest namespace storage { int ObTxCtxTable::acquire_ref_(const ObLSID& ls_id) { int ret = OB_SUCCESS; ls_tx_ctx_mgr_ = &unittest::TestTxCtxTable::ls_tx_ctx_mgr_; unittest::TestTxCtxTable::ref_count_++; TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table acquire ref", K(ls_id), K(unittest::TestTxCtxTable::ref_count_), K(this)); return ret; } int ObTxCtxTable::release_ref_() { int ret = OB_SUCCESS; ls_tx_ctx_mgr_ = NULL; unittest::TestTxCtxTable::ref_count_--; TRANS_LOG(INFO, "[TX_CTX_TABLE] tx ctx table release ref", K(unittest::TestTxCtxTable::ref_count_), K(this)); return ret; } } // namespace storage namespace transaction { int ObLSTxCtxMgr::init(const int64_t tenant_id, const ObLSID &ls_id, ObTxTable *tx_table, ObLockTable *lock_table, ObITsMgr *ts_mgr, ObTransService *txs, ObITxLogParam *param, ObITxLogAdapter *log_adapter) { int ret = OB_SUCCESS; UNUSED(log_adapter); if (is_inited_) { TRANS_LOG(WARN, "ObLSTxCtxMgr inited twice"); ret = OB_INIT_TWICE; } else { //if (ObTransCtxType::PARTICIPANT == ctx_type) { // bool is_dup_table = false; // // FIXME. xiaoshi.xjl // //} else if (OB_FAIL(txs->check_duplicated_partition(ls_id_, is_dup_table))) { // // TRANS_LOG(WARN, "check duplicated partition serving error", KR(ret), K(ls_id_)); // if (is_dup_table && OB_FAIL(init_dup_table_mgr())) { // TRANS_LOG(WARN, "failed to init dup table", K(ret), K(ls_id)); // } else { // // do nothing // } //} //if (OB_SUCC(ret) && (ObTransCtxType::PARTICIPANT == ctx_type)) { // if (OB_ISNULL(core_local_partition_audit_info_ = ObCoreLocalPartitionAuditInfoFactory::alloc())) { // ret = OB_ALLOCATE_MEMORY_FAILED; // TRANS_LOG(WARN, "alloc partition audit info error", KR(ret), K(ls_id)); // } else if (OB_FAIL(core_local_partition_audit_info_->init(OB_PARTITION_AUDIT_LOCAL_STORAGE_COUNT))) { // TRANS_LOG(WARN, "ls_id audit info init error", KR(ret), K(ls_id)); // } else { // // do nothing // } //} if (OB_FAIL(ls_tx_ctx_map_.init(lib::ObMemAttr(tenant_id, "LSTxCtxMgr")))) { TRANS_LOG(WARN, "ls_tx_ctx_map_ init fail", KR(ret)); } else { is_inited_ = true; state_ = State::F_WORKING; tenant_id_ = tenant_id; ls_id_ = ls_id; tx_table_ = tx_table; lock_table_ = lock_table, txs_ = txs; ts_mgr_ = ts_mgr; TRANS_LOG(INFO, "ObLSTxCtxMgr inited success", KP(this), K(ls_id)); } } return ret; } } } // namespace oceanbase int main(int argc, char **argv) { system("rm -rf test_tx_ctx_table.log*"); OB_LOGGER.set_file_name("test_tx_ctx_table.log"); OB_LOGGER.set_log_level("INFO"); STORAGE_LOG(INFO, "begin unittest: test tx ctx table"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }