diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 00f88c7515..2a7187cc28 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -49,7 +49,7 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b if (result.size == 0) { return Status::OK(); } - int range_index = _search_read_range(offset, offset + result.size); + const int range_index = _search_read_range(offset, offset + result.size); if (range_index < 0) { SCOPED_RAW_TIMER(&_statistics.read_time); Status st = _reader->read_at(offset, result, bytes_read, io_ctx); @@ -99,6 +99,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b // merge small IO size_t merge_start = offset + has_read; const size_t merge_end = merge_start + READ_SLICE_SIZE; + // each entry is (slice_size, is_content): true marks range content, false marks a gap between ranges + std::vector> merged_slice; size_t content_size = 0; size_t hollow_size = 0; if (merge_start > _random_access_ranges[range_index].end_offset) { @@ -118,12 +120,14 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b size_t add_content = std::min(merge_end - merge_start, content_max); content_size += add_content; merge_start += add_content; + merged_slice.emplace_back(add_content, true); break; } size_t add_content = std::min(_random_access_ranges[merge_index].end_offset - merge_start, content_max); content_size += add_content; merge_start += add_content; + merged_slice.emplace_back(add_content, true); if (merge_start != _random_access_ranges[merge_index].end_offset) { break; } @@ -136,18 +140,9 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b } if (gap < merge_end - merge_start && content_size < _remaining && !_range_cached_data[merge_index + 1].has_read) { - size_t next_content = - std::min(_random_access_ranges[merge_index + 1].end_offset, merge_end) - - _random_access_ranges[merge_index + 1].start_offset; - next_content = std::min(next_content, _remaining - content_size); - 
double amplified_ratio = config::max_amplified_read_ratio; - if ((content_size + hollow_size) > MIN_READ_SIZE && - (hollow_size + gap) > (next_content + content_size) * amplified_ratio) { - // too large gap - break; - } hollow_size += gap; merge_start = _random_access_ranges[merge_index + 1].start_offset; + merged_slice.emplace_back(gap, false); } else { // there's no enough memory to read hollow data break; @@ -155,7 +150,33 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b } merge_index++; } - if (content_size + hollow_size == to_read) { + content_size = 0; + hollow_size = 0; + double amplified_ratio = config::max_amplified_read_ratio; + std::vector> ratio_and_size; + // For every non-empty content slice, record the amplified ratio (hollow bytes / content bytes) and the merged size (content + hollow) reached so far. + // Then keep the largest merged size whose ratio is below config::max_amplified_read_ratio or whose size is at most MIN_READ_SIZE. + for (const std::pair& slice : merged_slice) { + if (slice.second) { + content_size += slice.first; + if (slice.first > 0) { + ratio_and_size.emplace_back((double)hollow_size / content_size, + content_size + hollow_size); + } + } else { + hollow_size += slice.first; + } + } + size_t best_merged_size = 0; + for (const std::pair& rs : ratio_and_size) { + if (rs.second > best_merged_size) { + if (rs.first < amplified_ratio || rs.second <= MIN_READ_SIZE) { + best_merged_size = rs.second; + } + } + } + + if (best_merged_size == to_read) { // read directly to avoid copy operation SCOPED_RAW_TIMER(&_statistics.read_time); size_t read_size = 0; @@ -170,8 +191,8 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b merge_start = offset + has_read; size_t merge_read_size = 0; - RETURN_IF_ERROR(_fill_box(range_index, merge_start, content_size + hollow_size, - &merge_read_size, io_ctx)); + RETURN_IF_ERROR( + _fill_box(range_index, merge_start, best_merged_size, &merge_read_size, io_ctx)); if (cached_data.start_offset != merge_start) { return 
Status::IOError("Wrong start offset in merged IO"); } diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index 25a6811330..84235f0a46 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -75,6 +75,15 @@ struct PrefetchRange { */ class MergeRangeFileReader : public io::FileReader { public: + struct Statistics { + int64_t copy_time = 0; + int64_t read_time = 0; + int64_t request_io = 0; + int64_t merged_io = 0; + int64_t request_bytes = 0; + int64_t read_bytes = 0; + }; + struct RangeCachedData { size_t start_offset; size_t end_offset; @@ -190,20 +199,14 @@ public: // for test only const std::vector& box_reference() const { return _box_ref; } + // for test only + const Statistics& statistics() const { return _statistics; } + protected: Status read_at_impl(size_t offset, Slice result, size_t* bytes_read, const IOContext* io_ctx) override; private: - struct Statistics { - int64_t copy_time = 0; - int64_t read_time = 0; - int64_t request_io = 0; - int64_t merged_io = 0; - int64_t request_bytes = 0; - int64_t read_bytes = 0; - }; - RuntimeProfile::Counter* _copy_time; RuntimeProfile::Counter* _read_time; RuntimeProfile::Counter* _request_io; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 6a281e125f..97ef217136 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -270,6 +270,51 @@ TEST_F(BufferedReaderTest, test_miss) { EXPECT_EQ(45, bytes_read); } +TEST_F(BufferedReaderTest, test_read_amplify) { + size_t kb = 1024; + io::FileReaderSPtr offset_reader = std::make_shared(2048 * kb); // 2MB + std::vector random_access_ranges; + random_access_ranges.emplace_back(0, 1 * kb); // column0 + // if the following slice is read, amplified_ratio = 1, but data size <= MIN_READ_SIZE + random_access_ranges.emplace_back(3 * kb, 4 * kb); // column1 + // if the following slice is read, amplified_ratio = 1, + // but if the next range is merged as well, 
amplified_ratio will be decreased + random_access_ranges.emplace_back(5 * kb, 6 * kb); // column2 + random_access_ranges.emplace_back(7 * kb, 12 * kb); // column3 + // the last range is read first, so it cannot be merged when the earlier ranges are read, + // even if the amplified_ratio < 0.8 + random_access_ranges.emplace_back(512 * kb, 2048 * kb); // column4 + + io::MergeRangeFileReader merge_reader(nullptr, offset_reader, random_access_ranges); + char data[2048 * kb]; // 2MB; NOTE(review): kb is a non-const size_t, so this is a variable-length array on the stack - consider a heap buffer + Slice result(data, 2048 * kb); + size_t bytes_read = 0; + + // read column4 (NOTE(review): the value returned by read_at is discarded here and in the reads below) + result.size = 1024 * kb; + merge_reader.read_at(1024 * kb, result, &bytes_read, nullptr); + EXPECT_EQ(bytes_read, 1024 * kb); + EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb); + // read column0 + result.size = 1 * kb; + // expected to merge columns 0 ~ 3 into one read, so read_bytes grows by 12 KB + merge_reader.read_at(0, result, &bytes_read, nullptr); + EXPECT_EQ(bytes_read, 1 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); + // read column1 + result.size = 1 * kb; + merge_reader.read_at(3 * kb, result, &bytes_read, nullptr); + // read column2 + result.size = 1 * kb; + merge_reader.read_at(5 * kb, result, &bytes_read, nullptr); + // read column3 + result.size = 5 * kb; + merge_reader.read_at(7 * kb, result, &bytes_read, nullptr); + EXPECT_EQ(merge_reader.statistics().request_bytes, 1024 * kb + 8 * kb); + EXPECT_EQ(merge_reader.statistics().read_bytes, 1024 * kb + 12 * kb); +} + TEST_F(BufferedReaderTest, test_merged_io) { io::FileReaderSPtr offset_reader = std::make_shared(128 * 1024 * 1024); // 128MB