[feature](tvf)(jni-avro)jni-avro scanner add complex data types (#26236)
Support Avro's enum, record, and union data types.
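Editor's note: the mapping this change introduces is enum -> STRING, fixed -> BINARY, record -> STRUCT (one child per field), and union -> STRUCT over the union's non-null members. A minimal sketch of schemas that exercise the new paths, using only the stock Avro SchemaBuilder API (class and variable names are illustrative, mirroring the test below):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class ComplexTypeSketch {
    public static void main(String[] args) {
        // enum -> surfaced as STRING (the symbol name)
        Schema anEnum = SchemaBuilder.enumeration("myEnumType").symbols("A", "B", "C");
        // record -> surfaced as STRUCT with children a:INT, b:DOUBLE
        Schema aRecord = SchemaBuilder.record("simpleRecord").fields()
                .name("a").type().intType().noDefault()
                .name("b").type().doubleType().noDefault()
                .endRecord();
        // union ["null","string"] -> nullable members are filtered out,
        // so it surfaces as STRUCT<string:TEXT>
        Schema aUnion = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
        System.out.println(anEnum + "\n" + aRecord + "\n" + aUnion);
    }
}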
@@ -86,18 +86,10 @@ Status AvroJNIReader::init_fetch_table_reader(
             {"file_type", std::to_string(type)},
             {"is_get_table_schema", "false"},
             {"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}};
-    switch (type) {
-    case TFileType::FILE_HDFS:
-        required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value));
-        break;
-    case TFileType::FILE_S3:
-        required_param.insert(std::make_pair("uri", _range.path));
-        required_param.insert(_params.properties.begin(), _params.properties.end());
-        break;
-    default:
-        return Status::InternalError("unsupported file reader type: {}", std::to_string(type));
-    }
+    if (type == TFileType::FILE_S3) {
+        required_param.insert(_params.properties.begin(), _params.properties.end());
+    }
+    required_param.insert(std::make_pair("uri", _range.path));
     _jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
                                                     required_param, column_names);
     RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
@@ -144,8 +136,7 @@ Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names,
 }

 TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) {
-    ::doris::TPrimitiveType::type schema_type =
-            static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
+    auto schema_type = static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
     switch (schema_type) {
     case TPrimitiveType::INT:
     case TPrimitiveType::STRING:
@@ -153,30 +144,35 @@ TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& colu
     case TPrimitiveType::BOOLEAN:
     case TPrimitiveType::DOUBLE:
     case TPrimitiveType::FLOAT:
-        return TypeDescriptor(thrift_to_type(schema_type));
+    case TPrimitiveType::BINARY:
+        return {thrift_to_type(schema_type)};
     case TPrimitiveType::ARRAY: {
         TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY);
-        list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
+        list_type.add_sub_type(convert_to_doris_type(childColumns[0]));
         return list_type;
     }
     case TPrimitiveType::MAP: {
         TypeDescriptor map_type(PrimitiveType::TYPE_MAP);
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
         // The key of an Avro MAP is always STRING
         map_type.add_sub_type(PrimitiveType::TYPE_STRING);
-        map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+        map_type.add_sub_type(convert_to_doris_type(childColumns[1]));
         return map_type;
     }
-    default:
-        return TypeDescriptor(PrimitiveType::INVALID_TYPE);
+    case TPrimitiveType::STRUCT: {
+        TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT);
+        const rapidjson::Value& childColumns = column_schema["childColumns"];
+        for (auto i = 0; i < childColumns.Size(); i++) {
+            const rapidjson::Value& child = childColumns[i];
+            struct_type.add_sub_type(convert_to_doris_type(childColumns[i]),
+                                     std::string(child["name"].GetString()));
+        }
+        return struct_type;
+    }
+    default:
+        return {PrimitiveType::INVALID_TYPE};
     }
 }

-TypeDescriptor AvroJNIReader::convert_complex_type(
-        const rapidjson::Document::ConstObject child_schema) {
-    ::doris::TPrimitiveType::type child_schema_type =
-            static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt());
-    return TypeDescriptor(thrift_to_type(child_schema_type));
-}
-
 } // namespace doris::vectorized
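Editor's note: convert_to_doris_type() now walks a JSON tree whose nodes carry a thrift type id plus an ordered childColumns array; for STRUCT the BE reads each child's "name" and recurses on its type. A sketch of the producing side, built with the patched SchemaColumn API from this change (the scaffolding class is illustrative; the type ids 22/5/8 match the expected JSON in the unit test below):

import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TPrimitiveType;

import java.util.Arrays;

public class StructContractSketch {
    public static void main(String[] args) {
        // A STRUCT column carries one named child per record field.
        SchemaColumn rec = new SchemaColumn();
        rec.setName("aRecord");
        rec.setType(TPrimitiveType.STRUCT);

        SchemaColumn a = new SchemaColumn();
        a.setName("a");
        a.setType(TPrimitiveType.INT);
        SchemaColumn b = new SchemaColumn();
        b.setName("b");
        b.setType(TPrimitiveType.DOUBLE);
        rec.addChildColumns(Arrays.asList(a, b));
        // Serialized, this matches the shape the BE consumes:
        // {"name":"aRecord","type":22,"childColumns":[
        //   {"name":"a","type":5},{"name":"b","type":8}]}
    }
}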
Binary file not shown.
@@ -19,17 +19,25 @@ package org.apache.doris.avro;

 import org.apache.doris.common.jni.vec.ColumnValue;

+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Fixed;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Objects;

 public class AvroColumnValue implements ColumnValue {
@@ -42,6 +50,12 @@ public class AvroColumnValue implements ColumnValue {
     }

     private Object inspectObject() {
+        if (fieldData instanceof ByteBuffer) {
+            return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(((ByteBuffer) fieldData).array());
+        } else if (fieldData instanceof Fixed) {
+            return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(
+                    ((GenericFixed) fieldData).bytes());
+        }
         return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
     }
@@ -162,6 +176,24 @@ public class AvroColumnValue implements ColumnValue {

+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
+        StructObjectInspector inspector = (StructObjectInspector) fieldInspector;
+        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+        for (Integer idx : structFieldIndex) {
+            AvroColumnValue cv = null;
+            if (idx != null) {
+                StructField sf = fields.get(idx);
+                Object o;
+                if (fieldData instanceof GenericData.Record) {
+                    GenericRecord record = (GenericRecord) inspector.getStructFieldData(fieldData, sf);
+                    o = record.get(idx);
+                } else {
+                    o = inspector.getStructFieldData(fieldData, sf);
+                }
+                if (Objects.nonNull(o)) {
+                    cv = new AvroColumnValue(sf.getFieldObjectInspector(), o);
+                }
+            }
+            values.add(cv);
+        }
+    }
 }
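Editor's note: two details above are easy to miss. inspectObject() normalizes Avro's binary shapes before Hive's inspector sees them: BYTES arrives as a java.nio.ByteBuffer and FIXED as a GenericFixed, and both are reduced to byte[]. And unpackStruct() resolves children positionally, indexing directly into a GenericData.Record when one is present. A standalone sketch of the binary normalization (the helper name is illustrative):

import org.apache.avro.generic.GenericFixed;

import java.nio.ByteBuffer;

public class BinaryShapeSketch {
    // Mirrors what inspectObject() does before calling
    // PrimitiveObjectInspector.getPrimitiveJavaObject(...).
    static byte[] toBytes(Object fieldData) {
        if (fieldData instanceof ByteBuffer) {
            return ((ByteBuffer) fieldData).array();    // Avro BYTES
        }
        if (fieldData instanceof GenericFixed) {
            return ((GenericFixed) fieldData).bytes();  // Avro FIXED
        }
        throw new IllegalArgumentException("not an Avro binary value");
    }
}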
@@ -21,12 +21,9 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 import org.apache.doris.common.jni.vec.TableSchema;
-import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
 import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TPrimitiveType;

 import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.JavaUtils;
@@ -40,10 +37,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
@@ -193,54 +187,6 @@ public class AvroJNIScanner extends JniScanner {
     @Override
     protected TableSchema parseTableSchema() throws UnsupportedOperationException {
         Schema schema = avroReader.getSchema();
-        List<Field> schemaFields = schema.getFields();
-        List<SchemaColumn> schemaColumns = new ArrayList<>();
-        for (Field schemaField : schemaFields) {
-            Schema avroSchema = schemaField.schema();
-            String columnName = schemaField.name().toLowerCase(Locale.ROOT);
-
-            SchemaColumn schemaColumn = new SchemaColumn();
-            TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn);
-            schemaColumn.setName(columnName);
-            schemaColumn.setType(tPrimitiveType);
-            schemaColumns.add(schemaColumn);
-        }
-        return new TableSchema(schemaColumns);
+        return AvroTypeUtils.parseTableSchema(schema);
     }

-    private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn)
-            throws UnsupportedOperationException {
-        Schema.Type type = avroSchema.getType();
-        switch (type) {
-            case NULL:
-                return TPrimitiveType.NULL_TYPE;
-            case STRING:
-                return TPrimitiveType.STRING;
-            case INT:
-                return TPrimitiveType.INT;
-            case BOOLEAN:
-                return TPrimitiveType.BOOLEAN;
-            case LONG:
-                return TPrimitiveType.BIGINT;
-            case FLOAT:
-                return TPrimitiveType.FLOAT;
-            case BYTES:
-                return TPrimitiveType.BINARY;
-            case DOUBLE:
-                return TPrimitiveType.DOUBLE;
-            case ARRAY:
-                SchemaColumn arrayChildColumn = new SchemaColumn();
-                schemaColumn.addChildColumn(arrayChildColumn);
-                arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn));
-                return TPrimitiveType.ARRAY;
-            case MAP:
-                SchemaColumn mapChildColumn = new SchemaColumn();
-                schemaColumn.addChildColumn(mapChildColumn);
-                mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn));
-                return TPrimitiveType.MAP;
-            default:
-                throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported.");
-        }
-    }
-
 }
@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.avro;

import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TPrimitiveType;

import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.commons.compress.utils.Lists;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class AvroTypeUtils {

    protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException {
        List<Field> schemaFields = schema.getFields();
        List<SchemaColumn> schemaColumns = new ArrayList<>();
        for (Field schemaField : schemaFields) {
            Schema avroSchema = schemaField.schema();
            String columnName = schemaField.name();

            SchemaColumn schemaColumn = new SchemaColumn();
            TPrimitiveType tPrimitiveType = typeFromAvro(avroSchema, schemaColumn);
            schemaColumn.setName(columnName);
            schemaColumn.setType(tPrimitiveType);
            schemaColumns.add(schemaColumn);
        }
        return new TableSchema(schemaColumns);
    }

    private static TPrimitiveType typeFromAvro(Schema avroSchema, SchemaColumn schemaColumn)
            throws UnsupportedOperationException {
        Schema.Type type = avroSchema.getType();
        switch (type) {
            case ENUM:
            case STRING:
                return TPrimitiveType.STRING;
            case INT:
                return TPrimitiveType.INT;
            case BOOLEAN:
                return TPrimitiveType.BOOLEAN;
            case LONG:
                return TPrimitiveType.BIGINT;
            case FLOAT:
                return TPrimitiveType.FLOAT;
            case FIXED:
            case BYTES:
                return TPrimitiveType.BINARY;
            case DOUBLE:
                return TPrimitiveType.DOUBLE;
            case ARRAY:
                SchemaColumn arrayChildColumn = new SchemaColumn();
                schemaColumn.addChildColumns(Collections.singletonList(arrayChildColumn));
                arrayChildColumn.setType(typeFromAvro(avroSchema.getElementType(), arrayChildColumn));
                return TPrimitiveType.ARRAY;
            case MAP:
                // The key of an Avro MAP is always STRING
                SchemaColumn keyChildColumn = new SchemaColumn();
                keyChildColumn.setType(TPrimitiveType.STRING);
                SchemaColumn valueChildColumn = new SchemaColumn();
                valueChildColumn.setType(typeFromAvro(avroSchema.getValueType(), valueChildColumn));

                schemaColumn.addChildColumns(Arrays.asList(keyChildColumn, valueChildColumn));
                return TPrimitiveType.MAP;
            case RECORD:
                List<Field> fields = avroSchema.getFields();
                List<SchemaColumn> childSchemaColumn = Lists.newArrayList();
                for (Field field : fields) {
                    SchemaColumn structChildColumn = new SchemaColumn();
                    structChildColumn.setName(field.name());
                    structChildColumn.setType(typeFromAvro(field.schema(), structChildColumn));
                    childSchemaColumn.add(structChildColumn);
                }
                schemaColumn.addChildColumns(childSchemaColumn);
                return TPrimitiveType.STRUCT;
            case UNION:
                List<Schema> nonNullableMembers = filterNullableUnion(avroSchema);
                Preconditions.checkArgument(!nonNullableMembers.isEmpty(),
                        avroSchema.getName() + ": the union must contain at least one non-nullable type");
                List<SchemaColumn> childSchemaColumns = Lists.newArrayList();
                for (Schema nonNullableMember : nonNullableMembers) {
                    SchemaColumn childColumn = new SchemaColumn();
                    childColumn.setName(nonNullableMember.getName());
                    childColumn.setType(typeFromAvro(nonNullableMember, childColumn));
                    childSchemaColumns.add(childColumn);
                }
                schemaColumn.addChildColumns(childSchemaColumns);
                return TPrimitiveType.STRUCT;
            default:
                throw new UnsupportedOperationException(
                        "avro format: " + avroSchema.getName() + " of type " + type.getName() + " is not supported.");
        }
    }

    private static List<Schema> filterNullableUnion(Schema schema) {
        Preconditions.checkArgument(schema.isUnion(), "Schema must be union");
        return schema.getTypes().stream().filter(s -> !s.isNullable()).collect(Collectors.toList());
    }

}
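Editor's note on the union rule: filterNullableUnion() keeps only non-nullable members, so a conventional optional field ["null","string"] degenerates to a single-child STRUCT, and a union whose members are all nullable trips the precondition above. A small sketch of what the filter sees (class name is illustrative):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class UnionRuleSketch {
    public static void main(String[] args) {
        // ["null","string"]: only the string branch survives filtering,
        // so the column surfaces as STRUCT<string:TEXT>.
        Schema optionalString = SchemaBuilder.unionOf()
                .nullType().and().stringType().endUnion();
        for (Schema member : optionalString.getTypes()) {
            // prints "null nullable=true" then "string nullable=false"
            System.out.println(member.getName() + " nullable=" + member.isNullable());
        }
    }
}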
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.avro;

import org.apache.doris.common.jni.vec.TableSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;

public class AvroTypeUtilsTest {
    private Schema allTypesRecordSchema;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private String result;

    @Before
    public void setUp() {
        result = "[{\"name\":\"aBoolean\",\"type\":2,\"childColumns\":null},{\"name\":\"aInt\",\"type\":5,"
                + "\"childColumns\":null},{\"name\":\"aLong\",\"type\":6,\"childColumns\":null},{\"name\":\""
                + "aFloat\",\"type\":7,\"childColumns\":null},{\"name\":\"aDouble\",\"type\":8,\"childColumns\""
                + ":null},{\"name\":\"aString\",\"type\":23,\"childColumns\":null},{\"name\":\"aBytes\",\"type\""
                + ":11,\"childColumns\":null},{\"name\":\"aFixed\",\"type\":11,\"childColumns\":null},{\"name\""
                + ":\"anArray\",\"type\":20,\"childColumns\":[{\"name\":null,\"type\":5,\"childColumns\":null}]}"
                + ",{\"name\":\"aMap\",\"type\":21,\"childColumns\":[{\"name\":null,\"type\":23,\"childColumns\""
                + ":null},{\"name\":null,\"type\":5,\"childColumns\":null}]},{\"name\":\"anEnum\",\"type\":23"
                + ",\"childColumns\":null},{\"name\":\"aRecord\",\"type\":22,\"childColumns\":[{\"name\":\"a\","
                + "\"type\":5,\"childColumns\":null},{\"name\":\"b\",\"type\":8,\"childColumns\":null},{\"name\":"
                + "\"c\",\"type\":23,\"childColumns\":null}]},{\"name\":\"aUnion\",\"type\":22,\"childColumns\":"
                + "[{\"name\":\"string\",\"type\":23,\"childColumns\":null}]}]\n";

        Schema simpleEnumSchema = SchemaBuilder.enumeration("myEnumType").symbols("A", "B", "C");
        Schema simpleRecordSchema = SchemaBuilder.record("simpleRecord")
                .fields()
                .name("a")
                .type().intType().noDefault()
                .name("b")
                .type().doubleType().noDefault()
                .name("c")
                .type().stringType().noDefault()
                .endRecord();

        allTypesRecordSchema = SchemaBuilder.builder()
                .record("all")
                .fields()
                .name("aBoolean")
                .type().booleanType().noDefault()
                .name("aInt")
                .type().intType().noDefault()
                .name("aLong")
                .type().longType().noDefault()
                .name("aFloat")
                .type().floatType().noDefault()
                .name("aDouble")
                .type().doubleType().noDefault()
                .name("aString")
                .type().stringType().noDefault()
                .name("aBytes")
                .type().bytesType().noDefault()
                .name("aFixed")
                .type().fixed("myFixedType").size(16).noDefault()
                .name("anArray")
                .type().array().items().intType().noDefault()
                .name("aMap")
                .type().map().values().intType().noDefault()
                .name("anEnum")
                .type(simpleEnumSchema).noDefault()
                .name("aRecord")
                .type(simpleRecordSchema).noDefault()
                .name("aUnion")
                .type().optional().stringType()
                .endRecord();
    }

    @Test
    public void testParseTableSchema() throws IOException {
        TableSchema tableSchema = AvroTypeUtils.parseTableSchema(allTypesRecordSchema);
        String tableSchemaTableSchema = tableSchema.getTableSchema();
        JsonNode tableSchemaTree = objectMapper.readTree(tableSchemaTableSchema);

        JsonNode resultSchemaTree = objectMapper.readTree(result);
        Assert.assertEquals(resultSchemaTree, tableSchemaTree);
    }

}
@@ -49,7 +49,7 @@ public class TableSchema {
     public static class SchemaColumn {
         private String name;
         private int type;
-        private SchemaColumn childColumn;
+        private List<SchemaColumn> childColumns;

         public SchemaColumn() {

@@ -59,8 +59,8 @@ public class TableSchema {
             return name;
         }

-        public SchemaColumn getChildColumn() {
-            return childColumn;
+        public List<SchemaColumn> getChildColumns() {
+            return childColumns;
         }

         public int getType() {

@@ -75,8 +75,8 @@ public class TableSchema {
             this.type = type.getValue();
         }

-        public void addChildColumn(SchemaColumn childColumn) {
-            this.childColumn = childColumn;
+        public void addChildColumns(List<SchemaColumn> childColumns) {
+            this.childColumns = childColumns;
         }
     }
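Editor's note: replacing the single childColumn with a List<SchemaColumn> is what lets children vary in arity (two for MAP, one per field for STRUCT, one for ARRAY) and nest. For instance, the mapArrayLong column (MAP<TEXT,ARRAY<BIGINT>>) from the regression output below composes as a tree; a sketch against the patched API:

import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TPrimitiveType;

import java.util.Arrays;
import java.util.Collections;

public class NestedChildrenSketch {
    public static void main(String[] args) {
        SchemaColumn mapArrayLong = new SchemaColumn();
        mapArrayLong.setName("mapArrayLong");
        mapArrayLong.setType(TPrimitiveType.MAP);

        SchemaColumn key = new SchemaColumn();
        key.setType(TPrimitiveType.STRING);
        SchemaColumn array = new SchemaColumn();
        array.setType(TPrimitiveType.ARRAY);
        SchemaColumn element = new SchemaColumn();
        element.setType(TPrimitiveType.BIGINT);
        array.addChildColumns(Collections.singletonList(element));

        // key at index 0, nested array value at index 1
        mapArrayLong.addChildColumns(Arrays.asList(key, array));
    }
}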
regression-test/data/external_table_p0/tvf/test_tvf_avro.out (new file, 73 lines)
@@ -0,0 +1,73 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
aBoolean BOOLEAN Yes false \N NONE
aInt INT Yes false \N NONE
aLong BIGINT Yes false \N NONE
aFloat FLOAT Yes false \N NONE
aDouble DOUBLE Yes false \N NONE
aString TEXT Yes false \N NONE
anArray ARRAY<INT> Yes false \N NONE
aMap MAP<TEXT,INT> Yes false \N NONE
anEnum TEXT Yes false \N NONE
aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE
aUnion STRUCT<string:TEXT> Yes false \N NONE
mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE
arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE

-- !2 --
2

-- !1 --
false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}]
true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}]

-- !2 --
[1, 2, 3, 4]
[5, 6, 7]

-- !3 --
{"k1":1, "k2":2}
{"key1":1, "key2":2}

-- !4 --
A
B

-- !5 --
{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}
{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}

-- !6 --
\N
\N

-- !7 --
{"k1":[99, 88, 77], "k2":[10, 20]}
{"k11":[77, 11, 33], "k22":[10, 20]}

-- !8 --
[{"Key11":1}, {"Key22":0}]
[{"arrayMapKey1":0}, {"arrayMapKey2":1}]

-- !3 --
aBoolean BOOLEAN Yes false \N NONE
aInt INT Yes false \N NONE
aLong BIGINT Yes false \N NONE
aFloat FLOAT Yes false \N NONE
aDouble DOUBLE Yes false \N NONE
aString TEXT Yes false \N NONE
anArray ARRAY<INT> Yes false \N NONE
aMap MAP<TEXT,INT> Yes false \N NONE
anEnum TEXT Yes false \N NONE
aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE
aUnion STRUCT<string:TEXT> Yes false \N NONE
mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE
arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE

-- !9 --
false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}]
true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}]

-- !4 --
2
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") {

    def all_type_file = "all_type.avro";
    def format = "avro"

    // s3 config
    String ak = getS3AK()
    String sk = getS3SK()
    String s3_endpoint = getS3Endpoint()
    String region = getS3Region()
    String bucket = context.config.otherConfigs.get("s3BucketName");
    def s3Uri = "https://${bucket}.${s3_endpoint}/regression/datalake/pipeline_data/tvf/${all_type_file}";

    // hdfs config
    String hdfs_port = context.config.otherConfigs.get("hdfs_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    def hdfsUserName = "doris"
    def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
    def hdfsUri = "${defaultFS}" + "/user/doris/preinstalled_data/avro/avro_all_types/${all_type_file}"

    // TVF s3()
    qt_1 """
    desc function s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    qt_2 """
    select count(*) from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_1 """
    select * from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_2 """
    select anArray from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_3 """
    select aMap from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_4 """
    select anEnum from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_5 """
    select aRecord from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_6 """
    select aUnion from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_7 """
    select mapArrayLong from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    order_qt_8 """
    select arrayMapBoolean from s3(
        "uri" = "${s3Uri}",
        "ACCESS_KEY" = "${ak}",
        "SECRET_KEY" = "${sk}",
        "REGION" = "${region}",
        "FORMAT" = "${format}");
    """

    // TVF hdfs()
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        try {
            qt_3 """
            desc function HDFS(
                "uri" = "${hdfsUri}",
                "fs.defaultFS" = "${defaultFS}",
                "hadoop.username" = "${hdfsUserName}",
                "FORMAT" = "${format}"); """

            order_qt_9 """ select * from HDFS(
                "uri" = "${hdfsUri}",
                "fs.defaultFS" = "${defaultFS}",
                "hadoop.username" = "${hdfsUserName}",
                "format" = "${format}")"""

            qt_4 """ select count(*) from HDFS(
                "uri" = "${hdfsUri}",
                "fs.defaultFS" = "${defaultFS}",
                "hadoop.username" = "${hdfsUserName}",
                "format" = "${format}"); """
        } finally {
        }
    }
}