From 18ad8562f268ed8e702e126e2f41dbe26da01f2e Mon Sep 17 00:00:00 2001
From: yiguolei <676222867@qq.com>
Date: Wed, 20 Dec 2023 21:23:25 +0800
Subject: [PATCH] [refactor](broadcastbuffer) using a queue to remove ref and
 unref codes (#28698)

Co-authored-by: yiguolei

Add a new class BroadcastPBlockHolderQueue to manage holders.
Use shared_ptr to manage holders instead of ref/unref; the manual
reference counting was too difficult to maintain.
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  43 +++++++++----
 be/src/pipeline/exec/exchange_sink_buffer.h   |  64 +++++++++++++------
 .../pipeline/exec/exchange_sink_operator.cpp  |  40 +++++-------
 be/src/pipeline/exec/exchange_sink_operator.h |  51 +--------------
 .../exec/result_file_sink_operator.cpp        |   3 +-
 .../pipeline/exec/result_file_sink_operator.h |   2 +-
 be/src/pipeline/pipeline_x/dependency.h       |   2 +
 be/src/runtime/query_context.cpp              |   3 +-
 be/src/vec/sink/vdata_stream_sender.cpp       |  41 ++++--------
 be/src/vec/sink/vdata_stream_sender.h         |  16 ++---
 10 files changed, 120 insertions(+), 145 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 1a4a786654..1f579b6a97 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -45,16 +45,40 @@
 #include "vec/sink/vdata_stream_sender.h"

 namespace doris {
-namespace vectorized {
-void BroadcastPBlockHolder::unref() noexcept {
-    DCHECK_GT(_ref_count._value, 0);
-    auto old_value = _ref_count._value.fetch_sub(1);
-    if (_dep && old_value == 1) {
-        _dep->return_available_block();
+namespace vectorized {
+BroadcastPBlockHolder::~BroadcastPBlockHolder() {
+    // Lock the parent queue; if the lock succeeds, return the block to the
+    // queue so it can be reused.
+    std::shared_ptr<BroadcastPBlockHolderQueue> tmp_queue = _parent_creator.lock();
+    if (tmp_queue != nullptr) {
+        tmp_queue->push(BroadcastPBlockHolder::create_shared(std::move(_pblock)));
+    }
+    // If the queue has already been destructed, the pblock is released
+    // automatically since it is a unique_ptr.
+}
+
+void BroadcastPBlockHolderQueue::push(std::shared_ptr<BroadcastPBlockHolder> holder) {
+    std::unique_lock<std::mutex> l(_holders_lock);
+    holder->set_parent_creator(shared_from_this());
+    _holders.push(holder);
+    if (_broadcast_dependency) {
+        _broadcast_dependency->set_ready();
     }
 }

+std::shared_ptr<BroadcastPBlockHolder> BroadcastPBlockHolderQueue::pop() {
+    std::unique_lock<std::mutex> l(_holders_lock);
+    if (_holders.empty()) {
+        return {};
+    }
+    std::shared_ptr<BroadcastPBlockHolder> res = _holders.top();
+    _holders.pop();
+    if (_holders.empty() && _broadcast_dependency != nullptr) {
+        _broadcast_dependency->block();
+    }
+    return res;
+}
 } // namespace vectorized

 namespace pipeline {
@@ -184,12 +208,10 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
 template <typename Parent>
 Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) {
     if (_is_finishing) {
-        request.block_holder->unref();
         return Status::OK();
     }
     TUniqueId ins_id = request.channel->_fragment_instance_id;
     if (_is_receiver_eof(ins_id.lo)) {
-        request.block_holder->unref();
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
@@ -243,7 +265,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
         if (!request.exec_status.ok()) {
             request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
         }
-        auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr);
+        auto send_callback = request.channel->get_send_callback(id, request.eos);

         _instance_to_rpc_ctx[id]._send_callback = send_callback;
         _instance_to_rpc_ctx[id].is_cancelled = false;
@@ -307,8 +329,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
             auto statistic = brpc_request->mutable_query_statistics();
             _statistics->to_pb(statistic);
         }
-        auto send_callback =
-                request.channel->get_send_callback(id, request.eos, request.block_holder);
+        auto send_callback = request.channel->get_send_callback(id, request.eos);

         ExchangeRpcContext rpc_ctx;
         rpc_ctx._send_callback = send_callback;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index c60beb9bcd..83b20f9c8a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -29,6 +29,7 @@
 #include <list>
 #include <mutex>
 #include <queue>
+#include <stack>
 #include <string>

 #include "common/global_types.h"
@@ -45,7 +46,6 @@ class TUniqueId;
 using InstanceLoId = int64_t;

 namespace pipeline {
-class BroadcastDependency;
 class ExchangeSinkQueueDependency;
 class Dependency;
 } // namespace pipeline
@@ -71,25 +71,52 @@ struct AtomicWrapper {
 // We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
 // will be shared between different channel, so we have to use a ref count to mark if this
 // PBlock is available for next serialization.
+class BroadcastPBlockHolderQueue;
 class BroadcastPBlockHolder {
+    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder);
+
 public:
-    BroadcastPBlockHolder() : _ref_count(0), _dep(nullptr) {}
-    BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), _dep(dep) {}
-    ~BroadcastPBlockHolder() noexcept = default;
+    BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); }
+    BroadcastPBlockHolder(std::unique_ptr<PBlock>&& pblock) { _pblock = std::move(pblock); }
+    ~BroadcastPBlockHolder();

-    void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
-    void unref() noexcept;
-    void ref() noexcept { ref(1); }
-
-    bool available() { return _ref_count._value == 0; }
-
-    PBlock* get_block() { return &pblock; }
+    PBlock* get_block() { return _pblock.get(); }

 private:
-    AtomicWrapper<uint32_t> _ref_count;
-    PBlock pblock;
-    pipeline::BroadcastDependency* _dep = nullptr;
+    friend class BroadcastPBlockHolderQueue;
+    std::unique_ptr<PBlock> _pblock;
+    std::weak_ptr<BroadcastPBlockHolderQueue> _parent_creator;
+    void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderQueue> parent_creator) {
+        _parent_creator = parent_creator;
+    }
 };
+
+// A stack is used inside so that the most recently returned PBlock is reused
+// first and is more likely to still be in the CPU cache.
+class BroadcastPBlockHolderQueue : public std::enable_shared_from_this<BroadcastPBlockHolderQueue> {
+    ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderQueue);
+
+public:
+    BroadcastPBlockHolderQueue() = default;
+
+    BroadcastPBlockHolderQueue(std::shared_ptr<pipeline::Dependency>& broadcast_dependency) {
+        _broadcast_dependency = broadcast_dependency;
+    }
+
+    void push(std::shared_ptr<BroadcastPBlockHolder> holder);
+
+    bool empty() {
+        std::unique_lock<std::mutex> l(_holders_lock);
+        return _holders.empty();
+    }
+
+    std::shared_ptr<BroadcastPBlockHolder> pop();
+
+private:
+    std::stack<std::shared_ptr<BroadcastPBlockHolder>> _holders;
+    std::shared_ptr<pipeline::Dependency> _broadcast_dependency;
+    std::mutex _holders_lock;
+};
+
 } // namespace vectorized

 namespace pipeline {
@@ -104,7 +131,7 @@ struct TransmitInfo {
 template <typename Parent>
 struct BroadcastTransmitInfo {
     vectorized::PipChannel<Parent>* channel = nullptr;
-    vectorized::BroadcastPBlockHolder* block_holder = nullptr;
+    std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
     bool eos;
 };

@@ -115,10 +142,9 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
 public:
     ExchangeSendCallback() = default;

-    void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
+    void init(InstanceLoId id, bool eos) {
         _id = id;
         _eos = eos;
-        _data = data;
     }

     ~ExchangeSendCallback() override = default;
@@ -135,9 +161,6 @@ public:

     void call() noexcept override {
         try {
-            if (_data) {
-                _data->unref();
-            }
             if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
                 std::string err = fmt::format(
                         "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
@@ -164,7 +187,6 @@ private:
     std::function<void(const InstanceLoId&, const bool&)> _suc_fn;
     InstanceLoId _id;
     bool _eos;
-    vectorized::BroadcastPBlockHolder* _data = nullptr;
 };

 struct ExchangeRpcContext {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 0dd1dda415..548680a25c 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -181,13 +181,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     _exchange_sink_dependency->add_child(_queue_dependency);
     if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
         !only_local_exchange) {
-        _broadcast_dependency = BroadcastDependency::create_shared(
-                _parent->operator_id(), _parent->node_id(), state->get_query_ctx());
-        _broadcast_dependency->set_available_block(config::num_broadcast_buffer);
-        _broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
-        for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
-            _broadcast_pb_blocks.emplace_back(
-                    vectorized::BroadcastPBlockHolder(_broadcast_dependency.get()));
+        _broadcast_dependency =
+                Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                          "BroadcastDependency", true, state->get_query_ctx());
+        _broadcast_pb_blocks =
+                vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
+        for (int i = 0; i < config::num_broadcast_buffer; ++i) {
+            _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
         }
         _exchange_sink_dependency->add_child(_broadcast_dependency);
@@ -338,7 +338,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             }
         }
     } else {
-        vectorized::BroadcastPBlockHolder* block_holder = nullptr;
+        std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
         RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder));
         {
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -355,13 +355,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
             } else {
                 block_holder->get_block()->Clear();
             }
-            local_state._broadcast_dependency->take_available_block();
-            block_holder->ref(local_state.channels.size());
             for (auto channel : local_state.channels) {
                 if (!channel->is_receiver_eof()) {
                     Status status;
                     if (channel->is_local()) {
-                        block_holder->unref();
                         status = channel->send_local_block(&cur_block);
                     } else {
                         SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -369,8 +366,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
                                 block_holder, source_state == SourceState::FINISHED);
                     }
                     HANDLE_CHANNEL_STATUS(state, channel, status);
-                } else {
-                    block_holder->unref();
                 }
             }
             cur_block.clear_column_data();
@@ -454,21 +449,16 @@ void ExchangeSinkLocalState::register_channels(
 }

 Status ExchangeSinkLocalState::get_next_available_buffer(
-        vectorized::BroadcastPBlockHolder** holder) {
+        std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder) {
     // This condition means we need use broadcast buffer, so we should make sure
     // there are available buffer before running pipeline
-    for (size_t broadcast_pb_block_idx = 0; broadcast_pb_block_idx < _broadcast_pb_blocks.size();
-         broadcast_pb_block_idx++) {
-        if (_broadcast_pb_blocks[broadcast_pb_block_idx].available()) {
-            *holder = &_broadcast_pb_blocks[broadcast_pb_block_idx];
-            return Status::OK();
-        }
+    if (_broadcast_pb_blocks->empty()) {
+        return Status::InternalError("No broadcast buffer left! Dependency: {}",
+                                     _broadcast_dependency->debug_string());
+    } else {
+        *holder = _broadcast_pb_blocks->pop();
+        return Status::OK();
     }
-    return Status::InternalError("No broadcast buffer left! Available blocks: " +
Available blocks: " + - std::to_string(_broadcast_dependency->available_blocks()) + - " and number of buffer is " + - std::to_string(_broadcast_pb_blocks.size()) + - " Dependency: " + _broadcast_dependency->debug_string()); } template diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 766f43dc80..70ae126fca 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -72,51 +72,6 @@ public: ~ExchangeSinkQueueDependency() override = default; }; -class BroadcastDependency final : public Dependency { -public: - ENABLE_FACTORY_CREATOR(BroadcastDependency); - BroadcastDependency(int id, int node_id, QueryContext* query_ctx) - : Dependency(id, node_id, "BroadcastDependency", true, query_ctx), - _available_block(0) {} - ~BroadcastDependency() override = default; - - std::string debug_string(int indentation_level = 0) override { - fmt::memory_buffer debug_string_buffer; - fmt::format_to(debug_string_buffer, - "{}{}: id={}, block task = {}, ready={}, _available_block = {}", - std::string(indentation_level * 2, ' '), _name, _node_id, - _blocked_task.size(), _ready, _available_block.load()); - return fmt::to_string(debug_string_buffer); - } - - void set_available_block(int available_block) { _available_block = available_block; } - - void return_available_block() { - if (_available_block.fetch_add(1) == 0) { - std::lock_guard lock(_lock); - if (_available_block == 0) { - return; - } - Dependency::set_ready(); - } - } - - void take_available_block() { - if (_available_block.fetch_sub(1) == 1) { - std::lock_guard lock(_lock); - if (_available_block == 0) { - Dependency::block(); - } - } - } - - int available_blocks() const { return _available_block; } - -private: - std::atomic _available_block; - std::mutex _lock; -}; - /** * We use this to control the execution for local exchange. * +---------------+ +---------------+ +---------------+ @@ -165,7 +120,7 @@ public: Dependency* finishdependency() override { return _finish_dependency.get(); } Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); - Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder); + Status get_next_available_buffer(std::shared_ptr* holder); RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; } RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; } @@ -231,12 +186,12 @@ private: // Sender instance id, unique within a fragment. 
     int _sender_id;
-    std::vector<vectorized::BroadcastPBlockHolder> _broadcast_pb_blocks;
+    std::shared_ptr<vectorized::BroadcastPBlockHolderQueue> _broadcast_pb_blocks;

     vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;

     std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
-    std::shared_ptr<BroadcastDependency> _broadcast_dependency;
+    std::shared_ptr<Dependency> _broadcast_dependency;
     std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
     int _partition_count;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index b19c93cd28..11045c5f06 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -240,8 +240,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
                 status = channel->send_local_block(&cur_block);
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                status = channel->send_broadcast_block(_block_holder.get(),
-                                                       true);
+                status = channel->send_broadcast_block(_block_holder, true);
             }
             HANDLE_CHANNEL_STATUS(state, channel, status);
         }
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index 9c0ada27a1..69235ee670 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -77,7 +77,7 @@ private:
     std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
     bool _only_local_exchange = false;
     vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
-    std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
+    std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
     RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
     RuntimeProfile::Counter* _local_send_timer = nullptr;
     RuntimeProfile::Counter* _brpc_send_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index a136ed710f..db7be6f469 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -67,6 +67,8 @@ struct BasicSharedState {
 };

 class Dependency : public std::enable_shared_from_this<Dependency> {
+    ENABLE_FACTORY_CREATOR(Dependency);
+
 public:
     Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
             : _id(id),
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 1f54a0dcde..9a90ea344e 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -40,7 +40,8 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
     _start_time = VecDateTimeValue::local_time();
     _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
     _shared_scanner_controller.reset(new vectorized::SharedScannerController());
-    _execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this));
+    _execution_dependency =
+            pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
     _runtime_filter_mgr.reset(
             new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 55cfc2d232..53c140ef5a 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -377,8 +377,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
     }
     _name = "VDataStreamSender";
     if (_enable_pipeline_exec) {
-        _broadcast_pb_blocks.resize(config::num_broadcast_buffer);
-        _broadcast_pb_block_idx = 0;
+        _broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared();
+        for (int i = 0; i < config::num_broadcast_buffer; ++i) {
+            _broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
+        }
     } else {
         _cur_pb_block = &_pb_block1;
     }
@@ -552,7 +554,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             }
         }
     } else if (_enable_pipeline_exec) {
-        BroadcastPBlockHolder* block_holder = nullptr;
+        std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr;
         RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
         {
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -568,19 +570,15 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 block_holder->get_block()->Clear();
             }
             Status status;
-            block_holder->ref(_channels.size());
             for (auto channel : _channels) {
                 if (!channel->is_receiver_eof()) {
                     if (channel->is_local()) {
-                        block_holder->unref();
                         status = channel->send_local_block(&cur_block);
                     } else {
                         SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                         status = channel->send_broadcast_block(block_holder, eos);
                     }
                     HANDLE_CHANNEL_STATUS(state, channel, status);
-                } else {
-                    block_holder->unref();
                 }
             }
             cur_block.clear_column_data();
@@ -793,18 +791,14 @@ void VDataStreamSender::_roll_pb_block() {
     _cur_pb_block = (_cur_pb_block == &_pb_block1 ? &_pb_block2 : &_pb_block1);
 }

-Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) {
-    if (_broadcast_pb_block_idx >= _broadcast_pb_blocks.size()) {
-        return Status::InternalError(
-                "get_next_available_buffer meet invalid index, index={}, size={}",
-                _broadcast_pb_block_idx, _broadcast_pb_blocks.size());
+Status VDataStreamSender::_get_next_available_buffer(
+        std::shared_ptr<BroadcastPBlockHolder>* holder) {
+    if (_broadcast_pb_blocks->empty()) {
+        return Status::InternalError("No broadcast buffer left!");
+    } else {
+        *holder = _broadcast_pb_blocks->pop();
+        return Status::OK();
     }
-    if (!_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
-        return Status::InternalError("broadcast_pb_blocks not available");
-    }
-    *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx];
-    _broadcast_pb_block_idx++;
-    return Status::OK();
 }

 void VDataStreamSender::register_pipeline_channels(
@@ -819,16 +813,7 @@ bool VDataStreamSender::channel_all_can_write() {
         !_only_local_exchange) {
         // This condition means we need use broadcast buffer, so we should make sure
        // there are available buffer before running pipeline
-        if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
-            _broadcast_pb_block_idx = 0;
-        }
-
-        for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
-            if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
-                return true;
-            }
-        }
-        return false;
+        return !_broadcast_pb_blocks->empty();
     } else {
         for (auto channel : _channels) {
             if (!channel->can_write()) {
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index c4bf7fdd45..54822f8968 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -158,7 +158,7 @@ protected:
     friend class pipeline::ExchangeSinkBuffer<VDataStreamSender>;

     void _roll_pb_block();
-    Status _get_next_available_buffer(BroadcastPBlockHolder** holder);
+    Status _get_next_available_buffer(std::shared_ptr<BroadcastPBlockHolder>* holder);

     template <typename Channels>
     Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels,
@@ -185,8 +185,7 @@ protected:
     PBlock* _cur_pb_block = nullptr;

     // used by pipeline engine
-    std::vector<BroadcastPBlockHolder> _broadcast_pb_blocks;
-    int _broadcast_pb_block_idx;
+    std::shared_ptr<BroadcastPBlockHolderQueue> _broadcast_pb_blocks;

     std::unique_ptr<PartitionerBase> _partitioner;
     size_t _partition_count;
@@ -273,7 +272,8 @@ public:

     virtual Status send_remote_block(PBlock* block, bool eos = false,
                                      Status exec_status = Status::OK());

-    virtual Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) {
+    virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
+                                        bool eos = false) {
         return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
     }

@@ -488,11 +488,11 @@ public:
         return Status::OK();
     }

-    Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) override {
+    Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
+                                bool eos = false) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         if (eos) {
             if (_eos_send) {
-                block->unref();
                 return Status::OK();
             }
             _eos_send = true;
@@ -536,13 +536,13 @@ public:
     }

     std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
-            InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
+            InstanceLoId id, bool eos) {
         if (!_send_callback) {
             _send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
         } else {
             _send_callback->cntl_->Reset();
         }
-        _send_callback->init(id, eos, data);
+        _send_callback->init(id, eos);
         return _send_callback;
     }
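
--
The ownership model this patch introduces, as a minimal standalone sketch (not
Doris code: Holder/HolderQueue stand in for BroadcastPBlockHolder and
BroadcastPBlockHolderQueue, PBlock is stubbed, ENABLE_FACTORY_CREATOR is
dropped, and the Dependency set_ready()/block() signaling is reduced to
comments):

#include <iostream>
#include <memory>
#include <mutex>
#include <stack>

struct PBlock {}; // stand-in for the serialized protobuf block

class HolderQueue;

class Holder {
public:
    explicit Holder(std::unique_ptr<PBlock> b) : _pblock(std::move(b)) {}
    // On destruction, hand the PBlock back to the queue if it still exists.
    ~Holder();
    PBlock* get_block() { return _pblock.get(); }

private:
    friend class HolderQueue;
    std::unique_ptr<PBlock> _pblock;
    std::weak_ptr<HolderQueue> _parent; // weak: holders must not keep the queue alive
};

class HolderQueue : public std::enable_shared_from_this<HolderQueue> {
public:
    void push(std::shared_ptr<Holder> h) {
        std::lock_guard<std::mutex> l(_lock);
        h->_parent = shared_from_this();
        _holders.push(std::move(h));
        // the real code calls _broadcast_dependency->set_ready() here
    }
    std::shared_ptr<Holder> pop() {
        std::lock_guard<std::mutex> l(_lock);
        if (_holders.empty()) return {};
        auto h = _holders.top();
        _holders.pop();
        // the real code calls _broadcast_dependency->block() here when empty
        return h;
    }

private:
    std::stack<std::shared_ptr<Holder>> _holders; // LIFO keeps blocks cache-hot
    std::mutex _lock;
};

Holder::~Holder() {
    if (auto q = _parent.lock()) {
        // Re-wrap the PBlock in a fresh holder so it can be handed out again.
        q->push(std::make_shared<Holder>(std::move(_pblock)));
    }
    // If the queue is already gone, _pblock is freed here (unique_ptr).
}

int main() {
    auto queue = std::make_shared<HolderQueue>();
    queue->push(std::make_shared<Holder>(std::make_unique<PBlock>()));
    {
        auto h = queue->pop(); // every channel shares this one holder
        std::cout << std::boolalpha << (queue->pop() == nullptr) << '\n'; // true: queue drained
    } // last shared_ptr dies -> destructor recycles the PBlock
    std::cout << (queue->pop() != nullptr) << '\n'; // true: block is back
    return 0;
}

The weak_ptr is the piece that replaces the old unref() bookkeeping: the last
channel to release its shared_ptr triggers recycling implicitly, and if the
query tears the queue down while an RPC is still in flight, lock() simply
fails and the block is freed instead of being returned to a dead queue.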