!3544 openGauss资源池化支持极致RTO按需回放

Merge pull request !3544 from 陈栋/ondemand
This commit is contained in:
opengauss-bot
2023-06-14 02:28:32 +00:00
committed by Gitee
91 changed files with 10952 additions and 1517 deletions

View File

@ -23,8 +23,8 @@
* ---------------------------------------------------------------------------------------
*/
#ifndef BATCH_REDO_H
#define BATCH_REDO_H
#ifndef EXTREME_RTO_BATCH_REDO_H
#define EXTREME_RTO_BATCH_REDO_H
#include "c.h"
#include "storage/buf/block.h"
@ -69,4 +69,4 @@ extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoIt
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
} // namespace extreme_rto
#endif /* BATCH_REDO_H */
#endif /* EXTREME_RTO_BATCH_REDO_H */

View File

@ -70,15 +70,6 @@ typedef enum {
WORKER_STATE_EXITING,
} ReadWorkersState;
typedef enum {
TRIGGER_NORMAL = 0,
TRIGGER_PRIMARY,
TRIGGER_STADNBY,
TRIGGER_FAILOVER,
TRIGGER_SWITCHOVER,
TRIGGER_SMARTSHUTDOWN,
} Enum_TriggeredState;
typedef enum {
NONE,
APPLYING,
@ -193,7 +184,6 @@ const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
const static uint64 PRINT_ALL_WAIT_COUNT = 0x7FFFFFFFF;
extern RedoItem g_redoEndMark;
extern RedoItem g_terminateMark;
extern uint32 g_startupTriggerState;
extern uint32 g_readManagerTriggerFlag;
inline int get_batch_redo_num()
@ -251,13 +241,11 @@ void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
void StartupSendFowarder(RedoItem *item);
XLogRecPtr GetSafeMinCheckPoint();
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_wroker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void CheckCommittingCsnList();
void redo_get_wroker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void DumpDispatcher();
} // namespace extreme_rto
extreme_rto::Enum_TriggeredState CheckForSatartupStatus(void);
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* xlog_read.h
*
*
*
* IDENTIFICATION
* src/include/access/extreme_rto/xlog_read.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef EXTREME_RTO_XLOG_READ_H
#define EXTREME_RTO_XLOG_READ_H
#include "access/xlog_basic.h"
namespace extreme_rto {
/* NOTE(review): presumably returns the next WAL record obtained through the parallel read path -- confirm. */
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
/*
 * The reader is passed by address (XLogReaderState **), so the callee is able to swap the caller's reader.
 * NOTE(review): emode looks like the elog level used when a read fails -- confirm against the implementation.
 */
XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
} // namespace extreme_rto
#endif /* EXTREME_RTO_XLOG_READ_H */

View File

@ -0,0 +1,78 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* extreme_rto_redo_api.h
*
*
* IDENTIFICATION
* src/include/access/extreme_rto_redo_api.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef EXTREME_RTO_REDO_API_H
#define EXTREME_RTO_REDO_API_H
#include "access/xlogproc.h"
#include "access/redo_statistic.h"
#include "access/ondemand_extreme_rto/redo_utils.h"
/*
 * Identifies an extreme-RTO redo flavour; the numeric values are the implicit 0 and 1 of the
 * original declaration, spelled out here.
 * NOTE(review): presumably DEFAULT maps to namespace extreme_rto and ONDEMAND to
 * namespace ondemand_extreme_rto -- confirm.
 */
typedef enum {
    DEFAULT_EXTREME_RTO = 0,
    ONDEMAND_EXTREME_RTO = 1,
} ExtremeRtoRedoType;
/* Redo flavour variable; only declared here, defined in another translation unit. */
extern ExtremeRtoRedoType g_extreme_rto_type;
/*
 * Extreme* entry points.
 * NOTE(review): presumably each one forwards to the same-named function in namespace extreme_rto or
 * ondemand_extreme_rto according to g_extreme_rto_type -- confirm against the implementation file.
 */
void ExtremeWaitAllReplayWorkerIdle();
void ExtremeDispatchCleanInvalidPageMarkToAllRedoWorker(RepairFileKey key);
void ExtremeDispatchClosefdMarkToAllRedoWorker();
void ExtremeRecordBadBlockAndPushToRemote(XLogBlockDataParse *datadecode, PageErrorType error_type,
XLogRecPtr old_lsn, XLogPhyBlock pblk);
void ExtremeCheckCommittingCsnList();
XLogRecord *ExtremeReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
/* The doubled prefix follows the naming pattern "Extreme" + ExtremeRtoStopHere. */
void ExtremeExtremeRtoStopHere();
void ExtremeWaitAllRedoWorkerQueueEmpty();
XLogRecPtr ExtremeGetSafeMinCheckPoint();
void ExtremeClearRecoveryThreadHashTbl(const RelFileNode &node, ForkNumber forknum, BlockNumber minblkno,
bool segment_shrink);
void ExtremeBatchClearRecoveryThreadHashTbl(Oid spcNode, Oid dbNode);
bool ExtremeRedoWorkerIsUndoSpaceWorker();
void ExtremeStartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen);
void ExtremeDispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
void ExtremeGetThreadNameIfPageRedoWorker(int argc, char *argv[], char **threadNamePtr);
PGPROC *ExtremeStartupPidGetProc(ThreadId pid);
void ExtremeUpdateStandbyState(HotStandbyState newState);
void ExtremeUpdateMinRecoveryForTrxnRedoThd(XLogRecPtr newMinRecoveryPoint);
uint32 ExtremeGetMyPageRedoWorkerIdWithLock();
void ExtremeParallelRedoThreadMain();
void ExtremeFreeAllocatedRedoItem();
uint32 ExtremeGetAllWorkerCount();
void **ExtremeGetXLogInvalidPagesFromWorkers();
void ExtremeSendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
/* Statistics accessors; realNum is an out-parameter (passed by pointer). */
RedoWaitInfo ExtremeRedoGetIoEvent(int32 event_id);
void ExtremeRedoGetWorkerStatistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void ExtremeRedoGetWorkerTimeCount(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void ExtremeEndDispatcherContext();
void ExtremeSetPageRedoWorkerIndex(int index);
int ExtremeGetPageRedoWorkerIndex();
void ExtremeSetMyPageRedoWorker(knl_thread_arg *arg);
uint32 ExtremeGetMyPageRedoWorkerId();
/* State predicates. */
bool IsExtremeMultiThreadRedoRunning();
bool IsExtremeRtoRunning();
bool IsExtremeRtoSmartShutdown();
void ExtremeRtoRedoManagerSendEndToStartup();
#endif

View File

@ -33,9 +33,14 @@
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/redo_statistic.h"
#include "access/extreme_rto_redo_api.h"
/*
 * ENABLE_ONDEMAND_RECOVERY: constant false when built with ENABLE_LITE_MODE. Otherwise it expands to an
 * expression that is true only when ENABLE_DMS holds, IsExtremeRedo() returns true and the
 * dms_attr.enable_ondemand_recovery instance attribute is set. The expression is re-evaluated at every use.
 */
#ifdef ENABLE_LITE_MODE
#define ENABLE_ONDEMAND_RECOVERY false
#else
#define ENABLE_ONDEMAND_RECOVERY (ENABLE_DMS && IsExtremeRedo() \
&& g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery)
#endif
typedef enum {
NOT_PAGE_REDO_THREAD,
@ -44,6 +49,7 @@ typedef enum {
} PageRedoExitStatus;
extern bool g_supportHotStandby;
extern uint32 g_startupTriggerState;
const static bool SUPPORT_FPAGE_DISPATCH = true; /* support file dispatch if true, else support page dispatch */
const static bool SUPPORT_USTORE_UNDO_WORKER = true; /* if true, USTORE has an undo redo worker; supports page dispatch */
@ -88,7 +94,6 @@ static inline bool IsMultiThreadRedo()
uint32 GetRedoWorkerCount();
bool IsMultiThreadRedoRunning();
bool IsExtremeRtoRunning();
void DispatchRedoRecord(XLogReaderState* record, List* expectedTLIs, TimestampTz recordXTime);
void GetThreadNameIfMultiRedo(int argc, char* argv[], char** threadNamePtr);
@ -113,9 +118,7 @@ void FreeAllocatedRedoItem();
void** GetXLogInvalidPagesFromWorkers();
void SendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
RedoWaitInfo GetRedoIoEvent(int32 event_id);
void GetRedoWrokerStatistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
bool IsExtremeRtoSmartShutdown();
void ExtremeRtoRedoManagerSendEndToStartup();
void GetRedoWorkerStatistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
void CountXLogNumbers(XLogReaderState *record);
void ApplyRedoRecord(XLogReaderState* record);
void DiagLogRedoRecord(XLogReaderState *record, const char *funcName);

View File

@ -0,0 +1,78 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* batch_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/batch_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_BATCH_REDO_H
#define ONDEMAND_EXTREME_RTO_BATCH_REDO_H
#include "c.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "lib/dllist.h"
#include "utils/hsearch.h"
#include "access/xlogproc.h"
#include "access/xlogutils.h"
namespace ondemand_extreme_rto {
/* Page-type codes: DROP, CREATE and TRUNCATE each use a distinct bit; MODIFY is zero. */
#define PAGETYPE_DROP 0x04
#define PAGETYPE_CREATE 0x02
#define PAGETYPE_TRUNCATE 0x01
#define PAGETYPE_MODIFY 0x00
/* NOTE(review): presumably the initial element count for the redo item hash table -- confirm at the hash_create call. */
#define INITredoItemHashSIZE 1024
/* Assigns the three fields of a RedoItemTag; it does not touch any padding bytes of (a). */
#define INIT_REDO_ITEM_TAG(a, xx_rnode, xx_forkNum, xx_blockNum) \
((a).rNode = (xx_rnode), (a).forkNum = (xx_forkNum), (a).blockNum = (xx_blockNum))
/* Maps a hash code to a partition index in [0, NUM_XLOG_TRACK_PARTITIONS). */
#define XlogTrackTableHashPartition(hashcode) ((hashcode) % NUM_XLOG_TRACK_PARTITIONS)
/* Address of the LWLock for that partition: entry FirstXlogTrackLock + partition of mainLWLockArray. */
#define XlogTrackMappingPartitionLock(hashcode) \
(&t_thrd.shemem_ptr_cxt.mainLWLockArray[FirstXlogTrackLock + XlogTrackTableHashPartition(hashcode)].lock)
/*
 * Hash key identifying one block: relation file node, fork and block number.
 *
 * Note: if there are any pad bytes in the struct, INIT_REDO_ITEM_TAG has
 * to be fixed to zero them, since this struct is used as a hash key.
 */
typedef struct redoitemtag {
RelFileNode rNode;
ForkNumber forkNum;
BlockNumber blockNum;
} RedoItemTag;
/* Hash table entry keyed by RedoItemTag. */
typedef struct redoitemhashentry {
RedoItemTag redoItemTag; /* hash key */
/* NOTE(review): head/tail presumably delimit a chain of parse states queued for this block -- confirm. */
XLogRecParseState *head;
XLogRecParseState *tail;
int redoItemNum; /* presumably the number of items in the head..tail chain -- TODO confirm */
bool redoDone;
} RedoItemHashEntry;
/* Helpers operating on a redo item hash table (HTAB). */
extern void PRPrintRedoItemHashTab(HTAB *redoItemHash);
/* Returns HTAB ** (more than one table); NOTE(review): the count is not visible here -- confirm in the implementation. */
extern HTAB **PRRedoItemHashInitialize(MemoryContext context);
extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
/* Hash code for a tag; its result is the kind of value XlogTrackTableHashPartition() takes. */
extern uint32 XlogTrackTableHashCode(RedoItemTag *tagPtr);
} // namespace ondemand_extreme_rto
#endif /* ONDEMAND_EXTREME_RTO_BATCH_REDO_H */

View File

@ -0,0 +1,255 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* dispatcher.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/dispatcher.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_DISPATCHER_H
#define ONDEMAND_EXTREME_RTO_DISPATCHER_H
#include "gs_thread.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/redo_statistic.h"
#include "access/ondemand_extreme_rto/redo_item.h"
#include "access/ondemand_extreme_rto/page_redo.h"
#include "access/ondemand_extreme_rto/txn_redo.h"
namespace ondemand_extreme_rto {
/* One page-redo pipeline: a batch thread, a manager thread and a pool of redo threads. */
typedef struct {
PageRedoWorker *batchThd; /* BatchRedoThread */
PageRedoWorker *managerThd; /* PageRedoManager */
PageRedoWorker **redoThd; /* RedoThreadPool */
uint32 redoThdNum; /* presumably the number of entries in redoThd -- TODO confirm */
uint32 *chosedRTIds; /* chosedRedoThdIds */
uint32 chosedRTCnt; /* chosedRedoThdCount */
} PageRedoPipeline;
/* Transaction-redo pipeline: one manager and one worker. */
typedef struct {
PageRedoWorker *managerThd; /* TrxnRedoManager */
PageRedoWorker *redoThd; /* TrxnRedoWorker */
} TrxnRedoPipeline;
/*
 * Read pipeline made of three workers.
 * NOTE(review): by name these look like the REDO_READ_MNG, REDO_READ_PAGE_WORKER and REDO_READ_WORKER
 * roles respectively -- confirm where the pipeline is populated.
 */
typedef struct ReadPipeline {
PageRedoWorker *managerThd; /* read manager thread */
PageRedoWorker *readPageThd; /* read-page thread */
PageRedoWorker *readThd; /* read thread */
} ReadPipeline;
#define MAX_XLOG_READ_BUFFER (0xFFFFF) /* 8k uint */
/* Read worker states; every value is now written out and matches the original implicit numbering. */
typedef enum {
    WORKER_STATE_STOP = 0,
    WORKER_STATE_RUN = 1,
    WORKER_STATE_STOPPING = 2,
    WORKER_STATE_EXIT = 3,
    WORKER_STATE_EXITING = 4,
} ReadWorkersState;
/* Read buffer states, with the original implicit values made explicit. */
typedef enum {
    NONE = 0,
    APPLYING = 1,
    APPLIED = 2,
} ReadBufState;
/* XLog read manager states, with the original implicit values made explicit. */
typedef enum {
    READ_MANAGER_STOP = 0,
    READ_MANAGER_RUN = 1,
} XLogReadManagerState;
/* Descriptor of one XLog segment read buffer (element type of RecordBufferState::xlogsegarray). */
typedef struct RecordBufferAarray {
XLogSegNo segno;
XLogRecPtr segoffset;
uint32 readlen;
char *readsegbuf;
uint32 bufState; /* NOTE(review): presumably holds a ReadBufState value -- confirm */
} RecordBufferAarray;
/* Assert-build-only LSN check buffer control block. */
#ifdef USE_ASSERT_CHECKING
#define LSN_CHECK_BUF_SIZE (128*1024*1024)
typedef struct {
uint64 curPosition;
XLogRecPtr curLsn;
#if (!defined __x86_64__) && (!defined __aarch64__)
/*
 * NOTE(review): the previous comment here named lastReplayedReadRecPtr/lastReplayedEndRecPtr,
 * which are PageRedoWorker fields, not members of this struct. Presumably this lock protects
 * curPosition and curLsn on platforms other than x86_64/aarch64 -- confirm.
 */
slock_t ptrLck;
#endif
/* LSN_CHECK_BUF_SIZE entries, i.e. 128 Mi uint32 slots. */
uint32 lsnCheckBuf[LSN_CHECK_BUF_SIZE];
}LsnCheckCtl;
#endif
/* Shared state of the XLog read buffers (embedded in LogDispatcher as rtoXlogBufState). */
typedef struct RecordBufferState {
XLogReaderState *initreader;
uint32 readWorkerState; /* NOTE(review): presumably a ReadWorkersState value -- confirm */
uint32 readPageWorkerState; /* NOTE(review): presumably a ReadWorkersState value -- confirm */
uint32 readSource;
uint32 failSource;
uint32 xlogReadManagerState; /* NOTE(review): presumably an XLogReadManagerState value -- confirm */
uint32 applyindex;
uint32 readindex;
/* NOTE(review): applyindex/readindex presumably index into this array -- confirm */
RecordBufferAarray xlogsegarray[MAX_ALLOC_SEGNUM];
char *readsegbuf;
char *readBuf;
char *errormsg_buf;
void *readprivate;
XLogRecPtr targetRecPtr;
XLogRecPtr expectLsn;
uint32 waitRedoDone;
} RecordBufferState;
/*
 * Dispatcher state: the page, transaction and read pipelines, the list of all workers,
 * redo item free lists, timing counters and shutdown/stop flags.
 */
typedef struct {
MemoryContext oldCtx;
PageRedoPipeline *pageLines;
uint32 pageLineNum; /* PageLineNum */
uint32 *chosedPageLineIds; /* chosedPageLineIds */
uint32 chosedPLCnt; /* chosedPageLineCount */
TrxnRedoPipeline trxnLine;
ReadPipeline readLine;
RecordBufferState rtoXlogBufState;
PageRedoWorker **allWorkers; /* Array of page redo workers. */
uint32 allWorkersCnt;
RedoItem *freeHead; /* Head of freed-item list. */
RedoItem *freeStateHead;
RedoItem *allocatedRedoItem;
int exitCode; /* Thread exit code. */
uint64 totalCostTime;
uint64 txnCostTime; /* txn cost time */
uint64 pprCostTime;
uint32 maxItemNum;
uint32 curItemNum;
uint32 syncEnterCount;
uint32 syncExitCount;
volatile uint32 batchThrdEnterNum;
volatile uint32 batchThrdExitNum;
volatile uint32 segpageXactDoneFlag;
pg_atomic_uint32 standbyState; /* sync standbyState from trxn worker to startup */
bool needImmediateCheckpoint;
bool needFullSyncCheckpoint;
volatile sig_atomic_t smartShutdown;
/* The next three members exist only in assert-enabled builds. */
#ifdef USE_ASSERT_CHECKING
void *originLsnCheckAddr;
LsnCheckCtl *lsnCheckCtl;
slock_t updateLck;
#endif
RedoInterruptCallBackFunc oldStartupIntrruptFunc;
volatile bool recoveryStop;
/* Per-record-type counters, indexed [resource manager id][info slot]. */
volatile XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM];
RedoTimeCost *startupTimeCost;
RedoParseManager parseManager;
} LogDispatcher;
/* Per-resource-manager dispatch entry: two callbacks plus the manager id and an info range. */
typedef struct {
/* Dispatch callback; NOTE(review): the meaning of the bool result is not visible here -- confirm. */
bool (*rm_dispatch)(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
/* Validity check taking the record and a min/max info pair. */
bool (*rm_loginfovalid)(XLogReaderState *record, uint8 minInfo, uint8 maxInfo);
RmgrId rm_id;
uint8 rm_mininfo; /* presumably the minInfo passed to rm_loginfovalid -- TODO confirm */
uint8 rm_maxinfo; /* presumably the maxInfo passed to rm_loginfovalid -- TODO confirm */
} RmgrDispatchData;
/* Globals declared here and defined in another translation unit. */
extern LogDispatcher *g_dispatcher;
extern RedoItem g_GlobalLsnForwarder;
extern RedoItem g_cleanupMark;
extern THR_LOCAL RecordBufferState *g_recordbuffer;
/*
 * 0x7FFFFFF has the low 27 bits set, 0x7FFFFFFFF the low 35 bits.
 * NOTE(review): presumably masks applied to a wait-loop counter to throttle log output -- confirm at the use sites.
 */
const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
const static uint64 PRINT_ALL_WAIT_COUNT = 0x7FFFFFFFF;
extern RedoItem g_redoEndMark;
extern RedoItem g_terminateMark;
extern uint32 g_readManagerTriggerFlag;
extern RefOperate recordRefOperate;
/* Returns the batch_redo_num storage attribute of the instance, as an int. */
inline int get_batch_redo_num()
{
    const int batchRedoNum = g_instance.attr.attr_storage.batch_redo_num;
    return batchRedoNum;
}
/* Returns the recovery_redo_workers_per_paser_worker storage attribute of the instance, as an int. */
inline int get_page_redo_worker_num_per_manager()
{
    const int workersPerManager = g_instance.attr.attr_storage.recovery_redo_workers_per_paser_worker;
    return workersPerManager;
}
/* Returns the fixed TRXN_REDO_MANAGER_NUM value, as an int. */
inline int get_trxn_redo_manager_num()
{
    const int trxnManagerNum = TRXN_REDO_MANAGER_NUM;
    return trxnManagerNum;
}
/* Returns the fixed TRXN_REDO_WORKER_NUM value, as an int. */
inline int get_trxn_redo_worker_num()
{
    const int trxnWorkerNum = TRXN_REDO_WORKER_NUM;
    return trxnWorkerNum;
}
/* Worker startup. */
void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen);
/* RedoItem lifecycle. */
void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
/* fullSync defaults to false when the argument is omitted. */
void ProcessPendingRecords(bool fullSync = false);
void FreeRedoItem(RedoItem *item);
/* Dispatcher phases. */
void SendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
void SendRecoveryEndMarkToWorkersAndWaitForReach(int code);
void WaitRedoFinish();
/* Dispatcher states. */
int GetDispatcherExitCode();
bool DispatchPtrIsNull();
uint32 GetBatchCount();
uint32 GetAllWorkerCount();
PGPROC *StartupPidGetProc(ThreadId pid);
extern void SetStartupBufferPinWaitBufId(int bufid);
/* bufids is a caller-provided array; len is presumably its capacity -- TODO confirm. */
extern void GetStartupBufferPinWaitBufId(int *bufids, uint32 len);
void UpdateStandbyState(HotStandbyState newState);
void UpdateMinRecoveryForTrxnRedoThd(XLogRecPtr minRecoveryPoint);
/* Redo end state saved by each page worker. */
void **GetXLogInvalidPagesFromWorkers();
/* Other utility functions. */
uint32 GetSlotId(const RelFileNode node, BlockNumber block, ForkNumber forkNum, uint32 workerCount);
bool XactHasSegpageRelFiles(XLogReaderState *record);
XLogReaderState *NewReaderState(XLogReaderState *readerState);
void FreeAllocatedRedoItem();
List *CheckImcompleteAction(List *imcompleteActionList);
void SetPageWorkStateByThreadId(uint32 threadState);
/* Both pointers are out-parameters. */
void GetReplayedRecPtr(XLogRecPtr *startPtr, XLogRecPtr *endPtr);
void StartupSendFowarder(RedoItem *item);
XLogRecPtr GetSafeMinCheckPoint();
/* Statistics accessors; realNum is an out-parameter (passed by pointer). */
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_worker_statistic(uint32 *realNum, RedoWorkerStatsData *worker, uint32 workerLen);
void CheckCommittingCsnList();
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void DumpDispatcher();
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,249 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* page_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/page_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_PAGE_REDO_H
#define ONDEMAND_EXTREME_RTO_PAGE_REDO_H
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/ondemand_extreme_rto/redo_item.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/ondemand_extreme_rto/posix_semaphore.h"
#include "access/ondemand_extreme_rto/spsc_blocking_queue.h"
#include "access/xlogproc.h"
#include "postmaster/pagerepair.h"
namespace ondemand_extreme_rto {
/* NOTE(review): the meaning of this 0.9 ratio is not visible in this header -- confirm at the use sites. */
#define ONDEMAND_DISTRIBUTE_RATIO 0.9
/* 2097152 == 2^21. NOTE(review): presumably the capacity passed to SPSCBlockingQueueCreate -- confirm. */
static const uint32 PAGE_WORK_QUEUE_SIZE = 2097152;
static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */
static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;
static const uint32 ADVANCE_GLOBALLSN_INTERVAL = 1; /* unit second */
/*
 * Role of a redo worker thread. Values are written out and equal the original implicit
 * numbering; REDO_ROLE_NUM stays last, so it equals the number of roles.
 */
typedef enum {
    REDO_BATCH = 0,
    REDO_PAGE_MNG = 1,
    REDO_PAGE_WORKER = 2,
    REDO_TRXN_MNG = 3,
    REDO_TRXN_WORKER = 4,
    REDO_READ_WORKER = 5,
    REDO_READ_PAGE_WORKER = 6,
    REDO_READ_MNG = 7,
    REDO_ROLE_NUM = 8,
} RedoRole;
/* Entry describing a bad block together with a chain of parse states. */
typedef struct BadBlockRecEnt{
RepairBlockKey key;
XLogPhyBlock pblk;
/* NOTE(review): presumably the lowest and highest LSN among the records chained below -- confirm. */
XLogRecPtr rec_min_lsn;
XLogRecPtr rec_max_lsn;
XLogRecParseState *head;
XLogRecParseState *tail;
} BadBlockRecEnt;
/*
 * Per-thread state of one redo worker. The same struct is used for every RedoRole
 * (the pipelines in dispatcher.h all point at PageRedoWorker).
 */
struct PageRedoWorker {
/*
 * The last successfully applied log record's end position + 1 as an
 * atomic uint64. The type of a log record's position is XLogRecPtr.
 * Here the position is stored as an uint64 so it can be read and
 * written atomically.
 *
 * NOTE(review): the fields below are declared as plain XLogRecPtr, and a spinlock is added on
 * platforms other than x86_64/aarch64 -- the "atomic uint64" wording above may be stale; confirm.
 */
XLogRecPtr lastReplayedReadRecPtr;
XLogRecPtr lastReplayedEndRecPtr;
#if (!defined __x86_64__) && (!defined __aarch64__)
/* protects lastReplayedReadRecPtr and lastReplayedEndRecPtr */
slock_t ptrLck;
#endif
PageRedoWorker *selfOrinAddr;
/* Worker id. */
uint32 id;
int index;
/* Thread id */
gs_thread_t tid;
/* The proc struct of this worker thread. */
PGPROC *proc;
RedoRole role;
uint32 slotId;
bool isUndoSpaceWorker;
/* ---------------------------------------------
* Initial context
*
* Global variable values at worker creation time.
*/
/* Initial server mode from the dispatcher. */
ServerMode initialServerMode;
/* Initial timeline ID from the dispatcher. */
TimeLineID initialTimeLineID;
List *expectedTLIs;
/* ---------------------------------------------
* Redo item queue.
*
* Redo items are provided by the dispatcher and consumed by each
* worker. See AddPageRedoItem() for the use of the additional
* pending list.
*/
/* The head of the pending item list. */
RedoItem *pendingHead;
/* The tail of the pending item list. */
RedoItem *pendingTail;
/* To-be-replayed log-record-list queue. */
SPSCBlockingQueue *queue;
/*
* The last recovery restart point seen by the txn worker. Restart
* points before this is useless and can be removed.
*/
XLogRecPtr lastCheckedRestartPoint;
/* min recovery point */
XLogRecPtr minRecoveryPoint;
/* ---------------------------------------------
* Per-worker run-time context
*
* States maintained by each individual page-redo worker during
* log replay. These are read by the txn-redo worker.
*/
/* (No members are declared under the section above in this version of the struct.) */
/* ---------------------------------------------
* Global run-time context
*
* States maintained outside page-redo worker during log replay.
* Updates to these states must be synchronized to all page-redo workers.
*/
/*
* Global standbyState set by the txn worker.
*/
HotStandbyState standbyState;
TransactionId latestObservedXid;
bool StandbyMode;
char *DataDir;
TransactionId RecentXmin;
/* ---------------------------------------------
* Redo end context
*
* Thread-local variable values saved after log replay has completed.
* These values are collected by each redo worker at redo end and
* are used by the dispatcher.
*/
/* XLog invalid pages. */
void *xlogInvalidPages;
void *committingCsnList;
/* ---------------------------------------------
* Phase barrier.
*
* A barrier for synchronizing the dispatcher and page redo worker
* between different phases.
*/
/* Semaphore marking the completion of the current phase. */
PosixSemaphore phaseMarker;
MemoryContext oldCtx;
HTAB *redoItemHash;
/* NOTE(review): the recovery-related fields below presumably mirror same-named recovery globals -- confirm. */
TimeLineID recoveryTargetTLI;
bool ArchiveRecoveryRequested;
bool StandbyModeRequested;
bool InArchiveRecovery;
bool ArchiveRestoreRequested;
bool InRecovery;
char* recoveryRestoreCommand;
uint32 fullSyncFlag;
RedoParseManager parseManager;
RedoBufferManager bufferManager;
RedoTimeCost timeCostList[TIME_COST_NUM];
/* One block-sized (BLCKSZ bytes) scratch page embedded in the worker. */
char page[BLCKSZ];
XLogBlockDataParse *curRedoBlockState;
};
/* Thread-local (THR_LOCAL) pointer to a PageRedoWorker; NOTE(review): presumably the calling thread's own worker -- confirm. */
extern THR_LOCAL PageRedoWorker *g_redoWorker;
/* Worker lifecycle. */
PageRedoWorker *StartPageRedoWorker(PageRedoWorker *worker);
void DestroyPageRedoWorker(PageRedoWorker *worker);
/* Thread creation utility functions. */
bool IsPageRedoWorkerProcess(int argc, char *argv[]);
void AdaptArgvForPageRedoWorker(char *argv[]);
void GetThreadNameIfPageRedoWorker(int argc, char *argv[], char **threadNamePtr);
extern bool RedoWorkerIsUndoSpaceWorker();
uint32 GetMyPageRedoWorkerIdWithLock();
PGPROC *GetPageRedoWorkerProc(PageRedoWorker *worker);
/* Worker main function. */
void ParallelRedoThreadRegister();
void ParallelRedoThreadMain();
/* Dispatcher phases. */
bool SendPageRedoEndMark(PageRedoWorker *worker);
void WaitPageRedoWorkerReachLastMark(PageRedoWorker *worker);
/* Redo processing. */
void AddPageRedoItem(PageRedoWorker *worker, void *item);
uint64 GetCompletedRecPtr(PageRedoWorker *worker);
void UpdatePageRedoWorkerStandbyState(PageRedoWorker *worker, HotStandbyState newState);
/* Redo end states. */
void ClearBTreeIncompleteActions(PageRedoWorker *worker);
void *GetXLogInvalidPages(PageRedoWorker *worker);
bool RedoWorkerIsIdle(PageRedoWorker *worker);
void DumpPageRedoWorker(PageRedoWorker *worker);
PageRedoWorker *CreateWorker(uint32 id);
extern void UpdateRecordGlobals(RedoItem *item, HotStandbyState standbyState);
/* Reference / dereference pairs; items are passed as void *. */
void ReferenceRedoItem(void *item);
void DereferenceRedoItem(void *item);
void ReferenceRecParseState(XLogRecParseState *recordstate);
void DereferenceRecParseState(XLogRecParseState *recordstate);
void PushToWorkerLsn();
/* readPtr and endPtr are out-parameters. */
void GetCompletedReadEndPtr(PageRedoWorker *worker, XLogRecPtr *readPtr, XLogRecPtr *endPtr);
/* XLog read buffer helpers. */
void SetReadBufferForExtRto(XLogReaderState *state, XLogRecPtr pageptr, int reqLen);
void DumpExtremeRtoReadBuf();
void PutRecordToReadQueue(XLogReaderState *recordreader);
bool LsnUpdate();
void ResetRtoXlogReadBuf(XLogRecPtr targetPagePtr);
bool XLogPageReadForExtRto(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen);
void ExtremeRtoStopHere();
/* Operations addressed to all redo workers. */
void WaitAllRedoWorkerQueueEmpty();
void WaitAllReplayWorkerIdle();
void DispatchClosefdMarkToAllRedoWorker();
void DispatchCleanInvalidPageMarkToAllRedoWorker(RepairFileKey key);
/* Returns a string for the given role. */
const char *RedoWokerRole2Str(RedoRole role);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* posix_semaphore.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/posix_semaphore.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_POSIX_SEMAPHORE_H
#define ONDEMAND_EXTREME_RTO_POSIX_SEMAPHORE_H
#include <semaphore.h>
namespace ondemand_extreme_rto {
/* A POSIX sem_t paired with a flag recording whether it has been initialized. */
typedef struct {
sem_t semaphore;
bool initialized; /* NOTE(review): presumably set by PosixSemaphoreInit and cleared by PosixSemaphoreDestroy -- confirm */
} PosixSemaphore;
/* Semaphore operations. NOTE(review): presumably thin wrappers over sem_init/sem_destroy/sem_wait/sem_post -- confirm. */
void PosixSemaphoreInit(PosixSemaphore *sem, unsigned int initValue);
void PosixSemaphoreDestroy(PosixSemaphore *sem);
void PosixSemaphoreWait(PosixSemaphore *sem);
void PosixSemaphorePost(PosixSemaphore *sem);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,75 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* redo_item.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/redo_item.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_REDO_ITEM_H
#define ONDEMAND_EXTREME_RTO_REDO_ITEM_H
#include "access/xlogreader.h"
#include "datatype/timestamp.h"
#include "nodes/pg_list.h"
#include "utils/atomic.h"
#include "storage/buf/block.h"
#include "storage/smgr/relfilenode.h"
#include "access/ondemand_extreme_rto/posix_semaphore.h"
#include "replication/replicainternal.h"
namespace ondemand_extreme_rto {
/*
 * One unit of redo work. It embeds the XLogReaderState for a log record together with
 * free-list / allocation-list links and values captured alongside the record.
 */
typedef struct RedoItem_s {
bool needImmediateCheckpoint;
bool needFullSyncCheckpoint;
/* The expected timelines for this record. */
List *expectedTLIs;
/* The timestamp of the log record if it is a transaction record. */
TimestampTz recordXTime;
/* Next item on the free list. */
struct RedoItem_s *freeNext;
/* A "deep" copy of the log record. GetRedoItemPtr() relies on this member's offset. */
XLogReaderState record;
/* Used for really free */
struct RedoItem_s *allocatedNext;
/* NOTE(review): the sync* fields and RecentXmin presumably carry values to be applied in the worker -- confirm. */
TimestampTz syncXLogReceiptTime;
int syncXLogReceiptSource;
TransactionId RecentXmin;
ServerMode syncServerMode;
} RedoItem;
/*
 * Sentinel values. The uint32 ones are -1..-4 converted to unsigned, i.e. the four largest
 * uint32 values.
 * NOTE(review): presumably used as special worker-id designators -- confirm at the use sites.
 */
static const int32 ANY_BLOCK_ID = -1;
static const uint32 ANY_WORKER = static_cast<uint32>(-1);
static const uint32 TRXN_WORKER = static_cast<uint32>(-2);
static const uint32 ALL_WORKER = static_cast<uint32>(-3);
static const uint32 USTORE_WORKER = static_cast<uint32>(-4);
void DumpItem(RedoItem *item, const char *funcName);
/*
 * Recovers the RedoItem that contains the given reader state.
 *
 * `record` must point at the `record` member of a RedoItem; the enclosing item is found by
 * stepping back offsetof(RedoItem, record) bytes from that address.
 */
static inline RedoItem *GetRedoItemPtr(XLogReaderState *record)
{
    char *recordAddr = reinterpret_cast<char *>(record);
    char *itemAddr = recordAddr - offsetof(RedoItem, record);
    return reinterpret_cast<RedoItem *>(itemAddr);
}
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,51 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* redo_utils.h
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/redo_utils.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_REDO_UTILS_H
#define ONDEMAND_EXTREME_RTO_REDO_UTILS_H
#include "access/xlogproc.h"
/* Bytes taken by one parse buffer: an XLogRecParseState plus a ParseBufferDesc. */
#define PARSEBUFFER_SIZE (sizeof(XLogRecParseState) + sizeof(ParseBufferDesc))
/* Number of whole parse buffers that fit in (1 GiB - 1) bytes. */
#define ONDEMAND_MAX_PARSEBUFF_PREPALLOC ((1024 * 1024 * 1024 - 1) / PARSEBUFFER_SIZE)
/* Byte size of that many parse buffers: the largest multiple of PARSEBUFFER_SIZE below 1 GiB. */
#define ONDEMAND_MAX_PARSESIZE_PREPALLOC (ONDEMAND_MAX_PARSEBUFF_PREPALLOC * PARSEBUFFER_SIZE)
/* Capacity of OndemandParseAllocCtrl::allocEntry; at just under 1 GiB per entry this is about 100GB. */
#define ONDEMAND_MAX_PARSEBUFF_ALLOCSIZE 100 // 100GB
/* Bookkeeping for on-demand parse buffer allocations. */
typedef struct
{
int allocNum; /* NOTE(review): presumably the number of allocEntry slots in use -- confirm */
void *allocEntry[ONDEMAND_MAX_PARSEBUFF_ALLOCSIZE];
void *memslotEntry;
} OndemandParseAllocCtrl;
/* Parse buffer management for on-demand redo. */
void OndemandXLogParseBufferInit(RedoParseManager *parsemanager, int buffernum, RefOperate *refOperate,
InterruptFunc interruptOperte);
void OndemandXLogParseBufferDestory(RedoParseManager *parsemanager);
XLogRecParseState *OndemandXLogParseBufferAllocList(RedoParseManager *parsemanager, XLogRecParseState *blkstatehead,
void *record);
void OndemandXLogParseBufferRelease(XLogRecParseState *recordstate);
/* NOTE(review): presumably wrap the same-named functions in namespace ondemand_extreme_rto -- confirm. */
void OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(int code);
void OnDemandWaitRedoFinish();
#endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */

View File

@ -0,0 +1,62 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* spsc_blocking_queue.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/spsc_blocking_queue.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_SPSC_BLOCKING_QUEUE_H
#define ONDEMAND_EXTREME_RTO_SPSC_BLOCKING_QUEUE_H
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/parallel_recovery/posix_semaphore.h"
namespace ondemand_extreme_rto {
/* Pointer to a function taking no arguments and returning nothing; stored in SPSCBlockingQueue::callBackFunc. */
typedef void (*CallBackFunc)();
/*
 * Control block of a pointer queue. The slot array is the trailing member
 * `buffer`, declared with one element but sized to `capacity` at allocation
 * time, so it must remain the last field.
 * NOTE(review): "SPSC" presumably means single-producer/single-consumer -
 * confirm the threading contract in the implementation.
 */
struct SPSCBlockingQueue {
pg_atomic_uint32 writeHead; /* Array index for the next write. */
pg_atomic_uint32 readTail; /* Array index for the next read. */
uint32 capacity; /* Queue capacity, must be power of 2. */
uint32 mask; /* Bit mask for computing index. */
/* presumably the highest occupancy observed so far - TODO confirm */
pg_atomic_uint32 maxUsage;
/* presumably the running count of elements that passed through the queue - TODO confirm */
pg_atomic_uint64 totalCnt;
/* Optional callback given to SPSCBlockingQueueCreate (defaults to NULL there). */
CallBackFunc callBackFunc;
/* presumably a previous sample of totalCnt kept for statistics - TODO confirm */
uint64 lastTotalCnt;
void *buffer[1]; /* Queue buffer, the actual size is capacity. */
};
/*
 * Queue API. Only the signatures are visible in this header; behavior notes
 * are assumptions to be confirmed against the definitions.
 */
/* Creates a queue; `func` is optional and defaults to NULL. */
SPSCBlockingQueue *SPSCBlockingQueueCreate(uint32 capacity, CallBackFunc func = NULL);
void SPSCBlockingQueueDestroy(SPSCBlockingQueue *queue);
/* Returns bool; presumably false when the element could not be enqueued - TODO confirm */
bool SPSCBlockingQueuePut(SPSCBlockingQueue *queue, void *element);
/* Returns an untyped element pointer; presumably removes it from the queue - TODO confirm */
void *SPSCBlockingQueueTake(SPSCBlockingQueue *queue);
bool SPSCBlockingQueueIsEmpty(SPSCBlockingQueue *queue);
/* presumably peeks at the next element without removing it - TODO confirm */
void *SPSCBlockingQueueTop(SPSCBlockingQueue *queue);
void SPSCBlockingQueuePop(SPSCBlockingQueue *queue);
/* Takes the queue by const pointer. */
void DumpQueue(const SPSCBlockingQueue *queue);
uint32 SPSCGetQueueCount(SPSCBlockingQueue *queue);
/* Out-parameters: *eleArry receives an array of element pointers and *eleNum a count (per the signature). */
bool SPSCBlockingQueueGetAll(SPSCBlockingQueue *queue, void ***eleArry, uint32 *eleNum);
void SPSCBlockingQueuePopN(SPSCBlockingQueue *queue, uint32 n);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* txn_redo.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/txn_redo.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_TXN_REDO_H
#define ONDEMAND_EXTREME_RTO_TXN_REDO_H
#include "access/parallel_recovery/redo_item.h"
namespace ondemand_extreme_rto {
/*
 * Transaction-redo entry points of the ondemand_extreme_rto namespace.
 * Only the signatures are visible here; see the definitions for behavior.
 */
/* `item` is passed as an untyped pointer. */
void AddTxnRedoItem(PageRedoWorker *worker, void *item);
void TrxnMngProc(RedoItem *item, PageRedoWorker *wk);
void TrxnWorkerProc(RedoItem *item);
} // namespace ondemand_extreme_rto
#endif

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* xlog_read.h
*
*
*
* IDENTIFICATION
* src/include/access/ondemand_extreme_rto/xlog_read.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef ONDEMAND_EXTREME_RTO_XLOG_READ_H
#define ONDEMAND_EXTREME_RTO_XLOG_READ_H
#include "access/xlog_basic.h"
namespace ondemand_extreme_rto {
/*
 * XLOG record readers of the ondemand_extreme_rto namespace. Both return an
 * XLogRecord pointer; only the signatures are visible here.
 */
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
/* Takes the reader by pointer-to-pointer; `emode` is presumably an error-report level - TODO confirm */
XLogRecord *ReadNextXLogRecord(XLogReaderState **xlogreaderptr, int emode);
} // namespace ondemand_extreme_rto
#endif /* ONDEMAND_EXTREME_RTO_XLOG_READ_H */

View File

@ -126,7 +126,7 @@ void GetReplayedRecPtrFromUndoWorkers(XLogRecPtr *readPtr, XLogRecPtr *endPtr);
List* CheckImcompleteAction(List* imcompleteActionList);
void SetPageWorkStateByThreadId(uint32 threadState);
RedoWaitInfo redo_get_io_event(int32 event_id);
void redo_get_wroker_statistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
void redo_get_worker_statistic(uint32* realNum, RedoWorkerStatsData* worker, uint32 workerLen);
extern void redo_dump_all_stats();
void WaitRedoWorkerIdle();
void SendClearMarkToAllWorkers();
@ -139,7 +139,7 @@ extern void InitReaderStateByOld(XLogReaderState *newState, XLogReaderState *old
extern void CopyDataFromOldReader(XLogReaderState *newReaderState, XLogReaderState *oldReaderState);
bool TxnQueueIsEmpty(TxnRedoWorker* worker);
void redo_get_wroker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
void redo_get_worker_time_count(RedoWorkerTimeCountsInfo **workerCountInfoList, uint32 *realNum);
}

View File

@ -64,6 +64,15 @@ typedef enum {
STANDBY_SNAPSHOT_READY
} HotStandbyState;
/*
 * Trigger states; this is the return type of CheckForSatartupStatus().
 * TRIGGER_NORMAL is explicitly 0 and the remaining enumerators take the
 * following consecutive values.
 * NOTE(review): TRIGGER_STADNBY looks like a misspelling of "STANDBY", but it
 * is a public identifier - renaming it requires updating every user.
 */
typedef enum {
TRIGGER_NORMAL = 0,
TRIGGER_PRIMARY,
TRIGGER_STADNBY,
TRIGGER_FAILOVER,
TRIGGER_SWITCHOVER,
TRIGGER_SMARTSHUTDOWN,
} Enum_TriggeredState;
#define InHotStandby (t_thrd.xlog_cxt.standbyState >= STANDBY_SNAPSHOT_PENDING)
#define DUMMYSTANDBY_CONNECT_INTERVAL 3 // unit second
@ -532,6 +541,8 @@ typedef struct XLogCtlData {
bool SharedRecoveryInProgress;
bool IsRecoveryDone;
bool IsOnDemandBuildDone;
bool IsOnDemandRecoveryDone;
/*
* SharedHotStandbyActive indicates if we're still in crash or archive
@ -810,7 +821,6 @@ extern char* TrimStr(const char* str);
extern void CloseXlogFilesAtThreadExit(void);
extern void SetLatestXTime(TimestampTz xtime);
XLogRecord* XLogParallelReadNextRecord(XLogReaderState* xlogreader);
void ResourceManagerStartup(void);
void ResourceManagerStop(void);
@ -855,6 +865,16 @@ bool CheckForSwitchoverTrigger(void);
void HandleCascadeStandbyPromote(XLogRecPtr *recptr);
void update_dirty_page_queue_rec_lsn(XLogRecPtr current_insert_lsn, bool need_immediately_update = false);
XLogRecord *ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int whichChkpt);
int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
bool timeLineInHistory(TimeLineID tli, List *expectedTLEs);
Enum_TriggeredState CheckForSatartupStatus(void);
bool CheckForStandbyTrigger(void);
void UpdateMinrecoveryInAchive();
bool NewDataIsInBuf(XLogRecPtr expectedRecPtr);
bool rescanLatestTimeLine(void);
int XLogFileReadAnyTLI(XLogSegNo segno, int emode, uint32 sources);
int SSXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf, TimeLineID *readTLI, char* xlog_path);
extern XLogRecPtr XlogRemoveSegPrimary;

View File

@ -99,6 +99,7 @@
* The XLog directory and control file (relative to $PGDATA)
*/
#define SS_XLOGDIR (g_instance.datadir_cxt.xlogDir)
#define SS_XLOGRECOVERYDIR (g_instance.dms_cxt.SSRecoveryInfo.recovery_xlogDir)
#define XLOGDIR "pg_xlog"
#define ARCHIVEDIR "pg_xlog/archive_status"
#define XLOG_CONTROL_FILE (g_instance.datadir_cxt.controlPath)

View File

@ -111,6 +111,7 @@ typedef struct {
/*
 * Descriptor that accompanies each parse buffer (its size is one of the two
 * terms of PARSEBUFFER_SIZE where that macro is defined).
 */
typedef struct {
Buffer buff_id;
/* 32-bit atomic state word; the encoding is not visible here. */
pg_atomic_uint32 state;
/* 32-bit atomic counter; presumably the number of outstanding references - TODO confirm */
pg_atomic_uint32 refcount;
} ParseBufferDesc;
#define RedoBufferSlotGetBuffer(bslot) ((bslot)->buf_id)
@ -687,7 +688,12 @@ typedef struct
RefOperate *refOperate;
}RedoParseManager;
/*
 * Value type of XLogRecParseState::distributeStatus. Enumerators take the
 * implicit values 0..3 in declaration order.
 * NOTE(review): presumably marks where a parse state sits within a
 * distributed record (none / head / middle / tail) - confirm with the
 * dispatcher code.
 */
typedef enum {
XLOG_NO_DISTRIBUTE,
XLOG_HEAD_DISTRIBUTE,
XLOG_MID_DISTRIBUTE,
XLOG_TAIL_DISTRIBUTE,
} XlogDistributePos;
typedef struct {
void* nextrecord;
@ -695,6 +701,7 @@ typedef struct {
RedoParseManager* manager;
void* refrecord; /* origin dataptr, for mem release */
bool isFullSync;
XlogDistributePos distributeStatus;
} XLogRecParseState;
typedef struct XLogBlockRedoExtreRto {
@ -908,7 +915,36 @@ extern AbnormalProcFunc g_AbFunList[ABNORMAL_NUM];
#define ADD_ABNORMAL_POSITION(pos)
#endif
static inline bool AtomicCompareExchangeBuffer(volatile Buffer *ptr, Buffer *expected, Buffer newval)
{
bool ret = false;
Buffer current;
current = __sync_val_compare_and_swap(ptr, *expected, newval);
ret = (current == *expected);
*expected = current;
return ret;
}
/*
 * Load the Buffer value currently stored at ptr.
 * The access goes through the volatile-qualified pointer; no explicit
 * barrier is issued here.
 */
static inline Buffer AtomicReadBuffer(volatile Buffer *ptr)
{
    const Buffer snapshot = *ptr;
    return snapshot;
}
/*
 * Store val into the Buffer slot at ptr.
 * The access goes through the volatile-qualified pointer; no explicit
 * barrier is issued here.
 */
static inline void AtomicWriteBuffer(volatile Buffer* ptr, Buffer val)
{
    ptr[0] = val;
}
/*
 * Unconditionally replace the Buffer stored at ptr with newval and return
 * the value that was there before.
 *
 * Implemented as a retry loop: read the current value, then try to swap it
 * for newval; if the slot changed in between, read again and retry.
 */
static inline Buffer AtomicExchangeBuffer(volatile Buffer *ptr, Buffer newval)
{
    Buffer previous = AtomicReadBuffer(ptr);

    while (!AtomicCompareExchangeBuffer(ptr, &previous, newval)) {
        /* Lost the race: take a fresh reading before the next attempt. */
        previous = AtomicReadBuffer(ptr);
    }
    return previous;
}
void HeapXlogCleanOperatorPage(
RedoBufferInfo* buffer, void* recorddata, void* blkdata, Size datalen, Size* freespace, bool repairFragmentation);
@ -1204,6 +1240,7 @@ extern XLogRecParseState* xact_redo_parse_to_block(XLogReaderState* record, uint
extern bool XLogBlockRedoForExtremeRTO(XLogRecParseState* redoblocktate, RedoBufferInfo *bufferinfo,
bool notfound, RedoTimeCost &readBufCost, RedoTimeCost &redoCost);
extern void XlogBlockRedoForOndemandExtremeRTOQuery(XLogRecParseState *redoBlockState, RedoBufferInfo *bufferInfo);
void XLogBlockParseStateRelease_debug(XLogRecParseState* recordstate, const char *func, uint32 line);
#define XLogBlockParseStateRelease(recordstate) XLogBlockParseStateRelease_debug(recordstate, __FUNCTION__, __LINE__)
#ifdef USE_ASSERT_CHECKING

View File

@ -140,6 +140,17 @@
#define SS_PRIMARY_STANDBY_CLUSTER_NORMAL_STANDBY \
(SS_NORMAL_STANDBY && (g_instance.attr.attr_storage.xlog_file_path != 0))
#define SS_CLUSTER_NOT_NORAML (ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus != CLUSTER_NORMAL))
/* True when DMS is enabled and the reformer control clusterStatus is CLUSTER_IN_ONDEMAND_BUILD. */
#define SS_CLUSTER_ONDEMAND_BUILD \
(ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_BUILD))
/* True when DMS is enabled and the reformer control clusterStatus is CLUSTER_IN_ONDEMAND_RECOVERY. */
#define SS_CLUSTER_ONDEMAND_RECOVERY \
(ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_IN_ONDEMAND_RECOVERY))
/* True when DMS is enabled and the reformer control clusterStatus is CLUSTER_NORMAL. */
#define SS_CLUSTER_ONDEMAND_NORMAL \
(ENABLE_DMS && (g_instance.dms_cxt.SSReformerControl.clusterStatus == CLUSTER_NORMAL))
/* The three cluster-status tests above, each additionally requiring SS_STANDBY_MODE. */
#define SS_STANDBY_ONDEMAND_BUILD (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_BUILD)
#define SS_STANDBY_ONDEMAND_RECOVERY (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_RECOVERY)
#define SS_STANDBY_ONDEMAND_NORMAL (SS_STANDBY_MODE && SS_CLUSTER_ONDEMAND_NORMAL)
/* DMS_BUF_NEED_LOAD */
#define BUF_NEED_LOAD 0x1
/* DMS_BUF_IS_LOADED */
@ -207,5 +218,18 @@ typedef enum SSReformType {
DMS_REFORM_TYPE_FOR_MAINTAIN
} SSReformType;
/*
 * Cluster-wide state compared against SSReformerControl.clusterStatus by the
 * SS_CLUSTER_ONDEMAND_* macros. CLUSTER_IN_ONDEMAND_BUILD is explicitly 0;
 * the others follow as 1 and 2.
 * NOTE(review): the reformer control structure appears to be saved and read
 * back (SSSaveReformerCtrl / SSReadControlFile), so these numeric values
 * presumably must stay stable - confirm before reordering.
 */
typedef enum SSGlobalClusterState {
CLUSTER_IN_ONDEMAND_BUILD = 0,
CLUSTER_IN_ONDEMAND_RECOVERY,
CLUSTER_NORMAL
} SSGlobalClusterState;
/*
 * Outcome codes for an on-demand redo request. ONDEMAND_REDO_DONE is
 * explicitly 0; the others follow consecutively.
 * NOTE(review): presumably the values reported through the `int *redo_status`
 * out-parameter of dms_reform_req_opengauss_ondemand_redo_buffer - confirm.
 */
typedef enum SSOndemandRequestRedoStatus {
ONDEMAND_REDO_DONE = 0,
ONDEMAND_REDO_SKIP,
ONDEMAND_REDO_FAIL,
ONDEMAND_REDO_INVALID
} SSOndemandRequestRedoStatus;
#endif

View File

@ -80,6 +80,8 @@ typedef struct st_ss_dms_func {
void (*dms_refresh_logger)(char *log_field, unsigned long long *value);
void (*dms_validate_drc)(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn,
unsigned char is_dirty);
int (*dms_reform_req_opengauss_ondemand_redo_buffer)(dms_context_t *dms_ctx, void *block_key, unsigned int key_len,
int *redo_status);
} ss_dms_func_t;
int ss_dms_func_init();
@ -123,6 +125,8 @@ bool dms_latch_timed_s(dms_context_t *dms_ctx, dms_drlatch_t *dlatch, unsigned i
void dms_unlatch(dms_context_t *dms_ctx, dms_drlatch_t *dlatch);
void dms_pre_uninit(void);
void dms_validate_drc(dms_context_t *dms_ctx, dms_buf_ctrl_t *ctrl, unsigned long long lsn, unsigned char is_dirty);
int dms_reform_req_opengauss_ondemand_redo_buffer(dms_context_t *dms_ctx, void *block_key, unsigned int key_len,
int *redo_status);
#ifdef __cplusplus
}
#endif

View File

@ -83,4 +83,5 @@ long SSGetBufSleepTime(int retry_times);
SMGR_READ_STATUS SmgrNetPageCheckRead(Oid spcNode, Oid dbNode, Oid relNode, ForkNumber forkNum,
BlockNumber blockNo, char *blockbuf);
void SSUnPinBuffer(BufferDesc* buf_desc);
bool SSOndemandRequestPrimaryRedo(BufferTag tag);
#endif

View File

@ -32,11 +32,30 @@
#define SS_BEFORE_RECOVERY (ENABLE_DMS && g_instance.dms_cxt.SSReformInfo.in_reform == true \
&& g_instance.dms_cxt.SSRecoveryInfo.recovery_pause_flag == true)
#define SS_IN_FAILOVER (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_failover == true)
/* True when DMS is enabled and SSRecoveryInfo.in_ondemand_recovery is set. */
#define SS_IN_ONDEMAND_RECOVERY (ENABLE_DMS && g_instance.dms_cxt.SSRecoveryInfo.in_ondemand_recovery == true)
/* In on-demand recovery and the shared XLogCtl flag IsOnDemandBuildDone is set. */
#define SS_ONDEMAND_BUILD_DONE (ENABLE_DMS && SS_IN_ONDEMAND_RECOVERY \
&& t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone == true)
/* In on-demand recovery and the shared XLogCtl flag IsOnDemandRecoveryDone is set. */
#define SS_ONDEMAND_RECOVERY_DONE (ENABLE_DMS && SS_IN_ONDEMAND_RECOVERY \
&& t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone == true)
/*
 * No longer in on-demand recovery, yet both XLogCtl flags (build done and
 * recovery done) are set. All three macros dereference
 * t_thrd.shemem_ptr_cxt.XLogCtl only after the preceding && operands hold.
 */
#define SS_REPLAYED_BY_ONDEMAND (ENABLE_DMS && !SS_IN_ONDEMAND_RECOVERY && \
t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone == true && \
t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRecoveryDone == true)
typedef struct st_reformer_ctrl {
#define REFORM_CTRL_VERSION 1
typedef struct st_old_reformer_ctrl {
uint64 list_stable; // stable instances list
int primaryInstId;
pg_crc32c crc;
} ss_old_reformer_ctrl_t;
typedef struct st_reformer_ctrl {
uint32 version;
uint64 list_stable; // stable instances list
int primaryInstId;
int recoveryInstId;
SSGlobalClusterState clusterStatus;
pg_crc32c crc;
} ss_reformer_ctrl_t;
typedef struct st_reform_info {
@ -66,14 +85,14 @@ typedef struct ss_recovery_info {
bool no_backend_left;
bool startup_need_exit_normally; //used in alive failover
bool recovery_trapped_in_page_request; //used in alive failover
bool in_ondemand_recovery;
} ss_recovery_info_t;
extern bool SSRecoveryNodes();
extern void SSWaitStartupExit();
extern int SSGetPrimaryInstId();
extern void SSSavePrimaryInstId(int id);
extern void SSReadControlFile(int id, bool updateDmsCtx = false);
extern void SSWriteReformerControlPages(void);
extern void SSInitReformerControlPages(void);
extern bool SSRecoveryApplyDelay();
extern void SShandle_promote_signal();
extern void ss_failover_dw_init();

View File

@ -32,8 +32,10 @@
#define DMS_MAX_CONNECTIONS (int32)16000
#define SS_PRIMARY_ID g_instance.dms_cxt.SSReformerControl.primaryInstId // currently master ID is hardcoded as 0
#define SS_RECOVERY_ID g_instance.dms_cxt.SSReformerControl.recoveryInstId
#define SS_MY_INST_ID g_instance.attr.attr_storage.dms_attr.instance_id
#define SS_OFFICIAL_PRIMARY (SS_MY_INST_ID == SS_PRIMARY_ID)
#define SS_OFFICIAL_RECOVERY_NODE (SS_MY_INST_ID == SS_RECOVERY_ID)
void DMSInit();
void DMSUninit();

View File

@ -31,14 +31,18 @@
#define REFORM_WAIT_LONG 100000 /* 0.1 sec */
#define WAIT_REFORM_CTRL_REFRESH_TRIES 1000
#define REFORM_CTRL_VERSION 1
typedef struct SSBroadcastCancelTrx {
SSBroadcastOp type; // must be first
} SSBroadcastCancelTrx;
bool SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf);
int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf,
int readLen);
XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize);
void SSGetXlogPath();
void SSSaveReformerCtrl();
void SSGetRecoveryXlogPath();
void SSSaveReformerCtrl(bool force = false);
void SSReadControlFile(int id, bool updateDmsCtx = false);
void SSClearSegCache();
int SSCancelTransactionOfAllStandby(SSBroadcastOp type);
int SSProcessCancelTransaction(SSBroadcastOp type);

View File

@ -100,6 +100,8 @@ typedef struct knl_instance_attr_dms {
bool enable_catalog_centralized;
bool enable_dss_aio;
bool enable_verify_page;
bool enable_ondemand_recovery;
int ondemand_recovery_mem_size;
int instance_id;
int recv_msg_pool_size;
char* interconnect_url;

View File

@ -741,6 +741,8 @@ typedef struct knl_g_parallel_redo_context {
char* ali_buf;
XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM];
RedoCpuBindControl redoCpuBindcontrl;
HTAB **redoItemHash; /* used in ondemand extreme RTO */
} knl_g_parallel_redo_context;
typedef struct knl_g_heartbeat_context {
@ -827,7 +829,7 @@ typedef struct knl_g_comm_context {
long lastArchiveRcvTime;
void* pLogCtl;
bool rejectRequest;
MemoryContext redoItemCtx;
#ifdef USE_SSL
libcomm_sslinfo* libcomm_data_port_list;
libcomm_sslinfo* libcomm_ctrl_port_list;

View File

@ -3356,6 +3356,12 @@ typedef struct knl_t_dms_context {
bool flush_copy_get_page_failed; //used in flush copy
} knl_t_dms_context;
/*
 * Per-thread state embedded in knl_thrd_context as ondemand_xlog_copy_cxt.
 * NOTE(review): field roles are inferred from the names - presumably they
 * describe the XLOG segment file currently open for copying; confirm with
 * the code that uses them.
 */
typedef struct knl_t_ondemand_xlog_copy_context {
/* presumably the file descriptor of the open segment - TODO confirm */
int openLogFile;
/* presumably the segment number of the open file - TODO confirm */
XLogSegNo openLogSegNo;
/* presumably the current byte offset within that segment - TODO confirm */
uint32 openLogOff;
} knl_t_ondemand_xlog_copy_context;
/* thread context. */
typedef struct knl_thrd_context {
knl_thread_role role;
@ -3504,6 +3510,7 @@ typedef struct knl_thrd_context {
knl_t_cfs_shrinker_context cfs_shrinker_cxt;
knl_t_sql_patch_context sql_patch_cxt;
knl_t_dms_context dms_cxt;
knl_t_ondemand_xlog_copy_context ondemand_xlog_copy_cxt;
knl_t_rc_context rc_cxt;
} knl_thrd_context;

View File

@ -37,6 +37,7 @@
/*****************************************************************************
* Backend version and inplace upgrade staffs
*****************************************************************************/
extern const uint32 ONDEMAND_REDO_VERSION_NUM;
extern const uint32 SRF_FUSION_VERSION_NUM;
extern const uint32 INNER_UNIQUE_VERSION_NUM;
extern const uint32 PARTITION_ENHANCE_VERSION_NUM;
@ -131,6 +132,7 @@ extern const uint32 CREATE_TABLE_AS_VERSION_NUM;
extern void register_backend_version(uint32 backend_version);
extern bool contain_backend_version(uint32 version_number);
extern void SSUpgradeFileBeforeCommit();
#define INPLACE_UPGRADE_PRECOMMIT_VERSION 1
@ -402,6 +404,7 @@ extern bool stack_is_too_deep(void);
/* in tcop/utility.c */
extern void PreventCommandIfReadOnly(const char* cmdname);
extern void PreventCommandDuringRecovery(const char* cmdname);
extern void PreventCommandDuringSSOndemandRecovery(Node* parseTree);
extern int trace_recovery(int trace_level);

View File

@ -21,14 +21,8 @@
#include "replication/output_plugin.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/extreme_rto/redo_item.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/extreme_rto/posix_semaphore.h"
#include "access/extreme_rto/spsc_blocking_queue.h"
#include "access/parallel_recovery/redo_item.h"
#include "nodes/parsenodes_common.h"

View File

@ -420,5 +420,6 @@ extern bool StartBufferIO(BufferDesc* buf, bool forInput);
extern Buffer ReadBuffer_common_for_dms(ReadBufferMode readmode, BufferDesc *bufDesc, const XLogPhyBlock *pblk);
extern void ReadBuffer_common_for_check(ReadBufferMode readmode, BufferDesc* buf_desc,
const XLogPhyBlock *pblk, Block bufBlock);
extern BufferDesc *RedoForOndemandExtremeRTOQuery(BufferDesc *bufHdr, char relpersistence,
ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode);
#endif

View File

@ -128,6 +128,9 @@ const struct LWLOCK_PARTITION_DESC LWLockPartInfo[] = {
/* Number of partitions of the segment head buffer */
#define NUM_SEGMENT_HEAD_PARTITIONS 128
/* Number of partitions of the redo xlog track mapping hashtable */
#define NUM_XLOG_TRACK_PARTITIONS 4096
/* Number of partitions of the session roleid hashtable */
#define NUM_SESSION_ROLEID_PARTITIONS 128
@ -190,8 +193,9 @@ const struct LWLOCK_PARTITION_DESC LWLockPartInfo[] = {
#define FirstGPRCMappingLock (FirstSessRoleIdLock + NUM_SESSION_ROLEID_PARTITIONS)
/* standby statement history */
#define FirstStandbyStmtHistLock (FirstGPRCMappingLock + NUM_GPRC_PARTITIONS)
#define FirstXlogTrackLock (FirstStandbyStmtHistLock + NUM_STANDBY_STMTHIST_PARTITIONS)
/* must be last: */
#define NumFixedLWLocks (FirstStandbyStmtHistLock + NUM_STANDBY_STMTHIST_PARTITIONS)
#define NumFixedLWLocks (FirstXlogTrackLock + NUM_XLOG_TRACK_PARTITIONS)
/*
* WARNING----Please keep BuiltinTrancheIds and BuiltinTrancheNames consistent!!!
*
@ -270,6 +274,7 @@ enum BuiltinTrancheIds
LWTRANCHE_REPLICATION_ORIGIN,
LWTRANCHE_AUDIT_INDEX_WAIT,
LWTRANCHE_PCA_BUFFER_CONTENT,
LWTRANCHE_XLOG_TRACK_PARTITION,
/*
* Each trancheId above should have a corresponding item in BuiltinTrancheNames;
*/

View File

@ -638,8 +638,13 @@ extern void write_stderr(const char* fmt, ...)
the supplied arguments. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 1, 2)));
extern void getElevelAndSqlstate(int* eLevel, int* sqlState);
extern void write_stderr_with_prefix(const char* fmt, ...)
/* This extension allows gcc to check the format string for consistency with
the supplied arguments. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 1, 2)));
extern void getElevelAndSqlstate(int* eLevel, int* sqlState);
extern void get_time_now(char* nowTime, int timeLen);
void freeSecurityFuncSpace(char* charList, ...);
extern void SimpleLogToServer(int elevel, bool silent, const char* fmt, ...)