[Bug-fix] When query cancel, transfer_thread does not continue to schedule scanner_thread (#5768)
The cause of the problem is that after query cancel, OlapScanNode::transfer_thread still continues to schedule OlapScanNode::scanner_thread until all tasks are scheduled. Although each task does not scan data and exits quickly, it still consumes a lot of resources. (Guess)This may be the cause of the BUG (#5767) causing the I/O to be full. So after query cancel, immediately exit the scheduling loop in transfer_thread, and after waiting for the end of all scanner_threads, transfer_thread will also exit.
This commit is contained in:
@ -1252,6 +1252,13 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
|
||||
}
|
||||
// read from scanner
|
||||
while (LIKELY(status.ok())) {
|
||||
// When query cancel, _transfer_done is set to true at OlapScanNode::close,
|
||||
// and the loop is exited at this time, and the current thread exits after
|
||||
// waiting for _running_thread to decrease to 0.
|
||||
if (UNLIKELY(_transfer_done)) {
|
||||
LOG(INFO) << "Transfer thread cancelled, wait for the end of scan thread.";
|
||||
break;
|
||||
}
|
||||
int assigned_thread_num = 0;
|
||||
// copy to local
|
||||
{
|
||||
@ -1361,6 +1368,15 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
|
||||
}
|
||||
|
||||
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
|
||||
if (UNLIKELY(_transfer_done)) {
|
||||
_scanner_done = true;
|
||||
std::unique_lock<std::mutex> l(_scan_batches_lock);
|
||||
_running_thread--;
|
||||
_scan_batch_added_cv.notify_one();
|
||||
_scan_thread_exit_cv.notify_one();
|
||||
LOG(INFO) << "Scan thread cancelled, cause query done, scan thread started to exit";
|
||||
return;
|
||||
}
|
||||
int64_t wait_time = scanner->update_wait_worker_timer();
|
||||
// Do not use ScopedTimer. There is no guarantee that, the counter
|
||||
// (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`.
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
#ifndef DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H
|
||||
#define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H
|
||||
|
||||
#include <atomic>
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/variant/static_visitor.hpp>
|
||||
#include <condition_variable>
|
||||
@ -245,8 +246,9 @@ private:
|
||||
|
||||
int _max_materialized_row_batches;
|
||||
bool _start;
|
||||
bool _scanner_done;
|
||||
bool _transfer_done;
|
||||
// Used in Scan thread to ensure thread-safe
|
||||
std::atomic_bool _scanner_done;
|
||||
std::atomic_bool _transfer_done;
|
||||
size_t _direct_conjunct_size;
|
||||
|
||||
int _total_assign_num;
|
||||
|
||||
Reference in New Issue
Block a user