diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 8618a0d445..b9c0ecf435 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -43,54 +43,6 @@
 #include "vec/sink/vdata_stream_sender.h"
 
 namespace doris::pipeline {
-template <typename T>
-class SelfDeleteClosure : public google::protobuf::Closure {
-public:
-    SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr)
-            : _id(id), _eos(eos), _data(data) {}
-    ~SelfDeleteClosure() override = default;
-    SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
-    SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
-    void addFailedHandler(std::function<void(const InstanceLoId&, const std::string&)> fail_fn) {
-        _fail_fn = std::move(fail_fn);
-    }
-    void addSuccessHandler(std::function<void(const InstanceLoId&, const bool&, const T&)> suc_fn) {
-        _suc_fn = suc_fn;
-    }
-
-    void Run() noexcept override {
-        std::unique_ptr<SelfDeleteClosure<T>> self_guard(this);
-        try {
-            if (_data) {
-                _data->unref();
-            }
-            if (cntl.Failed()) {
-                std::string err = fmt::format(
-                        "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
-                        "latency = {}",
-                        berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(),
-                        cntl.latency_us());
-                _fail_fn(_id, err);
-            } else {
-                _suc_fn(_id, _eos, result);
-            }
-        } catch (const std::exception& exp) {
-            LOG(FATAL) << "brpc callback error: " << exp.what();
-        } catch (...) {
-            LOG(FATAL) << "brpc callback error.";
-        }
-    }
-
-    brpc::Controller cntl;
-    T result;
-
-private:
-    std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
-    std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
-    InstanceLoId _id;
-    bool _eos;
-    vectorized::BroadcastPBlockHolder* _data;
-};
 
 ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
                                        int be_number, PipelineFragmentContext* context)
@@ -227,12 +179,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         if (request.block) {
             brpc_request->set_allocated_block(request.block.get());
         }
-        auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, nullptr);
-        _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
-        _closure->addFailedHandler(
+        auto* closure = request.channel->get_closure(id, request.eos, nullptr);
+        closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
-        _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                        const PTransmitDataResult& result) {
+        closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
+                                       const PTransmitDataResult& result) {
             Status s = Status(result.status());
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -248,11 +200,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
        {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
            if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
                                                     *brpc_request,
                                                     request.channel->_brpc_dest_addr));
            } else {
-                transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
+                transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
            }
        }
        if (request.block) {
@@ -271,13 +223,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         if (request.block_holder->get_block()) {
             brpc_request->set_allocated_block(request.block_holder->get_block());
         }
-        auto* _closure =
-                new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, request.block_holder);
-        _closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
-        _closure->addFailedHandler(
+        auto* closure = request.channel->get_closure(id, request.eos, request.block_holder);
+        closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
+        closure->addFailedHandler(
                 [&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
-        _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
-                                        const PTransmitDataResult& result) {
+        closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
+                                       const PTransmitDataResult& result) {
             Status s = Status(result.status());
             if (s.is<ErrorCode::END_OF_FILE>()) {
                 _set_receiver_eof(id);
@@ -293,11 +244,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
        {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
            if (enable_http_send_block(*brpc_request)) {
-                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
+                RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
                                                     *brpc_request,
                                                     request.channel->_brpc_dest_addr));
            } else {
-                transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
+                transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
            }
        }
        if (request.block_holder->get_block()) {
@@ -336,6 +287,7 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
 void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
     std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
     _instance_to_receiver_eof[id] = true;
+    _instance_to_sending_by_pipeline[id] = true;
 }
 
 bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index e71df9053d..9114aabf77 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/controller.h>
 #include 
 #include 
 #include 
@@ -31,18 +32,55 @@
 #include "common/global_types.h"
 #include "common/status.h"
+#include "service/backend_options.h"
 
 namespace doris {
 class PTransmitDataParams;
 class TUniqueId;
 
+using InstanceLoId = int64_t;
+
 namespace vectorized {
 class PipChannel;
-class BroadcastPBlockHolder;
+
+template <typename T>
+struct AtomicWrapper {
+    std::atomic<T> _value;
+
+    AtomicWrapper() : _value() {}
+
+    AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
+
+    AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
+
+    AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
+};
+
+// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
+// will be shared between different channel, so we have to use a ref count to mark if this
+// PBlock is available for next serialization.
+class BroadcastPBlockHolder {
+public:
+    BroadcastPBlockHolder() : _ref_count(0) {}
+    ~BroadcastPBlockHolder() noexcept = default;
+
+    void unref() noexcept {
+        DCHECK_GT(_ref_count._value, 0);
+        _ref_count._value.fetch_sub(1);
+    }
+    void ref() noexcept { _ref_count._value.fetch_add(1); }
+
+    bool available() { return _ref_count._value == 0; }
+
+    PBlock* get_block() { return &pblock; }
+
+private:
+    AtomicWrapper<uint32_t> _ref_count;
+    PBlock pblock;
+};
 } // namespace vectorized
 
 namespace pipeline {
-using InstanceLoId = int64_t;
 
 struct TransmitInfo {
     vectorized::PipChannel* channel;
     std::unique_ptr<PBlock> block;
@@ -57,6 +95,62 @@ struct BroadcastTransmitInfo {
 
 class PipelineFragmentContext;
 
+template <typename T>
+class SelfDeleteClosure : public google::protobuf::Closure {
+public:
+    SelfDeleteClosure() = default;
+
+    void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
+        _id = id;
+        _eos = eos;
+        _data = data;
+    }
+
+    ~SelfDeleteClosure() override = default;
+    SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
+    SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
+    void addFailedHandler(
+            const std::function<void(const InstanceLoId&, const std::string&)>& fail_fn) {
+        _fail_fn = fail_fn;
+    }
+    void addSuccessHandler(
+            const std::function<void(const InstanceLoId&, const bool&, const T&)>& suc_fn) {
+        _suc_fn = suc_fn;
+    }
+
+    void Run() noexcept override {
+        try {
+            if (_data) {
+                _data->unref();
+            }
+            if (cntl.Failed()) {
+                std::string err = fmt::format(
+                        "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
+                        "latency = {}",
+                        berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(),
+                        cntl.latency_us());
+                _fail_fn(_id, err);
+            } else {
+                _suc_fn(_id, _eos, result);
+            }
+        } catch (const std::exception& exp) {
+            LOG(FATAL) << "brpc callback error: " << exp.what();
+        } catch (...) {
+            LOG(FATAL) << "brpc callback error.";
+        }
+    }
+
+    brpc::Controller cntl;
+    T result;
+
+private:
+    std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
+    std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
+    InstanceLoId _id;
+    bool _eos;
+    vectorized::BroadcastPBlockHolder* _data;
+};
+
 // Each ExchangeSinkOperator have one ExchangeSinkBuffer
 class ExchangeSinkBuffer {
 public:
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 86d15782ae..7c1b423083 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -68,42 +68,6 @@ class ExchangeSinkOperator;
 namespace vectorized {
 class Channel;
 
-template <typename T>
-struct AtomicWrapper {
-    std::atomic<T> _value;
-
-    AtomicWrapper() : _value() {}
-
-    AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
-
-    AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
-
-    AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
-};
-
-// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
-// will be shared between different channel, so we have to use a ref count to mark if this
-// PBlock is available for next serialization.
-class BroadcastPBlockHolder {
-public:
-    BroadcastPBlockHolder() : _ref_count(0) {}
-    ~BroadcastPBlockHolder() noexcept = default;
-
-    void unref() noexcept {
-        DCHECK_GT(_ref_count._value, 0);
-        _ref_count._value.fetch_sub(1);
-    }
-    void ref() noexcept { _ref_count._value.fetch_add(1); }
-
-    bool available() { return _ref_count._value == 0; }
-
-    PBlock* get_block() { return &pblock; }
-
-private:
-    AtomicWrapper<uint32_t> _ref_count;
-    PBlock pblock;
-};
-
 class VDataStreamSender : public DataSink {
 public:
     friend class pipeline::ExchangeSinkOperator;
@@ -525,11 +489,23 @@ public:
         _buffer->register_sink(_fragment_instance_id);
     }
 
+    pipeline::SelfDeleteClosure<PTransmitDataResult>* get_closure(
+            InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
+        if (!_closure) {
+            _closure.reset(new pipeline::SelfDeleteClosure<PTransmitDataResult>());
+        } else {
+            _closure->cntl.Reset();
+        }
+        _closure->init(id, eos, data);
+        return _closure.get();
+    }
+
 private:
     friend class VDataStreamSender;
 
     pipeline::ExchangeSinkBuffer* _buffer = nullptr;
     bool _eos_send = false;
+    std::unique_ptr<pipeline::SelfDeleteClosure<PTransmitDataResult>> _closure = nullptr;
 };
 
 } // namespace vectorized
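
Note (reviewer sketch, not part of the patch): the behavioral change above is that each channel now owns one resettable RPC closure instead of heap-allocating a self-deleting one per send, and `Run()` no longer wraps `this` in a `unique_ptr`. Below is a minimal standalone illustration of that ownership pattern; `FakeController`, `ReusableCallback`, and `Channel` are invented stand-ins for `brpc::Controller`, the reworked `SelfDeleteClosure`, and `PipChannel`, not Doris code.

// Illustrative-only sketch of the closure-reuse pattern introduced by this patch.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>

struct FakeController { // stand-in for brpc::Controller
    bool failed = false;
    void Reset() { failed = false; } // a controller must be Reset() between reuses
};

class ReusableCallback { // stand-in for the non-self-deleting SelfDeleteClosure
public:
    void init(int64_t id, bool eos) {
        _id = id;
        _eos = eos;
    }
    void set_handler(std::function<void(int64_t, bool)> fn) { _fn = std::move(fn); }

    // Unlike the old closure, run() leaves the object alive, so its owning
    // channel can hand it out again for the next RPC.
    void run() { _fn(_id, _eos); }

    FakeController cntl;

private:
    std::function<void(int64_t, bool)> _fn;
    int64_t _id = 0;
    bool _eos = false;
};

class Channel { // stand-in for PipChannel::get_closure()
public:
    ReusableCallback* get_callback(int64_t id, bool eos) {
        if (!_cb) {
            _cb = std::make_unique<ReusableCallback>(); // first use: allocate once
        } else {
            _cb->cntl.Reset(); // later uses: clear state left by the previous RPC
        }
        _cb->init(id, eos);
        return _cb.get();
    }

private:
    std::unique_ptr<ReusableCallback> _cb; // owned by the channel, not freed by run()
};

int main() {
    Channel ch;
    for (int i = 0; i < 3; ++i) { // three "RPCs", but only one allocation
        ReusableCallback* cb = ch.get_callback(/*id=*/42, /*eos=*/i == 2);
        cb->set_handler([](int64_t id, bool eos) {
            std::cout << "done id=" << id << " eos=" << eos << '\n';
        });
        cb->run(); // with real bRPC this would fire on RPC completion
    }
    return 0;
}

Reusing one closure per channel is only safe because the sink buffer keeps at most one RPC in flight per instance queue; that appears to be why `_set_receiver_eof` now also sets `_instance_to_sending_by_pipeline[id] = true`, so no further buffered send is issued on a channel whose receiver has already reported EOF.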