[refactor](Nereids) compatible with all ability legacy planner (#27947)

refactor:
1. split InsertIntoTableCommand into three sub command
- InsertIntoTableCommand
- InsertOverwriteTableCommand
- BatchInsertIntoTableCommand

feature:
1. support DEFAULT keywords in values list
2. support empty values list
3. support temporary partition
4. support insert into values in txn model

fix:
1. should start transaction before release read lock on target table
This commit is contained in:
morrySnow
2023-12-05 19:10:55 +08:00
committed by GitHub
parent 8e161ad0f2
commit e79422addc
36 changed files with 1628 additions and 654 deletions

View File

@ -23,9 +23,9 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.jobs.Job;
@ -403,12 +403,12 @@ public class CascadesContext implements ScheduleContext {
tableNames.addAll(extractTableNamesFromOneRowRelation((UnboundOneRowRelation) p));
} else {
Set<LogicalPlan> logicalPlans = p.collect(
n -> (n instanceof UnboundRelation || n instanceof UnboundOlapTableSink));
n -> (n instanceof UnboundRelation || n instanceof UnboundTableSink));
for (LogicalPlan plan : logicalPlans) {
if (plan instanceof UnboundRelation) {
tableNames.add(((UnboundRelation) plan).getNameParts());
} else if (plan instanceof UnboundOlapTableSink) {
tableNames.add(((UnboundOlapTableSink<?>) plan).getNameParts());
} else if (plan instanceof UnboundTableSink) {
tableNames.add(((UnboundTableSink<?>) plan).getNameParts());
} else {
throw new AnalysisException("get tables from plan failed. meet unknown type node " + plan);
}

View File

@ -41,44 +41,61 @@ import java.util.Optional;
/**
* Represent an olap table sink plan node that has not been bound.
*/
public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
implements Unbound, Sink, BlockFuncDepsPropagation {
private final List<String> nameParts;
private final List<String> colNames;
private final List<String> hints;
private final boolean temporaryPartition;
private final List<String> partitions;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, false, false, Optional.empty(), Optional.empty(), child);
this(nameParts, colNames, hints, false, partitions,
false, false, Optional.empty(), Optional.empty(), child);
}
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions,
false, false, Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, false,
this(nameParts, colNames, hints, false, partitions, isPartialUpdate, false,
Optional.empty(), Optional.empty(), child);
}
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, isFromNativeInsertStmt,
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, false,
Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions,
boolean isPartialUpdate, boolean isFromNativeInsertStmt, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, isFromNativeInsertStmt,
Optional.empty(), Optional.empty(), child);
}
/**
* constructor
*/
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions,
boolean isPartialUpdate, boolean isFromNativeInsertStmt,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
this.nameParts = Utils.copyRequiredList(nameParts);
this.colNames = Utils.copyRequiredList(colNames);
this.hints = Utils.copyRequiredList(hints);
this.temporaryPartition = temporaryPartition;
this.partitions = Utils.copyRequiredList(partitions);
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
@ -92,6 +109,10 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
return nameParts;
}
public boolean isTemporaryPartition() {
return temporaryPartition;
}
public List<String> getPartitions() {
return partitions;
}
@ -111,13 +132,13 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.empty(), children.get(0));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundOlapTableSink(this, context);
return visitor.visitUnboundTableSink(this, context);
}
@Override
@ -133,7 +154,7 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
if (o == null || getClass() != o.getClass()) {
return false;
}
UnboundOlapTableSink<?> that = (UnboundOlapTableSink<?>) o;
UnboundTableSink<?> that = (UnboundTableSink<?>) o;
return Objects.equals(nameParts, that.nameParts)
&& Objects.equals(colNames, that.colNames)
&& Objects.equals(hints, that.hints)
@ -147,14 +168,14 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions,
isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0));
}

View File

@ -103,6 +103,7 @@ import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughUnion;
import org.apache.doris.nereids.rules.rewrite.PushDownTopNThroughWindow;
import org.apache.doris.nereids.rules.rewrite.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.PushProjectIntoOneRowRelation;
import org.apache.doris.nereids.rules.rewrite.PushProjectIntoUnion;
import org.apache.doris.nereids.rules.rewrite.PushProjectThroughUnion;
import org.apache.doris.nereids.rules.rewrite.ReorderJoin;
import org.apache.doris.nereids.rules.rewrite.RewriteCteChildren;
@ -263,6 +264,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
bottomUp(new MergeSetOperations()),
bottomUp(new PushProjectIntoOneRowRelation()),
topDown(new MergeOneRowRelationIntoUnion()),
topDown(new PushProjectIntoUnion()),
costBased(topDown(new InferSetOperatorDistinct())),
topDown(new BuildAggForUnion())
),

View File

@ -95,7 +95,7 @@ import org.apache.doris.nereids.DorisParser.InPartitionDefContext;
import org.apache.doris.nereids.DorisParser.IndexDefContext;
import org.apache.doris.nereids.DorisParser.IndexDefsContext;
import org.apache.doris.nereids.DorisParser.InlineTableContext;
import org.apache.doris.nereids.DorisParser.InsertIntoQueryContext;
import org.apache.doris.nereids.DorisParser.InsertTableContext;
import org.apache.doris.nereids.DorisParser.IntegerLiteralContext;
import org.apache.doris.nereids.DorisParser.IntervalContext;
import org.apache.doris.nereids.DorisParser.Is_not_null_predContext;
@ -117,6 +117,7 @@ import org.apache.doris.nereids.DorisParser.NamedExpressionSeqContext;
import org.apache.doris.nereids.DorisParser.NullLiteralContext;
import org.apache.doris.nereids.DorisParser.OutFileClauseContext;
import org.apache.doris.nereids.DorisParser.ParenthesizedExpressionContext;
import org.apache.doris.nereids.DorisParser.PartitionSpecContext;
import org.apache.doris.nereids.DorisParser.PartitionValueDefContext;
import org.apache.doris.nereids.DorisParser.PartitionsDefContext;
import org.apache.doris.nereids.DorisParser.PlanTypeContext;
@ -141,6 +142,7 @@ import org.apache.doris.nereids.DorisParser.RelationContext;
import org.apache.doris.nereids.DorisParser.RollupDefContext;
import org.apache.doris.nereids.DorisParser.RollupDefsContext;
import org.apache.doris.nereids.DorisParser.RowConstructorContext;
import org.apache.doris.nereids.DorisParser.RowConstructorItemContext;
import org.apache.doris.nereids.DorisParser.SampleByPercentileContext;
import org.apache.doris.nereids.DorisParser.SampleByRowsContext;
import org.apache.doris.nereids.DorisParser.SampleContext;
@ -181,13 +183,13 @@ import org.apache.doris.nereids.DorisParserBaseVisitor;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundVariable;
import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -199,7 +201,6 @@ import org.apache.doris.nereids.properties.SelectHintOrdered;
import org.apache.doris.nereids.properties.SelectHintSetVar;
import org.apache.doris.nereids.trees.TableSample;
import org.apache.doris.nereids.trees.expressions.Add;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BitAnd;
import org.apache.doris.nereids.trees.expressions.BitNot;
@ -207,6 +208,7 @@ import org.apache.doris.nereids.trees.expressions.BitOr;
import org.apache.doris.nereids.trees.expressions.BitXor;
import org.apache.doris.nereids.trees.expressions.CaseWhen;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.DefaultValueSlot;
import org.apache.doris.nereids.trees.expressions.Divide;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Exists;
@ -314,6 +316,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.Constraint;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
@ -326,6 +329,7 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@ -359,6 +363,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
@ -450,12 +455,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
@Override
public LogicalPlan visitInsertIntoQuery(InsertIntoQueryContext ctx) {
public LogicalPlan visitInsertTable(InsertTableContext ctx) {
boolean isOverwrite = ctx.INTO() == null;
List<String> tableName = new ArrayList<>();
ImmutableList.Builder<String> tableName = ImmutableList.builder();
if (null != ctx.tableName) {
tableName = visitMultipartIdentifier(ctx.tableName);
tableName.addAll(visitMultipartIdentifier(ctx.tableName));
} else if (null != ctx.tableId) {
// process group commit insert table command send by be
TableName name = Env.getCurrentEnv().getInternalCatalog()
.getTableNameByTableId(Long.valueOf(ctx.tableId.getText()));
tableName.add(name.getDb());
@ -463,21 +469,44 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
} else {
throw new ParseException("tableName and tableId cannot both be null");
}
String labelName = ctx.labelName == null ? null : ctx.labelName.getText();
Optional<String> labelName = ctx.labelName == null ? Optional.empty() : Optional.of(ctx.labelName.getText());
List<String> colNames = ctx.cols == null ? ImmutableList.of() : visitIdentifierList(ctx.cols);
List<String> partitions = ctx.partition == null ? ImmutableList.of() : visitIdentifierList(ctx.partition);
UnboundOlapTableSink<?> sink = new UnboundOlapTableSink<>(
tableName,
// TODO visit partitionSpecCtx
PartitionSpecContext partitionSpecCtx = ctx.partitionSpec();
List<String> partitions = ImmutableList.of();
boolean temporaryPartition = false;
if (partitionSpecCtx != null) {
temporaryPartition = partitionSpecCtx.TEMPORARY() != null;
if (partitionSpecCtx.partition != null) {
partitions = ImmutableList.of(partitionSpecCtx.partition.getText());
} else {
partitions = visitIdentifierList(partitionSpecCtx.partitions);
}
}
LogicalPlan plan = ctx.query() != null ? visitQuery(ctx.query()) : visitInlineTable(ctx.inlineTable());
UnboundTableSink<?> sink = new UnboundTableSink<>(
tableName.build(),
colNames,
ImmutableList.of(),
temporaryPartition,
partitions,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
true,
visitQuery(ctx.query()));
if (ctx.explain() != null) {
return withExplain(sink, ctx.explain());
plan);
LogicalPlan command;
if (isOverwrite) {
command = new InsertOverwriteTableCommand(sink, labelName);
} else {
if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()) {
command = new BatchInsertIntoTableCommand(sink);
} else {
command = new InsertIntoTableCommand(sink, labelName);
}
}
return new InsertIntoTableCommand(sink, Optional.ofNullable(labelName), isOverwrite);
if (ctx.explain() != null) {
return withExplain(command, ctx.explain());
}
return command;
}
@Override
@ -998,11 +1027,14 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
}
private LogicalPlan logicalPlanCombiner(LogicalPlan left, LogicalPlan right, Qualifier qualifier) {
private static LogicalPlan logicalPlanCombiner(LogicalPlan left, LogicalPlan right, Qualifier qualifier) {
return new LogicalUnion(qualifier, ImmutableList.of(left, right));
}
private LogicalPlan reduceToLogicalPlanTree(int low, int high,
/**
* construct avl union tree
*/
public static LogicalPlan reduceToLogicalPlanTree(int low, int high,
List<LogicalPlan> logicalPlans, Qualifier qualifier) {
switch (high - low) {
case 0:
@ -1052,11 +1084,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
@Override
public LogicalPlan visitInlineTable(InlineTableContext ctx) {
List<LogicalPlan> exprsList = ctx.rowConstructor().stream()
List<List<NamedExpression>> values = ctx.rowConstructor().stream()
.map(this::visitRowConstructor)
.map(LogicalPlan.class::cast)
.collect(ImmutableList.toImmutableList());
return reduceToLogicalPlanTree(0, exprsList.size() - 1, exprsList, Qualifier.ALL);
return new LogicalInlineTable(values);
}
/**
@ -1166,11 +1197,15 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
* supported.
*/
@Override
public Expression visitNamedExpression(NamedExpressionContext ctx) {
public NamedExpression visitNamedExpression(NamedExpressionContext ctx) {
return ParserUtils.withOrigin(ctx, () -> {
Expression expression = getExpression(ctx.expression());
if (ctx.identifierOrText() == null) {
return expression;
if (expression instanceof NamedExpression) {
return (NamedExpression) expression;
} else {
return new UnboundAlias(expression);
}
}
String alias = visitIdentifierOrText(ctx.identifierOrText());
return new UnboundAlias(expression, alias);
@ -2003,15 +2038,19 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
@Override
public UnboundOneRowRelation visitRowConstructor(RowConstructorContext ctx) {
return new UnboundOneRowRelation(
StatementScopeIdGenerator.newRelationId(),
ctx.namedExpression().stream()
.map(this::visitNamedExpression)
.map(e -> (e instanceof NamedExpression)
? ((NamedExpression) e)
: new Alias(e, e.toSql()))
.collect(ImmutableList.toImmutableList()));
public List<NamedExpression> visitRowConstructor(RowConstructorContext ctx) {
return ctx.rowConstructorItem().stream()
.map(this::visitRowConstructorItem)
.collect(ImmutableList.toImmutableList());
}
@Override
public NamedExpression visitRowConstructorItem(RowConstructorItemContext ctx) {
if (ctx.DEFAULT() != null) {
return new DefaultValueSlot();
} else {
return visitNamedExpression(ctx.namedExpression());
}
}
@Override
@ -2790,16 +2829,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
private List<NamedExpression> getNamedExpressions(NamedExpressionSeqContext namedCtx) {
return ParserUtils.withOrigin(namedCtx, () -> {
List<Expression> expressions = visit(namedCtx.namedExpression(), Expression.class);
return expressions.stream().map(expression -> {
if (expression instanceof NamedExpression) {
return (NamedExpression) expression;
} else {
return new UnboundAlias(expression);
}
}).collect(ImmutableList.toImmutableList());
});
return ParserUtils.withOrigin(namedCtx, () -> visit(namedCtx.namedExpression(), NamedExpression.class));
}
@Override

View File

@ -20,7 +20,7 @@ package org.apache.doris.nereids.processor.pre;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
@ -33,10 +33,10 @@ import org.apache.doris.qe.VariableMgr;
public class TurnOffPipelineForDml extends PlanPreprocessor {
@Override
public Plan visitUnboundOlapTableSink(UnboundOlapTableSink<? extends Plan> unboundOlapTableSink,
public Plan visitUnboundTableSink(UnboundTableSink<? extends Plan> unboundTableSink,
StatementContext context) {
turnOffPipeline(context);
return unboundOlapTableSink;
return unboundTableSink;
}
@Override

View File

@ -30,7 +30,8 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_NON_LEAF_LOGICAL_PLAN(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
BINDING_RELATION(RuleTypeClass.REWRITE),
BINDING_PROJECT_SLOT(RuleTypeClass.REWRITE),
@ -44,21 +45,11 @@ public enum RuleType {
BINDING_SORT_SET_OPERATION_SLOT(RuleTypeClass.REWRITE),
BINDING_LIMIT_SLOT(RuleTypeClass.REWRITE),
BINDING_GENERATE_SLOT(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_FUNCTION(RuleTypeClass.REWRITE),
BINDING_PROJECT_FUNCTION(RuleTypeClass.REWRITE),
BINDING_AGGREGATE_FUNCTION(RuleTypeClass.REWRITE),
BINDING_REPEAT_FUNCTION(RuleTypeClass.REWRITE),
BINDING_SUBQUERY_ALIAS_SLOT(RuleTypeClass.REWRITE),
BINDING_FILTER_FUNCTION(RuleTypeClass.REWRITE),
BINDING_HAVING_FUNCTION(RuleTypeClass.REWRITE),
BINDING_SORT_FUNCTION(RuleTypeClass.REWRITE),
BINDING_JOIN_FUNCTION(RuleTypeClass.REWRITE),
BINDING_UNBOUND_TVF_RELATION_FUNCTION(RuleTypeClass.REWRITE),
BINDING_SET_OPERATION_SLOT(RuleTypeClass.REWRITE),
BINDING_GENERATE_FUNCTION(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INLINE_TABLE_SLOT(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
COUNT_LITERAL_REWRITE(RuleTypeClass.REWRITE),
REPLACE_SORT_EXPRESSION_BY_CHILD_OUTPUT(RuleTypeClass.REWRITE),
@ -243,6 +234,7 @@ public enum RuleType {
PUSH_PROJECT_THROUGH_UNION(RuleTypeClass.REWRITE),
MERGE_ONE_ROW_RELATION_INTO_UNION(RuleTypeClass.REWRITE),
PUSH_PROJECT_INTO_ONE_ROW_RELATION(RuleTypeClass.REWRITE),
PUSH_PROJECT_INTO_UNION(RuleTypeClass.REWRITE),
MERGE_SET_OPERATION(RuleTypeClass.REWRITE),
BUILD_AGG_FOR_UNION(RuleTypeClass.REWRITE),
COUNT_DISTINCT_REWRITE(RuleTypeClass.REWRITE),

View File

@ -63,6 +63,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
@ -517,6 +518,16 @@ public class BindExpression implements AnalysisRuleFactory {
return new LogicalOneRowRelation(oneRowRelation.getRelationId(), projects);
})
),
RuleType.BINDING_INLINE_TABLE_SLOT.build(
logicalInlineTable().thenApply(ctx -> {
LogicalInlineTable logicalInlineTable = ctx.root;
// ensure all expressions are valid.
logicalInlineTable.getExpressions().forEach(expr ->
bindSlot(expr, ImmutableList.of(), ctx.cascadesContext, false)
);
return null;
})
),
RuleType.BINDING_SET_OPERATION_SLOT.build(
// LogicalSetOperation don't bind again if LogicalSetOperation.outputs is not empty, this is special
// we should not remove LogicalSetOperation::canBind, because in default case, the plan can run into

View File

@ -27,8 +27,8 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.Rule;
@ -37,6 +37,7 @@ import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.expression.rules.FunctionBinder;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.DefaultValueSlot;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Substring;
@ -45,6 +46,7 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.types.DataType;
@ -71,8 +73,8 @@ public class BindSink implements AnalysisRuleFactory {
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundOlapTableSink().thenApply(ctx -> {
UnboundOlapTableSink<?> sink = ctx.root;
RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(ctx -> {
UnboundTableSink<?> sink = ctx.root;
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
@ -93,13 +95,13 @@ public class BindSink implements AnalysisRuleFactory {
database,
table,
bindColumns,
bindPartitionIds(table, sink.getPartitions()),
bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
isPartialUpdate,
sink.isFromNativeInsertStmt(),
sink.child());
child);
if (isPartialUpdate) {
// check the necessary conditions for partial updates
@ -129,7 +131,6 @@ public class BindSink implements AnalysisRuleFactory {
// we need to insert all the columns of the target table
// although some columns are not mentions.
// so we add a projects to supply the default value.
if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
@ -179,6 +180,10 @@ public class BindSink implements AnalysisRuleFactory {
throw new AnalysisException(e.getMessage(), e.getCause());
}
// we need to insert all the columns of the target table
// although some columns are not mentions.
// so we add a projects to supply the default value.
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
@ -208,7 +213,10 @@ public class BindSink implements AnalysisRuleFactory {
}
NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), slot);
} else if (columnToChildOutput.containsKey(column)) {
} else if (columnToChildOutput.containsKey(column)
// do not process explicitly use DEFAULT value here:
// insert into table t values(DEFAULT)
&& !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
columnToOutput.put(column.getName(), columnToChildOutput.get(column));
} else {
if (table.hasSequenceCol()
@ -243,6 +251,12 @@ public class BindSink implements AnalysisRuleFactory {
continue;
}
} else if (column.getDefaultValue() == null) {
// throw exception if explicitly use Default value but no default value present
// insert into table t values(DEFAULT)
if (columnToChildOutput.get(column) instanceof DefaultValueSlot) {
throw new AnalysisException("Column has no default value,"
+ " column=" + column.getName());
}
// Otherwise, the unmentioned columns should be filled with default values
// or null values
columnToOutput.put(column.getName(), new Alias(
@ -277,8 +291,14 @@ public class BindSink implements AnalysisRuleFactory {
}
}
List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, boundSink.child());
if (child instanceof LogicalOneRowRelation) {
// remove default value slot in one row relation
child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
.getProjects().stream()
.filter(p -> !(p instanceof DefaultValueSlot))
.collect(ImmutableList.toImmutableList()));
}
LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
// add cast project
List<NamedExpression> castExprs = Lists.newArrayList();
@ -328,7 +348,7 @@ public class BindSink implements AnalysisRuleFactory {
);
}
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundOlapTableSink<? extends Plan> sink) {
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
@ -344,11 +364,11 @@ public class BindSink implements AnalysisRuleFactory {
return Pair.of(((Database) pair.first), (OlapTable) pair.second);
}
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions) {
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()
: partitions.stream().map(pn -> {
Partition partition = table.getPartition(pn);
Partition partition = table.getPartition(pn, temp);
if (partition == null) {
throw new AnalysisException(String.format("partition %s is not found in table %s",
pn, table.getName()));

View File

@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
* Project(Union) -> Union, if union with all qualifier and without children.
*/
public class PushProjectIntoUnion extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalProject(logicalUnion()
.when(u -> u.getQualifier() == Qualifier.ALL)
.when(u -> u.arity() == 0)
).then(p -> {
LogicalUnion union = p.child();
ImmutableList.Builder<List<NamedExpression>> newConstExprs = ImmutableList.builder();
for (List<NamedExpression> constExprs : union.getConstantExprsList()) {
Map<Expression, Expression> replaceMap = Maps.newHashMap();
Map<Expression, NamedExpression> replaceRootMap = Maps.newHashMap();
for (int i = 0; i < constExprs.size(); i++) {
NamedExpression ne = constExprs.get(i);
if (ne instanceof Alias) {
replaceMap.put(union.getOutput().get(i), ((Alias) ne).child());
} else {
replaceMap.put(union.getOutput().get(i), ne);
}
replaceRootMap.put(union.getOutput().get(i), ne);
}
ImmutableList.Builder<NamedExpression> newProjections = ImmutableList.builder();
for (NamedExpression old : p.getProjects()) {
if (old instanceof SlotReference) {
newProjections.add(replaceRootMap.get(old));
} else {
newProjections.add(ExpressionUtils.replace(old, replaceMap));
}
}
newConstExprs.add(newProjections.build());
}
return p.child()
.withChildrenAndConstExprsList(ImmutableList.of(), ImmutableList.of(), newConstExprs.build())
.withNewOutputs(p.getOutputs());
}).toRule(RuleType.PUSH_PROJECT_INTO_UNION);
}
}

View File

@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.expressions;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.NullType;
/**
* only use for insert into t values(DEFAULT, ...)
*/
public class DefaultValueSlot extends Slot {
public DefaultValueSlot() {
super();
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitDefaultValue(this, context);
}
@Override
public boolean nullable() {
return false;
}
@Override
public String getName() throws UnboundException {
return "$default";
}
@Override
public DataType getDataType() throws UnboundException {
return NullType.INSTANCE;
}
@Override
public Slot toSlot() throws UnboundException {
return this;
}
@Override
public String toString() {
return "DEFAULT_VALUE";
}
}

View File

@ -40,6 +40,7 @@ import org.apache.doris.nereids.trees.expressions.CaseWhen;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.DefaultValueSlot;
import org.apache.doris.nereids.trees.expressions.Divide;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Exists;
@ -221,6 +222,10 @@ public abstract class ExpressionVisitor<R, C>
return visitSlot(slotReference, context);
}
public R visitDefaultValue(DefaultValueSlot defaultValueSlot, C context) {
return visitSlot(defaultValueSlot, context);
}
public R visitArrayItemSlot(ArrayItemReference.ArrayItemSlot arrayItemSlot, C context) {
return visitSlotReference(arrayItemSlot, context);
}

View File

@ -60,6 +60,7 @@ public enum PlanType {
LOGICAL_FILTER,
LOGICAL_GENERATE,
LOGICAL_HAVING,
LOGICAL_INLINE_TABLE,
LOGICAL_INTERSECT,
LOGICAL_JOIN,
LOGICAL_LIMIT,
@ -121,6 +122,8 @@ public enum PlanType {
EXPLAIN_COMMAND,
EXPORT_COMMAND,
INSERT_INTO_TABLE_COMMAND,
BATCH_INSERT_INTO_TABLE_COMMAND,
INSERT_OVERWRITE_TABLE_COMMAND,
LOAD_COMMAND,
SELECT_INTO_OUTFILE_COMMAND,
UPDATE_COMMAND,

View File

@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* insert into values with in txn model.
*/
public class BatchInsertIntoTableCommand extends Command implements ForwardWithSync, Explainable {
public static final Logger LOG = LogManager.getLogger(BatchInsertIntoTableCommand.class);
private LogicalPlan logicalQuery;
public BatchInsertIntoTableCommand(LogicalPlan logicalQuery) {
super(PlanType.BATCH_INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
}
@Override
public Plan getExplainPlan(ConnectContext ctx) throws Exception {
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitBatchInsertIntoTableCommand(this, context);
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) logicalQuery;
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
throw new AnalysisException("Insert into ** select is not supported in a transaction");
}
PhysicalOlapTableSink<?> sink;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
targetTableIf.readLock();
try {
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Optional<TreeNode<?>> plan = planner.getPhysicalPlan()
.<Set<TreeNode<?>>>collect(PhysicalOlapTableSink.class::isInstance).stream().findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
sink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = sink.getTargetTable();
// should set columns of sink since we maybe generate some invisible columns
List<Column> fullSchema = sink.getTargetTable().getFullSchema();
List<Column> targetSchema = Lists.newArrayList();
if (sink.isPartialUpdate()) {
List<String> partialUpdateColumns = sink.getCols().stream()
.map(Column::getName)
.collect(Collectors.toList());
for (Column column : fullSchema) {
if (partialUpdateColumns.contains(column.getName())) {
targetSchema.add(column);
}
}
} else {
targetSchema = fullSchema;
}
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
Optional<PhysicalUnion> union = planner.getPhysicalPlan()
.<Set<PhysicalUnion>>collect(PhysicalUnion.class::isInstance).stream().findAny();
if (union.isPresent()) {
InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
targetTable.getName(), targetSchema, union.get().getConstantExprsList());
return;
}
Optional<PhysicalOneRowRelation> oneRowRelation = planner.getPhysicalPlan()
.<Set<PhysicalOneRowRelation>>collect(PhysicalOneRowRelation.class::isInstance).stream().findAny();
if (oneRowRelation.isPresent()) {
InsertExecutor.executeBatchInsertTransaction(ctx, targetTable.getQualifiedDbName(),
targetTable.getName(), targetSchema, ImmutableList.of(oneRowRelation.get().getProjects()));
return;
}
// TODO: update error msg
throw new AnalysisException("could not run this sql");
} finally {
targetTableIf.readUnlock();
}
}
}

View File

@ -43,9 +43,7 @@ public abstract class Command extends AbstractPlan implements LogicalPlan, Block
super(type, Optional.empty(), Optional.empty(), null, ImmutableList.of());
}
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// all command should impl this interface.
}
public abstract void run(ConnectContext ctx, StmtExecutor executor) throws Exception;
@Override
public Optional<GroupExpression> getGroupExpression() {

View File

@ -24,7 +24,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
@ -126,10 +126,10 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
throw new AnalysisException(e.getMessage(), e.getCause());
}
query = new UnboundOlapTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(),
query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(),
ImmutableList.of(), query);
try {
new InsertIntoTableCommand(query, Optional.empty(), false).run(ctx, executor);
new InsertIntoTableCommand(query, Optional.empty()).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
handleFallbackFailedCtas(ctx);
}

View File

@ -21,8 +21,8 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@ -70,7 +70,7 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), false).run(ctx, executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor);
}
private void checkTable(ConnectContext ctx) {
@ -120,7 +120,7 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
&& cols.size() < targetTable.getColumns().size();
// make UnboundTableSink
return new UnboundOlapTableSink<>(nameParts, cols, ImmutableList.of(),
return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(),
partitions, isPartialUpdate, logicalQuery);
}

View File

@ -69,9 +69,9 @@ public class ExplainCommand extends Command implements NoForward {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
LogicalPlan explainPlan = null;
LogicalPlan explainPlan;
if (!(logicalPlan instanceof Explainable)) {
throw new AnalysisException("explain a plan cannot be explained");
throw new AnalysisException(logicalPlan.getClass().getSimpleName() + " cannot be explained");
}
explainPlan = ((LogicalPlan) ((Explainable) logicalPlan).getExplainPlan(ctx));
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(explainPlan, ctx.getStatementContext());

View File

@ -0,0 +1,591 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.LogicalPlanBuilder;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.DefaultValueSlot;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.literal.ArrayLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.stream.Collectors;
/**
* transaction wrapper for Nereids
*/
public class InsertExecutor {
private static final Logger LOG = LogManager.getLogger(InsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private final ConnectContext ctx;
private final Coordinator coordinator;
private final String labelName;
private final Database database;
private final Table table;
private final long createAt = System.currentTimeMillis();
private long loadedRows = 0;
private int filteredRows = 0;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
private String errMsg = "";
/**
* constructor
*/
public InsertExecutor(ConnectContext ctx, Database database, Table table,
String labelName, NereidsPlanner planner) {
this.ctx = ctx;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.database = database;
this.table = table;
}
public long getTxnId() {
return txnId;
}
/**
* begin transaction if necessary
*/
public void beginTransaction() {
if (!(table instanceof OlapTable)) {
return;
}
try {
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
} catch (Exception e) {
throw new AnalysisException("begin transaction failed. " + e.getMessage(), e);
}
}
/**
* finalize sink to complete enough info for sink execution
*/
public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromInsert) {
if (!(sink instanceof OlapTableSink)) {
return;
}
Preconditions.checkState(table instanceof OlapTable,
"sink is OlapTableSink, but table type is " + table.getType());
OlapTableSink olapTableSink = (OlapTableSink) sink;
boolean isStrictMode = ctx.getSessionVariable().getEnableInsertStrict()
&& isPartialUpdate
&& isFromInsert;
try {
// TODO refactor this to avoid call legacy planner's function
olapTableSink.init(ctx.queryId(), txnId, database.getId(),
ctx.getExecTimeout(),
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode);
olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (isPartialUpdate && isFromInsert) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}
/**
* execute insert txn for insert into select command.
*/
public void executeSingleInsertTransaction(StmtExecutor executor, long jobId) {
String queryId = DebugUtil.printId(ctx.queryId());
LOG.info("start insert [{}] with query id {} and txn id {}", labelName, queryId, txnId);
Throwable throwable = null;
try {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout);
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
+ errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
}
if (!coordinator.getExecStatus().ok()) {
errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg);
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
LOG.debug("insert [{}] with query id {} delta files is {}",
labelName, queryId, coordinator.getDeltaUrls());
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
"Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl());
return;
}
}
if (table.getType() != TableType.OLAP && table.getType() != TableType.MATERIALIZED_VIEW) {
// no need to add load job.
// MySQL table is already being inserted.
ctx.getState().setOk(loadedRows, filteredRows, null);
return;
}
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
(errMsg == null ? "unknown reason" : errMsg));
} catch (Exception abortTxnException) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException);
}
} else if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
database, Lists.newArrayList(table),
txnId,
TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
txnStatus = TransactionStatus.VISIBLE;
} else {
txnStatus = TransactionStatus.COMMITTED;
}
} catch (Throwable t) {
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
if (txnId != INVALID_TXN_ID) {
try {
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
t.getMessage() == null ? "unknown reason" : t.getMessage());
} catch (Exception abortTxnException) {
// just print a log if abort txn failed. This failure do not need to pass to user.
// user only concern abort how txn failed.
LOG.warn("insert [{}] with query id {} abort txn {} failed",
labelName, queryId, txnId, abortTxnException);
}
}
if (!Config.using_old_load_usage_pattern) {
// if not using old load usage pattern, error will be returned directly to user
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
return;
}
/*
* If config 'using_old_load_usage_pattern' is true.
* Doris will return a label to user, and user can use this label to check load job's status,
* which exactly like the old insert stmt usage pattern.
*/
throwable = t;
} finally {
executor.updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
}
// Go here, which means:
// 1. transaction is finished successfully (COMMITTED or VISIBLE), or
// 2. transaction failed but Config.using_old_load_usage_pattern is true.
// we will record the load job info for these 2 cases
try {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
//if we use job scheduler, parse statement will not set user identity,so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
EtlJobType etlJobType = EtlJobType.INSERT;
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
coordinator.getTrackingUrl(), userIdentity, jobId);
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
}
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
/**
* execute insert values in transaction.
*/
public static void executeBatchInsertTransaction(ConnectContext ctx, String dbName, String tableName,
List<Column> columns, List<List<NamedExpression>> constantExprsList) {
if (ctx.isTxnIniting()) { // first time, begin txn
beginBatchInsertTransaction(ctx, dbName, tableName, columns);
}
if (!ctx.getTxnEntry().getTxnConf().getDb().equals(dbName)
|| !ctx.getTxnEntry().getTxnConf().getTbl().equals(tableName)) {
throw new AnalysisException("Only one table can be inserted in one transaction.");
}
TransactionEntry txnEntry = ctx.getTxnEntry();
int effectRows = 0;
for (List<NamedExpression> row : constantExprsList) {
++effectRows;
InternalService.PDataRow data = getRowStringValue(row);
if (data == null) {
continue;
}
List<InternalService.PDataRow> dataToSend = txnEntry.getDataToSend();
dataToSend.add(data);
if (dataToSend.size() >= StmtExecutor.MAX_DATA_TO_SEND_FOR_TXN) {
// send data
InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);
try {
executor.sendData();
} catch (Exception e) {
throw new AnalysisException("send data to be failed, because " + e.getMessage(), e);
}
}
}
txnEntry.setRowsInTransaction(txnEntry.getRowsInTransaction() + effectRows);
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
String sb = "{'label':'" + ctx.getTxnEntry().getLabel()
+ "', 'status':'" + TransactionStatus.PREPARE.name()
+ "', 'txnId':'" + ctx.getTxnEntry().getTxnConf().getTxnId() + "'"
+ "}";
ctx.getState().setOk(effectRows, 0, sb);
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(
ctx.getTxnEntry().getTxnConf().getTxnId(),
ctx.getTxnEntry().getLabel(),
dbName,
tableName,
TransactionStatus.PREPARE,
effectRows,
0);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows(effectRows);
}
private static InternalService.PDataRow getRowStringValue(List<NamedExpression> cols) {
if (cols.isEmpty()) {
return null;
}
InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
for (Expression expr : cols) {
while (expr instanceof Alias || expr instanceof Cast) {
expr = expr.child(0);
}
if (!(expr instanceof Literal)) {
throw new AnalysisException(
"do not support non-literal expr in transactional insert operation: " + expr.toSql());
}
if (expr instanceof NullLiteral) {
row.addColBuilder().setValue(StmtExecutor.NULL_VALUE_FOR_LOAD);
} else if (expr instanceof ArrayLiteral) {
row.addColBuilder().setValue(String.format("\"%s\"",
((ArrayLiteral) expr).toLegacyLiteral().getStringValueForArray()));
} else {
row.addColBuilder().setValue(String.format("\"%s\"",
((Literal) expr).toLegacyLiteral().getStringValue()));
}
}
return row.build();
}
private static void beginBatchInsertTransaction(ConnectContext ctx,
String dbName, String tblName, List<Column> columns) {
TransactionEntry txnEntry = ctx.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
SessionVariable sessionVariable = ctx.getSessionVariable();
long timeoutSecond = ctx.getExecTimeout();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(dbName, s -> new AnalysisException("database is invalid for dbName: " + s));
Table tblObj = dbObj.getTableOrException(tblName, s -> new AnalysisException("table is invalid: " + s));
txnConf.setDbId(dbObj.getId()).setTbl(tblName).setDb(dbName);
txnEntry.setTable(tblObj);
txnEntry.setDb(dbObj);
String label = txnEntry.getLabel();
try {
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label, new TransactionState.TxnCoordinator(
TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
String token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
txnConf.setToken(token);
} catch (UserException e) {
throw new AnalysisException(e.getMessage(), e);
}
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
long maxExecMemByte = sessionVariable.getMaxExecMemByte();
String timeZone = sessionVariable.getTimeZone();
int sendBatchParallelism = sessionVariable.getSendBatchParallelism();
request.setTxnId(txnConf.getTxnId())
.setDb(txnConf.getDb())
.setTbl(txnConf.getTbl())
.setColumns(columns.stream()
.map(Column::getName)
.map(n -> n.replace("`", "``"))
.map(n -> "`" + n + "`")
.collect(Collectors.joining(",")))
.setFileType(TFileType.FILE_STREAM)
.setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setMergeType(TMergeType.APPEND)
.setThriftRpcTimeoutMs(5000)
.setLoadId(ctx.queryId())
.setExecMemLimit(maxExecMemByte)
.setTimeout((int) timeoutSecond)
.setTimezone(timeZone)
.setSendBatchParallelism(sendBatchParallelism)
.setTrimDoubleQuotes(true);
// execute begin txn
InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry);
try {
executor.beginTransaction(request);
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
}
/**
* normalize plan to let it could be process correctly by nereids
*/
public static Plan normalizePlan(Plan plan, TableIf table) {
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;
}
LogicalInlineTable logicalInlineTable = (LogicalInlineTable) query;
ImmutableList.Builder<LogicalPlan> oneRowRelationBuilder = ImmutableList.builder();
List<Column> columns = table.getBaseSchema(false);
for (List<NamedExpression> values : logicalInlineTable.getConstantExprsList()) {
ImmutableList.Builder<NamedExpression> constantExprs = ImmutableList.builder();
if (values.isEmpty()) {
if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) {
throw new AnalysisException("value list should not be empty if columns are specified");
}
for (Column column : columns) {
constantExprs.add(generateDefaultExpression(column));
}
} else {
if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) {
if (values.size() != unboundTableSink.getColNames().size()) {
throw new AnalysisException("Column count doesn't match value count");
}
for (int i = 0; i < values.size(); i++) {
if (values.get(i) instanceof DefaultValueSlot) {
boolean hasDefaultValue = false;
for (Column column : columns) {
if (unboundTableSink.getColNames().get(i).equalsIgnoreCase(column.getName())) {
constantExprs.add(generateDefaultExpression(column));
hasDefaultValue = true;
}
}
if (!hasDefaultValue) {
throw new AnalysisException("Unknown column '"
+ unboundTableSink.getColNames().get(i) + "' in target table.");
}
} else {
constantExprs.add(values.get(i));
}
}
} else {
if (values.size() != columns.size()) {
throw new AnalysisException("Column count doesn't match value count");
}
for (int i = 0; i < columns.size(); i++) {
if (values.get(i) instanceof DefaultValueSlot) {
constantExprs.add(generateDefaultExpression(columns.get(i)));
} else {
constantExprs.add(values.get(i));
}
}
}
}
oneRowRelationBuilder.add(new UnboundOneRowRelation(
StatementScopeIdGenerator.newRelationId(), constantExprs.build()));
}
List<LogicalPlan> oneRowRelations = oneRowRelationBuilder.build();
if (oneRowRelations.size() == 1) {
return plan.withChildren(oneRowRelations.get(0));
} else {
return plan.withChildren(
LogicalPlanBuilder.reduceToLogicalPlanTree(0, oneRowRelations.size() - 1,
oneRowRelations, Qualifier.ALL));
}
}
/**
* get target table from names.
*/
public static TableIf getTargetTable(Plan plan, ConnectContext ctx) {
if (!(plan instanceof UnboundTableSink)) {
throw new AnalysisException("the root of plan should be UnboundTableSink"
+ " but it is " + plan.getType());
}
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
List<String> tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts());
return RelationUtil.getDbAndTable(tableQualifier, ctx.getEnv()).second;
}
private static NamedExpression generateDefaultExpression(Column column) {
try {
if (column.getDefaultValue() == null) {
throw new AnalysisException("Column has no default value, column=" + column.getName());
}
if (column.getDefaultValueExpr() != null) {
Expression defualtValueExpression = new NereidsParser().parseExpression(
column.getDefaultValueExpr().toSqlWithoutTbl());
if (!(defualtValueExpression instanceof UnboundAlias)) {
defualtValueExpression = new UnboundAlias(defualtValueExpression);
}
return (NamedExpression) defualtValueExpression;
} else {
return new Alias(Literal.of(column.getDefaultValue())
.checkedCastTo(DataType.fromCatalogType(column.getType())),
column.getName());
}
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage(), e);
}
}
}

View File

@ -17,27 +17,18 @@
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
@ -48,42 +39,33 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.txn.Transaction;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
* insert into select command implementation
* insert into select command support the grammer: explain? insert into table columns? partitions? hints? query
* insert into select command support the grammar: explain? insert into table columns? partitions? hints? query
* InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
* class structure's:
* InsertIntoTableCommand(Query())
@ -93,13 +75,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
public static final Logger LOG = LogManager.getLogger(InsertIntoTableCommand.class);
private final LogicalPlan logicalQuery;
private LogicalPlan logicalQuery;
private Optional<String> labelName;
private final boolean isOverwrite;
private NereidsPlanner planner;
private boolean isTxnBegin = false;
/**
* When source it's from job scheduler,it will be set.
*/
@ -108,12 +85,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
/**
* constructor
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName, boolean isOverwrite) {
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery,
"logicalQuery cannot be null in InsertIntoTableCommand");
this.labelName = labelName;
this.isOverwrite = isOverwrite;
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
}
public void setLabelName(Optional<String> labelName) {
@ -124,10 +99,6 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
this.jobId = jobId;
}
public NereidsPlanner getPlanner() {
return planner;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
@ -139,258 +110,68 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
if (ctx.isTxnModel()) {
// in original planner, if is in txn model, insert into select command and tableRef >= 1 will be refused.
// we can just run select a one-row-relation like select 1, 2, 3
// in StmtExecutor#executeForTxn, select 1, 2, 3 means valueList is null, so the if-clause from line 1556
// to 1580 will be skipped and effect rows will be always 0
// in nereids, we just forbid it.
throw new AnalysisException("insert into table command is not supported in txn model");
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));
Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
.<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
PhysicalOlapTableSink<?> physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get());
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
if (isOverwrite) {
dealOverwrite(ctx, executor, physicalOlapTableSink);
return;
}
OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
// group commit
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
/*handleGroupCommit(ctx, sink, physicalOlapTableSink);
return;*/
throw new AnalysisException("group commit is not supported in nereids now");
}
Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn");
Transaction txn = new Transaction(ctx,
physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), label, planner);
isTxnBegin = true;
boolean isStrictMode = (ctx.getSessionVariable().getEnableInsertStrict()
&& physicalOlapTableSink.isPartialUpdate()
&& physicalOlapTableSink.isFromNativeInsertStmt());
sink.init(ctx.queryId(), txn.getTxnId(),
physicalOlapTableSink.getDatabase().getId(),
ctx.getExecTimeout(),
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode);
sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(
physicalOlapTableSink.getDatabase().getId(),
txn.getTxnId());
if (state == null) {
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
}
executor.setProfileType(ProfileType.LOAD);
LOG.info("Nereids start to execute the insert command, query id: {}, txn id: {}",
ctx.queryId(), txn.getTxnId());
txn.executeInsertIntoTableCommand(executor, jobId);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
try {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
Env.getCurrentGlobalTransactionMgr().abortTransaction(
physicalOlapTableSink.getDatabase().getId(), txn.getTxnId(),
(errMsg == null ? "unknown reason" : errMsg));
} catch (Exception abortTxnException) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier(), abortTxnException);
}
}
}
/**
* when `isOverwrite` is true, use this logic
*
* @param ctx ctx
* @param executor executor
* @param physicalOlapTableSink physicalOlapTableSink
* @throws Exception Exception
*/
public void dealOverwrite(ConnectContext ctx, StmtExecutor executor,
PhysicalOlapTableSink<?> physicalOlapTableSink) throws Exception {
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, targetTable.getQualifiedDbName(),
targetTable.getName());
ConnectContext.get().setSkipAuth(true);
PhysicalOlapTableSink<?> physicalOlapTableSink;
DataSink sink;
InsertExecutor insertExecutor;
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
// should lock target table until we begin transaction.
targetTableIf.readLock();
try {
List<String> partitionNames = ((UnboundOlapTableSink<?>) logicalQuery).getPartitions();
if (CollectionUtils.isEmpty(partitionNames)) {
partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
// 1. process inline table (default values, empty values)
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
List<String> tempPartitionNames = addTempPartition(ctx, tableName, partitionNames);
boolean insertRes = insertInto(ctx, executor, tempPartitionNames, tableName);
if (!insertRes) {
return;
Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
.<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
replacePartition(ctx, tableName, partitionNames, tempPartitionNames);
sink = planner.getFragments().get(0).getSink();
// group commit
if (analyzeGroupCommit(ctx, sink, physicalOlapTableSink)) {
// handleGroupCommit(ctx, sink, physicalOlapTableSink);
// return;
throw new AnalysisException("group commit is not supported in nereids now");
}
String label = this.labelName.orElse(String.format("label_%x_%x", ctx.queryId().hi, ctx.queryId().lo));
insertExecutor = new InsertExecutor(ctx,
physicalOlapTableSink.getDatabase(),
physicalOlapTableSink.getTargetTable(), label, planner);
insertExecutor.beginTransaction();
} finally {
ConnectContext.get().setSkipAuth(false);
targetTableIf.readUnlock();
}
insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(),
physicalOlapTableSink.isFromNativeInsertStmt());
executor.setProfileType(ProfileType.LOAD);
insertExecutor.executeSingleInsertTransaction(executor, jobId);
}
/**
* replacing partitionNames with tempPartitionNames
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @param tempPartitionNames tempPartitionNames
* @throws UserException UserException
*/
private void replacePartition(ConnectContext ctx, TableName tableName, List<String> partitionNames,
List<String> tempPartitionNames)
throws UserException {
// overwrite old partition with tmp partition
try {
List<AlterClause> ops = new ArrayList<>();
Map<String, String> properties = new HashMap<>();
properties.put("use_temp_partition_name", "false");
ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames),
new PartitionNames(true, tempPartitionNames), properties));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Env.getCurrentEnv().alterTable(alterTableStmt);
} catch (Exception e) {
LOG.warn("IOT overwrite table partitions error", e);
handleIotPartitionRollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* insert into select
*
* @param ctx ctx
* @param executor executor
* @param tempPartitionNames tempPartitionNames
* @param tableName tableName
*/
private boolean insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames,
TableName tableName) {
try {
UnboundOlapTableSink<?> sink = (UnboundOlapTableSink<?>) logicalQuery;
UnboundOlapTableSink<?> copySink = new UnboundOlapTableSink<>(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
tempPartitionNames,
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
(LogicalPlan) (sink.child(0)));
new InsertIntoTableCommand(copySink, labelName, false).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
LOG.warn("InsertInto state error:{}", errMsg);
handleIotPartitionRollback(ctx, tableName, tempPartitionNames);
return false;
}
return true;
} catch (Exception e) {
LOG.warn("InsertInto error", e);
handleIotPartitionRollback(ctx, tableName, tempPartitionNames);
return false;
}
}
/**
* add some tempPartitions
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @return tempPartitionNames
* @throws Exception Exception
*/
private List<String> addTempPartition(ConnectContext ctx, TableName tableName, List<String> partitionNames)
throws Exception {
List<String> tempPartitionNames = new ArrayList<>();
try {
// create tmp partitions with uuid
for (String partitionName : partitionNames) {
UUID uuid = UUID.randomUUID();
// to comply with naming rules
String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_');
List<AlterClause> ops = new ArrayList<>();
ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
alterTableStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), alterTableStmt);
// only when execution succeeded, put the temp partition name into list
tempPartitionNames.add(tempPartName);
}
return tempPartitionNames;
} catch (Exception e) {
LOG.warn("IOT create tmp table partitions error", e);
handleIotPartitionRollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* delete temp partitions
*
* @param ctx ctx
* @param targetTableName targetTableName
* @param tempPartitionNames tempPartitionNames
*/
private void handleIotPartitionRollback(ConnectContext ctx, TableName targetTableName,
List<String> tempPartitionNames) {
try {
for (String partitionName : tempPartitionNames) {
List<AlterClause> ops = new ArrayList<>();
ops.add(new DropPartitionClause(true, partitionName, true, true));
AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
dropTablePartitionStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), dropTablePartitionStmt);
}
} catch (Exception ex) {
LOG.warn("IOT drop partitions error", ex);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
}
}
private void handleGroupCommit(ConnectContext ctx, OlapTableSink sink,
private void handleGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink)
throws UserException, RpcException, TException, ExecutionException, InterruptedException {
// TODO we should refactor this to remove rely on UnionNode
List<InternalService.PDataRow> rows = new ArrayList<>();
List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment().getPlanRoot())
.getMaterializedConstExprLists();
List<List<Expr>> materializedConstExprLists = ((UnionNode) sink.getFragment()
.getPlanRoot()).getMaterializedConstExprLists();
int filterSize = 0;
for (Slot slot : physicalOlapTableSink.getOutput()) {
if (slot.getName().contains(Column.DELETE_SIGN)
@ -417,13 +198,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
TransactionStatus txnStatus = TransactionStatus.PREPARE;
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(response.getLabel()).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(response.getTxnId()).append("'");
sb.append("', 'optimizer':'").append("nereids").append("'");
sb.append("}");
ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb.toString());
String sb = "{'label':'" + response.getLabel() + "', 'status':'" + txnStatus.name()
+ "', 'txnId':'" + response.getTxnId() + "'"
+ "', 'optimizer':'" + "nereids" + "'"
+ "}";
ctx.getState().setOk(response.getLoadedRows(), (int) response.getFilteredRows(), sb);
ctx.setOrUpdateInsertResult(response.getTxnId(), response.getLabel(),
physicalOlapTableSink.getDatabase().getFullName(), physicalOlapTableSink.getTargetTable().getName(),
txnStatus, response.getLoadedRows(), (int) response.getFilteredRows());
@ -431,7 +210,14 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
ctx.updateReturnRows((int) response.getLoadedRows());
}
private boolean analyzeGroupCommit(OlapTableSink sink, PhysicalOlapTableSink<?> physicalOlapTableSink) {
private boolean analyzeGroupCommit(ConnectContext ctx, DataSink sink,
PhysicalOlapTableSink<?> physicalOlapTableSink) {
if (!(sink instanceof OlapTableSink)) {
return false;
}
if (!ctx.getSessionVariable().isEnableInsertGroupCommit()) {
return false;
}
return ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit()
&& physicalOlapTableSink.getTargetTable() instanceof OlapTable
&& !ConnectContext.get().isTxnModel()
@ -441,7 +227,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return this.logicalQuery;
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
}
@Override

View File

@ -0,0 +1,288 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
/**
* insert into select command implementation
* insert into select command support the grammer: explain? insert into table columns? partitions? hints? query
* InsertIntoTableCommand is a command to represent insert the answer of a query into a table.
* class structure's:
* InsertIntoTableCommand(Query())
* ExplainCommand(Query())
*/
public class InsertOverwriteTableCommand extends Command implements ForwardWithSync, Explainable {
private static final Logger LOG = LogManager.getLogger(InsertOverwriteTableCommand.class);
private LogicalPlan logicalQuery;
private Optional<String> labelName;
/**
* constructor
*/
public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
}
public void setLabelName(Optional<String> labelName) {
this.labelName = labelName;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!ctx.getSessionVariable().isEnableNereidsDML()) {
try {
ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
} catch (Exception e) {
throw new AnalysisException("failed to set fallback to original planner to true", e);
}
throw new AnalysisException("Nereids DML is disabled, will try to fall back to the original planner");
}
// insert overwrite only support
TableIf targetTableIf = InsertExecutor.getTargetTable(logicalQuery, ctx);
if (!(targetTableIf instanceof OlapTable)) {
throw new AnalysisException("insert into overwrite only support OLAP table."
+ " But current table type is " + targetTableIf.getType());
}
this.logicalQuery = (LogicalPlan) InsertExecutor.normalizePlan(logicalQuery, targetTableIf);
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.checkBlockRules();
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Optional<TreeNode<?>> plan = (planner.getPhysicalPlan()
.<Set<TreeNode<?>>>collect(node -> node instanceof PhysicalOlapTableSink)).stream().findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode");
PhysicalOlapTableSink<?> physicalOlapTableSink = ((PhysicalOlapTableSink<?>) plan.get());
OlapTable targetTable = physicalOlapTableSink.getTargetTable();
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), targetTable.getQualifiedDbName(), targetTable.getName(),
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
targetTable.getQualifiedDbName() + ": " + targetTable.getName());
}
TableName tableName = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME,
targetTable.getQualifiedDbName(), targetTable.getName());
ConnectContext.get().setSkipAuth(true);
try {
List<String> partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
if (CollectionUtils.isEmpty(partitionNames)) {
partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
}
List<String> tempPartitionNames = addTempPartitions(ctx, tableName, partitionNames);
boolean insertRes = insertInto(ctx, executor, tempPartitionNames, tableName);
if (!insertRes) {
return;
}
replacePartition(ctx, tableName, partitionNames, tempPartitionNames);
} finally {
ConnectContext.get().setSkipAuth(false);
}
}
/**
* replacing partitionNames with tempPartitionNames
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @param tempPartitionNames tempPartitionNames
* @throws UserException UserException
*/
private void replacePartition(ConnectContext ctx, TableName tableName, List<String> partitionNames,
List<String> tempPartitionNames)
throws UserException {
// overwrite old partition with tmp partition
try {
List<AlterClause> ops = new ArrayList<>();
Map<String, String> properties = new HashMap<>();
properties.put("use_temp_partition_name", "false");
ops.add(new ReplacePartitionClause(new PartitionNames(false, partitionNames),
new PartitionNames(true, tempPartitionNames), properties));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Env.getCurrentEnv().alterTable(alterTableStmt);
} catch (Exception e) {
LOG.warn("IOT overwrite table partitions error", e);
rollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* insert into select
*
* @param ctx ctx
* @param executor executor
* @param tempPartitionNames tempPartitionNames
* @param tableName tableName
*/
private boolean insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames,
TableName tableName) {
try {
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
UnboundTableSink<?> copySink = new UnboundTableSink<>(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
true,
tempPartitionNames,
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
(LogicalPlan) (sink.child(0)));
new InsertIntoTableCommand(copySink, labelName).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());
LOG.warn("InsertInto state error:{}", errMsg);
rollback(ctx, tableName, tempPartitionNames);
return false;
}
return true;
} catch (Exception e) {
LOG.warn("InsertInto error", e);
rollback(ctx, tableName, tempPartitionNames);
return false;
}
}
/**
* add some tempPartitions
*
* @param ctx ctx
* @param tableName tableName
* @param partitionNames partitionNames
* @return tempPartitionNames
* @throws Exception Exception
*/
private List<String> addTempPartitions(ConnectContext ctx, TableName tableName, List<String> partitionNames)
throws Exception {
List<String> tempPartitionNames = new ArrayList<>();
try {
// create tmp partitions with uuid
for (String partitionName : partitionNames) {
UUID uuid = UUID.randomUUID();
// to comply with naming rules
String tempPartName = "tmp_partition_" + uuid.toString().replace('-', '_');
List<AlterClause> ops = new ArrayList<>();
ops.add(new AddPartitionLikeClause(tempPartName, partitionName, true));
AlterTableStmt alterTableStmt = new AlterTableStmt(tableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
alterTableStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), alterTableStmt);
// only when execution succeeded, put the temp partition name into list
tempPartitionNames.add(tempPartName);
}
return tempPartitionNames;
} catch (Exception e) {
LOG.warn("IOT create tmp table partitions error", e);
rollback(ctx, tableName, tempPartitionNames);
throw e;
}
}
/**
* delete temp partitions
*
* @param ctx ctx
* @param targetTableName targetTableName
* @param tempPartitionNames tempPartitionNames
*/
private void rollback(ConnectContext ctx, TableName targetTableName,
List<String> tempPartitionNames) {
try {
for (String partitionName : tempPartitionNames) {
List<AlterClause> ops = new ArrayList<>();
ops.add(new DropPartitionClause(true, partitionName, true, true));
AlterTableStmt dropTablePartitionStmt = new AlterTableStmt(targetTableName, ops);
Analyzer tempAnalyzer = new Analyzer(Env.getCurrentEnv(), ctx);
dropTablePartitionStmt.analyze(tempAnalyzer);
DdlExecutor.execute(ctx.getEnv(), dropTablePartitionStmt);
}
} catch (Exception ex) {
LOG.warn("IOT drop partitions error", ex);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + ex.getMessage());
}
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return InsertExecutor.normalizePlan(this.logicalQuery, InsertExecutor.getTargetTable(this.logicalQuery, ctx));
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertOverwriteTableCommand(this, context);
}
}

View File

@ -30,10 +30,10 @@ import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@ -225,7 +225,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
&& sinkCols.size() < olapTable.getColumns().size();
return new UnboundOlapTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
return new UnboundTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan);
}

View File

@ -23,8 +23,8 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
@ -89,7 +89,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), false).run(ctx, executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty()).run(ctx, executor);
}
/**
@ -136,7 +136,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
&& selectItems.size() < targetTable.getColumns().size();
// make UnboundTableSink
return new UnboundOlapTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(),
return new UnboundTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(),
ImmutableList.of(), isPartialUpdate, logicalQuery);
}

View File

@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* represent value list such as values(1), (2), (3) will generate LogicalInlineTable((1), (2), (3)).
*/
public class LogicalInlineTable extends LogicalLeaf implements BlockFuncDepsPropagation {
private final List<List<NamedExpression>> constantExprsList;
public LogicalInlineTable(List<List<NamedExpression>> constantExprsList) {
this(constantExprsList, Optional.empty(), Optional.empty());
}
public LogicalInlineTable(List<List<NamedExpression>> constantExprsList,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties) {
super(PlanType.LOGICAL_INLINE_TABLE, groupExpression, logicalProperties);
this.constantExprsList = ImmutableList.copyOf(
Objects.requireNonNull(constantExprsList, "constantExprsList should not be null"));
}
public List<List<NamedExpression>> getConstantExprsList() {
return constantExprsList;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalInlineTable(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return null;
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return null;
}
@Override
public List<Slot> computeOutput() {
return ImmutableList.of();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LogicalInlineTable that = (LogicalInlineTable) o;
return Objects.equals(constantExprsList, that.constantExprsList);
}
@Override
public int hashCode() {
return Objects.hash(constantExprsList);
}
}

View File

@ -56,6 +56,12 @@ public class LogicalUnion extends LogicalSetOperation implements Union, OutputPr
this.constantExprsList = ImmutableList.of();
}
public LogicalUnion(Qualifier qualifier, List<List<NamedExpression>> constantExprsList, List<Plan> children) {
super(PlanType.LOGICAL_UNION, qualifier, children);
this.hasPushedFilter = false;
this.constantExprsList = constantExprsList;
}
public LogicalUnion(Qualifier qualifier, List<NamedExpression> outputs, List<List<SlotReference>> childrenOutputs,
List<List<NamedExpression>> constantExprsList, boolean hasPushedFilter, List<Plan> children) {
super(PlanType.LOGICAL_UNION, qualifier, outputs, childrenOutputs, children);

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
@ -29,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.commands.DropMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.commands.ExportCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.RefreshMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.UpdateCommand;
@ -46,9 +48,19 @@ public interface CommandVisitor<R, C> {
return visitCommand(createPolicy, context);
}
default R visitInsertIntoTableCommand(InsertIntoTableCommand insertIntoSelectCommand,
default R visitInsertIntoTableCommand(InsertIntoTableCommand insertIntoTableCommand,
C context) {
return visitCommand(insertIntoSelectCommand, context);
return visitCommand(insertIntoTableCommand, context);
}
default R visitInsertOverwriteTableCommand(InsertOverwriteTableCommand insertOverwriteTableCommand,
C context) {
return visitCommand(insertOverwriteTableCommand, context);
}
default R visitBatchInsertIntoTableCommand(BatchInsertIntoTableCommand batchInsertIntoTableCommand,
C context) {
return visitCommand(batchInsertIntoTableCommand, context);
}
default R visitUpdateCommand(UpdateCommand updateCommand, C context) {

View File

@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
@ -175,6 +176,10 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio
return visit(having, context);
}
public R visitLogicalInlineTable(LogicalInlineTable logicalInlineTable, C context) {
return visit(logicalInlineTable, context);
}
public R visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, C context) {
return visit(join, context);
}

View File

@ -17,8 +17,8 @@
package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
@ -48,8 +48,8 @@ public interface SinkVisitor<R, C> {
// unbound
// *******************************
default R visitUnboundOlapTableSink(UnboundOlapTableSink<? extends Plan> unboundOlapTableSink, C context) {
return visitLogicalSink(unboundOlapTableSink, context);
default R visitUnboundTableSink(UnboundTableSink<? extends Plan> unboundTableSink, C context) {
return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) {

View File

@ -1,248 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.txn;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* transaction wrapper for Nereids
*/
public class Transaction {
public static final Logger LOG = LogManager.getLogger(Transaction.class);
private final ConnectContext ctx;
private final NereidsPlanner planner;
private final long createAt;
private final long txnId;
private final String labelName;
private final Database database;
private final Table table;
private long loadedRows = 0;
private int filteredRows = 0;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
private String errMsg = "";
private final Coordinator coordinator;
/**
* constructor
*/
public Transaction(ConnectContext ctx, Database database, Table table, String labelName, NereidsPlanner planner)
throws UserException {
this.ctx = ctx;
this.labelName = labelName;
this.database = database;
this.table = table;
this.planner = planner;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
LoadJobSourceType.INSERT_STREAMING, ctx.getExecTimeout());
this.createAt = System.currentTimeMillis();
}
public long getTxnId() {
return txnId;
}
/**
* execute insert txn for insert into select command.
*/
public void executeInsertIntoTableCommand(StmtExecutor executor, long jobId) {
LOG.info("Do insert [{}] with query id: {}", labelName, DebugUtil.printId(ctx.queryId()));
Throwable throwable = null;
try {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
LOG.debug("Insert {} execution timeout:{}", DebugUtil.printId(ctx.queryId()), execTimeout);
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("There exists unhealthy backend. "
+ errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
}
if (!coordinator.getExecStatus().ok()) {
errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert failed: {}", errMsg);
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
LOG.debug("delta files is {}", coordinator.getDeltaUrls());
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
}
if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
}
// if in strict mode, insert will fail if there are filtered rows
if (ctx.getSessionVariable().getEnableInsertStrict()) {
if (filteredRows > 0) {
ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
"Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl());
return;
}
}
if (table.getType() != TableType.OLAP && table.getType() != TableType.MATERIALIZED_VIEW) {
// no need to add load job.
// MySQL table is already being inserted.
ctx.getState().setOk(loadedRows, filteredRows, null);
return;
}
if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
database, Lists.newArrayList(table),
txnId,
TabletCommitInfo.fromThrift(coordinator.getCommitInfos()),
ctx.getSessionVariable().getInsertVisibleTimeoutMs())) {
txnStatus = TransactionStatus.VISIBLE;
} else {
txnStatus = TransactionStatus.COMMITTED;
}
} catch (Throwable t) {
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("handle insert stmt fail: {}", labelName, t);
try {
Env.getCurrentGlobalTransactionMgr().abortTransaction(
database.getId(), txnId,
t.getMessage() == null ? "unknown reason" : t.getMessage());
} catch (Exception abortTxnException) {
// just print a log if abort txn failed. This failure do not need to pass to user.
// user only concern abort how txn failed.
LOG.warn("errors when abort txn", abortTxnException);
}
if (!Config.using_old_load_usage_pattern) {
// if not using old load usage pattern, error will be returned directly to user
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: " + coordinator.getTrackingUrl());
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
return;
}
/*
* If config 'using_old_load_usage_pattern' is true.
* Doris will return a label to user, and user can use this label to check load job's status,
* which exactly like the old insert stmt usage pattern.
*/
throwable = t;
} finally {
executor.updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
}
// Go here, which means:
// 1. transaction is finished successfully (COMMITTED or VISIBLE), or
// 2. transaction failed but Config.using_old_load_usage_pattern is true.
// we will record the load job info for these 2 cases
try {
// the statement parsed by Nereids is saved at executor::parsedStmt.
StatementBase statement = executor.getParsedStmt();
UserIdentity userIdentity;
//if we use job scheduler, parse statement will not set user identity,so we need to get it from context
if (null == statement) {
userIdentity = ctx.getCurrentUserIdentity();
} else {
userIdentity = statement.getUserInfo();
}
EtlJobType etlJobType = EtlJobType.INSERT;
if (0 != jobId) {
etlJobType = EtlJobType.INSERT_JOB;
}
ctx.getEnv().getLoadManager()
.recordFinishedLoadJob(labelName, txnId, database.getFullName(),
table.getId(),
etlJobType, createAt, throwable == null ? "" : throwable.getMessage(),
coordinator.getTrackingUrl(), userIdentity, jobId);
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
errMsg = "Record info of insert load with error " + e.getMessage();
}
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
// {'label':'my_label1', 'status':'visible', 'txnId':'123' 'err':'error messages'}
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(labelName).append("', 'status':'").append(txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (table.getType() == TableType.MATERIALIZED_VIEW) {
sb.append("', 'rows':'").append(loadedRows).append("'");
}
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, filteredRows, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, filteredRows);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
}

View File

@ -122,6 +122,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.minidump.MinidumpUtils;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.Forward;
@ -518,6 +519,13 @@ public class StmtExecutor {
"Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName());
context.getState().setNereids(true);
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
// when we in transaction mode, we only support insert into command and transaction command
if (context.isTxnModel()) {
if (!(logicalPlan instanceof BatchInsertIntoTableCommand)) {
String errMsg = "This is in a transaction, only insert, commit, rollback is acceptable.";
throw new NereidsException(errMsg, new AnalysisException(errMsg));
}
}
if (logicalPlan instanceof Command) {
if (logicalPlan instanceof Forward) {
redirectStatus = ((Forward) logicalPlan).toRedirectStatus();