ustore support seq scan smp

This commit is contained in:
zhaosen
2024-07-15 18:13:29 +08:00
parent 38de12c76b
commit 4abafc6b22
4 changed files with 75 additions and 29 deletions

View File

@ -1136,7 +1136,7 @@ static void set_plain_rel_pathlist(PlannerInfo* root, RelOptInfo* rel, RangeTblE
* support normal row table unless it is partitioned.
* The partition table can be parallelized when partItrs > u_sess->opt_cxt.query_dop.
*/
bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) && (!rel->is_ustore) &&
bool can_parallel = IS_STREAM_PLAN && (u_sess->opt_cxt.query_dop > 1) &&
(rel->locator_type != LOCATOR_TYPE_REPLICATED) && (rte->tablesample == NULL);
if (!isrp) {
#endif

View File

@ -925,6 +925,10 @@ void UHeapamScanMarkpos(TableScanDesc sscan)
return UHeapMarkPos(sscan);
}
void UHeapamScanInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
{
return UeapInitParallelSeqscan(sscan, dop, dir);
}
void UHeapamScanEndscan(TableScanDesc sscan)
{
@ -1216,7 +1220,7 @@ static const TableAmRoutine g_ustoream_methods = {
scan_restrpos : UHeapamScanRestrpos,
scan_markpos : UHeapamScanMarkpos,
scan_init_parallel_seqscan : HeapamScanInitParallelSeqscan,
scan_init_parallel_seqscan : UHeapamScanInitParallelSeqscan,
scan_getnexttuple : UHeapamScanGetnexttuple,
scan_GetNextBatch : UHeapamGetNextBatchMode,
scan_getpage : UHeapamScanGetpage,

View File

@ -89,36 +89,46 @@ FORCE_INLINE
bool NextUpage(UHeapScanDesc scan, ScanDirection dir, BlockNumber& page)
{
bool finished = false;
/*
* advance to next/prior page and detect end of scan
*/
if (BackwardScanDirection == dir) {
finished = (page == scan->rs_base.rs_startblock);
if (page == 0) {
page = scan->rs_base.rs_nblocks;
}
page--;
} else {
if (scan->dop > 1) {
Assert(scan->rs_parallel == NULL);
Assert(dir == ForwardScanDirection);
page++;
if (page >= scan->rs_base.rs_nblocks) {
page = 0;
if ((page - scan->rs_base.rs_startblock) % PARALLEL_SCAN_GAP == 0) {
page += (scan->dop - 1) * PARALLEL_SCAN_GAP;
}
finished = (page == scan->rs_base.rs_startblock);
finished = (page >= scan->rs_base.rs_nblocks);
} else {
/*
* Report our new scan position for synchronization purposes. We
* don't do that when moving backwards, however. That would just
* mess up any other forward-moving scanners.
*
* Note: we do this before checking for end of scan so that the
* final state of the position hint is back at the start of the
* rel. That's not strictly necessary, but otherwise when you run
* the same query multiple times the starting position would shift
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
if (scan->rs_allow_sync) {
ss_report_location(scan->rs_base.rs_rd, page);
* advance to next/prior page and detect end of scan
*/
if (BackwardScanDirection == dir) {
finished = (page == scan->rs_base.rs_startblock);
if (page == 0) {
page = scan->rs_base.rs_nblocks;
}
page--;
} else {
page++;
if (page >= scan->rs_base.rs_nblocks) {
page = 0;
}
finished = (page == scan->rs_base.rs_startblock);
/*
* Report our new scan position for synchronization purposes. We
* don't do that when moving backwards, however. That would just
* mess up any other forward-moving scanners.
*
* Note: we do this before checking for end of scan so that the
* final state of the position hint is back at the start of the
* rel. That's not strictly necessary, but otherwise when you run
* the same query multiple times the starting position would shift
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
if (scan->rs_allow_sync) {
ss_report_location(scan->rs_base.rs_rd, page);
}
}
}
@ -684,6 +694,7 @@ TableScanDesc UHeapBeginScan(Relation relation, Snapshot snapshot, int nkeys, Pa
uscan->rs_base.rs_ntuples = 0;
uscan->rs_cutup = NULL;
uscan->rs_parallel = parallel_scan;
uscan->dop = 1;
if (uscan->rs_parallel != NULL) {
/* For parallel scan, believe whatever ParallelHeapScanDesc says. */
uscan->rs_base.rs_syncscan = uscan->rs_parallel->phs_syncscan;
@ -780,6 +791,7 @@ static void UHeapinitscan(TableScanDesc sscan, ScanKey key, bool isRescan)
scan->rs_base.rs_inited = false;
scan->rs_base.rs_cbuf = InvalidBuffer;
scan->rs_base.rs_cblock = InvalidBlockNumber;
scan->dop = 1;
if (scan->rs_base.rs_rd->rd_tam_ops == TableAmUstore) {
scan->rs_base.lastVar = -1;
@ -1194,6 +1206,34 @@ void UHeapRestRpos(TableScanDesc sscan)
}
}
void UeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir)
{
HeapScanDesc scan = (HeapScanDesc) sscan;
if (!scan || scan->rs_base.rs_nblocks == 0) {
return;
}
if (dop <= 1) {
return;
}
scan->dop = dop;
uint32 paral_blocks = u_sess->stream_cxt.smp_id * PARALLEL_SCAN_GAP;
/* If not enough pages to divide into every worker. */
if (scan->rs_base.rs_nblocks <= paral_blocks) {
scan->rs_base.rs_startblock = 0;
scan->rs_base.rs_nblocks = 0;
return;
}
if(dir == BackwardScanDirection){
ereport(ERROR, (errmsg("Backward Scan Direction is not support for ustore parallel seq scan.")));
}
scan->rs_base.rs_startblock = paral_blocks;
}
UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write)
{
UHeapScanDesc scan = (UHeapScanDesc)sscan;

View File

@ -38,6 +38,7 @@ typedef struct UHeapScanDescData {
/* these fields only used in page-at-a-time mode and for bitmap scans */
int rs_mindex; /* marked tuple's saved index */
int dop; /* scan parallel degree */
UHeapTuple rs_visutuples[MaxPossibleUHeapTuplesPerPage]; /* visible tuples */
UHeapTuple rs_cutup; /* current tuple in scan, if any */
@ -71,6 +72,7 @@ bool UHeapScanBitmapNextTuple(TableScanDesc sscan, TBMIterateResult *tbmres, Tup
bool UHeapScanBitmapNextBlock(TableScanDesc sscan, const TBMIterateResult *tbmres,
bool* has_cur_xact_write = NULL);
bool UHeapGetPage(TableScanDesc sscan, BlockNumber page, bool* has_cur_xact_write = NULL);
void UeapInitParallelSeqscan(TableScanDesc sscan, int32 dop, ScanDirection dir);
UHeapTuple UHeapGetNext(TableScanDesc sscan, ScanDirection dir, bool* has_cur_xact_write = NULL);
extern bool UHeapGetTupPageBatchmode(UHeapScanDesc scan, ScanDirection dir);