Merge 2.1 in 2.2

Merge 2.1 in 2.2
This commit is contained in:
MassimilianoPinto
2017-11-29 11:30:33 +01:00
parent ba22e3b1db
commit 8a10b72e4d
8 changed files with 195 additions and 25 deletions

View File

@ -1980,10 +1980,12 @@ blr_read_events_all_events(ROUTER_INSTANCE *router,
router->current_pos = pos;
MXS_WARNING("an error has been found in %s. "
"Setting safe pos to %lu, current pos %lu",
"Setting safe pos to %lu, current pos %lu. "
"ErrMsg [%s]",
router->binlog_name,
router->binlog_position,
router->current_pos);
router->current_pos,
errmsg);
if (fix)
{

View File

@ -1298,7 +1298,12 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
}
}
/** Gather statistics about the replication event types */
/**
* Check Event Type limit:
* If supported, gather statistics about
* the replication event types
* else stop replication from master
*/
int event_limit = router->mariadb10_compat ?
MAX_EVENT_TYPE_MARIADB10 : MAX_EVENT_TYPE;
@ -1306,6 +1311,38 @@ blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
router->stats.events[hdr.event_type]++;
}
else
{
char errmsg[BINLOG_ERROR_MSG_LEN + 1];
sprintf(errmsg,
"Event type [%d] not supported yet. "
"Check master server configuration and "
"disable any new feature. "
"Replication from master has been stopped.",
hdr.event_type);
MXS_ERROR("%s", errmsg);
gwbuf_free(pkt);
pkt = NULL;
spinlock_acquire(&router->lock);
/* Handle error messages */
char* old_errmsg = router->m_errmsg;
router->m_errmsg = MXS_STRDUP_A(errmsg);
router->m_errno = 1235;
/* Set state to stopped */
router->master_state = BLRM_SLAVE_STOPPED;
router->stats.n_binlog_errors++;
spinlock_release(&router->lock);
MXS_FREE(old_errmsg);
/* Stop replication */
blr_master_close(router);
return;
}
/*

View File

@ -2691,6 +2691,30 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
ss_dassert(hdr.ok == SLAVE_POS_READ_OK);
/**
* Handle Heartbeat: don't check anything else
* set CS_WAIT_DATA and return
*/
if (slave->lastEventReceived == HEARTBEAT_EVENT)
{
#ifndef BLFILE_IN_SLAVE
blr_close_binlog(router, file);
#endif
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock);
/**
* Set the CS_WAIT_DATA that allows notification
* of new events after HEARTBEAT_EVENT
*/
slave->cstate |= CS_WAIT_DATA;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
return 1;
}
/**
* Check now slave position with read indicator = SLAVE_POS_READ_OK
*
@ -5878,7 +5902,6 @@ blr_send_slave_heartbeat(void *inst)
while (sptr)
{
/* skip servers with state = 0 */
if ( (sptr->state == BLRS_DUMPING) && (sptr->heartbeat > 0) &&
((t_now + 1 - sptr->lastReply) >= sptr->heartbeat) )
@ -5888,7 +5911,13 @@ blr_send_slave_heartbeat(void *inst)
sptr->serverid, sptr->heartbeat,
(unsigned long)sptr->lastReply);
blr_slave_send_heartbeat(router, sptr);
if (blr_slave_send_heartbeat(router, sptr))
{
/* Set last event */
sptr->lastEventReceived = HEARTBEAT_EVENT;
/* Set last time */
sptr->lastReply = t_now;
}
sptr->lastReply = t_now;