Merge branch 'develop' into binlog_server_wait_data
This commit is contained in:
@ -265,6 +265,12 @@ cdc_auth_set_client_data(CDC_session *client_data,
|
||||
uint8_t *client_auth_packet,
|
||||
int client_auth_packet_size)
|
||||
{
|
||||
if (client_auth_packet_size % 2 != 0)
|
||||
{
|
||||
/** gw_hex2bin expects an even number of bytes */
|
||||
client_auth_packet_size--;
|
||||
}
|
||||
|
||||
int rval = CDC_STATE_AUTH_ERR;
|
||||
int decoded_size = client_auth_packet_size / 2;
|
||||
char decoded_buffer[decoded_size + 1]; // Extra for terminating null
|
||||
|
@ -11,3 +11,4 @@ add_subdirectory(regexfilter)
|
||||
add_subdirectory(tee)
|
||||
add_subdirectory(testfilter)
|
||||
add_subdirectory(topfilter)
|
||||
add_subdirectory(tpmfilter)
|
||||
|
@ -184,7 +184,7 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
if (!status.ok())
|
||||
{
|
||||
MXS_ERROR("Could not store version information to created RocksDB database \"%s\". "
|
||||
"You may need to delete the database and retry. RocksDB error: %s",
|
||||
"You may need to delete the database and retry. RocksDB error: \"%s\"",
|
||||
path.c_str(),
|
||||
status.ToString().c_str());
|
||||
}
|
||||
@ -236,7 +236,7 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not read version information from RocksDB database %s. "
|
||||
"You may need to delete the database and retry. RocksDB error: %s",
|
||||
"You may need to delete the database and retry. RocksDB error: \"%s\"",
|
||||
path.c_str(),
|
||||
status.ToString().c_str());
|
||||
delete pDb;
|
||||
@ -244,8 +244,13 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: %s",
|
||||
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: \"%s\"",
|
||||
path.c_str(), status.ToString().c_str());
|
||||
|
||||
if (status.IsIOError())
|
||||
{
|
||||
MXS_ERROR("Is an other MaxScale process running?");
|
||||
}
|
||||
}
|
||||
|
||||
return pStorage;
|
||||
|
@ -484,7 +484,7 @@ regex_replace(const char *sql, pcre2_code *re, pcre2_match_data *match_data, con
|
||||
size_t result_size;
|
||||
|
||||
/** This should never fail with rc == 0 because we used pcre2_match_data_create_from_pattern() */
|
||||
if (pcre2_match(re, (PCRE2_SPTR) sql, PCRE2_ZERO_TERMINATED, 0, 0, match_data, NULL))
|
||||
if (pcre2_match(re, (PCRE2_SPTR) sql, PCRE2_ZERO_TERMINATED, 0, 0, match_data, NULL) > 0)
|
||||
{
|
||||
result_size = strlen(sql) + strlen(replace);
|
||||
result = MXS_MALLOC(result_size);
|
||||
|
4
server/modules/filter/tpmfilter/CMakeLists.txt
Normal file
4
server/modules/filter/tpmfilter/CMakeLists.txt
Normal file
@ -0,0 +1,4 @@
|
||||
add_library(tpmfilter SHARED tpmfilter.c)
|
||||
target_link_libraries(tpmfilter maxscale-common)
|
||||
set_target_properties(tpmfilter PROPERTIES VERSION "1.0.0")
|
||||
install_module(tpmfilter experimental)
|
559
server/modules/filter/tpmfilter/tpmfilter.c
Normal file
559
server/modules/filter/tpmfilter/tpmfilter.c
Normal file
@ -0,0 +1,559 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file tpmfilter.c - Transaction Performance Monitoring Filter
|
||||
* @verbatim
|
||||
*
|
||||
* A simple filter that groups queries into a transaction with the latency.
|
||||
*
|
||||
* The filter reads the routed queries, groups them into a transaction by
|
||||
* detecting 'commit' statement at the end. The transactions are timestamped with a
|
||||
* unix-timestamp and the latency of a transaction is recorded in milliseconds.
|
||||
* The filter will not record transactions that are rolled back.
|
||||
* Please note that the filter only works with 'autocommit' option disabled.
|
||||
*
|
||||
* The filter makes no attempt to deal with query packets that do not fit
|
||||
* in a single GWBUF.
|
||||
*
|
||||
* Optional parameters:
|
||||
* filename=<name of the file to which transaction performance logs are written (default=tpm.log)>
|
||||
* delimiter=<delimiter for columns in a log (default='|')>
|
||||
* query_delimiter=<delimiter for query statements in a transaction (default=';')>
|
||||
* source=<source address to limit filter>
|
||||
* user=<username to limit filter>
|
||||
*
|
||||
* Date Who Description
|
||||
* 06/12/2015 Dong Young Yoon Initial implementation
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <fcntl.h>
|
||||
#include <filter.h>
|
||||
#include <modinfo.h>
|
||||
#include <modutil.h>
|
||||
#include <skygw_utils.h>
|
||||
#include <log_manager.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <sys/time.h>
|
||||
#include <regex.h>
|
||||
#include <atomic.h>
|
||||
|
||||
/** Defined in log_manager.cc */
|
||||
extern int lm_enabled_logfiles_bitmask;
|
||||
extern size_t log_ses_count[];
|
||||
|
||||
MODULE_INFO info =
|
||||
{
|
||||
MODULE_API_FILTER,
|
||||
MODULE_GA,
|
||||
FILTER_VERSION,
|
||||
"Transaction Performance Monitoring filter"
|
||||
};
|
||||
|
||||
static char *version_str = "V1.0.0";
|
||||
static size_t buf_size = 10;
|
||||
static size_t sql_size_limit = 64 * 1024 *
|
||||
1024; /* The maximum size for query statements in a transaction (64MB) */
|
||||
|
||||
/*
|
||||
* The filter entry points
|
||||
*/
|
||||
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
|
||||
static void *newSession(FILTER *instance, SESSION *session);
|
||||
static void closeSession(FILTER *instance, void *session);
|
||||
static void freeSession(FILTER *instance, void *session);
|
||||
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
|
||||
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
|
||||
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
|
||||
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
|
||||
|
||||
static FILTER_OBJECT MyObject =
|
||||
{
|
||||
createInstance,
|
||||
newSession,
|
||||
closeSession,
|
||||
freeSession,
|
||||
setDownstream,
|
||||
setUpstream,
|
||||
routeQuery,
|
||||
clientReply,
|
||||
diagnostic,
|
||||
};
|
||||
|
||||
/**
|
||||
* A instance structure, every instance will write to a same file.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
int sessions; /* Session count */
|
||||
char *source; /* The source of the client connection */
|
||||
char *user; /* The user name to filter on */
|
||||
char *filename; /* filename */
|
||||
char *delimiter; /* delimiter for columns in a log */
|
||||
char *query_delimiter; /* delimiter for query statements in a transaction */
|
||||
|
||||
int query_delimiter_size; /* the length of the query delimiter */
|
||||
FILE* fp;
|
||||
} TPM_INSTANCE;
|
||||
|
||||
/**
|
||||
* The session structure for this TPM filter.
|
||||
* This stores the downstream filter information, such that the
|
||||
* filter is able to pass the query on to the next filter (or router)
|
||||
* in the chain.
|
||||
*
|
||||
* It also holds the file descriptor to which queries are written.
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
DOWNSTREAM down;
|
||||
UPSTREAM up;
|
||||
int active;
|
||||
char *clientHost;
|
||||
char *userName;
|
||||
char* sql;
|
||||
struct timeval start;
|
||||
char *current;
|
||||
int n_statements;
|
||||
struct timeval total;
|
||||
struct timeval current_start;
|
||||
bool query_end;
|
||||
char *buf;
|
||||
int sql_index;
|
||||
size_t max_sql_size;
|
||||
} TPM_SESSION;
|
||||
|
||||
/**
|
||||
* Implementation of the mandatory version entry point
|
||||
*
|
||||
* @return version string of the module
|
||||
*/
|
||||
char *
|
||||
version()
|
||||
{
|
||||
return version_str;
|
||||
}
|
||||
|
||||
/**
|
||||
* The module initialisation routine, called when the module
|
||||
* is first loaded.
|
||||
*/
|
||||
void
|
||||
ModuleInit()
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
* "module object", this is a structure with the set of
|
||||
* external entry points for this module.
|
||||
*
|
||||
* @return The module object
|
||||
*/
|
||||
FILTER_OBJECT *
|
||||
GetModuleObject()
|
||||
{
|
||||
return &MyObject;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance of the filter for a particular service
|
||||
* within MaxScale.
|
||||
*
|
||||
* @param options The options for this filter
|
||||
* @param params The array of name/value pair parameters for the filter
|
||||
*
|
||||
* @return The instance data for this new instance
|
||||
*/
|
||||
static FILTER *
|
||||
createInstance(const char *name, char **options, FILTER_PARAMETER **params)
|
||||
{
|
||||
int i;
|
||||
TPM_INSTANCE *my_instance;
|
||||
|
||||
if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL)
|
||||
{
|
||||
my_instance->source = NULL;
|
||||
my_instance->user = NULL;
|
||||
|
||||
/* set default log filename */
|
||||
my_instance->filename = strdup("tpm.log");
|
||||
/* set default delimiter */
|
||||
my_instance->delimiter = strdup("|");
|
||||
/* set default query delimiter */
|
||||
my_instance->query_delimiter = strdup(";");
|
||||
my_instance->query_delimiter_size = 1;
|
||||
|
||||
for (i = 0; params && params[i]; i++)
|
||||
{
|
||||
if (!strcmp(params[i]->name, "filename"))
|
||||
{
|
||||
free(my_instance->filename);
|
||||
my_instance->filename = strdup(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "source"))
|
||||
{
|
||||
my_instance->source = strdup(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "user"))
|
||||
{
|
||||
my_instance->user = strdup(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "delimiter"))
|
||||
{
|
||||
free(my_instance->delimiter);
|
||||
my_instance->delimiter = strdup(params[i]->value);
|
||||
}
|
||||
else if (!strcmp(params[i]->name, "query_delimiter"))
|
||||
{
|
||||
free(my_instance->query_delimiter);
|
||||
my_instance->query_delimiter = strdup(params[i]->value);
|
||||
my_instance->query_delimiter_size = strlen(my_instance->query_delimiter);
|
||||
}
|
||||
}
|
||||
my_instance->sessions = 0;
|
||||
my_instance->fp = fopen(my_instance->filename, "w");
|
||||
if (my_instance->fp == NULL)
|
||||
{
|
||||
MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno,
|
||||
strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return (FILTER *)my_instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Associate a new session with this instance of the filter.
|
||||
*
|
||||
* Every session uses the same log file.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session itself
|
||||
* @return Session specific data for this session
|
||||
*/
|
||||
static void *
|
||||
newSession(FILTER *instance, SESSION *session)
|
||||
{
|
||||
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
|
||||
TPM_SESSION *my_session;
|
||||
int i;
|
||||
char *remote, *user;
|
||||
|
||||
if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL)
|
||||
{
|
||||
atomic_add(&my_instance->sessions, 1);
|
||||
|
||||
my_session->max_sql_size = 4 * 1024; // default max query size of 4k.
|
||||
my_session->sql = (char*)malloc(my_session->max_sql_size);
|
||||
memset(my_session->sql, 0x00, my_session->max_sql_size);
|
||||
my_session->buf = (char*)malloc(buf_size);
|
||||
my_session->sql_index = 0;
|
||||
my_session->n_statements = 0;
|
||||
my_session->total.tv_sec = 0;
|
||||
my_session->total.tv_usec = 0;
|
||||
my_session->current = NULL;
|
||||
if ((remote = session_get_remote(session)) != NULL)
|
||||
{
|
||||
my_session->clientHost = strdup(remote);
|
||||
}
|
||||
else
|
||||
{
|
||||
my_session->clientHost = NULL;
|
||||
}
|
||||
if ((user = session_getUser(session)) != NULL)
|
||||
{
|
||||
my_session->userName = strdup(user);
|
||||
}
|
||||
else
|
||||
{
|
||||
my_session->userName = NULL;
|
||||
}
|
||||
my_session->active = 1;
|
||||
if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost,
|
||||
my_instance->source))
|
||||
{
|
||||
my_session->active = 0;
|
||||
}
|
||||
if (my_instance->user && my_session->userName && strcmp(my_session->userName,
|
||||
my_instance->user))
|
||||
{
|
||||
my_session->active = 0;
|
||||
}
|
||||
}
|
||||
|
||||
return my_session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a session with the filter, this is the mechanism
|
||||
* by which a filter may cleanup data structure etc.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The session being closed
|
||||
*/
|
||||
static void
|
||||
closeSession(FILTER *instance, void *session)
|
||||
{
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
|
||||
if (my_instance->fp != NULL)
|
||||
{
|
||||
// flush FP when a session is closed.
|
||||
fflush(my_instance->fp);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Free the memory associated with the session
|
||||
*
|
||||
* @param instance The filter instance
|
||||
* @param session The filter session
|
||||
*/
|
||||
static void
|
||||
freeSession(FILTER *instance, void *session)
|
||||
{
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
|
||||
free(my_session->clientHost);
|
||||
free(my_session->userName);
|
||||
free(my_session->sql);
|
||||
free(my_session->buf);
|
||||
free(session);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the downstream filter or router to which queries will be
|
||||
* passed from this filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param downstream The downstream filter or router.
|
||||
*/
|
||||
static void
|
||||
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
|
||||
{
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
|
||||
my_session->down = *downstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the upstream filter or session to which results will be
|
||||
* passed from this filter.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param upstream The upstream filter or session.
|
||||
*/
|
||||
static void
|
||||
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
|
||||
{
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
|
||||
my_session->up = *upstream;
|
||||
}
|
||||
|
||||
/**
|
||||
* The routeQuery entry point. This is passed the query buffer
|
||||
* to which the filter should be applied. Once applied the
|
||||
* query should normally be passed to the downstream component
|
||||
* (filter or router) in the filter chain.
|
||||
*
|
||||
* @param instance The filter instance data
|
||||
* @param session The filter session
|
||||
* @param queue The query data
|
||||
*/
|
||||
static int
|
||||
routeQuery(FILTER *instance, void *session, GWBUF *queue)
|
||||
{
|
||||
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
char *ptr = NULL;
|
||||
size_t i;
|
||||
|
||||
if (my_session->active)
|
||||
{
|
||||
if (queue->next != NULL)
|
||||
{
|
||||
queue = gwbuf_make_contiguous(queue);
|
||||
}
|
||||
if ((ptr = modutil_get_SQL(queue)) != NULL)
|
||||
{
|
||||
my_session->query_end = false;
|
||||
/* check for commit and rollback */
|
||||
if (strlen(ptr) > 5)
|
||||
{
|
||||
size_t ptr_size = strlen(ptr) + 1;
|
||||
char* buf = my_session->buf;
|
||||
for (i = 0; i < ptr_size && i < buf_size; ++i)
|
||||
{
|
||||
buf[i] = tolower(ptr[i]);
|
||||
}
|
||||
if (strncmp(buf, "commit", 6) == 0)
|
||||
{
|
||||
my_session->query_end = true;
|
||||
}
|
||||
else if (strncmp(buf, "rollback", 8) == 0)
|
||||
{
|
||||
my_session->query_end = true;
|
||||
my_session->sql_index = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* for normal sql statements */
|
||||
if (!my_session->query_end)
|
||||
{
|
||||
/* check and expand buffer size first. */
|
||||
size_t new_sql_size = my_session->max_sql_size;
|
||||
size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1;
|
||||
|
||||
/* if the total length of query statements exceeds the maximum limit, print an error and return */
|
||||
if (len > sql_size_limit)
|
||||
{
|
||||
MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB.");
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
/* double buffer size until the buffer fits the query */
|
||||
while (len > new_sql_size)
|
||||
{
|
||||
new_sql_size *= 2;
|
||||
}
|
||||
if (new_sql_size > my_session->max_sql_size)
|
||||
{
|
||||
char* new_sql = (char*)malloc(new_sql_size);
|
||||
if (new_sql == NULL)
|
||||
{
|
||||
MXS_ERROR("Memory allocation failure.");
|
||||
goto retblock;
|
||||
}
|
||||
memcpy(new_sql, my_session->sql, my_session->sql_index);
|
||||
free(my_session->sql);
|
||||
my_session->sql = new_sql;
|
||||
my_session->max_sql_size = new_sql_size;
|
||||
}
|
||||
|
||||
/* first statement */
|
||||
if (my_session->sql_index == 0)
|
||||
{
|
||||
memcpy(my_session->sql, ptr, strlen(ptr));
|
||||
my_session->sql_index += strlen(ptr);
|
||||
gettimeofday(&my_session->current_start, NULL);
|
||||
}
|
||||
/* otherwise, append the statement with semicolon as a statement delimiter */
|
||||
else
|
||||
{
|
||||
/* append a query delimiter */
|
||||
memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter,
|
||||
my_instance->query_delimiter_size);
|
||||
/* append the next query statement */
|
||||
memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr));
|
||||
/* set new pointer for the buffer */
|
||||
my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
retblock:
|
||||
|
||||
free(ptr);
|
||||
/* Pass the query downstream */
|
||||
return my_session->down.routeQuery(my_session->down.instance,
|
||||
my_session->down.session, queue);
|
||||
}
|
||||
|
||||
static int
|
||||
clientReply(FILTER *instance, void *session, GWBUF *reply)
|
||||
{
|
||||
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)session;
|
||||
struct timeval tv, diff;
|
||||
int i, inserted;
|
||||
|
||||
/* found 'commit' and sql statements exist. */
|
||||
if (my_session->query_end && my_session->sql_index > 0)
|
||||
{
|
||||
gettimeofday(&tv, NULL);
|
||||
timersub(&tv, &(my_session->current_start), &diff);
|
||||
|
||||
/* get latency */
|
||||
uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000);
|
||||
/* get timestamp */
|
||||
uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000 * 1000)));
|
||||
|
||||
*(my_session->sql + my_session->sql_index) = '\0';
|
||||
|
||||
/* print to log. */
|
||||
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
|
||||
timestamp,
|
||||
my_instance->delimiter,
|
||||
my_session->clientHost,
|
||||
my_instance->delimiter,
|
||||
my_session->userName,
|
||||
my_instance->delimiter,
|
||||
millis,
|
||||
my_instance->delimiter,
|
||||
my_session->sql);
|
||||
|
||||
my_session->sql_index = 0;
|
||||
}
|
||||
|
||||
/* Pass the result upstream */
|
||||
return my_session->up.clientReply(my_session->up.instance,
|
||||
my_session->up.session, reply);
|
||||
}
|
||||
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
* If fsession is NULL then print diagnostics on the filter
|
||||
* instance as a whole, otherwise print diagnostics for the
|
||||
* particular session.
|
||||
*
|
||||
* @param instance The filter instance
|
||||
* @param fsession Filter session, may be NULL
|
||||
* @param dcb The DCB for diagnostic output
|
||||
*/
|
||||
static void
|
||||
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
|
||||
{
|
||||
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
|
||||
TPM_SESSION *my_session = (TPM_SESSION *)fsession;
|
||||
int i;
|
||||
|
||||
if (my_instance->source)
|
||||
dcb_printf(dcb, "\t\tLimit logging to connections from %s\n",
|
||||
my_instance->source);
|
||||
if (my_instance->user)
|
||||
dcb_printf(dcb, "\t\tLimit logging to user %s\n",
|
||||
my_instance->user);
|
||||
if (my_instance->filename)
|
||||
dcb_printf(dcb, "\t\tLogging to file %s.\n",
|
||||
my_instance->filename);
|
||||
if (my_instance->delimiter)
|
||||
dcb_printf(dcb, "\t\tLogging with delimiter %s.\n",
|
||||
my_instance->delimiter);
|
||||
if (my_instance->query_delimiter)
|
||||
dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n",
|
||||
my_instance->query_delimiter);
|
||||
}
|
@ -449,7 +449,10 @@ avro_client_process_command(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *q
|
||||
const char req_last_gtid[] = "QUERY-LAST-TRANSACTION";
|
||||
const char req_gtid[] = "QUERY-TRANSACTION";
|
||||
const size_t req_data_len = sizeof(req_data) - 1;
|
||||
uint8_t *data = GWBUF_DATA(queue);
|
||||
size_t buflen = gwbuf_length(queue);
|
||||
uint8_t data[buflen + 1];
|
||||
gwbuf_copy_data(queue, 0, buflen, data);
|
||||
data[buflen] = '\0';
|
||||
char *command_ptr = strstr((char *)data, req_data);
|
||||
|
||||
if (command_ptr != NULL)
|
||||
|
@ -667,6 +667,15 @@ static void closeSession(ROUTER *instance, void *router_session)
|
||||
else
|
||||
{
|
||||
ss_dassert(!BREF_IS_WAITING_RESULT(bref));
|
||||
|
||||
/** This should never be true unless a backend reference is taken
|
||||
* out of use before clearing the BREF_WAITING_RESULT state */
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
MXS_WARNING("A closed backend was expecting a result, this should not be possible. "
|
||||
"Decrementing active operation counter for this backend.");
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
}
|
||||
}
|
||||
/** Unlock */
|
||||
@ -1044,160 +1053,6 @@ lock_failed:
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Router error handling routine (API)
|
||||
*
|
||||
* Error Handler routine to resolve _backend_ failures. If it succeeds then
|
||||
* there are enough operative backends available and connected. Otherwise it
|
||||
* fails, and session is terminated.
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param router_session The router session
|
||||
* @param errmsgbuf The error message to reply
|
||||
* @param backend_dcb The backend DCB
|
||||
* @param action The action: ERRACT_NEW_CONNECTION or
|
||||
* ERRACT_REPLY_CLIENT
|
||||
* @param succp Result of action: true iff router can continue
|
||||
*
|
||||
* Even if succp == true connecting to new slave may have failed. succp is to
|
||||
* tell whether router has enough master/slave connections to continue work.
|
||||
*/
|
||||
static void handleError(ROUTER *instance, void *router_session,
|
||||
GWBUF *errmsgbuf, DCB *problem_dcb,
|
||||
error_action_t action, bool *succp)
|
||||
{
|
||||
SESSION *session;
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
CHK_DCB(problem_dcb);
|
||||
|
||||
/** Don't handle same error twice on same DCB */
|
||||
if (problem_dcb->dcb_errhandle_called)
|
||||
{
|
||||
/** we optimistically assume that previous call succeed */
|
||||
/*
|
||||
* The return of true is potentially misleading, but appears to
|
||||
* be safe with the code as it stands on 9 Sept 2015 - MNB
|
||||
*/
|
||||
*succp = true;
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
}
|
||||
session = problem_dcb->session;
|
||||
|
||||
if (session == NULL || rses == NULL)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
{
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
/**
|
||||
* If master has lost its Master status error can't be
|
||||
* handled so that session could continue.
|
||||
*/
|
||||
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
|
||||
!SERVER_IS_MASTER(rses->rses_master_ref->bref_backend->backend_server))
|
||||
{
|
||||
SERVER *srv = rses->rses_master_ref->bref_backend->backend_server;
|
||||
backend_ref_t *bref;
|
||||
bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
if (bref != NULL)
|
||||
{
|
||||
CHK_BACKEND_REF(bref);
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("server %s:%d lost the "
|
||||
"master status but could not locate the "
|
||||
"corresponding backend ref.",
|
||||
srv->name, srv->port);
|
||||
}
|
||||
|
||||
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
|
||||
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
|
||||
{
|
||||
/** The failure of a master is not considered a critical
|
||||
* failure as partial functionality still remains. Reads
|
||||
* are allowed as long as slave servers are available
|
||||
* and writes will cause an error to be returned.
|
||||
*
|
||||
* If we were waiting for a response from the master, we
|
||||
* can't be sure whether it was executed or not. In this
|
||||
* case the safest thing to do is to close the client
|
||||
* connection. */
|
||||
*succp = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!srv->master_err_is_logged)
|
||||
{
|
||||
MXS_ERROR("server %s:%d lost the "
|
||||
"master status. Readwritesplit "
|
||||
"service can't locate the master. "
|
||||
"Client sessions will be closed.",
|
||||
srv->name, srv->port);
|
||||
srv->master_err_is_logged = true;
|
||||
}
|
||||
*succp = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* This is called in hope of getting replacement for
|
||||
* failed slave(s). This call may free rses.
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
|
||||
}
|
||||
/* Free the lock if rses still exists */
|
||||
if (rses)
|
||||
{
|
||||
rses_end_locked_router_action(rses);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case ERRACT_REPLY_CLIENT:
|
||||
{
|
||||
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get router capabilities (API)
|
||||
@ -1674,6 +1529,175 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
|
||||
return success;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Router error handling routine (API)
|
||||
*
|
||||
* Error Handler routine to resolve _backend_ failures. If it succeeds then
|
||||
* there are enough operative backends available and connected. Otherwise it
|
||||
* fails, and session is terminated.
|
||||
*
|
||||
* @param instance The router instance
|
||||
* @param router_session The router session
|
||||
* @param errmsgbuf The error message to reply
|
||||
* @param backend_dcb The backend DCB
|
||||
* @param action The action: ERRACT_NEW_CONNECTION or
|
||||
* ERRACT_REPLY_CLIENT
|
||||
* @param succp Result of action: true iff router can continue
|
||||
*
|
||||
* Even if succp == true connecting to new slave may have failed. succp is to
|
||||
* tell whether router has enough master/slave connections to continue work.
|
||||
*/
|
||||
static void handleError(ROUTER *instance, void *router_session,
|
||||
GWBUF *errmsgbuf, DCB *problem_dcb,
|
||||
error_action_t action, bool *succp)
|
||||
{
|
||||
SESSION *session;
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
|
||||
|
||||
CHK_DCB(problem_dcb);
|
||||
|
||||
/** Don't handle same error twice on same DCB */
|
||||
if (problem_dcb->dcb_errhandle_called)
|
||||
{
|
||||
/** we optimistically assume that previous call succeed */
|
||||
/*
|
||||
* The return of true is potentially misleading, but appears to
|
||||
* be safe with the code as it stands on 9 Sept 2015 - MNB
|
||||
*/
|
||||
*succp = true;
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
problem_dcb->dcb_errhandle_called = true;
|
||||
}
|
||||
session = problem_dcb->session;
|
||||
|
||||
bool close_dcb = true;
|
||||
|
||||
if (session == NULL || rses == NULL)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
|
||||
{
|
||||
*succp = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
case ERRACT_NEW_CONNECTION:
|
||||
{
|
||||
if (!rses_begin_locked_router_action(rses))
|
||||
{
|
||||
close_dcb = false; /* With the assumption that if the router session is closed,
|
||||
* then so is the dcb.
|
||||
*/
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
|
||||
/**
|
||||
* If master has lost its Master status error can't be
|
||||
* handled so that session could continue.
|
||||
*/
|
||||
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
|
||||
!SERVER_IS_MASTER(rses->rses_master_ref->bref_backend->backend_server))
|
||||
{
|
||||
SERVER *srv = rses->rses_master_ref->bref_backend->backend_server;
|
||||
backend_ref_t *bref;
|
||||
bref = get_bref_from_dcb(rses, problem_dcb);
|
||||
if (bref != NULL)
|
||||
{
|
||||
CHK_BACKEND_REF(bref);
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("server %s:%d lost the "
|
||||
"master status but could not locate the "
|
||||
"corresponding backend ref.",
|
||||
srv->name, srv->port);
|
||||
}
|
||||
|
||||
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
|
||||
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
|
||||
{
|
||||
/** The failure of a master is not considered a critical
|
||||
* failure as partial functionality still remains. Reads
|
||||
* are allowed as long as slave servers are available
|
||||
* and writes will cause an error to be returned.
|
||||
*
|
||||
* If we were waiting for a response from the master, we
|
||||
* can't be sure whether it was executed or not. In this
|
||||
* case the safest thing to do is to close the client
|
||||
* connection. */
|
||||
*succp = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!srv->master_err_is_logged)
|
||||
{
|
||||
MXS_ERROR("server %s:%d lost the "
|
||||
"master status. Readwritesplit "
|
||||
"service can't locate the master. "
|
||||
"Client sessions will be closed.",
|
||||
srv->name, srv->port);
|
||||
srv->master_err_is_logged = true;
|
||||
}
|
||||
*succp = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* This is called in hope of getting replacement for
|
||||
* failed slave(s). This call may free rses.
|
||||
*/
|
||||
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
|
||||
}
|
||||
|
||||
dcb_close(problem_dcb);
|
||||
close_dcb = false;
|
||||
/* Free the lock if rses still exists */
|
||||
if (rses)
|
||||
{
|
||||
rses_end_locked_router_action(rses);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case ERRACT_REPLY_CLIENT:
|
||||
{
|
||||
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
|
||||
close_dcb = false;
|
||||
*succp = false; /*< no new backend servers were made available */
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
ss_dassert(!true);
|
||||
*succp = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (close_dcb)
|
||||
{
|
||||
dcb_close(problem_dcb);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Handle an error reply for a client
|
||||
*
|
||||
@ -1694,18 +1718,39 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
|
||||
client_dcb = ses->client_dcb;
|
||||
spinlock_release(&ses->ses_lock);
|
||||
|
||||
/**
|
||||
* If bref exists, mark it closed
|
||||
*/
|
||||
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
|
||||
if (rses_begin_locked_router_action(rses))
|
||||
{
|
||||
CHK_BACKEND_REF(bref);
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
/**
|
||||
* If bref exists, mark it closed
|
||||
*/
|
||||
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
|
||||
{
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
if (BREF_IS_IN_USE(bref))
|
||||
{
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
if (BREF_IS_WAITING_RESULT(bref))
|
||||
{
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
|
||||
dcb_close(backend_dcb);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// All dcbs should be associated with a backend reference.
|
||||
ss_dassert(!true);
|
||||
}
|
||||
|
||||
rses_end_locked_router_action(rses);
|
||||
}
|
||||
else
|
||||
{
|
||||
// The session has already been closed, hence the dcb has been
|
||||
// closed as well.
|
||||
}
|
||||
|
||||
if (sesstate == SESSION_STATE_ROUTER_READY)
|
||||
|
Reference in New Issue
Block a user