!1874 修复删除订阅时逻辑复制槽不存在报错的问题
Merge pull request !1874 from chenxiaobin/initial_data
This commit is contained in:
@ -1509,3 +1509,8 @@ int tuplestore_get_spread_num(Tuplestorestate* state)
|
||||
{
|
||||
return state->spreadNum;
|
||||
}
|
||||
|
||||
int tuplestore_get_memtupcount(Tuplestorestate* state)
|
||||
{
|
||||
return state->memtupcount;
|
||||
}
|
||||
|
||||
@ -1153,35 +1153,44 @@ void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
||||
*/
|
||||
void ReplicationSlotDropAtPubNode(char *slotname, bool missing_ok)
|
||||
{
|
||||
WalRcvExecResult *res = NULL;
|
||||
StringInfoData cmd;
|
||||
|
||||
Assert(t_thrd.libwalreceiver_cxt.streamConn);
|
||||
|
||||
initStringInfo(&cmd);
|
||||
|
||||
/* Check if the replication slot exists on publisher. */
|
||||
if (missing_ok) {
|
||||
Oid row[1] = {INT4OID};
|
||||
|
||||
appendStringInfo(&cmd, "SELECT 1 FROM pg_replication_slots WHERE slot_name = '%s'", slotname);
|
||||
res = (WalReceiverFuncTable[GET_FUNC_IDX]).walrcv_exec(cmd.data, 1, row);
|
||||
resetStringInfo(&cmd);
|
||||
|
||||
if (res->status == WALRCV_OK_TUPLES && tuplestore_get_memtupcount(res->tuplestore) == 0) {
|
||||
walrcv_clear_result(res);
|
||||
FreeStringInfo(&cmd);
|
||||
|
||||
ereport(WARNING, (errmsg("replication slot \"%s\" does not exist on publisher", slotname)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
appendStringInfo(&cmd, "DROP_REPLICATION_SLOT %s", quote_identifier(slotname));
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
WalRcvExecResult *res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL);
|
||||
if (res->status != WALRCV_OK_COMMAND && missing_ok && res->sqlstate == ERRCODE_UNDEFINED_OBJECT) {
|
||||
/* drop replication slot failed cause it doesn't exist on publisher, give a warning and continue */
|
||||
ereport(WARNING, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname),
|
||||
errdetail("The error was: %s", res->err)));
|
||||
} else if (res->status != WALRCV_OK_COMMAND) {
|
||||
ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname),
|
||||
errdetail("The error was: %s", res->err)));
|
||||
} else {
|
||||
ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", slotname)));
|
||||
}
|
||||
res = WalReceiverFuncTable[GET_FUNC_IDX].walrcv_exec(cmd.data, 0, NULL);
|
||||
if (res->status != WALRCV_OK_COMMAND) {
|
||||
walrcv_clear_result(res);
|
||||
}
|
||||
PG_CATCH();
|
||||
{
|
||||
FreeStringInfo(&cmd);
|
||||
PG_RE_THROW();
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
ereport(ERROR, (errmsg("could not drop the replication slot \"%s\" on publisher", slotname),
|
||||
errdetail("The error was: %s", res->err)));
|
||||
} else {
|
||||
ereport(NOTICE, (errmsg("dropped replication slot \"%s\" on publisher", slotname)));
|
||||
}
|
||||
|
||||
walrcv_clear_result(res);
|
||||
FreeStringInfo(&cmd);
|
||||
}
|
||||
|
||||
|
||||
@ -79,5 +79,6 @@ extern void tuplestore_end(Tuplestorestate* state);
|
||||
extern int64 tuplestore_get_avgwidth(Tuplestorestate* state);
|
||||
extern bool tuplestore_get_busy_status(Tuplestorestate* state);
|
||||
extern int tuplestore_get_spread_num(Tuplestorestate* state);
|
||||
extern int tuplestore_get_memtupcount(Tuplestorestate* state);
|
||||
|
||||
#endif /* TUPLESTORE_H */
|
||||
|
||||
Reference in New Issue
Block a user