Merge branch 'develop' into MXS-936

This commit is contained in:
MassimilianoPinto
2016-11-09 16:21:40 +01:00
6 changed files with 189 additions and 41 deletions

View File

@ -30,8 +30,10 @@ The QLA filter accepts the following options.
|ignorecase|Use case-insensitive matching | |ignorecase|Use case-insensitive matching |
|case |Use case-sensitive matching | |case |Use case-sensitive matching |
|extended |Use extended regular expression syntax (ERE)| |extended |Use extended regular expression syntax (ERE)|
|session_file| Use session-specific file (default)|
To use multiple filter options, list them in a comma-separated list. |unified_file| Use one file for all sessions|
|flush_writes| Flush after every write|
To use multiple filter options, list them in a comma-separated list. If no file settings are given, default will be used. Multiple file settings can be enabled simultaneously.
``` ```
options=case,extended options=case,extended

View File

@ -51,11 +51,34 @@ typedef struct hktask
struct hktask *next; /*< Next task in the list */ struct hktask *next; /*< Next task in the list */
} HKTASK; } HKTASK;
extern void hkinit(); /**
* Initialises the housekeeper mechanism.
*
* A call to any of the other housekeeper functions can be made only if
* this function returns successfully.
*
* @return True if the housekeeper mechanism was initialized, false otherwise.
*/
extern bool hkinit();
/**
* Shuts down the housekeeper mechanism.
*
* Should be called @b only if @c hkinit() returned successfully.
*
* @see hkinit hkfinish
*/
extern void hkshutdown();
/**
* Waits for the housekeeper thread to finish. Should be called only after
* hkshutdown() has been called.
*/
extern void hkfinish();
extern int hktask_add(const char *name, void (*task)(void *), void *data, int frequency); extern int hktask_add(const char *name, void (*task)(void *), void *data, int frequency);
extern int hktask_oneshot(const char *name, void (*task)(void *), void *data, int when); extern int hktask_oneshot(const char *name, void (*task)(void *), void *data, int when);
extern int hktask_remove(const char *name); extern int hktask_remove(const char *name);
extern void hkshutdown();
extern void hkshow_tasks(DCB *pdcb); extern void hkshow_tasks(DCB *pdcb);
MXS_END_DECLS MXS_END_DECLS

View File

@ -0,0 +1,22 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file semaphore.h Semaphores used by MaxScale.
*/
// As a minimal preparation for other environments than Linux, components
// include <maxscale/semaphore.h>, instead of including <semaphore.h>
// directly.
#include <semaphore.h>

View File

@ -1936,7 +1936,13 @@ int main(int argc, char **argv)
/* /*
* Start the housekeeper thread * Start the housekeeper thread
*/ */
hkinit(); if (!hkinit())
{
char* logerr = "Failed to start housekeeper thread.";
print_log_n_stderr(true, true, logerr, logerr, 0);
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
/*< /*<
* Start the polling threads, note this is one less than is * Start the polling threads, note this is one less than is
@ -1974,6 +1980,11 @@ int main(int argc, char **argv)
*/ */
poll_waitevents((void *)0); poll_waitevents((void *)0);
/*<
* Wait for the housekeeper to finish.
*/
hkfinish();
/*< /*<
* Wait server threads' completion. * Wait server threads' completion.
*/ */

View File

@ -10,13 +10,14 @@
* of this software will be governed by version 2 or later of the General * of this software will be governed by version 2 or later of the General
* Public License. * Public License.
*/ */
#include <maxscale/housekeeper.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <maxscale/alloc.h> #include <maxscale/alloc.h>
#include <maxscale/housekeeper.h> #include <maxscale/atomic.h>
#include <maxscale/thread.h> #include <maxscale/semaphore.h>
#include <maxscale/spinlock.h> #include <maxscale/spinlock.h>
#include <maxscale/log_manager.h> #include <maxscale/thread.h>
/** /**
* @file housekeeper.c Provide a mechanism to run periodic tasks * @file housekeeper.c Provide a mechanism to run periodic tasks
@ -49,22 +50,28 @@ static HKTASK *tasks = NULL;
*/ */
static SPINLOCK tasklock = SPINLOCK_INIT; static SPINLOCK tasklock = SPINLOCK_INIT;
static int do_shutdown = 0; static bool do_shutdown = 0;
long hkheartbeat = 0; /*< One heartbeat is 100 milliseconds */ long hkheartbeat = 0; /*< One heartbeat is 100 milliseconds */
static THREAD hk_thr_handle; static THREAD hk_thr_handle;
static void hkthread(void *); static void hkthread(void *);
/** bool
* Initialise the housekeeper thread
*/
void
hkinit() hkinit()
{ {
if (thread_start(&hk_thr_handle, hkthread, NULL) == NULL) bool inited = false;
if (thread_start(&hk_thr_handle, hkthread, NULL) != NULL)
{ {
MXS_ERROR("Failed to start housekeeper thread."); inited = true;
} }
else
{
MXS_ALERT("Failed to start housekeeper thread.");
}
return inited;
} }
/** /**
@ -255,21 +262,17 @@ hkthread(void *data)
void *taskdata; void *taskdata;
int i; int i;
for (;;) while (!do_shutdown)
{ {
for (i = 0; i < 10; i++) for (i = 0; i < 10; i++)
{ {
if (do_shutdown)
{
return;
}
thread_millisleep(100); thread_millisleep(100);
hkheartbeat++; hkheartbeat++;
} }
now = time(0); now = time(0);
spinlock_acquire(&tasklock); spinlock_acquire(&tasklock);
ptr = tasks; ptr = tasks;
while (ptr) while (!do_shutdown && ptr)
{ {
if (ptr->nextdue <= now) if (ptr->nextdue <= now)
{ {
@ -297,16 +300,25 @@ hkthread(void *data)
} }
spinlock_release(&tasklock); spinlock_release(&tasklock);
} }
MXS_NOTICE("Housekeeper shutting down.");
} }
/**
* Called to shutdown the housekeeper
*
*/
void void
hkshutdown() hkshutdown()
{ {
do_shutdown = 1; do_shutdown = true;
atomic_synchronize();
}
void hkfinish()
{
ss_dassert(do_shutdown);
MXS_NOTICE("Waiting for housekeeper to shut down.");
thread_wait(hk_thr_handle);
do_shutdown = false;
MXS_NOTICE("Housekeeper has shut down.");
} }
/** /**

View File

@ -62,6 +62,10 @@ static char *version_str = "V1.1.1";
/** Formatting buffer size */ /** Formatting buffer size */
#define QLA_STRING_BUFFER_SIZE 1024 #define QLA_STRING_BUFFER_SIZE 1024
/** Log file settings flags */
#define CONFIG_FILE_SESSION (1 << 0) // Default value, session specific files
#define CONFIG_FILE_UNIFIED (1 << 1) // One file shared by all sessions
/* /*
* The filter entry points * The filter entry points
*/ */
@ -108,6 +112,10 @@ typedef struct
regex_t re; /* Compiled regex text */ regex_t re; /* Compiled regex text */
char *nomatch; /* Optional text to match against for exclusion */ char *nomatch; /* Optional text to match against for exclusion */
regex_t nore; /* Compiled regex nomatch text */ regex_t nore; /* Compiled regex nomatch text */
uint32_t log_mode_flags; /* Log file mode settings */
FILE *unified_fp; /* Unified log file. The pointer needs to be shared here
* to avoid garbled printing. */
bool flush_writes; /* Flush log file after every write */
} QLA_INSTANCE; } QLA_INSTANCE;
/** /**
@ -126,6 +134,7 @@ typedef struct
int active; int active;
char *user; char *user;
char *remote; char *remote;
size_t ses_id; /* The session this filter serves */
} QLA_SESSION; } QLA_SESSION;
/** /**
@ -187,6 +196,9 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
my_instance->match = NULL; my_instance->match = NULL;
my_instance->nomatch = NULL; my_instance->nomatch = NULL;
my_instance->filebase = NULL; my_instance->filebase = NULL;
my_instance->log_mode_flags = 0;
my_instance->unified_fp = NULL;
my_instance->flush_writes = false;
bool error = false; bool error = false;
if (params) if (params)
@ -240,6 +252,18 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{ {
cflags |= REG_EXTENDED; cflags |= REG_EXTENDED;
} }
else if (!strcasecmp(options[i], "session_file"))
{
my_instance->log_mode_flags |= CONFIG_FILE_SESSION;
}
else if (!strcasecmp(options[i], "unified_file"))
{
my_instance->log_mode_flags |= CONFIG_FILE_UNIFIED;
}
else if (!strcasecmp(options[i], "flush_writes"))
{
my_instance->flush_writes = true;
}
else else
{ {
MXS_ERROR("qlafilter: Unsupported option '%s'.", MXS_ERROR("qlafilter: Unsupported option '%s'.",
@ -248,7 +272,11 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
} }
} }
} }
if (my_instance->log_mode_flags == 0)
{
// If nothing has been set, set a default value
my_instance->log_mode_flags = CONFIG_FILE_SESSION;
}
if (my_instance->filebase == NULL) if (my_instance->filebase == NULL)
{ {
MXS_ERROR("qlafilter: No 'filebase' parameter defined."); MXS_ERROR("qlafilter: No 'filebase' parameter defined.");
@ -276,6 +304,35 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
my_instance->nomatch = NULL; my_instance->nomatch = NULL;
error = true; error = true;
} }
// Try to open the unified log file
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED &&
my_instance->filebase != NULL)
{
// First calculate filename length
const char UNIFIED[] = ".unified";
int namelen = strlen(my_instance->filebase) + sizeof(UNIFIED);
char *filename = NULL;
if ((filename = MXS_CALLOC(namelen, sizeof(char))) != NULL)
{
snprintf(filename, namelen, "%s.unified", my_instance->filebase);
// Open the file. It is only closed at program exit
my_instance->unified_fp = fopen(filename, "w");
if (my_instance->unified_fp == NULL)
{
char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("Opening output file for qla "
"filter failed due to %d, %s",
errno,
strerror_r(errno, errbuf, sizeof(errbuf)));
error = true;
}
MXS_FREE(filename);
}
else
{
error = true;
}
}
if (error) if (error)
{ {
@ -290,6 +347,10 @@ createInstance(const char *name, char **options, FILTER_PARAMETER **params)
MXS_FREE(my_instance->nomatch); MXS_FREE(my_instance->nomatch);
regfree(&my_instance->nore); regfree(&my_instance->nore);
} }
if (my_instance->unified_fp != NULL)
{
fclose(my_instance->unified_fp);
}
MXS_FREE(my_instance->filebase); MXS_FREE(my_instance->filebase);
MXS_FREE(my_instance->source); MXS_FREE(my_instance->source);
MXS_FREE(my_instance->userName); MXS_FREE(my_instance->userName);
@ -339,15 +400,17 @@ newSession(FILTER *instance, SESSION *session)
my_session->user = userName; my_session->user = userName;
my_session->remote = remote; my_session->remote = remote;
my_session->ses_id = session->ses_id;
sprintf(my_session->filename, "%s.%d", sprintf(my_session->filename, "%s.%lu",
my_instance->filebase, my_instance->filebase,
my_instance->sessions); my_session->ses_id); // Fixed possible race condition
// Multiple sessions can try to update my_instance->sessions simultaneously // Multiple sessions can try to update my_instance->sessions simultaneously
atomic_add(&(my_instance->sessions), 1); atomic_add(&(my_instance->sessions), 1);
if (my_session->active) // Only open the session file if the corresponding mode setting is used
if (my_session->active && (my_instance->log_mode_flags | CONFIG_FILE_SESSION))
{ {
my_session->fp = fopen(my_session->filename, "w"); my_session->fp = fopen(my_session->filename, "w");
@ -355,7 +418,7 @@ newSession(FILTER *instance, SESSION *session)
{ {
char errbuf[MXS_STRERROR_BUFLEN]; char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("Opening output file for qla " MXS_ERROR("Opening output file for qla "
"fileter failed due to %d, %s", "filter failed due to %d, %s",
errno, errno,
strerror_r(errno, errbuf, sizeof(errbuf))); strerror_r(errno, errbuf, sizeof(errbuf)));
MXS_FREE(my_session->filename); MXS_FREE(my_session->filename);
@ -364,14 +427,6 @@ newSession(FILTER *instance, SESSION *session)
} }
} }
} }
else
{
char errbuf[MXS_STRERROR_BUFLEN];
MXS_ERROR("Memory allocation for qla filter failed due to "
"%d, %s.",
errno,
strerror_r(errno, errbuf, sizeof(errbuf)));
}
return my_session; return my_session;
} }
@ -459,8 +514,31 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue)
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
localtime_r(&tv.tv_sec, &t); localtime_r(&tv.tv_sec, &t);
strftime(buffer, sizeof(buffer), "%F %T", &t); strftime(buffer, sizeof(buffer), "%F %T", &t);
fprintf(my_session->fp, "%s,%s@%s,%s\n", buffer, my_session->user,
my_session->remote, trim(squeeze_whitespace(ptr))); /**
* Loop over all the possible log file modes and write to
* the enabled files.
*/
char *sql_string = trim(squeeze_whitespace(ptr));
if (my_instance->log_mode_flags & CONFIG_FILE_SESSION)
{
fprintf(my_session->fp, "%s,%s@%s,%s\n", buffer, my_session->user,
my_session->remote, sql_string);
if (my_instance->flush_writes)
{
fflush(my_session->fp);
}
}
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED)
{
fprintf(my_instance->unified_fp, "S%zd,%s,%s@%s,%s\n",
my_session->ses_id, buffer, my_session->user,
my_session->remote, sql_string);
if (my_instance->flush_writes)
{
fflush(my_instance->unified_fp);
}
}
} }
MXS_FREE(ptr); MXS_FREE(ptr);
} }