[opt](FileReader) turn off prefetch data in parquet page reader when using MergeRangeFileReader (#19102)

Using both `MergeRangeFileReader` and `BufferedStreamReader` simultaneously would waste a lot of memory,
so turn off prefetch data in `BufferedStreamReader` when using MergeRangeFileReader.
This commit is contained in:
Ashin Gau
2023-04-28 09:27:56 +08:00
committed by GitHub
parent 86be6d27e7
commit 65a82a0b57
5 changed files with 35 additions and 17 deletions

View File

@ -68,7 +68,7 @@ struct PrefetchRange {
*
* When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader
* will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges
* into small buffers(name as box, default 1MB, 64MB in total). A box can be occupied by many ranges,
* into small buffers(name as box, default 1MB, 128MB in total). A box can be occupied by many ranges,
* and use a reference counter to record how many ranges are cached in the box. If reference counter
 * equals zero, the box can be released or reused by other ranges. When there is no empty box for a new
 * read operation, the read operation will be performed directly.
@ -118,11 +118,11 @@ public:
}
};
static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024; // 64MB
static constexpr size_t TOTAL_BUFFER_SIZE = 128 * 1024 * 1024; // 128MB
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
const std::vector<PrefetchRange>& random_access_ranges)

View File

@ -26,7 +26,6 @@
#include <utility>
#include "runtime/define_primitive_type.h"
#include "runtime/types.h"
#include "schema_desc.h"
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
@ -194,8 +193,13 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, siz
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_len = chunk_meta.total_compressed_size;
_stream_reader = std::make_unique<io::BufferedFileStreamReader>(
file, chunk_start, chunk_len, std::min(chunk_len, max_buf_size));
size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
// turn off prefetch data when using MergeRangeFileReader
prefetch_buffer_size = 0;
}
_stream_reader = std::make_unique<io::BufferedFileStreamReader>(file, chunk_start, chunk_len,
prefetch_buffer_size);
_chunk_reader = std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field,
_ctz, _io_ctx);
RETURN_IF_ERROR(_chunk_reader->init());

View File

@ -561,11 +561,18 @@ Status ParquetReader::_next_row_group_reader() {
RowGroupReader::PositionDeleteContext position_delete_ctx =
_get_position_delete_ctx(row_group, row_group_index);
io::FileReaderSPtr random_reader = std::make_shared<io::MergeRangeFileReader>(
_profile, _file_reader, _generate_random_access_ranges(row_group_index));
size_t avg_io_size = 0;
const std::vector<io::PrefetchRange> io_ranges =
_generate_random_access_ranges(row_group_index, &avg_io_size);
// The underlying page reader will prefetch data in column.
// Using both MergeRangeFileReader and BufferedStreamReader simultaneously would waste a lot of memory.
io::FileReaderSPtr group_file_reader =
avg_io_size < io::MergeRangeFileReader::SMALL_IO
? std::make_shared<io::MergeRangeFileReader>(_profile, _file_reader, io_ranges)
: _file_reader;
_current_group_reader.reset(new RowGroupReader(
random_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz, _io_ctx,
position_delete_ctx, _lazy_read_ctx, _state));
group_file_reader, _read_columns, row_group_index.row_group_id, row_group, _ctz,
_io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
_row_group_eof = false;
return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets,
_tuple_descriptor, _row_descriptor, _colname_to_slot_id,
@ -619,9 +626,10 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
}
std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
const RowGroupReader::RowGroupIndex& group) {
const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size) {
std::vector<io::PrefetchRange> result;
int64_t last_chunk_end = -1;
size_t total_io_size = 0;
std::function<void(const FieldSchema*, const tparquet::RowGroup&)> scalar_range =
[&](const FieldSchema* field, const tparquet::RowGroup& row_group) {
if (field->type.type == TYPE_ARRAY) {
@ -643,6 +651,7 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
DCHECK_GE(chunk_start, last_chunk_end);
result.emplace_back(chunk_start, chunk_end);
total_io_size += chunk_meta.total_compressed_size;
last_chunk_end = chunk_end;
}
};
@ -651,6 +660,9 @@ std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
const FieldSchema* field = _file_metadata->schema().get_column(read_col._file_slot_name);
scalar_range(field, row_group);
}
if (!result.empty()) {
*avg_io_size = total_io_size / result.size();
}
return result;
}

View File

@ -203,7 +203,7 @@ private:
int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers);
std::string _meta_cache_key(const std::string& path) { return "meta_" + path; }
std::vector<io::PrefetchRange> _generate_random_access_ranges(
const RowGroupReader::RowGroupIndex& group);
const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;

View File

@ -284,8 +284,9 @@ TEST_F(BufferedReaderTest, test_merged_io) {
// read column 0
merge_reader.read_at(0, result, &bytes_read, nullptr);
// will merge 3MB + 1MB + 3MB, and read out 1MB
// so _remaining in MergeRangeFileReader is: 64MB - (3MB + 3MB - 1MB) = 59MB
EXPECT_EQ(59 * 1024 * 1024, merge_reader.buffer_remaining());
// so _remaining in MergeRangeFileReader is: ${NUM_BOX}MB - (3MB + 3MB - 1MB)
EXPECT_EQ((io::MergeRangeFileReader::NUM_BOX - 5) * 1024 * 1024,
merge_reader.buffer_remaining());
auto& range_cached_data = merge_reader.range_cached_data();
// range 0 is read out 1MB, so the cached range is [1MB, 3MB)
// range 1 is not read, so the cached range is [4MB, 7MB)
@ -299,14 +300,15 @@ TEST_F(BufferedReaderTest, test_merged_io) {
// the column 1 is already cached
EXPECT_EQ(5 * 1024 * 1024, range_cached_data[1].start_offset);
EXPECT_EQ(7 * 1024 * 1024, range_cached_data[1].end_offset);
EXPECT_EQ(60 * 1024 * 1024, merge_reader.buffer_remaining());
EXPECT_EQ((io::MergeRangeFileReader::NUM_BOX - 4) * 1024 * 1024,
merge_reader.buffer_remaining());
// read all cached data
merge_reader.read_at(1 * 1024 * 1024, result, &bytes_read, nullptr);
merge_reader.read_at(2 * 1024 * 1024, result, &bytes_read, nullptr);
merge_reader.read_at(5 * 1024 * 1024, result, &bytes_read, nullptr);
merge_reader.read_at(6 * 1024 * 1024, result, &bytes_read, nullptr);
EXPECT_EQ(64 * 1024 * 1024, merge_reader.buffer_remaining());
EXPECT_EQ(io::MergeRangeFileReader::TOTAL_BUFFER_SIZE, merge_reader.buffer_remaining());
// read all remaining columns
for (int i = 0; i < 3; ++i) {
@ -334,7 +336,7 @@ TEST_F(BufferedReaderTest, test_merged_io) {
}
// check the final state
EXPECT_EQ(64 * 1024 * 1024, merge_reader.buffer_remaining());
EXPECT_EQ(io::MergeRangeFileReader::TOTAL_BUFFER_SIZE, merge_reader.buffer_remaining());
for (auto& cached_data : merge_reader.range_cached_data()) {
EXPECT_TRUE(cached_data.empty());
}