From dd06cc7609960cfdf34c9c505a9bb660b0c4ecc3 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 15 Feb 2023 22:03:27 +0800 Subject: [PATCH] [pipeline](shuffle) Improve broadcast shuffle (#16779) We now reuse the buffer pool for broadcast shuffle on the pipeline engine. This PR ensures that a pipeline with a broadcast shuffle sink will not be scheduled if there is no available buffer in the buffer pool. --- be/src/vec/sink/vdata_stream_sender.cpp | 60 ++++++++++++------------- be/src/vec/sink/vdata_stream_sender.h | 1 + 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index be18cb931a..ee55a2c8b7 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -446,9 +446,14 @@ Status VDataStreamSender::prepare(RuntimeState* state) { Status VDataStreamSender::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "VDataStreamSender::open"); DCHECK(state != nullptr); + int local_size = 0; for (int i = 0; i < _channels.size(); ++i) { RETURN_IF_ERROR(_channels[i]->init(state)); + if (_channels[i]->is_local()) { + local_size++; + } } + _only_local_exchange = local_size == _channels.size(); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state)); @@ -463,13 +468,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { // 1. serialize depends on it is not local exchange // 2. send block // 3. 
rollover block - int local_size = 0; - for (auto channel : _channels) { - if (channel->is_local()) { - local_size++; - } - } - if (local_size == _channels.size()) { + if (_only_local_exchange) { for (auto channel : _channels) { RETURN_IF_ERROR(channel->send_local_block(block)); } @@ -641,25 +640,10 @@ void VDataStreamSender::_roll_pb_block() { } Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) { - constexpr int MAX_LOOP = 1000; - - size_t it = 0; - while (it < MAX_LOOP) { - if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { - _broadcast_pb_block_idx = 0; - } - - for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { - if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { - _broadcast_pb_block_idx++; - *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1]; - return Status::OK(); - } - } - it++; - } - return Status::InternalError( - "Exceed the max loop limit when acquire the next available buffer!"); + DCHECK(_broadcast_pb_blocks[_broadcast_pb_block_idx].available()); + *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx]; + _broadcast_pb_block_idx++; + return Status::OK(); } void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { @@ -669,12 +653,28 @@ void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { } bool VDataStreamSender::channel_all_can_write() { - for (auto channel : _channels) { - if (!channel->can_write()) { - return false; + if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) && + !_only_local_exchange) { + // This condition means we need use broadcast buffer, so we should make sure + // there are available buffer before running pipeline + if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { + _broadcast_pb_block_idx = 0; } + + for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { + if 
(_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { + return true; + } + } + return false; + } else { + for (auto channel : _channels) { + if (!channel->can_write()) { + return false; + } + } + return true; } - return true; } } // namespace doris::vectorized diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 412ed9f546..5b2da70ce1 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -207,6 +207,7 @@ protected: segment_v2::CompressionTypePB _compression_type; bool _new_shuffle_hash_method = false; + bool _only_local_exchange = false; }; class Channel {