diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1a512ec808..c1252f6d05 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -159,7 +159,7 @@ private: std::string _group; int _timeout_second; - bool _cancelled = false; + std::atomic _cancelled {false}; // This context is shared by all fragments of this host in a query std::shared_ptr _fragments_ctx; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index d21b4b4bf5..c471df3ee1 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -694,8 +694,9 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } // waiting for finished, it may take a long time, so we couldn't set a timeout - while (!_add_batches_finished && !_cancelled) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + bthread_usleep(1000); } _close_time_ms = UnixMillis() - _close_time_ms;