清理主机残留的级联备复制槽

This commit is contained in:
xue_meng_en
2024-05-13 19:39:53 +08:00
committed by yaoxin
parent 0b7332ef5f
commit 98802d4120
11 changed files with 300 additions and 12 deletions

View File

@ -75,12 +75,13 @@ bool will_shutdown = false;
* NEXT | 92899 | ? | ?
*
********************************************/
/*
 * Single definition of the grand version number.
 * NOTE(review): feature gates in this change compare the working grand version
 * against per-feature numbers (newest below is 92930), so this presumably must
 * not be lower than the newest feature number - confirm when bumping.
 */
const uint32 GRAND_VERSION_NUM = 92931;
/********************************************
* 2.VERSION NUM FOR EACH FEATURE
* Please write in descending order.
********************************************/
const uint32 ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM = 92930;
const uint32 FLUSH_LSN_VERSION_NUM = 92929;
const uint32 PRIOR_EXPR_VERSION_NUM = 92928;
const uint32 PUBLICATION_DDL_VERSION_NUM = 92921;

View File

@ -510,6 +510,7 @@ static void knl_g_xlog_init(knl_g_xlog_context *xlog_cxt)
pthread_mutex_init(&xlog_cxt->remain_segs_lock, NULL);
xlog_cxt->shareStorageLockFd = -1;
xlog_cxt->ssReplicationXLogCtl = NULL;
xlog_cxt->need_clean_slot = false;
}
static void KnlGUndoInit(knl_g_undo_context *undoCxt)

View File

@ -58,6 +58,7 @@
extern bool PMstateIsRun(void);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropWithSlot(ReplicationSlot *slot);
/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
@ -601,8 +602,6 @@ bool IsLogicalReplicationSlot(const char *name)
{
bool isLogical = false;
Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL);
ReplicationSlotValidateName(name, ERROR);
/* Search for the named slot to identify whether it is a logical replication slot or not. */
@ -698,6 +697,30 @@ void ReplicationSlotRelease(void)
LWLockRelease(ProcArrayLock);
}
void replication_slot_drop_without_acquire(const char *name)
{
ReplicationSlot *slot = NULL;
(void)ReplicationSlotValidateName(name, WARNING);
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) {
ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) {
volatile ReplicationSlot *vslot = s;
SpinLockAcquire(&s->mutex);
vslot->active = true;
SpinLockRelease(&s->mutex);
slot = s;
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
if (slot == NULL) {
return;
}
ReplicationSlotDropWithSlot(slot);
ereport(ERROR, (errmsg("replication_slot_drop_without_acquire:start clean slot:[%s].", name)));
}
/*
* Permanently drop replication slot identified by the passed in name.
*/
@ -743,15 +766,18 @@ void ReplicationSlotDrop(const char *name, bool for_backup, bool nowait)
*/
static void ReplicationSlotDropAcquired(void)
{
    /* The calling thread must currently hold a slot; check before touching it. */
    Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL);

    ReplicationSlot *slot = t_thrd.slot_cxt.MyReplicationSlot;

    /* slot isn't acquired anymore */
    t_thrd.slot_cxt.MyReplicationSlot = NULL;

    /*
     * The path buffers and the archive-slot check are owned by
     * ReplicationSlotDropWithSlot(), so no locals for them are needed here.
     */
    ReplicationSlotDropWithSlot(slot);
}
static void ReplicationSlotDropWithSlot(ReplicationSlot *slot)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
bool is_archive_slot = (slot->archive_config != NULL);
char replslot_path[MAXPGPATH];
GetReplslotPath(replslot_path);

View File

@ -192,6 +192,7 @@ const WalReceiverFunc WalReceiverFuncTable[] = {
sub_startstreaming, sub_create_slot}
};
/* Minimum gap, in milliseconds, between two rounds of clean-slot requests (see check_and_send_slot_clean). */
static const int SEND_CLEAN_SLOT_INTERVAL = 1000;
/* Prototypes for private functions */
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
@ -234,7 +235,8 @@ static void ResetConfirmedLSNOnDisk();
#ifdef ENABLE_MULTIPLE_NODES
static void WalRecvHadrSendReply();
#endif
void xlog_wal_rcv_send_clean_slot_request(char *slot_name);
void process_complete_clean_slot_request(char *slot_name);
void ProcessWalRcvInterrupts(void)
{
@ -374,6 +376,79 @@ void RefuseConnect()
disconn_node.disable_conn_node_host, disconn_node.disable_conn_node_port)));
}
void scan_slots_need_clean_and_request_primary()
{
int cur = 0;
char name[NAMEDATALEN] = {0};
bool found = false;
errno_t errorno = EOK;
while (cur < g_instance.attr.attr_storage.max_replication_slots) {
errorno = memset_s(name, NAMEDATALEN, 0, NAMEDATALEN);
securec_check(errorno, "\0", "\0");
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (; cur < g_instance.attr.attr_storage.max_replication_slots; cur++) {
ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[cur];
if (!s->in_use || !s->active || s->archive_config != NULL) {
continue;
}
if (s->need_clean) {
errorno = memcpy_s(name, NAMEDATALEN, s->data.name.data, NAMEDATALEN);
securec_check(errorno, "\0", "\0");
found = true;
break;
}
}
if (!found) {
g_instance.xlog_cxt.need_clean_slot = false;
LWLockRelease(ReplicationSlotControlLock);
break;
}
LWLockRelease(ReplicationSlotControlLock);
if (cur < g_instance.attr.attr_storage.max_replication_slots) {
xlog_wal_rcv_send_clean_slot_request(name);
cur++;
}
}
}
void xlog_wal_rcv_send_clean_slot_request(char *slot_name)
{
CleanSlotRequestMessage request_message;
char buf[sizeof(CleanSlotRequestMessage) + 1];
TimestampTz local_now;
errno_t errorno = EOK;
/* Get current timestamp. */
local_now = GetCurrentTimestamp();
request_message.sendTime = local_now;
errorno = snprintf_s(request_message.slotName.data, NAMEDATALEN, NAMEDATALEN - 1, "%s", slot_name);
securec_check_ss(errorno, "\0", "\0");
/* Prepend with the message type and send it. */
buf[0] = 'C';
errorno = memcpy_s(&buf[1], sizeof(CleanSlotRequestMessage), &request_message, sizeof(CleanSlotRequestMessage));
securec_check(errorno, "\0", "\0");
(WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_send(buf, sizeof(CleanSlotRequestMessage) + 1);
ereport(LOG, (errmsg("send clean slot:%s request to Primary(%s) successfully.",
slot_name, t_thrd.walreceiverfuncs_cxt.WalRcv->slotname)));
}
/*
 * Rate-limited trigger for the clean-slot scan.
 *
 * The scan runs only when the working grand version has reached
 * ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM, no upgrade is in progress,
 * need_clean_slot is set, and this node is in STANDBY_MODE without being a
 * hadr main standby or a cascade standby. It then runs at most once per
 * SEND_CLEAN_SLOT_INTERVAL milliseconds.
 *
 * clean_slot_sent_timestamp: time of the previous scan; updated in place
 *                            whenever a new scan is started.
 */
void check_and_send_slot_clean(TimestampTz &clean_slot_sent_timestamp)
{
    if (pg_atomic_read_u32(&WorkingGrandVersionNum) < ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM) {
        return;
    }
    if (u_sess->attr.attr_common.upgrade_mode != 0) {
        return;
    }
    if (!g_instance.xlog_cxt.need_clean_slot) {
        return;
    }
    if (t_thrd.xlog_cxt.server_mode != STANDBY_MODE || t_thrd.xlog_cxt.is_hadr_main_standby ||
        t_thrd.xlog_cxt.is_cascade_standby) {
        return;
    }

    TimestampTz current = GetCurrentTimestamp();
    TimestampTz next_due = TimestampTzPlusMilliseconds(clean_slot_sent_timestamp, SEND_CLEAN_SLOT_INTERVAL);
    if (timestamptz_cmp_internal(current, next_due) < 0) {
        /* the previous round is still too recent */
        return;
    }

    clean_slot_sent_timestamp = current;
    scan_slots_need_clean_and_request_primary();
}
void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent)
{
/* use volatile pointer to prevent code rearrangement */
@ -382,6 +457,8 @@ void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent)
char *buf = NULL;
int len;
static TimestampTz clean_slot_sent_timestamp = 0;
#ifdef ENABLE_DISTRIBUTE_TEST
if (TEST_STUB(DN_WALRECEIVE_MAINLOOP, stub_sleep_emit)) {
ereport(get_distribute_test_param()->elevel,
@ -448,7 +525,7 @@ void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent)
XLogWalRcvSendSwitchTimeoutRequest();
SendPostmasterSignal(PMSIGNAL_SWITCHOVER_TIMEOUT);
}
check_and_send_slot_clean(clean_slot_sent_timestamp);
/* Wait a while for data to arrive */
if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) {
*last_recv_timestamp = GetCurrentTimestamp();
@ -1235,6 +1312,16 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
ProcessHadrSwitchoverRequest(&hadrSwithoverMessage);
break;
}
case 'D': {
CompleteCleanSlotRequestMessage request_message;
CHECK_MSG_SIZE(len, CompleteCleanSlotRequestMessage,
"invalid CompleteCleanSlotRequestMessage message received from primary");
errorno = memcpy_s(&request_message, sizeof(CompleteCleanSlotRequestMessage), buf,
sizeof(CompleteCleanSlotRequestMessage));
securec_check(errorno, "\0", "\0");
process_complete_clean_slot_request(NameStr(request_message.slotName));
break;
}
default:
ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid replication message type %c", type)));
@ -2223,6 +2310,30 @@ static void ProcessSwitchResponse(int code)
}
}
/*
 * Handle a clean-slot completion message (type 'D') received from the primary:
 * clear the need_clean flag of the local replication slot named slot_name.
 *
 * The slot array is searched under ReplicationSlotControlLock (shared mode) and
 * the flag is cleared under the slot's spinlock. If no in-use slot has that
 * name, nothing is changed and nothing is logged.
 */
void process_complete_clean_slot_request(char *slot_name)
{
    bool cleaned = false;

    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) {
        ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
        /*
         * Only in-use entries count, as in the other slot scans of this
         * feature; a free entry may still carry an old name.
         */
        if (s->in_use && strcmp(slot_name, NameStr(s->data.name)) == 0) {
            volatile ReplicationSlot *vslot = s;
            SpinLockAcquire(&s->mutex);
            vslot->need_clean = false;
            SpinLockRelease(&s->mutex);
            cleaned = true;
            break;
        }
    }
    LWLockRelease(ReplicationSlotControlLock);

    if (cleaned) {
        ereport(LOG, (errmsg("clean [%s] clean_slot sign", slot_name)));
    }
}
/*
* Keep track of important messages from primary.
*/

View File

@ -255,6 +255,8 @@ static void WalSndHadrSwitchoverRequest();
static void ProcessHadrSwitchoverMessage();
static void ProcessHadrReplyMessage();
static int WalSndTimeout();
void process_clean_slot_message();
void wal_snd_clean_slot(char *msgbuf, char *slot_name);
char *DataDir = ".";
@ -1225,6 +1227,16 @@ static void StartReplication(StartReplicationCmd *cmd)
if (t_thrd.slot_cxt.MyReplicationSlot->data.database != InvalidOid)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
(errmsg("cannot use a logical replication slot for physical replication"))));
load_server_mode();
if (pg_atomic_read_u32(&WorkingGrandVersionNum) >= ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM &&
u_sess->attr.attr_common.upgrade_mode == 0 && AM_WAL_STANDBY_SENDER &&
t_thrd.xlog_cxt.server_mode == STANDBY_MODE && !t_thrd.xlog_cxt.is_hadr_main_standby &&
!t_thrd.xlog_cxt.is_cascade_standby && strncmp(cmd->slotname, "gs_roach", strlen("gs_roach")) != 0) {
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
g_instance.xlog_cxt.need_clean_slot = true;
t_thrd.slot_cxt.MyReplicationSlot->need_clean = true;
LWLockRelease(ReplicationSlotControlLock);
}
}
/*
@ -2559,6 +2571,9 @@ static void ProcessStandbyMessage(void)
ProcessUwalCatchupEndMessage();
}
break;
case 'C':
process_clean_slot_message();
break;
default:
ereport(COMMERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type \"%d\"", msgtype)));
@ -6985,6 +7000,23 @@ static void WalSndResponseSwitchover(char *msgbuf)
(void)pq_putmessage_noblock('d', msgbuf, sizeof(PrimarySwitchResponseMessage) + 1);
}
/*
* send cleaned slots message
*/
void wal_snd_clean_slot(char *msgbuf, char *slot_name)
{
errno_t errorno = EOK;
CompleteCleanSlotRequestMessage request_message;
errorno = snprintf_s(request_message.slotName.data, NAMEDATALEN, NAMEDATALEN - 1, "%s", slot_name);
securec_check_ss(errorno, "\0", "\0");
request_message.sendTime = GetCurrentTimestamp();
msgbuf[0] = 'D';
errorno = memcpy_s(msgbuf + 1, sizeof(CompleteCleanSlotRequestMessage), &request_message,
sizeof(CompleteCleanSlotRequestMessage));
securec_check(errorno, "\0", "\0");
(void)pq_putmessage_noblock('d', msgbuf, sizeof(CompleteCleanSlotRequestMessage) + 1);
}
/*
* send archive xlog command
*/
@ -7614,6 +7646,35 @@ static void ProcessHadrSwitchoverMessage()
(int32)(walsnd->interactiveState), walsnd->isMasterInstanceReady)));
}
void process_clean_slot_message()
{
CleanSlotRequestMessage message;
pq_copymsgbytes(t_thrd.walsender_cxt.reply_message, (char *)&message, sizeof(CleanSlotRequestMessage));
Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL);
ereport(LOG, (errmsg("get clean slot:[%s] request from standby:[%s]", NameStr(message.slotName),
NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name))));
if (g_instance.attr.attr_storage.max_replication_slots == 0) {
ereport(WARNING, (errmsg("get clean slot:[%s] request, but ""max_replication_slots is 0",
NameStr(message.slotName))));
}
if (strncmp(NameStr(message.slotName), "gs_roach", strlen("gs_roach")) == 0) {
ereport(WARNING, (errmsg("get clean slot:[%s] request, standby shouldn't send the roach slot clean request.",
NameStr(message.slotName))));
return;
}
if (IsLogicalReplicationSlot(NameStr(message.slotName))) {
ereport(WARNING, (errmsg("get clean slot:[%s] request, but this slot is logical. Won't clean it.",
NameStr(message.slotName))));
} else {
replication_slot_drop_without_acquire(NameStr(message.slotName));
}
wal_snd_clean_slot(t_thrd.walsender_cxt.output_xlog_message, NameStr(message.slotName));
}
static void ProcessHadrReplyMessage()
{
HadrReplyMessage hadrReply;

View File

@ -921,6 +921,7 @@ typedef struct knl_g_xlog_context {
ShareStorageOperateCtl shareStorageopCtl;
int shareStorageLockFd;
ShareStorageXLogCtl *ssReplicationXLogCtl;
bool need_clean_slot; /* standby node will check cascade standby slot */
} knl_g_xlog_context;
typedef struct knl_g_undo_context {

View File

@ -357,6 +357,7 @@ extern uint8 ce_cache_refresh_type;
#define MIN_QUERY_DOP -(MAX_QUERY_DOP)
extern const uint32 BACKUP_SLOT_VERSION_NUM;
extern const uint32 ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM;
/* Debug mode.
* 0 - Do not change any thing.

View File

@ -132,6 +132,8 @@ typedef struct ReplicationSlot {
/* any outstanding modifications? */
bool just_dirtied;
bool dirty;
/* need clean in Primary */
bool need_clean;
/*
* For logical decoding, it's extremely important that we never remove any
@ -269,7 +271,8 @@ extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency p
Oid databaseId, XLogRecPtr restart_lsn, XLogRecPtr confirmed_lsn, char* extra_content = NULL,
bool encrypted = false);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char* name, bool for_backup = false, bool nowait = true);
extern void ReplicationSlotDrop(const char *name, bool for_backup = false, bool record_log = true);
extern void replication_slot_drop_without_acquire(const char *name);
extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false, bool nowait = true);
extern bool IsReplicationSlotActive(const char *name);
extern bool IsLogicalReplicationSlot(const char *name);

View File

@ -272,6 +272,36 @@ typedef struct StandbySwitchRequestMessage {
TimestampTz sendTime;
} StandbySwitchRequestMessage;
/*
* @@GaussDB@@
* Clean-slot request message (message type 'C'), built and sent by the WAL
* receiver in xlog_wal_rcv_send_clean_slot_request() and handled on the WAL
* sender side by process_clean_slot_message(). This is wrapped within
* a CopyData message at the FE/BE protocol level.
*
* Note that the data length is not specified here.
*/
typedef struct {
/* name of the replication slot to be cleaned */
NameData slotName;
/* sending side's system clock at the time of transmission */
TimestampTz sendTime;
} CleanSlotRequestMessage;
/*
* @@GaussDB@@
* Clean-slot completion message (message type 'D'), built and sent by the WAL
* sender in wal_snd_clean_slot() after a clean-slot request has been handled,
* and processed by the WAL receiver in process_complete_clean_slot_request().
* This is wrapped within a CopyData message at the FE/BE protocol level.
*
* Note that the data length is not specified here.
*/
typedef struct {
/* name of the slot the clean request was about */
NameData slotName;
/* sender's system clock at the time of transmission */
TimestampTz sendTime;
} CompleteCleanSlotRequestMessage;
/*
* switchover request message in the streaming dr (message type ''). This is wrapped within
* a CopyData message at the FE/BE protocol level.

View File

@ -1 +1,2 @@
exrtostandbyread/single_standby_read_base
exrtostandbyread/single_standby_read_base
cascade/clean_slot

View File

@ -0,0 +1,52 @@
#!/bin/sh
source ./util.sh
# Pass when the pattern in $1 occurs in the global $result; otherwise report
# the failure and abort the script with status 1.
function check_select_result()
{
    # $result is intentionally unquoted: word splitting folds the query output
    # onto a single line, so the line count from wc is always 0 or 1.
    if [ $(echo $result | grep "${1}" | wc -l) -ne 1 ]; then
        echo "remote read failed $failed_keyword with [$result]"
        exit 1
    fi
    echo "remote read successful"
}
# Pass when the pattern in $1 does NOT occur in the global $result; otherwise
# report the failure and abort the script with status 1.
function check_select_no_result()
{
    # $result is intentionally unquoted: word splitting folds the query output
    # onto a single line, so the line count from wc is always 0 or 1.
    if [ $(echo $result | grep "${1}" | wc -l) -ne 0 ]; then
        echo "remote read failed $failed_keyword with [$result]"
        exit 1
    fi
    echo "remote read successful"
}
# Scenario: the primary starts out with a dn_s2 slot; after the cascade cluster
# is restarted that slot must be gone from the primary, and after a switchover
# to the cascade standby the dn_s1 slot must be gone as well.
function test_cascade_standby_clean_slot_func()
{
    local slot_query="select * from pg_get_replication_slots();"

    set_default
    echo "base"

    # baseline: the slot is present on the primary
    result=$(gsql -d $db -p $dn1_primary_port -c "$slot_query")
    check_select_result "dn_s2"

    # restart the cascade cluster, then expect the slot to have been cleaned
    kill_cascade_cluster
    start_cascade_cluster
    result=$(gsql -d $db -p $dn1_primary_port -c "$slot_query")
    check_select_no_result "dn_s2"

    # after the switchover the dn_s1 slot must not be listed either
    switchover_to_cascade_standby
    result=$(gsql -d $db -p $dn1_primary_port -c "$slot_query")
    check_select_no_result "dn_s1"
}
# Put the cluster back into its default cascade layout once the test is done.
function tear_down()
{
    set_cascade_default
}
test_cascade_standby_clean_slot_func
tear_down