From 5c265d818356f045ff94ed1043c08e1dfefd710f Mon Sep 17 00:00:00 2001
From: Jerry Hu
Date: Fri, 3 Mar 2023 19:03:53 +0800
Subject: [PATCH] [fix](vec) crashing caused by parallel output file (#17384)

---
 be/src/vec/sink/vdata_stream_sender.cpp  | 25 ++++++--
 be/src/vec/sink/vdata_stream_sender.h    |  2 +
 be/src/vec/sink/vresult_file_sink.cpp    | 61 ++++++-------------
 be/src/vec/sink/vresult_file_sink.h      |  7 ++-
 .../suites/export_p0/test_outfile.groovy | 34 +++++++++++
 5 files changed, 82 insertions(+), 47 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index ee55a2c8b7..47204ba8c4 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -287,6 +287,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
              sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
     std::map<int64_t, int64_t> fragment_id_to_channel_index;
+    _enable_pipeline_exec = state->enable_pipeline_exec();
 
     for (int i = 0; i < destinations.size(); ++i) {
         // Select first dest as transfer chain.
@@ -294,7 +295,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
         const auto& fragment_instance_id = destinations[i].fragment_instance_id;
         if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
             fragment_id_to_channel_index.end()) {
-            if (state->enable_pipeline_exec()) {
+            if (_enable_pipeline_exec) {
                 _channel_shared_ptrs.emplace_back(new PipChannel(
                         this, row_desc, destinations[i].brpc_server, fragment_instance_id,
                         sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
@@ -314,7 +315,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
         }
     }
     _name = "VDataStreamSender";
-    if (state->enable_pipeline_exec()) {
+    if (_enable_pipeline_exec) {
         _broadcast_pb_blocks.resize(config::num_broadcast_buffer);
         _broadcast_pb_block_idx = 0;
     } else {
@@ -323,6 +324,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
 }
 
 VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                     PlanNodeId dest_node_id,
                                      const std::vector<TPlanFragmentDestination>& destinations,
                                      int per_channel_buffer_size,
                                      bool send_query_statistics_with_every_batch)
@@ -330,6 +332,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _pool(pool),
           _row_desc(row_desc),
           _current_channel_idx(0),
+          _part_type(TPartitionType::UNPARTITIONED),
           _ignore_not_found(true),
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
@@ -342,9 +345,23 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _split_block_distribute_by_channel_timer(nullptr),
           _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
-          _dest_node_id(0) {
+          _dest_node_id(dest_node_id) {
     _cur_pb_block = &_pb_block1;
     _name = "VDataStreamSender";
+    std::map<int64_t, int64_t> fragment_id_to_channel_index;
+    for (int i = 0; i < destinations.size(); ++i) {
+        const auto& fragment_instance_id = destinations[i].fragment_instance_id;
+        if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
+            fragment_id_to_channel_index.end()) {
+            _channel_shared_ptrs.emplace_back(
+                    new Channel(this, row_desc, destinations[i].brpc_server, fragment_instance_id,
+                                _dest_node_id, per_channel_buffer_size, false,
+                                send_query_statistics_with_every_batch));
+        }
+        fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
+                                             _channel_shared_ptrs.size() - 1);
+        _channels.push_back(_channel_shared_ptrs.back().get());
+    }
 }
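
The constructor added above de-duplicates destinations by fragment instance id: one
Channel is created per distinct id, while _channels keeps one pointer per destination.
A minimal standalone sketch of that bookkeeping, with Destination and Channel as
simplified stand-ins for Doris's TPlanFragmentDestination and Channel types:

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <vector>

struct Destination {
    int64_t fragment_instance_id_lo; // stands in for fragment_instance_id.lo
};

struct Channel {
    int64_t instance_lo;
};

int main() {
    const std::vector<Destination> destinations = {{7}, {7}, {9}};

    std::vector<std::shared_ptr<Channel>> channel_shared_ptrs; // owners, one per id
    std::vector<Channel*> channels;                            // one per destination
    std::map<int64_t, int64_t> fragment_id_to_channel_index;

    for (const auto& dest : destinations) {
        const int64_t lo = dest.fragment_instance_id_lo;
        if (fragment_id_to_channel_index.find(lo) == fragment_id_to_channel_index.end()) {
            // First sighting of this instance id: create its channel.
            channel_shared_ptrs.emplace_back(new Channel{lo});
        }
        // emplace() is a no-op for an existing key, mirroring the patch.
        fragment_id_to_channel_index.emplace(lo, channel_shared_ptrs.size() - 1);
        channels.push_back(channel_shared_ptrs.back().get());
    }

    std::cout << channel_shared_ptrs.size() << " unique channels for "
              << channels.size() << " destinations\n"; // prints "2 ... for 3 ..."
    return 0;
}

Note that duplicates are pointed at the back() of the owner list, so the sharing is exact
only when destinations with the same instance id arrive grouped together; VResultFileSink
below only ever passes a single destination, where this cannot matter.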
 
 VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
@@ -472,7 +489,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         for (auto channel : _channels) {
             RETURN_IF_ERROR(channel->send_local_block(block));
         }
-    } else if (state->enable_pipeline_exec()) {
+    } else if (_enable_pipeline_exec) {
         BroadcastPBlockHolder* block_holder = nullptr;
         RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
         {
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 5b2da70ce1..5bf116af12 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -94,6 +94,7 @@ public:
                       int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
     VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                      PlanNodeId dest_node_id,
                       const std::vector<TPlanFragmentDestination>& destinations,
                       int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
@@ -208,6 +209,7 @@ protected:
 
     bool _new_shuffle_hash_method = false;
     bool _only_local_exchange = false;
+    bool _enable_pipeline_exec = false;
 };
 
 class Channel {
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index d85318b297..07d0c45927 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -32,9 +32,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc
                                  const TResultFileSink& sink, int per_channel_buffer_size,
                                  bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr)
-        : VDataStreamSender(pool, row_desc, per_channel_buffer_size,
-                            send_query_statistics_with_every_batch),
-          _t_output_expr(t_output_expr) {
+        : _t_output_expr(t_output_expr), _row_desc(row_desc) {
     CHECK(sink.__isset.file_options);
     _file_opts.reset(new ResultFileOptions(sink.file_options));
     CHECK(sink.__isset.storage_backend_type);
@@ -53,20 +51,18 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
                                  int per_channel_buffer_size,
                                  bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
-        : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
-                            send_query_statistics_with_every_batch),
-          _t_output_expr(t_output_expr),
-          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+        : _t_output_expr(t_output_expr),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
+          _row_desc(row_desc) {
     CHECK(sink.__isset.file_options);
     _file_opts.reset(new ResultFileOptions(sink.file_options));
     CHECK(sink.__isset.storage_backend_type);
     _storage_type = sink.storage_backend_type;
     _is_top_sink = false;
-    DCHECK_EQ(destinations.size(), 1);
-    _channel_shared_ptrs.emplace_back(new Channel(
-            this, _output_row_descriptor, destinations[0].brpc_server,
-            destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
-    _channels.push_back(_channel_shared_ptrs.back().get());
+    CHECK_EQ(destinations.size(), 1);
+    _stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id,
+                                               destinations, per_channel_buffer_size,
+                                               send_query_statistics_with_every_batch));
 
     _name = "VResultFileSink";
     //for impl csv_with_name and csv_with_names_and_types
@@ -75,6 +71,9 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
 }
 
 Status VResultFileSink::init(const TDataSink& tsink) {
+    if (!_is_top_sink) {
+        RETURN_IF_ERROR(_stream_sender->init(tsink));
+    }
     return Status::OK();
 }
 
@@ -110,30 +109,24 @@ Status VResultFileSink::prepare(RuntimeState* state) {
                                          _output_row_descriptor));
     } else {
         // init channel
-        _profile = _pool->add(new RuntimeProfile(title.str()));
-        _state = state;
-        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
-        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
-        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
-        _uncompressed_bytes_counter =
-                ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-        // create writer
         _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
         _writer.reset(new (std::nothrow) VFileResultWriter(
                 _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
                 _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
                 _output_row_descriptor));
+        RETURN_IF_ERROR(_stream_sender->prepare(state));
+        _profile->add_child(_stream_sender->profile(), true, nullptr);
     }
     _writer->set_header_info(_header_type, _header);
     RETURN_IF_ERROR(_writer->init(state));
-    for (int i = 0; i < _channels.size(); ++i) {
-        RETURN_IF_ERROR(_channels[i]->init(state));
-    }
     return Status::OK();
 }
 
 Status VResultFileSink::open(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultFileSink::open");
+    if (!_is_top_sink) {
+        RETURN_IF_ERROR(_stream_sender->open(state));
+    }
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
@@ -169,25 +162,9 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
                                                    state->fragment_instance_id());
     } else {
         if (final_status.ok()) {
-            RETURN_IF_ERROR(serialize_block(_output_block.get(), _cur_pb_block, _channels.size()));
-            for (auto channel : _channels) {
-                RETURN_IF_ERROR(channel->send_block(_cur_pb_block));
-            }
-        }
-        Status final_st = Status::OK();
-        for (int i = 0; i < _channels.size(); ++i) {
-            Status st = _channels[i]->close(state);
-            if (!st.ok() && final_st.ok()) {
-                final_st = st;
-            }
-        }
-        // wait all channels to finish
-        for (int i = 0; i < _channels.size(); ++i) {
-            Status st = _channels[i]->close_wait(state);
-            if (!st.ok() && final_st.ok()) {
-                final_st = st;
-            }
+            RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true));
         }
+        RETURN_IF_ERROR(_stream_sender->close(state, final_status));
         _output_block->clear();
     }
 
@@ -201,7 +178,7 @@ void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> stat
     if (_is_top_sink) {
         _sender->set_query_statistics(statistics);
     } else {
-        _query_statistics = statistics;
+        _stream_sender->set_query_statistics(statistics);
     }
 }
 
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index ba63cad517..038d825acc 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -24,7 +24,7 @@ namespace doris {
 namespace vectorized {
 class VResultWriter;
 
-class VResultFileSink : public VDataStreamSender {
+class VResultFileSink : public DataSink {
 public:
     VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
                     int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
@@ -47,6 +47,7 @@ public:
     RuntimeProfile* profile() override { return _profile; }
 
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+    const RowDescriptor& row_desc() const { return _row_desc; }
 
 private:
     Status prepare_exprs(RuntimeState* state);
@@ -61,11 +62,15 @@ private:
     std::unique_ptr<Block> _output_block = nullptr;
     std::shared_ptr<BufferControlBlock> _sender;
+    std::unique_ptr<VDataStreamSender> _stream_sender;
     std::shared_ptr<VResultWriter> _writer;
     int _buf_size = 1024; // Allocated from _pool
     bool _is_top_sink = true;
     std::string _header;
     std::string _header_type;
+
+    RowDescriptor _row_desc;
+    RuntimeProfile* _profile;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/regression-test/suites/export_p0/test_outfile.groovy b/regression-test/suites/export_p0/test_outfile.groovy
index 4fa3350268..d2ceb46c5e 100644
--- a/regression-test/suites/export_p0/test_outfile.groovy
+++ b/regression-test/suites/export_p0/test_outfile.groovy
@@ -186,4 +186,38 @@ suite("test_outfile") {
             path.delete();
         }
     }
+
+    // test parallel output
+    try {
+        File path = new File(outFilePath)
+        if (!path.exists()) {
+            assert path.mkdirs()
+        } else {
+            throw new IllegalStateException("""${outFilePath} already exists! """)
+        }
+        sql """drop table if exists select_into_file"""
+        sql """CREATE TABLE `select_into_file` (
+                `id` int,
+                `name` varchar(30)
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`id`)
+            DISTRIBUTED BY HASH(`id`) BUCKETS 2
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1"
+            );"""
+        sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
+                (4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
+                (100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
+        sql "set enable_parallel_outfile = true;"
+        sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
+    } finally {
+        try_sql("DROP TABLE IF EXISTS select_into_file")
+        File path = new File(outFilePath)
+        if (path.exists()) {
+            for (File f: path.listFiles()) {
+                f.delete();
+            }
+            path.delete();
+        }
+    }
 }
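
The heart of the fix is the header change above: VResultFileSink now derives from DataSink
instead of VDataStreamSender, owning a private _stream_sender and forwarding init(),
prepare(), open(), and close() to it whenever it is not the top sink. A hedged sketch of
that composition pattern, where Status, StreamSender, and FileSink are simplified
stand-ins rather than the real Doris interfaces:

#include <iostream>
#include <memory>

struct Status {
    static Status OK() { return {}; }
};

class StreamSender { // stands in for VDataStreamSender
public:
    Status prepare() { std::cout << "sender prepared\n"; return Status::OK(); }
    Status open() { std::cout << "sender opened\n"; return Status::OK(); }
    Status close() { std::cout << "sender closed\n"; return Status::OK(); }
};

class FileSink { // stands in for VResultFileSink : public DataSink
public:
    explicit FileSink(bool is_top_sink) : _is_top_sink(is_top_sink) {
        if (!_is_top_sink) {
            // Owned, not inherited: each parallel file sink gets its own sender
            // rather than mutating channel state in a shared base class.
            _stream_sender = std::make_unique<StreamSender>();
        }
    }
    Status prepare() { return _is_top_sink ? Status::OK() : _stream_sender->prepare(); }
    Status open() { return _is_top_sink ? Status::OK() : _stream_sender->open(); }
    Status close() { return _is_top_sink ? Status::OK() : _stream_sender->close(); }

private:
    bool _is_top_sink;
    std::unique_ptr<StreamSender> _stream_sender;
};

int main() {
    FileSink sink(/*is_top_sink=*/false);
    sink.prepare();
    sink.open();
    sink.close();
    return 0;
}

The regression test added to test_outfile.groovy is meant to exercise exactly this path:
with enable_parallel_outfile = true, multiple fragment-level file sinks run concurrently,
and each now drives its own VDataStreamSender instead of sharing inherited channel state.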