First fix for 16MB handling in the master part

First fix for 16MB handling in the master part.

Distribute events to up to date slave is not included yet
This commit is contained in:
MassimilianoPinto
2016-02-08 14:52:50 +01:00
committed by Markus Makela
parent ab1fb90d86
commit d3e1d4dd2f
2 changed files with 157 additions and 72 deletions

View File

@ -415,6 +415,7 @@ typedef struct router_instance {
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */ SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
int trx_safe; /*< Detect and handle partial transactions */ int trx_safe; /*< Detect and handle partial transactions */
int pending_transaction; /*< Pending transaction */ int pending_transaction; /*< Pending transaction */
int pending_16mb; /*< Pending larger than 16mb transmission */
uint64_t last_safe_pos; /* last committed transaction */ uint64_t last_safe_pos; /* last committed transaction */
char binlog_name[BINLOG_FNAMELEN+1]; char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */ /*< Name of the current binlog file */

View File

@ -99,6 +99,8 @@ extern char * blr_last_event_description(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router); static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code); static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint32_t pos, uint8_t *buf);
static int keepalive = 1; static int keepalive = 1;
/** /**
@ -790,6 +792,9 @@ int no_residual = 1;
int preslen = -1; int preslen = -1;
int prev_length = -1; int prev_length = -1;
int n_bufs = -1, pn_bufs = -1; int n_bufs = -1, pn_bufs = -1;
int event_limit;
uint32_t totalsize = 0;
uint32_t partialpos = 0;
/* /*
* Prepend any residual buffer to the buffer chain we have * Prepend any residual buffer to the buffer chain we have
@ -931,6 +936,8 @@ int n_bufs = -1, pn_bufs = -1;
MXS_NOTICE("Non-event message (%s) from master.", msg); MXS_NOTICE("Non-event message (%s) from master.", msg);
} }
else else
{
if (!router->pending_16mb)
{ {
router->stats.n_binlogs++; router->stats.n_binlogs++;
router->stats.n_binlogs_ses++; router->stats.n_binlogs_ses++;
@ -938,7 +945,9 @@ int n_bufs = -1, pn_bufs = -1;
blr_extract_header(ptr, &hdr); blr_extract_header(ptr, &hdr);
/* Sanity check */ /* Sanity check */
if (hdr.ok == 0 && hdr.event_size != len - 5) if (hdr.ok == 0 && (hdr.event_size != len - 5))
{
if ((hdr.event_size + 1) < 0x00ffffff)
{ {
MXS_ERROR("Packet length is %d, but event size is %d, " MXS_ERROR("Packet length is %d, but event size is %d, "
"binlog file %s position %lu " "binlog file %s position %lu "
@ -953,21 +962,33 @@ int n_bufs = -1, pn_bufs = -1;
"Residual data from previous call") : "")); "Residual data from previous call") : ""));
blr_log_packet(LOG_ERR, "Packet:", ptr, len); blr_log_packet(LOG_ERR, "Packet:", ptr, len);
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, " MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs", "the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs); router->lastEventReceived, n_bufs, pn_bufs);
if (msg) if (msg)
{ {
free(msg); free(msg);
msg = NULL; msg = NULL;
} }
break; break;
} }
}
else
{
MXS_INFO("Transmission of event > 16MB");
spinlock_acquire(&router->binlog_lock);
router->pending_16mb = 1;
totalsize = hdr.event_size + 1;
partialpos = router->current_pos;
spinlock_release(&router->binlog_lock);
}
if (hdr.ok == 0) if (hdr.ok == 0)
{ {
int event_limit;
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
/* set mysql errno to 0 */ /* set mysql errno to 0 */
@ -979,16 +1000,16 @@ int n_bufs = -1, pn_bufs = -1;
router->m_errmsg = NULL; router->m_errmsg = NULL;
spinlock_release(&router->lock); spinlock_release(&router->lock);
#define SHOW_EVENTS
#ifdef SHOW_EVENTS #ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", hdr.event_type, hdr.flags, hdr.event_size, hdr.timestamp); printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp);
#endif #endif
/* /*
* First check that the checksum we calculate matches the * First check that the checksum we calculate matches the
* checksum in the packet we received. * checksum in the packet we received.
*/ */
if (router->master_chksum) if (router->master_chksum && !router->pending_16mb)
{ {
uint32_t chksum, pktsum; uint32_t chksum, pktsum;
@ -1015,7 +1036,49 @@ int n_bufs = -1, pn_bufs = -1;
return; return;
} }
} }
}
}
/* pending 16 mb */
if (router->pending_16mb)
{
uint32_t data_len;
if (totalsize >= 0x00ffffff)
data_len = 0x00ffffff;
else
data_len = len-5;
/* pending 16 mb */
/* current partial event is being written to disk file */
if (blr_write_data_into_binlog(router, data_len, partialpos, ptr) == 0)
{
/*
* Failed to write to the
* binlog file, destroy the
* buffer chain and close the
* connection with the master
*/
while ((pkt = gwbuf_consume(pkt,
GWBUF_LENGTH(pkt))) != NULL);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
partialpos += data_len;
if (data_len >= 0x00ffffff)
{
totalsize -= 0x00ffffff;
continue;
}
else
{
router->pending_16mb = 0;
}
}
if (hdr.ok == 0)
{
router->lastEventReceived = hdr.event_type; router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp; router->lastEventTimestamp = hdr.timestamp;
@ -2230,3 +2293,24 @@ ROUTER_SLAVE *slave;
spinlock_release(&router->lock); spinlock_release(&router->lock);
} }
int
blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint32_t pos, uint8_t *buf)
{
int n;
if ((n = pwrite(router->binlog_fd, buf, data_len,
pos)) != data_len)
{
char err_msg[STRERROR_BUFLEN];
MXS_ERROR("%s: Failed to write binlog record at %d of %s, %s. "
"Truncating to previous record.",
router->service->name, pos,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
/* Remove any partial event that was written */
ftruncate(router->binlog_fd, pos);
return 0;
}
return n;
}