branch-2.1: [Fix](mow) Should use rowset's schema when calculating delete bitmaps between segments #54351 (#54370)

pick https://github.com/apache/doris/pull/54351
Author:    bobhan1
Date:      2025-08-07 16:14:29 +08:00
Committed: GitHub
Parent:    80389069ee
Commit:    75cd3712f3
7 changed files with 119 additions and 14 deletions

be/src/olap/rowset_builder.cpp

@@ -262,8 +262,8 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
-        RETURN_IF_ERROR(
-                tablet()->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
+        RETURN_IF_ERROR(tablet()->calc_delete_bitmap_between_segments(_tablet_schema, _rowset,
+                                                                      segments, _delete_bitmap));
     }
     // tablet is under alter process. The delete bitmap will be calculated after conversion.

be/src/olap/tablet.cpp

@@ -3503,7 +3503,8 @@ Status Tablet::update_delete_bitmap_without_lock(
     // calculate delete bitmap between segments if necessary.
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
-    RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+    RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset->tablet_schema(), rowset, segments,
+                                                        delete_bitmap));

     // get all base rowsets to calculate on
     std::vector<RowsetSharedPtr> specified_rowsets;
@@ -4089,22 +4090,21 @@ void Tablet::clear_cache() {
 }

 Status Tablet::calc_delete_bitmap_between_segments(
-        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
-        DeleteBitmapPtr delete_bitmap) {
+        TabletSchemaSPtr schema, RowsetSharedPtr rowset,
+        const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap) {
     size_t const num_segments = segments.size();
     if (num_segments < 2) {
         return Status::OK();
     }

     OlapStopWatch watch;
     auto const rowset_id = rowset->rowset_id();
     size_t seq_col_length = 0;
-    if (_tablet_meta->tablet_schema()->has_sequence_col()) {
-        auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
-        seq_col_length = _tablet_meta->tablet_schema()->column(seq_col_idx).length() + 1;
+    if (schema->has_sequence_col()) {
+        auto seq_col_idx = schema->sequence_col_idx();
+        seq_col_length = schema->column(seq_col_idx).length() + 1;
     }
     size_t rowid_length = 0;
-    if (!_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+    if (!schema->cluster_key_idxes().empty()) {
         rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
     }

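The heart of the fix is the hunk above: the sequence-column suffix length used to trim encoded primary keys now comes from the caller-supplied schema rather than from _tablet_meta->tablet_schema(). A minimal sketch of the idea (seq_suffix_bytes is a hypothetical helper, not part of the patch):

    // Hypothetical helper mirroring the seq_col_length computation above.
    // Keys in a merge-on-write primary key index may carry an encoded
    // sequence value as a suffix; its length depends on the schema the
    // segments were written with, not on the tablet's latest schema.
    size_t seq_suffix_bytes(const TabletSchema& schema) {
        if (!schema.has_sequence_col()) {
            return 0; // keys were encoded without a sequence suffix
        }
        // the "+ 1" matches the hunk above: one extra marker byte
        // precedes the encoded sequence value
        return schema.column(schema.sequence_col_idx()).length() + 1;
    }

If an ALTER TABLE ... ENABLE FEATURE "SEQUENCE_LOAD" lands around a load, the tablet's latest schema and the rowset's schema can disagree on has_sequence_col(); the old code then trimmed the wrong number of bytes, identical keys in different segments failed to match, and duplicate rows survived. Passing the rowset's schema keeps key decoding consistent with how the segments were written, which is exactly what the regression test below exercises.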
be/src/olap/tablet.h

@@ -485,7 +485,8 @@ public:
                                      RowsetWriter* rowset_writer);
     Status calc_delete_bitmap_between_segments(
-            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            TabletSchemaSPtr schema, RowsetSharedPtr rowset,
+            const std::vector<segment_v2::SegmentSharedPtr>& segments,
             DeleteBitmapPtr delete_bitmap);
     Status read_columns_by_plan(TabletSchemaSPtr tablet_schema, std::vector<uint32_t> cids_to_read,
                                 const PartialUpdateReadPlan& read_plan,

be/src/olap/txn_manager.cpp

@@ -549,7 +549,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
             std::vector<segment_v2::SegmentSharedPtr> segments;
             RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
             RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
-                    rowset, segments, tablet_txn_info->delete_bitmap));
+                    rowset->tablet_schema(), rowset, segments, tablet_txn_info->delete_bitmap));
         }
         RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id));

be/src/service/backend_service.cpp

@@ -580,8 +580,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         }
         if (segments.size() > 1) {
             // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
-                                                                       delete_bitmap);
+            status = local_tablet->calc_delete_bitmap_between_segments(
+                    rowset->tablet_schema(), rowset, segments, delete_bitmap);
             if (!status) {
                 LOG(WARNING) << "failed to calculate delete bitmap"
                              << ". tablet_id: " << local_tablet->tablet_id()

test_mow_alter_seq_multi_segments.out (new file)

@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1	1	1

-- !sql --
4064

-- !dup_key_count --
0
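For reference, the three expected blocks correspond, in order, to the suite's queries below: the qt_sql after the initial single-row insert (1 1 1), the qt_sql row count after the duplicate-key multi-segment load (4064, each key visible exactly once), and the qt_dup_key_count check that no key appears more than once (0).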

test_mow_alter_seq_multi_segments.groovy (new file)

@@ -0,0 +1,94 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_mow_alter_seq_multi_segments", "nonConcurrent") {
def table1 = "test_mow_alter_seq_multi_segments"
sql "DROP TABLE IF EXISTS ${table1} FORCE;"
sql """ CREATE TABLE IF NOT EXISTS ${table1} (
`k1` int NOT NULL,
`c1` int,
`c2` int
)UNIQUE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"disable_auto_compaction" = "true",
"replication_num" = "1"); """
sql """insert into ${table1} values(1,1,1);"""
qt_sql "select * from ${table1} order by k1;"
sql """alter table ${table1} ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type"="int");"""
    // to cause multiple segments and segment compaction
def customBeConfig = [
doris_scanner_row_bytes : 1
]
setBeConfigTemporary(customBeConfig) {
try {
GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
// batch_size is 4164 in csv_reader.cpp
// _batch_size is 8192 in vtablet_writer.cpp
            // to cause multiple segments
GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
Thread.sleep(1000)
int rows = 4064
            // load data that spans multiple segments and contains duplicate keys between segments
String content = ""
(1..rows).each {
int x = it
content += "${x},${x},${x},1\n"
}
(1..rows).each {
int x = it
content += "${x},${x},${x},2\n"
}
def t1 = Thread.start {
streamLoad {
table "${table1}"
set 'column_separator', ','
set 'columns', 'k1,c1,c2,seq'
set 'function_column.sequence_col', 'seq'
inputStream new ByteArrayInputStream(content.getBytes())
time 30000
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
def json = parseJson(result)
assert "success" == json.Status.toLowerCase()
}
}
}
t1.join()
qt_sql "select count() from ${table1};"
// qt_sql "select *,__DORIS_VERSION_COL__ as ver, __DORIS_DELETE_SIGN__ as del,__DORIS_SEQUENCE_COL__ as seq from ${table1} where k1<=10 order by k1,__DORIS_VERSION_COL__;"
sql "set disable_nereids_rules='ELIMINATE_GROUP_BY';"
qt_dup_key_count "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
} catch(Exception e) {
logger.info(e.getMessage())
throw e
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
}
}
}