From bb23078d433bd162b6f35521435508c65b113c59 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Thu, 14 Dec 2023 13:54:33 +0800 Subject: [PATCH] [feture](Nereids) support delete from without using (#28083) support sql: DELETE FROM WHERE --- be/src/olap/utils.cpp | 7 +- .../org/apache/doris/nereids/DorisParser.g4 | 6 +- fe/fe-core/src/main/cup/sql_parser.cup | 2 +- .../org/apache/doris/load/DeleteHandler.java | 63 ++++ .../nereids/parser/LogicalPlanBuilder.java | 66 ++-- .../trees/plans/commands/CommandUtils.java | 49 +++ .../plans/commands/DeleteFromCommand.java | 290 ++++++++++++++++++ ...mmand.java => DeleteFromUsingCommand.java} | 44 ++- .../commands/InsertIntoTableCommand.java | 10 +- .../trees/plans/visitor/CommandVisitor.java | 11 +- .../transaction/DatabaseTransactionMgr.java | 4 + ...t.java => DeleteFromUsingCommandTest.java} | 8 +- .../data/nereids_p0/delete/delete_using.out | 44 ++- .../nereids_p0/delete/delete_using.groovy | 60 ++-- .../insert_into_table/no_partition.groovy | 12 + 15 files changed, 563 insertions(+), 113 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CommandUtils.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java rename fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/{DeleteCommand.java => DeleteFromUsingCommand.java} (79%) rename fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/{DeleteCommandTest.java => DeleteFromUsingCommandTest.java} (93%) diff --git a/be/src/olap/utils.cpp b/be/src/olap/utils.cpp index 9da96f2962..bcdf015da8 100644 --- a/be/src/olap/utils.cpp +++ b/be/src/olap/utils.cpp @@ -624,9 +624,10 @@ bool valid_datetime(const std::string& value_str, const uint32_t scale) { LOG(WARNING) << "invalid microsecond. [microsecond=" << what[9].str() << "]"; return false; } - - long ms = strtol(what[9].str().c_str(), nullptr, 10); - if (ms % ((long)std::pow(10, 6 - scale)) != 0) { + auto s9 = what[9].str(); + s9.resize(6, '0'); + if (const long ms = strtol(s9.c_str(), nullptr, 10); + ms % static_cast(std::pow(10, 6 - scale)) != 0) { LOG(WARNING) << "invalid microsecond. [microsecond=" << what[9].str() << ", scale = " << scale << "]"; return false; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index e88773c56b..674e455097 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -62,9 +62,9 @@ statement SET updateAssignmentSeq fromClause? whereClause #update - | explain? cte? DELETE FROM tableName=multipartIdentifier tableAlias - (PARTITION partition=identifierList)? - (USING relation (COMMA relation)*) + | explain? cte? DELETE FROM tableName=multipartIdentifier + partitionSpec? tableAlias + (USING relation (COMMA relation)*)? whereClause #delete | LOAD LABEL lableName=identifier LEFT_PAREN dataDescs+=dataDesc (COMMA dataDescs+=dataDesc)* RIGHT_PAREN diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index a81f8cf93c..fbd657dbcc 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4441,7 +4441,7 @@ cancel_param ::= // Delete stmt delete_stmt ::= - KW_DELETE KW_FROM table_name:table opt_table_alias:alias opt_partition_names:partitionNames opt_using_clause:fromClause where_clause:wherePredicate + KW_DELETE KW_FROM table_name:table opt_partition_names:partitionNames opt_table_alias:alias opt_using_clause:fromClause where_clause:wherePredicate {: RESULT = new DeleteStmt(new TableRef(table, alias), partitionNames, fromClause, wherePredicate); :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index ea3f786d0f..815bf91f90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -18,6 +18,7 @@ package org.apache.doris.load; import org.apache.doris.analysis.DeleteStmt; +import org.apache.doris.analysis.Predicate; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; @@ -33,6 +34,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState; import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; import com.google.common.base.Joiner; import com.google.common.collect.Lists; @@ -50,6 +52,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; public class DeleteHandler implements Writable { @@ -86,6 +89,66 @@ public class DeleteHandler implements Writable { lock.writeLock().unlock(); } + /** + * use for Nereids process empty relation + */ + public void processEmptyRelation(QueryState execState) { + String sb = "{'label':'" + DeleteJob.DELETE_PREFIX + UUID.randomUUID() + + "', 'txnId':'" + -1 + + "', 'status':'" + TransactionStatus.VISIBLE.name() + "'}"; + execState.setOk(0, 0, sb); + } + + /** + * used for Nereids planner + */ + public void process(Database targetDb, OlapTable targetTbl, List partitionNames, + List deleteConditions, QueryState execState) { + DeleteJob deleteJob = null; + try { + targetTbl.readLock(); + try { + if (targetTbl.getState() != OlapTable.OlapTableState.NORMAL) { + // table under alter operation can also do delete. + // just add a comment here to notice. + } + deleteJob = DeleteJob.newBuilder() + .buildWith(new DeleteJob.BuildParams( + targetDb, + targetTbl, + partitionNames, + deleteConditions)); + + long txnId = deleteJob.beginTxn(); + TransactionState txnState = Env.getCurrentGlobalTransactionMgr() + .getTransactionState(targetDb.getId(), txnId); + // must call this to make sure we only handle the tablet in the mIndex we saw here. + // table may be under schema change or rollup, and the newly created tablets will not be checked later, + // to make sure that the delete transaction can be done successfully. + txnState.addTableIndexes(targetTbl); + idToDeleteJob.put(txnId, deleteJob); + deleteJob.dispatch(); + } finally { + targetTbl.readUnlock(); + } + deleteJob.await(); + String commitMsg = deleteJob.commit(); + execState.setOk(0, 0, commitMsg); + } catch (Exception ex) { + if (deleteJob != null) { + deleteJob.cancel(ex.getMessage()); + } + execState.setError(ex.getMessage()); + } finally { + if (!FeConstants.runningUnitTest) { + clearJob(deleteJob); + } + } + } + + /** + * used for legacy planner + */ public void process(DeleteStmt stmt, QueryState execState) throws DdlException { Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName()); OlapTable targetTbl = targetDb.getOlapTableOrDdlException(stmt.getTableName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index eba033e816..1c4d847e3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -326,7 +326,8 @@ import org.apache.doris.nereids.trees.plans.commands.Constraint; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; -import org.apache.doris.nereids.trees.plans.commands.DeleteCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand; import org.apache.doris.nereids.trees.plans.commands.DropConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; @@ -477,24 +478,14 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { Optional labelName = ctx.labelName == null ? Optional.empty() : Optional.of(ctx.labelName.getText()); List colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols); // TODO visit partitionSpecCtx - PartitionSpecContext partitionSpecCtx = ctx.partitionSpec(); - List partitions = ImmutableList.of(); - boolean temporaryPartition = false; - if (partitionSpecCtx != null) { - temporaryPartition = partitionSpecCtx.TEMPORARY() != null; - if (partitionSpecCtx.partition != null) { - partitions = ImmutableList.of(partitionSpecCtx.partition.getText()); - } else { - partitions = visitIdentifierList(partitionSpecCtx.partitions); - } - } + Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); LogicalPlan plan = ctx.query() != null ? visitQuery(ctx.query()) : visitInlineTable(ctx.inlineTable()); UnboundTableSink sink = new UnboundTableSink<>( tableName.build(), colNames, ImmutableList.of(), - temporaryPartition, - partitions, + partitionSpec.first, + partitionSpec.second, ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), DMLCommandType.INSERT, plan); @@ -514,6 +505,24 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { return command; } + /** + * return a pair, first will be true if partitions is temp partition, select is a list to present partition list. + */ + @Override + public Pair> visitPartitionSpec(PartitionSpecContext ctx) { + List partitions = ImmutableList.of(); + boolean temporaryPartition = false; + if (ctx != null) { + temporaryPartition = ctx.TEMPORARY() != null; + if (ctx.partition != null) { + partitions = ImmutableList.of(ctx.partition.getText()); + } else { + partitions = visitIdentifierList(ctx.partitions); + } + } + return Pair.of(temporaryPartition, partitions); + } + @Override public CreateMTMVCommand visitCreateMTMV(CreateMTMVContext ctx) { List nameParts = visitMultipartIdentifier(ctx.mvName); @@ -708,22 +717,28 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { @Override public LogicalPlan visitDelete(DeleteContext ctx) { List tableName = visitMultipartIdentifier(ctx.tableName); - List partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition); + Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); LogicalPlan query = withTableAlias(LogicalPlanBuilderAssistant.withCheckPolicy( - new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName)), ctx.tableAlias()); - if (ctx.USING() != null) { - query = withRelations(query, ctx.relation()); - } - query = withFilter(query, Optional.of(ctx.whereClause())); + new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableName, + partitionSpec.second, partitionSpec.first)), ctx.tableAlias()); String tableAlias = null; if (ctx.tableAlias().strictIdentifier() != null) { tableAlias = ctx.tableAlias().getText(); } - Optional cte = Optional.empty(); - if (ctx.cte() != null) { - cte = Optional.ofNullable(withCte(query, ctx.cte())); + if (ctx.USING() == null && ctx.cte() == null && ctx.explain() == null) { + query = withFilter(query, Optional.of(ctx.whereClause())); + return new DeleteFromCommand(tableName, tableAlias, partitionSpec.first, partitionSpec.second, query); + } else { + // convert to insert into select + query = withRelations(query, ctx.relation()); + query = withFilter(query, Optional.of(ctx.whereClause())); + Optional cte = Optional.empty(); + if (ctx.cte() != null) { + cte = Optional.ofNullable(withCte(query, ctx.cte())); + } + return withExplain(new DeleteFromUsingCommand(tableName, tableAlias, + partitionSpec.first, partitionSpec.second, query, cte), ctx.explain()); } - return withExplain(new DeleteCommand(tableName, tableAlias, partitions, query, cte), ctx.explain()); } @Override @@ -2744,6 +2759,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { } private LogicalPlan withRelations(LogicalPlan inputPlan, List relations) { + if (relations == null) { + return inputPlan; + } LogicalPlan left = inputPlan; for (RelationContext relation : relations) { // build left deep join tree diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CommandUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CommandUtils.java new file mode 100644 index 0000000000..f9b0c3e18d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CommandUtils.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.util.RelationUtil; +import org.apache.doris.qe.ConnectContext; + +import java.util.List; + +/** + * delete from unique key table. + */ +public class CommandUtils { + + /** + * check delete target table should unique key olap table. If ok, return it. + */ + public static OlapTable checkAndGetDeleteTargetTable(ConnectContext ctx, List nameParts) { + List qualifiedTableName = RelationUtil.getQualifierName(ctx, nameParts); + TableIf table = RelationUtil.getTable(qualifiedTableName, ctx.getEnv()); + if (!(table instanceof OlapTable)) { + throw new AnalysisException("table must be olapTable in delete command"); + } + OlapTable targetTable = ((OlapTable) table); + if (targetTable.getKeysType() != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("Nereids only support delete command on unique key table now"); + } + return targetTable; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java new file mode 100644 index 0000000000..f77cdd1cc8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromCommand.java @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.Config; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.analyzer.UnboundRelation; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.trees.expressions.And; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; +import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalUnary; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.StmtExecutor; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.util.Lists; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * delete from unique key table. + */ +public class DeleteFromCommand extends Command implements ForwardWithSync { + + private final List nameParts; + private final String tableAlias; + private final boolean isTempPart; + private final List partitions; + private final LogicalPlan logicalQuery; + + /** + * constructor + */ + public DeleteFromCommand(List nameParts, String tableAlias, + boolean isTempPart, List partitions, LogicalPlan logicalQuery) { + super(PlanType.DELETE_COMMAND); + this.nameParts = Utils.copyRequiredList(nameParts); + this.tableAlias = tableAlias; + this.isTempPart = isTempPart; + this.partitions = Utils.copyRequiredList(partitions); + this.logicalQuery = logicalQuery; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); + turnOffForbidUnknownStats(ctx.getSessionVariable()); + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); + executor.setPlanner(planner); + executor.checkBlockRules(); + // if fe could do fold constant to get delete will do nothing for table, just return. + if (planner.getPhysicalPlan() instanceof PhysicalEmptyRelation) { + Env.getCurrentEnv() + .getDeleteHandler().processEmptyRelation(ctx.getState()); + return; + } + Optional> optFilter = (planner.getPhysicalPlan() + .>>collect(PhysicalFilter.class::isInstance)).stream() + .findAny(); + Optional optScan = (planner.getPhysicalPlan() + .>collect(PhysicalOlapScan.class::isInstance)).stream() + .findAny(); + Optional optRelation = (logicalQuery + .>collect(UnboundRelation.class::isInstance)).stream() + .findAny(); + Preconditions.checkArgument(optFilter.isPresent(), "delete command must contain filter"); + Preconditions.checkArgument(optScan.isPresent(), "delete command could be only used on olap table"); + Preconditions.checkArgument(optRelation.isPresent(), "delete command could be only used on olap table"); + PhysicalOlapScan scan = optScan.get(); + UnboundRelation relation = optRelation.get(); + PhysicalFilter filter = optFilter.get(); + + // predicate check + OlapTable olapTable = scan.getTable(); + Set columns = olapTable.getFullSchema().stream().map(Column::getName).collect(Collectors.toSet()); + try { + Plan plan = planner.getPhysicalPlan(); + checkSubQuery(plan); + for (Expression conjunct : filter.getConjuncts()) { + conjunct.>collect(SlotReference.class::isInstance) + .forEach(s -> checkColumn(columns, s, olapTable)); + checkPredicate(conjunct); + } + } catch (Exception e) { + new DeleteFromUsingCommand(nameParts, tableAlias, isTempPart, partitions, + logicalQuery, Optional.empty()).run(ctx, executor); + return; + } + + // call delete handler to process + List predicates = planner.getScanNodes().get(0).getConjuncts().stream() + .filter(c -> { + // filter predicate __DORIS_DELETE_SIGN__ = 0 + List slotRefs = Lists.newArrayList(); + c.collect(SlotRef.class::isInstance, slotRefs); + return slotRefs.stream().map(SlotRef.class::cast) + .noneMatch(s -> Column.DELETE_SIGN.equalsIgnoreCase(s.getColumnName())); + }) + .map(c -> { + if (c instanceof Predicate) { + return (Predicate) c; + } else { + throw new AnalysisException("non predicate in filter: " + c.toSql()); + } + }).collect(Collectors.toList()); + if (predicates.isEmpty()) { + // TODO this will delete all rows, however storage layer do not support true predicate now + // just throw exception to fallback until storage support true predicate. + throw new AnalysisException("delete all rows is forbidden temporary."); + } + Env.getCurrentEnv() + .getDeleteHandler() + .process((Database) scan.getDatabase(), scan.getTable(), + Lists.newArrayList(relation.getPartNames()), predicates, ctx.getState()); + } + + private void turnOffForbidUnknownStats(SessionVariable sessionVariable) { + sessionVariable.setIsSingleSetVar(true); + sessionVariable.setForbidUnownColStats(false); + } + + private void checkColumn(Set tableColumns, SlotReference slotReference, OlapTable table) { + // 0. must slot from table + if (!slotReference.getColumn().isPresent()) { + throw new AnalysisException(""); + } + Column column = slotReference.getColumn().get(); + + if (Column.DELETE_SIGN.equalsIgnoreCase(column.getName())) { + return; + } + // 1. shadow column + if (Column.isShadowColumn(column.getName())) { + throw new AnalysisException("Can not apply delete condition to shadow column " + column.getName()); + } + // 2. table has shadow column on table related to column in predicates + String shadowName = Column.getShadowName(column.getName()); + if (tableColumns.contains(shadowName)) { + throw new AnalysisException(String.format("Column '%s' is under" + + " schema change operation. Do not allow delete operation", shadowName)); + } + // 3. check column is primitive type + // TODO(Now we can not push down non-scala type like array/map/struct to storage layer because of + // predict_column in be not support non-scala type, so we just should ban this type in delete predict, when + // we delete predict_column in be we should delete this ban) + if (!column.getType().isScalarType()) { + throw new AnalysisException(String.format("Can not apply delete condition to column type: " + + column.getType())); + } + // 4. column should not float or double + if (slotReference.getDataType().isFloatLikeType()) { + throw new AnalysisException("Column[" + column.getName() + "] type is float or double."); + } + // 5. only contains key column if agg or mor + if (!column.isKey()) { + if (table.getKeysType() == KeysType.AGG_KEYS) { + throw new AnalysisException("delete predicate on value column only supports Unique table with" + + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName() + + "] is an Aggregate table."); + } else if (table.getKeysType() == KeysType.UNIQUE_KEYS && !table.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("delete predicate on value column only supports Unique table with" + + " merge-on-write enabled and Duplicate table, but " + "Table[" + table.getName() + + "] is an unique table without merge-on-write."); + } + } + } + + private void checkSubQuery(Plan plan) { + while (true) { + if (!(plan instanceof PhysicalDistribute + || plan instanceof PhysicalOlapScan + || plan instanceof PhysicalProject + || plan instanceof PhysicalFilter)) { + throw new AnalysisException("Where clause only supports compound predicate," + + " binary predicate, is_null predicate or in predicate."); + } + if (plan instanceof PhysicalOlapScan) { + break; + } + plan = ((PhysicalUnary) plan).child(); + } + } + + private void checkPredicate(Expression predicate) { + if (predicate instanceof And) { + checkPredicate(((And) predicate).left()); + checkPredicate(((And) predicate).right()); + } else if (predicate instanceof ComparisonPredicate) { + ComparisonPredicate cp = (ComparisonPredicate) predicate; + if (!(cp.left() instanceof SlotReference)) { + throw new AnalysisException( + "Left expr of binary predicate should be column name, predicate: " + predicate.toSql() + + ", left expr type:" + cp.left().getDataType()); + } + if (!(cp.right() instanceof Literal)) { + throw new AnalysisException( + "Right expr of binary predicate should be value, predicate: " + predicate.toSql() + + ", right expr type:" + cp.right().getDataType()); + } + } else if (predicate instanceof IsNull) { + if (!(((IsNull) predicate).child() instanceof SlotReference)) { + throw new AnalysisException( + "Child expr of is_null predicate should be column name, predicate: " + predicate.toSql()); + } + } else if (predicate instanceof Not) { + Expression child = ((Not) predicate).child(); + if (child instanceof IsNull) { + if (!(((IsNull) child).child() instanceof SlotReference)) { + throw new AnalysisException( + "Child expr of is_null predicate should be column name, predicate: " + predicate.toSql()); + } + } else { + throw new AnalysisException("Where clause only supports compound predicate," + + " binary predicate, is_null predicate or in predicate. But we meet " + + child.toSql()); + } + } else if (predicate instanceof InPredicate) { + InPredicate in = (InPredicate) predicate; + if (!(in.getCompareExpr() instanceof SlotReference)) { + throw new AnalysisException( + "Left expr of in predicate should be column name, predicate: " + predicate.toSql() + + ", left expr type:" + in.getCompareExpr().getDataType()); + } + int maxAllowedInElementNumOfDelete = Config.max_allowed_in_element_num_of_delete; + if (in.getOptions().size() > maxAllowedInElementNumOfDelete) { + throw new AnalysisException("Element num of in predicate should not be more than " + + maxAllowedInElementNumOfDelete); + } + for (Expression option : in.getOptions()) { + if (!(option instanceof Literal)) { + throw new AnalysisException("Child of in predicate should be value, but get " + option); + } + } + } else { + throw new AnalysisException("Where clause only supports compound predicate," + + " binary predicate, is_null predicate or in predicate. But we meet " + + predicate.toSql()); + } + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitDeleteFromCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java similarity index 79% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index 8c0568724b..8fadeb3b14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -18,12 +18,10 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSink; -import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; @@ -34,9 +32,9 @@ import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import com.google.common.collect.ImmutableList; @@ -48,50 +46,44 @@ import java.util.Optional; /** * delete from unique key table. */ -public class DeleteCommand extends Command implements ForwardWithSync, Explainable { +public class DeleteFromUsingCommand extends Command implements ForwardWithSync, Explainable { + private final List nameParts; private final String tableAlias; + private final boolean isTempPart; private final List partitions; - private LogicalPlan logicalQuery; - private OlapTable targetTable; private final Optional cte; + private final LogicalPlan logicalQuery; /** * constructor */ - public DeleteCommand(List nameParts, String tableAlias, List partitions, - LogicalPlan logicalQuery, Optional cte) { + public DeleteFromUsingCommand(List nameParts, String tableAlias, + boolean isTempPart, List partitions, LogicalPlan logicalQuery, Optional cte) { super(PlanType.DELETE_COMMAND); this.nameParts = Utils.copyRequiredList(nameParts); this.tableAlias = tableAlias; + this.isTempPart = isTempPart; this.partitions = Utils.copyRequiredList(partitions); - this.logicalQuery = logicalQuery; this.cte = cte; + this.logicalQuery = logicalQuery; } @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + if (ctx.getSessionVariable().isInDebugMode()) { + throw new AnalysisException("Delete is forbidden since current session is in debug mode." + + " Please check the following session variables: " + + String.join(", ", SessionVariable.DEBUG_VARIABLES)); + } new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor); } - private void checkTable(ConnectContext ctx) { - List qualifieredTableName = RelationUtil.getQualifierName(ctx, nameParts); - TableIf table = RelationUtil.getTable(qualifieredTableName, ctx.getEnv()); - if (!(table instanceof OlapTable)) { - throw new AnalysisException("table must be olapTable in delete command"); - } - targetTable = ((OlapTable) table); - if (targetTable.getKeysType() != KeysType.UNIQUE_KEYS) { - throw new AnalysisException("Nereids only support delete command on unique key table now"); - } - } - /** * public for test */ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuery) { - checkTable(ctx); - + OlapTable targetTable = CommandUtils.checkAndGetDeleteTargetTable(ctx, nameParts); // add select and insert node. List selectLists = Lists.newArrayList(); List cols = Lists.newArrayList(); @@ -122,7 +114,7 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab // make UnboundTableSink return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(), - false, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); + isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); } public LogicalPlan getLogicalQuery() { @@ -136,6 +128,6 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab @Override public R accept(PlanVisitor visitor, C context) { - return visitor.visitDeleteCommand(this, context); + return visitor.visitDeleteFromUsingCommand(this, context); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index d555bede4c..9dcba2ce9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -31,7 +31,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; -import org.apache.doris.nereids.trees.TreeNode; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; @@ -124,15 +123,18 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext()); NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); + executor.setPlanner(planner); executor.checkBlockRules(); if (ctx.getMysqlChannel() != null) { ctx.getMysqlChannel().reset(); } - Optional> plan = (planner.getPhysicalPlan() - .>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny(); + // TODO: support other type table insert into + Optional> plan = (planner.getPhysicalPlan() + .>>collect(PhysicalOlapTableSink.class::isInstance)).stream() + .findAny(); Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode"); - physicalOlapTableSink = ((PhysicalOlapTableSink) plan.get()); + physicalOlapTableSink = plan.get(); Table targetTable = physicalOlapTableSink.getTargetTable(); // check auth diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index c67606b0ea..e67b78bb4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -25,7 +25,8 @@ import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand; -import org.apache.doris.nereids.trees.plans.commands.DeleteCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand; import org.apache.doris.nereids.trees.plans.commands.DropConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; @@ -68,8 +69,12 @@ public interface CommandVisitor { return visitCommand(updateCommand, context); } - default R visitDeleteCommand(DeleteCommand deleteCommand, C context) { - return visitCommand(deleteCommand, context); + default R visitDeleteFromCommand(DeleteFromCommand deleteFromCommand, C context) { + return visitCommand(deleteFromCommand, context); + } + + default R visitDeleteFromUsingCommand(DeleteFromUsingCommand deleteFromUsingCommand, C context) { + return visitCommand(deleteFromUsingCommand, context); } default R visitLoadCommand(LoadCommand loadCommand, C context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 83b4d16411..6ef7d3e70a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -195,6 +195,10 @@ public class DatabaseTransactionMgr { return dbId; } + public TransactionIdGenerator getIdGenerator() { + return idGenerator; + } + protected TransactionState getTransactionState(Long transactionId) { readLock(); try { diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteCommandTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteFromUsingCommandTest.java similarity index 93% rename from fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteCommandTest.java rename to fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteFromUsingCommandTest.java index 34cff09554..74b9ad16f2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteCommandTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/DeleteFromUsingCommandTest.java @@ -19,7 +19,7 @@ package org.apache.doris.nereids.trees.plans; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.parser.NereidsParser; -import org.apache.doris.nereids.trees.plans.commands.DeleteCommand; +import org.apache.doris.nereids.trees.plans.commands.DeleteFromUsingCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.util.PlanChecker; import org.apache.doris.nereids.util.PlanPatternMatchSupported; @@ -28,7 +28,7 @@ import org.apache.doris.utframe.TestWithFeService; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class DeleteCommandTest extends TestWithFeService implements PlanPatternMatchSupported { +public class DeleteFromUsingCommandTest extends TestWithFeService implements PlanPatternMatchSupported { @Override protected void runBeforeAll() throws Exception { createDatabase("test"); @@ -73,8 +73,8 @@ public class DeleteCommandTest extends TestWithFeService implements PlanPatternM public void testFromClauseDelete() throws AnalysisException { String sql = "delete from t1 a using src join t2 on src.k1 = t2.k1 where t2.k1 = a.k1"; LogicalPlan parsed = new NereidsParser().parseSingle(sql); - Assertions.assertTrue(parsed instanceof DeleteCommand); - DeleteCommand command = ((DeleteCommand) parsed); + Assertions.assertTrue(parsed instanceof DeleteFromUsingCommand); + DeleteFromUsingCommand command = ((DeleteFromUsingCommand) parsed); LogicalPlan plan = command.completeQueryPlan(connectContext, command.getLogicalQuery()); PlanChecker.from(connectContext, plan) .analyze(plan) diff --git a/regression-test/data/nereids_p0/delete/delete_using.out b/regression-test/data/nereids_p0/delete/delete_using.out index 2734e5b4a3..fa3a048e0e 100644 --- a/regression-test/data/nereids_p0/delete/delete_using.out +++ b/regression-test/data/nereids_p0/delete/delete_using.out @@ -1,29 +1,25 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- -1 \N 2 1 1.0 \N -1 10 1 1 1.0 2000-01-01 -2 \N 4 2 2.0 \N -2 20 2 2 2.0 2000-01-02 -3 \N 6 3 3.0 \N -3 30 3 3 3.0 2000-01-03 +-- !original_data -- +1 2000-01-01 1 1 1.0 +2 2000-01-02 2 2 2.0 +3 2000-01-03 3 3 3.0 --- !sql -- -2 \N 4 2 2.0 \N -2 20 2 2 2.0 2000-01-02 -3 \N 6 3 3.0 \N -3 30 3 3 3.0 2000-01-03 +-- !after_delete_from_partition -- +1 2000-01-01 1 1 1.0 +3 2000-01-03 3 3 3.0 --- !sql -- -1 \N 2 1 1.0 \N -1 10 1 1 1.0 2000-01-01 -2 \N 4 2 2.0 \N -2 20 2 2 2.0 2000-01-02 -3 \N 6 3 3.0 \N -3 30 3 3 3.0 2000-01-03 +-- !after_delete -- +3 2000-01-03 3 3 3.0 --- !sql -- -2 \N 4 2 2.0 \N -2 20 2 2 2.0 2000-01-02 -3 \N 6 3 3.0 \N -3 30 3 3 3.0 2000-01-03 +-- !original_data -- +1 2000-01-01 1 1 1.0 +2 2000-01-02 2 2 2.0 +3 2000-01-03 3 3 3.0 + +-- !after_delete_from_partition -- +1 2000-01-01 1 1 1.0 +3 2000-01-03 3 3 3.0 + +-- !after_delete -- +3 2000-01-03 3 3 3.0 diff --git a/regression-test/suites/nereids_p0/delete/delete_using.groovy b/regression-test/suites/nereids_p0/delete/delete_using.groovy index a4f9639c57..061d748d8a 100644 --- a/regression-test/suites/nereids_p0/delete/delete_using.groovy +++ b/regression-test/suites/nereids_p0/delete/delete_using.groovy @@ -30,26 +30,30 @@ suite('nereids_delete_using') { sql """ create table t1 ( id int, - id1 int, + dt date, c1 bigint, c2 string, - c3 double, - c4 date - ) unique key (id, id1) - distributed by hash(id, id1) + c3 double + ) unique key (id, dt) + partition by range(dt) ( + from ("2000-01-01") TO ("2000-01-31") INTERVAL 1 DAY + ) + distributed by hash(id) properties( 'replication_num'='1', "enable_unique_key_merge_on_write" = "true", - "store_row_column" = "${use_row_store}"); """ + "store_row_column" = "${use_row_store}" + ); + """ sql 'drop table if exists t2' sql ''' create table t2 ( id int, + dt date, c1 bigint, c2 string, - c3 double, - c4 date + c3 double ) unique key (id) distributed by hash(id) properties( @@ -69,24 +73,25 @@ suite('nereids_delete_using') { sql ''' INSERT INTO t1 VALUES - (1, 10, 1, '1', 1.0, '2000-01-01'), - (2, 20, 2, '2', 2.0, '2000-01-02'), - (3, 30, 3, '3', 3.0, '2000-01-03'); + (1, '2000-01-01', 1, '1', 1.0), + (2, '2000-01-02', 2, '2', 2.0), + (3, '2000-01-03', 3, '3', 3.0); ''' sql ''' INSERT INTO t2 VALUES - (1, 10, '10', 10.0, '2000-01-10'), - (2, 20, '20', 20.0, '2000-01-20'), - (3, 30, '30', 30.0, '2000-01-30'), - (4, 4, '4', 4.0, '2000-01-04'), - (5, 5, '5', 5.0, '2000-01-05'); + (1, '2000-01-10', 10, '10', 10.0), + (2, '2000-01-20', 20, '20', 20.0), + (3, '2000-01-30', 30, '30', 30.0), + (4, '2000-01-04', 4, '4', 4.0), + (5, '2000-01-05', 5, '5', 5.0); ''' sql ''' INSERT INTO t3 VALUES (1), + (2), (4), (5); ''' @@ -95,11 +100,24 @@ suite('nereids_delete_using') { sql 'set enable_fallback_to_original_planner=false' sql 'set enable_nereids_dml=true' - sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1' - sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2' - sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from t1 order by id, c1 limit 10) t1, t3' + qt_original_data 'select * from t1 order by id, dt' - qt_sql 'select * from t1 order by id, id1' + test { + sql ''' + delete from t1 temporary partition (p_20000102) + using t2 join t3 on t2.id = t3.id + where t1.id = t2.id; + ''' + exception 'Partition: p_20000102 is not exists' + } + + sql ''' + delete from t1 partition (p_20000102) + using t2 join t3 on t2.id = t3.id + where t1.id = t2.id; + ''' + + qt_after_delete_from_partition 'select * from t1 order by id, dt' sql ''' delete from t1 @@ -107,7 +125,7 @@ suite('nereids_delete_using') { where t1.id = t2.id; ''' - qt_sql 'select * from t1 order by id, id1' + qt_after_delete 'select * from t1 order by id, dt' } } } diff --git a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy index 08d2f4b67b..c68ac24c74 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/no_partition.groovy @@ -16,6 +16,8 @@ // under the License. suite('nereids_insert_no_partition') { + // TODO: reopen this case in conf, currently delete fill failed + // because nereids generate a true predicate and be could not support it. sql 'use nereids_insert_into_table_test' sql 'clean label from nereids_insert_into_table_test' @@ -289,9 +291,19 @@ suite('nereids_insert_no_partition') { sql '''delete from uni_mow_nop_t where id is null''' sql '''delete from uni_light_sc_mow_nop_t where id is not null''' sql '''delete from uni_light_sc_mow_nop_t where id is null''' + + // TODO turn off fallback when storage layer support true predicate + sql '''set enable_fallback_to_original_planner=true''' sql '''delete from uni_mow_not_null_nop_t where id is not null''' + sql '''set enable_fallback_to_original_planner=false''' + sql '''delete from uni_mow_not_null_nop_t where id is null''' + + // TODO turn off fallback when storage layer support true predicate + sql '''set enable_fallback_to_original_planner=true''' sql '''delete from uni_light_sc_mow_not_null_nop_t where id is not null''' + sql '''set enable_fallback_to_original_planner=false''' + sql '''delete from uni_light_sc_mow_not_null_nop_t where id is null''' sql 'alter table agg_light_sc_nop_t rename column ktinyint ktint' sql 'alter table agg_light_sc_not_null_nop_t rename column ktinyint ktint'