diff --git a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 6f8f482cfb..9357747781 100644 --- a/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -45,6 +45,7 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; @@ -59,7 +60,7 @@ import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -85,6 +86,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class MaterializedViewHandler extends AlterHandler { private static final Logger LOG = LogManager.getLogger(MaterializedViewHandler.class); + public static final String NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX = "__v2_"; public MaterializedViewHandler() { super("materialized view"); @@ -325,7 +327,7 @@ public class MaterializedViewHandler extends AlterHandler { baseIndexId, mvIndexId, baseIndexName, mvName, mvColumns, baseSchemaHash, mvSchemaHash, mvKeysType, mvShortKeyColumnCount); - String newStorageFormatIndexName = "__v2_" + olapTable.getName(); + String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName(); if (mvName.equals(newStorageFormatIndexName)) { mvJob.setStorageFormat(TStorageFormat.V2); } @@ -440,7 +442,7 @@ public class MaterializedViewHandler extends AlterHandler { String rollupIndexName = addRollupClause.getRollupName(); List rollupColumnNames = addRollupClause.getColumnNames(); if (changeStorageFormat) { - String newStorageFormatIndexName = "__v2_" + olapTable.getName(); + String newStorageFormatIndexName = NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + olapTable.getName(); rollupIndexName = newStorageFormatIndexName; List columns = olapTable.getSchemaByIndexId(baseIndexId); // create the same schema as base table @@ -465,10 +467,14 @@ public class MaterializedViewHandler extends AlterHandler { boolean hasKey = false; boolean meetReplaceValue = false; KeysType keysType = olapTable.getKeysType(); + Map baseColumnNameToColumn = Maps.newHashMap(); + for (Column column : olapTable.getSchemaByIndexId(baseIndexId)) { + baseColumnNameToColumn.put(column.getName(), column); + } if (keysType.isAggregationFamily()) { int keysNumOfRollup = 0; for (String columnName : rollupColumnNames) { - Column oneColumn = olapTable.getColumn(columnName); + Column oneColumn = baseColumnNameToColumn.get(columnName); if (oneColumn == null) { throw new DdlException("Column[" + columnName + "] does not exist"); } @@ -503,69 +509,41 @@ public class MaterializedViewHandler extends AlterHandler { } } } else if (KeysType.DUP_KEYS == keysType) { - /* - * eg. - * Base Table's schema is (k1,k2,k3,k4,k5) dup key (k1,k2,k3). - * The following rollup is allowed: - * 1. (k1) dup key (k1) - * 2. (k2,k3) dup key (k2) - * 3. (k1,k2,k3) dup key (k1,k2) - * - * The following rollup is forbidden: - * 1. (k1) dup key (k2) - * 2. (k2,k3) dup key (k3,k2) - * 3. (k1,k2,k3) dup key (k2,k3) - */ + // supplement the duplicate key if (addRollupClause.getDupKeys() == null || addRollupClause.getDupKeys().isEmpty()) { - // user does not specify duplicate key for rollup, - // use base table's duplicate key. - // so we should check if rollup columns contains all base table's duplicate key. - List baseIdxCols = olapTable.getSchemaByIndexId(baseIndexId); - Set baseIdxKeyColNames = Sets.newHashSet(); - for (Column baseCol : baseIdxCols) { - if (baseCol.isKey()) { - baseIdxKeyColNames.add(baseCol.getName()); + int keyStorageLayoutBytes = 0; + for (int i = 0; i < rollupColumnNames.size(); i++) { + String columnName = rollupColumnNames.get(i); + Column baseColumn = baseColumnNameToColumn.get(columnName); + if (baseColumn == null) { + throw new DdlException("Column[" + columnName + "] does not exist in base index"); + } + keyStorageLayoutBytes += baseColumn.getType().getStorageLayoutBytes(); + Column rollupColumn = new Column(baseColumn); + if ((i + 1) <= FeConstants.shortkey_max_column_count + || keyStorageLayoutBytes < FeConstants.shortkey_maxsize_bytes) { + rollupColumn.setIsKey(true); + rollupColumn.setAggregationType(null, false); } else { - break; + rollupColumn.setIsKey(false); + rollupColumn.setAggregationType(AggregateType.NONE, true); } - } - - boolean found = false; - for (String baseIdxKeyColName : baseIdxKeyColNames) { - found = false; - for (String rollupColName : rollupColumnNames) { - if (rollupColName.equalsIgnoreCase(baseIdxKeyColName)) { - found = true; - break; - } - } - if (!found) { - throw new DdlException("Rollup should contains all base table's duplicate keys if " - + "no duplicate key is specified: " + baseIdxKeyColName); - } - } - - // check (a)(b) - for (String columnName : rollupColumnNames) { - Column oneColumn = olapTable.getColumn(columnName); - if (oneColumn == null) { - throw new DdlException("Column[" + columnName + "] does not exist"); - } - if (oneColumn.isKey() && meetValue) { - throw new DdlException("Invalid column order. key should before all values: " + columnName); - } - if (oneColumn.isKey()) { - hasKey = true; - } else { - meetValue = true; - } - rollupSchema.add(oneColumn); - } - - if (!hasKey) { - throw new DdlException("No key column is found"); + rollupSchema.add(rollupColumn); } } else { + /* + * eg. + * Base Table's schema is (k1,k2,k3,k4,k5) dup key (k1,k2,k3). + * The following rollup is allowed: + * 1. (k1) dup key (k1) + * 2. (k2,k3) dup key (k2) + * 3. (k1,k2,k3) dup key (k1,k2) + * + * The following rollup is forbidden: + * 1. (k1) dup key (k2) + * 2. (k2,k3) dup key (k3,k2) + * 3. (k1,k2,k3) dup key (k2,k3) + */ // user specify the duplicate keys for rollup index List dupKeys = addRollupClause.getDupKeys(); if (dupKeys.size() > rollupColumnNames.size()) { @@ -583,7 +561,8 @@ public class MaterializedViewHandler extends AlterHandler { isKey = true; } - if (olapTable.getColumn(rollupColName) == null) { + Column baseColumn = baseColumnNameToColumn.get(rollupColName); + if (baseColumn == null) { throw new DdlException("Column[" + rollupColName + "] does not exist"); } @@ -591,7 +570,7 @@ public class MaterializedViewHandler extends AlterHandler { throw new DdlException("Invalid column order. key should before all values: " + rollupColName); } - Column oneColumn = new Column(olapTable.getColumn(rollupColName)); + Column oneColumn = new Column(baseColumn); if (isKey) { hasKey = true; oneColumn.setIsKey(true); diff --git a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java index 04e0dd7340..85f5600ef5 100644 --- a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -1063,23 +1063,16 @@ public class Analyzer { return uniqueTableAliasSet_; } - public List getAllConjunt(TupleId id) { + public List getAllConjuncts(TupleId id) { List conjunctIds = tuplePredicates.get(id); if (conjunctIds == null) { return null; } List result = Lists.newArrayList(); - List ojClauseConjuncts = null; for (ExprId conjunctId : conjunctIds) { Expr e = globalState.conjuncts.get(conjunctId); Preconditions.checkState(e != null); - if (ojClauseConjuncts != null) { - if (ojClauseConjuncts.contains(conjunctId)) { - result.add(e); - } - } else { - result.add(e); - } + result.add(e); } return result; } diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java index 593cd4e5fd..2af26d321a 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateMaterializedViewStmt.java @@ -19,6 +19,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.AggregateType; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.FeConstants; @@ -27,6 +28,7 @@ import org.apache.doris.common.UserException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.List; @@ -93,9 +95,13 @@ public class CreateMaterializedViewStmt extends DdlStmt { @Override public void analyze(Analyzer analyzer) throws UserException { + // TODO(ml): remove it + if (!Config.enable_materialized_view) { + throw new AnalysisException("The materialized view is coming soon"); + } super.analyze(analyzer); FeNameFormat.checkTableName(mvName); - // TODO(ml): the mv name in from clause should pass the analyze without error. + // TODO(ml): The mv name in from clause should pass the analyze without error. selectStmt.analyze(analyzer); analyzeSelectClause(); analyzeFromClause(); @@ -103,6 +109,14 @@ public class CreateMaterializedViewStmt extends DdlStmt { throw new AnalysisException("The where clause is not supported in add materialized view clause, expr:" + selectStmt.getWhereClause().toSql()); } + if (selectStmt.getGroupByClause() != null) { + // TODO(ml): The metadata in fe does not support the deduplicate mv in base table + // which keys type is duplicated. + if (selectStmt.getGroupByClause().getGroupingExprs().size() == + selectStmt.getSelectList().getItems().size()) { + throw new AnalysisException("The deduplicate materialized view is not yet supported"); + } + } if (selectStmt.getHavingPred() != null) { throw new AnalysisException("The having clause is not supported in add materialized view clause, expr:" + selectStmt.getHavingPred().toSql()); @@ -120,7 +134,8 @@ public class CreateMaterializedViewStmt extends DdlStmt { throw new AnalysisException("The materialized view must contain at least one column"); } boolean meetAggregate = false; - Set mvColumnNameSet = Sets.newHashSet(); + Set mvKeyColumnNameSet = Sets.newHashSet(); + Map> mvAggColumnNameToFunctionNames = Maps.newHashMap(); /** * 1. The columns of mv must be a single column or a aggregate column without any calculate. * Also the children of aggregate column must be a single column without any calculate. @@ -143,7 +158,8 @@ public class CreateMaterializedViewStmt extends DdlStmt { } SlotRef slotRef = (SlotRef) selectListItem.getExpr(); String columnName = slotRef.getColumnName().toLowerCase(); - if (!mvColumnNameSet.add(columnName)) { + // check duplicate key + if (!mvKeyColumnNameSet.add(columnName)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, columnName); } MVColumnItem mvColumnItem = new MVColumnItem(columnName); @@ -172,10 +188,20 @@ public class CreateMaterializedViewStmt extends DdlStmt { + "Error function: " + functionCallExpr.toSqlImpl()); } meetAggregate = true; + // check duplicate value String columnName = slotRef.getColumnName().toLowerCase(); - if (!mvColumnNameSet.add(columnName)) { + if (mvKeyColumnNameSet.contains(columnName)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, columnName); } + Set functionNames = mvAggColumnNameToFunctionNames.get(columnName); + if (functionNames == null) { + functionNames = Sets.newHashSet(); + } + if (!functionNames.add(functionName.toLowerCase())) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, columnName); + } + mvAggColumnNameToFunctionNames.put(columnName, functionNames); + if (beginIndexOfAggregation == -1) { beginIndexOfAggregation = i; } @@ -186,6 +212,9 @@ public class CreateMaterializedViewStmt extends DdlStmt { } } // TODO(ML): only value columns of materialized view, such as select sum(v1) from table + if (beginIndexOfAggregation == 0) { + throw new AnalysisException("The materialized view must contain at least one key column"); + } } private void analyzeFromClause() throws AnalysisException { diff --git a/fe/src/main/java/org/apache/doris/analysis/Expr.java b/fe/src/main/java/org/apache/doris/analysis/Expr.java index 96971b42a0..2a5f531539 100644 --- a/fe/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/src/main/java/org/apache/doris/analysis/Expr.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Set; /** * Root of the expr node hierarchy. @@ -1108,6 +1109,13 @@ abstract public class Expr extends TreeNode implements ParseNode, Cloneabl } } + public void getTableNameToColumnNames(Map> tableNameToColumnNames) { + Preconditions.checkState(tableNameToColumnNames != null); + for (Expr child : children) { + child.getTableNameToColumnNames(tableNameToColumnNames); + } + } + /** * @return true if this is an instance of LiteralExpr */ diff --git a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java index e4f5299ab5..d0b8a9135f 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -59,6 +59,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; /** * Representation of a single select block, including GROUP BY, ORDER BY and HAVING @@ -66,6 +67,7 @@ import java.util.Set; */ public class SelectStmt extends QueryStmt { private final static Logger LOG = LogManager.getLogger(SelectStmt.class); + private UUID id = UUID.randomUUID(); // /////////////////////////////////////// // BEGIN: Members that need to be reset() @@ -137,6 +139,7 @@ public class SelectStmt extends QueryStmt { protected SelectStmt(SelectStmt other) { super(other); + this.id = other.id; selectList = other.selectList.clone(); fromClause_ = other.fromClause_.clone(); whereClause = (other.whereClause != null) ? other.whereClause.clone() : null; @@ -178,6 +181,10 @@ public class SelectStmt extends QueryStmt { return new SelectStmt(this); } + public UUID getId() { + return id; + } + /** * @return the original select list items from the query */ @@ -1431,4 +1438,20 @@ public class SelectStmt extends QueryStmt { } return false; } + + @Override + public int hashCode() { + return id.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof SelectStmt)) { + return false; + } + return this.id.equals(((SelectStmt) obj).id); + } } diff --git a/fe/src/main/java/org/apache/doris/analysis/SlotRef.java b/fe/src/main/java/org/apache/doris/analysis/SlotRef.java index 4240960346..1cfabbbcf5 100644 --- a/fe/src/main/java/org/apache/doris/analysis/SlotRef.java +++ b/fe/src/main/java/org/apache/doris/analysis/SlotRef.java @@ -17,6 +17,7 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.io.Text; @@ -35,6 +36,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; public class SlotRef extends Expr { private static final Logger LOG = LogManager.getLogger(SlotRef.class); @@ -259,6 +263,32 @@ public class SlotRef extends Expr { } } + @Override + public void getTableNameToColumnNames(Map> tupleDescToColumnNames) { + Preconditions.checkState(desc != null); + if (!desc.isMaterialized()) { + return; + } + if (col == null) { + for (Expr expr : desc.getSourceExprs()) { + expr.getTableNameToColumnNames(tupleDescToColumnNames); + } + } else { + Table table = desc.getParent().getTable(); + if (table == null) { + // Maybe this column comes from inline view. + return; + } + String tableName = table.getName(); + Set columnNames = tupleDescToColumnNames.get(tableName); + if (columnNames == null) { + columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + tupleDescToColumnNames.put(tableName, columnNames); + } + columnNames.add(desc.getColumn().getName()); + } + } + public String getColumnName() { return col; } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index a0210994c7..aebd76ef21 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -4870,9 +4870,7 @@ public class Catalog { } public void createMaterializedView(CreateMaterializedViewStmt stmt) throws AnalysisException, DdlException { - // TODO(ml): remove it - throw new AnalysisException("The materialized view is coming soon"); -// this.alter.processCreateMaterializedView(stmt); + this.alter.processCreateMaterializedView(stmt); } /* diff --git a/fe/src/main/java/org/apache/doris/catalog/Column.java b/fe/src/main/java/org/apache/doris/catalog/Column.java index 6b71611f23..aff4b6a064 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/src/main/java/org/apache/doris/catalog/Column.java @@ -167,6 +167,13 @@ public class Column implements Writable { return this.aggregationType; } + public boolean isAggregated() { + if (aggregationType == null || aggregationType == AggregateType.NONE) { + return false; + } + return true; + } + public boolean isAggregationTypeImplicit() { return this.isAggregationTypeImplicit; } diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java index 58951dcfb8..860a72dd01 100644 --- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -289,6 +289,10 @@ public class OlapTable extends Table { public void rebuildFullSchema() { fullSchema.clear(); nameToColumn.clear(); + for (Column baseColumn : indexIdToSchema.get(baseIndexId)) { + fullSchema.add(baseColumn); + nameToColumn.put(baseColumn.getName(), baseColumn); + } for (List columns : indexIdToSchema.values()) { for (Column column : columns) { if (!nameToColumn.containsKey(column.getName())) { @@ -331,6 +335,16 @@ public class OlapTable extends Table { return null; } + public Map> getVisibleIndexIdToSchema() { + Map> visibleMVs = Maps.newHashMap(); + Partition partition = idToPartition.values().stream().findFirst().get(); + List mvs = partition.getMaterializedIndices(IndexExtState.VISIBLE); + for (MaterializedIndex mv : mvs) { + visibleMVs.put(mv.getId(), indexIdToSchema.get(mv.getId())); + } + return visibleMVs; + } + // this is only for schema change. public void renameIndexForSchemaChange(String name, String newName) { long idxId = indexNameToId.remove(name); @@ -1002,6 +1016,11 @@ public class OlapTable extends Table { tableProperty = TableProperty.read(in); } } + + // In the present, the fullSchema could be rebuilt by schema change while the properties is changed by MV. + // After that, some properties of fullSchema and nameToColumn may be not same as properties of base columns. + // So, here we need to rebuild the fullSchema to ensure the correctness of the properties. + rebuildFullSchema(); } public boolean equals(Table table) { diff --git a/fe/src/main/java/org/apache/doris/catalog/Table.java b/fe/src/main/java/org/apache/doris/catalog/Table.java index 688b240d7d..8ddf394640 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/src/main/java/org/apache/doris/catalog/Table.java @@ -69,8 +69,22 @@ public class Table extends MetaObject implements Writable { * * NOTICE: the order of this fullSchema is meaningless to OlapTable */ + /** + * The fullSchema of OlapTable includes the base columns and the SHADOW_NAME_PRFIX columns. + * The properties of base columns in fullSchema are same as properties in baseIndex. + * For example: + * Table (c1 int, c2 int, c3 int) + * Schema change (c3 to bigint) + * When OlapTable is changing schema, the fullSchema is (c1 int, c2 int, c3 int, SHADOW_NAME_PRFIX_c3 bigint) + * The fullSchema of OlapTable is mainly used by Scanner of Load job. + * + * If you want to get the mv columns, you should call getIndexToSchema in Subclass OlapTable. + */ protected List fullSchema; // tree map for case-insensitive lookup. + /** + * The nameToColumn of OlapTable includes the base columns and the SHADOW_NAME_PRFIX columns. + */ protected Map nameToColumn; // DO NOT persist this variable. diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index 22a0cc10bf..202f0ed283 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -999,5 +999,11 @@ public class Config extends ConfigBase { */ @ConfField public static boolean check_java_version = true; + + /* + * control materialized view + */ + @ConfField(mutable = true, masterOnly = true) + public static boolean enable_materialized_view = false; } diff --git a/fe/src/main/java/org/apache/doris/external/EsStateStore.java b/fe/src/main/java/org/apache/doris/external/EsStateStore.java index 5cf3fdaff8..c47dbc5e38 100644 --- a/fe/src/main/java/org/apache/doris/external/EsStateStore.java +++ b/fe/src/main/java/org/apache/doris/external/EsStateStore.java @@ -201,8 +201,8 @@ public class EsStateStore extends MasterDaemon { JSONObject mappings = indexMetaMap.optJSONObject("mappings"); JSONObject rootSchema = mappings.optJSONObject(esTable.getMappingType()); JSONObject schema = rootSchema.optJSONObject("properties"); - List cols = esTable.getFullSchema(); - for (Column col : cols) { + List colList = esTable.getFullSchema(); + for (Column col : colList) { String colName = col.getName(); if (!schema.has(colName)) { continue; diff --git a/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java new file mode 100644 index 0000000000..f8624e1589 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java @@ -0,0 +1,520 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.alter.MaterializedViewHandler; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * The new materialized view selector supports SPJ<->SPJG. + * It means this selector can judge any combination of those two options SPJ and SPJG. + * For example, the type of query is SPJG while the type of MV is SPJ. + * At the same time, it is compatible with all the features of the old version. + * + * What is SPJ and SPJG? + * The SPJ query is "Select Projection and Join" such as: + * select t1.c1 from t1, t2 where t1.c2=t2.c2 and t1.c3=1; + * The SPJG query is "Select Projection Join and Group-by" such as: + * select t1.c1, sum(t2.c1) from t1, t2 where t1.c2=t2.c2 and t1.c3=1 group by t1.c 1; + */ +public class MaterializedViewSelector { + private static final Logger LOG = LogManager.getLogger(MaterializedViewSelector.class); + + private final SelectStmt selectStmt; + private final Analyzer analyzer; + + /** + * The key of following maps is table name. + * The value of following maps is column names. + * `columnNamesInPredicates` means that the column names in where clause. + * And so on. + */ + private Map> columnNamesInPredicates = Maps.newHashMap(); + private boolean isSPJQuery; + private Map> columnNamesInGrouping = Maps.newHashMap(); + private Map> aggregateColumnsInQuery = Maps.newHashMap(); + private Map> columnNamesInQueryOutput = Maps.newHashMap(); + + private boolean disableSPJGView; + private String reasonOfDisable; + private boolean isPreAggregation = true; + + public MaterializedViewSelector(SelectStmt selectStmt, Analyzer analyzer) { + this.selectStmt = selectStmt; + this.analyzer = analyzer; + init(); + } + + /** + * There are two stages to choosing the best MV. + * Phase 1: Predicates = computeCandidateMVs + * According to aggregation and column information in the select stmt, + * the candidate MVs that meets the query conditions are selected. + * Phase 2: Priorities = computeBestMVByCost + * According to prefix index and row count in candidate MVs, + * the best MV is selected. + * + * @param scanNode + * @return + */ + public BestIndexInfo selectBestMV(ScanNode scanNode) throws UserException { + long start = System.currentTimeMillis(); + Preconditions.checkState(scanNode instanceof OlapScanNode); + OlapScanNode olapScanNode = (OlapScanNode) scanNode; + Map> candidateIndexIdToSchema = predicates(olapScanNode); + long bestIndexId = priorities(olapScanNode, candidateIndexIdToSchema); + LOG.info("The best materialized view is {} for scan node {} in query {}, cost {}", + bestIndexId, scanNode.getId(), selectStmt.toSql(), (System.currentTimeMillis() - start)); + BestIndexInfo bestIndexInfo = new BestIndexInfo(bestIndexId, isPreAggregation, reasonOfDisable); + return bestIndexInfo; + } + + private Map> predicates(OlapScanNode scanNode) { + // Step1: all of predicates is compensating predicates + Map> candidateIndexIdToSchema = scanNode.getOlapTable().getVisibleIndexIdToSchema(); + OlapTable table = scanNode.getOlapTable(); + Preconditions.checkState(table != null); + String tableName = table.getName(); + // Step2: check all columns in compensating predicates are available in the view output + checkCompensatingPredicates(columnNamesInPredicates.get(tableName), candidateIndexIdToSchema); + // Step3: group by list in query is the subset of group by list in view or view contains no aggregation + checkGrouping(columnNamesInGrouping.get(tableName), candidateIndexIdToSchema); + // Step4: aggregation functions are available in the view output + checkAggregationFunction(aggregateColumnsInQuery.get(tableName), candidateIndexIdToSchema); + // Step5: columns required to compute output expr are available in the view output + checkOutputColumns(columnNamesInQueryOutput.get(tableName), candidateIndexIdToSchema); + // Step6: if table type is aggregate and the candidateIndexIdToSchema is empty, + if (table.getKeysType() == KeysType.AGG_KEYS && candidateIndexIdToSchema.size() == 0) { + // the base index will be added in the candidateIndexIdToSchema. + /** + * In Doris, it is allowed that the aggregate table should be scanned directly + * while there is no aggregation info in query. + * For example: + * Aggregate tableA: k1, k2, sum(v1) + * Query: select k1, k2, v1 from tableA + * Allowed + * Result: same as select k1, k2, sum(v1) from tableA group by t1, t2 + * + * However, the query should not be selector normally. + * The reason is that the level of group by in tableA is upper then the level of group by in query. + * So, we need to compensate those kinds of index in following step. + * + */ + compensateIndex(candidateIndexIdToSchema, scanNode.getOlapTable().getVisibleIndexIdToSchema(), + table.getSchemaByIndexId(table.getBaseIndexId()).size()); + } + return candidateIndexIdToSchema; + } + + private long priorities(OlapScanNode scanNode, Map> candidateIndexIdToSchema) { + // Step1: the candidate indexes that satisfies the most prefix index + final Set equivalenceColumns = Sets.newHashSet(); + final Set unequivalenceColumns = Sets.newHashSet(); + scanNode.collectColumns(analyzer, equivalenceColumns, unequivalenceColumns); + Set indexesMatchingBestPrefixIndex = + matchBestPrefixIndex(candidateIndexIdToSchema, equivalenceColumns, unequivalenceColumns); + + // Step2: the best index that satisfies the least number of rows + return selectBestRowCountIndex(indexesMatchingBestPrefixIndex, scanNode.getOlapTable(), scanNode + .getSelectedPartitionIds()); + } + + private Set matchBestPrefixIndex(Map> candidateIndexIdToSchema, + Set equivalenceColumns, + Set unequivalenceColumns) { + if (equivalenceColumns.size() == 0 && unequivalenceColumns.size() == 0) { + return candidateIndexIdToSchema.keySet(); + } + Set indexesMatchingBestPrefixIndex = Sets.newHashSet(); + int maxPrefixMatchCount = 0; + for (Map.Entry> entry : candidateIndexIdToSchema.entrySet()) { + int prefixMatchCount = 0; + long indexId = entry.getKey(); + List indexSchema = entry.getValue(); + for (Column col : indexSchema) { + if (equivalenceColumns.contains(col.getName())) { + prefixMatchCount++; + } else if (unequivalenceColumns.contains(col.getName())) { + // Unequivalence predicate's columns can match only first column in rollup. + prefixMatchCount++; + break; + } else { + break; + } + } + + if (prefixMatchCount == maxPrefixMatchCount) { + LOG.debug("find a equal prefix match index {}. match count: {}", indexId, prefixMatchCount); + indexesMatchingBestPrefixIndex.add(indexId); + } else if (prefixMatchCount > maxPrefixMatchCount) { + LOG.debug("find a better prefix match index {}. match count: {}", indexId, prefixMatchCount); + maxPrefixMatchCount = prefixMatchCount; + indexesMatchingBestPrefixIndex.clear(); + indexesMatchingBestPrefixIndex.add(indexId); + } + } + LOG.debug("Those mv match the best prefix index:" + Joiner.on(",").join(indexesMatchingBestPrefixIndex)); + return indexesMatchingBestPrefixIndex; + } + + private long selectBestRowCountIndex(Set indexesMatchingBestPrefixIndex, OlapTable olapTable, + Collection partitionIds) { + long minRowCount = Long.MAX_VALUE; + long selectedIndexId = 0; + for (Long indexId : indexesMatchingBestPrefixIndex) { + long rowCount = 0; + for (Long partitionId : partitionIds) { + rowCount += olapTable.getPartition(partitionId).getIndex(indexId).getRowCount(); + } + LOG.debug("rowCount={} for table={}", rowCount, indexId); + if (rowCount < minRowCount) { + minRowCount = rowCount; + selectedIndexId = indexId; + } else if (rowCount == minRowCount) { + // check column number, select one minimum column number + int selectedColumnSize = olapTable.getIndexIdToSchema().get(selectedIndexId).size(); + int currColumnSize = olapTable.getIndexIdToSchema().get(indexId).size(); + if (currColumnSize < selectedColumnSize) { + selectedIndexId = indexId; + } + } + } + String tableName = olapTable.getName(); + String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName; + Long v2RollupIndex = olapTable.getIndexIdByName(v2RollupIndexName); + long baseIndexId = olapTable.getBaseIndexId(); + ConnectContext connectContext = ConnectContext.get(); + boolean useV2Rollup = false; + if (connectContext != null) { + useV2Rollup = connectContext.getSessionVariable().getUseV2Rollup(); + } + if (baseIndexId == selectedIndexId && v2RollupIndex != null && useV2Rollup) { + // if the selectedIndexId is baseIndexId + // check whether there is a V2 rollup index and useV2Rollup flag is true, + // if both true, use v2 rollup index + selectedIndexId = v2RollupIndex; + } + if (!useV2Rollup && v2RollupIndex != null && v2RollupIndex == selectedIndexId) { + // if the selectedIndexId is v2RollupIndex + // but useV2Rollup is false, use baseIndexId as selectedIndexId + // just make sure to use baseIndex instead of v2RollupIndex if the useV2Rollup is false + selectedIndexId = baseIndexId; + } + return selectedIndexId; + } + + private void checkCompensatingPredicates(Set columnsInPredicates, + Map> candidateIndexIdToSchema) { + // When the query statement does not contain any columns in predicates, all candidate index can pass this check + if (columnsInPredicates == null) { + return; + } + Iterator>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + Set indexNonAggregatedColumnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + entry.getValue().stream().filter(column -> !column.isAggregated()) + .forEach(column -> indexNonAggregatedColumnNames.add(column.getName())); + if (!indexNonAggregatedColumnNames.containsAll(columnsInPredicates)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of compensating predicates:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + /** + * View Query result + * SPJ SPJG OR SPJ pass + * SPJG SPJ fail + * SPJG SPJG pass + * 1. grouping columns in query is subset of grouping columns in view + * 2. the empty grouping columns in query is subset of all of views + * + * @param columnsInGrouping + * @param candidateIndexIdToSchema + */ + + private void checkGrouping(Set columnsInGrouping, Map> candidateIndexIdToSchema) { + Iterator>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + Set indexNonAggregatedColumnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + List candidateIndexSchema = entry.getValue(); + candidateIndexSchema.stream().filter(column -> !column.isAggregated()) + .forEach(column -> indexNonAggregatedColumnNames.add(column.getName())); + // When the candidate index is SPJ type, it passes the verification directly + if (indexNonAggregatedColumnNames.size() == candidateIndexSchema.size()) { + continue; + } + // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly. + if (isSPJQuery || disableSPJGView) { + iterator.remove(); + continue; + } + // The query is SPJG. The candidate index is SPJG too. + // The grouping columns in query is empty. For example: select sum(A) from T + if (columnsInGrouping == null) { + continue; + } + // The grouping columns in query must be subset of the grouping columns in view + if (!indexNonAggregatedColumnNames.containsAll(columnsInGrouping)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of grouping:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + private void checkAggregationFunction(Set aggregatedColumnsInQueryOutput, + Map> candidateIndexIdToSchema) { + Iterator>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + List indexAggregatedColumns = Lists.newArrayList(); + List candidateIndexSchema = entry.getValue(); + candidateIndexSchema.stream().filter(column -> column.isAggregated()) + .forEach(column -> indexAggregatedColumns.add( + new AggregatedColumn(column.getName(), column.getAggregationType().name()))); + // When the candidate index is SPJ type, it passes the verification directly + if (indexAggregatedColumns.size() == 0) { + continue; + } + // When the query is SPJ type but the candidate index is SPJG type, it will not pass directly. + if (isSPJQuery || disableSPJGView) { + iterator.remove(); + continue; + } + // The query is SPJG. The candidate index is SPJG too. + /* Situation1: The query is deduplicate SPJG when aggregatedColumnsInQueryOutput is null. + * For example: select a , b from table group by a, b + * The aggregation function check should be pass directly when MV is SPJG. + */ + if (aggregatedColumnsInQueryOutput == null) { + continue; + } + // The aggregated columns in query output must be subset of the aggregated columns in view + if (!indexAggregatedColumns.containsAll(aggregatedColumnsInQueryOutput)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of aggregation function:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + private void checkOutputColumns(Set columnNamesInQueryOutput, + Map> candidateIndexIdToSchema) { + if (columnNamesInQueryOutput == null) { + return; + } + Iterator>> iterator = candidateIndexIdToSchema.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> entry = iterator.next(); + Set indexColumnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + List candidateIndexSchema = entry.getValue(); + candidateIndexSchema.stream().forEach(column -> indexColumnNames.add(column.getName())); + // The aggregated columns in query output must be subset of the aggregated columns in view + if (!indexColumnNames.containsAll(columnNamesInQueryOutput)) { + iterator.remove(); + } + } + LOG.debug("Those mv pass the test of output columns:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + private void compensateIndex(Map> candidateIndexIdToSchema, + Map> allVisibleIndexes, + int sizeOfBaseIndexSchema) { + isPreAggregation = false; + reasonOfDisable = "The aggregate operator does not match"; + for (Map.Entry> index : allVisibleIndexes.entrySet()) { + if (index.getValue().size() == sizeOfBaseIndexSchema) { + candidateIndexIdToSchema.put(index.getKey(), index.getValue()); + } + } + LOG.debug("Those mv pass the test of output columns:" + + Joiner.on(",").join(candidateIndexIdToSchema.keySet())); + } + + private void init() { + // Step1: compute the columns in compensating predicates + Expr whereClause = selectStmt.getWhereClause(); + if (whereClause != null) { + whereClause.getTableNameToColumnNames(columnNamesInPredicates); + } + for (TableRef tableRef : selectStmt.getTableRefs()) { + if (tableRef.getOnClause() == null) { + continue; + } + tableRef.getOnClause().getTableNameToColumnNames(columnNamesInPredicates); + } + + if (selectStmt.getAggInfo() == null) { + isSPJQuery = true; + } else { + // Step2: compute the columns in group by expr + if (selectStmt.getAggInfo().getGroupingExprs() != null) { + List groupingExprs = selectStmt.getAggInfo().getGroupingExprs(); + for (Expr expr : groupingExprs) { + expr.getTableNameToColumnNames(columnNamesInGrouping); + } + } + // Step3: compute the aggregation function + for (FunctionCallExpr aggExpr : selectStmt.getAggInfo().getAggregateExprs()) { + // Only sum, min, max function could appear in materialized views. + // The number of children in these functions is one. + if (aggExpr.getChildren().size() != 1) { + reasonOfDisable = "aggExpr has more than one child"; + disableSPJGView = true; + break; + } + Expr aggChild0 = aggExpr.getChild(0); + if (aggChild0 instanceof SlotRef) { + SlotRef slotRef = (SlotRef) aggChild0; + Preconditions.checkState(slotRef.getColumnName() != null); + Table table = slotRef.getDesc().getParent().getTable(); + /* If this column come from subquery, the parent table will be null + * For example: select k1 from (select name as k1 from tableA) A + * The parent table of k1 column in outer query is null. + */ + if (table == null) { + continue; + } + addAggregatedColumn(slotRef.getColumnName(), aggExpr.getFnName().getFunction(), + table.getName()); + } else if ((aggChild0 instanceof CastExpr) && (aggChild0.getChild(0) instanceof SlotRef)) { + SlotRef slotRef = (SlotRef) aggChild0.getChild(0); + Preconditions.checkState(slotRef.getColumnName() != null); + Table table = slotRef.getDesc().getParent().getTable(); + /* + * Same as above + */ + if (table == null) { + continue; + } + addAggregatedColumn(slotRef.getColumnName(), aggExpr.getFnName().getFunction(), + table.getName()); + } else { + reasonOfDisable = "aggExpr.getChild(0)[" + aggExpr.getChild(0).debugString() + + "] is not SlotRef or CastExpr|CaseExpr"; + disableSPJGView = true; + break; + } + // TODO(ml): select rollup by case expr + } + } + + // Step4: compute the output column + for (Expr resultExpr : selectStmt.getResultExprs()) { + resultExpr.getTableNameToColumnNames(columnNamesInQueryOutput); + } + } + + private void addAggregatedColumn(String columnName, String functionName, String tableName) { + AggregatedColumn newAggregatedColumn = new AggregatedColumn(columnName, functionName); + Set aggregatedColumns = aggregateColumnsInQuery.get(tableName); + if (aggregatedColumns == null) { + aggregatedColumns = Sets.newHashSet(); + aggregateColumnsInQuery.put(tableName, aggregatedColumns); + } + aggregatedColumns.add(newAggregatedColumn); + } + + class AggregatedColumn { + private String columnName; + private String aggFunctionName; + + public AggregatedColumn(String columnName, String aggFunctionName) { + this.columnName = columnName; + this.aggFunctionName = aggFunctionName; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof AggregatedColumn)) { + return false; + } + AggregatedColumn input = (AggregatedColumn) obj; + return this.columnName.equalsIgnoreCase(input.columnName) + && this.aggFunctionName.equalsIgnoreCase(input.aggFunctionName); + } + + @Override + public int hashCode() { + return Objects.hash(this.columnName, this.aggFunctionName); + } + } + + public class BestIndexInfo { + private long bestIndexId; + private boolean isPreAggregation; + private String reasonOfDisable; + + public BestIndexInfo(long bestIndexId, boolean isPreAggregation, String reasonOfDisable) { + this.bestIndexId = bestIndexId; + this.isPreAggregation = isPreAggregation; + this.reasonOfDisable = reasonOfDisable; + } + + public long getBestIndexId() { + return bestIndexId; + } + + public boolean isPreAggregation() { + return isPreAggregation; + } + + public String getReasonOfDisable() { + return reasonOfDisable; + } + } +} + + diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java index b113388ea7..225b6965c7 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -19,6 +19,12 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BaseTableRef; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; @@ -38,6 +44,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.service.FrontendOptions; import org.apache.doris.system.Backend; import org.apache.doris.thrift.TExplainLevel; @@ -69,24 +77,24 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; // Full scan of an Olap table. public class OlapScanNode extends ScanNode { private static final Logger LOG = LogManager.getLogger(OlapScanNode.class); - private List result = new ArrayList(); + private List result = new ArrayList<>(); private boolean isPreAggregation = false; private String reasonOfPreAggregation = null; private boolean canTurnOnPreAggr = true; private boolean forceOpenPreAgg = false; - private ArrayList tupleColumns = new ArrayList(); - private HashSet predicateColumns = new HashSet(); private OlapTable olapTable = null; private long selectedTabletsNum = 0; private long totalTabletsNum = 0; private long selectedIndexId = -1; private int selectedPartitionNum = 0; + private Collection selectedPartitionIds = Lists.newArrayList(); private long totalBytes = 0; // List of tablets will be scanned by current olap_scan_node @@ -130,6 +138,83 @@ public class OlapScanNode extends ScanNode { this.forceOpenPreAgg = forceOpenPreAgg; } + public Collection getSelectedPartitionIds() { + return selectedPartitionIds; + } + + /** + * This method is mainly used to update scan range info in OlapScanNode by the new materialized selector. + * Situation1: + * If the new scan range is same as the old scan range which determined by the old materialized selector, + * the scan range will not be changed. + *

+ * Situation2: Scan range is difference. The type of table is duplicated. + * The new scan range is used directly. + * The reason is that the old selector does not support SPJ<->SPJG, so the result of old one must be incorrect. + *

+ * Situation3: Scan range is difference. The type of table is aggregated. + * The new scan range is different from the old one. + * If the test_materialized_view is set to true, an error will be reported. + * The query will be cancelled. + *

+ * Situation4: Scan range is difference. The type of table is aggregated. `test_materialized_view` is set to false. + * The result of the old version selector will be selected. Print the warning log + * + * @param selectedIndexId + * @param isPreAggregation + * @param reasonOfDisable + * @throws UserException + */ + public void updateScanRangeInfoByNewMVSelector(long selectedIndexId, boolean isPreAggregation, String + reasonOfDisable) + throws UserException { + if (selectedIndexId == this.selectedIndexId && isPreAggregation == this.isPreAggregation) { + return; + } + StringBuilder stringBuilder = new StringBuilder("The new selected index id ") + .append(selectedIndexId) + .append(", pre aggregation tag ").append(isPreAggregation) + .append(", reason ").append(reasonOfDisable == null ? "null" : reasonOfDisable) + .append(". The old selected index id ").append(this.selectedIndexId) + .append(" pre aggregation tag ").append(this.isPreAggregation) + .append(" reason ").append(this.reasonOfPreAggregation == null ? "null" : this.reasonOfPreAggregation); + String scanRangeInfo = stringBuilder.toString(); + String situation; + boolean update; + CHECK: + { + if (olapTable.getKeysType() == KeysType.DUP_KEYS) { + situation = "The key type of table is duplicate."; + update = true; + break CHECK; + } + if (ConnectContext.get() == null) { + situation = "Connection context is null"; + update = true; + break CHECK; + } + SessionVariable sessionVariable = ConnectContext.get().getSessionVariable(); + if (sessionVariable.getTestMaterializedView()) { + throw new AnalysisException("The old scan range info is different from the new one when " + + "test_materialized_view is true. " + + scanRangeInfo); + } + situation = "The key type of table is aggregated."; + update = false; + break CHECK; + } + + if (update) { + this.selectedIndexId = selectedIndexId; + this.isPreAggregation = isPreAggregation; + this.reasonOfPreAggregation = reasonOfDisable; + computeTabletInfo(); + LOG.info("Using the new scan range info instead of the old one. {}, {}", situation ,scanRangeInfo); + } else { + LOG.warn("Using the old scan range info instead of the new one. {}, {}", situation, scanRangeInfo); + } + } + public OlapTable getOlapTable() { return olapTable; } @@ -228,8 +313,7 @@ public class OlapScanNode extends ScanNode { private void addScanRangeLocations(Partition partition, MaterializedIndex index, List tablets, - long localBeId) - throws UserException, AnalysisException { + long localBeId) throws UserException { int logNum = 0; int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); @@ -312,45 +396,54 @@ public class OlapScanNode extends ScanNode { } } - private void getScanRangeLocations(Analyzer analyzer) throws UserException, AnalysisException { + private void getScanRangeLocations(Analyzer analyzer) throws UserException { long start = System.currentTimeMillis(); - Collection partitionIds = partitionPrune(olapTable.getPartitionInfo()); - - if (partitionIds == null) { - partitionIds = new ArrayList(); + // Step1: compute partition ids + selectedPartitionIds = partitionPrune(olapTable.getPartitionInfo()); + if (selectedPartitionIds == null) { + selectedPartitionIds = Lists.newArrayList(); for (Partition partition : olapTable.getPartitions()) { if (!partition.hasData()) { continue; } - partitionIds.add(partition.getId()); + selectedPartitionIds.add(partition.getId()); } } else { - partitionIds = partitionIds.stream().filter(id -> olapTable.getPartition(id).hasData()).collect( - Collectors.toList()); + selectedPartitionIds = selectedPartitionIds.stream() + .filter(id -> olapTable.getPartition(id).hasData()) + .collect(Collectors.toList()); } - - selectedPartitionNum = partitionIds.size(); - LOG.debug("partition prune cost: {} ms, partitions: {}", (System.currentTimeMillis() - start), partitionIds); - - start = System.currentTimeMillis(); - - if (olapTable.getKeysType() == KeysType.DUP_KEYS) { - isPreAggregation = true; - } - - if (partitionIds.size() == 0) { + selectedPartitionNum = selectedPartitionIds.size(); + LOG.debug("partition prune cost: {} ms, partitions: {}", + (System.currentTimeMillis() - start), selectedPartitionIds); + if (selectedPartitionIds.size() == 0) { return; } + // Step2: select best rollup + start = System.currentTimeMillis(); + if (olapTable.getKeysType() == KeysType.DUP_KEYS) { + isPreAggregation = true; + } + // TODO: Remove the logic of old selector. final RollupSelector rollupSelector = new RollupSelector(analyzer, desc, olapTable); - selectedIndexId = rollupSelector.selectBestRollup(partitionIds, conjuncts, isPreAggregation); + selectedIndexId = rollupSelector.selectBestRollup(selectedPartitionIds, conjuncts, isPreAggregation); + LOG.debug("select best roll up cost: {} ms, best index id: {}", + (System.currentTimeMillis() - start), selectedIndexId); + // Step3: compute tablet info by selected index id and selected partition ids + start = System.currentTimeMillis(); + computeTabletInfo(); + LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - start)); + } + + private void computeTabletInfo() throws UserException { long localBeId = -1; if (Config.enable_local_replica_selection) { localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress()); } - for (Long partitionId : partitionIds) { + for (Long partitionId : selectedPartitionIds) { final Partition partition = olapTable.getPartition(partitionId); final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); final List tablets = Lists.newArrayList(); @@ -376,7 +469,6 @@ public class OlapScanNode extends ScanNode { selectedTabletsNum += tablets.size(); addScanRangeLocations(partition, selectedTable, tablets, localBeId); } - LOG.debug("distribution prune cost: {} ms", (System.currentTimeMillis() - start)); } /** @@ -497,4 +589,108 @@ public class OlapScanNode extends ScanNode { return olapScanNode; } + + public void collectColumns(Analyzer analyzer, Set equivalenceColumns, Set unequivalenceColumns) { + // 1. Get columns which has predicate on it. + for (Expr expr : conjuncts) { + if (!isPredicateUsedForPrefixIndex(expr, false)) { + continue; + } + for (SlotDescriptor slot : desc.getMaterializedSlots()) { + if (expr.isBound(slot.getId())) { + if (!isEquivalenceExpr(expr)) { + unequivalenceColumns.add(slot.getColumn().getName()); + } else { + equivalenceColumns.add(slot.getColumn().getName()); + } + break; + } + } + } + + // 2. Equal join predicates when pushing inner child. + List eqJoinPredicate = analyzer.getEqJoinConjuncts(desc.getId()); + for (Expr expr : eqJoinPredicate) { + if (!isPredicateUsedForPrefixIndex(expr, true)) { + continue; + } + for (SlotDescriptor slot : desc.getMaterializedSlots()) { + Preconditions.checkState(expr.getChildren().size() == 2); + for (Expr child : expr.getChildren()) { + if (child.isBound(slot.getId())) { + equivalenceColumns.add(slot.getColumn().getName()); + break; + } + } + } + } + } + + + private boolean isEquivalenceExpr(Expr expr) { + if (expr instanceof InPredicate) { + return true; + } + if (expr instanceof BinaryPredicate) { + final BinaryPredicate predicate = (BinaryPredicate) expr; + if (predicate.getOp().isEquivalence()) { + return true; + } + } + return false; + } + + private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) { + if (!(expr instanceof InPredicate) + && !(expr instanceof BinaryPredicate)) { + return false; + } + if (expr instanceof InPredicate) { + return isInPredicateUsedForPrefixIndex((InPredicate) expr); + } else if (expr instanceof BinaryPredicate) { + if (isJoinConjunct) { + return isEqualJoinConjunctUsedForPrefixIndex((BinaryPredicate) expr); + } else { + return isBinaryPredicateUsedForPrefixIndex((BinaryPredicate) expr); + } + } + return true; + } + + private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) { + Preconditions.checkArgument(expr.getOp().isEquivalence()); + if (expr.isAuxExpr()) { + return false; + } + for (Expr child : expr.getChildren()) { + for (SlotDescriptor slot : desc.getMaterializedSlots()) { + if (child.isBound(slot.getId()) && isSlotRefNested(child)) { + return true; + } + } + } + return false; + } + + private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) { + if (expr.isAuxExpr() || expr.getOp().isUnequivalence()) { + return false; + } + return (isSlotRefNested(expr.getChild(0)) && expr.getChild(1).isConstant()) + || (isSlotRefNested(expr.getChild(1)) && expr.getChild(0).isConstant()); + } + + private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) { + if (expr.isNotIn()) { + return false; + } + return isSlotRefNested(expr.getChild(0)) && expr.isLiteralChildren(); + } + + private boolean isSlotRefNested(Expr expr) { + while (expr instanceof CastExpr) { + expr = expr.getChild(0); + } + return expr instanceof SlotRef; + } } diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java index e7a8dee620..f21a111b37 100644 --- a/fe/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/src/main/java/org/apache/doris/planner/Planner.java @@ -158,6 +158,8 @@ public class Planner { // TupleDescriptor.avgSerializedSize analyzer.getDescTbl().computeMemLayout(); singleNodePlan.finalize(analyzer); + // materialized view selector + singleNodePlanner.selectMaterializedView(queryStmt, analyzer); if (queryOptions.num_nodes == 1) { // single-node execution; we're almost done singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan); diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java index 3d7315c053..d303c2916c 100644 --- a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java +++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java @@ -17,6 +17,7 @@ package org.apache.doris.planner; +import org.apache.doris.alter.MaterializedViewHandler; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.CastExpr; @@ -31,12 +32,12 @@ import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.doris.qe.ConnectContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -97,7 +98,7 @@ public final class RollupSelector { } } String tableName = table.getName(); - String v2RollupIndexName = "__v2_" + tableName; + String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName; Long v2RollupIndex = table.getIndexIdByName(v2RollupIndexName); long baseIndexId = table.getBaseIndexId(); ConnectContext connectContext = ConnectContext.get(); diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index e90028c6c6..f0728ea9b9 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -69,6 +69,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; /** * Constructs a non-executable single-node plan from an analyzed parse tree. @@ -81,6 +82,7 @@ public class SingleNodePlanner { private final PlannerContext ctx_; private final ArrayList scanNodes = Lists.newArrayList(); + private Map> selectStmtToScanNodes = Maps.newHashMap(); public SingleNodePlanner(PlannerContext ctx) { ctx_ = ctx; @@ -408,7 +410,7 @@ public class SingleNodePlanner { } boolean valueColumnValidate = true; - List allConjuncts = analyzer.getAllConjunt(selectStmt.getTableRefs().get(0).getId()); + List allConjuncts = analyzer.getAllConjuncts(selectStmt.getTableRefs().get(0).getId()); List conjunctSlotIds = Lists.newArrayList(); if (allConjuncts != null) { for (Expr conjunct : allConjuncts) { @@ -677,7 +679,7 @@ public class SingleNodePlanner { // create left-deep sequence of binary hash joins; assign node ids as we go along TableRef tblRef = selectStmt.getTableRefs().get(0); materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer); - PlanNode root = createTableRefNode(analyzer, tblRef); + PlanNode root = createTableRefNode(analyzer, tblRef, selectStmt); // to change the inner contains analytic function // selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap()); @@ -698,7 +700,7 @@ public class SingleNodePlanner { for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) { TableRef outerRef = selectStmt.getTableRefs().get(i - 1); TableRef innerRef = selectStmt.getTableRefs().get(i); - root = createJoinNode(analyzer, root, outerRef, innerRef); + root = createJoinNode(analyzer, root, outerRef, innerRef, selectStmt); // Have the build side of a join copy data to a compact representation // in the tuple buffer. root.getChildren().get(1).setCompactData(true); @@ -749,6 +751,39 @@ public class SingleNodePlanner { return root; } + public void selectMaterializedView(QueryStmt queryStmt, Analyzer analyzer) throws UserException { + if (queryStmt instanceof SelectStmt) { + SelectStmt selectStmt = (SelectStmt) queryStmt; + for (TableRef tableRef : selectStmt.getTableRefs()) { + if (tableRef instanceof InlineViewRef) { + selectMaterializedView(((InlineViewRef) tableRef).getViewStmt(), analyzer); + } + } + List scanNodeList = selectStmtToScanNodes.get(selectStmt.getId()); + if (scanNodeList == null) { + return; + } + MaterializedViewSelector materializedViewSelector = new MaterializedViewSelector(selectStmt, analyzer); + for (ScanNode scanNode : scanNodeList) { + if (!(scanNode instanceof OlapScanNode)) { + continue; + } + OlapScanNode olapScanNode = (OlapScanNode) scanNode; + MaterializedViewSelector.BestIndexInfo bestIndexInfo = materializedViewSelector.selectBestMV + (olapScanNode); + olapScanNode.updateScanRangeInfoByNewMVSelector(bestIndexInfo.getBestIndexId(), bestIndexInfo.isPreAggregation(), + bestIndexInfo.getReasonOfDisable()); + } + + } else { + Preconditions.checkState(queryStmt instanceof SetOperationStmt); + SetOperationStmt unionStmt = (SetOperationStmt) queryStmt; + for (SetOperationStmt.SetOperand unionOperand : unionStmt.getOperands()) { + selectMaterializedView(unionOperand.getQueryStmt(), analyzer); + } + } + } + /** * Returns a new AggregationNode that materializes the aggregation of the given stmt. * Assigns conjuncts from the Having clause to the returned node. @@ -1279,7 +1314,7 @@ public class SingleNodePlanner { /** * Create node for scanning all data files of a particular table. */ - private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef) + private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt) throws UserException { ScanNode scanNode = null; @@ -1328,6 +1363,12 @@ public class SingleNodePlanner { analyzer.materializeSlots(scanNode.getConjuncts()); scanNodes.add(scanNode); + List scanNodeList = selectStmtToScanNodes.get(selectStmt.getId()); + if (scanNodeList == null) { + scanNodeList = Lists.newArrayList(); + selectStmtToScanNodes.put(selectStmt.getId(), scanNodeList); + } + scanNodeList.add(scanNode); return scanNode; } @@ -1402,12 +1443,13 @@ public class SingleNodePlanner { * as well as regular conjuncts. Calls init() on the new join node. * Throws if the JoinNode.init() fails. */ - private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef outerRef, TableRef innerRef) + private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef outerRef, TableRef innerRef, + SelectStmt selectStmt) throws UserException, AnalysisException { materializeTableResultForCrossJoinOrCountStar(innerRef, analyzer); // the rows coming from the build node only need to have space for the tuple // materialized by that node - PlanNode inner = createTableRefNode(analyzer, innerRef); + PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt); List eqJoinConjuncts = Lists.newArrayList(); Reference errMsg = new Reference(); @@ -1464,10 +1506,10 @@ public class SingleNodePlanner { * Throws if a PlanNode.init() failed or if planning of the given * table ref is not implemented. */ - private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef) + private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef, SelectStmt selectStmt) throws UserException, AnalysisException { if (tblRef instanceof BaseTableRef) { - return createScanNode(analyzer, tblRef); + return createScanNode(analyzer, tblRef, selectStmt); } if (tblRef instanceof InlineViewRef) { return createInlineViewPlan(analyzer, (InlineViewRef) tblRef); diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 546c3ca88b..211cbe39e7 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -90,6 +90,7 @@ public class SessionVariable implements Serializable, Writable { public static final String LOAD_MEM_LIMIT = "load_mem_limit"; public static final String DEFAULT_ROWSET_TYPE = "default_rowset_type"; public static final String USE_V2_ROLLUP = "use_v2_rollup"; + public static final String TEST_MATERIALIZED_VIEW = "test_materialized_view"; // max memory used on every backend. @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) @@ -222,6 +223,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = USE_V2_ROLLUP) private boolean useV2Rollup = false; + // TODO(ml): remove it after test + @VariableMgr.VarAttr(name = TEST_MATERIALIZED_VIEW) + private boolean testMaterializedView = false; + public long getMaxExecMemByte() { return maxExecMemByte; } @@ -390,8 +395,12 @@ public class SessionVariable implements Serializable, Writable { public boolean getUseV2Rollup() { return useV2Rollup; } - public void setUseV2Rollup(boolean useV2Rollup) { - this.useV2Rollup = useV2Rollup; + public boolean getTestMaterializedView() { + return this.testMaterializedView; + } + + public void setTestMaterializedView(boolean testMaterializedView) { + this.testMaterializedView = testMaterializedView; } // Serialize to thrift object diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java index 18ba82ac3b..bab7833f0d 100644 --- a/fe/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/CreateMaterializedViewStmtTest.java @@ -18,11 +18,15 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.AggregateType; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -38,6 +42,15 @@ public class CreateMaterializedViewStmtTest { private Analyzer analyzer; @Mocked private ExprSubstitutionMap exprSubstitutionMap; + @Mocked + private ConnectContext connectContext; + @Mocked + private Config config; + + @Before + public void initTest() { + Deencapsulation.setField(Config.class, "enable_materialized_view", true); + } @Test public void testFunctionColumnInSelectClause(@Injectable ArithmeticExpr arithmeticExpr) { @@ -178,6 +191,8 @@ public class CreateMaterializedViewStmtTest { result = havingClause; slotRef.getColumnName(); result = "k1"; + selectStmt.getGroupByClause(); + result = null; } }; CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null); @@ -222,6 +237,8 @@ public class CreateMaterializedViewStmtTest { result = "k1"; slotRef2.getColumnName(); result = "k2"; + selectStmt.getGroupByClause(); + result = null; } }; CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null); @@ -238,7 +255,8 @@ public class CreateMaterializedViewStmtTest { @Injectable SlotRef slotRef2, @Injectable FunctionCallExpr functionCallExpr, @Injectable TableRef tableRef, - @Injectable SelectStmt selectStmt) throws UserException { + @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { SelectList selectList = new SelectList(); SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); selectList.addItem(selectListItem1); @@ -247,6 +265,8 @@ public class CreateMaterializedViewStmtTest { OrderByElement orderByElement1 = new OrderByElement(functionCallExpr, false, false); OrderByElement orderByElement2 = new OrderByElement(slotRef1, false, false); ArrayList orderByElementList = Lists.newArrayList(orderByElement1, orderByElement2); + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); new Expectations() { { @@ -273,6 +293,10 @@ public class CreateMaterializedViewStmtTest { result = Lists.newArrayList(slotRef2); functionCallExpr.getChild(0); result = slotRef2; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; } }; CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null); @@ -326,7 +350,8 @@ public class CreateMaterializedViewStmtTest { @Injectable FunctionCallExpr functionCallExpr, @Injectable SlotRef functionChild0, @Injectable TableRef tableRef, - @Injectable SelectStmt selectStmt) throws UserException { + @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { SelectList selectList = new SelectList(); SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); selectList.addItem(selectListItem1); @@ -337,6 +362,11 @@ public class CreateMaterializedViewStmtTest { OrderByElement orderByElement1 = new OrderByElement(slotRef1, false, false); ArrayList orderByElementList = Lists.newArrayList(orderByElement1); + + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); + groupByList.add(slotRef2); + new Expectations() { { analyzer.getClusterName(); @@ -364,6 +394,10 @@ public class CreateMaterializedViewStmtTest { result = Lists.newArrayList(slotRef1); functionCallExpr.getChild(0); result = functionChild0; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; } }; CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null); @@ -383,7 +417,8 @@ public class CreateMaterializedViewStmtTest { @Injectable FunctionCallExpr functionCallExpr, @Injectable SlotRef functionChild0, @Injectable TableRef tableRef, - @Injectable SelectStmt selectStmt) throws UserException { + @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { SelectList selectList = new SelectList(); SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); selectList.addItem(selectListItem1); @@ -401,6 +436,11 @@ public class CreateMaterializedViewStmtTest { final String columnName3 = "k3"; final String columnName4 = "v1"; final String columnName5 = "sum_v2"; + + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); + groupByList.add(slotRef2); + groupByList.add(slotRef3); new Expectations() { { analyzer.getClusterName(); @@ -434,6 +474,10 @@ public class CreateMaterializedViewStmtTest { result = columnName4; functionChild0.getColumnName(); result = columnName5; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; } }; @@ -475,11 +519,9 @@ public class CreateMaterializedViewStmtTest { @Test public void testMVColumnsWithoutOrderbyWithoutAggregation(@Injectable SlotRef slotRef1, - @Injectable SlotRef slotRef2, - @Injectable SlotRef slotRef3, - @Injectable SlotRef slotRef4, - @Injectable TableRef tableRef, - @Injectable SelectStmt selectStmt) throws UserException { + @Injectable SlotRef slotRef2, @Injectable SlotRef slotRef3, @Injectable SlotRef slotRef4, + @Injectable TableRef tableRef, @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { SelectList selectList = new SelectList(); SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); selectList.addItem(selectListItem1); @@ -494,6 +536,11 @@ public class CreateMaterializedViewStmtTest { final String columnName2 = "k2"; final String columnName3 = "k3"; final String columnName4 = "v1"; + + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); + groupByList.add(slotRef2); + groupByList.add(slotRef3); new Expectations() { { analyzer.getClusterName(); @@ -529,6 +576,10 @@ public class CreateMaterializedViewStmtTest { result = 3; slotRef4.getType().getStorageLayoutBytes(); result = 4; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; } }; @@ -565,11 +616,12 @@ public class CreateMaterializedViewStmtTest { @Test public void testMVColumns(@Injectable SlotRef slotRef1, - @Injectable SlotRef slotRef2, - @Injectable FunctionCallExpr functionCallExpr, - @Injectable SlotRef functionChild0, - @Injectable TableRef tableRef, - @Injectable SelectStmt selectStmt) throws UserException { + @Injectable SlotRef slotRef2, + @Injectable FunctionCallExpr functionCallExpr, + @Injectable SlotRef functionChild0, + @Injectable TableRef tableRef, + @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { SelectList selectList = new SelectList(); SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); selectList.addItem(selectListItem1); @@ -583,6 +635,9 @@ public class CreateMaterializedViewStmtTest { final String columnName1 = "k1"; final String columnName2 = "v1"; final String columnName3 = "sum_v2"; + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); + groupByList.add(slotRef2); new Expectations() { { analyzer.getClusterName(); @@ -612,6 +667,10 @@ public class CreateMaterializedViewStmtTest { result = columnName2; functionChild0.getColumnName(); result = columnName3; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; } }; @@ -639,6 +698,46 @@ public class CreateMaterializedViewStmtTest { Assert.fail(e.getMessage()); } + } + + @Test + public void testDeduplicateMV(@Injectable SlotRef slotRef1, + @Injectable TableRef tableRef, + @Injectable SelectStmt selectStmt, + @Injectable GroupByClause groupByClause) throws UserException { + SelectList selectList = new SelectList(); + SelectListItem selectListItem1 = new SelectListItem(slotRef1, null); + selectList.addItem(selectListItem1); + final String columnName1 = "k1"; + List groupByList = Lists.newArrayList(); + groupByList.add(slotRef1); + new Expectations() { + { + analyzer.getClusterName(); + result = "default"; + selectStmt.getSelectList(); + result = selectList; + selectStmt.analyze(analyzer); + selectStmt.getTableRefs(); + result = Lists.newArrayList(tableRef); + selectStmt.getWhereClause(); + result = null; + slotRef1.getColumnName(); + result = columnName1; + selectStmt.getGroupByClause(); + result = groupByClause; + groupByClause.getGroupingExprs(); + result = groupByList; + } + }; + + CreateMaterializedViewStmt createMaterializedViewStmt = new CreateMaterializedViewStmt("test", selectStmt, null); + try { + createMaterializedViewStmt.analyze(analyzer); + Assert.fail(); + } catch (UserException e) { + System.out.print(e.getMessage()); + } } } diff --git a/fe/src/test/java/org/apache/doris/analysis/ExprTest.java b/fe/src/test/java/org/apache/doris/analysis/ExprTest.java new file mode 100644 index 0000000000..92c7dc43d9 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/analysis/ExprTest.java @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.jmockit.Deencapsulation; + +import com.google.common.collect.Maps; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; +import java.util.Set; + +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + +public class ExprTest { + + @Test + public void testGetTableNameToColumnNames(@Mocked Analyzer analyzer, + @Injectable SlotDescriptor slotDesc1, + @Injectable SlotDescriptor slotDesc2, + @Injectable TupleDescriptor tupleDescriptor1, + @Injectable TupleDescriptor tupleDescriptor2, + @Injectable Table tableA, + @Injectable Table tableB) throws AnalysisException { + TableName tableAName = new TableName("test", "tableA"); + TableName tableBName = new TableName("test", "tableB"); + SlotRef tableAColumn1 = new SlotRef(tableAName, "c1"); + SlotRef tableBColumn1 = new SlotRef(tableBName, "c1"); + Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ, tableAColumn1, tableBColumn1); + Deencapsulation.setField(tableAColumn1, "desc", slotDesc1); + Deencapsulation.setField(tableBColumn1, "desc", slotDesc2); + new Expectations() { + { + slotDesc1.isMaterialized(); + result = true; + slotDesc2.isMaterialized(); + result = true; + slotDesc1.getColumn().getName(); + result = "c1"; + slotDesc2.getColumn().getName(); + result = "c1"; + slotDesc1.getParent(); + result = tupleDescriptor1; + slotDesc2.getParent(); + result = tupleDescriptor2; + tupleDescriptor1.getTable(); + result = tableA; + tupleDescriptor2.getTable(); + result = tableB; + tableA.getName(); + result = "tableA"; + tableB.getName(); + result = "tableB"; + + } + }; + + Map> tableNameToColumnNames = Maps.newHashMap(); + whereExpr.getTableNameToColumnNames(tableNameToColumnNames); + Assert.assertEquals(tableNameToColumnNames.size(), 2); + Set tableAColumns = tableNameToColumnNames.get("tableA"); + Assert.assertNotEquals(tableAColumns, null); + Assert.assertTrue(tableAColumns.contains("c1")); + Set tableBColumns = tableNameToColumnNames.get("tableB"); + Assert.assertNotEquals(tableBColumns, null); + Assert.assertTrue(tableBColumns.contains("c1")); + } +} diff --git a/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java b/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java index c8b9ac16a5..d911ae91aa 100644 --- a/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java +++ b/fe/src/test/java/org/apache/doris/backup/CatalogMocker.java @@ -241,6 +241,7 @@ public class CatalogMocker { partitionInfo.setDataProperty(TEST_SINGLE_PARTITION_ID, dataProperty); OlapTable olapTable = new OlapTable(TEST_TBL_ID, TEST_TBL_NAME, TEST_TBL_BASE_SCHEMA, KeysType.AGG_KEYS, partitionInfo, distributionInfo); + Deencapsulation.setField(olapTable, "baseIndexId", TEST_TBL_ID); Tablet tablet0 = new Tablet(TEST_TABLET0_ID); TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID, TEST_SINGLE_PARTITION_ID, @@ -312,6 +313,7 @@ public class CatalogMocker { OlapTable olapTable2 = new OlapTable(TEST_TBL2_ID, TEST_TBL2_NAME, TEST_TBL_BASE_SCHEMA, KeysType.AGG_KEYS, rangePartitionInfo, distributionInfo2); + Deencapsulation.setField(olapTable2, "baseIndexId", TEST_TBL2_ID); Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID); TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, TEST_TBL2_ID, TEST_PARTITION1_ID, diff --git a/fe/src/test/java/org/apache/doris/catalog/DatabaseTest.java b/fe/src/test/java/org/apache/doris/catalog/DatabaseTest.java index d914dc66e0..4ceecd22f1 100644 --- a/fe/src/test/java/org/apache/doris/catalog/DatabaseTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/DatabaseTest.java @@ -17,12 +17,15 @@ package org.apache.doris.catalog; -import mockit.Expectations; -import mockit.Mocked; import org.apache.doris.catalog.MaterializedIndex.IndexState; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.persist.CreateTableInfo; import org.apache.doris.persist.EditLog; +import org.apache.doris.thrift.TStorageType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.junit.Assert; import org.junit.Before; @@ -36,8 +39,12 @@ import java.io.FileOutputStream; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import mockit.Expectations; +import mockit.Mocked; + public class DatabaseTest { private Database db; @@ -171,6 +178,13 @@ public class DatabaseTest { Partition partition = new Partition(20000L, "table", index, new RandomDistributionInfo(10)); OlapTable table = new OlapTable(1000, "table", columns, KeysType.AGG_KEYS, new SinglePartitionInfo(), new RandomDistributionInfo(10)); + List column = Lists.newArrayList(); + short schemaHash = 1; + table.setIndexSchemaInfo(new Long(1), "test", column, 1, 1, schemaHash); + Deencapsulation.setField(table, "baseIndexId", 1); + Map indexIdToStorageType = Maps.newHashMap(); + indexIdToStorageType.put(new Long(1), TStorageType.COLUMN); + Deencapsulation.setField(table, "indexIdToStorageType", indexIdToStorageType); table.addPartition(partition); db2.createTable(table); db2.write(dos); diff --git a/fe/src/test/java/org/apache/doris/catalog/TableTest.java b/fe/src/test/java/org/apache/doris/catalog/TableTest.java index 81b99f23d8..0041513d93 100644 --- a/fe/src/test/java/org/apache/doris/catalog/TableTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/TableTest.java @@ -26,8 +26,15 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.thrift.TStorageType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -71,8 +78,15 @@ public class TableTest { columns.add(new Column("column10", ScalarType.createType(PrimitiveType.DATE), true, null, "", "")); columns.add(new Column("column11", ScalarType.createType(PrimitiveType.DATETIME), true, null, "", "")); - Table table1 = new OlapTable(1000L, "group1", columns, KeysType.AGG_KEYS, - new SinglePartitionInfo(), new RandomDistributionInfo(10)); + OlapTable table1 = new OlapTable(1000L, "group1", columns, KeysType.AGG_KEYS, + new SinglePartitionInfo(), new RandomDistributionInfo(10)); + List column = Lists.newArrayList(); + short schemaHash = 1; + table1.setIndexSchemaInfo(new Long(1), "test", column, 1, 1, schemaHash); + Deencapsulation.setField(table1, "baseIndexId", 1); + Map indexIdToStorageType = Maps.newHashMap(); + indexIdToStorageType.put(new Long(1), TStorageType.COLUMN); + Deencapsulation.setField(table1, "indexIdToStorageType", indexIdToStorageType); table1.write(dos); dos.flush(); dos.close(); diff --git a/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java index 6f1c080ecb..fa52c05cc3 100644 --- a/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java +++ b/fe/src/test/java/org/apache/doris/common/util/UnitTestUtil.java @@ -37,6 +37,7 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; +import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.load.DppConfig; import org.apache.doris.load.Load; import org.apache.doris.system.Backend; @@ -112,6 +113,7 @@ public class UnitTestUtil { partitionInfo.setIsInMemory(partitionId, false); OlapTable table = new OlapTable(tableId, TABLE_NAME, columns, KeysType.AGG_KEYS, partitionInfo, distributionInfo); + Deencapsulation.setField(table, "baseIndexId", indexId); table.addPartition(partition); table.setIndexSchemaInfo(indexId, TABLE_NAME, columns, 0, SCHEMA_HASH, (short) 1); table.setStorageTypeToIndex(indexId, TStorageType.COLUMN); diff --git a/fe/src/test/java/org/apache/doris/persist/CreateTableInfoTest.java b/fe/src/test/java/org/apache/doris/persist/CreateTableInfoTest.java index 67b193f519..e7f53edc2c 100644 --- a/fe/src/test/java/org/apache/doris/persist/CreateTableInfoTest.java +++ b/fe/src/test/java/org/apache/doris/persist/CreateTableInfoTest.java @@ -24,6 +24,7 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.doris.catalog.FakeCatalog; import org.apache.doris.catalog.ScalarType; @@ -44,6 +45,10 @@ import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.SinglePartitionInfo; import org.apache.doris.catalog.MaterializedIndex.IndexState; import org.apache.doris.common.FeConstants; +import org.apache.doris.thrift.TStorageType; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; public class CreateTableInfoTest { private Catalog catalog; @@ -89,6 +94,13 @@ public class CreateTableInfoTest { Partition partition = new Partition(20000L, "table", index, distributionInfo); OlapTable table = new OlapTable(1000L, "table", columns, KeysType.AGG_KEYS, new SinglePartitionInfo(), distributionInfo); + List column = Lists.newArrayList(); + short schemaHash = 1; + table.setIndexSchemaInfo(new Long(1), "test", column, 1, 1, schemaHash); + Deencapsulation.setField(table, "baseIndexId", 1); + Map indexIdToStorageType = Maps.newHashMap(); + indexIdToStorageType.put(new Long(1), TStorageType.COLUMN); + Deencapsulation.setField(table, "indexIdToStorageType", indexIdToStorageType); table.addPartition(partition); CreateTableInfo info = new CreateTableInfo("db1", table); info.write(dos); diff --git a/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java new file mode 100644 index 0000000000..7bee2a810c --- /dev/null +++ b/fe/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java @@ -0,0 +1,571 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.utframe.DorisAssert; +import org.apache.doris.utframe.UtFrameUtils; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +public class MaterializedViewFunctionTest { + private static String baseDir = "fe"; + private static String runningDir = baseDir + "/mocked/MaterializedViewFunctionTest/" + + UUID.randomUUID().toString() + "/"; + private static final String EMPS_TABLE_NAME = "emps"; + private static final String EMPS_MV_NAME = "emps_mv"; + private static final String HR_DB_NAME = "db1"; + private static final String QUERY_USE_EMPS_MV = "rollup: " + EMPS_MV_NAME; + private static final String QUERY_USE_EMPS = "rollup: " + EMPS_TABLE_NAME; + private static final String DEPTS_TABLE_NAME = "depts"; + private static final String DEPTS_MV_NAME = "depts_mv"; + private static final String QUERY_USE_DEPTS_MV = "rollup: " + DEPTS_MV_NAME; + private static final String QUERY_USE_DEPTS = "rollup: " + DEPTS_TABLE_NAME; + private static DorisAssert dorisAssert; + + @BeforeClass + public static void beforeClass() throws Exception { + UtFrameUtils.createMinDorisCluster(runningDir); + dorisAssert = new DorisAssert(); + dorisAssert.withEnableMV().withDatabase(HR_DB_NAME).useDatabase(HR_DB_NAME); + } + + @Before + public void beforeMethod() throws Exception { + String createTableSQL = "create table " + HR_DB_NAME + "." + EMPS_TABLE_NAME + " (empid int, name varchar, " + + "deptno int, salary int, commission int) " + + "distributed by hash(empid) buckets 3 properties('replication_num' = '1');"; + dorisAssert.withTable(createTableSQL); + createTableSQL = "create table " + HR_DB_NAME + "." + DEPTS_TABLE_NAME + + " (deptno int, name varchar, cost int) " + + "distributed by hash(deptno) buckets 3 properties('replication_num' = '1');"; + dorisAssert.withTable(createTableSQL); + } + + @After + public void afterMethod() throws Exception { + dorisAssert.dropTable(EMPS_TABLE_NAME); + dorisAssert.dropTable(DEPTS_TABLE_NAME); + } + + @AfterClass + public static void afterClass() throws Exception { + UtFrameUtils.cleanDorisFeDir(baseDir); + } + + @Test + public void testProjectionMV1() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid from " + + EMPS_TABLE_NAME + " order by deptno;"; + String query = "select empid, deptno from " + EMPS_TABLE_NAME + ";"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testProjectionMV2() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid from " + + EMPS_TABLE_NAME + " order by deptno;"; + String query1 = "select empid + 1 from " + EMPS_TABLE_NAME + " where deptno = 10;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query1).explainContains(QUERY_USE_EMPS_MV); + String query2 = "select name from " + EMPS_TABLE_NAME + " where deptno -10 = 0;"; + dorisAssert.query(query2).explainWithout(QUERY_USE_EMPS_MV); + + } + + @Test + public void testProjectionMV3() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, name from " + + EMPS_TABLE_NAME + " order by deptno;"; + String query1 = "select empid +1, name from " + EMPS_TABLE_NAME + " where deptno = 10;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query1).explainContains(QUERY_USE_EMPS_MV); + String query2 = "select name from " + EMPS_TABLE_NAME + " where deptno - 10 = 0;"; + dorisAssert.query(query2).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testProjectionMV4() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select name, deptno, salary from " + + EMPS_TABLE_NAME + ";"; + String query1 = "select name from " + EMPS_TABLE_NAME + " where deptno > 30 and salary > 3000;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query1).explainContains(QUERY_USE_EMPS_MV); + String query2 = "select empid from " + EMPS_TABLE_NAME + " where deptno > 30 and empid > 10;"; + dorisAssert.query(query2).explainWithout(QUERY_USE_EMPS_MV); + } + + @Test + public void testUnionQueryOnProjectionMV() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid from " + + EMPS_TABLE_NAME + " order by deptno;"; + String union = "select empid from " + EMPS_TABLE_NAME + " where deptno > 300" + " union all select empid from" + + " " + EMPS_TABLE_NAME + " where deptno < 200"; + dorisAssert.withMaterializedView(createMVSQL).query(union).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggQueryOnAggMV1() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, sum(salary), max" + "" + + "(salary) from " + EMPS_TABLE_NAME + " group by deptno;"; + String query = "select sum(salary), deptno from " + EMPS_TABLE_NAME + " group by deptno;"; + dorisAssert.withMaterializedView(createMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggQueryOnAggMV2() throws Exception { + String agg = "select deptno, sum(salary) from " + EMPS_TABLE_NAME + " group by deptno"; + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as " + agg + ";"; + String query = "select * from (select deptno, sum(salary) as sum_salary from " + EMPS_TABLE_NAME + " group " + + "by" + " deptno) a where (sum_salary * 2) > 3;"; + dorisAssert.withMaterializedView(createMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /* + TODO + The deduplicate materialized view is not yet supported + @Test + public void testAggQueryOnDeduplicatedMV() throws Exception { + String deduplicateSQL = "select deptno, empid, name, salary, commission from " + EMPS_TABLE_NAME + " group " + + "by" + " deptno, empid, name, salary, commission"; + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as " + deduplicateSQL + ";"; + String query1 = "select deptno, sum(salary) from (" + deduplicateSQL + ") A group by deptno;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query1).explainContains(QUERY_USE_EMPS_MV); + String query2 = "select deptno, empid from " + EMPS_TABLE_NAME + ";"; + dorisAssert.query(query2).explainWithout(QUERY_USE_EMPS_MV); + } + */ + + @Test + public void testAggQueryOnAggMV3() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary)" + + " from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select commission, sum(salary) from " + EMPS_TABLE_NAME + " where commission * (deptno + " + + "commission) = 100 group by commission;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * Matching failed because the filtering condition under Aggregate + * references columns for aggregation. + */ + @Test + public void testAggQueryOnAggMV4() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary)" + + " from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select deptno, sum(salary) from " + EMPS_TABLE_NAME + " where salary>1000 group by deptno;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainWithout(QUERY_USE_EMPS_MV); + } + + /** + * There will be a compensating Project added after matching of the Aggregate. + */ + @Test + public void testAggQuqeryOnAggMV5() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary)" + + " from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select * from (select deptno, sum(salary) as sum_salary from " + EMPS_TABLE_NAME + + " group by deptno) a where sum_salary>10;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * There will be a compensating Project + Filter added after matching of the Aggregate. + * + * @throws Exception + */ + @Test + public void testAggQuqeryOnAggMV6() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary)" + + " from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select * from (select deptno, sum(salary) as sum_salary from " + EMPS_TABLE_NAME + + " where deptno>=20 group by deptno) a where sum_salary>10;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * Aggregation query with groupSets at coarser level of aggregation than + * aggregation materialized view. + */ + @Test + public void testGroupingSetQueryOnAggMV() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select sum(salary), empid, deptno from " + EMPS_TABLE_NAME + " group by rollup(empid,deptno);"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * Aggregation query at coarser level of aggregation than aggregation materialized view. + */ + @Test + public void testAggQuqeryOnAggMV7() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary) " + + "from " + EMPS_TABLE_NAME + " " + "group by deptno, commission;"; + String query = "select deptno, sum(salary) from " + EMPS_TABLE_NAME + " where deptno>=20 group by deptno;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggQueryOnAggMV8() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by deptno;"; + String query = "select deptno, sum(salary) + 1 from " + EMPS_TABLE_NAME + " group by deptno;"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * Query with cube and arithmetic expr + */ + @Test + public void testAggQueryOnAggMV9() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select deptno, commission, sum(salary) + 1 from " + EMPS_TABLE_NAME + + " group by cube(deptno,commission);"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + /** + * Query with rollup and arithmetic expr + */ + @Test + public void testAggQueryOnAggMV10() throws Exception { + String createMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, commission, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by deptno, commission;"; + String query = "select deptno, commission, sum(salary) + 1 from " + EMPS_TABLE_NAME + + " group by rollup (deptno, commission);"; + dorisAssert.withMaterializedView(createMVSQL); + dorisAssert.query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testJoinOnLeftProjectToJoin() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + + " as select deptno, sum(salary), sum(commission) from " + EMPS_TABLE_NAME + " group by deptno;"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno, max(cost) from " + + DEPTS_TABLE_NAME + " group by deptno;"; + String query = "select * from (select deptno , sum(salary) from " + EMPS_TABLE_NAME + " group by deptno) A " + + "join (select deptno, max(cost) from " + DEPTS_TABLE_NAME + " group by deptno ) B on A.deptno = B" + + ".deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnRightProjectToJoin() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, sum(salary), sum" + + "(commission) from " + EMPS_TABLE_NAME + " group by deptno;"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno, max(cost) from " + + DEPTS_TABLE_NAME + " group by deptno;"; + String query = "select * from (select deptno , sum(salary), sum(commission) from " + EMPS_TABLE_NAME + + " group by deptno) A join (select deptno from " + DEPTS_TABLE_NAME + " group by deptno ) B on A" + + ".deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnProjectsToJoin() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, sum(salary), sum" + + "(commission) from " + EMPS_TABLE_NAME + " group by deptno;"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno, max(cost) from " + + DEPTS_TABLE_NAME + " group by deptno;"; + String query = "select * from (select deptno , sum(salary) from " + EMPS_TABLE_NAME + " group by deptno) A " + + "join (select deptno from " + DEPTS_TABLE_NAME + " group by deptno ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnCalcToJoin0() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + ";"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno from " + + DEPTS_TABLE_NAME + ";"; + String query = "select * from (select empid, deptno from " + EMPS_TABLE_NAME + " where deptno > 10 ) A " + + "join (select deptno from " + DEPTS_TABLE_NAME + " ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnCalcToJoin1() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + ";"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno from " + + DEPTS_TABLE_NAME + ";"; + String query = "select * from (select empid, deptno from " + EMPS_TABLE_NAME + " ) A join (select " + + "deptno from " + DEPTS_TABLE_NAME + " where deptno > 10 ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnCalcToJoin2() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + ";"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno from " + + DEPTS_TABLE_NAME + ";"; + String query = "select * from (select empid, deptno from " + EMPS_TABLE_NAME + " where empid >10 ) A " + + "join (select deptno from " + DEPTS_TABLE_NAME + " where deptno > 10 ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnCalcToJoin3() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + ";"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno from " + + DEPTS_TABLE_NAME + ";"; + String query = "select * from (select empid, deptno + 1 deptno from " + EMPS_TABLE_NAME + " where empid >10 )" + + " A join (select deptno from " + DEPTS_TABLE_NAME + + " where deptno > 10 ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testJoinOnCalcToJoin4() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + ";"; + String createDeptsMVSQL = "create materialized view " + DEPTS_MV_NAME + " as select deptno from " + + DEPTS_TABLE_NAME + ";"; + String query = "select * from (select empid, deptno + 1 deptno from " + EMPS_TABLE_NAME + + " where empid is not null ) A full join (select deptno from " + DEPTS_TABLE_NAME + + " where deptno is not null ) B on A.deptno = B.deptno;"; + dorisAssert.withMaterializedView(createDeptsMVSQL).withMaterializedView(createEmpsMVSQL).query(query) + .explainContains(QUERY_USE_EMPS_MV, QUERY_USE_DEPTS_MV); + } + + @Test + public void testOrderByQueryOnProjectView() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid from " + + EMPS_TABLE_NAME + ";"; + String query = "select empid from " + EMPS_TABLE_NAME + " order by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testOrderByQueryOnOrderByView() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid from " + + EMPS_TABLE_NAME + " order by deptno;"; + String query = "select empid from " + EMPS_TABLE_NAME + " order by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testQueryOnStar() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, name, " + + "salary, commission from " + EMPS_TABLE_NAME + " order by deptno, empid;"; + String query = "select * from " + EMPS_TABLE_NAME + " where deptno = 1"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testQueryOnStarAndJoin() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, name, " + + "salary, commission from " + EMPS_TABLE_NAME + " order by deptno, empid;"; + String query = "select * from " + EMPS_TABLE_NAME + " join depts on " + EMPS_TABLE_NAME + ".deptno = " + + DEPTS_TABLE_NAME + ".deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVAggregateFuncs1() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno from " + EMPS_TABLE_NAME + " group by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVAggregateFuncs2() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, sum(salary) from " + EMPS_TABLE_NAME + " group by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVAggregateFuncs3() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, empid, sum(salary) from " + EMPS_TABLE_NAME + " group by deptno, empid"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVAggregateFuncs4() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, sum(salary) from " + EMPS_TABLE_NAME + " where deptno > 10 group by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVAggregateFuncs5() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, sum(salary) + 1 from " + EMPS_TABLE_NAME + " where deptno > 10 group by deptno"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVCalcGroupByQuery1() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno+1, sum(salary) + 1 from " + EMPS_TABLE_NAME + " where deptno > 10 " + + "group by deptno+1;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVCalcGroupByQuery2() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno * empid, sum(salary) + 1 from " + EMPS_TABLE_NAME + " where deptno > 10 " + + "group by deptno * empid;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVCalcGroupByQuery3() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select empid, deptno * empid, sum(salary) + 1 from " + EMPS_TABLE_NAME + " where deptno > 10 " + + "group by empid, deptno * empid;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testAggregateMVCalcAggFunctionQuery() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, sum(salary + 1) from " + EMPS_TABLE_NAME + " where deptno > 10 " + + "group by deptno;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainWithout(QUERY_USE_EMPS_MV); + } + + @Test + public void testSubQuery() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid " + + "from " + EMPS_TABLE_NAME + ";"; + String query = "select empid, deptno, salary from " + EMPS_TABLE_NAME + " e1 where empid = (select max(empid)" + + " from " + EMPS_TABLE_NAME + " where deptno = e1.deptno);"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, + QUERY_USE_EMPS); + } + + @Test + public void testDistinctQuery() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by deptno;"; + String query1 = "select distinct deptno from " + EMPS_TABLE_NAME + ";"; + dorisAssert.withMaterializedView(createEmpsMVSQL); + dorisAssert.query(query1).explainContains(QUERY_USE_EMPS_MV); + String query2 = "select deptno, sum(distinct salary) from " + EMPS_TABLE_NAME + " group by deptno;"; + dorisAssert.query(query2).explainWithout(QUERY_USE_EMPS_MV); + } + + @Test + public void testSingleMVMultiUsage() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select deptno, empid, salary " + + "from " + EMPS_TABLE_NAME + " order by deptno;"; + String query = "select * from (select deptno, empid from " + EMPS_TABLE_NAME + " where deptno>100) A join " + + "(select deptno, empid from " + EMPS_TABLE_NAME + " where deptno >200) B using (deptno);"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, 2); + } + + @Test + public void testMultiMVMultiUsage() throws Exception { + String createEmpsMVSQL01 = "create materialized view emp_mv_01 as select deptno, empid, salary " + + "from " + EMPS_TABLE_NAME + " order by deptno;"; + String createEmpsMVSQL02 = "create materialized view emp_mv_02 as select deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by deptno;"; + String query = "select * from (select deptno, empid from " + EMPS_TABLE_NAME + " where deptno>100) A join " + + "(select deptno, sum(salary) from " + EMPS_TABLE_NAME + " where deptno >200 group by deptno) B " + + "using (deptno);"; + dorisAssert.withMaterializedView(createEmpsMVSQL01).withMaterializedView(createEmpsMVSQL02).query(query) + .explainContains("rollup: emp_mv_01", "rollup: emp_mv_02"); + } + + @Test + public void testMVOnJoinQuery() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select salary, empid, deptno from " + + EMPS_TABLE_NAME + " order by salary;"; + String query = "select empid, salary from " + EMPS_TABLE_NAME + " join " + DEPTS_TABLE_NAME + + " using (deptno) where salary > 300;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, + QUERY_USE_DEPTS); + } + + // TODO: should be support + @Test + public void testAggregateMVOnCountDistinctQuery1() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno, sum(salary) " + + "from " + EMPS_TABLE_NAME + " group by empid, deptno;"; + String query = "select deptno, count(distinct empid) from " + EMPS_TABLE_NAME + " group by deptno;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testQueryAfterTrimingOfUnusedFields() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + " order by empid, deptno;"; + String query = "select empid, deptno from (select empid, deptno, salary from " + EMPS_TABLE_NAME + ") A;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV); + } + + @Test + public void testUnionAll() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + " order by empid, deptno;"; + String query = "select empid, deptno from " + EMPS_TABLE_NAME + " where empid >1 union all select empid," + + " deptno from " + EMPS_TABLE_NAME + " where empid <0;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, 2); + } + + @Test + public void testUnionDistinct() throws Exception { + String createEmpsMVSQL = "create materialized view " + EMPS_MV_NAME + " as select empid, deptno from " + + EMPS_TABLE_NAME + " order by empid, deptno;"; + String query = "select empid, deptno from " + EMPS_TABLE_NAME + " where empid >1 union select empid," + + " deptno from " + EMPS_TABLE_NAME + " where empid <0;"; + dorisAssert.withMaterializedView(createEmpsMVSQL).query(query).explainContains(QUERY_USE_EMPS_MV, 2); + } +} diff --git a/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java new file mode 100644 index 0000000000..12aa5aea25 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/planner/MaterializedViewSelectorTest.java @@ -0,0 +1,407 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.AggregateInfo; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.jmockit.Deencapsulation; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import mockit.Expectations; +import mockit.Injectable; + +public class MaterializedViewSelectorTest { + + + @Test + public void initTest(@Injectable SelectStmt selectStmt, + @Injectable SlotDescriptor tableAColumn1Desc, + @Injectable AggregateInfo aggregateInfo, + @Injectable Table tableA, + @Injectable TupleDescriptor tableADesc, + @Injectable SlotDescriptor tableAColumn2Desc, + @Injectable SlotDescriptor tableBColumn1Desc, + @Injectable TupleDescriptor tableBDesc, + @Injectable Table tableB, + @Injectable Analyzer analyzer) { + TableName tableAName = new TableName("test", "tableA"); + TableName tableBName = new TableName("test", "tableB"); + SlotRef tableAColumn1 = new SlotRef(tableAName, "c1"); + Deencapsulation.setField(tableAColumn1, "isAnalyzed", true); + SlotRef tableAColumn2 = new SlotRef(tableAName, "c2"); + Deencapsulation.setField(tableAColumn2, "isAnalyzed", true); + SlotRef tableBColumn1 = new SlotRef(tableBName, "c1"); + Deencapsulation.setField(tableBColumn1, "isAnalyzed", true); + Deencapsulation.setField(tableAColumn1, "desc", tableAColumn1Desc); + Deencapsulation.setField(tableAColumn2, "desc", tableAColumn2Desc); + Deencapsulation.setField(tableBColumn1, "desc", tableBColumn1Desc); + FunctionCallExpr tableAColumn2Sum = new FunctionCallExpr("SUM", Lists.newArrayList(tableAColumn2)); + FunctionCallExpr tableBColumn1Max = new FunctionCallExpr("MAX", Lists.newArrayList(tableBColumn1)); + new Expectations() { + { + selectStmt.getAggInfo(); + result = aggregateInfo; + aggregateInfo.getGroupingExprs(); + result = Lists.newArrayList(tableAColumn1); + tableAColumn1Desc.isMaterialized(); + result = true; + tableAColumn1Desc.getColumn().getName(); + result = "c1"; + tableAColumn1Desc.getParent(); + result = tableADesc; + tableADesc.getTable(); + result = tableA; + tableA.getName(); + result = "tableA"; + + aggregateInfo.getAggregateExprs(); + result = Lists.newArrayList(tableAColumn2Sum, tableBColumn1Max); + tableAColumn2Sum.getChildren(); + result = Lists.newArrayList(tableAColumn2); + tableBColumn1Max.getChildren(); + result = Lists.newArrayList(tableBColumn1); + tableAColumn2.getColumnName(); + result = "c2"; + tableBColumn1.getColumnName(); + result = "c1"; + tableAColumn2.getTableName().getTbl(); + result = "tableA"; + tableBColumn1.getTableName().getTbl(); + result = "tableB"; + + selectStmt.getResultExprs(); + result = Lists.newArrayList(tableAColumn1, tableAColumn2Sum, tableBColumn1Max); + tableAColumn2Desc.isMaterialized(); + result = true; + tableAColumn2Desc.getColumn().getName(); + result = "c2"; + tableAColumn2Desc.getParent(); + result = tableADesc; + tableBColumn1Desc.isMaterialized(); + result = true; + tableBColumn1Desc.getColumn().getName(); + result = "c1"; + tableBColumn1Desc.getParent(); + result = tableBDesc; + tableBDesc.getTable(); + result = tableB; + tableB.getName(); + result = "tableB"; + } + }; + + MaterializedViewSelector materializedViewSelector = new MaterializedViewSelector(selectStmt, analyzer); + Map> columnNamesInPredicates = + Deencapsulation.getField(materializedViewSelector, "columnNamesInPredicates"); + Assert.assertEquals(0, columnNamesInPredicates.size()); + Assert.assertFalse(Deencapsulation.getField(materializedViewSelector, "isSPJQuery")); + Map> columnNamesInGrouping = + Deencapsulation.getField(materializedViewSelector, "columnNamesInGrouping"); + Assert.assertEquals(1, columnNamesInGrouping.size()); + Set tableAColumnNamesInGrouping = columnNamesInGrouping.get("tableA"); + Assert.assertNotEquals(tableAColumnNamesInGrouping, null); + Assert.assertEquals(1, tableAColumnNamesInGrouping.size()); + Assert.assertTrue(tableAColumnNamesInGrouping.contains("c1")); + Map> aggregateColumnsInQuery = + Deencapsulation.getField(materializedViewSelector, "aggregateColumnsInQuery"); + Assert.assertEquals(2, aggregateColumnsInQuery.size()); + Set tableAAgggregatedColumns = aggregateColumnsInQuery.get("tableA"); + Assert.assertEquals(1, tableAAgggregatedColumns.size()); + MaterializedViewSelector.AggregatedColumn aggregatedColumn1 = tableAAgggregatedColumns.iterator().next(); + Assert.assertEquals("c2", Deencapsulation.getField(aggregatedColumn1, "columnName")); + Assert.assertTrue("SUM".equalsIgnoreCase(Deencapsulation.getField(aggregatedColumn1, "aggFunctionName"))); + Set tableBAgggregatedColumns = aggregateColumnsInQuery.get("tableB"); + Assert.assertEquals(1, tableBAgggregatedColumns.size()); + MaterializedViewSelector.AggregatedColumn aggregatedColumn2 = tableBAgggregatedColumns.iterator().next(); + Assert.assertEquals("c1", Deencapsulation.getField(aggregatedColumn2, "columnName")); + Assert.assertTrue("MAX".equalsIgnoreCase(Deencapsulation.getField(aggregatedColumn2, "aggFunctionName"))); + Map> columnNamesInQueryOutput = + Deencapsulation.getField(materializedViewSelector, "columnNamesInQueryOutput"); + Assert.assertEquals(2, columnNamesInQueryOutput.size()); + Set tableAColumnNamesInQueryOutput = columnNamesInQueryOutput.get("tableA"); + Assert.assertEquals(2, tableAColumnNamesInQueryOutput.size()); + Assert.assertTrue(tableAColumnNamesInQueryOutput.contains("c1")); + Assert.assertTrue(tableAColumnNamesInQueryOutput.contains("c2")); + Set tableBColumnNamesInQueryOutput = columnNamesInQueryOutput.get("tableB"); + Assert.assertEquals(1, tableBColumnNamesInQueryOutput.size()); + Assert.assertTrue(tableBColumnNamesInQueryOutput.contains("c1")); + } + + @Test + public void testCheckCompensatingPredicates(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Set tableAColumnNames = Sets.newHashSet(); + tableAColumnNames.add("C1"); + Map> candidateIndexIdToSchema = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + candidateIndexIdToSchema.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c1", Type.INT, false, AggregateType.NONE, true, "", ""); + index2Columns.add(index2Column1); + candidateIndexIdToSchema.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", ""); + index3Columns.add(index3Column1); + candidateIndexIdToSchema.put(new Long(3), index3Columns); + List index4Columns = Lists.newArrayList(); + Column index4Column2 = new Column("c2", Type.INT, true, null, true, "", ""); + index4Columns.add(index4Column2); + candidateIndexIdToSchema.put(new Long(4), index4Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + Deencapsulation.invoke(selector, "checkCompensatingPredicates", tableAColumnNames, candidateIndexIdToSchema); + Assert.assertEquals(2, candidateIndexIdToSchema.size()); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1))); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2))); + } + + @Test + public void testCheckGrouping(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Set tableAColumnNames = Sets.newHashSet(); + tableAColumnNames.add("C1"); + Map> candidateIndexIdToSchema = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + candidateIndexIdToSchema.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index2Columns.add(index2Column1); + Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.SUM, true, "", ""); + index2Columns.add(index2Column2); + candidateIndexIdToSchema.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index3Columns.add(index3Column1); + Column index3Column2 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", ""); + index3Columns.add(index3Column2); + candidateIndexIdToSchema.put(new Long(3), index3Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + Deencapsulation.setField(selector, "isSPJQuery", false); + Deencapsulation.invoke(selector, "checkGrouping", tableAColumnNames, candidateIndexIdToSchema); + Assert.assertEquals(2, candidateIndexIdToSchema.size()); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1))); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2))); + } + + @Test + public void testCheckAggregationFunction(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Map> candidateIndexIdToSchema = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + candidateIndexIdToSchema.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index2Columns.add(index2Column1); + Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.SUM, true, "", ""); + index2Columns.add(index2Column2); + candidateIndexIdToSchema.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index3Columns.add(index3Column1); + Column index3Column2 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", ""); + index3Columns.add(index3Column2); + candidateIndexIdToSchema.put(new Long(3), index3Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + MaterializedViewSelector.AggregatedColumn aggregatedColumn = Deencapsulation.newInnerInstance + (MaterializedViewSelector.AggregatedColumn.class, selector, "C1", "sum"); + Set aggregatedColumnsInQueryOutput = Sets.newHashSet(); + aggregatedColumnsInQueryOutput.add(aggregatedColumn); + Deencapsulation.setField(selector, "isSPJQuery", false); + Deencapsulation.invoke(selector, "checkAggregationFunction", aggregatedColumnsInQueryOutput, + candidateIndexIdToSchema); + Assert.assertEquals(2, candidateIndexIdToSchema.size()); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(1))); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3))); + } + + @Test + public void testCheckOutputColumns(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Map> candidateIndexIdToSchema = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + candidateIndexIdToSchema.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index2Columns.add(index2Column1); + Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.NONE, true, "", ""); + index2Columns.add(index2Column2); + candidateIndexIdToSchema.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("C2", Type.INT, true, null, true, "", ""); + index3Columns.add(index3Column1); + Column index3Column2 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", ""); + index3Columns.add(index3Column2); + candidateIndexIdToSchema.put(new Long(3), index3Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + Set columnNamesInQueryOutput = Sets.newHashSet(); + columnNamesInQueryOutput.add("c1"); + columnNamesInQueryOutput.add("c2"); + Deencapsulation.invoke(selector, "checkOutputColumns", columnNamesInQueryOutput, + candidateIndexIdToSchema); + Assert.assertEquals(2, candidateIndexIdToSchema.size()); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2))); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3))); + } + + @Test + public void testCompensateIndex(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Map> candidateIndexIdToSchema = Maps.newHashMap(); + Map> allVisibleIndexes = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + allVisibleIndexes.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index2Columns.add(index2Column1); + Column index2Column2 = new Column("c2", Type.INT, false, AggregateType.NONE, true, "", ""); + index2Columns.add(index2Column2); + allVisibleIndexes.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("C2", Type.INT, true, null, true, "", ""); + index3Columns.add(index3Column1); + Column index3Column2 = new Column("c1", Type.INT, false, AggregateType.SUM, true, "", ""); + index3Columns.add(index3Column2); + allVisibleIndexes.put(new Long(3), index3Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + Deencapsulation.invoke(selector, "compensateIndex", candidateIndexIdToSchema, + allVisibleIndexes, 2); + Assert.assertEquals(2, candidateIndexIdToSchema.size()); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(2))); + Assert.assertTrue(candidateIndexIdToSchema.keySet().contains(new Long(3))); + } + + @Test + public void testSelectBestRowCountIndex(@Injectable SelectStmt selectStmt, @Injectable Analyzer analyzer) { + Map> candidateIndexIdToSchema = Maps.newHashMap(); + List index1Columns = Lists.newArrayList(); + Column index1Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index1Columns.add(index1Column1); + Column index1Column2 = new Column("c2", Type.INT, false, AggregateType.NONE, true, "", ""); + index1Columns.add(index1Column2); + Column index1Column3 = new Column("c3", Type.INT, false, AggregateType.NONE, true, "", ""); + index1Columns.add(index1Column3); + candidateIndexIdToSchema.put(new Long(1), index1Columns); + List index2Columns = Lists.newArrayList(); + Column index2Column1 = new Column("c2", Type.INT, true, null, true, "", ""); + index2Columns.add(index2Column1); + Column index2Column2 = new Column("c1", Type.INT, false, AggregateType.NONE, true, "", ""); + index2Columns.add(index2Column2); + Column index2Column3 = new Column("c3", Type.INT, false, AggregateType.NONE, true, "", ""); + index2Columns.add(index2Column3); + candidateIndexIdToSchema.put(new Long(2), index2Columns); + List index3Columns = Lists.newArrayList(); + Column index3Column1 = new Column("c1", Type.INT, true, null, true, "", ""); + index3Columns.add(index3Column1); + Column index3Column2 = new Column("c3", Type.INT, false, AggregateType.NONE, true, "", ""); + index3Columns.add(index3Column2); + Column index3Column3 = new Column("c2", Type.INT, false, AggregateType.NONE, true, "", ""); + index3Columns.add(index3Column3); + candidateIndexIdToSchema.put(new Long(3), index3Columns); + new Expectations() { + { + selectStmt.getAggInfo(); + result = null; + selectStmt.getResultExprs(); + result = Lists.newArrayList(); + } + }; + Set equivalenceColumns = Sets.newHashSet(); + equivalenceColumns.add("c1"); + equivalenceColumns.add("c2"); + Set unequivalenceColumns = Sets.newHashSet(); + unequivalenceColumns.add("c3"); + + MaterializedViewSelector selector = new MaterializedViewSelector(selectStmt, analyzer); + Set result = Deencapsulation.invoke(selector, "matchBestPrefixIndex", candidateIndexIdToSchema, + equivalenceColumns, unequivalenceColumns); + Assert.assertEquals(2, result.size()); + Assert.assertTrue(result.contains(new Long(1))); + Assert.assertTrue(result.contains(new Long(2))); + } + + +} diff --git a/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java index 3aa7e7493d..9018b218db 100644 --- a/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java +++ b/fe/src/test/java/org/apache/doris/utframe/AnotherDemoTest.java @@ -117,10 +117,7 @@ public class AnotherDemoTest { @AfterClass public static void TearDown() { - try { - FileUtils.deleteDirectory(new File(runningDirBase)); - } catch (IOException e) { - } + UtFrameUtils.cleanDorisFeDir(runningDirBase); } // generate all port from valid ports diff --git a/fe/src/test/java/org/apache/doris/utframe/DemoTest.java b/fe/src/test/java/org/apache/doris/utframe/DemoTest.java index 9bfeb3b388..572385c334 100644 --- a/fe/src/test/java/org/apache/doris/utframe/DemoTest.java +++ b/fe/src/test/java/org/apache/doris/utframe/DemoTest.java @@ -26,24 +26,15 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.DdlException; -import org.apache.doris.common.Pair; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl; -import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl; -import org.apache.doris.utframe.MockedBackendFactory.DefaultPBackendServiceImpl; import org.apache.doris.utframe.MockedFrontend.EnvVarNotSetException; import org.apache.doris.utframe.MockedFrontend.FeStartException; import org.apache.doris.utframe.MockedFrontend.NotInitException; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -51,7 +42,6 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import java.nio.file.Files; import java.util.List; import java.util.Map; import java.util.UUID; @@ -66,15 +56,6 @@ import java.util.UUID; */ public class DemoTest { - private static int fe_http_port; - private static int fe_rpc_port; - private static int fe_query_port; - private static int fe_edit_log_port; - - private static int be_heartbeat_port; - private static int be_thrift_port; - private static int be_brpc_port; - private static int be_http_port; // use a unique dir so that it won't be conflict with other unit test which // may also start a Mocked Frontend private static String runningDirBase = "fe"; @@ -83,63 +64,13 @@ public class DemoTest { @BeforeClass public static void beforeClass() throws EnvVarNotSetException, IOException, FeStartException, NotInitException, DdlException, InterruptedException { - // get DORIS_HOME - String dorisHome = System.getenv("DORIS_HOME"); - if (Strings.isNullOrEmpty(dorisHome)) { - dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); - } - - getPorts(); - - // start fe in "DORIS_HOME/fe/mocked/" - MockedFrontend frontend = MockedFrontend.getInstance(); - Map feConfMap = Maps.newHashMap(); - // set additional fe config - feConfMap.put("http_port", String.valueOf(fe_http_port)); - feConfMap.put("rpc_port", String.valueOf(fe_rpc_port)); - feConfMap.put("query_port", String.valueOf(fe_query_port)); - feConfMap.put("edit_log_port", String.valueOf(fe_edit_log_port)); - feConfMap.put("tablet_create_timeout_second", "10"); - frontend.init(dorisHome + "/" + runningDir, feConfMap); - frontend.start(new String[0]); - - // start be - MockedBackend backend = MockedBackendFactory.createBackend("127.0.0.1", - be_heartbeat_port, be_thrift_port, be_brpc_port, be_http_port, - new DefaultHeartbeatServiceImpl(be_thrift_port, be_http_port, be_brpc_port), - new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl()); - backend.setFeAddress(new TNetworkAddress("127.0.0.1", frontend.getRpcPort())); - backend.start(); - - // add be - List> bes = Lists.newArrayList(); - bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort())); - Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster"); - - // sleep to wait first heartbeat - Thread.sleep(6000); + UtFrameUtils.createMinDorisCluster(runningDir); } @AfterClass public static void TearDown() { - try { - FileUtils.deleteDirectory(new File(runningDirBase)); - } catch (IOException e) { - } - } - - // generate all port from valid ports - private static void getPorts() { - fe_http_port = UtFrameUtils.findValidPort(); - fe_rpc_port = UtFrameUtils.findValidPort(); - fe_query_port = UtFrameUtils.findValidPort(); - fe_edit_log_port = UtFrameUtils.findValidPort(); - - be_heartbeat_port = UtFrameUtils.findValidPort(); - be_thrift_port = UtFrameUtils.findValidPort(); - be_brpc_port = UtFrameUtils.findValidPort(); - be_http_port = UtFrameUtils.findValidPort(); + UtFrameUtils.cleanDorisFeDir(runningDirBase); } @Test diff --git a/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java new file mode 100644 index 0000000000..02de02a510 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/utframe/DorisAssert.java @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.utframe; + +import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.Config; +import org.apache.doris.planner.Planner; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TExplainLevel; + +import org.apache.commons.lang.StringUtils; +import org.junit.Assert; + +import java.io.IOException; +import java.util.Map; +import java.util.stream.Stream; + +public class DorisAssert { + + private ConnectContext ctx; + + public DorisAssert() throws IOException { + this.ctx = UtFrameUtils.createDefaultCtx(); + } + + public DorisAssert withEnableMV() { + ctx.getSessionVariable().setTestMaterializedView(true); + Config.enable_materialized_view = true; + return this; + } + + public DorisAssert withDatabase(String dbName) throws Exception { + CreateDbStmt createDbStmt = + (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt("create database " + dbName + ";", ctx); + Catalog.getCurrentCatalog().createDb(createDbStmt); + return this; + } + + public DorisAssert useDatabase(String dbName) { + ctx.setDatabase(ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, dbName)); + return this; + } + + public DorisAssert withTable(String sql) throws Exception { + CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); + Catalog.getCurrentCatalog().createTable(createTableStmt); + return this; + } + + public DorisAssert dropTable(String tableName) throws Exception { + DropTableStmt dropTableStmt = + (DropTableStmt) UtFrameUtils.parseAndAnalyzeStmt("drop table " + tableName + ";", ctx); + Catalog.getCurrentCatalog().dropTable(dropTableStmt); + return this; + } + + // Add materialized view to the schema + public DorisAssert withMaterializedView(String sql) throws Exception { + CreateMaterializedViewStmt createMaterializedViewStmt = + (CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, ctx); + Catalog.getCurrentCatalog().createMaterializedView(createMaterializedViewStmt); + // check alter job + Map alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2(); + for (AlterJobV2 alterJobV2 : alterJobs.values()) { + while (!alterJobV2.getJobState().isFinalState()) { + System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState()); + Thread.sleep(5000); + } + System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState()); + } + // waiting table state to normal + Thread.sleep(5000); + return this; + } + + public QueryAssert query(String sql) { + return new QueryAssert(ctx, sql); + } + + public class QueryAssert { + private ConnectContext connectContext; + private String sql; + + public QueryAssert(ConnectContext connectContext, String sql) { + this.connectContext = connectContext; + this.sql = sql; + } + + public void explainContains(String... keywords) throws Exception { + Assert.assertTrue(Stream.of(keywords).allMatch(explainQuery()::contains)); + } + + public void explainContains(String keywords, int count) throws Exception { + Assert.assertTrue(StringUtils.countMatches(explainQuery(), keywords) == count); + } + + public void explainWithout(String s) throws Exception { + Assert.assertFalse(explainQuery().contains(s)); + } + + private String explainQuery() throws Exception { + StmtExecutor stmtExecutor = new StmtExecutor(connectContext, "explain " + sql); + stmtExecutor.execute(); + Planner planner = stmtExecutor.planner(); + String explainString = planner.getExplainString(planner.getFragments(), TExplainLevel.VERBOSE); + System.out.println(explainString); + return explainString; + } + } +} diff --git a/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java b/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java index 0768481001..73c5595c43 100644 --- a/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java +++ b/fe/src/test/java/org/apache/doris/utframe/MockedFrontend.java @@ -195,7 +195,7 @@ public class MockedFrontend { Thread feThread = new Thread(new FERunnable(this, args), FE_PROCESS); feThread.start(); // wait the catalog to be ready until timeout (30 seconds) - waitForCatalogReady(30 * 1000); + waitForCatalogReady(120 * 1000); System.out.println("Fe process is started"); } diff --git a/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java index 6348ffdd97..58de9a4b10 100644 --- a/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java +++ b/fe/src/test/java/org/apache/doris/utframe/UtFrameUtils.java @@ -44,6 +44,9 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.io.FileUtils; + +import java.io.File; import java.io.IOException; import java.io.StringReader; import java.net.ServerSocket; @@ -52,7 +55,9 @@ import java.nio.file.Files; import java.util.List; import java.util.Map; + public class UtFrameUtils { + // Help to create a mocked ConnectContext. public static ConnectContext createDefaultCtx() throws IOException { SocketChannel channel = SocketChannel.open(); @@ -131,6 +136,13 @@ public class UtFrameUtils { Thread.sleep(6000); } + public static void cleanDorisFeDir(String baseDir) { + try { + FileUtils.deleteDirectory(new File(baseDir)); + } catch (IOException e) { + } + } + public static int findValidPort() { ServerSocket socket = null; try {