[branch-2.1](hive) Support writing Hive text tables (#38549) (#40063)

1. Support writing Hive text tables
2. Add the session variable `hive_text_compression` for writing compressed Hive
text tables (a usage sketch follows this list)
3. Supported compression types: gzip, bzip2, snappy, lz4, zstd
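
Like other Doris session variables, the new knob is set per session (for example `set hive_text_compression = 'zstd';` before the INSERT). Below is a minimal sketch of the programmatic round trip through the accessors this commit adds to SessionVariable; instantiating the class directly, as a unit test would, is an assumption, and the class name is illustrative:

    import org.apache.doris.qe.SessionVariable;

    public class HiveTextCompressionExample {
        public static void main(String[] args) {
            // Direct instantiation is a test-style assumption.
            SessionVariable sv = new SessionVariable();
            System.out.println(sv.hiveTextCompression());   // "uncompressed", the default added below
            sv.setHiveTextCompression("zstd");
            System.out.println(sv.hiveTextCompression());   // "zstd"
        }
    }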

Picked from https://github.com/apache/doris/pull/38549
Author: Socrates
Date: 2024-08-29 16:50:40 +08:00
Committed by: GitHub
Parent: 901344a880
Commit: c6df7c21a3
30 changed files with 2311 additions and 153 deletions


@@ -569,6 +569,7 @@ public class Util {
*
* @param path of file to be inferred.
*/
@NotNull
public static TFileCompressType inferFileCompressTypeByPath(String path) {
String lowerCasePath = path.toLowerCase();
@@ -586,6 +587,8 @@ public class Util {
return TFileCompressType.DEFLATE;
} else if (lowerCasePath.endsWith(".snappy")) {
return TFileCompressType.SNAPPYBLOCK;
} else if (lowerCasePath.endsWith(".zst") || lowerCasePath.endsWith(".zstd")) {
return TFileCompressType.ZSTD;
} else {
return TFileCompressType.PLAIN;
}
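
The branch added above lets the read path recognize `.zst`/`.zstd` files as ZSTD-compressed. A standalone sketch of the suffix inference for the cases visible in this excerpt; the enum stands in for the generated thrift TFileCompressType, and the class and method names are illustrative:

    public class SuffixInferenceSketch {
        // Stand-in for the thrift TFileCompressType constants used above.
        enum CompressType { SNAPPYBLOCK, ZSTD, PLAIN }

        static CompressType infer(String path) {
            String p = path.toLowerCase();
            if (p.endsWith(".snappy")) {
                return CompressType.SNAPPYBLOCK;
            } else if (p.endsWith(".zst") || p.endsWith(".zstd")) {
                return CompressType.ZSTD;   // the branch added by this commit
            }
            return CompressType.PLAIN;
        }

        public static void main(String[] args) {
            System.out.println(infer("part-00000.zst"));   // ZSTD
            System.out.println(infer("part-00000.txt"));   // PLAIN
        }
    }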


@@ -80,6 +80,10 @@ public abstract class BaseExternalTableDataSink extends DataSink {
return TFileCompressType.ZLIB;
} else if ("zstd".equalsIgnoreCase(compressType)) {
return TFileCompressType.ZSTD;
} else if ("gzip".equalsIgnoreCase(compressType)) {
return TFileCompressType.GZ;
} else if ("bzip2".equalsIgnoreCase(compressType)) {
return TFileCompressType.BZ2;
} else if ("uncompressed".equalsIgnoreCase(compressType)) {
return TFileCompressType.PLAIN;
} else {

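For the text (CSV_PLAIN) path, the string mapped here comes from the new hive_text_compression session variable (see the HiveTableSink hunk below). A standalone mirror of the name-to-type mapping for the documented values; the class and enum are stand-ins, and the snappy/lz4 branches are not visible in this excerpt, so their target constants are assumptions:

    public class HiveTextCompressionMapping {
        // Stand-in for the thrift TFileCompressType constants used above.
        enum CompressType { PLAIN, GZ, BZ2, SNAPPYBLOCK, LZ4BLOCK, ZSTD }

        static CompressType fromSessionValue(String value) {
            switch (value.toLowerCase()) {
                case "uncompressed": return CompressType.PLAIN;
                case "gzip":         return CompressType.GZ;
                case "bzip2":        return CompressType.BZ2;
                case "zstd":         return CompressType.ZSTD;
                // Not visible in the hunk above; targets are assumptions.
                case "snappy":       return CompressType.SNAPPYBLOCK;
                case "lz4":          return CompressType.LZ4BLOCK;
                default:
                    // The real else branch is truncated above; throwing here is illustrative only.
                    throw new IllegalArgumentException("unsupported hive_text_compression: " + value);
            }
        }

        public static void main(String[] args) {
            System.out.println(fromSessionValue("gzip"));    // GZ
            System.out.println(fromSessionValue("bzip2"));   // BZ2
        }
    }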

@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -38,6 +39,7 @@ import org.apache.doris.thrift.THiveColumn;
import org.apache.doris.thrift.THiveColumnType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartition;
import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -50,9 +52,23 @@ import java.util.Set;
import java.util.stream.Collectors;
public class HiveTableSink extends BaseExternalTableDataSink {
public static final String PROP_FIELD_DELIMITER = "field.delim";
public static final String DEFAULT_FIELD_DELIMITER = "\1";
public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
public static final String PROP_LINE_DELIMITER = "line.delim";
public static final String DEFAULT_LINE_DELIMITER = "\n";
public static final String PROP_COLLECT_DELIMITER = "collection.delim";
public static final String DEFAULT_COLLECT_DELIMITER = "\2";
public static final String PROP_MAPKV_DELIMITER = "mapkv.delim";
public static final String DEFAULT_MAPKV_DELIMITER = "\3";
public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
public static final String PROP_NULL_FORMAT = "serialization.null.format";
public static final String DEFAULT_NULL_FORMAT = "\\N";
private final HMSExternalTable targetTable;
private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
add(TFileFormatType.FORMAT_CSV_PLAIN);
add(TFileFormatType.FORMAT_ORC);
add(TFileFormatType.FORMAT_PARQUET);
}};
@@ -115,6 +131,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
TFileFormatType formatType = getTFileFormatType(sd.getInputFormat());
tSink.setFileFormat(formatType);
setCompressType(tSink, formatType);
setSerDeProperties(tSink);
THiveLocationParams locationParams = new THiveLocationParams();
LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
@@ -164,6 +181,9 @@ public class HiveTableSink extends BaseExternalTableDataSink {
case FORMAT_PARQUET:
compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
break;
case FORMAT_CSV_PLAIN:
compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
break;
default:
compressType = "uncompressed";
break;
@@ -194,6 +214,45 @@ public class HiveTableSink extends BaseExternalTableDataSink {
tSink.setPartitions(partitions);
}
private void setSerDeProperties(THiveTableSink tSink) {
THiveSerDeProperties serDeProperties = new THiveSerDeProperties();
// 1. set field delimiter
Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_FIELD_DELIMITER);
Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_SERIALIZATION_FORMAT);
serDeProperties.setFieldDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)));
// 2. set line delimiter
Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_LINE_DELIMITER);
serDeProperties.setLineDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_LINE_DELIMITER, lineDelim)));
// 3. set collection delimiter
Optional<String> collectDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_COLLECT_DELIMITER);
serDeProperties
.setCollectionDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_COLLECT_DELIMITER, collectDelim)));
// 4. set mapkv delimiter
Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_MAPKV_DELIMITER);
serDeProperties.setMapkvDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_MAPKV_DELIMITER, mapkvDelim)));
// 5. set escape delimiter
Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_ESCAPE_DELIMITER);
serDeProperties
.setEscapeChar(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_ESCAPE_DELIMIER, escapeDelim)));
// 6. set null format
Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
PROP_NULL_FORMAT);
serDeProperties.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
DEFAULT_NULL_FORMAT, nullFormat));
tSink.setSerdeProperties(serDeProperties);
}
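
setSerDeProperties above relies on HiveMetaStoreClientHelper helpers that are outside this excerpt: getSerdeProperty, getByte, and firstPresentOrDefault. Judging from the call sites, firstPresentOrDefault returns the value of the first present Optional and otherwise falls back to the supplied default, while getByte appears to normalize the chosen delimiter string into the single character the text writer expects; both readings are assumptions. A standalone sketch of the assumed behavior of the first helper (class name and implementation are illustrative, not the Doris code):

    import java.util.Optional;

    public class FirstPresentOrDefaultSketch {
        // Assumed semantics, inferred from the call sites above:
        // the first present Optional wins, otherwise use the default.
        @SafeVarargs
        static <T> T firstPresentOrDefault(T defaultValue, Optional<T>... candidates) {
            for (Optional<T> candidate : candidates) {
                if (candidate.isPresent()) {
                    return candidate.get();
                }
            }
            return defaultValue;
        }

        public static void main(String[] args) {
            // Mirrors the field-delimiter case above: "field.delim" wins over
            // "serialization.format"; "\1" (Hive's \u0001) is the last resort.
            System.out.println(firstPresentOrDefault("\1", Optional.of("|"), Optional.empty()));           // "|"
            System.out.println(firstPresentOrDefault("\1", Optional.empty(), Optional.empty()).codePointAt(0)); // 1
        }
    }
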
protected TDataSinkType getDataSinkType() {
return TDataSinkType.HIVE_TABLE_SINK;
}


@@ -197,6 +197,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";
public static final String HIVE_TEXT_COMPRESSION = "hive_text_compression";
public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null";
public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
@@ -1075,6 +1077,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
private boolean enableSyncRuntimeFilterSize = true;
@VariableMgr.VarAttr(name = HIVE_TEXT_COMPRESSION, needForward = true)
private String hiveTextCompression = "uncompressed";
@VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
description = {"在读取csv文件时是否读取csv的空行为null",
"Determine whether to read empty rows in CSV files as NULL when reading CSV files."})
@@ -3876,6 +3881,14 @@ public class SessionVariable implements Serializable, Writable {
return enablePipelineXEngine;
}
public String hiveTextCompression() {
return hiveTextCompression;
}
public void setHiveTextCompression(String hiveTextCompression) {
this.hiveTextCompression = hiveTextCompression;
}
public boolean enableSyncRuntimeFilterSize() {
return enableSyncRuntimeFilterSize;
}