Refactor the rollup selection code (#947)

chenhao
2019-04-19 14:49:12 +08:00
committed by ZHAO Chun
parent 22dc6119b9
commit f273f59844
3 changed files with 316 additions and 247 deletions
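In short, this change lifts the rollup-selection logic that used to live inline in OlapScanNode (selectRollupIndex and normalizePredicate) into a new RollupSelector class. The sketch below is a condensed restatement of the OlapScanNode hunk further down, not a standalone program; analyzer, desc, olapTable, conjuncts, isPreAggregation and selectedIndexId are existing OlapScanNode members:

    // Sketch: how OlapScanNode now delegates rollup selection (see the getScanRangeLocations hunk below).
    final RollupSelector rollupSelector = new RollupSelector(analyzer, desc, olapTable);
    selectedIndexId = rollupSelector.selectBestRollup(partitionIds, conjuncts, isPreAggregation);
    for (Long partitionId : partitionIds) {
        final Partition partition = olapTable.getPartition(partitionId);
        // The selected index id exists in every partition, so one choice serves all partitions.
        final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
        final Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
    }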


@ -125,6 +125,8 @@ public class BinaryPredicate extends Predicate implements Writable {
}
public boolean isEquivalence() { return this == EQ; }
public boolean isUnequivalence() { return this == NE; }
}
private Operator op;


@ -87,8 +87,6 @@ public class OlapScanNode extends ScanNode {
private boolean canTurnOnPreAggr = true;
private ArrayList<String> tupleColumns = new ArrayList<String>();
private HashSet<String> predicateColumns = new HashSet<String>();
private HashSet<String> inPredicateColumns = new HashSet<String>();
private HashSet<String> eqJoinColumns = new HashSet<String>();
private OlapTable olapTable = null;
private long selectedTabletsNum = 0;
private long totalTabletsNum = 0;
@ -169,193 +167,6 @@ public class OlapScanNode extends ScanNode {
}
}
// private void analyzeVectorizedConjuncts(Analyzer analyzer) throws InternalException {
// for (SlotDescriptor slot : desc.getSlots()) {
// for (Expr conjunct : conjuncts) {
// if (expr.isConstant()) {
// continue;
// }
// if (analyzer.isWhereClauseConjunct(conjunct)
// && expr.isBound(slot.getId())
// && conjunct.isVectorized()
// && conjunct instanceof Predicate) {
// conjunct.computeOutputColumn(analyzer);
// } else {
// Preconditions.checkState(false);
// }
// }
// }
// }
private List<MaterializedIndex> selectRollupIndex(Partition partition) throws UserException {
if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
isPreAggregation = true;
}
List<MaterializedIndex> allIndices = Lists.newArrayList();
allIndices.add(partition.getBaseIndex());
allIndices.addAll(partition.getRollupIndices());
LOG.debug("num of rollup(base included): {}, pre aggr: {}", allIndices.size(), isPreAggregation);
// 1. find all rollup indexes which contain all tuple columns
List<MaterializedIndex> containTupleIndexes = Lists.newArrayList();
List<Column> baseIndexKeyColumns = olapTable.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
for (MaterializedIndex index : allIndices) {
Set<String> indexColNames = Sets.newHashSet();
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
indexColNames.add(col.getName());
}
if (indexColNames.containsAll(tupleColumns)) {
// If preAggregation is off, we can only use the base table
// or rollup tables whose key columns are the same as the base table's
// (often in a different order)
if (isPreAggregation) {
LOG.debug("preAggregation is on. add index {} which contains all tuple columns", index.getId());
containTupleIndexes.add(index);
} else if (olapTable.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeyColumns.size()) {
LOG.debug("preAggregation is off, but index {} have same key columns with base index.",
index.getId());
containTupleIndexes.add(index);
}
} else {
LOG.debug("exclude index {} because it does not contain all tuple columns", index.getId());
}
}
if (containTupleIndexes.isEmpty()) {
throw new UserException("Failed to select index, no match index");
}
// 2. from containTupleIndexes, find all indexes that best match the prefix
// based on the predicate / sort / in-predicate columns
List<MaterializedIndex> prefixMatchedIndexes = Lists.newArrayList();
int maxPrefixMatchCount = 0;
int prefixMatchCount = 0;
for (MaterializedIndex index : containTupleIndexes) {
prefixMatchCount = 0;
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
if (sortColumn != null) {
if (inPredicateColumns.contains(col.getName())) {
prefixMatchCount++;
} else if (sortColumn.equals(col.getName())) {
prefixMatchCount++;
break;
} else {
break;
}
} else {
if (predicateColumns.contains(col.getName())) {
break;
}
}
}
if (prefixMatchCount == maxPrefixMatchCount) {
LOG.debug("s2: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
prefixMatchedIndexes.add(index);
} else if (prefixMatchCount > maxPrefixMatchCount) {
LOG.debug("s2: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
maxPrefixMatchCount = prefixMatchCount;
prefixMatchedIndexes.clear();
prefixMatchedIndexes.add(index);
}
}
// 3. find all indexes which match the prefix most based on equal join columns
// from containTupleIndices.
List<MaterializedIndex> eqJoinPrefixMatchedIndexes = Lists.newArrayList();
maxPrefixMatchCount = 0;
for (MaterializedIndex index : containTupleIndexes) {
prefixMatchCount = 0;
for (Column col : olapTable.getSchemaByIndexId(index.getId())) {
if (eqJoinColumns.contains(col.getName()) || predicateColumns.contains(col.getName())) {
prefixMatchCount++;
} else {
break;
}
}
if (prefixMatchCount == maxPrefixMatchCount) {
LOG.debug("s3: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
eqJoinPrefixMatchedIndexes.add(index);
} else if (prefixMatchCount > maxPrefixMatchCount) {
LOG.debug("s3: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
maxPrefixMatchCount = prefixMatchCount;
eqJoinPrefixMatchedIndexes.clear();
eqJoinPrefixMatchedIndexes.add(index);
}
}
// 4. find the intersection of prefixMatchedIndexes and eqJoinPrefixMatchedIndexes as candidate indexes
List<MaterializedIndex> finalCandidateIndexes = Lists.newArrayList();
for (MaterializedIndex index : prefixMatchedIndexes) {
for (MaterializedIndex oneIndex : eqJoinPrefixMatchedIndexes) {
if (oneIndex.getId() == index.getId()) {
finalCandidateIndexes.add(index);
LOG.debug("find a matched index {} in intersection of "
+ "prefixMatchIndices and eqJoinPrefixMatchIndices",
index.getId());
}
}
}
// maybe there is no intersection between prefixMatchedIndexes and eqJoinPrefixMatchedIndexes.
// in this case, use prefixMatchedIndexes.
if (finalCandidateIndexes.isEmpty()) {
finalCandidateIndexes = prefixMatchedIndexes;
}
// 5. sort the final candidate indexes by index id
// to make sure that the candidate indexes found in all partitions are returned in the same order
Collections.sort(finalCandidateIndexes, new Comparator<MaterializedIndex>() {
@Override
public int compare(MaterializedIndex index1, MaterializedIndex index2)
{
return (int) (index1.getId() - index2.getId());
}
});
return finalCandidateIndexes;
}
private void normalizePredicate(Analyzer analyzer) throws UserException {
// 1. Get columns that have an equal-join predicate on them
List<Expr> eqJoinPredicate = analyzer.getEqJoinConjuncts(desc.getId());
for (Expr expr : eqJoinPredicate) {
for (SlotDescriptor slot : desc.getSlots()) {
for (int i = 0; i < 2; i++) {
if (expr.getChild(i).isBound(slot.getId())) {
eqJoinColumns.add(slot.getColumn().getName());
LOG.debug("Add eqJoinColumn: ColName=" + slot.getColumn().getName());
break;
}
}
}
}
// 2. Get columns that have a predicate on them
for (SlotDescriptor slot : desc.getSlots()) {
for (Expr expr : conjuncts) {
if (expr.isConstant()) {
continue;
}
if (expr.isBound(slot.getId())) {
predicateColumns.add(slot.getColumn().getName());
LOG.debug("Add predicateColumn: ColName=" + slot.getColumn().getName());
if (expr instanceof InPredicate) {
inPredicateColumns.add(slot.getColumn().getName());
LOG.debug("Add inPredicateColumn: ColName=" + slot.getColumn().getName());
}
}
}
}
// 3. Get Columns of this tuple
for (SlotDescriptor slot : desc.getSlots()) {
Column col = slot.getColumn();
tupleColumns.add(col.getName());
LOG.debug("Add tupleColumn: ColName=" + col.getName());
}
}
private Collection<Long> partitionPrune(PartitionInfo partitionInfo) throws AnalysisException {
PartitionPruner partitionPruner = null;
switch(partitionInfo.getType()) {
@ -500,86 +311,42 @@ public class OlapScanNode extends ScanNode {
}
private void getScanRangeLocations(Analyzer analyzer) throws UserException, AnalysisException {
normalizePredicate(analyzer);
long start = System.currentTimeMillis();
Collection<Long> partitionIds = partitionPrune(olapTable.getPartitionInfo());
if (partitionIds == null) {
partitionIds = new ArrayList<Long>();
for (Partition partition : olapTable.getPartitions()) {
partitionIds.add(partition.getId());
}
}
selectedPartitionNum = partitionIds.size();
LOG.debug("partition prune cost: {} ms, partitions: {}", (System.currentTimeMillis() - start), partitionIds);
start = System.currentTimeMillis();
// find all candidate rollups
int candidateTableSize = 0;
List<List<MaterializedIndex>> tables = Lists.newArrayList();
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
List<MaterializedIndex> candidateTables = selectRollupIndex(partition);
if (candidateTableSize == 0) {
candidateTableSize = candidateTables.size();
} else {
if (candidateTableSize != candidateTables.size()) {
String errMsg = "two partition's candidate_table_size not equal, one is " + candidateTableSize
+ ", the other is" + candidateTables.size();
throw new AnalysisException(errMsg);
}
}
tables.add(candidateTables);
if (olapTable.getKeysType() == KeysType.DUP_KEYS) {
isPreAggregation = true;
}
// choose one rollup from the candidate rollups
long minRowCount = Long.MAX_VALUE;
int partitionPos = -1;
for (int i = 0; i < candidateTableSize; i++) {
MaterializedIndex candidateIndex = null;
long rowCount = 0;
for (List<MaterializedIndex> candidateTables : tables) {
if (candidateIndex == null) {
candidateIndex = candidateTables.get(i);
} else {
if (candidateIndex.getId() != candidateTables.get(i).getId()) {
String errMsg = "two partition's candidate_table not equal, one is "
+ candidateIndex.getId() + ", the other is " + candidateTables.get(i).getId();
throw new AnalysisException(errMsg);
}
}
rowCount += candidateTables.get(i).getRowCount();
}
LOG.debug("rowCount={} for table={}", rowCount, candidateIndex.getId());
if (rowCount < minRowCount) {
minRowCount = rowCount;
selectedIndexId = tables.get(0).get(i).getId();
partitionPos = i;
} else if (rowCount == minRowCount) {
// on a row-count tie, prefer the index with fewer columns
int selectedColumnSize = olapTable.getIndexIdToSchema().get(selectedIndexId).size();
int currColumnSize = olapTable.getIndexIdToSchema().get(tables.get(0).get(i).getId()).size();
if (currColumnSize < selectedColumnSize) {
selectedIndexId = tables.get(0).get(i).getId();
partitionPos = i;
}
}
if (partitionIds.size() == 0) {
return;
}
final RollupSelector rollupSelector = new RollupSelector(analyzer, desc, olapTable);
selectedIndexId = rollupSelector.selectBestRollup(partitionIds, conjuncts, isPreAggregation);
long localBeId = -1;
if (Config.enable_local_replica_selection) {
localBeId = Catalog.getCurrentSystemInfo().getBackendIdByHost(FrontendOptions.getLocalHostAddress());
}
MaterializedIndex selectedTable = null;
int j = 0;
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
LOG.debug("selected partition: " + partition.getName());
selectedTable = tables.get(j++).get(partitionPos);
List<Tablet> tablets = new ArrayList<Tablet>();
Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
final Partition partition = olapTable.getPartition(partitionId);
final MaterializedIndex selectedTable = partition.getIndex(selectedIndexId);
final List<Tablet> tablets = Lists.newArrayList();
final Collection<Long> tabletIds = distributionPrune(selectedTable, partition.getDistributionInfo());
LOG.debug("distribution prune tablets: {}", tabletIds);
List<Long> allTabletIds = selectedTable.getTabletIdsInOrder();


@ -0,0 +1,300 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.UserException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.stream.Collectors;
public final class RollupSelector {
private static final Logger LOG = LogManager.getLogger(RollupSelector.class);
// Table, tuple and analyzer context used for rollup selection.
private final TupleDescriptor tupleDesc;
private final OlapTable table;
private final Analyzer analyzer;
public RollupSelector(Analyzer analyzer, TupleDescriptor tupleDesc, OlapTable table) {
this.analyzer = analyzer;
this.tupleDesc = tupleDesc;
this.table = table;
}
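// Select the rollup to scan: first narrow the candidates by prefix-index match
// (using a single partition, since index ids are identical across partitions),
// then pick the candidate with the smallest total row count across all partitions.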
public long selectBestRollup(
Collection<Long> partitionIds, List<Expr> conjuncts, boolean isPreAggregation)
throws UserException {
Preconditions.checkArgument(partitionIds != null && !partitionIds.isEmpty(),
"Paritition can't be null or empty.");
// Use the first partition to select the best prefix-index rollups, because the MaterializedIndex ids are the same in every partition of the table.
final List<Long> bestPrefixIndexRollups =
selectBestPrefixIndexRollup(
table.getPartition(partitionIds.iterator().next()),
conjuncts,
isPreAggregation);
return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
}
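// Among the prefix-matched candidates, return the index with the smallest total
// row count over the given partitions; ties are broken by the narrower schema.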
private long selectBestRowCountRollup(List<Long> bestPrefixIndexRollups, Collection<Long> partitionIds) {
long minRowCount = Long.MAX_VALUE;
long selectedIndexId = 0;
for (Long indexId : bestPrefixIndexRollups) {
long rowCount = 0;
for (Long partitionId : partitionIds) {
rowCount += table.getPartition(partitionId).getIndex(indexId).getRowCount();
}
LOG.debug("rowCount={} for table={}", rowCount, indexId);
if (rowCount < minRowCount) {
minRowCount = rowCount;
selectedIndexId = indexId;
} else if (rowCount == minRowCount) {
// on a row-count tie, prefer the index with fewer columns
int selectedColumnSize = table.getIndexIdToSchema().get(selectedIndexId).size();
int currColumnSize = table.getIndexIdToSchema().get(indexId).size();
if (currColumnSize < selectedColumnSize) {
selectedIndexId = indexId;
}
}
}
return selectedIndexId;
}
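// For one partition, keep the rollups that contain all output columns and, among
// those, the ones with the longest prefix-index match on the predicate columns.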
private List<Long> selectBestPrefixIndexRollup(
Partition partition, List<Expr> conjuncts, boolean isPreAggregation) throws UserException {
final List<String> outputColumns = Lists.newArrayList();
for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
Column col = slot.getColumn();
outputColumns.add(col.getName());
}
final List<MaterializedIndex> rollups = Lists.newArrayList();
rollups.add(partition.getBaseIndex());
rollups.addAll(partition.getRollupIndices());
LOG.debug("num of rollup(base included): {}, pre aggr: {}", rollups.size(), isPreAggregation);
// 1. find all rollup indexes which contain all output columns
final List<MaterializedIndex> rollupsContainsOutput = Lists.newArrayList();
final List<Column> baseTableColumns = table.getKeyColumnsByIndexId(partition.getBaseIndex().getId());
for (MaterializedIndex rollup : rollups) {
final Set<String> rollupColumns = Sets.newHashSet();
table.getSchemaByIndexId(rollup.getId())
.stream().forEach(column -> rollupColumns.add(column.getName()));
if (rollupColumns.containsAll(outputColumns)) {
// If preAggregation is off, we can only use the base table
// or rollup tables whose key columns are the same as the base table's
// (often in a different order)
if (isPreAggregation) {
LOG.debug("preAggregation is on. add index {} which contains all output columns",
rollup.getId());
rollupsContainsOutput.add(rollup);
} else if (table.getKeyColumnsByIndexId(rollup.getId()).size() == baseTableColumns.size()) {
LOG.debug("preAggregation is off, but index {} have same key columns with base index.",
rollup.getId());
rollupsContainsOutput.add(rollup);
}
} else {
LOG.debug("exclude index {} because it does not contain all output columns", rollup.getId());
}
}
Preconditions.checkArgument(rollupsContainsOutput.size() > 0,
"Can't find candicate rollup contains all output columns.");
// 2. from rollupsContainsOutput, find all rollups that best match the prefix based on the predicate columns
final Set<String> equivalenceColumns = Sets.newHashSet();
final Set<String> unequivalenceColumns = Sets.newHashSet();
collectColumns(conjuncts, equivalenceColumns, unequivalenceColumns);
final List<Long> rollupsMatchingBestPrefixIndex = Lists.newArrayList();
matchPrefixIndex(rollupsContainsOutput, rollupsMatchingBestPrefixIndex,
equivalenceColumns, unequivalenceColumns);
if (rollupsMatchingBestPrefixIndex.isEmpty()) {
rollupsContainsOutput.stream().forEach(n -> rollupsMatchingBestPrefixIndex.add(n.getId()));
}
// 3. sort the final candidate indexes by index id
// to make sure that the candidate indexes found in all partitions are returned in the same order
Collections.sort(rollupsMatchingBestPrefixIndex, new Comparator<Long>() {
@Override
public int compare(Long id1, Long id2) {
return Long.compare(id1, id2);
}
});
return rollupsMatchingBestPrefixIndex;
}
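// Count how many leading columns of each candidate's schema are covered by the
// predicate columns, and keep only the candidates with the maximum match count.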
private void matchPrefixIndex(List<MaterializedIndex> candidateRollups,
List<Long> rollupsMatchingBestPrefixIndex,
Set<String> equivalenceColumns,
Set<String> unequivalenceColumns) {
if (equivalenceColumns.size() == 0 && unequivalenceColumns.size() == 0) {
return;
}
int maxPrefixMatchCount = 0;
int prefixMatchCount;
for (MaterializedIndex index : candidateRollups) {
prefixMatchCount = 0;
for (Column col : table.getSchemaByIndexId(index.getId())) {
if (equivalenceColumns.contains(col.getName())) {
prefixMatchCount++;
} else if (unequivalenceColumns.contains(col.getName())) {
// A column with a non-equivalence predicate ends the prefix match: count it, then stop.
prefixMatchCount++;
break;
} else {
break;
}
}
if (prefixMatchCount == maxPrefixMatchCount) {
LOG.debug("s3: find a equal prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
rollupsMatchingBestPrefixIndex.add(index.getId());
} else if (prefixMatchCount > maxPrefixMatchCount) {
LOG.debug("s3: find a better prefix match index {}. match count: {}", index.getId(), prefixMatchCount);
maxPrefixMatchCount = prefixMatchCount;
rollupsMatchingBestPrefixIndex.clear();
rollupsMatchingBestPrefixIndex.add(index.getId());
}
}
}
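// Split the predicate columns into equivalence columns (=, IN, equal-join) and
// non-equivalence columns, considering only predicates usable by the prefix index.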
private void collectColumns(
List<Expr> conjuncts, Set<String> equivalenceColumns, Set<String> unequivalenceColumns) {
// 1. Get columns that have a predicate on them.
for (Expr expr : conjuncts) {
if (!isPredicateUsedForPrefixIndex(expr, false)) {
continue;
}
for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
if (expr.isBound(slot.getId())) {
if (!isEquivalenceExpr(expr)) {
unequivalenceColumns.add(slot.getColumn().getName());
} else {
equivalenceColumns.add(slot.getColumn().getName());
}
break;
}
}
}
// 2. Get columns from equal-join conjuncts bound to this tuple.
List<Expr> eqJoinPredicate = analyzer.getEqJoinConjuncts(tupleDesc.getId());
for (Expr expr : eqJoinPredicate) {
if (!isPredicateUsedForPrefixIndex(expr, true)) {
continue;
}
for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
for (int i = 0; i < 2; i++) {
if (expr.getChild(i).isBound(slot.getId())) {
equivalenceColumns.add(slot.getColumn().getName());
break;
}
}
}
}
}
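// An expression counts as an equivalence predicate if it is an IN predicate or a
// binary predicate whose operator is equality.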
private boolean isEquivalenceExpr(Expr expr) {
if (expr instanceof InPredicate) {
return true;
}
if (expr instanceof BinaryPredicate) {
final BinaryPredicate predicate = (BinaryPredicate) expr;
if (predicate.getOp().isEquivalence()) {
return true;
}
}
return false;
}
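// Only IN predicates and binary predicates can drive the prefix index; dispatch to
// the specific check depending on whether the expression is a join conjunct.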
private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) {
if (!(expr instanceof InPredicate)
&& !(expr instanceof BinaryPredicate)) {
return false;
}
if (expr instanceof InPredicate) {
return isInPredicateUsedForPrefixIndex((InPredicate)expr);
} else if (expr instanceof BinaryPredicate) {
if (isJoinConjunct) {
return isEqualJoinConjunctUsedForPrefixIndex((BinaryPredicate)expr);
} else {
return isBinaryPredicateUsedForPrefixIndex((BinaryPredicate)expr);
}
}
return true;
}
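// An equal-join conjunct is usable if it is not an auxiliary expression and one of
// its children is a (possibly cast) slot of this tuple.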
private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) {
Preconditions.checkArgument(expr.getOp().isEquivalence());
if (expr.isAuxExpr()) {
return false;
}
for (Expr child : expr.getChildren()) {
for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
if (child.isBound(slot.getId()) && isSlotRefNested(child)) {
return true;
}
}
}
return false;
}
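// A binary predicate is usable if it is not auxiliary, is not "!=", and compares a
// (possibly cast) slot against a constant.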
private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) {
if (expr.isAuxExpr() || expr.getOp().isUnequivalence()) {
return false;
}
return (isSlotRefNested(expr.getChild(0)) && expr.getChild(1).isConstant())
|| (isSlotRefNested(expr.getChild(1)) && expr.getChild(0).isConstant());
}
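// An IN predicate is usable if it is not NOT IN, its first child is a (possibly
// cast) slot, and all list items are literals.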
private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) {
if (expr.isNotIn()) {
return false;
}
return isSlotRefNested(expr.getChild(0)) && expr.isLiteralChildren();
}
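// Strip any casts and check whether the remaining expression is a plain slot reference.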
private boolean isSlotRefNested(Expr expr) {
while (expr instanceof CastExpr) {
expr = expr.getChild(0);
}
return expr instanceof SlotRef;
}
}