add message queue from PG

This commit is contained in:
jiang_jianyu
2020-08-25 21:27:33 +08:00
parent b3c0ecfb4c
commit 4ba086877c
11 changed files with 1625 additions and 22 deletions

View File

@ -23,6 +23,6 @@ ifneq "$(MAKECMDGOALS)" "clean"
endif
endif
OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o sha2.o pqcomm.o \
pqformat.o pqsignal.o
pqformat.o pqsignal.o pqmq.o
include $(top_srcdir)/src/gausskernel/common.mk

View File

@ -145,6 +145,28 @@ static int Lock_AF_UNIX(unsigned short portNumber, const char* unixSocketName, b
static int Setup_AF_UNIX(bool is_create_psql_sock);
#endif /* HAVE_UNIX_SOCKETS */
static void socket_comm_reset(void);
static int socket_flush(void);
static int socket_flush_if_writable(void);
static bool socket_is_send_pending(void);
static int socket_putmessage(char msgtype, const char *s, size_t len);
static int socket_putmessage_noblock(char msgtype, const char *s, size_t len);
static void socket_startcopyout(void);
static void socket_endcopyout(bool errorAbort);
static PQcommMethods PqCommSocketMethods = {
socket_comm_reset,
socket_flush,
socket_flush_if_writable,
socket_is_send_pending,
socket_putmessage,
socket_putmessage_noblock,
socket_startcopyout,
socket_endcopyout
};
THR_LOCAL PQcommMethods *PqCommMethods = &PqCommSocketMethods;
extern bool FencedUDFMasterMode;
/* --------------------------------
@ -458,7 +480,7 @@ void pq_init(void)
* inside a pqcomm.c routine (which ideally will never happen, but...)
* --------------------------------
*/
void pq_comm_reset(void)
static void socket_comm_reset(void)
{
/* Do not throw away pending data, but do reset the busy flag */
t_thrd.libpq_cxt.PqCommBusy = false;
@ -1612,7 +1634,7 @@ static int internal_putbytes(const char* s, size_t len)
* returns 0 if OK, EOF if trouble
* --------------------------------
*/
int pq_flush(void)
static int socket_flush(void)
{
int res = 0;
@ -1769,7 +1791,7 @@ static int internal_flush(void)
* Returns 0 if OK, or EOF if trouble.
* --------------------------------
*/
int pq_flush_if_writable(void)
static int socket_flush_if_writable(void)
{
int res;
@ -1868,7 +1890,7 @@ void pq_flush_timedwait(int timeout)
* pq_is_send_pending - is there any pending data in the output buffer?
* --------------------------------
*/
bool pq_is_send_pending(void)
static bool socket_is_send_pending(void)
{
return (t_thrd.libpq_cxt.PqSendStart < t_thrd.libpq_cxt.PqSendPointer);
}
@ -1905,7 +1927,7 @@ bool pq_is_send_pending(void)
* returns 0 if OK, EOF if trouble
* --------------------------------
*/
int pq_putmessage(char msgtype, const char* s, size_t len)
static int socket_putmessage(char msgtype, const char* s, size_t len)
{
if (t_thrd.libpq_cxt.DoingCopyOut || t_thrd.libpq_cxt.PqCommBusy) {
return 0;
@ -1941,7 +1963,7 @@ fail:
* If the output buffer is too small to hold the message, the buffer
* is enlarged.
*/
int pq_putmessage_noblock(char msgtype, const char* s, size_t len)
static int socket_putmessage_noblock(char msgtype, const char* s, size_t len)
{
int res;
int required;
@ -1967,7 +1989,7 @@ int pq_putmessage_noblock(char msgtype, const char* s, size_t len)
* is beginning
* --------------------------------
*/
void pq_startcopyout(void)
static void socket_startcopyout(void)
{
t_thrd.libpq_cxt.DoingCopyOut = true;
}
@ -1982,7 +2004,7 @@ void pq_startcopyout(void)
* not allow binary transfers, so a textual terminator is always correct.
* --------------------------------
*/
void pq_endcopyout(bool errorAbort)
static void socket_endcopyout(bool errorAbort)
{
if (!t_thrd.libpq_cxt.DoingCopyOut) {
return;

View File

@ -610,6 +610,34 @@ const char* pq_getmsgstring(StringInfo msg)
return pg_client_to_server(str, slen);
}
/* --------------------------------
* pq_getmsgrawstring - get a null-terminated text string - NO conversion
*
* Returns a pointer directly into the message buffer.
* --------------------------------
*/
const char *pq_getmsgrawstring(StringInfo msg)
{
char *str;
int slen;
str = &msg->data[msg->cursor];
/*
* It's safe to use strlen() here because a StringInfo is guaranteed to
* have a trailing null byte. But check we found a null inside the
* message.
*/
slen = strlen(str);
if (msg->cursor + slen >= msg->len)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("invalid string in message")));
msg->cursor += slen + 1;
return str;
}
/* --------------------------------
* pq_getmsgend - verify message fully consumed
* --------------------------------

View File

@ -0,0 +1,270 @@
/*-------------------------------------------------------------------------
*
* pqmq.cpp
* Use the frontend/backend protocol for communication over a shm_mq
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/common/backend/libpq/pqmq.cpp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
static THR_LOCAL shm_mq *pq_mq;
static THR_LOCAL shm_mq_handle *pq_mq_handle;
static THR_LOCAL bool pq_mq_busy = false;
static THR_LOCAL ThreadId pq_mq_parallel_master_pid = 0;
static THR_LOCAL BackendId pq_mq_parallel_master_backend_id = InvalidBackendId;
static void mq_comm_reset(void);
static int mq_flush(void);
static int mq_flush_if_writable(void);
static bool mq_is_send_pending(void);
static int mq_putmessage(char msgtype, const char *s, size_t len);
static int mq_putmessage_noblock(char msgtype, const char *s, size_t len);
static void mq_startcopyout(void);
static void mq_endcopyout(bool errorAbort);
static THR_LOCAL PQcommMethods PqCommMqMethods = {
mq_comm_reset,
mq_flush,
mq_flush_if_writable,
mq_is_send_pending,
mq_putmessage,
mq_putmessage_noblock,
mq_startcopyout,
mq_endcopyout
};
static THR_LOCAL PQcommMethods *save_PqCommMethods;
static THR_LOCAL CommandDest save_whereToSendOutput;
static THR_LOCAL ProtocolVersion save_FrontendProtocol;
/*
* Arrange to redirect frontend/backend protocol messages to a message queue.
*/
void pq_redirect_to_shm_mq(shm_mq_handle *mqh)
{
save_PqCommMethods = PqCommMethods;
save_whereToSendOutput = CommandDest(t_thrd.postgres_cxt.whereToSendOutput);
save_FrontendProtocol = FrontendProtocol;
PqCommMethods = &PqCommMqMethods;
pq_mq_handle = mqh;
t_thrd.postgres_cxt.whereToSendOutput = static_cast<int>(DestRemote);
FrontendProtocol = PG_PROTOCOL_LATEST;
}
void pq_stop_redirect_to_shm_mq(void)
{
PqCommMethods = save_PqCommMethods;
t_thrd.postgres_cxt.whereToSendOutput = static_cast<int>(save_whereToSendOutput);
FrontendProtocol = save_FrontendProtocol;
pq_mq = NULL;
pq_mq_handle = NULL;
}
/*
* Arrange to SendProcSignal() to the parallel master each time we transmit
* message data via the shm_mq.
*/
void pq_set_parallel_master(ThreadId pid, BackendId backend_id)
{
Assert(PqCommMethods == &PqCommMqMethods);
pq_mq_parallel_master_pid = pid;
pq_mq_parallel_master_backend_id = backend_id;
}
static void mq_comm_reset(void)
{
/* Nothing to do. */
}
static int mq_flush(void)
{
/* Nothing to do. */
return 0;
}
static int mq_flush_if_writable(void)
{
/* Nothing to do. */
return 0;
}
static bool mq_is_send_pending(void)
{
/* There's never anything pending. */
return false;
}
/*
* Transmit a libpq protocol message to the shared memory message queue
* selected via pq_mq_handle. We don't include a length word, because the
* receiver will know the length of the message from shm_mq_receive().
*/
static int mq_putmessage(char msgtype, const char *s, size_t len)
{
shm_mq_iovec iov[2];
shm_mq_result result;
/*
* If we're sending a message, and we have to wait because the queue is
* full, and then we get interrupted, and that interrupt results in trying
* to send another message, we respond by detaching the queue. There's no
* way to return to the original context, but even if there were, just
* queueing the message would amount to indefinitely postponing the
* response to the interrupt. So we do this instead.
*/
if (pq_mq_busy) {
if (pq_mq_handle != NULL)
shm_mq_detach(pq_mq_handle);
pq_mq_handle = NULL;
return EOF;
}
/*
* If the message queue is already gone, just ignore the message. This
* doesn't necessarily indicate a problem; for example, DEBUG messages can
* be generated late in the shutdown sequence, after all DSMs have already
* been detached.
*/
if (pq_mq_handle == NULL)
return 0;
pq_mq_busy = true;
iov[0].data = &msgtype;
iov[0].len = 1;
iov[1].data = s;
iov[1].len = len;
Assert(pq_mq_handle != NULL);
for (;;) {
result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
if (pq_mq_parallel_master_pid != 0)
(void)SendProcSignal(pq_mq_parallel_master_pid,PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_master_backend_id);
if (result != SHM_MQ_WOULD_BLOCK)
break;
(void)WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET, 0);
ResetLatch(&t_thrd.proc->procLatch);
CHECK_FOR_INTERRUPTS();
}
pq_mq_busy = false;
Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
if (result != SHM_MQ_SUCCESS)
return EOF;
return 0;
}
static int mq_putmessage_noblock(char msgtype, const char *s, size_t len)
{
/*
* While the shm_mq machinery does support sending a message in
* non-blocking mode, there's currently no way to try sending beginning to
* send the message that doesn't also commit us to completing the
* transmission. This could be improved in the future, but for now we
* don't need it.
*/
elog(ERROR, "not currently supported");
return 0;
}
static void mq_startcopyout(void)
{
/* Nothing to do. */
}
static void mq_endcopyout(bool errorAbort)
{
/* Nothing to do. */
}
/*
* Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
* structure with the results.
*/
void pq_parse_errornotice(StringInfo msg, ErrorData *edata)
{
/* Initialize edata with reasonable defaults. */
errno_t rc = memset_s(edata, sizeof(ErrorData), 0, sizeof(ErrorData));
securec_check(rc, "\0", "\0");
edata->elevel = ERROR;
/* Loop over fields and extract each one. */
for (;;) {
char code = pq_getmsgbyte(msg);
const char *value = NULL;
if (code == '\0') {
pq_getmsgend(msg);
break;
}
value = pq_getmsgrawstring(msg);
switch (code) {
case PG_DIAG_SEVERITY:
/* ignore, trusting we'll get a nonlocalized version */
break;
case PG_DIAG_SQLSTATE:
if (strlen(value) != 5) {
elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
}
edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
value[3], value[4]);
break;
case PG_DIAG_MESSAGE_PRIMARY:
edata->message = pstrdup(value);
break;
case PG_DIAG_MESSAGE_DETAIL:
edata->detail = pstrdup(value);
break;
case PG_DIAG_MESSAGE_HINT:
edata->hint = pstrdup(value);
break;
case PG_DIAG_STATEMENT_POSITION:
edata->cursorpos = pg_atoi(const_cast<char*>(value), sizeof(int), '\0');
break;
case PG_DIAG_INTERNAL_POSITION:
edata->internalpos = pg_atoi(const_cast<char*>(value), sizeof(int), '\0');
break;
case PG_DIAG_INTERNAL_QUERY:
edata->internalquery = pstrdup(value);
break;
case PG_DIAG_CONTEXT:
edata->context = pstrdup(value);
break;
case PG_DIAG_SOURCE_FILE:
edata->filename = pstrdup(value);
break;
case PG_DIAG_SOURCE_LINE:
edata->lineno = pg_atoi(const_cast<char*>(value), sizeof(int), '\0');
break;
case PG_DIAG_SOURCE_FUNCTION:
edata->funcname = pstrdup(value);
break;
default:
elog(ERROR, "unrecognized error field code: %d", (int) code);
break;
}
}
}

View File

@ -17,6 +17,6 @@ ifneq "$(MAKECMDGOALS)" "clean"
endif
endif
OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
sinval.o sinvaladt.o standby.o
sinval.o sinvaladt.o standby.o shm_mq.o
include $(top_srcdir)/src/gausskernel/common.mk

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
/* -------------------------------------------------------------------------
*
* libpq.h
* POSTGRES LIBPQ buffer structure definitions.
* POSTGRES LIBPQ buffer structure definitions.
*
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
@ -22,19 +22,43 @@
/* ----------------
* PQArgBlock
* Information (pointer to array of this structure) required
* for the PQfn() call. (This probably ought to go somewhere else...)
* Information (pointer to array of this structure) required
* for the PQfn() call. (This probably ought to go somewhere else...)
* ----------------
*/
typedef struct {
int len;
int isint;
union {
int* ptr; /* can't use void (dec compiler barfs) */
int *ptr; /* can't use void (dec compiler barfs) */
int integer;
} u;
} PQArgBlock;
typedef struct {
void (*comm_reset) (void);
int (*flush) (void);
int (*flush_if_writable) (void);
bool (*is_send_pending) (void);
int (*putmessage) (char msgtype, const char* s, size_t len);
int (*putmessage_noblock) (char msgtype, const char* s, size_t len);
void (*startcopyout) (void);
void (*endcopyout) (bool errorAbort);
} PQcommMethods;
extern PGDLLIMPORT THR_LOCAL PQcommMethods *PqCommMethods;
#define pq_comm_reset() (PqCommMethods->comm_reset())
#define pq_flush() (PqCommMethods->flush())
#define pq_flush_if_writable() (PqCommMethods->flush_if_writable())
#define pq_is_send_pending() (PqCommMethods->is_send_pending())
#define pq_putmessage(msgtype, s, len) \
(PqCommMethods->putmessage(msgtype, s, len))
#define pq_putmessage_noblock(msgtype, s, len) \
(PqCommMethods->putmessage_noblock(msgtype, s, len))
#define pq_startcopyout() (PqCommMethods->startcopyout())
#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort))
/*
* External functions.
*/
@ -49,7 +73,6 @@ extern int StreamConnection(pgsocket server_fd, Port* port);
extern void StreamClose(pgsocket sock);
extern void TouchSocketFile(void);
extern void pq_init(void);
extern void pq_comm_reset(void);
extern int pq_getbytes(char* s, size_t len);
extern int pq_getstring(StringInfo s);
extern int pq_getmessage(StringInfo s, int maxlen);
@ -57,14 +80,7 @@ extern int pq_getbyte(void);
extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char* c);
extern int pq_putbytes(const char* s, size_t len);
extern int pq_flush(void);
extern int pq_flush_if_writable(void);
extern void pq_flush_timedwait(int timeout);
extern bool pq_is_send_pending(void);
extern int pq_putmessage(char msgtype, const char* s, size_t len);
extern int pq_putmessage_noblock(char msgtype, const char* s, size_t len);
extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort);
extern bool pq_select(int timeout_ms);
extern void pq_abandon_sendbuffer(void);
extern void pq_abandon_recvbuffer(void);

View File

@ -46,6 +46,7 @@ extern const char* pq_getmsgbytes(StringInfo msg, int datalen);
extern void pq_copymsgbytes(StringInfo msg, char* buf, int datalen);
extern char* pq_getmsgtext(StringInfo msg, int rawbytes, int* nbytes);
extern const char* pq_getmsgstring(StringInfo msg);
extern const char* pq_getmsgrawstring(StringInfo msg);
extern void pq_getmsgend(StringInfo msg);
/*

25
src/include/libpq/pqmq.h Normal file
View File

@ -0,0 +1,25 @@
/*-------------------------------------------------------------------------
*
* pqmq.h
* Use the frontend/backend protocol for communication over a shm_mq
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/libpq/pqmq.h
*
*-------------------------------------------------------------------------
*/
#ifndef PQMQ_H
#define PQMQ_H
#include "lib/stringinfo.h"
#include "storage/shm_mq.h"
extern void pq_redirect_to_shm_mq(shm_mq_handle* mqh);
extern void pq_stop_redirect_to_shm_mq(void);
extern void pq_set_parallel_master(pid_t pid, BackendId backend_id);
extern void pq_parse_errornotice(StringInfo str, ErrorData* edata);
#endif /* PQMQ_H */

View File

@ -55,6 +55,7 @@ typedef enum {
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
PROCSIG_EXECUTOR_FLAG,
PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
NUM_PROCSIGNALS /* Must be last! */
} ProcSignalReason;

View File

@ -0,0 +1,82 @@
/*-------------------------------------------------------------------------
*
* shm_mq.h
* single-reader, single-writer shared memory message queue
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/shm_mq.h
*
*-------------------------------------------------------------------------
*/
#ifndef SHM_MQ_H
#define SHM_MQ_H
#include "postmaster/bgworker.h"
#include "storage/proc.h"
/* The queue itself, in shared memory. */
struct shm_mq;
typedef struct shm_mq shm_mq;
/* Backend-private state. */
struct shm_mq_handle;
typedef struct shm_mq_handle shm_mq_handle;
/* Descriptors for a single write spanning multiple locations. */
typedef struct {
const char *data;
Size len;
} shm_mq_iovec;
/* Possible results of a send or receive operation. */
typedef enum {
SHM_MQ_SUCCESS, /* Sent or received a message. */
SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */
SHM_MQ_DETACHED /* Other process has detached queue. */
} shm_mq_result;
/*
* Primitives to create a queue and set the sender and receiver.
*
* Both the sender and the receiver must be set before any messages are read
* or written, but they need not be set by the same process. Each must be
* set exactly once.
*/
extern shm_mq *shm_mq_create(void *address, Size size);
extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *);
extern void shm_mq_set_sender(shm_mq *mq, PGPROC *);
/* Accessor methods for sender and receiver. */
extern PGPROC *shm_mq_get_receiver(shm_mq *);
extern PGPROC *shm_mq_get_sender(shm_mq *);
/* Set up backend-local queue state. */
extern shm_mq_handle *shm_mq_attach(shm_mq *mq, char *seg,
BackgroundWorkerHandle *handle);
/* Associate worker handle with shm_mq. */
extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *);
/* Break connection, release handle resources. */
extern void shm_mq_detach(shm_mq_handle *mqh);
/* Get the shm_mq from handle. */
extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
/* Send or receive messages. */
extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
Size nbytes, const void *data, bool nowait);
extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
shm_mq_iovec *iov, int iovcnt, bool nowait);
extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
Size *nbytesp, void **datap, bool nowait);
/* Wait for our counterparty to attach to the queue. */
extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
/* Smallest possible queue. */
extern PGDLLIMPORT const Size shm_mq_minimum_size;
#endif /* SHM_MQ_H */