[BUG] fix 4149, add sessionVariable to choose broadcastjoin first when cardinality cannot be estimated (#4150)

This commit is contained in:
wutiangan
2020-07-29 12:28:52 +08:00
committed by GitHub
parent 79b4f92cb7
commit 59676a1117
6 changed files with 59 additions and 1 deletions

View File

@ -338,3 +338,9 @@ SET forward_to_master = concat('tr', 'u', 'e');
* `rewrite_count_distinct_to_bitmap_hll`
Whether to rewrite count distinct queries of bitmap and HLL types as bitmap_union_count and hll_union_agg.
* `prefer_join_method`
When choosing the join method(broadcast join or shuffle join), if the broadcast join cost and shuffle join cost are equal, which join method should we prefer.
Currently, the optional values for this variable are "broadcast" or "shuffle".

View File

@ -338,3 +338,8 @@ SET forward_to_master = concat('tr', 'u', 'e');
是否将 bitmap 和 hll 类型的 count distinct 查询重写为 bitmap_union_count 和 hll_union_agg 。
* `prefer_join_method`
在选择join的具体实现方式是broadcast join还是shuffle join时,如果broadcast join cost和shuffle join cost相等时,优先选择哪种join方式。
目前该变量的可选值为"broadcast" 或者 "shuffle"。

View File

@ -280,6 +280,20 @@ public class DistributedPlanner {
}
}
/**
* When broadcastCost and partitionCost are equal, there is no uniform standard for which join implementation is better.
* Some scenarios are suitable for broadcast join, and some scenarios are suitable for shuffle join.
* Therefore, we add a SessionVariable to help users choose a better join implementation.
*/
private boolean isBroadcastCostSmaller(long broadcastCost, long partitionCost) {
String joinMethod = ConnectContext.get().getSessionVariable().getPreferJoinMethod();
if (joinMethod.equalsIgnoreCase("broadcast")) {
return broadcastCost <= partitionCost;
} else {
return broadcastCost < partitionCost;
}
}
/**
* Creates either a broadcast join or a repartitioning join, depending on the expected cost. If any of the inputs to
* the cost computation is unknown, it assumes the cost will be 0. Costs being equal, it'll favor partitioned over
@ -351,7 +365,7 @@ public class DistributedPlanner {
&& (perNodeMemLimit == 0 || Math.round(
(double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) <= perNodeMemLimit)
&& (node.getInnerRef().isBroadcastJoin() || (!node.getInnerRef().isPartitionJoin()
&& broadcastCost < partitionCost))) {
&& isBroadcastCostSmaller(broadcastCost, partitionCost)))) {
doBroadcast = true;
} else {
doBroadcast = false;

View File

@ -74,6 +74,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
public static final String ENABLE_SPILLING = "enable_spilling";
public static final String PREFER_JOIN_METHOD = "prefer_join_method";
public static final int MIN_EXEC_INSTANCE_NUM = 1;
public static final int MAX_EXEC_INSTANCE_NUM = 32;
// if set to true, some of stmt will be forwarded to master FE to get result
@ -209,6 +210,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = DISABLE_COLOCATE_JOIN)
private boolean disableColocateJoin = false;
@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
private String preferJoinMethod = "broadcast";
/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
@ -397,6 +401,10 @@ public class SessionVariable implements Serializable, Writable {
return disableColocateJoin;
}
public String getPreferJoinMethod() {return preferJoinMethod; }
public void setPreferJoinMethod(String preferJoinMethod) {this.preferJoinMethod = preferJoinMethod; }
public int getParallelExecInstanceNum() {
return parallelExecInstanceNum;
}

View File

@ -244,6 +244,12 @@ public class VariableMgr {
}
}
if (setVar.getVariable().toLowerCase().equals("prefer_join_method")) {
if (!value.toLowerCase().equals("broadcast") && !value.toLowerCase().equals("shuffle")) {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, "prefer_join_method", value);
}
}
if (setVar.getType() == SetType.GLOBAL) {
wlock.lock();
try {

View File

@ -907,4 +907,23 @@ public class QueryPlanTest {
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertFalse(explainString.contains("INNER JOIN (PARTITIONED)"));
}
@Test
public void testPreferBroadcastJoin() throws Exception {
connectContext.setDatabase("default_cluster:test");
String queryStr = "explain select * from (select k1 from jointest group by k1)t2, jointest t1 where t1.k1 = t2.k1";
// default set PreferBroadcastJoin true
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
connectContext.getSessionVariable().setPreferJoinMethod("shuffle");
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (PARTITIONED)"));
connectContext.getSessionVariable().setPreferJoinMethod("broadcast");
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Assert.assertTrue(explainString.contains("INNER JOIN (BROADCAST)"));
}
}