!89 synchronous_commit parameter supports remote_apply mode

Merge pull request !89 from meimeidtt/patch2
This commit is contained in:
opengauss-bot
2020-08-07 17:06:02 +08:00
committed by Gitee
5 changed files with 36 additions and 10 deletions

View File

@ -106,6 +106,7 @@ typedef enum {
SYNCHRONOUS_COMMIT_REMOTE_RECEIVE, /* wait for local flush and remote receive */
SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */
SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */
SYNCHRONOUS_COMMIT_REMOTE_REPLAY,
SYNCHRONOUS_BAD
} SyncCommitLevel;

View File

@ -888,6 +888,8 @@ static const struct config_enum_entry synchronous_commit_options[] = {{"local",
{"no", SYNCHRONOUS_COMMIT_OFF, true},
{"1", SYNCHRONOUS_COMMIT_ON, true},
{"0", SYNCHRONOUS_COMMIT_OFF, true},
{"2", SYNCHRONOUS_COMMIT_REMOTE_REPLAY, false},
{"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_REPLAY, false},
{NULL, 0, false}};
static const struct config_enum_entry plan_cache_mode_options[] = {

View File

@ -80,11 +80,11 @@ static void SyncRepWaitCompletionQueue();
static void SyncRepNotifyComplete();
static int SyncRepGetStandbyPriority(void);
static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, bool* am_sync);
static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync);
static void SyncRepGetOldestSyncRecPtr(
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys);
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys);
static void SyncRepGetNthLatestSyncRecPtr(
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys, uint8 nth);
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth);
#ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN(int mode);
@ -434,6 +434,7 @@ void SyncRepReleaseWaiters(void)
XLogRecPtr receivePtr;
XLogRecPtr writePtr;
XLogRecPtr flushPtr;
XLogRecPtr replayPtr;
int numreceive = 0;
int numwrite = 0;
int numflush = 0;
@ -465,7 +466,7 @@ void SyncRepReleaseWaiters(void)
* Check whether we are a sync standby or not, and calculate the synced
* positions among all sync standbys.
*/
got_recptr = SyncRepGetSyncRecPtr(&receivePtr, &writePtr, &flushPtr, &am_sync);
got_recptr = SyncRepGetSyncRecPtr(&receivePtr, &writePtr, &flushPtr, &replayPtr, &am_sync);
/*
* If we are managing a sync standby, though we weren't prior to this,
@ -512,6 +513,11 @@ void SyncRepReleaseWaiters(void)
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
}
if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_REPALY], replayPtr)) {
walsndctl->lsn[SYNC_REP_WAIT_REPALY] = t_thrd.walsender_cxt.MyWalSnd->apply;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_REPALY);
}
LWLockRelease(SyncRepLock);
ereport(DEBUG3,
@ -537,13 +543,14 @@ void SyncRepReleaseWaiters(void)
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
*/
static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, bool* am_sync)
static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, bool* am_sync)
{
List* sync_standbys = NIL;
*receivePtr = InvalidXLogRecPtr;
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
*replayPtr = InvalidXLogRecPtr;
*am_sync = false;
/* Get standbys that are considered as synchronous at this moment */
@ -572,10 +579,10 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, X
* positions even in a quorum-based sync replication.
*/
if (t_thrd.syncrep_cxt.SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) {
SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, sync_standbys);
SyncRepGetOldestSyncRecPtr(receivePtr, writePtr, flushPtr, replayPtr, sync_standbys);
} else {
SyncRepGetNthLatestSyncRecPtr(
receivePtr, writePtr, flushPtr, sync_standbys, t_thrd.syncrep_cxt.SyncRepConfig->num_sync);
receivePtr, writePtr, flushPtr, replayPtr, sync_standbys, t_thrd.syncrep_cxt.SyncRepConfig->num_sync);
}
list_free(sync_standbys);
@ -586,7 +593,7 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr* receivePtr, XLogRecPtr* writePtr, X
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
*/
static void SyncRepGetOldestSyncRecPtr(
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys)
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys)
{
ListCell* cell = NULL;
@ -599,11 +606,13 @@ static void SyncRepGetOldestSyncRecPtr(
XLogRecPtr receive;
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
SpinLockAcquire(&walsnd->mutex);
receive = walsnd->receive;
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
if (XLogRecPtrIsInvalid(*writePtr) || !XLByteLE(*writePtr, write))
@ -612,6 +621,8 @@ static void SyncRepGetOldestSyncRecPtr(
*flushPtr = flush;
if (XLogRecPtrIsInvalid(*receivePtr) || !XLByteLE(*receivePtr, receive))
*receivePtr = receive;
if (XLogRecPtrIsInvalid(*replayPtr) || !XLByteLE(*replayPtr, apply))
*replayPtr = apply;
}
}
@ -620,12 +631,13 @@ static void SyncRepGetOldestSyncRecPtr(
* standbys.
*/
static void SyncRepGetNthLatestSyncRecPtr(
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, List* sync_standbys, uint8 nth)
XLogRecPtr* receivePtr, XLogRecPtr* writePtr, XLogRecPtr* flushPtr, XLogRecPtr* replayPtr, List* sync_standbys, uint8 nth)
{
ListCell* cell = NULL;
XLogRecPtr* receive_array = NULL;
XLogRecPtr* write_array = NULL;
XLogRecPtr* flush_array = NULL;
XLogRecPtr* apply_array = NULL;
int len;
int i = 0;
@ -633,6 +645,7 @@ static void SyncRepGetNthLatestSyncRecPtr(
receive_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len);
write_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len);
flush_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len);
apply_array = (XLogRecPtr*)palloc(sizeof(XLogRecPtr) * len);
foreach (cell, sync_standbys) {
WalSnd* walsnd = &t_thrd.walsender_cxt.WalSndCtl->walsnds[lfirst_int(cell)];
@ -641,6 +654,7 @@ static void SyncRepGetNthLatestSyncRecPtr(
receive_array[i] = walsnd->receive;
write_array[i] = walsnd->write;
flush_array[i] = walsnd->flush;
apply_array[i] = walsnd->apply;
SpinLockRelease(&walsnd->mutex);
i++;
@ -649,11 +663,13 @@ static void SyncRepGetNthLatestSyncRecPtr(
qsort(receive_array, len, sizeof(XLogRecPtr), cmp_lsn);
qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
/* Get Nth latest Write, Flush, Apply positions */
*receivePtr = receive_array[nth - 1];
*writePtr = write_array[nth - 1];
*flushPtr = flush_array[nth - 1];
*replayPtr = apply_array[nth - 1];
pfree(receive_array);
receive_array = NULL;
@ -661,6 +677,8 @@ static void SyncRepGetNthLatestSyncRecPtr(
write_array = NULL;
pfree(flush_array);
flush_array = NULL;
pfree(apply_array);
apply_array = NULL;
}
/*
@ -1332,6 +1350,9 @@ void assign_synchronous_commit(int newval, void* extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break;
case SYNCHRONOUS_COMMIT_REMOTE_REPLAY:
SyncRepWaitMode = SYNC_REP_WAIT_REPALY;
break;
default:
SyncRepWaitMode = SYNC_REP_NO_WAIT;
break;

View File

@ -92,6 +92,7 @@ typedef enum {
SYNCHRONOUS_COMMIT_REMOTE_RECEIVE, /* wait for local flush and remote receive */
SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote write */
SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */
SYNCHRONOUS_COMMIT_REMOTE_REPLAY, /* wait for local and remote replay */
SYNCHRONOUS_BAD
} SyncCommitLevel;

View File

@ -25,8 +25,9 @@
#define SYNC_REP_WAIT_RECEIVE 0
#define SYNC_REP_WAIT_WRITE 1
#define SYNC_REP_WAIT_FLUSH 2
#define SYNC_REP_WAIT_REPALY 3
#define NUM_SYNC_REP_WAIT_MODE 3
#define NUM_SYNC_REP_WAIT_MODE 4
/* syncRepState */
#define SYNC_REP_NOT_WAITING 0