Merge branch '1.2.1-binlog_router_trx' into develop

This commit is contained in:
Markus Makela 2016-02-19 18:51:59 +02:00
commit 28bd7c2202
6 changed files with 615 additions and 211 deletions

View File

@ -44,10 +44,12 @@
#include <stdint.h>
#include <memlog.h>
#include <zlib.h>
#include <mysql_client_server_protocol.h>
#define BINLOG_FNAMELEN 255
#define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_MAGIC_SIZE 4
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
@ -197,6 +199,17 @@
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 7)
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
 * Possible states of an event sent by the master.
 *
 * The router keeps the current value in router_instance.master_event_state.
 * It starts as BLR_EVENT_DONE, and is reset to BLR_EVENT_DONE when the master
 * connection is closed and after a multi-packet event has been fully handled.
 */
enum blr_event_state
{
BLR_EVENT_DONE, /*< No event being processed; the next packet is expected to
* start with a replication header */
BLR_EVENT_STARTED, /*< The first packet of an event which spans multiple packets
* has been received (the event plus its OK byte does not fit
* into one packet of MYSQL_PACKET_LENGTH_MAX bytes) */
BLR_EVENT_ONGOING, /*< Other packets of a multi-packet event are being processed;
* these packets carry no OK byte and no replication header */
BLR_EVENT_COMPLETE /*< A multi-packet event has been successfully processed
* but the router is not yet ready to process another one */
};
/* Master Server configuration struct */
typedef struct master_server_config {
char *host;
@ -414,6 +427,13 @@ typedef struct router_instance {
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
int trx_safe; /*< Detect and handle partial transactions */
int pending_transaction; /*< Pending transaction */
enum blr_event_state master_event_state; /*< Packet read state */
uint32_t stored_checksum; /*< The current value of the checksum */
uint8_t partial_checksum[MYSQL_CHECKSUM_LEN]; /*< The partial value of the checksum
* received from the master */
uint8_t partial_checksum_bytes; /*< How many bytes of the checksum we have read */
uint64_t checksum_size; /*< Data size for the checksum */
REP_HEADER stored_header; /*< Replication header of the event the master is sending */
uint64_t last_safe_pos; /* last committed transaction */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */
@ -424,7 +444,8 @@ typedef struct router_instance {
int binlog_fd; /*< File descriptor of the binlog
* file being written
*/
uint64_t last_written; /*< Position of last event written */
uint64_t last_written; /*< Position of the last write operation */
uint64_t last_event_pos; /*< Position of last event written */
uint64_t current_safe_event;
/*< Position of the latest safe event being sent to slaves */
char prevbinlog[BINLOG_FNAMELEN+1];
@ -555,7 +576,7 @@ extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool
extern void blr_init_cache(ROUTER_INSTANCE *);
extern int blr_file_init(ROUTER_INSTANCE *);
extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *, uint32_t pos, uint8_t *);
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
extern void blr_file_flush(ROUTER_INSTANCE *);
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);

View File

@ -71,6 +71,7 @@
#define GW_MYSQL_READ 0
#define GW_MYSQL_WRITE 1
#define MYSQL_HEADER_LEN 4L
#define MYSQL_CHECKSUM_LEN 4L
#define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10
#define GW_MYSQL_HANDSHAKE_FILLER 0x00
@ -81,6 +82,9 @@
#define GW_MYSQL_SCRAMBLE_SIZE 20
#define GW_SCRAMBLE_LENGTH_323 8
/** Maximum length of a MySQL packet */
#define MYSQL_PACKET_LENGTH_MAX 0x00ffffff
#ifndef MYSQL_SCRAMBLE_LEN
# define MYSQL_SCRAMBLE_LEN GW_MYSQL_SCRAMBLE_SIZE
#endif

View File

@ -507,6 +507,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
inst->binlog_position = 0;
inst->current_pos = 0;
inst->current_safe_event = 0;
inst->master_event_state = BLR_EVENT_DONE;
strcpy(inst->binlog_name, "");
strcpy(inst->prevbinlog, "");

View File

@ -189,59 +189,78 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
* binlog files need an initial 4 magic bytes at the start. blr_file_add_magic()
* adds them.
*
* @param router The router instance
* @param fd file descriptor to the open binlog file
* @return Nothing
* @param fd file descriptor to the open binlog file
* @return True if the magic string could be written to the file.
*/
static void
blr_file_add_magic(ROUTER_INSTANCE *router, int fd)
static bool
blr_file_add_magic(int fd)
{
unsigned char magic[] = BINLOG_MAGIC;
static const unsigned char magic[] = BINLOG_MAGIC;
write(fd, magic, 4);
router->current_pos = 4; /* Initial position after the magic number */
router->binlog_position = 4; /* Initial position after the magic number */
router->current_safe_event = 4;
router->last_written = 0;
ssize_t written = write(fd, magic, BINLOG_MAGIC_SIZE);
return written == BINLOG_MAGIC_SIZE;
}
/**
 * Create a new binlog file for the router to use.
 *
 * The file is created in the router's binlog directory and the binlog magic
 * bytes are written to it. Only when that succeeds is the previously open
 * binlog file descriptor closed and the router switched over to the new file,
 * with all positions reset to the first byte after the magic bytes. If the
 * magic bytes cannot be written, the new file is closed and removed again and
 * the router keeps using its current file.
 *
 * @param router    The router instance
 * @param file      The binlog file name
 * @return          Non-zero if the file creation succeeded
 */
static int
blr_file_create(ROUTER_INSTANCE *router, char *file)
{
    int created = 0;
    char err_msg[STRERROR_BUFLEN];
    char path[PATH_MAX + 1] = "";

    /* Build "<binlogdir>/<file>" without ever writing past the end of path */
    int path_len = snprintf(path, sizeof(path), "%s/%s", router->binlogdir, file);

    if (path_len < 0 || (size_t)path_len >= sizeof(path))
    {
        MXS_ERROR("%s: Path of binlog file %s in directory %s is too long.",
                  router->service->name, file, router->binlogdir);
        return 0;
    }

    int fd = open(path, O_RDWR|O_CREAT, 0666);

    if (fd != -1)
    {
        if (blr_file_add_magic(fd))
        {
            close(router->binlog_fd);
            spinlock_acquire(&router->binlog_lock);
            strncpy(router->binlog_name, file, BINLOG_FNAMELEN);
            /* strncpy does not terminate the string when file is too long */
            router->binlog_name[BINLOG_FNAMELEN] = '\0';
            router->binlog_fd = fd;
            router->current_pos = BINLOG_MAGIC_SIZE; /* Initial position after the magic number */
            router->binlog_position = BINLOG_MAGIC_SIZE;
            router->current_safe_event = BINLOG_MAGIC_SIZE;
            router->last_written = BINLOG_MAGIC_SIZE;
            router->last_event_pos = 0;
            spinlock_release(&router->binlog_lock);

            created = 1;
        }
        else
        {
            /* Log before close() so that errno still belongs to the failed write */
            MXS_ERROR("%s: Failed to write magic string to created binlog file %s, %s.",
                      router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
            close(fd);

            /* unlink() returns 0 on success; only a non-zero result is an error */
            if (unlink(path) != 0)
            {
                MXS_ERROR("%s: Failed to delete file %s, %s.",
                          router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
            }
        }
    }
    else
    {
        MXS_ERROR("%s: Failed to create binlog file %s, %s.",
                  router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
    }

    return created;
}
/**
@ -273,7 +292,16 @@ int fd;
router->current_pos = lseek(fd, 0L, SEEK_END);
if (router->current_pos < 4) {
if (router->current_pos == 0) {
blr_file_add_magic(router, fd);
if (blr_file_add_magic(fd))
{
router->current_pos = BINLOG_MAGIC_SIZE;
router->binlog_position = BINLOG_MAGIC_SIZE;
router->current_safe_event = BINLOG_MAGIC_SIZE;
router->last_written = BINLOG_MAGIC_SIZE;
router->last_event_pos = 0;
} else {
MXS_ERROR("%s: Could not write magic to binlog file.", router->service->name);
}
} else {
/* If for any reason the file's length is between 1 and 3 bytes
* then report an error. */
@ -297,26 +325,33 @@ int fd;
* @return Return the number of bytes written
*/
int
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint32_t size, uint8_t *buf)
{
int n;
if ((n = pwrite(router->binlog_fd, buf, hdr->event_size,
hdr->next_pos - hdr->event_size)) != hdr->event_size)
if ((n = pwrite(router->binlog_fd, buf, size,
router->last_written)) != size)
{
char err_msg[STRERROR_BUFLEN];
MXS_ERROR("%s: Failed to write binlog record at %d of %s, %s. "
MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
"Truncating to previous record.",
router->service->name, hdr->next_pos - hdr->event_size,
router->service->name, router->last_written,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
/* Remove any partual event that was written */
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
/* Remove any partial event that was written */
if (ftruncate(router->binlog_fd, router->last_written))
{
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
router->service->name, router->last_written,
router->binlog_name,
strerror_r(errno, err_msg, sizeof(err_msg)));
}
return 0;
}
spinlock_acquire(&router->binlog_lock);
router->current_pos = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
router->last_written += size;
router->last_event_pos = hdr->next_pos - hdr->event_size;
spinlock_release(&router->binlog_lock);
return n;
}
@ -462,10 +497,9 @@ struct stat statb;
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position)
{
if (pos > router->binlog_position && !router->rotating)
if (pos > router->binlog_position)
{
/* Unsafe position, slave will be disconnected by the calling routine */
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. "
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu is unsafe. "
"Latest safe position %lu, end of binlog file %lu",
pos, router->binlog_position, router->current_pos);

View File

@ -99,6 +99,10 @@ extern char * blr_last_event_description(ROUTER_INSTANCE *router);
static void blr_log_identity(ROUTER_INSTANCE *router);
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
static int keepalive = 1;
/**
@ -289,6 +293,7 @@ blr_master_close(ROUTER_INSTANCE *router)
{
dcb_close(router->master);
router->master_state = BLRM_UNCONNECTED;
router->master_event_state = BLR_EVENT_DONE;
}
/**
@ -915,7 +920,7 @@ int n_bufs = -1, pn_bufs = -1;
* copy if the message straddles GWBUF's.
*/
if (len < BINLOG_EVENT_HDR_LEN)
if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
{
char *msg = "";
@ -932,90 +937,193 @@ int n_bufs = -1, pn_bufs = -1;
}
else
{
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
blr_extract_header(ptr, &hdr);
/* Sanity check */
if (hdr.ok == 0 && hdr.event_size != len - 5)
if (router->master_event_state == BLR_EVENT_DONE)
{
MXS_ERROR("Packet length is %d, but event size is %d, "
"binlog file %s position %lu "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->current_pos,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" :
"Residual data from previous call") : ""));
router->stats.n_binlogs++;
router->stats.n_binlogs_ses++;
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs);
if (msg)
blr_extract_header(ptr, &hdr);
/* Sanity check */
if (hdr.ok == 0)
{
free(msg);
msg = NULL;
}
break;
}
if (hdr.ok == 0)
{
int event_limit;
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
free(router->m_errmsg);
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", hdr.event_type, hdr.flags, hdr.event_size, hdr.timestamp);
#endif
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
*/
if (router->master_chksum)
{
uint32_t chksum, pktsum;
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
if (pktsum != chksum)
if (hdr.event_size != len - 5 && (hdr.event_size + 1) < MYSQL_PACKET_LENGTH_MAX)
{
router->stats.n_badcrc++;
MXS_ERROR("Packet length is %d, but event size is %d, "
"binlog file %s position %lu "
"reslen is %d and preslen is %d, "
"length of previous event %d. %s",
len, hdr.event_size,
router->binlog_name,
router->current_pos,
reslen, preslen, prev_length,
(prev_length == -1 ?
(no_residual ? "No residual data from previous call" :
"Residual data from previous call") : ""));
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
"the previous events was contained in %d GWBUFs",
router->lastEventReceived, n_bufs, pn_bufs);
if (msg)
{
free(msg);
msg = NULL;
}
MXS_ERROR("%s: Checksum error in event "
"from master, "
"binlog %s @ %lu. "
"Closing master connection.",
router->service->name,
router->binlog_name,
router->current_pos);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
break;
}
else if ((hdr.event_size + 1) >= MYSQL_PACKET_LENGTH_MAX)
{
router->master_event_state = BLR_EVENT_STARTED;
/** Store the header for later use */
memcpy(&router->stored_header, &hdr, sizeof(hdr));
}
/** Prepare the checksum variables for this event */
router->stored_checksum = crc32(0L, NULL, 0);
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
router->partial_checksum_bytes = 0;
}
if (hdr.ok == 0)
{
spinlock_acquire(&router->lock);
/* set mysql errno to 0 */
router->m_errno = 0;
/* Remove error message */
if (router->m_errmsg)
free(router->m_errmsg);
router->m_errmsg = NULL;
spinlock_release(&router->lock);
#ifdef SHOW_EVENTS
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp);
#endif
}
}
/* pending large event */
if (router->master_event_state != BLR_EVENT_DONE)
{
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
{
/** This is the last packet, we can now proceed to distribute
* the event after it has been written to disk */
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
router->master_event_state = BLR_EVENT_COMPLETE;
memcpy(&hdr, &router->stored_header, sizeof(hdr));
}
else
{
/* current partial event is being written to disk file */
uint32_t offset = MYSQL_HEADER_LEN;
uint32_t extra_bytes = MYSQL_HEADER_LEN;
/** Don't write the OK byte into the binlog */
if (router->master_event_state == BLR_EVENT_STARTED)
{
offset = MYSQL_HEADER_LEN + 1;
router->master_event_state = BLR_EVENT_ONGOING;
extra_bytes = MYSQL_HEADER_LEN + 1;
}
if (router->master_chksum)
{
uint32_t size = MIN(len - extra_bytes, router->checksum_size);
router->stored_checksum = crc32(router->stored_checksum,
ptr + offset,
size);
router->checksum_size -= size;
if (router->checksum_size == 0 && size < len - offset)
{
extract_checksum(router, ptr + offset + size, len - offset - size);
}
}
if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0)
{
/** Failed to write to the binlog file, destroy the buffer
* chain and close the connection with the master */
while (pkt)
{
pkt = GWBUF_CONSUME_ALL(pkt);
}
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
pkt = gwbuf_consume(pkt, len);
pkt_length -= len;
continue;
}
}
/*
* First check that the checksum we calculate matches the
* checksum in the packet we received.
*/
if (router->master_chksum)
{
uint32_t pktsum, offset = MYSQL_HEADER_LEN;
uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;
if (router->master_event_state == BLR_EVENT_DONE)
{
/** Set the pointer offset to the first byte after
* the header and OK byte */
offset = MYSQL_HEADER_LEN + 1;
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
}
size = MIN(size, router->checksum_size);
if (router->checksum_size > 0)
{
router->stored_checksum = crc32(router->stored_checksum,
ptr + offset,
size);
router->checksum_size -= size;
}
if(router->checksum_size == 0 && size < len - offset)
{
extract_checksum(router, ptr + offset + size, len - offset - size);
}
if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
{
pktsum = EXTRACT32(&router->partial_checksum);
if (pktsum != router->stored_checksum)
{
router->stats.n_badcrc++;
free(msg);
msg = NULL;
MXS_ERROR("%s: Checksum error in event from master, "
"binlog %s @ %lu. Closing master connection.",
router->service->name, router->binlog_name,
router->current_pos);
blr_master_close(router);
blr_master_delayed_connect(router);
return;
}
}
else
{
pkt = gwbuf_consume(pkt, len);
pkt_length -= len;
continue;
}
}
if (hdr.ok == 0)
{
router->lastEventReceived = hdr.event_type;
router->lastEventTimestamp = hdr.timestamp;
@ -1045,7 +1153,7 @@ int n_bufs = -1, pn_bufs = -1;
* This marks the transaction starts instead of
* QUERY_EVENT with "BEGIN"
*/
if (router->trx_safe) {
if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE) {
if (router->mariadb10_compat) {
if (hdr.event_type == MARIADB10_GTID_EVENT) {
uint64_t n_sequence;
@ -1130,7 +1238,9 @@ int n_bufs = -1, pn_bufs = -1;
}
}
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
/** Gather statistics about the replication event types */
int event_limit = router->mariadb10_compat ?
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
if (hdr.event_type <= event_limit)
router->stats.events[hdr.event_type]++;
@ -1193,13 +1303,30 @@ int n_bufs = -1, pn_bufs = -1;
}
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
ptr = ptr + 4; // Skip header
uint32_t offset = 4;
/* current event is being written to disk file */
if (blr_write_binlog_record(router, &hdr, ptr) == 0)
if (router->master_event_state == BLR_EVENT_STARTED ||
router->master_event_state == BLR_EVENT_DONE)
{
ptr++;
offset++;
}
if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&router->binlog_lock);
router->rotating = 1;
spinlock_release(&router->binlog_lock);
}
/* Current event is being written to disk file.
* It is possible for an empty packet to be sent if an
* event is exactly 2^24 bytes long. In this case the
* empty packet should be discarded. */
if (len > MYSQL_HEADER_LEN &&
blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0)
{
/*
* Failed to write to the
@ -1247,8 +1374,28 @@ int n_bufs = -1, pn_bufs = -1;
spinlock_release(&router->binlog_lock);
/* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr);
if (router->master_event_state == BLR_EVENT_COMPLETE)
{
/** Read the complete event from the disk */
GWBUF *record = blr_read_events_from_pos(router, router->last_event_pos, &hdr, hdr.next_pos);
if (record)
{
uint8_t *data = GWBUF_DATA(record);
blr_distribute_binlog_record(router, &hdr, data);
gwbuf_free(record);
}
else
{
MXS_ERROR("Failed to read event at position"
"%lu with a size of %u bytes.",
router->last_event_pos, hdr.event_size);
}
}
else
{
/* Now distribute events */
blr_distribute_binlog_record(router, &hdr, ptr);
}
} else {
/**
* If transaction is closed:
@ -1358,7 +1505,9 @@ int n_bufs = -1, pn_bufs = -1;
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&router->binlog_lock);
router->rotating = 1;
spinlock_release(&router->binlog_lock);
if (!blr_rotate_event(router, ptr, &hdr))
{
/*
@ -1376,6 +1525,12 @@ int n_bufs = -1, pn_bufs = -1;
}
}
}
/** A large event is now fully received and processed */
if(router->master_event_state == BLR_EVENT_COMPLETE)
{
router->master_event_state = BLR_EVENT_DONE;
}
}
else
{
@ -1474,6 +1629,7 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
* @param router The instance of the router
* @param ptr The packet containing the rotate event
* @param hdr The replication message header
* @return 1 if the file could be rotated, 0 otherwise.
*/
static int
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
@ -1505,17 +1661,20 @@ char file[BINLOG_FNAMELEN+1];
strcpy(router->prevbinlog, router->binlog_name);
int rotated = 1;
if (strncmp(router->binlog_name, file, slen) != 0)
{
router->stats.n_rotates++;
if (blr_file_rotate(router, file, pos) == 0)
{
router->rotating = 0;
return 0;
rotated = 0;
}
}
spinlock_acquire(&router->binlog_lock);
router->rotating = 0;
return 1;
spinlock_release(&router->binlog_lock);
return rotated;
}
/**
@ -1602,6 +1761,11 @@ unsigned int cstate;
/* Slave is in catchup mode */
action = 3;
}
else
{
MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate);
action = 0;
}
slave->stats.n_actions[action-1]++;
spinlock_release(&slave->catch_lock);
@ -1621,7 +1785,7 @@ unsigned int cstate;
*/
slave_action = SLAVE_SEND_EVENT;
}
else if (slave->binlog_pos == router->last_written &&
else if (slave->binlog_pos == router->last_event_pos &&
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
(hdr->event_type == ROTATE_EVENT &&
strcmp(slave->binlogfile, router->prevbinlog))))
@ -1680,20 +1844,13 @@ unsigned int cstate;
if (router->send_slave_heartbeat)
slave->lastReply = time(0);
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
buf += 3;
*buf++ = slave->seqno++;
*buf++ = 0; // OK
memcpy(buf, ptr, hdr->event_size);
if (hdr->event_type == ROTATE_EVENT)
{
blr_slave_rotate(router, slave, ptr);
}
slave->stats.n_bytes += gwbuf_length(pkt);
slave->stats.n_events++;
slave->dcb->func.write(slave->dcb, pkt);
blr_send_event(slave, hdr, ptr);
spinlock_acquire(&slave->catch_lock);
if (hdr->event_type != ROTATE_EVENT)
{
@ -2215,3 +2372,163 @@ ROUTER_SLAVE *slave;
spinlock_release(&router->lock);
}
/**
 * Write a chunk of raw data into the current binlog file.
 *
 * The data is written at the offset held in router->last_written. When all
 * of it has been written, router->last_written is advanced by data_len. On
 * a failed or short write the error is logged and the file is truncated back
 * to router->last_written, which is left unchanged.
 *
 * @param router    The router instance
 * @param data_len  Number of bytes to write
 * @param buf       The data to write
 * @return          Number of bytes written, or 0 if the write failed
 */
int
blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf)
{
    int written = pwrite(router->binlog_fd, buf, data_len, router->last_written);

    if (written == data_len)
    {
        router->last_written += data_len;
        return written;
    }

    char err_msg[STRERROR_BUFLEN];

    MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
              "Truncating to previous record.",
              router->service->name, router->last_written,
              router->binlog_name,
              strerror_r(errno, err_msg, sizeof(err_msg)));

    /* Remove any partial event that was written */
    if (ftruncate(router->binlog_fd, router->last_written) != 0)
    {
        MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
                  router->service->name, router->last_written,
                  router->binlog_name,
                  strerror_r(errno, err_msg, sizeof(err_msg)));
    }

    return 0;
}
/**
 * Send one MySQL packet of replication data to a slave
 *
 * The packet consists of the 3 byte payload length, the slave's next sequence
 * number and the payload. For the first packet of an event the payload starts
 * with one extra status byte (0x0, OK) in front of the data pointed to by
 * @c buf. For any later packet of the same event @c first must be false so
 * that the status byte is not sent again. A @c len of zero produces a packet
 * without event data.
 *
 * @param slave Slave where the packet is sent to
 * @param buf   Buffer containing the data
 * @param len   Length of the data
 * @param first If this is the first packet of a multi-packet event
 * @return True on success, false when memory allocation fails
 */
bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first)
{
    unsigned int datalen = first ? len + 1 : len;
    GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);

    if (!buffer)
    {
        MXS_ERROR("failed to allocate %ld bytes of memory when writing an"
                  " event.", datalen + MYSQL_HEADER_LEN);
        return false;
    }

    uint8_t *data = GWBUF_DATA(buffer);

    /* Packet header: payload length followed by the sequence number */
    encode_value(data, datalen, 24);
    data[3] = slave->seqno++;
    data += 4;

    if (first)
    {
        *data = 0; // OK byte
        data++;
    }

    if (len > 0)
    {
        memcpy(data, buf, len);
    }

    slave->stats.n_bytes += GWBUF_LENGTH(buffer);
    slave->dcb->func.write(slave->dcb, buffer);

    return true;
}
/**
 * Send a single replication event to a slave
 *
 * This sends the complete replication event to a slave. If the event and its
 * leading OK byte do not fit into one MySQL packet, the event is split into
 * several packets: the first one carries the OK byte, the following ones only
 * event data. When the last packet is exactly MYSQL_PACKET_LENGTH_MAX bytes
 * long, an additional empty packet is sent to mark the end of the event.
 *
 * The slave's event counter is incremented whether or not sending succeeded.
 *
 * @param slave Slave where the event is sent to
 * @param hdr   Replication header
 * @param buf   Pointer to the replication event as it was read from the disk
 * @return True on success, false if memory allocation failed
 */
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
{
    bool rval = true;

    /** Check if the event and the OK byte fit into a single packet */
    if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
    {
        rval = blr_send_packet(slave, buf, hdr->event_size, true);
    }
    else
    {
        /** Total size of all the payloads in all the packets; widened before
         * the addition so that the sum cannot wrap around */
        int64_t len = (int64_t)hdr->event_size + 1;
        bool first = true;

        while (rval && len > 0)
        {
            uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
                MIN(MYSQL_PACKET_LENGTH_MAX, len);

            if (blr_send_packet(slave, buf, payload_len, first))
            {
                /** The check for exactly 0x00ffffff bytes needs to be done
                 * here as well. A failure to send the terminating empty
                 * packet is a failure to send the event. */
                if (len == MYSQL_PACKET_LENGTH_MAX)
                {
                    rval = blr_send_packet(slave, buf, 0, false);
                }

                /** Add the extra byte written by blr_send_packet */
                len -= first ? payload_len + 1 : payload_len;
                buf += payload_len;
                first = false;
            }
            else
            {
                rval = false;
            }
        }
    }

    slave->stats.n_events++;

    if (!rval)
    {
        MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
                  hdr->event_size, slave->dcb->remote,
                  ntohs(slave->dcb->ipv4.sin_port));
    }
    return rval;
}
/**
 * Extract the checksum from the binlogs
 *
 * Appends the readable checksum bytes to router->partial_checksum and counts
 * them in router->partial_checksum_bytes. Because the count is kept in the
 * router instance, a checksum that is split across two packets can be
 * collected with two calls.
 *
 * At most MYSQL_CHECKSUM_LEN bytes are ever stored; any bytes beyond that are
 * ignored so that the checksum buffer cannot be overrun.
 *
 * @param router Router instance
 * @param cksumptr Pointer to the checksum
 * @param len How much of the data is readable
 */
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
{
    unsigned int i;

    for (i = 0; i < len && router->partial_checksum_bytes < MYSQL_CHECKSUM_LEN; i++)
    {
        router->partial_checksum[router->partial_checksum_bytes] = cksumptr[i];
        router->partial_checksum_bytes++;
    }
}

View File

@ -152,6 +152,7 @@ static int blr_slave_handle_status_variables(ROUTER_INSTANCE *router, ROUTER_SLA
static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
static void blr_send_slave_heartbeat(void *inst);
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
void poll_fake_write_event(DCB *dcb);
@ -1723,6 +1724,47 @@ uint32_t chksum;
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
slave->binlogfile[binlognamelen] = 0;
if (router->trx_safe)
{
/**
* Check for a pending transaction and possible unsafe position.
* Force slave disconnection if requested position is unsafe.
*/
bool force_disconnect = false;
spinlock_acquire(&router->binlog_lock);
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
(slave->binlog_pos > router->binlog_position))
{
force_disconnect = true;
}
spinlock_release(&router->binlog_lock);
if (force_disconnect)
{
MXS_ERROR("%s: Slave %s:%i, server-id %d, binlog '%s', blr_slave_binlog_dump failure: "
"Requested binlog position %lu. Position is unsafe so disconnecting. "
"Latest safe position %lu, end of binlog file %lu",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile,
(unsigned long)slave->binlog_pos,
router->binlog_position,
router->current_pos);
/*
* Close the slave session and socket
* The slave will try to reconnect
*/
dcb_close(slave->dcb);
return 1;
}
}
MXS_DEBUG("%s: COM_BINLOG_DUMP: binlog name '%s', length %d, "
"from position %lu.", router->service->name,
slave->binlogfile, binlognamelen,
@ -1905,15 +1947,32 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
burst = router->long_burst;
else
burst = router->short_burst;
burst_size = router->burst_size;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_BUSY)
{
spinlock_release(&slave->catch_lock);
return 0;
}
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
int do_return;
spinlock_acquire(&router->binlog_lock);
do_return = 0;
/* check for a pending transaction and safe position */
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
(slave->binlog_pos > router->binlog_position)) {
do_return = 1;
}
spinlock_release(&router->binlog_lock);
if (do_return) {
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return 0;
}
BLFILE *file;
#ifdef BLFILE_IN_SLAVE
@ -1957,25 +2016,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
return 0;
}
}
slave->stats.n_bursts++;
#ifdef BLSLAVE_IN_FILE
slave->file = file;
#endif
int events_before = slave->stats.n_events;
while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
{
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
slave->lastEventTimestamp = hdr.timestamp;
slave->lastEventReceived = hdr.event_type;
if (hdr.event_type == ROTATE_EVENT)
{
unsigned long beat1 = hkheartbeat;
@ -2026,15 +2077,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
MXS_ERROR("blr_open_binlog took %lu beats",
hkheartbeat - beat1);
}
slave->stats.n_bytes += gwbuf_length(head);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst_size -= hdr.event_size;
if (blr_send_event(slave, &hdr, (uint8_t*) record->start))
{
if (hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
slave->stats.n_events++;
burst_size -= hdr.event_size;
}
gwbuf_free(record);
record = NULL;
/* set lastReply for slave heartbeat check */
if (router->send_slave_heartbeat)
@ -2103,24 +2157,16 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
MXS_ERROR("%s: Slave %s:%i, server-id %d, binlog '%s', %s",
MXS_NOTICE("%s: Slave %s:%i, server-id %d, binlog '%s', read %d events, "
"current committed transaction event being sent: %lu, %s",
router->service->name,
slave->dcb->remote,
ntohs((slave->dcb->ipv4).sin_port),
slave->serverid,
slave->binlogfile,
slave->stats.n_events - events_before,
router->current_safe_event,
read_errmsg);
/*
* Close the slave session and socket
* The slave will try to reconnect
*/
dcb_close(slave->dcb);
#ifndef BLFILE_IN_SLAVE
blr_close_binlog(router, file);
#endif
return 0;
}
}
spinlock_acquire(&slave->catch_lock);
@ -2308,33 +2354,15 @@ unsigned int cstate;
{
if (slave->state == BLRS_DUMPING)
{
int do_return;
spinlock_acquire(&router->binlog_lock);
do_return = 0;
cstate = slave->cstate;
/* check for a pending transaction and not rotating */
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
(slave->binlog_pos > router->binlog_position) && !router->rotating) {
do_return = 1;
}
spinlock_release(&router->binlog_lock);
if (do_return) {
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return 0;
}
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_BUSY)
{
spinlock_release(&slave->catch_lock);
return 0;
}
cstate = slave->cstate;
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if ((cstate & CS_UPTODATE) == CS_UPTODATE)
@ -4641,4 +4669,3 @@ int filename_len = strlen(slave->binlogfile);
/* Write the packet */
return slave->dcb->func.write(slave->dcb, resp);
}