主备高可用能力增强

This commit is contained in:
chenxiaobin19
2023-06-07 10:23:48 +08:00
parent 338028e0f8
commit 53725381cb
11 changed files with 255 additions and 15 deletions

View File

@ -736,6 +736,8 @@ dolphin_server_port|int|1024,65535|NULL|NULL|
enable_dolphin_proto|bool|0,0|NULL|NULL|
enable_remote_excute|bool|0,0|NULL|NULL|
light_comm|bool|0,0|NULL|NULL|
ignore_standby_lsn_window|int|0,2147483647|s|NULL|
ignore_feedback_xmin_window|int|0,2147483647|s|NULL|
[cmserver]
log_dir|string|0,0|NULL|NULL|
log_file_size|int|0,2047|MB|NULL|

View File

@ -2540,6 +2540,36 @@ static void InitStorageConfigureNamesInt()
NULL,
NULL},
{{"ignore_standby_lsn_window",
PGC_SIGHUP,
NODE_ALL,
REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for WAL replication."),
NULL,
GUC_UNIT_MS},
&u_sess->attr.attr_storage.ignore_standby_lsn_window,
0,
0,
INT_MAX,
NULL,
NULL,
NULL},
{{"ignore_feedback_xmin_window",
PGC_SIGHUP,
NODE_ALL,
REPLICATION_SENDING,
gettext_noop("Sets the maximum time to wait for feedback xmin."),
NULL,
GUC_UNIT_MS},
&u_sess->attr.attr_storage.ignore_feedback_xmin_window,
0,
0,
INT_MAX,
NULL,
NULL,
NULL},
{{"replication_type",
PGC_POSTMASTER,
NODE_ALL,

View File

@ -1084,6 +1084,9 @@ void vacuum_set_xid_limits(Relation rel, int64 freeze_min_age, int64 freeze_tabl
TransactionId safeLimit;
TransactionId nextXid;
/* Recompute replication_slot_xmin before GetOldestXmin */
ReplicationSlotsComputeRequiredXmin(false);
/*
* We can always ignore processes running lazy vacuum. This is because we
* use these values only for deciding which tuples we must keep in the

View File

@ -930,6 +930,24 @@ void ReplicationSlotPersist(void)
ReplicationSlotSave();
}
static bool IfIgnoreStandbyXmin(TransactionId xmin, TimestampTz last_xmin_change_time)
{
if (u_sess->attr.attr_storage.ignore_feedback_xmin_window <= 0) {
return false;
}
TimestampTz nowTime = GetCurrentTimestamp();
if (timestamptz_cmp_internal(nowTime, TimestampTzPlusMilliseconds(last_xmin_change_time,
u_sess->attr.attr_storage.ignore_feedback_xmin_window)) >= 0) {
/* If the xmin is older than recentGlobalXmin, ignore it */
TransactionId recentGlobalXmin = pg_atomic_read_u64(&t_thrd.xact_cxt.ShmemVariableCache->recentGlobalXmin);
if (xmin < recentGlobalXmin) {
return true;
}
}
return false;
}
/*
* Compute the oldest xmin across all slots and store it in the ProcArray.
*
@ -949,6 +967,7 @@ void ReplicationSlotsComputeRequiredXmin(bool already_locked)
ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
TransactionId effective_xmin;
TransactionId effective_catalog_xmin;
TimestampTz last_xmin_change_time;
if (!s->in_use)
continue;
@ -959,11 +978,12 @@ void ReplicationSlotsComputeRequiredXmin(bool already_locked)
SpinLockAcquire(&s->mutex);
effective_xmin = vslot->effective_xmin;
effective_catalog_xmin = vslot->effective_catalog_xmin;
last_xmin_change_time = vslot->last_xmin_change_time;
SpinLockRelease(&s->mutex);
}
/* check the data xmin */
if (TransactionIdIsValid(effective_xmin) &&
if (TransactionIdIsValid(effective_xmin) && !IfIgnoreStandbyXmin(effective_xmin, last_xmin_change_time) &&
(!TransactionIdIsValid(agg_xmin) || TransactionIdPrecedes(effective_xmin, agg_xmin)))
agg_xmin = effective_xmin;
@ -1817,6 +1837,7 @@ loop:
slot->active = false;
slot->extra_content = extra_content;
slot->archive_config = archive_cfg;
slot->last_xmin_change_time = GetCurrentTimestamp();
restored = true;
if (extra_content != NULL) {
MarkArchiveSlotOperate();

View File

@ -107,7 +107,8 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
static bool SyncPaxosQueueIsOrderedByLSN(void);
#endif
static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys);
static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys,
int mode = SYNC_REP_NO_WAIT);
static int standby_priority_comparator(const void *a, const void *b);
static inline void free_sync_standbys_list(List* sync_standbys);
@ -862,7 +863,17 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP
break;
}
}
SyncRepStandbyData *stby = NULL;
for(i = 0; !(*am_sync) && i < num_standbys; i++) {
/*
* there may be some hanging sync standby, so potential sync
* standby need to release waiters too.
*/
stby = sync_standbys + i;
*am_sync = stby->receive_too_old || stby->write_too_old || stby->flush_too_old || stby->apply_too_old;
}
/*
* Quick exit if we are not managing a sync standby (or not check for check_am_sync is false)
* or there are not enough synchronous standbys.
@ -904,8 +915,15 @@ bool SyncRepGetSyncRecPtr(XLogRecPtr *receivePtr, XLogRecPtr *writePtr, XLogRecP
sync_standbys, num_standbys, i, t_thrd.syncrep_cxt.SyncRepConfig[i]->num_sync);
}
}
/*
* deal with position is invalid when most available sync mode is on
* and all sync standbys don't update position over ignore_standby_lsn_window.
*/
*writePtr = XLogRecPtrIsInvalid(*writePtr) ? GetXLogWriteRecPtr() : *writePtr;
*flushPtr = XLogRecPtrIsInvalid(*flushPtr) ? GetFlushRecPtr() : *flushPtr;
*receivePtr = XLogRecPtrIsInvalid(*receivePtr) ? *writePtr : *receivePtr;
*replayPtr = XLogRecPtrIsInvalid(*replayPtr) ? *flushPtr : *replayPtr;
}
pfree(sync_standbys);
return true;
@ -959,6 +977,41 @@ static bool SyncRepGetSyncLeftTime(XLogRecPtr XactCommitLSN, TimestampTz* leftTi
}
#endif
/*
* Calculate the indicated position among sync standbys.
*/
static void SyncRepGetOldestSyncRecPtrByMode(XLogRecPtr* outPtr, SyncRepStandbyData* sync_standbys, int num_standbys,
int mode)
{
int i;
SyncRepStandbyData* stby;
XLogRecPtr ptr;
/* Scan through all sync standbys and calculate the oldest positions. */
for(i = 0; i < num_standbys; i++) {
stby = sync_standbys + i;
switch (mode) {
case SYNC_REP_WAIT_RECEIVE:
ptr = stby->receive;
break;
case SYNC_REP_WAIT_WRITE:
ptr = stby->write;
break;
case SYNC_REP_WAIT_FLUSH:
ptr = stby->flush;
break;
case SYNC_REP_WAIT_APPLY:
ptr = stby->apply;
break;
default:
return;
}
if (XLogRecPtrIsInvalid(*outPtr) || !XLByteLE(*outPtr, ptr))
*outPtr = ptr;
}
}
/*
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
*/
@ -971,6 +1024,10 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* write
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
bool receive_has_invalid = false;
bool write_has_invalid = false;
bool flush_has_invalid = false;
bool apply_has_invalid = false;
/*
* Scan through all sync standbys and calculate the oldest
@ -981,21 +1038,60 @@ static void SyncRepGetOldestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* write
if(stby->sync_standby_group != groupid) {
continue;
}
receive_has_invalid = receive_has_invalid || stby->receive_too_old;
write_has_invalid = write_has_invalid || stby->write_too_old;
flush_has_invalid = flush_has_invalid || stby->flush_too_old;
apply_has_invalid = apply_has_invalid || stby->apply_too_old;
receive = stby->receive;
write = stby->write;
flush = stby->flush;
apply = stby->apply;
if (XLogRecPtrIsInvalid(*writePtr) || !XLByteLE(*writePtr, write))
if (!write_has_invalid && (XLogRecPtrIsInvalid(*writePtr) || !XLByteLE(*writePtr, write)))
*writePtr = write;
if (XLogRecPtrIsInvalid(*flushPtr) || !XLByteLE(*flushPtr, flush))
if (!flush_has_invalid && (XLogRecPtrIsInvalid(*flushPtr) || !XLByteLE(*flushPtr, flush)))
*flushPtr = flush;
if (XLogRecPtrIsInvalid(*receivePtr) || !XLByteLE(*receivePtr, receive))
if (!receive_has_invalid && (XLogRecPtrIsInvalid(*receivePtr) || !XLByteLE(*receivePtr, receive)))
*receivePtr = receive;
if (XLogRecPtrIsInvalid(*replayPtr) || !XLByteLE(*replayPtr, apply))
if (!apply_has_invalid && (XLogRecPtrIsInvalid(*replayPtr) || !XLByteLE(*replayPtr, apply)))
*replayPtr = apply;
}
/*
* If any lsn point is invalid, reacquire sync standbys which have
* valid lsn porint and recompute.
*/
SyncRepStandbyData* sync_standbys_tmp = (SyncRepStandbyData *)palloc(
g_instance.attr.attr_storage.max_wal_senders * sizeof(SyncRepStandbyData));
int num_standbys_tmp;
if (receive_has_invalid) {
Assert(XLogRecPtrIsInvalid(*receivePtr));
num_standbys_tmp = SyncRepGetSyncStandbysInGroup(&sync_standbys_tmp, groupid, NULL, SYNC_REP_WAIT_RECEIVE);
SyncRepGetOldestSyncRecPtrByMode(receivePtr, sync_standbys_tmp, num_standbys_tmp,
SYNC_REP_WAIT_RECEIVE);
}
if (write_has_invalid) {
Assert(XLogRecPtrIsInvalid(*writePtr));
num_standbys_tmp = SyncRepGetSyncStandbysInGroup(&sync_standbys_tmp, groupid, NULL, SYNC_REP_WAIT_WRITE);
SyncRepGetOldestSyncRecPtrByMode(writePtr, sync_standbys_tmp, num_standbys_tmp,
SYNC_REP_WAIT_WRITE);
}
if (flush_has_invalid) {
Assert(XLogRecPtrIsInvalid(*flushPtr));
num_standbys_tmp = SyncRepGetSyncStandbysInGroup(&sync_standbys_tmp, groupid, NULL, SYNC_REP_WAIT_FLUSH);
SyncRepGetOldestSyncRecPtrByMode(flushPtr, sync_standbys_tmp, num_standbys_tmp,
SYNC_REP_WAIT_FLUSH);
}
if (apply_has_invalid) {
Assert(XLogRecPtrIsInvalid(*replayPtr));
num_standbys_tmp = SyncRepGetSyncStandbysInGroup(&sync_standbys_tmp, groupid, NULL, SYNC_REP_WAIT_APPLY);
SyncRepGetOldestSyncRecPtrByMode(replayPtr, sync_standbys_tmp, num_standbys_tmp,
SYNC_REP_WAIT_APPLY);
}
pfree(sync_standbys_tmp);
}
/*
@ -1012,6 +1108,7 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr
XLogRecPtr *flush_array = NULL;
XLogRecPtr* apply_array = NULL;
int group_len;
int receive_valid_num, write_valid_num, flush_valid_num, apply_valid_num;
int i;
SyncRepStandbyData* stby;
@ -1032,6 +1129,10 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr
apply_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * group_len);
i = 0;
receive_valid_num = 0;
write_valid_num = 0;
flush_valid_num = 0;
apply_valid_num = 0;
foreach(cell, stby_list) {
stby = sync_standbys + lfirst_int(cell);
@ -1039,10 +1140,30 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr
continue;
}
receive_array[i] = stby->receive;
write_array[i] = stby->write;
flush_array[i] = stby->flush;
apply_array[i] = stby->apply;
if (stby->receive_too_old) {
receive_array[i] = InvalidXLogRecPtr;
} else {
receive_array[i] = stby->receive;
receive_valid_num++;
}
if (stby->write_too_old) {
write_array[i] = InvalidXLogRecPtr;
} else {
write_array[i] = stby->write;
write_valid_num++;
}
if (stby->flush_too_old) {
flush_array[i] = InvalidXLogRecPtr;
} else {
flush_array[i] = stby->flush;
flush_valid_num++;
}
if (stby->apply_too_old) {
apply_array[i] = InvalidXLogRecPtr;
} else {
apply_array[i] = stby->apply;
apply_valid_num++;
}
i++;
}
@ -1070,6 +1191,19 @@ static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* wr
if (XLogRecPtrIsInvalid(*replayPtr) || XLByteLE(apply_array[nth - 1], *replayPtr))
*replayPtr = apply_array[nth - 1];
/* If positions are remain Invalid, return oldest valid one if posible */
if (XLogRecPtrIsInvalid(*receivePtr) && receive_valid_num > 0) {
*receivePtr = receive_array[receive_valid_num - 1];
}
if (XLogRecPtrIsInvalid(*writePtr) && write_valid_num > 0) {
*writePtr = write_array[write_valid_num - 1];
}
if (XLogRecPtrIsInvalid(*flushPtr) && flush_valid_num > 0) {
*flushPtr = flush_array[flush_valid_num - 1];
}
if (XLogRecPtrIsInvalid(*replayPtr) && apply_valid_num > 0) {
*replayPtr = apply_array[apply_valid_num - 1];
}
list_free(stby_list);
@ -1399,13 +1533,14 @@ int SyncRepGetSyncStandbys(SyncRepStandbyData** sync_standbys, List** catchup_st
return num_sync;
}
static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys)
static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int groupid, List** catchup_standbys,
int mode)
{
int i;
int num_sync = 0; /* how many sync standbys in current group */
volatile WalSnd *walsnd = NULL; /* Use volatile pointer to prevent code rearrangement */
SyncRepStandbyData *stby = NULL;
TimestampTz now = GetCurrentTimestamp();
/* state/peer_state/peer_role is not included in SyncRepStandbyData */
WalSndState state;
@ -1429,6 +1564,10 @@ static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int
stby->sync_standby_priority = walsnd->sync_standby_priority;
stby->sync_standby_group = walsnd->sync_standby_group;
stby->is_cross_cluster = walsnd->is_cross_cluster;
stby->receive_too_old = IfIgnoreStandbyLsn(now, walsnd->lastReceiveChangeTime);
stby->write_too_old = IfIgnoreStandbyLsn(now, walsnd->lastWriteChangeTime);
stby->flush_too_old = IfIgnoreStandbyLsn(now, walsnd->lastFlushChangeTime);
stby->apply_too_old = IfIgnoreStandbyLsn(now, walsnd->lastApplyChangeTime);
SpinLockRelease(&walsnd->mutex);
/* Must be active */
@ -1456,6 +1595,14 @@ static int SyncRepGetSyncStandbysInGroup(SyncRepStandbyData** sync_standbys, int
continue;
}
/* used in SyncRepGetOldestSyncRecPtr to skip standby with too old position. */
if ((mode == SYNC_REP_WAIT_RECEIVE && stby->receive_too_old) ||
(mode == SYNC_REP_WAIT_WRITE && stby->write_too_old) ||
(mode == SYNC_REP_WAIT_FLUSH && stby->flush_too_old) ||
(mode == SYNC_REP_WAIT_APPLY && stby->apply_too_old)) {
continue;
}
stby->walsnd_index = i;
stby->is_me = (walsnd == t_thrd.walsender_cxt.MyWalSnd);
num_sync++;

View File

@ -2863,6 +2863,8 @@ static void ProcessStandbyReplyMessage(void)
}
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = t_thrd.walsender_cxt.MyWalSnd;
TimestampTz now = GetCurrentTimestamp();
XLogRecPtr localFlush = GetFlushRecPtr();
/*
* Update shared state for this WalSender process based on reply data from
@ -2870,6 +2872,18 @@ static void ProcessStandbyReplyMessage(void)
*/
{
SpinLockAcquire(&walsnd->mutex);
/*
* If reply position is bigger than last one, or equal to local flush,
* update change time.
*/
walsnd->lastReceiveChangeTime = XLByteLT(walsnd->receive, reply.receive) ||
XLByteEQ(walsnd->receive, localFlush) ? now : walsnd->lastReceiveChangeTime;
walsnd->lastWriteChangeTime = XLByteLT(walsnd->write, reply.write) ||
XLByteEQ(walsnd->write, localFlush) ? now : walsnd->lastWriteChangeTime;
walsnd->lastFlushChangeTime = XLByteLT(walsnd->flush, reply.flush) ||
XLByteEQ(walsnd->flush, localFlush) ? now : walsnd->lastFlushChangeTime;
walsnd->lastApplyChangeTime = XLByteLT(walsnd->apply, reply.apply) ||
XLByteEQ(walsnd->apply, localFlush) ? now : walsnd->lastApplyChangeTime;
walsnd->receive = reply.receive;
walsnd->write = reply.write;
walsnd->flush = reply.flush;
@ -2926,6 +2940,7 @@ static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin)
slot->data.xmin = feedbackXmin;
slot->effective_xmin = feedbackXmin;
}
slot->last_xmin_change_time = GetCurrentTimestamp();
SpinLockRelease(&slot->mutex);
if (changed) {
@ -4509,6 +4524,10 @@ static void InitWalSnd(void)
walsnd->lastCalWrite = InvalidXLogRecPtr;
walsnd->catchupRate = 0;
walsnd->slot_idx = -1;
walsnd->lastReceiveChangeTime = 0;
walsnd->lastWriteChangeTime = 0;
walsnd->lastFlushChangeTime = 0;
walsnd->lastApplyChangeTime = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
OwnLatch((Latch *)&walsnd->latch);

View File

@ -264,6 +264,8 @@ typedef struct knl_session_attr_storage {
void* logical_decode_options_default;
int logical_sender_timeout;
int ignore_standby_lsn_window;
int ignore_feedback_xmin_window;
} knl_session_attr_storage;
#endif /* SRC_INCLUDE_KNL_KNL_SESSION_ATTR_STORAGE */

View File

@ -173,6 +173,7 @@ typedef struct ReplicationSlot {
ArchiveConfig* archive_config;
bool is_recovery;
char* extra_content;
TimestampTz last_xmin_change_time;
} ReplicationSlot;
typedef struct ArchiveSlotConfig {

View File

@ -49,6 +49,11 @@ extern volatile bool most_available_sync;
#define GetWalsndSyncRepConfig(walsnder) \
(t_thrd.syncrep_cxt.SyncRepConfig[(walsnder)->sync_standby_group])
#define IfIgnoreStandbyLsn(nowTime, lastTime) \
(t_thrd.walsender_cxt.WalSndCtl->most_available_sync && \
u_sess->attr.attr_storage.ignore_standby_lsn_window > 0 && \
timestamptz_cmp_internal(nowTime, TimestampTzPlusMilliseconds(lastTime, \
u_sess->attr.attr_storage.ignore_standby_lsn_window)) >= 0)
/*
* SyncRepGetCandidateStandbys returns an array of these structs,
@ -70,6 +75,10 @@ typedef struct SyncRepStandbyData
/* This flag indicates whether this struct is about our own process */
bool is_me;
bool is_cross_cluster;
bool receive_too_old;
bool write_too_old;
bool flush_too_old;
bool apply_too_old;
} SyncRepStandbyData;

View File

@ -156,6 +156,10 @@ typedef struct WalSnd {
TimestampTz lastRequestTimestamp;
/* The idx of replication slot */
int slot_idx;
TimestampTz lastReceiveChangeTime;
TimestampTz lastWriteChangeTime;
TimestampTz lastFlushChangeTime;
TimestampTz lastApplyChangeTime;
} WalSnd;
extern THR_LOCAL WalSnd* MyWalSnd;

View File

@ -381,6 +381,8 @@ select name,vartype,unit,min_val,max_val from pg_settings where name <> 'qunit_c
ident_file | string | | |
idle_in_transaction_session_timeout | integer | s | 0 | 86400
ignore_checksum_failure | bool | | |
ignore_feedback_xmin_window | integer | ms | 0 | 2147483647
ignore_standby_lsn_window | integer | ms | 0 | 2147483647
ignore_system_indexes | bool | | |
incremental_checkpoint_timeout | integer | s | 1 | 3600
instance_metric_retention_time | integer | | 0 | 3650