[Fix](export/outfile) Support compression when exporting data to Parquet / ORC. (#37167)
bp: #36490
This commit is contained in:
@ -64,6 +64,7 @@ public class ExportStmt extends StatementBase {
|
||||
public static final String PARALLELISM = "parallelism";
|
||||
public static final String LABEL = "label";
|
||||
public static final String DATA_CONSISTENCY = "data_consistency";
|
||||
public static final String COMPRESS_TYPE = "compress_type";
|
||||
|
||||
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
|
||||
private static final String DEFAULT_LINE_DELIMITER = "\n";
|
||||
@ -81,6 +82,7 @@ public class ExportStmt extends StatementBase {
|
||||
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
|
||||
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
|
||||
.add("format")
|
||||
.add(COMPRESS_TYPE)
|
||||
.build();
|
||||
|
||||
private TableName tblName;
|
||||
@ -107,6 +109,7 @@ public class ExportStmt extends StatementBase {
|
||||
private String deleteExistingFiles;
|
||||
private String withBom;
|
||||
private String dataConsistency = ExportJob.CONSISTENT_PARTITION;
|
||||
private String compressionType;
|
||||
private SessionVariable sessionVariables;
|
||||
|
||||
private String qualifiedUser;
|
||||
@ -234,6 +237,7 @@ public class ExportStmt extends StatementBase {
|
||||
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
|
||||
exportJob.setWithBom(this.withBom);
|
||||
exportJob.setDataConsistency(this.dataConsistency);
|
||||
exportJob.setCompressType(this.compressionType);
|
||||
|
||||
if (columns != null) {
|
||||
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
|
||||
@ -376,6 +380,9 @@ public class ExportStmt extends StatementBase {
|
||||
+ ExportJob.CONSISTENT_PARTITION + "`/`" + ExportJob.CONSISTENT_NONE + "`");
|
||||
}
|
||||
}
|
||||
|
||||
// compress_type
|
||||
this.compressionType = properties.getOrDefault(COMPRESS_TYPE, "");
|
||||
}
|
||||
|
||||
private void checkColumns() throws DdlException {
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TFileCompressType;
|
||||
import org.apache.doris.thrift.TFileFormatType;
|
||||
import org.apache.doris.thrift.TParquetCompressionType;
|
||||
import org.apache.doris.thrift.TParquetDataType;
|
||||
@ -70,6 +71,7 @@ public class OutFileClause {
|
||||
public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
|
||||
public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
|
||||
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
|
||||
public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
|
||||
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
|
||||
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
|
||||
public static final String FILE_NUMBER = "FileNumber";
|
||||
@ -106,9 +108,15 @@ public class OutFileClause {
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("brotli", TParquetCompressionType.BROTLI);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("default", TParquetCompressionType.UNCOMPRESSED);
|
||||
// Arrow does not support the lzo and bz2 compression types.
|
||||
// PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
|
||||
// PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
|
||||
PARQUET_COMPRESSION_TYPE_MAP.put("plain", TParquetCompressionType.UNCOMPRESSED);
|
||||
|
||||
ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
|
||||
ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
|
||||
ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
|
||||
ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
|
||||
|
||||
PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
|
||||
PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
|
||||
@ -137,6 +145,7 @@ public class OutFileClause {
|
||||
public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
|
||||
public static final String PROP_FILE_SUFFIX = "file_suffix";
|
||||
public static final String PROP_WITH_BOM = "with_bom";
|
||||
public static final String COMPRESS_TYPE = "compress_type";
|
||||
|
||||
private static final String PARQUET_PROP_PREFIX = "parquet.";
|
||||
private static final String SCHEMA = "schema";
|
||||
@ -170,8 +179,8 @@ public class OutFileClause {
|
||||
private boolean isAnalyzed = false;
|
||||
private String headerType = "";
|
||||
|
||||
private static final String PARQUET_COMPRESSION = "compression";
|
||||
private TParquetCompressionType parquetCompressionType = TParquetCompressionType.UNCOMPRESSED;
|
||||
private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
|
||||
private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
|
||||
private static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
|
||||
private boolean parquetDisableDictionary = false;
|
||||
private static final String PARQUET_VERSION = "version";
|
||||
@ -664,19 +673,11 @@ public class OutFileClause {
|
||||
return fullPath.replace(filePath, "");
|
||||
}
|
||||
|
||||
/**
 * Selects the Parquet compression codec by name.
 *
 * <p>When {@code propertyValue} is a key of {@code PARQUET_COMPRESSION_TYPE_MAP}, the mapped
 * codec is stored in {@code parquetCompressionType}. Otherwise the field is left at whatever
 * value it already holds and a warning is logged.
 *
 * @param propertyValue codec name; it is looked up exactly as given (this method does no
 *                      case normalization)
 */
void setParquetCompressionType(String propertyValue) {
|
||||
if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(propertyValue)) {
|
||||
this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(propertyValue);
|
||||
} else {
|
||||
// Nothing is assigned on this path; the field keeps its current value.
LOG.warn("not set parquet compression type or is invalid, set default to UNCOMPRESSED type.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Selects the Parquet format version by name.
 *
 * <p>When {@code propertyValue} is a key of {@code PARQUET_VERSION_MAP}, the mapped version
 * is stored in {@code parquetVersion}. Otherwise the field is left at whatever value it
 * already holds and a message is logged.
 *
 * @param propertyValue version name; it is looked up exactly as given (this method does no
 *                      case normalization)
 */
void setParquetVersion(String propertyValue) {
|
||||
if (PARQUET_VERSION_MAP.containsKey(propertyValue)) {
|
||||
this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue);
|
||||
} else {
|
||||
// NOTE(review): the same message appears at both warn and debug level. This text looks like
// a diff view with the +/- markers stripped, so one of the two calls is presumably the
// pre-change line - confirm against the real file.
LOG.warn("not set parquet version type or is invalid, set default to PARQUET_1.0 version.");
|
||||
LOG.debug("not set parquet version type or is invalid, set default to PARQUET_1.0 version.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -692,15 +693,25 @@ public class OutFileClause {
|
||||
* currently only supports: compression, disable_dictionary, version
|
||||
*/
|
||||
private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
|
||||
// save compress type
|
||||
if (properties.containsKey(COMPRESS_TYPE)) {
|
||||
if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
|
||||
this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
|
||||
properties.get(COMPRESS_TYPE).toLowerCase());
|
||||
processedPropKeys.add(COMPRESS_TYPE);
|
||||
} else {
|
||||
throw new AnalysisException("parquet compression type [" + properties.get(COMPRESS_TYPE)
|
||||
+ "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
|
||||
}
|
||||
}
|
||||
|
||||
// save all parquet prefix property
|
||||
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
Map.Entry<String, String> entry = iter.next();
|
||||
if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
|
||||
processedPropKeys.add(entry.getKey());
|
||||
if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_COMPRESSION)) {
|
||||
setParquetCompressionType(entry.getValue());
|
||||
} else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) {
|
||||
if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) {
|
||||
this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
|
||||
} else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION)) {
|
||||
setParquetVersion(entry.getValue());
|
||||
@ -744,6 +755,18 @@ public class OutFileClause {
|
||||
}
|
||||
|
||||
private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisException {
|
||||
// get compression type
|
||||
// save compress type
|
||||
if (properties.containsKey(COMPRESS_TYPE)) {
|
||||
if (ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
|
||||
this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase());
|
||||
processedPropKeys.add(COMPRESS_TYPE);
|
||||
} else {
|
||||
throw new AnalysisException("orc compression type [" + properties.get(COMPRESS_TYPE) + "] is invalid,"
|
||||
+ " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
|
||||
}
|
||||
}
|
||||
|
||||
// Check the schema. If no schema is set, Doris will generate one from the select items.
|
||||
String schema = properties.get(SCHEMA);
|
||||
if (schema == null) {
|
||||
@ -846,6 +869,7 @@ public class OutFileClause {
|
||||
}
|
||||
if (isOrcFormat()) {
|
||||
sinkOptions.setOrcSchema(serializeOrcSchema());
|
||||
sinkOptions.setOrcCompressionType(orcCompressionType);
|
||||
}
|
||||
return sinkOptions;
|
||||
}
|
||||
|
||||
@ -62,6 +62,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
@ -174,6 +175,8 @@ public class ExportJob implements Writable {
|
||||
private String withBom;
|
||||
@SerializedName("dataConsistency")
|
||||
private String dataConsistency;
|
||||
@SerializedName("compressType")
|
||||
private String compressType;
|
||||
|
||||
private TableRef tableRef;
|
||||
|
||||
@ -621,6 +624,12 @@ public class ExportJob implements Writable {
|
||||
if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) {
|
||||
outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator);
|
||||
outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter);
|
||||
} else {
|
||||
// orc / parquet
|
||||
// compressType == null means outfile will use default compression type
|
||||
if (compressType != null) {
|
||||
outfileProperties.put(ExportCommand.COMPRESS_TYPE, compressType);
|
||||
}
|
||||
}
|
||||
if (!maxFileSize.isEmpty()) {
|
||||
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
|
||||
|
||||
@ -74,6 +74,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
|
||||
public static final String PARALLELISM = "parallelism";
|
||||
public static final String LABEL = "label";
|
||||
public static final String DATA_CONSISTENCY = "data_consistency";
|
||||
public static final String COMPRESS_TYPE = "compress_type";
|
||||
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
|
||||
private static final String DEFAULT_LINE_DELIMITER = "\n";
|
||||
private static final String DEFAULT_PARALLELISM = "1";
|
||||
@ -91,6 +92,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
|
||||
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
|
||||
.add("format")
|
||||
.add(OutFileClause.PROP_WITH_BOM)
|
||||
.add(COMPRESS_TYPE)
|
||||
.build();
|
||||
|
||||
private final List<String> nameParts;
|
||||
@ -337,9 +339,13 @@ public class ExportCommand extends Command implements ForwardWithSync {
|
||||
} catch (NumberFormatException e) {
|
||||
throw new UserException("The value of timeout is invalid!");
|
||||
}
|
||||
|
||||
exportJob.setTimeoutSecond(timeoutSecond);
|
||||
|
||||
// set compress_type
|
||||
if (fileProperties.containsKey(COMPRESS_TYPE)) {
|
||||
exportJob.setCompressType(fileProperties.get(COMPRESS_TYPE));
|
||||
}
|
||||
|
||||
// exportJob generate outfile sql
|
||||
exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts));
|
||||
return exportJob;
|
||||
|
||||
Reference in New Issue
Block a user