From 29b6e53eb864dd76a7733cfd0310d8381c94d7db Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 8 Jan 2019 12:25:55 +0200 Subject: [PATCH] MXS-2218 Replace mq housekeeper task with delayed call --- server/modules/filter/mqfilter/mqfilter.cc | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/server/modules/filter/mqfilter/mqfilter.cc b/server/modules/filter/mqfilter/mqfilter.cc index b1a28f916..96c3f75d5 100644 --- a/server/modules/filter/mqfilter/mqfilter.cc +++ b/server/modules/filter/mqfilter/mqfilter.cc @@ -76,7 +76,7 @@ #include #include #include -#include +#include #include static int uid_gen; @@ -241,7 +241,7 @@ typedef struct bool was_query; /**True if the previous routeQuery call had valid content*/ } MQ_SESSION; -bool sendMessage(void* data); +bool sendMessage(mxb::Worker::Call::action_t action, MQ_INSTANCE* instance); static const MXS_ENUM_VALUE trigger_values[] = { @@ -659,9 +659,9 @@ static MXS_FILTER* createInstance(const char* name, MXS_CONFIG_PARAMETER* params /**Connect to the server*/ init_conn(my_instance); - char taskname[512]; - snprintf(taskname, 511, "mqtask%d", atomic_add(&hktask_id, 1)); - hktask_add(taskname, sendMessage, (void*)my_instance, 5); + mxb::Worker& worker = mxs::MainWorker::get(); + + worker.delayed_call(5000, sendMessage, my_instance); } return (MXS_FILTER*)my_instance; @@ -718,9 +718,13 @@ int declareQueue(MQ_INSTANCE* my_instance, MQ_SESSION* my_session, char* qname) * the housekeeper thread. * @param data MQfilter instance */ -bool sendMessage(void* data) +bool sendMessage(mxb::Worker::Call::action_t action, MQ_INSTANCE* instance) { - MQ_INSTANCE* instance = (MQ_INSTANCE*) data; + if (action == mxb::Worker::Call::CANCEL) + { + return true; + } + mqmessage* tmp; int err_num = AMQP_STATUS_OK;