[Feature](multi-catalog) support query hive views. (#18815)

A very simple implementation for querying hive views; this is an EXPERIMENTAL feature.
We try to parse the DDL of a hive view and execute the query, relying on the fact that HiveQL
is very similar to Doris SQL. However, if the DDL of a hive view uses complicated or incompatible grammar,
the query might fail.
This commit is contained in:
Xiangyu Wang
2023-04-24 08:49:26 +08:00
committed by GitHub
parent 0dd45ce158
commit 2d7903e2bd
7 changed files with 513 additions and 6 deletions

View File

@ -2012,6 +2012,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean enable_func_pushdown = true;
/**
* If set to true, Doris will try to parse the DDL of a hive view and execute the query;
* otherwise it will throw an AnalysisException.
*/
@ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_query_hive_views = false;
/**
* If set to true, doris will automatically synchronize hms metadata to the cache in fe.
*/

View File

@ -34,6 +34,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.View;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.IdGenerator;
@ -81,6 +82,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -319,6 +321,11 @@ public class Analyzer {
// True if at least one of the analyzers belongs to a subquery.
public boolean containsSubquery = false;
// The DDL of a hive view does not contain any catalog info,
// so we need to record the catalog in the Analyzer;
// otherwise errors will occur when resolving TableRefs later.
public String externalCtl;
// all registered conjuncts (map from id to Predicate)
private final Map<ExprId, Expr> conjuncts = Maps.newHashMap();
@ -541,6 +548,14 @@ public class Analyzer {
return new Analyzer(parentAnalyzer, globalState, new InferPredicateState());
}
/**
 * Stores the given external catalog name on this analyzer's global state.
 *
 * @param externalCtl the catalog name to record; may be null to clear it
 */
public void setExternalCtl(String externalCtl) {
    this.globalState.externalCtl = externalCtl;
}
/**
 * Reads the external catalog name currently recorded on this analyzer's global state.
 *
 * @return the recorded catalog name, or null if none has been set
 */
public String getExternalCtl() {
    return this.globalState.externalCtl;
}
/**
 * Turns on the {@code isExplain} flag of this analyzer's global state.
 */
public void setIsExplain() {
    this.globalState.isExplain = true;
}
@ -757,6 +772,10 @@ public class Analyzer {
}
// Try to find a matching local view.
TableName tableName = tableRef.getName();
if (StringUtils.isNotEmpty(this.globalState.externalCtl)
&& StringUtils.isEmpty(tableName.getCtl())) {
tableName.setCtl(this.globalState.externalCtl);
}
if (!tableName.isFullyQualified()) {
// Searches the hierarchy of analyzers bottom-up for a registered local view with
// a matching alias.
@ -801,11 +820,26 @@ public class Analyzer {
// Currently hms tables only support a few of the table kinds in the whole hive system,
// so add this strict check here to avoid undefined behaviour in doris.
if (table.getType() == TableType.HMS_EXTERNAL_TABLE && !((HMSExternalTable) table).isSupportedHmsTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_HMS_TABLE,
table.getName(),
((HMSExternalTable) table).getDbName(),
tableName.getCtl());
if (table.getType() == TableType.HMS_EXTERNAL_TABLE) {
if (!((HMSExternalTable) table).isSupportedHmsTable()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_HMS_TABLE,
table.getName(),
((HMSExternalTable) table).getDbName(),
tableName.getCtl());
}
if (Config.enable_query_hive_views) {
if (((HMSExternalTable) table).isView()
&& StringUtils.isNotEmpty(((HMSExternalTable) table).getViewText())) {
View hmsView = new View(table.getId(), table.getName(), table.getFullSchema());
hmsView.setInlineViewDefWithSqlMode(((HMSExternalTable) table).getViewText(),
ConnectContext.get().getSessionVariable().getSqlMode());
InlineViewRef inlineViewRef = new InlineViewRef(hmsView, tableRef);
if (StringUtils.isNotEmpty(tableName.getCtl())) {
inlineViewRef.setExternalCtl(tableName.getCtl());
}
return inlineViewRef;
}
}
}
// tableName.getTbl() stores the table name specified by the user in the from statement.

View File

@ -25,6 +25,7 @@ import org.apache.doris.common.UserException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
@ -131,10 +132,21 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
tblRef = analyzer.resolveTableRef(tblRef);
tablerefs.set(i, Preconditions.checkNotNull(tblRef));
tblRef.setLeftTblRef(leftTblRef);
boolean setExternalCtl = false;
String preExternalCtl = null;
if (tblRef instanceof InlineViewRef) {
((InlineViewRef) tblRef).setNeedToSql(needToSql);
String externalCtl = ((InlineViewRef) tblRef).getExternalCtl();
if (StringUtils.isNotEmpty(externalCtl)) {
preExternalCtl = analyzer.getExternalCtl();
analyzer.setExternalCtl(externalCtl);
setExternalCtl = true;
}
}
tblRef.analyze(analyzer);
if (setExternalCtl) {
analyzer.setExternalCtl(preExternalCtl);
}
leftTblRef = tblRef;
Expr clause = tblRef.getOnClause();
if (clause != null && clause.contains(Subquery.class)) {

View File

@ -75,6 +75,11 @@ public class InlineViewRef extends TableRef {
// Map inline view's output slots to the corresponding baseTblResultExpr of queryStmt.
protected final ExprSubstitutionMap baseTblSmap;
// The DDL of a hive view does not contain any catalog info,
// so we need to record the catalog here and pass it to the Analyzer;
// otherwise errors will occur when resolving TableRefs later.
protected String externalCtl;
// END: Members that need to be reset()
// ///////////////////////////////////////
@ -448,6 +453,14 @@ public class InlineViewRef extends TableRef {
return queryStmt;
}
/**
 * Records the external catalog name associated with this inline view.
 *
 * @param externalCtl the catalog name to store on this reference
 */
public void setExternalCtl(String externalCtl) {
    this.externalCtl = externalCtl;
}
/**
 * Returns the external catalog name stored on this inline view.
 *
 * @return the stored catalog name, or null if it was never set
 */
public String getExternalCtl() {
    return externalCtl;
}
@Override
public String tableNameToSql() {
// Enclose the alias in quotes if Hive cannot parse it without quotes.

View File

@ -32,6 +32,7 @@ import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
@ -253,6 +254,26 @@ public class HMSExternalTable extends ExternalTable {
}
}
/**
 * Returns the text to use as this view's definition.
 *
 * @return the expanded view text when it is non-empty, otherwise the original view text
 */
public String getViewText() {
    String expandedText = getViewExpandedText();
    // Prefer the expanded text; fall back to the original text when it is null or empty.
    return StringUtils.isNotEmpty(expandedText) ? expandedText : getViewOriginalText();
}
/**
 * Returns the expanded view text held by the remote table, logging it at debug level.
 *
 * @return the value of {@code remoteTable.getViewExpandedText()}
 */
public String getViewExpandedText() {
    String expandedText = remoteTable.getViewExpandedText();
    LOG.debug("View expanded text of hms table [{}.{}.{}] : {}",
            this.getCatalog().getName(), this.getDbName(), this.getName(), expandedText);
    return expandedText;
}
/**
 * Returns the original view text held by the remote table, logging it at debug level.
 *
 * @return the value of {@code remoteTable.getViewOriginalText()}
 */
public String getViewOriginalText() {
    String originalText = remoteTable.getViewOriginalText();
    LOG.debug("View original text of hms table [{}.{}.{}] : {}",
            this.getCatalog().getName(), this.getDbName(), this.getName(), originalText);
    return originalText;
}
/**
 * Returns the hive metastore URIs reported by the catalog this table belongs to.
 *
 * @return the result of {@code getHiveMetastoreUris()} on the owning catalog
 */
public String getMetastoreUri() {
    // The owning catalog is treated as an HMSExternalCatalog, as in the original cast.
    HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
    return hmsCatalog.getHiveMetastoreUris();
}

View File

@ -40,6 +40,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final boolean isView;
private AlterTableEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
@ -59,7 +60,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
// this is a rename event if either dbName or tblName of before and after object changed
isRename = !tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
|| !tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
isView = tableBefore.isSetViewExpandedText() || tableBefore.isSetViewOriginalText();
}
public static List<MetastoreEvent> getEvents(NotificationEvent event,
@ -67,6 +68,15 @@ public class AlterTableEvent extends MetastoreTableEvent {
return Lists.newArrayList(new AlterTableEvent(event, catalogName));
}
/**
 * Recreates the external table for a view: drops the table identified by {@code tableBefore}
 * and then creates the one identified by {@code tableAfter} in this event's catalog.
 * Does nothing when the altered table is not a view.
 *
 * @throws DdlException propagated from the catalog manager's drop/create calls
 */
private void processRecreateTable() throws DdlException {
    if (isView) {
        // Drop first, then create, so the table object is built anew.
        Env.getCurrentEnv().getCatalogMgr().dropExternalTable(
                tableBefore.getDbName(), tableBefore.getTableName(), catalogName);
        Env.getCurrentEnv().getCatalogMgr().createExternalTable(
                tableAfter.getDbName(), tableAfter.getTableName(), catalogName);
    }
}
private void processRename() throws DdlException {
if (!isRename) {
@ -100,6 +110,12 @@ public class AlterTableEvent extends MetastoreTableEvent {
processRename();
return;
}
if (isView) {
// if this table is a view, its `viewExpandedText/viewOriginalText` may have changed,
// so we need to recreate the table to make sure `remoteTable` will be rebuilt
processRecreateTable();
return;
}
//The scope of refresh can be narrowed in the future
Env.getCurrentEnv().getCatalogMgr()
.refreshExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName);

View File

@ -0,0 +1,404 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.hms;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.List;
/**
 * Tests analysis of queries against hive views exposed through an HMS external catalog.
 *
 * <p>The HMS tables and views are JMockit mocks registered on a catalog named
 * {@code hms_ctl}; no real metastore is contacted.
 */
public class HmsCatalogTest extends TestWithFeService {
    private static final String HMS_CATALOG = "hms_ctl";
    private Env env;
    private CatalogMgr mgr;
    private ConnectContext rootCtx;

    @Mocked
    private HMSExternalTable tbl;
    @Mocked
    private HMSExternalTable view1;
    @Mocked
    private HMSExternalTable view2;
    @Mocked
    private HMSExternalTable view3;
    @Mocked
    private HMSExternalTable view4;

    /**
     * Enables hive view querying, creates the {@code hms_ctl} catalog from a resource,
     * and creates the internal table {@code test.tbl1} used by the federated queries.
     */
    @Override
    protected void runBeforeAll() throws Exception {
        FeConstants.runningUnitTest = true;
        Config.enable_query_hive_views = true;
        rootCtx = createDefaultCtx();
        env = Env.getCurrentEnv();
        rootCtx.setEnv(env);
        mgr = env.getCatalogMgr();

        // create hms catalog by resource
        CreateResourceStmt hmsResource = (CreateResourceStmt) parseAndAnalyzeStmt(
                "create resource hms_resource properties('type' = 'hms', 'hive.metastore.uris' = 'thrift://192.168.0.1:9083');",
                rootCtx);
        env.getResourceMgr().createResource(hmsResource);
        CreateCatalogStmt hmsCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
                "create catalog hms_ctl with resource hms_resource;",
                rootCtx);
        mgr.createCatalog(hmsCatalog);

        // create inner db and tbl for test
        CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt("create database test", rootCtx);
        mgr.getInternalCatalog().createDb(createDbStmt);
        CreateTableStmt createTableStmt = (CreateTableStmt) parseAndAnalyzeStmt("create table test.tbl1(\n"
                + "k1 int comment 'test column k1', "
                + "k2 int comment 'test column k2') comment 'test table1' "
                + "distributed by hash(k1) buckets 1\n"
                + "properties(\"replication_num\" = \"1\");");
        mgr.getInternalCatalog().createTable(createTableStmt);
    }

    /**
     * Registers database {@code hms_db} on the given catalog, containing one mocked
     * plain table ({@code hms_tbl}) and four mocked views ({@code hms_view1..4}).
     *
     * @param hmsCatalog the catalog that receives the mocked database
     */
    private void createDbAndTableForHmsCatalog(HMSExternalCatalog hmsCatalog) {
        // Mark the catalog as already initialized via reflection.
        Deencapsulation.setField(hmsCatalog, "initialized", true);
        Deencapsulation.setField(hmsCatalog, "objectCreated", true);

        List<Column> schema = Lists.newArrayList();
        schema.add(new Column("k1", PrimitiveType.INT));

        HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 10000, "hms_db");
        Deencapsulation.setField(db, "initialized", true);

        // The plain table: isView() is false and no view text is recorded for it.
        Deencapsulation.setField(tbl, "objectCreated", true);
        new Expectations(tbl) {
            {
                tbl.getId();
                minTimes = 0;
                result = 10001;

                tbl.getName();
                minTimes = 0;
                result = "hms_tbl";

                tbl.getDbName();
                minTimes = 0;
                result = "hms_db";

                tbl.getFullSchema();
                minTimes = 0;
                result = schema;

                tbl.isSupportedHmsTable();
                minTimes = 0;
                result = true;

                tbl.isView();
                minTimes = 0;
                result = false;

                tbl.getType();
                minTimes = 0;
                result = TableIf.TableType.HMS_EXTERNAL_TABLE;
            }
        };

        // simple view over the table
        mockView(view1, 10002, "hms_view1", "SELECT * FROM hms_db.hms_tbl", schema);
        // view with a subquery over another view
        mockView(view2, 10003, "hms_view2", "SELECT * FROM (SELECT * FROM hms_db.hms_view1) t1", schema);
        // view with a union of two views
        mockView(view3, 10004, "hms_view3",
                "SELECT * FROM hms_db.hms_view1 UNION ALL SELECT * FROM hms_db.hms_view2", schema);
        // view that calls a function the test expects analysis to reject
        mockView(view4, 10005, "hms_view4", "SELECT not_exists_func(k1) FROM hms_db.hms_tbl", schema);

        db.addTableForTest(tbl);
        db.addTableForTest(view1);
        db.addTableForTest(view2);
        db.addTableForTest(view3);
        db.addTableForTest(view4);
        hmsCatalog.addDatabaseForTest(db);
    }

    /**
     * Records the expectations that make a mocked {@link HMSExternalTable} behave as a
     * supported hive view in database {@code hms_db}.
     *
     * @param view     the mocked table instance to configure
     * @param id       value returned by {@code getId()}
     * @param name     value returned by {@code getName()}
     * @param viewText value returned by {@code getViewText()}
     * @param schema   value returned by {@code getFullSchema()}
     */
    private void mockView(HMSExternalTable view, long id, String name, String viewText, List<Column> schema) {
        Deencapsulation.setField(view, "objectCreated", true);
        new Expectations(view) {
            {
                view.getId();
                minTimes = 0;
                result = id;

                view.getName();
                minTimes = 0;
                result = name;

                view.getDbName();
                minTimes = 0;
                result = "hms_db";

                view.isView();
                minTimes = 0;
                result = true;

                view.getType();
                minTimes = 0;
                result = TableIf.TableType.HMS_EXTERNAL_TABLE;

                view.getFullSchema();
                minTimes = 0;
                result = schema;

                view.getViewText();
                minTimes = 0;
                result = viewText;

                view.isSupportedHmsTable();
                minTimes = 0;
                result = true;
            }
        };
    }

    /**
     * Analyzes queries on the mocked table and views, first with fully qualified names,
     * then with {@code hms_ctl} as the current catalog, then again from the internal catalog.
     */
    @Test
    public void testQueryView() {
        createDbAndTableForHmsCatalog((HMSExternalCatalog) env.getCatalogMgr().getCatalog(HMS_CATALOG));

        // test normal table
        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_tbl");
        // test simple view
        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_view1");
        // test view with subquery
        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_view2");
        // test view with union
        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_view3");

        // test view with not support func
        AnalysisException e = Assert.assertThrows(AnalysisException.class,
                () -> parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view4", rootCtx));
        Assert.assertTrue(e.getMessage().contains("No matching function with signature: not_exists_func"));

        // change to hms_ctl
        switchCatalog(HMS_CATALOG);

        // test in hms_ctl
        assertAnalyzable("SELECT * FROM hms_db.hms_view1");
        assertAnalyzable("SELECT * FROM hms_db.hms_view2");
        assertAnalyzable("SELECT * FROM hms_db.hms_view3");
        Assert.assertThrows(AnalysisException.class,
                () -> parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view4", rootCtx));

        // test federated query
        assertAnalyzable("SELECT * FROM hms_db.hms_view3, internal.test.tbl1");

        // change to internal catalog
        switchCatalog(InternalCatalog.INTERNAL_CATALOG_NAME);

        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_view3, internal.test.tbl1");
        assertAnalyzable("SELECT * FROM hms_ctl.hms_db.hms_view3, test.tbl1");
    }

    /**
     * Parses and analyzes {@code sql} with the root context and fails the test if that throws.
     * The failure names the statement and keeps the original exception as its cause.
     *
     * @param sql the statement expected to analyze successfully
     */
    private void assertAnalyzable(String sql) {
        try {
            parseAndAnalyzeStmt(sql, rootCtx);
        } catch (Exception exception) {
            throw new AssertionError("Failed to analyze statement: " + sql, exception);
        }
    }

    /**
     * Switches the root context to the named catalog and fails the test if that throws.
     *
     * @param catalogName the catalog to make current
     */
    private void switchCatalog(String catalogName) {
        try {
            env.changeCatalog(rootCtx, catalogName);
        } catch (Exception exception) {
            throw new AssertionError("Failed to change catalog to: " + catalogName, exception);
        }
    }
}