[fix](runtime filter) append late arrival runtime filters in vfilecanner (#25996)
`VFileScanner` will try to append late arrival runtime filters in each loop of `ScannerScheduler::_scanner_scan`. However, `VFileScanner::_get_next_reader` only generates the `_push_down_conjuncts` in the first loop, so the late arrival runtime filters are ignored.
This commit is contained in:
@ -166,6 +166,8 @@ Status VFileScanner::prepare(
|
||||
ADD_TIMER(_parent->_scanner_profile, "FileScannerConvertOuputBlockTime");
|
||||
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile, "EmptyFileNum", TUnit::UNIT);
|
||||
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber", TUnit::UNIT);
|
||||
_has_fully_rf_file_counter =
|
||||
ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber", TUnit::UNIT);
|
||||
} else {
|
||||
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(), "FileScannerGetBlockTime");
|
||||
_open_reader_timer =
|
||||
@ -182,6 +184,8 @@ Status VFileScanner::prepare(
|
||||
_empty_file_counter =
|
||||
ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum", TUnit::UNIT);
|
||||
_file_counter = ADD_COUNTER(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT);
|
||||
_has_fully_rf_file_counter =
|
||||
ADD_COUNTER(_local_state->scanner_profile(), "HasFullyRfFileNumber", TUnit::UNIT);
|
||||
}
|
||||
|
||||
_file_cache_statistics.reset(new io::FileCacheStatistics());
|
||||
@ -222,7 +226,9 @@ Status VFileScanner::prepare(
|
||||
}
|
||||
|
||||
Status VFileScanner::_process_conjuncts_for_dict_filter() {
|
||||
for (auto& conjunct : _conjuncts) {
|
||||
_slot_id_to_filter_conjuncts.clear();
|
||||
_not_single_slot_filter_conjuncts.clear();
|
||||
for (auto& conjunct : _push_down_conjuncts) {
|
||||
auto impl = conjunct->root()->get_impl();
|
||||
// If impl is not null, which means this a conjuncts from runtime filter.
|
||||
auto cur_expr = impl ? impl : conjunct->root();
|
||||
@ -250,6 +256,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VFileScanner::_process_late_arrival_conjuncts() {
|
||||
if (_push_down_conjuncts.size() < _conjuncts.size()) {
|
||||
_push_down_conjuncts.clear();
|
||||
_push_down_conjuncts.resize(_conjuncts.size());
|
||||
for (size_t i = 0; i != _conjuncts.size(); ++i) {
|
||||
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
|
||||
}
|
||||
RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
|
||||
_discard_conjuncts();
|
||||
}
|
||||
if (_applied_rf_num == _total_rf_num) {
|
||||
COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
|
||||
for (auto& child_expr : expr->children()) {
|
||||
if (child_expr->is_slot_ref()) {
|
||||
@ -766,12 +788,8 @@ Status VFileScanner::_get_next_reader() {
|
||||
SCOPED_TIMER(_open_reader_timer);
|
||||
RETURN_IF_ERROR(parquet_reader->open());
|
||||
}
|
||||
if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
|
||||
_push_down_conjuncts.resize(_conjuncts.size());
|
||||
for (size_t i = 0; i != _conjuncts.size(); ++i) {
|
||||
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
|
||||
}
|
||||
_discard_conjuncts();
|
||||
if (push_down_predicates) {
|
||||
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
|
||||
}
|
||||
if (range.__isset.table_format_params &&
|
||||
range.table_format_params.table_format_type == "iceberg") {
|
||||
@ -802,12 +820,8 @@ Status VFileScanner::_get_next_reader() {
|
||||
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
|
||||
_profile, _state, *_params, range, _state->query_options().batch_size,
|
||||
_state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
|
||||
if (push_down_predicates && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
|
||||
_push_down_conjuncts.resize(_conjuncts.size());
|
||||
for (size_t i = 0; i != _conjuncts.size(); ++i) {
|
||||
RETURN_IF_ERROR(_conjuncts[i]->clone(_state, _push_down_conjuncts[i]));
|
||||
}
|
||||
_discard_conjuncts();
|
||||
if (push_down_predicates) {
|
||||
RETURN_IF_ERROR(_process_late_arrival_conjuncts());
|
||||
}
|
||||
if (range.__isset.table_format_params &&
|
||||
range.table_format_params.table_format_type == "transactional_hive") {
|
||||
@ -1080,10 +1094,6 @@ Status VFileScanner::_init_expr_ctxes() {
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: It should can move to scan node to process.
|
||||
if (!_conjuncts.empty()) {
|
||||
static_cast<void>(_process_conjuncts_for_dict_filter());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@ -176,6 +176,7 @@ private:
|
||||
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
|
||||
RuntimeProfile::Counter* _empty_file_counter = nullptr;
|
||||
RuntimeProfile::Counter* _file_counter = nullptr;
|
||||
RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
|
||||
|
||||
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
|
||||
// single slot filter conjuncts
|
||||
@ -206,6 +207,7 @@ private:
|
||||
Status _generate_fill_columns();
|
||||
Status _handle_dynamic_block(Block* block);
|
||||
Status _process_conjuncts_for_dict_filter();
|
||||
Status _process_late_arrival_conjuncts();
|
||||
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
|
||||
|
||||
void _reset_counter() {
|
||||
|
||||
Reference in New Issue
Block a user