[Bug](pipeline-engine) fix hang on insert into select when enable pipeline engine (#19075)
This commit is contained in:
@ -66,6 +66,9 @@ public class RuntimeProfile {
|
||||
|
||||
private Long timestamp = -1L;
|
||||
|
||||
private Boolean isDone = false;
|
||||
private Boolean isCancel = false;
|
||||
|
||||
public RuntimeProfile(String name) {
|
||||
this();
|
||||
this.name = name;
|
||||
@ -77,6 +80,22 @@ public class RuntimeProfile {
|
||||
this.counterMap.put("TotalTime", counterTotalTime);
|
||||
}
|
||||
|
||||
public void setIsCancel(Boolean isCancel) {
|
||||
this.isCancel = isCancel;
|
||||
}
|
||||
|
||||
public Boolean getIsCancel() {
|
||||
return isCancel;
|
||||
}
|
||||
|
||||
public void setIsDone(Boolean isDone) {
|
||||
this.isDone = isDone;
|
||||
}
|
||||
|
||||
public Boolean getIsDone() {
|
||||
return isDone;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@ -207,7 +207,7 @@ public class Coordinator {
|
||||
|
||||
// backend execute state
|
||||
private final List<BackendExecState> backendExecStates = Lists.newArrayList();
|
||||
private final Map<Integer, PipelineExecContext> pipelineExecContexts = new HashMap<>();
|
||||
private final Map<Pair<Integer, Long>, PipelineExecContext> pipelineExecContexts = new HashMap<>();
|
||||
// backend which state need to be checked when joining this coordinator.
|
||||
// It is supposed to be the subset of backendExecStates.
|
||||
private final List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
|
||||
@ -840,11 +840,20 @@ public class Coordinator {
|
||||
needCheckBackendState = true;
|
||||
}
|
||||
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap = new HashMap<TUniqueId, RuntimeProfile>();
|
||||
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
|
||||
for (TPipelineInstanceParams instanceParam : entry.getValue().local_params) {
|
||||
String name = "Instance " + DebugUtil.printId(instanceParam.fragment_instance_id)
|
||||
+ " (host=" + entry.getKey() + ")";
|
||||
fragmentInstancesMap.put(instanceParam.fragment_instance_id, new RuntimeProfile(name));
|
||||
}
|
||||
}
|
||||
|
||||
// 3. group BackendExecState by BE. So that we can use one RPC to send all fragment instances of a BE.
|
||||
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
|
||||
PipelineExecContext pipelineExecContext =
|
||||
new PipelineExecContext(fragment.getFragmentId(),
|
||||
profileFragmentId, entry.getValue(), this.addressToBackendID, entry.getKey());
|
||||
Long backendId = this.addressToBackendID.get(entry.getKey());
|
||||
PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
|
||||
profileFragmentId, entry.getValue(), backendId, fragmentInstancesMap);
|
||||
// Each tParam will set the total number of Fragments that need to be executed on the same BE,
|
||||
// and the BE will determine whether all Fragments have been executed based on this information.
|
||||
// Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons.
|
||||
@ -853,7 +862,7 @@ public class Coordinator {
|
||||
entry.getValue().setNeedWaitExecutionTrigger(twoPhaseExecution);
|
||||
entry.getValue().setFragmentId(fragment.getFragmentId().asInt());
|
||||
|
||||
pipelineExecContexts.put(fragment.getFragmentId().asInt(), pipelineExecContext);
|
||||
pipelineExecContexts.put(Pair.of(fragment.getFragmentId().asInt(), backendId), pipelineExecContext);
|
||||
if (needCheckBackendState) {
|
||||
needCheckPipelineExecContexts.add(pipelineExecContext);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -2111,7 +2120,7 @@ public class Coordinator {
|
||||
|
||||
public void updateFragmentExecStatus(TReportExecStatusParams params) {
|
||||
if (enablePipelineEngine) {
|
||||
PipelineExecContext ctx = pipelineExecContexts.get(params.getFragmentId());
|
||||
PipelineExecContext ctx = pipelineExecContexts.get(Pair.of(params.getFragmentId(), params.getBackendId()));
|
||||
if (!ctx.updateProfile(params)) {
|
||||
return;
|
||||
}
|
||||
@ -2136,7 +2145,7 @@ public class Coordinator {
|
||||
status.getErrorMsg());
|
||||
updateStatus(status, params.getFragmentInstanceId());
|
||||
}
|
||||
if (ctx.doneFlags.get(params.getFragmentInstanceId())) {
|
||||
if (ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone()) {
|
||||
if (params.isSetDeltaUrls()) {
|
||||
updateDeltas(params.getDeltaUrls());
|
||||
}
|
||||
@ -2714,11 +2723,8 @@ public class Coordinator {
|
||||
PlanFragmentId fragmentId;
|
||||
boolean initiated;
|
||||
volatile boolean done;
|
||||
volatile Map<TUniqueId, Boolean> doneFlags = new HashMap<TUniqueId, Boolean>();
|
||||
boolean hasCanceled;
|
||||
volatile Map<TUniqueId, Boolean> cancelFlags = new HashMap<TUniqueId, Boolean>();
|
||||
|
||||
volatile Map<TUniqueId, RuntimeProfile> profiles = new HashMap<TUniqueId, RuntimeProfile>();
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
|
||||
int cancelProgress = 0;
|
||||
int profileFragmentId;
|
||||
TNetworkAddress brpcAddress;
|
||||
@ -2729,25 +2735,19 @@ public class Coordinator {
|
||||
private final int numInstances;
|
||||
|
||||
public PipelineExecContext(PlanFragmentId fragmentId, int profileFragmentId,
|
||||
TPipelineFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID,
|
||||
TNetworkAddress addr) {
|
||||
TPipelineFragmentParams rpcParams, Long backendId,
|
||||
Map<TUniqueId, RuntimeProfile> fragmentInstancesMap) {
|
||||
this.profileFragmentId = profileFragmentId;
|
||||
this.fragmentId = fragmentId;
|
||||
this.rpcParams = rpcParams;
|
||||
this.numInstances = rpcParams.local_params.size();
|
||||
for (int i = 0; i < this.numInstances; i++) {
|
||||
this.doneFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
|
||||
this.cancelFlags.put(rpcParams.local_params.get(i).fragment_instance_id, false);
|
||||
this.fragmentInstancesMap = fragmentInstancesMap;
|
||||
|
||||
String name = "Instance " + DebugUtil.printId(rpcParams.local_params.get(i).fragment_instance_id)
|
||||
+ " (host=" + addr + ")";
|
||||
this.profiles.put(rpcParams.local_params.get(i).fragment_instance_id, new RuntimeProfile(name));
|
||||
}
|
||||
this.initiated = false;
|
||||
this.done = false;
|
||||
|
||||
this.address = addr;
|
||||
this.backend = idToBackend.get(addressToBackendID.get(address));
|
||||
this.backend = idToBackend.get(backendId);
|
||||
this.address = new TNetworkAddress(backend.getIp(), backend.getBePort());
|
||||
this.brpcAddress = new TNetworkAddress(backend.getIp(), backend.getBrpcPort());
|
||||
|
||||
this.hasCanceled = false;
|
||||
@ -2771,15 +2771,17 @@ public class Coordinator {
|
||||
// update profile.
|
||||
// return true if profile is updated. Otherwise, return false.
|
||||
public synchronized boolean updateProfile(TReportExecStatusParams params) {
|
||||
if (this.done) {
|
||||
RuntimeProfile profile = fragmentInstancesMap.get(params.fragment_instance_id);
|
||||
if (params.done && profile.getIsDone()) {
|
||||
// duplicate packet
|
||||
return false;
|
||||
}
|
||||
|
||||
if (params.isSetProfile()) {
|
||||
this.profiles.get(params.fragment_instance_id).update(params.profile);
|
||||
profile.update(params.profile);
|
||||
}
|
||||
if (params.done) {
|
||||
this.doneFlags.replace(params.fragment_instance_id, true);
|
||||
profile.setIsDone(true);
|
||||
profileReportProgress++;
|
||||
}
|
||||
if (profileReportProgress == numInstances) {
|
||||
@ -2789,7 +2791,7 @@ public class Coordinator {
|
||||
}
|
||||
|
||||
public synchronized void printProfile(StringBuilder builder) {
|
||||
this.profiles.values().stream().forEach(p -> {
|
||||
this.fragmentInstancesMap.values().stream().forEach(p -> {
|
||||
p.computeTimeInProfile();
|
||||
p.prettyPrint(builder, "");
|
||||
});
|
||||
@ -2815,7 +2817,7 @@ public class Coordinator {
|
||||
this.initiated, this.done, this.hasCanceled, backend.getId(),
|
||||
DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name());
|
||||
}
|
||||
if (cancelFlags.get(localParam.fragment_instance_id)) {
|
||||
if (fragmentInstancesMap.get(localParam.fragment_instance_id).getIsCancel()) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
@ -2841,7 +2843,7 @@ public class Coordinator {
|
||||
}
|
||||
this.hasCanceled = true;
|
||||
for (int i = 0; i < this.numInstances; i++) {
|
||||
this.cancelFlags.replace(rpcParams.local_params.get(i).fragment_instance_id, true);
|
||||
fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
|
||||
}
|
||||
cancelProgress = numInstances;
|
||||
return true;
|
||||
@ -3354,8 +3356,8 @@ public class Coordinator {
|
||||
if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
|
||||
return;
|
||||
}
|
||||
ctx.profiles.values().stream().forEach(p ->
|
||||
fragmentProfile.get(ctx.profileFragmentId).addChild(p));
|
||||
ctx.fragmentInstancesMap.values().stream()
|
||||
.forEach(p -> fragmentProfile.get(ctx.profileFragmentId).addChild(p));
|
||||
}
|
||||
} else {
|
||||
for (BackendExecState backendExecState : backendExecStates) {
|
||||
|
||||
Reference in New Issue
Block a user