Store large events in memory
Storing the large events in memory allows checksum calculations to be done in one step. This also makes the encryption of events easier as they require the complete event in memory.
This commit is contained in:
parent
83109a6e9e
commit
2f082cb7fb
@ -530,7 +530,6 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->reconnect_pending = 0;
|
||||
inst->handling_threads = 0;
|
||||
inst->rotating = 0;
|
||||
inst->residual = NULL;
|
||||
inst->slaves = NULL;
|
||||
inst->next = NULL;
|
||||
inst->lastEventTimestamp = 0;
|
||||
@ -2334,13 +2333,6 @@ destroyInstance(ROUTER *instance)
|
||||
}
|
||||
}
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (inst->residual)
|
||||
{
|
||||
inst->residual = gwbuf_consume(inst->residual, GWBUF_LENGTH(inst->residual));
|
||||
}
|
||||
inst->residual = NULL;
|
||||
|
||||
MXS_INFO("%s is being stopped by MaxScale shudown. Disconnecting from master %s:%d, "
|
||||
"read up to log %s, pos %lu, transaction safe pos %lu",
|
||||
inst->service->name,
|
||||
|
@ -231,12 +231,9 @@ static const char BLR_DBUSERS_FILE[] = "dbusers";
|
||||
/** Possible states of an event sent by the master */
|
||||
enum blr_event_state
|
||||
{
|
||||
BLR_EVENT_DONE, /*< No event being processed */
|
||||
BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets
|
||||
* has been received */
|
||||
BLR_EVENT_STARTED, /*< The first packet of an event has been received */
|
||||
BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed */
|
||||
BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed
|
||||
* but the router is not yet ready to process another one */
|
||||
BLR_EVENT_DONE, /*< The complete event was received */
|
||||
};
|
||||
|
||||
/* Master Server configuration struct */
|
||||
@ -490,19 +487,14 @@ typedef struct router_instance
|
||||
unsigned int master_state; /*< State of the master FSM */
|
||||
uint8_t lastEventReceived; /*< Last even received */
|
||||
uint32_t lastEventTimestamp; /*< Timestamp from last event */
|
||||
GWBUF *residual; /*< Any residual binlog event */
|
||||
MASTER_RESPONSES saved_master; /*< Saved master responses */
|
||||
char *binlogdir; /*< The directory with the binlog files */
|
||||
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
|
||||
int trx_safe; /*< Detect and handle partial transactions */
|
||||
int pending_transaction; /*< Pending transaction */
|
||||
enum blr_event_state master_event_state; /*< Packet read state */
|
||||
uint32_t stored_checksum; /*< The current value of the checksum */
|
||||
uint8_t partial_checksum[MYSQL_CHECKSUM_LEN]; /*< The partial value of the checksum
|
||||
* received from the master */
|
||||
uint8_t partial_checksum_bytes; /*< How many bytes of the checksum we have read */
|
||||
uint64_t checksum_size; /*< Data size for the checksum */
|
||||
REP_HEADER stored_header; /*< Relication header of the event the master is sending */
|
||||
GWBUF *stored_event; /*< Partial even buffer */
|
||||
uint64_t last_safe_pos; /* last committed transaction */
|
||||
char binlog_name[BINLOG_FNAMELEN + 1];
|
||||
/*< Name of the current binlog file */
|
||||
|
@ -102,8 +102,6 @@ static void blr_extract_header_semisync(uint8_t *pkt, REP_HEADER *hdr);
|
||||
static int blr_send_semisync_ack (ROUTER_INSTANCE *router, uint64_t pos);
|
||||
static int blr_get_master_semisync(GWBUF *buf);
|
||||
|
||||
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
|
||||
static void blr_terminate_master_replication(ROUTER_INSTANCE *router, uint8_t* ptr, int len);
|
||||
void blr_notify_all_slaves(ROUTER_INSTANCE *router);
|
||||
extern bool blr_notify_waiting_slave(ROUTER_SLAVE *slave);
|
||||
@ -166,13 +164,6 @@ blr_start_master(void* data)
|
||||
}
|
||||
router->master_state = BLRM_CONNECTING;
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (router->residual)
|
||||
{
|
||||
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
if ((client = dcb_alloc(DCB_ROLE_INTERNAL, NULL)) == NULL)
|
||||
{
|
||||
@ -240,13 +231,6 @@ blr_restart_master(ROUTER_INSTANCE *router)
|
||||
{
|
||||
dcb_close(router->client);
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (router->residual)
|
||||
{
|
||||
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
/* Now it is safe to unleash other threads on this router instance */
|
||||
spinlock_acquire(&router->lock);
|
||||
router->reconnect_pending = 0;
|
||||
@ -333,6 +317,8 @@ blr_master_close(ROUTER_INSTANCE *router)
|
||||
dcb_close(router->master);
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
router->master_event_state = BLR_EVENT_DONE;
|
||||
gwbuf_free(router->stored_event);
|
||||
router->stored_event = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -985,6 +971,31 @@ encode_value(unsigned char *data, unsigned int value, int len)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the stored event checksum matches the calculated checksum
|
||||
*/
|
||||
static bool verify_checksum(ROUTER_INSTANCE *router, size_t len, uint8_t *ptr)
|
||||
{
|
||||
bool rval = true;
|
||||
uint32_t offset = MYSQL_HEADER_LEN + 1;
|
||||
uint32_t size = len - (offset + MYSQL_CHECKSUM_LEN);
|
||||
|
||||
uint32_t checksum = crc32(0L, ptr + offset, size);
|
||||
uint32_t pktsum = EXTRACT32(ptr + offset + size);
|
||||
|
||||
if (pktsum != checksum)
|
||||
{
|
||||
rval = false;
|
||||
MXS_ERROR("%s: Checksum error in event from master, "
|
||||
"binlog %s @ %lu. Closing master connection.",
|
||||
router->service->name, router->binlog_name,
|
||||
router->current_pos);
|
||||
router->stats.n_badcrc++;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* blr_handle_binlog_record - we have received binlog records from
|
||||
* the master and we must now work out what to do with them.
|
||||
@ -998,30 +1009,18 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
uint8_t *msg = NULL, *ptr;
|
||||
REP_HEADER hdr;
|
||||
unsigned int len = 0;
|
||||
unsigned int pkt_length;
|
||||
int prev_length = -1;
|
||||
int n_bufs = -1, pn_bufs = -1;
|
||||
int check_packet_len;
|
||||
int semisync_bytes;
|
||||
int semi_sync_send_ack = 0;
|
||||
|
||||
/*
|
||||
* Prepend any residual buffer to the buffer chain we have
|
||||
* been called with.
|
||||
*/
|
||||
if (router->residual)
|
||||
{
|
||||
pkt = gwbuf_append(router->residual, pkt);
|
||||
router->residual = NULL;
|
||||
}
|
||||
|
||||
pkt_length = gwbuf_length(pkt);
|
||||
/*
|
||||
* Loop over all the packets while we still have some data
|
||||
* and the packet length is enough to hold a replication event
|
||||
* header.
|
||||
*/
|
||||
while (pkt && pkt_length > 24)
|
||||
while (pkt)
|
||||
{
|
||||
ptr = GWBUF_DATA(pkt);
|
||||
len = gw_mysql_get_byte3(ptr);
|
||||
@ -1035,9 +1034,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
|
||||
{
|
||||
// Dead code?
|
||||
|
||||
char *event_msg = "";
|
||||
char *event_msg = "unknown";
|
||||
|
||||
/* Packet is too small to be a binlog event */
|
||||
if (ptr[4] == 0xfe) /* EOF Packet */
|
||||
@ -1049,6 +1046,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
event_msg = "error";
|
||||
}
|
||||
MXS_NOTICE("Non-event message (%s) from master.", event_msg);
|
||||
pkt = gwbuf_consume(pkt, len);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1089,7 +1087,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
/* Sanity check */
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
if (hdr.event_size != len - check_packet_len &&
|
||||
if (hdr.event_size != len - (check_packet_len - MYSQL_HEADER_LEN) &&
|
||||
(hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN)) < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
MXS_ERROR("Packet length is %d, but event size is %d, "
|
||||
@ -1114,18 +1112,10 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
break;
|
||||
}
|
||||
else if (hdr.event_size + (check_packet_len - MYSQL_HEADER_LEN) >= MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
router->master_event_state = BLR_EVENT_STARTED;
|
||||
|
||||
/** Store the header for later use */
|
||||
memcpy(&router->stored_header, &hdr, sizeof(hdr));
|
||||
}
|
||||
|
||||
/** Prepare the checksum variables for this event */
|
||||
router->stored_checksum = crc32(0L, NULL, 0);
|
||||
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
||||
router->partial_checksum_bytes = 0;
|
||||
/** Store the header for later use */
|
||||
memcpy(&router->stored_header, &hdr, sizeof(hdr));
|
||||
router->master_event_state = BLR_EVENT_STARTED;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1134,7 +1124,6 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
|
||||
gwbuf_free(pkt);
|
||||
pkt = NULL;
|
||||
pkt_length = 0;
|
||||
|
||||
break;
|
||||
}
|
||||
@ -1166,125 +1155,63 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
}
|
||||
}
|
||||
|
||||
/* pending large event */
|
||||
if (router->master_event_state != BLR_EVENT_DONE)
|
||||
/** Gather the event into one big buffer */
|
||||
GWBUF *part = gwbuf_split(&pkt, len + MYSQL_HEADER_LEN);
|
||||
|
||||
if (semisync_bytes)
|
||||
{
|
||||
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
|
||||
/** Consume the two semi-sync bytes */
|
||||
part = gwbuf_consume(part, semisync_bytes);
|
||||
}
|
||||
|
||||
ss_dassert(router->master_event_state == BLR_EVENT_STARTED ||
|
||||
router->master_event_state == BLR_EVENT_ONGOING);
|
||||
|
||||
if (router->master_event_state == BLR_EVENT_ONGOING)
|
||||
{
|
||||
/** Only consume the network header */
|
||||
part = gwbuf_consume(part, MYSQL_HEADER_LEN);
|
||||
}
|
||||
|
||||
router->stored_event = gwbuf_append(router->stored_event, part);
|
||||
|
||||
if (len < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
/** This is the last packet, we can now write it to disk */
|
||||
ss_dassert(router->master_event_state != BLR_EVENT_DONE);
|
||||
|
||||
if (router->master_event_state != BLR_EVENT_STARTED)
|
||||
{
|
||||
/** This is the last packet, we can now proceed to distribute
|
||||
* the event afer it has been written to disk */
|
||||
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
|
||||
router->master_event_state = BLR_EVENT_COMPLETE;
|
||||
/** This is not the first packet of the event, copy the
|
||||
* stored header */
|
||||
memcpy(&hdr, &router->stored_header, sizeof(hdr));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* current partial event is being written to disk file */
|
||||
uint32_t offset = MYSQL_HEADER_LEN;
|
||||
uint32_t extra_bytes = MYSQL_HEADER_LEN;
|
||||
|
||||
/** Don't write the OK byte into the binlog */
|
||||
if (router->master_event_state == BLR_EVENT_STARTED)
|
||||
{
|
||||
offset = MYSQL_HEADER_LEN + 1;
|
||||
router->master_event_state = BLR_EVENT_ONGOING;
|
||||
extra_bytes = MYSQL_HEADER_LEN + 1;
|
||||
}
|
||||
|
||||
ss_dassert(len - extra_bytes - semisync_bytes > 0);
|
||||
uint32_t bytes_available = len - extra_bytes - semisync_bytes;
|
||||
|
||||
if (router->master_chksum)
|
||||
{
|
||||
uint32_t size = MXS_MIN(len - extra_bytes - semisync_bytes,
|
||||
router->checksum_size);
|
||||
|
||||
router->stored_checksum = crc32(router->stored_checksum,
|
||||
ptr + offset,
|
||||
size);
|
||||
router->checksum_size -= size;
|
||||
|
||||
if (router->checksum_size == 0 && size < bytes_available)
|
||||
{
|
||||
extract_checksum(router, ptr + offset + size,
|
||||
bytes_available - size);
|
||||
}
|
||||
}
|
||||
|
||||
if (blr_write_data_into_binlog(router, bytes_available,
|
||||
ptr + offset) == 0)
|
||||
{
|
||||
/** Failed to write to the binlog file, destroy the buffer
|
||||
* chain and close the connection with the master */
|
||||
while (pkt)
|
||||
{
|
||||
pkt = GWBUF_CONSUME_ALL(pkt);
|
||||
}
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
pkt = gwbuf_consume(pkt, len);
|
||||
pkt_length -= len;
|
||||
continue;
|
||||
}
|
||||
router->master_event_state = BLR_EVENT_DONE;
|
||||
}
|
||||
else
|
||||
{
|
||||
router->master_event_state = BLR_EVENT_ONGOING;
|
||||
continue;
|
||||
}
|
||||
|
||||
/** We have the complete event in one contiguous buffer */
|
||||
router->stored_event = gwbuf_make_contiguous(router->stored_event);
|
||||
ptr = GWBUF_DATA(router->stored_event);
|
||||
|
||||
/** len is now the length of the complete event plus 4 bytes of network
|
||||
* header and one OK byte. Semi-sync bytes are never stored. */
|
||||
len = gwbuf_length(router->stored_event);
|
||||
|
||||
/*
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
*/
|
||||
if (router->master_chksum)
|
||||
if (router->master_chksum && !verify_checksum(router, len, ptr))
|
||||
{
|
||||
uint32_t offset = MYSQL_HEADER_LEN;
|
||||
uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;
|
||||
|
||||
if (router->master_event_state == BLR_EVENT_DONE)
|
||||
{
|
||||
/** Set the pointer offset to the first byte after
|
||||
* the header and OK byte */
|
||||
offset = MYSQL_HEADER_LEN + 1;
|
||||
size = len - (check_packet_len + MYSQL_CHECKSUM_LEN);
|
||||
}
|
||||
|
||||
size = MXS_MIN(size, router->checksum_size);
|
||||
|
||||
if (router->checksum_size > 0)
|
||||
{
|
||||
router->stored_checksum = crc32(router->stored_checksum,
|
||||
ptr + offset, size);
|
||||
router->checksum_size -= size;
|
||||
}
|
||||
|
||||
if(router->checksum_size == 0 && size < (len - offset - semisync_bytes))
|
||||
{
|
||||
extract_checksum(router, ptr + offset + size,
|
||||
len - offset - size - semisync_bytes);
|
||||
}
|
||||
|
||||
if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
|
||||
{
|
||||
uint32_t pktsum = EXTRACT32(router->partial_checksum);
|
||||
if (pktsum != router->stored_checksum)
|
||||
{
|
||||
router->stats.n_badcrc++;
|
||||
MXS_FREE(msg);
|
||||
/* msg = NULL; Not needed unless msg will be referred to again */
|
||||
MXS_ERROR("%s: Checksum error in event from master, "
|
||||
"binlog %s @ %lu. Closing master connection.",
|
||||
router->service->name, router->binlog_name,
|
||||
router->current_pos);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pkt = gwbuf_consume(pkt, len);
|
||||
pkt_length -= len;
|
||||
continue;
|
||||
}
|
||||
MXS_FREE(msg);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hdr.ok == 0)
|
||||
@ -1479,15 +1406,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
ptr = ptr + 4; // Skip header
|
||||
uint32_t offset = 4;
|
||||
|
||||
if (router->master_event_state == BLR_EVENT_STARTED ||
|
||||
router->master_event_state == BLR_EVENT_DONE)
|
||||
{
|
||||
ptr++;
|
||||
offset++;
|
||||
}
|
||||
ptr = ptr + MYSQL_HEADER_LEN + 1; // Skip header and OK byte
|
||||
uint32_t offset = MYSQL_HEADER_LEN + 1;
|
||||
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
@ -1496,23 +1416,10 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
|
||||
/* Current event is being written to disk file.
|
||||
* It is possible for an empty packet to be sent if an
|
||||
* event is exactly 2^24 bytes long. In this case the
|
||||
* empty packet should be discarded. */
|
||||
if (len > MYSQL_HEADER_LEN &&
|
||||
blr_write_binlog_record(router, &hdr, len - offset - semisync_bytes, ptr) == 0)
|
||||
/* Write event to disk */
|
||||
if (blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0)
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
|
||||
{
|
||||
;
|
||||
}
|
||||
gwbuf_free(pkt);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
@ -1523,16 +1430,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
{
|
||||
if (!blr_rotate_event(router, ptr, &hdr))
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
|
||||
{
|
||||
;
|
||||
}
|
||||
gwbuf_free(pkt);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
@ -1542,8 +1440,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
/* Handle semi-sync request fom master */
|
||||
if (router->master_semi_sync != MASTER_SEMISYNC_NOT_AVAILABLE &&
|
||||
semi_sync_send_ack == BLR_MASTER_SEMI_SYNC_ACK_REQ &&
|
||||
(router->master_event_state == BLR_EVENT_COMPLETE ||
|
||||
router->master_event_state == BLR_EVENT_DONE))
|
||||
(router->master_event_state == BLR_EVENT_DONE))
|
||||
{
|
||||
|
||||
MXS_DEBUG("%s: binlog record in file %s, pos %lu has "
|
||||
@ -1629,16 +1526,7 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
spinlock_release(&router->binlog_lock);
|
||||
if (!blr_rotate_event(router, ptr, &hdr))
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
* binlog file, destroy the
|
||||
* buffer chain and close the
|
||||
* connection with the master
|
||||
*/
|
||||
while ((pkt = gwbuf_consume(pkt, GWBUF_LENGTH(pkt))) != NULL)
|
||||
{
|
||||
;
|
||||
}
|
||||
gwbuf_free(pkt);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
@ -1646,17 +1534,15 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** A large event is now fully received and processed */
|
||||
if (router->master_event_state == BLR_EVENT_COMPLETE)
|
||||
{
|
||||
router->master_event_state = BLR_EVENT_DONE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
blr_terminate_master_replication(router, ptr, len);
|
||||
}
|
||||
|
||||
/** Finished processing the event */
|
||||
gwbuf_free(router->stored_event);
|
||||
router->stored_event = NULL;
|
||||
}
|
||||
|
||||
if (msg)
|
||||
@ -1664,33 +1550,8 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
|
||||
MXS_FREE(msg);
|
||||
msg = NULL;
|
||||
}
|
||||
prev_length = len;
|
||||
while (len > 0)
|
||||
{
|
||||
unsigned int n, plen;
|
||||
plen = GWBUF_LENGTH(pkt);
|
||||
n = (plen < len ? plen : len);
|
||||
pkt = gwbuf_consume(pkt, n);
|
||||
len -= n;
|
||||
pkt_length -= n;
|
||||
}
|
||||
|
||||
pn_bufs = n_bufs;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if we have a residual, part binlog message to deal with.
|
||||
* Just simply store the GWBUF for next time
|
||||
*/
|
||||
if (pkt)
|
||||
{
|
||||
router->residual = pkt;
|
||||
ss_dassert(pkt_length != 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(pkt_length == 0);
|
||||
}
|
||||
blr_file_flush(router);
|
||||
}
|
||||
|
||||
@ -2129,13 +1990,6 @@ blr_stop_start_master(ROUTER_INSTANCE *router)
|
||||
}
|
||||
}
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (router->residual)
|
||||
{
|
||||
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
@ -2298,45 +2152,6 @@ static void blr_log_identity(ROUTER_INSTANCE *router)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Write data into binlogs (incomplete event)
|
||||
*
|
||||
* Writes @c data_len bytes of data from @c buf into the current binlog being processed.
|
||||
*
|
||||
* @param router Router instance
|
||||
* @param data_len Number of bytes to write
|
||||
* @param buf Pointer where the data is read
|
||||
* @return Number of bytes written or 0 on error
|
||||
*/
|
||||
int
|
||||
blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf)
|
||||
{
|
||||
int n;
|
||||
|
||||
if ((n = pwrite(router->binlog_fd, buf, data_len,
|
||||
router->last_written)) != data_len)
|
||||
{
|
||||
char err_msg[MXS_STRERROR_BUFLEN];
|
||||
MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
|
||||
"Truncating to previous record.",
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
|
||||
/* Remove any partial event that was written */
|
||||
if (ftruncate(router->binlog_fd, router->last_written))
|
||||
{
|
||||
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
router->last_written += data_len;
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a replication event packet to a slave
|
||||
*
|
||||
@ -2485,26 +2300,6 @@ bool blr_send_event(blr_thread_role_t role,
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the checksum from the binlogs
|
||||
*
|
||||
* This updates the internal state of the router and will allow us to detect
|
||||
* if the checksum is split across two packets.
|
||||
* @param router Router instance
|
||||
* @param cksumptr Pointer to the checksum
|
||||
* @param len How much of the data is readable
|
||||
*/
|
||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
|
||||
{
|
||||
uint8_t *ptr = cksumptr;
|
||||
while (ptr - cksumptr < len && router->partial_checksum_bytes < MYSQL_CHECKSUM_LEN)
|
||||
{
|
||||
router->partial_checksum[router->partial_checksum_bytes] = *ptr;
|
||||
ptr++;
|
||||
router->partial_checksum_bytes++;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the slave connection and log errors
|
||||
*
|
||||
|
@ -3332,13 +3332,6 @@ blr_stop_slave(ROUTER_INSTANCE* router, ROUTER_SLAVE* slave)
|
||||
}
|
||||
}
|
||||
|
||||
/* Discard the queued residual data */
|
||||
while (router->residual)
|
||||
{
|
||||
router->residual = gwbuf_consume(router->residual, GWBUF_LENGTH(router->residual));
|
||||
}
|
||||
router->residual = NULL;
|
||||
|
||||
/* Now it is safe to unleash other threads on this router instance */
|
||||
router->reconnect_pending = 0;
|
||||
router->active_logs = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user