From 2c92f46b3240e0aab820f03fbb20fab8ccfd860a Mon Sep 17 00:00:00 2001 From: Dong Young Yoon Date: Mon, 14 Dec 2015 16:19:38 -0500 Subject: [PATCH 1/7] added tpmfilter for develop branch. --- server/modules/filter/CMakeLists.txt | 5 + server/modules/filter/tpmfilter.c | 538 +++++++++++++++++++++++++++ 2 files changed, 543 insertions(+) create mode 100644 server/modules/filter/tpmfilter.c diff --git a/server/modules/filter/CMakeLists.txt b/server/modules/filter/CMakeLists.txt index e59201f22..8a6fe3b2f 100644 --- a/server/modules/filter/CMakeLists.txt +++ b/server/modules/filter/CMakeLists.txt @@ -46,6 +46,11 @@ target_link_libraries(namedserverfilter log_manager) set_target_properties(namedserverfilter PROPERTIES VERSION "1.1.0") install(TARGETS namedserverfilter DESTINATION ${MAXSCALE_LIBDIR}) +add_library(tpmfilter SHARED tpmfilter.c) +target_link_libraries(tpmfilter log_manager) +set_target_properties(tpmfilter PROPERTIES VERSION "1.0.0") +install(TARGETS tpmfilter DESTINATION ${MAXSCALE_LIBDIR}) + if(BUILD_SLAVELAG) add_library(slavelag SHARED slavelag.c) diff --git a/server/modules/filter/tpmfilter.c b/server/modules/filter/tpmfilter.c new file mode 100644 index 000000000..3efda48dd --- /dev/null +++ b/server/modules/filter/tpmfilter.c @@ -0,0 +1,538 @@ +/* + * This file is distributed as part of MaxScale by MariaDB Corporation. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation Ab 2014 + */ + +/** + * @file tpmfilter.c - Transaction Performance Monitoring Filter + * @verbatim + * + * A simple filter that groups queries into a transaction with the latency. + * + * The filter reads the routed queries, groups them into a transaction by + * detecting 'commit' statement at the end. The transactions are timestamped with a + * unix-timestamp and the latency of a transaction is recorded in milliseconds. + * The filter will not record transactions that are rolled back. + * Please note that the filter only works with 'autocommit' option disabled. + * + * The filter makes no attempt to deal with query packets that do not fit + * in a single GWBUF. + * + * Optional parameters: + * filename= + * delimiter= + * query_delimiter= + * source= + * user= + * + * Date Who Description + * 06/12/2015 Dong Young Yoon Initial implementation + * + * @endverbatim + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** Defined in log_manager.cc */ +extern int lm_enabled_logfiles_bitmask; +extern size_t log_ses_count[]; +extern __thread log_info_t tls_log_info; + +MODULE_INFO info = { + MODULE_API_FILTER, + MODULE_GA, + FILTER_VERSION, + "Transaction Performance Monitoring filter" +}; + +static char *version_str = "V1.0.0"; +static size_t buf_size = 10; +static size_t sql_size_limit = 64 * 1024 * 1024; /* The maximum size for query statements in a transaction (64MB) */ + +/* + * The filter entry points + */ +static FILTER *createInstance(char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); + +static FILTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + freeSession, + setDownstream, + setUpstream, + routeQuery, + clientReply, + diagnostic, +}; + +/** + * A instance structure, every instance will write to a same file. + */ +typedef struct { + int sessions; /* Session count */ + char *source; /* The source of the client connection */ + char *user; /* The user name to filter on */ + char *filename; /* filename */ + char *delimiter; /* delimiter for columns in a log */ + char *query_delimiter; /* delimiter for query statements in a transaction */ + + int query_delimiter_size; /* the length of the query delimiter */ + FILE* fp; +} DBS_INSTANCE; + +/** + * The session structure for this DBS filter. + * This stores the downstream filter information, such that the + * filter is able to pass the query on to the next filter (or router) + * in the chain. + * + * It also holds the file descriptor to which queries are written. + */ +typedef struct { + DOWNSTREAM down; + UPSTREAM up; + int active; + char *clientHost; + char *userName; + char* sql; + struct timeval start; + char *current; + int n_statements; + struct timeval total; + struct timeval current_start; + bool query_end; + char *buf; + int sql_index; + size_t max_sql_size; +} DBS_SESSION; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ +} + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +FILTER_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param options The options for this filter + * @param params The array of name/value pair parameters for the filter + * + * @return The instance data for this new instance + */ +static FILTER * +createInstance(char **options, FILTER_PARAMETER **params) +{ +int i; +DBS_INSTANCE *my_instance; + + if ((my_instance = calloc(1, sizeof(DBS_INSTANCE))) != NULL) + { + my_instance->source = NULL; + my_instance->user = NULL; + + /* set default log filename */ + my_instance->filename = strdup("tpm.log"); + /* set default delimiter */ + my_instance->delimiter = strdup("|"); + /* set default query delimiter */ + my_instance->query_delimiter = strdup(";"); + my_instance->query_delimiter_size = 1; + + for (i = 0; params && params[i]; i++) + { + if (!strcmp(params[i]->name, "filename")) + { + free(my_instance->filename); + my_instance->filename = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->user = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "delimiter")) + { + free(my_instance->delimiter); + my_instance->delimiter = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "query_delimiter")) + { + free(my_instance->query_delimiter); + my_instance->query_delimiter = strdup(params[i]->value); + my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); + } + } + my_instance->sessions = 0; + my_instance->fp = fopen(my_instance->filename, "w"); + if (my_instance->fp == NULL) + { + skygw_log_write(LOGFILE_ERROR, "Error: Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno)); + return NULL; + } + } + return (FILTER *)my_instance; +} + +/** + * Associate a new session with this instance of the filter. + * + * Every session uses the same log file. + * + * @param instance The filter instance data + * @param session The session itself + * @return Session specific data for this session + */ +static void * +newSession(FILTER *instance, SESSION *session) +{ +DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; +DBS_SESSION *my_session; +int i; +char *remote, *user; + + if ((my_session = calloc(1, sizeof(DBS_SESSION))) != NULL) + { + atomic_add(&my_instance->sessions,1); + + my_session->max_sql_size = 4 * 1024; // default max query size of 4k. + my_session->sql = (char*)malloc(my_session->max_sql_size); + memset(my_session->sql, 0x00, my_session->max_sql_size); + my_session->buf = (char*)malloc(buf_size); + my_session->sql_index = 0; + my_session->n_statements = 0; + my_session->total.tv_sec = 0; + my_session->total.tv_usec = 0; + my_session->current = NULL; + if ((remote = session_get_remote(session)) != NULL) + my_session->clientHost = strdup(remote); + else + my_session->clientHost = NULL; + if ((user = session_getUser(session)) != NULL) + my_session->userName = strdup(user); + else + my_session->userName = NULL; + my_session->active = 1; + if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost, + my_instance->source)) + my_session->active = 0; + if (my_instance->user && my_session->userName && strcmp(my_session->userName, + my_instance->user)) + my_session->active = 0; + } + + return my_session; +} + +/** + * Close a session with the filter, this is the mechanism + * by which a filter may cleanup data structure etc. + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void +closeSession(FILTER *instance, void *session) +{ + DBS_SESSION *my_session = (DBS_SESSION *)session; + DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; + if (my_instance->fp != NULL) + { + // flush FP when a session is closed. + fflush(my_instance->fp); + } + +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void +freeSession(FILTER *instance, void *session) +{ +DBS_SESSION *my_session = (DBS_SESSION *)session; + + free(my_session->clientHost); + free(my_session->userName); + free(my_session->sql); + free(my_session->buf); + free(session); + return; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void +setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) +{ +DBS_SESSION *my_session = (DBS_SESSION *)session; + + my_session->down = *downstream; +} + +/** + * Set the upstream filter or session to which results will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param upstream The upstream filter or session. + */ +static void +setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) +{ +DBS_SESSION *my_session = (DBS_SESSION *)session; + + my_session->up = *upstream; +} + +/** + * The routeQuery entry point. This is passed the query buffer + * to which the filter should be applied. Once applied the + * query should normally be passed to the downstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data + */ +static int +routeQuery(FILTER *instance, void *session, GWBUF *queue) +{ +DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; +DBS_SESSION *my_session = (DBS_SESSION *)session; +char *ptr = NULL; +size_t i; + + if (my_session->active) + { + if (queue->next != NULL) + { + queue = gwbuf_make_contiguous(queue); + } + if ((ptr = modutil_get_SQL(queue)) != NULL) + { + my_session->query_end = false; + /* check for commit and rollback */ + if (strlen(ptr) > 5) + { + size_t ptr_size = strlen(ptr)+1; + char* buf = my_session->buf; + for (i=0; i < ptr_size && i < buf_size; ++i) + { + buf[i] = tolower(ptr[i]); + } + if (strncmp(buf, "commit", 6) == 0) + { + my_session->query_end = true; + } + else if (strncmp(buf, "rollback", 8) == 0) + { + my_session->query_end = true; + my_session->sql_index = 0; + } + } + + /* for normal sql statements */ + if (!my_session->query_end) + { + /* check and expand buffer size first. */ + size_t new_sql_size = my_session->max_sql_size; + size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1; + + /* if the total length of query statements exceeds the maximum limit, print an error and return */ + if (len > sql_size_limit) + { + skygw_log_write(LOGFILE_ERROR, "Error: The size of query statements exceeds the maximum buffer limit of 64MB."); + goto retblock; + } + + /* double buffer size until the buffer fits the query */ + while (len > new_sql_size) + { + new_sql_size *= 2; + } + if (new_sql_size > my_session->max_sql_size) + { + char* new_sql = (char*)malloc(new_sql_size); + if (new_sql == NULL) + { + skygw_log_write(LOGFILE_ERROR, "Error: Memory allocation failure."); + goto retblock; + } + memcpy(new_sql, my_session->sql, my_session->sql_index); + free(my_session->sql); + my_session->sql = new_sql; + my_session->max_sql_size = new_sql_size; + } + + /* first statement */ + if (my_session->sql_index == 0) + { + memcpy(my_session->sql, ptr, strlen(ptr)); + my_session->sql_index += strlen(ptr); + gettimeofday(&my_session->current_start, NULL); + } + /* otherwise, append the statement with semicolon as a statement delimiter */ + else + { + /* append a query delimiter */ + memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, my_instance->query_delimiter_size); + /* append the next query statement */ + memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr)); + /* set new pointer for the buffer */ + my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr)); + } + } + } + } + +retblock: + + free(ptr); + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); +} + +static int +clientReply(FILTER *instance, void *session, GWBUF *reply) +{ +DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; +DBS_SESSION *my_session = (DBS_SESSION *)session; +struct timeval tv, diff; +int i, inserted; + +/* found 'commit' and sql statements exist. */ + if (my_session->query_end && my_session->sql_index > 0) + { + gettimeofday(&tv, NULL); + timersub(&tv, &(my_session->current_start), &diff); + + /* get latency */ + uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000); + /* get timestamp */ + uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000*1000))); + + *(my_session->sql + my_session->sql_index) = '\0'; + + /* print to log. */ + fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", + timestamp, + my_instance->delimiter, + my_session->clientHost, + my_instance->delimiter, + my_session->userName, + my_instance->delimiter, + millis, + my_instance->delimiter, + my_session->sql); + + my_session->sql_index = 0; + } + + /* Pass the result upstream */ + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(FILTER *instance, void *fsession, DCB *dcb) +{ +DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; +DBS_SESSION *my_session = (DBS_SESSION *)fsession; +int i; + + if (my_instance->source) + dcb_printf(dcb, "\t\tLimit logging to connections from %s\n", + my_instance->source); + if (my_instance->user) + dcb_printf(dcb, "\t\tLimit logging to user %s\n", + my_instance->user); + if (my_session) + { + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_instance->filename); + } +} From 2222ad311c38553fed0d243e7bfaf6ed6fcc1af4 Mon Sep 17 00:00:00 2001 From: Dong Young Yoon Date: Mon, 14 Dec 2015 16:31:35 -0500 Subject: [PATCH 2/7] made tpmfilter compatible with the current upstream develop branch. --- server/modules/filter/tpmfilter.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/server/modules/filter/tpmfilter.c b/server/modules/filter/tpmfilter.c index 3efda48dd..fb2b9ac4f 100644 --- a/server/modules/filter/tpmfilter.c +++ b/server/modules/filter/tpmfilter.c @@ -60,7 +60,6 @@ /** Defined in log_manager.cc */ extern int lm_enabled_logfiles_bitmask; extern size_t log_ses_count[]; -extern __thread log_info_t tls_log_info; MODULE_INFO info = { MODULE_API_FILTER, @@ -228,7 +227,7 @@ DBS_INSTANCE *my_instance; my_instance->fp = fopen(my_instance->filename, "w"); if (my_instance->fp == NULL) { - skygw_log_write(LOGFILE_ERROR, "Error: Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno)); + MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno)); return NULL; } } @@ -413,7 +412,7 @@ size_t i; /* if the total length of query statements exceeds the maximum limit, print an error and return */ if (len > sql_size_limit) { - skygw_log_write(LOGFILE_ERROR, "Error: The size of query statements exceeds the maximum buffer limit of 64MB."); + MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB."); goto retblock; } @@ -427,7 +426,7 @@ size_t i; char* new_sql = (char*)malloc(new_sql_size); if (new_sql == NULL) { - skygw_log_write(LOGFILE_ERROR, "Error: Memory allocation failure."); + MXS_ERROR("Memory allocation failure."); goto retblock; } memcpy(new_sql, my_session->sql, my_session->sql_index); From 74c0ab2c57b3cb841d526e8cfbd5aceac510b086 Mon Sep 17 00:00:00 2001 From: Dong Young Yoon Date: Fri, 18 Dec 2015 15:23:09 -0500 Subject: [PATCH 3/7] Added documentation for tpmfilter. --- ...ansaction-Performance-Monitoring-Filter.md | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) create mode 100644 Documentation/Filters/Transaction-Performance-Monitoring-Filter.md diff --git a/Documentation/Filters/Transaction-Performance-Monitoring-Filter.md b/Documentation/Filters/Transaction-Performance-Monitoring-Filter.md new file mode 100644 index 000000000..03de34b05 --- /dev/null +++ b/Documentation/Filters/Transaction-Performance-Monitoring-Filter.md @@ -0,0 +1,107 @@ +# Transaction Performance Monitoring Filter + +## Overview + +The Transaction Performance Monitoring (TPM) filter is a filter module for MaxScale that monitors every SQL statement that passes through the filter. The filter groups a series of SQL statements into a transaction by detecting 'commit' or 'rollback' statements. It logs all committed transactions with necessary information, such as timestamp, client, SQL statements, latency, etc., which can be used later for transaction performance analysis. + +## Configuration + +The configuration block for the TPM filter requires the minimal filter options in it's section within the maxscale.cnf file, stored in /etc/maxscale.cnf. + +``` +[MyLogFilter] +type=filter +module=tpmfilter + +[MyService] +type=service +router=readconnrouter +servers=server1 +user=myuser +passwd=mypasswd +filters=MyLogFilter +``` + +## Filter Options + +The TPM filter does not support any filter options currently. + +## Filter Parameters + +The TPM filter accepts a number of optional parameters. + +### Filename + +The name of the output file created for performance logging. The default filename is **tpm.log**. + +``` +filebase=/tmp/SqlQueryLog +``` + +### Source + +The optional source parameter defines an address that is used to match against the address from which the client connection to MaxScale originates. Only sessions that originate from this address will be logged. + +``` +source=127.0.0.1 +``` + +### User + +The optional user parameter defines a user name that is used to match against the user from which the client connection to MaxScale originates. Only sessions that are connected using this username are logged. + +``` +user=john +``` + +### Delimiter + +The optional delimiter parameter defines a delimiter that is used to distinguish columns in the log. The default delimiter is **|**. + +``` +delimiter=: +``` + +### Query_delimiter + +The optional query_delimiter defines a delimiter that is used to distinguish different SQL statements in a transaction. The default query delimiter is **;**. + +``` +query_delimiter=@@@ +``` + + +## Examples + +### Example 1 - Log Transactions for Performance Analysis + +You want to log every transaction with its SQL statements and latency for future transaction performance analysis. + +Add a filter with the following definition: + +``` +[PerformanceLogger] +type=filter +module=tpmfilter +delimiter=:: +query_delimiter=@@ +filebase=/var/logs/tpm/perf.log + +[Product Service] +type=service +router=readconnrouter +servers=server1 +user=myuser +passwd=mypasswd +filters=PerformanceLogger +``` + +The following is an example log that is generated from the above TPM filter: + +``` +1450469909::127.0.0.1::root::5::UPDATE WAREHOUSE SET W_YTD = W_YTD + 1954.67 WHERE W_ID = 1 @@SELECT W_STREET_1, W_STREET_2, W_CITY, W_STATE, W_ZIP, W_NAME FROM WAREHOUSE WHERE W_ID = 1@@UPDATE DISTRICT SET D_YTD = D_YTD + 1954.67 WHERE D_W_ID = 1 AND D_ID = 4@@SELECT D_STREET_1, D_STREET_2, D_CITY, D_STATE, D_ZIP, D_NAME FROM DISTRICT WHERE D_W_ID = 1 AND D_ID = 4@@SELECT C_FIRST, C_MIDDLE, C_LAST, C_STREET_1, C_STREET_2, C_CITY, C_STATE, C_ZIP, C_PHONE, C_CREDIT, C_CREDIT_LIM, C_DISCOUNT, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT, C_SINCE FROM CUSTOMER WHERE C_W_ID = 1 AND C_D_ID = 4 AND C_ID = 766@@UPDATE CUSTOMER SET C_BALANCE = 145950.77, C_YTD_PAYMENT = 173436.67, C_PAYMENT_CNT = 67 WHERE C_W_ID = 1 AND C_D_ID = 4 AND C_ID = 766@@INSERT INTO HISTORY (H_C_D_ID, H_C_W_ID, H_C_ID, H_D_ID, H_W_ID, H_DATE, H_AMOUNT, H_DATA) VALUES (4,1,766,4,1,'2015-12-18 15:18:29',1954.67,'sxvnj vivbun') +1450469909::127.0.0.1::root::14::UPDATE WAREHOUSE SET W_YTD = W_YTD + 3969.43 WHERE W_ID = 2 @@SELECT W_STREET_1, W_STREET_2, W_CITY, W_STATE, W_ZIP, W_NAME FROM WAREHOUSE WHERE W_ID = 2@@UPDATE DISTRICT SET D_YTD = D_YTD + 3969.43 WHERE D_W_ID = 2 AND D_ID = 5@@SELECT D_STREET_1, D_STREET_2, D_CITY, D_STATE, D_ZIP, D_NAME FROM DISTRICT WHERE D_W_ID = 2 AND D_ID = 5@@SELECT C_FIRST, C_MIDDLE, C_LAST, C_STREET_1, C_STREET_2, C_CITY, C_STATE, C_ZIP, C_PHONE, C_CREDIT, C_CREDIT_LIM, C_DISCOUNT, C_BALANCE, C_YTD_PAYMENT, C_PAYMENT_CNT, C_SINCE FROM CUSTOMER WHERE C_W_ID = 1 AND C_D_ID = 6 AND C_ID = 1789@@UPDATE CUSTOMER SET C_BALANCE = 169626.31, C_YTD_PAYMENT = 111249.43, C_PAYMENT_CNT = 49 WHERE C_W_ID = 1 AND C_D_ID = 6 AND C_ID = 1789@@INSERT INTO HISTORY (H_C_D_ID, H_C_W_ID, H_C_ID, H_D_ID, H_W_ID, H_DATE, H_AMOUNT, H_DATA) VALUES (6,1,1789,5,2,'2015-12-18 15:18:29',3969.43,'gqfla adopdon') +... +``` + +Note that 5 and 14 are latencies of each transaction in milliseconds. \ No newline at end of file From b5171cf7ee17ba377dbef7204755e19d1b6f24cb Mon Sep 17 00:00:00 2001 From: Dong Young Yoon Date: Fri, 18 Dec 2015 16:11:39 -0500 Subject: [PATCH 4/7] 1. Changed the prefix 'DBS' to 'TPM' for session and instance structs. 2. Added every optional parameters in diagnostics function. --- server/modules/filter/tpmfilter.c | 48 +++++++++++++++++-------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/server/modules/filter/tpmfilter.c b/server/modules/filter/tpmfilter.c index fb2b9ac4f..e34c754a7 100644 --- a/server/modules/filter/tpmfilter.c +++ b/server/modules/filter/tpmfilter.c @@ -110,10 +110,10 @@ typedef struct { int query_delimiter_size; /* the length of the query delimiter */ FILE* fp; -} DBS_INSTANCE; +} TPM_INSTANCE; /** - * The session structure for this DBS filter. + * The session structure for this TPM filter. * This stores the downstream filter information, such that the * filter is able to pass the query on to the next filter (or router) * in the chain. @@ -136,7 +136,7 @@ typedef struct { char *buf; int sql_index; size_t max_sql_size; -} DBS_SESSION; +} TPM_SESSION; /** * Implementation of the mandatory version entry point @@ -185,9 +185,9 @@ static FILTER * createInstance(char **options, FILTER_PARAMETER **params) { int i; -DBS_INSTANCE *my_instance; +TPM_INSTANCE *my_instance; - if ((my_instance = calloc(1, sizeof(DBS_INSTANCE))) != NULL) + if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) { my_instance->source = NULL; my_instance->user = NULL; @@ -246,12 +246,12 @@ DBS_INSTANCE *my_instance; static void * newSession(FILTER *instance, SESSION *session) { -DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; -DBS_SESSION *my_session; +TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; +TPM_SESSION *my_session; int i; char *remote, *user; - if ((my_session = calloc(1, sizeof(DBS_SESSION))) != NULL) + if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) { atomic_add(&my_instance->sessions,1); @@ -294,8 +294,8 @@ char *remote, *user; static void closeSession(FILTER *instance, void *session) { - DBS_SESSION *my_session = (DBS_SESSION *)session; - DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; if (my_instance->fp != NULL) { // flush FP when a session is closed. @@ -313,7 +313,7 @@ closeSession(FILTER *instance, void *session) static void freeSession(FILTER *instance, void *session) { -DBS_SESSION *my_session = (DBS_SESSION *)session; +TPM_SESSION *my_session = (TPM_SESSION *)session; free(my_session->clientHost); free(my_session->userName); @@ -334,7 +334,7 @@ DBS_SESSION *my_session = (DBS_SESSION *)session; static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { -DBS_SESSION *my_session = (DBS_SESSION *)session; +TPM_SESSION *my_session = (TPM_SESSION *)session; my_session->down = *downstream; } @@ -350,7 +350,7 @@ DBS_SESSION *my_session = (DBS_SESSION *)session; static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) { -DBS_SESSION *my_session = (DBS_SESSION *)session; +TPM_SESSION *my_session = (TPM_SESSION *)session; my_session->up = *upstream; } @@ -368,8 +368,8 @@ DBS_SESSION *my_session = (DBS_SESSION *)session; static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { -DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; -DBS_SESSION *my_session = (DBS_SESSION *)session; +TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; +TPM_SESSION *my_session = (TPM_SESSION *)session; char *ptr = NULL; size_t i; @@ -467,8 +467,8 @@ retblock: static int clientReply(FILTER *instance, void *session, GWBUF *reply) { -DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; -DBS_SESSION *my_session = (DBS_SESSION *)session; +TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; +TPM_SESSION *my_session = (TPM_SESSION *)session; struct timeval tv, diff; int i, inserted; @@ -519,8 +519,8 @@ int i, inserted; static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { -DBS_INSTANCE *my_instance = (DBS_INSTANCE *)instance; -DBS_SESSION *my_session = (DBS_SESSION *)fsession; +TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; +TPM_SESSION *my_session = (TPM_SESSION *)fsession; int i; if (my_instance->source) @@ -529,9 +529,13 @@ int i; if (my_instance->user) dcb_printf(dcb, "\t\tLimit logging to user %s\n", my_instance->user); - if (my_session) - { + if (my_instance->filename) dcb_printf(dcb, "\t\tLogging to file %s.\n", my_instance->filename); - } + if (my_instance->delimiter) + dcb_printf(dcb, "\t\tLogging with delimiter %s.\n", + my_instance->delimiter); + if (my_instance->query_delimiter) + dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n", + my_instance->query_delimiter); } From e01b4a33fdc1ae3cf6faac789ed4746ea0a0086c Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Tue, 13 Sep 2016 21:22:36 +0300 Subject: [PATCH 5/7] Make service and monitor permissions checks optional MaxScale shouldn't require the service and monitor user checks. It makes sense to disable the checks to speed up the startup process when the user knows that the permissions are OK. --- Documentation/Getting-Started/Configuration-Guide.md | 9 +++++++++ server/core/config.c | 5 +++++ server/core/dbusers.c | 3 ++- server/core/monitor.c | 5 +++++ server/include/maxconfig.h | 1 + 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/Documentation/Getting-Started/Configuration-Guide.md b/Documentation/Getting-Started/Configuration-Guide.md index 33ea2ab46..a4e863282 100644 --- a/Documentation/Getting-Started/Configuration-Guide.md +++ b/Documentation/Getting-Started/Configuration-Guide.md @@ -113,6 +113,15 @@ Enable or disable the high precision timestamps in logfiles. Enabling this adds ms_timestamp=1 ``` +#### `skip_permission_checks` + +Skip service and monitor user permission checks. This is useful when +you know the permissions are OK and you want to speed up the startup +process. + +It is recommended to leave the permission checks on so that any +missing privileges are detected when maxscale is starting up. + #### `syslog` Enable or disable the logging of messages to *syslog*. diff --git a/server/core/config.c b/server/core/config.c index b1cbb41e8..baaec3596 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -957,6 +957,10 @@ handle_global_item(const char *name, const char *value) { mxs_log_set_highprecision_enabled(config_truth_value((char*)value)); } + else if (strcmp(name, "skip_permission_checks") == 0) + { + gateway.skip_permission_checks = config_truth_value((char*)value); + } else if (strcmp(name, "auth_connect_timeout") == 0) { char* endptr; @@ -1301,6 +1305,7 @@ global_defaults() gateway.auth_conn_timeout = DEFAULT_AUTH_CONNECT_TIMEOUT; gateway.auth_read_timeout = DEFAULT_AUTH_READ_TIMEOUT; gateway.auth_write_timeout = DEFAULT_AUTH_WRITE_TIMEOUT; + gateway.skip_permission_checks = false; if (version_string != NULL) { gateway.version_string = MXS_STRDUP_A(version_string); diff --git a/server/core/dbusers.c b/server/core/dbusers.c index d49a9698e..7927e1a90 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -2689,7 +2689,8 @@ static bool check_server_permissions(SERVICE *service, SERVER* server, */ bool check_service_permissions(SERVICE* service) { - if (is_internal_service(service->routerModule)) + if (is_internal_service(service->routerModule) || + config_get_global_options()->skip_permission_checks) { return true; } diff --git a/server/core/monitor.c b/server/core/monitor.c index 3741c3071..e2143b319 100644 --- a/server/core/monitor.c +++ b/server/core/monitor.c @@ -543,6 +543,11 @@ bool check_monitor_permissions(MONITOR* monitor, const char* query) return false; } + if (config_get_global_options()->skip_permission_checks) + { + return true; + } + char *user = monitor->user; char *dpasswd = decryptPassword(monitor->password); GATEWAY_CONF* cnf = config_get_global_options(); diff --git a/server/include/maxconfig.h b/server/include/maxconfig.h index 2e7970657..684af2376 100644 --- a/server/include/maxconfig.h +++ b/server/include/maxconfig.h @@ -121,6 +121,7 @@ typedef struct unsigned int auth_conn_timeout; /**< Connection timeout for the user authentication */ unsigned int auth_read_timeout; /**< Read timeout for the user authentication */ unsigned int auth_write_timeout; /**< Write timeout for the user authentication */ + bool skip_permission_checks; /**< Skip service and monitor permission checks */ char qc_name[PATH_MAX]; /**< The name of the query classifier to load */ char* qc_args; /**< Arguments for the query classifier */ } GATEWAY_CONF; From df38b4dd500d5ec6e4fdfed04129de2d62807026 Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Wed, 14 Sep 2016 09:48:13 +0300 Subject: [PATCH 6/7] Add utility scripts to make installation easier The `create_grants` scripts allow users to be easily "copied" to MaxScale. It queries the backend for grants for all users and converts them into similar grants for the MaxScale host. The `create_roles.sql` is a small set of queries which creates two utility roles, `proxy_authenticator` and `proxy_monitor`. These roles can be assigned to the actual service and monitor users with a single grant command. --- CMakeLists.txt | 2 ++ script/create_grants | 61 +++++++++++++++++++++++++++++++++++++++++ script/create_roles.sql | 7 +++++ 3 files changed, 70 insertions(+) create mode 100755 script/create_grants create mode 100644 script/create_roles.sql diff --git a/CMakeLists.txt b/CMakeLists.txt index 9119f5ad7..9d0773e25 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -205,6 +205,8 @@ install_file(${CMAKE_BINARY_DIR}/ReleaseNotes.txt core) install_file(${CMAKE_BINARY_DIR}/UpgradingToMaxScale12.txt core) install_file(server/maxscale.cnf.template core) install_file(server/maxscale_binlogserver_template.cnf core) +install_program(script/create_grants core) +install_file(script/create_roles.sql core) # Install the template into /etc if(WITH_MAXSCALE_CNF AND (NOT TARGET_COMPONENT OR "core" STREQUAL "${TARGET_COMPONENT}")) diff --git a/script/create_grants b/script/create_grants new file mode 100755 index 000000000..c4bbe910a --- /dev/null +++ b/script/create_grants @@ -0,0 +1,61 @@ +#!/bin/bash + +function runQuery(){ + mysql -s -s -h $host -P $port -u $user -p$password -e "$1" + if [ $? -ne 0 ] + then + echo "Failed to execute query: $1" + exit + fi +} + +# Transform grants to from external hosts to MaxScale's host +function getGrants(){ + result=$(runQuery "show grants for $1"|sed -e "s/@[^ ]*/@'$maxscalehost'/" -e "s/ *IDENTIFIED BY.*//" -e "s/$/;/") + echo "$result" +} + +user=$(whoami) +host=$(hostname) +port=3306 +include_root="and user <> 'root'" + +if [ "$1" == "--help" ] || [ $# -eq 0 ] +then + echo "$0 -u USER -p PASSWORD -h HOST -P PORT [-r]" + exit +fi + +while getopts "u:p:h:P:r" var +do + case $var in + u) + user=$OPTARG + ;; + + p) + password=$OPTARG + ;; + + h) + host=$OPTARG + ;; + + P) + port=$OPTARG + ;; + r) + include_root="" + ;; + esac +done + +# Get the MaxScale hostname from the backend server +maxscalehost=$(runQuery "select user()") +maxscalehost=${maxscalehost#*@} + +# List all the users +runQuery "select concat(\"'\", user, \"'\", '@', \"'\", host, \"'\") from mysql.user where user <> '' and host <> '%' $include_root"|while read i +do + getGrants "$i" +done diff --git a/script/create_roles.sql b/script/create_roles.sql new file mode 100644 index 000000000..98d473c1c --- /dev/null +++ b/script/create_roles.sql @@ -0,0 +1,7 @@ +CREATE ROLE proxy_authenticator; +GRANT SELECT ON mysql.user TO proxy_authenticator; +GRANT SELECT ON mysql.db TO proxy_authenticator; +GRANT SELECT ON mysql.tables_priv TO proxy_authenticator; +GRANT SHOW DATABASES ON *.* TO proxy_authenticator; +CREATE ROLE proxy_monitor; +GRANT REPLICATION CLIENT ON *.* TO proxy_monitor; From b60f000a4c8de2c26df95398b509c0f7bd6413cb Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 15 Sep 2016 12:21:54 +0300 Subject: [PATCH 7/7] Format tpmfilter The filter is now formatted with the astyle config. --- server/modules/filter/tpmfilter/tpmfilter.c | 633 ++++++++++---------- 1 file changed, 328 insertions(+), 305 deletions(-) diff --git a/server/modules/filter/tpmfilter/tpmfilter.c b/server/modules/filter/tpmfilter/tpmfilter.c index 9d360bdab..d17823a0f 100644 --- a/server/modules/filter/tpmfilter/tpmfilter.c +++ b/server/modules/filter/tpmfilter/tpmfilter.c @@ -27,14 +27,14 @@ * in a single GWBUF. * * Optional parameters: - * filename= - * delimiter= - * query_delimiter= - * source= - * user= + * filename= + * delimiter= + * query_delimiter= + * source= + * user= * - * Date Who Description - * 06/12/2015 Dong Young Yoon Initial implementation + * Date Who Description + * 06/12/2015 Dong Young Yoon Initial implementation * * @endverbatim */ @@ -56,31 +56,34 @@ extern int lm_enabled_logfiles_bitmask; extern size_t log_ses_count[]; -MODULE_INFO info = { - MODULE_API_FILTER, - MODULE_GA, - FILTER_VERSION, - "Transaction Performance Monitoring filter" +MODULE_INFO info = +{ + MODULE_API_FILTER, + MODULE_GA, + FILTER_VERSION, + "Transaction Performance Monitoring filter" }; static char *version_str = "V1.0.0"; static size_t buf_size = 10; -static size_t sql_size_limit = 64 * 1024 * 1024; /* The maximum size for query statements in a transaction (64MB) */ +static size_t sql_size_limit = 64 * 1024 * + 1024; /* The maximum size for query statements in a transaction (64MB) */ /* * The filter entry points */ -static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); -static void *newSession(FILTER *instance, SESSION *session); -static void closeSession(FILTER *instance, void *session); -static void freeSession(FILTER *instance, void *session); -static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); -static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); -static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); -static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); -static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); +static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); -static FILTER_OBJECT MyObject = { +static FILTER_OBJECT MyObject = +{ createInstance, newSession, closeSession, @@ -95,16 +98,17 @@ static FILTER_OBJECT MyObject = { /** * A instance structure, every instance will write to a same file. */ -typedef struct { - int sessions; /* Session count */ - char *source; /* The source of the client connection */ - char *user; /* The user name to filter on */ - char *filename; /* filename */ - char *delimiter; /* delimiter for columns in a log */ - char *query_delimiter; /* delimiter for query statements in a transaction */ +typedef struct +{ + int sessions; /* Session count */ + char *source; /* The source of the client connection */ + char *user; /* The user name to filter on */ + char *filename; /* filename */ + char *delimiter; /* delimiter for columns in a log */ + char *query_delimiter; /* delimiter for query statements in a transaction */ - int query_delimiter_size; /* the length of the query delimiter */ - FILE* fp; + int query_delimiter_size; /* the length of the query delimiter */ + FILE* fp; } TPM_INSTANCE; /** @@ -115,22 +119,23 @@ typedef struct { * * It also holds the file descriptor to which queries are written. */ -typedef struct { - DOWNSTREAM down; - UPSTREAM up; - int active; - char *clientHost; - char *userName; - char* sql; - struct timeval start; - char *current; - int n_statements; - struct timeval total; - struct timeval current_start; - bool query_end; - char *buf; - int sql_index; - size_t max_sql_size; +typedef struct +{ + DOWNSTREAM down; + UPSTREAM up; + int active; + char *clientHost; + char *userName; + char* sql; + struct timeval start; + char *current; + int n_statements; + struct timeval total; + struct timeval current_start; + bool query_end; + char *buf; + int sql_index; + size_t max_sql_size; } TPM_SESSION; /** @@ -141,7 +146,7 @@ typedef struct { char * version() { - return version_str; + return version_str; } /** @@ -164,69 +169,74 @@ ModuleInit() FILTER_OBJECT * GetModuleObject() { - return &MyObject; + return &MyObject; } /** * Create an instance of the filter for a particular service * within MaxScale. * - * @param options The options for this filter - * @param params The array of name/value pair parameters for the filter + * @param options The options for this filter + * @param params The array of name/value pair parameters for the filter * * @return The instance data for this new instance */ -static FILTER * +static FILTER * createInstance(const char *name, char **options, FILTER_PARAMETER **params) { -int i; -TPM_INSTANCE *my_instance; + int i; + TPM_INSTANCE *my_instance; - if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) - { - my_instance->source = NULL; - my_instance->user = NULL; + if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) + { + my_instance->source = NULL; + my_instance->user = NULL; - /* set default log filename */ - my_instance->filename = strdup("tpm.log"); - /* set default delimiter */ - my_instance->delimiter = strdup("|"); - /* set default query delimiter */ - my_instance->query_delimiter = strdup(";"); - my_instance->query_delimiter_size = 1; + /* set default log filename */ + my_instance->filename = strdup("tpm.log"); + /* set default delimiter */ + my_instance->delimiter = strdup("|"); + /* set default query delimiter */ + my_instance->query_delimiter = strdup(";"); + my_instance->query_delimiter_size = 1; - for (i = 0; params && params[i]; i++) - { - if (!strcmp(params[i]->name, "filename")) - { - free(my_instance->filename); - my_instance->filename = strdup(params[i]->value); - } - else if (!strcmp(params[i]->name, "source")) - my_instance->source = strdup(params[i]->value); - else if (!strcmp(params[i]->name, "user")) - my_instance->user = strdup(params[i]->value); - else if (!strcmp(params[i]->name, "delimiter")) - { - free(my_instance->delimiter); - my_instance->delimiter = strdup(params[i]->value); - } - else if (!strcmp(params[i]->name, "query_delimiter")) - { - free(my_instance->query_delimiter); - my_instance->query_delimiter = strdup(params[i]->value); - my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); - } - } - my_instance->sessions = 0; - my_instance->fp = fopen(my_instance->filename, "w"); - if (my_instance->fp == NULL) - { - MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno)); - return NULL; - } - } - return (FILTER *)my_instance; + for (i = 0; params && params[i]; i++) + { + if (!strcmp(params[i]->name, "filename")) + { + free(my_instance->filename); + my_instance->filename = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + { + my_instance->source = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "user")) + { + my_instance->user = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "delimiter")) + { + free(my_instance->delimiter); + my_instance->delimiter = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "query_delimiter")) + { + free(my_instance->query_delimiter); + my_instance->query_delimiter = strdup(params[i]->value); + my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); + } + } + my_instance->sessions = 0; + my_instance->fp = fopen(my_instance->filename, "w"); + if (my_instance->fp == NULL) + { + MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, + strerror(errno)); + return NULL; + } + } + return (FILTER *)my_instance; } /** @@ -234,120 +244,132 @@ TPM_INSTANCE *my_instance; * * Every session uses the same log file. * - * @param instance The filter instance data - * @param session The session itself + * @param instance The filter instance data + * @param session The session itself * @return Session specific data for this session */ -static void * +static void * newSession(FILTER *instance, SESSION *session) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session; -int i; -char *remote, *user; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session; + int i; + char *remote, *user; - if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) - { - atomic_add(&my_instance->sessions,1); + if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) + { + atomic_add(&my_instance->sessions, 1); - my_session->max_sql_size = 4 * 1024; // default max query size of 4k. - my_session->sql = (char*)malloc(my_session->max_sql_size); - memset(my_session->sql, 0x00, my_session->max_sql_size); - my_session->buf = (char*)malloc(buf_size); - my_session->sql_index = 0; - my_session->n_statements = 0; - my_session->total.tv_sec = 0; - my_session->total.tv_usec = 0; - my_session->current = NULL; - if ((remote = session_get_remote(session)) != NULL) - my_session->clientHost = strdup(remote); - else - my_session->clientHost = NULL; - if ((user = session_getUser(session)) != NULL) - my_session->userName = strdup(user); - else - my_session->userName = NULL; - my_session->active = 1; - if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost, - my_instance->source)) - my_session->active = 0; - if (my_instance->user && my_session->userName && strcmp(my_session->userName, - my_instance->user)) - my_session->active = 0; - } + my_session->max_sql_size = 4 * 1024; // default max query size of 4k. + my_session->sql = (char*)malloc(my_session->max_sql_size); + memset(my_session->sql, 0x00, my_session->max_sql_size); + my_session->buf = (char*)malloc(buf_size); + my_session->sql_index = 0; + my_session->n_statements = 0; + my_session->total.tv_sec = 0; + my_session->total.tv_usec = 0; + my_session->current = NULL; + if ((remote = session_get_remote(session)) != NULL) + { + my_session->clientHost = strdup(remote); + } + else + { + my_session->clientHost = NULL; + } + if ((user = session_getUser(session)) != NULL) + { + my_session->userName = strdup(user); + } + else + { + my_session->userName = NULL; + } + my_session->active = 1; + if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost, + my_instance->source)) + { + my_session->active = 0; + } + if (my_instance->user && my_session->userName && strcmp(my_session->userName, + my_instance->user)) + { + my_session->active = 0; + } + } - return my_session; + return my_session; } /** * Close a session with the filter, this is the mechanism * by which a filter may cleanup data structure etc. * - * @param instance The filter instance data - * @param session The session being closed + * @param instance The filter instance data + * @param session The session being closed */ -static void +static void closeSession(FILTER *instance, void *session) { - TPM_SESSION *my_session = (TPM_SESSION *)session; - TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; - if (my_instance->fp != NULL) - { - // flush FP when a session is closed. - fflush(my_instance->fp); - } + TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + if (my_instance->fp != NULL) + { + // flush FP when a session is closed. + fflush(my_instance->fp); + } } /** * Free the memory associated with the session * - * @param instance The filter instance - * @param session The filter session + * @param instance The filter instance + * @param session The filter session */ static void freeSession(FILTER *instance, void *session) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - free(my_session->clientHost); - free(my_session->userName); - free(my_session->sql); - free(my_session->buf); - free(session); - return; + free(my_session->clientHost); + free(my_session->userName); + free(my_session->sql); + free(my_session->buf); + free(session); + return; } /** * Set the downstream filter or router to which queries will be * passed from this filter. * - * @param instance The filter instance data - * @param session The filter session - * @param downstream The downstream filter or router. + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. */ static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - my_session->down = *downstream; + my_session->down = *downstream; } /** * Set the upstream filter or session to which results will be * passed from this filter. * - * @param instance The filter instance data - * @param session The filter session - * @param upstream The upstream filter or session. + * @param instance The filter instance data + * @param session The filter session + * @param upstream The upstream filter or session. */ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - my_session->up = *upstream; + my_session->up = *upstream; } /** @@ -356,148 +378,149 @@ TPM_SESSION *my_session = (TPM_SESSION *)session; * query should normally be passed to the downstream component * (filter or router) in the filter chain. * - * @param instance The filter instance data - * @param session The filter session - * @param queue The query data + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data */ -static int +static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)session; -char *ptr = NULL; -size_t i; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)session; + char *ptr = NULL; + size_t i; - if (my_session->active) - { - if (queue->next != NULL) - { - queue = gwbuf_make_contiguous(queue); - } - if ((ptr = modutil_get_SQL(queue)) != NULL) - { - my_session->query_end = false; - /* check for commit and rollback */ - if (strlen(ptr) > 5) - { - size_t ptr_size = strlen(ptr)+1; - char* buf = my_session->buf; - for (i=0; i < ptr_size && i < buf_size; ++i) - { - buf[i] = tolower(ptr[i]); - } - if (strncmp(buf, "commit", 6) == 0) - { - my_session->query_end = true; - } - else if (strncmp(buf, "rollback", 8) == 0) - { - my_session->query_end = true; - my_session->sql_index = 0; - } - } + if (my_session->active) + { + if (queue->next != NULL) + { + queue = gwbuf_make_contiguous(queue); + } + if ((ptr = modutil_get_SQL(queue)) != NULL) + { + my_session->query_end = false; + /* check for commit and rollback */ + if (strlen(ptr) > 5) + { + size_t ptr_size = strlen(ptr) + 1; + char* buf = my_session->buf; + for (i = 0; i < ptr_size && i < buf_size; ++i) + { + buf[i] = tolower(ptr[i]); + } + if (strncmp(buf, "commit", 6) == 0) + { + my_session->query_end = true; + } + else if (strncmp(buf, "rollback", 8) == 0) + { + my_session->query_end = true; + my_session->sql_index = 0; + } + } - /* for normal sql statements */ - if (!my_session->query_end) - { - /* check and expand buffer size first. */ - size_t new_sql_size = my_session->max_sql_size; - size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1; + /* for normal sql statements */ + if (!my_session->query_end) + { + /* check and expand buffer size first. */ + size_t new_sql_size = my_session->max_sql_size; + size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1; - /* if the total length of query statements exceeds the maximum limit, print an error and return */ - if (len > sql_size_limit) - { - MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB."); - goto retblock; - } + /* if the total length of query statements exceeds the maximum limit, print an error and return */ + if (len > sql_size_limit) + { + MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB."); + goto retblock; + } - /* double buffer size until the buffer fits the query */ - while (len > new_sql_size) - { - new_sql_size *= 2; - } - if (new_sql_size > my_session->max_sql_size) - { - char* new_sql = (char*)malloc(new_sql_size); - if (new_sql == NULL) - { - MXS_ERROR("Memory allocation failure."); - goto retblock; - } - memcpy(new_sql, my_session->sql, my_session->sql_index); - free(my_session->sql); - my_session->sql = new_sql; - my_session->max_sql_size = new_sql_size; - } + /* double buffer size until the buffer fits the query */ + while (len > new_sql_size) + { + new_sql_size *= 2; + } + if (new_sql_size > my_session->max_sql_size) + { + char* new_sql = (char*)malloc(new_sql_size); + if (new_sql == NULL) + { + MXS_ERROR("Memory allocation failure."); + goto retblock; + } + memcpy(new_sql, my_session->sql, my_session->sql_index); + free(my_session->sql); + my_session->sql = new_sql; + my_session->max_sql_size = new_sql_size; + } - /* first statement */ - if (my_session->sql_index == 0) - { - memcpy(my_session->sql, ptr, strlen(ptr)); - my_session->sql_index += strlen(ptr); - gettimeofday(&my_session->current_start, NULL); - } - /* otherwise, append the statement with semicolon as a statement delimiter */ - else - { - /* append a query delimiter */ - memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, my_instance->query_delimiter_size); - /* append the next query statement */ - memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr)); - /* set new pointer for the buffer */ - my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr)); - } - } - } - } + /* first statement */ + if (my_session->sql_index == 0) + { + memcpy(my_session->sql, ptr, strlen(ptr)); + my_session->sql_index += strlen(ptr); + gettimeofday(&my_session->current_start, NULL); + } + /* otherwise, append the statement with semicolon as a statement delimiter */ + else + { + /* append a query delimiter */ + memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, + my_instance->query_delimiter_size); + /* append the next query statement */ + memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr)); + /* set new pointer for the buffer */ + my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr)); + } + } + } + } retblock: - free(ptr); - /* Pass the query downstream */ - return my_session->down.routeQuery(my_session->down.instance, - my_session->down.session, queue); + free(ptr); + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); } static int clientReply(FILTER *instance, void *session, GWBUF *reply) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)session; -struct timeval tv, diff; -int i, inserted; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)session; + struct timeval tv, diff; + int i, inserted; -/* found 'commit' and sql statements exist. */ - if (my_session->query_end && my_session->sql_index > 0) - { - gettimeofday(&tv, NULL); - timersub(&tv, &(my_session->current_start), &diff); + /* found 'commit' and sql statements exist. */ + if (my_session->query_end && my_session->sql_index > 0) + { + gettimeofday(&tv, NULL); + timersub(&tv, &(my_session->current_start), &diff); - /* get latency */ - uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000); - /* get timestamp */ - uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000*1000))); + /* get latency */ + uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000); + /* get timestamp */ + uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000 * 1000))); - *(my_session->sql + my_session->sql_index) = '\0'; + *(my_session->sql + my_session->sql_index) = '\0'; - /* print to log. */ - fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", - timestamp, - my_instance->delimiter, - my_session->clientHost, - my_instance->delimiter, - my_session->userName, - my_instance->delimiter, - millis, - my_instance->delimiter, - my_session->sql); + /* print to log. */ + fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", + timestamp, + my_instance->delimiter, + my_session->clientHost, + my_instance->delimiter, + my_session->userName, + my_instance->delimiter, + millis, + my_instance->delimiter, + my_session->sql); - my_session->sql_index = 0; - } + my_session->sql_index = 0; + } - /* Pass the result upstream */ - return my_session->up.clientReply(my_session->up.instance, - my_session->up.session, reply); + /* Pass the result upstream */ + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); } /** @@ -507,30 +530,30 @@ int i, inserted; * instance as a whole, otherwise print diagnostics for the * particular session. * - * @param instance The filter instance - * @param fsession Filter session, may be NULL - * @param dcb The DCB for diagnostic output + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output */ -static void +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)fsession; -int i; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)fsession; + int i; - if (my_instance->source) - dcb_printf(dcb, "\t\tLimit logging to connections from %s\n", - my_instance->source); - if (my_instance->user) - dcb_printf(dcb, "\t\tLimit logging to user %s\n", - my_instance->user); - if (my_instance->filename) - dcb_printf(dcb, "\t\tLogging to file %s.\n", - my_instance->filename); - if (my_instance->delimiter) - dcb_printf(dcb, "\t\tLogging with delimiter %s.\n", - my_instance->delimiter); - if (my_instance->query_delimiter) - dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n", - my_instance->query_delimiter); + if (my_instance->source) + dcb_printf(dcb, "\t\tLimit logging to connections from %s\n", + my_instance->source); + if (my_instance->user) + dcb_printf(dcb, "\t\tLimit logging to user %s\n", + my_instance->user); + if (my_instance->filename) + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_instance->filename); + if (my_instance->delimiter) + dcb_printf(dcb, "\t\tLogging with delimiter %s.\n", + my_instance->delimiter); + if (my_instance->query_delimiter) + dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n", + my_instance->query_delimiter); }