[Colo][Scan] delete the colo scan code (#30584)
This commit is contained in:
@ -39,7 +39,6 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.thrift.TPartitionType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -932,13 +931,6 @@ public class DistributedPlanner {
|
||||
childFragment.addPlanRoot(node);
|
||||
childFragment.setHasColocatePlanNode(true);
|
||||
return childFragment;
|
||||
} else if (SessionVariable.enablePipelineEngine()
|
||||
&& childFragment.getPlanRoot().shouldColoAgg(node.getAggInfo())
|
||||
&& childFragment.getPlanRoot() instanceof OlapScanNode) {
|
||||
childFragment.getPlanRoot().setShouldColoScan();
|
||||
childFragment.addPlanRoot(node);
|
||||
childFragment.setHasColocatePlanNode(false);
|
||||
return childFragment;
|
||||
} else {
|
||||
return createMergeAggregationFragment(node, childFragment);
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.AggregateInfo;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.BaseTableRef;
|
||||
import org.apache.doris.analysis.BinaryPredicate;
|
||||
@ -1334,44 +1333,6 @@ public class OlapScanNode extends ScanNode {
|
||||
return scanRangeLocations.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
|
||||
distributionColumnIds.clear();
|
||||
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
|
||||
&& ConnectContext.get().getSessionVariable().enableColocateScan()) {
|
||||
List<Expr> aggPartitionExprs = aggregateInfo.getInputPartitionExprs();
|
||||
List<SlotDescriptor> slots = desc.getSlots();
|
||||
for (Expr aggExpr : aggPartitionExprs) {
|
||||
if (aggExpr instanceof SlotRef) {
|
||||
SlotDescriptor slotDesc = ((SlotRef) aggExpr).getDesc();
|
||||
int columnId = 0;
|
||||
for (SlotDescriptor slotDescriptor : slots) {
|
||||
if (slotDescriptor.equals(slotDesc)) {
|
||||
if (slotDescriptor.getType().isFixedLengthType()
|
||||
|| slotDescriptor.getType().isStringType()) {
|
||||
distributionColumnIds.add(columnId);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
columnId++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < slots.size(); i++) {
|
||||
if (!distributionColumnIds.contains(i) && (!slots.get(i).getType().isFixedLengthType()
|
||||
|| slots.get(i).getType().isStringType())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return !distributionColumnIds.isEmpty();
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setShouldColoScan() {
|
||||
shouldColoScan = true;
|
||||
|
||||
@ -20,7 +20,6 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.AggregateInfo;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.BitmapFilterPredicate;
|
||||
import org.apache.doris.analysis.CompoundPredicate;
|
||||
@ -881,10 +880,6 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
|
||||
return this.children.get(0).getNumInstances();
|
||||
}
|
||||
|
||||
public boolean shouldColoAgg(AggregateInfo aggregateInfo) {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void setShouldColoScan() {}
|
||||
|
||||
public boolean getShouldColoScan() {
|
||||
|
||||
@ -114,7 +114,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String BATCH_SIZE = "batch_size";
|
||||
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
|
||||
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
|
||||
public static final String ENABLE_COLOCATE_SCAN = "enable_colocate_scan";
|
||||
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
|
||||
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
|
||||
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
|
||||
@ -707,9 +706,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
|
||||
public boolean disableColocatePlan = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_COLOCATE_SCAN)
|
||||
public boolean enableColocateScan = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
|
||||
public boolean enableBucketShuffleJoin = true;
|
||||
|
||||
@ -2043,10 +2039,6 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return disableColocatePlan;
|
||||
}
|
||||
|
||||
public boolean enableColocateScan() {
|
||||
return enableColocateScan;
|
||||
}
|
||||
|
||||
public boolean isEnableBucketShuffleJoin() {
|
||||
return enableBucketShuffleJoin;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user