From c45feed3e45112fff47b56e0a30e76001816ab44 Mon Sep 17 00:00:00 2001 From: chenxiaobin Date: Thu, 5 Nov 2020 16:39:20 +0800 Subject: [PATCH] parallel index scans --- src/bin/gs_guc/cluster_guc.conf | 1 + src/common/backend/nodes/copyfuncs.cpp | 2 + src/common/backend/nodes/equalfuncs.cpp | 1 + src/common/backend/nodes/outfuncs.cpp | 2 + src/common/backend/nodes/readfuncs.cpp | 2 + src/common/backend/utils/adt/selfuncs.cpp | 2 + src/common/backend/utils/misc/guc.cpp | 18 ++ src/gausskernel/optimizer/geqo/geqo_eval.cpp | 3 + src/gausskernel/optimizer/path/allpaths.cpp | 244 +++++++++++----- src/gausskernel/optimizer/path/costsize.cpp | 51 +++- src/gausskernel/optimizer/path/indxpath.cpp | 64 ++++- src/gausskernel/optimizer/path/joinpath.cpp | 4 - src/gausskernel/optimizer/plan/createplan.cpp | 33 ++- .../optimizer/plan/pgxcplan_single.cpp | 2 +- src/gausskernel/optimizer/plan/planner.cpp | 8 +- src/gausskernel/optimizer/plan/subselect.cpp | 6 + src/gausskernel/optimizer/util/pathnode.cpp | 58 +++- src/gausskernel/optimizer/util/plancat.cpp | 4 + src/gausskernel/optimizer/util/relnode.cpp | 1 + .../runtime/executor/execParallel.cpp | 14 +- .../runtime/executor/nodeIndexscan.cpp | 161 +++++++++-- .../storage/access/common/reloptions.cpp | 16 +- .../storage/access/index/indexam.cpp | 124 +++++++- .../storage/access/nbtree/nbtree.cpp | 233 ++++++++++++++- .../storage/access/nbtree/nbtsearch.cpp | 268 +++++++++++++++--- .../storage/access/nbtree/nbtutils.cpp | 5 + src/include/access/genam.h | 8 + src/include/access/nbtree.h | 18 +- src/include/access/relscan.h | 15 + src/include/c.h | 2 + src/include/catalog/pg_am.h | 8 +- src/include/executor/nodeIndexscan.h | 4 + .../knl/knl_guc/knl_session_attr_sql.h | 1 + src/include/knl/knl_session.h | 3 + src/include/nodes/execnodes.h | 4 + src/include/nodes/plannodes.h | 1 + src/include/nodes/primnodes.h | 1 + src/include/nodes/relation.h | 2 + src/include/optimizer/cost.h | 2 +- src/include/optimizer/pathnode.h | 2 +- src/include/optimizer/paths.h | 1 + src/include/optimizer/planmain.h | 2 +- src/include/utils/rel.h | 11 + src/include/workload/ctxctl.h | 5 + src/test/regress/expected/hw_smp.out | 2 + src/test/regress/expected/parallel_append.out | 202 ++++++------- src/test/regress/expected/parallel_query.out | 123 ++++++++ .../regress/expected/single_node_union.out | 20 +- src/test/regress/sql/hw_smp.sql | 2 + src/test/regress/sql/parallel_append.sql | 6 +- src/test/regress/sql/parallel_query.sql | 53 +++- src/tools/pgindent/typedefs.list | 3 + 52 files changed, 1530 insertions(+), 298 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index cd80cb470..264ee08db 100644 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -538,6 +538,7 @@ max_inner_tool_connections|int|1,8388607|NULL|NULL| max_keep_log_seg|int|0,2147483647|NULL|NULL| max_background_workers|int|0,262143|NULL|NULL| min_parallel_table_scan_size|int|0,715827882|kB|NULL| +min_parallel_index_scan_size|int|0,715827882|kB|NULL| max_parallel_workers_per_gather|int|0,1024|NULL|NULL| parallel_tuple_cost|real|0,1.79769e+308|NULL|NULL| parallel_setup_cost|real|0,1.79769e+308|NULL|NULL| diff --git a/src/common/backend/nodes/copyfuncs.cpp b/src/common/backend/nodes/copyfuncs.cpp index 7bde5e406..00ce7d622 100644 --- a/src/common/backend/nodes/copyfuncs.cpp +++ b/src/common/backend/nodes/copyfuncs.cpp @@ -177,6 +177,7 @@ static void CopyPlanFields(const Plan* from, Plan* newnode) COPY_SCALAR_FIELD(multiple); 
COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); + COPY_SCALAR_FIELD(parallel_safe); COPY_SCALAR_FIELD(dop); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -2549,6 +2550,7 @@ static SubPlan* _copySubPlan(const SubPlan* from) COPY_SCALAR_FIELD(firstColCollation); COPY_SCALAR_FIELD(useHashTable); COPY_SCALAR_FIELD(unknownEqFalse); + COPY_SCALAR_FIELD(parallel_safe); COPY_NODE_FIELD(setParam); COPY_NODE_FIELD(parParam); COPY_NODE_FIELD(args); diff --git a/src/common/backend/nodes/equalfuncs.cpp b/src/common/backend/nodes/equalfuncs.cpp index cd621d6bc..4ce7370c9 100755 --- a/src/common/backend/nodes/equalfuncs.cpp +++ b/src/common/backend/nodes/equalfuncs.cpp @@ -363,6 +363,7 @@ static bool _equalSubPlan(const SubPlan* a, const SubPlan* b) COMPARE_SCALAR_FIELD(firstColCollation); COMPARE_SCALAR_FIELD(useHashTable); COMPARE_SCALAR_FIELD(unknownEqFalse); + COMPARE_SCALAR_FIELD(parallel_safe); COMPARE_NODE_FIELD(setParam); COMPARE_NODE_FIELD(parParam); COMPARE_NODE_FIELD(args); diff --git a/src/common/backend/nodes/outfuncs.cpp b/src/common/backend/nodes/outfuncs.cpp index 82aeb96e6..ff4ad7c0e 100755 --- a/src/common/backend/nodes/outfuncs.cpp +++ b/src/common/backend/nodes/outfuncs.cpp @@ -611,6 +611,7 @@ static void _outPlanInfo(StringInfo str, Plan* node) WRITE_FLOAT_FIELD(multiple, "%.0f"); WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); + WRITE_BOOL_FIELD(parallel_safe); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); WRITE_NODE_FIELD(lefttree); @@ -2267,6 +2268,7 @@ static void _outSubPlan(StringInfo str, SubPlan* node) WRITE_OID_FIELD(firstColCollation); WRITE_BOOL_FIELD(useHashTable); WRITE_BOOL_FIELD(unknownEqFalse); + WRITE_BOOL_FIELD(parallel_safe); WRITE_NODE_FIELD(setParam); WRITE_NODE_FIELD(parParam); WRITE_NODE_FIELD(args); diff --git a/src/common/backend/nodes/readfuncs.cpp b/src/common/backend/nodes/readfuncs.cpp index 82c2ac5d9..938c6cf3a 100644 --- a/src/common/backend/nodes/readfuncs.cpp +++ b/src/common/backend/nodes/readfuncs.cpp @@ -2685,6 +2685,7 @@ static Plan* _readPlan(Plan* local_node) READ_FLOAT_FIELD(multiple); READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); + READ_BOOL_FIELD(parallel_safe); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); READ_NODE_FIELD(lefttree); @@ -2735,6 +2736,7 @@ static SubPlan* _readSubPlan(SubPlan* local_node) READ_OID_FIELD(firstColCollation); READ_BOOL_FIELD(useHashTable); READ_BOOL_FIELD(unknownEqFalse); + READ_BOOL_FIELD(parallel_safe); READ_NODE_FIELD(setParam); READ_NODE_FIELD(parParam); READ_NODE_FIELD(args); diff --git a/src/common/backend/utils/adt/selfuncs.cpp b/src/common/backend/utils/adt/selfuncs.cpp index 5f1150955..64b9d2321 100644 --- a/src/common/backend/utils/adt/selfuncs.cpp +++ b/src/common/backend/utils/adt/selfuncs.cpp @@ -7459,6 +7459,7 @@ Datum gincostestimate(PG_FUNCTION_ARGS) Cost* index_total_cost = (Cost*)PG_GETARG_POINTER(4); Selectivity* index_selectivity = (Selectivity*)PG_GETARG_POINTER(5); double* index_correlation = (double*)PG_GETARG_POINTER(6); + double* index_pages = (double*)PG_GETARG_POINTER(7); IndexOptInfo* index = path->indexinfo; List* index_quals = path->indexquals; List* index_orderbys = path->indexorderbys; @@ -7717,6 +7718,7 @@ Datum gincostestimate(PG_FUNCTION_ARGS) *index_total_cost += qual_arg_cost; *index_total_cost += ((num_tuples * *index_selectivity) * (u_sess->attr.attr_sql.cpu_index_tuple_cost + qual_op_cost)); + *index_pages = data_pages_fetched; PG_RETURN_VOID(); } diff --git a/src/common/backend/utils/misc/guc.cpp 
b/src/common/backend/utils/misc/guc.cpp index 57c2fcd00..758dc9bac 100644 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -7377,6 +7377,24 @@ static void init_configure_names_int() NULL, NULL }, + { + { + "min_parallel_index_scan_size", + PGC_USERSET, + QUERY_TUNING_COST, + gettext_noop("Sets the minimum amount of index data for a parallel scan."), + gettext_noop("If the planner estimates that it will read a number of index pages " + "too small to reach this limit, a parallel scan will not be considered."), + GUC_UNIT_BLOCKS, + }, + &u_sess->attr.attr_sql.min_parallel_index_scan_size, + (512 * 1024) / BLCKSZ, + 0, + INT_MAX / 3, + NULL, + NULL, + NULL + }, { /* Can't be set in postgresql.conf */ { diff --git a/src/gausskernel/optimizer/geqo/geqo_eval.cpp b/src/gausskernel/optimizer/geqo/geqo_eval.cpp index 6dc6715ab..cdf79fc13 100755 --- a/src/gausskernel/optimizer/geqo/geqo_eval.cpp +++ b/src/gausskernel/optimizer/geqo/geqo_eval.cpp @@ -231,6 +231,9 @@ static List* merge_clump(PlannerInfo* root, List* clumps, Clump* new_clump, bool joinrel = make_join_rel(root, old_clump->joinrel, new_clump->joinrel); /* Keep searching if join order is not valid */ if (joinrel != NULL) { + /* Create GatherPaths for any useful partial paths for rel */ + generate_gather_paths(root, joinrel); + /* Find and save the cheapest paths for this joinrel */ set_cheapest(joinrel); diff --git a/src/gausskernel/optimizer/path/allpaths.cpp b/src/gausskernel/optimizer/path/allpaths.cpp index f88ff3b3c..c9bf7e314 100755 --- a/src/gausskernel/optimizer/path/allpaths.cpp +++ b/src/gausskernel/optimizer/path/allpaths.cpp @@ -65,7 +65,7 @@ static void set_base_rel_pathlists(PlannerInfo* root); static void set_correlated_rel_pathlist(PlannerInfo* root, RelOptInfo* rel); static void set_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, Index rti, RangeTblEntry* rte); static void set_plain_rel_size(PlannerInfo* root, RelOptInfo* rel, RangeTblEntry* rte); -static void create_parallel_paths(PlannerInfo* root, RelOptInfo* rel); +static void create_plain_partial_paths(PlannerInfo* root, RelOptInfo* rel); static void set_tablesample_rel_size(PlannerInfo* root, RelOptInfo* rel, RangeTblEntry* rte); static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblEntry* rte); static void have_gather_plan_node(Plan* plan, void* context, const char* query_string); @@ -667,6 +667,23 @@ static void set_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, Index rti, Rang break; } } + /* + * If this is a baserel, consider gathering any partial paths we may have + * created for it. (If we tried to gather inheritance children, we could + * end up with a very large number of gather nodes, each trying to grab + * its own pool of workers, so don't do this for otherrels. Instead, + * we'll consider gathering partial paths for the parent appendrel.) + */ + if (rel->reloptkind == RELOPT_BASEREL) { + generate_gather_paths(root, rel); + } + + /* + * Find the cheapest of the paths for this rel here because + * generate_gather_paths may delete a path that some paths have + * a reference to. 
+ */ + set_cheapest(rel); debug1_print_rel(root, rel); @@ -752,50 +769,21 @@ static void set_plain_rel_size(PlannerInfo* root, RelOptInfo* rel, RangeTblEntry } /* - * create_parallel_paths - * Build parallel access paths for a plain relation + * create_plain_partial__paths + * Build partial access paths for a plain relation */ -static void create_parallel_paths(PlannerInfo* root, RelOptInfo* rel) +static void create_plain_partial_paths(PlannerInfo* root, RelOptInfo* rel) { - int parallel_threshold = u_sess->attr.attr_sql.min_parallel_table_scan_size; - int parallel_workers = 1; - int max_parallel_workers = u_sess->attr.attr_sql.max_parallel_workers_per_gather; - /* - * If this relation is too small to be worth a parallel scan, just return - * without doing anything ... unless it's an inheritance child. In that case, - * we want to generate a parallel path here anyway. It might not be worthwhile - * just for this relation, but when combined with all of its inheritance siblings - * it may well pay off. - */ - if (rel->pages < parallel_threshold && rel->reloptkind == RELOPT_BASEREL) { - return; - } + int parallel_workers = 0; - /* - * Limit the degree of parallelism logarithmically based on the size of the - * relation. This probably needs to be a good deal more sophisticated, but we - * need something here for now. - */ - while (rel->pages > parallel_threshold * 3 && parallel_workers < max_parallel_workers) { - parallel_workers++; - parallel_threshold *= 3; - if (parallel_threshold >= PG_INT32_MAX / 3) - break; + parallel_workers = compute_parallel_worker(rel, rel->pages, 0); + + if (parallel_workers <= 0) { + return; } /* Add an unordered partial path based on a parallel sequential scan. */ add_partial_path(rel, create_seqscan_path(root, rel, NULL, 1, parallel_workers)); - - /* - * If this is a baserel, consider gathering any partial paths we may have - * just created. If we gathered an inheritance child, we could end up - * with a very large number of gather nodes, each trying to grab its own - * pool of workers, so don't do this in that case. Instead, we'll - * consider gathering partial paths for the appendrel. - */ - if (rel->reloptkind == RELOPT_BASEREL) { - generate_gather_paths(root, rel); - } } @@ -950,7 +938,7 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE /* Consider parallel sequential scan */ if (rel->consider_parallel) { - create_parallel_paths(root, rel); + create_plain_partial_paths(root, rel); } break; @@ -994,9 +982,6 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE } #endif - /* Now find the cheapest of the paths for this rel */ - set_cheapest(rel); - /* Consider partition's path for partitioned table */ #ifdef PGXC if (!isrp) @@ -1033,16 +1018,21 @@ static void have_gather_plan_node(Plan* plan, void* context, const char* query_s /* * If this relation could possibly be scanned from within a worker, then set - * the consider_parallel flag. The flag has previously been initialized to - * false, so we just bail out if it becomes clear that we can't safely set it. + * its consider_parallel flag. */ static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) { + /* + * The flag has previously been initialized to false, so we can just + * return if it becomes clear that we can't safely set it. + */ + Assert(!rel->consider_parallel); + /* Don't call this if parallelism is disallowed for the entire query. 
*/ Assert(root->glob->parallelModeOK); - /* Don't call this for non-baserels. */ - Assert(rel->reloptkind == RELOPT_BASEREL); + /* This should only be called for baserels and appendrel children. */ + Assert(rel->reloptkind == RELOPT_BASEREL || rel->reloptkind == RELOPT_OTHER_MEMBER_REL); /* Assorted checks based on rtekind. */ switch (rte->rtekind) { @@ -1361,9 +1351,6 @@ static void set_append_rel_size(PlannerInfo* root, RelOptInfo* rel, Index rti, R continue; } - /* Copy consider_parallel flag from parent. */ - childrel->consider_parallel = rel->consider_parallel; - /* * CE failed, so finish copying/modifying targetlist and join quals. * @@ -1413,6 +1400,18 @@ static void set_append_rel_size(PlannerInfo* root, RelOptInfo* rel, Index rti, R * because attr_needed is only examined for base relations not * otherrels. So we just leave the child's attr_needed empty. */ + + /* + * If parallelism is allowable for this query in general, see whether + * it's allowable for this childrel in particular. But if we've + * already decided the appendrel is not parallel-safe as a whole, + * there's no point in considering parallelism for this child. For + * consistency, do this before calling set_rel_size() for the child. + */ + if (root->glob->parallelModeOK && rel->consider_parallel) { + set_rel_consider_parallel(root, childrel, childRTE); + } + /* * Compute the child's size. */ @@ -1428,6 +1427,19 @@ static void set_append_rel_size(PlannerInfo* root, RelOptInfo* rel, Index rti, R has_live_children = true; + /* + * If any live child is not parallel-safe, treat the whole appendrel + * as not parallel-safe. In future we might be able to generate plans + * in which some children are farmed out to workers while others are + * not; but we don't have that today, so it's a waste to consider + * partial paths anywhere in the appendrel unless it's all safe. + * (Child rels visited before this one will be unmarked in + * set_append_rel_pathlist().) + */ + if (!childrel->consider_parallel) { + rel->consider_parallel = false; + } + /* * Accumulate size information from each live child. */ @@ -1568,6 +1580,16 @@ static void set_append_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, Index rt childRTE = root->simple_rte_array[childRTindex]; childrel = root->simple_rel_array[childRTindex]; + /* + * If set_append_rel_size() decided the parent appendrel was + * parallel-unsafe at some point after visiting this child rel, we + * need to propagate the unsafety marking down to the child, so that + * we don't generate useless partial paths for it. + */ + if (!rel->consider_parallel) { + childrel->consider_parallel = false; + } + /* * Compute the child's access paths. */ @@ -1772,9 +1794,6 @@ static void add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *li /* Add the path. */ add_partial_path(rel, (Path *) appendpath); - - /* Consider gathering it. */ - generate_gather_paths(root, rel); } /* @@ -1876,9 +1895,6 @@ static void add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *li } } } - - /* Select cheapest paths */ - set_cheapest(rel); } /* @@ -2182,9 +2198,6 @@ static void set_subquery_pathlist(PlannerInfo* root, RelOptInfo* rel, Index rti, /* Generate appropriate path */ add_path(root, rel, create_subqueryscan_path(root, rel, pathkeys, NULL)); - - /* Select cheapest path (pretty easy in this case...) 
*/ - set_cheapest(rel); } /* @@ -2195,9 +2208,6 @@ static void set_function_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblEn { /* Generate appropriate path */ add_path(root, rel, create_functionscan_path(root, rel)); - - /* Select cheapest path (pretty easy in this case...) */ - set_cheapest(rel); } /* @@ -2208,9 +2218,6 @@ static void set_values_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblEntr { /* Generate appropriate path */ add_path(root, rel, create_valuesscan_path(root, rel)); - - /* Select cheapest path (pretty easy in this case...) */ - set_cheapest(rel); } /* @@ -2326,9 +2333,6 @@ static void set_cte_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblEntry* /* Generate appropriate path */ add_path(root, rel, create_ctescan_path(root, rel)); - - /* Select cheapest path (pretty easy in this case...) */ - set_cheapest(rel); } /* @@ -2379,15 +2383,16 @@ static void set_worktable_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE /* Generate appropriate path */ add_path(root, rel, create_worktablescan_path(root, rel)); - - /* Select cheapest path (pretty easy in this case...) */ - set_cheapest(rel); } /* * generate_gather_paths * Generate parallel access paths for a relation by pushing a Gather on * top of a partial path. + * + * This must not be called until after we're done creating all partial paths + * for the specified relation. (Otherwise, add_partial_path might delete a + * path that some GatherPath has a reference to.) */ void generate_gather_paths(PlannerInfo* root, RelOptInfo* rel) { @@ -2400,7 +2405,9 @@ void generate_gather_paths(PlannerInfo* root, RelOptInfo* rel) /* * The output of Gather is currently always unsorted, so there's only one - * partial path of interest: the cheapest one. + * partial path of interest: the cheapest one. That will be the one at + * the front of partial_pathlist because of the way add_partial_path + * works. * * Eventually, we should have a Gather Merge operation that can merge * multiple tuple streams together while preserving their ordering. We @@ -2551,11 +2558,18 @@ RelOptInfo* standard_join_search(PlannerInfo* root, int levels_needed, List* ini join_search_one_level(root, lev); /* - * Do cleanup work on each just-processed rel. + * Run generate_gather_paths() for each just-processed joinrel. We + * could not do this earlier because both regular and partial paths + * can get added to a particular joinrel at multiple times within + * join_search_one_level. After that, we're done creating paths + * for the joinrel, so run set_cheapest(). */ foreach (lc, root->join_rel_level[lev]) { rel = (RelOptInfo*)lfirst(lc); + /* Create GatherPaths for any useful partial paths for rel */ + generate_gather_paths(root, rel); + /* Find and save the cheapest paths for this rel */ set_cheapest(rel, root); @@ -3393,6 +3407,92 @@ bool is_single_baseresult_plan(Plan* plan) return IsA(plan, BaseResult) ? (plan->lefttree == NULL) : false; } +/* + * Compute the number of parallel workers that should be used to scan a + * relation. We compute the parallel workers based on the size of the heap to + * be scanned and the size of the index to be scanned, then choose a minimum + * of those. + * + * "heap_pages" is the number of pages from the table that we expect to scan. + * "index_pages" is the number of pages from the index that we expect to scan. 
+ */ +int compute_parallel_worker(RelOptInfo *rel, BlockNumber heap_pages, BlockNumber index_pages) +{ + int min_parallel_table_scan_size = u_sess->attr.attr_sql.min_parallel_table_scan_size; + int min_parallel_index_scan_size = u_sess->attr.attr_sql.min_parallel_index_scan_size; + int parallel_workers = 0; + int heap_parallel_workers = 1; + int index_parallel_workers = 1; + + /* + * If the user has set the parallel_workers reloption, use that; otherwise + * select a default number of workers. + */ + if (rel->rel_parallel_workers != -1) { + parallel_workers = rel->rel_parallel_workers; + } else { + int heap_parallel_threshold; + int index_parallel_threshold; + + /* + * If this relation is too small to be worth a parallel scan, just + * return without doing anything ... unless it's an inheritance child. + * In that case, we want to generate a parallel path here anyway. It + * might not be worthwhile just for this relation, but when combined + * with all of its inheritance siblings it may well pay off. + */ + if (heap_pages < (BlockNumber)min_parallel_table_scan_size && + index_pages < (BlockNumber)min_parallel_index_scan_size && rel->reloptkind == RELOPT_BASEREL) { + return 0; + } + + if (heap_pages > 0) { + /* + * Select the number of workers based on the log of the size of + * the relation. This probably needs to be a good deal more + * sophisticated, but we need something here for now. Note that + * the upper limit of the min_parallel_table_scan_size GUC is + * chosen to prevent overflow here. + */ + heap_parallel_threshold = Max(min_parallel_table_scan_size, 1); + while (heap_pages >= (BlockNumber)(heap_parallel_threshold * 3)) { + heap_parallel_workers++; + heap_parallel_threshold *= 3; + if (heap_parallel_threshold > INT_MAX / 3) { + break; /* avoid overflow */ + } + } + + parallel_workers = heap_parallel_workers; + } + + if (index_pages > 0) { + /* same calculation as for heap_pages above */ + index_parallel_threshold = Max(min_parallel_index_scan_size, 1); + while (index_pages >= (BlockNumber)(index_parallel_threshold * 3)) { + index_parallel_workers++; + index_parallel_threshold *= 3; + if (index_parallel_threshold > INT_MAX / 3) { + break; /* avoid overflow */ + } + } + + if (parallel_workers > 0) { + parallel_workers = Min(parallel_workers, index_parallel_workers); + } else { + parallel_workers = index_parallel_workers; + } + } + } + + /* + * In no case use more than max_parallel_workers_per_gather workers. + */ + parallel_workers = Min(parallel_workers, u_sess->attr.attr_sql.max_parallel_workers_per_gather); + + return parallel_workers; +} + /***************************************************************************** * DEBUG SUPPORT *****************************************************************************/ diff --git a/src/gausskernel/optimizer/path/costsize.cpp b/src/gausskernel/optimizer/path/costsize.cpp index 06057865b..30a1b202f 100644 --- a/src/gausskernel/optimizer/path/costsize.cpp +++ b/src/gausskernel/optimizer/path/costsize.cpp @@ -162,6 +162,7 @@ void init_plan_cost(Plan* plan) plan->pred_total_time = -1.0; plan->pred_max_memory = -1; plan->parallel_aware = false; + plan->parallel_safe = false; } static inline void get_info_from_rel( @@ -1012,7 +1013,7 @@ void cost_gather(GatherPath *path, RelOptInfo *rel, ParamPathInfo *param_info) * number of returned tuples, but they won't reduce the number of tuples * we have to fetch from the table, so they don't reduce the scan cost. 
*/ -void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) +void cost_index(IndexPath* path, PlannerInfo* root, double loop_count, bool partial_path) { IndexOptInfo* index = path->indexinfo; RelOptInfo* baserel = index->rel; @@ -1020,6 +1021,7 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) List* allclauses = NIL; Cost startup_cost = 0; Cost run_cost = 0; + Cost cpu_run_cost = 0; Cost indexStartupCost; Cost indexTotalCost; Selectivity indexSelectivity; @@ -1031,6 +1033,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) double tuples_fetched; double pages_fetched; bool ispartitionedindex = path->indexinfo->rel->isPartitionedTable; + double rand_heap_pages; + double index_pages = 0.0; /* Should only be applied to base relations */ AssertEreport(IsA(baserel, RelOptInfo) && IsA(index, IndexOptInfo), @@ -1064,14 +1068,15 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) * the fraction of main-table tuples we will have to retrieve) and its * correlation to the main-table tuple order. */ - OidFunctionCall7(index->amcostestimate, + OidFunctionCall8(index->amcostestimate, PointerGetDatum(root), PointerGetDatum(path), Float8GetDatum(loop_count), PointerGetDatum(&indexStartupCost), PointerGetDatum(&indexTotalCost), PointerGetDatum(&indexSelectivity), - PointerGetDatum(&indexCorrelation)); + PointerGetDatum(&indexCorrelation), + PointerGetDatum(&index_pages)); /* * Save amcostestimate's results for possible use in bitmap scan planning. @@ -1133,6 +1138,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) if (indexonly) pages_fetched = ceil(pages_fetched * (1.0 - baserel->allvisfrac)); + rand_heap_pages = pages_fetched; + max_IO_cost = (pages_fetched * spc_random_page_cost) / loop_count; /* @@ -1165,6 +1172,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) if (indexonly) pages_fetched = ceil(pages_fetched * (1.0 - baserel->allvisfrac)); + rand_heap_pages = pages_fetched; + /* max_IO_cost is for the perfectly uncorrelated case (csquared=0) */ max_IO_cost = pages_fetched * spc_random_page_cost; @@ -1183,6 +1192,28 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) } } + if (partial_path) { + /* + * Estimate the number of parallel workers required to scan index. Use + * the number of heap pages computed considering heap fetches won't be + * sequential as for parallel scans the pages are accessed in random + * order. + */ + path->path.parallel_workers = + compute_parallel_worker(baserel, (BlockNumber)rand_heap_pages, (BlockNumber)index_pages); + + /* + * Fall out if workers can't be assigned for parallel scan, because in + * such a case this path will be rejected. So there is no benefit in + * doing extra computation. + */ + if (path->path.parallel_workers <= 0) { + return; + } + + path->path.parallel_aware = true; + } + /* * Now interpolate based on estimated index order correlation to get total * disk I/O cost for main table accesses. @@ -1218,7 +1249,19 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count) else cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost + qpqual_cost.per_tuple; - run_cost += cpu_per_tuple * tuples_fetched; + cpu_run_cost += cpu_per_tuple * tuples_fetched; + + /* Adjust costing for parallelism, if used. 
*/ + if (path->path.parallel_workers > 0) { + double parallel_divisor = get_parallel_divisor(&path->path); + + path->path.rows = clamp_row_est(path->path.rows / parallel_divisor); + + /* The CPU cost is divided among all the workers. */ + cpu_run_cost /= parallel_divisor; + } + + run_cost += cpu_run_cost; path->path.startup_cost = startup_cost; path->path.total_cost = startup_cost + run_cost; diff --git a/src/gausskernel/optimizer/path/indxpath.cpp b/src/gausskernel/optimizer/path/indxpath.cpp index cc8d8456b..b94fc7eeb 100755 --- a/src/gausskernel/optimizer/path/indxpath.cpp +++ b/src/gausskernel/optimizer/path/indxpath.cpp @@ -697,7 +697,7 @@ static inline bool index_relation_has_bucket(IndexOptInfo* index) /* * build_index_paths * Given an index and a set of index clauses for it, construct zero - * or more IndexPaths. + * or more IndexPaths. It also constructs zero or more partial IndexPaths. * * We return a list of paths because (1) this routine checks some cases * that should cause us to not generate any IndexPath, and (2) in some @@ -896,8 +896,39 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo* index_is_ordered ? ForwardScanDirection : NoMovementScanDirection, index_only_scan, outer_relids, - loop_count); + loop_count, + false); result = lappend(result, ipath); + + /* + * If appropriate, consider parallel index scan. We don't allow + * parallel index scan for bitmap index scans. + */ + if (index->amcanparallel && !index_only_scan && rel->consider_parallel && outer_relids == NULL && + scantype != ST_BITMAPSCAN) { + ipath = create_index_path(root, + index, + index_clauses, + clause_columns, + orderbyclauses, + orderbyclausecols, + useful_pathkeys, + index_is_ordered ? ForwardScanDirection : NoMovementScanDirection, + index_only_scan, + outer_relids, + loop_count, + true); + + /* + * if, after costing the path, we find that it's not worth + * using parallel workers, just free it. + */ + if (ipath->path.parallel_workers > 0) { + add_partial_path(rel, (Path*)ipath); + } else { + pfree(ipath); + } + } } /* @@ -920,8 +951,35 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo* BackwardScanDirection, index_only_scan, outer_relids, - loop_count); + loop_count, + false); result = lappend(result, ipath); + + /* If appropriate, consider parallel index scan */ + if (index->amcanparallel && !index_only_scan && rel->consider_parallel && outer_relids == NULL && + scantype != ST_BITMAPSCAN) { + ipath = create_index_path(root, + index, index_clauses, + clause_columns, + NIL, + NIL, + useful_pathkeys, + BackwardScanDirection, + index_only_scan, + outer_relids, + loop_count, + true); + + /* + * if, after costing the path, we find that it's not worth + * using parallel workers, just free it. + */ + if (ipath->path.parallel_workers > 0) { + add_partial_path(rel, (Path*)ipath); + } else { + pfree(ipath); + } + } } } diff --git a/src/gausskernel/optimizer/path/joinpath.cpp b/src/gausskernel/optimizer/path/joinpath.cpp index c4c6677f8..3fc5d5100 100755 --- a/src/gausskernel/optimizer/path/joinpath.cpp +++ b/src/gausskernel/optimizer/path/joinpath.cpp @@ -246,10 +246,6 @@ void add_paths_to_joinrel(PlannerInfo* root, RelOptInfo* joinrel, RelOptInfo* ou if (u_sess->attr.attr_sql.enable_hashjoin || jointype == JOIN_FULL || hashjoin_hint != NIL) { hash_inner_and_outer(root, joinrel, outerrel, innerrel, jointype, &semifactors, &extra); } - /* - * 6. Consider gathering partial paths. 
- */ - generate_gather_paths(root, joinrel); #ifdef PGXC /* * Can not generate join path. It is not necessary to this branch, otherwise diff --git a/src/gausskernel/optimizer/plan/createplan.cpp b/src/gausskernel/optimizer/plan/createplan.cpp index 72fce06a3..1225a3a09 100644 --- a/src/gausskernel/optimizer/plan/createplan.cpp +++ b/src/gausskernel/optimizer/plan/createplan.cpp @@ -81,7 +81,7 @@ static Plan* create_plan_recurse(PlannerInfo* root, Path* best_path); static Plan* create_scan_plan(PlannerInfo* root, Path* best_path); static List* build_relation_tlist(RelOptInfo* rel); static bool use_physical_tlist(PlannerInfo* root, RelOptInfo* rel); -static Plan* create_gating_plan(PlannerInfo* root, Plan* plan, List* quals); +static Plan* create_gating_plan(PlannerInfo* root, Plan* plan, List* quals, bool parallel_safe); static Plan* create_join_plan(PlannerInfo* root, JoinPath* best_path); static Plan* create_append_plan(PlannerInfo* root, AppendPath* best_path); static Plan* create_merge_append_plan(PlannerInfo* root, MergeAppendPath* best_path); @@ -691,7 +691,7 @@ static Plan* create_scan_plan(PlannerInfo* root, Path* best_path) * quals. */ if (root->hasPseudoConstantQuals) { - plan = create_gating_plan(root, plan, scan_clauses); + plan = create_gating_plan(root, plan, scan_clauses, best_path->parallel_safe); } return plan; @@ -896,7 +896,7 @@ void disuse_physical_tlist(Plan* plan, Path* path) * in most cases we have only a very bad idea of the probability of the gating * qual being true. */ -static Plan* create_gating_plan(PlannerInfo* root, Plan* plan, List* quals) +static Plan* create_gating_plan(PlannerInfo* root, Plan* plan, List* quals, bool parallel_safe) { List* pseudoconstants = NIL; @@ -910,7 +910,10 @@ static Plan* create_gating_plan(PlannerInfo* root, Plan* plan, List* quals) return plan; } - return (Plan*)make_result(root, plan->targetlist, (Node*)pseudoconstants, plan); + Plan *gplan = (Plan*)make_result(root, plan->targetlist, (Node*)pseudoconstants, plan); + /* Gating quals could be unsafe, so better use the Path's safety flag */ + gplan->parallel_safe = parallel_safe; + return gplan; } /* If inner plan or outer plan exec on CN and other side has add stream or hashfilter for replication join, @@ -1023,7 +1026,7 @@ static Plan* create_join_plan(PlannerInfo* root, JoinPath* best_path) * quals. */ if (root->hasPseudoConstantQuals) - plan = create_gating_plan(root, plan, best_path->joinrestrictinfo); + plan = create_gating_plan(root, plan, best_path->joinrestrictinfo, best_path->path.parallel_safe); #ifdef NOT_USED @@ -1125,7 +1128,8 @@ static Plan* create_append_plan(PlannerInfo* root, AppendPath* best_path) * quals. 
*/ if (root->hasPseudoConstantQuals) { - return create_gating_plan(root, (Plan*)plan, best_path->path.parent->baserestrictinfo); + return create_gating_plan(root, (Plan*)plan, best_path->path.parent->baserestrictinfo, + best_path->path.parallel_safe); } return (Plan*)plan; @@ -2780,6 +2784,8 @@ static Plan* create_bitmap_subplan(PlannerInfo* root, Path* bitmapqual, List** q set_plan_rows( plan, clamp_row_est(apath->bitmapselectivity * apath->path.parent->tuples), apath->path.parent->multiple); plan->plan_width = 0; /* meaningless */ + plan->parallel_aware = false; + plan->parallel_safe = apath->path.parallel_safe; *qual = subquals; *indexqual = subindexquals; *indexECs = subindexECs; @@ -2840,6 +2846,8 @@ static Plan* create_bitmap_subplan(PlannerInfo* root, Path* bitmapqual, List** q clamp_row_est(opath->bitmapselectivity * opath->path.parent->tuples), opath->path.parent->multiple); plan->plan_width = 0; /* meaningless */ + plan->parallel_aware = false; + plan->parallel_safe = opath->path.parallel_safe; } /* @@ -2906,6 +2914,8 @@ static Plan* create_bitmap_subplan(PlannerInfo* root, Path* bitmapqual, List** q set_plan_rows( plan, clamp_row_est(ipath->indexselectivity * ipath->path.parent->tuples), ipath->path.parent->multiple); plan->plan_width = 0; /* meaningless */ + plan->parallel_aware = false; + plan->parallel_safe = ipath->path.parallel_safe; *qual = get_actual_clauses(ipath->indexclauses); *indexqual = get_actual_clauses(ipath->indexquals); foreach (l, ipath->indexinfo->indpred) { @@ -5241,6 +5251,7 @@ static void copy_path_costsize(Plan* dest, Path* src) dest->innerdistinct = src->innerdistinct; dest->outerdistinct = src->outerdistinct; dest->parallel_aware = src->parallel_aware; + dest->parallel_safe = src->parallel_safe; } else { /* init the cost field directly */ init_plan_cost(dest); @@ -5258,6 +5269,10 @@ void copy_plan_costsize(Plan* dest, Plan* src) dest->total_cost = src->total_cost; set_plan_rows_from_plan(dest, PLAN_LOCAL_ROWS(src), src->multiple); dest->plan_width = src->plan_width; + /* Assume the inserted node is not parallel-aware. */ + dest->parallel_aware = false; + /* Assume the inserted node is parallel-safe, if child plan is. 
*/ + dest->parallel_safe = src->parallel_safe; } else { /* init the cost field directly */ init_plan_cost(dest); @@ -7190,6 +7205,8 @@ Plan* materialize_finished_plan(Plan* subplan, bool materialize_above_stream, bo /* parameter kluge --- see comments above */ matplan->extParam = bms_copy(subplan->extParam); matplan->allParam = bms_copy(subplan->allParam); + matplan->parallel_aware = false; + matplan->parallel_safe = subplan->parallel_safe; return matplan; } @@ -8900,9 +8917,9 @@ void pgxc_copy_path_costsize(Plan* dest, Path* src) copy_path_costsize(dest, src); } -Plan* pgxc_create_gating_plan(PlannerInfo* root, Plan* plan, List* quals) +Plan* pgxc_create_gating_plan(PlannerInfo* root, Plan* plan, List* quals, bool parallel_safe) { - return create_gating_plan(root, plan, quals); + return create_gating_plan(root, plan, quals, parallel_safe); } #endif /* PGXC */ diff --git a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp index 314cd67a2..94ed46ab8 100755 --- a/src/gausskernel/optimizer/plan/pgxcplan_single.cpp +++ b/src/gausskernel/optimizer/plan/pgxcplan_single.cpp @@ -830,7 +830,7 @@ Plan* create_remotequery_plan(PlannerInfo* root, RemoteQueryPath* best_path) default: elog(ERROR, "creating remote query plan for relations of type %d is not supported", rel->reloptkind); } - return pgxc_create_gating_plan(root, (Plan*)result_node, quals); + return pgxc_create_gating_plan(root, (Plan*)result_node, quals, best_path->path.parallel_safe); } return (Plan*)result_node; diff --git a/src/gausskernel/optimizer/plan/planner.cpp b/src/gausskernel/optimizer/plan/planner.cpp index ca97b702f..191c57dad 100644 --- a/src/gausskernel/optimizer/plan/planner.cpp +++ b/src/gausskernel/optimizer/plan/planner.cpp @@ -7076,7 +7076,10 @@ bool plan_cluster_use_sort(Oid tableOid, Oid indexOid) (rel->orientation != REL_ROW_ORIENTED)); /* Estimate the cost of index scan */ - indexScanPath = create_index_path(root, indexInfo, NIL, NIL, NIL, NIL, NIL, ForwardScanDirection, false, NULL, 1.0); + indexScanPath = create_index_path(root, indexInfo, + NIL, NIL, NIL, NIL, NIL, + ForwardScanDirection, false, + NULL, 1.0, false); return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } @@ -7142,7 +7145,8 @@ bool planClusterPartitionUseSort(Relation partRel, Oid indexOid, PlannerInfo* ro (relOptInfo->orientation != REL_ROW_ORIENTED)); /* Estimate the cost of index scan */ - indexScanPath = create_index_path(root, indexInfo, NIL, NIL, NIL, NIL, NIL, ForwardScanDirection, false, NULL, 1.0); + indexScanPath = create_index_path(root, indexInfo, NIL, NIL, NIL, NIL, + NIL, ForwardScanDirection, false, NULL, 1.0, false); return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } diff --git a/src/gausskernel/optimizer/plan/subselect.cpp b/src/gausskernel/optimizer/plan/subselect.cpp index 4eab0d9de..c6545e96b 100755 --- a/src/gausskernel/optimizer/plan/subselect.cpp +++ b/src/gausskernel/optimizer/plan/subselect.cpp @@ -699,6 +699,7 @@ static Node* build_subplan(PlannerInfo* root, Plan* plan, PlannerInfo* subroot, get_first_col_type(plan, &splan->firstColType, &splan->firstColTypmod, &splan->firstColCollation); splan->useHashTable = false; splan->unknownEqFalse = unknownEqFalse; + splan->parallel_safe = plan->parallel_safe; splan->setParam = NIL; splan->parParam = NIL; splan->args = NIL; @@ -1341,6 +1342,11 @@ static void SS_process_one_cte(PlannerInfo* root, CommonTableExpr* cte, Query* s get_first_col_type(plan, &splan->firstColType, 
&splan->firstColTypmod, &splan->firstColCollation); splan->useHashTable = false; splan->unknownEqFalse = false; + /* + * CTE scans are not considered for parallelism (cf + * set_rel_consider_parallel). + */ + splan->parallel_safe = false; splan->setParam = NIL; splan->parParam = NIL; splan->args = NIL; diff --git a/src/gausskernel/optimizer/util/pathnode.cpp b/src/gausskernel/optimizer/util/pathnode.cpp index 5fd19b269..f0db851ff 100755 --- a/src/gausskernel/optimizer/util/pathnode.cpp +++ b/src/gausskernel/optimizer/util/pathnode.cpp @@ -970,8 +970,14 @@ void set_hint_value(RelOptInfo* join_rel, Path* new_path, HintState* hstate) * but just recycling discarded Path nodes is a very useful savings in * a large join tree. We can recycle the List nodes of pathlist, too. * - * BUT: we do not pfree IndexPath objects, since they may be referenced as - * children of BitmapHeapPaths as well as being paths in their own right. + * As noted in optimizer/README, deleting a previously-accepted Path is + * safe because we know that Paths of this rel cannot yet be referenced + * from any other rel, such as a higher-level join. However, in some cases + * it is possible that a Path is referenced by another Path for its own + * rel; we must not delete such a Path, even if it is dominated by the new + * Path. Currently this occurs only for IndexPath objects, which may be + * referenced as children of BitmapHeapPaths as well as being paths in + * their own right. Hence, we don't pfree IndexPaths when rejecting them. * * 'parent_rel' is the relation entry to which the path corresponds. * 'new_path' is a potential path for parent_rel. @@ -1450,6 +1456,10 @@ static void add_parameterized_path(RelOptInfo* parent_rel, Path* new_path) * parallel such that each worker will generate a subset of the path's * overall result. * + * As in add_path, the partial_pathlist is kept sorted with the cheapest + * total path in front. This is depended on by multiple places, which + * just take the front entry as the cheapest path without searching. + * * We don't generate parameterized partial paths for several reasons. Most * importantly, they're not safe to execute, because there's nothing to * make sure that a parallel scan within the parameterized portion of the @@ -1469,6 +1479,13 @@ static void add_parameterized_path(RelOptInfo* parent_rel, Path* new_path) * costs: parallelism is only used for plans that will be run to completion. * Therefore, this routine is much simpler than add_path: it needs to * consider only pathkeys and total cost. + * As with add_path, we pfree paths that are found to be dominated by + * another partial path; this requires that there be no other references to + * such paths yet. Hence, GatherPaths must not be created for a rel until + * we're done creating all partial paths for it. We do not currently build + * partial indexscan paths, so there is no need for an exception for + * IndexPaths here; for safety, we instead Assert that a path to be freed + * isn't an IndexPath. 
*/ void add_partial_path(RelOptInfo* parent_rel, Path* new_path) { @@ -1529,8 +1546,7 @@ void add_partial_path(RelOptInfo* parent_rel, Path* new_path) */ if (remove_old) { parent_rel->partial_pathlist = list_delete_cell(parent_rel->partial_pathlist, p1, p1_prev); - /* add_path has a special case for IndexPath; we don't need it */ - Assert(!IsA(old_path, IndexPath)); + /* we should not see IndexPaths here, so always safe to delete */ pfree(old_path); /* p1_prev does not advance */ } else { @@ -1557,8 +1573,7 @@ void add_partial_path(RelOptInfo* parent_rel, Path* new_path) else parent_rel->partial_pathlist = lcons(new_path, parent_rel->partial_pathlist); } else { - /* add_path has a special case for IndexPath; we don't need it */ - Assert(!IsA(new_path, IndexPath)); + /* we should not see IndexPaths here, so always safe to delete */ /* Reject and recycle the new path */ pfree(new_path); } @@ -1641,7 +1656,11 @@ Path* create_seqscan_path(PlannerInfo* root, RelOptInfo* rel, Relids required_ou pathnode->parallel_workers = parallel_workers; #ifdef STREAMPLAN - /* We need to set locator_type for parallel query, cause we may send this value to bg worker */ + /* + * We need to set locator_type for parallel query, cause we may send + * this value to bg worker. If not, locator_type is the initial value '\0', + * which make the later serialized plan truncated. + */ pathnode->locator_type = rel->locator_type; if (IS_STREAM_PLAN) { pathnode->distribute_keys = rel->distribute_keys; @@ -1972,12 +1991,13 @@ bool is_pwj_path(Path* pwjpath) * 'required_outer' is the set of outer relids for a parameterized path. * 'loop_count' is the number of repetitions of the indexscan to factor into * estimates of caching behavior. + * 'partial_path' is true if constructing a parallel index scan path. * * Returns the new path node. */ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols, List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly, - Relids required_outer, double loop_count) + Relids required_outer, double loop_count, bool partial_path) { IndexPath* pathnode = makeNode(IndexPath); RelOptInfo* rel = index->rel; @@ -1987,6 +2007,9 @@ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* index pathnode->path.pathtype = indexonly ? T_IndexOnlyScan : T_IndexScan; pathnode->path.parent = rel; pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); + pathnode->path.parallel_aware = false; + pathnode->path.parallel_safe = rel->consider_parallel; + pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = pathkeys; /* Convert clauses to indexquals the executor can handle */ @@ -2001,6 +2024,12 @@ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* index pathnode->indexorderbycols = indexorderbycols; pathnode->indexscandir = indexscandir; #ifdef STREAMPLAN + /* + * We need to set locator_type for parallel query, cause we may send + * this value to bg worker. If not, locator_type is the initial value '\0', + * which make the later serialized plan truncated. 
+ */ + pathnode->path.locator_type = rel->locator_type; if (IS_STREAM_PLAN) { pathnode->path.distribute_keys = rel->distribute_keys; pathnode->path.locator_type = rel->locator_type; @@ -2012,7 +2041,7 @@ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* index } #endif - cost_index(pathnode, root, loop_count); + cost_index(pathnode, root, loop_count, partial_path); return pathnode; } @@ -2178,7 +2207,9 @@ AppendPath* create_append_path(PlannerInfo* root, RelOptInfo* rel, List* subpath pathnode->path.parallel_aware = parallel_aware; pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_workers = parallel_workers; - pathnode->path.pathkeys = NIL; /* result is always considered unsorted */ + pathnode->path.pathkeys = NIL; /* result is always considered + * unsorted */ + pathnode->subpaths = subpaths; /* * For parallel append, non-partial paths are sorted by descending total @@ -2995,6 +3026,9 @@ GatherPath *create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath pathnode->subpath = subpath; pathnode->single_copy = false; +#ifdef STREAMPLAN + inherit_path_locator_info((Path*)pathnode, subpath); +#endif if (pathnode->path.parallel_workers == 0) { pathnode->path.pathkeys = subpath->pathkeys; pathnode->path.parallel_workers = 1; @@ -3047,7 +3081,7 @@ Path* create_subqueryscan_path(PlannerInfo* root, RelOptInfo* rel, List* pathkey pathnode->param_info = get_baserel_parampathinfo(root, rel, required_outer); pathnode->parallel_aware = false; - pathnode->parallel_safe = rel->consider_parallel; + pathnode->parallel_safe = rel->consider_parallel && rel->subplan->parallel_safe; pathnode->parallel_workers = 0; pathnode->pathkeys = pathkeys; @@ -3932,7 +3966,7 @@ Path* reparameterize_path(PlannerInfo* root, Path* path, Relids required_outer, securec_check(errorno, "", ""); newpath->path.param_info = get_baserel_parampathinfo(root, rel, required_outer); - cost_index(newpath, root, loop_count); + cost_index(newpath, root, loop_count, false); return (Path*)newpath; } case T_BitmapHeapScan: { diff --git a/src/gausskernel/optimizer/util/plancat.cpp b/src/gausskernel/optimizer/util/plancat.cpp index 6dc7f12bd..9de8b569e 100644 --- a/src/gausskernel/optimizer/util/plancat.cpp +++ b/src/gausskernel/optimizer/util/plancat.cpp @@ -178,6 +178,9 @@ void get_relation_info(PlannerInfo* root, Oid relationObjectId, bool inhparent, &rel->allvisfrac, &sampledPartitionIds); + /* Retrieve the parallel_workers reloption, or -1 if not set. */ + rel->rel_parallel_workers = RelationGetParallelWorkers(relation, -1); + /* * Make list of indexes. Ignore indexes on system catalogs if told to. * Don't bother with indexes for an inheritance parent, either. 
@@ -283,6 +286,7 @@ void get_relation_info(PlannerInfo* root, Oid relationObjectId, bool inhparent, info->amoptionalkey = indexRelation->rd_am->amoptionalkey; info->amsearcharray = indexRelation->rd_am->amsearcharray; info->amsearchnulls = indexRelation->rd_am->amsearchnulls; + info->amcanparallel = indexRelation->rd_rel->relam == BTREE_AM_OID; info->amhasgettuple = OidIsValid(indexRelation->rd_am->amgettuple); info->amhasgetbitmap = OidIsValid(indexRelation->rd_am->amgetbitmap); diff --git a/src/gausskernel/optimizer/util/relnode.cpp b/src/gausskernel/optimizer/util/relnode.cpp index b0b852be8..cd8c5a965 100755 --- a/src/gausskernel/optimizer/util/relnode.cpp +++ b/src/gausskernel/optimizer/util/relnode.cpp @@ -171,6 +171,7 @@ RelOptInfo* build_simple_rel(PlannerInfo* root, int relid, RelOptKind reloptkind rel->rows = 0; rel->width = 0; rel->consider_parallel = false; /* might get changed later */ + rel->rel_parallel_workers = -1; /* set up in GetRelationInfo */ rel->encodedwidth = 0; rel->encodednum = 0; rel->reltargetlist = NIL; diff --git a/src/gausskernel/runtime/executor/execParallel.cpp b/src/gausskernel/runtime/executor/execParallel.cpp index 484c61f19..cfedc6ed8 100644 --- a/src/gausskernel/runtime/executor/execParallel.cpp +++ b/src/gausskernel/runtime/executor/execParallel.cpp @@ -27,6 +27,7 @@ #include "executor/executor.h" #include "executor/nodeSeqscan.h" #include "executor/nodeAppend.h" +#include "executor/nodeIndexscan.h" #include "executor/tqueue.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" @@ -115,7 +116,7 @@ static char *ExecSerializePlan(Plan *plan, EState *estate) pstmt->rtable = estate->es_range_table; pstmt->resultRelations = NIL; pstmt->utilityStmt = NULL; - pstmt->subplans = NIL; + pstmt->subplans = estate->es_plannedstmt->subplans; pstmt->rewindPlanIDs = NULL; pstmt->rowMarks = NIL; pstmt->nParamExec = estate->es_plannedstmt->nParamExec; @@ -150,6 +151,9 @@ static bool ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateConte case T_SeqScanState: ExecSeqScanEstimate((SeqScanState *)planstate, e->pcxt); break; + case T_IndexScanState: + ExecIndexScanEstimate((IndexScanState*)planstate, e->pcxt); + break; case T_AppendState: ExecAppendEstimate((AppendState*)planstate, e->pcxt); break; @@ -194,6 +198,10 @@ static bool ExecParallelInitializeDSM(PlanState *planstate, ExecParallelInitiali ExecSeqScanInitializeDSM((SeqScanState *)planstate, d->pcxt, cxt->pwCtx->queryInfo.pscan_num); cxt->pwCtx->queryInfo.pscan_num++; break; + case T_IndexScanState: + ExecIndexScanInitializeDSM((IndexScanState*)planstate, d->pcxt, cxt->pwCtx->queryInfo.piscan_num); + cxt->pwCtx->queryInfo.piscan_num++; + break; case T_AppendState: ExecAppendInitializeDSM((AppendState *)planstate, d->pcxt, cxt->pwCtx->queryInfo.pappend_num); cxt->pwCtx->queryInfo.pappend_num++; @@ -364,6 +372,7 @@ ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, EState *estate, } queryInfo.pscan = (ParallelHeapScanDesc *)palloc0(sizeof(ParallelHeapScanDesc) * e.nnodes); + queryInfo.piscan = (ParallelIndexScanDesc *)palloc0(sizeof(ParallelIndexScanDesc) * e.nnodes); queryInfo.pappend = (ParallelAppendState**)palloc0(sizeof(ParallelAppendState*) * e.nnodes); /* @@ -600,6 +609,9 @@ static bool ExecParallelInitializeWorker(PlanState *planstate, void *context) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *)planstate, context); break; + case T_IndexScanState: + ExecIndexScanInitializeWorker((IndexScanState *)planstate, context); + break; case T_AppendState: 
ExecAppendInitializeWorker((AppendState *)planstate, context); break; diff --git a/src/gausskernel/runtime/executor/nodeIndexscan.cpp b/src/gausskernel/runtime/executor/nodeIndexscan.cpp index 9db0718cd..7234f4aae 100755 --- a/src/gausskernel/runtime/executor/nodeIndexscan.cpp +++ b/src/gausskernel/runtime/executor/nodeIndexscan.cpp @@ -21,6 +21,9 @@ * ExecEndIndexScan releases all storage. * ExecIndexMarkPos marks scan position. * ExecIndexRestrPos restores scan position. + * ExecIndexScanEstimate estimates DSM space needed for parallel index scan + * ExecIndexScanInitializeDSM initialize DSM for parallel indexscan + * ExecIndexScanInitializeWorker attach to DSM info in parallel worker */ #include "postgres.h" #include "knl/knl_variable.h" @@ -179,6 +182,19 @@ TupleTableSlot* ExecIndexScan(IndexScanState* node) */ void ExecReScanIndexScan(IndexScanState* node) { + bool reset_parallel_scan = true; + + /* + * If we are here to just update the scan keys, then don't reset parallel + * scan. We don't want each of the participating process in the parallel + * scan to update the shared parallel scan state at the start of the scan. + * It is quite possible that one of the participants has already begun + * scanning the index when another has yet to start it. + */ + if (node->iss_NumRuntimeKeys != 0 && !node->iss_RuntimeKeysReady) { + reset_parallel_scan = false; + } + /* * For recursive-stream rescan, if number of RuntimeKeys not euqal zero, * just return without rescan. @@ -231,9 +247,20 @@ void ExecReScanIndexScan(IndexScanState* node) } } - /* reset index scan */ - abs_idx_rescan( - node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); + /* + * Reset (parallel) index scan. For parallel-aware nodes, the scan + * descriptor is initialized during actual execution of node and we can + * reach here before that (ex. during execution of nest loop join). So, + * avoid updating the scan descriptor at that time. + */ + if (node->iss_ScanDesc) { + abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, + node->iss_NumOrderByKeys); + + if (reset_parallel_scan && GetIndexScanDesc(node->iss_ScanDesc)->parallel_scan) { + index_parallelrescan(GetIndexScanDesc(node->iss_ScanDesc)); + } + } ExecScanReScan(&node->ss); } @@ -678,28 +705,36 @@ IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags) } } else { /* - * Initialize scan descriptor. + * for parallel-aware node, we initialize the scan descriptor after + * initializing the shared memory for parallel execution. */ - index_state->iss_ScanDesc = abs_idx_beginscan(current_relation, - index_state->iss_RelationDesc, - estate->es_snapshot, - index_state->iss_NumScanKeys, - index_state->iss_NumOrderByKeys, - (ScanState*)index_state); + if (!node->scan.plan.parallel_aware) { + /* + * Initialize scan descriptor. + */ + index_state->iss_ScanDesc = abs_idx_beginscan(current_relation, + index_state->iss_RelationDesc, + estate->es_snapshot, + index_state->iss_NumScanKeys, + index_state->iss_NumOrderByKeys, + (ScanState*)index_state); + } } - /* - * If no run-time keys to calculate, go ahead and pass the scankeys to the - * index AM. 
- */ - if (index_state->iss_ScanDesc == NULL) { - index_state->ss.ps.stubType = PST_Scan; - } else if (index_state->iss_NumRuntimeKeys == 0) { - abs_idx_rescan_local(index_state->iss_ScanDesc, - index_state->iss_ScanKeys, - index_state->iss_NumScanKeys, - index_state->iss_OrderByKeys, - index_state->iss_NumOrderByKeys); + if (!node->scan.plan.parallel_aware) { + /* + * If no run-time keys to calculate, go ahead and pass the scankeys to the + * index AM. + */ + if (index_state->iss_ScanDesc == NULL) { + index_state->ss.ps.stubType = PST_Scan; + } else if (index_state->iss_NumRuntimeKeys == 0) { + abs_idx_rescan_local(index_state->iss_ScanDesc, + index_state->iss_ScanKeys, + index_state->iss_NumScanKeys, + index_state->iss_OrderByKeys, + index_state->iss_NumOrderByKeys); + } } /* @@ -1337,3 +1372,85 @@ void ExecInitPartitionForIndexScan(IndexScanState* index_state, EState* estate) } } } + +/* ---------------------------------------------------------------- + * Parallel Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecIndexScanEstimate + * + * estimates the space required to serialize indexscan node. + * ---------------------------------------------------------------- + */ +void ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt) +{ + EState *estate = node->ss.ps.state; + node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc, estate->es_snapshot); +} + +/* ---------------------------------------------------------------- + * ExecIndexScanInitializeDSM + * + * Set up a parallel index scan descriptor. + * ---------------------------------------------------------------- + */ +void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt, int nodeid) +{ + EState *estate = node->ss.ps.state; + knl_u_parallel_context *cxt = (knl_u_parallel_context *)pcxt->seg; + + /* Here we can't use palloc, cause we have switch to old memctx in ExecInitParallelPlan */ + cxt->pwCtx->queryInfo.piscan[nodeid] = + (ParallelIndexScanDesc)MemoryContextAllocZero(cxt->memCtx, node->iss_PscanLen); + index_parallelscan_initialize(node->ss.ss_currentRelation, node->iss_PscanLen, node->iss_RelationDesc, + estate->es_snapshot, cxt->pwCtx->queryInfo.piscan[nodeid]); + cxt->pwCtx->queryInfo.piscan[nodeid]->plan_node_id = node->ss.ps.plan->plan_node_id; + node->iss_ScanDesc = (AbsTblScanDesc)index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, + node->iss_NumScanKeys, node->iss_NumOrderByKeys, cxt->pwCtx->queryInfo.piscan[nodeid]); + + /* + * If no run-time keys to calculate, go ahead and pass the scankeys to the + * index AM. + */ + if (node->iss_NumRuntimeKeys == 0) { + abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, + node->iss_NumOrderByKeys); + } +} + +/* ---------------------------------------------------------------- + * ExecIndexScanInitializeWorker + * + * Copy relevant information from TOC into planstate. 
+ * ---------------------------------------------------------------- + */ +void ExecIndexScanInitializeWorker(IndexScanState *node, void *context) +{ + ParallelIndexScanDesc piscan = NULL; + knl_u_parallel_context *cxt = (knl_u_parallel_context *)context; + + for (int i = 0; i < cxt->pwCtx->queryInfo.piscan_num; i++) { + if (node->ss.ps.plan->plan_node_id == cxt->pwCtx->queryInfo.piscan[i]->plan_node_id) { + piscan = cxt->pwCtx->queryInfo.piscan[i]; + break; + } + } + + if (piscan == NULL) { + ereport(ERROR, (errmsg("could not find plan info, plan node id:%d", node->ss.ps.plan->plan_node_id))); + } + + node->iss_ScanDesc = (AbsTblScanDesc)index_beginscan_parallel(node->ss.ss_currentRelation, node->iss_RelationDesc, + node->iss_NumScanKeys, node->iss_NumOrderByKeys, piscan); + + /* + * If no run-time keys to calculate, go ahead and pass the scankeys to the + * index AM. + */ + if (node->iss_NumRuntimeKeys == 0) { + abs_idx_rescan(node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, + node->iss_NumOrderByKeys); + } +} diff --git a/src/gausskernel/storage/access/common/reloptions.cpp b/src/gausskernel/storage/access/common/reloptions.cpp index 317716517..eadd7fffa 100644 --- a/src/gausskernel/storage/access/common/reloptions.cpp +++ b/src/gausskernel/storage/access/common/reloptions.cpp @@ -26,6 +26,7 @@ #include "commands/defrem.h" #include "commands/tablespace.h" #include "nodes/makefuncs.h" +#include "postmaster/postmaster.h" #include "pgxc/redistrib.h" #include "tsearch/ts_public.h" #include "utils/array.h" @@ -178,6 +179,15 @@ static relopt_int intRelOpts[] = {{{"fillfactor", "Packs table pages only to thi {{"rel_cn_oid", "rel oid on coordinator", RELOPT_KIND_HEAP}, 0, 0, 2000000000}, + { + { + "parallel_workers", + "Number of parallel processes that can be used per executor node for this relation.", + RELOPT_KIND_HEAP + }, + -1, 0, MAX_BACKENDS + }, + /* list terminator */ {{NULL}}}; @@ -1444,8 +1454,7 @@ void ForbidToSetOptionsForPSort(List* options) } /* - * Option parser for anything that uses StdRdOptions (i.e. fillfactor and - * autovacuum) + * Option parser for anything that uses StdRdOptions. 
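/*
 * Illustrative only: how a planner-side caller might honour the new
 * "parallel_workers" reloption through the RelationGetParallelWorkers()
 * macro added later in this patch.  The variable names below are assumed,
 * not the actual call site.
 */
int rel_workers = RelationGetParallelWorkers(relation, -1);  /* -1 when not set             */
if (rel_workers >= 0)
    rel->rel_parallel_workers = rel_workers;                 /* an explicit setting overrides
                                                              * the planner's own estimate   */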
*/ bytea* default_reloptions(Datum reloptions, bool validate, relopt_kind kind) { @@ -1504,7 +1513,8 @@ bytea* default_reloptions(Datum reloptions, bool validate, relopt_kind kind) {"user_catalog_table", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, user_catalog_table)}, {"hashbucket", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, hashbucket)}, {"on_commit_delete_rows", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, on_commit_delete_rows)}, - {"wait_clean_gpi", RELOPT_TYPE_STRING, offsetof(StdRdOptions, wait_clean_gpi)}}; + {"wait_clean_gpi", RELOPT_TYPE_STRING, offsetof(StdRdOptions, wait_clean_gpi)}, + {"parallel_workers", RELOPT_TYPE_INT, offsetof(StdRdOptions, parallel_workers)}}; options = parseRelOptions(reloptions, validate, kind, &numoptions); diff --git a/src/gausskernel/storage/access/index/indexam.cpp b/src/gausskernel/storage/access/index/indexam.cpp index e6be3876c..20a68a487 100755 --- a/src/gausskernel/storage/access/index/indexam.cpp +++ b/src/gausskernel/storage/access/index/indexam.cpp @@ -21,6 +21,10 @@ * index_insert - insert an index tuple into a relation * index_markpos - mark a scan position * index_restrpos - restore a scan position + * index_parallelscan_estimate - estimate shared memory for parallel scan + * index_parallelscan_initialize - initialize parallel scan + * index_parallelrescan - (re)start a parallel scan of an index + * index_beginscan_parallel - join parallel index scan * index_getnext_tid - get the next TID from a scan * index_fetch_heap - get the scan's next heap tuple * index_getnext - get the next heap tuple from a scan @@ -155,8 +159,13 @@ const IndexAm g_HeapIdxAm = {.idx_rescan = (idx_rescan_t)index_rescan, .idx_getnext = (idx_getnext_t)index_getnext, .idx_getbitmap = (idx_getbitmap_t)index_getbitmap}; -static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot); +static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, + int norderbys, Snapshot snapshot, + ParallelIndexScanDesc pscan, bool temp_snap); +extern Size btestimateparallelscan(void); +extern void btinitparallelscan(void* target); +extern void btparallelrescan(IndexScanDesc scan); /* ---------------- * index_open - open an index relation by relation OID @@ -252,7 +261,7 @@ IndexScanDesc index_beginscan( { IndexScanDesc scan; - scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot); + scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot, NULL, false); /* * Save additional parameters into the scandesc. Everything else was set @@ -278,7 +287,7 @@ IndexScanDesc index_beginscan_bitmap(Relation index_relation, Snapshot snapshot, { IndexScanDesc scan; - scan = index_beginscan_internal(index_relation, nkeys, 0, snapshot); + scan = index_beginscan_internal(index_relation, nkeys, 0, snapshot, NULL, false); /* * Save additional parameters into the scandesc. 
Everything else was set @@ -292,7 +301,9 @@ IndexScanDesc index_beginscan_bitmap(Relation index_relation, Snapshot snapshot, /* * index_beginscan_internal --- common code for index_beginscan variants */ -static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot) +static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, + int norderbys, Snapshot snapshot, + ParallelIndexScanDesc pscan, bool temp_snap) { IndexScanDesc scan; FmgrInfo* procedure = NULL; @@ -316,6 +327,11 @@ static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys scan->sd.type = T_ScanDesc_Index; scan->sd.idxAm = &g_HeapIdxAm; + + /* Initialize information for parallel scan. */ + scan->parallel_scan = pscan; + scan->xs_temp_snap = temp_snap; + return scan; } @@ -386,6 +402,10 @@ void index_endscan(IndexScanDesc scan) GPIScanEnd(scan->xs_gpi_scan); } + if (scan->xs_temp_snap) { + UnregisterSnapshot(scan->xs_snapshot); + } + /* Release the scan data structure itself */ IndexScanEnd(scan); } @@ -436,6 +456,102 @@ void index_restrpos(IndexScanDesc scan) (void)FunctionCall1(procedure, PointerGetDatum(scan)); } +/* + * index_parallelscan_estimate - estimate shared memory for parallel scan + * + * Currently, we don't pass any information to the AM-specific estimator, + * so it can probably only return a constant. In the future, we might need + * to pass more information. + */ +Size index_parallelscan_estimate(Relation index_relation, Snapshot snapshot) +{ + RELATION_CHECKS; + + Size nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data); + nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot)); + nbytes = MAXALIGN(nbytes); + + /* We reach heare only if the index type is btree */ + Assert(index_relation->rd_rel->relam == BTREE_AM_OID); + + /* add the needed size of parallel scan */ + nbytes = add_size(nbytes, btestimateparallelscan()); + + return nbytes; +} + +/* + * index_parallelscan_initialize - initialize parallel scan + * + * We initialize both the ParallelIndexScanDesc proper and the AM-specific + * information which follows it. + * + * This function calls access method specific initialization routine to + * initialize am specific information. Call this just once in the leader + * process; then, individual workers attach via index_beginscan_parallel. 
+ */ +void index_parallelscan_initialize(Relation heap_relation, Size pscan_len, Relation index_relation, + Snapshot snapshot, ParallelIndexScanDesc target) +{ + RELATION_CHECKS; + + Size offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data), EstimateSnapshotSpace(snapshot)); + offset = MAXALIGN(offset); + + target->ps_relid = RelationGetRelid(heap_relation); + target->ps_indexid = RelationGetRelid(index_relation); + target->ps_offset = offset; + SerializeSnapshot(snapshot, target->ps_snapshot_data, + pscan_len - offsetof(ParallelIndexScanDescData, ps_snapshot_data)); + + /* We reach heare only if the index type is btree */ + Assert(index_relation->rd_rel->relam == BTREE_AM_OID); + + /* Initial the parallel scan */ + void *amtarget = OffsetToPointer(target, offset); + btinitparallelscan(amtarget); +} + +/* ---------------- + * index_parallelrescan - (re)start a parallel scan of an index + * ---------------- + */ +void index_parallelrescan(IndexScanDesc scan) +{ + SCAN_CHECKS; + + /* We reach heare only if the index type is btree */ + Assert(scan->indexRelation->rd_rel->relam == BTREE_AM_OID); + + /* Reset the parallel scan */ + btparallelrescan(scan); +} + +/* + * index_beginscan_parallel - join parallel index scan + * + * Caller must be holding suitable locks on the heap and the index. + */ +IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, int norderbys, + ParallelIndexScanDesc pscan) +{ + Assert(RelationGetRelid(heaprel) == pscan->ps_relid); + Snapshot snapshot = RestoreSnapshot(pscan->ps_snapshot_data, + pscan->pscan_len - offsetof(ParallelIndexScanDescData, ps_snapshot_data)); + RegisterSnapshot(snapshot); + + IndexScanDesc scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot, pscan, true); + + /* + * Save additional parameters into the scandesc. Everything else was set + * up by index_beginscan_internal. + */ + scan->heapRelation = heaprel; + scan->xs_snapshot = snapshot; + + return scan; +} + /* ---------------- * index_getnext_tid - get the next TID from a scan * diff --git a/src/gausskernel/storage/access/nbtree/nbtree.cpp b/src/gausskernel/storage/access/nbtree/nbtree.cpp index 6d3add9bd..172eccb7a 100755 --- a/src/gausskernel/storage/access/nbtree/nbtree.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtree.cpp @@ -24,6 +24,7 @@ #include "access/xlog.h" #include "catalog/index.h" #include "commands/vacuum.h" +#include "pgstat.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -48,8 +49,47 @@ typedef struct { MemoryContext pagedelcontext; } BTVacState; -static void btbuildCallback( - Relation index, HeapTuple htup, Datum* values, const bool* isnull, bool tupleIsAlive, void* state); +/* + * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started. + * + * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to + * a new page; others must wait. + * + * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan + * to a new page; some process can start doing that. + * + * BTPARALLEL_DONE indicates that the scan is complete (including error exit). + * We reach this state once for every distinct combination of array keys. + */ +typedef enum { + BTPARALLEL_NOT_INITIALIZED, + BTPARALLEL_ADVANCING, + BTPARALLEL_IDLE, + BTPARALLEL_DONE +} BTPS_State; + +#define BTCV_WAIT_TIME 1 + +/* + * BTParallelScanDescData contains btree specific shared information required + * for parallel scan. 
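/*
 * For orientation (illustrative, not patch code): index_parallelscan_initialize()
 * lays out the shared chunk as
 *
 *   [ ParallelIndexScanDescData | serialized snapshot | AM-specific state ]
 *                                                       ^ ps_offset
 *
 * so the leader and every worker reach the btree-specific part the same way:
 */
BTParallelScanDesc btscan =
    (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, parallel_scan->ps_offset);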
+ */ +typedef struct BTParallelScanDescData { + BlockNumber btps_scanPage; /* latest or next page to be scanned */ + BTPS_State btps_pageStatus; /* indicates whether next page is available + * for scan. see above for possible states of + * parallel scan. */ + int btps_arrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ + pthread_mutex_t btps_cv_mutex; /* protects btps_cv */ + pthread_cond_t btps_cv; /* used to synchronize parallel scan */ +} BTParallelScanDescData; + +typedef struct BTParallelScanDescData *BTParallelScanDesc; + +static void btbuildCallback(Relation index, HeapTuple htup, Datum *values, const bool *isnull, bool tupleIsAlive, + void *state); static void btvacuumscan(IndexVacuumInfo* info, IndexBulkDeleteResult* stats, IndexBulkDeleteCallback callback, void* callback_state, BTCycleId cycleid); static void btvacuumpage(BTVacState* vstate, BlockNumber blkno, BlockNumber orig_blkno); @@ -466,6 +506,7 @@ Datum btrescan(PG_FUNCTION_ARGS) so->markPos.buf = InvalidBuffer; } so->markItemIndex = -1; + so->arrayKeyCount = 0; /* * Allocate tuple workspace arrays, if needed for an index-only scan and @@ -626,6 +667,194 @@ Datum btrestrpos(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * btestimateparallelscan -- estimate storage for BTParallelScanDescData + */ +Size btestimateparallelscan(void) +{ + return sizeof(BTParallelScanDescData); +} + +/* + * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan + */ +void btinitparallelscan(void* target) +{ + BTParallelScanDesc bt_target = (BTParallelScanDesc)target; + + bt_target->btps_scanPage = InvalidBlockNumber; + bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + bt_target->btps_arrayKeyCount = 0; + pthread_mutex_init(&bt_target->btps_cv_mutex, NULL); + pthread_cond_init(&bt_target->btps_cv, NULL); +} + +/* + * btparallelrescan() -- reset parallel scan + */ +void btparallelrescan(IndexScanDesc scan) +{ + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + Assert(parallel_scan); + + BTParallelScanDesc btscan = (BTParallelScanDesc)OffsetToPointer((void *)parallel_scan, parallel_scan->ps_offset); + + /* + * In theory, we don't need to acquire the spinlock here, because there + * shouldn't be any other workers running at this point, but we do so for + * consistency. + */ + WLMContextLock bt_lock(&btscan->btps_cv_mutex); + bt_lock.Lock(); + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount = 0; + bt_lock.UnLock(); +} + +/* + * _bt_parallel_seize() -- Begin the process of advancing the scan to a new + * page. Other scans must wait until we call bt_parallel_release() or + * bt_parallel_done(). + * + * The return value is true if we successfully seized the scan and false + * if we did not. The latter case occurs if no pages remain for the current + * set of scankeys. + * + * If the return value is true, *pageno returns the next or current page + * of the scan (depending on the scan direction). An invalid block number + * means the scan hasn't yet started, and P_NONE means we've reached the end. + * The first time a participating process reaches the last page, it will return + * true and set *pageno to P_NONE; after that, further attempts to seize the + * scan will return false. + * + * Callers should ignore the value of pageno if the return value is false. 
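/*
 * Illustrative use of the protocol described above: a simplified sketch for a
 * forward scan, not code from this patch.  'scan' and 'rel' are assumed to be
 * the usual IndexScanDesc and index Relation locals; error handling is omitted.
 */
BlockNumber blkno;
Buffer buf;
BTPageOpaqueInternal opaque;

if (!_bt_parallel_seize(scan, &blkno))
    return false;                     /* scan already advanced past our key set, or done   */
if (blkno == P_NONE) {
    _bt_parallel_done(scan);          /* no pages left: mark done and wake other waiters   */
    return false;
}
/* InvalidBlockNumber would mean the scan has not started; _bt_first descends normally.    */
buf = _bt_getbuf(rel, blkno, BT_READ);
opaque = (BTPageOpaqueInternal) PageGetSpecialPointer(BufferGetPage(buf));
_bt_parallel_release(scan, opaque->btpo_next);   /* publish the next page so another
                                                  * backend can advance while we read buf   */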
+ */ +bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + BTPS_State pageStatus; + bool exit_loop = false; + bool status = true; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + *pageno = P_NONE; + + BTParallelScanDesc btscan = (BTParallelScanDesc)OffsetToPointer((void *)parallel_scan, parallel_scan->ps_offset); + + WLMContextLock bt_lock(&btscan->btps_cv_mutex); + + bt_lock.Lock(); + while (1) { + CHECK_FOR_INTERRUPTS(); + + pageStatus = btscan->btps_pageStatus; + if (so->arrayKeyCount < btscan->btps_arrayKeyCount) { + /* Parallel scan has already advanced to a new set of scankeys. */ + status = false; + } else if (pageStatus == BTPARALLEL_DONE) { + /* + * We're done with this set of scankeys. This may be the end, or + * there could be more sets to try. + */ + status = false; + } else if (pageStatus != BTPARALLEL_ADVANCING) { + /* + * We have successfully seized control of the scan for the purpose + * of advancing it to a new page! + */ + btscan->btps_pageStatus = BTPARALLEL_ADVANCING; + *pageno = btscan->btps_scanPage; + exit_loop = true; + } + if (exit_loop || !status) { + break; + } + + bt_lock.ConditionTimedWait(&btscan->btps_cv, BTCV_WAIT_TIME); + } + bt_lock.UnLock(); + + return status; +} + +/* + * _bt_parallel_release() -- Complete the process of advancing the scan to a + * new page. We now have the new value btps_scanPage; some other backend + * can now begin advancing the scan. + */ +void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page) +{ + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan = (BTParallelScanDesc)OffsetToPointer((void *)parallel_scan, parallel_scan->ps_offset); + + WLMContextLock bt_lock(&btscan->btps_cv_mutex); + + bt_lock.Lock(); + btscan->btps_scanPage = scan_page; + btscan->btps_pageStatus = BTPARALLEL_IDLE; + bt_lock.ConditionWakeUp(&btscan->btps_cv); + bt_lock.UnLock(); +} + +/* + * _bt_parallel_done() -- Mark the parallel scan as complete. + * + * When there are no pages left to scan, this function should be called to + * notify other workers. Otherwise, they might wait forever for the scan to + * advance to the next page. + */ +void _bt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + /* Do nothing, for non-parallel scans */ + if (parallel_scan == NULL) { + return; + } + + BTParallelScanDesc btscan = (BTParallelScanDesc)OffsetToPointer((void *)parallel_scan, parallel_scan->ps_offset); + + WLMContextLock bt_lock(&btscan->btps_cv_mutex); + + bt_lock.Lock(); + if (so->arrayKeyCount >= btscan->btps_arrayKeyCount && btscan->btps_pageStatus != BTPARALLEL_DONE) { + btscan->btps_pageStatus = BTPARALLEL_DONE; + /* wake up all the workers associated with this parallel scan */ + bt_lock.ConditionWakeUpAll(&btscan->btps_cv); + } + + bt_lock.UnLock(); +} + +/* + * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array + * keys. + * + * Updates the count of array keys processed for both local and parallel + * scans. 
+ */ +void _bt_parallel_advance_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan = (BTParallelScanDesc)OffsetToPointer((void *)parallel_scan, parallel_scan->ps_offset); + + so->arrayKeyCount++; + + WLMContextLock bt_lock(&btscan->btps_cv_mutex); + + bt_lock.Lock(); + if (btscan->btps_pageStatus == BTPARALLEL_DONE) { + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount++; + } + bt_lock.UnLock(); +} + /* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells diff --git a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp index ac6488ee0..603ca0e73 100755 --- a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp @@ -32,8 +32,11 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum); static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup, Oid partOid); static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); +static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); +static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); static Buffer _bt_walk_left(Relation rel, Buffer buf); static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); +static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); /* * _bt_search() -- Search the tree for a particular scankey, @@ -471,8 +474,10 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) ScanKeyData notnullkeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; + bool status = true; StrategyNumber strat_total; BTScanPosItem* currItem = NULL; + BlockNumber blkno; pgstat_count_index_scan(rel); @@ -489,6 +494,27 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) if (!so->qual_ok) return false; + /* + * For parallel scans, get the starting page from shared state. If the + * scan has not started, proceed to find out first leaf page in the usual + * way while keeping other participating processes waiting. If the scan + * has already begun, use the page number from the shared structure. + */ + if (scan->parallel_scan != NULL) { + status = _bt_parallel_seize(scan, &blkno); + if (!status) { + return false; + } else if (blkno == P_NONE) { + _bt_parallel_done(scan); + return false; + } else if (blkno != InvalidBlockNumber) { + if (!_bt_parallel_readpage(scan, blkno, dir)) { + return false; + } + goto readcomplete; + } + } + /* ---------- * Examine the scan keys to discover where we need to start the scan. * @@ -654,8 +680,15 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) * the tree. Walk down that edge to the first or last key, and scan from * there. */ - if (keysCount == 0) - return _bt_endpoint(scan, dir); + if (keysCount == 0) { + bool match = _bt_endpoint(scan, dir); + if (!match) { + /* No match, so mark (parallel) scan finished */ + _bt_parallel_done(scan); + } + + return match; + } /* * We want to start the scan somewhere within the index. 
Set up an @@ -681,8 +714,10 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) */ ScanKey subkey = (ScanKey)DatumGetPointer(cur->sk_argument); Assert(subkey->sk_flags & SK_ROW_MEMBER); - if (subkey->sk_flags & SK_ISNULL) + if (subkey->sk_flags & SK_ISNULL) { + _bt_parallel_done(scan); return false; + } scankeys[i] = *subkey; /* @@ -881,7 +916,6 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) (void)_bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ, false); /* - * don't need to keep the stack around... * remember which buffer we have pinned, if any */ so->currPos.buf = buf; @@ -892,20 +926,19 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) * because nothing finer to lock exists. */ PredicateLockRelation(rel, scan->xs_snapshot); + + /* + * mark parallel scan as done, so that all the workers can finish + * their scan + */ + _bt_parallel_done(scan); + so->currPos.buf = InvalidBuffer; + return false; } else PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot); - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } else { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - so->markItemIndex = -1; /* ditto */ + _bt_initialize_more_data(so, dir); /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); @@ -946,6 +979,7 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir) /* Drop the lock, but not pin, on the current page */ LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); +readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; scan->xs_ctup.t_self = currItem->heapTid; @@ -1030,6 +1064,9 @@ bool _bt_next(IndexScanDesc scan, ScanDirection dir) * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports * that there can be no more matching tuples in the current scan direction. * + * In the case of a parallel scan, caller must have called _bt_parallel_seize + * prior to calling this function; this function will invoke + * _bt_parallel_release before returning. * Returns true if any matching items found on the page, false if none. */ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum) @@ -1053,13 +1090,26 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off tupdesc = RelationGetDescr(scan->indexRelation); PartitionOidAttr = IndexRelationGetNumberOfAttributes(scan->indexRelation); - /* we must have the buffer pinned and locked */ + /* + * We must have the buffer pinned and locked, but the usual macro can't be + * used here; this function is what makes it good for currPos. + */ Assert(BufferIsValid(so->currPos.buf)); /* We've pinned the buffer, nobody can prune this buffer, check whether snapshot is valid. 
*/ CheckSnapshotIsValidException(scan->xs_snapshot, "_bt_readpage"); page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page); + + /* allow next page be processed by parallel worker */ + if (scan->parallel_scan) { + if (ScanDirectionIsForward(dir)) { + _bt_parallel_release(scan, opaque->btpo_next); + } else { + _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + } + minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); @@ -1163,11 +1213,14 @@ static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, co /* * _bt_steppage() -- Step to next page containing valid data for scan * - * On entry, so->currPos.buf must be pinned and read-locked. We'll drop - * the lock and pin before moving to next page. + * On entry, if so->currPos.buf is valid the buffer is pinned but not locked; + * if pinned, we'll drop the pin before moving to next page. The buffer is + * not locked on entry. * - * On success exit, we hold pin and read-lock on the next interesting page, - * and so->currPos is updated to contain data from that page. + * On success exit, so->currPos is updated to contain data from the next + * interesting page. For success on a scan using a non-MVCC snapshot we hold + * a pin, but not a read lock, on that page. If we do not hold the pin, we + * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success. * * If there are no more matching records in the given direction, we drop all * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. @@ -1176,8 +1229,8 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque)scan->opaque; Relation rel; - Page page; - BTPageOpaqueInternal opaque; + BlockNumber blkno = InvalidBlockNumber; + bool status = true; /* we must have the buffer pinned and locked */ Assert(BufferIsValid(so->currPos.buf)); @@ -1210,19 +1263,86 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (ScanDirectionIsForward(dir)) { /* Walk right to the next page with data */ - /* We must rely on the previously saved nextPage link! */ - BlockNumber blkno = so->currPos.nextPage; + if (scan->parallel_scan != NULL) { + /* + * Seize the scan to get the next block number; if the scan has + * ended already, bail out. + */ + status = _bt_parallel_seize(scan, &blkno); + if (!status) { + /* release the previous buffer, if pinned */ + _bt_relbuf(rel, so->currPos.buf); + so->currPos.buf = InvalidBuffer; + return false; + } + } else { + /* Not parallel, so use the previously-saved nextPage link. */ + blkno = so->currPos.nextPage; + } /* Remember we left a page with data */ so->currPos.moreLeft = true; + } else { + /* Remember we left a page with data */ + so->currPos.moreRight = true; - for (;;) { - /* release the previous buffer */ + if (scan->parallel_scan != NULL) { + /* + * Seize the scan to get the current block number; if the scan has + * ended already, bail out. 
+ */ + status = _bt_parallel_seize(scan, &blkno); _bt_relbuf(rel, so->currPos.buf); - so->currPos.buf = InvalidBuffer; - /* if we're at end of scan, give up */ - if (blkno == P_NONE || !so->currPos.moreRight) + if (!status) { + so->currPos.buf = InvalidBuffer; return false; + } + } else { + /* Not parallel, so just use our own notion of the current page */ + blkno = so->currPos.currPage; + } + } + + if (!_bt_readnextpage(scan, blkno, dir)) { + return false; + } + + return true; +} + +/* + * _bt_readnextpage() -- Read next page containing valid data for scan + * + * On success exit, so->currPos is updated to contain data from the next + * interesting page. Caller is responsible to release lock and pin on + * buffer on success. We return TRUE to indicate success. + * + * If there are no more matching records in the given direction, we drop all + * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + */ +static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + Page page; + BTPageOpaqueInternal opaque; + bool status = true; + + Relation rel = scan->indexRelation; + + if (ScanDirectionIsForward(dir)) { + for (;;) { + if (so->currPos.buf != InvalidBuffer) { + _bt_relbuf(rel, so->currPos.buf); + so->currPos.buf = InvalidBuffer; + } + /* + * if we're at end of scan, give up and mark parallel scan as + * done, so that all the workers can finish their scan + */ + if (blkno == P_NONE || !so->currPos.moreRight) { + _bt_parallel_done(scan); + return false; + } /* check for interrupts while we're not holding any buffer lock */ CHECK_FOR_INTERRUPTS(); /* step right one page */ @@ -1238,11 +1358,26 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) break; } /* nope, keep going */ - blkno = opaque->btpo_next; + if (scan->parallel_scan != NULL) { + status = _bt_parallel_seize(scan, &blkno); + if (!status) { + _bt_relbuf(rel, so->currPos.buf); + so->currPos.buf = InvalidBuffer; + return false; + } + } else { + blkno = opaque->btpo_next; + } } } else { - /* Remember we left a page with data */ - so->currPos.moreRight = true; + /* + * Should only happen in parallel cases, when some other backend + * advanced the scan. + */ + if (so->currPos.currPage != blkno) { + _bt_relbuf(rel, so->currPos.buf); + so->currPos.currPage = blkno; + } /* * Walk left to the next page with data. This is much more complex @@ -1255,6 +1390,7 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* Done if we know there are no matching keys to the left */ if (!so->currPos.moreLeft) { _bt_relbuf(rel, so->currPos.buf); + _bt_parallel_done(scan); so->currPos.buf = InvalidBuffer; return false; } @@ -1263,8 +1399,10 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) so->currPos.buf = _bt_walk_left(rel, so->currPos.buf); /* if we're physically at end of index, return failure */ - if (so->currPos.buf == InvalidBuffer) + if (so->currPos.buf == InvalidBuffer) { + _bt_parallel_done(scan); return false; + } /* * Okay, we managed to move left to a non-deleted page. Done if @@ -1280,12 +1418,50 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) break; } + + /* + * For parallel scans, get the last page scanned as it is quite + * possible that by the time we try to seize the scan, some other + * worker has already advanced the scan to a different page. We + * must continue based on the latest page scanned by any worker. 
+ */ + if (scan->parallel_scan != NULL) { + _bt_relbuf(rel, so->currPos.buf); + status = _bt_parallel_seize(scan, &blkno); + if (!status) { + so->currPos.buf = InvalidBuffer; + return false; + } + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); + } } } return true; } +/* + * _bt_parallel_readpage() -- Read current page containing valid data for scan + * + * On success, release lock and maybe pin on buffer. We return TRUE to + * indicate success. + */ +static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque)scan->opaque; + + _bt_initialize_more_data(so, dir); + + if (!_bt_readnextpage(scan, blkno, dir)) { + return false; + } + + /* Drop the lock, but not pin, on the current page */ + LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); + + return true; +} + /* * _bt_walk_left() -- step left one page, if possible * @@ -1533,16 +1709,7 @@ static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir) /* remember which buffer we have pinned */ so->currPos.buf = buf; - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } else { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - so->markItemIndex = -1; /* ditto */ + _bt_initialize_more_data(so, dir); /* * Now load data from the first page of the scan. @@ -1678,3 +1845,20 @@ bool _bt_check_natts(const Relation index, Page page, OffsetNumber offnum) } } +/* + * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately + * for scan direction + */ +static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) +{ + /* initialize moreLeft/moreRight appropriately for scan direction */ + if (ScanDirectionIsForward(dir)) { + so->currPos.moreLeft = false; + so->currPos.moreRight = true; + } else { + so->currPos.moreLeft = true; + so->currPos.moreRight = false; + } + so->numKilled = 0; /* just paranoia */ + so->markItemIndex = -1; /* ditto */ +} diff --git a/src/gausskernel/storage/access/nbtree/nbtutils.cpp b/src/gausskernel/storage/access/nbtree/nbtutils.cpp index 9df27b4ae..0e90f4e2c 100755 --- a/src/gausskernel/storage/access/nbtree/nbtutils.cpp +++ b/src/gausskernel/storage/access/nbtree/nbtutils.cpp @@ -547,6 +547,11 @@ bool _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) } } + /* advance parallel scan */ + if (scan->parallel_scan != NULL) { + _bt_parallel_advance_array_keys(scan); + } + return found; } diff --git a/src/include/access/genam.h b/src/include/access/genam.h index e76a38bb0..2594966a4 100644 --- a/src/include/access/genam.h +++ b/src/include/access/genam.h @@ -82,6 +82,8 @@ typedef bool (*IndexBulkDeleteCallback)(ItemPointer itemptr, void* state, Oid pa typedef struct IndexScanDescData* IndexScanDesc; typedef struct SysScanDescData* SysScanDesc; +typedef struct ParallelIndexScanDescData* ParallelIndexScanDesc; + /* * Enumeration specifying the type of uniqueness check to perform in * index_insert(). 
@@ -132,6 +134,12 @@ extern void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey or extern void index_endscan(IndexScanDesc scan); extern void index_markpos(IndexScanDesc scan); extern void index_restrpos(IndexScanDesc scan); +extern Size index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot); +extern void index_parallelscan_initialize(Relation heapRelation, Size pscan_len, Relation indexRelation, + Snapshot snapshot, ParallelIndexScanDesc target); +extern void index_parallelrescan(IndexScanDesc scan); +extern IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, int norderbys, + ParallelIndexScanDesc pscan); extern ItemPointer index_getnext_tid(IndexScanDesc scan, ScanDirection direction); extern HeapTuple index_fetch_heap(IndexScanDesc scan); extern HeapTuple index_getnext(IndexScanDesc scan, ScanDirection direction); diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index ae32a3a82..b070ea2ab 100755 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -525,6 +525,7 @@ typedef struct BTScanPosItem { /* what we remember about each match */ typedef struct BTScanPosData { Buffer buf; /* if valid, the buffer is pinned */ + BlockNumber currPage; /* page referenced by items array */ BlockNumber nextPage; /* page's right link when we scanned it */ /* @@ -579,6 +580,8 @@ typedef struct BTScanOpaqueData { ScanKey arrayKeyData; /* modified copy of scan->keyData */ int numArrayKeys; /* number of equality-type array keys (-1 if * there are any unsatisfiable array keys) */ + int arrayKeyCount; /* count indicating number of array scan keys + * processed */ BTArrayKeyInfo* arrayKeys; /* info about each equality-type array key */ MemoryContext arrayContext; /* scan-lifespan context for array data */ @@ -663,7 +666,7 @@ typedef struct BTOrderedIndexListElement { } BTOrderedIndexListElement; /* - * prototypes for functions in nbtree.c (external entry points for btree) + * external entry points for btree, in nbtree.c */ extern Datum btbuild(PG_FUNCTION_ARGS); extern Datum btbuildempty(PG_FUNCTION_ARGS); @@ -680,6 +683,19 @@ extern Datum btbulkdelete(PG_FUNCTION_ARGS); extern Datum btvacuumcleanup(PG_FUNCTION_ARGS); extern Datum btcanreturn(PG_FUNCTION_ARGS); extern Datum btoptions(PG_FUNCTION_ARGS); + +extern Size btestimateparallelscan(void); +extern void btinitparallelscan(void* bt_target); +extern void btparallelrescan(IndexScanDesc scan); + +/* + * prototypes for internal functions in nbtree.c + */ +extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno); +extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page); +extern void _bt_parallel_done(IndexScanDesc scan); +extern void _bt_parallel_advance_array_keys(IndexScanDesc scan); + /* * this is the interface of merge 2 or more index for btree index * we also have similar interfaces for other kind of indexes, like hash/gist/gin diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 3ae8208c5..85dfd5f18 100755 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -155,6 +155,7 @@ typedef struct IndexScanDescData { ScanKey orderByData; /* array of ordering op descriptors */ bool xs_want_itup; /* caller requests index tuples */ bool xs_want_ext_oid; /* global partition index need partition oid */ + bool xs_temp_snap; /* unregister snapshot at scan end? 
*/ /* signaling to index AM about killing index tuples */ bool kill_prior_tuple; /* last-returned tuple is dead */ @@ -177,6 +178,10 @@ typedef struct IndexScanDescData { /* state data for traversing HOT chains in index_getnext */ bool xs_continue_hot; /* T if must keep walking HOT chain */ + + /* parallel index scan information, in shared memory */ + ParallelIndexScanDesc parallel_scan; + /* put decompressed heap tuple data into xs_ctbuf_hdr be careful! when malloc memory should give extra mem for *xs_ctbuf_hdr. t_bits which is varlength arr */ @@ -199,6 +204,16 @@ typedef struct IndexScanDescData { #define IndexScanNeedSwitchPartRel(scan) \ ((scan)->xs_want_ext_oid && GPIScanCheckPartOid((scan)->xs_gpi_scan, (scan)->heapRelation->rd_id)) +/* Generic structure for parallel scans */ +typedef struct ParallelIndexScanDescData { + int plan_node_id; /* used to identify speicific plan */ + Oid ps_relid; + Oid ps_indexid; + Size ps_offset; /* Offset in bytes of am specific structure */ + uint32 pscan_len; /* total size of this struct, including phs_snapshot_data */ + char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelIndexScanDescData; + typedef struct HBktIdxScanDescData { AbsIdxScanDescData sd; Relation rs_rd; /* heap relation descriptor */ diff --git a/src/include/c.h b/src/include/c.h index 57dd2a9aa..7f652c79e 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -653,6 +653,8 @@ typedef NameData* Name; */ #define PointerIsAligned(pointer, type) (((intptr_t)(pointer) % (sizeof(type))) == 0) +#define OffsetToPointer(base, offset) ((void *)((char *) base + offset)) + #define OidIsValid(objectId) ((bool)((objectId) != InvalidOid)) #define RegProcedureIsValid(p) OidIsValid(p) diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h index e96e396b5..4c69b8869 100755 --- a/src/include/catalog/pg_am.h +++ b/src/include/catalog/pg_am.h @@ -115,6 +115,12 @@ typedef FormData_pg_am *Form_pg_am; #define Anum_pg_am_amcostestimate 30 #define Anum_pg_am_amoptions 31 +/* ---------------- + * compiler constant for amtype + * ---------------- + */ +#define AMTYPE_INDEX 'i' /* index access method */ + /* ---------------- * initial contents of pg_am * ---------------- @@ -140,7 +146,7 @@ DATA(insert OID = 4039 ( psort 5 1 f f f f t t f t f f f 0 - - psortgettuple p DESCR("psort index access method"); #define PSORT_AM_OID 4039 -DATA(insert OID = 4239 ( cbtree 5 1 f f f f t t f t f f t 0 btinsert btbeginscan cbtreegettuple cbtreegetbitmap btrescan btendscan - - - cbtreebuild btbuildempty - - cbtreecanreturn cbtreecostestimate cbtreeoptions )); +DATA(insert OID = 4239 ( cbtree 5 1 f f f f t t f t f f t 0 btinsert btbeginscan cbtreegettuple cbtreegetbitmap btrescan btendscan - - - cbtreebuild btbuildempty - - cbtreecanreturn cbtreecostestimate cbtreeoptions )); DESCR("cstore btree index access method"); #define CBTREE_AM_OID 4239 diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h index 4371c36c2..a2ecbe0fb 100644 --- a/src/include/executor/nodeIndexscan.h +++ b/src/include/executor/nodeIndexscan.h @@ -14,6 +14,7 @@ #ifndef NODEINDEXSCAN_H #define NODEINDEXSCAN_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags); @@ -22,6 +23,9 @@ extern void ExecEndIndexScan(IndexScanState* node); extern void ExecIndexMarkPos(IndexScanState* node); extern void ExecIndexRestrPos(IndexScanState* node); extern void ExecReScanIndexScan(IndexScanState* node); +extern void 
ExecIndexScanEstimate(IndexScanState *node, ParallelContext *pcxt); +extern void ExecIndexScanInitializeDSM(IndexScanState *node, ParallelContext *pcxt, int nodeid); +extern void ExecIndexScanInitializeWorker(IndexScanState *node, void *context); /* * These routines are exported to share code with nodeIndexonlyscan.c and diff --git a/src/include/knl/knl_guc/knl_session_attr_sql.h b/src/include/knl/knl_guc/knl_session_attr_sql.h index e7ac7f383..31d89b9f9 100644 --- a/src/include/knl/knl_guc/knl_session_attr_sql.h +++ b/src/include/knl/knl_guc/knl_session_attr_sql.h @@ -156,6 +156,7 @@ typedef struct knl_session_attr_sql { int max_cn_temp_file_size; int default_statistics_target; int min_parallel_table_scan_size; + int min_parallel_index_scan_size; /* Memory Limit user could set in session */ int FencedUDFMemoryLimit; int64 g_default_expthresh; diff --git a/src/include/knl/knl_session.h b/src/include/knl/knl_session.h index 8c082eceb..36218d6cd 100644 --- a/src/include/knl/knl_session.h +++ b/src/include/knl/knl_session.h @@ -2038,6 +2038,7 @@ typedef struct ParallelAppendState ParallelAppendState; /* Info need to pass from leader to worker */ struct ParallelHeapScanDescData; +struct ParallelIndexScanDescData; typedef uint64 XLogRecPtr; typedef struct ParallelQueryInfo { struct SharedExecutorInstrumentation *instrumentation; @@ -2052,6 +2053,8 @@ typedef struct ParallelQueryInfo { ParallelHeapScanDescData **pscan; int pappend_num; ParallelAppendState **pappend; + int piscan_num; + ParallelIndexScanDescData **piscan; } ParallelQueryInfo; struct BTShared; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 8199eee29..043877102 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1608,6 +1608,8 @@ typedef struct { * RuntimeContext expr context for evaling runtime Skeys * RelationDesc index relation descriptor * ScanDesc index scan descriptor + * + * pscan_len size of parallel index scan descriptor * ---------------- */ typedef struct IndexScanState { @@ -1626,6 +1628,8 @@ typedef struct IndexScanState { List* iss_IndexPartitionList; LOCKMODE lockMode; Relation iss_CurrentIndexPartition; + + Size iss_PscanLen; } IndexScanState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index bc59a9274..b145fb7da 100755 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -248,6 +248,7 @@ typedef struct Plan { * information needed for parallel query */ bool parallel_aware; /* engage parallel-aware logic? */ + bool parallel_safe; /* * machine learning model estimations diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index d7000a823..fe0732a7d 100755 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -642,6 +642,7 @@ typedef struct SubPlan { bool useHashTable; /* TRUE if it's okay to return FALSE when the spec result is UNKNOWN; this allows much simpler handling of null values */ bool unknownEqFalse; + bool parallel_safe; /* OK to use as part of parallel plan? 
*/ /* Information for passing params into and out of the subselect: */ /* setParam and parParam are lists of integers (param IDs) */ List* setParam; /* initplan subqueries have to set these Params for parent plan */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index f8201fea6..109ebbfdc 100755 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -593,6 +593,7 @@ typedef struct RelOptInfo { /* use "struct Plan" to avoid including plannodes.h here */ struct Plan* subplan; /* if subquery */ PlannerInfo* subroot; /* if subquery */ + int rel_parallel_workers; /* wanted number of parallel workers */ /* use "struct FdwRoutine" to avoid including fdwapi.h here */ struct FdwRoutine* fdwroutine; /* if foreign table */ void* fdw_private; /* if foreign table */ @@ -700,6 +701,7 @@ typedef struct IndexOptInfo { bool amsearchnulls; /* can AM search for NULL/NOT NULL entries? */ bool amhasgettuple; /* does AM have amgettuple interface? */ bool amhasgetbitmap; /* does AM have amgetbitmap interface? */ + bool amcanparallel; /* does AM support parallel scan? */ } IndexOptInfo; /* diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 9cc8bcb3a..487f23586 100755 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -87,7 +87,7 @@ extern void cost_samplescan(Path* path, PlannerInfo* root, RelOptInfo* baserel, extern void cost_cstorescan(Path* path, PlannerInfo* root, RelOptInfo* baserel); extern void cost_dfsscan(Path* path, PlannerInfo* root, RelOptInfo* baserel); extern void cost_tsstorescan(Path *path, PlannerInfo *root, RelOptInfo *baserel); -extern void cost_index(IndexPath* path, PlannerInfo* root, double loop_count); +extern void cost_index(IndexPath* path, PlannerInfo* root, double loop_count, bool partial_path); extern void cost_bitmap_heap_scan( Path* path, PlannerInfo* root, RelOptInfo* baserel, ParamPathInfo* param_info, Path* bitmapqual, double loop_count); extern void cost_bitmap_and_node(BitmapAndPath* path, PlannerInfo* root); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index d2460f477..8feb4603d 100755 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -59,7 +59,7 @@ extern Path* create_cstorescan_path(PlannerInfo* root, RelOptInfo* rel, int dop extern Path *create_tsstorescan_path(PlannerInfo* root, RelOptInfo* rel, int dop = 1); extern IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols, List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly, - Relids required_outer, double loop_count); + Relids required_outer, double loop_count, bool partial_path); extern Path* build_seqScanPath_by_indexScanPath(PlannerInfo* root, Path* index_path); extern bool CheckBitmapQualIsGlobalIndex(Path* bitmapqual); extern bool CheckBitmapHeapPathContainGlobalOrLocal(Path* bitmapqual); diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 1c0df07f9..5bb6f2702 100755 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -20,6 +20,7 @@ extern RelOptInfo* make_one_rel(PlannerInfo* root, List* joinlist); extern RelOptInfo* standard_join_search(PlannerInfo* root, int levels_needed, List* initial_rels); extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel); +extern int compute_parallel_worker(RelOptInfo *rel, BlockNumber heap_pages, BlockNumber index_pages); extern void set_rel_size(PlannerInfo* 
root, RelOptInfo* rel, Index rti, RangeTblEntry* rte); diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 634f4c4e3..f768eb095 100755 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -178,7 +178,7 @@ extern Plan* create_remotelimit_plan(PlannerInfo* root, Plan* local_plan); extern List* pgxc_order_qual_clauses(PlannerInfo* root, List* clauses); extern List* pgxc_build_relation_tlist(RelOptInfo* rel); extern void pgxc_copy_path_costsize(Plan* dest, Path* src); -extern Plan* pgxc_create_gating_plan(PlannerInfo* root, Plan* plan, List* quals); +extern Plan* pgxc_create_gating_plan(PlannerInfo* root, Plan* plan, List* quals, bool parallel_safe); #endif extern void expand_dfs_tables(PlannerInfo* root); extern void expand_internal_rtentry(PlannerInfo* root, RangeTblEntry* rte, Index rti); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index e04ffd1d3..c4d21bff4 100755 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -311,6 +311,8 @@ typedef struct StdRdOptions { char* end_ctid_internal; char *merge_list; bool on_commit_delete_rows; /* global temp table */ + + int parallel_workers; /* max number of parallel workers */ } StdRdOptions; #define HEAP_MIN_FILLFACTOR 10 @@ -326,6 +328,15 @@ typedef struct StdRdOptions { ((relation)->rd_rel->relkind == RELKIND_RELATION || (relation)->rd_rel->relkind == RELKIND_MATVIEW) ? \ ((StdRdOptions*)(relation)->rd_options)->user_catalog_table : false) +/* + * RelationGetParallelWorkers + * Returns the relation's parallel_workers reloption setting. + * Note multiple eval of argument! + */ +#define RelationGetParallelWorkers(relation, defaultpw) \ + ((relation)->rd_options ? \ + ((StdRdOptions *) (relation)->rd_options)->parallel_workers : (defaultpw)) + #define RelationIsInternal(relation) (RelationGetInternalMask(relation) != INTERNAL_MASK_DISABLE) /* diff --git a/src/include/workload/ctxctl.h b/src/include/workload/ctxctl.h index eee781813..7bc4a5272 100644 --- a/src/include/workload/ctxctl.h +++ b/src/include/workload/ctxctl.h @@ -328,6 +328,11 @@ struct WLMContextLock { (void)pthread_cond_signal(condition); } + void ConditionWakeUpAll(pthread_cond_t* condition) + { + (void)pthread_cond_broadcast(condition); + } + private: pthread_mutex_t* m_mutex; bool m_isLocked; diff --git a/src/test/regress/expected/hw_smp.out b/src/test/regress/expected/hw_smp.out index 4e94a9328..8115cb1c8 100644 --- a/src/test/regress/expected/hw_smp.out +++ b/src/test/regress/expected/hw_smp.out @@ -3,6 +3,7 @@ -- Create @ 2017-7-11 -- --dynamic smp+random plan, plan should be same +set max_parallel_workers_per_gather=0; set plan_mode_seed = 1485495508; explain (costs off) select count(node_name) as dncnt from pgxc_node where node_type='D' @@ -469,3 +470,4 @@ drop cascades to table store_sales_extend_max_1t drop cascades to table item_inventory drop cascades to table item_inventory_plan drop cascades to table item_store_sold +reset max_parallel_workers_per_gather; diff --git a/src/test/regress/expected/parallel_append.out b/src/test/regress/expected/parallel_append.out index 2639657e8..a43d4faa8 100644 --- a/src/test/regress/expected/parallel_append.out +++ b/src/test/regress/expected/parallel_append.out @@ -33,21 +33,21 @@ explain select * from a union select * from b; Group By Key: a.a1, a.a2, a.a3 --? -> Append (cost=.* rows=10 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) + Number of Workers: 1 +--? 
-> Parallel Seq Scan on a (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) (9 rows) explain select * from a union all select * from b; --?.*QUERY PLAN.* --?-----------.* --? Gather (cost=.* rows=10 width=12) - Number of Workers: 3 + Number of Workers: 2 --? -> Parallel Append (cost=.* rows=4 width=12) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) (5 rows) explain select * from a where a1 > 4 union select * from b where b1 < 6; @@ -57,11 +57,11 @@ explain select * from a where a1 > 4 union select * from b where b1 < 6; Group By Key: a.a1, a.a2, a.a3 --? -> Append (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=1 width=12) - Number of Workers: 3 + Number of Workers: 1 --? -> Parallel Seq Scan on a (cost=.* rows=1 width=12) Filter: (a1 > 4) --? -> Gather (cost=.* rows=2 width=12) - Number of Workers: 3 + Number of Workers: 1 --? -> Parallel Seq Scan on b (cost=.* rows=1 width=12) Filter: (b1 < 6) (11 rows) @@ -72,11 +72,11 @@ explain select * from a where a1 > 4 union all select * from b where b1 < 6; --? Result (cost=.* rows=3 width=12) --? -> Append (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=1 width=12) - Number of Workers: 3 + Number of Workers: 1 --? -> Parallel Seq Scan on a (cost=.* rows=1 width=12) Filter: (a1 > 4) --? -> Gather (cost=.* rows=2 width=12) - Number of Workers: 3 + Number of Workers: 1 --? -> Parallel Seq Scan on b (cost=.* rows=1 width=12) Filter: (b1 < 6) (10 rows) @@ -101,14 +101,14 @@ explain select * from (select * from a union all select * from b) as ta, c where --? Hash Join (cost=.* rows=5 width=24) Hash Cond: (a.a1 = c.c1) --? -> Gather (cost=.* rows=10 width=12) - Number of Workers: 3 + Number of Workers: 2 --? -> Parallel Append (cost=.* rows=4 width=12) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) --? -> Hash (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) (11 rows) explain select * from d left outer join (select * from a union all select * from b) as t on d.d1=t.a1; @@ -117,14 +117,14 @@ explain select * from d left outer join (select * from a union all select * from --? Hash Right Join (cost=.* rows=10 width=24) Hash Cond: (a.a1 = d.d1) --? -> Gather (cost=.* rows=10 width=12) - Number of Workers: 3 + Number of Workers: 2 --? -> Parallel Append (cost=.* rows=4 width=12) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) --? -> Hash (cost=.* rows=10 width=12) --? -> Gather (cost=.* rows=10 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on d (cost=.* rows=3 width=12) + Number of Workers: 1 +--? 
-> Parallel Seq Scan on d (cost=.* rows=6 width=12) (11 rows) explain select d.d1, sum(d.d2), sum(t.a2) from (select * from a union all select * from b) t, d where t.a1=d1 group by d.d1 order by 1,2; @@ -137,14 +137,14 @@ explain select d.d1, sum(d.d2), sum(t.a2) from (select * from a union all select --? -> Hash Join (cost=.* rows=10 width=12) Hash Cond: (a.a1 = d.d1) --? -> Gather (cost=.* rows=10 width=8) - Number of Workers: 3 + Number of Workers: 2 --? -> Parallel Append (cost=.* rows=4 width=8) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=8) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=8) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=8) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=8) --? -> Hash (cost=.* rows=10 width=8) --? -> Gather (cost=.* rows=10 width=8) - Number of Workers: 3 ---? -> Parallel Seq Scan on d (cost=.* rows=3 width=8) + Number of Workers: 1 +--? -> Parallel Seq Scan on d (cost=.* rows=6 width=8) (15 rows) select * from a union select * from b; @@ -258,12 +258,12 @@ explain select * from c except select * from b where b1 >4; --? -> Append (cost=.* rows=10 width=12) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) Filter: (b1 > 4) (11 rows) @@ -274,12 +274,12 @@ explain select * from c except all select * from b where b1 >4; --? -> Append (cost=.* rows=10 width=12) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) Filter: (b1 > 4) (11 rows) @@ -309,12 +309,12 @@ explain select * from e intersect select * from c; --? -> Append (cost=.* rows=12 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=7 width=12) --? -> Gather (cost=.* rows=7 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on e (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on e (cost=.* rows=4 width=12) (10 rows) explain select * from e intersect all select * from c where c1 != 8; @@ -324,13 +324,13 @@ explain select * from e intersect all select * from c where c1 != 8; --? -> Append (cost=.* rows=11 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=4 width=12) --? -> Gather (cost=.* rows=4 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=1 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) Filter: (c1 <> 8) --? 
-> Subquery Scan on "*SELECT* 1" (cost=.* rows=7 width=12) --? -> Gather (cost=.* rows=7 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on e (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on e (cost=.* rows=4 width=12) (11 rows) -------------------------------------- @@ -409,27 +409,28 @@ select * from b union all (select * from (select * from a union all select * fro 8 | 8 | 8 (15 rows) -select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y where x.a1 = y.d1; +select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y + where x.a1 = y.d1 order by 1, 2, 3, 4, 5, 6; a1 | a2 | a3 | d1 | d2 | d3 ----+----+----+----+----+---- - 4 | 4 | 4 | 4 | 4 | 4 - 4 | 4 | 4 | 4 | 4 | 4 1 | 1 | 1 | 1 | 2 | 3 - 2 | 2 | 2 | 2 | 3 | 4 - 3 | 3 | 3 | 3 | 4 | 5 - 4 | 4 | 4 | 4 | 5 | 6 - 4 | 4 | 4 | 4 | 5 | 6 - 5 | 5 | 5 | 5 | 6 | 7 - 5 | 5 | 5 | 5 | 6 | 7 1 | 1 | 1 | 1 | 3 | 2 + 2 | 2 | 2 | 2 | 3 | 4 2 | 2 | 2 | 2 | 4 | 4 + 3 | 3 | 3 | 3 | 4 | 5 3 | 3 | 3 | 3 | 5 | 6 + 4 | 4 | 4 | 4 | 4 | 4 + 4 | 4 | 4 | 4 | 4 | 4 + 4 | 4 | 4 | 4 | 5 | 6 + 4 | 4 | 4 | 4 | 5 | 6 4 | 4 | 4 | 4 | 6 | 8 4 | 4 | 4 | 4 | 6 | 8 - 5 | 5 | 5 | 5 | 7 | 10 - 5 | 5 | 5 | 5 | 7 | 10 5 | 5 | 5 | 5 | 5 | 5 5 | 5 | 5 | 5 | 5 | 5 + 5 | 5 | 5 | 5 | 6 | 7 + 5 | 5 | 5 | 5 | 6 | 7 + 5 | 5 | 5 | 5 | 7 | 10 + 5 | 5 | 5 | 5 | 7 | 10 6 | 6 | 6 | 6 | 6 | 6 8 | 8 | 8 | 8 | 8 | 8 (20 rows) @@ -441,8 +442,8 @@ explain select * from e intersect (select * from a except select * from b union --? -> Append (cost=.* rows=17 width=12) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=7 width=12) --? -> Gather (cost=.* rows=7 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on e (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on e (cost=.* rows=4 width=12) --? -> Result (cost=.* rows=10 width=12) --? -> HashAggregate (cost=.* rows=10 width=12) Group By Key: "*SELECT* 2".a1, "*SELECT* 2".a2, "*SELECT* 2".a3 @@ -452,15 +453,15 @@ explain select * from e intersect (select * from a except select * from b union --? -> Append (cost=.* rows=10 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) --? -> Subquery Scan on "*SELECT* 3" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) (24 rows) explain select d2 from d except all (select d2 from d except select c1 from c) union all select e1 from e; @@ -472,22 +473,22 @@ explain select d2 from d except all (select d2 from d except select c1 from c) u --? -> Append (cost=.* rows=16 width=4) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=10 width=4) --? -> Gather (cost=.* rows=10 width=4) - Number of Workers: 3 ---? -> Parallel Seq Scan on d (cost=.* rows=3 width=4) + Number of Workers: 1 +--? -> Parallel Seq Scan on d (cost=.* rows=6 width=4) --? -> Result (cost=.* rows=6 width=4) --? -> HashSetOp Except (cost=.* rows=6 width=4) --? 
-> Append (cost=.* rows=15 width=4) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=10 width=4) --? -> Gather (cost=.* rows=10 width=4) - Number of Workers: 3 ---? -> Parallel Seq Scan on d (cost=.* rows=3 width=4) + Number of Workers: 1 +--? -> Parallel Seq Scan on d (cost=.* rows=6 width=4) --? -> Subquery Scan on "*SELECT* 3" (cost=.* rows=5 width=4) --? -> Gather (cost=.* rows=5 width=4) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=4) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=4) --? -> Gather (cost=.* rows=7 width=4) - Number of Workers: 3 ---? -> Parallel Seq Scan on e (cost=.* rows=2 width=4) + Number of Workers: 1 +--? -> Parallel Seq Scan on e (cost=.* rows=4 width=4) (22 rows) explain select * from a union all (select * from b union select * from c where c1 < 5); @@ -495,16 +496,16 @@ explain select * from a union all (select * from b union select * from c where c --?------------.* --? Append (cost=.* rows=12 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) --? -> HashAggregate (cost=.* rows=7 width=12) Group By Key: b.b1, b.b2, b.b3 --? -> Append (cost=.* rows=7 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=2 width=12) - Number of Workers: 3 + Number of Workers: 1 --? -> Parallel Seq Scan on c (cost=.* rows=1 width=12) Filter: (c1 < 5) (14 rows) @@ -520,45 +521,48 @@ explain select * from a except select * from b union select * from c; --? -> Append (cost=.* rows=10 width=12) --? -> Subquery Scan on "*SELECT* 1" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) --? -> Subquery Scan on "*SELECT* 2" (cost=.* rows=5 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) --? -> Gather (cost=.* rows=5 width=12) - Number of Workers: 3 ---? -> Parallel Seq Scan on c (cost=.* rows=2 width=12) + Number of Workers: 1 +--? -> Parallel Seq Scan on c (cost=.* rows=3 width=12) (17 rows) explain select * from b union all (select * from (select * from a union all select * from b)); --?.* QUERY PLAN.* --?---------.* --? Gather (cost=.* rows=15 width=12) - Number of Workers: 3 + Number of Workers: 2 --? -> Parallel Append (cost=.* rows=6 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) (6 rows) -explain select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y where x.a1 = y.d1; +explain select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y + where x.a1 = y.d1 order by 1, 2, 3, 4, 5, 6; --?.* QUERY PLAN.* --?--------.* ---? 
Hash Join (cost=.* rows=10 width=24) - Hash Cond: (d.d1 = a.a1) ---? -> Gather (cost=.* rows=17 width=12) - Number of Workers: 3 ---? -> Parallel Append (cost=.* rows=5 width=12) ---? -> Parallel Seq Scan on d (cost=.* rows=3 width=12) ---? -> Parallel Seq Scan on e (cost=.* rows=2 width=12) ---? -> Hash (cost=.* rows=10 width=12) ---? -> Gather (cost=.* rows=10 width=12) - Number of Workers: 3 ---? -> Parallel Append (cost=.* rows=4 width=12) ---? -> Parallel Seq Scan on a (cost=.* rows=2 width=12) ---? -> Parallel Seq Scan on b (cost=.* rows=2 width=12) -(13 rows) +--? Sort (cost=.* rows=10 width=24) + Sort Key: a.a1, a.a2, a.a3, d.d2, d.d3 +--? -> Hash Join (cost=.* rows=10 width=24) + Hash Cond: (d.d1 = a.a1) +--? -> Gather (cost=.* rows=17 width=12) + Number of Workers: 2 +--? -> Parallel Append (cost=.* rows=7 width=12) +--? -> Parallel Seq Scan on d (cost=.* rows=6 width=12) +--? -> Parallel Seq Scan on e (cost=.* rows=4 width=12) +--? -> Hash (cost=.* rows=10 width=12) +--? -> Gather (cost=.* rows=10 width=12) + Number of Workers: 2 +--? -> Parallel Append (cost=.* rows=4 width=12) +--? -> Parallel Seq Scan on a (cost=.* rows=3 width=12) +--? -> Parallel Seq Scan on b (cost=.* rows=3 width=12) +(15 rows) ---------------------------------------- -- clean up diff --git a/src/test/regress/expected/parallel_query.out b/src/test/regress/expected/parallel_query.out index c3cf35b5b..b95bfefa8 100644 --- a/src/test/regress/expected/parallel_query.out +++ b/src/test/regress/expected/parallel_query.out @@ -165,3 +165,126 @@ reset parallel_tuple_cost; reset max_parallel_workers_per_gather; reset min_parallel_table_scan_size; reset parallel_leader_participation; +--test parallel index scan +create table parallel_t2(a int, b int); +insert into parallel_t2 values(generate_series(1,100000), generate_series(1,100000)); +create index t2_idx on parallel_t2 using btree(a); +--set index scan parameter +set enable_seqscan to off; +set enable_bitmapscan to off; +--normal plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; + QUERY PLAN +---------------------------------------------- + Aggregate + -> Index Scan using t2_idx on parallel_t2 + Index Cond: (a > 5000) +(3 rows) + +explain (costs off) select count(b) from parallel_t2 where a < 5000; + QUERY PLAN +---------------------------------------------- + Aggregate + -> Index Scan using t2_idx on parallel_t2 + Index Cond: (a < 5000) +(3 rows) + +select count(b) from parallel_t2 where a > 5000; + count +------- + 95000 +(1 row) + +select count(b) from parallel_t2 where a < 5000; + count +------- + 4999 +(1 row) + +--set parallel parameter +set force_parallel_mode=on; +set parallel_setup_cost=0; +set parallel_tuple_cost=0.000005; +set max_parallel_workers_per_gather=2; +set min_parallel_table_scan_size=0; +set min_parallel_index_scan_size=0; +set parallel_leader_participation=on; +--parallel plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; + QUERY PLAN +------------------------------------------------------------- + Aggregate + -> Gather + Number of Workers: 2 + -> Parallel Index Scan using t2_idx on parallel_t2 + Index Cond: (a > 5000) +(5 rows) + +explain (costs off) select count(b) from parallel_t2 where a < 5000; + QUERY PLAN +------------------------------------------------------------- + Aggregate + -> Gather + Number of Workers: 2 + -> Parallel Index Scan using t2_idx on parallel_t2 + Index Cond: (a < 5000) +(5 rows) + +select count(b) from parallel_t2 where a > 5000; + 
count +------- + 95000 +(1 row) + +select count(b) from parallel_t2 where a < 5000; + count +------- + 4999 +(1 row) + +--set parallel_workers for parallel_t2 +alter table parallel_t2 set (parallel_workers = 1); +--parallel plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; + QUERY PLAN +------------------------------------------------------------- + Aggregate + -> Gather + Number of Workers: 1 + -> Parallel Index Scan using t2_idx on parallel_t2 + Index Cond: (a > 5000) +(5 rows) + +explain (costs off) select count(b) from parallel_t2 where a < 5000; + QUERY PLAN +------------------------------------------------------------- + Aggregate + -> Gather + Number of Workers: 1 + -> Parallel Index Scan using t2_idx on parallel_t2 + Index Cond: (a < 5000) +(5 rows) + +select count(b) from parallel_t2 where a > 5000; + count +------- + 95000 +(1 row) + +select count(b) from parallel_t2 where a < 5000; + count +------- + 4999 +(1 row) + +--clean up +drop table parallel_t2; +reset enable_seqscan; +reset enable_bitmapscan; +reset force_parallel_mode; +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset min_parallel_table_scan_size; +reset parallel_leader_participation; +reset min_parallel_index_scan_size; \ No newline at end of file diff --git a/src/test/regress/expected/single_node_union.out b/src/test/regress/expected/single_node_union.out index 804359f62..409265ef4 100644 --- a/src/test/regress/expected/single_node_union.out +++ b/src/test/regress/expected/single_node_union.out @@ -657,17 +657,15 @@ select * from (select * from t3 a union all select * from t3 b) ss join int4_tbl on f1 = expensivefunc(x); QUERY PLAN -------------------------------------------------------- - Gather - Number of Workers: 2 - -> Hash Join - Hash Cond: (expensivefunc(a.x) = int4_tbl.f1) - -> Parallel Append - -> Parallel Seq Scan on t3 a - -> Parallel Seq Scan on t3 b - -> Hash - -> Seq Scan on int4_tbl -(9 rows) +------------------------------------------------- + Hash Join + Hash Cond: (expensivefunc(a.x) = int4_tbl.f1) + -> Append + -> Seq Scan on t3 a + -> Seq Scan on t3 b + -> Hash + -> Seq Scan on int4_tbl +(7 rows) select * from (select * from t3 a union all select * from t3 b) ss diff --git a/src/test/regress/sql/hw_smp.sql b/src/test/regress/sql/hw_smp.sql index cd79dc8be..8d1a14d0d 100644 --- a/src/test/regress/sql/hw_smp.sql +++ b/src/test/regress/sql/hw_smp.sql @@ -4,6 +4,7 @@ -- --dynamic smp+random plan, plan should be same +set max_parallel_workers_per_gather=0; set plan_mode_seed = 1485495508; explain (costs off) select count(node_name) as dncnt from pgxc_node where node_type='D' @@ -286,3 +287,4 @@ drop table if exists replication_02; drop table if exists replication_06; drop schema hw_smp cascade; +reset max_parallel_workers_per_gather; diff --git a/src/test/regress/sql/parallel_append.sql b/src/test/regress/sql/parallel_append.sql index ab39497d3..784f4b6d1 100644 --- a/src/test/regress/sql/parallel_append.sql +++ b/src/test/regress/sql/parallel_append.sql @@ -78,14 +78,16 @@ select d2 from d except all (select d2 from d except select c1 from c) union all select * from a union all (select * from b union select * from c where c1 < 5); select * from a except select * from b union select * from c; select * from b union all (select * from (select * from a union all select * from b)); -select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y where x.a1 = y.d1; 
+select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y + where x.a1 = y.d1 order by 1, 2, 3, 4, 5, 6; explain select * from e intersect (select * from a except select * from b union select * from c); explain select d2 from d except all (select d2 from d except select c1 from c) union all select e1 from e; explain select * from a union all (select * from b union select * from c where c1 < 5); explain select * from a except select * from b union select * from c; explain select * from b union all (select * from (select * from a union all select * from b)); -explain select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y where x.a1 = y.d1; +explain select * from (select * from a union all select * from b)as x, (select * from d union all select* from e)as y + where x.a1 = y.d1 order by 1, 2, 3, 4, 5, 6; ---------------------------------------- -- clean up ---------------------------------------- diff --git a/src/test/regress/sql/parallel_query.sql b/src/test/regress/sql/parallel_query.sql index d26f9f8d5..a85e3aba9 100644 --- a/src/test/regress/sql/parallel_query.sql +++ b/src/test/regress/sql/parallel_query.sql @@ -39,4 +39,55 @@ reset parallel_setup_cost; reset parallel_tuple_cost; reset max_parallel_workers_per_gather; reset min_parallel_table_scan_size; -reset parallel_leader_participation; \ No newline at end of file +reset parallel_leader_participation; + +--test parallel index scan +create table parallel_t2(a int, b int); +insert into parallel_t2 values(generate_series(1,100000), generate_series(1,100000)); +create index t2_idx on parallel_t2 using btree(a); + +--set index scan parameter +set enable_seqscan to off; +set enable_bitmapscan to off; + +--normal plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; +explain (costs off) select count(b) from parallel_t2 where a < 5000; +select count(b) from parallel_t2 where a > 5000; +select count(b) from parallel_t2 where a < 5000; + +--set parallel parameter +set force_parallel_mode=on; +set parallel_setup_cost=0; +set parallel_tuple_cost=0.000005; +set max_parallel_workers_per_gather=2; +set min_parallel_table_scan_size=0; +set min_parallel_index_scan_size=0; +set parallel_leader_participation=on; + +--parallel plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; +explain (costs off) select count(b) from parallel_t2 where a < 5000; +select count(b) from parallel_t2 where a > 5000; +select count(b) from parallel_t2 where a < 5000; + +--set parallel_workers for parallel_t2 +alter table parallel_t2 set (parallel_workers = 1); + +--parallel plan for index scan +explain (costs off) select count(b) from parallel_t2 where a > 5000; +explain (costs off) select count(b) from parallel_t2 where a < 5000; +select count(b) from parallel_t2 where a > 5000; +select count(b) from parallel_t2 where a < 5000; + +--clean up +drop table parallel_t2; +reset enable_seqscan; +reset enable_bitmapscan; +reset force_parallel_mode; +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset min_parallel_table_scan_size; +reset parallel_leader_participation; +reset min_parallel_index_scan_size; \ No newline at end of file diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8a2823d6a..7c463522f 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -148,6 +148,9 @@ BTPageOpaque 
BTPageOpaqueData BTPageStat BTPageState +BTParallelScanDesc +BTParallelScanDescData +BTPS_State BTScanOpaque BTScanOpaqueData BTScanPosData
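
The three typedefs registered above (BTParallelScanDesc, BTParallelScanDescData, BTPS_State) name the shared state that coordinates a parallel B-tree scan: one backend at a time "advances" the scan to the next leaf page while the others wait, then any idle worker may pick up that page. The sketch below follows the upstream PostgreSQL 10 definitions in nbtree.c, which this patch appears to port; the field names (btps_scanPage, btps_pageStatus, btps_arrayKeyCount, btps_mutex, btps_cv) and the use of a condition variable are assumptions relative to this patch and may differ in the openGauss sources.

#include "storage/block.h"               /* BlockNumber */
#include "storage/s_lock.h"              /* slock_t spinlock */
#include "storage/condition_variable.h"  /* ConditionVariable (assumed; another wait primitive may be used here) */

/* Per-scan state machine: which backend, if any, is currently advancing the scan. */
typedef enum {
    BTPARALLEL_NOT_INITIALIZED,  /* scan not started; first backend to seize it does the initial descent */
    BTPARALLEL_ADVANCING,        /* one backend is moving the scan to the next page */
    BTPARALLEL_IDLE,             /* btps_scanPage holds the next page; any worker may claim it */
    BTPARALLEL_DONE              /* scan (for the current array-key set) is exhausted */
} BTPS_State;

/* Lives in shared memory reachable by the leader and all workers of the scan. */
typedef struct BTParallelScanDescData {
    BlockNumber       btps_scanPage;       /* latest or next leaf page to be scanned */
    BTPS_State        btps_pageStatus;     /* state machine above */
    int               btps_arrayKeyCount;  /* number of array scan keys processed so far */
    slock_t           btps_mutex;          /* protects the fields above */
    ConditionVariable btps_cv;             /* workers sleep here while another backend advances */
} BTParallelScanDescData;

typedef BTParallelScanDescData *BTParallelScanDesc;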