[fix](vec) crashing caused by parallel output file (#17384)

Jerry Hu
2023-03-03 19:03:53 +08:00
committed by GitHub
parent 9f97cd029f
commit 5c265d8183
5 changed files with 82 additions and 47 deletions
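
Summary: the crash came from VResultFileSink inheriting VDataStreamSender. When enable_parallel_outfile routed results through the non-top-sink path, the sink ran its base class's send/close machinery on half-initialized base state (note the _dest_node_id(0) initializer fixed below). The commit switches to composition: VResultFileSink now derives from DataSink and owns a std::unique_ptr<VDataStreamSender> built with a new destinations-aware constructor, delegating init/prepare/open/send/close to it. A minimal sketch of the shape change, with simplified stand-in signatures rather than the real Doris interfaces:

// Simplified stand-ins for the classes in this diff; the real
// DataSink methods return Status and take more parameters.
#include <memory>

struct RuntimeState;
struct Block;

struct DataSink {
    virtual ~DataSink() = default;
    virtual void send(RuntimeState* state, Block* block, bool eos) = 0;
};

struct VDataStreamSender : DataSink {
    void send(RuntimeState*, Block*, bool) override { /* serialize + send */ }
};

// Before: struct VResultFileSink : VDataStreamSender { ... };
// After: the file sink owns a fully constructed sender and forwards to it.
struct VResultFileSink : DataSink {
    std::unique_ptr<VDataStreamSender> _stream_sender =
            std::make_unique<VDataStreamSender>();
    void send(RuntimeState* state, Block* block, bool eos) override {
        _stream_sender->send(state, block, eos);
    }
};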

View File

@@ -287,6 +287,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED);
std::map<int64_t, int64_t> fragment_id_to_channel_index;
_enable_pipeline_exec = state->enable_pipeline_exec();
for (int i = 0; i < destinations.size(); ++i) {
// Select first dest as transfer chain.
@@ -294,7 +295,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
const auto& fragment_instance_id = destinations[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
if (state->enable_pipeline_exec()) {
if (_enable_pipeline_exec) {
_channel_shared_ptrs.emplace_back(new PipChannel(
this, row_desc, destinations[i].brpc_server, fragment_instance_id,
sink.dest_node_id, per_channel_buffer_size, is_transfer_chain,
@@ -314,7 +315,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
}
_name = "VDataStreamSender";
if (state->enable_pipeline_exec()) {
if (_enable_pipeline_exec) {
_broadcast_pb_blocks.resize(config::num_broadcast_buffer);
_broadcast_pb_block_idx = 0;
} else {
@@ -323,6 +324,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch)
@@ -330,6 +332,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
_part_type(TPartitionType::UNPARTITIONED),
_ignore_not_found(true),
_profile(nullptr),
_serialize_batch_timer(nullptr),
@@ -342,9 +345,23 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_split_block_distribute_by_channel_timer(nullptr),
_blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
_dest_node_id(dest_node_id) {
_cur_pb_block = &_pb_block1;
_name = "VDataStreamSender";
std::map<int64_t, int64_t> fragment_id_to_channel_index;
for (int i = 0; i < destinations.size(); ++i) {
const auto& fragment_instance_id = destinations[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
_channel_shared_ptrs.emplace_back(
new Channel(this, row_desc, destinations[i].brpc_server, fragment_instance_id,
_dest_node_id, per_channel_buffer_size, false,
send_query_statistics_with_every_batch));
}
fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
_channel_shared_ptrs.size() - 1);
_channels.push_back(_channel_shared_ptrs.back().get());
}
}
VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
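
Note on the destinations-based constructor rewritten above: _dest_node_id is now initialized from the dest_node_id parameter rather than the old hard-coded 0, which matters because this constructor now also builds its own channels from it, and the channel list is deduplicated by keying on fragment_instance_id.lo so repeated destination entries reuse one channel. A self-contained sketch of that dedup pattern, with hypothetical Destination/Channel stand-ins:

// Hypothetical types; the real code maps fragment_instance_id.lo
// to an index into _channel_shared_ptrs.
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

struct Destination { int64_t instance_lo; };
struct Channel { explicit Channel(int64_t id) : id(id) {} int64_t id; };

std::vector<std::shared_ptr<Channel>> build_channels(
        const std::vector<Destination>& dests) {
    std::map<int64_t, size_t> index;  // instance id -> channel slot
    std::vector<std::shared_ptr<Channel>> channels;
    for (const auto& d : dests) {
        if (index.find(d.instance_lo) == index.end()) {
            channels.emplace_back(std::make_shared<Channel>(d.instance_lo));
            index.emplace(d.instance_lo, channels.size() - 1);
        }
    }
    return channels;
}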
@@ -472,7 +489,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_local_block(block));
}
} else if (state->enable_pipeline_exec()) {
} else if (_enable_pipeline_exec) {
BroadcastPBlockHolder* block_holder = nullptr;
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
{
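
Note: send() now branches on the cached _enable_pipeline_exec member instead of asking RuntimeState each time. The flag is only set true by the RuntimeState-based constructor, the same one that sizes _broadcast_pb_blocks, and the header below defaults it to false. A sender built through the destinations-only constructor (the VResultFileSink case) therefore cannot fall into the pipeline broadcast path whose buffers it never initialized, closing what appears to be one of the crash vectors here. A sketch of the pattern, with stand-in names:

// Stand-in names; the point is caching the flag at construction
// so the send() branch matches how the object was initialized.
struct RuntimeState { bool enable_pipeline_exec() const { return true; } };

class Sender {
public:
    explicit Sender(RuntimeState* state)
            : _enable_pipeline_exec(state->enable_pipeline_exec()) {
        if (_enable_pipeline_exec) { /* allocate broadcast buffers */ }
    }
    Sender() = default;  // no RuntimeState available: flag stays false

    void send(RuntimeState* /*state*/) {
        if (_enable_pipeline_exec) { /* pipeline path, uses buffers */ }
        else { /* non-pipeline path */ }
    }

private:
    bool _enable_pipeline_exec = false;
};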

View File

@@ -94,6 +94,7 @@ public:
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
PlanNodeId dest_node_id,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
@@ -208,6 +209,7 @@ protected:
bool _new_shuffle_hash_method = false;
bool _only_local_exchange = false;
bool _enable_pipeline_exec = false;
};
class Channel {

View File

@@ -32,9 +32,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc
const TResultFileSink& sink, int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr)
: VDataStreamSender(pool, row_desc, per_channel_buffer_size,
send_query_statistics_with_every_batch),
_t_output_expr(t_output_expr) {
: _t_output_expr(t_output_expr), _row_desc(row_desc) {
CHECK(sink.__isset.file_options);
_file_opts.reset(new ResultFileOptions(sink.file_options));
CHECK(sink.__isset.storage_backend_type);
@@ -53,20 +51,18 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
: VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
send_query_statistics_with_every_batch),
_t_output_expr(t_output_expr),
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
: _t_output_expr(t_output_expr),
_output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false),
_row_desc(row_desc) {
CHECK(sink.__isset.file_options);
_file_opts.reset(new ResultFileOptions(sink.file_options));
CHECK(sink.__isset.storage_backend_type);
_storage_type = sink.storage_backend_type;
_is_top_sink = false;
DCHECK_EQ(destinations.size(), 1);
_channel_shared_ptrs.emplace_back(new Channel(
this, _output_row_descriptor, destinations[0].brpc_server,
destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
_channels.push_back(_channel_shared_ptrs.back().get());
CHECK_EQ(destinations.size(), 1);
_stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id,
destinations, per_channel_buffer_size,
send_query_statistics_with_every_batch));
_name = "VResultFileSink";
//for impl csv_with_name and csv_with_names_and_types
@@ -75,6 +71,9 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
}
Status VResultFileSink::init(const TDataSink& tsink) {
if (!_is_top_sink) {
RETURN_IF_ERROR(_stream_sender->init(tsink));
}
return Status::OK();
}
@@ -110,30 +109,24 @@ Status VResultFileSink::prepare(RuntimeState* state) {
_output_row_descriptor));
} else {
// init channel
_profile = _pool->add(new RuntimeProfile(title.str()));
_state = state;
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
_local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
_uncompressed_bytes_counter =
ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
// create writer
_output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
_profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
_output_row_descriptor));
RETURN_IF_ERROR(_stream_sender->prepare(state));
_profile->add_child(_stream_sender->profile(), true, nullptr);
}
_writer->set_header_info(_header_type, _header);
RETURN_IF_ERROR(_writer->init(state));
for (int i = 0; i < _channels.size(); ++i) {
RETURN_IF_ERROR(_channels[i]->init(state));
}
return Status::OK();
}
Status VResultFileSink::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VResultFileSink::open");
if (!_is_top_sink) {
RETURN_IF_ERROR(_stream_sender->open(state));
}
return VExpr::open(_output_vexpr_ctxs, state);
}
@@ -169,25 +162,9 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
state->fragment_instance_id());
} else {
if (final_status.ok()) {
RETURN_IF_ERROR(serialize_block(_output_block.get(), _cur_pb_block, _channels.size()));
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_block(_cur_pb_block));
}
}
Status final_st = Status::OK();
for (int i = 0; i < _channels.size(); ++i) {
Status st = _channels[i]->close(state);
if (!st.ok() && final_st.ok()) {
final_st = st;
}
}
// wait all channels to finish
for (int i = 0; i < _channels.size(); ++i) {
Status st = _channels[i]->close_wait(state);
if (!st.ok() && final_st.ok()) {
final_st = st;
}
RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true));
}
RETURN_IF_ERROR(_stream_sender->close(state, final_status));
_output_block->clear();
}
@@ -201,7 +178,7 @@ void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> stat
if (_is_top_sink) {
_sender->set_query_statistics(statistics);
} else {
_query_statistics = statistics;
_stream_sender->set_query_statistics(statistics);
}
}
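
Note: with inheritance gone, every non-top-sink hook forwards to the owned sender: init, prepare (which also attaches the sender's profile as a child via add_child), open, the final send(state, _output_block.get(), true) plus close in close(), and set_query_statistics here. The old hand-rolled loop that serialized the block, sent it per channel, then close()/close_wait()ed each channel is gone with it.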

View File

@@ -24,7 +24,7 @@ namespace doris {
namespace vectorized {
class VResultWriter;
class VResultFileSink : public VDataStreamSender {
class VResultFileSink : public DataSink {
public:
VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
@@ -47,6 +47,7 @@ public:
RuntimeProfile* profile() override { return _profile; }
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
const RowDescriptor& row_desc() const { return _row_desc; }
private:
Status prepare_exprs(RuntimeState* state);
@@ -61,11 +62,15 @@ private:
std::unique_ptr<Block> _output_block = nullptr;
std::shared_ptr<BufferControlBlock> _sender;
std::unique_ptr<VDataStreamSender> _stream_sender;
std::shared_ptr<VResultWriter> _writer;
int _buf_size = 1024; // Allocated from _pool
bool _is_top_sink = true;
std::string _header;
std::string _header_type;
RowDescriptor _row_desc;
RuntimeProfile* _profile;
};
} // namespace vectorized
} // namespace doris

View File

@@ -186,4 +186,38 @@ suite("test_outfile") {
path.delete();
}
}
// test parallel output
try {
File path = new File(outFilePath)
if (!path.exists()) {
assert path.mkdirs()
} else {
throw new IllegalStateException("""${outFilePath} already exists! """)
}
sql """drop table if exists select_into_file"""
sql """CREATE TABLE `select_into_file` (
`id` int,
`name` varchar(30)
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);"""
sql """insert into select_into_file values(1, "b"),(2, "z"),(3, "a"),
(4, "c"), (5, "睿"), (6, "多"), (7, "丝"), (8, "test"),
(100, "aa"), (111, "bb"), (123, "cc"), (222, "dd");"""
sql "set enable_parallel_outfile = true;"
sql """select * from select_into_file into outfile "file://${outFilePath}/" properties("success_file_name" = "SUCCESS");"""
} finally {
try_sql("DROP TABLE IF EXISTS select_into_file")
File path = new File(outFilePath)
if (path.exists()) {
for (File f: path.listFiles()) {
f.delete();
}
path.delete();
}
}
}
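
The added regression case targets exactly the fixed path: a two-bucket table yields multiple instances, set enable_parallel_outfile = true enables the parallel writer, and the SELECT ... INTO OUTFILE "file://..." export is the statement that previously crashed the backend.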