fix backendworker not exit bugs
This commit is contained in:
@ -55,6 +55,7 @@ void pq_redirect_to_shm_mq(shm_mq_handle *mqh)
|
||||
t_thrd.msqueue_cxt.pq_mq_handle = mqh;
|
||||
t_thrd.postgres_cxt.whereToSendOutput = static_cast<int>(DestRemote);
|
||||
FrontendProtocol = PG_PROTOCOL_LATEST;
|
||||
t_thrd.msqueue_cxt.is_changed = true;
|
||||
}
|
||||
|
||||
void pq_stop_redirect_to_shm_mq(void)
|
||||
@ -64,6 +65,7 @@ void pq_stop_redirect_to_shm_mq(void)
|
||||
FrontendProtocol = t_thrd.msqueue_cxt.save_FrontendProtocol;
|
||||
t_thrd.msqueue_cxt.pq_mq = NULL;
|
||||
t_thrd.msqueue_cxt.pq_mq_handle = NULL;
|
||||
t_thrd.msqueue_cxt.is_changed = false;
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
@ -638,8 +638,9 @@ static void bgworker_quickdie(SIGNAL_ARGS)
|
||||
*/
|
||||
static void bgworker_die(SIGNAL_ARGS)
|
||||
{
|
||||
(void)PG_SETMASK(&t_thrd.libpq_cxt.BlockSig);
|
||||
(void)gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL);
|
||||
|
||||
t_thrd.postgres_cxt.whereToSendOutput = DestNone;
|
||||
ereport(FATAL,
|
||||
(errcode(ERRCODE_ADMIN_SHUTDOWN),
|
||||
errmsg("terminating background worker \"%s\" due to administrator command",
|
||||
@ -734,6 +735,14 @@ void StartBackgroundWorker(void* bgWorkerSlotShmAddr)
|
||||
(void)gspqsignal(SIGUSR2, SIG_IGN);
|
||||
(void)gspqsignal(SIGCHLD, SIG_DFL);
|
||||
|
||||
(void)gs_signal_unblock_sigusr2();
|
||||
if (IsUnderPostmaster) {
|
||||
/* We allow SIGQUIT (quickdie) at all times */
|
||||
(void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);
|
||||
}
|
||||
|
||||
gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL); /* block everything except SIGQUIT */
|
||||
|
||||
/*
|
||||
* If an exception is encountered, processing resumes here.
|
||||
*
|
||||
@ -1298,12 +1307,12 @@ void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid, uint32 fl
|
||||
*/
|
||||
void BackgroundWorkerBlockSignals(void)
|
||||
{
|
||||
(void)PG_SETMASK(&t_thrd.libpq_cxt.BlockSig);
|
||||
(void)gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL);
|
||||
}
|
||||
|
||||
void BackgroundWorkerUnblockSignals(void)
|
||||
{
|
||||
(void)PG_SETMASK(&t_thrd.libpq_cxt.UnBlockSig);
|
||||
(void)gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -191,6 +191,8 @@ AutonomousSession * AutonomousSessionStart(void)
|
||||
shm_mq_set_handle(session->command_qh, session->worker_handle);
|
||||
shm_mq_set_handle(session->response_qh, session->worker_handle);
|
||||
|
||||
t_thrd.autonomous_cxt.handle = session->worker_handle;
|
||||
|
||||
bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
|
||||
if (bgwstatus != BGWH_STARTED)
|
||||
ereport(ERROR,
|
||||
@ -244,6 +246,7 @@ void AutonomousSessionEnd(AutonomousSession *session)
|
||||
pfree(session->worker_handle);
|
||||
pfree(session->seg);
|
||||
pfree(session);
|
||||
t_thrd.autonomous_cxt.handle = NULL;
|
||||
}
|
||||
|
||||
AutonomousResult *AutonomousSessionExecute(AutonomousSession *session, const char *sql)
|
||||
@ -347,7 +350,6 @@ AutonomousPreparedStatement *AutonomousSessionPrepare(AutonomousSession *session
|
||||
invalid_protocol_message(msgtype);
|
||||
break;
|
||||
}
|
||||
|
||||
pq_redirect_to_shm_mq(session->command_qh);
|
||||
pq_beginmessage(&msg, 'D');
|
||||
pq_sendbyte(&msg, 'S');
|
||||
@ -522,7 +524,6 @@ void autonomous_worker_main(Datum main_arg)
|
||||
|
||||
char msgtype;
|
||||
|
||||
(void)gspqsignal(SIGTERM, die);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
t_thrd.autonomous_cxt.isnested = true;
|
||||
|
||||
@ -67,6 +67,7 @@
|
||||
#endif /* PGXC */
|
||||
#include "postmaster/autovacuum.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "replication/dataqueue.h"
|
||||
#include "replication/datasender.h"
|
||||
#include "replication/walsender.h"
|
||||
@ -146,6 +147,7 @@ extern int optreset; /* might not be declared by system headers */
|
||||
#include "utils/syscache.h"
|
||||
#include "utils/tqual.h"
|
||||
#include "storage/mot/jit_exec.h"
|
||||
#include "libpq/pqmq.h"
|
||||
|
||||
#define GSCGROUP_ATTACH_TASK() \
|
||||
{ \
|
||||
@ -6624,6 +6626,13 @@ int StreamMain(void* arg)
|
||||
int curTryCounter;
|
||||
int* oldTryCounter = NULL;
|
||||
if (sigsetjmp(local_sigjmp_buf, 1) != 0) {
|
||||
if (t_thrd.msqueue_cxt.is_changed == true) {
|
||||
pq_stop_redirect_to_shm_mq();
|
||||
}
|
||||
if (t_thrd.autonomous_cxt.handle) {
|
||||
TerminateBackgroundWorker(t_thrd.autonomous_cxt.handle);
|
||||
t_thrd.autonomous_cxt.handle = NULL;
|
||||
}
|
||||
gstrace_tryblock_exit(true, oldTryCounter);
|
||||
|
||||
(void)pgstat_report_waitstatus(STATE_WAIT_UNDEFINED);
|
||||
@ -7521,7 +7530,7 @@ int PostgresMain(int argc, char* argv[], const char* dbname, const char* usernam
|
||||
t_thrd.postgres_cxt.gpc_fisrt_send_clean = false;
|
||||
GPC->SendPrepareDestoryMsg();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Abort the current transaction in order to recover.
|
||||
*/
|
||||
|
||||
@ -1387,6 +1387,7 @@ static void knl_t_heartbeat_init(knl_t_heartbeat_context* heartbeat_cxt)
|
||||
static void knl_t_autonomous_init(knl_t_autonomous_context* autonomous_cxt)
|
||||
{
|
||||
autonomous_cxt->isnested = false;
|
||||
autonomous_cxt->handle = NULL;
|
||||
autonomous_cxt->sqlstmt = NULL;
|
||||
}
|
||||
|
||||
@ -1432,7 +1433,7 @@ void knl_t_msqueue_init(knl_t_msqueue_context* msqueue_cxt)
|
||||
msqueue_cxt->save_PqCommMethods = NULL;
|
||||
msqueue_cxt->save_whereToSendOutput = DestDebug;
|
||||
msqueue_cxt->save_FrontendProtocol = PG_PROTOCOL_LATEST;
|
||||
//msqueue_cxt->PqCommMethods = NULL;
|
||||
msqueue_cxt->is_changed = false;
|
||||
PqCommMethods_init();
|
||||
}
|
||||
|
||||
|
||||
@ -2665,6 +2665,7 @@ struct PLpgSQL_expr;
|
||||
typedef struct knl_t_autonomous_context {
|
||||
PLpgSQL_expr* sqlstmt;
|
||||
bool isnested;
|
||||
BackgroundWorkerHandle* handle;
|
||||
} knl_t_autonomous_context;
|
||||
|
||||
/* MOT thread attributes */
|
||||
@ -2736,6 +2737,7 @@ typedef struct knl_t_msqueue_context {
|
||||
CommandDest save_whereToSendOutput;
|
||||
ProtocolVersion save_FrontendProtocol;
|
||||
const PQcommMethods *PqCommMethods;
|
||||
bool is_changed;
|
||||
} knl_t_msqueue_context;
|
||||
|
||||
/* thread context. */
|
||||
|
||||
Reference in New Issue
Block a user