branch-2.1: [fix](Nereids) Use the schema saved during planning as the schema of the original target table #47337 (#47773)
pick from master #47337
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.nereids;
|
||||
|
||||
import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.View;
|
||||
import org.apache.doris.catalog.constraint.TableIdentifier;
|
||||
@ -179,6 +180,8 @@ public class StatementContext implements Closeable {
|
||||
private final Map<List<String>, TableIf> insertTargetTables = Maps.newHashMap();
|
||||
// save view's def to avoid them change before lock
|
||||
private final Map<List<String>, String> viewInfos = Maps.newHashMap();
|
||||
// save insert into schema to avoid schema changed between two read locks
|
||||
private final List<Column> insertTargetSchema = new ArrayList<>();
|
||||
|
||||
// for create view support in nereids
|
||||
// key is the start and end position of the sql substring that needs to be replaced,
|
||||
@ -276,6 +279,10 @@ public class StatementContext implements Closeable {
|
||||
return tables;
|
||||
}
|
||||
|
||||
public List<Column> getInsertTargetSchema() {
|
||||
return insertTargetSchema;
|
||||
}
|
||||
|
||||
public void setTables(Map<List<String>, TableIf> tables) {
|
||||
this.tables.clear();
|
||||
this.tables.putAll(tables);
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.nereids.pattern.generator;
|
||||
|
||||
import org.apache.doris.nereids.pattern.generator.javaast.ClassDeclaration;
|
||||
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -45,8 +44,7 @@ public class PlanPatternGeneratorAnalyzer {
|
||||
Map<ClassDeclaration, Set<String>> planClassMap = analyzer.getParentClassMap().entrySet().stream()
|
||||
.filter(kv -> kv.getValue().contains("org.apache.doris.nereids.trees.plans.Plan"))
|
||||
.filter(kv -> !kv.getKey().name.equals("GroupPlan"))
|
||||
.filter(kv -> !Modifier.isAbstract(kv.getKey().modifiers.mod)
|
||||
&& kv.getKey() instanceof ClassDeclaration)
|
||||
.filter(kv -> kv.getKey() instanceof ClassDeclaration)
|
||||
.collect(Collectors.toMap(kv -> (ClassDeclaration) kv.getKey(), kv -> kv.getValue()));
|
||||
|
||||
List<PlanPatternGenerator> generators = planClassMap.entrySet()
|
||||
|
||||
@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
|
||||
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
|
||||
import org.apache.doris.nereids.util.RelationUtil;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -74,8 +75,8 @@ public class CollectRelation implements AnalysisRuleFactory {
|
||||
unboundRelation()
|
||||
.thenApply(this::collectFromUnboundRelation)
|
||||
.toRule(RuleType.COLLECT_TABLE_FROM_RELATION),
|
||||
unboundTableSink()
|
||||
.thenApply(this::collectFromUnboundTableSink)
|
||||
unboundLogicalSink()
|
||||
.thenApply(this::collectFromUnboundSink)
|
||||
.toRule(RuleType.COLLECT_TABLE_FROM_SINK),
|
||||
any().whenNot(UnboundRelation.class::isInstance)
|
||||
.whenNot(UnboundTableSink.class::isInstance)
|
||||
@ -123,7 +124,7 @@ public class CollectRelation implements AnalysisRuleFactory {
|
||||
return null;
|
||||
}
|
||||
|
||||
private Plan collectFromUnboundTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
|
||||
private Plan collectFromUnboundSink(MatchingContext<UnboundLogicalSink<Plan>> ctx) {
|
||||
List<String> nameParts = ctx.root.getNameParts();
|
||||
switch (nameParts.size()) {
|
||||
case 1:
|
||||
@ -181,6 +182,13 @@ public class CollectRelation implements AnalysisRuleFactory {
|
||||
if (tableFrom == TableFrom.QUERY) {
|
||||
collectMTMVCandidates(table, cascadesContext);
|
||||
}
|
||||
if (tableFrom == TableFrom.INSERT_TARGET) {
|
||||
if (!cascadesContext.getStatementContext().getInsertTargetSchema().isEmpty()) {
|
||||
LOG.warn("collect insert target table '{}' more than once.", tableQualifier);
|
||||
}
|
||||
cascadesContext.getStatementContext().getInsertTargetSchema().clear();
|
||||
cascadesContext.getStatementContext().getInsertTargetSchema().addAll(table.getFullSchema());
|
||||
}
|
||||
if (table instanceof View) {
|
||||
parseAndCollectFromView(tableQualifier, (View) table, cascadesContext);
|
||||
}
|
||||
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.ProfileManager.ProfileType;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
|
||||
@ -59,11 +58,9 @@ import org.apache.doris.qe.StmtExecutor;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
@ -181,9 +178,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
|
||||
// lock after plan and check does table's schema changed to ensure we lock table order by id.
|
||||
TableIf newestTargetTableIf = RelationUtil.getTable(qualifiedTargetTableName, ctx.getEnv());
|
||||
List<TableIf> targetTables = Lists.newArrayList(targetTableIf, newestTargetTableIf);
|
||||
targetTables.sort(Comparator.comparing(TableIf::getId));
|
||||
MetaLockUtils.readLockTables(targetTables);
|
||||
newestTargetTableIf.readLock();
|
||||
try {
|
||||
if (targetTableIf.getId() != newestTargetTableIf.getId()) {
|
||||
LOG.warn("insert plan failed {} times. query id is {}. table id changed from {} to {}",
|
||||
@ -191,10 +186,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
targetTableIf.getId(), newestTargetTableIf.getId());
|
||||
continue;
|
||||
}
|
||||
if (!targetTableIf.getFullSchema().equals(newestTargetTableIf.getFullSchema())) {
|
||||
// Use the schema saved during planning as the schema of the original target table.
|
||||
if (!ctx.getStatementContext().getInsertTargetSchema().equals(newestTargetTableIf.getFullSchema())) {
|
||||
LOG.warn("insert plan failed {} times. query id is {}. table schema changed from {} to {}",
|
||||
retryTimes, DebugUtil.printId(ctx.queryId()),
|
||||
targetTableIf.getFullSchema(), newestTargetTableIf.getFullSchema());
|
||||
ctx.getStatementContext().getInsertTargetSchema(), newestTargetTableIf.getFullSchema());
|
||||
continue;
|
||||
}
|
||||
if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
|
||||
@ -207,9 +203,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
buildResult.physicalSink
|
||||
);
|
||||
}
|
||||
MetaLockUtils.readUnlockTables(targetTables);
|
||||
newestTargetTableIf.readUnlock();
|
||||
} catch (Throwable e) {
|
||||
MetaLockUtils.readUnlockTables(targetTables);
|
||||
newestTargetTableIf.readUnlock();
|
||||
// the abortTxn in onFail need to acquire table write lock
|
||||
if (insertExecutor != null) {
|
||||
insertExecutor.onFail(e);
|
||||
|
||||
Reference in New Issue
Block a user