// Review notes (documentation only; the patch text below is unchanged).
//
// This is a unified diff touching five files under be/src. Its line breaks
// appear to have been collapsed, so several hunks run together on a few very
// long lines.
// NOTE(review): restore the original line structure before trying to apply it.
// NOTE(review): template argument lists also look stripped -- `typeid_cast(`,
// `std::make_shared(` and `const std::vector&` carry no angle-bracket
// arguments -- confirm against the upstream patch.
//
// Summary of the added ('+') lines:
//  * common/config.cpp and common/config.h: define and declare a new mDouble
//    config item `max_amplified_read_ratio`; the DEFINE passes the string
//    default "0.8".
//  * io/fs/buffered_reader.h: add the constant MIN_READ_SIZE = 4096.
//  * io/fs/buffered_reader.cpp, MergeRangeFileReader::read_at_impl: inside the
//    branch that extends the merge across `gap` to range merge_index + 1, the
//    new code first computes `next_content` as
//        min(next.end_offset, merge_end) - next.start_offset
//    and caps it at `_remaining - content_size`. It then executes `break`
//    (the enclosing construct lies outside this hunk) when both
//        content_size + hollow_size > MIN_READ_SIZE
//        hollow_size + gap > (next_content + content_size) * max_amplified_read_ratio
//    hold. Otherwise the pre-existing statements run: `gap` is added to
//    hollow_size and merge_start moves to the next range's start_offset.
//  * vec/exec/format/parquet/vparquet_reader.cpp,
//    ParquetReader::_next_row_group_reader: when the typeid_cast on
//    _file_reader.get() is non-null, _file_reader is used directly as
//    group_file_reader (the in-patch comment attributes this to
//    InMemoryFileReader being able to merge small IO). Otherwise the previous
//    selection is kept: io_ranges and avg_io_size come from
//    _generate_random_access_ranges, and the reader is wrapped via
//    std::make_shared only when avg_io_size < io::MergeRangeFileReader::SMALL_IO.
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 04c29e8dd7..0b2d5bf614 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -856,6 +856,7 @@ DEFINE_mInt32(parquet_header_max_size_mb, "1"); DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128"); // Max buffer size for parquet chunk column DEFINE_mInt32(parquet_column_max_buffer_mb, "8"); +DEFINE_mDouble(max_amplified_read_ratio, "0.8"); // OrcReader DEFINE_mInt32(orc_natural_read_size_mb, "8"); diff --git a/be/src/common/config.h b/be/src/common/config.h index c1d51cfb3b..895783ebfc 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -884,6 +884,8 @@ DECLARE_mInt32(parquet_header_max_size_mb); DECLARE_mInt32(parquet_rowgroup_max_buffer_mb); // Max buffer size for parquet chunk column DECLARE_mInt32(parquet_column_max_buffer_mb); +// Merge small IO, the max amplified read ratio +DECLARE_mDouble(max_amplified_read_ratio); // OrcReader DECLARE_mInt32(orc_natural_read_size_mb); diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 1e0ec6921e..6dcbad96b7 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -135,6 +135,16 @@ Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* b } if (gap < merge_end - merge_start && content_size < _remaining && !_range_cached_data[merge_index + 1].has_read) { + size_t next_content = + std::min(_random_access_ranges[merge_index + 1].end_offset, merge_end) - + _random_access_ranges[merge_index + 1].start_offset; + next_content = std::min(next_content, _remaining - content_size); + double amplified_ratio = config::max_amplified_read_ratio; + if ((content_size + hollow_size) > MIN_READ_SIZE && + (hollow_size + gap) > (next_content + content_size) * amplified_ratio) { + // too large gap + break; + } hollow_size += gap; merge_start = _random_access_ranges[merge_index + 1].start_offset; } else { diff --git a/be/src/io/fs/buffered_reader.h 
b/be/src/io/fs/buffered_reader.h index d0fb79a2d7..a23112027b 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -123,6 +123,7 @@ public: static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128 + static constexpr size_t MIN_READ_SIZE = 4096; // 4KB MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader, const std::vector& random_access_ranges) diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 23e1fcf7d9..a4f49e6ad9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -569,15 +569,21 @@ Status ParquetReader::_next_row_group_reader() { RowGroupReader::PositionDeleteContext position_delete_ctx = _get_position_delete_ctx(row_group, row_group_index); - size_t avg_io_size = 0; - const std::vector io_ranges = - _generate_random_access_ranges(row_group_index, &avg_io_size); - // The underlying page reader will prefetch data in column. - // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory. - io::FileReaderSPtr group_file_reader = - avg_io_size < io::MergeRangeFileReader::SMALL_IO - ? std::make_shared(_profile, _file_reader, io_ranges) - : _file_reader; + io::FileReaderSPtr group_file_reader; + if (typeid_cast(_file_reader.get())) { + // InMemoryFileReader has the ability to merge small IO + group_file_reader = _file_reader; + } else { + size_t avg_io_size = 0; + const std::vector io_ranges = + _generate_random_access_ranges(row_group_index, &avg_io_size); + // The underlying page reader will prefetch data in column. + // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory. + group_file_reader = avg_io_size < io::MergeRangeFileReader::SMALL_IO + ? 
std::make_shared( + _profile, _file_reader, io_ranges) + : _file_reader; + } _current_group_reader.reset(new RowGroupReader( group_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));