diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index 4f3a79dcd..0c6f6249e 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -668,7 +668,7 @@ void BufValidateDrc(BufferDesc *buf_desc) dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_desc->tag); unsigned long long lsn = (unsigned long long)BufferGetLSN(buf_desc); - bool is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false; + bool is_dirty = SSBufferIsDirty(buf_desc); dms_validate_drc(&dms_ctx, buf_ctrl, lsn, (unsigned char)is_dirty); } diff --git a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp index ca541c47e..afb88cd3e 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_callback.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_callback.cpp @@ -713,7 +713,7 @@ static unsigned char CBPageDirty(dms_buf_ctrl_t *buf_ctrl) return 0; } BufferDesc *buf_desc = GetBufferDescriptor(buf_ctrl->buf_id); - bool is_dirty = (pg_atomic_read_u64(&buf_desc->state) & (BM_DIRTY | BM_JUST_DIRTIED)) > 0; + bool is_dirty = SSBufferIsDirty(buf_desc); return (unsigned char)is_dirty; } @@ -1279,57 +1279,29 @@ static void CBSetDmsStatus(void *db_handle, int dms_status) g_instance.dms_cxt.dms_status = (dms_status_t)dms_status; } -static bool SSCheckBufferIfCanGoRebuild(BufferDesc* buf_desc, uint64 buf_state) +static int32 SSBufRebuildOneDrcInternal(BufferDesc *buf_desc, unsigned char thread_index) { - bool ret = false; dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - if ((buf_state & BM_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) { - ret = true; - } else if ((buf_state & BM_TAG_VALID) && (buf_ctrl->lock_mode != (unsigned char)DMS_LOCK_NULL)) { - if (LWLockConditionalAcquire(buf_desc->io_in_progress_lock, LW_SHARED)) { - ret = true; - } else { - /* - * In the condition of (Phase1)readbuffer_common->(Phase2)dms_request_page->(Phase3)seg_read->(Phase4)lock - * seg_head buffer, and stuck in Phase4 as reform happened. It will block the request of data pase as the - * lock mode of dms_buf_ctrl was already set to DMS_LOCK_EXCLUSIVE and IO is still in process. - * In order to get rid of this dilemma, we force set the lock mode back to null and don't rebuild this - * page. The stucked process will request the page again when it add content lock and the reformer will - * become owner when it request the page. - */ - ereport(WARNING, (errmsg("[%u/%u/%u/%d/0 %d-%u] Set lock mode to NULL, desc state:%lu, ctrl state:%u, lock mode:%d.", - buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, - buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, buf_state, buf_ctrl->state, buf_ctrl->lock_mode))); - buf_ctrl->lock_mode = DMS_LOCK_NULL; - } - } - - return ret; -} - -static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index) -{ #ifdef USE_ASSERT_CHECKING if (IsSegmentPhysicalRelNode(buf_desc->tag.rnode)) { SegNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); } else { SmgrNetPageCheckDiskLSN(buf_desc, RBM_NORMAL, NULL); } -#endif - - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + Assert(buf_ctrl != NULL); Assert(buf_ctrl->is_edp != 1); Assert(XLogRecPtrIsValid(g_instance.dms_cxt.ckptRedo)); +#endif dms_context_t dms_ctx; InitDmsBufContext(&dms_ctx, buf_desc->tag); dms_ctrl_info_t ctrl_info = { 0 }; ctrl_info.ctrl = *buf_ctrl; ctrl_info.lsn = (unsigned long long)BufferGetLSN(buf_desc); - ctrl_info.is_dirty = (buf_desc->state & (BM_DIRTY | BM_JUST_DIRTIED)) > 0 ? true : false; + ctrl_info.is_dirty = SSBufferIsDirty(buf_desc); int ret = dms_buf_res_rebuild_drc_parallel(&dms_ctx, &ctrl_info, thread_index); if (ret != DMS_SUCCESS) { - ereport(WARNING, (errmsg("Failed to rebuild page, rel:%u/%u/%u/%d, forknum:%d, blocknum:%u.", + ereport(WARNING, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] rebuild page failed.", buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum))); return ret; @@ -1337,45 +1309,53 @@ static int32 SSRebuildBuf(BufferDesc *buf_desc, unsigned char thread_index) return DMS_SUCCESS; } -static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_index) +static int SSBufRebuildOneDrc(int index, unsigned char thread_index) { - uint64 buf_state; - Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM); - for (int i = begin; i < begin + len; i++) { - BufferDesc *buf_desc = GetBufferDescriptor(i); - bool need_rebuild = true; - if (LWLockConditionalAcquire(buf_desc->content_lock, LW_EXCLUSIVE)) { - buf_state = LockBufHdr(buf_desc); - if ((buf_state & BM_VALID) && !(buf_state & BM_DIRTY)) { - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); + BufferDesc *buf_desc = GetBufferDescriptor(index); + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(index); + (void)SSPinBuffer(buf_desc); + bool need_rebuild = true; + LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE); + bool is_owner = DMS_BUF_CTRL_IS_OWNER(buf_ctrl); + LWLockRelease((LWLock*)buf_ctrl->ctrl_lock); + if (is_owner) { + if (LWLockConditionalAcquire(buf_desc->content_lock, LW_SHARED)) { + if (!SSBufferIsDirty(buf_desc)) { + LWLockAcquire((LWLock*)buf_ctrl->ctrl_lock, LW_EXCLUSIVE); buf_ctrl->lock_mode = DMS_LOCK_NULL; + LWLockRelease((LWLock*)buf_ctrl->ctrl_lock); need_rebuild = false; + ereport(DEBUG5, (errmsg("[%u/%u/%u/%d %d-%u][SS reform] no need rebuild, set lock_mode NULL.", + buf_desc->tag.rnode.spcNode, buf_desc->tag.rnode.dbNode, buf_desc->tag.rnode.relNode, + buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum))); } - UnlockBufHdr(buf_desc, buf_state); - LWLockRelease(buf_desc->content_lock); - } - - if (need_rebuild) { - buf_state = LockBufHdr(buf_desc); - if (SSCheckBufferIfCanGoRebuild(buf_desc, buf_state)) { - int ret = SSRebuildBuf(buf_desc, thread_index); - if (ret != DMS_SUCCESS) { - if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { - LWLockRelease(buf_desc->io_in_progress_lock); - } - UnlockBufHdr(buf_desc, buf_state); - return ret; - } - } - if (LWLockHeldByMe(buf_desc->io_in_progress_lock)) { - LWLockRelease(buf_desc->io_in_progress_lock); - } - UnlockBufHdr(buf_desc, buf_state); + LWLockRelease(buf_desc->content_lock); + } + } else { + need_rebuild = false; + } + SSUnPinBuffer(buf_desc); + + if (need_rebuild) { + int ret = SSBufRebuildOneDrcInternal(buf_desc, thread_index); + return ret; + } + return DMS_SUCCESS; +} + +static int32 CBBufRebuildDrcInternal(int begin, int len, unsigned char thread_index) +{ + Assert(begin >= 0 && len > 0 && (begin + len) <= TOTAL_BUFFER_NUM); + int end = begin + len - 1; + for (int i = begin; i <= end; i++) { + int ret = SSBufRebuildOneDrc(i, thread_index); + if (ret != DMS_SUCCESS) { + return ret; } } ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] rebuild buf thread_index:%d, buf_if start from:%d to:%d, max_buf_id:%d", - (int)thread_index, begin, (begin + len - 1), (TOTAL_BUFFER_NUM - 1)))); + (int)thread_index, begin, end, (TOTAL_BUFFER_NUM - 1)))); return GS_SUCCESS; } @@ -1388,7 +1368,7 @@ static int32 CBDrcBufRebuildInternal(int begin, int len, unsigned char thread_in */ const int dms_invalid_thread_index = 255; const int dms_invalid_thread_num = 255; -static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) +static int32 CBBufRebuildDrcParallel(void* db_handle, unsigned char thread_index, unsigned char thread_num) { Assert((thread_index == dms_invalid_thread_index && thread_num == dms_invalid_thread_num) || (thread_index != dms_invalid_thread_index && thread_num != dms_invalid_thread_num && @@ -1403,7 +1383,7 @@ static int32 CBDrcBufRebuildParallel(void* db_handle, unsigned char thread_index buf_begin = 0; buf_num = TOTAL_BUFFER_NUM; } - return CBDrcBufRebuildInternal(buf_begin, buf_num, thread_index); + return CBBufRebuildDrcInternal(buf_begin, buf_num, thread_index); } static int32 CBDrcBufValidate(void *db_handle) @@ -2160,7 +2140,7 @@ void DmsInitCallback(dms_callback_t *callback) callback->opengauss_recovery_primary = CBRecoveryPrimary; callback->get_dms_status = CBGetDmsStatus; callback->set_dms_status = CBSetDmsStatus; - callback->dms_reform_rebuild_parallel = CBDrcBufRebuildParallel; + callback->dms_reform_rebuild_parallel = CBBufRebuildDrcParallel; callback->dms_thread_init = DmsCallbackThreadShmemInit; callback->confirm_owner = CBConfirmOwner; callback->confirm_converting = CBConfirmConverting; diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 269f944de..8a8b90164 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -86,4 +86,21 @@ void DmsReleaseBuffer(int buffer, bool is_seg); bool SSRequestPageInOndemandRealtimeBuild(BufferTag *bufferTag, XLogRecPtr recordLsn, XLogRecPtr *pageLsn); bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf); bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl); + +inline bool SSBufferIsDirty(BufferDesc *buf_desc) +{ + uint64 state = pg_atomic_read_u64(&buf_desc->state); + // no need to judge (BM_DIRTY | BM_JUST_DIRTIED), BM_DIRTY is enough + if (state & BM_DIRTY) { +#ifdef USE_ASSERT_CHECKING + Assert((state & BM_VALID) == BM_VALID); +#endif + return true; + } + if (ENABLE_DSS_AIO && buf_desc->extra->aio_in_progress) { + return true; + } + return false; +} + #endif