MXS-1506: Combine housekeeper task types

The tasks themselves now control whether they are executed again. To
compare it to the old system, oneshot tasks now return `false` and
repeating tasks return `true`.

Letting the housekeeper remove the tasks makes the code simpler and
removes the possibility of the task being removed while it is being
executed. It does introduce a deadlock possibility if a housekeeper
function is called inside a housekeeper task.
This commit is contained in:
Markus Mäkelä
2018-04-03 15:12:33 +03:00
parent 96a0aae7fe
commit c70216390f
10 changed files with 105 additions and 105 deletions

View File

@ -21,6 +21,17 @@
MXS_BEGIN_DECLS
/**
* The task callback function
*
* The parameter is the user data given to the `hktask_add` function.
*
* If the function returns true, the same task is added back to the queue and
* executed again at a later point in time. If the function returns false,
* the task is removed.
*/
typedef bool (*TASKFN)(void *data);
/**
* Initialises the housekeeper mechanism.
*
@ -60,19 +71,7 @@ void hkfinish();
* @param data Data passed to function as the parameter
* @param frequency Frequency of execution
*/
void hktask_add(const char *name, void (*task)(void *) , void *data, int frequency);
/**
* @brief Add oneshot task
*
* The task will only execute once.
*
* @param name Task name
* @param task Function to execute
* @param data Data passed to function as the parameter
* @param when Number of seconds to wait until task is executed
*/
void hktask_oneshot(const char *name, void (*task)(void *) , void *data, int when);
void hktask_add(const char *name, TASKFN func, void *data, int frequency);
/**
* @brief Remove all tasks with this name

View File

@ -15,7 +15,7 @@
#include <stdlib.h>
#include <string.h>
#include <string>
#include <vector>
#include <list>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
@ -42,6 +42,8 @@
* is incremented every 100ms and can be read with the mxs_clock() function.
*/
static void hkthread(void*);
// TODO: Move these into a separate file
static int64_t mxs_clock_ticks = 0; /*< One clock tick is 100 milliseconds */
@ -50,36 +52,40 @@ int64_t mxs_clock()
return atomic_load_int64(&mxs_clock_ticks);
}
enum hktask_type
{
HK_REPEATED = 1,
HK_ONESHOT
};
namespace
{
typedef void (*TASKFN)(void *data);
// A task to perform
struct Task
{
Task(std::string name, TASKFN func, void* data, int frequency, hktask_type type):
Task(std::string name, TASKFN func, void* data, int frequency):
name(name),
func(func),
data(data),
frequency(frequency),
nextdue(time(0) + frequency),
type(type)
nextdue(time(0) + frequency)
{
}
struct NameMatch
{
NameMatch(std::string name):
m_name(name) {}
bool operator()(const Task& task)
{
return task.name == m_name;
}
std::string m_name;
};
std::string name; /*< Task name */
TASKFN func; /*< The function to call */
void* data; /*< Data to pass to the function */
int frequency; /*< How often to call the tasks, in seconds */
time_t nextdue; /*< When the task should be next run */
hktask_type type; /*< The task type */
};
class Housekeeper
@ -91,7 +97,7 @@ public:
static bool init();
void stop();
void run();
void add(std::string name, TASKFN func, void* data, int frequency, hktask_type type);
void add(const Task& task);
void remove(std::string name);
void print_tasks(DCB* pDcb);
@ -100,8 +106,13 @@ public:
private:
THREAD m_thread;
uint32_t m_running;
std::vector<Task> m_tasks;
std::list<Task> m_tasks;
mxs::SpinLock m_lock;
bool is_running() const
{
return atomic_load_uint32(&m_running);
}
};
// Helper struct used to initialize the housekeeper
@ -114,8 +125,6 @@ struct hkinit_result
// The Housekeeper instance
static Housekeeper* hk = NULL;
static void hkthread(void*);
Housekeeper::Housekeeper():
m_running(1)
{
@ -148,36 +157,33 @@ bool Housekeeper::init()
void Housekeeper::run()
{
while (atomic_load_uint32(&m_running))
while (is_running())
{
for (int i = 0; i < 10; i++)
{
thread_millisleep(100);
atomic_add_int64(&mxs_clock_ticks, 1);
}
time_t now = time(0);
m_lock.acquire();
for (auto it = m_tasks.begin(); it != m_tasks.end() && atomic_load_uint32(&m_running); it++)
mxs::SpinLockGuard guard(m_lock);
time_t now = time(0);
auto it = m_tasks.begin();
while (it != m_tasks.end() && is_running())
{
if (it->nextdue <= now)
{
it->nextdue = now + it->frequency;
// We need to copy type and name, in case hktask_remove is called from
// the callback. Otherwise we will access freed data.
enum hktask_type type = it->type;
std::string name = it->name;
m_lock.release();
it->func(it->data);
if (type == HK_ONESHOT)
if (!it->func(it->data))
{
remove(name);
it = m_tasks.erase(it);
continue;
}
m_lock.acquire();
}
it++;
}
m_lock.release();
}
}
@ -186,26 +192,16 @@ void Housekeeper::stop()
atomic_store_uint32(&m_running, 0);
}
void Housekeeper::add(std::string name, TASKFN func, void* data, int frequency, hktask_type type)
void Housekeeper::add(const Task& task)
{
mxs::SpinLockGuard guard(m_lock);
m_tasks.push_back(Task(name, func, data, frequency, type));
m_tasks.push_back(task);
}
void Housekeeper::remove(std::string name)
{
mxs::SpinLockGuard guard(m_lock);
auto it = m_tasks.begin();
while (it != m_tasks.end())
{
if (it->name == name)
{
it = m_tasks.erase(it);
continue;
}
it++;
}
m_tasks.remove_if(Task::NameMatch(name));
}
void Housekeeper::print_tasks(DCB* pDcb)
@ -220,9 +216,7 @@ void Housekeeper::print_tasks(DCB* pDcb)
char buf[40];
localtime_r(&ptr->nextdue, &tm);
asctime_r(&tm, buf);
dcb_printf(pDcb, "%-25s | %-8s | %-9d | %s", ptr->name.c_str(),
ptr->type == HK_REPEATED ? "Repeated" : "One-Shot",
ptr->frequency, buf);
dcb_printf(pDcb, "%-25s | %-9d | %s", ptr->name.c_str(), ptr->frequency, buf);
}
}
@ -242,15 +236,12 @@ json_t* Housekeeper::tasks_json(const char* host)
ss_dassert(nl);
*nl = '\0';
const char* task_type = ptr->type == HK_REPEATED ? "Repeated" : "One-Shot";
json_t* obj = json_object();
json_object_set_new(obj, CN_ID, json_string(ptr->name.c_str()));
json_object_set_new(obj, CN_TYPE, json_string("tasks"));
json_t* attr = json_object();
json_object_set_new(attr, "task_type", json_string(task_type));
json_object_set_new(attr, "frequency", json_integer(ptr->frequency));
json_object_set_new(attr, "next_execution", json_string(buf));
@ -263,14 +254,10 @@ json_t* Housekeeper::tasks_json(const char* host)
}
void hktask_add(const char *name, void (*taskfn)(void *), void *data, int frequency)
void hktask_add(const char *name, TASKFN func, void *data, int frequency)
{
hk->add(name, taskfn, data, frequency, HK_REPEATED);
}
void hktask_oneshot(const char *name, void (*taskfn)(void *), void *data, int when)
{
hk->add(name, taskfn, data, when, HK_ONESHOT);
Task task(name, func, data, frequency);
hk->add(task);
}
void hktask_remove(const char *name)

View File

@ -96,7 +96,7 @@ static int find_type(typelib_t* tl, const char* needle, int maxlen);
static void service_add_qualified_param(SERVICE* svc,
MXS_CONFIG_PARAMETER* param);
static void service_internal_restart(void *data);
static bool service_internal_restart(void *data);
static void service_calculate_weights(SERVICE *service);
SERVICE* service_alloc(const char *name, const char *router)
@ -427,8 +427,7 @@ int serviceStartAllPorts(SERVICE* service)
int retry_after = MXS_MIN(service->stats.n_failed_starts * 10, service->max_retry_interval);
snprintf(taskname, sizeof(taskname), "%s_start_retry_%d",
service->name, service->stats.n_failed_starts);
hktask_oneshot(taskname, service_internal_restart,
(void*) service, retry_after);
hktask_add(taskname, service_internal_restart, service, retry_after);
MXS_NOTICE("Failed to start service %s, retrying in %d seconds.",
service->name, retry_after);
@ -2212,10 +2211,11 @@ serviceGetList()
* Function called by the housekeeper thread to retry starting of a service
* @param data Service to restart
*/
static void service_internal_restart(void *data)
static bool service_internal_restart(void *data)
{
SERVICE* service = (SERVICE*)data;
serviceStartAllPorts(service);
return false;
}
/**

View File

@ -1432,11 +1432,12 @@ struct TaskAssignment
Worker* worker;
};
static void delayed_routing_cb(void* data)
static bool delayed_routing_cb(void* data)
{
TaskAssignment* job = static_cast<TaskAssignment*>(data);
job->worker->post(job->task, mxs::Worker::EXECUTE_QUEUED);
delete job;
return false;
}
bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds)
@ -1454,7 +1455,7 @@ bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buf
std::auto_ptr<TaskAssignment> job(new TaskAssignment(task, worker));
TaskAssignment* pJob = job.release();
hktask_oneshot(name.str().c_str(), delayed_routing_cb, pJob, seconds);
hktask_add(name.str().c_str(), delayed_routing_cb, pJob, seconds);
success = true;
}
catch (std::bad_alloc)

View File

@ -238,7 +238,7 @@ typedef struct
bool was_query; /**True if the previous routeQuery call had valid content*/
} MQ_SESSION;
void sendMessage(void* data);
bool sendMessage(void* data);
static const MXS_ENUM_VALUE trigger_values[] =
{
@ -673,7 +673,7 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname)
* the housekeeper thread.
* @param data MQfilter instance
*/
void sendMessage(void* data)
bool sendMessage(void* data)
{
MQ_INSTANCE *instance = (MQ_INSTANCE*) data;
mqmessage *tmp;
@ -704,7 +704,7 @@ void sendMessage(void* data)
if (err_num != AMQP_STATUS_OK)
{
/** No connection to the broker */
return;
return true;
}
spinlock_acquire(&instance->msg_lock);
@ -713,7 +713,7 @@ void sendMessage(void* data)
if (tmp == NULL)
{
spinlock_release(&instance->msg_lock);
return;
return true;
}
instance->messages = instance->messages->next;
@ -745,7 +745,7 @@ void sendMessage(void* data)
if (tmp == NULL)
{
spinlock_release(&instance->msg_lock);
return;
return true;
}
instance->messages = instance->messages->next;
@ -757,9 +757,11 @@ void sendMessage(void* data)
tmp->next = instance->messages;
instance->messages = tmp;
spinlock_release(&instance->msg_lock);
return;
return true;
}
}
return true;
}
/**

View File

@ -77,7 +77,7 @@ static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
static uint64_t getCapabilities(MXS_ROUTER* instance);
extern int MaxScaleUptime();
extern void avro_get_used_tables(AVRO_INSTANCE *router, DCB *dcb);
void converter_func(void* data);
bool converter_func(void* data);
bool binlog_next_file_exists(const char* binlogdir, const char* binlog);
int blr_file_get_next_binlogname(const char *router);
bool avro_load_conversion_state(AVRO_INSTANCE *router);
@ -1177,7 +1177,7 @@ stats_func(void *inst)
/**
* Conversion task: MySQL binlogs to AVRO files
*/
void converter_func(void* data)
bool converter_func(void* data)
{
AVRO_INSTANCE* router = (AVRO_INSTANCE*) data;
bool ok = true;
@ -1227,6 +1227,8 @@ void converter_func(void* data)
router->binlog_name, router->current_pos, router->task_delay);
}
}
return true;
}
/**

View File

@ -97,7 +97,7 @@ bool blr_get_encryption_key(ROUTER_INSTANCE *router);
int blr_parse_key_file(ROUTER_INSTANCE *router);
static bool blr_open_gtid_maps_storage(ROUTER_INSTANCE *inst);
static void stats_func(void *);
static bool stats_func(void *);
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
@ -2505,7 +2505,7 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
*
* @param inst The router instance
*/
static void
static bool
stats_func(void *inst)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
@ -2531,6 +2531,8 @@ stats_func(void *inst)
slave = slave->next;
}
spinlock_release(&router->lock);
return true;
}
/**

View File

@ -1018,7 +1018,7 @@ extern const char *blr_skip_leading_sql_comments(const char *);
extern bool blr_fetch_mariadb_gtid(ROUTER_SLAVE *,
const char *,
MARIADB_GTID_INFO *);
extern void blr_start_master_in_main(void* data);
extern bool blr_start_master_in_main(void* data);
extern bool blr_binlog_file_exists(ROUTER_INSTANCE *router,
const MARIADB_GTID_INFO *info_file);

View File

@ -64,7 +64,7 @@ static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col);
void poll_fake_write_event(DCB *dcb);
static void blr_check_last_master_event(void *inst);
static bool blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
@ -251,10 +251,10 @@ static void blr_start_master(void* data)
if (name)
{
sprintf(name, "%s %s", router->service->name, master);
hktask_oneshot(name,
blr_start_master_in_main,
router,
connect_retry);
hktask_add(name,
blr_start_master_in_main,
router,
connect_retry);
MXS_FREE(name);
}
@ -324,7 +324,7 @@ static void worker_cb_start_master(int worker_id, void* data)
*
* @param data Data intended for `blr_start_master`.
*/
void blr_start_master_in_main(void* data)
bool blr_start_master_in_main(void* data)
{
// The master should be connected to in the main worker, so we post it a
// message and call `blr_start_master` there.
@ -339,6 +339,8 @@ void blr_start_master_in_main(void* data)
{
MXS_ERROR("Could not post 'blr_start_master' message to main worker.");
}
return false;
}
/**
@ -423,10 +425,10 @@ blr_restart_master(ROUTER_INSTANCE *router)
if (name)
{
sprintf(name, "%s %s", router->service->name, master);
hktask_oneshot(name,
blr_start_master_in_main,
router,
connect_retry);
hktask_add(name,
blr_start_master_in_main,
router,
connect_retry);
MXS_FREE(name);
MXS_ERROR("%s: failed to connect to master server '%s', "
@ -529,10 +531,10 @@ blr_master_delayed_connect(ROUTER_INSTANCE *router)
if (name)
{
sprintf(name, "%s %s", router->service->name, master);
hktask_oneshot(name,
blr_start_master_in_main,
router,
router->retry_interval);
hktask_add(name,
blr_start_master_in_main,
router,
router->retry_interval);
MXS_FREE(name);
}
}
@ -1917,9 +1919,10 @@ blr_stop_start_master(ROUTER_INSTANCE *router)
* @param router Current router instance
*/
static void
static bool
blr_check_last_master_event(void *inst)
{
bool rval = true;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
int master_check = 1;
int master_state = BLRM_UNCONNECTED;
@ -1953,8 +1956,10 @@ blr_check_last_master_event(void *inst)
"%s heartbeat",
router->service->name);
hktask_remove(task_name);
rval = false;
}
return rval;
}
/**

View File

@ -279,7 +279,7 @@ static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router,
int type,
int len,
uint8_t seqno);
static void blr_send_slave_heartbeat(void *inst);
static bool blr_send_slave_heartbeat(void *inst);
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave);
static int blr_set_master_ssl(ROUTER_INSTANCE *router,
@ -6099,7 +6099,7 @@ blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router,
* @param router Current router instance
*/
static void
static bool
blr_send_slave_heartbeat(void *inst)
{
ROUTER_SLAVE *sptr = NULL;
@ -6136,6 +6136,8 @@ blr_send_slave_heartbeat(void *inst)
}
spinlock_release(&router->lock);
return true;
}
/**