[improve](multi-table-load) pause job when can not find table #29870

If there is no table that can be found, the task will cycle forever and no data will be loaded. To avoid invalid scheduled tasks, It is better to pause the job rather than run it.
This commit is contained in:
HHoflittlefish777
2024-01-12 16:47:34 +08:00
committed by yiguolei
parent 6598b4f7c8
commit 7b30119537
3 changed files with 106 additions and 0 deletions

View File

@ -55,6 +55,7 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@ -71,7 +72,9 @@ import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.routineload.ErrorReason;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
@ -1873,6 +1876,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(exception.getMessage());
result.setStatus(status);
try {
RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager()
.getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"failed to get stream load plan, " + exception.getMessage()), false);
} catch (UserException e) {
LOG.warn("catch update routine load job error.", e);
}
return result;
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000;