[Bug](pipeline) Fix bug in non-local exchange on pipeline engine (#16463)

Currently, for broadcast shuffle, we serialize a block once and then send it by RPC through multiple channel. After this, we will serialize next block in the same memory for consideration of memory reuse. However, since the RPC is asynchronized, maybe the next block serialization will happen before sending the previous block.

So, in this PR, I use a ref count to identify if the serialized block can be reuse in broadcast shuffle.
This commit is contained in:
Gabriel
2023-02-09 19:22:40 +08:00
committed by GitHub
parent 539fd684e9
commit a038fdaec6
5 changed files with 207 additions and 49 deletions

View File

@ -896,6 +896,8 @@ CONF_Int32(query_bkd_inverted_index_limit_percent, "5"); // 5%
CONF_String(inverted_index_dict_path, "${DORIS_HOME}/dict");
// tree depth for bkd index
CONF_Int32(max_depth_in_bkd_tree, "32");
// use num_broadcast_buffer blocks as buffer to do broadcast
CONF_Int32(num_broadcast_buffer, "32");
#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");

View File

@ -32,7 +32,8 @@ namespace doris::pipeline {
template <typename T>
class SelfDeleteClosure : public google::protobuf::Closure {
public:
SelfDeleteClosure(InstanceLoId id, bool eos) : _id(id), _eos(eos) {}
SelfDeleteClosure(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data = nullptr)
: _id(id), _eos(eos), _data(data) {}
~SelfDeleteClosure() override = default;
SelfDeleteClosure(const SelfDeleteClosure& other) = delete;
SelfDeleteClosure& operator=(const SelfDeleteClosure& other) = delete;
@ -56,6 +57,9 @@ public:
} else {
_suc_fn(_id, _eos, result);
}
if (_data) {
_data->unref();
}
} catch (const std::exception& exp) {
LOG(FATAL) << "brpc callback error: " << exp.what();
} catch (...) {
@ -71,6 +75,7 @@ private:
std::function<void(const InstanceLoId&, const bool&, const T&)> _suc_fn;
InstanceLoId _id;
bool _eos;
vectorized::BroadcastPBlockHolder* _data;
};
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id,
@ -126,6 +131,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
_instance_to_package_queue_mutex[low_id] = std::make_unique<std::mutex>();
_instance_to_seq[low_id] = 0;
_instance_to_package_queue[low_id] = std::queue<TransmitInfo, std::list<TransmitInfo>>();
_instance_to_broadcast_package_queue[low_id] =
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>();
PUniqueId finst_id;
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
@ -155,59 +162,93 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
return Status::OK();
}
Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_is_finishing) {
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
bool send_now = false;
request.block_holder->ref();
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
if (_instance_to_sending_by_pipeline[ins_id.lo]) {
send_now = true;
_instance_to_sending_by_pipeline[ins_id.lo] = false;
}
_instance_to_broadcast_package_queue[ins_id.lo].emplace(std::move(request));
}
if (send_now) {
RETURN_IF_ERROR(_send_rpc(ins_id.lo));
}
return Status::OK();
}
Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
std::queue<TransmitInfo, std::list<TransmitInfo>>& q = _instance_to_package_queue[id];
if (q.empty() || _is_finishing) {
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>& broadcast_q =
_instance_to_broadcast_package_queue[id];
if (_is_finishing) {
_instance_to_sending_by_pipeline[id] = true;
return Status::OK();
}
TransmitInfo& request = q.front();
#define DO_RPC(QUEUE, BLOCK, HOLDER) \
auto& request = QUEUE.front(); \
if (!_instance_to_request[id]) { \
_construct_request(id); \
} \
auto brpc_request = _instance_to_request[id]; \
brpc_request->set_eos(request.eos); \
brpc_request->set_packet_seq(_instance_to_seq[id]++); \
if (request.BLOCK) { \
brpc_request->set_allocated_block(request.BLOCK); \
} \
auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos, HOLDER); \
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms); \
_closure->addFailedHandler( \
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); }); \
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos, \
const PTransmitDataResult& result) { \
Status s = Status(result.status()); \
if (!s.ok()) { \
_failed(id, \
fmt::format("exchange req success but status isn't ok: {}", s.to_string())); \
} else if (eos) { \
_ended(id); \
} else { \
_send_rpc(id); \
} \
}); \
{ \
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker()); \
if (enable_http_send_block(*brpc_request)) { \
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure, \
*brpc_request, request.channel->_brpc_dest_addr)); \
} else { \
transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request); \
} \
} \
if (request.BLOCK) { \
brpc_request->release_block(); \
} \
QUEUE.pop();
if (!_instance_to_request[id]) {
_construct_request(id);
if (!q.empty()) {
// If we have data to shuffle which is not broadcasted
DO_RPC(q, block, nullptr)
} else if (!broadcast_q.empty()) {
// If we have data to shuffle which is broadcasted
DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder)
} else {
_instance_to_sending_by_pipeline[id] = true;
return Status::OK();
}
auto brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
if (request.block) {
brpc_request->set_allocated_block(request.block.get());
}
auto* _closure = new SelfDeleteClosure<PTransmitDataResult>(id, request.eos);
_closure->cntl.set_timeout_ms(request.channel->_brpc_timeout_ms);
_closure->addFailedHandler(
[&](const InstanceLoId& id, const std::string& err) { _failed(id, err); });
_closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
const PTransmitDataResult& result) {
Status s = Status(result.status());
if (!s.ok()) {
_failed(id, fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
} else if (eos) {
_ended(id);
} else {
_send_rpc(id);
}
});
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
if (enable_http_send_block(*brpc_request)) {
RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), _closure,
*brpc_request, request.channel->_brpc_dest_addr));
} else {
transmit_block(*request.channel->_brpc_stub, _closure, *brpc_request);
}
}
if (request.block) {
brpc_request->release_block();
}
q.pop();
return Status::OK();
}

View File

@ -31,13 +31,20 @@
namespace doris {
namespace vectorized {
class PipChannel;
}
class BroadcastPBlockHolder;
} // namespace vectorized
namespace pipeline {
using InstanceLoId = int64_t;
struct TransmitInfo {
vectorized::PipChannel* channel;
std::unique_ptr<PBlock> block;
PBlock* block;
bool eos;
};
struct BroadcastTransmitInfo {
vectorized::PipChannel* channel;
vectorized::BroadcastPBlockHolder* block_holder;
bool eos;
};
@ -50,6 +57,7 @@ public:
~ExchangeSinkBuffer();
void register_sink(TUniqueId);
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
bool can_write() const;
bool is_pending_finish() const;
void close();
@ -57,8 +65,13 @@ public:
private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
// store data in non-broadcast shuffle
phmap::flat_hash_map<InstanceLoId, std::queue<TransmitInfo, std::list<TransmitInfo>>>
_instance_to_package_queue;
// store data in broadcast shuffle
phmap::flat_hash_map<InstanceLoId,
std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>>
_instance_to_broadcast_package_queue;
using PackageSeq = int64_t;
// must init zero
phmap::flat_hash_map<InstanceLoId, PackageSeq> _instance_to_seq;

View File

@ -242,7 +242,7 @@ Status Channel::close_internal() {
RETURN_IF_ERROR(send_current_block(true));
} else {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
RETURN_IF_ERROR(send_block(nullptr, true));
RETURN_IF_ERROR(send_block((PBlock*)nullptr, true));
}
// Don't wait for the last packet to finish, left it to close_wait.
return Status::OK();
@ -287,7 +287,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
sink.output_partition.type == TPartitionType::RANDOM ||
sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
_cur_pb_block = &_pb_block1;
std::map<int64_t, int64_t> fragment_id_to_channel_index;
@ -317,6 +316,12 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
}
_name = "VDataStreamSender";
if (state->enable_pipeline_exec()) {
_broadcast_pb_blocks.resize(config::num_broadcast_buffer);
_broadcast_pb_block_idx = 0;
} else {
_cur_pb_block = &_pb_block1;
}
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
@ -470,6 +475,23 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_local_block(block));
}
} else if (state->enable_pipeline_exec()) {
BroadcastPBlockHolder* block_holder = nullptr;
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(
serialize_block(block, block_holder->get_block(), _channels.size()));
}
for (auto channel : _channels) {
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_block(block));
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(channel->send_block(block_holder, eos));
}
}
} else {
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@ -620,6 +642,28 @@ void VDataStreamSender::_roll_pb_block() {
_cur_pb_block = (_cur_pb_block == &_pb_block1 ? &_pb_block2 : &_pb_block1);
}
Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) {
constexpr int MAX_LOOP = 1000;
size_t it = 0;
while (it < MAX_LOOP) {
if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
_broadcast_pb_block_idx = 0;
}
for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
_broadcast_pb_block_idx++;
*holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx - 1];
return Status::OK();
}
}
it++;
}
return Status::InternalError(
"Exceed the max loop limit when acquire the next available buffer!");
}
void VDataStreamSender::registe_channels(pipeline::ExchangeSinkBuffer* buffer) {
for (auto channel : _channels) {
((PipChannel*)channel)->registe(buffer);

View File

@ -49,6 +49,42 @@ namespace vectorized {
class VExprContext;
class Channel;
template <typename T>
struct AtomicWrapper {
std::atomic<T> _value;
AtomicWrapper() : _value() {}
AtomicWrapper(const std::atomic<T>& a) : _value(a.load()) {}
AtomicWrapper(const AtomicWrapper& other) : _value(other._value.load()) {}
AtomicWrapper& operator=(const AtomicWrapper& other) { _value.store(other._a.load()); }
};
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
// will be shared between different channel, so we have to use a ref count to mark if this
// PBlock is available for next serialization.
class BroadcastPBlockHolder {
public:
BroadcastPBlockHolder() : _ref_count(0) {}
~BroadcastPBlockHolder() noexcept = default;
void unref() noexcept {
DCHECK_GT(_ref_count._value, 0);
_ref_count._value.fetch_sub(1);
}
void ref() noexcept { _ref_count._value.fetch_add(1); }
bool available() { return _ref_count._value == 0; }
PBlock* get_block() { return &pblock; }
private:
AtomicWrapper<uint32_t> _ref_count;
PBlock pblock;
};
class VDataStreamSender : public DataSink {
public:
friend class pipeline::ExchangeSinkOperator;
@ -91,6 +127,7 @@ protected:
friend class pipeline::ExchangeSinkBuffer;
void _roll_pb_block();
Status _get_next_available_buffer(BroadcastPBlockHolder** holder);
Status get_partition_column_result(Block* block, int* result) const {
int counter = 0;
@ -131,6 +168,10 @@ protected:
PBlock _pb_block2;
PBlock* _cur_pb_block;
// used by pipeline engine
std::vector<BroadcastPBlockHolder> _broadcast_pb_blocks;
int _broadcast_pb_block_idx;
// compute per-row partition values
std::vector<VExprContext*> _partition_expr_ctxs;
@ -219,6 +260,10 @@ public:
// if batch is nullptr, send the eof packet
virtual Status send_block(PBlock* block, bool eos = false);
virtual Status send_block(BroadcastPBlockHolder* block, bool eos = false) {
return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
}
Status add_rows(Block* block, const std::vector<int>& row);
virtual Status send_current_block(bool eos);
@ -369,8 +414,21 @@ public:
}
}
if (eos || block->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block(
{this, block ? std::make_unique<PBlock>(*block) : nullptr, eos}));
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
}
return Status::OK();
}
Status send_block(BroadcastPBlockHolder* block, bool eos = false) override {
if (eos) {
if (_eos_send) {
return Status::OK();
} else {
_eos_send = true;
}
}
if (eos || block->get_block()->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
}
return Status::OK();
}