diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 47ead19529..1b00293258 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -200,8 +200,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf auto deps_for_channels = AndDependency::create_shared(_parent->id()); for (auto channel : channels) { if (channel->is_local()) { - _channels_dependency[dep_id] = ChannelDependency::create_shared( - _parent->id(), _sender_id, channel->local_recvr()); + _channels_dependency[dep_id] = ChannelDependency::create_shared(_parent->id()); channel->set_dependency(_channels_dependency[dep_id]); deps_for_channels->add_child(_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 2c899ec49a..db83a71097 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -116,38 +116,10 @@ private: class ChannelDependency final : public WriteDependency { public: ENABLE_FACTORY_CREATOR(ChannelDependency); - ChannelDependency(int id, int sender_id, vectorized::VDataStreamRecvr* local_recvr) - : WriteDependency(id, "ChannelDependency"), - _sender_id(sender_id), - _local_recvr(local_recvr) {} + ChannelDependency(int id) : WriteDependency(id, "ChannelDependency") {} ~ChannelDependency() override = default; void* shared_state() override { return nullptr; } - - void try_set_ready_for_write() { - if (_ready_for_write) { - return; - } - if (_is_runnable()) { - _write_dependency_watcher.stop(); - _ready_for_write = true; - } - } - - void try_block_writing() { - if (!_is_runnable()) { - _ready_for_write = false; - } - } - -private: - bool _is_runnable() { - return _local_recvr->is_closed() || !_local_recvr->exceeds_limit(0) || - _local_recvr->sender_queue_empty(_sender_id); - } - - int _sender_id; - vectorized::VDataStreamRecvr* _local_recvr; }; class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b65f8f6bfe..dd3e1cee29 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -96,9 +96,6 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block auto [next_block, block_byte_size] = std::move(_block_queue.front()); _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); - if (_channel_dependency) { - _channel_dependency->try_set_ready_for_write(); - } if (_block_queue.size() == 0 && _dependency) { _dependency->block_reading(); } @@ -172,9 +169,6 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num if (!empty) { _block_queue.emplace_back(std::move(block), block_byte_size); - if (_channel_dependency) { - _channel_dependency->try_block_writing(); - } if (_dependency) { _dependency->set_ready_for_read(); } @@ -231,9 +225,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { if (!empty) { _block_queue.emplace_back(std::move(nblock), block_mem_size); - if (_channel_dependency) { - _channel_dependency->try_block_writing(); - } if (_dependency) { _dependency->set_ready_for_read(); } @@ -467,10 +458,12 @@ void VDataStreamRecvr::cancel_stream() { void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); - if (_dependency && _blocks_memory_usage_current_value > config::exchg_node_buffer_size_bytes) { - _dependency->try_block_writing(); - } else if (_dependency) { - _dependency->try_set_ready_for_write(); + if (_dependency && size > 0 && + _blocks_memory_usage_current_value > config::exchg_node_buffer_size_bytes && !_is_closed) { + _dependency->block_writing(); + } else if (_dependency && size < 0 && + _blocks_memory_usage_current_value <= config::exchg_node_buffer_size_bytes) { + _dependency->set_ready_for_write(); } } @@ -480,7 +473,7 @@ void VDataStreamRecvr::close() { } _is_closed = true; if (_dependency) { - _dependency->try_set_ready_for_write(); + _dependency->set_ready_for_write(); } for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->close(); @@ -527,9 +520,6 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) { return; } _block_queue.emplace_back(std::move(nblock), block_mem_size); - if (_channel_dependency) { - _channel_dependency->try_block_writing(); - } if (_dependency) { _dependency->set_ready_for_read(); }