[fix](planner) cannot process more than one subquery in disjunct (#16506)
before this PR, Doris cannot process sql like that
```sql
CREATE TABLE `test_sq_dj1` (
`c1` int(11) NULL,
`c2` int(11) NULL,
`c3` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
CREATE TABLE `test_sq_dj2` (
`c1` int(11) NULL,
`c2` int(11) NULL,
`c3` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
insert into test_sq_dj1 values(1, 2, 3), (10, 20, 30), (100, 200, 300);
insert into test_sq_dj2 values(10, 20, 30);
-- core
SELECT * FROM test_sq_dj1 WHERE c1 IN (SELECT c1 FROM test_sq_dj2) OR c1 IN (SELECT c1 FROM test_sq_dj2) OR c1 < 10;
-- invalid slot
SELECT * FROM test_sq_dj1 WHERE c1 IN (SELECT c1 FROM test_sq_dj2) OR c1 IN (SELECT c2 FROM test_sq_dj2) OR c1 < 10;
```
there are two problems:
1. we should remove redundant sub-query in one conjuncts to avoid generate useless join node
2. when we have more than one sub-query in one disjunct. we should put the conjunct contains the disjunct at the top node of the set of mark join nodes. And pop up the mark slot to the top node.
This commit is contained in:
@ -396,6 +396,8 @@ public class Analyzer {
|
||||
|
||||
private final Map<TableRef, TupleId> markTupleIdByInnerRef = Maps.newHashMap();
|
||||
|
||||
private final Set<TupleId> markTupleIdsNotProcessed = Sets.newHashSet();
|
||||
|
||||
public GlobalState(Env env, ConnectContext context) {
|
||||
this.env = env;
|
||||
this.context = context;
|
||||
@ -672,12 +674,18 @@ public class Analyzer {
|
||||
|
||||
tableRefMap.put(result.getId(), ref);
|
||||
|
||||
// for mark join
|
||||
// for mark join, init three context
|
||||
// 1. markTuples to records all tuples belong to mark slot
|
||||
// 2. markTupleIdByInnerRef to records relationship between inner table of mark join and the mark tuple
|
||||
// 3. markTupleIdsNotProcessed to records un-process mark tuple id. if an expr contains slot belong to
|
||||
// the un-process mark tuple, it should not assign to current join node and should pop up its
|
||||
// mark slot until all mark tuples in this expr has been processed.
|
||||
if (ref.getJoinOp() != null && ref.isMark()) {
|
||||
TupleDescriptor markTuple = getDescTbl().createTupleDescriptor();
|
||||
markTuple.setAliases(new String[]{ref.getMarkTupleName()}, true);
|
||||
globalState.markTuples.put(ref.getMarkTupleName(), markTuple);
|
||||
globalState.markTupleIdByInnerRef.put(ref, markTuple.getId());
|
||||
globalState.markTupleIdsNotProcessed.add(markTuple.getId());
|
||||
}
|
||||
|
||||
return result;
|
||||
@ -1592,12 +1600,38 @@ public class Analyzer {
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean needPopUpMarkTuple(TableRef ref) {
|
||||
TupleId id = globalState.markTupleIdByInnerRef.get(ref);
|
||||
if (id == null) {
|
||||
return false;
|
||||
}
|
||||
List<Expr> exprs = getAllConjuncts(id);
|
||||
for (Expr expr : exprs) {
|
||||
List<TupleId> tupleIds = Lists.newArrayList();
|
||||
expr.getIds(tupleIds, null);
|
||||
if (tupleIds.stream().anyMatch(globalState.markTupleIdsNotProcessed::contains)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public List<Expr> getMarkConjuncts(TableRef ref) {
|
||||
TupleId id = globalState.markTupleIdByInnerRef.get(ref);
|
||||
if (id == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return getAllConjuncts(id);
|
||||
globalState.markTupleIdsNotProcessed.remove(id);
|
||||
List<Expr> retExprs = Lists.newArrayList();
|
||||
List<Expr> exprs = getAllConjuncts(id);
|
||||
for (Expr expr : exprs) {
|
||||
List<TupleId> tupleIds = Lists.newArrayList();
|
||||
expr.getIds(tupleIds, null);
|
||||
if (tupleIds.stream().noneMatch(globalState.markTupleIdsNotProcessed::contains)) {
|
||||
retExprs.add(expr);
|
||||
}
|
||||
}
|
||||
return retExprs;
|
||||
}
|
||||
|
||||
public TupleDescriptor getMarkTuple(TableRef ref) {
|
||||
|
||||
@ -365,10 +365,15 @@ public class StmtRewriter {
|
||||
List<Expr> subqueryExprInConjunct, List<Expr> subqueryExprInDisjunct) {
|
||||
if (!(expr instanceof CompoundPredicate)) {
|
||||
if (expr.contains(Subquery.class)) {
|
||||
// remove redundant sub-query by compare two sub-query with equals
|
||||
if (inDisjunct) {
|
||||
subqueryExprInDisjunct.add(expr);
|
||||
if (!subqueryExprInDisjunct.contains(expr)) {
|
||||
subqueryExprInDisjunct.add(expr);
|
||||
}
|
||||
} else {
|
||||
subqueryExprInConjunct.add(expr);
|
||||
if (!subqueryExprInConjunct.contains(expr)) {
|
||||
subqueryExprInConjunct.add(expr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -432,10 +437,12 @@ public class StmtRewriter {
|
||||
* ON $a$1.a = T1.a
|
||||
* WHERE T1.c < 10;
|
||||
*/
|
||||
// TODO(mark join) need support mark join
|
||||
private static void rewriteWhereClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
|
||||
throws AnalysisException {
|
||||
int numTableRefs = stmt.fromClause.size();
|
||||
// we must use two same set structs to process conjuncts and disjuncts
|
||||
// because the same sub-query could appear in both at the same time.
|
||||
// if we use one ExprSubstitutionMap, the sub-query will be replaced by wrong expr.
|
||||
ArrayList<Expr> exprsWithSubqueriesInConjuncts = Lists.newArrayList();
|
||||
ArrayList<Expr> exprsWithSubqueriesInDisjuncts = Lists.newArrayList();
|
||||
ExprSubstitutionMap conjunctsSmap = new ExprSubstitutionMap();
|
||||
@ -445,7 +452,6 @@ public class StmtRewriter {
|
||||
List<Expr> subqueryInDisjunct = Lists.newArrayList();
|
||||
// Check if all the conjuncts in the WHERE clause that contain subqueries
|
||||
// can currently be rewritten as a join.
|
||||
// TODO(mark join) traverse expr tree to process subquery.
|
||||
extractExprWithSubquery(false, stmt.whereClause, subqueryInConjunct, subqueryInDisjunct);
|
||||
for (Expr conjunct : subqueryInConjunct) {
|
||||
processOneSubquery(stmt, exprsWithSubqueriesInConjuncts,
|
||||
@ -458,8 +464,7 @@ public class StmtRewriter {
|
||||
stmt.whereClause = stmt.whereClause.substitute(conjunctsSmap, disjunctsSmap, analyzer, false);
|
||||
|
||||
boolean hasNewVisibleTuple = false;
|
||||
// Recursively equal all the exprs that contain subqueries and merge them
|
||||
// with 'stmt'.
|
||||
// Recursively equal all the exprs that contain subqueries and merge them with 'stmt'.
|
||||
for (Expr expr : exprsWithSubqueriesInConjuncts) {
|
||||
if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, null)) {
|
||||
hasNewVisibleTuple = true;
|
||||
@ -515,7 +520,6 @@ public class StmtRewriter {
|
||||
// Replace all the supported exprs with subqueries with true BoolLiterals
|
||||
// using a smap.
|
||||
if (isMark) {
|
||||
// TODO(mark join) if need mark join, we should replace a SlotRef instead of BoolLiteral
|
||||
TupleDescriptor markTuple = analyzer.getDescTbl().createTupleDescriptor();
|
||||
markTuple.setAliases(new String[]{stmt.getTableAliasGenerator().getNextAlias()}, true);
|
||||
SlotDescriptor markSlot = analyzer.addSlotDescriptor(markTuple);
|
||||
@ -840,9 +844,6 @@ public class StmtRewriter {
|
||||
&& ((ExistsPredicate) expr).isNotExists()) {
|
||||
// For the case of a NOT IN with an eq join conjunct, replace the join
|
||||
// conjunct with a conjunct that uses the null-matching eq operator.
|
||||
// TODO: mark join only works on nested loop join now, and NLJ do NOT support NULL_AWARE_LEFT_ANTI_JOIN
|
||||
// remove markTuple == null when nested loop join support NULL_AWARE_LEFT_ANTI_JOIN
|
||||
// or plan mark join on hash join
|
||||
if (expr instanceof InPredicate && markTuple == null) {
|
||||
joinOp = VectorizedUtil.isVectorized()
|
||||
? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN : JoinOperator.LEFT_ANTI_JOIN;
|
||||
|
||||
@ -94,8 +94,11 @@ public class TableRef implements ParseNode, Writable {
|
||||
// Indicates whether this table ref is given an explicit alias,
|
||||
protected boolean hasExplicitAlias;
|
||||
protected JoinOperator joinOp;
|
||||
// for mark join
|
||||
protected boolean isMark;
|
||||
// we must record mark tuple name for re-analyze
|
||||
protected String markTupleName;
|
||||
|
||||
protected List<String> usingColNames;
|
||||
protected ArrayList<LateralViewRef> lateralViewRefs;
|
||||
protected Expr onClause;
|
||||
|
||||
@ -758,7 +758,7 @@ public class HashJoinNode extends JoinNodeBase {
|
||||
StringBuilder output =
|
||||
new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(")
|
||||
.append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n");
|
||||
|
||||
output.append(detailPrefix).append("is mark: ").append(isMarkJoin()).append("\n");
|
||||
if (detailLevel == TExplainLevel.BRIEF) {
|
||||
output.append(detailPrefix).append(
|
||||
String.format("cardinality=%,d", cardinality)).append("\n");
|
||||
|
||||
@ -114,7 +114,6 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
}
|
||||
|
||||
protected void computeOutputTuple(Analyzer analyzer) throws UserException {
|
||||
// TODO(mark join) if it is mark join use mark tuple instead?
|
||||
// 1. create new tuple
|
||||
vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
|
||||
boolean copyLeft = false;
|
||||
@ -208,6 +207,15 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add mark slot if needed
|
||||
if (isMarkJoin() && analyzer.needPopUpMarkTuple(innerRef)) {
|
||||
SlotDescriptor markSlot = analyzer.getMarkTuple(innerRef).getSlots().get(0);
|
||||
SlotDescriptor outputSlotDesc =
|
||||
analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, markSlot);
|
||||
srcTblRefToOutputTupleSmap.put(new SlotRef(markSlot), new SlotRef(outputSlotDesc));
|
||||
}
|
||||
|
||||
// 2. compute srcToOutputMap
|
||||
vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap, analyzer);
|
||||
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
|
||||
@ -220,6 +228,7 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
rSlotRef.getDesc().setIsMaterialized(true);
|
||||
}
|
||||
}
|
||||
|
||||
vOutputTupleDesc.computeStatAndMemLayout();
|
||||
// 3. add tupleisnull in null-side
|
||||
Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size());
|
||||
@ -313,7 +322,7 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void projectOutputTuple() throws NotImplementedException {
|
||||
public void projectOutputTuple() {
|
||||
if (vOutputTupleDesc == null) {
|
||||
return;
|
||||
}
|
||||
@ -343,15 +352,19 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
|
||||
protected abstract void computeOtherConjuncts(Analyzer analyzer, ExprSubstitutionMap originToIntermediateSmap);
|
||||
|
||||
protected void computeIntermediateTuple(Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException {
|
||||
protected void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException {
|
||||
// 1. create new tuple
|
||||
TupleDescriptor vIntermediateLeftTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
|
||||
TupleDescriptor vIntermediateRightTupleDesc = analyzer.getDescTbl().createTupleDescriptor();
|
||||
vIntermediateTupleDescList = new ArrayList<>();
|
||||
vIntermediateTupleDescList.add(vIntermediateLeftTupleDesc);
|
||||
vIntermediateTupleDescList.add(vIntermediateRightTupleDesc);
|
||||
if (markTuple != null) {
|
||||
vIntermediateTupleDescList.add(markTuple);
|
||||
// if join type is MARK, add mark tuple to intermediate tuple. mark slot will be generated after join.
|
||||
if (isMarkJoin()) {
|
||||
TupleDescriptor markTuple = analyzer.getMarkTuple(innerRef);
|
||||
if (markTuple != null) {
|
||||
vIntermediateTupleDescList.add(markTuple);
|
||||
}
|
||||
}
|
||||
boolean leftNullable = false;
|
||||
boolean rightNullable = false;
|
||||
@ -454,11 +467,7 @@ public abstract class JoinNodeBase extends PlanNode {
|
||||
public void finalize(Analyzer analyzer) throws UserException {
|
||||
super.finalize(analyzer);
|
||||
if (VectorizedUtil.isVectorized()) {
|
||||
TupleDescriptor markTuple = null;
|
||||
if (innerRef != null) {
|
||||
markTuple = analyzer.getMarkTuple(innerRef);
|
||||
}
|
||||
computeIntermediateTuple(analyzer, markTuple);
|
||||
computeIntermediateTuple(analyzer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -228,6 +228,7 @@ public class NestedLoopJoinNode extends JoinNodeBase {
|
||||
StringBuilder output =
|
||||
new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(")
|
||||
.append(distrModeStr).append(")\n");
|
||||
output.append(detailPrefix).append("is mark: ").append(isMarkJoin()).append("\n");
|
||||
|
||||
if (detailLevel == TExplainLevel.BRIEF) {
|
||||
output.append(detailPrefix).append(
|
||||
|
||||
Reference in New Issue
Block a user