diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index c89ed3a9f9..0208139ba1 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -68,7 +68,7 @@ struct PrefetchRange { * * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges - * into small buffers(name as box, default 1MB, 64MB in total). A box can be occupied by many ranges, + * into small buffers(name as box, default 1MB, 128MB in total). A box can be occupied by many ranges, * and use a reference counter to record how many ranges are cached in the box. If reference counter * equals zero, the box can be release or reused by other ranges. When there is no empty box for a new * read operation, the read operation will do directly. @@ -118,11 +118,11 @@ public: } }; - static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB + static constexpr size_t TOTAL_BUFFER_SIZE = 128 * 1024 * 1024; // 128MB static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB - static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64 + static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128 MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader, const std::vector& random_access_ranges) diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 3bb43e2ae4..152012da4b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -26,7 +26,6 @@ #include #include "runtime/define_primitive_type.h" -#include "runtime/types.h" #include "schema_desc.h" #include "util/runtime_profile.h" #include "vec/columns/column.h" @@ -194,8 +193,13 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, siz ? chunk_meta.dictionary_page_offset : chunk_meta.data_page_offset; size_t chunk_len = chunk_meta.total_compressed_size; - _stream_reader = std::make_unique( - file, chunk_start, chunk_len, std::min(chunk_len, max_buf_size)); + size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size); + if (typeid_cast(file.get())) { + // turn off prefetch data when using MergeRangeFileReader + prefetch_buffer_size = 0; + } + _stream_reader = std::make_unique(file, chunk_start, chunk_len, + prefetch_buffer_size); _chunk_reader = std::make_unique(_stream_reader.get(), &_chunk_meta, field, _ctz, _io_ctx); RETURN_IF_ERROR(_chunk_reader->init()); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 388dbcf838..24539fc0ec 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -561,11 +561,18 @@ Status ParquetReader::_next_row_group_reader() { RowGroupReader::PositionDeleteContext position_delete_ctx = _get_position_delete_ctx(row_group, row_group_index); - io::FileReaderSPtr random_reader = std::make_shared( - _profile, _file_reader, _generate_random_access_ranges(row_group_index)); + size_t avg_io_size = 0; + const std::vector io_ranges = + _generate_random_access_ranges(row_group_index, &avg_io_size); + // The underlying page reader will prefetch data in column. + // Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory. + io::FileReaderSPtr group_file_reader = + avg_io_size < io::MergeRangeFileReader::SMALL_IO + ? std::make_shared(_profile, _file_reader, io_ranges) + : _file_reader; _current_group_reader.reset(new RowGroupReader( - random_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, _io_ctx, - position_delete_ctx, _lazy_read_ctx, _state)); + group_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, + _io_ctx, position_delete_ctx, _lazy_read_ctx, _state)); _row_group_eof = false; return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets, _tuple_descriptor, _row_descriptor, _colname_to_slot_id, @@ -619,9 +626,10 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) { } std::vector ParquetReader::_generate_random_access_ranges( - const RowGroupReader::RowGroupIndex& group) { + const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size) { std::vector result; int64_t last_chunk_end = -1; + size_t total_io_size = 0; std::function scalar_range = [&](const FieldSchema* field, const tparquet::RowGroup& row_group) { if (field->type.type == TYPE_ARRAY) { @@ -643,6 +651,7 @@ std::vector ParquetReader::_generate_random_access_ranges( int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size; DCHECK_GE(chunk_start, last_chunk_end); result.emplace_back(chunk_start, chunk_end); + total_io_size += chunk_meta.total_compressed_size; last_chunk_end = chunk_end; } }; @@ -651,6 +660,9 @@ std::vector ParquetReader::_generate_random_access_ranges( const FieldSchema* field = _file_metadata->schema().get_column(read_col._file_slot_name); scalar_range(field, row_group); } + if (!result.empty()) { + *avg_io_size = total_io_size / result.size(); + } return result; } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 565fe63675..89dbc832a6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -203,7 +203,7 @@ private: int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers); std::string _meta_cache_key(const std::string& path) { return "meta_" + path; } std::vector _generate_random_access_ranges( - const RowGroupReader::RowGroupIndex& group); + const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size); RuntimeProfile* _profile; const TFileScanRangeParams& _scan_params; diff --git a/be/test/io/fs/buffered_reader_test.cpp b/be/test/io/fs/buffered_reader_test.cpp index 35d53b6f57..35b0cc60e1 100644 --- a/be/test/io/fs/buffered_reader_test.cpp +++ b/be/test/io/fs/buffered_reader_test.cpp @@ -284,8 +284,9 @@ TEST_F(BufferedReaderTest, test_merged_io) { // read column 0 merge_reader.read_at(0, result, &bytes_read, nullptr); // will merge 3MB + 1MB + 3MB, and read out 1MB - // so _remaining in MergeRangeFileReader is: 64MB - (3MB + 3MB - 1MB) = 59MB - EXPECT_EQ(59 * 1024 * 1024, merge_reader.buffer_remaining()); + // so _remaining in MergeRangeFileReader is: ${NUM_BOX}MB - (3MB + 3MB - 1MB) + EXPECT_EQ((io::MergeRangeFileReader::NUM_BOX - 5) * 1024 * 1024, + merge_reader.buffer_remaining()); auto& range_cached_data = merge_reader.range_cached_data(); // range 0 is read out 1MB, so the cached range is [1MB, 3MB) // range 1 is not read, so the cached range is [4MB, 7MB) @@ -299,14 +300,15 @@ TEST_F(BufferedReaderTest, test_merged_io) { // the column 1 is already cached EXPECT_EQ(5 * 1024 * 1024, range_cached_data[1].start_offset); EXPECT_EQ(7 * 1024 * 1024, range_cached_data[1].end_offset); - EXPECT_EQ(60 * 1024 * 1024, merge_reader.buffer_remaining()); + EXPECT_EQ((io::MergeRangeFileReader::NUM_BOX - 4) * 1024 * 1024, + merge_reader.buffer_remaining()); // read all cached data merge_reader.read_at(1 * 1024 * 1024, result, &bytes_read, nullptr); merge_reader.read_at(2 * 1024 * 1024, result, &bytes_read, nullptr); merge_reader.read_at(5 * 1024 * 1024, result, &bytes_read, nullptr); merge_reader.read_at(6 * 1024 * 1024, result, &bytes_read, nullptr); - EXPECT_EQ(64 * 1024 * 1024, merge_reader.buffer_remaining()); + EXPECT_EQ(io::MergeRangeFileReader::TOTAL_BUFFER_SIZE, merge_reader.buffer_remaining()); // read all remaining columns for (int i = 0; i < 3; ++i) { @@ -334,7 +336,7 @@ TEST_F(BufferedReaderTest, test_merged_io) { } // check the final state - EXPECT_EQ(64 * 1024 * 1024, merge_reader.buffer_remaining()); + EXPECT_EQ(io::MergeRangeFileReader::TOTAL_BUFFER_SIZE, merge_reader.buffer_remaining()); for (auto& cached_data : merge_reader.range_cached_data()) { EXPECT_TRUE(cached_data.empty()); }