diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index c0a471327d..45e106039f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -29,7 +29,13 @@ namespace doris::vectorized { Status ScannerContext::init() { _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc; - // 1. Calculate how many blocks need to be preallocated. + // 1. Calculate max concurrency + // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4 + // should find a more reasonable value. + _max_thread_num = + std::min(config::doris_scanner_thread_pool_thread_num / 4, (int32_t)_scanners.size()); + + // 2. Calculate how many blocks need to be preallocated. // The calculation logic is as follows: // 1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num), // then figure out how many blocks are required for one scan(_block_per_scanner). @@ -41,9 +47,7 @@ Status ScannerContext::init() { int real_block_size = limit == -1 ? _state->batch_size() : std::min(static_cast(_state->batch_size()), limit); _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size; - auto pre_alloc_block_count = - std::min((int32_t)_scanners.size(), config::doris_scanner_thread_pool_thread_num) * - _block_per_scanner; + auto pre_alloc_block_count = _max_thread_num * _block_per_scanner; // The free blocks is used for final output block of scanners. // So use _output_tuple_desc; @@ -52,15 +56,6 @@ Status ScannerContext::init() { _free_blocks.emplace_back(block); } - // 2. Calculate max concurrency - _max_thread_num = config::doris_scanner_thread_pool_thread_num; - if (config::doris_scanner_row_num > _state->batch_size()) { - _max_thread_num /= config::doris_scanner_row_num / _state->batch_size(); - if (_max_thread_num <= 0) { - _max_thread_num = 1; - } - } - #ifndef BE_TEST // 3. get thread token thread_token = _state->get_query_fragments_ctx()->get_token(); @@ -72,7 +67,7 @@ Status ScannerContext::init() { _num_unfinished_scanners = _scanners.size(); - COUNTER_SET(_parent->_pre_alloc_free_blocks_num, (int64_t)_free_blocks.size()); + COUNTER_SET(_parent->_pre_alloc_free_blocks_num, (int64_t)pre_alloc_block_count); COUNTER_SET(_parent->_max_scanner_thread_num, (int64_t)_max_thread_num); return Status::OK(); @@ -223,6 +218,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) { if (scanner->need_to_close() && scanner->set_counted_down() && (--_num_unfinished_scanners) == 0) { _is_finished = true; + // ATTN: this 2 counters will be set at close() again, which is the final values. + // But we set them here because the counter set at close() can not send to FE's profile. + // So we set them here, and the counter value may be little less than final values. + COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling); + COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling); _blocks_queue_added_cv.notify_one(); } _ctx_finish_cv.notify_one();