fix storage schema bug that dec unsync_cnt_for_multi_data
This commit is contained in:
@ -159,7 +159,8 @@ void ObMultiSourceData::inner_release_rest_unit_data(
|
|||||||
int ObMultiSourceData::inner_mark_unit_sync_finish(
|
int ObMultiSourceData::inner_mark_unit_sync_finish(
|
||||||
const int64_t list_pos,
|
const int64_t list_pos,
|
||||||
const int64_t unit_version,
|
const int64_t unit_version,
|
||||||
bool save_last_flag)
|
bool save_last_flag,
|
||||||
|
const int unsync_cnt_for_multi_data)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(list_pos < 0 || list_pos >= MAX_LIST_COUNT)) {
|
if (OB_UNLIKELY(list_pos < 0 || list_pos >= MAX_LIST_COUNT)) {
|
||||||
@ -176,6 +177,7 @@ int ObMultiSourceData::inner_mark_unit_sync_finish(
|
|||||||
KPC(last_item));
|
KPC(last_item));
|
||||||
} else {
|
} else {
|
||||||
last_item->set_sync_finish(true);
|
last_item->set_sync_finish(true);
|
||||||
|
last_item->set_unsync_cnt_for_multi_data(unsync_cnt_for_multi_data);
|
||||||
|
|
||||||
if (save_last_flag) {
|
if (save_last_flag) {
|
||||||
(void)inner_release_rest_unit_data(list_pos, unit_version);
|
(void)inner_release_rest_unit_data(list_pos, unit_version);
|
||||||
|
|||||||
@ -97,6 +97,7 @@ public:
|
|||||||
TRANS_LOG(INFO, "unsync_cnt_for_multi_data dec", KPC(this));
|
TRANS_LOG(INFO, "unsync_cnt_for_multi_data dec", KPC(this));
|
||||||
}
|
}
|
||||||
int get_unsync_cnt_for_multi_data() const { return unsynced_cnt_for_multi_data_; }
|
int get_unsync_cnt_for_multi_data() const { return unsynced_cnt_for_multi_data_; }
|
||||||
|
void set_unsync_cnt_for_multi_data(const int unsynced_cnt_for_multi_data) { unsynced_cnt_for_multi_data_ = unsynced_cnt_for_multi_data; }
|
||||||
virtual bool is_save_last() const { return true; } // only store one data unit with sync_finish=true
|
virtual bool is_save_last() const { return true; } // only store one data unit with sync_finish=true
|
||||||
virtual int64_t get_version() const { return common::OB_INVALID_VERSION; } // have to implement for unit list type
|
virtual int64_t get_version() const { return common::OB_INVALID_VERSION; } // have to implement for unit list type
|
||||||
VIRTUAL_TO_STRING_KV(K_(is_tx_end),
|
VIRTUAL_TO_STRING_KV(K_(is_tx_end),
|
||||||
@ -142,7 +143,8 @@ private:
|
|||||||
int inner_mark_unit_sync_finish(
|
int inner_mark_unit_sync_finish(
|
||||||
const int64_t unit_type,
|
const int64_t unit_type,
|
||||||
const int64_t unit_version,
|
const int64_t unit_version,
|
||||||
bool save_last_flag);
|
bool save_last_flag,
|
||||||
|
const int unsync_cnt_for_multi_data);
|
||||||
void inner_release_rest_unit_data(
|
void inner_release_rest_unit_data(
|
||||||
const int64_t list_pos,
|
const int64_t list_pos,
|
||||||
const int64_t unit_version);
|
const int64_t unit_version);
|
||||||
@ -209,7 +211,7 @@ int ObMultiSourceData::save_multi_source_data_unit_in_list(const T *const src, b
|
|||||||
(void)inner_release_rest_unit_data(list_pos, src->get_version());
|
(void)inner_release_rest_unit_data(list_pos, src->get_version());
|
||||||
}
|
}
|
||||||
} else if (src->is_sync_finish()
|
} else if (src->is_sync_finish()
|
||||||
&& OB_FAIL(inner_mark_unit_sync_finish(list_pos, src->get_version(), src->is_save_last()))) { // mark finish
|
&& OB_FAIL(inner_mark_unit_sync_finish(list_pos, src->get_version(), src->is_save_last(), src->get_unsync_cnt_for_multi_data()))) { // mark finish
|
||||||
TRANS_LOG(WARN, "failed to makr unit sync finish", K(ret), K(list_pos), KPC(src));
|
TRANS_LOG(WARN, "failed to makr unit sync finish", K(ret), K(list_pos), KPC(src));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -31,6 +31,8 @@
|
|||||||
#include "share/scn.h"
|
#include "share/scn.h"
|
||||||
#include "storage/ls/ob_ls.h"
|
#include "storage/ls/ob_ls.h"
|
||||||
#include "storage/tx_storage/ob_ls_map.h"
|
#include "storage/tx_storage/ob_ls_map.h"
|
||||||
|
#include "share/schema/ob_column_schema.h"
|
||||||
|
#include "storage/ob_storage_schema.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -191,6 +193,57 @@ public:
|
|||||||
return OB_SUCCESS;
|
return OB_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void prepare_schema(share::schema::ObTableSchema &table_schema)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t micro_block_size = 16 * 1024;
|
||||||
|
const uint64_t tenant_id = 1;
|
||||||
|
const uint64_t table_id = 777;
|
||||||
|
share::schema::ObColumnSchemaV2 column;
|
||||||
|
|
||||||
|
//generate data table schema
|
||||||
|
table_schema.reset();
|
||||||
|
ret = table_schema.set_table_name("test_merge_multi_version");
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
table_schema.set_tenant_id(tenant_id);
|
||||||
|
table_schema.set_tablegroup_id(1);
|
||||||
|
table_schema.set_database_id(1);
|
||||||
|
table_schema.set_table_id(table_id);
|
||||||
|
table_schema.set_rowkey_column_num(3);
|
||||||
|
table_schema.set_max_used_column_id(6);
|
||||||
|
table_schema.set_block_size(micro_block_size);
|
||||||
|
table_schema.set_compress_func_name("none");
|
||||||
|
table_schema.set_row_store_type(FLAT_ROW_STORE);
|
||||||
|
//init column
|
||||||
|
char name[OB_MAX_FILE_NAME_LENGTH];
|
||||||
|
memset(name, 0, sizeof(name));
|
||||||
|
const int64_t column_ids[] = {16,17,20,21,22,23,24,29};
|
||||||
|
for(int64_t i = 0; i < 6; ++i){
|
||||||
|
ObObjType obj_type = ObIntType;
|
||||||
|
const int64_t column_id = column_ids[i];
|
||||||
|
|
||||||
|
if (i == 1) {
|
||||||
|
obj_type = ObVarcharType;
|
||||||
|
}
|
||||||
|
column.reset();
|
||||||
|
column.set_table_id(table_id);
|
||||||
|
column.set_column_id(column_id);
|
||||||
|
sprintf(name, "test%020ld", i);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, column.set_column_name(name));
|
||||||
|
column.set_data_type(obj_type);
|
||||||
|
column.set_collation_type(CS_TYPE_UTF8MB4_GENERAL_CI);
|
||||||
|
column.set_data_length(10);
|
||||||
|
if (i < 3) {
|
||||||
|
column.set_rowkey_position(i + 1);
|
||||||
|
} else {
|
||||||
|
column.set_rowkey_position(0);
|
||||||
|
}
|
||||||
|
COMMON_LOG(INFO, "add column", K(i), K(column));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, table_schema.add_column(column));
|
||||||
|
}
|
||||||
|
COMMON_LOG(INFO, "dump stable schema", K(table_schema));
|
||||||
|
}
|
||||||
|
|
||||||
void reset_iter_param()
|
void reset_iter_param()
|
||||||
{
|
{
|
||||||
iter_param_.reset();
|
iter_param_.reset();
|
||||||
@ -449,6 +502,29 @@ TEST_F(TestMemtable, multi_key)
|
|||||||
print(mvcc_row2);
|
print(mvcc_row2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestMemtable, test_unsync_cnt_for_multi_data)
|
||||||
|
{
|
||||||
|
ObMemtable memtable;
|
||||||
|
EXPECT_EQ(OB_SUCCESS, init_memtable(memtable));
|
||||||
|
share::schema::ObTableSchema table_schema;
|
||||||
|
ObStorageSchema storage_schema;
|
||||||
|
bool is_callback = true;
|
||||||
|
bool for_replay = true;
|
||||||
|
share::SCN scn;
|
||||||
|
|
||||||
|
ASSERT_EQ(OB_SUCCESS, scn.convert_from_ts(100));
|
||||||
|
prepare_schema(table_schema);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, storage_schema.init(allocator_, table_schema, lib::Worker::CompatMode::MYSQL));
|
||||||
|
|
||||||
|
storage_schema.set_sync_finish(false);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, memtable.save_multi_source_data_unit(&storage_schema, scn, !for_replay, memtable::MemtableRefOp::INC_REF, !is_callback));
|
||||||
|
ASSERT_EQ(1, storage_schema.get_unsync_cnt_for_multi_data());
|
||||||
|
|
||||||
|
storage_schema.set_sync_finish(true);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, memtable.save_multi_source_data_unit(&storage_schema, scn, !for_replay, memtable::MemtableRefOp::DEC_REF, is_callback));
|
||||||
|
ASSERT_EQ(0, storage_schema.get_unsync_cnt_for_multi_data());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}// end of oceanbase
|
}// end of oceanbase
|
||||||
|
|
||||||
|
|||||||
@ -299,7 +299,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable)
|
|||||||
ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish(
|
ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish(
|
||||||
list_pos,
|
list_pos,
|
||||||
storage_schema.get_version(), // invalid version
|
storage_schema.get_version(), // invalid version
|
||||||
true));
|
true,
|
||||||
|
0));
|
||||||
|
|
||||||
ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, is_callback));
|
ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, is_callback));
|
||||||
|
|
||||||
@ -310,7 +311,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable)
|
|||||||
ASSERT_EQ(OB_SUCCESS, source_data.inner_mark_unit_sync_finish(
|
ASSERT_EQ(OB_SUCCESS, source_data.inner_mark_unit_sync_finish(
|
||||||
list_pos,
|
list_pos,
|
||||||
storage_schema.get_version(),
|
storage_schema.get_version(),
|
||||||
true));
|
true,
|
||||||
|
0));
|
||||||
ASSERT_EQ(OB_SUCCESS, source_data.get_multi_source_data_unit(&read_storage_schema, &allocator_));
|
ASSERT_EQ(OB_SUCCESS, source_data.get_multi_source_data_unit(&read_storage_schema, &allocator_));
|
||||||
ASSERT_EQ(true, judge_storage_schema_equal(storage_schema, read_storage_schema));
|
ASSERT_EQ(true, judge_storage_schema_equal(storage_schema, read_storage_schema));
|
||||||
ASSERT_EQ(true, source_data.has_multi_source_data_unit(pos));
|
ASSERT_EQ(true, source_data.has_multi_source_data_unit(pos));
|
||||||
@ -332,7 +334,8 @@ TEST_F(TestStorageSchema, test_storage_schema_list_in_memtable)
|
|||||||
ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish(
|
ASSERT_EQ(OB_ERR_UNEXPECTED, source_data.inner_mark_unit_sync_finish(
|
||||||
list_pos,
|
list_pos,
|
||||||
storage_schema.get_version() + 1, // invalid version
|
storage_schema.get_version() + 1, // invalid version
|
||||||
true));
|
true,
|
||||||
|
0));
|
||||||
|
|
||||||
storage_schema.set_sync_finish(false);
|
storage_schema.set_sync_finish(false);
|
||||||
ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, true));
|
ASSERT_EQ(OB_SUCCESS, source_data.save_multi_source_data_unit(&storage_schema, true));
|
||||||
|
|||||||
Reference in New Issue
Block a user