diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 9f692d0bee..44b655150a 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -464,6 +464,8 @@ void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { _rpc_channel_is_idle[id] = true; _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); } + std::queue, std::list>> empty; + swap(empty, _instance_to_broadcast_package_queue[id]); } template diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 8c0375499c..43fdc98d24 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -216,6 +216,10 @@ public: _finish_dependency = finish_dependency; } + void set_broadcast_dependency(std::shared_ptr broadcast_dependency) { + _broadcast_dependency = broadcast_dependency; + } + void set_should_stop() { _should_stop = true; _set_ready_to_finish(_busy_channels == 0); @@ -270,8 +274,9 @@ private: int64_t get_sum_rpc_time(); std::atomic _total_queue_size = 0; - std::shared_ptr _queue_dependency; - std::shared_ptr _finish_dependency; + std::shared_ptr _queue_dependency = nullptr; + std::shared_ptr _finish_dependency = nullptr; + std::shared_ptr _broadcast_dependency = nullptr; std::atomic _should_stop {false}; }; diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index ca50f7bd05..07c7130894 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -181,6 +181,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _broadcast_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "BroadcastDependency", true, state->get_query_ctx()); + _sink_buffer->set_broadcast_dependency(_broadcast_dependency); _broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency); for (int i = 0; i < config::num_broadcast_buffer; ++i) {