0315 DB side fix: release ack msg recv failed

This commit is contained in:
bowenliu
2023-03-15 14:20:19 +08:00
committed by dongning12
parent b4cc5559c8
commit ccb6e22ada
5 changed files with 133 additions and 13 deletions

View File

@ -153,13 +153,44 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
/*
 * Clear the read-hint bits kept in the DMS buffer control state of buffer
 * buf_id.
 *
 * buf_id:      index of the buffer whose dms_buf_ctrl_t is updated.
 * buf_deleted: when true, the whole state word is reset to 0 after the
 *              hint bits have been masked out.
 *
 * If BUF_BEING_RELEASED is still set on entry, a DEBUG1 message is logged
 * before the bit is cleared.
 */
void ClearReadHint(int buf_id, bool buf_deleted)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
if (buf_ctrl->state & BUF_BEING_RELEASED) {
BufferDesc *buf_desc = GetBufferDescriptor(buf_id);
RelFileNode rnode = buf_desc->tag.rnode;
/* the value logged as old_val is the BUF_BEING_RELEASED constant itself */
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
" released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode,
rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
buf_desc->buf_id, BUF_BEING_RELEASED)));
}
/*
 * NOTE(review): two mask expressions follow the "&=" below; they look like
 * the pre-change and post-change versions of the same statement as shown by
 * the commit view. Only the second one includes BUF_BEING_RELEASED - confirm
 * against the real file.
 */
buf_ctrl->state &=
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | BUF_DIRTY_NEED_FLUSH);
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND |
BUF_DIRTY_NEED_FLUSH | BUF_BEING_RELEASED);
if (buf_deleted) {
buf_ctrl->state = 0;
}
}
/*
 * Set or clear the BUF_BEING_RELEASED bit in the DMS buffer control state
 * that belongs to buf_desc, and log the transition at DEBUG1.
 *
 * buf_desc: descriptor of the buffer whose control state is updated.
 * set:      true sets the bit unconditionally; false clears it, but only
 *           when it was set before (otherwise nothing is changed or logged).
 *
 * This function should be called inside the buffer header lock.
 */
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set)
{
    dms_buf_ctrl_t *ctrl = GetDmsBufCtrl(buf_desc->buf_id);
    RelFileNode node = buf_desc->tag.rnode;
    /* remember whether the bit was set before we touch it, for the log line */
    unsigned int prev_bit = ctrl->state & BUF_BEING_RELEASED;

    if (!set) {
        if (prev_bit == 0) {
            /* bit already clear: nothing to undo, stay silent */
            return;
        }
        ctrl->state &= ~BUF_BEING_RELEASED;
        ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
            " released during release owner, old_val:%u", node.spcNode, node.dbNode, node.relNode,
            node.bucketNode, node.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
            buf_desc->buf_id, prev_bit)));
        return;
    }

    ctrl->state |= BUF_BEING_RELEASED;
    ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as being released during"
        " release owner, old_val:%u", node.spcNode, node.dbNode, node.relNode, node.bucketNode,
        node.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->buf_id, prev_bit)));
}
/*
* true: the page is transferred successfully by dms,
* false: the page request is rejected or error, if hold the content_lock,
@ -169,6 +200,14 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
dms_lock_mode_t req_mode = (mode == LW_SHARED) ? DMS_LOCK_SHARE : DMS_LOCK_EXCLUSIVE;
RelFileNode rnode = buf_desc->tag.rnode;
if (buf_ctrl->state & BUF_BEING_RELEASED) {
ereport(WARNING, (errmsg("[%d/%d/%d/%d/%d %d-%d] buffer is being released in dms_release_owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return false;
}
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
@ -469,17 +508,15 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo
return TerminateReadPage(buf_desc, read_mode, OidIsValid(buf_ctrl->pblk_relno) ? &pblk : NULL);
}
/*
 * Ask DMS to release ownership of the buffer identified by buf_tag / buf_id
 * through dms_release_owner().
 *
 * Buffers whose control state has BUF_IS_RELPERSISTENT_TEMP set are not sent
 * to DMS; the function returns true for them right away.
 *
 * NOTE(review): this span shows both the old and the new form of the
 * signature, of the "released" local and of the return statement, as
 * rendered by the commit view. In the new form "released" is an output
 * pointer handed straight to dms_release_owner(), and the return value only
 * reports whether that call returned DMS_SUCCESS.
 *
 * NOTE(review): on the BUF_IS_RELPERSISTENT_TEMP early return *released is
 * not written. Callers in this commit initialise it to 0 and require
 * "returned && released", so they would treat such a buffer as not
 * released - confirm this is intended.
 */
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
if (buf_ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
return true;
}
unsigned char released = 0;
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_tag);
return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0));
return (dms_release_owner(&dms_ctx, buf_ctrl, released) == DMS_SUCCESS);
}
void BufValidateDrc(BufferDesc *buf_desc)

View File

@ -711,15 +711,26 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi
*
* Need to lock the buffer header to change its tag.
*/
retry_victim:
buf_state = LockBufHdr(buf);
/* Everything is fine, the buffer is ours, so break */
old_flags = buf_state & BUF_FLAG_MASK;
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) && !(old_flags & BM_IS_META)) {
if (ENABLE_DMS && (old_flags & BM_TAG_VALID)) {
if (DmsReleaseOwner(buf->tag, buf->buf_id)) {
unsigned char released = 0;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
if (returned && released) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
@ -734,6 +745,9 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi
* we must undo everything we've done and start
* over with a new victim buffer.
*/
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
@ -2452,11 +2466,23 @@ found_branch:
* 1. previous attempts to read the buffer must have failed,
* but DRC has been created, so load page directly again
* 2. maybe we have failed previous, and try again in this loop
* 3. if the previous read attempt failed while the buffer is concurrently
* being released, log a warning, sleep briefly and retry the loop
*/
buf_ctrl->state |= BUF_NEED_LOAD;
if (buf_ctrl->state & BUF_BEING_RELEASED) {
RelFileNode rnode = bufHdr->tag.rnode;
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] previous read"
" attempt has failed but concurrently trying to release owner, contradiction",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
bufHdr->tag.forkNum, bufHdr->tag.blockNum)));
pg_usleep(5000L);
continue;
} else {
buf_ctrl->state |= BUF_NEED_LOAD;
}
}
break;
}while (true);
} while (true);
return TerminateReadPage(bufHdr, mode, pblk);
}
@ -2931,6 +2957,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
/*
* Need to lock the buffer header too in order to change its tag.
*/
retry_victim:
buf_state = LockBufHdr(buf);
/*
* Somebody could have pinned or re-dirtied the buffer while we were
@ -2948,15 +2975,34 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
* release owner procedure is in buf header lock, it's not reasonable,
* need to improve.
*/
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
unsigned char released = 0;
RelFileNode rnode = buf->tag.rnode;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
int retry_times = 0;
if (returned && released) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
ereport(DEBUG1, (errmodule(MOD_DMS),
errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
}
}
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
@ -3113,7 +3159,9 @@ retry:
}
if (ENABLE_DMS && (buf_state & BM_TAG_VALID)) {
if (!DmsReleaseOwner(buf->tag, buf->buf_id)) {
unsigned char released = 0;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
if (!(returned && released)) {
UnlockBufHdr(buf, buf_state);
LWLockRelease(old_partition_lock);
pg_usleep(5000);

View File

@ -623,9 +623,21 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
} else {
/*
* previous attempts to read the buffer must have failed,
* but DRC has been created, so load page directly again
* but DRC has been created, so load page directly again;
* but if the previous read attempt failed while the buffer is concurrently
* being released, log a warning, sleep briefly and retry instead.
*/
Assert(pg_atomic_read_u32(&bufHdr->state) & BM_IO_ERROR);
if (buf_ctrl->state & BUF_BEING_RELEASED) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg(
"[%d/%d/%d/%d/%d %d-%d] buffer:%d is in the process of release owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
bufHdr->tag.forkNum, bufHdr->tag.blockNum, bufHdr->buf_id)));
pg_usleep(5000L);
continue;
}
buf_ctrl->state |= BUF_NEED_LOAD;
}
@ -778,6 +790,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
}
retry_victim:
buf_state = LockBufHdr(buf);
old_flags = buf_state & BUF_FLAG_MASK;
@ -788,14 +801,33 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
* release owner procedure is in buf header lock, it's not reasonable,
* need to improve.
*/
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
unsigned char released = 0;
RelFileNode rnode = buf->tag.rnode;
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
int retry_times = 0;
if (returned && released) {
ClearReadHint(buf->buf_id, true);
break;
} else if (!returned) {
MarkDmsBufBeingReleased(buf, true);
UnlockBufHdr(buf, buf_state);
pg_usleep(1000L);
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner",
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
goto retry_victim;
} else { /* if returned and !released, we will have to try another victim */
MarkDmsBufBeingReleased(buf, false);
}
} else {
break;
}
}
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
MarkDmsBufBeingReleased(buf, false);
}
UnlockBufHdr(buf, buf_state);
BufTableDelete(&new_tag, new_hash);
if (old_flag_valid && old_partition_lock != new_partition_lock) {

View File

@ -106,6 +106,8 @@
#define BUF_READ_MODE_ZERO_LOCK 0x80
#define BUF_DIRTY_NEED_FLUSH 0x100
#define BUF_ERTO_NEED_MARK_DIRTY 0x200
/* marks whether the buffer is currently being released in DMS DRC */
#define BUF_BEING_RELEASED 0x400
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF)

View File

@ -55,11 +55,12 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode);
bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode);
void ClearReadHint(int buf_id, bool buf_deleted = false);
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set);
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc = NULL);
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id);
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released);
int32 CheckBuf4Rebuild(BufferDesc* buf_desc);
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE);