[feature](pipelineX)use markFragments instead of markInstances in pipelineX (#27829)

This commit is contained in:
Mryange
2023-12-11 17:59:53 +08:00
committed by GitHub
parent 3e1e8d2ebe
commit 877935442f
21 changed files with 713 additions and 264 deletions

View File

@ -63,9 +63,14 @@ public class ExecutionProfile {
// Profile for load channels. Only for load job.
private RuntimeProfile loadChannelProfile;
// A countdown latch to mark the completion of each instance.
// use for old pipeline
// instance id -> dummy value
private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
// A countdown latch to mark the completion of each fragment. use for pipelineX
// fragmentId -> dummy value
private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
@ -79,7 +84,35 @@ public class ExecutionProfile {
executionProfile.addChild(loadChannelProfile);
}
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
// Builds the aggregated "Fragments" profile tree for pipelineX execution:
// Fragments -> "Fragment i" -> one merged "Pipeline : N" profile per pipeline,
// where each pipeline's child task profiles are merged into a single profile
// via RuntimeProfile.mergeProfiles.
private RuntimeProfile getPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " + i);
fragmentsProfile.addChild(newFragmentProfile);
// Children of a fragment profile are the per-pipeline profiles.
List<RuntimeProfile> allPipelines = new ArrayList<RuntimeProfile>();
for (Pair<RuntimeProfile, Boolean> runtimeProfile : oldFragmentProfile.getChildList()) {
allPipelines.add(runtimeProfile.first);
}
int pipelineIdx = 0;
for (RuntimeProfile pipeline : allPipelines) {
// Children of a pipeline profile are the per-task profiles to merge.
List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>();
for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) {
allPipelineTask.add(runtimeProfile.first);
}
// NOTE(review): the node id below comes from allPipelines.get(0) for
// every iteration, not from the current `pipeline` — confirm index 0
// is intended rather than pipeline.nodeId().
RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
"Pipeline : " + pipelineIdx + "(instance_num="
+ allPipelineTask.size() + ")",
allPipelines.get(0).nodeId());
RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap);
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
}
}
return fragmentsProfile;
}
private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
@ -97,6 +130,54 @@ public class ExecutionProfile {
return fragmentsProfile;
}
// Returns the aggregated per-fragment profile tree. The tree layout depends
// on the engine: pipelineX groups tasks under pipelines, while the older
// engines group tasks under instances — see the diagrams below.
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) {
if (enablePipelineX()) {
/*
 * Fragment 0
 * ---Pipeline 0
 * ------pipelineTask 0
 * ------pipelineTask 0
 * ------pipelineTask 0
 * ---Pipeline 1
 * ------pipelineTask 1
 * ---Pipeline 2
 * ------pipelineTask 2
 * ------pipelineTask 2
 * Fragment 1
 * ---Pipeline 0
 * ------......
 * ---Pipeline 1
 * ------......
 * ---Pipeline 2
 * ------......
 * ......
 */
return getPipelineXAggregatedProfile(planNodeMap);
} else {
/*
 * Fragment 0
 * ---Instance 0
 * ------pipelineTask 0
 * ------pipelineTask 1
 * ------pipelineTask 2
 * ---Instance 1
 * ------pipelineTask 0
 * ------pipelineTask 1
 * ------pipelineTask 2
 * ---Instance 2
 * ------pipelineTask 0
 * ------pipelineTask 1
 * ------pipelineTask 2
 * Fragment 1
 * ---Instance 0
 * ---Instance 1
 * ---Instance 2
 * ......
 */
return getNonPipelineXAggregatedProfile(planNodeMap);
}
}
// Accessor for the root "Execution Profile" RuntimeProfile node.
public RuntimeProfile getExecutionProfile() {
return executionProfile;
}
@ -120,6 +201,17 @@ public class ExecutionProfile {
}
}
// PipelineX mode is detected by whether markFragments() has been called —
// that is the only place profileFragmentDoneSignal is assigned.
private boolean enablePipelineX() {
return profileFragmentDoneSignal != null;
}
// Registers the number of fragments whose completion reports are expected
// (pipelineX only). One latch mark per fragmentId in [0, fragments); the
// mark's value is a dummy.
public void markFragments(int fragments) {
profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is meaningless */);
}
}
public void update(long startTime, boolean isFinished) {
if (startTime > 0) {
executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime));
@ -133,6 +225,14 @@ public class ExecutionProfile {
}
}
if (isFinished && profileFragmentDoneSignal != null) {
try {
profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
LOG.warn("signal await error", e1);
}
}
for (RuntimeProfile fragmentProfile : fragmentProfiles) {
fragmentProfile.sortChildren();
}
@ -143,6 +243,9 @@ public class ExecutionProfile {
// count down to zero to notify all objects waiting for this
profileDoneSignal.countDownToZero(new Status());
}
if (profileFragmentDoneSignal != null) {
profileFragmentDoneSignal.countDownToZero(new Status());
}
}
public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
@ -153,6 +256,14 @@ public class ExecutionProfile {
}
}
// Counts down the latch mark for one fragment; logs a warning when the mark
// was already counted down or was never registered. No-op when pipelineX is
// not in use (signal is null).
public void markOneFragmentDone(int fragmentId) {
if (profileFragmentDoneSignal != null) {
if (!profileFragmentDoneSignal.markedCountDown(fragmentId, -1L)) {
LOG.warn("Mark fragment {} done failed", fragmentId);
}
}
}
public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException {
if (profileDoneSignal == null) {
return true;
@ -160,6 +271,13 @@ public class ExecutionProfile {
return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
}
// Waits up to waitTimeS seconds for every fragment to report done.
// Returns true immediately when pipelineX is not in use (no signal).
public boolean awaitAllFragmentsDone(long waitTimeS) throws InterruptedException {
if (profileFragmentDoneSignal == null) {
return true;
}
return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
}
public boolean isAllInstancesDone() {
if (profileDoneSignal == null) {
return true;
@ -167,9 +285,16 @@ public class ExecutionProfile {
return profileDoneSignal.getCount() == 0;
}
public void addInstanceProfile(int instanceIdx, RuntimeProfile instanceProfile) {
Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(),
instanceIdx + " vs. " + fragmentProfiles.size());
fragmentProfiles.get(instanceIdx).addChild(instanceProfile);
// Non-blocking check: true when every fragment has counted down, or when
// pipelineX is not in use (no signal).
public boolean isAllFragmentsDone() {
if (profileFragmentDoneSignal == null) {
return true;
}
return profileFragmentDoneSignal.getCount() == 0;
}
// Attaches one instance/task profile under the given fragment's profile node.
// fragmentId must be a valid index into fragmentProfiles.
public void addInstanceProfile(int fragmentId, RuntimeProfile instanceProfile) {
Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
fragmentId + " vs. " + fragmentProfiles.size());
fragmentProfiles.get(fragmentId).addChild(instanceProfile);
}
}

View File

@ -148,6 +148,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@ -677,7 +678,12 @@ public class Coordinator implements CoordInterface {
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId), scanRangeNum);
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet());
}
executionProfile.markInstances(instanceIds);
if (enablePipelineXEngine) {
executionProfile.markFragments(fragments.size());
} else {
executionProfile.markInstances(instanceIds);
}
if (enablePipelineEngine) {
sendPipelineCtx();
} else {
@ -894,7 +900,8 @@ public class Coordinator implements CoordInterface {
Long backendId = this.addressToBackendID.get(entry.getKey());
PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap,
executionProfile.getLoadChannelProfile());
executionProfile.getLoadChannelProfile(), this.enablePipelineXEngine,
this.executionProfile);
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
// and the BE will determine whether all Fragments have been executed based on this information.
// Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
@ -2459,7 +2466,7 @@ public class Coordinator implements CoordInterface {
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
if (!ctx.updateProfile(params, true)) {
if (!ctx.updateProfile(params)) {
return;
}
@ -2503,16 +2510,14 @@ public class Coordinator implements CoordInterface {
}
Preconditions.checkArgument(params.isSetDetailedReport());
for (TDetailedReportParams param : params.detailed_report) {
if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
LOG.debug("Query {} instance {} is marked done",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
}
if (ctx.done) {
LOG.debug("Query {} fragment {} is marked done",
DebugUtil.printId(queryId), ctx.profileFragmentId);
executionProfile.markOneFragmentDone(ctx.profileFragmentId);
}
} else if (enablePipelineEngine) {
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
if (!ctx.updateProfile(params, false)) {
if (!ctx.updateProfile(params)) {
return;
}
@ -2657,7 +2662,11 @@ public class Coordinator implements CoordInterface {
long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
boolean awaitRes = false;
try {
awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
if (enablePipelineXEngine) {
awaitRes = executionProfile.awaitAllFragmentsDone(waitTime);
} else {
awaitRes = executionProfile.awaitAllInstancesDone(waitTime);
}
} catch (InterruptedException e) {
// Do nothing
}
@ -2700,7 +2709,11 @@ public class Coordinator implements CoordInterface {
}
public boolean isDone() {
return executionProfile.isAllInstancesDone();
if (enablePipelineXEngine) {
return executionProfile.isAllFragmentsDone();
} else {
return executionProfile.isAllInstancesDone();
}
}
// map from a BE host address to the per-node assigned scan ranges;
@ -3092,9 +3105,13 @@ public class Coordinator implements CoordInterface {
boolean initiated;
volatile boolean done;
boolean hasCanceled;
// use for pipeline
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
// use for pipelineX
List<RuntimeProfile> taskProfile;
boolean enablePipelineX;
RuntimeProfile loadChannelProfile;
int cancelProgress = 0;
int profileFragmentId;
TNetworkAddress brpcAddress;
TNetworkAddress address;
@ -3103,16 +3120,18 @@ public class Coordinator implements CoordInterface {
long profileReportProgress = 0;
long beProcessEpoch = 0;
private final int numInstances;
final ExecutionProfile executionProfile;
public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
TPipelineFragmentParams rpcParams, Long backendId,
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
RuntimeProfile loadChannelProfile) {
RuntimeProfile loadChannelProfile, boolean enablePipelineX, final ExecutionProfile executionProfile) {
this.profileFragmentId = profileFragmentId;
this.fragmentId = fragmentId;
this.rpcParams = rpcParams;
this.numInstances = rpcParams.local_params.size();
this.fragmentInstancesMap = fragmentInstancesMap;
this.taskProfile = new ArrayList<RuntimeProfile>();
this.loadChannelProfile = loadChannelProfile;
this.initiated = false;
@ -3125,12 +3144,27 @@ public class Coordinator implements CoordInterface {
this.hasCanceled = false;
this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
this.enablePipelineX = enablePipelineX;
this.executionProfile = executionProfile;
}
// Unified view over this context's per-task profiles: pipelineX keeps them
// in taskProfile, the older pipeline engine keys them by instance id in
// fragmentInstancesMap.
public Stream<RuntimeProfile> profileStream() {
if (enablePipelineX) {
return taskProfile.stream();
}
return fragmentInstancesMap.values().stream();
}
// Pushes every task/instance profile of this context under its fragment's
// node in the query-level ExecutionProfile.
private void attachInstanceProfileToFragmentProfile() {
profileStream()
.forEach(p -> executionProfile.addInstanceProfile(this.profileFragmentId, p));
}
/**
* Some information common to all Fragments does not need to be sent repeatedly.
* Therefore, when we confirm that a certain BE has accepted the information,
* we will delete the information in the subsequent Fragment to avoid repeated sending.
* we will delete the information in the subsequent Fragment to avoid repeated
* sending.
* This information can be obtained from the cache of BE.
*/
public void unsetFields() {
@ -3144,29 +3178,31 @@ public class Coordinator implements CoordInterface {
// update profile.
// return true if profile is updated. Otherwise, return false.
public synchronized boolean updateProfile(TReportExecStatusParams params, boolean isPipelineX) {
if (isPipelineX) {
public synchronized boolean updateProfile(TReportExecStatusParams params) {
if (enablePipelineX) {
taskProfile.clear();
int pipelineIdx = 0;
for (TDetailedReportParams param : params.detailed_report) {
RuntimeProfile profile = fragmentInstancesMap.get(param.fragment_instance_id);
if (params.done && profile.getIsDone()) {
continue;
}
String name = "Pipeline :" + pipelineIdx + " "
+ " (host=" + address + ")";
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
if (param.isSetProfile()) {
profile.update(param.profile);
}
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
if (params.done) {
profile.setIsDone(true);
profileReportProgress++;
}
pipelineIdx++;
}
if (profileReportProgress == numInstances) {
this.done = true;
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
return true;
this.done = params.done;
if (this.done) {
attachInstanceProfileToFragmentProfile();
}
return this.done;
} else {
RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
if (params.done && profile.getIsDone()) {
@ -3192,7 +3228,7 @@ public class Coordinator implements CoordInterface {
}
public synchronized void printProfile(StringBuilder builder) {
this.fragmentInstancesMap.values().stream().forEach(p -> {
this.profileStream().forEach(p -> {
p.computeTimeInProfile();
p.prettyPrint(builder, "");
});
@ -3200,23 +3236,41 @@ public class Coordinator implements CoordInterface {
// cancel all fragment instances.
// return true if cancel success. Otherwise, return false
public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
if (!this.initiated) {
LOG.warn("Query {}, ccancel before initiated", DebugUtil.printId(queryId));
private synchronized boolean cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
if (!this.hasCanceled) {
return false;
}
// don't cancel if it is already finished
if (this.done) {
LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
return false;
}
if (this.hasCanceled) {
LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
for (RuntimeProfile profile : taskProfile) {
profile.setIsCancel(true);
}
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+ " fragment id={} query={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
this.profileFragmentId,
DebugUtil.printId(queryId), cancelReason.name());
}
try {
try {
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
this.profileFragmentId, queryId, cancelReason);
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
brpcAddress.getPort());
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), e.getMessage());
}
} catch (Exception e) {
LOG.warn("catch a exception", e);
return false;
}
return true;
}
private synchronized boolean cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend:{},"
+ " fragment instance id={} query={}, reason: {}",
+ " fragment instance id={} query={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
DebugUtil.printId(localParam.fragment_instance_id),
DebugUtil.printId(queryId), cancelReason.name());
@ -3244,14 +3298,35 @@ public class Coordinator implements CoordInterface {
if (!this.hasCanceled) {
return false;
}
for (int i = 0; i < this.numInstances; i++) {
fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
}
cancelProgress = numInstances;
return true;
}
/// TODO: refactor rpcParams
// Cancels this context's remote execution. Guards against cancelling before
// initiation, after completion, or twice; then dispatches to the
// fragment-level (pipelineX) or instance-level cancel path.
// Returns true only when a cancel was actually attempted.
public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
if (!this.initiated) {
// Fixed typo in log message: "ccancel" -> "cancel".
LOG.warn("Query {}, cancel before initiated", DebugUtil.printId(queryId));
return false;
}
// don't cancel if it is already finished
if (this.done) {
LOG.warn("Query {}, cancel after finished", DebugUtil.printId(queryId));
return false;
}
if (this.hasCanceled) {
LOG.warn("Query {}, cancel after cancelled", DebugUtil.printId(queryId));
return false;
}
if (this.enablePipelineX) {
return cancelFragment(cancelReason);
} else {
return cancelInstance(cancelReason);
}
}
public synchronized boolean computeTimeInProfile(int maxFragmentId) {
if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) {
LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
@ -3843,7 +3918,7 @@ public class Coordinator implements CoordInterface {
private void attachInstanceProfileToFragmentProfile() {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
ctx.fragmentInstancesMap.values().stream()
ctx.profileStream()
.forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
}
} else {

View File

@ -240,6 +240,24 @@ public class BackendServiceProxy {
}
}
// Sends an asynchronous cancel RPC for one fragment of a pipelineX query.
// finst_id is a zero-filled placeholder: pipelineX cancels by
// (queryId, fragmentId) rather than by a fragment instance id.
public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
.newBuilder()
.setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
.setCancelReason(cancelReason)
.setFragmentId(fragmentId)
.setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
try {
final BackendServiceClient client = getProxy(address);
return client.cancelPlanFragmentAsync(pRequest);
} catch (Throwable e) {
// NOTE(review): the RpcException carries only e.getMessage(), dropping
// the original cause/stack trace — consider chaining `e` if RpcException
// supports a cause.
LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<InternalService.PFetchDataResult> fetchDataAsync(
TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
try {