fix partition bitmap and use btree with gin or gist
Signed-off-by: xiliu <xiliu_h@163.com>
This commit is contained in:
@ -57,6 +57,8 @@ static TupleTableSlot* BitmapHeapTblNext(BitmapHeapScanState* node);
|
||||
static void bitgetpage(HeapScanDesc scan, TBMIterateResult* tbmres);
|
||||
static void ExecInitPartitionForBitmapHeapScan(BitmapHeapScanState* scanstate, EState* estate);
|
||||
static void ExecInitNextPartitionForBitmapHeapScan(BitmapHeapScanState* node);
|
||||
static void BitmapHeapPrefetchNext(
|
||||
BitmapHeapScanState* node, HeapScanDesc scan, const TIDBitmap* tbm, TBMIterator** prefetch_iterator);
|
||||
|
||||
/* This struct is used for partition switch while prefetch pages */
|
||||
typedef struct PrefetchNode {
|
||||
@ -182,18 +184,6 @@ static TupleTableSlot* BitmapHeapTblNext(BitmapHeapScanState* node)
|
||||
break;
|
||||
}
|
||||
|
||||
/* Check whether switch partition-fake-rel, use rd_rel save */
|
||||
if (BitmapNodeNeedSwitchPartRel(node)) {
|
||||
GPISetCurrPartOid(node->gpi_scan, node->tbmres->partitionOid);
|
||||
if (!GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
/* If the current partition is invalid, the next page is directly processed */
|
||||
tbmres = NULL;
|
||||
continue;
|
||||
}
|
||||
scan->rs_rd = node->gpi_scan->fakePartRelation;
|
||||
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
|
||||
}
|
||||
|
||||
#ifdef USE_PREFETCH
|
||||
if (node->prefetch_pages > 0) {
|
||||
/* The main iterator has closed the distance by one page */
|
||||
@ -211,6 +201,21 @@ static TupleTableSlot* BitmapHeapTblNext(BitmapHeapScanState* node)
|
||||
}
|
||||
#endif /* USE_PREFETCH */
|
||||
|
||||
/* Check whether switch partition-fake-rel, use rd_rel save */
|
||||
if (BitmapNodeNeedSwitchPartRel(node)) {
|
||||
GPISetCurrPartOid(node->gpi_scan, node->tbmres->partitionOid);
|
||||
if (!GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
/* If the current partition is invalid, the next page is directly processed */
|
||||
tbmres = NULL;
|
||||
#ifdef USE_PREFETCH
|
||||
BitmapHeapPrefetchNext(node, scan, tbm, &prefetch_iterator);
|
||||
#endif /* USE_PREFETCH */
|
||||
continue;
|
||||
}
|
||||
scan->rs_rd = node->gpi_scan->fakePartRelation;
|
||||
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ignore any claimed entries past what we think is the end of the
|
||||
* relation. (This is probably not necessary given that we got at
|
||||
@ -286,126 +291,7 @@ static TupleTableSlot* BitmapHeapTblNext(BitmapHeapScanState* node)
|
||||
}
|
||||
|
||||
#ifdef USE_PREFETCH
|
||||
|
||||
/*
|
||||
* We issue prefetch requests *after* fetching the current page to try
|
||||
* to avoid having prefetching interfere with the main I/O. Also, this
|
||||
* should happen only when we have determined there is still something
|
||||
* to do on the current page, else we may uselessly prefetch the same
|
||||
* page we are just about to request for real.
|
||||
*/
|
||||
if (prefetch_iterator != NULL) {
|
||||
ADIO_RUN()
|
||||
{
|
||||
BlockNumber* blockList = NULL;
|
||||
BlockNumber* blockListPtr = NULL;
|
||||
PrefetchNode* prefetchNode = NULL;
|
||||
PrefetchNode* prefetchNodePtr = NULL;
|
||||
int prefetchNow = 0;
|
||||
int prefetchWindow = node->prefetch_target - node->prefetch_pages;
|
||||
|
||||
/* We expect to prefetch at most prefetchWindow pages */
|
||||
if (prefetchWindow > 0) {
|
||||
if (tbm_is_global(tbm)) {
|
||||
prefetchNode = (PrefetchNode*)malloc(sizeof(PrefetchNode) * prefetchWindow);
|
||||
prefetchNodePtr = prefetchNode;
|
||||
}
|
||||
blockList = (BlockNumber*)palloc(sizeof(BlockNumber) * prefetchWindow);
|
||||
blockListPtr = blockList;
|
||||
}
|
||||
while (node->prefetch_pages < node->prefetch_target) {
|
||||
TBMIterateResult* tbmpre = tbm_iterate(prefetch_iterator);
|
||||
|
||||
if (tbmpre == NULL) {
|
||||
/* No more pages to prefetch */
|
||||
tbm_end_iterate(prefetch_iterator);
|
||||
node->prefetch_iterator = prefetch_iterator = NULL;
|
||||
break;
|
||||
}
|
||||
node->prefetch_pages++;
|
||||
/* we use PrefetchNode here to store relations between blockno and partition Oid */
|
||||
if (tbm_is_global(tbm)) {
|
||||
prefetchNodePtr->blockNum = tbmpre->blockno;
|
||||
prefetchNodePtr->partOid = tbmpre->partitionOid;
|
||||
prefetchNodePtr++;
|
||||
}
|
||||
/* For Async Direct I/O we accumulate a list and send it */
|
||||
*blockListPtr++ = tbmpre->blockno;
|
||||
prefetchNow++;
|
||||
}
|
||||
|
||||
/* Send the list we generated and free it */
|
||||
if (prefetchNow) {
|
||||
if (tbm_is_global(tbm)) {
|
||||
/*
|
||||
* we must save part Oid before switch relation, and recover it after prefetch.
|
||||
* The reason for this is to assure correctness while getting a new tbmres.
|
||||
*/
|
||||
Oid oldOid = GPIGetCurrPartOid(node->gpi_scan);
|
||||
int blkCount = 0;
|
||||
Oid prevOid = prefetchNode[0].partOid;
|
||||
for (int i = 0; i < prefetchNow; i++) {
|
||||
if (prefetchNode[i].partOid == prevOid) {
|
||||
blockList[blkCount++] = prefetchNode[i].blockNum;
|
||||
} else {
|
||||
GPISetCurrPartOid(node->gpi_scan, prevOid);
|
||||
if (GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
PageListPrefetch(
|
||||
node->gpi_scan->fakePartRelation, MAIN_FORKNUM, blockList, blkCount, 0, 0);
|
||||
}
|
||||
blkCount = 0;
|
||||
prevOid = prefetchNode[i].partOid;
|
||||
blockList[blkCount++] = prefetchNode[i].blockNum;
|
||||
}
|
||||
}
|
||||
GPISetCurrPartOid(node->gpi_scan, prevOid);
|
||||
if (GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
PageListPrefetch(node->gpi_scan->fakePartRelation, MAIN_FORKNUM, blockList, blkCount, 0, 0);
|
||||
}
|
||||
/* recover old oid after prefetch switch */
|
||||
GPISetCurrPartOid(node->gpi_scan, oldOid);
|
||||
} else {
|
||||
PageListPrefetch(scan->rs_rd, MAIN_FORKNUM, blockList, prefetchNow, 0, 0);
|
||||
}
|
||||
}
|
||||
if (prefetchWindow > 0) {
|
||||
pfree_ext(blockList);
|
||||
if (tbm_is_global(tbm)) {
|
||||
pfree_ext(prefetchNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
ADIO_ELSE()
|
||||
{
|
||||
Oid oldOid = GPIGetCurrPartOid(node->gpi_scan);
|
||||
while (node->prefetch_pages < node->prefetch_target) {
|
||||
TBMIterateResult* tbmpre = tbm_iterate(prefetch_iterator);
|
||||
Relation prefetchRel = scan->rs_rd;
|
||||
if (tbmpre == NULL) {
|
||||
/* No more pages to prefetch */
|
||||
tbm_end_iterate(prefetch_iterator);
|
||||
node->prefetch_iterator = prefetch_iterator = NULL;
|
||||
break;
|
||||
}
|
||||
node->prefetch_pages++;
|
||||
if (tbm_is_global(node->tbm) && GPIScanCheckPartOid(node->gpi_scan, tbmpre->partitionOid)) {
|
||||
GPISetCurrPartOid(node->gpi_scan, tbmpre->partitionOid);
|
||||
if (!GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
/* If the current partition is invalid, the next page is directly processed */
|
||||
tbmpre = NULL;
|
||||
continue;
|
||||
} else {
|
||||
prefetchRel = node->gpi_scan->fakePartRelation;
|
||||
}
|
||||
}
|
||||
/* For posix_fadvise() we just send the one request */
|
||||
PrefetchBuffer(prefetchRel, MAIN_FORKNUM, tbmpre->blockno);
|
||||
}
|
||||
/* recover old oid after prefetch switch */
|
||||
GPISetCurrPartOid(node->gpi_scan, oldOid);
|
||||
}
|
||||
ADIO_END();
|
||||
}
|
||||
BitmapHeapPrefetchNext(node, scan, tbm, &prefetch_iterator);
|
||||
#endif /* USE_PREFETCH */
|
||||
|
||||
/*
|
||||
@ -908,3 +794,127 @@ static void ExecInitPartitionForBitmapHeapScan(BitmapHeapScanState* scanstate, E
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We issue prefetch requests *after* fetching the current page to try
|
||||
* to avoid having prefetching interfere with the main I/O. Also, this
|
||||
* should happen only when we have determined there is still something
|
||||
* to do on the current page, else we may uselessly prefetch the same
|
||||
* page we are just about to request for real.
|
||||
*/
|
||||
void BitmapHeapPrefetchNext(
|
||||
BitmapHeapScanState* node, HeapScanDesc scan, const TIDBitmap* tbm, TBMIterator** prefetch_iterator)
|
||||
{
|
||||
if (*prefetch_iterator == NULL) {
|
||||
return;
|
||||
}
|
||||
ADIO_RUN()
|
||||
{
|
||||
BlockNumber* blockList = NULL;
|
||||
BlockNumber* blockListPtr = NULL;
|
||||
PrefetchNode* prefetchNode = NULL;
|
||||
PrefetchNode* prefetchNodePtr = NULL;
|
||||
int prefetchNow = 0;
|
||||
int prefetchWindow = node->prefetch_target - node->prefetch_pages;
|
||||
|
||||
/* We expect to prefetch at most prefetchWindow pages */
|
||||
if (prefetchWindow > 0) {
|
||||
if (tbm_is_global(tbm)) {
|
||||
prefetchNode = (PrefetchNode*)malloc(sizeof(PrefetchNode) * prefetchWindow);
|
||||
prefetchNodePtr = prefetchNode;
|
||||
}
|
||||
blockList = (BlockNumber*)palloc(sizeof(BlockNumber) * prefetchWindow);
|
||||
blockListPtr = blockList;
|
||||
}
|
||||
while (node->prefetch_pages < node->prefetch_target) {
|
||||
TBMIterateResult* tbmpre = tbm_iterate(*prefetch_iterator);
|
||||
|
||||
if (tbmpre == NULL) {
|
||||
/* No more pages to prefetch */
|
||||
tbm_end_iterate(*prefetch_iterator);
|
||||
node->prefetch_iterator = *prefetch_iterator = NULL;
|
||||
break;
|
||||
}
|
||||
node->prefetch_pages++;
|
||||
/* we use PrefetchNode here to store relations between blockno and partition Oid */
|
||||
if (tbm_is_global(tbm)) {
|
||||
prefetchNodePtr->blockNum = tbmpre->blockno;
|
||||
prefetchNodePtr->partOid = tbmpre->partitionOid;
|
||||
prefetchNodePtr++;
|
||||
}
|
||||
/* For Async Direct I/O we accumulate a list and send it */
|
||||
*blockListPtr++ = tbmpre->blockno;
|
||||
prefetchNow++;
|
||||
}
|
||||
|
||||
/* Send the list we generated and free it */
|
||||
if (prefetchNow) {
|
||||
if (tbm_is_global(tbm)) {
|
||||
/*
|
||||
* we must save part Oid before switch relation, and recover it after prefetch.
|
||||
* The reason for this is to assure correctness while getting a new tbmres.
|
||||
*/
|
||||
Oid oldOid = GPIGetCurrPartOid(node->gpi_scan);
|
||||
int blkCount = 0;
|
||||
Oid prevOid = prefetchNode[0].partOid;
|
||||
for (int i = 0; i < prefetchNow; i++) {
|
||||
if (prefetchNode[i].partOid == prevOid) {
|
||||
blockList[blkCount++] = prefetchNode[i].blockNum;
|
||||
} else {
|
||||
GPISetCurrPartOid(node->gpi_scan, prevOid);
|
||||
if (GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
PageListPrefetch(node->gpi_scan->fakePartRelation, MAIN_FORKNUM, blockList, blkCount, 0, 0);
|
||||
}
|
||||
blkCount = 0;
|
||||
prevOid = prefetchNode[i].partOid;
|
||||
blockList[blkCount++] = prefetchNode[i].blockNum;
|
||||
}
|
||||
}
|
||||
GPISetCurrPartOid(node->gpi_scan, prevOid);
|
||||
if (GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
PageListPrefetch(node->gpi_scan->fakePartRelation, MAIN_FORKNUM, blockList, blkCount, 0, 0);
|
||||
}
|
||||
/* recover old oid after prefetch switch */
|
||||
GPISetCurrPartOid(node->gpi_scan, oldOid);
|
||||
} else {
|
||||
PageListPrefetch(scan->rs_rd, MAIN_FORKNUM, blockList, prefetchNow, 0, 0);
|
||||
}
|
||||
}
|
||||
if (prefetchWindow > 0) {
|
||||
pfree_ext(blockList);
|
||||
if (tbm_is_global(tbm)) {
|
||||
pfree_ext(prefetchNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
ADIO_ELSE()
|
||||
{
|
||||
Oid oldOid = GPIGetCurrPartOid(node->gpi_scan);
|
||||
while (node->prefetch_pages < node->prefetch_target) {
|
||||
TBMIterateResult* tbmpre = tbm_iterate(*prefetch_iterator);
|
||||
Relation prefetchRel = scan->rs_rd;
|
||||
if (tbmpre == NULL) {
|
||||
/* No more pages to prefetch */
|
||||
tbm_end_iterate(*prefetch_iterator);
|
||||
node->prefetch_iterator = *prefetch_iterator = NULL;
|
||||
break;
|
||||
}
|
||||
node->prefetch_pages++;
|
||||
if (tbm_is_global(node->tbm) && GPIScanCheckPartOid(node->gpi_scan, tbmpre->partitionOid)) {
|
||||
GPISetCurrPartOid(node->gpi_scan, tbmpre->partitionOid);
|
||||
if (!GPIGetNextPartRelation(node->gpi_scan, CurrentMemoryContext, AccessShareLock)) {
|
||||
/* If the current partition is invalid, the next page is directly processed */
|
||||
tbmpre = NULL;
|
||||
continue;
|
||||
} else {
|
||||
prefetchRel = node->gpi_scan->fakePartRelation;
|
||||
}
|
||||
}
|
||||
/* For posix_fadvise() we just send the one request */
|
||||
PrefetchBuffer(prefetchRel, MAIN_FORKNUM, tbmpre->blockno);
|
||||
}
|
||||
/* recover old oid after prefetch switch */
|
||||
GPISetCurrPartOid(node->gpi_scan, oldOid);
|
||||
}
|
||||
ADIO_END();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user