diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index b1099fb0c6..9ab9e0a520 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -1252,6 +1252,13 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } // read from scanner while (LIKELY(status.ok())) { + // When query cancel, _transfer_done is set to true at OlapScanNode::close, + // and the loop is exited at this time, and the current thread exits after + // waiting for _running_thread to decrease to 0. + if (UNLIKELY(_transfer_done)) { + LOG(INFO) << "Transfer thread cancelled, wait for the end of scan thread."; + break; + } int assigned_thread_num = 0; // copy to local { @@ -1361,6 +1368,15 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { } void OlapScanNode::scanner_thread(OlapScanner* scanner) { + if (UNLIKELY(_transfer_done)) { + _scanner_done = true; + std::unique_lock l(_scan_batches_lock); + _running_thread--; + _scan_batch_added_cv.notify_one(); + _scan_thread_exit_cv.notify_one(); + LOG(INFO) << "Scan thread cancelled, cause query done, scan thread started to exit"; + return; + } int64_t wait_time = scanner->update_wait_worker_timer(); // Do not use ScopedTimer. There is no guarantee that, the counter // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`. diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index e643e1b5a0..f0c6695b6c 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H #define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H +#include #include #include #include @@ -245,8 +246,9 @@ private: int _max_materialized_row_batches; bool _start; - bool _scanner_done; - bool _transfer_done; + // Used in Scan thread to ensure thread-safe + std::atomic_bool _scanner_done; + std::atomic_bool _transfer_done; size_t _direct_conjunct_size; int _total_assign_num;