[fix](Nereids) should take read lock on table being inserted into when analyzing (#25619)

morrySnow
2023-10-20 21:09:19 +08:00
committed by GitHub
parent 9fcee92a26
commit 3ffc6f0835
3 changed files with 124 additions and 42 deletions
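This change makes Nereids collect the target table of an INSERT (the UnboundOlapTableSink) together with the tables a query reads, so the analyze phase read-locks it like any other referenced table. Below is a minimal, self-contained sketch of the locking pattern involved, assuming only what the diff shows (tryReadLock(1, TimeUnit.MINUTES), unlock on close); TableLike and PlanReadLocks are hypothetical stand-ins for TableIf and the Lock helper:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for org.apache.doris.catalog.TableIf.
interface TableLike {
    String getName();
    boolean tryReadLock(long timeout, TimeUnit unit);
    void readUnlock();
}

// Acquire a read lock on every table the plan references (now including the
// INSERT target); release already-acquired locks if any acquisition fails.
final class PlanReadLocks implements AutoCloseable {
    private final List<TableLike> locked = new ArrayList<>();

    PlanReadLocks(Iterable<TableLike> tables) {
        for (TableLike table : tables) {
            if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
                close(); // give back whatever we already hold
                throw new RuntimeException(
                        String.format("Failed to get read lock on table: %s", table.getName()));
            }
            locked.add(table);
        }
    }

    @Override
    public void close() {
        for (TableLike table : locked) {
            table.readUnlock();
        }
    }
}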

fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java

@@ -23,6 +23,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.jobs.Job;
@@ -52,7 +54,9 @@ import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@@ -61,6 +65,8 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.apache.hadoop.util.Lists;
import java.util.ArrayList;
import java.util.HashMap;
@@ -74,6 +80,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
/**
@@ -96,7 +103,7 @@ public class CascadesContext implements ScheduleContext {
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private Optional<Scope> outerScope = Optional.empty();
private List<TableIf> tables = null;
private Map<Long, TableIf> tables = null;
private boolean isRewriteRoot;
private volatile boolean isTimeout = false;
@@ -213,7 +220,7 @@ public class CascadesContext implements ScheduleContext {
}
public void setTables(List<TableIf> tables) {
this.tables = tables;
this.tables = tables.stream().collect(Collectors.toMap(TableIf::getId, t -> t, (t1, t2) -> t1));
}
public ConnectContext getConnectContext() {
@@ -330,21 +337,23 @@ public class CascadesContext implements ScheduleContext {
* Extract tables.
*/
public void extractTables(LogicalPlan logicalPlan) {
Set<UnboundRelation> relations = getTables(logicalPlan);
tables = new ArrayList<>();
for (UnboundRelation r : relations) {
Set<List<String>> tableNames = getTables(logicalPlan);
tables = Maps.newHashMap();
for (List<String> tableName : tableNames) {
try {
tables.add(getTable(r));
TableIf table = getTable(tableName);
tables.put(table.getId(), table);
} catch (Throwable e) {
// IGNORE
}
}
}
/** Get table by table name; try to get it from the dump file information first. */
public TableIf getTableInMinidumpCache(String tableName) {
Preconditions.checkState(tables != null);
for (TableIf table : tables) {
Preconditions.checkState(tables != null, "tables should not be null");
for (TableIf table : tables.values()) {
if (table.getName().equals(tableName)) {
return table;
}
@@ -356,45 +365,101 @@ public class CascadesContext implements ScheduleContext {
}
public List<TableIf> getTables() {
return tables;
if (tables == null) {
return null;
} else {
return Lists.newArrayList(tables.values());
}
}
private Set<UnboundRelation> getTables(LogicalPlan logicalPlan) {
Set<UnboundRelation> unboundRelations = new HashSet<>();
private Set<List<String>> getTables(LogicalPlan logicalPlan) {
final Set<List<String>> tableNames = new HashSet<>();
logicalPlan.foreach(p -> {
if (p instanceof LogicalFilter) {
unboundRelations.addAll(extractUnboundRelationFromFilter((LogicalFilter<?>) p));
tableNames.addAll(extractTableNamesFromFilter((LogicalFilter<?>) p));
} else if (p instanceof LogicalCTE) {
unboundRelations.addAll(extractUnboundRelationFromCTE((LogicalCTE<?>) p));
tableNames.addAll(extractTableNamesFromCTE((LogicalCTE<?>) p));
} else if (p instanceof LogicalProject) {
tableNames.addAll(extractTableNamesFromProject((LogicalProject<?>) p));
} else if (p instanceof LogicalHaving) {
tableNames.addAll(extractTableNamesFromHaving((LogicalHaving<?>) p));
} else if (p instanceof UnboundOneRowRelation) {
tableNames.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation) p));
} else {
unboundRelations.addAll(p.collect(UnboundRelation.class::isInstance));
Set<LogicalPlan> logicalPlans = p.collect(
n -> (n instanceof UnboundRelation || n instanceof UnboundOlapTableSink));
for (LogicalPlan plan : logicalPlans) {
if (plan instanceof UnboundRelation) {
tableNames.add(((UnboundRelation) plan).getNameParts());
} else if (plan instanceof UnboundOlapTableSink) {
tableNames.add(((UnboundOlapTableSink<?>) plan).getNameParts());
} else {
throw new AnalysisException("get tables from plan failed. meet unknown type node " + plan);
}
}
}
});
return unboundRelations;
return tableNames;
}
private Set<UnboundRelation> extractUnboundRelationFromFilter(LogicalFilter<?> filter) {
Set<SubqueryExpr> subqueryExprs = filter.getPredicate()
private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) {
Set<SubqueryExpr> subqueryExprs = having.getPredicate()
.collect(SubqueryExpr.class::isInstance);
Set<UnboundRelation> relations = new HashSet<>();
Set<List<String>> tableNames = new HashSet<>();
for (SubqueryExpr expr : subqueryExprs) {
LogicalPlan plan = expr.getQueryPlan();
relations.addAll(getTables(plan));
tableNames.addAll(getTables(plan));
}
return relations;
return tableNames;
}
private Set<UnboundRelation> extractUnboundRelationFromCTE(LogicalCTE<?> cte) {
private Set<List<String>> extractTableNamesFromOneRowRelation(UnboundOneRowRelation oneRowRelation) {
Set<SubqueryExpr> subqueryExprs = oneRowRelation.getProjects().stream()
.<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance))
.flatMap(Set::stream)
.collect(Collectors.toSet());
Set<List<String>> tableNames = new HashSet<>();
for (SubqueryExpr expr : subqueryExprs) {
LogicalPlan plan = expr.getQueryPlan();
tableNames.addAll(getTables(plan));
}
return tableNames;
}
private Set<List<String>> extractTableNamesFromProject(LogicalProject<?> project) {
Set<SubqueryExpr> subqueryExprs = project.getProjects().stream()
.<Set<SubqueryExpr>>map(p -> p.collect(SubqueryExpr.class::isInstance))
.flatMap(Set::stream)
.collect(Collectors.toSet());
Set<List<String>> tableNames = new HashSet<>();
for (SubqueryExpr expr : subqueryExprs) {
LogicalPlan plan = expr.getQueryPlan();
tableNames.addAll(getTables(plan));
}
return tableNames;
}
private Set<List<String>> extractTableNamesFromFilter(LogicalFilter<?> filter) {
Set<SubqueryExpr> subqueryExprs = filter.getPredicate()
.collect(SubqueryExpr.class::isInstance);
Set<List<String>> tableNames = new HashSet<>();
for (SubqueryExpr expr : subqueryExprs) {
LogicalPlan plan = expr.getQueryPlan();
tableNames.addAll(getTables(plan));
}
return tableNames;
}
private Set<List<String>> extractTableNamesFromCTE(LogicalCTE<?> cte) {
List<LogicalSubQueryAlias<Plan>> subQueryAliases = cte.getAliasQueries();
Set<UnboundRelation> relations = new HashSet<>();
Set<List<String>> tableNames = new HashSet<>();
for (LogicalSubQueryAlias<Plan> subQueryAlias : subQueryAliases) {
relations.addAll(getTables(subQueryAlias));
tableNames.addAll(getTables(subQueryAlias));
}
return relations;
return tableNames;
}
private TableIf getTable(UnboundRelation unboundRelation) {
List<String> nameParts = unboundRelation.getNameParts();
private TableIf getTable(List<String> nameParts) {
switch (nameParts.size()) {
case 1: { // table
String ctlName = getConnectContext().getEnv().getCurrentCatalog().getName();
@@ -413,7 +478,7 @@ public class CascadesContext implements ScheduleContext {
return getTable(nameParts.get(0), nameParts.get(1), nameParts.get(2), getConnectContext().getEnv());
}
default:
throw new IllegalStateException("Table name [" + unboundRelation.getTableName() + "] is invalid.");
throw new IllegalStateException("Table name [" + String.join(".", nameParts) + "] is invalid.");
}
}
@@ -455,10 +520,10 @@ public class CascadesContext implements ScheduleContext {
public Lock(LogicalPlan plan, CascadesContext cascadesContext) {
this.cascadesContext = cascadesContext;
// tables can also be load from dump file
if (cascadesContext.getTables() == null) {
if (cascadesContext.tables == null) {
cascadesContext.extractTables(plan);
}
for (TableIf table : cascadesContext.tables) {
for (TableIf table : cascadesContext.tables.values()) {
if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
close();
throw new RuntimeException(String.format("Failed to get read lock on table: %s", table.getName()));

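Storing the collected tables in a Map<Long, TableIf> keyed by table id (rather than a List) deduplicates them, which matters once the INSERT target is collected too: INSERT INTO t SELECT ... FROM t mentions the same table twice, but the lock loop above should try its read lock only once. A standalone sketch of the Collectors.toMap merge behavior used in setTables; the Table record is a hypothetical stand-in for TableIf:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DedupDemo {
    // Hypothetical stand-in for TableIf; only the id matters for deduplication.
    record Table(long id, String name) {}

    public static void main(String[] args) {
        List<Table> tables = List.of(new Table(1L, "supplier"), new Table(1L, "supplier"));
        // Same shape as setTables: on duplicate ids, keep the first table seen.
        Map<Long, Table> byId = tables.stream()
                .collect(Collectors.toMap(Table::id, t -> t, (t1, t2) -> t1));
        System.out.println(byId.size()); // 1, so the table will be locked once
    }
}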
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java

@@ -140,7 +140,7 @@ public class BindRelation extends OneAnalysisRuleFactory {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
unboundRelation.getNameParts());
TableIf table = null;
if (cascadesContext.getTables() != null) {
if (!CollectionUtils.isEmpty(cascadesContext.getTables())) {
table = cascadesContext.getTableInMinidumpCache(tableName);
}
if (table == null) {

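In BindRelation, the minidump-cache guard becomes a null-safe emptiness check, since an empty table list should also skip the cache lookup. A tiny sketch of the CollectionUtils.isEmpty semantics this relies on, assuming the Apache Commons Collections class already used elsewhere in Doris:

import org.apache.commons.collections.CollectionUtils;

import java.util.Collections;
import java.util.List;

public class GuardDemo {
    public static void main(String[] args) {
        List<String> none = null;
        List<String> empty = Collections.emptyList();
        // isEmpty is null-safe: both cases mean "no tables cached from the dump file".
        System.out.println(CollectionUtils.isEmpty(none));  // true
        System.out.println(CollectionUtils.isEmpty(empty)); // true
    }
}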
fe/fe-core/src/test/java/org/apache/doris/nereids/util/ReadLockTest.java

@@ -17,14 +17,15 @@
package org.apache.doris.nereids.util;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -47,10 +48,10 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
Assertions.assertEquals(1, f.size());
Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(1, f.size());
Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -69,9 +70,9 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(1, f.size());
Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -84,10 +85,9 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(1, f.size());
Assertions.assertEquals("supplier", f.stream().map(Table::getName).findFirst().get());
Assertions.assertEquals("supplier", f.stream().map(TableIf::getName).findFirst().get());
}
@Test
@@ -100,11 +100,28 @@ public class ReadLockTest extends SSBTestBase {
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
List<Table> f = (List<Table>) Deencapsulation.getField(cascadesContext, "tables");
List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(2, f.size());
Set<String> tableNames = f.stream().map(Table::getName).collect(Collectors.toSet());
Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet());
Assertions.assertTrue(tableNames.contains("supplier"));
Assertions.assertTrue(tableNames.contains("lineorder"));
}
@Test
public void testInsertInto() {
String sql = "INSERT INTO supplier(s_suppkey) SELECT lo_orderkey FROM lineorder";
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
InsertIntoTableCommand insertIntoTableCommand = (InsertIntoTableCommand) parser.parseSingle(sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
planner.plan(
(LogicalPlan) insertIntoTableCommand.getExplainPlan(connectContext),
PhysicalProperties.ANY
);
CascadesContext cascadesContext = planner.getCascadesContext();
List<TableIf> f = cascadesContext.getTables();
Assertions.assertEquals(2, f.size());
Set<String> tableNames = f.stream().map(TableIf::getName).collect(Collectors.toSet());
Assertions.assertTrue(tableNames.contains("supplier"));
Assertions.assertTrue(tableNames.contains("lineorder"));
}
}