From d183e08f6d9dcfacf804362e3ab9ad78dfaa2b06 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Wed, 6 Sep 2023 22:45:31 +0800 Subject: [PATCH] [opt](MergedIO) optimize merge small IO, prevent amplified read (#23849) There were two vulnerabilities in the previous fix(https://github.com/apache/doris/pull/20305): 1. `next_content` may not necessarily be a truly readable range 2. The last range of the merged data may be the hollow This PR fundamentally solves the problem of reading amplification by rechecking the calculation range. According to the algorithm, there is only one possibility of generating read amplification, with only a small content of data within the 4k(`MIN_READ_SIZE `) range. However, 4k is generally the minimum IO size and there is no need for further segmentation. --- be/src/io/fs/buffered_reader.cpp | 49 ++++++++++++++++++-------- be/src/io/fs/buffered_reader.h | 21 ++++++----- be/test/io/fs/buffered_reader_test.cpp | 45 +++++++++++++++++++++++ 3 files changed, 92 insertions(+), 23 deletions(-) diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 00f88c7515..2a7187cc28 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -49,7 +49,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b if (result.size == 0) { return Status::OK(); } - int range_index = _search_read_range(offset, offset + result.size); + const int range_index = _search_read_range(offset, offset + result.size); if (range_index < 0) { SCOPED_RAW_TIMER(&_statistics.read_time); Status st = _reader->read_at(offset, result, bytes_read, io_ctx); @@ -99,6 +99,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b // merge small IO size_t merge_start = offset + has_read; const size_t merge_end = merge_start + READ_SLICE_SIZE; + // + std::vector> merged_slice; size_t content_size = 0; size_t hollow_size = 0; if (merge_start > _random_access_ranges[range_index].end_offset) { @@ -118,12 +120,14 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b size_t add_content = std::min(merge_end - merge_start, content_max); content_size += add_content; merge_start += add_content; + merged_slice.emplace_back(add_content, true); break; } size_t add_content = std::min(_random_access_ranges[merge_index].end_offset - merge_start, content_max); content_size += add_content; merge_start += add_content; + merged_slice.emplace_back(add_content, true); if (merge_start != _random_access_ranges[merge_index].end_offset) { break; } @@ -136,18 +140,9 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b } if (gap < merge_end - merge_start && content_size < _remaining && !_range_cached_data[merge_index + 1].has_read) { - size_t next_content = - std::min(_random_access_ranges[merge_index + 1].end_offset, merge_end) - - _random_access_ranges[merge_index + 1].start_offset; - next_content = std::min(next_content, _remaining - content_size); - double amplified_ratio = config::max_amplified_read_ratio; - if ((content_size + hollow_size) > MIN_READ_SIZE && - (hollow_size + gap) > (next_content + content_size) * amplified_ratio) { - // too large gap - break; - } hollow_size += gap; merge_start = _random_access_ranges[merge_index + 1].start_offset; + merged_slice.emplace_back(gap, false); } else { // there's no enough memory to read hollow data break; @@ -155,7 +150,33 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b } merge_index++; } - if (content_size + hollow_size == to_read) { + content_size = 0; + hollow_size = 0; + double amplified_ratio = config::max_amplified_read_ratio; + std::vector> ratio_and_size; + // Calculate the read amplified ratio for each merge operation and the size of the merged data. + // Find the largest size of the merged data whose amplified ratio is less than config::max_amplified_read_ratio + for (const std::pair& slice : merged_slice) { + if (slice.second) { + content_size += slice.first; + if (slice.first > 0) { + ratio_and_size.emplace_back((double)hollow_size / content_size, + content_size + hollow_size); + } + } else { + hollow_size += slice.first; + } + } + size_t best_merged_size = 0; + for (const std::pair& rs : ratio_and_size) { + if (rs.second > best_merged_size) { + if (rs.first < amplified_ratio || rs.second <= MIN_READ_SIZE) { + best_merged_size = rs.second; + } + } + } + + if (best_merged_size == to_read) { // read directly to avoid copy operation SCOPED_RAW_TIMER(&_statistics.read_time); size_t read_size = 0; @@ -170,8 +191,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b merge_start = offset + has_read; size_t merge_read_size = 0; - RETURN_IF_ERROR(_fill_box(range_index, merge_start, content_size + hollow_size, - &merge_read_size, io_ctx)); + RETURN_IF_ERROR( + _fill_box(range_index, merge_start, best_merged_size, &merge_read_size, io_ctx)); if (cached_data.start_offset != merge_start) { return Status::IOError("Wrong start offset in merged IO"); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 25a6811330..84235f0a46 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -75,6 +75,15 @@ struct PrefetchRange { */ class MergeRangeFileReader : public io::FileReader { public: + struct Statistics { + int64_t copy_time = 0; + int64_t read_time = 0; + int64_t request_io = 0; + int64_t merged_io = 0; + int64_t request_bytes = 0; + int64_t read_bytes = 0; + }; + struct RangeCachedData { size_t start_offset; size_t end_offset; @@ -190,20 +199,14 @@ public: // for test only const std::vector& box_reference() const { return _box_ref; } + // for test only + const Statistics& statistics() const { return _statistics; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; private: - struct Statistics { - int64_t copy_time = 0; - int64_t read_time = 0; - int64_t request_io = 0; - int64_t merged_io = 0; - int64_t request_bytes = 0; - int64_t read_bytes = 0; - }; - RuntimeProfile::Counter* _copy_time; RuntimeProfile::Counter* _read_time; RuntimeProfile::Counter* _request_io; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 6a281e125f..97ef217136 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -270,6 +270,51 @@ TEST_F(BufferedReaderTest, test_miss) { EXPECT_EQ(45, bytes_read); } +TEST_F(BufferedReaderTest, test_read_amplify) { + size_t kb = 1024; + io::FileReaderSPtr offset_reader = std::make_shared(2048 * kb); // 2MB + std::vector random_access_ranges; + random_access_ranges.emplace_back(0, 1 * kb); // column0 + // if read the follow slice, amplified_ratio = 1, but data size <= MIN_READ_SIZE + random_access_ranges.emplace_back(3 * kb, 4 * kb); // column1 + // if read the follow slice, amplified_ratio = 1, + // but merge the next rand, amplified_ratio will be decreased + random_access_ranges.emplace_back(5 * kb, 6 * kb); // column2 + random_access_ranges.emplace_back(7 * kb, 12 * kb); // column3 + // read the last range first, so we can't merge the last range when reading the former ranges, + // even if the amplified_ratio < 0.8 + random_access_ranges.emplace_back(512 * kb, 2048 * kb); // column4 + + io::MergeRangeFileReader merge_reader(nullptr, offset_reader, random_access_ranges); + char data[2048 * kb]; // 2MB + Slice result(data, 2048 * kb); + size_t bytes_read = 0; + + // read column4 + result.size = 1024 * kb; + merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr); + EXPECT_EQ(bytes_read, 1024 * kb); + EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb); + // read column0 + result.size = 1 * kb; + // will merge column 0 ~ 3 + merge_reader.read_at(0, result, &bytes_read, nullptr); + EXPECT_EQ(bytes_read, 1 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); + // read column1 + result.size = 1 * kb; + merge_reader.read_at(3 * kb, result, &bytes_read, nullptr); + // read column2 + result.size = 1 * kb; + merge_reader.read_at(5 * kb, result, &bytes_read, nullptr); + // read column3 + result.size = 5 * kb; + merge_reader.read_at(7 * kb, result, &bytes_read, nullptr); + EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); +} + TEST_F(BufferedReaderTest, test_merged_io) { io::FileReaderSPtr offset_reader = std::make_shared(128 * 1024 * 1024); // 128MB