[Feature](materialized-view) support create mv contain aggstate column (#20812)

Support creating a materialized view (MV) that contains an aggstate column.
This commit is contained in:
Pxl
2023-06-21 13:06:52 +08:00
committed by GitHub
parent fcd778fb4f
commit 5f0bb49d46
15 changed files with 393 additions and 1085 deletions

View File

@ -25,6 +25,7 @@
#include <string>
#include "common/config.h"
#include "common/exception.h"
#include "olap/schema_change.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
@ -45,8 +46,12 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
Status EngineAlterTabletTask::execute() {
SCOPED_ATTACH_TASK(_mem_tracker);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
Status res = Status::OK();
try {
res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
} catch (const Exception& e) {
res = e.to_status();
}
if (!res.ok()) {
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
return res;

View File

@ -188,6 +188,10 @@ template <template <typename> class AggregateFunctionTemplate,
AggregateFunctionPtr create_aggregate_function_min_max_by(const String& name,
const DataTypes& argument_types,
const bool result_is_nullable) {
if (argument_types.size() != 2) {
return nullptr;
}
WhichDataType which(remove_nullable(argument_types[0]));
#define DISPATCH(TYPE) \
if (which.idx == TypeIndex::TYPE) \

View File

@ -165,7 +165,9 @@ bool DataTypeNullable::equals(const IDataType& rhs) const {
}
DataTypePtr make_nullable(const DataTypePtr& type) {
if (type->is_nullable()) return type;
if (type->is_nullable()) {
return type;
}
return std::make_shared<DataTypeNullable>(type);
}
@ -178,7 +180,9 @@ DataTypes make_nullable(const DataTypes& types) {
}
DataTypePtr remove_nullable(const DataTypePtr& type) {
if (type->is_nullable()) return static_cast<const DataTypeNullable&>(*type).get_nested_type();
if (type->is_nullable()) {
return assert_cast<const DataTypeNullable*>(type.get())->get_nested_type();
}
return type;
}

View File

@ -518,8 +518,7 @@ public class ColumnDef {
}
return new Column(name, type, isKey, aggregateType, isAllowNull, isAutoInc, defaultValue.value, comment,
visible, defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue.getValue(),
genericAggregationName, typeList, nullableList);
visible, defaultValue.defaultValueExprDef, Column.COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue.getValue());
}
@Override

View File

@ -19,6 +19,8 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Function;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
@ -224,20 +226,6 @@ public class CreateMaterializedViewStmt extends DdlStmt {
if (selectListItemExpr instanceof FunctionCallExpr
&& ((FunctionCallExpr) selectListItemExpr).isAggregateFunction()) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) selectListItemExpr;
String functionName = functionCallExpr.getFnName().getFunction();
// current version not support count(distinct) function in creating materialized
// view
if (!isReplay) {
MVColumnPattern mvColumnPattern = FN_NAME_TO_PATTERN.get(functionName.toLowerCase());
if (mvColumnPattern == null) {
throw new AnalysisException(
"Materialized view does not support this function:" + functionCallExpr.toSqlImpl());
}
if (!mvColumnPattern.match(functionCallExpr)) {
throw new AnalysisException(
"The function " + functionName + " must match pattern:" + mvColumnPattern.toString());
}
}
if (beginIndexOfAggregation == -1) {
beginIndexOfAggregation = i;
@ -415,7 +403,6 @@ public class CreateMaterializedViewStmt extends DdlStmt {
throws AnalysisException {
String functionName = functionCallExpr.getFnName().getFunction();
List<Expr> childs = functionCallExpr.getChildren();
Preconditions.checkArgument(childs.size() == 1);
Expr defineExpr = childs.get(0);
Type baseType = defineExpr.getType();
AggregateType mvAggregateType = null;
@ -465,7 +452,9 @@ public class CreateMaterializedViewStmt extends DdlStmt {
type = Type.BIGINT;
break;
default:
throw new AnalysisException("Unsupported function:" + functionName);
mvAggregateType = AggregateType.GENERIC_AGGREGATION;
defineExpr = Function.convertToStateCombinator(functionCallExpr);
type = defineExpr.type;
}
if (mvAggregateType == null) {
mvAggregateType = AggregateType.valueOf(functionName.toUpperCase());
@ -478,8 +467,8 @@ public class CreateMaterializedViewStmt extends DdlStmt {
} else if (defineExpr instanceof CastExpr && defineExpr.getChild(0) instanceof SlotRef) {
slot = (SlotRef) defineExpr.getChild(0);
} else {
throw new AnalysisException("Aggregate function require single slot argument, invalid argument is: "
+ defineExpr.toSql());
throw new AnalysisException(
"Aggregate function require single slot argument, invalid argument is: " + defineExpr.toSql());
}
AggregateType input = slot.getColumn().getAggregationType();
@ -512,6 +501,12 @@ public class CreateMaterializedViewStmt extends DdlStmt {
name = item.getName();
break;
default:
if (Env.getCurrentEnv()
.isAggFunctionName(functionCallExpr.getFnName().getFunction().toLowerCase())) {
MVColumnItem genericItem = buildMVColumnItem(analyzer, functionCallExpr);
expr = genericItem.getDefineExpr();
name = genericItem.getName();
}
break;
}
}

View File

@ -69,8 +69,8 @@ public class Column implements Writable, GsonPostProcessable {
private static final String COLUMN_MAP_KEY = "key";
private static final String COLUMN_MAP_VALUE = "value";
public static final Column UNSUPPORTED_COLUMN = new Column("unknown",
Type.UNSUPPORTED, true, null, true, false, null, "invalid", true, null, -1, null, null, null, null);
public static final Column UNSUPPORTED_COLUMN = new Column("unknown", Type.UNSUPPORTED, true, null, true, false,
null, "invalid", true, null, -1, null);
@SerializedName(value = "name")
private String name;
@ -170,26 +170,24 @@ public class Column implements Writable, GsonPostProcessable {
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String defaultValue, String comment) {
this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, true, null,
COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue, null, null, null);
COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String comment, boolean visible, int colUniqueId) {
this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null,
colUniqueId, null, null, null, null);
this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null, colUniqueId, null);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String defaultValue, String comment, boolean visible, DefaultValueExprDef defaultValueExprDef,
int colUniqueId, String realDefaultValue) {
this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, visible, defaultValueExprDef,
colUniqueId, realDefaultValue, null, null, null);
colUniqueId, realDefaultValue);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
boolean isAutoInc, String defaultValue, String comment, boolean visible,
DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue,
String genericAggregationName, List<Type> genericAggregationArguments, List<Boolean> nullableList) {
DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue) {
this.name = name;
if (this.name == null) {
this.name = "";
@ -215,13 +213,15 @@ public class Column implements Writable, GsonPostProcessable {
createChildrenColumn(this.type, this);
this.uniqueId = colUniqueId;
this.genericAggregationName = genericAggregationName;
if (genericAggregationArguments != null) {
for (int i = 0; i < genericAggregationArguments.size(); i++) {
Column c = new Column(COLUMN_AGG_ARGUMENT_CHILDREN, genericAggregationArguments.get(i));
c.setIsAllowNull(nullableList.get(i));
if (type.isAggStateType()) {
AggStateType aggState = (AggStateType) type;
for (int i = 0; i < aggState.getSubTypes().size(); i++) {
Column c = new Column(COLUMN_AGG_ARGUMENT_CHILDREN, aggState.getSubTypes().get(i));
c.setIsAllowNull(aggState.getSubTypeNullables().get(i));
addChildrenColumn(c);
}
this.genericAggregationName = aggState.getFunctionName();
this.aggregationType = AggregateType.GENERIC_AGGREGATION;
}
}
@ -516,8 +516,9 @@ public class Column implements Writable, GsonPostProcessable {
tColumn.setColUniqueId(uniqueId);
if (type.isAggStateType()) {
tColumn.setAggregation(genericAggregationName);
tColumn.setResultIsNullable(((AggStateType) type).getResultIsNullable());
AggStateType aggState = (AggStateType) type;
tColumn.setAggregation(aggState.getFunctionName());
tColumn.setResultIsNullable(aggState.getResultIsNullable());
for (Column column : children) {
tColumn.addToChildrenColumn(column.toThrift());
}

View File

@ -4743,6 +4743,10 @@ public class Env {
return functionSet.isNullResultWithOneNullParamFunctions(funcName);
}
/**
 * Reports whether {@code name} is recognized as an aggregate function name.
 *
 * <p>This is a thin pass-through to {@code functionSet.isAggFunctionName(name)};
 * no normalization of {@code name} is done here.
 * NOTE(review): callers presumably need to pass an already lower-cased name --
 * confirm against the FunctionSet implementation.
 *
 * @param name the function name to look up
 * @return whatever the underlying function-set lookup returns for {@code name}
 */
public boolean isAggFunctionName(String name) {
return functionSet.isAggFunctionName(name);
}
@Deprecated
public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
return getInternalCatalog().loadCluster(dis, checksum);

View File

@ -216,7 +216,7 @@ public class MaterializedIndexMeta implements Writable, GsonPostProcessable {
}
if (matchedColumn != null) {
LOG.debug("trans old MV, MV: {}, DefineExpr:{}, DefineName:{}",
LOG.info("trans old MV: {}, DefineExpr:{}, DefineName:{}",
matchedColumn.getName(), entry.getValue().toSqlWithoutTbl(), entry.getKey());
matchedColumn.setDefineExpr(entry.getValue());
matchedColumn.setDefineName(entry.getKey());

View File

@ -411,9 +411,8 @@ public class HMSExternalTable extends ExternalTable {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
true, null,
true, false, null, field.getComment(), true, null,
schema.caseInsensitiveFindField(field.getName()).fieldId(), null, null, null, null));
true, null, true, false, null, field.getComment(), true, null,
schema.caseInsensitiveFindField(field.getName()).fieldId(), null));
}
return tmpSchema;
}

View File

@ -196,8 +196,7 @@ public class JdbcMySQLClient extends JdbcClient {
dorisTableSchema.add(new Column(field.getColumnName(),
jdbcTypeToDoris(field), field.isKey(), null,
field.isAllowNull(), field.isAutoincrement(), field.getDefaultValue(), field.getRemarks(),
true, null, -1, null,
null, null, null));
true, null, -1, null));
}
return dorisTableSchema;
}

View File

@ -0,0 +1,8 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
\N 4 \N d
-4 -4 -4 d
1 1 1 a
2 2 2 b
3 -3 \N c

View File

@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.codehaus.groovy.runtime.IOGroovyMethods
// Regression case: create a materialized view whose select list contains
// max_by(k2,k3), then check that the base table is still readable and that a
// row inserted after the MV exists shows up in the result.
suite ("test_agg_state_max_by") {
// Run the statements below with the Nereids planner switched on.
sql """set enable_nereids_planner=true"""
// Start from a clean slate so the case can be re-run.
sql """ DROP TABLE IF EXISTS d_table; """
// Base table: duplicate-key model on (k1,k2,k3), hashed on k1 into 3 buckets.
// k2 is the only NOT NULL column.
sql """
create table d_table(
k1 int null,
k2 int not null,
k3 bigint null,
k4 varchar(100) null
)
duplicate key (k1,k2,k3)
distributed BY hash(k1) buckets 3
properties("replication_num" = "1");
"""
// Rows loaded BEFORE the materialized view is created.
sql "insert into d_table select 1,1,1,'a';"
sql "insert into d_table select 2,2,2,'b';"
// k3 is null here, so max_by(k2,k3) sees a null ordering value for k1=3.
sql "insert into d_table select 3,-3,null,'c';"
// Only k4 and k2 are listed, so k1 and k3 are left unspecified for this row.
sql "insert into d_table(k4,k2) values('d',4);"
// Create the MV under test: max_by(k2,k3) grouped by k1.
// NOTE(review): createMV is a test-framework helper; presumably it submits the
// statement and waits for the MV build job to finish -- confirm.
createMV("create materialized view k1mb as select k1,max_by(k2,k3) from d_table group by k1;")
// Row loaded AFTER the MV exists.
sql "insert into d_table select -4,-4,-4,'d';"
// Compare the full base-table contents, ordered by k1, with the recorded
// select_star result.
qt_select_star "select * from d_table order by k1;"
// The block below is commented out: it would assert that the query plan
// mentions "(k1mb)" and compare the aggregate query result.
// NOTE(review): presumably disabled because query rewrite onto this kind of MV
// is not supported yet -- confirm before re-enabling.
/*
explain {
sql("select k1,max_by(k2,k3) from d_table group by k1 order by k1;")
contains "(k1mb)"
}
qt_select_mv "select k1,max_by(k2,k3) from d_table group by k1 order by k1;"
*/
}

View File

@ -37,11 +37,6 @@ suite ("sum_devide_count") {
sql "insert into d_table select 2,2,2,'b';"
sql "insert into d_table select 3,-3,null,'c';"
test {
sql "create materialized view kavg as select k1,k4,avg(k2) from d_table group by k1,k4;"
exception "errCode = 2,"
}
createMV ("create materialized view kavg as select k1,k4,sum(k2),count(k2) from d_table group by k1,k4;")
sql "insert into d_table select -4,-4,-4,'d';"

View File

@ -39,11 +39,6 @@ suite ("sum_devide_count") {
sql "SET experimental_enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
test {
sql "create materialized view kavg as select k1,k4,avg(k2) from sum_devide_count group by k1,k4;"
exception "errCode = 2,"
}
createMV ("create materialized view kavg as select k1,k4,sum(k2),count(k2) from sum_devide_count group by k1,k4;")
sleep(3000)