From 5bf84814cc6f7ac78d62bfe69ec6f63004818702 Mon Sep 17 00:00:00 2001 From: ccoffline <45881148+ccoffline@users.noreply.github.com> Date: Sat, 19 Dec 2020 11:16:59 +0800 Subject: [PATCH] [Doc] Improve broadcast instructions (#5048) --- docs/en/getting-started/advance-usage.md | 4 +++- docs/zh-CN/getting-started/advance-usage.md | 4 +++- .../org/apache/doris/planner/HashJoinNode.java | 17 +++++++---------- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/en/getting-started/advance-usage.md b/docs/en/getting-started/advance-usage.md index c10dae55ab..81624891a1 100644 --- a/docs/en/getting-started/advance-usage.md +++ b/docs/en/getting-started/advance-usage.md @@ -209,7 +209,9 @@ Modify the timeout to 1 minute: By default, the system implements Join by conditionally filtering small tables, broadcasting them to the nodes where the large tables are located, forming a memory Hash table, and then streaming out the data of the large tables Hash Join. However, if the amount of data filtered by small tables can not be put into memory, Join will not be able to complete at this time. The usual error should be caused by memory overrun first. -If you encounter the above situation, it is recommended to use Shuffle Join, also known as Partitioned Join. That is, small and large tables are Hash according to Join's key, and then distributed Join. This memory consumption is allocated to all computing nodes in the cluster. +If you encounter the above situation, it is recommended to use Shuffle Join explicitly, also known as Partitioned Join. That is, small and large tables are Hash according to Join's key, and then distributed Join. This memory consumption is allocated to all computing nodes in the cluster. + +Doris will try to use Broadcast Join first. If small tables are too large to broadcast, Doris will switch to Shuffle Join automatically. Note that if you use Broadcast Join explicitly in this case, Doris will still switch to Shuffle Join automatically. 
Use Broadcast Join (default): diff --git a/docs/zh-CN/getting-started/advance-usage.md b/docs/zh-CN/getting-started/advance-usage.md index fc9f4104a9..85965370a6 100644 --- a/docs/zh-CN/getting-started/advance-usage.md +++ b/docs/zh-CN/getting-started/advance-usage.md @@ -209,7 +209,9 @@ mysql> SHOW VARIABLES LIKE "%query_timeout%"; 系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。 -如果遇到上述情况,建议使用 Shuffle Join 的方式,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。 +如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。 + +Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。 使用 Broadcast Join(默认): diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 17683073f2..f080a83a74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -275,22 +275,19 @@ public class HashJoinNode extends PlanNode { protected String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) { String distrModeStr = (distrMode != DistributionMode.NONE) ? (" (" + distrMode.toString() + ")") : ""; - StringBuilder output = new StringBuilder().append( - detailPrefix + "join op: " + joinOp.toString() + distrModeStr + "\n").append( - detailPrefix + "hash predicates:\n"); - - output.append(detailPrefix + "colocate: " + isColocate + (isColocate? 
"" : ", reason: " + colocateReason) + "\n"); + StringBuilder output = new StringBuilder() + .append(detailPrefix).append("join op: ").append(joinOp.toString()).append(distrModeStr).append("\n") + .append(detailPrefix).append("hash predicates:\n") + .append(detailPrefix).append("colocate: ").append(isColocate).append(isColocate ? "" : ", reason: " + colocateReason).append("\n"); for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) { - output.append(detailPrefix).append("equal join conjunct: ").append(eqJoinPredicate.toSql() + "\n"); + output.append(detailPrefix).append("equal join conjunct: ").append(eqJoinPredicate.toSql()).append("\n"); } if (!otherJoinConjuncts.isEmpty()) { - output.append(detailPrefix + "other join predicates: ").append( - getExplainString(otherJoinConjuncts) + "\n"); + output.append(detailPrefix).append("other join predicates: ").append(getExplainString(otherJoinConjuncts)).append("\n"); } if (!conjuncts.isEmpty()) { - output.append(detailPrefix + "other predicates: ").append( - getExplainString(conjuncts) + "\n"); + output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n"); } return output.toString(); }