Merge branch 'binlog_router_mariadb10' into dev-mdb10

Conflicts:
	server/core/config.c
This commit is contained in:
Markus Makela
2015-06-22 19:16:38 +03:00
6 changed files with 138 additions and 64 deletions

View File

@ -428,7 +428,7 @@ hashtable_memory_fns(monitorhash,strdup,NULL,free,NULL);
/** Add the 5.5.5- string to the start of the version string if /** Add the 5.5.5- string to the start of the version string if
* the version string starts with "10.". * the version string starts with "10.".
* This mimics MariaDB 10.0 behavior which adds 5.5.5- for backwards compatibility. */ * This mimics MariaDB 10.0 replication which adds 5.5.5- for backwards compatibility. */
if(strncmp(version_string,"10.",3) == 0) if(strncmp(version_string,"10.",3) == 0)
{ {
((SERVICE *)(obj->element))->version_string = malloc((strlen(version_string) + ((SERVICE *)(obj->element))->version_string = malloc((strlen(version_string) +

View File

@ -24,8 +24,9 @@
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 02/04/14 Mark Riddoch Initial implementation * 02/04/14 Mark Riddoch Initial implementation
* 11/05/15 Massimilaino Pinto Added mariadb10_compat to master and slave structs
* *
* @endverbatim * @endverbatim
*/ */
@ -176,6 +177,7 @@ typedef struct router_slave {
uint32_t lastEventTimestamp;/*< Last event timestamp sent */ uint32_t lastEventTimestamp;/*< Last event timestamp sent */
SPINLOCK catch_lock; /*< Event catchup lock */ SPINLOCK catch_lock; /*< Event catchup lock */
unsigned int cstate; /*< Catch up state */ unsigned int cstate; /*< Catch up state */
bool mariadb10_compat;/*< MariaDB 10.0 compatibility */
SPINLOCK rses_lock; /*< Protects rses_deleted */ SPINLOCK rses_lock; /*< Protects rses_deleted */
pthread_t pthread; pthread_t pthread;
struct router_instance struct router_instance
@ -234,6 +236,7 @@ typedef struct {
GWBUF *selectvercom; /*< select @@version_comment */ GWBUF *selectvercom; /*< select @@version_comment */
GWBUF *selecthostname;/*< select @@hostname */ GWBUF *selecthostname;/*< select @@hostname */
GWBUF *map; /*< select @@max_allowed_packet */ GWBUF *map; /*< select @@max_allowed_packet */
GWBUF *mariadb10; /*< set @mariadb_slave_capability */
uint8_t *fde_event; /*< Format Description Event */ uint8_t *fde_event; /*< Format Description Event */
int fde_len; /*< Length of fde_event */ int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES; } MASTER_RESPONSES;
@ -242,54 +245,54 @@ typedef struct {
* The per instance data for the router. * The per instance data for the router.
*/ */
typedef struct router_instance { typedef struct router_instance {
SERVICE *service; /*< Pointer to the service using this router */ SERVICE *service; /*< Pointer to the service using this router */
ROUTER_SLAVE *slaves; /*< Link list of all the slave connections */ ROUTER_SLAVE *slaves; /*< Link list of all the slave connections */
SPINLOCK lock; /*< Spinlock for the instance data */ SPINLOCK lock; /*< Spinlock for the instance data */
char *uuid; /*< UUID for the router to use w/master */ char *uuid; /*< UUID for the router to use w/master */
int masterid; /*< Server ID of the master */ int masterid; /*< Server ID of the master */
int serverid; /*< Server ID to use with master */ int serverid; /*< Server ID to use with master */
int initbinlog; /*< Initial binlog file number */ int initbinlog; /*< Initial binlog file number */
char *user; /*< User name to use with master */ char *user; /*< User name to use with master */
char *password; /*< Password to use with master */ char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */ char *fileroot; /*< Root of binlog filename */
bool master_chksum;/*< Does the master provide checksums */ bool master_chksum; /*< Does the master provide checksums */
char *master_uuid; /*< UUID of the master */ bool mariadb10_compat; /*< MariaDB 10.0 compatibility */
DCB *master; /*< DCB for master connection */ char *master_uuid; /*< UUID of the master */
DCB *client; /*< DCB for dummy client */ DCB *master; /*< DCB for master connection */
SESSION *session; /*< Fake session for master connection */ DCB *client; /*< DCB for dummy client */
unsigned int master_state; /*< State of the master FSM */ SESSION *session; /*< Fake session for master connection */
uint8_t lastEventReceived; unsigned int master_state; /*< State of the master FSM */
uint32_t lastEventTimestamp; /*< Timestamp from last event */ uint8_t lastEventReceived;
GWBUF *residual; /*< Any residual binlog event */ uint32_t lastEventTimestamp; /*< Timestamp from last event */
MASTER_RESPONSES saved_master; /*< Saved master responses */ GWBUF *residual; /*< Any residual binlog event */
char *binlogdir; /*< The directory with the binlog files */ MASTER_RESPONSES saved_master; /*< Saved master responses */
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ char *binlogdir; /*< The directory with the binlog files */
char binlog_name[BINLOG_FNAMELEN+1]; SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */ /*< Name of the current binlog file */
uint64_t binlog_position; uint64_t binlog_position;
/*< Current binlog position */ /*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog int binlog_fd; /*< File descriptor of the binlog
* file being written * file being written
*/ */
uint64_t last_written; /*< Position of last event written */ uint64_t last_written; /*< Position of last event written */
char prevbinlog[BINLOG_FNAMELEN+1]; char prevbinlog[BINLOG_FNAMELEN+1];
int rotating; /*< Rotation in progress flag */ int rotating; /*< Rotation in progress flag */
BLFILE *files; /*< Files used by the slaves */ BLFILE *files; /*< Files used by the slaves */
SPINLOCK fileslock; /*< Lock for the files queue above */ SPINLOCK fileslock; /*< Lock for the files queue above */
unsigned int low_water; /*< Low water mark for client DCB */ unsigned int low_water; /*< Low water mark for client DCB */
unsigned int high_water; /*< High water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */
unsigned int short_burst; /*< Short burst for slave catchup */ unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */ unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */ unsigned long burst_size; /*< Maximum size of burst to send */
unsigned long heartbeat; /*< Configured heartbeat value */ unsigned long heartbeat; /*< Configured heartbeat value */
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */
int active_logs; int active_logs;
int reconnect_pending; int reconnect_pending;
int retry_backoff; int retry_backoff;
time_t connect_time; time_t connect_time;
int handling_threads; int handling_threads;
struct router_instance struct router_instance *next;
*next;
} ROUTER_INSTANCE; } ROUTER_INSTANCE;
/** /**
@ -315,15 +318,16 @@ typedef struct router_instance {
#define BLRM_MAP 0x0011 #define BLRM_MAP 0x0011
#define BLRM_REGISTER 0x0012 #define BLRM_REGISTER 0x0012
#define BLRM_BINLOGDUMP 0x0013 #define BLRM_BINLOGDUMP 0x0013
#define BLRM_MARIADB10 0x0014
#define BLRM_MAXSTATE 0x0013 #define BLRM_MAXSTATE 0x0014
static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval", static char *blrm_states[] = { "Unconnected", "Connecting", "Authenticated", "Timestamp retrieval",
"Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config",
"binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval",
"Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1",
"select version()", "select @@version_comment", "select @@hostname", "select version()", "select @@version_comment", "select @@hostname",
"select @@mx_allowed_packet", "Register slave", "Binlog Dump" }; "select @@mx_allowed_packet", "Register slave", "Binlog Dump", "Set MariaDB slave capability" };
#define BLRS_CREATED 0x0000 #define BLRS_CREATED 0x0000
#define BLRS_UNREGISTERED 0x0001 #define BLRS_UNREGISTERED 0x0001
@ -397,6 +401,7 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define PREVIOUS_GTIDS_EVENT 0x23 #define PREVIOUS_GTIDS_EVENT 0x23
#define MAX_EVENT_TYPE 0x23 #define MAX_EVENT_TYPE 0x23
#define MAX_EVENT_TYPE_MARIADB10 0xa3
/** /**
* Binlog event flags * Binlog event flags

View File

@ -35,6 +35,8 @@
* 02/04/2014 Mark Riddoch Initial implementation * 02/04/2014 Mark Riddoch Initial implementation
* 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics * 17/02/2015 Massimiliano Pinto Addition of slave port and username in diagnostics
* 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession * 18/02/2015 Massimiliano Pinto Addition of dcb_close in closeSession
* 07/05/2015 Massimiliano Pinto Addition of MariaDB 10 compatibility support
* *
* @endverbatim * @endverbatim
*/ */
@ -195,6 +197,7 @@ unsigned char *defuuid;
inst->retry_backoff = 1; inst->retry_backoff = 1;
inst->binlogdir = NULL; inst->binlogdir = NULL;
inst->heartbeat = 300; // Default is every 5 minutes inst->heartbeat = 300; // Default is every 5 minutes
inst->mariadb10_compat = false;
inst->user = strdup(service->credentials.name); inst->user = strdup(service->credentials.name);
inst->password = strdup(service->credentials.authdata); inst->password = strdup(service->credentials.authdata);
@ -282,6 +285,10 @@ unsigned char *defuuid;
{ {
inst->masterid = atoi(value); inst->masterid = atoi(value);
} }
else if (strcmp(options[i], "mariadb10-compatibility") == 0)
{
inst->mariadb10_compat = config_truth_value(value);
}
else if (strcmp(options[i], "filestem") == 0) else if (strcmp(options[i], "filestem") == 0)
{ {
inst->fileroot = strdup(value); inst->fileroot = strdup(value);
@ -388,6 +395,7 @@ unsigned char *defuuid;
inst->saved_master.selectvercom = blr_cache_read_response(inst, "selectvercom"); inst->saved_master.selectvercom = blr_cache_read_response(inst, "selectvercom");
inst->saved_master.selecthostname = blr_cache_read_response(inst, "selecthostname"); inst->saved_master.selecthostname = blr_cache_read_response(inst, "selecthostname");
inst->saved_master.map = blr_cache_read_response(inst, "map"); inst->saved_master.map = blr_cache_read_response(inst, "map");
inst->saved_master.mariadb10 = blr_cache_read_response(inst, "mariadb10");
/* /*
* Initialise the binlog file and position * Initialise the binlog file and position
@ -490,6 +498,7 @@ ROUTER_SLAVE *slave;
strcpy(slave->binlogfile, "unassigned"); strcpy(slave->binlogfile, "unassigned");
slave->connect_time = time(0); slave->connect_time = time(0);
slave->lastEventTimestamp = 0; slave->lastEventTimestamp = 0;
slave->mariadb10_compat = false;
/** /**
* Add this session to the list of active sessions. * Add this session to the list of active sessions.

View File

@ -23,8 +23,9 @@
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 14/04/2014 Mark Riddoch Initial implementation * 14/04/2014 Mark Riddoch Initial implementation
* 07/05/2015 Massimiliano Pinto Added MAX_EVENT_TYPE_MARIADB10
* *
* @endverbatim * @endverbatim
*/ */
@ -439,15 +440,26 @@ struct stat statb;
hdr->next_pos = EXTRACT32(&hdbuf[13]); hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]); hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->event_type > MAX_EVENT_TYPE) if (router->mariadb10_compat) {
{ if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. " "Invalid MariaDB 10 event type 0x%x. "
"Binlog file is %s, position %d", "Binlog file is %s, position %d",
hdr->event_type, hdr->event_type,
file->binlogname, pos))); file->binlogname, pos)));
return NULL; return NULL;
} }
} else {
if (hdr->event_type > MAX_EVENT_TYPE) {
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Invalid event type 0x%x. "
"Binlog file is %s, position %d",
hdr->event_type,
file->binlogname, pos)));
return NULL;
}
}
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT) if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{ {

View File

@ -31,8 +31,9 @@
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 02/04/2014 Mark Riddoch Initial implementation * 02/04/2014 Mark Riddoch Initial implementation
* 07/05/2015 Massimiliano Pinto Added MariaDB 10 Compatibility
* *
* @endverbatim * @endverbatim
*/ */
@ -448,11 +449,27 @@ char query[128];
GWBUF_CONSUME_ALL(router->saved_master.chksum2); GWBUF_CONSUME_ALL(router->saved_master.chksum2);
router->saved_master.chksum2 = buf; router->saved_master.chksum2 = buf;
blr_cache_response(router, "chksum2", buf); blr_cache_response(router, "chksum2", buf);
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE; if (router->mariadb10_compat) {
buf = blr_make_query("SET @mariadb_slave_capability=4");
router->master_state = BLRM_MARIADB10;
} else {
buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE");
router->master_state = BLRM_GTIDMODE;
}
router->master->func.write(router->master, buf); router->master->func.write(router->master, buf);
break; break;
} }
case BLRM_MARIADB10:
// Response to the SET @mariadb_slave_capability=4, should be stored
if (router->saved_master.mariadb10)
GWBUF_CONSUME_ALL(router->saved_master.mariadb10);
router->saved_master.mariadb10 = buf;
blr_cache_response(router, "mariadb10", buf);
buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'");
router->master_state = BLRM_MUUID;
router->master->func.write(router->master, buf);
break;
case BLRM_GTIDMODE: case BLRM_GTIDMODE:
// Response to the GTID_MODE, should be stored // Response to the GTID_MODE, should be stored
if (router->saved_master.gtid_mode) if (router->saved_master.gtid_mode)

View File

@ -36,9 +36,12 @@
* 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id * 18/02/2015 Massimiliano Pinto Addition of DISCONNECT ALL and DISCONNECT SERVER server_id
* 18/03/2015 Markus Makela Better detection of CRC32 | NONE checksum * 18/03/2015 Markus Makela Better detection of CRC32 | NONE checksum
* 19/03/2015 Massimiliano Pinto Addition of basic MariaDB 10 compatibility support * 19/03/2015 Massimiliano Pinto Addition of basic MariaDB 10 compatibility support
* 07/05/2015 Massimiliano Pinto Added MariaDB 10 Compatibility
* 11/05/2015 Massimiliano Pinto Only MariaDB 10 Slaves can register to binlog router with a MariaDB 10 Master
* *
* @endverbatim * @endverbatim
*/ */
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -123,7 +126,28 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
return blr_slave_query(router, slave, queue); return blr_slave_query(router, slave, queue);
break; break;
case COM_REGISTER_SLAVE: case COM_REGISTER_SLAVE:
return blr_slave_register(router, slave, queue); /*
* If Master is MariaDB10 don't allow registration from
* MariaDB/Mysql 5 Slaves
*/
if (router->mariadb10_compat && !slave->mariadb10_compat) {
slave->state = BLRS_ERRORED;
blr_send_custom_error(slave->dcb, 1, 0,
"MariaDB 10 Slave is required for Slave registration");
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"%s: Slave %s: a MariaDB 10 Slave is required for Slave registration",
router->service->name,
slave->dcb->remote)));
dcb_close(slave->dcb);
return 1;
} else {
/* Master and Slave version OK: continue with slave registration */
return blr_slave_register(router, slave, queue);
}
break; break;
case COM_BINLOG_DUMP: case COM_BINLOG_DUMP:
return blr_slave_binlog_dump(router, slave, queue); return blr_slave_binlog_dump(router, slave, queue);
@ -366,10 +390,17 @@ int query_len;
free(query_text); free(query_text);
return blr_slave_replay(router, slave, router->saved_master.heartbeat); return blr_slave_replay(router, slave, router->saved_master.heartbeat);
} }
else if (strcasecmp(word, "@mariadb_slave_capability") == 0) else if (strcasecmp(word, "@mariadb_slave_capability") == 0)
{ {
free(query_text); /* mariadb10 compatibility is set for the slave */
return blr_slave_send_ok(router, slave); slave->mariadb10_compat=true;
free(query_text);
if (router->mariadb10_compat) {
return blr_slave_replay(router, slave, router->saved_master.mariadb10);
} else {
return blr_slave_send_ok(router, slave);
}
} }
else if (strcasecmp(word, "@master_binlog_checksum") == 0) else if (strcasecmp(word, "@master_binlog_checksum") == 0)
{ {
@ -442,7 +473,7 @@ int query_len;
query_text = strndup(qtext, query_len); query_text = strndup(qtext, query_len);
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Unexpected query from slave server %s", query_text))); LOGFILE_ERROR, "Unexpected query from slave %s: %s", slave->dcb->remote, query_text)));
free(query_text); free(query_text);
blr_slave_send_error(router, slave, "Unexpected SQL query received from slave."); blr_slave_send_error(router, slave, "Unexpected SQL query received from slave.");
return 1; return 1;