fix row offset in DDL_MERGE_CO_SSTABLE
This commit is contained in:
@ -35,6 +35,7 @@
|
||||
#include "storage/mock_access_service.h"
|
||||
#include "storage/test_dml_common.h"
|
||||
#include "storage/test_tablet_helper.h"
|
||||
#include "storage/ddl/ob_ddl_merge_task.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -102,6 +103,7 @@ protected:
|
||||
ObMergeType merge_type_;
|
||||
int64_t max_row_cnt_;
|
||||
int64_t max_partial_row_cnt_;
|
||||
int64_t co_sstable_row_offset_;
|
||||
int64_t partial_kv_start_idx_;
|
||||
ObTableSchema table_schema_;
|
||||
ObTableSchema index_schema_;
|
||||
@ -193,6 +195,7 @@ TestIndexBlockDataPrepare::TestIndexBlockDataPrepare(
|
||||
const int64_t mirco_blocks_per_macro_block)
|
||||
: merge_type_(merge_type),
|
||||
max_row_cnt_(max_row_cnt),
|
||||
co_sstable_row_offset_(0),
|
||||
row_cnt_(0),
|
||||
partial_sstable_row_cnt_(0),
|
||||
root_index_builder_(nullptr),
|
||||
@ -890,6 +893,36 @@ void TestIndexBlockDataPrepare::prepare_partial_ddl_data()
|
||||
ASSERT_EQ(OB_SUCCESS, storage_schema->get_stored_column_count_in_sstable(column_cnt));
|
||||
prepare_partial_sstable(column_cnt);
|
||||
prepare_merge_ddl_kvs();
|
||||
|
||||
ObArray<blocksstable::ObSSTable *> ddl_tables;
|
||||
ASSERT_EQ(OB_SUCCESS, ddl_tables.push_back(&partial_sstable_));
|
||||
for (int64_t i = 0; i < DDL_KVS_CNT; ++i) {
|
||||
ASSERT_EQ(OB_SUCCESS, ddl_tables.push_back(ddl_kvs_.get_obj()->get_ddl_memtables().at(i)));
|
||||
}
|
||||
share::SCN ddl_start_scn;
|
||||
ddl_start_scn.convert_from_ts(ObTimeUtility::current_time());
|
||||
ObTabletDDLParam ddl_param;
|
||||
ddl_param.table_key_ = partial_sstable_.key_;
|
||||
ddl_param.data_format_version_ = DATA_VERSION_4_3_0_0;
|
||||
ddl_param.start_scn_ = ddl_start_scn;
|
||||
ddl_param.commit_scn_ = ddl_start_scn;
|
||||
ddl_param.direct_load_type_ = DIRECT_LOAD_DDL;
|
||||
ddl_param.ls_id_ = ls_id_;
|
||||
ddl_param.snapshot_version_ = SNAPSHOT_VERSION;
|
||||
ObArray<ObDDLBlockMeta> sorted_metas;
|
||||
ObArenaAllocator arena("compact_test", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
ASSERT_EQ(OB_SUCCESS, ObTabletDDLUtil::get_compact_meta_array(
|
||||
*tablet_handle.get_obj(),
|
||||
ddl_tables,
|
||||
ddl_param,
|
||||
tablet_handle.get_obj()->get_rowkey_read_info(),
|
||||
storage_schema,
|
||||
arena,
|
||||
sorted_metas));
|
||||
if (co_sstable_row_offset_ != 0) {
|
||||
ASSERT_EQ(sorted_metas.at(2).end_row_offset_, co_sstable_row_offset_);
|
||||
}
|
||||
arena.reset();
|
||||
ObTabletObjLoadHelper::free(allocator_, storage_schema);
|
||||
}
|
||||
|
||||
@ -983,6 +1016,24 @@ void TestIndexBlockDataPrepare::prepare_partial_sstable(const int64_t column_cnt
|
||||
STORAGE_LOG(INFO, "not supported root block", K(root_desc));
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
ObMicroBlockReaderHelper reader_helper;
|
||||
ObIMicroBlockReader *micro_reader;
|
||||
ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_));
|
||||
ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(merge_root_index_builder_->index_store_desc_.get_desc().row_store_type_, micro_reader));
|
||||
ObMicroBlockData root_block(root_buf, root_size);
|
||||
ObDatumRow row;
|
||||
OK(row.init(allocator_, merge_root_index_builder_->index_store_desc_.get_desc().col_desc_->row_column_count_));
|
||||
OK(micro_reader->init(root_block, nullptr));
|
||||
ObIndexBlockRowParser idx_row_parser;
|
||||
int64_t last_idx = 0;
|
||||
for (int64_t it = 0; it != micro_reader->row_count(); ++it) {
|
||||
idx_row_parser.reset();
|
||||
OK(micro_reader->get_row(it, row));
|
||||
OK(idx_row_parser.init(TEST_ROWKEY_COLUMN_CNT + 2, row));
|
||||
int64_t before_last_idx = last_idx;
|
||||
STORAGE_LOG(INFO, "check offset", K(idx_row_parser.get_row_offset()), K(it));
|
||||
}
|
||||
|
||||
// deserialize micro block header in root block buf
|
||||
ObMicroBlockHeader root_micro_header;
|
||||
int64_t des_pos = 0;
|
||||
|
||||
@ -162,10 +162,6 @@ TEST_F(TestCgSSTable, test_cg_index_tree_cursor)
|
||||
is_macro_start = false;
|
||||
}
|
||||
if ((i + 1) % 100 == 0) {
|
||||
int64_t row_offset = 0;
|
||||
OK(tree_cursor.get_start_row_offset(row_offset));
|
||||
ASSERT_EQ(row_offset, start_row_offset);
|
||||
|
||||
OK(tree_cursor.get_idx_parser(parser));
|
||||
ASSERT_EQ(parser->get_row_offset(), 99);
|
||||
|
||||
|
||||
@ -76,6 +76,7 @@ TestDDLMergeRowMultiScanner::TestDDLMergeRowMultiScanner()
|
||||
is_ddl_merge_data_ = true;
|
||||
max_row_cnt_ = 150000;
|
||||
max_partial_row_cnt_ = 78881;
|
||||
co_sstable_row_offset_ = max_partial_row_cnt_ - 1;
|
||||
partial_kv_start_idx_ = 3;
|
||||
}
|
||||
|
||||
|
||||
@ -73,6 +73,7 @@ TestDDLMergeRowScanner::TestDDLMergeRowScanner()
|
||||
is_ddl_merge_data_ = true;
|
||||
max_row_cnt_ = 150000;
|
||||
max_partial_row_cnt_ = 78881;
|
||||
co_sstable_row_offset_ = max_partial_row_cnt_ - 1;
|
||||
partial_kv_start_idx_ = 3;
|
||||
}
|
||||
|
||||
|
||||
@ -1624,6 +1624,70 @@ TEST_F(TestIndexTree, test_cg_row_offset)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestIndexTree, test_absolute_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t test_row_num = 500;
|
||||
ObArray<ObMacroBlocksWriteCtx *> data_write_ctxs;
|
||||
ObArray<ObMacroBlocksWriteCtx *> index_write_ctxs;
|
||||
ObMacroMetasArray *merge_info_list = nullptr;
|
||||
ObSSTableMergeRes res;
|
||||
IndexTreeRootCtxList *roots = nullptr;
|
||||
|
||||
ObWholeDataStoreDesc data_desc;
|
||||
ObSSTableIndexBuilder sstable_builder;
|
||||
|
||||
prepare_data_desc(data_desc, &sstable_builder);
|
||||
data_desc.get_desc().micro_block_size_ = 512; // make test index tree height > 2
|
||||
ret = sstable_builder.init(data_desc.get_desc());
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
|
||||
ObWholeDataStoreDesc index_desc;
|
||||
prepare_index_desc(data_desc, index_desc);
|
||||
|
||||
ObIndexBlockRebuilder rebuilder;
|
||||
OK(rebuilder.init(sstable_builder, false, nullptr, true/*use_absolute_offset*/));
|
||||
|
||||
mock_compaction(test_row_num, data_write_ctxs, index_write_ctxs, merge_info_list, res, roots);
|
||||
ASSERT_EQ(test_row_num, merge_info_list->count());
|
||||
vector<int64_t> absolute_offsets;
|
||||
for (int meta_idx = 0; meta_idx < test_row_num; meta_idx += 10) {
|
||||
rebuilder.append_macro_row(*merge_info_list->at(meta_idx), meta_idx);
|
||||
absolute_offsets.push_back(meta_idx);
|
||||
}
|
||||
OK(rebuilder.close());
|
||||
ObSSTableMergeRes res2;
|
||||
OK(sstable_builder.close(res2));
|
||||
ASSERT_GT(res2.root_desc_.height_, 2);
|
||||
|
||||
ObMicroBlockReaderHelper reader_helper;
|
||||
ObIMicroBlockReader *micro_reader;
|
||||
ASSERT_EQ(OB_SUCCESS, reader_helper.init(allocator_));
|
||||
ASSERT_EQ(OB_SUCCESS, reader_helper.get_reader(sstable_builder.index_store_desc_.get_desc().row_store_type_, micro_reader));
|
||||
|
||||
|
||||
ObMicroBlockData root_block(res2.root_desc_.buf_, res2.root_desc_.addr_.size_);
|
||||
ObDatumRow row;
|
||||
OK(row.init(allocator_, index_desc.get_desc().col_desc_->row_column_count_));
|
||||
OK(micro_reader->init(root_block, nullptr));
|
||||
ObIndexBlockRowParser idx_row_parser;
|
||||
int64_t last_idx = 0;
|
||||
for (int64_t it = 0; it != micro_reader->row_count(); ++it) {
|
||||
idx_row_parser.reset();
|
||||
OK(micro_reader->get_row(it, row));
|
||||
OK(idx_row_parser.init(TEST_ROWKEY_COLUMN_CNT + 2, row));
|
||||
int64_t before_last_idx = last_idx;
|
||||
for (int absolute_idx = last_idx; absolute_idx < absolute_offsets.size(); absolute_idx ++) {
|
||||
if (absolute_offsets[absolute_idx] == idx_row_parser.get_row_offset()) {
|
||||
last_idx = absolute_idx;
|
||||
}
|
||||
}
|
||||
ASSERT_GT(last_idx, before_last_idx);
|
||||
if (it == micro_reader->row_count() - 1) {
|
||||
ASSERT_EQ(absolute_offsets[absolute_offsets.size() - 1], idx_row_parser.get_row_offset());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestIndexTree, test_close_with_old_schema)
|
||||
{
|
||||
|
||||
@ -2808,6 +2808,8 @@ int ObIndexBlockRebuilder::append_macro_row(
|
||||
STORAGE_LOG(WARN, "failed to add macro block meta", K(ret), K(macro_meta));
|
||||
} else if (OB_FAIL(index_tree_root_ctx_->add_absolute_row_offset(absolute_row_offset))) {
|
||||
STORAGE_LOG(WARN, "failed to add abs row offset", K(ret), K(absolute_row_offset));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "append macro meta with absolute offset", K(absolute_row_offset), K(macro_meta));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -314,8 +314,6 @@ int ObIndexBlockMacroIterator::get_next_macro_block(
|
||||
ret = OB_ITER_END;
|
||||
} else if (OB_FAIL(tree_cursor_.get_macro_block_id(macro_id))) {
|
||||
LOG_WARN("Fail to get macro row block id", K(ret), K(macro_id));
|
||||
} else if (OB_FAIL(tree_cursor_.get_start_row_offset(start_row_offset))) {
|
||||
LOG_WARN("Fail to get prev row offset", K(ret), K(start_row_offset));
|
||||
} else if (OB_FAIL(tree_cursor_.get_idx_parser(idx_row_parser))) {
|
||||
LOG_WARN("Fail to get idx row parser", K(ret), K_(tree_cursor));
|
||||
} else if (OB_ISNULL(idx_row_parser)) {
|
||||
@ -330,9 +328,7 @@ int ObIndexBlockMacroIterator::get_next_macro_block(
|
||||
if ((macro_id == begin_ && is_reverse_scan_) || (macro_id == end_ && !is_reverse_scan_)) {
|
||||
is_iter_end_ = true;
|
||||
}
|
||||
if (sstable_->is_normal_cg_sstable()) {
|
||||
start_row_offset = idx_row_parser->get_row_offset() - idx_row_header->get_row_count() + 1;
|
||||
}
|
||||
start_row_offset = idx_row_parser->get_row_offset() - idx_row_header->get_row_count() + 1;
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
|
||||
@ -910,31 +910,6 @@ int ObIndexBlockTreeCursor::get_macro_block_id(MacroBlockId ¯o_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBlockTreeCursor::get_start_row_offset(int64_t &start_row_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const ObIndexBlockRowHeader *idx_header = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("Not init", K(ret));
|
||||
} else if (OB_ISNULL(curr_path_item_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("curr path item is null", K(ret));
|
||||
} else if (FALSE_IT(start_row_offset = curr_path_item_->start_row_offset_)) {
|
||||
} else if (OB_FAIL(idx_row_parser_.get_header(idx_header))) {
|
||||
LOG_WARN("Fail to get index block row header", K(ret));
|
||||
} else if (OB_ISNULL(idx_header)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("Got null pointer for index block row header", K(ret));
|
||||
} else if (!(idx_header->is_data_block() || idx_header->is_leaf_block())) {
|
||||
if (OB_UNLIKELY(0 != start_row_offset)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("prev row offset should be -1", K(ret), KPC(idx_header), KPC_(curr_path_item));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBlockTreeCursor::get_child_micro_infos(
|
||||
const ObDatumRange &range,
|
||||
ObArenaAllocator &endkey_allocator,
|
||||
@ -1108,10 +1083,8 @@ int ObIndexBlockTreeCursor::get_next_level_block(
|
||||
// curr_path_item_->is_block_transformed_ = false;
|
||||
}
|
||||
curr_path_item_->start_row_offset_ = 0;
|
||||
if (OB_SUCC(ret) && is_normal_cg_sstable_ && TreeType::INDEX_BLOCK == tree_type_) {
|
||||
if (idx_row_header.is_leaf_block() || idx_row_header.is_data_block()) {
|
||||
curr_path_item_->start_row_offset_ = curr_row_offset - idx_row_header.row_count_ + 1;
|
||||
}
|
||||
if (OB_SUCC(ret) && TreeType::INDEX_BLOCK == tree_type_ && (idx_row_header.is_leaf_block() || idx_row_header.is_data_block())) {
|
||||
curr_path_item_->start_row_offset_ = curr_row_offset - idx_row_header.row_count_ + 1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -178,7 +178,6 @@ public:
|
||||
int get_idx_parser(const ObIndexBlockRowParser *&parser);
|
||||
int get_idx_row_header(const ObIndexBlockRowHeader *&idx_header);
|
||||
int get_macro_block_id(MacroBlockId ¯o_id);
|
||||
int get_start_row_offset(int64_t &start_row_offset);
|
||||
|
||||
// Need to release held item at the end of lifetime
|
||||
int get_child_micro_infos(
|
||||
|
||||
@ -1674,9 +1674,11 @@ int ObSSTable::get_index_tree_root(
|
||||
LOG_WARN("can not get index tree rot from an unloaded sstable", K(ret));
|
||||
} else if (is_ddl_merge_empty_sstable()) {
|
||||
// mock here, skip valid_check
|
||||
index_data.reset();
|
||||
index_data.type_ = ObMicroBlockData::DDL_MERGE_INDEX_BLOCK;
|
||||
index_data.buf_ = DDL_EMPTY_SSTABLE_DUMMY_INDEX_DATA_BUF;
|
||||
index_data.size_ = DDL_EMPTY_SSTABLE_DUMMY_INDEX_DATA_SIZE;
|
||||
LOG_INFO("empty ddl merge sstable", K(index_data));
|
||||
} else if (OB_UNLIKELY(!meta_->get_root_info().get_addr().is_valid()
|
||||
|| !meta_->get_root_info().get_block_data().is_valid())) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
@ -1693,7 +1695,7 @@ int ObSSTable::get_index_tree_root(
|
||||
}
|
||||
if (OB_SUCC(ret) && is_ddl_merge_sstable()) {
|
||||
index_data.type_ = ObMicroBlockData::DDL_MERGE_INDEX_BLOCK;
|
||||
LOG_INFO("empty ddl merge sstable", K(index_data));
|
||||
LOG_INFO("ddl merge sstable get root", K(index_data));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -881,7 +881,7 @@ int ObDDLMacroBlockIterator::get_next(ObDataMacroBlockMeta &data_macro_meta, int
|
||||
if (OB_FAIL(macro_block_iter_->get_next_macro_block(block_desc))) {
|
||||
LOG_WARN("get next macro block failed", K(ret));
|
||||
} else {
|
||||
end_row_offset = block_desc.start_row_offset_ + block_desc.row_count_;
|
||||
end_row_offset = block_desc.start_row_offset_ + block_desc.row_count_ - 1;
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(sec_meta_iter_->get_next(data_macro_meta))) {
|
||||
@ -951,12 +951,13 @@ int get_sorted_meta_array(
|
||||
copied_meta->~ObDataMacroBlockMeta();
|
||||
} else {
|
||||
FLOG_INFO("append meta tree success", K(ret), "table_key", cur_sstable->get_key(), "macro_block_id", data_macro_meta.get_macro_id(),
|
||||
"data_checksum", copied_meta->val_.data_checksum_, K(meta_tree.get_macro_block_cnt()), "macro_block_end_key", to_cstring(copied_meta->end_key_));
|
||||
"data_checksum", copied_meta->val_.data_checksum_, K(meta_tree.get_macro_block_cnt()), "macro_block_end_key", to_cstring(copied_meta->end_key_),
|
||||
"end_row_offset", end_row_offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_INFO("append meta tree finished", K(ret), "table_key", cur_sstable->get_key(), "data_macro_block_cnt_in_sstable", cur_sstable->get_data_macro_block_count(),
|
||||
K(meta_tree.get_macro_block_cnt()), "sstable_end_key", OB_ISNULL(copied_meta) ? "NOT_EXIST": to_cstring(copied_meta->end_key_));
|
||||
K(meta_tree.get_macro_block_cnt()), "sstable_end_key", OB_ISNULL(copied_meta) ? "NOT_EXIST": to_cstring(copied_meta->end_key_), "end_row_offset", end_row_offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1007,6 +1008,26 @@ int compact_sstables(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletDDLUtil::get_compact_meta_array(
|
||||
ObTablet &tablet,
|
||||
ObIArray<ObSSTable *> &sstables,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
const ObITableReadInfo &read_info,
|
||||
const ObStorageSchema *storage_schema,
|
||||
common::ObArenaAllocator &allocator,
|
||||
ObArray<ObDDLBlockMeta> &sorted_metas)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
sorted_metas.reset();
|
||||
ObBlockMetaTree meta_tree;
|
||||
if (OB_FAIL(meta_tree.init(tablet, ddl_param.table_key_, ddl_param.start_scn_, ddl_param.data_format_version_, storage_schema))) {
|
||||
LOG_WARN("init meta tree failed", K(ret), K(ddl_param));
|
||||
} else if (OB_FAIL(get_sorted_meta_array(sstables, read_info, meta_tree, allocator, sorted_metas))) {
|
||||
LOG_WARN("get sorted meta array failed", K(ret), K(read_info), K(sstables));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int compact_co_ddl_sstable(
|
||||
ObTablet &tablet,
|
||||
ObTableStoreIterator &ddl_sstable_iter,
|
||||
|
||||
@ -114,6 +114,14 @@ public:
|
||||
const ObStorageSchema *storage_schema,
|
||||
blocksstable::ObWholeDataStoreDesc &data_desc);
|
||||
|
||||
static int get_compact_meta_array(
|
||||
ObTablet &tablet,
|
||||
ObIArray<blocksstable::ObSSTable *> &sstables,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
const ObITableReadInfo &read_info,
|
||||
const ObStorageSchema *storage_schema,
|
||||
common::ObArenaAllocator &allocator,
|
||||
ObArray<ObDDLBlockMeta> &sorted_metas);
|
||||
static int create_ddl_sstable(
|
||||
ObTablet &tablet,
|
||||
const ObTabletDDLParam &ddl_param,
|
||||
|
||||
Reference in New Issue
Block a user