From d54fc03793db89a62a802eea764a7f5a303fa38f Mon Sep 17 00:00:00 2001 From: chenxiaobin <1025221611@qq.com> Date: Thu, 14 Jan 2021 15:16:00 +0800 Subject: [PATCH] set WalSndCaughtUp to thread context variable --- .../process/threadpool/knl_thread.cpp | 1 + .../storage/replication/walsender.cpp | 35 +++++++++---------- src/include/knl/knl_thread.h | 2 ++ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/gausskernel/process/threadpool/knl_thread.cpp b/src/gausskernel/process/threadpool/knl_thread.cpp index c8290896f..0ef8d7ecf 100755 --- a/src/gausskernel/process/threadpool/knl_thread.cpp +++ b/src/gausskernel/process/threadpool/knl_thread.cpp @@ -1297,6 +1297,7 @@ static void knl_t_walsender_init(knl_t_walsender_context* walsender_cxt) walsender_cxt->reply_message = (StringInfoData*)palloc0(sizeof(StringInfoData)); walsender_cxt->tmpbuf = (StringInfoData*)palloc0(sizeof(StringInfoData)); walsender_cxt->remotePort = 0; + walsender_cxt->walSndCaughtUp = false; } static void knl_t_tsearch_init(knl_t_tsearch_context* tsearch_cxt) diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index a3645210b..9ef1a80cb 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -108,8 +108,6 @@ extern void *internal_load_library(const char *libname); extern char *expand_dynamic_library_name(const char *name); extern bool PMstateIsRun(void); -/* Are we there yet? */ -static bool WalSndCaughtUp = false; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ bool WalSegmemtRemovedhappened = false; volatile bool bSyncStat = false; @@ -1577,7 +1575,7 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) break; /* Waiting for new WAL. Since we need to wait, we're now caught up. */ - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; /* * Try to flush pending output to the client. Also wait for the socket @@ -3020,7 +3018,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) * sender is "out of work". */ if (WalSndCaughtup()) { - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; t_thrd.walsender_cxt.sentPtr = InvalidXLogRecPtr; /* Close open wal file */ @@ -3048,10 +3046,10 @@ static int WalSndLoop(WalSndSendDataCallback send_data) if (!pq_is_send_pending()) send_data(); else - WalSndCaughtUp = false; + t_thrd.walsender_cxt.walSndCaughtUp = false; /* Send DummyStandby end message */ - if (WalSndCaughtUp) { + if (t_thrd.walsender_cxt.walSndCaughtUp) { /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) break; @@ -3070,9 +3068,9 @@ static int WalSndLoop(WalSndSendDataCallback send_data) if (!pq_is_send_pending()) send_data(); else - WalSndCaughtUp = false; + t_thrd.walsender_cxt.walSndCaughtUp = false; - if (WalSndCaughtUp && dummyStandbyMode) { + if (t_thrd.walsender_cxt.walSndCaughtUp && dummyStandbyMode) { if (!pq_is_send_pending()) { WalSndSyncDummyStandbyDone(false); (void)pq_flush(); @@ -3090,7 +3088,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) } /* If nothing remains to be sent right now ... */ - if (WalSndCaughtUp && !pq_is_send_pending()) { + if (t_thrd.walsender_cxt.walSndCaughtUp && !pq_is_send_pending()) { /* * If we're in catchup state, move to streaming. This is an * important state change for users to know about, since before @@ -3127,7 +3125,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) else send_data(); - if (WalSndCaughtUp && !pq_is_send_pending()) { + if (t_thrd.walsender_cxt.walSndCaughtUp && !pq_is_send_pending()) { if (dummyStandbyMode || XLByteEQ(t_thrd.walsender_cxt.sentPtr, t_thrd.walsender_cxt.MyWalSnd->flush)) t_thrd.walsender_cxt.walsender_shutdown_requested = true; @@ -3156,7 +3154,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) * loaded a subset of the available data but then pq_flush_if_writable * flushed it all --- we should immediately try to send more. */ - if (WalSndCaughtUp || pq_is_send_pending()) { + if (t_thrd.walsender_cxt.walSndCaughtUp || pq_is_send_pending()) { long sleeptime; int wakeEvents; @@ -3694,7 +3692,7 @@ static void XLogSendLogical(void) * XLogReadRecord() had to stop reading but WalSndWaitForWal didn't wait - * i.e. when we're shutting down. */ - WalSndCaughtUp = false; + t_thrd.walsender_cxt.walSndCaughtUp = false; record = XLogReadRecord(t_thrd.walsender_cxt.logical_decoding_ctx->reader, t_thrd.walsender_cxt.logical_startptr, &errm); @@ -3718,7 +3716,7 @@ static void XLogSendLogical(void) * then we're caught up. */ if (t_thrd.walsender_cxt.logical_decoding_ctx->reader->EndRecPtr >= GetFlushRecPtr()) - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; } /* Update shared memory status */ @@ -3776,7 +3774,7 @@ static void XLogSendPhysical(void) ereport(LOG, (errmsg("terminating walsender process to force cascaded standby " "to update timeline and reconnect"))); t_thrd.walsender_cxt.walsender_ready_to_stop = true; - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; return; } } else if (dummyStandbyMode) @@ -3786,7 +3784,7 @@ static void XLogSendPhysical(void) /* Quick exit if nothing to do */ if (!u_sess->attr.attr_storage.enable_stream_replication || XLByteLE(SendRqstPtr, t_thrd.walsender_cxt.sentPtr)) { - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; return; } @@ -3808,11 +3806,11 @@ static void XLogSendPhysical(void) /* if we went beyond SendRqstPtr, back off */ if (XLByteLE(SendRqstPtr, endptr)) { endptr = SendRqstPtr; - WalSndCaughtUp = true; + t_thrd.walsender_cxt.walSndCaughtUp = true; } else { /* round down to page boundary. */ endptr -= (endptr % XLOG_BLCKSZ); - WalSndCaughtUp = false; + t_thrd.walsender_cxt.walSndCaughtUp = false; t_thrd.walsender_cxt.catchup_threshold = XLByteDifference(SendRqstPtr, endptr); } @@ -3848,7 +3846,8 @@ static void XLogSendPhysical(void) msghdr.walEnd = SendRqstPtr; msghdr.sendTime = GetCurrentTimestamp(); msghdr.sender_sent_location = endptr; - msghdr.catchup = (t_thrd.walsender_cxt.MyWalSnd->state == WALSNDSTATE_CATCHUP && !WalSndCaughtUp); + msghdr.catchup = (t_thrd.walsender_cxt.MyWalSnd->state == WALSNDSTATE_CATCHUP && + !t_thrd.walsender_cxt.walSndCaughtUp); SpinLockAcquire(&hashmdata->mutex); local_role = hashmdata->current_mode; SpinLockRelease(&hashmdata->mutex); diff --git a/src/include/knl/knl_thread.h b/src/include/knl/knl_thread.h index c69cfe057..2bbba0eeb 100644 --- a/src/include/knl/knl_thread.h +++ b/src/include/knl/knl_thread.h @@ -2183,6 +2183,8 @@ typedef struct knl_t_walsender_context { struct LogicalDecodingContext* logical_decoding_ctx; XLogRecPtr logical_startptr; int remotePort; + /* Have we caught up with primary? */ + bool walSndCaughtUp; } knl_t_walsender_context; typedef struct knl_t_walreceiverfuncs_context {