diff --git a/src/common/backend/utils/sort/tuplesort.cpp b/src/common/backend/utils/sort/tuplesort.cpp
index bfb6ab2da..3a1133d74 100644
--- a/src/common/backend/utils/sort/tuplesort.cpp
+++ b/src/common/backend/utils/sort/tuplesort.cpp
@@ -1968,8 +1968,6 @@ static bool tuplesort_gettuple_common(Tuplesortstate* state, bool forward, SortT
         case TSS_SORTEDONTAPE:
             Assert(forward || state->randomAccess);
-            Assert(state->slabAllocatorUsed);
-
             /*
              * The slot that held the tuple that we returned in previous
              * gettuple call can now be reused.
              */
diff --git a/src/gausskernel/optimizer/path/costsize.cpp b/src/gausskernel/optimizer/path/costsize.cpp
index e1edaba99..44c0afe96 100755
--- a/src/gausskernel/optimizer/path/costsize.cpp
+++ b/src/gausskernel/optimizer/path/costsize.cpp
@@ -998,6 +998,8 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
     double pages_fetched;
     bool ispartitionedindex = path->indexinfo->rel->isPartitionedTable;
     bool disable_path = false;
+    int dop = SET_DOP(path->path.dop);
+
     if (enable_parametrized_path(root, baserel, (Path*)path) ||
         (!u_sess->attr.attr_sql.enable_indexscan && !indexonly) ||
         (!u_sess->attr.attr_sql.enable_indexonlyscan && indexonly)) {
@@ -1219,7 +1221,11 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
      */
     csquared = indexCorrelation * indexCorrelation;
 
-    run_cost += max_IO_cost + csquared * (min_IO_cost - max_IO_cost);
+    if (dop == 0) {
+        run_cost += (max_IO_cost + csquared * (min_IO_cost - max_IO_cost));
+    } else {
+        run_cost += (max_IO_cost + csquared * (min_IO_cost - max_IO_cost)) / dop;
+    }
 
     ereport(DEBUG2,
         (errmodule(MOD_OPT),
@@ -1253,7 +1259,12 @@ void cost_index(IndexPath* path, PlannerInfo* root, double loop_count)
     else
         cpu_per_tuple = u_sess->attr.attr_sql.cpu_tuple_cost + qpqual_cost.per_tuple;
 
-    run_cost += cpu_per_tuple * tuples_fetched;
+    run_cost += u_sess->opt_cxt.smp_thread_cost * (dop - 1);
+    if (dop == 0) {
+        run_cost += cpu_per_tuple * tuples_fetched;
+    } else {
+        run_cost += cpu_per_tuple * tuples_fetched / dop;
+    }
 
     ereport(DEBUG2,
         (errmodule(MOD_OPT),
diff --git a/src/gausskernel/optimizer/path/indxpath.cpp b/src/gausskernel/optimizer/path/indxpath.cpp
index d1646ac7f..339101eae 100755
--- a/src/gausskernel/optimizer/path/indxpath.cpp
+++ b/src/gausskernel/optimizer/path/indxpath.cpp
@@ -914,7 +914,8 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
     bool index_is_ordered = false;
     bool index_only_scan = false;
     int indexcol;
-
+    bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) && (ST_BITMAPSCAN != scantype) &&
+        (!rel->isPartitionedTable);
     /*
      * Check that index supports the desired scan type(s)
      */
@@ -1072,6 +1073,22 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
             upper_params,
             loop_count);
         result = lappend(result, ipath);
+        if (can_parallel) {
+            ipath = create_index_path(root,
+                index,
+                index_clauses,
+                clause_columns,
+                NIL,
+                NIL,
+                useful_pathkeys,
+                index_is_ordered ? ForwardScanDirection : NoMovementScanDirection,
+                index_only_scan,
+                outer_relids,
+                upper_params,
+                loop_count,
+                u_sess->opt_cxt.query_dop);
+            result = lappend(result, ipath);
+        }
     }
 
     /*
@@ -1097,6 +1114,23 @@ static List* build_index_paths(PlannerInfo* root, RelOptInfo* rel, IndexOptInfo*
             upper_params,
             loop_count);
         result = lappend(result, ipath);
+
+        if (can_parallel) {
+            ipath = create_index_path(root,
+                index,
+                index_clauses,
+                clause_columns,
+                NIL,
+                NIL,
+                useful_pathkeys,
+                BackwardScanDirection,
+                index_only_scan,
+                outer_relids,
+                upper_params,
+                loop_count,
+                u_sess->opt_cxt.query_dop);
+            result = lappend(result, ipath);
+        }
     }
 }
diff --git a/src/gausskernel/optimizer/util/pathnode.cpp b/src/gausskernel/optimizer/util/pathnode.cpp
index 969bdcd74..8a5a43f98 100755
--- a/src/gausskernel/optimizer/util/pathnode.cpp
+++ b/src/gausskernel/optimizer/util/pathnode.cpp
@@ -2356,7 +2356,7 @@ bool is_pwj_path(Path* pwjpath)
  */
 IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols,
     List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly,
-    Relids required_outer, Bitmapset *upper_params, double loop_count)
+    Relids required_outer, Bitmapset *upper_params, double loop_count, int dop)
 {
     IndexPath* pathnode = makeNode(IndexPath);
     RelOptInfo* rel = index->rel;
@@ -2370,7 +2370,7 @@ IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* index
     pathnode->path.pathtarget = rel->reltarget;
     pathnode->path.param_info = get_baserel_parampathinfo(root, rel, required_outer, upper_params);
     pathnode->path.pathkeys = pathkeys;
-
+    pathnode->path.dop = dop;
     /* Convert clauses to indexquals the executor can handle */
     expand_indexqual_conditions(index, indexclauses, indexclausecols, &indexquals, &indexqualcols);
diff --git a/src/gausskernel/process/stream/streamCore.cpp b/src/gausskernel/process/stream/streamCore.cpp
index 375a888d8..e39dcebe4 100755
--- a/src/gausskernel/process/stream/streamCore.cpp
+++ b/src/gausskernel/process/stream/streamCore.cpp
@@ -56,6 +56,7 @@
 #include "utils/combocid.h"
 #include "vecexecutor/vecstream.h"
 #include "access/hash.h"
+#include "access/nbtree.h"
 #include "pgstat.h"
 #include "tcop/tcopprot.h"
 #include "distributelayer/streamCore.h"
@@ -73,6 +74,7 @@ MemoryContext StreamNodeGroup::m_memoryGlobalCxt = NULL;
 pthread_mutex_t StreamNodeGroup::m_streamNodeGroupLock;
 HTAB* StreamNodeGroup::m_streamNodeGroupTbl = NULL;
 HTAB* StreamNodeGroup::m_streamConnectSyncTbl = NULL;
+HTAB* StreamNodeGroup::m_streamDescHashTbl = NULL;
 pthread_mutex_t StreamNodeGroup::m_streamConnectSyncLock;
 
 static void ConsumerNodeSyncUpMessage(RecursiveUnionController* controller, int step, StreamState* node);
@@ -438,6 +440,15 @@ void StreamNodeGroup::StartUp()
         hash_create("stream connect sync hash", 256, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX);
 
     pthread_mutex_init(&m_streamConnectSyncLock, NULL);
+    rc = memset_s(&nodectl, sizeof(nodectl), 0, sizeof(nodectl));
+    securec_check(rc, "\0", "\0");
+    nodectl.keysize = sizeof(StreamKey);
+    nodectl.entrysize = sizeof(StreamDescElement);
+    nodectl.hash = tag_hash;
+    nodectl.hcxt = m_memoryGlobalCxt;
+
+    m_streamDescHashTbl =
+        hash_create("stream desc hash", STREAM_DESC_HASH_NUMBER, &nodectl, HASH_ELEM | HASH_FUNCTION | HASH_SHRCTX);
 }
 
 /*
@@ -1904,6 +1915,78 @@ void StreamNodeGroup::MarkRecursiveVfdInvalid()
     }
 }
 
+void StreamNodeGroup::BuildStreamDesc(const uint64& queryId, Plan* node)
+{
+    StreamKey streamKey;
+    memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey));
+    streamKey.queryId = queryId;
+    streamKey.planNodeId = node->plan_node_id;
+
+    void* parallelDesc = NULL;
+
+    switch (nodeTag(node)) {
+        case T_IndexScan:
+            parallelDesc = palloc0(sizeof(ParallelIndexScanDescData));
+            ((ParallelIndexScanDescData*)parallelDesc)->ps_indexid = ((IndexScan*)node)->indexid;
+            ((ParallelIndexScanDescData*)parallelDesc)->ps_relid = ((IndexScan*)node)->scan.scanrelid;
+            ((ParallelIndexScanDescData*)parallelDesc)->psBtpscan = Btbuildparallelscan();
+            break;
+        default:
+            break;
+    }
+
+    if (!parallelDesc) {
+        return;
+    }
+    bool found = false;
+    StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_ENTER, &found);
+    if (found) {
+        ereport(ERROR, (errcode(ERRCODE_SYSTEM_ERROR), errmsg("duplicate stream key in stream desc hash table")));
+    }
+    element->key = streamKey;
+    element->parallelDesc = (ParallelIndexScanDescData*)parallelDesc;
+}
+
+void StreamNodeGroup::DestroyStreamDesc(const uint64& queryId, Plan* node)
+{
+    StreamKey streamKey;
+    memset_s(&streamKey, sizeof(streamKey), 0, sizeof(streamKey));
+    streamKey.queryId = queryId;
+    streamKey.planNodeId = node->plan_node_id;
+    bool found = false;
+    StreamDescElement* element = NULL;
+
+    switch (nodeTag(node)) {
+        case T_IndexScan:
+            element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &streamKey, HASH_FIND, &found);
+            if (found) {
+                if (((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan) {
+                    delete ((ParallelIndexScanDescData*)element->parallelDesc)->psBtpscan;
+                }
+                pfree(element->parallelDesc);
+                hash_search(m_streamDescHashTbl, &streamKey, HASH_REMOVE, NULL);
+            }
+            break;
+        default:
+            break;
+    }
+}
+
+void* StreamNodeGroup::GetParallelDesc(const uint64& queryId, const uint64& planNodeId)
+{
+    StreamKey key;
+    memset_s(&key, sizeof(key), 0, sizeof(key));
+    key.queryId = queryId;
+    key.planNodeId = planNodeId;
+    bool found = false;
+    StreamDescElement* element = (StreamDescElement*)hash_search(m_streamDescHashTbl, &key, HASH_FIND, &found);
+    if (!found) {
+        return NULL;
+    } else {
+        return element->parallelDesc;
+    }
+}
+
 #ifndef ENABLE_MULTIPLE_NODES
 bool InitStreamObject(PlannedStmt* planStmt)
 {
diff --git a/src/gausskernel/runtime/executor/execMain.cpp b/src/gausskernel/runtime/executor/execMain.cpp
index cf3c6600c..c0781a0e9 100755
--- a/src/gausskernel/runtime/executor/execMain.cpp
+++ b/src/gausskernel/runtime/executor/execMain.cpp
@@ -1532,6 +1532,7 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
                 (IS_SPQ_COORDINATOR && list_nth_int(plannedstmt->subplan_ids, i - 1) != 0) ||
 #endif
                 plannedstmt->planTree->plan_node_id == list_nth_int(plannedstmt->subplan_ids, i - 1))) {
+            estate->es_under_subplan = true;
             subplanstate = ExecInitNode(subplan, estate, sp_eflags);
 
@@ -1562,6 +1563,7 @@ void InitPlan(QueryDesc *queryDesc, int eflags)
         plan->initPlan = plannedstmt->initPlan;
         estate->es_subplan_ids = plannedstmt->subplan_ids;
     }
+
     planstate = ExecInitNode(plan, estate, eflags);
 
     if (estate->pruningResult) {
diff --git a/src/gausskernel/runtime/executor/nodeIndexscan.cpp b/src/gausskernel/runtime/executor/nodeIndexscan.cpp
index 4ec3b22c1..31b76ed41 100644
--- a/src/gausskernel/runtime/executor/nodeIndexscan.cpp
+++ b/src/gausskernel/runtime/executor/nodeIndexscan.cpp
@@ -263,6 +263,8 @@ void ExecReScanIndexScan(IndexScanState* node)
         scan_handler_idx_rescan(
             node->iss_ScanDesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys,
             node->iss_NumOrderByKeys);
+    scan_handler_idx_rescan_parallel(node->iss_ScanDesc);
+
     ExecScanReScan(&node->ss);
 }
@@ -467,8 +469,13 @@ void ExecEndIndexScan(IndexScanState* node)
     /*
      * close the index relation (no-op if we didn't open it)
      */
-    if (index_scan_desc)
+    if (index_scan_desc) {
         scan_handler_idx_endscan(index_scan_desc);
+        if (WorkerThreadAmI() && node->ss.ps.plan->dop > 1) {
+            u_sess->stream_cxt.global_obj->DestroyStreamDesc(
+                node->ss.ps.state->es_plannedstmt->queryId, node->ss.ps.plan);
+        }
+    }
 
     /*
      * close the index relation (no-op if we didn't open it)
@@ -631,6 +638,18 @@ void ExecInitIndexRelation(IndexScanState* node, EState* estate, int eflags)
         }
     }
 
+    ParallelIndexScanDescData *parallelDesc = NULL;
+    if (u_sess->stream_cxt.global_obj && node->ss.ps.plan->dop > 1) {
+        if (WorkerThreadAmI()) {
+            u_sess->stream_cxt.global_obj->BuildStreamDesc(
+                estate->es_plannedstmt->queryId, index_state->ss.ps.plan);
+        }
+        parallelDesc = (ParallelIndexScanDescData*)u_sess->stream_cxt.global_obj->GetParallelDesc(
+            estate->es_plannedstmt->queryId, index_state->ss.ps.plan->plan_node_id);
+        if (WorkerThreadAmI())
+            scan_handler_idx_parallelscan_initialize(current_relation, index_state->iss_RelationDesc, parallelDesc);
+    }
+
     /*
      * Initialize scan descriptor.
      */
@@ -639,7 +658,8 @@ void ExecInitIndexRelation(IndexScanState* node, EState* estate, int eflags)
             scanSnap,
             index_state->iss_NumScanKeys,
             index_state->iss_NumOrderByKeys,
-            (ScanState*)index_state);
+            (ScanState*)index_state,
+            parallelDesc);
     }
 
     return;
diff --git a/src/gausskernel/storage/access/hbstore/hbindex_am.cpp b/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
index 20de90970..9f3ab4a03 100644
--- a/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
+++ b/src/gausskernel/storage/access/hbstore/hbindex_am.cpp
@@ -545,15 +545,22 @@ static HeapTuple cross_level_index_getnext(IndexScanDesc scan, ScanDirection dir
  * ------------------------------------------------------------------------
  */
-IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state)
+IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot,
+    int nkeys, int norderbys, ScanState* scan_state, ParallelIndexScanDesc pscan)
 {
     if (unlikely(RELATION_OWN_BUCKET(heap_relation))) {
         return hbkt_idx_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state);
     } else {
-        return index_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state);
+        return index_beginscan(heap_relation, index_relation, snapshot, nkeys, norderbys, scan_state, pscan);
     }
 }
 
+void scan_handler_idx_parallelscan_initialize(Relation heap_relation,
+    Relation index_relation, ParallelIndexScanDesc p_index_scan)
+{
+    index_parallelscan_initialize(heap_relation, index_relation, p_index_scan);
+}
+
 IndexScanDesc scan_handler_idx_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys, ScanState* scan_state)
 {
     if (unlikely(RELATION_OWN_BUCKET(indexRelation))) {
@@ -574,6 +581,12 @@ void scan_handler_idx_rescan(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey
     }
 }
 
+void scan_handler_idx_rescan_parallel(IndexScanDesc scan)
+{
+    Assert(scan != NULL);
+    IndexRescanParallel(scan);
+}
+
 void scan_handler_idx_rescan_local(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys)
 {
     Assert(scan != NULL);
diff --git a/src/gausskernel/storage/access/index/indexam.cpp b/src/gausskernel/storage/access/index/indexam.cpp
index f450adb94..758e32288 100644
--- a/src/gausskernel/storage/access/index/indexam.cpp
+++ b/src/gausskernel/storage/access/index/indexam.cpp
@@ -83,6 +83,7 @@
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "storage/smgr/smgr.h"
+#include "access/nbtree.h"
 #include "access/ustore/knl_uvisibility.h"
 #include "access/ustore/knl_uscan.h"
 #include "utils/snapmgr.h"
@@ -143,7 +144,8 @@
     } \
 } while (0)
 
-static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot);
+static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot,
+    ParallelIndexScanDesc pscan = NULL);
 
 /* ----------------
  *    index_open - open an index relation by relation OID
@@ -246,11 +248,12 @@ bool index_insert(Relation index_relation, Datum *values, const bool *isnull, It
  * Caller must be holding suitable locks on the heap and the index.
  */
 IndexScanDesc index_beginscan(
-    Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state)
+    Relation heap_relation, Relation index_relation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state,
+    ParallelIndexScanDesc pscan)
 {
     IndexScanDesc scan;
 
-    scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot);
+    scan = index_beginscan_internal(index_relation, nkeys, norderbys, snapshot, pscan);
 
     /*
      * Save additional parameters into the scandesc. Everything else was set
@@ -295,7 +298,8 @@ IndexScanDesc index_beginscan_bitmap(Relation index_relation, Snapshot snapshot,
 /*
  * index_beginscan_internal --- common code for index_beginscan variants
  */
-static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot)
+static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys, int norderbys, Snapshot snapshot,
+    ParallelIndexScanDesc pscan)
 {
     IndexScanDesc scan;
     FmgrInfo *procedure = NULL;
@@ -324,6 +328,9 @@ static IndexScanDesc index_beginscan_internal(Relation index_relation, int nkeys
     scan->spq_scan = NULL;
 #endif
 
+    /* Initialize information for parallel scan. */
+    scan->parallelScan = pscan;
+
     return scan;
 }
 
@@ -371,6 +378,18 @@ void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys,
     }
 }
 
+/* ----------------
+ *    IndexRescanParallel - reset parallel index scan variables
+ * ----------------
+ */
+void IndexRescanParallel(IndexScanDesc scan)
+{
+    if (scan->parallelScan) {
+        btparallelrescan(scan);
+    }
+}
+
 /* ----------------
  *    index_endscan - end a scan
  * ----------------
@@ -476,6 +495,31 @@ void index_restrpos(IndexScanDesc scan)
 }
 
+/*
+ * index_parallelscan_initialize - initialize parallel scan
+ *
+ * We initialize both the ParallelIndexScanDesc proper and the AM-specific
+ * information which follows it.
+ *
+ * This function calls the access method specific initialization routine to
+ * initialize am specific information.  Call this just once in the leader
+ * process; then, individual workers attach by passing the shared descriptor
+ * to index_beginscan().
+ */
+void index_parallelscan_initialize(Relation heap_relation, Relation index_relation,
+    ParallelIndexScanDesc target)
+{
+    if (!target) {
+        return;
+    }
+    RELATION_CHECKS;
+
+    target->ps_relid = RelationGetRelid(heap_relation);
+    target->ps_indexid = RelationGetRelid(index_relation);
+
+    Btinitparallelscan(target->psBtpscan);
+}
+
 /* ----------------
  *    index_getnext_tid - get the next TID from a scan
  *
diff --git a/src/gausskernel/storage/access/nbtree/nbtree.cpp b/src/gausskernel/storage/access/nbtree/nbtree.cpp
index 9c120610b..f4fc03c24 100644
--- a/src/gausskernel/storage/access/nbtree/nbtree.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtree.cpp
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "catalog/index.h"
 #include "commands/vacuum.h"
+#include "pgstat.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
@@ -50,6 +51,41 @@ typedef struct {
     MemoryContext pagedelcontext;
 } BTVacState;
 
+/*
+ * BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
+ *
+ * BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
+ * a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
+ * to a new page; some process can start doing that.
+ *
+ * BTPARALLEL_DONE indicates that the scan is complete (including error exit).
+ * We reach this state once for every distinct combination of array keys.
+ */
+typedef enum BTPS_State {
+    BTPARALLEL_NOT_INITIALIZED,
+    BTPARALLEL_ADVANCING,
+    BTPARALLEL_IDLE,
+    BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData {
+    BlockNumber btps_scanPage;    /* latest or next page to be scanned */
+    BTPS_State btpsPageStatus;    /* indicates whether next page is available
+                                   * for scan. see above for possible states of
+                                   * parallel scan. */
+    int btpsArrayKeyCount;        /* count indicating number of array
+                                   * scan keys processed by parallel
+                                   * scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, IndexBulkDeleteCallback callback,
     void *callback_state, BTCycleId cycleid);
 static void btvacuumpage(BTVacState *vstate, BlockNumber blkno, BlockNumber orig_blkno);
@@ -480,6 +516,7 @@ IndexScanDesc btbeginscan_internal(Relation rel, int nkeys, int norderbys)
     so->arrayContext = NULL;
     so->killedItems = NULL; /* until needed */
     so->numKilled = 0;
+    so->arrayKeyCount = 0;
 
     /*
      * We don't know yet whether the scan will be index-only, so we do not
@@ -703,6 +740,187 @@ void btrestrpos_internal(IndexScanDesc scan)
     }
 }
+
+/*
+ * Btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ */
+void Btinitparallelscan(void *target)
+{
+    BTParallelScanDesc btTarget = (BTParallelScanDesc) target;
+
+    LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+    btTarget->btps_scanPage = InvalidBlockNumber;
+    btTarget->btpsPageStatus = BTPARALLEL_NOT_INITIALIZED;
+    btTarget->btpsArrayKeyCount = 0;
+    LWLockRelease(ParallelIndexScanLock);
+}
+
+/*
+ * btparallelrescan() -- reset parallel scan
+ */
+void btparallelrescan(IndexScanDesc scan)
+{
+    BTParallelScanDesc btscan;
+    ParallelIndexScanDesc parallel_scan = scan->parallelScan;
+
+    Assert(parallel_scan);
+
+    btscan = (BTParallelScanDesc) parallel_scan->psBtpscan;
+
+    /*
+     * In theory, we don't need to acquire the lock here, because there
+     * shouldn't be any other workers running at this point, but we do so for
+     * consistency.
+     */
+    LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+    btscan->btps_scanPage = InvalidBlockNumber;
+    btscan->btpsPageStatus = BTPARALLEL_NOT_INITIALIZED;
+    btscan->btpsArrayKeyCount = 0;
+    LWLockRelease(ParallelIndexScanLock);
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ *      page.  Other scans must wait until we call _bt_parallel_release() or
+ *      _bt_parallel_done().
+ *
+ * The return value is true if we successfully seized the scan and false
+ * if we did not.  The latter case occurs if no pages remain for the current
+ * set of scankeys.
+ *
+ * If the return value is true, *pageno returns the next or current page
+ * of the scan (depending on the scan direction).  An invalid block number
+ * means the scan hasn't yet started, and P_NONE means we've reached the end.
+ * The first time a participating process reaches the last page, it will return
+ * true and set *pageno to P_NONE; after that, further attempts to seize the
+ * scan will return false.
+ *
+ * Callers should ignore the value of pageno if the return value is false.
+ */
+bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    BTPS_State pageStatus;
+    bool exitLoop = false;
+    bool status = true;
+    ParallelIndexScanDesc parallel_scan = scan->parallelScan;
+    BTParallelScanDesc btscan;
+
+    *pageno = P_NONE;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->psBtpscan);
+
+    while (1) {
+        LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+        pageStatus = btscan->btpsPageStatus;
+
+        if (so->arrayKeyCount < btscan->btpsArrayKeyCount) {
+            /* Parallel scan has already advanced to a new set of scankeys. */
+            status = false;
+        } else if (pageStatus == BTPARALLEL_DONE) {
+            /*
+             * We're done with this set of scankeys.  This may be the end, or
+             * there could be more sets to try.
+             */
+            status = false;
+        } else if (pageStatus != BTPARALLEL_ADVANCING) {
+            /*
+             * We have successfully seized control of the scan for the purpose
+             * of advancing it to a new page!
+             */
+            btscan->btpsPageStatus = BTPARALLEL_ADVANCING;
+            *pageno = btscan->btps_scanPage;
+            exitLoop = true;
+        }
+        LWLockRelease(ParallelIndexScanLock);
+        if (exitLoop || !status) {
+            break;
+        }
+    }
+
+    return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ *      new page.  We now have the new value btps_scanPage; some other backend
+ *      can now begin advancing the scan.
+ */
+void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+    ParallelIndexScanDesc parallel_scan = scan->parallelScan;
+    BTParallelScanDesc btscan;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->psBtpscan);
+
+    {
+        LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+        btscan->btps_scanPage = scan_page;
+        btscan->btpsPageStatus = BTPARALLEL_IDLE;
+        LWLockRelease(ParallelIndexScanLock);
+    }
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers.  Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ */
+void _bt_parallel_done(IndexScanDesc scan)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    ParallelIndexScanDesc parallel_scan = scan->parallelScan;
+    BTParallelScanDesc btscan;
+
+    /* Do nothing, for non-parallel scans */
+    if (parallel_scan == NULL) {
+        return;
+    }
+
+    btscan = (BTParallelScanDesc) (parallel_scan->psBtpscan);
+
+    /*
+     * Mark the parallel scan as done for this combination of scan keys,
+     * unless some other process already did so.  See also
+     * _bt_advance_array_keys.
+     */
+    {
+        LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+        if (so->arrayKeyCount >= btscan->btpsArrayKeyCount &&
+            btscan->btpsPageStatus != BTPARALLEL_DONE) {
+            btscan->btpsPageStatus = BTPARALLEL_DONE;
+        }
+        LWLockRelease(ParallelIndexScanLock);
+    }
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ *      keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ */
+void _bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    ParallelIndexScanDesc parallel_scan = scan->parallelScan;
+    BTParallelScanDesc btscan;
+
+    btscan = (BTParallelScanDesc) (parallel_scan->psBtpscan);
+
+    so->arrayKeyCount++;
+    LWLockAcquire(ParallelIndexScanLock, LW_EXCLUSIVE);
+    if (btscan->btpsPageStatus == BTPARALLEL_DONE) {
+        btscan->btps_scanPage = InvalidBlockNumber;
+        btscan->btpsPageStatus = BTPARALLEL_NOT_INITIALIZED;
+        btscan->btpsArrayKeyCount++;
+    }
+    LWLockRelease(ParallelIndexScanLock);
+}
+
 /*
  * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
@@ -1396,3 +1616,12 @@ static BTVacuumPosting btree_vacuum_posting(BTVacState *vac_state, IndexTuple po
     *num_remaining = num_live;
     return vacposting;
 }
+
+/*
+ * Btbuildparallelscan -- allocate storage for BTParallelScanDescData
+ */
+void* Btbuildparallelscan(void)
+{
+    void *btPscan = new BTParallelScanDescData();
+    return btPscan;
+}
diff --git a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
index 1d59776cf..e07865b9d 100644
--- a/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtsearch.cpp
@@ -35,9 +35,11 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off
 static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup, Oid partOid,
     int2 bucketid);
 static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static void _bt_check_natts_correct(const Relation index, bool heapkeyspace, Page page, OffsetNumber offnum);
-
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
 static int btree_setup_posting_items(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid,
     IndexTuple itup);
 static void btree_save_posting_item(BTScanOpaque so, int itemIndex, OffsetNumber offnum, ItemPointer heapTid,
@@ -592,8 +594,11 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
     ScanKeyData notnullkeys[INDEX_MAX_KEYS];
     int keysCount = 0;
     int i;
+    bool status = true;
     StrategyNumber strat_total;
     BTScanPosItem *currItem = NULL;
+    bool match = false;
+    BlockNumber blkno;
 
     pgstat_count_index_scan(rel);
 
@@ -610,6 +615,27 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
     if (!so->qual_ok)
         return false;
 
+    /*
+     * For parallel scans, get the starting page from shared state. If the
+     * scan has not started, proceed to find the first leaf page in the usual
+     * way while keeping other participating processes waiting. If the scan
+     * has already begun, use the page number from the shared structure.
+     */
+    if (scan->parallelScan != NULL) {
+        status = _bt_parallel_seize(scan, &blkno);
+        if (!status) {
+            return false;
+        } else if (blkno == P_NONE) {
+            _bt_parallel_done(scan);
+            return false;
+        } else if (blkno != InvalidBlockNumber) {
+            if (!_bt_parallel_readpage(scan, blkno, dir)) {
+                return false;
+            }
+            goto readcomplete;
+        }
+    }
+
     /* ----------
      * Examine the scan keys to discover where we need to start the scan.
      *
@@ -774,8 +800,14 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
      * the tree.  Walk down that edge to the first or last key, and scan from
      * there.
      */
-    if (keysCount == 0)
-        return _bt_endpoint(scan, dir);
+    if (keysCount == 0) {
+        match = _bt_endpoint(scan, dir);
+        if (!match) {
+            /* No match, so mark (parallel) scan finished */
+            _bt_parallel_done(scan);
+        }
+        return match;
+    }
 
     /*
      * We want to start the scan somewhere within the index.  Set up an
@@ -801,8 +833,10 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
          */
         ScanKey subkey = (ScanKey)DatumGetPointer(cur->sk_argument);
         Assert(subkey->sk_flags & SK_ROW_MEMBER);
-        if (subkey->sk_flags & SK_ISNULL)
+        if (subkey->sk_flags & SK_ISNULL) {
+            _bt_parallel_done(scan);
             return false;
+        }
         inskey.scankeys[i] = *subkey;
 
         /*
@@ -1003,21 +1037,19 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
          * because nothing finer to lock exists.
          */
         PredicateLockRelation(rel, scan->xs_snapshot);
+
+        /*
+         * Mark the parallel scan as done, so that all the workers can finish
+         * their scan.
+         */
+        _bt_parallel_done(scan);
         return false;
     } else
         PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot);
 
-    /* initialize moreLeft/moreRight appropriately for scan direction */
-    if (ScanDirectionIsForward(dir)) {
-        so->currPos.moreLeft = false;
-        so->currPos.moreRight = true;
-    } else {
-        so->currPos.moreLeft = true;
-        so->currPos.moreRight = false;
-    }
-    so->numKilled = 0;      /* just paranoia */
-    so->markItemIndex = -1; /* ditto */
+    _bt_initialize_more_data(so, dir);
 
+    {
         /* position to the precise item on the page */
         int posting_off = 0;
         offnum = _bt_binsrch(rel, &inskey, buf, &posting_off);
@@ -1057,7 +1089,9 @@ bool _bt_first(IndexScanDesc scan, ScanDirection dir)
         /* Drop the lock, but not pin, on the current page */
         LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+    }
 
+readcomplete:
     /* OK, itemIndex says what to return */
     currItem = &so->currPos.items[so->currPos.itemIndex];
     scan->xs_ctup.t_self = currItem->heapTid;
@@ -1149,6 +1183,10 @@ bool _bt_next(IndexScanDesc scan, ScanDirection dir)
  * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
  * that there can be no more matching tuples in the current scan direction.
  *
+ * In the case of a parallel scan, caller must have called _bt_parallel_seize
+ * prior to calling this function; this function will invoke
+ * _bt_parallel_release before returning.
+ *
  * Returns true if any matching items found on the page, false if none.
  */
 static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
@@ -1175,6 +1213,15 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off
     page = BufferGetPage(so->currPos.buf);
     opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
 
+    /* allow the next page to be processed by a parallel worker */
+    if (scan->parallelScan) {
+        if (ScanDirectionIsForward(dir))
+            _bt_parallel_release(scan, opaque->btpo_next);
+        else
+            _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+    }
+
     minoff = P_FIRSTDATAKEY(opaque);
     maxoff = PageGetMaxOffsetNumber(page);
 
@@ -1185,6 +1232,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber off
      */
     so->currPos.nextPage = opaque->btpo_next;
 
+    /*
+     * We note the buffer's block number so that we can release the pin later.
+     * This allows us to re-read the buffer if it is needed again for hinting.
+     */
+    so->currPos.currPage = BufferGetBlockNumber(so->currPos.buf);
+
     /* initialize tuple workspace to empty */
     so->currPos.nextTupleOffset = 0;
 
@@ -1314,8 +1367,8 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 {
     BTScanOpaque so = (BTScanOpaque)scan->opaque;
     Relation rel;
-    Page page;
-    BTPageOpaqueInternal opaque;
+    BlockNumber blkno = InvalidBlockNumber;
+    bool status;
 
     /* we must have the buffer pinned and locked */
     Assert(BufferIsValid(so->currPos.buf));
@@ -1346,65 +1399,170 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
 
     rel = scan->indexRelation;
 
-    if (ScanDirectionIsForward(dir)) {
-        /* Walk right to the next page with data */
-        /* We must rely on the previously saved nextPage link! */
-        BlockNumber blkno = so->currPos.nextPage;
+    /* release the previous buffer, if pinned */
+    _bt_relbuf(rel, so->currPos.buf);
 
+    if (ScanDirectionIsForward(dir)) {
+        so->currPos.buf = InvalidBuffer;
+
+        /* Walk right to the next page with data */
+        if (scan->parallelScan != NULL) {
+            /*
+             * Seize the scan to get the next block number; if the scan has
+             * ended already, bail out.
+             */
+            status = _bt_parallel_seize(scan, &blkno);
+            if (!status) {
+                return false;
+            }
+        } else {
+            /* Not parallel, so use the previously-saved nextPage link. */
+            blkno = so->currPos.nextPage;
+        }
         /* Remember we left a page with data */
         so->currPos.moreLeft = true;
-
-        for (;;) {
-            /* release the previous buffer */
-            _bt_relbuf(rel, so->currPos.buf);
-            so->currPos.buf = InvalidBuffer;
-            /* if we're at end of scan, give up */
-            if (blkno == P_NONE || !so->currPos.moreRight)
-                return false;
+    } else {
+        /* Remember we left a page with data */
+        so->currPos.moreRight = true;
+        if (scan->parallelScan != NULL) {
+            /*
+             * Seize the scan to get the current block number; if the scan has
+             * ended already, bail out.
+             */
+            status = _bt_parallel_seize(scan, &blkno);
+            if (!status) {
+                so->currPos.buf = InvalidBuffer;
+                return false;
+            }
+        } else {
+            /* Not parallel, so just use our own notion of the current page */
+            blkno = so->currPos.currPage;
+        }
+    }
+
+    if (!_bt_readnextpage(scan, blkno, dir)) {
+        return false;
+    }
+
+    return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page.  Caller is responsible to release lock and pin on
+ * buffer on success.  We return true to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return false.
+ */
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno,
+    ScanDirection dir)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+    Relation rel;
+    Page page;
+    BTPageOpaqueInternal opaque;
+    bool status;
+
+    rel = scan->indexRelation;
+
+    if (ScanDirectionIsForward(dir)) {
         for (;;) {
+            /*
+             * if we're at end of scan, give up and mark parallel scan as
+             * done, so that all the workers can finish their scan
+             */
+            if (blkno == P_NONE || !so->currPos.moreRight) {
+                _bt_parallel_done(scan);
+                return false;
+            }
             /* check for interrupts while we're not holding any buffer lock */
             CHECK_FOR_INTERRUPTS();
             /* step right one page */
             so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
             page = BufferGetPage(so->currPos.buf);
-            opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
+            opaque = BTPageGetOpaqueInternal(page);
+            /* check for deleted page */
             if (!P_IGNORE(opaque)) {
                 PredicateLockPage(rel, blkno, scan->xs_snapshot);
                 /* see if there are any matches on this page */
                 /* note that this will clear moreRight if we can stop */
-                if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque)))
+                if (_bt_readpage(scan, dir, P_FIRSTDATAKEY(opaque))) {
                     break;
+                }
+            } else if (scan->parallelScan != NULL) {
+                /* allow the next page to be processed by a parallel worker */
+                _bt_parallel_release(scan, opaque->btpo_next);
             }
+
             /* nope, keep going */
-            blkno = opaque->btpo_next;
+            if (scan->parallelScan != NULL) {
+                _bt_relbuf(rel, so->currPos.buf);
+                so->currPos.buf = InvalidBuffer;
+                status = _bt_parallel_seize(scan, &blkno);
+                if (!status) {
+                    return false;
+                }
+            } else {
+                /* read the right-link before dropping our pin on the page */
+                blkno = opaque->btpo_next;
+                _bt_relbuf(rel, so->currPos.buf);
+                so->currPos.buf = InvalidBuffer;
+            }
         }
     } else {
-        /* Remember we left a page with data */
-        so->currPos.moreRight = true;
+        /*
+         * Should only happen in parallel cases, when some other backend
+         * advanced the scan.
+         */
+        so->currPos.currPage = blkno;
+        so->currPos.buf = _bt_getbuf(rel, so->currPos.currPage, BT_READ);
 
         /*
          * Walk left to the next page with data.  This is much more complex
          * than the walk-right case because of the possibility that the page
          * to our left splits while we are in flight to it, plus the
          * possibility that the page we were on gets deleted after we leave
-         * it.  See nbtree/README for details.
+         * it.  See nbtree/README for details.
+         *
+         * It might be possible to rearrange this code to have less overhead
+         * in pinning and locking, but that would require capturing the left
+         * pointer when the page is initially read, and using it here, along
+         * with big changes to _bt_walk_left() and the code below.  It is not
+         * clear whether this would be a win, since if the page immediately to
+         * the left splits after we read this page and before we step left, we
+         * would need to visit more pages than with the current code.
+         *
+         * Note that if we change the code so that we drop the pin for a scan
+         * which uses a non-MVCC snapshot, we will need to modify the code for
+         * walking left, to allow for the possibility that a referenced page
+         * has been deleted.  As long as the buffer is pinned or the snapshot
+         * is MVCC the page cannot move past the half-dead state to fully
+         * deleted.
          */
+
         for (;;) {
             /* Done if we know there are no matching keys to the left */
             if (!so->currPos.moreLeft) {
                 _bt_relbuf(rel, so->currPos.buf);
+                _bt_parallel_done(scan);
                 so->currPos.buf = InvalidBuffer;
                 return false;
             }
 
             /* Step to next physical page */
-            Buffer temp = so->currPos.buf;
-            so->currPos.buf = InvalidBuffer;
+            Buffer temp = so->currPos.buf;
+            so->currPos.buf = InvalidBuffer;
             so->currPos.buf = _bt_walk_left(rel, temp);
 
             /* if we're physically at end of index, return failure */
-            if (so->currPos.buf == InvalidBuffer)
+            if (so->currPos.buf == InvalidBuffer) {
+                _bt_parallel_done(scan);
                 return false;
+            }
 
             /*
              * Okay, we managed to move left to a non-deleted page.  Done if
              * and do it all again.
              */
             page = BufferGetPage(so->currPos.buf);
-            opaque = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
+            opaque = BTPageGetOpaqueInternal(page);
             if (!P_IGNORE(opaque)) {
                 PredicateLockPage(rel, BufferGetBlockNumber(so->currPos.buf), scan->xs_snapshot);
                 /* see if there are any matches on this page */
                 /* note that this will clear moreLeft if we can stop */
-                if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
+                if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) {
                     break;
+                }
+            } else if (scan->parallelScan != NULL) {
+                /* allow the next page to be processed by a parallel worker */
+                _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+            }
+
+            /*
+             * For parallel scans, get the last page scanned as it is quite
+             * possible that by the time we try to seize the scan, some other
+             * worker has already advanced the scan to a different page.  We
+             * must continue based on the latest page scanned by any worker.
+             */
+            if (scan->parallelScan != NULL) {
+                _bt_relbuf(rel, so->currPos.buf);
+                status = _bt_parallel_seize(scan, &blkno);
+                if (!status) {
+                    so->currPos.buf = InvalidBuffer;
+                    return false;
+                }
+                so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
             }
         }
     }
@@ -1426,6 +1603,27 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir)
     return true;
 }
 
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and maybe pin on buffer.  We return true to
+ * indicate success.
+ */
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+    BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+    _bt_initialize_more_data(so, dir);
+
+    if (!_bt_readnextpage(scan, blkno, dir)) {
+        return false;
+    }
+
+    /* Drop the lock, but not pin, on the new page */
+    LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
+    return true;
+}
+
 /*
  * _bt_walk_left() -- step left one page, if possible
  *
@@ -1667,16 +1865,7 @@ static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
     /* remember which buffer we have pinned */
     so->currPos.buf = buf;
 
-    /* initialize moreLeft/moreRight appropriately for scan direction */
-    if (ScanDirectionIsForward(dir)) {
-        so->currPos.moreLeft = false;
-        so->currPos.moreRight = true;
-    } else {
-        so->currPos.moreLeft = true;
-        so->currPos.moreRight = false;
-    }
-    so->numKilled = 0;      /* just paranoia */
-    so->markItemIndex = -1; /* ditto */
+    _bt_initialize_more_data(so, dir);
 
     /*
      * Now load data from the first page of the scan.
@@ -1889,3 +2078,22 @@ static inline void btree_save_posting_item(BTScanOpaque so, int item_idx, Offset
     if (so->currTuples)
         curr_item->tupleOffset = tuple_offset;
 }
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+    /* initialize moreLeft/moreRight appropriately for scan direction */
+    if (ScanDirectionIsForward(dir)) {
+        so->currPos.moreLeft = false;
+        so->currPos.moreRight = true;
+    } else {
+        so->currPos.moreLeft = true;
+        so->currPos.moreRight = false;
+    }
+    so->numKilled = 0;      /* just paranoia */
+    so->markItemIndex = -1; /* ditto */
+}
+
diff --git a/src/gausskernel/storage/access/nbtree/nbtutils.cpp b/src/gausskernel/storage/access/nbtree/nbtutils.cpp
index 93d3ffba9..616da34cc 100644
--- a/src/gausskernel/storage/access/nbtree/nbtutils.cpp
+++ b/src/gausskernel/storage/access/nbtree/nbtutils.cpp
@@ -554,6 +554,10 @@ bool _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
         }
     }
 
+    /* advance parallel scan */
+    if (scan->parallelScan != NULL)
+        _bt_parallel_advance_array_keys(scan);
+
     return found;
 }
diff --git a/src/gausskernel/storage/lmgr/lwlocknames.txt b/src/gausskernel/storage/lmgr/lwlocknames.txt
index 9934949c9..ebf15340a 100755
--- a/src/gausskernel/storage/lmgr/lwlocknames.txt
+++ b/src/gausskernel/storage/lmgr/lwlocknames.txt
@@ -144,3 +144,4 @@ RedoTruncateLock 135
 ExrtoRecycleResidualUndoLock 137
 ShareInputScanLock 138
+ParallelIndexScanLock 139
\ No newline at end of file
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index fbf04d175..42af09b01 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -86,6 +86,7 @@ typedef bool (*IndexBulkDeleteCallback)(ItemPointer itemptr, void* state, Oid pa
 typedef struct IndexScanDescData* IndexScanDesc;
 typedef struct SysScanDescData* SysScanDesc;
 struct ScanState;
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
 
 /*
  * Enumeration specifying the type of uniqueness check to perform in
@@ -132,10 +133,13 @@ extern void index_delete(Relation index_relation, Datum* values, const bool* isn
 extern bool index_insert(Relation indexRelation, Datum* values, const bool* isnull, ItemPointer heap_t_ctid,
     Relation heapRelation, IndexUniqueCheck checkUnique);
 
-extern IndexScanDesc index_beginscan(
-    Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, int norderbys, ScanState* scan_state=NULL);
+extern IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot,
+    int nkeys, int norderbys, ScanState* scan_state = NULL, ParallelIndexScanDesc pscan = NULL);
+extern void index_parallelscan_initialize(Relation heap_relation,
+    Relation index_relation, ParallelIndexScanDesc pIndexScan);
 extern IndexScanDesc index_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys, ScanState* scan_state=NULL);
 extern void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys);
+extern void IndexRescanParallel(IndexScanDesc scan);
 extern void index_endscan(IndexScanDesc scan);
 extern void index_markpos(IndexScanDesc scan);
 extern void index_restrpos(IndexScanDesc scan);
diff --git a/src/include/access/hbindex_am.h b/src/include/access/hbindex_am.h
index 6de8943e2..cd4a173bf 100644
--- a/src/include/access/hbindex_am.h
+++ b/src/include/access/hbindex_am.h
@@ -48,10 +48,13 @@ static inline bool hbkt_idx_need_switch_bkt(IndexScanDesc scan, int targetSlot)
 extern bool hbkt_idx_bitmapscan_switch_bucket(IndexScanDesc scan, int targetSlot);
 extern bool cbi_scan_need_fix_hbkt_rel(IndexScanDesc scan, int2 bucketid = InvalidBktId);
 extern bool cbi_scan_fix_hbkt_rel(HBktIdxScanDesc hpScan);
-extern IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot,
-    int nkeys, int norderbys, ScanState* scan_state = NULL);
+extern IndexScanDesc scan_handler_idx_beginscan(Relation heap_relation, Relation index_relation, Snapshot snapshot,
+    int nkeys, int norderbys, ScanState* scan_state = NULL, ParallelIndexScanDesc pscan = NULL);
+extern void scan_handler_idx_parallelscan_initialize(Relation heap_relation,
+    Relation index_relation, ParallelIndexScanDesc p_index_scan);
 extern IndexScanDesc scan_handler_idx_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys, ScanState* scan_state);
 extern void scan_handler_idx_rescan(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys);
+extern void scan_handler_idx_rescan_parallel(IndexScanDesc scan);
 extern void scan_handler_idx_rescan_local(IndexScanDesc scan, ScanKey key, int nkeys, ScanKey orderbys, int norderbys);
 extern void scan_handler_idx_endscan(IndexScanDesc scan);
 extern void scan_handler_idx_markpos(IndexScanDesc scan);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c34fad5c0..bb6d1e83c 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -107,6 +107,7 @@ typedef struct UBTPageOpaqueData {
 } UBTPageOpaqueData;
 
 typedef UBTPageOpaqueData* UBTPageOpaque;
+#define BTPageGetOpaqueInternal(page) ((BTPageOpaqueInternal) PageGetSpecialPointer(page))
 
 typedef struct BTDedupIntervalData {
     OffsetNumber base_off;
@@ -905,6 +906,7 @@ typedef struct BTScanPosData {
     Buffer buf; /* if valid, the buffer is pinned */
 
     BlockNumber nextPage; /* page's right link when we scanned it */
+    BlockNumber currPage; /* page referenced by items array */
 
     TransactionId xid_base;
 
@@ -961,6 +963,8 @@ typedef struct BTScanOpaqueData {
     ScanKey arrayKeyData; /* modified copy of scan->keyData */
     int numArrayKeys;     /* number of equality-type array keys (-1 if
                            * there are any unsatisfiable array keys) */
+    int arrayKeyCount;    /* count indicating number of array scan keys
+                           * processed */
     BTArrayKeyInfo* arrayKeys; /* info about each equality-type array key */
     MemoryContext arrayContext; /* scan-lifespan context for array data */
 
@@ -1282,6 +1286,8 @@ extern Datum btbuild(PG_FUNCTION_ARGS);
 extern Datum btbuildempty(PG_FUNCTION_ARGS);
 extern Datum btinsert(PG_FUNCTION_ARGS);
 extern Datum btbeginscan(PG_FUNCTION_ARGS);
+extern void* Btbuildparallelscan(void);
+extern void Btinitparallelscan(void *target);
 extern Datum btgettuple(PG_FUNCTION_ARGS);
 extern Datum btgetbitmap(PG_FUNCTION_ARGS);
 extern Datum cbtreegetbitmap(PG_FUNCTION_ARGS);
@@ -1294,6 +1300,16 @@ extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
 extern Datum btcanreturn(PG_FUNCTION_ARGS);
 extern Datum btoptions(PG_FUNCTION_ARGS);
 
+extern void btparallelrescan(IndexScanDesc scan);
+
+/*
+ * prototypes for internal functions in nbtree.c
+ */
+extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
+
 extern inline IndexBuildResult *btbuild_internal(Relation heap, Relation index,
     IndexInfo *index_info);
 extern inline void btbuildempty_internal(Relation index);
@@ -1326,7 +1342,6 @@ extern inline IndexBuildResult *btmerge_internal(Relation dstIdxRel, List *srcId
  * thought, we are not going to implement them right now.
  */
 extern Datum btmerge(PG_FUNCTION_ARGS);
-
 /*
  * prototypes for functions in nbtinsert.c
  */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index d3f40dd24..cc67b1be8 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -170,6 +170,8 @@ typedef struct IndexScanDescData {
     SPQScanDesc spq_scan;
 #endif
     IndexFetchTableData *xs_heapfetch;
+    /* parallel index scan information, in global variables */
+    struct ParallelIndexScanDescData *parallelScan;
     /* put decompressed heap tuple data into xs_ctbuf_hdr be careful! when malloc memory should give extra mem for
      * xs_ctbuf_hdr. t_bits which is varlength arr */
@@ -177,6 +179,13 @@ typedef struct IndexScanDescData {
     /* DO NOT add any other members here. xs_ctbuf_hdr must be the last one. */
 } IndexScanDescData;
 
+/* Generic structure for parallel scans */
+typedef struct ParallelIndexScanDescData {
+    Oid ps_relid;
+    Oid ps_indexid;
+    void* psBtpscan;
+} ParallelIndexScanDescData;
+
 #define SizeofIndexScanDescData (offsetof(IndexScanDescData, xs_ctbuf_hdr) + SizeofHeapTupleHeader)
 
 /* Get partition heap oid for bitmap index scan */
diff --git a/src/include/c.h b/src/include/c.h
index 468768a34..5b6a61990 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -722,6 +722,8 @@ typedef struct pathData {
  * True iff pointer is properly aligned to point to the given type.
  */
 #define PointerIsAligned(pointer, type) (((intptr_t)(pointer) % (sizeof(type))) == 0)
+#define OffsetToPointer(base, offset) \
+    ((void *)((char *)(base) + (offset)))
 
 #define OidIsValid(objectId) ((bool)((objectId) != InvalidOid))
 
diff --git a/src/include/distributelayer/streamCore.h b/src/include/distributelayer/streamCore.h
index a117a3032..d7f191780 100755
--- a/src/include/distributelayer/streamCore.h
+++ b/src/include/distributelayer/streamCore.h
@@ -58,6 +58,8 @@
 #define TupleVectorMaxSize 100
 
+#define STREAM_DESC_HASH_NUMBER 256
+
 #define IS_STREAM_PORTAL (!StreamThreadAmI() && portal->streamInfo.streamGroup != NULL)
 
 struct StreamState;
@@ -103,6 +105,11 @@ typedef struct {
     uint64 key;
 } StreamConnectSyncElement;
 
+typedef struct {
+    StreamKey key;
+    ParallelIndexScanDescData* parallelDesc;
+} StreamDescElement;
+
 enum StreamObjType {
     STREAM_PRODUCER,
     STREAM_CONSUMER,
@@ -442,6 +449,12 @@ public:
     /* Send stop signal to all stream threads in node group. */
     void SigStreamThreadClose();
 
+    void BuildStreamDesc(const uint64& queryId, Plan* node);
+
+    void* GetParallelDesc(const uint64& queryId, const uint64& planNodeId);
+
+    void DestroyStreamDesc(const uint64& queryId, Plan* node);
+
     struct PortalData *m_portal;
 #endif
 
     /* Mark recursive vfd is invalid before aborting transaction. */
@@ -519,6 +532,7 @@ private:
     /* Mark Stream query quit status. */
     StreamObjStatus m_quitStatus;
 #endif
+    static HTAB* m_streamDescHashTbl;
 };
 
 extern bool IsThreadProcessStreamRecursive();
diff --git a/src/include/executor/node/nodeIndexscan.h b/src/include/executor/node/nodeIndexscan.h
index 0fdbf67a5..c3441dafc 100644
--- a/src/include/executor/node/nodeIndexscan.h
+++ b/src/include/executor/node/nodeIndexscan.h
@@ -14,6 +14,7 @@
 #ifndef NODEINDEXSCAN_H
 #define NODEINDEXSCAN_H
 
+#include "executor/exec/execStream.h"
 #include "nodes/execnodes.h"
 
 extern IndexScanState* ExecInitIndexScan(IndexScan* node, EState* estate, int eflags);
diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h
index 068b3e682..df461e6e8 100755
--- a/src/include/knl/knl_thread.h
+++ b/src/include/knl/knl_thread.h
@@ -3643,6 +3643,11 @@ inline bool StreamTopConsumerAmI()
     return (t_thrd.subrole == TOP_CONSUMER);
 }
 
+inline bool WorkerThreadAmI()
+{
+    return (t_thrd.role == WORKER || t_thrd.role == THREADPOOL_WORKER);
+}
+
 inline bool WLMThreadAmI()
 {
     return (t_thrd.role == WLM_WORKER || t_thrd.role == WLM_MONITOR ||
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index c9e5af177..91dee74f2 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -61,7 +61,7 @@ extern Path *create_tsstorescan_path(PlannerInfo * root,RelOptInfo * rel, int do
 #endif /* ENABLE_MULTIPLE_NODES */
 extern IndexPath* create_index_path(PlannerInfo* root, IndexOptInfo* index, List* indexclauses, List* indexclausecols,
     List* indexorderbys, List* indexorderbycols, List* pathkeys, ScanDirection indexscandir, bool indexonly,
-    Relids required_outer, Bitmapset *upper_params, double loop_count);
+    Relids required_outer, Bitmapset *upper_params, double loop_count, int dop = 1);
 extern Path* build_seqScanPath_by_indexScanPath(PlannerInfo* root, Path* index_path);
 extern bool CheckBitmapQualIsGlobalIndex(Path* bitmapqual);
 extern bool CheckBitmapHeapPathContainGlobalOrLocal(Path* bitmapqual);
diff --git a/src/test/regress/expected/plan_hint_iud.out b/src/test/regress/expected/plan_hint_iud.out
index 25e775a6f..09443698d 100755
--- a/src/test/regress/expected/plan_hint_iud.out
+++ b/src/test/regress/expected/plan_hint_iud.out
@@ -579,15 +579,18 @@ deallocate all;
 --- Set
 :EXP merge /*+ set(query_dop 1008) */ into t1 using t2 on t1.c1 = t2.c1 when matched then update set t1.c2 = t2.c2 when not matched then insert values (t2.c1, t2.c2);
-                     QUERY PLAN                      
-----------------------------------------------------
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
  Merge on t1
-   ->  Nested Loop Left Join
-         ->  Streaming(type: LOCAL GATHER dop: 1/8)
-               ->  Seq Scan on t2
-         ->  Index Scan using t1_pkey on t1
-               Index Cond: (c1 = t2.c1)
-(6 rows)
+   ->  Streaming(type: LOCAL GATHER dop: 1/8)
+         ->  Nested Loop Left Join
+               Join Filter: (t1.c1 = t2.c1)
+               ->  Streaming(type: LOCAL REDISTRIBUTE dop: 8/8)
+                     ->  Seq Scan on t2
+               ->  Materialize
+                     ->  Streaming(type: LOCAL REDISTRIBUTE dop: 8/8)
+                           ->  Seq Scan on t1
+(9 rows)
 
 --- Plancache
 prepare merge_g as merge /*+ use_gplan */ into t1 using t2 on t1.c1 = t2.c1 and t1.c1 = $1 when matched then update set t1.c2 = t2.c2 when not matched then insert values (t2.c1, t2.c2);
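
Reviewer note, not part of the patch: the heart of this change is the page-claim protocol in _bt_parallel_seize()/_bt_parallel_release()/_bt_parallel_done(), where workers take turns advancing a shared cursor through the leaf-page chain under ParallelIndexScanLock, using the BTPARALLEL_NOT_INITIALIZED/ADVANCING/IDLE/DONE state machine. The self-contained C sketch below models that protocol only; the pthread mutex stands in for the LWLock, the rightlink array stands in for the btree leaf chain, and every name and constant value here is invented for illustration (the real P_NONE, for instance, has a different value).

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define P_NONE        0xFFFFFFFEu  /* invented stand-in for the end-of-chain marker */
    #define INVALID_BLOCK 0xFFFFFFFFu  /* invented stand-in for InvalidBlockNumber */

    typedef enum { NOT_INITIALIZED, ADVANCING, IDLE, DONE } State;

    static struct {
        pthread_mutex_t lock;   /* stands in for ParallelIndexScanLock */
        unsigned scan_page;     /* stands in for btps_scanPage */
        State status;           /* stands in for btpsPageStatus */
    } shared = {PTHREAD_MUTEX_INITIALIZER, INVALID_BLOCK, NOT_INITIALIZED};

    /* fake leaf chain: page i's right-link; the last page ends the scan */
    static unsigned rightlink[5] = {1, 2, 3, 4, P_NONE};

    /* seize: spin until we may advance the scan; false once the scan is done */
    static bool seize(unsigned *pageno)
    {
        for (;;) {
            pthread_mutex_lock(&shared.lock);
            if (shared.status == DONE) {
                pthread_mutex_unlock(&shared.lock);
                return false;
            }
            if (shared.status != ADVANCING) {
                shared.status = ADVANCING;  /* we now own the advance */
                *pageno = shared.scan_page;
                pthread_mutex_unlock(&shared.lock);
                return true;
            }
            /* someone else is advancing: busy-wait, as the patch does */
            pthread_mutex_unlock(&shared.lock);
        }
    }

    /* release: publish the next page so another worker can claim it */
    static void release(unsigned next_page)
    {
        pthread_mutex_lock(&shared.lock);
        shared.scan_page = next_page;
        shared.status = IDLE;
        pthread_mutex_unlock(&shared.lock);
    }

    /* done: no pages left; let everyone fall out of seize() */
    static void mark_done(void)
    {
        pthread_mutex_lock(&shared.lock);
        shared.status = DONE;
        pthread_mutex_unlock(&shared.lock);
    }

    static void *worker(void *arg)
    {
        unsigned page;
        while (seize(&page)) {
            if (page == INVALID_BLOCK)
                page = 0;          /* scan not started: take the first leaf */
            if (page == P_NONE) {  /* a previous worker hit the end */
                mark_done();
                break;
            }
            release(rightlink[page]);  /* hand the right-link to the others */
            printf("worker %ld scans page %u\n", (long)(size_t)arg, page);
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t t[3];
        for (long i = 0; i < 3; i++)
            pthread_create(&t[i], NULL, worker, (void *)(size_t)i);
        for (int i = 0; i < 3; i++)
            pthread_join(t[i], NULL);
        return 0;
    }

Each of the five fake pages is claimed exactly once regardless of how the three workers interleave, which is the invariant the patch relies on. Note that, like _bt_parallel_seize() in the patch, seize() busy-waits while another worker holds the ADVANCING state; upstream PostgreSQL sleeps on a condition variable here instead.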
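A second note on the cost_index() change: the patched arithmetic amortizes both the interpolated I/O cost and the per-tuple CPU cost over the degree of parallelism, and charges a flat fee of smp_thread_cost per extra worker. The toy recomputation below (every input number is invented, and it assumes SET_DOP() clamps the degree to at least 1, so the patch's dop == 0 branch never fires) shows how the startup fee trades off against the amortization:

    #include <stdio.h>

    /* Toy version of the patched cost_index() run-cost arithmetic. */
    static double run_cost(double max_io, double min_io, double csq,
                           double cpu_per_tuple, double tuples,
                           double thread_cost, int dop)
    {
        double cost = 0.0;
        cost += (max_io + csq * (min_io - max_io)) / dop; /* I/O amortized over dop */
        cost += thread_cost * (dop - 1);                  /* fee per extra worker */
        cost += cpu_per_tuple * tuples / dop;             /* CPU amortized over dop */
        return cost;
    }

    int main(void)
    {
        for (int dop = 1; dop <= 8; dop *= 2)
            printf("dop=%d run_cost=%.1f\n", dop,
                   run_cost(4000.0, 400.0, 0.25, 0.01, 100000.0, 100.0, dop));
        return 0;
    }

With these invented numbers the run cost keeps falling as dop grows, so the planner would prefer the parallel index path; with a small enough table the thread_cost term dominates and dop = 1 wins, which is why build_index_paths() adds the parallel path alongside the serial one rather than replacing it.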