[improvement](new-scan) avoid too many scanner context scheduling (#12491)
When selecting a large number of rows from a table, the profile showed: - ScannerCtxSchedCount: 2.82664M (2,826,640). But there were only 8 ScannerSchedCount events — most of the context schedulings were busy-running without doing useful work. After this improvement, ScannerCtxSchedCount is reduced to only 10.
This commit is contained in:
@ -108,6 +108,16 @@ void ScannerContext::append_blocks_to_queue(const std::vector<vectorized::Block*
|
||||
|
||||
Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos) {
|
||||
std::unique_lock<std::mutex> l(_transfer_lock);
|
||||
// Normally, the scanner scheduler will schedule ctx.
|
||||
// But when the amount of data in the blocks queue exceeds the upper limit,
|
||||
// the scheduler will stop scheduling.
|
||||
// (if the scheduler continues to schedule, it will cause a lot of busy running).
|
||||
// At this point, consumers are required to trigger new scheduling to ensure that
|
||||
// data can be continuously fetched.
|
||||
if (_has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
|
||||
_num_scheduling_ctx++;
|
||||
_state->exec_env()->scanner_scheduler()->submit(this);
|
||||
}
|
||||
// Wait for block from queue
|
||||
while (_process_status.ok() && !_is_finished && blocks_queue.empty()) {
|
||||
if (_state->is_cancelled()) {
|
||||
@ -170,6 +180,9 @@ void ScannerContext::clear_and_join() {
|
||||
// So that we can make sure to close all scanners.
|
||||
_close_and_clear_scanners();
|
||||
|
||||
COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
|
||||
COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
|
||||
|
||||
std::for_each(blocks_queue.begin(), blocks_queue.end(),
|
||||
std::default_delete<vectorized::Block>());
|
||||
std::for_each(_free_blocks.begin(), _free_blocks.end(),
|
||||
@ -189,8 +202,7 @@ std::string ScannerContext::debug_string() {
|
||||
_max_thread_num, _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue);
|
||||
}
|
||||
|
||||
void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* scheduler,
|
||||
VScanner* scanner) {
|
||||
void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_scanners_lock);
|
||||
_scanners.push_front(scanner);
|
||||
@ -199,22 +211,20 @@ void ScannerContext::push_back_scanner_and_reschedule(ScannerScheduler* schedule
|
||||
std::lock_guard<std::mutex> l(_transfer_lock);
|
||||
_num_running_scanners--;
|
||||
_num_scheduling_ctx++;
|
||||
scheduler->submit(this);
|
||||
_state->exec_env()->scanner_scheduler()->submit(this);
|
||||
if (scanner->need_to_close() && (--_num_unfinished_scanners) == 0) {
|
||||
_is_finished = true;
|
||||
COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
|
||||
COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
|
||||
_blocks_queue_added_cv.notify_one();
|
||||
}
|
||||
_ctx_finish_cv.notify_one();
|
||||
}
|
||||
|
||||
bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
|
||||
void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_run) {
|
||||
// 1. Calculate how many scanners should be scheduled at this run.
|
||||
int thread_slot_num = 0;
|
||||
{
|
||||
std::unique_lock<std::mutex> l(_transfer_lock);
|
||||
if (_cur_bytes_in_queue < _max_bytes_in_queue / 2) {
|
||||
if (_has_enough_space_in_blocks_queue()) {
|
||||
// If there are enough space in blocks queue,
|
||||
// the scanner number depends on the _free_blocks numbers
|
||||
std::lock_guard<std::mutex> l(_free_blocks_lock);
|
||||
@ -225,10 +235,12 @@ bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
|
||||
thread_slot_num = 1;
|
||||
}
|
||||
} else {
|
||||
// The blocks queue reaches limit, so no more scanners will be scheduled this time.
|
||||
// We need to return false so that the scanner scheduler will push bash this ctx
|
||||
// to the scheduling queue, waiting next scheduling.
|
||||
return false;
|
||||
// The blocks queue has reached its limit, so just return to stop scheduling.
|
||||
// There will be two cases:
|
||||
// 1. There are running scanners; these scanners will continue to schedule the ctx.
|
||||
// 2. There are no running scanners; the consumer (ScanNode::get_next()) will continue scheduling the ctx.
|
||||
// In both cases, we do not need to continue to schedule ctx here. So just return
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@ -247,7 +259,7 @@ bool ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
return;
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -79,7 +79,7 @@ public:
|
||||
|
||||
// When a scanner complete a scan, this method will be called
|
||||
// to return the scanner to the list for next scheduling.
|
||||
void push_back_scanner_and_reschedule(ScannerScheduler* scheduler, VScanner* scanner);
|
||||
void push_back_scanner_and_reschedule(VScanner* scanner);
|
||||
|
||||
bool set_status_on_error(const Status& status);
|
||||
|
||||
@ -111,7 +111,7 @@ public:
|
||||
_ctx_finish_cv.notify_one();
|
||||
}
|
||||
|
||||
bool get_next_batch_of_scanners(std::list<VScanner*>* current_run);
|
||||
void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
|
||||
|
||||
void clear_and_join();
|
||||
|
||||
@ -135,6 +135,10 @@ public:
|
||||
private:
|
||||
Status _close_and_clear_scanners();
|
||||
|
||||
inline bool _has_enough_space_in_blocks_queue() {
|
||||
return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
|
||||
}
|
||||
|
||||
private:
|
||||
RuntimeState* _state;
|
||||
VScanNode* _parent;
|
||||
|
||||
@ -99,17 +99,15 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
|
||||
}
|
||||
|
||||
std::list<VScanner*> this_run;
|
||||
bool res = ctx->get_next_batch_of_scanners(&this_run);
|
||||
ctx->get_next_batch_of_scanners(&this_run);
|
||||
if (this_run.empty()) {
|
||||
if (!res) {
|
||||
// This means we failed to choose scanners this time, and there may be no other scanners running.
|
||||
// So we need to submit this ctx back to queue to be scheduled next time.
|
||||
submit(ctx);
|
||||
} else {
|
||||
// No need to push back this ctx to reschedule
|
||||
// There will be running scanners to push it back.
|
||||
ctx->update_num_running(0, -1);
|
||||
}
|
||||
// There will be 2 cases when this_run is empty:
|
||||
// 1. The blocks queue reaches limit.
|
||||
// The consumer will continue scheduling the ctx.
|
||||
// 2. All scanners are running.
|
||||
// The running scanners will schedule the ctx after they finish.
|
||||
// So here we just return to stop scheduling ctx.
|
||||
ctx->update_num_running(0, -1);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -264,7 +262,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
|
||||
scanner->mark_to_need_to_close();
|
||||
}
|
||||
|
||||
ctx->push_back_scanner_and_reschedule(scheduler, scanner);
|
||||
ctx->push_back_scanner_and_reschedule(scanner);
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -51,6 +51,7 @@ suite("test_materialized_view_date", "rollup") {
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(2)
|
||||
max_try_secs = 60
|
||||
sql "CREATE materialized VIEW amt_max2 AS SELECT store_id, max(sale_datetime1) FROM ${tbName1} GROUP BY store_id;"
|
||||
while (max_try_secs--) {
|
||||
@ -65,6 +66,7 @@ suite("test_materialized_view_date", "rollup") {
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(2)
|
||||
max_try_secs = 60
|
||||
sql "CREATE materialized VIEW amt_max3 AS SELECT store_id, max(sale_datetime2) FROM ${tbName1} GROUP BY store_id;"
|
||||
while (max_try_secs--) {
|
||||
@ -79,6 +81,7 @@ suite("test_materialized_view_date", "rollup") {
|
||||
}
|
||||
}
|
||||
}
|
||||
Thread.sleep(2)
|
||||
max_try_secs = 60
|
||||
sql "CREATE materialized VIEW amt_max4 AS SELECT store_id, max(sale_datetime3) FROM ${tbName1} GROUP BY store_id;"
|
||||
while (max_try_secs--) {
|
||||
|
||||
Reference in New Issue
Block a user