bp #40005
This commit is contained in:
@ -345,8 +345,12 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
|
||||
}
|
||||
IColumn::Filter result_filter(block->rows(), 1);
|
||||
bool can_filter_all = false;
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
|
||||
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
|
||||
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
|
||||
_filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
|
||||
}
|
||||
|
||||
if (can_filter_all) {
|
||||
for (auto& col : columns_to_filter) {
|
||||
@ -361,6 +365,7 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
|
||||
Block::filter_block_internal(block, columns_to_filter, result_filter));
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
_convert_dict_cols_to_string_cols(block);
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block,
|
||||
@ -450,7 +455,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
|
||||
columns_to_filter[i] = i;
|
||||
}
|
||||
IColumn::Filter result_filter;
|
||||
while (true) {
|
||||
while (!_state->is_cancelled()) {
|
||||
// read predicate columns
|
||||
pre_read_rows = 0;
|
||||
pre_eof = false;
|
||||
@ -485,8 +490,12 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
|
||||
for (auto& conjunct : _filter_conjuncts) {
|
||||
filter_contexts.emplace_back(conjunct);
|
||||
}
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
|
||||
&result_filter, &can_filter_all));
|
||||
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
|
||||
&result_filter, &can_filter_all));
|
||||
}
|
||||
|
||||
if (_lazy_read_ctx.resize_first_column) {
|
||||
// We have to clean the first column to insert right data.
|
||||
@ -523,6 +532,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (_state->is_cancelled()) {
|
||||
return Status::Cancelled("cancelled");
|
||||
}
|
||||
|
||||
if (select_vector_ptr == nullptr) {
|
||||
DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
|
||||
*read_rows = 0;
|
||||
@ -584,9 +597,13 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
|
||||
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
|
||||
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
|
||||
origin_column_num)));
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
|
||||
origin_column_num)));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
@ -844,8 +861,11 @@ Status RowGroupReader::_rewrite_dict_predicates() {
|
||||
// The following process may be tricky and time-consuming, but we have no other way.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
|
||||
}
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
|
||||
ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
|
||||
ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
|
||||
}
|
||||
if (dict_pos != 0) {
|
||||
// We have to clean the first column to insert right data.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->clear();
|
||||
|
||||
@ -157,6 +157,7 @@ public:
|
||||
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
|
||||
Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
|
||||
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
|
||||
int64_t predicate_filter_time() const { return _predicate_filter_time; }
|
||||
|
||||
ParquetColumnReader::Statistics statistics();
|
||||
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
|
||||
@ -211,6 +212,7 @@ private:
|
||||
|
||||
const LazyReadContext& _lazy_read_ctx;
|
||||
int64_t _lazy_read_filtered_rows = 0;
|
||||
int64_t _predicate_filter_time = 0;
|
||||
// If continuous batches are skipped, we can cache them to skip a whole page
|
||||
size_t _cached_filtered_rows = 0;
|
||||
std::unique_ptr<IColumn::Filter> _pos_delete_filter_ptr;
|
||||
|
||||
@ -181,6 +181,8 @@ void ParquetReader::_init_profile() {
|
||||
_profile, "SkipPageHeaderNum", TUnit::UNIT, parquet_profile, 1);
|
||||
_parquet_profile.parse_page_header_num = ADD_CHILD_COUNTER_WITH_LEVEL(
|
||||
_profile, "ParsePageHeaderNum", TUnit::UNIT, parquet_profile, 1);
|
||||
_parquet_profile.predicate_filter_time =
|
||||
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", parquet_profile, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -595,6 +597,7 @@ Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof)
|
||||
auto column_st = _current_group_reader->statistics();
|
||||
_column_statistics.merge(column_st);
|
||||
_statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
|
||||
_statistics.predicate_filter_time += _current_group_reader->predicate_filter_time();
|
||||
if (_read_row_groups.size() == 0) {
|
||||
*eof = true;
|
||||
} else {
|
||||
@ -1042,6 +1045,7 @@ void ParquetReader::_collect_profile() {
|
||||
COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num);
|
||||
COUNTER_UPDATE(_parquet_profile.parse_page_header_num,
|
||||
_column_statistics.parse_page_header_num);
|
||||
COUNTER_UPDATE(_parquet_profile.predicate_filter_time, _statistics.predicate_filter_time);
|
||||
COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time);
|
||||
COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls);
|
||||
COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, _column_statistics.meta_read_calls);
|
||||
|
||||
@ -91,6 +91,7 @@ public:
|
||||
int64_t page_index_filter_time = 0;
|
||||
int64_t read_page_index_time = 0;
|
||||
int64_t parse_page_index_time = 0;
|
||||
int64_t predicate_filter_time = 0;
|
||||
};
|
||||
|
||||
ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
|
||||
@ -188,6 +189,7 @@ private:
|
||||
RuntimeProfile::Counter* decode_null_map_time = nullptr;
|
||||
RuntimeProfile::Counter* skip_page_header_num = nullptr;
|
||||
RuntimeProfile::Counter* parse_page_header_num = nullptr;
|
||||
RuntimeProfile::Counter* predicate_filter_time = nullptr;
|
||||
};
|
||||
|
||||
Status _open_file();
|
||||
|
||||
Reference in New Issue
Block a user