Changed log messages for blr_read_binlog, added slave send error feaure
Changed log messages for blr_read_binlog, added slave send error feaure. When blr_read_binlog detects an error an error message with 1236 code is sent to the slave and it will stop the replication to binlogrouter
This commit is contained in:
@ -547,7 +547,7 @@ extern int blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
|
|||||||
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
extern int blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
|
||||||
extern void blr_file_flush(ROUTER_INSTANCE *);
|
extern void blr_file_flush(ROUTER_INSTANCE *);
|
||||||
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
|
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
|
||||||
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned long, REP_HEADER *);
|
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned long, REP_HEADER *, char *);
|
||||||
extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *);
|
extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *);
|
||||||
extern unsigned long blr_file_size(BLFILE *);
|
extern unsigned long blr_file_size(BLFILE *);
|
||||||
extern int blr_statistics(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
|
extern int blr_statistics(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
|
||||||
|
@ -395,22 +395,27 @@ BLFILE *file;
|
|||||||
* @param file File record
|
* @param file File record
|
||||||
* @param pos Position of binlog record to read
|
* @param pos Position of binlog record to read
|
||||||
* @param hdr Binlog header to populate
|
* @param hdr Binlog header to populate
|
||||||
|
* @param errmsg Allocated BINLOG_ERROR_MSG_LEN bytes message error buffer
|
||||||
* @return The binlog record wrapped in a GWBUF structure
|
* @return The binlog record wrapped in a GWBUF structure
|
||||||
*/
|
*/
|
||||||
GWBUF *
|
GWBUF *
|
||||||
blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned long pos, REP_HEADER *hdr)
|
blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned long pos, REP_HEADER *hdr, char *errmsg)
|
||||||
{
|
{
|
||||||
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
|
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
|
||||||
GWBUF *result;
|
GWBUF *result;
|
||||||
unsigned char *data;
|
unsigned char *data;
|
||||||
int n;
|
int n;
|
||||||
unsigned long filelen = 0;
|
unsigned long filelen = 0;
|
||||||
struct stat statb;
|
struct stat statb;
|
||||||
|
|
||||||
memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN);
|
memset(&hdbuf, '\0', BINLOG_EVENT_HDR_LEN);
|
||||||
|
|
||||||
|
/* set error indicator */
|
||||||
|
hdr->ok = 0xff;
|
||||||
|
|
||||||
if (!file)
|
if (!file)
|
||||||
{
|
{
|
||||||
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid file pointer for requested binlog at position %lu", pos);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (fstat(file->fd, &statb) == 0)
|
if (fstat(file->fd, &statb) == 0)
|
||||||
@ -418,9 +423,7 @@ struct stat statb;
|
|||||||
|
|
||||||
if (pos > filelen)
|
if (pos > filelen)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Attempting to read off the end of the binlog file, size %lu, event at %lu", filelen, pos);
|
||||||
"Attempting to read off the end of the binlog file %s, "
|
|
||||||
"event at %lu.", file->binlogname, pos)));
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -429,9 +432,11 @@ struct stat statb;
|
|||||||
{
|
{
|
||||||
if (pos > router->binlog_position)
|
if (pos > router->binlog_position)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Attempting to read off the binlog file pos %lu, event at %lu",
|
||||||
"Attempting to read off the binlog pos in file %s, "
|
router->binlog_position, pos);
|
||||||
"event at %lu.", file->binlogname, pos)));
|
} else {
|
||||||
|
/* accessing last position is ok */
|
||||||
|
hdr->ok = 0x0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -446,32 +451,33 @@ struct stat statb;
|
|||||||
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
||||||
"Reached end of binlog file at %d.",
|
"Reached end of binlog file at %d.",
|
||||||
pos)));
|
pos)));
|
||||||
|
|
||||||
|
/* set ok indicator */
|
||||||
|
hdr->ok = 0x0;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case -1:
|
case -1:
|
||||||
{
|
{
|
||||||
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
||||||
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
|
||||||
"Failed to read binlog file %s at position %d"
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Failed to read binlog file; (%s), event at %lu",
|
||||||
" (%s).", file->binlogname,
|
err_msg, pos);
|
||||||
pos, err_msg)));
|
|
||||||
if (errno == EBADF)
|
if (errno == EBADF)
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Bad file descriptor for binlog file, refcount %d, descriptor %d, event at %lu",
|
||||||
"Bad file descriptor in read binlog for file %s"
|
file->refcnt, file->fd, pos);
|
||||||
", reference count is %d, descriptor %d.",
|
|
||||||
file->binlogname, file->refcnt, file->fd)));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Bogus data in log event header; "
|
||||||
"Short read when reading the header. "
|
"expected %d bytes but read %d, position %lu",
|
||||||
"Expected 19 bytes but got %d bytes. "
|
BINLOG_EVENT_HDR_LEN, n, pos);
|
||||||
"Binlog file is %s, position %d",
|
|
||||||
n, file->binlogname, pos)));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
hdr->timestamp = EXTRACT32(hdbuf);
|
hdr->timestamp = EXTRACT32(hdbuf);
|
||||||
hdr->event_type = hdbuf[4];
|
hdr->event_type = hdbuf[4];
|
||||||
hdr->serverid = EXTRACT32(&hdbuf[5]);
|
hdr->serverid = EXTRACT32(&hdbuf[5]);
|
||||||
@ -479,23 +485,21 @@ struct stat statb;
|
|||||||
hdr->next_pos = EXTRACT32(&hdbuf[13]);
|
hdr->next_pos = EXTRACT32(&hdbuf[13]);
|
||||||
hdr->flags = EXTRACT16(&hdbuf[17]);
|
hdr->flags = EXTRACT16(&hdbuf[17]);
|
||||||
|
|
||||||
|
/* event pos & size checks */
|
||||||
|
if (hdr->next_pos < 0 || hdr->event_size <= 0 || ((hdr->next_pos != (pos + hdr->event_size)) && (hdr->event_type != ROTATE_EVENT))) {
|
||||||
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Client requested master to start replication from invalid position %lu", pos);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* event type checks */
|
||||||
if (router->mariadb10_compat) {
|
if (router->mariadb10_compat) {
|
||||||
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) {
|
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10) {
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid MariaDB 10 event type 0x%x at %lu", hdr->event_type, pos);
|
||||||
"Invalid MariaDB 10 event type 0x%x. "
|
|
||||||
"Binlog file is %s, position %d",
|
|
||||||
hdr->event_type,
|
|
||||||
file->binlogname, pos)));
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (hdr->event_type > MAX_EVENT_TYPE) {
|
if (hdr->event_type > MAX_EVENT_TYPE) {
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Invalid event type 0x%x at %lu", hdr->event_type, pos);
|
||||||
"Invalid event type 0x%x. "
|
|
||||||
"Binlog file is %s, position %d",
|
|
||||||
hdr->event_type,
|
|
||||||
file->binlogname, pos)));
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -508,6 +512,7 @@ struct stat statb;
|
|||||||
"file size is %lu. Master will write %lu in %s next.",
|
"file size is %lu. Master will write %lu in %s next.",
|
||||||
pos, file->binlogname, filelen, router->binlog_position,
|
pos, file->binlogname, filelen, router->binlog_position,
|
||||||
router->binlog_name)));
|
router->binlog_name)));
|
||||||
|
|
||||||
if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
|
if ((n = pread(file->fd, hdbuf, BINLOG_EVENT_HDR_LEN, pos)) != BINLOG_EVENT_HDR_LEN)
|
||||||
{
|
{
|
||||||
switch (n)
|
switch (n)
|
||||||
@ -516,33 +521,34 @@ struct stat statb;
|
|||||||
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
|
||||||
"Reached end of binlog file at %d.",
|
"Reached end of binlog file at %d.",
|
||||||
pos)));
|
pos)));
|
||||||
|
|
||||||
|
/* set ok indicator */
|
||||||
|
hdr->ok = 0x0;
|
||||||
|
|
||||||
break;
|
break;
|
||||||
case -1:
|
case -1:
|
||||||
{
|
{
|
||||||
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
||||||
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
|
||||||
"Failed to read binlog file %s at position %d"
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Failed to reread header in binlog file; (%s), event at %lu",
|
||||||
" (%s).", file->binlogname,
|
err_msg, pos);
|
||||||
pos, err_msg)));
|
|
||||||
|
|
||||||
if (errno == EBADF)
|
if (errno == EBADF)
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Bad file descriptor rereading header for binlog file, "
|
||||||
"Bad file descriptor in read binlog for file %s"
|
"refcount %d, descriptor %d, event at %lu",
|
||||||
", reference count is %d, descriptor %d.",
|
file->refcnt, file->fd, pos);
|
||||||
file->binlogname, file->refcnt, file->fd)));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Bogus data rereading log event header; "
|
||||||
"Short read when reading the header. "
|
"expected %d bytes but read %d, position %lu",
|
||||||
"Expected 19 bytes but got %d bytes. "
|
BINLOG_EVENT_HDR_LEN, n, pos);
|
||||||
"Binlog file is %s, position %d",
|
|
||||||
file->binlogname, pos, n)));
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
hdr->timestamp = EXTRACT32(hdbuf);
|
hdr->timestamp = EXTRACT32(hdbuf);
|
||||||
hdr->event_type = hdbuf[4];
|
hdr->event_type = hdbuf[4];
|
||||||
hdr->serverid = EXTRACT32(&hdbuf[5]);
|
hdr->serverid = EXTRACT32(&hdbuf[5]);
|
||||||
@ -552,9 +558,7 @@ struct stat statb;
|
|||||||
|
|
||||||
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
|
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Next event position still incorrect after rereading, event at %lu", pos);
|
||||||
"Next position still incorrect after "
|
|
||||||
"rereading")));
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -566,14 +570,15 @@ struct stat statb;
|
|||||||
}
|
}
|
||||||
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
|
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Failed to allocate memory for binlog entry, size %d, event at %lu",
|
||||||
"Failed to allocate memory for binlog entry, "
|
hdr->event_size, pos);
|
||||||
"size %d at %d.",
|
|
||||||
hdr->event_size, pos)));
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
data = GWBUF_DATA(result);
|
data = GWBUF_DATA(result);
|
||||||
|
|
||||||
memcpy(data, hdbuf, BINLOG_EVENT_HDR_LEN); // Copy the header in
|
memcpy(data, hdbuf, BINLOG_EVENT_HDR_LEN); // Copy the header in
|
||||||
|
|
||||||
if ((n = pread(file->fd, &data[BINLOG_EVENT_HDR_LEN], hdr->event_size - BINLOG_EVENT_HDR_LEN, pos + BINLOG_EVENT_HDR_LEN))
|
if ((n = pread(file->fd, &data[BINLOG_EVENT_HDR_LEN], hdr->event_size - BINLOG_EVENT_HDR_LEN, pos + BINLOG_EVENT_HDR_LEN))
|
||||||
!= hdr->event_size - BINLOG_EVENT_HDR_LEN) // Read the balance
|
!= hdr->event_size - BINLOG_EVENT_HDR_LEN) // Read the balance
|
||||||
{
|
{
|
||||||
@ -581,24 +586,22 @@ struct stat statb;
|
|||||||
{
|
{
|
||||||
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
char err_msg[BLRM_STRERROR_R_MSG_SIZE+1] = "";
|
||||||
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
strerror_r(errno, err_msg, BLRM_STRERROR_R_MSG_SIZE);
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
|
||||||
"Error reading the event at %ld in %s. "
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Error reading the binlog event at %lu;"
|
||||||
"%s, expected %d bytes.",
|
"(%s), expected %d bytes.",
|
||||||
pos, file->binlogname,
|
pos, err_msg, hdr->event_size - BINLOG_EVENT_HDR_LEN);
|
||||||
err_msg, hdr->event_size - BINLOG_EVENT_HDR_LEN)));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Bogus data in log event entry; "
|
||||||
"Short read when reading the event at %ld in %s. "
|
"expected %d bytes but got %d, position %lu",
|
||||||
"Expected %d bytes got %d bytes.",
|
hdr->event_size - BINLOG_EVENT_HDR_LEN, n, pos);
|
||||||
pos, file->binlogname, hdr->event_size - BINLOG_EVENT_HDR_LEN, n)));
|
|
||||||
if (filelen != 0 && filelen - pos < hdr->event_size)
|
if (filelen != 0 && filelen - pos < hdr->event_size)
|
||||||
{
|
{
|
||||||
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
snprintf(errmsg, BINLOG_ERROR_MSG_LEN, "Binlog event is close to the end of the binlog file; "
|
||||||
"Binlog event is close to the end of the binlog file, "
|
"current file size is %lu, event at %lu",
|
||||||
"current file size is %u.",
|
filelen, pos);
|
||||||
filelen)));
|
|
||||||
}
|
}
|
||||||
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
|
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
|
||||||
}
|
}
|
||||||
@ -607,6 +610,10 @@ struct stat statb;
|
|||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* set OK indicator */
|
||||||
|
hdr->ok = 0x0;
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1329,6 +1329,25 @@ int n_bufs = -1, pn_bufs = -1;
|
|||||||
gwbuf_free(record);
|
gwbuf_free(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Log whether no event has been sent */
|
||||||
|
if (pos == router->binlog_position) {
|
||||||
|
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"No events distributed to slaves for a pending transaction in %s at %lu."
|
||||||
|
" Last event from master at %lu",
|
||||||
|
router->binlog_name,
|
||||||
|
router->binlog_position,
|
||||||
|
router->current_pos)));
|
||||||
|
}
|
||||||
|
if (pos < router->current_pos) {
|
||||||
|
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"Some events were not distributed to slaves for a pending transaction "
|
||||||
|
"in %s at %lu. Last distributed even at %lu, last event from master at %lu",
|
||||||
|
router->binlog_name,
|
||||||
|
router->binlog_position,
|
||||||
|
pos,
|
||||||
|
router->current_pos)));
|
||||||
|
}
|
||||||
|
|
||||||
spinlock_acquire(&router->lock);
|
spinlock_acquire(&router->lock);
|
||||||
|
|
||||||
router->binlog_position = router->current_pos;
|
router->binlog_position = router->current_pos;
|
||||||
|
@ -1917,6 +1917,9 @@ int written, rval = 1, burst;
|
|||||||
int rotating = 0;
|
int rotating = 0;
|
||||||
unsigned long burst_size;
|
unsigned long burst_size;
|
||||||
uint8_t *ptr;
|
uint8_t *ptr;
|
||||||
|
char read_errmsg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
|
|
||||||
|
read_errmsg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
|
|
||||||
if (large)
|
if (large)
|
||||||
burst = router->long_burst;
|
burst = router->long_burst;
|
||||||
@ -1937,6 +1940,9 @@ uint8_t *ptr;
|
|||||||
rotating = router->rotating;
|
rotating = router->rotating;
|
||||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
{
|
{
|
||||||
|
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
|
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
|
|
||||||
if (rotating)
|
if (rotating)
|
||||||
{
|
{
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
@ -1948,10 +1954,18 @@ uint8_t *ptr;
|
|||||||
}
|
}
|
||||||
LOGIF(LE, (skygw_log_write(
|
LOGIF(LE, (skygw_log_write(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"blr_slave_catchup failed to open binlog file %s",
|
"Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup failed to open binlog file",
|
||||||
slave->binlogfile)));
|
slave->dcb->remote, slave->port, slave->serverid,
|
||||||
|
slave->binlogfile)));
|
||||||
|
|
||||||
slave->cstate &= ~CS_BUSY;
|
slave->cstate &= ~CS_BUSY;
|
||||||
slave->state = BLRS_ERRORED;
|
slave->state = BLRS_ERRORED;
|
||||||
|
|
||||||
|
snprintf(err_msg, BINLOG_ERROR_MSG_LEN, "Failed to open binlog '%s'", slave->binlogfile);
|
||||||
|
|
||||||
|
/* Send error that stops slave replication */
|
||||||
|
blr_send_custom_error(slave->dcb, slave->seqno++, 0, err_msg, "HY000", 1236);
|
||||||
|
|
||||||
dcb_close(slave->dcb);
|
dcb_close(slave->dcb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1959,7 +1973,7 @@ uint8_t *ptr;
|
|||||||
slave->stats.n_bursts++;
|
slave->stats.n_bursts++;
|
||||||
|
|
||||||
while (burst-- && burst_size > 0 &&
|
while (burst-- && burst_size > 0 &&
|
||||||
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL)
|
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr, read_errmsg)) != NULL)
|
||||||
{
|
{
|
||||||
head = gwbuf_alloc(5);
|
head = gwbuf_alloc(5);
|
||||||
ptr = GWBUF_DATA(head);
|
ptr = GWBUF_DATA(head);
|
||||||
@ -1983,6 +1997,8 @@ uint8_t *ptr;
|
|||||||
beat1 = hkheartbeat;
|
beat1 = hkheartbeat;
|
||||||
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
{
|
{
|
||||||
|
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
|
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
if (rotating)
|
if (rotating)
|
||||||
{
|
{
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
@ -1994,9 +2010,19 @@ uint8_t *ptr;
|
|||||||
}
|
}
|
||||||
LOGIF(LE, (skygw_log_write(
|
LOGIF(LE, (skygw_log_write(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"blr_slave_catchup failed to open binlog file %s",
|
"Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup failed to open binlog file in rotate event",
|
||||||
|
slave->dcb->remote,
|
||||||
|
slave->port,
|
||||||
|
slave->serverid,
|
||||||
slave->binlogfile)));
|
slave->binlogfile)));
|
||||||
|
|
||||||
slave->state = BLRS_ERRORED;
|
slave->state = BLRS_ERRORED;
|
||||||
|
|
||||||
|
snprintf(err_msg, BINLOG_ERROR_MSG_LEN, "Failed to open binlog '%s' in rotate event", slave->binlogfile);
|
||||||
|
|
||||||
|
/* Send error that stops slave replication */
|
||||||
|
blr_send_custom_error(slave->dcb, (slave->seqno - 1), 0, err_msg, "HY000", 1236);
|
||||||
|
|
||||||
dcb_close(slave->dcb);
|
dcb_close(slave->dcb);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2019,8 +2045,31 @@ uint8_t *ptr;
|
|||||||
if (router->send_slave_heartbeat)
|
if (router->send_slave_heartbeat)
|
||||||
slave->lastReply = time(0);
|
slave->lastReply = time(0);
|
||||||
}
|
}
|
||||||
if (record == NULL)
|
if (record == NULL) {
|
||||||
slave->stats.n_failed_read++;
|
slave->stats.n_failed_read++;
|
||||||
|
|
||||||
|
if (hdr.ok == 0xff) {
|
||||||
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
|
||||||
|
slave->dcb->remote,
|
||||||
|
slave->port,
|
||||||
|
slave->serverid,
|
||||||
|
slave->binlogfile,
|
||||||
|
read_errmsg)));
|
||||||
|
|
||||||
|
spinlock_acquire(&slave->catch_lock);
|
||||||
|
|
||||||
|
slave->state = BLRS_ERRORED;
|
||||||
|
|
||||||
|
spinlock_release(&slave->catch_lock);
|
||||||
|
|
||||||
|
blr_send_custom_error(slave->dcb, slave->seqno++, 0, read_errmsg, "HY000", 1236);
|
||||||
|
|
||||||
|
dcb_close(slave->dcb);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
spinlock_acquire(&slave->catch_lock);
|
spinlock_acquire(&slave->catch_lock);
|
||||||
slave->cstate &= ~CS_BUSY;
|
slave->cstate &= ~CS_BUSY;
|
||||||
spinlock_release(&slave->catch_lock);
|
spinlock_release(&slave->catch_lock);
|
||||||
@ -2289,13 +2338,24 @@ REP_HEADER hdr;
|
|||||||
GWBUF *record, *head;
|
GWBUF *record, *head;
|
||||||
uint8_t *ptr;
|
uint8_t *ptr;
|
||||||
uint32_t chksum;
|
uint32_t chksum;
|
||||||
|
char err_msg[BINLOG_ERROR_MSG_LEN+1];
|
||||||
|
|
||||||
|
err_msg[BINLOG_ERROR_MSG_LEN] = '\0';
|
||||||
|
|
||||||
memset(&hdr, 0, BINLOG_EVENT_HDR_LEN);
|
memset(&hdr, 0, BINLOG_EVENT_HDR_LEN);
|
||||||
|
|
||||||
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
if ((file = blr_open_binlog(router, slave->binlogfile)) == NULL)
|
||||||
return;
|
return;
|
||||||
if ((record = blr_read_binlog(router, file, 4, &hdr)) == NULL)
|
if ((record = blr_read_binlog(router, file, 4, &hdr, err_msg)) == NULL)
|
||||||
{
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
|
||||||
|
"Slave %s:%i, server-id %d, binlog '%s', blr_read_binlog failure: %s",
|
||||||
|
slave->dcb->remote,
|
||||||
|
slave->port,
|
||||||
|
slave->serverid,
|
||||||
|
slave->binlogfile,
|
||||||
|
err_msg)));
|
||||||
|
|
||||||
blr_close_binlog(router, file);
|
blr_close_binlog(router, file);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user