diff --git a/src/storage/memtable/ob_multi_source_data.cpp b/src/storage/memtable/ob_multi_source_data.cpp index 22eb1865d..264825e8c 100644 --- a/src/storage/memtable/ob_multi_source_data.cpp +++ b/src/storage/memtable/ob_multi_source_data.cpp @@ -159,7 +159,8 @@ void ObMultiSourceData::inner_release_rest_unit_data( int ObMultiSourceData::inner_mark_unit_sync_finish( const int64_t list_pos, const int64_t unit_version, - bool save_last_flag) + bool save_last_flag, + const int unsync_cnt_for_multi_data) { int ret = OB_SUCCESS; if (OB_UNLIKELY(list_pos < 0 || list_pos >= MAX_LIST_COUNT)) { @@ -176,6 +177,7 @@ int ObMultiSourceData::inner_mark_unit_sync_finish( KPC(last_item)); } else { last_item->set_sync_finish(true); + last_item->set_unsync_cnt_for_multi_data(unsync_cnt_for_multi_data); if (save_last_flag) { (void)inner_release_rest_unit_data(list_pos, unit_version); diff --git a/src/storage/memtable/ob_multi_source_data.h b/src/storage/memtable/ob_multi_source_data.h index c2569b41b..9306492d4 100644 --- a/src/storage/memtable/ob_multi_source_data.h +++ b/src/storage/memtable/ob_multi_source_data.h @@ -97,6 +97,7 @@ public: TRANS_LOG(INFO, "unsync_cnt_for_multi_data dec", KPC(this)); } int get_unsync_cnt_for_multi_data() const { return unsynced_cnt_for_multi_data_; } + void set_unsync_cnt_for_multi_data(const int unsynced_cnt_for_multi_data) { unsynced_cnt_for_multi_data_ = unsynced_cnt_for_multi_data; } virtual bool is_save_last() const { return true; } // only store one data unit with sync_finish=true virtual int64_t get_version() const { return common::OB_INVALID_VERSION; } // have to implement for unit list type VIRTUAL_TO_STRING_KV(K_(is_tx_end), @@ -142,7 +143,8 @@ private: int inner_mark_unit_sync_finish( const int64_t unit_type, const int64_t unit_version, - bool save_last_flag); + bool save_last_flag, + const int unsync_cnt_for_multi_data); void inner_release_rest_unit_data( const int64_t list_pos, const int64_t unit_version); @@ -209,7 +211,7 @@ int ObMultiSourceData::save_multi_source_data_unit_in_list(const T *const src, b (void)inner_release_rest_unit_data(list_pos, src->get_version()); } } else if (src->is_sync_finish() - && OB_FAIL(inner_mark_unit_sync_finish(list_pos, src->get_version(), src->is_save_last()))) { // mark finish + && OB_FAIL(inner_mark_unit_sync_finish(list_pos, src->get_version(), src->is_save_last(), src->get_unsync_cnt_for_multi_data()))) { // mark finish TRANS_LOG(WARN, "failed to makr unit sync finish", K(ret), K(list_pos), KPC(src)); } diff --git a/unittest/storage/memtable/test_memtable_basic.cpp b/unittest/storage/memtable/test_memtable_basic.cpp index ebff8bf49..06cb16388 100644 --- a/unittest/storage/memtable/test_memtable_basic.cpp +++ b/unittest/storage/memtable/test_memtable_basic.cpp @@ -31,6 +31,8 @@ #include "share/scn.h" #include "storage/ls/ob_ls.h" #include "storage/tx_storage/ob_ls_map.h" +#include "share/schema/ob_column_schema.h" +#include "storage/ob_storage_schema.h" namespace oceanbase { @@ -191,6 +193,57 @@ public: return OB_SUCCESS; } + void prepare_schema(share::schema::ObTableSchema &table_schema) + { + int ret = OB_SUCCESS; + int64_t micro_block_size = 16 * 1024; + const uint64_t tenant_id = 1; + const uint64_t table_id = 777; + share::schema::ObColumnSchemaV2 column; + + //generate data table schema + table_schema.reset(); + ret = table_schema.set_table_name("test_merge_multi_version"); + ASSERT_EQ(OB_SUCCESS, ret); + table_schema.set_tenant_id(tenant_id); + table_schema.set_tablegroup_id(1); + table_schema.set_database_id(1); + table_schema.set_table_id(table_id); + table_schema.set_rowkey_column_num(3); + table_schema.set_max_used_column_id(6); + table_schema.set_block_size(micro_block_size); + table_schema.set_compress_func_name("none"); + table_schema.set_row_store_type(FLAT_ROW_STORE); + //init column + char name[OB_MAX_FILE_NAME_LENGTH]; + memset(name, 0, sizeof(name)); + const int64_t column_ids[] = {16,17,20,21,22,23,24,29}; + for(int64_t i = 0; i < 6; ++i){ + ObObjType obj_type = ObIntType; + const int64_t column_id = column_ids[i]; + + if (i == 1) { + obj_type = ObVarcharType; + } + column.reset(); + column.set_table_id(table_id); + column.set_column_id(column_id); + sprintf(name, "test%020ld", i); + ASSERT_EQ(OB_SUCCESS, column.set_column_name(name)); + column.set_data_type(obj_type); + column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI); + column.set_data_length(10); + if (i < 3) { + column.set_rowkey_position(i + 1); + } else { + column.set_rowkey_position(0); + } + COMMON_LOG(INFO, "add column", K(i), K(column)); + ASSERT_EQ(OB_SUCCESS, table_schema.add_column(column)); + } + COMMON_LOG(INFO, "dump stable schema", K(table_schema)); + } + void reset_iter_param() { iter_param_.reset(); @@ -449,6 +502,29 @@ TEST_F(TestMemtable, multi_key) print(mvcc_row2); } +TEST_F(TestMemtable, test_unsync_cnt_for_multi_data) +{ + ObMemtable memtable; + EXPECT_EQ(OB_SUCCESS, init_memtable(memtable)); + share::schema::ObTableSchema table_schema; + ObStorageSchema storage_schema; + bool is_callback = true; + bool for_replay = true; + share::SCN scn; + + ASSERT_EQ(OB_SUCCESS, scn.convert_from_ts(100)); + prepare_schema(table_schema); + ASSERT_EQ(OB_SUCCESS, storage_schema.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL)); + + storage_schema.set_sync_finish(false); + ASSERT_EQ(OB_SUCCESS, memtable.save_multi_source_data_unit(&storage_schema, scn, !for_replay, memtable::MemtableRefOp::INC_REF, !is_callback)); + ASSERT_EQ(1, storage_schema.get_unsync_cnt_for_multi_data()); + + storage_schema.set_sync_finish(true); + ASSERT_EQ(OB_SUCCESS, memtable.save_multi_source_data_unit(&storage_schema, scn, !for_replay, memtable::MemtableRefOp::DEC_REF, is_callback)); + ASSERT_EQ(0, storage_schema.get_unsync_cnt_for_multi_data()); +} + }// end of oceanbase diff --git a/unittest/storage/test_storage_schema.cpp b/unittest/storage/test_storage_schema.cpp index 7ac39f3a3..cf425b3d2 100644 --- a/unittest/storage/test_storage_schema.cpp +++ b/unittest/storage/test_storage_schema.cpp @@ -299,7 +299,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable) ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish( list_pos, storage_schema.get_version(), // invalid version - true)); + true, + 0)); ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, is_callback)); @@ -310,7 +311,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable) ASSERT_EQ(OB_SUCCESS, source_data.inner_mark_unit_sync_finish( list_pos, storage_schema.get_version(), - true)); + true, + 0)); ASSERT_EQ(OB_SUCCESS, source_data.get_multi_source_data_unit(&read_storage_schema, &allocator_)); ASSERT_EQ(true, judge_storage_schema_equal(storage_schema, read_storage_schema)); ASSERT_EQ(true, source_data.has_multi_source_data_unit(pos)); @@ -332,7 +334,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable) ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish( list_pos, storage_schema.get_version() + 1, // invalid version - true)); + true, + 0)); storage_schema.set_sync_finish(false); ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, true));