[fix](scan) Fix incorrect query results due to data race of compaction and parallel scanners building (#40552) (#40829)
pick: #40552 Capture rowset splits and delete predicates atomicly in `ParallelScannerBuilder::_load` as a single read source. In this way, we could prevent reading stale rowsets with the delete predicates eliminated by (base) compaction.
This commit is contained in:
@ -41,32 +41,23 @@ Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
|
||||
std::list<VScannerSPtr>& scanners) {
|
||||
DCHECK_GE(_rows_per_scanner, _min_rows_per_scanner);
|
||||
for (auto&& [tablet, version] : _tablets) {
|
||||
DCHECK(_all_rowsets.contains(tablet->tablet_id()));
|
||||
auto& rowsets = _all_rowsets[tablet->tablet_id()];
|
||||
|
||||
TabletReader::ReadSource reade_source_with_delete_info;
|
||||
if (!_state->skip_delete_predicate()) {
|
||||
{
|
||||
std::shared_lock rdlock(tablet->get_header_lock());
|
||||
RETURN_IF_ERROR(tablet->capture_rs_readers(
|
||||
{0, version}, &reade_source_with_delete_info.rs_splits, false));
|
||||
}
|
||||
reade_source_with_delete_info.fill_delete_predicates();
|
||||
}
|
||||
|
||||
TabletReader::ReadSource read_source;
|
||||
DCHECK(_all_read_sources.contains(tablet->tablet_id()));
|
||||
auto& entire_read_source = _all_read_sources[tablet->tablet_id()];
|
||||
|
||||
// `rs_splits` in `entire read source` will be devided into several partitial read sources
|
||||
// to build several parallel scanners, based on segment rows number. All the partitial read sources
|
||||
// share the same delete predicates from their corresponding entire read source.
|
||||
TabletReader::ReadSource partitial_read_source;
|
||||
int64_t rows_collected = 0;
|
||||
for (auto& rowset : rowsets) {
|
||||
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
|
||||
RowsetReaderSharedPtr reader;
|
||||
RETURN_IF_ERROR(beta_rowset->create_reader(&reader));
|
||||
const auto rowset_id = beta_rowset->rowset_id();
|
||||
for (auto& rs_split : entire_read_source.rs_splits) {
|
||||
auto reader = rs_split.rs_reader;
|
||||
auto rowset = reader->rowset();
|
||||
const auto rowset_id = rowset->rowset_id();
|
||||
|
||||
DCHECK(_segment_cache_handles.contains(rowset_id));
|
||||
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
|
||||
|
||||
if (beta_rowset->num_rows() == 0) {
|
||||
if (rowset->num_rows() == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -106,14 +97,14 @@ Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
|
||||
DCHECK_EQ(split.segment_offsets.second - split.segment_offsets.first,
|
||||
split.segment_row_ranges.size());
|
||||
|
||||
read_source.rs_splits.emplace_back(std::move(split));
|
||||
partitial_read_source.rs_splits.emplace_back(std::move(split));
|
||||
|
||||
scanners.emplace_back(
|
||||
_build_scanner(tablet, version, _key_ranges,
|
||||
{std::move(read_source.rs_splits),
|
||||
reade_source_with_delete_info.delete_predicates}));
|
||||
{std::move(partitial_read_source.rs_splits),
|
||||
entire_read_source.delete_predicates}));
|
||||
|
||||
read_source = TabletReader::ReadSource();
|
||||
partitial_read_source = {};
|
||||
split = RowSetSplits(reader->clone());
|
||||
row_ranges = RowRanges();
|
||||
|
||||
@ -137,25 +128,24 @@ Status ParallelScannerBuilder<ParentType>::_build_scanners_by_rowid(
|
||||
DCHECK_GT(split.segment_offsets.second, split.segment_offsets.first);
|
||||
DCHECK_EQ(split.segment_row_ranges.size(),
|
||||
split.segment_offsets.second - split.segment_offsets.first);
|
||||
read_source.rs_splits.emplace_back(std::move(split));
|
||||
partitial_read_source.rs_splits.emplace_back(std::move(split));
|
||||
}
|
||||
} // end `for (auto& rowset : rowsets)`
|
||||
|
||||
DCHECK_LE(rows_collected, _rows_per_scanner);
|
||||
if (rows_collected > 0) {
|
||||
DCHECK_GT(read_source.rs_splits.size(), 0);
|
||||
DCHECK_GT(partitial_read_source.rs_splits.size(), 0);
|
||||
#ifndef NDEBUG
|
||||
for (auto& split : read_source.rs_splits) {
|
||||
for (auto& split : partitial_read_source.rs_splits) {
|
||||
DCHECK(split.rs_reader != nullptr);
|
||||
DCHECK_LT(split.segment_offsets.first, split.segment_offsets.second);
|
||||
DCHECK_EQ(split.segment_row_ranges.size(),
|
||||
split.segment_offsets.second - split.segment_offsets.first);
|
||||
}
|
||||
#endif
|
||||
scanners.emplace_back(
|
||||
_build_scanner(tablet, version, _key_ranges,
|
||||
{std::move(read_source.rs_splits),
|
||||
reade_source_with_delete_info.delete_predicates}));
|
||||
scanners.emplace_back(_build_scanner(tablet, version, _key_ranges,
|
||||
{std::move(partitial_read_source.rs_splits),
|
||||
entire_read_source.delete_predicates}));
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,13 +160,14 @@ Status ParallelScannerBuilder<ParentType>::_load() {
|
||||
_total_rows = 0;
|
||||
for (auto&& [tablet, version] : _tablets) {
|
||||
const auto tablet_id = tablet->tablet_id();
|
||||
auto& rowsets = _all_rowsets[tablet_id];
|
||||
{
|
||||
std::shared_lock read_lock(tablet->get_header_lock());
|
||||
RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, &rowsets));
|
||||
auto& read_source = _all_read_sources[tablet_id];
|
||||
RETURN_IF_ERROR(tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false));
|
||||
if (!_state->skip_delete_predicate()) {
|
||||
read_source.fill_delete_predicates();
|
||||
}
|
||||
|
||||
for (auto& rowset : rowsets) {
|
||||
for (auto& rs_split : read_source.rs_splits) {
|
||||
auto rowset = rs_split.rs_reader->rowset();
|
||||
RETURN_IF_ERROR(rowset->load());
|
||||
const auto rowset_id = rowset->rowset_id();
|
||||
auto& segment_cache_handle = _segment_cache_handles[rowset_id];
|
||||
@ -205,4 +196,4 @@ std::shared_ptr<NewOlapScanner> ParallelScannerBuilder<ParentType>::_build_scann
|
||||
template class ParallelScannerBuilder<NewOlapScanNode>;
|
||||
template class ParallelScannerBuilder<pipeline::OlapScanLocalState>;
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
@ -19,8 +19,10 @@
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <utility>
|
||||
|
||||
#include "olap/rowset/rowset_fwd.h"
|
||||
#include "olap/rowset/segment_v2/row_ranges.h"
|
||||
#include "olap/segment_loader.h"
|
||||
#include "olap/tablet.h"
|
||||
@ -92,7 +94,7 @@ private:
|
||||
bool _is_preaggregation;
|
||||
std::vector<TabletWithVersion> _tablets;
|
||||
std::vector<OlapScanRange*> _key_ranges;
|
||||
std::unordered_map<int64_t, std::vector<RowsetSharedPtr>> _all_rowsets;
|
||||
std::unordered_map<int64_t, TabletReader::ReadSource> _all_read_sources;
|
||||
};
|
||||
|
||||
} // namespace doris
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user