diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index ea8eb9046d..e1bd3651db 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -773,43 +773,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, QueryFragmentsCtx* fragments_ctx) { #ifndef BE_TEST - // set thread token - // the thread token will be set if - // 1. the cpu_limit is set, or - // 2. the limit is very small ( < 1024) // If the token is set, the scan task will use limited_scan_pool in scanner scheduler. // Otherwise, the scan task will use local/remote scan pool in scanner scheduler - int concurrency = 1; - bool is_serial = false; - bool need_token = false; if (params.query_options.__isset.resource_limit && params.query_options.resource_limit.__isset.cpu_limit) { - concurrency = params.query_options.resource_limit.cpu_limit; - need_token = true; - } else { - concurrency = config::doris_scanner_thread_pool_thread_num; - } - if (params.__isset.fragment && params.fragment.__isset.plan && - params.fragment.plan.nodes.size() > 0) { - for (auto& node : params.fragment.plan.nodes) { - // Only for SCAN NODE - if (!_is_scan_node(node.node_type)) { - continue; - } - if (node.__isset.conjuncts && !node.conjuncts.empty()) { - // If the scan node has where predicate, do not set concurrency - continue; - } - if (node.limit > 0 && node.limit < 1024) { - concurrency = 1; - is_serial = true; - need_token = true; - break; - } - } - } - if (need_token) { - fragments_ctx->set_thread_token(concurrency, is_serial); + fragments_ctx->set_thread_token(params.query_options.resource_limit.cpu_limit, false); } #endif } diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index edf7c61c60..25a23f603c 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -36,6 +36,10 @@ Status ScannerContext::init() { // should find a more reasonable value. _max_thread_num = std::min(config::doris_scanner_thread_pool_thread_num / 4, (int32_t)_scanners.size()); + // For select * from table limit 10; should just use one thread. + if (_parent->should_run_serial()) { + _max_thread_num = 1; + } // 2. Calculate how many blocks need to be preallocated. // The calculation logic is as follows: diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 7de79247b6..085d647262 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -274,7 +274,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext // judge if we need to yield. So we record all raw data read in this round // scan, if this exceeds row number or bytes threshold, we yield this thread. std::vector blocks; - int64_t raw_rows_read = scanner->raw_rows_read(); + int64_t raw_rows_read = scanner->get_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; int64_t raw_bytes_read = 0; int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; @@ -332,7 +332,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext blocks.push_back(block); } } - raw_rows_read = scanner->raw_rows_read(); + raw_rows_read = scanner->get_rows_read(); } // end for while // if we failed, check status. diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 2ac36d35b7..c8d7269087 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -49,7 +49,14 @@ struct FilterPredicates { class VScanNode : public ExecNode { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {} + : ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) { + if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) { + // Which means the request could be fullfilled in a single segment iterator request. + if (tnode.limit > 0 && tnode.limit < 1024) { + _should_run_serial = true; + } + } + } virtual ~VScanNode() = default; friend class VScanner; @@ -95,6 +102,8 @@ public: Status try_close(); + bool should_run_serial() const { return _should_run_serial; } + enum class PushDownType { // The predicate can not be pushed down to data source UNACCEPTABLE, @@ -243,6 +252,9 @@ protected: bool _need_agg_finalize = true; bool _blocked_by_rf = false; + // If the query like select * from table limit 10; then the query should run in + // single scanner to avoid too many scanners which will cause lots of useless read. + bool _should_run_serial = false; // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector // so that it will be destroyed uniformly at the end of the query. diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index dfb260de50..f3c2180b43 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -37,7 +37,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { // only empty block should be here DCHECK(block->rows() == 0); SCOPED_RAW_TIMER(&_per_scanner_timer); - int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + int64_t rows_read_threshold = _num_rows_read + config::doris_scanner_row_num; if (!block->mem_reuse()) { for (const auto slot_desc : _output_tuple_desc->slots()) { if (!slot_desc->need_materialize()) { @@ -71,7 +71,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) { _num_rows_return += block->rows(); } } while (!state->is_cancelled() && block->rows() == 0 && !(*eof) && - raw_rows_read() < raw_rows_threshold); + _num_rows_read < rows_read_threshold); } if (state->is_cancelled()) { diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index a91b87c0b5..8c01727ba8 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -47,9 +47,6 @@ public: virtual Status close(RuntimeState* state); - // Subclass must implement this to return the current rows read - virtual int64_t raw_rows_read() { return 0; } - protected: // Subclass should implement this to return data. virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;