[Enhancement](merge-on-write) use delete bitmap to mark delete for rows with delete sign when sequence column doesn't exist (#24011)

This commit is contained in:
bobhan1
2023-09-12 08:56:46 +08:00
committed by GitHub
parent 11e052c7a4
commit 6913d68ba0
8 changed files with 305 additions and 16 deletions

View File

@ -192,7 +192,8 @@ Status MergeIndexDeleteBitmapCalculator::calculate_all(DeleteBitmapPtr delete_bi
break;
}
RETURN_IF_ERROR(st);
delete_bitmap->add({_rowset_id, loc.segment_id, 0}, loc.row_id);
delete_bitmap->add({_rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
}
return Status::OK();
}

View File

@ -423,7 +423,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
if (_tablet_schema->is_strict_mode()) {
++num_rows_filtered;
// delete the invalid newly inserted row
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
}
@ -447,6 +448,13 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
if (delete_sign_column_data != nullptr && delete_sign_column_data[block_pos] != 0) {
has_default_or_nullable = true;
use_default_or_null_flag.emplace_back(true);
if (!_tablet_schema->has_sequence_col() && !have_input_seq_column) {
// we can directly use delete bitmap to mark the rows with delete sign as deleted
// if sequence column doesn't exist to eliminate reading delete sign columns in later reads
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
segment_pos);
}
} else {
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
@ -457,10 +465,12 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
if (st.is<KEY_ALREADY_EXISTS>()) {
// although we need to mark delete current row, we still need to read missing columns
// for this row, we need to ensure that each column is aligned
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id, 0},
segment_pos);
_mow_context->delete_bitmap->add(
{_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
segment_pos);
} else {
_mow_context->delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
_mow_context->delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
}
}
CHECK(use_default_or_null_flag.size() == num_rows);
@ -638,6 +648,29 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
_serialize_block_to_row_column(*const_cast<vectorized::Block*>(block));
}
if (_opts.write_type == DataWriteType::TYPE_DIRECT && _opts.enable_unique_key_merge_on_write &&
!_tablet_schema->has_sequence_col() && _tablet_schema->delete_sign_idx() != -1) {
const vectorized::ColumnWithTypeAndName& delete_sign_column =
block->get_by_position(_tablet_schema->delete_sign_idx());
auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column.column));
if (delete_sign_col.size() >= row_pos + num_rows) {
const vectorized::Int8* delete_sign_column_data = delete_sign_col.get_data().data();
uint32_t segment_start_pos =
_column_writers[_tablet_schema->delete_sign_idx()]->get_next_rowid();
for (size_t block_pos = row_pos, seg_pos = segment_start_pos;
seg_pos < segment_start_pos + num_rows; block_pos++, seg_pos++) {
// we can directly use delete bitmap to mark the rows with delete sign as deleted
// if sequence column doesn't exist to eliminate reading delete sign columns in later reads
if (delete_sign_column_data[block_pos]) {
_mow_context->delete_bitmap->add({_opts.rowset_ctx->rowset_id, _segment_id,
DeleteBitmap::TEMP_VERSION_FOR_DELETE_SIGN},
seg_pos);
}
}
}
}
_olap_data_convertor->set_source_content(block, row_pos, num_rows);
// find all row pos for short key indexes

View File

@ -2971,7 +2971,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
Slice key = Slice(index_column->get_data_at(i).data, index_column->get_data_at(i).size);
RowLocation loc;
// same row in segments should be filtered
if (delete_bitmap->contains({rowset_id, seg->id(), 0}, row_id)) {
if (delete_bitmap->contains({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id)) {
continue;
}
@ -2989,7 +2990,8 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// sequence id smaller than the previous one, so delete current row
if (st.is<KEY_ALREADY_EXISTS>()) {
delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
} else if (is_partial_update && rowset_writer != nullptr) {
// In publish version, record rows to be deleted for concurrent update
@ -3011,12 +3013,16 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// rowset.
// just set 0 as a unified temporary version number, and update to
// the real version number later.
delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
delete_bitmap->add(
{loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON},
row_id);
continue;
}
// when st = ok
delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
loc.row_id);
}
remaining -= num_read;
}
@ -3717,8 +3723,9 @@ Status Tablet::calc_delete_bitmap_between_segments(
void Tablet::add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
const RowsetIdUnorderedSet& rowsetids) {
for (const auto& rowsetid : rowsetids) {
delete_bitmap->add({rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, 0},
DeleteBitmap::ROWSET_SENTINEL_MARK);
delete_bitmap->add(
{rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON},
DeleteBitmap::ROWSET_SENTINEL_MARK);
}
}
@ -3739,8 +3746,8 @@ Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
std::vector<RowsetSharedPtr>* rowsets) {
RowsetIdUnorderedSet missing_ids;
for (const auto& rowsetid : rowset_ids) {
if (!delete_bitmap->delete_bitmap.contains(
{rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, 0})) {
if (!delete_bitmap->delete_bitmap.contains({rowsetid, DeleteBitmap::INVALID_SEGMENT_ID,
DeleteBitmap::TEMP_VERSION_COMMON})) {
missing_ids.insert(rowsetid);
}
}

View File

@ -340,6 +340,13 @@ public:
constexpr static inline uint32_t ROWSET_SENTINEL_MARK =
std::numeric_limits<uint32_t>::max() - 1;
// When a delete bitmap is merged into tablet's delete bitmap, the version of entries in the delete bitmap
// will be replaced to the correspoding correct version. So before we finally merge a delete bitmap into
// tablet's delete bitmap we can use arbitary version number in BitmapKey. Here we define some version numbers
// for specific usage during this periods to avoid conflicts
constexpr static inline uint64_t TEMP_VERSION_COMMON = 0;
constexpr static inline uint64_t TEMP_VERSION_FOR_DELETE_SIGN = 1;
/**
*
* @param tablet_id the tablet which this delete bitmap associates with

View File

@ -10,7 +10,7 @@
2 2 2 2 2
4 4 4 4 4
-- !with_delete_sign --
-- !1 --
1 \N \N \N \N 1
1 1 1 1 1 0
2 2 2 2 2 0
@ -20,3 +20,35 @@
5 \N \N \N \N 1
5 5 5 5 5 0
-- !2 --
2 2 2 2 2 0
4 4 4 4 4 0
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5
-- !after_delete --
2 2 2 2 2
4 4 4 4 4
-- !1 --
1 \N \N \N \N 1
1 1 1 1 1 0
2 2 2 2 2 0
3 \N \N \N \N 1
3 3 3 3 3 0
4 4 4 4 4 0
5 \N \N \N \N 1
5 5 5 5 5 0
-- !2 --
1 \N \N \N \N 1
2 2 2 2 2 0
3 \N \N \N \N 1
4 4 4 4 4 0
5 \N \N \N \N 1

View File

@ -0,0 +1,54 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5
-- !after_delete --
2 2 2 2 2
4 4 4 4 4
-- !1 --
1 1 1 1 1 0
1 1 1 1 1 1
2 2 2 2 2 0
3 3 3 3 3 0
3 3 3 3 3 1
4 4 4 4 4 0
5 5 5 5 5 0
5 5 5 5 5 1
-- !2 --
2 2 2 2 2 0
4 4 4 4 4 0
-- !sql --
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 4 4 4 4
5 5 5 5 5
-- !after_delete --
2 2 2 2 2
4 4 4 4 4
-- !1 --
1 1 1 1 1 0
1 1 1 1 1 1
2 2 2 2 2 0
3 3 3 3 3 0
3 3 3 3 3 1
4 4 4 4 4 0
5 5 5 5 5 0
5 5 5 5 5 1
-- !2 --
1 1 1 1 1 1
2 2 2 2 2 0
3 3 3 3 3 1
4 4 4 4 4 0
5 5 5 5 5 1

View File

@ -55,6 +55,65 @@ suite('test_partial_update_delete_sign') {
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
sql "sync"
qt_with_delete_sign "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
// skip_delete_bitmap=true, skip_delete_sign=true
qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "set skip_delete_sign=true;"
sql "set skip_delete_bitmap=false;"
sql "sync"
// skip_delete_bitmap=false, skip_delete_sign=true
qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName1};"
sql "set skip_delete_sign=false;"
sql "set skip_storage_engine_merge=false;"
sql "set skip_delete_bitmap=false;"
sql "sync"
def tableName2 = "test_partial_update_delete_sign2"
sql "DROP TABLE IF EXISTS ${tableName2};"
sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int,
`c4` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1",
"function_column.sequence_col" = 'c4'
);"""
sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
streamLoad {
table "${tableName2}"
set 'column_separator', ','
set 'format', 'csv'
set 'partial_columns', 'true'
set 'columns', 'k1,__DORIS_DELETE_SIGN__'
file 'delete_sign.csv'
time 10000 // limit inflight 10s
}
sql "sync"
qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
sql "set skip_delete_sign=true;"
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
sql "sync"
// skip_delete_bitmap=true, skip_delete_sign=true
qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "set skip_delete_sign=true;"
sql "set skip_delete_bitmap=false;"
sql "sync"
// skip_delete_bitmap=false, skip_delete_sign=true
qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName2};"
}

View File

@ -0,0 +1,96 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite('test_delete_sign_delete_bitmap') {
def tableName1 = "test_delete_sign_delete_bitmap1"
sql "DROP TABLE IF EXISTS ${tableName1};"
sql """ CREATE TABLE IF NOT EXISTS ${tableName1} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int,
`c4` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1"
);"""
sql "insert into ${tableName1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
qt_sql "select * from ${tableName1} order by k1,c1,c2,c3,c4;"
// sql "insert into ${tableName1}(k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__) select k1,c1,c2,c3,c4,1 from ${tableName1} where k1 in (1,3,5);"
sql """insert into ${tableName1}(k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__) values(1,1,1,1,1,1),(3,3,3,3,3,1),(5,5,5,5,5,1);"""
sql "sync"
qt_after_delete "select * from ${tableName1} order by k1,c1,c2,c3,c4;"
sql "set skip_delete_sign=true;"
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
sql "sync"
// skip_delete_bitmap=true, skip_delete_sign=true
qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "set skip_delete_sign=true;"
sql "set skip_delete_bitmap=false;"
sql "sync"
// skip_delete_bitmap=false, skip_delete_sign=true
qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName1} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName1};"
sql "set skip_delete_sign=false;"
sql "set skip_storage_engine_merge=false;"
sql "set skip_delete_bitmap=false;"
sql "sync"
def tableName2 = "test_delete_sign_delete_bitmap2"
sql "DROP TABLE IF EXISTS ${tableName2};"
sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
`k1` int NOT NULL,
`c1` int,
`c2` int,
`c3` int,
`c4` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1",
"function_column.sequence_col" = 'c4'
);"""
sql "insert into ${tableName2} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3),(4,4,4,4,4),(5,5,5,5,5);"
qt_sql "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
sql """insert into ${tableName2}(k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__) values(1,1,1,1,1,1),(3,3,3,3,3,1),(5,5,5,5,5,1);"""
sql "sync"
qt_after_delete "select * from ${tableName2} order by k1,c1,c2,c3,c4;"
sql "set skip_delete_sign=true;"
sql "set skip_storage_engine_merge=true;"
sql "set skip_delete_bitmap=true;"
sql "sync"
// skip_delete_bitmap=true, skip_delete_sign=true
qt_1 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "set skip_delete_sign=true;"
sql "set skip_delete_bitmap=false;"
sql "sync"
// skip_delete_bitmap=false, skip_delete_sign=true
qt_2 "select k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__ from ${tableName2} order by k1,c1,c2,c3,c4,__DORIS_DELETE_SIGN__;"
sql "drop table if exists ${tableName2};"
}