From 53505e903baeccea576873c3ce2e20e2bf298d15 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Mon, 8 Apr 2024 11:22:55 +0800 Subject: [PATCH] [scan](improvement) Adjust parallel scanners num (#33223) --- be/src/pipeline/exec/file_scan_operator.h | 2 ++ be/src/pipeline/exec/scan_operator.cpp | 7 ++++++- be/src/pipeline/exec/scan_operator.h | 2 ++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index 4e64bd850b..4d0c38b285 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -79,6 +79,8 @@ public: Status prepare(RuntimeState* state) override; + bool is_file_scan_operator() const override { return true; } + private: friend class FileScanLocalState; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 4218ac1308..0107f57bd2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1218,7 +1218,12 @@ Status ScanLocalState::_start_scanners( _scanner_ctx = PipXScannerContext::create_shared( state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(), state()->scan_queue_mem_limit(), _scan_dependency, - p.ignore_data_distribution() ? 1 : state()->query_parallel_instance_num()); + // 1. If data distribution is ignored , we use 1 instance to scan. + // 2. Else if this operator is not file scan operator, we use config::doris_scanner_thread_pool_thread_num scanners to scan. + // 3. Else, file scanner will consume much memory so we use config::doris_scanner_thread_pool_thread_num / query_parallel_instance_num scanners to scan. + p.ignore_data_distribution() || !p.is_file_scan_operator() + ? 1 + : state()->query_parallel_instance_num()); return Status::OK(); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index e941f8ce96..a741893a74 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -394,6 +394,8 @@ public: } [[nodiscard]] bool is_source() const override { return true; } + [[nodiscard]] virtual bool is_file_scan_operator() const { return false; } + const std::vector& runtime_filter_descs() override { return _runtime_filter_descs; }