From ed3420000eae78e4e1942758fe2189d80ff08dcd Mon Sep 17 00:00:00 2001 From: YueW <45946325+Tanya-W@users.noreply.github.com> Date: Tue, 14 Feb 2023 00:08:57 +0800 Subject: [PATCH] [fix](bthread) fix bthread hang (#16594) --- be/src/runtime/fragment_mgr.cpp | 2 +- be/src/vec/sink/vtablet_sink.cpp | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1a512ec808..c1252f6d05 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -159,7 +159,7 @@ private: std::string _group; int _timeout_second; - bool _cancelled = false; + std::atomic _cancelled {false}; // This context is shared by all fragments of this host in a query std::shared_ptr _fragments_ctx; diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index d21b4b4bf5..c471df3ee1 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -694,8 +694,9 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } // waiting for finished, it may take a long time, so we couldn't set a timeout - while (!_add_batches_finished && !_cancelled) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + bthread_usleep(1000); } _close_time_ms = UnixMillis() - _close_time_ms;