Added stats to mqfilter and moved the actual sending of the messages to a separate housekeeper task.

This commit is contained in:
Markus Makela
2015-02-18 15:20:40 +02:00
parent 2dc553fe6e
commit 8b8512890d
2 changed files with 110 additions and 44 deletions

View File

@ -49,7 +49,7 @@ macro(set_variables)
set(STATIC_EMBEDDED TRUE CACHE BOOL "Use static version of libmysqld") set(STATIC_EMBEDDED TRUE CACHE BOOL "Use static version of libmysqld")
# Build RabbitMQ components # Build RabbitMQ components
set(BUILD_RABBITMQ FALSE CACHE BOOL "Build RabbitMQ components") set(BUILD_RABBITMQ TRUE CACHE BOOL "Build RabbitMQ components")
# Build the binlog router # Build the binlog router
set(BUILD_BINLOG TRUE CACHE BOOL "Build binlog router") set(BUILD_BINLOG TRUE CACHE BOOL "Build binlog router")

View File

@ -78,6 +78,8 @@
#include <spinlock.h> #include <spinlock.h>
#include <session.h> #include <session.h>
#include <plugin.h> #include <plugin.h>
#include <housekeeper.h>
MODULE_INFO info = { MODULE_INFO info = {
MODULE_API_FILTER, MODULE_API_FILTER,
MODULE_ALPHA_RELEASE, MODULE_ALPHA_RELEASE,
@ -87,7 +89,7 @@ MODULE_INFO info = {
static char *version_str = "V1.0.2"; static char *version_str = "V1.0.2";
static int uid_gen; static int uid_gen;
static int hktask_id = 0;
/* /*
* The filter entry points * The filter entry points
*/ */
@ -179,6 +181,16 @@ typedef struct object_trigger_t{
int size; int size;
}OBJ_TRIG; }OBJ_TRIG;
/**
* Statistics for the mqfilter.
*/
typedef struct mqstats_t{
int n_msg; /*< Total number of messages */
int n_sent; /*< Number of sent messages */
int n_queued; /*< Number of unsent messages */
}MQSTATS;
/** /**
* A instance structure, containing the hostname, login credentials, * A instance structure, containing the hostname, login credentials,
@ -190,7 +202,6 @@ typedef struct object_trigger_t{
* routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged. * routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged.
* *
*/ */
typedef struct { typedef struct {
int port; int port;
char *hostname; char *hostname;
@ -213,12 +224,14 @@ typedef struct {
int conn_stat; /**state of the connection to the server*/ int conn_stat; /**state of the connection to the server*/
int rconn_intv; /**delay for reconnects, in seconds*/ int rconn_intv; /**delay for reconnects, in seconds*/
time_t last_rconn; /**last reconnect attempt*/ time_t last_rconn; /**last reconnect attempt*/
SPINLOCK* rconn_lock; SPINLOCK rconn_lock;
SPINLOCK msg_lock;
mqmessage* messages; mqmessage* messages;
enum log_trigger_t trgtype; enum log_trigger_t trgtype;
SRC_TRIG* src_trg; SRC_TRIG* src_trg;
SHM_TRIG* shm_trg; SHM_TRIG* shm_trg;
OBJ_TRIG* obj_trg; OBJ_TRIG* obj_trg;
MQSTATS stats;
} MQ_INSTANCE; } MQ_INSTANCE;
/** /**
@ -239,6 +252,9 @@ typedef struct {
bool was_query; /**True if the previous routeQuery call had valid content*/ bool was_query; /**True if the previous routeQuery call had valid content*/
} MQ_SESSION; } MQ_SESSION;
void sendMessage(void* data);
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
* *
@ -426,6 +442,8 @@ char** parse_optstr(char* str, char* tok, int* szstore)
char *lasts, *tk = str; char *lasts, *tk = str;
char **arr; char **arr;
int i = 0, size = 1; int i = 0, size = 1;
while((tk = strpbrk(tk + 1,tok))){ while((tk = strpbrk(tk + 1,tok))){
size++; size++;
} }
@ -463,11 +481,12 @@ createInstance(char **options, FILTER_PARAMETER **params)
int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0; int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0;
FILTER_PARAMETER** paramlist; FILTER_PARAMETER** paramlist;
char** arr; char** arr;
char taskname[512];
if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))&& if ((my_instance = calloc(1, sizeof(MQ_INSTANCE))))
(my_instance->rconn_lock = malloc(sizeof(SPINLOCK))))
{ {
spinlock_init(my_instance->rconn_lock); spinlock_init(&my_instance->rconn_lock);
spinlock_init(&my_instance->msg_lock);
uid_gen = 0; uid_gen = 0;
paramlist = malloc(sizeof(FILTER_PARAMETER*)*64); paramlist = malloc(sizeof(FILTER_PARAMETER*)*64);
@ -670,7 +689,8 @@ createInstance(char **options, FILTER_PARAMETER **params)
/**Connect to the server*/ /**Connect to the server*/
init_conn(my_instance); init_conn(my_instance);
snprintf(taskname,511,"mqtask%d",atomic_add(&hktask_id,1));
hktask_add(taskname,sendMessage,(void*)my_instance,5);
} }
return (FILTER *)my_instance; return (FILTER *)my_instance;
@ -690,7 +710,7 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname)
int success = 1; int success = 1;
amqp_rpc_reply_t reply; amqp_rpc_reply_t reply;
spinlock_acquire(my_instance->rconn_lock); spinlock_acquire(&my_instance->rconn_lock);
amqp_queue_declare(my_instance->conn,my_instance->channel, amqp_queue_declare(my_instance->conn,my_instance->channel,
amqp_cstring_bytes(qname), amqp_cstring_bytes(qname),
@ -717,20 +737,23 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname)
"Error : Failed to bind queue to exchange."); "Error : Failed to bind queue to exchange.");
} }
spinlock_release(my_instance->rconn_lock); spinlock_release(&my_instance->rconn_lock);
return success; return success;
} }
/** /**
* Broadcasts a message on the message stack to the RabbitMQ server * Broadcasts a message on the message stack to the RabbitMQ server
* and frees the allocated memory if successful. * and frees the allocated memory if successful. This function is only called by
* @return AMQP_STATUS_OK if the broadcasting was successful * the housekeeper thread.
* @param data MQfilter instance
*/ */
int sendMessage(MQ_INSTANCE *instance) void sendMessage(void* data)
{ {
int err_code; MQ_INSTANCE *instance = (MQ_INSTANCE*)data;
mqmessage *tmp; mqmessage *tmp;
int err_num;
spinlock_acquire(&instance->rconn_lock);
if(instance->conn_stat != AMQP_STATUS_OK){ if(instance->conn_stat != AMQP_STATUS_OK){
if(difftime(time(NULL),instance->last_rconn) > instance->rconn_intv){ if(difftime(time(NULL),instance->last_rconn) > instance->rconn_intv){
@ -747,29 +770,70 @@ int sendMessage(MQ_INSTANCE *instance)
"Error : Failed to reconnect to the MQRabbit server "); "Error : Failed to reconnect to the MQRabbit server ");
} }
} }
err_num = instance->conn_stat;
} }
spinlock_release(&instance->rconn_lock);
if(instance->messages){
instance->conn_stat = amqp_basic_publish(instance->conn,instance->channel, if(err_num != AMQP_STATUS_OK)
{
/** No connection to the broker */
return;
}
spinlock_acquire(&instance->msg_lock);
tmp = instance->messages;
if(tmp == NULL)
{
spinlock_release(&instance->msg_lock);
return;
}
instance->messages = instance->messages->next;
spinlock_release(&instance->msg_lock);
while(tmp){
err_num = amqp_basic_publish(instance->conn,instance->channel,
amqp_cstring_bytes(instance->exchange), amqp_cstring_bytes(instance->exchange),
amqp_cstring_bytes(instance->key), amqp_cstring_bytes(instance->key),
0,0,instance->messages->prop,amqp_cstring_bytes(instance->messages->msg)); 0,0,tmp->prop,amqp_cstring_bytes(tmp->msg));
spinlock_acquire(&instance->rconn_lock);
/**Message was sent successfully*/ instance->conn_stat = err_num;
if(instance->conn_stat == AMQP_STATUS_OK){ spinlock_release(&instance->rconn_lock);
tmp = instance->messages;
instance->messages = instance->messages->next; if(err_num == AMQP_STATUS_OK){
/**Message was sent successfully*/
free(tmp->prop); free(tmp->prop);
free(tmp->msg); free(tmp->msg);
free(tmp); free(tmp);
atomic_add(&instance->stats.n_sent,1);
atomic_add(&instance->stats.n_queued,-1);
spinlock_acquire(&instance->msg_lock);
tmp = instance->messages;
if(tmp == NULL)
{
spinlock_release(&instance->msg_lock);
return;
}
instance->messages = instance->messages->next;
spinlock_release(&instance->msg_lock);
}
else
{
spinlock_acquire(&instance->msg_lock);
tmp->next = instance->messages;
instance->messages = tmp;
spinlock_release(&instance->msg_lock);
return;
} }
} }
err_code = instance->conn_stat;
return err_code;
} }
@ -781,34 +845,30 @@ int sendMessage(MQ_INSTANCE *instance)
*/ */
void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg) void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg)
{ {
spinlock_acquire(instance->rconn_lock);
mqmessage* newmsg = malloc(sizeof(mqmessage)); mqmessage* newmsg = calloc(1,sizeof(mqmessage));
if(newmsg){ if(newmsg){
newmsg->msg = msg; newmsg->msg = msg;
newmsg->prop = prop; newmsg->prop = prop;
newmsg->next = NULL;
if(instance->messages){
newmsg->next = instance->messages;
}
instance->messages = newmsg;
}else{ }else{
skygw_log_write(LOGFILE_ERROR, skygw_log_write(LOGFILE_ERROR,
"Error : Cannot allocate enough memory."); "Error : Cannot allocate enough memory.");
free(prop); free(prop);
free(msg); free(msg);
return;
} }
while(instance->messages){ spinlock_acquire(&instance->msg_lock);
if(sendMessage(instance) != AMQP_STATUS_OK){
break; newmsg->next = instance->messages;
} instance->messages = newmsg;
}
spinlock_release(instance->rconn_lock); spinlock_release(&instance->msg_lock);
atomic_add(&instance->stats.n_msg,1);
atomic_add(&instance->stats.n_queued,1);
} }
@ -1459,12 +1519,18 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb)
if (my_instance) if (my_instance)
{ {
dcb_printf(dcb, "\t\tConnecting to %s:%d as %s/%s.\nVhost: %s\tExchange: %s\tKey: %s\tQueue: %s\n", dcb_printf(dcb, "Connecting to %s:%d as '%s'.\nVhost: %s\tExchange: %s\nKey: %s\tQueue: %s\n\n",
my_instance->hostname,my_instance->port, my_instance->hostname,my_instance->port,
my_instance->username,my_instance->password, my_instance->username,
my_instance->vhost, my_instance->exchange, my_instance->vhost, my_instance->exchange,
my_instance->key, my_instance->queue my_instance->key, my_instance->queue
); );
dcb_printf(dcb, "%-16s%-16s%-16s\n",
"Messages","Queued","Sent");
dcb_printf(dcb, "%-16d%-16d%-16d\n",
my_instance->stats.n_msg,
my_instance->stats.n_queued,
my_instance->stats.n_sent);
} }
} }