修复drc回收时判断了正常状态的buffer不可回收的问题

This commit is contained in:
arcoalien@qq.com
2023-10-17 16:33:25 +08:00
parent b5a8d5b056
commit 011112cacd
3 changed files with 96 additions and 33 deletions

View File

@ -55,6 +55,7 @@ void InitDmsBufCtrl(void)
buf_ctrl->pblk_relno = InvalidOid;
buf_ctrl->pblk_blkno = InvalidBlockNumber;
buf_ctrl->pblk_lsn = InvalidXLogRecPtr;
buf_ctrl->been_loaded = false;
}
}
}
@ -160,6 +161,8 @@ void ClearReadHint(int buf_id, bool buf_deleted)
if (buf_deleted) {
buf_ctrl->state = 0;
}
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
}
/*
@ -801,6 +804,13 @@ bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer)
.opt = 0
};
/* Check whether the physical location info match! */
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
if (buf_ctrl->seg_fileno != EXTENT_INVALID && (buf_ctrl->seg_fileno != buf_desc->extra->seg_fileno ||
buf_ctrl->seg_blockno != buf_desc->extra->seg_blockno)) {
ereport(PANIC, (errmsg("It seemd physical location of drc not match with buf desc!")));
}
seg_physical_read(reln->seg_space, fakenode, forknum, buf_desc->extra->seg_blockno, (char *)buffer);
if (PageIsVerified((Page)buffer, buf_desc->extra->seg_blockno)) {
rdStatus = SMGR_RD_OK;

View File

@ -646,9 +646,10 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
errmsg("[%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum)));
} else if (buf_desc->extra->seg_fileno != EXTENT_INVALID) {
(*buf_ctrl)->seg_fileno = buf_desc->extra->seg_fileno;
(*buf_ctrl)->seg_blockno = buf_desc->extra->seg_blockno;
}
(*buf_ctrl)->seg_fileno = buf_desc->extra->seg_fileno;
(*buf_ctrl)->seg_blockno = buf_desc->extra->seg_blockno;
} while (0);
}
PG_CATCH();
@ -710,6 +711,7 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
BufferTag* tag = (BufferTag *)pageid;
uint32 hash;
LWLock *partition_lock = NULL;
uint32 buf_state;
int ret = DMS_SUCCESS;
hash = BufTableHashCode(tag);
@ -736,6 +738,47 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
PG_TRY();
{
buf_desc = GetBufferDescriptor(buf_id);
if (SS_PRIMARY_MODE) {
buf_state = LockBufHdr(buf_desc);
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0) {
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
return DMS_ERROR;
}
if (!(buf_state & BM_VALID) || (buf_state & BM_IO_ERROR)) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%x",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
return DMS_SUCCESS;
}
/* For aio (flush disk not finished), dirty, in dirty queue, dirty need flush, can't recycle */
if (buf_desc->extra->aio_in_progress || (buf_state & BM_DIRTY) || (buf_state & BM_JUST_DIRTIED) ||
XLogRecPtrIsValid(pg_atomic_read_u64(&buf_desc->extra->rec_lsn)) ||
(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
ret = DMS_ERROR;
} else {
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
}
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(partition_lock);
return ret;
}
if (IsSegmentBufferID(buf_id)) {
(void)SegPinBuffer(buf_desc);
} else {
@ -746,8 +789,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
bool wait_success = SSWaitIOTimeout(buf_desc);
if (!wait_success) {
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
ret = GS_TIMEOUT;
break;
return ret;
}
if ((!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) ||
@ -756,30 +800,26 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%x",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
ret = DMS_SUCCESS;
break;
return ret;
}
bool can_invld_owner = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED | BM_PERMANENT)) > 0 ? false : true;
if (!invld_owner || (invld_owner && can_invld_owner)) {
get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"buf_id:%d, lwlock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock))));
ret = GS_TIMEOUT;
} else {
buf_ctrl = GetDmsBufCtrl(buf_id);
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
LWLockRelease(buf_desc->content_lock);
}
} else { /* invalidate owner which buffer is dirty/permanent */
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x",
get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE);
if (!get_lock) {
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
"buf_id:%d, lwlock:%p",
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
tag->forkNum, tag->blockNum, buf_desc->state)));
ret = DMS_ERROR;
tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock))));
ret = GS_TIMEOUT;
} else {
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
buf_ctrl->seg_fileno = EXTENT_INVALID;
buf_ctrl->seg_blockno = InvalidBlockNumber;
LWLockRelease(buf_desc->content_lock);
}
if (IsSegmentBufferID(buf_id)) {
@ -819,15 +859,17 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page)
BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id);
if (buf_desc->extra->seg_fileno == EXTENT_INVALID) {
buf_desc->extra->seg_fileno = buf_ctrl->seg_fileno;
buf_desc->extra->seg_blockno = buf_ctrl->seg_blockno;
} else if (buf_desc->extra->seg_fileno != buf_ctrl->seg_fileno ||
buf_desc->extra->seg_blockno != buf_ctrl->seg_blockno) {
ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
buf_desc->tag.blockNum, buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno)));
if (buf_ctrl->seg_fileno != EXTENT_INVALID) {
if (buf_desc->extra->seg_fileno == EXTENT_INVALID) {
buf_desc->extra->seg_fileno = buf_ctrl->seg_fileno;
buf_desc->extra->seg_blockno = buf_ctrl->seg_blockno;
} else if (buf_desc->extra->seg_fileno != buf_ctrl->seg_fileno ||
buf_desc->extra->seg_blockno != buf_ctrl->seg_blockno) {
ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
buf_desc->tag.blockNum, buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno)));
}
}
/* page content is not valid */
@ -1170,7 +1212,7 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state)
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
if ((buf_state & BM_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
ret = true;
} else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
} else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL) && !(buf_ctrl->state | BUF_IS_EXTEND)) {
if (LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_SHARED)) {
ret = true;
} else {
@ -1182,6 +1224,9 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state)
* page. The stucked process will request the page again when it add content lock and the reformer will
* become owner when it request the page.
*/
ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%u, ctrl state:%u, lock mode:%d.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_state, buf_ctrl->state, buf_ctrl->lock_mode)));
buf_ctrl->lock_mode = DMS_LOCK_NULL;
}
}

View File

@ -2113,6 +2113,14 @@ static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence,
}
}
if (ENABLE_DMS) {
Buffer buf = BlockGetBuffer((char *)bufBlock);
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
if (buf_ctrl->lock_mode == DMS_LOCK_NULL) {
ereport(PANIC, (errmsg("It seemd read buffer not across DMS!")));
}
}
return needputtodirty;
}