[fix](pipelineX) fix multi be may be missing profiles #29914

This commit is contained in:
Mryange
2024-01-14 17:33:17 +08:00
committed by yiguolei
parent fd66ce0928
commit 7a574df9fc
2 changed files with 34 additions and 2 deletions

View File

@ -33,11 +33,14 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ExecutionProfile is used to collect profile of a complete query plan(including query or load).
@ -72,6 +75,12 @@ public class ExecutionProfile {
// fragmentId -> dummy value
private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
// fragmentId -> The number of BE without 'done.
private Map<Integer, Integer> befragmentDone;
// lock befragmentDone
private ReadWriteLock lock;
// use to merge profile from multi be
private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null;
@ -231,8 +240,20 @@ public class ExecutionProfile {
public void markFragments(int fragments) {
profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
lock = new ReentrantReadWriteLock();
befragmentDone = new HashMap<>();
for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */);
befragmentDone.put(fragmentId, 0);
}
}
public void addFragments(int fragmentId) {
lock.writeLock().lock();
try {
befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1);
} finally {
lock.writeLock().unlock();
}
}
@ -282,8 +303,16 @@ public class ExecutionProfile {
public void markOneFragmentDone(int fragmentId) {
if (profileFragmentDoneSignal != null) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
lock.writeLock().lock();
try {
befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) - 1);
if (befragmentDone.get(fragmentId) == 0) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
}
}
} finally {
lock.writeLock().unlock();
}
}
}

View File

@ -3196,6 +3196,9 @@ public class Coordinator implements CoordInterface {
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
this.enablePipelineX = enablePipelineX;
this.executionProfile = executionProfile;
if (enablePipelineX) {
executionProfile.addFragments(profileFragmentId);
}
}
public Stream<RuntimeProfile> profileStream() {