diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp index 30d1c93a2d..fefe3aef50 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.cpp +++ b/be/src/vec/exec/format/orc/vorc_reader.cpp @@ -955,18 +955,22 @@ Status OrcReader::set_fill_columns( } } - if (!_slot_id_to_filter_conjuncts) { - return Status::OK(); + if (!_not_single_slot_filter_conjuncts.empty()) { + _filter_conjuncts.insert(_filter_conjuncts.end(), _not_single_slot_filter_conjuncts.begin(), + _not_single_slot_filter_conjuncts.end()); + _disable_dict_filter = true; } - // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) - // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. - for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { - auto& [value, slot_desc] = kv.second; - auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); - if (iter != _slot_id_to_filter_conjuncts->end()) { - for (auto& ctx : iter->second) { - _filter_conjuncts.push_back(ctx); + if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) { + // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) + // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. + for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { + auto& [value, slot_desc] = kv.second; + auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); + if (iter != _slot_id_to_filter_conjuncts->end()) { + for (const auto& ctx : iter->second) { + _filter_conjuncts.push_back(ctx); + } } } } @@ -1715,16 +1719,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, *_filter)); } - if (!_not_single_slot_filter_conjuncts.empty()) { - RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, - column_to_keep))); - } else { - Block::erase_useless_column(block, column_to_keep); - RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); - } + Block::erase_useless_column(block, column_to_keep); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); *read_rows = block->rows(); } else { uint64_t rr; @@ -1841,17 +1837,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, result_filter)); } - //_not_single_slot_filter_conjuncts check : missing column1 == missing column2 , missing column == exists column ... - if (!_not_single_slot_filter_conjuncts.empty()) { - RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _not_single_slot_filter_conjuncts, nullptr, block, - columns_to_filter, column_to_keep))); - } else { - Block::erase_useless_column(block, column_to_keep); - RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); - } + Block::erase_useless_column(block, column_to_keep); + RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec)); } else { if (_delete_rows_filter_ptr) { _execute_filter_position_delete_rowids(*_delete_rows_filter_ptr); @@ -2032,8 +2019,8 @@ Status OrcReader::fill_dict_filter_column_names( int i = 0; for (auto& predicate_col_name : predicate_col_names) { int slot_id = predicate_col_slot_ids[i]; - if (_can_filter_by_dict(slot_id)) { - _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); + if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) { + _dict_filter_cols.emplace_back(predicate_col_name, slot_id); column_names.emplace_back(_col_name_to_file_col_name[predicate_col_name]); } else { if (_slot_id_to_filter_conjuncts->find(slot_id) != diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h index 2065f233ea..6fb4c886ee 100644 --- a/be/src/vec/exec/format/orc/vorc_reader.h +++ b/be/src/vec/exec/format/orc/vorc_reader.h @@ -612,6 +612,7 @@ private: VExprContextSPtrs _dict_filter_conjuncts; VExprContextSPtrs _non_dict_filter_conjuncts; VExprContextSPtrs _filter_conjuncts; + bool _disable_dict_filter = false; // std::pair std::vector> _dict_filter_cols; std::shared_ptr _obj_pool; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 9ca8b457c8..b3eae11109 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -109,11 +109,6 @@ Status RowGroupReader::init( _tuple_descriptor = tuple_descriptor; _row_descriptor = row_descriptor; _col_name_to_slot_id = colname_to_slot_id; - if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { - _not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(), - not_single_slot_filter_conjuncts->begin(), - not_single_slot_filter_conjuncts->end()); - } _slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts; _merge_read_ranges(row_ranges); if (_read_columns.empty()) { @@ -140,45 +135,52 @@ Status RowGroupReader::init( } _column_readers[read_col] = std::move(reader); } - // Check if single slot can be filtered by dict. - if (!_slot_id_to_filter_conjuncts) { - return Status::OK(); + + bool disable_dict_filter = false; + if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) { + disable_dict_filter = true; + _filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(), + not_single_slot_filter_conjuncts->end()); } - const std::vector& predicate_col_names = _lazy_read_ctx.predicate_columns.first; - const std::vector& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second; - for (size_t i = 0; i < predicate_col_names.size(); ++i) { - const string& predicate_col_name = predicate_col_names[i]; - int slot_id = predicate_col_slot_ids[i]; - auto field = const_cast(schema.get_column(predicate_col_name)); - if (!_lazy_read_ctx.has_complex_type && - _can_filter_by_dict(slot_id, - _row_group_meta.columns[field->physical_column_index].meta_data)) { - _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); - } else { - if (_slot_id_to_filter_conjuncts->find(slot_id) != - _slot_id_to_filter_conjuncts->end()) { - for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { + + // Check if single slot can be filtered by dict. + if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) { + const std::vector& predicate_col_names = _lazy_read_ctx.predicate_columns.first; + const std::vector& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second; + for (size_t i = 0; i < predicate_col_names.size(); ++i) { + const string& predicate_col_name = predicate_col_names[i]; + int slot_id = predicate_col_slot_ids[i]; + auto field = const_cast(schema.get_column(predicate_col_name)); + if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type && + _can_filter_by_dict( + slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) { + _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); + } else { + if (_slot_id_to_filter_conjuncts->find(slot_id) != + _slot_id_to_filter_conjuncts->end()) { + for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { + _filter_conjuncts.push_back(ctx); + } + } + } + } + // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) + // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. + for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { + auto& [value, slot_desc] = kv.second; + auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); + if (iter != _slot_id_to_filter_conjuncts->end()) { + for (auto& ctx : iter->second) { _filter_conjuncts.push_back(ctx); } } } + //For check missing column : missing column == xx, missing column is null,missing column is not null. + _filter_conjuncts.insert(_filter_conjuncts.end(), + _lazy_read_ctx.missing_columns_conjuncts.begin(), + _lazy_read_ctx.missing_columns_conjuncts.end()); + RETURN_IF_ERROR(_rewrite_dict_predicates()); } - // Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts) - // to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts. - for (auto& kv : _lazy_read_ctx.predicate_partition_columns) { - auto& [value, slot_desc] = kv.second; - auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id()); - if (iter != _slot_id_to_filter_conjuncts->end()) { - for (auto& ctx : iter->second) { - _filter_conjuncts.push_back(ctx); - } - } - } - //For check missing column : missing column == xx, missing column is null,missing column is not null. - _filter_conjuncts.insert(_filter_conjuncts.end(), - _lazy_read_ctx.missing_columns_conjuncts.begin(), - _lazy_read_ctx.missing_columns_conjuncts.end()); - RETURN_IF_ERROR(_rewrite_dict_predicates()); return Status::OK(); } @@ -351,17 +353,8 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ RETURN_IF_CATCH_EXCEPTION( Block::filter_block_internal(block, columns_to_filter, result_filter)); - if (!_not_single_slot_filter_conjuncts.empty()) { - _convert_dict_cols_to_string_cols(block); - SCOPED_RAW_TIMER(&_predicate_filter_time); - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _not_single_slot_filter_conjuncts, nullptr, block, - columns_to_filter, column_to_keep))); - } else { - Block::erase_useless_column(block, column_to_keep); - _convert_dict_cols_to_string_cols(block); - } + Block::erase_useless_column(block, column_to_keep); + _convert_dict_cols_to_string_cols(block); } else { RETURN_IF_CATCH_EXCEPTION( RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter))); @@ -591,15 +584,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re *batch_eof = pre_eof; RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns)); - if (!_not_single_slot_filter_conjuncts.empty()) { - { - SCOPED_RAW_TIMER(&_predicate_filter_time); - RETURN_IF_CATCH_EXCEPTION( - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter, - origin_column_num))); - } - } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index a889c1774e..2e57c5721e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -220,7 +220,6 @@ private: const TupleDescriptor* _tuple_descriptor = nullptr; const RowDescriptor* _row_descriptor = nullptr; const std::unordered_map* _col_name_to_slot_id = nullptr; - VExprContextSPtrs _not_single_slot_filter_conjuncts; const std::unordered_map* _slot_id_to_filter_conjuncts = nullptr; VExprContextSPtrs _dict_filter_conjuncts; VExprContextSPtrs _filter_conjuncts;