Merge branch 'develop' into MXS-1266

This commit is contained in:
MassimilianoPinto
2017-06-13 11:18:51 +02:00
20 changed files with 690 additions and 363 deletions

View File

@ -78,7 +78,7 @@ static int cdc_auth_set_client_data(
* @param args Arguments for this command
* @return True if user was successfully added
*/
static bool cdc_add_new_user(const MODULECMD_ARG *args)
static bool cdc_add_new_user(const MODULECMD_ARG *args, json_t** output)
{
const char *user = args->argv[1].value.string;
size_t userlen = strlen(user);
@ -154,7 +154,9 @@ MXS_MODULE* MXS_CREATE_MODULE()
{ MODULECMD_ARG_STRING, "Password of the user"}
};
modulecmd_register_command("cdc", "add_user", MODULECMD_TYPE_ACTIVE, cdc_add_new_user, 3, args);
modulecmd_register_command("cdc", "add_user", MODULECMD_TYPE_ACTIVE,
cdc_add_new_user, 3, args,
"Add a new CDC user");
static MXS_AUTHENTICATOR MyObject =
{

View File

@ -85,7 +85,7 @@ void cache_config_reset(CACHE_CONFIG& config)
*
* @return True, if the command was handled.
*/
bool cache_command_show(const MODULECMD_ARG* pArgs)
bool cache_command_show(const MODULECMD_ARG* pArgs, json_t** output)
{
ss_dassert(pArgs->argc == 2);
ss_dassert(MODULECMD_GET_TYPE(&pArgs->argv[0].type) == MODULECMD_ARG_OUTPUT);
@ -148,7 +148,8 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
};
modulecmd_register_command(MXS_MODULE_NAME, "show", MODULECMD_TYPE_PASSIVE,
cache_command_show, MXS_ARRAY_NELEMS(show_argv), show_argv);
cache_command_show, MXS_ARRAY_NELEMS(show_argv), show_argv,
"Show cache filter statistics");
MXS_NOTICE("Initialized cache module %s.\n", VERSION_STRING);

View File

@ -103,6 +103,15 @@ static const MXS_ENUM_VALUE option_values[] =
{NULL}
};
typedef enum ccr_hint_value_t
{
CCR_HINT_NONE,
CCR_HINT_MATCH,
CCR_HINT_IGNORE
} CCR_HINT_VALUE;
static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer);
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -148,11 +157,11 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"match", MXS_MODULE_PARAM_STRING},
{"ignore", MXS_MODULE_PARAM_STRING},
{
"options",
MXS_MODULE_PARAM_ENUM,
"ignorecase",
MXS_MODULE_OPT_NONE,
option_values
"options",
MXS_MODULE_PARAM_ENUM,
"ignorecase",
MXS_MODULE_OPT_NONE,
option_values
},
{MXS_END_MODULE_PARAMS}
}
@ -299,28 +308,49 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
{
if ((sql = modutil_get_SQL(queue)) != NULL)
{
if (my_instance->nomatch == NULL ||
(my_instance->nomatch && regexec(&my_instance->nore, sql, 0, NULL, 0) != 0))
bool trigger_ccr = true;
bool decided = false; // Set by hints to take precedence.
CCR_HINT_VALUE ccr_hint_val = search_ccr_hint(queue);
if (ccr_hint_val == CCR_HINT_IGNORE)
{
if (my_instance->match == NULL ||
(my_instance->match && regexec(&my_instance->re, sql, 0, NULL, 0) == 0))
trigger_ccr = false;
decided = true;
}
else if (ccr_hint_val == CCR_HINT_MATCH)
{
decided = true;
}
if (!decided)
{
if (my_instance->nomatch &&
regexec(&my_instance->nore, sql, 0, NULL, 0) == 0)
{
if (my_instance->count)
{
my_session->hints_left = my_instance->count;
MXS_INFO("Write operation detected, next %d queries routed to master", my_instance->count);
}
if (my_instance->time)
{
my_session->last_modification = now;
MXS_INFO("Write operation detected, queries routed to master for %d seconds", my_instance->time);
}
my_instance->stats.n_modified++;
// Nomatch was present and sql matched it.
trigger_ccr = false;
}
else if (my_instance->match &&
(regexec(&my_instance->re, sql, 0, NULL, 0) != 0))
{
// Match was present but sql did *not* match it.
trigger_ccr = false;
}
}
if (trigger_ccr)
{
if (my_instance->count)
{
my_session->hints_left = my_instance->count;
MXS_INFO("Write operation detected, next %d queries routed to master", my_instance->count);
}
if (my_instance->time)
{
my_session->last_modification = now;
MXS_INFO("Write operation detected, queries routed to master for %d seconds", my_instance->time);
}
my_instance->stats.n_modified++;
}
MXS_FREE(sql);
}
}
@ -429,3 +459,52 @@ static uint64_t getCapabilities(MXS_FILTER* instance)
{
return RCAP_TYPE_NONE;
}
/**
* Find the first CCR filter hint. The hint is removed from the buffer and the
* contents returned.
*
* @param buffer Input buffer
* @return The found ccr hint value
*/
static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer)
{
const char CCR[] = "ccr";
CCR_HINT_VALUE rval = CCR_HINT_NONE;
bool found_ccr = false;
HINT** prev_ptr = &buffer->hint;
HINT* hint = buffer->hint;
while (hint && !found_ccr)
{
if (hint->type == HINT_PARAMETER && strcasecmp(hint->data, CCR) == 0)
{
found_ccr = true;
if (strcasecmp(hint->value, "match") == 0)
{
rval = CCR_HINT_MATCH;
}
else if (strcasecmp(hint->value, "ignore") == 0)
{
rval = CCR_HINT_IGNORE;
}
else
{
MXS_ERROR("Unknown value for hint parameter %s: '%s'.",
CCR, (char*)hint->value);
}
}
else
{
prev_ptr = &hint->next;
hint = hint->next;
}
}
// Remove the ccr-hint from the hint chain. Otherwise rwsplit will complain.
if (found_ccr)
{
*prev_ptr = hint->next;
hint_free(hint);
}
return rval;
}

View File

@ -144,6 +144,7 @@ const char* rule_names[] =
{
"UNDEFINED",
"COLUMN",
"FUNCTION",
"THROTTLE",
"PERMISSION",
"WILDCARD",
@ -712,7 +713,7 @@ TIMERANGE* split_reverse_time(TIMERANGE* tr)
return tmp;
}
bool dbfw_reload_rules(const MODULECMD_ARG *argv)
bool dbfw_reload_rules(const MODULECMD_ARG *argv, json_t** output)
{
bool rval = true;
MXS_FILTER_DEF *filter = argv->argv[0].value.filter;
@ -776,7 +777,7 @@ bool dbfw_reload_rules(const MODULECMD_ARG *argv)
return rval;
}
bool dbfw_show_rules(const MODULECMD_ARG *argv)
bool dbfw_show_rules(const MODULECMD_ARG *argv, json_t** output)
{
DCB *dcb = argv->argv[0].value.dcb;
MXS_FILTER_DEF *filter = argv->argv[1].value.filter;
@ -802,6 +803,30 @@ bool dbfw_show_rules(const MODULECMD_ARG *argv)
return true;
}
bool dbfw_show_rules_json(const MODULECMD_ARG *argv, json_t** output)
{
MXS_FILTER_DEF *filter = argv->argv[0].value.filter;
FW_INSTANCE *inst = (FW_INSTANCE*)filter_def_get_instance(filter);
json_t* arr = json_array();
if (!thr_rules || !thr_users)
{
if (!replace_rules(inst))
{
return 0;
}
}
for (RULE *rule = thr_rules; rule; rule = rule->next)
{
json_array_append_new(arr, rule_to_json(rule));
}
*output = arr;
return true;
}
static const MXS_ENUM_VALUE action_values[] =
{
{"allow", FW_ACTION_ALLOW},
@ -827,7 +852,8 @@ MXS_MODULE* MXS_CREATE_MODULE()
};
modulecmd_register_command(MXS_MODULE_NAME, "rules/reload", MODULECMD_TYPE_ACTIVE,
dbfw_reload_rules, 2, args_rules_reload);
dbfw_reload_rules, 2, args_rules_reload,
"Reload dbfwfilter rules");
modulecmd_arg_type_t args_rules_show[] =
{
@ -836,7 +862,17 @@ MXS_MODULE* MXS_CREATE_MODULE()
};
modulecmd_register_command(MXS_MODULE_NAME, "rules", MODULECMD_TYPE_PASSIVE,
dbfw_show_rules, 2, args_rules_show);
dbfw_show_rules, 2, args_rules_show,
"(deprecated) Show dbfwfilter rule statistics");
modulecmd_arg_type_t args_rules_show_json[] =
{
{MODULECMD_ARG_FILTER | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "Filter to inspect"}
};
modulecmd_register_command(MXS_MODULE_NAME, "rules/json", MODULECMD_TYPE_PASSIVE,
dbfw_show_rules_json, 1, args_rules_show_json,
"Show dbfwfilter rule statistics as JSON");
static MXS_FILTER_OBJECT MyObject =
{

View File

@ -32,7 +32,7 @@ char VERSION_STRING[] = "V1.0.0";
*
* @return True, if the command was handled.
*/
bool masking_command_reload(const MODULECMD_ARG* pArgs)
bool masking_command_reload(const MODULECMD_ARG* pArgs, json_t** output)
{
ss_dassert(pArgs->argc == 2);
ss_dassert(MODULECMD_GET_TYPE(&pArgs->argv[0].type) == MODULECMD_ARG_OUTPUT);
@ -66,7 +66,8 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
modulecmd_register_command(MXS_MODULE_NAME, "reload",
MODULECMD_TYPE_ACTIVE, masking_command_reload,
MXS_ARRAY_NELEMS(reload_argv), reload_argv);
MXS_ARRAY_NELEMS(reload_argv), reload_argv,
"Reload masking filter rules");
MXS_NOTICE("Masking module %s initialized.", VERSION_STRING);

View File

@ -13,49 +13,36 @@
/**
* @file qlafilter.c - Quary Log All Filter
* @verbatim
*
* QLA Filter - Query Log All. A primitive query logging filter, simply
* used to verify the filter mechanism for downstream filters. All queries
* that are passed through the filter will be written to file.
* QLA Filter - Query Log All. A simple query logging filter. All queries passing
* through the filter are written to a text file.
*
* The filter makes no attempt to deal with query packets that do not fit
* in a single GWBUF.
*
* A single option may be passed to the filter, this is the name of the
* file to which the queries are logged. A serial number is appended to this
* name in order that each session logs to a different file.
*
* Date Who Description
* 03/06/2014 Mark Riddoch Initial implementation
* 11/06/2014 Mark Riddoch Addition of source and match parameters
* 19/06/2014 Mark Riddoch Addition of user parameter
*
* @endverbatim
*/
#define MXS_MODULE_NAME "qlafilter"
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <fcntl.h>
#include <time.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <maxscale/alloc.h>
#include <maxscale/atomic.h>
#include <maxscale/filter.h>
#include <maxscale/log_manager.h>
#include <maxscale/modinfo.h>
#include <maxscale/modutil.h>
#include <maxscale/utils.h>
#include <maxscale/log_manager.h>
#include <time.h>
#include <sys/time.h>
#include <regex.h>
#include <string.h>
#include <maxscale/atomic.h>
#include <maxscale/alloc.h>
#include <maxscale/pcre2.h>
#include <maxscale/service.h>
#include <maxscale/utils.h>
/** Date string buffer size */
/* Date string buffer size */
#define QLA_DATE_BUFFER_SIZE 20
/** Log file save mode flags */
/* Log file save mode flags */
#define CONFIG_FILE_SESSION (1 << 0) // Default value, session specific files
#define CONFIG_FILE_UNIFIED (1 << 1) // One file shared by all sessions
@ -69,12 +56,10 @@ enum log_options
LOG_DATA_QUERY = (1 << 4),
};
/** Default values for logged data */
/* Default values for logged data */
#define LOG_DATA_DEFAULT "date,user,query"
/*
* The filter entry points
*/
/* The filter entry points */
static MXS_FILTER *createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *);
static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, MXS_SESSION *session);
static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session);
@ -101,47 +86,48 @@ typedef struct
char *source; /* The source of the client connection to filter on */
char *user_name; /* The user name to filter on */
char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */
char *nomatch; /* Optional text to match against for exclusion */
regex_t nore; /* Compiled regex nomatch text */
pcre2_code* re_match; /* Compiled regex text */
char *exclude; /* Optional text to match against for exclusion */
pcre2_code* re_exclude; /* Compiled regex nomatch text */
uint32_t ovec_size; /* PCRE2 match data ovector size */
uint32_t log_mode_flags; /* Log file mode settings */
uint32_t log_file_data_flags; /* What data is saved to the files */
FILE *unified_fp; /* Unified log file. The pointer needs to be shared here
* to avoid garbled printing. */
bool flush_writes; /* Flush log file after every write? */
bool append; /* Open files in append-mode? */
bool write_warning_given; /* To make sure some warning are only given once */
/* Avoid repeatedly printing some errors/warnings. */
bool write_warning_given;
bool match_error_printed;
bool exclude_error_printed;
} QLA_INSTANCE;
/**
* The session structure for this QLA filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
/* The session structure for this QLA filter. */
typedef struct
{
int active;
MXS_DOWNSTREAM down;
char *filename; /* The session-specific log file name */
FILE *fp; /* The session-specific log file */
const char *remote;
char *service; /* The service name this filter is attached to. Not owned. */
size_t ses_id; /* The session this filter serves */
const char *user; /* The client */
char *filename; /* The session-specific log file name */
FILE *fp; /* The session-specific log file */
const char *remote; /* Client address */
char *service; /* The service name this filter is attached to. Not owned. */
size_t ses_id; /* The session this filter serves */
const char *user; /* The client */
pcre2_match_data* match_data; /* Regex match data */
} QLA_SESSION;
static FILE* open_log_file(uint32_t, QLA_INSTANCE *, const char *);
static int write_log_entry(uint32_t, FILE*, QLA_INSTANCE*, QLA_SESSION*, const char*,
const char*, size_t);
static bool regex_check(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
const char* ptr, int length);
static const MXS_ENUM_VALUE option_values[] =
{
{"ignorecase", REG_ICASE},
{"ignorecase", PCRE2_CASELESS},
{"case", 0},
{"extended", REG_EXTENDED},
{"extended", PCRE2_EXTENDED},
{NULL}
};
@ -162,11 +148,19 @@ static const MXS_ENUM_VALUE log_data_values[] =
{NULL}
};
static const char PARAM_MATCH[] = "match";
static const char PARAM_EXCLUDE[] = "exclude";
static const char PARAM_USER[] = "user";
static const char PARAM_SOURCE[] = "source";
static const char PARAM_FILEBASE[] = "filebase";
static const char PARAM_OPTIONS[] = "options";
static const char PARAM_LOG_TYPE[] = "log_type";
static const char PARAM_LOG_DATA[] = "log_data";
static const char PARAM_FLUSH[] = "flush";
static const char PARAM_APPEND[] = "append";
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
* The module entry point routine.
*
* @return The module object
*/
@ -203,55 +197,55 @@ MXS_MODULE* MXS_CREATE_MODULE()
NULL, /* Thread finish. */
{
{
"match",
PARAM_MATCH,
MXS_MODULE_PARAM_REGEX
},
{
PARAM_EXCLUDE,
MXS_MODULE_PARAM_REGEX
},
{
PARAM_USER,
MXS_MODULE_PARAM_STRING
},
{
"exclude",
PARAM_SOURCE,
MXS_MODULE_PARAM_STRING
},
{
"user",
MXS_MODULE_PARAM_STRING
},
{
"source",
MXS_MODULE_PARAM_STRING
},
{
"filebase",
PARAM_FILEBASE,
MXS_MODULE_PARAM_STRING,
NULL,
MXS_MODULE_OPT_REQUIRED
},
{
"options",
PARAM_OPTIONS,
MXS_MODULE_PARAM_ENUM,
"ignorecase",
MXS_MODULE_OPT_NONE,
option_values
},
{
"log_type",
PARAM_LOG_TYPE,
MXS_MODULE_PARAM_ENUM,
"session",
MXS_MODULE_OPT_NONE,
log_type_values
},
{
"log_data",
PARAM_LOG_DATA,
MXS_MODULE_PARAM_ENUM,
LOG_DATA_DEFAULT,
MXS_MODULE_OPT_NONE,
log_data_values
},
{
"flush",
PARAM_FLUSH,
MXS_MODULE_PARAM_BOOL,
"false"
},
{
"append",
PARAM_APPEND,
MXS_MODULE_PARAM_BOOL,
"false"
},
@ -263,14 +257,13 @@ MXS_MODULE* MXS_CREATE_MODULE()
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
* Create an instance of the filter for a particular service within MaxScale.
*
* @param name The name of the instance (as defined in the config file).
* @param name The name of the instance (as defined in the config file)
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
* @return The new filter instance, or NULL on error
*/
static MXS_FILTER *
createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
@ -279,39 +272,53 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
if (my_instance)
{
my_instance->name = MXS_STRDUP_A(name);
my_instance->sessions = 0;
my_instance->ovec_size = 0;
my_instance->unified_fp = NULL;
my_instance->write_warning_given = false;
my_instance->name = MXS_STRDUP_A(name);
my_instance->filebase = MXS_STRDUP_A(config_get_string(params, "filebase"));
my_instance->flush_writes = config_get_bool(params, "flush");
my_instance->append = config_get_bool(params, "append");
my_instance->match = config_copy_string(params, "match");
my_instance->nomatch = config_copy_string(params, "exclude");
my_instance->source = config_copy_string(params, "source");
my_instance->user_name = config_copy_string(params, "user");
my_instance->log_file_data_flags = config_get_enum(params, "log_data", log_data_values);
my_instance->log_mode_flags = config_get_enum(params, "log_type", log_type_values);
my_instance->match_error_printed = false;
my_instance->exclude_error_printed = false;
my_instance->source = config_copy_string(params, PARAM_SOURCE);
my_instance->user_name = config_copy_string(params, PARAM_USER);
my_instance->filebase = MXS_STRDUP_A(config_get_string(params, PARAM_FILEBASE));
my_instance->append = config_get_bool(params, PARAM_APPEND);
my_instance->flush_writes = config_get_bool(params, PARAM_FLUSH);
my_instance->log_file_data_flags = config_get_enum(params, PARAM_LOG_DATA, log_data_values);
my_instance->log_mode_flags = config_get_enum(params, PARAM_LOG_TYPE, log_type_values);
my_instance->match = config_copy_string(params, PARAM_MATCH);
my_instance->exclude = config_copy_string(params, PARAM_EXCLUDE);
my_instance->re_exclude = NULL;
my_instance->re_match = NULL;
bool error = false;
int cflags = config_get_enum(params, "options", option_values);
if (my_instance->match && regcomp(&my_instance->re, my_instance->match, cflags))
int cflags = config_get_enum(params, PARAM_OPTIONS, option_values);
if (my_instance->match)
{
MXS_ERROR("Invalid regular expression '%s' for the 'match' "
"parameter.", my_instance->match);
MXS_FREE(my_instance->match);
my_instance->match = NULL;
error = true;
my_instance->re_match =
config_get_compiled_regex(params, PARAM_MATCH, cflags, &my_instance->ovec_size);
if (!my_instance->re_match)
{
error = true;
}
}
if (my_instance->nomatch && regcomp(&my_instance->nore, my_instance->nomatch, cflags))
if (my_instance->exclude)
{
MXS_ERROR("Invalid regular expression '%s' for the 'nomatch'"
" parameter.", my_instance->nomatch);
MXS_FREE(my_instance->nomatch);
my_instance->nomatch = NULL;
error = true;
uint32_t ovec_size_temp = 0;
my_instance->re_exclude =
config_get_compiled_regex(params, PARAM_EXCLUDE, cflags, &ovec_size_temp);
if (ovec_size_temp > my_instance->ovec_size)
{
my_instance->ovec_size = ovec_size_temp;
}
if (!my_instance->re_exclude)
{
error = true;
}
}
// Try to open the unified log file
@ -330,10 +337,8 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
if (my_instance->unified_fp == NULL)
{
MXS_ERROR("Opening output file for qla "
"filter failed due to %d, %s",
errno,
mxs_strerror(errno));
MXS_ERROR("Opening output file for qla-filter failed due to %d, %s",
errno, mxs_strerror(errno));
error = true;
}
MXS_FREE(filename);
@ -346,17 +351,11 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
if (error)
{
if (my_instance->match)
{
MXS_FREE(my_instance->match);
regfree(&my_instance->re);
}
if (my_instance->nomatch)
{
MXS_FREE(my_instance->nomatch);
regfree(&my_instance->nore);
}
MXS_FREE(my_instance->name);
MXS_FREE(my_instance->match);
pcre2_code_free(my_instance->re_match);
MXS_FREE(my_instance->exclude);
pcre2_code_free(my_instance->re_exclude);
if (my_instance->unified_fp != NULL)
{
fclose(my_instance->unified_fp);
@ -389,8 +388,19 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
if ((my_session = MXS_CALLOC(1, sizeof(QLA_SESSION))) != NULL)
{
if ((my_session->filename = (char *)MXS_MALLOC(strlen(my_instance->filebase) + 20)) == NULL)
my_session->fp = NULL;
my_session->match_data = NULL;
my_session->filename = (char *)MXS_MALLOC(strlen(my_instance->filebase) + 20);
const uint32_t ovec_size = my_instance->ovec_size;
if (ovec_size)
{
my_session->match_data = pcre2_match_data_create(ovec_size, NULL);
}
if (!my_session->filename || (ovec_size && !my_session->match_data))
{
MXS_FREE(my_session->filename);
pcre2_match_data_free(my_session->match_data);
MXS_FREE(my_session);
return NULL;
}
@ -415,7 +425,7 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
sprintf(my_session->filename, "%s.%lu",
my_instance->filebase,
my_session->ses_id); // Fixed possible race condition
my_session->ses_id);
// Multiple sessions can try to update my_instance->sessions simultaneously
atomic_add(&(my_instance->sessions), 1);
@ -429,11 +439,10 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
if (my_session->fp == NULL)
{
MXS_ERROR("Opening output file for qla "
"filter failed due to %d, %s",
errno,
mxs_strerror(errno));
MXS_ERROR("Opening output file for qla-filter failed due to %d, %s",
errno, mxs_strerror(errno));
MXS_FREE(my_session->filename);
pcre2_match_data_free(my_session->match_data);
MXS_FREE(my_session);
my_session = NULL;
}
@ -473,6 +482,7 @@ freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
QLA_SESSION *my_session = (QLA_SESSION *) session;
MXS_FREE(my_session->filename);
pcre2_match_data_free(my_session->match_data);
MXS_FREE(session);
return;
}
@ -513,58 +523,51 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
struct tm t;
struct timeval tv;
if (my_session->active)
if (my_session->active &&
modutil_extract_SQL(queue, &ptr, &length) &&
regex_check(my_instance, my_session, ptr, length))
{
if (modutil_extract_SQL(queue, &ptr, &length))
char buffer[QLA_DATE_BUFFER_SIZE];
gettimeofday(&tv, NULL);
localtime_r(&tv.tv_sec, &t);
strftime(buffer, sizeof(buffer), "%F %T", &t);
/**
* Loop over all the possible log file modes and write to
* the enabled files.
*/
char *sql_string = ptr;
bool write_error = false;
if (my_instance->log_mode_flags & CONFIG_FILE_SESSION)
{
if ((my_instance->match == NULL ||
regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) &&
(my_instance->nomatch == NULL ||
regexec(&my_instance->nore, ptr, 0, NULL, 0) != 0))
// In this case there is no need to write the session
// number into the files.
uint32_t data_flags = (my_instance->log_file_data_flags &
~LOG_DATA_SESSION);
if (write_log_entry(data_flags, my_session->fp,
my_instance, my_session, buffer, sql_string, length) < 0)
{
char buffer[QLA_DATE_BUFFER_SIZE];
gettimeofday(&tv, NULL);
localtime_r(&tv.tv_sec, &t);
strftime(buffer, sizeof(buffer), "%F %T", &t);
/**
* Loop over all the possible log file modes and write to
* the enabled files.
*/
char *sql_string = ptr;
bool write_error = false;
if (my_instance->log_mode_flags & CONFIG_FILE_SESSION)
{
// In this case there is no need to write the session
// number into the files.
uint32_t data_flags = (my_instance->log_file_data_flags &
~LOG_DATA_SESSION);
if (write_log_entry(data_flags, my_session->fp,
my_instance, my_session, buffer, sql_string, length) < 0)
{
write_error = true;
}
}
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED)
{
uint32_t data_flags = my_instance->log_file_data_flags;
if (write_log_entry(data_flags, my_instance->unified_fp,
my_instance, my_session, buffer, sql_string, length) < 0)
{
write_error = true;
}
}
if (write_error && !my_instance->write_warning_given)
{
MXS_ERROR("qla-filter '%s': Log file write failed. "
"Suppressing further similar warnings.",
my_instance->name);
my_instance->write_warning_given = true;
}
write_error = true;
}
}
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED)
{
uint32_t data_flags = my_instance->log_file_data_flags;
if (write_log_entry(data_flags, my_instance->unified_fp,
my_instance, my_session, buffer, sql_string, length) < 0)
{
write_error = true;
}
}
if (write_error && !my_instance->write_warning_given)
{
MXS_ERROR("qla-filter '%s': Log file write failed. "
"Suppressing further similar warnings.",
my_instance->name);
my_instance->write_warning_given = true;
}
}
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
@ -580,7 +583,7 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb)
@ -608,10 +611,10 @@ diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb)
dcb_printf(dcb, "\t\tInclude queries that match %s\n",
my_instance->match);
}
if (my_instance->nomatch)
if (my_instance->exclude)
{
dcb_printf(dcb, "\t\tExclude queries that match %s\n",
my_instance->nomatch);
my_instance->exclude);
}
}
@ -639,22 +642,22 @@ static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESS
if (my_instance->source)
{
json_object_set_new(rval, "source", json_string(my_instance->source));
json_object_set_new(rval, PARAM_SOURCE, json_string(my_instance->source));
}
if (my_instance->user_name)
{
json_object_set_new(rval, "user", json_string(my_instance->user_name));
json_object_set_new(rval, PARAM_USER, json_string(my_instance->user_name));
}
if (my_instance->match)
{
json_object_set_new(rval, "match", json_string(my_instance->match));
json_object_set_new(rval, PARAM_MATCH, json_string(my_instance->match));
}
if (my_instance->nomatch)
if (my_instance->exclude)
{
json_object_set_new(rval, "exclude", json_string(my_instance->nomatch));
json_object_set_new(rval, PARAM_EXCLUDE, json_string(my_instance->exclude));
}
return rval;
@ -669,6 +672,7 @@ static uint64_t getCapabilities(MXS_FILTER* instance)
{
return RCAP_TYPE_NONE;
}
/**
* Open the log file and print a header if appropriate.
* @param data_flags Data save settings flags
@ -687,10 +691,12 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
}
else
{
// Using fopen() with 'a+' means we will always write to the end but can read
// anywhere. Depending on the "append"-setting the file has been
// opened in different modes, which should be considered if file handling
// changes later (e.g. rewinding).
/**
* Using fopen() with 'a+' means we will always write to the end but can read
* anywhere. Depending on the "append"-setting the file has been
* opened in different modes, which should be considered if file handling
* changes later (e.g. rewinding).
*/
if ((fp = fopen(filename, "a+")) != NULL)
{
// Check to see if file already has contents
@ -770,13 +776,14 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
/**
* Write an entry to the log file.
*
* @param data_flags Controls what to write
* @param logfile Target file
* @param instance Filter instance
* @param session Filter session
* @param time_string Date entry
* @param sql_string SQL-query, not NULL terminated!
* @param sql_str_len Length of SQL-string
* @param logfile Target file
* @param instance Filter instance
* @param session Filter session
* @param time_string Date entry
* @param sql_string SQL-query, *not* NULL terminated
* @param sql_str_len Length of SQL-string
* @return The number of characters written, or a negative value on failure
*/
static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *instance,
@ -785,11 +792,12 @@ static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *ins
{
ss_dassert(logfile != NULL);
size_t print_len = 0;
// First calculate an upper limit for the total length. The strlen()-calls
// could be removed if the values would be saved into the instance or session
// or if we had some reasonable max lengths. (Apparently there are max lengths
// but they are much higher than what is typically needed)
/**
* First calculate an upper limit for the total length. The strlen()-calls
* could be removed if the values would be saved into the instance or session
* or if we had some reasonable max lengths. (Apparently there are max lengths
* but they are much higher than what is typically needed.)
*/
// The numbers have some extra for delimiters.
if (data_flags & LOG_DATA_SERVICE)
@ -818,9 +826,9 @@ static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *ins
return 0; // Nothing to print
}
// Allocate space for a buffer. Printing to the file in parts would likely
// cause garbled printing if several threads write simultaneously, so we
// have to first print to a string.
/* Allocate space for a buffer. Printing to the file in parts would likely
cause garbled printing if several threads write simultaneously, so we
have to first print to a string. */
char *print_str = NULL;
if ((print_str = MXS_CALLOC(print_len, sizeof(char))) == NULL)
{
@ -910,3 +918,46 @@ static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *ins
return rval;
}
}
static bool regex_check(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
const char* ptr, int length)
{
bool rval = true;
if (my_instance->re_match)
{
int result = pcre2_match(my_instance->re_match, (PCRE2_SPTR)ptr,
length, 0, 0, my_session->match_data, NULL);
if (result == PCRE2_ERROR_NOMATCH)
{
rval = false; // Didn't match the "match"-regex
}
else if (result < 0)
{
rval = false;
if (!my_instance->match_error_printed)
{
MXS_PCRE2_PRINT_ERROR(result);
my_instance->match_error_printed = true;
}
}
}
if (rval && my_instance->re_exclude)
{
int result = pcre2_match(my_instance->re_exclude, (PCRE2_SPTR)ptr,
length, 0, 0, my_session->match_data, NULL);
if (result >= 0)
{
rval = false; // Matched the "exclude"-regex
}
else if (result != PCRE2_ERROR_NOMATCH)
{
rval = false;
if (!my_instance->exclude_error_printed)
{
MXS_PCRE2_PRINT_ERROR(result);
my_instance->exclude_error_printed = true;
}
}
}
return rval;
}

View File

@ -101,7 +101,7 @@ static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start);
static SPINLOCK instlock;
static AVRO_INSTANCE *instances;
bool avro_handle_convert(const MODULECMD_ARG *args)
bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output)
{
bool rval = false;
@ -148,7 +148,9 @@ MXS_MODULE* MXS_CREATE_MODULE()
{ MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "The avrorouter service" },
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
};
modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE, avro_handle_convert, 2, args);
modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE,
avro_handle_convert, 2, args,
"Start or stop the binlog to avro conversion process");
static MXS_ROUTER_OBJECT MyObject =
{

View File

@ -321,8 +321,9 @@ bool listfuncs_cb(const MODULECMD *cmd, void *data)
{
DCB *dcb = (DCB*)data;
dcb_printf(dcb, "Command: %s %s\n", cmd->domain, cmd->identifier);
dcb_printf(dcb, "Parameters: ");
dcb_printf(dcb, "Command:\t%s %s\n", cmd->domain, cmd->identifier);
dcb_printf(dcb, "Description:\t%s\n", cmd->description);
dcb_printf(dcb, "Parameters:\t");
for (int i = 0; i < cmd->arg_count_max; i++)
{
@ -1717,10 +1718,18 @@ static void callModuleCommand(DCB *dcb, char *domain, char *id, char *v3,
if (arg)
{
if (!modulecmd_call_command(cmd, arg))
json_t* output = NULL;
if (!modulecmd_call_command(cmd, arg, &output))
{
dcb_printf(dcb, "Error: %s\n", modulecmd_get_error());
}
else if (output)
{
dcb_printf(dcb, "%s\n", json_dumps(output, JSON_INDENT(4)));
}
json_decref(output);
modulecmd_arg_free(arg);
}
else