MXS-1511: QLA-Filter replace newlines in SQL-queries with custom strings
The config parameter 'newline_replacement' (defaults to 1 space " ") now defines what to write to the log file when the sql-query has a newline sequence (\n, \r or \r\n). If 'newline_replacement' is the empty string "", no replacing is done and newlines are printed to file. Also, adds the config parameter 'separator', which defines the string printed between elements. Default value is ",".
This commit is contained in:
@ -107,9 +107,9 @@ typedef struct
|
||||
char *unified_filename; /* Filename of the unified log file */
|
||||
bool flush_writes; /* Flush log file after every write? */
|
||||
bool append; /* Open files in append-mode? */
|
||||
|
||||
/* Avoid repeatedly printing some errors/warnings. */
|
||||
bool write_warning_given;
|
||||
char *query_newline; /* Character(s) used to replace a newline within a query */
|
||||
char *separator; /* Character(s) used to separate elements */
|
||||
bool write_warning_given; /* Avoid repeatedly printing some errors/warnings. */
|
||||
} QLA_INSTANCE;
|
||||
|
||||
/**
|
||||
@ -156,8 +156,8 @@ typedef struct
|
||||
LOG_EVENT_DATA event_data; /* Information about the latest event, required if logging execution time. */
|
||||
} QLA_SESSION;
|
||||
|
||||
static FILE* open_log_file(uint32_t, QLA_INSTANCE *, const char *);
|
||||
static int write_log_entry(uint32_t, FILE*, QLA_INSTANCE*, QLA_SESSION*,
|
||||
static FILE* open_log_file(QLA_INSTANCE *, uint32_t, const char *);
|
||||
static int write_log_entry(FILE*, QLA_INSTANCE*, QLA_SESSION*, uint32_t,
|
||||
const char*, const char*, size_t, int);
|
||||
static bool cb_log(const MODULECMD_ARG *argv, json_t** output);
|
||||
|
||||
@ -197,6 +197,8 @@ static const char PARAM_LOG_TYPE[] = "log_type";
|
||||
static const char PARAM_LOG_DATA[] = "log_data";
|
||||
static const char PARAM_FLUSH[] = "flush";
|
||||
static const char PARAM_APPEND[] = "append";
|
||||
static const char PARAM_NEWLINE[] = "newline_replacement";
|
||||
static const char PARAM_SEPARATOR[] = "separator";
|
||||
|
||||
MXS_BEGIN_DECLS
|
||||
|
||||
@ -300,6 +302,18 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
MXS_MODULE_OPT_NONE,
|
||||
log_data_values
|
||||
},
|
||||
{
|
||||
PARAM_NEWLINE,
|
||||
MXS_MODULE_PARAM_QUOTEDSTRING,
|
||||
" ",
|
||||
MXS_MODULE_OPT_NONE
|
||||
},
|
||||
{
|
||||
PARAM_SEPARATOR,
|
||||
MXS_MODULE_PARAM_QUOTEDSTRING,
|
||||
",",
|
||||
MXS_MODULE_OPT_NONE
|
||||
},
|
||||
{
|
||||
PARAM_FLUSH,
|
||||
MXS_MODULE_PARAM_BOOL,
|
||||
@ -350,6 +364,8 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
|
||||
my_instance->flush_writes = config_get_bool(params, PARAM_FLUSH);
|
||||
my_instance->log_file_data_flags = config_get_enum(params, PARAM_LOG_DATA, log_data_values);
|
||||
my_instance->log_mode_flags = config_get_enum(params, PARAM_LOG_TYPE, log_type_values);
|
||||
my_instance->query_newline = config_copy_string(params, PARAM_NEWLINE);
|
||||
my_instance->separator = config_copy_string(params, PARAM_SEPARATOR);
|
||||
|
||||
my_instance->match = config_copy_string(params, PARAM_MATCH);
|
||||
my_instance->exclude = config_copy_string(params, PARAM_EXCLUDE);
|
||||
@ -378,8 +394,7 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
|
||||
{
|
||||
snprintf(filename, namelen, "%s.unified", my_instance->filebase);
|
||||
// Open the file. It is only closed at program exit
|
||||
my_instance->unified_fp = open_log_file(my_instance->log_file_data_flags,
|
||||
my_instance, filename);
|
||||
my_instance->unified_fp = open_log_file(my_instance, my_instance->log_file_data_flags, filename);
|
||||
|
||||
if (my_instance->unified_fp == NULL)
|
||||
{
|
||||
@ -413,6 +428,8 @@ createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
|
||||
MXS_FREE(my_instance->filebase);
|
||||
MXS_FREE(my_instance->source);
|
||||
MXS_FREE(my_instance->user_name);
|
||||
MXS_FREE(my_instance->query_newline);
|
||||
MXS_FREE(my_instance->separator);
|
||||
MXS_FREE(my_instance);
|
||||
my_instance = NULL;
|
||||
}
|
||||
@ -485,7 +502,7 @@ newSession(MXS_FILTER *instance, MXS_SESSION *session)
|
||||
{
|
||||
uint32_t data_flags = (my_instance->log_file_data_flags &
|
||||
~LOG_DATA_SESSION); // No point printing "Session"
|
||||
my_session->fp = open_log_file(data_flags, my_instance, my_session->filename);
|
||||
my_session->fp = open_log_file(my_instance, data_flags, my_session->filename);
|
||||
|
||||
if (my_session->fp == NULL)
|
||||
{
|
||||
@ -574,22 +591,21 @@ setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *ups
|
||||
*
|
||||
* @param my_instance Filter instance
|
||||
* @param my_session Filter session
|
||||
* @param date_string Date string
|
||||
* @param query Query string, not 0-terminated
|
||||
* @param querylen Query string length
|
||||
* @param date_string Date string
|
||||
* @param elapsed_ms Query execution time, in milliseconds
|
||||
*/
|
||||
void write_log_entries(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
|
||||
const char* query, int querylen, const char* date_string, int elapsed_ms)
|
||||
const char* date_string, const char* query, int querylen, int elapsed_ms)
|
||||
{
|
||||
bool write_error = false;
|
||||
if (my_instance->log_mode_flags & CONFIG_FILE_SESSION)
|
||||
{
|
||||
// In this case there is no need to write the session
|
||||
// number into the files.
|
||||
uint32_t data_flags = (my_instance->log_file_data_flags &
|
||||
~LOG_DATA_SESSION);
|
||||
if (write_log_entry(data_flags, my_session->fp, my_instance, my_session,
|
||||
uint32_t data_flags = (my_instance->log_file_data_flags & ~LOG_DATA_SESSION);
|
||||
if (write_log_entry(my_session->fp, my_instance, my_session, data_flags,
|
||||
date_string, query, querylen, elapsed_ms) < 0)
|
||||
{
|
||||
write_error = true;
|
||||
@ -598,7 +614,7 @@ void write_log_entries(QLA_INSTANCE* my_instance, QLA_SESSION* my_session,
|
||||
if (my_instance->log_mode_flags & CONFIG_FILE_UNIFIED)
|
||||
{
|
||||
uint32_t data_flags = my_instance->log_file_data_flags;
|
||||
if (write_log_entry(data_flags, my_instance->unified_fp, my_instance, my_session,
|
||||
if (write_log_entry(my_instance->unified_fp, my_instance, my_session, data_flags,
|
||||
date_string, query, querylen, elapsed_ms) < 0)
|
||||
{
|
||||
write_error = true;
|
||||
@ -667,7 +683,7 @@ routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
|
||||
else
|
||||
{
|
||||
// If execution times are not logged, write the log entry now.
|
||||
write_log_entries(my_instance, my_session, query, query_len, event.query_date, -1);
|
||||
write_log_entries(my_instance, my_session, event.query_date, query, query_len, -1);
|
||||
}
|
||||
}
|
||||
/* Pass the query downstream */
|
||||
@ -703,7 +719,7 @@ clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
|
||||
// Calculate elapsed time in milliseconds.
|
||||
double elapsed_ms = 1E3 * (now.tv_sec - event.begin_time.tv_sec) +
|
||||
(now.tv_nsec - event.begin_time.tv_nsec) / (double)1E6;
|
||||
write_log_entries(my_instance, my_session, query, query_len, event.query_date,
|
||||
write_log_entries(my_instance, my_session, event.query_date, query, query_len,
|
||||
std::floor(elapsed_ms + 0.5));
|
||||
clear(event);
|
||||
}
|
||||
@ -811,12 +827,13 @@ static uint64_t getCapabilities(MXS_FILTER* instance)
|
||||
|
||||
/**
|
||||
* Open the log file and print a header if appropriate.
|
||||
* @param data_flags Data save settings flags
|
||||
*
|
||||
* @param instance The filter instance
|
||||
* @param data_flags Data save settings flags
|
||||
* @param filename Target file path
|
||||
* @return A valid file on success, null otherwise.
|
||||
*/
|
||||
static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const char *filename)
|
||||
static FILE* open_log_file(QLA_INSTANCE *instance, uint32_t data_flags, const char *filename)
|
||||
{
|
||||
bool file_existed = false;
|
||||
FILE *fp = NULL;
|
||||
@ -843,68 +860,56 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
|
||||
}
|
||||
}
|
||||
}
|
||||
if (fp && !file_existed)
|
||||
|
||||
if (fp && !file_existed && data_flags != 0)
|
||||
{
|
||||
// Print a header. Luckily, we know the header has limited length
|
||||
const char SERVICE[] = "Service,";
|
||||
const char SESSION[] = "Session,";
|
||||
const char DATE[] = "Date,";
|
||||
const char USERHOST[] = "User@Host,";
|
||||
const char QUERY[] = "Query,";
|
||||
const char REPLY_TIME[] = "Reply_time,";
|
||||
const int headerlen = sizeof(SERVICE) + sizeof(SERVICE) + sizeof(DATE) +
|
||||
sizeof(USERHOST) + sizeof(QUERY) + sizeof(REPLY_TIME);
|
||||
// Print a header.
|
||||
const char SERVICE[] = "Service";
|
||||
const char SESSION[] = "Session";
|
||||
const char DATE[] = "Date";
|
||||
const char USERHOST[] = "User@Host";
|
||||
const char QUERY[] = "Query";
|
||||
const char REPLY_TIME[] = "Reply_time";
|
||||
|
||||
char print_str[headerlen];
|
||||
memset(print_str, '\0', headerlen);
|
||||
std::stringstream header;
|
||||
const char* curr_sep = ""; // Use empty string as the first separator
|
||||
const char* real_sep = instance->separator;
|
||||
|
||||
char *current_pos = print_str;
|
||||
if (instance->log_file_data_flags & LOG_DATA_SERVICE)
|
||||
if (data_flags & LOG_DATA_SERVICE)
|
||||
{
|
||||
strcat(current_pos, SERVICE);
|
||||
current_pos += sizeof(SERVICE) - 1;
|
||||
header << SERVICE;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (instance->log_file_data_flags & LOG_DATA_SESSION)
|
||||
if (data_flags & LOG_DATA_SESSION)
|
||||
{
|
||||
strcat(current_pos, SESSION);
|
||||
current_pos += sizeof(SERVICE) - 1;
|
||||
header << curr_sep << SESSION;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (instance->log_file_data_flags & LOG_DATA_DATE)
|
||||
if (data_flags & LOG_DATA_DATE)
|
||||
{
|
||||
strcat(current_pos, DATE);
|
||||
current_pos += sizeof(DATE) - 1;
|
||||
header << curr_sep << DATE;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (instance->log_file_data_flags & LOG_DATA_USER)
|
||||
if (data_flags & LOG_DATA_USER)
|
||||
{
|
||||
strcat(current_pos, USERHOST);
|
||||
current_pos += sizeof(USERHOST) - 1;
|
||||
header << curr_sep << USERHOST;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (instance->log_file_data_flags & LOG_DATA_REPLY_TIME)
|
||||
if (data_flags & LOG_DATA_REPLY_TIME)
|
||||
{
|
||||
strcat(current_pos, REPLY_TIME);
|
||||
current_pos += sizeof(REPLY_TIME) - 1;
|
||||
header << curr_sep << REPLY_TIME;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (instance->log_file_data_flags & LOG_DATA_QUERY)
|
||||
if (data_flags & LOG_DATA_QUERY)
|
||||
{
|
||||
strcat(current_pos, QUERY);
|
||||
current_pos += sizeof(QUERY) - 1;
|
||||
}
|
||||
if (current_pos > print_str)
|
||||
{
|
||||
// Overwrite the last ','.
|
||||
*(current_pos - 1) = '\n';
|
||||
}
|
||||
else
|
||||
{
|
||||
// Nothing to print
|
||||
return fp;
|
||||
header << curr_sep << QUERY;
|
||||
}
|
||||
header << '\n';
|
||||
|
||||
// Finally, write the log header.
|
||||
int written = fprintf(fp, "%s", print_str);
|
||||
int written = fprintf(fp, "%s", header.str().c_str());
|
||||
|
||||
if ((written <= 0) ||
|
||||
((instance->flush_writes) && (fflush(fp) < 0)))
|
||||
if ((written <= 0) || ((instance->flush_writes) && (fflush(fp) < 0)))
|
||||
{
|
||||
// Weird error, file opened but a write failed. Best to stop.
|
||||
fclose(fp);
|
||||
@ -915,151 +920,129 @@ static FILE* open_log_file(uint32_t data_flags, QLA_INSTANCE *instance, const ch
|
||||
return fp;
|
||||
}
|
||||
|
||||
static void print_string_replace_newlines(const char *sql_string,
|
||||
size_t sql_str_len, const char* rep_newline,
|
||||
std::stringstream* output)
|
||||
{
|
||||
ss_dassert(output);
|
||||
size_t line_begin = 0;
|
||||
size_t search_pos = 0;
|
||||
while (search_pos < sql_str_len)
|
||||
{
|
||||
int line_end_chars = 0;
|
||||
// A newline is either \r\n, \n or \r
|
||||
if (sql_string[search_pos] == '\r')
|
||||
{
|
||||
if (search_pos + 1 < sql_str_len && sql_string[search_pos + 1] == '\n')
|
||||
{
|
||||
// Got \r\n
|
||||
line_end_chars = 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Just \r
|
||||
line_end_chars = 1;
|
||||
}
|
||||
}
|
||||
else if (sql_string[search_pos] == '\n')
|
||||
{
|
||||
// Just \n
|
||||
line_end_chars = 1;
|
||||
}
|
||||
|
||||
if (line_end_chars > 0)
|
||||
{
|
||||
// Found line ending characters, write out the line excluding line end.
|
||||
output->write(&sql_string[line_begin], search_pos - line_begin);
|
||||
*output << rep_newline;
|
||||
// Next line begins after line end chars
|
||||
line_begin = search_pos + line_end_chars;
|
||||
// For \r\n, advance search_pos
|
||||
search_pos += line_end_chars - 1;
|
||||
}
|
||||
|
||||
search_pos++;
|
||||
}
|
||||
|
||||
// Print anything left
|
||||
if (line_begin < sql_str_len)
|
||||
{
|
||||
output->write(&sql_string[line_begin], sql_str_len - line_begin);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an entry to the log file.
|
||||
*
|
||||
* @param data_flags Controls what to write
|
||||
* @param logfile Target file
|
||||
* @param instance Filter instance
|
||||
* @param session Filter session
|
||||
* @param data_flags Controls what to write
|
||||
* @param time_string Date entry
|
||||
* @param sql_string SQL-query, *not* NULL terminated
|
||||
* @param sql_str_len Length of SQL-string
|
||||
* @param elapsed_ms Query execution time, in milliseconds
|
||||
* @return The number of characters written, or a negative value on failure
|
||||
*/
|
||||
static int write_log_entry(uint32_t data_flags, FILE *logfile, QLA_INSTANCE *instance,
|
||||
QLA_SESSION *session, const char *time_string, const char *sql_string,
|
||||
size_t sql_str_len, int elapsed_ms)
|
||||
static int write_log_entry(FILE *logfile, QLA_INSTANCE *instance, QLA_SESSION *session, uint32_t data_flags,
|
||||
const char *time_string, const char *sql_string, size_t sql_str_len,
|
||||
int elapsed_ms)
|
||||
{
|
||||
ss_dassert(logfile != NULL);
|
||||
size_t print_len = 0;
|
||||
/**
|
||||
* First calculate an upper limit for the total length. The strlen()-calls
|
||||
* could be removed if the values would be saved into the instance or session
|
||||
* or if we had some reasonable max lengths. (Apparently there are max lengths
|
||||
* but they are much higher than what is typically needed.)
|
||||
*/
|
||||
if (data_flags == 0)
|
||||
{
|
||||
// Nothing to print
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Printing to the file in parts would likely cause garbled printing if several threads write
|
||||
* simultaneously, so we have to first print to a string. */
|
||||
std::stringstream output;
|
||||
const char* curr_sep = ""; // Use empty string as the first separator
|
||||
const char* real_sep = instance->separator;
|
||||
|
||||
// The numbers have some extra for delimiters.
|
||||
const size_t integer_chars = 20; // Enough space for any integer type
|
||||
if (data_flags & LOG_DATA_SERVICE)
|
||||
{
|
||||
print_len += strlen(session->service) + 1;
|
||||
output << session->service;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (data_flags & LOG_DATA_SESSION)
|
||||
{
|
||||
print_len += integer_chars;
|
||||
output << curr_sep << session->ses_id;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (data_flags & LOG_DATA_DATE)
|
||||
{
|
||||
print_len += QLA_DATE_BUFFER_SIZE + 1;
|
||||
output << curr_sep << time_string;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (data_flags & LOG_DATA_USER)
|
||||
{
|
||||
print_len += strlen(session->user) + strlen(session->remote) + 2;
|
||||
output << curr_sep << session->user << "@" << session->remote;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (data_flags & LOG_DATA_REPLY_TIME)
|
||||
{
|
||||
print_len += integer_chars;
|
||||
output << curr_sep << elapsed_ms;
|
||||
curr_sep = real_sep;
|
||||
}
|
||||
if (data_flags & LOG_DATA_QUERY)
|
||||
{
|
||||
print_len += sql_str_len + 1; // Can't use strlen, not null-terminated
|
||||
}
|
||||
|
||||
if (print_len == 0)
|
||||
{
|
||||
return 0; // Nothing to print
|
||||
}
|
||||
|
||||
/* Allocate space for a buffer. Printing to the file in parts would likely
|
||||
cause garbled printing if several threads write simultaneously, so we
|
||||
have to first print to a string. */
|
||||
char *print_str = NULL;
|
||||
if ((print_str = (char*)MXS_CALLOC(print_len, sizeof(char))) == NULL)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool error = false;
|
||||
char *current_pos = print_str;
|
||||
int rval = 0;
|
||||
if (!error && (data_flags & LOG_DATA_SERVICE))
|
||||
{
|
||||
if ((rval = sprintf(current_pos, "%s,", session->service)) < 0)
|
||||
output << curr_sep;
|
||||
if (*instance->query_newline)
|
||||
{
|
||||
error = true;
|
||||
print_string_replace_newlines(sql_string, sql_str_len, instance->query_newline, &output);
|
||||
}
|
||||
else
|
||||
{
|
||||
current_pos += rval;
|
||||
// The newline replacement is an empty string so print the query as is
|
||||
output.write(sql_string, sql_str_len); // non-null-terminated string
|
||||
}
|
||||
}
|
||||
if (!error && (data_flags & LOG_DATA_SESSION))
|
||||
{
|
||||
if ((rval = sprintf(current_pos, "%lu,", session->ses_id)) < 0)
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
current_pos += rval;
|
||||
}
|
||||
}
|
||||
if (!error && (data_flags & LOG_DATA_DATE))
|
||||
{
|
||||
if ((rval = sprintf(current_pos, "%s,", time_string)) < 0)
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
current_pos += rval;
|
||||
}
|
||||
}
|
||||
if (!error && (data_flags & LOG_DATA_USER))
|
||||
{
|
||||
if ((rval = sprintf(current_pos, "%s@%s,", session->user, session->remote)) < 0)
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
current_pos += rval;
|
||||
}
|
||||
}
|
||||
if (!error && (data_flags & LOG_DATA_REPLY_TIME))
|
||||
{
|
||||
if ((rval = sprintf(current_pos, "%d,", elapsed_ms)) < 0)
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
current_pos += rval;
|
||||
}
|
||||
}
|
||||
if (!error && (data_flags & LOG_DATA_QUERY))
|
||||
{
|
||||
strncat(current_pos, sql_string, sql_str_len); // non-null-terminated string
|
||||
current_pos += sql_str_len + 1; // +1 to move to the next char after
|
||||
}
|
||||
if (error || current_pos <= print_str)
|
||||
{
|
||||
MXS_FREE(print_str);
|
||||
MXS_ERROR("qlafilter ('%s'): Failed to format log event.", instance->name);
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Overwrite the last ','. The rest is already filled with 0.
|
||||
*(current_pos - 1) = '\n';
|
||||
}
|
||||
output << "\n";
|
||||
|
||||
// Finally, write the log event.
|
||||
int written = fprintf(logfile, "%s", print_str);
|
||||
MXS_FREE(print_str);
|
||||
int written = fprintf(logfile, "%s", output.str().c_str());
|
||||
|
||||
if ((!instance->flush_writes) || (written <= 0))
|
||||
{
|
||||
|
Reference in New Issue
Block a user