From 523f0baa80676eaec47d7fa65e2ecd0d257209f0 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Sat, 14 Sep 2024 11:08:55 +0800 Subject: [PATCH] [fix](scan) Fix incorrect query results due to data race of compaction and parallel scanners building (#40552) (#40829) pick: #40552 Capture rowset splits and delete predicates atomicly in `ParallelScannerBuilder::_load` as a single read source. In this way, we could prevent reading stale rowsets with the delete predicates eliminated by (base) compaction. --- be/src/olap/parallel_scanner_builder.cpp | 65 ++++++++++-------------- be/src/olap/parallel_scanner_builder.h | 6 ++- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index 5ad7423221..c737731007 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -41,32 +41,23 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( std::list& scanners) { DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner); for (auto&& [tablet, version] : _tablets) { - DCHECK(_all_rowsets.contains(tablet->tablet_id())); - auto& rowsets = _all_rowsets[tablet->tablet_id()]; - - TabletReader::ReadSource reade_source_with_delete_info; - if (!_state->skip_delete_predicate()) { - { - std::shared_lock rdlock(tablet->get_header_lock()); - RETURN_IF_ERROR(tablet->capture_rs_readers( - {0, version}, &reade_source_with_delete_info.rs_splits, false)); - } - reade_source_with_delete_info.fill_delete_predicates(); - } - - TabletReader::ReadSource read_source; + DCHECK(_all_read_sources.contains(tablet->tablet_id())); + auto& entire_read_source = _all_read_sources[tablet->tablet_id()]; + // `rs_splits` in `entire read source` will be devided into several partitial read sources + // to build several parallel scanners, based on segment rows number. All the partitial read sources + // share the same delete predicates from their corresponding entire read source. + TabletReader::ReadSource partitial_read_source; int64_t rows_collected = 0; - for (auto& rowset : rowsets) { - auto beta_rowset = std::dynamic_pointer_cast(rowset); - RowsetReaderSharedPtr reader; - RETURN_IF_ERROR(beta_rowset->create_reader(&reader)); - const auto rowset_id = beta_rowset->rowset_id(); + for (auto& rs_split : entire_read_source.rs_splits) { + auto reader = rs_split.rs_reader; + auto rowset = reader->rowset(); + const auto rowset_id = rowset->rowset_id(); DCHECK(_segment_cache_handles.contains(rowset_id)); auto& segment_cache_handle = _segment_cache_handles[rowset_id]; - if (beta_rowset->num_rows() == 0) { + if (rowset->num_rows() == 0) { continue; } @@ -106,14 +97,14 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first, split.segment_row_ranges.size()); - read_source.rs_splits.emplace_back(std::move(split)); + partitial_read_source.rs_splits.emplace_back(std::move(split)); scanners.emplace_back( _build_scanner(tablet, version, _key_ranges, - {std::move(read_source.rs_splits), - reade_source_with_delete_info.delete_predicates})); + {std::move(partitial_read_source.rs_splits), + entire_read_source.delete_predicates})); - read_source = TabletReader::ReadSource(); + partitial_read_source = {}; split = RowSetSplits(reader->clone()); row_ranges = RowRanges(); @@ -137,25 +128,24 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid( DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); - read_source.rs_splits.emplace_back(std::move(split)); + partitial_read_source.rs_splits.emplace_back(std::move(split)); } } // end `for (auto& rowset : rowsets)` DCHECK_LE(rows_collected, _rows_per_scanner); if (rows_collected > 0) { - DCHECK_GT(read_source.rs_splits.size(), 0); + DCHECK_GT(partitial_read_source.rs_splits.size(), 0); #ifndef NDEBUG - for (auto& split : read_source.rs_splits) { + for (auto& split : partitial_read_source.rs_splits) { DCHECK(split.rs_reader != nullptr); DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second); DCHECK_EQ(split.segment_row_ranges.size(), split.segment_offsets.second - split.segment_offsets.first); } #endif - scanners.emplace_back( - _build_scanner(tablet, version, _key_ranges, - {std::move(read_source.rs_splits), - reade_source_with_delete_info.delete_predicates})); + scanners.emplace_back(_build_scanner(tablet, version, _key_ranges, + {std::move(partitial_read_source.rs_splits), + entire_read_source.delete_predicates})); } } @@ -170,13 +160,14 @@ Status ParallelScannerBuilder::_load() { _total_rows = 0; for (auto&& [tablet, version] : _tablets) { const auto tablet_id = tablet->tablet_id(); - auto& rowsets = _all_rowsets[tablet_id]; - { - std::shared_lock read_lock(tablet->get_header_lock()); - RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets)); + auto& read_source = _all_read_sources[tablet_id]; + RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false)); + if (!_state->skip_delete_predicate()) { + read_source.fill_delete_predicates(); } - for (auto& rowset : rowsets) { + for (auto& rs_split : read_source.rs_splits) { + auto rowset = rs_split.rs_reader->rowset(); RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); auto& segment_cache_handle = _segment_cache_handles[rowset_id]; @@ -205,4 +196,4 @@ std::shared_ptr ParallelScannerBuilder::_build_scann template class ParallelScannerBuilder; template class ParallelScannerBuilder; -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/parallel_scanner_builder.h b/be/src/olap/parallel_scanner_builder.h index 7d28dd706f..62ef430257 100644 --- a/be/src/olap/parallel_scanner_builder.h +++ b/be/src/olap/parallel_scanner_builder.h @@ -19,8 +19,10 @@ #include #include +#include #include +#include "olap/rowset/rowset_fwd.h" #include "olap/rowset/segment_v2/row_ranges.h" #include "olap/segment_loader.h" #include "olap/tablet.h" @@ -92,7 +94,7 @@ private: bool _is_preaggregation; std::vector _tablets; std::vector _key_ranges; - std::unordered_map> _all_rowsets; + std::unordered_map _all_read_sources; }; -} // namespace doris \ No newline at end of file +} // namespace doris