[Bug](exchange) change BlockSerializer from unique_ptr to object (#22653)

change BlockSerializer from unique_ptr to object
This commit is contained in:
Pxl
2023-08-07 14:47:21 +08:00
committed by GitHub
parent 9c91e80b0c
commit 591aee528d
5 changed files with 71 additions and 99 deletions

View File

@ -89,12 +89,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
send_query_statistics_with_every_batch, output_exprs));
}
break;
@ -226,12 +226,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
send_query_statistics_with_every_batch, output_exprs));
}
break;

View File

@ -95,8 +95,6 @@ Status Channel::init(RuntimeState* state) {
_fragment_instance_id, _dest_node_id);
}
_serializer.reset(new BlockSerializer(_parent, _is_local));
// In bucket shuffle join will set fragment_instance_id (-1, -1)
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
// so the empty channel not need call function close_internal()
@ -113,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
}
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (eos) {
RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1));
RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
}
RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
ch_roll_pb_block();
@ -122,8 +120,8 @@ Status Channel::send_current_block(bool eos) {
Status Channel::send_local_block(bool eos) {
SCOPED_TIMER(_parent->_local_send_timer);
Block block = _serializer->get_block()->to_block();
_serializer->get_block()->set_muatable_columns(block.clone_empty_columns());
Block block = _serializer.get_block()->to_block();
_serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
if (_recvr_is_valid()) {
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
@ -134,7 +132,7 @@ Status Channel::send_local_block(bool eos) {
}
return Status::OK();
} else {
_serializer->reset_block();
_serializer.reset_block();
return _receiver_status;
}
}
@ -205,7 +203,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
bool serialized = false;
RETURN_IF_ERROR(
_serializer->next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
_serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
if (serialized) {
RETURN_IF_ERROR(send_current_block(false));
}
@ -224,9 +222,7 @@ Status Channel::close_wait(RuntimeState* state) {
_need_close = false;
return st;
}
if (_serializer) {
_serializer->reset_block();
}
_serializer.reset_block();
return Status::OK();
}
@ -236,14 +232,14 @@ Status Channel::close_internal() {
}
VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id << " #rows= "
<< ((_serializer->get_block() == nullptr) ? 0 : _serializer->get_block()->rows())
<< ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
<< " receiver status: " << _receiver_status;
if (is_receiver_eof()) {
_serializer->reset_block();
_serializer.reset_block();
return Status::OK();
}
Status status;
if (_serializer->get_block() != nullptr && _serializer->get_block()->rows() > 0) {
if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
status = send_current_block(true);
} else {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
@ -286,6 +282,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
_state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
@ -299,7 +296,8 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
_blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
_serializer(this) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@ -344,12 +342,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
PlanNodeId dest_node_id,
VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
_state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
@ -365,7 +364,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_split_block_distribute_by_channel_timer(nullptr),
_blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(dest_node_id) {
_dest_node_id(dest_node_id),
_serializer(this) {
_cur_pb_block = &_pb_block1;
_name = "VDataStreamSender";
std::map<int64_t, int64_t> fragment_id_to_channel_index;
@ -384,30 +384,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
}
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch)
: _sender_id(0),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
_profile(nullptr),
_serialize_batch_timer(nullptr),
_compress_timer(nullptr),
_brpc_send_timer(nullptr),
_brpc_wait_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_send_timer(nullptr),
_split_block_hash_compute_timer(nullptr),
_split_block_distribute_by_channel_timer(nullptr),
_blocks_sent_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
_name = "VDataStreamSender";
}
VDataStreamSender::~VDataStreamSender() {
_channel_shared_ptrs.clear();
}
@ -429,7 +405,6 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
Status VDataStreamSender::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
_state = state;
std::vector<std::string> instances;
for (const auto& channel : _channels) {
@ -454,8 +429,6 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc));
}
_serializer.reset(new BlockSerializer(this));
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
_uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
_local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
@ -521,27 +494,27 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
#ifndef BROADCAST_ALL_CHANNELS
#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \
{ \
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
bool serialized = false; \
RETURN_IF_ERROR( \
_serializer->next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \
if (serialized) { \
Status status; \
for (auto channel : _channels) { \
if (!channel->is_receiver_eof()) { \
if (channel->is_local()) { \
status = channel->send_local_block(block); \
} else { \
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
status = channel->send_block(PBLOCK_TO_SEND, false); \
} \
HANDLE_CHANNEL_STATUS(state, channel, status); \
} \
} \
POST_PROCESS; \
} \
#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \
{ \
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
bool serialized = false; \
RETURN_IF_ERROR( \
_serializer.next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \
if (serialized) { \
Status status; \
for (auto channel : _channels) { \
if (!channel->is_receiver_eof()) { \
if (channel->is_local()) { \
status = channel->send_local_block(block); \
} else { \
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \
status = channel->send_block(PBLOCK_TO_SEND, false); \
} \
HANDLE_CHANNEL_STATUS(state, channel, status); \
} \
} \
POST_PROCESS; \
} \
}
#endif
// 1. serialize depends on it is not local exchange
@ -574,7 +547,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(
_serializer->serialize_block(block, current_channel->ch_cur_pb_block()));
_serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
current_channel->ch_roll_pb_block();
@ -648,14 +621,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
}
Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
if (_serializer->get_block() && _serializer->get_block()->rows() > 0) {
if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
BroadcastPBlockHolder* block_holder = nullptr;
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
Block block = _serializer->get_block()->to_block();
RETURN_IF_ERROR(_serializer->serialize_block(&block, block_holder->get_block(),
_channels.size()));
Block block = _serializer.get_block()->to_block();
RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(),
_channels.size()));
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
@ -690,10 +663,10 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
{
// send last block
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_serializer && _serializer->get_block() && _serializer->get_block()->rows() > 0) {
Block block = _serializer->get_block()->to_block();
if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
Block block = _serializer.get_block()->to_block();
RETURN_IF_ERROR(
_serializer->serialize_block(&block, _cur_pb_block, _channels.size()));
_serializer.serialize_block(&block, _cur_pb_block, _channels.size()));
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {

View File

@ -97,14 +97,11 @@ public:
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
PlanNodeId dest_node_id,
VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
bool send_query_statistics_with_every_batch);
~VDataStreamSender() override;
Status init(const TDataSink& thrift_sink) override;
@ -209,7 +206,7 @@ protected:
bool _only_local_exchange = false;
bool _enable_pipeline_exec = false;
std::unique_ptr<BlockSerializer> _serializer;
BlockSerializer _serializer;
};
class Channel {
@ -234,10 +231,10 @@ public:
_closed(false),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
std::string localhost = BackendOptions::get_localhost();
_is_local = (_brpc_dest_addr.hostname == localhost) &&
(_brpc_dest_addr.port == config::brpc_port);
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
_is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
(_brpc_dest_addr.port == config::brpc_port)),
_serializer(_parent, _is_local) {
if (_is_local) {
VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id;
}
@ -385,7 +382,7 @@ protected:
PBlock _ch_pb_block1;
PBlock _ch_pb_block2;
std::unique_ptr<BlockSerializer> _serializer;
BlockSerializer _serializer;
};
#define HANDLE_CHANNEL_STATUS(state, channel, status) \
@ -490,7 +487,7 @@ public:
bool serialized = false;
_pblock = std::make_unique<PBlock>();
RETURN_IF_ERROR(
_serializer->next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
if (serialized) {
RETURN_IF_ERROR(send_current_block(false));
}
@ -506,7 +503,7 @@ public:
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (eos) {
_pblock = std::make_unique<PBlock>();
RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1));
RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
}
RETURN_IF_ERROR(send_block(_pblock.release(), eos));
return Status::OK();

View File

@ -45,8 +45,9 @@ class TExpr;
namespace doris::vectorized {
VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
const TResultFileSink& sink, int per_channel_buffer_size,
VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
const RowDescriptor& row_desc, const TResultFileSink& sink,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr)
: _t_output_expr(t_output_expr), _row_desc(row_desc) {
@ -62,8 +63,8 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc
_header = sink.header;
}
VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TResultFileSink& sink,
VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
@ -77,7 +78,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
_storage_type = sink.storage_backend_type;
_is_top_sink = false;
CHECK_EQ(destinations.size(), 1);
_stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id,
_stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id,
destinations, per_channel_buffer_size,
send_query_statistics_with_every_batch));

View File

@ -46,11 +46,12 @@ class VExprContext;
class VResultFileSink : public DataSink {
public:
VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
const TResultFileSink& sink, int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr);
VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TResultFileSink& sink,
VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);