diff --git a/macros.cmake b/macros.cmake index 8f2a84a12..ba81e91e5 100644 --- a/macros.cmake +++ b/macros.cmake @@ -49,7 +49,7 @@ macro(set_variables) set(STATIC_EMBEDDED TRUE CACHE BOOL "Use static version of libmysqld") # Build RabbitMQ components - set(BUILD_RABBITMQ FALSE CACHE BOOL "Build RabbitMQ components") + set(BUILD_RABBITMQ TRUE CACHE BOOL "Build RabbitMQ components") # Build the binlog router set(BUILD_BINLOG TRUE CACHE BOOL "Build binlog router") diff --git a/server/modules/filter/mqfilter.c b/server/modules/filter/mqfilter.c index e614cf7b4..7d78f20ea 100644 --- a/server/modules/filter/mqfilter.c +++ b/server/modules/filter/mqfilter.c @@ -78,6 +78,8 @@ #include #include #include +#include + MODULE_INFO info = { MODULE_API_FILTER, MODULE_ALPHA_RELEASE, @@ -87,7 +89,7 @@ MODULE_INFO info = { static char *version_str = "V1.0.2"; static int uid_gen; - +static int hktask_id = 0; /* * The filter entry points */ @@ -179,6 +181,16 @@ typedef struct object_trigger_t{ int size; }OBJ_TRIG; +/** + * Statistics for the mqfilter. + */ +typedef struct mqstats_t{ + int n_msg; /*< Total number of messages */ + int n_sent; /*< Number of sent messages */ + int n_queued; /*< Number of unsent messages */ + +}MQSTATS; + /** * A instance structure, containing the hostname, login credentials, @@ -190,7 +202,6 @@ typedef struct object_trigger_t{ * routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged. * */ - typedef struct { int port; char *hostname; @@ -213,12 +224,14 @@ typedef struct { int conn_stat; /**state of the connection to the server*/ int rconn_intv; /**delay for reconnects, in seconds*/ time_t last_rconn; /**last reconnect attempt*/ - SPINLOCK* rconn_lock; + SPINLOCK rconn_lock; + SPINLOCK msg_lock; mqmessage* messages; enum log_trigger_t trgtype; SRC_TRIG* src_trg; SHM_TRIG* shm_trg; OBJ_TRIG* obj_trg; + MQSTATS stats; } MQ_INSTANCE; /** @@ -239,6 +252,9 @@ typedef struct { bool was_query; /**True if the previous routeQuery call had valid content*/ } MQ_SESSION; +void sendMessage(void* data); + + /** * Implementation of the mandatory version entry point * @@ -426,6 +442,8 @@ char** parse_optstr(char* str, char* tok, int* szstore) char *lasts, *tk = str; char **arr; int i = 0, size = 1; + + while((tk = strpbrk(tk + 1,tok))){ size++; } @@ -463,11 +481,12 @@ createInstance(char **options, FILTER_PARAMETER **params) int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0; FILTER_PARAMETER** paramlist; char** arr; + char taskname[512]; - if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))&& - (my_instance->rconn_lock = malloc(sizeof(SPINLOCK)))) + if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))) { - spinlock_init(my_instance->rconn_lock); + spinlock_init(&my_instance->rconn_lock); + spinlock_init(&my_instance->msg_lock); uid_gen = 0; paramlist = malloc(sizeof(FILTER_PARAMETER*)*64); @@ -670,7 +689,8 @@ createInstance(char **options, FILTER_PARAMETER **params) /**Connect to the server*/ init_conn(my_instance); - + snprintf(taskname,511,"mqtask%d",atomic_add(&hktask_id,1)); + hktask_add(taskname,sendMessage,(void*)my_instance,5); } return (FILTER *)my_instance; @@ -690,7 +710,7 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname) int success = 1; amqp_rpc_reply_t reply; - spinlock_acquire(my_instance->rconn_lock); + spinlock_acquire(&my_instance->rconn_lock); amqp_queue_declare(my_instance->conn,my_instance->channel, amqp_cstring_bytes(qname), @@ -717,20 +737,23 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname) "Error : Failed to bind queue to exchange."); } - spinlock_release(my_instance->rconn_lock); + spinlock_release(&my_instance->rconn_lock); return success; } /** * Broadcasts a message on the message stack to the RabbitMQ server - * and frees the allocated memory if successful. - * @return AMQP_STATUS_OK if the broadcasting was successful + * and frees the allocated memory if successful. This function is only called by + * the housekeeper thread. + * @param data MQfilter instance */ -int sendMessage(MQ_INSTANCE *instance) +void sendMessage(void* data) { - int err_code; + MQ_INSTANCE *instance = (MQ_INSTANCE*)data; mqmessage *tmp; - + int err_num; + + spinlock_acquire(&instance->rconn_lock); if(instance->conn_stat != AMQP_STATUS_OK){ if(difftime(time(NULL),instance->last_rconn) > instance->rconn_intv){ @@ -747,29 +770,70 @@ int sendMessage(MQ_INSTANCE *instance) "Error : Failed to reconnect to the MQRabbit server "); } } + err_num = instance->conn_stat; } - - if(instance->messages){ - instance->conn_stat = amqp_basic_publish(instance->conn,instance->channel, + spinlock_release(&instance->rconn_lock); + + if(err_num != AMQP_STATUS_OK) + { + /** No connection to the broker */ + return; + } + + spinlock_acquire(&instance->msg_lock); + tmp = instance->messages; + + if(tmp == NULL) + { + spinlock_release(&instance->msg_lock); + return; + } + + instance->messages = instance->messages->next; + spinlock_release(&instance->msg_lock); + + while(tmp){ + + err_num = amqp_basic_publish(instance->conn,instance->channel, amqp_cstring_bytes(instance->exchange), amqp_cstring_bytes(instance->key), - 0,0,instance->messages->prop,amqp_cstring_bytes(instance->messages->msg)); + 0,0,tmp->prop,amqp_cstring_bytes(tmp->msg)); - - /**Message was sent successfully*/ - if(instance->conn_stat == AMQP_STATUS_OK){ - tmp = instance->messages; - instance->messages = instance->messages->next; + spinlock_acquire(&instance->rconn_lock); + instance->conn_stat = err_num; + spinlock_release(&instance->rconn_lock); + + if(err_num == AMQP_STATUS_OK){ + /**Message was sent successfully*/ free(tmp->prop); free(tmp->msg); free(tmp); + + atomic_add(&instance->stats.n_sent,1); + atomic_add(&instance->stats.n_queued,-1); + spinlock_acquire(&instance->msg_lock); + tmp = instance->messages; + + if(tmp == NULL) + { + spinlock_release(&instance->msg_lock); + return; + } + + instance->messages = instance->messages->next; + spinlock_release(&instance->msg_lock); + } + else + { + spinlock_acquire(&instance->msg_lock); + tmp->next = instance->messages; + instance->messages = tmp; + spinlock_release(&instance->msg_lock); + return; } } - err_code = instance->conn_stat; - - return err_code; } @@ -781,34 +845,30 @@ int sendMessage(MQ_INSTANCE *instance) */ void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg) { - spinlock_acquire(instance->rconn_lock); - mqmessage* newmsg = malloc(sizeof(mqmessage)); + mqmessage* newmsg = calloc(1,sizeof(mqmessage)); if(newmsg){ + newmsg->msg = msg; newmsg->prop = prop; - newmsg->next = NULL; - - if(instance->messages){ - newmsg->next = instance->messages; - } - - instance->messages = newmsg; }else{ skygw_log_write(LOGFILE_ERROR, "Error : Cannot allocate enough memory."); free(prop); free(msg); + return; } - while(instance->messages){ - if(sendMessage(instance) != AMQP_STATUS_OK){ - break; - } - } + spinlock_acquire(&instance->msg_lock); + + newmsg->next = instance->messages; + instance->messages = newmsg; - spinlock_release(instance->rconn_lock); + spinlock_release(&instance->msg_lock); + + atomic_add(&instance->stats.n_msg,1); + atomic_add(&instance->stats.n_queued,1); } @@ -1459,12 +1519,18 @@ diagnostic(FILTER *instance, void *fsession, DCB *dcb) if (my_instance) { - dcb_printf(dcb, "\t\tConnecting to %s:%d as %s/%s.\nVhost: %s\tExchange: %s\tKey: %s\tQueue: %s\n", + dcb_printf(dcb, "Connecting to %s:%d as '%s'.\nVhost: %s\tExchange: %s\nKey: %s\tQueue: %s\n\n", my_instance->hostname,my_instance->port, - my_instance->username,my_instance->password, + my_instance->username, my_instance->vhost, my_instance->exchange, my_instance->key, my_instance->queue ); + dcb_printf(dcb, "%-16s%-16s%-16s\n", + "Messages","Queued","Sent"); + dcb_printf(dcb, "%-16d%-16d%-16d\n", + my_instance->stats.n_msg, + my_instance->stats.n_queued, + my_instance->stats.n_sent); } }