Cleaned up the code based on the code review
Added missing error condition checks and cleaned up code.
This commit is contained in:
@ -199,12 +199,15 @@
|
|||||||
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 7)
|
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 7)
|
||||||
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
|
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
|
||||||
|
|
||||||
|
/** Possible states of an event sent by the master */
|
||||||
enum blr_event_state
|
enum blr_event_state
|
||||||
{
|
{
|
||||||
BLR_EVENT_DONE,
|
BLR_EVENT_DONE, /*< No event being processed */
|
||||||
BLR_EVENT_STARTED,
|
BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets
|
||||||
BLR_EVENT_ONGOING,
|
* has been received */
|
||||||
BLR_EVENT_COMPLETE
|
BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed */
|
||||||
|
BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed
|
||||||
|
* but the router is not yet ready to process another one */
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Master Server configuration struct */
|
/* Master Server configuration struct */
|
||||||
|
@ -795,9 +795,6 @@ int no_residual = 1;
|
|||||||
int preslen = -1;
|
int preslen = -1;
|
||||||
int prev_length = -1;
|
int prev_length = -1;
|
||||||
int n_bufs = -1, pn_bufs = -1;
|
int n_bufs = -1, pn_bufs = -1;
|
||||||
int event_limit;
|
|
||||||
uint32_t totalsize = 0;
|
|
||||||
uint32_t partialpos = 0;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Prepend any residual buffer to the buffer chain we have
|
* Prepend any residual buffer to the buffer chain we have
|
||||||
@ -985,6 +982,11 @@ uint32_t partialpos = 0;
|
|||||||
/** Store the header for later use */
|
/** Store the header for later use */
|
||||||
memcpy(&router->stored_header, &hdr, sizeof(hdr));
|
memcpy(&router->stored_header, &hdr, sizeof(hdr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Prepare the checksum variables for this event */
|
||||||
|
router->stored_checksum = crc32(0L, NULL, 0);
|
||||||
|
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
||||||
|
router->partial_checksum_bytes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hdr.ok == 0)
|
if (hdr.ok == 0)
|
||||||
@ -1012,7 +1014,7 @@ uint32_t partialpos = 0;
|
|||||||
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
|
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
|
||||||
{
|
{
|
||||||
/** This is the last packet, we can now proceed to distribute
|
/** This is the last packet, we can now proceed to distribute
|
||||||
* the event */
|
* the event afer it has been written to disk */
|
||||||
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
|
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
|
||||||
router->master_event_state = BLR_EVENT_COMPLETE;
|
router->master_event_state = BLR_EVENT_COMPLETE;
|
||||||
memcpy(&hdr, &router->stored_header, sizeof(hdr));
|
memcpy(&hdr, &router->stored_header, sizeof(hdr));
|
||||||
@ -1028,17 +1030,8 @@ uint32_t partialpos = 0;
|
|||||||
{
|
{
|
||||||
offset = MYSQL_HEADER_LEN + 1;
|
offset = MYSQL_HEADER_LEN + 1;
|
||||||
router->master_event_state = BLR_EVENT_ONGOING;
|
router->master_event_state = BLR_EVENT_ONGOING;
|
||||||
|
|
||||||
/** Initialize the checksum and calculate it for
|
|
||||||
* the first packet */
|
|
||||||
if (router->master_chksum)
|
|
||||||
{
|
|
||||||
router->stored_checksum = crc32(0L, NULL, 0);
|
|
||||||
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
|
||||||
router->partial_checksum_bytes = 0;
|
|
||||||
extra_bytes = MYSQL_HEADER_LEN + 1;
|
extra_bytes = MYSQL_HEADER_LEN + 1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (router->master_chksum)
|
if (router->master_chksum)
|
||||||
{
|
{
|
||||||
@ -1048,7 +1041,7 @@ uint32_t partialpos = 0;
|
|||||||
size);
|
size);
|
||||||
router->checksum_size -= size;
|
router->checksum_size -= size;
|
||||||
|
|
||||||
if(router->checksum_size == 0 && size < len - offset)
|
if (router->checksum_size == 0 && size < len - offset)
|
||||||
{
|
{
|
||||||
extract_checksum(router, ptr + offset + size, len - offset - size);
|
extract_checksum(router, ptr + offset + size, len - offset - size);
|
||||||
}
|
}
|
||||||
@ -1083,11 +1076,8 @@ uint32_t partialpos = 0;
|
|||||||
|
|
||||||
if (router->master_event_state == BLR_EVENT_DONE)
|
if (router->master_event_state == BLR_EVENT_DONE)
|
||||||
{
|
{
|
||||||
/** Initialize the checksum and set the pointer offset to
|
/** Set the pointer offset to the first byte after
|
||||||
* the first byte after the header and OK byte */
|
* the header and OK byte */
|
||||||
router->stored_checksum = crc32(0L, NULL, 0);
|
|
||||||
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
|
||||||
router->partial_checksum_bytes = 0;
|
|
||||||
offset = MYSQL_HEADER_LEN + 1;
|
offset = MYSQL_HEADER_LEN + 1;
|
||||||
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
|
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
|
||||||
}
|
}
|
||||||
@ -1248,7 +1238,9 @@ uint32_t partialpos = 0;
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
|
/** Gather statistics about the replication event types */
|
||||||
|
int event_limit = router->mariadb10_compat ?
|
||||||
|
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
|
||||||
|
|
||||||
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
|
if (hdr.event_type >= 0 && hdr.event_type <= event_limit)
|
||||||
router->stats.events[hdr.event_type]++;
|
router->stats.events[hdr.event_type]++;
|
||||||
@ -2396,7 +2388,13 @@ int n;
|
|||||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||||
|
|
||||||
/* Remove any partial event that was written */
|
/* Remove any partial event that was written */
|
||||||
ftruncate(router->binlog_fd, router->last_written);
|
if(ftruncate(router->binlog_fd, router->last_written))
|
||||||
|
{
|
||||||
|
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
|
||||||
|
router->service->name, router->last_written,
|
||||||
|
router->binlog_name,
|
||||||
|
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
router->last_written += data_len;
|
router->last_written += data_len;
|
||||||
@ -2474,43 +2472,28 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
int64_t len = hdr->event_size + 1;
|
/** Total size of all the payloads in all the packets */
|
||||||
|
int64_t len = hdr->event_size;
|
||||||
if (blr_send_packet(slave, buf, MYSQL_PACKET_LENGTH_MAX - 1, true))
|
bool first = true;
|
||||||
{
|
|
||||||
len -= MYSQL_PACKET_LENGTH_MAX;
|
|
||||||
buf += MYSQL_PACKET_LENGTH_MAX - 1;
|
|
||||||
|
|
||||||
/** Check for special case where length of the MySQL packet
|
|
||||||
* is exactly 0x00ffffff bytes long. In this case we need to send
|
|
||||||
* an empty packet to tell the slave that the whole event is sent. */
|
|
||||||
if (len == 0)
|
|
||||||
{
|
|
||||||
blr_send_packet(slave, buf, 0, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
rval = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (rval && len > 0)
|
while (rval && len > 0)
|
||||||
{
|
{
|
||||||
/** Write rest of the event data */
|
uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
|
||||||
uint64_t payload_len = MIN(MYSQL_PACKET_LENGTH_MAX, len);
|
MIN(MYSQL_PACKET_LENGTH_MAX, len);
|
||||||
|
|
||||||
if (blr_send_packet(slave, buf, payload_len, false))
|
if (blr_send_packet(slave, buf, payload_len, first))
|
||||||
{
|
{
|
||||||
/** The check for exactly 0x00ffffff bytes needs to be done
|
/** The check for exactly 0x00ffffff bytes needs to be done
|
||||||
* here as well */
|
* here as well */
|
||||||
if (len == MYSQL_PACKET_LENGTH_MAX)
|
if (len == MYSQL_PACKET_LENGTH_MAX)
|
||||||
{
|
{
|
||||||
ss_dassert(len - MYSQL_PACKET_LENGTH_MAX == 0);
|
|
||||||
blr_send_packet(slave, buf, 0, false);
|
blr_send_packet(slave, buf, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
len -= payload_len;
|
/** Add the extra byte written by blr_send_packet */
|
||||||
|
len -= first ? payload_len + 1 : payload_len;
|
||||||
buf += payload_len;
|
buf += payload_len;
|
||||||
|
first = false;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -2530,6 +2513,15 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract the checksum from the binlogs
|
||||||
|
*
|
||||||
|
* This updates the internal state of the router and will allow us to detect
|
||||||
|
* if the checksum is split across two packets.
|
||||||
|
* @param router Router instance
|
||||||
|
* @param cksumptr Pointer to the checksum
|
||||||
|
* @param len How much of the data is readable
|
||||||
|
*/
|
||||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
|
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
|
||||||
{
|
{
|
||||||
uint8_t *ptr = cksumptr;
|
uint8_t *ptr = cksumptr;
|
||||||
|
@ -2078,16 +2078,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
|||||||
hkheartbeat - beat1);
|
hkheartbeat - beat1);
|
||||||
}
|
}
|
||||||
|
|
||||||
blr_send_event(slave, &hdr, (uint8_t*) record->start);
|
if (blr_send_event(slave, &hdr, (uint8_t*) record->start))
|
||||||
gwbuf_free(record);
|
{
|
||||||
record = NULL;
|
|
||||||
|
|
||||||
if (hdr.event_type != ROTATE_EVENT)
|
if (hdr.event_type != ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
slave->binlog_pos = hdr.next_pos;
|
slave->binlog_pos = hdr.next_pos;
|
||||||
}
|
}
|
||||||
slave->stats.n_events++;
|
slave->stats.n_events++;
|
||||||
burst_size -= hdr.event_size;
|
burst_size -= hdr.event_size;
|
||||||
|
}
|
||||||
|
gwbuf_free(record);
|
||||||
|
record = NULL;
|
||||||
|
|
||||||
/* set lastReply for slave heartbeat check */
|
/* set lastReply for slave heartbeat check */
|
||||||
if (router->send_slave_heartbeat)
|
if (router->send_slave_heartbeat)
|
||||||
|
Reference in New Issue
Block a user