[Opt](multi-catalog)Disable dict filter in parquet/orc reader if have non-single conjuncts. (#52617)
### What problem does this PR solve? Problem Summary: Cherry-pick #44777
This commit is contained in:
@ -955,18 +955,22 @@ Status OrcReader::set_fill_columns(
|
||||
}
|
||||
}
|
||||
|
||||
if (!_slot_id_to_filter_conjuncts) {
|
||||
return Status::OK();
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
_filter_conjuncts.insert(_filter_conjuncts.end(), _not_single_slot_filter_conjuncts.begin(),
|
||||
_not_single_slot_filter_conjuncts.end());
|
||||
_disable_dict_filter = true;
|
||||
}
|
||||
|
||||
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
|
||||
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
|
||||
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
|
||||
auto& [value, slot_desc] = kv.second;
|
||||
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
|
||||
if (iter != _slot_id_to_filter_conjuncts->end()) {
|
||||
for (auto& ctx : iter->second) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
|
||||
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
|
||||
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
|
||||
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
|
||||
auto& [value, slot_desc] = kv.second;
|
||||
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
|
||||
if (iter != _slot_id_to_filter_conjuncts->end()) {
|
||||
for (const auto& ctx : iter->second) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1715,16 +1719,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
Block::filter_block_internal(block, columns_to_filter, *_filter));
|
||||
}
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
|
||||
column_to_keep)));
|
||||
} else {
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
}
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
*read_rows = block->rows();
|
||||
} else {
|
||||
uint64_t rr;
|
||||
@ -1841,17 +1837,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
Block::filter_block_internal(block, columns_to_filter, result_filter));
|
||||
}
|
||||
//_not_single_slot_filter_conjuncts check : missing column1 == missing column2 , missing column == exists column ...
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block,
|
||||
columns_to_filter, column_to_keep)));
|
||||
} else {
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
}
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
|
||||
} else {
|
||||
if (_delete_rows_filter_ptr) {
|
||||
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
|
||||
@ -2032,8 +2019,8 @@ Status OrcReader::fill_dict_filter_column_names(
|
||||
int i = 0;
|
||||
for (auto& predicate_col_name : predicate_col_names) {
|
||||
int slot_id = predicate_col_slot_ids[i];
|
||||
if (_can_filter_by_dict(slot_id)) {
|
||||
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
|
||||
if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
|
||||
_dict_filter_cols.emplace_back(predicate_col_name, slot_id);
|
||||
column_names.emplace_back(_col_name_to_file_col_name[predicate_col_name]);
|
||||
} else {
|
||||
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
|
||||
|
||||
@ -612,6 +612,7 @@ private:
|
||||
VExprContextSPtrs _dict_filter_conjuncts;
|
||||
VExprContextSPtrs _non_dict_filter_conjuncts;
|
||||
VExprContextSPtrs _filter_conjuncts;
|
||||
bool _disable_dict_filter = false;
|
||||
// std::pair<col_name, slot_id>
|
||||
std::vector<std::pair<std::string, int>> _dict_filter_cols;
|
||||
std::shared_ptr<ObjectPool> _obj_pool;
|
||||
|
||||
@ -109,11 +109,6 @@ Status RowGroupReader::init(
|
||||
_tuple_descriptor = tuple_descriptor;
|
||||
_row_descriptor = row_descriptor;
|
||||
_col_name_to_slot_id = colname_to_slot_id;
|
||||
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
|
||||
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
|
||||
not_single_slot_filter_conjuncts->begin(),
|
||||
not_single_slot_filter_conjuncts->end());
|
||||
}
|
||||
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
|
||||
_merge_read_ranges(row_ranges);
|
||||
if (_read_columns.empty()) {
|
||||
@ -140,45 +135,52 @@ Status RowGroupReader::init(
|
||||
}
|
||||
_column_readers[read_col] = std::move(reader);
|
||||
}
|
||||
// Check if single slot can be filtered by dict.
|
||||
if (!_slot_id_to_filter_conjuncts) {
|
||||
return Status::OK();
|
||||
|
||||
bool disable_dict_filter = false;
|
||||
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
|
||||
disable_dict_filter = true;
|
||||
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
|
||||
not_single_slot_filter_conjuncts->end());
|
||||
}
|
||||
const std::vector<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
|
||||
const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
|
||||
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
|
||||
const string& predicate_col_name = predicate_col_names[i];
|
||||
int slot_id = predicate_col_slot_ids[i];
|
||||
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
|
||||
if (!_lazy_read_ctx.has_complex_type &&
|
||||
_can_filter_by_dict(slot_id,
|
||||
_row_group_meta.columns[field->physical_column_index].meta_data)) {
|
||||
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
|
||||
} else {
|
||||
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
|
||||
_slot_id_to_filter_conjuncts->end()) {
|
||||
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
|
||||
|
||||
// Check if single slot can be filtered by dict.
|
||||
if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
|
||||
const std::vector<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
|
||||
const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
|
||||
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
|
||||
const string& predicate_col_name = predicate_col_names[i];
|
||||
int slot_id = predicate_col_slot_ids[i];
|
||||
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
|
||||
if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
|
||||
_can_filter_by_dict(
|
||||
slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) {
|
||||
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
|
||||
} else {
|
||||
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
|
||||
_slot_id_to_filter_conjuncts->end()) {
|
||||
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
|
||||
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
|
||||
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
|
||||
auto& [value, slot_desc] = kv.second;
|
||||
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
|
||||
if (iter != _slot_id_to_filter_conjuncts->end()) {
|
||||
for (auto& ctx : iter->second) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
//For check missing column : missing column == xx, missing column is null,missing column is not null.
|
||||
_filter_conjuncts.insert(_filter_conjuncts.end(),
|
||||
_lazy_read_ctx.missing_columns_conjuncts.begin(),
|
||||
_lazy_read_ctx.missing_columns_conjuncts.end());
|
||||
RETURN_IF_ERROR(_rewrite_dict_predicates());
|
||||
}
|
||||
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
|
||||
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
|
||||
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
|
||||
auto& [value, slot_desc] = kv.second;
|
||||
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
|
||||
if (iter != _slot_id_to_filter_conjuncts->end()) {
|
||||
for (auto& ctx : iter->second) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
//For check missing column : missing column == xx, missing column is null,missing column is not null.
|
||||
_filter_conjuncts.insert(_filter_conjuncts.end(),
|
||||
_lazy_read_ctx.missing_columns_conjuncts.begin(),
|
||||
_lazy_read_ctx.missing_columns_conjuncts.end());
|
||||
RETURN_IF_ERROR(_rewrite_dict_predicates());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -351,17 +353,8 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
|
||||
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
Block::filter_block_internal(block, columns_to_filter, result_filter));
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
_convert_dict_cols_to_string_cols(block);
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block,
|
||||
columns_to_filter, column_to_keep)));
|
||||
} else {
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
_convert_dict_cols_to_string_cols(block);
|
||||
}
|
||||
Block::erase_useless_column(block, column_to_keep);
|
||||
_convert_dict_cols_to_string_cols(block);
|
||||
} else {
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
|
||||
@ -591,15 +584,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
|
||||
*batch_eof = pre_eof;
|
||||
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
|
||||
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
|
||||
if (!_not_single_slot_filter_conjuncts.empty()) {
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_CATCH_EXCEPTION(
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
_not_single_slot_filter_conjuncts, nullptr, block, columns_to_filter,
|
||||
origin_column_num)));
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -220,7 +220,6 @@ private:
|
||||
const TupleDescriptor* _tuple_descriptor = nullptr;
|
||||
const RowDescriptor* _row_descriptor = nullptr;
|
||||
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
|
||||
VExprContextSPtrs _not_single_slot_filter_conjuncts;
|
||||
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
|
||||
VExprContextSPtrs _dict_filter_conjuncts;
|
||||
VExprContextSPtrs _filter_conjuncts;
|
||||
|
||||
Reference in New Issue
Block a user