@ -170,19 +170,20 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
|
||||
_part_type = p._part_type;
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
|
||||
int local_size = 0;
|
||||
for (int i = 0; i < channels.size(); ++i) {
|
||||
RETURN_IF_ERROR(channels[i]->open(state));
|
||||
if (channels[i]->is_local()) {
|
||||
local_size++;
|
||||
}
|
||||
}
|
||||
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
|
||||
_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
|
||||
std::random_device rd;
|
||||
std::mt19937 g(rd());
|
||||
shuffle(channels.begin(), channels.end(), g);
|
||||
}
|
||||
int local_size = 0;
|
||||
for (int i = 0; i < channels.size(); ++i) {
|
||||
RETURN_IF_ERROR(channels[i]->open(state));
|
||||
if (channels[i]->is_local()) {
|
||||
local_size++;
|
||||
_last_local_channel_idx = i;
|
||||
}
|
||||
}
|
||||
only_local_exchange = local_size == channels.size();
|
||||
|
||||
PUniqueId id;
|
||||
@ -446,11 +447,17 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
|
||||
if (local_state.only_local_exchange) {
|
||||
if (!block->empty()) {
|
||||
Status status;
|
||||
size_t idx = 0;
|
||||
for (auto* channel : local_state.channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
status = channel->send_local_block(block);
|
||||
// If this channel is the last, we can move this block to downstream pipeline.
|
||||
// Otherwise, this block also needs to be broadcast to other channels, so it must be copied.
|
||||
DCHECK_GE(local_state._last_local_channel_idx, 0);
|
||||
status = channel->send_local_block(
|
||||
block, idx == local_state._last_local_channel_idx);
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -471,21 +478,33 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
|
||||
} else {
|
||||
block_holder->get_block()->Clear();
|
||||
}
|
||||
size_t idx = 0;
|
||||
bool moved = false;
|
||||
for (auto* channel : local_state.channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
Status status;
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&cur_block);
|
||||
// If this channel is the last, we can move this block to downstream pipeline.
|
||||
// Otherwise, this block also needs to be broadcast to other channels, so it must be copied.
|
||||
DCHECK_GE(local_state._last_local_channel_idx, 0);
|
||||
status = channel->send_local_block(
|
||||
&cur_block, idx == local_state._last_local_channel_idx);
|
||||
moved = idx == local_state._last_local_channel_idx;
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_broadcast_block(block_holder, eos);
|
||||
}
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
idx++;
|
||||
}
|
||||
if (moved) {
|
||||
local_state._serializer.reset_block();
|
||||
} else {
|
||||
cur_block.clear_column_data();
|
||||
local_state._serializer.get_block()->set_mutable_columns(
|
||||
cur_block.mutate_columns());
|
||||
}
|
||||
cur_block.clear_column_data();
|
||||
local_state._serializer.get_block()->set_mutable_columns(
|
||||
cur_block.mutate_columns());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -496,7 +515,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
|
||||
if (!current_channel->is_receiver_eof()) {
|
||||
// 2. serialize, send and rollover block
|
||||
if (current_channel->is_local()) {
|
||||
auto status = current_channel->send_local_block(block);
|
||||
auto status = current_channel->send_local_block(block, true);
|
||||
HANDLE_CHANNEL_STATUS(state, current_channel, status);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
@ -582,7 +601,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
|
||||
if (!current_channel->is_receiver_eof()) {
|
||||
// 2. serialize, send and rollover block
|
||||
if (current_channel->is_local()) {
|
||||
auto status = current_channel->send_local_block(block);
|
||||
auto status = current_channel->send_local_block(block, true);
|
||||
HANDLE_CHANNEL_STATUS(state, current_channel, status);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
|
||||
@ -234,6 +234,7 @@ private:
|
||||
// for external table sink hash partition
|
||||
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
|
||||
std::atomic<bool> _reach_limit = false;
|
||||
int _last_local_channel_idx = -1;
|
||||
};
|
||||
|
||||
class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
|
||||
|
||||
@ -210,7 +210,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
status = channel->send_local_block(_output_block.get());
|
||||
status = channel->send_local_block(_output_block.get(), false);
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
@ -234,7 +234,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&cur_block);
|
||||
status = channel->send_local_block(&cur_block, false);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_broadcast_block(_block_holder, true);
|
||||
|
||||
@ -27,6 +27,11 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
|
||||
SCOPED_TIMER(_init_timer);
|
||||
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
|
||||
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
|
||||
if (_parent->cast<LocalExchangeSinkOperatorX>()._type == ExchangeType::HASH_SHUFFLE) {
|
||||
_profile->add_info_string(
|
||||
"UseGlobalShuffle",
|
||||
std::to_string(_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle));
|
||||
}
|
||||
_channel_id = info.task_idx;
|
||||
return Status::OK();
|
||||
}
|
||||
@ -61,10 +66,12 @@ Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_statu
|
||||
std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const {
|
||||
fmt::memory_buffer debug_string_buffer;
|
||||
fmt::format_to(debug_string_buffer,
|
||||
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
|
||||
"{}, _use_global_shuffle: {}, _channel_id: {}, _num_partitions: {}, "
|
||||
"_num_senders: {}, _num_sources: {}, "
|
||||
"_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}",
|
||||
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
|
||||
_exchanger->_num_senders, _exchanger->_num_sources,
|
||||
Base::debug_string(indentation_level),
|
||||
_parent->cast<LocalExchangeSinkOperatorX>()._use_global_shuffle, _channel_id,
|
||||
_exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources,
|
||||
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
|
||||
_release_count);
|
||||
return fmt::to_string(debug_string_buffer);
|
||||
@ -76,6 +83,7 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets
|
||||
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
|
||||
_type = type;
|
||||
if (_type == ExchangeType::HASH_SHUFFLE) {
|
||||
_use_global_shuffle = should_disable_bucket_shuffle;
|
||||
// For shuffle join, if data distribution has been broken by previous operator, we
|
||||
// should use a HASH_SHUFFLE local exchanger to shuffle data again. Note that
|
||||
// we should use map shuffle idx to instance idx because all instances will be
|
||||
|
||||
@ -125,6 +125,7 @@ private:
|
||||
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
|
||||
const std::map<int, int> _bucket_seq_to_instance_idx;
|
||||
std::vector<std::pair<int, int>> _shuffle_idx_to_instance_idx;
|
||||
bool _use_global_shuffle = false;
|
||||
};
|
||||
|
||||
} // namespace doris::pipeline
|
||||
|
||||
@ -231,7 +231,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
|
||||
}
|
||||
|
||||
template <typename Parent>
|
||||
Status Channel<Parent>::send_local_block(Block* block) {
|
||||
Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
|
||||
SCOPED_TIMER(_parent->local_send_timer());
|
||||
if (_recvr_is_valid()) {
|
||||
if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
|
||||
@ -239,7 +239,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
|
||||
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
|
||||
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
|
||||
}
|
||||
_local_recvr->add_block(block, _parent->sender_id(), false);
|
||||
_local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
|
||||
return Status::OK();
|
||||
} else {
|
||||
return _receiver_status;
|
||||
@ -646,7 +646,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
Status status;
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
status = channel->send_local_block(block);
|
||||
status = channel->send_local_block(block, false);
|
||||
HANDLE_CHANNEL_STATUS(state, channel, status);
|
||||
}
|
||||
}
|
||||
@ -671,7 +671,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&cur_block);
|
||||
status = channel->send_local_block(&cur_block, false);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_broadcast_block(block_holder, eos);
|
||||
@ -698,7 +698,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&cur_block);
|
||||
status = channel->send_local_block(&cur_block, false);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_remote_block(_cur_pb_block, false);
|
||||
@ -717,7 +717,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
|
||||
if (!current_channel->is_receiver_eof()) {
|
||||
// 2. serialize, send and rollover block
|
||||
if (current_channel->is_local()) {
|
||||
auto status = current_channel->send_local_block(block);
|
||||
auto status = current_channel->send_local_block(block, false);
|
||||
HANDLE_CHANNEL_STATUS(state, current_channel, status);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
@ -829,7 +829,7 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
|
||||
for (auto channel : _channels) {
|
||||
if (!channel->is_receiver_eof()) {
|
||||
if (channel->is_local()) {
|
||||
status = channel->send_local_block(&block);
|
||||
status = channel->send_local_block(&block, false);
|
||||
} else {
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
status = channel->send_remote_block(_cur_pb_block, false);
|
||||
|
||||
@ -307,7 +307,7 @@ public:
|
||||
|
||||
Status send_local_block(Status exec_status, bool eos = false);
|
||||
|
||||
Status send_local_block(Block* block);
|
||||
Status send_local_block(Block* block, bool can_be_moved);
|
||||
// Flush buffered rows and close channel. This function doesn't wait for the response
|
||||
// of the close operation; the client should call close_wait() to finish the channel's close.
|
||||
// We split one close operation into two phases in order to make multiple channels
|
||||
|
||||
Reference in New Issue
Block a user