/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define protected public #define private public #define UNITTEST #include #include #include "lib/oblog/ob_log.h" #include "storage/ls/ob_freezer.h" #include "storage/ls/ob_ls.h" #include "storage/ls/ob_ls_tablet_service.h" #include "storage/ls/ob_ls_tx_service.h" #include "storage/tx_table/ob_tx_data_memtable_mgr.h" #include "storage/tx_table/ob_tx_data_table.h" #include "storage/tx_table/ob_tx_table_iterator.h" #include "storage/checkpoint/ob_data_checkpoint.h" #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" #include "mtlenv/mock_tenant_module_env.h" #undef private #undef protected static int64_t const_data_num; int64_t tx_data_num CACHE_ALIGNED = 0; int64_t inserted_cnt = 0; share::SCN insert_start_scn = share::SCN::min_scn(); const int64_t ONE_SEC_NS = 1000LL * 1000LL * 1000LL; const int64_t MOD_NS = 1000LL * ONE_SEC_NS; namespace oceanbase { using namespace share; using namespace palf; namespace storage { class TestTxDataTable; class MockTxDataTable; class MockTxTable; static const uint64_t TEST_TENANT_ID = 1; // shrink select interval to push more points in cur_commit_versions // then the code will merge commit versions array with step_len larger than 1 int64_t ObTxDataMemtable::PERIODICAL_SELECT_INTERVAL_NS = 10LL; int ObTxDataMemtable::get_past_commit_versions_(ObCommitVersionsArray &past_commit_versions) { int ret = OB_SUCCESS; return ret; } int ObTxDataTable::get_recycle_scn(SCN &recycle_scn) { recycle_scn = SCN::max_scn(); return OB_SUCCESS; } class MockTxDataMemtableMgr : public ObTxDataMemtableMgr { public: int init(const common::ObTabletID &tablet_id, ObTxDataTable *tx_data_table, ObFreezer *freezer, ObTenantMetaMemMgr *t3m) { int ret = OB_SUCCESS; ObLSHandle ls_handle; ObTxTableGuard tx_table_guard; if (IS_INIT) { ret = OB_INIT_TWICE; STORAGE_LOG(WARN, "ObTxDataMemtableMgr has been initialized.", KR(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid()) || OB_ISNULL(freezer) || OB_ISNULL(t3m) || OB_ISNULL(tx_data_table)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), K(tablet_id), KP(freezer), KP(t3m)); } else { tablet_id_ = tablet_id; t3m_ = t3m; table_type_ = ObITable::TableType::TX_DATA_MEMTABLE; freezer_ = freezer; tx_data_table_ = tx_data_table; ls_tablet_svr_ = ls_handle.get_ls()->get_tablet_svr(); if (OB_ISNULL(tx_data_table_) || OB_ISNULL(ls_tablet_svr_)) { ret = OB_ERR_NULL_VALUE; STORAGE_LOG(WARN, "Init tx data memtable mgr failed.", KR(ret)); } else { is_inited_ = true; } } if (IS_NOT_INIT) { destroy(); } return ret; } }; class MockTxDataTable : public ObTxDataTable { friend TestTxDataTable; public: MockTxDataTable() : ObTxDataTable() {} int init(ObLS &ls) { int ret = OB_SUCCESS; ObTabletID tablet_id(LS_TX_DATA_TABLET); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr *); OB_ASSERT(OB_SUCCESS == ObTxDataTable::init(&ls, &tx_ctx_table_)); OB_ASSERT(OB_SUCCESS == mgr_.init(tablet_id, this, &freezer_, t3m)); mgr_.set_slice_allocator(get_slice_allocator()); return ret; } private: virtual ObTxDataMemtableMgr *get_memtable_mgr_() { return &mgr_; } private: ObFreezer freezer_; ObTabletHandle th_; MockTxDataMemtableMgr mgr_; ObLSTabletService ls_tablet_svr_; ObTxCtxTable tx_ctx_table_; // ObOccamTimer occam_timer_; // ObOccamThreadPool pool_; }; class MockTxTable : public ObTxTable { public: MockTxTable(MockTxDataTable *tx_data_table) : ObTxTable(*tx_data_table) {} }; class TestTxDataTable : public ::testing::Test { public: TestTxDataTable() : tx_table_(&tx_data_table_) {} virtual ~TestTxDataTable() {} static void SetUpTestCase() { EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); } virtual void SetUp() override { fake_ls_(ls_); } static void TearDownTestCase() { MockTenantModuleEnv::get_instance().destroy(); } void do_basic_test(); void do_undo_status_test(); void do_tx_data_serialize_test(); void do_repeat_insert_test(); void do_print_leak_slice_test(); private: void insert_tx_data_(); void insert_abort_tx_data_(); void make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr); void test_serialize_with_action_cnt_(int cnt); void generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions); void set_freezer_(); void init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr); void check_freeze_(ObTxDataMemtableMgr *memtable_mgr, ObTxDataMemtable *&freezing_memtable, ObTxDataMemtable *&active_memtable); void test_serialize_with_action_cnt(int cnt); void insert_rollback_tx_data_(); void test_commit_versions_serialize_(); void fake_ls_(ObLS &ls); public: MockTxDataTable tx_data_table_; MockTxTable tx_table_; ObTenantMetaMemMgr *t3m_; ObLS ls_; ObArenaAllocator allocator_; }; void TestTxDataTable::make_freezing_to_frozen_(ObTxDataMemtableMgr *memtable_mgr) { ObArray memtables; ObTxDataMemtable *memtable = nullptr; memtable_mgr->get_all_memtables(memtables); for (int i = 0; i < memtables.count(); i++) { ASSERT_EQ(OB_SUCCESS, memtables[i].get_tx_data_memtable(memtable)); if (ObTxDataMemtable::State::FREEZING == memtable->get_state()) { memtable->set_state(ObTxDataMemtable::State::FROZEN); } } } void TestTxDataTable::insert_tx_data_() { insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns()); ObTxData *tx_data = nullptr; transaction::ObTransID tx_id; while (true) { int64_t int_tx_id = 0; if ((int_tx_id = ATOMIC_AAF(&tx_data_num, -1)) <= 0) { break; } tx_id = int_tx_id; bool is_abort_tx = int_tx_id % 5 == 0; ObTxDataGuard tx_data_guard; tx_data_guard.reset(); ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); // fill in data tx_data->tx_id_ = tx_id; tx_data->start_scn_ = share::SCN::plus(insert_start_scn, (rand64(ObTimeUtil::current_time_ns()) % MOD_NS)); tx_data->commit_version_ = is_abort_tx ? share::SCN::invalid_scn() : share::SCN::plus(tx_data->start_scn_, (rand64(ObTimeUtil::current_time()) % MOD_NS)); tx_data->end_scn_ = is_abort_tx ? tx_data->start_scn_ : tx_data->commit_version_; tx_data->state_ = is_abort_tx ? ObTxData::ABORT : ObTxData::COMMIT; int undo_act_num = is_abort_tx ? 0 : rand() & 15; for (int j = 0; j < undo_act_num; j++) { auto from = random(); auto to = (from - 10000 > 0) ? (from - 100000) : 1; transaction::ObUndoAction undo_action(from, to); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data)); } } void TestTxDataTable::insert_rollback_tx_data_() { auto tx_id = transaction::ObTransID(INT64_MAX-2); share::SCN max_end_scn = share::SCN::min_scn(); ObTableHandleV2 handle; ObTxDataMemtable *memtable; ASSERT_EQ(OB_SUCCESS, tx_data_table_.get_memtable_mgr_()->get_active_memtable(handle)); ASSERT_EQ(OB_SUCCESS, handle.get_tx_data_memtable(memtable)); ASSERT_NE(nullptr, memtable); for (int i = 0; i < 200; i++) { ObTxDataGuard tx_data_guard; ObTxData *tx_data = nullptr; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); // fill in data tx_data->tx_id_ = tx_id; tx_data->start_scn_ = i % 2 ? insert_start_scn : share::SCN::max_scn(); tx_data->commit_version_ = share::SCN::invalid_scn(); tx_data->end_scn_ = share::SCN::plus(insert_start_scn, (rand64(ObTimeUtil::current_time_ns()) % (100LL * ONE_SEC_NS))); if (tx_data->end_scn_ > max_end_scn) { max_end_scn = tx_data->end_scn_; } transaction::ObUndoAction undo_action(100, 10); tx_data->add_undo_action(&tx_table_, undo_action); tx_data->state_ = ObTxData::RUNNING; ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data)); } } void TestTxDataTable::insert_abort_tx_data_() { insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns()); ObTxData *tx_data = nullptr; transaction::ObTransID tx_id; tx_id = INT64_MAX - 3; ObTxDataGuard tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); // fill in data tx_data->tx_id_ = tx_id; tx_data->start_scn_ = share::SCN::plus(insert_start_scn, MOD_NS + 1); // bigger than all tx datas to be the last one tx_data->commit_version_ = share::SCN::invalid_scn(); tx_data->end_scn_ = tx_data->start_scn_; tx_data->state_ = ObTxData::ABORT; ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data)); } void TestTxDataTable::generate_past_commit_version_(ObCommitVersionsArray &past_commit_versions) { share::SCN start_scn = share::SCN::minus(insert_start_scn, 300LL * ONE_SEC_NS); share::SCN commit_version = share::SCN::plus(start_scn, 2LL * ONE_SEC_NS); for (int i = 0; i < 500; i++) { past_commit_versions.array_.push_back(ObCommitVersionsArray::Node(start_scn, commit_version)); start_scn = share::SCN::plus(start_scn, 1LL * ONE_SEC_NS + (rand64(ObTimeUtil::current_time_ns()) % ONE_SEC_NS)); commit_version = share::SCN::plus(std::max(commit_version, start_scn), (rand64(ObTimeUtil::current_time_ns()) % (2LL * ONE_SEC_NS))); } } void TestTxDataTable::set_freezer_() { ObLSID ls_id(1); ObLSWRSHandler ls_loop_worker; ObLSTxService ls_tx_svr(&ls_); ObLSTabletService *ls_tablet_svr = ls_.get_tablet_svr(); observer::ObIMetaReport *fake_reporter = (observer::ObIMetaReport *)0xff; ASSERT_EQ(OB_SUCCESS, ls_tablet_svr->init(&ls_, fake_reporter)); ls_.data_checkpoint_.is_inited_ = true; tx_data_table_.freezer_.init(&ls_); } void TestTxDataTable::init_memtable_mgr_(ObTxDataMemtableMgr *memtable_mgr) { ASSERT_NE(nullptr, memtable_mgr); memtable_mgr->set_freezer(&tx_data_table_.freezer_); ASSERT_EQ(OB_SUCCESS, memtable_mgr->create_memtable(SCN::min_scn(), 1)); ASSERT_EQ(1, memtable_mgr->get_memtable_count_()); } void TestTxDataTable::check_freeze_(ObTxDataMemtableMgr *memtable_mgr, ObTxDataMemtable *&freezing_memtable, ObTxDataMemtable *&active_memtable) { // do freeze STORAGETEST_LOG(INFO, "tx_data_table_.freeze_memtable() start."); ASSERT_EQ(OB_SUCCESS, memtable_mgr->freeze()); ASSERT_EQ(2, memtable_mgr->get_memtable_count_()); // check freeze result ObArray memtables; ASSERT_EQ(OB_SUCCESS, memtable_mgr->get_all_memtables(memtables)); ASSERT_EQ(2, memtables.size()); ASSERT_EQ(OB_SUCCESS, memtables[0].get_tx_data_memtable(freezing_memtable)); ASSERT_EQ(OB_SUCCESS, memtables[1].get_tx_data_memtable(active_memtable)); ASSERT_EQ(ObTxDataMemtable::State::FREEZING, freezing_memtable->get_state()); ASSERT_EQ(ObTxDataMemtable::State::ACTIVE, active_memtable->get_state()); ASSERT_EQ(freezing_memtable->get_end_scn(), active_memtable->get_start_scn()); // set frozen state freezing_memtable->set_state(ObTxDataMemtable::State::FROZEN); } void TestTxDataTable::do_basic_test() { // init tx data table ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_)); set_freezer_(); ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_(); init_memtable_mgr_(memtable_mgr); fprintf(stdout, "start insert tx data\n"); insert_tx_data_(); fprintf(stdout, "start insert rollback tx data\n"); insert_rollback_tx_data_(); fprintf(stdout, "start insert abort tx data\n"); insert_abort_tx_data_(); fprintf(stdout, "start freezing\n"); ObTxDataMemtable *freezing_memtable = nullptr; ObTxDataMemtable *active_memtable = nullptr; check_freeze_(memtable_mgr, freezing_memtable, active_memtable); inserted_cnt = freezing_memtable->get_tx_data_count(); const int64_t range_cnt = 4; ObSEArray range_array; ASSERT_EQ(OB_SUCCESS, freezing_memtable->pre_process_for_merge()); ASSERT_EQ(OB_SUCCESS, freezing_memtable->get_split_ranges(nullptr, nullptr, range_cnt, range_array)); int64_t pre_range_end_key = 0; for (int i = 0; i < range_cnt; i++) { auto &range = range_array[i]; int64_t start_key = 0; int64_t end_key = 0; ASSERT_EQ(OB_SUCCESS, range.get_start_key().get_obj_ptr()[0].get_int(start_key)); ASSERT_EQ(OB_SUCCESS, range.get_end_key().get_obj_ptr()[0].get_int(end_key)); ASSERT_EQ(pre_range_end_key, start_key); ASSERT_GE(end_key, start_key); pre_range_end_key = end_key; } ObTxData *fake_tx_data = nullptr; // check sort result { transaction::ObTransID pre_tx_id = INT64_MIN; ObTxData *cur_tx_data = freezing_memtable->sort_list_head_.next_; ASSERT_NE(nullptr, cur_tx_data); int64_t cnt = 0; while (nullptr != cur_tx_data) { auto tx_id = cur_tx_data->tx_id_; if (INT64_MAX == tx_id) { fake_tx_data = cur_tx_data; } ASSERT_GE(tx_id.get_id(), pre_tx_id.get_id()); pre_tx_id = tx_id; cur_tx_data = cur_tx_data->sort_list_node_.next_; cnt++; } // there is a fake tx data inserted into link hash map after pre-process for upper_trans_version // calculation ASSERT_EQ(inserted_cnt + 1, cnt); } // free memtable freezing_memtable->reset(); active_memtable->reset(); } void TestTxDataTable::do_undo_status_test() { // init tx data table ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_)); // the last undo action covers all the previous undo actions { ObTxData *tx_data = nullptr; ObTxDataGuard tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); tx_data->tx_id_ = rand(); for (int i = 1; i <= 1001; i++) { transaction::ObUndoAction undo_action(10 * (i + 1), 10 * i); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } ASSERT_EQ(1000 / TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE + 1, tx_data->undo_status_list_.undo_node_cnt_); { transaction::ObUndoAction undo_action(10000000, 10); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } STORAGETEST_LOG(INFO, "", K(tx_data->undo_status_list_)); ASSERT_EQ(1, tx_data->undo_status_list_.head_->size_); ASSERT_EQ(nullptr, tx_data->undo_status_list_.head_->next_); ASSERT_EQ(1, tx_data->undo_status_list_.undo_node_cnt_); } { // the last undo action covers eight previous undo actions // so the undo status just have one undo status node ObTxData *tx_data = nullptr; ObTxDataGuard tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); tx_data->tx_id_ = rand(); for (int i = 1; i <= 14; i++) { transaction::ObUndoAction undo_action(i + 1, i); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } ASSERT_EQ(2, tx_data->undo_status_list_.undo_node_cnt_); { transaction::ObUndoAction undo_action(15, 7); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } STORAGETEST_LOG(INFO, "", K(tx_data->undo_status_list_)); ASSERT_EQ(7, tx_data->undo_status_list_.head_->size_); ASSERT_EQ(nullptr, tx_data->undo_status_list_.head_->next_); ASSERT_EQ(1, tx_data->undo_status_list_.undo_node_cnt_); } } void TestTxDataTable::test_serialize_with_action_cnt_(int cnt) { ObTxData *tx_data = nullptr; ObTxDataGuard tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); tx_data->tx_id_ = transaction::ObTransID(269381); tx_data->commit_version_.convert_for_logservice(ObTimeUtil::current_time_ns()); tx_data->end_scn_.convert_for_logservice(ObTimeUtil::current_time_ns()); tx_data->start_scn_.convert_for_logservice(tx_data->end_scn_.get_val_for_logservice() - 10000); tx_data->state_ = ObTxData::COMMIT; for (int i = 1; i <= cnt; i++) { transaction::ObUndoAction undo_action(10 * (i + 1), 10 * i); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } int64_t node_cnt = 0; if (cnt % 7 == 0) { node_cnt = cnt / 7; } else { node_cnt = cnt / 7 + 1; } ASSERT_EQ(node_cnt, tx_data->undo_status_list_.undo_node_cnt_); char *buf = nullptr; ObArenaAllocator allocator; int64_t serialize_size = tx_data->get_serialize_size(); int64_t pos = 0; ASSERT_NE(nullptr, buf = static_cast(allocator.alloc(serialize_size))); ASSERT_EQ(OB_SUCCESS, tx_data->serialize(buf, serialize_size, pos)); ObTxData *new_tx_data = nullptr; ObTxDataGuard new_tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(new_tx_data_guard)); ASSERT_NE(nullptr, new_tx_data = new_tx_data_guard.tx_data()); new_tx_data->tx_id_ = transaction::ObTransID(269381); pos = 0; ASSERT_EQ(OB_SUCCESS, new_tx_data->deserialize(buf, serialize_size, pos, *tx_data_table_.get_slice_allocator())); ASSERT_TRUE(new_tx_data->equals_(*tx_data)); tx_data->dec_ref(); new_tx_data->dec_ref(); } void TestTxDataTable::do_tx_data_serialize_test() { ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_)); ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_(); set_freezer_(); init_memtable_mgr_(memtable_mgr); test_serialize_with_action_cnt_(0); test_serialize_with_action_cnt_(TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE); test_serialize_with_action_cnt_(TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE + 1); test_serialize_with_action_cnt_(TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE * 100); test_serialize_with_action_cnt_(TX_DATA_UNDO_ACT_MAX_NUM_PER_NODE * 100 + 1); test_commit_versions_serialize_(); } void TestTxDataTable::test_commit_versions_serialize_() { ObCommitVersionsArray cur_array; ObCommitVersionsArray past_array; ObCommitVersionsArray merged_array; ObCommitVersionsArray deserialized_array; share::SCN start_scn; start_scn.convert_for_logservice(ObTimeUtil::current_time_ns()); int64_t MOD = 137171; share::SCN recycle_scn = share::SCN::plus(start_scn, 1000LL * MOD); int64_t array_cnt = 50000; STORAGE_LOG(INFO, "start generate past array"); for (int64_t i = 0; i < array_cnt; i++) { start_scn = share::SCN::plus(start_scn, (rand64(ObTimeUtil::current_time_ns()) % MOD)); ObCommitVersionsArray::Node node(start_scn, share::SCN::plus(start_scn, (rand64(ObTimeUtil::current_time_ns()) % MOD))); STORAGE_LOG(INFO, "", K(node)); ASSERT_EQ(OB_SUCCESS, past_array.array_.push_back(node)); } ASSERT_EQ(true, past_array.is_valid()); STORAGE_LOG(INFO, "start generate cur array"); for (int i = 0; i < array_cnt; i++) { start_scn = share::SCN::plus(start_scn, (rand64(ObTimeUtil::current_time_ns()) % MOD)); ObCommitVersionsArray::Node node(start_scn, share::SCN::plus(start_scn, (rand64(ObTimeUtil::current_time_ns()) % MOD))); STORAGE_LOG(DEBUG, "", K(node)); ASSERT_EQ(OB_SUCCESS, cur_array.array_.push_back(node)); } ASSERT_EQ(true, cur_array.is_valid()); ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_(); ASSERT_NE(nullptr, memtable_mgr); ObTableHandleV2 table_handle; ASSERT_EQ(OB_SUCCESS, memtable_mgr->get_active_memtable(table_handle)); ObTxDataMemtable *tx_data_memtable = nullptr; ASSERT_EQ(OB_SUCCESS, table_handle.get_tx_data_memtable(tx_data_memtable)); ASSERT_EQ(OB_SUCCESS, tx_data_memtable->merge_cur_and_past_commit_verisons_(recycle_scn, cur_array, past_array, merged_array)); ASSERT_EQ(true, merged_array.is_valid()); int64_t serialize_size = merged_array.get_serialize_size(); ObArenaAllocator allocator; ObTxLocalBuffer buf(allocator); int64_t s_pos = 0; ASSERT_EQ(OB_SUCCESS, buf.reserve(serialize_size)); ASSERT_EQ(OB_SUCCESS, merged_array.serialize(buf.get_ptr(), serialize_size, s_pos)); int64_t d_pos = 0; ASSERT_EQ(OB_SUCCESS, deserialized_array.deserialize(buf.get_ptr(), buf.get_length(), d_pos)); ASSERT_EQ(true, deserialized_array.is_valid()); ASSERT_EQ(s_pos, d_pos); // do deserialize again on the same array d_pos = 0; ASSERT_EQ(OB_SUCCESS, deserialized_array.deserialize(buf.get_ptr(), buf.get_length(), d_pos)); ASSERT_EQ(true, deserialized_array.is_valid()); ASSERT_EQ(s_pos, d_pos); } void TestTxDataTable::do_repeat_insert_test() { ASSERT_EQ(OB_SUCCESS, tx_data_table_.init(ls_)); set_freezer_(); ObTxDataMemtableMgr *memtable_mgr = tx_data_table_.get_memtable_mgr_(); ASSERT_NE(nullptr, memtable_mgr); memtable_mgr->set_freezer(&tx_data_table_.freezer_); ASSERT_EQ(OB_SUCCESS, memtable_mgr->create_memtable(SCN::min_scn(), 1)); ASSERT_EQ(1, memtable_mgr->get_memtable_count_()); insert_start_scn.convert_for_logservice(ObTimeUtil::current_time_ns()); ObTxData *tx_data = nullptr; int64_t int_tx_id = 0; transaction::ObTransID tx_id; for (int i = 1; i <= 100; i++) { tx_id = transaction::ObTransID(269381); tx_data = nullptr; ObTxDataGuard tx_data_guard; ASSERT_EQ(OB_SUCCESS, tx_data_table_.alloc_tx_data(tx_data_guard)); ASSERT_NE(nullptr, tx_data = tx_data_guard.tx_data()); // fill in data tx_data->tx_id_ = tx_id; tx_data->start_scn_ = insert_start_scn; tx_data->commit_version_.set_invalid(); tx_data->end_scn_ = share::SCN::plus(insert_start_scn, i); tx_data->state_ = ObTxData::RUNNING; int undo_act_num = (rand() & 31) + 1; for (int j = 0; j < undo_act_num; j++) { auto from = random(); auto to = (from - 10000 > 0) ? (from - 100000) : 1; transaction::ObUndoAction undo_action(from, to); ASSERT_EQ(OB_SUCCESS, tx_data->add_undo_action(&tx_table_, undo_action)); } ASSERT_EQ(OB_SUCCESS, tx_data_table_.insert(tx_data)); } memtable_mgr->destroy(); } void TestTxDataTable::fake_ls_(ObLS &ls) { ls.ls_meta_.tenant_id_ = 1; ls.ls_meta_.ls_id_.id_ = 1001; ls.ls_meta_.gc_state_ = logservice::LSGCState::NORMAL; ls.ls_meta_.migration_status_ = ObMigrationStatus::OB_MIGRATION_STATUS_NONE; ls.ls_meta_.restore_status_ = ObLSRestoreStatus::RESTORE_NONE; ls.ls_meta_.replica_type_ = ObReplicaType::REPLICA_TYPE_FULL; ls.ls_meta_.rebuild_seq_ = 0; } void TestTxDataTable::do_print_leak_slice_test() { const int32_t CONCURRENCY = 4; ObMemAttr attr; ObSliceAlloc slice_allocator; slice_allocator.init(128, OB_MALLOC_NORMAL_BLOCK_SIZE, default_blk_alloc, attr); slice_allocator.set_nway(CONCURRENCY); std::vector alloc_threads; for (int i = 0; i < CONCURRENCY; i++) { alloc_threads.push_back(std::thread([&slice_allocator](){ int64_t alloc_times = 1123; std::vector allocated_mem_ptr; while (--alloc_times > 0) { void *ret = slice_allocator.alloc(); if (nullptr != ret) { allocated_mem_ptr.push_back(ret); } } STORAGE_LOG(INFO, "unfreed slice", KP(allocated_mem_ptr[0])); for (int k = 1; k < allocated_mem_ptr.size(); k++) { slice_allocator.free(allocated_mem_ptr[k]); } })); } for (int i = 0; i < CONCURRENCY; i++) { alloc_threads[i].join(); } slice_allocator.destroy(); } TEST_F(TestTxDataTable, basic_test) { tx_data_num = const_data_num; do_basic_test(); } TEST_F(TestTxDataTable, repeat_insert_test) { do_repeat_insert_test(); } TEST_F(TestTxDataTable, undo_status_test) { do_undo_status_test(); } TEST_F(TestTxDataTable, serialize_test) { do_tx_data_serialize_test(); } // TEST_F(TestTxDataTable, print_leak_slice) { do_print_leak_slice_test(); } } // namespace storage } // namespace oceanbase int main(int argc, char **argv) { int ret = 1; system("rm -f test_tx_data_table.log*"); system("rm -fr run_*"); ObLogger &logger = ObLogger::get_logger(); logger.set_file_name("test_tx_data_table.log", true); logger.set_log_level(OB_LOG_LEVEL_INFO); TRANS_LOG(WARN, "init memory pool error!"); // OB_LOGGER.set_enable_async_log(false); // TEST_LOG("GCONF.syslog_io_bandwidth_limit %ld ", GCONF.syslog_io_bandwidth_limit.get_value()); // LOG_INFO("GCONF.syslog_io_bandwidth_limit ", K(GCONF.syslog_io_bandwidth_limit.get_value())); // GCONF.syslog_io_bandwidth_limit.set_value("4000MB"); // TEST_LOG("GCONF.syslog_io_bandwidth_limit %ld ", GCONF.syslog_io_bandwidth_limit.get_value()); // LOG_INFO("GCONF.syslog_io_bandwidth_limit ", K(GCONF.syslog_io_bandwidth_limit.get_value())); if (OB_SUCCESS != ObClockGenerator::init()) { TRANS_LOG(WARN, "ObClockGenerator::init error!"); } else { if (argc > 1) { const_data_num = atoi(argv[1]); } else { const_data_num = 524288; } testing::InitGoogleTest(&argc, argv); ret = RUN_ALL_TESTS(); } return ret; }