From 38b5a0ac95a1b11b71bd4ef936e5a7fe2b01ff32 Mon Sep 17 00:00:00 2001 From: TotaJ Date: Mon, 25 Apr 2022 17:48:53 +0800 Subject: [PATCH] Fix logical decode core. --- .../storage/replication/logical/decode.cpp | 2 ++ src/gausskernel/storage/replication/walsender.cpp | 11 ++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/gausskernel/storage/replication/logical/decode.cpp b/src/gausskernel/storage/replication/logical/decode.cpp index 9bca465a5..432bdebb6 100644 --- a/src/gausskernel/storage/replication/logical/decode.cpp +++ b/src/gausskernel/storage/replication/logical/decode.cpp @@ -223,6 +223,7 @@ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState * return; } + ResourceOwner tmpOwner = t_thrd.utils_cxt.CurrentResourceOwner; /* cast so we get a warning when new rmgrs are added */ switch ((RmgrIds)XLogRecGetRmid(record)) { /* @@ -257,6 +258,7 @@ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState * default: break; } + t_thrd.utils_cxt.CurrentResourceOwner = tmpOwner; } /* diff --git a/src/gausskernel/storage/replication/walsender.cpp b/src/gausskernel/storage/replication/walsender.cpp index bb659178a..f288c6c12 100755 --- a/src/gausskernel/storage/replication/walsender.cpp +++ b/src/gausskernel/storage/replication/walsender.cpp @@ -401,6 +401,7 @@ int WalSenderMain(void) t_thrd.walsender_cxt.server_run_mode = t_thrd.postmaster_cxt.HaShmData->current_mode; SetHaWalSenderChannel(); + Assert(t_thrd.utils_cxt.CurrentResourceOwner != NULL); /* Handle handshake messages before streaming */ WalSndHandshake(); @@ -496,6 +497,7 @@ static void WalSndHandshake(void) while (!repCxt.replicationStarted) { int firstchar; + Assert(t_thrd.utils_cxt.CurrentResourceOwner != NULL); WalSndSetState(WALSNDSTATE_STARTUP); set_ps_display("idle", false); @@ -2033,6 +2035,8 @@ bool isLogicalSlotExist(char* slotName) } static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char *cmd_string){ + ResourceOwner tmpOwner = t_thrd.utils_cxt.CurrentResourceOwner; + Assert(tmpOwner != NULL); switch (cmd_node->type) { case T_IdentifySystemCmd: IdentifySystem(); @@ -2140,7 +2144,7 @@ static void IdentifyCommand(Node* cmd_node, ReplicationCxt* repCxt, const char * ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid standby query string: %s", cmd_string))); } - + t_thrd.utils_cxt.CurrentResourceOwner = tmpOwner; } /* @@ -3596,8 +3600,12 @@ static int WalSndLoop(WalSndSendDataCallback send_data) t_thrd.walsender_cxt.last_logical_xlog_advanced_timestamp = GetCurrentTimestamp(); t_thrd.walsender_cxt.waiting_for_ping_response = false; + ResourceOwner tmpOwner = t_thrd.utils_cxt.CurrentResourceOwner; + Assert(!IsTransactionOrTransactionBlock() && + strcmp(ResourceOwnerGetName(tmpOwner), "walsender top-level resource owner") == 0); /* Loop forever, unless we get an error */ for (;;) { + t_thrd.utils_cxt.CurrentResourceOwner = tmpOwner; TimestampTz now; /* Clear any already-pending wakeups */ @@ -4006,6 +4014,7 @@ static int WalSndLoop(WalSndSendDataCallback send_data) } } + t_thrd.utils_cxt.CurrentResourceOwner = tmpOwner; WalSndShutdown(); return 1; /* keep the compiler quiet */ }