From 5bfdfac38742888287b57cdfddf149e7af85f162 Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Fri, 23 Sep 2022 18:42:14 +0800 Subject: [PATCH] [feature-wip](parquet-reader) add parquet reader profile (#12797) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add profile for parquet reader. New counters: - ParquetFilteredGroups: Filtered row groups by `RowGroup` min-max statistics - ParquetReadGroups: The number of row groups to read - ParquetFilteredRowsByGroup: The number of filtered rows by `RowGroup` min-max statistics - ParquetFilteredRowsByPage: The number of filtered rows by page min-max statistics - ParquetFilteredBytes: The filtered bytes by `RowGroup` min-max statistics - ParquetReadBytes: The total bytes in `ParquetReadGroups`, may be further filtered If a page is skipped as a whole ## Result ``` ┌──────────────────────────────────────────────────────┐ │[0: VFILE_SCAN_NODE] │ │(Active: 1s29ms, non-child: 96.42) │ │ - Counters: │ │ - BytesRead: 0.00 │ │ - FileReadCalls: 1.826K (1826) │ │ - FileReadTime: 510.627ms │ │ - FileRemoteReadBytes: 65.23 MB │ │ - FileRemoteReadCalls: 1.146K (1146) │ │ - FileRemoteReadRate: 128.29331970214844 MB/sec │ │ - FileRemoteReadTime: 508.469ms │ │ - NumDiskAccess: 0 │ │ - NumScanners: 1 │ │ - ParquetFilteredBytes: 0.00 │ │ - ParquetFilteredGroups: 0 │ │ - ParquetFilteredRowsByGroup: 0 │ │ - ParquetFilteredRowsByPage: 6.600003M (6600003)│ │ - ParquetReadBytes: 2.13 GB │ │ - ParquetReadGroups: 20 │ │ - PeakMemoryUsage: 0.00 │ │ - PredicateFilteredRows: 3.399797M (3399797) │ │ - PredicateFilteredTime: 133.302ms │ │ - RowsRead: 3.399997M (3399997) │ │ - RowsReturned: 200 │ │ - RowsReturnedRate: 194 │ │ - TotalRawReadTime(*): 726.566ms │ │ - TotalReadThroughput: 0.0 /sec │ │ - WaitScannerTime: 1s27ms │ └──────────────────────────────────────────────────────┘ ``` --- be/src/io/file_factory.cpp | 32 ++++++++ be/src/io/file_factory.h | 8 ++ be/src/vec/exec/file_hdfs_scanner.cpp | 6 +- be/src/vec/exec/file_scanner.cpp | 9 +-- .../exec/format/parquet/vparquet_reader.cpp | 79 +++++++++++++++---- .../vec/exec/format/parquet/vparquet_reader.h | 49 ++++++++---- be/src/vec/exec/scan/vfile_scanner.cpp | 32 ++++---- be/src/vec/exec/scan/vfile_scanner.h | 4 - .../vec/exec/parquet/parquet_reader_test.cpp | 9 ++- 9 files changed, 165 insertions(+), 63 deletions(-) diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index 7a367eadfe..36a8bdbdf4 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -186,3 +186,35 @@ doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, Runtim return Status::OK(); } + +doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile, + const TFileScanRangeParams& params, + const TFileRangeDesc& range, + std::unique_ptr& file_reader, + int64_t buffer_size) { + doris::TFileType::type type = params.file_type; + FileReader* file_reader_ptr; + switch (type) { + case TFileType::FILE_LOCAL: { + file_reader_ptr = new LocalFileReader(range.path, range.start_offset); + break; + } + case TFileType::FILE_S3: { + file_reader_ptr = new S3Reader(params.properties, range.path, range.start_offset); + break; + } + case TFileType::FILE_HDFS: { + RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, range.path, + range.start_offset, &file_reader_ptr)); + break; + } + default: + return Status::InternalError("Unsupported File Reader Type: " + std::to_string(type)); + } + if (buffer_size > 0) { + file_reader.reset(new BufferedReader(profile, file_reader_ptr, buffer_size)); + } else { + file_reader.reset(file_reader_ptr); + } + return Status::OK(); +} diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h index c36b611f9e..4fe9a2600a 100644 --- a/be/src/io/file_factory.h +++ b/be/src/io/file_factory.h @@ -58,6 +58,14 @@ public: const TFileRangeDesc& range, std::shared_ptr& file_reader); + /** + * Create FileReader. If buffer_size > 0, use BufferedReader to wrap the underlying FileReader; + * Otherwise, return the underlying FileReader directly. + */ + static Status create_file_reader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + std::unique_ptr& file_reader, int64_t buffer_size); + static TFileType::type convert_storage_type(TStorageBackendType::type type) { switch (type) { case TStorageBackendType::LOCAL: diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp index 593a5c92d4..ec891730c8 100644 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ b/be/src/vec/exec/file_hdfs_scanner.cpp @@ -78,9 +78,9 @@ Status ParquetFileHdfsScanner::_get_next_reader() { for (int i = 0; i < _file_slot_descs.size(); i++) { column_names.push_back(_file_slot_descs[i]->col_name()); } - _reader.reset(new ParquetReader( - file_reader.release(), column_names, _state->query_options().batch_size, - range.start_offset, range.size, const_cast(&_state->timezone_obj()))); + _reader.reset(new ParquetReader(_profile, _params, range, column_names, + _state->query_options().batch_size, + const_cast(&_state->timezone_obj()))); Status status = _reader->init_reader(_conjunct_ctxs); if (!status.ok()) { if (status.is_end_of_file()) { diff --git a/be/src/vec/exec/file_scanner.cpp b/be/src/vec/exec/file_scanner.cpp index bb1ba21924..04eb03db83 100644 --- a/be/src/vec/exec/file_scanner.cpp +++ b/be/src/vec/exec/file_scanner.cpp @@ -52,12 +52,9 @@ FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile, } Status FileScanner::open() { - RETURN_IF_ERROR(_init_expr_ctxes()); - _rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT); _read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)"); - - return Status::OK(); + return _init_expr_ctxes(); } void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId, @@ -127,7 +124,9 @@ void FileScanner::close() { if (_vpre_filter_ctx_ptr) { (*_vpre_filter_ctx_ptr)->close(_state); } - COUNTER_UPDATE(_rows_read_counter, _read_row_counter); + if (_rows_read_counter) { + COUNTER_UPDATE(_rows_read_counter, _read_row_counter); + } } Status FileScanner::init_block(vectorized::Block* block) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index e8c59de8d5..b7372a7495 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -17,19 +17,30 @@ #include "vparquet_reader.h" +#include "io/file_factory.h" #include "parquet_thrift_util.h" namespace doris::vectorized { -ParquetReader::ParquetReader(FileReader* file_reader, const std::vector& column_names, - size_t batch_size, int64_t range_start_offset, int64_t range_size, +ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, + const std::vector& column_names, size_t batch_size, cctz::time_zone* ctz) - : _file_reader(file_reader), + : _profile(profile), + _scan_params(params), + _scan_range(range), _batch_size(batch_size), - _range_start_offset(range_start_offset), - _range_size(range_size), + _range_start_offset(range.start_offset), + _range_size(range.size), _ctz(ctz), _column_names(column_names) { - // _statistics = std::make_shared(); + if (profile != nullptr) { + _filtered_row_groups = ADD_COUNTER(profile, "ParquetFilteredGroups", TUnit::UNIT); + _to_read_row_groups = ADD_COUNTER(profile, "ParquetReadGroups", TUnit::UNIT); + _filtered_group_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByGroup", TUnit::UNIT); + _filtered_page_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByPage", TUnit::UNIT); + _filtered_bytes = ADD_COUNTER(profile, "ParquetFilteredBytes", TUnit::BYTES); + _to_read_bytes = ADD_COUNTER(profile, "ParquetReadBytes", TUnit::BYTES); + } } ParquetReader::~ParquetReader() { @@ -37,16 +48,30 @@ ParquetReader::~ParquetReader() { } void ParquetReader::close() { - for (auto& conjuncts : _slot_conjuncts) { - conjuncts.second.clear(); + if (!_closed) { + if (_profile != nullptr) { + COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups); + COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups); + COUNTER_UPDATE(_filtered_group_rows, _statistics.filtered_group_rows); + COUNTER_UPDATE(_filtered_page_rows, _statistics.filtered_page_rows); + COUNTER_UPDATE(_filtered_bytes, _statistics.filtered_bytes); + COUNTER_UPDATE(_to_read_bytes, _statistics.read_bytes); + } + _closed = true; } - _file_reader->close(); - delete _file_reader; } Status ParquetReader::init_reader(std::vector& conjunct_ctxs) { - _file_reader->open(); - RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata)); + if (_file_reader == nullptr) { + RETURN_IF_ERROR(FileFactory::create_file_reader( + _profile, _scan_params, _scan_range, _file_reader, + config::remote_storage_read_buffer_mb * 1024 * 1024)); + } + RETURN_IF_ERROR(_file_reader->open()); + if (_file_reader->size() == 0) { + return Status::EndOfFile("Empty Parquet File"); + } + RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { @@ -54,7 +79,6 @@ Status ParquetReader::init_reader(std::vector& conjunct_ctxs) { } auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) { - VLOG_DEBUG << schema_desc.debug_string(); // Get the Column Reader for the boolean column _map_column.emplace(schema_desc.get_column(i)->name, i); } @@ -124,10 +148,13 @@ Status ParquetReader::_init_row_group_readers(const std::vector& c for (auto row_group_id : _read_row_groups) { auto& row_group = _t_metadata->row_groups[row_group_id]; std::shared_ptr row_group_reader; - row_group_reader.reset( - new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz)); + row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id, + row_group, _ctz)); std::vector candidate_row_ranges; RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges)); + if (candidate_row_ranges.empty()) { + _statistics.read_rows += row_group.num_rows; + } RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets)); _row_group_readers.emplace_back(row_group_reader); @@ -182,8 +209,20 @@ Status ParquetReader::_filter_row_groups() { } bool filter_group = false; RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group)); + int64_t group_size = 0; // only calculate the needed columns + for (auto& parquet_col_id : _include_column_ids) { + if (row_group.columns[parquet_col_id].__isset.meta_data) { + group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size; + } + } if (!filter_group) { _read_row_groups.emplace_back(row_group_idx); + _statistics.read_row_groups++; + _statistics.read_bytes += group_size; + } else { + _statistics.filtered_row_groups++; + _statistics.filtered_bytes += group_size; + _statistics.filtered_group_rows += row_group.num_rows; } } return Status::OK(); @@ -260,6 +299,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, std::tie(rhs.first_row, rhs.last_row); }); int skip_end = 0; + int64_t read_rows = 0; for (auto& skip_range : skipped_row_ranges) { if (skip_end >= skip_range.first_row) { if (skip_end < skip_range.last_row) { @@ -267,13 +307,18 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, } } else { // read row with candidate ranges rather than skipped ranges - candidate_row_ranges.push_back({skip_end, skip_range.first_row}); + candidate_row_ranges.emplace_back(skip_end, skip_range.first_row); + read_rows += skip_range.first_row - skip_end; skip_end = skip_range.last_row; } } + DCHECK_LE(skip_end, row_group.num_rows); if (skip_end != row_group.num_rows) { - candidate_row_ranges.push_back({skip_end, row_group.num_rows}); + candidate_row_ranges.emplace_back(skip_end, row_group.num_rows); + read_rows += row_group.num_rows - skip_end; } + _statistics.read_rows += read_rows; + _statistics.filtered_page_rows += row_group.num_rows - read_rows; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 6994b8ebaa..098aa154e8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -35,14 +35,16 @@ namespace doris::vectorized { -// struct Statistics { -// int32_t filtered_row_groups = 0; -// int32_t total_groups = 0; -// int64_t filtered_rows = 0; -// int64_t total_rows = 0; -// int64_t filtered_total_bytes = 0; -// int64_t total_bytes = 0; -// }; +struct ParquetStatistics { + int32_t filtered_row_groups = 0; + int32_t read_row_groups = 0; + int64_t filtered_group_rows = 0; + int64_t filtered_page_rows = 0; + int64_t read_rows = 0; + int64_t filtered_bytes = 0; + int64_t read_bytes = 0; +}; + class RowGroupReader; class PageIndex; @@ -68,23 +70,26 @@ private: class ParquetReader : public GenericReader { public: - ParquetReader(FileReader* file_reader, const std::vector& column_names, - size_t batch_size, int64_t range_start_offset, int64_t range_size, - cctz::time_zone* ctz); + ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, const std::vector& column_names, + size_t batch_size, cctz::time_zone* ctz); virtual ~ParquetReader(); + // for test + void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); } Status init_reader(std::vector& conjunct_ctxs); Status get_next_block(Block* block, bool* eof) override; - // std::shared_ptr& statistics() { return _statistics; } void close(); int64_t size() const { return _file_reader->size(); } std::unordered_map get_name_to_type() override; + ParquetStatistics& statistics() { return _statistics; } + private: bool _next_row_group_reader(); Status _init_read_columns(); @@ -114,13 +119,16 @@ private: bool& need_filter); private: - FileReader* _file_reader; + RuntimeProfile* _profile; + const TFileScanRangeParams& _scan_params; + const TFileRangeDesc& _scan_range; + std::unique_ptr _file_reader = nullptr; + std::shared_ptr _file_metadata; const tparquet::FileMetaData* _t_metadata; std::list> _row_group_readers; std::shared_ptr _current_group_reader; - int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file - // std::shared_ptr _statistics; + int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file std::map _map_column; // column-name <---> column-index std::unordered_map> _slot_conjuncts; std::vector _include_column_ids; // columns that need to get from file @@ -134,5 +142,16 @@ private: std::unordered_map _col_offsets; const std::vector _column_names; + + ParquetStatistics _statistics; + bool _closed = false; + + // parquet profile + RuntimeProfile::Counter* _filtered_row_groups; + RuntimeProfile::Counter* _to_read_row_groups; + RuntimeProfile::Counter* _filtered_group_rows; + RuntimeProfile::Counter* _filtered_page_rows; + RuntimeProfile::Counter* _filtered_bytes; + RuntimeProfile::Counter* _to_read_bytes; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index de32d46e31..985676eb48 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -330,36 +330,32 @@ Status VFileScanner::_get_next_reader() { return Status::OK(); } const TFileRangeDesc& range = _ranges[_next_range++]; - std::unique_ptr file_reader; - - RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, - range, file_reader)); - RETURN_IF_ERROR(file_reader->open()); - if (file_reader->size() == 0) { - file_reader->close(); - continue; - } std::vector column_names; switch (_params.format_type) { - case TFileFormatType::FORMAT_PARQUET: + case TFileFormatType::FORMAT_PARQUET: { for (int i = 0; i < _file_slot_descs.size(); i++) { column_names.push_back(_file_slot_descs[i]->col_name()); } - _cur_reader = new ParquetReader(file_reader.release(), column_names, - _state->query_options().batch_size, range.start_offset, - range.size, + _cur_reader = new ParquetReader(_profile, _params, range, column_names, + _state->query_options().batch_size, const_cast(&_state->timezone_obj())); - RETURN_IF_ERROR(((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs)); - break; + Status status = ((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs); + if (status.ok()) { + _cur_reader_eof = false; + return status; + } else if (status.is_end_of_file()) { + continue; + } else { + return status; + } + } default: std::stringstream error_msg; error_msg << "Not supported file format " << _params.format_type; return Status::InternalError(error_msg.str()); } - - _cur_reader_eof = false; - return Status::OK(); } + return Status::OK(); } Status VFileScanner::_init_expr_ctxes() { diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index aa6d203265..4bad47795d 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -52,8 +52,6 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override; - void _init_profiles(RuntimeProfile* profile); - Status _fill_columns_from_path(); Status _get_next_reader(); @@ -87,8 +85,6 @@ protected: // Profile RuntimeProfile* _profile; - RuntimeProfile::Counter* _rows_read_counter; - RuntimeProfile::Counter* _read_timer; ScannerCounter _counter; bool _scanner_eof = false; diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index cc9e0ca134..23bf6b353f 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -100,7 +100,14 @@ TEST_F(ParquetReaderTest, normal) { for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); } - auto p_reader = new ParquetReader(reader, column_names, 1024, 0, 1000, &ctz); + TFileScanRangeParams scan_params; + TFileRangeDesc scan_range; + { + scan_range.start_offset = 0; + scan_range.size = 1000; + } + auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz); + p_reader->set_file_reader(reader); RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); runtime_state.init_instance_mem_tracker();