Merge branch 'release-1.0GA' into firewall

Conflicts:
	query_classifier/query_classifier.cc
This commit is contained in:
Markus Makela 2014-12-09 14:23:07 +02:00
commit f2cec6e51e
34 changed files with 860 additions and 488 deletions

View File

@ -455,23 +455,14 @@ return_succp:
/**
* @node Initializes log managing routines in MariaDB Corporation MaxScale.
* Initializes log managing routines in MariaDB Corporation MaxScale.
*
* Parameters:
* @param p_ctx - in, give
* pointer to memory location where logmanager stores private write
* buffer.
* @param argc number of arguments in argv array
*
* @param argc - in, use
* number of arguments in argv array
* @param argv arguments array
*
* @param argv - in, use
* arguments array
*
* @return
*
*
* @details (write detailed description here)
* @return true if succeed, otherwise false
*
*/
bool skygw_logmanager_init(
@ -495,7 +486,12 @@ return_succp:
return succp;
}
/**
* Release resources of log manager.
*
* Lock must have been acquired before calling
* this function.
*/
static void logmanager_done_nomutex(void)
{
int i;
@ -540,17 +536,7 @@ static void logmanager_done_nomutex(void)
/**
* @node This function is provided for atexit() system function.
*
* Parameters:
* @param void - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
* This function is provided for atexit() system function.
*/
void skygw_logmanager_exit(void)
{
@ -558,20 +544,9 @@ void skygw_logmanager_exit(void)
}
/**
* @node End execution of log manager
* End execution of log manager
*
* Parameters:
* @param p_ctx - in, take
* pointer to memory location including context pointer. Context will
* be freed in this function.
*
* @param logmanager - in, use
* pointer to logmanager.
*
* @return void
*
*
* @details Stops file writing thread, releases filewriter, and logfiles.
* Stops file writing thread, releases filewriter, and logfiles.
*
*/
void skygw_logmanager_done(void)
@ -623,38 +598,25 @@ static logfile_t* logmanager_get_logfile(
/**
* @node Finds write position from block buffer for log string and writes there.
*
* Finds write position from block buffer for log string and writes there.
*
* Parameters:
*
* @param id - in, use
* logfile object identifier
* @param id logfile object identifier
* @param flush indicates whether log string must be written to disk
* immediately
* @param use_valist does write involve formatting of the string and use of
* valist argument
* @param spread_down if true, log string is spread to all logs having
* larger id.
* @param rotate if set, closes currently open log file and opens a
* new one
* @param str_len length of formatted string
* @param str string to be written to log
* @param valist variable-length argument list for formatting the string
*
* @param flush - in, use
* indicates whether log string must be written to disk immediately
*
* @param use_valist - in, use
* does write involve formatting of the string and use of valist argument
*
* @param spread_down - in, use
* if true, log string is spread to all logs having larger id.
* @return 0 if succeed, -1 otherwise
*
* @param rotate if set, closes currently open log file and opens a new one
*
* @param str_len - in, use
* length of formatted string
*
* @param str - in, use
* string to be written to log
*
* @param valist - in, use
* variable-length argument list for formatting the string
*
* @return
*
*
* @details (write detailed description here)
*
*/
static int logmanager_write_log(
logfile_id_t id,
@ -905,6 +867,12 @@ return_err:
return err;
}
/**
* Register writer to a block buffer. When reference counter is non-zero the
* flusher thread doesn't write the block to disk.
*
* @param bb block buffer
*/
static void blockbuf_register(
blockbuf_t* bb)
{
@ -913,7 +881,12 @@ static void blockbuf_register(
atomic_add(&bb->bb_refcount, 1);
}
/**
* Unregister writer from block buffer. If the buffer got filled up and there
* are no other registered writers anymore, notify the flusher thread.
*
* @param bb block buffer
*/
static void blockbuf_unregister(
blockbuf_t* bb)
{
@ -2612,6 +2585,7 @@ static bool logfile_init(
goto return_with_succp;
}
#if defined(SS_DEBUG)
if (store_shmem)
{
fprintf(stderr, "%s\t: %s->%s\n",
@ -2625,6 +2599,7 @@ static bool logfile_init(
STRLOGNAME(logfile_id),
logfile->lf_full_file_name);
}
#endif
succp = true;
logfile->lf_state = RUN;
CHK_LOGFILE(logfile);

View File

@ -362,50 +362,29 @@ static bool create_parse_tree(
Parser_state parser_state;
bool failp = FALSE;
const char* virtual_db = "skygw_virtual";
#if defined(SS_DEBUG_EXTRA)
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"[readwritesplit:create_parse_tree] 1.")));
#endif
if (parser_state.init(thd, thd->query(), thd->query_length())) {
if (parser_state.init(thd, thd->query(), thd->query_length()))
{
failp = TRUE;
goto return_here;
}
#if defined(SS_DEBUG_EXTRA)
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"[readwritesplit:create_parse_tree] 2.")));
#endif
mysql_reset_thd_for_next_command(thd);
#if defined(SS_DEBUG_EXTRA)
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"[readwritesplit:create_parse_tree] 3.")));
#endif
/**
* Set some database to thd so that parsing won't fail because of
* missing database. Then parse.
*/
failp = thd->set_db(virtual_db, strlen(virtual_db));
#if defined(SS_DEBUG_EXTRA)
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"[readwritesplit:create_parse_tree] 4.")));
#endif
if (failp) {
if (failp)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to set database in thread context.")));
}
failp = parse_sql(thd, &parser_state, NULL);
#if defined(SS_DEBUG_EXTRA)
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"[readwritesplit:create_parse_tree] 5.")));
#endif
if (failp) {
if (failp)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [readwritesplit:create_parse_tree] failed to "
@ -417,16 +396,14 @@ return_here:
}
/**
* @node Set new query type if new is more restrictive than old.
* Set new query type if new is more restrictive than old.
*
* Parameters:
* @param qtype - <usage>
* <description>
* @param qtype Existing type
*
* @param new_type - <usage>
* <description>
* @param new_type New query type
*
* @return
* @return Query type as an unsigned int value which must be casted to qtype.
*
*
* @details The implementation relies on that enumerated values correspond
@ -443,13 +420,11 @@ static u_int32_t set_query_type(
}
/**
* @node Detect query type, read-only, write, or session update
* Detect query type by examining parsed representation of it.
*
* Parameters:
* @param thd - <usage>
* <description>
* @param thd MariaDB thread context.
*
* @return
* @return Copy of query type value.
*
*
* @details Query type is deduced by checking for certain properties
@ -474,7 +449,7 @@ static skygw_query_type_t resolve_query_type(
* all write operations to all nodes.
*/
#if defined(NOT_IN_USE)
bool force_data_modify_op_replication;
bool force_data_modify_op_replication;
force_data_modify_op_replication = FALSE;
#endif /* NOT_IN_USE */
ss_info_dassert(thd != NULL, ("thd is NULL\n"));
@ -868,6 +843,11 @@ return_qtype:
* Checks if statement causes implicit COMMIT.
* autocommit_stmt gets values 1, 0 or -1 if stmt is enable, disable or
* something else than autocommit.
*
* @param lex Parse tree
* @param autocommit_stmt memory address for autocommit status
*
* @return true if statement causes implicit commit and false otherwise
*/
static bool skygw_stmt_causes_implicit_commit(
LEX* lex,
@ -897,7 +877,7 @@ static bool skygw_stmt_causes_implicit_commit(
}
else
{
succp =false;
succp = false;
}
break;
default:
@ -913,7 +893,9 @@ return_succp:
* Finds out if stmt is SET autocommit
* and if the new value matches with the enable_cmd argument.
*
* Returns 1, 0, or -1 if command was:
* @param lex parse tree
*
* @return 1, 0, or -1 if command was:
* enable, disable, or not autocommit, respectively.
*/
static int is_autocommit_stmt(
@ -984,9 +966,11 @@ char* skygw_query_classifier_get_stmtname(
}
/**
*Returns the LEX struct of the parsed GWBUF
*@param The parsed GWBUF
*@return Pointer to the LEX struct or NULL if an error occurred or the query was not parsed
* Get the parse tree from parsed querybuf.
* @param querybuf The parsed GWBUF
*
* @return Pointer to the LEX struct or NULL if an error occurred or the query
* was not parsed
*/
LEX* get_lex(GWBUF* querybuf)
{

View File

@ -208,18 +208,13 @@ passwd=mypwd
#router_options=slave_selection_criteria=
#filters=fetch|qla
[HTTPD Router]
type=service
router=testroute
servers=server1,server2,server3
[Debug Interface]
type=service
router=debugcli
[CLI]
type=service
router=CLI
router=cli
## Listener definitions for the services
#
@ -270,12 +265,6 @@ protocol=telnetd
#address=127.0.0.1
port=4442
[HTTPD Listener]
type=listener
service=HTTPD Router
protocol=HTTPD
port=6444
[CLI Listener]
type=listener
service=CLI

View File

@ -32,11 +32,13 @@
* x.y.z.%, x.y.%.%, x.%.%.%
* 03/10/14 Massimiliano Pinto Added netmask to user@host authentication for wildcard in IPv4 hosts
* 13/10/14 Massimiliano Pinto Added (user@host)@db authentication
* 04/12/14 Massimiliano Pinto Added support for IPv$ wildcard hosts: a.%, a.%.% and a.b.%
*
* @endverbatim
*/
#include <stdio.h>
#include <ctype.h>
#include <mysql.h>
#include <dcb.h>
@ -82,6 +84,7 @@ void resource_free(HASHTABLE *resource);
void *resource_fetch(HASHTABLE *, char *);
int resource_add(HASHTABLE *, char *, char *);
int resource_hash(char *);
static int normalize_hostname(char *input_host, char *output_host);
/**
* Load the user/passwd form mysql.user table into the service users' hashtable
@ -217,8 +220,6 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p
struct sockaddr_in serv_addr;
MYSQL_USER_HOST key;
char ret_ip[INET_ADDRSTRLEN + 1]="";
int found_range=0;
int found_any=0;
int ret = 0;
if (users == NULL || user == NULL || host == NULL) {
@ -255,42 +256,30 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p
/* ANY */
if (strcmp(host, "%") == 0) {
strcpy(ret_ip, "0.0.0.0");
found_any = 1;
key.netmask = 0;
} else {
char *tmp;
strncpy(ret_ip, host, INET_ADDRSTRLEN);
tmp = ret_ip+strlen(ret_ip)-1;
/* hostname without % wildcards has netmask = 32 */
key.netmask = normalize_hostname(host, ret_ip);
/* start from Class C */
while(tmp > ret_ip) {
if (*tmp == '%') {
/* set only the last IPv4 byte to 1
* avoiding setipadress() failure
* for Class C address
*/
found_range++;
if (found_range == 1)
*tmp = '1';
else
*tmp = '0';
}
tmp--;
if (key.netmask == -1) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : strdup() failed in normalize_hostname for %s@%s",
user,
host)));
}
}
/* fill IPv4 data struct */
if (setipaddress(&serv_addr.sin_addr, ret_ip)) {
if (setipaddress(&serv_addr.sin_addr, ret_ip) && strlen(ret_ip)) {
/* copy IPv4 data into key.ipv4 */
memcpy(&key.ipv4, &serv_addr, sizeof(serv_addr));
if (found_range) {
/* let's zero the last IP byte: a.b.c.0 we set above to 1*/
/* if netmask < 32 there are % wildcards */
if (key.netmask < 32) {
/* let's zero the last IP byte: a.b.c.0 we may have set above to 1*/
key.ipv4.sin_addr.s_addr &= 0x00FFFFFF;
key.netmask = 32 - (found_range * 8);
} else {
key.netmask = 32 - (found_any * 32);
}
/* add user@host as key and passwd as value in the MySQL users hash table */
@ -1120,3 +1109,87 @@ resource_fetch(HASHTABLE *resources, char *key)
return hashtable_fetch(resources, key);
}
/**
* Normalize hostname with % wildcards to a valid IP string.
*
* Valid input values:
* a.b.c.d, a.b.c.%, a.b.%.%, a.%.%.%
* Short formats a.% and a.%.% are both converted to a.%.%.%
* Short format a.b.% is converted to a.b.%.%
*
* Last host byte is set to 1, avoiding setipadress() failure
*
* @param input_host The hostname with possible % wildcards
* @param output_host The normalized hostname (buffer must be preallocated)
* @return The calculated netmask or -1 on failure
*/
static int normalize_hostname(char *input_host, char *output_host)
{
int netmask, bytes, bits = 0, found_wildcard = 0;
char *p, *lasts, *tmp;
int useorig = 0;
output_host[0] = 0;
bytes = 0;
tmp = strdup(input_host);
if (tmp == NULL) {
return -1;
}
p = strtok_r(tmp, ".", &lasts);
while (p != NULL)
{
if (strcmp(p, "%"))
{
if (! isdigit(*p))
useorig = 1;
strcat(output_host, p);
bits += 8;
}
else if (bytes == 3)
{
found_wildcard = 1;
strcat(output_host, "1");
}
else
{
found_wildcard = 1;
strcat(output_host, "0");
}
bytes++;
p = strtok_r(NULL, ".", &lasts);
if (p)
strcat(output_host, ".");
}
if (found_wildcard)
{
netmask = bits;
while (bytes++ < 4)
{
if (bytes == 4)
{
strcat(output_host, ".1");
}
else
{
strcat(output_host, ".0");
}
}
}
else
netmask = 32;
if (useorig == 1)
{
netmask = 32;
strcpy(output_host, input_host);
}
free(tmp);
return netmask;
}

View File

@ -332,8 +332,7 @@ DOWNSTREAM *me;
if ((filter->obj = load_module(filter->module,
MODULE_FILTER)) == NULL)
{
me = NULL;
goto retblock;
return NULL;
}
}
@ -342,8 +341,7 @@ DOWNSTREAM *me;
if ((filter->filter = (filter->obj->createInstance)(filter->options,
filter->parameters)) == NULL)
{
me = NULL;
goto retblock;
return NULL;
}
}
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
@ -355,7 +353,7 @@ DOWNSTREAM *me;
errno,
strerror(errno))));
goto retblock;
return NULL;
}
me->instance = filter->filter;
me->routeQuery = (void *)(filter->obj->routeQuery);
@ -363,12 +361,10 @@ DOWNSTREAM *me;
if ((me->session=filter->obj->newSession(me->instance, session)) == NULL)
{
free(me);
me = NULL;
goto retblock;
return NULL;
}
filter->obj->setDownstream(me->instance, me->session, downstream);
retblock:
return me;
}

View File

@ -637,55 +637,63 @@ static bool resolve_maxscale_homedir(
}
check_home_dir:
if (*p_home_dir != NULL)
{
if (!file_is_readable(*p_home_dir))
{
char* tailstr = "MaxScale doesn't have read permission "
"to MAXSCALE_HOME.";
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(tailstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(tailstr)+1,
"%s:%s",
log_context,
tailstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(logstr);
goto return_succp;
}
#if WRITABLE_HOME
if (!file_is_writable(*p_home_dir))
{
char* tailstr = "MaxScale doesn't have write permission "
"to MAXSCALE_HOME. Exiting.";
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(tailstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(tailstr)+1,
"%s:%s",
log_context,
tailstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(logstr);
goto return_succp;
}
#endif
if (!daemon_mode)
{
fprintf(stderr,
"Using %s as MAXSCALE_HOME = %s\n",
log_context,
tmp);
}
succp = true;
goto return_succp;
}
return_succp:
free (tmp);
if (*p_home_dir != NULL)
{
char* errstr;
errstr = check_dir_access(*p_home_dir);
if (errstr != NULL)
{
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(errstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(errstr)+1,
"%s: %s",
log_context,
errstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(errstr);
free(logstr);
succp = false;
}
else
{
succp = true;
if (!daemon_mode)
{
fprintf(stderr,
"Using %s as MAXSCALE_HOME = %s\n",
log_context,
(tmp == NULL ? *p_home_dir : tmp));
}
}
}
else
{
succp = false;
}
if (tmp != NULL)
{
free(tmp);
}
if (log_context != NULL)
{

View File

@ -198,7 +198,12 @@ HASHENTRIES *entry, *ptr;
* @param vfreefn The free function for the value
*/
void
hashtable_memory_fns(HASHTABLE *table, HASHMEMORYFN kcopyfn, HASHMEMORYFN vcopyfn, HASHMEMORYFN kfreefn, HASHMEMORYFN vfreefn)
hashtable_memory_fns(
HASHTABLE *table,
HASHMEMORYFN kcopyfn,
HASHMEMORYFN vcopyfn,
HASHMEMORYFN kfreefn,
HASHMEMORYFN vfreefn)
{
if (kcopyfn != NULL)
table->kcopyfn = kcopyfn;

View File

@ -349,8 +349,15 @@ serviceStart(SERVICE *service)
SERV_PROTOCOL *port;
int listeners = 0;
service->router_instance = service->router->createInstance(service,
service->routerOptions);
if((service->router_instance = service->router->createInstance(service,
service->routerOptions)) == NULL)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Failed to start router for service '%s'.",
service->name)));
return listeners;
}
port = service->ports;
while (port)
@ -395,12 +402,21 @@ int
serviceStartAll()
{
SERVICE *ptr;
int n = 0;
int n = 0,i;
ptr = allServices;
while (ptr)
{
n += serviceStart(ptr);
n += (i = serviceStart(ptr));
if(i == 0)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error : Failed to start service '%s'.",
ptr->name)));
}
ptr = ptr->next;
}
return n;

View File

@ -392,6 +392,22 @@ int main() {
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.%.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.0.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("riccio", "192.0.0.%", "foo", "192.134.0.2", NULL, NULL, NULL);
if (ret) fprintf(stderr, "\t-- Expecting no match\n");
assert(ret == 1);

View File

@ -58,7 +58,7 @@ extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_BETA_RELEASE,
MODULE_GA,
FILTER_VERSION,
"A simple query logging filter"
};

View File

@ -48,7 +48,7 @@ extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_BETA_RELEASE,
MODULE_GA,
FILTER_VERSION,
"A query rewrite filter that uses regular expressions to rewite queries"
};

View File

@ -65,7 +65,7 @@ extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_BETA_RELEASE,
MODULE_GA,
FILTER_VERSION,
"A tee piece in the filter plumbing"
};

View File

@ -55,7 +55,7 @@ extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_FILTER,
MODULE_BETA_RELEASE,
MODULE_GA,
FILTER_VERSION,
"A top N query logging filter"
};

View File

@ -41,6 +41,8 @@
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
#define BINLOG_EVENT_HDR_LEN 19
/* How often to call the binlog status function (seconds) */
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
@ -64,9 +66,9 @@
* BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds)
* BLR_MAX_BACKOFF Maximum number of increments to backoff to
*/
#define BLR_MASTER_BACKOFF_TIME 5
#define BLR_MASTER_BACKOFF_TIME 10
#define BLR_MAX_BACKOFF 60
/**
* Some useful macros for examining the MySQL Response packets
*/
@ -128,6 +130,7 @@ typedef struct blfile {
*/
typedef struct {
int n_events; /*< Number of events sent */
unsigned long n_bytes; /*< Number of bytes sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
@ -138,6 +141,7 @@ typedef struct {
int n_above;
int n_failed_read;
int n_overrun;
int n_caughtup;
int n_actions[3];
uint64_t lastsample;
int minno;
@ -175,6 +179,7 @@ typedef struct router_slave {
*router; /*< Pointer to the owning router */
struct router_slave *next;
SLAVE_STATS stats; /*< Slave statistics */
time_t connect_time; /*< Connect time of slave */
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
@ -188,6 +193,7 @@ typedef struct {
int n_slaves; /*< Number slave sessions created */
int n_reads; /*< Number of record reads */
uint64_t n_binlogs; /*< Number of binlog records from master */
uint64_t n_binlogs_ses; /*< Number of binlog records from master */
uint64_t n_binlog_errors;/*< Number of binlog records from master */
uint64_t n_rotates; /*< Number of binlog rotate events */
uint64_t n_cachehits; /*< Number of hits on the binlog cache */
@ -265,10 +271,12 @@ typedef struct router_instance {
unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */
unsigned long heartbeat; /*< Configured heartbeat value */
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
int reconnect_pending;
int retry_backoff;
time_t connect_time;
int handling_threads;
struct router_instance
*next;
@ -278,25 +286,26 @@ typedef struct router_instance {
* State machine for the master to MaxScale replication
*/
#define BLRM_UNCONNECTED 0x0000
#define BLRM_AUTHENTICATED 0x0001
#define BLRM_TIMESTAMP 0x0002
#define BLRM_SERVERID 0x0003
#define BLRM_HBPERIOD 0x0004
#define BLRM_CHKSUM1 0x0005
#define BLRM_CHKSUM2 0x0006
#define BLRM_GTIDMODE 0x0007
#define BLRM_MUUID 0x0008
#define BLRM_SUUID 0x0009
#define BLRM_LATIN1 0x000A
#define BLRM_UTF8 0x000B
#define BLRM_SELECT1 0x000C
#define BLRM_SELECTVER 0x000D
#define BLRM_REGISTER 0x000E
#define BLRM_BINLOGDUMP 0x000F
#define BLRM_CONNECTING 0x0001
#define BLRM_AUTHENTICATED 0x0002
#define BLRM_TIMESTAMP 0x0003
#define BLRM_SERVERID 0x0004
#define BLRM_HBPERIOD 0x0005
#define BLRM_CHKSUM1 0x0006
#define BLRM_CHKSUM2 0x0007
#define BLRM_GTIDMODE 0x0008
#define BLRM_MUUID 0x0009
#define BLRM_SUUID 0x000A
#define BLRM_LATIN1 0x000B
#define BLRM_UTF8 0x000C
#define BLRM_SELECT1 0x000D
#define BLRM_SELECTVER 0x000E
#define BLRM_REGISTER 0x000F
#define BLRM_BINLOGDUMP 0x0010
#define BLRM_MAXSTATE 0x000F
#define BLRM_MAXSTATE 0x0010
static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval",
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
@ -371,6 +380,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define ANONYMOUS_GTID_EVENT 0x22
#define PREVIOUS_GTIDS_EVENT 0x23
#define MAX_EVENT_TYPE 0x23
/**
* Binlog event flags
*/

View File

@ -252,7 +252,7 @@ typedef enum mysql_server_cmd {
typedef struct server_command_st {
mysql_server_cmd_t scom_cmd;
int scom_nresponse_packets; /*< packets in response */
size_t scom_nbytes_to_read; /*< bytes left to read in current packet */
ssize_t scom_nbytes_to_read; /*< bytes left to read in current packet */
struct server_command_st* scom_next;
} server_command_t;
@ -388,8 +388,8 @@ void protocol_remove_srv_command(MySQLProtocol* p);
bool protocol_waits_response(MySQLProtocol* p);
mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep);
int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd);
bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes);
void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes);
bool protocol_get_response_status (MySQLProtocol* p, int* npackets, ssize_t* nbytes);
void protocol_set_response_status (MySQLProtocol* p, int npackets, ssize_t nbytes);
void protocol_archive_srv_command(MySQLProtocol* p);
@ -397,6 +397,6 @@ void init_response_status (
GWBUF* buf,
mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes);
ssize_t* nbytes);

View File

@ -61,7 +61,7 @@ static char *version_str = "V1.4.0";
MODULE_INFO info = {
MODULE_API_MONITOR,
MODULE_BETA_RELEASE,
MODULE_GA,
MONITOR_VERSION,
"A Galera cluster monitor"
};

View File

@ -75,7 +75,7 @@ static char *version_str = "V1.4.0";
MODULE_INFO info = {
MODULE_API_MONITOR,
MODULE_BETA_RELEASE,
MODULE_GA,
MONITOR_VERSION,
"A MySQL Master/Slave replication monitor"
};
@ -676,12 +676,21 @@ int log_no_master = 1;
if (mon_status_changed(ptr))
{
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
#if defined(SS_DEBUG)
LOGIF(LT, (skygw_log_write_flush(
LOGFILE_TRACE,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
#else
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"Backend server %s:%d state : %s",
ptr->server->name,
ptr->server->port,
STRSRVSTATUS(ptr->server))));
#endif
}
if (SERVER_IS_DOWN(ptr->server))
@ -753,8 +762,8 @@ int log_no_master = 1;
if (root_master && mon_status_changed(root_master) && !(root_master->server->status & SERVER_STALE_STATUS)) {
if (root_master->pending_status & (SERVER_MASTER)) {
if (!(root_master->mon_prev_status & SERVER_STALE_STATUS) && !(root_master->server->status & SERVER_MAINT)) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"Info: A Master Server is now available: %s:%i",
root_master->server->name,
root_master->server->port)));

View File

@ -40,7 +40,7 @@
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_BETA_RELEASE,
MODULE_GA,
GWPROTOCOL_VERSION,
"A maxscale protocol for the administration interface"
};

View File

@ -30,8 +30,6 @@
* Date Who Description
* 14/06/2013 Mark Riddoch Initial version
* 17/06/2013 Massimiliano Pinto Added MaxScale To Backends routines
* 27/06/2013 Vilho Raatikka Added skygw_log_write command as an example
* and necessary headers.
* 01/07/2013 Massimiliano Pinto Put Log Manager example code behind SS_DEBUG macros.
* 03/07/2013 Massimiliano Pinto Added delayq for incoming data before mysql connection
* 04/07/2013 Massimiliano Pinto Added asyncrhronous MySQL protocol connection to backend
@ -51,7 +49,7 @@
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_BETA_RELEASE,
MODULE_GA,
GWPROTOCOL_VERSION,
"The MySQL to backend server protocol"
};
@ -835,7 +833,7 @@ static int gw_error_backend_event(DCB *dcb)
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
{
if (error != 0)
{
@ -877,7 +875,7 @@ static int gw_error_backend_event(DCB *dcb)
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
{
if (error != 0)
{
@ -962,11 +960,20 @@ static int gw_create_backend_connection(
}
/** Copy client flags to backend protocol */
protocol->client_capabilities =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
/** Copy client charset to backend protocol */
protocol->charset =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
if (backend_dcb->session->client->protocol)
{
/** Copy client flags to backend protocol */
protocol->client_capabilities =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
/** Copy client charset to backend protocol */
protocol->charset =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
}
else
{
protocol->client_capabilities = GW_MYSQL_CAPABILITIES_CLIENT;
protocol->charset = 0x08;
}
/*< if succeed, fd > 0, -1 otherwise */
rv = gw_do_connect_to_backend(server->name, server->port, &fd);
@ -1083,7 +1090,7 @@ gw_backend_hangup(DCB *dcb)
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, (socklen_t *)&len) == 0)
{
if (error != 0)
{
@ -1334,7 +1341,6 @@ static int gw_change_user(
/* get the auth token len */
memcpy(&auth_token_len, client_auth_packet, 1);
ss_dassert(auth_token_len >= 0);
client_auth_packet++;
@ -1483,7 +1489,7 @@ static GWBUF* process_response_data (
int nbytes_to_process) /*< number of new bytes read */
{
int npackets_left = 0; /*< response's packet count */
size_t nbytes_left = 0; /*< nbytes to be read for the packet */
ssize_t nbytes_left = 0; /*< nbytes to be read for the packet */
MySQLProtocol* p;
GWBUF* outbuf = NULL;
@ -1557,11 +1563,13 @@ static GWBUF* process_response_data (
*/
else /*< nbytes_left < nbytes_to_process */
{
ss_dassert(nbytes_left >= 0);
nbytes_to_process -= nbytes_left;
/** Move the prefix of the buffer to outbuf from redbuf */
outbuf = gwbuf_append(outbuf, gwbuf_clone_portion(readbuf, 0, nbytes_left));
readbuf = gwbuf_consume(readbuf, nbytes_left);
outbuf = gwbuf_append(outbuf,
gwbuf_clone_portion(readbuf, 0, (size_t)nbytes_left));
readbuf = gwbuf_consume(readbuf, (size_t)nbytes_left);
ss_dassert(npackets_left > 0);
npackets_left -= 1;
nbytes_left = 0;
@ -1609,7 +1617,7 @@ static bool sescmd_response_complete(
DCB* dcb)
{
int npackets_left;
size_t nbytes_left;
ssize_t nbytes_left;
MySQLProtocol* p;
bool succp;

View File

@ -49,7 +49,7 @@
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_BETA_RELEASE,
MODULE_GA,
GWPROTOCOL_VERSION,
"The client to MaxScale MySQL protocol implementation"
};
@ -144,7 +144,7 @@ GetModuleObject()
int
mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) {
uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t *mysql_payload = NULL;
uint8_t field_count = 0;
@ -223,7 +223,7 @@ int
MySQLSendHandshake(DCB* dcb)
{
uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t mysql_packet_id = 0;
uint8_t mysql_filler = GW_MYSQL_HANDSHAKE_FILLER;
@ -283,7 +283,6 @@ MySQLSendHandshake(DCB* dcb)
// write packet heder with mysql_payload_size
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
//mysql_packet_header[0] = mysql_payload_size;
// write packent number, now is 0
mysql_packet_header[3]= mysql_packet_id;
@ -682,7 +681,8 @@ int gw_read_client_event(
int message_len = 25 + MYSQL_DATABASE_MAXLEN;
fail_str = calloc(1, message_len+1);
snprintf(fail_str, message_len, "Unknown database '%s'", (char*)((MYSQL_session *)dcb->data)->db);
snprintf(fail_str, message_len, "Unknown database '%s'",
(char*)((MYSQL_session *)dcb->data)->db);
modutil_send_mysql_err_packet(dcb, 2, 0, 1049, "42000", fail_str);
} else {

View File

@ -1965,7 +1965,7 @@ void init_response_status (
GWBUF* buf,
mysql_server_cmd_t cmd,
int* npackets,
size_t* nbytes_left)
ssize_t* nbytes_left)
{
uint8_t* packet;
int nparam;
@ -2027,7 +2027,7 @@ void init_response_status (
bool protocol_get_response_status (
MySQLProtocol* p,
int* npackets,
size_t* nbytes)
ssize_t* nbytes)
{
bool succp;
@ -2035,7 +2035,7 @@ bool protocol_get_response_status (
spinlock_acquire(&p->protocol_lock);
*npackets = p->protocol_command.scom_nresponse_packets;
*nbytes = p->protocol_command.scom_nbytes_to_read;
*nbytes = (ssize_t)p->protocol_command.scom_nbytes_to_read;
spinlock_release(&p->protocol_lock);
if (*npackets < 0 && *nbytes == 0)
@ -2053,7 +2053,7 @@ bool protocol_get_response_status (
void protocol_set_response_status (
MySQLProtocol* p,
int npackets_left,
size_t nbytes)
ssize_t nbytes)
{
CHK_PROTOCOL(p);

View File

@ -40,7 +40,7 @@
MODULE_INFO info = {
MODULE_API_PROTOCOL,
MODULE_BETA_RELEASE,
MODULE_GA,
GWPROTOCOL_VERSION,
"A telnet deamon protocol for simple administration interface"
};

View File

@ -19,4 +19,7 @@ target_link_libraries(cli log_manager utils)
install(TARGETS cli DESTINATION modules)
add_subdirectory(readwritesplit)
if(BUILD_BINLOG)
add_subdirectory(binlog)
endif()

View File

@ -59,6 +59,8 @@
#include <mysql_client_server_protocol.h>
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V1.0.6";
@ -186,6 +188,8 @@ int i;
inst->long_burst = DEF_LONG_BURST;
inst->burst_size = DEF_BURST_SIZE;
inst->retry_backoff = 1;
inst->binlogdir = NULL;
inst->heartbeat = 300; // Default is every 5 minutes
/*
* We only support one server behind this router, since the server is
@ -306,6 +310,14 @@ int i;
inst->burst_size = size;
}
else if (strcmp(options[i], "heartbeat") == 0)
{
inst->heartbeat = atoi(value);
}
else if (strcmp(options[i], "binlogdir") == 0)
{
inst->binlogdir = strdup(value);
}
else
{
LOGIF(LE, (skygw_log_write(
@ -416,6 +428,7 @@ ROUTER_SLAVE *slave;
slave->router = inst;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0);
/**
* Add this session to the list of active sessions.
@ -509,9 +522,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
{
/*
* We must be closing the master session.
*
* TODO: Handle closure of master session
*/
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read,",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Binlog router close session with master server %s",
@ -529,6 +546,15 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
/* decrease server registered slaves counter */
atomic_add(&router->stats.n_registered, -1);
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Slave %s, server id %d, disconnected after %ld seconds. "
"%d events sent, %lu bytes.",
router->service->name, slave->dcb->remote,
slave->serverid,
time(0) - slave->connect_time, slave->stats.n_events,
slave->stats.n_bytes)));
/*
* Mark the slave as unregistered to prevent the forwarding
* of any more binlog records to this slave.
@ -641,25 +667,29 @@ struct tm tm;
min5 /= 5.0;
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
router_inst->master);
dcb_printf(dcb, "\tMaster connection state: %s\n",
dcb_printf(dcb, "\tMaster connection state: %s\n",
blrm_states[router_inst->master_state]);
localtime_r(&router_inst->stats.lastReply, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\tNumber of master connects: %d\n",
dcb_printf(dcb, "\tBinlog directory: %s\n",
router_inst->binlogdir);
dcb_printf(dcb, "\tNumber of master connects: %d\n",
router_inst->stats.n_masterstarts);
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
router_inst->stats.n_delayedreconnects);
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
router_inst->binlog_name);
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
router_inst->binlog_position);
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n",
router_inst->stats.n_binlogs_ses);
dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n",
router_inst->stats.n_binlogs);
minno = router_inst->stats.minno - 1;
if (minno == -1)
@ -668,28 +698,31 @@ struct tm tm;
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
router_inst->stats.n_artificial);
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
dcb_printf(dcb, "\tNumber of packets received: %u\n",
router_inst->stats.n_reads);
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
router_inst->stats.n_residuals);
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
dcb_printf(dcb, "\tLast event from master at: %s",
dcb_printf(dcb, "\tLast event from master at: %s",
buf);
dcb_printf(dcb, "\t (%d seconds ago)\n",
time(0) - router_inst->stats.lastReply);
dcb_printf(dcb, "\tLast event from master: 0x%x\n",
router_inst->lastEventReceived);
dcb_printf(dcb, "\tLast event from master: 0x%x (%s)\n",
router_inst->lastEventReceived,
(router_inst->lastEventReceived >= 0 &&
router_inst->lastEventReceived < 0x24) ?
event_names[router_inst->lastEventReceived] : "unknown");
if (router_inst->active_logs)
dcb_printf(dcb, "\tRouter processing binlog records\n");
if (router_inst->reconnect_pending)
@ -697,7 +730,7 @@ struct tm tm;
dcb_printf(dcb, "\tEvents received:\n");
for (i = 0; i < 0x24; i++)
{
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
}
#if SPINLOCK_PROFILE
@ -739,19 +772,44 @@ struct tm tm;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
dcb_printf(dcb,
"\t\tServer-id: %d\n",
session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb,
"\t\tSlave: %d\n",
session->dcb->remote);
dcb_printf(dcb,
"\t\tSlave DCB: %p\n",
session->dcb);
dcb_printf(dcb,
"\t\tNext Sequence No: %d\n",
session->seqno);
dcb_printf(dcb,
"\t\tState: %s\n",
blrs_states[session->state]);
dcb_printf(dcb,
"\t\tBinlog file: %s\n",
session->binlogfile);
dcb_printf(dcb,
"\t\tBinlog position: %u\n",
session->binlog_pos);
if (session->nocrc)
dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb,
"\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb,
"\t\tNo. requests: %u\n",
session->stats.n_requests);
dcb_printf(dcb,
"\t\tNo. events sent: %u\n",
session->stats.n_events);
dcb_printf(dcb,
"\t\tNo. bursts sent: %u\n",
session->stats.n_bursts);
dcb_printf(dcb,
"\t\tNo. transitions to follow mode: %u\n",
session->stats.n_bursts);
minno = session->stats.minno - 1;
if (minno == -1)
minno = 30;
@ -760,15 +818,18 @@ struct tm tm;
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
#if DETAILED_DIAG
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
#endif
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
@ -793,7 +854,7 @@ struct tm tm;
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
dcb_printf(dcb, "\n");
dcb_printf(dcb, "\t\t--------------------\n\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
@ -822,6 +883,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
router->stats.lastReply = time(0);
}
static char *
extract_message(GWBUF *errpkt)
{
char *rval;
int len;
len = EXTRACT24(errpkt->start);
if ((rval = (char *)malloc(len)) == NULL)
return NULL;
memcpy(rval, (char *)(errpkt->start) + 7, 6);
rval[6] = ' ';
memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8);
rval[len-2] = 0;
return rval;
}
/**
* Error Reply routine
*
@ -841,10 +920,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
char msg[85];
char msg[85], *errmsg;
len = sizeof(error);
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0)
{
strerror_r(error, msg, 80);
strcat(msg, " ");
@ -852,10 +931,21 @@ char msg[85];
else
strcpy(msg, "");
errmsg = extract_message(message);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg)));
LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', "
"%sattempting reconnect to master",
router->service->name, errmsg,
blrm_states[router->master_state], msg)));
if (errmsg)
free(errmsg);
*succp = true;
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read.",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
blr_master_reconnect(router);
}

View File

@ -54,6 +54,8 @@
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**

View File

@ -51,6 +51,8 @@
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
@ -75,18 +77,27 @@ int root_len, i;
DIR *dirp;
struct dirent *dp;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
if (router->binlogdir == NULL)
{
strcpy(path, ptr);
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
}
if (access(router->binlogdir, R_OK) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Unable to read the binlog directory %s.",
router->service->name, router->binlogdir)));
}
strcat(path, "/");
strcat(path, router->service->name);
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
@ -353,7 +364,7 @@ struct stat statb;
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
n, file->binlogname, pos)));
break;
}
return NULL;
@ -364,6 +375,17 @@ struct stat statb;
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->event_type > MAX_EVENT_TYPE)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. "
"Binlog file is %s, position %d",
hdr->event_type,
file->binlogname, pos)));
return NULL;
}
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,

View File

@ -63,6 +63,8 @@
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
@ -91,6 +93,18 @@ blr_start_master(ROUTER_INSTANCE *router)
DCB *client;
GWBUF *buf;
router->stats.n_binlogs_ses = 0;
spinlock_acquire(&router->lock);
if (router->master_state != BLRM_UNCONNECTED)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state])));
spinlock_release(&router->lock);
return;
}
router->master_state = BLRM_CONNECTING;
spinlock_release(&router->lock);
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
@ -98,6 +112,7 @@ GWBUF *buf;
return;
}
router->client = client;
client->state = DCB_STATE_POLLING; /* Fake the client is reading */
client->data = CreateMySQLAuthData(router->user, router->password, "");
if ((router->session = session_alloc(router->service, client)) == NULL)
{
@ -108,17 +123,27 @@ GWBUF *buf;
client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
char *name;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master") + 1)) != NULL)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
router->retry_backoff = BLR_MAX_BACKOFF;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name)));
return;
}
router->master->remote = strdup(router->service->databases->name);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: atempting to connect to master server %s.",
router->service->name, router->master->remote)));
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
perror("setsockopt");
@ -129,7 +154,6 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
router->retry_backoff = 1;
}
/**
@ -160,7 +184,27 @@ GWBUF *ptr;
router->reconnect_pending = 0;
router->active_logs = 0;
spinlock_release(&router->lock);
blr_start_master(router);
if (router->master_state < BLRM_BINLOGDUMP)
{
char *name;
router->master_state = BLRM_UNCONNECTED;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master")+1)) != NULL);
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = BLR_MAX_BACKOFF;
}
else
{
router->master_state = BLRM_UNCONNECTED;
blr_start_master(router);
}
}
/**
@ -225,8 +269,9 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
router->master_state)));
LOGFILE_ERROR,
"Invalid master state machine state (%d) for binlog router.",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
if (router->reconnect_pending)
@ -234,6 +279,12 @@ char query[128];
router->active_logs = 0;
spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"%s: Pending reconnect in state %s.",
router->service->name,
blrm_states[router->master_state]
)));
blr_restart_master(router);
return;
}
@ -247,8 +298,11 @@ char query[128];
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
"%s: Received error: %d, %s from master during %s phase "
"of the master state machine.",
router->service->name,
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf),
blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
@ -272,12 +326,17 @@ char query[128];
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
router->master_state = BLRM_SERVERID;
router->master->func.write(router->master, buf);
router->retry_backoff = 1;
break;
case BLRM_SERVERID:
// Response to fetch of master's server-id
router->saved_master.server_id = buf;
// TODO: Extract the value of server-id and place in router->master_id
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
{
char str[80];
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
buf = blr_make_query(str);
}
router->master_state = BLRM_HBPERIOD;
router->master->func.write(router->master, buf);
break;
@ -357,6 +416,12 @@ char query[128];
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: Request binlog records from %s at "
"position %d from master server %s.",
router->service->name, router->binlog_name,
router->binlog_position, router->master->remote)));
break;
case BLRM_BINLOGDUMP:
// Main body, we have received a binlog record from the master
@ -618,133 +683,172 @@ static REP_HEADER phdr;
n_bufs = 1;
}
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
if (len < BINLOG_EVENT_HDR_LEN)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d"
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
char *msg = "";
if (ptr[4] == 0xfe) /* EOF Packet */
{
free(msg);
msg = NULL;
msg = "end of file";
}
break;
else if (ptr[4] == 0xff) /* EOF Packet */
{
msg = "error";
}
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"Non-event message (%s) from master.",
msg)));
}
phdr = hdr;
if (hdr.ok == 0)
else
{
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
router->lastEventReceived = hdr.event_type;
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
{
free(msg);
msg = NULL;
}
break;
}
phdr = hdr;
if (hdr.ok == 0)
{
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
/*
* We need to save this to replay to new
* slaves that attach later.
*/
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_len = hdr.event_size;
router->saved_master.fde_event = malloc(hdr.event_size);
if (router->saved_master.fde_event)
memcpy(router->saved_master.fde_event,
ptr + 5, hdr.event_size);
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
uint8_t *new_fde;
unsigned int new_fde_len;
/*
* We need to save this to replay to new
* slaves that attach later.
*/
new_fde_len = hdr.event_size;
new_fde = malloc(hdr.event_size);
if (new_fde)
{
memcpy(new_fde, ptr + 5, hdr.event_size);
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_event = new_fde;
router->saved_master.fde_len = new_fde_len;
}
else
{
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Received a format description "
"event that MaxScale was unable to "
"record. Event length is %d.",
router->service->name,
hdr.event_size)));
blr_log_packet(LOGFILE_ERROR,
"Format Description Event:", ptr, len);
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
}
else
{
printf("Binlog router error: %s\n", &ptr[7]);
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
if (msg)
{
@ -968,6 +1072,7 @@ int action;
{
blr_slave_rotate(slave, ptr);
}
slave->stats.n_bytes += gwbuf_length(pkt);
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{

View File

@ -65,8 +65,11 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* Process a request packet from the slave server.
@ -544,29 +547,8 @@ uint32_t chksum;
rval = slave->dcb->func.write(slave->dcb, resp);
/* Send the FORMAT_DESCRIPTION_EVENT */
if (router->saved_master.fde_event)
{
resp = gwbuf_alloc(router->saved_master.fde_len + 5);
ptr = GWBUF_DATA(resp);
encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len);
encode_value(ptr, time(0), 32); // Overwrite timestamp
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4);
encode_value(ptr, chksum, 32);
rval = slave->dcb->func.write(slave->dcb, resp);
}
if (slave->binlog_pos != 4)
blr_slave_send_fde(router, slave);
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
@ -575,8 +557,9 @@ uint32_t chksum;
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"%s: New slave %s requested binlog file %s from position %lu",
"%s: New slave %s, server id %d, requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote,
slave->serverid,
slave->binlogfile, slave->binlog_pos)));
if (slave->binlog_pos != router->binlog_position ||
@ -782,6 +765,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
slave->stats.n_bytes += gwbuf_length(head);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
@ -839,11 +823,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
if (state_change)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
slave->stats.n_caughtup++;
if (slave->stats.n_caughtup == 1)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
else if ((slave->stats.n_caughtup % 50) == 0)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
}
}
else
@ -1031,3 +1027,51 @@ uint32_t chksum;
slave->dcb->func.write(slave->dcb, resp);
return 1;
}
/**
* Send a "fake" format description event to the newly connected slave
*
* @param router The router instance
* @param slave The slave to send the event to
*/
static void
blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
BLFILE *file;
REP_HEADER hdr;
GWBUF *record, *head;
uint8_t *ptr;
uint32_t chksum;
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return;
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
{
blr_close_binlog(router, file);
return;
}
blr_close_binlog(router, file);
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
ptr = GWBUF_DATA(record);
encode_value(ptr, time(0), 32); // Overwrite timestamp
ptr += 13;
encode_value(ptr, 0, 32); // Set next position to 0
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(record) + hdr.event_size - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
encode_value(ptr, chksum, 32);
slave->dcb->func.write(slave->dcb, head);
}

View File

@ -48,7 +48,7 @@
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
MODULE_GA,
ROUTER_VERSION,
"The admin user interface"
};

View File

@ -47,7 +47,7 @@
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
MODULE_GA,
ROUTER_VERSION,
"The debug user interface"
};

View File

@ -96,7 +96,7 @@ extern __thread log_info_t tls_log_info;
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
MODULE_GA,
ROUTER_VERSION,
"A connection based router to load balance based on connections"
};

View File

@ -37,7 +37,7 @@
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
MODULE_GA,
ROUTER_VERSION,
"A Read/Write splitting router for enhancement read scalability"
};
@ -313,8 +313,8 @@ static int hashkeyfun(
}
static int hashcmpfun(
void* v1,
void* v2)
void* v1,
void* v2)
{
char* i1 = (char*) v1;
char* i2 = (char*) v2;
@ -374,6 +374,15 @@ ROUTER_OBJECT* GetModuleObject()
return &MyObject;
}
/**
* Refresh the instance by hte given parameter value.
*
* @param router Router instance
* @param singleparam Parameter fo be reloaded
*
* Note: this part is not done. Needs refactoring.
*/
static void refreshInstance(
ROUTER_INSTANCE* router,
CONFIG_PARAMETER* singleparam)
@ -987,6 +996,14 @@ static void closeSession(
}
}
/**
* When router session is closed, freeSession can be called to free allocated
* resources.
*
* @param router_instance The router instance the session belongs to
* @param router_client_session Client session
*
*/
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
@ -1645,10 +1662,10 @@ skygw_query_type_t is_read_tmp_table(
* @param type The type of the query resolved so far
*/
void check_create_tmp_table(
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
ROUTER* instance,
void* router_session,
GWBUF* querybuf,
skygw_query_type_t type)
{
int klen = 0;

View File

@ -1412,7 +1412,6 @@ int simple_mutex_unlock(
err,
strerror(errno));
perror("simple_mutex : ");
ss_dassert(sm->sm_mutex.__data.__nusers >= 0);
} else {
/**
* Note that these updates are not protected.