pick (#38402)
@@ -68,6 +68,8 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .add(CreateRoutineLoadStmt.WORKLOAD_GROUP)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -250,6 +252,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
                     .getWorkloadGroup(ConnectContext.get().getCurrentUserIdentity(), workloadGroup);
             analyzedJobProperties.put(CreateRoutineLoadStmt.WORKLOAD_GROUP, String.valueOf(wgId));
         }
+        if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, jobProperties.get(LoadStmt.KEY_ENCLOSE));
+        }
+        if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) {
+            analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, jobProperties.get(LoadStmt.KEY_ESCAPE));
+        }
     }
 
     private void checkDataSourceProperties() throws UserException {
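Note that this ALTER path copies the enclose/escape values into analyzedJobProperties verbatim; the single-character validation done by the CREATE path (see the -507 hunk below) does not appear in this hunk. A minimal, self-contained sketch of the copy-if-present pattern used here, with plain java.util.Map standing in for the statement's property maps (all names are placeholders, not Doris APIs):

import java.util.HashMap;
import java.util.Map;

// Sketch of the copy-if-present pattern from the hunk above (placeholder names).
public class CopyIfPresentDemo {
    public static void main(String[] args) {
        Map<String, String> jobProperties = new HashMap<>();
        jobProperties.put("enclose", "e"); // "escape" left unset on purpose

        Map<String, String> analyzedJobProperties = new HashMap<>();
        for (String key : new String[] {"enclose", "escape"}) {
            if (jobProperties.containsKey(key)) {
                analyzedJobProperties.put(key, jobProperties.get(key));
            }
        }
        System.out.println(analyzedJobProperties); // {enclose=e}
    }
}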
@@ -141,6 +141,8 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(LOAD_TO_SINGLE_TABLET)
             .add(PARTIAL_COLUMNS)
             .add(WORKLOAD_GROUP)
+            .add(LoadStmt.KEY_ENCLOSE)
+            .add(LoadStmt.KEY_ESCAPE)
             .build();
 
     private final LabelName labelName;
@@ -178,9 +180,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private boolean numAsString = false;
     private boolean fuzzyParse = false;
 
-    private String enclose;
+    private byte enclose;
 
-    private String escape;
+    private byte escape;
 
     private long workloadGroupId = -1;
 
@@ -311,11 +313,11 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return jsonPaths;
     }
 
-    public String getEnclose() {
+    public byte getEnclose() {
         return enclose;
     }
 
-    public String getEscape() {
+    public byte getEscape() {
         return escape;
     }
 
@@ -507,14 +509,24 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         loadToSingleTablet = Util.getBooleanPropertyOrDefault(jobProperties.get(LoadStmt.LOAD_TO_SINGLE_TABLET),
                 RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
                 LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");
-        enclose = jobProperties.get(LoadStmt.KEY_ENCLOSE);
-        if (enclose != null && enclose.length() != 1) {
-            throw new AnalysisException("enclose must be single-char");
-        }
+        String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
+        if (encloseStr != null) {
+            if (encloseStr.length() != 1) {
+                throw new AnalysisException("enclose must be single-char");
+            } else {
+                enclose = encloseStr.getBytes()[0];
+            }
+        }
-        escape = jobProperties.get(LoadStmt.KEY_ESCAPE);
-        if (escape != null && escape.length() != 1) {
-            throw new AnalysisException("escape must be single-char");
-        }
+        String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
+        if (escapeStr != null) {
+            if (escapeStr.length() != 1) {
+                throw new AnalysisException("escape must be single-char");
+            } else {
+                escape = escapeStr.getBytes()[0];
+            }
+        }
 
         String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
         if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
             this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
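The CREATE path above checks the length before narrowing the property to a byte with getBytes()[0]. A standalone sketch of that conversion follows (a simplification for illustration, not Doris code; names and the exception type are placeholders). One caveat worth recording: getBytes() with no argument encodes with the platform default charset, so a single char that encodes to more than one byte (for example "é") passes the length check yet contributes only its first byte, which is why plain ASCII values are the safe choice for enclose and escape.

// Sketch of the single-char check plus byte narrowing (placeholder names).
public class SingleCharProperty {
    static byte toSingleByte(String value, String name) {
        if (value.length() != 1) {
            throw new IllegalArgumentException(name + " must be single-char");
        }
        // Mirrors encloseStr.getBytes()[0]: only the first encoded byte survives.
        return value.getBytes()[0];
    }

    public static void main(String[] args) {
        System.out.println(toSingleByte("e", "enclose")); // 101
        System.out.println(toSingleByte("f", "escape"));  // 102
    }
}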
@@ -390,11 +390,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
         } else {
             jobProperties.put(PROPS_FUZZY_PARSE, "false");
         }
-        if (stmt.getEnclose() != null) {
-            jobProperties.put(LoadStmt.KEY_ENCLOSE, stmt.getEnclose());
+        if (String.valueOf(stmt.getEnclose()) != null) {
+            this.enclose = stmt.getEnclose();
+            jobProperties.put(LoadStmt.KEY_ENCLOSE, String.valueOf(stmt.getEnclose()));
         }
-        if (stmt.getEscape() != null) {
-            jobProperties.put(LoadStmt.KEY_ESCAPE, stmt.getEscape());
+        if (String.valueOf(stmt.getEscape()) != null) {
+            this.escape = stmt.getEscape();
+            jobProperties.put(LoadStmt.KEY_ESCAPE, String.valueOf(stmt.getEscape()));
         }
         if (stmt.getWorkloadGroupId() > 0) {
             jobProperties.put(WORKLOAD_GROUP, String.valueOf(stmt.getWorkloadGroupId()));
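A behavioral note on the guards in this hunk: after the String-to-byte change, stmt.getEnclose() returns a primitive byte, and String.valueOf applied to a primitive never returns null, so both conditions are always true and an unset enclose/escape is recorded as "0" (the string form of the zero byte). A two-line demonstration of the underlying Java semantics:

// Why the guards above always pass: String.valueOf on a primitive byte is never null.
public class ValueOfByteDemo {
    public static void main(String[] args) {
        byte unset = 0; // field default when no enclose/escape was specified
        System.out.println(String.valueOf(unset) != null); // always true
        System.out.println(String.valueOf(unset));         // prints "0"
    }
}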
@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql_enclose_and_escape --
1	ab,ced	2023-07-15	de	2023-07-20T05:48:31	"ghi"

-- !sql_enclose_and_escape_resume --
1	ab,ced	2023-07-15	de	2023-07-20T05:48:31	"ghi"

-- !sql_enclose_and_escape_multi_table --
1	ab,ced	2023-07-15	de	2023-07-20T05:48:31	"ghi"
@@ -0,0 +1 @@
1,eab,cfede,2023-07-15,def,2023-07-20:05:48:31,"ghi"

@@ -0,0 +1 @@
1,eab,gfegdg,2023-07-15,def,2023-07-20:05:48:31,"ghi"
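To connect these data files to the expected .out rows above: with "enclose" = "e" and "escape" = "f", the field eab,cfede becomes k2 = ab,ced (the enclosing e bytes are stripped, which lets the field keep its embedded comma, and each escape byte f is consumed so the character after it is taken literally), and def becomes v2 = de. A rough Java illustration of that decoding, reverse-engineered from the expected output rather than taken from Doris's CSV scanner:

// Rough illustration of enclose/escape field decoding, inferred from the
// expected .out rows above; not Doris's actual CSV scanner.
public class EncloseEscapeDemo {
    static String decodeField(String field, char enclose, char escape) {
        // Strip a matching enclose character from both ends.
        if (field.length() >= 2 && field.charAt(0) == enclose
                && field.charAt(field.length() - 1) == enclose) {
            field = field.substring(1, field.length() - 1);
        }
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < field.length(); i++) {
            char c = field.charAt(i);
            if (c == escape && i + 1 < field.length()) {
                out.append(field.charAt(++i)); // keep the escaped character literally
            } else if (c != escape) {
                out.append(c);                 // a trailing escape is dropped
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(decodeField("eab,cfede", 'e', 'f')); // ab,ced
        System.out.println(decodeField("def", 'e', 'f'));       // de
    }
}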
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.ProducerConfig

suite("test_routine_load_property", "p0") {
    // send data to Kafka
    def kafkaCsvTopics = [
        "test_enclose_and_escape0",
        "test_enclose_and_escape1",
    ]
    String enabled = context.config.otherConfigs.get("enableKafkaTest")
    String kafka_port = context.config.otherConfigs.get("kafka_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    def kafka_broker = "${externalEnvIp}:${kafka_port}"
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // configure the kafka producer
        def props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // create kafka producer
        def producer = new KafkaProducer<>(props)

        for (String kafkaCsvTopic in kafkaCsvTopics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()
            lines.each { line ->
                logger.info("=====${line}========")
                def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
                producer.send(record)
            }
        }
    }

    // test create routine load job with enclose and escape
    def tableName = "test_routine_load_with_enclose_and_escape"
    sql """ DROP TABLE IF EXISTS ${tableName} """
    sql """
        CREATE TABLE IF NOT EXISTS ${tableName} (
            `k1` int(20) NULL,
            `k2` string NULL,
            `v1` date NULL,
            `v2` string NULL,
            `v3` datetime NULL,
            `v4` string NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
    """

    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        def jobName = "test_enclose_and_escape"
        try {
            sql """
                CREATE ROUTINE LOAD ${jobName} on ${tableName}
                COLUMNS TERMINATED BY ","
                PROPERTIES
                (
                    "max_batch_interval" = "5",
                    "max_batch_rows" = "300000",
                    "enclose" = "e",
                    "escape" = "f",
                    "max_batch_size" = "209715200"
                )
                FROM KAFKA
                (
                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                    "kafka_topic" = "${kafkaCsvTopics[0]}",
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                );
            """
            sql "sync"

            def count = 0
            while (true) {
                def res = sql "select count(*) from ${tableName}"
                def state = sql "show routine load for ${jobName}"
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("routine load statistic: ${state[0][14].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (res[0][0] > 0) {
                    break
                }
                if (count >= 120) {
                    log.error("routine load result is not visible after a long wait")
                    assertEquals(20, res[0][0])
                    break
                }
                sleep(1000)
                count++
            }
            qt_sql_enclose_and_escape "select * from ${tableName} order by k1"

            sql "pause routine load for ${jobName}"
            def res = sql "show routine load for ${jobName}"
            log.info("routine load job properties: ${res[0][11].toString()}".toString())
            sql "ALTER ROUTINE LOAD FOR ${jobName} PROPERTIES(\"enclose\" = \"g\");"
            sql "resume routine load for ${jobName}"
            count = 0
            while (true) {
                res = sql "select count(*) from ${tableName}"
                def state = sql "show routine load for ${jobName}"
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("routine load statistic: ${state[0][14].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (res[0][0] > 0) {
                    break
                }
                if (count >= 120) {
                    log.error("routine load result is not visible after a long wait")
                    assertEquals(20, res[0][0])
                    break
                }
                sleep(1000)
                count++
            }
            qt_sql_enclose_and_escape_resume "select * from ${tableName} order by k1"
        } finally {
            sql "stop routine load for ${jobName}"
        }

        try {
            sql """
                CREATE ROUTINE LOAD ${jobName}
                COLUMNS TERMINATED BY ","
                PROPERTIES
                (
                    "max_batch_interval" = "5",
                    "max_batch_rows" = "300000",
                    "max_batch_size" = "209715200",
                    "enclose" = "e",
                    "escape" = "f"
                )
                FROM KAFKA
                (
                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
                    "kafka_topic" = "${kafkaCsvTopics[0]}",
                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                );
            """
            sql "sync"

            def count = 0
            while (true) {
                def res = sql "select count(*) from ${tableName}"
                def state = sql "show routine load for ${jobName}"
                log.info("routine load state: ${state[0][8].toString()}".toString())
                log.info("routine load statistic: ${state[0][14].toString()}".toString())
                log.info("reason of state changed: ${state[0][17].toString()}".toString())
                if (res[0][0] > 0) {
                    break
                }
                if (count >= 120) {
                    log.error("routine load result is not visible after a long wait")
                    assertEquals(20, res[0][0])
                    break
                }
                sleep(1000)
                count++
            }

            qt_sql_enclose_and_escape_multi_table "select * from ${tableName} order by k1"
        } finally {
            sql "stop routine load for ${jobName}"
        }
    }
}
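A closing note on the regression test: all three wait loops use the same poll-until-visible idiom, sampling the row count once per second and giving up after 120 attempts, at which point assertEquals(20, res[0][0]) fails deliberately because the observed count is still 0. A generic Java sketch of that idiom, with a toy supplier standing in for the SQL count query (all names hypothetical):

import java.util.function.LongSupplier;

// Generic form of the test's poll-until-visible loop (hypothetical names).
public class PollUntilVisible {
    static long waitForRows(LongSupplier rowCount, int maxAttempts) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            long n = rowCount.getAsLong();
            if (n > 0) {
                return n; // first nonzero count wins
            }
            Thread.sleep(1000); // one-second poll interval, as in the test
        }
        throw new AssertionError("rows not visible after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Toy supplier: the "rows" become visible after roughly three seconds.
        System.out.println(waitForRows(
                () -> System.currentTimeMillis() - start > 3000 ? 20L : 0L, 120));
    }
}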