[Bugfix](Outfile) fix that export data to parquet and orc file format (#19436)

1. support export `LARGEINT` data type to parquet/orc file format.
2. Export the DORIS `DATE/DATETIME` type to the `Date/Timestamp` logic type of parquet file format.
3. Fix that the data is not correct when the DATE type data is exported to ORC.
This commit is contained in:
Tiewei Fang
2023-05-13 22:39:24 +08:00
committed by GitHub
parent e98f4c4a5e
commit 91cdb79d89
13 changed files with 462 additions and 330 deletions

View File

@ -35,6 +35,7 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TParquetCompressionType;
import org.apache.doris.thrift.TParquetDataLogicalType;
import org.apache.doris.thrift.TParquetDataType;
import org.apache.doris.thrift.TParquetRepetitionType;
import org.apache.doris.thrift.TParquetSchema;
@ -65,6 +66,7 @@ public class OutFileClause {
public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetDataLogicalType> PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
@ -97,6 +99,11 @@ public class OutFileClause {
PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date", TParquetDataLogicalType.DATE);
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime", TParquetDataLogicalType.TIMESTAMP);
// TODO(ftw): add other logical type
PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("none", TParquetDataLogicalType.NONE);
PARQUET_COMPRESSION_TYPE_MAP.put("snappy", TParquetCompressionType.SNAPPY);
PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
@ -283,6 +290,7 @@ public class OutFileClause {
}
type = "string";
break;
case LARGEINT:
case DATE:
case DATETIME:
case DATETIMEV2:
@ -354,6 +362,7 @@ public class OutFileClause {
+ " but the type of column " + i + " is " + schema.second);
}
break;
case LARGEINT:
case DATE:
case DATETIME:
case DATETIMEV2:
@ -420,13 +429,13 @@ public class OutFileClause {
case TINYINT:
case SMALLINT:
case INT:
case DATE:
if (!PARQUET_DATA_TYPE_MAP.get("int32").equals(type)) {
throw new AnalysisException("project field type is TINYINT/SMALLINT/INT,"
+ "should use int32, " + "but the definition type of column " + i + " is " + type);
}
break;
case BIGINT:
case DATE:
case DATETIME:
if (!PARQUET_DATA_TYPE_MAP.get("int64").equals(type)) {
throw new AnalysisException("project field type is BIGINT/DATE/DATETIME,"
@ -454,9 +463,10 @@ public class OutFileClause {
case DECIMALV2:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
if (!PARQUET_DATA_TYPE_MAP.get("byte_array").equals(type)) {
throw new AnalysisException("project field type is CHAR/VARCHAR/STRING/DECIMAL/DATEV2"
+ "/DATETIMEV2, should use byte_array, but the definition type of column "
+ "/DATETIMEV2/LARGEINT, should use byte_array, but the definition type of column "
+ i + " is " + type);
}
break;
@ -497,10 +507,10 @@ public class OutFileClause {
case TINYINT:
case SMALLINT:
case INT:
case DATE:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int32");
break;
case BIGINT:
case DATE:
case DATETIME:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("int64");
break;
@ -519,6 +529,7 @@ public class OutFileClause {
case DECIMAL128:
case DATETIMEV2:
case DATEV2:
case LARGEINT:
parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("byte_array");
break;
case HLL:
@ -532,6 +543,18 @@ public class OutFileClause {
throw new AnalysisException("currently parquet do not support column type: "
+ expr.getType().getPrimitiveType());
}
switch (expr.getType().getPrimitiveType()) {
case DATE:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
break;
case DATETIME:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("datetime");
break;
default:
parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("none");
}
parquetSchema.schema_column_name = colLabels.get(i);
parquetSchemas.add(parquetSchema);
}