Merge branch 'develop' into MXS-544
This commit is contained in:
@ -4,7 +4,9 @@ target_link_libraries(binlogrouter maxscale-common)
|
||||
install(TARGETS binlogrouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_executable(maxbinlogcheck maxbinlogcheck.c blr_file.c blr_cache.c blr_master.c blr_slave.c blr.c)
|
||||
target_link_libraries(maxbinlogcheck maxscale-common)
|
||||
# maxbinlogcheck refers to my_uuid_init and my_uuin. They are non-public functions and
|
||||
# should not be used. They are found only from the embedded lib.
|
||||
target_link_libraries(maxbinlogcheck maxscale-common ${MYSQL_EMBEDDED_LIBRARIES})
|
||||
|
||||
install(TARGETS maxbinlogcheck DESTINATION bin)
|
||||
|
||||
|
@ -507,6 +507,7 @@ char task_name[BLRM_TASK_NAME_LEN+1] = "";
|
||||
inst->binlog_position = 0;
|
||||
inst->current_pos = 0;
|
||||
inst->current_safe_event = 0;
|
||||
inst->master_event_state = BLR_EVENT_DONE;
|
||||
|
||||
strcpy(inst->binlog_name, "");
|
||||
strcpy(inst->prevbinlog, "");
|
||||
|
@ -189,59 +189,78 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
|
||||
* binlog files need an initial 4 magic bytes at the start. blr_file_add_magic()
|
||||
* adds them.
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param fd file descriptor to the open binlog file
|
||||
* @return Nothing
|
||||
* @param fd file descriptor to the open binlog file
|
||||
* @return True if the magic string could be written to the file.
|
||||
*/
|
||||
static void
|
||||
blr_file_add_magic(ROUTER_INSTANCE *router, int fd)
|
||||
static bool
|
||||
blr_file_add_magic(int fd)
|
||||
{
|
||||
unsigned char magic[] = BINLOG_MAGIC;
|
||||
static const unsigned char magic[] = BINLOG_MAGIC;
|
||||
|
||||
write(fd, magic, 4);
|
||||
router->current_pos = 4; /* Initial position after the magic number */
|
||||
router->binlog_position = 4; /* Initial position after the magic number */
|
||||
router->current_safe_event = 4;
|
||||
router->last_written = 0;
|
||||
ssize_t written = write(fd, magic, BINLOG_MAGIC_SIZE);
|
||||
|
||||
return written == BINLOG_MAGIC_SIZE;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new binlog file for the router to use.
|
||||
*
|
||||
* @param router The router instance
|
||||
* @param file The binlog file name
|
||||
* @return Non-zero if the fie creation succeeded
|
||||
* @param router The router instance
|
||||
* @param file The binlog file name
|
||||
* @return Non-zero if the fie creation succeeded
|
||||
*/
|
||||
static int
|
||||
blr_file_create(ROUTER_INSTANCE *router, char *file)
|
||||
{
|
||||
char path[PATH_MAX + 1] = "";
|
||||
int fd;
|
||||
int created = 0;
|
||||
char err_msg[STRERROR_BUFLEN];
|
||||
|
||||
strcpy(path, router->binlogdir);
|
||||
strcat(path, "/");
|
||||
strcat(path, file);
|
||||
char path[PATH_MAX + 1] = "";
|
||||
|
||||
if ((fd = open(path, O_RDWR|O_CREAT, 0666)) != -1)
|
||||
{
|
||||
blr_file_add_magic(router,fd);
|
||||
}
|
||||
else
|
||||
{
|
||||
char err_msg[STRERROR_BUFLEN];
|
||||
strcpy(path, router->binlogdir);
|
||||
strcat(path, "/");
|
||||
strcat(path, file);
|
||||
|
||||
MXS_ERROR("%s: Failed to create binlog file %s, %s.",
|
||||
int fd = open(path, O_RDWR|O_CREAT, 0666);
|
||||
|
||||
if (fd != -1)
|
||||
{
|
||||
if (blr_file_add_magic(fd))
|
||||
{
|
||||
close(router->binlog_fd);
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strncpy(router->binlog_name, file, BINLOG_FNAMELEN);
|
||||
router->binlog_fd = fd;
|
||||
router->current_pos = BINLOG_MAGIC_SIZE; /* Initial position after the magic number */
|
||||
router->binlog_position = BINLOG_MAGIC_SIZE;
|
||||
router->current_safe_event = BINLOG_MAGIC_SIZE;
|
||||
router->last_written = BINLOG_MAGIC_SIZE;
|
||||
router->last_event_pos = 0;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
created = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("%s: Failed to write magic string to created binlog file %s, %s.",
|
||||
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
close(fd);
|
||||
|
||||
if (!unlink(path))
|
||||
{
|
||||
MXS_ERROR("%s: Failed to delete file %s, %s.",
|
||||
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
return 0;
|
||||
}
|
||||
fsync(fd);
|
||||
close(router->binlog_fd);
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
strncpy(router->binlog_name, file, BINLOG_FNAMELEN);
|
||||
router->binlog_fd = fd;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("%s: Failed to create binlog file %s, %s.",
|
||||
router->service->name, path, strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
}
|
||||
|
||||
return created;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -273,7 +292,16 @@ int fd;
|
||||
router->current_pos = lseek(fd, 0L, SEEK_END);
|
||||
if (router->current_pos < 4) {
|
||||
if (router->current_pos == 0) {
|
||||
blr_file_add_magic(router, fd);
|
||||
if (blr_file_add_magic(fd))
|
||||
{
|
||||
router->current_pos = BINLOG_MAGIC_SIZE;
|
||||
router->binlog_position = BINLOG_MAGIC_SIZE;
|
||||
router->current_safe_event = BINLOG_MAGIC_SIZE;
|
||||
router->last_written = BINLOG_MAGIC_SIZE;
|
||||
router->last_event_pos = 0;
|
||||
} else {
|
||||
MXS_ERROR("%s: Could not write magic to binlog file.", router->service->name);
|
||||
}
|
||||
} else {
|
||||
/* If for any reason the file's length is between 1 and 3 bytes
|
||||
* then report an error. */
|
||||
@ -297,26 +325,33 @@ int fd;
|
||||
* @return Return the number of bytes written
|
||||
*/
|
||||
int
|
||||
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
|
||||
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint32_t size, uint8_t *buf)
|
||||
{
|
||||
int n;
|
||||
|
||||
if ((n = pwrite(router->binlog_fd, buf, hdr->event_size,
|
||||
hdr->next_pos - hdr->event_size)) != hdr->event_size)
|
||||
if ((n = pwrite(router->binlog_fd, buf, size,
|
||||
router->last_written)) != size)
|
||||
{
|
||||
char err_msg[STRERROR_BUFLEN];
|
||||
MXS_ERROR("%s: Failed to write binlog record at %d of %s, %s. "
|
||||
MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
|
||||
"Truncating to previous record.",
|
||||
router->service->name, hdr->next_pos - hdr->event_size,
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
/* Remove any partual event that was written */
|
||||
ftruncate(router->binlog_fd, hdr->next_pos - hdr->event_size);
|
||||
/* Remove any partial event that was written */
|
||||
if (ftruncate(router->binlog_fd, router->last_written))
|
||||
{
|
||||
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
router->current_pos = hdr->next_pos;
|
||||
router->last_written = hdr->next_pos - hdr->event_size;
|
||||
router->last_written += size;
|
||||
router->last_event_pos = hdr->next_pos - hdr->event_size;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
return n;
|
||||
}
|
||||
@ -462,10 +497,9 @@ struct stat statb;
|
||||
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
|
||||
pos >= router->binlog_position)
|
||||
{
|
||||
if (pos > router->binlog_position && !router->rotating)
|
||||
if (pos > router->binlog_position)
|
||||
{
|
||||
/* Unsafe position, slave will be disconnected by the calling routine */
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu. Position is unsafe so disconnecting. "
|
||||
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Requested binlog position %lu is unsafe. "
|
||||
"Latest safe position %lu, end of binlog file %lu",
|
||||
pos, router->binlog_position, router->current_pos);
|
||||
|
||||
|
@ -99,6 +99,10 @@ extern char * blr_last_event_description(ROUTER_INSTANCE *router);
|
||||
static void blr_log_identity(ROUTER_INSTANCE *router);
|
||||
static void blr_distribute_error_message(ROUTER_INSTANCE *router, char *message, char *state, unsigned int err_code);
|
||||
|
||||
int blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf);
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len);
|
||||
|
||||
static int keepalive = 1;
|
||||
|
||||
/**
|
||||
@ -289,6 +293,7 @@ blr_master_close(ROUTER_INSTANCE *router)
|
||||
{
|
||||
dcb_close(router->master);
|
||||
router->master_state = BLRM_UNCONNECTED;
|
||||
router->master_event_state = BLR_EVENT_DONE;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -915,7 +920,7 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
* copy if the message straddles GWBUF's.
|
||||
*/
|
||||
|
||||
if (len < BINLOG_EVENT_HDR_LEN)
|
||||
if (len < BINLOG_EVENT_HDR_LEN && router->master_event_state != BLR_EVENT_ONGOING)
|
||||
{
|
||||
char *msg = "";
|
||||
|
||||
@ -932,90 +937,193 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
router->stats.n_binlogs++;
|
||||
router->stats.n_binlogs_ses++;
|
||||
|
||||
blr_extract_header(ptr, &hdr);
|
||||
|
||||
/* Sanity check */
|
||||
if (hdr.ok == 0 && hdr.event_size != len - 5)
|
||||
if (router->master_event_state == BLR_EVENT_DONE)
|
||||
{
|
||||
MXS_ERROR("Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %lu "
|
||||
"reslen is %d and preslen is %d, "
|
||||
"length of previous event %d. %s",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->current_pos,
|
||||
reslen, preslen, prev_length,
|
||||
(prev_length == -1 ?
|
||||
(no_residual ? "No residual data from previous call" :
|
||||
"Residual data from previous call") : ""));
|
||||
router->stats.n_binlogs++;
|
||||
router->stats.n_binlogs_ses++;
|
||||
|
||||
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
|
||||
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
|
||||
"the previous events was contained in %d GWBUFs",
|
||||
router->lastEventReceived, n_bufs, pn_bufs);
|
||||
if (msg)
|
||||
blr_extract_header(ptr, &hdr);
|
||||
|
||||
/* Sanity check */
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
int event_limit;
|
||||
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
/* set mysql errno to 0 */
|
||||
router->m_errno = 0;
|
||||
|
||||
/* Remove error message */
|
||||
if (router->m_errmsg)
|
||||
free(router->m_errmsg);
|
||||
router->m_errmsg = NULL;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", hdr.event_type, hdr.flags, hdr.event_size, hdr.timestamp);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
*/
|
||||
if (router->master_chksum)
|
||||
{
|
||||
uint32_t chksum, pktsum;
|
||||
|
||||
chksum = crc32(0L, NULL, 0);
|
||||
chksum = crc32(chksum, ptr + 5, hdr.event_size - 4);
|
||||
pktsum = EXTRACT32(ptr + hdr.event_size + 1);
|
||||
if (pktsum != chksum)
|
||||
if (hdr.event_size != len - 5 && (hdr.event_size + 1) < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
router->stats.n_badcrc++;
|
||||
MXS_ERROR("Packet length is %d, but event size is %d, "
|
||||
"binlog file %s position %lu "
|
||||
"reslen is %d and preslen is %d, "
|
||||
"length of previous event %d. %s",
|
||||
len, hdr.event_size,
|
||||
router->binlog_name,
|
||||
router->current_pos,
|
||||
reslen, preslen, prev_length,
|
||||
(prev_length == -1 ?
|
||||
(no_residual ? "No residual data from previous call" :
|
||||
"Residual data from previous call") : ""));
|
||||
|
||||
blr_log_packet(LOG_ERR, "Packet:", ptr, len);
|
||||
|
||||
MXS_ERROR("This event (0x%x) was contained in %d GWBUFs, "
|
||||
"the previous events was contained in %d GWBUFs",
|
||||
router->lastEventReceived, n_bufs, pn_bufs);
|
||||
|
||||
if (msg)
|
||||
{
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
}
|
||||
MXS_ERROR("%s: Checksum error in event "
|
||||
"from master, "
|
||||
"binlog %s @ %lu. "
|
||||
"Closing master connection.",
|
||||
router->service->name,
|
||||
router->binlog_name,
|
||||
router->current_pos);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
|
||||
break;
|
||||
}
|
||||
else if ((hdr.event_size + 1) >= MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
router->master_event_state = BLR_EVENT_STARTED;
|
||||
|
||||
/** Store the header for later use */
|
||||
memcpy(&router->stored_header, &hdr, sizeof(hdr));
|
||||
}
|
||||
|
||||
/** Prepare the checksum variables for this event */
|
||||
router->stored_checksum = crc32(0L, NULL, 0);
|
||||
router->checksum_size = hdr.event_size - MYSQL_CHECKSUM_LEN;
|
||||
router->partial_checksum_bytes = 0;
|
||||
}
|
||||
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
spinlock_acquire(&router->lock);
|
||||
|
||||
/* set mysql errno to 0 */
|
||||
router->m_errno = 0;
|
||||
|
||||
/* Remove error message */
|
||||
if (router->m_errmsg)
|
||||
free(router->m_errmsg);
|
||||
router->m_errmsg = NULL;
|
||||
|
||||
spinlock_release(&router->lock);
|
||||
#ifdef SHOW_EVENTS
|
||||
printf("blr: len %lu, event type 0x%02x, flags 0x%04x, event size %d, event timestamp %lu\n", (unsigned long)len-4, hdr.event_type, hdr.flags, hdr.event_size, (unsigned long)hdr.timestamp);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
/* pending large event */
|
||||
if (router->master_event_state != BLR_EVENT_DONE)
|
||||
{
|
||||
if (len - MYSQL_HEADER_LEN < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
/** This is the last packet, we can now proceed to distribute
|
||||
* the event afer it has been written to disk */
|
||||
ss_dassert(router->master_event_state != BLR_EVENT_COMPLETE);
|
||||
router->master_event_state = BLR_EVENT_COMPLETE;
|
||||
memcpy(&hdr, &router->stored_header, sizeof(hdr));
|
||||
}
|
||||
else
|
||||
{
|
||||
/* current partial event is being written to disk file */
|
||||
uint32_t offset = MYSQL_HEADER_LEN;
|
||||
uint32_t extra_bytes = MYSQL_HEADER_LEN;
|
||||
|
||||
/** Don't write the OK byte into the binlog */
|
||||
if (router->master_event_state == BLR_EVENT_STARTED)
|
||||
{
|
||||
offset = MYSQL_HEADER_LEN + 1;
|
||||
router->master_event_state = BLR_EVENT_ONGOING;
|
||||
extra_bytes = MYSQL_HEADER_LEN + 1;
|
||||
}
|
||||
|
||||
if (router->master_chksum)
|
||||
{
|
||||
uint32_t size = MIN(len - extra_bytes, router->checksum_size);
|
||||
router->stored_checksum = crc32(router->stored_checksum,
|
||||
ptr + offset,
|
||||
size);
|
||||
router->checksum_size -= size;
|
||||
|
||||
if (router->checksum_size == 0 && size < len - offset)
|
||||
{
|
||||
extract_checksum(router, ptr + offset + size, len - offset - size);
|
||||
}
|
||||
}
|
||||
|
||||
if (blr_write_data_into_binlog(router, len - offset, ptr + offset) == 0)
|
||||
{
|
||||
/** Failed to write to the binlog file, destroy the buffer
|
||||
* chain and close the connection with the master */
|
||||
while (pkt)
|
||||
{
|
||||
pkt = GWBUF_CONSUME_ALL(pkt);
|
||||
}
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
pkt = gwbuf_consume(pkt, len);
|
||||
pkt_length -= len;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* First check that the checksum we calculate matches the
|
||||
* checksum in the packet we received.
|
||||
*/
|
||||
if (router->master_chksum)
|
||||
{
|
||||
uint32_t pktsum, offset = MYSQL_HEADER_LEN;
|
||||
uint32_t size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN;
|
||||
|
||||
if (router->master_event_state == BLR_EVENT_DONE)
|
||||
{
|
||||
/** Set the pointer offset to the first byte after
|
||||
* the header and OK byte */
|
||||
offset = MYSQL_HEADER_LEN + 1;
|
||||
size = len - MYSQL_HEADER_LEN - MYSQL_CHECKSUM_LEN - 1;
|
||||
}
|
||||
|
||||
size = MIN(size, router->checksum_size);
|
||||
|
||||
if (router->checksum_size > 0)
|
||||
{
|
||||
router->stored_checksum = crc32(router->stored_checksum,
|
||||
ptr + offset,
|
||||
size);
|
||||
router->checksum_size -= size;
|
||||
}
|
||||
|
||||
if(router->checksum_size == 0 && size < len - offset)
|
||||
{
|
||||
extract_checksum(router, ptr + offset + size, len - offset - size);
|
||||
}
|
||||
|
||||
if (router->partial_checksum_bytes == MYSQL_CHECKSUM_LEN)
|
||||
{
|
||||
pktsum = EXTRACT32(&router->partial_checksum);
|
||||
if (pktsum != router->stored_checksum)
|
||||
{
|
||||
router->stats.n_badcrc++;
|
||||
free(msg);
|
||||
msg = NULL;
|
||||
MXS_ERROR("%s: Checksum error in event from master, "
|
||||
"binlog %s @ %lu. Closing master connection.",
|
||||
router->service->name, router->binlog_name,
|
||||
router->current_pos);
|
||||
blr_master_close(router);
|
||||
blr_master_delayed_connect(router);
|
||||
return;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
pkt = gwbuf_consume(pkt, len);
|
||||
pkt_length -= len;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (hdr.ok == 0)
|
||||
{
|
||||
router->lastEventReceived = hdr.event_type;
|
||||
router->lastEventTimestamp = hdr.timestamp;
|
||||
|
||||
@ -1045,7 +1153,7 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
* This marks the transaction starts instead of
|
||||
* QUERY_EVENT with "BEGIN"
|
||||
*/
|
||||
if (router->trx_safe) {
|
||||
if (router->trx_safe && router->master_event_state == BLR_EVENT_DONE) {
|
||||
if (router->mariadb10_compat) {
|
||||
if (hdr.event_type == MARIADB10_GTID_EVENT) {
|
||||
uint64_t n_sequence;
|
||||
@ -1130,7 +1238,9 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
}
|
||||
}
|
||||
|
||||
event_limit = router->mariadb10_compat ? MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
|
||||
/** Gather statistics about the replication event types */
|
||||
int event_limit = router->mariadb10_compat ?
|
||||
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
|
||||
|
||||
if (hdr.event_type <= event_limit)
|
||||
router->stats.events[hdr.event_type]++;
|
||||
@ -1193,13 +1303,30 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
}
|
||||
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
|
||||
{
|
||||
ptr = ptr + 5; // We don't put the first byte of the payload
|
||||
// into the binlog file
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
router->rotating = 1;
|
||||
ptr = ptr + 4; // Skip header
|
||||
uint32_t offset = 4;
|
||||
|
||||
/* current event is being written to disk file */
|
||||
if (blr_write_binlog_record(router, &hdr, ptr) == 0)
|
||||
if (router->master_event_state == BLR_EVENT_STARTED ||
|
||||
router->master_event_state == BLR_EVENT_DONE)
|
||||
{
|
||||
ptr++;
|
||||
offset++;
|
||||
}
|
||||
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
router->rotating = 1;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
}
|
||||
|
||||
/* Current event is being written to disk file.
|
||||
* It is possible for an empty packet to be sent if an
|
||||
* event is exactly 2^24 bytes long. In this case the
|
||||
* empty packet should be discarded. */
|
||||
|
||||
if (len > MYSQL_HEADER_LEN &&
|
||||
blr_write_binlog_record(router, &hdr, len - offset, ptr) == 0)
|
||||
{
|
||||
/*
|
||||
* Failed to write to the
|
||||
@ -1247,8 +1374,28 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
/* Now distribute events */
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
if (router->master_event_state == BLR_EVENT_COMPLETE)
|
||||
{
|
||||
/** Read the complete event from the disk */
|
||||
GWBUF *record = blr_read_events_from_pos(router, router->last_event_pos, &hdr, hdr.next_pos);
|
||||
if (record)
|
||||
{
|
||||
uint8_t *data = GWBUF_DATA(record);
|
||||
blr_distribute_binlog_record(router, &hdr, data);
|
||||
gwbuf_free(record);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Failed to read event at position"
|
||||
"%lu with a size of %u bytes.",
|
||||
router->last_event_pos, hdr.event_size);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Now distribute events */
|
||||
blr_distribute_binlog_record(router, &hdr, ptr);
|
||||
}
|
||||
} else {
|
||||
/**
|
||||
* If transaction is closed:
|
||||
@ -1358,7 +1505,9 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
ptr += 5;
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
router->rotating = 1;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
if (!blr_rotate_event(router, ptr, &hdr))
|
||||
{
|
||||
/*
|
||||
@ -1376,6 +1525,12 @@ int n_bufs = -1, pn_bufs = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** A large event is now fully received and processed */
|
||||
if(router->master_event_state == BLR_EVENT_COMPLETE)
|
||||
{
|
||||
router->master_event_state = BLR_EVENT_DONE;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1474,6 +1629,7 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
|
||||
* @param router The instance of the router
|
||||
* @param ptr The packet containing the rotate event
|
||||
* @param hdr The replication message header
|
||||
* @return 1 if the file could be rotated, 0 otherwise.
|
||||
*/
|
||||
static int
|
||||
blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr)
|
||||
@ -1505,17 +1661,20 @@ char file[BINLOG_FNAMELEN+1];
|
||||
|
||||
strcpy(router->prevbinlog, router->binlog_name);
|
||||
|
||||
int rotated = 1;
|
||||
|
||||
if (strncmp(router->binlog_name, file, slen) != 0)
|
||||
{
|
||||
router->stats.n_rotates++;
|
||||
if (blr_file_rotate(router, file, pos) == 0)
|
||||
{
|
||||
router->rotating = 0;
|
||||
return 0;
|
||||
rotated = 0;
|
||||
}
|
||||
}
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
router->rotating = 0;
|
||||
return 1;
|
||||
spinlock_release(&router->binlog_lock);
|
||||
return rotated;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1602,6 +1761,11 @@ unsigned int cstate;
|
||||
/* Slave is in catchup mode */
|
||||
action = 3;
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("slave->cstate does not contain a meaningful state %d", slave->cstate);
|
||||
action = 0;
|
||||
}
|
||||
slave->stats.n_actions[action-1]++;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
|
||||
@ -1621,7 +1785,7 @@ unsigned int cstate;
|
||||
*/
|
||||
slave_action = SLAVE_SEND_EVENT;
|
||||
}
|
||||
else if (slave->binlog_pos == router->last_written &&
|
||||
else if (slave->binlog_pos == router->last_event_pos &&
|
||||
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
|
||||
(hdr->event_type == ROTATE_EVENT &&
|
||||
strcmp(slave->binlogfile, router->prevbinlog))))
|
||||
@ -1680,20 +1844,13 @@ unsigned int cstate;
|
||||
if (router->send_slave_heartbeat)
|
||||
slave->lastReply = time(0);
|
||||
|
||||
pkt = gwbuf_alloc(hdr->event_size + 5);
|
||||
buf = GWBUF_DATA(pkt);
|
||||
encode_value(buf, hdr->event_size + 1, 24);
|
||||
buf += 3;
|
||||
*buf++ = slave->seqno++;
|
||||
*buf++ = 0; // OK
|
||||
memcpy(buf, ptr, hdr->event_size);
|
||||
if (hdr->event_type == ROTATE_EVENT)
|
||||
{
|
||||
blr_slave_rotate(router, slave, ptr);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(pkt);
|
||||
slave->stats.n_events++;
|
||||
slave->dcb->func.write(slave->dcb, pkt);
|
||||
|
||||
blr_send_event(slave, hdr, ptr);
|
||||
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (hdr->event_type != ROTATE_EVENT)
|
||||
{
|
||||
@ -2215,3 +2372,163 @@ ROUTER_SLAVE *slave;
|
||||
spinlock_release(&router->lock);
|
||||
}
|
||||
|
||||
int
|
||||
blr_write_data_into_binlog(ROUTER_INSTANCE *router, uint32_t data_len, uint8_t *buf)
|
||||
{
|
||||
int n;
|
||||
|
||||
if ((n = pwrite(router->binlog_fd, buf, data_len,
|
||||
router->last_written)) != data_len)
|
||||
{
|
||||
char err_msg[STRERROR_BUFLEN];
|
||||
MXS_ERROR("%s: Failed to write binlog record at %lu of %s, %s. "
|
||||
"Truncating to previous record.",
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
|
||||
/* Remove any partial event that was written */
|
||||
if (ftruncate(router->binlog_fd, router->last_written))
|
||||
{
|
||||
MXS_ERROR("%s: Failed to truncate binlog record at %lu of %s, %s. ",
|
||||
router->service->name, router->last_written,
|
||||
router->binlog_name,
|
||||
strerror_r(errno, err_msg, sizeof(err_msg)));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
router->last_written += data_len;
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a replication event packet to a slave
|
||||
*
|
||||
* The first replication event packet contains one byte set to either
|
||||
* 0x0, 0xfe or 0xff which signals what the state of the replication stream is.
|
||||
* If the data pointed by @c buf is not the start of the replication header
|
||||
* and part of the replication event is already sent, @c first must be set to
|
||||
* false so that the first status byte is not sent again.
|
||||
*
|
||||
* @param slave Slave where the packet is sent to
|
||||
* @param buf Buffer containing the data
|
||||
* @param len Length of the data
|
||||
* @param first If this is the first packet of a multi-packet event
|
||||
* @return True on success, false when memory allocation fails
|
||||
*/
|
||||
bool blr_send_packet(ROUTER_SLAVE *slave, uint8_t *buf, uint32_t len, bool first)
|
||||
{
|
||||
bool rval = true;
|
||||
unsigned int datalen = len + (first ? 1 : 0);
|
||||
GWBUF *buffer = gwbuf_alloc(datalen + MYSQL_HEADER_LEN);
|
||||
if (buffer)
|
||||
{
|
||||
uint8_t *data = GWBUF_DATA(buffer);
|
||||
encode_value(data, datalen, 24);
|
||||
data += 3;
|
||||
*data++ = slave->seqno++;
|
||||
|
||||
if (first)
|
||||
{
|
||||
*data++ = 0; // OK byte
|
||||
}
|
||||
|
||||
if (len > 0)
|
||||
{
|
||||
memcpy(data, buf, len);
|
||||
}
|
||||
|
||||
slave->stats.n_bytes += GWBUF_LENGTH(buffer);
|
||||
slave->dcb->func.write(slave->dcb, buffer);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("failed to allocate %ld bytes of memory when writing an"
|
||||
" event.", datalen + MYSQL_HEADER_LEN);
|
||||
rval = false;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a single replication event to a slave
|
||||
*
|
||||
* This sends the complete replication event to a slave. If the event size exceeds
|
||||
* the maximum size of a MySQL packet, it will be sent in multiple packets.
|
||||
*
|
||||
* @param slave Slave where the event is sent to
|
||||
* @param hdr Replication header
|
||||
* @param buf Pointer to the replication event as it was read from the disk
|
||||
* @return True on success, false if memory allocation failed
|
||||
*/
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf)
|
||||
{
|
||||
bool rval = true;
|
||||
|
||||
/** Check if the event and the OK byte fit into a single packet */
|
||||
if (hdr->event_size + 1 < MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
rval = blr_send_packet(slave, buf, hdr->event_size, true);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Total size of all the payloads in all the packets */
|
||||
int64_t len = hdr->event_size + 1;
|
||||
bool first = true;
|
||||
|
||||
while (rval && len > 0)
|
||||
{
|
||||
uint64_t payload_len = first ? MYSQL_PACKET_LENGTH_MAX - 1 :
|
||||
MIN(MYSQL_PACKET_LENGTH_MAX, len);
|
||||
|
||||
if (blr_send_packet(slave, buf, payload_len, first))
|
||||
{
|
||||
/** The check for exactly 0x00ffffff bytes needs to be done
|
||||
* here as well */
|
||||
if (len == MYSQL_PACKET_LENGTH_MAX)
|
||||
{
|
||||
blr_send_packet(slave, buf, 0, false);
|
||||
}
|
||||
|
||||
/** Add the extra byte written by blr_send_packet */
|
||||
len -= first ? payload_len + 1 : payload_len;
|
||||
buf += payload_len;
|
||||
first = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
slave->stats.n_events++;
|
||||
|
||||
if (!rval)
|
||||
{
|
||||
MXS_ERROR("Failed to send an event of %u bytes to slave at %s:%d.",
|
||||
hdr->event_size, slave->dcb->remote,
|
||||
ntohs(slave->dcb->ipv4.sin_port));
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the checksum from the binlogs
|
||||
*
|
||||
* This updates the internal state of the router and will allow us to detect
|
||||
* if the checksum is split across two packets.
|
||||
* @param router Router instance
|
||||
* @param cksumptr Pointer to the checksum
|
||||
* @param len How much of the data is readable
|
||||
*/
|
||||
void extract_checksum(ROUTER_INSTANCE* router, uint8_t *cksumptr, uint8_t len)
|
||||
{
|
||||
uint8_t *ptr = cksumptr;
|
||||
while (ptr - cksumptr < len)
|
||||
{
|
||||
router->partial_checksum[router->partial_checksum_bytes] = *ptr;
|
||||
ptr++;
|
||||
router->partial_checksum_bytes++;
|
||||
}
|
||||
}
|
||||
|
@ -152,6 +152,7 @@ static int blr_slave_handle_status_variables(ROUTER_INSTANCE *router, ROUTER_SLA
|
||||
static int blr_slave_send_columndef_with_status_schema(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, char *name, int type, int len, uint8_t seqno);
|
||||
static void blr_send_slave_heartbeat(void *inst);
|
||||
static int blr_slave_send_heartbeat(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
|
||||
bool blr_send_event(ROUTER_SLAVE *slave, REP_HEADER *hdr, uint8_t *buf);
|
||||
|
||||
void poll_fake_write_event(DCB *dcb);
|
||||
|
||||
@ -1723,6 +1724,47 @@ uint32_t chksum;
|
||||
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
|
||||
slave->binlogfile[binlognamelen] = 0;
|
||||
|
||||
if (router->trx_safe)
|
||||
{
|
||||
/**
|
||||
* Check for a pending transaction and possible unsafe position.
|
||||
* Force slave disconnection if requested position is unsafe.
|
||||
*/
|
||||
|
||||
bool force_disconnect = false;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position))
|
||||
{
|
||||
force_disconnect = true;
|
||||
}
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
if (force_disconnect)
|
||||
{
|
||||
MXS_ERROR("%s: Slave %s:%i, server-id %d, binlog '%s', blr_slave_binlog_dump failure: "
|
||||
"Requested binlog position %lu. Position is unsafe so disconnecting. "
|
||||
"Latest safe position %lu, end of binlog file %lu",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
(unsigned long)slave->binlog_pos,
|
||||
router->binlog_position,
|
||||
router->current_pos);
|
||||
|
||||
/*
|
||||
* Close the slave session and socket
|
||||
* The slave will try to reconnect
|
||||
*/
|
||||
dcb_close(slave->dcb);
|
||||
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
MXS_DEBUG("%s: COM_BINLOG_DUMP: binlog name '%s', length %d, "
|
||||
"from position %lu.", router->service->name,
|
||||
slave->binlogfile, binlognamelen,
|
||||
@ -1905,15 +1947,32 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
burst = router->long_burst;
|
||||
else
|
||||
burst = router->short_burst;
|
||||
|
||||
burst_size = router->burst_size;
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (slave->cstate & CS_BUSY)
|
||||
{
|
||||
spinlock_release(&slave->catch_lock);
|
||||
return 0;
|
||||
}
|
||||
slave->cstate |= CS_BUSY;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
|
||||
int do_return;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
do_return = 0;
|
||||
|
||||
/* check for a pending transaction and safe position */
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position)) {
|
||||
do_return = 1;
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
if (do_return) {
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate &= ~CS_BUSY;
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
BLFILE *file;
|
||||
#ifdef BLFILE_IN_SLAVE
|
||||
@ -1957,25 +2016,17 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
slave->stats.n_bursts++;
|
||||
|
||||
#ifdef BLSLAVE_IN_FILE
|
||||
slave->file = file;
|
||||
#endif
|
||||
int events_before = slave->stats.n_events;
|
||||
|
||||
while (burst-- && burst_size > 0 &&
|
||||
(record = blr_read_binlog(router, file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||
{
|
||||
head = gwbuf_alloc(5);
|
||||
ptr = GWBUF_DATA(head);
|
||||
encode_value(ptr, hdr.event_size + 1, 24);
|
||||
ptr += 3;
|
||||
*ptr++ = slave->seqno++;
|
||||
*ptr++ = 0; // OK
|
||||
head = gwbuf_append(head, record);
|
||||
slave->lastEventTimestamp = hdr.timestamp;
|
||||
slave->lastEventReceived = hdr.event_type;
|
||||
|
||||
if (hdr.event_type == ROTATE_EVENT)
|
||||
{
|
||||
unsigned long beat1 = hkheartbeat;
|
||||
@ -2026,15 +2077,18 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
MXS_ERROR("blr_open_binlog took %lu beats",
|
||||
hkheartbeat - beat1);
|
||||
}
|
||||
slave->stats.n_bytes += gwbuf_length(head);
|
||||
written = slave->dcb->func.write(slave->dcb, head);
|
||||
if (written && hdr.event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr.next_pos;
|
||||
}
|
||||
rval = written;
|
||||
slave->stats.n_events++;
|
||||
burst_size -= hdr.event_size;
|
||||
|
||||
if (blr_send_event(slave, &hdr, (uint8_t*) record->start))
|
||||
{
|
||||
if (hdr.event_type != ROTATE_EVENT)
|
||||
{
|
||||
slave->binlog_pos = hdr.next_pos;
|
||||
}
|
||||
slave->stats.n_events++;
|
||||
burst_size -= hdr.event_size;
|
||||
}
|
||||
gwbuf_free(record);
|
||||
record = NULL;
|
||||
|
||||
/* set lastReply for slave heartbeat check */
|
||||
if (router->send_slave_heartbeat)
|
||||
@ -2103,24 +2157,16 @@ char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||
|
||||
if (hdr.ok == SLAVE_POS_READ_UNSAFE) {
|
||||
|
||||
MXS_ERROR("%s: Slave %s:%i, server-id %d, binlog '%s', %s",
|
||||
MXS_NOTICE("%s: Slave %s:%i, server-id %d, binlog '%s', read %d events, "
|
||||
"current committed transaction event being sent: %lu, %s",
|
||||
router->service->name,
|
||||
slave->dcb->remote,
|
||||
ntohs((slave->dcb->ipv4).sin_port),
|
||||
slave->serverid,
|
||||
slave->binlogfile,
|
||||
slave->stats.n_events - events_before,
|
||||
router->current_safe_event,
|
||||
read_errmsg);
|
||||
|
||||
/*
|
||||
* Close the slave session and socket
|
||||
* The slave will try to reconnect
|
||||
*/
|
||||
dcb_close(slave->dcb);
|
||||
|
||||
#ifndef BLFILE_IN_SLAVE
|
||||
blr_close_binlog(router, file);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
@ -2308,33 +2354,15 @@ unsigned int cstate;
|
||||
{
|
||||
if (slave->state == BLRS_DUMPING)
|
||||
{
|
||||
int do_return;
|
||||
|
||||
spinlock_acquire(&router->binlog_lock);
|
||||
|
||||
do_return = 0;
|
||||
cstate = slave->cstate;
|
||||
|
||||
/* check for a pending transaction and not rotating */
|
||||
if (router->pending_transaction && strcmp(router->binlog_name, slave->binlogfile) == 0 &&
|
||||
(slave->binlog_pos > router->binlog_position) && !router->rotating) {
|
||||
do_return = 1;
|
||||
}
|
||||
|
||||
spinlock_release(&router->binlog_lock);
|
||||
|
||||
if (do_return) {
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
slave->cstate |= CS_EXPECTCB;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
poll_fake_write_event(slave->dcb);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
spinlock_acquire(&slave->catch_lock);
|
||||
if (slave->cstate & CS_BUSY)
|
||||
{
|
||||
spinlock_release(&slave->catch_lock);
|
||||
return 0;
|
||||
}
|
||||
cstate = slave->cstate;
|
||||
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
|
||||
slave->cstate |= CS_BUSY;
|
||||
spinlock_release(&slave->catch_lock);
|
||||
|
||||
if ((cstate & CS_UPTODATE) == CS_UPTODATE)
|
||||
@ -4575,13 +4603,14 @@ GWBUF *resp;
|
||||
uint8_t *ptr;
|
||||
int len = BINLOG_EVENT_HDR_LEN;
|
||||
uint32_t chksum;
|
||||
int filename_len = strlen(slave->binlogfile);
|
||||
|
||||
/* Add CRC32 4 bytes */
|
||||
if (!slave->nocrc)
|
||||
len +=4;
|
||||
|
||||
/* add binlogname to data content len */
|
||||
len += strlen(slave->binlogfile);
|
||||
len += filename_len;
|
||||
|
||||
/**
|
||||
* Alloc buffer for network binlog stream:
|
||||
@ -4625,9 +4654,9 @@ uint32_t chksum;
|
||||
ptr = blr_build_header(resp, &hdr);
|
||||
|
||||
/* Copy binlog name */
|
||||
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
|
||||
memcpy(ptr, slave->binlogfile, filename_len);
|
||||
|
||||
ptr += strlen(slave->binlogfile);
|
||||
ptr += filename_len;
|
||||
|
||||
/* Add the CRC32 */
|
||||
if (!slave->nocrc)
|
||||
@ -4640,4 +4669,3 @@ uint32_t chksum;
|
||||
/* Write the packet */
|
||||
return slave->dcb->func.write(slave->dcb, resp);
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
if(BUILD_TESTS)
|
||||
add_executable(testbinlogrouter testbinlog.c ../blr.c ../blr_slave.c ../blr_master.c ../blr_file.c ../blr_cache.c)
|
||||
target_link_libraries(testbinlogrouter maxscale-common)
|
||||
# testbinlogrouter refers to my_uuid_init and my_uuin. They are non-public functions and
|
||||
# should not be used. They are found only from the embedded lib.
|
||||
target_link_libraries(testbinlogrouter maxscale-common ${MYSQL_EMBEDDED_LIBRARIES})
|
||||
add_test(TestBinlogRouter ${CMAKE_CURRENT_BINARY_DIR}/testbinlogrouter)
|
||||
endif()
|
||||
|
@ -1,5 +1,5 @@
|
||||
add_library(readwritesplit SHARED readwritesplit.c)
|
||||
target_link_libraries(readwritesplit maxscale-common query_classifier)
|
||||
target_link_libraries(readwritesplit maxscale-common)
|
||||
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
|
||||
install(TARGETS readwritesplit DESTINATION ${MAXSCALE_LIBDIR})
|
||||
if(BUILD_TESTS)
|
||||
|
@ -1162,6 +1162,8 @@ static bool get_dcb(
|
||||
for (i=0; i<rses->rses_nbackends; i++)
|
||||
{
|
||||
BACKEND* b = backend_ref[i].bref_backend;
|
||||
SERVER server;
|
||||
server.status = backend_ref[i].bref_backend->backend_server->status;
|
||||
/**
|
||||
* To become chosen:
|
||||
* backend must be in use, name must match,
|
||||
@ -1175,9 +1177,9 @@ static bool get_dcb(
|
||||
b->backend_server->unique_name,
|
||||
PATH_MAX) == 0) &&
|
||||
master_bref->bref_backend != NULL &&
|
||||
(SERVER_IS_SLAVE(b->backend_server) ||
|
||||
SERVER_IS_RELAY_SERVER(b->backend_server) ||
|
||||
SERVER_IS_MASTER(b->backend_server)))
|
||||
(SERVER_IS_SLAVE(&server) ||
|
||||
SERVER_IS_RELAY_SERVER(&server) ||
|
||||
SERVER_IS_MASTER(&server)))
|
||||
{
|
||||
*p_dcb = backend_ref[i].bref_dcb;
|
||||
succp = true;
|
||||
@ -1202,13 +1204,16 @@ static bool get_dcb(
|
||||
for (i=0; i<rses->rses_nbackends; i++)
|
||||
{
|
||||
BACKEND* b = (&backend_ref[i])->bref_backend;
|
||||
SERVER server;
|
||||
SERVER candidate;
|
||||
server.status = backend_ref[i].bref_backend->backend_server->status;
|
||||
/**
|
||||
* Unused backend or backend which is not master nor
|
||||
* slave can't be used
|
||||
*/
|
||||
if (!BREF_IS_IN_USE(&backend_ref[i]) ||
|
||||
(!SERVER_IS_MASTER(b->backend_server) &&
|
||||
!SERVER_IS_SLAVE(b->backend_server)))
|
||||
(!SERVER_IS_MASTER(&server) &&
|
||||
!SERVER_IS_SLAVE(&server)))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
@ -1222,11 +1227,12 @@ static bool get_dcb(
|
||||
* Ensure that master has not changed dunring
|
||||
* session and abort if it has.
|
||||
*/
|
||||
if (SERVER_IS_MASTER(b->backend_server) &&
|
||||
if (SERVER_IS_MASTER(&server) &&
|
||||
&backend_ref[i] == master_bref)
|
||||
{
|
||||
/** found master */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->bref_backend->backend_server->status;
|
||||
succp = true;
|
||||
}
|
||||
/**
|
||||
@ -1240,6 +1246,7 @@ static bool get_dcb(
|
||||
{
|
||||
/** found slave */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->bref_backend->backend_server->status;
|
||||
succp = true;
|
||||
}
|
||||
}
|
||||
@ -1247,8 +1254,8 @@ static bool get_dcb(
|
||||
* If candidate is master, any slave which doesn't break
|
||||
* replication lag limits replaces it.
|
||||
*/
|
||||
else if (SERVER_IS_MASTER(candidate_bref->bref_backend->backend_server) &&
|
||||
SERVER_IS_SLAVE(b->backend_server) &&
|
||||
else if (SERVER_IS_MASTER(&candidate) &&
|
||||
SERVER_IS_SLAVE(&server) &&
|
||||
(max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
b->backend_server->rlag <= max_rlag)) &&
|
||||
@ -1256,6 +1263,7 @@ static bool get_dcb(
|
||||
{
|
||||
/** found slave */
|
||||
candidate_bref = &backend_ref[i];
|
||||
candidate.status = candidate_bref->bref_backend->backend_server->status;
|
||||
succp = true;
|
||||
}
|
||||
/**
|
||||
@ -1263,7 +1271,7 @@ static bool get_dcb(
|
||||
* backend and update assign it to new candidate if
|
||||
* necessary.
|
||||
*/
|
||||
else if (SERVER_IS_SLAVE(b->backend_server))
|
||||
else if (SERVER_IS_SLAVE(&server))
|
||||
{
|
||||
if (max_rlag == MAX_RLAG_UNDEFINED ||
|
||||
(b->backend_server->rlag != MAX_RLAG_NOT_AVAILABLE &&
|
||||
@ -1273,6 +1281,7 @@ static bool get_dcb(
|
||||
candidate_bref,
|
||||
&backend_ref[i],
|
||||
rses->rses_config.rw_slave_select_criteria);
|
||||
candidate.status = candidate_bref->bref_backend->backend_server->status;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1292,30 +1301,34 @@ static bool get_dcb(
|
||||
|
||||
goto return_succp;
|
||||
} /*< if (btype == BE_SLAVE) */
|
||||
/**
|
||||
* If target was originally master only then the execution jumps
|
||||
* directly here.
|
||||
*/
|
||||
if (btype == BE_MASTER)
|
||||
/**
|
||||
* If target was originally master only then the execution jumps
|
||||
* directly here.
|
||||
*/
|
||||
if (btype == BE_MASTER)
|
||||
{
|
||||
/** It is possible for the server status to change at any point in time
|
||||
* so copying it locally will make possible error messages
|
||||
* easier to understand */
|
||||
SERVER server;
|
||||
server.status = master_bref->bref_backend->backend_server->status;
|
||||
if (BREF_IS_IN_USE(master_bref) && SERVER_IS_MASTER(&server))
|
||||
{
|
||||
if (BREF_IS_IN_USE(master_bref) &&
|
||||
SERVER_IS_MASTER(master_bref->bref_backend->backend_server))
|
||||
{
|
||||
*p_dcb = master_bref->bref_dcb;
|
||||
succp = true;
|
||||
/** if bref is in use DCB should not be closed */
|
||||
ss_dassert(master_bref->bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server at %s:%d should be master but "
|
||||
"is %s instead and can't be chosen to master.",
|
||||
master_bref->bref_backend->backend_server->name,
|
||||
master_bref->bref_backend->backend_server->port,
|
||||
STRSRVSTATUS(master_bref->bref_backend->backend_server));
|
||||
succp = false;
|
||||
}
|
||||
*p_dcb = master_bref->bref_dcb;
|
||||
succp = true;
|
||||
/** if bref is in use DCB should not be closed */
|
||||
ss_dassert(master_bref->bref_dcb->state != DCB_STATE_ZOMBIE);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Server at %s:%d should be master but "
|
||||
"is %s instead and can't be chosen to master.",
|
||||
master_bref->bref_backend->backend_server->name,
|
||||
master_bref->bref_backend->backend_server->port,
|
||||
STRSRVSTATUS(&server));
|
||||
succp = false;
|
||||
}
|
||||
}
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
@ -1566,8 +1579,6 @@ void check_drop_tmp_table(
|
||||
char** tbl = NULL;
|
||||
char *hkey,*dbname;
|
||||
MYSQL_session* data;
|
||||
|
||||
DCB* master_dcb = NULL;
|
||||
rses_property_t* rses_prop_tmp;
|
||||
|
||||
if(router_cli_ses == NULL || querybuf == NULL)
|
||||
@ -1577,27 +1588,14 @@ void check_drop_tmp_table(
|
||||
return;
|
||||
}
|
||||
|
||||
if(router_cli_ses->rses_master_ref == NULL)
|
||||
if(router_cli_ses->client_dcb == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server reference is NULL.",
|
||||
__FUNCTION__);
|
||||
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
||||
return;
|
||||
}
|
||||
|
||||
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||
|
||||
if(master_dcb == NULL || master_dcb->session == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server DBC is NULL. "
|
||||
"This means that the connection to the master server is already "
|
||||
"closed while a query is still being routed.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
|
||||
CHK_DCB(master_dcb);
|
||||
|
||||
data = (MYSQL_session*)master_dcb->session->client_dcb->data;
|
||||
data = (MYSQL_session*)router_cli_ses->client_dcb->data;
|
||||
|
||||
if(data == NULL)
|
||||
{
|
||||
@ -1668,32 +1666,20 @@ static qc_query_type_t is_read_tmp_table(
|
||||
return type;
|
||||
}
|
||||
|
||||
if(router_cli_ses->rses_master_ref == NULL)
|
||||
if(router_cli_ses->client_dcb == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server reference is NULL.",
|
||||
__FUNCTION__);
|
||||
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
||||
return type;
|
||||
}
|
||||
|
||||
if (BREF_IS_IN_USE(router_cli_ses->rses_master_ref))
|
||||
{
|
||||
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||
|
||||
if(master_dcb == NULL || master_dcb->session == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server DBC is NULL. "
|
||||
"This means that the connection to the master server is already "
|
||||
"closed while a query is still being routed.",__FUNCTION__);
|
||||
return qtype;
|
||||
}
|
||||
CHK_DCB(master_dcb);
|
||||
|
||||
data = (MYSQL_session*)master_dcb->session->client_dcb->data;
|
||||
data = (MYSQL_session*)router_cli_ses->client_dcb->data;
|
||||
|
||||
if(data == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",__FUNCTION__);
|
||||
MXS_ERROR("[%s] Error: User data in client DBC is NULL.",__FUNCTION__);
|
||||
return qtype;
|
||||
}
|
||||
|
||||
@ -1765,7 +1751,6 @@ static void check_create_tmp_table(
|
||||
int klen = 0;
|
||||
char *hkey,*dbname;
|
||||
MYSQL_session* data;
|
||||
DCB* master_dcb = NULL;
|
||||
rses_property_t* rses_prop_tmp;
|
||||
HASHTABLE* h;
|
||||
|
||||
@ -1776,28 +1761,15 @@ static void check_create_tmp_table(
|
||||
return;
|
||||
}
|
||||
|
||||
if(router_cli_ses->rses_master_ref == NULL)
|
||||
if(router_cli_ses->client_dcb == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server reference is NULL.",
|
||||
__FUNCTION__);
|
||||
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
||||
return;
|
||||
}
|
||||
|
||||
router_cli_ses->have_tmp_tables = true;
|
||||
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||
|
||||
if(master_dcb == NULL || master_dcb->session == NULL)
|
||||
{
|
||||
MXS_ERROR("[%s] Error: Master server DCB is NULL. "
|
||||
"This means that the connection to the master server is already "
|
||||
"closed while a query is still being routed.",__FUNCTION__);
|
||||
return;
|
||||
}
|
||||
|
||||
CHK_DCB(master_dcb);
|
||||
|
||||
data = (MYSQL_session*)master_dcb->session->client_dcb->data;
|
||||
data = (MYSQL_session*)router_cli_ses->client_dcb->data;
|
||||
|
||||
if(data == NULL)
|
||||
{
|
||||
@ -2097,10 +2069,10 @@ static bool route_single_stmt(
|
||||
* Read stored master DCB pointer. If master is not set, routing must
|
||||
* be aborted
|
||||
*/
|
||||
if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL)
|
||||
if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL ||
|
||||
BREF_IS_CLOSED(rses->rses_master_ref))
|
||||
{
|
||||
char* query_str = modutil_get_query(querybuf);
|
||||
CHK_DCB(master_dcb);
|
||||
MXS_ERROR("Can't route %s:%s:\"%s\" to "
|
||||
"backend server. Session doesn't have a Master "
|
||||
"node",
|
||||
@ -2112,6 +2084,7 @@ static bool route_single_stmt(
|
||||
goto retblock;
|
||||
}
|
||||
|
||||
CHK_DCB(master_dcb);
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_len = gw_mysql_get_byte3(packet);
|
||||
|
||||
@ -2491,7 +2464,6 @@ static bool route_single_stmt(
|
||||
#if defined(SS_EXTRA_DEBUG)
|
||||
MXS_INFO("Found DCB for slave.");
|
||||
#endif
|
||||
|
||||
atomic_add(&inst->stats.n_slave, 1);
|
||||
}
|
||||
else
|
||||
@ -2847,21 +2819,17 @@ static void clientReply (
|
||||
uint8_t* replybuf = (uint8_t *)GWBUF_DATA(writebuf);
|
||||
size_t len = MYSQL_GET_PACKET_LEN(buf);
|
||||
size_t replylen = MYSQL_GET_PACKET_LEN(replybuf);
|
||||
char* cmdstr = strndup(&((char *)buf)[5], len-4);
|
||||
char* err = strndup(&((char *)replybuf)[8], 5);
|
||||
char* replystr = strndup(&((char *)replybuf)[13],
|
||||
replylen-4-5);
|
||||
|
||||
ss_dassert(len+4 == GWBUF_LENGTH(scur->scmd_cur_cmd->my_sescmd_buf));
|
||||
|
||||
MXS_ERROR("Failed to execute %s in %s:%d. %s %s",
|
||||
cmdstr,
|
||||
|
||||
MXS_ERROR("Failed to execute session command in %s:%d. Error was: %s %s",
|
||||
bref->bref_backend->backend_server->name,
|
||||
bref->bref_backend->backend_server->port,
|
||||
err,
|
||||
replystr);
|
||||
|
||||
free(cmdstr);
|
||||
free(err);
|
||||
free(replystr);
|
||||
}
|
||||
@ -2893,11 +2861,9 @@ static void clientReply (
|
||||
* This applies to session commands only. Counter decrement
|
||||
* for other type of queries is done outside this block.
|
||||
*/
|
||||
if (writebuf != NULL && client_dcb != NULL)
|
||||
{
|
||||
/** Set response status as replied */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
|
||||
/** Set response status as replied */
|
||||
bref_clear_state(bref, BREF_WAITING_RESULT);
|
||||
}
|
||||
/**
|
||||
* Clear BREF_QUERY_ACTIVE flag and decrease waiter counter.
|
||||
@ -3815,8 +3781,8 @@ static GWBUF* sescmd_cursor_process_replies(
|
||||
|
||||
if(bref->reply_cmd != scmd->reply_cmd)
|
||||
{
|
||||
MXS_INFO("Backend server '%s' response differs from master's response. "
|
||||
"Closing connection.",
|
||||
MXS_ERROR("Slave server '%s': response differs from master's response. "
|
||||
"Closing connection due to inconsistent session state.",
|
||||
bref->bref_backend->backend_server->unique_name);
|
||||
sescmd_cursor_set_active(scur,false);
|
||||
bref_clear_state(bref,BREF_QUERY_ACTIVE);
|
||||
|
@ -1,11 +1,11 @@
|
||||
add_library(schemarouter SHARED schemarouter.c sharding_common.c)
|
||||
target_link_libraries(schemarouter maxscale-common query_classifier)
|
||||
target_link_libraries(schemarouter maxscale-common)
|
||||
add_dependencies(schemarouter pcre2)
|
||||
set_target_properties(schemarouter PROPERTIES VERSION "1.0.0")
|
||||
install(TARGETS schemarouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
||||
add_library(shardrouter SHARED shardrouter.c svcconn.c sharding_common.c)
|
||||
target_link_libraries(shardrouter maxscale-common query_classifier)
|
||||
target_link_libraries(shardrouter maxscale-common)
|
||||
add_dependencies(shardrouter pcre2)
|
||||
set_target_properties(shardrouter PROPERTIES VERSION "1.0.0")
|
||||
install(TARGETS shardrouter DESTINATION ${MAXSCALE_LIBDIR})
|
||||
|
@ -1767,7 +1767,7 @@ bool send_database_list(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client)
|
||||
{
|
||||
bool rval = false;
|
||||
spinlock_acquire(&client->shardmap->lock);
|
||||
if (client->shardmap->state == SHMAP_READY)
|
||||
if (client->shardmap->state != SHMAP_UNINIT)
|
||||
{
|
||||
struct string_array strarray;
|
||||
const int size = hashtable_size(client->shardmap->hash);
|
||||
@ -1849,12 +1849,7 @@ static int routeQuery(ROUTER* instance,
|
||||
char errbuf[26+MYSQL_DATABASE_MAXLEN];
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
/** Dirty read for quick check if router is closed. */
|
||||
if (router_cli_ses->rses_closed)
|
||||
{
|
||||
rses_is_closed = true;
|
||||
}
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
|
||||
if (!rses_begin_locked_router_action(router_cli_ses))
|
||||
{
|
||||
@ -4313,7 +4308,7 @@ int process_show_shards(ROUTER_CLIENT_SES* rses)
|
||||
int rval = 0;
|
||||
|
||||
spinlock_acquire(&rses->shardmap->lock);
|
||||
if (rses->shardmap->state == SHMAP_READY)
|
||||
if(rses->shardmap->state != SHMAP_UNINIT)
|
||||
{
|
||||
HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash);
|
||||
struct shard_list sl;
|
||||
@ -4380,7 +4375,7 @@ bool handle_default_db(ROUTER_CLIENT_SES *router_cli_ses)
|
||||
char* target = NULL;
|
||||
|
||||
spinlock_acquire(&router_cli_ses->shardmap->lock);
|
||||
if (router_cli_ses->shardmap->state == SHMAP_READY)
|
||||
if(router_cli_ses->shardmap->state != SHMAP_UNINIT)
|
||||
{
|
||||
target = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db);
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
if(MYSQLCLIENT_FOUND AND BUILD_TESTS)
|
||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test.cmake.in ${CMAKE_CURRENT_BINARY_DIR}/test.cmake @ONLY)
|
||||
add_executable(testschemarouter testschemarouter.c)
|
||||
target_link_libraries(testschemarouter ${MYSQLCLIENT_LIBRARIES} ssl crypto dl z m rt pthread)
|
||||
target_link_libraries(testschemarouter maxscale-common)
|
||||
add_executable(testschemarouter2 testschemarouter2.c)
|
||||
target_link_libraries(testschemarouter2 ${MYSQLCLIENT_LIBRARIES} ssl crypto dl z m rt pthread)
|
||||
target_link_libraries(testschemarouter2 maxscale-common)
|
||||
add_test(NAME TestSchemaRouter COMMAND ${CMAKE_COMMAND} -P ${CMAKE_CURRENT_BINARY_DIR}/test.cmake)
|
||||
|
||||
endif()
|
||||
|
@ -1,6 +1,6 @@
|
||||
if(MYSQLCLIENT_FOUND)
|
||||
add_executable(testconnect testconnect.c)
|
||||
message(STATUS "Linking against: ${MYSQLCLIENT_LIBRARIES}")
|
||||
target_link_libraries(testconnect ${MYSQLCLIENT_LIBRARIES} ssl crypto dl z m rt pthread)
|
||||
message(STATUS "Linking against: ${MARIADB_CONNECTOR_LIB}")
|
||||
target_link_libraries(testconnect maxscale-common)
|
||||
add_test(NAME ReadConnRouterAuthTest COMMAND $<TARGET_FILE:testconnect> 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT} 1.10)
|
||||
endif()
|
||||
endif()
|
||||
|
Reference in New Issue
Block a user