From cc7e988501548db8bd1468e8f60be85fdcfeb64e Mon Sep 17 00:00:00 2001 From: xue_meng_en <1836611252@qq.com> Date: Sat, 10 Jun 2023 11:39:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=81=E5=BC=8F=E5=AE=B9=E7=81=BEbugfix?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/gs_guc/cluster_guc.conf | 1 + .../backend/pgxc_single/barrier/barrier.cpp | 6 +++--- src/common/backend/utils/cache/partcache.cpp | 2 +- src/common/backend/utils/cache/relcache.cpp | 2 +- src/common/backend/utils/init/globals.cpp | 2 +- src/common/backend/utils/misc/guc.cpp | 2 +- src/gausskernel/optimizer/plan/streamplan.cpp | 6 +++--- .../process/postmaster/postmaster.cpp | 8 ++++---- .../storage/access/transam/csnlog.cpp | 2 +- .../storage/access/transam/transam.cpp | 5 +++-- src/gausskernel/storage/access/transam/xact.cpp | 6 +++--- src/gausskernel/storage/access/transam/xlog.cpp | 16 ++++++++-------- .../storage/access/transam/xlogfuncs.cpp | 14 +++++++++++++- src/gausskernel/storage/ipc/ipci.cpp | 2 +- src/gausskernel/storage/ipc/procarray.cpp | 14 +++++++------- src/gausskernel/storage/ipc/standby.cpp | 4 ++-- .../storage/replication/archive_walreceiver.cpp | 2 +- .../storage/replication/libpqwalreceiver.cpp | 2 +- .../storage/replication/slotfuncs.cpp | 2 +- .../storage/replication/walreceiver.cpp | 2 +- .../storage/replication/walsender.cpp | 2 +- src/include/catalog/upgrade_sql/set_guc/add_guc | 1 + src/include/replication/walreceiver.h | 10 ++++++---- .../regress/output/recovery_2pc_tools.source | 1 + 24 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 86d904f1e..84468cc6f 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -693,6 +693,7 @@ disable_memory_protect|bool|0,0|NULL|NULL| segment_buffers|int|16,1073741823|kB|NULL| undo_zone_count|int|0,1048576|NULL|NULL| cluster_run_mode|enum|cluster_primary,cluster_standby|NULL|NULL| +stream_cluster_run_mode|enum|cluster_primary,cluster_standby|NULL|NULL| xlog_file_size|int64|1048576,576460752303423487|B|The value must be an integer multiple of 16777216(16M)| xlog_file_path|string|0,0|NULL|NULL| plsql_show_all_error|bool|0,0|NULL|NULL| diff --git a/src/common/backend/pgxc_single/barrier/barrier.cpp b/src/common/backend/pgxc_single/barrier/barrier.cpp index 33ecbb7c3..eef9408f1 100755 --- a/src/common/backend/pgxc_single/barrier/barrier.cpp +++ b/src/common/backend/pgxc_single/barrier/barrier.cpp @@ -459,7 +459,7 @@ void barrier_redo(XLogReaderState* record) /* Nothing to do */ XLogRecPtr barrierLSN = record->EndRecPtr; char* barrierId = XLogRecGetData(record); - if (IS_HADR_BARRIER(barrierId) && IS_DISASTER_RECOVER_MODE) { + if (IS_HADR_BARRIER(barrierId) && IS_MULTI_DISASTER_RECOVER_MODE) { ereport(WARNING, (errmsg("The HADR barrier %s is not for streaming standby cluster", barrierId))); return; } @@ -496,7 +496,7 @@ void barrier_redo(XLogReaderState* record) t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo = csn + 1; } - if (!IS_DISASTER_RECOVER_MODE || XLogRecPtrIsInvalid(t_thrd.xlog_cxt.minRecoveryPoint) || + if (!IS_MULTI_DISASTER_RECOVER_MODE || XLogRecPtrIsInvalid(t_thrd.xlog_cxt.minRecoveryPoint) || XLByteLT(barrierLSN, t_thrd.xlog_cxt.minRecoveryPoint) || t_thrd.shemem_ptr_cxt.ControlFile->backupEndRequired) { return; @@ -1047,7 +1047,7 @@ static void barrier_redo_pause(char* barrierId) RedoInterruptCallBack(); if (IS_OBS_DISASTER_RECOVER_MODE) { update_recovery_barrier(); - } else if (IS_DISASTER_RECOVER_MODE) { + } else if (IS_MULTI_DISASTER_RECOVER_MODE) { RequestXLogStreamForBarrier(); } ereport(DEBUG4, ((errmodule(MOD_REDO), errcode(ERRCODE_LOG), diff --git a/src/common/backend/utils/cache/partcache.cpp b/src/common/backend/utils/cache/partcache.cpp index 5b0f9a288..89f8fba99 100644 --- a/src/common/backend/utils/cache/partcache.cpp +++ b/src/common/backend/utils/cache/partcache.cpp @@ -246,7 +246,7 @@ Partition PartitionBuildDesc(Oid targetPartId, StorageType storage_type, bool in /* Assign value in partitiongetrelation. */ partition->partMap = NULL; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { TransactionId xmin = HeapTupleGetRawXmin(pg_partition_tuple); partition->xmin_csn = CSNLogGetDRCommitSeqNo(xmin); } else { diff --git a/src/common/backend/utils/cache/relcache.cpp b/src/common/backend/utils/cache/relcache.cpp index 14eab549e..a2fa88344 100755 --- a/src/common/backend/utils/cache/relcache.cpp +++ b/src/common/backend/utils/cache/relcache.cpp @@ -2372,7 +2372,7 @@ Relation RelationBuildDesc(Oid targetRelId, bool insertIt, bool buildkey) relation->rd_createcsn = csnInfo.createcsn; relation->rd_changecsn = csnInfo.changecsn; - if (relation->rd_id >= FirstNormalObjectId && IS_DISASTER_RECOVER_MODE) { + if (relation->rd_id >= FirstNormalObjectId && IS_MULTI_DISASTER_RECOVER_MODE) { TransactionId xmin = HeapTupleGetRawXmin(pg_class_tuple); relation->xmin_csn = CSNLogGetDRCommitSeqNo(xmin); } else { diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index 604642ab6..d78a25b9b 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -59,7 +59,7 @@ bool open_join_children = true; bool will_shutdown = false; /* hard-wired binary version number */ -const uint32 GRAND_VERSION_NUM = 92848; +const uint32 GRAND_VERSION_NUM = 92849; const uint32 SRF_FUSION_VERSION_NUM = 92847; const uint32 INNER_UNIQUE_VERSION_NUM = 92845; diff --git a/src/common/backend/utils/misc/guc.cpp b/src/common/backend/utils/misc/guc.cpp index 9bf5bfb2a..1bd48ca67 100755 --- a/src/common/backend/utils/misc/guc.cpp +++ b/src/common/backend/utils/misc/guc.cpp @@ -4370,7 +4370,7 @@ static void InitConfigureNamesEnum() NULL}, {{"stream_cluster_run_mode", PGC_POSTMASTER, - NODE_DISTRIBUTE, + NODE_ALL, PRESET_OPTIONS, gettext_noop("Sets the type of streaming cluster."), NULL}, diff --git a/src/gausskernel/optimizer/plan/streamplan.cpp b/src/gausskernel/optimizer/plan/streamplan.cpp index 961585917..15d2e1c80 100644 --- a/src/gausskernel/optimizer/plan/streamplan.cpp +++ b/src/gausskernel/optimizer/plan/streamplan.cpp @@ -1217,7 +1217,7 @@ NodeDefinition* get_all_datanodes_def() NodeDefinition* nodeDefArray = NULL; int rc = 0; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { PgxcNodeGetOidsForInit(NULL, &dn_node_arr, NULL, &dn_node_num, NULL, false); } else { PgxcNodeGetOids(NULL, &dn_node_arr, NULL, &dn_node_num, false); @@ -1230,7 +1230,7 @@ NodeDefinition* get_all_datanodes_def() pfree_ext(dn_node_arr); dn_node_arr = NULL; } - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { PgxcNodeGetOidsForInit(NULL, &dn_node_arr, NULL, &dn_node_num, NULL, false); } else { PgxcNodeGetOids(NULL, &dn_node_arr, NULL, &dn_node_num, false); @@ -1249,7 +1249,7 @@ NodeDefinition* get_all_datanodes_def() nodeDefArray = (NodeDefinition*)palloc(sizeof(NodeDefinition) * u_sess->pgxc_cxt.NumDataNodes); NodeDefinition* res = NULL; for (i = 0; i < u_sess->pgxc_cxt.NumDataNodes; i++) { - if (!IS_DISASTER_RECOVER_MODE) { + if (!IS_MULTI_DISASTER_RECOVER_MODE) { Oid current_primary_oid = PgxcNodeGetPrimaryDNFromMatric(dn_node_arr[i]); res = PgxcNodeGetDefinition(current_primary_oid); } else { diff --git a/src/gausskernel/process/postmaster/postmaster.cpp b/src/gausskernel/process/postmaster/postmaster.cpp index 8380d4b99..944b6c992 100644 --- a/src/gausskernel/process/postmaster/postmaster.cpp +++ b/src/gausskernel/process/postmaster/postmaster.cpp @@ -456,7 +456,7 @@ bool PMstateIsRun(void); #define GTM_LITE_CN (GTM_LITE_MODE && IS_PGXC_COORDINATOR) #ifdef ENABLE_MULTIPLE_NODES -#define START_BARRIER_CREATOR (IS_PGXC_COORDINATOR && !IS_DISASTER_RECOVER_MODE) +#define START_BARRIER_CREATOR (IS_PGXC_COORDINATOR && !IS_MULTI_DISASTER_RECOVER_MODE) #else #define START_BARRIER_CREATOR IS_PGXC_DATANODE #endif @@ -3917,7 +3917,7 @@ static int ServerLoop(void) * pmState is PM_HOT_STANDBY, neither PM_RECOVERY nor PM_RUN */ if (pmState == PM_HOT_STANDBY && g_instance.pid_cxt.BarrierPreParsePID == 0 && - !dummyStandbyMode && IS_DISASTER_RECOVER_MODE) { + !dummyStandbyMode && IS_MULTI_DISASTER_RECOVER_MODE) { g_instance.pid_cxt.BarrierPreParsePID = initialize_util_thread(BARRIER_PREPARSE); } #endif @@ -4951,7 +4951,7 @@ int ProcessStartupPacket(Port* port, bool SSLdone) errmsg("can not accept connection in pending mode."))); } else { #ifdef ENABLE_MULTIPLE_NODES - if (STANDBY_MODE == hashmdata->current_mode && (!IS_DISASTER_RECOVER_MODE || GTM_FREE_MODE || + if (STANDBY_MODE == hashmdata->current_mode && (!IS_MULTI_DISASTER_RECOVER_MODE || GTM_FREE_MODE || g_instance.attr.attr_storage.recovery_parse_workers > 1)) { ereport(ERROR, (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("can not accept connection in standby mode."))); @@ -9361,7 +9361,7 @@ static void handle_begin_hot_standby() ereport(LOG, (errmsg("database system is ready to accept read only connections"))); #ifdef ENABLE_MULTIPLE_NODES - if (IS_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { + if (IS_MULTI_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { g_instance.pid_cxt.BarrierPreParsePID = initialize_util_thread(BARRIER_PREPARSE); } #endif diff --git a/src/gausskernel/storage/access/transam/csnlog.cpp b/src/gausskernel/storage/access/transam/csnlog.cpp index f2b19fc24..d24a2d6e7 100644 --- a/src/gausskernel/storage/access/transam/csnlog.cpp +++ b/src/gausskernel/storage/access/transam/csnlog.cpp @@ -155,7 +155,7 @@ void CSNLogSetCommitSeqNo(TransactionId xid, int nsubxids, TransactionId *subxid pageno = TransactionIdToCSNPage(subxids[offset]); xid = InvalidTransactionId; } - if (IS_DISASTER_RECOVER_MODE && COMMITSEQNO_IS_COMMITTED(csn)) { + if (IS_MULTI_DISASTER_RECOVER_MODE && COMMITSEQNO_IS_COMMITTED(csn)) { UpdateXLogMaxCSN(csn); } } diff --git a/src/gausskernel/storage/access/transam/transam.cpp b/src/gausskernel/storage/access/transam/transam.cpp index c5a84957f..2a1167abe 100644 --- a/src/gausskernel/storage/access/transam/transam.cpp +++ b/src/gausskernel/storage/access/transam/transam.cpp @@ -155,7 +155,8 @@ RETRY: } Assert(TransactionIdIsValid(xid)); - if ((!IS_DISASTER_RECOVER_MODE) && (snapshot == NULL || !IsVersionMVCCSnapshot(snapshot)) && TransactionIdPrecedes(transactionId, xid)) { + if ((!IS_MULTI_DISASTER_RECOVER_MODE) && (snapshot == NULL || !IsVersionMVCCSnapshot(snapshot)) && + TransactionIdPrecedes(transactionId, xid)) { result = GetCSNByCLog(transactionId, isCommit); } else { uint32 saveInterruptHoldoffCount = t_thrd.int_cxt.InterruptHoldoffCount; @@ -171,7 +172,7 @@ RETRY: } PG_CATCH(); { - if ((IS_CN_DISASTER_RECOVER_MODE || IS_DISASTER_RECOVER_MODE) && + if ((IS_CN_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE) && t_thrd.xact_cxt.slru_errcause == SLRU_OPEN_FAILED) { (void)MemoryContextSwitchTo(old); diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index 3a33d5db2..f60fd9a4f 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -7645,7 +7645,7 @@ void xact_redo(XLogReaderState *record) (void)TWOPAHSE_LWLOCK_ACQUIRE(xid, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, record->EndRecPtr); - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) XLogRecGetData(record); XactLockTableInsert(hdr->xid); } @@ -7662,7 +7662,7 @@ void xact_redo(XLogReaderState *record) /* Delete TwoPhaseState gxact entry and/or 2PC file. */ (void)TWOPAHSE_LWLOCK_ACQUIRE(xlrec->xid, LW_EXCLUSIVE); PrepareRedoRemove(xlrec->xid, false); - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { XactLockTableDelete(xlrec->xid); } TWOPAHSE_LWLOCK_RELEASE(xlrec->xid); @@ -7674,7 +7674,7 @@ void xact_redo(XLogReaderState *record) /* Delete TwoPhaseState gxact entry and/or 2PC file. */ (void)TWOPAHSE_LWLOCK_ACQUIRE(xlrec->xid, LW_EXCLUSIVE); PrepareRedoRemove(xlrec->xid, false); - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { XactLockTableDelete(xlrec->xid); } TWOPAHSE_LWLOCK_RELEASE(xlrec->xid); diff --git a/src/gausskernel/storage/access/transam/xlog.cpp b/src/gausskernel/storage/access/transam/xlog.cpp index a912b63f2..5bb1ccf87 100755 --- a/src/gausskernel/storage/access/transam/xlog.cpp +++ b/src/gausskernel/storage/access/transam/xlog.cpp @@ -7707,7 +7707,7 @@ void TruncateAndRemoveXLogForRoachRestore(XLogReaderState *record) pfree(writeContent); } - if (((IS_OBS_DISASTER_RECOVER_MODE == false) && (IS_DISASTER_RECOVER_MODE == false)) || + if (((IS_OBS_DISASTER_RECOVER_MODE == false) && (IS_MULTI_DISASTER_RECOVER_MODE == false)) || (t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_TIME_OBS)) durable_rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE, FATAL); @@ -7879,7 +7879,7 @@ static bool recoveryStopsHere(XLogReaderState *record, bool *includeThis) } /* Do we have a PITR target at all? */ - if ((IS_OBS_DISASTER_RECOVER_MODE || IS_DISASTER_RECOVER_MODE) && + if ((IS_OBS_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE) && t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_UNSET) { // Save timestamp of latest transaction commit/abort if this is a // transaction record @@ -7982,7 +7982,7 @@ static bool recoveryStopsHere(XLogReaderState *record, bool *includeThis) pfree_ext(g_instance.roach_cxt.globalBarrierRecordForPITR); pfree_ext(g_instance.roach_cxt.targetRestoreTimeFromMedia); /* truncate XLOG after barrier time. */ - if (IS_OBS_DISASTER_RECOVER_MODE || IS_DISASTER_RECOVER_MODE) { + if (IS_OBS_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE) { TruncateAndRemoveXLogForRoachRestore(record); xlogOff = (uint32)(record->EndRecPtr) % XLOG_BLCKSZ; if (xlogOff > 0 && xlogOff < XLOG_BLCKSZ) { @@ -10377,7 +10377,7 @@ void StartupXLOG(void) if (!recoveryContinue && (t_thrd.xlog_cxt.server_mode == PRIMARY_MODE || t_thrd.xlog_cxt.server_mode == NORMAL_MODE || (IS_OBS_DISASTER_RECOVER_MODE && (t_thrd.xlog_cxt.recoveryTarget != RECOVERY_TARGET_TIME_OBS)) || - IS_DISASTER_RECOVER_MODE)) { + IS_MULTI_DISASTER_RECOVER_MODE)) { extreme_rto::WaitAllRedoWorkerQueueEmpty(); break; } @@ -10415,7 +10415,7 @@ void StartupXLOG(void) SendRecoveryEndMarkToWorkersAndWaitForFinish(0); RecoveryXlogReader(oldXlogReader, xlogreader); - if (!(IS_OBS_DISASTER_RECOVER_MODE || IS_DISASTER_RECOVER_MODE)) { + if (!(IS_OBS_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE)) { if (t_thrd.xlog_cxt.recoveryPauseAtTarget && reachedStopPoint) { SetRecoveryPause(true); recoveryPausesHere(); @@ -10501,7 +10501,7 @@ void StartupXLOG(void) */ XLogCheckInvalidPages(); - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { ereport(LOG, (errmsg("reach stop barrier wait startupxlog here"))); while (reachedStopPoint) { pg_usleep(1000000L); /* 1000 ms */ @@ -11146,7 +11146,7 @@ static void sendPMBeginHotStby() * If we are in cluster-standby-mode, we need launch barreir preparse * thread from the minrecoverypoint point. */ - if (IS_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { + if (IS_MULTI_DISASTER_RECOVER_MODE && g_instance.pid_cxt.BarrierPreParsePID == 0) { SetBarrierPreParseLsn(t_thrd.xlog_cxt.minRecoveryPoint); } #endif @@ -13039,7 +13039,7 @@ bool CreateRestartPoint(int flags) * in csnlog.c). When hot standby is disabled, though, we mustn't do * this because StartupCSNLOG hasn't been called yet. */ - if (g_instance.attr.attr_storage.EnableHotStandby && !IS_DISASTER_RECOVER_MODE) { + if (g_instance.attr.attr_storage.EnableHotStandby && !IS_MULTI_DISASTER_RECOVER_MODE) { pg_time_t now; int elapsed_secs; now = (pg_time_t)time(NULL); diff --git a/src/gausskernel/storage/access/transam/xlogfuncs.cpp b/src/gausskernel/storage/access/transam/xlogfuncs.cpp index 9d21337ef..2636fcfae 100755 --- a/src/gausskernel/storage/access/transam/xlogfuncs.cpp +++ b/src/gausskernel/storage/access/transam/xlogfuncs.cpp @@ -1532,7 +1532,7 @@ Datum gs_get_local_barrier_status(PG_FUNCTION_ARGS) // archive_LSN max archive xlog LSN // flush_LSN max flush xlog LSN - if (IS_OBS_DISASTER_RECOVER_MODE || IS_CN_OBS_DISASTER_RECOVER_MODE || IS_DISASTER_RECOVER_MODE) { + if (IS_OBS_DISASTER_RECOVER_MODE || IS_CN_OBS_DISASTER_RECOVER_MODE || IS_MULTI_DISASTER_RECOVER_MODE) { SpinLockAcquire(&walrcv->mutex); rc = strncpy_s((char *)barrierId, MAX_BARRIER_ID_LENGTH, (char *)walrcv->lastRecoveredBarrierId, MAX_BARRIER_ID_LENGTH - 1); @@ -2417,6 +2417,10 @@ Datum gs_pitr_archive_slot_force_advance(PG_FUNCTION_ARGS) Datum gs_get_standby_cluster_barrier_status(PG_FUNCTION_ARGS) { +#ifndef ENABLE_MULTIPLE_NODES + DISTRIBUTED_FEATURE_NOT_SUPPORTED(); +#endif + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; XLogRecPtr barrierLsn = InvalidXLogRecPtr; char lastestbarrierId[MAX_BARRIER_ID_LENGTH] = {0}; @@ -2496,6 +2500,10 @@ Datum gs_get_standby_cluster_barrier_status(PG_FUNCTION_ARGS) Datum gs_set_standby_cluster_target_barrier_id(PG_FUNCTION_ARGS) { +#ifndef ENABLE_MULTIPLE_NODES + DISTRIBUTED_FEATURE_NOT_SUPPORTED(); +#endif + volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; text *barrier = PG_GETARG_TEXT_P(0); char *barrierstr = NULL; @@ -2542,6 +2550,10 @@ Datum gs_set_standby_cluster_target_barrier_id(PG_FUNCTION_ARGS) Datum gs_query_standby_cluster_barrier_id_exist(PG_FUNCTION_ARGS) { +#ifndef ENABLE_MULTIPLE_NODES + DISTRIBUTED_FEATURE_NOT_SUPPORTED(); +#endif + text *barrier = PG_GETARG_TEXT_P(0); char *barrierstr = NULL; CommitSeqNo *hentry = NULL; diff --git a/src/gausskernel/storage/ipc/ipci.cpp b/src/gausskernel/storage/ipc/ipci.cpp index d243ca461..4951965c3 100644 --- a/src/gausskernel/storage/ipc/ipci.cpp +++ b/src/gausskernel/storage/ipc/ipci.cpp @@ -416,7 +416,7 @@ void CreateSharedMemoryAndSemaphores(bool makePrivate, int port) InitSegSpcCache(); #ifdef ENABLE_MULTIPLE_NODES - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { InitDisasterCache(); } #endif diff --git a/src/gausskernel/storage/ipc/procarray.cpp b/src/gausskernel/storage/ipc/procarray.cpp index ac72ac86d..55b140884 100755 --- a/src/gausskernel/storage/ipc/procarray.cpp +++ b/src/gausskernel/storage/ipc/procarray.cpp @@ -1960,7 +1960,7 @@ Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot) t_thrd.xact_cxt.useLocalSnapshot = false; - if ((IS_DISASTER_RECOVER_MODE && !is_exec_dn) || + if ((IS_MULTI_DISASTER_RECOVER_MODE && !is_exec_dn) || (GTM_LITE_MODE && ((is_exec_cn && !force_local_snapshot) || /* GTM_LITE exec cn */ (!is_exec_cn && u_sess->utils_cxt.snapshot_source == SNAPSHOT_COORDINATOR)))) { /* GTM_LITE other node */ @@ -3063,7 +3063,7 @@ VirtualTransactionId *GetConflictingVirtualXIDs(TransactionId limitXmin, Oid dbO } } #else - if (!IS_DISASTER_RECOVER_MODE) { + if (!IS_MULTI_DISASTER_RECOVER_MODE) { break; } CommitSeqNo xact_csn = pgxact->csn_dr; @@ -4023,7 +4023,7 @@ static bool GetPGXCSnapshotData(Snapshot snapshot) * If this node is in recovery phase, * snapshot has to be taken directly from WAL information. */ - if (!IS_DISASTER_RECOVER_MODE && RecoveryInProgress()) + if (!IS_MULTI_DISASTER_RECOVER_MODE && RecoveryInProgress()) return false; /* @@ -4094,7 +4094,7 @@ static bool GetSnapshotDataDataNode(Snapshot snapshot) GTM_Snapshot gtm_snapshot; ereport(DEBUG1, (errmsg("Getting snapshot for autovacuum. Current XID = " XID_FMT, GetCurrentTransactionIdIfAny()))); - gtm_snapshot = IS_DISASTER_RECOVER_MODE ? GetSnapshotGTMDR() : GetSnapshotGTMLite(); + gtm_snapshot = IS_MULTI_DISASTER_RECOVER_MODE ? GetSnapshotGTMDR() : GetSnapshotGTMLite(); if (!gtm_snapshot) { if (g_instance.status > NoShutdown) { @@ -4127,7 +4127,7 @@ static bool GetSnapshotDataDataNode(Snapshot snapshot) TransactionId save_recentglobalxmin = u_sess->utils_cxt.RecentGlobalXmin; snapshot->gtm_snapshot_type = u_sess->utils_cxt.is_autovacuum_snapshot ? GTM_SNAPSHOT_TYPE_AUTOVACUUM : GTM_SNAPSHOT_TYPE_GLOBAL; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { snapshot->snapshotcsn = u_sess->utils_cxt.g_snapshotcsn; t_thrd.pgxact->csn_dr = snapshot->snapshotcsn; pg_memory_barrier(); @@ -4181,7 +4181,7 @@ static bool GetSnapshotDataCoordinator(Snapshot snapshot) ereport(DEBUG1, (errmsg("Getting snapshot. Current XID = " XID_FMT, GetCurrentTransactionIdIfAny()))); } - gtm_snapshot = IS_DISASTER_RECOVER_MODE ? GetSnapshotGTMDR() : GetSnapshotGTMLite(); + gtm_snapshot = IS_MULTI_DISASTER_RECOVER_MODE ? GetSnapshotGTMDR() : GetSnapshotGTMLite(); if (!gtm_snapshot) { if (g_instance.status > NoShutdown) { @@ -4195,7 +4195,7 @@ static bool GetSnapshotDataCoordinator(Snapshot snapshot) } else { snapshot->gtm_snapshot_type = GTM_SNAPSHOT_TYPE_GLOBAL; *u_sess->utils_cxt.g_GTM_Snapshot = *gtm_snapshot; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { snapshot->snapshotcsn = gtm_snapshot->csn; t_thrd.pgxact->csn_dr = snapshot->snapshotcsn; LWLockAcquire(XLogMaxCSNLock, LW_SHARED); diff --git a/src/gausskernel/storage/ipc/standby.cpp b/src/gausskernel/storage/ipc/standby.cpp index eac45526e..d02342920 100755 --- a/src/gausskernel/storage/ipc/standby.cpp +++ b/src/gausskernel/storage/ipc/standby.cpp @@ -288,7 +288,7 @@ void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, const R return; CommitSeqNo limitXminCSN = InvalidCommitSeqNo; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { limitXminCSN = CSNLogGetDRCommitSeqNo(latestRemovedXid); while (true) { CommitSeqNo lastReplayedConflictCSN = (CommitSeqNo)pg_atomic_read_u64( @@ -1246,7 +1246,7 @@ bool StandbySafeRestartpoint(void) { #ifdef ENABLE_MULTIPLE_NODES ListCell* l = NULL; - if (t_thrd.xlog_cxt.committing_csn_list && IS_DISASTER_RECOVER_MODE) { + if (t_thrd.xlog_cxt.committing_csn_list && IS_MULTI_DISASTER_RECOVER_MODE) { foreach (l, t_thrd.xlog_cxt.committing_csn_list) { TransactionId* action = (TransactionId*)lfirst(l); if (log_min_messages <= DEBUG4) { diff --git a/src/gausskernel/storage/replication/archive_walreceiver.cpp b/src/gausskernel/storage/replication/archive_walreceiver.cpp index a07798bf9..c1920c698 100644 --- a/src/gausskernel/storage/replication/archive_walreceiver.cpp +++ b/src/gausskernel/storage/replication/archive_walreceiver.cpp @@ -163,7 +163,7 @@ void update_stop_barrier() securec_check(rc, "\0", "\0"); ereport(LOG, (errmsg("Get switchover barrierID %s", (char *)walrcv->recoverySwitchoverBarrierId))); } - } else if (IS_DISASTER_RECOVER_MODE) { + } else if (IS_MULTI_DISASTER_RECOVER_MODE) { if (strlen(g_instance.csn_barrier_cxt.stopBarrierId) != 0) { SpinLockAcquire(&walrcv->mutex); rc = strncpy_s((char *)walrcv->recoveryStopBarrierId, MAX_BARRIER_ID_LENGTH, diff --git a/src/gausskernel/storage/replication/libpqwalreceiver.cpp b/src/gausskernel/storage/replication/libpqwalreceiver.cpp index c30df148b..1b08d5ea5 100755 --- a/src/gausskernel/storage/replication/libpqwalreceiver.cpp +++ b/src/gausskernel/storage/replication/libpqwalreceiver.cpp @@ -603,7 +603,7 @@ static void NotifyWalSendForMainStandby(uint32 localTerm, uint32 remoteTerm) continue; SpinLockAcquire(&walsnd->mutex); - if (IS_DISASTER_RECOVER_MODE && localTerm != remoteTerm) { + if (IS_MULTI_DISASTER_RECOVER_MODE && localTerm != remoteTerm) { walsnd->isTermChanged = true; } walsnd->sendKeepalive = true; diff --git a/src/gausskernel/storage/replication/slotfuncs.cpp b/src/gausskernel/storage/replication/slotfuncs.cpp index 39b53dc1a..c821e0355 100755 --- a/src/gausskernel/storage/replication/slotfuncs.cpp +++ b/src/gausskernel/storage/replication/slotfuncs.cpp @@ -1082,7 +1082,7 @@ void slot_redo(XLogReaderState *record) return; } - if ((IS_DISASTER_RECOVER_MODE && (info == XLOG_SLOT_CREATE || info == XLOG_SLOT_ADVANCE)) || + if ((IS_MULTI_DISASTER_RECOVER_MODE && (info == XLOG_SLOT_CREATE || info == XLOG_SLOT_ADVANCE)) || IsRoachRestore() || t_thrd.xlog_cxt.recoveryTarget == RECOVERY_TARGET_TIME_OBS) { return; } diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index 18f4d4030..2aa82a620 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -1335,7 +1335,7 @@ static void ProcessReplyFlags(void) { #ifdef ENABLE_MULTIPLE_NODES volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv; - if (IS_DISASTER_RECOVER_MODE) { + if (IS_MULTI_DISASTER_RECOVER_MODE) { SpinLockAcquire(&walrcv->mutex); if (walrcv->isPauseByTargetBarrier) { t_thrd.walreceiver_cxt.reply_message->replyFlags |= IS_PAUSE_BY_TARGET_BARRIER; diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index fd4ee470c..54a87c5b6 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -4245,7 +4245,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) volatile WalSnd *walsnd = t_thrd.walsender_cxt.MyWalSnd; if (t_thrd.postmaster_cxt.HaShmData->is_hadr_main_standby) { /* In distributed streaming dr cluster, shutdown walsender of main standby when term is changed */ - if (IS_DISASTER_RECOVER_MODE && walsnd->isTermChanged) { + if (IS_MULTI_DISASTER_RECOVER_MODE && walsnd->isTermChanged) { ereport(LOG, (errmsg("Shutdown walsender of main standby due to the term change."))); SpinLockAcquire(&walsnd->mutex); walsnd->isTermChanged = false; diff --git a/src/include/catalog/upgrade_sql/set_guc/add_guc b/src/include/catalog/upgrade_sql/set_guc/add_guc index e69de29bb..d78f78086 100644 --- a/src/include/catalog/upgrade_sql/set_guc/add_guc +++ b/src/include/catalog/upgrade_sql/set_guc/add_guc @@ -0,0 +1 @@ +92763 stream_cluster_run_mode datanode cluster_standby diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9a0e0d3aa..26d889f5d 100755 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -44,15 +44,17 @@ #ifdef ENABLE_MULTIPLE_NODES #define AM_HADR_CN_WAL_RECEIVER (t_thrd.postmaster_cxt.HaShmData->is_cross_region && \ t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && IS_PGXC_COORDINATOR) + +#define IS_MULTI_DISASTER_RECOVER_MODE \ + (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && \ + g_instance.attr.attr_common.stream_cluster_run_mode == RUN_MODE_STANDBY) +#else +#define IS_MULTI_DISASTER_RECOVER_MODE false #endif #define AM_HADR_WAL_RECEIVER (t_thrd.postmaster_cxt.HaShmData->is_cross_region && \ t_thrd.postmaster_cxt.HaShmData->is_hadr_main_standby) -#define IS_DISASTER_RECOVER_MODE \ - (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && \ - g_instance.attr.attr_common.stream_cluster_run_mode == RUN_MODE_STANDBY) - #define IS_CN_DISASTER_RECOVER_MODE \ (IS_PGXC_COORDINATOR && t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE && \ g_instance.attr.attr_common.stream_cluster_run_mode == RUN_MODE_STANDBY) diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 9a40c0474..fc28d5c5b 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -641,6 +641,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c standby_shared_buffers_fraction | real | | 0.1 | 1 statement_timeout | integer | ms | 0 | 2147483647 stats_temp_directory | string | | | + stream_cluster_run_mode | enum | | | string_hash_compatible | bool | | | support_batch_bind | bool | | | support_extended_features | bool | | |