diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 57e4fe67aa..f1837f6882 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -26,11 +26,13 @@ import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; +import com.google.common.collect.ImmutableMap; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; import java.nio.ByteBuffer; @@ -67,7 +69,7 @@ public class MasterOpExecutor { Span forwardSpan = ctx.getTracer().spanBuilder("forward").setParent(Context.current()) .startSpan(); - try (Scope scope = forwardSpan.makeCurrent()) { + try (Scope ignored = forwardSpan.makeCurrent()) { forward(); } catch (Exception e) { forwardSpan.recordException(e); @@ -88,7 +90,7 @@ public class MasterOpExecutor { int masterRpcPort = ctx.getEnv().getMasterRpcPort(); TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort); - FrontendService.Client client = null; + FrontendService.Client client; try { client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs); } catch (Exception e) { @@ -112,7 +114,7 @@ public class MasterOpExecutor { params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); // create a trace carrier - Map traceCarrier = new HashMap(); + Map traceCarrier = new HashMap<>(); // Inject the request with the current context Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator() .inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value)); @@ -127,19 
+129,28 @@ public class MasterOpExecutor {
 
         boolean isReturnToPool = false;
         try {
             result = client.forward(params);
             isReturnToPool = true;
         } catch (TTransportException e) {
+            // wrap the raw exception.
+            Exception exception = new ForwardToMasterException(
+                    String.format("Forward statement %s to Master %s failed", ctx.getStmtId(),
+                            thriftAddress), e);
+
             boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
             if (!ok) {
-                throw e;
+                throw exception;
             }
             if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
-                throw e;
+                throw exception;
             } else {
                 LOG.warn("Forward statement " + ctx.getStmtId() + " to Master " + thriftAddress + " twice", e);
-                result = client.forward(params);
-                isReturnToPool = true;
+                try {
+                    result = client.forward(params);
+                    isReturnToPool = true;
+                } catch (TException ex) {
+                    throw exception;
+                }
             }
         } finally {
             if (isReturnToPool) {
@@ -190,4 +201,51 @@ public class MasterOpExecutor {
     public void setResult(TMasterOpResult result) {
         this.result = result;
     }
+
+    /**
+     * Unchecked exception raised when forwarding a statement to the Master FE fails at the
+     * Thrift transport layer.
+     *
+     * <p>The message is the caller-supplied context followed by a readable description of the
+     * transport error type. The transport exception itself is kept as the cause so that its
+     * stack trace is not lost.
+     */
+    public static class ForwardToMasterException extends RuntimeException {
+
+        /** Description used when a transport error type has no entry in the map below. */
+        private static final String UNKNOWN_TYPE_MSG = "Unknown exception";
+
+        /** Readable descriptions keyed by the TTransportException type constants. */
+        private static final Map<Integer, String> TYPE_MSG_MAP =
+                ImmutableMap.<Integer, String>builder()
+                        .put(TTransportException.UNKNOWN, UNKNOWN_TYPE_MSG)
+                        .put(TTransportException.NOT_OPEN, "Connection is not open")
+                        .put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
+                        .put(TTransportException.TIMED_OUT, "Connection timeout")
+                        .put(TTransportException.END_OF_FILE, "EOF")
+                        .put(TTransportException.CORRUPTED_DATA, "Corrupted data")
+                        .build();
+
+        /** Full message returned by {@link #getMessage()}. */
+        private final String msg;
+
+        /**
+         * Builds the exception from a context message and the transport failure behind it.
+         *
+         * @param msg context describing the forward that failed
+         * @param exception transport failure that caused the forward to fail; must not be null
+         */
+        public ForwardToMasterException(String msg, TTransportException exception) {
+            // Keep the transport failure as the cause so logs still show its stack trace.
+            super(exception);
+            // Unmapped types get the generic description rather than the literal "null".
+            this.msg = msg + ", cause: " + TYPE_MSG_MAP.getOrDefault(exception.getType(), UNKNOWN_TYPE_MSG);
+        }
+
+        /** Returns the context message with the transport error description appended. */
+        @Override
+        public String getMessage() {
+            return msg;
+        }
+    }
 }