From 4ba086877c61fa7e069f42e0365f4d2200e5352f Mon Sep 17 00:00:00 2001 From: jiang_jianyu Date: Tue, 25 Aug 2020 21:27:33 +0800 Subject: [PATCH] add message queue from PG --- src/common/backend/libpq/Makefile | 2 +- src/common/backend/libpq/pqcomm.cpp | 38 +- src/common/backend/libpq/pqformat.cpp | 28 + src/common/backend/libpq/pqmq.cpp | 270 ++++++ src/gausskernel/storage/ipc/Makefile | 2 +- src/gausskernel/storage/ipc/shm_mq.cpp | 1158 ++++++++++++++++++++++++ src/include/libpq/libpq.h | 40 +- src/include/libpq/pqformat.h | 1 + src/include/libpq/pqmq.h | 25 + src/include/storage/procsignal.h | 1 + src/include/storage/shm_mq.h | 82 ++ 11 files changed, 1625 insertions(+), 22 deletions(-) create mode 100644 src/common/backend/libpq/pqmq.cpp create mode 100644 src/gausskernel/storage/ipc/shm_mq.cpp create mode 100644 src/include/libpq/pqmq.h create mode 100644 src/include/storage/shm_mq.h diff --git a/src/common/backend/libpq/Makefile b/src/common/backend/libpq/Makefile index cfab9c855..3969dd94a 100644 --- a/src/common/backend/libpq/Makefile +++ b/src/common/backend/libpq/Makefile @@ -23,6 +23,6 @@ ifneq "$(MAKECMDGOALS)" "clean" endif endif OBJS = be-fsstubs.o be-secure.o auth.o crypt.o hba.o ip.o md5.o sha2.o pqcomm.o \ - pqformat.o pqsignal.o + pqformat.o pqsignal.o pqmq.o include $(top_srcdir)/src/gausskernel/common.mk diff --git a/src/common/backend/libpq/pqcomm.cpp b/src/common/backend/libpq/pqcomm.cpp index 45e28cf24..b6190904d 100644 --- a/src/common/backend/libpq/pqcomm.cpp +++ b/src/common/backend/libpq/pqcomm.cpp @@ -145,6 +145,28 @@ static int Lock_AF_UNIX(unsigned short portNumber, const char* unixSocketName, b static int Setup_AF_UNIX(bool is_create_psql_sock); #endif /* HAVE_UNIX_SOCKETS */ +static void socket_comm_reset(void); +static int socket_flush(void); +static int socket_flush_if_writable(void); +static bool socket_is_send_pending(void); +static int socket_putmessage(char msgtype, const char *s, size_t len); +static int 
socket_putmessage_noblock(char msgtype, const char *s, size_t len); +static void socket_startcopyout(void); +static void socket_endcopyout(bool errorAbort); + +static PQcommMethods PqCommSocketMethods = { + socket_comm_reset, + socket_flush, + socket_flush_if_writable, + socket_is_send_pending, + socket_putmessage, + socket_putmessage_noblock, + socket_startcopyout, + socket_endcopyout +}; + +THR_LOCAL PQcommMethods *PqCommMethods = &PqCommSocketMethods; + extern bool FencedUDFMasterMode; /* -------------------------------- @@ -458,7 +480,7 @@ void pq_init(void) * inside a pqcomm.c routine (which ideally will never happen, but...) * -------------------------------- */ -void pq_comm_reset(void) +static void socket_comm_reset(void) { /* Do not throw away pending data, but do reset the busy flag */ t_thrd.libpq_cxt.PqCommBusy = false; @@ -1612,7 +1634,7 @@ static int internal_putbytes(const char* s, size_t len) * returns 0 if OK, EOF if trouble * -------------------------------- */ -int pq_flush(void) +static int socket_flush(void) { int res = 0; @@ -1769,7 +1791,7 @@ static int internal_flush(void) * Returns 0 if OK, or EOF if trouble. * -------------------------------- */ -int pq_flush_if_writable(void) +static int socket_flush_if_writable(void) { int res; @@ -1868,7 +1890,7 @@ void pq_flush_timedwait(int timeout) * pq_is_send_pending - is there any pending data in the output buffer? 
* -------------------------------- */ -bool pq_is_send_pending(void) +static bool socket_is_send_pending(void) { return (t_thrd.libpq_cxt.PqSendStart < t_thrd.libpq_cxt.PqSendPointer); } @@ -1905,7 +1927,7 @@ bool pq_is_send_pending(void) * returns 0 if OK, EOF if trouble * -------------------------------- */ -int pq_putmessage(char msgtype, const char* s, size_t len) +static int socket_putmessage(char msgtype, const char* s, size_t len) { if (t_thrd.libpq_cxt.DoingCopyOut || t_thrd.libpq_cxt.PqCommBusy) { return 0; @@ -1941,7 +1963,7 @@ fail: * If the output buffer is too small to hold the message, the buffer * is enlarged. */ -int pq_putmessage_noblock(char msgtype, const char* s, size_t len) +static int socket_putmessage_noblock(char msgtype, const char* s, size_t len) { int res; int required; @@ -1967,7 +1989,7 @@ int pq_putmessage_noblock(char msgtype, const char* s, size_t len) * is beginning * -------------------------------- */ -void pq_startcopyout(void) +static void socket_startcopyout(void) { t_thrd.libpq_cxt.DoingCopyOut = true; } @@ -1982,7 +2004,7 @@ void pq_startcopyout(void) * not allow binary transfers, so a textual terminator is always correct. * -------------------------------- */ -void pq_endcopyout(bool errorAbort) +static void socket_endcopyout(bool errorAbort) { if (!t_thrd.libpq_cxt.DoingCopyOut) { return; diff --git a/src/common/backend/libpq/pqformat.cpp b/src/common/backend/libpq/pqformat.cpp index 082875395..3509790d0 100644 --- a/src/common/backend/libpq/pqformat.cpp +++ b/src/common/backend/libpq/pqformat.cpp @@ -610,6 +610,34 @@ const char* pq_getmsgstring(StringInfo msg) return pg_client_to_server(str, slen); } +/* -------------------------------- + * pq_getmsgrawstring - get a null-terminated text string - NO conversion + * + * Returns a pointer directly into the message buffer. 
+ * -------------------------------- + */ +const char *pq_getmsgrawstring(StringInfo msg) +{ + char *str; + int slen; + + str = &msg->data[msg->cursor]; + + /* + * It's safe to use strlen() here because a StringInfo is guaranteed to + * have a trailing null byte. But check we found a null inside the + * message. + */ + slen = strlen(str); + if (msg->cursor + slen >= msg->len) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid string in message"))); + msg->cursor += slen + 1; + + return str; +} + /* -------------------------------- * pq_getmsgend - verify message fully consumed * -------------------------------- diff --git a/src/common/backend/libpq/pqmq.cpp b/src/common/backend/libpq/pqmq.cpp new file mode 100644 index 000000000..274c285a0 --- /dev/null +++ b/src/common/backend/libpq/pqmq.cpp @@ -0,0 +1,270 @@ +/*------------------------------------------------------------------------- + * + * pqmq.cpp + * Use the frontend/backend protocol for communication over a shm_mq + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/common/backend/libpq/pqmq.cpp + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" + +static THR_LOCAL shm_mq *pq_mq; +static THR_LOCAL shm_mq_handle *pq_mq_handle; +static THR_LOCAL bool pq_mq_busy = false; +static THR_LOCAL ThreadId pq_mq_parallel_master_pid = 0; +static THR_LOCAL BackendId pq_mq_parallel_master_backend_id = InvalidBackendId; + +static void mq_comm_reset(void); +static int mq_flush(void); +static int mq_flush_if_writable(void); +static bool mq_is_send_pending(void); +static int mq_putmessage(char msgtype, const char *s, size_t len); +static int 
mq_putmessage_noblock(char msgtype, const char *s, size_t len); +static void mq_startcopyout(void); +static void mq_endcopyout(bool errorAbort); + +static THR_LOCAL PQcommMethods PqCommMqMethods = { + mq_comm_reset, + mq_flush, + mq_flush_if_writable, + mq_is_send_pending, + mq_putmessage, + mq_putmessage_noblock, + mq_startcopyout, + mq_endcopyout +}; + +static THR_LOCAL PQcommMethods *save_PqCommMethods; +static THR_LOCAL CommandDest save_whereToSendOutput; +static THR_LOCAL ProtocolVersion save_FrontendProtocol; + +/* + * Arrange to redirect frontend/backend protocol messages to a message queue. + */ +void pq_redirect_to_shm_mq(shm_mq_handle *mqh) +{ + save_PqCommMethods = PqCommMethods; + save_whereToSendOutput = CommandDest(t_thrd.postgres_cxt.whereToSendOutput); + save_FrontendProtocol = FrontendProtocol; + + PqCommMethods = &PqCommMqMethods; + pq_mq_handle = mqh; + t_thrd.postgres_cxt.whereToSendOutput = static_cast<int>(DestRemote); + FrontendProtocol = PG_PROTOCOL_LATEST; +} + +void pq_stop_redirect_to_shm_mq(void) +{ + PqCommMethods = save_PqCommMethods; + t_thrd.postgres_cxt.whereToSendOutput = static_cast<int>(save_whereToSendOutput); + FrontendProtocol = save_FrontendProtocol; + pq_mq = NULL; + pq_mq_handle = NULL; +} + +/* + * Arrange to SendProcSignal() to the parallel master each time we transmit + * message data via the shm_mq. + */ +void pq_set_parallel_master(ThreadId pid, BackendId backend_id) +{ + Assert(PqCommMethods == &PqCommMqMethods); + pq_mq_parallel_master_pid = pid; + pq_mq_parallel_master_backend_id = backend_id; +} + +static void mq_comm_reset(void) +{ + /* Nothing to do. */ +} + +static int mq_flush(void) +{ + /* Nothing to do. */ + return 0; +} + +static int mq_flush_if_writable(void) +{ + /* Nothing to do. */ + return 0; +} + +static bool mq_is_send_pending(void) +{ + /* There's never anything pending. */ + return false; +} + +/* + * Transmit a libpq protocol message to the shared memory message queue + * selected via pq_mq_handle.
We don't include a length word, because the + * receiver will know the length of the message from shm_mq_receive(). + */ +static int mq_putmessage(char msgtype, const char *s, size_t len) +{ + shm_mq_iovec iov[2]; + shm_mq_result result; + + /* + * If we're sending a message, and we have to wait because the queue is + * full, and then we get interrupted, and that interrupt results in trying + * to send another message, we respond by detaching the queue. There's no + * way to return to the original context, but even if there were, just + * queueing the message would amount to indefinitely postponing the + * response to the interrupt. So we do this instead. + */ + if (pq_mq_busy) { + if (pq_mq_handle != NULL) + shm_mq_detach(pq_mq_handle); + pq_mq_handle = NULL; + return EOF; + } + + /* + * If the message queue is already gone, just ignore the message. This + * doesn't necessarily indicate a problem; for example, DEBUG messages can + * be generated late in the shutdown sequence, after all DSMs have already + * been detached. 
+ */ + if (pq_mq_handle == NULL) + return 0; + + pq_mq_busy = true; + + iov[0].data = &msgtype; + iov[0].len = 1; + iov[1].data = s; + iov[1].len = len; + + Assert(pq_mq_handle != NULL); + + for (;;) { + result = shm_mq_sendv(pq_mq_handle, iov, 2, true); + + if (pq_mq_parallel_master_pid != 0) + (void)SendProcSignal(pq_mq_parallel_master_pid,PROCSIG_PARALLEL_MESSAGE, + pq_mq_parallel_master_backend_id); + + if (result != SHM_MQ_WOULD_BLOCK) + break; + + (void)WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET, 0); + ResetLatch(&t_thrd.proc->procLatch); + CHECK_FOR_INTERRUPTS(); + } + + pq_mq_busy = false; + + Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); + if (result != SHM_MQ_SUCCESS) + return EOF; + return 0; +} + +static int mq_putmessage_noblock(char msgtype, const char *s, size_t len) +{ + /* + * While the shm_mq machinery does support sending a message in + * non-blocking mode, there's currently no way to try sending beginning to + * send the message that doesn't also commit us to completing the + * transmission. This could be improved in the future, but for now we + * don't need it. + */ + elog(ERROR, "not currently supported"); + return 0; +} + +static void mq_startcopyout(void) +{ + /* Nothing to do. */ +} + +static void mq_endcopyout(bool errorAbort) +{ + /* Nothing to do. */ +} + +/* + * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData + * structure with the results. + */ +void pq_parse_errornotice(StringInfo msg, ErrorData *edata) +{ + /* Initialize edata with reasonable defaults. */ + errno_t rc = memset_s(edata, sizeof(ErrorData), 0, sizeof(ErrorData)); + securec_check(rc, "\0", "\0"); + edata->elevel = ERROR; + + /* Loop over fields and extract each one. 
*/ + for (;;) { + char code = pq_getmsgbyte(msg); + const char *value = NULL; + + if (code == '\0') { + pq_getmsgend(msg); + break; + } + value = pq_getmsgrawstring(msg); + + switch (code) { + case PG_DIAG_SEVERITY: + /* ignore, trusting we'll get a nonlocalized version */ + break; + case PG_DIAG_SQLSTATE: + if (strlen(value) != 5) { + elog(ERROR, "invalid SQLSTATE: \"%s\"", value); + } + edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2], + value[3], value[4]); + break; + case PG_DIAG_MESSAGE_PRIMARY: + edata->message = pstrdup(value); + break; + case PG_DIAG_MESSAGE_DETAIL: + edata->detail = pstrdup(value); + break; + case PG_DIAG_MESSAGE_HINT: + edata->hint = pstrdup(value); + break; + case PG_DIAG_STATEMENT_POSITION: + edata->cursorpos = pg_atoi(const_cast<char*>(value), sizeof(int), '\0'); + break; + case PG_DIAG_INTERNAL_POSITION: + edata->internalpos = pg_atoi(const_cast<char*>(value), sizeof(int), '\0'); + break; + case PG_DIAG_INTERNAL_QUERY: + edata->internalquery = pstrdup(value); + break; + case PG_DIAG_CONTEXT: + edata->context = pstrdup(value); + break; + case PG_DIAG_SOURCE_FILE: + edata->filename = pstrdup(value); + break; + case PG_DIAG_SOURCE_LINE: + edata->lineno = pg_atoi(const_cast<char*>(value), sizeof(int), '\0'); + break; + case PG_DIAG_SOURCE_FUNCTION: + edata->funcname = pstrdup(value); + break; + default: + elog(ERROR, "unrecognized error field code: %d", (int) code); + break; + } + } +} + diff --git a/src/gausskernel/storage/ipc/Makefile b/src/gausskernel/storage/ipc/Makefile index 5ce67f767..010ea8de9 100644 --- a/src/gausskernel/storage/ipc/Makefile +++ b/src/gausskernel/storage/ipc/Makefile @@ -17,6 +17,6 @@ ifneq "$(MAKECMDGOALS)" "clean" endif endif OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \ - sinval.o sinvaladt.o standby.o + sinval.o sinvaladt.o standby.o shm_mq.o include $(top_srcdir)/src/gausskernel/common.mk diff --git a/src/gausskernel/storage/ipc/shm_mq.cpp b/src/gausskernel/storage/ipc/shm_mq.cpp new
file mode 100644 index 000000000..7c731336b --- /dev/null +++ b/src/gausskernel/storage/ipc/shm_mq.cpp @@ -0,0 +1,1158 @@ +/*------------------------------------------------------------------------- + * + * shm_mq.cpp + * single-reader, single-writer message queue + * + * Both the sender and the receiver must have a PGPROC; their respective + * process latches are used for synchronization. Only the sender may send, + * and only the receiver may receive. This is intended to allow a user + * backend to communicate with worker backends that it has registered. + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/gausskernel/storage/ipc/shm_mq.cpp + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/procsignal.h" +#include "storage/shm_mq.h" +#include "storage/spin.h" +#include "gs_threadlocal.h" +#include "gs_thread.h" + +/* + * This structure represents the actual queue. + * + * Some notes on synchronization: + * + * mq_receiver and mq_bytes_read can only be changed by the receiver; and + * mq_sender and mq_bytes_written can only be changed by the sender. + * mq_receiver and mq_sender are protected by mq_mutex, although, importantly, + * they cannot change once set, and thus may be read without a lock once this + * is known to be the case. + * + * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead, + * they are written atomically using 8 byte loads and stores. Memory barriers + * must be carefully used to synchronize reads and writes of these values with + * reads and writes of the actual data in mq_ring. + * + * mq_detached needs no locking. It can be set by either the sender or the + * receiver, but only ever from false to true, so redundant writes don't + * matter. 
It is important that if we set mq_detached and then set the + * counterparty's latch, the counterparty must be certain to see the change + * after waking up. Since SetLatch begins with a memory barrier and ResetLatch + * ends with one, this should be OK. + * + * mq_ring_size and mq_ring_offset never change after initialization, and + * can therefore be read without the lock. + * + * Importantly, mq_ring can be safely read and written without a lock. + * At any given time, the difference between mq_bytes_read and + * mq_bytes_written defines the number of bytes within mq_ring that contain + * unread data, and mq_bytes_read defines the position where those bytes + * begin. The sender can increase the number of unread bytes at any time, + * but only the receiver can give license to overwrite those bytes, by + * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read + * the unread bytes it knows to be present without the lock. Conversely, + * the sender can write to the unused portion of the ring buffer without + * the lock, because nobody else can be reading or writing those bytes. The + * receiver could be making more bytes unused by incrementing mq_bytes_read, + * but that's OK. Note that it would be unsafe for the receiver to read any + * data it's already marked as read, or to write any data; and it would be + * unsafe for the sender to reread any data after incrementing + * mq_bytes_written, but fortunately there's no need for any of that. + */ +struct shm_mq { + slock_t mq_mutex; + PGPROC *mq_receiver; + PGPROC *mq_sender; + pg_atomic_uint64 mq_bytes_read; + pg_atomic_uint64 mq_bytes_written; + Size mq_ring_size; + bool mq_detached; + uint8 mq_ring_offset; + char mq_ring[FLEXIBLE_ARRAY_MEMBER]; +}; + +/* + * This structure is a backend-private handle for access to a queue. + * + * mqh_queue is a pointer to the queue we've attached. 
+ * + * If this queue is intended to connect the current process with a background + * worker that started it, the user can pass a pointer to the worker handle + * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this + * is to allow us to begin sending to or receiving from that queue before the + * process we'll be communicating with has even been started. If it fails + * to start, the handle will allow us to notice that and fail cleanly, rather + * than waiting forever; see shm_mq_wait_internal. This is mostly useful in + * simple cases - e.g. where there are just 2 processes communicating; in + * more complex scenarios, every process may not have a BackgroundWorkerHandle + * available, or may need to watch for the failure of more than one other + * process at a time. + * + * When a message exists as a contiguous chunk of bytes in the queue - that is, + * it is smaller than the size of the ring buffer and does not wrap around + * the end - we return the message to the caller as a pointer into the buffer. + * For messages that are larger or happen to wrap, we reassemble the message + * locally by copying the chunks into a backend-local buffer. mqh_buffer is + * the buffer, and mqh_buflen is the number of bytes allocated for it. + * + * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete + * are used to track the state of non-blocking operations. When the caller + * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they + * are expected to retry the call at a later time with the same argument; + * we need to retain enough state to pick up where we left off. + * mqh_length_word_complete tracks whether we are done sending or receiving + * (whichever we're doing) the entire length word. mqh_partial_bytes tracks + * the number of bytes read or written for either the length word or the + * message itself, and mqh_expected_bytes - which is used only for reads - + * tracks the expected total size of the payload. 
+ * + * mqh_counterparty_attached tracks whether we know the counterparty to have + * attached to the queue at some previous point. This lets us avoid some + * mutex acquisitions. + * + * mqh_context is the memory context in effect at the time we attached to + * the shm_mq. The shm_mq_handle itself is allocated in this context, and + * we make sure any other allocations we do happen in this context as well, + * to avoid nasty surprises. + */ +struct shm_mq_handle { + shm_mq *mqh_queue; + char *mqh_segment; + BackgroundWorkerHandle *mqh_handle; + char *mqh_buffer; + Size mqh_buflen; + Size mqh_consume_pending; + Size mqh_partial_bytes; + Size mqh_expected_bytes; + bool mqh_length_word_complete; + bool mqh_counterparty_attached; + MemoryContext mqh_context; +}; + +static void shm_mq_detach_internal(shm_mq *mq); +static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, + const void *data, bool nowait, Size *bytes_written); +static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,Size bytes_needed, bool nowait, + Size *nbytesp, void **datap); +static bool shm_mq_counterparty_gone(shm_mq *mq, + BackgroundWorkerHandle *handle); +static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, + BackgroundWorkerHandle *handle); +static void shm_mq_inc_bytes_read(shm_mq *mq, Size n); +static void shm_mq_inc_bytes_written(shm_mq *mq, Size n); + +/* Minimum queue size is enough for header and at least one chunk of data. */ +const Size shm_mq_minimum_size = MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; + +#define MQH_INITIAL_BUFSIZE 8192 + +/* + * Initialize a new shared message queue. + */ +shm_mq *shm_mq_create(void *address, Size size) +{ + shm_mq *mq = (shm_mq*)address; + Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); + + /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ + size = MAXALIGN_DOWN(size); + + /* Queue size must be large enough to hold some data. */ + Assert(size > data_offset); + + /* Initialize queue header. 
*/ + SpinLockInit(&mq->mq_mutex); + mq->mq_receiver = NULL; + mq->mq_sender = NULL; + pg_atomic_init_u64(&mq->mq_bytes_read, 0); + pg_atomic_init_u64(&mq->mq_bytes_written, 0); + mq->mq_ring_size = size - data_offset; + mq->mq_detached = false; + mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring); + + return mq; +} + +/* + * Set the identity of the process that will receive from a shared message + * queue. + */ +void shm_mq_set_receiver(shm_mq *mq, PGPROC *proc) +{ + PGPROC *sender = NULL; + + SpinLockAcquire(&mq->mq_mutex); + Assert(mq->mq_receiver == NULL); + mq->mq_receiver = proc; + sender = mq->mq_sender; + SpinLockRelease(&mq->mq_mutex); + + if (sender != NULL) + SetLatch(&sender->procLatch); +} + +/* + * Set the identity of the process that will send to a shared message queue. + */ +void shm_mq_set_sender(shm_mq *mq, PGPROC *proc) +{ + PGPROC *receiver = NULL; + + SpinLockAcquire(&mq->mq_mutex); + Assert(mq->mq_sender == NULL); + mq->mq_sender = proc; + receiver = mq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + + if (receiver != NULL) + SetLatch(&receiver->procLatch); +} + +/* + * Get the configured receiver. + */ +PGPROC *shm_mq_get_receiver(shm_mq *mq) +{ + PGPROC *receiver = NULL; + + SpinLockAcquire(&mq->mq_mutex); + receiver = mq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + + return receiver; +} + +/* + * Get the configured sender. + */ +PGPROC *shm_mq_get_sender(shm_mq *mq) +{ + PGPROC *sender = NULL; + + SpinLockAcquire(&mq->mq_mutex); + sender = mq->mq_sender; + SpinLockRelease(&mq->mq_mutex); + + return sender; +} + +/* + * Attach to a shared message queue so we can send or receive messages. + * + * The memory context in effect at the time this function is called should + * be one which will last for at least as long as the message queue itself. + * We'll allocate the handle in that context, and future allocations that + * are needed to buffer incoming data will happen in that context as well. 
+ * + * + * If handle != NULL, the queue can be read or written even before the + * other process has attached. We'll wait for it to do so if needed. The + * handle must be for a background worker initialized with bgw_notify_pid + * equal to our PID. + * + * shm_mq_detach() should be called when done. This will free the + * shm_mq_handle and mark the queue itself as detached, so that our + * counterpart won't get stuck waiting for us to fill or drain the queue + * after we've already lost interest. + */ +shm_mq_handle *shm_mq_attach(shm_mq *mq, char *seg, BackgroundWorkerHandle *handle) +{ + shm_mq_handle *mqh = (shm_mq_handle*)palloc(sizeof(shm_mq_handle)); + + Assert(mq->mq_receiver == t_thrd.proc || mq->mq_sender == t_thrd.proc); + mqh->mqh_queue = mq; + mqh->mqh_segment = seg; + mqh->mqh_handle = handle; + mqh->mqh_buffer = NULL; + mqh->mqh_buflen = 0; + mqh->mqh_consume_pending = 0; + mqh->mqh_partial_bytes = 0; + mqh->mqh_expected_bytes = 0; + mqh->mqh_length_word_complete = false; + mqh->mqh_counterparty_attached = false; + mqh->mqh_context = CurrentMemoryContext; + + return mqh; +} + +/* + * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had + * been passed to shm_mq_attach. + */ +void shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) +{ + Assert(mqh->mqh_handle == NULL); + mqh->mqh_handle = handle; +} + +/* + * Write a message into a shared message queue. + */ +shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) +{ + shm_mq_iovec iov; + + iov.data = (const char*)data; + iov.len = nbytes; + + return shm_mq_sendv(mqh, &iov, 1, nowait); +} + +/* + * Write a message into a shared message queue, gathered from multiple + * addresses. + * + * When nowait = false, we'll wait on our process latch when the ring buffer + * fills up, and then continue writing once the receiver has drained some data. + * The process latch is reset after each wait. 
+ * + * When nowait = true, we do not manipulate the state of the process latch; + * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In + * this case, the caller should call this function again, with the same + * arguments, each time the process latch is set. (Once begun, the sending + * of a message cannot be aborted except by detaching from the queue; changing + * the length or payload will corrupt the queue.) + */ +shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) +{ + shm_mq_result res; + shm_mq *mq = mqh->mqh_queue; + PGPROC *receiver = NULL; + Size nbytes = 0; + Size bytes_written; + int i; + int which_iov = 0; + Size offset; + + Assert(mq->mq_sender == t_thrd.proc); + + /* Compute total size of write. */ + for (i = 0; i < iovcnt; ++i) + nbytes += iov[i].len; + + /* Try to write, or finish writing, the length word into the buffer. */ + while (!mqh->mqh_length_word_complete) { + Assert(mqh->mqh_partial_bytes < sizeof(Size)); + res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, + ((char *)&nbytes) + mqh->mqh_partial_bytes, + nowait, &bytes_written); + if (res == SHM_MQ_DETACHED) { + /* Reset state in case caller tries to send another message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + return res; + } + mqh->mqh_partial_bytes += bytes_written; + + if (mqh->mqh_partial_bytes >= sizeof(Size)) { + Assert(mqh->mqh_partial_bytes == sizeof(Size)); + + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = true; + } + + if (res != SHM_MQ_SUCCESS) + return res; + + /* Length word can't be split unless bigger than required alignment. */ + Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF); + } + + /* Write the actual data bytes into the buffer. */ + Assert(mqh->mqh_partial_bytes <= nbytes); + offset = mqh->mqh_partial_bytes; + do { + Size chunksize; + + /* Figure out which bytes need to be sent next. 
*/ + if (offset >= iov[which_iov].len) { + offset -= iov[which_iov].len; + ++which_iov; + if (which_iov >= iovcnt) + break; + continue; + } + + /* + * We want to avoid copying the data if at all possible, but every + * chunk of bytes we write into the queue has to be MAXALIGN'd, except + * the last. Thus, if a chunk other than the last one ends on a + * non-MAXALIGN'd boundary, we have to combine the tail end of its + * data with data from one or more following chunks until we either + * reach the last chunk or accumulate a number of bytes which is + * MAXALIGN'd. + */ + if (which_iov + 1 < iovcnt && + offset + MAXIMUM_ALIGNOF > iov[which_iov].len) { + char tmpbuf[MAXIMUM_ALIGNOF]; + Size j = 0; + + for (;;) { + if (offset < iov[which_iov].len) { + tmpbuf[j] = iov[which_iov].data[offset]; + j++; + offset++; + if (j == MAXIMUM_ALIGNOF) + break; + } else { + offset -= iov[which_iov].len; + which_iov++; + if (which_iov >= iovcnt) + break; + } + } + + res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written); + if (res == SHM_MQ_DETACHED) { + /* Reset state in case caller tries to send another message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + return res; + } + + mqh->mqh_partial_bytes += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + continue; + } + + /* + * If this is the last chunk, we can write all the data, even if it + * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to + * MAXALIGN_DOWN the write size. + */ + chunksize = iov[which_iov].len - offset; + if (which_iov + 1 < iovcnt) + chunksize = MAXALIGN_DOWN(chunksize); + res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset], + nowait, &bytes_written); + if (res == SHM_MQ_DETACHED) { + /* Reset state in case caller tries to send another message. 
*/ + mqh->mqh_length_word_complete = false; + mqh->mqh_partial_bytes = 0; + return res; + } + + mqh->mqh_partial_bytes += bytes_written; + offset += bytes_written; + if (res != SHM_MQ_SUCCESS) + return res; + } while (mqh->mqh_partial_bytes < nbytes); + + /* Reset for next message. */ + mqh->mqh_partial_bytes = 0; + mqh->mqh_length_word_complete = false; + + /* If queue has been detached, let caller know. */ + if (mq->mq_detached) + return SHM_MQ_DETACHED; + + /* + * If the counterparty is known to have attached, we can read mq_receiver + * without acquiring the spinlock and assume it isn't NULL. Otherwise, + * more caution is needed. + */ + if (mqh->mqh_counterparty_attached) { + receiver = mq->mq_receiver; + } else { + SpinLockAcquire(&mq->mq_mutex); + receiver = mq->mq_receiver; + SpinLockRelease(&mq->mq_mutex); + if (receiver == NULL) + return SHM_MQ_SUCCESS; + mqh->mqh_counterparty_attached = true; + } + + /* Notify receiver of the newly-written data, and return. */ + SetLatch(&receiver->procLatch); + return SHM_MQ_SUCCESS; +} + +/* + * Receive a message from a shared message queue. + * + * We set *nbytes to the message length and *data to point to the message + * payload. If the entire message exists in the queue as a single, + * contiguous chunk, *data will point directly into shared memory; otherwise, + * it will point to a temporary buffer. This mostly avoids data copying in + * the hoped-for case where messages are short compared to the buffer size, + * while still allowing longer messages. In either case, the return value + * remains valid until the next receive operation is performed on the queue. + * + * When nowait = false, we'll wait on our process latch when the ring buffer + * is empty and we have not yet received a full message. The sender will + * set our process latch after more data has been written, and we'll resume + * processing. Each call will therefore return a complete message + * (unless the sender detaches the queue). 
+ * + * When nowait = true, we do not manipulate the state of the process latch; + * instead, whenever the buffer is empty and we need to read from it, we + * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this + * function again after the process latch has been set. + */ +shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) +{ + shm_mq *mq = mqh->mqh_queue; + shm_mq_result res; + Size rb = 0; + Size nbytes; + void *rawdata = NULL; + + Assert(mq->mq_receiver == t_thrd.proc); + + /* We can't receive data until the sender has attached. */ + if (!mqh->mqh_counterparty_attached) { + if (nowait) { + int counterparty_gone; + + /* + * We shouldn't return at this point at all unless the sender + * hasn't attached yet. However, the correct return value depends + * on whether the sender is still attached. If we first test + * whether the sender has ever attached and then test whether the + * sender has detached, there's a race condition: a sender that + * attaches and detaches very quickly might fool us into thinking + * the sender never attached at all. So, test whether our + * counterparty is definitively gone first, and only afterwards + * check whether the sender ever attached in the first place. + */ + counterparty_gone = (int)shm_mq_counterparty_gone(mq, mqh->mqh_handle); + if (shm_mq_get_sender(mq) == NULL) { + if (counterparty_gone) + return SHM_MQ_DETACHED; + else + return SHM_MQ_WOULD_BLOCK; + } + } else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle) + && shm_mq_get_sender(mq) == NULL) { + mq->mq_detached = true; + return SHM_MQ_DETACHED; + } + mqh->mqh_counterparty_attached = true; + } + + /* + * If we've consumed an amount of data greater than 1/4th of the ring + * size, mark it consumed in shared memory. We try to avoid doing this + * unnecessarily when only a small amount of data has been consumed, + * because SetLatch() is fairly expensive and we don't want to do it too + * often. 
+ */ + if (mqh->mqh_consume_pending > mq->mq_ring_size / 4) { + shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); + mqh->mqh_consume_pending = 0; + } + + /* Try to read, or finish reading, the length word from the buffer. */ + while (!mqh->mqh_length_word_complete) { + /* Try to receive the message length word. */ + Assert(mqh->mqh_partial_bytes < sizeof(Size)); + res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, + nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + + /* + * Hopefully, we'll receive the entire message length word at once. + * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over + * multiple reads. + */ + if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size)) { + Size needed; + + nbytes = *(Size *)rawdata; + + /* If we've already got the whole message, we're done. */ + needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes); + if (rb >= needed) { + mqh->mqh_consume_pending += needed; + *nbytesp = nbytes; + *datap = ((char *)rawdata) + MAXALIGN(sizeof(Size)); + return SHM_MQ_SUCCESS; + } + + /* + * We don't have the whole message, but we at least have the whole + * length word. + */ + mqh->mqh_expected_bytes = nbytes; + mqh->mqh_length_word_complete = true; + mqh->mqh_consume_pending += MAXALIGN(sizeof(Size)); + rb -= MAXALIGN(sizeof(Size)); + } else { + Size lengthbytes; + + /* Can't be split unless bigger than required alignment. */ + Assert(sizeof(Size) > MAXIMUM_ALIGNOF); + + /* Message word is split; need buffer to reassemble. */ + if (mqh->mqh_buffer == NULL) { + mqh->mqh_buffer = (char*)MemoryContextAlloc(mqh->mqh_context, + MQH_INITIAL_BUFSIZE); + mqh->mqh_buflen = MQH_INITIAL_BUFSIZE; + } + Assert(mqh->mqh_buflen >= sizeof(Size)); + + /* Copy partial length word; remember to consume it. 
*/ + if (mqh->mqh_partial_bytes + rb > sizeof(Size)) + lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes; + else + lengthbytes = rb; + errno_t rc = memcpy_s(&mqh->mqh_buffer[mqh->mqh_partial_bytes], lengthbytes, + rawdata, lengthbytes); + securec_check(rc, "\0", "\0"); + mqh->mqh_partial_bytes += lengthbytes; + mqh->mqh_consume_pending += MAXALIGN(lengthbytes); + rb -= lengthbytes; + + /* If we now have the whole word, we're ready to read payload. */ + if (mqh->mqh_partial_bytes >= sizeof(Size)) { + Assert(mqh->mqh_partial_bytes == sizeof(Size)); + mqh->mqh_expected_bytes = *(Size *)mqh->mqh_buffer; + mqh->mqh_length_word_complete = true; + mqh->mqh_partial_bytes = 0; + } + } + } + nbytes = mqh->mqh_expected_bytes; + + if (mqh->mqh_partial_bytes == 0) { + /* + * Try to obtain the whole message in a single chunk. If this works, + * we need not copy the data and can return a pointer directly into + * shared memory. + */ + res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + if (rb >= nbytes) { + mqh->mqh_length_word_complete = false; + mqh->mqh_consume_pending += MAXALIGN(nbytes); + *nbytesp = nbytes; + *datap = rawdata; + return SHM_MQ_SUCCESS; + } + + /* + * The message has wrapped the buffer. We'll need to copy it in order + * to return it to the client in one chunk. First, make sure we have + * a large enough buffer available. + */ + if (mqh->mqh_buflen < nbytes) { + Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); + + while (newbuflen < nbytes) + newbuflen *= 2; + + if (mqh->mqh_buffer != NULL) { + pfree(mqh->mqh_buffer); + mqh->mqh_buffer = NULL; + mqh->mqh_buflen = 0; + } + mqh->mqh_buffer = (char*)MemoryContextAlloc(mqh->mqh_context, newbuflen); + mqh->mqh_buflen = newbuflen; + } + } + + /* Loop until we've copied the entire message. */ + for (;;) { + Size still_needed; + + /* Copy as much as we can. 
*/ + Assert(mqh->mqh_partial_bytes + rb <= nbytes); + errno_t rc = memcpy_s(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rb, rawdata, rb); + securec_check(rc, "\0", "\0"); + mqh->mqh_partial_bytes += rb; + + /* + * Update count of bytes that can be consumed, accounting for + * alignment padding. Note that this will never actually insert any + * padding except at the end of a message, because the buffer size is + * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well. + */ + Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb)); + mqh->mqh_consume_pending += MAXALIGN(rb); + + /* If we got all the data, exit the loop. */ + if (mqh->mqh_partial_bytes >= nbytes) + break; + + /* Wait for some more data. */ + still_needed = nbytes - mqh->mqh_partial_bytes; + res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata); + if (res != SHM_MQ_SUCCESS) + return res; + if (rb > still_needed) + rb = still_needed; + } + + /* Return the complete message, and reset for next message. */ + *nbytesp = nbytes; + *datap = mqh->mqh_buffer; + mqh->mqh_length_word_complete = false; + mqh->mqh_partial_bytes = 0; + return SHM_MQ_SUCCESS; +} + +/* + * Wait for the other process that's supposed to use this queue to attach + * to it. + * + * The return value is SHM_MQ_DETACHED if the worker has already detached or + * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached. + * Note that we will only be able to detect that the worker has died before + * attaching if a background worker handle was passed to shm_mq_attach(). 
+ */ +shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh) +{ + shm_mq *mq = mqh->mqh_queue; + PGPROC **victim; + + if (shm_mq_get_receiver(mq) == t_thrd.proc) { + victim = &mq->mq_sender; + } else { + Assert(shm_mq_get_sender(mq) == t_thrd.proc); + victim = &mq->mq_receiver; + } + + if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle)) + return SHM_MQ_SUCCESS; + else + return SHM_MQ_DETACHED; +} + +/* + * Detach from a shared message queue, and destroy the shm_mq_handle. + * + * Both the handle and its private message buffer (if one was allocated) are + * pfree'd here, so mqh must not be used after this returns. + */ +void shm_mq_detach(shm_mq_handle *mqh) +{ + /* Notify counterparty that we're outta here. */ + shm_mq_detach_internal(mqh->mqh_queue); + + /* Release local memory associated with handle. */ + if (mqh->mqh_buffer != NULL) + pfree(mqh->mqh_buffer); + pfree(mqh); +} + +/* + * Notify counterparty that we're detaching from shared message queue. + * + * The purpose of this function is to make sure that the process + * with which we're communicating doesn't block forever waiting for us to + * fill or drain the queue once we've lost interest. When the sender + * detaches, the receiver can read any messages remaining in the queue; + * further reads will return SHM_MQ_DETACHED. If the receiver detaches, + * further attempts to send messages will likewise return SHM_MQ_DETACHED. + * + * This is separated out from shm_mq_detach() because if the on_dsm_detach + * callback fires, we only want to do this much. We do not try to touch + * the local shm_mq_handle, as it may have been pfree'd already. + * + * mq_detached is set while holding mq_mutex. The counterparty's latch is + * set afterwards, and only if its PGPROC slot in the queue is non-NULL, + * i.e. only if it has attached. + */ +static void shm_mq_detach_internal(shm_mq *mq) +{ + PGPROC *victim = NULL; + + SpinLockAcquire(&mq->mq_mutex); + if (mq->mq_sender == t_thrd.proc) { + victim = mq->mq_receiver; + } else { + Assert(mq->mq_receiver == t_thrd.proc); + victim = mq->mq_sender; + } + mq->mq_detached = true; + SpinLockRelease(&mq->mq_mutex); + + if (victim != NULL) { + SetLatch(&victim->procLatch); + } +} + +/* + * Get the shm_mq from handle. 
+ */ +shm_mq *shm_mq_get_queue(shm_mq_handle *mqh) +{ + return mqh->mqh_queue; +} + +/* + * Write bytes into a shared message queue. + */ +static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, + bool nowait, Size *bytes_written) +{ + shm_mq *mq = mqh->mqh_queue; + Size sent = 0; + uint64 used; + Size ringsize = mq->mq_ring_size; + Size available; + + while (sent < nbytes) { + uint64 rb; + uint64 wb; + + /* Compute number of ring buffer bytes used and available. */ + rb = pg_atomic_read_u64(&mq->mq_bytes_read); + wb = pg_atomic_read_u64(&mq->mq_bytes_written); + Assert(wb >= rb); + used = wb - rb; + Assert(used <= ringsize); + available = Min(ringsize - used, nbytes - sent); + + /* + * Bail out if the queue has been detached. Note that we would be in + * trouble if the compiler decided to cache the value of + * mq->mq_detached in a register or on the stack across loop + * iterations. It probably shouldn't do that anyway since we'll + * always return, call an external function that performs a system + * call, or reach a memory barrier at some point later in the loop, + * but just to be sure, insert a compiler barrier here. + */ + pg_compiler_barrier(); + if (mq->mq_detached) { + *bytes_written = sent; + return SHM_MQ_DETACHED; + } + + if (available == 0 && !mqh->mqh_counterparty_attached) { + /* + * The queue is full, so if the receiver isn't yet known to be + * attached, we must wait for that to happen. 
+ */ + if (nowait) { + if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) { + *bytes_written = sent; + return SHM_MQ_DETACHED; + } + if (shm_mq_get_receiver(mq) == NULL) { + *bytes_written = sent; + return SHM_MQ_WOULD_BLOCK; + } + } else if (!shm_mq_wait_internal(mq, &mq->mq_receiver, mqh->mqh_handle)) { + mq->mq_detached = true; + *bytes_written = sent; + return SHM_MQ_DETACHED; + } + mqh->mqh_counterparty_attached = true; + + /* + * The receiver may have read some data after attaching, so we + * must not wait without rechecking the queue state. + */ + } else if (available == 0) { + /* + * Since mqh->mqh_counterparty_attached is known to be true at this + * point, mq_receiver has been set, and it can't change once set. + * Therefore, we can read it without acquiring the spinlock. + * + * The ring is full, so set the receiver's latch before we block: + * space only frees up when the receiver advances mq_bytes_read. + */ + Assert(mqh->mqh_counterparty_attached); + SetLatch(&mq->mq_receiver->procLatch); + + /* Skip manipulation of our latch if nowait = true. */ + if (nowait) { + *bytes_written = sent; + return SHM_MQ_WOULD_BLOCK; + } + + /* + * Wait for our latch to be set. It might already be set for some + * unrelated reason, but that'll just result in one extra trip + * through the loop. It's worth it to avoid resetting the latch + * at top of loop, because setting an already-set latch is much + * cheaper than setting one that has been reset. + */ + (void)WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET, 0); + + /* Reset the latch so we don't spin. */ + ResetLatch(&t_thrd.proc->procLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } else { + Size offset; + Size sendnow; + + offset = wb % (uint64)ringsize; + sendnow = Min(available, ringsize - offset); + + /* + * Write as much data as we can via a single memcpy(). Make sure + * these writes happen after the read of mq_bytes_read, above. + * This barrier pairs with the one in shm_mq_inc_bytes_read. 
+ * (Since we're separating the read of mq_bytes_read from a + * subsequent write to mq_ring, we need a full barrier here.) + */ + pg_memory_barrier(); + errno_t rc = memcpy_s(&mq->mq_ring[mq->mq_ring_offset + offset], sendnow, + (char*)data + sent, sendnow); + securec_check(rc, "\0", "\0"); + sent += sendnow; + + /* + * Update count of bytes written, with alignment padding. Note + * that this will never actually insert any padding except at the + * end of a run of bytes, because the buffer size is a multiple of + * MAXIMUM_ALIGNOF, and each read is as well. + */ + Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); + shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow)); + + /* + * For efficiency, we don't set the reader's latch here. We'll do + * that only when the buffer fills up or after writing an entire + * message. + */ + } + } + + *bytes_written = sent; + return SHM_MQ_SUCCESS; +} + +/* + * Wait until at least bytes_needed bytes are available to be read from the + * shared message queue, or until the buffer wraps around. If the queue is + * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait + * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set + * to the location at which data bytes can be read, *nbytesp is set to the + * number of bytes which can be read at that address, and the return value + * is SHM_MQ_SUCCESS. + * + * *nbytesp is purely an output. It may be smaller than bytes_needed when + * the available data runs up to the end of the ring, and larger when more + * than bytes_needed contiguous bytes are already available. + */ +static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, + Size *nbytesp, void **datap) +{ + shm_mq *mq = mqh->mqh_queue; + Size ringsize = mq->mq_ring_size; + uint64 used; + uint64 written; + + for (;;) { + Size offset; + uint64 read; + + /* Get bytes written, so we can compute what's available to read. */ + written = pg_atomic_read_u64(&mq->mq_bytes_written); + + /* + * Get bytes read. Include bytes we could consume but have not yet + * consumed. 
+ */ + read = pg_atomic_read_u64(&mq->mq_bytes_read) + + mqh->mqh_consume_pending; + used = written - read; + Assert(used <= ringsize); + offset = read % (uint64)ringsize; + + /* If we have enough data or buffer has wrapped, we're done. */ + if (used >= bytes_needed || offset + used >= ringsize) { + *nbytesp = Min(used, ringsize - offset); + *datap = &mq->mq_ring[mq->mq_ring_offset + offset]; + + /* + * Separate the read of mq_bytes_written, above, from caller's + * attempt to read the data itself. Pairs with the barrier in + * shm_mq_inc_bytes_written. + */ + pg_read_barrier(); + return SHM_MQ_SUCCESS; + } + + /* + * Fall out before waiting if the queue has been detached. + * + * Note that we don't check for this until *after* considering whether + * the data already available is enough, since the receiver can finish + * receiving a message stored in the buffer even after the sender has + * detached. + */ + if (mq->mq_detached) { + /* + * If the writer advanced mq_bytes_written and then set + * mq_detached, we might not have read the final value of + * mq_bytes_written above. Insert a read barrier and then check + * again if mq_bytes_written has advanced. + */ + pg_read_barrier(); + if (written != pg_atomic_read_u64(&mq->mq_bytes_written)) + continue; + + return SHM_MQ_DETACHED; + } + + /* + * We didn't get enough data to satisfy the request, so mark any data + * previously-consumed as read to make more buffer space. + */ + if (mqh->mqh_consume_pending > 0) { + shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); + mqh->mqh_consume_pending = 0; + } + + /* Skip manipulation of our latch if nowait = true. */ + if (nowait) + return SHM_MQ_WOULD_BLOCK; + + /* + * Wait for our latch to be set. It might already be set for some + * unrelated reason, but that'll just result in one extra trip through + * the loop. It's worth it to avoid resetting the latch at top of + * loop, because setting an already-set latch is much cheaper than + * setting one that has been reset. 
+ */ + (void)WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET, 0); + + /* Reset the latch so we don't spin. */ + ResetLatch(&t_thrd.proc->procLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } +} + +/* + * Test whether a counterparty who may not even be alive yet is definitely gone. + * + * Returns true if mq_detached is already set, or if handle is non-NULL and + * GetBackgroundWorkerPid() reports a status other than BGWH_STARTED or + * BGWH_NOT_YET_STARTED; in the latter case mq_detached is also set here. + * With handle == NULL only the detached flag is consulted. + */ +static bool shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) +{ + ThreadId pid; + + /* If the queue has been detached, counterparty is definitely gone. */ + if (mq->mq_detached) { + return true; + } + + /* If there's a handle, check worker status. */ + if (handle != NULL) { + BgwHandleStatus status; + + /* Check for unexpected worker death. The pid output is not used. */ + status = GetBackgroundWorkerPid(handle, &pid); + if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) { + /* Mark it detached, just to make it official. */ + mq->mq_detached = true; + return true; + } + } + + /* Counterparty is not definitively gone. */ + return false; +} + +/* + * This is used when a process is waiting for its counterpart to attach to the + * queue. We exit when the other process attaches as expected, or, if + * handle != NULL, when the referenced background process or the postmaster + * dies. Note that if handle == NULL, and the process fails to attach, we'll + * potentially get stuck here forever waiting for a process that may never + * start. We do check for interrupts, though. + * + * ptr is a pointer to the memory address that we're expecting to become + * non-NULL when our counterpart attaches to the queue. + * + * Returns true once *ptr is non-NULL; returns false if the queue is marked + * detached or the worker behind handle is reported as neither started nor + * pending start. + * + * NOTE(review): the wait in this loop passes only WL_LATCH_SET, so it is not + * evident from this function that postmaster death ends the wait - confirm. + */ +static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) +{ + bool result = false; + + for (;;) { + BgwHandleStatus status; + ThreadId pid; + + /* Acquire the lock just long enough to check the pointer. */ + SpinLockAcquire(&mq->mq_mutex); + result = (*ptr != NULL); + SpinLockRelease(&mq->mq_mutex); + + /* Fail if detached; else succeed if initialized. 
*/ + if (mq->mq_detached) { + result = false; + break; + } + if (result) { + break; + } + if (handle != NULL) { + /* Check for unexpected worker death. */ + status = GetBackgroundWorkerPid(handle, &pid); + if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) { + result = false; + break; + } + } + + /* Wait to be signalled. */ + (void)WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET, 0); + + /* Reset the latch so we don't spin. */ + ResetLatch(&t_thrd.proc->procLatch); + + /* An interrupt may have occurred while we were waiting. */ + CHECK_FOR_INTERRUPTS(); + } + + return result; +} + +/* + * Increment the number of bytes read. + * + * Adds n to mq_bytes_read and then sets the sender's latch, so that a sender + * waiting for ring space rechecks the queue. + */ +static void shm_mq_inc_bytes_read(shm_mq *mq, Size n) +{ + PGPROC *sender = NULL; + + /* + * Separate prior reads of mq_ring from the increment of mq_bytes_read + * which follows. This pairs with the full barrier in + * shm_mq_send_bytes(). We only need a read barrier here because the + * increment of mq_bytes_read is actually a read followed by a dependent + * write. + */ + pg_read_barrier(); + + /* + * There's no need to use pg_atomic_fetch_add_u64 here, because nobody + * else can be changing this value. This method should be cheaper. + */ + pg_atomic_write_u64(&mq->mq_bytes_read, + pg_atomic_read_u64(&mq->mq_bytes_read) + n); + + /* + * We shouldn't have any bytes to read without a sender, so we can read + * mq_sender here without a lock. Once it's initialized, it can't change. + */ + sender = mq->mq_sender; + Assert(sender != NULL); + SetLatch(&sender->procLatch); +} + +/* + * Increment the number of bytes written. + */ +static void shm_mq_inc_bytes_written(shm_mq *mq, Size n) +{ + /* + * Separate prior writes to mq_ring from the write of mq_bytes_written + * which we're about to do. Pairs with the read barrier found in + * shm_mq_receive_bytes. + */ + pg_write_barrier(); + + /* + * There's no need to use pg_atomic_fetch_add_u64 here, because nobody + * else can be changing this value. 
This method avoids taking the bus + * lock unnecessarily. + */ + pg_atomic_write_u64(&mq->mq_bytes_written, + pg_atomic_read_u64(&mq->mq_bytes_written) + n); +} + diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index e04ea17ae..3988224f4 100755 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -1,7 +1,7 @@ /* ------------------------------------------------------------------------- * * libpq.h - * POSTGRES LIBPQ buffer structure definitions. + * POSTGRES LIBPQ buffer structure definitions. * * * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group @@ -22,19 +22,43 @@ /* ---------------- * PQArgBlock - * Information (pointer to array of this structure) required - * for the PQfn() call. (This probably ought to go somewhere else...) + * Information (pointer to array of this structure) required + * for the PQfn() call. (This probably ought to go somewhere else...) * ---------------- */ typedef struct { int len; int isint; union { - int* ptr; /* can't use void (dec compiler barfs) */ + int *ptr; /* can't use void (dec compiler barfs) */ int integer; } u; } PQArgBlock; +typedef struct { + void (*comm_reset) (void); + int (*flush) (void); + int (*flush_if_writable) (void); + bool (*is_send_pending) (void); + int (*putmessage) (char msgtype, const char* s, size_t len); + int (*putmessage_noblock) (char msgtype, const char* s, size_t len); + void (*startcopyout) (void); + void (*endcopyout) (bool errorAbort); +} PQcommMethods; + +extern PGDLLIMPORT THR_LOCAL PQcommMethods *PqCommMethods; + +#define pq_comm_reset() (PqCommMethods->comm_reset()) +#define pq_flush() (PqCommMethods->flush()) +#define pq_flush_if_writable() (PqCommMethods->flush_if_writable()) +#define pq_is_send_pending() (PqCommMethods->is_send_pending()) +#define pq_putmessage(msgtype, s, len) \ + (PqCommMethods->putmessage(msgtype, s, len)) +#define pq_putmessage_noblock(msgtype, s, len) \ + (PqCommMethods->putmessage_noblock(msgtype, s, len)) +#define 
pq_startcopyout() (PqCommMethods->startcopyout()) +#define pq_endcopyout(errorAbort) (PqCommMethods->endcopyout(errorAbort)) + /* * External functions. */ @@ -49,7 +73,6 @@ extern int StreamConnection(pgsocket server_fd, Port* port); extern void StreamClose(pgsocket sock); extern void TouchSocketFile(void); extern void pq_init(void); -extern void pq_comm_reset(void); extern int pq_getbytes(char* s, size_t len); extern int pq_getstring(StringInfo s); extern int pq_getmessage(StringInfo s, int maxlen); @@ -57,14 +80,7 @@ extern int pq_getbyte(void); extern int pq_peekbyte(void); extern int pq_getbyte_if_available(unsigned char* c); extern int pq_putbytes(const char* s, size_t len); -extern int pq_flush(void); -extern int pq_flush_if_writable(void); extern void pq_flush_timedwait(int timeout); -extern bool pq_is_send_pending(void); -extern int pq_putmessage(char msgtype, const char* s, size_t len); -extern int pq_putmessage_noblock(char msgtype, const char* s, size_t len); -extern void pq_startcopyout(void); -extern void pq_endcopyout(bool errorAbort); extern bool pq_select(int timeout_ms); extern void pq_abandon_sendbuffer(void); extern void pq_abandon_recvbuffer(void); diff --git a/src/include/libpq/pqformat.h b/src/include/libpq/pqformat.h index 28f9eca5f..0a14b3b39 100755 --- a/src/include/libpq/pqformat.h +++ b/src/include/libpq/pqformat.h @@ -46,6 +46,7 @@ extern const char* pq_getmsgbytes(StringInfo msg, int datalen); extern void pq_copymsgbytes(StringInfo msg, char* buf, int datalen); extern char* pq_getmsgtext(StringInfo msg, int rawbytes, int* nbytes); extern const char* pq_getmsgstring(StringInfo msg); +extern const char* pq_getmsgrawstring(StringInfo msg); extern void pq_getmsgend(StringInfo msg); /* diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h new file mode 100644 index 000000000..2a749790e --- /dev/null +++ b/src/include/libpq/pqmq.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * 
pqmq.h + * Use the frontend/backend protocol for communication over a shm_mq + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/libpq/pqmq.h + * + *------------------------------------------------------------------------- + */ +#ifndef PQMQ_H +#define PQMQ_H + +#include "lib/stringinfo.h" +#include "storage/shm_mq.h" + +extern void pq_redirect_to_shm_mq(shm_mq_handle* mqh); +extern void pq_stop_redirect_to_shm_mq(void); +extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); + +extern void pq_parse_errornotice(StringInfo str, ErrorData* edata); + +#endif /* PQMQ_H */ diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 0a3efda84..1c2df987d 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -55,6 +55,7 @@ typedef enum { PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, PROCSIG_EXECUTOR_FLAG, + PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h new file mode 100644 index 000000000..27f98af44 --- /dev/null +++ b/src/include/storage/shm_mq.h @@ -0,0 +1,82 @@ +/*------------------------------------------------------------------------- + * + * shm_mq.h + * single-reader, single-writer shared memory message queue + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/shm_mq.h + * + *------------------------------------------------------------------------- + */ +#ifndef SHM_MQ_H +#define SHM_MQ_H + +#include "postmaster/bgworker.h" +#include "storage/proc.h" + +/* The queue itself, in shared memory. */ +struct shm_mq; +typedef struct shm_mq shm_mq; + +/* Backend-private state. 
*/ +struct shm_mq_handle; +typedef struct shm_mq_handle shm_mq_handle; + +/* Descriptors for a single write spanning multiple locations. */ +typedef struct { + const char *data; + Size len; +} shm_mq_iovec; + +/* Possible results of a send or receive operation. */ +typedef enum { + SHM_MQ_SUCCESS, /* Sent or received a message. */ + SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */ + SHM_MQ_DETACHED /* Other process has detached queue. */ +} shm_mq_result; + +/* + * Primitives to create a queue and set the sender and receiver. + * + * Both the sender and the receiver must be set before any messages are read + * or written, but they need not be set by the same process. Each must be + * set exactly once. + */ +extern shm_mq *shm_mq_create(void *address, Size size); +extern void shm_mq_set_receiver(shm_mq *mq, PGPROC *); +extern void shm_mq_set_sender(shm_mq *mq, PGPROC *); + +/* Accessor methods for sender and receiver. */ +extern PGPROC *shm_mq_get_receiver(shm_mq *); +extern PGPROC *shm_mq_get_sender(shm_mq *); + +/* Set up backend-local queue state. */ +extern shm_mq_handle *shm_mq_attach(shm_mq *mq, char *seg, + BackgroundWorkerHandle *handle); + +/* Associate worker handle with shm_mq. */ +extern void shm_mq_set_handle(shm_mq_handle *, BackgroundWorkerHandle *); + +/* Break connection, release handle resources. */ +extern void shm_mq_detach(shm_mq_handle *mqh); + +/* Get the shm_mq from handle. */ +extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh); + +/* Send or receive messages. */ +extern shm_mq_result shm_mq_send(shm_mq_handle *mqh, + Size nbytes, const void *data, bool nowait); +extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, + shm_mq_iovec *iov, int iovcnt, bool nowait); +extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, + Size *nbytesp, void **datap, bool nowait); + +/* Wait for our counterparty to attach to the queue. */ +extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); + +/* Smallest possible queue. 
*/ +extern PGDLLIMPORT const Size shm_mq_minimum_size; + +#endif /* SHM_MQ_H */