[refactor](broadcastbuffer) using a queue to remove ref and unref codes (#28698)

Add a new class BroadcastPBlockHolderQueue to manage holders.
Use shared_ptr to manage holders instead of ref and unref calls, which were too difficult to maintain.

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2023-12-20 21:23:25 +08:00
Committed by: GitHub
Parent: a8dcca98ec
Commit: 18ad8562f2
10 changed files with 120 additions and 145 deletions
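The core pattern this patch introduces, shown as a minimal standalone sketch (simplified stand-in names, not the actual Doris classes): each holder keeps only a weak_ptr back to the queue that created it, and its destructor hands the block back for reuse, so call sites never have to ref or unref anything.

```cpp
#include <iostream>
#include <memory>
#include <mutex>
#include <stack>
#include <string>

// Stand-in for doris::vectorized::PBlock.
struct FakeBlock {
    std::string data;
};

class HolderQueue;

// Holds one block; when the last shared_ptr to the holder is dropped, the
// block is handed back to the queue that created it (if still alive).
class Holder {
public:
    Holder() : _block(std::make_unique<FakeBlock>()) {}
    explicit Holder(std::unique_ptr<FakeBlock> block) : _block(std::move(block)) {}
    ~Holder();

    FakeBlock* block() { return _block.get(); }

private:
    friend class HolderQueue;
    std::unique_ptr<FakeBlock> _block;
    std::weak_ptr<HolderQueue> _parent;  // set by the queue on push()
};

class HolderQueue : public std::enable_shared_from_this<HolderQueue> {
public:
    void push(std::shared_ptr<Holder> holder) {
        std::lock_guard<std::mutex> l(_lock);
        holder->_parent = shared_from_this();
        _holders.push(std::move(holder));
    }

    std::shared_ptr<Holder> pop() {
        std::lock_guard<std::mutex> l(_lock);
        if (_holders.empty()) {
            return nullptr;
        }
        auto res = _holders.top();
        _holders.pop();
        return res;
    }

private:
    // LIFO so the most recently returned block is reused first (cache friendly).
    std::stack<std::shared_ptr<Holder>> _holders;
    std::mutex _lock;
};

Holder::~Holder() {
    if (auto queue = _parent.lock()) {
        // Wrap the surviving block in a fresh holder and return it for reuse.
        queue->push(std::make_shared<Holder>(std::move(_block)));
    }
    // If the queue is already gone, the unique_ptr frees the block here.
}

int main() {
    auto queue = std::make_shared<HolderQueue>();
    queue->push(std::make_shared<Holder>());

    {
        std::shared_ptr<Holder> h = queue->pop();   // queue is now empty
        h->block()->data = "serialized row batch";
    }                                               // h destroyed -> block returned

    std::cout << (queue->pop() != nullptr ? "block was recycled\n" : "queue empty\n");
    return 0;
}
```

A stack is used for the free list, mirroring the patch's comment that the most recently returned block is the one most likely to still be in the CPU cache.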

@ -45,16 +45,40 @@
#include "vec/sink/vdata_stream_sender.h"
namespace doris {
namespace vectorized {
void BroadcastPBlockHolder::unref() noexcept {
DCHECK_GT(_ref_count._value, 0);
auto old_value = _ref_count._value.fetch_sub(1);
if (_dep && old_value == 1) {
_dep->return_available_block();
namespace vectorized {
BroadcastPBlockHolder::~BroadcastPBlockHolder() {
// Lock the parent queue; if the lock succeeds, return the block
// to the queue so that it can be reused.
std::shared_ptr<BroadcastPBlockHolderQueue> tmp_queue = _parent_creator.lock();
if (tmp_queue != nullptr) {
tmp_queue->push(BroadcastPBlockHolder::create_shared(std::move(_pblock)));
}
// If the queue has already been destructed, pblock is released automatically since it
// is a unique_ptr.
}
void BroadcastPBlockHolderQueue::push(std::shared_ptr<BroadcastPBlockHolder> holder) {
std::unique_lock l(_holders_lock);
holder->set_parent_creator(shared_from_this());
_holders.push(holder);
if (_broadcast_dependency) {
_broadcast_dependency->set_ready();
}
}
std::shared_ptr<BroadcastPBlockHolder> BroadcastPBlockHolderQueue::pop() {
std::unique_lock l(_holders_lock);
if (_holders.empty()) {
return {};
}
std::shared_ptr<BroadcastPBlockHolder> res = _holders.top();
_holders.pop();
if (_holders.empty() && _broadcast_dependency != nullptr) {
_broadcast_dependency->block();
}
return res;
}
} // namespace vectorized
namespace pipeline {
@ -184,12 +208,10 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
template <typename Parent>
Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& request) {
if (_is_finishing) {
request.block_holder->unref();
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
request.block_holder->unref();
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
@ -243,7 +265,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
if (!request.exec_status.ok()) {
request.exec_status.to_protobuf(brpc_request->mutable_exec_status());
}
auto send_callback = request.channel->get_send_callback(id, request.eos, nullptr);
auto send_callback = request.channel->get_send_callback(id, request.eos);
_instance_to_rpc_ctx[id]._send_callback = send_callback;
_instance_to_rpc_ctx[id].is_cancelled = false;
@ -307,8 +329,7 @@ Status ExchangeSinkBuffer<Parent>::_send_rpc(InstanceLoId id) {
auto statistic = brpc_request->mutable_query_statistics();
_statistics->to_pb(statistic);
}
auto send_callback =
request.channel->get_send_callback(id, request.eos, request.block_holder);
auto send_callback = request.channel->get_send_callback(id, request.eos);
ExchangeRpcContext rpc_ctx;
rpc_ctx._send_callback = send_callback;
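One consequence visible in the hunks above: the send callback no longer carries the holder or unrefs it, because the shared_ptr stored alongside the pending request keeps the block alive until the last in-flight send drops it. A minimal, self-contained sketch of that lifetime pattern (hypothetical names; std::function stands in for the brpc callback):

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Buffer {
    ~Buffer() { std::cout << "buffer released, ready for reuse\n"; }
};

int main() {
    auto buf = std::make_shared<Buffer>();

    // Each pending send captures a shared_ptr copy; no manual ref()/unref().
    std::vector<std::function<void()>> pending_rpcs;
    for (int channel = 0; channel < 3; ++channel) {
        pending_rpcs.emplace_back([buf, channel] {
            std::cout << "rpc finished for channel " << channel << "\n";
        });
    }

    buf.reset();  // the sender's own reference is gone; the buffer is still alive

    for (auto& rpc : pending_rpcs) {
        rpc();                 // simulate RPC completions
    }
    pending_rpcs.clear();      // last shared_ptr dropped here -> ~Buffer runs
    return 0;
}
```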

@ -29,6 +29,7 @@
#include <memory>
#include <mutex>
#include <queue>
#include <stack>
#include <string>
#include "common/global_types.h"
@ -45,7 +46,6 @@ class TUniqueId;
using InstanceLoId = int64_t;
namespace pipeline {
class BroadcastDependency;
class ExchangeSinkQueueDependency;
class Dependency;
} // namespace pipeline
@ -71,25 +71,52 @@ struct AtomicWrapper {
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast shuffle, one PBlock
// will be shared between different channels; the holder is managed by shared_ptr and returned
// to its parent queue once the last reference is dropped, so it can be reused for the next
// serialization.
class BroadcastPBlockHolderQueue;
class BroadcastPBlockHolder {
ENABLE_FACTORY_CREATOR(BroadcastPBlockHolder);
public:
BroadcastPBlockHolder() : _ref_count(0), _dep(nullptr) {}
BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), _dep(dep) {}
~BroadcastPBlockHolder() noexcept = default;
BroadcastPBlockHolder() { _pblock = std::make_unique<PBlock>(); }
BroadcastPBlockHolder(std::unique_ptr<PBlock>&& pblock) { _pblock = std::move(pblock); }
~BroadcastPBlockHolder();
void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
void unref() noexcept;
void ref() noexcept { ref(1); }
bool available() { return _ref_count._value == 0; }
PBlock* get_block() { return &pblock; }
PBlock* get_block() { return _pblock.get(); }
private:
AtomicWrapper<int32_t> _ref_count;
PBlock pblock;
pipeline::BroadcastDependency* _dep = nullptr;
friend class BroadcastPBlockHolderQueue;
std::unique_ptr<PBlock> _pblock;
std::weak_ptr<BroadcastPBlockHolderQueue> _parent_creator;
void set_parent_creator(std::shared_ptr<BroadcastPBlockHolderQueue> parent_creator) {
_parent_creator = parent_creator;
}
};
// Use a stack internally so that the most recently returned PBlock is reused first and stays in the CPU cache.
class BroadcastPBlockHolderQueue : public std::enable_shared_from_this<BroadcastPBlockHolderQueue> {
ENABLE_FACTORY_CREATOR(BroadcastPBlockHolderQueue);
public:
BroadcastPBlockHolderQueue() = default;
BroadcastPBlockHolderQueue(std::shared_ptr<pipeline::Dependency>& broadcast_dependency) {
_broadcast_dependency = broadcast_dependency;
}
void push(std::shared_ptr<BroadcastPBlockHolder> holder);
bool empty() {
std::unique_lock l(_holders_lock);
return _holders.empty();
}
std::shared_ptr<BroadcastPBlockHolder> pop();
private:
std::stack<std::shared_ptr<BroadcastPBlockHolder>> _holders;
std::shared_ptr<pipeline::Dependency> _broadcast_dependency;
std::mutex _holders_lock;
};
} // namespace vectorized
namespace pipeline {
@ -104,7 +131,7 @@ struct TransmitInfo {
template <typename Parent>
struct BroadcastTransmitInfo {
vectorized::PipChannel<Parent>* channel = nullptr;
vectorized::BroadcastPBlockHolder* block_holder = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
@ -115,10 +142,9 @@ class ExchangeSendCallback : public ::doris::DummyBrpcCallback<Response> {
public:
ExchangeSendCallback() = default;
void init(InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
void init(InstanceLoId id, bool eos) {
_id = id;
_eos = eos;
_data = data;
}
~ExchangeSendCallback() override = default;
@ -135,9 +161,6 @@ public:
void call() noexcept override {
try {
if (_data) {
_data->unref();
}
if (::doris::DummyBrpcCallback<Response>::cntl_->Failed()) {
std::string err = fmt::format(
"failed to send brpc when exchange, error={}, error_text={}, client: {}, "
@ -164,7 +187,6 @@ private:
std::function<void(const InstanceLoId&, const bool&, const Response&, const int64_t&)> _suc_fn;
InstanceLoId _id;
bool _eos;
vectorized::BroadcastPBlockHolder* _data = nullptr;
};
struct ExchangeRpcContext {
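Note that _parent_creator above is a weak_ptr rather than a shared_ptr: the queue already owns its holders through shared_ptr, so a strong back-pointer would create a reference cycle and neither object would ever be freed. A tiny sketch of the distinction (generic names, not the Doris types):

```cpp
#include <memory>

struct Queue;

struct Holder {
    // A shared_ptr<Queue> here would form a cycle (Queue -> Holder -> Queue)
    // and leak both objects; weak_ptr breaks the cycle and simply yields
    // nullptr from lock() once the queue is gone.
    std::weak_ptr<Queue> parent;
};

struct Queue : std::enable_shared_from_this<Queue> {
    std::shared_ptr<Holder> holder;
};

int main() {
    auto queue = std::make_shared<Queue>();
    queue->holder = std::make_shared<Holder>();
    queue->holder->parent = queue;  // weak: does not extend the queue's lifetime
    queue.reset();                  // both Queue and Holder are destroyed here
    return 0;
}
```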

@ -181,13 +181,13 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = BroadcastDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_broadcast_dependency->set_available_block(config::num_broadcast_buffer);
_broadcast_pb_blocks.reserve(config::num_broadcast_buffer);
for (size_t i = 0; i < config::num_broadcast_buffer; i++) {
_broadcast_pb_blocks.emplace_back(
vectorized::BroadcastPBlockHolder(_broadcast_dependency.get()));
_broadcast_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"BroadcastDependency", true, state->get_query_ctx());
_broadcast_pb_blocks =
vectorized::BroadcastPBlockHolderQueue::create_shared(_broadcast_dependency);
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
_exchange_sink_dependency->add_child(_broadcast_dependency);
@ -338,7 +338,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
}
}
} else {
vectorized::BroadcastPBlockHolder* block_holder = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
RETURN_IF_ERROR(local_state.get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@ -355,13 +355,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
local_state._broadcast_dependency->take_available_block();
block_holder->ref(local_state.channels.size());
for (auto channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@ -369,8 +366,6 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
block_holder, source_state == SourceState::FINISHED);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
} else {
block_holder->unref();
}
}
cur_block.clear_column_data();
@ -454,21 +449,16 @@ void ExchangeSinkLocalState::register_channels(
}
Status ExchangeSinkLocalState::get_next_available_buffer(
vectorized::BroadcastPBlockHolder** holder) {
std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder) {
// This condition means we need to use the broadcast buffer, so we should make sure
// there are available buffers before running the pipeline.
for (size_t broadcast_pb_block_idx = 0; broadcast_pb_block_idx < _broadcast_pb_blocks.size();
broadcast_pb_block_idx++) {
if (_broadcast_pb_blocks[broadcast_pb_block_idx].available()) {
*holder = &_broadcast_pb_blocks[broadcast_pb_block_idx];
return Status::OK();
}
if (_broadcast_pb_blocks->empty()) {
return Status::InternalError("No broadcast buffer left! Dependency: {}",
_broadcast_dependency->debug_string());
} else {
*holder = _broadcast_pb_blocks->pop();
return Status::OK();
}
return Status::InternalError("No broadcast buffer left! Available blocks: " +
std::to_string(_broadcast_dependency->available_blocks()) +
" and number of buffer is " +
std::to_string(_broadcast_pb_blocks.size()) +
" Dependency: " + _broadcast_dependency->debug_string());
}
template <typename Channels, typename HashValueType>

@ -72,51 +72,6 @@ public:
~ExchangeSinkQueueDependency() override = default;
};
class BroadcastDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
BroadcastDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "BroadcastDependency", true, query_ctx),
_available_block(0) {}
~BroadcastDependency() override = default;
std::string debug_string(int indentation_level = 0) override {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}{}: id={}, block task = {}, ready={}, _available_block = {}",
std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(), _ready, _available_block.load());
return fmt::to_string(debug_string_buffer);
}
void set_available_block(int available_block) { _available_block = available_block; }
void return_available_block() {
if (_available_block.fetch_add(1) == 0) {
std::lock_guard<std::mutex> lock(_lock);
if (_available_block == 0) {
return;
}
Dependency::set_ready();
}
}
void take_available_block() {
if (_available_block.fetch_sub(1) == 1) {
std::lock_guard<std::mutex> lock(_lock);
if (_available_block == 0) {
Dependency::block();
}
}
}
int available_blocks() const { return _available_block; }
private:
std::atomic<int> _available_block;
std::mutex _lock;
};
/**
* We use this to control the execution for local exchange.
* +---------------+ +---------------+ +---------------+
@ -165,7 +120,7 @@ public:
Dependency* finishdependency() override { return _finish_dependency.get(); }
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
Status get_next_available_buffer(std::shared_ptr<vectorized::BroadcastPBlockHolder>* holder);
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* blocks_sent_counter() { return _blocks_sent_counter; }
@ -231,12 +186,12 @@ private:
// Sender instance id, unique within a fragment.
int _sender_id;
std::vector<vectorized::BroadcastPBlockHolder> _broadcast_pb_blocks;
std::shared_ptr<vectorized::BroadcastPBlockHolderQueue> _broadcast_pb_blocks;
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
std::shared_ptr<Dependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
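The BroadcastDependency removed in this file tracked availability with an atomic counter that every call site had to increment and decrement in step with the blocks; with the queue, availability is simply whether the queue is empty. A condensed before/after sketch of the readiness logic (simplified, not the exact Doris classes):

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <stack>

struct Block {};

// Before: a counter tracked next to the blocks, kept in sync by explicit
// take/return calls scattered across the sink and the RPC callbacks.
struct CountingAvailability {
    std::atomic<int> available{0};
    void return_block() { available.fetch_add(1); /* 0 -> 1: set_ready() */ }
    void take_block() { available.fetch_sub(1);   /* 1 -> 0: block()     */ }
};

// After: availability IS the queue's emptiness; push/pop update readiness in
// one place, and "returning" a block is just the holder's destructor running.
class FreeBlockQueue {
public:
    void push(std::shared_ptr<Block> b) {
        std::lock_guard<std::mutex> l(_lock);
        _free.push(std::move(b));
        _ready = true;                       // at least one block available
    }
    std::shared_ptr<Block> pop() {
        std::lock_guard<std::mutex> l(_lock);
        if (_free.empty()) {
            return nullptr;
        }
        auto b = _free.top();
        _free.pop();
        if (_free.empty()) {
            _ready = false;                  // sink must wait for a block to return
        }
        return b;
    }
    bool ready() const { return _ready; }

private:
    std::stack<std::shared_ptr<Block>> _free;
    std::mutex _lock;
    bool _ready = false;
};

int main() {
    FreeBlockQueue q;
    q.push(std::make_shared<Block>());
    std::shared_ptr<Block> b = q.pop();      // queue is empty again -> not ready
    return (b != nullptr && !q.ready()) ? 0 : 1;
}
```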

@ -240,8 +240,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(_block_holder.get(),
true);
status = channel->send_broadcast_block(_block_holder, true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}

@ -77,7 +77,7 @@ private:
std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
bool _only_local_exchange = false;
vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
RuntimeProfile::Counter* _brpc_wait_timer = nullptr;
RuntimeProfile::Counter* _local_send_timer = nullptr;
RuntimeProfile::Counter* _brpc_send_timer = nullptr;

@ -67,6 +67,8 @@ struct BasicSharedState {
};
class Dependency : public std::enable_shared_from_this<Dependency> {
ENABLE_FACTORY_CREATOR(Dependency);
public:
Dependency(int id, int node_id, std::string name, QueryContext* query_ctx)
: _id(id),

@ -40,7 +40,8 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_start_time = VecDateTimeValue::local_time();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency.reset(new pipeline::Dependency(-1, -1, "ExecutionDependency", this));
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this);
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this)));
}
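The Dependency::create_unique call above, like the create_shared calls elsewhere in the patch, is provided by the ENABLE_FACTORY_CREATOR macro that this commit adds to Dependency. A rough approximation of what such a macro supplies, purely for orientation (the real Doris macro may differ):

```cpp
#include <memory>
#include <utility>

// Hypothetical approximation: static factory helpers so call sites can write
// T::create_shared(args...) / T::create_unique(args...) instead of spelling
// out std::make_shared<T> / std::make_unique<T>.
#define ENABLE_FACTORY_CREATOR(TypeName)                                 \
    template <typename... Args>                                          \
    static std::shared_ptr<TypeName> create_shared(Args&&... args) {     \
        return std::make_shared<TypeName>(std::forward<Args>(args)...);  \
    }                                                                    \
    template <typename... Args>                                          \
    static std::unique_ptr<TypeName> create_unique(Args&&... args) {     \
        return std::make_unique<TypeName>(std::forward<Args>(args)...);  \
    }
```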

@ -377,8 +377,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
_name = "VDataStreamSender";
if (_enable_pipeline_exec) {
_broadcast_pb_blocks.resize(config::num_broadcast_buffer);
_broadcast_pb_block_idx = 0;
_broadcast_pb_blocks = vectorized::BroadcastPBlockHolderQueue::create_shared();
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
} else {
_cur_pb_block = &_pb_block1;
}
@ -552,7 +554,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
}
}
} else if (_enable_pipeline_exec) {
BroadcastPBlockHolder* block_holder = nullptr;
std::shared_ptr<BroadcastPBlockHolder> block_holder = nullptr;
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@ -568,19 +570,15 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
block_holder->get_block()->Clear();
}
Status status;
block_holder->ref(_channels.size());
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
} else {
block_holder->unref();
}
}
cur_block.clear_column_data();
@ -793,18 +791,14 @@ void VDataStreamSender::_roll_pb_block() {
_cur_pb_block = (_cur_pb_block == &_pb_block1 ? &_pb_block2 : &_pb_block1);
}
Status VDataStreamSender::_get_next_available_buffer(BroadcastPBlockHolder** holder) {
if (_broadcast_pb_block_idx >= _broadcast_pb_blocks.size()) {
return Status::InternalError(
"get_next_available_buffer meet invalid index, index={}, size={}",
_broadcast_pb_block_idx, _broadcast_pb_blocks.size());
Status VDataStreamSender::_get_next_available_buffer(
std::shared_ptr<BroadcastPBlockHolder>* holder) {
if (_broadcast_pb_blocks->empty()) {
return Status::InternalError("No broadcast buffer left!");
} else {
*holder = _broadcast_pb_blocks->pop();
return Status::OK();
}
if (!_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
return Status::InternalError("broadcast_pb_blocks not available");
}
*holder = &_broadcast_pb_blocks[_broadcast_pb_block_idx];
_broadcast_pb_block_idx++;
return Status::OK();
}
void VDataStreamSender::register_pipeline_channels(
@ -819,16 +813,7 @@ bool VDataStreamSender::channel_all_can_write() {
!_only_local_exchange) {
// This condition means we need to use the broadcast buffer, so we should make sure
// there are available buffers before running the pipeline.
if (_broadcast_pb_block_idx == _broadcast_pb_blocks.size()) {
_broadcast_pb_block_idx = 0;
}
for (; _broadcast_pb_block_idx < _broadcast_pb_blocks.size(); _broadcast_pb_block_idx++) {
if (_broadcast_pb_blocks[_broadcast_pb_block_idx].available()) {
return true;
}
}
return false;
return !_broadcast_pb_blocks->empty();
} else {
for (auto channel : _channels) {
if (!channel->can_write()) {

@ -158,7 +158,7 @@ protected:
friend class pipeline::ExchangeSinkBuffer<VDataStreamSender>;
void _roll_pb_block();
Status _get_next_available_buffer(BroadcastPBlockHolder** holder);
Status _get_next_available_buffer(std::shared_ptr<BroadcastPBlockHolder>* holder);
template <typename Channels, typename HashValueType>
Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels,
@ -185,8 +185,7 @@ protected:
PBlock* _cur_pb_block = nullptr;
// used by pipeline engine
std::vector<BroadcastPBlockHolder> _broadcast_pb_blocks;
int _broadcast_pb_block_idx;
std::shared_ptr<BroadcastPBlockHolderQueue> _broadcast_pb_blocks;
std::unique_ptr<PartitionerBase> _partitioner;
size_t _partition_count;
@ -273,7 +272,8 @@ public:
virtual Status send_remote_block(PBlock* block, bool eos = false,
Status exec_status = Status::OK());
virtual Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) {
virtual Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false) {
return Status::InternalError("Send BroadcastPBlockHolder is not allowed!");
}
@ -488,11 +488,11 @@ public:
return Status::OK();
}
Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = false) override {
Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
bool eos = false) override {
COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
block->unref();
return Status::OK();
}
_eos_send = true;
@ -536,13 +536,13 @@ public:
}
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>> get_send_callback(
InstanceLoId id, bool eos, vectorized::BroadcastPBlockHolder* data) {
InstanceLoId id, bool eos) {
if (!_send_callback) {
_send_callback = pipeline::ExchangeSendCallback<PTransmitDataResult>::create_shared();
} else {
_send_callback->cntl_->Reset();
}
_send_callback->init(id, eos, data);
_send_callback->init(id, eos);
return _send_callback;
}