From 84d2c72db2c9daa7cdc33ecd373577548b116460 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 19 Nov 2015 05:34:36 +0200 Subject: [PATCH] Formatted mqfilter Mqfilter formatted according to the style guide. --- server/modules/filter/mqfilter.c | 2185 ++++++++++++++++-------------- 1 file changed, 1170 insertions(+), 1015 deletions(-) diff --git a/server/modules/filter/mqfilter.c b/server/modules/filter/mqfilter.c index 1587240bd..eace7fcb8 100644 --- a/server/modules/filter/mqfilter.c +++ b/server/modules/filter/mqfilter.c @@ -18,7 +18,7 @@ /** * @file mqfilter.c - * MQ Filter - AMQP Filter. + * MQ Filter - AMQP Filter. * A filter that logs and publishes canonized queries on to a RabbitMQ server. * * The filter reads the routed query, forms a canonized version of it and publishes the @@ -28,7 +28,7 @@ * * The filter makes no attempt to deal with queries that do not fit * in a single GWBUF or result sets that span multiple GWBUFs. - * + * * To use a SSL connection the CA certificate, the client certificate and the client public key must be provided. * By default this filter uses a TCP connection. *@verbatim @@ -49,7 +49,7 @@ * ssl_CA_cert Path to the CA certificate in PEM format * ssl_client_cert Path to the client cerificate in PEM format * ssl_client_key Path to the client public key in PEM format - * + * * The logging trigger levels are: * all Log everything * source Trigger on statements originating from a particular source (database user and host combination) @@ -80,11 +80,12 @@ #include #include -MODULE_INFO info = { - MODULE_API_FILTER, - MODULE_ALPHA_RELEASE, - FILTER_VERSION, - "A RabbitMQ query logging filter" +MODULE_INFO info = +{ + MODULE_API_FILTER, + MODULE_ALPHA_RELEASE, + FILTER_VERSION, + "A RabbitMQ query logging filter" }; static char *version_str = "V1.0.2"; @@ -93,46 +94,49 @@ static int hktask_id = 0; /* * The filter entry points */ -static FILTER *createInstance(char **options, FILTER_PARAMETER **); -static void *newSession(FILTER *instance, SESSION *session); -static void closeSession(FILTER *instance, void *session); -static void freeSession(FILTER *instance, void *session); -static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); -static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); -static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); -static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); -static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); -static FILTER_OBJECT MyObject = { - createInstance, - newSession, - closeSession, - freeSession, - setDownstream, - setUpstream, - routeQuery, - clientReply, - diagnostic, +static FILTER_OBJECT MyObject = +{ + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + setUpstream, + routeQuery, + clientReply, + diagnostic, }; /** *Structure used to store messages and their properties. */ -typedef struct mqmessage_t { - amqp_basic_properties_t *prop; - char *msg; - struct mqmessage_t *next; -}mqmessage; +typedef struct mqmessage_t +{ + amqp_basic_properties_t *prop; + char *msg; + struct mqmessage_t *next; +} mqmessage; /** *Logging trigger levels */ -enum log_trigger_t{ - TRG_ALL = 0x00, - TRG_SOURCE = 0x01, - TRG_SCHEMA = 0x02, - TRG_OBJECT = 0x04 +enum log_trigger_t +{ + TRG_ALL = 0x00, + TRG_SOURCE = 0x01, + TRG_SCHEMA = 0x02, + TRG_OBJECT = 0x04 }; /** @@ -146,92 +150,94 @@ enum log_trigger_t{ * logging_source_host Comma-separated list of hostnames to log * @endverbatim */ -typedef struct source_trigger_t{ - char** user; - int usize; - char** host; - int hsize; -}SRC_TRIG; +typedef struct source_trigger_t +{ + char** user; + int usize; + char** host; + int hsize; +} SRC_TRIG; /** * Schema logging trigger * * Log only those queries that target a specific database. - * + * * Trigger options: * logging_schema Comma-separated list of databases */ -typedef struct schema_trigger_t{ - char** objects; - int size; -}SHM_TRIG; - +typedef struct schema_trigger_t +{ + char** objects; + int size; +} SHM_TRIG; /** * Database object logging trigger * * Log only those queries that target specific database objects. - *@verbatim + *@verbatim * Trigger options: * logging_object Comma-separated list of database objects *@endverbatim */ -typedef struct object_trigger_t{ - char** objects; - int size; -}OBJ_TRIG; +typedef struct object_trigger_t +{ + char** objects; + int size; +} OBJ_TRIG; /** * Statistics for the mqfilter. */ -typedef struct mqstats_t{ +typedef struct mqstats_t +{ int n_msg; /*< Total number of messages */ int n_sent; /*< Number of sent messages */ int n_queued; /*< Number of unsent messages */ - -}MQSTATS; - +} MQSTATS; /** * A instance structure, containing the hostname, login credentials, * virtual host location and the names of the exchange and the key. * Also contains the paths to the CA certificate and client certificate and key. - * + * * Default values assume that a local RabbitMQ server is running on port 5672 with the default * user 'guest' and the password 'guest' using a default exchange named 'default_exchange' with a - * routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged. - * + * routing key named 'key'. Type of the exchange is 'direct' by default and all queries are logged. + * */ -typedef struct { - int port; - char *hostname; - char *username; - char *password; - char *vhost; - char *exchange; - char *exchange_type; - char *key; - char *queue; - bool use_ssl; - bool log_all; - bool strict_logging; - char *ssl_CA_cert; - char *ssl_client_cert; - char *ssl_client_key; - amqp_connection_state_t conn; /**The connection object*/ - amqp_socket_t* sock; /**The currently active socket*/ - amqp_channel_t channel; /**The current channel in use*/ - int conn_stat; /**state of the connection to the server*/ - int rconn_intv; /**delay for reconnects, in seconds*/ - time_t last_rconn; /**last reconnect attempt*/ - SPINLOCK rconn_lock; - SPINLOCK msg_lock; - mqmessage* messages; - enum log_trigger_t trgtype; - SRC_TRIG* src_trg; - SHM_TRIG* shm_trg; - OBJ_TRIG* obj_trg; - MQSTATS stats; +typedef struct +{ + int port; + char *hostname; + char *username; + char *password; + char *vhost; + char *exchange; + char *exchange_type; + char *key; + char *queue; + bool use_ssl; + bool log_all; + bool strict_logging; + char *ssl_CA_cert; + char *ssl_client_cert; + char *ssl_client_key; + amqp_connection_state_t conn; /**The connection object*/ + amqp_socket_t* sock; /**The currently active socket*/ + amqp_channel_t channel; /**The current channel in use*/ + int conn_stat; /**state of the connection to the server*/ + int rconn_intv; /**delay for reconnects, in seconds*/ + time_t last_rconn; /**last reconnect attempt*/ + SPINLOCK rconn_lock; + SPINLOCK msg_lock; + mqmessage* messages; + enum log_trigger_t trgtype; + SRC_TRIG* src_trg; + SHM_TRIG* shm_trg; + OBJ_TRIG* obj_trg; + MQSTATS stats; } MQ_INSTANCE; /** @@ -243,18 +249,18 @@ typedef struct { * Also holds the necessary session connection information. * */ -typedef struct { - char* uid; /**Unique identifier used to tag messages*/ - char* db; /**The currently active database*/ - DOWNSTREAM down; - UPSTREAM up; - SESSION* session; - bool was_query; /**True if the previous routeQuery call had valid content*/ +typedef struct +{ + char* uid; /**Unique identifier used to tag messages*/ + char* db; /**The currently active database*/ + DOWNSTREAM down; + UPSTREAM up; + SESSION* session; + bool was_query; /**True if the previous routeQuery call had valid content*/ } MQ_SESSION; void sendMessage(void* data); - /** * Implementation of the mandatory version entry point * @@ -263,7 +269,7 @@ void sendMessage(void* data); char * version() { - return version_str; + return version_str; } /** @@ -286,134 +292,154 @@ ModuleInit() FILTER_OBJECT * GetModuleObject() { - return &MyObject; + return &MyObject; } /** - * Internal function used to initialize the connection to + * Internal function used to initialize the connection to * the RabbitMQ server. Also used to reconnect to the server * in case the connection fails and to redeclare exchanges * and queues if they are lost - * + * */ -static int +static int init_conn(MQ_INSTANCE *my_instance) -{ - int rval = 0; - int amqp_ok = AMQP_STATUS_OK; +{ + int rval = 0; + int amqp_ok = AMQP_STATUS_OK; - if(my_instance->use_ssl){ + if (my_instance->use_ssl) + { - if((my_instance->sock = amqp_ssl_socket_new(my_instance->conn)) != NULL){ + if ((my_instance->sock = amqp_ssl_socket_new(my_instance->conn)) != NULL) + { - if((amqp_ok = amqp_ssl_socket_set_cacert(my_instance->sock,my_instance->ssl_CA_cert)) != AMQP_STATUS_OK){ - MXS_ERROR("Failed to set CA certificate: %s", amqp_error_string2(amqp_ok)); - goto cleanup; - } - if((amqp_ok = amqp_ssl_socket_set_key(my_instance->sock, - my_instance->ssl_client_cert, - my_instance->ssl_client_key)) != AMQP_STATUS_OK){ - MXS_ERROR("Failed to set client certificate and key: %s", amqp_error_string2(amqp_ok)); - goto cleanup; - } - }else{ + if ((amqp_ok = amqp_ssl_socket_set_cacert(my_instance->sock, my_instance->ssl_CA_cert)) != AMQP_STATUS_OK) + { + MXS_ERROR("Failed to set CA certificate: %s", amqp_error_string2(amqp_ok)); + goto cleanup; + } + if ((amqp_ok = amqp_ssl_socket_set_key(my_instance->sock, + my_instance->ssl_client_cert, + my_instance->ssl_client_key)) != AMQP_STATUS_OK) + { + MXS_ERROR("Failed to set client certificate and key: %s", amqp_error_string2(amqp_ok)); + goto cleanup; + } + } + else + { + + amqp_ok = AMQP_STATUS_SSL_CONNECTION_FAILED; + MXS_ERROR("SSL socket creation failed."); + goto cleanup; + } + + /**SSL is not used, falling back to TCP*/ + } + else if ((my_instance->sock = amqp_tcp_socket_new(my_instance->conn)) == NULL) + { + MXS_ERROR("TCP socket creation failed."); + goto cleanup; - amqp_ok = AMQP_STATUS_SSL_CONNECTION_FAILED; - MXS_ERROR("SSL socket creation failed."); - goto cleanup; } - /**SSL is not used, falling back to TCP*/ - }else if((my_instance->sock = amqp_tcp_socket_new(my_instance->conn)) == NULL){ - MXS_ERROR("TCP socket creation failed."); - goto cleanup; - - } - - /**Socket creation was successful, trying to open the socket*/ - if((amqp_ok = amqp_socket_open(my_instance->sock,my_instance->hostname,my_instance->port)) != AMQP_STATUS_OK){ - MXS_ERROR("Failed to open socket: %s", amqp_error_string2(amqp_ok)); - goto cleanup; - } - amqp_rpc_reply_t reply; - reply = amqp_login(my_instance->conn,my_instance->vhost,0,AMQP_DEFAULT_FRAME_SIZE,0,AMQP_SASL_METHOD_PLAIN,my_instance->username,my_instance->password); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Login to RabbitMQ server failed."); - goto cleanup; - } - amqp_channel_open(my_instance->conn,my_instance->channel); - reply = amqp_get_rpc_reply(my_instance->conn); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Channel creation failed."); - goto cleanup; - } - - amqp_exchange_declare(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(my_instance->exchange), - amqp_cstring_bytes(my_instance->exchange_type), - 0, 1, - amqp_empty_table); - - reply = amqp_get_rpc_reply(my_instance->conn); - - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Exchange declaration failed,trying to redeclare the exchange."); - if(reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION){ - if(reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD){ - amqp_send_method(my_instance->conn,my_instance->channel,AMQP_CHANNEL_CLOSE_OK_METHOD,NULL); - }else if(reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD){ - amqp_send_method(my_instance->conn,my_instance->channel,AMQP_CONNECTION_CLOSE_OK_METHOD,NULL); - } - - my_instance->channel++; - amqp_channel_open(my_instance->conn,my_instance->channel); - - amqp_exchange_delete(my_instance->conn,my_instance->channel,amqp_cstring_bytes(my_instance->exchange),0); - amqp_exchange_declare(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(my_instance->exchange), - amqp_cstring_bytes(my_instance->exchange_type), - 0, 1, - amqp_empty_table); - reply = amqp_get_rpc_reply(my_instance->conn); - } - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Exchange redeclaration failed."); + /**Socket creation was successful, trying to open the socket*/ + if ((amqp_ok = amqp_socket_open(my_instance->sock, my_instance->hostname, my_instance->port)) != AMQP_STATUS_OK) + { + MXS_ERROR("Failed to open socket: %s", amqp_error_string2(amqp_ok)); goto cleanup; } - } - - if(my_instance->queue){ - - - - amqp_queue_declare(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(my_instance->queue), - 0, 1, 0, 0, - amqp_empty_table); - reply = amqp_get_rpc_reply(my_instance->conn); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Queue declaration failed."); + amqp_rpc_reply_t reply; + reply = amqp_login(my_instance->conn, my_instance->vhost, 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, my_instance->username, my_instance->password); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Login to RabbitMQ server failed."); + goto cleanup; + } + amqp_channel_open(my_instance->conn, my_instance->channel); + reply = amqp_get_rpc_reply(my_instance->conn); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Channel creation failed."); goto cleanup; } - - amqp_queue_bind(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(my_instance->queue), - amqp_cstring_bytes(my_instance->exchange), - amqp_cstring_bytes(my_instance->key), - amqp_empty_table); - reply = amqp_get_rpc_reply(my_instance->conn); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - MXS_ERROR("Failed to bind queue to exchange."); - goto cleanup; + amqp_exchange_declare(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(my_instance->exchange), + amqp_cstring_bytes(my_instance->exchange_type), + 0, 1, + amqp_empty_table); + + reply = amqp_get_rpc_reply(my_instance->conn); + + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Exchange declaration failed,trying to redeclare the exchange."); + if (reply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) + { + if (reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD) + { + amqp_send_method(my_instance->conn, my_instance->channel, AMQP_CHANNEL_CLOSE_OK_METHOD, NULL); + } + else if (reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD) + { + amqp_send_method(my_instance->conn, my_instance->channel, AMQP_CONNECTION_CLOSE_OK_METHOD, NULL); + } + + my_instance->channel++; + amqp_channel_open(my_instance->conn, my_instance->channel); + + amqp_exchange_delete(my_instance->conn, my_instance->channel, amqp_cstring_bytes(my_instance->exchange), 0); + amqp_exchange_declare(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(my_instance->exchange), + amqp_cstring_bytes(my_instance->exchange_type), + 0, 1, + amqp_empty_table); + reply = amqp_get_rpc_reply(my_instance->conn); + } + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Exchange redeclaration failed."); + goto cleanup; + } } - } - rval = 1; - cleanup: + if (my_instance->queue) + { + + + + amqp_queue_declare(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(my_instance->queue), + 0, 1, 0, 0, + amqp_empty_table); + reply = amqp_get_rpc_reply(my_instance->conn); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Queue declaration failed."); + goto cleanup; + } + + + amqp_queue_bind(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(my_instance->queue), + amqp_cstring_bytes(my_instance->exchange), + amqp_cstring_bytes(my_instance->key), + amqp_empty_table); + reply = amqp_get_rpc_reply(my_instance->conn); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + MXS_ERROR("Failed to bind queue to exchange."); + goto cleanup; + } + } + rval = 1; + +cleanup: + + return rval; - return rval; - } /** @@ -427,264 +453,329 @@ init_conn(MQ_INSTANCE *my_instance) */ char** parse_optstr(char* str, char* tok, int* szstore) { - char *lasts, *tk = str; - char **arr; - int i = 0, size = 1; - - - while((tk = strpbrk(tk + 1,tok))){ - size++; - } + char *lasts, *tk = str; + char **arr; + int i = 0, size = 1; - arr = malloc(sizeof(char*)*size); - if(arr == NULL){ - MXS_ERROR("Cannot allocate enough memory."); - *szstore = 0; - return NULL; - } - - *szstore = size; - tk = strtok_r(str,tok, &lasts); - while(tk && i < size){ - arr[i++] = strdup(tk); - tk = strtok_r(NULL,tok,&lasts); - } - return arr; + while ((tk = strpbrk(tk + 1, tok))) + { + size++; + } + + arr = malloc(sizeof(char*)*size); + + if (arr == NULL) + { + MXS_ERROR("Cannot allocate enough memory."); + *szstore = 0; + return NULL; + } + + *szstore = size; + tk = strtok_r(str, tok, &lasts); + while (tk && i < size) + { + arr[i++] = strdup(tk); + tk = strtok_r(NULL, tok, &lasts); + } + return arr; } /** * Create an instance of the filter for a particular service * within MaxScale. - * + * * @param options The options for this filter * * @return The instance data for this new instance */ -static FILTER * +static FILTER * createInstance(char **options, FILTER_PARAMETER **params) { - MQ_INSTANCE *my_instance; - int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0; - FILTER_PARAMETER** paramlist; - char** arr; - char taskname[512]; - - if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))) + MQ_INSTANCE *my_instance; + int paramcount = 0, parammax = 64, i = 0, x = 0, arrsize = 0; + FILTER_PARAMETER** paramlist; + char** arr; + char taskname[512]; + + if ((my_instance = calloc(1, sizeof(MQ_INSTANCE)))) { - spinlock_init(&my_instance->rconn_lock); - spinlock_init(&my_instance->msg_lock); - uid_gen = 0; - paramlist = malloc(sizeof(FILTER_PARAMETER*)*64); + spinlock_init(&my_instance->rconn_lock); + spinlock_init(&my_instance->msg_lock); + uid_gen = 0; + paramlist = malloc(sizeof(FILTER_PARAMETER*)*64); - if((my_instance->conn = amqp_new_connection()) == NULL){ + if ((my_instance->conn = amqp_new_connection()) == NULL) + { + return NULL; + } + my_instance->channel = 1; + my_instance->last_rconn = time(NULL); + my_instance->conn_stat = AMQP_STATUS_OK; + my_instance->rconn_intv = 1; + my_instance->port = 5672; + my_instance->trgtype = TRG_ALL; + my_instance->log_all = false; + my_instance->strict_logging = true; + + for (i = 0; params[i]; i++) + { + if (!strcmp(params[i]->name, "hostname")) + { + my_instance->hostname = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "username")) + { + my_instance->username = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "password")) + { + my_instance->password = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "vhost")) + { + my_instance->vhost = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "port")) + { + my_instance->port = atoi(params[i]->value); + } + else if (!strcmp(params[i]->name, "exchange")) + { + my_instance->exchange = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "key")) + { + my_instance->key = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "queue")) + { + my_instance->queue = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "ssl_client_certificate")) + { + + my_instance->ssl_client_cert = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "ssl_client_key")) + { + + my_instance->ssl_client_key = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "ssl_CA_cert")) + { + + my_instance->ssl_CA_cert = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "exchange_type")) + { + + my_instance->exchange_type = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "logging_trigger")) + { + + arr = parse_optstr(params[i]->value, ",", &arrsize); + + for (x = 0; x < arrsize; x++) + { + if (!strcmp(arr[x], "source")) + { + my_instance->trgtype |= TRG_SOURCE; + } + else if (!strcmp(arr[x], "schema")) + { + my_instance->trgtype |= TRG_SCHEMA; + } + else if (!strcmp(arr[x], "object")) + { + my_instance->trgtype |= TRG_OBJECT; + } + else if (!strcmp(arr[x], "all")) + { + my_instance->trgtype = TRG_ALL; + } + else + { + MXS_ERROR("Unknown option for 'logging_trigger':%s.", arr[x]); + } + } + + if (arrsize > 0) + { + free(arr); + } + arrsize = 0; - return NULL; - } - my_instance->channel = 1; - my_instance->last_rconn = time(NULL); - my_instance->conn_stat = AMQP_STATUS_OK; - my_instance->rconn_intv = 1; - my_instance->port = 5672; - my_instance->trgtype = TRG_ALL; - my_instance->log_all = false; - my_instance->strict_logging = true; - for(i = 0;params[i];i++){ - if(!strcmp(params[i]->name,"hostname")){ - my_instance->hostname = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"username")){ - my_instance->username = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"password")){ - my_instance->password = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"vhost")){ - my_instance->vhost = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"port")){ - my_instance->port = atoi(params[i]->value); - }else if(!strcmp(params[i]->name,"exchange")){ - my_instance->exchange = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"key")){ - my_instance->key = strdup(params[i]->value); - }else if(!strcmp(params[i]->name,"queue")){ - my_instance->queue = strdup(params[i]->value); - } - else if(!strcmp(params[i]->name,"ssl_client_certificate")){ + } + else if (strstr(params[i]->name, "logging_")) + { - my_instance->ssl_client_cert = strdup(params[i]->value); - - }else if(!strcmp(params[i]->name,"ssl_client_key")){ + if (paramcount < parammax) + { + paramlist[paramcount] = malloc(sizeof(FILTER_PARAMETER)); + paramlist[paramcount]->name = strdup(params[i]->name); + paramlist[paramcount]->value = strdup(params[i]->value); + paramcount++; + } + } + } - my_instance->ssl_client_key = strdup(params[i]->value); - - }else if(!strcmp(params[i]->name,"ssl_CA_cert")){ + if (my_instance->trgtype & TRG_SOURCE) + { - my_instance->ssl_CA_cert = strdup(params[i]->value); + my_instance->src_trg = (SRC_TRIG*) malloc(sizeof(SRC_TRIG)); + my_instance->src_trg->user = NULL; + my_instance->src_trg->host = NULL; + my_instance->src_trg->usize = 0; + my_instance->src_trg->hsize = 0; - }else if(!strcmp(params[i]->name,"exchange_type")){ + } - my_instance->exchange_type = strdup(params[i]->value); + if (my_instance->trgtype & TRG_SCHEMA) + { - }else if(!strcmp(params[i]->name,"logging_trigger")){ - - arr = parse_optstr(params[i]->value,",",&arrsize); + my_instance->shm_trg = (SHM_TRIG*) malloc(sizeof(SHM_TRIG)); + my_instance->shm_trg->objects = NULL; + my_instance->shm_trg->size = 0; - for(x = 0;xtrgtype |= TRG_SOURCE; - }else if(!strcmp(arr[x],"schema")){ - my_instance->trgtype |= TRG_SCHEMA; - }else if(!strcmp(arr[x],"object")){ - my_instance->trgtype |= TRG_OBJECT; - }else if(!strcmp(arr[x],"all")){ - my_instance->trgtype = TRG_ALL; - }else{ - MXS_ERROR("Unknown option for 'logging_trigger':%s.",arr[x]); - } - } + } - if(arrsize > 0){ - free(arr); - } - arrsize = 0; - + if (my_instance->trgtype & TRG_OBJECT) + { + my_instance->obj_trg = (OBJ_TRIG*) malloc(sizeof(OBJ_TRIG)); + my_instance->obj_trg->objects = NULL; + my_instance->obj_trg->size = 0; - }else if(strstr(params[i]->name,"logging_")){ + } - if(paramcount < parammax){ - paramlist[paramcount] = malloc(sizeof(FILTER_PARAMETER)); - paramlist[paramcount]->name = strdup(params[i]->name); - paramlist[paramcount]->value = strdup(params[i]->value); - paramcount++; - } - - } + for (i = 0; i < paramcount; i++) + { - } + if (!strcmp(paramlist[i]->name, "logging_source_user")) + { - if(my_instance->trgtype & TRG_SOURCE){ + if (my_instance->src_trg) + { + my_instance->src_trg->user = parse_optstr(paramlist[i]->value, ",", &arrsize); + my_instance->src_trg->usize = arrsize; + arrsize = 0; + } - my_instance->src_trg = (SRC_TRIG*)malloc(sizeof(SRC_TRIG)); - my_instance->src_trg->user = NULL; - my_instance->src_trg->host = NULL; - my_instance->src_trg->usize = 0; - my_instance->src_trg->hsize = 0; + } + else if (!strcmp(paramlist[i]->name, "logging_source_host")) + { - } + if (my_instance->src_trg) + { + my_instance->src_trg->host = parse_optstr(paramlist[i]->value, ",", &arrsize); + my_instance->src_trg->hsize = arrsize; + arrsize = 0; + } - if(my_instance->trgtype & TRG_SCHEMA){ + } + else if (!strcmp(paramlist[i]->name, "logging_schema")) + { - my_instance->shm_trg = (SHM_TRIG*)malloc(sizeof(SHM_TRIG)); - my_instance->shm_trg->objects = NULL; - my_instance->shm_trg->size = 0; + if (my_instance->shm_trg) + { + my_instance->shm_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize); + my_instance->shm_trg->size = arrsize; + arrsize = 0; + } - } + } + else if (!strcmp(paramlist[i]->name, "logging_object")) + { - if(my_instance->trgtype & TRG_OBJECT){ + if (my_instance->obj_trg) + { + my_instance->obj_trg->objects = parse_optstr(paramlist[i]->value, ",", &arrsize); + my_instance->obj_trg->size = arrsize; + arrsize = 0; + } - my_instance->obj_trg = (OBJ_TRIG*)malloc(sizeof(OBJ_TRIG)); - my_instance->obj_trg->objects = NULL; - my_instance->obj_trg->size = 0; + } + else if (!strcmp(paramlist[i]->name, "logging_log_all")) + { + if (config_truth_value(paramlist[i]->value)) + { + my_instance->log_all = true; + } + } + else if (!strcmp(paramlist[i]->name, "logging_strict")) + { + if (!config_truth_value(paramlist[i]->value)) + { + my_instance->strict_logging = false; + } + } + free(paramlist[i]->name); + free(paramlist[i]->value); + free(paramlist[i]); + } - } + free(paramlist); - for(i = 0;ihostname == NULL) + { + my_instance->hostname = strdup("localhost"); + } + if (my_instance->username == NULL) + { + my_instance->username = strdup("guest"); + } + if (my_instance->password == NULL) + { + my_instance->password = strdup("guest"); + } + if (my_instance->vhost == NULL) + { + my_instance->vhost = strdup("/"); + } + if (my_instance->exchange == NULL) + { + my_instance->exchange = strdup("default_exchange"); + } + if (my_instance->key == NULL) + { + my_instance->key = strdup("key"); + } + if (my_instance->exchange_type == NULL) + { + my_instance->exchange_type = strdup("direct"); + } - if(!strcmp(paramlist[i]->name,"logging_source_user")){ - - if(my_instance->src_trg){ - my_instance->src_trg->user = parse_optstr(paramlist[i]->value,",",&arrsize); - my_instance->src_trg->usize = arrsize; - arrsize = 0; - } + if (my_instance->ssl_client_cert != NULL && + my_instance->ssl_client_key != NULL && + my_instance->ssl_CA_cert != NULL) + { + my_instance->use_ssl = true; + } + else + { + my_instance->use_ssl = false; + } - }else if(!strcmp(paramlist[i]->name,"logging_source_host")){ - - if(my_instance->src_trg){ - my_instance->src_trg->host = parse_optstr(paramlist[i]->value,",",&arrsize); - my_instance->src_trg->hsize = arrsize; - arrsize = 0; - } + if (my_instance->use_ssl) + { + amqp_set_initialize_ssl_library(0); /**Assume the underlying SSL library is already initialized*/ + } - }else if(!strcmp(paramlist[i]->name,"logging_schema")){ - - if(my_instance->shm_trg){ - my_instance->shm_trg->objects = parse_optstr(paramlist[i]->value,",",&arrsize); - my_instance->shm_trg->size = arrsize; - arrsize = 0; - } + /**Connect to the server*/ + init_conn(my_instance); - }else if(!strcmp(paramlist[i]->name,"logging_object")){ - - if(my_instance->obj_trg){ - my_instance->obj_trg->objects = parse_optstr(paramlist[i]->value,",",&arrsize); - my_instance->obj_trg->size = arrsize; - arrsize = 0; - } - - }else if(!strcmp(paramlist[i]->name,"logging_log_all")){ - if(config_truth_value(paramlist[i]->value)){ - my_instance->log_all = true; - } - }else if(!strcmp(paramlist[i]->name,"logging_strict")){ - if(!config_truth_value(paramlist[i]->value)){ - my_instance->strict_logging = false; - } - } - free(paramlist[i]->name); - free(paramlist[i]->value); - free(paramlist[i]); - } - - free(paramlist); - - if(my_instance->hostname == NULL){ - my_instance->hostname = strdup("localhost"); - } - if(my_instance->username == NULL){ - my_instance->username = strdup("guest"); - } - if(my_instance->password == NULL){ - my_instance->password = strdup("guest"); - } - if(my_instance->vhost == NULL){ - my_instance->vhost = strdup("/"); - } - if(my_instance->exchange == NULL){ - my_instance->exchange = strdup("default_exchange"); - } - if(my_instance->key == NULL){ - my_instance->key = strdup("key"); - } - if(my_instance->exchange_type == NULL){ - my_instance->exchange_type = strdup("direct"); - } - - if(my_instance->ssl_client_cert != NULL && - my_instance->ssl_client_key != NULL && - my_instance->ssl_CA_cert != NULL){ - my_instance->use_ssl = true; - }else{ - my_instance->use_ssl = false; - } - - if(my_instance->use_ssl){ - amqp_set_initialize_ssl_library(0);/**Assume the underlying SSL library is already initialized*/ - } - - /**Connect to the server*/ - init_conn(my_instance); - - snprintf(taskname,511,"mqtask%d",atomic_add(&hktask_id,1)); - hktask_add(taskname,sendMessage,(void*)my_instance,5); + snprintf(taskname, 511, "mqtask%d", atomic_add(&hktask_id, 1)); + hktask_add(taskname, sendMessage, (void*) my_instance, 5); } - return (FILTER *)my_instance; + return(FILTER *) my_instance; } - - /** * Declares a persistent, non-exclusive and non-passive queue that * auto-deletes after all the messages have been consumed. @@ -692,38 +783,39 @@ createInstance(char **options, FILTER_PARAMETER **params) * @param qname Name of the queue to be declared * @return Returns 0 if an error occurred, 1 if successful */ -int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname) +int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname) { - int success = 1; - amqp_rpc_reply_t reply; + int success = 1; + amqp_rpc_reply_t reply; - spinlock_acquire(&my_instance->rconn_lock); + spinlock_acquire(&my_instance->rconn_lock); - amqp_queue_declare(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(qname), - 0, 1, 0, 1, - amqp_empty_table); - reply = amqp_get_rpc_reply(my_instance->conn); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - success = 0; - MXS_ERROR("Queue declaration failed."); - - } + amqp_queue_declare(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(qname), + 0, 1, 0, 1, + amqp_empty_table); + reply = amqp_get_rpc_reply(my_instance->conn); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + success = 0; + MXS_ERROR("Queue declaration failed."); - - amqp_queue_bind(my_instance->conn,my_instance->channel, - amqp_cstring_bytes(qname), - amqp_cstring_bytes(my_instance->exchange), - amqp_cstring_bytes(my_session->uid), - amqp_empty_table); - reply = amqp_get_rpc_reply(my_instance->conn); - if(reply.reply_type != AMQP_RESPONSE_NORMAL){ - success = 0; - MXS_ERROR("Failed to bind queue to exchange."); - - } - spinlock_release(&my_instance->rconn_lock); - return success; + } + + amqp_queue_bind(my_instance->conn, my_instance->channel, + amqp_cstring_bytes(qname), + amqp_cstring_bytes(my_instance->exchange), + amqp_cstring_bytes(my_session->uid), + amqp_empty_table); + reply = amqp_get_rpc_reply(my_instance->conn); + if (reply.reply_type != AMQP_RESPONSE_NORMAL) + { + success = 0; + MXS_ERROR("Failed to bind queue to exchange."); + + } + spinlock_release(&my_instance->rconn_lock); + return success; } /** @@ -734,93 +826,93 @@ int declareQueue(MQ_INSTANCE *my_instance, MQ_SESSION* my_session, char* qname) */ void sendMessage(void* data) { - MQ_INSTANCE *instance = (MQ_INSTANCE*)data; - mqmessage *tmp; - int err_num; - - spinlock_acquire(&instance->rconn_lock); - if(instance->conn_stat != AMQP_STATUS_OK){ - - if(difftime(time(NULL),instance->last_rconn) > instance->rconn_intv){ - - instance->last_rconn = time(NULL); - - if(init_conn(instance)){ - instance->rconn_intv = 1.0; - instance->conn_stat = AMQP_STATUS_OK; - - }else{ - instance->rconn_intv += 5.0; - MXS_ERROR("Failed to reconnect to the MQRabbit server "); - } - } - err_num = instance->conn_stat; - } - spinlock_release(&instance->rconn_lock); - - if(err_num != AMQP_STATUS_OK) - { - /** No connection to the broker */ - return; - } - - spinlock_acquire(&instance->msg_lock); - tmp = instance->messages; - - if(tmp == NULL) - { - spinlock_release(&instance->msg_lock); - return; - } - - instance->messages = instance->messages->next; - spinlock_release(&instance->msg_lock); - - while(tmp){ - - err_num = amqp_basic_publish(instance->conn,instance->channel, - amqp_cstring_bytes(instance->exchange), - amqp_cstring_bytes(instance->key), - 0,0,tmp->prop,amqp_cstring_bytes(tmp->msg)); + MQ_INSTANCE *instance = (MQ_INSTANCE*) data; + mqmessage *tmp; + int err_num; spinlock_acquire(&instance->rconn_lock); - instance->conn_stat = err_num; - spinlock_release(&instance->rconn_lock); - - if(err_num == AMQP_STATUS_OK){ - /**Message was sent successfully*/ - free(tmp->prop); - free(tmp->msg); - free(tmp); - - atomic_add(&instance->stats.n_sent,1); - atomic_add(&instance->stats.n_queued,-1); - spinlock_acquire(&instance->msg_lock); - tmp = instance->messages; - - if(tmp == NULL) - { - spinlock_release(&instance->msg_lock); - return; - } - - instance->messages = instance->messages->next; - spinlock_release(&instance->msg_lock); - } - else + if (instance->conn_stat != AMQP_STATUS_OK) { - spinlock_acquire(&instance->msg_lock); - tmp->next = instance->messages; - instance->messages = tmp; - spinlock_release(&instance->msg_lock); - return; + if (difftime(time(NULL), instance->last_rconn) > instance->rconn_intv) + { + instance->last_rconn = time(NULL); + + if (init_conn(instance)) + { + instance->rconn_intv = 1.0; + instance->conn_stat = AMQP_STATUS_OK; + } + else + { + instance->rconn_intv += 5.0; + MXS_ERROR("Failed to reconnect to the MQRabbit server "); + } + } + err_num = instance->conn_stat; } - - } + spinlock_release(&instance->rconn_lock); + if (err_num != AMQP_STATUS_OK) + { + /** No connection to the broker */ + return; + } + + spinlock_acquire(&instance->msg_lock); + tmp = instance->messages; + + if (tmp == NULL) + { + spinlock_release(&instance->msg_lock); + return; + } + + instance->messages = instance->messages->next; + spinlock_release(&instance->msg_lock); + + while (tmp) + { + err_num = amqp_basic_publish(instance->conn, instance->channel, + amqp_cstring_bytes(instance->exchange), + amqp_cstring_bytes(instance->key), + 0, 0, tmp->prop, amqp_cstring_bytes(tmp->msg)); + + spinlock_acquire(&instance->rconn_lock); + instance->conn_stat = err_num; + spinlock_release(&instance->rconn_lock); + + if (err_num == AMQP_STATUS_OK) + { + /**Message was sent successfully*/ + free(tmp->prop); + free(tmp->msg); + free(tmp); + + atomic_add(&instance->stats.n_sent, 1); + atomic_add(&instance->stats.n_queued, -1); + spinlock_acquire(&instance->msg_lock); + tmp = instance->messages; + + if (tmp == NULL) + { + spinlock_release(&instance->msg_lock); + return; + } + + instance->messages = instance->messages->next; + spinlock_release(&instance->msg_lock); + } + else + { + spinlock_acquire(&instance->msg_lock); + tmp->next = instance->messages; + instance->messages = tmp; + spinlock_release(&instance->msg_lock); + return; + } + } } - /** * Push a new message on the stack to be broadcasted later. * The message assumes ownership of the memory allocated to the message content and properties. @@ -829,33 +921,32 @@ void sendMessage(void* data) */ void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg) { - - mqmessage* newmsg = calloc(1,sizeof(mqmessage)); - if(newmsg){ - - newmsg->msg = msg; - newmsg->prop = prop; - - }else{ - MXS_ERROR("Cannot allocate enough memory."); - free(prop); - free(msg); - return; - } - spinlock_acquire(&instance->msg_lock); - - newmsg->next = instance->messages; - instance->messages = newmsg; + mqmessage* newmsg = calloc(1, sizeof(mqmessage)); + if (newmsg) + { + newmsg->msg = msg; + newmsg->prop = prop; + } + else + { + MXS_ERROR("Cannot allocate enough memory."); + free(prop); + free(msg); + return; + } - spinlock_release(&instance->msg_lock); - - atomic_add(&instance->stats.n_msg,1); - atomic_add(&instance->stats.n_queued,1); + spinlock_acquire(&instance->msg_lock); + + newmsg->next = instance->messages; + instance->messages = newmsg; + + spinlock_release(&instance->msg_lock); + + atomic_add(&instance->stats.n_msg, 1); + atomic_add(&instance->stats.n_queued, 1); } - - /** * Associate a new session with this instance of the filter and opens * a connection to the server and prepares the exchange and the queue for use. @@ -865,31 +956,31 @@ void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg * @param session The session itself * @return Session specific data for this session */ -static void * +static void * newSession(FILTER *instance, SESSION *session) { - MQ_SESSION *my_session; - MYSQL_session* sessauth; + MQ_SESSION *my_session; + MYSQL_session* sessauth; - if ((my_session = calloc(1, sizeof(MQ_SESSION))) != NULL){ - - my_session->was_query = false; - my_session->uid = NULL; - my_session->session = session; - sessauth = my_session->session->data; - if(sessauth->db && strnlen(sessauth->db,128)>0){ - my_session->db = strdup(sessauth->db); - }else{ - my_session->db = NULL; + if ((my_session = calloc(1, sizeof(MQ_SESSION))) != NULL) + { + my_session->was_query = false; + my_session->uid = NULL; + my_session->session = session; + sessauth = my_session->session->data; + if (sessauth->db && strnlen(sessauth->db, 128) > 0) + { + my_session->db = strdup(sessauth->db); + } + else + { + my_session->db = NULL; + } } - - } - return my_session; + return my_session; } - - /** * Close a session with the filter, this is the mechanism * by which a filter may cleanup data structure etc. @@ -898,10 +989,8 @@ newSession(FILTER *instance, SESSION *session) * @param instance The filter instance data * @param session The session being closed */ -static void -closeSession(FILTER *instance, void *session) -{ -} +static void +closeSession(FILTER *instance, void *session){ } /** * Free the memory associated with the session @@ -912,11 +1001,11 @@ closeSession(FILTER *instance, void *session) static void freeSession(FILTER *instance, void *session) { - MQ_SESSION *my_session = (MQ_SESSION *)session; - free(my_session->uid); - free(my_session->db); - free(my_session); - return; + MQ_SESSION *my_session = (MQ_SESSION *) session; + free(my_session->uid); + free(my_session->db); + free(my_session); + return; } /** @@ -924,23 +1013,22 @@ freeSession(FILTER *instance, void *session) * passed from this filter. * * @param instance The filter instance data - * @param session The filter session + * @param session The filter session * @param downstream The downstream filter or router. */ static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { - MQ_SESSION *my_session = (MQ_SESSION *)session; - my_session->down = *downstream; + MQ_SESSION *my_session = (MQ_SESSION *) session; + my_session->down = *downstream; } -static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) +static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) { - MQ_SESSION *my_session = (MQ_SESSION *)session; - my_session->up = *upstream; + MQ_SESSION *my_session = (MQ_SESSION *) session; + my_session->up = *upstream; } - /** * Generates a unique key using a number of unique unsigned integers. * @param array The array that is used @@ -948,11 +1036,12 @@ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) */ void genkey(char* array, int size) { - int i = 0; - for(i = 0;istart + 4)) == 0x02){ - if(my_session->db){ - free(my_session->db); - } - plen = pktlen(queue->start); - my_session->db = calloc(plen,sizeof(char)); - memcpy(my_session->db,queue->start + 5,plen - 1); - } - - if(modutil_is_SQL(queue)){ - - /**Parse the query*/ - - if (!query_is_parsed(queue)){ - success = parse_query(queue); + /**The user is changing databases*/ + if (*((char*) (queue->start + 4)) == 0x02) + { + if (my_session->db) + { + free(my_session->db); + } + plen = pktlen(queue->start); + my_session->db = calloc(plen, sizeof(char)); + memcpy(my_session->db, queue->start + 5, plen - 1); } - if(!success){ - MXS_ERROR("Parsing query failed."); - goto send_downstream; - } + if (modutil_is_SQL(queue)) + { - if(!my_instance->log_all){ - if(!skygw_is_real_query(queue)){ - goto send_downstream; - } - } + /**Parse the query*/ - if(my_instance->trgtype == TRG_ALL){ - MXS_INFO("Trigger is TRG_ALL"); - schema_ok = true; - src_ok = true; - obj_ok = true; - goto validate_triggers; - } - - if(my_instance->trgtype & TRG_SOURCE && my_instance->src_trg){ - - if(session_isvalid(my_session->session)){ - - sessusr = session_getUser(my_session->session); - sesshost = session_get_remote(my_session->session); - - /**Username was configured*/ - if(my_instance->src_trg->usize > 0){ - for(i = 0;isrc_trg->usize;i++){ + if (!query_is_parsed(queue)) + { + success = parse_query(queue); + } - if(strcmp(my_instance->src_trg->user[i],sessusr) == 0) - { - MXS_INFO("Trigger is TRG_SOURCE: user: %s = %s",my_instance->src_trg->user[i],sessusr); + if (!success) + { + MXS_ERROR("Parsing query failed."); + goto send_downstream; + } + + if (!my_instance->log_all) + { + if (!skygw_is_real_query(queue)) + { + goto send_downstream; + } + } + + if (my_instance->trgtype == TRG_ALL) + { + MXS_INFO("Trigger is TRG_ALL"); + schema_ok = true; + src_ok = true; + obj_ok = true; + goto validate_triggers; + } + + if (my_instance->trgtype & TRG_SOURCE && my_instance->src_trg) + { + if (session_isvalid(my_session->session)) + { + sessusr = session_getUser(my_session->session); + sesshost = session_get_remote(my_session->session); + + /**Username was configured*/ + if (my_instance->src_trg->usize > 0) + { + for (i = 0; i < my_instance->src_trg->usize; i++) + { + if (strcmp(my_instance->src_trg->user[i], sessusr) == 0) + { + MXS_INFO("Trigger is TRG_SOURCE: user: %s = %s", my_instance->src_trg->user[i], sessusr); + src_ok = true; + break; + } + } + } + + /**If username was not matched, try to match hostname*/ + + if (!src_ok && my_instance->src_trg->hsize > 0) + { + + for (i = 0; i < my_instance->src_trg->hsize; i++) + { + + if (strcmp(my_instance->src_trg->host[i], sesshost) == 0) + { + MXS_INFO("Trigger is TRG_SOURCE: host: %s = %s", my_instance->src_trg->host[i], sesshost); + src_ok = true; + break; + } + } + } + } + + if (src_ok && !my_instance->strict_logging) + { + schema_ok = true; + obj_ok = true; + goto validate_triggers; + } + } + else + { + src_ok = true; + } + + if (my_instance->trgtype & TRG_SCHEMA && my_instance->shm_trg) + { + int tbsz = 0, z; + char** tblnames = skygw_get_table_names(queue, &tbsz, true); + char* tmp; + bool all_remotes = true; + + for (z = 0; z < tbsz; z++) + { + if ((tmp = strchr(tblnames[z], '.')) != NULL) + { + char *lasts; + tmp = strtok_r(tblnames[z], ".", &lasts); + for (i = 0; i < my_instance->shm_trg->size; i++) + { + + if (strcmp(tmp, my_instance->shm_trg->objects[i]) == 0) + { + + MXS_INFO("Trigger is TRG_SCHEMA: %s = %s", tmp, my_instance->shm_trg->objects[i]); + + schema_ok = true; + break; + } + } + } + else + { + all_remotes = false; + } + free(tblnames[z]); + } + free(tblnames); + + if (!schema_ok && !all_remotes && my_session->db && strlen(my_session->db) > 0) + { + + for (i = 0; i < my_instance->shm_trg->size; i++) + { + + if (strcmp(my_session->db, my_instance->shm_trg->objects[i]) == 0) + { + + MXS_INFO("Trigger is TRG_SCHEMA: %s = %s", my_session->db, my_instance->shm_trg->objects[i]); + + schema_ok = true; + break; + } + } + } + + if (schema_ok && !my_instance->strict_logging) + { src_ok = true; - break; - } - - } + obj_ok = true; + goto validate_triggers; + } - - } + } + else + { + schema_ok = true; + } - /**If username was not matched, try to match hostname*/ - if(!src_ok && my_instance->src_trg->hsize > 0){ + if (my_instance->trgtype & TRG_OBJECT && my_instance->obj_trg) + { - for(i = 0;isrc_trg->hsize;i++){ - - if(strcmp(my_instance->src_trg->host[i],sesshost) == 0) - { - MXS_INFO("Trigger is TRG_SOURCE: host: %s = %s",my_instance->src_trg->host[i],sesshost); - src_ok = true; - break; - } - - } + sesstbls = skygw_get_table_names(queue, &dbcount, false); - } + for (j = 0; j < dbcount; j++) + { + char* tbnm = NULL; - } + if ((strchr(sesstbls[j], '.')) != NULL) + { + char *lasts; + tbnm = strtok_r(sesstbls[j], ".", &lasts); + tbnm = strtok_r(NULL, ".", &lasts); + } + else + { + tbnm = sesstbls[j]; + } - if(src_ok && !my_instance->strict_logging){ - schema_ok = true; - obj_ok = true; - goto validate_triggers; - } - }else{ - src_ok = true; + for (i = 0; i < my_instance->obj_trg->size; i++) + { + + + if (!strcmp(tbnm, my_instance->obj_trg->objects[i])) + { + obj_ok = true; + MXS_INFO("Trigger is TRG_OBJECT: %s = %s", my_instance->obj_trg->objects[i], sesstbls[j]); + break; + } + + } + + } + if (dbcount > 0) + { + for (j = 0; j < dbcount; j++) + { + free(sesstbls[j]); + } + free(sesstbls); + dbcount = 0; + } + + if (obj_ok && !my_instance->strict_logging) + { + src_ok = true; + schema_ok = true; + goto validate_triggers; + } + + } + else + { + obj_ok = true; + } + + +validate_triggers: + + if (src_ok && schema_ok && obj_ok) + { + + /** + * Something matched the trigger, log the query + */ + + MXS_INFO("Routing message to: %s:%d %s as %s/%s, exchange: %s<%s> key:%s queue:%s", + my_instance->hostname, my_instance->port, + my_instance->vhost, my_instance->username, + my_instance->password, my_instance->exchange, + my_instance->exchange_type, my_instance->key, + my_instance->queue); + + if (my_session->uid == NULL) + { + + my_session->uid = calloc(33, sizeof(char)); + + if (!my_session->uid) + { + MXS_ERROR("Out of memory."); + } + else + { + genkey(my_session->uid, 32); + } + + } + + if (queue->next != NULL) + { + queue = gwbuf_make_contiguous(queue); + } + + if (modutil_extract_SQL(queue, &ptr, &length)) + { + + my_session->was_query = true; + + if ((prop = malloc(sizeof(amqp_basic_properties_t)))) + { + prop->_flags = AMQP_BASIC_CONTENT_TYPE_FLAG | + AMQP_BASIC_DELIVERY_MODE_FLAG | + AMQP_BASIC_MESSAGE_ID_FLAG | + AMQP_BASIC_CORRELATION_ID_FLAG; + prop->content_type = amqp_cstring_bytes("text/plain"); + prop->delivery_mode = AMQP_DELIVERY_PERSISTENT; + prop->correlation_id = amqp_cstring_bytes(my_session->uid); + prop->message_id = amqp_cstring_bytes("query"); + } + + + + if (success) + { + + /**Try to convert to a canonical form and use the plain query if unsuccessful*/ + if ((canon_q = skygw_get_canonical(queue)) == NULL) + { + MXS_ERROR("Cannot form canonical query."); + } + + } + + memset(t_buf, 0, 128); + sprintf(t_buf, "%lu|", (unsigned long) time(NULL)); + + int qlen = strnlen(canon_q, length) + strnlen(t_buf, 128); + if ((combined = malloc((qlen + 1) * sizeof(char))) == NULL) + { + MXS_ERROR("Out of memory"); + } + strcpy(combined, t_buf); + strncat(combined, canon_q, length); + + pushMessage(my_instance, prop, combined); + free(canon_q); + } + + } + + /** Pass the query downstream */ } - - - - if(my_instance->trgtype & TRG_SCHEMA && my_instance->shm_trg){ - int tbsz = 0,z; - char** tblnames = skygw_get_table_names(queue,&tbsz,true); - char* tmp; - bool all_remotes = true; - - for(z = 0;zshm_trg->size; i++){ - - if(strcmp(tmp,my_instance->shm_trg->objects[i]) == 0){ - - MXS_INFO("Trigger is TRG_SCHEMA: %s = %s",tmp,my_instance->shm_trg->objects[i]); - - schema_ok = true; - break; - } - } - }else{ - all_remotes = false; - } - free(tblnames[z]); - } - free(tblnames); - - if(!schema_ok && !all_remotes && my_session->db && strlen(my_session->db)>0){ - - for(i = 0; ishm_trg->size; i++){ - - if(strcmp(my_session->db,my_instance->shm_trg->objects[i]) == 0){ - - MXS_INFO("Trigger is TRG_SCHEMA: %s = %s",my_session->db,my_instance->shm_trg->objects[i]); - - schema_ok = true; - break; - } - } - } - - if(schema_ok && !my_instance->strict_logging){ - src_ok = true; - obj_ok = true; - goto validate_triggers; - } - - }else{ - schema_ok = true; - } - - - if(my_instance->trgtype & TRG_OBJECT && my_instance->obj_trg){ - - sesstbls = skygw_get_table_names(queue,&dbcount,false); - - for(j = 0; jobj_trg->size; i++){ - - - if(!strcmp(tbnm,my_instance->obj_trg->objects[i])){ - obj_ok = true; - MXS_INFO("Trigger is TRG_OBJECT: %s = %s",my_instance->obj_trg->objects[i],sesstbls[j]); - break; - } - - } - - } - if(dbcount > 0){ - for(j = 0; jstrict_logging){ - src_ok = true; - schema_ok = true; - goto validate_triggers; - } - - }else{ - obj_ok = true; - } - - - validate_triggers: - - if(src_ok&&schema_ok&&obj_ok){ - - /** - * Something matched the trigger, log the query - */ - - MXS_INFO("Routing message to: %s:%d %s as %s/%s, exchange: %s<%s> key:%s queue:%s", - my_instance->hostname,my_instance->port, - my_instance->vhost,my_instance->username, - my_instance->password,my_instance->exchange, - my_instance->exchange_type,my_instance->key, - my_instance->queue); - - if(my_session->uid == NULL){ - - my_session->uid = calloc(33,sizeof(char)); - - if(!my_session->uid){ - MXS_ERROR("Out of memory."); - }else{ - genkey(my_session->uid,32); - } - - } - - if (queue->next != NULL) - { - queue = gwbuf_make_contiguous(queue); - } - - if(modutil_extract_SQL(queue, &ptr, &length)){ - - my_session->was_query = true; - - if((prop = malloc(sizeof(amqp_basic_properties_t)))){ - prop->_flags = AMQP_BASIC_CONTENT_TYPE_FLAG | - AMQP_BASIC_DELIVERY_MODE_FLAG | - AMQP_BASIC_MESSAGE_ID_FLAG | - AMQP_BASIC_CORRELATION_ID_FLAG; - prop->content_type = amqp_cstring_bytes("text/plain"); - prop->delivery_mode = AMQP_DELIVERY_PERSISTENT; - prop->correlation_id = amqp_cstring_bytes(my_session->uid); - prop->message_id = amqp_cstring_bytes("query"); - } - - - - if(success){ - - /**Try to convert to a canonical form and use the plain query if unsuccessful*/ - if((canon_q = skygw_get_canonical(queue)) == NULL){ - MXS_ERROR("Cannot form canonical query."); - } - - } - - memset(t_buf,0,128); - sprintf(t_buf, "%lu|",(unsigned long)time(NULL)); - - int qlen = strnlen(canon_q,length) + strnlen(t_buf,128); - if((combined = malloc((qlen+1)*sizeof(char))) == NULL){ - MXS_ERROR("Out of memory"); - } - strcpy(combined,t_buf); - strncat(combined,canon_q,length); - - pushMessage(my_instance,prop,combined); - free(canon_q); - } - - } - - /** Pass the query downstream */ - } - send_downstream: - return my_session->down.routeQuery(my_session->down.instance, - my_session->down.session, queue); +send_downstream: + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); } /** @@ -1276,49 +1405,61 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) */ unsigned int leitoi(unsigned char* c) { - unsigned char* ptr = c; - unsigned int sz = *ptr; - if(*ptr < 0xfb) return sz; - if(*ptr == 0xfc){ - sz = *++ptr; - sz += (*++ptr << 8); - }else if(*ptr == 0xfd){ - sz = *++ptr; - sz += (*++ptr << 8); - sz += (*++ptr << 8); - }else{ - sz = *++ptr; - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - sz += (*++ptr << 8); - } - return sz; + unsigned char* ptr = c; + unsigned int sz = *ptr; + if (*ptr < 0xfb) return sz; + if (*ptr == 0xfc) + { + sz = *++ptr; + sz += (*++ptr << 8); + } + else if (*ptr == 0xfd) + { + sz = *++ptr; + sz += (*++ptr << 8); + sz += (*++ptr << 8); + } + else + { + sz = *++ptr; + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + sz += (*++ptr << 8); + } + return sz; } /** - * Converts a length-encoded integer into a standard unsigned integer + * Converts a length-encoded integer into a standard unsigned integer * and advances the pointer to the next unrelated byte. * * @param c Pointer to the first byte of a length-encoded integer */ unsigned int consume_leitoi(unsigned char** c) { - unsigned int rval = leitoi(*c); - if(**c == 0xfc){ - *c += 3; - }else if(**c == 0xfd){ - *c += 4; - }else if(**c == 0xfe){ - *c += 9; - }else{ - *c += 1; - } - return rval; + unsigned int rval = leitoi(*c); + if (**c == 0xfc) + { + *c += 3; + } + else if (**c == 0xfd) + { + *c += 4; + } + else if (**c == 0xfe) + { + *c += 9; + } + else + { + *c += 1; + } + return rval; } /** @@ -1329,13 +1470,14 @@ unsigned int consume_leitoi(unsigned char** c) */ char* consume_lestr(unsigned char** c) { - unsigned int slen = consume_leitoi(c); - char *str = calloc((slen + 1), sizeof(char)); - if(str){ - memcpy(str,*c,slen); - *c += slen; - } - return str; + unsigned int slen = consume_leitoi(c); + char *str = calloc((slen + 1), sizeof(char)); + if (str) + { + memcpy(str, *c, slen); + *c += slen; + } + return str; } /** @@ -1345,11 +1487,10 @@ char* consume_lestr(unsigned char** c) */ unsigned int is_eof(void* p) { - unsigned char* ptr = (unsigned char*) p; - return *(ptr) == 0x05 && *(ptr + 1) == 0x00 && *(ptr + 2) == 0x00 && *(ptr + 4) == 0xfe; + unsigned char* ptr = (unsigned char*) p; + return *(ptr) == 0x05 && *(ptr + 1) == 0x00 && *(ptr + 2) == 0x00 && *(ptr + 4) == 0xfe; } - /** * The clientReply entry point. This is passed the response buffer * to which the filter should be applied. Once processed the @@ -1359,127 +1500,142 @@ unsigned int is_eof(void* p) * The function tries to extract a SQL query response out of the response buffer, * adds a timestamp to it and publishes the resulting string on the exchange. * The message is tagged with the same identifier that the query was. - * + * * @param instance The filter instance data * @param session The filter session * @param reply The response data */ static int clientReply(FILTER* instance, void *session, GWBUF *reply) { - MQ_SESSION *my_session = (MQ_SESSION *)session; - MQ_INSTANCE *my_instance = (MQ_INSTANCE *)instance; - char t_buf[128],*combined; - unsigned int pkt_len = pktlen(reply->sbuf->data), offset = 0; - amqp_basic_properties_t *prop; + MQ_SESSION *my_session = (MQ_SESSION *) session; + MQ_INSTANCE *my_instance = (MQ_INSTANCE *) instance; + char t_buf[128], *combined; + unsigned int pkt_len = pktlen(reply->sbuf->data), offset = 0; + amqp_basic_properties_t *prop; - if (my_session->was_query){ + if (my_session->was_query) + { - int packet_ok = 0, was_last = 0; + int packet_ok = 0, was_last = 0; - my_session->was_query = false; + my_session->was_query = false; - if(pkt_len > 0){ - if((prop = malloc(sizeof(amqp_basic_properties_t)))){ - prop->_flags = AMQP_BASIC_CONTENT_TYPE_FLAG | - AMQP_BASIC_DELIVERY_MODE_FLAG | - AMQP_BASIC_MESSAGE_ID_FLAG | - AMQP_BASIC_CORRELATION_ID_FLAG; - prop->content_type = amqp_cstring_bytes("text/plain"); - prop->delivery_mode = AMQP_DELIVERY_PERSISTENT; - prop->correlation_id = amqp_cstring_bytes(my_session->uid); - prop->message_id = amqp_cstring_bytes("reply"); - } - if(!(combined = calloc(GWBUF_LENGTH(reply) + 256,sizeof(char)))){ - MXS_ERROR("Out of memory"); - } + if (pkt_len > 0) + { + if ((prop = malloc(sizeof(amqp_basic_properties_t)))) + { + prop->_flags = AMQP_BASIC_CONTENT_TYPE_FLAG | + AMQP_BASIC_DELIVERY_MODE_FLAG | + AMQP_BASIC_MESSAGE_ID_FLAG | + AMQP_BASIC_CORRELATION_ID_FLAG; + prop->content_type = amqp_cstring_bytes("text/plain"); + prop->delivery_mode = AMQP_DELIVERY_PERSISTENT; + prop->correlation_id = amqp_cstring_bytes(my_session->uid); + prop->message_id = amqp_cstring_bytes("reply"); + } + if (!(combined = calloc(GWBUF_LENGTH(reply) + 256, sizeof(char)))) + { + MXS_ERROR("Out of memory"); + } - memset(t_buf,0,128); - sprintf(t_buf,"%lu|",(unsigned long)time(NULL)); - - - memcpy(combined + offset,t_buf,strnlen(t_buf,40)); - offset += strnlen(t_buf,40); + memset(t_buf, 0, 128); + sprintf(t_buf, "%lu|", (unsigned long) time(NULL)); - if(*(reply->sbuf->data + 4) == 0x00){ /**OK packet*/ - unsigned int aff_rows = 0, l_id = 0, s_flg = 0, wrn = 0; - unsigned char *ptr = (unsigned char*)(reply->sbuf->data + 5); - pkt_len = pktlen(reply->sbuf->data); - aff_rows = consume_leitoi(&ptr); - l_id = consume_leitoi(&ptr); - s_flg |= *ptr++; - s_flg |= (*ptr++ << 8); - wrn |= *ptr++; - wrn |= (*ptr++ << 8); - sprintf(combined + offset,"OK - affected_rows: %d " - " last_insert_id: %d " - " status_flags: %#0x " - " warnings: %d ", - aff_rows,l_id,s_flg,wrn); - offset += strnlen(combined,GWBUF_LENGTH(reply) + 256) - offset; - if(pkt_len > 7){ - int plen = consume_leitoi(&ptr); - if(plen > 0){ - sprintf(combined + offset," message: %.*s\n",plen,ptr); - } - } + memcpy(combined + offset, t_buf, strnlen(t_buf, 40)); + offset += strnlen(t_buf, 40); - packet_ok = 1; - was_last = 1; + if (*(reply->sbuf->data + 4) == 0x00) + { /**OK packet*/ + unsigned int aff_rows = 0, l_id = 0, s_flg = 0, wrn = 0; + unsigned char *ptr = (unsigned char*) (reply->sbuf->data + 5); + pkt_len = pktlen(reply->sbuf->data); + aff_rows = consume_leitoi(&ptr); + l_id = consume_leitoi(&ptr); + s_flg |= *ptr++; + s_flg |= (*ptr++ << 8); + wrn |= *ptr++; + wrn |= (*ptr++ << 8); + sprintf(combined + offset, "OK - affected_rows: %d " + " last_insert_id: %d " + " status_flags: %#0x " + " warnings: %d ", + aff_rows, l_id, s_flg, wrn); + offset += strnlen(combined, GWBUF_LENGTH(reply) + 256) - offset; - }else if(*(reply->sbuf->data + 4) == 0xff){ /**ERR packet*/ + if (pkt_len > 7) + { + int plen = consume_leitoi(&ptr); + if (plen > 0) + { + sprintf(combined + offset, " message: %.*s\n", plen, ptr); + } + } - sprintf(combined + offset,"ERROR - message: %.*s", - (int)(reply->end - ((void*)(reply->sbuf->data + 13))), - (char *)reply->sbuf->data + 13); - packet_ok = 1; - was_last = 1; - - }else if(*(reply->sbuf->data + 4) == 0xfb){ /**LOCAL_INFILE request packet*/ - - unsigned char *rset = (unsigned char*)reply->sbuf->data; - strcpy(combined + offset,"LOCAL_INFILE: "); - strncat(combined + offset,(const char*)rset+5,pktlen(rset)); - packet_ok = 1; - was_last = 1; - - }else{ /**Result set*/ - - unsigned char *rset = (unsigned char*)(reply->sbuf->data + 4); - char *tmp; - unsigned int col_cnt = consume_leitoi(&rset); + packet_ok = 1; + was_last = 1; - tmp = calloc(256,sizeof(char)); - sprintf(tmp,"Columns: %d",col_cnt); - memcpy(combined + offset,tmp,strnlen(tmp,256)); - offset += strnlen(tmp,256); - memcpy(combined + offset,"\n",1); - offset++; - free(tmp); - - packet_ok = 1; - was_last = 1; - - } - if(packet_ok){ + } + else if (*(reply->sbuf->data + 4) == 0xff) + { /**ERR packet*/ - pushMessage(my_instance,prop,combined); + sprintf(combined + offset, "ERROR - message: %.*s", + (int) (reply->end - ((void*) (reply->sbuf->data + 13))), + (char *) reply->sbuf->data + 13); + packet_ok = 1; + was_last = 1; - if(was_last){ + } + else if (*(reply->sbuf->data + 4) == 0xfb) + { /**LOCAL_INFILE request packet*/ - /**Successful reply received and sent, releasing uid*/ - - free(my_session->uid); - my_session->uid = NULL; + unsigned char *rset = (unsigned char*) reply->sbuf->data; + strcpy(combined + offset, "LOCAL_INFILE: "); + strncat(combined + offset, (const char*) rset + 5, pktlen(rset)); + packet_ok = 1; + was_last = 1; + + } + else + { /**Result set*/ + + unsigned char *rset = (unsigned char*) (reply->sbuf->data + 4); + char *tmp; + unsigned int col_cnt = consume_leitoi(&rset); + + tmp = calloc(256, sizeof(char)); + sprintf(tmp, "Columns: %d", col_cnt); + memcpy(combined + offset, tmp, strnlen(tmp, 256)); + offset += strnlen(tmp, 256); + memcpy(combined + offset, "\n", 1); + offset++; + free(tmp); + + packet_ok = 1; + was_last = 1; + + } + if (packet_ok) + { + + pushMessage(my_instance, prop, combined); + + if (was_last) + { + + /**Successful reply received and sent, releasing uid*/ + + free(my_session->uid); + my_session->uid = NULL; + + } + } + } - } - } } - } - - return my_session->up.clientReply(my_session->up.instance, - my_session->up.session, reply); + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); } /** @@ -1492,25 +1648,24 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply) * @param fsession Filter session, may be NULL * @param dcb The DCB for diagnostic output */ -static void +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { - MQ_INSTANCE *my_instance = (MQ_INSTANCE *)instance; + MQ_INSTANCE *my_instance = (MQ_INSTANCE *) instance; - if (my_instance) + if (my_instance) { - dcb_printf(dcb, "Connecting to %s:%d as '%s'.\nVhost: %s\tExchange: %s\nKey: %s\tQueue: %s\n\n", - my_instance->hostname,my_instance->port, - my_instance->username, - my_instance->vhost, my_instance->exchange, - my_instance->key, my_instance->queue - ); - dcb_printf(dcb, "%-16s%-16s%-16s\n", - "Messages","Queued","Sent"); - dcb_printf(dcb, "%-16d%-16d%-16d\n", - my_instance->stats.n_msg, - my_instance->stats.n_queued, - my_instance->stats.n_sent); + dcb_printf(dcb, "Connecting to %s:%d as '%s'.\nVhost: %s\tExchange: %s\nKey: %s\tQueue: %s\n\n", + my_instance->hostname, my_instance->port, + my_instance->username, + my_instance->vhost, my_instance->exchange, + my_instance->key, my_instance->queue + ); + dcb_printf(dcb, "%-16s%-16s%-16s\n", + "Messages", "Queued", "Sent"); + dcb_printf(dcb, "%-16d%-16d%-16d\n", + my_instance->stats.n_msg, + my_instance->stats.n_queued, + my_instance->stats.n_sent); } } -