From 98802d4120b92938289de03293db2b850f14801f Mon Sep 17 00:00:00 2001 From: xue_meng_en <1836611252@qq.com> Date: Mon, 13 May 2024 19:39:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B8=85=E7=90=86=E4=B8=BB=E6=9C=BA=E6=AE=8B?= =?UTF-8?q?=E7=95=99=E7=9A=84=E7=BA=A7=E8=81=94=E5=A4=87=E5=A4=8D=E5=88=B6?= =?UTF-8?q?=E6=A7=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/common/backend/utils/init/globals.cpp | 3 +- .../process/threadpool/knl_instance.cpp | 1 + src/gausskernel/storage/replication/slot.cpp | 40 ++++-- .../storage/replication/walreceiver.cpp | 115 +++++++++++++++++- .../storage/replication/walsender.cpp | 61 ++++++++++ src/include/knl/knl_instance.h | 1 + src/include/miscadmin.h | 1 + src/include/replication/slot.h | 5 +- src/include/replication/walprotocol.h | 30 +++++ src/test/ha/ha_schedule_single_standby_read | 3 +- src/test/ha/testcase/cascade/clean_slot.sh | 52 ++++++++ 11 files changed, 300 insertions(+), 12 deletions(-) create mode 100644 src/test/ha/testcase/cascade/clean_slot.sh diff --git a/src/common/backend/utils/init/globals.cpp b/src/common/backend/utils/init/globals.cpp index e813f923e..7c1206bf1 100644 --- a/src/common/backend/utils/init/globals.cpp +++ b/src/common/backend/utils/init/globals.cpp @@ -75,12 +75,13 @@ bool will_shutdown = false; * NEXT | 92899 | ? | ? * ********************************************/ -const uint32 GRAND_VERSION_NUM = 92929; +const uint32 GRAND_VERSION_NUM = 92931; /******************************************** * 2.VERSION NUM FOR EACH FEATURE * Please write indescending order. ********************************************/ +const uint32 ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM = 92930; const uint32 FLUSH_LSN_VERSION_NUM = 92929; const uint32 PRIOR_EXPR_VERSION_NUM = 92928; const uint32 PUBLICATION_DDL_VERSION_NUM = 92921; diff --git a/src/gausskernel/process/threadpool/knl_instance.cpp b/src/gausskernel/process/threadpool/knl_instance.cpp index a8628b7ff..3764b06af 100755 --- a/src/gausskernel/process/threadpool/knl_instance.cpp +++ b/src/gausskernel/process/threadpool/knl_instance.cpp @@ -510,6 +510,7 @@ static void knl_g_xlog_init(knl_g_xlog_context *xlog_cxt) pthread_mutex_init(&xlog_cxt->remain_segs_lock, NULL); xlog_cxt->shareStorageLockFd = -1; xlog_cxt->ssReplicationXLogCtl = NULL; + xlog_cxt->need_clean_slot = false; } static void KnlGUndoInit(knl_g_undo_context *undoCxt) diff --git a/src/gausskernel/storage/replication/slot.cpp b/src/gausskernel/storage/replication/slot.cpp index 189eb7bad..a1d95fe51 100644 --- a/src/gausskernel/storage/replication/slot.cpp +++ b/src/gausskernel/storage/replication/slot.cpp @@ -58,6 +58,7 @@ extern bool PMstateIsRun(void); static void ReplicationSlotDropAcquired(void); +static void ReplicationSlotDropWithSlot(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -601,8 +602,6 @@ bool IsLogicalReplicationSlot(const char *name) { bool isLogical = false; - Assert(t_thrd.slot_cxt.MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); /* Search for the named slot to identify whether it is a logical replication slot or not. */ @@ -698,6 +697,30 @@ void ReplicationSlotRelease(void) LWLockRelease(ProcArrayLock); } +void replication_slot_drop_without_acquire(const char *name) +{ + ReplicationSlot *slot = NULL; + (void)ReplicationSlotValidateName(name, WARNING); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) { + ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i]; + if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { + volatile ReplicationSlot *vslot = s; + SpinLockAcquire(&s->mutex); + vslot->active = true; + SpinLockRelease(&s->mutex); + slot = s; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + if (slot == NULL) { + return; + } + ReplicationSlotDropWithSlot(slot); + ereport(ERROR, (errmsg("replication_slot_drop_without_acquire:start clean slot:[%s].", name))); +} + /* * Permanently drop replication slot identified by the passed in name. */ @@ -743,15 +766,18 @@ void ReplicationSlotDrop(const char *name, bool for_backup, bool nowait) */ static void ReplicationSlotDropAcquired(void) { - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; - bool is_archive_slot = (t_thrd.slot_cxt.MyReplicationSlot->archive_config != NULL); ReplicationSlot *slot = t_thrd.slot_cxt.MyReplicationSlot; - Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL); /* slot isn't acquired anymore */ t_thrd.slot_cxt.MyReplicationSlot = NULL; - + ReplicationSlotDropWithSlot(slot); +} + +static void ReplicationSlotDropWithSlot(ReplicationSlot *slot) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + bool is_archive_slot = (slot->archive_config != NULL); char replslot_path[MAXPGPATH]; GetReplslotPath(replslot_path); diff --git a/src/gausskernel/storage/replication/walreceiver.cpp b/src/gausskernel/storage/replication/walreceiver.cpp index 29ca19d9a..56f9eb3f3 100755 --- a/src/gausskernel/storage/replication/walreceiver.cpp +++ b/src/gausskernel/storage/replication/walreceiver.cpp @@ -192,6 +192,7 @@ const WalReceiverFunc WalReceiverFuncTable[] = { sub_startstreaming, sub_create_slot} }; +static const int SEND_CLEAN_SLOT_INTERVAL = 1000; /* Prototypes for private functions */ static void EnableWalRcvImmediateExit(void); static void DisableWalRcvImmediateExit(void); @@ -234,7 +235,8 @@ static void ResetConfirmedLSNOnDisk(); #ifdef ENABLE_MULTIPLE_NODES static void WalRecvHadrSendReply(); #endif - +void xlog_wal_rcv_send_clean_slot_request(char *slot_name); +void process_complete_clean_slot_request(char *slot_name); void ProcessWalRcvInterrupts(void) { @@ -374,6 +376,79 @@ void RefuseConnect() disconn_node.disable_conn_node_host, disconn_node.disable_conn_node_port))); } +void scan_slots_need_clean_and_request_primary() +{ + int cur = 0; + char name[NAMEDATALEN] = {0}; + bool found = false; + errno_t errorno = EOK; + while (cur < g_instance.attr.attr_storage.max_replication_slots) { + errorno = memset_s(name, NAMEDATALEN, 0, NAMEDATALEN); + securec_check(errorno, "\0", "\0"); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (; cur < g_instance.attr.attr_storage.max_replication_slots; cur++) { + ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[cur]; + if (!s->in_use || !s->active || s->archive_config != NULL) { + continue; + } + if (s->need_clean) { + errorno = memcpy_s(name, NAMEDATALEN, s->data.name.data, NAMEDATALEN); + securec_check(errorno, "\0", "\0"); + found = true; + break; + } + } + if (!found) { + g_instance.xlog_cxt.need_clean_slot = false; + LWLockRelease(ReplicationSlotControlLock); + break; + } + LWLockRelease(ReplicationSlotControlLock); + + if (cur < g_instance.attr.attr_storage.max_replication_slots) { + xlog_wal_rcv_send_clean_slot_request(name); + cur++; + } + } +} + +void xlog_wal_rcv_send_clean_slot_request(char *slot_name) +{ + CleanSlotRequestMessage request_message; + char buf[sizeof(CleanSlotRequestMessage) + 1]; + TimestampTz local_now; + errno_t errorno = EOK; + + /* Get current timestamp. */ + local_now = GetCurrentTimestamp(); + request_message.sendTime = local_now; + errorno = snprintf_s(request_message.slotName.data, NAMEDATALEN, NAMEDATALEN - 1, "%s", slot_name); + securec_check_ss(errorno, "\0", "\0"); + /* Prepend with the message type and send it. */ + buf[0] = 'C'; + errorno = memcpy_s(&buf[1], sizeof(CleanSlotRequestMessage), &request_message, sizeof(CleanSlotRequestMessage)); + securec_check(errorno, "\0", "\0"); + (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_send(buf, sizeof(CleanSlotRequestMessage) + 1); + + ereport(LOG, (errmsg("send clean slot:%s request to Primary(%s) successfully.", + slot_name, t_thrd.walreceiverfuncs_cxt.WalRcv->slotname))); +} + +void check_and_send_slot_clean(TimestampTz &clean_slot_sent_timestamp) +{ + if (pg_atomic_read_u32(&WorkingGrandVersionNum) >= ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM && + u_sess->attr.attr_common.upgrade_mode == 0 && g_instance.xlog_cxt.need_clean_slot && + t_thrd.xlog_cxt.server_mode == STANDBY_MODE && !t_thrd.xlog_cxt.is_hadr_main_standby && + !t_thrd.xlog_cxt.is_cascade_standby) { + TimestampTz nowtime = GetCurrentTimestamp(); + TimestampTz calculateTime = TimestampTzPlusMilliseconds(clean_slot_sent_timestamp, SEND_CLEAN_SLOT_INTERVAL); + if (timestamptz_cmp_internal(nowtime, calculateTime) >= 0) { + clean_slot_sent_timestamp = nowtime; + scan_slots_need_clean_and_request_primary(); + } + } +} + void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent) { /* use volatile pointer to prevent code rearrangement */ @@ -382,6 +457,8 @@ void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent) char *buf = NULL; int len; + static TimestampTz clean_slot_sent_timestamp = 0; + #ifdef ENABLE_DISTRIBUTE_TEST if (TEST_STUB(DN_WALRECEIVE_MAINLOOP, stub_sleep_emit)) { ereport(get_distribute_test_param()->elevel, @@ -448,7 +525,7 @@ void WalRcvrProcessData(TimestampTz *last_recv_timestamp, bool *ping_sent) XLogWalRcvSendSwitchTimeoutRequest(); SendPostmasterSignal(PMSIGNAL_SWITCHOVER_TIMEOUT); } - + check_and_send_slot_clean(clean_slot_sent_timestamp); /* Wait a while for data to arrive */ if ((WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len)) { *last_recv_timestamp = GetCurrentTimestamp(); @@ -1235,6 +1312,16 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) ProcessHadrSwitchoverRequest(&hadrSwithoverMessage); break; } + case 'D': { + CompleteCleanSlotRequestMessage request_message; + CHECK_MSG_SIZE(len, CompleteCleanSlotRequestMessage, + "invalid CompleteCleanSlotRequestMessage message received from primary"); + errorno = memcpy_s(&request_message, sizeof(CompleteCleanSlotRequestMessage), buf, + sizeof(CompleteCleanSlotRequestMessage)); + securec_check(errorno, "\0", "\0"); + process_complete_clean_slot_request(NameStr(request_message.slotName)); + break; + } default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %c", type))); @@ -2223,6 +2310,30 @@ static void ProcessSwitchResponse(int code) } } +/* + * process switchover response message from primary. + */ +void process_complete_clean_slot_request(char *slot_name) +{ + bool cleaned = false; + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) { + ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i]; + if (strcmp(slot_name, NameStr(s->data.name)) == 0) { + volatile ReplicationSlot *vslot = s; + SpinLockAcquire(&s->mutex); + vslot->need_clean = false; + SpinLockRelease(&s->mutex); + cleaned = true; + break; + } + } + LWLockRelease(ReplicationSlotControlLock); + if (cleaned) { + ereport(LOG, (errmsg("clean [%s] clean_slot sign", slot_name))); + } +} + /* * Keep track of important messages from primary. */ diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index 4ebad95a2..648669f73 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -255,6 +255,8 @@ static void WalSndHadrSwitchoverRequest(); static void ProcessHadrSwitchoverMessage(); static void ProcessHadrReplyMessage(); static int WalSndTimeout(); +void process_clean_slot_message(); +void wal_snd_clean_slot(char *msgbuf, char *slot_name); char *DataDir = "."; @@ -1225,6 +1227,16 @@ static void StartReplication(StartReplicationCmd *cmd) if (t_thrd.slot_cxt.MyReplicationSlot->data.database != InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot for physical replication")))); + load_server_mode(); + if (pg_atomic_read_u32(&WorkingGrandVersionNum) >= ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM && + u_sess->attr.attr_common.upgrade_mode == 0 && AM_WAL_STANDBY_SENDER && + t_thrd.xlog_cxt.server_mode == STANDBY_MODE && !t_thrd.xlog_cxt.is_hadr_main_standby && + !t_thrd.xlog_cxt.is_cascade_standby && strncmp(cmd->slotname, "gs_roach", strlen("gs_roach")) != 0) { + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + g_instance.xlog_cxt.need_clean_slot = true; + t_thrd.slot_cxt.MyReplicationSlot->need_clean = true; + LWLockRelease(ReplicationSlotControlLock); + } } /* @@ -2559,6 +2571,9 @@ static void ProcessStandbyMessage(void) ProcessUwalCatchupEndMessage(); } break; + case 'C': + process_clean_slot_message(); + break; default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type \"%d\"", msgtype))); @@ -6985,6 +7000,23 @@ static void WalSndResponseSwitchover(char *msgbuf) (void)pq_putmessage_noblock('d', msgbuf, sizeof(PrimarySwitchResponseMessage) + 1); } +/* + * send cleaned slots message + */ +void wal_snd_clean_slot(char *msgbuf, char *slot_name) +{ + errno_t errorno = EOK; + CompleteCleanSlotRequestMessage request_message; + errorno = snprintf_s(request_message.slotName.data, NAMEDATALEN, NAMEDATALEN - 1, "%s", slot_name); + securec_check_ss(errorno, "\0", "\0"); + request_message.sendTime = GetCurrentTimestamp(); + msgbuf[0] = 'D'; + errorno = memcpy_s(msgbuf + 1, sizeof(CompleteCleanSlotRequestMessage), &request_message, + sizeof(CompleteCleanSlotRequestMessage)); + securec_check(errorno, "\0", "\0"); + (void)pq_putmessage_noblock('d', msgbuf, sizeof(CompleteCleanSlotRequestMessage) + 1); +} + /* * send archive xlog command */ @@ -7614,6 +7646,35 @@ static void ProcessHadrSwitchoverMessage() (int32)(walsnd->interactiveState), walsnd->isMasterInstanceReady))); } +void process_clean_slot_message() +{ + CleanSlotRequestMessage message; + pq_copymsgbytes(t_thrd.walsender_cxt.reply_message, (char *)&message, sizeof(CleanSlotRequestMessage)); + + Assert(t_thrd.slot_cxt.MyReplicationSlot != NULL); + ereport(LOG, (errmsg("get clean slot:[%s] request from standby:[%s]", NameStr(message.slotName), + NameStr(t_thrd.slot_cxt.MyReplicationSlot->data.name)))); + + if (g_instance.attr.attr_storage.max_replication_slots == 0) { + ereport(WARNING, (errmsg("get clean slot:[%s] request, but ""max_replication_slots is 0", + NameStr(message.slotName)))); + } + + if (strncmp(NameStr(message.slotName), "gs_roach", strlen("gs_roach")) == 0) { + ereport(WARNING, (errmsg("get clean slot:[%s] request, standby shouldn't send the roach slot clean request.", + NameStr(message.slotName)))); + return; + } + + if (IsLogicalReplicationSlot(NameStr(message.slotName))) { + ereport(WARNING, (errmsg("get clean slot:[%s] request, but this slot is logical. Won't clean it.", + NameStr(message.slotName)))); + } else { + replication_slot_drop_without_acquire(NameStr(message.slotName)); + } + wal_snd_clean_slot(t_thrd.walsender_cxt.output_xlog_message, NameStr(message.slotName)); +} + static void ProcessHadrReplyMessage() { HadrReplyMessage hadrReply; diff --git a/src/include/knl/knl_instance.h b/src/include/knl/knl_instance.h index a89f9d1dc..3568c474c 100755 --- a/src/include/knl/knl_instance.h +++ b/src/include/knl/knl_instance.h @@ -921,6 +921,7 @@ typedef struct knl_g_xlog_context { ShareStorageOperateCtl shareStorageopCtl; int shareStorageLockFd; ShareStorageXLogCtl *ssReplicationXLogCtl; + bool need_clean_slot; /* standby node will check cascade standby slot */ } knl_g_xlog_context; typedef struct knl_g_undo_context { diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 87029db45..35a383643 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -357,6 +357,7 @@ extern uint8 ce_cache_refresh_type; #define MIN_QUERY_DOP -(MAX_QUERY_DOP) extern const uint32 BACKUP_SLOT_VERSION_NUM; +extern const uint32 ADD_CLEAN_CASCADE_STANDBY_SLOT_MESSAGE_NUM; /* Debug mode. * 0 - Do not change any thing. diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 270ccab20..a87c1d4b7 100755 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -132,6 +132,8 @@ typedef struct ReplicationSlot { /* any outstanding modifications? */ bool just_dirtied; bool dirty; + /* need clean in Primary */ + bool need_clean; /* * For logical decoding, it's extremely important that we never remove any @@ -269,7 +271,8 @@ extern void ReplicationSlotCreate(const char* name, ReplicationSlotPersistency p Oid databaseId, XLogRecPtr restart_lsn, XLogRecPtr confirmed_lsn, char* extra_content = NULL, bool encrypted = false); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char* name, bool for_backup = false, bool nowait = true); +extern void ReplicationSlotDrop(const char *name, bool for_backup = false, bool record_log = true); +extern void replication_slot_drop_without_acquire(const char *name); extern void ReplicationSlotAcquire(const char* name, bool isDummyStandby, bool allowDrop = false, bool nowait = true); extern bool IsReplicationSlotActive(const char *name); extern bool IsLogicalReplicationSlot(const char *name); diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h index e7a93e4c8..5cfe4dab6 100755 --- a/src/include/replication/walprotocol.h +++ b/src/include/replication/walprotocol.h @@ -272,6 +272,36 @@ typedef struct StandbySwitchRequestMessage { TimestampTz sendTime; } StandbySwitchRequestMessage; +/* + * @@GaussDB@@ + * switchover request message from standby (message type 'C'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * Note that the data length is not specified here. + */ +typedef struct { + /* instance name of slot to be cleaned */ + NameData slotName; + + /* receiver's system clock at the time of transmission */ + TimestampTz sendTime; +} CleanSlotRequestMessage; + +/* + * @@GaussDB@@ + * switchover request message from standby (message type 'S'). This is wrapped within + * a CopyData message at the FE/BE protocol level. + * + * Note that the data length is not specified here. + */ +typedef struct { + /* cleaned slot name */ + NameData slotName; + + /* receiver's system clock at the time of transmission */ + TimestampTz sendTime; +} CompleteCleanSlotRequestMessage; + /* * switchover request message in the streaming dr (message type ''). This is wrapped within * a CopyData message at the FE/BE protocol level. diff --git a/src/test/ha/ha_schedule_single_standby_read b/src/test/ha/ha_schedule_single_standby_read index d971647cf..cda58e181 100644 --- a/src/test/ha/ha_schedule_single_standby_read +++ b/src/test/ha/ha_schedule_single_standby_read @@ -1 +1,2 @@ -exrtostandbyread/single_standby_read_base \ No newline at end of file +exrtostandbyread/single_standby_read_base +cascade/clean_slot \ No newline at end of file diff --git a/src/test/ha/testcase/cascade/clean_slot.sh b/src/test/ha/testcase/cascade/clean_slot.sh new file mode 100644 index 000000000..1d089266e --- /dev/null +++ b/src/test/ha/testcase/cascade/clean_slot.sh @@ -0,0 +1,52 @@ +#!/bin/sh + +source ./util.sh + +function check_select_result() +{ + if [ $(echo $result | grep "${1}" | wc -l) -eq 1 ]; then + echo "remote read successful" + else + echo "remote read failed $failed_keyword with [$result]" + exit 1 + fi +} + +function check_select_no_result() +{ + if [ $(echo $result | grep "${1}" | wc -l) -eq 0 ]; then + echo "remote read successful" + else + echo "remote read failed $failed_keyword with [$result]" + exit 1 + fi +} + +function test_cascade_standby_clean_slot_func() +{ + set_default + + echo "base" + result=`gsql -d $db -p $dn1_primary_port -c "select * from pg_get_replication_slots();"` + check_select_result "dn_s2" + + kill_cascade_cluster + start_cascade_cluster + + + result=`gsql -d $db -p $dn1_primary_port -c "select * from pg_get_replication_slots();"` + check_select_no_result "dn_s2" + + switchover_to_cascade_standby + + result=`gsql -d $db -p $dn1_primary_port -c "select * from pg_get_replication_slots();"` + check_select_no_result "dn_s1" + +} + +function tear_down() { + set_cascade_default +} + +test_cascade_standby_clean_slot_func +tear_down \ No newline at end of file