[refactor](errormessage) step1: unify the status usage in FE (#34062)
We should tell the user the correct error message when something goes wrong, but the error messages are currently a mess. I will clean them up. This is the first step: unify the error code usage in FE.
This commit is contained in:
@ -26,14 +26,6 @@ public class Status {
|
||||
public static final Status CANCELLED = new Status(TStatusCode.CANCELLED, "Cancelled");
|
||||
public static final Status TIMEOUT = new Status(TStatusCode.TIMEOUT, "Timeout");
|
||||
|
||||
public TStatusCode getErrorCode() {
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
public String getErrorMsg() {
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
private TStatusCode errorCode; // anything other than OK
|
||||
private String errorMsg;
|
||||
|
||||
@ -58,6 +50,25 @@ public class Status {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Builds a Status from a protobuf PStatus.
 *
 * The numeric status code is looked up with TStatusCode.findByValue; when the
 * lookup returns null the code falls back to TStatusCode.INTERNAL_ERROR.
 * The error message is taken from the first entry of the PStatus error message
 * list; when that list is empty this constructor does not assign errorMsg.
 *
 * @param status the protobuf status to convert
 */
public Status(final PStatus status) {
|
||||
TStatusCode mappingCode = TStatusCode.findByValue(status.getStatusCode());
|
||||
// Not every PStatus code can be mapped to a TStatusCode, see the BE status.h file.
|
||||
// Codes that cannot be mapped are reported as an internal error.
|
||||
if (mappingCode == null) {
|
||||
this.errorCode = TStatusCode.INTERNAL_ERROR;
|
||||
} else {
|
||||
this.errorCode = mappingCode;
|
||||
}
|
||||
// Only the first error message is kept.
|
||||
if (!status.getErrorMsgsList().isEmpty()) {
|
||||
this.errorMsg = status.getErrorMsgs(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Overwrites both the error code and the error message of this status.
 *
 * @param code the new status code
 * @param errorMessage the new error message
 */
public void updateStatus(TStatusCode code, String errorMessage) {
|
||||
this.errorCode = code;
|
||||
this.errorMsg = errorMessage;
|
||||
}
|
||||
|
||||
/**
 * Reports whether this status is successful.
 *
 * @return true when the error code equals TStatusCode.OK
 */
public boolean ok() {
|
||||
return this.errorCode == TStatusCode.OK;
|
||||
}
|
||||
@ -70,27 +81,14 @@ public class Status {
|
||||
return this.errorCode == TStatusCode.THRIFT_RPC_ERROR;
|
||||
}
|
||||
|
||||
public void setStatus(Status status) {
|
||||
this.errorCode = status.errorCode;
|
||||
this.errorMsg = status.getErrorMsg();
|
||||
public TStatusCode getErrorCode() {
|
||||
return errorCode;
|
||||
}
|
||||
|
||||
public void setStatus(String msg) {
|
||||
this.errorCode = TStatusCode.INTERNAL_ERROR;
|
||||
this.errorMsg = msg;
|
||||
public String getErrorMsg() {
|
||||
return errorMsg;
|
||||
}
|
||||
|
||||
public void setPstatus(PStatus status) {
|
||||
this.errorCode = TStatusCode.findByValue(status.getStatusCode());
|
||||
if (!status.getErrorMsgsList().isEmpty()) {
|
||||
this.errorMsg = status.getErrorMsgs(0);
|
||||
}
|
||||
}
|
||||
|
||||
public void setRpcStatus(String msg) {
|
||||
this.errorCode = TStatusCode.THRIFT_RPC_ERROR;
|
||||
this.errorMsg = msg;
|
||||
}
|
||||
|
||||
public void rewriteErrorMsg() {
|
||||
if (ok()) {
|
||||
|
||||
@ -490,7 +490,7 @@ public class Coordinator implements CoordInterface {
|
||||
try {
|
||||
this.backendExecStates.clear();
|
||||
this.pipelineExecContexts.clear();
|
||||
this.queryStatus.setStatus(new Status());
|
||||
this.queryStatus.updateStatus(TStatusCode.OK, "");
|
||||
if (this.exportFiles == null) {
|
||||
this.exportFiles = Lists.newArrayList();
|
||||
}
|
||||
@ -1101,7 +1101,7 @@ public class Coordinator implements CoordInterface {
|
||||
if (exception != null && errMsg == null) {
|
||||
errMsg = operation + " failed. " + exception.getMessage();
|
||||
}
|
||||
queryStatus.setStatus(errMsg);
|
||||
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
|
||||
switch (code) {
|
||||
case TIMEOUT:
|
||||
@ -1182,7 +1182,7 @@ public class Coordinator implements CoordInterface {
|
||||
if (exception != null && errMsg == null) {
|
||||
errMsg = operation + " failed. " + exception.getMessage();
|
||||
}
|
||||
queryStatus.setStatus(errMsg);
|
||||
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
|
||||
switch (code) {
|
||||
case TIMEOUT:
|
||||
@ -1306,7 +1306,7 @@ public class Coordinator implements CoordInterface {
|
||||
return;
|
||||
}
|
||||
|
||||
queryStatus.setStatus(status);
|
||||
queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
|
||||
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
|
||||
} else {
|
||||
@ -1470,7 +1470,7 @@ public class Coordinator implements CoordInterface {
|
||||
+ "so that send cancel to BE again",
|
||||
DebugUtil.printId(queryId), queryStatus.toString(), new Exception());
|
||||
} else {
|
||||
queryStatus.setStatus(Status.CANCELLED);
|
||||
queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
|
||||
}
|
||||
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
|
||||
DebugUtil.printId(queryId), cancelReason.toString());
|
||||
@ -3137,8 +3137,7 @@ public class Coordinator implements CoordInterface {
|
||||
public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
|
||||
cancelInProcess = false;
|
||||
if (result.hasStatus()) {
|
||||
Status status = new Status();
|
||||
status.setPstatus(result.getStatus());
|
||||
Status status = new Status(result.getStatus());
|
||||
if (status.getErrorCode() == TStatusCode.OK) {
|
||||
hasCancelled = true;
|
||||
} else {
|
||||
@ -3323,8 +3322,7 @@ public class Coordinator implements CoordInterface {
|
||||
public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
|
||||
cancelInProcess = false;
|
||||
if (result.hasStatus()) {
|
||||
Status status = new Status();
|
||||
status.setPstatus(result.getStatus());
|
||||
Status status = new Status(result.getStatus());
|
||||
if (status.getErrorCode() == TStatusCode.OK) {
|
||||
hasCancelled = true;
|
||||
} else {
|
||||
@ -3388,8 +3386,7 @@ public class Coordinator implements CoordInterface {
|
||||
public void onSuccess(InternalService.PCancelPlanFragmentResult result) {
|
||||
cancelInProcess = false;
|
||||
if (result.hasStatus()) {
|
||||
Status status = new Status();
|
||||
status.setPstatus(result.getStatus());
|
||||
Status status = new Status(result.getStatus());
|
||||
if (status.getErrorCode() == TStatusCode.OK) {
|
||||
hasCancelled = true;
|
||||
} else {
|
||||
|
||||
@ -196,7 +196,7 @@ public class PointQueryExec implements CoordInterface {
|
||||
if (tryCount >= maxTry) {
|
||||
break;
|
||||
}
|
||||
status.setStatus(Status.OK);
|
||||
status.updateStatus(TStatusCode.OK, "");
|
||||
} while (true);
|
||||
// handle status code
|
||||
if (!status.ok()) {
|
||||
@ -270,7 +270,7 @@ public class PointQueryExec implements CoordInterface {
|
||||
long currentTs = System.currentTimeMillis();
|
||||
if (currentTs >= timeoutTs) {
|
||||
LOG.warn("fetch result timeout {}", backend.getBrpcAddress());
|
||||
status.setStatus("query timeout");
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query timeout");
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
@ -279,35 +279,35 @@ public class PointQueryExec implements CoordInterface {
|
||||
// continue to get result
|
||||
LOG.info("future get interrupted Exception");
|
||||
if (isCancel) {
|
||||
status.setStatus(Status.CANCELLED);
|
||||
status.updateStatus(TStatusCode.CANCELLED, "cancelled");
|
||||
return null;
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
futureResponse.cancel(true);
|
||||
LOG.warn("fetch result timeout {}, addr {}", timeoutTs - currentTs, backend.getBrpcAddress());
|
||||
status.setStatus("query timeout");
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, "query timeout");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
} catch (RpcException e) {
|
||||
LOG.warn("fetch result rpc exception {}, e {}", backend.getBrpcAddress(), e);
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
|
||||
return null;
|
||||
} catch (ExecutionException e) {
|
||||
LOG.warn("fetch result execution exception {}, addr {}", e, backend.getBrpcAddress());
|
||||
if (e.getMessage().contains("time out")) {
|
||||
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
|
||||
status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage()));
|
||||
status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
|
||||
} else {
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
status.setPstatus(pResult.getStatus());
|
||||
Status resultStatus = new Status(pResult.getStatus());
|
||||
if (resultStatus.getErrorCode() != TStatusCode.OK) {
|
||||
status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -335,7 +335,7 @@ public class PointQueryExec implements CoordInterface {
|
||||
}
|
||||
|
||||
if (isCancel) {
|
||||
status.setStatus(Status.CANCELLED);
|
||||
status.updateStatus(TStatusCode.CANCELLED, "cancelled");
|
||||
}
|
||||
return rowBatch;
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ public class ResultReceiver {
|
||||
int maxMsgSizeOfResultReceiver;
|
||||
|
||||
private void setRunStatus(Status status) {
|
||||
runStatus.setStatus(status);
|
||||
runStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
|
||||
}
|
||||
|
||||
private boolean isCancel() {
|
||||
@ -104,14 +104,14 @@ public class ResultReceiver {
|
||||
if (!isCancel()) {
|
||||
LOG.warn("ResultReceiver is not set to cancelled state, this should not happen");
|
||||
} else {
|
||||
status.setStatus(new Status(TStatusCode.CANCELLED, this.cancelReason));
|
||||
status.updateStatus(TStatusCode.CANCELLED, this.cancelReason);
|
||||
return null;
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("Query {} get result timeout, get result duration {} ms",
|
||||
DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
|
||||
setRunStatus(Status.TIMEOUT);
|
||||
status.setStatus(Status.TIMEOUT);
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "");
|
||||
updateCancelReason("fetch data timeout");
|
||||
return null;
|
||||
} catch (InterruptedException e) {
|
||||
@ -119,15 +119,15 @@ public class ResultReceiver {
|
||||
LOG.warn("Future of ResultReceiver of query {} got interrupted Exception",
|
||||
DebugUtil.printId(this.queryId), e);
|
||||
if (isCancel()) {
|
||||
status.setStatus(Status.CANCELLED);
|
||||
status.updateStatus(TStatusCode.CANCELLED, "cancelled");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
status.setPstatus(pResult.getStatus());
|
||||
Status resultStatus = new Status(pResult.getStatus());
|
||||
if (resultStatus.getErrorCode() != TStatusCode.OK) {
|
||||
status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -136,7 +136,7 @@ public class ResultReceiver {
|
||||
if (packetIdx != pResult.getPacketSeq()) {
|
||||
LOG.warn("finistId={}, receive packet failed, expect={}, receive={}",
|
||||
DebugUtil.printId(finstId), packetIdx, pResult.getPacketSeq());
|
||||
status.setRpcStatus("receive error packet");
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, "receive error packet");
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -170,20 +170,20 @@ public class ResultReceiver {
|
||||
}
|
||||
} catch (RpcException e) {
|
||||
LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(finstId), e);
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(finstId), e);
|
||||
if (e.getMessage().contains("time out")) {
|
||||
// if timeout, we set error code to TIMEOUT, and it will not retry querying.
|
||||
status.setStatus(new Status(TStatusCode.TIMEOUT, e.getMessage()));
|
||||
status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
|
||||
} else {
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backendId, e.getMessage());
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
|
||||
status.setStatus(new Status(TStatusCode.TIMEOUT, "query timeout"));
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
currentThread = null;
|
||||
@ -191,7 +191,7 @@ public class ResultReceiver {
|
||||
}
|
||||
|
||||
if ((isCancel())) {
|
||||
status.setStatus(runStatus);
|
||||
status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg());
|
||||
}
|
||||
return rowBatch;
|
||||
}
|
||||
|
||||
@ -55,13 +55,13 @@ public class CacheBeProxy extends CacheProxy {
|
||||
.updateCache(address, request);
|
||||
InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
|
||||
status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
|
||||
status.updateStatus(TStatusCode.OK, "CACHE_OK");
|
||||
} else {
|
||||
status.setStatus(response.getStatus().toString());
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, response.getStatus().toString());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("update cache exception, sqlKey {}", sqlKey, e);
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
|
||||
}
|
||||
}
|
||||
@ -80,17 +80,17 @@ public class CacheBeProxy extends CacheProxy {
|
||||
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (RpcException e) {
|
||||
LOG.warn("fetch catch rpc exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
|
||||
status.setRpcStatus(e.getMessage());
|
||||
status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
|
||||
SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("future get interrupted exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
|
||||
status.setStatus("interrupted exception");
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, "interrupted exception");
|
||||
} catch (ExecutionException e) {
|
||||
LOG.warn("future get execution exception, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
|
||||
status.setStatus("execution exception");
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, "execution exception");
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("fetch result timeout, sqlKey {}, backend {}", sqlKey, backend.getId(), e);
|
||||
status.setStatus("query timeout");
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@ -130,10 +130,10 @@ public class CacheBeProxy extends CacheProxy {
|
||||
= BackendServiceProxy.getInstance().clearCache(address, request);
|
||||
InternalService.PCacheResponse response = future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
if (response.getStatus() == InternalService.PCacheStatus.CACHE_OK) {
|
||||
status.setStatus(new Status(TStatusCode.OK, "CACHE_OK"));
|
||||
status.updateStatus(TStatusCode.OK, "CACHE_OK");
|
||||
return true;
|
||||
} else {
|
||||
status.setStatus(response.getStatus().toString());
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, response.getStatus().toString());
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -31,6 +31,7 @@ import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.proto.InternalService;
|
||||
import org.apache.doris.qe.RowBatch;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
@ -86,7 +87,7 @@ public class PartitionCache extends Cache {
|
||||
range = new PartitionRange(this.partitionPredicate, this.olapTable,
|
||||
this.partitionInfo);
|
||||
if (!range.analytics()) {
|
||||
status.setStatus("analytics range error");
|
||||
status.updateStatus(TStatusCode.INTERNAL_ERROR, "analytics range error");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@ -118,12 +118,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
|
||||
throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s",
|
||||
DebugUtil.printId(tid)));
|
||||
}
|
||||
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
|
||||
if (code != TStatusCode.OK) {
|
||||
Status status = new Status();
|
||||
status.setPstatus(pResult.getStatus());
|
||||
Status resultStatus = new Status(pResult.getStatus());
|
||||
if (resultStatus.getErrorCode() != TStatusCode.OK) {
|
||||
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
|
||||
DebugUtil.printId(tid), status.getErrorMsg()));
|
||||
DebugUtil.printId(tid), resultStatus.toString()));
|
||||
}
|
||||
if (pResult.hasBeArrowFlightIp()) {
|
||||
ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8();
|
||||
|
||||
Reference in New Issue
Block a user