diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 14114aad8..87b584d00 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -153,13 +153,44 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock void ClearReadHint(int buf_id, bool buf_deleted) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id); + + if (buf_ctrl->state & BUF_BEING_RELEASED) { + BufferDesc *buf_desc = GetBufferDescriptor(buf_id); + RelFileNode rnode = buf_desc->tag.rnode; + ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being" + " released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode, + rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, + buf_desc->buf_id, BUF_BEING_RELEASED))); + } + buf_ctrl->state &= - ~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | BUF_DIRTY_NEED_FLUSH); + ~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | + BUF_DIRTY_NEED_FLUSH | BUF_BEING_RELEASED); if (buf_deleted) { buf_ctrl->state = 0; } } +/* this function should be called inside header lock */ +void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set) +{ + RelFileNode rnode = buf_desc->tag.rnode; + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + unsigned int old_val = buf_ctrl->state & BUF_BEING_RELEASED; + if (set) { + buf_ctrl->state |= BUF_BEING_RELEASED; + ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as being released during" + " release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, + rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->buf_id, old_val))); + } else if (!set && old_val) { + buf_ctrl->state &= ~BUF_BEING_RELEASED; + ereport(DEBUG1, (errmodule(MOD_DMS), 
errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being" + " released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode, + rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, + buf_desc->buf_id, old_val))); + } +} + /* * true: the page is transferred successfully by dms, * false: the page request is rejected or error, if hold the content_lock, @@ -169,6 +200,14 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); dms_lock_mode_t req_mode = (mode == LW_SHARED) ? DMS_LOCK_SHARE : DMS_LOCK_EXCLUSIVE; + RelFileNode rnode = buf_desc->tag.rnode; + + if (buf_ctrl->state & BUF_BEING_RELEASED) { + ereport(WARNING, (errmsg("[%d/%d/%d/%d/%d %d-%d] buffer is being released in dms_release_owner", + rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, + buf_desc->tag.forkNum, buf_desc->tag.blockNum))); + return false; + } dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_desc->tag); @@ -469,17 +508,15 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo return TerminateReadPage(buf_desc, read_mode, OidIsValid(buf_ctrl->pblk_relno) ? 
&pblk : NULL); } -bool DmsReleaseOwner(BufferTag buf_tag, int buf_id) +bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id); if (buf_ctrl->state & BUF_IS_RELPERSISTENT_TEMP) { return true; } - unsigned char released = 0; dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_tag); - - return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0)); + return (dms_release_owner(&dms_ctx, buf_ctrl, released) == DMS_SUCCESS); } void BufValidateDrc(BufferDesc *buf_desc) diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index ab8a0d0ec..d41c78bc8 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -711,15 +711,26 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi * * Need to lock the buffer header to change its tag. */ +retry_victim: buf_state = LockBufHdr(buf); /* Everything is fine, the buffer is ours, so break */ old_flags = buf_state & BUF_FLAG_MASK; if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) && !(old_flags & BM_IS_META)) { if (ENABLE_DMS && (old_flags & BM_TAG_VALID)) { - if (DmsReleaseOwner(buf->tag, buf->buf_id)) { + unsigned char released = 0; + bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released); + + if (returned && released) { ClearReadHint(buf->buf_id, true); break; + } else if (!returned) { + MarkDmsBufBeingReleased(buf, true); + UnlockBufHdr(buf, buf_state); + pg_usleep(1000L); + goto retry_victim; + } else { /* if returned and !released, we will have to try another victim */ + MarkDmsBufBeingReleased(buf, false); } } else { break; @@ -734,6 +745,9 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi * we must undo everything we've done and start * over with a new victim buffer. 
*/ + if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */ + MarkDmsBufBeingReleased(buf, false); + } UnlockBufHdr(buf, buf_state); BufTableDelete(&new_tag, new_hash); if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) { @@ -2452,11 +2466,23 @@ found_branch: * 1. previous attempts to read the buffer must have failed, * but DRC has been created, so load page directly again * 2. maybe we have failed previous, and try again in this loop + * 3. if previous read attempt has failed but the buffer is concurrently + * being released, log a warning, sleep briefly and retry this loop */ - buf_ctrl->state |= BUF_NEED_LOAD; + if (buf_ctrl->state & BUF_BEING_RELEASED) { + RelFileNode rnode = bufHdr->tag.rnode; + ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] previous read" + " attempt has failed but concurrently trying to release owner, contradiction", + rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, + bufHdr->tag.forkNum, bufHdr->tag.blockNum))); + pg_usleep(5000L); + continue; + } else { + buf_ctrl->state |= BUF_NEED_LOAD; + } } break; - }while (true); + } while (true); return TerminateReadPage(bufHdr, mode, pblk); } @@ -2931,6 +2957,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe /* * Need to lock the buffer header too in order to change its tag. */ +retry_victim: buf_state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were @@ -2948,15 +2975,34 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe * release owner procedure is in buf header lock, it's not reasonable, * need to improve. 
*/ - if (DmsReleaseOwner(old_tag, buf->buf_id)) { + unsigned char released = 0; + RelFileNode rnode = buf->tag.rnode; + bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released); + int retry_times = 0; + + if (returned && released) { ClearReadHint(buf->buf_id, true); break; + } else if (!returned) { + MarkDmsBufBeingReleased(buf, true); + UnlockBufHdr(buf, buf_state); + pg_usleep(1000L); + ereport(DEBUG1, (errmodule(MOD_DMS), + errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner, retry_times:%d", + rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, + buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times))); + goto retry_victim; + } else { /* if returned and !released, we will have to try another victim */ + MarkDmsBufBeingReleased(buf, false); } } else { break; } } + if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */ + MarkDmsBufBeingReleased(buf, false); + } UnlockBufHdr(buf, buf_state); BufTableDelete(&new_tag, new_hash); if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) { @@ -3113,7 +3159,9 @@ retry: } if (ENABLE_DMS && (buf_state & BM_TAG_VALID)) { - if (!DmsReleaseOwner(buf->tag, buf->buf_id)) { + unsigned char released = 0; + bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released); + if (!(returned && released)) { UnlockBufHdr(buf, buf_state); LWLockRelease(old_partition_lock); pg_usleep(5000); diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp index 598010df2..6cb2f1208 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -623,9 +623,21 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc } else { /* * previous attempts to read the buffer must have failed, - * but DRC has been created, so load page directly again + * but DRC has been created, so load page directly again; + * but 
if previous read attempt has failed but concurrently + * getting released, we log a warning, sleep briefly and retry. */ Assert(pg_atomic_read_u32(&bufHdr->state) & BM_IO_ERROR); + + if (buf_ctrl->state & BUF_BEING_RELEASED) { + ereport(WARNING, (errmodule(MOD_DMS), errmsg( + "[%d/%d/%d/%d/%d %d-%d] buffer:%d is in the process of release owner", + rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, + bufHdr->tag.forkNum, bufHdr->tag.blockNum, bufHdr->buf_id))); + pg_usleep(5000L); + continue; + } + buf_ctrl->state |= BUF_NEED_LOAD; } @@ -778,6 +790,7 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr); } +retry_victim: buf_state = LockBufHdr(buf); old_flags = buf_state & BUF_FLAG_MASK; @@ -788,14 +801,33 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, * release owner procedure is in buf header lock, it's not reasonable, * need to improve. 
*/ - if (DmsReleaseOwner(old_tag, buf->buf_id)) { + unsigned char released = 0; + RelFileNode rnode = buf->tag.rnode; + bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released); + int retry_times = 0; + + if (returned && released) { ClearReadHint(buf->buf_id, true); break; + } else if (!returned) { + MarkDmsBufBeingReleased(buf, true); + UnlockBufHdr(buf, buf_state); + pg_usleep(1000L); + ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner, retry_times:%d", + rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt, + buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times))); + goto retry_victim; + } else { /* if returned and !released, we will have to try another victim */ + MarkDmsBufBeingReleased(buf, false); } } else { break; } } + + if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */ + MarkDmsBufBeingReleased(buf, false); + } UnlockBufHdr(buf, buf_state); BufTableDelete(&new_tag, new_hash); if (old_flag_valid && old_partition_lock != new_partition_lock) { diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 48173cb10..61966fb60 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -106,6 +106,8 @@ #define BUF_READ_MODE_ZERO_LOCK 0x80 #define BUF_DIRTY_NEED_FLUSH 0x100 #define BUF_ERTO_NEED_MARK_DIRTY 0x200 +/* mark buffer whether is being released in DMS DRC */ +#define BUF_BEING_RELEASED 0x400 #define SS_BROADCAST_FAILED_RETRYCOUNTS 4 #define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF) diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 692d83c3a..21fbf03db 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -55,11 +55,12 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode); bool 
StartReadPage(BufferDesc *buf_desc, LWLockMode mode); void ClearReadHint(int buf_id, bool buf_deleted = false); +void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set); Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk); Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc = NULL); Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io); Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io); -bool DmsReleaseOwner(BufferTag buf_tag, int buf_id); +bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released); int32 CheckBuf4Rebuild(BufferDesc* buf_desc); int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait, dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE);