From 6e3f441b6c46cc4fdfa56c8766b537d0752cb516 Mon Sep 17 00:00:00 2001 From: chenxiaobin19 <1025221611@qq.com> Date: Tue, 21 Jun 2022 21:19:48 +0800 Subject: [PATCH] check if temporary replication slot of tablesync worker exist before dropping it --- src/common/backend/utils/sort/tuplestore.cpp | 5 ++ .../optimizer/commands/subscriptioncmds.cpp | 47 +++++++++++-------- src/include/utils/tuplestore.h | 1 + 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/common/backend/utils/sort/tuplestore.cpp b/src/common/backend/utils/sort/tuplestore.cpp index 7672bc21d..4535ba48c 100644 --- a/src/common/backend/utils/sort/tuplestore.cpp +++ b/src/common/backend/utils/sort/tuplestore.cpp @@ -1509,3 +1509,8 @@ int tuplestore_get_spread_num(Tuplestorestate* state) { return state->spreadNum; } + +int tuplestore_get_memtupcount(Tuplestorestate* state) +{ + return state->memtupcount; +} diff --git a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp index 67490ec94..5f05f4418 100644 --- a/src/gausskernel/optimizer/commands/subscriptioncmds.cpp +++ b/src/gausskernel/optimizer/commands/subscriptioncmds.cpp @@ -1153,35 +1153,44 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok) { + WalRcvExecResult *res = NULL; StringInfoData cmd; Assert(t_thrd.libwalreceiver_cxt.streamConn); initStringInfo(&cmd); + + /* Check if the replication slot exists on publisher. */ + if (missing_ok) { + Oid row[1] = {INT4OID}; + + appendStringInfo(&cmd, "SELECT 1 FROM pg_replication_slots WHERE slot_name = '%s'", slotname); + res = (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_exec(cmd.data, 1, row); + resetStringInfo(&cmd); + + if (res->status == WALRCV_OK_TUPLES && tuplestore_get_memtupcount(res->tuplestore) == 0) { + walrcv_clear_result(res); + FreeStringInfo(&cmd); + + ereport(WARNING, (errmsg("replication slot \"%s\" does not exist on publisher", slotname))); + return; + } + } + appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname)); - PG_TRY(); - { - WalRcvExecResult *res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL); - if (res->status != WALRCV_OK_COMMAND && missing_ok && res->sqlstate == ERRCODE_UNDEFINED_OBJECT) { - /* drop replication slot failed cause it doesn't exist on publisher, give a warning and continue */ - ereport(WARNING, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), - errdetail("The error was: %s", res->err))); - } else if (res->status != WALRCV_OK_COMMAND) { - ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), - errdetail("The error was: %s", res->err))); - } else { - ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", slotname))); - } + res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL); + if (res->status != WALRCV_OK_COMMAND) { walrcv_clear_result(res); - } - PG_CATCH(); - { FreeStringInfo(&cmd); - PG_RE_THROW(); - } - PG_END_TRY(); + ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname), + errdetail("The error was: %s", res->err))); + } else { + ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", slotname))); + } + + walrcv_clear_result(res); FreeStringInfo(&cmd); } diff --git a/src/include/utils/tuplestore.h b/src/include/utils/tuplestore.h index 193ce9b5e..565c8670e 100644 --- a/src/include/utils/tuplestore.h +++ b/src/include/utils/tuplestore.h @@ -79,5 +79,6 @@ extern void tuplestore_end(Tuplestorestate* state); extern int64 tuplestore_get_avgwidth(Tuplestorestate* state); extern bool tuplestore_get_busy_status(Tuplestorestate* state); extern int tuplestore_get_spread_num(Tuplestorestate* state); +extern int tuplestore_get_memtupcount(Tuplestorestate* state); #endif /* TUPLESTORE_H */