!2807 【共享存储】修复invalid shared copy和获取页面并发时, 当IO未完成时, invalid可能导致页面实际为失效

Merge pull request !2807 from cchen676/1207master
This commit is contained in:
opengauss-bot
2023-01-14 06:04:41 +00:00
committed by Gitee

View File

@ -600,13 +600,10 @@ static char* CBGetPage(dms_buf_ctrl_t *buf_ctrl)
static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsigned int ver)
{
bool valid = false;
int buf_id;
int buf_id = -1;
BufferTag* tag = (BufferTag *)pageid;
uint32 hash;
LWLock *partition_lock = NULL;
BufferDesc *buf_desc = NULL;
dms_buf_ctrl_t *buf_ctrl = NULL;
int ret = DMS_SUCCESS;
hash = BufTableHashCode(tag);
@ -619,30 +616,34 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
return ret;
}
BufferDesc *buf_desc = GetBufferDescriptor(buf_id);
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount;
PG_TRY();
{
buf_desc = GetBufferDescriptor(buf_id);
if (IsSegmentBufferID(buf_id)) {
valid = SegPinBuffer(buf_desc);
(void)SegPinBuffer(buf_desc);
} else {
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
valid = PinBuffer(buf_desc, NULL);
(void)PinBuffer(buf_desc, NULL);
}
LWLockRelease(partition_lock);
if (valid) {
(void)LWLockAcquire(buf_desc->content_lock, LW_EXCLUSIVE);
buf_ctrl = GetDmsBufCtrl(buf_id);
if (ver == buf_ctrl->ver) {
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
} else {
ereport(WARNING, (errmodule(MOD_DMS),
WaitIO(buf_desc);
Assert(pg_atomic_read_u32(&buf_desc->state) & BM_VALID);
Assert(!(pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR));
(void)LWLockAcquire(buf_desc->content_lock, LW_EXCLUSIVE);
buf_ctrl = GetDmsBufCtrl(buf_id);
if (ver == buf_ctrl->ver) {
buf_ctrl->lock_mode = (unsigned char)DMS_LOCK_NULL;
} else {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[CBInvalidatePage] invalid ver:%u, buf_ctrl ver:%u", ver, buf_ctrl->ver)));
ret = DMS_ERROR;
}
LWLockRelease(buf_desc->content_lock);
ret = DMS_ERROR;
}
LWLockRelease(buf_desc->content_lock);
if (IsSegmentBufferID(buf_id)) {
SegReleaseBuffer(buf_id + 1);
} else {
@ -662,12 +663,12 @@ static int CBInvalidatePage(void *db_handle, char pageid[DMS_PAGEID_SIZE], unsig
ReleaseResource();
ret = DMS_ERROR;
}
PG_END_TRY();
if (ret == DMS_SUCCESS) {
Assert(buf_ctrl->lock_mode == DMS_LOCK_NULL);
}
PG_END_TRY();
return ret;
}
@ -1033,7 +1034,6 @@ static int32 CBDrcBufRebuild(void *db_handle)
// used for find bufferdesc in dms
static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_desc)
{
bool valid;
int buf_id;
uint32 hash;
LWLock *partition_lock = NULL;
@ -1063,14 +1063,17 @@ static void SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
buf_id = BufTableLookup(tag, hash);
if (buf_id >= 0) {
buf_desc = GetBufferDescriptor(buf_id);
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
if (IsSegmentBufferID(buf_id)) {
valid = SegPinBuffer(buf_desc);
(void)SegPinBuffer(buf_desc);
} else {
valid = PinBuffer(buf_desc, NULL);
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
(void)PinBuffer(buf_desc, NULL);
}
LWLockRelease(partition_lock);
*is_valid = valid;
WaitIO(buf_desc);
Assert(!(pg_atomic_read_u32(&buf_desc->state) & BM_IO_ERROR));
*is_valid = (pg_atomic_read_u32(&buf_desc->state) & BM_VALID) != 0;
*ret_buf_desc = buf_desc;
} else {
*ret_buf_desc = NULL;