[enhancement](test) add tpch_sf10 cases to p2 (#12698)

This commit is contained in:
Yongqiang YANG
2022-09-27 17:12:37 +08:00
committed by GitHub
parent 64988cb3d4
commit eba71cf5da
52 changed files with 759 additions and 0 deletions

View File

@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS customer (
c_custkey int NOT NULL,
c_name VARCHAR(25) NOT NULL,
c_address VARCHAR(40) NOT NULL,
c_nationkey int NOT NULL,
c_phone VARCHAR(15) NOT NULL,
c_acctbal decimal(15, 2) NOT NULL,
c_mktsegment VARCHAR(10) NOT NULL,
c_comment VARCHAR(117) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`c_custkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM customer where C_CUSTKEY >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/customer.tbl")
INTO TABLE customer
COLUMNS TERMINATED BY "|"
(c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/customer.tbl")
INTO TABLE customer
COLUMNS TERMINATED BY "|"
(c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment, temp)
ORDER BY c_custkey
)

View File

@ -0,0 +1 @@
DELETE FROM customer where C_CUSTKEY > 750000;

View File

@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS customer (
c_custkey int NOT NULL,
c_name VARCHAR(25) NOT NULL,
c_address VARCHAR(40) NOT NULL,
c_nationkey int NOT NULL,
c_phone VARCHAR(15) NOT NULL,
c_acctbal decimal(15, 2) NOT NULL,
c_mktsegment VARCHAR(10) NOT NULL,
c_comment VARCHAR(117) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`c_custkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 24
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
)

View File

@ -0,0 +1,25 @@
CREATE TABLE IF NOT EXISTS lineitem (
l_shipdate DATE NOT NULL,
l_orderkey bigint NOT NULL,
l_linenumber int not null,
l_partkey int NOT NULL,
l_suppkey int not null,
l_quantity decimal(15, 2) NOT NULL,
l_extendedprice decimal(15, 2) NOT NULL,
l_discount decimal(15, 2) NOT NULL,
l_tax decimal(15, 2) NOT NULL,
l_returnflag VARCHAR(1) NOT NULL,
l_linestatus VARCHAR(1) NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct VARCHAR(25) NOT NULL,
l_shipmode VARCHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`l_shipdate`, `l_orderkey`,`l_linenumber`,`l_partkey`,`l_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM lineitem where l_linenumber >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/lineitem.tbl.*")
INTO TABLE lineitem
COLUMNS TERMINATED BY "|"
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/lineitem.tbl.*")
INTO TABLE lineitem
COLUMNS TERMINATED BY "|"
(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,temp)
ORDER BY l_orderkey
)

View File

@ -0,0 +1 @@
DELETE FROM lineitem where l_orderkey >= 24000001 and l_orderkey <= 36000000;

View File

@ -0,0 +1,26 @@
CREATE TABLE IF NOT EXISTS lineitem (
l_shipdate DATE NOT NULL,
l_orderkey bigint NOT NULL,
l_linenumber int not null,
l_partkey int NOT NULL,
l_suppkey int not null,
l_quantity decimal(15, 2) NOT NULL,
l_extendedprice decimal(15, 2) NOT NULL,
l_discount decimal(15, 2) NOT NULL,
l_tax decimal(15, 2) NOT NULL,
l_returnflag VARCHAR(1) NOT NULL,
l_linestatus VARCHAR(1) NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct VARCHAR(25) NOT NULL,
l_shipmode VARCHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`l_shipdate`, `l_orderkey`,`l_linenumber`,`l_partkey`,`l_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"function_column.sequence_type" = 'DATE',
"replication_num" = "3"
)

View File

@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS nation (
`n_nationkey` int(11) NOT NULL,
`n_name` varchar(25) NOT NULL,
`n_regionkey` int(11) NOT NULL,
`n_comment` varchar(152) NULL
) ENGINE=OLAP
UNIQUE KEY(`N_NATIONKEY`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`N_NATIONKEY`) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
);

View File

@ -0,0 +1 @@
DELETE FROM nation where n_nationkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/nation.tbl")
INTO TABLE nation
COLUMNS TERMINATED BY "|"
(n_nationkey, n_name, n_regionkey, n_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/nation.tbl")
INTO TABLE nation
COLUMNS TERMINATED BY "|"
(n_nationkey, n_name, n_regionkey, n_comment, temp)
ORDER BY n_nationkey
)

View File

@ -0,0 +1 @@
DELETE FROM nation where n_nationkey < 10;

View File

@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS nation (
`n_nationkey` int(11) NOT NULL,
`n_name` varchar(25) NOT NULL,
`n_regionkey` int(11) NOT NULL,
`n_comment` varchar(152) NULL
) ENGINE=OLAP
UNIQUE KEY(`N_NATIONKEY`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`N_NATIONKEY`) BUCKETS 1
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
);

View File

@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS orders (
o_orderkey bigint NOT NULL,
o_orderdate DATE NOT NULL,
o_custkey int NOT NULL,
o_orderstatus VARCHAR(1) NOT NULL,
o_totalprice decimal(15, 2) NOT NULL,
o_orderpriority VARCHAR(15) NOT NULL,
o_clerk VARCHAR(15) NOT NULL,
o_shippriority int NOT NULL,
o_comment VARCHAR(79) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`o_orderkey`, `o_orderdate`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM orders where o_orderkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/orders.tbl.*")
INTO TABLE orders
COLUMNS TERMINATED BY "|"
(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/orders.tbl.*")
INTO TABLE orders
COLUMNS TERMINATED BY "|"
(o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, temp)
ORDER BY o_orderkey
)

View File

@ -0,0 +1 @@
DELETE FROM orders where o_orderkey >= 24000001 and o_orderkey <= 36000000;

View File

@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS orders (
o_orderkey bigint NOT NULL,
o_orderdate DATE NOT NULL,
o_custkey int NOT NULL,
o_orderstatus VARCHAR(1) NOT NULL,
o_totalprice decimal(15, 2) NOT NULL,
o_orderpriority VARCHAR(15) NOT NULL,
o_clerk VARCHAR(15) NOT NULL,
o_shippriority int NOT NULL,
o_comment VARCHAR(79) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`o_orderkey`, `o_orderdate`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96
PROPERTIES (
"function_column.sequence_type" = 'bigint',
"replication_num" = "3"
)

View File

@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS part (
p_partkey int NOT NULL,
p_name VARCHAR(55) NOT NULL,
p_mfgr VARCHAR(25) NOT NULL,
p_brand VARCHAR(10) NOT NULL,
p_type VARCHAR(25) NOT NULL,
p_size int NOT NULL,
p_container VARCHAR(10) NOT NULL,
p_retailprice decimal(15, 2) NOT NULL,
p_comment VARCHAR(23) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 24
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM part where p_partkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/part.tbl")
INTO TABLE part
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/part.tbl")
INTO TABLE part
COLUMNS TERMINATED BY "|"
(p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, temp)
ORDER BY p_partkey
)

View File

@ -0,0 +1 @@
DELETE FROM part where p_partkey > 1000000;

View File

@ -0,0 +1,19 @@
CREATE TABLE IF NOT EXISTS part (
p_partkey int NOT NULL,
p_name VARCHAR(55) NOT NULL,
p_mfgr VARCHAR(25) NOT NULL,
p_brand VARCHAR(10) NOT NULL,
p_type VARCHAR(25) NOT NULL,
p_size int NOT NULL,
p_container VARCHAR(10) NOT NULL,
p_retailprice decimal(15, 2) NOT NULL,
p_comment VARCHAR(23) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`p_partkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`p_partkey`) BUCKETS 24
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
)

View File

@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS partsupp (
ps_partkey int NOT NULL,
ps_suppkey int NOT NULL,
ps_availqty int NOT NULL,
ps_supplycost decimal(15, 2) NOT NULL,
ps_comment VARCHAR(199) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`ps_partkey`,`ps_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`ps_partkey`) BUCKETS 24
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM partsupp where ps_partkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/partsupp.tbl.*")
INTO TABLE partsupp
COLUMNS TERMINATED BY "|"
(ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/partsupp.tbl.*")
INTO TABLE partsupp
COLUMNS TERMINATED BY "|"
(ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment, temp)
ORDER BY ps_partkey
)

View File

@ -0,0 +1 @@
DELETE FROM partsupp where ps_partkey >= 800001 and ps_partkey <= 1200000;

View File

@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS partsupp (
ps_partkey int NOT NULL,
ps_suppkey int NOT NULL,
ps_availqty int NOT NULL,
ps_supplycost decimal(15, 2) NOT NULL,
ps_comment VARCHAR(199) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`ps_partkey`,`ps_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`ps_partkey`) BUCKETS 24
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
)

View File

@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS region (
r_regionkey int NOT NULL,
r_name VARCHAR(25) NOT NULL,
r_comment VARCHAR(152)
)ENGINE=OLAP
UNIQUE KEY(`r_regionkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM region where r_regionkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/region.tbl")
INTO TABLE region
COLUMNS TERMINATED BY "|"
(r_regionkey, r_name, r_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/region.tbl")
INTO TABLE region
COLUMNS TERMINATED BY "|"
(r_regionkey, r_name, r_comment, temp)
ORDER BY r_regionkey
)

View File

@ -0,0 +1 @@
DELETE FROM region where r_regionkey >= 3;

View File

@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS region (
r_regionkey int NOT NULL,
r_name VARCHAR(25) NOT NULL,
r_comment VARCHAR(152)
)ENGINE=OLAP
UNIQUE KEY(`r_regionkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`r_regionkey`) BUCKETS 1
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
)

View File

@ -0,0 +1,15 @@
CREATE TABLE IF NOT EXISTS supplier (
s_suppkey int NOT NULL,
s_name VARCHAR(25) NOT NULL,
s_address VARCHAR(40) NOT NULL,
s_nationkey int NOT NULL,
s_phone VARCHAR(15) NOT NULL,
s_acctbal decimal(15, 2) NOT NULL,
s_comment VARCHAR(101) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`s_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`s_suppkey`) BUCKETS 12
PROPERTIES (
"replication_num" = "3"
)

View File

@ -0,0 +1 @@
DELETE FROM supplier where s_suppkey >= 0;

View File

@ -0,0 +1,6 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/supplier.tbl")
INTO TABLE supplier
COLUMNS TERMINATED BY "|"
(s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, temp)
)

View File

@ -0,0 +1,7 @@
LOAD LABEL ${loadLabel} (
DATA INFILE("s3://${s3BucketName}/regression/tpch/sf10/supplier.tbl")
INTO TABLE supplier
COLUMNS TERMINATED BY "|"
(s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment, temp)
ORDER BY s_suppkey
)

View File

@ -0,0 +1 @@
DELETE FROM supplier where s_suppkey > 50000;

View File

@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS supplier (
s_suppkey int NOT NULL,
s_name VARCHAR(25) NOT NULL,
s_address VARCHAR(40) NOT NULL,
s_nationkey int NOT NULL,
s_phone VARCHAR(15) NOT NULL,
s_acctbal decimal(15, 2) NOT NULL,
s_comment VARCHAR(101) NOT NULL
)ENGINE=OLAP
UNIQUE KEY(`s_suppkey`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`s_suppkey`) BUCKETS 12
PROPERTIES (
"function_column.sequence_type" = 'int',
"replication_num" = "3"
)

View File

@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("load_four_step") {
// Import multiple times, use unique key, use seq and delete, select some to delete, and then import
// Map[tableName, rowCount]
def tables = [customer: [1500000,750000], lineitem: [59986052,47982508], nation: [25,15], orders: [15000000,12000000], part: [2000000,1000000], partsupp: [8000000,6400000], region: [5,3], supplier: [100000,50000]]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()
// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
def uniqueID1 = Math.abs(UUID.randomUUID().hashCode()).toString()
def uniqueID2 = Math.abs(UUID.randomUUID().hashCode()).toString()
def uniqueID3 = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { table, rows ->
// create table if not exists
try{
sql new File("""${context.file.parent}/ddl/${table}_sequence.sql""").text
def loadLabel1 = table + "_" + uniqueID1
// load data from cos
def loadSql1 = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql1 = loadSql1.replaceAll("\\\$\\{loadLabel\\}", loadLabel1) + s3WithProperties
sql loadSql1
// check load state
while (true) {
def stateResult1 = sql "show load where Label = '${loadLabel1}'"
def loadState1 = stateResult1[stateResult1.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState1)) {
throw new IllegalStateException("load ${loadLabel1} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState1)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[0])
}
def loadLabel2 = table + "_" + uniqueID2
def loadSql2 = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql2 = loadSql2.replaceAll("\\\$\\{loadLabel\\}", loadLabel2) + s3WithProperties
sql loadSql2
while(true){
def stateResult2 = sql "show load where Label = '${loadLabel2}'"
def loadState2 = stateResult2[stateResult2.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState2)) {
throw new IllegalStateException("load ${loadLabel2} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState2)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[0])
}
break;
}
sleep(5000)
}
sql new File("""${context.file.parent}/ddl/${table}_part_delete.sql""").text
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[1])
}
def loadLabel3 = table + "_" + uniqueID3
def loadSql3 = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql3 = loadSql3.replaceAll("\\\$\\{loadLabel\\}", loadLabel3) + s3WithProperties
sql loadSql3
while(true){
def stateResult3 = sql "show load where Label = '${loadLabel3}'"
def loadState3 = stateResult3[stateResult3.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState3)) {
throw new IllegalStateException("load ${loadLabel3} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState3)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[0])
}
break;
}
sleep(5000)
}
break
}
sleep(5000)
}
}
finally {
try_sql("DROP TABLE IF EXISTS ${table}")
}
}
}

View File

@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("load_one_step") {
// Import once, use unique key, do not use seq and delete
// Map[tableName, rowCount]
def tables = [customer: 1500000, lineitem: 59986052, nation: 25, orders: 15000000, part: 2000000, partsupp: 8000000, region: 5, supplier: 100000]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()
// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { table, rows ->
// create table if not exists
try{
sql new File("""${context.file.parent}/ddl/${table}.sql""").text
def loadLabel = table + "_" + uniqueID
// load data from cos
def loadSql = new File("""${context.file.parent}/ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows)
}
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == 0)
}
break
}
sleep(5000)
}
}
finally {
try_sql("DROP TABLE IF EXISTS ${table}")
}
}
}

View File

@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("load_three_step") {
// Import multiple times, use unique key, use seq and delete
// Map[tableName, rowCount]
def tables = [customer: 1500000, lineitem: 59986052, nation: 25, orders: 15000000, part: 2000000, partsupp: 8000000, region: 5, supplier: 100000]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()
// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
def uniqueID1 = Math.abs(UUID.randomUUID().hashCode()).toString()
def uniqueID2 = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { table, rows ->
// create table if not exists
try{
sql new File("""${context.file.parent}/ddl/${table}_sequence.sql""").text
def loadLabel1 = table + "_" + uniqueID1
// load data from cos
def loadSql1 = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql1 = loadSql1.replaceAll("\\\$\\{loadLabel\\}", loadLabel1) + s3WithProperties
sql loadSql1
// check load state
while (true) {
def stateResult1 = sql "show load where Label = '${loadLabel1}'"
def loadState1 = stateResult1[stateResult1.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState1)) {
throw new IllegalStateException("load ${loadLabel1} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState1)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows)
}
def loadLabel2 = table + "_" + uniqueID2
def loadSql2 = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql2 = loadSql2.replaceAll("\\\$\\{loadLabel\\}", loadLabel2) + s3WithProperties
sql loadSql2
while(true){
def stateResult2 = sql "show load where Label = '${loadLabel2}'"
def loadState2 = stateResult2[stateResult2.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState2)) {
throw new IllegalStateException("load ${loadLabel2} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState2)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows)
}
break;
}
sleep(5000)
}
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
for (int i = 1; i <= 5; i++) {
sql 'sync'
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == 0)
}
break
}
sleep(5000)
}
}
finally {
try_sql("DROP TABLE IF EXISTS ${table}")
}
}
}

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("load_two_step") {
// Import once, use unique key, use seq and delete
// Map[tableName, rowCount]
def tables = [customer: 1500000, lineitem: 59986052, nation: 25, orders: 15000000, part: 2000000, partsupp: 8000000, region: 5, supplier: 100000]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
|"AWS_ACCESS_KEY" = "${getS3AK()}",
|"AWS_SECRET_KEY" = "${getS3SK()}",
|"AWS_ENDPOINT" = "${getS3Endpoint()}",
|"AWS_REGION" = "${getS3Region()}")
|PROPERTIES(
|"exec_mem_limit" = "8589934592",
|"load_parallelism" = "3")""".stripMargin()
// set fe configuration
sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')"
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
tables.each { table, rows ->
// create table if not exists
try{
sql new File("""${context.file.parent}/ddl/${table}_sequence.sql""").text
def loadLabel = table + "_" + uniqueID
// load data from cos
def loadSql = new File("""${context.file.parent}/ddl/${table}_load_sequence.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties
sql loadSql
// check load state
while (true) {
def stateResult = sql "show load where Label = '${loadLabel}'"
def loadState = stateResult[stateResult.size() - 1][2].toString()
if ("CANCELLED".equalsIgnoreCase(loadState)) {
throw new IllegalStateException("load ${loadLabel} failed.")
} else if ("FINISHED".equalsIgnoreCase(loadState)) {
sql 'sync'
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows)
}
sql new File("""${context.file.parent}/ddl/${table}_delete.sql""").text
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${table}"
logger.info("select ${table} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == 0)
}
break
}
sleep(5000)
}
}
finally {
try_sql("DROP TABLE IF EXISTS ${table}")
}
}
}