tpmfilter now prints which server the query has been executed on.
This commit is contained in:
parent
cb243f47a0
commit
7776d55963
@ -35,6 +35,8 @@
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
struct server;
|
||||
|
||||
/**
|
||||
* Buffer properties - used to store properties related to the buffer
|
||||
* contents. This may be added at any point during the processing of the
|
||||
@ -129,6 +131,7 @@ typedef struct gwbuf
|
||||
gwbuf_type_t gwbuf_type; /*< buffer's data type information */
|
||||
HINT *hint; /*< Hint data for this buffer */
|
||||
BUF_PROPERTY *properties; /*< Buffer properties */
|
||||
struct server *server;
|
||||
} GWBUF;
|
||||
|
||||
/*<
|
||||
|
@ -28,13 +28,14 @@
|
||||
*
|
||||
* Optional parameters:
|
||||
* filename=<name of the file to which transaction performance logs are written (default=tpm.log)>
|
||||
* delimiter=<delimiter for columns in a log (default='|')>
|
||||
* query_delimiter=<delimiter for query statements in a transaction (default=';')>
|
||||
* delimiter=<delimiter for columns in a log (default=':::')>
|
||||
* query_delimiter=<delimiter for query statements in a transaction (default='@@@')>
|
||||
* source=<source address to limit filter>
|
||||
* user=<username to limit filter>
|
||||
*
|
||||
* Date Who Description
|
||||
* 06/12/2015 Dong Young Yoon Initial implementation
|
||||
* 14/12/2016 Dong Young Yoon Prints which server the query executed on
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -42,15 +43,22 @@
|
||||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <fcntl.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <regex.h>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/filter.h>
|
||||
#include <maxscale/modinfo.h>
|
||||
#include <maxscale/modutil.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#include <regex.h>
|
||||
#include <maxscale/thread.h>
|
||||
#include <maxscale/server.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/query_classifier.h>
|
||||
|
||||
/** Defined in log_manager.cc */
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
@ -64,10 +72,13 @@ MODULE_INFO info =
|
||||
"Transaction Performance Monitoring filter"
|
||||
};
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
static char *version_str = "V1.0.1";
|
||||
static size_t buf_size = 10;
|
||||
static size_t sql_size_limit = 64 * 1024 *
|
||||
1024; /* The maximum size for query statements in a transaction (64MB) */
|
||||
static const int default_sql_size = 4 * 1024;
|
||||
static const char* default_query_delimiter = "@@@";
|
||||
static const char* default_log_delimiter = ":::";
|
||||
|
||||
/*
|
||||
* The filter entry points
|
||||
@ -82,6 +93,7 @@ static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
|
||||
static uint64_t getCapabilities(void);
|
||||
static void checkNamedPipe(void *args);
|
||||
|
||||
static FILTER_OBJECT MyObject =
|
||||
{
|
||||
@ -109,6 +121,9 @@ typedef struct
|
||||
char *filename; /* filename */
|
||||
char *delimiter; /* delimiter for columns in a log */
|
||||
char *query_delimiter; /* delimiter for query statements in a transaction */
|
||||
char *named_pipe;
|
||||
int named_pipe_fd;
|
||||
bool log_enabled;
|
||||
|
||||
int query_delimiter_size; /* the length of the query delimiter */
|
||||
FILE* fp;
|
||||
@ -187,48 +202,104 @@ GetModuleObject()
|
||||
static FILTER *
|
||||
createInstance(const char *name, char **options, FILTER_PARAMETER **params)
|
||||
{
|
||||
int i;
|
||||
int i, ret;
|
||||
TPM_INSTANCE *my_instance;
|
||||
|
||||
if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL)
|
||||
if ((my_instance = MXS_CALLOC(1, sizeof(TPM_INSTANCE))) != NULL)
|
||||
{
|
||||
my_instance->source = NULL;
|
||||
my_instance->user = NULL;
|
||||
my_instance->named_pipe = NULL;
|
||||
my_instance->log_enabled = false;
|
||||
|
||||
/* set default log filename */
|
||||
my_instance->filename = strdup("tpm.log");
|
||||
my_instance->filename = MXS_STRDUP_A("tpm.log");
|
||||
/* set default delimiter */
|
||||
my_instance->delimiter = strdup("|");
|
||||
my_instance->delimiter = MXS_STRDUP_A(default_log_delimiter);
|
||||
/* set default query delimiter */
|
||||
my_instance->query_delimiter = strdup(";");
|
||||
my_instance->query_delimiter_size = 1;
|
||||
my_instance->query_delimiter = MXS_STRDUP_A(default_query_delimiter);
|
||||
my_instance->query_delimiter_size = 3;
|
||||
|
||||
for (i = 0; params && params[i]; i++)
|
||||
{
|
||||
if (!strcmp(params[i]->name, "filename"))
|
||||
{
|
||||
free(my_instance->filename);
|
||||
my_instance->filename = strdup(params[i]->value);
|
||||
MXS_FREE(my_instance->filename);
|
||||
my_instance->filename = MXS_STRDUP_A(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "source"))
|
||||
{
|
||||
my_instance->source = strdup(params[i]->value);
|
||||
my_instance->source = MXS_STRDUP_A(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "user"))
|
||||
{
|
||||
my_instance->user = strdup(params[i]->value);
|
||||
my_instance->user = MXS_STRDUP_A(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "delimiter"))
|
||||
{
|
||||
free(my_instance->delimiter);
|
||||
my_instance->delimiter = strdup(params[i]->value);
|
||||
MXS_FREE(my_instance->delimiter);
|
||||
my_instance->delimiter = MXS_STRDUP_A(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "query_delimiter"))
|
||||
{
|
||||
free(my_instance->query_delimiter);
|
||||
my_instance->query_delimiter = strdup(params[i]->value);
|
||||
MXS_FREE(my_instance->query_delimiter);
|
||||
my_instance->query_delimiter = MXS_STRDUP_A(params[i]->value);
|
||||
my_instance->query_delimiter_size = strlen(my_instance->query_delimiter);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "named_pipe"))
|
||||
{
|
||||
if (params[i]->value == NULL)
|
||||
{
|
||||
MXS_ERROR("You need to specify 'named_pipe' for tpmfilter.");
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
my_instance->named_pipe = MXS_STRDUP_A(params[i]->value);
|
||||
// check if the file exists first.
|
||||
if (access(my_instance->named_pipe, F_OK) == 0)
|
||||
{
|
||||
// if exists, check if it is a named pipe.
|
||||
struct stat st;
|
||||
ret = stat(my_instance->named_pipe, &st);
|
||||
|
||||
// check whether the file is named pipe.
|
||||
if (ret == -1 && errno != ENOENT)
|
||||
{
|
||||
MXS_ERROR("stat() failed on named pipe: %s", strerror(errno));
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
if (ret == 0 && S_ISFIFO(st.st_mode))
|
||||
{
|
||||
// if it is a named pipe, we delete it and recreate it.
|
||||
unlink(my_instance->named_pipe);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("The file '%s' already exists and it is not a named pipe.", my_instance->named_pipe);
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// now create the named pipe.
|
||||
ret = mkfifo(my_instance->named_pipe, 0660);
|
||||
if (ret == -1)
|
||||
{
|
||||
MXS_ERROR("mkfifo() failed on named pipe: %s", strerror(errno));
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (my_instance->named_pipe == NULL)
|
||||
{
|
||||
MXS_ERROR("You need to specify 'named_pipe' for tpmfilter.");
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
my_instance->sessions = 0;
|
||||
my_instance->fp = fopen(my_instance->filename, "w");
|
||||
@ -239,6 +310,18 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Launch a thread that checks the named pipe.
|
||||
*/
|
||||
THREAD thread;
|
||||
if (thread_start(&thread, checkNamedPipe, (void*) my_instance) == NULL)
|
||||
{
|
||||
MXS_ERROR("Couldn't create a thread to check the named pipe: %s", strerror(errno));
|
||||
MXS_FREE(my_instance);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return (FILTER *)my_instance;
|
||||
}
|
||||
|
||||
@ -259,14 +342,14 @@ newSession(FILTER *instance, SESSION *session)
|
||||
int i;
|
||||
char *remote, *user;
|
||||
|
||||
if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL)
|
||||
if ((my_session = MXS_CALLOC(1, sizeof(TPM_SESSION))) != NULL)
|
||||
{
|
||||
atomic_add(&my_instance->sessions, 1);
|
||||
|
||||
my_session->max_sql_size = 4 * 1024; // default max query size of 4k.
|
||||
my_session->sql = (char*)malloc(my_session->max_sql_size);
|
||||
my_session->max_sql_size = default_sql_size; // default max query size of 4k.
|
||||
my_session->sql = (char*)MXS_MALLOC(my_session->max_sql_size);
|
||||
memset(my_session->sql, 0x00, my_session->max_sql_size);
|
||||
my_session->buf = (char*)malloc(buf_size);
|
||||
my_session->buf = (char*)MXS_MALLOC(buf_size);
|
||||
my_session->sql_index = 0;
|
||||
my_session->n_statements = 0;
|
||||
my_session->total.tv_sec = 0;
|
||||
@ -274,7 +357,7 @@ newSession(FILTER *instance, SESSION *session)
|
||||
my_session->current = NULL;
|
||||
if ((remote = session_get_remote(session)) != NULL)
|
||||
{
|
||||
my_session->clientHost = strdup(remote);
|
||||
my_session->clientHost = MXS_STRDUP_A(remote);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -282,7 +365,7 @@ newSession(FILTER *instance, SESSION *session)
|
||||
}
|
||||
if ((user = session_getUser(session)) != NULL)
|
||||
{
|
||||
my_session->userName = strdup(user);
|
||||
my_session->userName = MXS_STRDUP_A(user);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -335,11 +418,11 @@ freeSession(FILTER *instance, void *session)
|
||||
{
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
|
||||
free(my_session->clientHost);
|
||||
free(my_session->userName);
|
||||
free(my_session->sql);
|
||||
free(my_session->buf);
|
||||
free(session);
|
||||
MXS_FREE(my_session->clientHost);
|
||||
MXS_FREE(my_session->userName);
|
||||
MXS_FREE(my_session->sql);
|
||||
MXS_FREE(my_session->buf);
|
||||
MXS_FREE(session);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -397,25 +480,19 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
{
|
||||
if ((ptr = modutil_get_SQL(queue)) != NULL)
|
||||
{
|
||||
uint32_t query_type = qc_get_type(queue);
|
||||
int query_len = strlen(ptr);
|
||||
my_session->query_end = false;
|
||||
|
||||
/* check for commit and rollback */
|
||||
if (strlen(ptr) > 5)
|
||||
if (query_type & QUERY_TYPE_COMMIT)
|
||||
{
|
||||
size_t ptr_size = strlen(ptr) + 1;
|
||||
char* buf = my_session->buf;
|
||||
for (i = 0; i < ptr_size && i < buf_size; ++i)
|
||||
{
|
||||
buf[i] = tolower(ptr[i]);
|
||||
}
|
||||
if (strncmp(buf, "commit", 6) == 0)
|
||||
{
|
||||
my_session->query_end = true;
|
||||
}
|
||||
else if (strncmp(buf, "rollback", 8) == 0)
|
||||
{
|
||||
my_session->query_end = true;
|
||||
my_session->sql_index = 0;
|
||||
}
|
||||
my_session->query_end = true;
|
||||
}
|
||||
else if (query_type & QUERY_TYPE_ROLLBACK)
|
||||
{
|
||||
my_session->query_end = true;
|
||||
my_session->sql_index = 0;
|
||||
}
|
||||
|
||||
/* for normal sql statements */
|
||||
@ -439,14 +516,14 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
}
|
||||
if (new_sql_size > my_session->max_sql_size)
|
||||
{
|
||||
char* new_sql = (char*)malloc(new_sql_size);
|
||||
char* new_sql = (char*)MXS_MALLOC(new_sql_size);
|
||||
if (new_sql == NULL)
|
||||
{
|
||||
MXS_ERROR("Memory allocation failure.");
|
||||
goto retblock;
|
||||
}
|
||||
memcpy(new_sql, my_session->sql, my_session->sql_index);
|
||||
free(my_session->sql);
|
||||
MXS_FREE(my_session->sql);
|
||||
my_session->sql = new_sql;
|
||||
my_session->max_sql_size = new_sql_size;
|
||||
}
|
||||
@ -475,7 +552,7 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
|
||||
retblock:
|
||||
|
||||
free(ptr);
|
||||
MXS_FREE(ptr);
|
||||
/* Pass the query downstream */
|
||||
return my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session, queue);
|
||||
@ -503,16 +580,19 @@ clientReply(FILTER *instance, void *session, GWBUF *reply)
|
||||
*(my_session->sql + my_session->sql_index) = '\0';
|
||||
|
||||
/* print to log. */
|
||||
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
|
||||
timestamp,
|
||||
my_instance->delimiter,
|
||||
my_session->clientHost,
|
||||
my_instance->delimiter,
|
||||
my_session->userName,
|
||||
my_instance->delimiter,
|
||||
millis,
|
||||
my_instance->delimiter,
|
||||
my_session->sql);
|
||||
if (my_instance->log_enabled)
|
||||
{
|
||||
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
|
||||
timestamp,
|
||||
my_instance->delimiter,
|
||||
reply->server->unique_name,
|
||||
my_instance->delimiter,
|
||||
reply->server->name,
|
||||
my_instance->delimiter,
|
||||
millis,
|
||||
my_instance->delimiter,
|
||||
my_session->sql);
|
||||
}
|
||||
|
||||
my_session->sql_index = 0;
|
||||
}
|
||||
@ -566,3 +646,48 @@ static uint64_t getCapabilities(void)
|
||||
{
|
||||
return RCAP_TYPE_CONTIGUOUS_INPUT;
|
||||
}
|
||||
|
||||
static void checkNamedPipe(void *args)
|
||||
{
|
||||
int ret;
|
||||
char buffer[2];
|
||||
char buf[4096];
|
||||
TPM_INSTANCE* inst = (TPM_INSTANCE*) args;
|
||||
char* named_pipe = inst->named_pipe;
|
||||
|
||||
// open named pipe and this will block until middleware opens it.
|
||||
while ((inst->named_pipe_fd = open(named_pipe, O_RDONLY)) > 0)
|
||||
{
|
||||
// 1 for start logging, 0 for stopping.
|
||||
while ((ret = read(inst->named_pipe_fd, buffer, 1)) > 0)
|
||||
{
|
||||
if (buffer[0] == '1')
|
||||
{
|
||||
// reopens the log file.
|
||||
inst->fp = fopen(inst->filename, "w");
|
||||
if (inst->fp == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to open a log file for tpmfilter.");
|
||||
MXS_FREE(inst);
|
||||
return;
|
||||
}
|
||||
inst->log_enabled = true;
|
||||
}
|
||||
else if (buffer[0] == '0')
|
||||
{
|
||||
inst->log_enabled = false;
|
||||
}
|
||||
}
|
||||
if (ret == 0)
|
||||
{
|
||||
close(inst->named_pipe_fd);
|
||||
}
|
||||
}
|
||||
if (inst->named_pipe_fd == -1)
|
||||
{
|
||||
MXS_ERROR("Failed to open the named pipe '%s': %s", named_pipe, strerror(errno));
|
||||
return;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -895,6 +895,7 @@ gw_read_and_write(DCB *dcb)
|
||||
if (session_ok_to_route(dcb))
|
||||
{
|
||||
gwbuf_set_type(stmt, GWBUF_TYPE_MYSQL);
|
||||
stmt->server = dcb->server;
|
||||
session->service->router->clientReply(session->service->router_instance,
|
||||
session->router_session,
|
||||
stmt, dcb);
|
||||
|
Loading…
x
Reference in New Issue
Block a user