!846 修复一主多备下备机故障时主机walsender超时未退出的问题
Merge pull request !846 from chenxiaobin/walsnd_timeout
This commit is contained in:
@ -1301,6 +1301,7 @@ static void knl_t_walsender_init(knl_t_walsender_context* walsender_cxt)
|
||||
walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData));
|
||||
walsender_cxt->remotePort = 0;
|
||||
walsender_cxt->walSndCaughtUp = false;
|
||||
walsender_cxt->last_check_timeout_timestamp = 0;
|
||||
}
|
||||
|
||||
static void knl_t_tsearch_init(knl_t_tsearch_context* tsearch_cxt)
|
||||
|
||||
@ -3499,26 +3499,26 @@ static void WalSndCheckTimeOut(TimestampTz now)
|
||||
}
|
||||
|
||||
/*
|
||||
* Use static last_reply_time to avoid call GetHeartbeatLastReplyTimestamp frequently
|
||||
* Use last_check_timeout_timestamp to avoid call GetHeartbeatLastReplyTimestamp frequently
|
||||
* when t_thrd.walsender_cxt.last_reply_timestamp has meet the timeout condition
|
||||
* but last heartbeat time doesn't.
|
||||
*/
|
||||
static TimestampTz last_reply_time = t_thrd.walsender_cxt.last_reply_timestamp;
|
||||
TimestampTz *last_reply_time = &t_thrd.walsender_cxt.last_check_timeout_timestamp;
|
||||
/* t_thrd.walsender_cxt.last_reply_timestamp newer */
|
||||
if (timestamptz_cmp_internal(t_thrd.walsender_cxt.last_reply_timestamp, last_reply_time) > 0) {
|
||||
last_reply_time = t_thrd.walsender_cxt.last_reply_timestamp;
|
||||
if (timestamptz_cmp_internal(t_thrd.walsender_cxt.last_reply_timestamp, *last_reply_time) > 0) {
|
||||
*last_reply_time = t_thrd.walsender_cxt.last_reply_timestamp;
|
||||
}
|
||||
|
||||
timeout = CalculateTimeout(last_reply_time);
|
||||
timeout = CalculateTimeout(*last_reply_time);
|
||||
if (now < timeout) {
|
||||
return;
|
||||
}
|
||||
|
||||
TimestampTz heartbeat = GetHeartbeatLastReplyTimestamp();
|
||||
/* If heartbeat newer, use heartbeat to recalculate timeout. */
|
||||
if (timestamptz_cmp_internal(heartbeat, last_reply_time) > 0) {
|
||||
last_reply_time = heartbeat;
|
||||
timeout = CalculateTimeout(last_reply_time);
|
||||
if (timestamptz_cmp_internal(heartbeat, *last_reply_time) > 0) {
|
||||
*last_reply_time = heartbeat;
|
||||
timeout = CalculateTimeout(*last_reply_time);
|
||||
}
|
||||
|
||||
if (now >= timeout) {
|
||||
@ -3529,7 +3529,7 @@ static void WalSndCheckTimeOut(TimestampTz now)
|
||||
*/
|
||||
if (log_min_messages <= ERROR || client_min_messages <= ERROR) {
|
||||
WalReplicationTimestampInfo timeStampInfo;
|
||||
WalReplicationTimestampToString(&timeStampInfo, now, timeout, last_reply_time, heartbeat);
|
||||
WalReplicationTimestampToString(&timeStampInfo, now, timeout, *last_reply_time, heartbeat);
|
||||
ereport(ERROR, (errmsg("terminating Walsender process due to replication timeout."
|
||||
"now time(%s) timeout time(%s) last recv time(%s) heartbeat time(%s)",
|
||||
timeStampInfo.nowTimeStamp, timeStampInfo.timeoutStamp,
|
||||
|
||||
@ -2189,6 +2189,8 @@ typedef struct knl_t_walsender_context {
|
||||
bool walSndCaughtUp;
|
||||
/* Notify primary to advance logical replication slot. */
|
||||
struct pg_conn* advancePrimaryConn;
|
||||
/* Timestamp of the last check-timeout time in WalSndCheckTimeOut. */
|
||||
TimestampTz last_check_timeout_timestamp;
|
||||
} knl_t_walsender_context;
|
||||
|
||||
typedef struct knl_t_walreceiverfuncs_context {
|
||||
|
||||
Reference in New Issue
Block a user