fix high system load for reading clients with hot standby

This commit is contained in:
LiHeng
2020-11-22 22:58:29 +08:00
parent 8830ce04eb
commit 6dfb1d9a03
4 changed files with 56 additions and 30 deletions

View File

@ -115,8 +115,9 @@ static inline int32 GetPrivateRefCount(Buffer buffer);
static void ForgetPrivateRefCountEntry(PrivateRefCountEntry* ref);
static void CheckForBufferLeaks(void);
static int ts_ckpt_progress_comparator(Datum a, Datum b, void* arg);
static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode, bool isExtend, Block bufBlock);
static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence,
ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, bool isExtend,
Block bufBlock, bool *blockExist);
/*
@ -1839,8 +1840,12 @@ Buffer ReadBuffer_common_for_localbuf(RelFileNode rnode, char relpersistence, Fo
bufBlock = LocalBufHdrGetBlock(bufHdr);
// ReadBuffer_common_ReadBlock();
(void)ReadBuffer_common_ReadBlock(smgr, relpersistence, forkNum, blockNum, mode, isExtend, bufBlock);
bool block_exist = true;
(void)ReadBuffer_common_ReadBlock(smgr, relpersistence, forkNum, blockNum, mode,
isExtend, bufBlock, &block_exist);
if (!block_exist) {
return InvalidBuffer;
}
uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
buf_state |= BM_VALID;
@ -1872,7 +1877,12 @@ Buffer ReadBuffer_common_for_direct(
XLogRedoBufferGetBlkFunc(bufferslot, &bufBlock);
Assert(bufBlock != NULL);
(void)ReadBuffer_common_ReadBlock(smgr, relpersistence, forkNum, blockNum, mode, isExtend, bufBlock);
bool block_exist = true;
(void)ReadBuffer_common_ReadBlock(smgr, relpersistence, forkNum, blockNum, mode,
isExtend, bufBlock, &block_exist);
if (!block_exist) {
return InvalidBuffer;
}
XLogRedoBufferSetStateFunc(bufferslot, BM_VALID);
return RedoBufferSlotGetBuffer(bufferslot);
}
@ -1883,7 +1893,7 @@ Buffer ReadBuffer_common_for_direct(
* * 2020-03-05
*/
static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode, bool isExtend, Block bufBlock)
BlockNumber blockNum, ReadBufferMode mode, bool isExtend, Block bufBlock, bool *blockExist)
{
bool needputtodirty = false;
@ -1928,7 +1938,7 @@ static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence,
INSTR_TIME_SET_CURRENT(io_start);
smgrread(smgr, forkNum, blockNum, (char*)bufBlock);
*blockExist = smgrread(smgr, forkNum, blockNum, (char*)bufBlock);
if (u_sess->attr.attr_common.track_io_timing) {
INSTR_TIME_SET_CURRENT(io_time);
@ -1942,6 +1952,13 @@ static bool ReadBuffer_common_ReadBlock(SMgrRelation smgr, char relpersistence,
pgstatCountBlocksReadTime4SessionLevel(INSTR_TIME_GET_MICROSEC(io_time));
}
#ifndef ENABLE_MULTIPLE_NODES
/* Block not exists */
if (!(*blockExist)) {
return false;
}
#endif
/* check for garbage data */
if (!PageIsVerified((Page)bufBlock, blockNum)) {
addBadBlockStat(&smgr->smgr_rnode.node, forkNum);
@ -2021,22 +2038,6 @@ static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumb
if (is_extend)
block_num = smgrnblocks(smgr, fork_num);
/* When the parallel redo is enabled, there may be a scenario where
* the index is replayed before the page replayed. For single-mode,
* readable standby feature, Operators related to index scan access
* the index first, then access the table, and you will find that
* the tid or the heap(page) does not exist. Because the transaction
* was not originally committed, the tid or the heap(page) should not
* be visible. So accessing the non-existent heap tuple by the tid
* should return that the tuple does not exist without error reporting.
*/
#ifndef ENABLE_MULTIPLE_NODES
else if(RecoveryInProgress()) {
if(block_num >= smgrnblocks(smgr, fork_num))
return InvalidBuffer;
}
#endif
if (isLocalBuf) {
buf_desc = LocalBufferAlloc(smgr, fork_num, block_num, &found);
if (found) {
@ -2166,8 +2167,24 @@ static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumb
buf_block = isLocalBuf ? LocalBufHdrGetBlock(buf_desc) : BufHdrGetBlock(buf_desc);
bool block_exist = true;
bool needputtodirty =
ReadBuffer_common_ReadBlock(smgr, relpersistence, fork_num, block_num, mode, is_extend, buf_block);
ReadBuffer_common_ReadBlock(smgr, relpersistence, fork_num, block_num,
mode, is_extend, buf_block, &block_exist);
/*
* When the parallel redo is enabled, there may be a scenario where
* the index is replayed before the page replayed. For single-mode,
* readable standby feature, Operators related to index scan access
* the index first, then access the table, and you will find that
* the tid or the heap(page) does not exist. Because the transaction
* was not originally committed, the tid or the heap(page) should not
* be visible. So accessing the non-existent heap tuple by the tid
* should return that the tuple does not exist without error reporting.
*/
if (!block_exist) {
return InvalidBuffer;
}
if (needputtodirty) {
/* set BM_DIRTY to overwrite later */
uint32 old_buf_state = LockBufHdr(buf_desc);

View File

@ -968,7 +968,7 @@ do {\
/*
* mdread() -- Read the specified block from a relation.
*/
void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer)
bool mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer)
{
off_t seekpos;
int nbytes;
@ -1058,6 +1058,12 @@ void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* b
}
if (nbytes != BLCKSZ) {
#ifndef ENABLE_MULTIPLE_NODES
if(RecoveryInProgress()) {
return false;
}
#endif
if (nbytes < 0) {
ereport(ERROR,
(errcode_for_file_access(),
@ -1085,6 +1091,9 @@ void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* b
BLCKSZ)));
}
}
/* Target block exists */
return true;
}
/*

View File

@ -50,7 +50,7 @@ typedef struct f_smgr {
void (*smgr_extend)(
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
void (*smgr_prefetch)(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum);
void (*smgr_read)(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
bool (*smgr_read)(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
void (*smgr_write)(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
void (*smgr_writeback)(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks);
BlockNumber (*smgr_nblocks)(SMgrRelation reln, ForkNumber forknum);
@ -560,9 +560,9 @@ void smgrasyncwrite(SMgrRelation reln, ForkNumber forknum, AioDispatchDesc_t** d
* instantiate pages in the shared buffer cache. All storage managers
* return pages in the format that POSTGRES expects.
*/
void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer)
bool smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer)
{
(*(g_smgrsw[reln->smgr_which].smgr_read))(reln, forknum, blocknum, buffer);
return (*(g_smgrsw[reln->smgr_which].smgr_read))(reln, forknum, blocknum, buffer);
}
/*

View File

@ -95,7 +95,7 @@ extern void smgrdounlink(SMgrRelation reln, bool isRedo);
extern void smgrdounlinkfork(SMgrRelation reln, ForkNumber forknum, bool isRedo);
extern void smgrextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum);
extern void smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
extern bool smgrread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@ -117,7 +117,7 @@ extern bool mdexists(SMgrRelation reln, ForkNumber forknum);
extern void mdunlink(const RelFileNodeBackend& rnode, ForkNumber forknum, bool isRedo);
extern void mdextend(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
extern void mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
extern bool mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char* buffer);
extern void mdwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const char* buffer, bool skipFsync);
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks);
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);