From e326ebb63e4e07d8ee6595561ab19dc5d411f592 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Mon, 25 Dec 2023 10:31:50 +0800 Subject: [PATCH] [feature](pipelineX) control exchange sink by memory usage (#28814) --- be/src/pipeline/exec/exchange_sink_operator.cpp | 7 ++++++- be/src/pipeline/exec/exchange_sink_operator.h | 7 ++++++- .../pipeline/exec/exchange_source_operator.cpp | 1 + be/src/vec/runtime/vdata_stream_recvr.cpp | 16 ++++++++++++---- be/src/vec/runtime/vdata_stream_recvr.h | 9 ++++++++- be/src/vec/sink/vdata_stream_sender.h | 5 +++++ 6 files changed, 38 insertions(+), 7 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 43bec0bd92..9f9b36d1cb 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -199,17 +199,22 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf _wait_channel_timer.resize(local_size); auto deps_for_channels = AndDependency::create_shared( _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); - for (auto channel : channels) { + auto deps_for_channels_mem_limit = AndDependency::create_shared( + _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); + for (auto* channel : channels) { if (channel->is_local()) { _local_channels_dependency[dep_id] = channel->get_local_channel_dependency(); DCHECK(_local_channels_dependency[dep_id] != nullptr); deps_for_channels->add_child(_local_channels_dependency[dep_id]); _wait_channel_timer[dep_id] = ADD_CHILD_TIMER( _profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name); + auto local_recvr = channel->local_recvr(); + deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency()); dep_id++; } } _exchange_sink_dependency->add_child(deps_for_channels); + _exchange_sink_dependency->add_child(deps_for_channels_mem_limit); } if 
(p._part_type == TPartitionType::HASH_PARTITIONED) { _partition_count = channels.size(); diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 5df03ea777..a34c4f4b43 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -96,9 +96,14 @@ public: LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx) : Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {} ~LocalExchangeChannelDependency() override = default; - // TODO(gabriel): blocked by memory }; +class LocalExchangeMemLimitDependency final : public Dependency { + ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency); + LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx) + : Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {} + ~LocalExchangeMemLimitDependency() override = default; +}; class ExchangeSinkLocalState final : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState); using Base = PipelineXSinkLocalState; diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 255cb15141..847891104c 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -74,6 +74,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) { stream_recvr = state->exec_env()->vstream_mgr()->create_recvr( state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(), profile(), p.is_merging(), p.sub_plan_query_statistics_recvr()); + stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx()); auto* source_dependency = _dependency; const auto& queues = stream_recvr->sender_queues(); deps.resize(queues.size()); diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp index 56f6c51e68..0dc47363a8 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -354,8 +354,7 @@ VDataStreamRecvr::VDataStreamRecvr( _profile(profile), _peak_memory_usage_counter(nullptr), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr), - _enable_pipeline(state->enable_pipeline_exec()), - _mem_available(std::make_shared(true)) { + _enable_pipeline(state->enable_pipeline_exec()) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique("VDataStreamRecvr:" + print_id(_fragment_instance_id)); @@ -506,12 +505,21 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); auto val = _blocks_memory_usage_current_value.fetch_add(size); if (val + size > config::exchg_node_buffer_size_bytes) { - *_mem_available = false; + if (_exchange_sink_mem_limit_dependency) { + _exchange_sink_mem_limit_dependency->block(); + } } else { - *_mem_available = true; + if (_exchange_sink_mem_limit_dependency) { + _exchange_sink_mem_limit_dependency->set_ready(); + } } } +void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) { + _exchange_sink_mem_limit_dependency = + pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx); +} + void VDataStreamRecvr::close() { if (_is_closed) { return; diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index e0b63459ad..122a9d763e 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -42,6 +42,7 @@ #include "common/object_pool.h" #include "common/status.h" #include "runtime/descriptors.h" +#include "runtime/query_context.h" #include "runtime/query_statistics.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -61,6 +62,7 @@ class RuntimeState; namespace pipeline { struct 
ExchangeDataDependency; class LocalExchangeChannelDependency; +class LocalExchangeMemLimitDependency; class ExchangeLocalState; } // namespace pipeline @@ -130,6 +132,10 @@ public: std::shared_ptr get_local_channel_dependency( int sender_id); + void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx); + + auto get_mem_limit_dependency() { return _exchange_sink_mem_limit_dependency; } + private: void update_blocks_memory_usage(int64_t size); class PipSenderQueue; @@ -189,7 +195,8 @@ private: std::vector> _sender_to_local_channel_dependency; - std::shared_ptr _mem_available; + // Used to limit sink writes: blocked while buffered block memory exceeds config::exchg_node_buffer_size_bytes. + std::shared_ptr _exchange_sink_mem_limit_dependency; }; class ThreadClosure : public google::protobuf::Closure { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index f59dad266f..0e727a41f0 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -321,6 +321,11 @@ public: void set_receiver_eof(Status st) { _receiver_status = st; } + auto local_recvr() { + DCHECK(is_local()); + return _local_recvr; + } + protected: bool _recvr_is_valid() { if (_local_recvr && !_local_recvr->is_closed()) {