diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 9ec1235be1..b70beec687 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -345,8 +345,12 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ } IColumn::Filter result_filter(block->rows(), 1); bool can_filter_all = false; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( - _filter_conjuncts, &filters, block, &result_filter, &can_filter_all)); + + { + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts( + _filter_conjuncts, &filters, block, &result_filter, &can_filter_all)); + } if (can_filter_all) { for (auto& col : columns_to_filter) { @@ -361,6 +365,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ Block::filter_block_internal(block, columns_to_filter, result_filter)); if (!_not_single_slot_filter_conjuncts.empty()) { _convert_dict_cols_to_string_cols(block); + SCOPED_RAW_TIMER(&_predicate_filter_time); RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( _not_single_slot_filter_conjuncts, nullptr, block, @@ -450,7 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re columns_to_filter[i] = i; } IColumn::Filter result_filter; - while (true) { + while (!_state->is_cancelled()) { // read predicate columns pre_read_rows = 0; pre_eof = false; @@ -485,8 +490,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re for (auto& conjunct : _filter_conjuncts) { filter_contexts.emplace_back(conjunct); } - RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block, - &result_filter, &can_filter_all)); + + { + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block, + &result_filter, &can_filter_all)); + } if (_lazy_read_ctx.resize_first_column) { // We have to clean the first column to insert right data. @@ -523,6 +532,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re break; } } + if (_state->is_cancelled()) { + return Status::Cancelled("cancelled"); + } + if (select_vector_ptr == nullptr) { DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0); *read_rows = 0; @@ -584,9 +597,13 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns)); if (!_not_single_slot_filter_conjuncts.empty()) { - RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, - origin_column_num))); + { + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_CATCH_EXCEPTION( + RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( + _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, + origin_column_num))); + } } return Status::OK(); } @@ -844,8 +861,11 @@ Status RowGroupReader::_rewrite_dict_predicates() { // The following process may be tricky and time-consuming, but we have no other way. temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size); } - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block( - ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)); + { + SCOPED_RAW_TIMER(&_predicate_filter_time); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block( + ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)); + } if (dict_pos != 0) { // We have to clean the first column to insert right data. temp_block.get_by_position(0).column->assume_mutable()->clear(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index d9f7f2dbf3..a889c1774e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -157,6 +157,7 @@ public: const std::unordered_map* slot_id_to_filter_conjuncts); Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; } + int64_t predicate_filter_time() const { return _predicate_filter_time; } ParquetColumnReader::Statistics statistics(); void set_remaining_rows(int64_t rows) { _remaining_rows = rows; } @@ -211,6 +212,7 @@ private: const LazyReadContext& _lazy_read_ctx; int64_t _lazy_read_filtered_rows = 0; + int64_t _predicate_filter_time = 0; // If continuous batches are skipped, we can cache them to skip a whole page size_t _cached_filtered_rows = 0; std::unique_ptr _pos_delete_filter_ptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 84c572a3a2..32391d1f0d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -181,6 +181,8 @@ void ParquetReader::_init_profile() { _profile, "SkipPageHeaderNum", TUnit::UNIT, parquet_profile, 1); _parquet_profile.parse_page_header_num = ADD_CHILD_COUNTER_WITH_LEVEL( _profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile, 1); + _parquet_profile.predicate_filter_time = + ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1); } } @@ -595,6 +597,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) auto column_st = _current_group_reader->statistics(); _column_statistics.merge(column_st); _statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows(); + _statistics.predicate_filter_time += _current_group_reader->predicate_filter_time(); if (_read_row_groups.size() == 0) { *eof = true; } else { @@ -1042,6 +1045,7 @@ void ParquetReader::_collect_profile() { COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num); COUNTER_UPDATE(_parquet_profile.parse_page_header_num, _column_statistics.parse_page_header_num); + COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time); COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time); COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls); COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 9691e596b7..1d70f9ab5d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -91,6 +91,7 @@ public: int64_t page_index_filter_time = 0; int64_t read_page_index_time = 0; int64_t parse_page_index_time = 0; + int64_t predicate_filter_time = 0; }; ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, @@ -188,6 +189,7 @@ private: RuntimeProfile::Counter* decode_null_map_time = nullptr; RuntimeProfile::Counter* skip_page_header_num = nullptr; RuntimeProfile::Counter* parse_page_header_num = nullptr; + RuntimeProfile::Counter* predicate_filter_time = nullptr; }; Status _open_file();