[Chore](agg-state) add sessionvariable enable_agg_state (#21373)

Add session variable `enable_agg_state`.
This commit is contained in:
Pxl
2023-07-04 14:25:21 +08:00
committed by GitHub
parent 9477436524
commit 65cb91e60e
16 changed files with 53 additions and 66 deletions

View File

@ -2294,20 +2294,6 @@ public class Analyzer {
return globalState.context.getSessionVariable().isEnableFoldConstantByBe();
}
/**
* Returns true if predicate 'e' can be correctly evaluated by a tree materializing
* 'tupleIds', otherwise false:
* - the predicate needs to be bound by tupleIds
* - a Where clause predicate can only be correctly evaluated if for all outer-joined
* referenced tids the last join to outer-join this tid has been materialized
* - an On clause predicate against the non-nullable side of an Outer Join clause
* can only be correctly evaluated by the join node that materializes the
* Outer Join clause
*/
// Convenience overload: delegates to the TupleId-based canEvalPredicate,
// checking evaluability against the table-ref ids materialized by 'node'.
private boolean canEvalPredicate(PlanNode node, Expr e) {
return canEvalPredicate(node.getTblRefIds(), e);
}
/**
* Returns true if predicate 'e' can be correctly evaluated by a tree materializing
* 'tupleIds', otherwise false:

View File

@ -29,6 +29,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.SessionVariable;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
@ -303,6 +304,11 @@ public class ColumnDef {
throw new AnalysisException(String.format("Aggregate type %s is not compatible with primitive type %s",
toString(), type.toSql()));
}
if (aggregateType == AggregateType.GENERIC_AGGREGATION) {
if (!SessionVariable.enableAggState()) {
throw new AnalysisException("agg state not enable, need set enable_agg_state=true");
}
}
}
if (type.getPrimitiveType() == PrimitiveType.FLOAT || type.getPrimitiveType() == PrimitiveType.DOUBLE) {

View File

@ -1,30 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.qe.ConnectContext;
public class VectorizedUtil {
    /**
     * Reports whether the pipeline execution engine is enabled for the session
     * attached to the current thread.
     *
     * @return true only when a {@link ConnectContext} is present and its session
     *         variable enables the pipeline engine; false when no context exists.
     */
    public static boolean isPipeline() {
        final ConnectContext ctx = ConnectContext.get();
        return ctx != null && ctx.getSessionVariable().enablePipelineEngine();
    }
}

View File

@ -36,7 +36,7 @@ import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
@ -175,8 +175,7 @@ public class RuntimeFilterTranslator {
origFilter.extractTargetsPosition();
// Number of parallel instances are large for pipeline engine, so we prefer bloom filter.
if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM
&& ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
&& SessionVariable.enablePipelineEngine()) {
origFilter.setType(TRuntimeFilterType.BLOOM);
}
return origFilter;

View File

@ -313,7 +313,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
} else {
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().enablePipelineEngine()
&& ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
@ -363,7 +363,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().enablePipelineEngine()
&& ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
@ -563,7 +563,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
PhysicalHashJoin join = innerEntry.getValue();
Preconditions.checkState(join != null);
TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
if (ctx.getSessionVariable().enablePipelineEngine()) {
if (ctx.getSessionVariable().getEnablePipelineEngine()) {
type = TRuntimeFilterType.BLOOM;
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(

View File

@ -52,7 +52,7 @@ public class BuildCTEAnchorAndCTEProducer extends OneRewriteRuleFactory {
CTEId id = logicalCTE.findCTEId(s.getAlias());
if (cascadesContext.cteReferencedCount(id)
<= ConnectContext.get().getSessionVariable().inlineCTEReferencedThreshold
|| !ConnectContext.get().getSessionVariable().enablePipelineEngine) {
|| !ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
continue;
}
LogicalCTEProducer logicalCTEProducer = new LogicalCTEProducer(

View File

@ -47,7 +47,7 @@ public class InlineCTE extends OneRewriteRuleFactory {
* Current we only implement CTE Materialize on pipeline engine and only materialize those CTE whose
* refCount > NereidsRewriter.INLINE_CTE_REFERENCED_THRESHOLD.
*/
if (ConnectContext.get().getSessionVariable().enablePipelineEngine
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableCTEMaterialize
&& refCount > INLINE_CTE_REFERENCED_THRESHOLD) {
return cteConsumer;

View File

@ -39,6 +39,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TPartitionType;
import com.google.common.base.Preconditions;
@ -935,7 +936,7 @@ public class DistributedPlanner {
childFragment.addPlanRoot(node);
childFragment.setHasColocatePlanNode(true);
return childFragment;
} else if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
} else if (SessionVariable.enablePipelineEngine()
&& childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())
&& childFragment.getPlanRoot() instanceof OlapScanNode) {
childFragment.getPlanRoot().setShouldColoScan();

View File

@ -1229,7 +1229,7 @@ public class OlapScanNode extends ScanNode {
public int getNumInstances() {
// In pipeline exec engine, the instance num equals be_num * parallel instance.
// so here we need count distinct be_num to do the work. make sure get right instance
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream())
.map(loc -> loc.backend_id).distinct().count();
@ -1241,7 +1241,7 @@ public class OlapScanNode extends ScanNode {
@Override
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
distributionColumnIds.clear();
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& ConnectContext.get().getSessionVariable().enableColocateScan()) {
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
List<SlotDescriptor> slots = desc.getSlots();

View File

@ -35,7 +35,6 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
@ -311,7 +310,7 @@ public class Coordinator {
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin = context.getSessionVariable().enableShareHashTableForBroadcastJoin;
this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine;
this.enablePipelineEngine = context.getSessionVariable().getEnablePipelineEngine();
initQueryOptions(context);
setFromUserProperty(context);
@ -379,7 +378,7 @@ public class Coordinator {
private void initQueryOptions(ConnectContext context) {
this.queryOptions = context.getSessionVariable().toThrift();
this.queryOptions.setEnablePipelineEngine(VectorizedUtil.isPipeline());
this.queryOptions.setEnablePipelineEngine(SessionVariable.enablePipelineEngine());
this.queryOptions.setBeExecVersion(Config.be_exec_version);
this.queryOptions.setQueryTimeout(context.getExecTimeout());
this.queryOptions.setExecutionTimeout(context.getExecTimeout());

View File

@ -191,6 +191,9 @@ public class SessionVariable implements Serializable, Writable {
public static final long MIN_INSERT_VISIBLE_TIMEOUT_MS = 1000;
public static final String ENABLE_PIPELINE_ENGINE = "enable_pipeline_engine";
public static final String ENABLE_AGG_STATE = "enable_agg_state";
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
@ -613,7 +616,10 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableVectorizedEngine = true;
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true, expType = ExperimentalType.EXPERIMENTAL)
public boolean enablePipelineEngine = true;
private boolean enablePipelineEngine = true;
@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, expType = ExperimentalType.EXPERIMENTAL)
public boolean enableAggState = false;
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
public boolean enableParallelOutfile = false;
@ -1636,10 +1642,6 @@ public class SessionVariable implements Serializable, Writable {
this.runtimeFilterMaxInNum = runtimeFilterMaxInNum;
}
public boolean enablePipelineEngine() {
return enablePipelineEngine;
}
public void setEnablePipelineEngine(boolean enablePipelineEngine) {
this.enablePipelineEngine = enablePipelineEngine;
}
@ -2338,4 +2340,24 @@ public class SessionVariable implements Serializable, Writable {
public boolean isEnableUnifiedLoad() {
return enableUnifiedLoad;
}
/** Returns the session-level {@code enable_pipeline_engine} flag for this session. */
public boolean getEnablePipelineEngine() {
return enablePipelineEngine;
}
/**
 * Static helper: whether the pipeline engine is enabled for the session bound
 * to the current thread. Defaults to false when no connect context exists.
 */
public static boolean enablePipelineEngine() {
    final ConnectContext ctx = ConnectContext.get();
    return ctx != null && ctx.getSessionVariable().enablePipelineEngine;
}
/**
 * Static helper: whether agg-state types are enabled for the session bound to
 * the current thread.
 *
 * NOTE(review): unlike {@code enablePipelineEngine()}, this is permissive when
 * no connect context exists (returns true) — presumably so internal paths
 * without a session are never blocked; confirm this asymmetry is intended.
 */
public static boolean enableAggState() {
    final ConnectContext ctx = ConnectContext.get();
    return ctx == null || ctx.getSessionVariable().enableAggState;
}
}

View File

@ -316,7 +316,7 @@ public class StmtExecutor {
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
builder.traceId(context.getSessionVariable().getTraceId());
builder.isNereids(context.getState().isNereids ? "Yes" : "No");
builder.isPipeline(context.getSessionVariable().enablePipelineEngine ? "Yes" : "No");
builder.isPipeline(context.getSessionVariable().getEnablePipelineEngine() ? "Yes" : "No");
return builder.build();
}
@ -564,7 +564,7 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue
&& context.getSessionVariable().enablePipelineEngine()) {
&& context.getSessionVariable().getEnablePipelineEngine()) {
this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
try {
this.offerRet = queryQueue.offer();
@ -1358,7 +1358,7 @@ public class StmtExecutor {
// 2. If this is a query, send the result expr fields first, and send result data back to client.
RowBatch batch;
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
if (Config.enable_workload_group && context.sessionVariable.enablePipelineEngine()) {
if (Config.enable_workload_group && context.sessionVariable.getEnablePipelineEngine()) {
coord.setTWorkloadGroups(context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context));
}
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),

View File

@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state_group_concat") {
sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS a_table; """
sql """
create table a_table(

View File

@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state_max") {
sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS a_table; """
sql """
create table a_table(

View File

@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state_nereids") {
sql "set enable_agg_state=true"
sql "set enable_nereids_planner=true;"
sql "set enable_fallback_to_original_planner=false;"

View File

@ -16,6 +16,7 @@
// under the License.
suite("test_agg_state") {
sql "set enable_agg_state=true"
sql """ DROP TABLE IF EXISTS d_table; """
sql """
create table d_table(