[Fix](Outfile) Use data_type_serde to export data to parquet file format (#24998)

This commit is contained in:
Tiewei Fang
2023-10-13 13:58:34 +08:00
committed by GitHub
parent 4f65a9c425
commit 6f9a084d99
18 changed files with 2446 additions and 893 deletions

View File

@ -39,7 +39,6 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TParquetCompressionType;
import org.apache.doris.thrift.TParquetDataLogicalType;
import org.apache.doris.thrift.TParquetDataType;
import org.apache.doris.thrift.TParquetRepetitionType;
import org.apache.doris.thrift.TParquetSchema;
@ -70,7 +69,6 @@ public class OutFileClause {
public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetDataLogicalType> PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
@ -103,12 +101,6 @@ public class OutFileClause {
PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("decimal", TParquetDataLogicalType.DECIMAL);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date", TParquetDataLogicalType.DATE);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime", TParquetDataLogicalType.TIMESTAMP);
// TODO(ftw): add other logical type
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("none", TParquetDataLogicalType.NONE);
PARQUET_COMPRESSION_TYPE_MAP.put("snappy", TParquetCompressionType.SNAPPY);
PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
@ -495,170 +487,17 @@ public class OutFileClause {
private void analyzeForParquetFormat(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
if (this.parquetSchemas.isEmpty()) {
genParquetSchema(resultExprs, colLabels);
genParquetColumnName(resultExprs, colLabels);
}
// check schema number
if (resultExprs.size() != this.parquetSchemas.size()) {
throw new AnalysisException("Parquet schema number does not equal to select item number");
}
// check type
for (int i = 0; i < this.parquetSchemas.size(); ++i) {
TParquetDataType type = this.parquetSchemas.get(i).schema_data_type;
Type resultType = resultExprs.get(i).getType();
switch (resultType.getPrimitiveType()) {
case BOOLEAN:
if (!PARQUET_DATA_TYPE_MAP.get("boolean").equals(type)) {
throw new AnalysisException("project field type is BOOLEAN, should use boolean,"
+ " but the type of column " + i + " is " + type);
}
break;
case TINYINT:
case SMALLINT:
case INT:
case DATE:
if (!PARQUET_DATA_TYPE_MAP.get("int32").equals(type)) {
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT,"
+ "should use int32, " + "but the definition type of column " + i + " is " + type);
}
break;
case BIGINT:
case DATETIME:
if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME,"
+ "should use int64, but the definition type of column " + i + " is " + type);
}
break;
case FLOAT:
if (!PARQUET_DATA_TYPE_MAP.get("float").equals(type)) {
throw new AnalysisException("project field type is FLOAT, should use float,"
+ " but the definition type of column " + i + " is " + type);
}
break;
case DOUBLE:
if (!PARQUET_DATA_TYPE_MAP.get("double").equals(type)) {
throw new AnalysisException("project field type is DOUBLE, should use double,"
+ " but the definition type of column " + i + " is " + type);
}
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128: {
if (!PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array").equals(type)) {
throw new AnalysisException("project field type is DECIMAL"
+ ", should use fixed_len_byte_array, but the definition type of column "
+ i + " is " + type);
}
break;
}
case DECIMALV2:
case CHAR:
case VARCHAR:
case STRING:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) {
throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
+ "/DATETIMEV2/LARGEINT, should use byte_array, but the definition type of column "
+ i + " is " + type);
}
break;
case HLL:
case BITMAP:
if (ConnectContext.get() != null && ConnectContext.get()
.getSessionVariable().isReturnObjectDataAsBinary()) {
if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) {
throw new AnalysisException("project field type is HLL/BITMAP, should use byte_array, "
+ "but the definition type of column " + i + " is " + type);
}
} else {
throw new AnalysisException("Parquet format does not support column type: "
+ resultType.getPrimitiveType());
}
break;
default:
throw new AnalysisException("Parquet format does not support column type: "
+ resultType.getPrimitiveType());
}
}
}
private void genParquetSchema(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
Preconditions.checkState(this.parquetSchemas.isEmpty());
private void genParquetColumnName(List<Expr> resultExprs, List<String> colLabels) throws AnalysisException {
for (int i = 0; i < resultExprs.size(); ++i) {
Expr expr = resultExprs.get(i);
TParquetSchema parquetSchema = new TParquetSchema();
if (resultExprs.get(i).isNullable()) {
parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("optional");
} else {
parquetSchema.schema_repetition_type = PARQUET_REPETITION_TYPE_MAP.get("required");
}
switch (expr.getType().getPrimitiveType()) {
case BOOLEAN:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("boolean");
break;
case TINYINT:
case SMALLINT:
case INT:
case DATE:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int32");
break;
case BIGINT:
case DATETIME:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int64");
break;
case FLOAT:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("float");
break;
case DOUBLE:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("double");
break;
case DECIMAL32:
case DECIMAL64:
case DECIMAL128: {
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array");
break;
}
case DECIMALV2:
case CHAR:
case VARCHAR:
case STRING:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array");
break;
case HLL:
case BITMAP:
if (ConnectContext.get() != null && ConnectContext.get()
.getSessionVariable().isReturnObjectDataAsBinary()) {
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array");
}
break;
default:
throw new AnalysisException("currently parquet do not support column type: "
+ expr.getType().getPrimitiveType());
}
switch (expr.getType().getPrimitiveType()) {
case DECIMAL32:
case DECIMAL64:
case DECIMAL128: {
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("decimal");
break;
}
case DATE:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
break;
case DATETIME:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("datetime");
break;
default:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("none");
}
parquetSchema.schema_column_name = colLabels.get(i);
parquetSchemas.add(parquetSchema);
}
@ -864,6 +703,7 @@ public class OutFileClause {
}
// check schema. if schema is not set, Doris will gen schema by select items
// Note: These codes are useless and outdated.
String schema = properties.get(SCHEMA);
if (schema == null) {
return;