Addition of backoff process for master reconnect

Housekeeper task display
This commit is contained in:
Mark Riddoch
2014-10-23 10:46:55 +01:00
parent 932fc5dc2c
commit 70672e43a1
9 changed files with 326 additions and 30 deletions

View File

@ -150,6 +150,7 @@ DCB *rval;
rval->low_water = 0; rval->low_water = 0;
rval->next = NULL; rval->next = NULL;
rval->callbacks = NULL; rval->callbacks = NULL;
rval->data = NULL;
rval->remote = NULL; rval->remote = NULL;
rval->user = NULL; rval->user = NULL;

View File

@ -24,11 +24,21 @@
/** /**
* @file housekeeper.c Provide a mechanism to run periodic tasks * @file housekeeper.c Provide a mechanism to run periodic tasks
* *
* The housekeeper provides a mechanism to allow for tasks, function
* calls basically, to be run on a tiem basis. A task may be run
* repeatedly, with a given frequency (in seconds), or may be a one
* shot task that will only be run once after a specified number of
* seconds.
*
* The housekeeper also maintains a global variable, hkheartbeat, that
* is incremented every 100ms.
*
* @verbatim * @verbatim
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 29/08/14 Mark Riddoch Initial implementation * 29/08/14 Mark Riddoch Initial implementation
* 22/10/14 Mark Riddoch Addition of one-shot tasks
* *
* @endverbatim * @endverbatim
*/ */
@ -70,7 +80,7 @@ hkinit()
* @param taskfn The function to call for the task * @param taskfn The function to call for the task
* @param data Data to pass to the task function * @param data Data to pass to the task function
* @param frequency How often to run the task, expressed in seconds * @param frequency How often to run the task, expressed in seconds
* @return Return the tiem in seconds when the task will be first run if the task was added, otherwise 0 * @return Return the time in seconds when the task will be first run if the task was added, otherwise 0
*/ */
int int
hktask_add(char *name, void (*taskfn)(void *), void *data, int frequency) hktask_add(char *name, void (*taskfn)(void *), void *data, int frequency)
@ -89,6 +99,7 @@ HKTASK *task, *ptr;
task->task = taskfn; task->task = taskfn;
task->data = data; task->data = data;
task->frequency = frequency; task->frequency = frequency;
task->type = HK_REPEATED;
task->nextdue = time(0) + frequency; task->nextdue = time(0) + frequency;
task->next = NULL; task->next = NULL;
spinlock_acquire(&tasklock); spinlock_acquire(&tasklock);
@ -113,6 +124,61 @@ HKTASK *task, *ptr;
return task->nextdue; return task->nextdue;
} }
/**
* Add a one-shot task to the housekeeper task list
*
* Task names must be unique.
*
* @param name The unique name for this housekeeper task
* @param taskfn The function to call for the task
* @param data Data to pass to the task function
* @param when How many second until the task is executed
* @return Return the time in seconds when the task will be first run if the task was added, otherwise 0
*
*/
int
hktask_oneshot(char *name, void (*taskfn)(void *), void *data, int when)
{
HKTASK *task, *ptr;
if ((task = (HKTASK *)malloc(sizeof(HKTASK))) == NULL)
{
return 0;
}
if ((task->name = strdup(name)) == NULL)
{
free(task);
return 0;
}
task->task = taskfn;
task->data = data;
task->frequency = 0;
task->type = HK_ONESHOT;
task->nextdue = time(0) + when;
task->next = NULL;
spinlock_acquire(&tasklock);
ptr = tasks;
while (ptr && ptr->next)
{
if (strcmp(ptr->name, name) == 0)
{
spinlock_release(&tasklock);
free(task->name);
free(task);
return 0;
}
ptr = ptr->next;
}
if (ptr)
ptr->next = task;
else
tasks = task;
spinlock_release(&tasklock);
return task->nextdue;
}
/** /**
* Remove a named task from the housekeepers task list * Remove a named task from the housekeepers task list
* *
@ -195,6 +261,8 @@ int i;
taskdata = ptr->data; taskdata = ptr->data;
spinlock_release(&tasklock); spinlock_release(&tasklock);
(*taskfn)(taskdata); (*taskfn)(taskdata);
if (ptr->type == HK_ONESHOT)
hktask_remove(ptr->name);
spinlock_acquire(&tasklock); spinlock_acquire(&tasklock);
ptr = tasks; ptr = tasks;
} }
@ -214,3 +282,32 @@ hkshutdown()
{ {
do_shutdown = 1; do_shutdown = 1;
} }
/**
* Show the tasks that are scheduled for the house keeper
*
* @param pdcb The DCB to send to output
*/
void
hkshow_tasks(DCB *pdcb)
{
HKTASK *ptr;
struct tm tm;
char buf[40];
dcb_printf(pdcb, "%-25s | Type | Frequency | Next Due\n", "Name");
dcb_printf(pdcb, "--------------------------+----------+-----------+-----------------------\n");
spinlock_acquire(&tasklock);
ptr = tasks;
while (ptr)
{
localtime_r(&ptr->nextdue, &tm);
asctime_r(&tm, buf);
dcb_printf(pdcb, "%-25s | %-8s | %-9d | %s",
ptr->name,
ptr->type == HK_REPEATED ? "Repeated" : "One-Shot",
ptr->frequency,
buf);
ptr = ptr->next;
}
}

View File

@ -147,6 +147,8 @@ static struct {
int n_hup; /*< Number of hangup events */ int n_hup; /*< Number of hangup events */
int n_accept; /*< Number of accept events */ int n_accept; /*< Number of accept events */
int n_polls; /*< Number of poll cycles */ int n_polls; /*< Number of poll cycles */
int n_pollev; /*< Number of polls returnign events */
int n_nbpollev; /*< Number of polls returnign events */
int n_nothreads; /*< Number of times no threads are polling */ int n_nothreads; /*< Number of times no threads are polling */
int n_fds[MAXNFDS]; /*< Number of wakeups with particular int n_fds[MAXNFDS]; /*< Number of wakeups with particular
n_fds value */ n_fds value */
@ -446,6 +448,7 @@ int poll_spins = 0;
thread_data[thread_id].state = THREAD_POLLING; thread_data[thread_id].state = THREAD_POLLING;
} }
atomic_add(&pollStats.n_polls, 1);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{ {
atomic_add(&n_waiting, -1); atomic_add(&n_waiting, -1);
@ -462,7 +465,7 @@ int poll_spins = 0;
} }
/* /*
* If there are no new descriptors from the non-blocking call * If there are no new descriptors from the non-blocking call
* and nothing to proces on the event queue then for do a * and nothing to process on the event queue then for do a
* blocking call to epoll_wait. * blocking call to epoll_wait.
* *
* We calculate a timeout bias to alter the length of the blocking * We calculate a timeout bias to alter the length of the blocking
@ -495,13 +498,15 @@ int poll_spins = 0;
if (nfds > 0) if (nfds > 0)
{ {
timeout_bias = 1; timeout_bias = 1;
if (poll_spins <= number_poll_spins + 1)
atomic_add(&pollStats.n_nbpollev, 1);
poll_spins = 0; poll_spins = 0;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [poll_waitevents] epoll_wait found %d fds", "%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(), pthread_self(),
nfds))); nfds)));
atomic_add(&pollStats.n_polls, 1); atomic_add(&pollStats.n_pollev, 1);
if (thread_data) if (thread_data)
{ {
thread_data[thread_id].n_fds = nfds; thread_data[thread_id].n_fds = nfds;
@ -1005,29 +1010,33 @@ dprintPollStats(DCB *dcb)
{ {
int i; int i;
dcb_printf(dcb, "Number of epoll cycles: %d\n", dcb_printf(dcb, "No. of epoll cycles: %d\n",
pollStats.n_polls); pollStats.n_polls);
dcb_printf(dcb, "Number of epoll cycles with wait: %d\n", dcb_printf(dcb, "No. of epoll cycles with wait: %d\n",
pollStats.blockingpolls); pollStats.blockingpolls);
dcb_printf(dcb, "Number of read events: %d\n", dcb_printf(dcb, "No. of epoll calls returning events: %d\n",
pollStats.n_pollev);
dcb_printf(dcb, "No. of non-blocking calls returning events: %d\n",
pollStats.n_nbpollev);
dcb_printf(dcb, "No. of read events: %d\n",
pollStats.n_read); pollStats.n_read);
dcb_printf(dcb, "Number of write events: %d\n", dcb_printf(dcb, "No. of write events: %d\n",
pollStats.n_write); pollStats.n_write);
dcb_printf(dcb, "Number of error events: %d\n", dcb_printf(dcb, "No. of error events: %d\n",
pollStats.n_error); pollStats.n_error);
dcb_printf(dcb, "Number of hangup events: %d\n", dcb_printf(dcb, "No. of hangup events: %d\n",
pollStats.n_hup); pollStats.n_hup);
dcb_printf(dcb, "Number of accept events: %d\n", dcb_printf(dcb, "No. of accept events: %d\n",
pollStats.n_accept); pollStats.n_accept);
dcb_printf(dcb, "Number of times no threads polling: %d\n", dcb_printf(dcb, "No. of times no threads polling: %d\n",
pollStats.n_nothreads); pollStats.n_nothreads);
dcb_printf(dcb, "Current event queue length: %d\n", dcb_printf(dcb, "Current event queue length: %d\n",
pollStats.evq_length); pollStats.evq_length);
dcb_printf(dcb, "Maximum event queue length: %d\n", dcb_printf(dcb, "Maximum event queue length: %d\n",
pollStats.evq_max); pollStats.evq_max);
dcb_printf(dcb, "Number of DCBs with pending events: %d\n", dcb_printf(dcb, "No. of DCBs with pending events: %d\n",
pollStats.evq_pending); pollStats.evq_pending);
dcb_printf(dcb, "Number of wakeups with pending queue: %d\n", dcb_printf(dcb, "No. of wakeups with pending queue: %d\n",
pollStats.wake_evqpending); pollStats.wake_evqpending);
dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "No of poll completions with descriptors\n");

View File

@ -18,6 +18,7 @@
* Copyright SkySQL Ab 2014 * Copyright SkySQL Ab 2014
*/ */
#include <time.h> #include <time.h>
#include <dcb.h>
/** /**
* @file housekeeper.h A mechanism to have task run periodically * @file housekeeper.h A mechanism to have task run periodically
@ -31,6 +32,11 @@
* @endverbatim * @endverbatim
*/ */
typedef enum {
HK_REPEATED = 1,
HK_ONESHOT
} HKTASK_TYPE;
/** /**
* The housekeeper task list * The housekeeper task list
*/ */
@ -40,12 +46,22 @@ typedef struct hktask {
void *data; /*< Data to pass the task */ void *data; /*< Data to pass the task */
int frequency; /*< How often to call the tasks (seconds) */ int frequency; /*< How often to call the tasks (seconds) */
time_t nextdue; /*< When the task should be next run */ time_t nextdue; /*< When the task should be next run */
HKTASK_TYPE
type; /*< The task type */
struct hktask struct hktask
*next; /*< Next task in the list */ *next; /*< Next task in the list */
} HKTASK; } HKTASK;
/**
* The global housekeeper heartbeat value. This value is increamente
* every 100ms and may be used for crude timing etc.
*/
extern unsigned long hkheartbeat;
extern void hkinit(); extern void hkinit();
extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency); extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency);
extern int hktask_oneshot(char *name, void (*task)(void *), void *data, int when);
extern int hktask_remove(char *name); extern int hktask_remove(char *name);
extern void hkshutdown(); extern void hkshutdown();
extern void hkshow_tasks(DCB *pdcb);
#endif #endif

View File

@ -57,7 +57,16 @@
*/ */
#define DEF_SHORT_BURST 15 #define DEF_SHORT_BURST 15
#define DEF_LONG_BURST 500 #define DEF_LONG_BURST 500
#define DEF_BURST_SIZE 1024000 /* 1 Mb */
/**
* master reconnect backoff constants
* BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds)
* BLR_MAX_BACKOFF Maximum number of increments to backoff to
*/
#define BLR_MASTER_BACKOFF_TIME 5
#define BLR_MAX_BACKOFF 60
/** /**
* Some useful macros for examining the MySQL Response packets * Some useful macros for examining the MySQL Response packets
*/ */
@ -255,9 +264,11 @@ typedef struct router_instance {
unsigned int high_water; /*< High water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */
unsigned int short_burst; /*< Short burst for slave catchup */ unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */ unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */
ROUTER_STATS stats; /*< Statistics for this router */ ROUTER_STATS stats; /*< Statistics for this router */
int active_logs; int active_logs;
int reconnect_pending; int reconnect_pending;
int retry_backoff;
int handling_threads; int handling_threads;
struct router_instance struct router_instance
*next; *next;

View File

@ -39,6 +39,7 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <ctype.h>
#include <time.h> #include <time.h>
#include <service.h> #include <service.h>
#include <server.h> #include <server.h>
@ -160,7 +161,7 @@ static ROUTER *
createInstance(SERVICE *service, char **options) createInstance(SERVICE *service, char **options)
{ {
ROUTER_INSTANCE *inst; ROUTER_INSTANCE *inst;
char *value; char *value, *name;
int i; int i;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
@ -183,6 +184,8 @@ int i;
inst->initbinlog = 0; inst->initbinlog = 0;
inst->short_burst = DEF_SHORT_BURST; inst->short_burst = DEF_SHORT_BURST;
inst->long_burst = DEF_LONG_BURST; inst->long_burst = DEF_LONG_BURST;
inst->burst_size = DEF_BURST_SIZE;
inst->retry_backoff = 1;
/* /*
* We only support one server behind this router, since the server is * We only support one server behind this router, since the server is
@ -279,6 +282,30 @@ int i;
{ {
inst->long_burst = atoi(value); inst->long_burst = atoi(value);
} }
else if (strcmp(options[i], "burstsize") == 0)
{
unsigned long size = atoi(value);
char *ptr = value;
while (*ptr && isdigit(*ptr))
ptr++;
switch (*ptr)
{
case 'G':
case 'g':
size = size * 1024 * 1000 * 1000;
break;
case 'M':
case 'm':
size = size * 1024 * 1000;
break;
case 'K':
case 'k':
size = size * 1024;
break;
}
inst->burst_size = size;
}
else else
{ {
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
@ -325,7 +352,11 @@ int i;
*/ */
blr_init_cache(inst); blr_init_cache(inst);
hktask_add("Binlog Router", stats_func, inst, BLR_STATS_FREQ); if ((name = (char *)malloc(80)) != NULL)
{
sprintf(name, "%s stats", service->name);
hktask_add(name, stats_func, inst, BLR_STATS_FREQ);
}
/* /*
* Now start the replication from the master to MaxScale * Now start the replication from the master to MaxScale
@ -822,9 +853,9 @@ char msg[85];
strcpy(msg, ""); strcpy(msg, "");
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Erorr Reply '%s', %sattempting reconnect to master", LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg))); message, msg)));
*succp = false; *succp = true;
blr_master_reconnect(router); blr_master_reconnect(router);
} }

View File

@ -47,6 +47,7 @@
#include <blr.h> #include <blr.h>
#include <dcb.h> #include <dcb.h>
#include <spinlock.h> #include <spinlock.h>
#include <housekeeper.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -107,6 +108,12 @@ GWBUF *buf;
client->session = router->session; client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{ {
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'", "Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name))); router->service->databases->unique_name)));
@ -122,6 +129,7 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP; router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++; router->stats.n_masterstarts++;
router->retry_backoff = 1;
} }
/** /**
@ -137,10 +145,7 @@ blr_restart_master(ROUTER_INSTANCE *router)
{ {
GWBUF *ptr; GWBUF *ptr;
dcb_close(router->master);
dcb_close(router->client); dcb_close(router->client);
dcb_free(router->master);
dcb_free(router->client);
/* Discard the queued residual data */ /* Discard the queued residual data */
ptr = router->residual; ptr = router->residual;
@ -696,9 +701,10 @@ static REP_HEADER phdr;
} }
else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F)
{ {
router->rotating = 1;
ptr = ptr + 5; // We don't put the first byte of the payload ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file // into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr); blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
@ -719,10 +725,10 @@ static REP_HEADER phdr;
hdr.event_size, hdr.event_size,
router->binlog_name, router->binlog_name,
router->binlog_position))); router->binlog_position)));
router->rotating = 1;
ptr += 5; ptr += 5;
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr); blr_rotate_event(router, ptr, &hdr);
} }
} }
@ -800,7 +806,7 @@ blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
* Extract a numeric field from a packet of the specified number of bits * Extract a numeric field from a packet of the specified number of bits
* *
* @param src The raw packet source * @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8) * @param bits The number of bits to extract (multiple of 8)
*/ */
inline uint32_t inline uint32_t
extract_field(register uint8_t *src, int bits) extract_field(register uint8_t *src, int bits)

View File

@ -47,6 +47,7 @@
#include <blr.h> #include <blr.h>
#include <dcb.h> #include <dcb.h>
#include <spinlock.h> #include <spinlock.h>
#include <housekeeper.h>
#include <skygw_types.h> #include <skygw_types.h>
#include <skygw_utils.h> #include <skygw_utils.h>
@ -63,6 +64,7 @@ static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, G
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask; extern int lm_enabled_logfiles_bitmask;
@ -501,6 +503,7 @@ uint32_t chksum;
slave->seqno = 1; slave->seqno = 1;
if (slave->nocrc) if (slave->nocrc)
len = 19 + 8 + binlognamelen; len = 19 + 8 + binlognamelen;
else else
@ -694,12 +697,15 @@ blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
GWBUF *head, *record; GWBUF *head, *record;
REP_HEADER hdr; REP_HEADER hdr;
int written, rval = 1, burst; int written, rval = 1, burst;
int rotating;
unsigned long burst_size;
uint8_t *ptr; uint8_t *ptr;
if (large) if (large)
burst = router->long_burst; burst = router->long_burst;
else else
burst = router->short_burst; burst = router->short_burst;
burst_size = router->burst_size;
spinlock_acquire(&slave->catch_lock); spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_BUSY) if (slave->cstate & CS_BUSY)
{ {
@ -711,19 +717,30 @@ uint8_t *ptr;
if (slave->file == NULL) if (slave->file == NULL)
{ {
rotating = router->rotating;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{ {
if (rotating)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s", "blr_slave_catchup failed to open binlog file %s",
slave->binlogfile))); slave->binlogfile)));
slave->cstate &= ~CS_BUSY; slave->cstate &= ~CS_BUSY;
slave->state = BLRS_ERRORED; slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
return 0; return 0;
} }
} }
slave->stats.n_bursts++; slave->stats.n_bursts++;
while (burst-- && while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL) (record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL)
{ {
head = gwbuf_alloc(5); head = gwbuf_alloc(5);
@ -735,17 +752,35 @@ uint8_t *ptr;
head = gwbuf_append(head, record); head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
unsigned long beat1 = hkheartbeat;
blr_close_binlog(router, slave->file); blr_close_binlog(router, slave->file);
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_close_binlog took %d beats",
hkheartbeat - beat1)));
blr_slave_rotate(slave, GWBUF_DATA(record)); blr_slave_rotate(slave, GWBUF_DATA(record));
beat1 = hkheartbeat;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL) if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{ {
if (rotating)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write( LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s", "blr_slave_catchup failed to open binlog file %s",
slave->binlogfile))); slave->binlogfile)));
slave->state = BLRS_ERRORED; slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
break; break;
} }
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
} }
written = slave->dcb->func.write(slave->dcb, head); written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT) if (written && hdr.event_type != ROTATE_EVENT)
@ -754,6 +789,7 @@ uint8_t *ptr;
} }
rval = written; rval = written;
slave->stats.n_events++; slave->stats.n_events++;
burst_size -= hdr.event_size;
} }
if (record == NULL) if (record == NULL)
slave->stats.n_failed_read++; slave->stats.n_failed_read++;
@ -785,6 +821,8 @@ uint8_t *ptr;
{ {
slave->cstate &= ~CS_UPTODATE; slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB; slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
poll_fake_write_event(slave->dcb); poll_fake_write_event(slave->dcb);
} }
else else
@ -793,11 +831,11 @@ uint8_t *ptr;
{ {
slave->stats.n_upd++; slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE; slave->cstate |= CS_UPTODATE;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
state_change = 1; state_change = 1;
} }
} }
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
if (state_change) if (state_change)
{ {
@ -819,7 +857,7 @@ uint8_t *ptr;
* binlog file. * binlog file.
* *
* Note if the master is rotating there is a window during * Note if the master is rotating there is a window during
* whch the rotate event has been written to the old binlog * which the rotate event has been written to the old binlog
* but the new binlog file has not yet been created. Therefore * but the new binlog file has not yet been created. Therefore
* we ignore these issues during the rotate processing. * we ignore these issues during the rotate processing.
*/ */
@ -829,7 +867,18 @@ uint8_t *ptr;
"Master binlog is %s, %lu.", "Master binlog is %s, %lu.",
slave->binlogfile, slave->binlog_pos, slave->binlogfile, slave->binlog_pos,
router->binlog_name, router->binlog_position))); router->binlog_name, router->binlog_position)));
slave->state = BLRS_ERRORED; if (blr_slave_fake_rotate(router, slave))
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else
{
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
}
} }
else else
{ {
@ -911,3 +960,74 @@ int len = EXTRACT24(ptr + 9); // Extract the event length
memcpy(slave->binlogfile, ptr + 8, len); memcpy(slave->binlogfile, ptr + 8, len);
slave->binlogfile[len] = 0; slave->binlogfile[len] = 0;
} }
/**
* Generate an internal rotate event that we can use to cause the slave to move beyond
* a binlog file that is misisng the rotate eent at the end.
*
* @param router The router instance
* @param slave The slave to rotate
* @return Non-zero if the rotate took place
*/
static int
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
char *sptr;
int filenum;
GWBUF *resp;
uint8_t *ptr;
int len, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
return 0;
blr_close_binlog(router, slave->file);
filenum = atoi(sptr + 1);
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
slave->binlog_pos = 4;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return 0;
binlognamelen = strlen(slave->binlogfile);
if (slave->nocrc)
len = 19 + 8 + binlognamelen;
else
len = 19 + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
hdr.payload_len = len + 1;
hdr.seqno = slave->seqno++;
hdr.ok = 0;
hdr.timestamp = 0L;
hdr.event_type = ROTATE_EVENT;
hdr.serverid = router->masterid;
hdr.event_size = len;
hdr.next_pos = 0;
hdr.flags = 0x20;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, binlognamelen);
ptr += binlognamelen;
if (!slave->nocrc)
{
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
}
slave->dcb->func.write(slave->dcb, resp);
return 1;
}

View File

@ -67,6 +67,7 @@
#include <monitor.h> #include <monitor.h>
#include <debugcli.h> #include <debugcli.h>
#include <poll.h> #include <poll.h>
#include <housekeeper.h>
#include <skygw_utils.h> #include <skygw_utils.h>
#include <log_manager.h> #include <log_manager.h>
@ -167,6 +168,10 @@ struct subcommand showoptions[] = {
"Show all active sessions in MaxScale", "Show all active sessions in MaxScale",
"Show all active sessions in MaxScale", "Show all active sessions in MaxScale",
{0, 0, 0} }, {0, 0, 0} },
{ "tasks", 0, hkshow_tasks,
"Show all active housekeeper tasks in MaxScale",
"Show all active housekeeper tasks in MaxScale",
{0, 0, 0} },
{ "threads", 0, dShowThreads, { "threads", 0, dShowThreads,
"Show the status of the polling threads in MaxScale", "Show the status of the polling threads in MaxScale",
"Show the status of the polling threads in MaxScale", "Show the status of the polling threads in MaxScale",