Distributed Execution and Parallel Queries
===============================

#### Scenarios for using partitioned tables

If a table is small, there is no need to partition it. If a table is large, the partition key should be chosen carefully based on the needs of the application, so that most queries can use it for partition pruning and read less data. For tables that are frequently joined together, it is recommended to use the join key as the partition key, partition the tables in the same way, and use a table group to place the matching partitions on the same nodes, which reduces cross-node data transfer. The OceanBase Database optimizer then automatically generates a distributed execution plan based on the query and the physical distribution of the data.
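
As a concrete illustration of this advice, the sketch below creates two related tables that share a partitioning scheme and a table group. The table group name, table names, and columns are hypothetical, and the exact `CREATE TABLEGROUP` / `TABLEGROUP` syntax may vary across OceanBase Database versions.

```sql
-- Hypothetical schema: co-locate orders and order_items so that joins on
-- order_id can be executed partition by partition without cross-node traffic.
CREATE TABLEGROUP tg_order;

CREATE TABLE orders (
  order_id BIGINT PRIMARY KEY,
  user_id  BIGINT,
  amount   DECIMAL(10, 2)
) TABLEGROUP = tg_order
  PARTITION BY HASH(order_id) PARTITIONS 4;

CREATE TABLE order_items (
  order_id BIGINT,
  item_id  BIGINT,
  price    DECIMAL(10, 2),
  PRIMARY KEY (order_id, item_id)
) TABLEGROUP = tg_order
  PARTITION BY HASH(order_id) PARTITIONS 4;
```

A join such as `SELECT ... FROM orders o JOIN order_items i ON o.order_id = i.order_id` can then be pruned and joined within matching partitions.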

Overview of parallel query
---------------

Parallel query reshapes a query plan so that more CPU and I/O capacity is applied to a single query, reducing its response time. Parallel query techniques can be used to execute both distributed plans and local plans.

When the data accessed by a single query does not all reside on one node, it has to be redistributed so that related data ends up on the same compute nodes. Using each data-redistribution point as a boundary, OceanBase Database splits the execution plan into jobs, and each job can be split into as many tasks as the degree of parallelism and executed concurrently to improve efficiency.

Typically, raising the degree of parallelism shortens the query's response time, because more CPU, I/O, and memory resources are devoted to executing it. The improvement is most visible for decision-support systems and data-warehouse applications that process large volumes of data.
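
The degree of parallelism for a single statement is typically requested with a hint. The sketch below uses the `PARALLEL` hint supported by OceanBase Database; the table name and the DOP value of 8 are illustrative.

```sql
-- Ask for a degree of parallelism of 8 for this query only;
-- more CPU and I/O is applied, which shortens the response time of large scans.
SELECT /*+ PARALLEL(8) */ c1, SUM(c2)
FROM t2
GROUP BY c1;
```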

Overall, parallel query follows the same idea as distributed plan execution. The plan is decomposed, but instead of a single thread executing the whole plan serially, each part of the plan is handed to multiple worker threads, and a scheduling mechanism provides both concurrency between jobs and concurrency inside each job.

In online transaction processing (OLTP) scenarios, parallel execution also applies to operations such as batch updates and creating or maintaining indexes.
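
As a sketch of the OLTP-side uses mentioned above, a batch update can be parallelized with hints. The `ENABLE_PARALLEL_DML` and `PARALLEL` hints shown here exist in recent OceanBase Database versions, but their availability and exact behavior depend on the version; the table, column, and predicate are illustrative.

```sql
-- Parallel batch update: the statement runs with DOP 8,
-- and the hint enables parallel DML for this statement.
UPDATE /*+ ENABLE_PARALLEL_DML PARALLEL(8) */ orders
SET    status = 'ARCHIVED'
WHERE  order_date < '2020-01-01';
```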

Parallel query can improve performance in the following scenarios:

* Queries that join or scan large tables, or scan partitioned index tables

* Creation of large indexes

* Batch DML operations

#### Scenarios suited to parallel query

Parallel query is most effective when the following conditions hold (see the sketch after this list):

* I/O bandwidth is plentiful

* The system CPU load is low

* Enough memory is available to satisfy the needs of the parallel query
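
One practical way to reason about these resource conditions is to look at how many parallel workers the tenant is allowed to run. The sketch below reads and adjusts the `parallel_servers_target` variable, which OceanBase Database uses to cap the number of parallel execution workers; the value 64 is illustrative and the variable's scope may differ between versions.

```sql
-- Check the current cap on parallel execution workers for the tenant.
SHOW VARIABLES LIKE 'parallel_servers_target';

-- Raise the cap only if CPU, I/O, and memory headroom is available (value illustrative).
SET GLOBAL parallel_servers_target = 64;
```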

If the system does not have spare resources for the extra parallel work, enabling parallel query or raising the degree of parallelism will not improve performance. On the contrary, on an overloaded system the operating system is forced into more scheduling, context switching, and paging, which can degrade performance further.

Parallel execution usually improves response times in decision support systems (DSS) and data-warehouse environments, where large numbers of partitions are accessed. OLTP systems typically benefit during batch DML operations or schema maintenance, such as index creation. For simple DML statements, single-partition queries, or queries that touch only a few partitions, parallel query does not noticeably reduce response time.

Note also that to get the best performance out of parallel query, every component of the system has to be configured in concert, because a bottleneck in any single component becomes the limiting factor for the whole system.

The following query operations can all use parallel query (see the sketch after this list):

* Full table scans with the various access methods (both across partitions and within a partition) and index table scans.

* The table join methods, including NESTED LOOP JOIN, MERGE JOIN, and HASH JOIN.

* Other SQL operations, including aggregations such as GROUP BY, DISTINCT, and SUM, and pushdown of the LIMIT operator.
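
The sketch below is a single query that touches several of the operations listed above: full scans of two tables, a hash-style join, a GROUP BY aggregation, and a LIMIT, all under a requested degree of parallelism. It reuses the illustrative t2/t3 tables that appear in the examples later in this document.

```sql
-- Scans of t2 and t3, the join, and the aggregation can all be parallelized.
SELECT /*+ PARALLEL(4) */ t2.c1, COUNT(*) AS cnt, SUM(t3.c2) AS total
FROM   t2
JOIN   t3 ON t2.c1 = t3.c1
GROUP  BY t2.c1
ORDER  BY t2.c1
LIMIT  100;
```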

Generating a distributed plan
=============================

The OceanBase Database optimizer generates a distributed execution plan in two phases.

1. In the first phase, the optimizer ignores the physical distribution of the data and generates the optimal plan based purely on local relational optimization. Once the local plan is ready, it checks whether the query accesses more than one partition, or whether the table is a local single-partition table but the user has forced parallel execution with a HINT (see the sketch after this list).

2. In the second phase, the distributed plan is produced: walking the plan tree, the optimizer inserts EXCHANGE nodes wherever data has to be redistributed, turning the local plan tree into a distributed plan.
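
As a sketch of the forced-parallel case in step 1, a hint can request a parallel plan even when the query touches only a single local partition. The `PARALLEL` hint is shown here; the table name and DOP value are illustrative.

```sql
-- t1 is assumed to be a non-partitioned (or single-partition) table;
-- the hint still asks the optimizer for a parallel plan with DOP 2.
SELECT /*+ PARALLEL(2) */ COUNT(*) FROM t1;
```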

Glossary
---------------

* job

  A distributed plan is split, at each data-redistribution boundary, into logical sub-plans that can execute in parallel; each sub-plan is wrapped in a job.

* root job

  The job at the top of the job tree.

* ordinary job

  Any other job in the job tree.

* Intermediate Result Manager

  Caches the data that needs to be passed between jobs.

* EXCHANGE operator

  The operator used to transfer data across jobs. It comes in two kinds: an EXCHANGE OUT operator on the producing side, which sends the data a job outputs, and an EXCHANGE IN operator on the consuming side, which receives it.

* task

  Each logical sub-plan (job) can execute in parallel; each parallel unit of execution is called a task.

* main thread

  The thread that receives the client request and returns the result; it executes the root job.

* worker thread

  Executes the tasks of ordinary jobs.

* scheduler thread

  Schedules the jobs.

Operators in a distributed plan
-------------------

Generating a distributed plan is the process of finding the right places in the original plan tree to insert EXCHANGE operators. While traversing the plan tree top-down, the optimizer decides whether an EXCHANGE operator is needed based on how each operator processes data and how its input data is partitioned.

The simplest example is a single-table scan:

```sql
explain select * from t2\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |4000     |878 |
|1 | EXCHANGE OUT DISTR|    |4000     |499 |
|2 |  TABLE SCAN       |t2  |4000     |499 |
============================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2]), filter(nil)
  1 - output([t2.c1], [t2.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```

When t2 is a partitioned table, a matching pair of EXCHANGE operators can be inserted above the table scan, packaging TABLE SCAN and EXCHANGE OUT into one job that can be executed in parallel.

#### **Single-input pushdown operators**

Single-input pushdown operators mainly include the AGGREGATION, SORT, GROUP BY, and LIMIT operators. Except for LIMIT, each of these operators has an operation key. If the operation key matches the distribution of the input data, the aggregation can be done in one phase, that is, as a Partition-Wise Aggregation. If the operation key does not match the input distribution, a two-phase aggregation is needed and the aggregation operator has to be pushed down.

One-phase aggregation, as in the following example:

```sql
explain select sum(c2) from t2 group by c1\G;
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |1000     |2834|
|1 | EXCHANGE OUT DISTR|    |1000     |2740|
|2 |  HASH GROUP BY    |    |1000     |2740|
|3 |   TABLE SCAN      |t2  |4000     |499 |
============================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
      group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```

Two-phase aggregation, as in the following example:

```sql
explain select sum(c1) from t2 group by c2\G
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY       |    |1000     |3395|
|1 | EXCHANGE IN DISTR  |    |1000     |2834|
|2 |  EXCHANGE OUT DISTR|    |1000     |2740|
|3 |   HASH GROUP BY    |    |1000     |2740|
|4 |    TABLE SCAN      |t2  |4000     |499 |
=============================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
      group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
  1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
      group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
  4 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```

#### **Binary-input operators**

In the current version of OceanBase Database, the main case to consider is the JOIN operator. A JOIN can be executed in three main ways:

* PARTITION-WISE JOIN: when both tables are partitioned the same way, have the same physical distribution, and the join condition is on the partition key, the join can be executed partition by partition, as in the following example:

```sql
explain select * from t2, t3 where t2.c1 = t3.c1\G
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST |
---------------------------------------------
|0 |EXCHANGE IN DISTR  |    |31680    |35075|
|1 | EXCHANGE OUT DISTR|    |31680    |29077|
|2 |  HASH JOIN        |    |31680    |29077|
|3 |   TABLE SCAN      |t2  |4000     |499  |
|4 |   TABLE SCAN      |t3  |4000     |499  |
=============================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
      equal_conds([t2.c1 = t3.c1]), other_conds(nil)
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
  4 - output([t3.c1], [t3.c2]), filter(nil),
      access([t3.c1], [t3.c2]), partitions(p[0-3])
```

* PARTIAL PARTITION-WISE JOIN: when one of the two tables is partitioned and the other is not, or both are partitioned but the join key matches the partition key of only one of them, the data of the other table is redistributed to follow that table's partitioning, as in the following example:

```sql
explain select * from t1, t2 where t1.c1 = t2.c1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR                     |NAME|EST. ROWS|COST|
------------------------------------------------------
|0 |EXCHANGE IN DISTR            |    |24       |1977|
|1 | EXCHANGE OUT DISTR          |    |24       |1973|
|2 |  HASH JOIN                  |    |24       |1973|
|3 |   EXCHANGE IN DISTR         |    |3        |37  |
|4 |    EXCHANGE OUT DISTR (PKEY)|    |3        |37  |
|5 |     TABLE SCAN              |t1  |3        |37  |
|6 |   TABLE SCAN                |t2  |4000     |499 |
======================================================

Outputs & filters:
-------------------------------------
  0 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil)
  1 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil)
  2 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil),
      equal_conds([t1.c1 = t2.c1]), other_conds(nil)
  3 - output([t1.c1], [t1.c2]), filter(nil)
  4 - (#keys=1, [t1.c1]), output([t1.c1], [t1.c2]), filter(nil)
  5 - output([t1.c1], [t1.c2]), filter(nil),
      access([t1.c1], [t1.c2]), partitions(p0)
  6 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```

* Both tables redistributed: when the join key is unrelated to the partition key of either table, due to current implementation limits the plan redistributes the data of both tables onto a single compute node and performs the join there. In a future version this kind of JOIN will also be executed fully in parallel. The generated plan looks like the following example:

```sql
explain select * from t2, t3 where t2.c2 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST |
----------------------------------------------
|0 |HASH JOIN           |    |31680    |29835|
|1 | EXCHANGE IN DISTR  |    |4000     |878  |
|2 |  EXCHANGE OUT DISTR|    |4000     |499  |
|3 |   TABLE SCAN       |t2  |4000     |499  |
|4 | EXCHANGE IN DISTR  |    |4000     |878  |
|5 |  EXCHANGE OUT DISTR|    |4000     |499  |
|6 |   TABLE SCAN       |t3  |4000     |499  |
==============================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
      equal_conds([t2.c2 = t3.c2]), other_conds(nil)
  1 - output([t2.c1], [t2.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2]), filter(nil)
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
  4 - output([t3.c1], [t3.c2]), filter(nil)
  5 - output([t3.c1], [t3.c2]), filter(nil)
  6 - output([t3.c1], [t3.c2]), filter(nil),
      access([t3.c1], [t3.c2]), partitions(p[0-3])
```

Scheduling a distributed execution plan
=========

The basic scheduling model is as follows. In the final phase of plan generation, the plan is split at EXCHANGE nodes into sub-plans, and each sub-plan is wrapped in a DFO. When the degree of parallelism is greater than 1, two DFOs are scheduled at a time, and the DFO tree is traversed and executed step by step. When the degree of parallelism is 1, each DFO writes its output to the Intermediate Result Manager, and the DFO tree is traversed and executed in post-order.

Single-DFO scheduling example
-------------------

With a degree of parallelism of 1, single-DFO scheduling is used for a query plan such as the following:

```sql
======================================================================================
|ID|OPERATOR                             |NAME                 |EST. ROWS |COST      |
--------------------------------------------------------------------------------------
|0 |LIMIT                                |                     |10        |6956829987|
|1 | PX COORDINATOR MERGE SORT           |                     |10        |6956829985|
|2 |  EXCHANGE OUT DISTR                 |:EX10002             |10        |6956829976|
|3 |   LIMIT                             |                     |10        |6956829976|
|4 |    TOP-N SORT                       |                     |10        |6956829975|
|5 |     HASH GROUP BY                   |                     |454381562 |5815592885|
|6 |      HASH JOIN                      |                     |500918979 |5299414557|
|7 |       EXCHANGE IN DISTR             |                     |225943610 |2081426759|
|8 |        EXCHANGE OUT DISTR (PKEY)    |:EX10001             |225943610 |1958446695|
|9 |         MATERIAL                    |                     |225943610 |1958446695|
|10|          HASH JOIN                  |                     |225943610 |1480989849|
|11|           JOIN FILTER CREATE        |                     |30142669  |122441311 |
|12|            PX PARTITION ITERATOR    |                     |30142669  |122441311 |
|13|             TABLE SCAN              |CUSTOMER             |30142669  |122441311 |
|14|           EXCHANGE IN DISTR         |                     |731011898 |900388059 |
|15|            EXCHANGE OUT DISTR (PKEY)|:EX10000             |731011898 |614947815 |
|16|             JOIN FILTER USE         |                     |731011898 |614947815 |
|17|              PX BLOCK ITERATOR      |                     |731011898 |614947815 |
|18|               TABLE SCAN            |ORDERS               |731011898 |614947815 |
|19|       PX PARTITION ITERATOR         |                     |3243094528|1040696710|
|20|        TABLE SCAN                   |LINEITEM(I_L_Q06_001)|3243094528|1040696710|
======================================================================================
```

Excluding the ROOT DFO, the DFO tree is divided vertically into DFOs 0, 1, and 2, so a post-order traversal schedules them in the order 0 -> 1 -> 2, which completes the iteration over the whole plan tree.

Two-DFO scheduling example
-------------------

When the degree of parallelism is greater than 1, two-DFO scheduling is used, as for the following query plan:

```sql
Query Plan
=============================================================================
|ID|OPERATOR                                   |NAME    |EST. ROWS|COST     |
-----------------------------------------------------------------------------
|0 |PX COORDINATOR MERGE SORT                  |        |9873917  |692436562|
|1 | EXCHANGE OUT DISTR                        |:EX10002|9873917  |689632565|
|2 |  SORT                                     |        |9873917  |689632565|
|3 |   SUBPLAN SCAN                            |VIEW5   |9873917  |636493382|
|4 |    WINDOW FUNCTION                        |        |29621749 |629924873|
|5 |     HASH GROUP BY                         |        |29621749 |624266752|
|6 |      HASH JOIN                            |        |31521003 |591048941|
|7 |       JOIN FILTER CREATE                  |        |407573   |7476793  |
|8 |        EXCHANGE IN DISTR                  |        |407573   |7476793  |
|9 |         EXCHANGE OUT DISTR (BROADCAST)    |:EX10001|407573   |7303180  |
|10|          HASH JOIN                        |        |407573   |7303180  |
|11|           JOIN FILTER CREATE              |        |1        |53       |
|12|            EXCHANGE IN DISTR              |        |1        |53       |
|13|             EXCHANGE OUT DISTR (BROADCAST)|:EX10000|1        |53       |
|14|              PX BLOCK ITERATOR            |        |1        |53       |
|15|               TABLE SCAN                  |NATION  |1        |53       |
|16|           JOIN FILTER USE                 |        |10189312 |3417602  |
|17|            PX BLOCK ITERATOR              |        |10189312 |3417602  |
|18|             TABLE SCAN                    |SUPPLIER|10189312 |3417602  |
|19|       JOIN FILTER USE                     |        |803481600|276540086|
|20|        PX PARTITION ITERATOR              |        |803481600|276540086|
|21|         TABLE SCAN                        |PARTSUPP|803481600|276540086|
=============================================================================
```

Excluding the ROOT DFO, the DFO tree is divided into 3 DFOs. The scheduler first starts the DFOs numbered 0 and 1; once DFO 0 finishes, it schedules DFOs 1 and 2, and iterates in this way until execution completes.

Distributed joins
==========================

This section describes the typical distributed join scenarios supported by OceanBase Database.

PARTITION-WISE JOIN scenario
-------------------------------

A PARTITION-WISE JOIN is a join whose condition includes the partition keys of both tables, with both tables partitioned in the same way.

In the example below, t2 and t3 are both key-partitioned on column c1 into 4 partitions, with identical partitioning. When the query condition is `t2.c1=t3.c1 and t2.c2=t3.c2`, the condition fully covers the partition keys, so the query can be executed within each partition. With a degree of parallelism of 4, the joins of all 4 partition pairs can run at the same time and the combined result is returned.

```sql
create table t2 (c1 int, c2 int) partition by key(c1) partitions 4;
create table t3 (c1 int, c2 int) partition by key(c1) partitions 4;
```

```sql
explain select * from t2, t3 where t2.c1 = t3.c1 and t2.c2=t3.c2\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |63       |8374|
|1 | EXCHANGE OUT DISTR|    |63       |8362|
|2 |  HASH JOIN        |    |63       |8362|
|3 |   TABLE SCAN      |t2  |4000     |499 |
|4 |   TABLE SCAN      |t3  |4000     |499 |
============================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
      equal_conds([t2.c1 = t3.c1], [t2.c2 = t3.c2]), other_conds(nil)
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
  4 - output([t3.c1], [t3.c2]), filter(nil),
      access([t3.c1], [t3.c2]), partitions(p[0-3])
```

Redistributing one table according to the other table's partitioning
-------------------------------

Again using t2 and t3, in the example below the JOIN condition covers t2's partition key but not t3's. The plan therefore regroups t3's data by partition key according to t2's partitioning: after the rows of t3 are redistributed by the value of c2 following the way t2 is partitioned on c1, the join can run in parallel across t2's partitions, partition by partition.

```sql
explain select * from t2, t3 where t2.c1 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
=======================================================
|ID|OPERATOR                     |NAME|EST. ROWS|COST |
-------------------------------------------------------
|0 |EXCHANGE IN DISTR            |    |31680    |35454|
|1 | EXCHANGE OUT DISTR          |    |31680    |29456|
|2 |  HASH JOIN                  |    |31680    |29456|
|3 |   TABLE SCAN                |t2  |4000     |499  |
|4 |   EXCHANGE IN DISTR         |    |4000     |878  |
|5 |    EXCHANGE OUT DISTR (PKEY)|    |4000     |499  |
|6 |     TABLE SCAN              |t3  |4000     |499  |
=======================================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
      equal_conds([t2.c1 = t3.c2]), other_conds(nil)
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
  4 - output([t3.c1], [t3.c2]), filter(nil)
  5 - (#keys=1, [t3.c2]), output([t3.c1], [t3.c2]), filter(nil)
  6 - output([t3.c1], [t3.c2]), filter(nil),
      access([t3.c1], [t3.c2]), partitions(p[0-3])
```

Both tables redistributed
--------------------

For the same t2 and t3, if the join condition contains neither t2's partition key nor t3's, the plan below is generated. Note that the JOIN part of this plan is executed in the main thread. In later OceanBase Database versions, the JOIN part of such plans will also be executed in parallel on worker threads.

```sql
explain select * from t2, t3 where t2.c2 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST |
----------------------------------------------
|0 |HASH JOIN           |    |31680    |29835|
|1 | EXCHANGE IN DISTR  |    |4000     |878  |
|2 |  EXCHANGE OUT DISTR|    |4000     |499  |
|3 |   TABLE SCAN       |t2  |4000     |499  |
|4 | EXCHANGE IN DISTR  |    |4000     |878  |
|5 |  EXCHANGE OUT DISTR|    |4000     |499  |
|6 |   TABLE SCAN       |t3  |4000     |499  |
==============================================

Outputs & filters:
-------------------------------------
  0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
      equal_conds([t2.c2 = t3.c2]), other_conds(nil)
  1 - output([t2.c1], [t2.c2]), filter(nil)
  2 - output([t2.c1], [t2.c2]), filter(nil)
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
  4 - output([t3.c1], [t3.c2]), filter(nil)
  5 - output([t3.c1], [t3.c2]), filter(nil)
  6 - output([t3.c1], [t3.c2]), filter(nil),
      access([t3.c1], [t3.c2]), partitions(p[0-3])
```

Distributed aggregation pushdown
============================

Overall, aggregation pushdown produces either a Partition-Wise Aggregation plan or a pushdown plan with two-phase aggregation.

Partition-Wise Aggregation
-----------------------------------

If the semantics of the aggregation are to group by the partition key and aggregate within each group, the operation is called a Partition-Wise Aggregation: everything is pushed down and executed in parallel within each partition. Note that a Partition-Wise Aggregation is not necessarily optimal, because its degree of parallelism is limited by the number of partitions.

```sql
explain select sum(c2) from t2 group by c1\G;
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |1000     |2834|
|1 | EXCHANGE OUT DISTR|    |1000     |2740|
|2 |  HASH GROUP BY    |    |1000     |2740|
|3 |   TABLE SCAN      |t2  |4000     |499 |
============================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
  2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
      group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
  3 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```

Pushdown plus two-phase aggregation
-----------------

In the more general case the aggregation does not group by the partition key. OceanBase Database then uses two-phase aggregation, as in the example below:

1. Push the aggregation down to obtain partial aggregate results.

2. Merge the partial results to obtain the final result.

```sql
explain select sum(c1) from t2 group by c2\G;
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY       |    |1000     |3395|
|1 | EXCHANGE IN DISTR  |    |1000     |2834|
|2 |  EXCHANGE OUT DISTR|    |1000     |2740|
|3 |   HASH GROUP BY    |    |1000     |2740|
|4 |    TABLE SCAN      |t2  |4000     |499 |
=============================================

Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
      group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
  1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
  3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
      group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
  4 - output([t2.c1], [t2.c2]), filter(nil),
      access([t2.c1], [t2.c2]), partitions(p[0-3])
```