cherry pick from #37090
This commit is contained in:
@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.RelationId;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.Types.PUniqueId;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
@ -58,6 +59,7 @@ import org.apache.doris.qe.cache.SqlCache;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
@ -123,16 +125,14 @@ public class NereidsSqlCacheManager {
|
||||
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
String key = currentUserIdentity.toString() + ":" + sql.trim();
|
||||
if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null
|
||||
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
|
||||
&& sqlCacheContext.getResultSetInFe().isPresent()) {
|
||||
sqlCaches.put(key, sqlCacheContext);
|
||||
}
|
||||
}
|
||||
|
||||
/** tryAddCache */
|
||||
public void tryAddCache(
|
||||
ConnectContext connectContext, String sql,
|
||||
CacheAnalyzer analyzer, boolean currentMissParseSqlFromSqlCache) {
|
||||
/** tryAddBeCache */
|
||||
public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) {
|
||||
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
|
||||
if (!sqlCacheContextOpt.isPresent()) {
|
||||
return;
|
||||
@ -143,8 +143,7 @@ public class NereidsSqlCacheManager {
|
||||
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
|
||||
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
|
||||
String key = currentUserIdentity.toString() + ":" + sql.trim();
|
||||
if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null)
|
||||
&& sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
|
||||
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
|
||||
SqlCache cache = (SqlCache) analyzer.getCache();
|
||||
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
|
||||
sqlCacheContext.setLatestPartitionId(cache.getLatestId());
|
||||
@ -182,9 +181,6 @@ public class NereidsSqlCacheManager {
|
||||
if (viewsChanged(env, sqlCacheContext)) {
|
||||
return invalidateCache(key);
|
||||
}
|
||||
if (usedVariablesChanged(sqlCacheContext)) {
|
||||
return invalidateCache(key);
|
||||
}
|
||||
|
||||
LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of());
|
||||
if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) {
|
||||
@ -201,7 +197,10 @@ public class NereidsSqlCacheManager {
|
||||
|
||||
try {
|
||||
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
|
||||
if (resultSetInFe.isPresent()) {
|
||||
|
||||
List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
|
||||
boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext);
|
||||
if (resultSetInFe.isPresent() && !usedVariablesChanged) {
|
||||
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);
|
||||
|
||||
String cachedPlan = sqlCacheContext.getPhysicalPlan();
|
||||
@ -214,7 +213,9 @@ public class NereidsSqlCacheManager {
|
||||
}
|
||||
|
||||
Status status = new Status();
|
||||
PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
|
||||
PUniqueId cacheKeyMd5 = usedVariablesChanged
|
||||
? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))
|
||||
: sqlCacheContext.getOrComputeCacheKeyMd5();
|
||||
InternalService.PFetchCacheResult cacheData =
|
||||
SqlCache.getCacheData(sqlCacheContext.getCacheProxy(),
|
||||
cacheKeyMd5, sqlCacheContext.getLatestPartitionId(),
|
||||
@ -235,7 +236,7 @@ public class NereidsSqlCacheManager {
|
||||
);
|
||||
return Optional.of(logicalSqlCache);
|
||||
}
|
||||
return invalidateCache(key);
|
||||
return Optional.empty();
|
||||
} catch (Throwable t) {
|
||||
return invalidateCache(key);
|
||||
}
|
||||
@ -342,12 +343,24 @@ public class NereidsSqlCacheManager {
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean usedVariablesChanged(SqlCacheContext sqlCacheContext) {
|
||||
for (Variable variable : sqlCacheContext.getUsedVariables()) {
|
||||
private List<Variable> resolveUserVariables(SqlCacheContext sqlCacheContext) {
|
||||
List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables();
|
||||
List<Variable> currentVariables = Lists.newArrayListWithCapacity(cachedUsedVariables.size());
|
||||
for (Variable cachedVariable : cachedUsedVariables) {
|
||||
Variable currentVariable = ExpressionAnalyzer.resolveUnboundVariable(
|
||||
new UnboundVariable(variable.getName(), variable.getType()));
|
||||
if (!Objects.equals(currentVariable, variable)
|
||||
|| variable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) {
|
||||
new UnboundVariable(cachedVariable.getName(), cachedVariable.getType()));
|
||||
currentVariables.add(currentVariable);
|
||||
}
|
||||
return currentVariables;
|
||||
}
|
||||
|
||||
private boolean usedVariablesChanged(List<Variable> currentVariables, SqlCacheContext sqlCacheContext) {
|
||||
List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables();
|
||||
for (int i = 0; i < cachedUsedVariables.size(); i++) {
|
||||
Variable currentVariable = currentVariables.get(i);
|
||||
Variable cachedVariable = cachedUsedVariables.get(i);
|
||||
if (!Objects.equals(currentVariable, cachedVariable)
|
||||
|| cachedVariable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,10 +19,7 @@ package org.apache.doris.nereids;
|
||||
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.ExplainOptions;
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.NereidsException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.profile.SummaryProfile;
|
||||
@ -45,37 +42,28 @@ import org.apache.doris.nereids.processor.post.PlanPostProcessors;
|
||||
import org.apache.doris.nereids.processor.pre.PlanPreprocessors;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
|
||||
import org.apache.doris.planner.PlanFragment;
|
||||
import org.apache.doris.planner.Planner;
|
||||
import org.apache.doris.planner.RuntimeFilter;
|
||||
import org.apache.doris.planner.ScanNode;
|
||||
import org.apache.doris.qe.CommonResultSet;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
import org.apache.doris.qe.ResultSetMetaData;
|
||||
import org.apache.doris.qe.cache.CacheAnalyzer;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
@ -535,65 +523,16 @@ public class NereidsPlanner extends Planner {
|
||||
if (!(parsedStmt instanceof LogicalPlanAdapter)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
if (physicalPlan instanceof PhysicalSqlCache
|
||||
&& ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) {
|
||||
return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get());
|
||||
}
|
||||
if (!(physicalPlan instanceof PhysicalResultSink)) {
|
||||
return Optional.empty();
|
||||
if (physicalPlan instanceof ComputeResultSet) {
|
||||
Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
|
||||
Optional<ResultSet> resultSet = ((ComputeResultSet) physicalPlan)
|
||||
.computeResultInFe(cascadesContext, sqlCacheContext);
|
||||
if (resultSet.isPresent()) {
|
||||
return resultSet;
|
||||
}
|
||||
}
|
||||
|
||||
Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
|
||||
boolean enableSqlCache
|
||||
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
|
||||
Plan child = physicalPlan.child(0);
|
||||
if (child instanceof PhysicalOneRowRelation) {
|
||||
PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0);
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
List<String> data = Lists.newArrayList();
|
||||
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
|
||||
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
|
||||
NamedExpression output = physicalPlan.getOutput().get(i);
|
||||
Expression expr = item.child(0);
|
||||
if (expr instanceof Literal) {
|
||||
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();
|
||||
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
|
||||
data.add(legacyExpr.getStringValueInFe());
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
|
||||
if (sqlCacheContext.isPresent() && enableSqlCache) {
|
||||
sqlCacheContext.get().setResultSetInFe(resultSet);
|
||||
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
|
||||
statementContext.getConnectContext(),
|
||||
statementContext.getOriginStatement().originStmt
|
||||
);
|
||||
}
|
||||
return Optional.of(resultSet);
|
||||
} else if (child instanceof PhysicalEmptyRelation) {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
for (int i = 0; i < physicalPlan.getOutput().size(); i++) {
|
||||
NamedExpression output = physicalPlan.getOutput().get(i);
|
||||
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
|
||||
}
|
||||
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of());
|
||||
if (sqlCacheContext.isPresent() && enableSqlCache) {
|
||||
sqlCacheContext.get().setResultSetInFe(resultSet);
|
||||
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
|
||||
statementContext.getConnectContext(),
|
||||
statementContext.getOriginStatement().originStmt
|
||||
);
|
||||
}
|
||||
return Optional.of(resultSet);
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
||||
@ -329,53 +329,57 @@ public class SqlCacheContext {
|
||||
if (cacheKeyMd5 != null) {
|
||||
return cacheKeyMd5;
|
||||
}
|
||||
|
||||
StringBuilder cacheKey = new StringBuilder(originSql);
|
||||
for (Entry<FullTableName, String> entry : usedViews.entrySet()) {
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(entry.getValue());
|
||||
}
|
||||
for (Variable usedVariable : usedVariables) {
|
||||
cacheKey.append("|")
|
||||
.append(usedVariable.getType().name())
|
||||
.append(":")
|
||||
.append(usedVariable.getName())
|
||||
.append("=")
|
||||
.append(usedVariable.getRealExpression().toSql());
|
||||
}
|
||||
for (Pair<Expression, Expression> pair : foldNondeterministicPairs) {
|
||||
cacheKey.append("|")
|
||||
.append(pair.key().toSql())
|
||||
.append("=")
|
||||
.append(pair.value().toSql());
|
||||
}
|
||||
for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) {
|
||||
List<RowFilterPolicy> policy = entry.getValue();
|
||||
if (policy.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(policy);
|
||||
}
|
||||
for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) {
|
||||
if (!entry.getValue().isPresent()) {
|
||||
continue;
|
||||
}
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(entry.getValue().map(Object::toString).orElse(""));
|
||||
}
|
||||
cacheKeyMd5 = CacheProxy.getMd5(cacheKey.toString());
|
||||
cacheKeyMd5 = doComputeCacheKeyMd5(usedVariables);
|
||||
}
|
||||
}
|
||||
return cacheKeyMd5;
|
||||
}
|
||||
|
||||
/** doComputeCacheKeyMd5 */
|
||||
public synchronized PUniqueId doComputeCacheKeyMd5(Set<Variable> usedVariables) {
|
||||
StringBuilder cacheKey = new StringBuilder(originSql);
|
||||
for (Entry<FullTableName, String> entry : usedViews.entrySet()) {
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(entry.getValue());
|
||||
}
|
||||
for (Variable usedVariable : usedVariables) {
|
||||
cacheKey.append("|")
|
||||
.append(usedVariable.getType().name())
|
||||
.append(":")
|
||||
.append(usedVariable.getName())
|
||||
.append("=")
|
||||
.append(usedVariable.getRealExpression().toSql());
|
||||
}
|
||||
for (Pair<Expression, Expression> pair : foldNondeterministicPairs) {
|
||||
cacheKey.append("|")
|
||||
.append(pair.key().toSql())
|
||||
.append("=")
|
||||
.append(pair.value().toSql());
|
||||
}
|
||||
for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) {
|
||||
List<RowFilterPolicy> policy = entry.getValue();
|
||||
if (policy.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(policy);
|
||||
}
|
||||
for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) {
|
||||
if (!entry.getValue().isPresent()) {
|
||||
continue;
|
||||
}
|
||||
cacheKey.append("|")
|
||||
.append(entry.getKey())
|
||||
.append("=")
|
||||
.append(entry.getValue().map(Object::toString).orElse(""));
|
||||
}
|
||||
return CacheProxy.getMd5(cacheKey.toString());
|
||||
}
|
||||
|
||||
public void setOriginSql(String originSql) {
|
||||
this.originSql = originSql.trim();
|
||||
}
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.trees.plans;
|
||||
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This class is used to return result set in fe without send fragment to be.
|
||||
* Some plans support this function, for example:
|
||||
* <li>1. the sql `select 100` will generate a plan, PhysicalOneRowRelation, and PhysicalOneRowRelation implement this
|
||||
* interface, so fe can send the only row to client immediately.
|
||||
* </li>
|
||||
* <li>2. the sql `select * from tbl limit 0` will generate PhysicalEmptyRelation, which means no any rows returned,
|
||||
* the PhysicalEmptyRelation implement this interface.
|
||||
* </li>
|
||||
* </p>
|
||||
* <p>
|
||||
* If you want to cache the result set in fe, you can implement this interface and write this code:
|
||||
* </p>
|
||||
* <pre>
|
||||
* StatementContext statementContext = cascadesContext.getStatementContext();
|
||||
* boolean enableSqlCache
|
||||
* = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
|
||||
* if (sqlCacheContext.isPresent() && enableSqlCache) {
|
||||
* sqlCacheContext.get().setResultSetInFe(resultSet);
|
||||
* Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
|
||||
* statementContext.getConnectContext(),
|
||||
* statementContext.getOriginStatement().originStmt
|
||||
* );
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
public interface ComputeResultSet {
|
||||
Optional<ResultSet> computeResultInFe(CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext);
|
||||
}
|
||||
@ -17,20 +17,31 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.physical;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.RelationId;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.EmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.qe.CommonResultSet;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
import org.apache.doris.qe.ResultSetMetaData;
|
||||
import org.apache.doris.qe.cache.CacheAnalyzer;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@ -41,7 +52,7 @@ import java.util.Optional;
|
||||
* e.g.
|
||||
* select * from tbl limit 0
|
||||
*/
|
||||
public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation {
|
||||
public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation, ComputeResultSet {
|
||||
|
||||
private final List<? extends NamedExpression> projects;
|
||||
|
||||
@ -102,4 +113,30 @@ public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRela
|
||||
return new PhysicalEmptyRelation(relationId, projects, Optional.empty(),
|
||||
logicalPropertiesSupplier.get(), physicalProperties, statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ResultSet> computeResultInFe(CascadesContext cascadesContext,
|
||||
Optional<SqlCacheContext> sqlCacheContext) {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
List<Slot> outputSlots = getOutput();
|
||||
for (int i = 0; i < outputSlots.size(); i++) {
|
||||
NamedExpression output = outputSlots.get(i);
|
||||
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
|
||||
}
|
||||
|
||||
StatementContext statementContext = cascadesContext.getStatementContext();
|
||||
boolean enableSqlCache
|
||||
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
|
||||
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of());
|
||||
if (sqlCacheContext.isPresent() && enableSqlCache) {
|
||||
sqlCacheContext.get().setResultSetInFe(resultSet);
|
||||
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
|
||||
statementContext.getConnectContext(),
|
||||
statementContext.getOriginStatement().originStmt
|
||||
);
|
||||
}
|
||||
return Optional.of(resultSet);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,21 +17,35 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.physical;
|
||||
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.RelationId;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.qe.CommonResultSet;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
import org.apache.doris.qe.ResultSetMetaData;
|
||||
import org.apache.doris.qe.cache.CacheAnalyzer;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
@ -40,7 +54,7 @@ import java.util.Optional;
|
||||
* A physical relation that contains only one row consist of some constant expressions.
|
||||
* e.g. select 100, 'value'
|
||||
*/
|
||||
public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation {
|
||||
public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation, ComputeResultSet {
|
||||
|
||||
private final List<NamedExpression> projects;
|
||||
|
||||
@ -119,4 +133,37 @@ public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRe
|
||||
return new PhysicalOneRowRelation(relationId, projects, groupExpression,
|
||||
logicalPropertiesSupplier.get(), physicalProperties, statistics);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ResultSet> computeResultInFe(
|
||||
CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) {
|
||||
List<Column> columns = Lists.newArrayList();
|
||||
List<String> data = Lists.newArrayList();
|
||||
for (int i = 0; i < projects.size(); i++) {
|
||||
NamedExpression item = projects.get(i);
|
||||
NamedExpression output = getOutput().get(i);
|
||||
Expression expr = item.child(0);
|
||||
if (expr instanceof Literal) {
|
||||
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();
|
||||
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
|
||||
data.add(legacyExpr.getStringValueInFe());
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
|
||||
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
|
||||
StatementContext statementContext = cascadesContext.getStatementContext();
|
||||
boolean enableSqlCache
|
||||
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
|
||||
if (sqlCacheContext.isPresent() && enableSqlCache) {
|
||||
sqlCacheContext.get().setResultSetInFe(resultSet);
|
||||
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
|
||||
statementContext.getConnectContext(),
|
||||
statementContext.getOriginStatement().originStmt
|
||||
);
|
||||
}
|
||||
return Optional.of(resultSet);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,16 +17,20 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.physical;
|
||||
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Sink;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
import org.apache.doris.qe.ResultSet;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -39,7 +43,8 @@ import java.util.Optional;
|
||||
/**
|
||||
* result sink
|
||||
*/
|
||||
public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink {
|
||||
public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE>
|
||||
implements Sink, ComputeResultSet {
|
||||
|
||||
public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, CHILD_TYPE child) {
|
||||
@ -125,4 +130,15 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH
|
||||
return new PhysicalResultSink<>(outputExprs, groupExpression,
|
||||
null, physicalProperties, statistics, child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ResultSet> computeResultInFe(
|
||||
CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) {
|
||||
CHILD_TYPE child = child();
|
||||
if (child instanceof ComputeResultSet) {
|
||||
return ((ComputeResultSet) child).computeResultInFe(cascadesContext, sqlCacheContext);
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,12 +19,15 @@ package org.apache.doris.nereids.trees.plans.physical;
|
||||
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.SqlCacheContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.FunctionalDependencies;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.plans.ComputeResultSet;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.TreeStringPlan;
|
||||
@ -44,7 +47,7 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/** PhysicalSqlCache */
|
||||
public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan {
|
||||
public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan, ComputeResultSet {
|
||||
private final TUniqueId queryId;
|
||||
private final List<String> columnLabels;
|
||||
private final List<Expr> resultExprs;
|
||||
@ -154,4 +157,10 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri
|
||||
public String getChildrenTreeString() {
|
||||
return planBody;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ResultSet> computeResultInFe(
|
||||
CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) {
|
||||
return resultSet;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1591,7 +1591,7 @@ public class StmtExecutor {
|
||||
String originStmt = parsedStmt.getOrigStmt().originStmt;
|
||||
NereidsSqlCacheManager sqlCacheManager = context.getEnv().getSqlCacheManager();
|
||||
if (cacheResult != null) {
|
||||
sqlCacheManager.tryAddCache(context, originStmt, cacheAnalyzer, false);
|
||||
sqlCacheManager.tryAddBeCache(context, originStmt, cacheAnalyzer);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1809,11 +1809,7 @@ public class StmtExecutor {
|
||||
Cache cache = cacheAnalyzer.getCache();
|
||||
if (cache instanceof SqlCache && !cache.isDisableCache() && planner instanceof NereidsPlanner) {
|
||||
String originStmt = parsedStmt.getOrigStmt().originStmt;
|
||||
LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt;
|
||||
boolean currentMissParseSqlFromSqlCache = !(logicalPlanAdapter.getLogicalPlan()
|
||||
instanceof org.apache.doris.nereids.trees.plans.algebra.SqlCache);
|
||||
context.getEnv().getSqlCacheManager().tryAddCache(
|
||||
context, originStmt, cacheAnalyzer, currentMissParseSqlFromSqlCache);
|
||||
context.getEnv().getSqlCacheManager().tryAddBeCache(context, originStmt, cacheAnalyzer);
|
||||
}
|
||||
}
|
||||
if (!isSendFields) {
|
||||
|
||||
Reference in New Issue
Block a user