diff --git a/be/src/common/config.h b/be/src/common/config.h index de719094bd..b823aff817 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -896,6 +896,8 @@ CONF_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5% CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict"); // tree depth for bkd index CONF_Int32(max_depth_in_bkd_tree, "32"); +// use num_broadcast_buffer blocks as buffer to do broadcast +CONF_Int32(num_broadcast_buffer, "32"); #ifdef BE_TEST // test s3 CONF_String(test_s3_resource, "resource"); diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index aedeeec19b..36f5aab77c 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -32,7 +32,8 @@ namespace doris::pipeline { template class SelfDeleteClosure : public google::protobuf::Closure { public: - SelfDeleteClosure(InstanceLoId id, bool eos) : _id(id), _eos(eos) {} + SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr) + : _id(id), _eos(eos), _data(data) {} ~SelfDeleteClosure() override = default; SelfDeleteClosure(const SelfDeleteClosure& other) = delete; SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete; @@ -56,6 +57,9 @@ public: } else { _suc_fn(_id, _eos, result); } + if (_data) { + _data->unref(); + } } catch (const std::exception& exp) { LOG(FATAL) << "brpc callback error: " << exp.what(); } catch (...) 
{ @@ -71,6 +75,7 @@ private: std::function _suc_fn; InstanceLoId _id; bool _eos; + vectorized::BroadcastPBlockHolder* _data; }; ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, @@ -126,6 +131,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) { _instance_to_package_queue_mutex[low_id] = std::make_unique(); _instance_to_seq[low_id] = 0; _instance_to_package_queue[low_id] = std::queue>(); + _instance_to_broadcast_package_queue[low_id] = + std::queue>(); PUniqueId finst_id; finst_id.set_hi(fragment_instance_id.hi); finst_id.set_lo(fragment_instance_id.lo); @@ -155,59 +162,93 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } +Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) { + if (_is_finishing) { + return Status::OK(); + } + TUniqueId ins_id = request.channel->_fragment_instance_id; + bool send_now = false; + request.block_holder->ref(); + { + std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); + // Do not have in process rpc, directly send + if (_instance_to_sending_by_pipeline[ins_id.lo]) { + send_now = true; + _instance_to_sending_by_pipeline[ins_id.lo] = false; + } + _instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request)); + } + if (send_now) { + RETURN_IF_ERROR(_send_rpc(ins_id.lo)); + } + + return Status::OK(); +} + Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); std::queue>& q = _instance_to_package_queue[id]; - if (q.empty() || _is_finishing) { + std::queue>& broadcast_q = + _instance_to_broadcast_package_queue[id]; + + if (_is_finishing) { _instance_to_sending_by_pipeline[id] = true; return Status::OK(); } - TransmitInfo& request = q.front(); +#define DO_RPC(QUEUE, BLOCK, HOLDER) \ + auto& request = QUEUE.front(); \ + if (!_instance_to_request[id]) { \ + _construct_request(id); \ + } \ + auto brpc_request = 
_instance_to_request[id]; \ + brpc_request->set_eos(request.eos); \ + brpc_request->set_packet_seq(_instance_to_seq[id]++); \ + if (request.BLOCK) { \ + brpc_request->set_allocated_block(request.BLOCK); \ + } \ + auto* _closure = new SelfDeleteClosure(id, request.eos, HOLDER); \ + _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \ + _closure->addFailedHandler( \ + [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \ + _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \ + const PTransmitDataResult& result) { \ + Status s = Status(result.status()); \ + if (!s.ok()) { \ + _failed(id, \ + fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \ + } else if (eos) { \ + _ended(id); \ + } else { \ + _send_rpc(id); \ + } \ + }); \ + { \ + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \ + if (enable_http_send_block(*brpc_request)) { \ + RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \ + *brpc_request, request.channel->_brpc_dest_addr)); \ + } else { \ + transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \ + } \ + } \ + if (request.BLOCK) { \ + brpc_request->release_block(); \ + } \ + QUEUE.pop(); - if (!_instance_to_request[id]) { - _construct_request(id); + if (!q.empty()) { + // If we have data to shuffle which is not broadcasted + DO_RPC(q, block, nullptr) + } else if (!broadcast_q.empty()) { + // If we have data to shuffle which is broadcasted + DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder) + } else { + _instance_to_sending_by_pipeline[id] = true; + return Status::OK(); } - auto brpc_request = _instance_to_request[id]; - brpc_request->set_eos(request.eos); - brpc_request->set_packet_seq(_instance_to_seq[id]++); - if (request.block) { - brpc_request->set_allocated_block(request.block.get()); - } - - auto* _closure = new SelfDeleteClosure(id, request.eos); - 
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); - _closure->addFailedHandler( - [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); - _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, - const PTransmitDataResult& result) { - Status s = Status(result.status()); - if (!s.ok()) { - _failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string())); - } else if (eos) { - _ended(id); - } else { - _send_rpc(id); - } - }); - - { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); - if (enable_http_send_block(*brpc_request)) { - RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, - *brpc_request, request.channel->_brpc_dest_addr)); - } else { - transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); - } - } - - if (request.block) { - brpc_request->release_block(); - } - q.pop(); - return Status::OK(); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index eff77c05bb..7be17706ce 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -31,13 +31,20 @@ namespace doris { namespace vectorized { class PipChannel; -} +class BroadcastPBlockHolder; +} // namespace vectorized namespace pipeline { using InstanceLoId = int64_t; struct TransmitInfo { vectorized::PipChannel* channel; - std::unique_ptr block; + PBlock* block; + bool eos; +}; + +struct BroadcastTransmitInfo { + vectorized::PipChannel* channel; + vectorized::BroadcastPBlockHolder* block_holder; bool eos; }; @@ -50,6 +57,7 @@ public: ~ExchangeSinkBuffer(); void register_sink(TUniqueId); Status add_block(TransmitInfo&& request); + Status add_block(BroadcastTransmitInfo&& request); bool can_write() const; bool is_pending_finish() const; void close(); @@ -57,8 +65,13 @@ public: private: phmap::flat_hash_map> _instance_to_package_queue_mutex; + // store data 
in non-broadcast shuffle phmap::flat_hash_map>> _instance_to_package_queue; + // store data in broadcast shuffle + phmap::flat_hash_map>> + _instance_to_broadcast_package_queue; using PackageSeq = int64_t; // must init zero phmap::flat_hash_map _instance_to_seq; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index c44e8acd74..c7a9e62335 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -242,7 +242,7 @@ Status Channel::close_internal() { RETURN_IF_ERROR(send_current_block(true)); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - RETURN_IF_ERROR(send_block(nullptr, true)); + RETURN_IF_ERROR(send_block((PBlock*)nullptr, true)); } // Don't wait for the last packet to finish, left it to close_wait. return Status::OK(); @@ -287,7 +287,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sink.output_partition.type == TPartitionType::RANDOM || sink.output_partition.type == TPartitionType::RANGE_PARTITIONED || sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED); - _cur_pb_block = &_pb_block1; std::map fragment_id_to_channel_index; @@ -317,6 +316,12 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int } } _name = "VDataStreamSender"; + if (state->enable_pipeline_exec()) { + _broadcast_pb_blocks.resize(config::num_broadcast_buffer); + _broadcast_pb_block_idx = 0; + } else { + _cur_pb_block = &_pb_block1; + } } VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, @@ -470,6 +475,23 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { for (auto channel : _channels) { RETURN_IF_ERROR(channel->send_local_block(block)); } + } else if (state->enable_pipeline_exec()) { + BroadcastPBlockHolder* block_holder = nullptr; + RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); + { + 
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR( + serialize_block(block, block_holder->get_block(), _channels.size())); + } + + for (auto channel : _channels) { + if (channel->is_local()) { + RETURN_IF_ERROR(channel->send_local_block(block)); + } else { + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); + RETURN_IF_ERROR(channel->send_block(block_holder, eos)); + } + } } else { { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); @@ -620,6 +642,28 @@ void VDataStreamSender::_roll_pb_block() { _cur_pb_block = (_cur_pb_block == &_pb_block1 ? &_pb_block2 : &_pb_block1); } +Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) { + constexpr int MAX_LOOP = 1000; + + size_t it = 0; + while (it < MAX_LOOP) { + if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) { + _broadcast_pb_block_idx = 0; + } + + for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) { + if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) { + _broadcast_pb_block_idx++; + *holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1]; + return Status::OK(); + } + } + it++; + } + return Status::InternalError( + "Exceed the max loop limit when acquire the next available buffer!"); +} + void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) { for (auto channel : _channels) { ((PipChannel*)channel)->registe(buffer); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index e644aaf847..19858c259d 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -49,6 +49,42 @@ namespace vectorized { class VExprContext; class Channel; +template +struct AtomicWrapper { + std::atomic _value; + + AtomicWrapper() : _value() {} + + AtomicWrapper(const std::atomic& a) : _value(a.load()) {} + + AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {} + + AtomicWrapper& operator=(const AtomicWrapper& other) { 
_value.store(other._value.load()); return *this; } +}; + +// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock +// will be shared between different channels, so we have to use a ref count to mark if this +// PBlock is available for next serialization. +class BroadcastPBlockHolder { +public: + BroadcastPBlockHolder() : _ref_count(0) {} + ~BroadcastPBlockHolder() noexcept = default; + + void unref() noexcept { + DCHECK_GT(_ref_count._value, 0); + _ref_count._value.fetch_sub(1); + } + void ref() noexcept { _ref_count._value.fetch_add(1); } + + bool available() { return _ref_count._value == 0; } + + PBlock* get_block() { return &pblock; } + +private: + AtomicWrapper _ref_count; + PBlock pblock; +}; + class VDataStreamSender : public DataSink { public: friend class pipeline::ExchangeSinkOperator; @@ -91,6 +127,7 @@ protected: friend class pipeline::ExchangeSinkBuffer; void _roll_pb_block(); + Status _get_next_available_buffer(BroadcastPBlockHolder** holder); Status get_partition_column_result(Block* block, int* result) const { int counter = 0; @@ -131,6 +168,10 @@ protected: PBlock _pb_block2; PBlock* _cur_pb_block; + // used by pipeline engine + std::vector _broadcast_pb_blocks; + int _broadcast_pb_block_idx; + // compute per-row partition values std::vector _partition_expr_ctxs; @@ -219,6 +260,10 @@ public: // if batch is nullptr, send the eof packet virtual Status send_block(PBlock* block, bool eos = false); + virtual Status send_block(BroadcastPBlockHolder* block, bool eos = false) { + return Status::InternalError("Send BroadcastPBlockHolder is not allowed!"); + } + Status add_rows(Block* block, const std::vector& row); virtual Status send_current_block(bool eos); @@ -369,8 +414,21 @@ public: } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block( - {this, block ? 
std::make_unique(*block) : nullptr, eos})); + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + } + return Status::OK(); + } + + Status send_block(BroadcastPBlockHolder* block, bool eos = false) override { + if (eos) { + if (_eos_send) { + return Status::OK(); + } else { + _eos_send = true; + } + } + if (eos || block->get_block()->column_metas_size()) { + RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); } return Status::OK(); }