[Feature](multi-catalog) support query hive-view for nereids planner. (#21419)
Relevant pr #18815, support query hive views for nereids planner.
This commit is contained in:
@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.catalog.external.EsExternalTable;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.nereids.CTEContext;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
@ -58,6 +59,7 @@ import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -203,6 +205,13 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
Plan viewPlan = parseAndAnalyzeView(((View) table).getDdlSql(), cascadesContext);
|
||||
return new LogicalSubQueryAlias<>(tableQualifier, viewPlan);
|
||||
case HMS_EXTERNAL_TABLE:
|
||||
if (Config.enable_query_hive_views) {
|
||||
if (((HMSExternalTable) table).isView()
|
||||
&& StringUtils.isNotEmpty(((HMSExternalTable) table).getViewText())) {
|
||||
Plan hiveViewPlan = parseAndAnalyzeHiveView(table, cascadesContext);
|
||||
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
|
||||
}
|
||||
}
|
||||
return new LogicalFileScan(RelationUtil.newRelationId(),
|
||||
(HMSExternalTable) table, ImmutableList.of(dbName));
|
||||
case SCHEMA:
|
||||
@ -218,6 +227,18 @@ public class BindRelation extends OneAnalysisRuleFactory {
|
||||
}
|
||||
}
|
||||
|
||||
private Plan parseAndAnalyzeHiveView(TableIf table, CascadesContext cascadesContext) {
|
||||
HMSExternalTable hiveTable = (HMSExternalTable) table;
|
||||
ConnectContext ctx = cascadesContext.getConnectContext();
|
||||
String previousCatalog = ctx.getCurrentCatalog().getName();
|
||||
String previousDb = ctx.getDatabase();
|
||||
ctx.changeDefaultCatalog(hiveTable.getCatalog().getName());
|
||||
Plan hiveViewPlan = parseAndAnalyzeView(hiveTable.getViewText(), cascadesContext);
|
||||
ctx.changeDefaultCatalog(previousCatalog);
|
||||
ctx.setDatabase(previousDb);
|
||||
return hiveViewPlan;
|
||||
}
|
||||
|
||||
private Plan parseAndAnalyzeView(String viewSql, CascadesContext parentContext) {
|
||||
LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(viewSql);
|
||||
CascadesContext viewContext = CascadesContext.newRewriteContext(
|
||||
|
||||
@ -33,22 +33,22 @@ import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.datasource.CatalogMgr;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Expectations;
|
||||
import mockit.Mocked;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class HmsCatalogTest extends TestWithFeService {
|
||||
public class HmsCatalogTest extends AnalyzeCheckTestBase {
|
||||
private static final String HMS_CATALOG = "hms_ctl";
|
||||
private Env env;
|
||||
private CatalogMgr mgr;
|
||||
private ConnectContext rootCtx;
|
||||
|
||||
@Mocked
|
||||
private HMSExternalTable tbl;
|
||||
@ -65,19 +65,18 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
protected void runBeforeAll() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
Config.enable_query_hive_views = true;
|
||||
rootCtx = createDefaultCtx();
|
||||
env = Env.getCurrentEnv();
|
||||
rootCtx.setEnv(env);
|
||||
connectContext.setEnv(env);
|
||||
mgr = env.getCatalogMgr();
|
||||
|
||||
// create hms catalog
|
||||
CreateCatalogStmt hmsCatalog = (CreateCatalogStmt) parseAndAnalyzeStmt(
|
||||
"create catalog hms_ctl properties('type' = 'hms', 'hive.metastore.uris' = 'thrift://192.168.0.1:9083');",
|
||||
rootCtx);
|
||||
connectContext);
|
||||
mgr.createCatalog(hmsCatalog);
|
||||
|
||||
// create inner db and tbl for test
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt("create database test", rootCtx);
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt("create database test", connectContext);
|
||||
mgr.getInternalCatalog().createDb(createDbStmt);
|
||||
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) parseAndAnalyzeStmt("create table test.tbl1(\n"
|
||||
@ -151,6 +150,10 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
view1.getCatalog();
|
||||
minTimes = 0;
|
||||
result = hmsCatalog;
|
||||
|
||||
view1.getType();
|
||||
minTimes = 0;
|
||||
result = TableIf.TableType.HMS_EXTERNAL_TABLE;
|
||||
@ -185,6 +188,10 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
minTimes = 0;
|
||||
result = "hms_db";
|
||||
|
||||
view2.getCatalog();
|
||||
minTimes = 0;
|
||||
result = hmsCatalog;
|
||||
|
||||
view2.isView();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
@ -223,6 +230,10 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
minTimes = 0;
|
||||
result = "hms_db";
|
||||
|
||||
view3.getCatalog();
|
||||
minTimes = 0;
|
||||
result = hmsCatalog;
|
||||
|
||||
view3.isView();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
@ -261,6 +272,10 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
minTimes = 0;
|
||||
result = "hms_db";
|
||||
|
||||
view4.getCatalog();
|
||||
minTimes = 0;
|
||||
result = hmsCatalog;
|
||||
|
||||
view4.isView();
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
@ -294,106 +309,94 @@ public class HmsCatalogTest extends TestWithFeService {
|
||||
|
||||
@Test
|
||||
public void testQueryView() {
|
||||
SessionVariable sv = connectContext.getSessionVariable();
|
||||
Assertions.assertNotNull(sv);
|
||||
sv.setEnableNereidsPlanner(true);
|
||||
sv.enableFallbackToOriginalPlanner = false;
|
||||
|
||||
createDbAndTableForHmsCatalog((HMSExternalCatalog) env.getCatalogMgr().getCatalog(HMS_CATALOG));
|
||||
// test normal table
|
||||
queryViews(false);
|
||||
|
||||
// force use nereids planner to query hive views
|
||||
queryViews(true);
|
||||
}
|
||||
|
||||
private void testParseAndAnalyze(boolean useNereids, String sql) {
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_tbl", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test simple view
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view1", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test view with subquery
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view2", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test view with union
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view3", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test view with not support func
|
||||
AnalysisException e = Assert.assertThrows(AnalysisException.class,
|
||||
() -> parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view4", rootCtx));
|
||||
Assert.assertTrue(e.getMessage().contains("No matching function with signature: not_exists_func"));
|
||||
|
||||
// change to hms_ctl
|
||||
try {
|
||||
env.changeCatalog(rootCtx, HMS_CATALOG);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test in hms_ctl
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view1", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view2", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view3", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
Assert.assertThrows(AnalysisException.class,
|
||||
() -> parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view4", rootCtx));
|
||||
|
||||
// test federated query
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_db.hms_view3, internal.test.tbl1", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// change to internal catalog
|
||||
try {
|
||||
env.changeCatalog(rootCtx, InternalCatalog.INTERNAL_CATALOG_NAME);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view3, internal.test.tbl1", rootCtx);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
try {
|
||||
parseAndAnalyzeStmt("SELECT * FROM hms_ctl.hms_db.hms_view3, test.tbl1", rootCtx);
|
||||
if (useNereids) {
|
||||
checkAnalyze(sql);
|
||||
} else {
|
||||
parseAndAnalyzeStmt(sql, connectContext);
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
private void testParseAndAnalyzeWithThrows(boolean useNereids, String sql,
|
||||
Class<? extends Throwable> throwableClass) {
|
||||
try {
|
||||
if (useNereids) {
|
||||
Assert.assertThrows(throwableClass, () -> checkAnalyze(sql));
|
||||
} else {
|
||||
Assert.assertThrows(throwableClass, () -> parseAndAnalyzeStmt(sql, connectContext));
|
||||
}
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
|
||||
private void queryViews(boolean useNereids) {
|
||||
// test normal table
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_tbl");
|
||||
|
||||
// test simple view
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view1");
|
||||
|
||||
// test view with subquery
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view2");
|
||||
|
||||
// test view with union
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view3");
|
||||
|
||||
// test view with not support func
|
||||
testParseAndAnalyzeWithThrows(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view4",
|
||||
useNereids ? org.apache.doris.nereids.exceptions.AnalysisException.class : AnalysisException.class);
|
||||
|
||||
// change to hms_ctl
|
||||
try {
|
||||
env.changeCatalog(connectContext, HMS_CATALOG);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
// test in hms_ctl
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_db.hms_view1");
|
||||
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_db.hms_view2");
|
||||
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_db.hms_view3");
|
||||
|
||||
testParseAndAnalyzeWithThrows(useNereids, "SELECT * FROM hms_db.hms_view4",
|
||||
useNereids ? org.apache.doris.nereids.exceptions.AnalysisException.class : AnalysisException.class);
|
||||
|
||||
// test federated query
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_db.hms_view3, internal.test.tbl1");
|
||||
|
||||
// change to internal catalog
|
||||
try {
|
||||
env.changeCatalog(connectContext, InternalCatalog.INTERNAL_CATALOG_NAME);
|
||||
} catch (Exception exception) {
|
||||
exception.printStackTrace();
|
||||
Assert.fail();
|
||||
}
|
||||
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view3, internal.test.tbl1");
|
||||
|
||||
testParseAndAnalyze(useNereids, "SELECT * FROM hms_ctl.hms_db.hms_view3, test.tbl1");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user