From 381e9c548596fd41568aaed731b228ed04d4ef9c Mon Sep 17 00:00:00 2001 From: TotaJ Date: Mon, 7 Dec 2020 11:47:38 +0800 Subject: [PATCH] Parallel query for subplan/sublink. --- src/gausskernel/optimizer/path/allpaths.cpp | 16 ++++-- src/gausskernel/optimizer/path/joinpath.cpp | 56 ++++++++++--------- src/gausskernel/optimizer/plan/createplan.cpp | 7 ++- .../optimizer/plan/streamplan_single.cpp | 10 +++- src/gausskernel/optimizer/util/clauses.cpp | 51 ++++++++++++++++- src/gausskernel/optimizer/util/relnode.cpp | 3 +- src/gausskernel/runtime/executor/nodeHash.cpp | 13 ++--- src/test/regress/expected/parallel_append.out | 18 ++++-- src/test/regress/expected/parallel_query.out | 51 +++++++++++++++++ src/test/regress/sql/parallel_query.sql | 14 +++++ 10 files changed, 189 insertions(+), 50 deletions(-) diff --git a/src/gausskernel/optimizer/path/allpaths.cpp b/src/gausskernel/optimizer/path/allpaths.cpp index 120943197..a5adfd88e 100755 --- a/src/gausskernel/optimizer/path/allpaths.cpp +++ b/src/gausskernel/optimizer/path/allpaths.cpp @@ -1072,10 +1072,10 @@ static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeT break; case RTE_VALUES: - /* - * The data for a VALUES clause is stored in the plan tree itself, - * so scanning it in a worker is fine. - */ + /* Check for parallel-restricted functions. */ + if (has_parallel_hazard((Node *)rte->values_lists, false)) { + return; + } break; case RTE_CTE: @@ -1102,6 +1102,14 @@ static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeT if (has_parallel_hazard((Node *)rel->baserestrictinfo, false)) return; + /* + * Likewise, if the relation's outputs are not parallel-safe, give up. + * (Usually, they're just Vars, but sometimes they're not.) + */ + if (has_parallel_hazard((Node *)rel->reltargetlist, false)) { + return; + } + /* We have a winner. */ rel->consider_parallel = true; } diff --git a/src/gausskernel/optimizer/path/joinpath.cpp b/src/gausskernel/optimizer/path/joinpath.cpp index 3c119f0bf..e7e61de8c 100755 --- a/src/gausskernel/optimizer/path/joinpath.cpp +++ b/src/gausskernel/optimizer/path/joinpath.cpp @@ -43,6 +43,10 @@ #include "pgxc/pgxc.h" #include "parser/parsetree.h" +#define IS_JOIN_TYPE_PARALLEL_SAFE(jointype) ((jointype) != JOIN_UNIQUE_OUTER && (jointype) != JOIN_FULL && \ + (jointype) != JOIN_RIGHT && (jointype) != JOIN_RIGHT_SEMI && (jointype) != JOIN_RIGHT_ANTI && \ + (jointype) != JOIN_RIGHT_ANTI_FULL) + static void try_partial_mergejoin_path(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, SpecialJoinInfo *sjinfo, Path *outer_path, Path *inner_path, List *restrict_clauses, List *pathkeys, List *mergeclauses, List *outersortkeys, List *innersortkeys); @@ -1098,8 +1102,8 @@ static void sort_inner_and_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI * JOIN_RIGHT, because they can produce false null extended rows. Also, * the resulting path must not be parameterized. */ - if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL) { + if (joinrel->consider_parallel && IS_JOIN_TYPE_PARALLEL_SAFE(save_jointype) + && outerrel->partial_pathlist != NIL) { cheapest_partial_outer = (Path *)linitial(outerrel->partial_pathlist); if (inner_path->parallel_safe) { @@ -1446,8 +1450,8 @@ static void match_unsorted_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT, * because they can produce false null extended rows. */ - if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && - save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL) { + if (joinrel->consider_parallel && IS_JOIN_TYPE_PARALLEL_SAFE(save_jointype) && + outerrel->partial_pathlist != NIL) { if (nestjoinOK) { consider_parallel_nestloop(root, joinrel, outerrel, innerrel, save_jointype, extra); } @@ -1525,8 +1529,13 @@ static void consider_parallel_mergejoin(PlannerInfo *root, RelOptInfo *joinrel, static void consider_parallel_nestloop(PlannerInfo* root, RelOptInfo* joinrel, RelOptInfo* outerrel, RelOptInfo* innerrel, JoinType jointype, JoinPathExtraData* extra) { + JoinType saveJointype = jointype; ListCell* lc1 = NULL; + if (jointype == JOIN_UNIQUE_INNER) { + jointype = JOIN_INNER; + } + foreach (lc1, outerrel->partial_pathlist) { Path* outerpath = (Path*)lfirst(lc1); List* pathkeys; @@ -1549,12 +1558,13 @@ static void consider_parallel_nestloop(PlannerInfo* root, RelOptInfo* joinrel, R continue; /* - * Like match_unsorted_outer, we only consider a single nestloop - * path when the jointype is JOIN_UNIQUE_INNER. But we have to scan - * cheapest_parameterized_paths to find the one we want to consider, - * because cheapest_total_path might not be parallel-safe. + * If we're doing JOIN_UNIQUE_INNER, we can only use the inner's + * cheapest_total_path, and we have to unique-ify it. (We might + * be able to relax this to allow other safe, unparameterized + * inner paths, but right now create_unique_path is not on board + * with that.) */ - if (jointype == JOIN_UNIQUE_INNER) { + if (saveJointype == JOIN_UNIQUE_INNER) { if (!bms_is_empty(PATH_REQ_OUTER(innerpath))) continue; innerpath = (Path*)create_unique_path(root, innerrel, innerpath, extra->sjinfo); @@ -1795,8 +1805,8 @@ static void hash_inner_and_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI * single set of match bits for each batch, but that will require * figuring out a deadlock-free way to wait for the probe to finish. */ - if (joinrel->consider_parallel && jointype != JOIN_UNIQUE_OUTER && jointype != JOIN_FULL && - jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL) { + if (joinrel->consider_parallel && IS_JOIN_TYPE_PARALLEL_SAFE(save_jointype) && + outerrel->partial_pathlist != NIL) { Path* cheapest_partial_outer = NULL; Path* cheapest_partial_inner = NULL; Path* cheapest_safe_inner = NULL; @@ -1805,9 +1815,11 @@ static void hash_inner_and_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI /* * Can we use a partial inner plan too, so that we can build a - * shared hash table in parallel? + * shared hash table in parallel? We can't handle + * JOIN_UNIQUE_INNER because we can't guarantee uniqueness. */ - if (innerrel->partial_pathlist != NIL && u_sess->attr.attr_sql.enable_parallel_hash) { + if (innerrel->partial_pathlist != NIL && save_jointype != JOIN_UNIQUE_INNER && + u_sess->attr.attr_sql.enable_parallel_hash) { cheapest_partial_inner = (Path*)linitial(innerrel->partial_pathlist); try_partial_hashjoin_path(root, joinrel, @@ -1822,22 +1834,14 @@ static void hash_inner_and_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI /* * Normally, given that the joinrel is parallel-safe, the cheapest * total inner path will also be parallel-safe, but if not, we'll - * have to search cheapest_parameterized_paths for the cheapest - * unparameterized inner path. + * have to search for the cheapest safe, unparameterized inner + * path. If doing JOIN_UNIQUE_INNER, we can't use any alternative + * inner path. */ if (cheapest_total_inner->parallel_safe) { cheapest_safe_inner = cheapest_total_inner; - } else { - ListCell* lc; - - foreach (lc, innerrel->cheapest_parameterized_paths) { - Path* innerpath = (Path*)lfirst(lc); - - if (innerpath->parallel_safe && bms_is_empty(PATH_REQ_OUTER(innerpath))) { - cheapest_safe_inner = innerpath; - break; - } - } + } else if (save_jointype != JOIN_UNIQUE_INNER) { + cheapest_safe_inner = get_cheapest_parallel_safe_total_inner(innerrel->pathlist); } if (cheapest_safe_inner != NULL) { diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp index 8ebac568a..832815162 100644 --- a/src/gausskernel/optimizer/plan/createplan.cpp +++ b/src/gausskernel/optimizer/plan/createplan.cpp @@ -1838,7 +1838,12 @@ static Gather* create_gather_plan(PlannerInfo* root, GatherPath* best_path) disuse_physical_tlist(subplan, best_path->subpath); - Gather* gather_plan = make_gather(subplan->targetlist, + /* + * Copy a new target list for gather, since in merge join case, it will change the targetlist. + * If we just use a pointer to subplan->targetlist, then it will change the subplan's targetlist + * at same time, which we don't want. Check prepare_sort_from_pathkeys for the targetlist. + */ + Gather* gather_plan = make_gather(list_copy(subplan->targetlist), NIL, best_path->path.parallel_workers, SS_assign_special_param(root), diff --git a/src/gausskernel/optimizer/plan/streamplan_single.cpp b/src/gausskernel/optimizer/plan/streamplan_single.cpp index 335f27d82..666a5a9d9 100755 --- a/src/gausskernel/optimizer/plan/streamplan_single.cpp +++ b/src/gausskernel/optimizer/plan/streamplan_single.cpp @@ -488,7 +488,7 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, int save_parent_id = *parent_node_id; (*plan_node_id)++; (*num_plannodes)++; - if (result_plan->initPlan && IS_STREAM_PLAN) { + if (result_plan->initPlan && (IS_SINGLE_NODE || IS_STREAM_PLAN)) { List* cteLinkList = NIL; ListCell* lc = NULL; foreach (lc, result_plan->initPlan) { @@ -496,6 +496,7 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, if (plan->subLinkType == CTE_SUBLINK) cteLinkList = lappend(cteLinkList, plan); } +#ifdef ENABLE_MULTIPLE_NODES if (cteLinkList != NIL) { if (IsA(result_plan, ValuesScan)) { sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason, @@ -505,7 +506,8 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, mark_stream_unsupport(); } } - if (IS_STREAM_PLAN) +#endif + if (IS_SINGLE_NODE || IS_STREAM_PLAN) *initplans = list_concat(*initplans, list_copy(result_plan->initPlan)); } switch (nodeTag(result_plan)) { @@ -876,7 +878,7 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, } break; } - if (IS_STREAM_PLAN) { + if (IS_STREAM_PLAN || IS_SINGLE_NODE) { if (is_replicated_plan(result_plan) && is_execute_on_multinodes(result_plan)) { List* nodelist = check_random_expr(result_plan); if (list_length(nodelist) > 0) { @@ -924,6 +926,7 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, } else subplan_ids[subplan->plan_id] = subplan_ids[0]; if (!has_finalized) { +#ifdef ENABLE_MULTIPLE_NODES if (is_execute_on_coordinator(result_plan) || (is_execute_on_allnodes(result_plan) && !is_data_node_exec)) { Plan* child_plan = NULL; @@ -973,6 +976,7 @@ void finalize_node_id(Plan* result_plan, int* plan_node_id, int* parent_node_id, } pushdown_execnodes(plan, result_plan->exec_nodes, false, true); } +#endif if (check_stream_support()) { PlannerInfo* subroot = NULL; Plan* child_root = NULL; diff --git a/src/gausskernel/optimizer/util/clauses.cpp b/src/gausskernel/optimizer/util/clauses.cpp index f58b23255..c0a62fd33 100644 --- a/src/gausskernel/optimizer/util/clauses.cpp +++ b/src/gausskernel/optimizer/util/clauses.cpp @@ -91,6 +91,7 @@ typedef struct { typedef struct { bool allow_restricted; + List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ } has_parallel_hazard_arg; @@ -1168,6 +1169,7 @@ bool has_parallel_hazard(Node *node, bool allow_restricted) has_parallel_hazard_arg context; context.allow_restricted = allow_restricted; + context.safe_param_ids = NIL; return has_parallel_hazard_walker(node, &context); } @@ -1196,8 +1198,53 @@ static bool has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *cont /* Recurse into subselects */ return query_tree_walker(query, (bool (*)())has_parallel_hazard_walker, context, 0); - } else if (IsA(node, SubPlan) || IsA(node, SubLink) || IsA(node, AlternativeSubPlan) || IsA(node, Param)) { - return true; + } else if (IsA(node, SubLink) || IsA(node, AlternativeSubPlan)) { + if (!context->allow_restricted) { + return true; + } + } else if (IsA(node, SubPlan)) { + /* + * Only parallel-safe SubPlans can be sent to workers. Within the + * testexpr of the SubPlan, Params representing the output columns of the + * subplan can be treated as parallel-safe, so temporarily add their IDs + * to the safe_param_ids list while examining the testexpr. + */ + SubPlan *subplan = (SubPlan *)node; + + if (!subplan->parallel_safe && !context->allow_restricted) { + return true; + } + List *saveSafeParamIds = context->safe_param_ids; + context->safe_param_ids = list_concat(list_copy(context->safe_param_ids), list_copy(subplan->paramIds)); + if (has_parallel_hazard_walker(subplan->testexpr, context)) { + return true; /* no need to restore safe_param_ids */ + } + list_free(context->safe_param_ids); + context->safe_param_ids = saveSafeParamIds; + /* we must also check args, but no special Param treatment there */ + if (has_parallel_hazard_walker((Node *) subplan->args, context)) { + return true; + } + /* don't want to recurse normally, so we're done */ + return false; + } else if (IsA(node, Param)) { + /* + * We can't pass Params to workers at the moment either, so they are also + * parallel-restricted, unless they are PARAM_EXTERN Params or are + * PARAM_EXEC Params listed in safe_param_ids, meaning they could be + * either generated within workers or can be computed by the leader and + * then their value can be passed to workers. + */ + Param *param = (Param *)node; + if (param->paramkind == PARAM_EXTERN) { + return false; + } + if (param->paramkind != PARAM_EXEC || !list_member_int(context->safe_param_ids, param->paramid)) { + if (!context->allow_restricted) { + return true; + } + } + return false; /* nothing to recurse to */ } /* This is just a notational convenience for callers. */ diff --git a/src/gausskernel/optimizer/util/relnode.cpp b/src/gausskernel/optimizer/util/relnode.cpp index f2d132f0d..9492696d0 100755 --- a/src/gausskernel/optimizer/util/relnode.cpp +++ b/src/gausskernel/optimizer/util/relnode.cpp @@ -647,7 +647,8 @@ RelOptInfo* build_join_rel(PlannerInfo* root, Relids joinrelids, RelOptInfo* out * here. */ if (inner_rel->consider_parallel && outer_rel->consider_parallel && - !has_parallel_hazard((Node *)restrictlist, false)) { + !has_parallel_hazard((Node *)restrictlist, false) && + !has_parallel_hazard((Node *)joinrel->reltargetlist, false)) { joinrel->consider_parallel = true; } diff --git a/src/gausskernel/runtime/executor/nodeHash.cpp b/src/gausskernel/runtime/executor/nodeHash.cpp index 66da4766b..a2c0c288f 100755 --- a/src/gausskernel/runtime/executor/nodeHash.cpp +++ b/src/gausskernel/runtime/executor/nodeHash.cpp @@ -2047,14 +2047,13 @@ bool ExecParallelScanHashBucket(HashJoinState* hjstate, ExprContext* econtext) return true; } } - /* - * For right Semi/Anti join, we delete mathced tuples in HashTable to make next matching faster, - * so pointer hj_PreTuple is designed to follow the hj_CurTuple and to help us to clear the HashTable. - */ - if (hjstate->js.jointype == JOIN_RIGHT_SEMI || hjstate->js.jointype == JOIN_RIGHT_ANTI) { - hjstate->hj_PreTuple = hashTuple; - } + /* + * We don't support parallel right Semi/Anti join, so we don't set hj_PreTuple like ExecScanHashBucket + * did. Check ExecScanHashBucket and hash_inner_and_outer. + */ + Assert(hjstate->js.jointype != JOIN_RIGHT_SEMI); + Assert(hjstate->js.jointype != JOIN_RIGHT_ANTI); hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); } diff --git a/src/test/regress/expected/parallel_append.out b/src/test/regress/expected/parallel_append.out index c77521bd1..54c017bb1 100644 --- a/src/test/regress/expected/parallel_append.out +++ b/src/test/regress/expected/parallel_append.out @@ -82,18 +82,24 @@ explain (costs off) select * from a where a1 > 4 union all select * from b where (10 rows) explain (costs off) select * from c where c1 in (select a1 from a union select b1 from b); - QUERY PLAN ---------------------------------- + QUERY PLAN +------------------------------------------------ Hash Join Hash Cond: (a.a1 = c.c1) -> HashAggregate Group By Key: a.a1 -> Append - -> Seq Scan on a - -> Seq Scan on b + -> Gather + Number of Workers: 1 + -> Parallel Seq Scan on a + -> Gather + Number of Workers: 1 + -> Parallel Seq Scan on b -> Hash - -> Seq Scan on c -(9 rows) + -> Gather + Number of Workers: 1 + -> Parallel Seq Scan on c +(15 rows) explain (costs off) select * from (select * from a union all select * from b) as ta, c where ta.a1 = c.c1; QUERY PLAN diff --git a/src/test/regress/expected/parallel_query.out b/src/test/regress/expected/parallel_query.out index 0689d8008..4ad1ecd5a 100644 --- a/src/test/regress/expected/parallel_query.out +++ b/src/test/regress/expected/parallel_query.out @@ -1061,6 +1061,57 @@ select count(*) from parallel_t1,parallel_t2 where parallel_t1.a=parallel_t2.a; reset enable_hashjoin; reset enable_nestloop; reset enable_indexscan; +--parallel with subplan +drop table subplan_tb1; +ERROR: table "subplan_tb1" does not exist +create table subplan_tb1(a int, b varchar); +insert into subplan_tb1 values (0, NULL); +insert into subplan_tb1 values (1, NULL); +insert into subplan_tb1 values (2, NULL); +insert into subplan_tb1 values (3, NULL); +explain (verbose,costs off) select boo_1.a +from subplan_tb1 boo_1 inner join subplan_tb1 boo_2 +on ( case when boo_1.a in ( +select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in +(select max( boo.a ) column_009 from subplan_tb1 boo); + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Merge Join + Output: boo_1.a + Merge Cond: ((max(boo.a)) = (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END)) + -> Sort + Output: (max(boo.a)) + Sort Key: (max(boo.a)) + -> Nested Loop + Output: (max(boo.a)) + -> Aggregate + Output: max(boo.a) + -> Gather + Output: boo.a + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo + Output: boo.a + -> Gather + Output: boo_2.a, boo_2.b + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo_2 + Output: boo_2.a, boo_2.b + -> Sort + Output: boo_1.a, (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) + Sort Key: (CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END) + -> Gather + Output: boo_1.a, CASE WHEN (hashed SubPlan 1) THEN boo_1.a ELSE NULL::integer END + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo_1 + Output: boo_1.a + SubPlan 1 + -> Gather + Output: boo_3.a + Number of Workers: 2 + -> Parallel Seq Scan on public.subplan_tb1 boo_3 + Output: boo_3.a +(34 rows) + --clean up drop table parallel_t1; drop table parallel_t2; diff --git a/src/test/regress/sql/parallel_query.sql b/src/test/regress/sql/parallel_query.sql index e7d2bbc14..30440ede4 100644 --- a/src/test/regress/sql/parallel_query.sql +++ b/src/test/regress/sql/parallel_query.sql @@ -278,6 +278,20 @@ reset enable_hashjoin; reset enable_nestloop; reset enable_indexscan; +--parallel with subplan +drop table subplan_tb1; +create table subplan_tb1(a int, b varchar); +insert into subplan_tb1 values (0, NULL); +insert into subplan_tb1 values (1, NULL); +insert into subplan_tb1 values (2, NULL); +insert into subplan_tb1 values (3, NULL); + +explain (verbose,costs off) select boo_1.a +from subplan_tb1 boo_1 inner join subplan_tb1 boo_2 +on ( case when boo_1.a in ( +select boo_3.a from subplan_tb1 boo_3 ) then boo_1.a end ) in +(select max( boo.a ) column_009 from subplan_tb1 boo); + --clean up drop table parallel_t1; drop table parallel_t2;