[fix](Nereids) lock table when generate distribute plan (#38950) (#39029)

We should hold the table lock while generating the distributed plan, because an insert overwrite issued by an async materialized view can drop partitions concurrently, and the query thread will then throw an exception:
```
java.lang.RuntimeException: Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.nereids.util.Utils.execWithUncheckedException(Utils.java:76) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.glue.translator.PhysicalPlanTranslator.translatePlan(PhysicalPlanTranslator.java:278) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.splitFragments(NereidsPlanner.java:341) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.distribute(NereidsPlanner.java:400) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.plan(NereidsPlanner.java:147) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:796) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:605) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:558) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:548) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:385) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:237) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:260) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:288) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:342) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.planner.OlapScanNode.mockRowCountInStatistic(OlapScanNode.java:589) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.planner.OlapScanNode.finalizeForNereids(OlapScanNode.java:1733) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.util.Utils.execWithUncheckedException(Utils.java:74) ~[doris-fe.jar:1.2-SNAPSHOT]
    ... 17 more
2024-07-29 00:46:17,608 WARN (mysql-nio-pool-114|201) Analyze failed. stmt[210035, 49d3041004ba4b6a-b07fe4491d03c5de]
org.apache.doris.common.NereidsException: errCode = 2, detailMessage = Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:803) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:605) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:558) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:548) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:385) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:237) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:260) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:288) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:342) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```

This exception is too hard to reproduce, so I could not write a test case.
This commit is contained in:
924060929
2024-08-07 19:00:44 +08:00
committed by GitHub
parent 931fa5bd1b
commit fd3f95066e
24 changed files with 128 additions and 110 deletions

View File

@ -89,7 +89,7 @@ public class MTMVCache {
}
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Plan originPlan = planner.getCascadesContext().getRewritePlan();
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed

View File

@ -153,6 +153,6 @@ public class MTMVPlanUtil {
StatementBase parsedStmt = statements.get(0);
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.plan(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
}
}

View File

@ -72,6 +72,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
/**
@ -118,23 +119,23 @@ public class NereidsPlanner extends Planner {
setParsedPlan(parsedPlan);
PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().reset().start();
Plan resultPlan = null;
try {
boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
planWithLock(parsedPlan, requireProperties, explainLevel, showPlanProcess, plan -> {
setOptimizedPlan(plan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) plan;
translate(physicalPlan);
});
} finally {
statementContext.getStopwatch().stop();
}
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) resultPlan;
translate(physicalPlan);
}
@VisibleForTesting
public void plan(StatementBase queryStmt) {
public void planWithLock(StatementBase queryStmt) {
try {
plan(queryStmt, statementContext.getConnectContext().getSessionVariable().toThrift());
} catch (Exception e) {
@ -142,12 +143,18 @@ public class NereidsPlanner extends Planner {
}
}
public PhysicalPlan plan(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) plan(plan, outputProperties, ExplainLevel.NONE, false);
public PhysicalPlan planWithLock(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) planWithLock(plan, outputProperties, ExplainLevel.NONE, false);
}
public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
return plan(plan, requireProperties, explainLevel, false);
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
return planWithLock(plan, requireProperties, explainLevel, false);
}
/**
 * Plan the given logical plan under the table lock without any extra work
 * to run while the lock is held.
 *
 * <p>Delegates to the five-argument overload with a callback that does nothing.
 *
 * @param plan wait for plan
 * @param requireProperties request physical properties constraints
 * @param explainLevel explain level forwarded to the delegate
 * @param showPlanProcess forwarded to the delegate unchanged
 * @return the plan returned by the delegate
 */
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
        ExplainLevel explainLevel, boolean showPlanProcess) {
    return planWithLock(plan, requireProperties, explainLevel, showPlanProcess, ignored -> { });
}
/**
@ -155,11 +162,14 @@ public class NereidsPlanner extends Planner {
*
* @param plan wait for plan
* @param requireProperties request physical properties constraints
* @param showPlanProcess is record plan process to CascadesContext
* @param lockCallback callback invoked with the result plan while the table lock is still held
* @return plan generated by this planner
* @throws AnalysisException throw exception if failed in any stage
*/
public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties,
ExplainLevel explainLevel, boolean showPlanProcess) {
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties,
ExplainLevel explainLevel, boolean showPlanProcess,
Consumer<Plan> lockCallback) {
try {
if (plan instanceof LogicalSqlCache) {
rewrittenPlan = analyzedPlan = plan;
@ -186,77 +196,85 @@ public class NereidsPlanner extends Planner {
initCascadesContext(plan, requireProperties);
try (Lock lock = new Lock(plan, cascadesContext)) {
// resolve column, table and function
// analyze this query
analyze(showAnalyzeProcess(explainLevel, showPlanProcess));
// minidump of input must be serialized first, this process ensure minidump string not null
try {
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
} catch (IOException e) {
throw new RuntimeException(e);
}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime();
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}
if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}
// rule-based optimize
rewrite(showRewriteProcess(explainLevel, showPlanProcess));
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}
int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);
physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
NereidsTracer.output(statementContext.getConnectContext());
return physicalPlan;
Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties);
lockCallback.accept(resultPlan);
return resultPlan;
}
} finally {
statementContext.releasePlannerResources();
}
}
/**
 * Run the planning pipeline without acquiring any lock in this method:
 * analyze, rewrite, optimize, choose the nth plan and post-process it.
 * In the visible code it is called from planWithLock inside its try-with-resources Lock block.
 *
 * @param plan the logical plan; in this method it is only passed to the minidump input serialization
 * @param explainLevel decides whether planning stops early and which intermediate plans are recorded
 * @param showPlanProcess forwarded to showAnalyzeProcess and showRewriteProcess
 * @param requireProperties physical properties passed to chooseNthPlan
 * @return the analyzed plan for ANALYZED_PLAN, the rewritten plan for REWRITTEN_PLAN,
 *         otherwise the post-processed physical plan
 * @throws RuntimeException if serializing the minidump inputs fails with an IOException
 */
private Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel,
boolean showPlanProcess, PhysicalProperties requireProperties) {
// resolve column, table and function
// analyze this query
analyze(showAnalyzeProcess(explainLevel, showPlanProcess));
// minidump of input must be serialized first, this process ensure minidump string not null
try {
MinidumpUtils.serializeInputsToDumpFile(plan, cascadesContext.getTables());
} catch (IOException e) {
throw new RuntimeException(e);
}
// profile timings are recorded only when the connect context has an executor
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime();
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}
// ALL_PLAN records the analyzed plan and keeps planning; ANALYZED_PLAN stops here
if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}
// rule-based optimize
rewrite(showRewriteProcess(explainLevel, showPlanProcess));
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
// ALL_PLAN records the rewritten plan and keeps planning; REWRITTEN_PLAN stops here
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}
// the index of the plan to pick is read from the session variable
int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);
physicalPlan = postProcess(physicalPlan);
// with dumpNereidsMemo enabled, also log the tree of the chosen physical plan
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
// optimizedPlan is only recorded here for these three explain levels
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
MinidumpUtils.serializeOutputToDumpFile(physicalPlan);
NereidsTracer.output(statementContext.getConnectContext());
return physicalPlan;
}
/**
 * Run the logical plan through a PlanPreprocessors built from this planner's statementContext.
 *
 * @param logicalPlan the plan to preprocess
 * @return the plan produced by PlanPreprocessors.process
 */
private LogicalPlan preprocess(LogicalPlan logicalPlan) {
    PlanPreprocessors preprocessors = new PlanPreprocessors(statementContext);
    return preprocessors.process(logicalPlan);
}

View File

@ -214,7 +214,7 @@ public class MinidumpUtils {
}
NereidsPlanner nereidsPlanner = new NereidsPlanner(
new StatementContext(ConnectContext.get(), new OriginStatement(sql, 0)));
nereidsPlanner.plan(LogicalPlanAdapter.of(parsed));
nereidsPlanner.planWithLock(LogicalPlanAdapter.of(parsed));
return ((AbstractPlan) nereidsPlanner.getOptimizedPlan()).toJson();
}

View File

@ -75,7 +75,7 @@ public class AddConstraintCommand extends Command implements ForwardWithSync {
private Pair<ImmutableList<String>, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan analyzedPlan = planner.plan(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Plan analyzedPlan = planner.planWithLock(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Set<LogicalCatalogRelation> logicalCatalogRelationSet = analyzedPlan
.collect(LogicalCatalogRelation.class::isInstance);
if (logicalCatalogRelationSet.size() != 1) {

View File

@ -109,7 +109,7 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
Plan plan = planner.plan(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
Plan plan = planner.planWithLock(new UnboundResultSink<>(query), PhysicalProperties.ANY, ExplainLevel.NONE);
if (ctasCols == null) {
// we should analyze the plan firstly to get the columns' name.
ctasCols = plan.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList());

View File

@ -61,7 +61,7 @@ public class DropConstraintCommand extends Command implements ForwardWithSync {
private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
Plan analyzedPlan = planner.plan(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Plan analyzedPlan = planner.planWithLock(plan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
Set<LogicalCatalogRelation> logicalCatalogRelationSet = analyzedPlan
.collect(LogicalCatalogRelation.class::isInstance);
if (logicalCatalogRelationSet.size() != 1) {

View File

@ -85,7 +85,7 @@ public class AlterViewInfo extends BaseViewInfo {
/**validate*/
public void validate(ConnectContext ctx) throws UserException {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
planner.planWithLock(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
Set<String> colSets = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (Column col : finalCols) {
if (!colSets.add(col.getName())) {

View File

@ -209,7 +209,7 @@ public class CreateMTMVInfo {
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
Plan plan = planner.plan(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
if (plan.anyMatch(node -> node instanceof OneRowRelation)) {
throw new AnalysisException("at least contain one table");
}
@ -280,7 +280,7 @@ public class CreateMTMVInfo {
}
Plan plan;
try {
plan = planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);

View File

@ -78,7 +78,7 @@ public class CreateViewInfo extends BaseViewInfo {
PrivPredicate.CREATE.getPrivs().toString(), viewName.getTbl());
}
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
planner.planWithLock(new UnboundResultSink<>(logicalQuery), PhysicalProperties.ANY, ExplainLevel.NONE);
Set<String> colSets = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (Column col : finalCols) {
if (!colSets.add(col.getName())) {

View File

@ -137,7 +137,7 @@ public class MTMVPartitionDefinition {
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
Plan mvRewrittenPlan =
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {