MXS-1209: added blr_handle_fake_gtid_list
New routine blr_handle_fake_gtid_list added for fake GTID_LIST_EVENT. blr_file_append O_APPEND is set only if mariadb10_master_gtid is not set. blr_file_append() routine could change name in next commits
This commit is contained in:
@ -695,6 +695,19 @@ createInstance(SERVICE *service, char **options)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check mariadb10_compat option before any other mariadb10 option.
|
||||||
|
*/
|
||||||
|
if (!inst->mariadb10_compat &&
|
||||||
|
inst->mariadb10_master_gtid)
|
||||||
|
{
|
||||||
|
MXS_ERROR("MariaDB Master GTID registration needs"
|
||||||
|
" MariaDB compatibilty option."
|
||||||
|
" Please enable it with option 'mariadb10-compatibility=On'");
|
||||||
|
free_instance(inst);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force GTID slave request handling if GTID Master registration is On
|
* Force GTID slave request handling if GTID Master registration is On
|
||||||
*/
|
*/
|
||||||
|
@ -515,8 +515,6 @@ typedef struct
|
|||||||
GWBUF *server_vars; /*< MySQL Connector master server variables */
|
GWBUF *server_vars; /*< MySQL Connector master server variables */
|
||||||
GWBUF *binlog_vars; /*< SELECT @@global.log_bin, @@global.binlog_format, @@global.binlog_row_image; */
|
GWBUF *binlog_vars; /*< SELECT @@global.log_bin, @@global.binlog_format, @@global.binlog_row_image; */
|
||||||
GWBUF *lower_case_tables; /*< select @@lower_case_table_names */
|
GWBUF *lower_case_tables; /*< select @@lower_case_table_names */
|
||||||
uint8_t *fde_event; /*< Format Description Event */
|
|
||||||
int fde_len; /*< Length of fde_event */
|
|
||||||
} MASTER_RESPONSES;
|
} MASTER_RESPONSES;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -463,12 +463,19 @@ blr_file_append(ROUTER_INSTANCE *router, char *file)
|
|||||||
{
|
{
|
||||||
char path[PATH_MAX + 1] = "";
|
char path[PATH_MAX + 1] = "";
|
||||||
int fd;
|
int fd;
|
||||||
|
int flags = O_RDWR;
|
||||||
|
|
||||||
|
/* If Master GTID registration is not set, then use append */
|
||||||
|
if (!router->mariadb10_master_gtid)
|
||||||
|
{
|
||||||
|
flags |= O_APPEND;
|
||||||
|
}
|
||||||
|
|
||||||
strcpy(path, router->binlogdir);
|
strcpy(path, router->binlogdir);
|
||||||
strcat(path, "/");
|
strcat(path, "/");
|
||||||
strcat(path, file);
|
strcat(path, file);
|
||||||
|
|
||||||
if ((fd = open(path, O_RDWR | O_APPEND, 0666)) == -1)
|
if ((fd = open(path, flags, 0666)) == -1)
|
||||||
{
|
{
|
||||||
MXS_ERROR("Failed to open binlog file %s for append.",
|
MXS_ERROR("Failed to open binlog file %s for append.",
|
||||||
path);
|
path);
|
||||||
|
@ -129,6 +129,13 @@ static void blr_register_cache_response(ROUTER_INSTANCE *router,
|
|||||||
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf);
|
static void blr_start_master_registration(ROUTER_INSTANCE *router, GWBUF *buf);
|
||||||
static void blr_register_mariadb_gtid_domain(ROUTER_INSTANCE *router,
|
static void blr_register_mariadb_gtid_domain(ROUTER_INSTANCE *router,
|
||||||
GWBUF *buf);
|
GWBUF *buf);
|
||||||
|
static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *ptr);
|
||||||
|
static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *ptr);
|
||||||
|
|
||||||
|
|
||||||
static void worker_cb_start_master(int worker_id, void* data);
|
static void worker_cb_start_master(int worker_id, void* data);
|
||||||
static void blr_start_master_in_main(void* data);
|
static void blr_start_master_in_main(void* data);
|
||||||
@ -568,7 +575,7 @@ blr_make_query(DCB *dcb, char *query)
|
|||||||
unsigned char *data;
|
unsigned char *data;
|
||||||
int len;
|
int len;
|
||||||
|
|
||||||
if ((buf = gwbuf_alloc(strlen(query) + 5)) == NULL)
|
if ((buf = gwbuf_alloc(strlen(query) + MYSQL_HEADER_LEN + 1)) == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -602,7 +609,7 @@ blr_make_registration(ROUTER_INSTANCE *router)
|
|||||||
int len = 18;
|
int len = 18;
|
||||||
int port = 3306;
|
int port = 3306;
|
||||||
|
|
||||||
if ((buf = gwbuf_alloc(len + 4)) == NULL)
|
if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -646,7 +653,7 @@ blr_make_binlog_dump(ROUTER_INSTANCE *router)
|
|||||||
/* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
|
/* COM_BINLOG_DUMP needs 11 bytes + binlogname (terminating NULL is not required) */
|
||||||
int len = 11 + binlog_file_len;
|
int len = 11 + binlog_file_len;
|
||||||
|
|
||||||
if ((buf = gwbuf_alloc(len + 4)) == NULL)
|
if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -749,7 +756,7 @@ static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr)
|
|||||||
#ifdef SHOW_EVENTS
|
#ifdef SHOW_EVENTS
|
||||||
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
|
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
|
||||||
"event size %d, event timestamp %lu\n",
|
"event size %d, event timestamp %lu\n",
|
||||||
(unsigned long)len - 4,
|
(unsigned long)len - MYSQL_HEADER_LEN,
|
||||||
hdr->event_type,
|
hdr->event_type,
|
||||||
hdr->flags,
|
hdr->flags,
|
||||||
hdr->event_size,
|
hdr->event_size,
|
||||||
@ -1157,37 +1164,18 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
|||||||
router->stats.events[hdr.event_type]++;
|
router->stats.events[hdr.event_type]++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FORMAT_DESCRIPTION_EVENT with next_pos = 0
|
||||||
|
* should not be saved
|
||||||
|
*/
|
||||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
|
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0)
|
||||||
{
|
{
|
||||||
// Fake format description message
|
router->stats.n_fakeevents++;
|
||||||
MXS_DEBUG("Replication fake event. "
|
MXS_DEBUG("Replication Fake FORMAT_DESCRIPTION_EVENT event. "
|
||||||
"Binlog %s @ %lu.",
|
"Binlog %s @ %lu.",
|
||||||
router->binlog_name,
|
router->binlog_name,
|
||||||
router->current_pos);
|
router->current_pos);
|
||||||
router->stats.n_fakeevents++;
|
|
||||||
|
|
||||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
|
||||||
{
|
|
||||||
uint8_t *new_fde;
|
|
||||||
unsigned int new_fde_len;
|
|
||||||
/*
|
|
||||||
* We need to save this to replay to new
|
|
||||||
* slaves that attach later.
|
|
||||||
*/
|
|
||||||
new_fde_len = hdr.event_size;
|
|
||||||
new_fde = MXS_MALLOC(hdr.event_size);
|
|
||||||
|
|
||||||
if (new_fde)
|
|
||||||
{
|
|
||||||
memcpy(new_fde, ptr + MYSQL_HEADER_LEN + 1, hdr.event_size);
|
|
||||||
if (router->saved_master.fde_event)
|
|
||||||
{
|
|
||||||
MXS_FREE(router->saved_master.fde_event);
|
|
||||||
}
|
|
||||||
router->saved_master.fde_event = new_fde;
|
|
||||||
router->saved_master.fde_len = new_fde_len;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -1338,7 +1326,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* Here we handle Artificial event, the ones with
|
||||||
|
* LOG_EVENT_ARTIFICIAL_F hdr.flags
|
||||||
|
*/
|
||||||
router->stats.n_artificial++;
|
router->stats.n_artificial++;
|
||||||
|
|
||||||
MXS_DEBUG("Artificial event not written "
|
MXS_DEBUG("Artificial event not written "
|
||||||
"to disk or distributed. "
|
"to disk or distributed. "
|
||||||
"Type 0x%x, Length %d, Binlog "
|
"Type 0x%x, Length %d, Binlog "
|
||||||
@ -1347,13 +1340,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
|||||||
hdr.event_size,
|
hdr.event_size,
|
||||||
router->binlog_name,
|
router->binlog_name,
|
||||||
router->current_pos);
|
router->current_pos);
|
||||||
|
|
||||||
ptr += MYSQL_HEADER_LEN + 1;
|
ptr += MYSQL_HEADER_LEN + 1;
|
||||||
|
|
||||||
|
// Fake Rotate event is always sent as first packet from master
|
||||||
if (hdr.event_type == ROTATE_EVENT)
|
if (hdr.event_type == ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
spinlock_acquire(&router->binlog_lock);
|
if (!blr_handle_fake_rotate(router, &hdr, ptr))
|
||||||
router->rotating = 1;
|
|
||||||
spinlock_release(&router->binlog_lock);
|
|
||||||
if (!blr_rotate_event(router, ptr, &hdr))
|
|
||||||
{
|
{
|
||||||
gwbuf_free(pkt);
|
gwbuf_free(pkt);
|
||||||
blr_master_close(router);
|
blr_master_close(router);
|
||||||
@ -1361,6 +1354,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (hdr.event_type == MARIADB10_GTID_GTID_LIST_EVENT)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* MariaDB10 event:
|
||||||
|
* it could be sent as part of GTID registration
|
||||||
|
* before sending change data events.
|
||||||
|
*/
|
||||||
|
blr_handle_fake_gtid_list(router, &hdr, ptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1420,15 +1422,15 @@ blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
|
|||||||
uint64_t pos;
|
uint64_t pos;
|
||||||
char file[BINLOG_FNAMELEN + 1];
|
char file[BINLOG_FNAMELEN + 1];
|
||||||
|
|
||||||
ptr += 19; // Skip event header
|
ptr += BINLOG_EVENT_HDR_LEN; // Skip event header
|
||||||
len = hdr->event_size - 19; // Event size minus header
|
len = hdr->event_size - BINLOG_EVENT_HDR_LEN; // Event size minus header
|
||||||
pos = extract_field(ptr + 4, 32);
|
pos = extract_field(ptr + 4, 32);
|
||||||
pos <<= 32;
|
pos <<= 32;
|
||||||
pos |= extract_field(ptr, 32);
|
pos |= extract_field(ptr, 32);
|
||||||
slen = len - (8 + 4); // Allow for position and CRC
|
slen = len - (8 + BINLOG_EVENT_CRC_SIZE); // Allow for position and CRC
|
||||||
if (router->master_chksum == 0)
|
if (!router->master_chksum)
|
||||||
{
|
{
|
||||||
slen += 4;
|
slen += BINLOG_EVENT_CRC_SIZE;
|
||||||
}
|
}
|
||||||
if (slen > BINLOG_FNAMELEN)
|
if (slen > BINLOG_FNAMELEN)
|
||||||
{
|
{
|
||||||
@ -2124,7 +2126,7 @@ blr_send_semisync_ack(ROUTER_INSTANCE *router, uint64_t pos)
|
|||||||
len = 1 + 8 + binlog_file_len;
|
len = 1 + 8 + binlog_file_len;
|
||||||
|
|
||||||
/* add network header to size */
|
/* add network header to size */
|
||||||
if ((buf = gwbuf_alloc(len + 4)) == NULL)
|
if ((buf = gwbuf_alloc(len + MYSQL_HEADER_LEN)) == NULL)
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -3027,3 +3029,106 @@ static void blr_register_mariadb_gtid_domain(ROUTER_INSTANCE *router,
|
|||||||
"SET NAMES latin1",
|
"SET NAMES latin1",
|
||||||
BLRM_LATIN1);
|
BLRM_LATIN1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The routine hanldes a Fake ROTATE_EVENT
|
||||||
|
*
|
||||||
|
* @param router The router instance
|
||||||
|
* @param hdr The Replication event header
|
||||||
|
* @param ptr The packet data
|
||||||
|
* @return True for succesfull binlog file rotation,
|
||||||
|
* False otherwise.
|
||||||
|
*/
|
||||||
|
static bool blr_handle_fake_rotate(ROUTER_INSTANCE *router,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *ptr)
|
||||||
|
{
|
||||||
|
ss_dassert(hdr->event_type == ROTATE_EVENT);
|
||||||
|
|
||||||
|
uint64_t pos;
|
||||||
|
int len, slen;
|
||||||
|
char file[BINLOG_FNAMELEN + 1];
|
||||||
|
|
||||||
|
len = hdr->event_size - BINLOG_EVENT_HDR_LEN; // Event size minus header
|
||||||
|
slen = len - (8 + BINLOG_EVENT_CRC_SIZE); // Allow for position and CRC
|
||||||
|
if (!router->master_chksum)
|
||||||
|
{
|
||||||
|
slen += BINLOG_EVENT_CRC_SIZE;
|
||||||
|
}
|
||||||
|
if (slen > BINLOG_FNAMELEN)
|
||||||
|
{
|
||||||
|
slen = BINLOG_FNAMELEN;
|
||||||
|
}
|
||||||
|
memcpy(file, ptr + BINLOG_EVENT_HDR_LEN + 8, slen);
|
||||||
|
file[slen] = 0;
|
||||||
|
|
||||||
|
pos = extract_field(ptr + BINLOG_EVENT_HDR_LEN + 4, 32);
|
||||||
|
pos <<= 32;
|
||||||
|
pos |= extract_field(ptr + BINLOG_EVENT_HDR_LEN, 32);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: Detect any missing file in sequence.
|
||||||
|
*/
|
||||||
|
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
|
||||||
|
/* Set writing pos to 4 if Master GTID */
|
||||||
|
if (router->mariadb10_master_gtid && pos == 4)
|
||||||
|
{
|
||||||
|
// Set pos = 4
|
||||||
|
router->last_written = BINLOG_MAGIC_SIZE;
|
||||||
|
router->current_pos = BINLOG_MAGIC_SIZE;
|
||||||
|
router->binlog_position = BINLOG_MAGIC_SIZE;
|
||||||
|
router->current_safe_event = BINLOG_MAGIC_SIZE;
|
||||||
|
router->last_event_pos = BINLOG_MAGIC_SIZE;
|
||||||
|
}
|
||||||
|
|
||||||
|
router->rotating = 1;
|
||||||
|
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
|
|
||||||
|
if (!blr_rotate_event(router, ptr, hdr))
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The routine hanldes a Fake GTID_LIST_EVENT, MariaDB 10 only.
|
||||||
|
*
|
||||||
|
* Fake MARIADB10_GTID_GTID_LIST_EVENT could be sent
|
||||||
|
* when using GTID registration with MariaDB 10 server
|
||||||
|
* The event header 'next_pos' tells where to write
|
||||||
|
* the next event.
|
||||||
|
* We set internal pointers to that position.
|
||||||
|
*
|
||||||
|
* @param router The router instance
|
||||||
|
* @param hdr The Replication event header
|
||||||
|
* @param ptr The packet data
|
||||||
|
*/
|
||||||
|
static void blr_handle_fake_gtid_list(ROUTER_INSTANCE *router,
|
||||||
|
REP_HEADER *hdr,
|
||||||
|
uint8_t *ptr)
|
||||||
|
{
|
||||||
|
ss_dassert(hdr->event_type == MARIADB10_GTID_GTID_LIST_EVENT);
|
||||||
|
|
||||||
|
if (router->mariadb10_master_gtid)
|
||||||
|
{
|
||||||
|
MXS_INFO("Fake GTID_LIST received: file %s, pos %lu. Next event at pos %lu\n",
|
||||||
|
router->binlog_name,
|
||||||
|
(unsigned long)router->current_pos,
|
||||||
|
(unsigned long)hdr->next_pos);
|
||||||
|
|
||||||
|
/* We can write in any (after FDE and STE) binlog file position */
|
||||||
|
/* TODO: fill any GAP with an ignorable event */
|
||||||
|
|
||||||
|
spinlock_acquire(&router->binlog_lock);
|
||||||
|
|
||||||
|
router->last_written = hdr->next_pos;
|
||||||
|
router->last_event_pos = router->current_pos;
|
||||||
|
router->current_pos = hdr->next_pos;
|
||||||
|
|
||||||
|
spinlock_release(&router->binlog_lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user