新增按需回放极致RTO基线

This commit is contained in:
chendong76
2023-06-14 09:36:32 +08:00
committed by zhang_xubo
parent b53839900e
commit fa8a8ce8de
45 changed files with 9846 additions and 1183 deletions

View File

@ -23,8 +23,8 @@
* ---------------------------------------------------------------------------------------
*/
#ifndef BATCH_REDO_H
#define BATCH_REDO_H
#ifndef EXTREME_RTO_BATCH_REDO_H
#define EXTREME_RTO_BATCH_REDO_H
#include "c.h"
#include "storage/buf/block.h"
@ -69,4 +69,4 @@ extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoIt
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
} // namespace extreme_rto
#endif /* BATCH_REDO_H */
#endif /* EXTREME_RTO_BATCH_REDO_H */

View File

@ -70,15 +70,6 @@ typedef enum {
WORKER_STATE_EXITING,
} ReadWorkersState;
typedef enum {
TRIGGER_NORMAL = 0,
TRIGGER_PRIMARY,
TRIGGER_STADNBY,
TRIGGER_FAILOVER,
TRIGGER_SWITCHOVER,
TRIGGER_SMARTSHUTDOWN,
} Enum_TriggeredState;
typedef enum {
NONE,
APPLYING,
@ -193,7 +184,6 @@ const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
const static uint64 PRINT_ALL_WAIT_COUNT = 0x7FFFFFFFF;
extern RedoItem g_redoEndMark;
extern RedoItem g_terminateMark;
extern uint32 g_startupTriggerState;
extern uint32 g_readManagerTriggerFlag;
inline int get_batch_redo_num()
@ -251,13 +241,11 @@ void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
void StartupSendFowarder(RedoItem *item);
XLogRecPtr GetSafeMinCheckPoint();
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_wroker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void CheckCommittingCsnList();
void redo_get_wroker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void DumpDispatcher();
} // namespace extreme_rto
extreme_rto::Enum_TriggeredState CheckForSatartupStatus(void);
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* xlog_read.h
*
*
*
* IDENTIFICATION
* src/include/access/extreme_rto/xlog_read.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef EXTREME_RTO_XLOG_READ_H
#define EXTREME_RTO_XLOG_READ_H
#include "access/xlog_basic.h"
/* XLOG record reading entry points of the extreme_rto namespace; the definitions live outside this header. */
namespace extreme_rto {
/*
* Declared to return an XLogRecord pointer for the given reader state.
* NOTE(review): presumably the next record, with NULL on failure -- confirm against the implementation.
*/
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
/*
* Takes the reader state by pointer-to-pointer, so the callee is able to replace the caller's reader.
* NOTE(review): emode is presumably an elog level used when reporting read problems -- confirm.
*/
XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
} // namespace extreme_rto
#endif /* EXTREME_RTO_XLOG_READ_H */

View File

@ -0,0 +1,77 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* extreme_rto_redo_api.h
*
*
* IDENTIFICATION
* src/include/access/extreme_rto_redo_api.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef EXTREME_RTO_REDO_API_H
#define EXTREME_RTO_REDO_API_H
#include "access/xlogproc.h"
#include "access/redo_statistic.h"
/* Kinds of extreme RTO redo: the default one and the on-demand one. */
typedef enum {
    DEFAULT_EXTREME_RTO = 0,
    ONDEMAND_EXTREME_RTO = 1
} ExtremeRtoRedoType;
/*
* Global of type ExtremeRtoRedoType; it is only declared here and defined in another translation unit.
* NOTE(review): presumably records which redo type is active -- confirm.
*/
extern ExtremeRtoRedoType g_extreme_rto_type;
/*
* Extreme* entry points. Most names below are an "Extreme" prefix plus the name of a function that the
* extreme_rto / ondemand_extreme_rto headers declare inside their namespaces (for example
* WaitAllReplayWorkerIdle or GetSafeMinCheckPoint).
* NOTE(review): presumably each one forwards to the namespace selected by g_extreme_rto_type -- confirm
* against the implementation file.
*/
void ExtremeWaitAllReplayWorkerIdle();
void ExtremeDispatchCleanInvalidPageMarkToAllRedoWorker(RepairFileKey key);
void ExtremeDispatchClosefdMarkToAllRedoWorker();
void ExtremeRecordBadBlockAndPushToRemote(XLogBlockDataParse *datadecode, PageErrorType error_type,
XLogRecPtr old_lsn, XLogPhyBlock pblk);
void ExtremeCheckCommittingCsnList();
XLogRecord *ExtremeReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
/* The doubled prefix is the "Extreme" wrapper prefix applied to ExtremeRtoStopHere. */
void ExtremeExtremeRtoStopHere();
void ExtremeWaitAllRedoWorkerQueueEmpty();
XLogRecPtr ExtremeGetSafeMinCheckPoint();
void ExtremeClearRecoveryThreadHashTbl(const RelFileNode &node, ForkNumber forknum, BlockNumber minblkno,
bool segment_shrink);
void ExtremeBatchClearRecoveryThreadHashTbl(Oid spcNode, Oid dbNode);
bool ExtremeRedoWorkerIsUndoSpaceWorker();
void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen);
void ExtremeDispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
void ExtremeGetThreadNameIfPageRedoWorker(int argc, char *argv[], char **threadNamePtr);
PGPROC *ExtremeStartupPidGetProc(ThreadId pid);
void ExtremeUpdateStandbyState(HotStandbyState newState);
void ExtremeUpdateMinRecoveryForTrxnRedoThd(XLogRecPtr newMinRecoveryPoint);
uint32 ExtremeGetMyPageRedoWorkerIdWithLock();
void ExtremeParallelRedoThreadMain();
void ExtremeFreeAllocatedRedoItem();
uint32 ExtremeGetAllWorkerCount();
void **ExtremeGetXLogInvalidPagesFromWorkers();
void ExtremeSendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
/* Statistics accessors; results are returned by value or through the out-parameters. */
RedoWaitInfo ExtremeRedoGetIoEvent(int32 event_id);
void ExtremeRedoGetWorkerStatistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void ExtremeRedoGetWorkerTimeCount(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void ExtremeEndDispatcherContext();
void ExtremeSetPageRedoWorkerIndex(int index);
int ExtremeGetPageRedoWorkerIndex();
void ExtremeSetMyPageRedoWorker(knl_thread_arg *arg);
uint32 ExtremeGetMyPageRedoWorkerId();
/* Predicates and helpers whose names do not follow the plain "Extreme" + function-name scheme. */
bool IsExtremeMultiThreadRedoRunning();
bool IsExtremeRtoRunning();
bool IsExtremeRtoSmartShutdown();
void ExtremeRtoRedoManagerSendEndToStartup();
#endif

View File

@ -33,9 +33,7 @@
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/redo_statistic.h"
#include "access/extreme_rto_redo_api.h"
typedef enum {
NOT_PAGE_REDO_THREAD,
@ -44,6 +42,7 @@ typedef enum {
} PageRedoExitStatus;
extern bool g_supportHotStandby;
extern uint32 g_startupTriggerState;
const static bool SUPPORT_FPAGE_DISPATCH = true; /* support file dispatch if true, else support page dispatche */
const static bool SUPPORT_USTORE_UNDO_WORKER = true; /* support USTORE has undo redo worker, support page dispatch */
@ -88,7 +87,6 @@ static inline bool IsMultiThreadRedo()
uint32 GetRedoWorkerCount();
bool IsMultiThreadRedoRunning();
bool IsExtremeRtoRunning();
void DispatchRedoRecord(XLogReaderState* record, List* expectedTLIs, TimestampTz recordXTime);
void GetThreadNameIfMultiRedo(int argc, char* argv[], char** threadNamePtr);
@ -113,9 +111,7 @@ void FreeAllocatedRedoItem();
void** GetXLogInvalidPagesFromWorkers();
void SendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
RedoWaitInfo GetRedoIoEvent(int32 event_id);
void GetRedoWrokerStatistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
bool IsExtremeRtoSmartShutdown();
void ExtremeRtoRedoManagerSendEndToStartup();
void GetRedoWorkerStatistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
void CountXLogNumbers(XLogReaderState *record);
void ApplyRedoRecord(XLogReaderState* record);
void DiagLogRedoRecord(XLogReaderState *record, const char *funcName);

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* batch_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/batch_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_BATCH_REDO_H
#define ONDEMAND_EXTREME_RTO_BATCH_REDO_H
#include "c.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "lib/dllist.h"
#include "utils/hsearch.h"
#include "access/xlogproc.h"
#include "access/xlogutils.h"
/* Redo item hash table types and helpers of the ondemand_extreme_rto namespace. */
namespace ondemand_extreme_rto {
/*
* Page type values. These are preprocessor macros, so the enclosing namespace does not scope them.
*/
#define PAGETYPE_DROP 0x04
#define PAGETYPE_CREATE 0x02
#define PAGETYPE_TRUNCATE 0x01
#define PAGETYPE_MODIFY 0x00
/* NOTE(review): presumably the initial size used when the redo item hash table is created -- confirm. */
#define INITredoItemHashSIZE 1024
/* Assigns the rNode, forkNum and blockNum members of (a); evaluates to the last assignment. */
#define INIT_REDO_ITEM_TAG(a, xx_rnode, xx_forkNum, xx_blockNum) \
((a).rNode = (xx_rnode), (a).forkNum = (xx_forkNum), (a).blockNum = (xx_blockNum))
/*
* Note: if there are any pad bytes in the struct, INIT_REDO_ITEM_TAG has
* to be fixed to zero them, since this struct is used as a hash key.
*/
typedef struct redoitemtag {
RelFileNode rNode;
ForkNumber forkNum;
BlockNumber blockNum;
} RedoItemTag;
/*
* Hash table entry keyed by a RedoItemTag.
* NOTE(review): head/tail presumably delimit a list of parse states for the tagged block and redoItemNum
* counts its elements -- confirm.
*/
typedef struct redoitemhashentry {
RedoItemTag redoItemTag;
XLogRecParseState *head;
XLogRecParseState *tail;
int redoItemNum;
} RedoItemHashEntry;
/* Operations on the redo item hash table; all are defined outside this header. */
extern void PRPrintRedoItemHashTab(HTAB *redoItemHash);
extern HTAB *PRRedoItemHashInitialize(MemoryContext context);
extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
} // namespace ondemand_extreme_rto
#endif /* ONDEMAND_EXTREME_RTO_BATCH_REDO_H */

View File

@ -0,0 +1,251 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* dispatcher.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/dispatcher.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_DISPATCHER_H
#define ONDEMAND_EXTREME_RTO_DISPATCHER_H
#include "gs_thread.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/redo_statistic.h"
#include "access/ondemand_extreme_rto/redo_item.h"
#include "access/ondemand_extreme_rto/page_redo.h"
#include "access/ondemand_extreme_rto/txn_redo.h"
namespace ondemand_extreme_rto {
/*
* Workers that make up one page redo pipeline: a batch thread, a page redo manager and a pool of
* redo threads.
* NOTE(review): redoThdNum is presumably the number of entries in redoThd, and chosedRTCnt the number of
* valid entries in chosedRTIds -- confirm.
*/
typedef struct {
PageRedoWorker *batchThd; /* BatchRedoThread */
PageRedoWorker *managerThd; /* PageRedoManager */
PageRedoWorker **redoThd; /* RedoThreadPool */
uint32 redoThdNum;
uint32 *chosedRTIds; /* chosedRedoThdIds */
uint32 chosedRTCnt; /* chosedRedoThdCount */
} PageRedoPipeline;
/* Workers of the transaction redo pipeline: one manager and one worker. */
typedef struct {
PageRedoWorker *managerThd; /* TrxnRedoManager */
PageRedoWorker *redoThd; /* TrxnRedoWorker */
} TrxnRedoPipeline;
/*
* Workers of the read pipeline.
* NOTE(review): all three members carry the same "readthrd" comment; going by the member names they are
* presumably the read manager, the read-page worker and the read worker -- confirm.
*/
typedef struct ReadPipeline {
PageRedoWorker *managerThd; /* readthrd */
PageRedoWorker *readPageThd; /* readthrd */
PageRedoWorker *readThd; /* readthrd */
} ReadPipeline;
/*
* Limit related to the XLOG read buffer.
* NOTE(review): the trailing "8k uint" presumably means the value is counted in 8k units -- confirm.
*/
#define MAX_XLOG_READ_BUFFER (0xFFFFF) /* 8k uint */
/* States a read worker can be in; the numeric values are consecutive starting at 0. */
typedef enum {
    WORKER_STATE_STOP = 0,
    WORKER_STATE_RUN = 1,
    WORKER_STATE_STOPPING = 2,
    WORKER_STATE_EXIT = 3,
    WORKER_STATE_EXITING = 4
} ReadWorkersState;
/* States of a read buffer: not in use for apply, being applied, applied. */
typedef enum {
    NONE = 0,
    APPLYING = 1,
    APPLIED = 2
} ReadBufState;
/* Run state of the XLOG read manager. */
typedef enum {
    READ_MANAGER_STOP = 0,
    READ_MANAGER_RUN = 1
} XLogReadManagerState;
/*
* Descriptor of one XLOG segment buffer; RecordBufferState keeps an array of these in xlogsegarray.
* NOTE(review): bufState presumably holds a ReadBufState value stored as uint32 -- confirm.
*/
typedef struct RecordBufferAarray {
XLogSegNo segno;
XLogRecPtr segoffset;
uint32 readlen;
char *readsegbuf;
uint32 bufState;
} RecordBufferAarray;
/*
* LSN check bookkeeping, compiled only when USE_ASSERT_CHECKING is defined.
* lsnCheckBuf has LSN_CHECK_BUF_SIZE (128 * 1024 * 1024) uint32 slots, so the struct is very large.
*/
#ifdef USE_ASSERT_CHECKING
#define LSN_CHECK_BUF_SIZE (128*1024*1024)
typedef struct {
uint64 curPosition;
XLogRecPtr curLsn;
#if (!defined __x86_64__) && (!defined __aarch64__)
/*
* Lock member that exists only on targets other than x86_64 and aarch64.
* NOTE(review): the previous comment said it protects lastReplayedReadRecPtr and lastReplayedEndRecPtr,
* which are not members of this struct; presumably it guards curPosition and curLsn -- confirm.
*/
slock_t ptrLck;
#endif
uint32 lsnCheckBuf[LSN_CHECK_BUF_SIZE];
}LsnCheckCtl;
#endif
/*
* State of the XLOG record buffer; LogDispatcher embeds one as rtoXlogBufState and g_recordbuffer is a
* thread-local pointer to this type.
* NOTE(review): readWorkerState / readPageWorkerState presumably hold ReadWorkersState values and
* xlogReadManagerState an XLogReadManagerState value, all stored as uint32 -- confirm.
*/
typedef struct RecordBufferState {
XLogReaderState *initreader;
uint32 readWorkerState;
uint32 readPageWorkerState;
uint32 readSource;
uint32 failSource;
uint32 xlogReadManagerState;
/* NOTE(review): applyindex / readindex presumably index into xlogsegarray -- confirm. */
uint32 applyindex;
uint32 readindex;
/* Fixed array of MAX_ALLOC_SEGNUM segment buffer descriptors. */
RecordBufferAarray xlogsegarray[MAX_ALLOC_SEGNUM];
char *readsegbuf;
char *readBuf;
char *errormsg_buf;
void *readprivate;
XLogRecPtr targetRecPtr;
XLogRecPtr expectLsn;
uint32 waitRedoDone;
} RecordBufferState;
/*
* Dispatcher state: the page, transaction and read pipelines, the array of all workers, redo item lists,
* timing counters and stop/shutdown flags. g_dispatcher is declared as a pointer to this type.
*/
typedef struct {
MemoryContext oldCtx;
PageRedoPipeline *pageLines;
uint32 pageLineNum; /* PageLineNum */
uint32 *chosedPageLineIds; /* chosedPageLineIds */
uint32 chosedPLCnt; /* chosedPageLineCount */
TrxnRedoPipeline trxnLine;
ReadPipeline readLine;
RecordBufferState rtoXlogBufState;
PageRedoWorker **allWorkers; /* Array of page redo workers. */
uint32 allWorkersCnt;
RedoItem *freeHead; /* Head of freed-item list. */
RedoItem *freeStateHead;
RedoItem *allocatedRedoItem;
int exitCode; /* Thread exit code. */
uint64 totalCostTime;
uint64 txnCostTime; /* txn cost time */
uint64 pprCostTime;
uint32 maxItemNum;
uint32 curItemNum;
uint32 syncEnterCount;
uint32 syncExitCount;
volatile uint32 batchThrdEnterNum;
volatile uint32 batchThrdExitNum;
volatile uint32 segpageXactDoneFlag;
pg_atomic_uint32 standbyState; /* sync standbyState from trxn worker to startup */
bool needImmediateCheckpoint;
bool needFullSyncCheckpoint;
volatile sig_atomic_t smartShutdown;
/* LSN check members exist only in USE_ASSERT_CHECKING builds. */
#ifdef USE_ASSERT_CHECKING
void *originLsnCheckAddr;
LsnCheckCtl *lsnCheckCtl;
slock_t updateLck;
#endif
RedoInterruptCallBackFunc oldStartupIntrruptFunc;
volatile bool recoveryStop;
/* Per resource manager (RM_NEXT_ID) and per info value (MAX_XLOG_INFO_NUM) XLOG statistics. */
volatile XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM];
RedoTimeCost *startupTimeCost;
} LogDispatcher;
/*
* Dispatch description for one resource manager: a dispatch callback, an info validity callback, the
* resource manager id and the info bounds.
* NOTE(review): rm_mininfo / rm_maxinfo are presumably the minInfo / maxInfo arguments handed to
* rm_loginfovalid -- confirm.
*/
typedef struct {
bool (*rm_dispatch)(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
bool (*rm_loginfovalid)(XLogReaderState *record, uint8 minInfo, uint8 maxInfo);
RmgrId rm_id;
uint8 rm_mininfo;
uint8 rm_maxinfo;
} RmgrDispatchData;
/* Globals of the dispatcher; all extern objects are defined outside this header. */
extern LogDispatcher *g_dispatcher;
/* Special RedoItem objects. NOTE(review): presumably used as marker items in worker queues -- confirm. */
extern RedoItem g_GlobalLsnForwarder;
extern RedoItem g_cleanupMark;
/* Thread-local pointer to a RecordBufferState. */
extern THR_LOCAL RecordBufferState *g_recordbuffer;
/* Wait-count constants. NOTE(review): presumably loop-count thresholds that trigger logging -- confirm. */
const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
const static uint64 PRINT_ALL_WAIT_COUNT = 0x7FFFFFFFF;
extern RedoItem g_redoEndMark;
extern RedoItem g_terminateMark;
extern uint32 g_readManagerTriggerFlag;
/* Reports the batch_redo_num storage attribute of the instance. */
inline int get_batch_redo_num()
{
    const int batchRedoNum = g_instance.attr.attr_storage.batch_redo_num;
    return batchRedoNum;
}
/* Reports the recovery_redo_workers_per_paser_worker storage attribute of the instance. */
inline int get_page_redo_worker_num_per_manager()
{
    const int workersPerManager = g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker;
    return workersPerManager;
}
/* Reports the fixed number of transaction redo managers (TRXN_REDO_MANAGER_NUM). */
inline int get_trxn_redo_manager_num()
{
    const int managerNum = TRXN_REDO_MANAGER_NUM;
    return managerNum;
}
/* Reports the fixed number of transaction redo workers (TRXN_REDO_WORKER_NUM). */
inline int get_trxn_redo_worker_num()
{
    const int workerNum = TRXN_REDO_WORKER_NUM;
    return workerNum;
}
/*
* Dispatcher interface of the ondemand_extreme_rto namespace. Everything below is a declaration only;
* the definitions live outside this header.
*/
void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen);
/* RedoItem lifecycle. */
void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
/* fullSync defaults to false when the argument is omitted. */
void ProcessPendingRecords(bool fullSync = false);
void FreeRedoItem(RedoItem *item);
/* Dispatcher phases. */
void SendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
/* Dispatcher states. */
int GetDispatcherExitCode();
bool DispatchPtrIsNull();
uint32 GetBatchCount();
uint32 GetAllWorkerCount();
PGPROC *StartupPidGetProc(ThreadId pid);
extern void SetStartupBufferPinWaitBufId(int bufid);
extern void GetStartupBufferPinWaitBufId(int *bufids, uint32 len);
void UpdateStandbyState(HotStandbyState newState);
void UpdateMinRecoveryForTrxnRedoThd(XLogRecPtr minRecoveryPoint);
/* Redo end state saved by each page worker. */
void **GetXLogInvalidPagesFromWorkers();
/* Other utility functions. */
uint32 GetSlotId(const RelFileNode node, BlockNumber block, ForkNumber forkNum, uint32 workerCount);
bool XactHasSegpageRelFiles(XLogReaderState *record);
XLogReaderState *NewReaderState(XLogReaderState *readerState);
void FreeAllocatedRedoItem();
List *CheckImcompleteAction(List *imcompleteActionList);
void SetPageWorkStateByThreadId(uint32 threadState);
/* Writes two record pointers through the out-parameters startPtr and endPtr. */
void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
void StartupSendFowarder(RedoItem *item);
XLogRecPtr GetSafeMinCheckPoint();
/* Statistics accessors; results are returned by value or through the out-parameters. */
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void CheckCommittingCsnList();
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void DumpDispatcher();
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,258 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* page_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/page_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_PAGE_REDO_H
#define ONDEMAND_EXTREME_RTO_PAGE_REDO_H
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/ondemand_extreme_rto/redo_item.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/ondemand_extreme_rto/posix_semaphore.h"
#include "access/ondemand_extreme_rto/spsc_blocking_queue.h"
#include "access/xlogproc.h"
#include "postmaster/pagerepair.h"
namespace ondemand_extreme_rto {
/* Sizing constants of the page redo workers. */
/* NOTE(review): presumably the capacity used for each worker's SPSC queue -- confirm. */
static const uint32 PAGE_WORK_QUEUE_SIZE = 8192;
/* 16 bytes, i.e. the 128-bit alignment mentioned in the trailing comment. */
static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */
static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;
/*
 * Role of a redo thread. REDO_ROLE_NUM comes last and therefore equals the number of roles.
 */
typedef enum {
    REDO_BATCH = 0,
    REDO_PAGE_MNG = 1,
    REDO_PAGE_WORKER = 2,
    REDO_TRXN_MNG = 3,
    REDO_TRXN_WORKER = 4,
    REDO_READ_WORKER = 5,
    REDO_READ_PAGE_WORKER = 6,
    REDO_READ_MNG = 7,
    REDO_ROLE_NUM = 8
} RedoRole;
/*
* Entry describing one bad block: its repair key, physical block, an LSN range and a head/tail pair of
* parse states.
* NOTE(review): presumably stored in PageRedoWorker::badPageHashTbl, with head/tail delimiting the
* records pending for the block -- confirm.
*/
typedef struct BadBlockRecEnt{
RepairBlockKey key;
XLogPhyBlock pblk;
XLogRecPtr rec_min_lsn;
XLogRecPtr rec_max_lsn;
XLogRecParseState *head;
XLogRecParseState *tail;
} BadBlockRecEnt;
/*
* State of one redo worker thread. The same struct is used for every RedoRole (see the role member);
* g_redoWorker is a thread-local pointer to this type.
*/
struct PageRedoWorker {
/*
* The last successfully applied log record's end position + 1 as an
* atomic uint64. The type of a log record's position is XLogRecPtr.
* Here the position is stored as an uint64 so it can be read and
* written atomically.
*/
XLogRecPtr lastReplayedReadRecPtr;
XLogRecPtr lastReplayedEndRecPtr;
/* The lock below exists only on targets other than x86_64 and aarch64. */
#if (!defined __x86_64__) && (!defined __aarch64__)
/* protects lastReplayedReadRecPtr and lastReplayedEndRecPtr */
slock_t ptrLck;
#endif
/* NOTE(review): presumably the originally allocated (unaligned) address of this worker -- confirm. */
PageRedoWorker *selfOrinAddr;
/* Worker id. */
uint32 id;
int index;
/* Thread id */
gs_thread_t tid;
/* The proc struct of this worker thread. */
PGPROC *proc;
RedoRole role;
uint32 slotId;
bool isUndoSpaceWorker;
/* ---------------------------------------------
* Initial context
*
* Global variable values at worker creation time.
*/
/* Initial server mode from the dispatcher. */
ServerMode initialServerMode;
/* Initial timeline ID from the dispatcher. */
TimeLineID initialTimeLineID;
List *expectedTLIs;
/* ---------------------------------------------
* Redo item queue.
*
* Redo items are provided by the dispatcher and consumed by each
* worker. See AddPageRedoItem() for the use of the additional
* pending list.
*/
/* The head of the pending item list. */
RedoItem *pendingHead;
/* The tail of the pending item list. */
RedoItem *pendingTail;
/* To-be-replayed log-record-list queue. */
SPSCBlockingQueue *queue;
/*
* The last recovery restart point seen by the txn worker. Restart
* points before this is useless and can be removed.
*/
XLogRecPtr lastCheckedRestartPoint;
/* min recovery point */
XLogRecPtr minRecoveryPoint;
/* ---------------------------------------------
* Per-worker run-time context
*
* States maintained by each individual page-redo worker during
* log replay. These are read by the txn-redo worker.
*/
/* ---------------------------------------------
* Global run-time context
*
* States maintained outside page-redo worker during log replay.
* Updates to these states must be synchronized to all page-redo workers.
*/
/*
* Global standbyState set by the txn worker.
*/
HotStandbyState standbyState;
TransactionId latestObservedXid;
bool StandbyMode;
char *DataDir;
TransactionId RecentXmin;
/* ---------------------------------------------
* Redo end context
*
* Thread-local variable values saved after log replay has completed.
* These values are collected by each redo worker at redo end and
* are used by the dispatcher.
*/
/* XLog invalid pages. */
void *xlogInvalidPages;
void *committingCsnList;
/* ---------------------------------------------
* Phase barrier.
*
* A barrier for synchronizing the dispatcher and page redo worker
* between different phases.
*/
/* Semaphore marking the completion of the current phase. */
PosixSemaphore phaseMarker;
MemoryContext oldCtx;
HTAB *redoItemHash;
/*
* NOTE(review): the members below share their names with recovery-related globals; presumably they are
* per-worker copies of those globals -- confirm.
*/
TimeLineID recoveryTargetTLI;
bool ArchiveRecoveryRequested;
bool StandbyModeRequested;
bool InArchiveRecovery;
bool ArchiveRestoreRequested;
bool InRecovery;
char* recoveryRestoreCommand;
uint32 fullSyncFlag;
RedoParseManager parseManager;
RedoBufferManager bufferManager;
/* One RedoTimeCost slot per TIME_COST_NUM entry. */
RedoTimeCost timeCostList[TIME_COST_NUM];
uint32 remoteReadPageNum;
HTAB *badPageHashTbl;
/* Buffer of BLCKSZ bytes embedded in the worker. */
char page[BLCKSZ];
XLogBlockDataParse *curRedoBlockState;
};
/*
* Page redo worker interface of the ondemand_extreme_rto namespace. Everything below is a declaration
* only; the definitions live outside this header.
*/
/* Thread-local pointer to a PageRedoWorker. */
extern THR_LOCAL PageRedoWorker *g_redoWorker;
/* Worker lifecycle. */
PageRedoWorker *StartPageRedoWorker(PageRedoWorker *worker);
void DestroyPageRedoWorker(PageRedoWorker *worker);
/* Thread creation utility functions. */
bool IsPageRedoWorkerProcess(int argc, char *argv[]);
void AdaptArgvForPageRedoWorker(char *argv[]);
void GetThreadNameIfPageRedoWorker(int argc, char *argv[], char **threadNamePtr);
extern bool RedoWorkerIsUndoSpaceWorker();
uint32 GetMyPageRedoWorkerIdWithLock();
PGPROC *GetPageRedoWorkerProc(PageRedoWorker *worker);
/* Worker main function. */
void ParallelRedoThreadRegister();
void ParallelRedoThreadMain();
/* Dispatcher phases. */
bool SendPageRedoEndMark(PageRedoWorker *worker);
void WaitPageRedoWorkerReachLastMark(PageRedoWorker *worker);
/* Redo processing. */
void AddPageRedoItem(PageRedoWorker *worker, void *item);
void UpdatePageRedoWorkerStandbyState(PageRedoWorker *worker, HotStandbyState newState);
/* Redo end states. */
void ClearBTreeIncompleteActions(PageRedoWorker *worker);
void *GetXLogInvalidPages(PageRedoWorker *worker);
bool RedoWorkerIsIdle(PageRedoWorker *worker);
void DumpPageRedoWorker(PageRedoWorker *worker);
PageRedoWorker *CreateWorker(uint32 id);
extern void UpdateRecordGlobals(RedoItem *item, HotStandbyState standbyState);
/* Redo items are passed as void pointers here. */
void ReferenceRedoItem(void *item);
void DereferenceRedoItem(void *item);
void PushToWorkerLsn(bool force);
/* Writes two record pointers through the out-parameters readPtr and endPtr. */
void GetCompletedReadEndPtr(PageRedoWorker *worker, XLogRecPtr *readPtr, XLogRecPtr *endPtr);
/* XLOG read buffer handling. */
void SetReadBufferForExtRto(XLogReaderState *state, XLogRecPtr pageptr, int reqLen);
void DumpExtremeRtoReadBuf();
void PutRecordToReadQueue(XLogReaderState *recordreader);
bool LsnUpdate();
void ResetRtoXlogReadBuf(XLogRecPtr targetPagePtr);
bool XLogPageReadForExtRto(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen);
void ExtremeRtoStopHere();
void WaitAllRedoWorkerQueueEmpty();
void WaitAllReplayWorkerIdle();
void DispatchClosefdMarkToAllRedoWorker();
void DispatchCleanInvalidPageMarkToAllRedoWorker(RepairFileKey key);
/* Returns a string for the given RedoRole ("Woker" is part of the existing public name). */
const char *RedoWokerRole2Str(RedoRole role);
/* block or file repair function */
HTAB* BadBlockHashTblCreate();
void RepairPageAndRecoveryXLog(BadBlockRecEnt *page_info, const char *page);
void CheckRemoteReadAndRepairPage(BadBlockRecEnt *entry);
void ClearSpecificsPageEntryAndMem(BadBlockRecEnt *entry);
void ClearRecoveryThreadHashTbl(const RelFileNode &node, ForkNumber forknum, BlockNumber minblkno,
bool segment_shrink);
void BatchClearRecoveryThreadHashTbl(Oid spcNode, Oid dbNode);
void RecordBadBlockAndPushToRemote(XLogBlockDataParse *datadecode, PageErrorType error_type,
XLogRecPtr old_lsn, XLogPhyBlock pblk);
void SeqCheckRemoteReadAndRepairPage();
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* posix_semaphore.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/posix_semaphore.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_POSIX_SEMAPHORE_H
#define ONDEMAND_EXTREME_RTO_POSIX_SEMAPHORE_H
#include <semaphore.h>
/* POSIX semaphore wrapper of the ondemand_extreme_rto namespace. */
namespace ondemand_extreme_rto {
/*
* A sem_t paired with a flag.
* NOTE(review): initialized presumably records whether PosixSemaphoreInit has set up the semaphore, so
* that the other functions can skip an unusable one -- confirm against the implementation.
*/
typedef struct {
sem_t semaphore;
bool initialized;
} PosixSemaphore;
/* Operations on a PosixSemaphore; initValue is the starting count requested for the semaphore. */
void PosixSemaphoreInit(PosixSemaphore *sem, unsigned int initValue);
void PosixSemaphoreDestroy(PosixSemaphore *sem);
void PosixSemaphoreWait(PosixSemaphore *sem);
void PosixSemaphorePost(PosixSemaphore *sem);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* redo_item.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/redo_item.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_REDO_ITEM_H
#define ONDEMAND_EXTREME_RTO_REDO_ITEM_H
#include "access/xlogreader.h"
#include "datatype/timestamp.h"
#include "nodes/pg_list.h"
#include "utils/atomic.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "access/ondemand_extreme_rto/posix_semaphore.h"
#include "replication/replicainternal.h"
namespace ondemand_extreme_rto {
/*
* One unit of redo work. The log record is embedded by value in the record member, which is what lets
* GetRedoItemPtr() recover the enclosing item from a pointer to that member.
*/
typedef struct RedoItem_s {
bool needImmediateCheckpoint;
bool needFullSyncCheckpoint;
/* The expected timelines for this record. */
List *expectedTLIs;
/* The timestamp of the log record if it is a transaction record. */
TimestampTz recordXTime;
/* Next item on the free list. */
struct RedoItem_s *freeNext;
/* A "deep" copy of the log record. */
XLogReaderState record;
/* Used for really free */
struct RedoItem_s *allocatedNext;
/* NOTE(review): the sync* members and RecentXmin are presumably values handed over to the workers -- confirm. */
TimestampTz syncXLogReceiptTime;
int syncXLogReceiptSource;
TransactionId RecentXmin;
ServerMode syncServerMode;
} RedoItem;
/* Block id constant with the value -1. */
static const int32 ANY_BLOCK_ID = -1;
/*
 * Special worker ids. They are -1 .. -4 converted to uint32, i.e. the four largest uint32 values.
 */
static const uint32 ANY_WORKER = static_cast<uint32>(-1);
static const uint32 TRXN_WORKER = static_cast<uint32>(-2);
static const uint32 ALL_WORKER = static_cast<uint32>(-3);
static const uint32 USTORE_WORKER = static_cast<uint32>(-4);
/*
* Declared here, defined elsewhere.
* NOTE(review): presumably logs the content of item, tagged with the caller's name in funcName -- confirm.
*/
void DumpItem(RedoItem *item, const char *funcName);
/*
 * Maps a pointer to the record member of a RedoItem back to the enclosing RedoItem by subtracting
 * the member's offset. The argument must point at the record member of a RedoItem.
 */
static inline RedoItem *GetRedoItemPtr(XLogReaderState *record)
{
    char *memberAddr = reinterpret_cast<char *>(record);
    char *itemAddr = memberAddr - offsetof(RedoItem, record);
    return reinterpret_cast<RedoItem *>(itemAddr);
}
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,62 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* spsc_blocking_queue.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_SPSC_BLOCKING_QUEUE_H
#define ONDEMAND_EXTREME_RTO_SPSC_BLOCKING_QUEUE_H
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/parallel_recovery/posix_semaphore.h"
/*
* Blocking queue of the ondemand_extreme_rto namespace.
* NOTE(review): going by the SPSC name this is presumably a single-producer/single-consumer queue --
* confirm. The header includes access/parallel_recovery/posix_semaphore.h rather than the
* ondemand_extreme_rto one; nothing declared below uses PosixSemaphore.
*/
namespace ondemand_extreme_rto {
/* Callback type taking no arguments and returning nothing. */
typedef void (*CallBackFunc)();
struct SPSCBlockingQueue {
pg_atomic_uint32 writeHead; /* Array index for the next write. */
pg_atomic_uint32 readTail; /* Array index for the next read. */
uint32 capacity; /* Queue capacity, must be power of 2. */
uint32 mask; /* Bit mask for computing index. */
pg_atomic_uint32 maxUsage;
pg_atomic_uint64 totalCnt;
CallBackFunc callBackFunc;
uint64 lastTotalCnt;
void *buffer[1]; /* Queue buffer, the actual size is capacity. */
};
/* func defaults to NULL when the argument is omitted. */
SPSCBlockingQueue *SPSCBlockingQueueCreate(uint32 capacity, CallBackFunc func = NULL);
void SPSCBlockingQueueDestroy(SPSCBlockingQueue *queue);
/* Elements are stored and returned as void pointers. */
bool SPSCBlockingQueuePut(SPSCBlockingQueue *queue, void *element);
void *SPSCBlockingQueueTake(SPSCBlockingQueue *queue);
bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue);
void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue);
void SPSCBlockingQueuePop(SPSCBlockingQueue *queue);
void DumpQueue(const SPSCBlockingQueue *queue);
uint32 SPSCGetQueueCount(SPSCBlockingQueue *queue);
/* Hands back an element array and its length through the out-parameters eleArry and eleNum. */
bool SPSCBlockingQueueGetAll(SPSCBlockingQueue *queue, void ***eleArry, uint32 *eleNum);
void SPSCBlockingQueuePopN(SPSCBlockingQueue *queue, uint32 n);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* txn_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/txn_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_TXN_REDO_H
#define ONDEMAND_EXTREME_RTO_TXN_REDO_H
#include "access/parallel_recovery/redo_item.h"
/*
* Transaction redo entry points of the ondemand_extreme_rto namespace.
* NOTE(review): PageRedoWorker and RedoItem are used unqualified inside this namespace, but this header
* only includes access/parallel_recovery/redo_item.h; it presumably relies on the includer having pulled
* in the ondemand_extreme_rto redo_item.h / page_redo.h first -- confirm.
*/
namespace ondemand_extreme_rto {
/* The item is passed as a void pointer. */
void AddTxnRedoItem(PageRedoWorker *worker, void *item);
void TrxnMngProc(RedoItem *item, PageRedoWorker *wk);
void TrxnWorkerProc(RedoItem *item);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* xlog_read.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/xlog_read.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_XLOG_READ_H
#define ONDEMAND_EXTREME_RTO_XLOG_READ_H
#include "access/xlog_basic.h"
/*
 * XLOG record readers of the ondemand_extreme_rto namespace. Only prototypes are declared
 * here; the notes below are limited to what the signatures show.
 */
namespace ondemand_extreme_rto {
/* Returns a record pointer for the given reader state. */
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
/*
 * Takes the reader state by pointer-to-pointer plus an 'emode' value.
 * NOTE(review): the double pointer suggests the callee may replace the reader -- confirm.
 */
XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
} // namespace ondemand_extreme_rto
#endif /* ONDEMAND_EXTREME_RTO_XLOG_READ_H */

View File

@ -126,7 +126,7 @@ void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr);
List* CheckImcompleteAction(List* imcompleteActionList);
void SetPageWorkStateByThreadId(uint32 threadState);
RedoWaitInfo redo_get_io_event(int32 event_id);
/*
 * Worker statistics accessor, listed under both the "wroker" and the "worker" spelling;
 * apart from the name the two prototypes are identical. Results are reported through
 * realNum and the caller-supplied worker array of length workerLen.
 */
void redo_get_wroker_statistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
void redo_get_worker_statistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
extern void redo_dump_all_stats();
void WaitRedoWorkerIdle();
void SendClearMarkToAllWorkers();
@ -139,7 +139,7 @@ extern void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *old
extern void CopyDataFromOldReader(XLogReaderState *newReaderState, XLogReaderState *oldReaderState);
bool TxnQueueIsEmpty(TxnRedoWorker* worker);
/*
 * Worker time-count accessor, listed under both the "wroker" and the "worker" spelling;
 * apart from the name the two prototypes are identical. Results are reported through the
 * out-parameters workerCountInfoList and realNum.
 */
void redo_get_wroker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
}

View File

@ -64,6 +64,15 @@ typedef enum {
STANDBY_SNAPSHOT_READY
} HotStandbyState;
/*
 * Trigger states; this is the return type of CheckForSatartupStatus(). Enumerators are
 * numbered sequentially starting from TRIGGER_NORMAL = 0.
 * NOTE(review): TRIGGER_STADNBY looks like a misspelling of "STANDBY"; the identifier is
 * kept as-is because other code refers to it by this name.
 */
typedef enum {
TRIGGER_NORMAL = 0,
TRIGGER_PRIMARY,
TRIGGER_STADNBY,
TRIGGER_FAILOVER,
TRIGGER_SWITCHOVER,
TRIGGER_SMARTSHUTDOWN,
} Enum_TriggeredState;
/* True when this thread's standbyState is STANDBY_SNAPSHOT_PENDING or any later HotStandbyState value. */
#define InHotStandby (t_thrd.xlog_cxt.standbyState >= STANDBY_SNAPSHOT_PENDING)
/* Dummy-standby connect interval, expressed in seconds. */
#define DUMMYSTANDBY_CONNECT_INTERVAL 3 // unit second
@ -814,7 +823,6 @@ extern char* TrimStr(const char* str);
/* Prototypes only; behavior is defined in the implementation and is not visible here. */
extern void CloseXlogFilesAtThreadExit(void);
extern void SetLatestXTime(TimestampTz xtime);
/* Returns a record pointer for the given reader state. */
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
/* NOTE(review): Startup/Stop look like a matched pair -- confirm call ordering with the callers. */
void ResourceManagerStartup(void);
void ResourceManagerStop(void);
@ -859,6 +867,14 @@ bool CheckForSwitchoverTrigger(void);
/* Prototypes only; the notes below are limited to what the signatures show. */
void HandleCascadeStandbyPromote(XLogRecPtr *recptr);
/* need_immediately_update is optional and defaults to false. */
void update_dirty_page_queue_rec_lsn(XLogRecPtr current_insert_lsn, bool need_immediately_update = false);
XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt);
/* Takes an 'emode' value and a record pointer and returns an int. NOTE(review): presumably the adjusted emode -- confirm. */
int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
bool timeLineInHistory(TimeLineID tli, List *expectedTLEs);
/*
 * Returns one of the Enum_TriggeredState values.
 * NOTE(review): "Satartup" looks like a misspelling of "Startup"; the name is kept as-is
 * because callers use it.
 */
Enum_TriggeredState CheckForSatartupStatus(void);
bool CheckForStandbyTrigger(void);
/* NOTE(review): "Achive" looks like a misspelling of "Archive"; the name is kept as-is because callers use it. */
void UpdateMinrecoveryInAchive();
bool NewDataIsInBuf(XLogRecPtr expectedRecPtr);
bool rescanLatestTimeLine(void);
/* Takes a segment number, an 'emode' value and a 'sources' bit field; returns an int. NOTE(review): presumably a file descriptor -- confirm. */
int XLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources);
/* Variable defined elsewhere; only its declaration appears here. */
extern XLogRecPtr XlogRemoveSegPrimary;

View File

@ -21,14 +21,8 @@
#include "replication/output_plugin.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/extreme_rto/redo_item.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/extreme_rto/posix_semaphore.h"
#include "access/extreme_rto/spsc_blocking_queue.h"
#include "access/parallel_recovery/redo_item.h"
#include "nodes/parsenodes_common.h"