fix iterate flag with same sql_seq
This commit is contained in:
@ -1337,6 +1337,8 @@ int ObMemtableMultiVersionScanIterator::iterate_uncommitted_row_value_(ObStoreRo
|
|||||||
if (OB_ITER_END != ret) {
|
if (OB_ITER_END != ret) {
|
||||||
TRANS_LOG(WARN, "failed to get next uncommitted node", K(ret), K(tnode));
|
TRANS_LOG(WARN, "failed to get next uncommitted node", K(ret), K(tnode));
|
||||||
}
|
}
|
||||||
|
} else if (ObActionFlag::OP_DEL_ROW == row.flag_) {
|
||||||
|
continue;
|
||||||
} else {
|
} else {
|
||||||
const ObMvccTransNode* trans_node = reinterpret_cast<const ObMvccTransNode*>(tnode);
|
const ObMvccTransNode* trans_node = reinterpret_cast<const ObMvccTransNode*>(tnode);
|
||||||
if (OB_ISNULL(trans_node)) {
|
if (OB_ISNULL(trans_node)) {
|
||||||
|
|||||||
@ -207,6 +207,7 @@ OB_INLINE static int simple_fuse_row(
|
|||||||
nop_pos.reset();
|
nop_pos.reset();
|
||||||
result.flag_ = former.flag_;
|
result.flag_ = former.flag_;
|
||||||
result.from_base_ = former.from_base_;
|
result.from_base_ = former.from_base_;
|
||||||
|
result.dml_ = former.dml_;
|
||||||
column_cnt = former.row_val_.count_;
|
column_cnt = former.row_val_.count_;
|
||||||
} else {
|
} else {
|
||||||
column_cnt = nop_pos.count_;
|
column_cnt = nop_pos.count_;
|
||||||
|
|||||||
@ -833,7 +833,9 @@ int ObMockIteratorBuilder::static_init()
|
|||||||
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("DELETE"), T_DML_DELETE) ||
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("DELETE"), T_DML_DELETE) ||
|
||||||
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("T_DML_DELETE"), T_DML_DELETE) ||
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("T_DML_DELETE"), T_DML_DELETE) ||
|
||||||
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("REPLACE"), T_DML_REPLACE) ||
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("REPLACE"), T_DML_REPLACE) ||
|
||||||
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("T_DML_REPLACE"), T_DML_REPLACE)) {
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("T_DML_REPLACE"), T_DML_REPLACE) ||
|
||||||
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("LOCK"), T_DML_LOCK) ||
|
||||||
|
OB_SUCCESS != str_to_dml_.set_refactored(ObString::make_string("T_DML_LOCK"), T_DML_LOCK)) {
|
||||||
ret = OB_INIT_FAIL;
|
ret = OB_INIT_FAIL;
|
||||||
STORAGE_LOG(WARN, "dml hashtable insert failed");
|
STORAGE_LOG(WARN, "dml hashtable insert failed");
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@ -289,7 +289,7 @@ public:
|
|||||||
static const int TYPE_NUM = 6;
|
static const int TYPE_NUM = 6;
|
||||||
static const int INFO_NUM = 2;
|
static const int INFO_NUM = 2;
|
||||||
static const int FLAG_NUM = 5;
|
static const int FLAG_NUM = 5;
|
||||||
static const int DML_NUM = 3;
|
static const int DML_NUM = 6;
|
||||||
static const int BASE_NUM = 2;
|
static const int BASE_NUM = 2;
|
||||||
static const int MULTI_VERSION_ROW_FLAG_NUM = 5;
|
static const int MULTI_VERSION_ROW_FLAG_NUM = 5;
|
||||||
static const int TRANS_ID_NUM = 10;
|
static const int TRANS_ID_NUM = 10;
|
||||||
|
|||||||
@ -3245,6 +3245,91 @@ TEST_F(TestMultiVersionMerge, test_trans_cross_sstable)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestMultiVersionMerge, test_merge_with_dml)
|
||||||
|
{
|
||||||
|
GCONF._enable_sparse_row = false;
|
||||||
|
ObMemtableCtxFactory mem_ctx;
|
||||||
|
ObSSTableMergeCtx merge_context;
|
||||||
|
const int64_t rowkey_cnt = TEST_ROWKEY_COLUMN_CNT + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||||
|
storage::ObTablesHandle tables_handle;
|
||||||
|
ObSSTable sstable1;
|
||||||
|
const char *macro_data[1];
|
||||||
|
macro_data[0] =
|
||||||
|
"bigint var bigint bigint bigint bigint dml flag multi_version_row_flag trans_id\n"
|
||||||
|
"1 var1 MIN -28 NOP NOP T_DML_DELETE DELETE LU trans_id_1\n"
|
||||||
|
"2 var2 MIN -10 NOP NOP T_DML_DELETE DELETE LU trans_id_1\n"
|
||||||
|
"3 var3 MIN -21 NOP NOP T_DML_LOCK EXIST LU trans_id_1\n"
|
||||||
|
"4 var4 MIN -71 18 28 T_DML_INSERT EXIST U trans_id_1\n"
|
||||||
|
"4 var4 MIN -12 NOP NOP T_DML_DELETE EXIST LU trans_id_1\n"
|
||||||
|
"5 var5 MIN -18 NOP NOP T_DML_LOCK EXIST LU trans_id_1\n";
|
||||||
|
|
||||||
|
|
||||||
|
prepare_data_start(sstable1, macro_data, rowkey_cnt, 9, "none", FLAT_ROW_STORE, 0);
|
||||||
|
prepare_one_macro(macro_data, 1);
|
||||||
|
prepare_data_end(sstable1);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tables_handle.add_table(&sstable1));
|
||||||
|
STORAGE_LOG(INFO, "finish prepare sstable1");
|
||||||
|
|
||||||
|
ObSSTable sstable2;
|
||||||
|
const char *macro_data2[1];
|
||||||
|
macro_data2[0] = "bigint var bigint bigint bigint bigint dml flag multi_version_row_flag trans_id\n"
|
||||||
|
"6 var6 MIN -18 NOP NOP T_DML_LOCK EXIST LU trans_id_1\n";
|
||||||
|
|
||||||
|
prepare_data_start(sstable2, macro_data2, rowkey_cnt, 10, "none", FLAT_ROW_STORE, 0);
|
||||||
|
prepare_one_macro(macro_data2, 1);
|
||||||
|
prepare_data_end(sstable2);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tables_handle.add_table(&sstable2));
|
||||||
|
STORAGE_LOG(INFO, "finish prepare sstable2");
|
||||||
|
|
||||||
|
// make all trans running
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
test_trans_part_ctx_.clear_all();
|
||||||
|
if (OB_FAIL(test_trans_part_ctx_.add_transaction_status(
|
||||||
|
transaction::ObTransTableStatusType::COMMIT, 29))) {
|
||||||
|
STORAGE_LOG(ERROR, "add transaction status failed", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
ObVersionRange trans_version_range;
|
||||||
|
trans_version_range.snapshot_version_ = 100;
|
||||||
|
trans_version_range.multi_version_start_ = 1;
|
||||||
|
trans_version_range.base_version_ = 1;
|
||||||
|
|
||||||
|
prepare_merge_context(tables_handle, MINI_MINOR_MERGE, false, trans_version_range, merge_context);
|
||||||
|
ObMockIterator res_iter;
|
||||||
|
ObStoreRowIterator *scanner = NULL;
|
||||||
|
ObExtStoreRange range;
|
||||||
|
|
||||||
|
const char *result1 =
|
||||||
|
"bigint var bigint bigint bigint bigint dml flag multi_version_row_flag\n"
|
||||||
|
"1 var1 -29 -28 NOP NOP T_DML_DELETE DELETE CL\n"
|
||||||
|
"2 var2 -29 -10 NOP NOP T_DML_DELETE DELETE CL\n"
|
||||||
|
"3 var3 -29 -21 NOP NOP T_DML_LOCK EXIST CL\n"
|
||||||
|
"4 var4 -29 -71 18 28 T_DML_INSERT EXIST CL\n"
|
||||||
|
"5 var5 -29 -18 NOP NOP T_DML_LOCK EXIST CL\n"
|
||||||
|
"6 var6 -29 -18 NOP NOP T_DML_LOCK EXIST CL\n";
|
||||||
|
|
||||||
|
// minor mrege
|
||||||
|
|
||||||
|
ObMacroBlockBuilder builder;
|
||||||
|
ObSSTable *merged_sstable = nullptr;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ObPartitionMergeUtil::merge_partition(&mem_ctx, merge_context, builder, 0));
|
||||||
|
build_sstable(merge_context, merged_sstable);
|
||||||
|
|
||||||
|
res_iter.reset();
|
||||||
|
range.get_range().set_whole_range();
|
||||||
|
prepare_query_param(trans_version_range);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, range.to_collation_free_range_on_demand_and_cutoff_range(allocator_));
|
||||||
|
if (OB_NOT_NULL(merged_sstable)) {
|
||||||
|
context_.read_out_type_ = FLAT_ROW_STORE;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(param_, context_, range, scanner));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, res_iter.from(result1));
|
||||||
|
ASSERT_TRUE(res_iter.equals(*scanner, true));
|
||||||
|
scanner->~ObStoreRowIterator();
|
||||||
|
} else {
|
||||||
|
STORAGE_LOG(ERROR, "merged_sstable is null");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace storage
|
} // namespace storage
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user