[Opt](parquet/orc-reader) Opt get dict ids in _rewrite_dict_predicates() (#40095)
## Proposed changes backport #39893.
This commit is contained in:
@ -2054,7 +2054,6 @@ Status OrcReader::on_string_dicts_loaded(
|
||||
orc::StringDictionary* dict = file_column_name_to_dict_map_iter->second;
|
||||
|
||||
std::vector<StringRef> dict_values;
|
||||
std::unordered_map<StringRef, int64_t> dict_value_to_code;
|
||||
size_t max_value_length = 0;
|
||||
uint64_t dictionaryCount = dict->dictionaryOffset.size() - 1;
|
||||
if (dictionaryCount == 0) {
|
||||
@ -2074,7 +2073,6 @@ Status OrcReader::on_string_dicts_loaded(
|
||||
max_value_length = length;
|
||||
}
|
||||
dict_values.emplace_back(dict_value);
|
||||
dict_value_to_code[dict_value] = i;
|
||||
}
|
||||
dict_value_column->insert_many_strings_overflow(&dict_values[0], dict_values.size(),
|
||||
max_value_length);
|
||||
@ -2113,31 +2111,37 @@ Status OrcReader::on_string_dicts_loaded(
|
||||
++index;
|
||||
}
|
||||
|
||||
// 2.2 Execute conjuncts and filter block.
|
||||
std::vector<uint32_t> columns_to_filter(1, dict_pos);
|
||||
int column_to_keep = temp_block.columns();
|
||||
// 2.2 Execute conjuncts.
|
||||
if (dict_pos != 0) {
|
||||
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
|
||||
// The following process may be tricky and time-consuming, but we have no other way.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
|
||||
}
|
||||
RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
|
||||
ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)));
|
||||
IColumn::Filter result_filter(temp_block.rows(), 1);
|
||||
bool can_filter_all;
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block, &result_filter,
|
||||
&can_filter_all));
|
||||
if (dict_pos != 0) {
|
||||
// We have to clean the first column to insert right data.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->clear();
|
||||
}
|
||||
|
||||
// Check some conditions.
|
||||
ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
|
||||
// If dict_column->size() == 0, can filter this stripe.
|
||||
if (dict_column->size() == 0) {
|
||||
// If can_filter_all = true, can filter this stripe.
|
||||
if (can_filter_all) {
|
||||
*is_stripe_filtered = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// 3. Get dict codes.
|
||||
std::vector<int32_t> dict_codes;
|
||||
for (size_t i = 0; i < result_filter.size(); ++i) {
|
||||
if (result_filter[i]) {
|
||||
dict_codes.emplace_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
// About Performance: if dict_column size is too large, it will generate a large IN filter.
|
||||
if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
|
||||
if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
|
||||
it = _dict_filter_cols.erase(it);
|
||||
for (auto& ctx : ctxs) {
|
||||
_non_dict_filter_conjuncts.emplace_back(ctx);
|
||||
@ -2145,26 +2149,9 @@ Status OrcReader::on_string_dicts_loaded(
|
||||
continue;
|
||||
}
|
||||
|
||||
// 3. Get dict codes.
|
||||
std::vector<int32_t> dict_codes;
|
||||
if (dict_column->is_nullable()) {
|
||||
const ColumnNullable* nullable_column =
|
||||
static_cast<const ColumnNullable*>(dict_column.get());
|
||||
const ColumnString* nested_column = static_cast<const ColumnString*>(
|
||||
nullable_column->get_nested_column_ptr().get());
|
||||
for (int i = 0; i < nested_column->size(); ++i) {
|
||||
StringRef dict_value = nested_column->get_data_at(i);
|
||||
dict_codes.emplace_back(dict_value_to_code[dict_value]);
|
||||
}
|
||||
} else {
|
||||
for (int i = 0; i < dict_column->size(); ++i) {
|
||||
StringRef dict_value = dict_column->get_data_at(i);
|
||||
dict_codes.emplace_back(dict_value_to_code[dict_value]);
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Rewrite conjuncts.
|
||||
RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
|
||||
RETURN_IF_ERROR(_rewrite_dict_conjuncts(
|
||||
dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
|
||||
++it;
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
@ -44,7 +44,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
|
||||
total_length += l;
|
||||
}
|
||||
|
||||
_dict_value_to_code.reserve(num_values);
|
||||
// For insert_many_strings_overflow
|
||||
_dict_data.resize(total_length + ColumnString::MAX_STRINGS_OVERFLOW_SIZE);
|
||||
_max_value_length = 0;
|
||||
@ -55,7 +54,6 @@ Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t
|
||||
offset_cursor += 4;
|
||||
memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
|
||||
_dict_items.emplace_back(&_dict_data[offset], l);
|
||||
_dict_value_to_code[StringRef(&_dict_data[offset], l)] = i;
|
||||
offset_cursor += l;
|
||||
offset += l;
|
||||
if (offset_cursor > length) {
|
||||
@ -77,15 +75,6 @@ Status ByteArrayDictDecoder::read_dict_values_to_column(MutableColumnPtr& doris_
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ByteArrayDictDecoder::get_dict_codes(const ColumnString* string_column,
|
||||
std::vector<int32_t>* dict_codes) {
|
||||
for (int i = 0; i < string_column->size(); ++i) {
|
||||
StringRef dict_value = string_column->get_data_at(i);
|
||||
dict_codes->emplace_back(_dict_value_to_code[dict_value]);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
MutableColumnPtr ByteArrayDictDecoder::convert_dict_column_to_string_column(
|
||||
const ColumnInt32* dict_column) {
|
||||
auto res = ColumnString::create();
|
||||
|
||||
@ -54,9 +54,6 @@ public:
|
||||
|
||||
Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
|
||||
|
||||
Status get_dict_codes(const ColumnString* column_string,
|
||||
std::vector<int32_t>* dict_codes) override;
|
||||
|
||||
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
|
||||
|
||||
protected:
|
||||
@ -64,6 +61,5 @@ protected:
|
||||
std::vector<StringRef> _dict_items;
|
||||
std::vector<uint8_t> _dict_data;
|
||||
size_t _max_value_length;
|
||||
std::unordered_map<StringRef, int32_t> _dict_value_to_code;
|
||||
};
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -78,11 +78,6 @@ public:
|
||||
return Status::NotSupported("read_dict_values_to_column is not supported");
|
||||
}
|
||||
|
||||
virtual Status get_dict_codes(const ColumnString* column_string,
|
||||
std::vector<int32_t>* dict_codes) {
|
||||
return Status::NotSupported("get_dict_codes is not supported");
|
||||
}
|
||||
|
||||
virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
|
||||
LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported";
|
||||
__builtin_unreachable();
|
||||
|
||||
@ -109,10 +109,8 @@ protected:
|
||||
_dict = std::move(dict);
|
||||
char* dict_item_address = reinterpret_cast<char*>(_dict.get());
|
||||
_dict_items.resize(num_values);
|
||||
_dict_value_to_code.reserve(num_values);
|
||||
for (size_t i = 0; i < num_values; ++i) {
|
||||
_dict_items[i] = dict_item_address;
|
||||
_dict_value_to_code[StringRef(_dict_items[i], _type_length)] = i;
|
||||
dict_item_address += _type_length;
|
||||
}
|
||||
return Status::OK();
|
||||
@ -128,17 +126,6 @@ protected:
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status get_dict_codes(const ColumnString* string_column,
|
||||
std::vector<int32_t>* dict_codes) override {
|
||||
size_t size = string_column->size();
|
||||
dict_codes->reserve(size);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
StringRef dict_value = string_column->get_data_at(i);
|
||||
dict_codes->emplace_back(_dict_value_to_code[dict_value]);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override {
|
||||
auto res = ColumnString::create();
|
||||
std::vector<StringRef> dict_values(dict_column->size());
|
||||
@ -149,7 +136,6 @@ protected:
|
||||
res->insert_many_strings(&dict_values[0], dict_values.size());
|
||||
return res;
|
||||
}
|
||||
std::unordered_map<StringRef, int32_t> _dict_value_to_code;
|
||||
// For dictionary encoding
|
||||
std::vector<char*> _dict_items;
|
||||
};
|
||||
|
||||
@ -183,11 +183,6 @@ public:
|
||||
->read_dict_values_to_column(doris_column);
|
||||
}
|
||||
|
||||
Status get_dict_codes(const ColumnString* column_string, std::vector<int32_t>* dict_codes) {
|
||||
return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]->get_dict_codes(
|
||||
column_string, dict_codes);
|
||||
}
|
||||
|
||||
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
|
||||
return _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)]
|
||||
->convert_dict_column_to_string_column(dict_column);
|
||||
|
||||
@ -454,11 +454,6 @@ Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_co
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status ScalarColumnReader::get_dict_codes(const ColumnString* column_string,
|
||||
std::vector<int32_t>* dict_codes) {
|
||||
return _chunk_reader->get_dict_codes(column_string, dict_codes);
|
||||
}
|
||||
|
||||
MutableColumnPtr ScalarColumnReader::convert_dict_column_to_string_column(
|
||||
const ColumnInt32* dict_column) {
|
||||
return _chunk_reader->convert_dict_column_to_string_column(dict_column);
|
||||
|
||||
@ -128,11 +128,6 @@ public:
|
||||
return Status::NotSupported("read_dict_values_to_column is not supported");
|
||||
}
|
||||
|
||||
virtual Status get_dict_codes(const ColumnString* column_string,
|
||||
std::vector<int32_t>* dict_codes) {
|
||||
return Status::NotSupported("get_dict_codes is not supported");
|
||||
}
|
||||
|
||||
virtual MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) {
|
||||
LOG(FATAL) << "Method convert_dict_column_to_string_column is not supported";
|
||||
__builtin_unreachable();
|
||||
@ -180,8 +175,6 @@ public:
|
||||
ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows,
|
||||
bool* eof, bool is_dict_filter) override;
|
||||
Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
|
||||
Status get_dict_codes(const ColumnString* column_string,
|
||||
std::vector<int32_t>* dict_codes) override;
|
||||
MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override;
|
||||
const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
|
||||
const std::vector<level_t>& get_def_level() const override { return _def_levels; }
|
||||
|
||||
@ -841,7 +841,7 @@ Status RowGroupReader::_rewrite_dict_predicates() {
|
||||
++index;
|
||||
}
|
||||
|
||||
// 2.2 Execute conjuncts and filter block.
|
||||
// 2.2 Execute conjuncts.
|
||||
VExprContextSPtrs ctxs;
|
||||
auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
|
||||
if (iter != _slot_id_to_filter_conjuncts->end()) {
|
||||
@ -854,33 +854,39 @@ Status RowGroupReader::_rewrite_dict_predicates() {
|
||||
return Status::NotFound(msg.str());
|
||||
}
|
||||
|
||||
std::vector<uint32_t> columns_to_filter(1, dict_pos);
|
||||
int column_to_keep = temp_block.columns();
|
||||
if (dict_pos != 0) {
|
||||
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
|
||||
// The following process may be tricky and time-consuming, but we have no other way.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
|
||||
}
|
||||
IColumn::Filter result_filter(temp_block.rows(), 1);
|
||||
bool can_filter_all;
|
||||
{
|
||||
SCOPED_RAW_TIMER(&_predicate_filter_time);
|
||||
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
|
||||
ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
|
||||
RETURN_IF_ERROR(VExprContext::execute_conjuncts(ctxs, nullptr, &temp_block,
|
||||
&result_filter, &can_filter_all));
|
||||
}
|
||||
if (dict_pos != 0) {
|
||||
// We have to clean the first column to insert right data.
|
||||
temp_block.get_by_position(0).column->assume_mutable()->clear();
|
||||
}
|
||||
|
||||
// Check some conditions.
|
||||
ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
|
||||
// If dict_column->size() == 0, can filter this row group.
|
||||
if (dict_column->size() == 0) {
|
||||
// If can_filter_all = true, can filter this row group.
|
||||
if (can_filter_all) {
|
||||
_is_row_group_filtered = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// 3. Get dict codes.
|
||||
std::vector<int32_t> dict_codes;
|
||||
for (size_t i = 0; i < result_filter.size(); ++i) {
|
||||
if (result_filter[i]) {
|
||||
dict_codes.emplace_back(i);
|
||||
}
|
||||
}
|
||||
|
||||
// About Performance: if dict_column size is too large, it will generate a large IN filter.
|
||||
if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
|
||||
if (dict_codes.size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
|
||||
it = _dict_filter_cols.erase(it);
|
||||
for (auto& ctx : ctxs) {
|
||||
_filter_conjuncts.push_back(ctx);
|
||||
@ -888,22 +894,9 @@ Status RowGroupReader::_rewrite_dict_predicates() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 3. Get dict codes.
|
||||
std::vector<int32_t> dict_codes;
|
||||
if (dict_column->is_nullable()) {
|
||||
const ColumnNullable* nullable_column =
|
||||
static_cast<const ColumnNullable*>(dict_column.get());
|
||||
const ColumnString* nested_column = static_cast<const ColumnString*>(
|
||||
nullable_column->get_nested_column_ptr().get());
|
||||
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
|
||||
assert_cast<const ColumnString*>(nested_column), &dict_codes));
|
||||
} else {
|
||||
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
|
||||
assert_cast<const ColumnString*>(dict_column.get()), &dict_codes));
|
||||
}
|
||||
|
||||
// 4. Rewrite conjuncts.
|
||||
RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
|
||||
RETURN_IF_ERROR(_rewrite_dict_conjuncts(
|
||||
dict_codes, slot_id, temp_block.get_by_position(dict_pos).column->is_nullable()));
|
||||
++it;
|
||||
}
|
||||
return Status::OK();
|
||||
|
||||
Reference in New Issue
Block a user