Format tpmfilter

The filter is now formatted with the astyle config.
This commit is contained in:
Markus Makela 2016-09-15 12:21:54 +03:00
parent 08d2529312
commit b60f000a4c

View File

@ -27,14 +27,14 @@
* in a single GWBUF.
*
* Optional parameters:
* filename=<name of the file to which transaction performance logs are written (default=tpm.log)>
* delimiter=<delimiter for columns in a log (default='|')>
* query_delimiter=<delimiter for query statements in a transaction (default=';')>
* source=<source address to limit filter>
* user=<username to limit filter>
* filename=<name of the file to which transaction performance logs are written (default=tpm.log)>
* delimiter=<delimiter for columns in a log (default='|')>
* query_delimiter=<delimiter for query statements in a transaction (default=';')>
* source=<source address to limit filter>
* user=<username to limit filter>
*
* Date Who Description
* 06/12/2015 Dong Young Yoon Initial implementation
* Date Who Description
* 06/12/2015 Dong Young Yoon Initial implementation
*
* @endverbatim
*/
@ -56,31 +56,34 @@
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_GA,
FILTER_VERSION,
"Transaction Performance Monitoring filter"
MODULE_INFO info =
{
MODULE_API_FILTER,
MODULE_GA,
FILTER_VERSION,
"Transaction Performance Monitoring filter"
};
static char *version_str = "V1.0.0";
static size_t buf_size = 10;
static size_t sql_size_limit = 64 * 1024 * 1024; /* The maximum size for query statements in a transaction (64MB) */
static size_t sql_size_limit = 64 * 1024 *
1024; /* The maximum size for query statements in a transaction (64MB) */
/*
* The filter entry points
*/
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject = {
static FILTER_OBJECT MyObject =
{
createInstance,
newSession,
closeSession,
@ -95,16 +98,17 @@ static FILTER_OBJECT MyObject = {
/**
* A instance structure, every instance will write to a same file.
*/
typedef struct {
int sessions; /* Session count */
char *source; /* The source of the client connection */
char *user; /* The user name to filter on */
char *filename; /* filename */
char *delimiter; /* delimiter for columns in a log */
char *query_delimiter; /* delimiter for query statements in a transaction */
typedef struct
{
int sessions; /* Session count */
char *source; /* The source of the client connection */
char *user; /* The user name to filter on */
char *filename; /* filename */
char *delimiter; /* delimiter for columns in a log */
char *query_delimiter; /* delimiter for query statements in a transaction */
int query_delimiter_size; /* the length of the query delimiter */
FILE* fp;
int query_delimiter_size; /* the length of the query delimiter */
FILE* fp;
} TPM_INSTANCE;
/**
@ -115,22 +119,23 @@ typedef struct {
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct {
DOWNSTREAM down;
UPSTREAM up;
int active;
char *clientHost;
char *userName;
char* sql;
struct timeval start;
char *current;
int n_statements;
struct timeval total;
struct timeval current_start;
bool query_end;
char *buf;
int sql_index;
size_t max_sql_size;
typedef struct
{
DOWNSTREAM down;
UPSTREAM up;
int active;
char *clientHost;
char *userName;
char* sql;
struct timeval start;
char *current;
int n_statements;
struct timeval total;
struct timeval current_start;
bool query_end;
char *buf;
int sql_index;
size_t max_sql_size;
} TPM_SESSION;
/**
@ -141,7 +146,7 @@ typedef struct {
char *
version()
{
return version_str;
return version_str;
}
/**
@ -164,69 +169,74 @@ ModuleInit()
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/
static FILTER *
static FILTER *
createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{
int i;
TPM_INSTANCE *my_instance;
int i;
TPM_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL)
{
my_instance->source = NULL;
my_instance->user = NULL;
if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL)
{
my_instance->source = NULL;
my_instance->user = NULL;
/* set default log filename */
my_instance->filename = strdup("tpm.log");
/* set default delimiter */
my_instance->delimiter = strdup("|");
/* set default query delimiter */
my_instance->query_delimiter = strdup(";");
my_instance->query_delimiter_size = 1;
/* set default log filename */
my_instance->filename = strdup("tpm.log");
/* set default delimiter */
my_instance->delimiter = strdup("|");
/* set default query delimiter */
my_instance->query_delimiter = strdup(";");
my_instance->query_delimiter_size = 1;
for (i = 0; params && params[i]; i++)
{
if (!strcmp(params[i]->name, "filename"))
{
free(my_instance->filename);
my_instance->filename = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->user = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "delimiter"))
{
free(my_instance->delimiter);
my_instance->delimiter = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "query_delimiter"))
{
free(my_instance->query_delimiter);
my_instance->query_delimiter = strdup(params[i]->value);
my_instance->query_delimiter_size = strlen(my_instance->query_delimiter);
}
}
my_instance->sessions = 0;
my_instance->fp = fopen(my_instance->filename, "w");
if (my_instance->fp == NULL)
{
MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno, strerror(errno));
return NULL;
}
}
return (FILTER *)my_instance;
for (i = 0; params && params[i]; i++)
{
if (!strcmp(params[i]->name, "filename"))
{
free(my_instance->filename);
my_instance->filename = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
{
my_instance->source = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "user"))
{
my_instance->user = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "delimiter"))
{
free(my_instance->delimiter);
my_instance->delimiter = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "query_delimiter"))
{
free(my_instance->query_delimiter);
my_instance->query_delimiter = strdup(params[i]->value);
my_instance->query_delimiter_size = strlen(my_instance->query_delimiter);
}
}
my_instance->sessions = 0;
my_instance->fp = fopen(my_instance->filename, "w");
if (my_instance->fp == NULL)
{
MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno,
strerror(errno));
return NULL;
}
}
return (FILTER *)my_instance;
}
/**
@ -234,120 +244,132 @@ TPM_INSTANCE *my_instance;
*
* Every session uses the same log file.
*
* @param instance The filter instance data
* @param session The session itself
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
static void *
newSession(FILTER *instance, SESSION *session)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session;
int i;
char *remote, *user;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session;
int i;
char *remote, *user;
if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL)
{
atomic_add(&my_instance->sessions,1);
if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL)
{
atomic_add(&my_instance->sessions, 1);
my_session->max_sql_size = 4 * 1024; // default max query size of 4k.
my_session->sql = (char*)malloc(my_session->max_sql_size);
memset(my_session->sql, 0x00, my_session->max_sql_size);
my_session->buf = (char*)malloc(buf_size);
my_session->sql_index = 0;
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
my_session->current = NULL;
if ((remote = session_get_remote(session)) != NULL)
my_session->clientHost = strdup(remote);
else
my_session->clientHost = NULL;
if ((user = session_getUser(session)) != NULL)
my_session->userName = strdup(user);
else
my_session->userName = NULL;
my_session->active = 1;
if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost,
my_instance->source))
my_session->active = 0;
if (my_instance->user && my_session->userName && strcmp(my_session->userName,
my_instance->user))
my_session->active = 0;
}
my_session->max_sql_size = 4 * 1024; // default max query size of 4k.
my_session->sql = (char*)malloc(my_session->max_sql_size);
memset(my_session->sql, 0x00, my_session->max_sql_size);
my_session->buf = (char*)malloc(buf_size);
my_session->sql_index = 0;
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
my_session->current = NULL;
if ((remote = session_get_remote(session)) != NULL)
{
my_session->clientHost = strdup(remote);
}
else
{
my_session->clientHost = NULL;
}
if ((user = session_getUser(session)) != NULL)
{
my_session->userName = strdup(user);
}
else
{
my_session->userName = NULL;
}
my_session->active = 1;
if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost,
my_instance->source))
{
my_session->active = 0;
}
if (my_instance->user && my_session->userName && strcmp(my_session->userName,
my_instance->user))
{
my_session->active = 0;
}
}
return my_session;
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
*
* @param instance The filter instance data
* @param session The session being closed
* @param instance The filter instance data
* @param session The session being closed
*/
static void
static void
closeSession(FILTER *instance, void *session)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
if (my_instance->fp != NULL)
{
// flush FP when a session is closed.
fflush(my_instance->fp);
}
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
if (my_instance->fp != NULL)
{
// flush FP when a session is closed.
fflush(my_instance->fp);
}
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_SESSION *my_session = (TPM_SESSION *)session;
free(my_session->clientHost);
free(my_session->userName);
free(my_session->sql);
free(my_session->buf);
free(session);
return;
free(my_session->clientHost);
free(my_session->userName);
free(my_session->sql);
free(my_session->buf);
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_SESSION *my_session = (TPM_SESSION *)session;
my_session->down = *downstream;
my_session->down = *downstream;
}
/**
* Set the upstream filter or session to which results will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param upstream The upstream filter or session.
* @param instance The filter instance data
* @param session The filter session
* @param upstream The upstream filter or session.
*/
static void
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_SESSION *my_session = (TPM_SESSION *)session;
my_session->up = *upstream;
my_session->up = *upstream;
}
/**
@ -356,148 +378,149 @@ TPM_SESSION *my_session = (TPM_SESSION *)session;
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
char *ptr = NULL;
size_t i;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
char *ptr = NULL;
size_t i;
if (my_session->active)
{
if (queue->next != NULL)
{
queue = gwbuf_make_contiguous(queue);
}
if ((ptr = modutil_get_SQL(queue)) != NULL)
{
my_session->query_end = false;
/* check for commit and rollback */
if (strlen(ptr) > 5)
{
size_t ptr_size = strlen(ptr)+1;
char* buf = my_session->buf;
for (i=0; i < ptr_size && i < buf_size; ++i)
{
buf[i] = tolower(ptr[i]);
}
if (strncmp(buf, "commit", 6) == 0)
{
my_session->query_end = true;
}
else if (strncmp(buf, "rollback", 8) == 0)
{
my_session->query_end = true;
my_session->sql_index = 0;
}
}
if (my_session->active)
{
if (queue->next != NULL)
{
queue = gwbuf_make_contiguous(queue);
}
if ((ptr = modutil_get_SQL(queue)) != NULL)
{
my_session->query_end = false;
/* check for commit and rollback */
if (strlen(ptr) > 5)
{
size_t ptr_size = strlen(ptr) + 1;
char* buf = my_session->buf;
for (i = 0; i < ptr_size && i < buf_size; ++i)
{
buf[i] = tolower(ptr[i]);
}
if (strncmp(buf, "commit", 6) == 0)
{
my_session->query_end = true;
}
else if (strncmp(buf, "rollback", 8) == 0)
{
my_session->query_end = true;
my_session->sql_index = 0;
}
}
/* for normal sql statements */
if (!my_session->query_end)
{
/* check and expand buffer size first. */
size_t new_sql_size = my_session->max_sql_size;
size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1;
/* for normal sql statements */
if (!my_session->query_end)
{
/* check and expand buffer size first. */
size_t new_sql_size = my_session->max_sql_size;
size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1;
/* if the total length of query statements exceeds the maximum limit, print an error and return */
if (len > sql_size_limit)
{
MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB.");
goto retblock;
}
/* if the total length of query statements exceeds the maximum limit, print an error and return */
if (len > sql_size_limit)
{
MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB.");
goto retblock;
}
/* double buffer size until the buffer fits the query */
while (len > new_sql_size)
{
new_sql_size *= 2;
}
if (new_sql_size > my_session->max_sql_size)
{
char* new_sql = (char*)malloc(new_sql_size);
if (new_sql == NULL)
{
MXS_ERROR("Memory allocation failure.");
goto retblock;
}
memcpy(new_sql, my_session->sql, my_session->sql_index);
free(my_session->sql);
my_session->sql = new_sql;
my_session->max_sql_size = new_sql_size;
}
/* double buffer size until the buffer fits the query */
while (len > new_sql_size)
{
new_sql_size *= 2;
}
if (new_sql_size > my_session->max_sql_size)
{
char* new_sql = (char*)malloc(new_sql_size);
if (new_sql == NULL)
{
MXS_ERROR("Memory allocation failure.");
goto retblock;
}
memcpy(new_sql, my_session->sql, my_session->sql_index);
free(my_session->sql);
my_session->sql = new_sql;
my_session->max_sql_size = new_sql_size;
}
/* first statement */
if (my_session->sql_index == 0)
{
memcpy(my_session->sql, ptr, strlen(ptr));
my_session->sql_index += strlen(ptr);
gettimeofday(&my_session->current_start, NULL);
}
/* otherwise, append the statement with semicolon as a statement delimiter */
else
{
/* append a query delimiter */
memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter, my_instance->query_delimiter_size);
/* append the next query statement */
memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr));
/* set new pointer for the buffer */
my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr));
}
}
}
}
/* first statement */
if (my_session->sql_index == 0)
{
memcpy(my_session->sql, ptr, strlen(ptr));
my_session->sql_index += strlen(ptr);
gettimeofday(&my_session->current_start, NULL);
}
/* otherwise, append the statement with semicolon as a statement delimiter */
else
{
/* append a query delimiter */
memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter,
my_instance->query_delimiter_size);
/* append the next query statement */
memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr));
/* set new pointer for the buffer */
my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr));
}
}
}
}
retblock:
free(ptr);
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
free(ptr);
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
static int
clientReply(FILTER *instance, void *session, GWBUF *reply)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
struct timeval tv, diff;
int i, inserted;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
struct timeval tv, diff;
int i, inserted;
/* found 'commit' and sql statements exist. */
if (my_session->query_end && my_session->sql_index > 0)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->current_start), &diff);
/* found 'commit' and sql statements exist. */
if (my_session->query_end && my_session->sql_index > 0)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->current_start), &diff);
/* get latency */
uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000);
/* get timestamp */
uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000*1000)));
/* get latency */
uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000);
/* get timestamp */
uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000 * 1000)));
*(my_session->sql + my_session->sql_index) = '\0';
*(my_session->sql + my_session->sql_index) = '\0';
/* print to log. */
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
timestamp,
my_instance->delimiter,
my_session->clientHost,
my_instance->delimiter,
my_session->userName,
my_instance->delimiter,
millis,
my_instance->delimiter,
my_session->sql);
/* print to log. */
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
timestamp,
my_instance->delimiter,
my_session->clientHost,
my_instance->delimiter,
my_session->userName,
my_instance->delimiter,
millis,
my_instance->delimiter,
my_session->sql);
my_session->sql_index = 0;
}
my_session->sql_index = 0;
}
/* Pass the result upstream */
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
/* Pass the result upstream */
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
}
/**
@ -507,30 +530,30 @@ int i, inserted;
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)fsession;
int i;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)fsession;
int i;
if (my_instance->source)
dcb_printf(dcb, "\t\tLimit logging to connections from %s\n",
my_instance->source);
if (my_instance->user)
dcb_printf(dcb, "\t\tLimit logging to user %s\n",
my_instance->user);
if (my_instance->filename)
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_instance->filename);
if (my_instance->delimiter)
dcb_printf(dcb, "\t\tLogging with delimiter %s.\n",
my_instance->delimiter);
if (my_instance->query_delimiter)
dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n",
my_instance->query_delimiter);
if (my_instance->source)
dcb_printf(dcb, "\t\tLimit logging to connections from %s\n",
my_instance->source);
if (my_instance->user)
dcb_printf(dcb, "\t\tLimit logging to user %s\n",
my_instance->user);
if (my_instance->filename)
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_instance->filename);
if (my_instance->delimiter)
dcb_printf(dcb, "\t\tLogging with delimiter %s.\n",
my_instance->delimiter);
if (my_instance->query_delimiter)
dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n",
my_instance->query_delimiter);
}