Added support for new options

Added support for new options:

master_uuid,  master_hostname, master_version

If set, those values will be sent to slaves instead of
saved master responses
This commit is contained in:
MassimilianoPinto
2015-08-21 16:31:40 +02:00
parent 34bd94fbcd
commit 4242d28e13
4 changed files with 245 additions and 23 deletions

View File

@ -357,15 +357,15 @@ typedef struct router_instance {
ROUTER_SLAVE *slaves; /*< Link list of all the slave connections */
SPINLOCK lock; /*< Spinlock for the instance data */
char *uuid; /*< UUID for the router to use w/master */
int masterid; /*< Server ID of the master */
int serverid; /*< Server ID to use with master */
int masterid; /*< Set ID of the master, sent to slaves */
int serverid; /*< ID for the router to use w/master */
int initbinlog; /*< Initial binlog file number */
char *user; /*< User name to use with master */
char *password; /*< Password to use with master */
char *fileroot; /*< Root of binlog filename */
bool master_chksum; /*< Does the master provide checksums */
bool mariadb10_compat; /*< MariaDB 10.0 compatibility */
char *master_uuid; /*< UUID of the master */
char *master_uuid; /*< Set UUID of the master, sent to slaves */
DCB *master; /*< DCB for master connection */
DCB *client; /*< DCB for dummy client */
SESSION *session; /*< Fake session for master connection */
@ -407,6 +407,10 @@ typedef struct router_instance {
int handling_threads;
unsigned long m_errno; /*< master response mysql errno */
char *m_errmsg; /*< master response mysql error message */
char *set_master_version; /*< Send custom Version to slaves */
char *set_master_hostname; /*< Send custom Hostname to slaves */
char *set_master_uuid; /*< Send custom Master UUID to slaves */
char *set_master_server_id; /*< Send custom Master server_id to slaves */
struct router_instance *next;
} ROUTER_INSTANCE;

View File

@ -41,6 +41,10 @@
* If not found router goes into BLRM_UNCONFIGURED state.
* Cache dir is 'cache' under router->binlogdir.
* 07/08/2015 Massimiliano Pinto Addition of binlog check at startup if trx_safe is on
* 21/08/2015 Massimiliano Pinto Added support for new config options:
* master_uuid, master_hostname, master_version
* If set those values are sent to slaves instead of
* saved master responses
*
* @endverbatim
*/
@ -277,6 +281,11 @@ int rc = 0;
inst->pending_transaction = 0;
inst->last_safe_pos = 0;
inst->set_master_version = NULL;
inst->set_master_hostname = NULL;
inst->set_master_uuid = NULL;
inst->set_master_server_id = NULL;
my_uuid_init((ulong)rand()*12345,12345);
if ((defuuid = (unsigned char *)malloc(20)) != NULL)
{
@ -340,6 +349,19 @@ int rc = 0;
else if (strcmp(options[i], "master-id") == 0)
{
inst->masterid = atoi(value);
inst->set_master_server_id = value;
}
else if (strcmp(options[i], "master_uuid") == 0)
{
inst->set_master_uuid = strdup(value);
}
else if (strcmp(options[i], "master_version") == 0)
{
inst->set_master_version = strdup(value);
}
else if (strcmp(options[i], "master_hostname") == 0)
{
inst->set_master_hostname = strdup(value);
}
else if (strcmp(options[i], "mariadb10-compatibility") == 0)
{
@ -349,17 +371,13 @@ int rc = 0;
{
inst->fileroot = strdup(value);
}
else if (strcmp(options[i], "initialfile") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "file") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "transaction_safety") == 0)
{
inst->trx_safe = atoi(value);
inst->trx_safe = config_truth_value(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{

View File

@ -86,7 +86,7 @@ static void *CreateMySQLAuthData(char *username, char *password, char *database)
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
void blr_master_close(ROUTER_INSTANCE *);
static char *blr_extract_column(GWBUF *buf, int col);
char *blr_extract_column(GWBUF *buf, int col);
void blr_cache_response(ROUTER_INSTANCE *router, char *response, GWBUF *buf);
void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router, unsigned long long pos, REP_HEADER *hdr);
@ -533,9 +533,11 @@ char query[128];
if (key)
free(key);
if (router->master_uuid)
free(router->master_uuid);
router->master_uuid = val;
/* set the master_uuid from master if not set by the option */
if (router->set_master_uuid == NULL)
router->master_uuid = val;
else
router->master_uuid = router->set_master_uuid;
// Response to the SERVER_UUID, should be stored
if (router->saved_master.uuid)
@ -948,7 +950,6 @@ int n_bufs = -1, pn_bufs = -1;
{
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
router->lastEventReceived = hdr.event_type;
blr_extract_header(ptr, &hdr);
@ -996,16 +997,12 @@ int n_bufs = -1, pn_bufs = -1;
free(router->m_errmsg);
router->m_errmsg = NULL;
router->stats.n_binlogs++;
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", hdr.event_type, hdr.flags, hdr.event_size, hdr.timestamp);
#endif
spinlock_release(&router->lock);
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
@ -1039,6 +1036,12 @@ int n_bufs = -1, pn_bufs = -1;
}
}
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
/**
* Check for an open transaction, if the option is set
* Only complete transactions should be sent to sleves
@ -1711,7 +1714,7 @@ blr_master_connected(ROUTER_INSTANCE *router)
* @param col The column number to return
* @return The result form the column or NULL. The caller must free the result
*/
static char *
char *
blr_extract_column(GWBUF *buf, int col)
{
uint8_t *ptr;

View File

@ -48,6 +48,10 @@
* 29/06/2015 Massimiliano Pinto Successfully CHANGE MASTER results in updating master.ini
* in blr_handle_change_master()
* 20/08/2015 Massimiliano Pinto Added parsing and validation for CHANGE MASTER TO
* 21/08/2015 Massimiliano Pinto Added support for new config options:
* master_uuid, master_hostname, master_version
* If set those values are sent to slaves instead of
* saved master responses
*
* @endverbatim
*/
@ -78,6 +82,7 @@ extern void blr_master_close(ROUTER_INSTANCE* router);
extern void blr_file_use_binlog(ROUTER_INSTANCE *router, char *file);
extern int blr_file_new_binlog(ROUTER_INSTANCE *router, char *file);
extern int blr_file_write_master_config(ROUTER_INSTANCE *router, char *error);
extern char *blr_extract_column(GWBUF *buf, int col);
int blr_file_get_next_binlogname(ROUTER_INSTANCE *router);
uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
@ -125,6 +130,9 @@ static int blr_set_master_password(ROUTER_INSTANCE *router, char *password);
static int blr_parse_change_master_command(char *input, char *error_string, CHANGE_MASTER_OPTIONS *config);
static int blr_handle_change_master_token(char *input, char *error, CHANGE_MASTER_OPTIONS *config);
static void blr_master_free_parsed_options(CHANGE_MASTER_OPTIONS *options);
static int blr_slave_send_var_value(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *variable, char *value);
static int blr_slave_send_variable(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *variable, char *value, int column_type);
static int blr_slave_send_columndef_with_info_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
void poll_fake_write_event(DCB *dcb);
@ -358,7 +366,20 @@ extern char *strcasestr();
else if (strcasecmp(word, "VERSION()") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selectver);
if (router->set_master_version)
return blr_slave_send_var_value(router, slave, "VERSION()", router->set_master_version);
else
return blr_slave_replay(router, slave, router->saved_master.selectver);
}
else if (strcasecmp(word, "@@version") == 0)
{
free(query_text);
if (router->set_master_version)
return blr_slave_send_var_value(router, slave, "@@version", router->set_master_version);
else {
char *version = blr_extract_column(router->saved_master.selectver, 1);
return blr_slave_send_var_value(router, slave, "@@version", version);
}
}
else if (strcasecmp(word, "@@version_comment") == 0)
{
@ -372,7 +393,20 @@ extern char *strcasestr();
else if (strcasecmp(word, "@@hostname") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.selecthostname);
if (router->set_master_hostname)
return blr_slave_send_var_value(router, slave, "@@hostname", router->set_master_hostname);
else
return blr_slave_replay(router, slave, router->saved_master.selecthostname);
}
else if (strcasecmp(word, "@@server_uuid") == 0)
{
free(query_text);
if (router->set_master_uuid)
return blr_slave_send_var_value(router, slave, "@@server_uuid", router->master_uuid);
else {
char *master_uuid = blr_extract_column(router->saved_master.uuid, 2);
return blr_slave_send_var_value(router, slave, "@@server_uuid", master_uuid);
}
}
else if (strcasecmp(word, "@@max_allowed_packet") == 0)
{
@ -416,12 +450,19 @@ extern char *strcasestr();
else if (strcasecmp(word, "'SERVER_ID'") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.server_id);
if (router->set_master_server_id)
return blr_slave_send_variable(router, slave, "'SERVER_ID'", router->set_master_server_id, 0x03);
else
return blr_slave_replay(router, slave, router->saved_master.server_id);
}
else if (strcasecmp(word, "'SERVER_UUID'") == 0)
{
free(query_text);
return blr_slave_replay(router, slave, router->saved_master.uuid);
if (router->set_master_uuid)
return blr_slave_send_variable(router, slave, "'SERVER_UUID'", router->master_uuid, 0xf);
else
return blr_slave_replay(router, slave, router->saved_master.uuid);
}
else if (strcasecmp(word, "'MAXSCALE%'") == 0)
{
@ -3553,3 +3594,159 @@ blr_master_free_parsed_options(CHANGE_MASTER_OPTIONS *options) {
free(options->binlog_pos);
options->binlog_pos = NULL;
}
/**
* Send a MySQL protocol response for selected variable
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @param variable The variable name
* @param value The variable value
* @return Non-zero if data was sent
*/
static int
blr_slave_send_var_value(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *variable, char *value)
{
GWBUF *pkt;
uint8_t *ptr;
int len, vers_len;
vers_len = strlen(value);
blr_slave_send_fieldcount(router, slave, 1);
blr_slave_send_columndef(router, slave, variable, 0xf, vers_len, 2);
blr_slave_send_eof(router, slave, 3);
len = 5 + vers_len;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, vers_len + 1, 24); // Add length of data packet
ptr += 3;
*ptr++ = 0x04; // Sequence number in response
*ptr++ = vers_len; // Length of result string
strncpy((char *)ptr, value, vers_len); // Result string
ptr += vers_len;
slave->dcb->func.write(slave->dcb, pkt);
return blr_slave_send_eof(router, slave, 5);
}
/**
* Send the response to the SQL command "SHOW VARIABLES LIKE 'xxx'
*
* @param router The binlog router instance
* @param slave The slave server to which we are sending the response
* @return Non-zero if data was sent
*/
static int
blr_slave_send_variable(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *variable, char *value, int column_type)
{
GWBUF *pkt;
uint8_t *ptr;
int len, vers_len, seqno = 2;
char *p = strdup(variable);
int var_len;
char *old_value = p;
if(*p == '\'')
p++;
if (p[strlen(p)-1] == '\'')
p[strlen(p)-1] = '\0';
var_len = strlen(p);
for(int i = 0; i< var_len; i++) {
p[i] = tolower(p[i]);
}
blr_slave_send_fieldcount(router, slave, 2);
blr_slave_send_columndef_with_info_schema(router, slave, "Variable_name", 0xf, 40, seqno++);
blr_slave_send_columndef_with_info_schema(router, slave, "Value", column_type, 40, seqno++);
blr_slave_send_eof(router, slave, seqno++);
vers_len = strlen(value);
len = 5 + vers_len + var_len + 1;
if ((pkt = gwbuf_alloc(len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, vers_len + 2 + var_len, 24); // Add length of data packet
ptr += 3;
*ptr++ = seqno++; // Sequence number in response
*ptr++ = var_len; // Length of result string
strncpy((char *)ptr, p, var_len); // Result string
ptr += var_len;
*ptr++ = vers_len; // Length of result string
strncpy((char *)ptr, value, vers_len); // Result string
ptr += vers_len;
slave->dcb->func.write(slave->dcb, pkt);
free(old_value);
return blr_slave_send_eof(router, slave, seqno++);
}
/**
* Send the column definition packet for a variable in a response packet sequence.
*
* It adds information_schema and variables an variable_name
* @param router The router
* @param slave The slave connection
* @param name Name of the column
* @param type Column type
* @param len Column length
* @param seqno Packet sequence number
* @return Non-zero on success
*/
static int
blr_slave_send_columndef_with_info_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno)
{
GWBUF *pkt;
uint8_t *ptr;
int info_len = strlen("information_schema");
int virtual_table_name_len = strlen("VARIABLES");
int table_name_len = strlen("VARIABLES");
int column_name_len = strlen("Variable_name");
int orig_column_name_len = strlen("VARIABLE_NAME");
if ((pkt = gwbuf_alloc(4 + 22 + strlen(name) + info_len + virtual_table_name_len + table_name_len + orig_column_name_len)) == NULL)
return 0;
ptr = GWBUF_DATA(pkt);
encode_value(ptr, 22 + strlen(name) + info_len + virtual_table_name_len + table_name_len + orig_column_name_len, 24); // Add length of data packet
ptr += 3;
*ptr++ = seqno; // Sequence number in response
*ptr++ = 3; // Catalog is always def
*ptr++ = 'd';
*ptr++ = 'e';
*ptr++ = 'f';
*ptr++ = info_len; // Schema name length
strcpy((char *)ptr,"information_schema");
ptr += info_len;
*ptr++ = virtual_table_name_len; // virtual table name length
strcpy((char *)ptr, "VARIABLES");
ptr += virtual_table_name_len;
*ptr++ = table_name_len; // Table name length
strcpy((char *)ptr, "VARIABLES");
ptr += table_name_len;
*ptr++ = strlen(name); // Column name length;
while (*name)
*ptr++ = *name++; // Copy the column name
*ptr++ = orig_column_name_len; // Orginal column name
strcpy((char *)ptr, "VARIABLE_NAME");
ptr += orig_column_name_len;
*ptr++ = 0x0c; // Length of next fields always 12
*ptr++ = 0x3f; // Character set
*ptr++ = 0;
encode_value(ptr, len, 32); // Add length of column
ptr += 4;
*ptr++ = type;
*ptr++ = 0x81; // Two bytes of flags
if (type == 0xfd)
*ptr++ = 0x1f;
else
*ptr++ = 0x00;
*ptr++= 0;
*ptr++= 0;
*ptr++= 0;
return slave->dcb->func.write(slave->dcb, pkt);
}