Files
oceanbase/docs/docs-cn/5.overview/8.sql-engine/6.distributed-execution-planning-and-scheduling/5.distributed-aggregate-pressure-reduction.md
2022-02-10 14:51:49 +08:00

3.8 KiB

分布式聚合下压

总的来看,聚合算子的下压可以分为 Partition-Wise Aggregation 计划和下压加两阶段聚合计划。

Partition-Wise Aggregation

如果聚合查询的语意是按照分区键去做分组且在分组内做聚合,那么这样的聚合操作称为 Partition-Wise Aggregation。所有的操作都被下压到分区内并行的执行。需要注意的是,Partition-Wise Aggregation 不一定是最优的执行,因为并行度会受限于分区的数量。

explain select sum(c2) from t2 group by c1\G;
*************************** 1. row ***************************
Query Plan: 
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |1000     |2834|
|1 | EXCHANGE OUT DISTR|    |1000     |2740|
|2 |  HASH GROUP BY    |    |1000     |2740|
|3 |   TABLE SCAN      |t2  |4000     |499 |
============================================

Outputs & filters: 
-------------------------------------
  0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  2 - output([T_FUN_SUM(t2.c2)]), filter(nil), 
      group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
  3 - output([t2.c1], [t2.c2]), filter(nil), 
      access([t2.c1], [t2.c2]), partitions(p[0-3])

下压加两阶段聚合

在更一般的情况下,aggregation 操作可能不是按照分区键作为分组来做的,在这样的情况下,OceanBase 数据库会采用两阶段聚合的方式,如下例所示。

  1. 将 aggregation 操作下压,获得部分的聚合结果。

  2. 进行汇总的方式获得最终结果。

explain select sum(c1) from t2 group by c2\G;
*************************** 1. row ***************************
Query Plan: 
=============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY       |    |1000     |3395|
|1 | EXCHANGE IN DISTR  |    |1000     |2834|
|2 |  EXCHANGE OUT DISTR|    |1000     |2740|
|3 |   HASH GROUP BY    |    |1000     |2740|
|4 |    TABLE SCAN      |t2  |4000     |499 |
=============================================

Outputs & filters: 
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil), 
      group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
  1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil), 
      group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
  4 - output([t2.c1], [t2.c2]), filter(nil), 
      access([t2.c1], [t2.c2]), partitions(p[0-3])
explain select sum(c1) from t2 group by c2\G;
*************************** 1. row ***************************
Query Plan: 
=============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY       |    |1000     |3395|
|1 | EXCHANGE IN DISTR  |    |1000     |2834|
|2 |  EXCHANGE OUT DISTR|    |1000     |2740|
|3 |   HASH GROUP BY    |    |1000     |2740|
|4 |    TABLE SCAN      |t2  |4000     |499 |
=============================================

Outputs & filters: 
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil), 
      group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
  1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil), 
      group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
  4 - output([t2.c1], [t2.c2]), filter(nil), 
      access([t2.c1], [t2.c2]), partitions(p[0-3])