[feature-wip](parquet-reader) add parquet reader profile (#12797)

Add a profile for the parquet reader. New counters (a simplified sketch of the pruning that feeds the group-level counters follows this list):
- ParquetFilteredGroups: The number of row groups filtered out by `RowGroup` min-max statistics
- ParquetReadGroups: The number of row groups to read
- ParquetFilteredRowsByGroup: The number of rows filtered out by `RowGroup` min-max statistics
- ParquetFilteredRowsByPage: The number of rows filtered out by page min-max statistics
- ParquetFilteredBytes: The number of bytes filtered out by `RowGroup` min-max statistics
- ParquetReadBytes: The total bytes of the row groups in `ParquetReadGroups`; the bytes actually read may be smaller if a page is skipped as a whole
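
Below is a minimal, self-contained sketch of min-max based row-group pruning and of how it would feed these counters, assuming a single integer column and an equality predicate; `RowGroupMeta` and `Statistics` are simplified stand-ins here, not the Doris or Thrift types.

```
#include <cstdint>
#include <iostream>
#include <vector>

// Simplified stand-in for parquet row-group metadata (not the Doris/Thrift types).
struct RowGroupMeta {
    int64_t min_value;              // min statistic of the predicate column
    int64_t max_value;              // max statistic of the predicate column
    int64_t num_rows;               // rows in this row group
    int64_t total_compressed_size;  // compressed bytes of the needed columns
};

struct Statistics {
    int32_t filtered_row_groups = 0;
    int32_t read_row_groups = 0;
    int64_t filtered_group_rows = 0;
    int64_t filtered_bytes = 0;
    int64_t read_bytes = 0;
};

// Keep only the row groups whose [min, max] range can satisfy "col == value".
std::vector<int> filter_row_groups(const std::vector<RowGroupMeta>& groups, int64_t value,
                                   Statistics* stats) {
    std::vector<int> to_read;
    for (int i = 0; i < static_cast<int>(groups.size()); ++i) {
        const RowGroupMeta& g = groups[i];
        bool filter_group = value < g.min_value || value > g.max_value;
        if (filter_group) {
            stats->filtered_row_groups++;                      // -> ParquetFilteredGroups
            stats->filtered_group_rows += g.num_rows;          // -> ParquetFilteredRowsByGroup
            stats->filtered_bytes += g.total_compressed_size;  // -> ParquetFilteredBytes
        } else {
            to_read.push_back(i);
            stats->read_row_groups++;                          // -> ParquetReadGroups
            stats->read_bytes += g.total_compressed_size;      // -> ParquetReadBytes
        }
    }
    return to_read;
}

int main() {
    std::vector<RowGroupMeta> groups = {{0, 99, 1000, 4096}, {100, 199, 1000, 4096}};
    Statistics stats;
    filter_row_groups(groups, /*value=*/150, &stats);
    // The first group cannot contain 150, so: filtered=1 read=1
    std::cout << "filtered=" << stats.filtered_row_groups
              << " read=" << stats.read_row_groups << std::endl;
    return 0;
}
```
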
## Result
```
┌──────────────────────────────────────────────────────┐
│[0: VFILE_SCAN_NODE]                                  │
│(Active: 1s29ms, non-child: 96.42)                    │
│  - Counters:                                         │
│      - BytesRead: 0.00                               │
│      - FileReadCalls: 1.826K (1826)                  │
│      - FileReadTime: 510.627ms                       │
│      - FileRemoteReadBytes: 65.23 MB                 │
│      - FileRemoteReadCalls: 1.146K (1146)            │
│      - FileRemoteReadRate: 128.29331970214844 MB/sec │
│      - FileRemoteReadTime: 508.469ms                 │
│      - NumDiskAccess: 0                              │
│      - NumScanners: 1                                │
│      - ParquetFilteredBytes: 0.00                    │
│      - ParquetFilteredGroups: 0                      │
│      - ParquetFilteredRowsByGroup: 0                 │
│      - ParquetFilteredRowsByPage: 6.600003M (6600003)│
│      - ParquetReadBytes: 2.13 GB                     │
│      - ParquetReadGroups: 20                         │
│      - PeakMemoryUsage: 0.00                         │
│      - PredicateFilteredRows: 3.399797M (3399797)    │
│      - PredicateFilteredTime: 133.302ms              │
│      - RowsRead: 3.399997M (3399997)                 │
│      - RowsReturned: 200                             │
│      - RowsReturnedRate: 194                         │
│      - TotalRawReadTime(*): 726.566ms                │
│      - TotalReadThroughput: 0.0 /sec                 │
│      - WaitScannerTime: 1s27ms                       │
└──────────────────────────────────────────────────────┘
```
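
In this sample run the counters are consistent with one another: ParquetFilteredRowsByPage (6,600,003) plus RowsRead (3,399,997) equals 10,000,000, the total row count of the 20 row groups being read, and RowsRead in turn splits into PredicateFilteredRows (3,399,797) plus RowsReturned (200). ParquetFilteredGroups and ParquetFilteredRowsByGroup stay at 0 because no row group could be pruned by its min-max statistics, so ParquetFilteredBytes is 0 as well.
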
Commit 5bfdfac387 (parent f7e3ca29b5), authored by Ashin Gau on 2022-09-23 18:42:14 +08:00 and committed via GitHub. 9 changed files with 165 additions and 63 deletions.


```
@ -186,3 +186,35 @@ doris::Status doris::FileFactory::create_file_reader(doris::ExecEnv* env, Runtim
return Status::OK();
}
doris::Status doris::FileFactory::create_file_reader(RuntimeProfile* profile,
const TFileScanRangeParams& params,
const TFileRangeDesc& range,
std::unique_ptr<FileReader>& file_reader,
int64_t buffer_size) {
doris::TFileType::type type = params.file_type;
FileReader* file_reader_ptr;
switch (type) {
case TFileType::FILE_LOCAL: {
file_reader_ptr = new LocalFileReader(range.path, range.start_offset);
break;
}
case TFileType::FILE_S3: {
file_reader_ptr = new S3Reader(params.properties, range.path, range.start_offset);
break;
}
case TFileType::FILE_HDFS: {
RETURN_IF_ERROR(HdfsReaderWriter::create_reader(params.hdfs_params, range.path,
range.start_offset, &file_reader_ptr));
break;
}
default:
return Status::InternalError("Unsupported File Reader Type: " + std::to_string(type));
}
if (buffer_size > 0) {
file_reader.reset(new BufferedReader(profile, file_reader_ptr, buffer_size));
} else {
file_reader.reset(file_reader_ptr);
}
return Status::OK();
}
```
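
The new overload follows a simple decorator rule: when `buffer_size > 0`, wrap the underlying reader in a `BufferedReader`; otherwise hand back the raw reader. A toy, self-contained illustration of that rule (the classes below are stand-ins, not Doris's `FileReader` hierarchy):

```
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <utility>

// Stand-ins for Doris's FileReader hierarchy, only to show the wrapping rule.
struct FileReader {
    virtual ~FileReader() = default;
    virtual std::string name() const = 0;
};

struct LocalFileReader : FileReader {
    std::string name() const override { return "LocalFileReader"; }
};

// Decorator that would add read-ahead buffering around any FileReader.
struct BufferedReader : FileReader {
    BufferedReader(std::unique_ptr<FileReader> inner, int64_t buffer_size)
            : _inner(std::move(inner)), _buffer_size(buffer_size) {}
    std::string name() const override { return "BufferedReader(" + _inner->name() + ")"; }
    std::unique_ptr<FileReader> _inner;
    int64_t _buffer_size;
};

// Mirrors the factory rule: buffer_size > 0 wraps the reader, otherwise return it directly.
std::unique_ptr<FileReader> create_file_reader(int64_t buffer_size) {
    std::unique_ptr<FileReader> raw = std::make_unique<LocalFileReader>();
    if (buffer_size > 0) {
        return std::make_unique<BufferedReader>(std::move(raw), buffer_size);
    }
    return raw;
}

int main() {
    std::cout << create_file_reader(0)->name() << std::endl;        // LocalFileReader
    std::cout << create_file_reader(16 << 20)->name() << std::endl; // BufferedReader(LocalFileReader)
    return 0;
}
```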


```
@ -58,6 +58,14 @@ public:
const TFileRangeDesc& range,
std::shared_ptr<FileReader>& file_reader);
/**
* Create FileReader. If buffer_size > 0, use BufferedReader to wrap the underlying FileReader;
* Otherwise, return the underlying FileReader directly.
*/
static Status create_file_reader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
std::unique_ptr<FileReader>& file_reader, int64_t buffer_size);
static TFileType::type convert_storage_type(TStorageBackendType::type type) {
switch (type) {
case TStorageBackendType::LOCAL:


```
@ -78,9 +78,9 @@ Status ParquetFileHdfsScanner::_get_next_reader() {
for (int i = 0; i < _file_slot_descs.size(); i++) {
column_names.push_back(_file_slot_descs[i]->col_name());
}
_reader.reset(new ParquetReader(
file_reader.release(), column_names, _state->query_options().batch_size,
range.start_offset, range.size, const_cast<cctz::time_zone*>(&_state->timezone_obj())));
_reader.reset(new ParquetReader(_profile, _params, range, column_names,
_state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj())));
Status status = _reader->init_reader(_conjunct_ctxs);
if (!status.ok()) {
if (status.is_end_of_file()) {
```


```
@ -52,12 +52,9 @@ FileScanner::FileScanner(RuntimeState* state, RuntimeProfile* profile,
}
Status FileScanner::open() {
RETURN_IF_ERROR(_init_expr_ctxes());
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
return Status::OK();
return _init_expr_ctxes();
}
void FileScanner::reg_conjunct_ctxs(const TupleId& tupleId,
@ -127,7 +124,9 @@ void FileScanner::close() {
if (_vpre_filter_ctx_ptr) {
(*_vpre_filter_ctx_ptr)->close(_state);
}
COUNTER_UPDATE(_rows_read_counter, _read_row_counter);
if (_rows_read_counter) {
COUNTER_UPDATE(_rows_read_counter, _read_row_counter);
}
}
Status FileScanner::init_block(vectorized::Block* block) {
```


```
@ -17,19 +17,30 @@
#include "vparquet_reader.h"
#include "io/file_factory.h"
#include "parquet_thrift_util.h"
namespace doris::vectorized {
ParquetReader::ParquetReader(FileReader* file_reader, const std::vector<std::string>& column_names,
size_t batch_size, int64_t range_start_offset, int64_t range_size,
ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<std::string>& column_names, size_t batch_size,
cctz::time_zone* ctz)
: _file_reader(file_reader),
: _profile(profile),
_scan_params(params),
_scan_range(range),
_batch_size(batch_size),
_range_start_offset(range_start_offset),
_range_size(range_size),
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
_column_names(column_names) {
// _statistics = std::make_shared<Statistics>();
if (profile != nullptr) {
_filtered_row_groups = ADD_COUNTER(profile, "ParquetFilteredGroups", TUnit::UNIT);
_to_read_row_groups = ADD_COUNTER(profile, "ParquetReadGroups", TUnit::UNIT);
_filtered_group_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByGroup", TUnit::UNIT);
_filtered_page_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByPage", TUnit::UNIT);
_filtered_bytes = ADD_COUNTER(profile, "ParquetFilteredBytes", TUnit::BYTES);
_to_read_bytes = ADD_COUNTER(profile, "ParquetReadBytes", TUnit::BYTES);
}
}
ParquetReader::~ParquetReader() {
@ -37,16 +48,30 @@ ParquetReader::~ParquetReader() {
}
void ParquetReader::close() {
for (auto& conjuncts : _slot_conjuncts) {
conjuncts.second.clear();
if (!_closed) {
if (_profile != nullptr) {
COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups);
COUNTER_UPDATE(_to_read_row_groups, _statistics.read_row_groups);
COUNTER_UPDATE(_filtered_group_rows, _statistics.filtered_group_rows);
COUNTER_UPDATE(_filtered_page_rows, _statistics.filtered_page_rows);
COUNTER_UPDATE(_filtered_bytes, _statistics.filtered_bytes);
COUNTER_UPDATE(_to_read_bytes, _statistics.read_bytes);
}
_closed = true;
}
_file_reader->close();
delete _file_reader;
}
Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
_file_reader->open();
RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata));
if (_file_reader == nullptr) {
RETURN_IF_ERROR(FileFactory::create_file_reader(
_profile, _scan_params, _scan_range, _file_reader,
config::remote_storage_read_buffer_mb * 1024 * 1024));
}
RETURN_IF_ERROR(_file_reader->open());
if (_file_reader->size() == 0) {
return Status::EndOfFile("Empty Parquet File");
}
RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
@ -54,7 +79,6 @@ Status ParquetReader::init_reader(std::vector<ExprContext*>& conjunct_ctxs) {
}
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < schema_desc.size(); ++i) {
VLOG_DEBUG << schema_desc.debug_string();
// Get the Column Reader for the boolean column
_map_column.emplace(schema_desc.get_column(i)->name, i);
}
@ -124,10 +148,13 @@ Status ParquetReader::_init_row_group_readers(const std::vector<ExprContext*>& c
for (auto row_group_id : _read_row_groups) {
auto& row_group = _t_metadata->row_groups[row_group_id];
std::shared_ptr<RowGroupReader> row_group_reader;
row_group_reader.reset(
new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz));
row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id,
row_group, _ctz));
std::vector<RowRange> candidate_row_ranges;
RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
if (candidate_row_ranges.empty()) {
_statistics.read_rows += row_group.num_rows;
}
RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
_col_offsets));
_row_group_readers.emplace_back(row_group_reader);
@ -182,8 +209,20 @@ Status ParquetReader::_filter_row_groups() {
}
bool filter_group = false;
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
int64_t group_size = 0; // only calculate the needed columns
for (auto& parquet_col_id : _include_column_ids) {
if (row_group.columns[parquet_col_id].__isset.meta_data) {
group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size;
}
}
if (!filter_group) {
_read_row_groups.emplace_back(row_group_idx);
_statistics.read_row_groups++;
_statistics.read_bytes += group_size;
} else {
_statistics.filtered_row_groups++;
_statistics.filtered_bytes += group_size;
_statistics.filtered_group_rows += row_group.num_rows;
}
}
return Status::OK();
@ -260,6 +299,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
std::tie(rhs.first_row, rhs.last_row);
});
int skip_end = 0;
int64_t read_rows = 0;
for (auto& skip_range : skipped_row_ranges) {
if (skip_end >= skip_range.first_row) {
if (skip_end < skip_range.last_row) {
@ -267,13 +307,18 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
}
} else {
// read row with candidate ranges rather than skipped ranges
candidate_row_ranges.push_back({skip_end, skip_range.first_row});
candidate_row_ranges.emplace_back(skip_end, skip_range.first_row);
read_rows += skip_range.first_row - skip_end;
skip_end = skip_range.last_row;
}
}
DCHECK_LE(skip_end, row_group.num_rows);
if (skip_end != row_group.num_rows) {
candidate_row_ranges.push_back({skip_end, row_group.num_rows});
candidate_row_ranges.emplace_back(skip_end, row_group.num_rows);
read_rows += row_group.num_rows - skip_end;
}
_statistics.read_rows += read_rows;
_statistics.filtered_page_rows += row_group.num_rows - read_rows;
return Status::OK();
}
```
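
The page-index bookkeeping in `_process_page_index` is essentially a range-complement computation: given the sorted row ranges whose pages were skipped, emit the remaining candidate ranges and count the rows kept. A stand-alone sketch of that logic, using a plain pair instead of the reader's `RowRange` type:

```
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Half-open [first_row, last_row) ranges, mirroring the reader's RowRange in spirit only.
using RowRange = std::pair<int64_t, int64_t>;

// Complement the skipped ranges within [0, num_rows) and return the number of rows kept.
int64_t complement_skipped_ranges(std::vector<RowRange> skipped, int64_t num_rows,
                                  std::vector<RowRange>* candidates) {
    std::sort(skipped.begin(), skipped.end());
    int64_t read_rows = 0;
    int64_t skip_end = 0;
    for (const RowRange& r : skipped) {
        if (skip_end >= r.first) {
            // Overlapping or adjacent skip: just extend the skipped prefix.
            skip_end = std::max(skip_end, r.second);
        } else {
            // The gap between two skips becomes a candidate range to read.
            candidates->emplace_back(skip_end, r.first);
            read_rows += r.first - skip_end;
            skip_end = r.second;
        }
    }
    if (skip_end != num_rows) {
        candidates->emplace_back(skip_end, num_rows);
        read_rows += num_rows - skip_end;
    }
    // num_rows - read_rows is what would feed ParquetFilteredRowsByPage.
    return read_rows;
}

int main() {
    std::vector<RowRange> candidates;
    int64_t read = complement_skipped_ranges({{10, 20}, {15, 40}, {60, 80}}, 100, &candidates);
    // Candidates: [0,10) [40,60) [80,100); read = 10 + 20 + 20 = 50.
    std::cout << "read_rows=" << read << " ranges=" << candidates.size() << std::endl;
    return 0;
}
```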


```
@ -35,14 +35,16 @@
namespace doris::vectorized {
// struct Statistics {
// int32_t filtered_row_groups = 0;
// int32_t total_groups = 0;
// int64_t filtered_rows = 0;
// int64_t total_rows = 0;
// int64_t filtered_total_bytes = 0;
// int64_t total_bytes = 0;
// };
struct ParquetStatistics {
int32_t filtered_row_groups = 0;
int32_t read_row_groups = 0;
int64_t filtered_group_rows = 0;
int64_t filtered_page_rows = 0;
int64_t read_rows = 0;
int64_t filtered_bytes = 0;
int64_t read_bytes = 0;
};
class RowGroupReader;
class PageIndex;
@ -68,23 +70,26 @@ private:
class ParquetReader : public GenericReader {
public:
ParquetReader(FileReader* file_reader, const std::vector<std::string>& column_names,
size_t batch_size, int64_t range_start_offset, int64_t range_size,
cctz::time_zone* ctz);
ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, const std::vector<std::string>& column_names,
size_t batch_size, cctz::time_zone* ctz);
virtual ~ParquetReader();
// for test
void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
Status init_reader(std::vector<ExprContext*>& conjunct_ctxs);
Status get_next_block(Block* block, bool* eof) override;
// std::shared_ptr<Statistics>& statistics() { return _statistics; }
void close();
int64_t size() const { return _file_reader->size(); }
std::unordered_map<std::string, TypeDescriptor> get_name_to_type() override;
ParquetStatistics& statistics() { return _statistics; }
private:
bool _next_row_group_reader();
Status _init_read_columns();
@ -114,13 +119,16 @@ private:
bool& need_filter);
private:
FileReader* _file_reader;
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
std::unique_ptr<FileReader> _file_reader = nullptr;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
std::list<std::shared_ptr<RowGroupReader>> _row_group_readers;
std::shared_ptr<RowGroupReader> _current_group_reader;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
// std::shared_ptr<Statistics> _statistics;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
std::map<std::string, int> _map_column; // column-name <---> column-index
std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
std::vector<int> _include_column_ids; // columns that need to get from file
@ -134,5 +142,16 @@ private:
std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
const std::vector<std::string> _column_names;
ParquetStatistics _statistics;
bool _closed = false;
// parquet profile
RuntimeProfile::Counter* _filtered_row_groups;
RuntimeProfile::Counter* _to_read_row_groups;
RuntimeProfile::Counter* _filtered_group_rows;
RuntimeProfile::Counter* _filtered_page_rows;
RuntimeProfile::Counter* _filtered_bytes;
RuntimeProfile::Counter* _to_read_bytes;
};
} // namespace doris::vectorized
```
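
The counter members added at the bottom of the header follow the lifecycle shown in the .cpp hunk above: register the counters once if a profile is supplied, accumulate into the plain `ParquetStatistics` fields while scanning, and flush them exactly once in `close()`. A minimal stand-alone imitation of that pattern, with a `FakeProfile` standing in for `RuntimeProfile`:

```
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Stand-in for RuntimeProfile: counters are just named int64 slots.
struct FakeProfile {
    std::map<std::string, int64_t> counters;
    int64_t* add_counter(const std::string& name) { return &counters[name]; }
};

struct Stats {
    int64_t filtered_row_groups = 0;
    int64_t read_row_groups = 0;
};

class Reader {
public:
    explicit Reader(FakeProfile* profile) : _profile(profile) {
        if (_profile != nullptr) {  // the profile may be null, e.g. in unit tests
            _filtered_row_groups = _profile->add_counter("ParquetFilteredGroups");
            _to_read_row_groups = _profile->add_counter("ParquetReadGroups");
        }
    }
    void scan() {  // pretend two groups were pruned and three were read
        _stats.filtered_row_groups += 2;
        _stats.read_row_groups += 3;
    }
    void close() {
        if (_closed) return;  // flush only once, like ParquetReader::close()
        if (_profile != nullptr) {
            *_filtered_row_groups += _stats.filtered_row_groups;
            *_to_read_row_groups += _stats.read_row_groups;
        }
        _closed = true;
    }

private:
    FakeProfile* _profile;
    Stats _stats;
    bool _closed = false;
    int64_t* _filtered_row_groups = nullptr;
    int64_t* _to_read_row_groups = nullptr;
};

int main() {
    FakeProfile profile;
    Reader reader(&profile);
    reader.scan();
    reader.close();
    reader.close();  // the second close is a no-op
    std::cout << profile.counters["ParquetFilteredGroups"] << " "
              << profile.counters["ParquetReadGroups"] << std::endl;  // prints: 2 3
    return 0;
}
```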


```
@ -330,36 +330,32 @@ Status VFileScanner::_get_next_reader() {
return Status::OK();
}
const TFileRangeDesc& range = _ranges[_next_range++];
std::unique_ptr<FileReader> file_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params,
range, file_reader));
RETURN_IF_ERROR(file_reader->open());
if (file_reader->size() == 0) {
file_reader->close();
continue;
}
std::vector<std::string> column_names;
switch (_params.format_type) {
case TFileFormatType::FORMAT_PARQUET:
case TFileFormatType::FORMAT_PARQUET: {
for (int i = 0; i < _file_slot_descs.size(); i++) {
column_names.push_back(_file_slot_descs[i]->col_name());
}
_cur_reader = new ParquetReader(file_reader.release(), column_names,
_state->query_options().batch_size, range.start_offset,
range.size,
_cur_reader = new ParquetReader(_profile, _params, range, column_names,
_state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
RETURN_IF_ERROR(((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs));
break;
Status status = ((ParquetReader*)_cur_reader)->init_reader(_conjunct_ctxs);
if (status.ok()) {
_cur_reader_eof = false;
return status;
} else if (status.is_end_of_file()) {
continue;
} else {
return status;
}
}
default:
std::stringstream error_msg;
error_msg << "Not supported file format " << _params.format_type;
return Status::InternalError(error_msg.str());
}
_cur_reader_eof = false;
return Status::OK();
}
return Status::OK();
}
Status VFileScanner::_init_expr_ctxes() {
```


```
@ -52,8 +52,6 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) override;
void _init_profiles(RuntimeProfile* profile);
Status _fill_columns_from_path();
Status _get_next_reader();
@ -87,8 +85,6 @@ protected:
// Profile
RuntimeProfile* _profile;
RuntimeProfile::Counter* _rows_read_counter;
RuntimeProfile::Counter* _read_timer;
ScannerCounter _counter;
bool _scanner_eof = false;
```


```
@ -100,7 +100,14 @@ TEST_F(ParquetReaderTest, normal) {
for (int i = 0; i < slot_descs.size(); i++) {
column_names.push_back(slot_descs[i]->col_name());
}
auto p_reader = new ParquetReader(reader, column_names, 1024, 0, 1000, &ctz);
TFileScanRangeParams scan_params;
TFileRangeDesc scan_range;
{
scan_range.start_offset = 0;
scan_range.size = 1000;
}
auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz);
p_reader->set_file_reader(reader);
RuntimeState runtime_state((TQueryGlobals()));
runtime_state.set_desc_tbl(desc_tbl);
runtime_state.init_instance_mem_tracker();
```