From a9287762d2f45aa14a350543c6a689f97bf81593 Mon Sep 17 00:00:00 2001
From: w00427717
Date: Thu, 15 Oct 2020 19:30:46 +0800
Subject: [PATCH] parallel search: hashjoin outer support

---
Reviewer note (not part of the commit): this copy of the patch had its line
breaks and runs of whitespace collapsed. The line structure below was rebuilt
from the +/- markers and checked against the @@ hunk line counts and the
diffstat. Indentation, the position of blank context lines, and the
whitespace-only changes in execParallel.cpp (first two hunks) are approximate
and could not be recovered. Regenerate with git format-patch before applying.

 src/gausskernel/optimizer/path/joinpath.cpp   | 89 +++++++++++++++++++
 src/gausskernel/optimizer/plan/createplan.cpp | 11 ++-
 src/gausskernel/optimizer/util/pathnode.cpp   |  6 ++
 src/gausskernel/optimizer/util/relnode.cpp    |  1 +
 .../runtime/executor/execParallel.cpp         | 21 +++--
 .../regress/expected/parallel_hashjoin.out    | 70 +++++++++++++++
 src/test/regress/parallel_schedule            |  2 +-
 src/test/regress/sql/parallel_hashjoin.sql    | 22 +++++
 8 files changed, 212 insertions(+), 10 deletions(-)
 create mode 100644 src/test/regress/expected/parallel_hashjoin.out
 create mode 100644 src/test/regress/sql/parallel_hashjoin.sql

diff --git a/src/gausskernel/optimizer/path/joinpath.cpp b/src/gausskernel/optimizer/path/joinpath.cpp
index 95065b27c..0b2524976 100755
--- a/src/gausskernel/optimizer/path/joinpath.cpp
+++ b/src/gausskernel/optimizer/path/joinpath.cpp
@@ -866,6 +866,55 @@ static void try_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType j
     }
 }
 
+/*
+ * try_partial_hashjoin_path
+ *    Consider a partial hashjoin join path; if it appears useful, push it into
+ *    the joinrel's partial_pathlist via add_partial_path().
+ */
+static void try_partial_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, Path* outer_path, Path* inner_path,
+    List* hashclauses, JoinType jointype, JoinPathExtraData* extra)
+{
+    JoinCostWorkspace workspace;
+
+    /*
+     * If the inner path is parameterized, the parameterization must be fully
+     * satisfied by the proposed outer path. Parameterized partial paths are
+     * not supported. The caller should already have verified that no
+     * extra_lateral_rels are required here.
+     */
+    if (inner_path->param_info != NULL) {
+        Relids inner_paramrels = inner_path->param_info->ppi_req_outer;
+
+        if (!bms_is_empty(inner_paramrels)) {
+            return;
+        }
+    }
+
+    /*
+     * Before creating a path, get a quick lower bound on what it is likely
+     * to cost. Bail out right away if it looks terrible.
+     */
+    initial_cost_hashjoin(
+        root, &workspace, jointype, hashclauses, outer_path, inner_path, extra->sjinfo, &extra->semifactors, 1);
+    if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL)) {
+        return;
+    }
+
+    /* Might be good enough to be worth trying, so let's try it. */
+    add_partial_path(joinrel,
+        (Path*)create_hashjoin_path(root,
+            joinrel,
+            jointype,
+            &workspace,
+            extra->sjinfo,
+            &extra->semifactors,
+            outer_path,
+            inner_path,
+            extra->restrictlist,
+            NULL,
+            hashclauses));
+}
+
 /*
  * clause_sides_match_join
  *    Determine whether a join clause is of the right form to use in this join.
@@ -1748,6 +1797,46 @@ static void hash_inner_and_outer(PlannerInfo* root, RelOptInfo* joinrel, RelOptI
                     }
                 }
                 j++;
+
+                /*
+                 * If the joinrel is parallel-safe, we may be able to consider a
+                 * partial hash join. However, we can't handle JOIN_UNIQUE_OUTER,
+                 * because the outer path will be partial, and therefore we won't be
+                 * able to properly guarantee uniqueness. Also, the resulting path
+                 * must not be parameterized.
+                 */
+                if (joinrel->consider_parallel && jointype != JOIN_UNIQUE_OUTER && outerrel->partial_pathlist != NIL) {
+                    Path* cheapest_partial_outer;
+                    Path* cheapest_safe_inner = NULL;
+
+                    cheapest_partial_outer = (Path*)linitial(outerrel->partial_pathlist);
+
+                    /*
+                     * Normally, given that the joinrel is parallel-safe, the cheapest
+                     * total inner path will also be parallel-safe, but if not, we'll
+                     * have to search cheapest_parameterized_paths for the cheapest
+                     * unparameterized inner path.
+                     */
+                    if (cheapest_total_inner->parallel_safe) {
+                        cheapest_safe_inner = cheapest_total_inner;
+                    } else {
+                        ListCell* lc;
+
+                        foreach (lc, innerrel->cheapest_parameterized_paths) {
+                            Path* innerpath = (Path*)lfirst(lc);
+
+                            if (innerpath->parallel_safe && bms_is_empty(PATH_REQ_OUTER(innerpath))) {
+                                cheapest_safe_inner = innerpath;
+                                break;
+                            }
+                        }
+                    }
+
+                    if (cheapest_safe_inner != NULL) {
+                        try_partial_hashjoin_path(
+                            root, joinrel, cheapest_partial_outer, cheapest_safe_inner, hashclauses, jointype, extra);
+                    }
+                }
             }
             i++;
         }
diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp
index b4be99793..6c7476aa6 100644
--- a/src/gausskernel/optimizer/plan/createplan.cpp
+++ b/src/gausskernel/optimizer/plan/createplan.cpp
@@ -1838,7 +1838,16 @@ static Gather* create_gather_plan(PlannerInfo* root, GatherPath* best_path)
     copy_path_costsize(&gather_plan->plan, &best_path->path);
 
 #ifdef STREAMPLAN
-    add_distribute_info(root, &gather_plan->plan, scan_relid, &(best_path->path), NULL);
+    switch (subplan->type) {
+        case T_HashJoin:
+        case T_MergeJoin:
+        case T_NestLoop:
+            inherit_plan_locator_info(&gather_plan->plan, subplan);
+            break;
+        default:
+            add_distribute_info(root, &gather_plan->plan, scan_relid, &(best_path->path), NULL);
+            break;
+    }
 #endif
 
     /* use parallel mode for parallel plans. */
diff --git a/src/gausskernel/optimizer/util/pathnode.cpp b/src/gausskernel/optimizer/util/pathnode.cpp
index 5cb2408c8..ba095df1e 100755
--- a/src/gausskernel/optimizer/util/pathnode.cpp
+++ b/src/gausskernel/optimizer/util/pathnode.cpp
@@ -3788,6 +3788,12 @@ HashPath* create_hashjoin_path(PlannerInfo* root, RelOptInfo* joinrel, JoinType
     pathnode->jpath.path.param_info =
         get_joinrel_parampathinfo(root, joinrel, outer_path, inner_path, sjinfo, required_outer, &restrict_clauses);
 
+    pathnode->jpath.path.parallel_aware = false;
+    pathnode->jpath.path.parallel_safe =
+        joinrel->consider_parallel && outer_path->parallel_safe && inner_path->parallel_safe;
+    /* This is a foolish way to estimate parallel_degree, but for now... */
+    pathnode->jpath.path.parallel_degree = outer_path->parallel_degree;
+
     /*
      * A hashjoin never has pathkeys, since its output ordering is
      * unpredictable due to possible batching. XXX If the inner relation is
diff --git a/src/gausskernel/optimizer/util/relnode.cpp b/src/gausskernel/optimizer/util/relnode.cpp
index df60f250f..5c11a5045 100755
--- a/src/gausskernel/optimizer/util/relnode.cpp
+++ b/src/gausskernel/optimizer/util/relnode.cpp
@@ -557,6 +557,7 @@ RelOptInfo* build_join_rel(PlannerInfo* root, Relids joinrelids, RelOptInfo* out
     joinrel->reltargetlist = NIL;
     joinrel->pathlist = NIL;
     joinrel->ppilist = NIL;
+    joinrel->partial_pathlist = NIL;
     joinrel->cheapest_startup_path = NULL;
     joinrel->cheapest_total_path = NIL;
     joinrel->cheapest_unique_path = NULL;
diff --git a/src/gausskernel/runtime/executor/execParallel.cpp b/src/gausskernel/runtime/executor/execParallel.cpp
index 3a6b3f12e..1e3a59c76 100644
--- a/src/gausskernel/runtime/executor/execParallel.cpp
+++ b/src/gausskernel/runtime/executor/execParallel.cpp
@@ -1,7 +1,7 @@
 /* -------------------------------------------------------------------------
  *
  * execParallel.c
- *    Support routines for parallel execution.
+ *    Support routines for parallel execution.
  *
  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -16,7 +16,7 @@
  * the actual plan to be passed down to the worker.
  *
  * IDENTIFICATION
- *    src/backend/executor/execParallel.c
+ *    src/backend/executor/execParallel.c
  *
  * -------------------------------------------------------------------------
  */
@@ -158,11 +158,8 @@ static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateConte
 }
 
 /*
- * Ordinary plan nodes won't do anything here, but parallel-aware plan nodes
- * may need to initialize shared state in the DSM before parallel workers
- * are available. They can allocate the space they previous estimated using
- * shm_toc_allocate, and add the keys they previously estimated using
- * shm_toc_insert, in each case targeting pcxt->toc.
+ * Initialize the dynamic shared memory segment that will be used to control
+ * parallel execution.
  */
 static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitializeDSMContext *d)
 {
@@ -178,7 +175,15 @@ static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitiali
     d->nnodes++;
     knl_u_parallel_context *cxt = (knl_u_parallel_context *)d->pcxt->seg;
 
-    /* Call initializers for parallel-aware plan nodes. */
+    /*
+     * Call initializers for parallel-aware plan nodes.
+     *
+     * Ordinary plan nodes won't do anything here, but parallel-aware plan
+     * nodes may need to initialize shared state in the DSM before parallel
+     * workers are available. They can allocate the space they previously
+     * estimated using shm_toc_allocate, and add the keys they previously
+     * estimated using shm_toc_insert, in each case targeting pcxt->toc.
+     */
     if (planstate->plan->parallel_aware) {
         switch (nodeTag(planstate)) {
             case T_SeqScanState:
diff --git a/src/test/regress/expected/parallel_hashjoin.out b/src/test/regress/expected/parallel_hashjoin.out
new file mode 100644
index 000000000..468f528eb
--- /dev/null
+++ b/src/test/regress/expected/parallel_hashjoin.out
@@ -0,0 +1,70 @@
+create table parallel_hashjoin_test_a (id int);
+create table parallel_hashjoin_test_b (id int);
+insert into parallel_hashjoin_test_a select n from generate_series(1,1000) n;
+insert into parallel_hashjoin_test_b select n from generate_series(1,10) n;
+analyse parallel_hashjoin_test_a;
+analyse parallel_hashjoin_test_b;
+explain (costs off) select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+                                QUERY PLAN                                
+--------------------------------------------------------------------------
+ Hash Left Join
+   Hash Cond: (parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id)
+   ->  Seq Scan on parallel_hashjoin_test_a
+         Filter: (id < 10)
+   ->  Hash
+         ->  Seq Scan on parallel_hashjoin_test_b
+(6 rows)
+
+select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+ id | id 
+----+----
+  1 |  1
+  2 |  2
+  3 |  3
+  4 |  4
+  5 |  5
+  6 |  6
+  7 |  7
+  8 |  8
+  9 |  9
+(9 rows)
+
+
+set parallel_setup_cost = 1;
+set min_parallel_table_scan_size=0;
+set parallel_tuple_cost = 0.01;
+set enable_nestloop=off;
+
+explain (costs off) select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+                                   QUERY PLAN                                   
+--------------------------------------------------------------------------------
+ Gather
+   Number of Workers: 2
+   ->  Hash Left Join
+         Hash Cond: (parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id)
+         ->  Parallel Seq Scan on parallel_hashjoin_test_a
+               Filter: (id < 10)
+         ->  Hash
+               ->  Seq Scan on parallel_hashjoin_test_b
+(8 rows)
+
+select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+ id | id 
+----+----
+  1 |  1
+  2 |  2
+  3 |  3
+  4 |  4
+  5 |  5
+  6 |  6
+  7 |  7
+  8 |  8
+  9 |  9
+(9 rows)
+
+
+reset parallel_setup_cost;
+reset min_parallel_table_scan_size;
+reset parallel_tuple_cost;
+reset enable_nestloop;
+
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 25384205d..a5083e127 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -596,7 +596,7 @@ test: create_procedure create_function pg_compatibility postgres_fdw
 test: autonomous_transaction
 
 # parallel query
-test: parallel_query
+test: parallel_query parallel_nested_loop parallel_hashjoin
 
 # gs_basebackup
 test: gs_basebackup
diff --git a/src/test/regress/sql/parallel_hashjoin.sql b/src/test/regress/sql/parallel_hashjoin.sql
new file mode 100644
index 000000000..0449e3982
--- /dev/null
+++ b/src/test/regress/sql/parallel_hashjoin.sql
@@ -0,0 +1,22 @@
+create table parallel_hashjoin_test_a (id int);
+create table parallel_hashjoin_test_b (id int);
+insert into parallel_hashjoin_test_a select n from generate_series(1,1000) n;
+insert into parallel_hashjoin_test_b select n from generate_series(1,10) n;
+analyse parallel_hashjoin_test_a;
+analyse parallel_hashjoin_test_b;
+explain (costs off) select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+
+set parallel_setup_cost = 1;
+set min_parallel_table_scan_size=0;
+set parallel_tuple_cost = 0.01;
+set enable_nestloop=off;
+
+explain (costs off) select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id where parallel_hashjoin_test_a.id < 10;
+
+reset parallel_setup_cost;
+reset min_parallel_table_scan_size;
+reset parallel_tuple_cost;
+reset enable_nestloop;
+