[feature](pipelineX) control exchange sink by memory usage (#28814)

This commit is contained in:
Mryange
2023-12-25 10:31:50 +08:00
committed by GitHub
parent d42fd68d6b
commit e326ebb63e
6 changed files with 38 additions and 7 deletions

View File

@ -199,17 +199,22 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto channel : channels) {
auto deps_for_channels_mem_limit = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto* channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name);
auto local_recvr = channel->local_recvr();
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
_exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();

View File

@ -96,9 +96,14 @@ public:
LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};
// Dependency that lets a local exchange receiver apply memory back-pressure to
// the exchange sinks that write into it.
//
// VDataStreamRecvr creates one instance via create_mem_limit_dependency() and
// calls block() on it when its buffered block memory exceeds
// config::exchg_node_buffer_size_bytes, and set_ready() otherwise.
// ExchangeSinkLocalState::init() adds the receiver's instance as a child of an
// AndDependency for every local channel.
class LocalExchangeMemLimitDependency final : public Dependency {
// Factory macro; presumably supplies the create_shared() used by
// VDataStreamRecvr and the access specifier for the members below — confirm
// against the macro definition.
ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
// id / node_id / query_ctx are forwarded unchanged to the Dependency base.
LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx)
// NOTE(review): the `true` argument mirrors LocalExchangeChannelDependency;
// its meaning is defined by Dependency and is not visible here (presumably
// the initial ready state — TODO confirm).
: Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {}
~LocalExchangeMemLimitDependency() override = default;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;

View File

@ -74,6 +74,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());

View File

@ -354,8 +354,7 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()),
_mem_available(std::make_shared<bool>(true)) {
_enable_pipeline(state->enable_pipeline_exec()) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
@ -506,12 +505,21 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
*_mem_available = false;
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->block();
}
} else {
*_mem_available = true;
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->set_ready();
}
}
}
void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) {
_exchange_sink_mem_limit_dependency =
pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx);
}
void VDataStreamRecvr::close() {
if (_is_closed) {
return;

View File

@ -42,6 +42,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
@ -61,6 +62,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
class LocalExchangeMemLimitDependency;
class ExchangeLocalState;
} // namespace pipeline
@ -130,6 +132,10 @@ public:
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency(
int sender_id);
void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx);
// Hands out a copy of the shared pointer to this receiver's memory-limit
// dependency so that exchange sinks can wait on it.
// NOTE(review): the pointer appears to stay null until
// create_mem_limit_dependency() has been called — callers should confirm the
// ordering before dereferencing or registering the result.
auto get_mem_limit_dependency() {
    return _exchange_sink_mem_limit_dependency;
}
private:
void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
@ -189,7 +195,8 @@ private:
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;
std::shared_ptr<bool> _mem_available;
// Used to throttle sink writes: blocked while buffered block memory exceeds config::exchg_node_buffer_size_bytes.
std::shared_ptr<pipeline::LocalExchangeMemLimitDependency> _exchange_sink_mem_limit_dependency;
};
class ThreadClosure : public google::protobuf::Closure {

View File

@ -321,6 +321,11 @@ public:
// Stores `st` as this channel's receiver status (_receiver_status).
void set_receiver_eof(Status st) {
    _receiver_status = st;
}
// Returns the receiver stored in _local_recvr for this channel.
// Only meaningful for local channels: is_local() is asserted via DCHECK
// (presumably checked in debug builds only — confirm against the DCHECK
// definition in use), so callers must test is_local() themselves first.
auto local_recvr() {
DCHECK(is_local());
return _local_recvr;
}
protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {