[pipelineX](bug) Fix correctness problem using multiple BE (#29765)

This commit is contained in:
Gabriel
2024-01-10 14:49:46 +08:00
committed by yiguolei
parent fae7a395de
commit 443b79d6ba
2 changed files with 8 additions and 6 deletions

View File

@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode {
return !isKeySearch() && context != null
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
&& !fragment.isHasNullAwareLeftAntiJoin();
&& !fragment.isHasNullAwareLeftAntiJoin()
&& ((this instanceof OlapScanNode) && ((OlapScanNode) this).getScanTabletIds().size()
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
}
public boolean haveLimitAndConjunts() {

View File

@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface {
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = scanNodes.stream()
.allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids;
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| (node.get().ignoreStorageDataDistribution(context)
&& expectedInstanceNum > perNodeScanRanges.size() && useNereids))) {
|| ignoreStorageDataDistribution)) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and conjunts, only need 1 instance to save cpu and
// mem resource
@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface {
* 2. `parallelExecInstanceNum` is larger than scan ranges.
* 3. Use Nereids planner.
*/
boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
return scanNodeIds.contains(scanNode.getId().asInt());
}).allMatch(node -> node.ignoreStorageDataDistribution(context))
boolean ignoreStorageDataDistribution = scanNodes.stream()
.allMatch(node -> node.ignoreStorageDataDistribution(context))
&& addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
return addressScanRange.getValue().size() < parallelExecInstanceNum;
}) && useNereids;