diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index c5d5446611..bc914818e8 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -89,12 +89,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { sink->reset(new doris::vectorized::VResultFileSink( - pool, params.sender_id, row_desc, thrift_sink.result_file_sink, + state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink, params.destinations, 16 * 1024, send_query_statistics_with_every_batch, output_exprs, desc_tbl)); } else { sink->reset(new doris::vectorized::VResultFileSink( - pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, + state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, send_query_statistics_with_every_batch, output_exprs)); } break; @@ -226,12 +226,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink // Result file sink is not the top sink if (params.__isset.destinations && params.destinations.size() > 0) { sink->reset(new doris::vectorized::VResultFileSink( - pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink, + state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink, params.destinations, 16 * 1024, send_query_statistics_with_every_batch, output_exprs, desc_tbl)); } else { sink->reset(new doris::vectorized::VResultFileSink( - pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, + state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, send_query_statistics_with_every_batch, output_exprs)); } break; diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index df705ee37b..2fd86e0c38 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,8 +95,6 @@ Status Channel::init(RuntimeState* state) { _fragment_instance_id, _dest_node_id); } - _serializer.reset(new BlockSerializer(_parent, _is_local)); - // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() @@ -113,7 +111,7 @@ Status Channel::send_current_block(bool eos) { } SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (eos) { - RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1)); + RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1)); } RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos)); ch_roll_pb_block(); @@ -122,8 +120,8 @@ Status Channel::send_current_block(bool eos) { Status Channel::send_local_block(bool eos) { SCOPED_TIMER(_parent->_local_send_timer); - Block block = _serializer->get_block()->to_block(); - _serializer->get_block()->set_muatable_columns(block.clone_empty_columns()); + Block block = _serializer.get_block()->to_block(); + _serializer.get_block()->set_muatable_columns(block.clone_empty_columns()); if (_recvr_is_valid()) { COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes()); COUNTER_UPDATE(_parent->_local_sent_rows, block.rows()); @@ -134,7 +132,7 @@ Status Channel::send_local_block(bool eos) { } return Status::OK(); } else { - _serializer->reset_block(); + _serializer.reset_block(); return _receiver_status; } } @@ -205,7 +203,7 @@ Status Channel::add_rows(Block* block, const std::vector& rows) { bool serialized = false; RETURN_IF_ERROR( - _serializer->next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows)); + _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows)); if (serialized) { RETURN_IF_ERROR(send_current_block(false)); } @@ -224,9 +222,7 @@ Status Channel::close_wait(RuntimeState* state) { _need_close = false; return st; } - if (_serializer) { - _serializer->reset_block(); - } + _serializer.reset_block(); return Status::OK(); } @@ -236,14 +232,14 @@ Status Channel::close_internal() { } VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id << " dest_node=" << _dest_node_id << " #rows= " - << ((_serializer->get_block() == nullptr) ? 0 : _serializer->get_block()->rows()) + << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows()) << " receiver status: " << _receiver_status; if (is_receiver_eof()) { - _serializer->reset_block(); + _serializer.reset_block(); return Status::OK(); } Status status; - if (_serializer->get_block() != nullptr && _serializer->get_block()->rows() > 0) { + if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) { status = send_current_block(true); } else { SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); @@ -286,6 +282,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int int per_channel_buffer_size, bool send_query_statistics_with_every_batch) : _sender_id(sender_id), + _state(state), _pool(pool), _row_desc(row_desc), _current_channel_idx(0), @@ -299,7 +296,8 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _blocks_sent_counter(nullptr), _local_bytes_send_counter(nullptr), _dest_node_id(sink.dest_node_id), - _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) { + _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc), + _serializer(this) { DCHECK_GT(destinations.size(), 0); DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED || sink.output_partition.type == TPartitionType::HASH_PARTITIONED || @@ -344,12 +342,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int } } -VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, - PlanNodeId dest_node_id, +VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, + const RowDescriptor& row_desc, PlanNodeId dest_node_id, const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch) : _sender_id(sender_id), + _state(state), _pool(pool), _row_desc(row_desc), _current_channel_idx(0), @@ -365,7 +364,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD _split_block_distribute_by_channel_timer(nullptr), _blocks_sent_counter(nullptr), _local_bytes_send_counter(nullptr), - _dest_node_id(dest_node_id) { + _dest_node_id(dest_node_id), + _serializer(this) { _cur_pb_block = &_pb_block1; _name = "VDataStreamSender"; std::map fragment_id_to_channel_index; @@ -384,30 +384,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD } } -VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, - int per_channel_buffer_size, - bool send_query_statistics_with_every_batch) - : _sender_id(0), - _pool(pool), - _row_desc(row_desc), - _current_channel_idx(0), - _profile(nullptr), - _serialize_batch_timer(nullptr), - _compress_timer(nullptr), - _brpc_send_timer(nullptr), - _brpc_wait_timer(nullptr), - _bytes_sent_counter(nullptr), - _local_send_timer(nullptr), - _split_block_hash_compute_timer(nullptr), - _split_block_distribute_by_channel_timer(nullptr), - _blocks_sent_counter(nullptr), - _peak_memory_usage_counter(nullptr), - _local_bytes_send_counter(nullptr), - _dest_node_id(0) { - _cur_pb_block = &_pb_block1; - _name = "VDataStreamSender"; -} - VDataStreamSender::~VDataStreamSender() { _channel_shared_ptrs.clear(); } @@ -429,7 +405,6 @@ Status VDataStreamSender::init(const TDataSink& tsink) { Status VDataStreamSender::prepare(RuntimeState* state) { RETURN_IF_ERROR(DataSink::prepare(state)); - _state = state; std::vector instances; for (const auto& channel : _channels) { @@ -454,8 +429,6 @@ Status VDataStreamSender::prepare(RuntimeState* state) { RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc)); } - _serializer.reset(new BlockSerializer(this)); - _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT); @@ -521,27 +494,27 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { #ifndef BROADCAST_ALL_CHANNELS -#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \ - { \ - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ - bool serialized = false; \ - RETURN_IF_ERROR( \ - _serializer->next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \ - if (serialized) { \ - Status status; \ - for (auto channel : _channels) { \ - if (!channel->is_receiver_eof()) { \ - if (channel->is_local()) { \ - status = channel->send_local_block(block); \ - } else { \ - SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ - status = channel->send_block(PBLOCK_TO_SEND, false); \ - } \ - HANDLE_CHANNEL_STATUS(state, channel, status); \ - } \ - } \ - POST_PROCESS; \ - } \ +#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS) \ + { \ + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ + bool serialized = false; \ + RETURN_IF_ERROR( \ + _serializer.next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \ + if (serialized) { \ + Status status; \ + for (auto channel : _channels) { \ + if (!channel->is_receiver_eof()) { \ + if (channel->is_local()) { \ + status = channel->send_local_block(block); \ + } else { \ + SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); \ + status = channel->send_block(PBLOCK_TO_SEND, false); \ + } \ + HANDLE_CHANNEL_STATUS(state, channel, status); \ + } \ + } \ + POST_PROCESS; \ + } \ } #endif // 1. serialize depends on it is not local exchange @@ -574,7 +547,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR( - _serializer->serialize_block(block, current_channel->ch_cur_pb_block())); + _serializer.serialize_block(block, current_channel->ch_cur_pb_block())); auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos); HANDLE_CHANNEL_STATUS(state, current_channel, status); current_channel->ch_roll_pb_block(); @@ -648,14 +621,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { } Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) { - if (_serializer->get_block() && _serializer->get_block()->rows() > 0) { + if (_serializer.get_block() && _serializer.get_block()->rows() > 0) { BroadcastPBlockHolder* block_holder = nullptr; RETURN_IF_ERROR(_get_next_available_buffer(&block_holder)); { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - Block block = _serializer->get_block()->to_block(); - RETURN_IF_ERROR(_serializer->serialize_block(&block, block_holder->get_block(), - _channels.size())); + Block block = _serializer.get_block()->to_block(); + RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(), + _channels.size())); Status status; for (auto channel : _channels) { if (!channel->is_receiver_eof()) { @@ -690,10 +663,10 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) { { // send last block SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); - if (_serializer && _serializer->get_block() && _serializer->get_block()->rows() > 0) { - Block block = _serializer->get_block()->to_block(); + if (_serializer.get_block() && _serializer.get_block()->rows() > 0) { + Block block = _serializer.get_block()->to_block(); RETURN_IF_ERROR( - _serializer->serialize_block(&block, _cur_pb_block, _channels.size())); + _serializer.serialize_block(&block, _cur_pb_block, _channels.size())); Status status; for (auto channel : _channels) { if (!channel->is_receiver_eof()) { diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 77dfe0b20a..770a381da4 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -97,14 +97,11 @@ public: const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch); - VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, - PlanNodeId dest_node_id, + VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id, + const RowDescriptor& row_desc, PlanNodeId dest_node_id, const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch); - VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size, - bool send_query_statistics_with_every_batch); - ~VDataStreamSender() override; Status init(const TDataSink& thrift_sink) override; @@ -209,7 +206,7 @@ protected: bool _only_local_exchange = false; bool _enable_pipeline_exec = false; - std::unique_ptr _serializer; + BlockSerializer _serializer; }; class Channel { @@ -234,10 +231,10 @@ public: _closed(false), _brpc_dest_addr(brpc_dest), _is_transfer_chain(is_transfer_chain), - _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) { - std::string localhost = BackendOptions::get_localhost(); - _is_local = (_brpc_dest_addr.hostname == localhost) && - (_brpc_dest_addr.port == config::brpc_port); + _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch), + _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) && + (_brpc_dest_addr.port == config::brpc_port)), + _serializer(_parent, _is_local) { if (_is_local) { VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id; } @@ -385,7 +382,7 @@ protected: PBlock _ch_pb_block1; PBlock _ch_pb_block2; - std::unique_ptr _serializer; + BlockSerializer _serializer; }; #define HANDLE_CHANNEL_STATUS(state, channel, status) \ @@ -490,7 +487,7 @@ public: bool serialized = false; _pblock = std::make_unique(); RETURN_IF_ERROR( - _serializer->next_serialized_block(block, _pblock.get(), 1, &serialized, &rows)); + _serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, &rows)); if (serialized) { RETURN_IF_ERROR(send_current_block(false)); } @@ -506,7 +503,7 @@ public: SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); if (eos) { _pblock = std::make_unique(); - RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1)); + RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1)); } RETURN_IF_ERROR(send_block(_pblock.release(), eos)); return Status::OK(); diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index c5f4c0358e..5b2003d8c2 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -45,8 +45,9 @@ class TExpr; namespace doris::vectorized { -VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, - const TResultFileSink& sink, int per_channel_buffer_size, +VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, + const RowDescriptor& row_desc, const TResultFileSink& sink, + int per_channel_buffer_size, bool send_query_statistics_with_every_batch, const std::vector& t_output_expr) : _t_output_expr(t_output_expr), _row_desc(row_desc) { @@ -62,8 +63,8 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc _header = sink.header; } -VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, - const TResultFileSink& sink, +VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, + const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch, @@ -77,7 +78,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr _storage_type = sink.storage_backend_type; _is_top_sink = false; CHECK_EQ(destinations.size(), 1); - _stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id, + _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id, destinations, per_channel_buffer_size, send_query_statistics_with_every_batch)); diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h index 90bc06bb42..c8d2f3be18 100644 --- a/be/src/vec/sink/vresult_file_sink.h +++ b/be/src/vec/sink/vresult_file_sink.h @@ -46,11 +46,12 @@ class VExprContext; class VResultFileSink : public DataSink { public: - VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink, - int per_channel_buffer_size, bool send_query_statistics_with_every_batch, + VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc, + const TResultFileSink& sink, int per_channel_buffer_size, + bool send_query_statistics_with_every_batch, const std::vector& t_output_expr); - VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, - const TResultFileSink& sink, + VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, + const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch, const std::vector& t_output_expr, DescriptorTbl& descs);