Reconnect to master on error
Refine locking in blr_slave_catchup and add tracing
This commit is contained in:
		@ -797,9 +797,13 @@ ROUTER_INSTANCE	*router = (ROUTER_INSTANCE *)instance;
 | 
				
			|||||||
static  void
 | 
					static  void
 | 
				
			||||||
errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
 | 
					errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
 | 
					ROUTER_INSTANCE	*router = (ROUTER_INSTANCE *)instance;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
       	LOGIF(LE, (skygw_log_write_flush(
 | 
					       	LOGIF(LE, (skygw_log_write_flush(
 | 
				
			||||||
		LOGFILE_ERROR, "Erorr Reply '%s'", message)));
 | 
							LOGFILE_ERROR, "Erorr Reply '%s', attempting reconnect to master",
 | 
				
			||||||
 | 
								message)));
 | 
				
			||||||
	*succp = false;
 | 
						*succp = false;
 | 
				
			||||||
 | 
						blr_master_reconnect(router);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/** to be inline'd */
 | 
					/** to be inline'd */
 | 
				
			||||||
 | 
				
			|||||||
@ -674,9 +674,9 @@ uint8_t	*ptr;
 | 
				
			|||||||
 * needs to put the slave into catchup mode. This prevents the slave taking
 | 
					 * needs to put the slave into catchup mode. This prevents the slave taking
 | 
				
			||||||
 * too much tiem away from the thread that is processing the master events.
 | 
					 * too much tiem away from the thread that is processing the master events.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * At the end ofthe burst a fake EPOLLOUT event is added to the poll event
 | 
					 * At the end of the burst a fake EPOLLOUT event is added to the poll event
 | 
				
			||||||
 * queue. This ensures that the slave callback for processing DCB write drain
 | 
					 * queue. This ensures that the slave callback for processing DCB write drain
 | 
				
			||||||
 * will be called and future catchup requests will be handle on another thread.
 | 
					 * will be called and future catchup requests will be handled on another thread.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @param	router		The binlog router
 | 
					 * @param	router		The binlog router
 | 
				
			||||||
 * @param	slave		The slave that is behind
 | 
					 * @param	slave		The slave that is behind
 | 
				
			||||||
@ -692,6 +692,10 @@ int		written, fd, rval = 1, burst;
 | 
				
			|||||||
uint8_t		*ptr;
 | 
					uint8_t		*ptr;
 | 
				
			||||||
struct timespec	req;
 | 
					struct timespec	req;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					extern unsigned long	hkheartbeat;
 | 
				
			||||||
 | 
					unsigned long		beat;
 | 
				
			||||||
 | 
						beat = hkheartbeat;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (large)
 | 
						if (large)
 | 
				
			||||||
		burst = router->long_burst;
 | 
							burst = router->long_burst;
 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
@ -705,8 +709,19 @@ struct timespec	req;
 | 
				
			|||||||
		nanosleep(&req, NULL);
 | 
							nanosleep(&req, NULL);
 | 
				
			||||||
		spinlock_acquire(&slave->catch_lock);
 | 
							spinlock_acquire(&slave->catch_lock);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	slave->cstate |= (CS_HOLD|CS_BUSY);
 | 
						if (slave->cstate & CS_BUSY)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							spinlock_release(&slave->catch_lock);
 | 
				
			||||||
 | 
					if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
 | 
				
			||||||
 | 
						"Long wait in blr_salve_catchup %ld00ms with %s burst, return without write records.\n",
 | 
				
			||||||
 | 
					hkheartbeat - beat, large ? "long" : "short")));
 | 
				
			||||||
 | 
							return 0;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						slave->cstate |= CS_BUSY;
 | 
				
			||||||
	spinlock_release(&slave->catch_lock);
 | 
						spinlock_release(&slave->catch_lock);
 | 
				
			||||||
 | 
					if (hkheartbeat - beat > 5) LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
 | 
				
			||||||
 | 
						"Long wait in blr_salve_catchup %ld00ms with %s burst.\n",
 | 
				
			||||||
 | 
					hkheartbeat - beat, large ? "long" : "short")));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
 | 
						if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
@ -743,9 +758,6 @@ struct timespec	req;
 | 
				
			|||||||
				break;
 | 
									break;
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		spinlock_acquire(&slave->catch_lock);
 | 
					 | 
				
			||||||
		slave->cstate |= CS_HOLD;
 | 
					 | 
				
			||||||
		spinlock_release(&slave->catch_lock);
 | 
					 | 
				
			||||||
		written = slave->dcb->func.write(slave->dcb, head);
 | 
							written = slave->dcb->func.write(slave->dcb, head);
 | 
				
			||||||
		if (written && hdr.event_type != ROTATE_EVENT)
 | 
							if (written && hdr.event_type != ROTATE_EVENT)
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
@ -753,6 +765,9 @@ struct timespec	req;
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		rval = written;
 | 
							rval = written;
 | 
				
			||||||
		slave->stats.n_events++;
 | 
							slave->stats.n_events++;
 | 
				
			||||||
 | 
							spinlock_acquire(&slave->catch_lock);
 | 
				
			||||||
 | 
							slave->cstate |= CS_HOLD;
 | 
				
			||||||
 | 
							spinlock_release(&slave->catch_lock);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if (record == NULL)
 | 
						if (record == NULL)
 | 
				
			||||||
		slave->stats.n_failed_read++;
 | 
							slave->stats.n_failed_read++;
 | 
				
			||||||
@ -762,13 +777,13 @@ struct timespec	req;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	if (fd != -1)
 | 
						if (fd != -1)
 | 
				
			||||||
		close(fd);
 | 
							close(fd);
 | 
				
			||||||
	poll_fake_write_event(slave->dcb);
 | 
					 | 
				
			||||||
	if (record)
 | 
						if (record)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		slave->stats.n_flows++;
 | 
							slave->stats.n_flows++;
 | 
				
			||||||
		spinlock_acquire(&slave->catch_lock);
 | 
							spinlock_acquire(&slave->catch_lock);
 | 
				
			||||||
		slave->cstate |= CS_EXPECTCB;
 | 
							slave->cstate |= CS_EXPECTCB;
 | 
				
			||||||
		spinlock_release(&slave->catch_lock);
 | 
							spinlock_release(&slave->catch_lock);
 | 
				
			||||||
 | 
							poll_fake_write_event(slave->dcb);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user