diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy
index bfbd6ed875..d1b146c34b 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -41,4 +41,10 @@
 testSuites = ""
 // empty directories will test all directories
 testDirectories = ""
-customConf1 = "test_custom_conf_value"
\ No newline at end of file
+customConf1 = "test_custom_conf_value"
+
+// for the csv-with-header test
+hdfsFs = "hdfs://127.0.0.1:9000"
+hdfsUser = "palo-qa"
+hdfsPasswd = ""
+brokerName = "broker_name"
diff --git a/regression-test/data/demo/testheader/csv.txt b/regression-test/data/demo/testheader/csv.txt
new file mode 100644
index 0000000000..f05d1e852f
--- /dev/null
+++ b/regression-test/data/demo/testheader/csv.txt
@@ -0,0 +1,10 @@
+2017-07-03,78,5,OlpJfkVz,176
+2017-07-03,73,18,iVM0NyAH,49
+2017-07-03,5,13,wvim4aqW,95
+2017-07-03,20,12,ZksFGmLv,172
+2017-07-03,39,15,Mf0RinDC,48
+2017-07-03,55,9,VGq4T2kt,10
+2017-07-03,50,12,p2TolhzU,149
+2017-07-03,22,17,Q8kWnJyU,122
+2017-07-03,54,16,upn9ZRPC,151
+2017-07-03,22,10,Nzp8B0L2,94
\ No newline at end of file
diff --git a/regression-test/data/demo/testheader/csv_with_names.txt b/regression-test/data/demo/testheader/csv_with_names.txt
new file mode 100644
index 0000000000..d92e98544e
--- /dev/null
+++ b/regression-test/data/demo/testheader/csv_with_names.txt
@@ -0,0 +1,11 @@
+event_day,siteid,citycode,username,pv
+2017-07-03,77,18,rFphH1sk,165
+2017-07-03,51,8,R47GKb1c,157
+2017-07-03,82,3,uD7bYnZK,62
+2017-07-03,78,8,Bf8K9rFd,122
+2017-07-03,38,20,A8JHqEfY,194
+2017-07-03,18,12,Emu786j5,84
+2017-07-03,43,10,yklMRVYJ,132
+2017-07-03,82,3,RvBK0g4o,118
+2017-07-03,0,18,EMuWnD0y,9
+2017-07-03,99,18,IjO9Hiof,132
\ No newline at end of file
diff --git a/regression-test/data/demo/testheader/csv_with_names_and_types.txt b/regression-test/data/demo/testheader/csv_with_names_and_types.txt
new file mode 100644
index 0000000000..665b008bf0
--- /dev/null
+++ b/regression-test/data/demo/testheader/csv_with_names_and_types.txt
@@ -0,0 +1,12 @@
+event_day,siteid,citycode,username,pv
+date,int,smallint,varchar,int
+2017-07-03,77,18,rFphH1sk,165
+2017-07-03,51,8,R47GKb1c,157
+2017-07-03,82,3,uD7bYnZK,62
+2017-07-03,78,8,Bf8K9rFd,122
+2017-07-03,38,20,A8JHqEfY,194
+2017-07-03,18,12,Emu786j5,84
+2017-07-03,43,10,yklMRVYJ,132
+2017-07-03,82,3,RvBK0g4o,118
+2017-07-03,0,18,EMuWnD0y,9
+2017-07-03,99,18,IjO9Hiof,1323
\ No newline at end of file
diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml
index a665c8f84d..f90d2a7214 100644
--- a/regression-test/framework/pom.xml
+++ b/regression-test/framework/pom.xml
@@ -75,6 +75,7 @@ under the License.
         3.0.7-01
         3.7.0
         4.9.3
+        <hadoop.version>2.8.0</hadoop.version>
     
 
@@ -259,5 +260,20 @@ under the License.
             <artifactId>antlr4-runtime</artifactId>
             <version>${antlr.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 17a0d152bf..ff6f733f15 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -29,6 +29,7 @@ import org.apache.doris.regression.action.StreamLoadAction
 import org.apache.doris.regression.action.SuiteAction
 import org.apache.doris.regression.action.TestAction
 import org.apache.doris.regression.util.JdbcUtils
+import org.apache.doris.regression.util.Hdfs
 import org.junit.jupiter.api.Assertions
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -41,6 +42,8 @@ import java.util.stream.LongStream
 
 import static org.apache.doris.regression.util.DataUtils.sortByToString
 
+import java.io.File
+
 class Suite implements GroovyInterceptable {
     final SuiteContext context
     final String name
@@ -250,6 +253,66 @@ class Suite implements GroovyInterceptable {
         runAction(new TestAction(context), actionSupplier)
     }
 
+    String getBrokerName() {
+        String brokerName = context.config.otherConfigs.get("brokerName")
+        return brokerName
+    }
+
+    String getHdfsFs() {
+        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
+        return hdfsFs
+    }
+
+    String getHdfsUser() {
+        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+        return hdfsUser
+    }
+
+    String getHdfsPasswd() {
+        String hdfsPasswd = context.config.otherConfigs.get("hdfsPasswd")
+        return hdfsPasswd
+    }
+
+    String getHdfsDataDir() {
+        String dataDir = context.config.dataPath + "/" + group + "/"
+        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
+        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
+        return hdfs.genRemoteDataDir()
+    }
+
+    String uploadToHdfs(String localFile) {
+        String dataDir = context.config.dataPath + "/" + group + "/"
+        localFile = dataDir + localFile
+        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
+        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
+        String remotePath = hdfs.upload(localFile)
+        return remotePath;
+    }
+
+    int getTotalLine(String filePath) {
+        def file = new File(filePath)
+        int lines = 0;
+        file.eachLine {
+            lines++;
+        }
+        return lines;
+    }
+
+    boolean deleteFile(String filePath) {
+        def file = new File(filePath)
+        file.delete()
+    }
+
+    List downloadExportFromHdfs(String label) {
+        String dataDir = context.config.dataPath + "/" + group + "/"
+        String hdfsFs = context.config.otherConfigs.get("hdfsFs")
+        String hdfsUser = context.config.otherConfigs.get("hdfsUser")
+        Hdfs hdfs = new Hdfs(hdfsFs, hdfsUser, dataDir)
+        return hdfs.downLoad(label)
+    }
+
     void streamLoad(Closure actionSupplier) {
         runAction(new StreamLoadAction(context), actionSupplier)
     }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Hdfs.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Hdfs.groovy
new file mode 100644
index 0000000000..c0b59d3e77
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Hdfs.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Assert;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+// upload file from "regression-test/data/hdfs" to "hdfs://xx/user/${userName}/groovy/"
+// download file from "hdfs://xx/user/${userName}/groovy/" to "regression-test/data/hdfs"
+class Hdfs {
+    FileSystem fs = null;
+    String uri;
+    String userName;
+    String testRemoteDir;
+    String localDataDir;
+
+    Hdfs(String uri, String username, String localDataDir) {
+        Configuration conf = new Configuration();
+        conf.setStrings("fs.default.name", uri);
+        this.uri = uri;
+        this.userName = username;
+        System.setProperty("HADOOP_USER_NAME", username);
+        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+        conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+        this.testRemoteDir = genRemoteDataDir();
+        this.localDataDir = localDataDir;
+        try {
+            fs = FileSystem.get(conf);
+            Path remoteDirPath = new Path(testRemoteDir);
+            if (!fs.exists(remoteDirPath)) {
+                fs.mkdirs(remoteDirPath);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    List downLoad(String prefix) {
+        List files = new ArrayList<>();
+        try {
+            String filepath = this.testRemoteDir + prefix + "*";
+            FileStatus[] fileStatusArray = fs.globStatus(new Path(filepath + "*"));
+            for (FileStatus fileStatus : fileStatusArray) {
+                Path path = fileStatus.getPath();
+                FSDataInputStream fsDataInputStream = fs.open(path);
+                String localFilePath = getAbsoluteLocalPath(prefix.split('/')[0] + path.getName())
+                FileOutputStream fileOutputStream = new FileOutputStream(localFilePath);
+                IOUtils.copy(fsDataInputStream, fileOutputStream);
+                files.add(localFilePath);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return files;
+    }
+
+    String getAbsoluteLocalPath(String file_name) {
+        String localAbsolutePath = this.localDataDir + "/" + file_name;
+        return localAbsolutePath;
+    }
+
+    String genRemoteDataDir() {
+        return this.uri + "/user/" + this.userName + "/groovy/";
+    }
+
+    String getAbsoluteRemotePath(String file_name) {
+        String remoteAbsolutePath = genRemoteDataDir() + file_name;
+        return remoteAbsolutePath;
+    }
+
+    String upload(String local_file) {
+        try {
+            Path src = new Path(local_file);
+            String remote_file = getAbsoluteRemotePath(src.getName());
+            Path dst = new Path(remote_file);
+            fs.copyFromLocalFile(src, dst);
+            return remote_file;
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return "";
+    }
+}
\ No newline at end of file
diff --git a/regression-test/suites/demo/test_csv_with_header.groovy b/regression-test/suites/demo/test_csv_with_header.groovy
new file mode 100644
index 0000000000..42eaf09a8a
--- /dev/null
+++ b/regression-test/suites/demo/test_csv_with_header.groovy
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_csv_with_header", "demo") {
+    // define format
+    def format_csv = "csv"
+    def format_csv_with_names = "csv_with_names"
+    def format_csv_with_names_and_types = "csv_with_names_and_types"
+    // define format data file
+    def format_csv_file = "testheader/csv.txt"
+    def format_csv_with_names_file = "testheader/csv_with_names.txt"
+    def format_csv_with_names_and_types_file = "testheader/csv_with_names_and_types.txt"
+    def expect_rows = 10
+    def testTable = "test_csv_with_header"
+
+    def test_stream_load = {testTablex, label, format, file_path, exect_rows->
+        streamLoad {
+            table testTablex
+            set 'label', label
+            set 'column_separator', ','
+            set 'format', format
+            file file_path
+            time 10000
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, exect_rows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+    def import_from_hdfs = {testTable1, label1, hdfsFilePath, format1, brokerName, hdfsUser, hdfsPasswd->
+        sql """
+            LOAD LABEL ${label1} (
+            DATA INFILE("${hdfsFilePath}")
+            INTO TABLE ${testTable1} COLUMNS TERMINATED BY ","
+            FORMAT as "${format1}" )
+            with BROKER "${brokerName}"
+            ("username"="${hdfsUser}", "password"="${hdfsPasswd}")
+            """
+    }
+
+    def check_import_result = {checklabel, testTable4, expected_rows->
+        max_try_secs = 100000
+        while(max_try_secs--) {
+            result = sql "show load where label = '${checklabel}'"
+            if(result[0][2] == "FINISHED") {
+                result_count = sql "select count(*) from ${testTable4}"
+                assertEquals(result_count[0][0], expected_rows)
+                break
+            } else {
+                sleep(1000)
+                max_try_secs = max_try_secs - 1000
+                if(max_try_secs < 0) {
+                    assertEquals(1, 2)
+                }
+            }
+        }
+    }
+
+    sql "DROP TABLE IF EXISTS ${testTable}"
+    def result1 = sql """
+        CREATE TABLE `${testTable}` (
+            `event_day` date NULL COMMENT "",
+            `siteid` int(11) NULL DEFAULT "10" COMMENT "",
+            `citycode` smallint(6) NULL COMMENT "",
+            `username` varchar(32) NULL DEFAULT "" COMMENT "",
+            `pv` bigint(20) NULL DEFAULT "0" COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`event_day`, `siteid`, `citycode`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`siteid`) BUCKETS 10
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "in_memory" = "false",
+            "storage_format" = "V2"
+        )
+        """
+    // [stream load] format=csv
+    def label = UUID.randomUUID().toString()
+    test_stream_load.call(testTable, label, format_csv, format_csv_file, expect_rows)
+
+    // [stream load] format=csv_with_names
+    label = UUID.randomUUID().toString()
+    test_stream_load.call(testTable, label, format_csv_with_names, format_csv_with_names_file, expect_rows)
+
+    // [stream load] format=csv_with_names_and_types
+    label = UUID.randomUUID().toString()
+    test_stream_load.call(testTable, label, format_csv_with_names_and_types, format_csv_with_names_and_types_file, expect_rows)
+
+    // check total rows
+    def result_count = sql "select count(*) from ${testTable}"
+    assertEquals(result_count[0][0], expect_rows*3)
+
+
+    // test import data from hdfs
+    hdfsUser = getHdfsUser()
+    brokerName = getBrokerName()
+    hdfsPasswd = getHdfsPasswd()
+    hdfsFs = getHdfsFs()
+    // [broker load] test normal
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    remote_csv_file = uploadToHdfs format_csv_file
+    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv, brokerName, hdfsUser, hdfsPasswd)
+    check_import_result.call(label, testTable, expect_rows * 4)
+
+    // [broker load] csv_with_names
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    remote_csv_file = uploadToHdfs format_csv_with_names_file
+    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv_with_names, brokerName, hdfsUser, hdfsPasswd)
+    check_import_result.call(label, testTable, expect_rows * 5)
+
+    // [broker load] csv_with_names_and_types
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    remote_csv_file = uploadToHdfs format_csv_with_names_and_types_file
+    export_result = import_from_hdfs.call(testTable, label, remote_csv_file, format_csv_with_names_and_types, brokerName, hdfsUser, hdfsPasswd)
+    check_import_result.call(label, testTable, expect_rows * 6)
+
+    def export_to_hdfs = {exportTable, exportLabel, hdfsPath, exportFormat, exportBrokerName, exportUserName, exportPasswd->
+        sql """ EXPORT TABLE ${exportTable}
+            TO "${hdfsPath}"
+            PROPERTIES ("label" = "${exportLabel}", "column_separator"=",", "format"="${exportFormat}")
+            WITH BROKER "${exportBrokerName}" ("username"="${exportUserName}", "password"="${exportPasswd}")
+            """
+    }
+
+    def check_export_result = {checklabel1->
+        max_try_secs = 100000
+        while(max_try_secs--) {
+            result = sql "show export where label='${checklabel1}'"
+            if(result[0][2] == "FINISHED") {
+                break
+            } else {
+                sleep(1000)
+                max_try_secs = max_try_secs - 1000
+                if(max_try_secs < 0) {
+                    assertEquals(1, 2)
+                }
+            }
+        }
+    }
+
+    def check_download_result = {resultlist, fileFormat, expectedTotalRows->
+        int totalLines = 0
+        if(fileFormat == format_csv_with_names) {
+            expectedTotalRows += resultlist.size()
+        } else if(fileFormat == format_csv_with_names_and_types) {
+            expectedTotalRows += resultlist.size() * 2
+        }
+        for(String oneFile : resultlist) {
+            totalLines += getTotalLine(oneFile)
+            deleteFile(oneFile)
+        }
+        assertEquals(expectedTotalRows, totalLines)
+    }
+
+    resultCount = sql "select count(*) from ${testTable}"
+    currentTotalRows = resultCount[0][0]
+
+    // export table to hdfs format=csv
+    hdfsDataDir = getHdfsDataDir()
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv, brokerName, hdfsUser, hdfsPasswd)
+    check_export_result(label)
+    result = downloadExportFromHdfs(label + "/export-data")
+    check_download_result(result, format_csv, currentTotalRows)
+
+    // export table to hdfs format=csv_with_names
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv_with_names, brokerName, hdfsUser, hdfsPasswd)
+    check_export_result(label)
+    result = downloadExportFromHdfs(label + "/export-data")
+    check_download_result(result, format_csv_with_names, currentTotalRows)
+
+    // export table to hdfs format=csv_with_names_and_types
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    export_to_hdfs.call(testTable, label, hdfsDataDir + "/" + label, format_csv_with_names_and_types, brokerName, hdfsUser, hdfsPasswd)
+    check_export_result(label)
+    result = downloadExportFromHdfs(label + "/export-data")
+    check_download_result(result, format_csv_with_names_and_types, currentTotalRows)
+
+
+    // select out file to hdfs
+    select_out_file = {outTable, outHdfsPath, outFormat, outHdfsFs, outBroker, outHdfsUser, outPasswd->
+        sql """
+            SELECT * FROM ${outTable}
+            INTO OUTFILE "${outHdfsPath}"
+            FORMAT AS "${outFormat}"
+            PROPERTIES
+            (
+                "broker.name" = "${outBroker}",
+                "column_separator" = ",",
+                "line_delimiter" = "\n",
+                "max_file_size" = "5MB",
+                "broker.username"="${outHdfsUser}",
+                "broker.password"="${outPasswd}"
+            )
+            """
+    }
+    // select out file to hdfs format=csv
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
+    result = downloadExportFromHdfs(label + "/csv")
+    check_download_result(result, format_csv, currentTotalRows)
+
+    // select out file to hdfs format=csv_with_names
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv_with_names, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
+    result = downloadExportFromHdfs(label + "/csv")
+    check_download_result(result, format_csv_with_names, currentTotalRows)
+
+    // select out file to hdfs format=csv_with_names_and_types
+    label = UUID.randomUUID().toString().replaceAll("-", "")
+    select_out_file(testTable, hdfsDataDir + "/" + label + "/csv", format_csv_with_names_and_types, hdfsFs, brokerName, hdfsUser, hdfsPasswd)
+    result = downloadExportFromHdfs(label + "/csv")
+    check_download_result(result, format_csv_with_names_and_types, currentTotalRows)
+
+}