diff --git a/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
new file mode 100644
index 0000000000..7f63e400c3
--- /dev/null
+++ b/regression-test/data/load_p0/broker_load/test_s3_load_with_load_parallelism.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5000000
+
diff --git a/regression-test/data/load_p0/stream_load/test_compress_type.out b/regression-test/data/load_p0/stream_load/test_compress_type.out
new file mode 100644
index 0000000000..f76aa4d741
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/test_compress_type.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+120
+
diff --git a/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
new file mode 100644
index 0000000000..be0d7f9c34
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_s3_load_with_load_parallelism.groovy
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
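+
+// Exercises broker load from S3-compatible object storage with the
+// "load_parallelism" property: one wide (50-column) ORC file is loaded with
+// load_parallelism = 3, and the final row count is checked against the
+// expected 5,000,000 rows recorded in the .out file.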
+ +suite("test_s3_load_with_load_parallelism", "load_p0") { + + def tableName = "test_load_parallelism" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName}( + `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20), + `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20), + `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20), + `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20), + `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20), + `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20), + `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20), + `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20), + `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20), + `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20) + ) ENGINE=OLAP + DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1 + PROPERTIES ( "replication_num" = "1" ); + """ + + def attributesList = [ + + ] + + // attributesList.add(new LoadAttributes("s3://doris-build-1308700295/regression/load/data/enclose_not_trim_quotes.csv", + // "${tableName}", "", "COLUMNS TERMINATED BY \",\"", "FORMAT AS \"CSV\"", "(k1,k2,v1,v2,v3,v4)", + // "PROPERTIES (\"enclose\" = \"\\\"\", \"escape\" = \"\\\\\")").addProperties("trim_double_quotes", "false")) + + attributesList.add(new LoadAttributes("s3://test-for-student-1308700295/regression/segcompaction/segcompaction.orc", + "${tableName}", "", "", "FORMAT AS \"ORC\"", "(col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49)", "").addProperties("load_parallelism", "3")) + + def ak = getS3AK() + def sk = getS3SK() + + + def i = 0 + for (LoadAttributes attributes : attributesList) { + def label = "test_s3_load_escape_enclose" + UUID.randomUUID().toString().replace("-", "_") + "_" + i + attributes.label = label + def prop = attributes.getPropertiesStr() + + sql """ + LOAD LABEL $label ( + DATA INFILE("$attributes.dataDesc.path") + INTO TABLE $attributes.dataDesc.tableName + $attributes.dataDesc.columnTermClause + $attributes.dataDesc.lineTermClause + $attributes.dataDesc.formatClause + $attributes.dataDesc.columns + $attributes.dataDesc.whereExpr + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ) + ${prop} + """ + // "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + // "AWS_ACCESS_KEY" = "AKIDd9RVMzIOI0V7Wlnbr9JG0WrhJk28zc2H", + // "AWS_SECRET_KEY"="4uWxMhqnW3Plz97sPjqlSUXO1RhokRuO", + // "AWS_REGION" = "ap-beijing" + + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$attributes.label" order by createtime desc limit 1; """ + + if 
+            if (result[0][2].equals("FINISHED")) {
+                if (attributes.isExpectFailed) {
+                    assertTrue(false, "load should have failed but succeeded: $result")
+                }
+                logger.info("Load FINISHED " + attributes.label + ": $result")
+                break
+            }
+            if (result[0][2].equals("CANCELLED")) {
+                if (attributes.isExpectFailed) {
+                    logger.info("Load CANCELLED as expected " + attributes.label)
+                    break
+                }
+                assertTrue(false, "load failed: $result")
+                break
+            }
+            Thread.sleep(1000)
+            max_try_milli_secs -= 1000
+            if (max_try_milli_secs <= 0) {
+                assertTrue(false, "load Timeout: $attributes.label")
+            }
+        }
+        i++
+    }
+    sql "sync"
+    qt_sql """
+        SELECT count(*) FROM ${tableName}
+    """
+}
+
+class DataDesc {
+    public String mergeType = ""
+    public String path
+    public String tableName
+    public String lineTermClause
+    public String columnTermClause
+    public String formatClause
+    public String columns
+    public String whereExpr
+}
+
+class LoadAttributes {
+    LoadAttributes(String path, String tableName, String lineTermClause, String columnTermClause, String formatClause,
+                   String columns, String whereExpr, boolean isExpectFailed = false) {
+        this.dataDesc = new DataDesc()
+        this.dataDesc.path = path
+        this.dataDesc.tableName = tableName
+        this.dataDesc.lineTermClause = lineTermClause
+        this.dataDesc.columnTermClause = columnTermClause
+        this.dataDesc.formatClause = formatClause
+        this.dataDesc.columns = columns
+        this.dataDesc.whereExpr = whereExpr
+
+        this.isExpectFailed = isExpectFailed
+
+        properties = new HashMap<>()
+        properties.put("use_new_load_scan_node", "true")
+    }
+
+    LoadAttributes addProperties(String k, String v) {
+        properties.put(k, v)
+        return this
+    }
+
+    String getPropertiesStr() {
+        if (properties.isEmpty()) {
+            return ""
+        }
+        String prop = "PROPERTIES ("
+        properties.each { k, v ->
+            prop += "\"${k}\" = \"${v}\","
+        }
+        prop = prop.substring(0, prop.size() - 1)
+        prop += ")"
+        return prop
+    }
+
+    LoadAttributes withPathStyle() {
+        usePathStyle = "true"
+        return this
+    }
+
+    public DataDesc dataDesc
+    public Map<String, String> properties
+    public String label
+    public String usePathStyle = "false"
+    public boolean isExpectFailed
+}
diff --git a/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
new file mode 100644
index 0000000000..41c3660e11
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/ddl/basic_data.sql
@@ -0,0 +1,29 @@
+CREATE TABLE basic_data
+(
+    k00 INT NOT NULL,
+    k01 DATE NOT NULL,
+    k02 BOOLEAN NULL,
+    k03 TINYINT NULL,
+    k04 SMALLINT NULL,
+    k05 INT NULL,
+    k06 BIGINT NULL,
+    k07 LARGEINT NULL,
+    k08 FLOAT NULL,
+    k09 DOUBLE NULL,
+    k10 DECIMAL(9,1) NULL,
+    k11 DECIMALV3(9,1) NULL,
+    k12 DATETIME NULL,
+    k13 DATEV2 NULL,
+    k14 DATETIMEV2 NULL,
+    k15 CHAR NULL,
+    k16 VARCHAR NULL,
+    k17 STRING NULL,
+    k18 JSON NULL
+)
+DUPLICATE KEY(k00)
+DISTRIBUTED BY HASH(k00) BUCKETS 32
+PROPERTIES (
+    "bloom_filter_columns"="k05",
+    "replication_num" = "1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_compress_type.groovy b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
new file mode 100644
index 0000000000..d1123027ba
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_compress_type.groovy
@@ -0,0 +1,280 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Stream load of compressed CSV files. Doris supports the compress types
+// GZ/LZO/BZ2/LZ4FRAME/DEFLATE/LZOP; this suite covers GZ, BZ2 and LZ4.
+// Each file is loaded three ways: with "format" and "compress_type" both set,
+// with only "compress_type" set (format defaults to CSV), and without
+// "compress_type". In the last case the compressed bytes are parsed as plain
+// CSV, so every row is filtered and the load is expected to fail.
+suite("test_compress_type", "load_p0") {
+    def tableName = "basic_data"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+
+    // Both "format" and "compress_type" set: all 20 rows should load.
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        set 'compress_type', 'GZ'
+
+        file "basic_data.csv.gz"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+        set 'compress_type', 'BZ2'
+
+        file "basic_data.csv.bz2"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', 'csv'
+        set 'compress_type', 'LZ4'
+
+        file "basic_data.csv.lz4"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // Only "compress_type" set; the format defaults to CSV and the loads
+    // should still succeed.
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'GZ'
+
+        file "basic_data.csv.gz"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'BZ2'
+
+        file "basic_data.csv.bz2"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'compress_type', 'LZ4'
+
+        file "basic_data.csv.lz4"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Success", json.Status)
+            assertEquals(20, json.NumberTotalRows)
+            assertEquals(20, json.NumberLoadedRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    // No "compress_type": the compressed bytes cannot be parsed as CSV, so
+    // every row is filtered and each load is expected to fail.
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+
+        file "basic_data.csv.gz"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(13, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(13, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+
+        file "basic_data.csv.bz2"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(9, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(9, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+        set 'format', "CSV"
+
+        file "basic_data.csv.lz4"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(31, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(31, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+
+        file "basic_data.csv.gz"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(13, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(13, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+
+        file "basic_data.csv.bz2"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(9, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(9, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '|'
+        set 'trim_double_quotes', 'true'
+
+        file "basic_data.csv.lz4"
+
+        check { result, exception, startTime, endTime ->
+            assertTrue(exception == null)
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("Fail", json.Status)
+            assertTrue(json.Message.contains("too many filtered rows"))
+            assertEquals(31, json.NumberTotalRows)
+            assertEquals(0, json.NumberLoadedRows)
+            assertEquals(31, json.NumberFilteredRows)
+            assertTrue(json.LoadBytes > 0)
+        }
+    }
+
+    qt_sql """ select count(*) from ${tableName} """
+}
\ No newline at end of file