[Fix](parquet-reader) Fix iceberg_schema_evolution regression test failure caused by the slot column name differing from the parquet column name. (#17988)

This commit is contained in:
Qi Chen
2023-03-23 11:23:08 +08:00
committed by GitHub
parent abeec4848a
commit 3870689cbb
3 changed files with 55 additions and 46 deletions

View File

@ -101,13 +101,16 @@ Status RowGroupReader::init(
if (!_slot_id_to_filter_conjuncts) {
return Status::OK();
}
for (auto& predicate_col_name : _lazy_read_ctx.predicate_columns) {
const std::vector<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
const string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
if (_can_filter_by_dict(predicate_col_name,
if (_can_filter_by_dict(slot_id,
_row_group_meta.columns[field->physical_column_index].meta_data)) {
_dict_filter_col_names.emplace_back(predicate_col_name);
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
} else {
int slot_id = _col_name_to_slot_id->at(predicate_col_name);
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
for (VExprContext* ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
@ -120,11 +123,10 @@ Status RowGroupReader::init(
return Status::OK();
}
bool RowGroupReader::_can_filter_by_dict(const string& predicate_col_name,
bool RowGroupReader::_can_filter_by_dict(int slot_id,
const tparquet::ColumnMetaData& column_metadata) {
SlotDescriptor* slot = nullptr;
const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
int slot_id = _col_name_to_slot_id->at(predicate_col_name);
for (auto each : slots) {
if (each->id() == slot_id) {
slot = each;
@ -290,27 +292,29 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
ColumnSelectVector& select_vector) {
size_t batch_read_rows = 0;
bool has_eof = false;
for (auto& read_col : columns) {
auto& column_with_type_and_name = block->get_by_name(read_col);
for (auto& read_col_name : columns) {
auto& column_with_type_and_name = block->get_by_name(read_col_name);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
auto col_iter =
std::find(_dict_filter_col_names.begin(), _dict_filter_col_names.end(), read_col);
bool is_dict_filter = false;
if (col_iter != _dict_filter_col_names.end()) {
MutableColumnPtr dict_column = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(read_col);
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_column));
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(read_col_name);
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos,
ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_column));
}
is_dict_filter = true;
break;
}
is_dict_filter = true;
}
size_t col_read_rows = 0;
@ -319,7 +323,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
select_vector.reset();
while (!col_eof && col_read_rows < batch_size) {
size_t loop_rows = 0;
RETURN_IF_ERROR(_column_readers[read_col]->read_column_data(
RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows,
&col_eof, is_dict_filter));
col_read_rows += loop_rows;
@ -349,7 +353,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
pre_read_rows = 0;
pre_eof = false;
ColumnSelectVector run_length_vector;
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
&pre_read_rows, &pre_eof, run_length_vector));
if (pre_read_rows == 0) {
DCHECK_EQ(pre_eof, true);
@ -387,7 +391,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
if (select_vector_ptr->filter_all() && !pre_eof) {
// If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
for (auto& col : _lazy_read_ctx.predicate_columns) {
for (auto& col : _lazy_read_ctx.predicate_columns.first) {
// clean block to read predicate columns
block->get_by_name(col).column->assume_mutable()->clear();
}
@ -668,10 +672,9 @@ Status RowGroupReader::_filter_block_internal(Block* block,
}
Status RowGroupReader::_rewrite_dict_predicates() {
for (vector<std::string>::iterator it = _dict_filter_col_names.begin();
it != _dict_filter_col_names.end();) {
std::string& dict_filter_col_name = *it;
int slot_id = _col_name_to_slot_id->at(dict_filter_col_name);
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
int slot_id = it->second;
// 1. Get dictionary values to a string column.
MutableColumnPtr dict_value_column = ColumnString::create();
bool has_dict = false;
@ -750,7 +753,7 @@ Status RowGroupReader::_rewrite_dict_predicates() {
for (auto& ctx : (*ctxs)) {
_filter_conjuncts.push_back(ctx);
}
it = _dict_filter_col_names.erase(it);
it = _dict_filter_cols.erase(it);
continue;
}
@ -869,8 +872,8 @@ Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
}
void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_col_name : _dict_filter_col_names) {
size_t pos = block->get_position_by_name(dict_filter_col_name);
for (auto& dict_filter_cols : _dict_filter_cols) {
size_t pos = block->get_position_by_name(dict_filter_cols.first);
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
@ -879,7 +882,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
DCHECK(dict_column);
MutableColumnPtr string_column =
_column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column);
column_with_type_and_name.type =
@ -890,7 +893,7 @@ void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
} else {
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
MutableColumnPtr string_column =
_column_readers[dict_filter_col_name]->convert_dict_column_to_string_column(
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column);
column_with_type_and_name.type = std::make_shared<DataTypeString>();

View File

@ -49,7 +49,10 @@ public:
std::vector<std::string> all_read_columns;
// include predicate_partition_columns & predicate_missing_columns
std::vector<uint32_t> all_predicate_col_ids;
std::vector<std::string> predicate_columns;
// save slot_id to find dict filter column name, because expr column name may
// be different with parquet column name
// std::pair<std::vector<col_name>, std::vector<slot_id>>
std::pair<std::vector<std::string>, std::vector<int>> predicate_columns;
std::vector<std::string> lazy_read_columns;
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
predicate_partition_columns;
@ -143,8 +146,7 @@ private:
Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter,
const IColumn::Filter& filter);
bool _can_filter_by_dict(const string& predicate_col_name,
const tparquet::ColumnMetaData& column_metadata);
bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& column_metadata);
bool is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata);
Status _rewrite_dict_predicates();
Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id, bool is_nullable);
@ -182,7 +184,8 @@ private:
const std::unordered_map<int, std::vector<VExprContext*>>* _slot_id_to_filter_conjuncts;
std::vector<VExprContext*> _dict_filter_conjuncts;
std::vector<VExprContext*> _filter_conjuncts;
std::vector<std::string> _dict_filter_col_names;
// std::pair<col_name, slot_id>
std::vector<std::pair<std::string, int>> _dict_filter_cols;
RuntimeState* _state;
std::shared_ptr<ObjectPool> _obj_pool;
bool _is_row_group_filtered = false;

View File

@ -251,7 +251,8 @@ Status ParquetReader::set_fill_columns(
partition_columns,
const std::unordered_map<std::string, VExprContext*>& missing_columns) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
std::unordered_map<std::string, uint32_t> predicate_columns;
// std::unordered_map<column_name, std::pair<col_id, slot_id>>
std::unordered_map<std::string, std::pair<uint32_t, int>> predicate_columns;
std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
auto expr_name = slot_ref->expr_name();
@ -259,7 +260,8 @@ Status ParquetReader::set_fill_columns(
if (iter != _table_col_to_file_col.end()) {
expr_name = iter->second;
}
predicate_columns.emplace(expr_name, slot_ref->column_id());
predicate_columns.emplace(expr_name,
std::make_pair(slot_ref->column_id(), slot_ref->slot_id()));
if (slot_ref->column_id() == 0) {
_lazy_read_ctx.resize_first_column = false;
}
@ -302,8 +304,9 @@ Status ParquetReader::set_fill_columns(
if (iter == predicate_columns.end()) {
_lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
} else {
_lazy_read_ctx.predicate_columns.emplace_back(iter->first);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
_lazy_read_ctx.predicate_columns.first.emplace_back(iter->first);
_lazy_read_ctx.predicate_columns.second.emplace_back(iter->second.second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first);
}
}
}
@ -314,7 +317,7 @@ Status ParquetReader::set_fill_columns(
_lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
} else {
_lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first);
}
}
@ -324,11 +327,11 @@ Status ParquetReader::set_fill_columns(
_lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
} else {
_lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
_lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second.first);
}
}
if (!_has_complex_type && _lazy_read_ctx.predicate_columns.size() > 0 &&
if (!_has_complex_type && _lazy_read_ctx.predicate_columns.first.size() > 0 &&
_lazy_read_ctx.lazy_read_columns.size() > 0) {
_lazy_read_ctx.can_lazy_read = true;
}