[cherry-pick](branch-2.1) let insert statement support CTE (#36150) (#36265)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->

cherry-pick: #36150
This commit is contained in:
Liu Zhenlong
2024-07-16 14:50:53 +08:00
committed by GitHub
parent 9bb37e3616
commit 2c4b5519e9
11 changed files with 133 additions and 17 deletions

View File

@ -579,9 +579,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
DMLCommandType.INSERT,
plan);
Optional<LogicalPlan> cte = Optional.empty();
if (ctx.cte() != null) {
cte = Optional.ofNullable(withCte(plan, ctx.cte()));
}
LogicalPlan command;
if (isOverwrite) {
command = new InsertOverwriteTableCommand(sink, labelName);
command = new InsertOverwriteTableCommand(sink, labelName, cte);
} else {
if (ConnectContext.get() != null && ConnectContext.get().isTxnModel()
&& sink.child() instanceof LogicalInlineTable) {
@ -590,13 +594,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
// Now handle it as `insert into select`(a separate load job), should fix it as the legacy.
command = new BatchInsertIntoTableCommand(sink);
} else {
command = new InsertIntoTableCommand(sink, labelName, Optional.empty());
command = new InsertIntoTableCommand(sink, labelName, Optional.empty(), cte);
}
}
if (ctx.explain() != null) {
return withExplain(command, ctx.explain());
}
return command;
return withExplain(command, ctx.explain());
}
/**

View File

@ -173,7 +173,8 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query);
try {
if (!FeConstants.runningUnitTest) {
new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor);
new InsertIntoTableCommand(query, Optional.empty(), Optional.empty(), Optional.empty()).run(
ctx, executor);
}
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
handleFallbackFailedCtas(ctx);

View File

@ -52,8 +52,8 @@ public class DeleteFromUsingCommand extends DeleteFromCommand {
+ " Please check the following session variables: "
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
}
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx,
executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty(),
Optional.empty()).run(ctx, executor);
}
@Override

View File

@ -134,7 +134,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
profile.getSummaryProfile().setQueryBeginTime();
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
Optional.of(labelName), Optional.empty()));
Optional.of(labelName), Optional.empty(), Optional.empty()));
} else {
throw new AnalysisException("Multi insert into statements are unsupported.");
}

View File

@ -94,8 +94,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty()).run(ctx,
executor);
new InsertIntoTableCommand(completeQueryPlan(ctx, logicalQuery), Optional.empty(), Optional.empty(),
Optional.empty()).run(ctx, executor);
}
/**

View File

@ -80,7 +80,7 @@ import java.util.stream.Collectors;
*/
public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
private UpdateMvByPartitionCommand(LogicalPlan logicalQuery) {
super(logicalQuery, Optional.empty());
super(logicalQuery, Optional.empty(), Optional.empty());
}
/**

View File

@ -74,16 +74,18 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
*/
private long jobId;
private Optional<InsertCommandContext> insertCtx;
private final Optional<LogicalPlan> cte;
/**
* constructor
*/
public InsertIntoTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<InsertCommandContext> insertCtx) {
Optional<InsertCommandContext> insertCtx, Optional<LogicalPlan> cte) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
this.insertCtx = insertCtx;
this.cte = cte;
}
public Optional<String> getLabelName() {
@ -141,6 +143,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
try {
// 1. process inline table (default values, empty values)
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());

View File

@ -77,14 +77,17 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
private LogicalPlan logicalQuery;
private Optional<String> labelName;
private final Optional<LogicalPlan> cte;
/**
* constructor
*/
public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName) {
public InsertOverwriteTableCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<LogicalPlan> cte) {
super(PlanType.INSERT_INTO_TABLE_COMMAND);
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery should not be null");
this.labelName = Objects.requireNonNull(labelName, "labelName should not be null");
this.cte = cte;
}
public void setLabelName(Optional<String> labelName) {
@ -116,6 +119,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
throw new AnalysisException("Not allowed to perform current operation on async materialized view");
}
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
if (cte.isPresent()) {
this.logicalQuery = (LogicalPlan) logicalQuery.withChildren(cte.get().withChildren(
this.logicalQuery.child(0)));
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
@ -186,7 +193,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext insertCtx,
ConnectContext ctx, StmtExecutor executor) throws Exception {
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(logicalQuery, labelName,
Optional.of(insertCtx));
Optional.of(insertCtx), Optional.empty());
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());