diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index ca746f8aac..f9353a0b64 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -262,8 +262,8 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
-        RETURN_IF_ERROR(
-                tablet()->calc_delete_bitmap_between_segments(_rowset, segments, _delete_bitmap));
+        RETURN_IF_ERROR(tablet()->calc_delete_bitmap_between_segments(_tablet_schema, _rowset,
+                                                                      segments, _delete_bitmap));
     }
 
     // tablet is under alter process. The delete bitmap will be calculated after conversion.
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4661f5eae8..2ad8d43bcb 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3503,7 +3503,8 @@ Status Tablet::update_delete_bitmap_without_lock(
     // calculate delete bitmap between segments if necessary.
     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
-    RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+    RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset->tablet_schema(), rowset, segments,
+                                                        delete_bitmap));
 
     // get all base rowsets to calculate on
     std::vector<RowsetSharedPtr> specified_rowsets;
@@ -4089,22 +4090,21 @@ void Tablet::clear_cache() {
 }
 
 Status Tablet::calc_delete_bitmap_between_segments(
-        RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
-        DeleteBitmapPtr delete_bitmap) {
+        TabletSchemaSPtr schema, RowsetSharedPtr rowset,
+        const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap) {
     size_t const num_segments = segments.size();
     if (num_segments < 2) {
         return Status::OK();
     }
 
-    OlapStopWatch watch;
     auto const rowset_id = rowset->rowset_id();
     size_t seq_col_length = 0;
-    if (_tablet_meta->tablet_schema()->has_sequence_col()) {
-        auto seq_col_idx = _tablet_meta->tablet_schema()->sequence_col_idx();
-        seq_col_length = _tablet_meta->tablet_schema()->column(seq_col_idx).length() + 1;
+    if (schema->has_sequence_col()) {
+        auto seq_col_idx = schema->sequence_col_idx();
+        seq_col_length = schema->column(seq_col_idx).length() + 1;
     }
     size_t rowid_length = 0;
-    if (!_tablet_meta->tablet_schema()->cluster_key_idxes().empty()) {
+    if (!schema->cluster_key_idxes().empty()) {
         rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
     }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index a3f2c40a61..ee70008279 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -485,7 +485,8 @@ public:
                                RowsetWriter* rowset_writer);
 
     Status calc_delete_bitmap_between_segments(
-            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+            TabletSchemaSPtr schema, RowsetSharedPtr rowset,
+            const std::vector<segment_v2::SegmentSharedPtr>& segments,
             DeleteBitmapPtr delete_bitmap);
     Status read_columns_by_plan(TabletSchemaSPtr tablet_schema, std::vector<uint32_t> cids_to_read,
                                 const PartialUpdateReadPlan& read_plan,
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 851718f49c..a9f3bf3aa2 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -549,7 +549,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
             std::vector<segment_v2::SegmentSharedPtr> segments;
             RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
             RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
-                    rowset, segments, tablet_txn_info->delete_bitmap));
+                    rowset->tablet_schema(), rowset, segments, tablet_txn_info->delete_bitmap));
         }
         RETURN_IF_ERROR(tablet->update_delete_bitmap(tablet_txn_info.get(), transaction_id));
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 07e5ef4104..7aa8da0e0b 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -580,8 +580,8 @@ void _ingest_binlog(IngestBinlogArg* arg) {
         }
         if (segments.size() > 1) {
             // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
-                                                                       delete_bitmap);
+            status = local_tablet->calc_delete_bitmap_between_segments(
+                    rowset->tablet_schema(), rowset, segments, delete_bitmap);
             if (!status) {
                 LOG(WARNING) << "failed to calculate delete bitmap"
                              << ". tablet_id: " << local_tablet->tablet_id()
diff --git a/regression-test/data/fault_injection_p0/test_mow_alter_seq_multi_segments.out b/regression-test/data/fault_injection_p0/test_mow_alter_seq_multi_segments.out
new file mode 100644
index 0000000000..aa44268bf3
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_mow_alter_seq_multi_segments.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+1	1	1
+
+-- !sql --
+4064
+
+-- !dup_key_count --
+0
+
diff --git a/regression-test/suites/fault_injection_p0/test_mow_alter_seq_multi_segments.groovy b/regression-test/suites/fault_injection_p0/test_mow_alter_seq_multi_segments.groovy
new file mode 100644
index 0000000000..d6b17bd5c7
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_mow_alter_seq_multi_segments.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_mow_alter_seq_multi_segments", "nonConcurrent") {
+    def table1 = "test_mow_alter_seq_multi_segments"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+        `k1` int NOT NULL,
+        `c1` int,
+        `c2` int
+        )UNIQUE KEY(k1)
+    DISTRIBUTED BY HASH(k1) BUCKETS 1
+    PROPERTIES (
+        "enable_unique_key_merge_on_write" = "true",
+        "disable_auto_compaction" = "true",
+        "replication_num" = "1"); """
+
+    sql """insert into ${table1} values(1,1,1);"""
+    qt_sql "select * from ${table1} order by k1;"
+    sql """alter table ${table1} ENABLE FEATURE "SEQUENCE_LOAD" WITH PROPERTIES ("function_column.sequence_type"="int");"""
+
+    // to cause multi segments and segment compaction
+    def customBeConfig = [
+        doris_scanner_row_bytes : 1
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            // batch_size is 4164 in csv_reader.cpp
+            // _batch_size is 8192 in vtablet_writer.cpp
+            // to cause multi segments
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
+            Thread.sleep(1000)
+
+            int rows = 4064
+            // load data that will have multi segments and there are duplicate keys between segments
+            String content = ""
+            (1..rows).each {
+                int x = it
+                content += "${x},${x},${x},1\n"
+            }
+            (1..rows).each {
+                int x = it
+                content += "${x},${x},${x},2\n"
+            }
+            def t1 = Thread.start {
+                streamLoad {
+                    table "${table1}"
+                    set 'column_separator', ','
+                    set 'columns', 'k1,c1,c2,seq'
+                    set 'function_column.sequence_col', 'seq'
+                    inputStream new ByteArrayInputStream(content.getBytes())
+                    time 30000
+
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            throw exception
+                        }
+                        def json = parseJson(result)
+                        assert "success" == json.Status.toLowerCase()
+                    }
+                }
+            }
+            t1.join()
+            qt_sql "select count() from ${table1};"
+            // qt_sql "select *,__DORIS_VERSION_COL__ as ver, __DORIS_DELETE_SIGN__ as del,__DORIS_SEQUENCE_COL__ as seq from ${table1} where k1<=10 order by k1,__DORIS_VERSION_COL__;"
+            sql "set disable_nereids_rules='ELIMINATE_GROUP_BY';"
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table1} group by k1 having cnt > 1) A;"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
+}
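For reference, a minimal sketch of the call pattern the call-site changes above converge on, assembled only from names that already appear in this patch (calc_delete_bitmap_between_segments, load_segments, tablet_schema, BetaRowset): the caller loads the rowset's segments and passes the schema that matches that rowset (rowset->tablet_schema(), or RowsetBuilder's _tablet_schema) instead of letting Tablet read _tablet_meta->tablet_schema() internally, which the new regression test suggests can differ from the load's schema while a sequence column is being added via ALTER. This is a sketch of usage, not code from the patch.

    // Sketch only: publish/ingest-style caller after this change.
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
    if (segments.size() > 1) {
        // The delete bitmap between segments is now computed against the schema
        // the rowset was written with, passed in explicitly as the first argument.
        RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
                rowset->tablet_schema(), rowset, segments, delete_bitmap));
    }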