删除按需回放实时构建中无效代码

This commit is contained in:
chendong76
2024-06-15 11:26:59 +08:00
committed by yaoxin
parent c7edbf6da4
commit 76b94a051f
6 changed files with 11 additions and 170 deletions

View File

@ -33,7 +33,6 @@
#include "securec_check.h"
#include "miscadmin.h"
#include "access/double_write.h"
#include "access/ondemand_extreme_rto/dispatcher.h"
#include "access/multi_redo_api.h"
void InitDmsBufCtrl(void)
@ -231,11 +230,6 @@ RETRY:
void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) {
return;
}
/*
* prerequisite is that the page that initialized to zero in memory should be flush to disk
*/
@ -282,17 +276,6 @@ void SmgrNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, con
}
#endif
static Buffer ReadBufferInRealtimeBuildFailoverForDMS(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{
Page page = (Page)BufHdrGetBlock(buf_desc);
XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr);
if (XLByteLT(ckptRedoPtr, PageGetLSN(page))) {
return BufferDescriptorGetBuffer(buf_desc);
} else {
return ReadBuffer_common_for_dms(read_mode, buf_desc, pblk);
}
}
Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const XLogPhyBlock *pblk)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
@ -301,21 +284,7 @@ Buffer TerminateReadPage(BufferDesc* buf_desc, ReadBufferMode read_mode, const X
if (g_instance.dms_cxt.SSRecoveryInfo.in_flushcopy && AmDmsReformProcProcess()) {
ereport(PANIC, (errmsg("SS In flush copy, can't read from disk!")));
}
/*
* do not allow pageredo workers read buffer from disk if standby node in ondemand
* realtime build status, because some buffer need init directly in recovery mode
*/
if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) {
buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
buffer = InvalidBuffer;
} else {
if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (read_mode == RBM_NORMAL) &&
(buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) {
buffer = ReadBufferInRealtimeBuildFailoverForDMS(buf_desc, read_mode, pblk);
} else {
buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk);
}
}
buffer = ReadBuffer_common_for_dms(read_mode, buf_desc, pblk);
} else {
#ifdef USE_ASSERT_CHECKING
if (buf_ctrl->state & BUF_IS_EXTEND) {
@ -427,11 +396,6 @@ static bool DmsStartBufferIO(BufferDesc *buf_desc, LWLockMode mode)
#ifdef USE_ASSERT_CHECKING
void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) {
return;
}
/*
* prequisite is that the page that initialized to zero in memory should be flushed to disk,
* references to seg_extend
@ -455,39 +419,12 @@ void SegNetPageCheckDiskLSN(BufferDesc *buf_desc, ReadBufferMode read_mode, SegS
}
#endif
static Buffer ReadSegBufferInRealtimeBuildFailoverForDMS(BufferDesc* buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{
Page page = (Page)BufHdrGetBlock(buf_desc);
XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr);
if (XLByteLT(ckptRedoPtr, PageGetLSN(page))) {
SegTerminateBufferIO(buf_desc, false, BM_VALID);
return BufferDescriptorGetBuffer(buf_desc);
} else {
return ReadSegBufferForDMS(buf_desc, read_mode, spc);
}
}
Buffer TerminateReadSegPage(BufferDesc *buf_desc, ReadBufferMode read_mode, SegSpace *spc)
{
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
Buffer buffer;
if (buf_ctrl->state & BUF_NEED_LOAD) {
/*
* do not allow pageredo workers read buffer from disk if standby node in ondemand
* realtime build status, because some buffer need init directly in recovery mode
*/
if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) {
buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
SegTerminateBufferIO(buf_desc, false, BM_VALID);
buffer = InvalidBuffer;
} else {
if (SS_ONDEMAND_REALTIME_BUILD_FAILOVER && (read_mode == RBM_NORMAL) &&
(buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED)) {
buffer = ReadSegBufferInRealtimeBuildFailoverForDMS(buf_desc, read_mode, spc);
} else {
buffer = ReadSegBufferForDMS(buf_desc, read_mode, spc);
}
}
buffer = ReadSegBufferForDMS(buf_desc, read_mode, spc);
} else {
Page page = (Page)BufHdrGetBlock(buf_desc);
PageSetChecksumInplace(page, buf_desc->tag.blockNum);
@ -521,12 +458,6 @@ Buffer DmsReadSegPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode,
return buffer;
}
if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) {
buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
*with_io = false;
return InvalidBuffer;
}
if (!DmsCheckBufAccessible()) {
*with_io = false;
return 0;
@ -557,12 +488,6 @@ Buffer DmsReadPage(Buffer buffer, LWLockMode mode, ReadBufferMode read_mode, boo
return buffer;
}
if (unlikely(AmPageRedoWorker() && (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) && SS_IN_REFORM)) {
buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
*with_io = false;
return InvalidBuffer;
}
XLogPhyBlock pblk = {0, 0, 0};
if (OidIsValid(buf_ctrl->pblk_relno)) {
Assert(ExtentTypeIsValid(buf_ctrl->pblk_relno));
@ -806,19 +731,6 @@ void SSOndemandClearRedoDoneState()
}
}
static void SSOndemandCheckBufferState()
{
for (int buffer = 0; buffer < TOTAL_BUFFER_NUM; buffer++) {
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buffer);
Assert(!(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD));
// realtime build pinned buffer are already mark dirty in CBFlushCopy, do not need label
if (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED) {
buf_ctrl->state &= ~BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED;
}
}
}
void SSRecheckBufferPool()
{
uint64 buf_state;
@ -849,7 +761,6 @@ void SSRecheckBufferPool()
buf_desc->tag.rnode.bucketNode, buf_desc->tag.forkNum, buf_desc->tag.blockNum, (unsigned long long)pagelsn)));
}
}
SSOndemandCheckBufferState();
}
bool CheckPageNeedSkipInRecovery(Buffer buf, uint64 xlogLsn)
@ -1150,26 +1061,6 @@ bool SSWaitIOTimeout(BufferDesc *buf)
return ret;
}
bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf_desc)
{
if (!ENABLE_DMS || IsInitdb) {
return true;
}
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(buf_desc->buf_id);
if (buf_ctrl->state & BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED) {
if (!SS_ONDEMAND_REALTIME_BUILD_DISABLED && IsExtremeRtoRunning()) {
XLogRecPtr ckptRedoPtr = pg_atomic_read_u64(&ondemand_extreme_rto::g_dispatcher->ckptRedoPtr);
XLogRecPtr bufferLsn = PageGetLSN(BufHdrGetBlock(buf_desc));
if (XLByteLT(ckptRedoPtr, bufferLsn)) {
return false;
}
}
buf_ctrl->state &= ~BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED;
}
return true;
}
Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode)
{
Buffer buffer;
@ -1200,11 +1091,6 @@ bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl)
if ((AmPageRedoProcess() || AmStartupProcess()) && dms_reform_failed()) {
return true;
}
if (AmPageRedoProcess() && SS_IN_REFORM && (buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) {
buf_ctrl->state &= ~BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
return true;
}
return false;
}

View File

@ -2533,7 +2533,6 @@ Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber fork
bool isExtend = false;
bool isLocalBuf = SmgrIsTemp(smgr);
bool need_repair = false;
dms_buf_ctrl_t *buf_ctrl = NULL;
*hit = false;
@ -2606,13 +2605,6 @@ Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber fork
}
}
if (ENABLE_DMS) {
buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id);
if (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) {
buf_ctrl->state |= BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
}
}
found_branch:
/* At this point we do NOT hold any locks.
*
@ -2641,7 +2633,7 @@ found_branch:
if (!isLocalBuf) {
if (mode == RBM_ZERO_AND_LOCK) {
if (ENABLE_DMS) {
buf_ctrl->state |= BUF_READ_MODE_ZERO_LOCK;
GetDmsBufCtrl(bufHdr->buf_id)->state |= BUF_READ_MODE_ZERO_LOCK;
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_EXCLUSIVE);
} else {
LWLockAcquire(bufHdr->content_lock, LW_EXCLUSIVE);
@ -2657,7 +2649,7 @@ found_branch:
BufferDescSetPBLK(bufHdr, pblk);
} else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) {
if (ENABLE_DMS) {
buf_ctrl->state |= BUF_READ_MODE_ZERO_LOCK;
GetDmsBufCtrl(bufHdr->buf_id)->state |= BUF_READ_MODE_ZERO_LOCK;
}
LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
}
@ -2761,6 +2753,7 @@ found_branch:
goto found_branch;
}
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id);
LWLockMode req_lock_mode = isExtend ? LW_EXCLUSIVE : LW_SHARED;
if (!LockModeCompatible(buf_ctrl, req_lock_mode)) {
if (!StartReadPage(bufHdr, req_lock_mode)) {
@ -2784,13 +2777,7 @@ found_branch:
break;
} while (true);
Buffer tmp_buffer = TerminateReadPage(bufHdr, mode, pblk);
if (BufferIsInvalid(tmp_buffer) && (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) &&
!(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) {
SSUnPinBuffer(bufHdr);
return InvalidBuffer;
}
return tmp_buffer;
return TerminateReadPage(bufHdr, mode, pblk);
}
ClearReadHint(bufHdr->buf_id);
}
@ -3091,7 +3078,7 @@ retry:
/* Pin the buffer and then release the buffer spinlock */
PinBuffer_Locked(buf);
if (!SSPageCheckIfCanEliminate(buf, old_flags) || !SSOndemandRealtimeBuildAllowFlush(buf)) {
if (!SSPageCheckIfCanEliminate(buf, old_flags)) {
// for dms this page cannot eliminate, get another one
UnpinBuffer(buf, true);
continue;
@ -6299,9 +6286,6 @@ retry:
read_mode = RBM_ZERO_AND_LOCK;
buf_ctrl->state &= ~BUF_READ_MODE_ZERO_LOCK;
}
if (buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD) {
read_mode = RBM_FOR_ONDEMAND_REALTIME_BUILD;
}
bool with_io_in_progress = true;
if (IsSegmentBufferID(buf->buf_id)) {
@ -6331,11 +6315,6 @@ retry:
g_instance.dms_cxt.SSRecoveryInfo.recovery_trapped_in_page_request = true;
}
if ((read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) &&
!(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) {
return;
}
if (!DmsCheckBufAccessible()) {
dms_retry_times = 1;
} else {
@ -6352,9 +6331,6 @@ retry:
tag->forkNum, tag->blockNum, buf->buf_id))));
t_thrd.postgres_cxt.whereToSendOutput = output_backup;
}
if (read_mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) {
sleep_time = SS_BUF_WAIT_TIME_IN_ONDEMAND_REALTIME_BUILD;
}
pg_usleep(sleep_time);
goto retry;
}

View File

@ -565,20 +565,12 @@ Buffer ReadSegBufferForDMS(BufferDesc* bufHdr, ReadBufferMode mode, SegSpace *sp
Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode)
{
bool found = false;
dms_buf_ctrl_t *buf_ctrl;
/* Make sure we will have room to remember the buffer pin */
ResourceOwnerEnlargeBuffers(t_thrd.utils_cxt.CurrentResourceOwner);
BufferDesc *bufHdr = SegBufferAlloc(spc, rnode, forkNum, blockNum, &found);
if (ENABLE_DMS) {
buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id);
if (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) {
buf_ctrl->state |= BUF_READ_MODE_ONDEMAND_REALTIME_BUILD;
}
}
if (!found) {
SegmentCheck(!(pg_atomic_read_u64(&bufHdr->state) & BM_VALID));
@ -601,6 +593,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
goto found_branch;
}
dms_buf_ctrl_t *buf_ctrl = GetDmsBufCtrl(bufHdr->buf_id);
LWLockMode lockmode = LW_SHARED;
if (!LockModeCompatible(buf_ctrl, lockmode)) {
if (!StartReadPage(bufHdr, lockmode)) {
@ -624,13 +617,7 @@ Buffer ReadBufferFast(SegSpace *spc, RelFileNode rnode, ForkNumber forkNum, Bloc
break;
} while (true);
Buffer tmp_buffer = TerminateReadSegPage(bufHdr, mode, spc);
if (BufferIsInvalid(tmp_buffer) && (mode == RBM_FOR_ONDEMAND_REALTIME_BUILD) &&
!(buf_ctrl->state & BUF_READ_MODE_ONDEMAND_REALTIME_BUILD)) {
SSUnPinBuffer(bufHdr);
return InvalidBuffer;
}
return tmp_buffer;
return TerminateReadSegPage(bufHdr, mode, spc);
}
if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK || mode == RBM_ZERO) {
@ -744,7 +731,7 @@ retry:
SegPinBufferLocked(buf, &new_tag);
if (!SSPageCheckIfCanEliminate(buf, old_flags) || !SSOndemandRealtimeBuildAllowFlush(buf)) {
if (!SSPageCheckIfCanEliminate(buf, old_flags)) {
SegUnpinBuffer(buf);
continue;
}

View File

@ -129,11 +129,7 @@
#define BUF_READ_MODE_ZERO_LOCK 0x80
#define BUF_DIRTY_NEED_FLUSH 0x100
#define BUF_ERTO_NEED_MARK_DIRTY 0x200
#define BUF_READ_MODE_ONDEMAND_REALTIME_BUILD 0x400
/* mark buffer is pinned in ondemand realtime build, which do not allow eliminated */
#define BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED 0x800
#define BUF_ONDEMAND_REDO_DONE 0x1000
#define BUF_ONDEMAND_REDO_DONE 0x400
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFFU)

View File

@ -83,7 +83,6 @@ bool SSWaitIOTimeout(BufferDesc *buf);
void buftag_get_buf_info(BufferTag tag, stat_buf_info_t *buf_info);
Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode);
void DmsReleaseBuffer(int buffer, bool is_seg);
bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf);
bool SSNeedTerminateRequestPageInReform(dms_buf_ctrl_t *buf_ctrl);
void ForgetBufferNeedCheckPin(Buffer buf_id);

View File

@ -76,9 +76,6 @@ typedef enum {
RBM_NORMAL_NO_LOG, /* Don't log page as invalid during WAL
* replay; otherwise same as RBM_NORMAL */
RBM_FOR_REMOTE, /* Like RBM_NORMAL, but not remote read again when PageIsVerified failed. */
RBM_FOR_ONDEMAND_REALTIME_BUILD /* Like RBM_NORMAL, only used in ondemand realtime time
* build (shared storage mode), need newest page by DMS,
* but do not load from disk */
} ReadBufferMode;
typedef enum