[Enhancement](execute) add timeout for executing fragment rpc (#12512)

Co-authored-by: chenjie <chenjie@cecdat.com>
This commit is contained in:
ChPi
2022-09-14 09:12:33 +08:00
committed by GitHub
parent 8448867bed
commit ead016e0d2

View File

@ -47,6 +47,7 @@ public class BackendServiceClient {
private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
private final ManagedChannel channel;
private final long execPlanTimeout;
public BackendServiceClient(TNetworkAddress address) {
this.address = address;
@ -56,21 +57,26 @@ public class BackendServiceClient {
.intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build();
stub = PBackendServiceGrpc.newFutureStub(channel);
blockingStub = PBackendServiceGrpc.newBlockingStub(channel);
// execPlanTimeout should be greater than future.get timeout, otherwise future will throw ExecutionException
execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000;
}
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentAsync(
InternalService.PExecPlanFragmentRequest request) {
return stub.execPlanFragment(request);
return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS)
.execPlanFragment(request);
}
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentPrepareAsync(
InternalService.PExecPlanFragmentRequest request) {
return stub.execPlanFragmentPrepare(request);
return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS)
.execPlanFragmentPrepare(request);
}
public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(
InternalService.PExecPlanFragmentStartRequest request) {
return stub.execPlanFragmentStart(request);
return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS)
.execPlanFragmentStart(request);
}
public Future<InternalService.PCancelPlanFragmentResult> cancelPlanFragmentAsync(