[fix][refactor] Refactor schema init of external table and fix some parquet issues (#30325)

1. Skip parquet files that are only 4 bytes long, i.e. contain nothing but the PAR1 magic.
2. Refactor the schema init methods of Iceberg/Hudi/Hive tables in the HMS catalog:
   1. Remove some redundant `getIcebergTable` methods.
   2. Fix the issue described in #23771.
3. Support HoodieParquetInputFormatBase and treat it as a normal Hive table format.
4. When listing files, skip all hidden dirs and files.
.gitignore
@@ -54,6 +54,8 @@ thirdparty/installed*
thirdparty/doris-thirdparty*.tar.xz
docker/thirdparties/docker-compose/mysql/data
docker/thirdparties/docker-compose/hive/scripts/tpch1.db/
docker/thirdparties/docker-compose/hive/scripts/paimon1
fe_plugins/output
fe_plugins/**/.factorypath
@@ -238,8 +238,11 @@ Status ParquetReader::_open_file() {
}
if (_file_metadata == nullptr) {
SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
if (_file_reader->size() == 0) {
return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
if (_file_reader->size() <= sizeof(PARQUET_VERSION_NUMBER)) {
// Some system may generate parquet file with only 4 bytes: PAR1
// Should consider it as empty file.
return Status::EndOfFile("open file failed, empty parquet file {} with size: {}",
_scan_range.path, _file_reader->size());
}
size_t meta_size = 0;
if (_meta_cache == nullptr) {

@@ -928,4 +931,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData&
}
return column.data_page_offset;
}
} // namespace doris::vectorized
} // namespace doris::vectorized
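As a reference for readers outside the BE code, here is a minimal standalone sketch of the same size guard in plain Java (hypothetical class name, not the Doris reader, which does the equivalent check in C++ against the length of the PAR1 magic):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Usage: java ParquetSizeGuard <file>
    public class ParquetSizeGuard {
        private static final byte[] PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

        // A readable parquet file needs the 4-byte header magic, a footer, a footer
        // length and the 4-byte trailing magic; a file holding nothing but "PAR1"
        // can therefore be treated as empty and skipped instead of failing the scan.
        static boolean isEffectivelyEmpty(Path file) throws IOException {
            return Files.size(file) <= PARQUET_MAGIC.length;
        }

        public static void main(String[] args) throws IOException {
            Path file = Path.of(args[0]);
            if (isEffectivelyEmpty(file)) {
                System.out.println("skip empty parquet file: " + file + ", size=" + Files.size(file));
            } else {
                System.out.println("file is large enough to carry a footer: " + file);
            }
        }
    }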
@@ -770,7 +770,7 @@ public class HiveMetaStoreClientHelper {
try {
hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
} catch (Exception e) {
throw new RuntimeException("Cannot get hudi table schema.");
throw new RuntimeException("Cannot get hudi table schema.", e);
}
return hudiSchema;
}
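The only functional change in this hunk is that the caught exception is now passed as the cause of the RuntimeException, so the original Hudi/Avro stack trace is not lost. A generic, hypothetical sketch of the pattern (unrelated to Hudi):

    public class CauseChainingExample {
        static int parse(String s) {
            try {
                return Integer.parseInt(s);
            } catch (NumberFormatException e) {
                // Passing "e" as the cause keeps the original stack trace in the log,
                // instead of only the wrapper message.
                throw new RuntimeException("Cannot parse value: " + s, e);
            }
        }

        public static void main(String[] args) {
            // Intentionally fails: the printed trace shows the wrapper and its cause.
            parse("not-a-number");
        }
    }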
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;

@@ -62,8 +63,6 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -87,8 +86,10 @@ import java.util.stream.Collectors;
public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class);
private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
private static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
public static final Set<String> SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS;
public static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
private static final Map<StatsType, String> MAP_SPARK_STATS_TO_DORIS;
private static final String TBL_PROP_TXN_PROPERTIES = "transactional_properties";
private static final String TBL_PROP_INSERT_ONLY = "insert_only";

@@ -111,13 +112,16 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat");
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hadoop.mapred.TextInputFormat");
// Some hudi table use HoodieParquetInputFormatBase as input format
// But we can't treat it as hudi table.
// So add to SUPPORTED_HIVE_FILE_FORMATS and treat is as a hive table.
// Then Doris will just list the files from location and read parquet files directly.
SUPPORTED_HIVE_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormatBase");
SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.add("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat");
}
private static final Set<String> SUPPORTED_HUDI_FILE_FORMATS;
static {
SUPPORTED_HUDI_FILE_FORMATS = Sets.newHashSet();
SUPPORTED_HUDI_FILE_FORMATS.add("org.apache.hudi.hadoop.HoodieParquetInputFormat");
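A standalone sketch of the routing that this static block enables (hypothetical class and method names, not the Doris code): a table whose input format is HoodieParquetInputFormatBase is sent down the plain Hive path, so the files under the table location are simply listed and read as ordinary parquet, while HoodieParquetInputFormat still goes through the Hudi path:

    import java.util.Set;

    public class InputFormatRouting {
        static final Set<String> HIVE_FORMATS = Set.of(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat",
                // handled as a plain hive table: list files, read parquet directly
                "org.apache.hudi.hadoop.HoodieParquetInputFormatBase");

        static final Set<String> HUDI_FORMATS = Set.of(
                "org.apache.hudi.hadoop.HoodieParquetInputFormat");

        static String route(String inputFormatClassName) {
            if (HIVE_FORMATS.contains(inputFormatClassName)) {
                return "HIVE";
            }
            if (HUDI_FORMATS.contains(inputFormatClassName)) {
                return "HUDI";
            }
            return "UNSUPPORTED";
        }

        public static void main(String[] args) {
            System.out.println(route("org.apache.hudi.hadoop.HoodieParquetInputFormatBase")); // HIVE
            System.out.println(route("org.apache.hudi.hadoop.HoodieParquetInputFormat"));     // HUDI
        }
    }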
@@ -405,10 +409,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
}
public String getHiveVersion() {
return ((HMSExternalCatalog) catalog).getHiveVersion();
}
public Map<String, String> getCatalogProperties() {
return catalog.getProperties();
}

@@ -454,32 +454,28 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return initSchema();
}
@Override
public List<Column> initSchema() {
makeSureInitialized();
List<Column> columns;
List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
if (dlaType.equals(DLAType.ICEBERG)) {
columns = getIcebergSchema(schema);
columns = getIcebergSchema();
} else if (dlaType.equals(DLAType.HUDI)) {
columns = getHudiSchema(schema);
columns = getHudiSchema();
} else {
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
columns = tmpSchema;
columns = getHiveSchema();
}
initPartitionColumns(columns);
return columns;
}
public List<Column> getHudiSchema(List<FieldSchema> hmsSchema) {
private List<Column> getIcebergSchema() {
return IcebergUtils.getSchema(catalog, dbName, name);
}
private List<Column> getHudiSchema() {
org.apache.avro.Schema hudiSchema = HiveMetaStoreClientHelper.getHudiTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hudiSchema.getFields().size());
for (org.apache.avro.Schema.Field hudiField : hudiSchema.getFields()) {
String columnName = hudiField.name().toLowerCase(Locale.ROOT);
tmpSchema.add(new Column(columnName, HudiUtils.fromAvroHudiTypeToDorisType(hudiField.schema()),
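The refactor above reduces initSchema() to a single dispatch point that picks a per-format schema loader. A simplified standalone model of that shape, with hypothetical types rather than the real Doris classes:

    import java.util.List;

    public class SchemaDispatchSketch {
        enum DlaType { ICEBERG, HUDI, HIVE }

        interface SchemaLoader {
            List<String> load(String db, String table);
        }

        // One loader per table flavor; each one knows where the authoritative schema
        // lives (Iceberg metadata, the Hudi avro schema, or the HMS field list).
        static List<String> initSchema(DlaType type, String db, String table,
                                       SchemaLoader iceberg, SchemaLoader hudi, SchemaLoader hive) {
            switch (type) {
                case ICEBERG:
                    return iceberg.load(db, table);
                case HUDI:
                    return hudi.load(db, table);
                default:
                    return hive.load(db, table);
            }
        }

        public static void main(String[] args) {
            SchemaLoader fake = (db, table) -> List.of(db + "." + table + ".c1");
            System.out.println(initSchema(DlaType.ICEBERG, "db1", "t1", fake, fake, fake));
        }
    }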
@@ -488,6 +484,19 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return tmpSchema;
}
private List<Column> getHiveSchema() {
List<Column> columns;
List<FieldSchema> schema = ((HMSExternalCatalog) catalog).getClient().getSchema(dbName, name);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
columns = tmpSchema;
return columns;
}
@Override
public long getCacheRowCount() {
//Cached accurate information

@@ -528,20 +537,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return 1;
}
private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
true, null, true, false, null, field.getComment(), true, null,
schema.caseInsensitiveFindField(field.getName()).fieldId(), null));
}
return tmpSchema;
}
private void initPartitionColumns(List<Column> schema) {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());

@@ -598,7 +593,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return getHiveColumnStats(colName);
case ICEBERG:
return StatisticsUtil.getIcebergColumnStats(colName,
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
catalog, dbName, name
));
default:
LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
}
@@ -17,12 +17,10 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;

@@ -33,21 +31,11 @@ import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
public static final int ICEBERG_DATETIME_SCALE_MS = 6;
public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE);
}

@@ -65,66 +53,8 @@ public class IcebergExternalTable extends ExternalTable {
@Override
public List<Column> initSchema() {
return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
Schema schema = ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name).schema();
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
});
}
private Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
case BINARY:
case UUID:
return Type.STRING;
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;
return ScalarType.createCharType(fixed.length());
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP:
return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
case TIME:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
}
}
protected Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
if (type.isPrimitiveType()) {
return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
}
switch (type.typeId()) {
case LIST:
Types.ListType list = (Types.ListType) type;
return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
case MAP:
case STRUCT:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + type);
}
makeSureInitialized();
return IcebergUtils.getSchema(catalog, dbName, name);
}
@Override
@@ -358,14 +358,14 @@ public class HiveMetaStoreCache {
}
// Get File Status by using FileSystem API.
private FileCacheValue getFileCache(String location, InputFormat<?, ?> inputFormat,
JobConf jobConf,
List<String> partitionValues,
String bindBrokerName) throws UserException {
private FileCacheValue getFileCache(String location, String inputFormat,
JobConf jobConf,
List<String> partitionValues,
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName), jobConf, bindBrokerName));
location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
// For Tez engine, it may generate subdirectoies for "union" query.

@@ -425,12 +425,12 @@ public class HiveMetaStoreCache {
FileInputFormat.setInputPaths(jobConf, finalLocation.get());
try {
FileCacheValue result;
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
// TODO: This is a temp config, will remove it after the HiveSplitter is stable.
if (key.useSelfSplitter) {
result = getFileCache(finalLocation.get(), inputFormat, jobConf,
key.getPartitionValues(), key.bindBrokerName);
result = getFileCache(finalLocation.get(), key.inputFormat, jobConf,
key.getPartitionValues(), key.bindBrokerName);
} else {
InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
InputSplit[] splits;
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
if (!Strings.isNullOrEmpty(remoteUser)) {

@@ -1082,7 +1082,7 @@ public class HiveMetaStoreCache {
private static boolean isGeneratedPath(String name) {
return "_temporary".equals(name) // generated by spark
|| "_imapala_insert_staging".equals(name) // generated by impala
|| name.startsWith(".hive-staging"); // generated by hive
|| name.startsWith("."); // generated by hive or hidden file
}
}
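Point 4 of the commit message ("skip all hidden dirs and files") shows up here as the broader name.startsWith(".") check. A standalone sketch of the same filtering idea applied to a local directory listing (plain java.nio, hypothetical helper, not the Doris cache code):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class HiddenPathFilter {
        // In the same spirit as isGeneratedPath(): engine temp dirs and any
        // dot-prefixed entry are ignored when collecting data files.
        static boolean isHiddenOrGenerated(String name) {
            return name.startsWith(".") || name.startsWith("_temporary");
        }

        static List<Path> listDataFiles(Path dir) throws IOException {
            try (Stream<Path> entries = Files.list(dir)) {
                return entries
                        .filter(p -> !isHiddenOrGenerated(p.getFileName().toString()))
                        .collect(Collectors.toList());
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(listDataFiles(Path.of(args.length > 0 ? args[0] : ".")));
        }
    }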
@@ -106,6 +106,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, id, dbName, tblName, getProperties());
.getIcebergTable(this, dbName, tblName);
}
}
@@ -21,16 +21,14 @@ import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;

@@ -46,10 +44,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

@@ -134,7 +129,6 @@ public final class HiveUtil {
}
private static Type convertHiveTypeToiveDoris(TypeInfo hiveTypeInfo) {
switch (hiveTypeInfo.getCategory()) {
case PRIMITIVE: {
PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) hiveTypeInfo;

@@ -190,39 +184,14 @@ public final class HiveUtil {
}
}
public static boolean isSplittable(RemoteFileSystem remoteFileSystem, InputFormat<?, ?> inputFormat,
String location, JobConf jobConf) throws UserException {
public static boolean isSplittable(RemoteFileSystem remoteFileSystem, String inputFormat,
String location, JobConf jobConf) throws UserException {
if (remoteFileSystem instanceof BrokerFileSystem) {
return ((BrokerFileSystem) remoteFileSystem)
.isSplittable(location, inputFormat.getClass().getCanonicalName());
return ((BrokerFileSystem) remoteFileSystem).isSplittable(location, inputFormat);
}
// ORC uses a custom InputFormat but is always splittable
if (inputFormat.getClass().getSimpleName().equals("OrcInputFormat")) {
return true;
}
// use reflection to get isSplitable method on FileInputFormat
// ATTN: the method name is actually "isSplitable", but the right spell is "isSplittable"
Method method = null;
for (Class<?> clazz = inputFormat.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
try {
method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
break;
} catch (NoSuchMethodException ignored) {
LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
}
}
if (method == null) {
return false;
}
Path path = new Path(location);
try {
method.setAccessible(true);
return (boolean) method.invoke(inputFormat, FileSystemFactory.getNativeByPath(path, jobConf), path);
} catch (InvocationTargetException | IllegalAccessException | IOException e) {
throw new RuntimeException(e);
}
// All supported hive input format are splittable
return HMSExternalTable.SUPPORTED_HIVE_FILE_FORMATS.contains(inputFormat);
}
public static String getHivePartitionValue(String part) {

@@ -236,5 +205,4 @@ public final class HiveUtil {
throw new RuntimeException(e);
}
}
}
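The reflective probe for Hadoop's isSplitable() is replaced by a membership test on the input-format class name, on the assumption stated in the new comment that every supported Hive input format is splittable. A standalone sketch of that simplification (hypothetical names, not the Doris HiveUtil):

    import java.util.Set;

    public class SplittableCheckSketch {
        // Every input format the reader supports is known to be splittable,
        // so a name lookup replaces the old reflection over isSplitable().
        static final Set<String> SPLITTABLE_INPUT_FORMATS = Set.of(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat",
                "org.apache.hudi.hadoop.HoodieParquetInputFormatBase");

        static boolean isSplittable(String inputFormatClassName) {
            return SPLITTABLE_INPUT_FORMATS.contains(inputFormatClassName);
        }

        public static void main(String[] args) {
            System.out.println(isSplittable("org.apache.hadoop.mapred.TextInputFormat")); // true
            System.out.println(isSplittable("com.example.SomeCustomInputFormat"));        // false
        }
    }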
@@ -33,9 +33,17 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

@@ -45,19 +53,17 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
/**
* Iceberg utils
*/
public class IcebergUtils {
private static final Logger LOG = LogManager.getLogger(IcebergUtils.class);
private static ThreadLocal<Integer> columnIdThreadLocal = new ThreadLocal<Integer>() {
@Override
public Integer initialValue() {
return 0;
}
};
static long MILLIS_TO_NANO_TIME = 1000;
private static long MILLIS_TO_NANO_TIME = 1000;
// https://iceberg.apache.org/spec/#schemas-and-data-types
// All time and timestamp values are stored with microsecond precision
private static final int ICEBERG_DATETIME_SCALE_MS = 6;
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {

@@ -238,4 +244,74 @@ public class IcebergUtils {
}
return slotRef;
}
private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) {
switch (primitive.typeId()) {
case BOOLEAN:
return Type.BOOLEAN;
case INTEGER:
return Type.INT;
case LONG:
return Type.BIGINT;
case FLOAT:
return Type.FLOAT;
case DOUBLE:
return Type.DOUBLE;
case STRING:
case BINARY:
case UUID:
return Type.STRING;
case FIXED:
Types.FixedType fixed = (Types.FixedType) primitive;
return ScalarType.createCharType(fixed.length());
case DECIMAL:
Types.DecimalType decimal = (Types.DecimalType) primitive;
return ScalarType.createDecimalV3Type(decimal.precision(), decimal.scale());
case DATE:
return ScalarType.createDateV2Type();
case TIMESTAMP:
return ScalarType.createDatetimeV2Type(ICEBERG_DATETIME_SCALE_MS);
case TIME:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + primitive);
}
}
public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
if (type.isPrimitiveType()) {
return icebergPrimitiveTypeToDorisType((org.apache.iceberg.types.Type.PrimitiveType) type);
}
switch (type.typeId()) {
case LIST:
Types.ListType list = (Types.ListType) type;
return ArrayType.create(icebergTypeToDorisType(list.elementType()), true);
case MAP:
case STRUCT:
return Type.UNSUPPORTED;
default:
throw new IllegalArgumentException("Cannot transform unknown type: " + type);
}
}
/**
* Get iceberg schema from catalog and convert them to doris schema
*/
public static List<Column> getSchema(ExternalCatalog catalog, String dbName, String name) {
return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> {
org.apache.iceberg.Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(catalog, dbName, name);
Schema schema = icebergTable.schema();
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
IcebergUtils.icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
return tmpSchema;
});
}
}
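The conversion contract that getSchema() centralizes is small: lowercase the column name with Locale.ROOT, keep the Iceberg field id, and map the Iceberg type to a Doris type. A standalone sketch of that field-to-column step, with hypothetical record types standing in for the real classes:

    import java.util.List;
    import java.util.Locale;
    import java.util.stream.Collectors;

    public class IcebergSchemaConversionSketch {
        record IcebergField(int fieldId, String name, String type, String doc) {}
        record DorisColumn(String name, String type, boolean nullable, String comment, int fieldId) {}

        // Names are lowercased with Locale.ROOT and the Iceberg field id is carried
        // over, mirroring what the shared helper does with the real type objects.
        static List<DorisColumn> toDorisColumns(List<IcebergField> fields) {
            return fields.stream()
                    .map(f -> new DorisColumn(f.name().toLowerCase(Locale.ROOT), f.type(), true, f.doc(), f.fieldId()))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<IcebergField> fields = List.of(
                    new IcebergField(1, "OrderId", "long", "order key"),
                    new IcebergField(2, "CreatedAt", "timestamp", "microsecond precision"));
            toDorisColumns(fields).forEach(System.out::println);
        }
    }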
@@ -148,6 +148,10 @@ public class MTMVTask extends AbstractTask {
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
if (LOG.isDebugEnabled()) {
String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
LOG.debug("mtmv task session variable, taskId: {}, session: {}", super.getTaskId(), taskSessionContext);
}
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
@@ -24,7 +24,6 @@ import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;

@@ -48,11 +47,9 @@ public class IcebergApiSource implements IcebergSource {
this.icebergExtTable = table;
this.originTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
icebergExtTable.getCatalog().getId(),
icebergExtTable.getDbName(),
icebergExtTable.getName(),
icebergExtTable.getCatalog().getProperties());
icebergExtTable.getCatalog(),
icebergExtTable.getDbName(),
icebergExtTable.getName());
this.desc = desc;
}
@@ -47,7 +47,9 @@ public class IcebergHMSSource implements IcebergSource {
this.desc = desc;
this.columnNameToRange = columnNameToRange;
this.icebergTable =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(hmsTable);
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache()
.getIcebergTable(hmsTable.getCatalog(),
hmsTable.getDbName(), hmsTable.getName());
}
@Override
@@ -19,9 +19,7 @@ package org.apache.doris.planner.external.iceberg;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;

@@ -72,76 +70,44 @@ public class IcebergMetadataCache {
return ifPresent;
}
Table icebergTable = getIcebergTable(key, catalog, params.getDatabase(), params.getTable());
Table icebergTable = getIcebergTable(catalog, params.getDatabase(), params.getTable());
List<Snapshot> snaps = Lists.newArrayList();
Iterables.addAll(snaps, icebergTable.snapshots());
snapshotListCache.put(key, snaps);
return snaps;
}
public Table getIcebergTable(IcebergMetadataCacheKey key, CatalogIf catalog, String dbName, String tbName)
throws UserException {
public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) {
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog.getId(), dbName, tbName);
Table cacheTable = tableCache.getIfPresent(key);
if (cacheTable != null) {
return cacheTable;
}
Table icebergTable;
Catalog icebergCatalog;
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog ctg = (HMSExternalCatalog) catalog;
icebergTable = createIcebergTable(
ctg.getHiveMetastoreUris(),
ctg.getCatalogProperty().getHadoopProperties(),
dbName,
tbName,
ctg.getProperties());
icebergCatalog = createIcebergHiveCatalog(
ctg.getHiveMetastoreUris(),
ctg.getCatalogProperty().getHadoopProperties(),
ctg.getProperties());
} else if (catalog instanceof IcebergExternalCatalog) {
IcebergExternalCatalog extCatalog = (IcebergExternalCatalog) catalog;
icebergTable = getIcebergTable(
extCatalog.getCatalog(), extCatalog.getId(), dbName, tbName, extCatalog.getProperties());
icebergCatalog = ((IcebergExternalCatalog) catalog).getCatalog();
} else {
throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table");
throw new RuntimeException("Only support 'hms' and 'iceberg' type for iceberg table");
}
Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(catalog.getId(),
() -> icebergCatalog.loadTable(TableIdentifier.of(dbName, tbName)));
initIcebergTableFileIO(icebergTable, catalog.getProperties());
tableCache.put(key, icebergTable);
return icebergTable;
}
public Table getIcebergTable(IcebergSource icebergSource) throws MetaNotFoundException {
return icebergSource.getIcebergTable();
}
public Table getIcebergTable(HMSExternalTable hmsTable) {
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
hmsTable.getCatalog().getId(),
hmsTable.getDbName(),
hmsTable.getName());
Table table = tableCache.getIfPresent(key);
if (table != null) {
return table;
}
Table icebergTable = createIcebergTable(hmsTable);
tableCache.put(key, icebergTable);
return icebergTable;
}
public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
private Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
Map<String, String> props) {
IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
catalogId,
dbName,
tbName);
Table cacheTable = tableCache.getIfPresent(key);
if (cacheTable != null) {
return cacheTable;
}
Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId,
() -> catalog.loadTable(TableIdentifier.of(dbName, tbName)));
initIcebergTableFileIO(table, props);
tableCache.put(key, table);
return table;
}

@@ -190,14 +156,12 @@ public class IcebergMetadataCache {
});
}
private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl,
Map<String, String> props) {
private Catalog createIcebergHiveCatalog(String uri, Map<String, String> hdfsConf, Map<String, String> props) {
// set hdfs configure
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(conf);

@@ -211,20 +175,10 @@ public class IcebergMetadataCache {
catalogProperties.put("uri", uri);
hiveCatalog.initialize("hive", catalogProperties);
}
Table table = HiveMetaStoreClientHelper.ugiDoAs(conf, () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
initIcebergTableFileIO(table, props);
return table;
return hiveCatalog;
}
private Table createIcebergTable(HMSExternalTable hmsTable) {
return createIcebergTable(hmsTable.getMetastoreUri(),
hmsTable.getHadoopProperties(),
hmsTable.getDbName(),
hmsTable.getName(),
hmsTable.getCatalogProperties());
}
private void initIcebergTableFileIO(Table table, Map<String, String> props) {
private static void initIcebergTableFileIO(Table table, Map<String, String> props) {
Map<String, String> ioConf = new HashMap<>();
table.properties().forEach((key, value) -> {
if (key.startsWith("io.")) {
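All of the getIcebergTable overloads above collapse into one lookup path: derive a key from (catalog id, database, table), return a cached table if present, otherwise load and cache it. A standalone sketch of that shape using a plain ConcurrentHashMap (hypothetical types; the real cache also initializes the table FileIO and lives behind an evicting cache):

    import java.util.Map;
    import java.util.Objects;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.BiFunction;

    public class TableCacheSketch {
        record CacheKey(long catalogId, String db, String table) {}
        record LoadedTable(CacheKey key, String payload) {}

        private final Map<CacheKey, LoadedTable> cache = new ConcurrentHashMap<>();

        // One entry point instead of several overloads: the key is always derived
        // from (catalog id, database, table), and the loader runs only on a miss.
        LoadedTable getTable(long catalogId, String db, String table,
                             BiFunction<String, String, String> loader) {
            CacheKey key = new CacheKey(catalogId, Objects.requireNonNull(db), Objects.requireNonNull(table));
            return cache.computeIfAbsent(key, k -> new LoadedTable(k, loader.apply(k.db(), k.table())));
        }

        public static void main(String[] args) {
            TableCacheSketch sketch = new TableCacheSketch();
            BiFunction<String, String, String> loader = (db, tbl) -> "loaded " + db + "." + tbl;
            System.out.println(sketch.getTable(1L, "db1", "t1", loader));
            System.out.println(sketch.getTable(1L, "db1", "t1", loader)); // served from cache
        }
    }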
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.TableIf;

@@ -128,7 +127,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
protected void doInitialize() throws UserException {
icebergTable = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(source);
icebergTable = source.getIcebergTable();
super.doInitialize();
}
@@ -609,7 +609,7 @@ public class StatisticsUtil {
Table icebergTable = Env.getCurrentEnv()
.getExtMetaCacheMgr()
.getIcebergMetadataCache()
.getIcebergTable(table);
.getIcebergTable(table.getCatalog(), table.getDbName(), table.getName());
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {
rowCount += task.file().recordCount();