// owner: baichangmin.bcm // owner group: storage /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGE #include #define protected public #define OK(ass) ASSERT_EQ(OB_SUCCESS, (ass)) #define NOT_NULL(ass) ASSERT_NE(nullptr, (ass)) #define private public #include "storage/blocksstable/ob_row_generate.h" #include "storage/blocksstable/cs_encoding/ob_micro_block_cs_encoder.h" #include "mtlenv/mock_tenant_module_env.h" namespace oceanbase { using namespace common; using namespace compaction; using namespace blocksstable; using namespace storage; using namespace share::schema; using namespace std; static ObSimpleMemLimitGetter getter; // just for test, skip to use int ObCompactionBufferWriter::ensure_space(const int64_t size) { int ret = OB_SUCCESS; use_mem_pool_ = false; if (size <= 0) { // do nothing } else if (nullptr == data_) { // first alloc if (OB_UNLIKELY(!block_.empty())) { ret = OB_ERR_UNEXPECTED; } else { data_ = (char *) share::mtl_malloc(size, label_); pos_ = 0; capacity_ = size; block_.buffer_ = data_; block_.buffer_size_ = size; block_.type_ = ObCompactionBufferBlock::MTL_PIECE_TYPE; } } else if (capacity_ < size) { char *new_data = (char *) share::mtl_malloc(size, label_); MEMCPY(new_data, data_, pos_); data_ = new_data; capacity_ = size; block_.buffer_ = data_; block_.buffer_size_ = size; block_.type_ = ObCompactionBufferBlock::MTL_PIECE_TYPE; } return ret; } void ObCompactionBufferWriter::reset() { } namespace unittest { class TestIndexTree : public ::testing::Test { public: TestIndexTree(); virtual ~TestIndexTree(); static void SetUpTestCase(); static void TearDownTestCase(); virtual void SetUp(); virtual void TearDown(); protected: void fake_freeze_info(); void prepare_schema(); void prepare_4K_cols_schema(); void prepare_data(); void prepare_index_builder(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder &sstable_builder); void prepare_data_desc(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder *sstable_builder); void prepare_index_desc(const ObWholeDataStoreDesc &data_desc, ObWholeDataStoreDesc &index_desc); void mock_compaction(const int64_t test_row_num, ObIArray &data_write_ctxs, ObIArray &index_write_ctxs, ObMacroMetasArray *&meta_info_list, ObSSTableMergeRes &ctx, IndexTreeRootCtxList *&roots, ObSSTableIndexBuilder *&sstable_builder); int mock_init_backup_rebuilder(ObIndexBlockRebuilder &rebuilder, ObSSTableIndexBuilder &sstable_builder); // cg void mock_cg_compaction(const int64_t test_row_num, ObIArray &data_write_ctxs, ObIArray &index_write_ctxs, ObMacroMetasArray *&meta_info_list, ObSSTableMergeRes &ctx, IndexTreeRootCtxList *&roots, ObSSTableIndexBuilder *&sstable_builder); void prepare_cg_data_desc(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder *sstable_builder); static const int64_t TEST_COLUMN_CNT = ObExtendType - 1; static const int64_t TEST_ROWKEY_COLUMN_CNT = 2; static const int64_t TEST_ROW_CNT = 1000; static const int64_t MAX_TEST_COLUMN_CNT = TEST_COLUMN_CNT + 3; static const int64_t SNAPSHOT_VERSION = 2; const uint64_t tenant_id_; ObTenantFreezeInfoMgr *mgr_; ObDecodeResourcePool *decode_res_pool_; ObTenantCompactionMemPool *mem_pool_; share::ObTenantBase tenant_base_; ObTableSchema table_schema_; ObTableSchema index_schema_; ObRowGenerate row_generate_; ObRowGenerate index_row_generate_; ObITable::TableKey table_key_; ObDatumRow row_; ObDatumRow multi_row_; ObArenaAllocator allocator_; ObSharedMacroBlockMgr *shared_blk_mgr_; ObTimerService *timer_service_; }; TestIndexTree::TestIndexTree() : tenant_id_(500), mgr_(nullptr), decode_res_pool_(nullptr), mem_pool_(nullptr), tenant_base_(500), shared_blk_mgr_(nullptr), timer_service_(nullptr) { ObAddr self; rpc::frame::ObReqTransport req_transport(NULL, NULL); obrpc::ObSrvRpcProxy rpc_proxy; obrpc::ObCommonRpcProxy rs_rpc_proxy; share::ObRsMgr rs_mgr; int64_t tenant_id = 1; self.set_ip_addr("127.0.0.1", 8086); getter.add_tenant(tenant_id, 2L * 1024L * 1024L * 1024L, 4L * 1024L * 1024L * 1024L); } TestIndexTree::~TestIndexTree() { } void TestIndexTree::SetUpTestCase() { int ret = OB_SUCCESS; STORAGE_LOG(INFO, "SetUpTestCase"); EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); } void TestIndexTree::TearDownTestCase() { MockTenantModuleEnv::get_instance().destroy(); } void TestIndexTree::SetUp() { ASSERT_TRUE(MockTenantModuleEnv::get_instance().is_inited()); int ret = OB_SUCCESS; timer_service_ = OB_NEW(ObTimerService, ObModIds::TEST, 500); mgr_ = OB_NEW(ObTenantFreezeInfoMgr, ObModIds::TEST); decode_res_pool_ = OB_NEW(ObDecodeResourcePool, ObModIds::TEST); shared_blk_mgr_ = OB_NEW(ObSharedMacroBlockMgr, ObModIds::TEST); mem_pool_ = OB_NEW(ObTenantCompactionMemPool, ObModIds::TEST); tenant_base_.set(timer_service_); tenant_base_.set(shared_blk_mgr_); tenant_base_.set(mgr_); tenant_base_.set(decode_res_pool_); tenant_base_.set(mem_pool_); share::ObTenantEnv::set_tenant(&tenant_base_); ASSERT_EQ(OB_SUCCESS, timer_service_->start()); ASSERT_EQ(OB_SUCCESS, tenant_base_.init()); ASSERT_EQ(OB_SUCCESS, mgr_->init(500, *GCTX.sql_proxy_)); ASSERT_EQ(OB_SUCCESS, decode_res_pool_->init()); ASSERT_EQ(OB_SUCCESS, mem_pool_->init()); ASSERT_EQ(OB_SUCCESS, shared_blk_mgr_->init()); fake_freeze_info(); ASSERT_EQ(timer_service_, MTL(ObTimerService *)); ASSERT_EQ(shared_blk_mgr_, MTL(ObSharedMacroBlockMgr *)); ASSERT_EQ(mgr_, MTL(ObTenantFreezeInfoMgr *)); ASSERT_EQ(decode_res_pool_, MTL(ObDecodeResourcePool *)); ASSERT_EQ(mem_pool_, MTL(ObTenantCompactionMemPool *)); int tmp_ret = OB_SUCCESS; prepare_schema(); row_generate_.reset(); index_row_generate_.reset(); ret = row_generate_.init(table_schema_, &allocator_); ASSERT_EQ(OB_SUCCESS, ret); ret = index_row_generate_.init(index_schema_, &allocator_); ASSERT_EQ(OB_SUCCESS, ret); table_key_.tablet_id_ = table_schema_.get_table_id(); table_key_.table_type_ = ObITable::TableType::MINOR_SSTABLE; table_key_.version_range_.snapshot_version_ = share::OB_MAX_SCN_TS_NS - 1; ASSERT_TRUE(table_key_.is_valid()); } void TestIndexTree::TearDown() { row_generate_.reset(); index_row_generate_.reset(); table_schema_.reset(); if (nullptr != timer_service_) { timer_service_->stop(); timer_service_->wait(); timer_service_->destroy(); timer_service_ = nullptr; } tenant_base_.destroy_mtl_module(); // stop threads tenant_base_.destroy(); share::ObTenantEnv::set_tenant(nullptr); } void TestIndexTree::fake_freeze_info() { common::ObArray snapshots; ASSERT_TRUE(nullptr != mgr_); share::ObFreezeInfoList &info_list = mgr_->freeze_info_mgr_.freeze_info_; info_list.reset(); share::SCN frozen_val; frozen_val.val_ = 1; ASSERT_EQ(OB_SUCCESS, info_list.frozen_statuses_.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); frozen_val.val_ = 100; ASSERT_EQ(OB_SUCCESS, info_list.frozen_statuses_.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); frozen_val.val_ = 200; ASSERT_EQ(OB_SUCCESS, info_list.frozen_statuses_.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); frozen_val.val_ = 400; ASSERT_EQ(OB_SUCCESS, info_list.frozen_statuses_.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); info_list.latest_snapshot_gc_scn_.val_ = 500; } static void convert_to_multi_version_row(const ObDatumRow &org_row, const int64_t schema_rowkey_cnt, const int64_t column_cnt, const int64_t snapshot_version, const ObDmlFlag dml_flag, ObDatumRow &multi_row) { for (int64_t i = 0; i < schema_rowkey_cnt; ++i) { multi_row.storage_datums_[i] = org_row.storage_datums_[i]; } multi_row.storage_datums_[schema_rowkey_cnt].set_int(-snapshot_version); multi_row.storage_datums_[schema_rowkey_cnt + 1].set_int(0); for(int64_t i = schema_rowkey_cnt; i < column_cnt; ++i) { multi_row.storage_datums_[i + 2] = org_row.storage_datums_[i]; } multi_row.count_ = column_cnt + 2; if (ObDmlFlag::DF_DELETE == dml_flag) { multi_row.row_flag_= ObDmlFlag::DF_DELETE; } else { multi_row.row_flag_ = ObDmlFlag::DF_INSERT; } multi_row.mvcc_row_flag_.set_last_multi_version_row(true); } void TestIndexTree::prepare_schema() { ObColumnSchemaV2 column; uint64_t table_id = 3001; int64_t micro_block_size = 16 * 1024; //init table schema table_schema_.reset(); index_schema_.reset(); ASSERT_EQ(OB_SUCCESS, table_schema_.set_table_name("test_index_tree")); ASSERT_EQ(OB_SUCCESS, index_schema_.set_table_name("test_index_tree")); table_schema_.set_tenant_id(1); table_schema_.set_tablegroup_id(1); table_schema_.set_database_id(1); table_schema_.set_table_id(table_id); table_schema_.set_rowkey_column_num(TEST_ROWKEY_COLUMN_CNT); table_schema_.set_max_used_column_id(TEST_COLUMN_CNT); table_schema_.set_block_size(micro_block_size); table_schema_.set_compress_func_name("none"); table_schema_.set_row_store_type(ENCODING_ROW_STORE); index_schema_.set_tenant_id(1); index_schema_.set_tablegroup_id(1); index_schema_.set_database_id(1); index_schema_.set_table_id(table_id); index_schema_.set_rowkey_column_num(TEST_ROWKEY_COLUMN_CNT); index_schema_.set_max_used_column_id(TEST_ROWKEY_COLUMN_CNT + 1); index_schema_.set_block_size(micro_block_size); index_schema_.set_compress_func_name("none"); index_schema_.set_row_store_type(ENCODING_ROW_STORE); //init column ObArray out_cols; const int64_t schema_version = 1; const int64_t rowkey_count = TEST_ROWKEY_COLUMN_CNT; const int64_t column_count = TEST_COLUMN_CNT; char name[OB_MAX_FILE_NAME_LENGTH]; memset(name, 0, sizeof(name)); for(int64_t i = 0; i < TEST_COLUMN_CNT; ++i){ ObObjType obj_type = static_cast(i + 1); column.reset(); column.set_table_id(table_id); column.set_column_id(i + OB_APP_MIN_COLUMN_ID); sprintf(name, "test%020ld", i); ASSERT_EQ(OB_SUCCESS, column.set_column_name(name)); column.set_data_type(obj_type); column.set_collation_type(CS_TYPE_BINARY); column.set_data_length(1); if(obj_type == common::ObInt32Type){ column.set_rowkey_position(1); column.set_order_in_rowkey(ObOrderType::ASC); } else if(obj_type == common::ObVarcharType) { column.set_rowkey_position(2); column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI); column.set_order_in_rowkey(ObOrderType::DESC); } else { column.set_rowkey_position(0); } share::schema::ObColDesc col; col.col_id_ = static_cast(i + OB_APP_MIN_COLUMN_ID); col.col_type_.set_type(obj_type); ASSERT_EQ(OB_SUCCESS, out_cols.push_back(col)); ASSERT_EQ(OB_SUCCESS, table_schema_.add_column(column)); if (column.is_rowkey_column()) { ASSERT_EQ(OB_SUCCESS, index_schema_.add_column(column)); } } column.reset(); column.set_table_id(table_id); column.set_column_id(TEST_COLUMN_CNT +OB_APP_MIN_COLUMN_ID); column.set_data_type(ObVarcharType); column.set_collation_type(CS_TYPE_BINARY); column.set_data_length(1); column.set_rowkey_position(0); ASSERT_EQ(OB_SUCCESS, index_schema_.add_column(column)); } void TestIndexTree::prepare_4K_cols_schema() { ObColumnSchemaV2 column; uint64_t table_id = 3001; int64_t micro_block_size = 16 * 1024; const int64_t test_rowkey_cnt = 1; const int64_t test_col_cnt = 4096; //init table schema table_schema_.reset(); index_schema_.reset(); ASSERT_EQ(OB_SUCCESS, table_schema_.set_table_name("test_index_tree")); ASSERT_EQ(OB_SUCCESS, index_schema_.set_table_name("test_index_tree")); table_schema_.set_tenant_id(1); table_schema_.set_tablegroup_id(1); table_schema_.set_database_id(1); table_schema_.set_table_id(table_id); table_schema_.set_rowkey_column_num(test_rowkey_cnt); table_schema_.set_max_used_column_id(test_col_cnt); table_schema_.set_block_size(micro_block_size); table_schema_.set_compress_func_name("none"); table_schema_.set_row_store_type(FLAT_ROW_STORE); index_schema_.set_tenant_id(1); index_schema_.set_tablegroup_id(1); index_schema_.set_database_id(1); index_schema_.set_table_id(table_id); index_schema_.set_rowkey_column_num(test_rowkey_cnt); index_schema_.set_max_used_column_id(test_col_cnt + 1); index_schema_.set_block_size(micro_block_size); index_schema_.set_compress_func_name("none"); index_schema_.set_row_store_type(FLAT_ROW_STORE); //init column ObArray out_cols; const int64_t schema_version = 1; const int64_t rowkey_count = test_rowkey_cnt; const int64_t column_count = test_col_cnt; char name[OB_MAX_FILE_NAME_LENGTH]; memset(name, 0, sizeof(name)); for(int64_t i = 0; i < test_col_cnt; ++i){ ObObjType obj_type = ObObjType::ObIntType; column.reset(); column.set_table_id(table_id); column.set_column_id(i + OB_APP_MIN_COLUMN_ID); sprintf(name, "test%020ld", i); ASSERT_EQ(OB_SUCCESS, column.set_column_name(name)); column.set_data_type(obj_type); column.set_collation_type(CS_TYPE_BINARY); column.set_data_length(1); if(0 == i){ column.set_rowkey_position(1); column.set_order_in_rowkey(ObOrderType::ASC); } else { column.set_rowkey_position(0); } share::schema::ObColDesc col; col.col_id_ = static_cast(i + OB_APP_MIN_COLUMN_ID); col.col_type_.set_type(obj_type); ASSERT_EQ(OB_SUCCESS, out_cols.push_back(col)); ASSERT_EQ(OB_SUCCESS, table_schema_.add_column(column)); if (column.is_rowkey_column()) { ASSERT_EQ(OB_SUCCESS, index_schema_.add_column(column)); } } column.reset(); column.set_table_id(table_id); column.set_column_id(test_col_cnt +OB_APP_MIN_COLUMN_ID); column.set_data_type(ObVarcharType); column.set_collation_type(CS_TYPE_BINARY); column.set_data_length(1); column.set_rowkey_position(0); ASSERT_EQ(OB_SUCCESS, index_schema_.add_column(column)); } void TestIndexTree::prepare_data() { int ret = OB_SUCCESS; //ObSSTable *data_sstable = NULL; ObWholeDataStoreDesc data_desc; ObMacroBlockWriter writer; ObRowGenerate data_row_generate; ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ret = data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, ObTimeUtility::fast_current_time()/*snapshot_version*/, DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0/*transfer_seq*/); ASSERT_EQ(OB_SUCCESS, ret); ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ret = writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner); ASSERT_EQ(OB_SUCCESS, ret); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); int64_t free_macro_block_cnt = OB_SERVER_BLOCK_MGR.get_free_macro_block_count(); int64_t row_id = 0; while (free_macro_block_cnt - OB_SERVER_BLOCK_MGR.get_free_macro_block_count() > 0) { ret = row_generate_.get_next_row(row_id, row); ASSERT_EQ(OB_SUCCESS, ret); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ret = writer.append_row(multi_row); ASSERT_EQ(OB_SUCCESS, ret) << "rowkey cnt: " << table_schema_.get_rowkey_column_num() << "column cnt: " << table_schema_.get_column_count(); ++row_id; } int64_t border_row_id = row_id; row_id += 10000; free_macro_block_cnt = OB_SERVER_BLOCK_MGR.get_free_macro_block_count(); while (free_macro_block_cnt - OB_SERVER_BLOCK_MGR.get_free_macro_block_count() > 0) { ret = row_generate_.get_next_row(row_id, row); ASSERT_EQ(OB_SUCCESS, ret); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ret = writer.append_row(multi_row); ASSERT_EQ(OB_SUCCESS, ret) << "rowkey cnt: " << table_schema_.get_rowkey_column_num() << "column cnt: " << table_schema_.get_column_count(); ++row_id; } ret = writer.close(); ASSERT_EQ(OB_SUCCESS, ret); writer.reset(); ObSSTablePrivateObjectCleaner another_cleaner; ret = writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, another_cleaner); ASSERT_EQ(OB_SUCCESS, ret); for (int64_t i = border_row_id + 1; i < border_row_id + 10000; ++i) { ret = row_generate_.get_next_row(i, row); ASSERT_EQ(OB_SUCCESS, ret); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ret = writer.append_row(multi_row); ASSERT_EQ(OB_SUCCESS, ret) << "rowkey cnt: " << table_schema_.get_rowkey_column_num() << "column cnt: " << table_schema_.get_column_count(); ASSERT_EQ(OB_SUCCESS, ret); } ret = writer.close(); ASSERT_EQ(OB_SUCCESS, ret); } class TestIndexTreeStress : public share::ObThreadPool { public: TestIndexTreeStress(); virtual ~TestIndexTreeStress() {} int init(uint64_t tenant_id, const int64_t thread_cnt, ObRowGenerate &row_generate, ObWholeDataStoreDesc &data_store_desc, ObMacroDataSeq &start_seq, ObSSTableSecMetaIterator *lob_iter, ObMacroBlockWriter *index_writer = NULL); virtual void run1(); private: uint64_t tenant_id_; int64_t thread_cnt_; int64_t row_count_; ObArenaAllocator allocator_; ObRowGenerate *row_generate_; ObWholeDataStoreDesc *data_store_desc_; ObMacroDataSeq *start_seq_; ObSSTableSecMetaIterator *lob_iter_; ObMacroBlockWriter *index_writer_; ObSSTableIndexBuilder *sstable_builder_; common::SpinRWLock lock_; share::ObTenantBase tenant_base_; ObDecodeResourcePool *decode_res_pool_; }; TestIndexTreeStress::TestIndexTreeStress() : tenant_id_(0), thread_cnt_(0), row_count_(0), allocator_(), row_generate_(NULL), data_store_desc_(NULL), start_seq_(NULL), lob_iter_(NULL), index_writer_(NULL), sstable_builder_(NULL), lock_(), tenant_base_(500), decode_res_pool_(nullptr) { } int TestIndexTreeStress::init(uint64_t tenant_id, const int64_t thread_cnt, ObRowGenerate &row_generate, ObWholeDataStoreDesc &data_store_desc, ObMacroDataSeq &start_seq, ObSSTableSecMetaIterator *lob_iter, ObMacroBlockWriter *index_writer) { int ret = OB_SUCCESS; if (thread_cnt < 0) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), K(thread_cnt)); } else { tenant_id_ = tenant_id; thread_cnt_ = thread_cnt; row_count_ = 0; row_generate_ = &row_generate; data_store_desc_ = &data_store_desc; start_seq_ = &start_seq; lob_iter_ = lob_iter; index_writer_ = index_writer; set_thread_count(static_cast(thread_cnt)); decode_res_pool_ = new(allocator_.alloc(sizeof(ObDecodeResourcePool))) ObDecodeResourcePool; tenant_base_.set(decode_res_pool_); share::ObTenantEnv::set_tenant(&tenant_base_); tenant_base_.init(); decode_res_pool_->init(); } return ret; } void TestIndexTreeStress::run1() { int ret = OB_SUCCESS; ObTenantEnv::set_tenant(&tenant_base_); ObMacroBlockWriter *macro_writer = new ObMacroBlockWriter(); //{ SpinWLockGuard guard(lock_); start_seq_->set_parallel_degree(get_thread_idx() + 2); ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = start_seq_->macro_data_seq_; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ret = macro_writer->open(data_store_desc_->get_desc(), start_seq_->get_parallel_idx(), seq_param, pre_warm_param, cleaner); //} ASSERT_EQ(OB_SUCCESS, ret); static const int64_t MAX_TEST_COLUMN_CNT = TestIndexTree::TEST_COLUMN_CNT + 3; ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; int64_t test_row_num = row_count_ > 0 ? row_count_ : TestIndexTree::TEST_ROW_CNT; ObDatumRow row; ObArenaAllocator allocator; ASSERT_EQ(OB_SUCCESS, row.init(allocator, TestIndexTree::TEST_COLUMN_CNT)); for(int64_t i = 0; i < test_row_num; ++i) { ASSERT_EQ(OB_SUCCESS, row_generate_->get_next_row(row)); convert_to_multi_version_row(row, TestIndexTree::TEST_ROWKEY_COLUMN_CNT, TestIndexTree::TEST_COLUMN_CNT, 2, dml, multi_row); ASSERT_EQ(OB_SUCCESS, macro_writer->append_row(multi_row)); } ASSERT_EQ(OB_SUCCESS, macro_writer->close()); delete macro_writer; } void TestIndexTree::prepare_index_desc( const ObWholeDataStoreDesc &data_desc, ObWholeDataStoreDesc &index_desc) { int ret = OB_SUCCESS; ret = index_desc.gen_index_store_desc(data_desc.get_desc()); ASSERT_EQ(OB_SUCCESS, ret); } void TestIndexTree::prepare_index_builder(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder &sstable_builder) { int ret = OB_SUCCESS; prepare_data_desc(data_desc, &sstable_builder); ret = sstable_builder.init(data_desc.get_desc()); ASSERT_EQ(OB_SUCCESS, ret); } void TestIndexTree::prepare_data_desc(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder *sstable_builder) { int ret = OB_SUCCESS; ret = data_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, ObTimeUtility::fast_current_time()/*snapshot_version*/, DATA_CURRENT_VERSION, table_schema_.get_micro_index_clustered(), 0/*transfer_seq*/); data_desc.get_desc().sstable_index_builder_ = sstable_builder; ASSERT_EQ(OB_SUCCESS, ret); } void TestIndexTree::prepare_cg_data_desc(ObWholeDataStoreDesc &data_desc, ObSSTableIndexBuilder *sstable_builder) { uint16_t cg_cols[1]; ObWholeDataStoreDesc desc; share::SCN scn; scn.convert_for_tx(SNAPSHOT_VERSION); const bool is_ddl = false; ASSERT_EQ(OB_SUCCESS, desc.init(is_ddl, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, SNAPSHOT_VERSION, DATA_CURRENT_VERSION, false/*micro_index_clustered*/, 0/*transfer_seq*/)); ObIArray &col_descs = desc.get_desc().col_desc_->col_desc_array_; for (int64_t i = 0; i < col_descs.count(); ++i) { if (col_descs.at(i).col_type_.type_ == ObIntType) { cg_cols[0] = i; } } storage::ObStorageColumnGroupSchema cg_schema; cg_schema.type_ = share::schema::SINGLE_COLUMN_GROUP; cg_schema.compressor_type_ = table_schema_.get_compressor_type(); cg_schema.row_store_type_ = table_schema_.get_row_store_type(); cg_schema.block_size_ = table_schema_.get_block_size(); cg_schema.column_cnt_ = 1; cg_schema.schema_column_cnt_ = table_schema_.get_column_count(); cg_schema.column_idxs_ = cg_cols; ASSERT_EQ(OB_SUCCESS, data_desc.init(is_ddl, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, SNAPSHOT_VERSION, DATA_CURRENT_VERSION, false/*micro_index_clustered*/, 0/*transfer_seq*/, scn, &cg_schema, 0)); data_desc.get_desc().static_desc_->schema_version_ = 10; data_desc.get_desc().sstable_index_builder_ = sstable_builder; } void TestIndexTree::mock_compaction(const int64_t test_row_num, ObIArray &data_write_ctxs, ObIArray &index_write_ctxs, ObMacroMetasArray *&meta_info_list, ObSSTableMergeRes &res, IndexTreeRootCtxList *&roots, ObSSTableIndexBuilder *&sstable_builder) { int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; char *meta_list_buf = static_cast (allocator_.alloc(sizeof(ObMacroMetasArray))); ASSERT_NE(nullptr, meta_list_buf); meta_info_list = new (meta_list_buf) ObMacroMetasArray(); char *buf = static_cast (allocator_.alloc(sizeof(ObSSTableIndexBuilder))); ASSERT_NE(nullptr, buf); sstable_builder = new (buf) ObSSTableIndexBuilder(false /* not need writer buffer*/); prepare_index_builder(data_desc, *sstable_builder); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); for(int64_t i = 0; i < test_row_num; ++i){ ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); ASSERT_EQ(OB_SUCCESS, data_writer.build_micro_block()); ASSERT_EQ(OB_SUCCESS, data_writer.try_switch_macro_block()); ObDataMacroBlockMeta *dst_macro_meta = nullptr; ASSERT_EQ(OB_SUCCESS, data_writer.builder_->macro_meta_.deep_copy(dst_macro_meta, allocator_)); ASSERT_EQ(OB_SUCCESS, meta_info_list->push_back(dst_macro_meta)); } ObMacroBlock &fir_blk = data_writer.macro_blocks_[0]; ObMacroBlock &sec_blk = data_writer.macro_blocks_[1]; ASSERT_EQ(fir_blk.original_size_, fir_blk.data_zsize_); ASSERT_EQ(sec_blk.original_size_, sec_blk.data_zsize_); ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_EQ(OB_SUCCESS, sstable_builder->close(res)); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(true, res.table_backup_flag_.has_local()); ObSSTableMergeRes tmp_res; ASSERT_EQ(OB_ERR_UNEXPECTED, data_writer.close()); // not re-entrant OK(sstable_builder->close(tmp_res)); // re-entrant ASSERT_EQ(tmp_res.root_desc_.buf_, res.root_desc_.buf_); ASSERT_EQ(tmp_res.data_root_desc_.buf_, res.data_root_desc_.buf_); ASSERT_EQ(tmp_res.data_blocks_cnt_, res.data_blocks_cnt_); ASSERT_EQ(false, tmp_res.table_backup_flag_.has_backup()); ASSERT_EQ(true, tmp_res.table_backup_flag_.has_local()); OK(data_write_ctxs.push_back(sstable_builder->roots_[0]->data_write_ctx_)); roots = &(sstable_builder->roots_); } int TestIndexTree::mock_init_backup_rebuilder(ObIndexBlockRebuilder &rebuilder, ObSSTableIndexBuilder &sstable_builder) { int ret = OB_SUCCESS; ObDataStoreDesc *data_store_desc = nullptr; ObDataStoreDesc *container_store_desc = nullptr; ObDataStoreDesc *leaf_store_desc = nullptr; //unused ObSEArray device_handle_array; if (OB_UNLIKELY(rebuilder.is_inited_)) { ret = OB_INIT_TWICE; STORAGE_LOG(WARN, "ObIndexBlockRebuilder has been inited", K(ret)); } else if (OB_FAIL(sstable_builder.init_builder_ptrs(rebuilder.sstable_builder_, data_store_desc, rebuilder.index_store_desc_, leaf_store_desc, container_store_desc, rebuilder.index_tree_root_ctx_))) { STORAGE_LOG(WARN, "fail to init referemce pointer members", K(ret)); } else if (OB_FAIL(rebuilder.meta_store_desc_.shallow_copy(*rebuilder.index_store_desc_))) { STORAGE_LOG(WARN, "fail to assign leaf store desc", K(ret), KPC(rebuilder.index_store_desc_)); } else if (rebuilder.meta_row_.init(rebuilder.task_allocator_, container_store_desc->get_row_column_count())) { STORAGE_LOG(WARN, "fail to init meta row", K(ret), K(container_store_desc->get_row_column_count())); } else if (FALSE_IT(rebuilder.set_task_type(rebuilder.index_store_desc_->is_cg(), false, &device_handle_array))) { // device_handle_array size must be 2, and the 1st one is index tree, the 2nd one is meta tree } else if (OB_UNLIKELY(rebuilder.micro_index_clustered())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "micro_cluster_index is not support for backup task", K(ret), K(rebuilder.index_tree_root_ctx_->task_type_)); } else if (OB_ISNULL(rebuilder.meta_tree_dumper_ = OB_NEWx(ObBaseIndexBlockDumper, &rebuilder.task_allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "fail to alloc meta tree dumper for rebuilder", K(ret)); } else if (OB_FAIL(rebuilder.meta_tree_dumper_->init( *rebuilder.index_store_desc_, *container_store_desc, rebuilder.index_store_desc_->sstable_index_builder_, *rebuilder.index_tree_root_ctx_->allocator_, rebuilder.task_allocator_, true, rebuilder.sstable_builder_->enable_dump_disk(), nullptr))) { STORAGE_LOG(WARN, "fail to init meta tree dumper", K(ret)); } if (OB_SUCC(ret) && rebuilder.index_tree_root_ctx_->is_backup_task() && rebuilder.sstable_builder_->enable_dump_disk()) { if (OB_ISNULL(rebuilder.index_tree_dumper_ = OB_NEWx(ObIndexTreeBlockDumper, &rebuilder.task_allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "fail to alloc index tree dumper for rebuilder", K(ret)); } else if (OB_FAIL(rebuilder.index_tree_dumper_->init( *data_store_desc, *rebuilder.index_store_desc_, data_store_desc->sstable_index_builder_, *container_store_desc, *rebuilder.index_tree_root_ctx_->allocator_, rebuilder.task_allocator_, true, rebuilder.sstable_builder_->enable_dump_disk(), nullptr))) { STORAGE_LOG(WARN, "fail to init index tree dumper", K(ret)); } else if (OB_ISNULL(rebuilder.index_tree_root_ctx_->data_blocks_info_ = OB_NEWx(ObDataBlockInfo, rebuilder.index_tree_root_ctx_->allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "fail to alloc data blocks info for root ctx", K(ret)); } else if (OB_FAIL(rebuilder.index_tree_root_ctx_->data_blocks_info_->data_column_checksums_.reserve(rebuilder.index_store_desc_->get_full_stored_col_cnt()))) { STORAGE_LOG(WARN, "fail to reserve data column checksums", K(ret), "column count", rebuilder.index_store_desc_->get_full_stored_col_cnt()); } else { rebuilder.index_tree_dumper_->need_build_next_row_ = true; rebuilder.meta_tree_dumper_->need_build_next_row_ = true; rebuilder.index_tree_root_ctx_->data_blocks_info_->data_column_cnt_ = rebuilder.index_store_desc_->get_full_stored_col_cnt(); for (int64_t i = 0; OB_SUCC(ret) && i < rebuilder.index_store_desc_->get_full_stored_col_cnt(); i++) { if (OB_FAIL(rebuilder.index_tree_root_ctx_->data_blocks_info_->data_column_checksums_.push_back(0))) { STORAGE_LOG(WARN, "failed to push column checksum", K(ret)); } } STORAGE_LOG(DEBUG, "print data blocks_info", K(rebuilder.index_tree_root_ctx_->data_blocks_info_->data_column_checksums_.capacity_), K(rebuilder.index_tree_root_ctx_)); } } if (OB_FAIL(ret)) { if (OB_NOT_NULL(rebuilder.index_tree_root_ctx_->data_blocks_info_)) { rebuilder.index_tree_root_ctx_->data_blocks_info_->~ObDataBlockInfo(); rebuilder.index_tree_root_ctx_->allocator_->free(rebuilder.index_tree_root_ctx_->data_blocks_info_); rebuilder.index_tree_root_ctx_->data_blocks_info_ = nullptr; } if (OB_NOT_NULL(rebuilder.index_tree_dumper_)) { rebuilder.index_tree_dumper_->~ObIndexTreeBlockDumper(); rebuilder.task_allocator_.free(rebuilder.index_tree_dumper_); rebuilder.index_tree_dumper_ = nullptr; } if (OB_NOT_NULL(rebuilder.meta_tree_dumper_)) { rebuilder.meta_tree_dumper_->~ObBaseIndexBlockDumper(); rebuilder.task_allocator_.free(rebuilder.meta_tree_dumper_); rebuilder.meta_tree_dumper_ = nullptr; } } if (OB_FAIL(ret)) { } else { STORAGE_LOG(INFO, "succeed to init ObIndexBlockRebuilder", "task_idx", rebuilder.index_tree_root_ctx_->task_idx_, "task_type", rebuilder.index_tree_root_ctx_->task_type_, KP(rebuilder.device_handle_), KPC(rebuilder.index_store_desc_), KPC(container_store_desc)); rebuilder.is_inited_ = true; rebuilder.compressor_type_ = rebuilder.index_store_desc_->get_compressor_type(); rebuilder.device_handle_ = nullptr; } return ret; } void TestIndexTree::mock_cg_compaction(const int64_t test_row_num, ObIArray &data_write_ctxs, ObIArray &index_write_ctxs, ObMacroMetasArray *&meta_info_list, ObSSTableMergeRes &res, IndexTreeRootCtxList *&roots, ObSSTableIndexBuilder *&sstable_builder) { int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; char *meta_list_buf = static_cast (allocator_.alloc(sizeof(ObMacroMetasArray))); ASSERT_NE(nullptr, meta_list_buf); meta_info_list = new (meta_list_buf) ObMacroMetasArray(); char *buf = static_cast (allocator_.alloc(sizeof(ObSSTableIndexBuilder))); ASSERT_NE(nullptr, buf); sstable_builder = new (buf) ObSSTableIndexBuilder(false/*use_double_write_buffer*/); prepare_cg_data_desc(data_desc, sstable_builder); ASSERT_EQ(OB_SUCCESS, sstable_builder->init(data_desc.get_desc())); ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObMacroBlockWriter data_writer; ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); ObDatumRow cg_row; ASSERT_EQ(OB_SUCCESS, cg_row.init(1)); for(int64_t i = 0; i < test_row_num; ++i){ cg_row.storage_datums_[0].set_int(i); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(cg_row)); ASSERT_EQ(OB_SUCCESS, data_writer.build_micro_block()); ASSERT_EQ(OB_SUCCESS, data_writer.try_switch_macro_block()); ObDataMacroBlockMeta *dst_macro_meta = nullptr; ASSERT_EQ(OB_SUCCESS, data_writer.builder_->macro_meta_.deep_copy(dst_macro_meta, allocator_)); ASSERT_EQ(OB_SUCCESS, meta_info_list->push_back(dst_macro_meta)); } ObMacroBlock &fir_blk = data_writer.macro_blocks_[0]; ObMacroBlock &sec_blk = data_writer.macro_blocks_[1]; ASSERT_EQ(fir_blk.original_size_, fir_blk.data_zsize_); ASSERT_EQ(sec_blk.original_size_, sec_blk.data_zsize_); ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_EQ(OB_SUCCESS, sstable_builder->close(res)); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(true, res.table_backup_flag_.has_local()); ObSSTableMergeRes tmp_res; ASSERT_EQ(OB_ERR_UNEXPECTED, data_writer.close()); // not re-entrant OK(sstable_builder->close(tmp_res)); // re-entrant ASSERT_EQ(tmp_res.root_desc_.buf_, res.root_desc_.buf_); ASSERT_EQ(tmp_res.data_root_desc_.buf_, res.data_root_desc_.buf_); ASSERT_EQ(tmp_res.data_blocks_cnt_, res.data_blocks_cnt_); ASSERT_EQ(false, tmp_res.table_backup_flag_.has_backup()); ASSERT_EQ(true, tmp_res.table_backup_flag_.has_local()); OK(data_write_ctxs.push_back(sstable_builder->roots_[0]->data_write_ctx_)); roots = &(sstable_builder->roots_); } TEST_F(TestIndexTree, test_macro_id_index_block) { LOG_INFO("BEGIN TestIndexTree.test_macro_id_index_block"); int ret = OB_SUCCESS; ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc data_desc_small; data_desc_small.get_static_desc().micro_block_size_limit_ = 1 * 1024L; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_index_builder(data_desc_small, sstable_builder); ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder); int64_t num = 100; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = num; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); ObDataIndexBlockBuilder *builder = data_writer.builder_; ASSERT_NE(data_writer.builder_, nullptr); ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(0, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); ASSERT_EQ(OB_SUCCESS, data_writer.build_micro_block()); const MacroBlockId first_macro_id = data_writer.macro_handles_[0].get_macro_id(); ASSERT_TRUE(first_macro_id.is_valid()); ASSERT_EQ(OB_SUCCESS, data_writer.close()); ObDataMacroBlockMeta macro_meta; ObIndexBlockLoader index_block_loader; ObDatumRow meta_row; OK(index_block_loader.init(allocator_, CLUSTER_CURRENT_VERSION)); OK(index_block_loader.open(sstable_builder.roots_[0]->meta_block_info_)); OK(meta_row.init(allocator_, sstable_builder.container_store_desc_.get_row_column_count())); OK(index_block_loader.get_next_row(meta_row)); OK(macro_meta.parse_row(meta_row)); ASSERT_EQ(macro_meta.val_.macro_id_, first_macro_id); // read macro block ObMacroBlockReadInfo read_info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; read_info.macro_block_id_ = first_macro_id; read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); read_info.offset_ = 0; read_info.size_ = macro_block_size; read_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; ASSERT_NE(nullptr, read_info.buf_ = reinterpret_cast(allocator_.alloc(read_info.size_))); OK(ObBlockManager::read_block(read_info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); const char *macro_block_buf = macro_handle.get_buffer(); int64_t pos = sizeof(ObMacroBlockCommonHeader); ObSSTableMacroBlockHeader macro_header; OK(macro_header.deserialize(macro_block_buf, macro_block_size, pos)); ASSERT_EQ(macro_header.fixed_header_.idx_block_offset_, macro_meta.val_.block_offset_); // n-1 level block offset ASSERT_EQ(macro_header.fixed_header_.idx_block_size_, macro_meta.val_.block_size_); // n-1 level block size int64_t size = macro_meta.val_.block_size_; const char *leaf_block_buf = macro_block_buf + macro_meta.val_.block_offset_; int64_t payload_size = 0; const char *payload_buf = nullptr; ObMicroBlockHeader header; pos = 0; OK(header.deserialize(leaf_block_buf, size, pos)); OK(header.check_and_get_record( leaf_block_buf, size, MICRO_BLOCK_HEADER_MAGIC, payload_buf, payload_size)); ObMicroBlockData root_block(payload_buf, payload_size); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder.index_store_desc_.get_desc().row_store_type_, micro_reader)); OK(micro_reader->init(root_block, nullptr)); ObDatumRow index_row; OK(index_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); STORAGE_LOG(INFO, "macro block id", K(index_row)); void *buf = allocator_.alloc(common::OB_ROW_MAX_COLUMNS_COUNT * sizeof(common::ObObj)); int64_t row_count = micro_reader->row_count(); ASSERT_EQ(row_count, 1); for (int64_t it = 0; it != row_count; ++it) { OK(micro_reader->get_row(it, index_row)); ObString val = index_row.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObIndexBlockRowHeader *header = reinterpret_cast(val.ptr()); ASSERT_EQ(header->get_macro_id(), ObIndexBlockRowHeader::DEFAULT_IDX_ROW_MACRO_ID); } LOG_INFO("FINISH TestIndexTree.test_macro_id_index_block"); } TEST_F(TestIndexTree, test_macro_writer_bug1) { LOG_INFO("BEGIN TestIndexTree.test_macro_writer_bug1"); int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, nullptr); ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); OK(row_generate_.get_next_row(0, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); // force split // failed to append fake_block, but succeeded to switch block again ObMacroBlockDesc fake_block; // TODO(baichangmin): replace nullptr later. ASSERT_EQ(OB_INVALID_ARGUMENT, data_writer.append_macro_block(fake_block, nullptr)); OK(row_generate_.get_next_row(1, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.close()); LOG_INFO("FINISH TestIndexTree.test_macro_writer_bug1"); } TEST_F(TestIndexTree, test_index_macro_writer) { LOG_INFO("BEGIN TestIndexTree.test_index_macro_writer"); int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_index_builder(data_desc, sstable_builder); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObDataStoreDesc &desc = sstable_builder.container_store_desc_; ASSERT_EQ(nullptr, desc.sstable_index_builder_); ObMacroBlockWriter container_macro_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ret = container_macro_writer.open(desc, 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner); ASSERT_EQ(OB_SUCCESS, ret); const int64_t test_row_num = 100; ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 1)); for(int64_t i = 0; i < test_row_num; ++i){ ASSERT_EQ(OB_SUCCESS, index_row_generate_.get_next_row(row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), TEST_ROWKEY_COLUMN_CNT + 1, SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, container_macro_writer.append_row(multi_row)); } ASSERT_EQ(OB_SUCCESS, container_macro_writer.close()); LOG_INFO("FINISH TestIndexTree.test_index_macro_writer"); } TEST_F(TestIndexTree, test_empty_index_tree) { LOG_INFO("BEGIN TestIndexTree.test_empty_index_tree"); int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer */); prepare_index_builder(data_desc, sstable_builder); prepare_data_desc(data_desc, &sstable_builder); ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); // do not insert any data ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_EQ(1, sstable_builder.roots_.count()); ObSSTableMergeRes res; ret = sstable_builder.close(res); ASSERT_EQ(0, sstable_builder.roots_.count()); ASSERT_EQ(OB_SUCCESS, ret); ASSERT_TRUE(res.root_desc_.is_empty()); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(false, res.table_backup_flag_.has_local()); // test rebuild macro blocks ObSSTablePrivateObjectCleaner cleaner2; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner2)); // do not insert any data ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_EQ(1, sstable_builder.roots_.count()); ObSSTableIndexBuilder::ObMacroMetaIter macro_iter; sstable_builder.init_meta_iter(allocator_, macro_iter); ASSERT_EQ(0, sstable_builder.roots_.count()); ret = sstable_builder.close(res); ASSERT_EQ(OB_SUCCESS, ret); ASSERT_TRUE(res.root_desc_.is_empty()); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(false, res.table_backup_flag_.has_local()); // test easy index tree ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; { // mock sstable builder release before index_builder and rebuilder ObMacroBlockWriter data_writer; ObIndexBlockRebuilder rebuilder; { ObWholeDataStoreDesc other_data_desc; ObSSTableIndexBuilder other_sstable_builder(false /* not need writer buffer*/); prepare_index_builder(other_data_desc, other_sstable_builder); prepare_data_desc(other_data_desc, &other_sstable_builder); OK(rebuilder.init(other_sstable_builder, nullptr, ObITable::TableKey())); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(other_data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); for(int64_t i = 0; i < 10; ++i) { ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); } OK(data_writer.close()); } // release other_sstable_builder data_writer.reset(); rebuilder.reset(); } LOG_INFO("FINISH TestIndexTree.test_empty_index_tree"); } TEST_F(TestIndexTree, test_accumulative_info) { LOG_INFO("BEGIN TestIndexTree.test_accumulative_info"); int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, nullptr); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); ObMacroBlockWriter index_macro_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, index_macro_writer.open(index_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObBaseIndexBlockBuilder builder; ret = builder.init(data_desc.get_desc(), index_desc.get_desc(), allocator_, &index_macro_writer, 0); ASSERT_EQ(OB_SUCCESS, ret); ObIMicroBlockWriter *micro_writer = builder.micro_writer_; const int64_t test_row_num = 100; const int64_t round_row_num = 10; // 10 rows one micro block const int64_t len = 128; ObStorageDatum obj[4]; obj[0].set_int32(0); obj[1].set_string(ObString(1, "1")); obj[2].set_int(-2); obj[3].set_int(0); ObDatumRowkey row_key; row_key.assign(obj, 4); ObIndexBlockRowDesc index_row(index_desc.get_desc()); index_row.row_count_ = 1; index_row.micro_block_count_ = 1; index_row.macro_block_count_ = 1; index_row.row_key_ = row_key; for(int64_t i = 0; i < test_row_num; ++i) { ASSERT_EQ(OB_SUCCESS, builder.append_row(index_row)); if (micro_writer->get_row_count() == round_row_num) { ASSERT_EQ(round_row_num, builder.index_block_aggregator_.get_row_count()); ASSERT_EQ(OB_SUCCESS, builder.append_index_micro_block()); ASSERT_EQ(0, builder.index_block_aggregator_.get_row_count()); } } ObBaseIndexBlockBuilder *next_builder = builder.next_level_builder_; ASSERT_NE(next_builder, nullptr); ASSERT_EQ(test_row_num, next_builder->index_block_aggregator_.get_row_count()); const int64_t row_cnt = test_row_num / round_row_num; ASSERT_EQ(OB_SUCCESS, index_macro_writer.try_switch_macro_block()); const int64_t cur_idx = index_macro_writer.current_index_; ObMacroBlock &cur_macro_block = index_macro_writer.macro_blocks_[cur_idx]; ObSSTableMacroBlockHeader &header = cur_macro_block.macro_header_; ASSERT_EQ(OB_SUCCESS, next_builder->append_index_micro_block()); ASSERT_EQ(1, header.fixed_header_.micro_block_count_); ASSERT_EQ(row_cnt, header.fixed_header_.row_count_); ObBaseIndexBlockBuilder *next_next_builder = next_builder->next_level_builder_; ASSERT_NE(next_next_builder, nullptr); ASSERT_EQ(test_row_num, next_next_builder->index_block_aggregator_.get_row_count()); ASSERT_EQ(test_row_num, next_next_builder->index_block_aggregator_.aggregate_info_.micro_block_count_); ASSERT_EQ(test_row_num, next_next_builder->index_block_aggregator_.aggregate_info_.macro_block_count_); } TEST_F(TestIndexTree, test_multi_writers_with_close) { LOG_INFO("BEGIN TestIndexTree.test_multi_writers_with_close"); int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_index_builder(data_desc, sstable_builder); ObMacroDataSeq data_seq(0); TestIndexTreeStress stress; int thread_ct = 3; stress.init(tenant_id_, thread_ct, row_generate_, data_desc, data_seq, nullptr); ASSERT_EQ(OB_SUCCESS, ret); stress.start(); stress.wait(); ASSERT_EQ(OB_SUCCESS, ret); // test sstable_builder re-entrant 4005 bug ObSSTableMergeRes res; OK(sstable_builder.index_builder_.init( sstable_builder.data_store_desc_.get_desc(), sstable_builder.index_store_desc_.get_desc(), sstable_builder.self_allocator_, &sstable_builder.macro_writer_, 1)); ASSERT_GT(sstable_builder.self_allocator_.used(), 0); res.reset(); sstable_builder.clean_status(); OK(sstable_builder.trim_empty_roots()); OK(sstable_builder.sort_roots()); OK(sstable_builder.index_block_loader_.init(sstable_builder.self_allocator_, CLUSTER_CURRENT_VERSION)); OK(res.prepare_column_checksum_array(sstable_builder.index_store_desc_.get_desc().get_full_stored_col_cnt())); int64_t tmp_seq = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); OK(sstable_builder.merge_index_tree(pre_warm_param, res, tmp_seq, nullptr)); res.reset(); sstable_builder.index_block_loader_.reset(); OK(sstable_builder.close(res)); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(true, res.table_backup_flag_.has_local()); ObIndexTreeRootBlockDesc &root_desc = res.root_desc_; ASSERT_TRUE(root_desc.is_valid()); ASSERT_TRUE(root_desc.is_mem_type()); char *root_buf = root_desc.buf_; int64_t root_size = root_desc.addr_.size_; ObMicroBlockData root_block(root_buf, root_size); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder.index_store_desc_.get_desc().row_store_type_, micro_reader)); ret = micro_reader->init(root_block, nullptr); ASSERT_EQ(OB_SUCCESS, ret); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); void *buf = allocator_.alloc(common::OB_ROW_MAX_COLUMNS_COUNT * sizeof(common::ObObj)); int64_t row_count = micro_reader->row_count(); int64_t total_row_cnt = 0; for (int64_t it = 0; it != row_count; ++it) { ret = micro_reader->get_row(it, row); ASSERT_EQ(OB_SUCCESS, ret); ObString val = row.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObIndexBlockRowHeader *header = reinterpret_cast(val.ptr()); total_row_cnt += header->row_count_; } int32_t last_rowkey_int = row.storage_datums_[0].get_int32(); int64_t test_row_num = TestIndexTree::TEST_ROW_CNT; ASSERT_EQ(thread_ct * test_row_num - 1, last_rowkey_int); ASSERT_EQ(total_row_cnt, test_row_num * thread_ct); LOG_INFO("FINISH TestIndexTree.test_multi_writers_with_close"); } //===================== test secondary meta ================ TEST_F(TestIndexTree, test_merge_info_build_row) { LOG_INFO("BEGIN TestIndexTree.test_merge_info_build_row"); int ret = OB_SUCCESS; const int64_t test_row_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); const int64_t rowkey_cnt = TEST_ROWKEY_COLUMN_CNT; blocksstable::ObDatumRow datum_row; ASSERT_EQ(OB_SUCCESS, datum_row.init(allocator_, rowkey_cnt + 3)); for (int64_t i = 0; nullptr != merge_info_list && i < merge_info_list->count(); ++i) { ObDataMacroBlockMeta *info = merge_info_list->at(i); ASSERT_NE(nullptr, info); ObDataBlockMetaVal &info_val = info->val_; ASSERT_TRUE(info->is_valid()); ASSERT_EQ(OB_SUCCESS, info->build_row(datum_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(i, datum_row.storage_datums_[0].get_int()); ObDataMacroBlockMeta parsed_meta; ASSERT_EQ(OB_SUCCESS, parsed_meta.parse_row(datum_row)); ASSERT_TRUE(parsed_meta.is_valid()); ASSERT_EQ(info_val.column_count_, parsed_meta.val_.column_count_); for (int64_t i = 0; i < info_val.column_count_; ++i) { ASSERT_EQ(info_val.column_checksums_[i], parsed_meta.val_.column_checksums_[i]); } } // test deep_copy of macro_meta with given allocator ObArenaAllocator arena_allocator; ObFIFOAllocator safe_allocator; OK(safe_allocator.init(&arena_allocator, OB_MALLOC_BIG_BLOCK_SIZE, ObMemAttr(OB_SERVER_TENANT_ID, ObNewModIds::TEST))); if (NULL != merge_info_list && NULL != merge_info_list->at(0)) { ObDataMacroBlockMeta *copy_meta = nullptr; ObDataMacroBlockMeta &large_meta = *merge_info_list->at(0); int64_t test_col_cnt = 100; for (int64_t i = 0; i < test_col_cnt; ++i) { OK(large_meta.val_.column_checksums_.push_back(i)); } ASSERT_EQ(safe_allocator.current_using_, nullptr); ASSERT_EQ(safe_allocator.normal_used_, 0); OK(large_meta.deep_copy(copy_meta, safe_allocator)); ASSERT_NE(safe_allocator.current_using_, nullptr); safe_allocator.free(copy_meta); copy_meta->~ObDataMacroBlockMeta(); const int64_t end_key_deep_copy_size = 64; // due to end_key::reset by memset ASSERT_EQ(safe_allocator.normal_used_, end_key_deep_copy_size); } sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_meta_builder) { LOG_INFO("BEGIN TestIndexTree.test_meta_builder"); int ret = OB_SUCCESS; const int64_t test_row_num = 100; // do not run small stt ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ASSERT_TRUE(res.data_root_desc_.is_valid()); ASSERT_TRUE(res.data_root_desc_.is_mem_type()); int64_t root_size = res.data_root_desc_.addr_.size_; char *root_buf = res.data_root_desc_.buf_; ObMicroBlockData root_block(root_buf, root_size); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sst_builder->index_store_desc_.get_desc().row_store_type_, micro_reader)); ASSERT_EQ(OB_SUCCESS, micro_reader->init(root_block, nullptr)); blocksstable::ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); int64_t total_row_cnt = 0; for (int64_t it = 0; it != micro_reader->row_count(); ++it) { ASSERT_EQ(OB_SUCCESS, micro_reader->get_row(it, row)); ObString val = row.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObIndexBlockRowHeader *header = reinterpret_cast(val.ptr()); total_row_cnt += header->row_count_; } ASSERT_EQ(total_row_cnt, test_row_num); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_meta_builder_data_root) { LOG_INFO("BEGIN TestIndexTree.test_meta_builder_data_root"); int ret = OB_SUCCESS; const int64_t test_row_num = 2; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ASSERT_TRUE(res.data_root_desc_.is_valid()); ASSERT_TRUE(res.data_root_desc_.is_mem_type()); int64_t root_size = res.data_root_desc_.addr_.size_; char *root_buf = res.data_root_desc_.buf_; ObMicroBlockData root_block(root_buf, root_size); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sst_builder->index_store_desc_.get_desc().row_store_type_, micro_reader)); ASSERT_EQ(OB_SUCCESS, micro_reader->init(root_block, nullptr)); blocksstable::ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); int64_t total_row_cnt = 0; ASSERT_EQ(micro_reader->row_count(), test_row_num); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_meta_builder_mem_and_disk) { int ret = OB_SUCCESS; const int64_t test_row_num = 40; ObArray data_write_ctxs; ObArray index_write_ctxs; ObSSTableMergeRes res; ObMacroMetasArray *merge_info_list = nullptr; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; // mock compaction meta in disk ret = OB_SUCCESS; ObWholeDataStoreDesc index_desc; char *buf = static_cast (allocator_.alloc(sizeof(ObSSTableIndexBuilder))); ASSERT_NE(nullptr, buf); sst_builder = new (buf) ObSSTableIndexBuilder(false /* not need writer buffer*/); prepare_index_builder(index_desc, *sst_builder); ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, sst_builder); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); for(int64_t i = 0; i < test_row_num - 10; ++i){ ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); ASSERT_EQ(OB_SUCCESS, data_writer.build_micro_block()); ASSERT_EQ(OB_SUCCESS, data_writer.try_switch_macro_block()); if (i != 0 && i % 10 == 0) { ASSERT_EQ(OB_SUCCESS, data_writer.builder_->macro_meta_dumper_.build_and_append_block()); ASSERT_EQ(OB_SUCCESS, data_writer.builder_->macro_meta_dumper_.meta_macro_writer_->try_switch_macro_block()); } } ObMacroBlock &fir_blk = data_writer.macro_blocks_[0]; ObMacroBlock &sec_blk = data_writer.macro_blocks_[1]; ASSERT_EQ(fir_blk.original_size_, fir_blk.data_zsize_); ASSERT_EQ(sec_blk.original_size_, sec_blk.data_zsize_); ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_TRUE(sst_builder->roots_[0]->meta_block_info_.in_disk()); ASSERT_EQ((test_row_num - 10) / 10, sst_builder->roots_[0]->meta_block_info_.block_write_ctx_->get_macro_block_count()); ObMacroDataSeq data_seq2(data_writer.get_last_macro_seq() + 1); ObMacroBlockWriter data_writer2; ObSSTablePrivateObjectCleaner cleaner2; ASSERT_EQ(OB_SUCCESS, data_writer2.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner2)); for(int64_t i = 0; i < 10; ++i){ ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i + test_row_num - 10, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer2.append_row(multi_row)); ASSERT_EQ(OB_SUCCESS, data_writer2.build_micro_block()); ASSERT_EQ(OB_SUCCESS, data_writer2.try_switch_macro_block()); } ASSERT_EQ(OB_SUCCESS, data_writer2.close()); ASSERT_TRUE(sst_builder->roots_[1]->meta_block_info_.in_mem()); ASSERT_EQ(10, sst_builder->roots_[1]->meta_block_info_.get_row_count()); ASSERT_EQ(OB_SUCCESS, sst_builder->close(res)); ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(true, res.table_backup_flag_.has_local()); ObSSTableMergeRes tmp_res; ASSERT_EQ(OB_ERR_UNEXPECTED, data_writer.close()); // not re-entrant OK(sst_builder->close(tmp_res)); // re-entrant ASSERT_EQ(tmp_res.root_desc_.buf_, res.root_desc_.buf_); ASSERT_EQ(tmp_res.data_root_desc_.buf_, res.data_root_desc_.buf_); ASSERT_EQ(tmp_res.data_blocks_cnt_, res.data_blocks_cnt_); OK(data_write_ctxs.push_back(sst_builder->roots_[0]->data_write_ctx_)); roots = &(sst_builder->roots_); ASSERT_TRUE(res.data_root_desc_.is_valid()); ASSERT_TRUE(res.data_root_desc_.addr_.is_memory()); int64_t root_size = res.data_root_desc_.addr_.size_; char *root_buf = res.data_root_desc_.buf_; ObMicroBlockData root_block(root_buf, root_size); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sst_builder->index_store_desc_.get_desc().row_store_type_, micro_reader)); ASSERT_EQ(OB_SUCCESS, micro_reader->init(root_block, nullptr)); blocksstable::ObDatumRow meta_row; ASSERT_EQ(OB_SUCCESS, meta_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); int64_t total_row_cnt = 0; for (int64_t it = 0; it != micro_reader->row_count(); ++it) { ASSERT_EQ(OB_SUCCESS, micro_reader->get_row(it, meta_row)); ObString val = meta_row.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObIndexBlockRowHeader *header = reinterpret_cast(val.ptr()); total_row_cnt += header->row_count_; } ASSERT_EQ(total_row_cnt, test_row_num); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_index_block_dumper_get_row_in_mem) { const int64_t test_row_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ObDatumRow leaf_row; ASSERT_EQ(OB_SUCCESS, leaf_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false); prepare_index_builder(data_desc, sstable_builder); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); ObArenaAllocator row_allocator_; // prepare ObBaseIndexBlockDumper macro_meta_dumper; ObIndexBlockInfo meta_block_info_; ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.init(sst_builder->index_store_desc_.get_desc(), sst_builder->container_store_desc_, sst_builder->index_store_desc_.get_desc().sstable_index_builder_, allocator_, allocator_, true)); ASSERT_NE(nullptr, merge_info_list); ASSERT_EQ(test_row_num, merge_info_list->count()); // test unorder ObDataMacroBlockMeta *info = merge_info_list->at(merge_info_list->count() - 1); ASSERT_EQ(OB_SUCCESS, info->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.append_row(leaf_row)); info = merge_info_list->at(0); ASSERT_EQ(OB_SUCCESS, info->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_ROWKEY_ORDER_ERROR, macro_meta_dumper.append_row(leaf_row)); macro_meta_dumper.reset(); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.init(sst_builder->index_store_desc_.get_desc(), sst_builder->container_store_desc_, sst_builder->index_store_desc_.get_desc().sstable_index_builder_, allocator_, allocator_, true)); // genenrate order row for (int64_t i = 0; nullptr != merge_info_list && i < merge_info_list->count(); ++i) { ObDataMacroBlockMeta *info = merge_info_list->at(i); ASSERT_EQ(OB_SUCCESS, info->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.append_row(leaf_row)); } // close ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.close(meta_block_info_)); ASSERT_TRUE(meta_block_info_.in_mem()); ASSERT_EQ(merge_info_list->count(), meta_block_info_.get_row_count()); ASSERT_EQ(1, meta_block_info_.get_micro_block_count()); ObIndexBlockLoader index_block_loader; ASSERT_EQ(OB_SUCCESS, index_block_loader.init(allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, index_block_loader.open(meta_block_info_)); ObDataMacroBlockMeta macro_meta; ObDatumRow load_row; ASSERT_EQ(OB_SUCCESS, load_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); int tmp_ret = OB_SUCCESS; int iter_cnt = 0; while (OB_SUCCESS == tmp_ret) { tmp_ret = index_block_loader.get_next_row(load_row); STORAGE_LOG(DEBUG, "loader get next row", K(tmp_ret), K(load_row)); if (OB_SUCCESS == tmp_ret) { ASSERT_EQ(OB_SUCCESS, merge_info_list->at(iter_cnt)->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); for (int64_t i = 0; i < TEST_ROWKEY_COLUMN_CNT + 3; ++i) { ASSERT_TRUE(ObDatum::binary_equal(load_row.storage_datums_[i], leaf_row.storage_datums_[i])); } ++iter_cnt; } } ASSERT_EQ(OB_ITER_END, tmp_ret); ASSERT_EQ(iter_cnt, meta_block_info_.get_row_count()); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_index_block_dumper_get_row_in_disk) { const int64_t test_row_num = 40; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ObDatumRow leaf_row; ASSERT_EQ(OB_SUCCESS, leaf_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false); prepare_index_builder(data_desc, sstable_builder); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); ObArenaAllocator row_allocator_; // prepare ObBaseIndexBlockDumper macro_meta_dumper; ObIndexBlockInfo meta_block_info_; ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.init(sst_builder->index_store_desc_.get_desc(), sst_builder->container_store_desc_, sst_builder->index_store_desc_.get_desc().sstable_index_builder_, allocator_, allocator_, true)); // genenrate row for (int64_t i = 0; nullptr != merge_info_list && i < merge_info_list->count(); ++i) { ObDataMacroBlockMeta *info = merge_info_list->at(i); ASSERT_EQ(OB_SUCCESS, info->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.append_row(leaf_row)); if (i != 0 && i % 10 == 0) { ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.build_and_append_block()); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.meta_macro_writer_->try_switch_macro_block()); } } // close ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.close(meta_block_info_)); ASSERT_EQ(merge_info_list->count(), meta_block_info_.get_row_count()); ASSERT_TRUE(meta_block_info_.in_disk()); ASSERT_EQ(test_row_num / 10, meta_block_info_.block_write_ctx_->get_macro_block_count()); ObIndexBlockLoader index_block_loader; ASSERT_EQ(OB_SUCCESS, index_block_loader.init(allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, index_block_loader.open(meta_block_info_)); ObDataMacroBlockMeta macro_meta; ObDatumRow load_row; ASSERT_EQ(OB_SUCCESS, load_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); for (int64_t j = 0; j < meta_block_info_.get_row_count(); j++) { ASSERT_EQ(OB_SUCCESS, index_block_loader.get_next_row(load_row)); ASSERT_EQ(OB_SUCCESS, merge_info_list->at(j)->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); for (int64_t i = 0; i < TEST_ROWKEY_COLUMN_CNT + 3; ++i) { ASSERT_TRUE(ObDatum::binary_equal(load_row.storage_datums_[i], leaf_row.storage_datums_[i])); } } ASSERT_EQ(OB_ITER_END, index_block_loader.get_next_row(load_row)); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_index_block_dumper_get_micro_in_disk) { const int64_t test_row_num = 40; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ObDatumRow leaf_row; ASSERT_EQ(OB_SUCCESS, leaf_row.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false); prepare_index_builder(data_desc, sstable_builder); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); ObArenaAllocator row_allocator_; // prepare ObBaseIndexBlockDumper macro_meta_dumper; ObIndexBlockInfo meta_block_info_; ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.init(sst_builder->index_store_desc_.get_desc(), sst_builder->container_store_desc_, sst_builder->index_store_desc_.get_desc().sstable_index_builder_, allocator_, allocator_, true)); // genenrate row for (int64_t i = 0; nullptr != merge_info_list && i < merge_info_list->count(); ++i) { ObDataMacroBlockMeta *info = merge_info_list->at(i); ASSERT_EQ(OB_SUCCESS, info->build_row(leaf_row, allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.append_row(leaf_row)); if (i != 0 && i % 10 == 0) { ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.build_and_append_block()); ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.meta_macro_writer_->try_switch_macro_block()); } } // close ASSERT_EQ(OB_SUCCESS, macro_meta_dumper.close(meta_block_info_)); ASSERT_TRUE(meta_block_info_.in_disk()); ASSERT_EQ(test_row_num / 10, meta_block_info_.block_write_ctx_->get_macro_block_count()); ObIndexBlockLoader index_block_loader; ASSERT_EQ(OB_SUCCESS, index_block_loader.init(allocator_, CLUSTER_CURRENT_VERSION)); ASSERT_EQ(OB_SUCCESS, index_block_loader.open(meta_block_info_)); ObMicroBlockDesc load_micro_block_desc; for (int64_t j = 0; j < meta_block_info_.get_micro_block_count(); j++) { ASSERT_EQ(OB_SUCCESS, index_block_loader.get_next_micro_block_desc(load_micro_block_desc, sst_builder->container_store_desc_, row_allocator_)); } ASSERT_EQ(OB_ITER_END, index_block_loader.get_next_micro_block_desc(load_micro_block_desc, sst_builder->container_store_desc_, row_allocator_)); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_single_row_desc) { LOG_INFO("BEGIN TestIndexTree.test_single_row_desc"); // we need use 4K columns to test ObMetaIndexBlockBuilder::build_single_macro_row_desc prepare_4K_cols_schema(); ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); prepare_index_builder(index_desc, sstable_builder); ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObDatumRow row; const int64_t test_col_cnt = 4096; OK(row.init(allocator_, test_col_cnt)); row_generate_.reset(); OK(row_generate_.init(table_schema_, &allocator_)); OK(row_generate_.get_next_row(0, row)); ObDatumRow multi_row; OK(multi_row.init(allocator_, test_col_cnt + 10)); ObDmlFlag dml = DF_INSERT; convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.close()); ObSSTableMergeRes res; sstable_builder.optimization_mode_ = ObSSTableIndexBuilder::ObSpaceOptimizationMode::DISABLE; OK(sstable_builder.close(res)); // test rebuild sstable ObSSTableIndexBuilder sstable_builder2(false /* not need writer buffer*/); prepare_index_builder(index_desc, sstable_builder2); ObIndexBlockRebuilder rebuilder; OK(rebuilder.init(sstable_builder2, nullptr, ObITable::TableKey())); ObMacroBlockHandle macro_handle; macro_handle.reset(); ObMacroBlockReadInfo info; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.macro_block_id_ = res.data_block_ids_[0]; info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; ASSERT_NE(nullptr, info.buf_ = reinterpret_cast(allocator_.alloc(info.size_))); OK(ObBlockManager::read_block(info, macro_handle)); OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_handle.get_data_size(), info.macro_block_id_, -1 /*absolute_row_offset*/)); OK(rebuilder.close()); ObSSTableMergeRes res2; OK(sstable_builder2.close(res2)); // test rebuild sstable by another append_macro_row ObSSTableIndexBuilder sstable_builder3(false /* not need writer buffer*/); prepare_index_builder(index_desc, sstable_builder3); sstable_builder3.index_store_desc_.get_desc().static_desc_->major_working_cluster_version_ = DATA_VERSION_4_1_0_0; sstable_builder3.container_store_desc_.static_desc_->major_working_cluster_version_ = DATA_VERSION_4_1_0_0; ObIndexBlockRebuilder other_rebuilder; OK(other_rebuilder.init(sstable_builder3, nullptr, ObITable::TableKey())); ObDataMacroBlockMeta macro_meta; ObDatumRow meta_row; ObIndexBlockLoader index_block_loader; OK(index_block_loader.init(allocator_, CLUSTER_CURRENT_VERSION)); OK(index_block_loader.open(sstable_builder.roots_[0]->meta_block_info_)); OK(meta_row.init(allocator_, sstable_builder.container_store_desc_.get_row_column_count())); OK(index_block_loader.get_next_row(meta_row)); OK(macro_meta.parse_row(meta_row)); OK(other_rebuilder.append_macro_row(macro_meta)); OK(other_rebuilder.close()); ObSSTableMergeRes res3; sstable_builder3.optimization_mode_ = ObSSTableIndexBuilder::ObSpaceOptimizationMode::ENABLE; OK(sstable_builder3.close(res3)); LOG_INFO("FINISH TestIndexTree.test_single_row_desc"); } TEST_F(TestIndexTree, test_extend_micro_block_size) { LOG_INFO("BEGIN TestIndexTree.test_extend_micro_block_size"); int ret = OB_SUCCESS; const int64_t test_row_num = 20; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObWholeDataStoreDesc data_desc; char *buf = static_cast (allocator_.alloc(sizeof(ObSSTableIndexBuilder))); ObSSTableIndexBuilder *sstable_builder = new (buf) ObSSTableIndexBuilder(false /* not need writer buffer*/); prepare_data_desc(data_desc, sstable_builder); data_desc.get_desc().micro_block_size_ = 248; // only push one index block row will exceed 248. ret = sstable_builder->init(data_desc.get_desc()); ASSERT_EQ(OB_SUCCESS, ret); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); for(int64_t i = 0; i < test_row_num; ++i){ ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); ASSERT_EQ(OB_SUCCESS, data_writer.build_micro_block()); ASSERT_EQ(OB_SUCCESS, data_writer.try_switch_macro_block()); } ObMacroBlock &fir_blk = data_writer.macro_blocks_[0]; ObMacroBlock &sec_blk = data_writer.macro_blocks_[1]; ASSERT_EQ(fir_blk.original_size_, fir_blk.data_zsize_); ASSERT_EQ(sec_blk.original_size_, sec_blk.data_zsize_); ASSERT_EQ(OB_SUCCESS, data_writer.close()); ASSERT_EQ(OB_SUCCESS, sstable_builder->close(res)); ObSSTableMergeRes tmp_res; ASSERT_EQ(OB_ERR_UNEXPECTED, data_writer.close()); // not re-entrant OK(sstable_builder->close(tmp_res)); // re-entrant ASSERT_EQ(tmp_res.root_desc_.buf_, res.root_desc_.buf_); ASSERT_EQ(tmp_res.data_root_desc_.buf_, res.data_root_desc_.buf_); ASSERT_EQ(tmp_res.data_blocks_cnt_, res.data_blocks_cnt_); sstable_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_data_block_checksum) { LOG_INFO("BEGIN TestIndexTree.test_data_block_checksum"); int ret = OB_SUCCESS; const int64_t test_row_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ASSERT_EQ(test_row_num, merge_info_list->count()); int64_t max_reported_column_id = TEST_COLUMN_CNT + 2; ObArray column_metas; ObArray column_default_checksum; for (int64_t i = 0; i < TEST_COLUMN_CNT + 2; ++i) { ObSSTableColumnMeta column_meta; column_meta.column_id_ = i; ASSERT_EQ(OB_SUCCESS, column_metas.push_back(column_meta)); ASSERT_EQ(OB_SUCCESS, column_default_checksum.push_back(column_meta.column_default_checksum_)); } for (int64_t i = TEST_COLUMN_CNT + 2; i < max_reported_column_id; ++i) { ObSSTableColumnMeta column_meta; column_meta.column_id_ = i; column_meta.column_default_checksum_ = i - TEST_COLUMN_CNT; ASSERT_EQ(OB_SUCCESS, column_metas.push_back(column_meta)); ASSERT_EQ(OB_SUCCESS, column_default_checksum.push_back(column_meta.column_default_checksum_)); } sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_reuse_macro_block) { int ret = OB_SUCCESS; const int64_t test_row_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *macro_metas = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, macro_metas, res, roots, sst_builder); ASSERT_EQ(test_row_num, macro_metas->count()); ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false/* not need buffer*/); prepare_index_builder(data_desc, sstable_builder); ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_row_num; ++i) { ObMacroBlockDesc macro_desc; macro_desc.macro_meta_ = macro_metas->at(i); OK(data_writer.append_macro_block(macro_desc, nullptr)); } OK(data_writer.close()); ObSSTableMergeRes reused_res; OK(sstable_builder.close(reused_res)); ASSERT_EQ(res.data_blocks_cnt_, reused_res.data_blocks_cnt_); ASSERT_EQ(res.row_count_, reused_res.row_count_); for (int64_t i = 0; i < res.data_blocks_cnt_; ++i) { ASSERT_EQ(res.data_block_ids_.at(i), reused_res.data_block_ids_.at(i)); } ASSERT_EQ(false, res.table_backup_flag_.has_backup()); ASSERT_EQ(true, res.table_backup_flag_.has_local()); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, DISABLED_test_writer_try_to_append_row) { LOG_INFO("BEGIN TestIndexTree.DISABLED_test_writer_try_to_append_row"); // fix try_to_append_row, enable this case int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc data_desc; ObMacroBlockWriter data_writer; ObMicroBlockDesc micro_block_desc; prepare_data_desc(data_desc, nullptr); data_desc.get_desc().row_store_type_ = ObRowStoreType::ENCODING_ROW_STORE; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObIMicroBlockWriter *micro_writer = data_writer.micro_writer_; const int64_t test_num = 1; // only add one row to avoid encoding optimization for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(micro_writer->append_row(multi_row)); } OK(micro_writer->build_micro_block_desc(micro_block_desc)); const int64_t need_store_size = micro_block_desc.buf_size_ + micro_block_desc.header_->header_size_; { // set size upper bound after reuse micro_writer->reuse(); micro_writer->set_block_size_upper_bound(need_store_size - 1); // if append succeeded, set_block_size_upper_bound has not limited the block size ASSERT_EQ(-4024, micro_writer->append_row(multi_row)); } { // set size upper bound after reset ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); micro_writer = data_writer.micro_writer_; micro_writer->set_block_size_upper_bound(need_store_size - 1); // if append succeeded, set_block_size_upper_bound has not limited the block size ASSERT_EQ(-4024, micro_writer->append_row(multi_row)); } LOG_INFO("FINISH TestIndexTree.DISABLED_test_writer_try_to_append_row"); } TEST_F(TestIndexTree, test_writer_try_to_append_row) { LOG_INFO("BEGIN TestIndexTree.test_writer_try_to_append_row"); // If fail to pass this test, please check ObIMicroBlockWriter::try_to_append_row int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc data_desc; ObMacroBlockWriter data_writer; ObMicroBlockDesc micro_block_desc; prepare_data_desc(data_desc, nullptr); ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); for (int64_t j = 0; j < ObRowStoreType::MAX_ROW_STORE; ++j) { data_desc.get_desc().row_store_type_ = static_cast(j); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObIMicroBlockWriter *micro_writer = data_writer.micro_writer_; const int64_t test_num = 10; for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(micro_writer->append_row(multi_row)); } OK(micro_writer->build_micro_block_desc(micro_block_desc)); int64_t estimate_size = 0; if (ObRowStoreType::FLAT_ROW_STORE == j) { estimate_size = micro_block_desc.original_size_ + micro_block_desc.header_->header_size_; } else if (ObRowStoreType::CS_ENCODING_ROW_STORE == j) { estimate_size = micro_block_desc.original_size_ + static_cast(micro_writer)->all_headers_size_; } else { estimate_size = micro_block_desc.original_size_ + static_cast(micro_writer)->header_size_; } { // set size upper bound after reuse micro_writer->reuse(); micro_writer->set_block_size_upper_bound(estimate_size); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(micro_writer->append_row(multi_row)); } } { // set size upper bound after reset ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); micro_writer = data_writer.micro_writer_; micro_writer->set_block_size_upper_bound(estimate_size); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(micro_writer->append_row(multi_row)); } } } LOG_INFO("FINISH TestIndexTree.test_writer_try_to_append_row"); } TEST_F(TestIndexTree, test_rebuilder) { LOG_INFO("BEGIN TestIndexTree.test_rebuilder"); int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc data_desc1; ObWholeDataStoreDesc data_desc2; ObSSTableIndexBuilder sstable_builder1(false /* not need writer buffer*/); ObSSTableIndexBuilder sstable_builder2(false /* not need writer buffer*/); prepare_index_builder(data_desc1, sstable_builder1); prepare_index_builder(data_desc2, sstable_builder2); // write data int64_t test_num = 10; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc1.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); } OK(data_writer.close()); ObSSTableMergeRes res1; OK(sstable_builder1.close(res1)); ASSERT_EQ(false, res1.table_backup_flag_.has_backup()); ASSERT_EQ(true, res1.table_backup_flag_.has_local()); ObIndexBlockRebuilder rebuilder; OK(rebuilder.init(sstable_builder2, nullptr, ObITable::TableKey())); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_NE(nullptr, info.buf_ = reinterpret_cast(meta_allocator.alloc(info.size_))); ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree for (int64_t i = 0; OB_SUCC(ret) && i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); OK(rebuilder.append_macro_row(*macro_meta)); } OK(rebuilder.close()); ObSSTableMergeRes res2; OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); // compare merge res ASSERT_EQ(res1.root_desc_.height_, res2.root_desc_.height_); ASSERT_EQ(res1.root_desc_.height_, 2); ObMicroBlockData root_block1(res1.root_desc_.buf_, res1.root_desc_.addr_.size_); ObMicroBlockData root_block2(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObIMicroBlockReader *micro_reader1; ObIMicroBlockReader *micro_reader2; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder1.index_store_desc_.get_desc().row_store_type_, micro_reader1)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder2.index_store_desc_.get_desc().row_store_type_, micro_reader2)); OK(micro_reader1->init(root_block1, nullptr)); OK(micro_reader2->init(root_block2, nullptr)); blocksstable::ObDatumRow row1; blocksstable::ObDatumRow row2; OK(row1.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); OK(row2.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ASSERT_EQ(micro_reader1->row_count(), micro_reader2->row_count()); ASSERT_EQ(micro_reader1->row_count(), test_num); for (int64_t it = 0; it != micro_reader1->row_count(); ++it) { OK(micro_reader1->get_row(it, row1)); OK(micro_reader2->get_row(it, row2)); ObString val1 = row1.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObString val2 = row2.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ASSERT_EQ(val1, val2); } LOG_INFO("FINISH TestIndexTree.test_rebuilder"); } TEST_F(TestIndexTree, test_diagnose_dump) { LOG_INFO("BEGIN TestIndexTree.test_diagnose_dump"); int ret = OB_SUCCESS; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_index_builder(data_desc, sstable_builder); ObDatumRow multi_row; ASSERT_EQ(OB_SUCCESS, multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObDatumRow row; ASSERT_EQ(OB_SUCCESS, row.init(allocator_, TEST_COLUMN_CNT)); for(int64_t i = 0; i < 10; ++i){ ASSERT_EQ(OB_SUCCESS, row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); ASSERT_EQ(OB_SUCCESS, data_writer.append_row(multi_row)); } OK(data_writer.build_micro_block()); OK(row_generate_.get_next_row(10, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); data_writer.dump_block_and_writer_buffer(); data_writer.micro_writer_->dump_diagnose_info(); LOG_INFO("FINISH TestIndexTree.test_diagnose_dump"); } TEST_F(TestIndexTree, test_estimate_meta_block_size) { LOG_INFO("BEGIN TestIndexTree.test_estimate_meta_block_size"); int ret = OB_SUCCESS; ObWholeDataStoreDesc index_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder); prepare_index_desc(data_desc, index_desc); OK(sstable_builder.init(index_desc.get_desc())); ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; for(int64_t i = 0; i < 10; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); STORAGE_LOG(INFO, "append row", K(i), K(multi_row)); OK(data_writer.append_row(multi_row)); } const ObDatumRowkey& last_data_key = data_writer.last_key_; int64_t estimate_meta_block_size = 0; OK(data_writer.builder_->cal_macro_meta_block_size(last_data_key, estimate_meta_block_size)); ObMacroBlock ¯o_block = data_writer.macro_blocks_[0]; ObStorageObjectHandle ¯o_handle = data_writer.macro_handles_[0]; OK(data_writer.build_micro_block()); OK(data_writer.builder_->generate_macro_row(macro_block, macro_handle.get_macro_id(), -1/*ddl_start_row_offset*/)); const ObSSTableMacroBlockHeader ¯o_header_ = macro_block.macro_header_; ASSERT_EQ(macro_header_.fixed_header_.idx_block_offset_ + macro_header_.fixed_header_.idx_block_size_, macro_header_.fixed_header_.meta_block_offset_); ASSERT_GT(macro_header_.fixed_header_.meta_block_size_, 0); ASSERT_GE(estimate_meta_block_size, macro_header_.fixed_header_.meta_block_size_); ObDataMacroBlockMeta *macro_meta = &data_writer.builder_->macro_meta_; const int64_t val_max_size = macro_meta->val_.get_max_serialize_size(CLUSTER_CURRENT_VERSION); macro_meta->val_.macro_id_ = ObIndexBlockRowHeader::DEFAULT_IDX_ROW_MACRO_ID; ASSERT_GE(macro_meta->val_.get_serialize_size(CLUSTER_CURRENT_VERSION) + estimate_meta_block_size - val_max_size, macro_header_.fixed_header_.meta_block_size_); LOG_INFO("FINISH TestIndexTree.test_estimate_meta_block_size"); } TEST_F(TestIndexTree, test_cg_row_offset) { LOG_INFO("BEGIN TestIndexTree.test_cg_row_offset"); int ret = OB_SUCCESS; const int64_t test_row_num = 500; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_index_builder(data_desc, sstable_builder); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); vector size_arr = { 1L<<10, 1L<<12, 1L<<14 }; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder.index_store_desc_.get_desc().row_store_type_, micro_reader)); for (int i = 0; i < size_arr.size(); ++i) { ObSSTableIndexBuilder *sst_builder = nullptr; const int64_t micro_block_size = size_arr.at(i); index_schema_.set_block_size(micro_block_size); mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ObMicroBlockData root_block(res.root_desc_.buf_, res.root_desc_.addr_.size_); ObDatumRow row; OK(row.init(allocator_, index_desc.get_desc().col_desc_->row_column_count_)); OK(micro_reader->init(root_block, nullptr)); ObIndexBlockRowParser idx_row_parser; int64_t rows_cnt = -1; for (int64_t it = 0; it != micro_reader->row_count(); ++it) { idx_row_parser.reset(); OK(micro_reader->get_row(it, row)); OK(idx_row_parser.init(TEST_ROWKEY_COLUMN_CNT + 2, row)); rows_cnt += idx_row_parser.header_->row_count_; ASSERT_EQ(rows_cnt, idx_row_parser.get_row_offset()); } sst_builder->~ObSSTableIndexBuilder(); } LOG_INFO("FINISH TestIndexTree.test_cg_row_offset"); } TEST_F(TestIndexTree, test_absolute_offset) { LOG_INFO("BEGIN TestIndexTree.test_absolute_offset"); int ret = OB_SUCCESS; const int64_t test_row_num = 500; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObWholeDataStoreDesc data_desc; ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); prepare_data_desc(data_desc, &sstable_builder); data_desc.get_desc().micro_block_size_ = 512; // make test index tree height > 2 ret = sstable_builder.init(data_desc.get_desc()); ASSERT_EQ(OB_SUCCESS, ret); ObWholeDataStoreDesc index_desc; prepare_index_desc(data_desc, index_desc); ObIndexBlockRebuilder rebuilder; ObITable::TableKey ddl_table_key; ddl_table_key.table_type_ = ObITable::DDL_MERGE_CG_SSTABLE; OK(rebuilder.init(sstable_builder, nullptr, ddl_table_key)); ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ASSERT_EQ(test_row_num, merge_info_list->count()); vector absolute_offsets; LOG_INFO("test absolute offset", K(test_row_num)); for (int meta_idx = 0; meta_idx < test_row_num; meta_idx += 10) { merge_info_list->at(meta_idx)->val_.ddl_end_row_offset_ = meta_idx; rebuilder.append_macro_row(*merge_info_list->at(meta_idx)); absolute_offsets.push_back(meta_idx); } OK(rebuilder.close()); ObSSTableMergeRes res2; OK(sstable_builder.close(res2)); ASSERT_GT(res2.root_desc_.height_, 2); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder.index_store_desc_.get_desc().row_store_type_, micro_reader)); ObMicroBlockData root_block(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObDatumRow row; OK(row.init(allocator_, index_desc.get_desc().col_desc_->row_column_count_)); OK(micro_reader->init(root_block, nullptr)); ObIndexBlockRowParser idx_row_parser; int64_t last_idx = 0; for (int64_t it = 0; it != micro_reader->row_count(); ++it) { idx_row_parser.reset(); OK(micro_reader->get_row(it, row)); OK(idx_row_parser.init(TEST_ROWKEY_COLUMN_CNT + 2, row)); int64_t before_last_idx = last_idx; for (int absolute_idx = last_idx; absolute_idx < absolute_offsets.size(); absolute_idx ++) { if (absolute_offsets[absolute_idx] == idx_row_parser.get_row_offset()) { last_idx = absolute_idx; } } ASSERT_GT(last_idx, before_last_idx); if (it == micro_reader->row_count() - 1) { ASSERT_EQ(absolute_offsets[absolute_offsets.size() - 1], idx_row_parser.get_row_offset()); } } sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_close_with_old_schema) { LOG_INFO("BEGIN TestIndexTree.test_close_with_old_schema"); int ret = OB_SUCCESS; const int64_t test_row_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sst_builder = nullptr; mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots, sst_builder); ASSERT_EQ(test_row_num, merge_info_list->count()); // mock old schema with fewer columns ObWholeDataStoreDesc index_desc; OK(index_desc.init(false/*is_ddl*/, table_schema_, ObLSID(1), ObTabletID(1), MAJOR_MERGE, ObTimeUtility::fast_current_time()/*snapshot*/, 0/*cluster_version*/, table_schema_.get_micro_index_clustered(), 0/*transfer_seq*/)); index_desc.static_desc_.major_working_cluster_version_ = DATA_VERSION_4_0_0_0; --index_desc.get_desc().col_desc_->full_stored_col_cnt_; index_desc.get_desc().col_desc_->col_default_checksum_array_.pop_back(); // read old macro block metas ObSSTableIndexBuilder sstable_builder(false /* not need writer buffer*/); OK(sstable_builder.init(index_desc.get_desc())); ObIndexBlockRebuilder rebuilder; OK(rebuilder.init(sstable_builder, nullptr, ObITable::TableKey())); LOG_INFO("test close with old schema", K(test_row_num)); for (int64_t i = 0; i < test_row_num; ++i) { OK(rebuilder.append_macro_row(*merge_info_list->at(i))); } OK(rebuilder.close()); ObSSTableMergeRes res2; ASSERT_EQ(OB_INVALID_ARGUMENT, sstable_builder.close(res2)); sst_builder->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_rebuilder_backup_disable_dump_disk) { int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc index_desc1; ObWholeDataStoreDesc index_desc2; ObSSTableIndexBuilder sstable_builder1(false); ObSSTableIndexBuilder sstable_builder2(false); prepare_index_builder(index_desc1, sstable_builder1); prepare_index_builder(index_desc2, sstable_builder2); // write data ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder1); int64_t test_num = 10; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); } OK(data_writer.close()); ObSSTableMergeRes res1; OK(sstable_builder1.close(res1)); ASSERT_EQ(false, res1.table_backup_flag_.has_backup()); ASSERT_EQ(true, res1.table_backup_flag_.has_local()); sstable_builder2.enable_dump_disk_ = false; // disable dump disk ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; OB_SUCC(ret) && i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } // mock has backup id OK(rebuilder.close()); ObSSTableMergeRes res2; ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->meta_block_info_.in_array()); rebuilder.~ObIndexBlockRebuilder(); OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); ASSERT_EQ(true, res2.data_root_desc_.is_mem_type()); ASSERT_EQ(true, res2.root_desc_.is_mem_type()); // compare merge res ASSERT_EQ(res1.root_desc_.height_, res2.root_desc_.height_); ASSERT_EQ(res1.root_desc_.height_, 2); ObMicroBlockData root_block1(res1.root_desc_.buf_, res1.root_desc_.addr_.size_); ObMicroBlockData root_block2(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObIMicroBlockReader *micro_reader1; ObIMicroBlockReader *micro_reader2; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder1.index_store_desc_.get_desc().row_store_type_, micro_reader1)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder2.index_store_desc_.get_desc().row_store_type_, micro_reader2)); OK(micro_reader1->init(root_block1, nullptr)); OK(micro_reader2->init(root_block2, nullptr)); blocksstable::ObDatumRow row1; blocksstable::ObDatumRow row2; OK(row1.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); OK(row2.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ASSERT_EQ(micro_reader1->row_count(), micro_reader2->row_count()); ASSERT_EQ(micro_reader1->row_count(), test_num); for (int64_t it = 0; it != micro_reader1->row_count(); ++it) { OK(micro_reader1->get_row(it, row1)); OK(micro_reader2->get_row(it, row2)); ObString val1 = row1.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObString val2 = row2.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ASSERT_EQ(val1, val2); } } TEST_F(TestIndexTree, test_rebuilder_backup_all_mem) { int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc index_desc1; ObWholeDataStoreDesc index_desc2; ObSSTableIndexBuilder sstable_builder1(false); ObSSTableIndexBuilder sstable_builder2(false); prepare_index_builder(index_desc1, sstable_builder1); prepare_index_builder(index_desc2, sstable_builder2); // write data ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder1); int64_t test_num = 10; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); } OK(data_writer.close()); ObSSTableMergeRes res1; OK(sstable_builder1.close(res1)); ASSERT_EQ(false, res1.table_backup_flag_.has_backup()); ASSERT_EQ(true, res1.table_backup_flag_.has_local()); ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; OB_SUCC(ret) && i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } // mock has backup id OK(rebuilder.close()); ObSSTableMergeRes res2; ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->index_tree_info_.in_mem()); rebuilder.~ObIndexBlockRebuilder(); OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); ASSERT_EQ(true, res2.data_root_desc_.is_mem_type()); ASSERT_EQ(true, res2.root_desc_.is_mem_type()); // compare merge res ASSERT_EQ(res1.root_desc_.height_, res2.root_desc_.height_); ASSERT_EQ(res1.root_desc_.height_, 2); ObMicroBlockData root_block1(res1.root_desc_.buf_, res1.root_desc_.addr_.size_); ObMicroBlockData root_block2(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObIMicroBlockReader *micro_reader1; ObIMicroBlockReader *micro_reader2; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder1.index_store_desc_.get_desc().row_store_type_, micro_reader1)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder2.index_store_desc_.get_desc().row_store_type_, micro_reader2)); OK(micro_reader1->init(root_block1, nullptr)); OK(micro_reader2->init(root_block2, nullptr)); blocksstable::ObDatumRow row1; blocksstable::ObDatumRow row2; OK(row1.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); OK(row2.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ASSERT_EQ(micro_reader1->row_count(), micro_reader2->row_count()); ASSERT_EQ(micro_reader1->row_count(), test_num); for (int64_t it = 0; it != micro_reader1->row_count(); ++it) { OK(micro_reader1->get_row(it, row1)); OK(micro_reader2->get_row(it, row2)); ObString val1 = row1.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObString val2 = row2.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ASSERT_EQ(val1, val2); } } TEST_F(TestIndexTree, test_rebuilder_backup_all_disk) { int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc index_desc1; ObWholeDataStoreDesc index_desc2; ObSSTableIndexBuilder sstable_builder1(false); ObSSTableIndexBuilder sstable_builder2(false); prepare_index_builder(index_desc1, sstable_builder1); prepare_index_builder(index_desc2, sstable_builder2); // write data ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder1); int64_t test_num = 400; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); } OK(data_writer.close()); ObSSTableMergeRes res1; OK(sstable_builder1.close(res1)); ASSERT_EQ(false, res1.table_backup_flag_.has_backup()); ASSERT_EQ(true, res1.table_backup_flag_.has_local()); ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; OB_SUCC(ret) && i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } OK(rebuilder.close()); ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->index_tree_info_.in_disk()); rebuilder.~ObIndexBlockRebuilder(); ObSSTableMergeRes res2; OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); // compare merge res ASSERT_EQ(res1.root_desc_.height_, res2.root_desc_.height_); // ASSERT_EQ(res1.root_desc_.height_, 2); ObMicroBlockData root_block1(res1.root_desc_.buf_, res1.root_desc_.addr_.size_); ObMicroBlockData root_block2(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObIMicroBlockReader *micro_reader1; ObIMicroBlockReader *micro_reader2; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder1.index_store_desc_.get_desc().row_store_type_, micro_reader1)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder2.index_store_desc_.get_desc().row_store_type_, micro_reader2)); OK(micro_reader1->init(root_block1, nullptr)); OK(micro_reader2->init(root_block2, nullptr)); blocksstable::ObDatumRow row1; blocksstable::ObDatumRow row2; OK(row1.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); OK(row2.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ASSERT_EQ(micro_reader1->row_count(), micro_reader2->row_count()); for (int64_t it = 0; it != micro_reader1->row_count(); ++it) { OK(micro_reader1->get_row(it, row1)); OK(micro_reader2->get_row(it, row2)); ObString val1 = row1.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObString val2 = row2.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ASSERT_EQ(val1, val2); } } TEST_F(TestIndexTree, test_rebuilder_backup_mem_and_disk) { int ret = OB_SUCCESS; ObDatumRow row; OK(row.init(allocator_, TEST_COLUMN_CNT)); ObDatumRow multi_row; OK(multi_row.init(allocator_, MAX_TEST_COLUMN_CNT)); ObDmlFlag dml = DF_INSERT; ObWholeDataStoreDesc index_desc1; ObWholeDataStoreDesc index_desc2; ObSSTableIndexBuilder sstable_builder1(false); ObSSTableIndexBuilder sstable_builder2(false); prepare_index_builder(index_desc1, sstable_builder1); prepare_index_builder(index_desc2, sstable_builder2); // write data ObWholeDataStoreDesc data_desc; prepare_data_desc(data_desc, &sstable_builder1); int64_t test_num = 400; ObMacroBlockWriter data_writer; ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; const ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; ASSERT_EQ(OB_SUCCESS, data_writer.open(data_desc.get_desc(), 0/*parallel_idx*/, seq_param, pre_warm_param, cleaner)); for (int64_t i = 0; i < test_num; ++i) { OK(row_generate_.get_next_row(i, row)); convert_to_multi_version_row(row, table_schema_.get_rowkey_column_num(), table_schema_.get_column_count(), SNAPSHOT_VERSION, dml, multi_row); OK(data_writer.append_row(multi_row)); OK(data_writer.build_micro_block()); OK(data_writer.try_switch_macro_block()); } OK(data_writer.close()); ObSSTableMergeRes res1; OK(sstable_builder1.close(res1)); ASSERT_EQ(false, res1.table_backup_flag_.has_backup()); ASSERT_EQ(true, res1.table_backup_flag_.has_local()); ObIndexBlockRebuilder rebuilder_mem; OK(mock_init_backup_rebuilder(rebuilder_mem, sstable_builder2)); ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; OB_SUCC(ret) && i < 50; ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder_mem.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder_mem.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } OK(rebuilder_mem.close()); ASSERT_EQ(true, rebuilder_mem.index_tree_root_ctx_->index_tree_info_.in_mem()); rebuilder_mem.~ObIndexBlockRebuilder(); for (int64_t i = 50; OB_SUCC(ret) && i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } OK(rebuilder.close()); ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->index_tree_info_.in_disk()); rebuilder.~ObIndexBlockRebuilder(); ObSSTableMergeRes res2; OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); } TEST_F(TestIndexTree, test_cg_compaction_all_mem) { const int64_t test_num = 10; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res1; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sstable_builder1 = nullptr; mock_cg_compaction(test_num, data_write_ctxs, index_write_ctxs, merge_info_list, res1, roots, sstable_builder1); STORAGE_LOG(INFO, "finish cg compaction and prepare rebuild"); ObSSTableIndexBuilder sstable_builder2(false); ObWholeDataStoreDesc data_desc2; prepare_cg_data_desc(data_desc2, &sstable_builder2); ASSERT_EQ(OB_SUCCESS, sstable_builder2.init(data_desc2.get_desc())); ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } // mock has backup id OK(rebuilder.close()); ObSSTableMergeRes res2; ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->index_tree_info_.in_mem()); rebuilder.~ObIndexBlockRebuilder(); OK(sstable_builder2.close(res2)); ASSERT_EQ(false, res2.table_backup_flag_.has_backup()); ASSERT_EQ(true, res2.table_backup_flag_.has_local()); ASSERT_EQ(true, res2.data_root_desc_.is_mem_type()); ASSERT_EQ(true, res2.root_desc_.is_mem_type()); // compare merge res ASSERT_EQ(res1.root_desc_.height_, res2.root_desc_.height_); ASSERT_EQ(res1.root_desc_.height_, 2); ObMicroBlockData root_block1(res1.root_desc_.buf_, res1.root_desc_.addr_.size_); ObMicroBlockData root_block2(res2.root_desc_.buf_, res2.root_desc_.addr_.size_); ObIMicroBlockReader *micro_reader1; ObIMicroBlockReader *micro_reader2; ObMicroBlockReaderHelper reader_helper; ObIMicroBlockReader *micro_reader; ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder1->index_store_desc_.get_desc().row_store_type_, micro_reader1)); ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder2.index_store_desc_.get_desc().row_store_type_, micro_reader2)); OK(micro_reader1->init(root_block1, nullptr)); OK(micro_reader2->init(root_block2, nullptr)); blocksstable::ObDatumRow row1; blocksstable::ObDatumRow row2; OK(row1.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); OK(row2.init(allocator_, TEST_ROWKEY_COLUMN_CNT + 3)); ASSERT_EQ(micro_reader1->row_count(), micro_reader2->row_count()); ASSERT_EQ(micro_reader1->row_count(), test_num); for (int64_t it = 0; it != micro_reader1->row_count(); ++it) { OK(micro_reader1->get_row(it, row1)); OK(micro_reader2->get_row(it, row2)); ObString val1 = row1.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ObString val2 = row2.storage_datums_[TEST_ROWKEY_COLUMN_CNT + 2].get_string(); ASSERT_EQ(val1, val2); } sstable_builder1->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_cg_compaction_all_disk) { const int64_t test_num = 400; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res1; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sstable_builder1 = nullptr; mock_cg_compaction(test_num, data_write_ctxs, index_write_ctxs, merge_info_list, res1, roots, sstable_builder1); STORAGE_LOG(INFO, "finish cg compaction and prepare rebuild"); ObSSTableIndexBuilder sstable_builder2(false); ObWholeDataStoreDesc data_desc2; prepare_cg_data_desc(data_desc2, &sstable_builder2); ASSERT_EQ(OB_SUCCESS, sstable_builder2.init(data_desc2.get_desc())); ObIndexBlockRebuilder rebuilder; OK(mock_init_backup_rebuilder(rebuilder, sstable_builder2)); // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } // mock has backup id OK(rebuilder.close()); ObSSTableMergeRes res2; ASSERT_EQ(true, rebuilder.index_tree_root_ctx_->index_tree_info_.in_disk()); rebuilder.~ObIndexBlockRebuilder(); OK(sstable_builder2.close(res2)); sstable_builder1->~ObSSTableIndexBuilder(); } TEST_F(TestIndexTree, test_cg_compaction_mem_and_disk) { const int64_t test_num = 400; ObArray data_write_ctxs; ObArray index_write_ctxs; ObMacroMetasArray *merge_info_list = nullptr; ObSSTableMergeRes res1; IndexTreeRootCtxList *roots = nullptr; ObSSTableIndexBuilder *sstable_builder1 = nullptr; mock_cg_compaction(test_num, data_write_ctxs, index_write_ctxs, merge_info_list, res1, roots, sstable_builder1); STORAGE_LOG(INFO, "finish cg compaction and prepare rebuild"); ObSSTableIndexBuilder sstable_builder2(false); ObWholeDataStoreDesc data_desc2; prepare_cg_data_desc(data_desc2, &sstable_builder2); ASSERT_EQ(OB_SUCCESS, sstable_builder2.init(data_desc2.get_desc())); ObIndexBlockRebuilder rebuilder_mem; OK(mock_init_backup_rebuilder(rebuilder_mem, sstable_builder2)); rebuilder_mem.index_tree_root_ctx_->task_idx_ = 0; ObIndexBlockRebuilder rebuilder_disk; OK(mock_init_backup_rebuilder(rebuilder_disk, sstable_builder2)); rebuilder_disk.index_tree_root_ctx_->task_idx_ = 0; // read data blocks ObMacroBlockReadInfo info; ObMacroBlockHandle macro_handle; const int64_t macro_block_size = 2 * 1024 * 1024; info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); info.offset_ = 0; info.size_ = macro_block_size; info.buf_ = reinterpret_cast(allocator_.alloc(info.size_)); ASSERT_NE(nullptr, info.buf_); ObIArray &data_block_ids = res1.data_block_ids_; ObDataMacroBlockMeta *macro_meta = nullptr; ObArenaAllocator meta_allocator; ASSERT_EQ(data_block_ids.count(), test_num); // rebuild index tree int64_t absolute_row_offset = -1; for (int64_t i = 0; i < 100; ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder_mem.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder_mem.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } OK(rebuilder_mem.close()); ASSERT_EQ(true, rebuilder_mem.index_tree_root_ctx_->index_tree_info_.in_mem()); rebuilder_mem.~ObIndexBlockRebuilder(); for (int64_t i = 100; i < data_block_ids.count(); ++i) { MacroBlockId &cur_id = data_block_ids.at(i); info.macro_block_id_ = cur_id; macro_handle.reset(); OK(ObBlockManager::read_block(info, macro_handle)); ASSERT_NE(macro_handle.get_buffer(), nullptr); ASSERT_EQ(macro_handle.get_data_size(), macro_block_size); OK(rebuilder_disk.get_macro_meta(macro_handle.get_buffer(), macro_block_size, cur_id, meta_allocator, macro_meta)); absolute_row_offset += macro_meta->val_.row_count_; OK(rebuilder_disk.append_macro_row(macro_handle.get_buffer(), macro_block_size, cur_id, absolute_row_offset)); } OK(rebuilder_disk.close()); ObSSTableMergeRes res2; ASSERT_EQ(true, rebuilder_disk.index_tree_root_ctx_->index_tree_info_.in_disk()); rebuilder_disk.~ObIndexBlockRebuilder(); OK(sstable_builder2.close(res2)); sstable_builder1->~ObSSTableIndexBuilder(); } }//end namespace unittest }//end namespace oceanbase int main(int argc, char **argv) { system("rm -f test_index_tree.log*"); oceanbase::common::ObLogger::get_logger().set_log_level("INFO"); OB_LOGGER.set_file_name("test_index_tree.log", true); srand(time(NULL)); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }