[fix](Nereids) create constraint write edit log in lock scope (#30422)
write edit log in lock scope to ensure the order of log sequence. To avoid the sequence like: ``` add primary key pk1 add foreign key ref pk1 log foreign key log primary key ```
This commit is contained in:
@ -26,6 +26,7 @@ import org.apache.doris.cluster.ClusterNamespace;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.persist.AlterConstraintLog;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
import org.apache.doris.statistics.ColumnStatistic;
|
||||
@ -225,7 +226,7 @@ public interface TableIf {
|
||||
}
|
||||
|
||||
// Note this function is not thread safe
|
||||
default void checkConstraintNotExistence(String name, Constraint primaryKeyConstraint,
|
||||
default void checkConstraintNotExistenceUnsafe(String name, Constraint primaryKeyConstraint,
|
||||
Map<String, Constraint> constraintMap) {
|
||||
if (constraintMap.containsKey(name)) {
|
||||
throw new RuntimeException(String.format("Constraint name %s has existed", name));
|
||||
@ -238,63 +239,72 @@ public interface TableIf {
|
||||
}
|
||||
}
|
||||
|
||||
default Constraint addUniqueConstraint(String name, ImmutableList<String> columns) {
|
||||
default void addUniqueConstraint(String name, ImmutableList<String> columns, boolean replay) {
|
||||
writeLock();
|
||||
try {
|
||||
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
|
||||
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
|
||||
checkConstraintNotExistence(name, uniqueConstraint, constraintMap);
|
||||
checkConstraintNotExistenceUnsafe(name, uniqueConstraint, constraintMap);
|
||||
constraintMap.put(name, uniqueConstraint);
|
||||
return uniqueConstraint;
|
||||
if (!replay) {
|
||||
Env.getCurrentEnv().getEditLog().logAddConstraint(
|
||||
new AlterConstraintLog(uniqueConstraint, this));
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
default Constraint addPrimaryKeyConstraint(String name, ImmutableList<String> columns) {
|
||||
default void addPrimaryKeyConstraint(String name, ImmutableList<String> columns, boolean replay) {
|
||||
writeLock();
|
||||
try {
|
||||
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
|
||||
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
|
||||
checkConstraintNotExistence(name, primaryKeyConstraint, constraintMap);
|
||||
checkConstraintNotExistenceUnsafe(name, primaryKeyConstraint, constraintMap);
|
||||
constraintMap.put(name, primaryKeyConstraint);
|
||||
return primaryKeyConstraint;
|
||||
if (!replay) {
|
||||
Env.getCurrentEnv().getEditLog().logAddConstraint(
|
||||
new AlterConstraintLog(primaryKeyConstraint, this));
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
default void updatePrimaryKeyForForeignKey(PrimaryKeyConstraint requirePrimaryKey, TableIf referencedTable) {
|
||||
referencedTable.writeLock();
|
||||
try {
|
||||
Optional<Constraint> primaryKeyConstraint = referencedTable.getConstraintsMapUnsafe().values().stream()
|
||||
.filter(requirePrimaryKey::equals)
|
||||
.findFirst();
|
||||
if (!primaryKeyConstraint.isPresent()) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Foreign key constraint requires a primary key constraint %s in %s",
|
||||
requirePrimaryKey.getPrimaryKeyNames(), referencedTable.getName()));
|
||||
}
|
||||
((PrimaryKeyConstraint) (primaryKeyConstraint.get())).addForeignTable(this);
|
||||
} finally {
|
||||
referencedTable.writeUnlock();
|
||||
default PrimaryKeyConstraint tryGetPrimaryKeyForForeignKeyUnsafe(
|
||||
PrimaryKeyConstraint requirePrimaryKey, TableIf referencedTable) {
|
||||
Optional<Constraint> primaryKeyConstraint = referencedTable.getConstraintsMapUnsafe().values().stream()
|
||||
.filter(requirePrimaryKey::equals)
|
||||
.findFirst();
|
||||
if (!primaryKeyConstraint.isPresent()) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Foreign key constraint requires a primary key constraint %s in %s",
|
||||
requirePrimaryKey.getPrimaryKeyNames(), referencedTable.getName()));
|
||||
}
|
||||
return ((PrimaryKeyConstraint) (primaryKeyConstraint.get()));
|
||||
}
|
||||
|
||||
default Constraint addForeignConstraint(String name, ImmutableList<String> columns,
|
||||
TableIf referencedTable, ImmutableList<String> referencedColumns) {
|
||||
default void addForeignConstraint(String name, ImmutableList<String> columns,
|
||||
TableIf referencedTable, ImmutableList<String> referencedColumns, boolean replay) {
|
||||
writeLock();
|
||||
referencedTable.writeLock();
|
||||
try {
|
||||
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
|
||||
ForeignKeyConstraint foreignKeyConstraint =
|
||||
new ForeignKeyConstraint(name, columns, referencedTable, referencedColumns);
|
||||
checkConstraintNotExistence(name, foreignKeyConstraint, constraintMap);
|
||||
PrimaryKeyConstraint requirePrimaryKey = new PrimaryKeyConstraint(name,
|
||||
checkConstraintNotExistenceUnsafe(name, foreignKeyConstraint, constraintMap);
|
||||
PrimaryKeyConstraint requirePrimaryKeyName = new PrimaryKeyConstraint(name,
|
||||
foreignKeyConstraint.getReferencedColumnNames());
|
||||
updatePrimaryKeyForForeignKey(requirePrimaryKey, referencedTable);
|
||||
PrimaryKeyConstraint primaryKeyConstraint =
|
||||
tryGetPrimaryKeyForForeignKeyUnsafe(requirePrimaryKeyName, referencedTable);
|
||||
primaryKeyConstraint.addForeignTable(this);
|
||||
constraintMap.put(name, foreignKeyConstraint);
|
||||
return foreignKeyConstraint;
|
||||
if (!replay) {
|
||||
Env.getCurrentEnv().getEditLog().logAddConstraint(
|
||||
new AlterConstraintLog(foreignKeyConstraint, this));
|
||||
}
|
||||
} finally {
|
||||
referencedTable.writeUnlock();
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
@ -303,22 +313,21 @@ public interface TableIf {
|
||||
if (constraint instanceof UniqueConstraint) {
|
||||
UniqueConstraint uniqueConstraint = (UniqueConstraint) constraint;
|
||||
this.addUniqueConstraint(constraint.getName(),
|
||||
ImmutableList.copyOf(uniqueConstraint.getUniqueColumnNames()));
|
||||
ImmutableList.copyOf(uniqueConstraint.getUniqueColumnNames()), true);
|
||||
} else if (constraint instanceof PrimaryKeyConstraint) {
|
||||
PrimaryKeyConstraint primaryKeyConstraint = (PrimaryKeyConstraint) constraint;
|
||||
this.addPrimaryKeyConstraint(primaryKeyConstraint.getName(),
|
||||
ImmutableList.copyOf(primaryKeyConstraint.getPrimaryKeyNames()));
|
||||
ImmutableList.copyOf(primaryKeyConstraint.getPrimaryKeyNames()), true);
|
||||
} else if (constraint instanceof ForeignKeyConstraint) {
|
||||
ForeignKeyConstraint foreignKey = (ForeignKeyConstraint) constraint;
|
||||
this.addForeignConstraint(foreignKey.getName(),
|
||||
ImmutableList.copyOf(foreignKey.getForeignKeyNames()),
|
||||
foreignKey.getReferencedTable(),
|
||||
ImmutableList.copyOf(foreignKey.getReferencedColumnNames()));
|
||||
ImmutableList.copyOf(foreignKey.getReferencedColumnNames()), true);
|
||||
}
|
||||
}
|
||||
|
||||
default Constraint dropConstraint(String name) {
|
||||
Constraint dropConstraint;
|
||||
default void dropConstraint(String name) {
|
||||
writeLock();
|
||||
try {
|
||||
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
|
||||
@ -332,11 +341,10 @@ public interface TableIf {
|
||||
((PrimaryKeyConstraint) constraint).getForeignTables()
|
||||
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
|
||||
}
|
||||
dropConstraint = constraint;
|
||||
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(constraint, this));
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
return dropConstraint;
|
||||
}
|
||||
|
||||
default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) {
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
@ -30,7 +29,6 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.persist.AlterConstraintLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
|
||||
@ -63,19 +61,16 @@ public class AddConstraintCommand extends Command implements ForwardWithSync {
|
||||
@Override
|
||||
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
Pair<ImmutableList<String>, TableIf> columnsAndTable = extractColumnsAndTable(ctx, constraint.toProject());
|
||||
org.apache.doris.catalog.constraint.Constraint catalogConstraint = null;
|
||||
if (constraint.isForeignKey()) {
|
||||
Pair<ImmutableList<String>, TableIf> referencedColumnsAndTable
|
||||
= extractColumnsAndTable(ctx, constraint.toReferenceProject());
|
||||
catalogConstraint = columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first,
|
||||
referencedColumnsAndTable.second, referencedColumnsAndTable.first);
|
||||
columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first,
|
||||
referencedColumnsAndTable.second, referencedColumnsAndTable.first, false);
|
||||
} else if (constraint.isPrimaryKey()) {
|
||||
catalogConstraint = columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first);
|
||||
columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first, false);
|
||||
} else if (constraint.isUnique()) {
|
||||
catalogConstraint = columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first);
|
||||
columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first, false);
|
||||
}
|
||||
Env.getCurrentEnv().getEditLog().logAddConstraint(
|
||||
new AlterConstraintLog(catalogConstraint, columnsAndTable.second));
|
||||
}
|
||||
|
||||
private Pair<ImmutableList<String>, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) {
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.nereids.trees.plans.commands;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
@ -28,7 +27,6 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
|
||||
import org.apache.doris.persist.AlterConstraintLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
|
||||
@ -58,8 +56,7 @@ public class DropConstraintCommand extends Command implements ForwardWithSync {
|
||||
@Override
|
||||
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
TableIf table = extractTable(ctx, plan);
|
||||
org.apache.doris.catalog.constraint.Constraint catalogConstraint = table.dropConstraint(name);
|
||||
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(catalogConstraint, table));
|
||||
table.dropConstraint(name);
|
||||
}
|
||||
|
||||
private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) {
|
||||
|
||||
@ -169,8 +169,11 @@ class ConstraintPersistTest extends TestWithFeService implements PlanPatternMatc
|
||||
@Test
|
||||
void externalTableTest() throws Exception {
|
||||
ExternalTable externalTable = new ExternalTable();
|
||||
externalTable.addPrimaryKeyConstraint("pk", ImmutableList.of("col"));
|
||||
|
||||
try {
|
||||
externalTable.addPrimaryKeyConstraint("pk", ImmutableList.of("col"), false);
|
||||
} catch (Exception ignore) {
|
||||
// ignore
|
||||
}
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
DataOutput output = new DataOutputStream(outputStream);
|
||||
externalTable.write(output);
|
||||
|
||||
Reference in New Issue
Block a user