primary must update the targetBarrier incrementally.

Offering: openGaussDev

More detail: targetBarrier更新必须递增

Match-id-be34323f18803bbe8851773fc97170ec8b73be02
This commit is contained in:
openGaussDev
2022-03-10 20:36:29 +08:00
committed by yanghao
parent 87eeb92227
commit ea0a5061e9

View File

@ -4213,6 +4213,11 @@ static void InitWalSnd(void)
rc = memset_s(g_instance.streaming_dr_cxt.targetBarrierId, MAX_BARRIER_ID_LENGTH, 0,
sizeof(g_instance.streaming_dr_cxt.targetBarrierId));
securec_check(rc, "\0", "\0");
SpinLockAcquire(&g_instance.streaming_dr_cxt.mutex);
rc = memset_s(g_instance.streaming_dr_cxt.currentBarrierId, MAX_BARRIER_ID_LENGTH, 0,
sizeof(g_instance.streaming_dr_cxt.currentBarrierId));
SpinLockRelease(&g_instance.streaming_dr_cxt.mutex);
securec_check(rc, "\0", "\0");
}
walsnd->isTermChanged = false;
walsnd->peer_state = NORMAL_STATE;
@ -6877,9 +6882,11 @@ static void ProcessHadrReplyMessage()
HadrReplyMessage hadrReply;
pq_copymsgbytes(t_thrd.walsender_cxt.reply_message, (char*)&hadrReply, sizeof(HadrReplyMessage));
SpinLockAcquire(&g_instance.streaming_dr_cxt.mutex);
if (BARRIER_LT((char *)g_instance.streaming_dr_cxt.targetBarrierId, (char *)hadrReply.targetBarrierId)) {
int rc = strncpy_s((char *)g_instance.streaming_dr_cxt.targetBarrierId, MAX_BARRIER_ID_LENGTH,
hadrReply.targetBarrierId, MAX_BARRIER_ID_LENGTH - 1);
securec_check(rc, "\0", "\0");
}
SpinLockRelease(&g_instance.streaming_dr_cxt.mutex);
}