[bugfix](coordinator) should use fragment id not profile fragment id to cancel fragment (#31852)
This commit is contained in:
@ -136,7 +136,6 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
@ -1253,7 +1252,7 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateStatus(Status status, TUniqueId instanceId) {
|
||||
private void updateStatus(Status status) {
|
||||
lock.lock();
|
||||
try {
|
||||
// The query is done and we are just waiting for remote fragments to clean up.
|
||||
@ -1272,10 +1271,6 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
|
||||
queryStatus.setStatus(status);
|
||||
LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {},"
|
||||
+ " query id: {}, instance id: {}, error message: {}",
|
||||
jobId, DebugUtil.printId(queryId), instanceId != null ? DebugUtil.printId(instanceId) : "NaN",
|
||||
status.getErrorMsg());
|
||||
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
|
||||
} else {
|
||||
@ -1286,38 +1281,6 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Records a failing {@code status} as the query status and starts cancellation,
 * forwarding {@code backendId} to {@code cancelInternal}.
 *
 * <p>The whole body runs between {@code lock.lock()} and {@code lock.unlock()}.
 * The call is a no-op when the status is a cancellation received after all
 * results were returned, when the status is OK, or when {@code queryStatus}
 * already holds a non-OK status.
 *
 * @param status    the status to apply
 * @param backendId passed unchanged to {@code cancelInternal};
 *                  NOTE(review): presumably the id of the backend that reported
 *                  the failure - confirm against callers
 */
private void updateStatus(Status status, long backendId) {
|
||||
lock.lock();
|
||||
try {
|
||||
// The query is done and we are just waiting for remote fragments to clean up.
|
||||
// Ignore their cancelled updates.
|
||||
if (returnedAllResults && status.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
// nothing to update
|
||||
if (status.ok()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// don't override an error status; also, cancellation has already started
|
||||
if (!queryStatus.ok()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// First non-OK status seen: store it, log it, then cancel.
|
||||
queryStatus.setStatus(status);
|
||||
LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {},"
|
||||
+ " query id: {}, error message: {}",
|
||||
jobId, DebugUtil.printId(queryId), status.getErrorMsg());
|
||||
// A TIMEOUT error code maps to the TIMEOUT cancel reason; every other
// error code is reported as INTERNAL_ERROR.
|
||||
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, backendId);
|
||||
} else {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, backendId);
|
||||
}
|
||||
} finally {
|
||||
// Always release the lock, including on the early-return paths above.
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RowBatch getNext() throws Exception {
|
||||
if (receiver == null) {
|
||||
@ -1332,7 +1295,7 @@ public class Coordinator implements CoordInterface {
|
||||
DebugUtil.printId(queryId), status.toString());
|
||||
}
|
||||
|
||||
updateStatus(status, null /* no instance id */);
|
||||
updateStatus(status);
|
||||
|
||||
Status copyStatus = null;
|
||||
lock();
|
||||
@ -1487,18 +1450,6 @@ public class Coordinator implements CoordInterface {
|
||||
executionProfile.onCancel();
|
||||
}
|
||||
|
||||
/**
 * Cancels this query's execution for the given reason, forwarding
 * {@code backendId} to {@code cancelRemoteFragmentsAsync}.
 *
 * <p>Steps, in order: cancel {@code receiver} if it is set; if {@code pointExec}
 * is set, cancel it and return; otherwise cancel the remote fragments and call
 * {@code executionProfile.onCancel()}.
 *
 * @param cancelReason reason passed to the receiver (as its string form) and to
 *                     the remote fragment cancellation
 * @param backendId    passed unchanged to {@code cancelRemoteFragmentsAsync}
 */
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) {
|
||||
if (null != receiver) {
|
||||
receiver.cancel(cancelReason.toString());
|
||||
}
|
||||
if (null != pointExec) {
|
||||
pointExec.cancel();
|
||||
// Early return: on this path the remote fragments are not cancelled and
// executionProfile.onCancel() is not called.
|
||||
return;
|
||||
}
|
||||
cancelRemoteFragmentsAsync(cancelReason, backendId);
|
||||
executionProfile.onCancel();
|
||||
}
|
||||
|
||||
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) {
|
||||
if (enablePipelineEngine) {
|
||||
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
|
||||
@ -1511,15 +1462,6 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Cancels the fragment instances of every pipeline execution context except
 * those whose {@code backend} equals the backend registered under
 * {@code backendId} in {@code idToBackend}.
 *
 * <p>Requires {@code enablePipelineXEngine} to be true; the precondition check
 * on the first line rejects the call otherwise.
 * NOTE(review): {@code Preconditions} is presumably Guava's, in which case a
 * failed check throws {@code IllegalArgumentException} - confirm the import.
 *
 * @param cancelReason reason handed to each cancelled context
 * @param backendId    id looked up in {@code idToBackend}; contexts on the
 *                     matching backend are skipped
 */
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason, long backendId) {
|
||||
Preconditions.checkArgument(enablePipelineXEngine);
|
||||
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
|
||||
// Objects.equals is null-safe: if backendId has no entry in idToBackend,
// the lookup yields null and only contexts with a null backend are skipped.
|
||||
if (!Objects.equals(idToBackend.get(backendId), ctx.backend)) {
|
||||
ctx.cancelFragmentInstance(cancelReason);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void computeFragmentExecParams() throws Exception {
|
||||
// fill hosts field in fragmentExecParams
|
||||
computeFragmentHosts();
|
||||
@ -2489,7 +2431,7 @@ public class Coordinator implements CoordInterface {
|
||||
LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
|
||||
status.getErrorMsg());
|
||||
updateStatus(status, params.backend_id);
|
||||
updateStatus(status);
|
||||
}
|
||||
if (params.isSetDeltaUrls()) {
|
||||
updateDeltas(params.getDeltaUrls());
|
||||
@ -2546,7 +2488,7 @@ public class Coordinator implements CoordInterface {
|
||||
DebugUtil.printId(queryId), params.getFragmentId(),
|
||||
DebugUtil.printId(params.getFragmentInstanceId()),
|
||||
params.getBackendId(), status.getErrorMsg());
|
||||
updateStatus(status, params.getFragmentInstanceId());
|
||||
updateStatus(status);
|
||||
}
|
||||
|
||||
// params.isDone() should be promised.
|
||||
@ -2622,7 +2564,7 @@ public class Coordinator implements CoordInterface {
|
||||
LOG.warn("Instance {} of query {} report failed status, error msg: {}",
|
||||
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
|
||||
status.getErrorMsg());
|
||||
updateStatus(status, params.getFragmentInstanceId());
|
||||
updateStatus(status);
|
||||
}
|
||||
}
|
||||
|
||||
@ -3336,7 +3278,7 @@ public class Coordinator implements CoordInterface {
|
||||
try {
|
||||
try {
|
||||
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
|
||||
this.profileFragmentId, queryId, cancelReason);
|
||||
this.fragmentId, queryId, cancelReason);
|
||||
} catch (RpcException e) {
|
||||
LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
|
||||
brpcAddress.getPort());
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.common.util.NetUtils;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.planner.PlanFragmentId;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
|
||||
import org.apache.doris.proto.InternalService.PGetWalQueueSizeRequest;
|
||||
@ -241,12 +242,13 @@ public class BackendServiceProxy {
|
||||
}
|
||||
|
||||
public Future<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
|
||||
int fragmentId, TUniqueId queryId, Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
|
||||
PlanFragmentId fragmentId, TUniqueId queryId,
|
||||
Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
|
||||
final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
|
||||
.newBuilder()
|
||||
.setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
|
||||
.setCancelReason(cancelReason)
|
||||
.setFragmentId(fragmentId)
|
||||
.setFragmentId(fragmentId.asInt())
|
||||
.setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
|
||||
try {
|
||||
final BackendServiceClient client = getProxy(address);
|
||||
|
||||
Reference in New Issue
Block a user