[Refactor](multi-catalog) Refactor to process splitted conjuncts for dict filter. (#21459)
Conjuncts are currently split, so refactor source code to handle split conjuncts for dict filters.
This commit is contained in:
@ -156,47 +156,30 @@ Status VFileScanner::prepare(
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VFileScanner::_split_conjuncts() {
|
||||
Status VFileScanner::_process_conjuncts_for_dict_filter() {
|
||||
for (auto& conjunct : _conjuncts) {
|
||||
RETURN_IF_ERROR(_split_conjuncts_expr(conjunct, conjunct->root()));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
Status VFileScanner::_split_conjuncts_expr(const VExprContextSPtr& context,
|
||||
const VExprSPtr& conjunct_expr_root) {
|
||||
static constexpr auto is_leaf = [](const auto& expr) { return !expr->is_and_expr(); };
|
||||
if (conjunct_expr_root) {
|
||||
if (is_leaf(conjunct_expr_root)) {
|
||||
auto impl = conjunct_expr_root->get_impl();
|
||||
// If impl is not null, which means this a conjuncts from runtime filter.
|
||||
auto cur_expr = impl ? impl : conjunct_expr_root;
|
||||
VExprContextSPtr new_ctx = VExprContext::create_shared(cur_expr);
|
||||
context->clone_fn_contexts(new_ctx.get());
|
||||
RETURN_IF_ERROR(new_ctx->prepare(_state, *_default_val_row_desc));
|
||||
RETURN_IF_ERROR(new_ctx->open(_state));
|
||||
auto impl = conjunct->root()->get_impl();
|
||||
// If impl is not null, which means this a conjuncts from runtime filter.
|
||||
auto cur_expr = impl ? impl : conjunct->root();
|
||||
|
||||
std::vector<int> slot_ids;
|
||||
_get_slot_ids(cur_expr.get(), &slot_ids);
|
||||
if (slot_ids.size() == 0) {
|
||||
_not_single_slot_filter_conjuncts.emplace_back(new_ctx);
|
||||
return Status::OK();
|
||||
}
|
||||
bool single_slot = true;
|
||||
for (int i = 1; i < slot_ids.size(); i++) {
|
||||
if (slot_ids[i] != slot_ids[0]) {
|
||||
single_slot = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (single_slot) {
|
||||
SlotId slot_id = slot_ids[0];
|
||||
_slot_id_to_filter_conjuncts[slot_id].emplace_back(new_ctx);
|
||||
} else {
|
||||
_not_single_slot_filter_conjuncts.emplace_back(new_ctx);
|
||||
std::vector<int> slot_ids;
|
||||
_get_slot_ids(cur_expr.get(), &slot_ids);
|
||||
if (slot_ids.size() == 0) {
|
||||
_not_single_slot_filter_conjuncts.emplace_back(conjunct);
|
||||
return Status::OK();
|
||||
}
|
||||
bool single_slot = true;
|
||||
for (int i = 1; i < slot_ids.size(); i++) {
|
||||
if (slot_ids[i] != slot_ids[0]) {
|
||||
single_slot = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (single_slot) {
|
||||
SlotId slot_id = slot_ids[0];
|
||||
_slot_id_to_filter_conjuncts[slot_id].emplace_back(conjunct);
|
||||
} else {
|
||||
RETURN_IF_ERROR(_split_conjuncts_expr(context, conjunct_expr_root->children()[0]));
|
||||
RETURN_IF_ERROR(_split_conjuncts_expr(context, conjunct_expr_root->children()[1]));
|
||||
_not_single_slot_filter_conjuncts.emplace_back(conjunct);
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
@ -918,7 +901,7 @@ Status VFileScanner::_init_expr_ctxes() {
|
||||
|
||||
// TODO: It should can move to scan node to process.
|
||||
if (!_conjuncts.empty()) {
|
||||
_split_conjuncts();
|
||||
_process_conjuncts_for_dict_filter();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -185,9 +185,7 @@ private:
|
||||
Status _convert_to_output_block(Block* block);
|
||||
Status _generate_fill_columns();
|
||||
Status _handle_dynamic_block(Block* block);
|
||||
Status _split_conjuncts();
|
||||
Status _split_conjuncts_expr(const VExprContextSPtr& context,
|
||||
const VExprSPtr& conjunct_expr_root);
|
||||
Status _process_conjuncts_for_dict_filter();
|
||||
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
|
||||
|
||||
void _reset_counter() {
|
||||
|
||||
Reference in New Issue
Block a user