[Fix](Variant) support materialized views for variant and accessing variant subcolumns (#30603)

* [Fix](Variant) support materialized views for variant and accessing variant subcolumns
1. fix schema change losing the column path, which led to invalid data reads
2. support the element_at function on the BE side and use simdjson to parse data
3. fix multi-slot expressions
This commit is contained in:
lihangyu
2024-02-06 18:14:18 +08:00
committed by yiguolei
parent 5b343911e8
commit b23a785775
18 changed files with 408 additions and 129 deletions

View File

@ -1039,7 +1039,7 @@ public class Analyzer {
LOG.debug("register column ref table {}, colName {}, col {}", tblName, colName, col.toSql());
if (col.getType().isVariantType() || (subColNames != null && !subColNames.isEmpty())) {
if (!Config.enable_variant_access_in_original_planner
if (getContext() != null && !getContext().getSessionVariable().enableVariantAccessInOriginalPlanner
&& (subColNames != null && !subColNames.isEmpty())) {
ErrorReport.reportAnalysisException("Variant sub-column access is disabled in original planner,"
+ "set enable_variant_access_in_original_planner = true in session variable");

View File

@ -1657,28 +1657,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return inputFragment;
}
// Get top most PushDownToProjectionFunction from expression
private Expression getOriginalFunctionForRewritten(NamedExpression expression) {
List<Expression> targetExpr = expression.collectFirst(PushDownToProjectionFunction.class::isInstance);
if (!targetExpr.isEmpty()) {
return targetExpr.get(0);
}
return null;
/**
 * Gathers every {@link PushDownToProjectionFunction} found inside {@code expression} and keeps
 * only those accepted by {@link PushDownToProjectionFunction#validToPushDown}.
 *
 * @param expression the named expression to search
 * @return the push-down functions that passed the validity check, possibly empty
 */
private List<Expression> getPushDownToProjectionFunctionForRewritten(NamedExpression expression) {
    List<Expression> candidates = expression.collectToList(PushDownToProjectionFunction.class::isInstance);
    // drop the candidates that fail the push-down validity check
    return candidates.stream()
            .filter(candidate -> PushDownToProjectionFunction.validToPushDown(candidate))
            .collect(Collectors.toList());
}
// register rewritten slots from original PushDownToProjectionFunction
private void registerRewrittenSlot(PhysicalProject<? extends Plan> project, OlapScanNode olapScanNode) {
// register slots that are rewritten from element_at/etc..
for (NamedExpression expr : project.getProjects()) {
List<Expression> allPushDownProjectionFunctions = project.getProjects().stream()
.map(this::getPushDownToProjectionFunctionForRewritten)
.flatMap(List::stream)
.collect(Collectors.toList());
for (Expression expr : allPushDownProjectionFunctions) {
PushDownToProjectionFunction function = (PushDownToProjectionFunction) expr;
if (context != null
&& context.getConnectContext() != null
&& context.getConnectContext().getStatementContext() != null) {
Slot rewrittenSlot = context.getConnectContext()
.getStatementContext().getRewrittenSlotRefByOriginalExpr(getOriginalFunctionForRewritten(expr));
if (rewrittenSlot != null) {
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot);
}
Slot argumentSlot = function.getInputSlots().stream().findFirst().get();
Expression rewrittenSlot = PushDownToProjectionFunction.rewriteToSlot(
function, (SlotReference) argumentSlot);
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot);
}
}
}

View File

@ -21,7 +21,6 @@ import org.apache.doris.nereids.rules.expression.rules.ArrayContainToArrayOverla
import org.apache.doris.nereids.rules.expression.rules.CaseWhenToIf;
import org.apache.doris.nereids.rules.expression.rules.DateFunctionRewrite;
import org.apache.doris.nereids.rules.expression.rules.DistinctPredicatesRule;
import org.apache.doris.nereids.rules.expression.rules.ElementAtToSlot;
import org.apache.doris.nereids.rules.expression.rules.ExtractCommonFactorRule;
import org.apache.doris.nereids.rules.expression.rules.OrToIn;
import org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate;
@ -49,8 +48,7 @@ public class ExpressionOptimization extends ExpressionRewrite {
OrToIn.INSTANCE,
ArrayContainToArrayOverlap.INSTANCE,
CaseWhenToIf.INSTANCE,
TopnToMax.INSTANCE,
ElementAtToSlot.INSTANCE
TopnToMax.INSTANCE
);
private static final ExpressionRuleExecutor EXECUTOR = new ExpressionRuleExecutor(OPTIMIZE_REWRITE_RULES);

View File

@ -1,89 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.expression.rules;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteRule;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.qe.ConnectContext;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Expression rule for turning element_at calls on variant columns into sub-column
 * {@link SlotReference}s, so that the query engine can prune unused sub-columns.
 * eg: element_at(element_at(v, "a"), "b") -> SlotReference(column=v, subColLabels=["a", "b"])
 * <p>
 * The conversion itself lives in the static {@link #rewriteToSlot}; the visitor hook
 * {@link #visitElementAt} currently returns its input untouched.
 */
public class ElementAtToSlot extends DefaultExpressionRewriter<ExpressionRewriteContext> implements
        ExpressionRewriteRule<ExpressionRewriteContext> {

    public static final ElementAtToSlot INSTANCE = new ElementAtToSlot();

    /**
     * Converts an {@link ElementAt} into the {@link SlotReference} of the sub-column it addresses.
     * <p>
     * The sub-column path is built from the values of all {@link VarcharLiteral} nodes collected
     * from {@code elementAt}. The statement context of the current connection is asked for a slot
     * already registered for {@code topColumnSlot} and that path; only when none exists is a new
     * slot created and registered.
     *
     * @param elementAt The {@link ElementAt} instance to be rewritten.
     * @param topColumnSlot The {@link SlotReference} that represents the top column slot.
     * @return the slot already registered for the path if there is one, otherwise a newly created
     *         {@link SlotReference} that has been added to the statement context.
     */
    public static Expression rewriteToSlot(ElementAt elementAt, SlotReference topColumnSlot) {
        StatementContext statementContext = ConnectContext.get().getStatementContext();
        List<String> subColumnPath = elementAt.collectToList(VarcharLiteral.class::isInstance).stream()
                .map(VarcharLiteral.class::cast)
                .map(VarcharLiteral::getValue)
                .collect(Collectors.toList());
        SlotReference registeredSlot = statementContext.getPathSlot(topColumnSlot, subColumnPath);
        if (registeredSlot == null) {
            // no slot for this path yet: build one from the top column slot's attributes
            // and record it in the statement context
            SlotReference newSlot = new SlotReference(StatementScopeIdGenerator.newExprId(),
                    topColumnSlot.getName(), topColumnSlot.getDataType(),
                    topColumnSlot.nullable(), topColumnSlot.getQualifier(), topColumnSlot.getTable().get(),
                    topColumnSlot.getColumn().get(), Optional.of(topColumnSlot.getInternalName()),
                    subColumnPath);
            statementContext.addPathSlotRef(topColumnSlot, subColumnPath, newSlot, elementAt);
            statementContext.addSlotToRelation(newSlot, statementContext.getRelationBySlot(topColumnSlot));
            return newSlot;
        }
        // reuse the registered slot to avoid duplicated slots
        return registeredSlot;
    }

    /**
     * Applies this rule by letting {@code expr} accept this rewriter as a visitor.
     */
    @Override
    public Expression rewrite(Expression expr, ExpressionRewriteContext ctx) {
        return expr.accept(this, ctx);
    }

    /**
     * Visitor hook for {@link ElementAt}; not implemented yet, the node is returned as is.
     */
    @Override
    public Expression visitElementAt(ElementAt elementAt, ExpressionRewriteContext context) {
        // todo
        return elementAt;
    }
}

View File

@ -205,15 +205,14 @@ public class FunctionBinder extends AbstractExpressionRewriteRule {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null
&& !ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) {
throw new AnalysisException(
"set enable_rewrite_element_at_to_slot=true when using element_at function for variant type");
return boundFunction;
}
Slot slot = elementAt.getInputSlots().stream().findFirst().get();
if (slot.hasUnbound()) {
slot = (Slot) super.visit(slot, context);
}
// rewrite to slot and bound this slot
return ElementAtToSlot.rewriteToSlot(elementAt, (SlotReference) slot);
return PushDownToProjectionFunction.rewriteToSlot(elementAt, (SlotReference) slot);
}
return boundFunction;
}

View File

@ -39,8 +39,8 @@ import java.util.List;
/**
* ScalarFunction 'element_at'. This class is generated by GenerateFunction.
*/
public class ElementAt extends ScalarFunction
implements BinaryExpression, ExplicitlyCastableSignature, AlwaysNullable, PushDownToProjectionFunction {
public class ElementAt extends PushDownToProjectionFunction
implements BinaryExpression, ExplicitlyCastableSignature, AlwaysNullable {
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(new FollowToAnyDataType(0))

View File

@ -17,24 +17,68 @@
package org.apache.doris.nereids.trees.expressions.functions.scalar;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.qe.ConnectContext;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Function that could be rewritten and pushed down to projection
*/
public interface PushDownToProjectionFunction {
public abstract class PushDownToProjectionFunction extends ScalarFunction {
/**
 * Creates the function by handing {@code name} and {@code arguments} straight to the
 * superclass constructor.
 *
 * @param name the function name passed to the superclass
 * @param arguments the argument expressions passed to the superclass
 */
public PushDownToProjectionFunction(String name, Expression... arguments) {
super(name, arguments);
}
/**
* check if specified function could be pushed down to project
* @param pushDownExpr expr to check
* @return if it is valid to push down input expr
*/
static boolean validToPushDown(Expression pushDownExpr) {
public static boolean validToPushDown(Expression pushDownExpr) {
// Currently only element at for variant type could be pushed down
return !pushDownExpr.collectToList(
return pushDownExpr != null && !pushDownExpr.collectToList(
PushDownToProjectionFunction.class::isInstance).stream().filter(
x -> ((Expression) x).getDataType().isVariantType()).collect(
Collectors.toList()).isEmpty();
}
/**
 * Converts a {@link PushDownToProjectionFunction} into the {@link SlotReference} of the
 * sub-column it addresses.
 * <p>
 * The sub-column path is built from the values of all {@link VarcharLiteral} nodes collected
 * from {@code pushedFunction}. The statement context of the current connection is asked for a
 * slot already registered for {@code topColumnSlot} and that path; only when none exists is a
 * new slot created and registered.
 *
 * @param pushedFunction The {@link PushDownToProjectionFunction} instance to be rewritten.
 * @param topColumnSlot The {@link SlotReference} that represents the top column slot.
 * @return the slot already registered for the path if there is one, otherwise a newly created
 *         {@link SlotReference} that has been added to the statement context.
 */
public static Expression rewriteToSlot(PushDownToProjectionFunction pushedFunction, SlotReference topColumnSlot) {
    StatementContext statementContext = ConnectContext.get().getStatementContext();
    List<String> subColumnPath = pushedFunction.collectToList(VarcharLiteral.class::isInstance).stream()
            .map(VarcharLiteral.class::cast)
            .map(VarcharLiteral::getValue)
            .collect(Collectors.toList());
    SlotReference registeredSlot = statementContext.getPathSlot(topColumnSlot, subColumnPath);
    if (registeredSlot == null) {
        // no slot for this path yet: build one from the top column slot's attributes
        // and record it in the statement context
        SlotReference newSlot = new SlotReference(StatementScopeIdGenerator.newExprId(),
                topColumnSlot.getName(), topColumnSlot.getDataType(),
                topColumnSlot.nullable(), topColumnSlot.getQualifier(), topColumnSlot.getTable().get(),
                topColumnSlot.getColumn().get(), Optional.of(topColumnSlot.getInternalName()),
                subColumnPath);
        statementContext.addPathSlotRef(topColumnSlot, subColumnPath, newSlot, pushedFunction);
        statementContext.addSlotToRelation(newSlot, statementContext.getRelationBySlot(topColumnSlot));
        return newSlot;
    }
    // reuse the registered slot to avoid duplicated slots
    return registeredSlot;
}
}

View File

@ -186,6 +186,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition";
public static final String ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER = "enable_variant_access_in_original_planner";
// set the default parallelism for send batch when execute InsertStmt operation,
// if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config,
// then the coordinator be will use the value of `max_send_batch_parallelism_per_job`
@ -793,6 +795,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true)
public int sendBatchParallelism = 1;
@VariableMgr.VarAttr(name = ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER)
public boolean enableVariantAccessInOriginalPlanner = false;
@VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
public boolean extractWideRangeExpr = true;

View File

@ -42,12 +42,6 @@ public class ElementAtToSlotRefRule implements ExprRewriteRule {
/**
 * Verifies that no element_at function is left in {@code expr}; the expression itself is
 * never modified and is returned as is.
 *
 * @param expr the expression to inspect
 * @param analyzer not used by the lines visible here
 * @param clauseType not used by the lines visible here
 * @return {@code expr}, unchanged
 * @throws AnalysisException if the list filled by {@code getElementAtFunction} is not empty
 */
@Override
public Expr apply(Expr expr, Analyzer analyzer, ClauseType clauseType) throws AnalysisException {
// Sanity check only: every element_at on a variant should already have been rewritten to a slot.
// NOTE(review): getElementAtFunction is defined outside this view; presumably it appends the
// element_at calls found in expr to the given list -- confirm.
List<Expr> elementAtFunctions = Lists.newArrayList();
getElementAtFunction(expr, elementAtFunctions);
if (!elementAtFunctions.isEmpty()) {
throw new AnalysisException("element_at should not appear in common rewrite stage");
}
return expr;
}