[Improvement](materialized-view) set job failed when toAgentTaskRequest meet error (#25358)

set job failed when toAgentTaskRequest meet error
This commit is contained in:
Pxl
2023-10-16 20:10:52 +08:00
committed by GitHub
parent f9df3bae61
commit 72920fbd1d
9 changed files with 337 additions and 35 deletions

View File

@ -525,7 +525,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
if (task.getFailedTimes() > 0) {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
LOG.warn("rollup task failed after try three times: " + task.getErrorMsg());
LOG.warn("rollup task failed: " + task.getErrorMsg());
if (!failedAgentTasks.containsKey(task.getTabletId())) {
failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
} else {

View File

@ -518,10 +518,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
LOG.info("schema change tasks not finished. job: {}", jobId);
List<AgentTask> tasks = schemaChangeBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() >= 3) {
if (task.getFailedTimes() > 0) {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
LOG.warn("schema change task failed after try three times: " + task.getErrorMsg());
LOG.warn("schema change task failed: " + task.getErrorMsg());
if (!failedAgentTasks.containsKey(task.getTabletId())) {
failedAgentTasks.put(task.getTabletId(), Lists.newArrayList(task));
} else {

View File

@ -32,9 +32,11 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.mvrewrite.CountFieldToSum;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@ -79,6 +81,9 @@ public class CreateMaterializedViewStmt extends DdlStmt {
FN_NAME_TO_PATTERN.put(FunctionSet.HLL_UNION, new MVColumnHLLUnionPattern());
}
public static final ImmutableSet<String> invalidFn = ImmutableSet.of("now", "current_time", "current_date",
"utc_timestamp", "uuid", "random", "unix_timestamp", "curdate");
private String mvName;
private SelectStmt selectStmt;
private Map<String, String> properties;
@ -158,14 +163,50 @@ public class CreateMaterializedViewStmt extends DdlStmt {
return selectStmt.getWhereClause();
}
private void checkExprValidInMv(Expr expr, String functionName) throws AnalysisException {
if (!isReplay && expr.haveFunction(functionName)) {
throw new AnalysisException("The materialized view contain " + functionName + " is disallowed");
}
}
private void checkExprValidInMv(Expr expr) throws AnalysisException {
if (isReplay) {
return;
}
for (String function : invalidFn) {
checkExprValidInMv(expr, function);
}
}
private void checkExprValidInMv() throws AnalysisException {
if (selectStmt.getWhereClause() != null) {
checkExprValidInMv(selectStmt.getWhereClause());
}
SelectList selectList = selectStmt.getSelectList();
for (SelectListItem selectListItem : selectList.getItems()) {
checkExprValidInMv(selectListItem.getExpr());
}
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
checkExprValidInMv();
FeNameFormat.checkTableName(mvName);
rewriteToBitmapWithCheck();
// TODO(ml): The mv name in from clause should pass the analyze without error.
selectStmt.forbiddenMVRewrite();
selectStmt.analyze(analyzer);
ExprRewriter rewriter = analyzer.getExprRewriter();
rewriter.reset();
selectStmt.rewriteExprs(rewriter);
selectStmt.reset();
analyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
selectStmt.analyze(analyzer);
if (selectStmt.getAggInfo() != null) {
mvKeysType = KeysType.AGG_KEYS;
}
@ -227,10 +268,6 @@ public class CreateMaterializedViewStmt extends DdlStmt {
+ selectListItemExpr.toSql());
}
if (!isReplay && selectListItemExpr.haveFunction("curdate")) {
throw new AnalysisException(
"The materialized view contain curdate is disallowed");
}
if (selectListItemExpr instanceof FunctionCallExpr
&& ((FunctionCallExpr) selectListItemExpr).isAggregateFunction()) {

View File

@ -51,7 +51,7 @@ public enum ExpressionFunctions {
private static final Logger LOG = LogManager.getLogger(ExpressionFunctions.class);
private ImmutableMultimap<String, FEFunctionInvoker> functions;
private static final Set<String> unfixedFn = ImmutableSet.of(
public static final Set<String> unfixedFn = ImmutableSet.of(
"now",
"current_time",
"current_date",

View File

@ -570,14 +570,6 @@ public abstract class AbstractSelectMaterializedIndexRule {
return visit(aggregateFunction, context);
}
@Override
public Expression visitAlias(Alias alias, Void context) {
if (mvNameToMvSlot.containsKey(alias.toSlot().toSql())) {
return mvNameToMvSlot.get(alias.toSlot().toSql());
}
return visit(alias, context);
}
@Override
public Expression visitScalarFunction(ScalarFunction scalarFunction, Void context) {
List<Expression> newChildrenWithoutCast = scalarFunction.children().stream()

View File

@ -124,7 +124,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
.replace(agg.withChildren(mvPlan), mvPlan);
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -168,12 +168,12 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(filter.withChildren(mvPlan)), mvPlan));
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -226,7 +226,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
generateNewOutputsWithMvOutputs(mvPlan, newProjectList),
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId));
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -271,7 +271,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
project.withProjectsAndChild(
@ -285,7 +285,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
filter.withChildren(mvPlan));
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -328,7 +328,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
filter.withChildren(
@ -342,7 +342,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
generateNewOutputsWithMvOutputs(mvPlan, newProjectList), mvPlan);
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -374,7 +374,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
@ -382,7 +382,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
), mvPlan));
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -428,7 +428,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
@ -437,7 +437,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
)), mvPlan));
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -480,7 +480,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
@ -496,7 +496,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
generateNewOutputsWithMvOutputs(mvPlan, newProjectList),
mvPlan);
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -545,7 +545,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
@ -564,7 +564,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
filter.withChildren(mvPlan));
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
@ -611,7 +611,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
@ -630,7 +630,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId));
return new LogicalProject<>(
generateProjectsAlias(agg.getOutput(), slotContext),
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),

View File

@ -165,7 +165,12 @@ public class AgentBatchTask implements Runnable {
client = ClientPool.backendPool.borrowObject(address);
List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
try {
agentTaskRequests.add(toAgentTaskRequest(task));
} catch (Exception e) {
task.failed();
throw e;
}
}
client.submitTasks(agentTaskRequests);
if (LOG.isDebugEnabled()) {