Add more comments and clean up code

The binlog event processing code is now better commented and is slightly
easier to read.
This commit is contained in:
Markus Makela
2016-12-05 15:35:29 +02:00
parent 2f082cb7fb
commit f5eb4e21dc
2 changed files with 84 additions and 44 deletions

View File

@ -494,7 +494,7 @@ typedef struct router_instance
int pending_transaction; /*< Pending transaction */ int pending_transaction; /*< Pending transaction */
enum blr_event_state master_event_state; /*< Packet read state */ enum blr_event_state master_event_state; /*< Packet read state */
REP_HEADER stored_header; /*< Relication header of the event the master is sending */ REP_HEADER stored_header; /*< Relication header of the event the master is sending */
GWBUF *stored_event; /*< Partial even buffer */ GWBUF *stored_event; /*< Buffer where partial events are stored */
uint64_t last_safe_pos; /* last committed transaction */ uint64_t last_safe_pos; /* last committed transaction */
char binlog_name[BINLOG_FNAMELEN + 1]; char binlog_name[BINLOG_FNAMELEN + 1];
/*< Name of the current binlog file */ /*< Name of the current binlog file */

View File

@ -996,6 +996,38 @@ static bool verify_checksum(ROUTER_INSTANCE *router, size_t len, uint8_t *ptr)
return rval; return rval;
} }
/**
* @brief Reset router errors
*
* @param router Router instance
* @param hdr Replication header
*/
static void reset_errors(ROUTER_INSTANCE *router, REP_HEADER *hdr)
{
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
{
MXS_FREE(router->m_errmsg);
}
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
"event size %d, event timestamp %lu\n",
(unsigned long)len - 4,
hdr->event_type,
hdr->flags,
hdr->event_size,
(unsigned long)hdr->timestamp);
#endif
}
/** /**
* blr_handle_binlog_record - we have received binlog records from * blr_handle_binlog_record - we have received binlog records from
* the master and we must now work out what to do with them. * the master and we must now work out what to do with them.
@ -1052,6 +1084,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{ {
if (router->master_event_state == BLR_EVENT_DONE) if (router->master_event_state == BLR_EVENT_DONE)
{ {
/** This is the start of a new event */
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
router->stats.n_binlogs++; router->stats.n_binlogs++;
router->stats.n_binlogs_ses++; router->stats.n_binlogs_ses++;
@ -1113,9 +1146,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
break; break;
} }
/** Store the header for later use */ /** This is the first (and possibly last) packet of a replication
memcpy(&router->stored_header, &hdr, sizeof(hdr)); * event. We store the header in case the event is large and
* it is transmitted over multiple network packets. */
router->master_event_state = BLR_EVENT_STARTED; router->master_event_state = BLR_EVENT_STARTED;
memcpy(&router->stored_header, &hdr, sizeof(hdr));
reset_errors(router, &hdr);
} }
else else
{ {
@ -1127,32 +1163,11 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
break; break;
} }
}
if (hdr.ok == 0) else
{ {
spinlock_acquire(&router->lock); /** We're processing a multi-packet replication event */
ss_dassert(router->master_event_state == BLR_EVENT_ONGOING);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
{
MXS_FREE(router->m_errmsg);
}
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, "
"event size %d, event timestamp %lu\n",
(unsigned long)len - 4,
hdr.event_type,
hdr.flags,
hdr.event_size,
(unsigned long)hdr.timestamp);
#endif
}
} }
/** Gather the event into one big buffer */ /** Gather the event into one big buffer */
@ -1169,7 +1184,11 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
if (router->master_event_state == BLR_EVENT_ONGOING) if (router->master_event_state == BLR_EVENT_ONGOING)
{ {
/** Only consume the network header */ /**
* Consume the network header so that we can append the raw
* event data to the original buffer. This allows both checksum
* calculations and encryption to process it as a contiguous event
*/
part = gwbuf_consume(part, MYSQL_HEADER_LEN); part = gwbuf_consume(part, MYSQL_HEADER_LEN);
} }
@ -1177,24 +1196,43 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
if (len < MYSQL_PACKET_LENGTH_MAX) if (len < MYSQL_PACKET_LENGTH_MAX)
{ {
/** This is the last packet, we can now write it to disk */ /**
* This is either the only packet for the event or the last
* packet in a series for this event. The buffer now contains
* the network header of the first packet (4 bytes) and one OK byte.
* The semi-sync bytes are always consumed at an earlier stage.
*/
ss_dassert(router->master_event_state != BLR_EVENT_DONE); ss_dassert(router->master_event_state != BLR_EVENT_DONE);
if (router->master_event_state != BLR_EVENT_STARTED) if (router->master_event_state != BLR_EVENT_STARTED)
{ {
/** This is not the first packet of the event, copy the /**
* stored header */ * This is not the first packet for this event. We must use
* the stored header.
*/
memcpy(&hdr, &router->stored_header, sizeof(hdr)); memcpy(&hdr, &router->stored_header, sizeof(hdr));
} }
/** The event is now complete */
router->master_event_state = BLR_EVENT_DONE; router->master_event_state = BLR_EVENT_DONE;
} }
else else
{ {
/**
* This packet is a part of a series of packets that contain an
* event larger than MYSQL_PACKET_LENGTH_MAX bytes.
*
* For each partial event chunk, we remove the network header and
* append it to router->stored_event. The first event is an
* exception to this and it is appended as-is with the network
* header and the extra OK byte.
*/
ss_dassert(len == MYSQL_PACKET_LENGTH_MAX);
router->master_event_state = BLR_EVENT_ONGOING; router->master_event_state = BLR_EVENT_ONGOING;
continue; continue;
} }
/** We have the complete event in one contiguous buffer */ /** We now have the complete event in one contiguous buffer */
router->stored_event = gwbuf_make_contiguous(router->stored_event); router->stored_event = gwbuf_make_contiguous(router->stored_event);
ptr = GWBUF_DATA(router->stored_event); ptr = GWBUF_DATA(router->stored_event);
@ -1202,9 +1240,9 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
* header and one OK byte. Semi-sync bytes are never stored. */ * header and one OK byte. Semi-sync bytes are never stored. */
len = gwbuf_length(router->stored_event); len = gwbuf_length(router->stored_event);
/* /**
* First check that the checksum we calculate matches the * If checksums are enabled, verify that the stored checksum
* checksum in the packet we received. * matches the one we calculated
*/ */
if (router->master_chksum && !verify_checksum(router, len, ptr)) if (router->master_chksum && !verify_checksum(router, len, ptr))
{ {
@ -1406,9 +1444,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
} }
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{ {
ptr = ptr + MYSQL_HEADER_LEN + 1; // Skip header and OK byte
uint32_t offset = MYSQL_HEADER_LEN + 1;
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->binlog_lock);
@ -1416,8 +1451,13 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
} }
/* Write event to disk */ uint32_t offset = MYSQL_HEADER_LEN + 1; // Skip header and OK byte
if (blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0)
/**
* Write the raw event data to disk without the network
* header or the OK byte
*/
if (blr_write_binlog_record(router, &hdr, len - offset, ptr + offset) == 0)
{ {
gwbuf_free(pkt); gwbuf_free(pkt);
blr_master_close(router); blr_master_close(router);
@ -1425,7 +1465,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
return; return;
} }
/* Check for rotete event */ /* Check for rotate event */
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
if (!blr_rotate_event(router, ptr, &hdr)) if (!blr_rotate_event(router, ptr, &hdr))
@ -1437,7 +1477,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
} }
} }
/* Handle semi-sync request fom master */ /* Handle semi-sync request from master */
if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE && if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ && semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ &&
(router->master_event_state == BLR_EVENT_DONE)) (router->master_event_state == BLR_EVENT_DONE))