MXS-1506: Move all functionality into Housekeeper

The class now does all of the work and the API wraps the calls to the
member methods. Using an STL container makes the list management a lot
more convenient.
This commit is contained in:
Markus Mäkelä
2018-04-02 15:23:51 +03:00
parent 67b2f24be1
commit 96a0aae7fe
4 changed files with 143 additions and 249 deletions

View File

@ -17,7 +17,6 @@
*/ */
#include <maxscale/cdefs.h> #include <maxscale/cdefs.h>
#include <time.h>
#include <maxscale/dcb.h> #include <maxscale/dcb.h>
MXS_BEGIN_DECLS MXS_BEGIN_DECLS
@ -30,7 +29,7 @@ MXS_BEGIN_DECLS
* *
* @return True if the housekeeper mechanism was initialized, false otherwise. * @return True if the housekeeper mechanism was initialized, false otherwise.
*/ */
extern bool hkinit(); bool hkinit();
/** /**
* Shuts down the housekeeper mechanism. * Shuts down the housekeeper mechanism.
@ -39,13 +38,13 @@ extern bool hkinit();
* *
* @see hkinit hkfinish * @see hkinit hkfinish
*/ */
extern void hkshutdown(); void hkshutdown();
/** /**
* Waits for the housekeeper thread to finish. Should be called only after * Waits for the housekeeper thread to finish. Should be called only after
* hkshutdown() has been called. * hkshutdown() has been called.
*/ */
extern void hkfinish(); void hkfinish();
/** /**
* @brief Add a new task * @brief Add a new task
@ -60,10 +59,8 @@ extern void hkfinish();
* @param task Function to execute * @param task Function to execute
* @param data Data passed to function as the parameter * @param data Data passed to function as the parameter
* @param frequency Frequency of execution * @param frequency Frequency of execution
*
* @return 1 if task was added
*/ */
int hktask_add(const char *name, void (*task)(void *) , void *data, int frequency); void hktask_add(const char *name, void (*task)(void *) , void *data, int frequency);
/** /**
* @brief Add oneshot task * @brief Add oneshot task
@ -74,24 +71,20 @@ int hktask_add(const char *name, void (*task)(void *) , void *data, int frequenc
* @param task Function to execute * @param task Function to execute
* @param data Data passed to function as the parameter * @param data Data passed to function as the parameter
* @param when Number of seconds to wait until task is executed * @param when Number of seconds to wait until task is executed
*
* @return 1 if task was added
*/ */
int hktask_oneshot(const char *name, void (*task)(void *) , void *data, int when); void hktask_oneshot(const char *name, void (*task)(void *) , void *data, int when);
/** /**
* @brief Remove a task * @brief Remove all tasks with this name
* *
* @param name Task name * @param name Task name
*
* @return 1 if the task was removed
*/ */
int hktask_remove(const char *name); void hktask_remove(const char *name);
/** /**
* @brief Show the tasks that are scheduled for the house keeper * @brief Show the tasks that are scheduled for the house keeper
* *
* @param pdcb The DCB to send to output * @param pDcb The DCB to send to output
*/ */
void hkshow_tasks(DCB *pdcb); void hkshow_tasks(DCB *pdcb);

View File

@ -12,21 +12,22 @@
*/ */
#include <maxscale/cppdefs.hh> #include <maxscale/cppdefs.hh>
#include <maxscale/housekeeper.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <string> #include <string>
#include <vector>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/atomic.h> #include <maxscale/atomic.h>
#include <maxscale/clock.h> #include <maxscale/clock.h>
#include <maxscale/config.h> #include <maxscale/config.h>
#include <maxscale/housekeeper.h>
#include <maxscale/json_api.h>
#include <maxscale/query_classifier.h>
#include <maxscale/semaphore.h> #include <maxscale/semaphore.h>
#include <maxscale/spinlock.h> #include <maxscale/spinlock.h>
#include <maxscale/spinlock.hh>
#include <maxscale/thread.h> #include <maxscale/thread.h>
#include <maxscale/query_classifier.h>
#include <maxscale/json_api.h>
/** /**
* @file housekeeper.cc Provide a mechanism to run periodic tasks * @file housekeeper.cc Provide a mechanism to run periodic tasks
@ -55,18 +56,30 @@ enum hktask_type
HK_ONESHOT HK_ONESHOT
}; };
/** namespace
* The housekeeper task list
*/
struct HKTASK
{ {
char* name; /*< A simple task name */
void (*task)(void *data); /*< The task to call */ typedef void (*TASKFN)(void *data);
void *data; /*< Data to pass the task */
int frequency; /*< How often to call the tasks (seconds) */ // A task to perform
time_t nextdue; /*< When the task should be next run */ struct Task
enum hktask_type type; /*< The task type */ {
struct HKTASK* next; Task(std::string name, TASKFN func, void* data, int frequency, hktask_type type):
name(name),
func(func),
data(data),
frequency(frequency),
nextdue(time(0) + frequency),
type(type)
{
}
std::string name; /*< Task name */
TASKFN func; /*< The function to call */
void* data; /*< Data to pass to the function */
int frequency; /*< How often to call the tasks, in seconds */
time_t nextdue; /*< When the task should be next run */
hktask_type type; /*< The task type */
}; };
class Housekeeper class Housekeeper
@ -78,10 +91,17 @@ public:
static bool init(); static bool init();
void stop(); void stop();
void run(); void run();
void add(std::string name, TASKFN func, void* data, int frequency, hktask_type type);
void remove(std::string name);
void print_tasks(DCB* pDcb);
json_t* tasks_json(const char* host);
private: private:
THREAD m_thread; THREAD m_thread;
uint32_t m_running; uint32_t m_running;
std::vector<Task> m_tasks;
mxs::SpinLock m_lock;
}; };
// Helper struct used to initialize the housekeeper // Helper struct used to initialize the housekeeper
@ -91,20 +111,11 @@ struct hkinit_result
bool ok; bool ok;
}; };
/**
* List of all tasks that need to be run
*/
static HKTASK* tasks = NULL;
/**
* Spinlock to protect the tasks list
*/
static SPINLOCK tasklock = SPINLOCK_INIT;
static void hkthread(void *);
// The Housekeeper instance // The Housekeeper instance
static Housekeeper* hk = NULL; static Housekeeper* hk = NULL;
static void hkthread(void*);
Housekeeper::Housekeeper(): Housekeeper::Housekeeper():
m_running(1) m_running(1)
{ {
@ -145,34 +156,28 @@ void Housekeeper::run()
atomic_add_int64(&mxs_clock_ticks, 1); atomic_add_int64(&mxs_clock_ticks, 1);
} }
time_t now = time(0); time_t now = time(0);
spinlock_acquire(&tasklock); m_lock.acquire();
HKTASK* ptr = tasks;
while (atomic_load_uint32(&m_running) && ptr) for (auto it = m_tasks.begin(); it != m_tasks.end() && atomic_load_uint32(&m_running); it++)
{ {
if (ptr->nextdue <= now) if (it->nextdue <= now)
{ {
ptr->nextdue = now + ptr->frequency; it->nextdue = now + it->frequency;
// We need to copy type and name, in case hktask_remove is called from // We need to copy type and name, in case hktask_remove is called from
// the callback. Otherwise we will access freed data. // the callback. Otherwise we will access freed data.
enum hktask_type type = ptr->type; enum hktask_type type = it->type;
std::string name = ptr->name; std::string name = it->name;
spinlock_release(&tasklock); m_lock.release();
ptr->task(ptr->data); it->func(it->data);
if (type == HK_ONESHOT) if (type == HK_ONESHOT)
{ {
hktask_remove(name.c_str()); remove(name);
} }
spinlock_acquire(&tasklock); m_lock.acquire();
ptr = tasks;
}
else
{
ptr = ptr->next;
} }
} }
spinlock_release(&tasklock); m_lock.release();
} }
} }
@ -181,144 +186,98 @@ void Housekeeper::stop()
atomic_store_uint32(&m_running, 0); atomic_store_uint32(&m_running, 0);
} }
int hktask_add(const char *name, void (*taskfn)(void *), void *data, int frequency) void Housekeeper::add(std::string name, TASKFN func, void* data, int frequency, hktask_type type)
{ {
HKTASK *task, *ptr; mxs::SpinLockGuard guard(m_lock);
m_tasks.push_back(Task(name, func, data, frequency, type));
}
if ((task = (HKTASK *)MXS_MALLOC(sizeof(HKTASK))) == NULL) void Housekeeper::remove(std::string name)
{
mxs::SpinLockGuard guard(m_lock);
auto it = m_tasks.begin();
while (it != m_tasks.end())
{ {
return 0; if (it->name == name)
}
if ((task->name = MXS_STRDUP(name)) == NULL)
{
MXS_FREE(task);
return 0;
}
task->task = taskfn;
task->data = data;
task->frequency = frequency;
task->type = HK_REPEATED;
task->nextdue = time(0) + frequency;
task->next = NULL;
spinlock_acquire(&tasklock);
ptr = tasks;
while (ptr && ptr->next)
{
if (strcmp(ptr->name, name) == 0)
{ {
spinlock_release(&tasklock); it = m_tasks.erase(it);
MXS_FREE(task->name); continue;
MXS_FREE(task);
return 0;
} }
ptr = ptr->next; it++;
} }
if (ptr)
{
if (strcmp(ptr->name, name) == 0)
{
spinlock_release(&tasklock);
MXS_FREE(task->name);
MXS_FREE(task);
return 0;
}
ptr->next = task;
}
else
{
tasks = task;
}
spinlock_release(&tasklock);
return task->nextdue;
} }
int hktask_oneshot(const char *name, void (*taskfn)(void *), void *data, int when) void Housekeeper::print_tasks(DCB* pDcb)
{ {
HKTASK *task, *ptr; mxs::SpinLockGuard guard(m_lock);
dcb_printf(pDcb, "%-25s | Type | Frequency | Next Due\n", "Name");
dcb_printf(pDcb, "--------------------------+----------+-----------+-------------------------\n");
if ((task = (HKTASK *)MXS_MALLOC(sizeof(HKTASK))) == NULL) for (auto ptr = m_tasks.begin(); ptr != m_tasks.end(); ptr++)
{ {
return 0; struct tm tm;
char buf[40];
localtime_r(&ptr->nextdue, &tm);
asctime_r(&tm, buf);
dcb_printf(pDcb, "%-25s | %-8s | %-9d | %s", ptr->name.c_str(),
ptr->type == HK_REPEATED ? "Repeated" : "One-Shot",
ptr->frequency, buf);
} }
if ((task->name = MXS_STRDUP(name)) == NULL)
{
MXS_FREE(task);
return 0;
}
task->task = taskfn;
task->data = data;
task->frequency = 0;
task->type = HK_ONESHOT;
task->nextdue = time(0) + when;
task->next = NULL;
spinlock_acquire(&tasklock);
ptr = tasks;
while (ptr && ptr->next)
{
ptr = ptr->next;
}
if (ptr)
{
ptr->next = task;
}
else
{
tasks = task;
}
spinlock_release(&tasklock);
return task->nextdue;
} }
int hktask_remove(const char *name) json_t* Housekeeper::tasks_json(const char* host)
{ {
HKTASK *ptr, *lptr = NULL; json_t* arr = json_array();
spinlock_acquire(&tasklock); mxs::SpinLockGuard guard(m_lock);
ptr = tasks;
while (ptr && strcmp(ptr->name, name) != 0)
{
lptr = ptr;
ptr = ptr->next;
}
if (ptr && lptr)
{
lptr->next = ptr->next;
}
else if (ptr)
{
tasks = ptr->next;
}
spinlock_release(&tasklock);
if (ptr) for (auto ptr = m_tasks.begin(); ptr != m_tasks.end(); ptr++)
{ {
MXS_FREE(ptr->name); struct tm tm;
MXS_FREE(ptr); char buf[40];
return 1; localtime_r(&ptr->nextdue, &tm);
} asctime_r(&tm, buf);
else char* nl = strchr(buf, '\n');
{ ss_dassert(nl);
return 0; *nl = '\0';
const char* task_type = ptr->type == HK_REPEATED ? "Repeated" : "One-Shot";
json_t* obj = json_object();
json_object_set_new(obj, CN_ID, json_string(ptr->name.c_str()));
json_object_set_new(obj, CN_TYPE, json_string("tasks"));
json_t* attr = json_object();
json_object_set_new(attr, "task_type", json_string(task_type));
json_object_set_new(attr, "frequency", json_integer(ptr->frequency));
json_object_set_new(attr, "next_execution", json_string(buf));
json_object_set_new(obj, CN_ATTRIBUTES, attr);
json_array_append_new(arr, obj);
} }
return mxs_json_resource(host, MXS_JSON_API_TASKS, arr);
}
}
void hktask_add(const char *name, void (*taskfn)(void *), void *data, int frequency)
{
hk->add(name, taskfn, data, frequency, HK_REPEATED);
}
void hktask_oneshot(const char *name, void (*taskfn)(void *), void *data, int when)
{
hk->add(name, taskfn, data, when, HK_ONESHOT);
}
void hktask_remove(const char *name)
{
hk->remove(name);
} }
/**
* The housekeeper thread implementation.
*
* This function is responsible for executing the housekeeper tasks.
*
* The implementation of the callng of the task functions is such that
* the tasks are called without the tasklock spinlock being held. This
* allows manipulation of the housekeeper task list during execution of
* one of the tasks. The resutl is that upon completion of a task the
* search for tasks to run must restart from the start of the queue.
* It is vital that the task->nextdue tiem is updated before the task
* is run.
*
* @param data Unused, here to satisfy the thread system
*/
void hkthread(void *data) void hkthread(void *data)
{ {
struct hkinit_result* res = (struct hkinit_result*)data; struct hkinit_result* res = (struct hkinit_result*)data;
@ -331,7 +290,10 @@ void hkthread(void *data)
sem_post(&res->sem); sem_post(&res->sem);
hk->run(); if (res->ok)
{
hk->run();
}
qc_thread_end(QC_INIT_BOTH); qc_thread_end(QC_INIT_BOTH);
MXS_NOTICE("Housekeeper shutting down."); MXS_NOTICE("Housekeeper shutting down.");
@ -349,71 +311,18 @@ void hkshutdown()
void hkfinish() void hkfinish()
{ {
ss_dassert(hk);
MXS_NOTICE("Waiting for housekeeper to shut down."); MXS_NOTICE("Waiting for housekeeper to shut down.");
delete hk; delete hk;
hk = NULL;
MXS_NOTICE("Housekeeper has shut down."); MXS_NOTICE("Housekeeper has shut down.");
} }
void hkshow_tasks(DCB *pdcb) void hkshow_tasks(DCB *pDcb)
{ {
HKTASK *ptr; hk->print_tasks(pDcb);
struct tm tm;
char buf[40];
dcb_printf(pdcb, "%-25s | Type | Frequency | Next Due\n", "Name");
dcb_printf(pdcb, "--------------------------+----------+-----------+-------------------------\n");
spinlock_acquire(&tasklock);
ptr = tasks;
while (ptr)
{
localtime_r(&ptr->nextdue, &tm);
asctime_r(&tm, buf);
dcb_printf(pdcb, "%-25s | %-8s | %-9d | %s",
ptr->name,
ptr->type == HK_REPEATED ? "Repeated" : "One-Shot",
ptr->frequency,
buf);
ptr = ptr->next;
}
spinlock_release(&tasklock);
} }
json_t* hk_tasks_json(const char* host) json_t* hk_tasks_json(const char* host)
{ {
json_t* arr = json_array(); return hk->tasks_json(host);
spinlock_acquire(&tasklock);
for (HKTASK* ptr = tasks; ptr; ptr = ptr->next)
{
struct tm tm;
char buf[40];
localtime_r(&ptr->nextdue, &tm);
asctime_r(&tm, buf);
char* nl = strchr(buf, '\n');
ss_dassert(nl);
*nl = '\0';
const char* task_type = ptr->type == HK_REPEATED ? "Repeated" : "One-Shot";
json_t* obj = json_object();
json_object_set_new(obj, CN_ID, json_string(ptr->name));
json_object_set_new(obj, CN_TYPE, json_string("tasks"));
json_t* attr = json_object();
json_object_set_new(attr, "task_type", json_string(task_type));
json_object_set_new(attr, "frequency", json_integer(ptr->frequency));
json_object_set_new(attr, "next_execution", json_string(buf));
json_object_set_new(obj, CN_ATTRIBUTES, attr);
json_array_append_new(arr, obj);
}
spinlock_release(&tasklock);
return mxs_json_resource(host, MXS_JSON_API_TASKS, arr);
} }

View File

@ -1454,14 +1454,8 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf
std::auto_ptr<TaskAssignment> job(new TaskAssignment(task, worker)); std::auto_ptr<TaskAssignment> job(new TaskAssignment(task, worker));
TaskAssignment* pJob = job.release(); TaskAssignment* pJob = job.release();
if (hktask_oneshot(name.str().c_str(), delayed_routing_cb, pJob, seconds)) hktask_oneshot(name.str().c_str(), delayed_routing_cb, pJob, seconds);
{ success = true;
success = true;
}
else
{
delete pJob;
}
} }
catch (std::bad_alloc) catch (std::bad_alloc)
{ {

View File

@ -366,14 +366,12 @@ static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
/** Remove old task and create a new one */ /** Remove old task and create a new one */
hktask_remove(tasknm); hktask_remove(tasknm);
if (!start || hktask_add(tasknm, converter_func, inst, inst->task_delay)) if (start)
{ {
rval = true; hktask_add(tasknm, converter_func, inst, inst->task_delay);
}
else
{
MXS_ERROR("Failed to add binlog to Avro conversion task to housekeeper.");
} }
rval = true;
} }
return rval; return rval;