MXS-2008 Move Worker and MessageQueue to maxbase
This commit is contained in:
@ -3,10 +3,13 @@ add_library(maxbase STATIC
|
||||
eventcount.cc
|
||||
log.cc
|
||||
logger.cc
|
||||
messagequeue.cc
|
||||
semaphore.cc
|
||||
stopwatch.cc
|
||||
string.cc
|
||||
stacktrace.cc
|
||||
worker.cc
|
||||
workertask.cc
|
||||
)
|
||||
|
||||
set_target_properties(maxbase PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs)
|
||||
|
326
maxutils/maxbase/src/messagequeue.cc
Normal file
326
maxutils/maxbase/src/messagequeue.cc
Normal file
@ -0,0 +1,326 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxbase/messagequeue.hh>
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <fstream>
|
||||
#include <maxbase/assert.h>
|
||||
#include <maxbase/log.h>
|
||||
#include <maxbase/string.h>
|
||||
#include <maxbase/worker.hh>
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
static struct
|
||||
{
|
||||
bool initialized;
|
||||
int pipe_max_size;
|
||||
} this_unit =
|
||||
{
|
||||
false
|
||||
};
|
||||
|
||||
int get_pipe_max_size()
|
||||
{
|
||||
int size = 65536; // Default value from pipe(7)
|
||||
std::ifstream file("/proc/sys/fs/pipe-max-size");
|
||||
|
||||
if (file.good())
|
||||
{
|
||||
file >> size;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
namespace maxbase
|
||||
{
|
||||
|
||||
MessageQueue::MessageQueue(Handler* pHandler, int read_fd, int write_fd)
|
||||
: mxb::PollData(&MessageQueue::poll_handler)
|
||||
, m_handler(*pHandler)
|
||||
, m_read_fd(read_fd)
|
||||
, m_write_fd(write_fd)
|
||||
, m_pWorker(NULL)
|
||||
{
|
||||
mxb_assert(pHandler);
|
||||
mxb_assert(read_fd);
|
||||
mxb_assert(write_fd);
|
||||
}
|
||||
|
||||
MessageQueue::~MessageQueue()
|
||||
{
|
||||
if (m_pWorker)
|
||||
{
|
||||
m_pWorker->remove_fd(m_read_fd);
|
||||
}
|
||||
|
||||
close(m_read_fd);
|
||||
close(m_write_fd);
|
||||
}
|
||||
|
||||
//static
|
||||
bool MessageQueue::init()
|
||||
{
|
||||
mxb_assert(!this_unit.initialized);
|
||||
|
||||
this_unit.initialized = true;
|
||||
this_unit.pipe_max_size = get_pipe_max_size();
|
||||
|
||||
return this_unit.initialized;
|
||||
}
|
||||
|
||||
//static
|
||||
void MessageQueue::finish()
|
||||
{
|
||||
mxb_assert(this_unit.initialized);
|
||||
this_unit.initialized = false;
|
||||
}
|
||||
|
||||
//static
|
||||
MessageQueue* MessageQueue::create(Handler* pHandler)
|
||||
{
|
||||
mxb_assert(this_unit.initialized);
|
||||
|
||||
/* From "man 7 pipe"
|
||||
* ----
|
||||
*
|
||||
* O_NONBLOCK enabled, n <= PIPE_BUF
|
||||
* If there is room to write n bytes to the pipe, then write(2)
|
||||
* succeeds immediately, writing all n bytes; otherwise write(2)
|
||||
* fails, with errno set to EAGAIN.
|
||||
*
|
||||
* ... (On Linux, PIPE_BUF is 4096 bytes.)
|
||||
*
|
||||
* ----
|
||||
*
|
||||
* As O_NONBLOCK is set and the messages are less than 4096 bytes,
|
||||
* O_DIRECT should not be needed and we should be safe without it.
|
||||
*
|
||||
* However, to be in the safe side, we first try whether it is supported,
|
||||
* and if not, we create the pipe without O_DIRECT.
|
||||
*/
|
||||
|
||||
MessageQueue* pThis = NULL;
|
||||
|
||||
int fds[2];
|
||||
|
||||
int rv = pipe2(fds, O_NONBLOCK | O_CLOEXEC | O_DIRECT);
|
||||
|
||||
if ((rv != 0) && (errno == EINVAL))
|
||||
{
|
||||
// Ok, apparently the kernel does not support O_DIRECT. Let's try without.
|
||||
rv = pipe2(fds, O_NONBLOCK | O_CLOEXEC);
|
||||
|
||||
if (rv == 0)
|
||||
{
|
||||
// Succeeded, so apparently it was the missing support for O_DIRECT.
|
||||
MXB_WARNING("Platform does not support O_DIRECT in conjunction with pipes, "
|
||||
"using without.");
|
||||
}
|
||||
}
|
||||
|
||||
if (rv == 0)
|
||||
{
|
||||
int read_fd = fds[0];
|
||||
int write_fd = fds[1];
|
||||
#ifdef F_SETPIPE_SZ
|
||||
/**
|
||||
* Increase the pipe buffer size on systems that support it. Modifying
|
||||
* the buffer size of one fd will also increase it for the other.
|
||||
*/
|
||||
if (fcntl(fds[0], F_SETPIPE_SZ, this_unit.pipe_max_size) == -1)
|
||||
{
|
||||
MXB_WARNING("Failed to increase pipe buffer size to '%d': %d, %s",
|
||||
this_unit.pipe_max_size, errno, mxb_strerror(errno));
|
||||
}
|
||||
#endif
|
||||
pThis = new (std::nothrow) MessageQueue(pHandler, read_fd, write_fd);
|
||||
|
||||
if (!pThis)
|
||||
{
|
||||
MXB_OOM();
|
||||
close(read_fd);
|
||||
close(write_fd);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXB_ERROR("Could not create pipe for worker: %s", mxb_strerror(errno));
|
||||
}
|
||||
|
||||
return pThis;
|
||||
}
|
||||
|
||||
bool MessageQueue::post(const Message& message) const
|
||||
{
|
||||
// NOTE: No logging here, this function must be signal safe.
|
||||
bool rv = false;
|
||||
|
||||
mxb_assert(m_pWorker);
|
||||
if (m_pWorker)
|
||||
{
|
||||
/**
|
||||
* This is a stopgap measure to solve MXS-1983 that causes Resource temporarily
|
||||
* unavailable errors. The errors are caused by the pipe buffer being too small to
|
||||
* hold all worker messages. By retrying a limited number of times before giving
|
||||
* up, the success rate for posted messages under heavy load increases
|
||||
* significantly.
|
||||
*/
|
||||
int fast = 0;
|
||||
int slow = 0;
|
||||
const int fast_size = 100;
|
||||
const int slow_limit = 3;
|
||||
ssize_t n;
|
||||
|
||||
while (true)
|
||||
{
|
||||
n = write(m_write_fd, &message, sizeof(message));
|
||||
rv = (n == sizeof(message));
|
||||
|
||||
if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
{
|
||||
if (++fast > fast_size)
|
||||
{
|
||||
fast = 0;
|
||||
|
||||
if (++slow >= slow_limit)
|
||||
{
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
sched_yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (n == -1)
|
||||
{
|
||||
MXB_ERROR("Failed to write message: %d, %s", errno, mxb_strerror(errno));
|
||||
|
||||
static bool warn_pipe_buffer_size = true;
|
||||
|
||||
if ((errno == EAGAIN || errno == EWOULDBLOCK) && warn_pipe_buffer_size)
|
||||
{
|
||||
MXB_ERROR("Consider increasing pipe buffer size (sysctl fs.pipe-max-size)");
|
||||
warn_pipe_buffer_size = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXB_ERROR("Attempt to post using a message queue that is not added to a worker.");
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
bool MessageQueue::add_to_worker(Worker* pWorker)
|
||||
{
|
||||
if (m_pWorker)
|
||||
{
|
||||
m_pWorker->remove_fd(m_read_fd);
|
||||
m_pWorker = NULL;
|
||||
}
|
||||
|
||||
if (pWorker->add_fd(m_read_fd, EPOLLIN, this))
|
||||
{
|
||||
m_pWorker = pWorker;
|
||||
}
|
||||
|
||||
return m_pWorker != NULL;
|
||||
}
|
||||
|
||||
Worker* MessageQueue::remove_from_worker()
|
||||
{
|
||||
Worker* pWorker = m_pWorker;
|
||||
|
||||
if (m_pWorker)
|
||||
{
|
||||
m_pWorker->remove_fd(m_read_fd);
|
||||
m_pWorker = NULL;
|
||||
}
|
||||
|
||||
return pWorker;
|
||||
}
|
||||
|
||||
uint32_t MessageQueue::handle_poll_events(Worker* pWorker, uint32_t events)
|
||||
{
|
||||
uint32_t rc = MXB_POLL_NOP;
|
||||
|
||||
mxb_assert(pWorker == m_pWorker);
|
||||
|
||||
// We only expect EPOLLIN events.
|
||||
mxb_assert(((events & EPOLLIN) != 0) && ((events & ~EPOLLIN) == 0));
|
||||
|
||||
if (events & EPOLLIN)
|
||||
{
|
||||
Message message;
|
||||
|
||||
ssize_t n;
|
||||
|
||||
do
|
||||
{
|
||||
n = read(m_read_fd, &message, sizeof(message));
|
||||
|
||||
if (n == sizeof(message))
|
||||
{
|
||||
m_handler.handle_message(*this, message);
|
||||
}
|
||||
else if (n == -1)
|
||||
{
|
||||
if (errno != EWOULDBLOCK)
|
||||
{
|
||||
MXB_ERROR("Worker could not read from pipe: %s", mxb_strerror(errno));
|
||||
}
|
||||
}
|
||||
else if (n != 0)
|
||||
{
|
||||
// This really should not happen as the pipe is in message mode. We
|
||||
// should either get a message, nothing at all or an error. In non-debug
|
||||
// mode we continue reading in order to empty the pipe as otherwise the
|
||||
// thread may hang.
|
||||
MXB_ERROR("MessageQueue could only read %ld bytes from pipe, although "
|
||||
"expected %lu bytes.", n, sizeof(message));
|
||||
mxb_assert(!true);
|
||||
}
|
||||
}
|
||||
while ((n != 0) && (n != -1));
|
||||
|
||||
rc = MXB_POLL_READ;
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
//static
|
||||
uint32_t MessageQueue::poll_handler(MXB_POLL_DATA* pData, MXB_WORKER* pWorker, uint32_t events)
|
||||
{
|
||||
MessageQueue* pThis = static_cast<MessageQueue*>(pData);
|
||||
|
||||
return pThis->handle_poll_events(static_cast<Worker*>(pWorker), events);
|
||||
}
|
||||
|
||||
}
|
@ -5,3 +5,7 @@ add_test(test_mxb_log test_mxb_log)
|
||||
add_executable(test_semaphore test_semaphore.cc)
|
||||
target_link_libraries(test_semaphore maxbase pthread)
|
||||
add_test(test_semaphore test_semaphore)
|
||||
|
||||
add_executable(test_worker test_worker.cc)
|
||||
target_link_libraries(test_worker maxbase pthread)
|
||||
add_test(test_worker test_worker)
|
||||
|
132
maxutils/maxbase/src/test/test_worker.cc
Normal file
132
maxutils/maxbase/src/test/test_worker.cc
Normal file
@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <maxbase/assert.h>
|
||||
#include <maxbase/log.hh>
|
||||
#include <maxbase/worker.hh>
|
||||
|
||||
using namespace maxbase;
|
||||
using namespace std;
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
// TODO: Put this in some common place.
|
||||
int64_t get_monotonic_time_ms()
|
||||
{
|
||||
struct timespec ts;
|
||||
MXB_AT_DEBUG(int rv =) clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
mxb_assert(rv == 0);
|
||||
|
||||
return ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
|
||||
}
|
||||
|
||||
class TimerTest
|
||||
{
|
||||
public:
|
||||
static int s_ticks;
|
||||
|
||||
TimerTest(Worker* pWorker, int* pRv, int32_t delay)
|
||||
: m_id(s_id++)
|
||||
, m_worker(*pWorker)
|
||||
, m_delay(delay)
|
||||
, m_at(get_monotonic_time_ms() + delay)
|
||||
, m_rv(*pRv)
|
||||
{
|
||||
}
|
||||
|
||||
int32_t delay() const
|
||||
{
|
||||
return m_delay;
|
||||
}
|
||||
|
||||
bool tick(Worker::Call::action_t action)
|
||||
{
|
||||
bool rv = false;
|
||||
|
||||
if (action == Worker::Call::EXECUTE)
|
||||
{
|
||||
int64_t now = get_monotonic_time_ms();
|
||||
int64_t diff = abs(now - m_at);
|
||||
|
||||
cout << m_id << ": " << diff << endl;
|
||||
|
||||
if (diff > 50)
|
||||
{
|
||||
cout << "Error: Difference between expected and happened > 50: " << diff << endl;
|
||||
m_rv = EXIT_FAILURE;
|
||||
}
|
||||
|
||||
m_at += m_delay;
|
||||
|
||||
if (--s_ticks < 0)
|
||||
{
|
||||
m_worker.shutdown();
|
||||
}
|
||||
|
||||
rv = true;
|
||||
}
|
||||
|
||||
return rv;
|
||||
}
|
||||
|
||||
private:
|
||||
static int s_id;
|
||||
|
||||
int m_id;
|
||||
Worker& m_worker;
|
||||
int32_t m_delay;
|
||||
int64_t m_at;
|
||||
int& m_rv;
|
||||
};
|
||||
|
||||
int TimerTest::s_id = 1;
|
||||
int TimerTest::s_ticks;
|
||||
|
||||
int run()
|
||||
{
|
||||
int rv = EXIT_SUCCESS;
|
||||
|
||||
TimerTest::s_ticks = 100;
|
||||
|
||||
Worker w;
|
||||
|
||||
TimerTest t1(&w, &rv, 200);
|
||||
TimerTest t2(&w, &rv, 300);
|
||||
TimerTest t3(&w, &rv, 400);
|
||||
TimerTest t4(&w, &rv, 500);
|
||||
TimerTest t5(&w, &rv, 600);
|
||||
|
||||
w.delayed_call(t1.delay(), &TimerTest::tick, &t1);
|
||||
w.delayed_call(t2.delay(), &TimerTest::tick, &t2);
|
||||
w.delayed_call(t3.delay(), &TimerTest::tick, &t3);
|
||||
w.delayed_call(t4.delay(), &TimerTest::tick, &t4);
|
||||
w.delayed_call(t5.delay(), &TimerTest::tick, &t5);
|
||||
|
||||
w.run();
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int main()
|
||||
{
|
||||
mxb::Log log(MXB_LOG_TARGET_STDOUT);
|
||||
|
||||
maxbase::MessageQueue::init();
|
||||
maxbase::Worker::init();
|
||||
|
||||
return run();
|
||||
}
|
1066
maxutils/maxbase/src/worker.cc
Normal file
1066
maxutils/maxbase/src/worker.cc
Normal file
File diff suppressed because it is too large
Load Diff
55
maxutils/maxbase/src/workertask.cc
Normal file
55
maxutils/maxbase/src/workertask.cc
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2022-01-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include <maxbase/workertask.hh>
|
||||
#include <maxbase/atomic.h>
|
||||
#include <maxbase/assert.h>
|
||||
|
||||
namespace maxbase
|
||||
{
|
||||
|
||||
//
|
||||
// WorkerTask
|
||||
//
|
||||
WorkerTask::~WorkerTask()
|
||||
{
|
||||
}
|
||||
|
||||
//
|
||||
// WorkerDisposableTask
|
||||
//
|
||||
WorkerDisposableTask::WorkerDisposableTask()
|
||||
: m_count(0)
|
||||
{
|
||||
}
|
||||
|
||||
WorkerDisposableTask::~WorkerDisposableTask()
|
||||
{
|
||||
}
|
||||
|
||||
void WorkerDisposableTask::inc_ref()
|
||||
{
|
||||
atomic_add(&m_count, 1);
|
||||
}
|
||||
|
||||
void WorkerDisposableTask::dec_ref()
|
||||
{
|
||||
mxb_assert(atomic_load_int32(&m_count) > 0);
|
||||
|
||||
if (atomic_add(&m_count, -1) == 1)
|
||||
{
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user