From 7776d5596360814e7d2303bf88e2cb0981174307 Mon Sep 17 00:00:00 2001 From: Dong Young Yoon Date: Wed, 14 Dec 2016 12:07:48 -0500 Subject: [PATCH] tpmfilter now prints which server the query has been executed on. --- include/maxscale/buffer.h | 3 + server/modules/filter/tpmfilter/tpmfilter.c | 247 +++++++++++++----- .../MySQL/MySQLBackend/mysql_backend.c | 1 + 3 files changed, 190 insertions(+), 61 deletions(-) diff --git a/include/maxscale/buffer.h b/include/maxscale/buffer.h index 54c5fdc42..0b4637fc6 100644 --- a/include/maxscale/buffer.h +++ b/include/maxscale/buffer.h @@ -35,6 +35,8 @@ MXS_BEGIN_DECLS +struct server; + /** * Buffer properties - used to store properties related to the buffer * contents. This may be added at any point during the processing of the @@ -129,6 +131,7 @@ typedef struct gwbuf gwbuf_type_t gwbuf_type; /*< buffer's data type information */ HINT *hint; /*< Hint data for this buffer */ BUF_PROPERTY *properties; /*< Buffer properties */ + struct server *server; } GWBUF; /*< diff --git a/server/modules/filter/tpmfilter/tpmfilter.c b/server/modules/filter/tpmfilter/tpmfilter.c index 0dd59002f..38dcb4a4c 100644 --- a/server/modules/filter/tpmfilter/tpmfilter.c +++ b/server/modules/filter/tpmfilter/tpmfilter.c @@ -28,13 +28,14 @@ * * Optional parameters: * filename= - * delimiter= - * query_delimiter= + * delimiter= + * query_delimiter= * source= * user= * * Date Who Description * 06/12/2015 Dong Young Yoon Initial implementation + * 14/12/2016 Dong Young Yoon Prints which server the query executed on * * @endverbatim */ @@ -42,15 +43,22 @@ #include #include #include +#include +#include +#include +#include +#include +#include + +#include #include #include #include #include -#include -#include -#include -#include +#include +#include #include +#include /** Defined in log_manager.cc */ extern int lm_enabled_logfiles_bitmask; @@ -64,10 +72,13 @@ MODULE_INFO info = "Transaction Performance Monitoring filter" }; -static char *version_str = "V1.0.0"; +static char *version_str = "V1.0.1"; static size_t buf_size = 10; static size_t sql_size_limit = 64 * 1024 * 1024; /* The maximum size for query statements in a transaction (64MB) */ +static const int default_sql_size = 4 * 1024; +static const char* default_query_delimiter = "@@@"; +static const char* default_log_delimiter = ":::"; /* * The filter entry points @@ -82,6 +93,7 @@ static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); static int clientReply(FILTER *instance, void *fsession, GWBUF *queue); static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); static uint64_t getCapabilities(void); +static void checkNamedPipe(void *args); static FILTER_OBJECT MyObject = { @@ -109,6 +121,9 @@ typedef struct char *filename; /* filename */ char *delimiter; /* delimiter for columns in a log */ char *query_delimiter; /* delimiter for query statements in a transaction */ + char *named_pipe; + int named_pipe_fd; + bool log_enabled; int query_delimiter_size; /* the length of the query delimiter */ FILE* fp; @@ -187,48 +202,104 @@ GetModuleObject() static FILTER * createInstance(const char *name, char **options, FILTER_PARAMETER **params) { - int i; + int i, ret; TPM_INSTANCE *my_instance; - if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL) + if ((my_instance = MXS_CALLOC(1, sizeof(TPM_INSTANCE))) != NULL) { my_instance->source = NULL; my_instance->user = NULL; + my_instance->named_pipe = NULL; + my_instance->log_enabled = false; /* set default log filename */ - my_instance->filename = strdup("tpm.log"); + my_instance->filename = MXS_STRDUP_A("tpm.log"); /* set default delimiter */ - my_instance->delimiter = strdup("|"); + my_instance->delimiter = MXS_STRDUP_A(default_log_delimiter); /* set default query delimiter */ - my_instance->query_delimiter = strdup(";"); - my_instance->query_delimiter_size = 1; + my_instance->query_delimiter = MXS_STRDUP_A(default_query_delimiter); + my_instance->query_delimiter_size = 3; for (i = 0; params && params[i]; i++) { if (!strcmp(params[i]->name, "filename")) { - free(my_instance->filename); - my_instance->filename = strdup(params[i]->value); + MXS_FREE(my_instance->filename); + my_instance->filename = MXS_STRDUP_A(params[i]->value); } else if (!strcmp(params[i]->name, "source")) { - my_instance->source = strdup(params[i]->value); + my_instance->source = MXS_STRDUP_A(params[i]->value); } else if (!strcmp(params[i]->name, "user")) { - my_instance->user = strdup(params[i]->value); + my_instance->user = MXS_STRDUP_A(params[i]->value); } else if (!strcmp(params[i]->name, "delimiter")) { - free(my_instance->delimiter); - my_instance->delimiter = strdup(params[i]->value); + MXS_FREE(my_instance->delimiter); + my_instance->delimiter = MXS_STRDUP_A(params[i]->value); } else if (!strcmp(params[i]->name, "query_delimiter")) { - free(my_instance->query_delimiter); - my_instance->query_delimiter = strdup(params[i]->value); + MXS_FREE(my_instance->query_delimiter); + my_instance->query_delimiter = MXS_STRDUP_A(params[i]->value); my_instance->query_delimiter_size = strlen(my_instance->query_delimiter); } + else if (!strcmp(params[i]->name, "named_pipe")) + { + if (params[i]->value == NULL) + { + MXS_ERROR("You need to specify 'named_pipe' for tpmfilter."); + MXS_FREE(my_instance); + return NULL; + } + else + { + my_instance->named_pipe = MXS_STRDUP_A(params[i]->value); + // check if the file exists first. + if (access(my_instance->named_pipe, F_OK) == 0) + { + // if exists, check if it is a named pipe. + struct stat st; + ret = stat(my_instance->named_pipe, &st); + + // check whether the file is named pipe. + if (ret == -1 && errno != ENOENT) + { + MXS_ERROR("stat() failed on named pipe: %s", strerror(errno)); + MXS_FREE(my_instance); + return NULL; + } + if (ret == 0 && S_ISFIFO(st.st_mode)) + { + // if it is a named pipe, we delete it and recreate it. + unlink(my_instance->named_pipe); + } + else + { + MXS_ERROR("The file '%s' already exists and it is not a named pipe.", my_instance->named_pipe); + MXS_FREE(my_instance); + return NULL; + } + } + + // now create the named pipe. + ret = mkfifo(my_instance->named_pipe, 0660); + if (ret == -1) + { + MXS_ERROR("mkfifo() failed on named pipe: %s", strerror(errno)); + MXS_FREE(my_instance); + return NULL; + } + } + } + } + if (my_instance->named_pipe == NULL) + { + MXS_ERROR("You need to specify 'named_pipe' for tpmfilter."); + MXS_FREE(my_instance); + return NULL; } my_instance->sessions = 0; my_instance->fp = fopen(my_instance->filename, "w"); @@ -239,6 +310,18 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params) return NULL; } } + + /* + * Launch a thread that checks the named pipe. + */ + THREAD thread; + if (thread_start(&thread, checkNamedPipe, (void*) my_instance) == NULL) + { + MXS_ERROR("Couldn't create a thread to check the named pipe: %s", strerror(errno)); + MXS_FREE(my_instance); + return NULL; + } + return (FILTER *)my_instance; } @@ -259,14 +342,14 @@ newSession(FILTER *instance, SESSION *session) int i; char *remote, *user; - if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL) + if ((my_session = MXS_CALLOC(1, sizeof(TPM_SESSION))) != NULL) { atomic_add(&my_instance->sessions, 1); - my_session->max_sql_size = 4 * 1024; // default max query size of 4k. - my_session->sql = (char*)malloc(my_session->max_sql_size); + my_session->max_sql_size = default_sql_size; // default max query size of 4k. + my_session->sql = (char*)MXS_MALLOC(my_session->max_sql_size); memset(my_session->sql, 0x00, my_session->max_sql_size); - my_session->buf = (char*)malloc(buf_size); + my_session->buf = (char*)MXS_MALLOC(buf_size); my_session->sql_index = 0; my_session->n_statements = 0; my_session->total.tv_sec = 0; @@ -274,7 +357,7 @@ newSession(FILTER *instance, SESSION *session) my_session->current = NULL; if ((remote = session_get_remote(session)) != NULL) { - my_session->clientHost = strdup(remote); + my_session->clientHost = MXS_STRDUP_A(remote); } else { @@ -282,7 +365,7 @@ newSession(FILTER *instance, SESSION *session) } if ((user = session_getUser(session)) != NULL) { - my_session->userName = strdup(user); + my_session->userName = MXS_STRDUP_A(user); } else { @@ -335,11 +418,11 @@ freeSession(FILTER *instance, void *session) { TPM_SESSION *my_session = (TPM_SESSION *)session; - free(my_session->clientHost); - free(my_session->userName); - free(my_session->sql); - free(my_session->buf); - free(session); + MXS_FREE(my_session->clientHost); + MXS_FREE(my_session->userName); + MXS_FREE(my_session->sql); + MXS_FREE(my_session->buf); + MXS_FREE(session); return; } @@ -397,25 +480,19 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) { if ((ptr = modutil_get_SQL(queue)) != NULL) { + uint32_t query_type = qc_get_type(queue); + int query_len = strlen(ptr); my_session->query_end = false; + /* check for commit and rollback */ - if (strlen(ptr) > 5) + if (query_type & QUERY_TYPE_COMMIT) { - size_t ptr_size = strlen(ptr) + 1; - char* buf = my_session->buf; - for (i = 0; i < ptr_size && i < buf_size; ++i) - { - buf[i] = tolower(ptr[i]); - } - if (strncmp(buf, "commit", 6) == 0) - { - my_session->query_end = true; - } - else if (strncmp(buf, "rollback", 8) == 0) - { - my_session->query_end = true; - my_session->sql_index = 0; - } + my_session->query_end = true; + } + else if (query_type & QUERY_TYPE_ROLLBACK) + { + my_session->query_end = true; + my_session->sql_index = 0; } /* for normal sql statements */ @@ -439,14 +516,14 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) } if (new_sql_size > my_session->max_sql_size) { - char* new_sql = (char*)malloc(new_sql_size); + char* new_sql = (char*)MXS_MALLOC(new_sql_size); if (new_sql == NULL) { MXS_ERROR("Memory allocation failure."); goto retblock; } memcpy(new_sql, my_session->sql, my_session->sql_index); - free(my_session->sql); + MXS_FREE(my_session->sql); my_session->sql = new_sql; my_session->max_sql_size = new_sql_size; } @@ -475,7 +552,7 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) retblock: - free(ptr); + MXS_FREE(ptr); /* Pass the query downstream */ return my_session->down.routeQuery(my_session->down.instance, my_session->down.session, queue); @@ -503,16 +580,19 @@ clientReply(FILTER *instance, void *session, GWBUF *reply) *(my_session->sql + my_session->sql_index) = '\0'; /* print to log. */ - fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", - timestamp, - my_instance->delimiter, - my_session->clientHost, - my_instance->delimiter, - my_session->userName, - my_instance->delimiter, - millis, - my_instance->delimiter, - my_session->sql); + if (my_instance->log_enabled) + { + fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n", + timestamp, + my_instance->delimiter, + reply->server->unique_name, + my_instance->delimiter, + reply->server->name, + my_instance->delimiter, + millis, + my_instance->delimiter, + my_session->sql); + } my_session->sql_index = 0; } @@ -566,3 +646,48 @@ static uint64_t getCapabilities(void) { return RCAP_TYPE_CONTIGUOUS_INPUT; } + +static void checkNamedPipe(void *args) +{ + int ret; + char buffer[2]; + char buf[4096]; + TPM_INSTANCE* inst = (TPM_INSTANCE*) args; + char* named_pipe = inst->named_pipe; + + // open named pipe and this will block until middleware opens it. + while ((inst->named_pipe_fd = open(named_pipe, O_RDONLY)) > 0) + { + // 1 for start logging, 0 for stopping. + while ((ret = read(inst->named_pipe_fd, buffer, 1)) > 0) + { + if (buffer[0] == '1') + { + // reopens the log file. + inst->fp = fopen(inst->filename, "w"); + if (inst->fp == NULL) + { + MXS_ERROR("Failed to open a log file for tpmfilter."); + MXS_FREE(inst); + return; + } + inst->log_enabled = true; + } + else if (buffer[0] == '0') + { + inst->log_enabled = false; + } + } + if (ret == 0) + { + close(inst->named_pipe_fd); + } + } + if (inst->named_pipe_fd == -1) + { + MXS_ERROR("Failed to open the named pipe '%s': %s", named_pipe, strerror(errno)); + return; + } + + return; +} diff --git a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c index c6ffd3656..2b0d90df5 100644 --- a/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c +++ b/server/modules/protocol/MySQL/MySQLBackend/mysql_backend.c @@ -895,6 +895,7 @@ gw_read_and_write(DCB *dcb) if (session_ok_to_route(dcb)) { gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL); + stmt->server = dcb->server; session->service->router->clientReply(session->service->router_instance, session->router_session, stmt, dcb);