Merge branch 'release-1.0GA' of https://github.com/mariadb-corporation/MaxScale into release-1.0GA

This commit is contained in:
VilhoRaatikka 2014-12-05 23:57:02 +02:00
commit b1eaaea961
21 changed files with 733 additions and 329 deletions

View File

@ -1,3 +1,6 @@
if(LOG_DEBUG)
add_definitions(-DSS_LOG_DEBUG)
endif()
add_library(log_manager SHARED log_manager.cc)
target_link_libraries(log_manager pthread aio stdc++)
install(TARGETS log_manager DESTINATION lib)

View File

@ -1093,7 +1093,7 @@ static char* blockbuf_get_writepos(
simple_mutex_unlock(&bb->bb_mutex);
simple_mutex_lock(&bb_list->mlist_mutex, true);
node = bb_list->mlist_first;
}
else
{
@ -3111,9 +3111,9 @@ static int find_last_seqno(
{
if (snstr != NULL && i == seqnoidx)
{
strcat(filename, snstr); /*< add sequence number */
strncat(filename, snstr, NAME_MAX - 1); /*< add sequence number */
}
strcat(filename, p->sp_string);
strncat(filename, p->sp_string, NAME_MAX - 1);
if (p->sp_next == NULL)
{

View File

@ -32,11 +32,13 @@
* x.y.z.%, x.y.%.%, x.%.%.%
* 03/10/14 Massimiliano Pinto Added netmask to user@host authentication for wildcard in IPv4 hosts
* 13/10/14 Massimiliano Pinto Added (user@host)@db authentication
* 04/12/14 Massimiliano Pinto Added support for IPv$ wildcard hosts: a.%, a.%.% and a.b.%
*
* @endverbatim
*/
#include <stdio.h>
#include <ctype.h>
#include <mysql.h>
#include <dcb.h>
@ -82,6 +84,7 @@ void resource_free(HASHTABLE *resource);
void *resource_fetch(HASHTABLE *, char *);
int resource_add(HASHTABLE *, char *, char *);
int resource_hash(char *);
static int normalize_hostname(char *input_host, char *output_host);
/**
* Load the user/passwd form mysql.user table into the service users' hashtable
@ -217,8 +220,6 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p
struct sockaddr_in serv_addr;
MYSQL_USER_HOST key;
char ret_ip[INET_ADDRSTRLEN + 1]="";
int found_range=0;
int found_any=0;
int ret = 0;
if (users == NULL || user == NULL || host == NULL) {
@ -255,42 +256,30 @@ int add_mysql_users_with_host_ipv4(USERS *users, char *user, char *host, char *p
/* ANY */
if (strcmp(host, "%") == 0) {
strcpy(ret_ip, "0.0.0.0");
found_any = 1;
key.netmask = 0;
} else {
char *tmp;
strncpy(ret_ip, host, INET_ADDRSTRLEN);
tmp = ret_ip+strlen(ret_ip)-1;
/* hostname without % wildcards has netmask = 32 */
key.netmask = normalize_hostname(host, ret_ip);
/* start from Class C */
while(tmp > ret_ip) {
if (*tmp == '%') {
/* set only the last IPv4 byte to 1
* avoiding setipadress() failure
* for Class C address
*/
found_range++;
if (found_range == 1)
*tmp = '1';
else
*tmp = '0';
}
tmp--;
if (key.netmask == -1) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : strdup() failed in normalize_hostname for %s@%s",
user,
host)));
}
}
/* fill IPv4 data struct */
if (setipaddress(&serv_addr.sin_addr, ret_ip)) {
if (setipaddress(&serv_addr.sin_addr, ret_ip) && strlen(ret_ip)) {
/* copy IPv4 data into key.ipv4 */
memcpy(&key.ipv4, &serv_addr, sizeof(serv_addr));
if (found_range) {
/* let's zero the last IP byte: a.b.c.0 we set above to 1*/
/* if netmask < 32 there are % wildcards */
if (key.netmask < 32) {
/* let's zero the last IP byte: a.b.c.0 we may have set above to 1*/
key.ipv4.sin_addr.s_addr &= 0x00FFFFFF;
key.netmask = 32 - (found_range * 8);
} else {
key.netmask = 32 - (found_any * 32);
}
/* add user@host as key and passwd as value in the MySQL users hash table */
@ -1120,3 +1109,87 @@ resource_fetch(HASHTABLE *resources, char *key)
return hashtable_fetch(resources, key);
}
/**
* Normalize hostname with % wildcards to a valid IP string.
*
* Valid input values:
* a.b.c.d, a.b.c.%, a.b.%.%, a.%.%.%
* Short formats a.% and a.%.% are both converted to a.%.%.%
* Short format a.b.% is converted to a.b.%.%
*
* Last host byte is set to 1, avoiding setipadress() failure
*
* @param input_host The hostname with possible % wildcards
* @param output_host The normalized hostname (buffer must be preallocated)
* @return The calculated netmask or -1 on failure
*/
static int normalize_hostname(char *input_host, char *output_host)
{
int netmask, bytes, bits = 0, found_wildcard = 0;
char *p, *lasts, *tmp;
int useorig = 0;
output_host[0] = 0;
bytes = 0;
tmp = strdup(input_host);
if (tmp == NULL) {
return -1;
}
p = strtok_r(tmp, ".", &lasts);
while (p != NULL)
{
if (strcmp(p, "%"))
{
if (! isdigit(*p))
useorig = 1;
strcat(output_host, p);
bits += 8;
}
else if (bytes == 3)
{
found_wildcard = 1;
strcat(output_host, "1");
}
else
{
found_wildcard = 1;
strcat(output_host, "0");
}
bytes++;
p = strtok_r(NULL, ".", &lasts);
if (p)
strcat(output_host, ".");
}
if (found_wildcard)
{
netmask = bits;
while (bytes++ < 4)
{
if (bytes == 4)
{
strcat(output_host, ".1");
}
else
{
strcat(output_host, ".0");
}
}
}
else
netmask = 32;
if (useorig == 1)
{
netmask = 32;
strcpy(output_host, input_host);
}
free(tmp);
return netmask;
}

View File

@ -332,8 +332,7 @@ DOWNSTREAM *me;
if ((filter->obj = load_module(filter->module,
MODULE_FILTER)) == NULL)
{
me = NULL;
goto retblock;
return NULL;
}
}
@ -342,8 +341,7 @@ DOWNSTREAM *me;
if ((filter->filter = (filter->obj->createInstance)(filter->options,
filter->parameters)) == NULL)
{
me = NULL;
goto retblock;
return NULL;
}
}
if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL)
@ -355,7 +353,7 @@ DOWNSTREAM *me;
errno,
strerror(errno))));
goto retblock;
return NULL;
}
me->instance = filter->filter;
me->routeQuery = (void *)(filter->obj->routeQuery);
@ -363,12 +361,10 @@ DOWNSTREAM *me;
if ((me->session=filter->obj->newSession(me->instance, session)) == NULL)
{
free(me);
me = NULL;
goto retblock;
return NULL;
}
filter->obj->setDownstream(me->instance, me->session, downstream);
retblock:
return me;
}

View File

@ -637,55 +637,63 @@ static bool resolve_maxscale_homedir(
}
check_home_dir:
if (*p_home_dir != NULL)
{
if (!file_is_readable(*p_home_dir))
{
char* tailstr = "MaxScale doesn't have read permission "
"to MAXSCALE_HOME.";
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(tailstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(tailstr)+1,
"%s:%s",
log_context,
tailstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(logstr);
goto return_succp;
}
#if WRITABLE_HOME
if (!file_is_writable(*p_home_dir))
{
char* tailstr = "MaxScale doesn't have write permission "
"to MAXSCALE_HOME. Exiting.";
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(tailstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(tailstr)+1,
"%s:%s",
log_context,
tailstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(logstr);
goto return_succp;
}
#endif
if (!daemon_mode)
{
fprintf(stderr,
"Using %s as MAXSCALE_HOME = %s\n",
log_context,
tmp);
}
succp = true;
goto return_succp;
}
return_succp:
free (tmp);
if (*p_home_dir != NULL)
{
char* errstr;
errstr = check_dir_access(*p_home_dir);
if (errstr != NULL)
{
char* logstr = (char*)malloc(strlen(log_context)+
1+
strlen(errstr)+
1);
snprintf(logstr,
strlen(log_context)+
1+
strlen(errstr)+1,
"%s: %s",
log_context,
errstr);
print_log_n_stderr(true, true, logstr, logstr, 0);
free(errstr);
free(logstr);
succp = false;
}
else
{
succp = true;
if (!daemon_mode)
{
fprintf(stderr,
"Using %s as MAXSCALE_HOME = %s\n",
log_context,
(tmp == NULL ? *p_home_dir : tmp));
}
}
}
else
{
succp = false;
}
if (tmp != NULL)
{
free(tmp);
}
if (log_context != NULL)
{

View File

@ -293,7 +293,7 @@ GWBUF *modutil_create_mysql_err_msg(
const char *msg)
{
uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t *mysql_payload = NULL;
uint8_t field_count = 0;

View File

@ -227,8 +227,9 @@ static int reported = 0;
*/
int secrets_writeKeys(char *secret_file)
{
int fd;
MAXKEYS key;
int fd,randfd;
unsigned int randval;
MAXKEYS key;
/* Open for writing | Create | Truncate the file for writing */
if ((fd = open(secret_file, O_CREAT | O_WRONLY | O_TRUNC, S_IRUSR)) < 0)
@ -243,7 +244,28 @@ MAXKEYS key;
return 1;
}
srand(time(NULL));
/* Open for writing | Create | Truncate the file for writing */
if ((randfd = open("/dev/random", O_RDONLY)) < 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : failed opening /dev/random. Error %d, %s.",
errno,
strerror(errno))));
return 1;
}
if(read(randfd,(void*)&randval,sizeof(unsigned int)) < 1)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : failed to read /dev/random.")));
close(randfd);
return 1;
}
close(randfd);
srand(randval);
secrets_random_str(key.enckey, MAXSCALE_KEYLEN);
secrets_random_str(key.initvector, MAXSCALE_IV_LEN);

View File

@ -231,9 +231,9 @@ int set_and_get_mysql_users_wildcards(char *username, char *hostname, char *pass
service->users = mysql_users;
if (db_from != NULL)
strcpy(data->db, db_from);
strncpy(data->db, db_from,MYSQL_DATABASE_MAXLEN);
else
strcpy(data->db, "");
strncpy(data->db, "",MYSQL_DATABASE_MAXLEN);
/* freed by dcb_free(dcb) */
dcb->data = data;
@ -392,6 +392,22 @@ int main() {
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.%.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.254.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("pippo", "192.254.%", "foo", "192.254.0.242", NULL, NULL, NULL);
if (!ret) fprintf(stderr, "\t-- Expecting ok\n");
assert(ret == 0);
ret = set_and_get_mysql_users_wildcards("riccio", "192.0.0.%", "foo", "192.134.0.2", NULL, NULL, NULL);
if (ret) fprintf(stderr, "\t-- Expecting no match\n");
assert(ret == 1);

View File

@ -41,6 +41,8 @@
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
#define BINLOG_EVENT_HDR_LEN 19
/* How often to call the binlog status function (seconds) */
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
@ -64,9 +66,9 @@
* BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds)
* BLR_MAX_BACKOFF Maximum number of increments to backoff to
*/
#define BLR_MASTER_BACKOFF_TIME 5
#define BLR_MASTER_BACKOFF_TIME 10
#define BLR_MAX_BACKOFF 60
/**
* Some useful macros for examining the MySQL Response packets
*/
@ -128,6 +130,7 @@ typedef struct blfile {
*/
typedef struct {
int n_events; /*< Number of events sent */
unsigned long n_bytes; /*< Number of bytes sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
@ -138,6 +141,7 @@ typedef struct {
int n_above;
int n_failed_read;
int n_overrun;
int n_caughtup;
int n_actions[3];
uint64_t lastsample;
int minno;
@ -175,6 +179,7 @@ typedef struct router_slave {
*router; /*< Pointer to the owning router */
struct router_slave *next;
SLAVE_STATS stats; /*< Slave statistics */
time_t connect_time; /*< Connect time of slave */
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
@ -188,6 +193,7 @@ typedef struct {
int n_slaves; /*< Number slave sessions created */
int n_reads; /*< Number of record reads */
uint64_t n_binlogs; /*< Number of binlog records from master */
uint64_t n_binlogs_ses; /*< Number of binlog records from master */
uint64_t n_binlog_errors;/*< Number of binlog records from master */
uint64_t n_rotates; /*< Number of binlog rotate events */
uint64_t n_cachehits; /*< Number of hits on the binlog cache */
@ -265,10 +271,12 @@ typedef struct router_instance {
unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */
unsigned long heartbeat; /*< Configured heartbeat value */
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
int reconnect_pending;
int retry_backoff;
time_t connect_time;
int handling_threads;
struct router_instance
*next;
@ -278,25 +286,26 @@ typedef struct router_instance {
* State machine for the master to MaxScale replication
*/
#define BLRM_UNCONNECTED 0x0000
#define BLRM_AUTHENTICATED 0x0001
#define BLRM_TIMESTAMP 0x0002
#define BLRM_SERVERID 0x0003
#define BLRM_HBPERIOD 0x0004
#define BLRM_CHKSUM1 0x0005
#define BLRM_CHKSUM2 0x0006
#define BLRM_GTIDMODE 0x0007
#define BLRM_MUUID 0x0008
#define BLRM_SUUID 0x0009
#define BLRM_LATIN1 0x000A
#define BLRM_UTF8 0x000B
#define BLRM_SELECT1 0x000C
#define BLRM_SELECTVER 0x000D
#define BLRM_REGISTER 0x000E
#define BLRM_BINLOGDUMP 0x000F
#define BLRM_CONNECTING 0x0001
#define BLRM_AUTHENTICATED 0x0002
#define BLRM_TIMESTAMP 0x0003
#define BLRM_SERVERID 0x0004
#define BLRM_HBPERIOD 0x0005
#define BLRM_CHKSUM1 0x0006
#define BLRM_CHKSUM2 0x0007
#define BLRM_GTIDMODE 0x0008
#define BLRM_MUUID 0x0009
#define BLRM_SUUID 0x000A
#define BLRM_LATIN1 0x000B
#define BLRM_UTF8 0x000C
#define BLRM_SELECT1 0x000D
#define BLRM_SELECTVER 0x000E
#define BLRM_REGISTER 0x000F
#define BLRM_BINLOGDUMP 0x0010
#define BLRM_MAXSTATE 0x000F
#define BLRM_MAXSTATE 0x0010
static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval",
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
@ -371,6 +380,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define ANONYMOUS_GTID_EVENT 0x22
#define PREVIOUS_GTIDS_EVENT 0x23
#define MAX_EVENT_TYPE 0x23
/**
* Binlog event flags
*/

View File

@ -761,7 +761,7 @@ int log_no_master = 1;
/* log master detection failure od first master becomes available after failure */
if (root_master && mon_status_changed(root_master) && !(root_master->server->status & SERVER_STALE_STATUS)) {
if (root_master->pending_status & (SERVER_MASTER)) {
if (!(root_master->mon_prev_status & SERVER_STALE_STATUS)) {
if (!(root_master->mon_prev_status & SERVER_STALE_STATUS) && !(root_master->server->status & SERVER_MAINT)) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Info: A Master Server is now available: %s:%i",

View File

@ -177,7 +177,7 @@ HTTPD_session *client_data = NULL;
j++;
}
while (!ISspace(buf[j]) && (i < sizeof(url) - 1) && (j < sizeof(buf) - 1)) {
while ((j < sizeof(buf) - 1) && !ISspace(buf[j]) && (i < sizeof(url) - 1)) {
url[i] = buf[j];
i++; j++;
}

View File

@ -962,11 +962,20 @@ static int gw_create_backend_connection(
}
/** Copy client flags to backend protocol */
protocol->client_capabilities =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
/** Copy client charset to backend protocol */
protocol->charset =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
if (backend_dcb->session->client->protocol)
{
/** Copy client flags to backend protocol */
protocol->client_capabilities =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->client_capabilities;
/** Copy client charset to backend protocol */
protocol->charset =
((MySQLProtocol *)(backend_dcb->session->client->protocol))->charset;
}
else
{
protocol->client_capabilities = GW_MYSQL_CAPABILITIES_CLIENT;
protocol->charset = 0x08;
}
/*< if succeed, fd > 0, -1 otherwise */
rv = gw_do_connect_to_backend(server->name, server->port, &fd);
@ -1329,7 +1338,7 @@ static int gw_change_user(
/* now get the user, after 4 bytes header and 1 byte command */
client_auth_packet += 5;
strcpy(username, (char *)client_auth_packet);
strncpy(username, (char *)client_auth_packet,MYSQL_USER_MAXLEN);
client_auth_packet += strlen(username) + 1;
/* get the auth token len */
@ -1349,7 +1358,7 @@ static int gw_change_user(
}
/* get new database name */
strcpy(database, (char *)client_auth_packet);
strncpy(database, (char *)client_auth_packet,MYSQL_DATABASE_MAXLEN);
/* get character set */
if (strlen(database)) {
@ -1362,7 +1371,7 @@ static int gw_change_user(
memcpy(&backend_protocol->charset, client_auth_packet, sizeof(int));
/* save current_database name */
strcpy(current_database, current_session->db);
strncpy(current_database, current_session->db,MYSQL_DATABASE_MAXLEN);
/*
* Now clear database name in dcb as we don't do local authentication on db name for change user.

View File

@ -969,7 +969,7 @@ GWBUF* mysql_create_custom_error(
const char* msg)
{
uint8_t* outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t* mysql_payload = NULL;
uint8_t field_count = 0;
@ -1579,7 +1579,7 @@ mysql_send_auth_error (
const char *mysql_message)
{
uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint32_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t *mysql_payload = NULL;
uint8_t field_count = 0;

View File

@ -19,4 +19,7 @@ target_link_libraries(cli log_manager utils)
install(TARGETS cli DESTINATION modules)
add_subdirectory(readwritesplit)
if(BUILD_BINLOG)
add_subdirectory(binlog)
endif()

View File

@ -59,6 +59,8 @@
#include <mysql_client_server_protocol.h>
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static char *version_str = "V1.0.6";
@ -186,6 +188,8 @@ int i;
inst->long_burst = DEF_LONG_BURST;
inst->burst_size = DEF_BURST_SIZE;
inst->retry_backoff = 1;
inst->binlogdir = NULL;
inst->heartbeat = 300; // Default is every 5 minutes
/*
* We only support one server behind this router, since the server is
@ -306,6 +310,14 @@ int i;
inst->burst_size = size;
}
else if (strcmp(options[i], "heartbeat") == 0)
{
inst->heartbeat = atoi(value);
}
else if (strcmp(options[i], "binlogdir") == 0)
{
inst->binlogdir = strdup(value);
}
else
{
LOGIF(LE, (skygw_log_write(
@ -416,6 +428,7 @@ ROUTER_SLAVE *slave;
slave->router = inst;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0);
/**
* Add this session to the list of active sessions.
@ -509,9 +522,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
{
/*
* We must be closing the master session.
*
* TODO: Handle closure of master session
*/
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read,",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Binlog router close session with master server %s",
@ -529,6 +546,15 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
/* decrease server registered slaves counter */
atomic_add(&router->stats.n_registered, -1);
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Slave %s, server id %d, disconnected after %ld seconds. "
"%d events sent, %lu bytes.",
router->service->name, slave->dcb->remote,
slave->serverid,
time(0) - slave->connect_time, slave->stats.n_events,
slave->stats.n_bytes)));
/*
* Mark the slave as unregistered to prevent the forwarding
* of any more binlog records to this slave.
@ -641,25 +667,29 @@ struct tm tm;
min5 /= 5.0;
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
router_inst->master);
dcb_printf(dcb, "\tMaster connection state: %s\n",
dcb_printf(dcb, "\tMaster connection state: %s\n",
blrm_states[router_inst->master_state]);
localtime_r(&router_inst->stats.lastReply, &tm);
asctime_r(&tm, buf);
dcb_printf(dcb, "\tNumber of master connects: %d\n",
dcb_printf(dcb, "\tBinlog directory: %s\n",
router_inst->binlogdir);
dcb_printf(dcb, "\tNumber of master connects: %d\n",
router_inst->stats.n_masterstarts);
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n",
router_inst->stats.n_delayedreconnects);
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
router_inst->binlog_name);
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
dcb_printf(dcb, "\tCurrent binlog position: %u\n",
router_inst->binlog_position);
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
dcb_printf(dcb, "\tNumber of slave servers: %u\n",
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
dcb_printf(dcb, "\tNo. of binlog events received this session: %u\n",
router_inst->stats.n_binlogs_ses);
dcb_printf(dcb, "\tTotal no. of binlog events received: %u\n",
router_inst->stats.n_binlogs);
minno = router_inst->stats.minno - 1;
if (minno == -1)
@ -668,28 +698,31 @@ struct tm tm;
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
router_inst->stats.n_artificial);
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
dcb_printf(dcb, "\tNumber of binlog events in error: %u\n",
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
dcb_printf(dcb, "\tNumber of packets received: %u\n",
router_inst->stats.n_reads);
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
dcb_printf(dcb, "\tNumber of residual data packets: %u\n",
router_inst->stats.n_residuals);
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
dcb_printf(dcb, "\tAverage events per packet %.1f\n",
(double)router_inst->stats.n_binlogs / router_inst->stats.n_reads);
dcb_printf(dcb, "\tLast event from master at: %s",
dcb_printf(dcb, "\tLast event from master at: %s",
buf);
dcb_printf(dcb, "\t (%d seconds ago)\n",
time(0) - router_inst->stats.lastReply);
dcb_printf(dcb, "\tLast event from master: 0x%x\n",
router_inst->lastEventReceived);
dcb_printf(dcb, "\tLast event from master: 0x%x (%s)\n",
router_inst->lastEventReceived,
(router_inst->lastEventReceived >= 0 &&
router_inst->lastEventReceived < 0x24) ?
event_names[router_inst->lastEventReceived] : "unknown");
if (router_inst->active_logs)
dcb_printf(dcb, "\tRouter processing binlog records\n");
if (router_inst->reconnect_pending)
@ -697,7 +730,7 @@ struct tm tm;
dcb_printf(dcb, "\tEvents received:\n");
for (i = 0; i < 0x24; i++)
{
dcb_printf(dcb, "\t\t%-38s: %u\n", event_names[i], router_inst->stats.events[i]);
dcb_printf(dcb, "\t\t%-38s %u\n", event_names[i], router_inst->stats.events[i]);
}
#if SPINLOCK_PROFILE
@ -739,19 +772,44 @@ struct tm tm;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
dcb_printf(dcb,
"\t\tServer-id: %d\n",
session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb, "\t\tSlave DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tNext Sequence No: %d\n", session->seqno);
dcb_printf(dcb, "\t\tState: %s\n", blrs_states[session->state]);
dcb_printf(dcb, "\t\tBinlog file: %s\n", session->binlogfile);
dcb_printf(dcb, "\t\tBinlog position: %u\n", session->binlog_pos);
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
dcb_printf(dcb,
"\t\tSlave: %d\n",
session->dcb->remote);
dcb_printf(dcb,
"\t\tSlave DCB: %p\n",
session->dcb);
dcb_printf(dcb,
"\t\tNext Sequence No: %d\n",
session->seqno);
dcb_printf(dcb,
"\t\tState: %s\n",
blrs_states[session->state]);
dcb_printf(dcb,
"\t\tBinlog file: %s\n",
session->binlogfile);
dcb_printf(dcb,
"\t\tBinlog position: %u\n",
session->binlog_pos);
if (session->nocrc)
dcb_printf(dcb, "\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
dcb_printf(dcb,
"\t\tMaster Binlog CRC: None\n");
dcb_printf(dcb,
"\t\tNo. requests: %u\n",
session->stats.n_requests);
dcb_printf(dcb,
"\t\tNo. events sent: %u\n",
session->stats.n_events);
dcb_printf(dcb,
"\t\tNo. bursts sent: %u\n",
session->stats.n_bursts);
dcb_printf(dcb,
"\t\tNo. transitions to follow mode: %u\n",
session->stats.n_bursts);
minno = session->stats.minno - 1;
if (minno == -1)
minno = 30;
@ -760,15 +818,18 @@ struct tm tm;
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
#if DETAILED_DIAG
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
dcb_printf(dcb, "\t\tNo. of distribute action 2 %u\n", session->stats.n_actions[1]);
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
#endif
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
@ -793,7 +854,7 @@ struct tm tm;
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
dcb_printf(dcb, "\n");
dcb_printf(dcb, "\t\t--------------------\n\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
@ -822,6 +883,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
router->stats.lastReply = time(0);
}
static char *
extract_message(GWBUF *errpkt)
{
char *rval;
int len;
len = EXTRACT24(errpkt->start);
if ((rval = (char *)malloc(len)) == NULL)
return NULL;
memcpy(rval, (char *)(errpkt->start) + 7, 6);
rval[6] = ' ';
memcpy(&rval[7], (char *)(errpkt->start) + 13, len - 8);
rval[len-2] = 0;
return rval;
}
/**
* Error Reply routine
*
@ -841,10 +920,10 @@ errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
char msg[85];
char msg[85], *errmsg;
len = sizeof(error);
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
if (router->master && getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0 && error != 0)
{
strerror_r(error, msg, 80);
strcat(msg, " ");
@ -852,10 +931,21 @@ char msg[85];
else
strcpy(msg, "");
errmsg = extract_message(message);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg)));
LOGFILE_ERROR, "%s: Master connection error '%s' in state '%s', "
"%sattempting reconnect to master",
router->service->name, errmsg,
blrm_states[router->master_state], msg)));
if (errmsg)
free(errmsg);
*succp = true;
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"%s: Master %s disconnected after %ld seconds. "
"%d events read.",
router->service->name, router->master->remote,
time(0) - router->connect_time, router->stats.n_binlogs_ses)));
blr_master_reconnect(router);
}

View File

@ -54,6 +54,8 @@
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**

View File

@ -51,6 +51,8 @@
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
@ -75,18 +77,27 @@ int root_len, i;
DIR *dirp;
struct dirent *dp;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
if (router->binlogdir == NULL)
{
strcpy(path, ptr);
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
}
if (access(router->binlogdir, R_OK) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"%s: Unable to read the binlog directory %s.",
router->service->name, router->binlogdir)));
}
strcat(path, "/");
strcat(path, router->service->name);
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
@ -353,7 +364,7 @@ struct stat statb;
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
n, file->binlogname, pos)));
break;
}
return NULL;
@ -364,6 +375,17 @@ struct stat statb;
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->event_type > MAX_EVENT_TYPE)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. "
"Binlog file is %s, position %d",
hdr->event_type,
file->binlogname, pos)));
return NULL;
}
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,

View File

@ -63,6 +63,8 @@
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
@ -91,6 +93,18 @@ blr_start_master(ROUTER_INSTANCE *router)
DCB *client;
GWBUF *buf;
router->stats.n_binlogs_ses = 0;
spinlock_acquire(&router->lock);
if (router->master_state != BLRM_UNCONNECTED)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state])));
spinlock_release(&router->lock);
return;
}
router->master_state = BLRM_CONNECTING;
spinlock_release(&router->lock);
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
@ -98,6 +112,7 @@ GWBUF *buf;
return;
}
router->client = client;
client->state = DCB_STATE_POLLING; /* Fake the client is reading */
client->data = CreateMySQLAuthData(router->user, router->password, "");
if ((router->session = session_alloc(router->service, client)) == NULL)
{
@ -108,17 +123,27 @@ GWBUF *buf;
client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
char *name;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master") + 1)) != NULL)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
router->retry_backoff = BLR_MAX_BACKOFF;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name)));
return;
}
router->master->remote = strdup(router->service->databases->name);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: atempting to connect to master server %s.",
router->service->name, router->master->remote)));
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
perror("setsockopt");
@ -129,7 +154,6 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
router->retry_backoff = 1;
}
/**
@ -160,7 +184,27 @@ GWBUF *ptr;
router->reconnect_pending = 0;
router->active_logs = 0;
spinlock_release(&router->lock);
blr_start_master(router);
if (router->master_state < BLRM_BINLOGDUMP)
{
char *name;
router->master_state = BLRM_UNCONNECTED;
if ((name = malloc(strlen(router->service->name)
+ strlen(" Master")+1)) != NULL);
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
}
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = BLR_MAX_BACKOFF;
}
else
{
router->master_state = BLRM_UNCONNECTED;
blr_start_master(router);
}
}
/**
@ -225,8 +269,9 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
router->master_state)));
LOGFILE_ERROR,
"Invalid master state machine state (%d) for binlog router.",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
if (router->reconnect_pending)
@ -234,6 +279,12 @@ char query[128];
router->active_logs = 0;
spinlock_release(&router->lock);
atomic_add(&router->handling_threads, -1);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"%s: Pending reconnect in state %s.",
router->service->name,
blrm_states[router->master_state]
)));
blr_restart_master(router);
return;
}
@ -247,8 +298,11 @@ char query[128];
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
"%s: Received error: %d, %s from master during %s phase "
"of the master state machine.",
router->service->name,
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf),
blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
@ -272,12 +326,17 @@ char query[128];
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'");
router->master_state = BLRM_SERVERID;
router->master->func.write(router->master, buf);
router->retry_backoff = 1;
break;
case BLRM_SERVERID:
// Response to fetch of master's server-id
router->saved_master.server_id = buf;
// TODO: Extract the value of server-id and place in router->master_id
buf = blr_make_query("SET @master_heartbeat_period = 1799999979520");
{
char str[80];
sprintf(str, "SET @master_heartbeat_period = %lu000000000", router->heartbeat);
buf = blr_make_query(str);
}
router->master_state = BLRM_HBPERIOD;
router->master->func.write(router->master, buf);
break;
@ -357,6 +416,12 @@ char query[128];
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"%s: Request binlog records from %s at "
"position %d from master server %s.",
router->service->name, router->binlog_name,
router->binlog_position, router->master->remote)));
break;
case BLRM_BINLOGDUMP:
// Main body, we have received a binlog record from the master
@ -618,133 +683,172 @@ static REP_HEADER phdr;
n_bufs = 1;
}
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
if (len < BINLOG_EVENT_HDR_LEN)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d"
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
char *msg = "";
if (ptr[4] == 0xfe) /* EOF Packet */
{
free(msg);
msg = NULL;
msg = "end of file";
}
break;
else if (ptr[4] == 0xff) /* EOF Packet */
{
msg = "error";
}
LOGIF(LM,(skygw_log_write(
LOGFILE_MESSAGE,
"Non-event message (%s) from master.",
msg)));
}
phdr = hdr;
if (hdr.ok == 0)
else
{
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
router->lastEventReceived = hdr.event_type;
blr_extract_header(ptr, &hdr);
if (hdr.event_size != len - 5)
{
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Packet length is %d, but event size is %d, "
"binlog file %s position %d "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->binlog_position,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" : "Residual data from previous call") : "")
)));
blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len);
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs)));
if (msg)
{
free(msg);
msg = NULL;
}
break;
}
phdr = hdr;
if (hdr.ok == 0)
{
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
{
/*
* We need to save this to replay to new
* slaves that attach later.
*/
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_len = hdr.event_size;
router->saved_master.fde_event = malloc(hdr.event_size);
if (router->saved_master.fde_event)
memcpy(router->saved_master.fde_event,
ptr + 5, hdr.event_size);
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{
uint8_t *new_fde;
unsigned int new_fde_len;
/*
* We need to save this to replay to new
* slaves that attach later.
*/
new_fde_len = hdr.event_size;
new_fde = malloc(hdr.event_size);
if (new_fde)
{
memcpy(new_fde, ptr + 5, hdr.event_size);
if (router->saved_master.fde_event)
free(router->saved_master.fde_event);
router->saved_master.fde_event = new_fde;
router->saved_master.fde_len = new_fde_len;
}
else
{
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"%s: Received a format description "
"event that MaxScale was unable to "
"record. Event length is %d.",
router->service->name,
hdr.event_size)));
blr_log_packet(LOGFILE_ERROR,
"Format Description Event:", ptr, len);
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
}
}
else
{
if (hdr.event_type == HEARTBEAT_EVENT)
{
#ifdef SHOW_EVENTS
printf("Replication heartbeat\n");
#endif
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
blr_rotate_event(router, ptr, &hdr);
}
blr_distribute_binlog_record(router, &hdr, ptr);
}
else
{
router->stats.n_artificial++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
router->binlog_position)));
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
}
else
{
printf("Binlog router error: %s\n", &ptr[7]);
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
ptr, len);
router->stats.n_binlog_errors++;
}
if (msg)
{
@ -968,6 +1072,7 @@ int action;
{
blr_slave_rotate(slave, ptr);
}
slave->stats.n_bytes += gwbuf_length(pkt);
slave->dcb->func.write(slave->dcb, pkt);
if (hdr->event_type != ROTATE_EVENT)
{

View File

@ -65,8 +65,11 @@ int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static void blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask;
extern size_t log_ses_count[];
extern __thread log_info_t tls_log_info;
/**
* Process a request packet from the slave server.
@ -544,29 +547,8 @@ uint32_t chksum;
rval = slave->dcb->func.write(slave->dcb, resp);
/* Send the FORMAT_DESCRIPTION_EVENT */
if (router->saved_master.fde_event)
{
resp = gwbuf_alloc(router->saved_master.fde_len + 5);
ptr = GWBUF_DATA(resp);
encode_value(ptr, router->saved_master.fde_len + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
memcpy(ptr, router->saved_master.fde_event, router->saved_master.fde_len);
encode_value(ptr, time(0), 32); // Overwrite timestamp
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(resp) + 5 + router->saved_master.fde_len - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, router->saved_master.fde_len - 4);
encode_value(ptr, chksum, 32);
rval = slave->dcb->func.write(slave->dcb, resp);
}
if (slave->binlog_pos != 4)
blr_slave_send_fde(router, slave);
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
@ -575,8 +557,9 @@ uint32_t chksum;
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"%s: New slave %s requested binlog file %s from position %lu",
"%s: New slave %s, server id %d, requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote,
slave->serverid,
slave->binlogfile, slave->binlog_pos)));
if (slave->binlog_pos != router->binlog_position ||
@ -782,6 +765,7 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
slave->stats.n_bytes += gwbuf_length(head);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
@ -839,11 +823,23 @@ if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
if (state_change)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
slave->stats.n_caughtup++;
if (slave->stats.n_caughtup == 1)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
else if ((slave->stats.n_caughtup % 50) == 0)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
}
}
else
@ -1031,3 +1027,51 @@ uint32_t chksum;
slave->dcb->func.write(slave->dcb, resp);
return 1;
}
/**
* Send a "fake" format description event to the newly connected slave
*
* @param router The router instance
* @param slave The slave to send the event to
*/
static void
blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
BLFILE *file;
REP_HEADER hdr;
GWBUF *record, *head;
uint8_t *ptr;
uint32_t chksum;
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return;
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
{
blr_close_binlog(router, file);
return;
}
blr_close_binlog(router, file);
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24); // Payload length
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
ptr = GWBUF_DATA(record);
encode_value(ptr, time(0), 32); // Overwrite timestamp
ptr += 13;
encode_value(ptr, 0, 32); // Set next position to 0
/*
* Since we have changed the timestamp we must recalculate the CRC
*
* Position ptr to the start of the event header,
* calculate a new checksum
* and write it into the header
*/
ptr = GWBUF_DATA(record) + hdr.event_size - 4;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(record), hdr.event_size - 4);
encode_value(ptr, chksum, 32);
slave->dcb->func.write(slave->dcb, head);
}

View File

@ -1,4 +1,4 @@
#!/bin/sh
#!/bin/bash
NARGS=7
TLOG=$1
THOST=$2