|
|
|
|
@ -101,6 +101,7 @@ static bool SyncPaxosQueueIsOrderedByLSN(void);
|
|
|
|
|
static List *SyncRepGetSyncStandbysPriority(bool *am_sync, List** catchup_standbys = NULL);
|
|
|
|
|
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync, List** catchup_standbys = NULL);
|
|
|
|
|
static int cmp_lsn(const void *a, const void *b);
|
|
|
|
|
static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum = 0);
|
|
|
|
|
|
|
|
|
|
/*
 * Evaluates to false when ptr1 is invalid; otherwise evaluates to whether
 * XLByteDifference(ptr2, ptr1) is below amount.
 *
 * The arguments and the whole expansion are parenthesized so the macro can be
 * combined with other operators (e.g. negated) or given a compound expression
 * as amount. Note that ptr1 is evaluated twice.
 */
#define CATCHUP_XLOG_DIFF(ptr1, ptr2, amount) \
    (XLogRecPtrIsInvalid(ptr1) ? false : (XLByteDifference((ptr2), (ptr1)) < (amount)))
|
|
|
|
|
@ -155,9 +156,6 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
|
|
|
|
|
char *new_status = NULL;
|
|
|
|
|
const char *old_status = NULL;
|
|
|
|
|
int mode = u_sess->attr.attr_storage.sync_rep_wait_mode;
|
|
|
|
|
TimestampTz now_time;
|
|
|
|
|
TimestampTz now_time_dynamical;
|
|
|
|
|
bool sync_master_standalone_window = false;
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Fast exit if user has not requested sync replication, or there are no
|
|
|
|
|
@ -185,21 +183,10 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
|
|
|
|
|
* condition but we'll be fetching that cache line anyway so its likely to
|
|
|
|
|
* be a low cost check. We don't wait for sync rep if no sync standbys alive
|
|
|
|
|
*/
|
|
|
|
|
now_time = GetCurrentTimestamp();
|
|
|
|
|
long diff_sec;
|
|
|
|
|
int diff_microsec;
|
|
|
|
|
|
|
|
|
|
if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone) {
|
|
|
|
|
TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time, now_time, &diff_sec,
|
|
|
|
|
&diff_microsec);
|
|
|
|
|
if (diff_sec >= (long)u_sess->attr.attr_storage.keep_sync_window) {
|
|
|
|
|
sync_master_standalone_window = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!t_thrd.walsender_cxt.WalSndCtl->sync_standbys_defined ||
|
|
|
|
|
XLByteLE(XactCommitLSN, t_thrd.walsender_cxt.WalSndCtl->lsn[mode]) ||
|
|
|
|
|
(sync_master_standalone_window && !IS_SHARED_STORAGE_MODE) ||
|
|
|
|
|
(t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
|
|
|
|
|
!DelayIntoMostAvaSync(false)) ||
|
|
|
|
|
!SynRepWaitCatchup(XactCommitLSN)) {
|
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
|
RESUME_INTERRUPTS();
|
|
|
|
|
@ -263,8 +250,9 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
|
|
|
|
|
* walsender changes the state to SYNC_REP_WAIT_COMPLETE, it will never
|
|
|
|
|
* update it again, so we can't be seeing a stale value in that case.
|
|
|
|
|
*/
|
|
|
|
|
if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE)
|
|
|
|
|
if (t_thrd.proc->syncRepState == SYNC_REP_WAIT_COMPLETE && !DelayIntoMostAvaSync(true)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If a wait for synchronous replication is pending, we can neither
|
|
|
|
|
@ -327,18 +315,9 @@ void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN, bool enableHandleCancel)
|
|
|
|
|
/*
|
|
|
|
|
* If the sync mode is modified dynamically, we stop waiting.
|
|
|
|
|
*/
|
|
|
|
|
now_time_dynamical = GetCurrentTimestamp();
|
|
|
|
|
long diff_sec_dynamical;
|
|
|
|
|
int diff_microsec_dynamical;
|
|
|
|
|
if (t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone) {
|
|
|
|
|
TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time, now_time_dynamical,
|
|
|
|
|
&diff_sec_dynamical, &diff_microsec_dynamical);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((sync_master_standalone_window && !IS_SHARED_STORAGE_MODE) ||
|
|
|
|
|
u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH ||
|
|
|
|
|
(t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone &&
|
|
|
|
|
diff_sec_dynamical >= (long)u_sess->attr.attr_storage.keep_sync_window)) {
|
|
|
|
|
if ((t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone && !IS_SHARED_STORAGE_MODE &&
|
|
|
|
|
!DelayIntoMostAvaSync(false)) ||
|
|
|
|
|
u_sess->attr.attr_storage.guc_synchronous_commit <= SYNCHRONOUS_COMMIT_LOCAL_FLUSH) {
|
|
|
|
|
ereport(WARNING,
|
|
|
|
|
(errmsg("canceling wait for synchronous replication due to syncmaster standalone."),
|
|
|
|
|
errdetail("The transaction has already committed locally, but might not have been replicated to "
|
|
|
|
|
@ -627,6 +606,55 @@ void SyncRepReleaseWaiters(void)
|
|
|
|
|
numapply, (uint32)(replayPtr >> 32), (uint32)replayPtr)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Check whether to delay the time of entering most_available_sync mode.
|
|
|
|
|
*
|
|
|
|
|
* Return false if keep_sync_window is not set, or most_available_sync
|
|
|
|
|
* is not set, or current time is not in the time range since the time
|
|
|
|
|
* when the number of sync standby is not qualified.
|
|
|
|
|
* Otherwise it's set to true.
|
|
|
|
|
*/
|
|
|
|
|
static bool DelayIntoMostAvaSync(bool checkSyncNum, int curSyncNum)
|
|
|
|
|
{
|
|
|
|
|
bool result = false;
|
|
|
|
|
|
|
|
|
|
if (!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || !u_sess->attr.attr_storage.keep_sync_window) {
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (checkSyncNum) {
|
|
|
|
|
List *sync_standbys = NIL;
|
|
|
|
|
bool am_sync = false;
|
|
|
|
|
sync_standbys = SyncRepGetSyncStandbys(&am_sync);
|
|
|
|
|
curSyncNum = list_length(sync_standbys);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (curSyncNum == t_thrd.syncrep_cxt.SyncRepConfig->num_sync) {
|
|
|
|
|
if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start != 0) {
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = 0;
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = false;
|
|
|
|
|
}
|
|
|
|
|
} else if (t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start == 0) {
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start = GetCurrentTimestamp();
|
|
|
|
|
result = true;
|
|
|
|
|
} else if (!t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window) {
|
|
|
|
|
TimestampTz now_time = 0;
|
|
|
|
|
long diff_sec = 0;
|
|
|
|
|
int diff_microsec = 0;
|
|
|
|
|
now_time = GetCurrentTimestamp();
|
|
|
|
|
TimestampDifference(t_thrd.walsender_cxt.WalSndCtl->keep_sync_window_start, now_time, &diff_sec,
|
|
|
|
|
&diff_microsec);
|
|
|
|
|
if (diff_sec < (long)u_sess->attr.attr_storage.keep_sync_window) {
|
|
|
|
|
result = true;
|
|
|
|
|
ereport(DEBUG1, (errmsg("Delay to entering most_available_sync mode, %ld seconds passed.", diff_sec)));
|
|
|
|
|
} else {
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->out_keep_sync_window = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Calculate the synced Receive, Write, Flush and Apply positions among sync standbys.
|
|
|
|
|
*
|
|
|
|
|
@ -655,9 +683,10 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP
|
|
|
|
|
* but in a particular scenario, when most_available_sync is true, the primary only waits for the alive sync standbys
|
|
|
|
|
* if list_length(sync_standbys) doesn't satisfy t_thrd.syncrep_cxt.SyncRepConfig->num_sync.
|
|
|
|
|
*/
|
|
|
|
|
int cur_num_sync_standbys = list_length(sync_standbys);
|
|
|
|
|
if ((!(*am_sync) && check_am_sync) || t_thrd.syncrep_cxt.SyncRepConfig == NULL ||
|
|
|
|
|
(!t_thrd.walsender_cxt.WalSndCtl->most_available_sync &&
|
|
|
|
|
list_length(sync_standbys) < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) {
|
|
|
|
|
((!t_thrd.walsender_cxt.WalSndCtl->most_available_sync || DelayIntoMostAvaSync(false, cur_num_sync_standbys)) &&
|
|
|
|
|
cur_num_sync_standbys < t_thrd.syncrep_cxt.SyncRepConfig->num_sync)) {
|
|
|
|
|
list_free(sync_standbys);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
@ -1404,8 +1433,6 @@ void SyncRepCheckSyncStandbyAlive(void)
|
|
|
|
|
ereport(LOG, (errmsg("synchronous master is now standalone")));
|
|
|
|
|
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->sync_master_standalone = true;
|
|
|
|
|
t_thrd.walsender_cxt.WalSndCtl->last_sync_master_standalone_time = GetCurrentTimestamp();
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If there is any waiting sender, then wake-up them as
|
|
|
|
|
* master has switched to standalone mode
|
|
|
|
|
|