MXS-1209: added SELECT @@GLOBAL.gtid_domain_id for Master GTID registration

Master GTID registration: added first step: support for “SELECT
@@GLOBAL.gtid_domain_id”
This commit is contained in:
MassimilianoPinto
2017-05-03 14:46:41 +02:00
parent c07350e710
commit f13410bae9
4 changed files with 172 additions and 238 deletions

View File

@ -57,7 +57,6 @@
*/
#include "blr.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -71,43 +70,31 @@
#include <maxscale/housekeeper.h>
#include <maxscale/buffer.h>
#include <maxscale/worker.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <maxscale/log_manager.h>
#include <maxscale/thread.h>
/* Temporary requirement for auth data */
#include <maxscale/protocol/mysql.h>
#include <maxscale/alloc.h>
static GWBUF *blr_make_query(DCB *dcb, char *query);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static int blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static void blr_log_packet(int priority, char *msg, uint8_t *ptr, int len);
void blr_master_close(ROUTER_INSTANCE *);
char *blr_extract_column(GWBUF *buf, int col);
void poll_fake_write_event(DCB *dcb);
GWBUF *blr_read_events_from_pos(ROUTER_INSTANCE *router,
unsigned long long pos,
REP_HEADER *hdr,
unsigned long long pos_end);
static void blr_check_last_master_event(void *inst);
extern int blr_check_heartbeat(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
static int blr_get_master_semisync(GWBUF *buf);
static void blr_terminate_master_replication(ROUTER_INSTANCE *router,
uint8_t* ptr,
int len);
@ -140,6 +127,8 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
const char *save_tag,
GWBUF *in_buf);
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf);
static void blr_register_mariadb_gtid_domain(ROUTER_INSTANCE *router,
GWBUF *buf);
static void worker_cb_start_master(int worker_id, void* data);
static void blr_start_master_in_main(void* data);
@ -181,12 +170,14 @@ blr_start_master(void* data)
if (router->master_state != BLRM_SLAVE_STOPPED)
{
MXS_ERROR("%s: Master Connect: Unexpected master state %s\n",
router->service->name, blrm_states[router->master_state]);
router->service->name,
blrm_states[router->master_state]);
}
else
{
MXS_NOTICE("%s: Master Connect: binlog state is %s\n",
router->service->name, blrm_states[router->master_state]);
router->service->name,
blrm_states[router->master_state]);
}
spinlock_release(&router->lock);
return;
@ -233,7 +224,9 @@ blr_start_master(void* data)
if (name)
{
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master_in_main, router,
hktask_oneshot(name,
blr_start_master_in_main,
router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
MXS_FREE(name);
}
@ -247,13 +240,21 @@ blr_start_master(void* data)
}
router->master->remote = MXS_STRDUP_A(router->service->dbref->server->name);
MXS_NOTICE("%s: attempting to connect to master server [%s]:%d, binlog %s, pos %lu",
router->service->name, router->service->dbref->server->name,
router->service->dbref->server->port, router->binlog_name, router->current_pos);
MXS_NOTICE("%s: attempting to connect to master"
" server [%s]:%d, binlog %s, pos %lu",
router->service->name,
router->service->dbref->server->name,
router->service->dbref->server->port,
router->binlog_name,
router->current_pos);
router->connect_time = time(0);
if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive )))
if (setsockopt(router->master->fd,
SOL_SOCKET,
SO_KEEPALIVE,
&keepalive,
sizeof(keepalive)))
{
perror("setsockopt");
}
@ -444,14 +445,12 @@ blr_master_delayed_connect(ROUTER_INSTANCE *router)
void
blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{
char query[BLRM_MASTER_REGITRATION_QUERY_LEN + 1];
char task_name[BLRM_TASK_NAME_LEN + 1] = "";
atomic_add(&router->handling_threads, 1);
ss_dassert(router->handling_threads == 1);
spinlock_acquire(&router->lock);
router->active_logs = 1;
spinlock_release(&router->lock);
if (router->master_state > BLRM_MAXSTATE)
{
MXS_ERROR("Invalid master state machine state (%d) for binlog router.",
@ -547,6 +546,7 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf)
{
blr_restart_master(router);
}
spinlock_acquire(&router->lock);
router->active_logs = 0;
spinlock_release(&router->lock);
@ -1647,143 +1647,6 @@ blr_extract_column(GWBUF *buf, int col)
return rval;
}
/**
 * Read a replication event from the currently opened binlog file into a
 * GWBUF structure.
 *
 * The 19 byte event header is read at pos and decoded into hdr, then the
 * rest of the event is read right after it. Every failure is logged and
 * reported by returning NULL. Note that hdr is filled in as soon as the
 * header has been read, even when a later step fails.
 *
 * @param router    The router instance; router->binlog_fd is the file read
 * @param pos       Position of the binlog record to read
 * @param hdr       Binlog header to populate
 * @param pos_end   Position treated as the end of the readable data
 * @return          The binlog record (header included) wrapped in a newly
 *                  allocated GWBUF, or NULL when pos has reached pos_end
 *                  or an error occurred
 */
GWBUF
*blr_read_events_from_pos(ROUTER_INSTANCE *router,
                          unsigned long long pos,
                          REP_HEADER *hdr,
                          unsigned long long pos_end)
{
    unsigned long long end_pos = 0;
    uint8_t hdbuf[19];
    uint8_t *data;
    GWBUF *result;
    int n;
    int event_limit;

    /* Get current binlog position */
    end_pos = pos_end;

    /* end of file reached, we're done */
    if (pos == end_pos)
    {
        return NULL;
    }

    /* error */
    if (pos > end_pos)
    {
        MXS_ERROR("Reading saved events, the specified pos %llu "
                  "is ahead of current pos %lu for file %s",
                  pos, router->current_pos, router->binlog_name);
        return NULL;
    }

    /* Read the event header information from the file */
    if ((n = pread(router->binlog_fd, hdbuf, 19, pos)) != 19)
    {
        switch (n)
        {
        case 0:
            MXS_DEBUG("Reading saved events: reached end of binlog file at %llu.", pos);
            break;
        case -1:
            {
                MXS_ERROR("Reading saved events: failed to read binlog "
                          "file %s at position %llu"
                          " (%s).", router->binlog_name,
                          pos, mxs_strerror(errno));

                if (errno == EBADF)
                {
                    MXS_ERROR("Reading saved events: bad file descriptor for file %s"
                              ", descriptor %d.",
                              router->binlog_name, router->binlog_fd);
                }
            }
            break;
        default:
            MXS_ERROR("Reading saved events: short read when reading the header. "
                      "Expected 19 bytes but got %d bytes. "
                      "Binlog file is %s, position %llu",
                      n, router->binlog_name, pos);
            break;
        }

        return NULL;
    }

    /* Decode the fixed size event header */
    hdr->timestamp = EXTRACT32(hdbuf);
    hdr->event_type = hdbuf[4];
    hdr->serverid = EXTRACT32(&hdbuf[5]);
    hdr->event_size = extract_field(&hdbuf[9], 32);
    hdr->next_pos = EXTRACT32(&hdbuf[13]);
    hdr->flags = EXTRACT16(&hdbuf[17]);

    event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;

    if (hdr->event_type > event_limit)
    {
        MXS_ERROR("Reading saved events: invalid event type 0x%x. "
                  "Binlog file is %s, position %llu",
                  hdr->event_type,
                  router->binlog_name, pos);
        return NULL;
    }

    /*
     * The event size comes from the file and includes the header itself.
     * A smaller value would make the buffer allocated below too short for
     * the header copy and would turn "event_size - 19" into a huge length.
     */
    if (hdr->event_size < sizeof(hdbuf))
    {
        MXS_ERROR("Reading saved events: invalid event size %lu, "
                  "smaller than the 19 bytes of the event header. "
                  "Binlog file is %s, position %llu",
                  (unsigned long)hdr->event_size,
                  router->binlog_name, pos);
        return NULL;
    }

    if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
    {
        MXS_ERROR("Reading saved events: failed to allocate memory for binlog entry, "
                  "size %d at %llu.",
                  hdr->event_size, pos);
        return NULL;
    }

    /* Copy event header */
    data = GWBUF_DATA(result);
    memcpy(data, hdbuf, 19);

    /* Read event data and put it into the buffer after the header */
    if ((n = pread(router->binlog_fd, &data[19], hdr->event_size - 19, pos + 19)) != hdr->event_size - 19)
    {
        if (n == -1)
        {
            MXS_ERROR("Reading saved events: the event at %llu in %s. "
                      "%s, expected %d bytes.",
                      pos, router->binlog_name,
                      mxs_strerror(errno), hdr->event_size - 19);
        }
        else
        {
            MXS_ERROR("Reading saved events: short read when reading "
                      "the event at %llu in %s. "
                      "Expected %d bytes got %d bytes.",
                      pos, router->binlog_name, hdr->event_size - 19, n);

            if (end_pos - pos < hdr->event_size)
            {
                MXS_ERROR("Reading saved events: binlog event "
                          "is close to the end of the binlog file, "
                          "current file size is %llu.", end_pos);
            }
        }

        /* free buffer */
        gwbuf_free(result);
        return NULL;
    }

    return result;
}
/**
* Stop and start the master connection
*
@ -1794,7 +1657,8 @@ blr_stop_start_master(ROUTER_INSTANCE *router)
{
if (router->master)
{
if (router->master->fd != -1 && router->master->state == DCB_STATE_POLLING)
if (router->master->fd != -1 &&
router->master->state == DCB_STATE_POLLING)
{
blr_master_close(router);
}
@ -1819,7 +1683,8 @@ blr_stop_start_master(ROUTER_INSTANCE *router)
if (router->client)
{
if (router->client->fd != -1 && router->client->state == DCB_STATE_POLLING)
if (router->client->fd != -1 &&
router->client->state == DCB_STATE_POLLING)
{
dcb_close(router->client);
router->client = NULL;
@ -1871,7 +1736,9 @@ blr_check_last_master_event(void *inst)
* when master state is back to BLRM_BINLOGDUMP
* by blr_master_response()
*/
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
snprintf(task_name, BLRM_TASK_NAME_LEN,
"%s heartbeat",
router->service->name);
hktask_remove(task_name);
}
@ -1884,7 +1751,7 @@ blr_check_last_master_event(void *inst)
* that is currently set to 1
*
* @param router Current router instance
* @return 0 if master connection must be closed and opened again, 1 otherwise
* @return 0 if master connection must be closed and opened again, 1 otherwise
*/
int
@ -1900,7 +1767,8 @@ blr_check_heartbeat(ROUTER_INSTANCE *router)
event_desc = blr_last_event_description(router);
if (router->master_state == BLRM_BINLOGDUMP && router->lastEventReceived > 0)
if (router->master_state == BLRM_BINLOGDUMP &&
router->lastEventReceived > 0)
{
if ((t_now - router->stats.lastReply) > (router->heartbeat + BLR_NET_LATENCY_WAIT_TIME))
{
@ -1964,7 +1832,8 @@ static void blr_log_identity(ROUTER_INSTANCE *router)
MXS_NOTICE("%s: identity seen by the master: "
"server_id: %d, uuid: %s",
router->service->name,
router->serverid, (router->uuid == NULL ? "not available" : router->uuid));
router->serverid,
(router->uuid == NULL ? "not available" : router->uuid));
/* Seen by the slaves */
@ -1974,7 +1843,8 @@ static void blr_log_identity(ROUTER_INSTANCE *router)
MXS_NOTICE("%s: identity seen by the slaves: "
"server_id: %d, hostname: %s, MySQL version: %s",
router->service->name,
router->masterid, (master_hostname == NULL ? "not available" : master_hostname),
router->masterid,
(master_hostname == NULL ? "not available" : master_hostname),
(master_version == NULL ? "not available" : master_version));
}
else
@ -2181,7 +2051,9 @@ bool blr_send_event(blr_thread_role_t role,
* @param ptr Pointer to the start of the packet
* @param len Length of the packet
*/
static void blr_terminate_master_replication(ROUTER_INSTANCE* router, uint8_t* ptr, int len)
static void blr_terminate_master_replication(ROUTER_INSTANCE* router,
uint8_t* ptr,
int len)
{
unsigned long mysql_errno = extract_field(ptr + 5, 16);
int msg_len = len - 7 - 6; // msg len is decreased by 7 and 6
@ -2208,7 +2080,8 @@ static void blr_terminate_master_replication(ROUTER_INSTANCE* router, uint8_t* p
}
/**
* Populate a header structure for a replication message from a GWBUF structure with semi-sync enabled.
* Populate a header structure for a replication message
* from a GWBUF structure with semi-sync enabled.
*
* @param pkt The incoming packet in a GWBUF chain
* @param hdr The packet header to populate
@ -2296,7 +2169,7 @@ blr_get_master_semisync(GWBUF *buf)
{
val = blr_extract_column(buf, 2);
}
free(key);
MXS_FREE(key);
if (val)
{
@ -2309,7 +2182,7 @@ blr_get_master_semisync(GWBUF *buf)
master_semisync = MASTER_SEMISYNC_DISABLED;
}
}
free(val);
MXS_FREE(val);
return master_semisync;
}
@ -2861,7 +2734,7 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
{
char task_name[BLRM_TASK_NAME_LEN + 1] = "";
switch (router->master_state)
{
case BLRM_TIMESTAMP:
@ -2899,29 +2772,39 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
blr_register_send_command(router, command, state);
}
break;
case BLRM_MARIADB10:
case BLRM_MARIADB10: // MariaDB10 Only
// Save server response
blr_register_cache_response(router,
&router->saved_master.mariadb10,
"mariadb10",
buf);
// Skip SERVER_UUID fetch and SET slave UUID
blr_register_send_command(router,
"SET NAMES latin1",
BLRM_LATIN1);
// Next state is BLRM_MARIADB10_GTID_DOMAIN or BLRM_LATIN1
{
unsigned int state = router->mariadb_gtid ?
BLRM_MARIADB10_GTID_DOMAIN :
BLRM_LATIN1;
const char *command = router->mariadb_gtid ?
"SELECT @@GLOBAL.gtid_domain_id" :
"SET NAMES latin1";
blr_register_send_command(router, command, state);
}
break;
case BLRM_GTIDMODE: // MySQL 5.6/7 only
case BLRM_MARIADB10_GTID_DOMAIN: // MariaDB10 Only
// Next state is BLRM_LATIN1
blr_register_mariadb_gtid_domain(router, buf);
break;
case BLRM_GTIDMODE: // MySQL 5.6/5.7 only
blr_register_serveruuid(router, buf);
break;
case BLRM_MUUID: // MySQL 5.6/7 only
case BLRM_MUUID: // MySQL 5.6/5.7 only
blr_register_slaveuuid(router, buf);
break;
case BLRM_SUUID: // MySQL 5.6/7 only
case BLRM_SUUID: // MySQL 5.6/5.7 only
// Save server response
blr_register_cache_response(router,
&router->saved_master.setslaveuuid,
"ssuuid",
buf);
buf);
blr_register_send_command(router,
"SET NAMES latin1",
BLRM_LATIN1);
@ -2935,8 +2818,7 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
&router->saved_master.utf8,
"utf8",
buf);
// Next state is MAXWELL BLRM_RESULTS_CHARSET or
// BLRM_SELECT1
// Next state is MAXWELL BLRM_RESULTS_CHARSET or BLRM_SELECT1
{
unsigned int state = router->maxwell_compat ?
BLRM_RESULTS_CHARSET :
@ -2948,13 +2830,13 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
blr_register_send_command(router, command, state);
break;
}
case BLRM_RESULTS_CHARSET:
case BLRM_RESULTS_CHARSET: // MAXWELL only
gwbuf_free(buf); // Discard server reply, don't save it
blr_register_send_command(router,
MYSQL_CONNECTOR_SQL_MODE_QUERY,
BLRM_SQL_MODE);
break;
case BLRM_SQL_MODE:
case BLRM_SQL_MODE: // MAXWELL only
gwbuf_free(buf); // Discard server reply, don't save it
blr_register_send_command(router,
"SELECT 1",
@ -2990,7 +2872,7 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
// Continue: ready for the registration, nothing to write/read
router->master_state = BLRM_REGISTER_READY;
}
case BLRM_SERVER_VARS:
case BLRM_SERVER_VARS: // MAXWELL only
/**
* This branch could be reached as fallthrough from BLRM_MAP
* with new state BLRM_REGISTER_READY
@ -3001,7 +2883,7 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
blr_register_mxw_binlogvars(router, buf);
break;
}
case BLRM_BINLOG_VARS:
case BLRM_BINLOG_VARS: // MAXWELL only
/**
* This branch could be reached as fallthrough from BLRM_MAP
* with new state BLRM_REGISTER_READY.
@ -3012,7 +2894,7 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
blr_register_mxw_tables(router, buf);
break;
}
case BLRM_LOWER_CASE_TABLES:
case BLRM_LOWER_CASE_TABLES: // MAXWELL only
/**
* This branch could be reached as fallthrough from BLRM_MAP
* with new state BLRM_REGISTER_READY.
@ -3061,8 +2943,9 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
}
case BLRM_REQUEST_SEMISYNC:
/**
* This branch could be reached as fallthrough from BLRM_REGISTER or BLRM_CHECK_SEMISYNC
* if request_semi_sync option is false or master doesn't support semisync or it's not enabled
* This branch could be reached as fallthrough from BLRM_REGISTER or
* BLRM_CHECK_SEMISYNC if request_semi_sync option is false or
* master doesn't support semisync or it's not enabled.
*/
if (router->master_state == BLRM_REQUEST_SEMISYNC)
{
@ -3075,10 +2958,15 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
case BLRM_REQUEST_BINLOGDUMP:
/**
* This branch is reached after semi-sync check/request or
* just after sending COM_REGISTER_SLAVE if request_semi_sync option is false
* just after sending COM_REGISTER_SLAVE
* if request_semi_sync option is false.
*/
/* Request now a dump of the binlog file: COM_BINLOG_DUMP */
/**
* End of registration process:
*
* Now request a dump of the binlog file: COM_BINLOG_DUMP
*/
buf = blr_make_binlog_dump(router);
router->master_state = BLRM_BINLOGDUMP;
router->master->func.write(router->master, buf);
@ -3102,9 +2990,40 @@ static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf)
/**
* Set heartbeat check task
*/
snprintf(task_name, BLRM_TASK_NAME_LEN, "%s heartbeat", router->service->name);
hktask_add(task_name, blr_check_last_master_event, router, router->heartbeat);
snprintf(task_name,
BLRM_TASK_NAME_LEN,
"%s heartbeat",
router->service->name);
hktask_add(task_name,
blr_check_last_master_event,
router,
router->heartbeat);
break;
}
}
/**
 * Slave Protocol registration to Master (MariaDB 10 compatibility):
 *
 * Handles the previous reply from the MariaDB10 Master (GTID Domain ID)
 * and sends "SET NAMES latin1" with BLRM_LATIN1 as the next state.
 *
 * The first column of the reply is stored in router->mariadb_gtid_domain.
 * If the column cannot be extracted an error is logged and 0 is stored,
 * which is also what atol() yields for a non-numeric value; registration
 * then continues as usual.
 *
 * @param router    Current router instance
 * @param buf       GWBUF with server reply to previous
 *                  registration command; it is freed here
 */
static void blr_register_mariadb_gtid_domain(ROUTER_INSTANCE *router,
                                             GWBUF *buf)
{
    // Extract GTID domain: NULL when the column is not available
    char *val = blr_extract_column(buf, 1);

    if (val != NULL)
    {
        // Store the Master GTID domain
        router->mariadb_gtid_domain = atol(val);
        MXS_FREE(val);
    }
    else
    {
        // atol(NULL) is undefined behaviour: never pass it a missing column
        MXS_ERROR("%s: Master GTID registration: unable to extract "
                  "the value of @@GLOBAL.gtid_domain_id from the master "
                  "reply, using 0 as GTID domain.",
                  router->service->name);
        router->mariadb_gtid_domain = 0;
    }

    // Don't save the server response
    gwbuf_free(buf);

    blr_register_send_command(router,
                              "SET NAMES latin1",
                              BLRM_LATIN1);
}