…0063)" This reverts commit c6df7c21a3c09ae1664deabacb88dfcea9d94b68. ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> Co-authored-by: yiguolei <yiguolei@gmail.com>
@@ -569,7 +569,6 @@ public class Util {
     *
     * @param path of file to be inferred.
     */
    @NotNull
    public static TFileCompressType inferFileCompressTypeByPath(String path) {
        String lowerCasePath = path.toLowerCase();
@@ -587,8 +586,6 @@ public class Util {
            return TFileCompressType.DEFLATE;
        } else if (lowerCasePath.endsWith(".snappy")) {
            return TFileCompressType.SNAPPYBLOCK;
        } else if (lowerCasePath.endsWith(".zst") || lowerCasePath.endsWith(".zstd")) {
            return TFileCompressType.ZSTD;
        } else {
            return TFileCompressType.PLAIN;
        }
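For context while reviewing the revert: the two `Util` hunks above are the suffix-based compression inference. A minimal standalone sketch of that mapping (the `.gz` and `.deflate` conditions are cut off in the hunk, so those suffixes are assumptions; a local enum stands in for the thrift `TFileCompressType` so the example compiles on its own):

```java
enum CompressType { GZ, DEFLATE, SNAPPYBLOCK, ZSTD, PLAIN }

final class InferSketch {
    // Mirrors the if/else chain above; the earlier branches are cut off
    // in the hunk, so the ".gz" and ".deflate" suffixes are assumed.
    static CompressType infer(String path) {
        String lowerCasePath = path.toLowerCase();
        if (lowerCasePath.endsWith(".gz")) {
            return CompressType.GZ;
        } else if (lowerCasePath.endsWith(".deflate")) {
            return CompressType.DEFLATE;
        } else if (lowerCasePath.endsWith(".snappy")) {
            return CompressType.SNAPPYBLOCK;
        } else if (lowerCasePath.endsWith(".zst") || lowerCasePath.endsWith(".zstd")) {
            return CompressType.ZSTD;
        } else {
            return CompressType.PLAIN;
        }
    }

    public static void main(String[] args) {
        System.out.println(infer("/warehouse/part-0.ZST")); // ZSTD
        System.out.println(infer("/warehouse/part-0.csv")); // PLAIN
    }
}
```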
@@ -80,10 +80,6 @@ public abstract class BaseExternalTableDataSink extends DataSink {
            return TFileCompressType.ZLIB;
        } else if ("zstd".equalsIgnoreCase(compressType)) {
            return TFileCompressType.ZSTD;
        } else if ("gzip".equalsIgnoreCase(compressType)) {
            return TFileCompressType.GZ;
        } else if ("bzip2".equalsIgnoreCase(compressType)) {
            return TFileCompressType.BZ2;
        } else if ("uncompressed".equalsIgnoreCase(compressType)) {
            return TFileCompressType.PLAIN;
        } else {
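This hunk maps user-facing codec names onto the same thrift enum, case-insensitively. A hedged sketch of the same mapping as a lookup table (the trailing else-branch is elided in the hunk, so the unknown-name behavior below is an assumption, not necessarily what Doris does):

```java
import java.util.Locale;
import java.util.Map;

final class CodecNameSketch {
    // Name-to-constant pairs taken from the branches above.
    private static final Map<String, String> NAME_TO_TYPE = Map.of(
            "zlib", "ZLIB",
            "zstd", "ZSTD",
            "gzip", "GZ",
            "bzip2", "BZ2",
            "uncompressed", "PLAIN");

    static String resolve(String compressType) {
        String type = NAME_TO_TYPE.get(compressType.toLowerCase(Locale.ROOT));
        if (type == null) {
            // The else-branch is cut off in the hunk; rejecting unknown
            // names here is an assumption.
            throw new IllegalArgumentException("unknown codec: " + compressType);
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(resolve("GZIP")); // GZ
    }
}
```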
@@ -25,7 +25,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -39,7 +38,6 @@ import org.apache.doris.thrift.THiveColumn;
import org.apache.doris.thrift.THiveColumnType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartition;
import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -52,23 +50,9 @@ import java.util.Set;
import java.util.stream.Collectors;

public class HiveTableSink extends BaseExternalTableDataSink {
    public static final String PROP_FIELD_DELIMITER = "field.delim";
    public static final String DEFAULT_FIELD_DELIMITER = "\1";
    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
    public static final String PROP_LINE_DELIMITER = "line.delim";
    public static final String DEFAULT_LINE_DELIMITER = "\n";
    public static final String PROP_COLLECT_DELIMITER = "collection.delim";
    public static final String DEFAULT_COLLECT_DELIMITER = "\2";
    public static final String PROP_MAPKV_DELIMITER = "mapkv.delim";
    public static final String DEFAULT_MAPKV_DELIMITER = "\3";
    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
    public static final String PROP_NULL_FORMAT = "serialization.null.format";
    public static final String DEFAULT_NULL_FORMAT = "\\N";

    private final HMSExternalTable targetTable;
    private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
        add(TFileFormatType.FORMAT_CSV_PLAIN);
        add(TFileFormatType.FORMAT_ORC);
        add(TFileFormatType.FORMAT_PARQUET);
    }};
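The defaults above are Hive's standard text SerDe delimiters: `\1` (Ctrl-A) between fields, `\2` between collection items, `\3` between map keys and values, with NULL serialized as the literal `\N`. A quick check of the octal escapes as code points:

```java
final class DelimiterCheck {
    public static void main(String[] args) {
        // Java octal char escapes: '\1' is U+0001 (Ctrl-A), and so on.
        System.out.printf("field=%d collection=%d mapkv=%d%n",
                (int) '\1', (int) '\2', (int) '\3'); // field=1 collection=2 mapkv=3
    }
}
```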
@@ -131,7 +115,6 @@ public class HiveTableSink extends BaseExternalTableDataSink {
        TFileFormatType formatType = getTFileFormatType(sd.getInputFormat());
        tSink.setFileFormat(formatType);
        setCompressType(tSink, formatType);
        setSerDeProperties(tSink);

        THiveLocationParams locationParams = new THiveLocationParams();
        LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
@@ -181,9 +164,6 @@ public class HiveTableSink extends BaseExternalTableDataSink {
            case FORMAT_PARQUET:
                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
                break;
            case FORMAT_CSV_PLAIN:
                compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
                break;
            default:
                compressType = "uncompressed";
                break;
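The switch above picks the codec source per file format: Parquet from the table's `parquet.compression` parameter, plain-text CSV from the `hive_text_compression` session variable, anything else `uncompressed`. A self-contained sketch of that policy (the fallback when the table parameter is unset is an assumption, and the ORC branch is elided in the hunk so it is omitted here):

```java
import java.util.Map;

final class CompressPolicySketch {
    static String select(String format, Map<String, String> tableParams, String hiveTextCompression) {
        switch (format) {
            case "parquet":
                // Falling back to "uncompressed" when the parameter is
                // missing is an assumption; the hunk only shows the lookup.
                return tableParams.getOrDefault("parquet.compression", "uncompressed");
            case "csv":
                return hiveTextCompression;
            default:
                return "uncompressed";
        }
    }

    public static void main(String[] args) {
        System.out.println(select("csv", Map.of(), "gzip")); // gzip
    }
}
```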
@@ -214,45 +194,6 @@ public class HiveTableSink extends BaseExternalTableDataSink {
        tSink.setPartitions(partitions);
    }

    private void setSerDeProperties(THiveTableSink tSink) {
        THiveSerDeProperties serDeProperties = new THiveSerDeProperties();
        // 1. set field delimiter
        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_FIELD_DELIMITER);
        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_SERIALIZATION_FORMAT);
        serDeProperties.setFieldDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)));
        // 2. set line delimiter
        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_LINE_DELIMITER);
        serDeProperties.setLineDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_LINE_DELIMITER, lineDelim)));
        // 3. set collection delimiter
        Optional<String> collectDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_COLLECT_DELIMITER);
        serDeProperties
                .setCollectionDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                        DEFAULT_COLLECT_DELIMITER, collectDelim)));
        // 4. set mapkv delimiter
        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_MAPKV_DELIMITER);
        serDeProperties.setMapkvDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_MAPKV_DELIMITER, mapkvDelim)));
        // 5. set escape delimiter
        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_ESCAPE_DELIMITER);
        serDeProperties
                .setEscapeChar(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                        DEFAULT_ESCAPE_DELIMIER, escapeDelim)));
        // 6. set null format
        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_NULL_FORMAT);
        serDeProperties.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_NULL_FORMAT, nullFormat));
        tSink.setSerdeProperties(serDeProperties);
    }

    protected TDataSinkType getDataSinkType() {
        return TDataSinkType.HIVE_TABLE_SINK;
    }
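`setSerDeProperties` leans on `HiveMetaStoreClientHelper.firstPresentOrDefault`, which from its call sites takes a default value followed by candidate `Optional`s. A sketch of that apparent contract (the implementation below is inferred from usage, not copied from Doris):

```java
import java.util.Optional;

final class FirstPresentSketch {
    // Inferred contract: return the first present Optional, else the default.
    @SafeVarargs
    static <T> T firstPresentOrDefault(T defaultValue, Optional<T>... candidates) {
        for (Optional<T> candidate : candidates) {
            if (candidate.isPresent()) {
                return candidate.get();
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        // field.delim, when set, wins over serialization.format.
        Optional<String> fieldDelim = Optional.of("|");
        Optional<String> serFormat = Optional.of(",");
        System.out.println(firstPresentOrDefault("\1", fieldDelim, serFormat)); // |
    }
}
```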
@@ -197,8 +197,6 @@ public class SessionVariable implements Serializable, Writable {

    public static final String ENABLE_SYNC_RUNTIME_FILTER_SIZE = "enable_sync_runtime_filter_size";

    public static final String HIVE_TEXT_COMPRESSION = "hive_text_compression";

    public static final String READ_CSV_EMPTY_LINE_AS_NULL = "read_csv_empty_line_as_null";

    public static final String BE_NUMBER_FOR_TEST = "be_number_for_test";
@@ -1077,9 +1075,6 @@ public class SessionVariable implements Serializable, Writable {
    @VariableMgr.VarAttr(name = ENABLE_SYNC_RUNTIME_FILTER_SIZE, needForward = true)
    private boolean enableSyncRuntimeFilterSize = true;

    @VariableMgr.VarAttr(name = HIVE_TEXT_COMPRESSION, needForward = true)
    private String hiveTextCompression = "uncompressed";

    @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
            description = {"在读取csv文件时是否读取csv的空行为null",
                    "Determine whether to read empty rows in CSV files as NULL when reading CSV files."})
@@ -3881,14 +3876,6 @@ public class SessionVariable implements Serializable, Writable {
        return enablePipelineXEngine;
    }

    public String hiveTextCompression() {
        return hiveTextCompression;
    }

    public void setHiveTextCompression(String hiveTextCompression) {
        this.hiveTextCompression = hiveTextCompression;
    }

    public boolean enableSyncRuntimeFilterSize() {
        return enableSyncRuntimeFilterSize;
    }
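The accessor pair removed here was the hook `HiveTableSink.setCompressType` used to read the session variable. A minimal stand-in showing the round trip (the real class is `org.apache.doris.qe.SessionVariable`; this sketch only mirrors the hunk):

```java
final class SessionVariableSketch {
    private String hiveTextCompression = "uncompressed"; // default from the hunk

    public String hiveTextCompression() {
        return hiveTextCompression;
    }

    public void setHiveTextCompression(String hiveTextCompression) {
        this.hiveTextCompression = hiveTextCompression;
    }

    public static void main(String[] args) {
        SessionVariableSketch sv = new SessionVariableSketch();
        sv.setHiveTextCompression("gzip");
        System.out.println(sv.hiveTextCompression()); // gzip
    }
}
```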