diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 214581f696..7b5c3af432 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -616,6 +616,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi BackendOptions::get_localhost()); } fragments_ctx = search->second; + _set_scan_concurrency(params, fragments_ctx.get()); } else { // This may be a first fragment request of the query. // Create the query fragments context. @@ -636,36 +637,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } fragments_ctx->timeout_second = params.query_options.query_timeout; - -#ifndef BE_TEST - // set thread token - // the thread token will be set if - // 1. the cpu_limit is set, or - // 2. the limit is very small ( < 1024) - int concurrency = 1; - bool is_serial = false; - if (params.query_options.__isset.resource_limit && - params.query_options.resource_limit.__isset.cpu_limit) { - concurrency = params.query_options.resource_limit.cpu_limit; - } else { - concurrency = config::doris_scanner_thread_pool_thread_num; - } - if (params.__isset.fragment && params.fragment.__isset.plan && - params.fragment.plan.nodes.size() > 0) { - for (auto& node : params.fragment.plan.nodes) { - // Only for SCAN NODE - if (!_is_scan_node(node.node_type)) { - continue; - } - if (node.limit > 0 && node.limit < 1024) { - concurrency = 1; - is_serial = true; - break; - } - } - } - fragments_ctx->set_thread_token(concurrency, is_serial); -#endif + _set_scan_concurrency(params, fragments_ctx.get()); { // Find _fragments_ctx_map again, in case some other request has already @@ -723,6 +695,43 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi return Status::OK(); } +void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, + QueryFragmentsCtx* fragments_ctx) { +#ifndef BE_TEST + // set thread token + // the thread token will be set if + // 1. the cpu_limit is set, or + // 2. the limit is very small ( < 1024) + int concurrency = 1; + bool is_serial = false; + if (params.query_options.__isset.resource_limit && + params.query_options.resource_limit.__isset.cpu_limit) { + concurrency = params.query_options.resource_limit.cpu_limit; + } else { + concurrency = config::doris_scanner_thread_pool_thread_num; + } + if (params.__isset.fragment && params.fragment.__isset.plan && + params.fragment.plan.nodes.size() > 0) { + for (auto& node : params.fragment.plan.nodes) { + // Only for SCAN NODE + if (!_is_scan_node(node.node_type)) { + continue; + } + if (node.__isset.conjuncts && !node.conjuncts.empty()) { + // If the scan node has where predicate, do not set concurrency + continue; + } + if (node.limit > 0 && node.limit < 1024) { + concurrency = 1; + is_serial = true; + break; + } + } + } + fragments_ctx->set_thread_token(concurrency, is_serial); +#endif +} + bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE || type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index f3341c0795..66b06540d4 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -96,6 +96,9 @@ public: private: void _exec_actual(std::shared_ptr exec_state, FinishCallback cb); + void _set_scan_concurrency(const TExecPlanFragmentParams& params, + QueryFragmentsCtx* fragments_ctx); + bool _is_scan_node(const TPlanNodeType::type& type); // This is input params