[Enhancement](stmt-forward) better error msg for follower fe #17132

The error log msg for the FE follower's forward to master failure is ambiguous as seen, so we should clarify it.
This commit is contained in:
奕冷
2023-02-26 12:28:33 +08:00
committed by GitHub
parent 605d840231
commit 5018223176

View File

@ -26,11 +26,13 @@ import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ImmutableMap;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.nio.ByteBuffer;
@ -67,7 +69,7 @@ public class MasterOpExecutor {
Span forwardSpan =
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
.startSpan();
try (Scope scope = forwardSpan.makeCurrent()) {
try (Scope ignored = forwardSpan.makeCurrent()) {
forward();
} catch (Exception e) {
forwardSpan.recordException(e);
@ -88,7 +90,7 @@ public class MasterOpExecutor {
int masterRpcPort = ctx.getEnv().getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
FrontendService.Client client = null;
FrontendService.Client client;
try {
client = ClientPool.frontendPool.borrowObject(thriftAddress, thriftTimeoutMs);
} catch (Exception e) {
@ -112,7 +114,7 @@ public class MasterOpExecutor {
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
// create a trace carrier
Map<String, String> traceCarrier = new HashMap<String, String>();
Map<String, String> traceCarrier = new HashMap<>();
// Inject the request with the current context
Telemetry.getOpenTelemetry().getPropagators().getTextMapPropagator()
.inject(Context.current(), traceCarrier, (carrier, key, value) -> carrier.put(key, value));
@ -127,19 +129,28 @@ public class MasterOpExecutor {
boolean isReturnToPool = false;
try {
result = client.forward(params);
client.forward(params);
isReturnToPool = true;
} catch (TTransportException e) {
// wrap the raw exception.
Exception exception = new ForwardToMasterException(
String.format("Forward statement %s to Master %s failed", ctx.getStmtId(),
thriftAddress), e);
boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs);
if (!ok) {
throw e;
throw exception;
}
if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) {
throw e;
throw exception;
} else {
LOG.warn("Forward statement " + ctx.getStmtId() + " to Master " + thriftAddress + " twice", e);
result = client.forward(params);
isReturnToPool = true;
try {
client.forward(params);
isReturnToPool = true;
} catch (TException ex) {
throw exception;
}
}
} finally {
if (isReturnToPool) {
@ -190,4 +201,28 @@ public class MasterOpExecutor {
public void setResult(TMasterOpResult result) {
this.result = result;
}
public static class ForwardToMasterException extends RuntimeException {
private static final Map<Integer, String> TYPE_MSG_MAP =
ImmutableMap.<Integer, String>builder()
.put(TTransportException.UNKNOWN, "Unknown exception")
.put(TTransportException.NOT_OPEN, "Connection is not open")
.put(TTransportException.ALREADY_OPEN, "Connection has already opened up")
.put(TTransportException.TIMED_OUT, "Connection timeout")
.put(TTransportException.END_OF_FILE, "EOF")
.put(TTransportException.CORRUPTED_DATA, "Corrupted data")
.build();
private final String msg;
public ForwardToMasterException(String msg, TTransportException exception) {
this.msg = msg + ", cause: " + TYPE_MSG_MAP.get(exception.getType());
}
@Override
public String getMessage() {
return msg;
}
}
}