From b60f000a4c8de2c26df95398b509c0f7bd6413cb Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Thu, 15 Sep 2016 12:21:54 +0300 Subject: [PATCH] Format tpmfilter The filter is now formatted with the astyle config. --- server/modules/filter/tpmfilter/tpmfilter.c | 633 ++++++++++---------- 1 file changed, 328 insertions(+), 305 deletions(-) diff --git a/server/modules/filter/tpmfilter/tpmfilter.c b/server/modules/filter/tpmfilter/tpmfilter.c index 9d360bdab..d17823a0f 100644 --- a/server/modules/filter/tpmfilter/tpmfilter.c +++ b/server/modules/filter/tpmfilter/tpmfilter.c @@ -27,14 +27,14 @@ * in a single GWBUF. * * Optional parameters: - * filename= - * delimiter= - * query_delimiter= - * source= - * user= + * filename= + * delimiter= + * query_delimiter= + * source= + * user= * - * Date Who Description - * 06/12/2015 Dong Young Yoon Initial implementation + * Date Who Description + * 06/12/2015 Dong Young Yoon Initial implementation * * @endverbatim */ @@ -56,31 +56,34 @@ extern int lm_enabled_logfiles_bitmask; extern size_t log_ses_count[]; -MODULE_INFO info = { - MODULE_API_FILTER, - MODULE_GA, - FILTER_VERSION, - "Transaction Performance Monitoring filter" +MODULE_INFO info = +{ + MODULE_API_FILTER, + MODULE_GA, + FILTER_VERSION, + "Transaction Performance Monitoring filter" }; static char *version_str = "V1.0.0"; static size_t buf_size = 10; -static size_t sql_size_limit = 64 * 1024 * 1024; /* The maximum size for query statements in a transaction (64MB) */ +static size_t sql_size_limit = 64 * 1024 * + 1024; /* The maximum size for query statements in a transaction (64MB) */ /* * The filter entry points */ -static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); -static void *newSession(FILTER *instance, SESSION *session); -static void closeSession(FILTER *instance, void *session); -static void freeSession(FILTER *instance, void *session); -static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); -static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); -static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); -static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); -static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); +static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **); +static void *newSession(FILTER *instance, SESSION *session); +static void closeSession(FILTER *instance, void *session); +static void freeSession(FILTER *instance, void *session); +static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream); +static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream); +static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); +static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); -static FILTER_OBJECT MyObject = { +static FILTER_OBJECT MyObject = +{ createInstance, newSession, closeSession, @@ -95,16 +98,17 @@ static FILTER_OBJECT MyObject = { /** * A instance structure, every instance will write to a same file. */ -typedef struct { - int sessions; /* Session count */ - char *source; /* The source of the client connection */ - char *user; /* The user name to filter on */ - char *filename; /* filename */ - char *delimiter; /* delimiter for columns in a log */ - char *query_delimiter; /* delimiter for query statements in a transaction */ +typedef struct +{ + int sessions; /* Session count */ + char *source; /* The source of the client connection */ + char *user; /* The user name to filter on */ + char *filename; /* filename */ + char *delimiter; /* delimiter for columns in a log */ + char *query_delimiter; /* delimiter for query statements in a transaction */ - int query_delimiter_size; /* the length of the query delimiter */ - FILE* fp; + int query_delimiter_size; /* the length of the query delimiter */ + FILE* fp; } TPM_INSTANCE; /** @@ -115,22 +119,23 @@ typedef struct { * * It also holds the file descriptor to which queries are written. */ -typedef struct { - DOWNSTREAM down; - UPSTREAM up; - int active; - char *clientHost; - char *userName; - char* sql; - struct timeval start; - char *current; - int n_statements; - struct timeval total; - struct timeval current_start; - bool query_end; - char *buf; - int sql_index; - size_t max_sql_size; +typedef struct +{ + DOWNSTREAM down; + UPSTREAM up; + int active; + char *clientHost; + char *userName; + char* sql; + struct timeval start; + char *current; + int n_statements; + struct timeval total; + struct timeval current_start; + bool query_end; + char *buf; + int sql_index; + size_t max_sql_size; } TPM_SESSION; /** @@ -141,7 +146,7 @@ typedef struct { char * version() { - return version_str; + return version_str; } /** @@ -164,69 +169,74 @@ ModuleInit() FILTER_OBJECT * GetModuleObject() { - return &MyObject; + return &MyObject; } /** * Create an instance of the filter for a particular service * within MaxScale. * - * @param options The options for this filter - * @param params The array of name/value pair parameters for the filter + * @param options The options for this filter + * @param params The array of name/value pair parameters for the filter * * @return The instance data for this new instance */ -static FILTER * +static FILTER * createInstance(const char *name, char **options, FILTER_PARAMETER **params) { -int i; -TPM_INSTANCE *my_instance; + int i; + TPM_INSTANCE *my_instance; - if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) - { - my_instance->source = NULL; - my_instance->user = NULL; + if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) + { + my_instance->source = NULL; + my_instance->user = NULL; - /* set default log filename */ - my_instance->filename = strdup("tpm.log"); - /* set default delimiter */ - my_instance->delimiter = strdup("|"); - /* set default query delimiter */ - my_instance->query_delimiter = strdup(";"); - my_instance->query_delimiter_size = 1; + /* set default log filename */ + my_instance->filename = strdup("tpm.log"); + /* set default delimiter */ + my_instance->delimiter = strdup("|"); + /* set default query delimiter */ + my_instance->query_delimiter = strdup(";"); + my_instance->query_delimiter_size = 1; - for (i = 0; params && params[i]; i++) - { - if (!strcmp(params[i]->name, "filename")) - { - free(my_instance->filename); - my_instance->filename = strdup(params[i]->value); - } - else if (!strcmp(params[i]->name, "source")) - my_instance->source = strdup(params[i]->value); - else if (!strcmp(params[i]->name, "user")) - my_instance->user = strdup(params[i]->value); - else if (!strcmp(params[i]->name, "delimiter")) - { - free(my_instance->delimiter); - my_instance->delimiter = strdup(params[i]->value); - } - else if (!strcmp(params[i]->name, "query_delimiter")) - { - free(my_instance->query_delimiter); - my_instance->query_delimiter = strdup(params[i]->value); - my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); - } - } - my_instance->sessions = 0; - my_instance->fp = fopen(my_instance->filename, "w"); - if (my_instance->fp == NULL) - { - MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno)); - return NULL; - } - } - return (FILTER *)my_instance; + for (i = 0; params && params[i]; i++) + { + if (!strcmp(params[i]->name, "filename")) + { + free(my_instance->filename); + my_instance->filename = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "source")) + { + my_instance->source = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "user")) + { + my_instance->user = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "delimiter")) + { + free(my_instance->delimiter); + my_instance->delimiter = strdup(params[i]->value); + } + else if (!strcmp(params[i]->name, "query_delimiter")) + { + free(my_instance->query_delimiter); + my_instance->query_delimiter = strdup(params[i]->value); + my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); + } + } + my_instance->sessions = 0; + my_instance->fp = fopen(my_instance->filename, "w"); + if (my_instance->fp == NULL) + { + MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, + strerror(errno)); + return NULL; + } + } + return (FILTER *)my_instance; } /** @@ -234,120 +244,132 @@ TPM_INSTANCE *my_instance; * * Every session uses the same log file. * - * @param instance The filter instance data - * @param session The session itself + * @param instance The filter instance data + * @param session The session itself * @return Session specific data for this session */ -static void * +static void * newSession(FILTER *instance, SESSION *session) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session; -int i; -char *remote, *user; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session; + int i; + char *remote, *user; - if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) - { - atomic_add(&my_instance->sessions,1); + if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) + { + atomic_add(&my_instance->sessions, 1); - my_session->max_sql_size = 4 * 1024; // default max query size of 4k. - my_session->sql = (char*)malloc(my_session->max_sql_size); - memset(my_session->sql, 0x00, my_session->max_sql_size); - my_session->buf = (char*)malloc(buf_size); - my_session->sql_index = 0; - my_session->n_statements = 0; - my_session->total.tv_sec = 0; - my_session->total.tv_usec = 0; - my_session->current = NULL; - if ((remote = session_get_remote(session)) != NULL) - my_session->clientHost = strdup(remote); - else - my_session->clientHost = NULL; - if ((user = session_getUser(session)) != NULL) - my_session->userName = strdup(user); - else - my_session->userName = NULL; - my_session->active = 1; - if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost, - my_instance->source)) - my_session->active = 0; - if (my_instance->user && my_session->userName && strcmp(my_session->userName, - my_instance->user)) - my_session->active = 0; - } + my_session->max_sql_size = 4 * 1024; // default max query size of 4k. + my_session->sql = (char*)malloc(my_session->max_sql_size); + memset(my_session->sql, 0x00, my_session->max_sql_size); + my_session->buf = (char*)malloc(buf_size); + my_session->sql_index = 0; + my_session->n_statements = 0; + my_session->total.tv_sec = 0; + my_session->total.tv_usec = 0; + my_session->current = NULL; + if ((remote = session_get_remote(session)) != NULL) + { + my_session->clientHost = strdup(remote); + } + else + { + my_session->clientHost = NULL; + } + if ((user = session_getUser(session)) != NULL) + { + my_session->userName = strdup(user); + } + else + { + my_session->userName = NULL; + } + my_session->active = 1; + if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost, + my_instance->source)) + { + my_session->active = 0; + } + if (my_instance->user && my_session->userName && strcmp(my_session->userName, + my_instance->user)) + { + my_session->active = 0; + } + } - return my_session; + return my_session; } /** * Close a session with the filter, this is the mechanism * by which a filter may cleanup data structure etc. * - * @param instance The filter instance data - * @param session The session being closed + * @param instance The filter instance data + * @param session The session being closed */ -static void +static void closeSession(FILTER *instance, void *session) { - TPM_SESSION *my_session = (TPM_SESSION *)session; - TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; - if (my_instance->fp != NULL) - { - // flush FP when a session is closed. - fflush(my_instance->fp); - } + TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + if (my_instance->fp != NULL) + { + // flush FP when a session is closed. + fflush(my_instance->fp); + } } /** * Free the memory associated with the session * - * @param instance The filter instance - * @param session The filter session + * @param instance The filter instance + * @param session The filter session */ static void freeSession(FILTER *instance, void *session) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - free(my_session->clientHost); - free(my_session->userName); - free(my_session->sql); - free(my_session->buf); - free(session); - return; + free(my_session->clientHost); + free(my_session->userName); + free(my_session->sql); + free(my_session->buf); + free(session); + return; } /** * Set the downstream filter or router to which queries will be * passed from this filter. * - * @param instance The filter instance data - * @param session The filter session - * @param downstream The downstream filter or router. + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. */ static void setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - my_session->down = *downstream; + my_session->down = *downstream; } /** * Set the upstream filter or session to which results will be * passed from this filter. * - * @param instance The filter instance data - * @param session The filter session - * @param upstream The upstream filter or session. + * @param instance The filter instance data + * @param session The filter session + * @param upstream The upstream filter or session. */ static void setUpstream(FILTER *instance, void *session, UPSTREAM *upstream) { -TPM_SESSION *my_session = (TPM_SESSION *)session; + TPM_SESSION *my_session = (TPM_SESSION *)session; - my_session->up = *upstream; + my_session->up = *upstream; } /** @@ -356,148 +378,149 @@ TPM_SESSION *my_session = (TPM_SESSION *)session; * query should normally be passed to the downstream component * (filter or router) in the filter chain. * - * @param instance The filter instance data - * @param session The filter session - * @param queue The query data + * @param instance The filter instance data + * @param session The filter session + * @param queue The query data */ -static int +static int routeQuery(FILTER *instance, void *session, GWBUF *queue) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)session; -char *ptr = NULL; -size_t i; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)session; + char *ptr = NULL; + size_t i; - if (my_session->active) - { - if (queue->next != NULL) - { - queue = gwbuf_make_contiguous(queue); - } - if ((ptr = modutil_get_SQL(queue)) != NULL) - { - my_session->query_end = false; - /* check for commit and rollback */ - if (strlen(ptr) > 5) - { - size_t ptr_size = strlen(ptr)+1; - char* buf = my_session->buf; - for (i=0; i < ptr_size && i < buf_size; ++i) - { - buf[i] = tolower(ptr[i]); - } - if (strncmp(buf, "commit", 6) == 0) - { - my_session->query_end = true; - } - else if (strncmp(buf, "rollback", 8) == 0) - { - my_session->query_end = true; - my_session->sql_index = 0; - } - } + if (my_session->active) + { + if (queue->next != NULL) + { + queue = gwbuf_make_contiguous(queue); + } + if ((ptr = modutil_get_SQL(queue)) != NULL) + { + my_session->query_end = false; + /* check for commit and rollback */ + if (strlen(ptr) > 5) + { + size_t ptr_size = strlen(ptr) + 1; + char* buf = my_session->buf; + for (i = 0; i < ptr_size && i < buf_size; ++i) + { + buf[i] = tolower(ptr[i]); + } + if (strncmp(buf, "commit", 6) == 0) + { + my_session->query_end = true; + } + else if (strncmp(buf, "rollback", 8) == 0) + { + my_session->query_end = true; + my_session->sql_index = 0; + } + } - /* for normal sql statements */ - if (!my_session->query_end) - { - /* check and expand buffer size first. */ - size_t new_sql_size = my_session->max_sql_size; - size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1; + /* for normal sql statements */ + if (!my_session->query_end) + { + /* check and expand buffer size first. */ + size_t new_sql_size = my_session->max_sql_size; + size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1; - /* if the total length of query statements exceeds the maximum limit, print an error and return */ - if (len > sql_size_limit) - { - MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB."); - goto retblock; - } + /* if the total length of query statements exceeds the maximum limit, print an error and return */ + if (len > sql_size_limit) + { + MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB."); + goto retblock; + } - /* double buffer size until the buffer fits the query */ - while (len > new_sql_size) - { - new_sql_size *= 2; - } - if (new_sql_size > my_session->max_sql_size) - { - char* new_sql = (char*)malloc(new_sql_size); - if (new_sql == NULL) - { - MXS_ERROR("Memory allocation failure."); - goto retblock; - } - memcpy(new_sql, my_session->sql, my_session->sql_index); - free(my_session->sql); - my_session->sql = new_sql; - my_session->max_sql_size = new_sql_size; - } + /* double buffer size until the buffer fits the query */ + while (len > new_sql_size) + { + new_sql_size *= 2; + } + if (new_sql_size > my_session->max_sql_size) + { + char* new_sql = (char*)malloc(new_sql_size); + if (new_sql == NULL) + { + MXS_ERROR("Memory allocation failure."); + goto retblock; + } + memcpy(new_sql, my_session->sql, my_session->sql_index); + free(my_session->sql); + my_session->sql = new_sql; + my_session->max_sql_size = new_sql_size; + } - /* first statement */ - if (my_session->sql_index == 0) - { - memcpy(my_session->sql, ptr, strlen(ptr)); - my_session->sql_index += strlen(ptr); - gettimeofday(&my_session->current_start, NULL); - } - /* otherwise, append the statement with semicolon as a statement delimiter */ - else - { - /* append a query delimiter */ - memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, my_instance->query_delimiter_size); - /* append the next query statement */ - memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr)); - /* set new pointer for the buffer */ - my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr)); - } - } - } - } + /* first statement */ + if (my_session->sql_index == 0) + { + memcpy(my_session->sql, ptr, strlen(ptr)); + my_session->sql_index += strlen(ptr); + gettimeofday(&my_session->current_start, NULL); + } + /* otherwise, append the statement with semicolon as a statement delimiter */ + else + { + /* append a query delimiter */ + memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, + my_instance->query_delimiter_size); + /* append the next query statement */ + memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr)); + /* set new pointer for the buffer */ + my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr)); + } + } + } + } retblock: - free(ptr); - /* Pass the query downstream */ - return my_session->down.routeQuery(my_session->down.instance, - my_session->down.session, queue); + free(ptr); + /* Pass the query downstream */ + return my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, queue); } static int clientReply(FILTER *instance, void *session, GWBUF *reply) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)session; -struct timeval tv, diff; -int i, inserted; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)session; + struct timeval tv, diff; + int i, inserted; -/* found 'commit' and sql statements exist. */ - if (my_session->query_end && my_session->sql_index > 0) - { - gettimeofday(&tv, NULL); - timersub(&tv, &(my_session->current_start), &diff); + /* found 'commit' and sql statements exist. */ + if (my_session->query_end && my_session->sql_index > 0) + { + gettimeofday(&tv, NULL); + timersub(&tv, &(my_session->current_start), &diff); - /* get latency */ - uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000); - /* get timestamp */ - uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000*1000))); + /* get latency */ + uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000); + /* get timestamp */ + uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000 * 1000))); - *(my_session->sql + my_session->sql_index) = '\0'; + *(my_session->sql + my_session->sql_index) = '\0'; - /* print to log. */ - fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", - timestamp, - my_instance->delimiter, - my_session->clientHost, - my_instance->delimiter, - my_session->userName, - my_instance->delimiter, - millis, - my_instance->delimiter, - my_session->sql); + /* print to log. */ + fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", + timestamp, + my_instance->delimiter, + my_session->clientHost, + my_instance->delimiter, + my_session->userName, + my_instance->delimiter, + millis, + my_instance->delimiter, + my_session->sql); - my_session->sql_index = 0; - } + my_session->sql_index = 0; + } - /* Pass the result upstream */ - return my_session->up.clientReply(my_session->up.instance, - my_session->up.session, reply); + /* Pass the result upstream */ + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, reply); } /** @@ -507,30 +530,30 @@ int i, inserted; * instance as a whole, otherwise print diagnostics for the * particular session. * - * @param instance The filter instance - * @param fsession Filter session, may be NULL - * @param dcb The DCB for diagnostic output + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output */ -static void +static void diagnostic(FILTER *instance, void *fsession, DCB *dcb) { -TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; -TPM_SESSION *my_session = (TPM_SESSION *)fsession; -int i; + TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance; + TPM_SESSION *my_session = (TPM_SESSION *)fsession; + int i; - if (my_instance->source) - dcb_printf(dcb, "\t\tLimit logging to connections from %s\n", - my_instance->source); - if (my_instance->user) - dcb_printf(dcb, "\t\tLimit logging to user %s\n", - my_instance->user); - if (my_instance->filename) - dcb_printf(dcb, "\t\tLogging to file %s.\n", - my_instance->filename); - if (my_instance->delimiter) - dcb_printf(dcb, "\t\tLogging with delimiter %s.\n", - my_instance->delimiter); - if (my_instance->query_delimiter) - dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n", - my_instance->query_delimiter); + if (my_instance->source) + dcb_printf(dcb, "\t\tLimit logging to connections from %s\n", + my_instance->source); + if (my_instance->user) + dcb_printf(dcb, "\t\tLimit logging to user %s\n", + my_instance->user); + if (my_instance->filename) + dcb_printf(dcb, "\t\tLogging to file %s.\n", + my_instance->filename); + if (my_instance->delimiter) + dcb_printf(dcb, "\t\tLogging with delimiter %s.\n", + my_instance->delimiter); + if (my_instance->query_delimiter) + dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n", + my_instance->query_delimiter); }