diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index b7d3a4ca0f..1ae3f7acc7 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -449,8 +449,8 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column, // Alter table operation should read the whole variant column, since it does not aware of // subcolumns of variant during processing rewriting rowsets. // This is slow, since it needs to read all sub columns and merge them into a single column - RETURN_IF_ERROR(HierarchicalDataReader::create(iter, tablet_column.path_info(), node, root, - output_as_raw_json)); + RETURN_IF_ERROR( + HierarchicalDataReader::create(iter, root_path, node, root, output_as_raw_json)); return Status::OK(); } diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp index aff38c56a8..552ad31809 100644 --- a/be/src/vec/columns/column_object.cpp +++ b/be/src/vec/columns/column_object.cpp @@ -586,6 +586,11 @@ ColumnObject::ColumnObject(bool is_nullable_, bool create_root_) } } +ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column) + : is_nullable(is_nullable_) { + add_sub_column({}, std::move(column), type); +} + ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_) : is_nullable(is_nullable_), subcolumns(std::move(subcolumns_)), diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h index 7f328992a2..407419ff5c 100644 --- a/be/src/vec/columns/column_object.h +++ b/be/src/vec/columns/column_object.h @@ -231,6 +231,8 @@ public: explicit ColumnObject(bool is_nullable_, bool create_root = true); + explicit ColumnObject(bool is_nullable_, DataTypePtr type, MutableColumnPtr&& column); + ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_); ~ColumnObject() override = default; diff --git a/be/src/vec/functions/function_variant_element.cpp b/be/src/vec/functions/function_variant_element.cpp new file mode 100644 index 0000000000..8925663527 --- /dev/null +++ b/be/src/vec/functions/function_variant_element.cpp @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exprs/json_functions.h" +#include "simdjson.h" +#include "vec/columns/column.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_object.h" +#include "vec/columns/column_string.h" +#include "vec/common/assert_cast.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_nothing.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_object.h" +#include "vec/data_types/data_type_string.h" +#include "vec/functions/function.h" +#include "vec/functions/function_helpers.h" +#include "vec/functions/simple_function_factory.h" + +namespace doris::vectorized { + +class FunctionVariantElement : public IFunction { +public: + static constexpr auto name = "element_at"; + static FunctionPtr create() { return std::make_shared(); } + + // Get function name. + String get_name() const override { return name; } + + bool use_default_implementation_for_nulls() const override { return true; } + + size_t get_number_of_arguments() const override { return 2; } + + ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; } + + DataTypes get_variadic_argument_types_impl() const override { + return {std::make_shared(), std::make_shared()}; + } + + DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { + DCHECK((WhichDataType(remove_nullable(arguments[0]))).is_variant_type()) + << "First argument for function: " << name + << " should be DataTypeObject but it has type " << arguments[0]->get_name() << "."; + DCHECK(is_string(arguments[1])) + << "Second argument for function: " << name << " should be String but it has type " + << arguments[1]->get_name() << "."; + return make_nullable(std::make_shared()); + } + + Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, + size_t result, size_t input_rows_count) const override { + const auto* variant_col = check_and_get_column( + block.get_by_position(arguments[0]).column.get()); + if (!variant_col) { + return Status::RuntimeError( + fmt::format("unsupported types for function {}({}, {})", get_name(), + block.get_by_position(arguments[0]).type->get_name(), + block.get_by_position(arguments[1]).type->get_name())); + } + if (block.empty()) { + block.replace_by_position(result, block.get_by_position(result).type->create_column()); + return Status::OK(); + } + + auto index_column = block.get_by_position(arguments[1]).column; + ColumnPtr result_column; + RETURN_IF_ERROR(get_element_column(*variant_col, index_column, &result_column)); + block.replace_by_position(result, result_column); + return Status::OK(); + } + +private: + static Status get_element_column(const ColumnObject& src, const ColumnPtr& index_column, + ColumnPtr* result) { + std::string field_name = index_column->get_data_at(0).to_string(); + if (src.empty()) { + *result = ColumnObject::create(true); + return Status::OK(); + } + if (src.is_scalar_variant() && + WhichDataType(remove_nullable(src.get_root_type())).is_string_or_fixed_string()) { + // use parser to extract from root + auto type = std::make_shared(); + MutableColumnPtr result_column = type->create_column(); + const ColumnString& docs = + *check_and_get_column(remove_nullable(src.get_root()).get()); + simdjson::ondemand::parser parser; + std::vector parsed_paths; + if (field_name.empty() || field_name[0] != '$') { + field_name = "$." + field_name; + } + JsonFunctions::parse_json_paths(field_name, &parsed_paths); + for (size_t i = 0; i < docs.size(); ++i) { + if (!extract_from_document(parser, docs.get_data_at(i), parsed_paths, + assert_cast(result_column.get()))) { + VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) << ", field " + << field_name; + result_column->insert_default(); + } + } + *result = ColumnObject::create(true, type, std::move(result_column)); + return Status::OK(); + } else { + return Status::NotSupported("Not support element_at with none scalar variant {}", + src.debug_string()); + } + } + + static Status extract_from_document(simdjson::ondemand::parser& parser, const StringRef& doc, + const std::vector& paths, ColumnString* column) { + try { + simdjson::padded_string json_str {doc.data, doc.size}; + simdjson::ondemand::document doc = parser.iterate(json_str); + simdjson::ondemand::object object = doc.get_object(); + simdjson::ondemand::value value; + RETURN_IF_ERROR(JsonFunctions::extract_from_object(object, paths, &value)); + _write_data_to_column(value, column); + } catch (simdjson::simdjson_error& e) { + VLOG_DEBUG << "simdjson parse exception: " << e.what(); + return Status::DataQualityError("simdjson parse exception {}", e.what()); + } + return Status::OK(); + } + + static void _write_data_to_column(simdjson::ondemand::value& value, ColumnString* column) { + switch (value.type()) { + case simdjson::ondemand::json_type::null: { + column->insert_default(); + break; + } + case simdjson::ondemand::json_type::boolean: { + if (value.get_bool()) { + column->insert_data("1", 1); + } else { + column->insert_data("0", 1); + } + break; + } + default: { + auto value_str = simdjson::to_json_string(value).value(); + column->insert_data(value_str.data(), value_str.length()); + } + } + } +}; + +void register_function_variant_element(SimpleFunctionFactory& factory) { + factory.register_function(); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 5c47198fd6..9bedc204cb 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -90,6 +90,7 @@ void register_function_array(SimpleFunctionFactory& factory); void register_function_map(SimpleFunctionFactory& factory); void register_function_struct(SimpleFunctionFactory& factory); void register_function_struct_element(SimpleFunctionFactory& factory); +void register_function_variant_element(SimpleFunctionFactory& factory); void register_function_geo(SimpleFunctionFactory& factory); void register_function_multi_string_position(SimpleFunctionFactory& factory); void register_function_multi_string_search(SimpleFunctionFactory& factory); @@ -178,8 +179,12 @@ public: auto iter = function_creators.find(key_str); if (iter == function_creators.end()) { - LOG(WARNING) << fmt::format("Function signature {} is not found", key_str); - return nullptr; + // use original name as signature without variadic arguments + iter = function_creators.find(name); + if (iter == function_creators.end()) { + LOG(WARNING) << fmt::format("Function signature {} is not found", key_str); + return nullptr; + } } return iter->second()->build(arguments, return_type); @@ -280,6 +285,7 @@ public: register_function_ip(instance); register_function_tokenize(instance); register_function_ignore(instance); + register_function_variant_element(instance); }); return instance; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index be58c13985..a7f642944e 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2521,8 +2521,6 @@ public class Config extends ConfigBase { @ConfField public static String cloud_sql_server_cluster_id = "RESERVED_CLUSTER_ID_FOR_SQL_SERVER"; - @ConfField(mutable = true) - public static boolean enable_variant_access_in_original_planner = false; //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 38b7403960..0b4317f871 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -1039,7 +1039,7 @@ public class Analyzer { LOG.debug("register column ref table {}, colName {}, col {}", tblName, colName, col.toSql()); if (col.getType().isVariantType() || (subColNames != null && !subColNames.isEmpty())) { - if (!Config.enable_variant_access_in_original_planner + if (getContext() != null && !getContext().getSessionVariable().enableVariantAccessInOriginalPlanner && (subColNames != null && !subColNames.isEmpty())) { ErrorReport.reportAnalysisException("Variant sub-column access is disabled in original planner," + "set enable_variant_access_in_original_planner = true in session variable"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index cc8b2483ff..94b2f78cf1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1657,28 +1657,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor targetExpr = expression.collectFirst(PushDownToProjectionFunction.class::isInstance); - if (!targetExpr.isEmpty()) { - return targetExpr.get(0); - } - return null; + // collect all valid PushDownToProjectionFunction from expression + private List getPushDownToProjectionFunctionForRewritten(NamedExpression expression) { + List targetExprList = expression.collectToList(PushDownToProjectionFunction.class::isInstance); + return targetExprList.stream() + .filter(PushDownToProjectionFunction::validToPushDown) + .collect(Collectors.toList()); } // register rewritten slots from original PushDownToProjectionFunction private void registerRewrittenSlot(PhysicalProject project, OlapScanNode olapScanNode) { // register slots that are rewritten from element_at/etc.. - for (NamedExpression expr : project.getProjects()) { + List allPushDownProjectionFunctions = project.getProjects().stream() + .map(this::getPushDownToProjectionFunctionForRewritten) + .flatMap(List::stream) + .collect(Collectors.toList()); + for (Expression expr : allPushDownProjectionFunctions) { + PushDownToProjectionFunction function = (PushDownToProjectionFunction) expr; if (context != null && context.getConnectContext() != null && context.getConnectContext().getStatementContext() != null) { - Slot rewrittenSlot = context.getConnectContext() - .getStatementContext().getRewrittenSlotRefByOriginalExpr(getOriginalFunctionForRewritten(expr)); - if (rewrittenSlot != null) { - TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId()); - context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot); - } + Slot argumentSlot = function.getInputSlots().stream().findFirst().get(); + Expression rewrittenSlot = PushDownToProjectionFunction.rewriteToSlot( + function, (SlotReference) argumentSlot); + TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId()); + context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java index 6064a8d210..e7b3a308f0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java @@ -21,7 +21,6 @@ import org.apache.doris.nereids.rules.expression.rules.ArrayContainToArrayOverla import org.apache.doris.nereids.rules.expression.rules.CaseWhenToIf; import org.apache.doris.nereids.rules.expression.rules.DateFunctionRewrite; import org.apache.doris.nereids.rules.expression.rules.DistinctPredicatesRule; -import org.apache.doris.nereids.rules.expression.rules.ElementAtToSlot; import org.apache.doris.nereids.rules.expression.rules.ExtractCommonFactorRule; import org.apache.doris.nereids.rules.expression.rules.OrToIn; import org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate; @@ -49,8 +48,7 @@ public class ExpressionOptimization extends ExpressionRewrite { OrToIn.INSTANCE, ArrayContainToArrayOverlap.INSTANCE, CaseWhenToIf.INSTANCE, - TopnToMax.INSTANCE, - ElementAtToSlot.INSTANCE + TopnToMax.INSTANCE ); private static final ExpressionRuleExecutor EXECUTOR = new ExpressionRuleExecutor(OPTIMIZE_REWRITE_RULES); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java deleted file mode 100644 index adc050c987..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java +++ /dev/null @@ -1,89 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.rules.expression.rules; - -import org.apache.doris.nereids.StatementContext; -import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; -import org.apache.doris.nereids.rules.expression.ExpressionRewriteRule; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.SlotReference; -import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; -import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt; -import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; -import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; -import org.apache.doris.qe.ConnectContext; - -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -/** - * Transform element_at function to SlotReference for variant sub-column access. - * This optimization will help query engine to prune as many sub columns as possible - * to speed up query. - * eg: element_at(element_at(v, "a"), "b") -> SlotReference(column=v, subColLabels=["a", "b"]) - */ -public class ElementAtToSlot extends DefaultExpressionRewriter implements - ExpressionRewriteRule { - - public static final ElementAtToSlot INSTANCE = new ElementAtToSlot(); - - @Override - public Expression rewrite(Expression expr, ExpressionRewriteContext ctx) { - return expr.accept(this, ctx); - } - - /** - * Rewrites an {@link ElementAt} instance to a {@link SlotReference}. - * This method is used to transform an ElementAt expr into a SlotReference, - * based on the provided topColumnSlot and the context of the statement. - * - * @param elementAt The {@link ElementAt} instance to be rewritten. - * @param topColumnSlot The {@link SlotReference} that represents the top column slot. - * @return A {@link SlotReference} that represents the rewritten element. - * If a target column slot is found in the context, it is returned to avoid duplicates. - * Otherwise, a new SlotReference is created and added to the context. - */ - public static Expression rewriteToSlot(ElementAt elementAt, SlotReference topColumnSlot) { - // rewrite to slotRef - StatementContext ctx = ConnectContext.get().getStatementContext(); - List fullPaths = elementAt.collectToList(node -> node instanceof VarcharLiteral).stream() - .map(node -> ((VarcharLiteral) node).getValue()) - .collect(Collectors.toList()); - SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot, fullPaths); - if (targetColumnSlot != null) { - // avoid duplicated slots - return targetColumnSlot; - } - SlotReference slotRef = new SlotReference(StatementScopeIdGenerator.newExprId(), - topColumnSlot.getName(), topColumnSlot.getDataType(), - topColumnSlot.nullable(), topColumnSlot.getQualifier(), topColumnSlot.getTable().get(), - topColumnSlot.getColumn().get(), Optional.of(topColumnSlot.getInternalName()), - fullPaths); - ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, elementAt); - ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot)); - - return slotRef; - } - - @Override - public Expression visitElementAt(ElementAt elementAt, ExpressionRewriteContext context) { - // todo - return elementAt; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java index f60f38f764..bf0812cdc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java @@ -205,15 +205,14 @@ public class FunctionBinder extends AbstractExpressionRewriteRule { if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null && !ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) { - throw new AnalysisException( - "set enable_rewrite_element_at_to_slot=true when using element_at function for variant type"); + return boundFunction; } Slot slot = elementAt.getInputSlots().stream().findFirst().get(); if (slot.hasUnbound()) { slot = (Slot) super.visit(slot, context); } // rewrite to slot and bound this slot - return ElementAtToSlot.rewriteToSlot(elementAt, (SlotReference) slot); + return PushDownToProjectionFunction.rewriteToSlot(elementAt, (SlotReference) slot); } return boundFunction; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java index 6bd5f1bd8e..d4fe6d5043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java @@ -39,8 +39,8 @@ import java.util.List; /** * ScalarFunction 'element_at'. This class is generated by GenerateFunction. */ -public class ElementAt extends ScalarFunction - implements BinaryExpression, ExplicitlyCastableSignature, AlwaysNullable, PushDownToProjectionFunction { +public class ElementAt extends PushDownToProjectionFunction + implements BinaryExpression, ExplicitlyCastableSignature, AlwaysNullable { public static final List SIGNATURES = ImmutableList.of( FunctionSignature.ret(new FollowToAnyDataType(0)) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java index e2473ea095..362a84bc94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java @@ -17,24 +17,68 @@ package org.apache.doris.nereids.trees.expressions.functions.scalar; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; +import org.apache.doris.qe.ConnectContext; +import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** * Function that could be rewritten and pushed down to projection */ -public interface PushDownToProjectionFunction { +public abstract class PushDownToProjectionFunction extends ScalarFunction { + public PushDownToProjectionFunction(String name, Expression... arguments) { + super(name, arguments); + } + /** * check if specified function could be pushed down to project * @param pushDownExpr expr to check * @return if it is valid to push down input expr */ - static boolean validToPushDown(Expression pushDownExpr) { + public static boolean validToPushDown(Expression pushDownExpr) { // Currently only element at for variant type could be pushed down - return !pushDownExpr.collectToList( + return pushDownExpr != null && !pushDownExpr.collectToList( PushDownToProjectionFunction.class::isInstance).stream().filter( x -> ((Expression) x).getDataType().isVariantType()).collect( Collectors.toList()).isEmpty(); } + + /** + * Rewrites an {@link PushDownToProjectionFunction} instance to a {@link SlotReference}. + * This method is used to transform an PushDownToProjectionFunction expr into a SlotReference, + * based on the provided topColumnSlot and the context of the statement. + * + * @param pushedFunction The {@link PushDownToProjectionFunction} instance to be rewritten. + * @param topColumnSlot The {@link SlotReference} that represents the top column slot. + * @return A {@link SlotReference} that represents the rewritten element. + * If a target column slot is found in the context, it is returned to avoid duplicates. + * Otherwise, a new SlotReference is created and added to the context. + */ + public static Expression rewriteToSlot(PushDownToProjectionFunction pushedFunction, SlotReference topColumnSlot) { + // rewrite to slotRef + StatementContext ctx = ConnectContext.get().getStatementContext(); + List fullPaths = pushedFunction.collectToList(node -> node instanceof VarcharLiteral).stream() + .map(node -> ((VarcharLiteral) node).getValue()) + .collect(Collectors.toList()); + SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot, fullPaths); + if (targetColumnSlot != null) { + // avoid duplicated slots + return targetColumnSlot; + } + SlotReference slotRef = new SlotReference(StatementScopeIdGenerator.newExprId(), + topColumnSlot.getName(), topColumnSlot.getDataType(), + topColumnSlot.nullable(), topColumnSlot.getQualifier(), topColumnSlot.getTable().get(), + topColumnSlot.getColumn().get(), Optional.of(topColumnSlot.getInternalName()), + fullPaths); + ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, pushedFunction); + ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot)); + + return slotRef; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 93a592d64a..df394ec281 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -186,6 +186,8 @@ public class SessionVariable implements Serializable, Writable { public static final String DELETE_WITHOUT_PARTITION = "delete_without_partition"; + public static final String ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER = "enable_variant_access_in_original_planner"; + // set the default parallelism for send batch when execute InsertStmt operation, // if the value for parallelism exceed `max_send_batch_parallelism_per_job` in BE config, // then the coordinator be will use the value of `max_send_batch_parallelism_per_job` @@ -793,6 +795,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true) public int sendBatchParallelism = 1; + @VariableMgr.VarAttr(name = ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER) + public boolean enableVariantAccessInOriginalPlanner = false; + @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true) public boolean extractWideRangeExpr = true; diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java index e6234b002f..e1d32530ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java @@ -42,12 +42,6 @@ public class ElementAtToSlotRefRule implements ExprRewriteRule { @Override public Expr apply(Expr expr, Analyzer analyzer, ClauseType clauseType) throws AnalysisException { - // Only check element at of variant all rewrited to slots - List elementAtFunctions = Lists.newArrayList(); - getElementAtFunction(expr, elementAtFunctions); - if (!elementAtFunctions.isEmpty()) { - throw new AnalysisException("element_at should not appear in common rewrite stage"); - } return expr; } diff --git a/regression-test/data/variant_p0/mv/multi_slot.out b/regression-test/data/variant_p0/mv/multi_slot.out new file mode 100644 index 0000000000..2499e28289 --- /dev/null +++ b/regression-test/data/variant_p0/mv/multi_slot.out @@ -0,0 +1,43 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_star -- +\N \N +456 \N + +-- !select_star -- +\N \N +\N \N +1 \N +1 \N +1 \N +3 \N +5 \N + +-- !select_star -- +-4 {"k1":-4,"k2":-4,"k3":"d"} +-5 {"k1":-4,"k2":-4,"k4":"d"} +-6 {"k1":-4,"k2":-4,"k4":{"k44":789}} +1 {"k1":1,"k2":1,"k3":"a"} +2 {"k1":2,"k2":2,"k3":"b"} +3 {"k1":-3,"k3":"c"} +4 {"k1":-3,"k4":{"k44":456}} + +-- !select_star -- +\N \N +456 \N +789 \N + +-- !select_mv -- +\N \N +1 \N +3 \N +5 \N + +-- !select_mv -- +\N \N +\N \N +1 \N +1 \N +1 \N +3 \N +5 \N + diff --git a/regression-test/suites/variant_p0/mv/multi_slot.groovy b/regression-test/suites/variant_p0/mv/multi_slot.groovy new file mode 100644 index 0000000000..98b8f7f549 --- /dev/null +++ b/regression-test/suites/variant_p0/mv/multi_slot.groovy @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("multi_slot") { + sql """ DROP TABLE IF EXISTS multi_slot; """ + + sql """ + create table multi_slot( + k int null, + v variant null + ) + duplicate key (k) + distributed BY hash(k) buckets 3 + properties("replication_num" = "1"); + """ + + sql """insert into multi_slot select 1,'{"k1" : 1, "k2" : 1, "k3" : "a"}';""" + sql """insert into multi_slot select 2,'{"k1" : 2, "k2" : 2, "k3" : "b"}';""" + sql """insert into multi_slot select 3,'{"k1" : -3, "k2" : null, "k3" : "c"}';""" + sql """insert into multi_slot select 4,'{"k1" : -3, "k2" : null, "k4" : {"k44" : 456}}';""" + order_qt_select_star "select abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int))" + + createMV ("create materialized view k1a2p2ap3p as select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot;") + + createMV("create materialized view k1a2p2ap3ps as select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1;") + + createMV("create materialized view k1a2p2ap3psp as select abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int));") + + sql """insert into multi_slot select -4,'{"k1" : -4, "k2" : -4, "k3" : "d"}';""" + sql """insert into multi_slot select -5,'{"k1" : -4, "k2" : -4, "k4" : "d"}';""" + sql """insert into multi_slot select -6,'{"k1" : -4, "k2" : -4, "k4" : {"k44" : 789}}';""" + + sql "SET experimental_enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + + + order_qt_select_star "select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot;" + order_qt_select_star "select * from multi_slot order by cast(v['k1'] as int);" + // TODO fix and remove enable_rewrite_element_at_to_slot + order_qt_select_star "select /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int))" + + def retry_times = 60 + for (def i = 0; i < retry_times; ++i) { + boolean is_k1a2p2ap3p = false + boolean is_k1a2p2ap3ps = false + boolean is_d_table = false + explain { + sql("select /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1") + check { explainStr, ex, startTime, endTime -> + if (ex != null) { + throw ex; + } + logger.info("explain result: ${explainStr}".toString()) + is_k1a2p2ap3p = explainStr.contains"(k1a2p2ap3p)" + is_k1a2p2ap3ps = explainStr.contains("(k1a2p2ap3ps)") + is_d_table = explainStr.contains("(multi_slot)") + assert is_k1a2p2ap3p || is_k1a2p2ap3ps || is_d_table + } + } + // FIXME: the mv selector maybe select base table forever when exist multi mv, + // so this pr just treat as success if select base table. + // we should remove is_d_table in the future + if (is_d_table || is_k1a2p2ap3p || is_k1a2p2ap3ps) { + break + } + if (i + 1 == retry_times) { + throw new IllegalStateException("retry and failed too much") + } + sleep(1000) + } + order_qt_select_mv "select /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1;" + + explain { + sql("select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3") + contains "(k1a2p2ap3p)" + } + order_qt_select_mv "select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3;" + +} diff --git a/regression-test/suites/variant_p0/schema_change/schema_change.groovy b/regression-test/suites/variant_p0/schema_change/schema_change.groovy index fe593553fd..42cef32c8e 100644 --- a/regression-test/suites/variant_p0/schema_change/schema_change.groovy +++ b/regression-test/suites/variant_p0/schema_change/schema_change.groovy @@ -77,6 +77,6 @@ suite("regression_test_variant_schema_change", "variant_type"){ sql """INSERT INTO ${table_name} SELECT k, v, v from ${table_name} limit 8101""" sql """DROP MATERIALIZED VIEW var_cnt ON ${table_name}""" sql """INSERT INTO ${table_name} SELECT k, v,v from ${table_name} limit 1111""" - // TODO support select from mv - // qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order by k desc limit 10""" + // select from mv + qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order by k desc limit 10""" } \ No newline at end of file