[feat](Nereids) persist constraint in table (#29767)

This commit is contained in:
谢健
2024-01-16 18:21:40 +08:00
committed by yiguolei
parent 2916745cf2
commit 4e41e1d797
22 changed files with 683 additions and 45 deletions

View File

@ -72,9 +72,11 @@ public final class FeMetaVersion {
public static final int VERSION_125 = 125;
// For write/read function nullable mode info
public static final int VERSION_126 = 126;
// For constraints
public static final int VERSION_127 = 127;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_126;
public static final int VERSION_CURRENT = VERSION_127;
// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...

View File

@ -22,12 +22,14 @@ import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
@ -50,7 +52,6 @@ import java.io.DataOutput;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -116,8 +117,8 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
@SerializedName(value = "comment")
protected String comment = "";
@SerializedName(value = "constraints")
private HashMap<String, Constraint> constraintsMap = new HashMap<>();
@SerializedName(value = "ta")
private TableAttributes tableAttributes = new TableAttributes();
// check read lock leaky
private Map<Long, String> readLockThreads = null;
@ -334,12 +335,12 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
}
public Constraint getConstraint(String name) {
return constraintsMap.get(name);
return getConstraintsMap().get(name);
}
@Override
public Map<String, Constraint> getConstraintsMapUnsafe() {
return constraintsMap;
return tableAttributes.getConstraintsMap();
}
public TableType getType() {
@ -455,9 +456,9 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
for (Column column : fullSchema) {
column.write(out);
}
Text.writeString(out, comment);
// write table attributes
Text.writeString(out, GsonUtils.GSON.toJson(tableAttributes));
// write create time
out.writeLong(createTime);
}
@ -488,7 +489,12 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
hasCompoundKey = true;
}
comment = Text.readString(in);
// table attribute only support after version 127
if (FeMetaVersion.VERSION_127 <= Env.getCurrentEnvJournalVersion()) {
String json = Text.readString(in);
this.tableAttributes = GsonUtils.GSON.fromJson(json, TableAttributes.class);
}
// read create time
this.createTime = in.readLong();
}

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* TableAttributes contains additional information about all table
*/
public class TableAttributes implements Writable {
@SerializedName(value = "constraints")
private final Map<String, Constraint> constraintsMap = new HashMap<>();
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public TableAttributes read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), TableAttributes.class);
}
public Map<String, Constraint> getConstraintsMap() {
return constraintsMap;
}
}

View File

@ -183,7 +183,7 @@ public interface TableIf {
}
}
default Map<String, Constraint> getConstraintMap() {
default Map<String, Constraint> getConstraintsMap() {
readLock();
try {
return ImmutableMap.copyOf(getConstraintsMapUnsafe());
@ -236,25 +236,27 @@ public interface TableIf {
}
}
default void addUniqueConstraint(String name, ImmutableList<String> columns) {
default Constraint addUniqueConstraint(String name, ImmutableList<String> columns) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
UniqueConstraint uniqueConstraint = new UniqueConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistence(name, uniqueConstraint, constraintMap);
constraintMap.put(name, uniqueConstraint);
return uniqueConstraint;
} finally {
writeUnlock();
}
}
default void addPrimaryKeyConstraint(String name, ImmutableList<String> columns) {
default Constraint addPrimaryKeyConstraint(String name, ImmutableList<String> columns) {
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
PrimaryKeyConstraint primaryKeyConstraint = new PrimaryKeyConstraint(name, ImmutableSet.copyOf(columns));
checkConstraintNotExistence(name, primaryKeyConstraint, constraintMap);
constraintMap.put(name, primaryKeyConstraint);
return primaryKeyConstraint;
} finally {
writeUnlock();
}
@ -277,7 +279,7 @@ public interface TableIf {
}
}
default void addForeignConstraint(String name, ImmutableList<String> columns,
default Constraint addForeignConstraint(String name, ImmutableList<String> columns,
TableIf referencedTable, ImmutableList<String> referencedColumns) {
writeLock();
try {
@ -289,12 +291,32 @@ public interface TableIf {
foreignKeyConstraint.getReferencedColumnNames());
updatePrimaryKeyForForeignKey(requirePrimaryKey, referencedTable);
constraintMap.put(name, foreignKeyConstraint);
return foreignKeyConstraint;
} finally {
writeUnlock();
}
}
default void dropConstraint(String name) {
default void replayAddConstraint(Constraint constraint) {
if (constraint instanceof UniqueConstraint) {
UniqueConstraint uniqueConstraint = (UniqueConstraint) constraint;
this.addUniqueConstraint(constraint.getName(),
ImmutableList.copyOf(uniqueConstraint.getUniqueColumnNames()));
} else if (constraint instanceof PrimaryKeyConstraint) {
PrimaryKeyConstraint primaryKeyConstraint = (PrimaryKeyConstraint) constraint;
this.addPrimaryKeyConstraint(primaryKeyConstraint.getName(),
ImmutableList.copyOf(primaryKeyConstraint.getPrimaryKeyNames()));
} else if (constraint instanceof ForeignKeyConstraint) {
ForeignKeyConstraint foreignKey = (ForeignKeyConstraint) constraint;
this.addForeignConstraint(foreignKey.getName(),
ImmutableList.copyOf(foreignKey.getForeignKeyNames()),
foreignKey.getReferencedTable(),
ImmutableList.copyOf(foreignKey.getReferencedColumnNames()));
}
}
default Constraint dropConstraint(String name) {
Constraint dropConstraint;
writeLock();
try {
Map<String, Constraint> constraintMap = getConstraintsMapUnsafe();
@ -308,9 +330,11 @@ public interface TableIf {
((PrimaryKeyConstraint) constraint).getForeignTables()
.forEach(t -> t.dropFKReferringPK(this, (PrimaryKeyConstraint) constraint));
}
dropConstraint = constraint;
} finally {
writeUnlock();
}
return dropConstraint;
}
default void dropFKReferringPK(TableIf table, PrimaryKeyConstraint constraint) {

View File

@ -17,15 +17,25 @@
package org.apache.doris.catalog.constraint;
public abstract class Constraint {
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public abstract class Constraint implements Writable {
public enum ConstraintType {
FOREIGN_KEY("FOREIGN KEY"),
PRIMARY_KEY("PRIMARY KEY"),
UNIQUE("UNIQUE");
@SerializedName(value = "tn")
private final String name;
private ConstraintType(String stringValue) {
ConstraintType(String stringValue) {
this.name = stringValue;
}
@ -34,7 +44,9 @@ public abstract class Constraint {
}
}
@SerializedName(value = "n")
private final String name;
@SerializedName(value = "ty")
private final ConstraintType type;
@ -50,4 +62,17 @@ public abstract class Constraint {
public ConstraintType getType() {
return type;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
/**
* Read Constraint.
**/
public static Constraint read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, Constraint.class);
}
}

View File

@ -24,13 +24,17 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class ForeignKeyConstraint extends Constraint {
private final ImmutableMap<String, String> foreignToReference;
@SerializedName(value = "ftr")
private final Map<String, String> foreignToReference;
@SerializedName(value = "rt")
private final TableIdentifier referencedTable;
public ForeignKeyConstraint(String name, List<String> columns,
@ -50,11 +54,11 @@ public class ForeignKeyConstraint extends Constraint {
this.foreignToReference = builder.build();
}
public ImmutableSet<String> getForeignKeyNames() {
public Set<String> getForeignKeyNames() {
return foreignToReference.keySet();
}
public ImmutableSet<String> getReferencedColumnNames() {
public Set<String> getReferencedColumnNames() {
return ImmutableSet.copyOf(foreignToReference.values());
}
@ -62,7 +66,7 @@ public class ForeignKeyConstraint extends Constraint {
return foreignToReference.get(column);
}
public ImmutableMap<String, String> getForeignToReference() {
public Map<String, String> getForeignToReference() {
return foreignToReference;
}

View File

@ -23,15 +23,18 @@ import org.apache.doris.catalog.TableIf;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class PrimaryKeyConstraint extends Constraint {
private final ImmutableSet<String> columns;
@SerializedName(value = "cols")
private final Set<String> columns;
// record the foreign table which references the primary key
@SerializedName(value = "ft")
private final Set<TableIdentifier> foreignTables = new HashSet<>();
public PrimaryKeyConstraint(String name, Set<String> columns) {
@ -39,11 +42,11 @@ public class PrimaryKeyConstraint extends Constraint {
this.columns = ImmutableSet.copyOf(columns);
}
public ImmutableSet<String> getPrimaryKeyNames() {
public Set<String> getPrimaryKeyNames() {
return columns;
}
public ImmutableSet<Column> getPrimaryKeys(TableIf table) {
public Set<Column> getPrimaryKeys(TableIf table) {
return columns.stream().map(table::getColumn).collect(ImmutableSet.toImmutableSet());
}

View File

@ -20,30 +20,42 @@ package org.apache.doris.catalog.constraint;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.exceptions.MetaNotFoundException;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import java.util.Objects;
class TableIdentifier {
public class TableIdentifier {
@SerializedName(value = "dbId")
private final long databaseId;
@SerializedName(value = "tId")
private final long tableId;
@SerializedName(value = "cId")
private final long catalogId;
TableIdentifier(TableIf tableIf) {
public TableIdentifier(TableIf tableIf) {
Preconditions.checkArgument(tableIf != null,
"Table can not be null in constraint");
databaseId = tableIf.getDatabase().getId();
tableId = tableIf.getId();
databaseId = tableIf.getDatabase().getId();
catalogId = tableIf.getDatabase().getCatalog().getId();
}
TableIf toTableIf() {
DatabaseIf databaseIf = Env.getCurrentEnv().getCurrentCatalog().getDbNullable(databaseId);
public TableIf toTableIf() {
CatalogIf<?> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalogIf == null) {
throw new MetaNotFoundException(String.format("Can not find catalog %s in constraint", catalogId));
}
DatabaseIf<?> databaseIf = catalogIf.getDbNullable(databaseId);
if (databaseIf == null) {
throw new RuntimeException(String.format("Can not find database %s in constraint", databaseId));
throw new MetaNotFoundException(String.format("Can not find database %s in constraint", databaseId));
}
TableIf tableIf = databaseIf.getTableNullable(tableId);
if (tableIf == null) {
throw new RuntimeException(String.format("Can not find table %s in constraint", databaseId));
throw new MetaNotFoundException(String.format("Can not find table %s in constraint", databaseId));
}
return tableIf;
}

View File

@ -22,22 +22,24 @@ import org.apache.doris.catalog.TableIf;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.gson.annotations.SerializedName;
import java.util.Set;
public class UniqueConstraint extends Constraint {
private final ImmutableSet<String> columns;
@SerializedName(value = "cols")
private final Set<String> columns;
public UniqueConstraint(String name, Set<String> columns) {
super(ConstraintType.UNIQUE, name);
this.columns = ImmutableSet.copyOf(columns);
}
public ImmutableSet<String> getUniqueColumnNames() {
public Set<String> getUniqueColumnNames() {
return columns;
}
public ImmutableSet<Column> getUniqueKeys(TableIf table) {
public Set<Column> getUniqueKeys(TableIf table) {
return columns.stream().map(table::getColumn).collect(ImmutableSet.toImmutableSet());
}

View File

@ -21,7 +21,9 @@ import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableAttributes;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@ -77,6 +79,9 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
protected long timestamp;
@SerializedName(value = "dbName")
protected String dbName;
@SerializedName(value = "ta")
private final TableAttributes tableAttributes = new TableAttributes();
// this field will be refreshed after reloading schema
protected volatile long schemaUpdateTime;
@ -272,6 +277,11 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
return null;
}
@Override
public Map<String, Constraint> getConstraintsMapUnsafe() {
return tableAttributes.getConstraintsMap();
}
@Override
public String getEngine() {
return getType().toEngineName();

View File

@ -94,6 +94,11 @@ public class ExternalSchemaCache {
}
}
public void addSchemaForTest(String dbName, String tblName, ImmutableList<Column> schema) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
schemaCache.put(key, schema);
}
public void invalidateTableCache(String dbName, String tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
schemaCache.invalidate(key);

View File

@ -58,6 +58,7 @@ import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.persist.AlterConstraintLog;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.persist.AlterMTMV;
@ -793,6 +794,12 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_DROP_CONSTRAINT:
case OperationType.OP_ADD_CONSTRAINT: {
data = AlterConstraintLog.read(in);
isRead = true;
break;
}
case OperationType.OP_ALTER_USER: {
data = AlterUserOperationLog.read(in);
isRead = true;

View File

@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.exceptions;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import java.util.Optional;
/** Nereids's AnalysisException. */
public class MetaNotFoundException extends RuntimeException {
private final String message;
private final Optional<Integer> line;
private final Optional<Integer> startPosition;
private final Optional<LogicalPlan> plan;
public MetaNotFoundException(String message, Throwable cause, Optional<Integer> line,
Optional<Integer> startPosition, Optional<LogicalPlan> plan) {
super(message, cause);
this.message = message;
this.line = line;
this.startPosition = startPosition;
this.plan = plan;
}
public MetaNotFoundException(String message, Optional<Integer> line,
Optional<Integer> startPosition, Optional<LogicalPlan> plan) {
super(message);
this.message = message;
this.line = line;
this.startPosition = startPosition;
this.plan = plan;
}
public MetaNotFoundException(String message, Throwable cause) {
this(message, cause, Optional.empty(), Optional.empty(), Optional.empty());
}
public MetaNotFoundException(String message) {
this(message, Optional.empty(), Optional.empty(), Optional.empty());
}
@Override
public String getMessage() {
String planAnnotation = plan.map(p -> ";\n" + p.treeString()).orElse("");
return getSimpleMessage() + planAnnotation;
}
private String getSimpleMessage() {
if (line.isPresent() || startPosition.isPresent()) {
String lineAnnotation = line.map(l -> "line " + l).orElse("");
String positionAnnotation = startPosition.map(s -> " pos " + s).orElse("");
return message + ";" + lineAnnotation + positionAnnotation;
} else {
return message;
}
}
// TODO: support ErrorCode
}

View File

@ -17,7 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.NereidsPlanner;
@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.persist.AlterConstraintLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
@ -46,6 +47,7 @@ import java.util.Set;
public class AddConstraintCommand extends Command implements ForwardWithSync {
public static final Logger LOG = LogManager.getLogger(AddConstraintCommand.class);
private final String name;
private final Constraint constraint;
@ -61,16 +63,19 @@ public class AddConstraintCommand extends Command implements ForwardWithSync {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
Pair<ImmutableList<String>, TableIf> columnsAndTable = extractColumnsAndTable(ctx, constraint.toProject());
org.apache.doris.catalog.constraint.Constraint catalogConstraint = null;
if (constraint.isForeignKey()) {
Pair<ImmutableList<String>, TableIf> referencedColumnsAndTable
= extractColumnsAndTable(ctx, constraint.toReferenceProject());
columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first,
catalogConstraint = columnsAndTable.second.addForeignConstraint(name, columnsAndTable.first,
referencedColumnsAndTable.second, referencedColumnsAndTable.first);
} else if (constraint.isPrimaryKey()) {
columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first);
catalogConstraint = columnsAndTable.second.addPrimaryKeyConstraint(name, columnsAndTable.first);
} else if (constraint.isUnique()) {
columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first);
catalogConstraint = columnsAndTable.second.addUniqueConstraint(name, columnsAndTable.first);
}
Env.getCurrentEnv().getEditLog().logAddConstraint(
new AlterConstraintLog(catalogConstraint, columnsAndTable.second));
}
private Pair<ImmutableList<String>, TableIf> extractColumnsAndTable(ConnectContext ctx, LogicalPlan plan) {
@ -82,8 +87,6 @@ public class AddConstraintCommand extends Command implements ForwardWithSync {
throw new AnalysisException("Can not found table in constraint " + constraint.toString());
}
LogicalCatalogRelation catalogRelation = logicalCatalogRelationSet.iterator().next();
Preconditions.checkArgument(catalogRelation.getTable() instanceof Table,
"We only support table now but we meet ", catalogRelation.getTable());
ImmutableList<String> columns = analyzedPlan.getOutput().stream()
.map(s -> {
Preconditions.checkArgument(s instanceof SlotReference

View File

@ -17,7 +17,7 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -28,10 +28,10 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.persist.AlterConstraintLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -58,7 +58,8 @@ public class DropConstraintCommand extends Command implements ForwardWithSync {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
TableIf table = extractTable(ctx, plan);
table.dropConstraint(name);
org.apache.doris.catalog.constraint.Constraint catalogConstraint = table.dropConstraint(name);
Env.getCurrentEnv().getEditLog().logDropConstraint(new AlterConstraintLog(catalogConstraint, table));
}
private TableIf extractTable(ConnectContext ctx, LogicalPlan plan) {
@ -70,8 +71,6 @@ public class DropConstraintCommand extends Command implements ForwardWithSync {
throw new AnalysisException("Can not found table when dropping constraint");
}
LogicalCatalogRelation catalogRelation = logicalCatalogRelationSet.iterator().next();
Preconditions.checkArgument(catalogRelation.getTable() instanceof Table,
"Don't support table ", catalogRelation.getTable());
return catalogRelation.getTable();
}

View File

@ -51,7 +51,7 @@ public class ShowConstraintsCommand extends Command implements NoForward {
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
TableIf tableIf = RelationUtil.getDbAndTable(
RelationUtil.getQualifierName(ctx, nameParts), ctx.getEnv()).value();
List<List<String>> res = tableIf.getConstraintMap().entrySet().stream()
List<List<String>> res = tableIf.getConstraintsMap().entrySet().stream()
.map(e -> Lists.newArrayList(e.getKey(),
e.getValue().getType().getName(),
e.getValue().toString()))

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.catalog.constraint.TableIdentifier;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class AlterConstraintLog implements Writable {
@SerializedName("ct")
final Constraint constraint;
@SerializedName("tid")
final TableIdentifier tableIdentifier;
public AlterConstraintLog(Constraint constraint, TableIf table) {
this.constraint = constraint;
this.tableIdentifier = new TableIdentifier(table);
}
public TableIf getTableIf() {
return tableIdentifier.toTableIf();
}
public Constraint getConstraint() {
return constraint;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static AlterConstraintLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterConstraintLog.class);
}
}

View File

@ -973,6 +973,16 @@ public class EditLog {
case OperationType.OP_ALTER_MTMV_STMT: {
break;
}
case OperationType.OP_ADD_CONSTRAINT: {
final AlterConstraintLog log = (AlterConstraintLog) journal.getData();
log.getTableIf().replayAddConstraint(log.getConstraint());
break;
}
case OperationType.OP_DROP_CONSTRAINT: {
final AlterConstraintLog log = (AlterConstraintLog) journal.getData();
log.getTableIf().dropConstraint(log.getConstraint().getName());
break;
}
case OperationType.OP_ALTER_USER: {
final AlterUserOperationLog log = (AlterUserOperationLog) journal.getData();
env.getAuth().replayAlterUser(log);
@ -1969,6 +1979,14 @@ public class EditLog {
}
public void logAddConstraint(AlterConstraintLog log) {
logEdit(OperationType.OP_ADD_CONSTRAINT, log);
}
public void logDropConstraint(AlterConstraintLog log) {
logEdit(OperationType.OP_DROP_CONSTRAINT, log);
}
public void logInsertOverwrite(InsertOverwriteLog log) {
logEdit(OperationType.OP_INSERT_OVERWRITE, log);
}

View File

@ -285,6 +285,8 @@ public class OperationType {
public static final short OP_CHANGE_MTMV_TASK = 342;
@Deprecated
public static final short OP_ALTER_MTMV_STMT = 345;
public static final short OP_ADD_CONSTRAINT = 346;
public static final short OP_DROP_CONSTRAINT = 347;
public static final short OP_DROP_EXTERNAL_TABLE = 350;
public static final short OP_DROP_EXTERNAL_DB = 351;
@ -355,6 +357,7 @@ public class OperationType {
public static final short OP_INSERT_OVERWRITE = 461;
/**
* Get opcode name by op code.
**/

View File

@ -44,6 +44,10 @@ import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.catalog.constraint.ForeignKeyConstraint;
import org.apache.doris.catalog.constraint.PrimaryKeyConstraint;
import org.apache.doris.catalog.constraint.UniqueConstraint;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.catalog.external.ExternalDatabase;
@ -202,6 +206,12 @@ public class GsonUtils {
Policy.class, "clazz").registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName())
.registerSubtype(StoragePolicy.class, StoragePolicy.class.getSimpleName());
private static RuntimeTypeAdapterFactory<Constraint> constraintTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
Constraint.class, "clazz")
.registerSubtype(PrimaryKeyConstraint.class, PrimaryKeyConstraint.class.getSimpleName())
.registerSubtype(ForeignKeyConstraint.class, ForeignKeyConstraint.class.getSimpleName())
.registerSubtype(UniqueConstraint.class, UniqueConstraint.class.getSimpleName());
private static RuntimeTypeAdapterFactory<CatalogIf> dsTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
CatalogIf.class, "clazz")
.registerSubtype(InternalCatalog.class, InternalCatalog.class.getSimpleName())
@ -287,6 +297,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
.registerTypeAdapterFactory(rdsTypeAdapterFactory)
.registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
.registerTypeAdapterFactory(constraintTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter())
.registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer())

View File

@ -0,0 +1,305 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.constraint;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.nereids.util.PlanPatternMatchSupported;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.persist.AlterConstraintLog;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.OperationType;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.ImmutableList;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
class ConstraintPersistTest extends TestWithFeService implements PlanPatternMatchSupported {
@Override
public void runBeforeAll() throws Exception {
createDatabase("test");
connectContext.setDatabase("test");
createTable("create table t1 (\n"
+ " k1 int,\n"
+ " k2 int\n"
+ ")\n"
+ "unique key(k1, k2)\n"
+ "distributed by hash(k1) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\"\n"
+ ")");
createTable("create table t2 (\n"
+ " k1 int,\n"
+ " k2 int\n"
+ ")\n"
+ "unique key(k1, k2)\n"
+ "distributed by hash(k1) buckets 4\n"
+ "properties(\n"
+ " \"replication_num\"=\"1\"\n"
+ ")");
}
@Test
void addConstraintLogPersistTest() throws Exception {
Config.edit_log_type = "local";
addConstraint("alter table t1 add constraint pk primary key (k1)");
addConstraint("alter table t2 add constraint pk primary key (k1)");
addConstraint("alter table t1 add constraint uk unique (k1)");
addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
tableIf.getConstraintsMapUnsafe().clear();
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : constraintMap.values()) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_ADD_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertEquals(tableIf.getConstraintsMap(), constraintMap);
dropConstraint("alter table t1 drop constraint fk");
dropConstraint("alter table t1 drop constraint pk");
dropConstraint("alter table t2 drop constraint pk");
dropConstraint("alter table t1 drop constraint uk");
}
@Test
void dropConstraintLogPersistTest() throws Exception {
Config.edit_log_type = "local";
addConstraint("alter table t1 add constraint pk primary key (k1)");
addConstraint("alter table t2 add constraint pk primary key (k1)");
addConstraint("alter table t1 add constraint uk unique (k1)");
addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : constraintMap.values()) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_DROP_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
}
@Test
void constraintWithTablePersistTest() throws Exception {
addConstraint("alter table t1 add constraint pk primary key (k1)");
addConstraint("alter table t2 add constraint pk primary key (k1)");
addConstraint("alter table t1 add constraint uk unique (k1)");
addConstraint("alter table t1 add constraint fk foreign key (k1) references t2(k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
tableIf.write(output);
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
TableIf loadTable = Table.read(input);
Assertions.assertEquals(loadTable.getConstraintsMap(), tableIf.getConstraintsMap());
dropConstraint("alter table t1 drop constraint fk");
dropConstraint("alter table t1 drop constraint pk");
dropConstraint("alter table t2 drop constraint pk");
dropConstraint("alter table t1 drop constraint uk");
}
@Test
void externalTableTest() throws Exception {
ExternalTable externalTable = new ExternalTable();
externalTable.addPrimaryKeyConstraint("pk", ImmutableList.of("col"));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
externalTable.write(output);
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
TableIf loadTable = ExternalTable.read(input);
Assertions.assertEquals(1, loadTable.getConstraintsMap().size());
}
@Test
void addConstraintLogPersistForExternalTableTest() throws Exception {
Config.edit_log_type = "local";
createCatalog("create catalog es properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1',"
+ " 'elasticsearch.username' = 'user1');");
Env.getCurrentEnv().changeCatalog(connectContext, "es");
EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es");
EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog);
ImmutableList<Column> schema = ImmutableList.of(new Column("k1", PrimitiveType.INT));
tbl.setNewFullSchema(schema);
db.addTableForTest(tbl);
esCatalog.addDatabaseForTest(db);
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema);
new MockUp<RelationUtil>() {
@Mock
public TableIf getTable(List<String> qualifierName, Env env) {
return tbl;
}
};
new MockUp<ExternalTable>() {
@Mock
public DatabaseIf getDatabase() {
return db;
}
};
new MockUp<TableIdentifier>() {
@Mock
public TableIf toTableIf() {
return tbl;
}
};
addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)");
addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
tableIf.getConstraintsMapUnsafe().clear();
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : new ArrayList<>(constraintMap.values())) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_ADD_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertEquals(tableIf.getConstraintsMap(), constraintMap);
Env.getCurrentEnv().changeCatalog(connectContext, "internal");
}
@Test
void dropConstraintLogPersistForExternalTest() throws Exception {
Config.edit_log_type = "local";
createCatalog("create catalog es2 properties('type' = 'es', 'elasticsearch.hosts' = 'http://192.168.0.1',"
+ " 'elasticsearch.username' = 'user1');");
Env.getCurrentEnv().changeCatalog(connectContext, "es2");
EsExternalCatalog esCatalog = (EsExternalCatalog) getCatalog("es2");
EsExternalDatabase db = new EsExternalDatabase(esCatalog, 10002, "es_db1");
EsExternalTable tbl = new EsExternalTable(10003, "es_tbl1", "es_db1", esCatalog);
ImmutableList<Column> schema = ImmutableList.of(new Column("k1", PrimitiveType.INT));
tbl.setNewFullSchema(schema);
db.addTableForTest(tbl);
esCatalog.addDatabaseForTest(db);
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(esCatalog).addSchemaForTest(db.getFullName(), tbl.getName(), schema);
new MockUp<RelationUtil>() {
@Mock
public TableIf getTable(List<String> qualifierName, Env env) {
return tbl;
}
};
new MockUp<ExternalTable>() {
@Mock
public DatabaseIf getDatabase() {
return db;
}
};
new MockUp<TableIdentifier>() {
@Mock
public TableIf toTableIf() {
return tbl;
}
};
addConstraint("alter table es.es_db1.es_tbl1 add constraint pk primary key (k1)");
addConstraint("alter table es.es_db1.es_tbl1 add constraint uk unique (k1)");
TableIf tableIf = RelationUtil.getTable(
RelationUtil.getQualifierName(connectContext, Lists.newArrayList("test", "t1")),
connectContext.getEnv());
Map<String, Constraint> constraintMap = tableIf.getConstraintsMap();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutput output = new DataOutputStream(outputStream);
for (Constraint value : constraintMap.values()) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.setData(new AlterConstraintLog(value, tableIf));
journalEntity.setOpCode(OperationType.OP_DROP_CONSTRAINT);
journalEntity.write(output);
}
InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
DataInput input = new DataInputStream(inputStream);
for (int i = 0; i < constraintMap.values().size(); i++) {
JournalEntity journalEntity = new JournalEntity();
journalEntity.readFields(input);
EditLog.loadJournal(Env.getCurrentEnv(), 0L, journalEntity);
}
Assertions.assertTrue(tableIf.getConstraintsMap().isEmpty());
Env.getCurrentEnv().changeCatalog(connectContext, "internal");
}
}

View File

@ -21,6 +21,7 @@ import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
@ -55,6 +56,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@ -650,6 +652,15 @@ public abstract class TestWithFeService {
Env.getCurrentEnv().createTableAsSelect(createTableAsSelectStmt);
}
public void createCatalog(String sql) throws Exception {
CreateCatalogStmt stmt = (CreateCatalogStmt) parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().getCatalogMgr().createCatalog(stmt);
}
public CatalogIf getCatalog(String name) throws Exception {
return Env.getCurrentEnv().getCatalogMgr().getCatalog(name);
}
public void createTables(String... sqls) throws Exception {
createTables(false, sqls);
}