reuse function in astore
This commit is contained in:
@ -925,10 +925,6 @@ void UHeapamScanMarkpos(TableScanDesc sscan)
|
||||
return UHeapMarkPos(sscan);
|
||||
}
|
||||
|
||||
void UHeapamScanInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
|
||||
{
|
||||
return UHeapInitParallelSeqscan(sscan, dop, dir);
|
||||
}
|
||||
|
||||
void UHeapamScanEndscan(TableScanDesc sscan)
|
||||
{
|
||||
@ -1220,7 +1216,7 @@ static const TableAmRoutine g_ustoream_methods = {
|
||||
scan_restrpos : UHeapamScanRestrpos,
|
||||
scan_markpos : UHeapamScanMarkpos,
|
||||
|
||||
scan_init_parallel_seqscan : UHeapamScanInitParallelSeqscan,
|
||||
scan_init_parallel_seqscan : HeapamScanInitParallelSeqscan,
|
||||
scan_getnexttuple : UHeapamScanGetnexttuple,
|
||||
scan_GetNextBatch : UHeapamGetNextBatchMode,
|
||||
scan_getpage : UHeapamScanGetpage,
|
||||
|
||||
@ -91,12 +91,28 @@ bool NextUpage(UHeapScanDesc scan, ScanDirection dir, BlockNumber& page)
|
||||
bool finished = false;
|
||||
if (scan->dop > 1) {
|
||||
Assert(scan->rs_parallel == NULL);
|
||||
Assert(dir == ForwardScanDirection);
|
||||
page++;
|
||||
if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
|
||||
page += (scan->dop - 1) * PARALLEL_SCAN_GAP;
|
||||
if (BackwardScanDirection == dir) {
|
||||
finished = (page == 0);
|
||||
if (finished)
|
||||
return finished;
|
||||
page--;
|
||||
if ((scan->rs_base.rs_startblock - page) % PARALLEL_SCAN_GAP == 0) {
|
||||
page -= (scan->dop - 1) * PARALLEL_SCAN_GAP;
|
||||
}
|
||||
} else {
|
||||
page++;
|
||||
if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
|
||||
page += (scan->dop - 1) * PARALLEL_SCAN_GAP;
|
||||
}
|
||||
|
||||
if (scan->rs_base.rs_rangeScanInRedis.isRangeScanInRedis) {
|
||||
/* Parallel workers start from different point. */
|
||||
finished =
|
||||
(page >= scan->rs_base.rs_startblock + scan->rs_base.rs_nblocks - PARALLEL_SCAN_GAP * u_sess->stream_cxt.smp_id);
|
||||
} else {
|
||||
finished = (page >= scan->rs_base.rs_nblocks);
|
||||
}
|
||||
}
|
||||
finished = (page >= scan->rs_base.rs_nblocks);
|
||||
} else {
|
||||
/*
|
||||
* advance to next/prior page and detect end of scan
|
||||
@ -1206,34 +1222,6 @@ void UHeapRestRpos(TableScanDesc sscan)
|
||||
}
|
||||
}
|
||||
|
||||
void UHeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
|
||||
{
|
||||
HeapScanDesc scan = (HeapScanDesc) sscan;
|
||||
|
||||
if (!scan || scan->rs_base.rs_nblocks == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (dop <= 1) {
|
||||
return;
|
||||
}
|
||||
|
||||
scan->dop = dop;
|
||||
|
||||
uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP;
|
||||
|
||||
/* If not enough pages to divide into every worker. */
|
||||
if (scan->rs_base.rs_nblocks <= paral_blocks) {
|
||||
scan->rs_base.rs_startblock = 0;
|
||||
scan->rs_base.rs_nblocks = 0;
|
||||
return;
|
||||
}
|
||||
if(dir == BackwardScanDirection){
|
||||
ereport(ERROR, (errmsg("Backward Scan Direction is not support for ustore parallel seq scan.")));
|
||||
}
|
||||
scan->rs_base.rs_startblock = paral_blocks;
|
||||
}
|
||||
|
||||
UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write)
|
||||
{
|
||||
UHeapScanDesc scan = (UHeapScanDesc)sscan;
|
||||
|
||||
@ -72,7 +72,6 @@ bool UHeapScanBitmapNextTuple(TableScanDesc sscan, TBMIterateResult *tbmres, Tup
|
||||
bool UHeapScanBitmapNextBlock(TableScanDesc sscan, const TBMIterateResult *tbmres,
|
||||
bool* has_cur_xact_write = NULL);
|
||||
bool UHeapGetPage(TableScanDesc sscan, BlockNumber page, bool* has_cur_xact_write = NULL);
|
||||
void UeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir);
|
||||
|
||||
UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write = NULL);
|
||||
extern bool UHeapGetTupPageBatchmode(UHeapScanDesc scan, ScanDirection dir);
|
||||
|
||||
Reference in New Issue
Block a user