[opt](paimon) support mapping Paimon column type "Row" to Doris type "Struct" (#34239)
backport: #33786
This commit is contained in:
@ -166,6 +166,10 @@ public class PaimonColumnValue implements ColumnValue {
|
||||
|
||||
@Override
|
||||
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
|
||||
|
||||
// todo: support pruned struct fields
|
||||
InternalRow row = record.getRow(idx, structFieldIndex.size());
|
||||
for (int i : structFieldIndex) {
|
||||
values.add(new PaimonColumnValue(row, i, dorisType.getChildTypes().get(i)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.paimon.types.BigIntType;
|
||||
import org.apache.paimon.types.BinaryType;
|
||||
import org.apache.paimon.types.BooleanType;
|
||||
import org.apache.paimon.types.CharType;
|
||||
import org.apache.paimon.types.DataField;
|
||||
import org.apache.paimon.types.DataType;
|
||||
import org.apache.paimon.types.DataTypeDefaultVisitor;
|
||||
import org.apache.paimon.types.DateType;
|
||||
@ -48,6 +49,7 @@ import org.slf4j.LoggerFactory;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Convert paimon type to doris type.
|
||||
@ -190,7 +192,13 @@ public class PaimonTypeUtils {
|
||||
|
||||
@Override
|
||||
public PaimonColumnType visit(RowType rowType) {
|
||||
return this.defaultMethod(rowType);
|
||||
PaimonColumnType paimonColumnType = new PaimonColumnType(Type.STRUCT);
|
||||
List<DataField> fields = rowType.getFields();
|
||||
List<ColumnType> childTypes = fields.stream()
|
||||
.map(field -> fromPaimonType(field.name(), field.type()))
|
||||
.collect(Collectors.toList());
|
||||
paimonColumnType.setChildTypes(childTypes);
|
||||
return paimonColumnType;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -39,9 +39,12 @@ import org.apache.paimon.types.ArrayType;
|
||||
import org.apache.paimon.types.DataField;
|
||||
import org.apache.paimon.types.DecimalType;
|
||||
import org.apache.paimon.types.MapType;
|
||||
import org.apache.paimon.types.RowType;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PaimonExternalTable extends ExternalTable {
|
||||
|
||||
@ -131,6 +134,13 @@ public class PaimonExternalTable extends ExternalTable {
|
||||
MapType mapType = (MapType) dataType;
|
||||
return new org.apache.doris.catalog.MapType(
|
||||
paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType()));
|
||||
case ROW:
|
||||
RowType rowType = (RowType) dataType;
|
||||
List<DataField> fields = rowType.getFields();
|
||||
return new org.apache.doris.catalog.StructType(fields.stream()
|
||||
.map(field -> new org.apache.doris.catalog.StructField(field.name(),
|
||||
paimonTypeToDorisType(field.type())))
|
||||
.collect(Collectors.toCollection(ArrayList::new)));
|
||||
case TIME_WITHOUT_TIME_ZONE:
|
||||
return Type.UNSUPPORTED;
|
||||
default:
|
||||
|
||||
File diff suppressed because one or more lines are too long
@ -56,10 +56,7 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
|
||||
sql """drop catalog ${hms_ctl_name}""";
|
||||
|
||||
String enabled = context.config.otherConfigs.get("enablePaimonTest")
|
||||
if (enabled != null && enabled.equalsIgnoreCase("enable_deprecated_case")) {
|
||||
// The timestamp type of paimon has no logical or converted type,
|
||||
// and is conflict with column type change from bigint to timestamp.
|
||||
// Deprecated currently.
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
def qt_all_type = { String table_name ->
|
||||
qt_all """select * from ${table_name} order by c1"""
|
||||
qt_predict_like_1 """select * from ${table_name} where c13 like '%3%' order by c1"""
|
||||
@ -174,6 +171,9 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
|
||||
|
||||
def c100= """select * from array_nested order by c1;"""
|
||||
|
||||
def c102= """select * from row_native_test order by id;"""
|
||||
def c103= """select * from row_jni_test order by id;"""
|
||||
|
||||
String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
|
||||
String catalog_name = "ctl_test_paimon_catalog"
|
||||
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
|
||||
@ -271,6 +271,10 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
|
||||
qt_c98 c98
|
||||
qt_c99 c99
|
||||
qt_c100 c100
|
||||
qt_c102 c102
|
||||
sql """ set force_jni_scanner=true; """
|
||||
qt_c103 c103
|
||||
sql """ set force_jni_scanner=false; """
|
||||
|
||||
// test view from jion paimon
|
||||
sql """ switch internal """
|
||||
|
||||
Reference in New Issue
Block a user