[fix](autoinc) Fix broker load when target table has autoinc column (#28402)
This commit is contained in:
@ -0,0 +1,9 @@
|
||||
Bob, 100
|
||||
Alice, 200
|
||||
Tom, 300
|
||||
Test, 400
|
||||
Carter, 500
|
||||
Smith, 600
|
||||
Beata, 700
|
||||
Doris, 800
|
||||
Nereids, 900
|
||||
|
@ -0,0 +1,9 @@
|
||||
null, Bob, 100
|
||||
null, Alice, 200
|
||||
null, Tom, 300
|
||||
null, Test, 400
|
||||
4, Carter, 500
|
||||
5, Smith, 600
|
||||
6, Beata, 700
|
||||
7, Doris, 800
|
||||
8, Nereids, 900
|
||||
|
@ -154,6 +154,7 @@ public class LoadingTaskPlanner {
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setColumn(col);
|
||||
slotDesc.setIsNullable(col.isAllowNull());
|
||||
slotDesc.setAutoInc(col.isAutoInc());
|
||||
SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
|
||||
scanSlotDesc.setIsMaterialized(true);
|
||||
scanSlotDesc.setColumn(col);
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
0 Bob 100
|
||||
1 Alice 200
|
||||
2 Tom 300
|
||||
3 Test 400
|
||||
4 Carter 500
|
||||
5 Smith 600
|
||||
6 Beata 700
|
||||
7 Doris 800
|
||||
8 Nereids 900
|
||||
|
||||
-- !sql --
|
||||
0 Bob 123
|
||||
1 Alice 200
|
||||
2 Tom 323
|
||||
3 Test 400
|
||||
4 Carter 523
|
||||
5 Smith 600
|
||||
6 Beata 700
|
||||
7 Doris 800
|
||||
8 Nereids 900
|
||||
|
||||
-- !sql --
|
||||
0 Bob 100
|
||||
1 Alice 200
|
||||
2 Tom 300
|
||||
3 Test 400
|
||||
4 Carter 500
|
||||
5 Smith 600
|
||||
6 Beata 700
|
||||
7 Doris 800
|
||||
8 Nereids 900
|
||||
|
||||
-- !sql --
|
||||
0 Bob 123
|
||||
1 Alice 200
|
||||
2 Tom 323
|
||||
3 Test 400
|
||||
4 Carter 523
|
||||
5 Smith 600
|
||||
6 Beata 700
|
||||
7 Doris 800
|
||||
8 Nereids 900
|
||||
|
||||
@ -0,0 +1,113 @@
|
||||
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_autoinc_broker_load", "p0,external,hive,external_docker,external_docker_hive") {
|
||||
|
||||
String enabled = context.config.otherConfigs.get("enableHiveTest")
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
brokerName = getBrokerName()
|
||||
hdfsUser = getHdfsUser()
|
||||
hdfsPasswd = getHdfsPasswd()
|
||||
hdfs_port = context.config.otherConfigs.get("hdfs_port")
|
||||
externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
|
||||
|
||||
def test_dir = "user/doris/preinstalled_data/data_case/autoinc"
|
||||
|
||||
def load_from_hdfs = {columns, testTable, label, testFile, format, brokerName, hdfsUser, hdfsPasswd ->
|
||||
def result1= sql """ LOAD LABEL ${label} (
|
||||
DATA INFILE("hdfs://${externalEnvIp}:${hdfs_port}/${test_dir}/${testFile}")
|
||||
INTO TABLE ${testTable}
|
||||
COLUMNS TERMINATED BY ","
|
||||
(${columns})
|
||||
) with HDFS (
|
||||
"fs.defaultFS"="hdfs://${externalEnvIp}:${hdfs_port}",
|
||||
"hadoop.username"="${hdfsUser}")
|
||||
PROPERTIES (
|
||||
"timeout"="1200",
|
||||
"max_filter_ratio"="0");"""
|
||||
assertTrue(result1.size() == 1)
|
||||
assertTrue(result1[0].size() == 1)
|
||||
assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
|
||||
}
|
||||
|
||||
def wait_for_load_result = {checklabel, testTable ->
|
||||
max_try_milli_secs = 10000
|
||||
while(max_try_milli_secs) {
|
||||
result = sql "show load where label = '${checklabel}'"
|
||||
if(result[0][2] == "FINISHED") {
|
||||
break
|
||||
} else {
|
||||
sleep(1000) // wait 1 second every time
|
||||
max_try_milli_secs -= 1000
|
||||
if(max_try_milli_secs <= 0) {
|
||||
assertEquals(1, 2)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
table = "test_autoinc_broker_load"
|
||||
sql "drop table if exists ${table}"
|
||||
sql """ CREATE TABLE IF NOT EXISTS `${table}` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
|
||||
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
|
||||
`value` int(11) NOT NULL COMMENT "用户得分"
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT "OLAP"
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"in_memory" = "false",
|
||||
"storage_format" = "V2",
|
||||
"enable_unique_key_merge_on_write" = "true") """
|
||||
|
||||
def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
|
||||
load_from_hdfs("name, value", table, test_load_label, "auto_inc_basic.csv", "csv", brokerName, hdfsUser, hdfsPasswd)
|
||||
wait_for_load_result(test_load_label, table)
|
||||
qt_sql "select * from ${table};"
|
||||
sql """ insert into ${table} values(0, "Bob", 123), (2, "Tom", 323), (4, "Carter", 523);"""
|
||||
qt_sql "select * from ${table} order by id"
|
||||
sql "drop table if exists ${table};"
|
||||
|
||||
|
||||
table = "test_autoinc_broker_load2"
|
||||
sql "drop table if exists ${table}"
|
||||
sql """ CREATE TABLE IF NOT EXISTS `${table}` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
|
||||
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
|
||||
`value` int(11) NOT NULL COMMENT "用户得分"
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`id`)
|
||||
COMMENT "OLAP"
|
||||
DISTRIBUTED BY HASH(`id`) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"in_memory" = "false",
|
||||
"storage_format" = "V2",
|
||||
"enable_unique_key_merge_on_write" = "true");"""
|
||||
test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
|
||||
load_from_hdfs("id, name, value", table, test_load_label, "auto_inc_with_null.csv", "csv", brokerName, hdfsUser, hdfsPasswd)
|
||||
wait_for_load_result(test_load_label, table)
|
||||
sql "sync"
|
||||
qt_sql "select * from ${table};"
|
||||
sql """ insert into ${table} values(0, "Bob", 123), (2, "Tom", 323), (4, "Carter", 523);"""
|
||||
qt_sql "select * from ${table} order by id"
|
||||
sql "drop table if exists ${table};"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user