diff --git a/src/gausskernel/optimizer/path/allpaths.cpp b/src/gausskernel/optimizer/path/allpaths.cpp
index 40050c018..41f8e196a 100755
--- a/src/gausskernel/optimizer/path/allpaths.cpp
+++ b/src/gausskernel/optimizer/path/allpaths.cpp
@@ -1136,7 +1136,7 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE
      * support normal row table unless it is partitioned.
      * The partition table can be parallelized when partItrs > u_sess->opt_cxt.query_dop.
      */
-    bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) && (!rel->is_ustore) &&
+    bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) &&
         (rel->locator_type != LOCATOR_TYPE_REPLICATED) && (rte->tablesample == NULL);
     if (!isrp) {
 #endif
diff --git a/src/gausskernel/storage/access/table/tableam.cpp b/src/gausskernel/storage/access/table/tableam.cpp
index 200ba28a1..f852b2d82 100644
--- a/src/gausskernel/storage/access/table/tableam.cpp
+++ b/src/gausskernel/storage/access/table/tableam.cpp
@@ -925,6 +925,11 @@ void UHeapamScanMarkpos(TableScanDesc sscan)
     return UHeapMarkPos(sscan);
 }
 
+/* Table AM wrapper: route parallel seqscan initialization to the ustore implementation. */
+void UHeapamScanInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
+{
+    return UHeapInitParallelSeqscan(sscan, dop, dir);
+}
 
 void UHeapamScanEndscan(TableScanDesc sscan)
 {
@@ -1216,7 +1221,7 @@ static const TableAmRoutine g_ustoream_methods = {
 
     scan_restrpos : UHeapamScanRestrpos,
     scan_markpos : UHeapamScanMarkpos,
-    scan_init_parallel_seqscan : HeapamScanInitParallelSeqscan,
+    scan_init_parallel_seqscan : UHeapamScanInitParallelSeqscan,
     scan_getnexttuple : UHeapamScanGetnexttuple,
     scan_GetNextBatch : UHeapamGetNextBatchMode,
     scan_getpage : UHeapamScanGetpage,
diff --git a/src/gausskernel/storage/access/ustore/knl_uscan.cpp b/src/gausskernel/storage/access/ustore/knl_uscan.cpp
index e819e1db2..92f6bacad 100644
--- a/src/gausskernel/storage/access/ustore/knl_uscan.cpp
+++ b/src/gausskernel/storage/access/ustore/knl_uscan.cpp
@@ -89,36 +89,50 @@ FORCE_INLINE bool
 NextUpage(UHeapScanDesc scan, ScanDirection dir, BlockNumber& page)
 {
     bool finished = false;
-    /*
-     * advance to next/prior page and detect end of scan
-     */
-    if (BackwardScanDirection == dir) {
-        finished = (page == scan->rs_base.rs_startblock);
-        if (page == 0) {
-            page = scan->rs_base.rs_nblocks;
-        }
-        page--;
-    } else {
-        page++;
-        if (page >= scan->rs_base.rs_nblocks) {
-            page = 0;
-        }
-        finished = (page == scan->rs_base.rs_startblock);
-
-        /*
-         * Report our new scan position for synchronization purposes. We
-         * don't do that when moving backwards, however. That would just
-         * mess up any other forward-moving scanners.
-         *
-         * Note: we do this before checking for end of scan so that the
-         * final state of the position hint is back at the start of the
-         * rel. That's not strictly necessary, but otherwise when you run
-         * the same query multiple times the starting position would shift
-         * a little bit backwards on every invocation, which is confusing.
-         * We don't guarantee any specific ordering in general, though.
-         */
-        if (scan->rs_allow_sync) {
-            ss_report_location(scan->rs_base.rs_rd, page);
-        }
-    }
+    if (scan->dop > 1) {
+        /*
+         * SMP parallel scan: each worker consumes PARALLEL_SCAN_GAP pages at a
+         * time, then skips over the chunks owned by the other dop - 1 workers.
+         */
+        Assert(scan->rs_parallel == NULL);
+        Assert(dir == ForwardScanDirection);
+        page++;
+        if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
+            page += (scan->dop - 1) * PARALLEL_SCAN_GAP;
+        }
+        finished = (page >= scan->rs_base.rs_nblocks);
+    } else {
+        /*
+         * advance to next/prior page and detect end of scan
+         */
+        if (BackwardScanDirection == dir) {
+            finished = (page == scan->rs_base.rs_startblock);
+            if (page == 0) {
+                page = scan->rs_base.rs_nblocks;
+            }
+            page--;
+        } else {
+            page++;
+            if (page >= scan->rs_base.rs_nblocks) {
+                page = 0;
+            }
+            finished = (page == scan->rs_base.rs_startblock);
+
+            /*
+             * Report our new scan position for synchronization purposes. We
+             * don't do that when moving backwards, however. That would just
+             * mess up any other forward-moving scanners.
+             *
+             * Note: we do this before checking for end of scan so that the
+             * final state of the position hint is back at the start of the
+             * rel. That's not strictly necessary, but otherwise when you run
+             * the same query multiple times the starting position would shift
+             * a little bit backwards on every invocation, which is confusing.
+             * We don't guarantee any specific ordering in general, though.
+             */
+            if (scan->rs_allow_sync) {
+                ss_report_location(scan->rs_base.rs_rd, page);
+            }
+        }
+    }
@@ -684,6 +698,7 @@ TableScanDesc UHeapBeginScan(Relation relation, Snapshot snapshot, int nkeys, Pa
     uscan->rs_base.rs_ntuples = 0;
     uscan->rs_cutup = NULL;
     uscan->rs_parallel = parallel_scan;
+    uscan->dop = 1;
     if (uscan->rs_parallel != NULL) {
         /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
         uscan->rs_base.rs_syncscan = uscan->rs_parallel->phs_syncscan;
@@ -780,6 +795,7 @@ static void UHeapinitscan(TableScanDesc sscan, ScanKey key, bool isRescan)
     scan->rs_base.rs_inited = false;
     scan->rs_base.rs_cbuf = InvalidBuffer;
     scan->rs_base.rs_cblock = InvalidBlockNumber;
+    scan->dop = 1;
 
     if (scan->rs_base.rs_rd->rd_tam_ops == TableAmUstore) {
         scan->rs_base.lastVar = -1;
@@ -1194,6 +1210,45 @@ void UHeapRestRpos(TableScanDesc sscan)
     }
 }
 
+/*
+ * UHeapInitParallelSeqscan
+ *     Assign this SMP worker its share of an ustore parallel sequential scan.
+ *
+ * Each of the dop workers starts PARALLEL_SCAN_GAP pages apart (based on its
+ * smp_id); NextUpage() then advances within the worker's own chunks.  Only
+ * forward scans are supported.
+ */
+void UHeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
+{
+    UHeapScanDesc scan = (UHeapScanDesc)sscan;
+
+    if (scan == NULL || scan->rs_base.rs_nblocks == 0) {
+        return;
+    }
+
+    if (dop <= 1) {
+        return;
+    }
+
+    /* Reject unsupported directions before this scan descriptor is modified. */
+    if (dir == BackwardScanDirection) {
+        ereport(ERROR, (errmsg("Backward Scan Direction is not supported for ustore parallel seq scan.")));
+    }
+
+    scan->dop = dop;
+
+    uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP;
+
+    /* Not enough pages to give every worker a chunk: this worker scans nothing. */
+    if (scan->rs_base.rs_nblocks <= paral_blocks) {
+        scan->rs_base.rs_startblock = 0;
+        scan->rs_base.rs_nblocks = 0;
+        return;
+    }
+
+    scan->rs_base.rs_startblock = paral_blocks;
+}
+
 UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write)
 {
     UHeapScanDesc scan = (UHeapScanDesc)sscan;
diff --git a/src/include/access/ustore/knl_uscan.h b/src/include/access/ustore/knl_uscan.h
index f5edfe7c0..5423340f0 100644
--- a/src/include/access/ustore/knl_uscan.h
+++ b/src/include/access/ustore/knl_uscan.h
@@ -38,6 +38,7 @@ typedef struct UHeapScanDescData {
 
     /* these fields only used in page-at-a-time mode and for bitmap scans */
     int rs_mindex;  /* marked tuple's saved index */
+    int dop;        /* degree of parallelism for an SMP parallel seqscan (1 = serial) */
     UHeapTuple rs_visutuples[MaxPossibleUHeapTuplesPerPage]; /* visible tuples */
     UHeapTuple rs_cutup;  /* current tuple in scan, if any */
 
@@ -73,4 +74,5 @@
 bool UHeapGetPage(TableScanDesc sscan, BlockNumber page, bool* has_cur_xact_write = NULL);
+void UHeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir);
 UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write = NULL);
 
 extern bool UHeapGetTupPageBatchmode(UHeapScanDesc scan, ScanDirection dir);