Use O_DIRECT, but only if available.
TODO: The kernel version should be looked up at startup and made
generally available so that it is readily available for
anybody interested.
This commit is contained in:
@ -51,6 +51,7 @@
|
|||||||
#include "maxscale/config.h"
|
#include "maxscale/config.h"
|
||||||
#include "maxscale/dcb.h"
|
#include "maxscale/dcb.h"
|
||||||
#include "maxscale/maxscale.h"
|
#include "maxscale/maxscale.h"
|
||||||
|
#include "maxscale/messagequeue.hh"
|
||||||
#include "maxscale/modules.h"
|
#include "maxscale/modules.h"
|
||||||
#include "maxscale/monitor.h"
|
#include "maxscale/monitor.h"
|
||||||
#include "maxscale/poll.h"
|
#include "maxscale/poll.h"
|
||||||
@ -58,7 +59,7 @@
|
|||||||
#include "maxscale/statistics.h"
|
#include "maxscale/statistics.h"
|
||||||
#include "maxscale/worker.hh"
|
#include "maxscale/worker.hh"
|
||||||
|
|
||||||
using maxscale::Worker;
|
using namespace maxscale;
|
||||||
|
|
||||||
#define STRING_BUFFER_SIZE 1024
|
#define STRING_BUFFER_SIZE 1024
|
||||||
#define PIDFD_CLOSED -1
|
#define PIDFD_CLOSED -1
|
||||||
@ -1890,6 +1891,13 @@ int main(int argc, char **argv)
|
|||||||
goto return_main;
|
goto return_main;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!MessageQueue::init())
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to initialize message queue.");
|
||||||
|
rc = MAXSCALE_INTERNALERROR;
|
||||||
|
goto return_main;
|
||||||
|
}
|
||||||
|
|
||||||
if (!Worker::init())
|
if (!Worker::init())
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to initialize workers.");
|
MXS_ERROR("Failed to initialize workers.");
|
||||||
@ -2007,6 +2015,7 @@ int main(int argc, char **argv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
Worker::finish();
|
Worker::finish();
|
||||||
|
MessageQueue::finish();
|
||||||
|
|
||||||
/*<
|
/*<
|
||||||
* Destroy the router and filter instances of all services.
|
* Destroy the router and filter instances of all services.
|
||||||
|
|||||||
@ -116,6 +116,21 @@ public:
|
|||||||
typedef MessageQueueHandler Handler;
|
typedef MessageQueueHandler Handler;
|
||||||
typedef MessageQueueMessage Message;
|
typedef MessageQueueMessage Message;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes the message queue mechanism. To be called once at
|
||||||
|
* process startup.
|
||||||
|
*
|
||||||
|
* @return True if the initialization succeeded, false otherwise.
|
||||||
|
*/
|
||||||
|
static bool init();
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finalizes the message queue mechanism. To be called once at
|
||||||
|
* process shutdown, if the initialization succeeded.
|
||||||
|
*/
|
||||||
|
static void finish();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a @c MessageQueue with the provided handler.
|
* Creates a @c MessageQueue with the provided handler.
|
||||||
*
|
*
|
||||||
|
|||||||
@ -12,14 +12,31 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "maxscale/messagequeue.hh"
|
#include "maxscale/messagequeue.hh"
|
||||||
|
#include <linux/version.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <sys/utsname.h>
|
||||||
#include <maxscale/debug.h>
|
#include <maxscale/debug.h>
|
||||||
#include <maxscale/log_manager.h>
|
#include <maxscale/log_manager.h>
|
||||||
#include "maxscale/worker.hh"
|
#include "maxscale/worker.hh"
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
struct
|
||||||
|
{
|
||||||
|
bool initialized;
|
||||||
|
int pipe_flags;
|
||||||
|
} this_unit =
|
||||||
|
{
|
||||||
|
false,
|
||||||
|
O_NONBLOCK | O_CLOEXEC
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
namespace maxscale
|
namespace maxscale
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -46,17 +63,89 @@ MessageQueue::~MessageQueue()
|
|||||||
close(m_write_fd);
|
close(m_write_fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//static
|
||||||
|
bool MessageQueue::init()
|
||||||
|
{
|
||||||
|
ss_dassert(!this_unit.initialized);
|
||||||
|
|
||||||
|
/* From "man 7 pipe"
|
||||||
|
* ----
|
||||||
|
*
|
||||||
|
* O_NONBLOCK enabled, n <= PIPE_BUF
|
||||||
|
* If there is room to write n bytes to the pipe, then write(2)
|
||||||
|
* succeeds immediately, writing all n bytes; otherwise write(2)
|
||||||
|
* fails, with errno set to EAGAIN.
|
||||||
|
*
|
||||||
|
* ... (On Linux, PIPE_BUF is 4096 bytes.)
|
||||||
|
*
|
||||||
|
* ----
|
||||||
|
*
|
||||||
|
* As O_NONBLOCK is set and the messages are less than 4096 bytes,
|
||||||
|
* O_DIRECT should not be needed and we should be safe without it.
|
||||||
|
*
|
||||||
|
* However, to be in the safe side, if we run on kernel version >= 3.4
|
||||||
|
* we use it.
|
||||||
|
*/
|
||||||
|
|
||||||
|
utsname u;
|
||||||
|
|
||||||
|
if (uname(&u) == 0)
|
||||||
|
{
|
||||||
|
char* p;
|
||||||
|
char* zMajor = strtok_r(u.release, ".", &p);
|
||||||
|
char* zMinor = strtok_r(NULL, ".", &p);
|
||||||
|
|
||||||
|
if (zMajor && zMinor)
|
||||||
|
{
|
||||||
|
int major = atoi(zMajor);
|
||||||
|
int minor = atoi(zMinor);
|
||||||
|
|
||||||
|
if (major >= 3 && minor >= 4)
|
||||||
|
{
|
||||||
|
// O_DIRECT for pipes is supported from kernel 3.4 onwards.
|
||||||
|
this_unit.pipe_flags |= O_DIRECT;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_NOTICE("O_DIRECT is not supported for pipes on Linux kernel %s "
|
||||||
|
"(supported from version 3.4 onwards), NOT using it.",
|
||||||
|
u.release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_WARNING("Syntax used in utsname.release seems to have changed, "
|
||||||
|
"not able to figure out current kernel version. Assuming "
|
||||||
|
"O_DIRECT is not supported for pipes.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_WARNING("uname() failed, assuming O_DIRECT is not supported for pipes: %s",
|
||||||
|
mxs_strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
this_unit.initialized = true;
|
||||||
|
|
||||||
|
return this_unit.initialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
//static
|
||||||
|
void MessageQueue::finish()
|
||||||
|
{
|
||||||
|
ss_dassert(this_unit.initialized);
|
||||||
|
this_unit.initialized = false;
|
||||||
|
}
|
||||||
|
|
||||||
//static
|
//static
|
||||||
MessageQueue* MessageQueue::create(Handler* pHandler)
|
MessageQueue* MessageQueue::create(Handler* pHandler)
|
||||||
{
|
{
|
||||||
|
ss_dassert(this_unit.initialized);
|
||||||
|
|
||||||
MessageQueue* pThis = NULL;
|
MessageQueue* pThis = NULL;
|
||||||
|
|
||||||
// We create the pipe in message mode (O_DIRECT), so that we do
|
|
||||||
// not need to deal with partial messages and as non blocking so
|
|
||||||
// that the descriptor can be added to an epoll instance.
|
|
||||||
|
|
||||||
int fds[2];
|
int fds[2];
|
||||||
if (pipe2(fds, O_DIRECT | O_NONBLOCK | O_CLOEXEC) == 0)
|
if (pipe2(fds, this_unit.pipe_flags) == 0)
|
||||||
{
|
{
|
||||||
int read_fd = fds[0];
|
int read_fd = fds[0];
|
||||||
int write_fd = fds[1];
|
int write_fd = fds[1];
|
||||||
|
|||||||
Reference in New Issue
Block a user