diff --git a/src/common/backend/utils/misc/guc/guc_sql.cpp b/src/common/backend/utils/misc/guc/guc_sql.cpp
index 8a989d820..79d21e17e 100755
--- a/src/common/backend/utils/misc/guc/guc_sql.cpp
+++ b/src/common/backend/utils/misc/guc/guc_sql.cpp
@@ -2584,6 +2584,20 @@ static void InitSqlConfigureNamesReal()
         NULL,
         NULL,
         NULL},
+        {{"smp_thread_cost",
+            PGC_USERSET,
+            NODE_ALL,
+            QUERY_TUNING_COST,
+            gettext_noop("Sets the planner's estimate of the cost of "
+                         "starting an extra SMP worker thread."),
+            NULL},
+            &u_sess->opt_cxt.smp_thread_cost,
+            DEFAULT_SMP_THREAD_COST,
+            0,
+            DBL_MAX,
+            NULL,
+            NULL,
+            NULL},
         {{"cpu_tuple_cost",
             PGC_USERSET,
             NODE_ALL,
diff --git a/src/gausskernel/optimizer/path/costsize.cpp b/src/gausskernel/optimizer/path/costsize.cpp
index caa2c27c0..a263fb486 100755
--- a/src/gausskernel/optimizer/path/costsize.cpp
+++ b/src/gausskernel/optimizer/path/costsize.cpp
@@ -998,6 +998,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
     double pages_fetched;
     bool ispartitionedindex = path->indexinfo->rel->isPartitionedTable;
     bool disable_path = false;
+    int dop = SET_DOP(path->path.dop);
+
     if (enable_parametrized_path(root, baserel, (Path*)path) ||
         (!u_sess->attr.attr_sql.enable_indexscan && !indexonly) ||
         (!u_sess->attr.attr_sql.enable_indexonlyscan && indexonly)) {
@@ -1219,7 +1221,7 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
      */
     csquared = indexCorrelation * indexCorrelation;
 
-    run_cost += max_IO_cost + csquared * (min_IO_cost - max_IO_cost);
+    run_cost += (max_IO_cost + csquared * (min_IO_cost - max_IO_cost)) / dop;
 
     ereport(DEBUG2,
         (errmodule(MOD_OPT),
@@ -1253,7 +1255,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
     else
         cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost + qpqual_cost.per_tuple;
 
-    run_cost += cpu_per_tuple * tuples_fetched;
+    run_cost += u_sess->opt_cxt.smp_thread_cost * (dop - 1);
+    run_cost += cpu_per_tuple * tuples_fetched / dop;
 
     ereport(DEBUG2,
         (errmodule(MOD_OPT),
diff --git a/src/gausskernel/optimizer/path/indxpath.cpp b/src/gausskernel/optimizer/path/indxpath.cpp
index d1646ac7f..339101eae 100755
--- a/src/gausskernel/optimizer/path/indxpath.cpp
+++ b/src/gausskernel/optimizer/path/indxpath.cpp
@@ -914,7 +914,8 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
     bool index_is_ordered = false;
     bool index_only_scan = false;
     int indexcol;
-
+    bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) && (ST_BITMAPSCAN != scantype) &&
+        (!rel->isPartitionedTable);
     /*
      * Check that index supports the desired scan type(s)
      */
@@ -1072,6 +1073,22 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
             upper_params,
             loop_count);
         result = lappend(result, ipath);
+        if (can_parallel) {
+            ipath = create_index_path(root,
+                index,
+                index_clauses,
+                clause_columns,
+                NIL,
+                NIL,
+                useful_pathkeys,
+                index_is_ordered ? ForwardScanDirection : NoMovementScanDirection,
+                index_only_scan,
+                outer_relids,
+                upper_params,
+                loop_count,
+                u_sess->opt_cxt.query_dop);
+            result = lappend(result, ipath);
+        }
     }
 
     /*
@@ -1097,6 +1114,23 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
             upper_params,
             loop_count);
         result = lappend(result, ipath);
+
+        if (can_parallel) {
+            ipath = create_index_path(root,
+                index,
+                index_clauses,
+                clause_columns,
+                NIL,
+                NIL,
+                useful_pathkeys,
+                BackwardScanDirection,
+                index_only_scan,
+                outer_relids,
+                upper_params,
+                loop_count,
+                u_sess->opt_cxt.query_dop);
+            result = lappend(result, ipath);
+        }
     }
 }
diff --git a/src/gausskernel/optimizer/util/pathnode.cpp b/src/gausskernel/optimizer/util/pathnode.cpp
index 969bdcd74..8a5a43f98 100755
--- a/src/gausskernel/optimizer/util/pathnode.cpp
+++ b/src/gausskernel/optimizer/util/pathnode.cpp
@@ -2356,7 +2356,7 @@ bool is_pwj_path(Path* pwjpath)
  */
 IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols,
     List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly,
-    Relids required_outer, Bitmapset *upper_params, double loop_count)
+    Relids required_outer, Bitmapset *upper_params, double loop_count, int dop)
 {
     IndexPath* pathnode = makeNode(IndexPath);
     RelOptInfo* rel = index->rel;
@@ -2370,7 +2370,7 @@ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* index
     pathnode->path.pathtarget = rel->reltarget;
     pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer, upper_params);
     pathnode->path.pathkeys = pathkeys;
-
+    pathnode->path.dop = dop;
     /* Convert clauses to indexquals the executor can handle */
     expand_indexqual_conditions(index, indexclauses, indexclausecols, &indexquals, &indexqualcols);
diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp
index 375a888d8..d5352986d 100755
--- a/src/gausskernel/process/stream/streamCore.cpp
+++ b/src/gausskernel/process/stream/streamCore.cpp
@@ -56,6 +56,7 @@
 #include "utils/combocid.h"
 #include "vecexecutor/vecstream.h"
 #include "access/hash.h"
+#include "access/nbtree.h"
 #include "pgstat.h"
 #include "tcop/tcopprot.h"
 #include "distributelayer/streamCore.h"
@@ -1904,6 +1905,68 @@ void StreamNodeGroup::MarkRecursiveVfdInvalid()
     }
 }
 
+void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node)
+{
+    StreamKey streamKey;
+    streamKey.queryId = queryId;
+    streamKey.planNodeId = node->plan_node_id;
+
+    void* parallelDesc = NULL;
+
+    switch (nodeTag(node)) {
+        case T_IndexScan:
+            parallelDesc = palloc0(sizeof(ParallelIndexScanDescData));
+            ((ParallelIndexScanDescData*)parallelDesc)->ps_indexid = ((IndexScan*)node)->indexid;
+            ((ParallelIndexScanDescData*)parallelDesc)->ps_relid = ((IndexScan*)node)->scan.scanrelid;
+            ((ParallelIndexScanDescData*)parallelDesc)->ps_btpscan = btbuildparallelscan();
+            break;
+        default:
+            break;
+    }
+
+    if (!parallelDesc) {
+        return;
+    }
+
+    m_streamDesc.insert({streamKey, parallelDesc});
+}
+
+void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node)
+{
+    StreamKey streamKey;
+    streamKey.queryId = queryId;
+    streamKey.planNodeId = node->plan_node_id;
+
+    std::unordered_map<StreamKey, void*, KeyHash, KeyEqual>::iterator iter;
+
+    switch (nodeTag(node)) {
+        case T_IndexScan:
+            iter = m_streamDesc.find(streamKey);
+            if (m_streamDesc.end() == iter) {
+                return;
+            }
+            if (iter->second) {
+                if (((ParallelIndexScanDescData*)iter->second)->ps_btpscan) {
+                    /* must be freed through its complete type; see btfreeparallelscan */
+                    btfreeparallelscan(((ParallelIndexScanDescData*)iter->second)->ps_btpscan);
+                }
+                pfree(iter->second);
+            }
+            m_streamDesc.erase(streamKey);
+            break;
+        default:
+            break;
+    }
+}
+
+void* StreamNodeGroup::GetParallelDesc(const uint64& queryId, const uint64& planNodeId)
+{
+    StreamKey key;
+    key.queryId = queryId;
+    key.planNodeId = planNodeId;
+    std::unordered_map<StreamKey, void*, KeyHash, KeyEqual>::iterator iter = m_streamDesc.find(key);
+    return (m_streamDesc.end() == iter) ? NULL : iter->second;
+}
+
 #ifndef ENABLE_MULTIPLE_NODES
 bool InitStreamObject(PlannedStmt* planStmt)
 {
diff --git a/src/gausskernel/runtime/executor/execMain.cpp b/src/gausskernel/runtime/executor/execMain.cpp
index a47d07511..89ad637bb 100755
--- a/src/gausskernel/runtime/executor/execMain.cpp
+++ b/src/gausskernel/runtime/executor/execMain.cpp
@@ -1522,6 +1522,7 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
 (IS_SPQ_COORDINATOR && list_nth_int(plannedstmt->subplan_ids, i - 1) != 0) ||
 #endif
                 plannedstmt->planTree->plan_node_id == list_nth_int(plannedstmt->subplan_ids, i - 1))) {
+            estate->es_under_subplan = true;
 
             subplanstate = ExecInitNode(subplan, estate, sp_eflags);
 
@@ -1552,6 +1553,7 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
         plan->initPlan = plannedstmt->initPlan;
         estate->es_subplan_ids = plannedstmt->subplan_ids;
     }
+
     planstate = ExecInitNode(plan, estate, eflags);
 
     if (estate->pruningResult) {
diff --git a/src/gausskernel/runtime/executor/nodeIndexscan.cpp b/src/gausskernel/runtime/executor/nodeIndexscan.cpp
index 0d5684ea1..fc5ccd305 100644
--- a/src/gausskernel/runtime/executor/nodeIndexscan.cpp
+++ b/src/gausskernel/runtime/executor/nodeIndexscan.cpp
@@ -259,6 +259,8 @@ void ExecReScanIndexScan(IndexScanState* node)
     scan_handler_idx_rescan(
         node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys);
 
+    scan_handler_idx_rescan_parallel(node->iss_ScanDesc);
+
     ExecScanReScan(&node->ss);
 }
 
@@ -463,8 +465,12 @@ void ExecEndIndexScan(IndexScanState* node)
     /*
      * close the index relation (no-op if we didn't open it)
     */
-    if (index_scan_desc)
+    if (index_scan_desc) {
         scan_handler_idx_endscan(index_scan_desc);
+        if (WorkerThreadAmI() && node->ss.ps.plan->dop > 1) {
+            u_sess->stream_cxt.global_obj->DestroyStreamDesc(node->ss.ps.state->es_plannedstmt->queryId, node->ss.ps.plan);
+        }
+    }
 
     /*
      * close the index relation (no-op if we didn't open it)
@@ -599,14 +605,25 @@ void ExecInitIndexRelation(IndexScanState* node, EState* estate, int eflags)
             }
         }
 
+        ParallelIndexScanDescData *parallelDesc = NULL;
+        if (u_sess->stream_cxt.global_obj && node->ss.ps.plan->dop > 1) {
+            if (WorkerThreadAmI()) {
+                u_sess->stream_cxt.global_obj->BuildStreamDesc(estate->es_plannedstmt->queryId, index_state->ss.ps.plan);
+            }
+            parallelDesc = (ParallelIndexScanDescData*)u_sess->stream_cxt.global_obj->GetParallelDesc(
+                estate->es_plannedstmt->queryId, index_state->ss.ps.plan->plan_node_id);
+            if (WorkerThreadAmI()) {
+                scan_handler_idx_parallelscan_initialize(current_relation, index_state->iss_RelationDesc, parallelDesc);
+            }
+        }
+
         /* Initialize scan descriptor for partitioned table */
-        index_state->iss_ScanDesc = scan_handler_idx_beginscan(index_state->ss.ss_currentPartition,
-            index_state->iss_CurrentIndexPartition,
+        index_state->iss_ScanDesc = scan_handler_idx_beginscan(current_relation,
+            index_state->iss_RelationDesc,
             scanSnap,
             index_state->iss_NumScanKeys,
             index_state->iss_NumOrderByKeys,
-            (ScanState*)index_state);
-    }
+            (ScanState*)index_state,
+            parallelDesc);
+        }
     }
 } else {
     /*
diff --git a/src/gausskernel/storage/access/hbstore/hbindex_am.cpp b/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
index 20de90970..1eac6115a 100644
--- a/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
+++ b/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
@@ -545,15 +545,20 @@ static HeapTuple cross_level_index_getnext(IndexScanDesc scan, ScanDirection dir
  * ------------------------------------------------------------------------
  */
-IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state)
+IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state,
+    ParallelIndexScanDesc pscan)
 {
     if (unlikely(RELATION_OWN_BUCKET(heap_relation))) {
+        /* parallel index scan is not supported for hashbucket tables; pscan is ignored here */
         return hbkt_idx_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state);
     } else {
-        return index_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state);
+        return index_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state, pscan);
     }
 }
 
+void scan_handler_idx_parallelscan_initialize(Relation heap_relation, Relation index_relation, ParallelIndexScanDesc p_index_scan)
+{
+    index_parallelscan_initialize(heap_relation, index_relation, p_index_scan);
+}
+
 IndexScanDesc scan_handler_idx_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys, ScanState* scan_state)
 {
     if (unlikely(RELATION_OWN_BUCKET(indexRelation))) {
@@ -574,6 +579,12 @@ void scan_handler_idx_rescan(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey
     }
 }
 
+void scan_handler_idx_rescan_parallel(IndexScanDesc scan)
+{
+    Assert(scan != NULL);
+    index_rescan_parallel(scan);
+}
+
 void scan_handler_idx_rescan_local(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys)
 {
     Assert(scan != NULL);
diff --git a/src/gausskernel/storage/access/index/indexam.cpp b/src/gausskernel/storage/access/index/indexam.cpp
index 999398d2b..377ec244b 100644
--- a/src/gausskernel/storage/access/index/indexam.cpp
+++ b/src/gausskernel/storage/access/index/indexam.cpp
@@ -83,6 +83,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr/smgr.h"
+#include "access/nbtree.h"
 #include "access/ustore/knl_uvisibility.h"
 #include "access/ustore/knl_uscan.h"
 #include "utils/snapmgr.h"
@@ -143,7 +144,8 @@
     } \
 } while (0)
 
-static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot);
+static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot,
+    ParallelIndexScanDesc pscan = NULL);
 
 /* ----------------
  *		index_open - open an index relation by relation OID
@@ -246,11 +248,12 @@ bool index_insert(Relation index_relation, Datum *values, const bool *isnull, It
  * Caller must be holding suitable locks on the heap and the index.
  */
 IndexScanDesc index_beginscan(
-    Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state)
+    Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state,
+    ParallelIndexScanDesc pscan)
 {
     IndexScanDesc scan;
 
-    scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot);
+    scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot, pscan);
 
     /*
      * Save additional parameters into the scandesc.  Everything else was set
@@ -295,7 +298,8 @@ IndexScanDesc index_beginscan_bitmap(Relation index_relation, Snapshot snapshot,
 /*
  * index_beginscan_internal --- common code for index_beginscan variants
  */
-static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot)
+static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot,
+    ParallelIndexScanDesc pscan)
 {
     IndexScanDesc scan;
     FmgrInfo *procedure = NULL;
@@ -324,6 +328,9 @@ static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys
     scan->spq_scan = NULL;
 #endif
 
+    /* Initialize information for parallel scan. */
+    scan->parallel_scan = pscan;
+
     return scan;
 }
 
@@ -371,6 +378,18 @@ void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys,
     }
 }
 
+/* ----------------
+ *		index_rescan_parallel - reset parallel index scan variables
+ * ----------------
+ */
+void
+index_rescan_parallel(IndexScanDesc scan)
+{
+    /* currently only btree has parallel index scan support */
+    if (scan->parallel_scan) {
+        btparallelrescan(scan);
+    }
+}
+
 /* ----------------
  *		index_endscan - end a scan
  * ----------------
@@ -476,6 +495,33 @@ void index_restrpos(IndexScanDesc scan)
 }
 
+/*
+ * index_parallelscan_initialize - initialize parallel scan
+ *
+ * We initialize the ParallelIndexScanDesc and the AM-specific shared state
+ * it points to.  Call this just once in the leader thread; the parallel
+ * workers then receive the descriptor through scan_handler_idx_beginscan.
+ */
+void
+index_parallelscan_initialize(Relation heap_relation, Relation index_relation,
+    ParallelIndexScanDesc target)
+{
+    if (!target) {
+        return;
+    }
+
+    RELATION_CHECKS;
+
+    target->ps_relid = RelationGetRelid(heap_relation);
+    target->ps_indexid = RelationGetRelid(index_relation);
+
+    /* currently only btree supports parallel index scan */
+    btinitparallelscan(target->ps_btpscan);
+}
+
 /* ----------------
  *		index_getnext_tid - get the next TID from a scan
  *
diff --git a/src/gausskernel/storage/access/nbtree/nbtree.cpp b/src/gausskernel/storage/access/nbtree/nbtree.cpp
index 9c120610b..825f2f639 100644
--- a/src/gausskernel/storage/access/nbtree/nbtree.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtree.cpp
@@ -17,6 +17,9 @@
  *
  * -------------------------------------------------------------------------
  */
+#include <mutex>
+#include <condition_variable>
+
 #include "postgres.h"
 #include "knl/knl_variable.h"
 #include "access/nbtree.h"
@@ -25,6 +28,7 @@
 #include "access/xlog.h"
 #include "catalog/index.h"
 #include "commands/vacuum.h"
+#include "pgstat.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -50,6 +54,46 @@ typedef struct {
     MemoryContext pagedelcontext;
 } BTVacState;
 
+/*
+ * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
+ *
+ * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
+ * a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
+ * to a new page; some process can start doing that.
+ *
+ * BTPARALLEL_DONE indicates that the scan is complete (including error exit).
+ * We reach this state once for every distinct combination of array keys.
+ */
+typedef enum
+{
+    BTPARALLEL_NOT_INITIALIZED,
+    BTPARALLEL_ADVANCING,
+    BTPARALLEL_IDLE,
+    BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData
+{
+    BlockNumber btps_scanPage;  /* latest or next page to be scanned */
+    BTPS_State btps_pageStatus; /* indicates whether next page is available
+                                 * for scan. see above for possible states of
+                                 * parallel scan. */
+    int btps_arrayKeyCount;     /* count indicating number of array
+                                 * scan keys processed by parallel
+                                 * scan */
+
+    std::mutex btps_mutex;           /* protects above variables */
+    std::condition_variable btps_cv; /* used to synchronize parallel scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback,
     void *callback_state, BTCycleId cycleid);
 static void btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno);
@@ -480,6 +524,7 @@ IndexScanDesc btbeginscan_internal(Relation rel, int nkeys, int norderbys)
     so->arrayContext = NULL;
     so->killedItems = NULL; /* until needed */
     so->numKilled = 0;
+    so->arrayKeyCount = 0;
 
     /*
      * We don't know yet whether the scan will be index-only, so we do not
@@ -703,6 +748,201 @@ void btrestrpos_internal(IndexScanDesc scan)
     }
 }
 
+
+/*
+ * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ */
+void
+btinitparallelscan(void *target)
+{
+    BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+    std::unique_lock<std::mutex> lck(bt_target->btps_mutex);
+    bt_target->btps_scanPage = InvalidBlockNumber;
+    bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+    bt_target->btps_arrayKeyCount = 0;
+}
+
+/*
+ * btparallelrescan() -- reset parallel scan
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+    BTParallelScanDesc btscan;
+    ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+
+    Assert(parallel_scan);
+
+    btscan = (BTParallelScanDesc) parallel_scan->ps_btpscan;
+
+    /*
+     * In theory, we don't need to acquire the mutex here, because there
+     * shouldn't be any other workers running at this point, but we do so for
+     * consistency.
+     */
+    std::unique_lock<std::mutex> lck(btscan->btps_mutex);
+    btscan->btps_scanPage = InvalidBlockNumber;
+    btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+    btscan->btps_arrayKeyCount = 0;
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ *		page.  Other scans must wait until we call _bt_parallel_release()
+ *		or _bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not.  The latter case occurs if no pages remain for the current
+ * set of scankeys.
+ *
+ * If the return value is true, *pageno returns the next or current page
+ * of the scan (depending on the scan direction).  An invalid block number
+ * means the scan hasn't yet started, and P_NONE means we've reached the end.
+ * The first time a participating process reaches the last page, it will return
+ * true and set *pageno to P_NONE; after that, further attempts to seize the
+ * scan will return false.
+ *
+ * Callers should ignore the value of pageno if the return value is false.
+ */
+bool
+_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    BTPS_State pageStatus;
+    bool exit_loop = false;
+    bool status = true;
+    ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+    BTParallelScanDesc btscan;
+
+    *pageno = P_NONE;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->ps_btpscan);
+
+    while (1)
+    {
+        std::unique_lock<std::mutex> lck(btscan->btps_mutex);
+        pageStatus = btscan->btps_pageStatus;
+
+        if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
+        {
+            /* Parallel scan has already advanced to a new set of scankeys. */
+            status = false;
+        }
+        else if (pageStatus == BTPARALLEL_DONE)
+        {
+            /*
+             * We're done with this set of scankeys.  This may be the end, or
+             * there could be more sets to try.
+             */
+            status = false;
+        }
+        else if (pageStatus != BTPARALLEL_ADVANCING)
+        {
+            /*
+             * We have successfully seized control of the scan for the purpose
+             * of advancing it to a new page!
+             */
+            btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+            *pageno = btscan->btps_scanPage;
+            exit_loop = true;
+        }
+        if (exit_loop || !status)
+            break;
+
+        btscan->btps_cv.wait(lck);
+    }
+
+    return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ *		new page.  We now have the new value of btps_scanPage; some other
+ *		backend can now begin advancing the scan.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+    ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+    BTParallelScanDesc btscan;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->ps_btpscan);
+
+    {
+        std::unique_lock<std::mutex> lck(btscan->btps_mutex);
+        btscan->btps_scanPage = scan_page;
+        btscan->btps_pageStatus = BTPARALLEL_IDLE;
+    }
+    btscan->btps_cv.notify_one();
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers.  Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+    BTParallelScanDesc btscan;
+    bool status_changed = false;
+
+    /* Do nothing, for non-parallel scans */
+    if (parallel_scan == NULL)
+        return;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->ps_btpscan);
+
+    /*
+     * Mark the parallel scan as done for this combination of scan keys,
+     * unless some other process already did so.  See also
+     * _bt_advance_array_keys.
+     */
+    {
+        std::unique_lock<std::mutex> lck(btscan->btps_mutex);
+        if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
+            btscan->btps_pageStatus != BTPARALLEL_DONE) {
+            btscan->btps_pageStatus = BTPARALLEL_DONE;
+            status_changed = true;
+        }
+    }
+
+    /* wake up all the workers associated with this parallel scan */
+    if (status_changed)
+        btscan->btps_cv.notify_all();
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ *		keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ */
+void
+_bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+    BTParallelScanDesc btscan;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->ps_btpscan);
+
+    so->arrayKeyCount++;
+    std::unique_lock<std::mutex> lck(btscan->btps_mutex);
+    if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+    {
+        btscan->btps_scanPage = InvalidBlockNumber;
+        btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+        btscan->btps_arrayKeyCount++;
+    }
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
  * The set of target tuples is specified via a callback routine that tells
@@ -1396,3 +1636,13 @@ static BTVacuumPosting btree_vacuum_posting(BTVacState *vac_state, IndexTuple po
     *num_remaining = num_live;
     return vacposting;
 }
+
+/*
+ * btbuildparallelscan -- allocate the btree-private shared state
+ * (BTParallelScanDescData) used by a parallel index scan
+ */
+void*
+btbuildparallelscan(void)
+{
+    void *bt_pscan = (void*)new BTParallelScanDescData;
+    return bt_pscan;
+}
+
+/*
+ * btfreeparallelscan -- free state allocated by btbuildparallelscan
+ *
+ * The object must be deleted through its complete type so that the
+ * std::mutex/std::condition_variable destructors run; calling delete on
+ * the void pointer directly would be undefined behavior.
+ */
+void
+btfreeparallelscan(void* bt_pscan)
+{
+    delete (BTParallelScanDesc) bt_pscan;
+}
diff --git a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
index 1d59776cf..6f5beffc8 100644
--- a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
@@ -35,9 +35,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off
 static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup, Oid partOid,
     int2 bucketid);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
+    ScanDirection dir);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static void _bt_check_natts_correct(const Relation index, bool heapkeyspace, Page page, OffsetNumber offnum);
-
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
 static int btree_setup_posting_items(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid,
     IndexTuple itup);
 static void btree_save_posting_item(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid,
@@ -592,8 +595,10 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
     ScanKeyData notnullkeys[INDEX_MAX_KEYS];
     int keysCount = 0;
     int i;
+    bool status = true;
     StrategyNumber strat_total;
     BTScanPosItem *currItem = NULL;
+    BlockNumber blkno;
 
     pgstat_count_index_scan(rel);
 
@@ -610,6 +615,30 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
     if (!so->qual_ok)
         return false;
 
+    /*
+     * For parallel scans, get the starting page from shared state. If the
+     * scan has not started, proceed to find the first leaf page in the usual
+     * way while keeping other participating processes waiting. If the scan
+     * has already begun, use the page number from the shared structure.
+     */
+    if (scan->parallel_scan != NULL)
+    {
+        status = _bt_parallel_seize(scan, &blkno);
+        if (!status)
+            return false;
+        else if (blkno == P_NONE)
+        {
+            _bt_parallel_done(scan);
+            return false;
+        }
+        else if (blkno != InvalidBlockNumber)
+        {
+            if (!_bt_parallel_readpage(scan, blkno, dir))
+                return false;
+            goto readcomplete;
+        }
+    }
+
     /* ----------
      * Examine the scan keys to discover where we need to start the scan.
     *
@@ -775,7 +804,19 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
      * there.
      */
     if (keysCount == 0)
-        return _bt_endpoint(scan, dir);
+    {
+        bool match;
+
+        match = _bt_endpoint(scan, dir);
+
+        if (!match)
+        {
+            /* No match, so mark (parallel) scan finished */
+            _bt_parallel_done(scan);
+        }
+
+        return match;
+    }
 
     /*
      * We want to start the scan somewhere within the index.  Set up an
@@ -802,7 +843,10 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
             ScanKey subkey = (ScanKey)DatumGetPointer(cur->sk_argument);
             Assert(subkey->sk_flags & SK_ROW_MEMBER);
             if (subkey->sk_flags & SK_ISNULL)
+            {
+                _bt_parallel_done(scan);
                 return false;
+            }
             inskey.scankeys[i] = *subkey;
 
             /*
@@ -1003,21 +1047,19 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
          * because nothing finer to lock exists.
         */
         PredicateLockRelation(rel, scan->xs_snapshot);
+
+        /*
+         * mark parallel scan as done, so that all the workers can finish
+         * their scan
+         */
+        _bt_parallel_done(scan);
         return false;
     } else
         PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
 
-    /* initialize moreLeft/moreRight appropriately for scan direction */
-    if (ScanDirectionIsForward(dir)) {
-        so->currPos.moreLeft = false;
-        so->currPos.moreRight = true;
-    } else {
-        so->currPos.moreLeft = true;
-        so->currPos.moreRight = false;
-    }
-    so->numKilled = 0;      /* just paranoia */
-    so->markItemIndex = -1; /* ditto */
+    _bt_initialize_more_data(so, dir);
+
+    {
     /* position to the precise item on the page */
     int posting_off = 0;
     offnum = _bt_binsrch(rel, &inskey, buf, &posting_off);
@@ -1057,7 +1099,9 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
 
     /* Drop the lock, but not pin, on the current page */
     LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+    }
 
+readcomplete:
     /* OK, itemIndex says what to return */
     currItem = &so->currPos.items[so->currPos.itemIndex];
     scan->xs_ctup.t_self = currItem->heapTid;
@@ -1149,6 +1193,10 @@ bool _bt_next(IndexScanDesc scan, ScanDirection dir)
  * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
 * that there can be no more matching tuples in the current scan direction.
 *
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
 * Returns true if any matching items found on the page, false if none.
 */
static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
@@ -1175,6 +1223,16 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off
     page = BufferGetPage(so->currPos.buf);
     opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
 
+    /* allow the next page to be processed by a parallel worker */
+    if (scan->parallel_scan)
+    {
+        if (ScanDirectionIsForward(dir))
+            _bt_parallel_release(scan, opaque->btpo_next);
+        else
+            _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+    }
+
     minoff = P_FIRSTDATAKEY(opaque);
     maxoff = PageGetMaxOffsetNumber(page);
 
@@ -1314,8 +1372,8 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 {
     BTScanOpaque so = (BTScanOpaque)scan->opaque;
     Relation rel;
-    Page page;
-    BTPageOpaqueInternal opaque;
+    BlockNumber blkno = InvalidBlockNumber;
+    bool status;
 
     /* we must have the buffer pinned and locked */
     Assert(BufferIsValid(so->currPos.buf));
@@ -1347,52 +1405,148 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
     rel = scan->indexRelation;
 
     if (ScanDirectionIsForward(dir)) {
+        /* release the previous buffer, if pinned */
+        _bt_relbuf(rel, so->currPos.buf);
+        so->currPos.buf = InvalidBuffer;
+
         /* Walk right to the next page with data */
-        /* We must rely on the previously saved nextPage link! */
-        BlockNumber blkno = so->currPos.nextPage;
+        if (scan->parallel_scan != NULL)
+        {
+            /*
+             * Seize the scan to get the next block number; if the scan has
+             * ended already, bail out.
+             */
+            status = _bt_parallel_seize(scan, &blkno);
+            if (!status)
+            {
+                return false;
+            }
+        }
+        else
+        {
+            /* Not parallel, so use the previously-saved nextPage link. */
+            blkno = so->currPos.nextPage;
+        }
 
         /* Remember we left a page with data */
         so->currPos.moreLeft = true;
-        for (;;) {
-            /* release the previous buffer */
-            _bt_relbuf(rel, so->currPos.buf);
-            so->currPos.buf = InvalidBuffer;
-            /* if we're at end of scan, give up */
-            if (blkno == P_NONE || !so->currPos.moreRight)
-                return false;
+    } else {
+        /* Remember we left a page with data */
+        so->currPos.moreRight = true;
+
+        if (scan->parallel_scan != NULL)
+        {
+            /*
+             * Seize the scan to get the current block number; if the scan has
+             * ended already, bail out.
+             */
+            status = _bt_parallel_seize(scan, &blkno);
+            if (!status)
+            {
+                so->currPos.buf = InvalidBuffer;
+                return false;
+            }
+        }
+    }
+
+    if (!_bt_readnextpage(scan, blkno, dir))
+        return false;
+
+    return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page.  Caller is responsible for releasing the lock and pin
+ * on the buffer on success.  We return true to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return false.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    Relation rel;
+    Page page;
+    BTPageOpaqueInternal opaque;
+    bool status;
+
+    rel = scan->indexRelation;
+
+    if (ScanDirectionIsForward(dir))
+    {
+        for (;;)
+        {
+            /*
+             * if we're at end of scan, give up and mark parallel scan as
+             * done, so that all the workers can finish their scan
+             */
+            if (blkno == P_NONE || !so->currPos.moreRight)
+            {
+                _bt_parallel_done(scan);
+                return false;
+            }
             /* check for interrupts while we're not holding any buffer lock */
             CHECK_FOR_INTERRUPTS();
             /* step right one page */
             so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
-            /* check for deleted page */
             page = BufferGetPage(so->currPos.buf);
-            opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
-            if (!P_IGNORE(opaque)) {
+            opaque = BTPageGetOpaqueInternal(page);
+            /* check for deleted page */
+            if (!P_IGNORE(opaque))
+            {
                 PredicateLockPage(rel, blkno, scan->xs_snapshot);
                 /* see if there are any matches on this page */
                 /* note that this will clear moreRight if we can stop */
                 if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
                     break;
             }
-            /* nope, keep going */
-            blkno = opaque->btpo_next;
-        }
-    } else {
-        /* Remember we left a page with data */
-        so->currPos.moreRight = true;
+            else if (scan->parallel_scan != NULL)
+            {
+                /* allow the next page to be processed by a parallel worker */
+                _bt_parallel_release(scan, opaque->btpo_next);
+            }
+
+            /* nope, keep going */
+            if (scan->parallel_scan != NULL)
+            {
+                /* release the previous buffer before waiting to seize the scan */
+                _bt_relbuf(rel, so->currPos.buf);
+                so->currPos.buf = InvalidBuffer;
+                status = _bt_parallel_seize(scan, &blkno);
+                if (!status)
+                {
+                    return false;
+                }
+            }
+            else
+            {
+                /* read the next-page link before releasing the buffer */
+                blkno = opaque->btpo_next;
+                _bt_relbuf(rel, so->currPos.buf);
+                so->currPos.buf = InvalidBuffer;
+            }
+        }
+    }
+    else
+    {
         /*
         * Walk left to the next page with data.  This is much more complex
         * than the walk-right case because of the possibility that the page
         * to our left splits while we are in flight to it, plus the
         * possibility that the page we were on gets deleted after we leave
-         * it. See nbtree/README for details.
+         * it.  See nbtree/README for details.
         */
-        for (;;) {
+
+        for (;;)
+        {
             /* Done if we know there are no matching keys to the left */
-            if (!so->currPos.moreLeft) {
+            if (!so->currPos.moreLeft)
+            {
                 _bt_relbuf(rel, so->currPos.buf);
+                _bt_parallel_done(scan);
                 so->currPos.buf = InvalidBuffer;
                 return false;
             }
@@ -1404,7 +1558,10 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 
             /* if we're physically at end of index, return failure */
             if (so->currPos.buf == InvalidBuffer)
+            {
+                _bt_parallel_done(scan);
                 return false;
+            }
 
             /*
             * Okay, we managed to move left to a non-deleted page.  Done if
@@ -1412,20 +1569,65 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
             * and do it all again.
             */
             page = BufferGetPage(so->currPos.buf);
-            opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
-            if (!P_IGNORE(opaque)) {
+            opaque = BTPageGetOpaqueInternal(page);
+            if (!P_IGNORE(opaque))
+            {
                 PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
                 /* see if there are any matches on this page */
                 /* note that this will clear moreLeft if we can stop */
                 if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
                     break;
             }
+            else if (scan->parallel_scan != NULL)
+            {
+                /* allow the next page to be processed by a parallel worker */
+                _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+            }
+
+            /*
+             * For parallel scans, get the last page scanned as it is quite
+             * possible that by the time we try to seize the scan, some other
+             * worker has already advanced the scan to a different page.  We
+             * must continue based on the latest page scanned by any worker.
+             */
+            if (scan->parallel_scan != NULL)
+            {
+                _bt_relbuf(rel, so->currPos.buf);
+                status = _bt_parallel_seize(scan, &blkno);
+                if (!status)
+                {
+                    so->currPos.buf = InvalidBuffer;
+                    return false;
+                }
+                so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+            }
         }
     }
 
     return true;
 }
 
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and maybe pin on buffer.  We return true to
+ * indicate success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+    _bt_initialize_more_data(so, dir);
+
+    if (!_bt_readnextpage(scan, blkno, dir))
+        return false;
+
+    /* Drop the lock, but not pin, on the new page */
+    LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+
+    return true;
+}
+
 /*
  * _bt_walk_left() -- step left one page, if possible
 *
@@ -1667,16 +1869,7 @@ static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
     /* remember which buffer we have pinned */
     so->currPos.buf = buf;
 
-    /* initialize moreLeft/moreRight appropriately for scan direction */
-    if (ScanDirectionIsForward(dir)) {
-        so->currPos.moreLeft = false;
-        so->currPos.moreRight = true;
-    } else {
-        so->currPos.moreLeft = true;
-        so->currPos.moreRight = false;
-    }
-    so->numKilled = 0;      /* just paranoia */
-    so->markItemIndex = -1; /* ditto */
+    _bt_initialize_more_data(so, dir);
 
     /*
      * Now load data from the first page of the scan.
@@ -1889,3 +2082,26 @@ static inline void btree_save_posting_item(BTScanOpaque so, int item_idx, Offset
     if (so->currTuples)
         curr_item->tupleOffset = tuple_offset;
 }
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+    /* initialize moreLeft/moreRight appropriately for scan direction */
+    if (ScanDirectionIsForward(dir))
+    {
+        so->currPos.moreLeft = false;
+        so->currPos.moreRight = true;
+    }
+    else
+    {
+        so->currPos.moreLeft = true;
+        so->currPos.moreRight = false;
+    }
+    so->numKilled = 0;      /* just paranoia */
+    so->markItemIndex = -1; /* ditto */
+}
diff --git a/src/gausskernel/storage/access/nbtree/nbtutils.cpp b/src/gausskernel/storage/access/nbtree/nbtutils.cpp
index 93d3ffba9..1c477b6b0 100644
--- a/src/gausskernel/storage/access/nbtree/nbtutils.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtutils.cpp
@@ -554,6 +554,10 @@ bool _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
         }
     }
 
+    /* advance parallel scan */
+    if (scan->parallel_scan != NULL)
+        _bt_parallel_advance_array_keys(scan);
+
     return found;
 }
 
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index fbf04d175..ac48b2f58 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -86,6 +86,7 @@ typedef bool (*IndexBulkDeleteCallback)(ItemPointer itemptr, void* state, Oid pa
 typedef struct IndexScanDescData* IndexScanDesc;
 typedef struct SysScanDescData* SysScanDesc;
 struct ScanState;
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
 
 /*
 * Enumeration specifying the type of uniqueness check to perform in
@@ -133,9 +134,11 @@ extern bool index_insert(Relation indexRelation, Datum* values, const bool* isnu
     Relation heapRelation, IndexUniqueCheck checkUnique);
 
 extern IndexScanDesc index_beginscan(
-    Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state=NULL);
+    Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state=NULL,
+    ParallelIndexScanDesc pscan=NULL);
+extern void index_parallelscan_initialize(Relation heap_relation, Relation index_relation, ParallelIndexScanDesc p_index_scan);
 extern IndexScanDesc index_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys, ScanState* scan_state=NULL);
 extern void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys);
+extern void index_rescan_parallel(IndexScanDesc scan);
 extern void index_endscan(IndexScanDesc scan);
 extern void index_markpos(IndexScanDesc scan);
 extern void index_restrpos(IndexScanDesc scan);
diff --git a/src/include/access/hbindex_am.h b/src/include/access/hbindex_am.h
index 6de8943e2..8aa2765bd 100644
--- a/src/include/access/hbindex_am.h
+++ b/src/include/access/hbindex_am.h
@@ -49,9 +49,11 @@ extern bool hbkt_idx_bitmapscan_switch_bucket(IndexScanDesc scan, int targetSlot
 extern bool cbi_scan_need_fix_hbkt_rel(IndexScanDesc scan, int2 bucketid = InvalidBktId);
 extern bool cbi_scan_fix_hbkt_rel(HBktIdxScanDesc hpScan);
 extern IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot,
-    int nkeys, int norderbys, ScanState* scan_state = NULL);
+    int nkeys, int norderbys, ScanState* scan_state = NULL, ParallelIndexScanDesc pscan = NULL);
+extern void scan_handler_idx_parallelscan_initialize(Relation heap_relation, Relation index_relation,
+    ParallelIndexScanDesc p_index_scan);
 extern IndexScanDesc scan_handler_idx_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys,
     ScanState* scan_state);
 extern void scan_handler_idx_rescan(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys);
+extern void scan_handler_idx_rescan_parallel(IndexScanDesc scan);
 extern void scan_handler_idx_rescan_local(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys);
 extern void scan_handler_idx_endscan(IndexScanDesc scan);
 extern void scan_handler_idx_markpos(IndexScanDesc scan);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c34fad5c0..395d07001 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -107,6 +107,7 @@ typedef struct UBTPageOpaqueData {
 } UBTPageOpaqueData;
 typedef UBTPageOpaqueData* UBTPageOpaque;
 
+#define BTPageGetOpaqueInternal(page) ((BTPageOpaqueInternal) PageGetSpecialPointer(page))
 
 typedef struct BTDedupIntervalData {
     OffsetNumber base_off;
@@ -961,6 +962,8 @@ typedef struct BTScanOpaqueData {
     ScanKey arrayKeyData;   /* modified copy of scan->keyData */
     int numArrayKeys;       /* number of equality-type array keys (-1 if
                              * there are any unsatisfiable array keys) */
+    int arrayKeyCount;      /* count indicating number of array scan keys
+                             * processed */
     BTArrayKeyInfo* arrayKeys; /* info about each equality-type array key */
     MemoryContext arrayContext; /* scan-lifespan context for array data */
 
@@ -1282,6 +1285,9 @@ extern Datum btbuild(PG_FUNCTION_ARGS);
 extern Datum btbuildempty(PG_FUNCTION_ARGS);
 extern Datum btinsert(PG_FUNCTION_ARGS);
 extern Datum btbeginscan(PG_FUNCTION_ARGS);
+extern void* btbuildparallelscan(void);
+extern void btfreeparallelscan(void* bt_pscan);
+extern void btinitparallelscan(void *target);
 extern Datum btgettuple(PG_FUNCTION_ARGS);
 extern Datum btgetbitmap(PG_FUNCTION_ARGS);
 extern Datum cbtreegetbitmap(PG_FUNCTION_ARGS);
@@ -1294,6 +1300,15 @@ extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
 extern Datum btcanreturn(PG_FUNCTION_ARGS);
 extern Datum btoptions(PG_FUNCTION_ARGS);
 
+extern void btparallelrescan(IndexScanDesc scan);
+
+/*
+ * prototypes for internal functions in nbtree.c
+ */
+extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
+
 extern inline IndexBuildResult *btbuild_internal(Relation heap, Relation index, IndexInfo *index_info);
 extern inline void btbuildempty_internal(Relation index);
 
@@ -1326,7 +1341,6 @@ extern inline IndexBuildResult *btmerge_internal(Relation dstIdxRel, List *srcId
 * thought, we are not going to implement them right now.
 */
 extern Datum btmerge(PG_FUNCTION_ARGS);
-
 /*
 * prototypes for functions in nbtinsert.c
 */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d3f40dd24..878f657fd 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -170,6 +170,8 @@ typedef struct IndexScanDescData {
     SPQScanDesc spq_scan;
 #endif
     IndexFetchTableData *xs_heapfetch;
+    /* parallel index scan information, shared among the scan's parallel workers */
+    struct ParallelIndexScanDescData *parallel_scan;
 
     /* put decompressed heap tuple data into xs_ctbuf_hdr be careful! when malloc memory should give extra mem for *xs_ctbuf_hdr.
       t_bits which is varlength arr */
@@ -177,6 +179,14 @@ typedef struct IndexScanDescData {
     /* DO NOT add any other members here. xs_ctbuf_hdr must be the last one. */
 } IndexScanDescData;
 
+/* Generic structure for parallel scans */
+typedef struct ParallelIndexScanDescData
+{
+    Oid ps_relid;
+    Oid ps_indexid;
+    void* ps_btpscan;   /* btree-private shared state (BTParallelScanDescData) */
+} ParallelIndexScanDescData;
+
 #define SizeofIndexScanDescData (offsetof(IndexScanDescData, xs_ctbuf_hdr) + SizeofHeapTupleHeader)
 
 /* Get partition heap oid for bitmap index scan */
diff --git a/src/include/c.h b/src/include/c.h
index 468768a34..5c02faa24 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -722,6 +722,8 @@ typedef struct pathData {
 * True iff pointer is properly aligned to point to the given type.
 */
 #define PointerIsAligned(pointer, type) (((intptr_t)(pointer) % (sizeof(type))) == 0)
+#define OffsetToPointer(base, offset) \
+    ((void *)((char *) base + offset))
 
 #define OidIsValid(objectId) ((bool)((objectId) != InvalidOid))
 
diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h
index a117a3032..b4dacb7e4 100755
--- a/src/include/distributelayer/streamCore.h
+++ b/src/include/distributelayer/streamCore.h
@@ -28,6 +28,7 @@
 #define SRC_INCLUDE_DISTRIBUTELAYER_STREAMCORE_H_
 
 #include
+#include <unordered_map>
 
 #include "postgres.h"
 #include "knl/knl_variable.h"
@@ -442,6 +443,12 @@ public:
     /* Send stop signal to all stream threads in node group. */
     void SigStreamThreadClose();
 
+    void BuildStreamDesc(const uint64& queryId, Plan* node);
+
+    void* GetParallelDesc(const uint64& queryId, const uint64& planNodeId);
+
+    void DestroyStreamDesc(const uint64& queryId, Plan* node);
+
     struct PortalData *m_portal;
 #endif
 
     /* Mark recursive vfd is invalid before aborting transaction. */
@@ -519,6 +526,21 @@ private:
     /* Mark Stream query quit status. */
     StreamObjStatus m_quitStatus;
 #endif
+    struct KeyHash {
+        std::size_t operator()(const StreamKey& k) const
+        {
+            return std::hash<uint64>()(k.queryId) ^
+                (std::hash<uint64>()(k.planNodeId) << 1);
+        }
+    };
+
+    struct KeyEqual {
+        bool operator()(const StreamKey& lhs, const StreamKey& rhs) const
+        {
+            return lhs.queryId == rhs.queryId && lhs.planNodeId == rhs.planNodeId;
+        }
+    };
+
+    std::unordered_map<StreamKey, void*, KeyHash, KeyEqual> m_streamDesc;
 };
 
 extern bool IsThreadProcessStreamRecursive();
diff --git a/src/include/executor/node/nodeIndexscan.h b/src/include/executor/node/nodeIndexscan.h
index 0fdbf67a5..c3441dafc 100644
--- a/src/include/executor/node/nodeIndexscan.h
+++ b/src/include/executor/node/nodeIndexscan.h
@@ -14,6 +14,7 @@
 #ifndef NODEINDEXSCAN_H
 #define NODEINDEXSCAN_H
 
+#include "executor/exec/execStream.h"
 #include "nodes/execnodes.h"
 
 extern IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags);
diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h
index 6b214e360..7bd3bbf18 100755
--- a/src/include/knl/knl_thread.h
+++ b/src/include/knl/knl_thread.h
@@ -3642,6 +3642,16 @@ inline bool StreamTopConsumerAmI()
     return (t_thrd.subrole == TOP_CONSUMER);
 }
 
+inline bool WorkerThreadAmI()
+{
+    return (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER);
+}
+
+inline bool StreamWorkerThreadAmI()
+{
+    return (t_thrd.role == STREAM_WORKER);
+}
+
 inline bool WLMThreadAmI()
 {
     return (t_thrd.role == WLM_WORKER || t_thrd.role == WLM_MONITOR ||
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index c9e5af177..91dee74f2 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -61,7 +61,7 @@ extern Path *create_tsstorescan_path(PlannerInfo * root,RelOptInfo * rel, int do
 #endif /* ENABLE_MULTIPLE_NODES */
 extern IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols,
     List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly,
-    Relids required_outer, Bitmapset *upper_params, double loop_count);
+    Relids required_outer, Bitmapset *upper_params, double loop_count, int dop = 1);
 extern Path* build_seqScanPath_by_indexScanPath(PlannerInfo* root, Path* index_path);
 extern bool CheckBitmapQualIsGlobalIndex(Path* bitmapqual);
 extern bool CheckBitmapHeapPathContainGlobalOrLocal(Path* bitmapqual);
diff --git a/src/test/regress/input/parallel_index_scan.source b/src/test/regress/input/parallel_index_scan.source
new file mode 100644
index 000000000..99a20da5e
--- /dev/null
+++ b/src/test/regress/input/parallel_index_scan.source
@@ -0,0 +1,35 @@
+drop schema if exists test_parallel_index_scan cascade;
+create schema test_parallel_index_scan;
+set current_schema='test_parallel_index_scan';
+-- create test table and index
+DROP TABLE IF EXISTS parallel_index_01;
+CREATE TABLE parallel_index_01(a int, b int);
+INSERT INTO parallel_index_01 VALUES (generate_series(1, 1000000), generate_series(1,1000000));
+CREATE INDEX index_parallel_index_01 ON parallel_index_01(a);
+SET enable_seqscan = OFF;
+SET enable_bitmapscan = OFF;
+SET enable_indexonlyscan = OFF;
+SET query_dop = 2;
+-- encourage the optimizer to choose a parallel path
+SET smp_thread_cost = 0;
+
+-- parallel index scan in equality case
+SELECT * FROM parallel_index_01 WHERE a=100;
+SELECT * FROM parallel_index_01 WHERE a=100 AND a=10000;
+SELECT * FROM parallel_index_01 WHERE a=100 AND b=100;
+SELECT * FROM parallel_index_01 WHERE a=100 AND b=200;
+
+-- parallel index scan in range case
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a<10000;
+SELECT COUNT(b) FROM parallel_index_01 WHERE a<10000;
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a>10000 AND a<20000;
+SELECT COUNT(b) FROM parallel_index_01 WHERE a>10000 AND a<20000;
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a in (1000,10000,100000) ORDER BY a;
+SELECT * FROM parallel_index_01 WHERE a in (1000,10000,100000) ORDER BY a;
+
+--cleanup env
+reset enable_seqscan;
+reset enable_bitmapscan;
+reset enable_indexonlyscan;
+reset query_dop;
+reset smp_thread_cost;
diff --git a/src/test/regress/output/parallel_index_scan.source b/src/test/regress/output/parallel_index_scan.source
new file mode 100644
index 000000000..7501a417f
--- /dev/null
+++ b/src/test/regress/output/parallel_index_scan.source
@@ -0,0 +1,92 @@
+drop schema if exists test_parallel_index_scan cascade;
+NOTICE:  schema "test_parallel_index_scan" does not exist, skipping
+create schema test_parallel_index_scan;
+set current_schema='test_parallel_index_scan';
+-- create test table and index
+DROP TABLE IF EXISTS parallel_index_01;
+NOTICE:  table "parallel_index_01" does not exist, skipping
+CREATE TABLE parallel_index_01(a int, b int);
+INSERT INTO parallel_index_01 VALUES (generate_series(1, 1000000), generate_series(1,1000000));
+CREATE INDEX index_parallel_index_01 ON parallel_index_01(a);
+SET enable_seqscan = OFF;
+SET enable_bitmapscan = OFF;
+SET enable_indexonlyscan = OFF;
+SET query_dop = 2;
+-- encourage the optimizer to choose a parallel path
+SET smp_thread_cost = 0;
+-- parallel index scan in equality case
+SELECT * FROM parallel_index_01 WHERE a=100;
+  a  |  b  
+-----+-----
+ 100 | 100
+(1 row)
+
+SELECT * FROM parallel_index_01 WHERE a=100 AND a=10000;
+ a | b 
+---+---
+(0 rows)
+
+SELECT * FROM parallel_index_01 WHERE a=100 AND b=100;
+  a  |  b  
+-----+-----
+ 100 | 100
+(1 row)
+
+SELECT * FROM parallel_index_01 WHERE a=100 AND b=200;
+ a | b 
+---+---
+(0 rows)
+
+-- parallel index scan in range case
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a<10000;
+                             QUERY PLAN                              
+---------------------------------------------------------------------
+ Streaming(type: LOCAL GATHER dop: 1/2)
+   ->  Index Scan using index_parallel_index_01 on parallel_index_01
+         Index Cond: (a < 10000)
+(3 rows)
+
+SELECT COUNT(b) FROM parallel_index_01 WHERE a<10000;
+ count 
+-------
+  9999
+(1 row)
+
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a>10000 AND a<20000;
+                             QUERY PLAN                              
+---------------------------------------------------------------------
+ Streaming(type: LOCAL GATHER dop: 1/2)
+   ->  Index Scan using index_parallel_index_01 on parallel_index_01
+         Index Cond: ((a > 10000) AND (a < 20000))
+(3 rows)
+
+SELECT COUNT(b) FROM parallel_index_01 WHERE a>10000 AND a<20000;
+ count 
+-------
+  9999
+(1 row)
+
+EXPLAIN (COSTS OFF) SELECT * FROM parallel_index_01 WHERE a in (1000,10000,100000) ORDER BY a;
+                                QUERY PLAN                                 
+---------------------------------------------------------------------------
+ Sort
+   Sort Key: a
+   ->  Streaming(type: LOCAL GATHER dop: 1/2)
+         ->  Index Scan using index_parallel_index_01 on parallel_index_01
+               Index Cond: (a = ANY ('{1000,10000,100000}'::integer[]))
+(5 rows)
+
+SELECT * FROM parallel_index_01 WHERE a in (1000,10000,100000) ORDER BY a;
+   a    |   b    
+--------+--------
+   1000 |   1000
+  10000 |  10000
+ 100000 | 100000
+(3 rows)
+
+--cleanup env
+reset enable_seqscan;
+reset enable_bitmapscan;
+reset enable_indexonlyscan;
+reset query_dop;
+reset smp_thread_cost;
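
Reviewer note: below is a minimal, standalone sketch of the page-handoff
protocol that _bt_parallel_seize()/_bt_parallel_release()/_bt_parallel_done()
implement in this patch, built on the same std::mutex/std::condition_variable
primitives. The names (SharedScan, seize, release, done) and the integer
"page chain" are simplified assumptions for illustration only; this code is
not part of the patch, and BTPARALLEL_NOT_INITIALIZED is folded into IDLE.

    // Workers cooperatively walk pages 0..numPages-1; at any moment at most
    // one worker is "advancing" the shared scan position, mirroring the
    // BTPARALLEL_ADVANCING/IDLE/DONE state machine in nbtree.cpp.
    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <thread>
    #include <vector>

    enum State { IDLE, ADVANCING, DONE };

    struct SharedScan {
        int nextPage = 0;           /* analogous to btps_scanPage */
        State state = IDLE;         /* analogous to btps_pageStatus */
        std::mutex mtx;             /* analogous to btps_mutex */
        std::condition_variable cv; /* analogous to btps_cv */
    };

    /* seize: wait until no other worker is advancing, then claim the scan */
    static bool seize(SharedScan& s, int* page)
    {
        std::unique_lock<std::mutex> lck(s.mtx);
        s.cv.wait(lck, [&] { return s.state != ADVANCING; });
        if (s.state == DONE)
            return false;           /* scan already finished */
        s.state = ADVANCING;
        *page = s.nextPage;
        return true;
    }

    /* release: publish the successor page and wake one waiter */
    static void release(SharedScan& s, int nextPage)
    {
        {
            std::lock_guard<std::mutex> lck(s.mtx);
            s.nextPage = nextPage;
            s.state = IDLE;
        }
        s.cv.notify_one();
    }

    /* done: mark the scan finished and wake all waiters */
    static void done(SharedScan& s)
    {
        {
            std::lock_guard<std::mutex> lck(s.mtx);
            s.state = DONE;
        }
        s.cv.notify_all();
    }

    int main()
    {
        const int numPages = 8;
        SharedScan s;

        auto worker = [&](int id) {
            int page;
            while (seize(s, &page)) {
                if (page >= numPages) { /* past the last page: end the scan */
                    done(s);
                    break;
                }
                release(s, page + 1);   /* hand the successor page to others */
                std::printf("worker %d reads page %d\n", id, page);
            }
        };

        std::vector<std::thread> workers;
        for (int i = 0; i < 2; i++)
            workers.emplace_back(worker, i);
        for (auto& t : workers)
            t.join();
        return 0;
    }

As in _bt_readpage(), each worker publishes the successor page (release)
before it processes the page it seized, so page contents are read
concurrently and only the page-step itself is serialized.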