!2224 修复删除或刷新订阅阻塞的问题

Merge pull request !2224 from chenxiaobin/fixWorkerHang
This commit is contained in:
opengauss-bot
2022-09-23 09:40:29 +00:00
committed by Gitee
2 changed files with 15 additions and 3 deletions

View File

@ -270,6 +270,7 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user
}
/* Prepare the worker info. */
worker->generation++;
worker->proc = NULL;
worker->dbid = dbid;
worker->userid = userid;
@ -299,6 +300,7 @@ void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid user
void logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
uint16 generation;
(void)LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
@ -309,6 +311,12 @@ void logicalrep_worker_stop(Oid subid, Oid relid)
return;
}
/*
* Remember which generation was our worker so we can check if what we see
* is still the same one.
*/
generation = worker->generation;
/*
* If we found a worker but it does not have proc set then it is still
* starting up; wait for it to finish starting and then kill it.
@ -333,9 +341,10 @@ void logicalrep_worker_stop(Oid subid, Oid relid)
/*
* Worker is no longer associated with subscription. It must have
* exited, nothing more for us to do.
* exited, nothing more for us to do. Or whether the worker generation
* is different, meaning that a different worker has taken the slot.
*/
if (worker->subid == InvalidOid) {
if (worker->subid == InvalidOid || worker->generation != generation) {
LWLockRelease(LogicalRepWorkerLock);
return;
}
@ -353,7 +362,7 @@ void logicalrep_worker_stop(Oid subid, Oid relid)
int rc;
/* is it gone? */
if (!worker->proc) {
if (!worker->proc || worker->generation != generation) {
break;
}
LWLockRelease(LogicalRepWorkerLock);

View File

@ -17,6 +17,9 @@
typedef struct LogicalRepWorker
{
/* Increased everytime the slot is tabken by new worker */
uint16 generation;
/* Pointer to proc array. NULL if not running. */
PGPROC *proc;