【资源池化】【ctrl锁优化页面相关流程】rebuild判断逻辑优化

This commit is contained in:
dongning12
2024-04-09 20:41:06 +08:00
committed by yaoxin
parent 983e00ffee
commit 205028182d
3 changed files with 66 additions and 69 deletions

View File

@ -668,7 +668,7 @@ void BufValidateDrc(BufferDesc *buf_desc)
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
unsigned long long lsn = (unsigned long long)BufferGetLSN(buf_desc);
bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false;
bool is_dirty = SSBufferIsDirty(buf_desc);
dms_validate_drc(&dms_ctx, buf_ctrl, lsn, (unsigned char)is_dirty);
}

View File

@ -713,7 +713,7 @@ static unsigned char CBPageDirty(dms_buf_ctrl_t *buf_ctrl)
return 0;
}
BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id);
bool is_dirty = (pg_atomic_read_u64(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) > 0;
bool is_dirty = SSBufferIsDirty(buf_desc);
return (unsigned char)is_dirty;
}
@ -1279,57 +1279,29 @@ static void CBSetDmsStatus(void *db_handle, int dms_status)
g_instance.dms_cxt.dms_status = (dms_status_t)dms_status;
}
static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint64 buf_state)
static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thread_index)
{
bool ret = false;
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
if ((buf_state & BM_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
ret = true;
} else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) {
if (LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_SHARED)) {
ret = true;
} else {
/*
* In the condition of (Phase1)readbuffer_common->(Phase2)dms_request_page->(Phase3)seg_read->(Phase4)lock
* seg_head buffer, and stuck in Phase4 as reform happened. It will block the request of data pase as the
* lock mode of dms_buf_ctrl was already set to DMS_LOCK_EXCLUSIVE and IO is still in process.
* In order to get rid of this dilemma, we force set the lock mode back to null and don't rebuild this
* page. The stucked process will request the page again when it add content lock and the reformer will
* become owner when it request the page.
*/
ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%lu, ctrl state:%u, lock mode:%d.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_state, buf_ctrl->state, buf_ctrl->lock_mode)));
buf_ctrl->lock_mode = DMS_LOCK_NULL;
}
}
return ret;
}
static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index)
{
#ifdef USE_ASSERT_CHECKING
if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) {
SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
} else {
SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL);
}
#endif
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
Assert(buf_ctrl != NULL);
Assert(buf_ctrl->is_edp != 1);
Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo));
#endif
dms_context_t dms_ctx;
InitDmsBufContext(&dms_ctx, buf_desc->tag);
dms_ctrl_info_t ctrl_info = { 0 };
ctrl_info.ctrl = *buf_ctrl;
ctrl_info.lsn = (unsigned long long)BufferGetLSN(buf_desc);
ctrl_info.is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false;
ctrl_info.is_dirty = SSBufferIsDirty(buf_desc);
int ret = dms_buf_res_rebuild_drc_parallel(&dms_ctx, &ctrl_info, thread_index);
if (ret != DMS_SUCCESS) {
ereport(WARNING, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.",
ereport(WARNING, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] rebuild page failed.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
return ret;
@ -1337,45 +1309,53 @@ static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index)
return DMS_SUCCESS;
}
static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_index)
static int SSBufRebuildOneDrc(int index, unsigned char thread_index)
{
uint64 buf_state;
Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM);
for (int i = begin; i < begin + len; i++) {
BufferDesc *buf_desc = GetBufferDescriptor(i);
bool need_rebuild = true;
if (LWLockConditionalAcquire(buf_desc->content_lock, LW_EXCLUSIVE)) {
buf_state = LockBufHdr(buf_desc);
if ((buf_state & BM_VALID) && !(buf_state & BM_DIRTY)) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
BufferDesc *buf_desc = GetBufferDescriptor(index);
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(index);
(void)SSPinBuffer(buf_desc);
bool need_rebuild = true;
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
bool is_owner = DMS_BUF_CTRL_IS_OWNER(buf_ctrl);
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
if (is_owner) {
if (LWLockConditionalAcquire(buf_desc->content_lock, LW_SHARED)) {
if (!SSBufferIsDirty(buf_desc)) {
LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE);
buf_ctrl->lock_mode = DMS_LOCK_NULL;
LWLockRelease((LWLock*)buf_ctrl->ctrl_lock);
need_rebuild = false;
ereport(DEBUG5, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] no need rebuild, set lock_mode NULL.",
buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode,
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum)));
}
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(buf_desc->content_lock);
}
if (need_rebuild) {
buf_state = LockBufHdr(buf_desc);
if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) {
int ret = SSRebuildBuf(buf_desc, thread_index);
if (ret != DMS_SUCCESS) {
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
return ret;
}
}
if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) {
LWLockRelease(buf_desc->io_in_progress_lock);
}
UnlockBufHdr(buf_desc, buf_state);
LWLockRelease(buf_desc->content_lock);
}
} else {
need_rebuild = false;
}
SSUnPinBuffer(buf_desc);
if (need_rebuild) {
int ret = SSBufRebuildOneDrcInternal(buf_desc, thread_index);
return ret;
}
return DMS_SUCCESS;
}
static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_index)
{
Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM);
int end = begin + len - 1;
for (int i = begin; i <= end; i++) {
int ret = SSBufRebuildOneDrc(i, thread_index);
if (ret != DMS_SUCCESS) {
return ret;
}
}
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d",
(int)thread_index, begin, (begin + len - 1), (TOTAL_BUFFER_NUM - 1))));
(int)thread_index, begin, end, (TOTAL_BUFFER_NUM - 1))));
return GS_SUCCESS;
}
@ -1388,7 +1368,7 @@ static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_in
*/
const int dms_invalid_thread_index = 255;
const int dms_invalid_thread_num = 255;
static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num)
static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num)
{
Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) ||
(thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num &&
@ -1403,7 +1383,7 @@ static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index
buf_begin = 0;
buf_num = TOTAL_BUFFER_NUM;
}
return CBDrcBufRebuildInternal(buf_begin, buf_num, thread_index);
return CBBufRebuildDrcInternal(buf_begin, buf_num, thread_index);
}
static int32 CBDrcBufValidate(void *db_handle)
@ -2160,7 +2140,7 @@ void DmsInitCallback(dms_callback_t *callback)
callback->opengauss_recovery_primary = CBRecoveryPrimary;
callback->get_dms_status = CBGetDmsStatus;
callback->set_dms_status = CBSetDmsStatus;
callback->dms_reform_rebuild_parallel = CBDrcBufRebuildParallel;
callback->dms_reform_rebuild_parallel = CBBufRebuildDrcParallel;
callback->dms_thread_init = DmsCallbackThreadShmemInit;
callback->confirm_owner = CBConfirmOwner;
callback->confirm_converting = CBConfirmConverting;

View File

@ -86,4 +86,21 @@ void DmsReleaseBuffer(int buffer, bool is_seg);
bool SSRequestPageInOndemandRealtimeBuild(BufferTag *bufferTag, XLogRecPtr recordLsn, XLogRecPtr *pageLsn);
bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf);
bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl);
inline bool SSBufferIsDirty(BufferDesc *buf_desc)
{
uint64 state = pg_atomic_read_u64(&buf_desc->state);
// no need to judge (BM_DIRTY | BM_JUST_DIRTIED), BM_DIRTY is enough
if (state & BM_DIRTY) {
#ifdef USE_ASSERT_CHECKING
Assert((state & BM_VALID) == BM_VALID);
#endif
return true;
}
if (ENABLE_DSS_AIO && buf_desc->extra->aio_in_progress) {
return true;
}
return false;
}
#endif