From 56e036e68b0ff73d5a57e6a64e6232f4f40e319c Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 20 Jul 2022 12:36:57 +0800 Subject: [PATCH] [feature-wip](multi-catalog) Support runtime filter for file scan node (#11000) * [feature-wip](multi-catalog) Support runtime filter for file scan node Co-authored-by: morningman --- be/src/exec/arrow/arrow_reader.h | 1 + .../exec/arrow/parquet_row_group_reader.cpp | 12 +- be/src/exprs/runtime_filter.cpp | 5 +- be/src/io/buffered_reader.cpp | 14 +- be/src/io/buffered_reader.h | 3 + be/src/runtime/runtime_filter_mgr.cpp | 2 +- be/src/vec/exec/file_arrow_scanner.cpp | 12 +- be/src/vec/exec/file_arrow_scanner.h | 1 + be/src/vec/exec/file_scan_node.cpp | 131 +++++++++++++++++- be/src/vec/exec/file_scan_node.h | 23 +++ be/src/vec/exec/file_scanner.h | 8 ++ be/src/vec/exec/file_text_scanner.cpp | 2 - be/src/vec/exprs/vexpr_context.cpp | 2 +- .../doris/planner/RuntimeFilterGenerator.java | 3 +- .../external/ExternalFileScanNode.java | 8 +- 15 files changed, 204 insertions(+), 23 deletions(-) diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index ad888061be..1389426fc9 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -55,6 +55,7 @@ struct Statistics { int64_t filtered_rows = 0; int64_t total_rows = 0; int64_t filtered_total_bytes = 0; + int64_t total_bytes = 0; }; class ArrowFile : public arrow::io::RandomAccessFile { diff --git a/be/src/exec/arrow/parquet_row_group_reader.cpp b/be/src/exec/arrow/parquet_row_group_reader.cpp index 517309db76..a2f83c8f21 100644 --- a/be/src/exec/arrow/parquet_row_group_reader.cpp +++ b/be/src/exec/arrow/parquet_row_group_reader.cpp @@ -139,9 +139,10 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, std::unordered_set parquet_column_ids(include_column_ids.begin(), include_column_ids.end()); _init_conjuncts(tuple_desc, map_column, parquet_column_ids); - _parent->statistics()->total_groups = total_group; - _parent->statistics()->total_rows = _file_metadata->num_rows(); + int64_t total_groups = 0; + int64_t total_rows = 0; + int64_t total_bytes = 0; bool update_statistics = false; for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { auto row_group_meta = _file_metadata->RowGroup(row_group_id); @@ -156,6 +157,10 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, continue; } } + // only include row groups in range + ++total_groups; + total_rows += row_group_meta->num_rows(); + total_bytes += row_group_meta->total_byte_size(); // if head_read_offset <= start_offset < end_offset <= tail_read_offset for (SlotId slot_id = 0; slot_id < tuple_desc->slots().size(); slot_id++) { const std::string& col_name = tuple_desc->slots()[slot_id]->col_name(); @@ -189,6 +194,9 @@ Status RowGroupReader::init_filter_groups(const TupleDescriptor* tuple_desc, } } } + _parent->statistics()->total_groups = total_groups; + _parent->statistics()->total_rows = total_rows; + _parent->statistics()->total_bytes = total_bytes; if (update_statistics) { _parent->statistics()->filtered_row_groups = _filtered_num_row_groups; diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 262247f864..63a4d148f9 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1013,9 +1013,8 @@ Status IRuntimeFilter::publish() { if (_has_local_target) { IRuntimeFilter* consumer_filter = nullptr; // TODO: log if err - Status status = - _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter); - DCHECK(status.ok()); + RETURN_IF_ERROR( + _state->runtime_filter_mgr()->get_consume_filter(_filter_id, &consumer_filter)); // push down std::swap(this->_wrapper, consumer_filter->_wrapper); consumer_filter->update_runtime_filter_type_to_profile(); diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp index 97f04dbbcd..bd0deb823c 100644 --- a/be/src/io/buffered_reader.cpp +++ b/be/src/io/buffered_reader.cpp @@ -54,9 +54,15 @@ Status BufferedReader::open() { // the macro ADD_XXX is idempotent. // So although each scanner calls the ADD_XXX method, they all use the same counters. _read_timer = ADD_TIMER(_profile, "FileReadTime"); - _remote_read_timer = ADD_CHILD_TIMER(_profile, "FileRemoteReadTime", "FileReadTime"); + _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime"); _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT); _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT); + _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes", TUnit::BYTES); + _remote_read_rate = _profile->add_derived_counter( + "FileRemoteReadRate", TUnit::BYTES_PER_SECOND, + std::bind(&RuntimeProfile::units_per_second, _remote_read_bytes, + _remote_read_timer), + ""); RETURN_IF_ERROR(_reader->open()); return Status::OK(); @@ -113,6 +119,8 @@ Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* byt auto st = _reader->readat(position, nbytes, bytes_read, out); if (st.ok()) { _cur_offset = position + *bytes_read; + ++_remote_read_count; + _remote_bytes += *bytes_read; } return st; } @@ -138,6 +146,7 @@ Status BufferedReader::_fill() { RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer)); _buffer_limit = _buffer_offset + bytes_read; ++_remote_read_count; + _remote_bytes += bytes_read; } return Status::OK(); } @@ -166,6 +175,9 @@ void BufferedReader::close() { if (_remote_read_counter != nullptr) { COUNTER_UPDATE(_remote_read_counter, _remote_read_count); } + if (_remote_read_bytes != nullptr) { + COUNTER_UPDATE(_remote_read_bytes, _remote_bytes); + } } bool BufferedReader::closed() { diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h index c853606bd5..75dec5cf7f 100644 --- a/be/src/io/buffered_reader.h +++ b/be/src/io/buffered_reader.h @@ -68,6 +68,7 @@ private: int64_t _read_count = 0; int64_t _remote_read_count = 0; + int64_t _remote_bytes = 0; // total time cost in this reader RuntimeProfile::Counter* _read_timer = nullptr; @@ -77,6 +78,8 @@ private: RuntimeProfile::Counter* _read_counter = nullptr; // counter of calling "remote read()" RuntimeProfile::Counter* _remote_read_counter = nullptr; + RuntimeProfile::Counter* _remote_read_bytes = nullptr; + RuntimeProfile::Counter* _remote_read_rate = nullptr; }; } // namespace doris diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 654f865729..223be0a34d 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -63,7 +63,7 @@ Status RuntimeFilterMgr::get_filter_by_role(const int filter_id, const RuntimeFi auto iter = filter_map->find(key); if (iter == filter_map->end()) { - LOG(WARNING) << "unknown filter...:" << key << ",role:" << (int)role; + LOG(WARNING) << "unknown runtime filter: " << key << ", role:" << (int)role; return Status::InvalidArgument("unknown filter"); } *target = iter->second.filter; diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index 55199335bd..65b4a032ad 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -198,6 +198,7 @@ void VFileParquetScanner::_update_profile(std::shared_ptr& statistic COUNTER_UPDATE(_filtered_bytes_counter, statistics->filtered_total_bytes); COUNTER_UPDATE(_total_rows_counter, statistics->total_rows); COUNTER_UPDATE(_total_groups_counter, statistics->total_groups); + COUNTER_UPDATE(_total_bytes_counter, statistics->total_bytes); } void FileArrowScanner::close() { @@ -226,11 +227,12 @@ ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, } void VFileParquetScanner::_init_profiles(RuntimeProfile* profile) { - _filtered_row_groups_counter = ADD_COUNTER(_profile, "ParquetFilteredRowGroups", TUnit::UNIT); - _filtered_rows_counter = ADD_COUNTER(_profile, "FileFilteredRows", TUnit::UNIT); - _filtered_bytes_counter = ADD_COUNTER(_profile, "FileFilteredBytes", TUnit::BYTES); - _total_rows_counter = ADD_COUNTER(_profile, "FileTotalRows", TUnit::UNIT); - _total_groups_counter = ADD_COUNTER(_profile, "ParquetTotalRowGroups", TUnit::UNIT); + _filtered_row_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsFiltered", TUnit::UNIT); + _filtered_rows_counter = ADD_COUNTER(_profile, "ParquetRowsFiltered", TUnit::UNIT); + _filtered_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesFiltered", TUnit::BYTES); + _total_rows_counter = ADD_COUNTER(_profile, "ParquetRowsTotal", TUnit::UNIT); + _total_groups_counter = ADD_COUNTER(_profile, "ParquetRowGroupsTotal", TUnit::UNIT); + _total_bytes_counter = ADD_COUNTER(_profile, "ParquetBytesTotal", TUnit::BYTES); } VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h index 3f7537c134..46117cecb0 100644 --- a/be/src/vec/exec/file_arrow_scanner.h +++ b/be/src/vec/exec/file_arrow_scanner.h @@ -95,6 +95,7 @@ private: RuntimeProfile::Counter* _filtered_bytes_counter; RuntimeProfile::Counter* _total_rows_counter; RuntimeProfile::Counter* _total_groups_counter; + RuntimeProfile::Counter* _total_bytes_counter; }; class VFileORCScanner final : public FileArrowScanner { diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index ad79cc0536..1141d603f3 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "gen_cpp/PlanNodes_types.h" #include "runtime/mem_tracker.h" +#include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" #include "runtime/tuple.h" @@ -30,6 +31,8 @@ #include "util/types.h" #include "vec/exec/file_arrow_scanner.h" #include "vec/exec/file_text_scanner.h" +#include "vec/exprs/vcompound_pred.h" +#include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { @@ -42,7 +45,10 @@ FileScanNode::FileScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _num_running_scanners(0), _scan_finished(false), _max_buffered_batches(32), - _wait_scanner_timer(nullptr) {} + _wait_scanner_timer(nullptr), + _runtime_filter_descs(tnode.runtime_filters) { + LOG(WARNING) << "file scan node runtime filter size=" << _runtime_filter_descs.size(); +} Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ScanNode::init(tnode, state)); @@ -52,6 +58,22 @@ Status FileScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _pre_filter_texprs = file_scan_node.pre_filter_exprs; } + int filter_size = _runtime_filter_descs.size(); + _runtime_filter_ctxs.resize(filter_size); + _runtime_filter_ready_flag.resize(filter_size); + for (int i = 0; i < filter_size; ++i) { + IRuntimeFilter* runtime_filter = nullptr; + const auto& filter_desc = _runtime_filter_descs[i]; + RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( + RuntimeFilterRole::CONSUMER, filter_desc, state->query_options(), id())); + RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, + &runtime_filter)); + + _runtime_filter_ctxs[i].runtimefilter = runtime_filter; + _runtime_filter_ready_flag[i] = false; + _rf_locks.push_back(std::make_unique()); + } + return Status::OK(); } @@ -80,6 +102,9 @@ Status FileScanNode::prepare(RuntimeState* state) { // Profile _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime"); + _filter_timer = ADD_TIMER(runtime_profile(), "PredicateFilteredTime"); + _num_rows_filtered = ADD_COUNTER(runtime_profile(), "PredicateFilteredRows", TUnit::UNIT); + _num_scanners = ADD_COUNTER(runtime_profile(), "NumScanners", TUnit::UNIT); return Status::OK(); } @@ -90,11 +115,94 @@ Status FileScanNode::open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::open(state)); RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(_acquire_and_build_runtime_filter(state)); + RETURN_IF_ERROR(start_scanners()); return Status::OK(); } +Status FileScanNode::_acquire_and_build_runtime_filter(RuntimeState* state) { + // acquire runtime filter + _runtime_filter_ctxs.resize(_runtime_filter_descs.size()); + for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) { + auto& filter_desc = _runtime_filter_descs[i]; + IRuntimeFilter* runtime_filter = nullptr; + state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); + DCHECK(runtime_filter != nullptr); + if (runtime_filter == nullptr) { + continue; + } + bool ready = runtime_filter->is_ready(); + if (!ready) { + ready = runtime_filter->await(); + } + if (ready) { + _runtime_filter_ctxs[i].apply_mark = true; + _runtime_filter_ctxs[i].runtimefilter = runtime_filter; + + // TODO: currently, after calling get_push_expr_ctxs(), the func ptr in runtime_filter + // will be released, and it will not be used again for building vexpr. + // + // std::list expr_context; + // RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(&expr_context)); + // for (auto ctx : expr_context) { + // ctx->prepare(state, row_desc(), _expr_mem_tracker); + // ctx->open(state); + // int index = _conjunct_ctxs.size(); + // _conjunct_ctxs.push_back(ctx); + // // it's safe to store address from a fix-resized vector + // _conjunctid_to_runtime_filter_ctxs[index] = &_runtime_filter_ctxs[i]; + // } + } + } + + // rebuild vexpr + for (int i = 0; i < _runtime_filter_ctxs.size(); ++i) { + if (!_runtime_filter_ctxs[i].apply_mark) { + continue; + } + IRuntimeFilter* runtime_filter = _runtime_filter_ctxs[i].runtimefilter; + std::vector vexprs; + runtime_filter->get_prepared_vexprs(&vexprs, row_desc(), _expr_mem_tracker); + if (vexprs.empty()) { + continue; + } + auto last_expr = _vconjunct_ctx_ptr ? (*_vconjunct_ctx_ptr)->root() : vexprs[0]; + for (size_t j = _vconjunct_ctx_ptr ? 0 : 1; j < vexprs.size(); j++) { + TExprNode texpr_node; + texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN)); + texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED); + texpr_node.__set_opcode(TExprOpcode::COMPOUND_AND); + VExpr* new_node = _pool->add(new VcompoundPred(texpr_node)); + new_node->add_child(last_expr); + new_node->add_child(vexprs[j]); + last_expr = new_node; + } + auto new_vconjunct_ctx_ptr = _pool->add(new VExprContext(last_expr)); + auto expr_status = new_vconjunct_ctx_ptr->prepare(state, row_desc(), expr_mem_tracker()); + if (UNLIKELY(!expr_status.OK())) { + LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; + vexprs.clear(); + break; + } + + expr_status = new_vconjunct_ctx_ptr->open(state); + if (UNLIKELY(!expr_status.OK())) { + LOG(WARNING) << "Something wrong for runtime filters: " << expr_status; + vexprs.clear(); + break; + } + if (_vconjunct_ctx_ptr) { + _stale_vexpr_ctxs.push_back(std::move(_vconjunct_ctx_ptr)); + } + _vconjunct_ctx_ptr.reset(new doris::vectorized::VExprContext*); + *(_vconjunct_ctx_ptr.get()) = new_vconjunct_ctx_ptr; + _runtime_filter_ready_flag[i] = true; + } + return Status::OK(); +} + Status FileScanNode::start_scanners() { { std::unique_lock l(_batch_queue_lock); @@ -102,6 +210,7 @@ Status FileScanNode::start_scanners() { } _scanners_status.resize(_scan_ranges.size()); + COUNTER_UPDATE(_num_scanners, _scan_ranges.size()); ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token(); PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool(); for (int i = 0; i < _scan_ranges.size(); ++i) { @@ -245,6 +354,18 @@ Status FileScanNode::close(RuntimeState* state) { } // Close _batch_queue.clear(); + + for (auto& filter_desc : _runtime_filter_descs) { + IRuntimeFilter* runtime_filter = nullptr; + state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id, &runtime_filter); + DCHECK(runtime_filter != nullptr); + runtime_filter->consumer_close(); + } + + for (auto& ctx : _stale_vexpr_ctxs) { + (*ctx)->close(state); + } + return ExecNode::close(state); } @@ -266,8 +387,11 @@ Status FileScanNode::scanner_scan(const TFileScanRange& scan_range, ScannerCount continue; } auto old_rows = block->rows(); - RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), - _tuple_desc->slots().size())); + { + SCOPED_TIMER(_filter_timer); + RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block.get(), + _tuple_desc->slots().size())); + } counter->num_rows_unselected += old_rows - block->rows(); if (block->rows() == 0) { continue; @@ -318,6 +442,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promiseupdate_num_rows_load_filtered(counter.num_rows_filtered); _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); + COUNTER_UPDATE(_num_rows_filtered, counter.num_rows_unselected); // scanner is going to finish { diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h index 93a8916c0e..850c74db0c 100644 --- a/be/src/vec/exec/file_scan_node.h +++ b/be/src/vec/exec/file_scan_node.h @@ -29,6 +29,7 @@ #include "common/status.h" #include "exec/base_scanner.h" #include "exec/scan_node.h" +#include "exprs/runtime_filter.h" #include "gen_cpp/PaloInternalService_types.h" #include "runtime/descriptors.h" #include "vec/exec/file_scanner.h" @@ -88,6 +89,8 @@ private: // Scan one range Status scanner_scan(const TFileScanRange& scan_range, ScannerCounter* counter); + Status _acquire_and_build_runtime_filter(RuntimeState* state); + TupleId _tuple_id; RuntimeState* _runtime_state; TupleDescriptor* _tuple_desc; @@ -117,9 +120,29 @@ private: std::vector _pre_filter_texprs; RuntimeProfile::Counter* _wait_scanner_timer; + RuntimeProfile::Counter* _num_rows_filtered; + RuntimeProfile::Counter* _filter_timer; + RuntimeProfile::Counter* _num_scanners; std::deque> _block_queue; std::unique_ptr _mutable_block; + +protected: + struct RuntimeFilterContext { + RuntimeFilterContext() : apply_mark(false), runtimefilter(nullptr) {} + bool apply_mark; + IRuntimeFilter* runtimefilter; + }; + + const std::vector& runtime_filter_descs() const { + return _runtime_filter_descs; + } + std::vector _runtime_filter_descs; + std::vector _runtime_filter_ctxs; + std::vector _runtime_filter_ready_flag; + std::vector> _rf_locks; + std::map _conjunctid_to_runtime_filter_ctxs; + std::vector> _stale_vexpr_ctxs; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/file_scanner.h b/be/src/vec/exec/file_scanner.h index de0912061c..66940e8b37 100644 --- a/be/src/vec/exec/file_scanner.h +++ b/be/src/vec/exec/file_scanner.h @@ -26,6 +26,7 @@ #include "vec/exprs/vexpr_context.h" namespace doris::vectorized { +class FileScanNode; class FileScanner { public: FileScanner(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, @@ -48,6 +49,8 @@ public: // Close this scanner virtual void close() = 0; + std::vector* mutable_runtime_filter_marks() { return &_runtime_filter_marks; } + protected: virtual void _init_profiles(RuntimeProfile* profile) = 0; @@ -96,6 +99,11 @@ protected: // slot_ids for parquet predicate push down are in tuple desc TupleId _tupleId; + // to record which runtime filters have been used + std::vector _runtime_filter_marks; + + FileScanNode* _parent; + private: Status _init_expr_ctxes(); Status _filter_block(vectorized::Block* output_block); diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp index 883cbd5040..c7d95e45ab 100644 --- a/be/src/vec/exec/file_text_scanner.cpp +++ b/be/src/vec/exec/file_text_scanner.cpp @@ -101,8 +101,6 @@ Status FileTextScanner::get_next(Block* block, bool* eof) { const uint8_t* ptr = nullptr; size_t size = 0; RETURN_IF_ERROR(_cur_line_reader->read_line(&ptr, &size, &_cur_line_reader_eof)); - std::unique_ptr u_ptr; - u_ptr.reset(ptr); if (_skip_lines > 0) { _skip_lines--; continue; diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 7da3d86d1e..00d63173c8 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -132,7 +132,7 @@ Status VExprContext::filter_block(const std::unique_ptr& vexpr_ct } DCHECK((*vexpr_ctx_ptr) != nullptr); int result_column_id = -1; - (*vexpr_ctx_ptr)->execute(block, &result_column_id); + RETURN_IF_ERROR((*vexpr_ctx_ptr)->execute(block, &result_column_id)); return Block::filter_block(block, result_column_id, column_to_keep); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java index 40addddc43..1c7f8cf5f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java @@ -27,6 +27,7 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.catalog.Type; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.util.BitUtil; +import org.apache.doris.planner.external.ExternalFileScanNode; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TRuntimeFilterMode; @@ -321,7 +322,7 @@ public final class RuntimeFilterGenerator { * 2. Only olap scan nodes are supported: */ private void assignRuntimeFilters(ScanNode scanNode) { - if (!(scanNode instanceof OlapScanNode)) { + if (!(scanNode instanceof OlapScanNode) && !(scanNode instanceof ExternalFileScanNode)) { return; } TupleId tid = scanNode.getTupleIds().get(0); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java index 00051837ad..78ef63f257 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java @@ -55,6 +55,7 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.base.Joiner; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -63,7 +64,6 @@ import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.mortbay.log.Log; import java.io.IOException; import java.util.ArrayList; @@ -308,9 +308,9 @@ public class ExternalFileScanNode extends ExternalScanNode { TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath); curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc); - Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId() - + " with table split: " + fileSplit.getPath() - + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")"); + LOG.info("Assign to backend " + curLocations.getLocations().get(0).getBackendId() + " with table split: " + + fileSplit.getPath() + " ( " + fileSplit.getStart() + "," + fileSplit.getLength() + ")" + + " loaction: " + Joiner.on("|").join(split.getLocations())); fileSplitStrategy.update(fileSplit); // Add a new location when it's can be split