[Feature][regression-test]CSV import and export support header (#8487) (#9017)

* [Feature][regression-test] CSV import and export support header (#8487)
1. Add two new formats to stream load and broker load: csv_with_names and csv_with_names_and_types
2. Add two new formats to export: csv_with_names and csv_with_names_and_types
Author: chenlinzhong
Authored: 2022-04-15 13:44:36 +08:00
Committed by: GitHub
Parent: 419ec3b96c
Commit: ca4f4d199e
8 changed files with 464 additions and 1 deletion
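For context, the two header-aware formats tell the loader that the first line of the file (and, for csv_with_names_and_types, also the second line) is metadata rather than data. Below is a minimal sketch, not part of the diff, of exercising csv_with_names from a regression suite through the framework's streamLoad action; the table name is hypothetical and the data file path matches the layout this commit adds:

    // inside a suite body; "example_tbl" is a hypothetical table whose schema matches the CSV header
    streamLoad {
        table 'example_tbl'
        set 'column_separator', ','
        set 'format', 'csv_with_names'        // first line of the file is read as column names, not data
        file 'testheader/csv_with_names.txt'
        time 10000
        check { result, exception, startTime, endTime ->
            if (exception != null) {
                throw exception
            }
            def json = parseJson(result)
            assertEquals('success', json.Status.toLowerCase())
        }
    }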


@@ -41,4 +41,10 @@ testSuites = ""
// empty directories will test all directories
testDirectories = ""
customConf1 = "test_custom_conf_value"
// for test csv with header
hdfsFs = "hdfs://127.0.0.1:9000"
hdfsUser = "palo-qa"
hdfsPasswd = ""
brokerName = "broker_name"
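These keys are consumed through context.config.otherConfigs; a minimal sketch, mirroring the Suite.groovy helpers added later in this commit, of reading them inside a suite body:

    // values shown are the sample defaults from this config file
    String hdfsFs = context.config.otherConfigs.get("hdfsFs")          // e.g. "hdfs://127.0.0.1:9000"
    String hdfsUser = context.config.otherConfigs.get("hdfsUser")      // e.g. "palo-qa"
    String hdfsPasswd = context.config.otherConfigs.get("hdfsPasswd")
    String brokerName = context.config.otherConfigs.get("brokerName")  // e.g. "broker_name"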


@@ -0,0 +1,10 @@
2017-07-03,78,5,OlpJfkVz,176
2017-07-03,73,18,iVM0NyAH,49
2017-07-03,5,13,wvim4aqW,95
2017-07-03,20,12,ZksFGmLv,172
2017-07-03,39,15,Mf0RinDC,48
2017-07-03,55,9,VGq4T2kt,10
2017-07-03,50,12,p2TolhzU,149
2017-07-03,22,17,Q8kWnJyU,122
2017-07-03,54,16,upn9ZRPC,151
2017-07-03,22,10,Nzp8B0L2,94


@@ -0,0 +1,11 @@
event_day,siteid,citycode,username,pv
2017-07-03,77,18,rFphH1sk,165
2017-07-03,51,8,R47GKb1c,157
2017-07-03,82,3,uD7bYnZK,62
2017-07-03,78,8,Bf8K9rFd,122
2017-07-03,38,20,A8JHqEfY,194
2017-07-03,18,12,Emu786j5,84
2017-07-03,43,10,yklMRVYJ,132
2017-07-03,82,3,RvBK0g4o,118
2017-07-03,0,18,EMuWnD0y,9
2017-07-03,99,18,IjO9Hiof,132


@@ -0,0 +1,12 @@
event_day,siteid,citycode,username,pv
date,int,samllint,varchar,int
2017-07-03,77,18,rFphH1sk,165
2017-07-03,51,8,R47GKb1c,157
2017-07-03,82,3,uD7bYnZK,62
2017-07-03,78,8,Bf8K9rFd,122
2017-07-03,38,20,A8JHqEfY,194
2017-07-03,18,12,Emu786j5,84
2017-07-03,43,10,yklMRVYJ,132
2017-07-03,82,3,RvBK0g4o,118
2017-07-03,0,18,EMuWnD0y,9
2017-07-03,99,18,IjO9Hiof,1323


@@ -75,6 +75,7 @@ under the License.
        <groovy-eclipse-batch.version>3.0.7-01</groovy-eclipse-batch.version>
        <groovy-eclipse-compiler.version>3.7.0</groovy-eclipse-compiler.version>
        <antlr.version>4.9.3</antlr.version>
        <hadoop.version>2.8.0</hadoop.version>
    </properties>
    <build>
        <plugins>
@@ -259,5 +260,20 @@ under the License.
            <artifactId>antlr4-runtime</artifactId>
            <version>${antlr.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>


@@ -29,6 +29,7 @@ import org.apache.doris.regression.action.StreamLoadAction
import org.apache.doris.regression.action.SuiteAction
import org.apache.doris.regression.action.TestAction
import org.apache.doris.regression.util.JdbcUtils
import org.apache.doris.regression.util.Hdfs
import org.junit.jupiter.api.Assertions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -41,6 +42,8 @@ import java.util.stream.LongStream
import static org.apache.doris.regression.util.DataUtils.sortByToString
import java.io.File
class Suite implements GroovyInterceptable {
    final SuiteContext context
    final String name
@@ -250,6 +253,66 @@ class Suite implements GroovyInterceptable {
        runAction(new TestAction(context), actionSupplier)
    }
    String getBrokerName() {
        String brokerName = context.config.otherConfigs.get("brokerName")
        return brokerName
    }
    String getHdfsFs() {
        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
        return hdfsFs
    }
    String getHdfsUser() {
        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
        return hdfsUser
    }
    String getHdfsPasswd() {
        String hdfsPasswd = context.config.otherConfigs.get("hdfsPasswd")
        return hdfsPasswd
    }
    String getHdfsDataDir() {
        String dataDir = context.config.dataPath + "/" + group + "/"
        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
        return hdfs.genRemoteDataDir()
    }
    String uploadToHdfs(String localFile) {
        String dataDir = context.config.dataPath + "/" + group + "/"
        localFile = dataDir + localFile
        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
        String remotePath = hdfs.upload(localFile)
        return remotePath;
    }
    int getTotalLine(String filePath) {
        def file = new File(filePath)
        int lines = 0;
        file.eachLine {
            lines++;
        }
        return lines;
    }
    boolean deleteFile(String filePath) {
        def file = new File(filePath)
        file.delete()
    }
    List<String> downloadExportFromHdfs(String label) {
        String dataDir = context.config.dataPath + "/" + group + "/"
        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
        return hdfs.downLoad(label)
    }
    void streamLoad(Closure actionSupplier) {
        runAction(new StreamLoadAction(context), actionSupplier)
    }
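A minimal usage sketch of the helpers above, assuming the hdfsFs/hdfsUser entries in regression-conf.groovy point at a reachable HDFS; the export label prefix is hypothetical:

    // inside a suite body: push a local data file to HDFS, then pull exported files back and count lines
    String remotePath = uploadToHdfs("testheader/csv_with_names.txt")
    List<String> downloaded = downloadExportFromHdfs("my_label/export-data")   // hypothetical label prefix
    for (String localFile : downloaded) {
        log.info("downloaded ${localFile}: ${getTotalLine(localFile)} lines")
        deleteFile(localFile)
    }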


@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.regression.util
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Assert;
import java.io.FileOutputStream;
import java.io.IOException;
// upload file from "regression-test/data/hdfs" to "hdfs://xx/user/${userName}/groovy/"
// download file from "hdfs://xx/user/${userName}/groovy/" to "regression-test/data/hdfs"
class Hdfs {
    FileSystem fs = null;
    String uri;
    String userName;
    String testRemoteDir;
    String localDataDir;
    Hdfs(String uri, String username, String localDataDir) {
        Configuration conf = new Configuration();
        conf.setStrings("fs.default.name", uri);
        this.uri = uri;
        this.userName = username;
        System.setProperty("HADOOP_USER_NAME", username);
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        this.testRemoteDir = genRemoteDataDir();
        this.localDataDir = localDataDir;
        try {
            fs = FileSystem.get(conf);
            Path remoteDirPath = new Path(testRemoteDir);
            if (!fs.exists(remoteDirPath)) {
                fs.mkdirs(remoteDirPath);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    List<String> downLoad(String prefix) {
        List<String> files = new ArrayList<>();
        try {
            String filepath = this.testRemoteDir + prefix + "*";
            FileStatus[] fileStatusArray = fs.globStatus(new Path(filepath + "*"));
            for (FileStatus fileStatus : fileStatusArray) {
                Path path = fileStatus.getPath();
                FSDataInputStream fsDataInputStream = fs.open(path);
                String localFilePath = getAbsoluteLocalPath(prefix.split('/')[0] + path.getName())
                FileOutputStream fileOutputStream = new FileOutputStream(localFilePath);
                IOUtils.copy(fsDataInputStream, fileOutputStream);
                files.add(localFilePath);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return files;
    }
    String getAbsoluteLocalPath(String file_name) {
        String localAbsolutePath = this.localDataDir + "/" + file_name;
        return localAbsolutePath;
    }
    String genRemoteDataDir() {
        return this.uri + "/user/" + this.userName + "/groovy/";
    }
    String getAbsoluteRemotePath(String file_name) {
        String remoteAbsolutePath = genRemoteDataDir() + file_name;
        return remoteAbsolutePath;
    }
    String upload(String local_file) {
        try {
            Path src = new Path(local_file);
            String remote_file = getAbsoluteRemotePath(src.getName());
            Path dst = new Path(remote_file);
            fs.copyFromLocalFile(src, dst);
            return remote_file;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }
}
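A minimal sketch of using this helper class directly, assuming an HDFS namenode at the sample address from regression-conf.groovy; the local path and label prefix are illustrative:

    Hdfs hdfs = new Hdfs("hdfs://127.0.0.1:9000", "palo-qa", "regression-test/data/demo")
    String remote = hdfs.upload("regression-test/data/demo/testheader/csv.txt")   // returns the hdfs:// path of the uploaded copy
    List<String> locals = hdfs.downLoad("some_label")                             // downloads files matching the prefix into the local data dir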


@@ -0,0 +1,236 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_csv_with_header", "demo") {
//define format
def format_csv = "csv"
def format_csv_with_names = "csv_with_names"
def format_csv_with_names_and_types = "csv_with_names_and_types"
//define format data file
def format_csv_file = "testheader/csv.txt"
def format_csv_with_names_file = "testheader/csv_with_names.txt"
def format_csv_with_names_and_types_file = "testheader/csv_with_names_and_types.txt"
def expect_rows = 10
def testTable = "test_csv_with_header"
def test_stream_load = {testTablex, label, format, file_path, exect_rows->
streamLoad {
table testTablex
set 'label',label
set 'column_separator',','
set 'format',format
file file_path
time 10000
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("success", json.Status.toLowerCase())
assertEquals(json.NumberTotalRows, exect_rows)
assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
}
}
}
    def import_from_hdfs = {testTable1, label1, hdfsFilePath, format1, brokerName, hdfsUser, hdfsPasswd ->
        sql """
            LOAD LABEL ${label1} (
                DATA INFILE("${hdfsFilePath}")
                INTO TABLE ${testTable1} COLUMNS TERMINATED BY ","
                FORMAT as "${format1}" )
            with BROKER "${brokerName}"
            ("username"="${hdfsUser}", "password"="${hdfsPasswd}")
            """
    }
    def check_import_result = {checklabel, testTable4, expected_rows ->
        max_try_secs = 100000
        while (max_try_secs--) {
            result = sql "show load where label = '${checklabel}'"
            if (result[0][2] == "FINISHED") {
                result_count = sql "select count(*) from ${testTable4}"
                assertEquals(result_count[0][0], expected_rows)
                break
            } else {
                sleep(1000)
                max_try_secs = max_try_secs - 1000
                if (max_try_secs < 0) {
                    assertEquals(1, 2)
                }
            }
        }
    }
sql "DROP TABLE IF EXISTS ${testTable}"
def result1 = sql """
CREATE TABLE `${testTable}` (
`event_day` date NULL COMMENT "",
`siteid` int(11) NULL DEFAULT "10" COMMENT "",
`citycode` smallint(6) NULL COMMENT "",
`username` varchar(32) NULL DEFAULT "" COMMENT "",
`pv` bigint(20) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`event_day`, `siteid`, `citycode`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
)
"""
    // [stream load] format=csv
    def label = UUID.randomUUID().toString()
    test_stream_load.call(testTable, label, format_csv, format_csv_file, expect_rows)
    // [stream load] format=csv_with_names
    label = UUID.randomUUID().toString()
    test_stream_load.call(testTable, label, format_csv_with_names, format_csv_with_names_file, expect_rows)
    // [stream load] format=csv_with_names_and_types
    label = UUID.randomUUID().toString()
    test_stream_load.call(testTable, label, format_csv_with_names_and_types, format_csv_with_names_and_types_file, expect_rows)
    // check total rows
    def result_count = sql "select count(*) from ${testTable}"
    assertEquals(result_count[0][0], expect_rows * 3)
    // test import data from hdfs
    hdfsUser = getHdfsUser()
    brokerName = getBrokerName()
    hdfsPasswd = getHdfsPasswd()
    hdfsFs = getHdfsFs()
    // [broker load] test normal
    label = UUID.randomUUID().toString().replaceAll("-", "")
    remote_csv_file = uploadToHdfs format_csv_file
    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv, brokerName, hdfsUser, hdfsPasswd)
    check_import_result.call(label, testTable, expect_rows * 4)
    // [broker load] csv_with_names
    label = UUID.randomUUID().toString().replaceAll("-", "")
    remote_csv_file = uploadToHdfs format_csv_with_names_file
    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv_with_names, brokerName, hdfsUser, hdfsPasswd)
    check_import_result.call(label, testTable, expect_rows * 5)
    // [broker load] csv_with_names_and_types
    label = UUID.randomUUID().toString().replaceAll("-", "")
    remote_csv_file = uploadToHdfs format_csv_with_names_and_types_file
    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv_with_names_and_types, brokerName, hdfsUser, hdfsPasswd)
    check_import_result.call(label, testTable, expect_rows * 6)
    def export_to_hdfs = {exportTable, exportLable, hdfsPath, exportFormat, exportBrokerName, exportUserName, exportPasswd ->
        sql """ EXPORT TABLE ${exportTable}
            TO "${hdfsPath}"
            PROPERTIES ("label" = "${exportLable}", "column_separator"=",", "format"="${exportFormat}")
            WITH BROKER "${exportBrokerName}" ("username"="${exportUserName}", "password"="${exportPasswd}")
            """
    }
    def check_export_result = {checklabel1 ->
        max_try_secs = 100000
        while (max_try_secs--) {
            result = sql "show export where label='${checklabel1}'"
            if (result[0][2] == "FINISHED") {
                break
            } else {
                sleep(1000)
                max_try_secs = max_try_secs - 1000
                if (max_try_secs < 0) {
                    assertEquals(1, 2)
                }
            }
        }
    }
    def check_download_result = {resultlist, fileFormat, expectedTotalRows ->
        int totalLines = 0
        if (fileFormat == format_csv_with_names) {
            expectedTotalRows += resultlist.size()
        } else if (fileFormat == format_csv_with_names_and_types) {
            expectedTotalRows += resultlist.size() * 2
        }
        for (String oneFile : resultlist) {
            totalLines += getTotalLine(oneFile)
            deleteFile(oneFile)
        }
        assertEquals(expectedTotalRows, totalLines)
    }
    resultCount = sql "select count(*) from ${testTable}"
    currentTotalRows = resultCount[0][0]
    // export table to hdfs format=csv
    hdfsDataDir = getHdfsDataDir()
    label = UUID.randomUUID().toString().replaceAll("-", "")
    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv, brokerName, hdfsUser, hdfsPasswd)
    check_export_result(label)
    result = downloadExportFromHdfs(label + "/export-data")
    check_download_result(result, format_csv, currentTotalRows)
    // export table to hdfs format=csv_with_names
    label = UUID.randomUUID().toString().replaceAll("-", "")
    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv_with_names, brokerName, hdfsUser, hdfsPasswd)
    check_export_result(label)
    result = downloadExportFromHdfs(label + "/export-data")
    check_download_result(result, format_csv_with_names, currentTotalRows)
    // export table to hdfs format=csv_with_names_and_types
    label = UUID.randomUUID().toString().replaceAll("-", "")
    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv_with_names_and_types, brokerName, hdfsUser, hdfsPasswd)
    check_export_result(label)
    result = downloadExportFromHdfs(label + "/export-data")
    check_download_result(result, format_csv_with_names_and_types, currentTotalRows)
    // select out file to hdfs
    select_out_file = {outTable, outHdfsPath, outFormat, outHdfsFs, outBroker, outHdfsUser, outPasswd ->
        sql """
            SELECT * FROM ${outTable}
            INTO OUTFILE "${outHdfsPath}"
            FORMAT AS "${outFormat}"
            PROPERTIES
            (
                "broker.name" = "${outBroker}",
                "column_separator" = ",",
                "line_delimiter" = "\n",
                "max_file_size" = "5MB",
                "broker.username"="${hdfsUser}",
                "broker.password"="${outPasswd}"
            )
            """
    }
    // select out file to hdfs format=csv
    label = UUID.randomUUID().toString().replaceAll("-", "")
    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
    result = downloadExportFromHdfs(label + "/csv")
    check_download_result(result, format_csv, currentTotalRows)
    // select out file to hdfs format=csv_with_names
    label = UUID.randomUUID().toString().replaceAll("-", "")
    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv_with_names, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
    result = downloadExportFromHdfs(label + "/csv")
    check_download_result(result, format_csv_with_names, currentTotalRows)
    // select out file to hdfs format=csv_with_names_and_types
    label = UUID.randomUUID().toString().replaceAll("-", "")
    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv_with_names_and_types, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
    result = downloadExportFromHdfs(label + "/csv")
    check_download_result(result, format_csv_with_names_and_types, currentTotalRows)
}