From cd549d8a8f50a01f356c4f7883f8a6f2cbf43c50 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 28 Sep 2022 17:07:07 +0800 Subject: [PATCH] [improvement](scan) remove concurrency limit if scan has predicate (#13021) If a scan node has a predicate, we cannot limit the concurrency of its scanners, because we don't know how much data needs to be scanned. If we limit the concurrency, this will cause the query to be very slow. For example: for select * from tbl limit 1, the concurrency will be 1; for select * from tbl where k1=1 limit 1, the concurrency will not be limited. --- be/src/runtime/fragment_mgr.cpp | 69 +++++++++++++++++++-------------- be/src/runtime/fragment_mgr.h | 3 ++ 2 files changed, 42 insertions(+), 30 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 214581f696..7b5c3af432 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -616,6 +616,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi BackendOptions::get_localhost()); } fragments_ctx = search->second; + _set_scan_concurrency(params, fragments_ctx.get()); } else { // This may be a first fragment request of the query. // Create the query fragments context. @@ -636,36 +637,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } fragments_ctx->timeout_second = params.query_options.query_timeout; - -#ifndef BE_TEST - // set thread token - // the thread token will be set if - // 1. the cpu_limit is set, or - // 2. 
the limit is very small ( < 1024) - int concurrency = 1; - bool is_serial = false; - if (params.query_options.__isset.resource_limit && - params.query_options.resource_limit.__isset.cpu_limit) { - concurrency = params.query_options.resource_limit.cpu_limit; - } else { - concurrency = config::doris_scanner_thread_pool_thread_num; - } - if (params.__isset.fragment && params.fragment.__isset.plan && - params.fragment.plan.nodes.size() > 0) { - for (auto& node : params.fragment.plan.nodes) { - // Only for SCAN NODE - if (!_is_scan_node(node.node_type)) { - continue; - } - if (node.limit > 0 && node.limit < 1024) { - concurrency = 1; - is_serial = true; - break; - } - } - } - fragments_ctx->set_thread_token(concurrency, is_serial); -#endif + _set_scan_concurrency(params, fragments_ctx.get()); { // Find _fragments_ctx_map again, in case some other request has already @@ -723,6 +695,43 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi return Status::OK(); } +void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, + QueryFragmentsCtx* fragments_ctx) { +#ifndef BE_TEST + // set thread token + // the thread token will be set if + // 1. the cpu_limit is set, or + // 2. 
the limit is very small ( < 1024) + int concurrency = 1; + bool is_serial = false; + if (params.query_options.__isset.resource_limit && + params.query_options.resource_limit.__isset.cpu_limit) { + concurrency = params.query_options.resource_limit.cpu_limit; + } else { + concurrency = config::doris_scanner_thread_pool_thread_num; + } + if (params.__isset.fragment && params.fragment.__isset.plan && + params.fragment.plan.nodes.size() > 0) { + for (auto& node : params.fragment.plan.nodes) { + // Only for SCAN NODE + if (!_is_scan_node(node.node_type)) { + continue; + } + if (node.__isset.conjuncts && !node.conjuncts.empty()) { + // If the scan node has where predicate, do not set concurrency + continue; + } + if (node.limit > 0 && node.limit < 1024) { + concurrency = 1; + is_serial = true; + break; + } + } + } + fragments_ctx->set_thread_token(concurrency, is_serial); +#endif +} + bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE || type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index f3341c0795..66b06540d4 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -96,6 +96,9 @@ public: private: void _exec_actual(std::shared_ptr exec_state, FinishCallback cb); + void _set_scan_concurrency(const TExecPlanFragmentParams& params, + QueryFragmentsCtx* fragments_ctx); + bool _is_scan_node(const TPlanNodeType::type& type); // This is input params