[pipeline](shuffle) Improve broadcast shuffle (#16779)
Now we reuse the buffer pool for broadcast shuffle on the pipeline engine. This PR ensures that a pipeline with a broadcast shuffle sink will not be scheduled if there is no available buffer in the buffer pool.
This commit is contained in:
@ -446,9 +446,14 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
|
||||
Status VDataStreamSender::open(RuntimeState* state) {
|
||||
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VDataStreamSender::open");
|
||||
DCHECK(state != nullptr);
|
||||
int local_size = 0;
|
||||
for (int i = 0; i < _channels.size(); ++i) {
|
||||
RETURN_IF_ERROR(_channels[i]->init(state));
|
||||
if (_channels[i]->is_local()) {
|
||||
local_size++;
|
||||
}
|
||||
}
|
||||
_only_local_exchange = local_size == _channels.size();
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
|
||||
|
||||
@ -463,13 +468,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
// 1. serialize depends on it is not local exchange
|
||||
// 2. send block
|
||||
// 3. rollover block
|
||||
int local_size = 0;
|
||||
for (auto channel : _channels) {
|
||||
if (channel->is_local()) {
|
||||
local_size++;
|
||||
}
|
||||
}
|
||||
if (local_size == _channels.size()) {
|
||||
if (_only_local_exchange) {
|
||||
for (auto channel : _channels) {
|
||||
RETURN_IF_ERROR(channel->send_local_block(block));
|
||||
}
|
||||
@ -641,25 +640,10 @@ void VDataStreamSender::_roll_pb_block() {
|
||||
}
|
||||
|
||||
Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) {
|
||||
constexpr int MAX_LOOP = 1000;
|
||||
|
||||
size_t it = 0;
|
||||
while (it < MAX_LOOP) {
|
||||
if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
|
||||
_broadcast_pb_block_idx = 0;
|
||||
}
|
||||
|
||||
for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
|
||||
if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
|
||||
_broadcast_pb_block_idx++;
|
||||
*holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1];
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
it++;
|
||||
}
|
||||
return Status::InternalError(
|
||||
"Exceed the max loop limit when acquire the next available buffer!");
|
||||
DCHECK(_broadcast_pb_blocks[_broadcast_pb_block_idx].available());
|
||||
*holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx];
|
||||
_broadcast_pb_block_idx++;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) {
|
||||
@ -669,12 +653,28 @@ void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) {
|
||||
}
|
||||
|
||||
bool VDataStreamSender::channel_all_can_write() {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->can_write()) {
|
||||
return false;
|
||||
if ((_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) &&
|
||||
!_only_local_exchange) {
|
||||
// This condition means we need use broadcast buffer, so we should make sure
|
||||
// there are available buffer before running pipeline
|
||||
if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
|
||||
_broadcast_pb_block_idx = 0;
|
||||
}
|
||||
|
||||
for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
|
||||
if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->can_write()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -207,6 +207,7 @@ protected:
|
||||
segment_v2::CompressionTypePB _compression_type;
|
||||
|
||||
bool _new_shuffle_hash_method = false;
|
||||
bool _only_local_exchange = false;
|
||||
};
|
||||
|
||||
class Channel {
|
||||
|
||||
Reference in New Issue
Block a user