Added detection of checksums split across two packets
The checksums should now be processed properly event if the event is in more than one packet.
This commit is contained in:
@ -44,6 +44,7 @@
|
|||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <memlog.h>
|
#include <memlog.h>
|
||||||
#include <zlib.h>
|
#include <zlib.h>
|
||||||
|
#include <mysql_client_server_protocol.h>
|
||||||
|
|
||||||
#define BINLOG_FNAMELEN 255
|
#define BINLOG_FNAMELEN 255
|
||||||
#define BLR_PROTOCOL "MySQLBackend"
|
#define BLR_PROTOCOL "MySQLBackend"
|
||||||
@ -425,6 +426,10 @@ typedef struct router_instance {
|
|||||||
int pending_transaction; /*< Pending transaction */
|
int pending_transaction; /*< Pending transaction */
|
||||||
enum blr_event_state master_event_state; /*< Packet read state */
|
enum blr_event_state master_event_state; /*< Packet read state */
|
||||||
uint32_t stored_checksum; /*< The current value of the checksum */
|
uint32_t stored_checksum; /*< The current value of the checksum */
|
||||||
|
uint8_t partial_checksum[MYSQL_CHECKSUM_LEN]; /*< The partial value of the checksum
|
||||||
|
* received from the master */
|
||||||
|
uint8_t partial_checksum_bytes; /*< How many bytes of the checksum we have read */
|
||||||
|
uint64_t checksum_size; /*< Data size for the checksum */
|
||||||
REP_HEADER stored_header; /*< Relication header of the event the master is sending */
|
REP_HEADER stored_header; /*< Relication header of the event the master is sending */
|
||||||
uint64_t last_safe_pos; /* last committed transaction */
|
uint64_t last_safe_pos; /* last committed transaction */
|
||||||
char binlog_name[BINLOG_FNAMELEN+1];
|
char binlog_name[BINLOG_FNAMELEN+1];
|
||||||
|
@ -101,6 +101,7 @@ static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message,
|
|||||||
|
|
||||||
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
||||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
||||||
|
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
|
||||||
|
|
||||||
static int keepalive = 1;
|
static int keepalive = 1;
|
||||||
|
|
||||||
@ -1033,15 +1034,24 @@ uint32_t partialpos = 0;
|
|||||||
if (router->master_chksum)
|
if (router->master_chksum)
|
||||||
{
|
{
|
||||||
router->stored_checksum = crc32(0L, NULL, 0);
|
router->stored_checksum = crc32(0L, NULL, 0);
|
||||||
|
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
||||||
|
router->partial_checksum_bytes = 0;
|
||||||
extra_bytes = MYSQL_HEADER_LEN + 1;
|
extra_bytes = MYSQL_HEADER_LEN + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (router->master_chksum)
|
if (router->master_chksum)
|
||||||
{
|
{
|
||||||
|
uint32_t size = MIN(len - extra_bytes, router->checksum_size);
|
||||||
router->stored_checksum = crc32(router->stored_checksum,
|
router->stored_checksum = crc32(router->stored_checksum,
|
||||||
ptr + offset,
|
ptr + offset,
|
||||||
len - extra_bytes);
|
size);
|
||||||
|
router->checksum_size -= size;
|
||||||
|
|
||||||
|
if(router->checksum_size == 0 && size < len - offset)
|
||||||
|
{
|
||||||
|
extract_checksum(router, ptr + offset + size, len - offset - size);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0)
|
if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0)
|
||||||
@ -1076,26 +1086,49 @@ uint32_t partialpos = 0;
|
|||||||
/** Initialize the checksum and set the pointer offset to
|
/** Initialize the checksum and set the pointer offset to
|
||||||
* the first byte after the header and OK byte */
|
* the first byte after the header and OK byte */
|
||||||
router->stored_checksum = crc32(0L, NULL, 0);
|
router->stored_checksum = crc32(0L, NULL, 0);
|
||||||
|
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
||||||
|
router->partial_checksum_bytes = 0;
|
||||||
offset = MYSQL_HEADER_LEN + 1;
|
offset = MYSQL_HEADER_LEN + 1;
|
||||||
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
|
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
router->stored_checksum = crc32(router->stored_checksum,
|
size = MIN(size, router->checksum_size);
|
||||||
ptr + offset, size);
|
|
||||||
|
|
||||||
pktsum = EXTRACT32(ptr + len - MYSQL_CHECKSUM_LEN);
|
if (router->checksum_size > 0)
|
||||||
if (pktsum != router->stored_checksum)
|
|
||||||
{
|
{
|
||||||
router->stats.n_badcrc++;
|
router->stored_checksum = crc32(router->stored_checksum,
|
||||||
free(msg);
|
ptr + offset,
|
||||||
msg = NULL;
|
size);
|
||||||
MXS_ERROR("%s: Checksum error in event from master, "
|
router->checksum_size -= size;
|
||||||
"binlog %s @ %lu. Closing master connection.",
|
}
|
||||||
router->service->name, router->binlog_name,
|
|
||||||
router->current_pos);
|
if(router->checksum_size == 0 && size < len - offset)
|
||||||
blr_master_close(router);
|
{
|
||||||
blr_master_delayed_connect(router);
|
extract_checksum(router, ptr + offset + size, len - offset - size);
|
||||||
return;
|
}
|
||||||
|
|
||||||
|
if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
|
||||||
|
{
|
||||||
|
pktsum = EXTRACT32(&router->partial_checksum);
|
||||||
|
if (pktsum != router->stored_checksum)
|
||||||
|
{
|
||||||
|
router->stats.n_badcrc++;
|
||||||
|
free(msg);
|
||||||
|
msg = NULL;
|
||||||
|
MXS_ERROR("%s: Checksum error in event from master, "
|
||||||
|
"binlog %s @ %lu. Closing master connection.",
|
||||||
|
router->service->name, router->binlog_name,
|
||||||
|
router->current_pos);
|
||||||
|
blr_master_close(router);
|
||||||
|
blr_master_delayed_connect(router);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
pkt = gwbuf_consume(pkt, len);
|
||||||
|
pkt_length -= len;
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2497,3 +2530,14 @@ bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
|||||||
}
|
}
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
|
||||||
|
{
|
||||||
|
uint8_t *ptr = cksumptr;
|
||||||
|
while (ptr - cksumptr < len)
|
||||||
|
{
|
||||||
|
router->partial_checksum[router->partial_checksum_bytes] = *ptr;
|
||||||
|
ptr++;
|
||||||
|
router->partial_checksum_bytes++;
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user