From ead016e0d2567030c297277c1dca63198cc18efb Mon Sep 17 00:00:00 2001 From: ChPi Date: Wed, 14 Sep 2022 09:12:33 +0800 Subject: [PATCH] [Enhancement](execute) add timeout for executing fragment rpc (#12512) Co-authored-by: chenjie --- .../org/apache/doris/rpc/BackendServiceClient.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index fec5c1727e..0269bee35d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -47,6 +47,7 @@ public class BackendServiceClient { private final PBackendServiceGrpc.PBackendServiceFutureStub stub; private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub; private final ManagedChannel channel; + private final long execPlanTimeout; public BackendServiceClient(TNetworkAddress address) { this.address = address; @@ -56,21 +57,26 @@ public class BackendServiceClient { .intercept(new OpenTelemetryClientInterceptor()).usePlaintext().build(); stub = PBackendServiceGrpc.newFutureStub(channel); blockingStub = PBackendServiceGrpc.newBlockingStub(channel); + // execPlanTimeout should be greater than future.get timeout, otherwise future will throw ExecutionException + execPlanTimeout = Config.remote_fragment_exec_timeout_ms + 5000; } public Future execPlanFragmentAsync( InternalService.PExecPlanFragmentRequest request) { - return stub.execPlanFragment(request); + return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS) + .execPlanFragment(request); } public Future execPlanFragmentPrepareAsync( InternalService.PExecPlanFragmentRequest request) { - return stub.execPlanFragmentPrepare(request); + return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS) + .execPlanFragmentPrepare(request); } public Future execPlanFragmentStartAsync( InternalService.PExecPlanFragmentStartRequest request) { - return stub.execPlanFragmentStart(request); + return stub.withDeadlineAfter(execPlanTimeout, TimeUnit.MILLISECONDS) + .execPlanFragmentStart(request); } public Future cancelPlanFragmentAsync(