[pipeline](rpc) support closure reuse in pipeline exec engine (#20278)
This commit is contained in:
@ -43,54 +43,6 @@
|
||||
#include "vec/sink/vdata_stream_sender.h"
|
||||
|
||||
namespace doris::pipeline {
|
||||
template <typename T>
|
||||
class SelfDeleteClosure : public google::protobuf::Closure {
|
||||
public:
|
||||
SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr)
|
||||
: _id(id), _eos(eos), _data(data) {}
|
||||
~SelfDeleteClosure() override = default;
|
||||
SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
|
||||
SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
|
||||
void addFailedHandler(std::function<void(const InstanceLoId&, const std::string&)> fail_fn) {
|
||||
_fail_fn = std::move(fail_fn);
|
||||
}
|
||||
void addSuccessHandler(std::function<void(const InstanceLoId&, const bool&, const T&)> suc_fn) {
|
||||
_suc_fn = suc_fn;
|
||||
}
|
||||
|
||||
void Run() noexcept override {
|
||||
std::unique_ptr<SelfDeleteClosure> self_guard(this);
|
||||
try {
|
||||
if (_data) {
|
||||
_data->unref();
|
||||
}
|
||||
if (cntl.Failed()) {
|
||||
std::string err = fmt::format(
|
||||
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
|
||||
"latency = {}",
|
||||
berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(),
|
||||
cntl.latency_us());
|
||||
_fail_fn(_id, err);
|
||||
} else {
|
||||
_suc_fn(_id, _eos, result);
|
||||
}
|
||||
} catch (const std::exception& exp) {
|
||||
LOG(FATAL) << "brpc callback error: " << exp.what();
|
||||
} catch (...) {
|
||||
LOG(FATAL) << "brpc callback error.";
|
||||
}
|
||||
}
|
||||
|
||||
brpc::Controller cntl;
|
||||
T result;
|
||||
|
||||
private:
|
||||
std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
|
||||
std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
|
||||
InstanceLoId _id;
|
||||
bool _eos;
|
||||
vectorized::BroadcastPBlockHolder* _data;
|
||||
};
|
||||
|
||||
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
|
||||
int be_number, PipelineFragmentContext* context)
|
||||
@ -227,12 +179,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
|
||||
if (request.block) {
|
||||
brpc_request->set_allocated_block(request.block.get());
|
||||
}
|
||||
auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, nullptr);
|
||||
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
|
||||
_closure->addFailedHandler(
|
||||
auto* closure = request.channel->get_closure(id, request.eos, nullptr);
|
||||
closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
|
||||
closure->addFailedHandler(
|
||||
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
|
||||
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
|
||||
const PTransmitDataResult& result) {
|
||||
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
|
||||
const PTransmitDataResult& result) {
|
||||
Status s = Status(result.status());
|
||||
if (s.is<ErrorCode::END_OF_FILE>()) {
|
||||
_set_receiver_eof(id);
|
||||
@ -248,11 +200,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
|
||||
{
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
|
||||
if (enable_http_send_block(*brpc_request)) {
|
||||
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
|
||||
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
|
||||
*brpc_request,
|
||||
request.channel->_brpc_dest_addr));
|
||||
} else {
|
||||
transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
|
||||
transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
|
||||
}
|
||||
}
|
||||
if (request.block) {
|
||||
@ -271,13 +223,12 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
|
||||
if (request.block_holder->get_block()) {
|
||||
brpc_request->set_allocated_block(request.block_holder->get_block());
|
||||
}
|
||||
auto* _closure =
|
||||
new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, request.block_holder);
|
||||
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
|
||||
_closure->addFailedHandler(
|
||||
auto* closure = request.channel->get_closure(id, request.eos, request.block_holder);
|
||||
closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
|
||||
closure->addFailedHandler(
|
||||
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
|
||||
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
|
||||
const PTransmitDataResult& result) {
|
||||
closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
|
||||
const PTransmitDataResult& result) {
|
||||
Status s = Status(result.status());
|
||||
if (s.is<ErrorCode::END_OF_FILE>()) {
|
||||
_set_receiver_eof(id);
|
||||
@ -293,11 +244,11 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
|
||||
{
|
||||
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
|
||||
if (enable_http_send_block(*brpc_request)) {
|
||||
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
|
||||
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
|
||||
*brpc_request,
|
||||
request.channel->_brpc_dest_addr));
|
||||
} else {
|
||||
transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
|
||||
transmit_block(*request.channel->_brpc_stub, closure, *brpc_request);
|
||||
}
|
||||
}
|
||||
if (request.block_holder->get_block()) {
|
||||
@ -336,6 +287,7 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
|
||||
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
|
||||
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
|
||||
_instance_to_receiver_eof[id] = true;
|
||||
_instance_to_sending_by_pipeline[id] = true;
|
||||
}
|
||||
|
||||
bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <brpc/controller.h>
|
||||
#include <gen_cpp/data.pb.h>
|
||||
#include <gen_cpp/types.pb.h>
|
||||
#include <parallel_hashmap/phmap.h>
|
||||
@ -31,18 +32,55 @@
|
||||
|
||||
#include "common/global_types.h"
|
||||
#include "common/status.h"
|
||||
#include "service/backend_options.h"
|
||||
|
||||
namespace doris {
|
||||
class PTransmitDataParams;
|
||||
class TUniqueId;
|
||||
|
||||
using InstanceLoId = int64_t;
|
||||
|
||||
namespace vectorized {
|
||||
class PipChannel;
|
||||
class BroadcastPBlockHolder;
|
||||
|
||||
template <typename T>
|
||||
struct AtomicWrapper {
|
||||
std::atomic<T> _value;
|
||||
|
||||
AtomicWrapper() : _value() {}
|
||||
|
||||
AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
|
||||
|
||||
AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
|
||||
|
||||
AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
|
||||
};
|
||||
|
||||
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
|
||||
// will be shared between different channel, so we have to use a ref count to mark if this
|
||||
// PBlock is available for next serialization.
|
||||
class BroadcastPBlockHolder {
|
||||
public:
|
||||
BroadcastPBlockHolder() : _ref_count(0) {}
|
||||
~BroadcastPBlockHolder() noexcept = default;
|
||||
|
||||
void unref() noexcept {
|
||||
DCHECK_GT(_ref_count._value, 0);
|
||||
_ref_count._value.fetch_sub(1);
|
||||
}
|
||||
void ref() noexcept { _ref_count._value.fetch_add(1); }
|
||||
|
||||
bool available() { return _ref_count._value == 0; }
|
||||
|
||||
PBlock* get_block() { return &pblock; }
|
||||
|
||||
private:
|
||||
AtomicWrapper<uint32_t> _ref_count;
|
||||
PBlock pblock;
|
||||
};
|
||||
} // namespace vectorized
|
||||
|
||||
namespace pipeline {
|
||||
using InstanceLoId = int64_t;
|
||||
struct TransmitInfo {
|
||||
vectorized::PipChannel* channel;
|
||||
std::unique_ptr<PBlock> block;
|
||||
@ -57,6 +95,62 @@ struct BroadcastTransmitInfo {
|
||||
|
||||
class PipelineFragmentContext;
|
||||
|
||||
template <typename T>
|
||||
class SelfDeleteClosure : public google::protobuf::Closure {
|
||||
public:
|
||||
SelfDeleteClosure() = default;
|
||||
|
||||
void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
|
||||
_id = id;
|
||||
_eos = eos;
|
||||
_data = data;
|
||||
}
|
||||
|
||||
~SelfDeleteClosure() override = default;
|
||||
SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
|
||||
SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
|
||||
void addFailedHandler(
|
||||
const std::function<void(const InstanceLoId&, const std::string&)>& fail_fn) {
|
||||
_fail_fn = fail_fn;
|
||||
}
|
||||
void addSuccessHandler(
|
||||
const std::function<void(const InstanceLoId&, const bool&, const T&)>& suc_fn) {
|
||||
_suc_fn = suc_fn;
|
||||
}
|
||||
|
||||
void Run() noexcept override {
|
||||
try {
|
||||
if (_data) {
|
||||
_data->unref();
|
||||
}
|
||||
if (cntl.Failed()) {
|
||||
std::string err = fmt::format(
|
||||
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
|
||||
"latency = {}",
|
||||
berror(cntl.ErrorCode()), cntl.ErrorText(), BackendOptions::get_localhost(),
|
||||
cntl.latency_us());
|
||||
_fail_fn(_id, err);
|
||||
} else {
|
||||
_suc_fn(_id, _eos, result);
|
||||
}
|
||||
} catch (const std::exception& exp) {
|
||||
LOG(FATAL) << "brpc callback error: " << exp.what();
|
||||
} catch (...) {
|
||||
LOG(FATAL) << "brpc callback error.";
|
||||
}
|
||||
}
|
||||
|
||||
brpc::Controller cntl;
|
||||
T result;
|
||||
|
||||
private:
|
||||
std::function<void(const InstanceLoId&, const std::string&)> _fail_fn;
|
||||
std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
|
||||
InstanceLoId _id;
|
||||
bool _eos;
|
||||
vectorized::BroadcastPBlockHolder* _data;
|
||||
};
|
||||
|
||||
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
|
||||
class ExchangeSinkBuffer {
|
||||
public:
|
||||
|
||||
@ -68,42 +68,6 @@ class ExchangeSinkOperator;
|
||||
namespace vectorized {
|
||||
class Channel;
|
||||
|
||||
template <typename T>
|
||||
struct AtomicWrapper {
|
||||
std::atomic<T> _value;
|
||||
|
||||
AtomicWrapper() : _value() {}
|
||||
|
||||
AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
|
||||
|
||||
AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
|
||||
|
||||
AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
|
||||
};
|
||||
|
||||
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
|
||||
// will be shared between different channel, so we have to use a ref count to mark if this
|
||||
// PBlock is available for next serialization.
|
||||
class BroadcastPBlockHolder {
|
||||
public:
|
||||
BroadcastPBlockHolder() : _ref_count(0) {}
|
||||
~BroadcastPBlockHolder() noexcept = default;
|
||||
|
||||
void unref() noexcept {
|
||||
DCHECK_GT(_ref_count._value, 0);
|
||||
_ref_count._value.fetch_sub(1);
|
||||
}
|
||||
void ref() noexcept { _ref_count._value.fetch_add(1); }
|
||||
|
||||
bool available() { return _ref_count._value == 0; }
|
||||
|
||||
PBlock* get_block() { return &pblock; }
|
||||
|
||||
private:
|
||||
AtomicWrapper<uint32_t> _ref_count;
|
||||
PBlock pblock;
|
||||
};
|
||||
|
||||
class VDataStreamSender : public DataSink {
|
||||
public:
|
||||
friend class pipeline::ExchangeSinkOperator;
|
||||
@ -525,11 +489,23 @@ public:
|
||||
_buffer->register_sink(_fragment_instance_id);
|
||||
}
|
||||
|
||||
pipeline::SelfDeleteClosure<PTransmitDataResult>* get_closure(
|
||||
InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
|
||||
if (!_closure) {
|
||||
_closure.reset(new pipeline::SelfDeleteClosure<PTransmitDataResult>());
|
||||
} else {
|
||||
_closure->cntl.Reset();
|
||||
}
|
||||
_closure->init(id, eos, data);
|
||||
return _closure.get();
|
||||
}
|
||||
|
||||
private:
|
||||
friend class VDataStreamSender;
|
||||
|
||||
pipeline::ExchangeSinkBuffer* _buffer = nullptr;
|
||||
bool _eos_send = false;
|
||||
std::unique_ptr<pipeline::SelfDeleteClosure<PTransmitDataResult>> _closure = nullptr;
|
||||
};
|
||||
|
||||
} // namespace vectorized
|
||||
|
||||
Reference in New Issue
Block a user