!4946 【资源池化】【bugfix】解决clean阶段confirm owner/cvt失败情况,以及clean阶段超时
Merge pull request !4946 from 董宁/bugfix18_master
This commit is contained in:
@ -57,6 +57,7 @@ void InitDmsBufCtrl(void)
|
||||
buf_ctrl->pblk_blkno = InvalidBlockNumber;
|
||||
buf_ctrl->pblk_lsn = InvalidXLogRecPtr;
|
||||
buf_ctrl->been_loaded = false;
|
||||
buf_ctrl->ctrl_lock = LWLockAssign(LWTRANCHE_DMS_BUF_CTRL);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -181,8 +182,9 @@ bool StartReadPage(BufferDesc *buf_desc, LWLockMode mode)
|
||||
|
||||
dms_context_t dms_ctx;
|
||||
InitDmsBufContext(&dms_ctx, buf_desc->tag);
|
||||
|
||||
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
|
||||
int ret = dms_request_page(&dms_ctx, buf_ctrl, req_mode);
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
return (ret == DMS_SUCCESS);
|
||||
}
|
||||
|
||||
@ -642,8 +644,10 @@ bool DmsReleaseOwner(BufferTag buf_tag, int buf_id)
|
||||
unsigned char released = 0;
|
||||
dms_context_t dms_ctx;
|
||||
InitDmsBufContext(&dms_ctx, buf_tag);
|
||||
|
||||
return ((dms_release_owner(&dms_ctx, buf_ctrl, &released) == DMS_SUCCESS) && (released != 0));
|
||||
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
|
||||
int ret = dms_release_owner(&dms_ctx, buf_ctrl, &released);
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
return ((ret == DMS_SUCCESS) && (released != 0));
|
||||
}
|
||||
|
||||
void BufValidateDrc(BufferDesc *buf_desc)
|
||||
|
@ -1432,15 +1432,15 @@ static int32 CBDrcBufValidate(void *db_handle)
|
||||
}
|
||||
|
||||
// used for find bufferdesc in dms
|
||||
static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_desc)
|
||||
// no need WaitIO to check valid bit is set or not, we use spinlock to guarantee to lock_mode
|
||||
static BufferDesc* SSGetBufferDesc(char *pageid)
|
||||
{
|
||||
int buf_id;
|
||||
uint32 hash;
|
||||
BufferTag *tag = (BufferTag *)pageid;
|
||||
BufferDesc *buf_desc;
|
||||
bool ret = true;
|
||||
|
||||
BufferDesc *buf_desc = NULL;
|
||||
RelFileNode relfilenode = tag->rnode;
|
||||
uint32 hash = BufTableHashCode(tag);
|
||||
bool retry = false;
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
if (IsSegmentPhysicalRelNode(relfilenode)) {
|
||||
@ -1453,40 +1453,26 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
|
||||
}
|
||||
#endif
|
||||
|
||||
hash = BufTableHashCode(tag);
|
||||
|
||||
uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount;
|
||||
PG_TRY();
|
||||
{
|
||||
buf_id = BufTableLookup(tag, hash);
|
||||
if (buf_id >= 0) {
|
||||
buf_desc = GetBufferDescriptor(buf_id);
|
||||
if (IsSegmentBufferID(buf_id)) {
|
||||
(void)SegPinBuffer(buf_desc);
|
||||
} else {
|
||||
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
|
||||
(void)PinBuffer(buf_desc, NULL);
|
||||
do {
|
||||
buf_id = BufTableLookup(tag, hash);
|
||||
if (buf_id < 0) {
|
||||
buf_desc = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
buf_desc = GetBufferDescriptor(buf_id);
|
||||
(void)SSPinBuffer(buf_desc);
|
||||
if (!BUFFERTAGS_PTR_EQUAL(&buf_desc->tag, tag)) {
|
||||
SSUnPinBuffer(buf_desc);
|
||||
*ret_buf_desc = NULL;
|
||||
break;
|
||||
buf_desc = NULL;
|
||||
retry = true;
|
||||
} else {
|
||||
retry = false;
|
||||
}
|
||||
|
||||
bool wait_success = SSWaitIOTimeout(buf_desc);
|
||||
if (!wait_success) {
|
||||
SSUnPinBuffer(buf_desc);
|
||||
ret = false;
|
||||
break;
|
||||
}
|
||||
|
||||
Assert(!(pg_atomic_read_u64(&buf_desc->state) & BM_IO_ERROR));
|
||||
*is_valid = (pg_atomic_read_u64(&buf_desc->state) & BM_VALID) != 0;
|
||||
*ret_buf_desc = buf_desc;
|
||||
} else {
|
||||
*ret_buf_desc = NULL;
|
||||
}
|
||||
} while (retry);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
@ -1494,100 +1480,60 @@ static bool SSGetBufferDesc(char *pageid, bool *is_valid, BufferDesc** ret_buf_d
|
||||
ReleaseResource();
|
||||
}
|
||||
PG_END_TRY();
|
||||
return ret;
|
||||
return buf_desc;
|
||||
}
|
||||
|
||||
static int CBConfirmOwner(void *db_handle, char *pageid, unsigned char *lock_mode, unsigned char *is_edp,
|
||||
unsigned long long *edp_lsn)
|
||||
{
|
||||
BufferDesc *buf_desc = NULL;
|
||||
bool valid;
|
||||
*is_edp = (unsigned char)false;
|
||||
dms_buf_ctrl_t *buf_ctrl = NULL;
|
||||
|
||||
bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc);
|
||||
if (!ret) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
errmsg("[SS] CBConfirmOwner, require LWLock timeout")));
|
||||
return GS_TIMEOUT;
|
||||
}
|
||||
|
||||
BufferDesc *buf_desc = SSGetBufferDesc(pageid);
|
||||
if (buf_desc == NULL) {
|
||||
*lock_mode = (uint8)DMS_LOCK_NULL;
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
if (!valid) {
|
||||
*lock_mode = (uint8)DMS_LOCK_NULL;
|
||||
*is_edp = (unsigned char)false;
|
||||
SSUnPinBuffer(buf_desc);
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* not acquire buf_desc->io_in_progress_lock
|
||||
* consistency guaranteed by reform phase
|
||||
*/
|
||||
buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
|
||||
*lock_mode = buf_ctrl->lock_mode;
|
||||
// opengauss currently no edp
|
||||
Assert(buf_ctrl->is_edp == 0);
|
||||
*is_edp = (unsigned char)false;
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
if (buf_ctrl->is_edp) {
|
||||
BufferTag *tag = &buf_desc->tag;
|
||||
ereport(PANIC, (errmsg("[SS][%u/%u/%u/%d %d-%u] CBConfirmOwner, do not allow edp exist, please check.",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
}
|
||||
#endif
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
SSUnPinBuffer(buf_desc);
|
||||
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
static int CBConfirmConverting(void *db_handle, char *pageid, unsigned char smon_chk,
|
||||
unsigned char *lock_mode, unsigned long long *edp_map, unsigned long long *lsn)
|
||||
{
|
||||
BufferDesc *buf_desc = NULL;
|
||||
bool valid;
|
||||
dms_buf_ctrl_t *buf_ctrl = NULL;
|
||||
|
||||
*lsn = 0;
|
||||
*lsn = 0; // lsn not used in dms, so need to waste time to PageGetLSN
|
||||
*edp_map = 0;
|
||||
|
||||
bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc);
|
||||
if (!ret) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
errmsg("[SS] CBConfirmConverting, require LWLock timeout")));
|
||||
return GS_TIMEOUT;
|
||||
}
|
||||
|
||||
BufferDesc *buf_desc = SSGetBufferDesc(pageid);
|
||||
if (buf_desc == NULL) {
|
||||
*lock_mode = (uint8)DMS_LOCK_NULL;
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
if (!valid) {
|
||||
*lock_mode = (uint8)DMS_LOCK_NULL;
|
||||
SSUnPinBuffer(buf_desc);
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
bool get_lock = SSLWLockAcquireTimeout(buf_desc->io_in_progress_lock, LW_EXCLUSIVE);
|
||||
if (get_lock) {
|
||||
buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
*lock_mode = buf_ctrl->lock_mode;
|
||||
LWLockRelease(buf_desc->io_in_progress_lock);
|
||||
SSUnPinBuffer(buf_desc);
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
|
||||
if (smon_chk) {
|
||||
SSUnPinBuffer(buf_desc);
|
||||
BufferTag *tag = &buf_desc->tag;
|
||||
ereport(WARNING, (errmodule(MOD_DMS), (errmsg("[SS lwlock][%u/%u/%u/%d %d-%u] request LWLock timeout, "
|
||||
"buf_id:%d, lwlock:%p",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum, buf_desc->buf_id, buf_desc->io_in_progress_lock))));
|
||||
return GS_TIMEOUT;
|
||||
}
|
||||
|
||||
// without lock
|
||||
buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
|
||||
*lock_mode = buf_ctrl->lock_mode;
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
if (buf_ctrl->is_edp) {
|
||||
BufferTag *tag = &buf_desc->tag;
|
||||
ereport(PANIC, (errmsg("[SS][%u/%u/%u/%d %d-%u] CBConfirmConverting, do not allow edp exist, please check.",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
}
|
||||
#endif
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
SSUnPinBuffer(buf_desc);
|
||||
return GS_SUCCESS;
|
||||
}
|
||||
@ -2024,38 +1970,37 @@ static int CBCacheMsg(void *db_handle, char* msg)
|
||||
|
||||
static int CBMarkNeedFlush(void *db_handle, char *pageid, unsigned char *is_edp)
|
||||
{
|
||||
bool valid = false;
|
||||
BufferDesc *buf_desc = NULL;
|
||||
*is_edp = false;
|
||||
BufferTag *tag = (BufferTag *)pageid;
|
||||
|
||||
bool ret = SSGetBufferDesc(pageid, &valid, &buf_desc);
|
||||
if (!ret) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
errmsg("[SS] CBMarkNeedFlush, require LWLock timeout")));
|
||||
return GS_TIMEOUT;
|
||||
}
|
||||
|
||||
BufferDesc *buf_desc = SSGetBufferDesc(pageid);
|
||||
if (buf_desc == NULL) {
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
errmsg("[SS] CBMarkNeedFlush, buf_desc not found")));
|
||||
ereport(LOG, (errmodule(MOD_DMS),
|
||||
errmsg("[SS][%u/%u/%u/%d %d-%u] CBMarkNeedFlush, buf_desc not found",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
return DMS_ERROR;
|
||||
}
|
||||
|
||||
if (!valid) {
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
|
||||
if (!DMS_BUF_CTRL_IS_OWNER(buf_ctrl)) {
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
ereport(LOG, (errmodule(MOD_DMS),
|
||||
errmsg("[SS][%u/%u/%u/%d %d-%u] CBMarkNeedFlush, do not have current page",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
SSUnPinBuffer(buf_desc);
|
||||
ereport(WARNING, (errmodule(MOD_DMS),
|
||||
errmsg("[SS] CBMarkNeedFlush, buf_desc not valid")));
|
||||
return DMS_ERROR;
|
||||
}
|
||||
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
|
||||
if (buf_ctrl->is_edp) {
|
||||
ereport(PANIC, (errmsg("[SS] CBMarkNeedFlush, do not allow edp exist, please check")));
|
||||
ereport(PANIC, (errmsg("[SS][%u/%u/%u/%d %d-%u] CBMarkNeedFlush, do not allow edp exist, please check.",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
}
|
||||
#endif
|
||||
*is_edp = false;
|
||||
|
||||
ereport(LOG, (errmsg("[SS] CBMarkNeedFlush found buf: %u/%u/%u/%d %d-%u",
|
||||
tag->rnode.spcNode, tag->rnode.dbNode, tag->rnode.relNode, tag->rnode.bucketNode,
|
||||
tag->forkNum, tag->blockNum)));
|
||||
|
@ -1,3 +1,3 @@
|
||||
dms_commit_id=53254ee39283addc0c92ad19df63eac80929122d
|
||||
dms_commit_id=edff7169230b8995fcbd4ab49a048b866aac7e68
|
||||
dss_commit_id=e0d6825a1e97d9cbefeadcfdaa704df60eff4bef
|
||||
cbb_commit_id=e7cd5a5c633189c4a7c49960a3d146d83b74ae09
|
@ -202,7 +202,8 @@ static const char *BuiltinTrancheNames[] = {
|
||||
"PCABufferContentLock",
|
||||
"XlogTrackPartLock",
|
||||
"SSTxnStatusCachePartLock",
|
||||
"SSSnapshotXminCachePartLock"
|
||||
"SSSnapshotXminCachePartLock",
|
||||
"DmsBufCtrlLock"
|
||||
};
|
||||
|
||||
static void RegisterLWLockTranches(void);
|
||||
@ -377,6 +378,10 @@ int NumLWLocks(void)
|
||||
/* bufmgr.c needs two for each shared buffer */
|
||||
numLocks += 2 * TOTAL_BUFFER_NUM;
|
||||
|
||||
if (ENABLE_DMS) {
|
||||
numLocks += TOTAL_BUFFER_NUM; // dms_buf_ctrl_t lock
|
||||
}
|
||||
|
||||
/* each zone owns undo space lock */
|
||||
numLocks += MAX(maxConn, maxThreadNum) * numLockFactor * UNDO_ZONE_LOCK;
|
||||
|
||||
|
@ -33,7 +33,7 @@ extern "C" {
|
||||
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
|
||||
#define DMS_LOCAL_MAJOR_VERSION 0
|
||||
#define DMS_LOCAL_MINOR_VERSION 0
|
||||
#define DMS_LOCAL_VERSION 133
|
||||
#define DMS_LOCAL_VERSION 135
|
||||
|
||||
#define DMS_SUCCESS 0
|
||||
#define DMS_ERROR (-1)
|
||||
@ -437,6 +437,7 @@ typedef struct st_dms_buf_ctrl {
|
||||
unsigned long long pblk_lsn;
|
||||
unsigned char seg_fileno;
|
||||
unsigned int seg_blockno;
|
||||
void* ctrl_lock;
|
||||
#endif
|
||||
} dms_buf_ctrl_t;
|
||||
|
||||
@ -1115,6 +1116,7 @@ typedef struct st_dms_profile {
|
||||
char gsdb_home[DMS_LOG_PATH_LEN];
|
||||
unsigned char enable_mes_task_threadpool;
|
||||
unsigned int mes_task_worker_max_cnt;
|
||||
unsigned int max_alive_time_for_abnormal_status;
|
||||
} dms_profile_t;
|
||||
|
||||
typedef struct st_logger_param {
|
||||
|
@ -288,6 +288,7 @@ enum BuiltinTrancheIds
|
||||
LWTRANCHE_XLOG_TRACK_PARTITION,
|
||||
LWTRANCHE_SS_TXNSTATUS_PARTITION,
|
||||
LWTRANCHE_SS_SNAPSHOT_XMIN_PARTITION,
|
||||
LWTRANCHE_DMS_BUF_CTRL,
|
||||
/*
|
||||
* Each trancheId above should have a corresponding item in BuiltinTrancheNames;
|
||||
*/
|
||||
|
Reference in New Issue
Block a user