[improvement](scan) remove concurrency limit if scan has predicate (#13021)

If a scan node has a predicate, we cannot limit the concurrency of its scanners,
because we don't know how much data needs to be scanned.
Limiting the concurrency in that case can make the query very slow.

For example:
select * from tbl limit 1, the concurrency will be 1;
select * from tbl where k1=1 limit 1, the concurrency will not be limited.
This commit is contained in:
Mingyu Chen
2022-09-28 17:07:07 +08:00
committed by GitHub
parent 28ce1878ca
commit cd549d8a8f
2 changed files with 42 additions and 30 deletions

View File

@ -616,6 +616,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
_set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@ -636,36 +637,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
}
fragments_ctx->timeout_second = params.query_options.query_timeout;
#ifndef BE_TEST
// set thread token
// the thread token will be set if
// 1. the cpu_limit is set, or
// 2. the limit is very small ( < 1024)
int concurrency = 1;
bool is_serial = false;
if (params.query_options.__isset.resource_limit &&
params.query_options.resource_limit.__isset.cpu_limit) {
concurrency = params.query_options.resource_limit.cpu_limit;
} else {
concurrency = config::doris_scanner_thread_pool_thread_num;
}
if (params.__isset.fragment && params.fragment.__isset.plan &&
params.fragment.plan.nodes.size() > 0) {
for (auto& node : params.fragment.plan.nodes) {
// Only for SCAN NODE
if (!_is_scan_node(node.node_type)) {
continue;
}
if (node.limit > 0 && node.limit < 1024) {
concurrency = 1;
is_serial = true;
break;
}
}
}
fragments_ctx->set_thread_token(concurrency, is_serial);
#endif
_set_scan_concurrency(params, fragments_ctx.get());
{
// Find _fragments_ctx_map again, in case some other request has already
@ -723,6 +695,43 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
return Status::OK();
}
// Picks the scanner concurrency for a query and hands it to
// fragments_ctx->set_thread_token().
//
// The base concurrency is the query's resource_limit.cpu_limit when that
// option is set, otherwise config::doris_scanner_thread_pool_thread_num.
// The scan is forced to be serial (concurrency 1) as soon as the fragment's
// plan contains a scan node that
//   * has no conjuncts (no where predicate), and
//   * has a limit in the open range (0, 1024).
// Scan nodes with conjuncts never trigger the serial mode, because the amount
// of data they have to read cannot be derived from the limit alone.
//
// The whole body is compiled out when BE_TEST is defined.
void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params,
                                        QueryFragmentsCtx* fragments_ctx) {
#ifndef BE_TEST
    const auto& query_options = params.query_options;
    const bool has_cpu_limit = query_options.__isset.resource_limit &&
                               query_options.resource_limit.__isset.cpu_limit;
    int thread_num = has_cpu_limit ? query_options.resource_limit.cpu_limit
                                   : config::doris_scanner_thread_pool_thread_num;
    bool serial_scan = false;

    if (params.__isset.fragment && params.fragment.__isset.plan) {
        for (const auto& plan_node : params.fragment.plan.nodes) {
            // Non-scan nodes are irrelevant; scan nodes with a where predicate
            // must keep the full concurrency.
            const bool has_predicate =
                    plan_node.__isset.conjuncts && !plan_node.conjuncts.empty();
            if (!_is_scan_node(plan_node.node_type) || has_predicate) {
                continue;
            }
            const bool small_limit = plan_node.limit > 0 && plan_node.limit < 1024;
            if (small_limit) {
                // One predicate-free scan with a tiny limit is enough to make
                // the scan serial; no need to inspect further nodes.
                thread_num = 1;
                serial_scan = true;
                break;
            }
        }
    }

    fragments_ctx->set_thread_token(thread_num, serial_scan);
#endif
}
bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE ||
type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE ||

View File

@ -96,6 +96,9 @@ public:
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
// Computes the scanner concurrency for the query described by `params` and
// passes it to fragments_ctx->set_thread_token(). The concurrency comes from
// the query's cpu_limit if set, otherwise from the scanner thread pool config;
// it is reduced to 1 (serial) when a scan node without conjuncts has a limit
// in (0, 1024). Does nothing when compiled with BE_TEST.
void _set_scan_concurrency(const TExecPlanFragmentParams& params,
QueryFragmentsCtx* fragments_ctx);
// Returns true if `type` is one of the scan node types
// (e.g. OLAP, MySQL, schema or meta scan node).
bool _is_scan_node(const TPlanNodeType::type& type);
// This is input params