!4309 【资源池化】修复drc回收时invalidate page中判断错误的问题
Merge pull request !4309 from cchen676/0809master_1
This commit is contained in:
@ -55,6 +55,7 @@ void InitDmsBufCtrl(void)
|
||||
buf_ctrl->pblk_relno = InvalidOid;
|
||||
buf_ctrl->pblk_blkno = InvalidBlockNumber;
|
||||
buf_ctrl->pblk_lsn = InvalidXLogRecPtr;
|
||||
buf_ctrl->been_loaded = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -160,6 +161,8 @@ void ClearReadHint(int buf_id, bool buf_deleted)
|
||||
if (buf_deleted) {
|
||||
buf_ctrl->state = 0;
|
||||
}
|
||||
buf_ctrl->seg_fileno = EXTENT_INVALID;
|
||||
buf_ctrl->seg_blockno = InvalidBlockNumber;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -795,6 +798,13 @@ bool SSSegRead(SMgrRelation reln, ForkNumber forknum, char *buffer)
|
||||
.opt = 0
|
||||
};
|
||||
|
||||
/* Check whether the physical location info match! */
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
|
||||
if (buf_ctrl->seg_fileno != EXTENT_INVALID && (buf_ctrl->seg_fileno != buf_desc->extra->seg_fileno ||
|
||||
buf_ctrl->seg_blockno != buf_desc->extra->seg_blockno)) {
|
||||
ereport(PANIC, (errmsg("It seemd physical location of drc not match with buf desc!")));
|
||||
}
|
||||
|
||||
seg_physical_read(reln->seg_space, fakenode, forknum, buf_desc->extra->seg_blockno, (char *)buffer);
|
||||
if (PageIsVerified((Page)buffer, buf_desc->extra->seg_blockno)) {
|
||||
rdStatus = SMGR_RD_OK;
|
||||
|
||||
@ -646,9 +646,10 @@ static int tryEnterLocalPage(BufferTag *tag, dms_lock_mode_t mode, dms_buf_ctrl_
|
||||
errmsg("[%u/%u/%u/%d %d-%u] lock mode is null, still need to transfer page",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
} else if (buf_desc->extra->seg_fileno != EXTENT_INVALID) {
|
||||
(*buf_ctrl)->seg_fileno = buf_desc->extra->seg_fileno;
|
||||
(*buf_ctrl)->seg_blockno = buf_desc->extra->seg_blockno;
|
||||
}
|
||||
(*buf_ctrl)->seg_fileno = buf_desc->extra->seg_fileno;
|
||||
(*buf_ctrl)->seg_blockno = buf_desc->extra->seg_blockno;
|
||||
} while (0);
|
||||
}
|
||||
PG_CATCH();
|
||||
@ -710,6 +711,7 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
|
||||
BufferTag* tag = (BufferTag *)pageid;
|
||||
uint32 hash;
|
||||
LWLock *partition_lock = NULL;
|
||||
uint32 buf_state;
|
||||
int ret = DMS_SUCCESS;
|
||||
|
||||
hash = BufTableHashCode(tag);
|
||||
@ -736,6 +738,47 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
|
||||
PG_TRY();
|
||||
{
|
||||
buf_desc = GetBufferDescriptor(buf_id);
|
||||
if (SS_PRIMARY_MODE) {
|
||||
buf_state = LockBufHdr(buf_desc);
|
||||
if (BUF_STATE_GET_REFCOUNT(buf_state) != 0 || BUF_STATE_GET_USAGECOUNT(buf_state) != 0) {
|
||||
UnlockBufHdr(buf_desc, buf_state);
|
||||
LWLockRelease(partition_lock);
|
||||
return DMS_ERROR;
|
||||
}
|
||||
|
||||
if (!(buf_state & BM_VALID) || (buf_state & BM_IO_ERROR)) {
|
||||
ereport(LOG, (errmodule(MOD_DMS),
|
||||
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%x",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_desc->state)));
|
||||
UnlockBufHdr(buf_desc, buf_state);
|
||||
LWLockRelease(partition_lock);
|
||||
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
|
||||
buf_ctrl->seg_fileno = EXTENT_INVALID;
|
||||
buf_ctrl->seg_blockno = InvalidBlockNumber;
|
||||
return DMS_SUCCESS;
|
||||
}
|
||||
|
||||
/* For aio (flush disk not finished), dirty, in dirty queue, dirty need flush, can't recycle */
|
||||
if (buf_desc->extra->aio_in_progress || (buf_state & BM_DIRTY) || (buf_state & BM_JUST_DIRTIED) ||
|
||||
XLogRecPtrIsValid(pg_atomic_read_u64(&buf_desc->extra->rec_lsn)) ||
|
||||
(buf_ctrl->state & BUF_DIRTY_NEED_FLUSH)) {
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS),
|
||||
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_desc->state)));
|
||||
ret = DMS_ERROR;
|
||||
} else {
|
||||
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
|
||||
buf_ctrl->seg_fileno = EXTENT_INVALID;
|
||||
buf_ctrl->seg_blockno = InvalidBlockNumber;
|
||||
}
|
||||
|
||||
UnlockBufHdr(buf_desc, buf_state);
|
||||
LWLockRelease(partition_lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (IsSegmentBufferID(buf_id)) {
|
||||
(void)SegPinBuffer(buf_desc);
|
||||
} else {
|
||||
@ -746,8 +789,9 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
|
||||
|
||||
bool wait_success = SSWaitIOTimeout(buf_desc);
|
||||
if (!wait_success) {
|
||||
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
|
||||
ret = GS_TIMEOUT;
|
||||
break;
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((!(pg_atomic_read_u32(&buf_desc->state) & BM_VALID)) ||
|
||||
@ -756,30 +800,26 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
|
||||
errmsg("[%d/%d/%d/%d %d-%d] invalidate page, buffer is not valid or io error, state = 0x%x",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_desc->state)));
|
||||
DmsReleaseBuffer(buf_id + 1, IsSegmentBufferID(buf_id));
|
||||
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
|
||||
buf_ctrl->seg_fileno = EXTENT_INVALID;
|
||||
buf_ctrl->seg_blockno = InvalidBlockNumber;
|
||||
ret = DMS_SUCCESS;
|
||||
break;
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool can_invld_owner = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED | BM_PERMANENT)) > 0 ? false : true;
|
||||
if (!invld_owner || (invld_owner && can_invld_owner)) {
|
||||
get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE);
|
||||
if (!get_lock) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
|
||||
"buf_id:%d, lwlock:%p",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock))));
|
||||
ret = GS_TIMEOUT;
|
||||
} else {
|
||||
buf_ctrl = GetDmsBufCtrl(buf_id);
|
||||
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
|
||||
LWLockRelease(buf_desc->content_lock);
|
||||
}
|
||||
} else { /* invalidate owner which buffer is dirty/permanent */
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS),
|
||||
errmsg("[%d/%d/%d/%d %d-%d] invalidate owner rejected, buffer is dirty/permanent, state = 0x%x",
|
||||
get_lock = SSLWLockAcquireTimeout(buf_desc->content_lock, LW_EXCLUSIVE);
|
||||
if (!get_lock) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
|
||||
"buf_id:%d, lwlock:%p",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_desc->state)));
|
||||
ret = DMS_ERROR;
|
||||
tag->forkNum, tag->blockNum, buf_id, buf_desc->content_lock))));
|
||||
ret = GS_TIMEOUT;
|
||||
} else {
|
||||
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
|
||||
buf_ctrl->seg_fileno = EXTENT_INVALID;
|
||||
buf_ctrl->seg_blockno = InvalidBlockNumber;
|
||||
LWLockRelease(buf_desc->content_lock);
|
||||
}
|
||||
|
||||
if (IsSegmentBufferID(buf_id)) {
|
||||
@ -819,15 +859,17 @@ static void CBVerifyPage(dms_buf_ctrl_t *buf_ctrl, char *new_page)
|
||||
|
||||
BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id);
|
||||
|
||||
if (buf_desc->extra->seg_fileno == EXTENT_INVALID) {
|
||||
buf_desc->extra->seg_fileno = buf_ctrl->seg_fileno;
|
||||
buf_desc->extra->seg_blockno = buf_ctrl->seg_blockno;
|
||||
} else if (buf_desc->extra->seg_fileno != buf_ctrl->seg_fileno ||
|
||||
buf_desc->extra->seg_blockno != buf_ctrl->seg_blockno) {
|
||||
ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
|
||||
buf_desc->tag.blockNum, buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno)));
|
||||
if (buf_ctrl->seg_fileno != EXTENT_INVALID) {
|
||||
if (buf_desc->extra->seg_fileno == EXTENT_INVALID) {
|
||||
buf_desc->extra->seg_fileno = buf_ctrl->seg_fileno;
|
||||
buf_desc->extra->seg_blockno = buf_ctrl->seg_blockno;
|
||||
} else if (buf_desc->extra->seg_fileno != buf_ctrl->seg_fileno ||
|
||||
buf_desc->extra->seg_blockno != buf_ctrl->seg_blockno) {
|
||||
ereport(PANIC, (errmsg("[%u/%u/%u/%d/%d %d-%u] location mismatch, seg_fileno:%d, seg_blockno:%u",
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
buf_desc->tag.rnode.bucketNode, buf_desc->tag.rnode.opt, buf_desc->tag.forkNum,
|
||||
buf_desc->tag.blockNum, buf_desc->extra->seg_fileno, buf_desc->extra->seg_blockno)));
|
||||
}
|
||||
}
|
||||
|
||||
/* page content is not valid */
|
||||
@ -1170,7 +1212,7 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state)
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
if ((buf_state & BM_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
|
||||
ret = true;
|
||||
} else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
|
||||
} else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL) && !(buf_ctrl->state | BUF_IS_EXTEND)) {
|
||||
if (LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_SHARED)) {
|
||||
ret = true;
|
||||
} else {
|
||||
@ -1182,6 +1224,9 @@ static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint32 buf_state)
|
||||
* page. The stucked process will request the page again when it add content lock and the reformer will
|
||||
* become owner when it request the page.
|
||||
*/
|
||||
ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%u, ctrl state:%u, lock mode:%d.",
|
||||
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
|
||||
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_state, buf_ctrl->state, buf_ctrl->lock_mode)));
|
||||
buf_ctrl->lock_mode = DMS_LOCK_NULL;
|
||||
}
|
||||
}
|
||||
|
||||
@ -2113,6 +2113,14 @@ static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence,
|
||||
}
|
||||
}
|
||||
|
||||
if (ENABLE_DMS) {
|
||||
Buffer buf = BlockGetBuffer((char *)bufBlock);
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf - 1);
|
||||
if (buf_ctrl->lock_mode == DMS_LOCK_NULL) {
|
||||
ereport(PANIC, (errmsg("It seemd read buffer not across DMS!")));
|
||||
}
|
||||
}
|
||||
|
||||
return needputtodirty;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user