[enhancement][multi-catalog]Add strong checker for hms table (#10724)

This commit is contained in:
huangzhaowei
2022-07-11 23:48:15 +08:00
committed by GitHub
parent aea41c2ffa
commit f5036fea63
5 changed files with 81 additions and 7 deletions

View File

@@ -31,6 +31,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -674,6 +675,15 @@ public class Analyzer {
table = HudiUtils.resolveHudiTable((HudiTable) table);
}
// Currently, HMS external tables only support a subset of the table kinds in the Hive ecosystem.
// Add this strong check here to avoid undefined behavior in Doris.
if (table.getType() == TableType.HMS_EXTERNAL_TABLE && !((HMSExternalTable) table).isSupportedHmsTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_HMS_TABLE,
table.getName(),
((HMSExternalTable) table).getDbName(),
tableName.getCtl());
}
// tableName.getTbl() stores the table name specified by the user in the from statement.
// In the case of case-sensitive table names, the value of tableName.getTbl() is the same as table.getName().
// However, since the system view is not case-sensitive, table.getName() gets the lowercase view name,

View File

@@ -44,7 +44,7 @@ public class ExternalTable implements TableIf {
protected String name;
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
protected TableType type = null;
protected List<Column> fullSchema = null;
protected volatile List<Column> fullSchema = null;
/**
* Create external table.

View File

@@ -44,7 +44,12 @@ public class HMSExternalTable extends ExternalTable {
private final HMSExternalDataSource ds;
private final String dbName;
private org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private final List<String> supportedHiveFileFormats = Lists.newArrayList(
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.mapred.TextInputFormat");
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private DLAType dlaType = DLAType.UNKNOWN;
private boolean initialized = false;
@@ -67,6 +72,11 @@ public class HMSExternalTable extends ExternalTable {
this.type = TableType.HMS_EXTERNAL_TABLE;
}
public boolean isSupportedHmsTable() {
makeSureInitialized();
return dlaType != DLAType.UNKNOWN;
}
private synchronized void makeSureInitialized() {
if (!initialized) {
init();
@@ -84,18 +94,63 @@ public class HMSExternalTable extends ExternalTable {
dlaType = DLAType.UNKNOWN;
fullSchema = Lists.newArrayList();
} else {
if (remoteTable.getParameters().containsKey("table_type") && remoteTable.getParameters().get("table_type")
.equalsIgnoreCase("ICEBERG")) {
if (supportedIcebergTable()) {
dlaType = DLAType.ICEBERG;
} else if (remoteTable.getSd().getInputFormat().toLowerCase().contains("hoodie")) {
} else if (supportedHoodieTable()) {
dlaType = DLAType.HUDI;
} else {
} else if (supportedHiveTable()) {
dlaType = DLAType.HIVE;
} else {
dlaType = DLAType.UNKNOWN;
fullSchema = Lists.newArrayList();
}
initSchema();
}
}
/**
* Currently we only support copy-on-write (cow) tables in Iceberg.
*/
private boolean supportedIcebergTable() {
Map<String, String> paras = remoteTable.getParameters();
if (paras == null) {
return false;
}
boolean isIcebergTable = paras.containsKey("table_type")
&& paras.get("table_type").equalsIgnoreCase("ICEBERG");
boolean isMorInDelete = paras.containsKey("write.delete.mode")
&& paras.get("write.delete.mode").equalsIgnoreCase("merge-on-read");
boolean isMorInUpdate = paras.containsKey("write.update.mode")
&& paras.get("write.update.mode").equalsIgnoreCase("merge-on-read");
boolean isMorInMerge = paras.containsKey("write.merge.mode")
&& paras.get("write.merge.mode").equalsIgnoreCase("merge-on-read");
boolean isCowTable = !(isMorInDelete || isMorInUpdate || isMorInMerge);
return isIcebergTable && isCowTable;
}
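(Illustration only, not part of this commit.) A minimal standalone sketch of the same copy-on-write check, assuming the table parameters are available as a plain Map read from the metastore; the class and method names are hypothetical.
import java.util.HashMap;
import java.util.Map;
public class IcebergCowCheckSketch {
    // Mirrors supportedIcebergTable(): the table must declare table_type=ICEBERG
    // and none of the write modes may be merge-on-read.
    static boolean isCowIcebergTable(Map<String, String> paras) {
        if (paras == null) {
            return false;
        }
        boolean isIceberg = "ICEBERG".equalsIgnoreCase(paras.get("table_type"));
        boolean isMor = "merge-on-read".equalsIgnoreCase(paras.get("write.delete.mode"))
                || "merge-on-read".equalsIgnoreCase(paras.get("write.update.mode"))
                || "merge-on-read".equalsIgnoreCase(paras.get("write.merge.mode"));
        return isIceberg && !isMor;
    }
    public static void main(String[] args) {
        Map<String, String> cow = new HashMap<>();
        cow.put("table_type", "ICEBERG");
        System.out.println(isCowIcebergTable(cow));   // true: no merge-on-read write mode set
        Map<String, String> mor = new HashMap<>(cow);
        mor.put("write.delete.mode", "merge-on-read");
        System.out.println(isCowIcebergTable(mor));   // false: merge-on-read is not supported yet
    }
}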
/**
* Currently we only support `Snapshot Queries` on both cow and mor tables, and `Read Optimized Queries` on cow tables.
* Both use `HoodieParquetInputFormat` as the input format in the hive metastore.
*/
private boolean supportedHoodieTable() {
if (remoteTable.getSd() == null) {
return false;
}
String inputFormatName = remoteTable.getSd().getInputFormat();
return inputFormatName != null
&& inputFormatName.equalsIgnoreCase("org.apache.hudi.hadoop.HoodieParquetInputFormat");
}
/**
* Currently we only support hive tables with three input file formats: parquet, orc and text. They must also be MANAGED_TABLE.
*/
private boolean supportedHiveTable() {
boolean isManagedTable = remoteTable.getTableType().equalsIgnoreCase("MANAGED_TABLE");
String inputFileFormat = remoteTable.getSd().getInputFormat();
boolean supportedFileFormat = inputFileFormat != null && supportedHiveFileFormats.contains(inputFileFormat);
return isManagedTable && supportedFileFormat;
}
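(Illustration only, not part of this commit.) A minimal standalone sketch of the same hive table check, assuming the table type string and input format class name have already been fetched from the metastore; the class and method names are hypothetical.
import java.util.Arrays;
import java.util.List;
public class HiveFormatCheckSketch {
    // Same whitelist as supportedHiveFileFormats: parquet, orc and text input formats.
    static final List<String> SUPPORTED_INPUT_FORMATS = Arrays.asList(
            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
            "org.apache.hadoop.mapred.TextInputFormat");
    static boolean isSupportedHiveTable(String tableType, String inputFormat) {
        boolean isManaged = "MANAGED_TABLE".equalsIgnoreCase(tableType);
        return isManaged && inputFormat != null && SUPPORTED_INPUT_FORMATS.contains(inputFormat);
    }
    public static void main(String[] args) {
        // A managed parquet table passes the check.
        System.out.println(isSupportedHiveTable("MANAGED_TABLE",
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")); // true
        // An external table with the same format is rejected by this check.
        System.out.println(isSupportedHiveTable("EXTERNAL_TABLE",
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")); // false
    }
}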
private void initSchema() {
if (fullSchema == null) {
synchronized (this) {

View File

@@ -1689,7 +1689,9 @@ public enum ErrorCode {
ERR_WRONG_CATALOG_NAME(5085, new byte[]{'4', '2', '0', '0', '0'}, "Incorrect catalog name '%s'"),
ERR_UNKNOWN_CATALOG(5086, new byte[]{'4', '2', '0', '0', '0'}, "Unknown catalog '%s'"),
ERR_CATALOG_ACCESS_DENIED(5087, new byte[]{'4', '2', '0', '0', '0'},
"Access denied for user '%s' to catalog '%s'");
"Access denied for user '%s' to catalog '%s'"),
ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'},
"Nonsupport hive metastore table named '%s' in database '%s' with catalog '%s'.");
// This is error code
private final int code;

View File

@@ -89,6 +89,8 @@ public class BrokerUtil {
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static THdfsParams generateHdfsParam(Map<String, String> properties) {
THdfsParams tHdfsParams = new THdfsParams();
@@ -109,6 +111,11 @@ public class BrokerUtil {
tHdfsParams.hdfs_conf.add(hdfsConf);
}
}
// `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` must both be set to enable short circuit read.
// If they are not both set, explicitly disable short circuit read, because a half-configured setup degrades performance.
if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
tHdfsParams.addToHdfsConf(new THdfsConf(HADOOP_SHORT_CIRCUIT, "false"));
}
return tHdfsParams;
}
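(Illustration only, not part of this commit.) A minimal standalone sketch of the short circuit guard above, reduced to the effective value of `dfs.client.read.shortcircuit`; the class, helper name and the sample socket path are hypothetical.
import java.util.HashMap;
import java.util.Map;
public class ShortCircuitConfigSketch {
    static final String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
    static final String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
    // Mirrors the new guard in generateHdfsParam(): short circuit read stays enabled
    // only when both keys are configured; otherwise it is forced to "false".
    static String effectiveShortCircuit(Map<String, String> props) {
        if (!props.containsKey(HADOOP_SHORT_CIRCUIT) || !props.containsKey(HADOOP_SOCKET_PATH)) {
            return "false";
        }
        return props.get(HADOOP_SHORT_CIRCUIT);
    }
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(HADOOP_SHORT_CIRCUIT, "true");
        System.out.println(effectiveShortCircuit(props)); // false: socket path missing
        props.put(HADOOP_SOCKET_PATH, "/var/lib/hadoop-hdfs/dn_socket"); // hypothetical path
        System.out.println(effectiveShortCircuit(props)); // true: both keys present
    }
}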