[Bug] Bucket Shuffle Join may cause:Failed to send brpc batch, Not connected to 0.0.0.0:0 (#5901)
This commit is contained in:
@ -95,6 +95,10 @@ public class DataPartition {
|
||||
return type != TPartitionType.UNPARTITIONED;
|
||||
}
|
||||
|
||||
public boolean isBucketShuffleHashPartition() {
|
||||
return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
|
||||
}
|
||||
|
||||
public TPartitionType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@ -828,7 +828,11 @@ public class Coordinator {
|
||||
params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
|
||||
}
|
||||
|
||||
if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) {
|
||||
if (sink.getOutputPartition().isBucketShuffleHashPartition()) {
|
||||
// the destFragment must be bucket shuffle
|
||||
Preconditions.checkState(bucketShuffleJoinController.
|
||||
isBucketShuffleJoin(destFragment.getFragmentId().asInt()));
|
||||
|
||||
int bucketSeq = 0;
|
||||
int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
|
||||
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
|
||||
@ -1544,7 +1548,6 @@ public class Coordinator {
|
||||
return true;
|
||||
}
|
||||
|
||||
// One fragment could only have one HashJoinNode
|
||||
if (node instanceof HashJoinNode) {
|
||||
HashJoinNode joinNode = (HashJoinNode) node;
|
||||
if (joinNode.isBucketShuffle()) {
|
||||
|
||||
Reference in New Issue
Block a user