From efd2bdb2034affdf1892e7c88e72d78118d2411d Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Mon, 12 Sep 2022 10:22:54 +0800
Subject: [PATCH] [improvement](new-scan) avoid too many scanner context
 scheduling (#12491)

When selecting a large amount of data from a table, the profile shows:

- ScannerCtxSchedCount: 2.82664M (2826640)

But ScannerSchedCount is only 8, so most of those context schedulings are
just busy-running. After this improvement, ScannerCtxSchedCount is reduced
to only 10.
---
 be/src/vec/exec/scan/scanner_context.cpp      | 36 ++++++++++++-------
 be/src/vec/exec/scan/scanner_context.h        |  8 +++--
 be/src/vec/exec/scan/scanner_scheduler.cpp    | 20 +++++------
 .../rollup/test_materialized_view_date.groovy |  3 ++
 4 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index cfe15a0ab1..f3db5049bf 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -108,6 +108,16 @@ void ScannerContext::append_blocks_to_queue(const std::vector
     std::unique_lock<std::mutex> l(_transfer_lock);
+    // Normally, the scanner scheduler schedules the ctx.
+    // But when the amount of data in the blocks queue exceeds the upper limit,
+    // the scheduler stops scheduling it
+    // (if the scheduler kept scheduling it, it would cause a lot of busy-running).
+    // At this point, the consumer must trigger a new round of scheduling to ensure
+    // that data can keep being fetched.
+    if (_has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
+        _num_scheduling_ctx++;
+        _state->exec_env()->scanner_scheduler()->submit(this);
+    }
     // Wait for block from queue
     while (_process_status.ok() && !_is_finished && blocks_queue.empty()) {
         if (_state->is_cancelled()) {
@@ -170,6 +180,9 @@ void ScannerContext::clear_and_join() {
     // So that we can make sure to close all scanners.
     _close_and_clear_scanners();
 
+    COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
+    COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
+
     std::for_each(blocks_queue.begin(), blocks_queue.end(),
                   std::default_delete<vectorized::Block>());
     std::for_each(_free_blocks.begin(), _free_blocks.end(),
@@ -189,8 +202,7 @@ std::string ScannerContext::debug_string() {
                        _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
 }
 
-void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* scheduler,
-                                                       VScanner* scanner) {
+void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     {
         std::unique_lock<std::mutex> l(_scanners_lock);
         _scanners.push_front(scanner);
@@ -199,22 +211,20 @@
     std::lock_guard<std::mutex> l(_transfer_lock);
     _num_running_scanners--;
     _num_scheduling_ctx++;
-    scheduler->submit(this);
+    _state->exec_env()->scanner_scheduler()->submit(this);
     if (scanner->need_to_close() && (--_num_unfinished_scanners) == 0) {
         _is_finished = true;
-        COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
-        COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
         _blocks_queue_added_cv.notify_one();
     }
     _ctx_finish_cv.notify_one();
 }
 
-bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
+void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
     // 1. Calculate how many scanners should be scheduled at this run.
     int thread_slot_num = 0;
     {
         std::unique_lock<std::mutex> l(_transfer_lock);
-        if (_cur_bytes_in_queue < _max_bytes_in_queue / 2) {
+        if (_has_enough_space_in_blocks_queue()) {
             // If there are enough space in blocks queue,
             // the scanner number depends on the _free_blocks numbers
             std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -225,10 +235,12 @@ bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
                 thread_slot_num = 1;
             }
         } else {
-            // The blocks queue reaches limit, so no more scanners will be scheduled this time.
-            // We need to return false so that the scanner scheduler will push bash this ctx
-            // to the scheduling queue, waiting next scheduling.
-            return false;
+            // The blocks queue has reached its limit, so just return to stop scheduling.
+            // There are two cases:
+            // 1. There are running scanners: these scanners will reschedule the ctx when they finish.
+            // 2. No running scanners: the consumer (ScanNode::get_next()) will reschedule the ctx.
+            // In both cases, there is no need to reschedule the ctx here, so just return.
+            return;
         }
     }
 
@@ -247,7 +259,7 @@ bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
             }
         }
     }
-    return true;
+    return;
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 9bac5e3517..ba6dc65918 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -79,7 +79,7 @@ public:
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(ScannerScheduler* scheduler, VScanner* scanner);
+    void push_back_scanner_and_reschedule(VScanner* scanner);
 
     bool set_status_on_error(const Status& status);
 
@@ -111,7 +111,7 @@ public:
         _ctx_finish_cv.notify_one();
     }
 
-    bool get_next_batch_of_scanners(std::list<VScanner*>* current_run);
+    void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
 
     void clear_and_join();
 
@@ -135,6 +135,10 @@ public:
 private:
     Status _close_and_clear_scanners();
 
+    inline bool _has_enough_space_in_blocks_queue() {
+        return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
+    }
+
 private:
     RuntimeState* _state;
     VScanNode* _parent;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 8246180ec4..7108143967 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -99,17 +99,15 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
     }
 
     std::list<VScanner*> this_run;
-    bool res = ctx->get_next_batch_of_scanners(&this_run);
+    ctx->get_next_batch_of_scanners(&this_run);
     if (this_run.empty()) {
-        if (!res) {
-            // This means we failed to choose scanners this time, and there may be no other scanners running.
-            // So we need to submit this ctx back to queue to be scheduled next time.
-            submit(ctx);
-        } else {
-            // No need to push back this ctx to reschedule
-            // There will be running scanners to push it back.
-            ctx->update_num_running(0, -1);
-        }
+        // There are 2 cases when this_run is empty:
+        // 1. The blocks queue has reached its limit.
+        //    The consumer will continue scheduling the ctx.
+        // 2. All scanners are running.
+        //    The running scanners will reschedule the ctx after they finish.
+        // So here we just return to stop scheduling the ctx.
+        ctx->update_num_running(0, -1);
         return;
     }
 
@@ -264,7 +262,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
         scanner->mark_to_need_to_close();
     }
 
-    ctx->push_back_scanner_and_reschedule(scheduler, scanner);
+    ctx->push_back_scanner_and_reschedule(scanner);
 }
 
 } // namespace doris::vectorized
diff --git a/regression-test/suites/rollup/test_materialized_view_date.groovy b/regression-test/suites/rollup/test_materialized_view_date.groovy
index eedc68c339..ce2d4cb44a 100644
--- a/regression-test/suites/rollup/test_materialized_view_date.groovy
+++ b/regression-test/suites/rollup/test_materialized_view_date.groovy
@@ -51,6 +51,7 @@ suite("test_materialized_view_date", "rollup") {
             }
         }
     }
+    Thread.sleep(2)
    max_try_secs = 60
    sql "CREATE materialized VIEW amt_max2 AS SELECT store_id, max(sale_datetime1) FROM ${tbName1} GROUP BY store_id;"
    while (max_try_secs--) {
@@ -65,6 +66,7 @@
            }
        }
    }
+    Thread.sleep(2)
    max_try_secs = 60
    sql "CREATE materialized VIEW amt_max3 AS SELECT store_id, max(sale_datetime2) FROM ${tbName1} GROUP BY store_id;"
    while (max_try_secs--) {
@@ -79,6 +81,7 @@
            }
        }
    }
+    Thread.sleep(2)
    max_try_secs = 60
    sql "CREATE materialized VIEW amt_max4 AS SELECT store_id, max(sale_datetime3) FROM ${tbName1} GROUP BY store_id;"
    while (max_try_secs--) {
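
The core of the change, condensed outside the diff: the scheduler no longer re-submits
the ctx on its own when the blocks queue is over its limit. Instead, the ctx is
re-submitted either by the last running scanner (push_back_scanner_and_reschedule) or,
when no scanner is running and the queue has drained below half of _max_bytes_in_queue,
by the consumer side in scanner_context.cpp. Below is a minimal standalone sketch of
that consumer-side decision; CtxState and consumer_should_resubmit are invented names
for illustration only, not the Doris API.

    // Illustrative sketch only; CtxState and consumer_should_resubmit are
    // invented names, not part of the Doris code base.
    #include <iostream>

    struct CtxState {
        long cur_bytes_in_queue;   // bytes currently buffered in the blocks queue
        long max_bytes_in_queue;   // upper limit of the blocks queue
        int  num_running_scanners; // scanners currently executing
    };

    // Mirrors _has_enough_space_in_blocks_queue() added in scanner_context.h.
    bool has_enough_space(const CtxState& s) {
        return s.cur_bytes_in_queue < s.max_bytes_in_queue / 2;
    }

    // The consumer-side check added in scanner_context.cpp: only re-submit the
    // ctx when the queue has drained and no running scanner is left to do it.
    bool consumer_should_resubmit(const CtxState& s) {
        return has_enough_space(s) && s.num_running_scanners == 0;
    }

    int main() {
        CtxState over_limit{900, 1024, 0}; // queue still full: do not re-submit yet
        CtxState drained{100, 1024, 0};    // drained and idle: consumer re-submits the ctx
        CtxState busy{100, 1024, 4};       // scanners running: they re-submit the ctx themselves
        std::cout << consumer_should_resubmit(over_limit) << " "
                  << consumer_should_resubmit(drained) << " "
                  << consumer_should_resubmit(busy) << std::endl; // prints: 0 1 0
        return 0;
    }

Under this rule the ctx is only re-submitted when it can actually make progress, which
is why ScannerCtxSchedCount drops to only about 10 in the example from the commit
message while ScannerSchedCount stays unchanged.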