From 40fb97943f25dc064b35aba3f8fb874f83885e4d Mon Sep 17 00:00:00 2001 From: "arcoalien@qq.com" Date: Sat, 25 Dec 2021 18:49:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=BB=B6=E8=BF=9F=E8=BF=9B=E5=85=A5=E6=9C=80?= =?UTF-8?q?=E5=A4=A7=E5=8F=AF=E7=94=A8=E6=A8=A1=E5=BC=8F=E9=9C=80=E6=B1=82?= =?UTF-8?q?=E7=9A=84=E4=BB=A3=E7=A0=81=E9=80=BB=E8=BE=91=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E7=A4=BE=E5=8C=BA=E8=AF=A5=E9=9C=80=E6=B1=82=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bin/gs_guc/cluster_guc.cpp | 4 + .../storage/replication/syncrep.cpp | 89 ++++++++++++------- .../storage/replication/walsender.cpp | 3 +- src/include/replication/walsender_private.h | 3 +- 4 files changed, 66 insertions(+), 33 deletions(-) diff --git a/src/bin/gs_guc/cluster_guc.cpp b/src/bin/gs_guc/cluster_guc.cpp index 69ba04947..a533effd9 100644 --- a/src/bin/gs_guc/cluster_guc.cpp +++ b/src/bin/gs_guc/cluster_guc.cpp @@ -2659,6 +2659,10 @@ static void readPopenOutputParallel(const char* cmd, bool if_for_all_instance) } curr_cxt->readbuf[0] = '\0'; curr_cxt->cur_buf_loc = 0; + endsp = strstr(result, "WARNING"); + if (NULL != endsp) { + (void)write_stderr("%s", result); + } endsp = strstr(result, "Success to perform gs_guc"); if (NULL != endsp) { successNumber++; diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index de4485198..1a1aba2c5 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -101,6 +101,7 @@ static bool SyncPaxosQueueIsOrderedByLSN(void); static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys = NULL); static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys = NULL); static int cmp_lsn(const void *a, const void *b); +static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum = 0); #define CATCHUP_XLOG_DIFF(ptr1, ptr2, amount) \ XLogRecPtrIsInvalid(ptr1) ? false : (XLByteDifference(ptr2, ptr1) < amount) @@ -155,9 +156,6 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) char *new_status = NULL; const char *old_status = NULL; int mode = u_sess->attr.attr_storage.sync_rep_wait_mode; - TimestampTz now_time; - TimestampTz now_time_dynamical; - bool sync_master_standalone_window = false; /* * Fast exit if user has not requested sync replication, or there are no @@ -185,21 +183,9 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) * condition but we'll be fetching that cache line anyway so its likely to * be a low cost check. We don't wait for sync rep if no sync standbys alive */ - now_time = GetCurrentTimestamp(); - long diff_sec; - int diff_microsec; - - if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone) { - TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time, now_time, &diff_sec, - &diff_microsec); - if(diff_sec >= (long)u_sess->attr.attr_storage.keep_sync_window) { - sync_master_standalone_window = true; - } - } - if (!t_thrd.walsender_cxt.WalSndCtl->sync_standbys_defined || XLByteLE(XactCommitLSN, t_thrd.walsender_cxt.WalSndCtl->lsn[mode]) || - sync_master_standalone_window || + (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !DelayIntoMostAvaSync(false)) || !SynRepWaitCatchup(XactCommitLSN)) { LWLockRelease(SyncRepLock); RESUME_INTERRUPTS(); @@ -263,8 +249,9 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) * walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will never * update it again, so we can't be seeing a stale value in that case. */ - if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE) + if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE && !DelayIntoMostAvaSync(true)) { break; + } /* * If a wait for synchronous replication is pending, we can neither @@ -327,17 +314,8 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) /* * If we modify the syncmode dynamically, we'll stop wait */ - now_time_dynamical = GetCurrentTimestamp(); - long diff_sec_dynamical; - int diff_microsec_dynamical; - if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone) { - TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time, now_time_dynamical, - &diff_sec_dynamical, &diff_microsec_dynamical); - } - if (sync_master_standalone_window || - synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH || - (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && - diff_sec_dynamical >= (long)u_sess->attr.attr_storage.keep_sync_window)) { + if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !DelayIntoMostAvaSync(false)) || + synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) { ereport(WARNING, (errmsg("canceling wait for synchronous replication due to syncmaster standalone."), errdetail("The transaction has already committed locally, but might not have been replicated to " @@ -626,6 +604,55 @@ void SyncRepReleaseWaiters(void) numapply, (uint32)(replayPtr >> 32), (uint32)replayPtr))); } +/* + * Check whether to delay the time of entering most_available_sync mode. + * + * Return false if keep_sync_window is not set, or most_available_sync + * is not set, or current time is not in the time range since the time + * when the number of sync standby is not qualified. + * Otherwise it's set to true. + */ +static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum) +{ + bool result = false; + + if (!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || !u_sess->attr.attr_storage.keep_sync_window) { + return result; + } + + if (checkSyncNum) { + List *sync_standbys = NIL; + bool am_sync = false; + sync_standbys = SyncRepGetSyncStandbys(&am_sync); + curSyncNum = list_length(sync_standbys); + } + + if (curSyncNum == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) { + if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start != 0) { + t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = 0; + t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = false; + } + } else if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start == 0) { + t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = GetCurrentTimestamp(); + result = true; + } else if (!t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window) { + TimestampTz now_time = 0; + long diff_sec = 0; + int diff_microsec = 0; + now_time = GetCurrentTimestamp(); + TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start, now_time, &diff_sec, + &diff_microsec); + if (diff_sec < (long)u_sess->attr.attr_storage.keep_sync_window) { + result = true; + ereport(DEBUG1, (errmsg("Delay to entering most_available_sync mode, %ld seconds passed.", diff_sec))); + } else { + t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = true; + } + } + + return result; +} + /* * Calculate the synced Receive, Write, Flush and Apply positions among sync standbys. * @@ -654,9 +681,10 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP * but in a particular scenario, when most_available_sync is true, primary only wait the alive sync standbys * if list_length(sync_standbys) doesn't satisfy t_thrd.syncrep_cxt.SyncRepConfig->num_sync. */ + int cur_num_sync_standbys = list_length(sync_standbys); if ((!(*am_sync) && check_am_sync) || t_thrd.syncrep_cxt.SyncRepConfig == NULL || - (!t_thrd.walsender_cxt.WalSndCtl->most_available_sync && - list_length(sync_standbys) < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) { + ((!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || DelayIntoMostAvaSync(false, cur_num_sync_standbys)) && + cur_num_sync_standbys < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) { list_free(sync_standbys); return false; } @@ -1392,7 +1420,6 @@ void SyncRepCheckSyncStandbyAlive(void) ereport(LOG, (errmsg("synchronous master is now standalone"))); t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone = true; - t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time = GetCurrentTimestamp(); /* * If there is any waiting sender, then wake-up them as * master has switched to standalone mode diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 5355b3bc9..eda28cadb 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -4349,7 +4349,8 @@ void WalSndShmemInit(void) } t_thrd.walsender_cxt.WalSndCtl->most_available_sync = false; t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone = false; - t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time = 0; + t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = 0; + t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = false; t_thrd.walsender_cxt.WalSndCtl->demotion = NoDemote; SpinLockInit(&t_thrd.walsender_cxt.WalSndCtl->mutex); } diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7d1ba89c5..2ab9a8863 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -181,7 +181,8 @@ typedef struct WalSndCtlData { * mode. */ bool sync_master_standalone; - TimestampTz last_sync_master_standalone_time; + TimestampTz keep_sync_window_start; + bool out_keep_sync_window; /* * The demotion of postmaster Also indicates that all the walsenders From e0f8f6a966811f07b8cc42d922d889351ba53b50 Mon Sep 17 00:00:00 2001 From: "arcoalien@qq.com" Date: Wed, 5 Jan 2022 16:00:27 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=B8=BB=E7=BA=BF?= =?UTF-8?q?=E5=9B=9E=E5=90=88=E5=AF=BC=E8=87=B4=E7=9A=84=E5=B7=AE=E5=BC=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gausskernel/storage/replication/syncrep.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/gausskernel/storage/replication/syncrep.cpp b/src/gausskernel/storage/replication/syncrep.cpp index 4bbcd8b77..147ee4498 100755 --- a/src/gausskernel/storage/replication/syncrep.cpp +++ b/src/gausskernel/storage/replication/syncrep.cpp @@ -185,7 +185,8 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) */ if (!t_thrd.walsender_cxt.WalSndCtl->sync_standbys_defined || XLByteLE(XactCommitLSN, t_thrd.walsender_cxt.WalSndCtl->lsn[mode]) || - (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !DelayIntoMostAvaSync(false)) || + (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE && + !DelayIntoMostAvaSync(false)) || !SynRepWaitCatchup(XactCommitLSN)) { LWLockRelease(SyncRepLock); RESUME_INTERRUPTS(); @@ -314,8 +315,9 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel) /* * If we modify the syncmode dynamically, we'll stop wait */ - if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !DelayIntoMostAvaSync(false)) || - synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) { + if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE && + !DelayIntoMostAvaSync(false)) || + u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) { ereport(WARNING, (errmsg("canceling wait for synchronous replication due to syncmaster standalone."), errdetail("The transaction has already committed locally, but might not have been replicated to "