[pipelineX](broadcast) Set dependency ready if a limited exchange returns EOS (#33525)

This commit is contained in:
Gabriel
2024-04-12 16:22:35 +08:00
committed by yiguolei
parent 379c2e0762
commit 1a8b1e6787
3 changed files with 10 additions and 2 deletions

View File

@ -464,6 +464,8 @@ void ExchangeSinkBuffer<Parent>::_set_receiver_eof(InstanceLoId id) {
_rpc_channel_is_idle[id] = true;
_set_ready_to_finish(_busy_channels.fetch_sub(1) == 1);
}
std::queue<BroadcastTransmitInfo<Parent>, std::list<BroadcastTransmitInfo<Parent>>> empty;
swap(empty, _instance_to_broadcast_package_queue[id]);
}
template <typename Parent>

View File

@ -216,6 +216,10 @@ public:
_finish_dependency = finish_dependency;
}
void set_broadcast_dependency(std::shared_ptr<Dependency> broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
}
void set_should_stop() {
_should_stop = true;
_set_ready_to_finish(_busy_channels == 0);
@ -270,8 +274,9 @@ private:
int64_t get_sum_rpc_time();
std::atomic<int> _total_queue_size = 0;
std::shared_ptr<Dependency> _queue_dependency;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
std::atomic<bool> _should_stop {false};
};

View File

@ -181,6 +181,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_broadcast_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true, state->get_query_ctx());
_sink_buffer->set_broadcast_dependency(_broadcast_dependency);
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
for (int i = 0; i < config::num_broadcast_buffer; ++i) {