diff --git a/src/gausskernel/storage/replication/logical/launcher.cpp b/src/gausskernel/storage/replication/logical/launcher.cpp index 52ff64a7b..62acf1102 100644 --- a/src/gausskernel/storage/replication/logical/launcher.cpp +++ b/src/gausskernel/storage/replication/logical/launcher.cpp @@ -270,6 +270,7 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user } /* Prepare the worker info. */ + worker->generation++; worker->proc = NULL; worker->dbid = dbid; worker->userid = userid; @@ -299,6 +300,7 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user void logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; + uint16 generation; (void)LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); @@ -309,6 +311,12 @@ void logicalrep_worker_stop(Oid subid, Oid relid) return; } + /* + * Remember which generation was our worker so we can check if what we see + * is still the same one. + */ + generation = worker->generation; + /* * If we found a worker but it does not have proc set then it is still * starting up; wait for it to finish starting and then kill it. @@ -333,9 +341,10 @@ void logicalrep_worker_stop(Oid subid, Oid relid) /* * Worker is no longer associated with subscription. It must have - * exited, nothing more for us to do. + * exited, nothing more for us to do. Or whether the worker generation + * is different, meaning that a different worker has taken the slot. */ - if (worker->subid == InvalidOid) { + if (worker->subid == InvalidOid || worker->generation != generation) { LWLockRelease(LogicalRepWorkerLock); return; } @@ -353,7 +362,7 @@ void logicalrep_worker_stop(Oid subid, Oid relid) int rc; /* is it gone? */ - if (!worker->proc) { + if (!worker->proc || worker->generation != generation) { break; } LWLockRelease(LogicalRepWorkerLock); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index d6d1f4fc8..4642c343e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -17,6 +17,9 @@ typedef struct LogicalRepWorker { + /* Increased everytime the slot is tabken by new worker */ + uint16 generation; + /* Pointer to proc array. NULL if not running. */ PGPROC *proc;