[pipelineX](local shuffle) Fix local shuffle for colocate/bucket join (#28032)

This commit is contained in:
Gabriel
2023-12-06 10:02:36 +08:00
committed by GitHub
parent 73404405e6
commit 1be513b927
31 changed files with 265 additions and 99 deletions

View File

@ -42,6 +42,7 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TEqJoinCondition;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.THashJoinNode;
import org.apache.doris.thrift.TJoinDistributionType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
@ -730,6 +731,7 @@ public class HashJoinNode extends JoinNodeBase {
msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt());
}
}
msg.hash_join_node.setDistType(isColocate ? TJoinDistributionType.COLOCATE : distrMode.toThrift());
}
@Override
@ -812,6 +814,22 @@ public class HashJoinNode extends JoinNodeBase {
public String toString() {
return description;
}
public TJoinDistributionType toThrift() {
switch (this) {
case NONE:
return TJoinDistributionType.NONE;
case BROADCAST:
return TJoinDistributionType.BROADCAST;
case PARTITIONED:
return TJoinDistributionType.PARTITIONED;
case BUCKET_SHUFFLE:
return TJoinDistributionType.BUCKET_SHUFFLE;
default:
Preconditions.checkArgument(false, "Unknown DistributionMode: " + toString());
}
return TJoinDistributionType.NONE;
}
}
/**

View File

@ -64,6 +64,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsDeriveResult;
@ -1444,7 +1445,7 @@ public class OlapScanNode extends ScanNode {
msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds);
}
if (shouldColoScan) {
if (shouldColoScan || SessionVariable.enablePipelineEngineX()) {
msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds));
}
}
@ -1621,8 +1622,10 @@ public class OlapScanNode extends ScanNode {
public void finalizeForNereids() {
computeNumNodes();
computeStatsForNereids();
// distributionColumnIds is used for one backend node agg optimization, nereids do not support it.
distributionColumnIds.clear();
if (!SessionVariable.enablePipelineEngineX()) {
// distributionColumnIds is used for one backend node agg optimization, nereids do not support it.
distributionColumnIds.clear();
}
}
private void computeStatsForNereids() {

View File

@ -144,6 +144,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// The runtime filter id that is expected to be used
private Set<RuntimeFilterId> targetRuntimeFilterIds;
private int bucketNum;
// has colocate plan node
private boolean hasColocatePlanNode = false;
@ -460,4 +462,12 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public void setFragmentSequenceNum(int seq) {
fragmentSequenceNum = seq;
}
public int getBucketNum() {
return bucketNum;
}
public void setBucketNum(int bucketNum) {
this.bucketNum = bucketNum;
}
}

View File

@ -45,6 +45,7 @@ import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
@ -300,6 +301,9 @@ public class Coordinator implements CoordInterface {
&& (fragments.size() > 0);
initQueryOptions(context);
if (planner instanceof OriginalPlanner) {
queryOptions.setEnableLocalShuffle(false);
}
setFromUserProperty(context);
@ -2240,6 +2244,15 @@ public class Coordinator implements CoordInterface {
if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
fragmentIdTobucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
// Same as bucket shuffle.
int bucketNum = 0;
if (scanNode.getOlapTable().isColocateTable()) {
bucketNum = scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum();
} else {
bucketNum = (int) (scanNode.getTotalTabletsNum());
}
scanNode.getFragment().setBucketNum(bucketNum);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(scanNode.getFragmentId());
@ -2767,6 +2780,7 @@ public class Coordinator implements CoordInterface {
fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashMap<>());
fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
scanNode.getFragment().setBucketNum(bucketNum);
}
Map<Integer, TNetworkAddress> bucketSeqToAddress
= fragmentIdToSeqToAddressMap.get(scanNode.getFragmentId());
@ -3554,7 +3568,12 @@ public class Coordinator implements CoordInterface {
}
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
}
for (int bucket : instanceExecParam.bucketSeqSet) {
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i);
}
TPipelineFragmentParams params = res.get(instanceExecParam.host);
TPipelineInstanceParams localParams = new TPipelineInstanceParams();

View File

@ -2569,7 +2569,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnablePipelineEngine(enablePipelineEngine);
tResult.setEnablePipelineXEngine(enablePipelineXEngine);
tResult.setEnableLocalShuffle(enableLocalShuffle);
tResult.setEnableLocalShuffle(enableLocalShuffle && enableNereidsPlanner);
tResult.setParallelInstance(getParallelExecInstanceNum());
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
@ -2976,6 +2976,14 @@ public class SessionVariable implements Serializable, Writable {
|| connectContext.getSessionVariable().enablePipelineXEngine;
}
public static boolean enablePipelineEngineX() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {
return false;
}
return connectContext.getSessionVariable().enablePipelineXEngine;
}
public static boolean enableAggState() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext == null) {