[Feature](multi-catalog) Support sql cache for hms catalog (#23391)

**Support SQL cache for the HMS catalog. Both the legacy planner and the Nereids planner are supported.
Partition cache and federated queries are not supported yet.**
Xiangyu Wang
2023-09-10 21:56:35 +08:00
committed by GitHub
parent 9b3be0ba7a
commit 586492c124
12 changed files with 559 additions and 90 deletions
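A minimal usage sketch of the new behavior, assuming an HMS catalog has already been created; the catalog, database, and table names follow the new HmsQueryCacheTest, and `enable_sql_cache` / `cache_last_version_interval_second` are the existing session variable and FE config the test exercises:

-- enable the SQL cache for the current session
SET enable_sql_cache = true;
-- the first execution populates the cache; re-running the identical statement
-- is served from the SQL cache, provided the hive table's last update time is
-- older than cache_last_version_interval_second
SELECT count(*) FROM hms_ctl.hms_db.hms_tbl;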

@ -71,6 +71,7 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_QUERY_ERR;
public static LongCounterMetric COUNTER_QUERY_TABLE;
public static LongCounterMetric COUNTER_QUERY_OLAP_TABLE;
public static LongCounterMetric COUNTER_QUERY_HIVE_TABLE;
public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_ALL;
public static AutoMappedMetric<LongCounterMetric> USER_COUNTER_QUERY_ERR;
public static Histogram HISTO_QUERY_LATENCY;
@ -287,6 +288,9 @@ public final class MetricRepo {
COUNTER_QUERY_OLAP_TABLE = new LongCounterMetric("query_olap_table", MetricUnit.REQUESTS,
"total query from olap table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_OLAP_TABLE);
COUNTER_QUERY_HIVE_TABLE = new LongCounterMetric("query_hive_table", MetricUnit.REQUESTS,
"total query from hive table");
DORIS_METRIC_REGISTER.addMetrics(COUNTER_QUERY_HIVE_TABLE);
USER_COUNTER_QUERY_ALL = new AutoMappedMetric<>(name -> {
LongCounterMetric userCountQueryAll = new LongCounterMetric("query_total", MetricUnit.REQUESTS,
"total query for single user");

@ -143,7 +143,7 @@ public class NereidsPlanner extends Planner {
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViews(statementContext.getViews());
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
}
@VisibleForTesting

@ -18,7 +18,6 @@
package org.apache.doris.nereids;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.View;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.hint.Hint;
@ -87,8 +86,8 @@ public class StatementContext {
// Used to update consumer's stats
private final Map<CTEId, List<Pair<Map<Slot, Slot>, Group>>> cteIdToConsumerGroup = new HashMap<>();
private final Map<CTEId, LogicalPlan> rewrittenCtePlan = new HashMap<>();
private final Set<View> views = Sets.newHashSet();
private final Map<String, Hint> hintMap = Maps.newLinkedHashMap();
private final Set<String> viewDdlSqlSet = Sets.newHashSet();
public StatementContext() {
this.connectContext = ConnectContext.get();
@ -235,11 +234,11 @@ public class StatementContext {
return rewrittenCtePlan;
}
public void addView(View view) {
this.views.add(view);
public void addViewDdlSql(String ddlSql) {
this.viewDdlSqlSet.add(ddlSql);
}
public List<View> getViews() {
return ImmutableList.copyOf(views);
public List<String> getViewDdlSqls() {
return ImmutableList.copyOf(viewDdlSqlSet);
}
}

@ -23,7 +23,6 @@ import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.Queriable;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.View;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
@ -44,7 +43,7 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
private final LogicalPlan logicalPlan;
private List<Expr> resultExprs;
private ArrayList<String> colLabels;
private List<View> views;
private List<String> viewDdlSqls;
public LogicalPlanAdapter(LogicalPlan logicalPlan, StatementContext statementContext) {
this.logicalPlan = logicalPlan;
@ -81,8 +80,8 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
return colLabels;
}
public List<View> getViews() {
return views;
public List<String> getViewDdlSqls() {
return viewDdlSqls;
}
@Override
@ -98,8 +97,8 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable {
this.colLabels = colLabels;
}
public void setViews(List<View> views) {
this.views = views;
public void setViewDdlSqls(List<String> viewDdlSqls) {
this.viewDdlSqls = viewDdlSqls;
}
public StatementContext getStatementContext() {

@ -63,7 +63,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Optional;
@ -220,16 +219,14 @@ public class BindRelation extends OneAnalysisRuleFactory {
case OLAP:
return makeOlapScan(table, unboundRelation, tableQualifier);
case VIEW:
cascadesContext.getStatementContext().addView((View) table);
Plan viewPlan = parseAndAnalyzeView(((View) table).getDdlSql(), cascadesContext);
return new LogicalSubQueryAlias<>(tableQualifier, viewPlan);
case HMS_EXTERNAL_TABLE:
if (Config.enable_query_hive_views) {
if (((HMSExternalTable) table).isView()
&& StringUtils.isNotEmpty(((HMSExternalTable) table).getViewText())) {
Plan hiveViewPlan = parseAndAnalyzeHiveView(table, cascadesContext);
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
}
if (Config.enable_query_hive_views && ((HMSExternalTable) table).isView()) {
String hiveCatalog = ((HMSExternalTable) table).getCatalog().getName();
String ddlSql = ((HMSExternalTable) table).getViewText();
Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, ddlSql, cascadesContext);
return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan);
}
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier);
case ICEBERG_EXTERNAL_TABLE:
@ -248,20 +245,20 @@ public class BindRelation extends OneAnalysisRuleFactory {
}
}
private Plan parseAndAnalyzeHiveView(TableIf table, CascadesContext cascadesContext) {
HMSExternalTable hiveTable = (HMSExternalTable) table;
private Plan parseAndAnalyzeHiveView(String hiveCatalog, String ddlSql, CascadesContext cascadesContext) {
ConnectContext ctx = cascadesContext.getConnectContext();
String previousCatalog = ctx.getCurrentCatalog().getName();
String previousDb = ctx.getDatabase();
ctx.changeDefaultCatalog(hiveTable.getCatalog().getName());
Plan hiveViewPlan = parseAndAnalyzeView(hiveTable.getViewText(), cascadesContext);
ctx.changeDefaultCatalog(hiveCatalog);
Plan hiveViewPlan = parseAndAnalyzeView(ddlSql, cascadesContext);
ctx.changeDefaultCatalog(previousCatalog);
ctx.setDatabase(previousDb);
return hiveViewPlan;
}
private Plan parseAndAnalyzeView(String viewSql, CascadesContext parentContext) {
LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(viewSql);
private Plan parseAndAnalyzeView(String ddlSql, CascadesContext parentContext) {
parentContext.getStatementContext().addViewDdlSql(ddlSql);
LogicalPlan parsedViewPlan = new NereidsParser().parseSingle(ddlSql);
// TODO: use a good way to do this, such as eliminate UnboundResultSink
if (parsedViewPlan instanceof UnboundResultSink) {
parsedViewPlan = (LogicalPlan) ((UnboundResultSink<?>) parsedViewPlan).child();

@ -270,4 +270,8 @@ public abstract class FileScanNode extends ExternalScanNode {
long fileLength = last.getOffset() + last.getLength() - 1L;
throw new IllegalArgumentException(String.format("Offset %d is outside of file (0..%d)", offset, fileLength));
}
public long getReadPartitionNum() {
return this.readPartitionNum;
}
}

@ -34,6 +34,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
@ -43,12 +44,14 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.RowBatch;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -59,7 +62,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Analyze which caching mode a SQL is suitable for
@ -89,7 +91,6 @@ public class CacheAnalyzer {
private StatementBase parsedStmt;
private SelectStmt selectStmt;
private List<ScanNode> scanNodes;
private OlapTable olapTable;
private RangePartitionInfo partitionInfo;
private Column partColumn;
private CompoundPredicate partitionPredicate;
@ -137,7 +138,7 @@ public class CacheAnalyzer {
}
public class CacheTable implements Comparable<CacheTable> {
public OlapTable olapTable;
public TableIf table;
public long latestPartitionId;
public long latestVersion;
public long latestTime;
@ -145,7 +146,7 @@ public class CacheAnalyzer {
public long sumOfPartitionNum;
public CacheTable() {
olapTable = null;
table = null;
latestPartitionId = 0;
latestVersion = 0;
latestTime = 0;
@ -160,7 +161,7 @@ public class CacheAnalyzer {
public void debug() {
LOG.debug("table {}, partition id {}, ver {}, time {}, partition num {}, sumOfPartitionNum: {}",
olapTable.getName(), latestPartitionId, latestVersion, latestTime, partitionNum, sumOfPartitionNum);
table.getName(), latestPartitionId, latestVersion, latestTime, partitionNum, sumOfPartitionNum);
}
}
@ -207,28 +208,12 @@ public class CacheAnalyzer {
LOG.debug("not a select stmt or no scan node. queryid {}", DebugUtil.printId(queryId));
return CacheMode.NoNeed;
}
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
this.selectStmt = (SelectStmt) parsedStmt;
//Check the last version time of the table
List<CacheTable> tblTimeList = Lists.newArrayList();
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (!(node instanceof OlapScanNode)) {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
if (enablePartitionCache() && ((OlapScanNode) node).getSelectedPartitionNum() > 1
&& selectStmt.hasGroupByClause()) {
LOG.debug("more than one partition scanned when query has agg, partition cache cannot use, queryid {}",
DebugUtil.printId(queryId));
return CacheMode.None;
}
CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node);
tblTimeList.add(cTable);
List<CacheTable> tblTimeList = buildCacheTableList();
if (CollectionUtils.isEmpty(tblTimeList)) {
return CacheMode.None;
}
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();
@ -251,6 +236,11 @@ public class CacheAnalyzer {
return CacheMode.Sql;
}
// TODO:wxy support partition cache for hive table later
if (!(latestTable.table instanceof OlapTable)) {
LOG.debug("only support partition cache for olap table now. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
}
if (!enablePartitionCache()) {
LOG.debug("partition query cache is disabled. queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
@ -265,7 +255,7 @@ public class CacheAnalyzer {
return CacheMode.None;
}
}
olapTable = latestTable.olapTable;
OlapTable olapTable = (OlapTable) latestTable.table;
if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
LOG.debug("the partition of OlapTable not RANGE type, queryid {}", DebugUtil.printId(queryId));
return CacheMode.None;
@ -315,23 +305,12 @@ public class CacheAnalyzer {
}
return CacheMode.NoNeed;
}
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
//Check the last version time of the table
List<CacheTable> tblTimeList = Lists.newArrayList();
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (!(node instanceof OlapScanNode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
}
return CacheMode.None;
}
CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node);
tblTimeList.add(cTable);
List<CacheTable> tblTimeList = buildCacheTableList();
if (CollectionUtils.isEmpty(tblTimeList)) {
return CacheMode.None;
}
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();
@ -370,23 +349,12 @@ public class CacheAnalyzer {
}
return CacheMode.NoNeed;
}
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
//Check the last version time of the table
List<CacheTable> tblTimeList = Lists.newArrayList();
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (!(node instanceof OlapScanNode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("query contains non-olap table. queryid {}", DebugUtil.printId(queryId));
}
return CacheMode.None;
}
CacheTable cTable = getSelectedPartitionLastUpdateTime((OlapScanNode) node);
tblTimeList.add(cTable);
List<CacheTable> tblTimeList = buildCacheTableList();
if (CollectionUtils.isEmpty(tblTimeList)) {
return CacheMode.None;
}
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
Collections.sort(tblTimeList);
latestTable = tblTimeList.get(0);
latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> item.partitionNum).sum();
latestTable.debug();
@ -395,8 +363,7 @@ public class CacheAnalyzer {
return CacheMode.NoNeed;
}
allViewStmtSet.addAll(((LogicalPlanAdapter) parsedStmt).getViews()
.stream().map(view -> view.getDdlSql()).collect(Collectors.toSet()));
allViewStmtSet.addAll(((LogicalPlanAdapter) parsedStmt).getViewDdlSqls());
String allViewExpandStmtListStr = StringUtils.join(allViewStmtSet, "|");
if (now == 0) {
@ -417,6 +384,45 @@ public class CacheAnalyzer {
return CacheMode.None;
}
private List<CacheTable> buildCacheTableList() {
//Check the last version time of the table
MetricRepo.COUNTER_QUERY_TABLE.increase(1L);
long olapScanNodeSize = scanNodes.stream().filter(node -> node instanceof OlapScanNode).count();
long hiveScanNodeSize = scanNodes.stream().filter(node -> node instanceof HiveScanNode).count();
if (olapScanNodeSize > 0) {
MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
}
if (hiveScanNodeSize > 0) {
MetricRepo.COUNTER_QUERY_HIVE_TABLE.increase(1L);
}
if (!(olapScanNodeSize == scanNodes.size() || hiveScanNodeSize == scanNodes.size())) {
LOG.debug("only support olap/hive table with non-federated query, other types are not supported now, "
+ "queryId {}", DebugUtil.printId(queryId));
return Collections.emptyList();
}
List<CacheTable> tblTimeList = Lists.newArrayList();
for (int i = 0; i < scanNodes.size(); i++) {
ScanNode node = scanNodes.get(i);
if (enablePartitionCache()
&& (node instanceof OlapScanNode)
&& ((OlapScanNode) node).getSelectedPartitionNum() > 1
&& selectStmt != null
&& selectStmt.hasGroupByClause()) {
LOG.debug("more than one partition scanned when qeury has agg, partition cache cannot use, queryid {}",
DebugUtil.printId(queryId));
return Collections.emptyList();
}
CacheTable cTable = node instanceof OlapScanNode
? buildCacheTableForOlapScanNode((OlapScanNode) node)
: buildCacheTableForHiveScanNode((HiveScanNode) node);
tblTimeList.add(cTable);
}
Collections.sort(tblTimeList);
return tblTimeList;
}
public InternalService.PFetchCacheResult getCacheData() {
if (parsedStmt instanceof LogicalPlanAdapter) {
cacheMode = innerCheckCacheModeForNereids(0);
@ -579,11 +585,11 @@ public class CacheAnalyzer {
}
}
private CacheTable getSelectedPartitionLastUpdateTime(OlapScanNode node) {
private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) {
CacheTable cacheTable = new CacheTable();
OlapTable olapTable = node.getOlapTable();
cacheTable.olapTable = olapTable;
cacheTable.partitionNum = node.getSelectedPartitionIds().size();
cacheTable.table = olapTable;
for (Long partitionId : node.getSelectedPartitionIds()) {
Partition partition = olapTable.getPartition(partitionId);
if (partition.getVisibleVersionTime() >= cacheTable.latestTime) {
@ -595,6 +601,14 @@ public class CacheAnalyzer {
return cacheTable;
}
private CacheTable buildCacheTableForHiveScanNode(HiveScanNode node) {
CacheTable cacheTable = new CacheTable();
cacheTable.table = node.getTargetTable();
cacheTable.partitionNum = node.getReadPartitionNum();
cacheTable.latestTime = cacheTable.table.getLastUpdateTime();
return cacheTable;
}
private void addAllViewStmt(List<TableRef> tblRefs) {
for (TableRef tblRef : tblRefs) {
if (tblRef instanceof InlineViewRef) {

@ -72,7 +72,7 @@ public class PartitionCache extends Cache {
public void setCacheInfo(CacheAnalyzer.CacheTable latestTable, RangePartitionInfo partitionInfo, Column partColumn,
CompoundPredicate partitionPredicate, String allViewExpandStmtListStr) {
this.latestTable = latestTable;
this.olapTable = latestTable.olapTable;
this.olapTable = (OlapTable) latestTable.table;
this.partitionInfo = partitionInfo;
this.partColumn = partColumn;
this.partitionPredicate = partitionPredicate;

@ -36,7 +36,7 @@ public abstract class AnalyzeCheckTestBase extends TestWithFeService {
}
protected void checkAnalyze(String sql) {
LogicalPlan analyzed = analyze(sql);
LogicalPlan analyzed = analyzeAndGetLogicalPlanByNereids(sql);
Assertions.assertTrue(checkBound(analyzed));
}

@ -0,0 +1,406 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable.DLAType;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.HiveScanNode;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class HmsQueryCacheTest extends AnalyzeCheckTestBase {
private static final String HMS_CATALOG = "hms_ctl";
private static final long NOW = System.currentTimeMillis();
private Env env;
private CatalogMgr mgr;
private OlapScanNode olapScanNode;
@Mocked
private HMSExternalTable tbl;
@Mocked
private HMSExternalTable view1;
@Mocked
private HMSExternalTable view2;
@Mocked
private HiveScanNode hiveScanNode1;
@Mocked
private HiveScanNode hiveScanNode2;
@Mocked
private HiveScanNode hiveScanNode3;
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
Config.enable_query_hive_views = true;
Config.cache_enable_sql_mode = true;
Config.cache_enable_partition_mode = true;
connectContext.getSessionVariable().setEnableSqlCache(true);
env = Env.getCurrentEnv();
connectContext.setEnv(env);
mgr = env.getCatalogMgr();
// create hms catalog
CreateCatalogStmt hmsCatalogStmt = (CreateCatalogStmt) parseAndAnalyzeStmt(
"create catalog hms_ctl properties('type' = 'hms', 'hive.metastore.uris' = 'thrift://192.168.0.1:9083');",
connectContext);
mgr.createCatalog(hmsCatalogStmt);
// create inner db and tbl for test
CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt("create database test", connectContext);
mgr.getInternalCatalog().createDb(createDbStmt);
CreateTableStmt createTableStmt = (CreateTableStmt) parseAndAnalyzeStmt("create table test.tbl1(\n"
+ "k1 int comment 'test column k1', "
+ "k2 int comment 'test column k2') comment 'test table1' "
+ "distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
mgr.getInternalCatalog().createTable(createTableStmt);
}
private void init(HMSExternalCatalog hmsCatalog) {
Deencapsulation.setField(hmsCatalog, "initialized", true);
Deencapsulation.setField(hmsCatalog, "objectCreated", true);
List<Column> schema = Lists.newArrayList();
schema.add(new Column("k1", PrimitiveType.INT));
HMSExternalDatabase db = new HMSExternalDatabase(hmsCatalog, 10000, "hms_db");
Deencapsulation.setField(db, "initialized", true);
Deencapsulation.setField(tbl, "objectCreated", true);
Deencapsulation.setField(tbl, "rwLock", new ReentrantReadWriteLock(true));
new Expectations(tbl) {
{
tbl.getId();
minTimes = 0;
result = 10001;
tbl.getName();
minTimes = 0;
result = "hms_tbl";
tbl.getDbName();
minTimes = 0;
result = "hms_db";
tbl.getFullSchema();
minTimes = 0;
result = schema;
tbl.isSupportedHmsTable();
minTimes = 0;
result = true;
tbl.isView();
minTimes = 0;
result = false;
tbl.getType();
minTimes = 0;
result = TableIf.TableType.HMS_EXTERNAL_TABLE;
tbl.getDlaType();
minTimes = 0;
result = DLAType.HIVE;
tbl.getLastUpdateTime();
minTimes = 0;
result = NOW;
}
};
Deencapsulation.setField(view1, "objectCreated", true);
Deencapsulation.setField(view1, "rwLock", new ReentrantReadWriteLock(true));
new Expectations(view1) {
{
view1.getId();
minTimes = 0;
result = 10002;
view1.getName();
minTimes = 0;
result = "hms_view1";
view1.getDbName();
minTimes = 0;
result = "hms_db";
view1.isView();
minTimes = 0;
result = true;
view1.getCatalog();
minTimes = 0;
result = hmsCatalog;
view1.getType();
minTimes = 0;
result = TableIf.TableType.HMS_EXTERNAL_TABLE;
view1.getFullSchema();
minTimes = 0;
result = schema;
view1.getViewText();
minTimes = 0;
result = "SELECT * FROM hms_db.hms_tbl";
view1.isSupportedHmsTable();
minTimes = 0;
result = true;
view1.getDlaType();
minTimes = 0;
result = DLAType.HIVE;
view1.getLastUpdateTime();
minTimes = 0;
result = NOW;
}
};
Deencapsulation.setField(view2, "objectCreated", true);
Deencapsulation.setField(view2, "rwLock", new ReentrantReadWriteLock(true));
new Expectations(view2) {
{
view2.getId();
minTimes = 0;
result = 10003;
view2.getName();
minTimes = 0;
result = "hms_view2";
view2.getDbName();
minTimes = 0;
result = "hms_db";
view2.isView();
minTimes = 0;
result = true;
view2.getCatalog();
minTimes = 0;
result = hmsCatalog;
view2.getType();
minTimes = 0;
result = TableIf.TableType.HMS_EXTERNAL_TABLE;
view2.getFullSchema();
minTimes = 0;
result = schema;
view2.getViewText();
minTimes = 0;
result = "SELECT * FROM hms_db.hms_view1";
view2.isSupportedHmsTable();
minTimes = 0;
result = true;
view2.getDlaType();
minTimes = 0;
result = DLAType.HIVE;
view2.getLastUpdateTime();
minTimes = 0;
result = NOW;
}
};
db.addTableForTest(tbl);
db.addTableForTest(view1);
db.addTableForTest(view2);
hmsCatalog.addDatabaseForTest(db);
new Expectations(hiveScanNode1) {
{
hiveScanNode1.getTargetTable();
minTimes = 0;
result = tbl;
}
};
new Expectations(hiveScanNode2) {
{
hiveScanNode2.getTargetTable();
minTimes = 0;
result = view1;
}
};
new Expectations(hiveScanNode3) {
{
hiveScanNode3.getTargetTable();
minTimes = 0;
result = view2;
}
};
TupleDescriptor desc = new TupleDescriptor(new TupleId(1));
desc.setTable(mgr.getInternalCatalog().getDbNullable("default_cluster:test").getTableNullable("tbl1"));
olapScanNode = new OlapScanNode(new PlanNodeId(1), desc, "tb1ScanNode");
}
@Test
public void testHitSqlCache() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
}
@Test
public void testHitSqlCacheByNereids() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
}
@Test
public void testHitSqlCacheWithHiveView() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view1", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode2);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
}
@Test
public void testHitSqlCacheWithHiveViewByNereids() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view1", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode2);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
}
@Test
public void testHitSqlCacheWithNestedHiveView() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_view2", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode3);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
SqlCache sqlCache = (SqlCache) ca.getCache();
String cacheKey = sqlCache.getSqlWithViewStmt();
Assert.assertEquals(cacheKey, "SELECT `hms_ctl`.`default_cluster:hms_db`.`hms_view2`.`k1` AS `k1` "
+ "FROM `hms_ctl`.`default_cluster:hms_db`.`hms_view2`"
+ "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1");
}
@Test
public void testHitSqlCacheWithNestedHiveViewByNereids() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_view2", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode3);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.Sql);
SqlCache sqlCache = (SqlCache) ca.getCache();
String cacheKey = sqlCache.getSqlWithViewStmt();
Assert.assertEquals(cacheKey, "select * from hms_ctl.hms_db.hms_view2"
+ "|SELECT * FROM hms_db.hms_tbl|SELECT * FROM hms_db.hms_view1");
}
@Test
public void testNotHitSqlCache() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheMode(NOW);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
}
@Test
public void testNotHitSqlCacheByNereids() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheModeForNereids(NOW);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
}
@Test
public void testNotHitSqlCacheWithFederatedQuery() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
// cache mode is None if this query is a federated query
StatementBase parseStmt = parseAndAnalyzeStmt("select * from hms_ctl.hms_db.hms_tbl "
+ "inner join internal.test.tbl1", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1, olapScanNode);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheMode(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
}
@Test
public void testNotHitSqlCacheWithFederatedQueryByNereids() throws Exception {
init((HMSExternalCatalog) mgr.getCatalog(HMS_CATALOG));
// cache mode is None if this query is a federated query
StatementBase parseStmt = analyzeAndGetStmtByNereids("select * from hms_ctl.hms_db.hms_tbl "
+ "inner join internal.test.tbl1", connectContext);
List<ScanNode> scanNodes = Arrays.asList(hiveScanNode1, olapScanNode);
CacheAnalyzer ca = new CacheAnalyzer(connectContext, parseStmt, scanNodes);
ca.checkCacheModeForNereids(NOW + Config.cache_last_version_interval_second * 1000L * 2);
Assert.assertEquals(ca.getCacheMode(), CacheAnalyzer.CacheMode.None);
}
}

@ -100,8 +100,8 @@ import java.util.Collection;
import java.util.List;
import java.util.function.Function;
public class PartitionCacheTest {
private static final Logger LOG = LogManager.getLogger(PartitionCacheTest.class);
public class OlapQueryCacheTest {
private static final Logger LOG = LogManager.getLogger(OlapQueryCacheTest.class);
public static String clusterName = "testCluster";
public static String dbName = "testDb";
public static String fullDbName = "testCluster:testDb";

@ -58,6 +58,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@ -183,6 +184,12 @@ public abstract class TestWithFeService {
return statementContext;
}
protected StatementContext createStatementCtx(String sql, ConnectContext ctx) {
StatementContext statementContext = new StatementContext(ctx, new OriginStatement(sql, 0));
ctx.setStatementContext(statementContext);
return statementContext;
}
protected <T extends StatementBase> T createStmt(String showSql)
throws Exception {
return (T) parseAndAnalyzeStmt(showSql, connectContext);
@ -193,7 +200,12 @@ public abstract class TestWithFeService {
return MemoTestUtils.createCascadesContext(statementCtx, sql);
}
public LogicalPlan analyze(String sql) {
protected CascadesContext createCascadesContext(String sql, ConnectContext ctx) {
StatementContext statementCtx = createStatementCtx(sql, ctx);
return MemoTestUtils.createCascadesContext(statementCtx, sql);
}
public LogicalPlan analyzeAndGetLogicalPlanByNereids(String sql) {
Set<String> originDisableRules = connectContext.getSessionVariable().getDisableNereidsRuleNames();
Set<String> disableRuleWithAuth = Sets.newHashSet(originDisableRules);
disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name());
@ -205,6 +217,40 @@ public abstract class TestWithFeService {
return (LogicalPlan) cascadesContext.getRewritePlan();
}
public LogicalPlan analyzeAndGetLogicalPlanByNereids(String sql, ConnectContext ctx) {
Set<String> originDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
Set<String> disableRuleWithAuth = Sets.newHashSet(originDisableRules);
disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name());
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", disableRuleWithAuth));
CascadesContext cascadesContext = createCascadesContext(sql, ctx);
cascadesContext.newAnalyzer().analyze();
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", originDisableRules));
cascadesContext.toMemo();
return (LogicalPlan) cascadesContext.getRewritePlan();
}
// Parse an origin stmt and analyze it by nereids. Return a StatementBase instance.
public StatementBase analyzeAndGetStmtByNereids(String sql) {
return analyzeAndGetStmtByNereids(sql, connectContext);
}
// Parse an origin stmt and analyze it by nereids. Return a StatementBase instance.
public StatementBase analyzeAndGetStmtByNereids(String sql, ConnectContext ctx) {
Set<String> originDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
Set<String> disableRuleWithAuth = Sets.newHashSet(originDisableRules);
disableRuleWithAuth.add(RuleType.RELATION_AUTHENTICATION.name());
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", disableRuleWithAuth));
CascadesContext cascadesContext = createCascadesContext(sql, ctx);
cascadesContext.newAnalyzer().analyze();
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", originDisableRules));
cascadesContext.toMemo();
LogicalPlan plan = (LogicalPlan) cascadesContext.getRewritePlan();
LogicalPlanAdapter adapter = new LogicalPlanAdapter(plan, cascadesContext.getStatementContext());
adapter.setViewDdlSqls(cascadesContext.getStatementContext().getViewDdlSqls());
cascadesContext.getStatementContext().setParsedStatement(adapter);
return adapter;
}
protected ConnectContext createCtx(UserIdentity user, String host) throws IOException {
ConnectContext ctx = new ConnectContext();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);