Revert "0315 DB side fix: release ack msg recv failed"
This reverts commit ccb6e22ada49665ef6b1368bf5f6c042dffe12a8.
This commit is contained in:
@ -153,44 +153,13 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
|
||||
void ClearReadHint(int buf_id, bool buf_deleted)
|
||||
{
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
|
||||
|
||||
if (buf_ctrl->state & BUF_BEING_RELEASED) {
|
||||
BufferDesc *buf_desc = GetBufferDescriptor(buf_id);
|
||||
RelFileNode rnode = buf_desc->tag.rnode;
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
|
||||
" released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode,
|
||||
rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||
buf_desc->buf_id, BUF_BEING_RELEASED)));
|
||||
}
|
||||
|
||||
buf_ctrl->state &=
|
||||
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND |
|
||||
BUF_DIRTY_NEED_FLUSH | BUF_BEING_RELEASED);
|
||||
~(BUF_NEED_LOAD | BUF_IS_LOADED | BUF_LOAD_FAILED | BUF_NEED_TRANSFER | BUF_IS_EXTEND | BUF_DIRTY_NEED_FLUSH);
|
||||
if (buf_deleted) {
|
||||
buf_ctrl->state = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* this function should be called inside header lock */
|
||||
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set)
|
||||
{
|
||||
RelFileNode rnode = buf_desc->tag.rnode;
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
unsigned int old_val = buf_ctrl->state & BUF_BEING_RELEASED;
|
||||
if (set) {
|
||||
buf_ctrl->state |= BUF_BEING_RELEASED;
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as being released during"
|
||||
" release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode,
|
||||
rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_desc->buf_id, old_val)));
|
||||
} else if (!set && old_val) {
|
||||
buf_ctrl->state &= ~BUF_BEING_RELEASED;
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d marked as NOT being"
|
||||
" released during release owner, old_val:%u", rnode.spcNode, rnode.dbNode, rnode.relNode,
|
||||
rnode.bucketNode, rnode.opt, buf_desc->tag.forkNum, buf_desc->tag.blockNum,
|
||||
buf_desc->buf_id, old_val)));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* true: the page is transferred successfully by dms,
|
||||
* false: the page request is rejected or error, if hold the content_lock,
|
||||
@ -200,14 +169,6 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
|
||||
{
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
dms_lock_mode_t req_mode = (mode == LW_SHARED) ? DMS_LOCK_SHARE : DMS_LOCK_EXCLUSIVE;
|
||||
RelFileNode rnode = buf_desc->tag.rnode;
|
||||
|
||||
if (buf_ctrl->state & BUF_BEING_RELEASED) {
|
||||
ereport(WARNING, (errmsg("[%d/%d/%d/%d/%d %d-%d] buffer is being released in dms_release_owner",
|
||||
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
|
||||
buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
|
||||
return false;
|
||||
}
|
||||
|
||||
dms_context_t dms_ctx;
|
||||
InitDmsBufContext(&dms_ctx, buf_desc->tag);
|
||||
@ -508,15 +469,17 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo
|
||||
return TerminateReadPage(buf_desc, read_mode, OidIsValid(buf_ctrl->pblk_relno) ? &pblk : NULL);
|
||||
}
|
||||
|
||||
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released)
|
||||
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
|
||||
{
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_id);
|
||||
if (buf_ctrl->state & BUF_IS_RELPERSISTENT_TEMP) {
|
||||
return true;
|
||||
}
|
||||
unsigned char released = 0;
|
||||
dms_context_t dms_ctx;
|
||||
InitDmsBufContext(&dms_ctx, buf_tag);
|
||||
return (dms_release_owner(&dms_ctx, buf_ctrl, released) == DMS_SUCCESS);
|
||||
|
||||
return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0));
|
||||
}
|
||||
|
||||
void BufValidateDrc(BufferDesc *buf_desc)
|
||||
|
||||
@ -711,26 +711,15 @@ static volatile BufferDesc *PageListBufferAlloc(SMgrRelation smgr, char relpersi
|
||||
*
|
||||
* Need to lock the buffer header to change its tag.
|
||||
*/
|
||||
retry_victim:
|
||||
buf_state = LockBufHdr(buf);
|
||||
|
||||
/* Everything is fine, the buffer is ours, so break */
|
||||
old_flags = buf_state & BUF_FLAG_MASK;
|
||||
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(old_flags & BM_DIRTY) && !(old_flags & BM_IS_META)) {
|
||||
if (ENABLE_DMS && (old_flags & BM_TAG_VALID)) {
|
||||
unsigned char released = 0;
|
||||
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
|
||||
|
||||
if (returned && released) {
|
||||
if (DmsReleaseOwner(buf->tag, buf->buf_id)) {
|
||||
ClearReadHint(buf->buf_id, true);
|
||||
break;
|
||||
} else if (!returned) {
|
||||
MarkDmsBufBeingReleased(buf, true);
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
pg_usleep(1000L);
|
||||
goto retry_victim;
|
||||
} else { /* if returned and !released, we will have to try another victim */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
@ -745,9 +734,6 @@ retry_victim:
|
||||
* we must undo everything we've done and start
|
||||
* over with a new victim buffer.
|
||||
*/
|
||||
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
BufTableDelete(&new_tag, new_hash);
|
||||
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
|
||||
@ -2480,23 +2466,11 @@ found_branch:
|
||||
* 1. previous attempts to read the buffer must have failed,
|
||||
* but DRC has been created, so load page directly again
|
||||
* 2. maybe we have failed previous, and try again in this loop
|
||||
* 3. if previous read attempt has failed but concurrently getting
|
||||
* released, a contradiction happens and we panic
|
||||
*/
|
||||
if (buf_ctrl->state & BUF_BEING_RELEASED) {
|
||||
RelFileNode rnode = bufHdr->tag.rnode;
|
||||
ereport(WARNING, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] previous read"
|
||||
" attempt has failed but concurrently trying to release owner, contradiction",
|
||||
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
|
||||
bufHdr->tag.forkNum, bufHdr->tag.blockNum)));
|
||||
pg_usleep(5000L);
|
||||
continue;
|
||||
} else {
|
||||
buf_ctrl->state |= BUF_NEED_LOAD;
|
||||
}
|
||||
buf_ctrl->state |= BUF_NEED_LOAD;
|
||||
}
|
||||
break;
|
||||
} while (true);
|
||||
}while (true);
|
||||
|
||||
return TerminateReadPage(bufHdr, mode, pblk);
|
||||
}
|
||||
@ -2971,7 +2945,6 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumbe
|
||||
/*
|
||||
* Need to lock the buffer header too in order to change its tag.
|
||||
*/
|
||||
retry_victim:
|
||||
buf_state = LockBufHdr(buf);
|
||||
/*
|
||||
* Somebody could have pinned or re-dirtied the buffer while we were
|
||||
@ -2989,34 +2962,15 @@ retry_victim:
|
||||
* release owner procedure is in buf header lock, it's not reasonable,
|
||||
* need to improve.
|
||||
*/
|
||||
unsigned char released = 0;
|
||||
RelFileNode rnode = buf->tag.rnode;
|
||||
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
|
||||
int retry_times = 0;
|
||||
|
||||
if (returned && released) {
|
||||
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
|
||||
ClearReadHint(buf->buf_id, true);
|
||||
break;
|
||||
} else if (!returned) {
|
||||
MarkDmsBufBeingReleased(buf, true);
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
pg_usleep(1000L);
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS),
|
||||
errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner for %d times",
|
||||
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
|
||||
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
|
||||
goto retry_victim;
|
||||
} else { /* if returned and !released, we will have to try another victim */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
BufTableDelete(&new_tag, new_hash);
|
||||
if ((old_flags & BM_TAG_VALID) && old_partition_lock != new_partition_lock) {
|
||||
@ -3173,9 +3127,7 @@ retry:
|
||||
}
|
||||
|
||||
if (ENABLE_DMS && (buf_state & BM_TAG_VALID)) {
|
||||
unsigned char released = 0;
|
||||
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
|
||||
if (!(returned && released)) {
|
||||
if (!DmsReleaseOwner(buf->tag, buf->buf_id)) {
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
LWLockRelease(old_partition_lock);
|
||||
pg_usleep(5000);
|
||||
|
||||
@ -629,21 +629,9 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
|
||||
} else {
|
||||
/*
|
||||
* previous attempts to read the buffer must have failed,
|
||||
* but DRC has been created, so load page directly again;
|
||||
* but if previous read attempt has failed but concurrently
|
||||
* getting released, a contradiction happens and we panic.
|
||||
* but DRC has been created, so load page directly again
|
||||
*/
|
||||
Assert(pg_atomic_read_u32(&bufHdr->state) & BM_IO_ERROR);
|
||||
|
||||
if (buf_ctrl->state & BUF_BEING_RELEASED) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS), errmsg(
|
||||
"[%d/%d/%d/%d/%d %d-%d] buffer:%d is in the process of release owner",
|
||||
rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
|
||||
bufHdr->tag.forkNum, bufHdr->tag.blockNum, bufHdr->buf_id)));
|
||||
pg_usleep(5000L);
|
||||
continue;
|
||||
}
|
||||
|
||||
buf_ctrl->state |= BUF_NEED_LOAD;
|
||||
}
|
||||
|
||||
@ -796,7 +784,6 @@ BufferDesc *SegBufferAlloc(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum,
|
||||
return FoundBufferInHashTable(buf_id, new_partition_lock, foundPtr);
|
||||
}
|
||||
|
||||
retry_victim:
|
||||
buf_state = LockBufHdr(buf);
|
||||
old_flags = buf_state & BUF_FLAG_MASK;
|
||||
|
||||
@ -807,33 +794,14 @@ retry_victim:
|
||||
* release owner procedure is in buf header lock, it's not reasonable,
|
||||
* need to improve.
|
||||
*/
|
||||
unsigned char released = 0;
|
||||
RelFileNode rnode = buf->tag.rnode;
|
||||
bool returned = DmsReleaseOwner(old_tag, buf->buf_id, &released);
|
||||
int retry_times = 0;
|
||||
|
||||
if (returned && released) {
|
||||
if (DmsReleaseOwner(old_tag, buf->buf_id)) {
|
||||
ClearReadHint(buf->buf_id, true);
|
||||
break;
|
||||
} else if (!returned) {
|
||||
MarkDmsBufBeingReleased(buf, true);
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
pg_usleep(1000L);
|
||||
ereport(DEBUG1, (errmodule(MOD_DMS), errmsg("[%d/%d/%d/%d/%d %d-%d] buf:%d retry release owner "
|
||||
"for %d times", rnode.spcNode, rnode.dbNode, rnode.relNode, rnode.bucketNode, rnode.opt,
|
||||
buf->tag.forkNum, buf->tag.blockNum, buf->buf_id, ++retry_times)));
|
||||
goto retry_victim;
|
||||
} else { /* if returned and !released, we will have to try another victim */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (ENABLE_DMS) { /* between two tries of releasing owner, buffer might be dirtied and got skipped */
|
||||
MarkDmsBufBeingReleased(buf, false);
|
||||
}
|
||||
UnlockBufHdr(buf, buf_state);
|
||||
BufTableDelete(&new_tag, new_hash);
|
||||
if (old_flag_valid && old_partition_lock != new_partition_lock) {
|
||||
|
||||
@ -127,8 +127,6 @@
|
||||
#define BUF_READ_MODE_ZERO_LOCK 0x80
|
||||
#define BUF_DIRTY_NEED_FLUSH 0x100
|
||||
#define BUF_ERTO_NEED_MARK_DIRTY 0x200
|
||||
/* mark buffer whether is being released in DMS DRC */
|
||||
#define BUF_BEING_RELEASED 0x400
|
||||
|
||||
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
|
||||
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF)
|
||||
|
||||
@ -55,12 +55,11 @@ void MarkReadHint(int buf_id, char persistence, bool extend, const XLogPhyBlock
|
||||
bool LockModeCompatible(dms_buf_ctrl_t *buf_ctrl, LWLockMode mode);
|
||||
bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode);
|
||||
void ClearReadHint(int buf_id, bool buf_deleted = false);
|
||||
void MarkDmsBufBeingReleased(BufferDesc *buf_desc, bool set);
|
||||
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk);
|
||||
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc = NULL);
|
||||
Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
|
||||
Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, bool *with_io);
|
||||
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id, unsigned char* released);
|
||||
bool DmsReleaseOwner(BufferTag buf_tag, int buf_id);
|
||||
int SSLockAcquire(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock, bool dontWait,
|
||||
dms_opengauss_lock_req_type_t reqType = LOCK_NORMAL_MODE);
|
||||
int SSLockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock);
|
||||
|
||||
Reference in New Issue
Block a user