From 5d80e7dc2fe7a9cc80cc26244146d5dda8070908 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 7 Nov 2023 22:11:44 +0800 Subject: [PATCH] [Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464) --- .../pipeline/exec/exchange_sink_operator.cpp | 16 +++---- be/src/pipeline/exec/exchange_sink_operator.h | 29 ++++++++++-- be/src/runtime/runtime_state.h | 4 ++ be/src/vec/runtime/vdata_stream_recvr.cpp | 46 +++++++++++++------ be/src/vec/runtime/vdata_stream_recvr.h | 22 +++++---- be/src/vec/sink/vdata_stream_sender.h | 12 ++--- 6 files changed, 85 insertions(+), 44 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index b5cceb5a29..e6bcccbabb 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -192,15 +192,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name); } else if (local_size > 0) { size_t dep_id = 0; - _channels_dependency.resize(local_size); + _local_channels_dependency.resize(local_size); _wait_channel_timer.resize(local_size); auto deps_for_channels = AndDependency::create_shared(_parent->operator_id()); for (auto channel : channels) { if (channel->is_local()) { - _channels_dependency[dep_id] = - ChannelDependency::create_shared(_parent->operator_id()); - channel->set_dependency(_channels_dependency[dep_id]); - deps_for_channels->add_child(_channels_dependency[dep_id]); + _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); + DCHECK(_local_channels_dependency[dep_id] != nullptr); + deps_for_channels->add_child(_local_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = ADD_CHILD_TIMER(_profile, "WaitForLocalExchangeBuffer", timer_name); dep_id++; @@ -428,7 +427,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec { SCOPED_TIMER(state.serialize_batch_timer()); dest->Clear(); - size_t uncompressed_bytes = 0, compressed_bytes = 0; + size_t uncompressed_bytes = 0; + size_t compressed_bytes = 0; RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, &uncompressed_bytes, &compressed_bytes, _compression_type, _transfer_large_data_by_brpc)); @@ -521,9 +521,9 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->write_watcher_elapse_time()); } - for (size_t i = 0; i < _channels_dependency.size(); i++) { + for (size_t i = 0; i < _local_channels_dependency.size(); i++) { COUNTER_UPDATE(_wait_channel_timer[i], - _channels_dependency[i]->write_watcher_elapse_time()); + _local_channels_dependency[i]->write_watcher_elapse_time()); } _sink_buffer->update_profile(profile()); _sink_buffer->close(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 815d393057..2b42d28958 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -113,11 +113,30 @@ private: std::atomic _available_block; }; -class ChannelDependency final : public WriteDependency { +/** + * We use this to control the execution for local exchange. + * +---------------+ +---------------+ +---------------+ + * | ExchangeSink1 | | ExchangeSink2 | | ExchangeSink3 | + * +---------------+ +---------------+ +---------------+ + * | | | + * | +----------------------------+----------------------------------+ | + * +----+------------------|------------------------------------------+ | | + * | | +------------------------|--------------------|------------+-----+ + * Dependency 1-1 | Dependency 2-1 | Dependency 3-1 | Dependency 1-2 | Dependency 2-2 | Dependency 3-2 | + * +----------------------------------------------+ +----------------------------------------------+ + * | queue1 queue2 queue3 | | queue1 queue2 queue3 | + * | LocalRecvr | | LocalRecvr | + * +----------------------------------------------+ +----------------------------------------------+ + * +-----------------+ +------------------+ + * | ExchangeSource1 | | ExchangeSource2 | + * +-----------------+ +------------------+ + */ +class LocalExchangeChannelDependency final : public WriteDependency { public: - ENABLE_FACTORY_CREATOR(ChannelDependency); - ChannelDependency(int id) : WriteDependency(id, "ChannelDependency") {} - ~ChannelDependency() override = default; + ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency); + LocalExchangeChannelDependency(int id) + : WriteDependency(id, "LocalExchangeChannelDependency") {} + ~LocalExchangeChannelDependency() override = default; void* shared_state() override { return nullptr; } }; @@ -209,7 +228,7 @@ private: std::shared_ptr _queue_dependency = nullptr; std::shared_ptr _exchange_sink_dependency = nullptr; std::shared_ptr _broadcast_dependency = nullptr; - std::vector> _channels_dependency; + std::vector> _local_channels_dependency; std::unique_ptr _partitioner; int _partition_count; }; diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 1c20725cd3..805951e34e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -329,6 +329,10 @@ public: return _query_options.__isset.enable_pipeline_engine && _query_options.enable_pipeline_engine; } + bool enable_pipeline_x_exec() const { + return _query_options.__isset.enable_pipeline_x_engine && + _query_options.enable_pipeline_x_engine; + } bool enable_local_shuffle() const { return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle; } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index b842cbeea4..9384d4abbf 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -100,7 +100,12 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block _recvr->update_blocks_memory_usage(-block_byte_size); _block_queue.pop_front(); if (_block_queue.size() == 0 && _dependency) { - _dependency->block_reading(); + if (!_is_cancelled && _num_remaining_senders > 0) { + _dependency->block_reading(); + } + for (auto& it : _local_channel_dependency) { + it->set_ready_for_write(); + } } if (!_pending_closures.empty()) { @@ -350,6 +355,14 @@ VDataStreamRecvr::VDataStreamRecvr( std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id)); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + if (state->enable_pipeline_x_exec()) { + _sender_to_local_channel_dependency.resize(num_senders); + for (size_t i = 0; i < num_senders; i++) { + _sender_to_local_channel_dependency[i] = + pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id); + } + } + // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; _sender_queues.reserve(num_queues); @@ -358,6 +371,16 @@ VDataStreamRecvr::VDataStreamRecvr( SenderQueue* queue = nullptr; if (_enable_pipeline) { queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile)); + if (state->enable_pipeline_x_exec()) { + auto dependencies = + is_merging + ? std::vector> {_sender_to_local_channel_dependency + [i]} + : _sender_to_local_channel_dependency; + queue->set_local_channel_dependency(dependencies); + } } else { queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile)); } @@ -424,11 +447,11 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) { return _sender_queues[use_sender_id]->queue_empty(); } -void VDataStreamRecvr::set_dependency(std::shared_ptr dependency) { - _dependency = dependency; - for (auto& queue : _sender_queues) { - queue->set_channel_dependency(dependency); - } +std::shared_ptr +VDataStreamRecvr::get_local_channel_dependency(int sender_id) { + DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id); + DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr); + return _sender_to_local_channel_dependency[sender_id]; } bool VDataStreamRecvr::ready_to_read() { @@ -482,13 +505,6 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); _blocks_memory_usage_current_value = _blocks_memory_usage->current_value(); - if (_dependency && size > 0 && - _blocks_memory_usage_current_value > config::exchg_node_buffer_size_bytes && !_is_closed) { - _dependency->block_writing(); - } else if (_dependency && size < 0 && - _blocks_memory_usage_current_value <= config::exchg_node_buffer_size_bytes) { - _dependency->set_ready_for_write(); - } } void VDataStreamRecvr::close() { @@ -496,8 +512,8 @@ void VDataStreamRecvr::close() { return; } _is_closed = true; - if (_dependency) { - _dependency->set_ready_for_write(); + for (auto& it : _sender_to_local_channel_dependency) { + it->set_ready_for_write(); } for (int i = 0; i < _sender_queues.size(); ++i) { _sender_queues[i]->close(); diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index b2fd6afdc7..2f5c88301e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -59,7 +59,7 @@ class RuntimeState; namespace pipeline { struct ExchangeDataDependency; -class ChannelDependency; +class LocalExchangeChannelDependency; } // namespace pipeline namespace vectorized { @@ -125,7 +125,8 @@ public: bool is_closed() const { return _is_closed; } - void set_dependency(std::shared_ptr dependency); + std::shared_ptr get_local_channel_dependency( + int sender_id); private: void update_blocks_memory_usage(int64_t size); @@ -183,7 +184,8 @@ private: std::shared_ptr _sub_plan_query_statistics_recvr; bool _enable_pipeline; - std::shared_ptr _dependency; + std::vector> + _sender_to_local_channel_dependency; }; class ThreadClosure : public google::protobuf::Closure { @@ -201,6 +203,12 @@ public: virtual ~SenderQueue(); + void set_local_channel_dependency( + std::vector>& + local_channel_dependency) { + _local_channel_dependency = local_channel_dependency; + } + virtual bool should_wait(); virtual Status get_batch(Block* next_block, bool* eos); @@ -225,10 +233,6 @@ public: _dependency = dependency; } - void set_channel_dependency(std::shared_ptr channel_dependency) { - _channel_dependency = channel_dependency; - } - protected: Status _inner_get_batch_without_lock(Block* block, bool* eos); @@ -251,7 +255,8 @@ protected: std::unordered_map> _local_closure; std::shared_ptr _dependency = nullptr; - std::shared_ptr _channel_dependency = nullptr; + std::vector> + _local_channel_dependency; }; class VDataStreamRecvr::PipSenderQueue : public SenderQueue { @@ -270,5 +275,6 @@ public: void add_block(Block* block, bool use_move) override; }; + } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index a09cb4b7d4..30a55f1329 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -65,7 +65,7 @@ enum CompressionTypePB : int; namespace pipeline { class ExchangeSinkOperator; class ExchangeSinkOperatorX; -class ChannelDependency; +class LocalExchangeChannelDependency; } // namespace pipeline namespace vectorized { @@ -310,11 +310,6 @@ public: bool is_local() const { return _is_local; } - VDataStreamRecvr* local_recvr() { - DCHECK(_is_local && _local_recvr != nullptr); - return _local_recvr.get(); - } - virtual void ch_roll_pb_block(); bool can_write() { @@ -557,11 +552,12 @@ public: return _closure.get(); } - void set_dependency(std::shared_ptr dependency) { + std::shared_ptr get_local_channel_dependency() { if (!Channel::_local_recvr) { throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null"); } - Channel::_local_recvr->set_dependency(dependency); + return Channel::_local_recvr->get_local_channel_dependency( + Channel::_parent->sender_id()); } private: