分布式计划的生成 ============================= OceanBase 数据库的优化器会分两阶段来生成分布式的执行计划。 1. 第一阶段,不考虑数据的物理分布,生成所有基于本地关系优化的最优执行计划。在本地计划生成后,优化器会检查数据是否访问了多个分区,或者是否是本地单分区表但是用户用 HINT 强制指定了采用并行查询执行。 2. 第二阶段,生成分布式计划。根据执行计划树,在需要进行数据重分布的地方,插入 EXCHANGE 节点,从而将原先的本地计划树变成分布式计划。 相关名词解释 --------------- * job 分布式计划以数据重分布点为边界,切分为可以并行执行的逻辑子计划,每个子计划由一个 job 进行封装。 * root job job 树最顶端的 job。 * 普通 job job 树的其它 job。 * 中间结果管理器(Intermediate Result Manager) 用于缓存需要在不同 job 间传递的数据。 * EXCHANGE 算子 用于进行跨 job 数据传递的算子,具体分为如下两种: * task 每个逻辑子计划(job)可以并行执行,每个并行执行的任务称为一个 task。 * 主线程 负责接收客户端请求、返回操作结果的线程,负责执行 root job。 * 工作线程 负责执行普通 job 的 task。 * 调度线程 负责调度 job。 分布式计划的相关算子 ------------------- 生成分布式计划的过程就是在原始计划树上寻找恰当位置插入 EXCHANGE 算子的过程,在自顶向下遍历计划树的时候,需要根据相应算子的数据处理的情况以及输入算子的数据分区情况,决定是否需要插入 EXCHANGE 算子。 如下示例为最简单的单表扫描: ```javascript explain select * from t2\G *************************** 1. row *************************** Query Plan: ============================================ |ID|OPERATOR |NAME|EST. ROWS|COST| -------------------------------------------- |0 |EXCHANGE IN DISTR | |4000 |878 | |1 | EXCHANGE OUT DISTR| |4000 |499 | |2 | TABLE SCAN |t2 |4000 |499 | ============================================ Outputs & filters: ------------------------------------- 0 - output([t2.c1], [t2.c2]), filter(nil) 1 - output([t2.c1], [t2.c2]), filter(nil) 2 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) ``` 当 t2 是一个分区表,可以在 table scan 上插入配对的 EXCHANGE 算子,从而将 TABLE SCAN 和 EXCHANGE OUT 封装成一个 job,可以用于并行的执行。 #### **单输入可下压算子** 单输入可下压算子主要包括 AGGREGATION、SORT、 GROUP BY 和 LIMIT 算子等,除了 LIMIT 算子以外,其余所列举的算子都会有一个操作的键,如果操作的键和输入数据的数据分布是一致的,则可以做一阶段聚合操作,也即 Partition Wise Aggregation。如果操作的键和输入数据的数据分布是不一致的,则需要做两阶段聚合操作,聚合算子需要做下压操作。 一阶段聚合,如下例所示: ```javascript explain select sum(c2) from t2 group by c1\G; *************************** 1. row *************************** Query Plan: ============================================ |ID|OPERATOR |NAME|EST. ROWS|COST| -------------------------------------------- |0 |EXCHANGE IN DISTR | |1000 |2834| |1 | EXCHANGE OUT DISTR| |1000 |2740| |2 | HASH GROUP BY | |1000 |2740| |3 | TABLE SCAN |t2 |4000 |499 | ============================================ Outputs & filters: ------------------------------------- 0 - output([T_FUN_SUM(t2.c2)]), filter(nil) 1 - output([T_FUN_SUM(t2.c2)]), filter(nil) 2 - output([T_FUN_SUM(t2.c2)]), filter(nil), group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)]) 3 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) ``` 二阶段聚合如下例所示: ```javascript explain select sum(c1) from t2 group by c2\G *************************** 1. row *************************** Query Plan: ============================================= |ID|OPERATOR |NAME|EST. ROWS|COST| --------------------------------------------- |0 |HASH GROUP BY | |1000 |3395| |1 | EXCHANGE IN DISTR | |1000 |2834| |2 | EXCHANGE OUT DISTR| |1000 |2740| |3 | HASH GROUP BY | |1000 |2740| |4 | TABLE SCAN |t2 |4000 |499 | ============================================= Outputs & filters: ------------------------------------- 0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil), group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))]) 1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil) 2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil) 3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil), group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)]) 4 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) ``` #### **二元输入算子** 在 OceanBase 数据库当前版本中,主要考虑 JOIN 算子的情况。对于 JOIN 来说,主要有三种方式。 * PARTITION-WISE JOIN 当左右表都是分区表且分区方式相同,物理分布一样,且 JOIN 的连接条件为分区键时,可以使用以 partition 为单位的连接方法。如下例所示: ```javascript explain select * from t2, t3 where t2.c1 = t3.c1\G *************************** 1. row *************************** Query Plan: ============================================= |ID|OPERATOR |NAME|EST. ROWS|COST | --------------------------------------------- |0 |EXCHANGE IN DISTR | |31680 |35075| |1 | EXCHANGE OUT DISTR| |31680 |29077| |2 | HASH JOIN | |31680 |29077| |3 | TABLE SCAN |t2 |4000 |499 | |4 | TABLE SCAN |t3 |4000 |499 | ============================================= Outputs & filters: ------------------------------------- 0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil) 1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil) 2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil), equal_conds([t2.c1 = t3.c1]), other_conds(nil) 3 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) 4 - output([t3.c1], [t3.c2]), filter(nil), access([t3.c1], [t3.c2]), partitions(p[0-3]) ``` * PARTIAL PARTITION-WISE JOIN 当左右表中一个表为分区表,另一个表为非分区表,或者两者皆为分区表但是连接键仅和其中一个分区表的分区键相同的情况下,会以该分区表的分区分布为基准,重新分布另一个表的数据。如下例所示: ```javascript explain select * from t1, t2 where t1.c1 = t2.c1\G *************************** 1. row *************************** Query Plan: ====================================================== |ID|OPERATOR |NAME|EST. ROWS|COST| ------------------------------------------------------ |0 |EXCHANGE IN DISTR | |24 |1977| |1 | EXCHANGE OUT DISTR | |24 |1973| |2 | HASH JOIN | |24 |1973| |3 | EXCHANGE IN DISTR | |3 |37 | |4 | EXCHANGE OUT DISTR (PKEY)| |3 |37 | |5 | TABLE SCAN |t1 |3 |37 | |6 | TABLE SCAN |t2 |4000 |499 | ====================================================== Outputs & filters: ------------------------------------- 0 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil) 1 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil) 2 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil), equal_conds([t1.c1 = t2.c1]), other_conds(nil) 3 - output([t1.c1], [t1.c2]), filter(nil) 4 - (#keys=1, [t1.c1]), output([t1.c1], [t1.c2]), filter(nil) 5 - output([t1.c1], [t1.c2]), filter(nil), access([t1.c1], [t1.c2]), partitions(p0) 6 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) ``` * 左右表都需要进行数据重分布当连接键和左右表的分区键都没有关系的情况下,由于一些实现的限制,当前会生成将左右表的数据都重新分布到一台计算节点上再执行连接的计划。未来会将此种计划的 JOIN 算子也能完全并行执行。生成的计划如下例所示: ```javascript explain select * from t2, t3 where t2.c2 = t3.c2\G *************************** 1. row *************************** Query Plan: ============================================== |ID|OPERATOR |NAME|EST. ROWS|COST | ---------------------------------------------- |0 |HASH JOIN | |31680 |29835| |1 | EXCHANGE IN DISTR | |4000 |878 | |2 | EXCHANGE OUT DISTR| |4000 |499 | |3 | TABLE SCAN |t2 |4000 |499 | |4 | EXCHANGE IN DISTR | |4000 |878 | |5 | EXCHANGE OUT DISTR| |4000 |499 | |6 | TABLE SCAN |t3 |4000 |499 | ============================================== Outputs & filters: ------------------------------------- 0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil), equal_conds([t2.c2 = t3.c2]), other_conds(nil) 1 - output([t2.c1], [t2.c2]), filter(nil) 2 - output([t2.c1], [t2.c2]), filter(nil) 3 - output([t2.c1], [t2.c2]), filter(nil), access([t2.c1], [t2.c2]), partitions(p[0-3]) 4 - output([t3.c1], [t3.c2]), filter(nil) 5 - output([t3.c1], [t3.c2]), filter(nil) 6 - output([t3.c1], [t3.c2]), filter(nil), access([t3.c1], [t3.c2]), partitions(p[0-3]) ```