资源池化rto优化

This commit is contained in:
chendong76
2024-01-16 19:19:54 +08:00
parent d86c7fcb3d
commit 27d62c009e
51 changed files with 2010 additions and 311 deletions

View File

@ -55,9 +55,6 @@ inline bool IsOndemandExtremeRtoMode()
{
return (g_extreme_rto_type == ONDEMAND_EXTREME_RTO);
}
inline void SetOndemandExtremeRtoMode();
inline bool IsDefaultExtremeRtoMode();
inline bool IsOndemandExtremeRtoMode();
void ExtremeWaitAllReplayWorkerIdle();
void ExtremeDispatchCleanInvalidPageMarkToAllRedoWorker(RepairFileKey key);
void ExtremeDispatchClosefdMarkToAllRedoWorker();

View File

@ -38,9 +38,12 @@
/*
 * Feature switches for ondemand recovery and ondemand realtime build.
 * Both expand to constant false when ENABLE_LITE_MODE is defined.
 */
#ifdef ENABLE_LITE_MODE
#define ENABLE_ONDEMAND_RECOVERY false
#define ENABLE_ONDEMAND_REALTIME_BUILD false
#else
/* True only when DMS is enabled, extreme redo is in use and enable_ondemand_recovery is set. */
#define ENABLE_ONDEMAND_RECOVERY (ENABLE_DMS && IsExtremeRedo() \
&& g_instance.attr.attr_storage.dms_attr.enable_ondemand_recovery)
/* Requires ENABLE_ONDEMAND_RECOVERY and, in addition, enable_ondemand_realtime_build. */
#define ENABLE_ONDEMAND_REALTIME_BUILD (ENABLE_ONDEMAND_RECOVERY \
&& g_instance.attr.attr_storage.dms_attr.enable_ondemand_realtime_build)
#endif
typedef enum {
@ -98,14 +101,12 @@ inline bool IsParallelRedo()
return g_instance.comm_cxt.predo_cxt.redoType == PARALLEL_REDO && (get_real_recovery_parallelism() > 1);
}
/* Report whether redo is multi-threaded, i.e. the real recovery parallelism is greater than one. */
static inline bool IsMultiThreadRedo()
{
    const bool multiThreaded = get_real_recovery_parallelism() > 1;
    return multiThreaded;
}
uint32 GetRedoWorkerCount();
bool IsMultiThreadRedoRunning();
void DispatchRedoRecord(XLogReaderState* record, List* expectedTLIs, TimestampTz recordXTime);
void GetThreadNameIfMultiRedo(int argc, char* argv[], char** threadNamePtr);

View File

@ -56,6 +56,7 @@ static const int MAX_REDO_WORKERS_PER_PARSE = 8;
static const int TRXN_REDO_MANAGER_NUM = 1;
static const int TRXN_REDO_WORKER_NUM = 1;
static const int XLOG_READER_NUM = 3;
static const int ONDEMAND_AUXILIARY_WORKER_NUM = 2; // segredoworker and ctrlworker, only for ondemand recovery
static const int MAX_EXTREME_THREAD_NUM = MAX_PARSE_WORKERS * MAX_REDO_WORKERS_PER_PARSE + MAX_PARSE_WORKERS +
MAX_PARSE_WORKERS + TRXN_REDO_MANAGER_NUM + TRXN_REDO_WORKER_NUM + XLOG_READER_NUM;

View File

@ -69,9 +69,13 @@ typedef struct redoitemhashentry {
} RedoItemHashEntry;
extern void PRPrintRedoItemHashTab(HTAB *redoItemHash);
extern HTAB **PRRedoItemHashInitialize(MemoryContext context);
extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
extern ondemand_htab_ctrl_t *PRRedoItemHashInitialize(MemoryContext context);
extern ondemand_htab_ctrl_t **PRInitRedoItemHashForAllPipeline(MemoryContext context);
extern void PRTrackClearBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash);
extern void PRTrackAddBlock(XLogRecParseState *recordBlockState, HTAB *redoItemHash, bool isHead = false);
extern void PRTrackAddBatchBlock(XLogRecParseState *headBlockState, XLogRecParseState *tailBlockState, int count,
HTAB *redoItemHash, bool isHead);
extern void PRTrackAllClear(HTAB *redoItemHash);
extern uint32 XlogTrackTableHashCode(RedoItemTag *tagPtr);
} // namespace ondemand_extreme_rto

View File

@ -43,6 +43,7 @@ namespace ondemand_extreme_rto {
typedef struct {
PageRedoWorker *batchThd; /* BatchRedoThread */
PageRedoWorker *managerThd; /* PageRedoManager */
PageRedoWorker *htabThd; /* HashMapManager */
PageRedoWorker **redoThd; /* RedoThreadPool */
uint32 redoThdNum;
uint32 *chosedRTIds; /* chosedRedoThdIds */
@ -60,6 +61,11 @@ typedef struct ReadPipeline {
PageRedoWorker *readThd; /* readthrd */
} ReadPipeline;
/*
 * Pair of auxiliary page redo workers that do not belong to a page, trxn or read pipeline.
 * NOTE(review): presumably these are the REDO_SEG_WORKER and REDO_CTRL_WORKER threads used
 * only by ondemand recovery -- confirm against the worker start-up code.
 */
typedef struct AuxiliaryPipeLine {
PageRedoWorker *segRedoThd; /* seg redo worker */
PageRedoWorker *ctrlThd; /* ctrl worker */
} AuxiliaryPipeLine;
#define MAX_XLOG_READ_BUFFER (0xFFFFF) /* 8k uint */
typedef enum {
@ -130,6 +136,7 @@ typedef struct {
uint32 chosedPLCnt; /* chosedPageLineCount */
TrxnRedoPipeline trxnLine;
ReadPipeline readLine;
AuxiliaryPipeLine auxiliaryLine;
RecordBufferState rtoXlogBufState;
PageRedoWorker **allWorkers; /* Array of page redo workers. */
uint32 allWorkersCnt;
@ -166,6 +173,11 @@ typedef struct {
volatile XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM];
RedoTimeCost *startupTimeCost;
RedoParseManager parseManager;
/* used in realtime ondemand extreme rto */
volatile XLogRecPtr ckptRedoPtr;
volatile XLogRecPtr syncRecordPtr;
SPSCBlockingQueue *trxnQueue;
SPSCBlockingQueue *segQueue;
} LogDispatcher;
typedef struct {
@ -180,6 +192,7 @@ extern LogDispatcher *g_dispatcher;
extern RedoItem g_GlobalLsnForwarder;
extern RedoItem g_cleanupMark;
extern RedoItem g_forceDistributeMark;
extern RedoItem g_hashmapPruneMark;
extern THR_LOCAL RecordBufferState *g_recordbuffer;
const static uint64 OUTPUT_WAIT_COUNT = 0x7FFFFFF;
@ -213,13 +226,14 @@ void StartRecoveryWorkers(XLogReaderState *xlogreader, uint32 privateLen);
/* RedoItem lifecycle. */
void DispatchRedoRecordToFile(XLogReaderState *record, List *expectedTLIs, TimestampTz recordXTime);
void UpdateCheckpointRedoPtrForPrune(XLogRecPtr prunePtr);
void ProcessPendingRecords(bool fullSync = false);
void FreeRedoItem(RedoItem *item);
/* Dispatcher phases. */
void SendRecoveryEndMarkToWorkersAndWaitForFinish(int code);
void SendRecoveryEndMarkToWorkersAndWaitForReach(int code);
void WaitRedoFinish();
void WaitRealtimeBuildShutdown();
/* Dispatcher states. */
int GetDispatcherExitCode();

View File

@ -41,14 +41,25 @@
namespace ondemand_extreme_rto {
#define ONDEMAND_DISTRIBUTE_RATIO 0.9
#define ONDEMAND_DISTRIBUTE_RATIO 0.95
#define ONDEMAND_FORCE_PRUNE_RATIO 0.99
#define ONDEMAND_HASHTAB_SWITCH_LIMIT 100000
#define SEG_PROC_PIPELINE_SLOT 0
static const uint32 PAGE_WORK_QUEUE_SIZE = 65536;
static const uint32 REALTIME_BUILD_RECORD_QUEUE_SIZE = 4194304;
static const uint32 ONDEMAND_EXTREME_RTO_ALIGN_LEN = 16; /* need 128-bit aligned */
static const uint32 MAX_REMOTE_READ_INFO_NUM = 100;
static const uint32 ADVANCE_GLOBALLSN_INTERVAL = 1; /* unit second */
extern uint32 g_ondemandXLogParseMemFullValue;
extern uint32 g_ondemandXLogParseMemApproachFullVaule;
extern uint32 g_ondemandRealtimeBuildQueueFullValue;
typedef bool (*OndemandCheckPauseCB)(void);
typedef void (*OndemandRefreshPauseStatusCB)(void);
typedef enum {
REDO_BATCH,
REDO_PAGE_MNG,
@ -58,10 +69,13 @@ typedef enum {
REDO_READ_WORKER,
REDO_READ_PAGE_WORKER,
REDO_READ_MNG,
REDO_SEG_WORKER,
REDO_HTAB_MNG,
REDO_CTRL_WORKER,
REDO_ROLE_NUM,
} RedoRole;
typedef struct BadBlockRecEnt{
typedef struct BadBlockRecEnt {
RepairBlockKey key;
XLogPhyBlock pblk;
XLogRecPtr rec_min_lsn;
@ -173,7 +187,7 @@ struct PageRedoWorker {
PosixSemaphore phaseMarker;
MemoryContext oldCtx;
HTAB *redoItemHash;
ondemand_htab_ctrl_t *redoItemHashCtrl;
TimeLineID recoveryTargetTLI;
bool ArchiveRecoveryRequested;
bool StandbyModeRequested;
@ -186,6 +200,11 @@ struct PageRedoWorker {
RedoBufferManager bufferManager;
RedoTimeCost timeCostList[TIME_COST_NUM];
char page[BLCKSZ];
/* for ondemand realtime build */
XLogRecPtr nextPrunePtr;
bool inRealtimeBuild;
uint32 currentHtabBlockNum;
};
@ -223,7 +242,7 @@ void ClearBTreeIncompleteActions(PageRedoWorker *worker);
void *GetXLogInvalidPages(PageRedoWorker *worker);
bool RedoWorkerIsIdle(PageRedoWorker *worker);
void DumpPageRedoWorker(PageRedoWorker *worker);
PageRedoWorker *CreateWorker(uint32 id);
PageRedoWorker *CreateWorker(uint32 id, bool inRealtimeBuild);
extern void UpdateRecordGlobals(RedoItem *item, HotStandbyState standbyState);
void ReferenceRedoItem(void *item);
void DereferenceRedoItem(void *item);
@ -250,6 +269,11 @@ void RecordBadBlockAndPushToRemote(XLogBlockDataParse *datadecode, PageErrorType
const char *RedoWokerRole2Str(RedoRole role);
bool checkBlockRedoDoneFromHashMapAndLock(LWLock **lock, RedoItemTag redoItemTag, RedoItemHashEntry **redoItemEntry,
bool holdLock);
void RedoWorkerQueueCallBack();
void OndemandRequestPrimaryDoCkptIfNeed();
void GetOndemandRecoveryStatus(ondemand_recovery_stat *stat);
void ReleaseBlockParseStateIfNotReplay(XLogRecParseState *preState);
bool SSXLogParseRecordNeedReplayInOndemandRealtimeBuild(XLogRecParseState *redoblockstate);
} // namespace ondemand_extreme_rto
#endif

View File

@ -26,6 +26,12 @@
#include "access/xlogproc.h"
/*
 * Category of a parsed XLog record, as returned by GetCurrentXLogRecParseType().
 * NOTE(review): the per-value meanings below are read off the enumerator names -- confirm
 * against the classification logic in the implementation.
 */
typedef enum {
PARSE_TYPE_DATA = 0, /* presumably ordinary data block records */
PARSE_TYPE_DDL, /* presumably DDL-related records */
PARSE_TYPE_SEG, /* presumably segment-page records */
} XLogRecParseType;
Size OndemandRecoveryShmemSize(void);
void OndemandRecoveryShmemInit(void);
void OndemandXlogFileIdCacheInit(void);
@ -39,5 +45,10 @@ XLogRecParseState *OndemandRedoReloadXLogRecord(XLogRecParseState *redoblockstat
void OndemandRedoReleaseXLogRecord(XLogRecParseState *reloadBlockState);
void OnDemandSendRecoveryEndMarkToWorkersAndWaitForReach(int code);
void OnDemandWaitRedoFinish();
void OnDemandWaitRealtimeBuildShutDown();
XLogRecPtr GetRedoLocInCheckpointRecord(XLogReaderState *record);
void OnDemandUpdateRealtimeBuildPrunePtr();
XLogRecParseType GetCurrentXLogRecParseType(XLogRecParseState *preState);
void WaitUntilRealtimeBuildStatusToFailoverAndUpdatePrunePtr();
#endif /* ONDEMAND_EXTREME_RTO_REDO_UTILS_H */

View File

@ -61,7 +61,7 @@
#define XLogSegmentsNum(val) (((val) * XLogBaseSize + XLogSegSize - 1) / XLogSegSize)
#define XLogPreReadSize 67108864 // 64MB
#define XLogPreReadSize 4194304 // 4MB
/* Compute XLogRecPtr with segment number and offset. */
#define XLogSegNoOffsetToRecPtr(segno, offset, dest) \

View File

@ -59,6 +59,16 @@ typedef void (*relasexlogreadstate)(void* record);
#define XLogBlockHeadGetCompressOpt(blockhead) ((blockhead)->opt)
#define XLogBlockHeadGetValidInfo(blockhead) ((blockhead)->block_valid)
#define XLogBlockHeadGetPhysicalBlock(blockhead) ((blockhead)->pblk)
/*
 * Fill *buffertag from *blockhead: copies spcNode, dbNode, relNode, bucketNode and opt into
 * buffertag->rnode, forknum into buffertag->forkNum and blkno into buffertag->blockNum.
 * Both arguments are pointers; each is evaluated several times, so pass side-effect-free
 * expressions only.
 */
#define XLogBlockHeadGetBufferTag(blockhead, buffertag) \
do { \
(buffertag)->rnode.spcNode = (blockhead)->spcNode; \
(buffertag)->rnode.dbNode = (blockhead)->dbNode; \
(buffertag)->rnode.relNode = (blockhead)->relNode; \
(buffertag)->rnode.bucketNode = (blockhead)->bucketNode; \
(buffertag)->rnode.opt = (blockhead)->opt; \
(buffertag)->forkNum = (blockhead)->forknum; \
(buffertag)->blockNum = (blockhead)->blkno; \
} while (0)
/* for common blockhead end */
/* for block data begin */
@ -101,7 +111,7 @@ extern void GetFlushBufferInfo(void *buf, RedoBufferInfo *bufferinfo, uint64 *bu
#define RedoBufferDirtyClear(bufferinfo) ((bufferinfo)->dirtyflag = false)
#define IsRedoBufferDirty(bufferinfo) ((bufferinfo)->dirtyflag == true)
#define RedoMemIsValid(memctl, bufferid) (((bufferid) > InvalidBuffer) && ((bufferid) <= (memctl->totalblknum)))
/*
 * True when bufferid is a usable slot id of memctl: strictly greater than InvalidBuffer and
 * not greater than memctl->totalblknum (compared as uint32).
 * The memctl argument is parenthesized so that pointer expressions such as `base + i`
 * expand with the intended precedence. bufferid is evaluated twice; avoid side effects.
 */
#define RedoMemIsValid(memctl, bufferid) \
    (((bufferid) > InvalidBuffer) && ((uint32)(bufferid) <= ((memctl)->totalblknum)))
typedef struct {
RedoBufferTag blockinfo;
@ -655,15 +665,15 @@ typedef void (*InterruptFunc)();
typedef struct
{
int totalblknum; /* total slot */
int usedblknum; /* used slot */
uint32 totalblknum; /* total slot */
uint32 usedblknum; /* used slot */
Size itemsize;
Buffer firstfreeslot; /* first free slot */
Buffer firstreleaseslot; /* first release slot */
RedoMemSlot *memslot; /* slot item */
bool isInit;
InterruptFunc doInterrupt;
}RedoMemManager;
} RedoMemManager;
typedef void (*RefOperateFunc)(void *record);
#ifdef USE_ASSERT_CHECKING
@ -1158,6 +1168,7 @@ XLogRecParseState* xlog_redo_parse_to_block(XLogReaderState* record, uint32* blo
XLogRecParseState* smgr_redo_parse_to_block(XLogReaderState* record, uint32* blocknum);
XLogRecParseState* segpage_redo_parse_to_block(XLogReaderState* record, uint32* blocknum);
void ProcSegPageCommonRedo(XLogRecParseState *parseState);
void SegPageRedoChildState(XLogRecParseState *childStateList);
void ProcSegPageJustFreeChildState(XLogRecParseState *parseState);
XLogRecParseState* XactXlogClogParseToBlock(XLogReaderState* record, XLogRecParseState* recordstatehead,
uint32* blocknum, TransactionId xid, int nsubxids, TransactionId* subxids, CLogXidStatus status);
@ -1289,5 +1300,6 @@ bool is_backup_end(const XLogRecParseState *parse_state);
void redo_atomic_xlog_dispatch(uint8 opCode, RedoBufferInfo *redo_buf, const char *data);
void seg_redo_new_page_copy_and_flush(BufferTag *tag, char *data, XLogRecPtr lsn);
void redo_target_page(const BufferTag& buf_tag, StandbyReadLsnInfoArray* lsn_info, Buffer base_page_buf);
void MarkSegPageRedoChildPageDirty(RedoBufferInfo *bufferinfo);
#endif

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.ondemand_recovery_status() CASCADE;

View File

@ -0,0 +1 @@
DROP FUNCTION IF EXISTS pg_catalog.ondemand_recovery_status() CASCADE;

View File

@ -0,0 +1,15 @@
DROP FUNCTION IF EXISTS pg_catalog.ondemand_recovery_status() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 6991;
CREATE FUNCTION pg_catalog.ondemand_recovery_status(
out primary_checkpoint_redo_lsn text,
out realtime_build_replayed_lsn text,
out hashmap_used_blocks oid,
out hashmap_total_blocks oid,
out trxn_queue_blocks oid,
out seg_queue_blocks oid,
out in_ondemand_recovery boolean,
out ondemand_recovery_status text,
out realtime_build_status text,
out recovery_pause_status text
)
RETURNS SETOF record LANGUAGE INTERNAL as 'ondemand_recovery_status' stable;

View File

@ -0,0 +1,15 @@
DROP FUNCTION IF EXISTS pg_catalog.ondemand_recovery_status() CASCADE;
SET LOCAL inplace_upgrade_next_system_object_oids=IUO_PROC, 6991;
CREATE FUNCTION pg_catalog.ondemand_recovery_status(
out primary_checkpoint_redo_lsn text,
out realtime_build_replayed_lsn text,
out hashmap_used_blocks oid,
out hashmap_total_blocks oid,
out trxn_queue_blocks oid,
out seg_queue_blocks oid,
out in_ondemand_recovery boolean,
out ondemand_recovery_status text,
out realtime_build_status text,
out recovery_pause_status text
)
RETURNS SETOF record LANGUAGE INTERNAL as 'ondemand_recovery_status' stable;

View File

@ -32,7 +32,7 @@ extern "C" {
#define DMS_LOCAL_MINOR_VER_WEIGHT 1000
#define DMS_LOCAL_MAJOR_VERSION 0
#define DMS_LOCAL_MINOR_VERSION 0
#define DMS_LOCAL_VERSION 126
#define DMS_LOCAL_VERSION 127
#define DMS_SUCCESS 0
#define DMS_ERROR (-1)
@ -582,6 +582,7 @@ typedef enum en_dms_wait_event {
DMS_EVT_DCS_REQ_XA_OWNER_ID,
DMS_EVT_DCS_REQ_XA_IN_USE,
DMS_EVT_DCS_REQ_END_XA,
DMS_EVT_REQ_CKPT,
// add new enum at tail, or make adaptations to openGauss
DMS_EVT_COUNT,
@ -830,6 +831,7 @@ typedef void (*dms_thread_init_t)(unsigned char need_startup, char **reg_data);
typedef void (*dms_thread_deinit_t)(void);
typedef int (*dms_get_db_primary_id)(void *db_handle, unsigned int *primary_id);
typedef int (*dms_opengauss_ondemand_redo_buffer)(void *block_key, int *redo_status);
typedef int (*dms_opengauss_do_ckpt_immediate)(unsigned long long *ckpt_loc);
// for ssl
typedef int(*dms_decrypt_pwd_t)(const char *cipher, unsigned int len, char *plain, unsigned int size);
@ -965,6 +967,7 @@ typedef struct st_dms_callback {
dms_get_opengauss_update_xid get_opengauss_update_xid;
dms_get_opengauss_txn_status get_opengauss_txn_status;
dms_opengauss_lock_buffer opengauss_lock_buffer;
dms_opengauss_do_ckpt_immediate opengauss_do_ckpt_immediate;
dms_get_txn_snapshot get_txn_snapshot;
dms_get_opengauss_txn_snapshot get_opengauss_txn_snapshot;
dms_get_opengauss_txn_of_master get_opengauss_txn_of_master;

View File

@ -131,6 +131,10 @@
#define BUF_DIRTY_NEED_FLUSH 0x100
#define BUF_ERTO_NEED_MARK_DIRTY 0x200
#define BUF_READ_MODE_ONDEMAND_REALTIME_BUILD 0x400
/* marks that the buffer is pinned in ondemand realtime build and is not allowed to be eliminated */
#define BUF_IS_ONDEMAND_REALTIME_BUILD_PINNED 0x800
#define SS_BROADCAST_FAILED_RETRYCOUNTS 4
#define SS_BROADCAST_WAIT_INFINITE (0xFFFFFFFF)
#define SS_BROADCAST_WAIT_FIVE_SECONDS (5000)

View File

@ -91,6 +91,7 @@ typedef struct st_ss_dms_func {
int (*dms_info)(char *buf, unsigned int len, dms_info_id_e id);
void (*dms_get_buf_res)(unsigned long long *row_id, dv_drc_buf_info *drc_info, int type);
void (*dms_get_cmd_stat)(int index, wait_cmd_stat_result_t *cmd_stat_result);
int (*dms_req_opengauss_immediate_ckpt)(dms_context_t *dms_ctx, unsigned long long *ckpt_loc);
} ss_dms_func_t;
int ss_dms_func_init();
@ -140,7 +141,7 @@ int dms_reform_req_opengauss_ondemand_redo_buffer(dms_context_t *dms_ctx, void *
int *redo_status);
unsigned int dms_get_mes_max_watting_rooms(void);
int dms_send_opengauss_oldest_xmin(dms_context_t *dms_ctx, unsigned long long oldest_xmin, unsigned char dest_id);
int dms_req_opengauss_immediate_checkpoint(dms_context_t *dms_ctx, unsigned long long *redo_lsn);
int get_drc_info(int *is_found, dv_drc_buf_info *drc_info);
int dms_info(char *buf, unsigned int len, dms_info_id_e id);
void dms_get_buf_res(unsigned long long *row_id, dv_drc_buf_info *drc_info, int type);

View File

@ -30,6 +30,7 @@
#define GetDmsBufCtrl(id) (&t_thrd.storage_cxt.dmsBufCtl[(id)])
#define SS_BUF_MAX_WAIT_TIME (1000L * 1000 * 20) // 20s
#define SS_BUF_WAIT_TIME_IN_ONDEMAND_REALTIME_BUILD (100000L) // 100ms
#define DmsInitLatch(drid, _type, _oid, _idx, _parent_part, _part, _uid) \
do { \
@ -88,4 +89,8 @@ bool SSOndemandRequestPrimaryRedo(BufferTag tag);
bool SSLWLockAcquireTimeout(LWLock* lock, LWLockMode mode);
bool SSWaitIOTimeout(BufferDesc *buf);
void buftag_get_buf_info(BufferTag tag, stat_buf_info_t *buf_info);
Buffer SSReadBuffer(BufferTag *tag, ReadBufferMode mode);
void DmsReleaseBuffer(int buffer, bool is_seg);
bool SSRequestPageInOndemandRealtimeBuild(BufferTag *bufferTag, XLogRecPtr recordLsn, XLogRecPtr *pageLsn);
bool SSOndemandRealtimeBuildAllowFlush(BufferDesc *buf);
#endif

View File

@ -40,15 +40,25 @@
#define SS_REPLAYED_BY_ONDEMAND (ENABLE_DMS && !SS_IN_ONDEMAND_RECOVERY && \
t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandBuildDone == true && \
t_thrd.shemem_ptr_cxt.XLogCtl->IsOnDemandRedoDone == true)
#define SS_ONDEMAND_REALTIME_BUILD_DISABLED (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status == DISABLED)
#define SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status == READY_TO_BUILD)
#define SS_ONDEMAND_REALTIME_BUILD_NORMAL (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status == BUILD_NORMAL)
#define SS_ONDEMAND_REALTIME_BUILD_SHUTDOWN (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status == BUILD_TO_DISABLED)
#define SS_ONDEMAND_REALTIME_BUILD_FAILOVER (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_realtime_build_status == BUILD_TO_REDO)
#define SS_ONDEMAND_RECOVERY_PAUSE (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status != NOT_PAUSE)
#define SS_ONDEMAND_RECOVERY_HASHMAP_FULL (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_HASHMAP)
#define SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL (ENABLE_DMS && \
g_instance.dms_cxt.SSRecoveryInfo.ondemand_recovery_pause_status == PAUSE_FOR_PRUNE_TRXN_QUEUE)
#define REFORM_CTRL_VERSION 1
typedef struct st_old_reformer_ctrl {
uint64 list_stable; // stable instances list
int primaryInstId;
pg_crc32c crc;
} ss_old_reformer_ctrl_t;
typedef struct st_reformer_ctrl {
uint32 version;
uint64 list_stable; // stable instances list
@ -84,6 +94,35 @@ typedef enum st_failover_ckpt_status {
ALLOW_CKPT
} failover_ckpt_status_t;
/*
 * State of ondemand realtime build, stored in SSRecoveryInfo.ondemand_realtime_build_status
 * and tested through the SS_ONDEMAND_REALTIME_BUILD_* macros.
 * NOTE(review): the allowed transitions between states are not visible here -- confirm
 * against the code that updates the status.
 */
typedef enum st_ondemand_realtime_build_status {
DISABLED = 0, /* SS_ONDEMAND_REALTIME_BUILD_DISABLED */
BUILD_NORMAL, /* SS_ONDEMAND_REALTIME_BUILD_NORMAL */
READY_TO_BUILD, /* SS_ONDEMAND_REALTIME_BUILD_READY_TO_BUILD */
BUILD_TO_DISABLED, /* SS_ONDEMAND_REALTIME_BUILD_SHUTDOWN */
BUILD_TO_REDO /* SS_ONDEMAND_REALTIME_BUILD_FAILOVER */
} ondemand_realtime_build_status_t;
/*
 * Reason why ondemand recovery is paused, stored in SSRecoveryInfo.ondemand_recovery_pause_status.
 * Any value other than NOT_PAUSE makes SS_ONDEMAND_RECOVERY_PAUSE true.
 */
typedef enum st_ondemand_recovery_pause_status {
NOT_PAUSE = 0, /* recovery is not paused */
PAUSE_FOR_SYNC_REDO, /* presumably waiting for redo to sync -- TODO confirm */
PAUSE_FOR_PRUNE_HASHMAP, /* tested by SS_ONDEMAND_RECOVERY_HASHMAP_FULL */
PAUSE_FOR_PRUNE_SEG_QUEUE, /* presumably the seg queue needs pruning -- TODO confirm */
PAUSE_FOR_PRUNE_TRXN_QUEUE /* tested by SS_ONDEMAND_RECOVERY_TRXN_QUEUE_FULL */
} ondemand_recovery_pause_status_t;
/*
 * Snapshot of ondemand recovery state; the type taken by GetOndemandRecoveryStatus().
 * NOTE(review): the fields appear to correspond one-to-one to the output columns of the SQL
 * function pg_catalog.ondemand_recovery_status() -- confirm against its implementation.
 */
typedef struct ondemand_recovery_stat {
XLogRecPtr checkpointPtr; /* presumably the primary's checkpoint redo LSN -- TODO confirm */
XLogRecPtr replayedPtr; /* presumably the LSN replayed by realtime build -- TODO confirm */
uint32 hmpUsedBlkNum; /* presumably used blocks of the redo item hashmap -- TODO confirm */
uint32 hmpTotalBlkNum; /* presumably total blocks of the redo item hashmap -- TODO confirm */
uint32 trxnQueueNum; /* presumably entries pending in the trxn queue -- TODO confirm */
uint32 segQueueNum; /* presumably entries pending in the seg queue -- TODO confirm */
bool inOndemandRecovery;
SSGlobalClusterState ondemandRecoveryStatus;
ondemand_realtime_build_status_t realtimeBuildStatus;
ondemand_recovery_pause_status_t recoveryPauseStatus;
} ondemand_recovery_stat;
typedef struct ss_recovery_info {
bool recovery_pause_flag;
volatile failover_ckpt_status_t failover_ckpt_status;
@ -103,9 +142,17 @@ typedef struct ss_recovery_info {
bool startup_need_exit_normally; //used in alive failover
bool recovery_trapped_in_page_request; //used in alive failover
bool in_ondemand_recovery;
volatile ondemand_realtime_build_status_t ondemand_realtime_build_status;
bool dorado_sharestorage_inited; // used in dorado mode
volatile ondemand_recovery_pause_status_t ondemand_recovery_pause_status;
} ss_recovery_info_t;
/*
 * Control block wrapping one redo item hash table used by ondemand extreme RTO.
 * NOTE(review): nextHTabCtrl is an untyped pointer; it presumably links to the next
 * ondemand_htab_ctrl_t so that several hash tables form a chain -- confirm with the users
 * of this struct.
 */
typedef struct ondemand_htab_ctrl {
HTAB *hTab; /* the wrapped hash table */
void *nextHTabCtrl; /* presumably the next control block in the chain */
XLogRecPtr maxRedoItemPtr; /* presumably the largest redo item LSN tracked in hTab -- TODO confirm */
} ondemand_htab_ctrl_t;
extern bool SSRecoveryNodes();
extern void SSWaitStartupExit();
extern int SSGetPrimaryInstId();
@ -115,6 +162,9 @@ extern bool SSRecoveryApplyDelay();
extern void SShandle_promote_signal();
extern void ss_failover_dw_init();
extern void ss_switchover_promoting_dw_init();
extern XLogRecPtr SSOndemandRequestPrimaryCkptAndGetRedoLsn();
void StartupOndemandRecovery();
void OndemandRealtimeBuildHandleFailover();
#endif

View File

@ -100,6 +100,7 @@ typedef struct knl_instance_attr_dms {
bool enable_catalog_centralized;
bool enable_dss_aio;
bool enable_verify_page;
bool enable_ondemand_realtime_build;
bool enable_ondemand_recovery;
int ondemand_recovery_mem_size;
int instance_id;

View File

@ -757,7 +757,7 @@ typedef struct knl_g_parallel_redo_context {
char* ali_buf;
XLogRedoNumStatics xlogStatics[RM_NEXT_ID][MAX_XLOG_INFO_NUM];
RedoCpuBindControl redoCpuBindcontrl;
HTAB **redoItemHash; /* used in ondemand extreme RTO */
ondemand_htab_ctrl_t **redoItemHashCtrl; /* used in ondemand extreme RTO */
/* extreme-rto standby read */
TransactionId exrto_recyle_xmin;
XLogRecPtr global_recycle_lsn;

View File

@ -212,6 +212,16 @@ read page worker get a record make lsn forwarder get new item
startup get a record check stop delay redo dispatch(total)
decode null null null
for ondemand extreme rto
thread step1 step2 step3 step4
step5 step6 step7 step8
seg redo worker get a record redo record(total) redo seg xlog get a record
null null null null
hashmap manager prune seg record prune hashmap(history) prune hashmap(latest) get a record(instruct)
null null null null
ctrl worker update usedblknum request primary ckpt null null
null null null null
for parallel redo
thread step1 step2 step3 step4 step5
step6 step7 step8 step9

View File

@ -75,7 +75,10 @@ typedef enum {
RBM_ZERO_ON_ERROR, /* Read, but return an all-zeros page on error */
RBM_NORMAL_NO_LOG, /* Don't log page as invalid during WAL
* replay; otherwise same as RBM_NORMAL */
RBM_FOR_REMOTE /* Like RBM_NORMAL, but not remote read again when PageIsVerified failed. */
RBM_FOR_REMOTE, /* Like RBM_NORMAL, but not remote read again when PageIsVerified failed. */
RBM_FOR_ONDEMAND_REALTIME_BUILD /* Like RBM_NORMAL, but only used in ondemand realtime
* build (shared storage mode); needs the newest page from DMS,
* but does not load it from disk */
} ReadBufferMode;
typedef enum

View File

@ -1876,6 +1876,7 @@ extern Datum compress_ratio_info(PG_FUNCTION_ARGS);
extern Datum compress_statistic_info(PG_FUNCTION_ARGS);
extern Datum pg_read_binary_file_blocks(PG_FUNCTION_ARGS);
extern Datum dss_io_stat(PG_FUNCTION_ARGS);
extern Datum get_ondemand_recovery_status(PG_FUNCTION_ARGS);
/* plhandler.cpp */
extern Datum generate_procoverage_report(PG_FUNCTION_ARGS);