From f273f598441bfe634fc67d939c8e4ac4f47129df Mon Sep 17 00:00:00 2001 From: chenhao <510341142@qq.com> Date: Fri, 19 Apr 2019 14:49:12 +0800 Subject: [PATCH] Refactor the code selecting rollup (#947) --- .../doris/analysis/BinaryPredicate.java | 2 + .../apache/doris/planner/OlapScanNode.java | 261 +-------------- .../apache/doris/planner/RollupSelector.java | 300 ++++++++++++++++++ 3 files changed, 316 insertions(+), 247 deletions(-) create mode 100644 fe/src/main/java/org/apache/doris/planner/RollupSelector.java diff --git a/fe/src/main/java/org/apache/doris/analysis/BinaryPredicate.java b/fe/src/main/java/org/apache/doris/analysis/BinaryPredicate.java index 76915a370c..fcdbebd1e9 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BinaryPredicate.java +++ b/fe/src/main/java/org/apache/doris/analysis/BinaryPredicate.java @@ -125,6 +125,8 @@ public class BinaryPredicate extends Predicate implements Writable { } public boolean isEquivalence() { return this == EQ; }; + + public boolean isUnequivalence() { return this == NE; } } private Operator op; diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java index 2efb62ef15..032ce71e07 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -87,8 +87,6 @@ public class OlapScanNode extends ScanNode { private boolean canTurnOnPreAggr = true; private ArrayList tupleColumns = new ArrayList(); private HashSet predicateColumns = new HashSet(); - private HashSet inPredicateColumns = new HashSet(); - private HashSet eqJoinColumns = new HashSet(); private OlapTable olapTable = null; private long selectedTabletsNum = 0; private long totalTabletsNum = 0; @@ -169,193 +167,6 @@ public class OlapScanNode extends ScanNode { } } - - // private void analyzeVectorizedConjuncts(Analyzer analyzer) throws InternalException { - // for (SlotDescriptor slot : desc.getSlots()) { - // for (Expr conjunct : conjuncts) { - // if (expr.isConstant()) { - // continue; - // } - // if (analyzer.isWhereClauseConjunct(conjunct) - // && expr.isBound(slot.getId()) - // && conjunct.isVectorized() - // && conjunct instanceof Predicate) { - // conjunct.computeOutputColumn(analyzer); - // } else { - // Preconditions.checkState(false); - // } - // } - // } - // } - - private List selectRollupIndex(Partition partition) throws UserException { - if (olapTable.getKeysType() == KeysType.DUP_KEYS) { - isPreAggregation = true; - } - - List allIndices = Lists.newArrayList(); - allIndices.add(partition.getBaseIndex()); - allIndices.addAll(partition.getRollupIndices()); - LOG.debug("num of rollup(base included): {}, pre aggr: {}", allIndices.size(), isPreAggregation); - - // 1. find all rollup indexes which contains all tuple columns - List containTupleIndexes = Lists.newArrayList(); - List baseIndexKeyColumns = olapTable.getKeyColumnsByIndexId(partition.getBaseIndex().getId()); - for (MaterializedIndex index : allIndices) { - Set indexColNames = Sets.newHashSet(); - for (Column col : olapTable.getSchemaByIndexId(index.getId())) { - indexColNames.add(col.getName()); - } - - if (indexColNames.containsAll(tupleColumns)) { - // If preAggregation is off, so that we only can use base table - // or those rollup tables which key columns is the same with base table - // (often in different order) - if (isPreAggregation) { - LOG.debug("preAggregation is on. add index {} which contains all tuple columns", index.getId()); - containTupleIndexes.add(index); - } else if (olapTable.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeyColumns.size()) { - LOG.debug("preAggregation is off, but index {} have same key columns with base index.", - index.getId()); - containTupleIndexes.add(index); - } - } else { - LOG.debug("exclude index {} because it does not contain all tuple columns", index.getId()); - } - } - - if (containTupleIndexes.isEmpty()) { - throw new UserException("Failed to select index, no match index"); - } - - // 2. find all indexes which match the prefix most based on predicate/sort/in predicate columns - // from containTupleIndices. - List prefixMatchedIndexes = Lists.newArrayList(); - int maxPrefixMatchCount = 0; - int prefixMatchCount = 0; - for (MaterializedIndex index : containTupleIndexes) { - prefixMatchCount = 0; - for (Column col : olapTable.getSchemaByIndexId(index.getId())) { - if (sortColumn != null) { - if (inPredicateColumns.contains(col.getName())) { - prefixMatchCount++; - } else if (sortColumn.equals(col.getName())) { - prefixMatchCount++; - break; - } else { - break; - } - } else { - if (predicateColumns.contains(col.getName())) { - break; - } - } - } - if (prefixMatchCount == maxPrefixMatchCount) { - LOG.debug("s2: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount); - prefixMatchedIndexes.add(index); - } else if (prefixMatchCount > maxPrefixMatchCount) { - LOG.debug("s2: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount); - maxPrefixMatchCount = prefixMatchCount; - prefixMatchedIndexes.clear(); - prefixMatchedIndexes.add(index); - } - } - - // 3. find all indexes which match the prefix most based on equal join columns - // from containTupleIndices. - List eqJoinPrefixMatchedIndexes = Lists.newArrayList(); - maxPrefixMatchCount = 0; - for (MaterializedIndex index : containTupleIndexes) { - prefixMatchCount = 0; - for (Column col : olapTable.getSchemaByIndexId(index.getId())) { - if (eqJoinColumns.contains(col.getName()) || predicateColumns.contains(col.getName())) { - prefixMatchCount++; - } else { - break; - } - } - if (prefixMatchCount == maxPrefixMatchCount) { - LOG.debug("s3: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount); - eqJoinPrefixMatchedIndexes.add(index); - } else if (prefixMatchCount > maxPrefixMatchCount) { - LOG.debug("s3: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount); - maxPrefixMatchCount = prefixMatchCount; - eqJoinPrefixMatchedIndexes.clear(); - eqJoinPrefixMatchedIndexes.add(index); - } - } - - // 4. find the intersection of prefixMatchIndices and eqJoinPrefixMatchIndices as candidate indexes - List finalCandidateIndexes = Lists.newArrayList(); - for (MaterializedIndex index : prefixMatchedIndexes) { - for (MaterializedIndex oneIndex : eqJoinPrefixMatchedIndexes) { - if (oneIndex.getId() == index.getId()) { - finalCandidateIndexes.add(index); - LOG.debug("find a matched index {} in intersection of " - + "prefixMatchIndices and eqJoinPrefixMatchIndices", - index.getId()); - } - } - } - // maybe there is no intersection between prefixMatchIndices and eqJoinPrefixMatchIndices. - // in this case, use prefixMatchIndices; - if (finalCandidateIndexes.isEmpty()) { - finalCandidateIndexes = prefixMatchedIndexes; - } - - // 5. sorted the final candidate indexes by index id - // this is to make sure that candidate indexes find in all partitions will be returned in same order - Collections.sort(finalCandidateIndexes, new Comparator() { - @Override - public int compare(MaterializedIndex index1, MaterializedIndex index2) - { - return (int) (index1.getId() - index2.getId()); - } - }); - return finalCandidateIndexes; - } - - private void normalizePredicate(Analyzer analyzer) throws UserException { - // 1. Get Columns which has eqJoin on it - List eqJoinPredicate = analyzer.getEqJoinConjuncts(desc.getId()); - for (Expr expr : eqJoinPredicate) { - for (SlotDescriptor slot : desc.getSlots()) { - for (int i = 0; i < 2; i++) { - if (expr.getChild(i).isBound(slot.getId())) { - eqJoinColumns.add(slot.getColumn().getName()); - LOG.debug("Add eqJoinColumn: ColName=" + slot.getColumn().getName()); - break; - } - } - } - } - - // 2. Get Columns which has predicate on it - for (SlotDescriptor slot : desc.getSlots()) { - for (Expr expr : conjuncts) { - if (expr.isConstant()) { - continue; - } - if (expr.isBound(slot.getId())) { - predicateColumns.add(slot.getColumn().getName()); - LOG.debug("Add predicateColumn: ColName=" + slot.getColumn().getName()); - if (expr instanceof InPredicate) { - inPredicateColumns.add(slot.getColumn().getName()); - LOG.debug("Add inPredicateColumn: ColName=" + slot.getColumn().getName()); - } - } - } - } - - // 3. Get Columns of this tuple - for (SlotDescriptor slot : desc.getSlots()) { - Column col = slot.getColumn(); - tupleColumns.add(col.getName()); - LOG.debug("Add tupleColumn: ColName=" + col.getName()); - } - } - private Collection partitionPrune(PartitionInfo partitionInfo) throws AnalysisException { PartitionPruner partitionPruner = null; switch(partitionInfo.getType()) { @@ -500,86 +311,42 @@ public class OlapScanNode extends ScanNode { } private void getScanRangeLocations(Analyzer analyzer) throws UserException, AnalysisException { - normalizePredicate(analyzer); - long start = System.currentTimeMillis(); Collection partitionIds = partitionPrune(olapTable.getPartitionInfo()); - + if (partitionIds == null) { partitionIds = new ArrayList(); for (Partition partition : olapTable.getPartitions()) { partitionIds.add(partition.getId()); } } + selectedPartitionNum = partitionIds.size(); LOG.debug("partition prune cost: {} ms, partitions: {}", (System.currentTimeMillis() - start), partitionIds); start = System.currentTimeMillis(); - // find all candidate rollups - int candidateTableSize = 0; - List> tables = Lists.newArrayList(); - for (Long partitionId : partitionIds) { - Partition partition = olapTable.getPartition(partitionId); - List candidateTables = selectRollupIndex(partition); - if (candidateTableSize == 0) { - candidateTableSize = candidateTables.size(); - } else { - if (candidateTableSize != candidateTables.size()) { - String errMsg = "two partition's candidate_table_size not equal, one is " + candidateTableSize - + ", the other is" + candidateTables.size(); - throw new AnalysisException(errMsg); - } - } - tables.add(candidateTables); + if (olapTable.getKeysType() == KeysType.DUP_KEYS) { + isPreAggregation = true; } - // chose one rollup from candidate rollups - long minRowCount = Long.MAX_VALUE; - int partitionPos = -1; - for (int i = 0; i < candidateTableSize; i++) { - MaterializedIndex candidateIndex = null; - long rowCount = 0; - for (List candidateTables : tables) { - if (candidateIndex == null) { - candidateIndex = candidateTables.get(i); - } else { - if (candidateIndex.getId() != candidateTables.get(i).getId()) { - String errMsg = "two partition's candidate_table not equal, one is " - + candidateIndex.getId() + ", the other is " + candidateTables.get(i).getId(); - throw new AnalysisException(errMsg); - } - } - rowCount += candidateTables.get(i).getRowCount(); - } - LOG.debug("rowCount={} for table={}", rowCount, candidateIndex.getId()); - if (rowCount < minRowCount) { - minRowCount = rowCount; - selectedIndexId = tables.get(0).get(i).getId(); - partitionPos = i; - } else if (rowCount == minRowCount) { - // check column number, select one minimum column number - int selectedColumnSize = olapTable.getIndexIdToSchema().get(selectedIndexId).size(); - int currColumnSize = olapTable.getIndexIdToSchema().get(tables.get(0).get(i).getId()).size(); - if (currColumnSize < selectedColumnSize) { - selectedIndexId = tables.get(0).get(i).getId(); - partitionPos = i; - } - } + if (partitionIds.size() == 0) { + return; } + final RollupSelector rollupSelector = new RollupSelector(analyzer, desc, olapTable); + selectedIndexId = rollupSelector.selectBestRollup(partitionIds, conjuncts, isPreAggregation); + long localBeId = -1; if (Config.enable_local_replica_selection) { localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress()); } - MaterializedIndex selectedTable = null; - int j = 0; + for (Long partitionId : partitionIds) { - Partition partition = olapTable.getPartition(partitionId); - LOG.debug("selected partition: " + partition.getName()); - selectedTable = tables.get(j++).get(partitionPos); - List tablets = new ArrayList(); - Collection tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo()); + final Partition partition = olapTable.getPartition(partitionId); + final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId); + final List tablets = Lists.newArrayList(); + final Collection tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo()); LOG.debug("distribution prune tablets: {}", tabletIds); List allTabletIds = selectedTable.getTabletIdsInOrder(); diff --git a/fe/src/main/java/org/apache/doris/planner/RollupSelector.java b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java new file mode 100644 index 0000000000..b62e57a578 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/planner/RollupSelector.java @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.CastExpr; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.UserException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.*; +import java.util.stream.Collectors; + +public final class RollupSelector { + private static final Logger LOG = LogManager.getLogger(RollupSelector.class); + + // Rollup's table info. + private final TupleDescriptor tupleDesc; + private final OlapTable table; + private final Analyzer analyzer; + + public RollupSelector(Analyzer analyzer, TupleDescriptor tupleDesc, OlapTable table) { + this.analyzer = analyzer; + this.tupleDesc = tupleDesc; + this.table = table; + } + + public long selectBestRollup( + Collection partitionIds, List conjuncts, boolean isPreAggregation) + throws UserException { + Preconditions.checkArgument(partitionIds != null && !partitionIds.isEmpty(), + "Paritition can't be null or empty."); + // Get first partition to select best prefix index rollups, because MaterializedIndex ids in one rollup's partitions are all same. + final List bestPrefixIndexRollups = + selectBestPrefixIndexRollup( + table.getPartition(partitionIds.iterator().next()), + conjuncts, + isPreAggregation); + return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds); + } + + private long selectBestRowCountRollup(List bestPrefixIndexRollups, Collection partitionIds) { + long minRowCount = Long.MAX_VALUE; + long selectedIndexId = 0; + for (Long indexId : bestPrefixIndexRollups) { + long rowCount = 0; + for (Long partitionId : partitionIds) { + rowCount += table.getPartition(partitionId).getIndex(indexId).getRowCount(); + } + LOG.debug("rowCount={} for table={}", rowCount, indexId); + if (rowCount < minRowCount) { + minRowCount = rowCount; + selectedIndexId = indexId; + } else if (rowCount == minRowCount) { + // check column number, select one minimum column number + int selectedColumnSize = table.getIndexIdToSchema().get(selectedIndexId).size(); + int currColumnSize = table.getIndexIdToSchema().get(indexId).size(); + if (currColumnSize < selectedColumnSize) { + selectedIndexId = indexId; + } + } + } + return selectedIndexId; + } + + private List selectBestPrefixIndexRollup( + Partition partition, List conjuncts, boolean isPreAggregation) throws UserException { + + final List outputColumns = Lists.newArrayList(); + for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) { + Column col = slot.getColumn(); + outputColumns.add(col.getName()); + } + + final List rollups = Lists.newArrayList(); + rollups.add(partition.getBaseIndex()); + rollups.addAll(partition.getRollupIndices()); + LOG.debug("num of rollup(base included): {}, pre aggr: {}", rollups.size(), isPreAggregation); + + // 1. find all rollup indexes which contains all tuple columns + final List rollupsContainsOutput = Lists.newArrayList(); + final List baseTableColumns = table.getKeyColumnsByIndexId(partition.getBaseIndex().getId()); + for (MaterializedIndex rollup : rollups) { + final Set rollupColumns = Sets.newHashSet(); + table.getSchemaByIndexId(rollup.getId()) + .stream().forEach(column -> rollupColumns.add(column.getName())); + + if (rollupColumns.containsAll(outputColumns)) { + // If preAggregation is off, so that we only can use base table + // or those rollup tables which key columns is the same with base table + // (often in different order) + if (isPreAggregation) { + LOG.debug("preAggregation is on. add index {} which contains all output columns", + rollup.getId()); + rollupsContainsOutput.add(rollup); + } else if (table.getKeyColumnsByIndexId(rollup.getId()).size() == baseTableColumns.size()) { + LOG.debug("preAggregation is off, but index {} have same key columns with base index.", + rollup.getId()); + rollupsContainsOutput.add(rollup); + } + } else { + LOG.debug("exclude index {} because it does not contain all output columns", rollup.getId()); + } + } + + Preconditions.checkArgument(rollupsContainsOutput.size() > 0, + "Can't find candicate rollup contains all output columns."); + + + // 2. find all rollups which match the prefix most based on predicates column from containTupleIndices. + final Set equivalenceColumns = Sets.newHashSet(); + final Set unequivalenceColumns = Sets.newHashSet(); + collectColumns(conjuncts, equivalenceColumns, unequivalenceColumns); + final List rollupsMatchingBestPrefixIndex = Lists.newArrayList(); + matchPrefixIndex(rollupsContainsOutput, rollupsMatchingBestPrefixIndex, + equivalenceColumns, unequivalenceColumns); + + if (rollupsMatchingBestPrefixIndex.isEmpty()) { + rollupsContainsOutput.stream().forEach(n -> rollupsMatchingBestPrefixIndex.add(n.getId())); + } + + // 3. sorted the final candidate indexes by index id + // this is to make sure that candidate indexes find in all partitions will be returned in same order + Collections.sort(rollupsMatchingBestPrefixIndex, new Comparator() { + @Override + public int compare(Long id1, Long id2) { + return (int) (id1 - id2); + } + }); + return rollupsMatchingBestPrefixIndex; + } + + private void matchPrefixIndex(List candidateRollups, + List rollupsMatchingBestPrefixIndex, + Set equivalenceColumns, + Set unequivalenceColumns) { + if (equivalenceColumns.size() == 0 && unequivalenceColumns.size() == 0) { + return; + } + int maxPrefixMatchCount = 0; + int prefixMatchCount; + for (MaterializedIndex index : candidateRollups) { + prefixMatchCount = 0; + for (Column col : table.getSchemaByIndexId(index.getId())) { + if (equivalenceColumns.contains(col.getName())) { + prefixMatchCount++; + } else if (unequivalenceColumns.contains(col.getName())) { + // Unequivalence predicate's columns can match only first column in rollup. + prefixMatchCount++; + break; + } else { + break; + } + } + + if (prefixMatchCount == maxPrefixMatchCount) { + LOG.debug("s3: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount); + rollupsMatchingBestPrefixIndex.add(index.getId()); + } else if (prefixMatchCount > maxPrefixMatchCount) { + LOG.debug("s3: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount); + maxPrefixMatchCount = prefixMatchCount; + rollupsMatchingBestPrefixIndex.clear(); + rollupsMatchingBestPrefixIndex.add(index.getId()); + } + } + } + + private void collectColumns( + List conjuncts, Set equivalenceColumns, Set unequivalenceColumns) { + + // 1. Get columns which has predicate on it. + for (Expr expr : conjuncts) { + if (!isPredicateUsedForPrefixIndex(expr, false)) { + continue; + } + for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) { + if (expr.isBound(slot.getId())) { + if (!isEquivalenceExpr(expr)) { + unequivalenceColumns.add(slot.getColumn().getName()); + } else { + equivalenceColumns.add(slot.getColumn().getName()); + } + break; + } + } + } + + // 2. Equal join predicates when pushing inner child. + List eqJoinPredicate = analyzer.getEqJoinConjuncts(tupleDesc.getId()); + for (Expr expr : eqJoinPredicate) { + if (!isPredicateUsedForPrefixIndex(expr, true)) { + continue; + } + for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) { + for (int i = 0; i < 2; i++) { + if (expr.getChild(i).isBound(slot.getId())) { + equivalenceColumns.add(slot.getColumn().getName()); + break; + } + } + } + } + } + + private boolean isEquivalenceExpr(Expr expr) { + if (expr instanceof InPredicate) { + return true; + } + if (expr instanceof BinaryPredicate) { + final BinaryPredicate predicate = (BinaryPredicate) expr; + if (predicate.getOp().isEquivalence()) { + return true; + } + } + return false; + } + + private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) { + if (!(expr instanceof InPredicate) + && !(expr instanceof BinaryPredicate)) { + return false; + } + if (expr instanceof InPredicate) { + return isInPredicateUsedForPrefixIndex((InPredicate)expr); + } else if (expr instanceof BinaryPredicate) { + if (isJoinConjunct) { + return isEqualJoinConjunctUsedForPrefixIndex((BinaryPredicate)expr); + } else { + return isBinaryPredicateUsedForPrefixIndex((BinaryPredicate)expr); + } + } + return true; + } + + private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) { + Preconditions.checkArgument(expr.getOp().isEquivalence()); + if (expr.isAuxExpr()) { + return false; + } + for (Expr child : expr.getChildren()) { + for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) { + if (child.isBound(slot.getId()) && isSlotRefNested(child)) { + return true; + } + } + } + return false; + } + + private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) { + if (expr.isAuxExpr() || expr.getOp().isUnequivalence()) { + return false; + } + return (isSlotRefNested(expr.getChild(0)) && expr.getChild(1).isConstant()) + || (isSlotRefNested(expr.getChild(1)) && expr.getChild(0).isConstant()); + } + + private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) { + if (expr.isNotIn()) { + return false; + } + return isSlotRefNested(expr.getChild(0)) && expr.isLiteralChildren(); + } + + private boolean isSlotRefNested(Expr expr) { + while (expr instanceof CastExpr) { + expr = expr.getChild(0); + } + return expr instanceof SlotRef; + } +}