Revert "[feature](pipelineX) control exchange sink by memory usage (#28814)" (#29652)

This reverts commit e326ebb63e4e07d8ee6595561ab19dc5d411f592.
This commit is contained in:
Gabriel
2024-01-08 15:01:15 +08:00
committed by yiguolei
parent 59d7f64360
commit 767de7afe8
6 changed files with 7 additions and 38 deletions

View File

@ -199,22 +199,17 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
auto deps_for_channels_mem_limit = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto* channel : channels) {
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name);
auto local_recvr = channel->local_recvr();
deps_for_channels_mem_limit->add_child(local_recvr->get_mem_limit_dependency());
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
_exchange_sink_dependency->add_child(deps_for_channels_mem_limit);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();

View File

@ -96,14 +96,9 @@ public:
LocalExchangeChannelDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeChannelDependency", true, query_ctx) {}
~LocalExchangeChannelDependency() override = default;
// TODO(gabriel): blocked by memory
};
class LocalExchangeMemLimitDependency final : public Dependency {
ENABLE_FACTORY_CREATOR(LocalExchangeMemLimitDependency);
LocalExchangeMemLimitDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "LocalExchangeMemLimitDependency", true, query_ctx) {}
~LocalExchangeMemLimitDependency() override = default;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;

View File

@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
stream_recvr->create_mem_limit_dependency(p.operator_id(), p.node_id(), state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());

View File

@ -354,7 +354,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_profile(profile),
_peak_memory_usage_counter(nullptr),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr),
_enable_pipeline(state->enable_pipeline_exec()) {
_enable_pipeline(state->enable_pipeline_exec()),
_mem_available(std::make_shared<bool>(true)) {
// DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker =
std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
@ -505,21 +506,12 @@ void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
_blocks_memory_usage->add(size);
auto val = _blocks_memory_usage_current_value.fetch_add(size);
if (val + size > config::exchg_node_buffer_size_bytes) {
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->block();
}
*_mem_available = false;
} else {
if (_exchange_sink_mem_limit_dependency) {
_exchange_sink_mem_limit_dependency->set_ready();
}
*_mem_available = true;
}
}
void VDataStreamRecvr::create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx) {
_exchange_sink_mem_limit_dependency =
pipeline::LocalExchangeMemLimitDependency::create_shared(id, node_id, query_ctx);
}
void VDataStreamRecvr::close() {
if (_is_closed) {
return;

View File

@ -42,7 +42,6 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
@ -62,7 +61,6 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
class LocalExchangeMemLimitDependency;
class ExchangeLocalState;
} // namespace pipeline
@ -132,10 +130,6 @@ public:
std::shared_ptr<pipeline::LocalExchangeChannelDependency> get_local_channel_dependency(
int sender_id);
void create_mem_limit_dependency(int id, int node_id, QueryContext* query_ctx);
auto get_mem_limit_dependency() { return _exchange_sink_mem_limit_dependency; }
private:
void update_blocks_memory_usage(int64_t size);
class PipSenderQueue;
@ -195,8 +189,7 @@ private:
std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
_sender_to_local_channel_dependency;
// use to limit sink write
std::shared_ptr<pipeline::LocalExchangeMemLimitDependency> _exchange_sink_mem_limit_dependency;
std::shared_ptr<bool> _mem_available;
};
class ThreadClosure : public google::protobuf::Closure {

View File

@ -321,11 +321,6 @@ public:
void set_receiver_eof(Status st) { _receiver_status = st; }
auto local_recvr() {
DCHECK(is_local());
return _local_recvr;
}
protected:
bool _recvr_is_valid() {
if (_local_recvr && !_local_recvr->is_closed()) {