close skip_ghost_row for wrong is_rowkey_first_row_already_output_ flag
This commit is contained in:
@ -80,7 +80,14 @@ int ObTxTable::check_with_tx_data(ObReadTxDataArg &read_tx_data_arg, ObITxDataCh
|
|||||||
for (int i = 0; i < TX_DATA_ARR.count(); i++)
|
for (int i = 0; i < TX_DATA_ARR.count(); i++)
|
||||||
{
|
{
|
||||||
if (read_tx_data_arg.tx_id_ == TX_DATA_ARR.at(i).tx_id_) {
|
if (read_tx_data_arg.tx_id_ == TX_DATA_ARR.at(i).tx_id_) {
|
||||||
ret = fn(TX_DATA_ARR[i]);
|
if (TX_DATA_ARR.at(i).state_ == ObTxData::RUNNING) {
|
||||||
|
SCN tmp_scn;
|
||||||
|
tmp_scn.convert_from_ts(30);
|
||||||
|
ObTxCCCtx tmp_ctx(ObTxState::PREPARE, tmp_scn);
|
||||||
|
ret = fn(TX_DATA_ARR[i], &tmp_ctx);
|
||||||
|
} else {
|
||||||
|
ret = fn(TX_DATA_ARR[i]);
|
||||||
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
STORAGE_LOG(ERROR, "check with tx data failed", KR(ret), K(read_tx_data_arg), K(TX_DATA_ARR.at(i)));
|
STORAGE_LOG(ERROR, "check with tx data failed", KR(ret), K(read_tx_data_arg), K(TX_DATA_ARR.at(i)));
|
||||||
}
|
}
|
||||||
@ -1449,7 +1456,6 @@ TEST_F(TestMultiVersionMerge, test_merge_with_multi_trans_can_not_compact)
|
|||||||
tx_data->state_ = ObTxData::RUNNING;
|
tx_data->state_ = ObTxData::RUNNING;
|
||||||
transaction::ObUndoAction undo_action(ObTxSEQ(2, 0),ObTxSEQ(1, 0));
|
transaction::ObUndoAction undo_action(ObTxSEQ(2, 0),ObTxSEQ(1, 0));
|
||||||
tx_data->add_undo_action(tx_table, undo_action);
|
tx_data->add_undo_action(tx_table, undo_action);
|
||||||
;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data));
|
ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data));
|
||||||
@ -3272,6 +3278,120 @@ TEST_F(TestMultiVersionMerge, test_trans_cross_macro_with_ghost_row2)
|
|||||||
merger.reset();
|
merger.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestMultiVersionMerge, test_running_trans_cross_macro_with_abort_sql_seq)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObTabletMergeDagParam param;
|
||||||
|
ObTabletMergeCtx merge_context(param, allocator_);
|
||||||
|
ObPartitionMinorMerger merger(local_arena_, merge_context.static_param_);
|
||||||
|
|
||||||
|
ObTableHandleV2 handle1;
|
||||||
|
const char *micro_data[2];
|
||||||
|
micro_data[0] =
|
||||||
|
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
|
||||||
|
"1 var1 MIN -10 8 NOP EXIST FU trans_id_1\n";
|
||||||
|
|
||||||
|
micro_data[1] =
|
||||||
|
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
|
||||||
|
"1 var1 MIN -8 8 NOP EXIST LU trans_id_1\n"
|
||||||
|
"3 var3 -16 0 18 NOP EXIST LF trans_id_0\n";
|
||||||
|
|
||||||
|
int schema_rowkey_cnt = 2;
|
||||||
|
|
||||||
|
int64_t snapshot_version = 10;
|
||||||
|
share::ObScnRange scn_range;
|
||||||
|
scn_range.start_scn_.set_min();
|
||||||
|
scn_range.end_scn_.convert_for_tx(30);
|
||||||
|
prepare_table_schema(micro_data, schema_rowkey_cnt, scn_range, snapshot_version);
|
||||||
|
reset_writer(snapshot_version);
|
||||||
|
prepare_one_macro(micro_data, 1);
|
||||||
|
prepare_one_macro(µ_data[1], 1);
|
||||||
|
prepare_data_end(handle1);
|
||||||
|
merge_context.static_param_.tables_handle_.add_table(handle1);
|
||||||
|
STORAGE_LOG(INFO, "finish prepare sstable1");
|
||||||
|
|
||||||
|
ObTableHandleV2 handle2;
|
||||||
|
const char *micro_data2[2];
|
||||||
|
micro_data2[0] =
|
||||||
|
"bigint var bigint bigint bigint bigint flag multi_version_row_flag\n"
|
||||||
|
"0 var0 -26 0 7 NOP EXIST LF\n"
|
||||||
|
"2 var2 -26 0 7 NOP EXIST LF\n";
|
||||||
|
|
||||||
|
snapshot_version = 20;
|
||||||
|
scn_range.start_scn_.convert_for_tx(30);
|
||||||
|
scn_range.end_scn_.convert_for_tx(50);
|
||||||
|
table_key_.scn_range_ = scn_range;
|
||||||
|
reset_writer(snapshot_version);
|
||||||
|
prepare_one_macro(micro_data2, 1);
|
||||||
|
prepare_data_end(handle2);
|
||||||
|
merge_context.static_param_.tables_handle_.add_table(handle2);
|
||||||
|
STORAGE_LOG(INFO, "finish prepare sstable2");
|
||||||
|
|
||||||
|
ObLSID ls_id(ls_id_);
|
||||||
|
ObTabletID tablet_id(tablet_id_);
|
||||||
|
ObLSHandle ls_handle;
|
||||||
|
ObLSService *ls_svr = MTL(ObLSService*);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ls_svr->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD));
|
||||||
|
|
||||||
|
ObTxTable *tx_table = nullptr;
|
||||||
|
ObTxTableGuard tx_table_guard;
|
||||||
|
ls_handle.get_ls()->get_tx_table_guard(tx_table_guard);
|
||||||
|
ASSERT_NE(nullptr, tx_table = tx_table_guard.get_tx_table());
|
||||||
|
|
||||||
|
ObTxData *tx_data = new ObTxData();
|
||||||
|
transaction::ObTransID tx_id = 1;
|
||||||
|
|
||||||
|
// fill in data
|
||||||
|
tx_data->tx_id_ = tx_id;
|
||||||
|
tx_data->commit_version_.convert_for_tx(INT64_MAX);
|
||||||
|
tx_data->start_scn_.convert_for_tx(1);
|
||||||
|
tx_data->end_scn_.convert_for_tx(50);
|
||||||
|
tx_data->state_ = ObTxData::RUNNING;
|
||||||
|
transaction::ObUndoAction undo_action(ObTxSEQ(9, 0), ObTxSEQ(1, 0));
|
||||||
|
tx_data->add_undo_action(tx_table, undo_action);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, tx_table->insert(tx_data));
|
||||||
|
delete tx_data;
|
||||||
|
|
||||||
|
ObVersionRange trans_version_range;
|
||||||
|
trans_version_range.snapshot_version_ = 100;
|
||||||
|
trans_version_range.multi_version_start_ = 18;
|
||||||
|
trans_version_range.base_version_ = 18;
|
||||||
|
|
||||||
|
prepare_merge_context(MINOR_MERGE, false, trans_version_range, merge_context);
|
||||||
|
// minor mrege
|
||||||
|
ObSSTable *merged_sstable = nullptr;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, merger.merge_partition(merge_context, 0));
|
||||||
|
build_sstable(merge_context, merged_sstable);
|
||||||
|
|
||||||
|
const char *result1 =
|
||||||
|
"bigint var bigint bigint bigint bigint flag multi_version_row_flag trans_id\n"
|
||||||
|
"0 var0 -26 0 7 NOP EXIST LF trans_id_0\n"
|
||||||
|
"1 var1 MIN -10 8 NOP EXIST FU trans_id_1\n"
|
||||||
|
"1 var1 MAX MAX NOP NOP EXIST LG trans_id_1\n"
|
||||||
|
"2 var2 -26 0 7 NOP EXIST LF trans_id_0\n"
|
||||||
|
"3 var3 -16 0 18 NOP EXIST LF trans_id_0\n";
|
||||||
|
|
||||||
|
ObMockIterator res_iter;
|
||||||
|
ObStoreRowIterator *scanner = NULL;
|
||||||
|
ObDatumRange range;
|
||||||
|
res_iter.reset();
|
||||||
|
range.set_whole_range();
|
||||||
|
trans_version_range.base_version_ = 1;
|
||||||
|
trans_version_range.multi_version_start_ = 1;
|
||||||
|
trans_version_range.snapshot_version_ = INT64_MAX;
|
||||||
|
prepare_query_param(trans_version_range);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, merged_sstable->scan(iter_param_, context_, range, scanner));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, res_iter.from(result1));
|
||||||
|
ObMockDirectReadIterator sstable_iter;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, sstable_iter.init(scanner, allocator_, full_read_info_));
|
||||||
|
ASSERT_TRUE(res_iter.equals(sstable_iter, true/*cmp multi version row flag*/));
|
||||||
|
ASSERT_EQ(OB_SUCCESS, clear_tx_data());
|
||||||
|
scanner->~ObStoreRowIterator();
|
||||||
|
handle1.reset();
|
||||||
|
handle2.reset();
|
||||||
|
merger.reset();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2023,7 +2023,6 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::inner_get_next_row(const ObDat
|
|||||||
// set delete/insert committed row compacted
|
// set delete/insert committed row compacted
|
||||||
row_.set_compacted_multi_version_row();
|
row_.set_compacted_multi_version_row();
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1604,10 +1604,12 @@ int ObPartitionMinorRowMergeIter::next()
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("Unexpceted null current row", K(ret), K(*this));
|
LOG_WARN("Unexpceted null current row", K(ret), K(*this));
|
||||||
}
|
}
|
||||||
|
/* TODO is_rowkey_first_row_already_output_ is not right, can't skip ghost row now
|
||||||
} else if (OB_FAIL(skip_ghost_row())) {
|
} else if (OB_FAIL(skip_ghost_row())) {
|
||||||
if (OB_UNLIKELY(ret != OB_ITER_END)) {
|
if (OB_UNLIKELY(ret != OB_ITER_END)) {
|
||||||
LOG_WARN("Failed to skip_ghost_row", K(ret));
|
LOG_WARN("Failed to skip_ghost_row", K(ret));
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
} else if (need_recycle_mv_row()) {
|
} else if (need_recycle_mv_row()) {
|
||||||
if (OB_FAIL(compact_old_row())) {
|
if (OB_FAIL(compact_old_row())) {
|
||||||
LOG_WARN("Failed to compact_old_row", K(ret));
|
LOG_WARN("Failed to compact_old_row", K(ret));
|
||||||
@ -1619,7 +1621,6 @@ int ObPartitionMinorRowMergeIter::next()
|
|||||||
if (OB_SUCC(ret) && curr_row_ != nullptr && curr_row_->is_last_multi_version_row()) {
|
if (OB_SUCC(ret) && curr_row_ != nullptr && curr_row_->is_last_multi_version_row()) {
|
||||||
check_committing_trans_compacted_ = true;
|
check_committing_trans_compacted_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -265,9 +265,7 @@ bool ObMockIterator::equals(const ObStoreRow &r1, const ObStoreRow &r2,
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool bool_ret = false;
|
bool bool_ret = false;
|
||||||
STORAGE_LOG(INFO, "compare two rows", K(r1), K(r2));
|
STORAGE_LOG(INFO, "compare two rows", K(r1), K(r2));
|
||||||
if (r1.flag_ != r2.flag_) {
|
if (!cmp_multi_version_row_flag && (r1.flag_.is_not_exist() || r1.flag_.is_delete())) {
|
||||||
STORAGE_LOG(WARN, "flag not equals", K(r1), K(r2));
|
|
||||||
} else if (!cmp_multi_version_row_flag && (r1.flag_.is_not_exist() || r1.flag_.is_delete())) {
|
|
||||||
STORAGE_LOG(DEBUG, "flag is not row exist, return true", K(r1), K(r2));
|
STORAGE_LOG(DEBUG, "flag is not row exist, return true", K(r1), K(r2));
|
||||||
bool_ret = true;
|
bool_ret = true;
|
||||||
} else if (cmp_multi_version_row_flag && r1.row_type_flag_.flag_ != r2.row_type_flag_.flag_) {
|
} else if (cmp_multi_version_row_flag && r1.row_type_flag_.flag_ != r2.row_type_flag_.flag_) {
|
||||||
|
|||||||
Reference in New Issue
Block a user