Merge branch 'blr' into develop

Conflicts:
	client/maxadmin.c
	server/core/CMakeLists.txt
	server/core/dcb.c
	server/core/gateway.c
	server/core/poll.c
	server/core/test/CMakeLists.txt
	server/core/test/makefile
	server/include/poll.h
	server/modules/routing/debugcmd.c
This commit is contained in:
Mark Riddoch
2014-11-19 12:00:55 +00:00
84 changed files with 2452 additions and 549 deletions

View File

@ -33,19 +33,40 @@
#include <buffer.h>
#include <pthread.h>
#include <memlog.h>
#define BINLOG_FNAMELEN 16
#define BLR_PROTOCOL "MySQLBackend"
#define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e }
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
/* How often to call the binlog status function (seconds) */
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
/**
* High and Low water marks for the slave dcb. These values can be overriden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 2000
#define DEF_HIGH_WATER 30000
#define DEF_LOW_WATER 1000
#define DEF_HIGH_WATER 10000
/**
* Default burst sizes for slave catchup
*/
#define DEF_SHORT_BURST 15
#define DEF_LONG_BURST 500
#define DEF_BURST_SIZE 1024000 /* 1 Mb */
/**
* master reconnect backoff constants
* BLR_MASTER_BACKOFF_TIME The increments of the back off time (seconds)
* BLR_MAX_BACKOFF Maximum number of increments to backoff to
*/
#define BLR_MASTER_BACKOFF_TIME 5
#define BLR_MAX_BACKOFF 60
/**
* Some useful macros for examining the MySQL Response packets
*/
@ -56,24 +77,71 @@
#define MYSQL_ERROR_MSG(buf) ((uint8_t *)GWBUF_DATA(buf) + 6)
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
* Packet header for replication messages
*/
typedef struct rep_header {
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
/**
* The binlog record structure. This contains the actual packet read from the binlog
* file.
*/
typedef struct {
unsigned long position; /*< binlog record position for this cache entry */
GWBUF *pkt; /*< The packet received from the master */
REP_HEADER hdr; /*< The packet header */
} BLCACHE_RECORD;
/**
* The binlog cache. A cache exists for each file that hold cached bin log records.
* Caches will be used for all files being read by more than 1 slave.
*/
typedef struct {
BLCACHE_RECORD **records; /*< The actual binlog records */
int current; /*< The next record that will be inserted */
int cnt; /*< The number of records in the cache */
SPINLOCK lock; /*< The spinlock for the cache */
} BLCACHE;
typedef struct blfile {
char binlogname[BINLOG_FNAMELEN+1]; /*< Name of the binlog file */
int fd; /*< Actual file descriptor */
int refcnt; /*< Reference count for file */
BLCACHE *cache; /*< Record cache for this file */
SPINLOCK lock; /*< The file lock */
struct blfile *next; /*< Next file in list */
} BLFILE;
/**
* Slave statistics
*/
typedef struct {
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_catchupnr; /*< No. of times catchup resulted in not entering loop */
int n_alreadyupd;
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} SLAVE_STATS;
/**
@ -89,6 +157,7 @@ typedef struct router_slave {
int binlog_pos; /*< Binlog position for this slave */
char binlogfile[BINLOG_FNAMELEN+1];
/*< Current binlog file for this slave */
BLFILE *file; /*< Currently open binlog file */
int serverid; /*< Server-id of the slave */
char *hostname; /*< Hostname of the slave, if known */
char *user; /*< Username if given */
@ -132,6 +201,9 @@ typedef struct {
uint64_t n_fakeevents; /*< Fake events not written to disk */
uint64_t n_artificial; /*< Artificial events not written to disk */
uint64_t events[0x24]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} ROUTER_STATS;
/**
@ -153,37 +225,6 @@ typedef struct {
int fde_len; /*< Length of fde_event */
} MASTER_RESPONSES;
/**
* The binlog record structure. This contains the actual packet received from the
* master, the binlog position of the data in the packet, a point to the data and
* the length of the binlog record.
*
* This allows requests for binlog records in the cache to be serviced by simply
* sending the exact same packet as was received by MaxScale from the master.
* Items are written to the backing file as soon as they are received. The binlog
* cache is flushed of old records periodically, releasing the GWBUF's back to the
* free memory pool.
*/
typedef struct {
unsigned long position; /*< binlog record position for this cache entry */
GWBUF *pkt; /*< The packet received from the master */
unsigned char *data; /*< Pointer to the data within the packet */
unsigned int record_len; /*< Binlog record length */
} BLCACHE_RECORD;
/**
* The binlog cache. A cache exists for each file that hold cached bin log records.
* Typically the router will hold two binlog caches, one for the current file and one
* for the previous file.
*/
typedef struct {
char filename[BINLOG_FNAMELEN+1];
BLCACHE_RECORD *first;
BLCACHE_RECORD *current;
int cnt;
} BLCACHE;
/**
* The per instance data for the router.
*/
@ -205,6 +246,8 @@ typedef struct router_instance {
uint8_t lastEventReceived;
GWBUF *residual; /*< Any residual binlog event */
MASTER_RESPONSES saved_master; /*< Saved master responses */
char *binlogdir; /*< The directory with the binlog files */
SPINLOCK binlog_lock; /*< Lock to control update of the binlog position */
char binlog_name[BINLOG_FNAMELEN+1];
/*< Name of the current binlog file */
uint64_t binlog_position;
@ -212,32 +255,25 @@ typedef struct router_instance {
int binlog_fd; /*< File descriptor of the binlog
* file being written
*/
uint64_t last_written; /*< Position of last event written */
char prevbinlog[BINLOG_FNAMELEN+1];
int rotating; /*< Rotation in progress flag */
BLFILE *files; /*< Files used by the slaves */
SPINLOCK fileslock; /*< Lock for the files queue above */
unsigned int low_water; /*< Low water mark for client DCB */
unsigned int high_water; /*< High water mark for client DCB */
BLCACHE *cache[2];
unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */
unsigned long burst_size; /*< Maximum size of burst to send */
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
int reconnect_pending;
int retry_backoff;
int handling_threads;
struct router_instance
*next;
} ROUTER_INSTANCE;
/**
* Packet header for replication messages
*/
typedef struct rep_header {
int payload_len; /*< Payload length (24 bits) */
uint8_t seqno; /*< Response sequence number */
uint8_t ok; /*< OK Byte from packet */
uint32_t timestamp; /*< Timestamp - start of binlog record */
uint8_t event_type; /*< Binlog event type */
uint32_t serverid; /*< Server id of master */
uint32_t event_size; /*< Size of header, post-header and body */
uint32_t next_pos; /*< Position of next event */
uint16_t flags; /*< Event flags */
} REP_HEADER;
/**
* State machine for the master to MaxScale replication
*/
@ -270,21 +306,23 @@ static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrie
#define BLRS_UNREGISTERED 0x0001
#define BLRS_REGISTERED 0x0002
#define BLRS_DUMPING 0x0003
#define BLRS_ERRORED 0x0004
#define BLRS_MAXSTATE 0x0003
#define BLRS_MAXSTATE 0x0004
static char *blrs_states[] = { "Created", "Unregistered", "Registered",
"Sending binlogs" };
"Sending binlogs", "Errored" };
/**
* Slave catch-up status
*/
#define CS_READING 0x0001
#define CS_INNERLOOP 0x0002
#define CS_UPTODATE 0x0004
#define CS_EXPECTCB 0x0008
#define CS_DIST 0x0010
#define CS_DISTLATCH 0x0020
#define CS_THRDWAIT 0x0040
#define CS_BUSY 0x0100
#define CS_HOLD 0x0200
/**
* MySQL protocol OpCodes needed for replication
@ -347,22 +385,45 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
#define LOG_EVENT_NO_FILTER_F 0x0100
#define LOG_EVENT_MTS_ISOLATE_F 0x0200
/**
* Macros to extract common fields
*/
#define INLINE_EXTRACT 1 /* Set to 0 for debug purposes */
#if INLINE_EXTRACT
#define EXTRACT16(x) (*(uint8_t *)(x) | (*((uint8_t *)(x) + 1) << 8))
#define EXTRACT24(x) (*(uint8_t *)(x) | \
(*((uint8_t *)(x) + 1) << 8) | \
(*((uint8_t *)(x) + 2) << 16))
#define EXTRACT32(x) (*(uint8_t *)(x) | \
(*((uint8_t *)(x) + 1) << 8) | \
(*((uint8_t *)(x) + 2) << 16) | \
(*((uint8_t *)(x) + 3) << 24))
#else
#define EXTRACT16(x) extract_field((x), 16)
#define EXTRACT24(x) extract_field((x), 24)
#define EXTRACT32(x) extract_field((x), 32)
#endif
/*
* Externals within the router
*/
extern void blr_start_master(ROUTER_INSTANCE *);
extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *);
extern void blr_master_reconnect(ROUTER_INSTANCE *);
extern int blr_master_connected(ROUTER_INSTANCE *);
extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
extern void blr_init_cache(ROUTER_INSTANCE *);
extern void blr_file_init(ROUTER_INSTANCE *);
extern int blr_open_binlog(ROUTER_INSTANCE *, char *);
extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *);
extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t);
extern void blr_file_flush(ROUTER_INSTANCE *);
extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *);
extern BLFILE *blr_open_binlog(ROUTER_INSTANCE *, char *);
extern GWBUF *blr_read_binlog(ROUTER_INSTANCE *, BLFILE *, unsigned int, REP_HEADER *);
extern void blr_close_binlog(ROUTER_INSTANCE *, BLFILE *);
extern unsigned long blr_file_size(BLFILE *);
#endif

View File

@ -823,6 +823,19 @@ static int gw_error_backend_event(DCB *dcb)
*/
if (dcb->state != DCB_STATE_POLLING)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"DCB in state %s got error '%s'.",
gw_dcb_state2string(dcb->state),
buf)));
}
return 1;
}
errbuf = mysql_create_custom_error(
@ -849,6 +862,18 @@ static int gw_error_backend_event(DCB *dcb)
if (ses_state != SESSION_STATE_ROUTER_READY)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error '%s' in session that is not ready for routing.",
buf)));
}
gwbuf_free(errbuf);
goto retblock;
}
@ -1040,6 +1065,19 @@ gw_backend_hangup(DCB *dcb)
if (ses_state != SESSION_STATE_ROUTER_READY)
{
int error, len;
char buf[100];
len = sizeof(error);
if (getsockopt(dcb->fd, SOL_SOCKET, SO_ERROR, &error, &len) == 0)
{
strerror_r(error, buf, 100);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Hangup in session that is not ready for routing, "
"Error reported is '%s'.",
buf)));
}
gwbuf_free(errbuf);
goto retblock;
}

View File

@ -1,4 +1,4 @@
add_library(binlogrouter SHARED blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c)
set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib)
target_link_libraries(binlogrouter ssl pthread log_manager ${EMBEDDED_LIB})
target_link_libraries(binlogrouter ssl pthread log_manager)
install(TARGETS binlogrouter DESTINATION modules)

View File

@ -31,10 +31,14 @@ CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \
include ../../../../makefile.inc
#LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
# -Wl,-rpath,$(DEST)/lib \
# -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
# -Wl,-rpath,$(EMBEDDED_LIB)
LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \
-Wl,-rpath,$(DEST)/lib \
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \
-Wl,-rpath,$(EMBEDDED_LIB)
-Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH)
SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c
OBJ=$(SRCS:.c=.o)

View File

@ -39,6 +39,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <time.h>
#include <service.h>
#include <server.h>
@ -48,6 +49,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <time.h>
#include <skygw_types.h>
@ -95,6 +97,8 @@ static ROUTER_OBJECT MyObject = {
getCapabilities
};
static void stats_func(void *);
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
@ -157,7 +161,7 @@ static ROUTER *
createInstance(SERVICE *service, char **options)
{
ROUTER_INSTANCE *inst;
char *value;
char *value, *name;
int i;
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
@ -169,10 +173,19 @@ int i;
inst->service = service;
spinlock_init(&inst->lock);
inst->files = NULL;
spinlock_init(&inst->fileslock);
spinlock_init(&inst->binlog_lock);
inst->binlog_fd = -1;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
inst->initbinlog = 0;
inst->short_burst = DEF_SHORT_BURST;
inst->long_burst = DEF_LONG_BURST;
inst->burst_size = DEF_BURST_SIZE;
inst->retry_backoff = 1;
/*
* We only support one server behind this router, since the server is
@ -249,6 +262,10 @@ int i;
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "file") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -257,6 +274,38 @@ int i;
{
inst->high_water = atoi(value);
}
else if (strcmp(options[i], "shortburst") == 0)
{
inst->short_burst = atoi(value);
}
else if (strcmp(options[i], "longburst") == 0)
{
inst->long_burst = atoi(value);
}
else if (strcmp(options[i], "burstsize") == 0)
{
unsigned long size = atoi(value);
char *ptr = value;
while (*ptr && isdigit(*ptr))
ptr++;
switch (*ptr)
{
case 'G':
case 'g':
size = size * 1024 * 1000 * 1000;
break;
case 'M':
case 'm':
size = size * 1024 * 1000;
break;
case 'K':
case 'k':
size = size * 1024;
break;
}
inst->burst_size = size;
}
else
{
LOGIF(LE, (skygw_log_write(
@ -284,6 +333,7 @@ int i;
inst->active_logs = 0;
inst->reconnect_pending = 0;
inst->handling_threads = 0;
inst->rotating = 0;
inst->residual = NULL;
inst->slaves = NULL;
inst->next = NULL;
@ -302,6 +352,12 @@ int i;
*/
blr_init_cache(inst);
if ((name = (char *)malloc(80)) != NULL)
{
sprintf(name, "%s stats", service->name);
hktask_add(name, stats_func, inst, BLR_STATS_FREQ);
}
/*
* Now start the replication from the master to MaxScale
*/
@ -358,6 +414,8 @@ ROUTER_SLAVE *slave;
spinlock_init(&slave->catch_lock);
slave->dcb = session->client;
slave->router = inst;
slave->file = NULL;
strcpy(slave->binlogfile, "unassigned");
/**
* Add this session to the list of active sessions.
@ -477,6 +535,9 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session;
*/
slave->state = BLRS_UNREGISTERED;
if (slave->file)
blr_close_binlog(router, slave->file);
/* Unlock */
rses_end_locked_router_action(slave);
}
@ -541,7 +602,9 @@ diagnostics(ROUTER *router, DCB *dcb)
{
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
ROUTER_SLAVE *session;
int i = 0;
int i = 0, j;
int minno = 0;
double min5, min10, min15, min30;
char buf[40];
struct tm tm;
@ -554,6 +617,30 @@ struct tm tm;
}
spinlock_release(&router_inst->lock);
minno = router_inst->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += router_inst->stats.minavgs[minno];
if (j < 15)
min15 += router_inst->stats.minavgs[minno];
if (j < 10)
min10 += router_inst->stats.minavgs[minno];
if (j < 5)
min5 += router_inst->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
router_inst->master);
dcb_printf(dcb, "\tMaster connection state: %s\n",
@ -574,6 +661,13 @@ struct tm tm;
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
router_inst->stats.n_binlogs);
minno = router_inst->stats.minno - 1;
if (minno == -1)
minno = 30;
dcb_printf(dcb, "\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
@ -582,10 +676,6 @@ struct tm tm;
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n",
router_inst->stats.n_cachehits);
dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n",
router_inst->stats.n_cachemisses);
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
@ -615,6 +705,8 @@ struct tm tm;
spinlock_stats(&instlock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n");
spinlock_stats(&router_inst->lock, spin_reporter, dcb);
dcb_printf(dcb, "\tSpinlock statistics (binlog position lock):\n");
spinlock_stats(&router_inst->binlog_lock, spin_reporter, dcb);
#endif
if (router_inst->slaves)
@ -624,6 +716,29 @@ struct tm tm;
session = router_inst->slaves;
while (session)
{
minno = session->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += session->stats.minavgs[minno];
if (j < 15)
min15 += session->stats.minavgs[minno];
if (j < 10)
min10 += session->stats.minavgs[minno];
if (j < 5)
min5 += session->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
@ -637,14 +752,18 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
minno = session->stats.minno - 1;
if (minno == -1)
minno = 30;
dcb_printf(dcb, "\t\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\t\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr);
dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of low water cbs %u\n", session->stats.n_cb);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of events > high water %u\n", session->stats.n_above);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
@ -652,9 +771,11 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. of distribute action 3 %u\n", session->stats.n_actions[2]);
if ((session->cstate & CS_UPTODATE) == 0)
{
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."));
dcb_printf(dcb, "\t\tSlave is in catchup mode. %s%s\n",
((session->cstate & CS_EXPECTCB) == 0 ? "" :
"Waiting for DCB queue to drain."),
((session->cstate & CS_BUSY) == 0 ? "" :
" Busy in slave catchup."));
}
else
@ -672,7 +793,7 @@ struct tm tm;
dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n");
spinlock_stats(&session->rses_lock, spin_reporter, dcb);
#endif
dcb_printf(dcb, "\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
@ -718,9 +839,24 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
static void
errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int error, len;
char msg[85];
len = sizeof(error);
if (getsockopt(router->master->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0)
{
strerror_r(error, msg, 80);
strcat(msg, " ");
}
else
strcpy(msg, "");
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, "Erorr Reply '%s'", message)));
*succp = false;
LOGFILE_ERROR, "Master connection '%s', %sattempting reconnect to master",
message, msg)));
*succp = true;
blr_master_reconnect(router);
}
/** to be inline'd */
@ -776,3 +912,35 @@ static uint8_t getCapabilities(ROUTER *inst, void *router_session)
{
return 0;
}
/**
* The stats gathering function called from the housekeeper so that we
* can get timed averages of binlog records shippped
*
* @param inst The router instance
*/
static void
stats_func(void *inst)
{
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst;
ROUTER_SLAVE *slave;
router->stats.minavgs[router->stats.minno++]
= router->stats.n_binlogs - router->stats.lastsample;
router->stats.lastsample = router->stats.n_binlogs;
if (router->stats.minno == BLR_NSTATS_MINUTES)
router->stats.minno = 0;
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
slave->stats.minavgs[slave->stats.minno++]
= slave->stats.n_events - slave->stats.lastsample;
slave->stats.lastsample = slave->stats.n_events;
if (slave->stats.minno == BLR_NSTATS_MINUTES)
slave->stats.minno = 0;
slave = slave->next;
}
spinlock_release(&router->lock);
}

View File

@ -50,12 +50,13 @@
#include <skygw_utils.h>
#include <log_manager.h>
extern int lm_enabled_logfiles_bitmask;
static void blr_file_create(ROUTER_INSTANCE *router, char *file);
static void blr_file_append(ROUTER_INSTANCE *router, char *file);
static uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr);
/**
* Initialise the binlog file for this instance. MaxScale will look
@ -85,6 +86,8 @@ struct dirent *dp;
if (access(path, R_OK) == -1)
mkdir(path, 0777);
router->binlogdir = strdup(path);
/* First try to find a binlog file number by reading the directory */
root_len = strlen(router->fileroot);
dirp = opendir(path);
@ -146,17 +149,11 @@ blr_file_rotate(ROUTER_INSTANCE *router, char *file, uint64_t pos)
static void
blr_file_create(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
unsigned char magic[] = BINLOG_MAGIC;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
@ -167,12 +164,14 @@ unsigned char magic[] = BINLOG_MAGIC;
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to create binlog file %s\n", path)));
"Failed to create binlog file %s", path)));
}
fsync(fd);
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strcpy(router->binlog_name, file);
router->binlog_position = 4; /* Initial position after the magic number */
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
}
@ -186,30 +185,26 @@ unsigned char magic[] = BINLOG_MAGIC;
static void
blr_file_append(ROUTER_INSTANCE *router, char *file)
{
char *ptr, path[1024];
char path[1024];
int fd;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
{
strcpy(path, ptr);
}
strcat(path, "/");
strcat(path, router->service->name);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, file);
if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s for append.\n",
"Failed to open binlog file %s for append.",
path)));
return;
}
fsync(fd);
close(router->binlog_fd);
spinlock_acquire(&router->binlog_lock);
strcpy(router->binlog_name, file);
router->binlog_position = lseek(fd, 0L, SEEK_END);
spinlock_release(&router->binlog_lock);
router->binlog_fd = fd;
}
@ -224,7 +219,10 @@ void
blr_write_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *buf)
{
pwrite(router->binlog_fd, buf, hdr->event_size, hdr->next_pos - hdr->event_size);
spinlock_acquire(&router->binlog_lock);
router->binlog_position = hdr->next_pos;
router->last_written = hdr->next_pos - hdr->event_size;
spinlock_release(&router->binlog_lock);
}
/**
@ -238,97 +236,272 @@ blr_file_flush(ROUTER_INSTANCE *router)
fsync(router->binlog_fd);
}
int
/**
* Open a binlog file for reading binlog records
*
* @param router The router instance
* @param binlog The binlog filename
* @return a binlog file record
*/
BLFILE *
blr_open_binlog(ROUTER_INSTANCE *router, char *binlog)
{
char *ptr, path[1024];
int rval;
BLFILE *file;
strcpy(path, "/usr/local/skysql/MaxScale");
if ((ptr = getenv("MAXSCALE_HOME")) != NULL)
spinlock_acquire(&router->fileslock);
file = router->files;
while (file && strcmp(file->binlogname, binlog) != 0)
file = file->next;
if (file)
{
strcpy(path, ptr);
file->refcnt++;
spinlock_release(&router->fileslock);
return file;
}
strcat(path, "/");
strcat(path, router->service->name);
if ((file = (BLFILE *)malloc(sizeof(BLFILE))) == NULL)
{
spinlock_release(&router->fileslock);
return NULL;
}
strcpy(file->binlogname, binlog);
file->refcnt = 1;
file->cache = 0;
spinlock_init(&file->lock);
strcpy(path, router->binlogdir);
strcat(path, "/");
strcat(path, binlog);
if ((rval = open(path, O_RDONLY, 0666)) == -1)
if ((file->fd = open(path, O_RDONLY, 0666)) == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to open binlog file %s\n", path)));
"Failed to open binlog file %s", path)));
free(file);
spinlock_release(&router->fileslock);
return NULL;
}
return rval;
file->next = router->files;
router->files = file;
spinlock_release(&router->fileslock);
return file;
}
/**
* Read a replication event into a GWBUF structure.
*
* @param fd File descriptor of the binlog file
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
* @param router The router instance
* @param file File record
* @param pos Position of binlog record to read
* @param hdr Binlog header to populate
* @return The binlog record wrapped in a GWBUF structure
*/
GWBUF *
blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr)
blr_read_binlog(ROUTER_INSTANCE *router, BLFILE *file, unsigned int pos, REP_HEADER *hdr)
{
uint8_t hdbuf[19];
GWBUF *result;
unsigned char *data;
int n;
unsigned long filelen = 0;
struct stat statb;
if (lseek(fd, pos, SEEK_SET) != pos)
if (fstat(file->fd, &statb) == 0)
filelen = statb.st_size;
if (pos >= filelen)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to seek for binlog entry, "
"at %d.\n", pos)));
LOGIF(LD, (skygw_log_write(LOGFILE_ERROR,
"Attempting to read off the end of the binlog file %s, "
"event at %lu.", file->binlogname, pos)));
return NULL;
}
if (strcmp(router->binlog_name, file->binlogname) == 0 &&
pos >= router->binlog_position)
{
return NULL;
}
/* Read the header information from the file */
if ((n = read(fd, hdbuf, 19)) != 19)
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read header for binlog entry, "
"at %d (%s).\n", pos, strerror(errno))));
if (n> 0 && n < 19)
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes got %d bytes.\n",
n)));
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = extract_field(hdbuf, 32);
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = extract_field(&hdbuf[5], 32);
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = extract_field(&hdbuf[13], 32);
hdr->flags = extract_field(&hdbuf[17], 16);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position in header appears to be incorrect "
"rereading event header at pos %ul in file %s, "
"file size is %ul. Master will write %ul in %s next.",
pos, file->binlogname, filelen, router->binlog_position,
router->binlog_name)));
if ((n = pread(file->fd, hdbuf, 19, pos)) != 19)
{
switch (n)
{
case 0:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"Reached end of binlog file at %d.",
pos)));
break;
case -1:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to read binlog file %s at position %d"
" (%s).", file->binlogname,
pos, strerror(errno))));
if (errno == EBADF)
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Bad file descriptor in read binlog for file %s"
", reference count is %d, descriptor %d.",
file->binlogname, file->refcnt, file->fd)));
break;
default:
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %d",
file->binlogname, pos, n)));
break;
}
return NULL;
}
hdr->timestamp = EXTRACT32(hdbuf);
hdr->event_type = hdbuf[4];
hdr->serverid = EXTRACT32(&hdbuf[5]);
hdr->event_size = extract_field(&hdbuf[9], 32);
hdr->next_pos = EXTRACT32(&hdbuf[13]);
hdr->flags = EXTRACT16(&hdbuf[17]);
if (hdr->next_pos < pos && hdr->event_type != ROTATE_EVENT)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position still incorrect after "
"rereading")));
return NULL;
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Next position corrected by "
"rereading")));
}
}
if ((result = gwbuf_alloc(hdr->event_size)) == NULL)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to allocate memory for binlog entry, "
"size %d at %d.\n",
"size %d at %d.",
hdr->event_size, pos)));
return NULL;
}
data = GWBUF_DATA(result);
memcpy(data, hdbuf, 19); // Copy the header in
if ((n = read(fd, &data[19], hdr->event_size - 19))
if ((n = pread(file->fd, &data[19], hdr->event_size - 19, pos + 19))
!= hdr->event_size - 19) // Read the balance
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %d. "
"Expected %d bytes got %d bytes.\n",
pos, n)));
if (n == -1)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Error reading the event at %ld in %s. "
"%s, expected %d bytes.",
pos, file->binlogname,
strerror(errno), hdr->event_size - 19)));
}
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Short read when reading the event at %ld in %s. "
"Expected %d bytes got %d bytes.",
pos, file->binlogname, hdr->event_size - 19, n)));
if (filelen != 0 && filelen - pos < hdr->event_size)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Binlog event is close to the end of the binlog file, "
"current file size is %u.",
filelen)));
}
blr_log_header(LOGFILE_ERROR, "Possible malformed event header", hdbuf);
}
gwbuf_consume(result, hdr->event_size);
return NULL;
}
return result;
}
/**
* Close a binlog file that has been opened to read binlog records
*
* The open binlog files are shared between multiple slaves that are
* reading the same binlog file.
*
* @param router The router instance
* @param file The file to close
*/
void
blr_close_binlog(ROUTER_INSTANCE *router, BLFILE *file)
{
spinlock_acquire(&file->lock);
file->refcnt--;
if (file->refcnt == 0)
{
spinlock_acquire(&router->fileslock);
if (router->files == file)
router->files = file->next;
else
{
BLFILE *ptr = router->files;
while (ptr && ptr->next != file)
ptr = ptr->next;
if (ptr)
ptr->next = file->next;
}
spinlock_release(&router->fileslock);
close(file->fd);
file->fd = -1;
}
spinlock_release(&file->lock);
if (file->refcnt == 0)
free(file);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
@ -348,3 +521,40 @@ uint32_t rval = 0, shift = 0;
}
return rval;
}
/**
* Log the event header of binlog event
*
* @param file The log file into which to write the entry
* @param msg A message strign to preceed the header with
* @param ptr The event header raw data
*/
static void
blr_log_header(logfile_id_t file, char *msg, uint8_t *ptr)
{
char buf[400], *bufp;
int i;
bufp = buf;
bufp += sprintf(bufp, "%s: ", msg);
for (i = 0; i < 19; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
skygw_log_write_flush(file, "%s", buf);
}
/**
* Return the size of the current binlog file
*
* @param file The binlog file
* @return The current size of the binlog file
*/
unsigned long
blr_file_size(BLFILE *file)
{
struct stat statb;
if (fstat(file->fd, &statb) == 0)
return statb.st_size;
return 0;
}

View File

@ -47,6 +47,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <sys/types.h>
#include <sys/socket.h>
@ -60,39 +61,6 @@
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
#define SAMPLE_COUNT 10000
CYCLES samples[10][SAMPLE_COUNT];
int sample_index[10] = { 0, 0, 0 };
#define LOGD_SLAVE_CATCHUP1 0
#define LOGD_SLAVE_CATCHUP2 1
#define LOGD_DISTRIBUTE 2
#define LOGD_FILE_FLUSH 3
SPINLOCK logspin = SPINLOCK_INIT;
void
log_duration(int sample, CYCLES duration)
{
char fname[100];
int i;
FILE *fp;
spinlock_acquire(&logspin);
samples[sample][sample_index[sample]++] = duration;
if (sample_index[sample] == SAMPLE_COUNT)
{
sprintf(fname, "binlog_profile.%d", sample);
if ((fp = fopen(fname, "a")) != NULL)
{
for (i = 0; i < SAMPLE_COUNT; i++)
fprintf(fp, "%ld\n", samples[sample][i]);
fclose(fp);
}
sample_index[sample] = 0;
}
spinlock_release(&logspin);
}
extern int lm_enabled_logfiles_bitmask;
@ -126,7 +94,7 @@ GWBUF *buf;
if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to create DCB for dummy client\n")));
"Binlog router: failed to create DCB for dummy client")));
return;
}
router->client = client;
@ -134,14 +102,20 @@ GWBUF *buf;
if ((router->session = session_alloc(router->service, client)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to create session for connection to master\n")));
"Binlog router: failed to create session for connection to master")));
return;
}
client->session = router->session;
if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL)
{
char *name = malloc(strlen(router->service->name) + strlen(" Master") + 1);
sprintf(name, "%s Master", router->service->name);
hktask_oneshot(name, blr_start_master, router,
BLR_MASTER_BACKOFF_TIME * router->retry_backoff++);
if (router->retry_backoff > BLR_MAX_BACKOFF)
router->retry_backoff = 1;
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Binlog router: failed to connect to master server '%s'\n",
"Binlog router: failed to connect to master server '%s'",
router->service->databases->unique_name)));
return;
}
@ -155,6 +129,7 @@ perror("setsockopt");
router->master_state = BLRM_TIMESTAMP;
router->stats.n_masterstarts++;
router->retry_backoff = 1;
}
/**
@ -170,9 +145,7 @@ blr_restart_master(ROUTER_INSTANCE *router)
{
GWBUF *ptr;
dcb_close(router->master);
dcb_free(router->master);
dcb_free(router->client);
dcb_close(router->client);
/* Discard the queued residual data */
ptr = router->residual;
@ -252,7 +225,7 @@ char query[128];
if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n",
LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.",
router->master_state)));
gwbuf_consume(buf, gwbuf_length(buf));
spinlock_acquire(&router->lock);
@ -274,7 +247,7 @@ char query[128];
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Received error: %d, %s from master during %s phase of the master state machine.\n",
"Received error: %d, %s from master during %s phase of the master state machine.",
MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state]
)));
gwbuf_consume(buf, gwbuf_length(buf));
@ -548,23 +521,23 @@ static REP_HEADER phdr;
/* Get the length of the packet from the residual and new packet */
if (reslen >= 3)
{
len = extract_field(pdata, 24);
len = EXTRACT24(pdata);
}
else if (reslen == 2)
{
len = extract_field(pdata, 16);
len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16);
len = EXTRACT16(pdata);
len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16);
}
else if (reslen == 1)
{
len = extract_field(pdata, 8);
len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8);
len = *pdata;
len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8);
}
len += 4; // Allow space for the header
}
else
{
len = extract_field(pdata, 24) + 4;
len = EXTRACT24(pdata) + 4;
}
if (reslen < len && pkt_length >= len)
@ -585,7 +558,7 @@ static REP_HEADER phdr;
LOGIF(LE,(skygw_log_write(
LOGFILE_ERROR,
"Insufficient memory to buffer event "
"of %d bytes. Binlog %s @ %d\n.",
"of %d bytes. Binlog %s @ %d.",
len, router->binlog_name,
router->binlog_position)));
break;
@ -610,7 +583,7 @@ static REP_HEADER phdr;
LOGFILE_ERROR,
"Expected entire message in buffer "
"chain, but failed to create complete "
"message as expected. %s @ %d\n",
"message as expected. %s @ %d",
router->binlog_name,
router->binlog_position)));
free(msg);
@ -631,7 +604,7 @@ static REP_HEADER phdr;
router->stats.n_residuals++;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Residual data left after %d records. %s @ %d\n",
"Residual data left after %d records. %s @ %d",
router->stats.n_binlogs,
router->binlog_name, router->binlog_position)));
break;
@ -683,7 +656,7 @@ static REP_HEADER phdr;
// #define SHOW_EVENTS
#ifdef SHOW_EVENTS
printf("blr: event type 0x%02x, flags 0x%04x, event size %d\n", hdr.event_type, hdr.flags, hdr.event_size);
printf("blr: event type 0x%02x, flags 0x%04x, event size %d", hdr.event_type, hdr.flags, hdr.event_size);
#endif
if (hdr.event_type >= 0 && hdr.event_type < 0x24)
router->stats.events[hdr.event_type]++;
@ -692,7 +665,7 @@ static REP_HEADER phdr;
// Fake format description message
LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG,
"Replication fake event. "
"Binlog %s @ %d.\n",
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_fakeevents++;
@ -721,7 +694,7 @@ static REP_HEADER phdr;
LOGIF(LD,(skygw_log_write(
LOGFILE_DEBUG,
"Replication heartbeat. "
"Binlog %s @ %d.\n",
"Binlog %s @ %d.",
router->binlog_name,
router->binlog_position)));
router->stats.n_heartbeats++;
@ -730,6 +703,8 @@ static REP_HEADER phdr;
{
ptr = ptr + 5; // We don't put the first byte of the payload
// into the binlog file
if (hdr.event_type == ROTATE_EVENT)
router->rotating = 1;
blr_write_binlog_record(router, &hdr, ptr);
if (hdr.event_type == ROTATE_EVENT)
{
@ -745,7 +720,7 @@ static REP_HEADER phdr;
"Artificial event not written "
"to disk or distributed. "
"Type 0x%x, Length %d, Binlog "
"%s @ %d\n.",
"%s @ %d.",
hdr.event_type,
hdr.event_size,
router->binlog_name,
@ -753,6 +728,7 @@ static REP_HEADER phdr;
ptr += 5;
if (hdr.event_type == ROTATE_EVENT)
{
router->rotating = 1;
blr_rotate_event(router, ptr, &hdr);
}
}
@ -762,7 +738,7 @@ static REP_HEADER phdr;
{
printf("Binlog router error: %s\n", &ptr[7]);
LOGIF(LE,(skygw_log_write(LOGFILE_ERROR,
"Error packet in binlog stream.%s @ %d\n.",
"Error packet in binlog stream.%s @ %d.",
router->binlog_name,
router->binlog_position)));
blr_log_packet(LOGFILE_ERROR, "Error Packet:",
@ -802,9 +778,7 @@ static REP_HEADER phdr;
{
ss_dassert(pkt_length == 0);
}
{ CYCLES start = rdtsc();
blr_file_flush(router);
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
}
/**
@ -814,25 +788,25 @@ log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
* @param hdr The packet header to populate
*/
void
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr)
{
hdr->payload_len = extract_field(ptr, 24);
hdr->payload_len = EXTRACT24(ptr);
hdr->seqno = ptr[3];
hdr->ok = ptr[4];
hdr->timestamp = extract_field(&ptr[5], 32);
hdr->timestamp = EXTRACT32(&ptr[5]);
hdr->event_type = ptr[9];
hdr->serverid = extract_field(&ptr[10], 32);
hdr->event_size = extract_field(&ptr[14], 32);
hdr->next_pos = extract_field(&ptr[18], 32);
hdr->flags = extract_field(&ptr[22], 16);
hdr->serverid = EXTRACT32(&ptr[10]);
hdr->event_size = EXTRACT32(&ptr[14]);
hdr->next_pos = EXTRACT32(&ptr[18]);
hdr->flags = EXTRACT16(&ptr[22]);
}
/**
* Extract a numeric field from a packet of the specified number of bits
*
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
* @param bits The number of bits to extract (multiple of 8)
*/
inline uint32_t
extract_field(register uint8_t *src, int bits)
@ -881,11 +855,13 @@ char file[BINLOG_FNAMELEN+1];
printf("New file: %s @ %ld\n", file, pos);
#endif
strcpy(router->prevbinlog, router->binlog_name);
if (strncmp(router->binlog_name, file, slen) != 0)
{
router->stats.n_rotates++;
blr_file_rotate(router, file, pos);
}
router->rotating = 0;
}
/**
@ -927,28 +903,35 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *
{
GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
ROUTER_SLAVE *slave, *nextslave;
int action;
CYCLES entry;
entry = rdtsc();
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
{
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE)
if (slave->state != BLRS_DUMPING)
{
/* Slave is up to date with the binlog and no distribute is
* running on this slave.
slave = slave->next;
continue;
}
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == CS_UPTODATE)
{
/*
* This slave is reporting it is to date with the binlog of the
* master running on this slave.
* It has no thread running currently that is sending binlog
* events.
*/
action = 1;
slave->cstate |= CS_DIST;
slave->cstate |= CS_BUSY;
}
else if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == (CS_UPTODATE|CS_DIST))
else if ((slave->cstate & (CS_UPTODATE|CS_BUSY)) == (CS_UPTODATE|CS_BUSY))
{
/* Slave is up to date with the binlog and a distribute is
* running on this slave.
/*
* The slave is up to date with the binlog and a process is
* running on this slave to send binlog events.
*/
slave->overrun = 1;
action = 2;
@ -960,12 +943,20 @@ CYCLES entry;
}
slave->stats.n_actions[action-1]++;
spinlock_release(&slave->catch_lock);
if (action == 1)
{
if ((slave->binlog_pos == hdr->next_pos - hdr->event_size)
&& (strcmp(slave->binlogfile, router->binlog_name) == 0 ||
hdr->event_type == ROTATE_EVENT))
if (slave->binlog_pos == router->last_written &&
(strcmp(slave->binlogfile, router->binlog_name) == 0 ||
(hdr->event_type == ROTATE_EVENT &&
strcmp(slave->binlogfile, router->prevbinlog))))
{
/*
* The slave should be up to date, check that the binlog
* position matches the event we have to distribute or
* this is a rotate event. Send the event directly from
* memory to the slave.
*/
pkt = gwbuf_alloc(hdr->event_size + 5);
buf = GWBUF_DATA(pkt);
encode_value(buf, hdr->event_size + 1, 24);
@ -985,77 +976,90 @@ CYCLES entry;
spinlock_acquire(&slave->catch_lock);
if (slave->overrun)
{
CYCLES cycle_start, cycles;
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
poll_fake_write_event(slave->dcb);
}
else
{
slave->cstate &= ~CS_DIST;
slave->cstate &= ~CS_BUSY;
}
spinlock_release(&slave->catch_lock);
}
else if (slave->binlog_pos == hdr->next_pos
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
/*
* Slave has already read record from file, no
* need to distrbute this event
*/
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
}
else if ((slave->binlog_pos > hdr->next_pos - hdr->event_size)
&& strcmp(slave->binlogfile, router->binlog_name) == 0)
{
/*
* The slave is ahead of the master, this should never
* happen. Force the slave to catchup mode in order to
* try to resolve the issue.
*/
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Slave %d is ahead of expected position %s@%d. "
"Expected position %d",
slave->serverid, slave->binlogfile,
slave->binlog_pos,
hdr->next_pos - hdr->event_size)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else if ((hdr->event_type != ROTATE_EVENT)
&& (slave->binlog_pos != hdr->next_pos - hdr->event_size ||
strcmp(slave->binlogfile, router->binlog_name) != 0))
else
{
/* Check slave is in catchup mode and if not
* force it to go into catchup mode.
/*
* The slave is not at the position it should be. Force it into
* catchup mode rather than send this event.
*/
if (slave->cstate & CS_UPTODATE)
{
CYCLES cycle_start, cycles;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos)));
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
continue;
else
break;
}
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_BUSY);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
}
else if (action == 3)
{
/* Slave is not up to date
* Check if it is either expecting a callback or
* is busy processing a callback
*/
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & (CS_EXPECTCB|CS_BUSY)) == 0)
{
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else
spinlock_release(&slave->catch_lock);
}
slave = slave->next;
}
spinlock_release(&router->lock);
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
}
/**
* Write a raw event (the first 40 bytes at most) to a log file
*
* @param file The logfile to write to
* @param msg A textual message to write before the packet
* @param ptr Pointer to the message buffer
* @param len Length of message packet
*/
static void
blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len)
{
@ -1067,8 +1071,21 @@ int i;
for (i = 0; i < len && i < 40; i++)
bufp += sprintf(bufp, "0x%02x ", ptr[i]);
if (i < len)
skygw_log_write_flush(file, "%s...\n", buf);
skygw_log_write_flush(file, "%s...", buf);
else
skygw_log_write_flush(file, "%s\n", buf);
skygw_log_write_flush(file, "%s", buf);
}
/**
* Check if the master connection is in place and we
* are downlaoding binlogs
*
* @param router The router instance
* @return non-zero if we are recivign binlog records
*/
int
blr_master_connected(ROUTER_INSTANCE *router)
{
return router->master_state == BLRM_BINLOGDUMP;
}

View File

@ -47,12 +47,12 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <skygw_types.h>
#include <skygw_utils.h>
#include <log_manager.h>
static uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
@ -61,9 +61,10 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c
static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
static int blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int lm_enabled_logfiles_bitmask;
@ -89,7 +90,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
if (slave->state < 0 || slave->state > BLRS_MAXSTATE)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n",
LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.",
slave->state)));
gwbuf_consume(queue, gwbuf_length(queue));
return 0;
@ -109,13 +110,13 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue)
break;
case COM_QUIT:
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG,
"COM_QUIT received from slave with server_id %d\n",
"COM_QUIT received from slave with server_id %d",
slave->serverid)));
break;
default:
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unexpected MySQL Command (%d) received from slave\n",
"Unexpected MySQL Command (%d) received from slave",
MYSQL_COMMAND(queue))));
break;
}
@ -166,7 +167,7 @@ int query_len;
query_text = strndup(qtext, query_len);
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, "Execute statement from the slave '%s'\n", query_text)));
LOGFILE_TRACE, "Execute statement from the slave '%s'", query_text)));
/*
* Implement a very rudimental "parsing" of the query text by extarcting the
* words from the statement and matchng them against the subset of queries we
@ -269,7 +270,7 @@ int query_len;
query_text = strndup(qtext, query_len);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "Unexpected query from slave server %s\n", query_text)));
LOGFILE_ERROR, "Unexpected query from slave server %s", query_text)));
free(query_text);
blr_slave_send_error(router, slave, "Unexpected SQL query received from slave.");
return 0;
@ -300,7 +301,7 @@ GWBUF *clone;
else
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Failed to clone server response to send to slave.\n")));
"Failed to clone server response to send to slave.")));
return 0;
}
}
@ -474,18 +475,19 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue
{
GWBUF *resp;
uint8_t *ptr;
int len, flags, serverid, rval;
int len, flags, serverid, rval, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
ptr = GWBUF_DATA(queue);
len = extract_field(ptr, 24);
binlognamelen = len - 11;
ptr += 4; // Skip length and sequence number
if (*ptr++ != COM_BINLOG_DUMP)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n",
"blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d",
*(ptr-1))));
return 0;
}
@ -496,15 +498,16 @@ uint32_t chksum;
ptr += 2;
serverid = extract_field(ptr, 32);
ptr += 4;
strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN);
strncpy(slave->binlogfile, (char *)ptr, binlognamelen);
slave->binlogfile[binlognamelen] = 0;
slave->state = BLRS_DUMPING;
slave->seqno = 1;
if (slave->nocrc)
len = 0x2b;
len = 19 + 8 + binlognamelen;
else
len = 0x2f;
len = 19 + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
@ -520,8 +523,8 @@ uint32_t chksum;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, BINLOG_FNAMELEN);
ptr += BINLOG_FNAMELEN;
memcpy(ptr, slave->binlogfile, binlognamelen);
ptr += binlognamelen;
if (!slave->nocrc)
{
@ -567,18 +570,24 @@ uint32_t chksum;
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
slave->state = BLRS_DUMPING;
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"%s: New slave %s requested binlog file %s from position %lu",
router->service->name, slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
rval = blr_slave_catchup(router, slave);
poll_fake_write_event(slave->dcb);
}
return rval;
}
@ -630,7 +639,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
* @param hdr The packet header to populate
* @return A pointer to the first byte following the event header
*/
static uint8_t *
uint8_t *
blr_build_header(GWBUF *pkt, REP_HEADER *hdr)
{
uint8_t *ptr;
@ -660,229 +669,225 @@ uint8_t *ptr;
* We have a registered slave that is behind the current leading edge of the
* binlog. We must replay the log entries to bring this node up to speed.
*
* There may be a large numebr of records to send to the slave, the process
* There may be a large number of records to send to the slave, the process
* is triggered by the slave COM_BINLOG_DUMP message and all the events must
* be sent without receiving any new event. This measn there is no trigger into
* MaxScale other than this initial message. However, if we simply send all the
* events we end up with an extremely long write queue on the DCB and risk running
* the server out of resources.
* events we end up with an extremely long write queue on the DCB and risk
* running the server out of resources.
*
* To resolve this the concept of high and low water marks within the DCB has been
* added, with the ability for the DCB code to call user defined callbacks when the
* write queue is completely drained, when it crosses above the high water mark and
* when it crosses below the low water mark.
*
* The blr_slave_catchup routine will send binlog events to the slave until the high
* water mark is reached, at which point it will return. Later, when a low water mark
* callback is generated by the code that drains the DCB of data the blr_slave_catchup
* routine will again be called to write more events. The process is repeated until
* the slave has caught up with the master.
* The slave catchup routine will send a burst of replication events per single
* call. The paramter "long" control the number of events in the burst. The
* short burst is intended to be used when the master receive an event and
* needs to put the slave into catchup mode. This prevents the slave taking
* too much tiem away from the thread that is processing the master events.
*
* Note: an additional check that the DCB is still above the low water mark is done
* prior to the return from this function to allow for any delays due to the call to
* the close system call, since this may cause thread rescheduling.
* At the end of the burst a fake EPOLLOUT event is added to the poll event
* queue. This ensures that the slave callback for processing DCB write drain
* will be called and future catchup requests will be handled on another thread.
*
* @param router The binlog router
* @param slave The slave that is behind
* @param large Send a long or short burst of events
* @return The number of bytes written
*/
int
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 1, burst = 0;
int written, rval = 1, burst;
int rotating;
unsigned long burst_size;
uint8_t *ptr;
struct timespec req;
if (large)
burst = router->long_burst;
else
burst = router->short_burst;
burst_size = router->burst_size;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
doitagain:
/*
* We have a slightly complex syncronisation mechansim here,
* we need to make sure that we do not have multiple threads
* running the catchup loop, but we need to be very careful
* that we do not loose a call that is coming via a callback
* call as this will stall the binlog catchup process.
*
* We don't want to simply use a traditional mutex here for
* the loop, since this would block a MaxScale thread for
* an unacceptable length of time.
*
* We have two status bits, the CS_READING that says we are
* in the outer loop and the CS_INNERLOOP, to say we are in
* the inner loop.
*
* If just CS_READING is set the other thread may be about to
* enter the inner loop or may be about to exit the function
* completely. Therefore we have to wait to see if CS_READING
* is cleared or CS_INNERLOOP is set.
*
* If CS_READING gets cleared then this thread should proceed
* into the loop.
*
* If CS_INNERLOOP get's set then this thread does not need to
* proceed.
*
* If CS_READING is not set then this thread simply enters the
* loop.
*/
req.tv_sec = 0;
req.tv_nsec = 1000;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_UPTODATE)
if (slave->cstate & CS_BUSY)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup called with up to date slave %d at "
"%s@%d. Reading position %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos, router->binlog_name,
router->binlog_position)));
slave->stats.n_alreadyupd++;
spinlock_release(&slave->catch_lock);
return 1;
return 0;
}
while (slave->cstate & CS_READING)
{
// Wait until we know what the other thread is doing
while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING)
{
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
// Other thread is in the innerloop
if ((slave->cstate & (CS_READING|CS_INNERLOOP)) == (CS_READING|CS_INNERLOOP))
{
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"blr_slave_catchup thread returning due to "
"lock being held by another thread. %s@%d\n",
slave->binlogfile,
slave->binlog_pos)));
slave->stats.n_catchupnr++;
return 1; // We cheat here and return 1 because otherwise
// an error would be sent and we do not want that
}
/* Release the lock for a short time to allow the other
* thread to exit the outer reading loop.
*/
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->pthread)
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, "Multiple threads sending to same thread.\n")));
slave->pthread = pthread_self();
slave->cstate |= CS_READING;
slave->cstate |= CS_BUSY;
spinlock_release(&slave->catch_lock);
if (DCB_ABOVE_HIGH_WATER(slave->dcb))
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "blr_slave_catchup above high water on entry.\n")));
do {
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if (slave->file == NULL)
{
rotating = router->rotating;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
LOGIF(LE, (skygw_log_write(
if (rotating)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
"blr_slave_catchup failed to open binlog file %s",
slave->binlogfile)));
slave->cstate &= ~CS_BUSY;
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
return 0;
}
slave->stats.n_bursts++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
}
slave->stats.n_bursts++;
while (burst-- && burst_size > 0 &&
(record = blr_read_binlog(router, slave->file, slave->binlog_pos, &hdr)) != NULL)
{
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
unsigned long beat1 = hkheartbeat;
blr_close_binlog(router, slave->file);
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_close_binlog took %d beats",
hkheartbeat - beat1)));
blr_slave_rotate(slave, GWBUF_DATA(record));
beat1 = hkheartbeat;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if (rotating)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
return rval;
}
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s",
slave->binlogfile)));
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
break;
}
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst++;
if (hkheartbeat - beat1 > 1) LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR, "blr_open_binlog took %d beats",
hkheartbeat - beat1)));
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst_size -= hdr.event_size;
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
close(fd);
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
if (record)
{
slave->stats.n_flows++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else if (slave->binlog_pos == router->binlog_position &&
strcmp(slave->binlogfile, router->binlog_name) == 0)
{
int state_change = 0;
spinlock_acquire(&router->binlog_lock);
spinlock_acquire(&slave->catch_lock);
/*
* Now check again since we hold the router->binlog_lock
* and slave->catch_lock.
*/
if (slave->binlog_pos != router->binlog_position ||
strcmp(slave->binlogfile, router->binlog_name) != 0)
{
slave->cstate &= ~CS_UPTODATE;
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
poll_fake_write_event(slave->dcb);
}
else
{
if ((slave->cstate & CS_UPTODATE) == 0)
{
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
spinlock_release(&slave->catch_lock);
spinlock_release(&router->binlog_lock);
state_change = 1;
}
}
if (state_change)
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"%s: Slave %s is up to date %s, %u.",
router->service->name,
slave->dcb->remote,
slave->binlogfile, slave->binlog_pos)));
}
}
else
{
int state_change = 0;
spinlock_acquire(&slave->catch_lock);
if ((slave->cstate & CS_UPTODATE) == 0)
if (slave->binlog_pos >= blr_file_size(slave->file)
&& router->rotating == 0
&& strcmp(router->binlog_name, slave->binlogfile) != 0
&& blr_master_connected(router))
{
slave->stats.n_upd++;
slave->cstate |= CS_UPTODATE;
state_change = 1;
/* We may have reached the end of file of a non-current
* binlog file.
*
* Note if the master is rotating there is a window during
* which the rotate event has been written to the old binlog
* but the new binlog file has not yet been created. Therefore
* we ignore these issues during the rotate processing.
*/
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR,
"Slave reached end of file for binlong file %s at %u "
"which is not the file currently being downloaded. "
"Master binlog is %s, %lu.",
slave->binlogfile, slave->binlog_pos,
router->binlog_name, router->binlog_position)));
if (blr_slave_fake_rotate(router, slave))
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
else
{
slave->state = BLRS_ERRORED;
dcb_close(slave->dcb);
}
}
else
{
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
poll_fake_write_event(slave->dcb);
}
spinlock_release(&slave->catch_lock);
if (state_change)
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup slave is up to date %s, %u\n",
slave->binlogfile, slave->binlog_pos)));
}
spinlock_acquire(&slave->catch_lock);
#if 0
if (slave->pthread != pthread_self())
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Multple threads in catchup for same slave: %x and %x\n", slave->pthread, pthread_self())));
abort();
}
#endif
slave->pthread = 0;
#if 0
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position) abort();
#endif
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n")));
goto doitagain;
}
return rval;
}
@ -896,7 +901,7 @@ goto doitagain;
* @param reason The reason the callback was called
* @param data The user data, in this case the server structure
*/
static int
int
blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data)
{
ROUTER_SLAVE *slave = (ROUTER_SLAVE *)data;
@ -904,11 +909,19 @@ ROUTER_INSTANCE *router = slave->router;
if (reason == DCB_REASON_DRAINED)
{
if (slave->state == BLRS_DUMPING &&
slave->binlog_pos != router->binlog_position)
if (slave->state == BLRS_DUMPING)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_EXPECTCB);
spinlock_release(&slave->catch_lock);
slave->stats.n_dcb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
else
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, "Ignored callback due to slave state %s",
blrs_states[slave->state])));
}
}
@ -917,7 +930,7 @@ ROUTER_INSTANCE *router = slave->router;
if (slave->state == BLRS_DUMPING)
{
slave->stats.n_cb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
else
{
@ -931,14 +944,90 @@ ROUTER_INSTANCE *router = slave->router;
* Rotate the slave to the new binlog file
*
* @param slave The slave instance
* @param ptr The rotate event (minux header and OK byte)
* @param ptr The rotate event (minus header and OK byte)
*/
void
blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr)
{
int len = EXTRACT24(ptr + 9); // Extract the event length
len = len - (19 + 8 + 4); // Remove length of header, checksum and position
if (len > BINLOG_FNAMELEN)
len = BINLOG_FNAMELEN;
ptr += 19; // Skip header
slave->binlog_pos = extract_field(ptr, 32);
slave->binlog_pos += (extract_field(ptr+4, 32) << 32);
memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN);
slave->binlogfile[BINLOG_FNAMELEN] = 0;
memcpy(slave->binlogfile, ptr + 8, len);
slave->binlogfile[len] = 0;
}
/**
* Generate an internal rotate event that we can use to cause the slave to move beyond
* a binlog file that is misisng the rotate eent at the end.
*
* @param router The router instance
* @param slave The slave to rotate
* @return Non-zero if the rotate took place
*/
static int
blr_slave_fake_rotate(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
{
char *sptr;
int filenum;
GWBUF *resp;
uint8_t *ptr;
int len, binlognamelen;
REP_HEADER hdr;
uint32_t chksum;
if ((sptr = strrchr(slave->binlogfile, '.')) == NULL)
return 0;
blr_close_binlog(router, slave->file);
filenum = atoi(sptr + 1);
sprintf(slave->binlogfile, BINLOG_NAMEFMT, router->fileroot, filenum + 1);
slave->binlog_pos = 4;
if ((slave->file = blr_open_binlog(router, slave->binlogfile)) == NULL)
return 0;
binlognamelen = strlen(slave->binlogfile);
if (slave->nocrc)
len = 19 + 8 + binlognamelen;
else
len = 19 + 8 + 4 + binlognamelen;
// Build a fake rotate event
resp = gwbuf_alloc(len + 5);
hdr.payload_len = len + 1;
hdr.seqno = slave->seqno++;
hdr.ok = 0;
hdr.timestamp = 0L;
hdr.event_type = ROTATE_EVENT;
hdr.serverid = router->masterid;
hdr.event_size = len;
hdr.next_pos = 0;
hdr.flags = 0x20;
ptr = blr_build_header(resp, &hdr);
encode_value(ptr, slave->binlog_pos, 64);
ptr += 8;
memcpy(ptr, slave->binlogfile, binlognamelen);
ptr += binlognamelen;
if (!slave->nocrc)
{
/*
* Now add the CRC to the fake binlog rotate event.
*
* The algorithm is first to compute the checksum of an empty buffer
* and then the checksum of the event portion of the message, ie we do not
* include the length, sequence number and ok byte that makes up the first
* 5 bytes of the message. We also do not include the 4 byte checksum itself.
*/
chksum = crc32(0L, NULL, 0);
chksum = crc32(chksum, GWBUF_DATA(resp) + 5, hdr.event_size - 4);
encode_value(ptr, chksum, 32);
}
slave->dcb->func.write(slave->dcb, resp);
return 1;
}

View File

@ -41,6 +41,7 @@
* than simply addresses
* 23/05/14 Mark Riddoch Added support for developer and user modes
* 29/05/14 Mark Riddoch Add Filter support
* 16/10/14 Mark Riddoch Add show eventq
*
* @endverbatim
*/
@ -66,6 +67,8 @@
#include <adminusers.h>
#include <monitor.h>
#include <debugcli.h>
#include <poll.h>
#include <housekeeper.h>
#include <skygw_utils.h>
#include <log_manager.h>
@ -81,6 +84,7 @@
#define ARG_TYPE_DCB 7
#define ARG_TYPE_MONITOR 8
#define ARG_TYPE_FILTER 9
#define ARG_TYPE_NUMERIC 10
/**
* The subcommand structure
@ -117,6 +121,14 @@ struct subcommand showoptions[] = {
"Show the poll statistics",
"Show the poll statistics",
{0, 0, 0} },
{ "eventq", 0, dShowEventQ,
"Show the queue of events waiting to be processed",
"Show the queue of events waiting to be processed",
{0, 0, 0} },
{ "eventstats", 0, dShowEventStats,
"Show the event statistics",
"Show the event statistics",
{0, 0, 0} },
{ "filter", 1, dprintFilter,
"Show details of a filter, called with a filter name",
"Show details of a filter, called with the address of a filter",
@ -161,6 +173,10 @@ struct subcommand showoptions[] = {
"Show all active sessions in MaxScale",
"Show all active sessions in MaxScale",
{0, 0, 0} },
{ "tasks", 0, hkshow_tasks,
"Show all active housekeeper tasks in MaxScale",
"Show all active housekeeper tasks in MaxScale",
{0, 0, 0} },
{ "threads", 0, dShowThreads,
"Show the status of the polling threads in MaxScale",
"Show the status of the polling threads in MaxScale",
@ -282,6 +298,8 @@ struct subcommand restartoptions[] = {
};
static void set_server(DCB *dcb, SERVER *server, char *bit);
static void set_pollsleep(DCB *dcb, int);
static void set_nbpoll(DCB *dcb, int);
/**
* The subcommands of the set command
*/
@ -290,6 +308,15 @@ struct subcommand setoptions[] = {
"Set the status of a server. E.g. set server dbnode4 master",
"Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ "pollsleep", 1, set_pollsleep,
"Set the maximum poll sleep period in milliseconds",
"Set the maximum poll sleep period in milliseconds",
{ARG_TYPE_NUMERIC, 0, 0} },
{ "nbpolls", 1, set_nbpoll,
"Set the number of non-blocking polls",
"Set the number of non-blocking polls",
{ARG_TYPE_NUMERIC, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -672,6 +699,16 @@ SERVICE *service;
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)filter_find(arg);
return rval;
case ARG_TYPE_NUMERIC:
{
int i;
for (i = 0; arg[i]; i++)
{
if (arg[i] < '0' || arg[i] > '9')
return 0;
}
return atoi(arg);
}
}
return 0;
}
@ -1303,6 +1340,30 @@ static void disable_log_action(DCB *dcb, char *arg1) {
skygw_log_disable(type);
}
/**
* Set the duration of the sleep passed to the poll wait
*
* @param dcb DCB for output
* @param sleeptime Sleep time in milliseconds
*/
static void
set_pollsleep(DCB *dcb, int sleeptime)
{
poll_set_maxwait(sleeptime);
}
/**
* Set the number of non-blockign spins to make
*
* @param dcb DCB for output
* @param nb Number of spins
*/
static void
set_nbpoll(DCB *dcb, int nb)
{
poll_set_nonblocking_polls(nb);
}
#if defined(FAKE_CODE)
static void fail_backendfd(void)
{