[Improvement](pipelineX) Improve local exchange on pipelineX engine (#26464)

This commit is contained in:
Gabriel
2023-11-07 22:11:44 +08:00
committed by GitHub
parent ceccc451fa
commit 5d80e7dc2f
6 changed files with 85 additions and 44 deletions

View File

@ -192,15 +192,14 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
} else if (local_size > 0) {
size_t dep_id = 0;
_channels_dependency.resize(local_size);
_local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(_parent->operator_id());
for (auto channel : channels) {
if (channel->is_local()) {
_channels_dependency[dep_id] =
ChannelDependency::create_shared(_parent->operator_id());
channel->set_dependency(_channels_dependency[dep_id]);
deps_for_channels->add_child(_channels_dependency[dep_id]);
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] =
ADD_CHILD_TIMER(_profile, "WaitForLocalExchangeBuffer", timer_name);
dep_id++;
@ -428,7 +427,8 @@ Status ExchangeSinkOperatorX::serialize_block(ExchangeSinkLocalState& state, vec
{
SCOPED_TIMER(state.serialize_batch_timer());
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(_state->be_exec_version(), dest, &uncompressed_bytes,
&compressed_bytes, _compression_type,
_transfer_large_data_by_brpc));
@ -521,9 +521,9 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer,
_broadcast_dependency->write_watcher_elapse_time());
}
for (size_t i = 0; i < _channels_dependency.size(); i++) {
for (size_t i = 0; i < _local_channels_dependency.size(); i++) {
COUNTER_UPDATE(_wait_channel_timer[i],
_channels_dependency[i]->write_watcher_elapse_time());
_local_channels_dependency[i]->write_watcher_elapse_time());
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();

View File

@ -113,11 +113,30 @@ private:
std::atomic<int> _available_block;
};
class ChannelDependency final : public WriteDependency {
/**
* We use this to control the execution for local exchange.
* +---------------+ +---------------+ +---------------+
* | ExchangeSink1 | | ExchangeSink2 | | ExchangeSink3 |
* +---------------+ +---------------+ +---------------+
* | | |
* | +----------------------------+----------------------------------+ |
* +----+------------------|------------------------------------------+ | |
* | | +------------------------|--------------------|------------+-----+
* Dependency 1-1 | Dependency 2-1 | Dependency 3-1 | Dependency 1-2 | Dependency 2-2 | Dependency 3-2 |
* +----------------------------------------------+ +----------------------------------------------+
* | queue1 queue2 queue3 | | queue1 queue2 queue3 |
* | LocalRecvr | | LocalRecvr |
* +----------------------------------------------+ +----------------------------------------------+
* +-----------------+ +------------------+
* | ExchangeSource1 | | ExchangeSource2 |
* +-----------------+ +------------------+
*/
class LocalExchangeChannelDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ChannelDependency);
ChannelDependency(int id) : WriteDependency(id, "ChannelDependency") {}
~ChannelDependency() override = default;
ENABLE_FACTORY_CREATOR(LocalExchangeChannelDependency);
LocalExchangeChannelDependency(int id)
: WriteDependency(id, "LocalExchangeChannelDependency") {}
~LocalExchangeChannelDependency() override = default;
void* shared_state() override { return nullptr; }
};
@ -209,7 +228,7 @@ private:
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency = nullptr;
std::shared_ptr<AndDependency> _exchange_sink_dependency = nullptr;
std::shared_ptr<BroadcastDependency> _broadcast_dependency = nullptr;
std::vector<std::shared_ptr<ChannelDependency>> _channels_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
};

View File

@ -329,6 +329,10 @@ public:
return _query_options.__isset.enable_pipeline_engine &&
_query_options.enable_pipeline_engine;
}
bool enable_pipeline_x_exec() const {
return _query_options.__isset.enable_pipeline_x_engine &&
_query_options.enable_pipeline_x_engine;
}
bool enable_local_shuffle() const {
return _query_options.__isset.enable_local_shuffle && _query_options.enable_local_shuffle;
}

View File

@ -100,7 +100,12 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
_recvr->update_blocks_memory_usage(-block_byte_size);
_block_queue.pop_front();
if (_block_queue.size() == 0 && _dependency) {
_dependency->block_reading();
if (!_is_cancelled && _num_remaining_senders > 0) {
_dependency->block_reading();
}
for (auto& it : _local_channel_dependency) {
it->set_ready_for_write();
}
}
if (!_pending_closures.empty()) {
@ -350,6 +355,14 @@ VDataStreamRecvr::VDataStreamRecvr(
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (state->enable_pipeline_x_exec()) {
_sender_to_local_channel_dependency.resize(num_senders);
for (size_t i = 0; i < num_senders; i++) {
_sender_to_local_channel_dependency[i] =
pipeline::LocalExchangeChannelDependency::create_shared(_dest_node_id);
}
}
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
_sender_queues.reserve(num_queues);
@ -358,6 +371,16 @@ VDataStreamRecvr::VDataStreamRecvr(
SenderQueue* queue = nullptr;
if (_enable_pipeline) {
queue = _sender_queue_pool.add(new PipSenderQueue(this, num_sender_per_queue, profile));
if (state->enable_pipeline_x_exec()) {
auto dependencies =
is_merging
? std::vector<std::shared_ptr<
pipeline::
LocalExchangeChannelDependency>> {_sender_to_local_channel_dependency
[i]}
: _sender_to_local_channel_dependency;
queue->set_local_channel_dependency(dependencies);
}
} else {
queue = _sender_queue_pool.add(new SenderQueue(this, num_sender_per_queue, profile));
}
@ -424,11 +447,11 @@ bool VDataStreamRecvr::sender_queue_empty(int sender_id) {
return _sender_queues[use_sender_id]->queue_empty();
}
void VDataStreamRecvr::set_dependency(std::shared_ptr<pipeline::ChannelDependency> dependency) {
_dependency = dependency;
for (auto& queue : _sender_queues) {
queue->set_channel_dependency(dependency);
}
std::shared_ptr<pipeline::LocalExchangeChannelDependency>
VDataStreamRecvr::get_local_channel_dependency(int sender_id) {
DCHECK_GT(_sender_to_local_channel_dependency.size(), sender_id);
DCHECK(_sender_to_local_channel_dependency[sender_id] != nullptr);
return _sender_to_local_channel_dependency[sender_id];
}
bool VDataStreamRecvr::ready_to_read() {
@ -482,13 +505,6 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
_blocks_memory_usage_current_value = _blocks_memory_usage->current_value();
if (_dependency && size > 0 &&
_blocks_memory_usage_current_value > config::exchg_node_buffer_size_bytes && !_is_closed) {
_dependency->block_writing();
} else if (_dependency && size < 0 &&
_blocks_memory_usage_current_value <= config::exchg_node_buffer_size_bytes) {
_dependency->set_ready_for_write();
}
}
void VDataStreamRecvr::close() {
@ -496,8 +512,8 @@ void VDataStreamRecvr::close() {
return;
}
_is_closed = true;
if (_dependency) {
_dependency->set_ready_for_write();
for (auto& it : _sender_to_local_channel_dependency) {
it->set_ready_for_write();
}
for (int i = 0; i < _sender_queues.size(); ++i) {
_sender_queues[i]->close();

View File

@ -59,7 +59,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class ChannelDependency;
class LocalExchangeChannelDependency;
} // namespace pipeline
namespace vectorized {
@ -125,7 +125,8 @@ public:
bool is_closed() const { return _is_closed; }
void set_dependency(std::shared_ptr<pipeline::ChannelDependency> dependency);
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency(
int sender_id);
private:
void update_blocks_memory_usage(int64_t size);
@ -183,7 +184,8 @@ private:
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
bool _enable_pipeline;
std::shared_ptr<pipeline::ChannelDependency> _dependency;
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;
};
class ThreadClosure : public google::protobuf::Closure {
@ -201,6 +203,12 @@ public:
virtual ~SenderQueue();
void set_local_channel_dependency(
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>&
local_channel_dependency) {
_local_channel_dependency = local_channel_dependency;
}
virtual bool should_wait();
virtual Status get_batch(Block* next_block, bool* eos);
@ -225,10 +233,6 @@ public:
_dependency = dependency;
}
void set_channel_dependency(std::shared_ptr<pipeline::ChannelDependency> channel_dependency) {
_channel_dependency = channel_dependency;
}
protected:
Status _inner_get_batch_without_lock(Block* block, bool* eos);
@ -251,7 +255,8 @@ protected:
std::unordered_map<std::thread::id, std::unique_ptr<ThreadClosure>> _local_closure;
std::shared_ptr<pipeline::ExchangeDataDependency> _dependency = nullptr;
std::shared_ptr<pipeline::ChannelDependency> _channel_dependency = nullptr;
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_local_channel_dependency;
};
class VDataStreamRecvr::PipSenderQueue : public SenderQueue {
@ -270,5 +275,6 @@ public:
void add_block(Block* block, bool use_move) override;
};
} // namespace vectorized
} // namespace doris

View File

@ -65,7 +65,7 @@ enum CompressionTypePB : int;
namespace pipeline {
class ExchangeSinkOperator;
class ExchangeSinkOperatorX;
class ChannelDependency;
class LocalExchangeChannelDependency;
} // namespace pipeline
namespace vectorized {
@ -310,11 +310,6 @@ public:
bool is_local() const { return _is_local; }
VDataStreamRecvr* local_recvr() {
DCHECK(_is_local && _local_recvr != nullptr);
return _local_recvr.get();
}
virtual void ch_roll_pb_block();
bool can_write() {
@ -557,11 +552,12 @@ public:
return _closure.get();
}
void set_dependency(std::shared_ptr<pipeline::ChannelDependency> dependency) {
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency() {
if (!Channel<Parent>::_local_recvr) {
throw Exception(ErrorCode::INTERNAL_ERROR, "_local_recvr is null");
}
Channel<Parent>::_local_recvr->set_dependency(dependency);
return Channel<Parent>::_local_recvr->get_local_channel_dependency(
Channel<Parent>::_parent->sender_id());
}
private: