[feture](Nereids) support delete from without using (#28083)

support sql: DELETE FROM <table_name> WHERE <predicates>
This commit is contained in:
morrySnow
2023-12-14 13:54:33 +08:00
committed by GitHub
parent e5a57f82ec
commit bb23078d43
15 changed files with 563 additions and 113 deletions

View File

@ -18,6 +18,7 @@
package org.apache.doris.load;
import org.apache.doris.analysis.DeleteStmt;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
@ -33,6 +34,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@ -50,6 +52,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DeleteHandler implements Writable {
@ -86,6 +89,66 @@ public class DeleteHandler implements Writable {
lock.writeLock().unlock();
}
/**
* use for Nereids process empty relation
*/
public void processEmptyRelation(QueryState execState) {
String sb = "{'label':'" + DeleteJob.DELETE_PREFIX + UUID.randomUUID()
+ "', 'txnId':'" + -1
+ "', 'status':'" + TransactionStatus.VISIBLE.name() + "'}";
execState.setOk(0, 0, sb);
}
/**
* used for Nereids planner
*/
public void process(Database targetDb, OlapTable targetTbl, List<String> partitionNames,
List<Predicate> deleteConditions, QueryState execState) {
DeleteJob deleteJob = null;
try {
targetTbl.readLock();
try {
if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) {
// table under alter operation can also do delete.
// just add a comment here to notice.
}
deleteJob = DeleteJob.newBuilder()
.buildWith(new DeleteJob.BuildParams(
targetDb,
targetTbl,
partitionNames,
deleteConditions));
long txnId = deleteJob.beginTxn();
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(targetDb.getId(), txnId);
// must call this to make sure we only handle the tablet in the mIndex we saw here.
// table may be under schema change or rollup, and the newly created tablets will not be checked later,
// to make sure that the delete transaction can be done successfully.
txnState.addTableIndexes(targetTbl);
idToDeleteJob.put(txnId, deleteJob);
deleteJob.dispatch();
} finally {
targetTbl.readUnlock();
}
deleteJob.await();
String commitMsg = deleteJob.commit();
execState.setOk(0, 0, commitMsg);
} catch (Exception ex) {
if (deleteJob != null) {
deleteJob.cancel(ex.getMessage());
}
execState.setError(ex.getMessage());
} finally {
if (!FeConstants.runningUnitTest) {
clearJob(deleteJob);
}
}
}
/**
* used for legacy planner
*/
public void process(DeleteStmt stmt, QueryState execState) throws DdlException {
Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
OlapTable targetTbl = targetDb.getOlapTableOrDdlException(stmt.getTableName());

View File

@ -326,7 +326,8 @@ import org.apache.doris.nereids.trees.plans.commands.Constraint;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand;
import org.apache.doris.nereids.trees.plans.commands.DropConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
@ -477,24 +478,14 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
Optional<String> labelName = ctx.labelName == null ? Optional.empty() : Optional.of(ctx.labelName.getText());
List<String> colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols);
// TODO visit partitionSpecCtx
PartitionSpecContext partitionSpecCtx = ctx.partitionSpec();
List<String> partitions = ImmutableList.of();
boolean temporaryPartition = false;
if (partitionSpecCtx != null) {
temporaryPartition = partitionSpecCtx.TEMPORARY() != null;
if (partitionSpecCtx.partition != null) {
partitions = ImmutableList.of(partitionSpecCtx.partition.getText());
} else {
partitions = visitIdentifierList(partitionSpecCtx.partitions);
}
}
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
LogicalPlan plan = ctx.query() != null ? visitQuery(ctx.query()) : visitInlineTable(ctx.inlineTable());
UnboundTableSink<?> sink = new UnboundTableSink<>(
tableName.build(),
colNames,
ImmutableList.of(),
temporaryPartition,
partitions,
partitionSpec.first,
partitionSpec.second,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
DMLCommandType.INSERT,
plan);
@ -514,6 +505,24 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return command;
}
/**
* return a pair, first will be true if partitions is temp partition, select is a list to present partition list.
*/
@Override
public Pair<Boolean, List<String>> visitPartitionSpec(PartitionSpecContext ctx) {
List<String> partitions = ImmutableList.of();
boolean temporaryPartition = false;
if (ctx != null) {
temporaryPartition = ctx.TEMPORARY() != null;
if (ctx.partition != null) {
partitions = ImmutableList.of(ctx.partition.getText());
} else {
partitions = visitIdentifierList(ctx.partitions);
}
}
return Pair.of(temporaryPartition, partitions);
}
@Override
public CreateMTMVCommand visitCreateMTMV(CreateMTMVContext ctx) {
List<String> nameParts = visitMultipartIdentifier(ctx.mvName);
@ -708,22 +717,28 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan visitDelete(DeleteContext ctx) {
List<String> tableName = visitMultipartIdentifier(ctx.tableName);
List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
LogicalPlan query = withTableAlias(LogicalPlanBuilderAssistant.withCheckPolicy(
new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName)), ctx.tableAlias());
if (ctx.USING() != null) {
query = withRelations(query, ctx.relation());
}
query = withFilter(query, Optional.of(ctx.whereClause()));
new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName,
partitionSpec.second, partitionSpec.first)), ctx.tableAlias());
String tableAlias = null;
if (ctx.tableAlias().strictIdentifier() != null) {
tableAlias = ctx.tableAlias().getText();
}
Optional<LogicalPlan> cte = Optional.empty();
if (ctx.cte() != null) {
cte = Optional.ofNullable(withCte(query, ctx.cte()));
if (ctx.USING() == null && ctx.cte() == null && ctx.explain() == null) {
query = withFilter(query, Optional.of(ctx.whereClause()));
return new DeleteFromCommand(tableName, tableAlias, partitionSpec.first, partitionSpec.second, query);
} else {
// convert to insert into select
query = withRelations(query, ctx.relation());
query = withFilter(query, Optional.of(ctx.whereClause()));
Optional<LogicalPlan> cte = Optional.empty();
if (ctx.cte() != null) {
cte = Optional.ofNullable(withCte(query, ctx.cte()));
}
return withExplain(new DeleteFromUsingCommand(tableName, tableAlias,
partitionSpec.first, partitionSpec.second, query, cte), ctx.explain());
}
return withExplain(new DeleteCommand(tableName, tableAlias, partitions, query, cte), ctx.explain());
}
@Override
@ -2744,6 +2759,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
private LogicalPlan withRelations(LogicalPlan inputPlan, List<RelationContext> relations) {
if (relations == null) {
return inputPlan;
}
LogicalPlan left = inputPlan;
for (RelationContext relation : relations) {
// build left deep join tree

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import java.util.List;
/**
* delete from unique key table.
*/
public class CommandUtils {
/**
* check delete target table should unique key olap table. If ok, return it.
*/
public static OlapTable checkAndGetDeleteTargetTable(ConnectContext ctx, List<String> nameParts) {
List<String> qualifiedTableName = RelationUtil.getQualifierName(ctx, nameParts);
TableIf table = RelationUtil.getTable(qualifiedTableName, ctx.getEnv());
if (!(table instanceof OlapTable)) {
throw new AnalysisException("table must be olapTable in delete command");
}
OlapTable targetTable = ((OlapTable) table);
if (targetTable.getKeysType() != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Nereids only support delete command on unique key table now");
}
return targetTable;
}
}

View File

@ -0,0 +1,290 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnary;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Preconditions;
import org.apache.hadoop.util.Lists;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* delete from unique key table.
*/
public class DeleteFromCommand extends Command implements ForwardWithSync {
private final List<String> nameParts;
private final String tableAlias;
private final boolean isTempPart;
private final List<String> partitions;
private final LogicalPlan logicalQuery;
/**
* constructor
*/
public DeleteFromCommand(List<String> nameParts, String tableAlias,
boolean isTempPart, List<String> partitions, LogicalPlan logicalQuery) {
super(PlanType.DELETE_COMMAND);
this.nameParts = Utils.copyRequiredList(nameParts);
this.tableAlias = tableAlias;
this.isTempPart = isTempPart;
this.partitions = Utils.copyRequiredList(partitions);
this.logicalQuery = logicalQuery;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
turnOffForbidUnknownStats(ctx.getSessionVariable());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.setPlanner(planner);
executor.checkBlockRules();
// if fe could do fold constant to get delete will do nothing for table, just return.
if (planner.getPhysicalPlan() instanceof PhysicalEmptyRelation) {
Env.getCurrentEnv()
.getDeleteHandler().processEmptyRelation(ctx.getState());
return;
}
Optional<PhysicalFilter<?>> optFilter = (planner.getPhysicalPlan()
.<Set<PhysicalFilter<?>>>collect(PhysicalFilter.class::isInstance)).stream()
.findAny();
Optional<PhysicalOlapScan> optScan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapScan>>collect(PhysicalOlapScan.class::isInstance)).stream()
.findAny();
Optional<UnboundRelation> optRelation = (logicalQuery
.<Set<UnboundRelation>>collect(UnboundRelation.class::isInstance)).stream()
.findAny();
Preconditions.checkArgument(optFilter.isPresent(), "delete command must contain filter");
Preconditions.checkArgument(optScan.isPresent(), "delete command could be only used on olap table");
Preconditions.checkArgument(optRelation.isPresent(), "delete command could be only used on olap table");
PhysicalOlapScan scan = optScan.get();
UnboundRelation relation = optRelation.get();
PhysicalFilter<?> filter = optFilter.get();
// predicate check
OlapTable olapTable = scan.getTable();
Set<String> columns = olapTable.getFullSchema().stream().map(Column::getName).collect(Collectors.toSet());
try {
Plan plan = planner.getPhysicalPlan();
checkSubQuery(plan);
for (Expression conjunct : filter.getConjuncts()) {
conjunct.<Set<SlotReference>>collect(SlotReference.class::isInstance)
.forEach(s -> checkColumn(columns, s, olapTable));
checkPredicate(conjunct);
}
} catch (Exception e) {
new DeleteFromUsingCommand(nameParts, tableAlias, isTempPart, partitions,
logicalQuery, Optional.empty()).run(ctx, executor);
return;
}
// call delete handler to process
List<Predicate> predicates = planner.getScanNodes().get(0).getConjuncts().stream()
.filter(c -> {
// filter predicate __DORIS_DELETE_SIGN__ = 0
List<Expr> slotRefs = Lists.newArrayList();
c.collect(SlotRef.class::isInstance, slotRefs);
return slotRefs.stream().map(SlotRef.class::cast)
.noneMatch(s -> Column.DELETE_SIGN.equalsIgnoreCase(s.getColumnName()));
})
.map(c -> {
if (c instanceof Predicate) {
return (Predicate) c;
} else {
throw new AnalysisException("non predicate in filter: " + c.toSql());
}
}).collect(Collectors.toList());
if (predicates.isEmpty()) {
// TODO this will delete all rows, however storage layer do not support true predicate now
// just throw exception to fallback until storage support true predicate.
throw new AnalysisException("delete all rows is forbidden temporary.");
}
Env.getCurrentEnv()
.getDeleteHandler()
.process((Database) scan.getDatabase(), scan.getTable(),
Lists.newArrayList(relation.getPartNames()), predicates, ctx.getState());
}
private void turnOffForbidUnknownStats(SessionVariable sessionVariable) {
sessionVariable.setIsSingleSetVar(true);
sessionVariable.setForbidUnownColStats(false);
}
private void checkColumn(Set<String> tableColumns, SlotReference slotReference, OlapTable table) {
// 0. must slot from table
if (!slotReference.getColumn().isPresent()) {
throw new AnalysisException("");
}
Column column = slotReference.getColumn().get();
if (Column.DELETE_SIGN.equalsIgnoreCase(column.getName())) {
return;
}
// 1. shadow column
if (Column.isShadowColumn(column.getName())) {
throw new AnalysisException("Can not apply delete condition to shadow column " + column.getName());
}
// 2. table has shadow column on table related to column in predicates
String shadowName = Column.getShadowName(column.getName());
if (tableColumns.contains(shadowName)) {
throw new AnalysisException(String.format("Column '%s' is under"
+ " schema change operation. Do not allow delete operation", shadowName));
}
// 3. check column is primitive type
// TODO(Now we can not push down non-scala type like array/map/struct to storage layer because of
// predict_column in be not support non-scala type, so we just should ban this type in delete predict, when
// we delete predict_column in be we should delete this ban)
if (!column.getType().isScalarType()) {
throw new AnalysisException(String.format("Can not apply delete condition to column type: "
+ column.getType()));
}
// 4. column should not float or double
if (slotReference.getDataType().isFloatLikeType()) {
throw new AnalysisException("Column[" + column.getName() + "] type is float or double.");
}
// 5. only contains key column if agg or mor
if (!column.isKey()) {
if (table.getKeysType() == KeysType.AGG_KEYS) {
throw new AnalysisException("delete predicate on value column only supports Unique table with"
+ " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName()
+ "] is an Aggregate table.");
} else if (table.getKeysType() == KeysType.UNIQUE_KEYS && !table.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("delete predicate on value column only supports Unique table with"
+ " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName()
+ "] is an unique table without merge-on-write.");
}
}
}
private void checkSubQuery(Plan plan) {
while (true) {
if (!(plan instanceof PhysicalDistribute
|| plan instanceof PhysicalOlapScan
|| plan instanceof PhysicalProject
|| plan instanceof PhysicalFilter)) {
throw new AnalysisException("Where clause only supports compound predicate,"
+ " binary predicate, is_null predicate or in predicate.");
}
if (plan instanceof PhysicalOlapScan) {
break;
}
plan = ((PhysicalUnary<?>) plan).child();
}
}
private void checkPredicate(Expression predicate) {
if (predicate instanceof And) {
checkPredicate(((And) predicate).left());
checkPredicate(((And) predicate).right());
} else if (predicate instanceof ComparisonPredicate) {
ComparisonPredicate cp = (ComparisonPredicate) predicate;
if (!(cp.left() instanceof SlotReference)) {
throw new AnalysisException(
"Left expr of binary predicate should be column name, predicate: " + predicate.toSql()
+ ", left expr type:" + cp.left().getDataType());
}
if (!(cp.right() instanceof Literal)) {
throw new AnalysisException(
"Right expr of binary predicate should be value, predicate: " + predicate.toSql()
+ ", right expr type:" + cp.right().getDataType());
}
} else if (predicate instanceof IsNull) {
if (!(((IsNull) predicate).child() instanceof SlotReference)) {
throw new AnalysisException(
"Child expr of is_null predicate should be column name, predicate: " + predicate.toSql());
}
} else if (predicate instanceof Not) {
Expression child = ((Not) predicate).child();
if (child instanceof IsNull) {
if (!(((IsNull) child).child() instanceof SlotReference)) {
throw new AnalysisException(
"Child expr of is_null predicate should be column name, predicate: " + predicate.toSql());
}
} else {
throw new AnalysisException("Where clause only supports compound predicate,"
+ " binary predicate, is_null predicate or in predicate. But we meet "
+ child.toSql());
}
} else if (predicate instanceof InPredicate) {
InPredicate in = (InPredicate) predicate;
if (!(in.getCompareExpr() instanceof SlotReference)) {
throw new AnalysisException(
"Left expr of in predicate should be column name, predicate: " + predicate.toSql()
+ ", left expr type:" + in.getCompareExpr().getDataType());
}
int maxAllowedInElementNumOfDelete = Config.max_allowed_in_element_num_of_delete;
if (in.getOptions().size() > maxAllowedInElementNumOfDelete) {
throw new AnalysisException("Element num of in predicate should not be more than "
+ maxAllowedInElementNumOfDelete);
}
for (Expression option : in.getOptions()) {
if (!(option instanceof Literal)) {
throw new AnalysisException("Child of in predicate should be value, but get " + option);
}
}
} else {
throw new AnalysisException("Where clause only supports compound predicate,"
+ " binary predicate, is_null predicate or in predicate. But we meet "
+ predicate.toSql());
}
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitDeleteFromCommand(this, context);
}
}

View File

@ -18,12 +18,10 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
@ -34,9 +32,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.collect.ImmutableList;
@ -48,50 +46,44 @@ import java.util.Optional;
/**
* delete from unique key table.
*/
public class DeleteCommand extends Command implements ForwardWithSync, Explainable {
public class DeleteFromUsingCommand extends Command implements ForwardWithSync, Explainable {
private final List<String> nameParts;
private final String tableAlias;
private final boolean isTempPart;
private final List<String> partitions;
private LogicalPlan logicalQuery;
private OlapTable targetTable;
private final Optional<LogicalPlan> cte;
private final LogicalPlan logicalQuery;
/**
* constructor
*/
public DeleteCommand(List<String> nameParts, String tableAlias, List<String> partitions,
LogicalPlan logicalQuery, Optional<LogicalPlan> cte) {
public DeleteFromUsingCommand(List<String> nameParts, String tableAlias,
boolean isTempPart, List<String> partitions, LogicalPlan logicalQuery, Optional<LogicalPlan> cte) {
super(PlanType.DELETE_COMMAND);
this.nameParts = Utils.copyRequiredList(nameParts);
this.tableAlias = tableAlias;
this.isTempPart = isTempPart;
this.partitions = Utils.copyRequiredList(partitions);
this.logicalQuery = logicalQuery;
this.cte = cte;
this.logicalQuery = logicalQuery;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (ctx.getSessionVariable().isInDebugMode()) {
throw new AnalysisException("Delete is forbidden since current session is in debug mode."
+ " Please check the following session variables: "
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
}
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor);
}
private void checkTable(ConnectContext ctx) {
List<String> qualifieredTableName = RelationUtil.getQualifierName(ctx, nameParts);
TableIf table = RelationUtil.getTable(qualifieredTableName, ctx.getEnv());
if (!(table instanceof OlapTable)) {
throw new AnalysisException("table must be olapTable in delete command");
}
targetTable = ((OlapTable) table);
if (targetTable.getKeysType() != KeysType.UNIQUE_KEYS) {
throw new AnalysisException("Nereids only support delete command on unique key table now");
}
}
/**
* public for test
*/
public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuery) {
checkTable(ctx);
OlapTable targetTable = CommandUtils.checkAndGetDeleteTargetTable(ctx, nameParts);
// add select and insert node.
List<NamedExpression> selectLists = Lists.newArrayList();
List<String> cols = Lists.newArrayList();
@ -122,7 +114,7 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
// make UnboundTableSink
return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(),
false, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery);
isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery);
}
public LogicalPlan getLogicalQuery() {
@ -136,6 +128,6 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitDeleteCommand(this, context);
return visitor.visitDeleteFromUsingCommand(this, context);
}
}

View File

@ -31,7 +31,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
@ -124,15 +123,18 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.setPlanner(planner);
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
.<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny();
// TODO: support other type table insert into
Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalOlapTableSink.class::isInstance)).stream()
.findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get());
physicalOlapTableSink = plan.get();
Table targetTable = physicalOlapTableSink.getTargetTable();
// check auth

View File

@ -25,7 +25,8 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand;
import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand;
import org.apache.doris.nereids.trees.plans.commands.DropConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
@ -68,8 +69,12 @@ public interface CommandVisitor<R, C> {
return visitCommand(updateCommand, context);
}
default R visitDeleteCommand(DeleteCommand deleteCommand, C context) {
return visitCommand(deleteCommand, context);
default R visitDeleteFromCommand(DeleteFromCommand deleteFromCommand, C context) {
return visitCommand(deleteFromCommand, context);
}
default R visitDeleteFromUsingCommand(DeleteFromUsingCommand deleteFromUsingCommand, C context) {
return visitCommand(deleteFromUsingCommand, context);
}
default R visitLoadCommand(LoadCommand loadCommand, C context) {

View File

@ -195,6 +195,10 @@ public class DatabaseTransactionMgr {
return dbId;
}
public TransactionIdGenerator getIdGenerator() {
return idGenerator;
}
protected TransactionState getTransactionState(Long transactionId) {
readLock();
try {