diff --git a/server/core/gateway.cc b/server/core/gateway.cc index 9d53c128c..393f58811 100644 --- a/server/core/gateway.cc +++ b/server/core/gateway.cc @@ -51,6 +51,7 @@ #include "maxscale/config.h" #include "maxscale/dcb.h" #include "maxscale/maxscale.h" +#include "maxscale/messagequeue.hh" #include "maxscale/modules.h" #include "maxscale/monitor.h" #include "maxscale/poll.h" @@ -58,7 +59,7 @@ #include "maxscale/statistics.h" #include "maxscale/worker.hh" -using maxscale::Worker; +using namespace maxscale; #define STRING_BUFFER_SIZE 1024 #define PIDFD_CLOSED -1 @@ -1890,6 +1891,13 @@ int main(int argc, char **argv) goto return_main; } + if (!MessageQueue::init()) + { + MXS_ERROR("Failed to initialize message queue."); + rc = MAXSCALE_INTERNALERROR; + goto return_main; + } + if (!Worker::init()) { MXS_ERROR("Failed to initialize workers."); @@ -2007,6 +2015,7 @@ int main(int argc, char **argv) } Worker::finish(); + MessageQueue::finish(); /*< * Destroy the router and filter instances of all services. diff --git a/server/core/maxscale/messagequeue.hh b/server/core/maxscale/messagequeue.hh index 32a938358..d0d6c67ce 100644 --- a/server/core/maxscale/messagequeue.hh +++ b/server/core/maxscale/messagequeue.hh @@ -116,6 +116,21 @@ public: typedef MessageQueueHandler Handler; typedef MessageQueueMessage Message; + /** + * Initializes the message queue mechanism. To be called once at + * process startup. + * + * @return True if the initialization succeeded, false otherwise. + */ + static bool init(); + + + /** + * Finalizes the message queue mechanism. To be called once at + * process shutdown, if the initialization succeeded. + */ + static void finish(); + /** * Creates a @c MessageQueue with the provided handler. * diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index 1b9db0996..c6680c72a 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -12,14 +12,31 @@ */ #include "maxscale/messagequeue.hh" +#include #include #include #include #include +#include #include #include #include "maxscale/worker.hh" +namespace +{ + +struct +{ + bool initialized; + int pipe_flags; +} this_unit = +{ + false, + O_NONBLOCK | O_CLOEXEC +}; + +} + namespace maxscale { @@ -46,17 +63,89 @@ MessageQueue::~MessageQueue() close(m_write_fd); } +//static +bool MessageQueue::init() +{ + ss_dassert(!this_unit.initialized); + + /* From "man 7 pipe" + * ---- + * + * O_NONBLOCK enabled, n <= PIPE_BUF + * If there is room to write n bytes to the pipe, then write(2) + * succeeds immediately, writing all n bytes; otherwise write(2) + * fails, with errno set to EAGAIN. + * + * ... (On Linux, PIPE_BUF is 4096 bytes.) + * + * ---- + * + * As O_NONBLOCK is set and the messages are less than 4096 bytes, + * O_DIRECT should not be needed and we should be safe without it. + * + * However, to be in the safe side, if we run on kernel version >= 3.4 + * we use it. + */ + + utsname u; + + if (uname(&u) == 0) + { + char* p; + char* zMajor = strtok_r(u.release, ".", &p); + char* zMinor = strtok_r(NULL, ".", &p); + + if (zMajor && zMinor) + { + int major = atoi(zMajor); + int minor = atoi(zMinor); + + if (major >= 3 && minor >= 4) + { + // O_DIRECT for pipes is supported from kernel 3.4 onwards. + this_unit.pipe_flags |= O_DIRECT; + } + else + { + MXS_NOTICE("O_DIRECT is not supported for pipes on Linux kernel %s " + "(supported from version 3.4 onwards), NOT using it.", + u.release); + } + } + else + { + MXS_WARNING("Syntax used in utsname.release seems to have changed, " + "not able to figure out current kernel version. Assuming " + "O_DIRECT is not supported for pipes."); + } + } + else + { + MXS_WARNING("uname() failed, assuming O_DIRECT is not supported for pipes: %s", + mxs_strerror(errno)); + } + + this_unit.initialized = true; + + return this_unit.initialized; +} + +//static +void MessageQueue::finish() +{ + ss_dassert(this_unit.initialized); + this_unit.initialized = false; +} + //static MessageQueue* MessageQueue::create(Handler* pHandler) { + ss_dassert(this_unit.initialized); + MessageQueue* pThis = NULL; - // We create the pipe in message mode (O_DIRECT), so that we do - // not need to deal with partial messages and as non blocking so - // that the descriptor can be added to an epoll instance. - int fds[2]; - if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0) + if (pipe2(fds, this_unit.pipe_flags) == 0) { int read_fd = fds[0]; int write_fd = fds[1];