From f5036fea63408bdb2555a58d2f717bc4aafc7ddc Mon Sep 17 00:00:00 2001
From: huangzhaowei
Date: Mon, 11 Jul 2022 23:48:15 +0800
Subject: [PATCH] [enhancement][multi-catalog]Add strong checker for hms table (#10724)

---
 .../org/apache/doris/analysis/Analyzer.java   | 10 +++
 .../doris/catalog/external/ExternalTable.java |  2 +-
 .../catalog/external/HMSExternalTable.java    | 65 +++++++++++++++++--
 .../org/apache/doris/common/ErrorCode.java    |  4 +-
 .../apache/doris/common/util/BrokerUtil.java  |  7 ++
 5 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 904d57777a..4ef6fb5a42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.catalog.View;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -674,6 +675,15 @@ public class Analyzer {
             table = HudiUtils.resolveHudiTable((HudiTable) table);
         }
 
+        // An HMS table currently supports only a subset of the table kinds in the Hive ecosystem,
+        // so add this strong check here to avoid undefined behavior in Doris.
+        if (table.getType() == TableType.HMS_EXTERNAL_TABLE && !((HMSExternalTable) table).isSupportedHmsTable()) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_HMS_TABLE,
+                    table.getName(),
+                    ((HMSExternalTable) table).getDbName(),
+                    tableName.getCtl());
+        }
+
         // tableName.getTbl() stores the table name specified by the user in the from statement.
         // In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName().
         // However, since the system view is not case-sensitive, table.getName() gets the lowercase view name,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index c034d9420f..870fc2b3a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -44,7 +44,7 @@ public class ExternalTable implements TableIf {
     protected String name;
     protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
     protected TableType type = null;
-    protected List<Column> fullSchema = null;
+    protected volatile List<Column> fullSchema = null;
 
     /**
      * Create external table.
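The new guard reduces to a single predicate over the classification result. Below is a minimal, standalone Java sketch of that control flow (the class name, method names and the use of IllegalStateException are hypothetical; only the DLAType values and the error text mirror the patch):

    // Standalone sketch of the analysis-time guard added above; not Doris code.
    public class HmsGuardSketch {
        enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG }

        // Mirrors HMSExternalTable.isSupportedHmsTable(): supported iff classification succeeded.
        static boolean isSupportedHmsTable(DLAType dlaType) {
            return dlaType != DLAType.UNKNOWN;
        }

        // The real code calls ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_HMS_TABLE, ...).
        static void checkHmsTable(String tbl, String db, String ctl, DLAType dlaType) {
            if (!isSupportedHmsTable(dlaType)) {
                throw new IllegalStateException(String.format(
                        "Unsupported hive metastore table named '%s' in database '%s' of catalog '%s'.",
                        tbl, db, ctl));
            }
        }

        public static void main(String[] args) {
            checkHmsTable("orders", "db1", "hive_ctl", DLAType.HIVE);    // passes
            checkHmsTable("view1", "db1", "hive_ctl", DLAType.UNKNOWN);  // throws
        }
    }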
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index d6fa3eb34f..db8b7377be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -44,7 +44,12 @@ public class HMSExternalTable extends ExternalTable {
     private final HMSExternalDataSource ds;
     private final String dbName;
 
-    private org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
+    private final List<String> supportedHiveFileFormats = Lists.newArrayList(
+            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+            "org.apache.hadoop.mapred.TextInputFormat");
+
+    private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
     private DLAType dlaType = DLAType.UNKNOWN;
     private boolean initialized = false;
 
@@ -67,6 +72,11 @@ public class HMSExternalTable extends ExternalTable {
         this.type = TableType.HMS_EXTERNAL_TABLE;
     }
 
+    public boolean isSupportedHmsTable() {
+        makeSureInitialized();
+        return dlaType != DLAType.UNKNOWN;
+    }
+
     private synchronized void makeSureInitialized() {
         if (!initialized) {
             init();
@@ -84,18 +94,63 @@ public class HMSExternalTable extends ExternalTable {
             dlaType = DLAType.UNKNOWN;
             fullSchema = Lists.newArrayList();
         } else {
-            if (remoteTable.getParameters().containsKey("table_type") && remoteTable.getParameters().get("table_type")
-                    .equalsIgnoreCase("ICEBERG")) {
+            if (supportedIcebergTable()) {
                 dlaType = DLAType.ICEBERG;
-            } else if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) {
+            } else if (supportedHoodieTable()) {
                 dlaType = DLAType.HUDI;
-            } else {
+            } else if (supportedHiveTable()) {
                 dlaType = DLAType.HIVE;
+            } else {
+                dlaType = DLAType.UNKNOWN;
+                fullSchema = Lists.newArrayList();
             }
             initSchema();
         }
     }
 
+    /**
+     * Currently only copy-on-write (COW) Iceberg tables are supported.
+     */
+    private boolean supportedIcebergTable() {
+        Map<String, String> paras = remoteTable.getParameters();
+        if (paras == null) {
+            return false;
+        }
+        boolean isIcebergTable = paras.containsKey("table_type")
+                && paras.get("table_type").equalsIgnoreCase("ICEBERG");
+        boolean isMorInDelete = paras.containsKey("write.delete.mode")
+                && paras.get("write.delete.mode").equalsIgnoreCase("merge-on-read");
+        boolean isMorInUpdate = paras.containsKey("write.update.mode")
+                && paras.get("write.update.mode").equalsIgnoreCase("merge-on-read");
+        boolean isMorInMerge = paras.containsKey("write.merge.mode")
+                && paras.get("write.merge.mode").equalsIgnoreCase("merge-on-read");
+        boolean isCowTable = !(isMorInDelete || isMorInUpdate || isMorInMerge);
+        return isIcebergTable && isCowTable;
+    }
+
+    /**
+     * Currently only `Snapshot Queries` (on both COW and MOR tables) and `Read Optimized Queries` (on COW tables)
+     * are supported; both register `HoodieParquetInputFormat` as the input format in the Hive metastore.
+     */
+    private boolean supportedHoodieTable() {
+        if (remoteTable.getSd() == null) {
+            return false;
+        }
+        String inputFormatName = remoteTable.getSd().getInputFormat();
+        return inputFormatName != null
+                && inputFormatName.equalsIgnoreCase("org.apache.hudi.hadoop.HoodieParquetInputFormat");
+    }
+
+    /**
+     * Currently only Hive tables with the parquet, orc or text input format are supported, and only as MANAGED_TABLE.
+     */
+    private boolean supportedHiveTable() {
+        boolean isManagedTable = remoteTable.getTableType().equalsIgnoreCase("MANAGED_TABLE");
+        String inputFileFormat = remoteTable.getSd().getInputFormat();
+        boolean supportedFileFormat = inputFileFormat != null && supportedHiveFileFormats.contains(inputFileFormat);
+        return isManagedTable && supportedFileFormat;
+    }
+
     private void initSchema() {
         if (fullSchema == null) {
             synchronized (this) {
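The three predicates above are pure functions of metastore metadata, so they can be exercised without a live metastore. A self-contained sketch under that assumption (the class and helper names are hypothetical; the property keys and input-format class names come from the patch):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DlaClassifySketch {
        static final List<String> SUPPORTED_HIVE_FORMATS = Arrays.asList(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
                "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
                "org.apache.hadoop.mapred.TextInputFormat");

        // Iceberg: declared ICEBERG, and no write mode may be merge-on-read (COW only).
        static boolean isCowIceberg(Map<String, String> params) {
            boolean iceberg = "ICEBERG".equalsIgnoreCase(params.get("table_type"));
            boolean mor = "merge-on-read".equalsIgnoreCase(params.get("write.delete.mode"))
                    || "merge-on-read".equalsIgnoreCase(params.get("write.update.mode"))
                    || "merge-on-read".equalsIgnoreCase(params.get("write.merge.mode"));
            return iceberg && !mor;
        }

        // Hudi: both supported query types register HoodieParquetInputFormat in the metastore.
        static boolean isSupportedHudi(String inputFormat) {
            return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equalsIgnoreCase(inputFormat);
        }

        // Hive: managed table with a whitelisted input format (parquet/orc/text).
        static boolean isSupportedHive(String tableType, String inputFormat) {
            return "MANAGED_TABLE".equalsIgnoreCase(tableType)
                    && inputFormat != null && SUPPORTED_HIVE_FORMATS.contains(inputFormat);
        }

        public static void main(String[] args) {
            Map<String, String> p = new HashMap<>();
            p.put("table_type", "ICEBERG");
            System.out.println(isCowIceberg(p));              // true: no write mode is merge-on-read
            p.put("write.delete.mode", "merge-on-read");
            System.out.println(isCowIceberg(p));              // false: MOR deletes are unsupported
            System.out.println(isSupportedHive("EXTERNAL_TABLE",
                    "org.apache.hadoop.mapred.TextInputFormat")); // false: not a managed table
        }
    }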
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index 4fdd640b4e..88d2171a44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1689,7 +1689,9 @@ public enum ErrorCode {
     ERR_WRONG_CATALOG_NAME(5085, new byte[]{'4', '2', '0', '0', '0'}, "Incorrect catalog name '%s'"),
     ERR_UNKNOWN_CATALOG(5086, new byte[]{'4', '2', '0', '0', '0'}, "Unknown catalog '%s'"),
     ERR_CATALOG_ACCESS_DENIED(5087, new byte[]{'4', '2', '0', '0', '0'},
-            "Access denied for user '%s' to catalog '%s'");
+            "Access denied for user '%s' to catalog '%s'"),
+    ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'},
+            "Unsupported hive metastore table named '%s' in database '%s' of catalog '%s'.");
 
     // This is error code
     private final int code;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 8367bc8b85..0108dd7004 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -89,6 +89,8 @@ public class BrokerUtil {
     public static String HADOOP_USER_NAME = "hadoop.username";
     public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
+    public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
+    public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
 
     public static THdfsParams generateHdfsParam(Map<String, String> properties) {
         THdfsParams tHdfsParams = new THdfsParams();
@@ -109,6 +111,11 @@ public class BrokerUtil {
                 tHdfsParams.hdfs_conf.add(hdfsConf);
             }
         }
+        // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` must both be set to enable short circuit read.
+        // If either is missing, explicitly disable it, because a half-configured short circuit read degrades performance.
+        if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
+            tHdfsParams.addToHdfsConf(new THdfsConf(HADOOP_SHORT_CIRCUIT, "false"));
+        }
         return tHdfsParams;
     }
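The short-circuit guard is easiest to see with a plain map standing in for the Thrift THdfsParams/THdfsConf types (the class and method names here are hypothetical; the two property keys come from the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class ShortCircuitSketch {
        static final String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
        static final String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";

        // Short circuit read only works when both properties are present; otherwise force it off,
        // mirroring the new branch in BrokerUtil.generateHdfsParam().
        static Map<String, String> buildHdfsConf(Map<String, String> properties) {
            Map<String, String> conf = new HashMap<>(properties);
            if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
                conf.put(HADOOP_SHORT_CIRCUIT, "false");
            }
            return conf;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put(HADOOP_SHORT_CIRCUIT, "true"); // socket path not set
            System.out.println(buildHdfsConf(props).get(HADOOP_SHORT_CIRCUIT)); // prints: false
        }
    }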