From a038fdaec642ff6a42a2c776d8d76076138efd5b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 9 Feb 2023 19:22:40 +0800 Subject: [PATCH] [Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463) Currently, for broadcast shuffle, we serialize a block once and then send it by RPC through multiple channels. After this, we serialize the next block into the same memory so that the buffer can be reused. However, since the RPC is asynchronous, the next block may be serialized before the previous block has been sent. So, in this PR, I use a ref count to identify whether the serialized block can be reused in broadcast shuffle. --- be/src/common/config.h | 2 + be/src/pipeline/exec/exchange_sink_buffer.cpp | 127 ++++++++++++------ be/src/pipeline/exec/exchange_sink_buffer.h | 17 ++- be/src/vec/sink/vdata_stream_sender.cpp | 48 ++++++- be/src/vec/sink/vdata_stream_sender.h | 62 ++++++++- 5 files changed, 207 insertions(+), 49 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index de719094bd..b823aff817 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -896,6 +896,8 @@ CONF_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5% CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict"); // tree depth for bkd index CONF_Int32(max_depth_in_bkd_tree, "32"); +// use num_broadcast_buffer blocks as buffer to do broadcast +CONF_Int32(num_broadcast_buffer, "32"); #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index aedeeec19b..36f5aab77c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -32,7 +32,8 @@ namespace doris::pipeline { template class SelfDeleteClosure : public google::protobuf::Closure { public: - SelfDeleteClosure(InstanceLoId id, bool eos) : _id(id), _eos(eos) {} + SelfDeleteClosure(InstanceLoId id, bool eos, 
vectorized::BroadcastPBlockHolder* data = nullptr) + : _id(id), _eos(eos), _data(data) {} ~SelfDeleteClosure() override = default; SelfDeleteClosure(const SelfDeleteClosure& other) = delete; SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete; @@ -56,6 +57,9 @@ public: } else { _suc_fn(_id, _eos, result); } + if (_data) { + _data->unref(); + } } catch (const std::exception& exp) { LOG(FATAL) << "brpc callback error: " << exp.what(); } catch (...) { @@ -71,6 +75,7 @@ private: std::function _suc_fn; InstanceLoId _id; bool _eos; + vectorized::BroadcastPBlockHolder* _data; }; ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, @@ -126,6 +131,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _instance_to_package_queue_mutex[low_id] = std::make_unique(); _instance_to_seq[low_id] = 0; _instance_to_package_queue[low_id] = std::queue>(); + _instance_to_broadcast_package_queue[low_id] = + std::queue>(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); @@ -155,59 +162,93 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } +Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { + if (_is_finishing) { + return Status::OK(); + } + TUniqueId ins_id = request.channel->_fragment_instance_id; + bool send_now = false; + request.block_holder->ref(); + { + std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); + // Do not have in process rpc, directly send + if (_instance_to_sending_by_pipeline[ins_id.lo]) { + send_now = true; + _instance_to_sending_by_pipeline[ins_id.lo] = false; + } + _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request)); + } + if (send_now) { + RETURN_IF_ERROR(_send_rpc(ins_id.lo)); + } + + return Status::OK(); +} + Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); 
std::queue>& q = _instance_to_package_queue[id]; - if (q.empty() || _is_finishing) { + std::queue>& broadcast_q = + _instance_to_broadcast_package_queue[id]; + + if (_is_finishing) { _instance_to_sending_by_pipeline[id] = true; return Status::OK(); } - TransmitInfo& request = q.front(); +#define DO_RPC(QUEUE, BLOCK, HOLDER) \ + auto& request = QUEUE.front(); \ + if (!_instance_to_request[id]) { \ + _construct_request(id); \ + } \ + auto brpc_request = _instance_to_request[id]; \ + brpc_request->set_eos(request.eos); \ + brpc_request->set_packet_seq(_instance_to_seq[id]++); \ + if (request.BLOCK) { \ + brpc_request->set_allocated_block(request.BLOCK); \ + } \ + auto* _closure = new SelfDeleteClosure(id, request.eos, HOLDER); \ + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \ + _closure->addFailedHandler( \ + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \ + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \ + const PTransmitDataResult& result) { \ + Status s = Status(result.status()); \ + if (!s.ok()) { \ + _failed(id, \ + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \ + } else if (eos) { \ + _ended(id); \ + } else { \ + _send_rpc(id); \ + } \ + }); \ + { \ + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \ + if (enable_http_send_block(*brpc_request)) { \ + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \ + *brpc_request, request.channel->_brpc_dest_addr)); \ + } else { \ + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \ + } \ + } \ + if (request.BLOCK) { \ + brpc_request->release_block(); \ + } \ + QUEUE.pop(); - if (!_instance_to_request[id]) { - _construct_request(id); + if (!q.empty()) { + // If we have data to shuffle which is not broadcasted + DO_RPC(q, block, nullptr) + } else if (!broadcast_q.empty()) { + // If we have data to shuffle which is broadcasted + 
DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder) + } else { + _instance_to_sending_by_pipeline[id] = true; + return Status::OK(); } - auto brpc_request = _instance_to_request[id]; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); - if (request.block) { - brpc_request->set_allocated_block(request.block.get()); - } - - auto* _closure = new SelfDeleteClosure(id, request.eos); - _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); - _closure->addFailedHandler( - [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result) { - Status s = Status(result.status()); - if (!s.ok()) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); - } else if (eos) { - _ended(id); - } else { - _send_rpc(id); - } - }); - - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, - *brpc_request, request.channel->_brpc_dest_addr)); - } else { - transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); - } - } - - if (request.block) { - brpc_request->release_block(); - } - q.pop(); - return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index eff77c05bb..7be17706ce 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -31,13 +31,20 @@ namespace doris { namespace vectorized { class PipChannel; -} +class BroadcastPBlockHolder; +} // namespace vectorized namespace pipeline { using InstanceLoId = int64_t; struct TransmitInfo { vectorized::PipChannel* channel; - std::unique_ptr block; + PBlock* block; + bool eos; +}; + +struct BroadcastTransmitInfo { + 
vectorized::PipChannel* channel; + vectorized::BroadcastPBlockHolder* block_holder; bool eos; }; @@ -50,6 +57,7 @@ public: ~ExchangeSinkBuffer(); void register_sink(TUniqueId); Status add_block(TransmitInfo&& request); + Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; bool is_pending_finish() const; void close(); @@ -57,8 +65,13 @@ public: private: phmap::flat_hash_map> _instance_to_package_queue_mutex; + // store data in non-broadcast shuffle phmap::flat_hash_map>> _instance_to_package_queue; + // store data in broadcast shuffle + phmap::flat_hash_map>> + _instance_to_broadcast_package_queue; using PackageSeq = int64_t; // must init zero phmap::flat_hash_map _instance_to_seq; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c44e8acd74..c7a9e62335 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -242,7 +242,7 @@ Status Channel::close_internal() { RETURN_IF_ERROR(send_current_block(true)); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - RETURN_IF_ERROR(send_block(nullptr, true)); + RETURN_IF_ERROR(send_block((PBlock*)nullptr, true)); } // Don't wait for the last packet to finish, left it to close_wait. 
return Status::OK(); @@ -287,7 +287,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); - _cur_pb_block = &_pb_block1; std::map fragment_id_to_channel_index; @@ -317,6 +316,12 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int } } _name = "VDataStreamSender"; + if (state->enable_pipeline_exec()) { + _broadcast_pb_blocks.resize(config::num_broadcast_buffer); + _broadcast_pb_block_idx = 0; + } else { + _cur_pb_block = &_pb_block1; + } } VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, @@ -470,6 +475,23 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { for (auto channel : _channels) { RETURN_IF_ERROR(channel->send_local_block(block)); } + } else if (state->enable_pipeline_exec()) { + BroadcastPBlockHolder* block_holder = nullptr; + RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); + { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR( + serialize_block(block, block_holder->get_block(), _channels.size())); + } + + for (auto channel : _channels) { + if (channel->is_local()) { + RETURN_IF_ERROR(channel->send_local_block(block)); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR(channel->send_block(block_holder, eos)); + } + } } else { { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -620,6 +642,28 @@ void VDataStreamSender::_roll_pb_block() { _cur_pb_block = (_cur_pb_block == &_pb_block1 ? 
&_pb_block2 : &_pb_block1); } +Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) { + constexpr int MAX_LOOP = 1000; + + size_t it = 0; + while (it < MAX_LOOP) { + if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { + _broadcast_pb_block_idx = 0; + } + + for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { + if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { + _broadcast_pb_block_idx++; + *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1]; + return Status::OK(); + } + } + it++; + } + return Status::InternalError( + "Exceed the max loop limit when acquire the next available buffer!"); +} + void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : _channels) { ((PipChannel*)channel)->registe(buffer); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index e644aaf847..19858c259d 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -49,6 +49,42 @@ namespace vectorized { class VExprContext; class Channel; +template +struct AtomicWrapper { + std::atomic _value; + + AtomicWrapper() : _value() {} + + AtomicWrapper(const std::atomic& a) : _value(a.load()) {} + + AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {} + + AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); } +}; + +// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock +// will be shared between different channel, so we have to use a ref count to mark if this +// PBlock is available for next serialization. 
+class BroadcastPBlockHolder { +public: + BroadcastPBlockHolder() : _ref_count(0) {} + ~BroadcastPBlockHolder() noexcept = default; + + void unref() noexcept { + DCHECK_GT(_ref_count._value, 0); + _ref_count._value.fetch_sub(1); + } + void ref() noexcept { _ref_count._value.fetch_add(1); } + + bool available() { return _ref_count._value == 0; } + + PBlock* get_block() { return &pblock; } + +private: + AtomicWrapper _ref_count; + PBlock pblock; +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -91,6 +127,7 @@ protected: friend class pipeline::ExchangeSinkBuffer; void _roll_pb_block(); + Status _get_next_available_buffer(BroadcastPBlockHolder** holder); Status get_partition_column_result(Block* block, int* result) const { int counter = 0; @@ -131,6 +168,10 @@ protected: PBlock _pb_block2; PBlock* _cur_pb_block; + // used by pipeline engine + std::vector _broadcast_pb_blocks; + int _broadcast_pb_block_idx; + // compute per-row partition values std::vector _partition_expr_ctxs; @@ -219,6 +260,10 @@ public: // if batch is nullptr, send the eof packet virtual Status send_block(PBlock* block, bool eos = false); + virtual Status send_block(BroadcastPBlockHolder* block, bool eos = false) { + return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); + } + Status add_rows(Block* block, const std::vector& row); virtual Status send_current_block(bool eos); @@ -369,8 +414,21 @@ public: } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block( - {this, block ? 
std::make_unique(*block) : nullptr, eos})); + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + } + return Status::OK(); + } + + Status send_block(BroadcastPBlockHolder* block, bool eos = false) override { + if (eos) { + if (_eos_send) { + return Status::OK(); + } else { + _eos_send = true; + } + } + if (eos || block->get_block()->column_metas_size()) { + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); } return Status::OK(); }