diff --git a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp index d0ac3f773..745d6f53d 100644 --- a/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp +++ b/src/gausskernel/ddes/adapter/ss_dms_bufmgr.cpp @@ -33,7 +33,6 @@ #include "securec_check.h" #include "miscadmin.h" #include "access/double_write.h" -#include "access/ondemand_extreme_rto/dispatcher.h" #include "access/multi_redo_api.h" void InitDmsBufCtrl(void) @@ -231,11 +230,6 @@ RETRY: void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk) { - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) { - return; - } - /* * prerequisite is that the page that initialized to zero in memory should be flush to disk */ @@ -282,17 +276,6 @@ void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, con } #endif -static Buffer ReadBufferInRealtimeBuildFailoverForDMS(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk) -{ - Page page = (Page)BufHdrGetBlock(buf_desc); - XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr); - if (XLByteLT(ckptRedoPtr, PageGetLSN(page))) { - return BufferDescriptorGetBuffer(buf_desc); - } else { - return ReadBuffer_common_for_dms(read_mode, buf_desc, pblk); - } -} - Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); @@ -301,21 +284,7 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X if (g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy && AmDmsReformProcProcess()) { ereport(PANIC, (errmsg("SS In flush copy, can't read from disk!"))); } - /* - * do not allow pageredo workers read buffer from disk if standby node in ondemand - * realtime build status, because some buffer need init directly in recovery mode - */ - if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) { - buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - buffer = InvalidBuffer; - } else { - if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (read_mode == RBM_NORMAL) && - (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) { - buffer = ReadBufferInRealtimeBuildFailoverForDMS(buf_desc, read_mode, pblk); - } else { - buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk); - } - } + buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk); } else { #ifdef USE_ASSERT_CHECKING if (buf_ctrl->state & BUF_IS_EXTEND) { @@ -427,11 +396,6 @@ static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode) #ifdef USE_ASSERT_CHECKING void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc) { - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) { - return; - } - /* * prequisite is that the page that initialized to zero in memory should be flushed to disk, * references to seg_extend @@ -455,39 +419,12 @@ void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS } #endif -static Buffer ReadSegBufferInRealtimeBuildFailoverForDMS(BufferDesc* buf_desc, ReadBufferMode read_mode, SegSpace *spc) -{ - Page page = (Page)BufHdrGetBlock(buf_desc); - XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr); - if (XLByteLT(ckptRedoPtr, PageGetLSN(page))) { - SegTerminateBufferIO(buf_desc, false, BM_VALID); - return BufferDescriptorGetBuffer(buf_desc); - } else { - return ReadSegBufferForDMS(buf_desc, read_mode, spc); - } -} - Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc) { dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); Buffer buffer; if (buf_ctrl->state & BUF_NEED_LOAD) { - /* - * do not allow pageredo workers read buffer from disk if standby node in ondemand - * realtime build status, because some buffer need init directly in recovery mode - */ - if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) { - buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - SegTerminateBufferIO(buf_desc, false, BM_VALID); - buffer = InvalidBuffer; - } else { - if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (read_mode == RBM_NORMAL) && - (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) { - buffer = ReadSegBufferInRealtimeBuildFailoverForDMS(buf_desc, read_mode, spc); - } else { - buffer = ReadSegBufferForDMS(buf_desc, read_mode, spc); - } - } + buffer = ReadSegBufferForDMS(buf_desc, read_mode, spc); } else { Page page = (Page)BufHdrGetBlock(buf_desc); PageSetChecksumInplace(page, buf_desc->tag.blockNum); @@ -521,12 +458,6 @@ Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, return buffer; } - if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) { - buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - *with_io = false; - return InvalidBuffer; - } - if (!DmsCheckBufAccessible()) { *with_io = false; return 0; @@ -557,12 +488,6 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo return buffer; } - if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) { - buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - *with_io = false; - return InvalidBuffer; - } - XLogPhyBlock pblk = {0, 0, 0}; if (OidIsValid(buf_ctrl->pblk_relno)) { Assert(ExtentTypeIsValid(buf_ctrl->pblk_relno)); @@ -806,19 +731,6 @@ void SSOndemandClearRedoDoneState() } } -static void SSOndemandCheckBufferState() -{ - for (int buffer = 0; buffer < TOTAL_BUFFER_NUM; buffer++) { - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer); - Assert(!(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)); - - // realtime build pinned buffer are already mark dirty in CBFlushCopy, do not need label - if (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED) { - buf_ctrl->state &= ~BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED; - } - } -} - void SSRecheckBufferPool() { uint64 buf_state; @@ -849,7 +761,6 @@ void SSRecheckBufferPool() buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (unsigned long long)pagelsn))); } } - SSOndemandCheckBufferState(); } bool CheckPageNeedSkipInRecovery(Buffer buf, uint64 xlogLsn) @@ -1150,26 +1061,6 @@ bool SSWaitIOTimeout(BufferDesc *buf) return ret; } -bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf_desc) -{ - if (!ENABLE_DMS || IsInitdb) { - return true; - } - - dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id); - if (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED) { - if (!SS_ONDEMAND_REALTIME_BUILD_DISABLED && IsExtremeRtoRunning()) { - XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr); - XLogRecPtr bufferLsn = PageGetLSN(BufHdrGetBlock(buf_desc)); - if (XLByteLT(ckptRedoPtr, bufferLsn)) { - return false; - } - } - buf_ctrl->state &= ~BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED; - } - return true; -} - Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode) { Buffer buffer; @@ -1200,11 +1091,6 @@ bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl) if ((AmPageRedoProcess() || AmStartupProcess()) && dms_reform_failed()) { return true; } - - if (AmPageRedoProcess() && SS_IN_REFORM && (buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) { - buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - return true; - } return false; } diff --git a/src/gausskernel/storage/buffer/bufmgr.cpp b/src/gausskernel/storage/buffer/bufmgr.cpp index 912a33b32..93a7f0894 100644 --- a/src/gausskernel/storage/buffer/bufmgr.cpp +++ b/src/gausskernel/storage/buffer/bufmgr.cpp @@ -2533,7 +2533,6 @@ Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber fork bool isExtend = false; bool isLocalBuf = SmgrIsTemp(smgr); bool need_repair = false; - dms_buf_ctrl_t *buf_ctrl = NULL; *hit = false; @@ -2606,13 +2605,6 @@ Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber fork } } - if (ENABLE_DMS) { - buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id); - if (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) { - buf_ctrl->state |= BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - } - } - found_branch: /* At this point we do NOT hold any locks. * @@ -2641,7 +2633,7 @@ found_branch: if (!isLocalBuf) { if (mode == RBM_ZERO_AND_LOCK) { if (ENABLE_DMS) { - buf_ctrl->state |= BUF_READ_MODE_ZERO_LOCK; + GetDmsBufCtrl(bufHdr->buf_id)->state |= BUF_READ_MODE_ZERO_LOCK; LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_EXCLUSIVE); } else { LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE); @@ -2657,7 +2649,7 @@ found_branch: BufferDescSetPBLK(bufHdr, pblk); } else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) { if (ENABLE_DMS) { - buf_ctrl->state |= BUF_READ_MODE_ZERO_LOCK; + GetDmsBufCtrl(bufHdr->buf_id)->state |= BUF_READ_MODE_ZERO_LOCK; } LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); } @@ -2761,6 +2753,7 @@ found_branch: goto found_branch; } + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id); LWLockMode req_lock_mode = isExtend ? LW_EXCLUSIVE : LW_SHARED; if (!LockModeCompatible(buf_ctrl, req_lock_mode)) { if (!StartReadPage(bufHdr, req_lock_mode)) { @@ -2784,13 +2777,7 @@ found_branch: break; } while (true); - Buffer tmp_buffer = TerminateReadPage(bufHdr, mode, pblk); - if (BufferIsInvalid(tmp_buffer) && (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && - !(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) { - SSUnPinBuffer(bufHdr); - return InvalidBuffer; - } - return tmp_buffer; + return TerminateReadPage(bufHdr, mode, pblk); } ClearReadHint(bufHdr->buf_id); } @@ -3091,7 +3078,7 @@ retry: /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); - if (!SSPageCheckIfCanEliminate(buf, old_flags) || !SSOndemandRealtimeBuildAllowFlush(buf)) { + if (!SSPageCheckIfCanEliminate(buf, old_flags)) { // for dms this page cannot eliminate, get another one UnpinBuffer(buf, true); continue; @@ -6299,9 +6286,6 @@ retry: read_mode = RBM_ZERO_AND_LOCK; buf_ctrl->state &= ~BUF_READ_MODE_ZERO_LOCK; } - if (buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD) { - read_mode = RBM_FOR_ONDEMAND_REALTIME_BUILD; - } bool with_io_in_progress = true; if (IsSegmentBufferID(buf->buf_id)) { @@ -6331,11 +6315,6 @@ retry: g_instance.dms_cxt.SSRecoveryInfo.recovery_trapped_in_page_request = true; } - if ((read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && - !(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) { - return; - } - if (!DmsCheckBufAccessible()) { dms_retry_times = 1; } else { @@ -6352,9 +6331,6 @@ retry: tag->forkNum, tag->blockNum, buf->buf_id)))); t_thrd.postgres_cxt.whereToSendOutput = output_backup; } - if (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) { - sleep_time = SS_BUF_WAIT_TIME_IN_ONDEMAND_REALTIME_BUILD; - } pg_usleep(sleep_time); goto retry; } diff --git a/src/gausskernel/storage/smgr/segment/segbuffer.cpp b/src/gausskernel/storage/smgr/segment/segbuffer.cpp index 0a2e900a7..fca49c654 100644 --- a/src/gausskernel/storage/smgr/segment/segbuffer.cpp +++ b/src/gausskernel/storage/smgr/segment/segbuffer.cpp @@ -565,20 +565,12 @@ Buffer ReadSegBufferForDMS(BufferDesc* bufHdr, ReadBufferMode mode, SegSpace *sp Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode) { bool found = false; - dms_buf_ctrl_t *buf_ctrl; /* Make sure we will have room to remember the buffer pin */ ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner); BufferDesc *bufHdr = SegBufferAlloc(spc, rnode, forkNum, blockNum, &found); - if (ENABLE_DMS) { - buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id); - if (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) { - buf_ctrl->state |= BUF_READ_MODE_ONDEMAND_REALTIME_BUILD; - } - } - if (!found) { SegmentCheck(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID)); @@ -601,6 +593,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc goto found_branch; } + dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id); LWLockMode lockmode = LW_SHARED; if (!LockModeCompatible(buf_ctrl, lockmode)) { if (!StartReadPage(bufHdr, lockmode)) { @@ -624,13 +617,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc break; } while (true); - Buffer tmp_buffer = TerminateReadSegPage(bufHdr, mode, spc); - if (BufferIsInvalid(tmp_buffer) && (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && - !(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) { - SSUnPinBuffer(bufHdr); - return InvalidBuffer; - } - return tmp_buffer; + return TerminateReadSegPage(bufHdr, mode, spc); } if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO) { @@ -744,7 +731,7 @@ retry: SegPinBufferLocked(buf, &new_tag); - if (!SSPageCheckIfCanEliminate(buf, old_flags) || !SSOndemandRealtimeBuildAllowFlush(buf)) { + if (!SSPageCheckIfCanEliminate(buf, old_flags)) { SegUnpinBuffer(buf); continue; } diff --git a/src/include/ddes/dms/ss_common_attr.h b/src/include/ddes/dms/ss_common_attr.h index 492ac996b..378aba81d 100644 --- a/src/include/ddes/dms/ss_common_attr.h +++ b/src/include/ddes/dms/ss_common_attr.h @@ -129,11 +129,7 @@ #define BUF_READ_MODE_ZERO_LOCK 0x80 #define BUF_DIRTY_NEED_FLUSH 0x100 #define BUF_ERTO_NEED_MARK_DIRTY 0x200 - -#define BUF_READ_MODE_ONDEMAND_REALTIME_BUILD 0x400 -/* mark buffer is pinned in ondemand realtime build, which do not allow eliminated */ -#define BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED 0x800 -#define BUF_ONDEMAND_REDO_DONE 0x1000 +#define BUF_ONDEMAND_REDO_DONE 0x400 #define SS_BROADCAST_FAILED_RETRYCOUNTS 4 #define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFFU) diff --git a/src/include/ddes/dms/ss_dms_bufmgr.h b/src/include/ddes/dms/ss_dms_bufmgr.h index 4e56ed596..152261a4e 100644 --- a/src/include/ddes/dms/ss_dms_bufmgr.h +++ b/src/include/ddes/dms/ss_dms_bufmgr.h @@ -83,7 +83,6 @@ bool SSWaitIOTimeout(BufferDesc *buf); void buftag_get_buf_info(BufferTag tag, stat_buf_info_t *buf_info); Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode); void DmsReleaseBuffer(int buffer, bool is_seg); -bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf); bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl); void ForgetBufferNeedCheckPin(Buffer buf_id); diff --git a/src/include/storage/buf/bufmgr.h b/src/include/storage/buf/bufmgr.h index e9828fe75..796200213 100644 --- a/src/include/storage/buf/bufmgr.h +++ b/src/include/storage/buf/bufmgr.h @@ -76,9 +76,6 @@ typedef enum { RBM_NORMAL_NO_LOG, /* Don't log page as invalid during WAL * replay; otherwise same as RBM_NORMAL */ RBM_FOR_REMOTE, /* Like RBM_NORMAL, but not remote read again when PageIsVerified failed. */ - RBM_FOR_ONDEMAND_REALTIME_BUILD /* Like RBM_NORMAL, only used in ondemand realtime time - * build (shared storage mode), need newest page by DMS, - * but do not load from disk */ } ReadBufferMode; typedef enum