[Enhancement](multi-catalog) Add PredicateFilterTime, DictFilterRewriteTime, LazyReadFilteredRows profile metrics in parquet orc profiles. (#52815)

### What problem does this PR solve?

Problem Summary:

### Release note

Cherry-pick #51248

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
This commit is contained in:
Qi Chen
2025-07-16 09:07:38 +08:00
committed by GitHub
parent 412e5746a7
commit 4b261d43b6
6 changed files with 170 additions and 137 deletions

View File

@ -199,7 +199,9 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);
COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);
if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
@ -233,8 +235,12 @@ void OrcReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
_orc_profile.filter_block_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
_orc_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
_orc_profile.dict_filter_rewrite_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
_orc_profile.lazy_read_filtered_rows =
ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
}
}
@ -1713,15 +1719,18 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
*read_rows = 0;
return Status::OK();
}
_execute_filter_position_delete_rowids(*_filter);
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_execute_filter_position_delete_rowids(*_filter);
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
@ -1798,63 +1807,60 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
return Status::OK();
}
_build_delete_row_filter(block, _batch->numElements);
{
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_build_delete_row_filter(block, _batch->numElements);
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
Block::erase_useless_column(block, column_to_keep);
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter);
{
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
(*_delete_rows_filter_ptr)));
Block::erase_useless_column(block, column_to_keep);
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, columns_to_filter, (*_delete_rows_filter_ptr)));
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}
Block::erase_useless_column(block, column_to_keep);
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
}
return Status::OK();
@ -1898,6 +1904,7 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
}
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
Block* block = (Block*)arg;
size_t origin_column_num = block->columns();
@ -1998,6 +2005,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
sel[new_size] = i;
new_size += result_filter_data[i] ? 1 : 0;
}
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
data.numElements = new_size;
return Status::OK();
}
@ -2071,6 +2079,7 @@ bool OrcReader::_can_filter_by_dict(int slot_id) {
Status OrcReader::on_string_dicts_loaded(
std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
bool* is_stripe_filtered) {
SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
*is_stripe_filtered = false;
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;

View File

@ -128,7 +128,9 @@ public:
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
int64_t filter_block_time = 0;
int64_t predicate_filter_time = 0;
int64_t dict_filter_rewrite_time = 0;
int64_t lazy_read_filtered_rows = 0;
};
OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
@ -227,6 +229,9 @@ private:
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* filter_block_time = nullptr;
RuntimeProfile::Counter* predicate_filter_time = nullptr;
RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
};
class ORCFilterImpl : public orc::ORCFilter {

View File

@ -319,45 +319,46 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
*read_rows = block->rows();
return Status::OK();
}
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
if (!_lazy_read_ctx.conjuncts.empty()) {
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
}
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
if (!_lazy_read_ctx.conjuncts.empty()) {
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
return Status::OK();
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
{
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
}
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
return Status::OK();
}
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
}
_convert_dict_cols_to_string_cols(block);
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
}
*read_rows = block->rows();
return Status::OK();
@ -456,49 +457,56 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
// generate filter vector
if (_lazy_read_ctx.resize_first_column) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
}
result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
bool can_filter_all = false;
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
VExprContextSPtrs filter_contexts;
for (auto& conjunct : _filter_conjuncts) {
filter_contexts.emplace_back(conjunct);
}
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
&result_filter, &can_filter_all));
}
if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
block->get_by_position(0).column->assume_mutable()->clear();
// generate filter vector
if (_lazy_read_ctx.resize_first_column) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
// The following process may be tricky and time-consuming, but we have no other way.
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
}
result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
VExprContextSPtrs filter_contexts;
for (auto& conjunct : _filter_conjuncts) {
filter_contexts.emplace_back(conjunct);
}
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
&result_filter, &can_filter_all));
}
if (_lazy_read_ctx.resize_first_column) {
// We have to clean the first column to insert right data.
block->get_by_position(0).column->assume_mutable()->clear();
}
}
const uint8_t* __restrict filter_map = result_filter.data();
select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
if (select_vector_ptr->filter_all()) {
for (auto& col : _lazy_read_ctx.predicate_columns.first) {
// clean block to read predicate columns
block->get_by_name(col).column->assume_mutable()->clear();
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
for (auto& col : _lazy_read_ctx.predicate_columns.first) {
// clean block to read predicate columns
block->get_by_name(col).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
if (!pre_eof) {
// If continuous batches are skipped, we can cache them to skip a whole page
@ -551,18 +559,21 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof
// filter data in predicate columns, and remove filter column
if (select_vector.has_filter()) {
if (block->columns() == origin_column_num) {
// the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is
// generated from next batch, so the filter column is removed ahead.
DCHECK_EQ(block->rows(), 0);
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
if (select_vector.has_filter()) {
if (block->columns() == origin_column_num) {
// the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is
// generated from next batch, so the filter column is removed ahead.
DCHECK_EQ(block->rows(), 0);
} else {
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, _lazy_read_ctx.all_predicate_col_ids, result_filter));
Block::erase_useless_column(block, origin_column_num);
}
} else {
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, _lazy_read_ctx.all_predicate_col_ids, result_filter));
Block::erase_useless_column(block, origin_column_num);
}
} else {
Block::erase_useless_column(block, origin_column_num);
}
_convert_dict_cols_to_string_cols(block);
@ -778,6 +789,7 @@ Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
}
Status RowGroupReader::_rewrite_dict_predicates() {
SCOPED_RAW_TIMER(&_dict_filter_rewrite_time);
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
int slot_id = it->second;
@ -843,7 +855,6 @@ Status RowGroupReader::_rewrite_dict_predicates() {
IColumn::Filter result_filter(temp_block.rows(), 1);
bool can_filter_all;
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
&result_filter, &can_filter_all));
}

View File

@ -158,6 +158,7 @@ public:
Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
int64_t predicate_filter_time() const { return _predicate_filter_time; }
int64_t dict_filter_rewrite_time() const { return _dict_filter_rewrite_time; }
ParquetColumnReader::Statistics statistics();
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
@ -213,6 +214,7 @@ private:
const LazyReadContext& _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
int64_t _predicate_filter_time = 0;
int64_t _dict_filter_rewrite_time = 0;
// If continuous batches are skipped, we can cache them to skip a whole page
size_t _cached_filtered_rows = 0;
std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr;

View File

@ -183,6 +183,8 @@ void ParquetReader::_init_profile() {
_profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile, 1);
_parquet_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1);
_parquet_profile.dict_filter_rewrite_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", parquet_profile, 1);
}
}
@ -590,6 +592,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
_column_statistics.merge(column_st);
_statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
_statistics.predicate_filter_time += _current_group_reader->predicate_filter_time();
_statistics.dict_filter_rewrite_time += _current_group_reader->dict_filter_rewrite_time();
if (_read_row_groups.size() == 0) {
*eof = true;
} else {
@ -1034,6 +1037,7 @@ void ParquetReader::_collect_profile() {
COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
_column_statistics.parse_page_header_num);
COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_parquet_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls);

View File

@ -92,6 +92,7 @@ public:
int64_t read_page_index_time = 0;
int64_t parse_page_index_time = 0;
int64_t predicate_filter_time = 0;
int64_t dict_filter_rewrite_time = 0;
};
ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
@ -190,6 +191,7 @@ private:
RuntimeProfile::Counter* skip_page_header_num = nullptr;
RuntimeProfile::Counter* parse_page_header_num = nullptr;
RuntimeProfile::Counter* predicate_filter_time = nullptr;
RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
};
Status _open_file();