From 01d4c96ad4d4a4e3de7728ae5070a935a0c915aa Mon Sep 17 00:00:00 2001 From: zhaosen Date: Fri, 26 Jul 2024 14:45:14 +0800 Subject: [PATCH] reuse function in astore --- .../storage/access/table/tableam.cpp | 6 +-- .../storage/access/ustore/knl_uscan.cpp | 54 ++++++++----------- src/include/access/ustore/knl_uscan.h | 1 - 3 files changed, 22 insertions(+), 39 deletions(-) diff --git a/src/gausskernel/storage/access/table/tableam.cpp b/src/gausskernel/storage/access/table/tableam.cpp index 83ebf3b23..200ba28a1 100644 --- a/src/gausskernel/storage/access/table/tableam.cpp +++ b/src/gausskernel/storage/access/table/tableam.cpp @@ -925,10 +925,6 @@ void UHeapamScanMarkpos(TableScanDesc sscan) return UHeapMarkPos(sscan); } -void UHeapamScanInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir) -{ - return UHeapInitParallelSeqscan(sscan, dop, dir); -} void UHeapamScanEndscan(TableScanDesc sscan) { @@ -1220,7 +1216,7 @@ static const TableAmRoutine g_ustoream_methods = { scan_restrpos : UHeapamScanRestrpos, scan_markpos : UHeapamScanMarkpos, - scan_init_parallel_seqscan : UHeapamScanInitParallelSeqscan, + scan_init_parallel_seqscan : HeapamScanInitParallelSeqscan, scan_getnexttuple : UHeapamScanGetnexttuple, scan_GetNextBatch : UHeapamGetNextBatchMode, scan_getpage : UHeapamScanGetpage, diff --git a/src/gausskernel/storage/access/ustore/knl_uscan.cpp b/src/gausskernel/storage/access/ustore/knl_uscan.cpp index f12c921b8..b2c8af3df 100644 --- a/src/gausskernel/storage/access/ustore/knl_uscan.cpp +++ b/src/gausskernel/storage/access/ustore/knl_uscan.cpp @@ -91,12 +91,28 @@ bool NextUpage(UHeapScanDesc scan, ScanDirection dir, BlockNumber& page) bool finished = false; if (scan->dop > 1) { Assert(scan->rs_parallel == NULL); - Assert(dir == ForwardScanDirection); - page++; - if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) { - page += (scan->dop - 1) * PARALLEL_SCAN_GAP; + if (BackwardScanDirection == dir) { + finished = (page == 0); + if (finished) + return finished; + page--; + if ((scan->rs_base.rs_startblock - page) % PARALLEL_SCAN_GAP == 0) { + page -= (scan->dop - 1) * PARALLEL_SCAN_GAP; + } + } else { + page++; + if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) { + page += (scan->dop - 1) * PARALLEL_SCAN_GAP; + } + + if (scan->rs_base.rs_rangeScanInRedis.isRangeScanInRedis) { + /* Parallel workers start from different point. */ + finished = + (page >= scan->rs_base.rs_startblock + scan->rs_base.rs_nblocks - PARALLEL_SCAN_GAP * u_sess->stream_cxt.smp_id); + } else { + finished = (page >= scan->rs_base.rs_nblocks); + } } - finished = (page >= scan->rs_base.rs_nblocks); } else { /* * advance to next/prior page and detect end of scan @@ -1206,34 +1222,6 @@ void UHeapRestRpos(TableScanDesc sscan) } } -void UHeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir) -{ - HeapScanDesc scan = (HeapScanDesc) sscan; - - if (!scan || scan->rs_base.rs_nblocks == 0) { - return; - } - - if (dop <= 1) { - return; - } - - scan->dop = dop; - - uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP; - - /* If not enough pages to divide into every worker. */ - if (scan->rs_base.rs_nblocks <= paral_blocks) { - scan->rs_base.rs_startblock = 0; - scan->rs_base.rs_nblocks = 0; - return; - } - if(dir == BackwardScanDirection){ - ereport(ERROR, (errmsg("Backward Scan Direction is not support for ustore parallel seq scan."))); - } - scan->rs_base.rs_startblock = paral_blocks; -} - UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write) { UHeapScanDesc scan = (UHeapScanDesc)sscan; diff --git a/src/include/access/ustore/knl_uscan.h b/src/include/access/ustore/knl_uscan.h index 5423340f0..e8a7fae26 100644 --- a/src/include/access/ustore/knl_uscan.h +++ b/src/include/access/ustore/knl_uscan.h @@ -72,7 +72,6 @@ bool UHeapScanBitmapNextTuple(TableScanDesc sscan, TBMIterateResult *tbmres, Tup bool UHeapScanBitmapNextBlock(TableScanDesc sscan, const TBMIterateResult *tbmres, bool* has_cur_xact_write = NULL); bool UHeapGetPage(TableScanDesc sscan, BlockNumber page, bool* has_cur_xact_write = NULL); -void UeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir); UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write = NULL); extern bool UHeapGetTupPageBatchmode(UHeapScanDesc scan, ScanDirection dir);