[opt](Nereids) fix several insert into related issues (#40467) (#40755)

pick from master #40467

- http_stream TVF should always generate a single-fragment plan
- http_stream TVF plan should not require the plan root to be a scan node
- distinguish the group_commit TVF from normal insert statements
- index and generated slots should be based on the type-cast base slot
- agg_state could be cast from nullable to non-nullable
- colocate and bucket scan range computation should happen only on (OLAP) scan nodes
Author: morrySnow
Date: 2024-09-13 10:19:56 +08:00
Committed by: GitHub
Parent: 431e2e1af9
Commit: 51c8b62d1c
10 changed files with 105 additions and 208 deletions

LogicalPlanBuilder.java

@@ -572,7 +572,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
isAutoDetect,
isOverwrite,
ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(),
DMLCommandType.INSERT,
ctx.tableId == null ? DMLCommandType.INSERT : DMLCommandType.GROUP_COMMIT,
plan);
Optional<LogicalPlan> cte = Optional.empty();
if (ctx.cte() != null) {

BindSink.java

@@ -67,6 +67,7 @@ import org.apache.doris.nereids.trees.plans.visitor.InferPlanOutputAlias;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
@@ -125,7 +126,8 @@ public class BindSink implements AnalysisRuleFactory {
&& table.getSequenceMapCol() != null
&& sink.getColNames().contains(table.getSequenceMapCol());
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol,
sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;
@@ -175,8 +177,12 @@
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst();
} else {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
// ATTN: must use bindColumns here, because an insert from the group_commit tvf submitted by BE
// does not follow any column list of the target table, but its sink's child already contains
// all invisible data. This is different from other insert actions, which contain no invisible
// data by default.
if (!bindColumns.isEmpty()) {
if (bindColumns.stream()
.map(Column::getName)
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
haveInputSeqCol = true; // case2.a
} // else case2.b
@@ -204,7 +210,8 @@
Map<String, NamedExpression> columnToOutput = getColumnToOutput(
ctx, table, isPartialUpdate, boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(
table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
@@ -266,39 +273,31 @@
// we need to insert all the columns of the target table
// although some columns are not mentioned.
// so we add a project to supply the default value.
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
}
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<String, NamedExpression> columnToReplaced = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<Expression, Expression> replaceMap = Maps.newHashMap();
NereidsParser expressionParser = new NereidsParser();
List<Column> materializedViewColumn = Lists.newArrayList();
// generate slots not mentioned in sql, mv slots and shaded slots.
for (Column column : boundSink.getTargetTable().getFullSchema()) {
if (column.isMaterializedViewColumn()) {
List<SlotRef> refs = column.getRefColumns();
// now we have to replace the column to slots.
Preconditions.checkArgument(refs != null,
"mv column %s 's ref column cannot be null", column);
Expression parsedExpression = expressionParser.parseExpression(
column.getDefineExpr().toSqlWithoutTbl());
// the boundSlotExpression is an expression whose slots are bound but function
// may not be bound, we have to bind it again.
// for example: to_bitmap.
Expression boundExpression = new CustomExpressionAnalyzer(
boundSink, ctx.cascadesContext, columnToOutput).analyze(parsedExpression);
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), slot);
} else if (columnToChildOutput.containsKey(column)
materializedViewColumn.add(column);
continue;
}
if (columnToChildOutput.containsKey(column)
// do not process explicitly use DEFAULT value here:
// insert into table t values(DEFAULT)
&& !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
columnToOutput.put(column.getName(), columnToChildOutput.get(column));
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
columnToChildOutput.get(column), DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
if (table instanceof OlapTable && ((OlapTable) table).hasSequenceCol()
&& column.getName().equals(Column.SEQUENCE_COL)
@@ -319,6 +318,8 @@ public class BindSink implements AnalysisRuleFactory {
seqColumn = new Alias(seqColumn, column.getName());
}
columnToOutput.put(column.getName(), seqColumn);
columnToReplaced.put(column.getName(), seqColumn.toSlot());
replaceMap.put(seqColumn.toSlot(), seqColumn.child(0));
}
} else if (isPartialUpdate) {
// If the current load is a partial update, the values of unmentioned
@@ -335,9 +336,12 @@
Expression defualtValueExpression = ExpressionAnalyzer.analyzeFunction(
boundSink, ctx.cascadesContext, unboundFunctionDefaultValue
);
columnToOutput.put(column.getName(),
new Alias(defualtValueExpression, column.getName())
);
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
defualtValueExpression, DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
continue;
}
@@ -350,10 +354,11 @@
}
// Otherwise, the unmentioned columns should be filled with default values
// or null values
columnToOutput.put(column.getName(), new Alias(
new NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName()
));
Alias output = new Alias(new NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
try {
// it comes from the original planner, if default value expression is
@@ -372,8 +377,12 @@
if (defualtValueExpression instanceof Alias) {
defualtValueExpression = ((Alias) defualtValueExpression).child();
}
columnToOutput.put(column.getName(),
new Alias(defualtValueExpression, column.getName()));
Alias output = new Alias((TypeCoercionUtils.castIfNotSameType(
defualtValueExpression, DataType.fromCatalogType(column.getType()))),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
@@ -381,6 +390,29 @@
}
}
}
for (Column column : materializedViewColumn) {
if (column.isMaterializedViewColumn()) {
List<SlotRef> refs = column.getRefColumns();
// now we have to replace the column to slots.
Preconditions.checkArgument(refs != null,
"mv column %s 's ref column cannot be null", column);
Expression parsedExpression = expressionParser.parseExpression(
column.getDefineExpr().toSqlWithoutTbl());
// the boundSlotExpression is an expression whose slots are bound but function
// may not be bound, we have to bind it again.
// for example: to_bitmap.
Expression boundExpression = new CustomExpressionAnalyzer(
boundSink, ctx.cascadesContext, columnToReplaced).analyze(parsedExpression);
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
boundExpression = ExpressionUtils.replace(boundExpression, replaceMap);
boundExpression = TypeCoercionUtils.castIfNotSameType(boundExpression,
DataType.fromCatalogType(column.getType()));
Alias output = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), output);
}
}
return columnToOutput;
}
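
Note: the restructured loop above is the "index and generated slots should be based on the type-cast base slot" fix. Base columns are now bound first, each wrapped in castIfNotSameType and recorded in columnToReplaced/replaceMap, and only afterwards are materialized-view define expressions bound against those cast slots. A minimal self-contained sketch of the two-pass idea, using toy types rather than Doris's Expression/Alias classes (the to_bitmap stand-in is hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class MvBindingSketch {
        // Toy stand-in for a Nereids expression: evaluates against a row.
        interface Expr { long eval(Map<String, Long> row); }

        public static void main(String[] args) {
            Map<String, Expr> columnToOutput = new HashMap<>();
            Map<String, Expr> replaceMap = new HashMap<>();

            // Pass 1: bind the base column, apply the required cast, and
            // record the cast result so later expressions resolve against it.
            Expr castK1 = row -> row.get("k1"); // imagine castIfNotSameType(child, type)
            columnToOutput.put("k1", castK1);
            replaceMap.put("slot#k1", castK1);

            // Pass 2: bind the mv define expression (e.g. to_bitmap(k1)) only
            // after pass 1, replacing its slot with the cast base expression.
            Expr mvK1 = row -> replaceMap.get("slot#k1").eval(row) * 2; // toy to_bitmap
            columnToOutput.put("mv_k1", mvK1);

            System.out.println(columnToOutput.get("mv_k1").eval(Map.of("k1", 21L))); // 42
        }
    }

Deferring mv columns to a second loop is the essential change: in the old code they were bound inside the same loop, so a define expression could resolve against the raw, un-cast child slot.
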
@@ -527,12 +559,14 @@
}
private Pair<List<Column>, Integer> bindTargetColumns(OlapTable table, List<String> colsName,
boolean childHasSeqCol, boolean needExtraSeqCol) {
boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit) {
// if the table set sequence column in stream load phase, the sequence map column is null, we query it.
if (colsName.isEmpty()) {
// ATTN: group commit without column list should return all base index columns
// because it already prepares data for these columns.
return Pair.of(table.getBaseSchema(true).stream()
.filter(c -> validColumn(c, childHasSeqCol))
.collect(ImmutableList.toImmutableList()), 0);
.filter(c -> isGroupCommit || validColumn(c, childHasSeqCol))
.collect(ImmutableList.toImmutableList()), 0);
} else {
int extraColumnsNum = (needExtraSeqCol ? 1 : 0);
List<String> processedColsName = Lists.newArrayList(colsName);

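The new isGroupCommit flag short-circuits the per-column validity filter: a group commit plan submitted by BE already carries data for every base-index column, invisible ones included. A reduced sketch of that filter, with a toy Column record instead of Doris's catalog class and validColumn's checks collapsed to visibility only:

    import java.util.List;
    import java.util.stream.Collectors;

    public class BindColumnsSketch {
        record Column(String name, boolean visible) {}

        static List<Column> bindTargetColumns(List<Column> baseSchema, boolean isGroupCommit) {
            return baseSchema.stream()
                    .filter(c -> isGroupCommit || c.visible()) // group commit keeps everything
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<Column> schema = List.of(
                    new Column("k1", true),
                    new Column("__DORIS_DELETE_SIGN__", false));
            System.out.println(bindTargetColumns(schema, false).size()); // 1: visible only
            System.out.println(bindTargetColumns(schema, true).size());  // 2: all columns
        }
    }
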
HttpStream.java

@@ -19,12 +19,17 @@ package org.apache.doris.nereids.trees.expressions.functions.table;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.HttpStreamTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import com.google.common.collect.ImmutableList;
import java.util.Map;
/** http_stream */
@@ -49,6 +54,12 @@ public class HttpStream extends TableValuedFunction {
}
}
@Override
public PhysicalProperties getPhysicalProperties() {
return PhysicalProperties.createHash(new DistributionSpecHash(ImmutableList.of(),
ShuffleType.EXECUTION_BUCKETED));
}
@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitHttpStream(this, context);

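Overriding getPhysicalProperties to report an EXECUTION_BUCKETED hash distribution over an empty column list is what keeps the http_stream plan in one fragment: the scan itself already satisfies the execution-side distribution requirement, so the optimizer has no reason to add an exchange above it. A toy model of that decision, not the actual Nereids property framework:

    import java.util.List;

    public class OneFragmentSketch {
        enum ShuffleType { NATURAL, EXECUTION_BUCKETED }

        record DistributionSpec(List<String> hashColumns, ShuffleType type) {}

        // Toy enforcer rule: an execution-bucketed output needs no exchange,
        // and without an exchange the plan never splits into a second fragment.
        static boolean needExchange(DistributionSpec produced) {
            return produced.type() != ShuffleType.EXECUTION_BUCKETED;
        }

        public static void main(String[] args) {
            DistributionSpec httpStream =
                    new DistributionSpec(List.of(), ShuffleType.EXECUTION_BUCKETED);
            System.out.println(needExchange(httpStream)); // false -> one fragment
        }
    }
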
DMLCommandType.java

@@ -27,6 +27,8 @@ public enum DMLCommandType {
NONE,
// for INSERT INTO or INSERT INTO SELECT
INSERT,
// for group_commit tvf
GROUP_COMMIT,
// for UPDATE
UPDATE,
// for DELETE

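The new GROUP_COMMIT member pairs with the LogicalPlanBuilder change above: only the group_commit TVF plan that BE submits carries a tableId in the parser context, so that single field distinguishes it from a user-written INSERT. A self-contained sketch of the dispatch, with a hypothetical tableId string standing in for the parser context:

    public class DmlTypeSketch {
        enum DMLCommandType { NONE, INSERT, GROUP_COMMIT, UPDATE, DELETE }

        // Mirrors "ctx.tableId == null ? INSERT : GROUP_COMMIT" from the parser.
        static DMLCommandType typeOf(String tableId) {
            return tableId == null ? DMLCommandType.INSERT : DMLCommandType.GROUP_COMMIT;
        }

        public static void main(String[] args) {
            System.out.println(typeOf(null));    // INSERT
            System.out.println(typeOf("10042")); // GROUP_COMMIT (hypothetical id)
        }
    }
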
Coordinator.java

@@ -2390,9 +2390,10 @@ public class Coordinator implements CoordInterface {
FragmentScanRangeAssignment assignment
= fragmentExecParamsMap.get(scanNode.getFragmentId()).scanRangeAssignment;
boolean fragmentContainsColocateJoin = isColocateFragment(scanNode.getFragment(),
scanNode.getFragment().getPlanRoot());
scanNode.getFragment().getPlanRoot()) && (scanNode instanceof OlapScanNode);
boolean fragmentContainsBucketShuffleJoin = bucketShuffleJoinController
.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot());
.isBucketShuffleJoin(scanNode.getFragmentId().asInt(), scanNode.getFragment().getPlanRoot())
&& (scanNode instanceof OlapScanNode);
// A fragment may contain both colocate join and bucket shuffle join,
// so both need to compute the scanRange to init basic data for the query coordinator

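Both flags are now additionally gated on "scanNode instanceof OlapScanNode", since colocate and bucket-shuffle bucket sequences only exist for OLAP tablets; any other scan node in the same fragment falls through to the normal assignment path. A reduced sketch of the guard with stub node classes:

    public class ScanRangeGuardSketch {
        static class ScanNode {}
        static class OlapScanNode extends ScanNode {}
        static class TvfScanNode extends ScanNode {}

        // Mirrors the Coordinator change: colocate/bucket-shuffle scan range
        // assignment is computed only for OLAP scan nodes.
        static boolean useBucketAssignment(ScanNode node, boolean fragmentHasBucketJoin) {
            return fragmentHasBucketJoin && node instanceof OlapScanNode;
        }

        public static void main(String[] args) {
            System.out.println(useBucketAssignment(new OlapScanNode(), true)); // true
            System.out.println(useBucketAssignment(new TvfScanNode(), true));  // false
        }
    }
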
StmtExecutor.java

@@ -3267,8 +3267,16 @@ public class StmtExecutor {
httpStreamParams.setLabel(insertExecutor.getLabelName());
PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
Preconditions.checkState(planRoot instanceof TVFScanNode || planRoot instanceof GroupCommitScanNode,
"Nereids' planNode cannot be converted to " + planRoot.getClass().getName());
boolean isValidPlan = !planner.getScanNodes().isEmpty();
for (ScanNode scanNode : planner.getScanNodes()) {
if (!(scanNode instanceof TVFScanNode || planRoot instanceof GroupCommitScanNode)) {
isValidPlan = false;
break;
}
}
if (!isValidPlan) {
throw new AnalysisException("plan is invalid: " + planRoot.getExplainString());
}
} catch (QueryStateException e) {
LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e);
context.setState(e.getQueryState());
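
This replaces the old root-node assertion: after the one-fragment fix the plan root of an http_stream insert need not be the scan itself, so validity is judged by walking every scan node instead. A reduced sketch of the new check, with stub classes (the real loop also accepts GroupCommitScanNode):

    import java.util.List;

    public class PlanCheckSketch {
        static class ScanNode {}
        static class TvfScanNode extends ScanNode {}
        static class OlapScanNode extends ScanNode {}

        // A valid http_stream plan has at least one scan node, all TVF scans.
        static boolean isValidPlan(List<ScanNode> scanNodes) {
            return !scanNodes.isEmpty()
                    && scanNodes.stream().allMatch(n -> n instanceof TvfScanNode);
        }

        public static void main(String[] args) {
            System.out.println(isValidPlan(List.of(new TvfScanNode())));  // true
            System.out.println(isValidPlan(List.of(new OlapScanNode()))); // false
            System.out.println(isValidPlan(List.of()));                   // false
        }
    }
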
@@ -3339,11 +3347,8 @@ public class StmtExecutor {
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}
boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter
&& ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
if (e instanceof NereidsException
&& !context.getSessionVariable().enableFallbackToOriginalPlanner
&& !isInsertIntoCommand) {
&& !context.getSessionVariable().enableFallbackToOriginalPlanner) {
LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e);
throw ((NereidsException) e).getException();
}