[Enhancement](Load) stream tvf support csv header (#23797)
Co-authored-by: yiguolei <676222867@qq.com>
This commit is contained in:
@ -226,14 +226,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
|
||||
throw new AnalysisException("format:" + formatString + " is not supported.");
|
||||
}
|
||||
|
||||
// TODO Support is needed in the future
|
||||
if (getTFileType() == TFileType.FILE_STREAM && (formatString.equals("csv_with_names")
|
||||
|| formatString.equals("csv_with_names_and_types")
|
||||
|| formatString.equals("parquet")
|
||||
if (getTFileType() == TFileType.FILE_STREAM && (formatString.equals("parquet")
|
||||
|| formatString.equals("avro")
|
||||
|| formatString.equals("orc"))) {
|
||||
throw new AnalysisException("current http_stream does not yet support csv_with_names, "
|
||||
+ "csv_with_names_and_types, parquet, avro and orc");
|
||||
throw new AnalysisException("current http_stream does not yet support parquet, avro and orc");
|
||||
}
|
||||
columnSeparator = validParams.getOrDefault(COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR);
|
||||
lineDelimiter = validParams.getOrDefault(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
|
||||
|
||||
@ -0,0 +1,11 @@
|
||||
id,name,age
|
||||
1,alice,18
|
||||
2,bob,20
|
||||
3,jack,24
|
||||
4,jackson,19
|
||||
5,liming,18
|
||||
6,luffy,20
|
||||
7,zoro,22
|
||||
8,sanzi,26
|
||||
9,wusuopu,21
|
||||
10,nami,18
|
||||
|
@ -0,0 +1,12 @@
|
||||
id,name,age
|
||||
INT,STRING,INT
|
||||
1,alice,18
|
||||
2,bob,20
|
||||
3,jack,24
|
||||
4,jackson,19
|
||||
5,liming,18
|
||||
6,luffy,20
|
||||
7,zoro,22
|
||||
8,sanzi,26
|
||||
9,wusuopu,21
|
||||
10,nami,18
|
||||
|
@ -0,0 +1,13 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql1 --
|
||||
1 alice 18
|
||||
2 bob 20
|
||||
3 jack 24
|
||||
4 jackson 19
|
||||
5 liming 18
|
||||
6 luffy 20
|
||||
7 zoro 22
|
||||
8 sanzi 26
|
||||
9 wusuopu 21
|
||||
10 nami 18
|
||||
|
||||
@ -0,0 +1,13 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql1 --
|
||||
1 alice 18
|
||||
2 bob 20
|
||||
3 jack 24
|
||||
4 jackson 19
|
||||
5 liming 18
|
||||
6 luffy 20
|
||||
7 zoro 22
|
||||
8 sanzi 26
|
||||
9 wusuopu 21
|
||||
10 nami 18
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
suite("test_http_stream_csv_with_names", "p0") {
|
||||
|
||||
// 1. test csv_with_names
|
||||
def tableName1 = "test_http_stream_csv_with_names"
|
||||
def db = "regression_test_load_p0_http_stream"
|
||||
try {
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName1} (
|
||||
id int,
|
||||
name VARCHAR(100),
|
||||
age int
|
||||
)
|
||||
DISTRIBUTED BY HASH(id) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
)
|
||||
"""
|
||||
|
||||
streamLoad {
|
||||
set 'version', '1'
|
||||
set 'sql', """
|
||||
insert into ${db}.${tableName1} (id, name, age) select cast(id as INT) as id, name, age from http_stream(
|
||||
"format"="csv_with_names"
|
||||
)
|
||||
"""
|
||||
time 10000
|
||||
file 'student_with_names.csv'
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("http_stream result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(10, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
}
|
||||
|
||||
qt_sql1 "select id, name, age from ${tableName1} order by id"
|
||||
} finally {
|
||||
try_sql "DROP TABLE IF EXISTS ${tableName1}"
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
suite("test_http_stream_csv_with_names_and_types", "p0") {
|
||||
|
||||
// 1. test csv_with_names_and_types
|
||||
def tableName1 = "test_http_stream_csv_with_names_and_types"
|
||||
def db = "regression_test_load_p0_http_stream"
|
||||
try {
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName1} (
|
||||
id int,
|
||||
name VARCHAR(100),
|
||||
age int
|
||||
)
|
||||
DISTRIBUTED BY HASH(id) BUCKETS 1
|
||||
PROPERTIES (
|
||||
"replication_num" = "1"
|
||||
)
|
||||
"""
|
||||
|
||||
streamLoad {
|
||||
set 'version', '1'
|
||||
set 'sql', """
|
||||
insert into ${db}.${tableName1} (id, name, age) select cast(id as INT) as id, name, age from http_stream(
|
||||
"format"="csv_with_names_and_types"
|
||||
)
|
||||
"""
|
||||
time 10000
|
||||
file 'student_with_names_and_types.csv'
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("http_stream result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(10, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
}
|
||||
|
||||
qt_sql1 "select id, name, age from ${tableName1} order by id"
|
||||
} finally {
|
||||
try_sql "DROP TABLE IF EXISTS ${tableName1}"
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user