From bf61d2cfc08c55d276c6978ff149fa9edf9ac712 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sun, 9 Jul 2023 16:27:11 +0800 Subject: [PATCH] [fix](sink) fix pipeline load stuck #21636 --- be/src/vec/sink/vtablet_sink.cpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index cd8716abcb..da9e5af5e2 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -109,6 +109,7 @@ public: void Run() override { SCOPED_TRACK_MEMORY_TO_UNKNOWN(); + DCHECK(packet_in_flight); auto open_partition_failed = [this]() { std::stringstream ss; ss << "failed to open partition, error=" << berror(this->cntl.ErrorCode()) @@ -128,7 +129,7 @@ public: open_partition_failed(); } } - done = true; + packet_in_flight = false; } void join() { brpc::Join(cntl.call_id()); } @@ -138,7 +139,7 @@ public: VNodeChannel* vnode_channel; IndexChannel* index_channel; int64_t partition_id; - std::atomic done {false}; + std::atomic packet_in_flight {false}; }; IndexChannel::~IndexChannel() {} @@ -549,6 +550,7 @@ void VNodeChannel::open_partition(int64_t partition_id) { remain_ms = config::min_load_rpc_timeout_ms; } open_partition_closure->cntl.set_timeout_ms(remain_ms); + open_partition_closure->packet_in_flight = true; _stub->open_partition(&open_partition_closure.get()->cntl, &request, &open_partition_closure.get()->result, open_partition_closure.get()); @@ -565,7 +567,7 @@ void VNodeChannel::open_partition_wait() { bool VNodeChannel::open_partition_finished() const { for (auto& open_partition_closure : _open_partition_closures) { - if (!open_partition_closure->done) { + if (open_partition_closure->packet_in_flight) { return false; } } @@ -914,12 +916,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) { } bool VNodeChannel::is_send_data_rpc_done() const { - if (_add_block_closure != nullptr) { - return _add_batches_finished || (_cancelled && !_add_block_closure->is_packet_in_flight()); - } else { - // such as, canceled before open_wait new closure. - return _add_batches_finished || _cancelled; - } + return _add_batches_finished || _cancelled; } Status VNodeChannel::close_wait(RuntimeState* state) { @@ -1498,8 +1495,9 @@ void VOlapTableSink::try_close(RuntimeState* state, Status exec_status) { } bool VOlapTableSink::is_close_done() { - if (config::enable_lazy_open_partition && !_open_partition_done) { - return false; + // Only after try_close, need to wait rpc end. + if (!_try_close) { + return true; } bool close_done = true; for (const auto& index_channel : _channels) {