Remove old polling message system

The old polling message system is obsolete now that the worker messages
are implemented. The old system was only used to clean up the persistent
connection pool of a server.
This commit is contained in:
Markus Mäkelä
2017-04-28 23:34:44 +03:00
parent 4c6e0c75a5
commit dd8a10f466
5 changed files with 49 additions and 67 deletions

View File

@ -61,6 +61,4 @@ void dShowEventStats(DCB *dcb);
int poll_get_stat(POLL_STAT stat);
RESULTSET *eventTimesGetList();
void poll_send_message(enum poll_message msg, void *data);
MXS_END_DECLS

View File

@ -284,7 +284,7 @@ public:
* MyResult& result = task.result();
* @endcode
*/
bool post(Task* pTask, Semaphore* pSem = NULL);
bool post(Task* pTask, Semaphore* pSem = NULL, enum execute_mode_t mode = EXECUTE_AUTO);
/**
* Posts a task to a worker for execution.
@ -296,7 +296,7 @@ public:
*
* @attention Once the task has been executed, it will be deleted.
*/
bool post(std::auto_ptr<DisposableTask> sTask);
bool post(std::auto_ptr<DisposableTask> sTask, enum execute_mode_t mode = EXECUTE_AUTO);
/**
* Posts a task to all workers for execution.
@ -450,7 +450,7 @@ private:
static Worker* create(int id, int epoll_listener_fd);
bool post_disposable(DisposableTask* pTask);
bool post_disposable(DisposableTask* pTask, enum execute_mode_t mode = EXECUTE_AUTO);
void handle_message(MessageQueue& queue, const MessageQueue::Message& msg); // override

View File

@ -43,12 +43,6 @@ using maxscale::Worker;
static int next_epoll_fd = 0; /*< Which thread handles the next DCB */
static int n_threads; /*< Number of threads */
/** Poll cross-thread messaging variables */
static volatile int *poll_msg;
static void *poll_msg_data = NULL;
static SPINLOCK poll_msg_lock = SPINLOCK_INIT;
/**
* Initialise the polling system we are using for the gateway.
*
@ -58,11 +52,6 @@ void
poll_init()
{
n_threads = config_threadcount();
if ((poll_msg = (int*)MXS_CALLOC(n_threads, sizeof(int))) == NULL)
{
exit(-1);
}
}
static bool add_fd_to_worker(int wid, int fd, uint32_t events, MXS_POLL_DATA* data)
@ -422,45 +411,3 @@ eventTimesGetList()
return set;
}
void poll_send_message(enum poll_message msg, void *data)
{
spinlock_acquire(&poll_msg_lock);
int nthr = config_threadcount();
poll_msg_data = data;
for (int i = 0; i < nthr; i++)
{
poll_msg[i] |= msg;
}
/** Handle this thread's message */
poll_check_message();
for (int i = 0; i < nthr; i++)
{
if (i != Worker::get_current_id())
{
while (poll_msg[i] & msg)
{
thread_millisleep(1);
}
}
}
poll_msg_data = NULL;
spinlock_release(&poll_msg_lock);
}
void poll_check_message()
{
int thread_id = Worker::get_current_id();
if (poll_msg[thread_id] & POLL_MSG_CLEAN_PERSISTENT)
{
SERVER *server = (SERVER*)poll_msg_data;
dcb_persistent_clean_count(server->persistent[thread_id], thread_id, false);
atomic_synchronize();
poll_msg[thread_id] &= ~POLL_MSG_CLEAN_PERSISTENT;
}
}

View File

@ -48,9 +48,16 @@
#include <maxscale/ssl.h>
#include <maxscale/alloc.h>
#include <maxscale/paths.h>
#include <maxscale/semaphore.hh>
#include "maxscale/monitor.h"
#include "maxscale/poll.h"
#include "maxscale/workertask.hh"
#include "maxscale/worker.hh"
using maxscale::Semaphore;
using maxscale::Worker;
using maxscale::WorkerTask;
/** The latin1 charset */
#define SERVER_DEFAULT_CHARSET 0x08
@ -524,6 +531,39 @@ dprintAllServersJson(DCB *dcb)
spinlock_release(&server_spin);
}
/**
* A class for cleaning up persistent connections
*/
class CleanupTask : public WorkerTask
{
public:
CleanupTask(const SERVER* server):
m_server(server)
{
}
void execute(Worker& worker)
{
int thread_id = worker.get_current_id();
dcb_persistent_clean_count(m_server->persistent[thread_id], thread_id, false);
}
private:
const SERVER* m_server; /**< Server to clean up */
};
/**
* @brief Clean up any stale persistent connections
*
* This function purges any stale persistent connections from @c server.
*
* @param server Server to clean up
*/
static void cleanup_persistent_connections(const SERVER* server)
{
CleanupTask task(server);
Worker::execute_concurrently(task);
}
/**
* Print server details to a DCB
@ -604,7 +644,7 @@ dprintServer(DCB *dcb, const SERVER *server)
if (server->persistpoolmax)
{
dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent);
poll_send_message(POLL_MSG_CLEAN_PERSISTENT, (void*)server);
cleanup_persistent_connections(server);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax);
dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);

View File

@ -548,11 +548,10 @@ void Worker::set_maxwait(unsigned int maxwait)
this_unit.max_poll_sleep = maxwait;
}
bool Worker::post(Task* pTask, Semaphore* pSem)
bool Worker::post(Task* pTask, Semaphore* pSem, enum execute_mode_t mode)
{
// No logging here, function must be signal safe.
intptr_t arg1 = reinterpret_cast<intptr_t>(pTask);
intptr_t arg2 = reinterpret_cast<intptr_t>(pSem);
bool rval = true;
if (mode == Worker::EXECUTE_AUTO && Worker::get_current() == this)
{
@ -574,14 +573,14 @@ bool Worker::post(Task* pTask, Semaphore* pSem)
return rval;
}
bool Worker::post(std::auto_ptr<DisposableTask> sTask)
bool Worker::post(std::auto_ptr<DisposableTask> sTask, enum execute_mode_t mode)
{
// No logging here, function must be signal safe.
return post_disposable(sTask.release());
return post_disposable(sTask.release(), mode);
}
// private
bool Worker::post_disposable(DisposableTask* pTask)
bool Worker::post_disposable(DisposableTask* pTask, enum execute_mode_t mode)
{
bool posted = true;
@ -1086,8 +1085,6 @@ void Worker::poll_waitevents()
/** Process closed DCBs */
dcb_process_zombies(m_id);
poll_check_message();
m_state = IDLE;
} /*< while(1) */