[improve](outfile) add file_suffix options for outfile (#24334)

This commit is contained in:
wudi
2023-09-15 12:58:41 +08:00
committed by GitHub
parent 3e0933a507
commit 29fe87982f
8 changed files with 94 additions and 1 deletions

View File

@ -70,6 +70,7 @@ struct ResultFileOptions {
std::string orc_schema;
bool delete_existing_files = false;
std::string file_suffix;
ResultFileOptions(const TResultFileSinkOptions& t_opt) {
file_path = t_opt.file_path;
@ -80,6 +81,7 @@ struct ResultFileOptions {
t_opt.__isset.max_file_size_bytes ? t_opt.max_file_size_bytes : max_file_size_bytes;
delete_existing_files =
t_opt.__isset.delete_existing_files ? t_opt.delete_existing_files : false;
file_suffix = t_opt.file_suffix;
is_local_file = true;
if (t_opt.__isset.broker_addresses) {

View File

@ -172,9 +172,11 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
// file name format as: my_prefix_{fragment_instance_id}_0.csv
Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
std::string suffix =
_file_opts->file_suffix.empty() ? _file_format_to_name() : _file_opts->file_suffix;
std::stringstream ss;
ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
<< _file_format_to_name();
<< suffix;
*file_name = ss.str();
if (_storage_type == TStorageBackendType::LOCAL) {
// For local file writer, the file_path is a local dir.

View File

@ -80,6 +80,7 @@ illustrate:
line_delimiter: line delimiter,is only for CSV format. mulit-bytes supported starting in version 1.2, such as: "\\x01", "abc".
max_file_size: the size limit of a single file, if the result exceeds this value, it will be cut into multiple files, the value range of max_file_size is [5MB, 2GB] and the default is 1GB. (When specified that the file format is ORC, the size of the actual division file will be a multiples of 64MB, such as: specify max_file_size = 5MB, and actually use 64MB as the division; specify max_file_size = 65MB, and will actually use 128MB as cut division points.)
delete_existing_files: default `false`. If it is specified as true, you will first delete all files specified in the directory specified by the file_path, and then export the data to the directory.For example: "file_path" = "/user/tmp", then delete all files and directory under "/user/"; "file_path" = "/user/tmp/", then delete all files and directory under "/user/tmp/"
file_suffix: Specify the suffix of the export file
Broker related properties need to be prefixed with `broker.`:
broker.name: broker name

View File

@ -84,6 +84,7 @@ INTO OUTFILE "file_path"
line_delimiter: 行分隔符,只支持csv格式。在 1.2 版本开始支持多字节分隔符,如:"\\x01", "abc"。
max_file_size: 单个文件大小限制,如果结果超过这个值,将切割成多个文件, max_file_size取值范围是[5MB, 2GB], 默认为1GB。(当指定导出为orc文件格式时,实际切分文件的大小将是64MB的倍数,如:指定max_file_size = 5MB, 实际将以64MB为切分;指定max_file_size = 65MB, 实际将以128MB为切分)
delete_existing_files: 默认为false,若指定为true,则会先删除file_path指定的目录下的所有文件,然后导出数据到该目录下。例如:"file_path" = "/user/tmp", 则会删除"/user/"下所有文件及目录;"file_path" = "/user/tmp/", 则会删除"/user/tmp/"下所有文件及目录
file_suffix: 指定导出文件的后缀
Broker 相关属性需加前缀 `broker.`:
broker.name: broker名称

View File

@ -139,6 +139,8 @@ public class OutFileClause {
public static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
public static final String PROP_FILE_SUFFIX = "file_suffix";
private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String SCHEMA = "schema";
@ -156,6 +158,7 @@ public class OutFileClause {
private TFileFormatType fileFormatType;
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
private boolean deleteExistingFiles = false;
private String fileSuffix = "";
private BrokerDesc brokerDesc = null;
// True if result is written to local disk.
// If set to true, the brokerDesc must be null.
@ -643,6 +646,11 @@ public class OutFileClause {
processedPropKeys.add(PROP_DELETE_EXISTING_FILES);
}
if (properties.containsKey(PROP_FILE_SUFFIX)) {
fileSuffix = properties.get(PROP_FILE_SUFFIX);
processedPropKeys.add(PROP_FILE_SUFFIX);
}
if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
FeNameFormat.checkCommonName("file name", successFileName);
@ -880,6 +888,8 @@ public class OutFileClause {
}
sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
sinkOptions.setFileSuffix(fileSuffix);
if (brokerDesc != null) {
sinkOptions.setBrokerProperties(brokerDesc.getProperties());
// broker_addresses of sinkOptions will be set in Coordinator.

View File

@ -126,6 +126,7 @@ struct TResultFileSinkOptions {
15: optional string orc_schema
16: optional bool delete_existing_files;
17: optional string file_suffix;
}
struct TMemoryScratchSink {

View File

@ -0,0 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
zhangsan

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_outfile_file_suffix", "p0") {
// open nereids
sql """ set enable_nereids_planner=true """
sql """ set enable_fallback_to_original_planner=false """
String ak = getS3AK()
String sk = getS3SK()
String s3_endpoint = getS3Endpoint()
String region = getS3Region()
String bucket = context.config.otherConfigs.get("s3BucketName");
def create_table = {table_name ->
sql """ DROP TABLE IF EXISTS ${table_name} """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
`name` varchar(128) NOT NULL COMMENT ""
)
DISTRIBUTED BY HASH(name) PROPERTIES("replication_num" = "1");
"""
sql """ INSERT INTO ${table_name} values('zhangsan');"""
}
def table_name = "test_outfile_file_suffix"
create_table(table_name)
def outFilePath = """s3://${bucket}/outfile_"""
def csv_suffix_result = { file_suffix, file_format ->
result = sql """
select * from ${table_name}
into outfile "${outFilePath}"
FORMAT AS ${file_format}
PROPERTIES(
"s3.endpoint" = "${s3_endpoint}",
"s3.region" = "${region}",
"s3.secret_key"="${sk}",
"s3.access_key" = "${ak}",
"file_suffix" = "${file_suffix}"
);
"""
return result[0][3]
}
def file_suffix = "txt";
def file_format = "csv";
def outfile_url = csv_suffix_result(file_suffix, file_format);
print("http://${s3_endpoint}${outfile_url.substring(4)}0.${file_suffix}")
qt_select """ select * from s3(
"uri" = "http://${s3_endpoint}${outfile_url.substring(4)}0.${file_suffix}",
"ACCESS_KEY"= "${ak}",
"SECRET_KEY" = "${sk}",
"format" = "${file_format}",
"region" = "${region}"
);
"""
}