【bugfix】修复BufferAlloc解pin后,buffer可能被淘汰的问题
This commit is contained in:
@ -2126,6 +2126,24 @@ void RedoPageWorkerRedoBcmBlock(XLogRecParseState *procState)
|
||||
}
|
||||
}
|
||||
|
||||
LWLock* OndemandGetXLogPartitionLock(BufferDesc* bufHdr, ForkNumber forkNum, BlockNumber blockNum) {
|
||||
LWLock *xlog_partition_lock = NULL;
|
||||
ondemand_extreme_rto::RedoItemTag redoItemTag;
|
||||
INIT_REDO_ITEM_TAG(redoItemTag, bufHdr->tag.rnode, forkNum, blockNum);
|
||||
uint32 slotId = GetSlotId(redoItemTag.rNode, 0, 0, GetBatchCount());
|
||||
HTAB *hashMap = g_instance.comm_cxt.predo_cxt.redoItemHashCtrl[slotId]->hTab;
|
||||
if (hashMap == NULL) {
|
||||
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
|
||||
errmsg("redo item hash table corrupted, there has invalid hashtable.")));
|
||||
}
|
||||
|
||||
/* get partition lock by redoItemTag */
|
||||
unsigned int partitionLockHash = XlogTrackTableHashCode(&redoItemTag);
|
||||
xlog_partition_lock = XlogTrackMappingPartitionLock(partitionLockHash);
|
||||
|
||||
return xlog_partition_lock;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the block if need to redo and try hashmap lock.
|
||||
* There are three kinds of result as follow:
|
||||
|
||||
@ -3394,13 +3394,28 @@ retry_new_buffer:
|
||||
bool hasUnpinned = false;
|
||||
while (ondemand_extreme_rto::checkBlockRedoStateAndTryHashMapLock(buf, fork_num, block_num) == ONDEMAND_HASHMAP_ENTRY_REDOING) {
|
||||
if (!hasUnpinned) {
|
||||
UnpinBuffer(buf, true);
|
||||
hasUnpinned = true;
|
||||
UnpinBuffer(buf, true);
|
||||
hasUnpinned = true;
|
||||
}
|
||||
pg_usleep(TEN_MICROSECOND);
|
||||
}
|
||||
|
||||
/* Pin buffer again after getlock, and check if buffer has been eliminated. */
|
||||
if (hasUnpinned) {
|
||||
PinBuffer(buf, strategy);
|
||||
INIT_BUFFERTAG(new_tag, rel_file_node, fork_num, block_num);
|
||||
if (!BUFFERTAGS_PTR_EQUAL(&buf->tag, &new_tag)) {
|
||||
UnpinBuffer(buf, true);
|
||||
LWLock *xlog_partition_lock = ondemand_extreme_rto::OndemandGetXLogPartitionLock(buf, fork_num, block_num);
|
||||
if (LWLockHeldByMe(xlog_partition_lock)) {
|
||||
LWLockRelease(xlog_partition_lock);
|
||||
}
|
||||
ereport(DEBUG1, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
|
||||
errmsg("buffer has been eliminated, goto retry: spc/db/rel/bucket fork-block: %u/%u/%u/%d %d-%u.",
|
||||
buf->tag.rnode.spcNode, buf->tag.rnode.dbNode, buf->tag.rnode.relNode, buf->tag.rnode.bucketNode,
|
||||
buf->tag.forkNum, buf->tag.blockNum)));
|
||||
goto retry;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -271,6 +271,7 @@ void BatchClearRecoveryThreadHashTbl(Oid spcNode, Oid dbNode);
|
||||
void RecordBadBlockAndPushToRemote(XLogBlockDataParse *datadecode, PageErrorType error_type,
|
||||
XLogRecPtr old_lsn, XLogPhyBlock pblk);
|
||||
const char *RedoWokerRole2Str(RedoRole role);
|
||||
LWLock* OndemandGetXLogPartitionLock(BufferDesc* bufHdr, ForkNumber forkNum, BlockNumber blockNum);
|
||||
int checkBlockRedoStateAndTryHashMapLock(BufferDesc* bufHdr, ForkNumber forkNum, BlockNumber blockNum);
|
||||
bool checkBlockRedoDoneFromHashMapAndLock(LWLock **lock, RedoItemTag redoItemTag, RedoItemHashEntry **redoItemEntry,
|
||||
bool holdLock);
|
||||
|
||||
Reference in New Issue
Block a user