diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 58f6daecb2..0338406aec 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -398,20 +398,33 @@ Status Compaction::construct_input_rowset_readers() { Status Compaction::modify_rowsets(const Merger::Statistics* stats) { std::vector output_rowsets; output_rowsets.push_back(_output_rowset); - uint64_t missed_rows = 0; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { Version version = _tablet->max_version(); DeleteBitmap output_rowset_delete_bitmap(_tablet->tablet_id()); + std::set missed_rows; std::map>> location_map; // Convert the delete bitmap of the input rowsets to output rowset. // New loads are not blocked, so some keys of input rowsets might // be deleted during the time. We need to deal with delete bitmap // of incremental data later. - missed_rows += _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, &location_map, - &output_rowset_delete_bitmap); + _tablet->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, + &location_map, &output_rowset_delete_bitmap); + std::size_t missed_rows_size = missed_rows.size(); + if (compaction_type() == READER_CUMULATIVE_COMPACTION) { + std::string err_msg = fmt::format( + "cumulative compaction: the merged rows({}) is not equal to missed " + "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", + stats->merged_rows, missed_rows_size, _tablet->tablet_id(), + _tablet->table_id()); + DCHECK(stats == nullptr || stats->merged_rows == missed_rows_size) << err_msg; + if (stats != nullptr && stats->merged_rows != missed_rows_size) { + LOG(WARNING) << err_msg; + } + } + RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); location_map.clear(); { @@ -420,22 +433,17 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. - missed_rows += _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &location_map, - &output_rowset_delete_bitmap); - RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); - - if (compaction_type() == READER_CUMULATIVE_COMPACTION) { - std::string err_msg = fmt::format( - "cumulative compaction: the merged rows({}) is not equal to missed " - "rows({}) in rowid conversion, tablet_id: {}, table_id:{}", - stats->merged_rows, missed_rows, _tablet->tablet_id(), _tablet->table_id()); - DCHECK(stats == nullptr || stats->merged_rows == missed_rows) << err_msg; - if (stats != nullptr && stats->merged_rows != missed_rows) { - LOG(WARNING) << err_msg; - } + _tablet->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows, + &location_map, &output_rowset_delete_bitmap); + DCHECK_EQ(missed_rows.size(), missed_rows_size); + if (missed_rows.size() != missed_rows_size) { + LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size + << " after: " << missed_rows.size(); } + RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); + _tablet->merge_delete_bitmap(output_rowset_delete_bitmap); RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); } diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index adf5beeaf8..f70bc7f4c7 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -2674,12 +2674,11 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletT return Status::OK(); } -uint64_t Tablet::calc_compaction_output_rowset_delete_bitmap( +void Tablet::calc_compaction_output_rowset_delete_bitmap( const std::vector& input_rowsets, const RowIdConversion& rowid_conversion, - uint64_t start_version, uint64_t end_version, + uint64_t start_version, uint64_t end_version, std::set* missed_rows, std::map>>* location_map, DeleteBitmap* output_rowset_delete_bitmap) { - std::set missed_rows; RowLocation src; RowLocation dst; for (auto& rowset : input_rowsets) { @@ -2701,7 +2700,7 @@ uint64_t Tablet::calc_compaction_output_rowset_delete_bitmap( << " src loaction: |" << src.rowset_id << "|" << src.segment_id << "|" << src.row_id << " version: " << cur_version; - missed_rows.insert(src); + missed_rows->insert(src); continue; } VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" @@ -2716,7 +2715,6 @@ uint64_t Tablet::calc_compaction_output_rowset_delete_bitmap( } } } - return missed_rows.size(); } void Tablet::merge_delete_bitmap(const DeleteBitmap& delete_bitmap) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index a20719c307..93c7e78192 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -398,9 +398,10 @@ public: Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset); Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info); - uint64_t calc_compaction_output_rowset_delete_bitmap( + void calc_compaction_output_rowset_delete_bitmap( const std::vector& input_rowsets, const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, + std::set* missed_rows, std::map>>* location_map, DeleteBitmap* output_rowset_delete_bitmap); void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);