diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 93798c0b6e..1b1140cc5e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -101,13 +101,16 @@ Status RowGroupReader::init( if (!_slot_id_to_filter_conjuncts) { return Status::OK(); } - for (auto& predicate_col_name : _lazy_read_ctx.predicate_columns) { + const std::vector& predicate_col_names = _lazy_read_ctx.predicate_columns.first; + const std::vector& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second; + for (size_t i = 0; i < predicate_col_names.size(); ++i) { + const string& predicate_col_name = predicate_col_names[i]; + int slot_id = predicate_col_slot_ids[i]; auto field = const_cast(schema.get_column(predicate_col_name)); - if (_can_filter_by_dict(predicate_col_name, + if (_can_filter_by_dict(slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) { - _dict_filter_col_names.emplace_back(predicate_col_name); + _dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id)); } else { - int slot_id = _col_name_to_slot_id->at(predicate_col_name); if (_slot_id_to_filter_conjuncts->find(slot_id) != _slot_id_to_filter_conjuncts->end()) { for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) { @@ -120,11 +123,10 @@ Status RowGroupReader::init( return Status::OK(); } -bool RowGroupReader::_can_filter_by_dict(const string& predicate_col_name, +bool RowGroupReader::_can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& column_metadata) { SlotDescriptor* slot = nullptr; const std::vector& slots = _tuple_descriptor->slots(); - int slot_id = _col_name_to_slot_id->at(predicate_col_name); for (auto each : slots) { if (each->id() == slot_id) { slot = each; @@ -290,27 +292,29 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vectorget_by_name(read_col); + for (auto& read_col_name : columns) { + auto& column_with_type_and_name = block->get_by_name(read_col_name); auto& column_ptr = column_with_type_and_name.column; auto& column_type = column_with_type_and_name.type; - auto col_iter = - std::find(_dict_filter_col_names.begin(), _dict_filter_col_names.end(), read_col); bool is_dict_filter = false; - if (col_iter != _dict_filter_col_names.end()) { - MutableColumnPtr dict_column = ColumnVector::create(); - size_t pos = block->get_position_by_name(read_col); - if (column_type->is_nullable()) { - block->get_by_position(pos).type = - std::make_shared(std::make_shared()); - block->replace_by_position( - pos, ColumnNullable::create(std::move(dict_column), - ColumnUInt8::create(dict_column->size(), 0))); - } else { - block->get_by_position(pos).type = std::make_shared(); - block->replace_by_position(pos, std::move(dict_column)); + for (auto& _dict_filter_col : _dict_filter_cols) { + if (_dict_filter_col.first == read_col_name) { + MutableColumnPtr dict_column = ColumnVector::create(); + size_t pos = block->get_position_by_name(read_col_name); + if (column_type->is_nullable()) { + block->get_by_position(pos).type = + std::make_shared(std::make_shared()); + block->replace_by_position( + pos, + ColumnNullable::create(std::move(dict_column), + ColumnUInt8::create(dict_column->size(), 0))); + } else { + block->get_by_position(pos).type = std::make_shared(); + block->replace_by_position(pos, std::move(dict_column)); + } + is_dict_filter = true; + break; } - is_dict_filter = true; } size_t col_read_rows = 0; @@ -319,7 +323,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vectorread_column_data( + RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter)); col_read_rows += loop_rows; @@ -349,7 +353,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re pre_read_rows = 0; pre_eof = false; ColumnSelectVector run_length_vector; - RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size, + RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size, &pre_read_rows, &pre_eof, run_length_vector)); if (pre_read_rows == 0) { DCHECK_EQ(pre_eof, true); @@ -387,7 +391,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re if (select_vector_ptr->filter_all() && !pre_eof) { // If continuous batches are skipped, we can cache them to skip a whole page _cached_filtered_rows += pre_read_rows; - for (auto& col : _lazy_read_ctx.predicate_columns) { + for (auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns block->get_by_name(col).column->assume_mutable()->clear(); } @@ -668,10 +672,9 @@ Status RowGroupReader::_filter_block_internal(Block* block, } Status RowGroupReader::_rewrite_dict_predicates() { - for (vector::iterator it = _dict_filter_col_names.begin(); - it != _dict_filter_col_names.end();) { - std::string& dict_filter_col_name = *it; - int slot_id = _col_name_to_slot_id->at(dict_filter_col_name); + for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) { + std::string& dict_filter_col_name = it->first; + int slot_id = it->second; // 1. Get dictionary values to a string column. MutableColumnPtr dict_value_column = ColumnString::create(); bool has_dict = false; @@ -750,7 +753,7 @@ Status RowGroupReader::_rewrite_dict_predicates() { for (auto& ctx : (*ctxs)) { _filter_conjuncts.push_back(ctx); } - it = _dict_filter_col_names.erase(it); + it = _dict_filter_cols.erase(it); continue; } @@ -869,8 +872,8 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector& dict_codes, } void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { - for (auto& dict_filter_col_name : _dict_filter_col_names) { - size_t pos = block->get_position_by_name(dict_filter_col_name); + for (auto& dict_filter_cols : _dict_filter_cols) { + size_t pos = block->get_position_by_name(dict_filter_cols.first); ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos); const ColumnPtr& column = column_with_type_and_name.column; if (auto* nullable_column = check_and_get_column(*column)) { @@ -879,7 +882,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { DCHECK(dict_column); MutableColumnPtr string_column = - _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column( + _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( dict_column); column_with_type_and_name.type = @@ -890,7 +893,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) { } else { const ColumnInt32* dict_column = assert_cast(column.get()); MutableColumnPtr string_column = - _column_readers[dict_filter_col_name]->convert_dict_column_to_string_column( + _column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column( dict_column); column_with_type_and_name.type = std::make_shared(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 3768bf49c9..49545e1051 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -49,7 +49,10 @@ public: std::vector all_read_columns; // include predicate_partition_columns & predicate_missing_columns std::vector all_predicate_col_ids; - std::vector predicate_columns; + // save slot_id to find dict filter column name, because expr column name may + // be different with parquet column name + // std::pair, std::vector> + std::pair, std::vector> predicate_columns; std::vector lazy_read_columns; std::unordered_map> predicate_partition_columns; @@ -143,8 +146,7 @@ private: Status _filter_block_internal(Block* block, const vector& columns_to_filter, const IColumn::Filter& filter); - bool _can_filter_by_dict(const string& predicate_col_name, - const tparquet::ColumnMetaData& column_metadata); + bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& column_metadata); bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata); Status _rewrite_dict_predicates(); Status _rewrite_dict_conjuncts(std::vector& dict_codes, int slot_id, bool is_nullable); @@ -182,7 +184,8 @@ private: const std::unordered_map>* _slot_id_to_filter_conjuncts; std::vector _dict_filter_conjuncts; std::vector _filter_conjuncts; - std::vector _dict_filter_col_names; + // std::pair + std::vector> _dict_filter_cols; RuntimeState* _state; std::shared_ptr _obj_pool; bool _is_row_group_filtered = false; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 6da31fee04..5a74fa0e03 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -251,7 +251,8 @@ Status ParquetReader::set_fill_columns( partition_columns, const std::unordered_map& missing_columns) { SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - std::unordered_map predicate_columns; + // std::unordered_map> + std::unordered_map> predicate_columns; std::function visit_slot = [&](VExpr* expr) { if (VSlotRef* slot_ref = typeid_cast(expr)) { auto expr_name = slot_ref->expr_name(); @@ -259,7 +260,8 @@ Status ParquetReader::set_fill_columns( if (iter != _table_col_to_file_col.end()) { expr_name = iter->second; } - predicate_columns.emplace(expr_name, slot_ref->column_id()); + predicate_columns.emplace(expr_name, + std::make_pair(slot_ref->column_id(), slot_ref->slot_id())); if (slot_ref->column_id() == 0) { _lazy_read_ctx.resize_first_column = false; } @@ -302,8 +304,9 @@ Status ParquetReader::set_fill_columns( if (iter == predicate_columns.end()) { _lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name); } else { - _lazy_read_ctx.predicate_columns.emplace_back(iter->first); - _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second); + _lazy_read_ctx.predicate_columns.first.emplace_back(iter->first); + _lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); } } } @@ -314,7 +317,7 @@ Status ParquetReader::set_fill_columns( _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second); } else { _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second); - _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); } } @@ -324,11 +327,11 @@ Status ParquetReader::set_fill_columns( _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second); } else { _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second); - _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second); + _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first); } } - if (!_has_complex_type && _lazy_read_ctx.predicate_columns.size() > 0 && + if (!_has_complex_type && _lazy_read_ctx.predicate_columns.first.size() > 0 && _lazy_read_ctx.lazy_read_columns.size() > 0) { _lazy_read_ctx.can_lazy_read = true; }