Merge branch 'binlog_server_wait_data' into binlog_server_waitdata_encryption

This commit is contained in:
MassimilianoPinto
2016-10-05 18:25:43 +02:00
24 changed files with 1164 additions and 781 deletions

View File

@ -276,11 +276,16 @@ hkthread(void *data)
ptr->nextdue = now + ptr->frequency;
taskfn = ptr->task;
taskdata = ptr->data;
// We need to copy type and name, in case hktask_remove is called from
// the callback. Otherwise we will access freed data.
HKTASK_TYPE type = ptr->type;
char name[strlen(ptr->name) + 1];
strcpy(name, ptr->name);
spinlock_release(&tasklock);
(*taskfn)(taskdata);
if (ptr->type == HK_ONESHOT)
if (type == HK_ONESHOT)
{
hktask_remove(ptr->name);
hktask_remove(name);
}
spinlock_acquire(&tasklock);
ptr = tasks;

View File

@ -11,53 +11,13 @@
* Public License.
*/
/** @file
@brief (brief description)
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <libgen.h>
#include <maxscale/alloc.h>
#include <skygw_utils.h>
#include <log_manager.h>
typedef struct thread_st
{
skygw_message_t* mes;
simple_mutex_t* mtx;
size_t* nactive;
pthread_t tid;
} thread_t;
static void* thr_run(void* data);
static void* thr_run_morelog(void* data);
#define MAX_NTHR 256
#define NITER 100
#if 1
# define TEST1
#endif
#if 0
# define TEST2
#endif
#define TEST3
#define TEST4
const char USAGE[] =
"usage: %s [-t <#threads>]\n"
"\n"
"-t: Number of threads. Default is %d.\n";
const int N_THR = 4;
#define TEST_ERROR(msg)\
do { fprintf(stderr, "[%s:%d]: %s\n", basename(__FILE__), __LINE__, msg); } while (false)
static void skygw_log_enable(int priority)
{
mxs_log_set_priority_enabled(priority, true);
@ -78,60 +38,13 @@ int main(int argc, char* argv[])
skygw_message_t* mes;
simple_mutex_t* mtx;
size_t nactive;
thread_t** thr = NULL;
time_t t;
struct tm tm;
char c;
int nthr = N_THR;
while ((c = getopt(argc, argv, "t:")) != -1)
{
switch (c)
{
case 't':
nthr = atoi(optarg);
if (nthr <= 0)
{
err = 1;
}
break;
default:
err = 1;
break;
}
}
if (err != 0)
{
fprintf(stderr, USAGE, argv[0], N_THR);
err = 1;
goto return_err;
}
printf("Using %d threads.\n", nthr);
thr = (thread_t **)MXS_CALLOC(1, nthr * sizeof(thread_t*));
if (thr == NULL)
{
err = 1;
goto return_err;
}
i = atexit(mxs_log_finish);
if (i != 0)
{
fprintf(stderr, "Couldn't register exit function.\n");
}
succp = mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
if (!succp)
{
fprintf(stderr, "Log manager initialization failed.\n");
}
ss_dassert(succp);
ss_info_dassert(succp, "Log manager initialization failed");
t = time(NULL);
localtime_r(&t, &tm);
@ -143,63 +56,63 @@ int main(int argc, char* argv[])
tm.tm_min,
tm.tm_sec);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("First write with flush.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
logstr = ("Second write with flush.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
logstr = ("Third write, no flush.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
logstr = ("Fourth write, no flush. Next flush only.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
err = mxs_log_flush();
ss_dassert(err == 0);
logstr = "My name is %s %d years and %d months.";
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
err = MXS_INFO(logstr, "TraceyTracey", 3, 7);
mxs_log_flush();
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
ss_dassert(err == 0);
err = mxs_log_flush();
ss_dassert(err == 0);
logstr = "My name is Tracey Tracey 47 years and 7 months.";
err = MXS_INFO("%s", logstr);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
ss_dassert(err == 0);
logstr = "My name is Stacey %s";
err = MXS_INFO(logstr, " ");
mxs_log_finish();
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
ss_dassert(err == 0);
logstr = "My name is Philip";
err = MXS_INFO("%s", logstr);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
ss_dassert(err == 0);
logstr = "Philip.";
err = MXS_INFO("%s", logstr);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
ss_dassert(err == 0);
logstr = "Ph%dlip.";
err = MXS_INFO(logstr, 1);
ss_dassert(err == 0);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("A terrible error has occurred!");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
logstr = ("Hi, how are you?");
err = MXS_NOTICE("%s", logstr);
ss_dassert(err == 0);
logstr = ("I'm doing fine!");
err = MXS_NOTICE("%s", logstr);
ss_dassert(err == 0);
logstr = ("Rather more surprising, at least at first sight, is the fact that a reference to "
"a[i] can also be written as *(a+i). In evaluating a[i], C converts it to *(a+i) "
@ -207,142 +120,14 @@ int main(int argc, char* argv[])
"of this equivalence, it follows that &a[i] and a+i are also identical: a+i is the "
"address of the i-th element beyond a.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
logstr = ("I was wondering, you know, it has been such a lovely weather whole morning and I "
"thought that would you like to come to my place and have a little piece of cheese "
"with us. Just me and my mom - and you, of course. Then, if you wish, we could "
"listen to the radio and keep company for our little Steven, my mom's cat, you know.");
err = MXS_NOTICE("%s", logstr);
mxs_log_finish();
#if defined(TEST1)
mes = skygw_message_init();
mtx = simple_mutex_init(NULL, "testmtx");
/** Test starts */
fprintf(stderr, "\nStarting test #1 \n");
/** 1 */
for (i = 0; i < nthr; i++)
{
thr[i] = (thread_t*)MXS_CALLOC(1, sizeof(thread_t));
MXS_ABORT_IF_NULL(thr[i]);
thr[i]->mes = mes;
thr[i]->mtx = mtx;
thr[i]->nactive = &nactive;
}
nactive = nthr;
for (i = 0; i < nthr; i++)
{
pthread_t p;
pthread_create(&p, NULL, thr_run, thr[i]);
thr[i]->tid = p;
}
do
{
skygw_message_wait(mes);
simple_mutex_lock(mtx, true);
if (nactive > 0)
{
simple_mutex_unlock(mtx);
continue;
}
break;
}
while (true);
for (i = 0; i < nthr; i++)
{
pthread_join(thr[i]->tid, NULL);
}
/** This is to release memory */
mxs_log_finish();
simple_mutex_unlock(mtx);
for (i = 0; i < nthr; i++)
{
MXS_FREE(thr[i]);
}
#endif
#if defined(TEST2)
fprintf(stderr, "\nStarting test #2 \n");
/** 2 */
for (i = 0; i < nthr; i++)
{
thr[i] = (thread_t*)MXS_CALLOC(1, sizeof(thread_t));
MXS_ABORT_IF_NULL(thr[i]);
thr[i]->mes = mes;
thr[i]->mtx = mtx;
thr[i]->nactive = &nactive;
}
nactive = nthr;
fprintf(stderr,
"\nLaunching %d threads, each iterating %d times.",
nthr,
NITER);
for (i = 0; i < nthr; i++)
{
pthread_t p;
pthread_create(&p, NULL, thr_run_morelog, thr[i]);
thr[i]->tid = p;
}
fprintf(stderr, ".. done");
fprintf(stderr, "\nStarting to wait threads.\n");
do
{
skygw_message_wait(mes);
simple_mutex_lock(mtx, true);
if (nactive > 0)
{
simple_mutex_unlock(mtx);
continue;
}
break;
}
while (true);
for (i = 0; i < nthr; i++)
{
pthread_join(thr[i]->tid, NULL);
}
/** This is to release memory */
mxs_log_finish();
simple_mutex_unlock(mtx);
fprintf(stderr, "\nFreeing thread memory.");
for (i = 0; i < nthr; i++)
{
MXS_FREE(thr[i]);
}
/** Test ended here */
skygw_message_done(mes);
simple_mutex_done(mtx);
#endif /* TEST 2 */
#if defined(TEST3)
/**
* Test enable/disable log.
*/
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
succp = mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
ss_dassert(succp);
ss_dassert(err == 0);
logstr = ("\tTEST 3 - test enabling and disabling logs.");
err = MXS_ERROR("%s", logstr);
@ -401,16 +186,8 @@ int main(int argc, char* argv[])
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
mxs_log_finish();
#endif /* TEST 3 */
#if defined(TEST4)
succp = mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
ss_dassert(succp);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
logstr = ("\tTEST 4 - test spreading logs down to other logs.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
@ -440,13 +217,9 @@ int main(int argc, char* argv[])
err = MXS_NOTICE("%s", logstr);
ss_dassert(err == 0);
mxs_log_finish();
succp = mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
ss_dassert(succp);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
logstr = ("6.\tWrite to ERROR and thus also to MESSAGE and TRACE logs.");
err = MXS_ERROR("%s", logstr);
ss_dassert(err == 0);
@ -479,240 +252,24 @@ int main(int argc, char* argv[])
(int)3,
"foo",
(int)3);
ss_dassert(err == 0);
err = MXS_ERROR("12.\tWrite to MESSAGE and TRACE log some "
"formattings "
": %d %s %d",
(int)3,
"foo",
(int)3);
ss_dassert(err == 0);
err = MXS_ERROR("13.\tWrite to TRACE log some formattings "
": %d %s %d",
(int)3,
"foo",
(int)3);
ss_dassert(err == 0);
mxs_log_finish();
#endif /* TEST 4 */
fprintf(stderr, ".. done.\n");
return_err:
if (thr != NULL)
{
MXS_FREE(thr);
}
return err;
}
static void* thr_run(void* data)
{
thread_t* td = (thread_t *)data;
char* logstr;
int err;
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
mxs_log_flush();
logstr = ("Hi, how are you?");
err = MXS_NOTICE("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_flush();
logstr = ("I was wondering, you know, it has been such a lovely weather whole morning and "
"I thought that would you like to come to my place and have a little piece of "
"cheese with us. Just me and my mom - and you, of course. Then, if you wish, "
"we could listen to the radio and keep company for our little Steven, my mom's "
"cat, you know.");
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
err = MXS_NOTICE("%s", logstr);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("Testing. One, two, three\n");
err = MXS_ERROR("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
mxs_log_flush();
logstr = ("For automatic and register variables, it is done each time the function or block is entered.");
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
err = MXS_INFO("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("Rather more surprising, at least at first sight, is the fact that a reference "
"to a[i] can also be written as *(a+i). In evaluating a[i], C converts it to *(a+i) "
"immediately; the two forms are equivalent. Applying the operatos & to both parts "
"of this equivalence, it follows that &a[i] and a+i are also identical: a+i is the "
"address of the i-th element beyond a.");
err = MXS_ERROR("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
mxs_log_finish();
mxs_log_flush();
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("..and you?");
err = MXS_NOTICE("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("For automatic and register variables, it is done each time the function or block is entered.");
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
err = MXS_INFO("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("Rather more surprising, at least at first sight, is the fact that a reference to "
"a[i] can also be written as *(a+i). In evaluating a[i], C converts it to *(a+i) "
"immediately; the two forms are equivalent. Applying the operatos & to both parts "
"of this equivalence, it follows that &a[i] and a+i are also identical: a+i is the "
"address of the i-th element beyond a.");
err = MXS_ERROR("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("..... and you too?");
err = MXS_NOTICE("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
mxs_log_flush();
logstr = ("For automatic and register variables, it is done each time the function or block is entered.");
#if !defined(SS_DEBUG)
skygw_log_enable(LOG_INFO);
#endif
err = MXS_INFO("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("Testing. One, two, three, four\n");
err = MXS_ERROR("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
logstr = ("Testing. One, two, three, .. where was I?\n");
err = MXS_ERROR("%s", logstr);
if (err != 0)
{
TEST_ERROR("Error, log write failed.");
}
ss_dassert(err == 0);
mxs_log_finish();
mxs_log_init(NULL, "/tmp", MXS_LOG_TARGET_FS);
mxs_log_finish();
simple_mutex_lock(td->mtx, true);
*td->nactive -= 1;
simple_mutex_unlock(td->mtx);
skygw_message_send(td->mes);
return NULL;
}
static int nstr(char** str_arr)
{
int i;
for (i = 0; str_arr[i] != NULL; i++)
{
}
return i;
}
char* logs[] =
{
"foo",
"bar",
"done",
"critical test logging",
"longer test l o g g g i n g",
"reeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
"eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeally looooooooooooooooooooooooooooooooooooooo"
"ooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooong line",
"shoorter one",
"two",
"scrap : 834nuft984pnw8ynup4598yp8wup8upwn48t5gpn45",
"more the same : f98uft5p8ut2p44449upnt5",
"asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd987987asdasd98987",
NULL
};
static void* thr_run_morelog(void* data)
{
thread_t* td = (thread_t *)data;
int err;
int i;
int nmsg;
nmsg = nstr(logs);
for (i = 0; i < NITER; i++)
{
char* str = logs[rand() % nmsg];
err = MXS_LOG_MESSAGE((int)(rand() % (LOG_DEBUG + 1)),
"%s - iteration # %d",
str,
i);
if (err != 0)
{
fprintf(stderr, "Error, log write failed.\n");
}
}
simple_mutex_lock(td->mtx, true);
*td->nactive -= 1;
simple_mutex_unlock(td->mtx);
skygw_message_send(td->mes);
return NULL;
}

View File

@ -265,6 +265,12 @@ cdc_auth_set_client_data(CDC_session *client_data,
uint8_t *client_auth_packet,
int client_auth_packet_size)
{
if (client_auth_packet_size % 2 != 0)
{
/** gw_hex2bin expects an even number of bytes */
client_auth_packet_size--;
}
int rval = CDC_STATE_AUTH_ERR;
int decoded_size = client_auth_packet_size / 2;
char decoded_buffer[decoded_size + 1]; // Extra for terminating null

View File

@ -11,3 +11,4 @@ add_subdirectory(regexfilter)
add_subdirectory(tee)
add_subdirectory(testfilter)
add_subdirectory(topfilter)
add_subdirectory(tpmfilter)

View File

@ -184,7 +184,7 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
if (!status.ok())
{
MXS_ERROR("Could not store version information to created RocksDB database \"%s\". "
"You may need to delete the database and retry. RocksDB error: %s",
"You may need to delete the database and retry. RocksDB error: \"%s\"",
path.c_str(),
status.ToString().c_str());
}
@ -236,7 +236,7 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
else
{
MXS_ERROR("Could not read version information from RocksDB database %s. "
"You may need to delete the database and retry. RocksDB error: %s",
"You may need to delete the database and retry. RocksDB error: \"%s\"",
path.c_str(),
status.ToString().c_str());
delete pDb;
@ -244,8 +244,13 @@ RocksDBStorage* RocksDBStorage::Create(const char* zName, uint32_t ttl, int argc
}
else
{
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: %s",
MXS_ERROR("Could not open/initialize RocksDB database %s. RocksDB error: \"%s\"",
path.c_str(), status.ToString().c_str());
if (status.IsIOError())
{
MXS_ERROR("Is an other MaxScale process running?");
}
}
return pStorage;

View File

@ -484,7 +484,7 @@ regex_replace(const char *sql, pcre2_code *re, pcre2_match_data *match_data, con
size_t result_size;
/** This should never fail with rc == 0 because we used pcre2_match_data_create_from_pattern() */
if (pcre2_match(re, (PCRE2_SPTR) sql, PCRE2_ZERO_TERMINATED, 0, 0, match_data, NULL))
if (pcre2_match(re, (PCRE2_SPTR) sql, PCRE2_ZERO_TERMINATED, 0, 0, match_data, NULL) > 0)
{
result_size = strlen(sql) + strlen(replace);
result = MXS_MALLOC(result_size);

View File

@ -0,0 +1,4 @@
add_library(tpmfilter SHARED tpmfilter.c)
target_link_libraries(tpmfilter maxscale-common)
set_target_properties(tpmfilter PROPERTIES VERSION "1.0.0")
install_module(tpmfilter experimental)

View File

@ -0,0 +1,559 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl.
*
* Change Date: 2019-07-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file tpmfilter.c - Transaction Performance Monitoring Filter
* @verbatim
*
* A simple filter that groups queries into a transaction with the latency.
*
* The filter reads the routed queries, groups them into a transaction by
* detecting 'commit' statement at the end. The transactions are timestamped with a
* unix-timestamp and the latency of a transaction is recorded in milliseconds.
* The filter will not record transactions that are rolled back.
* Please note that the filter only works with 'autocommit' option disabled.
*
* The filter makes no attempt to deal with query packets that do not fit
* in a single GWBUF.
*
* Optional parameters:
* filename=<name of the file to which transaction performance logs are written (default=tpm.log)>
* delimiter=<delimiter for columns in a log (default='|')>
* query_delimiter=<delimiter for query statements in a transaction (default=';')>
* source=<source address to limit filter>
* user=<username to limit filter>
*
* Date Who Description
* 06/12/2015 Dong Young Yoon Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <fcntl.h>
#include <filter.h>
#include <modinfo.h>
#include <modutil.h>
#include <skygw_utils.h>
#include <log_manager.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include <regex.h>
#include <atomic.h>
/** Defined in log_manager.cc */
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
MODULE_INFO info =
{
MODULE_API_FILTER,
MODULE_GA,
FILTER_VERSION,
"Transaction Performance Monitoring filter"
};
static char *version_str = "V1.0.0";
static size_t buf_size = 10;
static size_t sql_size_limit = 64 * 1024 *
1024; /* The maximum size for query statements in a transaction (64MB) */
/*
* The filter entry points
*/
static FILTER *createInstance(const char *name, char **options, FILTER_PARAMETER **);
static void *newSession(FILTER *instance, SESSION *session);
static void closeSession(FILTER *instance, void *session);
static void freeSession(FILTER *instance, void *session);
static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstream);
static void setUpstream(FILTER *instance, void *fsession, UPSTREAM *upstream);
static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue);
static int clientReply(FILTER *instance, void *fsession, GWBUF *queue);
static void diagnostic(FILTER *instance, void *fsession, DCB *dcb);
static FILTER_OBJECT MyObject =
{
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
setUpstream,
routeQuery,
clientReply,
diagnostic,
};
/**
* A instance structure, every instance will write to a same file.
*/
typedef struct
{
int sessions; /* Session count */
char *source; /* The source of the client connection */
char *user; /* The user name to filter on */
char *filename; /* filename */
char *delimiter; /* delimiter for columns in a log */
char *query_delimiter; /* delimiter for query statements in a transaction */
int query_delimiter_size; /* the length of the query delimiter */
FILE* fp;
} TPM_INSTANCE;
/**
* The session structure for this TPM filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
typedef struct
{
DOWNSTREAM down;
UPSTREAM up;
int active;
char *clientHost;
char *userName;
char* sql;
struct timeval start;
char *current;
int n_statements;
struct timeval total;
struct timeval current_start;
bool query_end;
char *buf;
int sql_index;
size_t max_sql_size;
} TPM_SESSION;
/**
* Implementation of the mandatory version entry point
*
* @return version string of the module
*/
char *
version()
{
return version_str;
}
/**
* The module initialisation routine, called when the module
* is first loaded.
*/
void
ModuleInit()
{
}
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
* "module object", this is a structure with the set of
* external entry points for this module.
*
* @return The module object
*/
FILTER_OBJECT *
GetModuleObject()
{
return &MyObject;
}
/**
* Create an instance of the filter for a particular service
* within MaxScale.
*
* @param options The options for this filter
* @param params The array of name/value pair parameters for the filter
*
* @return The instance data for this new instance
*/
static FILTER *
createInstance(const char *name, char **options, FILTER_PARAMETER **params)
{
int i;
TPM_INSTANCE *my_instance;
if ((my_instance = calloc(1, sizeof(TPM_INSTANCE))) != NULL)
{
my_instance->source = NULL;
my_instance->user = NULL;
/* set default log filename */
my_instance->filename = strdup("tpm.log");
/* set default delimiter */
my_instance->delimiter = strdup("|");
/* set default query delimiter */
my_instance->query_delimiter = strdup(";");
my_instance->query_delimiter_size = 1;
for (i = 0; params && params[i]; i++)
{
if (!strcmp(params[i]->name, "filename"))
{
free(my_instance->filename);
my_instance->filename = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "source"))
{
my_instance->source = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "user"))
{
my_instance->user = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "delimiter"))
{
free(my_instance->delimiter);
my_instance->delimiter = strdup(params[i]->value);
}
else if (!strcmp(params[i]->name, "query_delimiter"))
{
free(my_instance->query_delimiter);
my_instance->query_delimiter = strdup(params[i]->value);
my_instance->query_delimiter_size = strlen(my_instance->query_delimiter);
}
}
my_instance->sessions = 0;
my_instance->fp = fopen(my_instance->filename, "w");
if (my_instance->fp == NULL)
{
MXS_ERROR("Opening output file '%s' for tpmfilter failed due to %d, %s", my_instance->filename, errno,
strerror(errno));
return NULL;
}
}
return (FILTER *)my_instance;
}
/**
* Associate a new session with this instance of the filter.
*
* Every session uses the same log file.
*
* @param instance The filter instance data
* @param session The session itself
* @return Session specific data for this session
*/
static void *
newSession(FILTER *instance, SESSION *session)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session;
int i;
char *remote, *user;
if ((my_session = calloc(1, sizeof(TPM_SESSION))) != NULL)
{
atomic_add(&my_instance->sessions, 1);
my_session->max_sql_size = 4 * 1024; // default max query size of 4k.
my_session->sql = (char*)malloc(my_session->max_sql_size);
memset(my_session->sql, 0x00, my_session->max_sql_size);
my_session->buf = (char*)malloc(buf_size);
my_session->sql_index = 0;
my_session->n_statements = 0;
my_session->total.tv_sec = 0;
my_session->total.tv_usec = 0;
my_session->current = NULL;
if ((remote = session_get_remote(session)) != NULL)
{
my_session->clientHost = strdup(remote);
}
else
{
my_session->clientHost = NULL;
}
if ((user = session_getUser(session)) != NULL)
{
my_session->userName = strdup(user);
}
else
{
my_session->userName = NULL;
}
my_session->active = 1;
if (my_instance->source && my_session->clientHost && strcmp(my_session->clientHost,
my_instance->source))
{
my_session->active = 0;
}
if (my_instance->user && my_session->userName && strcmp(my_session->userName,
my_instance->user))
{
my_session->active = 0;
}
}
return my_session;
}
/**
* Close a session with the filter, this is the mechanism
* by which a filter may cleanup data structure etc.
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void
closeSession(FILTER *instance, void *session)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
if (my_instance->fp != NULL)
{
// flush FP when a session is closed.
fflush(my_instance->fp);
}
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void
freeSession(FILTER *instance, void *session)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
free(my_session->clientHost);
free(my_session->userName);
free(my_session->sql);
free(my_session->buf);
free(session);
return;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void
setDownstream(FILTER *instance, void *session, DOWNSTREAM *downstream)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
my_session->down = *downstream;
}
/**
* Set the upstream filter or session to which results will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param upstream The upstream filter or session.
*/
static void
setUpstream(FILTER *instance, void *session, UPSTREAM *upstream)
{
TPM_SESSION *my_session = (TPM_SESSION *)session;
my_session->up = *upstream;
}
/**
* The routeQuery entry point. This is passed the query buffer
* to which the filter should be applied. Once applied the
* query should normally be passed to the downstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param queue The query data
*/
static int
routeQuery(FILTER *instance, void *session, GWBUF *queue)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
char *ptr = NULL;
size_t i;
if (my_session->active)
{
if (queue->next != NULL)
{
queue = gwbuf_make_contiguous(queue);
}
if ((ptr = modutil_get_SQL(queue)) != NULL)
{
my_session->query_end = false;
/* check for commit and rollback */
if (strlen(ptr) > 5)
{
size_t ptr_size = strlen(ptr) + 1;
char* buf = my_session->buf;
for (i = 0; i < ptr_size && i < buf_size; ++i)
{
buf[i] = tolower(ptr[i]);
}
if (strncmp(buf, "commit", 6) == 0)
{
my_session->query_end = true;
}
else if (strncmp(buf, "rollback", 8) == 0)
{
my_session->query_end = true;
my_session->sql_index = 0;
}
}
/* for normal sql statements */
if (!my_session->query_end)
{
/* check and expand buffer size first. */
size_t new_sql_size = my_session->max_sql_size;
size_t len = my_session->sql_index + strlen(ptr) + my_instance->query_delimiter_size + 1;
/* if the total length of query statements exceeds the maximum limit, print an error and return */
if (len > sql_size_limit)
{
MXS_ERROR("The size of query statements exceeds the maximum buffer limit of 64MB.");
goto retblock;
}
/* double buffer size until the buffer fits the query */
while (len > new_sql_size)
{
new_sql_size *= 2;
}
if (new_sql_size > my_session->max_sql_size)
{
char* new_sql = (char*)malloc(new_sql_size);
if (new_sql == NULL)
{
MXS_ERROR("Memory allocation failure.");
goto retblock;
}
memcpy(new_sql, my_session->sql, my_session->sql_index);
free(my_session->sql);
my_session->sql = new_sql;
my_session->max_sql_size = new_sql_size;
}
/* first statement */
if (my_session->sql_index == 0)
{
memcpy(my_session->sql, ptr, strlen(ptr));
my_session->sql_index += strlen(ptr);
gettimeofday(&my_session->current_start, NULL);
}
/* otherwise, append the statement with semicolon as a statement delimiter */
else
{
/* append a query delimiter */
memcpy(my_session->sql + my_session->sql_index, my_instance->query_delimiter,
my_instance->query_delimiter_size);
/* append the next query statement */
memcpy(my_session->sql + my_session->sql_index + my_instance->query_delimiter_size, ptr, strlen(ptr));
/* set new pointer for the buffer */
my_session->sql_index += (my_instance->query_delimiter_size + strlen(ptr));
}
}
}
}
retblock:
free(ptr);
/* Pass the query downstream */
return my_session->down.routeQuery(my_session->down.instance,
my_session->down.session, queue);
}
static int
clientReply(FILTER *instance, void *session, GWBUF *reply)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)session;
struct timeval tv, diff;
int i, inserted;
/* found 'commit' and sql statements exist. */
if (my_session->query_end && my_session->sql_index > 0)
{
gettimeofday(&tv, NULL);
timersub(&tv, &(my_session->current_start), &diff);
/* get latency */
uint64_t millis = (diff.tv_sec * (uint64_t)1000 + diff.tv_usec / 1000);
/* get timestamp */
uint64_t timestamp = (tv.tv_sec + (tv.tv_usec / (1000 * 1000)));
*(my_session->sql + my_session->sql_index) = '\0';
/* print to log. */
fprintf(my_instance->fp, "%ld%s%s%s%s%s%ld%s%s\n",
timestamp,
my_instance->delimiter,
my_session->clientHost,
my_instance->delimiter,
my_session->userName,
my_instance->delimiter,
millis,
my_instance->delimiter,
my_session->sql);
my_session->sql_index = 0;
}
/* Pass the result upstream */
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session, reply);
}
/**
* Diagnostics routine
*
* If fsession is NULL then print diagnostics on the filter
* instance as a whole, otherwise print diagnostics for the
* particular session.
*
* @param instance The filter instance
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(FILTER *instance, void *fsession, DCB *dcb)
{
TPM_INSTANCE *my_instance = (TPM_INSTANCE *)instance;
TPM_SESSION *my_session = (TPM_SESSION *)fsession;
int i;
if (my_instance->source)
dcb_printf(dcb, "\t\tLimit logging to connections from %s\n",
my_instance->source);
if (my_instance->user)
dcb_printf(dcb, "\t\tLimit logging to user %s\n",
my_instance->user);
if (my_instance->filename)
dcb_printf(dcb, "\t\tLogging to file %s.\n",
my_instance->filename);
if (my_instance->delimiter)
dcb_printf(dcb, "\t\tLogging with delimiter %s.\n",
my_instance->delimiter);
if (my_instance->query_delimiter)
dcb_printf(dcb, "\t\tLogging with query delimiter %s.\n",
my_instance->query_delimiter);
}

View File

@ -449,7 +449,10 @@ avro_client_process_command(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *q
const char req_last_gtid[] = "QUERY-LAST-TRANSACTION";
const char req_gtid[] = "QUERY-TRANSACTION";
const size_t req_data_len = sizeof(req_data) - 1;
uint8_t *data = GWBUF_DATA(queue);
size_t buflen = gwbuf_length(queue);
uint8_t data[buflen + 1];
gwbuf_copy_data(queue, 0, buflen, data);
data[buflen] = '\0';
char *command_ptr = strstr((char *)data, req_data);
if (command_ptr != NULL)

View File

@ -667,6 +667,15 @@ static void closeSession(ROUTER *instance, void *router_session)
else
{
ss_dassert(!BREF_IS_WAITING_RESULT(bref));
/** This should never be true unless a backend reference is taken
* out of use before clearing the BREF_WAITING_RESULT state */
if (BREF_IS_WAITING_RESULT(bref))
{
MXS_WARNING("A closed backend was expecting a result, this should not be possible. "
"Decrementing active operation counter for this backend.");
bref_clear_state(bref, BREF_WAITING_RESULT);
}
}
}
/** Unlock */
@ -1044,160 +1053,6 @@ lock_failed:
return;
}
/**
* @brief Router error handling routine (API)
*
* Error Handler routine to resolve _backend_ failures. If it succeeds then
* there are enough operative backends available and connected. Otherwise it
* fails, and session is terminated.
*
* @param instance The router instance
* @param router_session The router session
* @param errmsgbuf The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: ERRACT_NEW_CONNECTION or
* ERRACT_REPLY_CLIENT
* @param succp Result of action: true iff router can continue
*
* Even if succp == true connecting to new slave may have failed. succp is to
* tell whether router has enough master/slave connections to continue work.
*/
static void handleError(ROUTER *instance, void *router_session,
GWBUF *errmsgbuf, DCB *problem_dcb,
error_action_t action, bool *succp)
{
SESSION *session;
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
CHK_DCB(problem_dcb);
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
/*
* The return of true is potentially misleading, but appears to
* be safe with the code as it stands on 9 Sept 2015 - MNB
*/
*succp = true;
return;
}
else
{
problem_dcb->dcb_errhandle_called = true;
}
session = problem_dcb->session;
if (session == NULL || rses == NULL)
{
*succp = false;
}
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
{
*succp = false;
}
else
{
CHK_SESSION(session);
CHK_CLIENT_RSES(rses);
switch (action)
{
case ERRACT_NEW_CONNECTION:
{
if (!rses_begin_locked_router_action(rses))
{
*succp = false;
break;
}
/**
* If master has lost its Master status error can't be
* handled so that session could continue.
*/
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
!SERVER_IS_MASTER(rses->rses_master_ref->bref_backend->backend_server))
{
SERVER *srv = rses->rses_master_ref->bref_backend->backend_server;
backend_ref_t *bref;
bref = get_bref_from_dcb(rses, problem_dcb);
if (bref != NULL)
{
CHK_BACKEND_REF(bref);
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
}
else
{
MXS_ERROR("server %s:%d lost the "
"master status but could not locate the "
"corresponding backend ref.",
srv->name, srv->port);
}
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
{
/** The failure of a master is not considered a critical
* failure as partial functionality still remains. Reads
* are allowed as long as slave servers are available
* and writes will cause an error to be returned.
*
* If we were waiting for a response from the master, we
* can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client
* connection. */
*succp = true;
}
else
{
if (!srv->master_err_is_logged)
{
MXS_ERROR("server %s:%d lost the "
"master status. Readwritesplit "
"service can't locate the master. "
"Client sessions will be closed.",
srv->name, srv->port);
srv->master_err_is_logged = true;
}
*succp = false;
}
}
else
{
/**
* This is called in hope of getting replacement for
* failed slave(s). This call may free rses.
*/
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
}
/* Free the lock if rses still exists */
if (rses)
{
rses_end_locked_router_action(rses);
}
break;
}
case ERRACT_REPLY_CLIENT:
{
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
*succp = false; /*< no new backend servers were made available */
break;
}
default:
*succp = false;
break;
}
}
dcb_close(problem_dcb);
}
/**
* @brief Get router capabilities (API)
@ -1674,6 +1529,175 @@ static bool rwsplit_process_router_options(ROUTER_INSTANCE *router,
return success;
}
/**
* @brief Router error handling routine (API)
*
* Error Handler routine to resolve _backend_ failures. If it succeeds then
* there are enough operative backends available and connected. Otherwise it
* fails, and session is terminated.
*
* @param instance The router instance
* @param router_session The router session
* @param errmsgbuf The error message to reply
* @param backend_dcb The backend DCB
* @param action The action: ERRACT_NEW_CONNECTION or
* ERRACT_REPLY_CLIENT
* @param succp Result of action: true iff router can continue
*
* Even if succp == true connecting to new slave may have failed. succp is to
* tell whether router has enough master/slave connections to continue work.
*/
static void handleError(ROUTER *instance, void *router_session,
GWBUF *errmsgbuf, DCB *problem_dcb,
error_action_t action, bool *succp)
{
SESSION *session;
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES *rses = (ROUTER_CLIENT_SES *)router_session;
CHK_DCB(problem_dcb);
/** Don't handle same error twice on same DCB */
if (problem_dcb->dcb_errhandle_called)
{
/** we optimistically assume that previous call succeed */
/*
* The return of true is potentially misleading, but appears to
* be safe with the code as it stands on 9 Sept 2015 - MNB
*/
*succp = true;
return;
}
else
{
problem_dcb->dcb_errhandle_called = true;
}
session = problem_dcb->session;
bool close_dcb = true;
if (session == NULL || rses == NULL)
{
*succp = false;
}
else if (DCB_ROLE_CLIENT_HANDLER == problem_dcb->dcb_role)
{
*succp = false;
}
else
{
CHK_SESSION(session);
CHK_CLIENT_RSES(rses);
switch (action)
{
case ERRACT_NEW_CONNECTION:
{
if (!rses_begin_locked_router_action(rses))
{
close_dcb = false; /* With the assumption that if the router session is closed,
* then so is the dcb.
*/
*succp = false;
break;
}
/**
* If master has lost its Master status error can't be
* handled so that session could continue.
*/
if (rses->rses_master_ref && rses->rses_master_ref->bref_dcb == problem_dcb &&
!SERVER_IS_MASTER(rses->rses_master_ref->bref_backend->backend_server))
{
SERVER *srv = rses->rses_master_ref->bref_backend->backend_server;
backend_ref_t *bref;
bref = get_bref_from_dcb(rses, problem_dcb);
if (bref != NULL)
{
CHK_BACKEND_REF(bref);
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
}
else
{
MXS_ERROR("server %s:%d lost the "
"master status but could not locate the "
"corresponding backend ref.",
srv->name, srv->port);
}
if (rses->rses_config.rw_master_failure_mode != RW_FAIL_INSTANTLY &&
(bref == NULL || !BREF_IS_WAITING_RESULT(bref)))
{
/** The failure of a master is not considered a critical
* failure as partial functionality still remains. Reads
* are allowed as long as slave servers are available
* and writes will cause an error to be returned.
*
* If we were waiting for a response from the master, we
* can't be sure whether it was executed or not. In this
* case the safest thing to do is to close the client
* connection. */
*succp = true;
}
else
{
if (!srv->master_err_is_logged)
{
MXS_ERROR("server %s:%d lost the "
"master status. Readwritesplit "
"service can't locate the master. "
"Client sessions will be closed.",
srv->name, srv->port);
srv->master_err_is_logged = true;
}
*succp = false;
}
}
else
{
/**
* This is called in hope of getting replacement for
* failed slave(s). This call may free rses.
*/
*succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
}
dcb_close(problem_dcb);
close_dcb = false;
/* Free the lock if rses still exists */
if (rses)
{
rses_end_locked_router_action(rses);
}
break;
}
case ERRACT_REPLY_CLIENT:
{
handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
close_dcb = false;
*succp = false; /*< no new backend servers were made available */
break;
}
default:
ss_dassert(!true);
*succp = false;
break;
}
}
if (close_dcb)
{
dcb_close(problem_dcb);
}
}
/**
* @brief Handle an error reply for a client
*
@ -1694,18 +1718,39 @@ static void handle_error_reply_client(SESSION *ses, ROUTER_CLIENT_SES *rses,
client_dcb = ses->client_dcb;
spinlock_release(&ses->ses_lock);
/**
* If bref exists, mark it closed
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
if (rses_begin_locked_router_action(rses))
{
CHK_BACKEND_REF(bref);
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
if (BREF_IS_WAITING_RESULT(bref))
/**
* If bref exists, mark it closed
*/
if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL)
{
bref_clear_state(bref, BREF_WAITING_RESULT);
CHK_BACKEND_REF(bref);
if (BREF_IS_IN_USE(bref))
{
bref_clear_state(bref, BREF_IN_USE);
bref_set_state(bref, BREF_CLOSED);
if (BREF_IS_WAITING_RESULT(bref))
{
bref_clear_state(bref, BREF_WAITING_RESULT);
}
dcb_close(backend_dcb);
}
}
else
{
// All dcbs should be associated with a backend reference.
ss_dassert(!true);
}
rses_end_locked_router_action(rses);
}
else
{
// The session has already been closed, hence the dcb has been
// closed as well.
}
if (sesstate == SESSION_STATE_ROUTER_READY)