[Fix](multi-catalog) skip hms events if hms table is not supported. (#28644)
Co-authored-by: wangxiangyu <wangxiangyu@360shuke.com>
This commit is contained in:
@ -418,6 +418,12 @@ public interface TableIf {
|
||||
getName());
|
||||
}
|
||||
|
||||
default String getNameWithFullQualifiers() {
|
||||
return String.format("%s.%s.%s", getDatabase().getCatalog().getName(),
|
||||
ClusterNamespace.getNameFromFullName(getDatabase().getFullName()),
|
||||
getName());
|
||||
}
|
||||
|
||||
default boolean isManagedTable() {
|
||||
return getType() == TableType.OLAP || getType() == TableType.MATERIALIZED_VIEW;
|
||||
}
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
@ -136,8 +137,13 @@ public class HMSExternalTable extends ExternalTable {
|
||||
}
|
||||
|
||||
public boolean isSupportedHmsTable() {
|
||||
makeSureInitialized();
|
||||
return dlaType != DLAType.UNKNOWN;
|
||||
try {
|
||||
makeSureInitialized();
|
||||
return true;
|
||||
} catch (NotSupportedException e) {
|
||||
LOG.warn("Not supported hms table, message: {}", e.getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized void makeSureInitialized() {
|
||||
@ -145,7 +151,7 @@ public class HMSExternalTable extends ExternalTable {
|
||||
if (!objectCreated) {
|
||||
remoteTable = ((HMSExternalCatalog) catalog).getClient().getTable(dbName, name);
|
||||
if (remoteTable == null) {
|
||||
dlaType = DLAType.UNKNOWN;
|
||||
throw new IllegalArgumentException("Hms table not exists, table: " + getNameWithFullQualifiers());
|
||||
} else {
|
||||
if (supportedIcebergTable()) {
|
||||
dlaType = DLAType.ICEBERG;
|
||||
@ -154,7 +160,7 @@ public class HMSExternalTable extends ExternalTable {
|
||||
} else if (supportedHiveTable()) {
|
||||
dlaType = DLAType.HIVE;
|
||||
} else {
|
||||
dlaType = DLAType.UNKNOWN;
|
||||
throw new NotSupportedException("Unsupported dlaType for table: " + getNameWithFullQualifiers());
|
||||
}
|
||||
}
|
||||
objectCreated = true;
|
||||
@ -201,12 +207,8 @@ public class HMSExternalTable extends ExternalTable {
|
||||
if (inputFileFormat == null) {
|
||||
return false;
|
||||
}
|
||||
boolean supportedFileFormat = SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
|
||||
if (!supportedFileFormat) {
|
||||
throw new IllegalArgumentException("Unsupported hive input format: " + inputFileFormat);
|
||||
}
|
||||
LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), inputFileFormat);
|
||||
return true;
|
||||
return SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -17,12 +17,14 @@
|
||||
|
||||
package org.apache.doris.datasource;
|
||||
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.fs.FileSystemCache;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.planner.external.hudi.HudiPartitionMgr;
|
||||
import org.apache.doris.planner.external.hudi.HudiPartitionProcessor;
|
||||
import org.apache.doris.planner.external.iceberg.IcebergMetadataCache;
|
||||
@ -172,7 +174,14 @@ public class ExternalMetaCacheMgr {
|
||||
String dbName = ClusterNamespace.getNameFromFullName(table.getDbName());
|
||||
HiveMetaStoreCache metaCache = cacheMap.get(catalogId);
|
||||
if (metaCache != null) {
|
||||
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, table.getPartitionColumnTypes());
|
||||
List<Type> partitionColumnTypes;
|
||||
try {
|
||||
partitionColumnTypes = table.getPartitionColumnTypes();
|
||||
} catch (NotSupportedException e) {
|
||||
LOG.warn("Ignore not supported hms table, message: {} ", e.getMessage());
|
||||
return;
|
||||
}
|
||||
metaCache.addPartitionsCache(dbName, table.getName(), partitionNames, partitionColumnTypes);
|
||||
}
|
||||
LOG.debug("add partition cache for {}.{} in catalog {}", dbName, table.getName(), catalogId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user