MXS-1545: handling of slave file EOF refactoring

MXS-1545: handling of slave file EOF refactoring.

Some slave/router state are now checked before any WARN/ERROR messages
about slave file EOF.

The missing “next_file” is always logged with warn.
This commit is contained in:
MassimilianoPinto
2017-12-01 14:02:02 +01:00
parent d4c0d74ab4
commit 574af7762d
4 changed files with 484 additions and 137 deletions

View File

@ -1063,7 +1063,7 @@ createInstance(SERVICE *service, char **options)
if (inst->mariadb10_master_gtid && if (inst->mariadb10_master_gtid &&
inst->current_pos <= 4) inst->current_pos <= 4)
{ {
MARIADB_GTID_INFO last_gtid = {}; MARIADB_GTID_INFO last_gtid;
memset(&last_gtid, 0, sizeof(last_gtid)); memset(&last_gtid, 0, sizeof(last_gtid));
/* Get last MariaDB GTID from repo */ /* Get last MariaDB GTID from repo */
@ -1224,6 +1224,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
slave->mariadb_gtid = NULL; slave->mariadb_gtid = NULL;
slave->gtid_maps = NULL; slave->gtid_maps = NULL;
memset(&slave->f_info, 0, sizeof (MARIADB_GTID_INFO)); memset(&slave->f_info, 0, sizeof (MARIADB_GTID_INFO));
slave->annotate_rows = false;
/** /**
* Add this session to the list of active sessions. * Add this session to the list of active sessions.

View File

@ -1714,7 +1714,7 @@ blr_file_next_exists(ROUTER_INSTANCE *router,
strncpy(next_file, result.file, BINLOG_FNAMELEN); strncpy(next_file, result.file, BINLOG_FNAMELEN);
next_file[BINLOG_FNAMELEN] = '\0'; next_file[BINLOG_FNAMELEN] = '\0';
MXS_INFO("The next Binlog file from GTID maps repo is [%s]", MXS_DEBUG("The next Binlog file from GTID maps repo is [%s]",
bigbuf); bigbuf);
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);

View File

@ -109,6 +109,14 @@ typedef struct
uint64_t rowid; /* ROWID of router current file*/ uint64_t rowid; /* ROWID of router current file*/
} BINARY_LOG_DATA_RESULT; } BINARY_LOG_DATA_RESULT;
/** Slave file read EOF handling */
typedef enum
{
SLAVE_EOF_ROTATE = 0,
SLAVE_EOF_WARNING,
SLAVE_EOF_ERROR
} slave_eof_action_t;
extern void poll_fake_write_event(DCB *dcb); extern void poll_fake_write_event(DCB *dcb);
static char* get_next_token(char *str, const char* delim, char **saveptr); static char* get_next_token(char *str, const char* delim, char **saveptr);
extern int load_mysql_users(SERV_LISTENER *listener); extern int load_mysql_users(SERV_LISTENER *listener);
@ -341,7 +349,7 @@ static bool blr_handle_complex_select(ROUTER_INSTANCE *router,
extern bool blr_is_current_binlog(ROUTER_INSTANCE *router, extern bool blr_is_current_binlog(ROUTER_INSTANCE *router,
ROUTER_SLAVE *slave); ROUTER_SLAVE *slave);
extern bool blr_compare_binlogs(const ROUTER_INSTANCE *router, extern bool blr_compare_binlogs(const ROUTER_INSTANCE *router,
const MARIADB_GTID_INFO *slave, const MARIADB_GTID_ELEMS *info,
const char *r_file, const char *r_file,
const char *s_file); const char *s_file);
static bool blr_purge_binary_logs(ROUTER_INSTANCE *router, static bool blr_purge_binary_logs(ROUTER_INSTANCE *router,
@ -376,6 +384,15 @@ static bool blr_apply_changes(ROUTER_INSTANCE *router,
CHANGE_MASTER_OPTIONS change_master, CHANGE_MASTER_OPTIONS change_master,
char *new_logfile, char *new_logfile,
char *error); char *error);
static void blr_slave_info_save(const MARIADB_GTID_INFO *info,
MARIADB_GTID_INFO *save_info,
char *save_prefix);
static void blr_slave_log_next_file_action(const ROUTER_INSTANCE *router,
const ROUTER_SLAVE *slave,
const char *c_prefix,
const char *next_file,
slave_eof_action_t log_action);
/** /**
* Process a request packet from the slave server. * Process a request packet from the slave server.
* *
@ -439,7 +456,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
/* Request now the binlog records */ /* Request now the binlog records */
rv = blr_slave_binlog_dump(router, slave, queue); rv = blr_slave_binlog_dump(router, slave, queue);
if (rv && router->send_slave_heartbeat && slave->heartbeat > 0) /* Check whether to add the heartbeat check for this slave */
if (rv && slave->state == BLRS_DUMPING &&
router->send_slave_heartbeat &&
slave->heartbeat > 0)
{ {
char task_name[BLRM_TASK_NAME_LEN + 1] = ""; char task_name[BLRM_TASK_NAME_LEN + 1] = "";
snprintf(task_name, snprintf(task_name,
@ -2341,6 +2361,9 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
file = NULL; file = NULL;
#endif #endif
// Prefix for BLR_BINLOG_STORAGE_TREE
char t_prefix[BINLOG_FILE_EXTRA_INFO] = "";
if (file == NULL) if (file == NULL)
{ {
rotating = router->rotating; rotating = router->rotating;
@ -2360,11 +2383,22 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
poll_fake_write_event(slave->dcb); poll_fake_write_event(slave->dcb);
return rval; return rval;
} }
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup "
"failed to open binlog file", /* Fill the file prefix */
if (f_tree)
{
sprintf(t_prefix,
"%" PRIu32 "/%" PRIu32 "/",
f_tree->gtid_elms.domain_id,
f_tree->gtid_elms.server_id);
}
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s%s': blr_slave_catchup "
"failed to open binlog file.",
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile); slave->binlogfile);
slave->cstate &= ~CS_BUSY; slave->cstate &= ~CS_BUSY;
@ -2395,7 +2429,9 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
#endif #endif
int events_before = slave->stats.n_events; int events_before = slave->stats.n_events;
/* Loop read binlog events from slave binlog file */
while (burst-- && burst_size > 0 && while (burst-- && burst_size > 0 &&
/* Read one binlog event */
(record = blr_read_binlog(router, (record = blr_read_binlog(router,
file, file,
slave->binlog_pos, slave->binlog_pos,
@ -2407,11 +2443,22 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
uint32_t binlog_pos; uint32_t binlog_pos;
uint32_t event_size; uint32_t event_size;
/* Get up to date file prefix */
if (f_tree)
{
sprintf(t_prefix,
"%" PRIu32 "/%" PRIu32 "/",
slave->f_info.gtid_elms.domain_id,
slave->f_info.gtid_elms.server_id);
}
strcpy(binlog_name, slave->binlogfile); strcpy(binlog_name, slave->binlogfile);
binlog_pos = slave->binlog_pos; binlog_pos = slave->binlog_pos;
/* Don't sent special events generated by MaxScale /**
* or ANNOTATE_ROWS events if not requested */ * Don't sent special events generated by MaxScale
* or ANNOTATE_ROWS events if not requested
*/
if (hdr.event_type == MARIADB10_START_ENCRYPTION_EVENT || if (hdr.event_type == MARIADB10_START_ENCRYPTION_EVENT ||
hdr.event_type == IGNORABLE_EVENT || hdr.event_type == IGNORABLE_EVENT ||
(!slave->annotate_rows && (!slave->annotate_rows &&
@ -2452,7 +2499,8 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
slave->encryption_ctx = encryption_ctx; slave->encryption_ctx = encryption_ctx;
MXS_INFO("Start Encryption event found while reading. " MXS_INFO("Start Encryption event found while reading. "
"Binlog %s is encrypted. First event at %lu", "Binlog '%s%s' is encrypted. First event at %lu",
t_prefix,
slave->binlogfile, slave->binlogfile,
(unsigned long)hdr.next_pos); (unsigned long)hdr.next_pos);
} }
@ -2460,18 +2508,20 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{ {
MXS_INFO("Skipping ANNOTATE_ROWS event [%s] of size %lu while " MXS_INFO("Skipping ANNOTATE_ROWS event [%s] of size %lu while "
"reading binlog %s at %lu", "reading binlog '%s%s' at %lu",
blr_get_event_description(router, hdr.event_type), blr_get_event_description(router, hdr.event_type),
(unsigned long)hdr.event_size, (unsigned long)hdr.event_size,
t_prefix,
slave->binlogfile, slave->binlogfile,
(unsigned long)slave->binlog_pos); (unsigned long)slave->binlog_pos);
} }
else else
{ {
MXS_INFO("Found ignorable event [%s] of size %lu while " MXS_INFO("Found ignorable event [%s] of size %lu while "
"reading binlog %s at %lu", "reading binlog '%s%s' at %lu",
blr_get_event_description(router, hdr.event_type), blr_get_event_description(router, hdr.event_type),
(unsigned long)hdr.event_size, (unsigned long)hdr.event_size,
t_prefix,
slave->binlogfile, slave->binlogfile,
(unsigned long)slave->binlog_pos); (unsigned long)slave->binlog_pos);
} }
@ -2487,6 +2537,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
break; break;
} }
/* Handle ROTATE_EVENT */
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
unsigned long beat1 = hkheartbeat; unsigned long beat1 = hkheartbeat;
@ -2497,6 +2548,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
MXS_ERROR("blr_close_binlog took %lu maxscale beats", MXS_ERROR("blr_close_binlog took %lu maxscale beats",
hkheartbeat - beat1); hkheartbeat - beat1);
} }
/* Set new file in slave->binlogfile */
blr_slave_rotate(router, slave, GWBUF_DATA(record)); blr_slave_rotate(router, slave, GWBUF_DATA(record));
/* reset the encryption context */ /* reset the encryption context */
@ -2526,11 +2578,22 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
poll_fake_write_event(slave->dcb); poll_fake_write_event(slave->dcb);
return rval; return rval;
} }
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s': blr_slave_catchup "
/* Refresh file prefix */
if (f_tree)
{
sprintf(t_prefix,
"%" PRIu32 "/%" PRIu32 "/",
slave->f_info.gtid_elms.domain_id,
slave->f_info.gtid_elms.server_id);
}
MXS_ERROR("Slave %s:%i, server-id %d, binlog '%s%s': blr_slave_catchup "
"failed to open binlog file in rotate event", "failed to open binlog file in rotate event",
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile); slave->binlogfile);
slave->state = BLRS_ERRORED; slave->state = BLRS_ERRORED;
@ -2556,6 +2619,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
return 0; return 0;
} }
#ifdef BLFILE_IN_SLAVE #ifdef BLFILE_IN_SLAVE
file = slave->file; file = slave->file;
#endif #endif
@ -2566,6 +2630,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
} }
} }
/* Send the binlog event */
if (blr_send_event(BLR_THREAD_ROLE_SLAVE, if (blr_send_event(BLR_THREAD_ROLE_SLAVE,
binlog_name, binlog_name,
binlog_pos, binlog_pos,
@ -2582,12 +2647,13 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
} }
else else
{ {
MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s, position %u: " MXS_WARNING("Slave %s:%i, server-id %d, binlog '%s%s', position %u: "
"Slave-thread could not send event to slave, " "Slave-thread could not send event to slave, "
"closing connection.", "closing connection.",
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
binlog_name, binlog_name,
binlog_pos); binlog_pos);
#ifndef BLFILE_IN_SLAVE #ifndef BLFILE_IN_SLAVE
@ -2623,28 +2689,39 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
ss_dassert(record == NULL); ss_dassert(record == NULL);
/* Refresh file prefix */
if (f_tree)
{
sprintf(t_prefix,
"%" PRIu32 "/%" PRIu32 "/",
slave->f_info.gtid_elms.domain_id,
slave->f_info.gtid_elms.server_id);
}
if (hdr.ok != SLAVE_POS_READ_OK) if (hdr.ok != SLAVE_POS_READ_OK)
{ {
slave->stats.n_failed_read++; slave->stats.n_failed_read++;
if (hdr.ok == SLAVE_POS_BAD_FD) if (hdr.ok == SLAVE_POS_BAD_FD)
{ {
MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s', %s", MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s%s', %s",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile, slave->binlogfile,
read_errmsg); read_errmsg);
} }
if (hdr.ok == SLAVE_POS_BEYOND_EOF) if (hdr.ok == SLAVE_POS_BEYOND_EOF)
{ {
MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s', %s", MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s%s', %s",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile, slave->binlogfile,
read_errmsg); read_errmsg);
@ -2662,11 +2739,12 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
if (hdr.ok == SLAVE_POS_READ_ERR) if (hdr.ok == SLAVE_POS_READ_ERR)
{ {
MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s', %s", MXS_ERROR("%s Slave %s:%i, server-id %d, binlog '%s%s', %s",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile, slave->binlogfile,
read_errmsg); read_errmsg);
@ -2696,12 +2774,13 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
if (hdr.ok == SLAVE_POS_READ_UNSAFE) if (hdr.ok == SLAVE_POS_READ_UNSAFE)
{ {
MXS_NOTICE("%s: Slave %s:%i, server-id %d, binlog '%s', read %d events, " MXS_NOTICE("%s: Slave %s:%i, server-id %d, binlog '%s%s', read %d events, "
"current committed transaction event being sent: %lu, %s", "current committed transaction event being sent: %lu, %s",
router->service->name, router->service->name,
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
slave->serverid, slave->serverid,
t_prefix,
slave->binlogfile, slave->binlogfile,
slave->stats.n_events - events_before, slave->stats.n_events - events_before,
router->current_safe_event, router->current_safe_event,
@ -2719,11 +2798,17 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
/** /**
* Check now slave position with read indicator = SLAVE_POS_READ_OK * Check now slave position with read indicator = SLAVE_POS_READ_OK
* *
* 1) Same name and pos as current router file: aka Up To Date * Two cases handled:
* (1) The slave is Up To Date
* (2) The slave is at EOF of a file which is not the current router file
*
*/ */
if (slave->binlog_pos == router->binlog_position && if (slave->binlog_pos == router->binlog_position &&
blr_is_current_binlog(router, slave)) blr_is_current_binlog(router, slave))
{ {
/**
* (1) Same name and pos as current router file: aka Up To Date
*/
spinlock_acquire(&router->binlog_lock); spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
@ -2738,7 +2823,7 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock); spinlock_release(&router->binlog_lock);
/* force slave to read events via catchup routine */ /* Force slave to read events via catchup routine */
poll_fake_write_event(slave->dcb); poll_fake_write_event(slave->dcb);
} }
else else
@ -2758,45 +2843,132 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
} }
else else
{ {
char next_file[BINLOG_FNAMELEN + 1] = ""; /**
/* 2) Checking End Of File of the slave binlog file */ * (2) Checking End Of File of the slave binlog file
* and current router file
*/
if (slave->binlog_pos >= blr_file_size(file) && if (slave->binlog_pos >= blr_file_size(file) &&
router->rotating == 0 && router->rotating == 0 &&
(!blr_is_current_binlog(router, slave))) !blr_is_current_binlog(router, slave))
{ {
/** /**
* If next file to read doesn't exist, retry the check up to * This is end of current slave file
* MISSING_FILE_READ_RETRIES times before giving up. * which is not the current router binlog file
*/
char next_file[BINLOG_FNAMELEN + 1] = "";
MARIADB_GTID_INFO current_info;
char c_prefix[BINLOG_FILE_EXTRA_INFO] = "";
bool have_heartbeat = router->send_slave_heartbeat &&
(slave->heartbeat > 0);
/**
* Save current MARIADB_GTID_INFO detail because
* calling blr_file_next_exists() overwrites that
*/
if (f_tree)
{
spinlock_acquire(&slave->catch_lock);
blr_slave_info_save(&slave->f_info, &current_info, c_prefix);
spinlock_release(&slave->catch_lock);
}
/**
* Check now whether the next file exists and it's readable
*
* If not, handle some cases
* if found issue a fake_rotate event
*/ */
if (!blr_file_next_exists(router, slave, next_file)) if (!blr_file_next_exists(router, slave, next_file))
{ {
spinlock_acquire(&slave->catch_lock); /**
if (slave->stats.n_failed_read < MISSING_FILE_READ_RETRIES) * The next binlog file to read doesn't exist
* or it's not set.
*/
if (router->mariadb10_master_gtid &&
router->master_state == BLRM_SLAVE_STOPPED &&
!router->binlog_name[0])
{ {
slave->cstate |= CS_EXPECTCB; /**
slave->cstate &= ~CS_BUSY; * (1) Don't care about empty router->binlogname in
* BLRM_SLAVE_STOPPED state when GTID
* registration is on:
* set CS_WAIT_DATA and return.
*/
spinlock_acquire(&slave->catch_lock);
if (f_tree)
{
/**
* We need to deal with current slave file:
* restore first the GTID info into slave->f_info
*/
memcpy(&slave->f_info,
&current_info,
sizeof(MARIADB_GTID_INFO));
}
/**
* We force cachtup state to CS_WAIT_DATA now:
*
* The slave can be called by any new master
* event received (no matter which is the binlog file)
* or by an heartbeat event.
*/
slave->cstate = CS_WAIT_DATA;
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
/* Force slave to read via catchup routine */ #ifndef BLFILE_IN_SLAVE
poll_fake_write_event(slave->dcb); /* Close file */
blr_close_binlog(router, file);
#endif
return rval; return rval;
} }
/**
* (2) The next file is not available/existent, actions:
*
* If router state is BLRM_BINLOGDUMP
* - abort slave connection if MISSING_FILE_READ_RETRIES is hit
* or
* - just log a warning message
*
* Note: in any other router state we don't log messages
*/
if (router->master_state == BLRM_BINLOGDUMP)
{
spinlock_acquire(&slave->catch_lock);
/* Router state is BLRM_BINLOGDUMP (aka replicating) */
if (slave->stats.n_failed_read < MISSING_FILE_READ_RETRIES)
{
slave->stats.n_failed_read++;
spinlock_release(&slave->catch_lock);
/* Log warning for missing file */
blr_slave_log_next_file_action(router,
slave,
c_prefix,
next_file,
SLAVE_EOF_WARNING);
}
else
{
/**
* Force error and disconnect
* when exceeding error counter limit
*/
slave->state = BLRS_ERRORED; slave->state = BLRS_ERRORED;
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
MXS_ERROR("%s: Slave [%s]:%d, server-id %d reached " /* Log error for missing file */
"end of file for '%s' and next file to read '%s' " blr_slave_log_next_file_action(router,
"doesn't exist. Force replication abort after %d retries.", slave,
router->service->name, c_prefix,
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->serverid,
slave->binlogfile,
next_file, next_file,
MISSING_FILE_READ_RETRIES); SLAVE_EOF_ERROR);
/* Send error that stops slave replication */ /* Send error that stops slave replication */
blr_send_custom_error(slave->dcb, blr_send_custom_error(slave->dcb,
@ -2807,13 +2979,41 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
BINLOG_FATAL_ERROR_READING); BINLOG_FATAL_ERROR_READING);
#ifndef BLFILE_IN_SLAVE #ifndef BLFILE_IN_SLAVE
/* Close file */
blr_close_binlog(router, file); blr_close_binlog(router, file);
#endif #endif
/* Disconnect client */
dcb_close(slave->dcb); dcb_close(slave->dcb);
return 0; return 0;
} }
} // No else branch: no further actions
/**
* We need to deal with current slave file:
* restore first the GTID info into slave->f_info
*/
spinlock_acquire(&slave->catch_lock);
if (f_tree)
{
memcpy(&slave->f_info,
&current_info,
sizeof(MARIADB_GTID_INFO));
}
/**
* We force cachtup state to CS_WAIT_DATA now:
*
* The slave can be called by any new master
* event received (no matter which is the binlog file)
* or by an heartbeat event.
*/
slave->cstate = CS_WAIT_DATA;
spinlock_release(&slave->catch_lock);
}
else
{
/* We may have reached the end of file of a non-current /* We may have reached the end of file of a non-current
* binlog file. * binlog file.
* *
@ -2821,19 +3021,17 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
* which the rotate event has been written to the old binlog * which the rotate event has been written to the old binlog
* but the new binlog file has not yet been created. Therefore * but the new binlog file has not yet been created. Therefore
* we ignore these issues during the rotate processing. * we ignore these issues during the rotate processing.
*
* We send a fake_rotate_event to 'next_file'
* Note:
* slave->f_info updated by previous call to
* blr_file_next_exists()
*/ */
MXS_ERROR("%s: Slave [%s]:%d, server-id %d reached end of file for binlog file %s " blr_slave_log_next_file_action(router,
"at %lu which is not the file currently being downloaded. " slave,
"Master binlog is %s, %lu. This may be caused by a " c_prefix,
"previous failure of the master.", next_file,
router->service->name, SLAVE_EOF_ROTATE);
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->serverid,
slave->binlogfile,
(unsigned long)slave->binlog_pos,
router->binlog_name,
router->binlog_position);
/* Reset encryption context */ /* Reset encryption context */
MXS_FREE(slave->encryption_ctx); MXS_FREE(slave->encryption_ctx);
@ -2855,31 +3053,36 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB; slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
/* /**
* Fake rotate written to client: * Note:
* Fake rotate just written to client,
* no need to call poll_fake_write_event() * no need to call poll_fake_write_event()
*/ */
} }
else else
{ {
/* Set ERROR */
slave->state = BLRS_ERRORED; slave->state = BLRS_ERRORED;
/* Disconnect client */
dcb_close(slave->dcb); dcb_close(slave->dcb);
#ifndef BLFILE_IN_SLAVE #ifndef BLFILE_IN_SLAVE
/* Close file */
blr_close_binlog(router, file); blr_close_binlog(router, file);
#endif #endif
return 0; return 0;
} }
} }
}
else else
{ { /**
/** * Still reading from current slave file but
* Nothing has been written to client right now * nothing has been written to client right now
* (perhaps some ignorable / skipped events)
* just retry to read again. * just retry to read again.
*/ */
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB; slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock); spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb); poll_fake_write_event(slave->dcb);
} }
} }
@ -2887,9 +3090,11 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
#ifndef BLFILE_IN_SLAVE #ifndef BLFILE_IN_SLAVE
if (file) if (file)
{ {
/* Close file */
blr_close_binlog(router, file); blr_close_binlog(router, file);
} }
#endif #endif
return rval; return rval;
} }
@ -3112,7 +3317,7 @@ blr_slave_read_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
static uint32_t static uint32_t
blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde) blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde)
{ {
GWBUF *head; GWBUF *event;
uint8_t *ptr; uint8_t *ptr;
uint32_t chksum; uint32_t chksum;
uint32_t event_size; uint32_t event_size;
@ -3123,12 +3328,14 @@ blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde)
return 0; return 0;
} }
event_ptr = GWBUF_DATA(fde); event_size = GWBUF_LENGTH(fde);
if ((head = gwbuf_alloc(MYSQL_HEADER_LEN + 1)) == NULL)
if ((event = gwbuf_alloc(MYSQL_HEADER_LEN + 1 + event_size)) == NULL)
{ {
return 0; return 0;
} }
ptr = GWBUF_DATA(head);
ptr = GWBUF_DATA(event);
event_size = GWBUF_LENGTH(fde); event_size = GWBUF_LENGTH(fde);
@ -3137,13 +3344,15 @@ blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde)
ptr += 3; ptr += 3;
*ptr++ = slave->seqno++; *ptr++ = slave->seqno++;
*ptr++ = 0; // OK/ERR byte *ptr++ = 0; // OK/ERR byte
head = gwbuf_append(head, fde);
event_ptr = GWBUF_DATA(fde); // Copy FDE data
encode_value(event_ptr, time(0), 32); // Overwrite timestamp memcpy(ptr, GWBUF_DATA(fde), event_size);
event_ptr += 13; // 4 time + 1 type + 4 server_id + 4 event_size
encode_value(ptr, time(0), 32); // Overwrite timestamp
ptr += 13; // 4 time + 1 type + 4 server_id + 4 event_size
/* event_ptr points to position of the next event */ /* event_ptr points to position of the next event */
encode_value(event_ptr, 0, 32); // Set next position to 0 encode_value(ptr, 0, 32); // Set next position to 0
/* /*
* Since we have changed the timestamp we must recalculate the CRC * Since we have changed the timestamp we must recalculate the CRC
@ -3152,14 +3361,14 @@ blr_slave_send_fde(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *fde)
* calculate a new checksum * calculate a new checksum
* and write it into the header * and write it into the header
*/ */
ptr = GWBUF_DATA(fde) + event_size - BINLOG_EVENT_CRC_SIZE; ptr = GWBUF_DATA(event) + MYSQL_HEADER_LEN + 1 + event_size - BINLOG_EVENT_CRC_SIZE;
chksum = crc32(0L, NULL, 0); chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, chksum = crc32(chksum,
GWBUF_DATA(fde), GWBUF_DATA(event) + MYSQL_HEADER_LEN + 1,
event_size - BINLOG_EVENT_CRC_SIZE); event_size - BINLOG_EVENT_CRC_SIZE);
encode_value(ptr, chksum, 32); encode_value(ptr, chksum, 32);
return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, head); return MXS_SESSION_ROUTE_REPLY(slave->dcb->session, event);
} }
@ -5903,9 +6112,11 @@ blr_send_slave_heartbeat(void *inst)
while (sptr) while (sptr)
{ {
/* skip servers with state = 0 */ /* skip servers with state = 0 */
if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) && if ((sptr->state == BLRS_DUMPING) &&
((t_now + 1 - sptr->lastReply) >= sptr->heartbeat) ) (sptr->heartbeat > 0) &&
((t_now + 1 - sptr->lastReply) >= sptr->heartbeat))
{ {
MXS_NOTICE("Sending Heartbeat to slave server-id %d. " MXS_NOTICE("Sending Heartbeat to slave server-id %d. "
"Heartbeat interval is %d, last event time is %lu", "Heartbeat interval is %d, last event time is %lu",
@ -5919,9 +6130,6 @@ blr_send_slave_heartbeat(void *inst)
/* Set last time */ /* Set last time */
sptr->lastReply = t_now; sptr->lastReply = t_now;
} }
sptr->lastReply = t_now;
} }
sptr = sptr->next; sptr = sptr->next;
@ -6813,7 +7021,6 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
strcpy(slave->binlogfile, router_curr_file); strcpy(slave->binlogfile, router_curr_file);
slave->binlog_pos = 4; slave->binlog_pos = 4;
// TODO: Add prefix
MXS_INFO("Slave %d is registering with empty GTID:" MXS_INFO("Slave %d is registering with empty GTID:"
" sending events from current binlog file %s%s," " sending events from current binlog file %s%s,"
" pos %" PRIu32 "", " pos %" PRIu32 "",
@ -6877,9 +7084,10 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
snprintf(errmsg, snprintf(errmsg,
BINLOG_ERROR_MSG_LEN, BINLOG_ERROR_MSG_LEN,
"Requested MariaDB GTID '%s' by server %lu" "Requested MariaDB GTID '%s' by server %lu"
" has not been found", " not found. GTID_STRICT_MODE=%s",
slave->mariadb_gtid, slave->mariadb_gtid,
(unsigned long)slave->serverid); (unsigned long)slave->serverid,
slave->gtid_strict_mode ? "ON" : "OFF");
errmsg[BINLOG_ERROR_MSG_LEN] = '\0'; errmsg[BINLOG_ERROR_MSG_LEN] = '\0';
/* Check strict mode */ /* Check strict mode */
@ -6988,7 +7196,7 @@ static bool blr_slave_gtid_request(ROUTER_INSTANCE *router,
} }
} }
/* Set GTID details in f_info*/ /* Set GTID details in f_info */
memcpy(&slave->f_info, &f_gtid, sizeof(MARIADB_GTID_INFO)); memcpy(&slave->f_info, &f_gtid, sizeof(MARIADB_GTID_INFO));
} }
} }
@ -8201,14 +8409,15 @@ static void blr_slave_skip_empty_files(ROUTER_INSTANCE *router,
* Stop if the new file is the current binlog file. * Stop if the new file is the current binlog file.
*/ */
while (!blr_compare_binlogs(router, while (!blr_compare_binlogs(router,
f_tree, &f_tree->gtid_elms,
router_curr_file, router_curr_file,
binlog_file) && binlog_file) &&
blr_slave_get_file_size(file_path) <= 4 && blr_slave_get_file_size(file_path) <= 4 &&
blr_file_next_exists(router, slave, next_file)) blr_file_next_exists(router, slave, next_file))
{ {
// Log skipped file // Log skipped file
MXS_INFO("Slave %s:%i, skip reading empty file '%s' (4 bytes size).", MXS_INFO("Slave %s:%i, skip reading empty file '%s' "
"(0 or 4 bytes size).",
slave->dcb->remote, slave->dcb->remote,
dcb_get_port(slave->dcb), dcb_get_port(slave->dcb),
binlog_file); binlog_file);
@ -9471,3 +9680,140 @@ static bool blr_apply_changes(ROUTER_INSTANCE *router,
return ret; return ret;
} }
/**
* Saves a MARIADB_GTID_INFO data for later usage
*
* @param info The MARIADB_GTID_INFO data to copy
* @param save_info The MARIADB_GTID_INFO allocated
* buffer to save data
* @param save_prefix The allocated buffer where
* to save file prefix
*/
static void blr_slave_info_save(const MARIADB_GTID_INFO *info,
MARIADB_GTID_INFO *save_info,
char *save_prefix)
{
/* Save current file details */
memcpy(save_info, info, sizeof(MARIADB_GTID_INFO));
/* Fill save file prefix */
sprintf(save_prefix,
"%" PRIu32 "/%" PRIu32 "/",
save_info->gtid_elms.domain_id,
save_info->gtid_elms.server_id);
}
/**
* Log message for slave file End Of File
*
* @param router The current router instance
* @param slave The connected slave
* @param c_prefix The file prefix of slave file
* @param next_file The next file to read or fake rotate to
* @param log_action The action type to log
*/
static void blr_slave_log_next_file_action(const ROUTER_INSTANCE *router,
const ROUTER_SLAVE *slave,
const char *c_prefix,
const char *next_file,
slave_eof_action_t log_action)
{
char m_prefix[BINLOG_FILE_EXTRA_INFO] = "";
char r_prefix[BINLOG_FILE_EXTRA_INFO] = "";
bool s_tree = router->storage_type == BLR_BINLOG_STORAGE_TREE;
bool have_heartbeat = router->send_slave_heartbeat &&
(slave->heartbeat > 0);
spinlock_acquire(&router->binlog_lock);
if (s_tree)
{
/* Get master file prefix */
sprintf(m_prefix,
"%" PRIu32 "/%" PRIu32 "/",
router->mariadb10_gtid_domain,
router->orig_masterid);
/* Get rotating slave file prefix */
sprintf(r_prefix,
"%" PRIu32 "/%" PRIu32 "/",
slave->f_info.gtid_elms.domain_id,
slave->f_info.gtid_elms.server_id);
}
spinlock_release(&router->binlog_lock);
switch(log_action)
{
case SLAVE_EOF_ROTATE:
/* This has to be always logged */
MXS_WARNING("%s: Slave [%s]:%d, server-id %d reached end of file for binlog file [%s%s] "
"at %lu which is not the file currently being downloaded or last file found. "
"This may be caused by a previous failure of the master. "
"Current master binlog is [%s%s] at %lu, replication state is [%s]. "
"Now rotating to new file [%s%s]",
router->service->name,
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->serverid,
c_prefix,
slave->binlogfile,
(unsigned long)slave->binlog_pos,
m_prefix,
router->binlog_name[0] ? router->binlog_name : "no_set_yet",
router->binlog_position,
blrm_states[router->master_state],
r_prefix,
next_file);
break;
case SLAVE_EOF_ERROR:
/* Log error */
MXS_ERROR("%s: Slave [%s]:%d, server-id %d reached "
"end of file for '%s%s' and next file to read%s%s%s%s "
"is not %s. Force replication abort after %d retries.",
router->service->name,
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->serverid,
c_prefix,
slave->binlogfile,
next_file[0] ? " '" : "",
next_file[0] ? r_prefix : "",
next_file,
next_file[0] ? "'" : "",
next_file[0] ? "accessible" : "existent",
MISSING_FILE_READ_RETRIES);
break;
case SLAVE_EOF_WARNING:
/* We don't have the next_file, just warning */
MXS_WARNING("%s: Slave [%s]:%d, server-id %d reached end "
"of file for binlog file [%s%s] "
"at %lu. This is the last downloaded or "
"the last file found. "
"Next file%s%s%s%s is not %s. "
"This may be caused by a previous failure of "
"the master server. Current master binlog is "
"[%s%s] at %lu and replication state is [%s]. "
"The slave server is now in '%s' state.",
router->service->name,
slave->dcb->remote,
dcb_get_port(slave->dcb),
slave->serverid,
c_prefix,
slave->binlogfile,
(unsigned long)slave->binlog_pos,
next_file[0] ? " '" : "",
next_file[0] ? r_prefix : "",
next_file,
next_file[0] ? "'" : "",
next_file[0] ? "accessible" : "existent",
m_prefix,
router->binlog_name[0] ? router->binlog_name : "no_set_yet",
router->binlog_position,
blrm_states[router->master_state],
have_heartbeat ? "wait_state" : "read_again");
break;
default:
break;
}
}