cherry pick from #41860

Co-authored-by: wuwenchi <wuwenchihdu@hotmail.com>
HiveProperties.java (new file):

@@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.hive;

import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class HiveProperties {
    public static final String PROP_FIELD_DELIMITER = "field.delim";
    public static final String PROP_SEPARATOR_CHAR = "separatorChar";
    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
    public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"

    public static final String PROP_LINE_DELIMITER = "line.delim";
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    public static final String PROP_QUOTE_CHAR = "quoteChar";

    public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim";
    public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim";
    public static final String DEFAULT_COLLECTION_DELIMITER = "\2";

    public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
    public static final String DEFAULT_MAP_KV_DELIMITER = "\003";

    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";

    public static final String PROP_NULL_FORMAT = "serialization.null.format";
    public static final String DEFAULT_NULL_FORMAT = "\\N";

    public static final Set<String> HIVE_SERDE_PROPERTIES = ImmutableSet.of(
            PROP_FIELD_DELIMITER,
            PROP_COLLECTION_DELIMITER_HIVE2,
            PROP_COLLECTION_DELIMITER_HIVE3,
            PROP_SEPARATOR_CHAR,
            PROP_SERIALIZATION_FORMAT,
            PROP_LINE_DELIMITER,
            PROP_QUOTE_CHAR,
            PROP_MAP_KV_DELIMITER,
            PROP_ESCAPE_DELIMITER,
            PROP_NULL_FORMAT
    );

    public static String getFieldDelimiter(Table table) {
        // This method is used for text format.
        // If you need compatibility with csv format, please use `getColumnSeparator`.
        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat));
    }

    public static String getColumnSeparator(Table table) {
        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_FIELD_DELIMITER);
        Optional<String> columnSeparator = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SEPARATOR_CHAR);
        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_SERIALIZATION_FORMAT);
        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat));
    }

    public static String getLineDelimiter(Table table) {
        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_LINE_DELIMITER);
        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_LINE_DELIMITER, lineDelim));
    }

    public static String getMapKvDelimiter(Table table) {
        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_MAP_KV_DELIMITER);
        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_MAP_KV_DELIMITER, mapkvDelim));
    }

    public static String getCollectionDelimiter(Table table) {
        Optional<String> collectionDelimHive2 = HiveMetaStoreClientHelper.getSerdeProperty(table,
                PROP_COLLECTION_DELIMITER_HIVE2);
        Optional<String> collectionDelimHive3 = HiveMetaStoreClientHelper.getSerdeProperty(table,
                PROP_COLLECTION_DELIMITER_HIVE3);
        return HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2, collectionDelimHive3));
    }

    public static Optional<String> getQuoteChar(Table table) {
        Map<String, String> serdeParams = table.getSd().getSerdeInfo().getParameters();
        if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
            return Optional.of(serdeParams.get(PROP_QUOTE_CHAR));
        }
        return Optional.empty();
    }

    public static Optional<String> getEscapeDelimiter(Table table) {
        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_ESCAPE_DELIMITER);
        if (escapeDelim.isPresent()) {
            String escape = HiveMetaStoreClientHelper.getByte(escapeDelim.get());
            if (escape != null) {
                return Optional.of(escape);
            } else {
                return Optional.of(DEFAULT_ESCAPE_DELIMIER);
            }
        }
        return Optional.empty();
    }

    public static String getNullFormat(Table table) {
        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(table, PROP_NULL_FORMAT);
        return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_NULL_FORMAT, nullFormat);
    }

    // Set properties to table
    public static void setTableProperties(Table table, Map<String, String> properties) {
        HashMap<String, String> serdeProps = new HashMap<>();
        HashMap<String, String> tblProps = new HashMap<>();

        for (String k : properties.keySet()) {
            if (HIVE_SERDE_PROPERTIES.contains(k)) {
                serdeProps.put(k, properties.get(k));
            } else {
                tblProps.put(k, properties.get(k));
            }
        }

        if (table.getParameters() == null) {
            table.setParameters(tblProps);
        } else {
            table.getParameters().putAll(tblProps);
        }

        if (table.getSd().getSerdeInfo().getParameters() == null) {
            table.getSd().getSerdeInfo().setParameters(serdeProps);
        } else {
            table.getSd().getSerdeInfo().getParameters().putAll(serdeProps);
        }
    }
}

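A brief usage sketch of the new helper (illustration only, not part of the commit; the sketch class name is hypothetical and the metastore Table is built by hand): properties that are present in the serde parameters are returned as-is, while absent ones fall back to the defaults declared above. This mirrors what HiveScanNode and HiveTableSink do with the real metastore table in the hunks further down.

import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration class, not part of this commit.
public class HivePropertiesUsageSketch {
    public static void main(String[] args) {
        // Build a bare metastore Table whose serde parameters only set a quote char.
        SerDeInfo serdeInfo = new SerDeInfo();
        Map<String, String> serdeParams = new HashMap<>();
        serdeParams.put(HiveProperties.PROP_QUOTE_CHAR, "\"");
        serdeInfo.setParameters(serdeParams);
        StorageDescriptor sd = new StorageDescriptor();
        sd.setSerdeInfo(serdeInfo);
        Table table = new Table();
        table.setSd(sd);
        // Empty table parameters, in case the helper also consults them.
        table.setParameters(new HashMap<>());

        // A property that is present is returned as-is ...
        Optional<String> quote = HiveProperties.getQuoteChar(table);  // Optional.of("\"")
        // ... while absent ones fall back to the defaults in HiveProperties.
        String nullFormat = HiveProperties.getNullFormat(table);      // "\\N" (DEFAULT_NULL_FORMAT)
        String lineDelim = HiveProperties.getLineDelimiter(table);    // falls back to DEFAULT_LINE_DELIMITER
        System.out.println(quote + " " + nullFormat + " " + lineDelim);
    }
}
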
HiveUtil.java:

@@ -25,6 +25,7 @@ import org.apache.doris.datasource.statistics.CommonStatistics;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -69,6 +70,8 @@ public final class HiveUtil {
    public static final String COMPRESSION_KEY = "compression";
    public static final Set<String> SUPPORTED_ORC_COMPRESSIONS = ImmutableSet.of("plain", "zlib", "snappy", "zstd");
    public static final Set<String> SUPPORTED_PARQUET_COMPRESSIONS = ImmutableSet.of("plain", "snappy", "zstd");
    public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
            ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");

    private HiveUtil() {
    }
@@ -191,7 +194,6 @@
        Table table = new Table();
        table.setDbName(hiveTable.getDbName());
        table.setTableName(hiveTable.getTableName());
        // table.setOwner("");
        int createTime = (int) System.currentTimeMillis() * 1000;
        table.setCreateTime(createTime);
        table.setLastAccessTime(createTime);
@@ -211,10 +213,10 @@
        setCompressType(hiveTable, props);
        // set hive table comment by table properties
        props.put("comment", hiveTable.getComment());
        table.setParameters(props);
        if (props.containsKey("owner")) {
            table.setOwner(props.get("owner"));
        }
        HiveProperties.setTableProperties(table, props);
        return table;
    }

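The HiveProperties.setTableProperties call introduced above is what routes user-supplied properties either into the table parameters or into the SerDe parameters. A minimal sketch of that split with hand-built metastore objects (illustration only, not part of the commit; the sketch class name is hypothetical):

import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, not part of this commit.
public class SetTablePropertiesSketch {
    public static void main(String[] args) {
        Table table = new Table();
        StorageDescriptor sd = new StorageDescriptor();
        sd.setSerdeInfo(new SerDeInfo());
        table.setSd(sd);

        Map<String, String> props = new HashMap<>();
        props.put("field.delim", "\t");      // listed in HIVE_SERDE_PROPERTIES -> serde parameters
        props.put("comment", "demo table");  // anything else -> table parameters

        HiveProperties.setTableProperties(table, props);
        System.out.println(table.getParameters());                        // {comment=demo table}
        System.out.println(table.getSd().getSerdeInfo().getParameters()); // {field.delim=<tab>}
    }
}

Keeping the serde keys on the SerDeInfo matters because that is where Hive's LazySimpleSerDe looks for delimiters such as field.delim when it reads the table.
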
@@ -232,6 +234,12 @@
                throw new AnalysisException("Unsupported orc compression type " + compression);
            }
            props.putIfAbsent("orc.compress", StringUtils.isEmpty(compression) ? "zlib" : compression);
        } else if (fileFormat.equalsIgnoreCase("text")) {
            if (StringUtils.isNotEmpty(compression) && !SUPPORTED_TEXT_COMPRESSIONS.contains(compression)) {
                throw new AnalysisException("Unsupported text compression type " + compression);
            }
            props.putIfAbsent("text.compression", StringUtils.isEmpty(compression)
                    ? ConnectContext.get().getSessionVariable().hiveTextCompression() : compression);
        } else {
            throw new IllegalArgumentException("Compression is not supported on " + fileFormat);
        }
@@ -249,7 +257,7 @@
        sd.setBucketCols(bucketCols);
        sd.setNumBuckets(numBuckets);
        Map<String, String> parameters = new HashMap<>();
        parameters.put("tag", "doris external hive talbe");
        parameters.put("tag", "doris external hive table");
        sd.setParameters(parameters);
        return sd;
    }
@@ -266,6 +274,10 @@
            inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
            outputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
            serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
        } else if (fileFormat.equalsIgnoreCase("text")) {
            inputFormat = "org.apache.hadoop.mapred.TextInputFormat";
            outputFormat = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
            serDe = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
        } else {
            throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat);
        }

HiveScanNode.java:

@@ -38,6 +38,7 @@ import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveMetaStoreCache.FileCacheValue;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.datasource.hive.source.HiveSplit.HiveSplitCreator;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
@@ -57,6 +58,7 @@ import com.google.common.collect.Maps;
import lombok.Setter;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -65,7 +67,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -77,26 +78,6 @@ import java.util.stream.Collectors;
public class HiveScanNode extends FileQueryScanNode {
    private static final Logger LOG = LogManager.getLogger(HiveScanNode.class);

    public static final String PROP_FIELD_DELIMITER = "field.delim";
    public static final String DEFAULT_FIELD_DELIMITER = "\1"; // "\x01"
    public static final String PROP_LINE_DELIMITER = "line.delim";
    public static final String DEFAULT_LINE_DELIMITER = "\n";
    public static final String PROP_SEPARATOR_CHAR = "separatorChar";
    public static final String PROP_QUOTE_CHAR = "quoteChar";
    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";

    public static final String PROP_COLLECTION_DELIMITER_HIVE2 = "colelction.delim";
    public static final String PROP_COLLECTION_DELIMITER_HIVE3 = "collection.delim";
    public static final String DEFAULT_COLLECTION_DELIMITER = "\2";

    public static final String PROP_MAP_KV_DELIMITER = "mapkey.delim";
    public static final String DEFAULT_MAP_KV_DELIMITER = "\003";

    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
    public static final String PROP_NULL_FORMAT = "serialization.null.format";
    public static final String DEFAULT_NULL_FORMAT = "\\N";

    protected final HMSExternalTable hmsTable;
    private HiveTransaction hiveTransaction = null;

@@ -431,57 +412,21 @@ public class HiveScanNode extends FileQueryScanNode {
    @Override
    protected TFileAttributes getFileAttributes() throws UserException {
        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();

        Table table = hmsTable.getRemoteTable();
        // 1. set column separator
        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_FIELD_DELIMITER);
        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_SERIALIZATION_FORMAT);
        Optional<String> columnSeparator = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_SEPARATOR_CHAR);
        textParams.setColumnSeparator(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_FIELD_DELIMITER, fieldDelim, columnSeparator, serFormat)));
        textParams.setColumnSeparator(HiveProperties.getColumnSeparator(table));
        // 2. set line delimiter
        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_LINE_DELIMITER);
        textParams.setLineDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_LINE_DELIMITER, lineDelim)));
        textParams.setLineDelimiter(HiveProperties.getLineDelimiter(table));
        // 3. set mapkv delimiter
        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_MAP_KV_DELIMITER);
        textParams.setMapkvDelimiter(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_MAP_KV_DELIMITER, mapkvDelim)));
        textParams.setMapkvDelimiter(HiveProperties.getMapKvDelimiter(table));
        // 4. set collection delimiter
        Optional<String> collectionDelimHive2 = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_COLLECTION_DELIMITER_HIVE2);
        Optional<String> collectionDelimHive3 = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_COLLECTION_DELIMITER_HIVE3);
        textParams.setCollectionDelimiter(
                HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                        DEFAULT_COLLECTION_DELIMITER, collectionDelimHive2, collectionDelimHive3)));
        textParams.setCollectionDelimiter(HiveProperties.getCollectionDelimiter(table));
        // 5. set quote char
        Map<String, String> serdeParams = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
        if (serdeParams.containsKey(PROP_QUOTE_CHAR)) {
            textParams.setEnclose(serdeParams.get(PROP_QUOTE_CHAR).getBytes()[0]);
        }
        HiveProperties.getQuoteChar(table).ifPresent(d -> textParams.setEnclose(d.getBytes()[0]));
        // 6. set escape delimiter
        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_ESCAPE_DELIMITER);
        if (escapeDelim.isPresent()) {
            String escape = HiveMetaStoreClientHelper.getByte(
                    escapeDelim.get());
            if (escape != null) {
                textParams
                        .setEscape(escape.getBytes()[0]);
            } else {
                textParams.setEscape(DEFAULT_ESCAPE_DELIMIER.getBytes()[0]);
            }
        }
        HiveProperties.getEscapeDelimiter(table).ifPresent(d -> textParams.setEscape(d.getBytes()[0]));
        // 7. set null format
        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(hmsTable.getRemoteTable(),
                PROP_NULL_FORMAT);
        textParams.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_NULL_FORMAT, nullFormat));
        textParams.setNullFormat(HiveProperties.getNullFormat(table));

        TFileAttributes fileAttributes = new TFileAttributes();
        fileAttributes.setTextParams(textParams);

IcebergApiSource.java:

@@ -21,12 +21,10 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;

import org.apache.iceberg.Table;

@@ -74,11 +72,6 @@ public class IcebergApiSource implements IcebergSource {
        return icebergExtTable;
    }

    @Override
    public TFileAttributes getFileAttributes() throws UserException {
        return new TFileAttributes();
    }

    @Override
    public ExternalCatalog getCatalog() {
        return icebergExtTable.getCatalog();

IcebergHMSSource.java:

@@ -22,14 +22,10 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileTextScanRangeParams;

import java.util.Map;

@@ -70,18 +66,6 @@ public class IcebergHMSSource implements IcebergSource {
        return hmsTable;
    }

    @Override
    public TFileAttributes getFileAttributes() throws UserException {
        TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
        textParams.setColumnSeparator(hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
                .getOrDefault(HiveScanNode.PROP_FIELD_DELIMITER, HiveScanNode.DEFAULT_FIELD_DELIMITER));
        textParams.setLineDelimiter(HiveScanNode.DEFAULT_LINE_DELIMITER);
        TFileAttributes fileAttributes = new TFileAttributes();
        fileAttributes.setTextParams(textParams);
        fileAttributes.setHeaderType("");
        return fileAttributes;
    }

    @Override
    public ExternalCatalog getCatalog() {
        return hmsTable.getCatalog();

IcebergSource.java:

@@ -21,9 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TFileAttributes;

public interface IcebergSource {

@@ -33,8 +31,6 @@ public interface IcebergSource {

    TableIf getTargetTable();

    TFileAttributes getFileAttributes() throws UserException;

    ExternalCatalog getCatalog();

    String getFileFormat() throws DdlException, MetaNotFoundException;

HiveTableSink.java:

@@ -25,7 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.datasource.hive.HiveProperties;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
@@ -42,7 +42,9 @@ import org.apache.doris.thrift.THivePartition;
import org.apache.doris.thrift.THiveSerDeProperties;
import org.apache.doris.thrift.THiveTableSink;

import com.google.common.base.Strings;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

import java.util.ArrayList;
import java.util.HashSet;
@@ -52,20 +54,6 @@ import java.util.Set;
import java.util.stream.Collectors;

public class HiveTableSink extends BaseExternalTableDataSink {
    public static final String PROP_FIELD_DELIMITER = "field.delim";
    public static final String DEFAULT_FIELD_DELIMITER = "\1";
    public static final String PROP_SERIALIZATION_FORMAT = "serialization.format";
    public static final String PROP_LINE_DELIMITER = "line.delim";
    public static final String DEFAULT_LINE_DELIMITER = "\n";
    public static final String PROP_COLLECT_DELIMITER = "collection.delim";
    public static final String DEFAULT_COLLECT_DELIMITER = "\2";
    public static final String PROP_MAPKV_DELIMITER = "mapkv.delim";
    public static final String DEFAULT_MAPKV_DELIMITER = "\3";
    public static final String PROP_ESCAPE_DELIMITER = "escape.delim";
    public static final String DEFAULT_ESCAPE_DELIMIER = "\\";
    public static final String PROP_NULL_FORMAT = "serialization.null.format";
    public static final String DEFAULT_NULL_FORMAT = "\\N";

    private final HMSExternalTable targetTable;
    private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
        add(TFileFormatType.FORMAT_CSV_PLAIN);
@@ -184,10 +172,13 @@
                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
                break;
            case FORMAT_CSV_PLAIN:
                compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
                compressType = targetTable.getRemoteTable().getParameters().get("text.compression");
                if (Strings.isNullOrEmpty(compressType)) {
                    compressType = ConnectContext.get().getSessionVariable().hiveTextCompression();
                }
                break;
            default:
                compressType = "uncompressed";
                compressType = "plain";
                break;
        }
        tSink.setCompressionType(getTFileCompressType(compressType));
@@ -218,47 +209,19 @@

    private void setSerDeProperties(THiveTableSink tSink) {
        THiveSerDeProperties serDeProperties = new THiveSerDeProperties();
        Table table = targetTable.getRemoteTable();
        // 1. set field delimiter
        Optional<String> fieldDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_FIELD_DELIMITER);
        Optional<String> serFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_SERIALIZATION_FORMAT);
        serDeProperties.setFieldDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_FIELD_DELIMITER, fieldDelim, serFormat)));
        serDeProperties.setFieldDelim(HiveProperties.getFieldDelimiter(table));
        // 2. set line delimiter
        Optional<String> lineDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_LINE_DELIMITER);
        serDeProperties.setLineDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_LINE_DELIMITER, lineDelim)));
        serDeProperties.setLineDelim(HiveProperties.getLineDelimiter(table));
        // 3. set collection delimiter
        Optional<String> collectDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_COLLECT_DELIMITER);
        serDeProperties
                .setCollectionDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                        DEFAULT_COLLECT_DELIMITER, collectDelim)));
        serDeProperties.setCollectionDelim(HiveProperties.getCollectionDelimiter(table));
        // 4. set mapkv delimiter
        Optional<String> mapkvDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_MAPKV_DELIMITER);
        serDeProperties.setMapkvDelim(HiveMetaStoreClientHelper.getByte(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_MAPKV_DELIMITER, mapkvDelim)));
        serDeProperties.setMapkvDelim(HiveProperties.getMapKvDelimiter(table));
        // 5. set escape delimiter
        Optional<String> escapeDelim = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_ESCAPE_DELIMITER);
        if (escapeDelim.isPresent()) {
            String escape = HiveMetaStoreClientHelper.getByte(
                    escapeDelim.get());
            if (escape != null) {
                serDeProperties
                        .setEscapeChar(escape);
            } else {
                serDeProperties.setEscapeChar(DEFAULT_ESCAPE_DELIMIER);
            }
        }
        HiveProperties.getEscapeDelimiter(table).ifPresent(serDeProperties::setEscapeChar);
        // 6. set null format
        Optional<String> nullFormat = HiveMetaStoreClientHelper.getSerdeProperty(targetTable.getRemoteTable(),
                PROP_NULL_FORMAT);
        serDeProperties.setNullFormat(HiveMetaStoreClientHelper.firstPresentOrDefault(
                DEFAULT_NULL_FORMAT, nullFormat));
        serDeProperties.setNullFormat(HiveProperties.getNullFormat(table));
        tSink.setSerdeProperties(serDeProperties);
    }

SessionVariable.java:

@@ -1107,7 +1107,7 @@ public class SessionVariable implements Serializable, Writable {
    private boolean enableSyncRuntimeFilterSize = true;

    @VariableMgr.VarAttr(name = HIVE_TEXT_COMPRESSION, needForward = true)
    private String hiveTextCompression = "uncompressed";
    private String hiveTextCompression = "plain";

    @VariableMgr.VarAttr(name = READ_CSV_EMPTY_LINE_AS_NULL, needForward = true,
            description = {"在读取csv文件时是否读取csv的空行为null",
@@ -3981,6 +3981,10 @@
    }

    public String hiveTextCompression() {
        if (hiveTextCompression.equals("uncompressed")) {
            // This is for compatibility.
            return "plain";
        }
        return hiveTextCompression;
    }

Regression test suite test_hive_ddl_text_format (new file):

@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_hive_ddl_text_format", "p0,external,hive,external_docker,external_docker_hive") {
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String hms_port = context.config.otherConfigs.get("hive3HmsPort")
        String hdfs_port = context.config.otherConfigs.get("hive3HdfsPort")
        String catalog_name = "test_hive_ddl_text_format"
        String table_name = "table_with_pars";

        sql """drop catalog if exists ${catalog_name};"""

        sql """
            create catalog if not exists ${catalog_name} properties (
                'type'='hms',
                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
                'use_meta_cache' = 'true'
            );
        """
        logger.info("catalog " + catalog_name + " created")
        sql """switch ${catalog_name};"""
        logger.info("switched to catalog " + catalog_name)
        sql """use `default`;"""

        sql """ drop table if exists tb_text """
        sql """
            create table tb_text (
                id int,
                `name` string
            ) PROPERTIES (
                'compression'='gzip',
                'file_format'='text',
                'field.delim'='\t',
                'line.delim'='\n',
                'collection.delim'=';',
                'mapkey.delim'=':',
                'serialization.null.format'='\\N'
            );
        """

        String serde = "'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'"
        String input_format = "'org.apache.hadoop.mapred.TextInputFormat'"
        String output_format = "'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'"
        String doris_fileformat = "'doris.file_format'='text'"
        String filed_delim = "'field.delim'"
        String line_delim = "'line.delim'"
        String mapkey_delim = "'mapkey.delim'"

        def create_tbl_res = sql """ show create table tb_text """
        String res = create_tbl_res.toString()
        logger.info("${res}")
        assertTrue(res.containsIgnoreCase("${serde}"))
        assertTrue(res.containsIgnoreCase("${input_format}"))
        assertTrue(res.containsIgnoreCase("${output_format}"))
        assertTrue(res.containsIgnoreCase("${doris_fileformat}"))
        assertTrue(res.containsIgnoreCase("${filed_delim}"))
        assertTrue(res.containsIgnoreCase("${filed_delim}"))
        assertTrue(res.containsIgnoreCase("${line_delim}"))
        assertTrue(res.containsIgnoreCase("${mapkey_delim}"))
    }
}