[Feat](nereids)when dealing insert into stmt with empty table source, fe returns directly (#35333)
* [Feat](nereids) when dealing insert into stmt with empty table source, fe returns directly (#34418) When a LogicalOlapScan has no partitions, transform it to a LogicalEmptyRelation. When dealing insert into stmt with empty table source, fe returns directly. * [Fix](nereids) fix when insert into select empty table --------- Co-authored-by: feiniaofeiafei <moailing@selectdb.com>
This commit is contained in:
@ -20,7 +20,11 @@ package org.apache.doris.nereids.rules.rewrite;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to prune empty partition.
|
||||
@ -32,7 +36,12 @@ public class PruneEmptyPartition extends OneRewriteRuleFactory {
|
||||
return logicalOlapScan().thenApply(ctx -> {
|
||||
LogicalOlapScan scan = ctx.root;
|
||||
OlapTable table = scan.getTable();
|
||||
return scan.withSelectedPartitionIds(table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds()));
|
||||
List<Long> ids = table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds());
|
||||
if (ids.isEmpty()) {
|
||||
return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(),
|
||||
scan.getOutput());
|
||||
}
|
||||
return scan.withSelectedPartitionIds(ids);
|
||||
}).toRule(RuleType.PRUNE_EMPTY_PARTITION);
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class CreateMTMVInfo {
|
||||
public static final Logger LOG = LogManager.getLogger(CreateMTMVInfo.class);
|
||||
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE";
|
||||
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION";
|
||||
private final boolean ifNotExists;
|
||||
private final TableNameInfo mvName;
|
||||
private List<String> keys;
|
||||
|
||||
@ -58,18 +58,20 @@ public abstract class AbstractInsertExecutor {
|
||||
|
||||
protected String errMsg = "";
|
||||
protected Optional<InsertCommandContext> insertCtx;
|
||||
protected final boolean emptyInsert;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx) {
|
||||
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
|
||||
this.ctx = ctx;
|
||||
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
|
||||
this.labelName = labelName;
|
||||
this.table = table;
|
||||
this.database = table.getDatabase();
|
||||
this.insertCtx = insertCtx;
|
||||
this.emptyInsert = emptyInsert;
|
||||
}
|
||||
|
||||
public Coordinator getCoordinator() {
|
||||
@ -189,4 +191,8 @@ public abstract class AbstractInsertExecutor {
|
||||
}
|
||||
afterExec(executor);
|
||||
}
|
||||
|
||||
public boolean isEmptyInsert() {
|
||||
return emptyInsert;
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,8 +59,9 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
|
||||
*/
|
||||
public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
|
||||
String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx) {
|
||||
super(ctx, table, labelName, planner, insertCtx);
|
||||
Optional<InsertCommandContext> insertCtx,
|
||||
boolean emptyInsert) {
|
||||
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
|
||||
catalogName = table.getCatalog().getName();
|
||||
transactionManager = table.getCatalog().getTransactionManager();
|
||||
|
||||
|
||||
@ -43,8 +43,8 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
|
||||
*/
|
||||
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
|
||||
String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx) {
|
||||
super(ctx, table, labelName, planner, insertCtx);
|
||||
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
|
||||
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -40,8 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
|
||||
*/
|
||||
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
|
||||
String labelName, NereidsPlanner planner,
|
||||
Optional<InsertCommandContext> insertCtx) {
|
||||
super(ctx, table, labelName, planner, insertCtx);
|
||||
Optional<InsertCommandContext> insertCtx,
|
||||
boolean emptyInsert) {
|
||||
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -37,6 +37,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
|
||||
import org.apache.doris.nereids.trees.plans.commands.Command;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
|
||||
@ -165,9 +166,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
// return;
|
||||
throw new AnalysisException("group commit is not supported in Nereids now");
|
||||
}
|
||||
boolean emptyInsert = childIsEmptyRelation(physicalSink);
|
||||
OlapTable olapTable = (OlapTable) targetTableIf;
|
||||
// the insertCtx contains some variables to adjust SinkNode
|
||||
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
|
||||
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
|
||||
|
||||
boolean isEnableMemtableOnSinkNode =
|
||||
olapTable.getTableProperty().getUseSchemaLightChange()
|
||||
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
|
||||
@ -175,14 +178,16 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
insertExecutor.getCoordinator().getQueryOptions()
|
||||
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
|
||||
} else if (physicalSink instanceof PhysicalHiveTableSink) {
|
||||
boolean emptyInsert = childIsEmptyRelation(physicalSink);
|
||||
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
|
||||
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
|
||||
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))));
|
||||
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
|
||||
// set hive query options
|
||||
} else if (physicalSink instanceof PhysicalIcebergTableSink) {
|
||||
boolean emptyInsert = childIsEmptyRelation(physicalSink);
|
||||
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
|
||||
insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
|
||||
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))));
|
||||
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert);
|
||||
} else {
|
||||
// TODO: support other table types
|
||||
throw new AnalysisException("insert into command only support [olap, hive, iceberg] table");
|
||||
@ -203,6 +208,10 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
|
||||
private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
|
||||
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
|
||||
// if the insert stmt data source is empty, directly return, no need to be executed.
|
||||
if (insertExecutor.isEmptyInsert()) {
|
||||
return;
|
||||
}
|
||||
insertExecutor.executeSingleInsert(executor, jobId);
|
||||
}
|
||||
|
||||
@ -219,4 +228,12 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
|
||||
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
|
||||
return visitor.visitInsertIntoTableCommand(this, context);
|
||||
}
|
||||
|
||||
private boolean childIsEmptyRelation(PhysicalSink sink) {
|
||||
if (sink.children() != null && sink.children().size() == 1
|
||||
&& sink.child(0) instanceof PhysicalEmptyRelation) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,8 +76,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
|
||||
* constructor
|
||||
*/
|
||||
public OlapInsertExecutor(ConnectContext ctx, Table table,
|
||||
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
|
||||
super(ctx, table, labelName, planner, insertCtx);
|
||||
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
|
||||
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
|
||||
}
|
||||
|
||||
public long getTxnId() {
|
||||
|
||||
Reference in New Issue
Block a user