From f9367d72bbc7e1cc7f28bbd5bbc96c57b4033e8e Mon Sep 17 00:00:00 2001 From: cchen676 Date: Tue, 26 Jul 2022 14:30:37 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=903.0=E5=88=86=E6=94=AF=E5=9B=9E?= =?UTF-8?q?=E5=90=88master=E3=80=91=E6=94=AF=E6=8C=81=E5=A2=9E=E9=87=8Fbui?= =?UTF-8?q?ld=E6=97=B6=E6=A0=A1=E9=AA=8C=E4=B8=BB=E5=A4=87=E8=BE=BE?= =?UTF-8?q?=E6=88=90=E5=A4=9A=E6=95=B0=E6=B4=BE=E4=B8=80=E8=87=B4=E6=80=A7?= =?UTF-8?q?=E7=9A=84=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/gs_guc/cluster_guc.conf | 1 + src/bin/pg_rewind/parsexlog.cpp | 171 +++++++++++++++++- src/bin/pg_rewind/pg_rewind.cpp | 10 + src/bin/pg_rewind/pg_rewind.h | 4 + .../backend/utils/misc/guc/guc_storage.cpp | 13 ++ .../storage/access/transam/twophase.cpp | 5 + .../storage/access/transam/xact.cpp | 5 + src/gausskernel/storage/lmgr/proc.cpp | 1 + src/gausskernel/storage/replication/slot.cpp | 36 +++- .../storage/replication/syncrep.cpp | 55 ++++++ .../storage/replication/walreceiver.cpp | 36 ++++ .../storage/replication/walsender.cpp | 1 + .../knl/knl_guc/knl_instance_attr_storage.h | 3 + src/include/replication/syncrep.h | 3 + src/include/replication/walsender_private.h | 2 + src/include/storage/proc.h | 1 + .../regress/output/recovery_2pc_tools.source | 1 + 17 files changed, 345 insertions(+), 3 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.conf b/src/bin/gs_guc/cluster_guc.conf index 4b9c65a40..0e5443190 100755 --- a/src/bin/gs_guc/cluster_guc.conf +++ b/src/bin/gs_guc/cluster_guc.conf @@ -696,6 +696,7 @@ redo_bind_cpu_attr|string|0,0|NULL|NULL| logical_decode_options_default|string|0,0|NULL|NULL| logical_sender_timeout|int|0,2147483647|ms|NULL| var_eq_const_selectivity|bool|0,0|NULL|NULL| +enable_save_confirmed_lsn|bool|0,0|NULL|NULL| [cmserver] log_dir|string|0,0|NULL|NULL| log_file_size|int|0,2047|MB|NULL| diff --git a/src/bin/pg_rewind/parsexlog.cpp b/src/bin/pg_rewind/parsexlog.cpp index 1e7039db2..266cedea9 100644 --- a/src/bin/pg_rewind/parsexlog.cpp +++ b/src/bin/pg_rewind/parsexlog.cpp @@ -15,7 +15,7 @@ #include "postgres.h" #include "knl/knl_variable.h" -#include "filemap.h" +#include "file_ops.h" #include "logging.h" #include "pg_rewind.h" #include "pg_build.h" @@ -23,6 +23,7 @@ #include "access/htup.h" #include "access/xlog_internal.h" #include "storage/smgr/segment.h" +#include "replication/slot.h" #include "access/xlogreader.h" #include "catalog/pg_control.h" #include @@ -541,3 +542,171 @@ void recordReadTest(const char* datadir, XLogRecPtr ptr, TimeLineID tli) CloseXlogFile(); return; } + +static void ReadConfirmedLSNFromDisk(char* buffer, XLogRecPtr *confirmedLsn) +{ + ReplicationSlotOnDisk slot; + pg_crc32c checksum; + + int rc = memcpy_s(&slot, sizeof(ReplicationSlotOnDisk), buffer, sizeof(ReplicationSlotOnDisk)); + securec_check(rc, "", ""); + + INIT_CRC32C(checksum); + COMP_CRC32C(checksum, &slot.slotdata, sizeof(ReplicationSlotPersistentData)); + FIN_CRC32C(checksum); + + if (!EQ_CRC32C(checksum, slot.checksum) || + slot.magic != SLOT_MAGIC || + slot.length != ReplicationSlotOnDiskDynamicSize) { + pg_log(PG_ERROR, "Failed to check CRC/Magic/Lenth of replication slot %s.\n", slot.slotdata.name.data); + return; + } + + if (GET_SLOT_PERSISTENCY(slot.slotdata) != RS_PERSISTENT) { + return; + } + + if (XLogRecPtrIsValid(slot.slotdata.confirmed_flush) && XLByteLT(*confirmedLsn, slot.slotdata.confirmed_flush)) { + *confirmedLsn = slot.slotdata.confirmed_flush; + pg_log(PG_PROGRESS, "Get confirmed lsn %X/%X from replication slot %s.", (uint32)((*confirmedLsn) >> 32), + (uint32)(*confirmedLsn), slot.slotdata.name.data); + } +} + +bool FindConfirmedLSN(const char* dataDir, XLogRecPtr *confirmedLsn) +{ + DIR *replicationDir = NULL; + struct dirent *replicationDe = NULL; + pg_log(PG_PROGRESS, "Start to get the confirmed LSN of the replication slots.\n"); + char replicationDirPath[MAXPGPATH] = { 0 }; + int rc = EOK; + rc = sprintf_s(replicationDirPath, MAXPGPATH, "%s/pg_replslot", dataDir); + securec_check_ss_c(rc, "\0", "\0"); + replicationDir = opendir(replicationDirPath); + if (replicationDir == NULL) { + pg_log(PG_WARNING, "The DIR pg_replslot does'nt exist. Can't get confirmed LSN.\n"); + return false; + } + + while ((replicationDe = readdir(replicationDir)) != NULL) { + char path[MAXPGPATH] = { 0 }; + struct stat fst; + size_t size; + char* buffer = NULL; + + if (strcmp(replicationDe->d_name, ".") == 0 || strcmp(replicationDe->d_name, "..") == 0) + continue; + + rc = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "%s/%s/state", replicationDirPath, replicationDe->d_name); + securec_check_ss_c(rc, "\0", "\0"); + + if (lstat(path, &fst) == 0) { + buffer = slurpFile("", path, &size); + if (size != sizeof(ReplicationSlotOnDisk)) { + pg_log(PG_ERROR, "unexpected slot file size %d, expected %d\n", (int)size, + (int)sizeof(ReplicationSlotOnDisk)); + free(buffer); + continue; + } + ReadConfirmedLSNFromDisk(buffer, confirmedLsn); + free(buffer); + } + } + closedir(replicationDir); + + if (XLogRecPtrIsInvalid(*confirmedLsn)) { + return false; + } + + return true; +} + +BuildErrorCode CheckConfirmedLSNOnTarget(const char *datadir, TimeLineID tli, XLogRecPtr ckptRedo, XLogRecPtr confirmedLSN, + uint32 term) +{ + if (XLogRecPtrIsInvalid(confirmedLSN) || XLByteLT(confirmedLSN, ckptRedo)) { + return BUILD_SUCCESS; + } + + XLogPageReadPrivate privateReader; + XLogReaderState *xlogreader = NULL; + char pg_conf_file[MAXPGPATH]; + char *errormsg = NULL; + + privateReader.datadir = datadir; + privateReader.tli = tli; + xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &privateReader); + if (xlogreader == NULL) { + pg_log(PG_ERROR, "out of memory\n"); + return BUILD_FATAL; + } + + /* get conn info */ + int ret = snprintf_s(pg_conf_file, MAXPGPATH, MAXPGPATH - 1, "%s/postgresql.conf", datadir_target); + securec_check_ss_c(ret, "\0", "\0"); + get_conninfo(pg_conf_file); + + XLogRecord *record = NULL; + record = XLogReadRecord(xlogreader, confirmedLSN, &errormsg); + if (record == NULL) { + if (errormsg != NULL) { + pg_fatal("could not find previous WAL record at %X/%X: %s\n", (uint32)(confirmedLSN >> 32), + (uint32)confirmedLSN, errormsg); + } else { + pg_fatal("could not find previous WAL record at %X/%X\n", (uint32)(confirmedLSN >> 32), + (uint32)confirmedLSN); + } + XLogReaderFree(xlogreader); + CloseXlogFile(); + return BUILD_FATAL; + } + + if (checkCommonAncestorByXlog(xlogreader->ReadRecPtr, record->xl_crc, term) == false) { + pg_log(PG_FATAL, "could not find confirmed record %X/%X on source\n", (uint32)(confirmedLSN >> 32), + (uint32)confirmedLSN); + XLogReaderFree(xlogreader); + CloseXlogFile(); + return BUILD_FATAL; + } + + XLogReaderFree(xlogreader); + CloseXlogFile(); + pg_log(PG_PROGRESS, "Check confirmed lsn %X/%X at source success.", (uint32)((confirmedLSN) >> 32), + (uint32)(confirmedLSN)); + return BUILD_SUCCESS; +} + +bool CheckIfEanbedSaveSlots() +{ + const char* optname[] = {"enable_save_confirmed_lsn"}; + char config_file[MAXPGPATH] = {0}; + char** optlines = NULL; + int ret = EOK; + int lines_index = 0; + int optvalue_off; + int optvalue_len; + char optvalue[MAX_VALUE_LEN] = { 0 }; + + ret = snprintf_s(config_file, MAXPGPATH, MAXPGPATH - 1, "%s/postgresql.conf", pg_data); + securec_check_ss_c(ret, "\0", "\0"); + config_file[MAXPGPATH - 1] = '\0'; + optlines = readfile(config_file); + + if (optlines == NULL) { + pg_log(PG_WARNING, _("%s cannot be opened.\n"), config_file); + return false; + } + + lines_index = find_gucoption((const char **)optlines, (const char *)optname[0], NULL, NULL, &optvalue_off, &optvalue_len); + if (lines_index != INVALID_LINES_IDX) { + ret = strncpy_s(optvalue, MAX_VALUE_LEN, optlines[lines_index] + optvalue_off, + (size_t)Min(optvalue_len, MAX_VALUE_LEN - 1)); + securec_check_c(ret, "", ""); + if ((strcmp(optvalue, "1") == 0) || (strcmp(optvalue, "true") == 0) || (strcmp(optvalue, "on") == 0)) { + pg_log(PG_PROGRESS, _("Enable saving confirmed_lsn in target %s\n"), config_file); + return true; + } + } + + return false; +} diff --git a/src/bin/pg_rewind/pg_rewind.cpp b/src/bin/pg_rewind/pg_rewind.cpp index 9fca0c20e..aadf227b0 100755 --- a/src/bin/pg_rewind/pg_rewind.cpp +++ b/src/bin/pg_rewind/pg_rewind.cpp @@ -239,6 +239,16 @@ BuildErrorCode gs_increment_build(const char* pgdata, const char* connstr, char* PG_CHECKRETURN_AND_RETURN(rv); pg_log(PG_PROGRESS, "find diverge point success\n"); + /* Read pg_replslot and get the largest confirmed LSN across all the synced replslots */ + if(CheckIfEanbedSaveSlots()) { + XLogRecPtr confirmedLsn = InvalidXLogRecPtr; + if(FindConfirmedLSN(datadir_target, &confirmedLsn) && + CheckConfirmedLSNOnTarget(datadir_target, lastcommontli, chkptredo, confirmedLsn, term) == BUILD_FATAL) { + pg_log(PG_PROGRESS, "Can't find quorum confirmed LSN at source, build will exit!\n"); + exit(1); + } + } + /* Checkpoint redo should exist. Otherwise, fatal and change to full build. */ (void)readOneRecord(datadir_target, chkptredo, chkpttli); pg_log(PG_PROGRESS, diff --git a/src/bin/pg_rewind/pg_rewind.h b/src/bin/pg_rewind/pg_rewind.h index 133a9790a..546540832 100644 --- a/src/bin/pg_rewind/pg_rewind.h +++ b/src/bin/pg_rewind/pg_rewind.h @@ -66,6 +66,10 @@ extern BuildErrorCode waitEndTargetFileStatThread(void); extern BuildErrorCode targetFilemapProcess(void); void recordReadTest(const char* datadir, XLogRecPtr ptr, TimeLineID tli); void openDebugLog(void); +bool FindConfirmedLSN(const char* dataDir, XLogRecPtr *confirmedLsn); +BuildErrorCode CheckConfirmedLSNOnTarget(const char *datadir, TimeLineID tli, XLogRecPtr ckptRedo, XLogRecPtr confirmedLSN, + uint32 term); +bool CheckIfEanbedSaveSlots(); #define PG_CHECKBUILD_AND_RETURN() \ do { \ diff --git a/src/common/backend/utils/misc/guc/guc_storage.cpp b/src/common/backend/utils/misc/guc/guc_storage.cpp index 175eb143c..9c7c5e936 100755 --- a/src/common/backend/utils/misc/guc/guc_storage.cpp +++ b/src/common/backend/utils/misc/guc/guc_storage.cpp @@ -1001,6 +1001,19 @@ static void InitStorageConfigureNamesBool() NULL, NULL, NULL}, +#endif +#ifndef ENABLE_MULTIPLE_NODES + {{"enable_save_confirmed_lsn", + PGC_POSTMASTER, + NODE_SINGLENODE, + UNGROUPED, + gettext_noop("Enable save confirmed lsn at xact commit."), + NULL}, + &g_instance.attr.attr_storage.enable_save_confirmed_lsn, + false, + NULL, + NULL, + NULL}, #endif /* End-of-list marker */ {{NULL, diff --git a/src/gausskernel/storage/access/transam/twophase.cpp b/src/gausskernel/storage/access/transam/twophase.cpp index 9ca3e10cc..63ca29730 100644 --- a/src/gausskernel/storage/access/transam/twophase.cpp +++ b/src/gausskernel/storage/access/transam/twophase.cpp @@ -2010,6 +2010,11 @@ void EndPrepare(GlobalTransaction gxact) if (g_instance.attr.attr_storage.dcf_attr.enable_dcf) { SyncPaxosWaitForLSN(gxact->prepare_end_lsn); } else { +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn) { + t_thrd.proc->syncSetConfirmedLSN = t_thrd.xlog_cxt.ProcLastRecPtr; + } +#endif SyncWaitRet stopWatiRes = SyncRepWaitForLSN(gxact->prepare_end_lsn, false); #ifdef ENABLE_MULTIPLE_NODES /* In distribute prepare phase, repsync failed participant rasie error to coordinator */ diff --git a/src/gausskernel/storage/access/transam/xact.cpp b/src/gausskernel/storage/access/transam/xact.cpp index d2b0a00a2..9928ca956 100755 --- a/src/gausskernel/storage/access/transam/xact.cpp +++ b/src/gausskernel/storage/access/transam/xact.cpp @@ -1851,6 +1851,11 @@ static TransactionId RecordTransactionCommit(void) * clog and still show as running in the procarray and continue to hold locks. */ if (wrote_xlog && u_sess->attr.attr_storage.guc_synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH) { +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn) { + t_thrd.proc->syncSetConfirmedLSN = t_thrd.xlog_cxt.ProcLastRecPtr; + } +#endif SyncRepWaitForLSN(t_thrd.xlog_cxt.XactLastRecEnd, !markXidCommitted); g_instance.comm_cxt.localinfo_cxt.set_term = true; } diff --git a/src/gausskernel/storage/lmgr/proc.cpp b/src/gausskernel/storage/lmgr/proc.cpp index 7acd4ea55..03ef6bf9b 100755 --- a/src/gausskernel/storage/lmgr/proc.cpp +++ b/src/gausskernel/storage/lmgr/proc.cpp @@ -903,6 +903,7 @@ void InitProcess(void) t_thrd.proc->waitLSN = 0; t_thrd.proc->syncRepState = SYNC_REP_NOT_WAITING; t_thrd.proc->syncRepInCompleteQueue = false; + t_thrd.proc->syncSetConfirmedLSN = 0; SHMQueueElemInit(&(t_thrd.proc->syncRepLinks)); /* Initialize fields for data sync rep */ diff --git a/src/gausskernel/storage/replication/slot.cpp b/src/gausskernel/storage/replication/slot.cpp index 2e7a76d33..ced6b615c 100755 --- a/src/gausskernel/storage/replication/slot.cpp +++ b/src/gausskernel/storage/replication/slot.cpp @@ -52,7 +52,7 @@ #include "storage/procarray.h" #include "postmaster/postmaster.h" #include "utils/builtins.h" - +#include "replication/walsender_private.h" extern bool PMstateIsRun(void); @@ -318,6 +318,7 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste ReplicationSlot *slot = NULL; int i; errno_t rc = 0; + int slot_idx = -1; Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL); @@ -363,8 +364,10 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste } return; } - if (!s->in_use && slot == NULL) + if (!s->in_use && slot == NULL) { slot = s; + slot_idx = i; + } } /* If all slots are in use, we're out of luck. */ if (slot == NULL) { @@ -444,6 +447,9 @@ void ReplicationSlotCreate(const char *name, ReplicationSlotPersistency persiste vslot->active = true; SpinLockRelease(&slot->mutex); t_thrd.slot_cxt.MyReplicationSlot = slot; + if (t_thrd.walsender_cxt.MyWalSnd != NULL && t_thrd.walsender_cxt.MyWalSnd->pid != 0) { + t_thrd.walsender_cxt.MyWalSnd->slot_idx = slot_idx; + } } LWLockRelease(ReplicationSlotControlLock); @@ -464,6 +470,7 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDro ReplicationSlot *slot = NULL; int i; bool active = false; + int slot_idx = -1; Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL); @@ -482,6 +489,7 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDro vslot->active = true; SpinLockRelease(&s->mutex); slot = s; + slot_idx = i; break; } } @@ -512,6 +520,9 @@ void ReplicationSlotAcquire(const char *name, bool isDummyStandby, bool allowDro /* We made this slot active, so it's ours now. */ t_thrd.slot_cxt.MyReplicationSlot = slot; + if (t_thrd.walsender_cxt.MyWalSnd != NULL && t_thrd.walsender_cxt.MyWalSnd->pid != 0) { + t_thrd.walsender_cxt.MyWalSnd->slot_idx = slot_idx; + } } /* @@ -758,6 +769,11 @@ static void ReplicationSlotDropAcquired(void) ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(NULL); + /* reset the slot_idx to invalid */ + if (t_thrd.walsender_cxt.MyWalSnd != NULL && t_thrd.walsender_cxt.MyWalSnd->slot_idx != -1) { + t_thrd.walsender_cxt.MyWalSnd->slot_idx = -1; + } + /* * If removing the directory fails, the worst thing that will happen is * that the user won't be able to create a new slot with the same name @@ -949,6 +965,7 @@ void ReplicationSlotsComputeRequiredLSN(ReplicationSlotState *repl_slt_state) XLogRecPtr max_required = InvalidXLogRecPtr; XLogRecPtr standby_slots_list[g_instance.attr.attr_storage.max_replication_slots]; XLogRecPtr min_archive_restart_lsn = InvalidXLogRecPtr; + XLogRecPtr max_confirmed_lsn = InvalidXLogRecPtr; bool in_use = false; errno_t rc = EOK; @@ -987,6 +1004,14 @@ void ReplicationSlotsComputeRequiredLSN(ReplicationSlotState *repl_slt_state) SpinLockRelease(&s->mutex); CalculateMinAndMaxRequiredPtr(s, standby_slots_list, i, restart_lsn, &min_tools_required, &min_required, &min_archive_restart_lsn, &max_required); +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn && + GET_SLOT_PERSISTENCY(s->data) == RS_PERSISTENT && + XLogRecPtrIsValid(s->data.confirmed_flush) && + XLByteLT(max_confirmed_lsn, s->data.confirmed_flush)) { + max_confirmed_lsn = s->data.confirmed_flush; + } +#endif continue; lock_release: SpinLockRelease(&s->mutex); @@ -1015,6 +1040,13 @@ void ReplicationSlotsComputeRequiredLSN(ReplicationSlotState *repl_slt_state) } } } +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn && + XLogRecPtrIsValid(max_confirmed_lsn) && + XLByteLT(max_confirmed_lsn, repl_slt_state->quorum_min_required)) { + repl_slt_state->quorum_min_required = max_confirmed_lsn; + } +#endif repl_slt_state->min_tools_required = min_tools_required; repl_slt_state->min_archive_slot_required = min_archive_restart_lsn; repl_slt_state->exist_in_use = in_use; diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 26bb6ab23..8df4f0908 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -332,6 +332,11 @@ SyncWaitRet SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) * the commit is cleaned up. */ if (t_thrd.int_cxt.ProcDiePending || t_thrd.proc_cxt.proc_exit_inprogress) { +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn) { + t_thrd.postgres_cxt.whereToSendOutput = DestNone; + } +#endif ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for synchronous replication and terminating connection due to " @@ -698,6 +703,34 @@ void SyncRepReleaseWaiters(void) numapply, (uint32)(replayPtr >> 32), (uint32)replayPtr))); } +#ifndef ENABLE_MULTIPLE_NODES +void SetXactLastCommitToSyncedStandby(XLogRecPtr recptr) +{ + int slot_idx; + bool modified = false; + volatile WalSnd* walsnd = t_thrd.walsender_cxt.MyWalSnd; + slot_idx = walsnd->slot_idx; + + if (slot_idx < 0) { + ereport(PANIC, (errmsg("The replication slot index is invalid!"))); + } + + ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[slot_idx]; + SpinLockAcquire(&s->mutex); + if (XLByteLT(s->data.confirmed_flush, recptr)) { + s->data.confirmed_flush = recptr; + s->just_dirtied = true; + s->dirty = true; + modified = true; + } + SpinLockRelease(&s->mutex); + + if (modified) { + ReplicationSlotSave(); + } +} +#endif + static SyncStandbyNumState check_sync_standbys_num(const List* sync_standbys) { if (t_thrd.syncrep_cxt.SyncRepConfig == NULL) @@ -1104,6 +1137,7 @@ int SyncRepWakeQueue(bool all, int mode) PGPROC *proc = NULL; PGPROC *thisproc = NULL; int numprocs = 0; + XLogRecPtr confirmedLSN = InvalidXLogRecPtr; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); Assert(SyncRepQueueIsOrderedByLSN(mode)); @@ -1127,6 +1161,21 @@ int SyncRepWakeQueue(bool all, int mode) */ thisproc = proc; thisproc->syncRepInCompleteQueue = true; +#ifndef ENABLE_MULTIPLE_NODES + /* + * Set confirmed LSN at primary node during sync wait for LSN. + * Confirmed LSN is the start LSN of last xact of proc which all qurom stanby nodes had met with primary. + * With saving the confirmed LSN at primary node during sync wait process and validating it during build + * process, it can avoid the primary node lost data if it will be built as new standby while an async + * standby node is running as new primary. + */ + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn && + XLogRecPtrIsValid(thisproc->syncSetConfirmedLSN)) { + confirmedLSN = + XLByteLT(confirmedLSN, thisproc->syncSetConfirmedLSN) ? thisproc->syncSetConfirmedLSN : confirmedLSN; + thisproc->syncSetConfirmedLSN = InvalidXLogRecPtr; + } +#endif proc = (PGPROC *)SHMQueueNext(&(t_thrd.walsender_cxt.WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); @@ -1135,6 +1184,12 @@ int SyncRepWakeQueue(bool all, int mode) numprocs++; } +#ifndef ENABLE_MULTIPLE_NODES + if (g_instance.attr.attr_storage.enable_save_confirmed_lsn && XLogRecPtrIsValid(confirmedLSN)) { + SetXactLastCommitToSyncedStandby(confirmedLSN); + } +#endif + /* Delete the finished segment from the list, and only notifies leader proc */ if (pTail != pHead) { PGPROC* leaderProc = (PGPROC *) (((char *) pHead->next) - offsetof(PGPROC, syncRepLinks)); diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index e83c97084..33a167857 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -203,6 +203,9 @@ static void ProcessArchiveXlogMessage(const ArchiveXlogMessage* archive_xlog_mes static void WalRecvSendArchiveXlogResponse(ArchiveTaskStatus *archive_status); static void ProcessHadrSwitchoverRequest(HadrSwitchoverMessage *hadrSwitchoverMessage); static void WalRecvHadrSwitchoverResponse(); +#ifndef ENABLE_MULTIPLE_NODES +static void ResetConfirmedLSNOnDisk(); +#endif #ifdef ENABLE_MULTIPLE_NODES static void WalRecvHadrSendReply(); #endif @@ -671,6 +674,9 @@ void WalReceiverMain(void) if(walrcv->conn_target != REPCONNTARGET_OBS && !g_instance.attr.attr_storage.dcf_attr.enable_dcf) { firstSynchStandbyFile(); set_disable_conn_mode(); +#ifndef ENABLE_MULTIPLE_NODES + ResetConfirmedLSNOnDisk(); +#endif } knl_g_set_redo_finish_status(REDO_FINISH_STATUS_LOCAL); @@ -2799,3 +2805,33 @@ static void WalRecvHadrSendReply() } #endif +#ifndef ENABLE_MULTIPLE_NODES +static void ResetConfirmedLSNOnDisk() +{ + if (!g_instance.attr.attr_storage.enable_save_confirmed_lsn || + !IsServerModeStandby()) { + return; + } + + int i; + bool modified = false; + + ereport(DEBUG1, (errmsg("Reset the confirmed LSN info of replication slot."))); + for (i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) { + ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i]; + if (!s->in_use || GET_SLOT_PERSISTENCY(s->data) != RS_PERSISTENT || XLogRecPtrIsInvalid(s->data.confirmed_flush)) + continue; + + SpinLockAcquire(&s->mutex); + s->data.confirmed_flush = InvalidXLogRecPtr; + s->just_dirtied = true; + s->dirty = true; + modified = true; + SpinLockRelease(&s->mutex); + } + + if (modified) { + CheckPointReplicationSlots(); + } +} +#endif diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 4c937c17f..b6f159ac5 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -4496,6 +4496,7 @@ static void InitWalSnd(void) walsnd->lastCalTime = 0; walsnd->lastCalWrite = InvalidXLogRecPtr; walsnd->catchupRate = 0; + walsnd->slot_idx = -1; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ OwnLatch((Latch *)&walsnd->latch); diff --git a/src/include/knl/knl_guc/knl_instance_attr_storage.h b/src/include/knl/knl_guc/knl_instance_attr_storage.h index 710ea978a..9b61e27f5 100755 --- a/src/include/knl/knl_guc/knl_instance_attr_storage.h +++ b/src/include/knl/knl_guc/knl_instance_attr_storage.h @@ -166,6 +166,9 @@ typedef struct knl_instance_attr_storage { int max_logical_replication_workers; char *redo_bind_cpu_attr; int max_active_gtt; +#ifndef ENABLE_MULTIPLE_NODES + bool enable_save_confirmed_lsn; +#endif } knl_instance_attr_storage; #endif /* SRC_INCLUDE_KNL_KNL_INSTANCE_ATTR_STORAGE_H_ */ diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index e08b2994c..13bc75455 100755 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -118,6 +118,9 @@ typedef union syncrep_scanner_YYSTYPE { extern int syncrep_scanner_yylex(syncrep_scanner_YYSTYPE* lvalp, YYLTYPE* llocp, syncrep_scanner_yyscan_t yyscanner); extern void syncrep_scanner_yyerror(const char* message, syncrep_scanner_yyscan_t yyscanner); extern bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync, bool check_am_sync = true); +#ifndef ENABLE_MULTIPLE_NODES +extern void SetXactLastCommitToSyncedStandby(XLogRecPtr recptr); +#endif #ifndef ENABLE_MULTIPLE_NODES /* diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 978e28ae2..6e50dfc05 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -154,6 +154,8 @@ typedef struct WalSnd { SDRSInteractiveState interactiveState; bool isMasterInstanceReady; TimestampTz lastRequestTimestamp; + /* The idx of replication slot */ + int slot_idx; } WalSnd; extern THR_LOCAL WalSnd* MyWalSnd; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 55ddfab90..4dbfdf708 100755 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -185,6 +185,7 @@ struct PGPROC { int syncRepState; /* wait state for sync rep */ bool syncRepInCompleteQueue; /* waiting in complete queue */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + XLogRecPtr syncSetConfirmedLSN; /* set confirmed LSN for SyncRepWaitForLSN */ XLogRecPtr waitPaxosLSN; /* waiting for this LSN or higher on paxos callback */ int syncPaxosState; /* wait state for sync paxos, reuse syncRepState defines */ diff --git a/src/test/regress/output/recovery_2pc_tools.source b/src/test/regress/output/recovery_2pc_tools.source index 6cb6f76d8..ed2f46dfb 100644 --- a/src/test/regress/output/recovery_2pc_tools.source +++ b/src/test/regress/output/recovery_2pc_tools.source @@ -291,6 +291,7 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c enable_recyclebin | bool | | | enable_resource_record | bool | | | enable_resource_track | bool | | | + enable_save_confirmed_lsn | bool | | | enable_save_datachanged_timestamp | bool | | | enable_security_policy | bool | | | enable_segment | bool | | |