Make lines of mqfilter.c less that 110 characters long.
Untabified as well.
This commit is contained in:
@ -29,32 +29,35 @@
|
|||||||
* The filter makes no attempt to deal with queries that do not fit
|
* The filter makes no attempt to deal with queries that do not fit
|
||||||
* in a single GWBUF or result sets that span multiple GWBUFs.
|
* in a single GWBUF or result sets that span multiple GWBUFs.
|
||||||
*
|
*
|
||||||
* To use a SSL connection the CA certificate, the client certificate and the client public key must be provided.
|
* To use a SSL connection the CA certificate, the client certificate and the client public
|
||||||
|
* key must be provided.
|
||||||
* By default this filter uses a TCP connection.
|
* By default this filter uses a TCP connection.
|
||||||
*@verbatim
|
*@verbatim
|
||||||
* The options for this filter are:
|
* The options for this filter are:
|
||||||
*
|
*
|
||||||
* logging_trigger Set the logging level
|
* logging_trigger Set the logging level
|
||||||
* logging_strict Sets whether to trigger when any of the parameters match or only if all parameters match
|
* logging_strict Sets whether to trigger when any of the parameters match or only if
|
||||||
* logging_log_all Log only SELECT, UPDATE, DELETE and INSERT or all possible queries
|
* all parameters match
|
||||||
* hostname The server hostname where the messages are sent
|
* logging_log_all Log only SELECT, UPDATE, DELETE and INSERT or all possible queries
|
||||||
* port Port to send the messages to
|
* hostname The server hostname where the messages are sent
|
||||||
* username Server login username
|
* port Port to send the messages to
|
||||||
* password Server login password
|
* username Server login username
|
||||||
* vhost The virtual host location on the server, where the messages are sent
|
* password Server login password
|
||||||
* exchange The name of the exchange
|
* vhost The virtual host location on the server, where the messages are sent
|
||||||
* exchange_type The type of the exchange, defaults to direct
|
* exchange The name of the exchange
|
||||||
* key The routing key used when sending messages to the exchange
|
* exchange_type The type of the exchange, defaults to direct
|
||||||
* queue The queue that will be bound to the used exchange
|
* key The routing key used when sending messages to the exchange
|
||||||
* ssl_CA_cert Path to the CA certificate in PEM format
|
* queue The queue that will be bound to the used exchange
|
||||||
* ssl_client_cert Path to the client cerificate in PEM format
|
* ssl_CA_cert Path to the CA certificate in PEM format
|
||||||
* ssl_client_key Path to the client public key in PEM format
|
* ssl_client_cert Path to the client cerificate in PEM format
|
||||||
|
* ssl_client_key Path to the client public key in PEM format
|
||||||
*
|
*
|
||||||
* The logging trigger levels are:
|
* The logging trigger levels are:
|
||||||
* all Log everything
|
* all Log everything
|
||||||
* source Trigger on statements originating from a particular source (database user and host combination)
|
* source Trigger on statements originating from a particular source (database user and
|
||||||
* schema Trigger on a certain schema
|
* host combination)
|
||||||
* object Trigger on a particular database object (table or view)
|
* schema Trigger on a certain schema
|
||||||
|
* object Trigger on a particular database object (table or view)
|
||||||
*@endverbatim
|
*@endverbatim
|
||||||
* See the individual struct documentations for logging trigger parameters
|
* See the individual struct documentations for logging trigger parameters
|
||||||
*/
|
*/
|
||||||
@ -146,8 +149,8 @@ enum log_trigger_t
|
|||||||
* Both options allow multiple values separated by a ','.
|
* Both options allow multiple values separated by a ','.
|
||||||
* @verbatim
|
* @verbatim
|
||||||
* Trigger options:
|
* Trigger options:
|
||||||
* logging_source_user Comma-separated list of usernames to log
|
* logging_source_user Comma-separated list of usernames to log
|
||||||
* logging_source_host Comma-separated list of hostnames to log
|
* logging_source_host Comma-separated list of hostnames to log
|
||||||
* @endverbatim
|
* @endverbatim
|
||||||
*/
|
*/
|
||||||
typedef struct source_trigger_t
|
typedef struct source_trigger_t
|
||||||
@ -164,7 +167,7 @@ typedef struct source_trigger_t
|
|||||||
* Log only those queries that target a specific database.
|
* Log only those queries that target a specific database.
|
||||||
*
|
*
|
||||||
* Trigger options:
|
* Trigger options:
|
||||||
* logging_schema Comma-separated list of databases
|
* logging_schema Comma-separated list of databases
|
||||||
*/
|
*/
|
||||||
typedef struct schema_trigger_t
|
typedef struct schema_trigger_t
|
||||||
{
|
{
|
||||||
@ -178,7 +181,7 @@ typedef struct schema_trigger_t
|
|||||||
* Log only those queries that target specific database objects.
|
* Log only those queries that target specific database objects.
|
||||||
*@verbatim
|
*@verbatim
|
||||||
* Trigger options:
|
* Trigger options:
|
||||||
* logging_object Comma-separated list of database objects
|
* logging_object Comma-separated list of database objects
|
||||||
*@endverbatim
|
*@endverbatim
|
||||||
*/
|
*/
|
||||||
typedef struct object_trigger_t
|
typedef struct object_trigger_t
|
||||||
@ -313,8 +316,8 @@ init_conn(MQ_INSTANCE *my_instance)
|
|||||||
|
|
||||||
if ((my_instance->sock = amqp_ssl_socket_new(my_instance->conn)) != NULL)
|
if ((my_instance->sock = amqp_ssl_socket_new(my_instance->conn)) != NULL)
|
||||||
{
|
{
|
||||||
|
amqp_ok = amqp_ssl_socket_set_cacert(my_instance->sock, my_instance->ssl_CA_cert);
|
||||||
if ((amqp_ok = amqp_ssl_socket_set_cacert(my_instance->sock, my_instance->ssl_CA_cert)) != AMQP_STATUS_OK)
|
if (amqp_ok != AMQP_STATUS_OK)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to set CA certificate: %s", amqp_error_string2(amqp_ok));
|
MXS_ERROR("Failed to set CA certificate: %s", amqp_error_string2(amqp_ok));
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
@ -345,13 +348,15 @@ init_conn(MQ_INSTANCE *my_instance)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**Socket creation was successful, trying to open the socket*/
|
/**Socket creation was successful, trying to open the socket*/
|
||||||
if ((amqp_ok = amqp_socket_open(my_instance->sock, my_instance->hostname, my_instance->port)) != AMQP_STATUS_OK)
|
amqp_ok = amqp_socket_open(my_instance->sock, my_instance->hostname, my_instance->port);
|
||||||
|
if (amqp_ok != AMQP_STATUS_OK)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to open socket: %s", amqp_error_string2(amqp_ok));
|
MXS_ERROR("Failed to open socket: %s", amqp_error_string2(amqp_ok));
|
||||||
goto cleanup;
|
goto cleanup;
|
||||||
}
|
}
|
||||||
amqp_rpc_reply_t reply;
|
amqp_rpc_reply_t reply;
|
||||||
reply = amqp_login(my_instance->conn, my_instance->vhost, 0, AMQP_DEFAULT_FRAME_SIZE, 0, AMQP_SASL_METHOD_PLAIN, my_instance->username, my_instance->password);
|
reply = amqp_login(my_instance->conn, my_instance->vhost, 0, AMQP_DEFAULT_FRAME_SIZE, 0,
|
||||||
|
AMQP_SASL_METHOD_PLAIN, my_instance->username, my_instance->password);
|
||||||
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
|
if (reply.reply_type != AMQP_RESPONSE_NORMAL)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Login to RabbitMQ server failed.");
|
MXS_ERROR("Login to RabbitMQ server failed.");
|
||||||
@ -383,17 +388,20 @@ init_conn(MQ_INSTANCE *my_instance)
|
|||||||
{
|
{
|
||||||
if (reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
|
if (reply.reply.id == AMQP_CHANNEL_CLOSE_METHOD)
|
||||||
{
|
{
|
||||||
amqp_send_method(my_instance->conn, my_instance->channel, AMQP_CHANNEL_CLOSE_OK_METHOD, NULL);
|
amqp_send_method(my_instance->conn, my_instance->channel,
|
||||||
|
AMQP_CHANNEL_CLOSE_OK_METHOD, NULL);
|
||||||
}
|
}
|
||||||
else if (reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
|
else if (reply.reply.id == AMQP_CONNECTION_CLOSE_METHOD)
|
||||||
{
|
{
|
||||||
amqp_send_method(my_instance->conn, my_instance->channel, AMQP_CONNECTION_CLOSE_OK_METHOD, NULL);
|
amqp_send_method(my_instance->conn, my_instance->channel,
|
||||||
|
AMQP_CONNECTION_CLOSE_OK_METHOD, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
my_instance->channel++;
|
my_instance->channel++;
|
||||||
amqp_channel_open(my_instance->conn, my_instance->channel);
|
amqp_channel_open(my_instance->conn, my_instance->channel);
|
||||||
|
|
||||||
amqp_exchange_delete(my_instance->conn, my_instance->channel, amqp_cstring_bytes(my_instance->exchange), 0);
|
amqp_exchange_delete(my_instance->conn, my_instance->channel,
|
||||||
|
amqp_cstring_bytes(my_instance->exchange), 0);
|
||||||
amqp_exchange_declare(my_instance->conn, my_instance->channel,
|
amqp_exchange_declare(my_instance->conn, my_instance->channel,
|
||||||
amqp_cstring_bytes(my_instance->exchange),
|
amqp_cstring_bytes(my_instance->exchange),
|
||||||
amqp_cstring_bytes(my_instance->exchange_type),
|
amqp_cstring_bytes(my_instance->exchange_type),
|
||||||
@ -492,7 +500,7 @@ char** parse_optstr(char* str, char* tok, int* szstore)
|
|||||||
* Create an instance of the filter for a particular service
|
* Create an instance of the filter for a particular service
|
||||||
* within MaxScale.
|
* within MaxScale.
|
||||||
*
|
*
|
||||||
* @param options The options for this filter
|
* @param options The options for this filter
|
||||||
*
|
*
|
||||||
* @return The instance data for this new instance
|
* @return The instance data for this new instance
|
||||||
*/
|
*/
|
||||||
@ -775,7 +783,8 @@ createInstance(char **options, FILTER_PARAMETER **params)
|
|||||||
|
|
||||||
if (my_instance->use_ssl)
|
if (my_instance->use_ssl)
|
||||||
{
|
{
|
||||||
amqp_set_initialize_ssl_library(0); /**Assume the underlying SSL library is already initialized*/
|
/**Assume the underlying SSL library is already initialized*/
|
||||||
|
amqp_set_initialize_ssl_library(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**Connect to the server*/
|
/**Connect to the server*/
|
||||||
@ -968,8 +977,8 @@ void pushMessage(MQ_INSTANCE *instance, amqp_basic_properties_t* prop, char* msg
|
|||||||
* a connection to the server and prepares the exchange and the queue for use.
|
* a connection to the server and prepares the exchange and the queue for use.
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The session itself
|
* @param session The session itself
|
||||||
* @return Session specific data for this session
|
* @return Session specific data for this session
|
||||||
*/
|
*/
|
||||||
static void *
|
static void *
|
||||||
@ -1002,8 +1011,8 @@ newSession(FILTER *instance, SESSION *session)
|
|||||||
* by which a filter may cleanup data structure etc.
|
* by which a filter may cleanup data structure etc.
|
||||||
* In the case of the MQ filter we do nothing.
|
* In the case of the MQ filter we do nothing.
|
||||||
*
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The session being closed
|
* @param session The session being closed
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
closeSession(FILTER *instance, void *session){ }
|
closeSession(FILTER *instance, void *session){ }
|
||||||
@ -1011,8 +1020,8 @@ closeSession(FILTER *instance, void *session){ }
|
|||||||
/**
|
/**
|
||||||
* Free the memory associated with the session
|
* Free the memory associated with the session
|
||||||
*
|
*
|
||||||
* @param instance The filter instance
|
* @param instance The filter instance
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
freeSession(FILTER *instance, void *session)
|
freeSession(FILTER *instance, void *session)
|
||||||
@ -1028,9 +1037,9 @@ freeSession(FILTER *instance, void *session)
|
|||||||
* Set the downstream filter or router to which queries will be
|
* Set the downstream filter or router to which queries will be
|
||||||
* passed from this filter.
|
* passed from this filter.
|
||||||
*
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
* @param downstream The downstream filter or router.
|
* @param downstream The downstream filter or router.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||||
@ -1086,9 +1095,9 @@ unsigned int pktlen(void* c)
|
|||||||
* The message is tagged with an unique identifier and the clientReply will
|
* The message is tagged with an unique identifier and the clientReply will
|
||||||
* use the same identifier for the reply from the backend to form a query-reply pair.
|
* use the same identifier for the reply from the backend to form a query-reply pair.
|
||||||
*
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
* @param queue The query data
|
* @param queue The query data
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||||
@ -1161,7 +1170,8 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
{
|
{
|
||||||
if (strcmp(my_instance->src_trg->user[i], sessusr) == 0)
|
if (strcmp(my_instance->src_trg->user[i], sessusr) == 0)
|
||||||
{
|
{
|
||||||
MXS_INFO("Trigger is TRG_SOURCE: user: %s = %s", my_instance->src_trg->user[i], sessusr);
|
MXS_INFO("Trigger is TRG_SOURCE: user: %s = %s",
|
||||||
|
my_instance->src_trg->user[i], sessusr);
|
||||||
src_ok = true;
|
src_ok = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1178,7 +1188,8 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
|
|
||||||
if (strcmp(my_instance->src_trg->host[i], sesshost) == 0)
|
if (strcmp(my_instance->src_trg->host[i], sesshost) == 0)
|
||||||
{
|
{
|
||||||
MXS_INFO("Trigger is TRG_SOURCE: host: %s = %s", my_instance->src_trg->host[i], sesshost);
|
MXS_INFO("Trigger is TRG_SOURCE: host: %s = %s",
|
||||||
|
my_instance->src_trg->host[i], sesshost);
|
||||||
src_ok = true;
|
src_ok = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1217,7 +1228,8 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
if (strcmp(tmp, my_instance->shm_trg->objects[i]) == 0)
|
if (strcmp(tmp, my_instance->shm_trg->objects[i]) == 0)
|
||||||
{
|
{
|
||||||
|
|
||||||
MXS_INFO("Trigger is TRG_SCHEMA: %s = %s", tmp, my_instance->shm_trg->objects[i]);
|
MXS_INFO("Trigger is TRG_SCHEMA: %s = %s",
|
||||||
|
tmp, my_instance->shm_trg->objects[i]);
|
||||||
|
|
||||||
schema_ok = true;
|
schema_ok = true;
|
||||||
break;
|
break;
|
||||||
@ -1241,7 +1253,8 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
if (strcmp(my_session->db, my_instance->shm_trg->objects[i]) == 0)
|
if (strcmp(my_session->db, my_instance->shm_trg->objects[i]) == 0)
|
||||||
{
|
{
|
||||||
|
|
||||||
MXS_INFO("Trigger is TRG_SCHEMA: %s = %s", my_session->db, my_instance->shm_trg->objects[i]);
|
MXS_INFO("Trigger is TRG_SCHEMA: %s = %s",
|
||||||
|
my_session->db, my_instance->shm_trg->objects[i]);
|
||||||
|
|
||||||
schema_ok = true;
|
schema_ok = true;
|
||||||
break;
|
break;
|
||||||
@ -1291,7 +1304,8 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
|||||||
if (!strcmp(tbnm, my_instance->obj_trg->objects[i]))
|
if (!strcmp(tbnm, my_instance->obj_trg->objects[i]))
|
||||||
{
|
{
|
||||||
obj_ok = true;
|
obj_ok = true;
|
||||||
MXS_INFO("Trigger is TRG_OBJECT: %s = %s", my_instance->obj_trg->objects[i], sesstbls[j]);
|
MXS_INFO("Trigger is TRG_OBJECT: %s = %s",
|
||||||
|
my_instance->obj_trg->objects[i], sesstbls[j]);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1479,7 +1493,8 @@ unsigned int consume_leitoi(unsigned char** c)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts length-encoded strings to character strings and advanced the pointer to the next unrelated byte.
|
* Converts length-encoded strings to character strings and advanced
|
||||||
|
* the pointer to the next unrelated byte.
|
||||||
* The caller is responsible for freeing the allocated memory.
|
* The caller is responsible for freeing the allocated memory.
|
||||||
* @param c Pointer to the first byte of a valid packet.
|
* @param c Pointer to the first byte of a valid packet.
|
||||||
* @return The newly allocated string or NULL of an error occurred
|
* @return The newly allocated string or NULL of an error occurred
|
||||||
@ -1517,9 +1532,9 @@ unsigned int is_eof(void* p)
|
|||||||
* adds a timestamp to it and publishes the resulting string on the exchange.
|
* adds a timestamp to it and publishes the resulting string on the exchange.
|
||||||
* The message is tagged with the same identifier that the query was.
|
* The message is tagged with the same identifier that the query was.
|
||||||
*
|
*
|
||||||
* @param instance The filter instance data
|
* @param instance The filter instance data
|
||||||
* @param session The filter session
|
* @param session The filter session
|
||||||
* @param reply The response data
|
* @param reply The response data
|
||||||
*/
|
*/
|
||||||
static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
||||||
{
|
{
|
||||||
@ -1660,9 +1675,9 @@ static int clientReply(FILTER* instance, void *session, GWBUF *reply)
|
|||||||
* Prints the connection details and the names of the exchange,
|
* Prints the connection details and the names of the exchange,
|
||||||
* queue and the routing key.
|
* queue and the routing key.
|
||||||
*
|
*
|
||||||
* @param instance The filter instance
|
* @param instance The filter instance
|
||||||
* @param fsession Filter session, may be NULL
|
* @param fsession Filter session, may be NULL
|
||||||
* @param dcb The DCB for diagnostic output
|
* @param dcb The DCB for diagnostic output
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||||
|
|||||||
Reference in New Issue
Block a user