/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #define private public #define protected public #include "storage/memtable/ob_memtable_interface.h" #include "storage/test_tablet_helper.h" #include "storage/column_store/ob_column_oriented_merger.h" #include "mtlenv/storage/test_merge_basic.h" #include "unittest/storage/test_schema_prepare.h" namespace oceanbase { using namespace common; using namespace share::schema; using namespace share; using namespace blocksstable; using namespace compaction; using namespace memtable; using namespace observer; using namespace unittest; using namespace memtable; using namespace transaction; ObCOTabletMergeCtx::~ObCOTabletMergeCtx() { if (OB_NOT_NULL(cg_merge_info_array_)) { for (int i = 0; i < array_count_; ++i) { if (OB_NOT_NULL(cg_merge_info_array_[i])) { cg_merge_info_array_[i]->destroy(); cg_merge_info_array_[i] = nullptr; } } mem_ctx_.free(cg_merge_info_array_); cg_merge_info_array_ = nullptr; merged_sstable_array_ = nullptr; } } namespace storage { static ObArenaAllocator allocator_; static ObLocalArena merger_allocator_("TestArena", OB_MALLOC_NORMAL_BLOCK_SIZE); void close_builder_and_prepare_sstable( const blocksstable::ObDataStoreDesc &data_store_desc, const ObITable::TableKey &table_key, const ObTableSchema &table_schema, const int64_t column_group_cnt, ObSSTableIndexBuilder &index_builder, ObTableHandleV2 &table_handle) { ObSSTableMergeRes res; OK(index_builder.close(res)); ObIndexTreeRootBlockDesc root_desc; root_desc = res.root_desc_; ASSERT_TRUE(root_desc.is_valid()); ObTabletCreateSSTableParam param; param.table_key_ = table_key; param.co_base_type_ = ObCOSSTableBaseType::ALL_CG_TYPE; param.column_group_cnt_ = column_group_cnt; param.schema_version_ = ObMultiVersionSSTableTest::SCHEMA_VERSION; param.create_snapshot_version_ = 0; param.progressive_merge_round_ = 0; param.progressive_merge_step_ = 0; param.table_mode_ = table_schema.get_table_mode_struct(); param.index_type_ = table_schema.get_index_type(); param.rowkey_column_cnt_ = data_store_desc.is_cg() ? 0 : table_schema.get_rowkey_column_num() + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt(); ObSSTableMergeRes::fill_addr_and_data(res.root_desc_, param.root_block_addr_, param.root_block_data_); ObSSTableMergeRes::fill_addr_and_data(res.data_root_desc_, param.data_block_macro_meta_addr_, param.data_block_macro_meta_); param.is_meta_root_ = res.data_root_desc_.is_meta_root_; param.root_row_store_type_ = res.root_row_store_type_; param.latest_row_store_type_ = res.root_row_store_type_; param.data_index_tree_height_ = res.root_desc_.height_; param.index_blocks_cnt_ = res.index_blocks_cnt_; param.data_blocks_cnt_ = res.data_blocks_cnt_; param.micro_block_cnt_ = res.micro_block_cnt_; param.use_old_macro_block_count_ = res.use_old_macro_block_count_; param.row_count_ = res.row_count_; param.column_cnt_ = res.data_column_cnt_; param.data_checksum_ = res.data_checksum_; param.occupy_size_ = res.occupy_size_; param.original_size_ = res.original_size_; param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR; param.max_merged_trans_version_ = res.max_merged_trans_version_; param.contain_uncommitted_row_ = res.contain_uncommitted_row_; param.compressor_type_ = res.compressor_type_; param.encrypt_id_ = res.encrypt_id_; param.master_key_id_ = res.master_key_id_; param.nested_size_ = res.nested_size_; param.nested_offset_ = res.nested_offset_; ASSERT_EQ(OB_SUCCESS, param.data_block_ids_.assign(res.data_block_ids_)); ASSERT_EQ(OB_SUCCESS, param.other_block_ids_.assign(res.other_block_ids_)); param.table_backup_flag_.reset(); param.table_shared_flag_.reset(); param.tx_data_recycle_scn_.set_min(); param.sstable_logic_seq_ = 0; param.recycle_version_ = 0; param.root_macro_seq_ = 0; param.full_column_cnt_ = 0; param.is_co_table_without_cgs_ = false; param.co_base_snapshot_version_ = 0; param.ddl_scn_.set_min(); param.filled_tx_scn_ = table_key.is_major_sstable() ? SCN::min_scn() : table_key.get_end_scn(); if (is_major_merge_type(data_store_desc.get_merge_type())) { ASSERT_EQ(OB_SUCCESS, ObSSTableMergeRes::fill_column_checksum_for_empty_major(param.column_cnt_, param.column_checksums_)); } if (data_store_desc.is_cg()) { OK(ObTabletCreateDeleteHelper::create_sstable(param, allocator_, table_handle)); } else { OK(ObTabletCreateDeleteHelper::create_sstable(param, allocator_, table_handle)); } } int get_col_ids(const ObTableSchema &table_schema, ObIArray &col_ids) { return table_schema.get_store_column_ids(col_ids); } class TestCOMerge : public TestMergeBasic { public: static const int64_t MAX_PARALLEL_DEGREE = 10; TestCOMerge() :TestMergeBasic("testco_merge") {} virtual ~TestCOMerge() {} void SetUp(); void TearDown(); static void SetUpTestCase(); static void TearDownTestCase(); void prepare_query_param(const ObVersionRange &version_range); void prepare_merge_context(const ObMergeType &merge_type, const bool is_full_merge, const ObVersionRange &trans_version_range, ObCOTabletMergeCtx &merge_context); void alloc_merge_infos(ObCOTabletMergeCtx &merge_context); void prepare_data( const int64_t micro_row_cnt, const int64_t macro_row_cnt, const int64_t row_count, ObMockIterator &data_iter, ObMacroBlockWriter ¯o_writer, ObCOMergeProjector *projector); void prepare_co_sstable( const ObTableSchema &table_schema, const ObMergeType &merge_type, const int64_t snapshot_version, const int64_t multi_version_start, const int64_t* micro_row_cnt, const int64_t* macro_row_cnt, ObMockIterator &data_iter, ObTableHandleV2 &co_table_handle); void create_empty_data_co_sstable(const int64_t snapshot_version, ObTableHandleV2 &table_handle); void add_all_and_each_column_group(); void prepare_scan_param( const ObITableReadInfo &cg_read_info, const ObVersionRange &version_range, ObStoreCtx &store_ctx, ObTableIterParam &iter_param, ObTableAccessContext &context); void init_co_sstable(storage::ObTablesHandleArray &merged_cg_tables_handle, const int64_t cnt) { int ret = OB_SUCCESS; common::ObArray cg_tables; ObCOSSTableV2 *co_sstable = nullptr; for (int64_t i = 0; i < cnt; i++) { ObSSTable *merged_sstable = static_cast(merged_cg_tables_handle.get_table(i)); assert(merged_sstable); if (!merged_sstable->is_co_sstable()) { cg_tables.push_back(merged_sstable); } else { co_sstable = static_cast(merged_sstable); } } assert(co_sstable); ASSERT_EQ(OB_SUCCESS, co_sstable->fill_cg_sstables(cg_tables)); } void get_cg_read_info(const ObColDesc &col_desc, const ObITableReadInfo *&cg_read_info) { int ret = OB_SUCCESS; cg_read_info_.reset(); if (OB_FAIL(ObTenantCGReadInfoMgr::construct_cg_read_info(allocator_, lib::is_oracle_mode(), col_desc, nullptr, cg_read_info_))) { LOG_WARN("Fail to init cg read info", K(ret)); } else { cg_read_info = &cg_read_info_; } ASSERT_EQ(OB_SUCCESS, ret); } public: ObCOMergeDagParam param_; ObCOMergeDagNet dag_net_; ObStoreCtx store_ctx_; ObTableReadInfo cg_read_info_; }; void TestCOMerge::SetUpTestCase() { ObMultiVersionSSTableTest::SetUpTestCase(); //mock sequence no ObClockGenerator::init(); ObLSID ls_id(ls_id_); ObTabletID tablet_id(tablet_id_); ObLSHandle ls_handle; ObLSService *ls_svr = MTL(ObLSService*); ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD)); // create tablet obrpc::ObBatchCreateTabletArg create_tablet_arg; share::schema::ObTableSchema table_schema; ASSERT_EQ(OB_SUCCESS, gen_create_tablet_arg(tenant_id_, ls_id, tablet_id, create_tablet_arg, 1, &table_schema)); ASSERT_EQ(OB_SUCCESS, TestTabletHelper::create_tablet(ls_handle, tablet_id, table_schema, allocator_)); ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *); ASSERT_TRUE(nullptr != mgr); share::ObFreezeInfoList &info_list = mgr->freeze_info_mgr_.freeze_info_; info_list.reset(); share::SCN frozen_val; frozen_val.val_ = 1; ASSERT_EQ(OB_SUCCESS, info_list.frozen_statuses_.push_back(share::ObFreezeInfo(frozen_val, 1, 0))); info_list.latest_snapshot_gc_scn_.val_ = 2; } void TestCOMerge::TearDownTestCase() { ObMultiVersionSSTableTest::TearDownTestCase(); // reset sequence no ObClockGenerator::destroy(); } void TestCOMerge::SetUp() { ObMultiVersionSSTableTest::SetUp(); param_.compat_mode_ = lib::Worker::CompatMode::MYSQL; } void TestCOMerge::TearDown() { ObMultiVersionSSTableTest::TearDown(); TRANS_LOG(INFO, "teardown success"); } void TestCOMerge::prepare_scan_param( const ObITableReadInfo &cg_read_info, const ObVersionRange &version_range, ObStoreCtx &store_ctx, ObTableIterParam &iter_param, ObTableAccessContext &context) { context.reset(); ObLSID ls_id(ls_id_); iter_param.table_id_ = table_id_; iter_param.tablet_id_ = tablet_id_; iter_param.read_info_ = &cg_read_info; iter_param.out_cols_project_ = nullptr; iter_param.is_same_schema_column_ = true; iter_param.has_virtual_columns_ = false; iter_param.vectorized_enabled_ = false; ASSERT_EQ(OB_SUCCESS, store_ctx.init_for_read(ls_id, iter_param.tablet_id_, INT64_MAX, // query_expire_ts -1, // lock_timeout_us share::SCN::max_scn())); ObQueryFlag query_flag(ObQueryFlag::Forward, true, /*is daily merge scan*/ true, /*is read multiple macro block*/ true, /*sys task scan, read one macro block in single io*/ false /*full row scan flag, obsoleted*/, false,/*index back*/ false); /*query_stat*/ query_flag.set_not_use_row_cache(); query_flag.set_not_use_block_cache(); //query_flag.multi_version_minor_merge_ = true; ASSERT_EQ(OB_SUCCESS, context.init(query_flag, store_ctx, allocator_, allocator_, version_range)); context.limit_param_ = nullptr; } void TestCOMerge::prepare_data( const int64_t micro_row_cnt, const int64_t macro_row_cnt, const int64_t column_count, ObMockIterator &data_iter, ObMacroBlockWriter ¯o_writer, ObCOMergeProjector *projector) { const ObStoreRow *row = nullptr; ObDatumRow datum_row; datum_row.init(allocator_, column_count); for (int64_t i = 0; i < data_iter.count(); i++) { OK(data_iter.get_row(i, row)); ASSERT_TRUE(nullptr != row); datum_row.from_store_row(*row); if (nullptr != projector) { projector->project(datum_row); } const ObDatumRow &append_row = projector == nullptr ? datum_row : projector->get_project_row(); ASSERT_EQ(OB_SUCCESS, macro_writer.append_row(append_row)); if (macro_writer.micro_writer_->get_row_count() >= micro_row_cnt) { OK(macro_writer.build_micro_block()); } if (macro_writer.macro_blocks_[macro_writer.current_index_].get_row_count() >= macro_row_cnt) { OK(macro_writer.try_switch_macro_block()); } } OK(macro_writer.try_switch_macro_block()); } void TestCOMerge::prepare_co_sstable( const ObTableSchema &table_schema, const ObMergeType &merge_type, const int64_t snapshot_version, const int64_t multi_version_start, const int64_t* micro_row_cnt, const int64_t* macro_row_cnt, ObMockIterator &data_iter, ObTableHandleV2 &co_table_handle) { ObStorageSchema storage_schema; common::ObArray cg_tables; ObITable *co_table = nullptr; ASSERT_EQ(OB_SUCCESS, storage_schema.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL)); const common::ObIArray &cg_array = storage_schema.get_column_groups(); for (int64_t i = 0; i < cg_array.count(); i++) { const ObStorageColumnGroupSchema &cg_schema = cg_array.at(i); ObCOMergeProjector projector; blocksstable::ObWholeDataStoreDesc data_store_desc; ObMacroBlockWriter macro_writer; ObSSTableIndexBuilder root_index_builder(false/* not need writer buffer */); ObCOMergeProjector *row_project = nullptr; ObTableHandleV2 *table_handle = nullptr; OK(data_store_desc.init(false/*is_ddl*/, table_schema, ObLSID(ls_id_), ObTabletID(tablet_id_), merge_type, snapshot_version, DATA_CURRENT_VERSION, table_schema.get_micro_index_clustered(), 0 /*tablet_transfer_seq*/, share::SCN::invalid_scn(), &cg_schema, i)); ASSERT_EQ(OB_SUCCESS, root_index_builder.init(data_store_desc.get_desc())); data_store_desc.get_desc().sstable_index_builder_ = &root_index_builder; if (!cg_schema.is_all_column_group()) { OK(projector.init(cg_schema)); row_project = &projector; } ObMacroSeqParam seq_param; seq_param.seq_type_ = ObMacroSeqParam::SEQ_TYPE_INC; seq_param.start_ = 0; ObPreWarmerParam pre_warm_param(MEM_PRE_WARM); ObSSTablePrivateObjectCleaner cleaner; OK(macro_writer.open(data_store_desc.get_desc(), 0/*parallel_idx*/, seq_param/*start_seq*/, pre_warm_param, cleaner)); prepare_data(micro_row_cnt[i], macro_row_cnt[i], table_schema.get_column_count(), data_iter, macro_writer, row_project); OK(macro_writer.close()); // data write ctx has been moved to root_index_builder ASSERT_EQ(macro_writer.get_macro_block_write_ctx().get_macro_block_count(), 0); ObITable::TableKey table_key; table_key.tablet_id_ = ObTabletID(tablet_id_); table_key.table_type_ = cg_schema.is_all_column_group() ? ObITable::COLUMN_ORIENTED_SSTABLE : ObITable::NORMAL_COLUMN_GROUP_SSTABLE; if (cg_schema.is_all_column_group()) { table_key.column_group_idx_ = i; } ASSERT_NE(nullptr, table_handle = OB_NEWx(ObTableHandleV2, (&allocator_))); const int64_t cg_cnt = cg_schema.is_all_column_group() ? cg_array.count() : 1; close_builder_and_prepare_sstable(data_store_desc.get_desc(), table_key, table_schema, cg_cnt, root_index_builder, *table_handle); if (cg_schema.is_all_column_group()) { co_table_handle.set_sstable(table_handle->get_table(), &allocator_); } else { OK(cg_tables.push_back(table_handle->get_table())); } //table_key.log_ts_range_ = 0; } for (int i = 0; i < cg_tables.count(); i++ ) { assert(cg_tables.at(i)->is_sstable()); } ASSERT_EQ(OB_SUCCESS, static_cast(co_table_handle.get_table())->fill_cg_sstables(cg_tables)); } void TestCOMerge::prepare_query_param(const ObVersionRange &version_range) { context_.reset(); ObLSID ls_id(ls_id_); iter_param_.table_id_ = table_id_; iter_param_.tablet_id_ = tablet_id_; iter_param_.read_info_ = &full_read_info_; iter_param_.out_cols_project_ = nullptr; iter_param_.is_same_schema_column_ = true; iter_param_.has_virtual_columns_ = false; iter_param_.vectorized_enabled_ = false; ASSERT_EQ(OB_SUCCESS, store_ctx_.init_for_read(ls_id, iter_param_.tablet_id_, INT64_MAX, // query_expire_ts -1, // lock_timeout_us share::SCN::max_scn())); ObQueryFlag query_flag(ObQueryFlag::Forward, true, /*is daily merge scan*/ true, /*is read multiple macro block*/ true, /*sys task scan, read one macro block in single io*/ false /*full row scan flag, obsoleted*/, false,/*index back*/ false); /*query_stat*/ query_flag.set_not_use_row_cache(); query_flag.set_not_use_block_cache(); //query_flag.multi_version_minor_merge_ = true; ASSERT_EQ(OB_SUCCESS, context_.init(query_flag, store_ctx_, allocator_, allocator_, version_range)); context_.limit_param_ = nullptr; } void TestCOMerge::alloc_merge_infos(ObCOTabletMergeCtx &merge_context) { ASSERT_EQ(OB_SUCCESS, merge_context.init_tablet_merge_info()); } void TestCOMerge::create_empty_data_co_sstable(const int64_t snapshot_version, ObTableHandleV2 &table_handle) { ObStorageSchema storage_schema; ASSERT_EQ(OB_SUCCESS, storage_schema.init(allocator_, table_schema_, lib::Worker::CompatMode::MYSQL)); ASSERT_EQ(OB_SUCCESS, ObTabletCreateDeleteHelper::create_empty_sstable( allocator_, storage_schema, ObTabletID(tablet_id_), snapshot_version, table_handle)); LOG_INFO("succ to create_empty_sstable", K(table_schema_), K(storage_schema), KPC(table_handle.get_table())); } void TestCOMerge::add_all_and_each_column_group() { unittest::TestSchemaPrepare::add_all_and_each_column_group(allocator_, table_schema_); } void TestCOMerge::prepare_merge_context(const ObMergeType &merge_type, const bool is_full_merge, const ObVersionRange &trans_version_range, ObCOTabletMergeCtx &merge_context) { TestMergeBasic::prepare_merge_context(merge_type, is_full_merge, trans_version_range, merge_context); merge_context.static_param_.co_major_merge_type_ = ObCOMajorMergePolicy::BUILD_COLUMN_STORE_MERGE; merge_context.static_param_.data_version_ = DATA_VERSION_4_3_0_0; merge_context.static_param_.dag_param_.merge_version_ = trans_version_range.snapshot_version_; ASSERT_EQ(OB_SUCCESS, merge_context.cal_merge_param()); ASSERT_EQ(OB_SUCCESS, merge_context.init_parallel_merge_ctx()); ASSERT_EQ(OB_SUCCESS, merge_context.init_static_param_and_desc()); ASSERT_EQ(OB_SUCCESS, merge_context.init_read_info()); ASSERT_EQ(OB_SUCCESS, merge_context.init_tablet_merge_info()); } void set_cg_idx( ObCOTabletMergeCtx &ctx, const int64_t start_cg_idx, const int64_t end_cg_idx) { ObCOMergeDagParam *dag_param = static_cast(&ctx.static_param_.dag_param_); dag_param->start_cg_idx_ = start_cg_idx; dag_param->end_cg_idx_ = end_cg_idx; dag_param->compat_mode_ = lib::Worker::CompatMode::MYSQL; } TEST_F(TestCOMerge, test_merge_default_row_store_with_empty_major) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObTableHandleV2 handle1; const char *micro_data[3]; micro_data[0] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 var1 -8 0 NOP 1 T_DML_UPDATE EXIST LF\n" "1 var1 -8 0 2 2 T_DML_INSERT EXIST CLF\n"; micro_data[1] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "2 var1 -8 MIN 3 2 T_DML_UPDATE EXIST SCF\n" "2 var1 -8 0 3 NOP T_DML_UPDATE EXIST N\n"; micro_data[2] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "2 var1 -6 0 2 2 T_DML_INSERT EXIST CL\n"; int schema_rowkey_cnt = 2; int64_t snapshot_version = 7; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); add_all_and_each_column_group(); reset_writer(snapshot_version); prepare_one_macro(micro_data, 2); prepare_one_macro(µ_data[2], 1); prepare_data_end(handle1); ObTableHandleV2 empty_co_table_handle; ObStorageSchema storage_schema; ASSERT_EQ(OB_SUCCESS, storage_schema.init(allocator_, table_schema_, lib::Worker::CompatMode::MYSQL)); ASSERT_EQ(OB_SUCCESS, ObTabletCreateDeleteHelper::create_empty_sstable(allocator_, storage_schema, ObTabletID(tablet_id_), snapshot_version, empty_co_table_handle)); merge_context.static_param_.tables_handle_.add_table(empty_co_table_handle); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObTableHandleV2 handle2; const char *micro_data2[1]; micro_data2[0] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 var1 -10 0 NOP 10 T_DML_UPDATE EXIST LF\n" "2 var1 -10 0 NOP 12 T_DML_UPDATE EXIST LF\n" "3 var1 -10 0 NOP 13 T_DML_UPDATE EXIST LF\n"; snapshot_version = 20; table_key_.scn_range_.start_scn_.convert_for_tx(10); table_key_.scn_range_.end_scn_.convert_for_tx(20); reset_writer(snapshot_version); prepare_one_macro(micro_data2, 1); prepare_data_end(handle2); merge_context.static_param_.tables_handle_.add_table(handle2); STORAGE_LOG(INFO, "finish prepare sstable2"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 1; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 1)); set_cg_idx(merge_context, 0, 1); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 1); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0, 1)); ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(0)); ASSERT_NE(nullptr, merged_sstable); if (merged_sstable->is_co_sstable()) { static_cast(merged_sstable)->valid_for_cs_reading_ = true; static_cast(merged_sstable)->cs_meta_.column_group_cnt_ = 1; } const char *result1 = "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" "0 var1 -10 0 NULL 10 EXIST \n" "1 var1 -8 0 2 2 EXIST \n" "2 var1 -10 0 3 12 EXIST \n" "3 var1 -10 0 NULL 13 EXIST \n"; ObMockIterator res_iter; ObStoreRowIterator *scanner = NULL; ObDatumRange range; res_iter.reset(); range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; prepare_query_param(trans_version_range); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner)); ASSERT_EQ(OB_SUCCESS, res_iter.from(result1)); ObMockDirectReadIterator sstable_iter; ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); handle1.reset(); handle2.reset(); merger.reset(); } // TEST_F(TestCOMerge, test_merge_default_row_store_with_major_table) // { // int ret = OB_SUCCESS; // ObCOMergeDagParam param; // ObCOTabletMergeCtx merge_context(param, allocator_, "COTabletMerge"); // int schema_rowkey_cnt = 2; // ObTableHandleV2 handle1; // const char *micro_data[3]; // micro_data[0] = // "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" // "0 var1 -8 0 0 1 EXIST \n" // "1 var1 -8 0 2 2 EXIST \n"; // micro_data[1] = // "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" // "2 var1 -8 0 3 2 EXIST \n" // "3 var1 -8 0 3 0 EXIST \n"; // micro_data[2] = // "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" // "4 var1 -6 0 2 2 EXIST \n"; // int snapshot_version = 20; // ObScnRange scn_range; // scn_range.start_scn_.convert_for_tx(10); // scn_range.end_scn_.convert_for_tx(20); // prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); // add_all_and_each_column_group(); // reset_writer(snapshot_version); // prepare_one_macro(micro_data, 1); // prepare_one_macro(µ_data[1], 2); // prepare_data_end(handle1, ObITable::TableType::COLUMN_ORIENTED_SSTABLE, 0); // OK(static_cast(handle1.get_table())->build_cs_meta()); // //ASSERT_EQ(OB_SUCCESS, ObCOSSTable::make_co_sstable(allocator_, static_cast(handle1.get_table()), co_table_handle)); // merge_context.static_param_.tables_handle_.add_table(handle1); // STORAGE_LOG(INFO, "finish prepare co sstable"); // ObTableHandleV2 handle2; // const char *micro_data2[1]; // micro_data2[0] = // "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" // "0 var1 -10 0 1 10 T_DML_UPDATE EXIST LF\n" // "5 var1 -10 0 1 13 T_DML_UPDATE EXIST LF\n"; // snapshot_version = 20; // table_key_.scn_range_ = scn_range; // reset_writer(snapshot_version); // prepare_one_macro(micro_data2, 1); // prepare_data_end(handle2); // merge_context.static_param_.tables_handle_.add_table(handle2); // STORAGE_LOG(INFO, "finish prepare sstable2"); // ObVersionRange trans_version_range; // trans_version_range.snapshot_version_ = 100; // trans_version_range.multi_version_start_ = 7; // trans_version_range.base_version_ = 7; // //prepare merge_ctx // prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); // merge_context.array_count_ = 1; // alloc_merge_infos(merge_context); // OK(merge_context.prepare_index_builder(0, 1)); // ObCOMergeDagParam *dag_param = static_cast(&merge_context.param_); // dag_param->start_cg_idx_ = 0; dag_param->end_cg_idx_ = 1; dag_param->compat_mode_ = lib::Worker::CompatMode::MYSQL; // ObCOMerger merger(merger_allocator_, 0, 1); // ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); // ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0, 1)); // ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(0)); // ASSERT_NE(nullptr, merged_sstable); // const char *result1 = // "bigint var bigint bigint bigint bigint flag multi_version_row_flag\n" // "0 var1 -10 0 1 10 EXIST \n" // "1 var1 -8 0 2 2 EXIST \n" // "2 var1 -8 0 3 2 EXIST \n" // "3 var1 -8 0 3 0 EXIST \n" // "4 var1 -6 0 2 2 EXIST \n" // "5 var1 -10 0 1 13 EXIST \n"; // ObMockIterator res_iter; // ObStoreRowIterator *scanner = NULL; // ObDatumRange range; // res_iter.reset(); // range.set_whole_range(); // trans_version_range.base_version_ = 1; // trans_version_range.multi_version_start_ = 1; // trans_version_range.snapshot_version_ = INT64_MAX; // prepare_query_param(trans_version_range); // ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner)); // ASSERT_EQ(OB_SUCCESS, res_iter.from(result1)); // ObMockDirectReadIterator sstable_iter; // ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_)); // bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); // ASSERT_TRUE(is_equal); // scanner->~ObStoreRowIterator(); // handle1.reset(); // handle2.reset(); // merger.reset(); // } TEST_F(TestCOMerge, test_column_store_merge_with_empty_co_table) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObTableHandleV2 handle1; const char *micro_data[3]; micro_data[0] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 var1 -8 0 8 4 T_DML_UPDATE EXIST LF\n" "1 var1 -8 0 6 7 T_DML_INSERT EXIST CLF\n"; micro_data[1] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "2 var1 -8 MIN 3 2 T_DML_UPDATE EXIST SCF\n" "2 var1 -8 0 3 NOP T_DML_UPDATE EXIST N\n"; micro_data[2] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n" "2 var1 -6 0 2 2 T_DML_INSERT EXIST CL\n"; int schema_rowkey_cnt = 2; int64_t snapshot_version = 7; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(4, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObTableHandleV2 co_table_handle; create_empty_data_co_sstable(snapshot_version, co_table_handle); ASSERT_EQ(5, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); reset_writer(snapshot_version); prepare_one_macro(micro_data, 2); prepare_one_macro(µ_data[2], 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 4)); set_cg_idx(merge_context, 0, 4); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 4); ObTableHandleV2 new_co_table_handle; ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0, 4)); const char *result1 = " bigint flag multi_version_row_flag\n" " 8 EXIST \n" " 6 EXIST \n" " 3 EXIST \n"; ObMockIterator res_iter; ObStoreRowIterator *scanner = nullptr; ObDatumRange range; res_iter.reset(); range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; get_cg_read_info(col_ids.at(2), cg_read_info); prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(3)); STORAGE_LOG(INFO, "chaser debug sstable", K(ret), KPC(merged_sstable), K(merge_context.merged_cg_tables_handle_)); ASSERT_NE(nullptr, merged_sstable); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_EQ(OB_SUCCESS, res_iter.from(result1)); ObMockDirectReadIterator sstable_iter; ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); handle1.reset(); merger.reset(); } TEST_F(TestCOMerge, test_co_merge_with_twice_major) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); const char *micro_data[3]; micro_data[0] = "bigint var bigint bigint bigint dml flag multi_version_row_flag\n" "0 var1 -8 0 8 T_DML_UPDATE EXIST LF\n" "1 var1 -8 0 NOP T_DML_INSERT EXIST CLF\n"; micro_data[1] = "bigint var bigint bigint bigint dml flag multi_version_row_flag\n" "4 var1 -8 MIN 3 T_DML_UPDATE EXIST SCF\n" "4 var1 -8 0 3 T_DML_UPDATE EXIST N\n"; micro_data[2] = "bigint var bigint bigint bigint dml flag multi_version_row_flag\n" "4 var1 -6 0 2 T_DML_INSERT EXIST CL\n"; int schema_rowkey_cnt = 2; int64_t snapshot_version = 7; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(7); //prepare table schema prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(3, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObTableHandleV2 co_table_handle; create_empty_data_co_sstable(snapshot_version, co_table_handle); ASSERT_EQ(4, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; reset_writer(snapshot_version); prepare_one_macro(micro_data, 2); prepare_one_macro(µ_data[2], 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 9; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 4; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 4)); set_cg_idx(merge_context, 0, 4); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 4); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0, 4)); ASSERT_EQ(4, merge_context.merged_cg_tables_handle_.get_count()); const char *result[4]; result[0] = "bigint var bigint bigint bigint flag multi_version_row_flag\n" "0 var1 -8 0 8 EXIST \n" "1 var1 -8 0 NULL EXIST \n" "4 var1 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "0 EXIST \n" "1 EXIST \n" "4 EXIST \n"; result[2] = "var flag multi_version_row_flag\n" "var1 EXIST \n" "var1 EXIST \n" "var1 EXIST \n"; result[3] = "bigint flag multi_version_row_flag\n" "8 EXIST \n" "NULL EXIST \n" "3 EXIST \n"; ObTableHandleV2 new_co_table_handle; init_co_sstable(merge_context.merged_cg_tables_handle_, 4); for (int64_t i = 0; i < 4; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } if (merged_sstable->is_co_sstable()) { merge_context.merged_cg_tables_handle_.get_table(i, new_co_table_handle); } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } handle1.reset(); merger.reset(); ObCOTabletMergeCtx new_merge_context(dag_net_, param_, allocator_); new_merge_context.static_param_.tables_handle_.add_table(new_co_table_handle); ObTableHandleV2 handle2; const char *micro_data2[1]; micro_data2[0] = "bigint var bigint bigint bigint dml flag multi_version_row_flag\n" "0 var1 -10 0 nop T_DML_UPDATE EXIST LF\n" "1 var1 -10 0 1 T_DML_UPDATE EXIST LF\n" "3 var1 -10 0 1 T_DML_UPDATE EXIST LF\n" "5 var1 -10 0 1 T_DML_UPDATE EXIST LF\n"; snapshot_version = 20; table_key_.scn_range_.start_scn_.convert_for_tx(10); table_key_.scn_range_.end_scn_.convert_for_tx(20); reset_writer(snapshot_version); prepare_one_macro(micro_data2, 1); prepare_data_end(handle2); new_merge_context.static_param_.tables_handle_.add_table(handle2); STORAGE_LOG(INFO, "finish prepare sstable2"); trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare new merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, new_merge_context); new_merge_context.array_count_ = 4; alloc_merge_infos(new_merge_context); OK(new_merge_context.prepare_index_builder(0, 4)); set_cg_idx(new_merge_context, 0, 4); ObCOMerger merger2(merger_allocator_, new_merge_context.static_param_, 0, 4); ASSERT_EQ(OB_SUCCESS, merger2.merge_partition(new_merge_context, 0)); ASSERT_EQ(OB_SUCCESS, new_merge_context.create_sstables(0, 4)); const char *new_result[4]; new_result[0] = "bigint var bigint bigint bigint flag multi_version_row_flag\n" "0 var1 -10 0 8 EXIST \n" "1 var1 -10 0 1 EXIST \n" "3 var1 -10 0 1 EXIST \n" "4 var1 -8 0 3 EXIST \n" "5 var1 -10 0 1 EXIST \n"; new_result[1] = "bigint flag multi_version_row_flag\n" "0 EXIST \n" "1 EXIST \n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n"; new_result[2] = "var flag multi_version_row_flag\n" "var1 EXIST \n" "var1 EXIST \n" "var1 EXIST \n" "var1 EXIST \n" "var1 EXIST \n"; new_result[3] = "bigint flag multi_version_row_flag\n" "8 EXIST \n" "1 EXIST \n" "1 EXIST \n" "3 EXIST \n" "1 EXIST \n"; init_co_sstable(new_merge_context.merged_cg_tables_handle_, 4); for (int64_t i = 0; i < 4; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(new_merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(new_result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_closed(); merge_range.set_right_closed(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_open) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_open(); merge_range.set_right_open(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "4 EXIST \n" "5 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "2 EXIST \n" "NULL EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_left_open) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_open(); merge_range.set_right_closed(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_right_open) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_closed(); merge_range.set_right_open(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_left_is_min) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.start_key_.set_min_rowkey(); merge_range.set_end_key(end_key); merge_range.set_left_open(); merge_range.set_right_closed(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -13 0 9 EXIST \n" "1 -11 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "0 EXIST \n" "1 EXIST \n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "9 EXIST \n" "NULL EXIST \n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_right_max) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.end_key_.set_max_rowkey(); merge_range.set_left_closed(); merge_range.set_right_open(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n" "7 EXIST \n" "9 EXIST \n" "12 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_empty) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); STORAGE_LOG(INFO, "finish prepare co sstable", KPC(co_table_handle.get_table())); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1", KPC(handle1.get_table())); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(10); datums[1].set_int(11); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_closed(); merge_range.set_right_closed(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); EXPECT_EQ(1, merge_context.merged_cg_tables_handle_.get_count()); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); if (NULL != merged_sstable) { EXPECT_EQ(true, merged_sstable->is_co_sstable()); EXPECT_EQ(true, static_cast(merged_sstable)->is_cgs_empty_co_table()); } } } TEST_F(TestCOMerge, test_merge_range_is_whole_range) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRange merge_range; merge_range.reset(); merge_range.set_whole_range(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -13 0 9 EXIST \n" "1 -11 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "0 EXIST \n" "1 EXIST \n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n" "7 EXIST \n" "9 EXIST \n" "12 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "9 EXIST \n" "NULL EXIST \n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } TEST_F(TestCOMerge, test_merge_range_with_beyond_range) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "11 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {2, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(8); datums[1].set_int(9); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range; merge_range.reset(); merge_range.set_start_key(start_key); merge_range.set_end_key(end_key); merge_range.set_left_closed(); merge_range.set_right_closed(); merge_context.parallel_merge_ctx_.range_array_.reset(); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); STORAGE_LOG(INFO, "finish co merge"); ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); EXPECT_EQ(1, merge_context.merged_cg_tables_handle_.get_count()); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); if (NULL != merged_sstable) { EXPECT_EQ(true, merged_sstable->is_co_sstable()); EXPECT_EQ(true, static_cast(merged_sstable)->is_cgs_empty_co_table()); } } } TEST_F(TestCOMerge, test_rebuild_sstable) { int ret = OB_SUCCESS; ObCOTabletMergeCtx merge_context(dag_net_, param_, allocator_); ObCOMerger merger(merger_allocator_, merge_context.static_param_, 0, 3); const char *co_table_data[1]; co_table_data[0]= "bigint bigint bigint bigint flag multi_version_row_flag\n" "0 -8 0 8 EXIST \n" "1 -8 0 NULL EXIST \n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; const char *micro_data1[1]; micro_data1[0] = "bigint bigint bigint bigint dml flag multi_version_row_flag\n" "0 -13 0 9 T_DML_UPDATE EXIST CLF\n" "1 -11 0 NOP T_DML_INSERT EXIST CLF\n" "5 -10 0 NOP T_DML_INSERT EXIST CLF\n"; int schema_rowkey_cnt = 1; int64_t snapshot_version = 10; ObScnRange scn_range; scn_range.start_scn_.set_min(); scn_range.end_scn_.convert_for_tx(10); //prepare table schema prepare_table_schema(micro_data1, schema_rowkey_cnt, scn_range, snapshot_version); ObArray col_ids; ASSERT_EQ(OB_SUCCESS, get_col_ids(table_schema_, col_ids)); ASSERT_EQ(2, col_ids.count()); add_all_and_each_column_group(); init_tablet(); // create co sstable ObMockIterator data_iter; data_iter.reset(); OK(data_iter.from(co_table_data[0])); ObTableHandleV2 co_table_handle; const int64_t micro_row_count[4] = {20, 2, 2, 1}; const int64_t macro_row_count[4] = {30, 4, 4, 3}; prepare_co_sstable(table_schema_, MAJOR_MERGE, snapshot_version, 0, micro_row_count, macro_row_count, data_iter, co_table_handle); ASSERT_EQ(3, static_cast(co_table_handle.get_table())->cs_meta_.column_group_cnt_); merge_context.static_param_.tables_handle_.add_table(co_table_handle); ObTableHandleV2 handle1; scn_range.start_scn_.convert_for_tx(10); scn_range.end_scn_.convert_for_tx(20); table_key_.scn_range_ = scn_range; reset_writer(snapshot_version); prepare_one_macro(micro_data1, 1); prepare_data_end(handle1); merge_context.static_param_.tables_handle_.add_table(handle1); STORAGE_LOG(INFO, "finish prepare sstable1"); ObVersionRange trans_version_range; trans_version_range.snapshot_version_ = 100; trans_version_range.multi_version_start_ = 7; trans_version_range.base_version_ = 7; //prepare merge_ctx prepare_merge_context(MAJOR_MERGE, false, trans_version_range, merge_context); merge_context.array_count_ = 3; alloc_merge_infos(merge_context); OK(merge_context.prepare_index_builder(0, 3)); //prepare merge_range ObDatumRowkey start_key, end_key; void *buf = allocator_.alloc(sizeof(ObStorageDatum) * 2); ObStorageDatum *datums = new (buf) ObStorageDatum[2](); datums[0].set_int(3); datums[1].set_int(6); start_key.datums_ = datums; start_key.datum_cnt_ = 1; end_key.datums_ = &datums[1]; end_key.datum_cnt_ = 1; ObDatumRange merge_range, merge_range_1; merge_range_1.reset(); merge_range_1.set_start_key(start_key); merge_range_1.set_end_key(end_key); merge_range_1.set_left_closed(); merge_range_1.set_right_open(); merge_range.reset(); merge_range.set_start_key(end_key); merge_range.end_key_.set_max_rowkey(); merge_range.set_left_closed(); merge_range.set_right_open(); merge_context.parallel_merge_ctx_.range_array_.reset(); merge_context.parallel_merge_ctx_.parallel_type_ = ObParallelMergeCtx::PARALLEL_MAJOR; merge_context.parallel_merge_ctx_.concurrent_cnt_ = 2; OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range_1)); OK(merge_context.parallel_merge_ctx_.range_array_.push_back(merge_range)); merge_context.static_param_.concurrent_cnt_ = 2; set_cg_idx(merge_context, 0, 3); ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0)); ObCOMerger merger1(merger_allocator_, merge_context.static_param_, 0, 3); ASSERT_EQ(OB_SUCCESS, merger1.merge_partition(merge_context, 1)); STORAGE_LOG(INFO, "finish co merge"); merge_context.cg_merge_info_array_[0]->sstable_builder_.data_store_desc_.get_desc().static_desc_->major_working_cluster_version_ = DATA_VERSION_4_3_0_0; merge_context.cg_merge_info_array_[1]->sstable_builder_.data_store_desc_.get_desc().static_desc_->major_working_cluster_version_ = DATA_VERSION_4_3_0_0; ASSERT_EQ(OB_SUCCESS, merge_context.create_sstables(0,3)); ASSERT_EQ(3, merge_context.merged_cg_tables_handle_.get_count()); const char *result[3]; result[0] = "bigint bigint bigint bigint flag multi_version_row_flag\n" "3 -8 0 3 EXIST \n" "4 -8 0 2 EXIST \n" "5 -10 0 NULL EXIST \n" "6 -8 0 3 EXIST \n" "7 -8 0 3 EXIST \n" "9 -8 0 3 EXIST \n" "12 -8 0 3 EXIST \n"; result[1] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "4 EXIST \n" "5 EXIST \n" "6 EXIST \n" "7 EXIST \n" "9 EXIST \n" "12 EXIST \n"; result[2] = "bigint flag multi_version_row_flag\n" "3 EXIST \n" "2 EXIST \n" "NULL EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n" "3 EXIST \n"; init_co_sstable(merge_context.merged_cg_tables_handle_, 3); for (int64_t i = 0; i < 3; i++) { ObDatumRange range; range.set_whole_range(); trans_version_range.base_version_ = 1; trans_version_range.multi_version_start_ = 1; trans_version_range.snapshot_version_ = INT64_MAX; ObTableIterParam iter_param; ObTableAccessContext context; const ObITableReadInfo *cg_read_info = nullptr; ObStoreCtx store_ctx; ObSSTable *merged_sstable = static_cast(merge_context.merged_cg_tables_handle_.get_table(i)); ASSERT_NE(nullptr, merged_sstable); if (i > 0) { get_cg_read_info(col_ids.at(i - 1), cg_read_info); } else { cg_read_info = &full_read_info_; } ObStoreRowIterator *scanner = nullptr; ObMockDirectReadIterator sstable_iter; prepare_scan_param(*cg_read_info, trans_version_range, store_ctx, iter_param, context); ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param, context, range, scanner)); ASSERT_NE(nullptr, scanner); ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, *cg_read_info)); ObMockIterator res_iter; res_iter.reset(); ASSERT_EQ(OB_SUCCESS, res_iter.from(result[i])); bool is_equal = res_iter.equals(sstable_iter, false/*cmp multi version row flag*/); ASSERT_TRUE(is_equal); scanner->~ObStoreRowIterator(); } } } } int main(int argc, char **argv) { system("rm -rf test_co_merge.log*"); OB_LOGGER.set_file_name("test_co_merge.log", true); OB_LOGGER.set_log_level("INFO"); oceanbase::common::ObLogger::get_logger().set_log_level("INFO"); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }