[JoinReorder] Implement a better join reorder algorithm. (#6226)
The current JoinReorder algorithm sorts tables mainly according to the star-schema model and only considers the join relationships between tables. Its problems are:

1. It only works for data modeled as a star schema; tables from other data models cannot be reordered effectively.
2. It ignores table cost, so it cannot estimate how large the joined relations will be, and its real query-optimization ability is weak.
3. It cannot avoid potentially expensive joins, such as cross joins, through reordering.

The new JoinReorder algorithm introduces a cost-evaluation model to Doris and reorders joins based on three principles:

1. The join order is: largest node, smallest node, ..., second-largest node.
2. An inner join is preferred over a cross join.
3. The right children of outer joins, semi joins, and anti joins are not moved.

The cost model of a PlanNode relies on two values: cardinality and selectivity. Cardinality can be understood simply as the number of rows a node produces. Selectivity is a value between 0 and 1; predicates generally carry a selectivity. The cost model computes the final cardinality of a PlanNode from the pre-computed cardinality of its inputs and the selectivity of the predicates assigned to it (see the sketch below).

The session variable "enable_cost_based_join_reorder" controls whether JoinReorder uses the new algorithm. When it is enabled, the new ordering algorithm takes effect; when it is disabled, the old one is used. It is disabled by default. The new algorithm currently has no cost estimation for external tables (ODBC, ES) or set operations (INTERSECT, EXCEPT), so enabling cost-based join reorder is not recommended for queries that use them.

At the code-architecture level:

1. The new ordering algorithm runs in the single-node execution planning stage.
2. The init and finalize phases of PlanNode were refactored so that PlanNode planning and cost evaluation are completed before reordering happens.
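Below is a minimal, hypothetical Java sketch of the cardinality-times-selectivity idea described above. It is not the actual Doris implementation; the class and method names (CostModelSketch, equalitySelectivity, estimateOutputCardinality) are made up for illustration, and only the constants mirror the patch (DEFAULT_SELECTIVITY = 0.1, equality-predicate selectivity of roughly 1/NDV).

```java
// Minimal sketch of the cardinality/selectivity idea described above.
// All names are hypothetical; the real logic lives in PlanNode, Expr and
// BinaryPredicate in this patch.
import java.util.List;

public class CostModelSketch {

    // Fallback used when nothing better is known (the patch uses 0.1 as DEFAULT_SELECTIVITY).
    static final double DEFAULT_SELECTIVITY = 0.1;

    // Selectivity of an equality predicate "col = const": roughly 1 / NDV(col),
    // mirroring BinaryPredicate.setSelectivity() in the patch. ndv <= 0 means unknown.
    static double equalitySelectivity(long ndv) {
        return ndv > 0 ? 1.0 / ndv : DEFAULT_SELECTIVITY;
    }

    // Final cardinality of a node = input cardinality * product of predicate
    // selectivities, capped at the limit if one is set (-1 means unknown / no limit).
    static long estimateOutputCardinality(long inputCardinality, List<Double> selectivities, long limit) {
        if (inputCardinality < 0) {
            return -1; // unknown stays unknown
        }
        double card = inputCardinality;
        for (double sel : selectivities) {
            card *= (sel >= 0 ? sel : DEFAULT_SELECTIVITY);
        }
        long result = Math.round(card);
        return limit >= 0 ? Math.min(result, limit) : result;
    }

    public static void main(String[] args) {
        // A scan of 1,000,000 rows filtered by "k1 = 10" where k1 has ~5,000 distinct values.
        double sel = equalitySelectivity(5_000);
        long card = estimateOutputCardinality(1_000_000, List.of(sel), -1);
        System.out.println("estimated cardinality = " + card); // ~200
    }
}
```

In the patch itself this path only runs when the session variable enable_cost_based_join_reorder is enabled; the check is wired in through Analyzer.safeIsEnableJoinReorderBasedCost(), shown in the diff below.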
@@ -2733,6 +2733,10 @@ opt_alter_type ::=
{:
RESULT = ShowAlterStmt.AlterType.ROLLUP;
:}
| KW_MATERIALIZED KW_VIEW
{:
RESULT = ShowAlterStmt.AlterType.ROLLUP;
:}
| KW_COLUMN
{:
RESULT = ShowAlterStmt.AlterType.COLUMN;
@@ -152,7 +152,7 @@ public class Analyzer {
// The runtime filter that is expected to be used
private final List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
public void setIsSubquery() {
isSubquery = true;
globalState.containsSubquery = true;
@@ -206,8 +206,8 @@ public class Analyzer {
private final Map<TupleId, List<ExprId>> eqJoinConjuncts = Maps.newHashMap();
// set of conjuncts that have been assigned to some PlanNode
private final Set<ExprId> assignedConjuncts =
Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());
private Set<ExprId> assignedConjuncts =
Collections.newSetFromMap(new IdentityHashMap<ExprId, Boolean>());
// map from outer-joined tuple id, ie, one that is nullable in this select block,
// to the last Join clause (represented by its rhs table ref) that outer-joined it
@@ -849,6 +849,15 @@ public class Analyzer {
}
}
/**
* register expr id
*
* @param expr
*/
void registerExprId(Expr expr) {
expr.setId(globalState.conjunctIdGenerator.getNextId());
}
/**
* Register individual conjunct with all tuple and slot ids it references
* and with the global conjunct list.
@@ -945,6 +954,16 @@ public class Analyzer {
registerConjunct(p);
}
public Set<ExprId> getAssignedConjuncts() {
return Sets.newHashSet(globalState.assignedConjuncts);
}
public void setAssignedConjuncts(Set<ExprId> assigned) {
if (assigned != null) {
globalState.assignedConjuncts = Sets.newHashSet(assigned);
}
}
/**
* Return all unassigned registered conjuncts that are fully bound by the given
* (logical) tuple ids, can be evaluated by 'tupleIds' and are not tied to an
@@ -952,7 +971,7 @@ public class Analyzer {
*/
public List<Expr> getUnassignedConjuncts(List<TupleId> tupleIds) {
List<Expr> result = Lists.newArrayList();
for (Expr e: getUnassignedConjuncts(tupleIds, true)) {
for (Expr e : getUnassignedConjuncts(tupleIds, true)) {
if (canEvalPredicate(tupleIds, e)) result.add(e);
}
return result;
@@ -1256,7 +1275,7 @@ public class Analyzer {
}
final Expr newConjunct = conjunct.getResultValue();
if (newConjunct instanceof BoolLiteral) {
final BoolLiteral value = (BoolLiteral)newConjunct;
final BoolLiteral value = (BoolLiteral) newConjunct;
if (!value.getValue()) {
if (fromHavingClause) {
hasEmptyResultSet_ = true;
@@ -1597,6 +1616,17 @@ public class Analyzer {
public void setChangeResSmap(ExprSubstitutionMap changeResSmap) {
this.changeResSmap = changeResSmap;
}
// Load plan and query plan are the same framework
// Some Load method in doris access through http protocol, which will cause the session may be empty.
// In order to avoid the occurrence of null pointer exceptions, a check will be added here
public boolean safeIsEnableJoinReorderBasedCost() {
if (globalState.context == null) {
return false;
}
return globalState.context.getSessionVariable().isEnableJoinReorderBasedCost();
}
/**
* Returns true if predicate 'e' can be correctly evaluated by a tree materializing
* 'tupleIds', otherwise false:
@@ -1783,7 +1813,7 @@ public class Analyzer {
* materialization decision be cost-based?
*/
public void markRefdSlots(Analyzer analyzer, PlanNode planRoot,
List<Expr> outputExprs, AnalyticInfo analyticInfo) {
List<Expr> outputExprs, AnalyticInfo analyticInfo) {
if (planRoot == null) {
return;
}
@@ -1824,7 +1854,7 @@ public class Analyzer {
/**
* Column conduction, can slot a value-transfer to slot b
*
* <p>
* TODO(zxy) Use value-transfer graph to check
*/
public boolean hasValueTransfer(SlotId a, SlotId b) {
@@ -1834,7 +1864,7 @@ public class Analyzer {
/**
* Returns sorted slot IDs with value transfers from 'srcSid'.
* Time complexity: O(V) where V = number of slots
*
* <p>
* TODO(zxy) Use value-transfer graph to check
*/
public List<SlotId> getValueTransferTargets(SlotId srcSid) {
@@ -1848,8 +1878,8 @@ public class Analyzer {
* to an outer-joined tuple.
*/
public boolean hasOuterJoinedValueTransferTarget(List<SlotId> sids) {
for (SlotId srcSid: sids) {
for (SlotId dstSid: getValueTransferTargets(srcSid)) {
for (SlotId srcSid : sids) {
for (SlotId dstSid : getValueTransferTargets(srcSid)) {
if (isOuterJoined(getTupleId(dstSid))) return true;
}
}
@@ -622,6 +622,30 @@ public class BinaryPredicate extends Predicate implements Writable {
return this;
}
@Override
public void setSelectivity() {
switch(op) {
case EQ:
case EQ_FOR_NULL: {
Reference<SlotRef> slotRefRef = new Reference<SlotRef>();
boolean singlePredicate = isSingleColumnPredicate(slotRefRef, null);
if (singlePredicate) {
long distinctValues = slotRefRef.getRef().getNumDistinctValues();
if (distinctValues != -1) {
selectivity = 1.0 / distinctValues;
}
}
break;
} default: {
// Reference hive
selectivity = 1.0 / 3.0;
break;
}
}
return;
}
@Override
public int hashCode() {
return 31 * super.hashCode() + Objects.hashCode(op);
@ -83,7 +83,7 @@ public class DescriptorTable {
|
||||
for (SlotDescriptor slot: src.getSlots()) {
|
||||
copySlotDescriptor(d, slot);
|
||||
}
|
||||
d.computeMemLayout();
|
||||
d.computeStatAndMemLayout();
|
||||
return d;
|
||||
}
|
||||
|
||||
@ -122,14 +122,21 @@ public class DescriptorTable {
|
||||
}
|
||||
}
|
||||
|
||||
// Computes physical layout parameters of all descriptors.
|
||||
// Call this only after the last descriptor was added.
|
||||
@Deprecated
|
||||
public void computeMemLayout() {
|
||||
for (TupleDescriptor d : tupleDescs.values()) {
|
||||
d.computeMemLayout();
|
||||
}
|
||||
}
|
||||
|
||||
// Computes physical layout parameters of all descriptors and calculate the statistics of the tuple.
|
||||
// Call this only after the last descriptor was added.
|
||||
public void computeStatAndMemLayout() {
|
||||
for (TupleDescriptor d : tupleDescs.values()) {
|
||||
d.computeStatAndMemLayout();
|
||||
}
|
||||
}
|
||||
|
||||
public TDescriptorTable toThrift() {
|
||||
TDescriptorTable result = new TDescriptorTable();
|
||||
HashSet<Table> referencedTbls = Sets.newHashSet();
|
||||
|
||||
@ -63,7 +63,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
|
||||
private static final String NEGATE_FN = "negate";
|
||||
|
||||
// to be used where we can't come up with a better estimate
|
||||
protected static final double DEFAULT_SELECTIVITY = 0.1;
|
||||
public static final double DEFAULT_SELECTIVITY = 0.1;
|
||||
|
||||
public final static float FUNCTION_CALL_COST = 10;
|
||||
|
||||
@ -180,6 +180,10 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
|
||||
public boolean apply(Expr arg) { return arg instanceof NullLiteral; }
|
||||
};
|
||||
|
||||
public void setSelectivity() {
|
||||
selectivity = -1;
|
||||
}
|
||||
|
||||
/* TODO(zc)
|
||||
public final static com.google.common.base.Predicate<Expr>
|
||||
IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE =
|
||||
@ -299,6 +303,8 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
|
||||
return selectivity;
|
||||
}
|
||||
|
||||
public boolean hasSelectivity() { return selectivity >= 0; }
|
||||
|
||||
public long getNumDistinctValues() {
|
||||
return numDistinctValues;
|
||||
}
|
||||
@ -374,6 +380,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
|
||||
|
||||
// Do all the analysis for the expr subclass before marking the Expr analyzed.
|
||||
analyzeImpl(analyzer);
|
||||
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
setSelectivity();
|
||||
}
|
||||
analysisDone();
|
||||
}
|
||||
|
||||
@ -1420,6 +1429,29 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the descriptor of the scan slot that directly or indirectly produces
|
||||
* the values of 'this' SlotRef. Traverses the source exprs of intermediate slot
|
||||
* descriptors to resolve materialization points (e.g., aggregations).
|
||||
* Returns null if 'e' or any source expr of 'e' is not a SlotRef or cast SlotRef.
|
||||
*/
|
||||
public SlotDescriptor findSrcScanSlot() {
|
||||
SlotRef slotRef = unwrapSlotRef(false);
|
||||
if (slotRef == null) {
|
||||
return null;
|
||||
}
|
||||
SlotDescriptor slotDesc = slotRef.getDesc();
|
||||
if (slotDesc.isScanSlot()) {
|
||||
return slotDesc;
|
||||
}
|
||||
if (slotDesc.getSourceExprs().size() == 1) {
|
||||
return slotDesc.getSourceExprs().get(0).findSrcScanSlot();
|
||||
}
|
||||
// No known source expr, or there are several source exprs meaning the slot is
|
||||
// has no single source table.
|
||||
return null;
|
||||
}
|
||||
|
||||
public static double getConstFromExpr(Expr e) throws AnalysisException{
|
||||
Preconditions.checkState(e.isConstant());
|
||||
double value = 0;
|
||||
|
||||
@ -17,11 +17,6 @@
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Table;
|
||||
@ -31,10 +26,16 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Wraps a list of TableRef instances that form a FROM clause, allowing them to be
|
||||
* analyzed independently of the statement using them. To increase the flexibility of
|
||||
@ -61,23 +62,6 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
|
||||
this.needToSql = needToSql;
|
||||
}
|
||||
|
||||
private void sortTableRefForSubquery(Analyzer analyzer) {
|
||||
Collections.sort(this.tableRefs_, new Comparator<TableRef>() {
|
||||
@Override
|
||||
public int compare(TableRef tableref1, TableRef tableref2) {
|
||||
int i1 = 0;
|
||||
int i2 = 0;
|
||||
if (tableref1.getOnClause() != null) {
|
||||
i1 = 1;
|
||||
}
|
||||
if (tableref2.getOnClause() != null) {
|
||||
i2 = 1;
|
||||
}
|
||||
return i1 - i2;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void checkFromHiveTable(Analyzer analyzer) throws AnalysisException {
|
||||
for (TableRef tblRef : tableRefs_) {
|
||||
if (!(tblRef instanceof BaseTableRef)) {
|
||||
@ -111,6 +95,33 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In some cases, the reorder method of select stmt will incorrectly sort the tableRef with on clause.
|
||||
* The meaning of this function is to reset those tableRefs with on clauses.
|
||||
* For example:
|
||||
* Origin stmt: select * from t1 inner join t2 on t1.k1=t2.k1
|
||||
* After analyze: select * from t2 on t1.k1=t2.k1 inner join t1
|
||||
*
|
||||
* If this statement just needs to be reanalyze (query rewriter), an error will be reported
|
||||
* because the table t1 in the on clause cannot be recognized.
|
||||
*/
|
||||
private void sortTableRefKeepSequenceOfOnClause() {
|
||||
Collections.sort(this.tableRefs_, new Comparator<TableRef>() {
|
||||
@Override
|
||||
public int compare(TableRef tableref1, TableRef tableref2) {
|
||||
int i1 = 0;
|
||||
int i2 = 0;
|
||||
if (tableref1.getOnClause() != null) {
|
||||
i1 = 1;
|
||||
}
|
||||
if (tableref2.getOnClause() != null) {
|
||||
i2 = 1;
|
||||
}
|
||||
return i1 - i2;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
|
||||
if (analyzed_) return;
|
||||
@ -120,7 +131,14 @@ public class FromClause implements ParseNode, Iterable<TableRef> {
|
||||
return;
|
||||
}
|
||||
|
||||
sortTableRefForSubquery(analyzer);
|
||||
// The order of the tables may have changed during the previous analyzer process.
|
||||
// For example, a join b on xxx is changed to b on xxx join a.
|
||||
// This change will cause the predicate in on clause be adjusted to the front of the association table,
|
||||
// causing semantic analysis to fail. Unknown column 'column1' in 'table1'
|
||||
// So we need to readjust the order of the tables here.
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
sortTableRefKeepSequenceOfOnClause();
|
||||
}
|
||||
|
||||
// Start out with table refs to establish aliases.
|
||||
TableRef leftTblRef = null; // the one to the left of tblRef
|
||||
|
||||
@ -515,7 +515,10 @@ public class SelectStmt extends QueryStmt {
|
||||
if (needToSql) {
|
||||
sqlString_ = toSql();
|
||||
}
|
||||
reorderTable(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
LOG.debug("use old reorder logical in select stmt");
|
||||
reorderTable(analyzer);
|
||||
}
|
||||
|
||||
resolveInlineViewRefs(analyzer);
|
||||
|
||||
|
||||
@ -19,6 +19,8 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.ColumnStats;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.thrift.TSlotDescriptor;
|
||||
|
||||
@ -195,6 +197,8 @@ public class SlotDescriptor {
|
||||
stats = new ColumnStats();
|
||||
}
|
||||
}
|
||||
// FIXME(dhc): mock ndv
|
||||
stats.setNumDistinctValues(parent.getCardinality());
|
||||
return stats;
|
||||
}
|
||||
|
||||
@ -290,4 +294,12 @@ public class SlotDescriptor {
|
||||
builder.append(prefix).append("slotIdx=").append(slotIdx).append("\n");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
public boolean isScanSlot() {
|
||||
Table table = parent.getTable();
|
||||
if ((table != null) && (table instanceof OlapTable)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,7 +87,7 @@ public class Subquery extends Expr {
|
||||
try {
|
||||
stmt.analyze(analyzer);
|
||||
} catch (UserException e) {
|
||||
throw new AnalysisException(e.getMessage());
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
// Check whether the stmt_ contains an illegal mix of un/correlated table refs.
|
||||
stmt.getCorrelatedTupleIds(analyzer);
|
||||
|
||||
@ -638,6 +638,7 @@ public class TableRef implements ParseNode, Writable {
|
||||
return null;
|
||||
}
|
||||
|
||||
public boolean isAnalyzed() { return isAnalyzed; }
|
||||
public boolean isResolved() {
|
||||
return !getClass().equals(TableRef.class);
|
||||
}
|
||||
|
||||
@ -62,18 +62,24 @@ public class TupleDescriptor {
|
||||
private int numNullBytes;
|
||||
private int numNullableSlots;
|
||||
|
||||
// This cardinality is only used to mock slot ndv.
|
||||
// Only tuple of olap scan node has this value.
|
||||
private long cardinality;
|
||||
|
||||
private float avgSerializedSize; // in bytes; includes serialization overhead
|
||||
|
||||
public TupleDescriptor(TupleId id) {
|
||||
this.id = id;
|
||||
this.slots = new ArrayList<SlotDescriptor>();
|
||||
this.debugName = "";
|
||||
this.cardinality = -1;
|
||||
}
|
||||
|
||||
public TupleDescriptor(TupleId id, String debugName) {
|
||||
this.id = id;
|
||||
this.slots = new ArrayList<SlotDescriptor>();
|
||||
this.debugName = debugName;
|
||||
this.cardinality = -1;
|
||||
}
|
||||
|
||||
public void addSlot(SlotDescriptor desc) {
|
||||
@ -97,6 +103,14 @@ public class TupleDescriptor {
|
||||
return slots;
|
||||
}
|
||||
|
||||
public void setCardinality(long cardinality) {
|
||||
this.cardinality = cardinality;
|
||||
}
|
||||
|
||||
public long getCardinality() {
|
||||
return cardinality;
|
||||
}
|
||||
|
||||
public ArrayList<SlotDescriptor> getMaterializedSlots() {
|
||||
ArrayList<SlotDescriptor> result = Lists.newArrayList();
|
||||
for (SlotDescriptor slot : slots) {
|
||||
@ -162,6 +176,57 @@ public class TupleDescriptor {
|
||||
return ttupleDesc;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is mainly used to calculate the statistics of the tuple and the layout information.
|
||||
* Generally, it occurs after the plan node materializes the slot and before calculating the plan node statistics.
|
||||
* PlanNode.init() {
|
||||
* materializedSlot();
|
||||
* tupleDesc.computeStatAndMemLayout();
|
||||
* computeStat();
|
||||
* }
|
||||
*/
|
||||
public void computeStatAndMemLayout() {
|
||||
computeStat();
|
||||
computeMemLayout();
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is mainly used to evaluate the statistics of the tuple,
|
||||
* such as the average size of each row.
|
||||
* This function will be used before the computeStat() of the plan node
|
||||
* and is the pre-work for evaluating the statistics of the plan node.
|
||||
*
|
||||
* This function is theoretically only called once when the plan node is init.
|
||||
* However, the current code structure is relatively confusing
|
||||
* In order to ensure that even if it is wrongly called a second time, no error will occur,
|
||||
* so it will be initialized again at the beginning of the function.
|
||||
*
|
||||
* In the future this function will be changed to a private function.
|
||||
*/
|
||||
@Deprecated
|
||||
public void computeStat() {
|
||||
// init stat
|
||||
avgSerializedSize = 0;
|
||||
|
||||
// compute stat
|
||||
for (SlotDescriptor d : slots) {
|
||||
if (!d.isMaterialized()) {
|
||||
continue;
|
||||
}
|
||||
ColumnStats stats = d.getStats();
|
||||
if (stats.hasAvgSerializedSize()) {
|
||||
avgSerializedSize += d.getStats().getAvgSerializedSize();
|
||||
} else {
|
||||
// TODO: for computed slots, try to come up with stats estimates
|
||||
avgSerializedSize += d.getType().getSlotSize();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In the future this function will be changed to a private function.
|
||||
*/
|
||||
@Deprecated
|
||||
public void computeMemLayout() {
|
||||
// sort slots by size
|
||||
List<List<SlotDescriptor>> slotsBySize = Lists.newArrayListWithCapacity(PrimitiveType.getMaxSlotSize());
|
||||
@ -173,12 +238,6 @@ public class TupleDescriptor {
|
||||
numNullableSlots = 0;
|
||||
for (SlotDescriptor d : slots) {
|
||||
ColumnStats stats = d.getStats();
|
||||
if (stats.hasAvgSerializedSize()) {
|
||||
avgSerializedSize += d.getStats().getAvgSerializedSize();
|
||||
} else {
|
||||
// TODO: for computed slots, try to come up with stats estimates
|
||||
avgSerializedSize += d.getType().getSlotSize();
|
||||
}
|
||||
if (d.isMaterialized()) {
|
||||
slotsBySize.get(d.getType().getSlotSize()).add(d);
|
||||
if (d.getIsNullable()) {
|
||||
|
||||
@ -0,0 +1,54 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.common;
|
||||
|
||||
import com.google.common.math.LongMath;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class CheckedMath {
|
||||
|
||||
private final static Logger LOG = LogManager.getLogger(CheckedMath.class);
|
||||
|
||||
/**
|
||||
* Computes and returns the multiply of two longs. If an overflow occurs,
|
||||
* the maximum Long value is returned (Long.MAX_VALUE).
|
||||
*/
|
||||
public static long checkedMultiply(long a, long b) {
|
||||
try {
|
||||
return LongMath.checkedMultiply(a, b);
|
||||
} catch (ArithmeticException e) {
|
||||
LOG.warn("overflow when multiplying longs: " + a + ", " + b);
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes and returns the sum of two longs. If an overflow occurs,
|
||||
* the maximum Long value is returned (Long.MAX_VALUE).
|
||||
*/
|
||||
public static long checkedAdd(long a, long b) {
|
||||
try {
|
||||
return LongMath.checkedAdd(a, b);
|
||||
} catch (ArithmeticException e) {
|
||||
LOG.warn("overflow when adding longs: " + a + ", " + b);
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -183,7 +183,7 @@ public class ExportJob implements Writable {
|
||||
this.finishTimeMs = -1;
|
||||
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
|
||||
this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
|
||||
this.desc = new DescriptorTable();
|
||||
this.desc = analyzer.getDescTbl();
|
||||
this.exportPath = "";
|
||||
this.columnSeparator = "\t";
|
||||
this.lineDelimiter = "\n";
|
||||
@ -286,7 +286,7 @@ public class ExportJob implements Writable {
|
||||
}
|
||||
}
|
||||
}
|
||||
desc.computeMemLayout();
|
||||
desc.computeStatAndMemLayout();
|
||||
}
|
||||
|
||||
private void plan() throws UserException {
|
||||
@ -379,7 +379,6 @@ public class ExportJob implements Writable {
|
||||
((OlapScanNode) scanNode).setColumnFilters(Maps.newHashMap());
|
||||
((OlapScanNode) scanNode).setIsPreAggregation(false, "This an export operation");
|
||||
((OlapScanNode) scanNode).setCanTurnOnPreAggr(false);
|
||||
scanNode.init(analyzer);
|
||||
((OlapScanNode) scanNode).selectBestRollupByRollupSelector(analyzer);
|
||||
break;
|
||||
case ODBC:
|
||||
@ -392,6 +391,7 @@ public class ExportJob implements Writable {
|
||||
break;
|
||||
}
|
||||
if (scanNode != null) {
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
}
|
||||
|
||||
|
||||
@ -106,7 +106,7 @@ public class LoadLoadingTask extends LoadTask {
|
||||
|
||||
@Override
|
||||
protected void executeTask() throws Exception{
|
||||
LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
|
||||
LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. left retry: {}",
|
||||
DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
|
||||
retryTime--;
|
||||
beginTime = System.nanoTime();
|
||||
|
||||
@ -127,7 +127,7 @@ public class LoadingTaskPlanner {
|
||||
scanNode.init(analyzer);
|
||||
scanNode.finalize(analyzer);
|
||||
scanNodes.add(scanNode);
|
||||
descTable.computeMemLayout();
|
||||
descTable.computeStatAndMemLayout();
|
||||
|
||||
// 2. Olap table sink
|
||||
List<Long> partitionIds = getAllPartitionIds();
|
||||
|
||||
@ -950,7 +950,7 @@ public class SparkLoadJob extends BulkLoadJob {
|
||||
}
|
||||
|
||||
private void initTDescriptorTable(DescriptorTable descTable) {
|
||||
descTable.computeMemLayout();
|
||||
descTable.computeStatAndMemLayout();
|
||||
tDescriptorTable = descTable.toThrift();
|
||||
}
|
||||
}
|
||||
|
||||
@ -148,8 +148,8 @@ public class AggregationNode extends PlanNode {
|
||||
// conjuncts_ = orderConjunctsByCost(conjuncts_);
|
||||
|
||||
// Compute the mem layout for both tuples here for simplicity.
|
||||
aggInfo.getOutputTupleDesc().computeMemLayout();
|
||||
aggInfo.getIntermediateTupleDesc().computeMemLayout();
|
||||
aggInfo.getOutputTupleDesc().computeStatAndMemLayout();
|
||||
aggInfo.getIntermediateTupleDesc().computeStatAndMemLayout();
|
||||
|
||||
// do this at the end so it can take all conjuncts into account
|
||||
computeStats(analyzer);
|
||||
@ -167,14 +167,16 @@ public class AggregationNode extends PlanNode {
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
|
||||
cardinality = 1;
|
||||
// cardinality: product of # of distinct values produced by grouping exprs
|
||||
for (Expr groupingExpr : groupingExprs) {
|
||||
long numDistinct = groupingExpr.getNumDistinctValues();
|
||||
// TODO: remove these before 1.0
|
||||
LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString(
|
||||
numDistinct));
|
||||
numDistinct));
|
||||
if (numDistinct == -1) {
|
||||
cardinality = -1;
|
||||
break;
|
||||
@ -190,11 +192,42 @@ public class AggregationNode extends PlanNode {
|
||||
// some others, the estimate doesn't overshoot dramatically)
|
||||
cardinality *= numDistinct;
|
||||
}
|
||||
if (cardinality > 0) {
|
||||
LOG.debug("sel=" + Double.toString(computeSelectivity()));
|
||||
applyConjunctsSelectivity();
|
||||
}
|
||||
// if we ended up with an overflow, the estimate is certain to be wrong
|
||||
if (cardinality < 0) {
|
||||
cardinality = -1;
|
||||
}
|
||||
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Agg: cardinality={}", cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
|
||||
cardinality = 1;
|
||||
// cardinality: product of # of distinct values produced by grouping exprs
|
||||
for (Expr groupingExpr : groupingExprs) {
|
||||
long numDistinct = groupingExpr.getNumDistinctValues();
|
||||
// TODO: remove these before 1.0
|
||||
LOG.debug("grouping expr: " + groupingExpr.toSql() + " #distinct=" + Long.toString(
|
||||
numDistinct));
|
||||
if (numDistinct == -1) {
|
||||
cardinality = -1;
|
||||
break;
|
||||
}
|
||||
cardinality *= numDistinct;
|
||||
}
|
||||
// take HAVING predicate into account
|
||||
LOG.debug("Agg: cardinality=" + Long.toString(cardinality));
|
||||
if (cardinality > 0) {
|
||||
cardinality = Math.round((double) cardinality * computeSelectivity());
|
||||
LOG.debug("sel=" + Double.toString(computeSelectivity()));
|
||||
cardinality = Math.round((double) cardinality * computeOldSelectivity());
|
||||
LOG.debug("sel=" + Double.toString(computeOldSelectivity()));
|
||||
}
|
||||
// if we ended up with an overflow, the estimate is certain to be wrong
|
||||
if (cardinality < 0) {
|
||||
@ -277,6 +310,8 @@ public class AggregationNode extends PlanNode {
|
||||
if (!conjuncts.isEmpty()) {
|
||||
output.append(detailPrefix + "having: ").append(getExplainString(conjuncts) + "\n");
|
||||
}
|
||||
output.append(detailPrefix).append(String.format(
|
||||
"cardinality=%s", cardinality)).append("\n");
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
|
||||
@ -30,15 +30,15 @@ import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -108,8 +108,8 @@ public class AnalyticEvalNode extends PlanNode {
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
analyzer.getDescTbl().computeMemLayout();
|
||||
intermediateTupleDesc.computeMemLayout();
|
||||
analyzer.getDescTbl().computeStatAndMemLayout();
|
||||
intermediateTupleDesc.computeStatAndMemLayout();
|
||||
// we add the analyticInfo's smap to the combined smap of our child
|
||||
outputSmap = logicalToPhysicalSmap;
|
||||
createDefaultSmap(analyzer);
|
||||
@ -138,6 +138,19 @@ public class AnalyticEvalNode extends PlanNode {
|
||||
@Override
|
||||
protected void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
cardinality = cardinality == -1 ? getChild(0).cardinality : cardinality;
|
||||
applyConjunctsSelectivity();
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats AnalyticEval: cardinality={}", cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
cardinality = getChild(0).cardinality;
|
||||
}
|
||||
|
||||
|
||||
@ -181,7 +181,7 @@ public class AnalyticPlanner {
|
||||
long ndv = Expr.getNumDistinctValues(
|
||||
Expr.intersect(pg1.partitionByExprs, pg2.partitionByExprs));
|
||||
|
||||
if (ndv == -1 || ndv < 0 || ndv < numNodes) {
|
||||
if (ndv == -1 || ndv < 1 || ndv < numNodes) {
|
||||
// didn't get a usable value or the number of partitions is too small
|
||||
continue;
|
||||
}
|
||||
@ -228,7 +228,7 @@ public class AnalyticPlanner {
|
||||
// TODO: also look at l2 and take the max?
|
||||
long ndv = Expr.getNumDistinctValues(l1);
|
||||
|
||||
if (ndv < 0 || ndv < numNodes || ndv < maxNdv) {
|
||||
if (ndv < 1 || ndv < numNodes || ndv < maxNdv) {
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -673,8 +673,7 @@ public class AnalyticPlanner {
|
||||
logicalToPhysicalSmap.put(new SlotRef(logicalOutputSlot), new SlotRef(physicalOutputSlot));
|
||||
}
|
||||
|
||||
physicalOutputTuple.computeMemLayout();
|
||||
// if (requiresIntermediateTuple) physicalIntermediateTuple.computeMemLayout();
|
||||
physicalOutputTuple.computeStatAndMemLayout();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,12 +17,17 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.AssertNumRowsElement;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TAssertNumRowsNode;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
/**
|
||||
* Assert num rows node is used to determine whether the number of rows is less then desired num of rows.
|
||||
* The rows are the result of subqueryString.
|
||||
@ -30,6 +35,7 @@ import org.apache.doris.thrift.TPlanNodeType;
|
||||
* The cancelled reason will be reported by Backend and displayed back to the user.
|
||||
*/
|
||||
public class AssertNumRowsNode extends PlanNode {
|
||||
private static final Logger LOG = LogManager.getLogger(AssertNumRowsNode.class);
|
||||
|
||||
private long desiredNumOfRows;
|
||||
private String subqueryString;
|
||||
@ -46,6 +52,18 @@ public class AssertNumRowsNode extends PlanNode {
|
||||
this.nullableTupleIds.addAll(input.getNullableTupleIds());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
super.computeStats(analyzer);
|
||||
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
cardinality = 1;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats AssertNumRows: cardinality={}", cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
|
||||
if (detailLevel == TExplainLevel.BRIEF) {
|
||||
|
||||
@ -19,6 +19,8 @@ package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.TableRef;
|
||||
import org.apache.doris.common.CheckedMath;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
@ -59,18 +61,42 @@ public class CrossJoinNode extends PlanNode {
|
||||
return innerRef_;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
assignedConjuncts = analyzer.getAssignedConjuncts();
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
|
||||
cardinality = -1;
|
||||
} else {
|
||||
cardinality = getChild(0).cardinality * getChild(1).cardinality;
|
||||
if (computeSelectivity() != -1) {
|
||||
cardinality = Math.round(((double) cardinality) * computeSelectivity());
|
||||
}
|
||||
}
|
||||
LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
|
||||
cardinality = -1;
|
||||
} else {
|
||||
cardinality = CheckedMath.checkedMultiply(getChild(0).cardinality, getChild(1).cardinality);
|
||||
applyConjunctsSelectivity();
|
||||
capCardinalityAtLimit();
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
if (getChild(0).cardinality == -1 || getChild(1).cardinality == -1) {
|
||||
cardinality = -1;
|
||||
} else {
|
||||
cardinality = getChild(0).cardinality * getChild(1).cardinality;
|
||||
if (computeOldSelectivity() != -1) {
|
||||
cardinality = Math.round(((double) cardinality) * computeOldSelectivity());
|
||||
}
|
||||
}
|
||||
LOG.debug("stats CrossJoin: cardinality={}", Long.toString(cardinality));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -94,6 +120,8 @@ public class CrossJoinNode extends PlanNode {
|
||||
} else {
|
||||
output.append(detailPrefix + "predicates is NULL.");
|
||||
}
|
||||
output.append(detailPrefix).append(String.format(
|
||||
"cardinality=%s", cardinality)).append("\n");
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
|
||||
@ -26,6 +26,9 @@ import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
/**
|
||||
* Node that returns an empty result set. Used for planning query blocks with a constant
|
||||
* predicate evaluating to false or a limit 0. The result set will have zero rows, but
|
||||
@ -33,6 +36,8 @@ import com.google.common.base.Preconditions;
|
||||
* construct a valid row empty batch.
|
||||
*/
|
||||
public class EmptySetNode extends PlanNode {
|
||||
private final static Logger LOG = LogManager.getLogger(EmptySetNode.class);
|
||||
|
||||
public EmptySetNode(PlanNodeId id, ArrayList<TupleId> tupleIds) {
|
||||
super(id, tupleIds, "EMPTYSET");
|
||||
Preconditions.checkArgument(tupleIds.size() > 0);
|
||||
@ -43,6 +48,9 @@ public class EmptySetNode extends PlanNode {
|
||||
avgRowSize = 0;
|
||||
cardinality = 0;
|
||||
numNodes = 1;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats EmptySet:" + id + ", cardinality: " + cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -53,7 +61,7 @@ public class EmptySetNode extends PlanNode {
|
||||
// to be set as materialized (even though it isn't) to avoid failing precondition
|
||||
// checks generating the thrift for slot refs that may reference this tuple.
|
||||
for (TupleId id: tupleIds) analyzer.getTupleDesc(id).setIsMaterialized(true);
|
||||
computeMemLayout(analyzer);
|
||||
computeTupleStatAndMemLayout(analyzer);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
|
||||
@ -82,6 +82,7 @@ public class EsScanNode extends ScanNode {
|
||||
super.init(analyzer);
|
||||
|
||||
assignBackends();
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -73,11 +73,6 @@ public class ExchangeNode extends PlanNode {
|
||||
if (!copyConjuncts) {
|
||||
this.conjuncts = Lists.newArrayList();
|
||||
}
|
||||
if (hasLimit()) {
|
||||
cardinality = Math.min(limit, inputNode.cardinality);
|
||||
} else {
|
||||
cardinality = inputNode.cardinality;
|
||||
}
|
||||
// Only apply the limit at the receiver if there are multiple senders.
|
||||
if (inputNode.getFragment().isPartitioned()) limit = inputNode.limit;
|
||||
computeTupleIds();
|
||||
@ -95,6 +90,20 @@ public class ExchangeNode extends PlanNode {
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
Preconditions.checkState(conjuncts.isEmpty());
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeStats(Analyzer analyzer) {
|
||||
Preconditions.checkState(children.size() == 1);
|
||||
cardinality = children.get(0).cardinality;
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Exchange:" + id + ", cardinality: " + cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -28,6 +28,10 @@ import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TableRef;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.catalog.ColumnStats;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.CheckedMath;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TEqJoinCondition;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
@ -35,15 +39,17 @@ import org.apache.doris.thrift.THashJoinNode;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -59,7 +65,7 @@ public class HashJoinNode extends PlanNode {
|
||||
// predicates of the form 'a=b' or 'a<=>b'
|
||||
private List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
|
||||
// join conjuncts from the JOIN clause that aren't equi-join predicates
|
||||
private List<Expr> otherJoinConjuncts;
|
||||
private List<Expr> otherJoinConjuncts;
|
||||
private DistributionMode distrMode;
|
||||
private boolean isColocate = false; //the flag for colocate join
|
||||
private String colocateReason = ""; // if can not do colocate join, set reason here
|
||||
@ -134,23 +140,19 @@ public class HashJoinNode extends PlanNode {
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
assignConjuncts(analyzer);
|
||||
|
||||
// Set smap to the combined children's smaps and apply that to all conjuncts_.
|
||||
createDefaultSmap(analyzer);
|
||||
|
||||
super.init(analyzer);
|
||||
assignedConjuncts = analyzer.getAssignedConjuncts();
|
||||
// outSmap replace in outer join may cause NULL be replace by literal
|
||||
// so need replace the outsmap in nullableTupleID
|
||||
replaceOutputSmapForOuterJoin();
|
||||
|
||||
computeStats(analyzer);
|
||||
//assignedConjuncts = analyzr.getAssignedConjuncts();
|
||||
|
||||
ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap();
|
||||
List<Expr> newEqJoinConjuncts =
|
||||
Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false);
|
||||
eqJoinConjuncts = newEqJoinConjuncts.stream()
|
||||
.map(entity -> (BinaryPredicate) entity).collect(Collectors.toList());
|
||||
assignedConjuncts = analyzer.getAssignedConjuncts();
|
||||
otherJoinConjuncts =
|
||||
Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false);
|
||||
}
|
||||
@ -179,10 +181,191 @@ public class HashJoinNode extends PlanNode {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Holds the source scan slots of a <SlotRef> = <SlotRef> join predicate.
|
||||
* The underlying table and column on both sides have stats.
|
||||
*/
|
||||
public static final class EqJoinConjunctScanSlots {
|
||||
private final Expr eqJoinConjunct;
|
||||
private final SlotDescriptor lhs;
|
||||
private final SlotDescriptor rhs;
|
||||
|
||||
private EqJoinConjunctScanSlots(Expr eqJoinConjunct, SlotDescriptor lhs,
|
||||
SlotDescriptor rhs) {
|
||||
this.eqJoinConjunct = eqJoinConjunct;
|
||||
this.lhs = lhs;
|
||||
this.rhs = rhs;
|
||||
}
|
||||
|
||||
// Convenience functions. They return double to avoid excessive casts in callers.
|
||||
public double lhsNdv() {
|
||||
// return the estimated number of rows in this partition (-1 if unknown)
|
||||
return Math.min(lhs.getStats().getNumDistinctValues(), lhsNumRows());
|
||||
}
|
||||
|
||||
public double rhsNdv() {
|
||||
return Math.min(rhs.getStats().getNumDistinctValues(), rhsNumRows());
|
||||
}
|
||||
|
||||
public double lhsNumRows() {
|
||||
Table table = lhs.getParent().getTable();
|
||||
Preconditions.checkState(table instanceof OlapTable);
|
||||
return ((OlapTable) (table)).getRowCount();
|
||||
}
|
||||
|
||||
public double rhsNumRows() {
|
||||
Table table = rhs.getParent().getTable();
|
||||
Preconditions.checkState(table instanceof OlapTable);
|
||||
return ((OlapTable) (table)).getRowCount();
|
||||
}
|
||||
|
||||
public TupleId lhsTid() {
|
||||
return lhs.getParent().getId();
|
||||
}
|
||||
|
||||
public TupleId rhsTid() {
|
||||
return rhs.getParent().getId();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new EqJoinConjunctScanSlots for the given equi-join conjunct or null if
|
||||
* the given conjunct is not of the form <SlotRef> = <SlotRef> or if the underlying
|
||||
* table/column of at least one side is missing stats.
|
||||
*/
|
||||
public static EqJoinConjunctScanSlots create(Expr eqJoinConjunct) {
|
||||
if (!Expr.IS_EQ_BINARY_PREDICATE.apply(eqJoinConjunct)) return null;
|
||||
SlotDescriptor lhsScanSlot = eqJoinConjunct.getChild(0).findSrcScanSlot();
|
||||
if (lhsScanSlot == null || !hasNumRowsAndNdvStats(lhsScanSlot)) return null;
|
||||
SlotDescriptor rhsScanSlot = eqJoinConjunct.getChild(1).findSrcScanSlot();
|
||||
if (rhsScanSlot == null || !hasNumRowsAndNdvStats(rhsScanSlot)) return null;
|
||||
return new EqJoinConjunctScanSlots(eqJoinConjunct, lhsScanSlot, rhsScanSlot);
|
||||
}
|
||||
|
||||
private static boolean hasNumRowsAndNdvStats(SlotDescriptor slotDesc) {
|
||||
if (slotDesc.getColumn() == null) return false;
|
||||
if (!slotDesc.getStats().hasNumDistinctValues()) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Groups the given EqJoinConjunctScanSlots by the lhs/rhs tuple combination
|
||||
* and returns the result as a map.
|
||||
*/
|
||||
public static Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>>
|
||||
groupByJoinedTupleIds(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots) {
|
||||
Map<Pair<TupleId, TupleId>, List<EqJoinConjunctScanSlots>> scanSlotsByJoinedTids =
|
||||
new LinkedHashMap<>();
|
||||
for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
|
||||
Pair<TupleId, TupleId> tids = Pair.create(slots.lhsTid(), slots.rhsTid());
|
||||
List<EqJoinConjunctScanSlots> scanSlots = scanSlotsByJoinedTids.get(tids);
|
||||
if (scanSlots == null) {
|
||||
scanSlots = new ArrayList<>();
|
||||
scanSlotsByJoinedTids.put(tids, scanSlots);
|
||||
}
|
||||
scanSlots.add(slots);
|
||||
}
|
||||
return scanSlotsByJoinedTids;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return eqJoinConjunct.toSql();
|
||||
}
|
||||
}
|
||||
|
||||
private long getJoinCardinality() {
|
||||
Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
|
||||
|
||||
long lhsCard = getChild(0).cardinality;
|
||||
long rhsCard = getChild(1).cardinality;
|
||||
if (lhsCard == -1 || rhsCard == -1) {
|
||||
return lhsCard;
|
||||
}
|
||||
|
||||
// Collect join conjuncts that are eligible to participate in cardinality estimation.
|
||||
List<EqJoinConjunctScanSlots> eqJoinConjunctSlots = new ArrayList<>();
|
||||
for (Expr eqJoinConjunct : eqJoinConjuncts) {
|
||||
EqJoinConjunctScanSlots slots = EqJoinConjunctScanSlots.create(eqJoinConjunct);
|
||||
if (slots != null) eqJoinConjunctSlots.add(slots);
|
||||
}
|
||||
|
||||
if (eqJoinConjunctSlots.isEmpty()) {
|
||||
// There are no eligible equi-join conjuncts.
|
||||
return lhsCard;
|
||||
}
|
||||
|
||||
return getGenericJoinCardinality(eqJoinConjunctSlots, lhsCard, rhsCard);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the estimated join cardinality of a generic N:M inner or outer join based
|
||||
* on the given list of equi-join conjunct slots and the join input cardinalities.
|
||||
* The returned result is >= 0.
|
||||
* The list of join conjuncts must be non-empty and the cardinalities must be >= 0.
|
||||
* <p>
|
||||
* Generic estimation:
|
||||
* cardinality = |child(0)| * |child(1)| / max(NDV(L.c), NDV(R.d))
|
||||
* - case A: NDV(L.c) <= NDV(R.d)
|
||||
* every row from child(0) joins with |child(1)| / NDV(R.d) rows
|
||||
* - case B: NDV(L.c) > NDV(R.d)
|
||||
* every row from child(1) joins with |child(0)| / NDV(L.c) rows
|
||||
* - we adjust the NDVs from both sides to account for predicates that
* might have reduced the cardinality and NDVs
|
||||
*/
|
||||
private long getGenericJoinCardinality(List<EqJoinConjunctScanSlots> eqJoinConjunctSlots, long lhsCard, long rhsCard) {
|
||||
Preconditions.checkState(joinOp.isInnerJoin() || joinOp.isOuterJoin());
|
||||
Preconditions.checkState(!eqJoinConjunctSlots.isEmpty());
|
||||
Preconditions.checkState(lhsCard >= 0 && rhsCard >= 0);
|
||||
|
||||
long result = -1;
|
||||
for (EqJoinConjunctScanSlots slots : eqJoinConjunctSlots) {
|
||||
// Adjust the NDVs on both sides to account for predicates. Intuitively, the NDVs
|
||||
// should only decrease. We ignore adjustments that would lead to an increase.
|
||||
double lhsAdjNdv = slots.lhsNdv();
|
||||
if (slots.lhsNumRows() > lhsCard) {
|
||||
lhsAdjNdv *= lhsCard / slots.lhsNumRows();
|
||||
}
|
||||
double rhsAdjNdv = slots.rhsNdv();
|
||||
if (slots.rhsNumRows() > rhsCard) {
|
||||
rhsAdjNdv *= rhsCard / slots.rhsNumRows();
|
||||
}
|
||||
// A lower limit of 1 on the max Adjusted Ndv ensures we don't estimate
|
||||
// cardinality more than the max possible.
|
||||
long joinCard = CheckedMath.checkedMultiply(
|
||||
Math.round((lhsCard / Math.max(1, Math.max(lhsAdjNdv, rhsAdjNdv)))), rhsCard);
|
||||
if (result == -1) {
|
||||
result = joinCard;
|
||||
} else {
|
||||
result = Math.min(result, joinCard);
|
||||
}
|
||||
}
|
||||
Preconditions.checkState(result >= 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
if (joinOp.isSemiAntiJoin()) {
|
||||
cardinality = getSemiJoinCardinality();
|
||||
} else if (joinOp.isInnerJoin() || joinOp.isOuterJoin()) {
|
||||
cardinality = getJoinCardinality();
|
||||
} else {
|
||||
Preconditions.checkState(false, "joinOp is not supported");
|
||||
}
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats HashJoin:" + id + ", cardinality: " + cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
// For a join between child(0) and child(1), we look for join conditions "L.c = R.d"
|
||||
// (with L being from child(0) and R from child(1)) and use as the cardinality
|
||||
// estimate the maximum of
|
||||
@ -228,11 +411,11 @@ public class HashJoinNode extends PlanNode {
|
||||
// TODO rownum
|
||||
//Table rhsTbl = slotDesc.getParent().getTableFamilyGroup().getBaseTable();
|
||||
// if (rhsTbl != null && rhsTbl.getNumRows() != -1) {
|
||||
// we can't have more distinct values than rows in the table, even though
|
||||
// the metastore stats may think so
|
||||
// LOG.info(
|
||||
// "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows()));
|
||||
// numDistinct = Math.min(numDistinct, rhsTbl.getNumRows());
|
||||
// we can't have more distinct values than rows in the table, even though
|
||||
// the metastore stats may think so
|
||||
// LOG.info(
|
||||
// "#distinct=" + numDistinct + " #rows=" + Long.toString(rhsTbl.getNumRows()));
|
||||
// numDistinct = Math.min(numDistinct, rhsTbl.getNumRows());
|
||||
// }
|
||||
maxNumDistinct = Math.max(maxNumDistinct, numDistinct);
|
||||
LOG.debug("min slotref: {}, #distinct: {}", rhsSlotRef.toSql(), numDistinct);
|
||||
@ -245,12 +428,112 @@ public class HashJoinNode extends PlanNode {
|
||||
cardinality = getChild(0).cardinality;
|
||||
} else {
|
||||
cardinality = Math.round((double) getChild(0).cardinality * (double) getChild(
|
||||
1).cardinality / (double) maxNumDistinct);
|
||||
1).cardinality / (double) maxNumDistinct);
LOG.debug("lhs card: {}, rhs card: {}", getChild(0).cardinality, getChild(1).cardinality);
}
LOG.debug("stats HashJoin: cardinality {}", cardinality);
}

/**
 * Unwraps the SlotRef in expr and returns the NDVs of it.
 * Returns -1 if the NDVs are unknown or if expr is not a SlotRef.
 */
private long getNdv(Expr expr) {
SlotRef slotRef = expr.unwrapSlotRef(false);
if (slotRef == null) {
return -1;
}
SlotDescriptor slotDesc = slotRef.getDesc();
if (slotDesc == null) {
return -1;
}
ColumnStats stats = slotDesc.getStats();
if (!stats.hasNumDistinctValues()) {
return -1;
}
return stats.getNumDistinctValues();
}

/**
 * Returns the estimated cardinality of a semi join node.
 * For a left semi join between child(0) and child(1), we look for equality join
 * conditions "L.c = R.d" (with L being from child(0) and R from child(1)) and use as
 * the cardinality estimate the minimum of
 *   |child(0)| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
 * over all suitable join conditions. The reasoning is that:
 * - each row in child(0) is returned at most once
 * - the probability of a row in child(0) having a match in R is
 *   Min(NDV(L.c), NDV(R.d)) / NDV(L.c)
 * <p>
 * For a left anti join we estimate the cardinality as the minimum of:
 *   |L| * Max(NDV(L.c) - NDV(R.d), NDV(L.c)) / NDV(L.c)
 * over all suitable join conditions. The reasoning is that:
 * - each row in child(0) is returned at most once
 * - if NDV(L.c) > NDV(R.d) then the probability of row in L having a match
 *   in child(1) is (NDV(L.c) - NDV(R.d)) / NDV(L.c)
 * - otherwise, we conservatively use |L| to avoid underestimation
 * <p>
 * We analogously estimate the cardinality for right semi/anti joins, and treat the
 * null-aware anti join like a regular anti join
 */
private long getSemiJoinCardinality() {
Preconditions.checkState(joinOp.isSemiJoin());

// Return -1 if the cardinality of the returned side is unknown.
long cardinality;
if (joinOp == JoinOperator.RIGHT_SEMI_JOIN
|| joinOp == JoinOperator.RIGHT_ANTI_JOIN) {
if (getChild(1).cardinality == -1) {
return -1;
}
cardinality = getChild(1).cardinality;
} else {
if (getChild(0).cardinality == -1) {
return -1;
}
cardinality = getChild(0).cardinality;
}
double minSelectivity = 1.0;
for (Expr eqJoinPredicate : eqJoinConjuncts) {
long lhsNdv = getNdv(eqJoinPredicate.getChild(0));
lhsNdv = Math.min(lhsNdv, getChild(0).cardinality);
long rhsNdv = getNdv(eqJoinPredicate.getChild(1));
rhsNdv = Math.min(rhsNdv, getChild(1).cardinality);

// Skip conjuncts with unknown NDV on either side.
if (lhsNdv == -1 || rhsNdv == -1) {
continue;
}

double selectivity = 1.0;
switch (joinOp) {
case LEFT_SEMI_JOIN: {
selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (lhsNdv);
break;
}
case RIGHT_SEMI_JOIN: {
selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) (rhsNdv);
break;
}
case LEFT_ANTI_JOIN:
case NULL_AWARE_LEFT_ANTI_JOIN: {
selectivity = (double) (lhsNdv > rhsNdv ? (lhsNdv - rhsNdv) : lhsNdv) / (double) lhsNdv;
break;
}
case RIGHT_ANTI_JOIN: {
selectivity = (double) (rhsNdv > lhsNdv ? (rhsNdv - lhsNdv) : rhsNdv) / (double) rhsNdv;
break;
}
default:
Preconditions.checkState(false);
}
minSelectivity = Math.min(minSelectivity, selectivity);
}

Preconditions.checkState(cardinality != -1);
return Math.round(cardinality * minSelectivity);
}
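To make the estimate above concrete, here is a minimal standalone sketch (the class name and all numbers are hypothetical, not part of this patch) that applies the same per-conjunct selectivity rules for a left semi join and a left anti join and rounds the result the same way:

// Standalone illustration of the semi/anti join estimate documented above,
// using plain numbers instead of PlanNodes and ColumnStats.
public class SemiJoinCardinalitySketch {

    // |L| * Min(NDV(L.c), NDV(R.d)) / NDV(L.c) for a LEFT SEMI JOIN
    static long leftSemiJoinCardinality(long lhsCard, long lhsNdv, long rhsNdv) {
        double selectivity = (double) Math.min(lhsNdv, rhsNdv) / (double) lhsNdv;
        return Math.round(lhsCard * selectivity);
    }

    // |L| * (NDV(L.c) - NDV(R.d)) / NDV(L.c) when NDV(L.c) > NDV(R.d), otherwise |L|,
    // for a LEFT ANTI JOIN (conservative to avoid underestimation)
    static long leftAntiJoinCardinality(long lhsCard, long lhsNdv, long rhsNdv) {
        double selectivity = (double) (lhsNdv > rhsNdv ? (lhsNdv - rhsNdv) : lhsNdv) / (double) lhsNdv;
        return Math.round(lhsCard * selectivity);
    }

    public static void main(String[] args) {
        // lhs has 1,000,000 rows with 100,000 distinct join keys; rhs covers 20,000 of them
        System.out.println(leftSemiJoinCardinality(1_000_000L, 100_000L, 20_000L)); // 200000
        System.out.println(leftAntiJoinCardinality(1_000_000L, 100_000L, 20_000L)); // 800000
    }
}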
|
||||
|
||||
@Override
|
||||
protected String debugString() {
|
||||
return MoreObjects.toStringHelper(this).add("eqJoinConjuncts",
|
||||
|
||||
@ -186,7 +186,7 @@ public abstract class LoadScanNode extends ScanNode {
|
||||
// LOG.info("brokerScanRange is {}", brokerScanRange);
|
||||
|
||||
// Need re compute memory layout after set some slot descriptor to nullable
|
||||
srcTupleDesc.computeMemLayout();
|
||||
srcTupleDesc.computeStatAndMemLayout();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -1,170 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.thrift.TEqJoinCondition;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TMergeJoinNode;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Merge join between left child and right child.
|
||||
* The right child must be a leaf node, ie, can only materialize
|
||||
* a single input tuple.
|
||||
*/
|
||||
public class MergeJoinNode extends PlanNode {
|
||||
private final static Logger LOG = LogManager.getLogger(MergeJoinNode.class);
|
||||
// conjuncts of the form "<lhs> = <rhs>", recorded as Pair(<lhs>, <rhs>)
|
||||
private final List<Pair<Expr, Expr>> cmpConjuncts;
|
||||
// join conjuncts from the JOIN clause that aren't equi-join predicates
|
||||
private final List<Expr> otherJoinConjuncts;
|
||||
private DistributionMode distrMode;
|
||||
|
||||
public MergeJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner,
|
||||
List<Pair<Expr, Expr>> cmpConjuncts, List<Expr> otherJoinConjuncts) {
|
||||
super(id, "MERGE JOIN");
|
||||
Preconditions.checkArgument(cmpConjuncts != null);
|
||||
Preconditions.checkArgument(otherJoinConjuncts != null);
|
||||
tupleIds.addAll(outer.getTupleIds());
|
||||
tupleIds.addAll(inner.getTupleIds());
|
||||
this.distrMode = DistributionMode.PARTITIONED;
|
||||
this.cmpConjuncts = cmpConjuncts;
|
||||
this.otherJoinConjuncts = otherJoinConjuncts;
|
||||
children.add(outer);
|
||||
children.add(inner);
|
||||
|
||||
// Inherits all the nullable tuple from the children
|
||||
// Mark tuples that form the "nullable" side of the outer join as nullable.
|
||||
nullableTupleIds.addAll(inner.getNullableTupleIds());
|
||||
nullableTupleIds.addAll(outer.getNullableTupleIds());
|
||||
nullableTupleIds.addAll(outer.getTupleIds());
|
||||
nullableTupleIds.addAll(inner.getTupleIds());
|
||||
}
|
||||
|
||||
public List<Pair<Expr, Expr>> getCmpConjuncts() {
|
||||
return cmpConjuncts;
|
||||
}
|
||||
|
||||
public DistributionMode getDistributionMode() {
|
||||
return distrMode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String debugString() {
|
||||
return MoreObjects.toStringHelper(this).add("cmpConjuncts", cmpConjunctsDebugString()).addValue(
|
||||
super.debugString()).toString();
|
||||
}
|
||||
|
||||
private String cmpConjunctsDebugString() {
|
||||
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
|
||||
for (Pair<Expr, Expr> entry : cmpConjuncts) {
|
||||
helper.add("lhs", entry.first).add("rhs", entry.second);
|
||||
}
|
||||
return helper.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getMaterializedIds(Analyzer analyzer, List<SlotId> ids) {
|
||||
super.getMaterializedIds(analyzer, ids);
|
||||
// we also need to materialize everything referenced by cmpConjuncts
|
||||
// and otherJoinConjuncts
|
||||
for (Pair<Expr, Expr> p : cmpConjuncts) {
|
||||
p.first.getIds(null, ids);
|
||||
p.second.getIds(null, ids);
|
||||
}
|
||||
for (Expr e : otherJoinConjuncts) {
|
||||
e.getIds(null, ids);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toThrift(TPlanNode msg) {
|
||||
msg.node_type = TPlanNodeType.MERGE_JOIN_NODE;
|
||||
msg.merge_join_node = new TMergeJoinNode();
|
||||
for (Pair<Expr, Expr> entry : cmpConjuncts) {
|
||||
TEqJoinCondition eqJoinCondition =
|
||||
new TEqJoinCondition(entry.first.treeToThrift(), entry.second.treeToThrift());
|
||||
msg.merge_join_node.addToCmpConjuncts(eqJoinCondition);
|
||||
}
|
||||
for (Expr e : otherJoinConjuncts) {
|
||||
msg.hash_join_node.addToOtherJoinConjuncts(e.treeToThrift());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
|
||||
String distrModeStr =
|
||||
(distrMode != DistributionMode.NONE) ? (" (" + distrMode.toString() + ")") : "";
|
||||
StringBuilder output = new StringBuilder().append(
|
||||
detailPrefix + "join op: MERGE JOIN" + distrModeStr + "\n").append(
|
||||
detailPrefix + "hash predicates:\n");
|
||||
for (Pair<Expr, Expr> entry : cmpConjuncts) {
|
||||
output.append(detailPrefix + " " +
|
||||
entry.first.toSql() + " = " + entry.second.toSql() + "\n");
|
||||
}
|
||||
if (!otherJoinConjuncts.isEmpty()) {
|
||||
output.append(detailPrefix + "other join predicates: ").append(
|
||||
getExplainString(otherJoinConjuncts) + "\n");
|
||||
}
|
||||
if (!conjuncts.isEmpty()) {
|
||||
output.append(detailPrefix + "other predicates: ").append(
|
||||
getExplainString(conjuncts) + "\n");
|
||||
}
|
||||
return output.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumInstances() {
|
||||
return Math.max(children.get(0).getNumInstances(), children.get(1).getNumInstances());
|
||||
}
|
||||
|
||||
enum DistributionMode {
|
||||
NONE("NONE"),
|
||||
BROADCAST("BROADCAST"),
|
||||
PARTITIONED("PARTITIONED");
|
||||
|
||||
private final String description;
|
||||
|
||||
private DistributionMode(String descr) {
|
||||
this.description = descr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return description;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -24,7 +24,6 @@ import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TExpr;
|
||||
import org.apache.doris.thrift.TMergeNode;
|
||||
@ -62,19 +61,9 @@ public class MergeNode extends PlanNode {
|
||||
|
||||
protected final TupleId tupleId;
|
||||
|
||||
private final boolean isIntermediateMerge;
|
||||
|
||||
protected MergeNode(PlanNodeId id, TupleId tupleId) {
|
||||
super(id, tupleId.asList(), "MERGE");
|
||||
// this.rowTupleIds.add(tupleId);
|
||||
this.tupleId = tupleId;
|
||||
this.isIntermediateMerge = false;
|
||||
}
|
||||
|
||||
protected MergeNode(PlanNodeId id, MergeNode node) {
|
||||
super(id, node, "MERGE");
|
||||
this.tupleId = node.tupleId;
|
||||
this.isIntermediateMerge = true;
|
||||
}
|
||||
|
||||
public void addConstExprList(List<Expr> exprs) {
|
||||
@ -137,6 +126,26 @@ public class MergeNode extends PlanNode {
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
cardinality = constExprLists.size();
|
||||
for (PlanNode child : children) {
|
||||
// ignore missing child cardinality info in the hope it won't matter enough
|
||||
// to change the planning outcome
|
||||
if (child.cardinality > 0) {
|
||||
cardinality += child.cardinality;
|
||||
}
|
||||
}
|
||||
applyConjunctsSelectivity();
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Merge: cardinality={}", Long.toString(cardinality));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
cardinality = constExprLists.size();
|
||||
for (PlanNode child : children) {
|
||||
// ignore missing child cardinality info in the hope it won't matter enough
|
||||
|
||||
@ -60,6 +60,12 @@ public class MysqlScanNode extends ScanNode {
|
||||
tblName = "`" + tbl.getMysqlTableName() + "`";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String debugString() {
|
||||
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
|
||||
@ -71,7 +77,6 @@ public class MysqlScanNode extends ScanNode {
|
||||
// Convert predicates to MySQL columns and filters.
|
||||
createMySQLColumns(analyzer);
|
||||
createMySQLFilters(analyzer);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -79,6 +79,12 @@ public class OdbcScanNode extends ScanNode {
|
||||
tblName = OdbcTable.databaseProperName(odbcType, tbl.getOdbcTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String debugString() {
|
||||
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
|
||||
@ -90,7 +96,6 @@ public class OdbcScanNode extends ScanNode {
|
||||
// Convert predicates to Odbc columns and filters.
|
||||
createOdbcColumns(analyzer);
|
||||
createOdbcFilters(analyzer);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -132,13 +132,12 @@ public class OlapScanNode extends ScanNode {
|
||||
|
||||
// List of tablets will be scanned by current olap_scan_node
|
||||
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
|
||||
private boolean isFinalized = false;
|
||||
|
||||
private HashSet<Long> scanBackendIds = new HashSet<>();
|
||||
|
||||
private Map<Long, Integer> tabletId2BucketSeq = Maps.newHashMap();
|
||||
// a bucket seq may map to many tablets, and each tablet has a TScanRangeLocations.
|
||||
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations= ArrayListMultimap.create();
|
||||
public ArrayListMultimap<Integer, TScanRangeLocations> bucketSeq2locations = ArrayListMultimap.create();
|
||||
|
||||
// Constructs node to scan given data files of table 'tbl'.
|
||||
public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
|
||||
@ -228,8 +227,8 @@ public class OlapScanNode extends ScanNode {
|
||||
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
|
||||
if (sessionVariable.getTestMaterializedView()) {
|
||||
throw new AnalysisException("The old scan range info is different from the new one when "
|
||||
+ "test_materialized_view is true. "
|
||||
+ scanRangeInfo);
|
||||
+ "test_materialized_view is true. "
|
||||
+ scanRangeInfo);
|
||||
}
|
||||
situation = "The key type of table is aggregated.";
|
||||
update = false;
|
||||
@ -297,38 +296,93 @@ public class OlapScanNode extends ScanNode {
|
||||
|
||||
filterDeletedRows(analyzer);
computePartitionInfo();
computeTupleState(analyzer);

/**
 * Compute inaccurate stats before the mv selector and tablet pruning run.
 * - Accurate statistics rely on the materialized view selector and on bucket pruning.
 * - However, both of those steps happen after the reorder algorithm has finished.
 * - When join reorder is turned on, computeStats() must be completed before the reorder algorithm runs.
 * - So only inaccurate statistics can be calculated here.
 */
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
computeInaccurateStats(analyzer);
}
}

@Override
public void finalize(Analyzer analyzer) throws UserException {
if (isFinalized) {
return;
LOG.debug("OlapScanNode get scan range locations. Tuple: {}", desc);
/**
 * If JoinReorder is turned on, cardinality has already been calculated in init(), and that value is not accurate.
 * In the following logic, cardinality will be accurately calculated again.
 * So here we need to reset the value of cardinality.
 */
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
cardinality = 0;
}

LOG.debug("OlapScanNode finalize. Tuple: {}", desc);
try {
getScanRangeLocations();
} catch (AnalysisException e) {
throw new UserException(e.getMessage());
}
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
computeOldRowSizeAndCardinality();
}
computeNumNodes();
}

computeStats(analyzer);
isFinalized = true;
public void computeTupleState(Analyzer analyzer) {
for (TupleId id : tupleIds) {
analyzer.getDescTbl().getTupleDesc(id).computeStat();
}
}

public void computeOldRowSizeAndCardinality() {
if (cardinality > 0) {
avgRowSize = totalBytes / (float) cardinality;
capCardinalityAtLimit();
}
// when the scan node has no data, cardinality should be 0 instead of an invalid value after computeStats()
cardinality = cardinality == -1 ? 0 : cardinality;
}

@Override
public void computeStats(Analyzer analyzer) {
protected void computeNumNodes() {
if (cardinality > 0) {
avgRowSize = totalBytes / (float) cardinality;
if (hasLimit()) {
cardinality = Math.min(cardinality, limit);
}
numNodes = scanBackendIds.size();
}
// even if the current scan node has no data, at least one backend will be assigned when the fragment actually executes
numNodes = numNodes <= 0 ? 1 : numNodes;
// when the scan node has no data, cardinality should be 0 instead of an invalid value after computeStats()
cardinality = cardinality == -1 ? 0 : cardinality;
}

/**
 * Calculate inaccurate stats such as cardinality.
 * cardinality: the sum of the row counts of all partitions in selectedPartitionIds
 * The cardinality here is inaccurate; it will be greater than the actual value.
 * There are two reasons:
 * 1. During actual execution, not all tablets belonging to the selected partitions will be scanned.
 *    Some tablets may be pruned before execution.
 * 2. The base index may eventually be replaced by an mv index.
 * <p>
 * There are three steps to calculate cardinality:
 * 1. Calculate how many rows will be scanned
 * 2. Apply the conjuncts
 * 3. Apply the limit
 *
 * @param analyzer
 */
private void computeInaccurateStats(Analyzer analyzer) {
super.computeStats(analyzer);
// step1: Calculate how many rows were scanned
cardinality = 0;
for (long selectedPartitionId : selectedPartitionIds) {
final Partition partition = olapTable.getPartition(selectedPartitionId);
final MaterializedIndex baseIndex = partition.getBaseIndex();
cardinality += baseIndex.getRowCount();
}
applyConjunctsSelectivity();
capCardinalityAtLimit();
}
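As an illustration of the three steps described above, the following standalone sketch (hypothetical class and values, not part of this patch) sums the row counts of the selected partitions, applies an assumed combined conjunct selectivity, and caps the estimate at the limit:

// Standalone illustration of the inaccurate scan-cardinality estimate described above.
public class OlapScanStatsSketch {
    static long inaccurateCardinality(long[] selectedPartitionRowCounts,
                                      double conjunctSelectivity, long limit) {
        // step 1: how many rows would be scanned if nothing else were pruned
        long cardinality = 0;
        for (long rowCount : selectedPartitionRowCounts) {
            cardinality += rowCount;
        }
        // step 2: apply the conjuncts (never round a non-empty input down to zero)
        long afterConjuncts = Math.round(cardinality * conjunctSelectivity);
        if (afterConjuncts == 0 && cardinality > 0) {
            afterConjuncts = 1;
        }
        // step 3: apply the limit, if any (limit < 0 means "no limit")
        return limit >= 0 ? Math.min(afterConjuncts, limit) : afterConjuncts;
    }

    public static void main(String[] args) {
        // three selected partitions, 10% combined selectivity, LIMIT 500
        System.out.println(inaccurateCardinality(new long[]{1_000L, 2_000L, 3_000L}, 0.1, 500)); // 500
        System.out.println(inaccurateCardinality(new long[]{1_000L, 2_000L, 3_000L}, 0.1, -1));  // 600
    }
}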
|
||||
|
||||
private Collection<Long> partitionPrune(PartitionInfo partitionInfo, PartitionNames partitionNames) throws AnalysisException {
|
||||
@ -360,13 +414,13 @@ public class OlapScanNode extends ScanNode {
|
||||
MaterializedIndex table,
|
||||
DistributionInfo distributionInfo) throws AnalysisException {
|
||||
DistributionPruner distributionPruner = null;
|
||||
switch(distributionInfo.getType()) {
|
||||
switch (distributionInfo.getType()) {
|
||||
case HASH: {
|
||||
HashDistributionInfo info = (HashDistributionInfo) distributionInfo;
|
||||
distributionPruner = new HashDistributionPruner(table.getTabletIdsInOrder(),
|
||||
info.getDistributionColumns(),
|
||||
columnFilters,
|
||||
info.getBucketNum());
|
||||
info.getDistributionColumns(),
|
||||
columnFilters,
|
||||
info.getBucketNum());
|
||||
return distributionPruner.prune();
|
||||
}
|
||||
case RANDOM: {
|
||||
@ -409,7 +463,7 @@ public class OlapScanNode extends ScanNode {
|
||||
visibleVersion, visibleVersionHash, localBeId, schemaHash);
|
||||
if (allQueryableReplicas.isEmpty()) {
|
||||
LOG.error("no queryable replica found in tablet {}. visible version {}-{}",
|
||||
tabletId, visibleVersion, visibleVersionHash);
|
||||
tabletId, visibleVersion, visibleVersionHash);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
LOG.debug("tablet {}, replica: {}", tabletId, replica.toString());
|
||||
@ -462,6 +516,12 @@ public class OlapScanNode extends ScanNode {
|
||||
|
||||
result.add(scanRangeLocations);
|
||||
}
|
||||
// FIXME(dhc): we use cardinality here to simulate ndv
|
||||
if (tablets.size() == 0) {
|
||||
desc.setCardinality(0);
|
||||
} else {
|
||||
desc.setCardinality(cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
private void computePartitionInfo() throws AnalysisException {
|
||||
@ -512,6 +572,7 @@ public class OlapScanNode extends ScanNode {
|
||||
|
||||
private void getScanRangeLocations() throws UserException {
|
||||
if (selectedPartitionIds.size() == 0) {
|
||||
desc.setCardinality(0);
|
||||
return;
|
||||
}
|
||||
Preconditions.checkState(selectedIndexId != -1);
|
||||
@ -597,9 +658,9 @@ public class OlapScanNode extends ScanNode {
|
||||
}
|
||||
|
||||
output.append(prefix).append(String.format(
|
||||
"partitions=%s/%s",
|
||||
selectedPartitionNum,
|
||||
olapTable.getPartitions().size()));
|
||||
"partitions=%s/%s",
|
||||
selectedPartitionNum,
|
||||
olapTable.getPartitions().size()));
|
||||
|
||||
String indexName = olapTable.getIndexNameById(selectedIndexId);
|
||||
output.append("\n").append(prefix).append(String.format("rollup: %s", indexName));
|
||||
@ -607,7 +668,7 @@ public class OlapScanNode extends ScanNode {
|
||||
output.append("\n");
|
||||
|
||||
output.append(prefix).append(String.format(
|
||||
"tabletRatio=%s/%s", selectedTabletsNum, totalTabletsNum));
|
||||
"tabletRatio=%s/%s", selectedTabletsNum, totalTabletsNum));
|
||||
output.append("\n");
|
||||
|
||||
// We print up to 10 tablet, and we print "..." if the number is more than 10
|
||||
@ -673,7 +734,6 @@ public class OlapScanNode extends ScanNode {
|
||||
olapScanNode.selectedTabletsNum = 1;
|
||||
olapScanNode.totalTabletsNum = 1;
|
||||
olapScanNode.setIsPreAggregation(false, "Export job");
|
||||
olapScanNode.isFinalized = true;
|
||||
olapScanNode.result.addAll(locationsList);
|
||||
|
||||
return olapScanNode;
|
||||
@ -715,7 +775,7 @@ public class OlapScanNode extends ScanNode {
|
||||
}
|
||||
}
|
||||
|
||||
public TupleId getTupleId(){
|
||||
public TupleId getTupleId() {
|
||||
Preconditions.checkNotNull(desc);
|
||||
return desc.getId();
|
||||
}
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.planner;
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.ExprId;
|
||||
import org.apache.doris.analysis.ExprSubstitutionMap;
|
||||
import org.apache.doris.analysis.SlotId;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
@ -35,13 +36,13 @@ import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.math.LongMath;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@ -65,9 +66,9 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
|
||||
protected String planNodeName;
|
||||
|
||||
protected PlanNodeId id; // unique w/in plan tree; assigned by planner
|
||||
protected PlanNodeId id; // unique w/in plan tree; assigned by planner
|
||||
protected PlanFragmentId fragmentId; // assigned by planner after fragmentation step
|
||||
protected long limit; // max. # of rows to be returned; 0: no limit
|
||||
protected long limit; // max. # of rows to be returned; 0: no limit
|
||||
|
||||
// ids materialized by the tree rooted at this node
|
||||
protected ArrayList<TupleId> tupleIds;
|
||||
@ -115,13 +116,11 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
protected boolean compactData;
|
||||
protected int numInstances;
|
||||
|
||||
public String getPlanNodeName() {
|
||||
return planNodeName;
|
||||
}
|
||||
|
||||
// Runtime filters assigned to this node.
|
||||
protected List<RuntimeFilter> runtimeFilters = new ArrayList<>();
|
||||
|
||||
private boolean cardinalityIsDone = false;
|
||||
|
||||
protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName) {
|
||||
this.id = id;
|
||||
this.limit = -1;
|
||||
@ -159,6 +158,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
this.numInstances = 1;
|
||||
}
|
||||
|
||||
public String getPlanNodeName() {
|
||||
return planNodeName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets tblRefIds_, tupleIds_, and nullableTupleIds_.
|
||||
* The default implementation is a no-op.
|
||||
@ -197,8 +200,14 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
fragmentId = id;
|
||||
}
|
||||
|
||||
public void setFragment(PlanFragment fragment) { fragment_ = fragment; }
|
||||
public PlanFragment getFragment() { return fragment_; }
|
||||
public void setFragment(PlanFragment fragment) {
|
||||
fragment_ = fragment;
|
||||
}
|
||||
|
||||
public PlanFragment getFragment() {
|
||||
return fragment_;
|
||||
}
|
||||
|
||||
public long getLimit() {
|
||||
return limit;
|
||||
}
|
||||
@ -284,11 +293,12 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
this.conjuncts.addAll(conjuncts);
|
||||
}
|
||||
|
||||
public void addPreFilterConjuncts(List<Expr> conjuncts) {
|
||||
if (conjuncts == null) {
|
||||
return;
|
||||
}
|
||||
this.preFilterConjuncts.addAll(conjuncts);
|
||||
public void setAssignedConjuncts(Set<ExprId> conjuncts) {
|
||||
assignedConjuncts = conjuncts;
|
||||
}
|
||||
|
||||
public Set<ExprId> getAssignedConjuncts() {
|
||||
return assignedConjuncts;
|
||||
}
|
||||
|
||||
public void transferConjuncts(PlanNode recipient) {
|
||||
@ -296,28 +306,24 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
conjuncts.clear();
|
||||
}
|
||||
|
||||
public void addPreFilterConjuncts(List<Expr> conjuncts) {
|
||||
if (conjuncts == null) {
|
||||
return;
|
||||
}
|
||||
this.preFilterConjuncts.addAll(conjuncts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call computeMemLayout() for all materialized tuples.
|
||||
* Call computeStatAndMemLayout() for all materialized tuples.
|
||||
*/
|
||||
protected void computeMemLayout(Analyzer analyzer) {
|
||||
for (TupleId id: tupleIds) {
|
||||
analyzer.getDescTbl().getTupleDesc(id).computeMemLayout();
|
||||
protected void computeTupleStatAndMemLayout(Analyzer analyzer) {
|
||||
for (TupleId id : tupleIds) {
|
||||
analyzer.getDescTbl().getTupleDesc(id).computeStatAndMemLayout();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Computes and returns the sum of two cardinalities. If an overflow occurs,
|
||||
* the maximum Long value is returned (Long.MAX_VALUE).
|
||||
*/
|
||||
public static long addCardinalities(long a, long b) {
|
||||
try {
|
||||
return LongMath.checkedAdd(a, b);
|
||||
} catch (ArithmeticException e) {
|
||||
LOG.warn("overflow when adding cardinalities: " + a + ", " + b);
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
}
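A minimal standalone sketch of the saturating addition used by addCardinalities() (hypothetical class, not part of this patch; Math.addExact is used here in place of Guava's LongMath.checkedAdd, which throws the same ArithmeticException on overflow):

// Overflow clamps the summed cardinalities to Long.MAX_VALUE instead of wrapping
// around to a negative estimate.
public class AddCardinalitiesSketch {
    static long addCardinalities(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        System.out.println(addCardinalities(1_000L, 2_000L));         // 3000
        System.out.println(addCardinalities(Long.MAX_VALUE, 1_000L)); // Long.MAX_VALUE
    }
}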
|
||||
|
||||
|
||||
public String getExplainString() {
|
||||
return getExplainString("", "", TExplainLevel.VERBOSE);
|
||||
@ -378,8 +384,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
String childDetailPrefix = prefix + "| ";
|
||||
for (int i = 1; i < children.size(); ++i) {
|
||||
expBuilder.append(
|
||||
children.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
|
||||
detailLevel));
|
||||
children.get(i).getExplainString(childHeadlinePrefix, childDetailPrefix,
|
||||
detailLevel));
|
||||
expBuilder.append(childDetailPrefix + "\n");
|
||||
}
|
||||
expBuilder.append(children.get(0).getExplainString(prefix, prefix, detailLevel));
|
||||
@ -428,7 +434,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
return;
|
||||
} else {
|
||||
msg.num_children = children.size();
|
||||
for (PlanNode child: children) {
|
||||
for (PlanNode child : children) {
|
||||
child.treeToThriftHelper(container);
|
||||
}
|
||||
}
|
||||
@ -443,11 +449,20 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
for (PlanNode child : children) {
|
||||
child.finalize(analyzer);
|
||||
}
|
||||
computeStats(analyzer);
|
||||
computeNumNodes();
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
computeOldCardinality();
|
||||
}
|
||||
}
|
||||
|
||||
protected void computeNumNodes() {
|
||||
if (!children.isEmpty()) {
|
||||
numNodes = getChild(0).numNodes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes planner statistics: avgRowSize, numNodes, cardinality.
|
||||
* Computes planner statistics: avgRowSize.
|
||||
* Subclasses need to override this.
|
||||
* Assumes that it has already been called on all children.
|
||||
* This is broken out of finalize() so that it can be called separately
|
||||
@ -460,31 +475,39 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
TupleDescriptor desc = analyzer.getTupleDesc(tid);
|
||||
avgRowSize += desc.getAvgSerializedSize();
|
||||
}
|
||||
if (!children.isEmpty()) {
|
||||
numNodes = getChild(0).numNodes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Compute the product of the selectivity of all conjuncts.
 * This function calculates the cardinality when the old join reorder algorithm is enabled.
 * The value is used to decide the distributed join strategy (broadcast or shuffle) during distributed planning.
 *
 * If the new join reorder and the old join reorder share the same cardinality calculation method,
 * and the calculation is already completed in init(),
 * there is no need to override this function.
 */
protected double computeSelectivity() {
double prod = 1.0;
for (Expr e : conjuncts) {
if (e.getSelectivity() < 0) {
return -1.0;
}
prod *= e.getSelectivity();
protected void computeOldCardinality() {
}

protected void capCardinalityAtLimit() {
if (hasLimit()) {
cardinality = cardinality == -1 ? limit : Math.min(cardinality, limit);
}
return prod;
}
|
||||
|
||||
protected ExprSubstitutionMap outputSmap;
|
||||
|
||||
// global state of planning wrt conjunct assignment; used by planner as a shortcut
|
||||
// to avoid having to pass assigned conjuncts back and forth
|
||||
// (the planner uses this to save and reset the global state in between join tree
|
||||
// alternatives)
|
||||
protected Set<ExprId> assignedConjuncts;
|
||||
|
||||
protected ExprSubstitutionMap withoutTupleIsNullOutputSmap;
|
||||
|
||||
public ExprSubstitutionMap getOutputSmap() {
|
||||
return outputSmap;
|
||||
}
|
||||
|
||||
public void setOutputSmap(ExprSubstitutionMap smap) {
|
||||
outputSmap = smap;
|
||||
}
|
||||
@ -492,25 +515,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
public void setWithoutTupleIsNullOutputSmap(ExprSubstitutionMap smap) {
|
||||
withoutTupleIsNullOutputSmap = smap;
|
||||
}
|
||||
|
||||
public ExprSubstitutionMap getWithoutTupleIsNullOutputSmap() {
|
||||
return withoutTupleIsNullOutputSmap == null ? outputSmap : withoutTupleIsNullOutputSmap;
|
||||
}
|
||||
/**
|
||||
* Marks all slots referenced in exprs as materialized.
|
||||
*/
|
||||
protected void markSlotsMaterialized(Analyzer analyzer, List<Expr> exprs) {
|
||||
List<SlotId> refdIdList = Lists.newArrayList();
|
||||
|
||||
for (Expr expr: exprs) {
|
||||
expr.getIds(null, refdIdList);
|
||||
}
|
||||
|
||||
analyzer.materializeSlots(exprs);
|
||||
}
|
||||
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
assignConjuncts(analyzer);
|
||||
computeStats(analyzer);
|
||||
createDefaultSmap(analyzer);
|
||||
}
|
||||
|
||||
@ -567,12 +578,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
/**
|
||||
* Sets outputSmap_ to compose(existing smap, combined child smap). Also
|
||||
* substitutes conjuncts_ using the combined child smap.
|
||||
*
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
protected void createDefaultSmap(Analyzer analyzer) throws UserException {
|
||||
ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
|
||||
outputSmap =
|
||||
ExprSubstitutionMap.compose(outputSmap, combinedChildSmap, analyzer);
|
||||
ExprSubstitutionMap.compose(outputSmap, combinedChildSmap, analyzer);
|
||||
|
||||
conjuncts = Expr.substituteList(conjuncts, outputSmap, analyzer, false);
|
||||
}
|
||||
@ -646,13 +658,85 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");
|
||||
sb.append("\nFragment: ").append(getFragmentId().asInt()).append("]");
|
||||
sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
|
||||
return sb.toString();
|
||||
|
||||
/**
 * Returns the estimated combined selectivity of all conjuncts. Uses heuristics to
 * address the following estimation challenges:
 * 1. The individual selectivities of conjuncts may be unknown.
 * 2. Two selectivities, whether known or unknown, could be correlated. Assuming
 *    independence can lead to significant underestimation.
 * <p>
 * The first issue is addressed by using a single default selectivity that is
 * representative of all conjuncts with unknown selectivities.
 * The second issue is addressed by an exponential backoff when multiplying each
 * additional selectivity into the final result.
 */
static protected double computeCombinedSelectivity(List<Expr> conjuncts) {
// Collect all estimated selectivities.
List<Double> selectivities = new ArrayList<>();
for (Expr e : conjuncts) {
if (e.hasSelectivity()) selectivities.add(e.getSelectivity());
}
if (selectivities.size() != conjuncts.size()) {
// Some conjuncts have no estimated selectivity. Use a single default
// representative selectivity for all those conjuncts.
selectivities.add(Expr.DEFAULT_SELECTIVITY);
}
// Sort the selectivities to get a consistent estimate, regardless of the original
// conjunct order. Sort in ascending order such that the most selective conjunct
// is fully applied.
Collections.sort(selectivities);
double result = 1.0;
// selectivity = 1 * (s1)^(1/1) * (s2)^(1/2) * ... * (sn-1)^(1/(n-1)) * (sn)^(1/n)
for (int i = 0; i < selectivities.size(); ++i) {
// Exponential backoff for each selectivity multiplied into the final result.
result *= Math.pow(selectivities.get(i), 1.0 / (double) (i + 1));
}
// Bound result in [0, 1]
return Math.max(0.0, Math.min(1.0, result));
}

protected double computeSelectivity() {
for (Expr expr : conjuncts) {
expr.setSelectivity();
}
return computeCombinedSelectivity(conjuncts);
}
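The exponential backoff above can be reproduced with a short standalone sketch (hypothetical class and values, not part of this patch): selectivities are sorted ascending and the i-th one (1-based) is multiplied in as s_i^(1/i), so the most selective conjunct is applied in full and the rest are discounted against possible correlation:

import java.util.Arrays;

// Standalone illustration of the combined-selectivity backoff described above.
public class CombinedSelectivitySketch {
    static double combine(double... selectivities) {
        double[] sorted = selectivities.clone();
        Arrays.sort(sorted);
        double result = 1.0;
        for (int i = 0; i < sorted.length; ++i) {
            result *= Math.pow(sorted[i], 1.0 / (double) (i + 1));
        }
        return Math.max(0.0, Math.min(1.0, result));
    }

    public static void main(String[] args) {
        // naive independence would give 0.1 * 0.5 = 0.05;
        // the backoff gives 0.1 * 0.5^(1/2) ~= 0.0707, a less aggressive reduction
        System.out.println(combine(0.1, 0.5));
    }
}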
|
||||
|
||||
/**
|
||||
* Compute the product of the selectivity of all conjuncts.
|
||||
* This function is used for old cardinality in finalize()
|
||||
*/
|
||||
protected double computeOldSelectivity() {
|
||||
double prod = 1.0;
|
||||
for (Expr e : conjuncts) {
|
||||
if (e.getSelectivity() < 0) {
|
||||
return -1.0;
|
||||
}
|
||||
prod *= e.getSelectivity();
|
||||
}
|
||||
return prod;
|
||||
}
|
||||
|
||||
// Compute the cardinality after applying conjuncts based on 'preConjunctCardinality'.
|
||||
protected void applyConjunctsSelectivity() {
|
||||
if (cardinality == -1) {
|
||||
return;
|
||||
}
|
||||
applySelectivity();
|
||||
}
|
||||
|
||||
// Compute the cardinality after applying conjuncts with 'selectivity', based on
|
||||
// 'preConjunctCardinality'.
|
||||
private void applySelectivity() {
|
||||
double selectivity = computeSelectivity();
|
||||
Preconditions.checkState(cardinality >= 0);
|
||||
long preConjunctCardinality = cardinality;
|
||||
cardinality = Math.round(cardinality * selectivity);
|
||||
// don't round cardinality down to zero for safety.
|
||||
if (cardinality == 0 && preConjunctCardinality > 0) {
|
||||
cardinality = 1;
|
||||
}
|
||||
}
|
||||
|
||||
public ScanNode getScanNodeInOneFragmentByTupleId(TupleId tupleId) {
|
||||
@ -695,4 +779,13 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
|
||||
}
|
||||
return Joiner.on(", ").join(filtersStr) + "\n";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");
|
||||
sb.append("\nFragment: ").append(getFragmentId().asInt()).append("]");
|
||||
sb.append("\n").append(getNodeExplainString("", TExplainLevel.BRIEF));
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
@ -183,10 +183,26 @@ public class Planner {
|
||||
if (selectFailed) {
|
||||
throw new MVSelectFailedException("Failed to select materialize view");
|
||||
}
|
||||
// compute mem layout *before* finalize(); finalize() may reference
// TupleDescriptor.avgSerializedSize

/**
 * - Under normal circumstances, computeMemLayout() would be executed
 *   at the end of each plan node's init() function, for example:
 *   OlapScanNode {
 *       init() {
 *           analyzer.materializeSlots(conjuncts);
 *           computeTupleStatAndMemLayout(analyzer);
 *           computeStat();
 *       }
 *   }
 * - However, Doris currently cannot determine whether columns may still be
 *   pruned from or added to the tuple after PlanNode.init().
 * - Therefore, for the time being, computeMemLayout() can only be placed
 *   after the entire single-node plan has been built.
 */
|
||||
analyzer.getDescTbl().computeMemLayout();
|
||||
singleNodePlan.finalize(analyzer);
|
||||
|
||||
if (queryOptions.num_nodes == 1) {
|
||||
// single-node execution; we're almost done
|
||||
singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan);
|
||||
|
||||
@ -33,11 +33,13 @@ import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.doris.thrift.TRepeatNode;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
@ -52,6 +54,8 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class RepeatNode extends PlanNode {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(RepeatNode.class);
|
||||
|
||||
private List<Set<Integer>> repeatSlotIdList;
|
||||
private Set<Integer> allSlotId;
|
||||
private TupleDescriptor outputTupleDesc;
|
||||
@ -96,8 +100,11 @@ public class RepeatNode extends PlanNode {
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
avgRowSize = 0;
|
||||
cardinality = 0;
|
||||
numNodes = 1;
|
||||
cardinality = 0;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Sort: cardinality=" + cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -128,7 +135,7 @@ public class RepeatNode extends PlanNode {
|
||||
((SlotRef) slot).getDesc().setIsNullable(true);
|
||||
}
|
||||
}
|
||||
outputTupleDesc.computeMemLayout();
|
||||
outputTupleDesc.computeStatAndMemLayout();
|
||||
|
||||
List<Set<SlotId>> groupingIdList = new ArrayList<>();
|
||||
List<Expr> exprList = groupByClause.getGroupingExprs();
|
||||
@ -163,7 +170,7 @@ public class RepeatNode extends PlanNode {
|
||||
for (TupleId id : tupleIds) {
|
||||
analyzer.getTupleDesc(id).setIsMaterialized(true);
|
||||
}
|
||||
computeMemLayout(analyzer);
|
||||
computeTupleStatAndMemLayout(analyzer);
|
||||
computeStats(analyzer);
|
||||
createDefaultSmap(analyzer);
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
@ -43,6 +44,13 @@ abstract public class ScanNode extends PlanNode {
|
||||
this.desc = desc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
super.init(analyzer);
|
||||
// materialize conjuncts in where
|
||||
analyzer.materializeSlots(conjuncts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to parse a "host:port" address string into TNetworkAddress
|
||||
* This is called with ipaddress:port when doing scan range assigment.
|
||||
|
||||
@ -18,14 +18,15 @@
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -54,16 +55,29 @@ public class SelectNode extends PlanNode {
|
||||
|
||||
@Override
|
||||
public void init(Analyzer analyzer) throws UserException {
|
||||
analyzer.markConjunctsAssigned(conjuncts);
|
||||
computeStats(analyzer);
|
||||
createDefaultSmap(analyzer);
|
||||
super.init(analyzer);
|
||||
analyzer.markConjunctsAssigned(conjuncts);
|
||||
computeStats(analyzer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
cardinality = getChild(0).cardinality;
|
||||
applyConjunctsSelectivity();
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("stats Select: cardinality={}", this.cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
long cardinality = getChild(0).cardinality;
|
||||
double selectivity = computeSelectivity();
|
||||
double selectivity = computeOldSelectivity();
|
||||
if (cardinality < 0 || selectivity < 0) {
|
||||
this.cardinality = -1;
|
||||
} else {
|
||||
|
||||
@ -17,17 +17,13 @@
|
||||
|
||||
package org.apache.doris.planner;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TupleDescriptor;
|
||||
import org.apache.doris.analysis.TupleId;
|
||||
import org.apache.doris.common.CheckedMath;
|
||||
import org.apache.doris.thrift.TExceptNode;
|
||||
import org.apache.doris.thrift.TExplainLevel;
|
||||
import org.apache.doris.thrift.TExpr;
|
||||
@ -36,18 +32,19 @@ import org.apache.doris.thrift.TPlanNode;
|
||||
import org.apache.doris.thrift.TPlanNodeType;
|
||||
import org.apache.doris.thrift.TUnionNode;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Node that merges the results of its child plans, Normally, this is done by
|
||||
* materializing the corresponding result exprs into a new tuple. However, if
|
||||
@ -131,12 +128,24 @@ public abstract class SetOperationNode extends PlanNode {
|
||||
@Override
|
||||
public void computeStats(Analyzer analyzer) {
|
||||
super.computeStats(analyzer);
|
||||
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
|
||||
return;
|
||||
}
|
||||
computeCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void computeOldCardinality() {
|
||||
computeCardinality();
|
||||
}
|
||||
|
||||
private void computeCardinality() {
|
||||
cardinality = constExprLists_.size();
|
||||
for (PlanNode child : children) {
|
||||
// ignore missing child cardinality info in the hope it won't matter enough
|
||||
// to change the planning outcome
|
||||
if (child.cardinality > 0) {
|
||||
cardinality = addCardinalities(cardinality, child.cardinality);
|
||||
cardinality = CheckedMath.checkedAdd(cardinality, child.cardinality);
|
||||
}
|
||||
}
|
||||
// The number of nodes of a set operation node is -1 (invalid) if all the referenced tables
|
||||
@ -145,31 +154,12 @@ public abstract class SetOperationNode extends PlanNode {
|
||||
if (numNodes == -1) {
|
||||
numNodes = 1;
|
||||
}
|
||||
cardinality = capAtLimit(cardinality);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("stats Union: cardinality=" + Long.toString(cardinality));
|
||||
capCardinalityAtLimit();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.trace("stats Union: cardinality=" + cardinality);
|
||||
}
|
||||
}
|
||||
|
||||
protected long capAtLimit(long cardinality) {
|
||||
if (hasLimit()) {
|
||||
if (cardinality == -1) {
|
||||
return limit;
|
||||
} else {
|
||||
return Math.min(cardinality, limit);
|
||||
}
|
||||
}
|
||||
return cardinality;
|
||||
}
|
||||
|
||||
/*
|
||||
@Override
|
||||
public void computeResourceProfile(TQueryOptions queryOptions) {
|
||||
// TODO: add an estimate
|
||||
resourceProfile_ = new ResourceProfile(0, 0);
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Returns true if rows from the child with 'childTupleIds' and 'childResultExprs' can
|
||||
* be returned directly by the set operation node (without materialization into a new tuple).
|
||||
@ -270,7 +260,7 @@ public abstract class SetOperationNode extends PlanNode {
|
||||
@Override
|
||||
public void init(Analyzer analyzer) {
|
||||
Preconditions.checkState(conjuncts.isEmpty());
|
||||
computeMemLayout(analyzer);
|
||||
computeTupleStatAndMemLayout(analyzer);
|
||||
computeStats(analyzer);
|
||||
// except Node must not reorder the child
|
||||
if (!(this instanceof ExceptNode)) {
|
||||
|
||||
@ -56,6 +56,7 @@ import org.apache.doris.catalog.OdbcTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.Reference;
|
||||
import org.apache.doris.common.UserException;
|
||||
|
||||
@ -64,15 +65,20 @@ import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Constructs a non-executable single-node plan from an analyzed parse tree.
|
||||
@ -81,7 +87,7 @@ import java.util.Set;
|
||||
* The single-node plan needs to be wrapped in a plan fragment for it to be executable.
|
||||
*/
|
||||
public class SingleNodePlanner {
|
||||
private final static Logger LOG = LogManager.getLogger(DistributedPlanner.class);
|
||||
private final static Logger LOG = LogManager.getLogger(SingleNodePlanner.class);
|
||||
|
||||
private final PlannerContext ctx_;
|
||||
private final ArrayList<ScanNode> scanNodes = Lists.newArrayList();
|
||||
@ -91,6 +97,10 @@ public class SingleNodePlanner {
|
||||
ctx_ = ctx;
|
||||
}
|
||||
|
||||
public PlannerContext getPlannerContext() {
|
||||
return ctx_;
|
||||
}
|
||||
|
||||
public ArrayList<ScanNode> getScanNodes() {
|
||||
return scanNodes;
|
||||
}
|
||||
@ -652,6 +662,270 @@ public class SingleNodePlanner {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Return the cheapest plan that materializes the joins of all TableRefs in
 * refPlans and the subplans of all applicable TableRefs in subplanRefs.
 * Assumes that refPlans are in the order as they originally appeared in
 * the query.
 * For this plan:
 * - the plan is executable, ie, all non-cross joins have equi-join predicates
 * - the leftmost scan is over the largest of the inputs for which we can still
 *   construct an executable plan
 * - from bottom to top, all rhs's are in increasing order of selectivity (percentage
 *   of surviving rows)
 * - outer/cross/semi joins: rhs serialized size is < lhs serialized size;
 *   enforced via join inversion, if necessary
 * - SubplanNodes are placed as low as possible in the plan tree - as soon as the
 *   required tuple ids of one or more TableRefs in subplanRefs are materialized
 * Returns null if we can't create an executable plan.
 */
|
||||
private PlanNode createCheapestJoinPlan(Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans) throws UserException {
|
||||
if (refPlans.size() == 1) {
|
||||
return refPlans.get(0).second;
|
||||
}
|
||||
|
||||
// collect eligible candidates for the leftmost input; list contains
|
||||
// (plan, materialized size)
|
||||
List<Pair<TableRef, Long>> candidates = new ArrayList<>();
|
||||
for (Pair<TableRef, PlanNode> entry : refPlans) {
|
||||
TableRef ref = entry.first;
|
||||
JoinOperator joinOp = ref.getJoinOp();
|
||||
|
||||
// Avoid reordering outer/semi joins which is generally incorrect.
|
||||
// consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
|
||||
// TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
|
||||
if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
PlanNode plan = entry.second;
|
||||
if (plan.getCardinality() == -1) {
|
||||
// use 0 for the size to avoid it becoming the leftmost input
|
||||
// TODO: Consider raw size of scanned partitions in the absence of stats.
|
||||
candidates.add(new Pair<>(ref, new Long(0)));
|
||||
LOG.debug("The candidate of " + ref.getUniqueAlias() + ": -1. "
|
||||
+ "Using 0 instead of -1 to avoid error");
|
||||
continue;
|
||||
}
|
||||
Preconditions.checkState(ref.isAnalyzed());
|
||||
long materializedSize = plan.getCardinality();
|
||||
candidates.add(new Pair<>(ref, new Long(materializedSize)));
|
||||
LOG.debug("The candidate of " + ref.getUniqueAlias() + ": " + materializedSize);
|
||||
}
|
||||
// (ML): This seems unreachable, because at least the first table ref is an inner join
|
||||
if (candidates.isEmpty()) return null;
|
||||
|
||||
// order candidates by descending materialized size; we want to minimize the memory
|
||||
// consumption of the materialized hash tables required for the join sequence
|
||||
Collections.sort(candidates,
|
||||
new Comparator<Pair<TableRef, Long>>() {
|
||||
@Override
|
||||
public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
|
||||
long diff = b.second - a.second;
|
||||
return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
|
||||
}
|
||||
});
|
||||
|
||||
for (Pair<TableRef, Long> candidate : candidates) {
|
||||
PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
|
||||
if (result != null) return result;
|
||||
}
|
||||
return null;
|
||||
}
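The candidate ordering above can be illustrated with a standalone sketch (hypothetical class and table names, not part of this patch): eligible table refs are sorted by descending estimated cardinality, unknown cardinality counts as 0 so it never becomes the leftmost input, and the planner accepts the first candidate for which a full join plan can be built:

import java.util.*;

// Standalone illustration of how leftmost-input candidates are ordered and tried.
public class LeftmostCandidateSketch {
    public static void main(String[] args) {
        Map<String, Long> cardinalities = new LinkedHashMap<>();
        cardinalities.put("t1", 1_000L);
        cardinalities.put("t2", 1_000_000L);
        cardinalities.put("t3", -1L); // unknown stats

        List<Map.Entry<String, Long>> candidates = new ArrayList<>(cardinalities.entrySet());
        // unknown (-1) cardinalities are treated as 0 so they do not become the leftmost input
        candidates.sort((a, b) -> Long.compare(Math.max(b.getValue(), 0L), Math.max(a.getValue(), 0L)));

        for (Map.Entry<String, Long> candidate : candidates) {
            System.out.println("try leftmost: " + candidate.getKey());
            // a real implementation would call createJoinPlan() here and stop at the first non-null plan
        }
        // prints: t2, t1, t3
    }
}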
|
||||
|
||||
boolean candidateCardinalityIsSmaller(PlanNode candidate, long candidateInnerNodeCardinality,
|
||||
PlanNode newRoot, long newRootInnerNodeCardinality) {
|
||||
if (candidate.getCardinality() < newRoot.getCardinality()) {
|
||||
return true;
|
||||
} else if (candidate.getCardinality() == newRoot.getCardinality()) {
|
||||
if (((candidate instanceof HashJoinNode) && ((HashJoinNode) candidate).getJoinOp().isInnerJoin())
|
||||
&& ((newRoot instanceof HashJoinNode) && ((HashJoinNode) newRoot).getJoinOp().isInnerJoin())) {
|
||||
if (candidateInnerNodeCardinality < newRootInnerNodeCardinality) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a plan with leftmostRef's plan as its leftmost input; the joins
|
||||
* are in decreasing order of selectiveness (percentage of rows they eliminate).
|
||||
* Creates and adds subplan nodes as soon as the tuple ids required by at least one
|
||||
* subplan ref are materialized by a join node added during plan generation.
|
||||
*/
|
||||
// (ML): change the function name
|
||||
private PlanNode createJoinPlan(Analyzer analyzer,
|
||||
TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
|
||||
throws UserException {
|
||||
LOG.debug("Try to create a query plan starting with " + leftmostRef.getUniqueAlias());
|
||||
|
||||
// the refs that have yet to be joined
|
||||
List<Pair<TableRef, PlanNode>> remainingRefs = new ArrayList<>();
|
||||
PlanNode root = null; // root of accumulated join plan
|
||||
for (Pair<TableRef, PlanNode> entry : refPlans) {
|
||||
if (entry.first == leftmostRef) {
|
||||
root = entry.second;
|
||||
} else {
|
||||
remainingRefs.add(entry);
|
||||
}
|
||||
}
|
||||
Preconditions.checkNotNull(root);
|
||||
|
||||
// Maps from a TableRef in refPlans with an outer/semi join op to the set of
// TableRefs that precede it in refPlans (i.e., in FROM-clause order).
// The map is used to place outer/semi joins at a fixed position in the plan tree
// (IMPALA-860), s.t. all the tables appearing to the left/right of an outer/semi
// join in the original query still remain to the left/right after join ordering.
// This prevents join re-ordering across outer/semi joins which is generally wrong.
|
||||
|
||||
/**
|
||||
* Key of precedingRefs: the right table ref of outer or semi join
|
||||
* Value of precedingRefs: the preceding refs of this key
|
||||
* For example:
|
||||
* select * from t1, t2, t3 left join t4, t5, t6 right semi join t7, t8, t9
|
||||
* Map:
|
||||
* { t4: [t1, t2, t3],
|
||||
* t7: [t1, t2, t3, t4, t5, t6]
|
||||
* }
|
||||
*/
|
||||
Map<TableRef, Set<TableRef>> precedingRefs = new HashMap<>();
|
||||
List<TableRef> tmpTblRefs = new ArrayList<>();
|
||||
for (Pair<TableRef, PlanNode> entry : refPlans) {
|
||||
TableRef tblRef = entry.first;
|
||||
if (tblRef.getJoinOp().isOuterJoin() || tblRef.getJoinOp().isSemiJoin()) {
|
||||
precedingRefs.put(tblRef, Sets.newHashSet(tmpTblRefs));
|
||||
}
|
||||
tmpTblRefs.add(tblRef);
|
||||
}
|
||||
|
||||
// Refs that have been joined. The union of joinedRefs and the refs in remainingRefs
|
||||
// are the set of all table refs.
|
||||
Set<TableRef> joinedRefs = Sets.newHashSet(leftmostRef);
|
||||
// two statistical values
long numOps = 0;
// the number of rounds in which a candidate was successfully selected
int successfulSelectionTimes = 0;
|
||||
while (!remainingRefs.isEmpty()) {
|
||||
// We minimize the resulting cardinality at each step in the join chain,
|
||||
// which minimizes the total number of hash table lookups.
|
||||
PlanNode newRoot = null;
|
||||
Pair<TableRef, PlanNode> minEntry = null;
|
||||
long newRootRightChildCardinality = 0;
|
||||
for (Pair<TableRef, PlanNode> tblRefToPlanNodeOfCandidate : remainingRefs) {
|
||||
TableRef tblRefOfCandidate = tblRefToPlanNodeOfCandidate.first;
|
||||
long cardinalityOfCandidate = tblRefToPlanNodeOfCandidate.second.getCardinality();
|
||||
PlanNode rootPlanNodeOfCandidate = tblRefToPlanNodeOfCandidate.second;
|
||||
JoinOperator joinOp = tblRefOfCandidate.getJoinOp();
|
||||
|
||||
// Place outer/semi joins at a fixed position in the plan tree.
|
||||
Set<TableRef> requiredRefs = precedingRefs.get(tblRefOfCandidate);
|
||||
if (requiredRefs != null) {
|
||||
Preconditions.checkState(joinOp.isOuterJoin()
|| joinOp.isSemiJoin());
/**
 * Semi and outer join nodes act as stop nodes in each round of the algorithm.
 * If a stop node is encountered during the current round of candidate selection,
 * the following nodes do not need to be considered and this round is finished.
 * There are two situations here.
 * Situation 1: the required table refs have not all been placed yet
 *              t1, t2, t3 left join t4, t5
 *              Round 1: t3, t1 (new root) meets t4 (stop)
 *              stop this round and begin the next round
 * Situation 2: the remaining table refs must not be re-ordered across the outer/semi join
 *              Round 1: t5, t1, t2, t3 (root) meets t4 (stop)
 *              stop this round while the new root is still null;
 *              planning fails and null is returned
 */
|
||||
if (!requiredRefs.equals(joinedRefs)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
// reset assigned conjuncts of analyzer in every compare
|
||||
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
|
||||
PlanNode candidate = createJoinNode(analyzer, root, rootPlanNodeOfCandidate, tblRefOfCandidate);
|
||||
// (ML): Is this still needed? It should not return null here
|
||||
if (candidate == null) {
|
||||
continue;
|
||||
}
|
||||
// Have the build side of a join copy data to a compact representation
|
||||
// in the tuple buffer.
|
||||
candidate.getChildren().get(1).setCompactData(true);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
stringBuilder.append("The " + tblRefOfCandidate.getUniqueAlias() + " is right child of join node.");
|
||||
stringBuilder.append("The join cardinality is " + candidate.getCardinality() + ".");
|
||||
stringBuilder.append("In round " + successfulSelectionTimes);
|
||||
LOG.debug(stringBuilder.toString());
|
||||
}
|
||||
|
||||
// Use 'candidate' as the new root; don't consider any other table refs at this
|
||||
// position in the plan.
|
||||
if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
|
||||
newRoot = candidate;
|
||||
minEntry = tblRefToPlanNodeOfCandidate;
|
||||
break;
|
||||
}
|
||||
|
||||
// Always prefer Hash Join over Nested-Loop Join due to limited costing
|
||||
// infrastructure.
|
||||
/**
|
||||
* The following three conditions are met while the candidate is better.
|
||||
* 1. The first candidate
|
||||
* 2. The candidate is better than new root: [t3, t2] pk [t3, t1] => [t3, t1]
|
||||
* 3. The hash join is better than cross join: [t3 cross t1] pk [t3 hash t2] => t3 hash t2
|
||||
*/
if (newRoot == null
|| ((candidate.getClass().equals(newRoot.getClass()) && candidateCardinalityIsSmaller(candidate, tblRefToPlanNodeOfCandidate.second.getCardinality(), newRoot, newRootRightChildCardinality)))
|| (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
newRoot = candidate;
minEntry = tblRefToPlanNodeOfCandidate;
newRootRightChildCardinality = cardinalityOfCandidate;
}
}
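
// Sketch of the comparison assumed above (candidateCardinalityIsSmaller is defined elsewhere
// in this class; the names and logic below are illustrative only, not the actual helper):
//
//   static boolean smallerCandidate(long candCard, long candRhsCard, long rootCard, long rootRhsCard) {
//       if (candCard != rootCard) {
//           return candCard < rootCard;       // prefer the smaller estimated join output
//       }
//       return candRhsCard < rootRhsCard;     // tie-break on the smaller build (right) side
//   }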

/**
 * A table that must come after the outer or semi join was wrongly placed in front of it,
 * so the first tblRefToPlanNodeOfCandidate (an outer or semi join tbl ref) in this round fails the check above and the loop exits.
 * This means the current leftmost node is wrong and no valid plan can be produced.
 *
 * For example:
 * Query: t1 left join t2 inner join t3
 * Input params: t3 (leftmost tbl ref), [t1, t2] (remaining refs)
 * Round 1: t3, t1 (joined refs) t2 (remaining refs)
 * Round 2: requiredRefs.equals(joinedRefs) is false, so break; newRoot is null
 * Result: null
 * t3 should not appear before t2, so planning fails.
 */
if (newRoot == null) {
// Could not generate a valid plan.
// for example: the biggest table is the last table
return null;
}

// we need to insert every rhs row into the hash table and then look up
// every lhs row
long lhsCardinality = root.getCardinality();
long rhsCardinality = minEntry.second.getCardinality();
numOps += lhsCardinality + rhsCardinality;
if (LOG.isDebugEnabled()) {
LOG.debug("Round " + successfulSelectionTimes + " chose " + minEntry.first.getUniqueAlias()
+ " #lhs=" + lhsCardinality + " #rhs=" + rhsCardinality + " #ops=" + numOps);
}
remainingRefs.remove(minEntry);
joinedRefs.add(minEntry.first);
root = newRoot;
analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
++successfulSelectionTimes;
}

LOG.debug("The final join sequence is "
+ joinedRefs.stream().map(TableRef::getUniqueAlias).collect(Collectors.joining(",")));
return root;
}
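
// Worked example of the greedy loop above (cardinalities are made up; it is assumed the caller
// has already chosen the largest ref, t1, as the leftmost/probe side):
//   tables: t1 = 1,000,000 rows, t2 = 100 rows, t3 = 10,000 rows, all inner joins
//   Round 0: candidates t2 and t3 are compared; t1 join t2 has the smaller estimated output,
//            so t2 becomes the first build side.
//   Round 1: only t3 remains, so it becomes the next build side.
// Resulting left-deep order: (t1 join t2) join t3.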

/**
 * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
 * of the selectStmt query block.
@@ -691,35 +965,66 @@ public class SingleNodePlanner {
return createAggregationPlan(selectStmt, analyzer, emptySetNode);
}

// create left-deep sequence of binary hash joins; assign node ids as we go along
TableRef tblRef = selectStmt.getTableRefs().get(0);
materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer);
PlanNode root = createTableRefNode(analyzer, tblRef, selectStmt);
// to change the inner contains analytic function
// selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());

// add aggregate node here
PlanNode root = null;
AggregateInfo aggInfo = selectStmt.getAggInfo();

turnOffPreAgg(aggInfo, selectStmt, analyzer, root);
if (analyzer.safeIsEnableJoinReorderBasedCost()) {
LOG.debug("Using new join reorder strategy when enable_join_reorder_based_cost is true");
// create plans for our table refs; use a list here instead of a map to
// maintain a deterministic order of traversing the TableRefs during join
// plan generation (helps with tests)
List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
for (TableRef ref : selectStmt.getTableRefs()) {
materializeTableResultForCrossJoinOrCountStar(ref, analyzer);
PlanNode plan = createTableRefNode(analyzer, ref, selectStmt);
turnOffPreAgg(aggInfo, selectStmt, analyzer, plan);

if (root instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) root;
// pre-aggregation has been turned off for this olap scan node and should not be turned on again.
// e.g. select sum(v1) from (select v1 from test_table);
if (!olapNode.isPreAggregation()) {
olapNode.setCanTurnOnPreAggr(false);
if (plan instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) plan;
// pre-aggregation has been turned off for this olap scan node and should not be turned on again.
// e.g. select sum(v1) from (select v1 from test_table);
if (!olapNode.isPreAggregation()) {
olapNode.setCanTurnOnPreAggr(false);
}
}

Preconditions.checkState(plan != null);
refPlans.add(new Pair(ref, plan));
}
}
// save state of conjunct assignment; needed for join plan generation
for (Pair<TableRef, PlanNode> entry : refPlans) {
entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
}
root = createCheapestJoinPlan(analyzer, refPlans);
Preconditions.checkState(root != null);
} else {
// create left-deep sequence of binary hash joins; assign node ids as we go along
TableRef tblRef = selectStmt.getTableRefs().get(0);
materializeTableResultForCrossJoinOrCountStar(tblRef, analyzer);
root = createTableRefNode(analyzer, tblRef, selectStmt);
// to change the inner contains analytic function
// selectStmt.seondSubstituteInlineViewExprs(analyzer.getChangeResSmap());

for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
TableRef innerRef = selectStmt.getTableRefs().get(i);
root = createJoinNode(analyzer, root, outerRef, innerRef, selectStmt);
// Have the build side of a join copy data to a compact representation
// in the tuple buffer.
root.getChildren().get(1).setCompactData(true);
root.assignConjuncts(analyzer);
turnOffPreAgg(aggInfo, selectStmt, analyzer, root);

if (root instanceof OlapScanNode) {
OlapScanNode olapNode = (OlapScanNode) root;
// pre-aggregation has been turned off for this olap scan node and should not be turned on again.
// e.g. select sum(v1) from (select v1 from test_table);
if (!olapNode.isPreAggregation()) {
olapNode.setCanTurnOnPreAggr(false);
}
}

for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
TableRef outerRef = selectStmt.getTableRefs().get(i - 1);
TableRef innerRef = selectStmt.getTableRefs().get(i);
root = createJoinNode(analyzer, root, innerRef, selectStmt);
// Have the build side of a join copy data to a compact representation
// in the tuple buffer.
root.getChildren().get(1).setCompactData(true);
root.assignConjuncts(analyzer);
}
}

if (selectStmt.getSortInfo() != null && selectStmt.getLimit() == -1
@@ -902,7 +1207,7 @@ public class SingleNodePlanner {
// slotDesc.setStats(ColumnStats.fromExpr(resultExpr));
slotDesc.setIsMaterialized(true);
}
tupleDesc.computeMemLayout();
tupleDesc.computeStatAndMemLayout();
return tupleDesc;
}

@@ -1451,15 +1756,14 @@ public class SingleNodePlanner {
scanNode.setSortColumn(tblRef.getSortColumn());
scanNode.addConjuncts(pushDownConjuncts);
}
// assignConjuncts(scanNode, analyzer);
scanNode.init(analyzer);
// TODO chenhao add
// materialize conjuncts in where
analyzer.materializeSlots(scanNode.getConjuncts());

scanNodes.add(scanNode);
// register the scan node in selectStmtToScanNodes before calling scanNode.init
List<ScanNode> scanNodeList = selectStmtToScanNodes.computeIfAbsent(selectStmt.getAnalyzer(), k -> Lists.newArrayList());
scanNodeList.add(scanNode);

scanNode.init(analyzer);

return scanNode;
}

@@ -1549,18 +1853,9 @@ public class SingleNodePlanner {
}
}

/**
 * Creates a new node to join outer with inner. Collects and assigns join conjunct
 * as well as regular conjuncts. Calls init() on the new join node.
 * Throws if the JoinNode.init() fails.
 */
private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef outerRef, TableRef innerRef,
SelectStmt selectStmt)
throws UserException, AnalysisException {
private PlanNode createJoinNodeBase(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
throws UserException {
materializeTableResultForCrossJoinOrCountStar(innerRef, analyzer);
// the rows coming from the build node only need to have space for the tuple
// materialized by that node
PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt);

List<Expr> eqJoinConjuncts = Lists.newArrayList();
Reference<String> errMsg = new Reference<String>();
@@ -1577,8 +1872,8 @@ public class SingleNodePlanner {
}

// construct cross join node
LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
outerRef.getAliasAsName(), innerRef.getAliasAsName());
// LOG.debug("Join between {} and {} requires at least one conjunctive equality predicate between the two tables",
// outerRef.getAliasAsName(), innerRef.getAliasAsName());
// TODO If there are eq join predicates then we should construct a hash join
CrossJoinNode result =
new CrossJoinNode(ctx_.getNextNodeId(), outer, inner, innerRef);
@@ -1606,6 +1901,28 @@ public class SingleNodePlanner {
return result;
}

/*
 * Used by the cost-based join reorder: the inner PlanNode has already been created,
 * so only the join node itself needs to be built here.
 */
public PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef innerRef)
throws UserException {
return createJoinNodeBase(analyzer, outer, inner, innerRef);
}

/**
 * Creates a new node to join outer with inner. Collects and assigns join conjunct
 * as well as regular conjuncts. Calls init() on the new join node.
 * Throws if the JoinNode.init() fails.
 */
private PlanNode createJoinNode(Analyzer analyzer, PlanNode outer, TableRef innerRef,
SelectStmt selectStmt) throws UserException {
// the rows coming from the build node only need to have space for the tuple
// materialized by that node
PlanNode inner = createTableRefNode(analyzer, innerRef, selectStmt);

return createJoinNodeBase(analyzer, outer, inner, innerRef);
}

/**
 * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
 * CollectionTableRef or an InlineViewRef.

@@ -124,6 +124,19 @@ public class SortNode extends PlanNode {
@Override
protected void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
if (!analyzer.safeIsEnableJoinReorderBasedCost()) {
return;
}
cardinality = getChild(0).cardinality;
applyConjunctsSelectivity();
capCardinalityAtLimit();
if (LOG.isDebugEnabled()) {
LOG.debug("stats Sort: cardinality=" + cardinality);
}
}
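
// Rough shape of the estimate above (applyConjunctsSelectivity and capCardinalityAtLimit are
// inherited PlanNode helpers not shown in this hunk; the behavior described is an assumption):
//   cardinality = child cardinality * combined selectivity of this node's conjuncts,
//   then capped at the LIMIT if one is present, e.g.
//     1,000 rows * selectivity 0.1 -> 100; with LIMIT 10 -> cardinality 10.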

@Override
protected void computeOldCardinality() {
cardinality = getChild(0).cardinality;
if (hasLimit()) {
if (cardinality == -1) {

@@ -137,7 +137,7 @@ public class StreamLoadPlanner {
// create scan node
scanNode = new StreamLoadScanNode(loadId, new PlanNodeId(0), tupleDesc, destTable, taskInfo);
scanNode.init(analyzer);
descTable.computeMemLayout();
descTable.computeStatAndMemLayout();
scanNode.finalize(analyzer);

// create dest sink

@@ -143,7 +143,6 @@ public class StreamLoadScanNode extends LoadScanNode {
initAndSetPrecedingFilter(taskInfo.getPrecedingFilter(), this.srcTupleDesc, analyzer);
initAndSetWhereExpr(taskInfo.getWhereExpr(), this.desc, analyzer);

computeStats(analyzer);
createDefaultSmap(analyzer);

if (taskInfo.getColumnSeparator() != null) {
@@ -169,6 +168,7 @@ public class StreamLoadScanNode extends LoadScanNode {
brokerScanRange.setParams(params);

brokerScanRange.setBrokerAddresses(Lists.newArrayList());
computeStats(analyzer);
}

@Override

@@ -84,6 +84,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_SQL_CACHE = "enable_sql_cache";
public static final String ENABLE_PARTITION_CACHE = "enable_partition_cache";

public static final String ENABLE_COST_BASED_JOIN_REORDER = "enable_cost_based_join_reorder";

public static final int MIN_EXEC_INSTANCE_NUM = 1;
public static final int MAX_EXEC_INSTANCE_NUM = 32;
// if set to true, some of stmt will be forwarded to master FE to get result
@@ -288,6 +290,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_PARTITION_CACHE)
public boolean enablePartitionCache = false;

@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;

@VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
public boolean forwardToMaster = false;

@@ -374,6 +379,8 @@ public class SessionVariable implements Serializable, Writable {
this.sqlMode = sqlMode;
}

public boolean isEnableJoinReorderBasedCost() { return enableJoinReorderBasedCost; }
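
// Usage sketch (assuming a MySQL-protocol client session; the statements below are illustrative):
//   SET enable_cost_based_join_reorder = true;   -- use the cost-based join reorder
//   SET enable_cost_based_join_reorder = false;  -- keep the old ordering (the default)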

public boolean isAutoCommit() {
return autoCommit;
}

@@ -356,7 +356,7 @@ public class StmtExecutor implements ProfileWriter {
throw e;
} catch (UserException e) {
// analysis exception only print message, not print the stack
LOG.warn("execute Exception. {}", e.getMessage());
LOG.warn("execute Exception. {}", e);
context.getState().setError(e.getMessage());
context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
} catch (Exception e) {

@@ -1456,6 +1456,8 @@ public class QueryPlanTest {
Assert.assertTrue(explainString.contains("AGGREGATE (update serialize)"));
}

@Test
public void testLeadAndLagFunction() throws Exception {
connectContext.setDatabase("default_cluster:test");

File diff suppressed because it is too large
@@ -16,7 +16,7 @@

</Appenders>
<Loggers>
<Root level="info">
<Root level="debug">
<AppenderRef ref="Console" />
<AppenderRef ref="RollingFile" />
</Root>
