From 4b5f801ff9c6ae26323fe94f439ad83fa0c5783f Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 28 May 2014 23:38:54 +0100 Subject: [PATCH 01/10] Misc fixes for bin log rotate issues --- server/modules/include/blr.h | 2 + server/modules/routing/binlog/STATUS | 9 +--- server/modules/routing/binlog/blr.c | 10 ++++ server/modules/routing/binlog/blr_master.c | 32 +++++++++++-- server/modules/routing/binlog/blr_slave.c | 53 ++++++++++++++-------- 5 files changed, 78 insertions(+), 28 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 722775478..48117d10a 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -248,6 +248,8 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", */ #define CS_READING 0x0001 #define CS_INNERLOOP 0x0002 +#define CS_UPTODATE 0x0004 +#define CS_EXPECTCB 0x0008 /** * MySQL protocol OpCodes needed for replication diff --git a/server/modules/routing/binlog/STATUS b/server/modules/routing/binlog/STATUS index bd981306b..db3a190f5 100644 --- a/server/modules/routing/binlog/STATUS +++ b/server/modules/routing/binlog/STATUS @@ -8,11 +8,6 @@ MariaDB or Percona Server. To Do List: -1. Thread safety needs to be examine, currently MaxScale has been -run with a single thread when testing this router. +1. The router does not implement the replication heartbeat mechanism. -2. Binlog rotate events have yet to be tested. - -3. The router does not implement the replication heartbeat mechanism. - -4. Performance measurements have yet to be made. +2. Performance measurements have yet to be made. diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 6560bd609..8d1e2b39e 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -330,6 +330,8 @@ ROUTER_SLAVE *slave; memset(&slave->stats, 0, sizeof(SLAVE_STATS)); atomic_add(&inst->stats.n_slaves, 1); slave->state = BLRS_CREATED; /* Set initial state of the slave */ + slave->cstate = 0; + spinlock_init(&slave->catch_lock); slave->dcb = session->client; slave->router = instance; @@ -555,6 +557,14 @@ int i = 0; dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); + if ((session->cstate & CS_UPTODATE) == 0) + { + dcb_printf(dcb, "\t\tSlave is in catchup mode. %s\n", + ((session->cstate & CS_EXPECTCB) == 0 ? "" : + "Waiting for DCB queue to drain.")); + } + else + dcb_printf(dcb, "\t\tSlave is in normal mode.\n"); session = session->next; } spinlock_release(&router_inst->lock); diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index fee0be5db..c26c68376 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -658,7 +658,8 @@ ROUTER_SLAVE *slave; slave = router->slaves; while (slave) { - if (slave->binlog_pos == hdr->next_pos - hdr->event_size) + if ((slave->binlog_pos == hdr->next_pos - hdr->event_size) + && strcmp(slave->binlogfile, router->binlog_name) == 0) { pkt = gwbuf_alloc(hdr->event_size + 5); buf = GWBUF_DATA(pkt); @@ -667,12 +668,37 @@ ROUTER_SLAVE *slave; *buf++ = slave->seqno++; *buf++ = 0; // OK memcpy(buf, ptr, hdr->event_size); - slave->dcb->func.write(slave->dcb, pkt); - slave->binlog_pos = hdr->next_pos; if (hdr->event_type == ROTATE_EVENT) { blr_slave_rotate(slave, ptr); } + slave->dcb->func.write(slave->dcb, pkt); + if (hdr->event_type != ROTATE_EVENT) + { + slave->binlog_pos = hdr->next_pos; + } + } + else if ((hdr->event_type != ROTATE_EVENT) + && (slave->binlog_pos != hdr->next_pos || + strcmp(slave->binlogfile, router->binlog_name) != 0)) + { + /* Check slave is in catchup mode and if not + * force it to go into catchup mode. + */ + if (slave->cstate & CS_UPTODATE) + { + spinlock_release(&router->lock); + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_UPTODATE; + spinlock_release(&slave->catch_lock); + blr_slave_catchup(router, slave); + spinlock_acquire(&router->lock); + slave = router->slaves; + if (slave) + continue; + else + break; + } } slave = slave->next; diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 0a52fe2c0..da7d7f8a2 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -61,7 +61,7 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); -static int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); @@ -88,7 +88,7 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) { if (slave->state < 0 || slave->state > BLRS_MAXSTATE) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid slave state machine state (%d) for binlog router.\n", slave->state))); gwbuf_consume(queue, gwbuf_length(queue)); @@ -108,6 +108,10 @@ blr_slave_request(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue) return blr_slave_binlog_dump(router, slave, queue); break; default: + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Unexpected MySQL Command (%d) received from slave\n", + MYSQL_COMMAND(queue)))); break; } return 0; @@ -450,7 +454,13 @@ uint32_t chksum; len = extract_field(ptr, 24); ptr += 4; // Skip length and sequence number if (*ptr++ != COM_BINLOG_DUMP) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_binlog_dump expected a COM_BINLOG_DUMP but received %d\n", + *(ptr-1)))); return 0; + } slave->binlog_pos = extract_field(ptr, 32); ptr += 4; @@ -632,16 +642,19 @@ uint8_t *ptr; * @param slave The slave that is behind * @return The number of bytes written */ -static int +int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) { GWBUF *head, *record; REP_HEADER hdr; -int written, fd, rval = 0, burst = 0; +int written, fd, rval = 1, burst = 0; uint8_t *ptr; struct timespec req; + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_EXPECTCB; + spinlock_release(&slave->catch_lock); /* * We have a slightly complex syncronisation mechansim here, * we need to make sure that we do not have multiple threads @@ -733,22 +746,11 @@ struct timespec req; } } written = slave->dcb->func.write(slave->dcb, head); - if (written) - slave->binlog_pos = hdr.next_pos; - rval = written; - if (hdr.event_type == ROTATE_EVENT) + if (written && hdr.event_type != ROTATE_EVENT) { - close(fd); - blr_slave_rotate(slave, GWBUF_DATA(record)); - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) - { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", - slave->binlogfile))); - break; - } + slave->binlog_pos = hdr.next_pos; } + rval = written; atomic_add(&slave->stats.n_events, 1); burst++; } @@ -762,7 +764,21 @@ struct timespec req; slave->cstate &= ~CS_READING; spinlock_release(&slave->catch_lock); if (record) + { atomic_add(&slave->stats.n_flows, 1); + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_EXPECTCB; + spinlock_release(&slave->catch_lock); + } + else + { + spinlock_acquire(&slave->catch_lock); + slave->cstate |= CS_UPTODATE; + spinlock_release(&slave->catch_lock); + LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, + "blr_slave_catchup slave is up to date %s, %u\n", + slave->binlogfile, slave->binlog_pos))); + } return rval; } @@ -790,6 +806,7 @@ ROUTER_INSTANCE *router = slave->router; atomic_add(&slave->stats.n_events, 1); blr_slave_catchup(router, slave); } + return 0; } /** From bb0e6c3858d68de8da151eae1e82a79a125cb1b9 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 4 Jun 2014 18:37:41 +0100 Subject: [PATCH 02/10] Support non-default binlog filenames Add support for bin log file names that are shorter than the default. Handle events per more than 2 packets --- server/core/buffer.c | 4 ++ server/modules/include/blr.h | 4 +- server/modules/routing/binlog/blr.c | 8 ++- server/modules/routing/binlog/blr_file.c | 26 +++++++- server/modules/routing/binlog/blr_master.c | 77 +++++++++++++++++----- 5 files changed, 99 insertions(+), 20 deletions(-) diff --git a/server/core/buffer.c b/server/core/buffer.c index b21cf216c..7cb771b37 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -262,6 +262,7 @@ GWBUF * gwbuf_consume(GWBUF *head, unsigned int length) { GWBUF *rval = head; + CHK_GWBUF(head); GWBUF_CONSUME(head, length); CHK_GWBUF(head); @@ -271,6 +272,9 @@ GWBUF *rval = head; rval = head->next; gwbuf_free(head); } + + ss_dassert(rval->end > rval->start); + return rval; } diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 48117d10a..865f57f3d 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -35,7 +35,8 @@ #define BINLOG_FNAMELEN 16 #define BLR_PROTOCOL "MySQLBackend" #define BINLOG_MAGIC { 0xfe, 0x62, 0x69, 0x6e } -#define BINLOG_NAMEFMT "mysql-bin.%06d" +#define BINLOG_NAMEFMT "%s.%06d" +#define BINLOG_NAME_ROOT "mysql-bin" /** * High and Low water marks for the slave dcb. These values can be overriden @@ -172,6 +173,7 @@ typedef struct router_instance { int serverid; /*< Server ID to use with master */ char *user; /*< User name to use with master */ char *password; /*< Password to use with master */ + char *fileroot; /*< Root of binlog filename */ DCB *master; /*< DCB for master connection */ SESSION *session; /*< Fake session for master connection */ unsigned int master_state; /*< State of the master FSM */ diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 8d1e2b39e..a6f2bb921 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -56,7 +56,7 @@ extern int lm_enabled_logfiles_bitmask; -static char *version_str = "V1.0.0"; +static char *version_str = "V1.0.1"; /* The router entry points */ static ROUTER *createInstance(SERVICE *service, char **options); @@ -234,6 +234,10 @@ int i; { inst->masterid = atoi(value); } + else if (strcmp(options[i], "filestem") == 0) + { + inst->fileroot = strdup(value); + } else if (strcmp(options[i], "lowwater") == 0) { inst->low_water = atoi(value); @@ -252,6 +256,8 @@ int i; } } } + if (inst->fileroot == NULL) + inst->fileroot = strdup(BINLOG_NAME_ROOT); } /* diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index c1ca8d2fd..7632b9d2e 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -68,6 +69,9 @@ blr_file_init(ROUTER_INSTANCE *router) { char *ptr, path[1024], filename[1050]; int file_found, n = 1; +int root_len, i; +DIR *dirp; +struct dirent *dp; strcpy(path, "/usr/local/skysql/MaxScale"); if ((ptr = getenv("MAXSCALE_HOME")) != NULL) @@ -79,9 +83,25 @@ int file_found, n = 1; if (access(path, R_OK) == -1) mkdir(path, 0777); + + /* First try to find a binlog file number by reading the directory */ + root_len = strlen(router->fileroot); + dirp = opendir(path); + while ((dp = readdir(dirp)) != NULL) + { + if (strncmp(dp->d_name, router->fileroot, root_len) == 0) + { + i = atoi(dp->d_name + root_len + 1); + if (i > n) + n = i; + } + } + closedir(dirp); + + file_found = 0; do { - sprintf(filename, "%s/" BINLOG_NAMEFMT, path, n); + sprintf(filename, "%s/" BINLOG_NAMEFMT, path, router->fileroot, n); if (access(filename, R_OK) != -1) { file_found = 1; @@ -94,12 +114,12 @@ int file_found, n = 1; if (n == 0) // No binlog files found { - sprintf(filename, BINLOG_NAMEFMT, 1); + sprintf(filename, BINLOG_NAMEFMT, router->fileroot, 1); blr_file_create(router, filename); } else { - sprintf(filename, BINLOG_NAMEFMT, n); + sprintf(filename, BINLOG_NAMEFMT, router->fileroot, n); blr_file_append(router, filename); } diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index c26c68376..6dee211d8 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -153,7 +153,7 @@ char query[128]; return; } - if (MYSQL_RESPONSE_ERR(buf)) + if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf)) { LOGIF(LM, (skygw_log_write( LOGFILE_ERROR, @@ -422,19 +422,46 @@ int no_residual = 1; if (reslen < len && gwbuf_length(pkt) >= len) { /* - * The message is contianed in more than the current + * The message is contained in more than the current * buffer, however we have the complete messasge in * this buffer and the chain of remaining buffers. * * Allocate a contiguous buffer for the binlog message * and copy the complete message into this buffer. */ - msg = malloc(len); + int remainder = len; + GWBUF *p = pkt; + + if ((msg = malloc(len)) == NULL) + { + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Insufficient memory to buffer event " + "of %d bytes\n.", len))); + break; + } + + ptr = msg; + while (p && remainder > 0) + { + int plen = GWBUF_LENGTH(p); + int n = (remainder > plen ? plen : remainder); + memcpy(ptr, GWBUF_DATA(p), n); + remainder -= n; + ptr += n; + if (remainder > 0) + p = p->next; + } + if (remainder) + { + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Expected entire message in buffer " + "chain, but failed to create complete " + "message as expected.\n"))); + break; + } - if (GWBUF_LENGTH(pkt->next) < len - reslen) - printf("Packet (length %d) spans more than 2 buffers\n", len); - memcpy(msg, pdata, reslen); - memcpy(&msg[reslen], GWBUF_DATA(pkt->next), len - reslen); ptr = msg; } else if (reslen < len) @@ -456,6 +483,13 @@ int no_residual = 1; } blr_extract_header(ptr, &hdr); + + if (hdr.event_size != len - 5) + { + printf("Packet length is %d, but event size is %d\n", + len, hdr.event_size); + abort(); + } if (hdr.ok == 0) { router->stats.n_binlogs++; @@ -525,12 +559,14 @@ int no_residual = 1; { free(msg); msg = NULL; - pkt = gwbuf_consume(pkt, reslen); - pkt = gwbuf_consume(pkt, len - reslen); } - else + while (len > 0) { - pkt = gwbuf_consume(pkt, len); + int n, plen; + plen = GWBUF_LENGTH(pkt); + n = (plen < len ? plen : len); + pkt = gwbuf_consume(pkt, n); + len -= n; } } @@ -596,15 +632,18 @@ uint32_t rval = 0, shift = 0; static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *ptr, REP_HEADER *hdr) { -int len; +int len, slen; uint64_t pos; char file[BINLOG_FNAMELEN+1]; ptr += 19; // Skip event header len = hdr->event_size - 19; // Event size minus header pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); - memcpy(file, ptr + 8, BINLOG_FNAMELEN); - file[BINLOG_FNAMELEN] = 0; + slen = len - 8; + if (slen > BINLOG_FNAMELEN) + slen = BINLOG_FNAMELEN; + memcpy(file, ptr + 8, slen); + file[slen] = 0; #ifdef VEBOSE_ROTATE printf("binlog rotate: "); @@ -614,7 +653,7 @@ char file[BINLOG_FNAMELEN+1]; printf("New file: %s @ %ld\n", file, pos); #endif - if (strncmp(router->binlog_name, file, BINLOG_FNAMELEN) != 0) + if (strncmp(router->binlog_name, file, slen) != 0) { router->stats.n_rotates++; blr_file_rotate(router, file, pos); @@ -631,6 +670,14 @@ CreateMySQLAuthData(char *username, char *password, char *database) { MYSQL_session *auth_info; + if (username == NULL || password == NULL) + { + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "You must specify both username and password for the binlog router.\n"))); + return NULL; + } + if ((auth_info = calloc(1, sizeof(MYSQL_session))) == NULL) return NULL; strcpy(auth_info->user, username); From 13e95ffc536d39b4bef3c4594fb8439ec208f14c Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Sat, 7 Jun 2014 21:01:11 +0100 Subject: [PATCH 03/10] Improved diagnostics Added master reconnect on failure Added EPOLLRDHUP events --- server/core/config.c | 21 ++++- server/core/dcb.c | 1 + server/core/poll.c | 21 ++++- server/modules/include/blr.h | 7 ++ server/modules/protocol/mysql_backend.c | 25 ++++++ server/modules/routing/binlog/blr.c | 43 +++++++++- server/modules/routing/binlog/blr_master.c | 94 ++++++++++++++++------ 7 files changed, 180 insertions(+), 32 deletions(-) diff --git a/server/core/config.c b/server/core/config.c index dad4c4022..ee6a1d3bb 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -602,6 +602,15 @@ config_threadcount() return gateway.n_threads; } +static struct { + char *logname; + logfile_id_t logfile; +} lognames[] = { + { "log_messages", LOGFILE_MESSAGE }, + { "log_trace", LOGFILE_TRACE }, + { "log_debug", LOGFILE_DEBUG }, + { NULL, 0 } +}; /** * Configuration handler for items in the global [MaxScale] section * @@ -612,10 +621,20 @@ config_threadcount() static int handle_global_item(const char *name, const char *value) { +int i; if (strcmp(name, "threads") == 0) { gateway.n_threads = atoi(value); } else { - return 0; + for (i = 0; lognames[i].logname; i++) + { + if (strcasecmp(name, lognames[i].logname) == 0) + { + if (atoi(value)) + skygw_log_enable(lognames[i].logfile); + else + skygw_log_disable(lognames[i].logfile); + } + } } return 1; } diff --git a/server/core/dcb.c b/server/core/dcb.c index 52a9bbfc9..15f4ea637 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1131,6 +1131,7 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); dcb_printf(pdcb, "\tOwning Session: %d\n", dcb->session); dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); + dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq)); dcb_printf(pdcb, "\tStatistics:\n"); dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); diff --git a/server/core/poll.c b/server/core/poll.c index f1c65ebea..365b05e0e 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -99,7 +99,7 @@ poll_add_dcb(DCB *dcb) CHK_DCB(dcb); - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; ev.data.ptr = dcb; /*< @@ -474,6 +474,25 @@ poll_waitevents(void *arg) atomic_add(&pollStats.n_hup, 1); dcb->func.hangup(dcb); } + + if (ev & EPOLLRDHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLRDHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + dcb->func.hangup(dcb); + } } /*< for */ no_op = FALSE; } diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 865f57f3d..b00a63ad0 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -110,6 +110,11 @@ typedef struct { uint64_t n_cachehits; /*< Number of hits on the binlog cache */ uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ unsigned int n_registered; /*< Number of registered slaves */ + int n_masterstarts; /*< Numebr of times connection restarted */ + int n_queueadd; /*< Numebr of times incoming data was added to processign queue */ + int n_residuals; /*< Number of times residual data was buffered */ + unsigned int n_heartbeats; /*< Number of heartbeat messages */ + time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t events[0x24]; /*< Per event counters */ } ROUTER_STATS; @@ -175,8 +180,10 @@ typedef struct router_instance { char *password; /*< Password to use with master */ char *fileroot; /*< Root of binlog filename */ DCB *master; /*< DCB for master connection */ + DCB *client; /*< DCB for dummy client */ SESSION *session; /*< Fake session for master connection */ unsigned int master_state; /*< State of the master FSM */ + uint8_t lastEventReceived; GWBUF *residual; /*< Any residual binlog event */ MASTER_RESPONSES saved_master; /*< Saved master responses */ char binlog_name[BINLOG_FNAMELEN+1]; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index f1774485d..1653de5f0 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -792,6 +792,31 @@ return_fd: static int gw_backend_hangup(DCB *dcb) { + SESSION *session; + void *rsession; + ROUTER_OBJECT *router; + ROUTER *router_instance; + int rc = 0; + + session = dcb->session; + + if (session->state == SESSION_STATE_ROUTER_READY) + { + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; + /*< + * rsession should never be NULL here. + */ + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [gw_backend_hangup] " + "Call closeSession for backend " + "session.", + pthread_self()))); + + router->closeSession(router_instance, rsession); + } /*< vraa : errorHandle */ return 1; } diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index a6f2bb921..006b30861 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,7 @@ #include #include #include +#include #include #include @@ -436,8 +438,13 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; * * TODO: Handle closure of master session */ - LOGIF(LD, (skygw_log_write_flush( + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master"))); + router->master_state = BLRM_UNCONNECTED; + dcb_close(router->master); + dcb_free(router->master); + dcb_free(router->client); + blr_start_master(router); return; } CHK_CLIENT_RSES(slave); @@ -504,8 +511,10 @@ static void diagnostics(ROUTER *router, DCB *dcb) { ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; -ROUTER_SLAVE *session; -int i = 0; +ROUTER_SLAVE *session; +int i = 0; +char buf[40]; +struct tm tm; spinlock_acquire(&router_inst->lock); session = router_inst->slaves; @@ -515,7 +524,17 @@ int i = 0; session = session->next; } spinlock_release(&router_inst->lock); + + dcb_printf(dcb, "\tMaster connection DCB: %p\n", + router_inst->master); + dcb_printf(dcb, "\tMaster connection state: %s\n", + blrm_states[router_inst->master_state]); + + localtime_r(&router_inst->stats.lastReply, &tm); + asctime_r(&tm, buf); + dcb_printf(dcb, "\tNumber of master connects: %d\n", + router_inst->stats.n_masterstarts); dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); dcb_printf(dcb, "\tCurrent binlog position: %u\n", @@ -524,7 +543,7 @@ int i = 0; router_inst->stats.n_slaves); dcb_printf(dcb, "\tNumber of binlog events received: %u\n", router_inst->stats.n_binlogs); - dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", + dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", router_inst->stats.n_fakeevents); dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", router_inst->stats.n_binlog_errors); @@ -534,10 +553,23 @@ int i = 0; router_inst->stats.n_cachehits); dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n", router_inst->stats.n_cachemisses); + dcb_printf(dcb, "\tNumber of heartbeat events: %u\n", + router_inst->stats.n_heartbeats); dcb_printf(dcb, "\tNumber of packets received: %u\n", router_inst->stats.n_reads); + dcb_printf(dcb, "\tNumber of packets queued: %u\n", + router_inst->stats.n_queueadd); + dcb_printf(dcb, "\tCurrent length of incoming queue: %d\n", + gwbuf_length(router_inst->queue)); + dcb_printf(dcb, "\tNumber of residual data packets: %u\n", + router_inst->stats.n_residuals); dcb_printf(dcb, "\tAverage events per packet %.1f\n", (double)router_inst->stats.n_binlogs / router_inst->stats.n_reads); + dcb_printf(dcb, "\tLast event from master at: %s\n", buf); + dcb_printf(dcb, "\tLast event from master: 0x%x\n", + router_inst->lastEventReceived); + if (router_inst->active_logs) + dcb_printf(dcb, "\tRouter processing binlog records\n"); dcb_printf(dcb, "\tEvents received:\n"); for (i = 0; i < 0x24; i++) { @@ -596,6 +628,7 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; atomic_add(&router->stats.n_reads, 1); blr_master_response(router, queue); + router->stats.lastReply = time(0); } /** @@ -619,6 +652,8 @@ errorReply( DCB *backend_dcb, int action) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, "Erorr Reply '%s'", message))); } /** to be inline'd */ diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 6dee211d8..6f3afd8c7 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -48,6 +48,9 @@ #include #include +#include +#include + #include #include #include @@ -68,6 +71,8 @@ static void *CreateMySQLAuthData(char *username, char *password, char *database) static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); static uint32_t extract_field(uint8_t *src, int bits); +static int keepalive = 1; + /** * blr_start_master - controls the connection of the binlog router to the * master MySQL server and triggers the slave registration process for @@ -81,15 +86,37 @@ blr_start_master(ROUTER_INSTANCE *router) DCB *client; GWBUF *buf; - client = dcb_alloc(DCB_ROLE_INTERNAL); + if ((client = dcb_alloc(DCB_ROLE_INTERNAL)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to create DCB for dummy client\n"))); + return; + } + router->client = client; client->data = CreateMySQLAuthData(router->user, router->password, ""); - router->session = session_alloc(router->service, client); + if ((router->session = session_alloc(router->service, client)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to create session for connection to master\n"))); + return; + } client->session = router->session; - router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL); + if ((router->master = dcb_connect(router->service->databases, router->session, BLR_PROTOCOL)) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Binlog router: failed to connect to master\n"))); + return; + } + +if (setsockopt(router->master->fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive , sizeof(keepalive ))) +perror("setsockopt"); + router->master_state = BLRM_AUTHENTICATED; buf = blr_make_query("SELECT UNIX_TIMESTAMP()"); router->master->func.write(router->master, buf); router->master_state = BLRM_TIMESTAMP; + + router->stats.n_masterstarts++; } /** @@ -131,6 +158,7 @@ char query[128]; * to the point that it will not look at new packets * added to the queue. */ + router->stats.n_queueadd++; router->queue = gwbuf_append(router->queue, buf); spinlock_release(&router->lock); return; @@ -305,16 +333,17 @@ int len = 18; if ((buf = gwbuf_alloc(len + 4)) == NULL) return NULL; data = GWBUF_DATA(buf); - encode_value(&data[0], len, 24); // Payload length - data[3] = 0; // Sequence ID - data[4] = COM_REGISTER_SLAVE; // Command - encode_value(&data[5], router->serverid, 32); // Slave Server ID - data[9] = 0; // Slave hostname length - data[10] = 0; // Slave username length - data[11] = 0; // Slave password length - encode_value(&data[12], router->service->ports->port, 16); // Slave master port - encode_value(&data[14], 0, 32); // Replication rank - encode_value(&data[18], router->masterid, 32); // Master server-id + encode_value(&data[0], len, 24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_REGISTER_SLAVE; // Command + encode_value(&data[5], router->serverid, 32); // Slave Server ID + data[9] = 0; // Slave hostname length + data[10] = 0; // Slave username length + data[11] = 0; // Slave password length + encode_value(&data[12], + router->service->ports->port, 16); // Slave master port + encode_value(&data[14], 0, 32); // Replication rank + encode_value(&data[18], router->masterid, 32); // Master server-id return buf; } @@ -338,14 +367,16 @@ int len = 0x1b; return NULL; data = GWBUF_DATA(buf); - encode_value(&data[0], len,24); // Payload length - data[3] = 0; // Sequence ID - data[4] = COM_BINLOG_DUMP; // Command - encode_value(&data[5], router->binlog_position, 32); // binlog position - encode_value(&data[9], 0, 16); // Flags - encode_value(&data[11], router->serverid, 32); // Server-id of MaxScale + encode_value(&data[0], len,24); // Payload length + data[3] = 0; // Sequence ID + data[4] = COM_BINLOG_DUMP; // Command + encode_value(&data[5], + router->binlog_position, 32); // binlog position + encode_value(&data[9], 0, 16); // Flags + encode_value(&data[11], + router->serverid, 32); // Server-id of MaxScale strncpy((char *)&data[15], router->binlog_name, - BINLOG_FNAMELEN); // binlog filename + BINLOG_FNAMELEN); // binlog filename return buf; } @@ -469,9 +500,14 @@ int no_residual = 1; /* * The message is not fully contained in the current * and we do not have the complete message in the - * buffer chain. Therefore we must stop processing until - * we receive the next buffer. + * buffer chain. Therefore we must stop processing + * until we receive the next buffer. */ + router->stats.n_residuals++; + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Residual data left after %d records.\n", + router->stats.n_binlogs))); break; } else @@ -486,13 +522,19 @@ int no_residual = 1; if (hdr.event_size != len - 5) { - printf("Packet length is %d, but event size is %d\n", - len, hdr.event_size); - abort(); + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "Packet length is %d, but event size is %d, " + "binlog file %s position %d", + len, hdr.event_size, + router->binlog_name, + router->binlog_position))); + break; } if (hdr.ok == 0) { router->stats.n_binlogs++; + router->lastEventReceived = hdr.event_type; // #define SHOW_EVENTS #ifdef SHOW_EVENTS @@ -526,7 +568,7 @@ int no_residual = 1; #ifdef SHOW_EVENTS printf("Replication heartbeat\n"); #endif - ; + router->stats.n_heartbeats++; } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) { From f7a177dac82142dd6a10cf5a731f66266873a509 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Sun, 8 Jun 2014 13:52:48 +0100 Subject: [PATCH 04/10] Improved diagnostics --- server/modules/include/blr.h | 1 + server/modules/routing/binlog/blr.c | 7 ++- server/modules/routing/binlog/blr_master.c | 58 ++++++++++++++++++++-- 3 files changed, 61 insertions(+), 5 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index b00a63ad0..8e0939737 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -116,6 +116,7 @@ typedef struct { unsigned int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ + uint64_t n_artificial; /*< Artificial events not written to disk */ uint64_t events[0x24]; /*< Per event counters */ } ROUTER_STATS; diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 0fdb7961c..762493788 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -548,6 +548,8 @@ struct tm tm; router_inst->stats.n_binlogs); dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", router_inst->stats.n_fakeevents); + dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n", + router_inst->stats.n_artificial); dcb_printf(dcb, "\tNumber of binlog events in error: %u\n", router_inst->stats.n_binlog_errors); dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n", @@ -568,7 +570,10 @@ struct tm tm; router_inst->stats.n_residuals); dcb_printf(dcb, "\tAverage events per packet %.1f\n", (double)router_inst->stats.n_binlogs / router_inst->stats.n_reads); - dcb_printf(dcb, "\tLast event from master at: %s\n", buf); + dcb_printf(dcb, "\tLast event from master at: %s", + buf); + dcb_printf(dcb, "\t (%d seconds ago)\n", + time(0) - router_inst->stats.lastReply); dcb_printf(dcb, "\tLast event from master: 0x%x\n", router_inst->lastEventReceived); if (router_inst->active_logs) diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 6f3afd8c7..acb7b01cf 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -70,6 +70,7 @@ static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hd static void *CreateMySQLAuthData(char *username, char *password, char *database); static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); static uint32_t extract_field(uint8_t *src, int bits); +static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len); static int keepalive = 1; @@ -468,7 +469,9 @@ int no_residual = 1; LOGIF(LE,(skygw_log_write( LOGFILE_ERROR, "Insufficient memory to buffer event " - "of %d bytes\n.", len))); + "of %d bytes. Binlog %s @ %d\n.", + len, router->binlog_name, + router->binlog_position))); break; } @@ -489,7 +492,9 @@ int no_residual = 1; LOGFILE_ERROR, "Expected entire message in buffer " "chain, but failed to create complete " - "message as expected.\n"))); + "message as expected. %s @ %d\n", + router->binlog_name, + router->binlog_position))); break; } @@ -506,8 +511,9 @@ int no_residual = 1; router->stats.n_residuals++; LOGIF(LD,(skygw_log_write( LOGFILE_DEBUG, - "Residual data left after %d records.\n", - router->stats.n_binlogs))); + "Residual data left after %d records. %s @ %d\n", + router->stats.n_binlogs, + router->binlog_name, router->binlog_position))); break; } else @@ -529,6 +535,7 @@ int no_residual = 1; len, hdr.event_size, router->binlog_name, router->binlog_position))); + blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); break; } if (hdr.ok == 0) @@ -545,6 +552,11 @@ int no_residual = 1; if (hdr.event_type == FORMAT_DESCRIPTION_EVENT && hdr.next_pos == 0) { // Fake format description message + LOGIF(LD,(skygw_log_write(LOGFILE_DEBUG, + "Replication fake event. " + "Binlog %s @ %d.\n", + router->binlog_name, + router->binlog_position))); router->stats.n_fakeevents++; if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) { @@ -568,6 +580,12 @@ int no_residual = 1; #ifdef SHOW_EVENTS printf("Replication heartbeat\n"); #endif + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Replication heartbeat. " + "Binlog %s @ %d.\n", + router->binlog_name, + router->binlog_position))); router->stats.n_heartbeats++; } else if (hdr.flags != LOG_EVENT_ARTIFICIAL_F) @@ -583,6 +601,17 @@ int no_residual = 1; } else { + router->stats.n_artificial++; + LOGIF(LD,(skygw_log_write( + LOGFILE_DEBUG, + "Artificial event not written " + "to disk or distributed. " + "Type 0x%x, Length %d, Binlog " + "%s @ %d\n.", + hdr.event_type, + hdr.event_size, + router->binlog_name, + router->binlog_position))); ptr += 5; if (hdr.event_type == ROTATE_EVENT) { @@ -594,6 +623,12 @@ int no_residual = 1; else { printf("Binlog router error: %s\n", &ptr[7]); + LOGIF(LE,(skygw_log_write(LOGFILE_ERROR, + "Error packet in binlog stream.%s @ %d\n.", + router->binlog_name, + router->binlog_position))); + blr_log_packet(LOGFILE_ERROR, "Error Packet:", + ptr, len); router->stats.n_binlog_errors++; } @@ -794,3 +829,18 @@ ROUTER_SLAVE *slave; } spinlock_release(&router->lock); } + +static void +blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len) +{ +int i; + + skygw_log_write(file, "%s length = %d: ", msg, len); + for (i = 0; i < len && i < 40; i++) + skygw_log_write(file, "0x%02x ", ptr[i]); + if (i < len) + skygw_log_write_flush(file, "...\n"); + else + skygw_log_write_flush(file, "\n"); + +} From 2963a8448bb9ea5947edcc19438b45634f3b1b9b Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 10 Jun 2014 17:59:49 +0100 Subject: [PATCH 05/10] Implement non-blocking alternative to mutexes for read serialisation --- server/core/dcb.c | 60 ++++++++-- server/core/poll.c | 8 ++ server/include/dcb.h | 20 ++-- server/modules/include/blr.h | 24 ++-- server/modules/routing/binlog/blr.c | 11 +- server/modules/routing/binlog/blr_master.c | 132 +++++++++++++++++++-- server/modules/routing/binlog/blr_slave.c | 26 ++-- 7 files changed, 230 insertions(+), 51 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 713cf4b3c..5dc11a374 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -84,7 +84,13 @@ static bool dcb_set_state_nomutex( dcb_state_t* old_state); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); -DCB* dcb_get_zombies(void) +/** + * Return the pointer to the lsit of zombie DCB's + * + * @return Zombies DCB list + */ +DCB * +dcb_get_zombies(void) { return zombies; } @@ -122,6 +128,9 @@ DCB *rval; spinlock_init(&rval->delayqlock); spinlock_init(&rval->authlock); spinlock_init(&rval->cb_lock); + spinlock_init(&rval->pollinlock); + rval->pollinbusy = 0; + rval->readcheck = 0; rval->fd = -1; memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; @@ -1147,12 +1156,14 @@ DCB *dcb; dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); dcb_printf(pdcb, "\tStatistics:\n"); - dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); - dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); - dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); - dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); - dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); + dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); + dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); + dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1211,6 +1222,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", @@ -1711,3 +1724,36 @@ int rval = 0; return rval; } + +/** + * Called by the EPOLLIN event. Take care of calling the protocol + * read entry point and managing multiple threads copeting for the DCB + * without blocking those threads. + * + * @param dcb The DCB that has data available + */ +void +dcb_pollin(DCB *dcb) +{ + + spinlock_acquire(&dcb->pollinlock); + if (dcb->pollinbusy == 0) + { + dcb->pollinbusy = 1; + do { + if (dcb->readcheck) + dcb->stats.n_readrechecks++; + dcb->readcheck = 0; + spinlock_release(&dcb->pollinlock); + dcb->func.read(dcb); + spinlock_acquire(&dcb->pollinlock); + } while (dcb->readcheck); + dcb->pollinbusy = 0; + } + else + { + dcb->stats.n_busypolls++; + dcb->readcheck = 1; + } + spinlock_release(&dcb->pollinlock); +} diff --git a/server/core/poll.c b/server/core/poll.c index e86d656ad..f6100df4d 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -393,11 +393,13 @@ poll_waitevents(void *arg) } if (ev & EPOLLIN) { +#if MUTEX_BLOCK simple_mutex_lock(&dcb->dcb_read_lock, true); ss_info_dassert(!dcb->dcb_read_active, "Read already active"); dcb->dcb_read_active = TRUE; +#endif if (dcb->state == DCB_STATE_LISTENING) { @@ -421,11 +423,17 @@ poll_waitevents(void *arg) dcb, dcb->fd))); atomic_add(&pollStats.n_read, 1); +#if MUTEX_BLOCK dcb->func.read(dcb); +#else + dcb_pollin(dcb); +#endif } +#if MUTEX_BLOCK dcb->dcb_read_active = FALSE; simple_mutex_unlock( &dcb->dcb_read_lock); +#endif } if (ev & EPOLLERR) { diff --git a/server/include/dcb.h b/server/include/dcb.h index e7d2ec716..68b1f349c 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -105,12 +105,14 @@ typedef struct gw_protocol { * The statitics gathered on a descriptor control block */ typedef struct dcbstats { - int n_reads; /*< Number of reads on this descriptor */ - int n_writes; /*< Number of writes on this descriptor */ - int n_accepts; /*< Number of accepts on this descriptor */ - int n_buffered; /*< Number of buffered writes */ - int n_high_water; /*< Number of crosses of high water mark */ - int n_low_water; /*< Number of crosses of low water mark */ + int n_reads; /*< Number of reads on this descriptor */ + int n_writes; /*< Number of writes on this descriptor */ + int n_accepts; /*< Number of accepts on this descriptor */ + int n_buffered; /*< Number of buffered writes */ + int n_high_water; /*< Number of crosses of high water mark */ + int n_low_water; /*< Number of crosses of low water mark */ + int n_busypolls; /*< Number of read polls whiel reading */ + int n_readrechecks; /*< Number of rechecks for reads */ } DCBSTATS; /** @@ -226,6 +228,9 @@ typedef struct dcb { int command; /**< Specific client command type */ SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ + SPINLOCK pollinlock; + int pollinbusy; + int readcheck; unsigned int high_water; /**< High water mark */ unsigned int low_water; /**< Low water mark */ @@ -253,6 +258,7 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) +void dcb_pollin(DCB *); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) @@ -281,7 +287,7 @@ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_add_to_zombieslist(DCB* dcb); int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *); -int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), +int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), void *); int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 8e0939737..f4ec0f314 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -59,10 +59,10 @@ * Slave statistics */ typedef struct { - unsigned int n_events; /*< Number of events sent */ - unsigned int n_bursts; /*< Number of bursts sent */ - unsigned int n_requests; /*< Number of requests received */ - unsigned int n_flows; /*< Number of flow control restarts */ + int n_events; /*< Number of events sent */ + int n_bursts; /*< Number of bursts sent */ + int n_requests; /*< Number of requests received */ + int n_flows; /*< Number of flow control restarts */ } SLAVE_STATS; /** @@ -102,18 +102,19 @@ typedef struct router_slave { * The statistics for this router instance */ typedef struct { - unsigned int n_slaves; /*< Number slave sessions created */ - unsigned int n_reads; /*< Number of record reads */ + int n_slaves; /*< Number slave sessions created */ + int n_reads; /*< Number of record reads */ uint64_t n_binlogs; /*< Number of binlog records from master */ uint64_t n_binlog_errors;/*< Number of binlog records from master */ uint64_t n_rotates; /*< Number of binlog rotate events */ uint64_t n_cachehits; /*< Number of hits on the binlog cache */ uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ - unsigned int n_registered; /*< Number of registered slaves */ + int n_registered; /*< Number of registered slaves */ int n_masterstarts; /*< Numebr of times connection restarted */ + int n_delayedreconnects; int n_queueadd; /*< Numebr of times incoming data was added to processign queue */ int n_residuals; /*< Number of times residual data was buffered */ - unsigned int n_heartbeats; /*< Number of heartbeat messages */ + int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t n_artificial; /*< Artificial events not written to disk */ @@ -199,6 +200,7 @@ typedef struct router_instance { BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ int active_logs; + int reconnect_pending; GWBUF *queue; struct router_instance *next; @@ -326,13 +328,17 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", */ extern void blr_start_master(ROUTER_INSTANCE *); extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *); +extern void blr_master_reconnect(ROUTER_INSTANCE *); extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); - +extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr); +extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); extern void blr_init_cache(ROUTER_INSTANCE *); extern void blr_file_init(ROUTER_INSTANCE *); +extern int blr_open_binlog(ROUTER_INSTANCE *, char *); extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); +extern void blr_file_flush(ROUTER_INSTANCE *); extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *); #endif diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 762493788..e25f688ff 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -276,6 +276,7 @@ int i; spinlock_release(&instlock); inst->active_logs = 0; + inst->reconnect_pending = 0; /* * Initialise the binlog file and position @@ -443,11 +444,7 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; */ LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master"))); - router->master_state = BLRM_UNCONNECTED; - dcb_close(router->master); - dcb_free(router->master); - dcb_free(router->client); - blr_start_master(router); + blr_master_reconnect(router); return; } CHK_CLIENT_RSES(slave); @@ -538,6 +535,8 @@ struct tm tm; dcb_printf(dcb, "\tNumber of master connects: %d\n", router_inst->stats.n_masterstarts); + dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n", + router_inst->stats.n_delayedreconnects); dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); dcb_printf(dcb, "\tCurrent binlog position: %u\n", @@ -578,6 +577,8 @@ struct tm tm; router_inst->lastEventReceived); if (router_inst->active_logs) dcb_printf(dcb, "\tRouter processing binlog records\n"); + if (router_inst->reconnect_pending) + dcb_printf(dcb, "\tRouter pending reconnect to master\n"); dcb_printf(dcb, "\tEvents received:\n"); for (i = 0; i < 0x24; i++) { diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index acb7b01cf..800535e65 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -120,6 +120,86 @@ perror("setsockopt"); router->stats.n_masterstarts++; } +/** + * Reconnect to the master server. + * + * IMPORTANT - must be called with router->active_logs set by the + * thread that set active_logs. + * + * @param router The router instance + */ +static void +blr_restart_master(ROUTER_INSTANCE *router) +{ +GWBUF *ptr; + + dcb_close(router->master); + dcb_free(router->master); + dcb_free(router->client); + + /* Discard the queued residual data */ + ptr = router->residual; + while (ptr) + { + ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); + } + router->residual = NULL; + + /* Discard the queued data */ + ptr = router->queue; + while (ptr) + { + ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); + } + router->queue = NULL; + /* Now it is safe to unleash other threads on this router instance */ + spinlock_acquire(&router->lock); + router->reconnect_pending = 0; + router->active_logs = 0; + spinlock_release(&router->lock); + blr_start_master(router); +} + +/** + * Request a reconnect to the master. + * + * If another thread is active processing messages from the master + * then merely set a flag for that thread to do the restart. If no + * threads are active then directly call the restart routine to + * reconnect to the master. + * + * @param router The router instance + */ +void +blr_master_reconnect(ROUTER_INSTANCE *router) +{ +int do_reconnect = 0; + + spinlock_acquire(&router->lock); + if (router->active_logs) + { + /* Currently processing a response, set a flag + * and get the thread that is process a response + * to deal with the reconnect. + */ + router->reconnect_pending = 1; + router->stats.n_delayedreconnects++; + } + else + { + router->active_logs = 1; + do_reconnect = 1; + } + spinlock_release(&router->lock); + if (do_reconnect) + { + blr_restart_master(router); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + } +} + /** * Binlog router master side state machine event handler. * @@ -154,6 +234,7 @@ char query[128]; spinlock_acquire(&router->lock); if (router->active_logs) { + int length; /* * Thread already processing a packet and has not got * to the point that it will not look at new packets @@ -161,7 +242,14 @@ char query[128]; */ router->stats.n_queueadd++; router->queue = gwbuf_append(router->queue, buf); + length = gwbuf_length(router->queue); spinlock_release(&router->lock); + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, "Queued data due to active log " + "handling. %s @ %d, queue length %d\n", + router->binlog_name, + router->binlog_position, + length))); return; } else @@ -172,11 +260,17 @@ char query[128]; if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + return; + } router->active_logs = 0; spinlock_release(&router->lock); return; @@ -184,7 +278,7 @@ char query[128]; if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf)) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Received error: %d, %s from master during %s phase of the master state machine.\n", MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] @@ -192,6 +286,12 @@ char query[128]; gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); router->active_logs = 0; + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + return; + } spinlock_release(&router->lock); return; } @@ -281,12 +381,20 @@ char query[128]; spinlock_acquire(&router->lock); if ((buf = router->queue) != NULL) { - router->queue = buf->next; - buf->next = NULL; + router->queue = NULL; } else { - router->active_logs = 0; + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + spinlock_acquire(&router->lock); + } + else + { + router->active_logs = 0; + } } spinlock_release(&router->lock); } while (buf != NULL); @@ -415,7 +523,9 @@ REP_HEADER hdr; int len, reslen; int no_residual = 1; - /* Prepend any residual buffer to the buffer chain we have been called with. */ + /* Prepend any residual buffer to the buffer chain we have + * been called with. + */ if (router->residual) { pkt = gwbuf_append(router->residual, pkt); @@ -833,14 +943,16 @@ ROUTER_SLAVE *slave; static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len) { +char buf[400], *bufp; int i; - skygw_log_write(file, "%s length = %d: ", msg, len); + bufp = buf; + bufp += sprintf(bufp, "%s length = %d: ", msg, len); for (i = 0; i < len && i < 40; i++) - skygw_log_write(file, "0x%02x ", ptr[i]); + bufp += sprintf(bufp, "0x%02x ", ptr[i]); if (i < len) - skygw_log_write_flush(file, "...\n"); + skygw_log_write_flush(file, "%s...\n", buf); else - skygw_log_write_flush(file, "\n"); + skygw_log_write_flush(file, "%s\n", buf); } diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index da7d7f8a2..a9afc2a50 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -153,7 +153,7 @@ char *word, *brkb; int query_len; qtext = GWBUF_DATA(queue); - query_len = extract_field(qtext, 24) - 1; + query_len = extract_field((uint8_t *)qtext, 24) - 1; qtext += 5; // Skip header and first byte of the payload query_text = strndup(qtext, query_len); @@ -292,7 +292,7 @@ unsigned char *data; int len; if ((pkt = gwbuf_alloc(strlen(msg) + 13)) == NULL) - return NULL; + return; data = GWBUF_DATA(pkt); len = strlen(msg) + 1; encode_value(&data[0], len, 24); // Payload length @@ -301,7 +301,7 @@ int len; data[4] = 0xff; // Error indicator data[5] = 0; // Error Code data[6] = 0; // Error Code - strncpy(&data[7], "#00000", 6); + strncpy((char *)&data[7], "#00000", 6); memcpy(&data[13], msg, strlen(msg)); // Error Message slave->dcb->func.write(slave->dcb, pkt); } @@ -346,15 +346,15 @@ int len, ts_len; if ((pkt = gwbuf_alloc(len)) == NULL) return 0; ptr = GWBUF_DATA(pkt); - memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble + memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble ptr += sizeof(timestamp_def); - encode_value(ptr, ts_len + 1, 24); // Add length of data packet + encode_value(ptr, ts_len + 1, 24); // Add length of data packet ptr += 3; - *ptr++ = 0x04; // Sequence number in response - *ptr++ = ts_len; // Length of result string - strncpy(ptr, timestamp, ts_len); // Result string + *ptr++ = 0x04; // Sequence number in response + *ptr++ = ts_len; // Length of result string + strncpy((char *)ptr, timestamp, ts_len); // Result string ptr += ts_len; - memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result + memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result return slave->dcb->func.write(slave->dcb, pkt); } @@ -386,7 +386,7 @@ int len, slen; slen = *ptr++; if (slen != 0) { - slave->hostname = strndup(ptr, slen); + slave->hostname = strndup((char *)ptr, slen); ptr += slen; } else @@ -395,14 +395,14 @@ int len, slen; if (slen != 0) { ptr += slen; - slave->user = strndup(ptr, slen); + slave->user = strndup((char *)ptr, slen); } else slave->user = NULL; slen = *ptr++; if (slen != 0) { - slave->passwd = strndup(ptr, slen); + slave->passwd = strndup((char *)ptr, slen); ptr += slen; } else @@ -468,7 +468,7 @@ uint32_t chksum; ptr += 2; serverid = extract_field(ptr, 32); ptr += 4; - strncpy(slave->binlogfile, ptr, BINLOG_FNAMELEN); + strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN); slave->state = BLRS_DUMPING; slave->seqno = 1; From cf55f271f04bdfd3a6ef0f3eb47be9f131187d31 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Thu, 28 Aug 2014 11:42:29 +0100 Subject: [PATCH 06/10] Add file left off last commit --- server/modules/include/blr.h | 38 +++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index f4ec0f314..0327a34ed 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -31,6 +31,7 @@ */ #include #include +#include #define BINLOG_FNAMELEN 16 #define BLR_PROTOCOL "MySQLBackend" @@ -43,7 +44,7 @@ * by the router options highwater and lowwater. */ #define DEF_LOW_WATER 20000 -#define DEF_HIGH_WATER 100000 +#define DEF_HIGH_WATER 300000 /** * Some useful macros for examining the MySQL Response packets @@ -63,6 +64,16 @@ typedef struct { int n_bursts; /*< Number of bursts sent */ int n_requests; /*< Number of requests received */ int n_flows; /*< Number of flow control restarts */ + int n_catchupnr; /*< No. of times catchup resulted in not entering loop */ + int n_alreadyupd; + int n_upd; + int n_cb; + int n_cbna; + int n_dcb; + int n_above; + int n_failed_read; + int n_overrun; + int n_actions[3]; } SLAVE_STATS; /** @@ -83,11 +94,14 @@ typedef struct router_slave { char *user; /*< Username if given */ char *passwd; /*< Password if given */ short port; /*< MySQL port */ + int nocrc; /*< Disable CRC */ + int overrun; uint32_t rank; /*< Replication rank */ uint8_t seqno; /*< Replication dump sequence no */ SPINLOCK catch_lock; /*< Event catchup lock */ unsigned int cstate; /*< Catch up state */ SPINLOCK rses_lock; /*< Protects rses_deleted */ + pthread_t pthread; struct router_instance *router; /*< Pointer to the owning router */ struct router_slave *next; @@ -110,9 +124,9 @@ typedef struct { uint64_t n_cachehits; /*< Number of hits on the binlog cache */ uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ int n_registered; /*< Number of registered slaves */ - int n_masterstarts; /*< Numebr of times connection restarted */ + int n_masterstarts; /*< Number of times connection restarted */ int n_delayedreconnects; - int n_queueadd; /*< Numebr of times incoming data was added to processign queue */ + int n_queueadd; /*< Number of times incoming data was added to processign queue */ int n_residuals; /*< Number of times residual data was buffered */ int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; @@ -133,6 +147,9 @@ typedef struct { GWBUF *uuid; /*< Master UUID */ GWBUF *setslaveuuid; /*< Set Slave UUID */ GWBUF *setnames; /*< Set NAMES latin1 */ + GWBUF *utf8; /*< Set NAMES utf8 */ + GWBUF *select1; /*< select 1 */ + GWBUF *selectver; /*< select version() */ uint8_t *fde_event; /*< Format Description Event */ int fde_len; /*< Length of fde_event */ } MASTER_RESPONSES; @@ -235,15 +252,19 @@ typedef struct rep_header { #define BLRM_MUUID 0x0008 #define BLRM_SUUID 0x0009 #define BLRM_LATIN1 0x000A -#define BLRM_REGISTER 0x000B -#define BLRM_BINLOGDUMP 0x000C +#define BLRM_UTF8 0x000B +#define BLRM_SELECT1 0x000C +#define BLRM_SELECTVER 0x000D +#define BLRM_REGISTER 0x000E +#define BLRM_BINLOGDUMP 0x000F -#define BLRM_MAXSTATE 0x000C +#define BLRM_MAXSTATE 0x000F static char *blrm_states[] = { "Unconnected", "Authenticated", "Timestamp retrieval", "Server ID retrieval", "HeartBeat Period setup", "binlog checksum config", "binlog checksum rerieval", "GTID Mode retrieval", "Master UUID retrieval", - "Set Slave UUID", "Set Names", "Register slave", "Binlog Dump" }; + "Set Slave UUID", "Set Names latin1", "Set Names utf8", "select 1", + "select version()", "Register slave", "Binlog Dump" }; #define BLRS_CREATED 0x0000 #define BLRS_UNREGISTERED 0x0001 @@ -262,10 +283,13 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", #define CS_INNERLOOP 0x0002 #define CS_UPTODATE 0x0004 #define CS_EXPECTCB 0x0008 +#define CS_DIST 0x0010 +#define CS_DISTLATCH 0x0020 /** * MySQL protocol OpCodes needed for replication */ +#define COM_QUIT 0x01 #define COM_QUERY 0x03 #define COM_REGISTER_SLAVE 0x15 #define COM_BINLOG_DUMP 0x12 From 531dfd017c8d6685888f9a6174db3e928cc8a8a6 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 29 Aug 2014 11:24:58 +0100 Subject: [PATCH 07/10] Addition of thread data to commands --- server/core/hashtable.c | 2 +- server/core/poll.c | 201 +++++++++++++++++++++++++++++- server/include/dcb.h | 1 + server/include/poll.h | 1 + server/modules/routing/debugcmd.c | 8 ++ 5 files changed, 210 insertions(+), 3 deletions(-) diff --git a/server/core/hashtable.c b/server/core/hashtable.c index 50857bfec..ce6c828ef 100644 --- a/server/core/hashtable.c +++ b/server/core/hashtable.c @@ -28,7 +28,7 @@ * and value and to free them. * * The hashtable is arrange as a set of linked lists, the number of linked - * lists beign the hashsize as requested by the user. Entries are hashed by + * lists being the hashsize as requested by the user. Entries are hashed by * calling the hash function that is passed in by the user, this is used as * an index into the array of linked lists, usign modulo hashsize. * diff --git a/server/core/poll.c b/server/core/poll.c index 17882d1ed..80c424885 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -28,6 +28,7 @@ #include #include #include +#include extern int lm_enabled_logfiles_bitmask; @@ -41,6 +42,8 @@ extern int lm_enabled_logfiles_bitmask; * 19/06/13 Mark Riddoch Initial implementation * 28/06/13 Mark Riddoch Added poll mask support and DCB * zombie management + * 29/08/14 Mark Riddoch Addition of thread status data, load average + * etc. * * @endverbatim */ @@ -51,6 +54,45 @@ static GWBITMASK poll_mask; static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ static int n_waiting = 0; /*< No. of threads in epoll_wait */ +/** + * Thread load average, this is the average number of descriptors in each + * poll completion, a value of 1 or less is he ideal. + */ +static double load_average = 0.0; +static int load_samples = 0; + +/* Thread statistics data */ +static int n_threads; /*< No. of threads */ + +/** + * Internal MaxScale thread states + */ +typedef enum { THREAD_STOPPED, THREAD_IDLE, + THREAD_POLLING, THREAD_PROCESSING, + THREAD_ZPROCESSING } THREAD_STATE; + +/** + * Thread data used to report the current state and activity related to + * a thread + */ +typedef struct { + THREAD_STATE state; /*< Current thread state */ + int n_fds; /*< No. of descriptors thread is processing */ + DCB *cur_dcb; /*< Current DCB being processed */ + uint32_t event; /*< Current event being processed */ +} THREAD_DATA; + +static THREAD_DATA *thread_data = NULL; /*< Status of each thread */ + +/** + * The number of buckets used to gather statistics about how many + * descriptors where processed on each epoll completion. + * + * An array of wakeup counts is created, with the number of descriptors used + * to index that array. Each time a completion occurs the n_fds - 1 value is + * used to index this array and increment the count held there. + * If n_fds - 1 >= MAXFDS then the count at MAXFDS -1 is incremented. + */ #define MAXNFDS 10 /** @@ -77,6 +119,8 @@ static struct { void poll_init() { +int i; + if (epoll_fd != -1) return; if ((epoll_fd = epoll_create(MAX_EVENTS)) == -1) @@ -86,6 +130,15 @@ poll_init() } memset(&pollStats, 0, sizeof(pollStats)); bitmask_init(&poll_mask); + n_threads = config_threadcount(); + if ((thread_data = + (THREAD_DATA *)malloc(n_threads * sizeof(THREAD_DATA))) != NULL) + { + for (i = 0; i < n_threads; i++) + { + thread_data[i].state = THREAD_STOPPED; + } + } simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex"); } @@ -260,6 +313,10 @@ DCB *zombies = NULL; /* Add this thread to the bitmask of running polling threads */ bitmask_set(&poll_mask, thread_id); + if (thread_data) + { + thread_data[thread_id].state = THREAD_IDLE; + } while (1) { @@ -280,6 +337,10 @@ DCB *zombies = NULL; #if 0 simple_mutex_lock(&epoll_wait_mutex, TRUE); #endif + if (thread_data) + { + thread_data[thread_id].state = THREAD_POLLING; + } if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1) { @@ -339,8 +400,19 @@ DCB *zombies = NULL; pthread_self(), nfds))); atomic_add(&pollStats.n_polls, 1); + if (thread_data) + { + thread_data[thread_id].n_fds = nfds; + thread_data[thread_id].cur_dcb = NULL; + thread_data[thread_id].event = 0; + thread_data[thread_id].state = THREAD_PROCESSING; + } - pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS)]++; + pollStats.n_fds[(nfds < MAXNFDS ? (nfds - 1) : MAXNFDS - 1)]++; + + load_average = (load_average * load_samples + nfds) + / (load_samples + 1); + atomic_add(&load_samples, 1); for (i = 0; i < nfds; i++) { @@ -348,6 +420,11 @@ DCB *zombies = NULL; __uint32_t ev = events[i].events; CHK_DCB(dcb); + if (thread_data) + { + thread_data[thread_id].cur_dcb = dcb; + thread_data[thread_id].event = ev; + } #if defined(SS_DEBUG) if (dcb_fake_write_ev[dcb->fd] != 0) { @@ -531,6 +608,10 @@ DCB *zombies = NULL; no_op = FALSE; } process_zombies: + if (thread_data) + { + thread_data[thread_id].state = THREAD_ZPROCESSING; + } zombies = dcb_process_zombies(thread_id); if (zombies == NULL) { @@ -543,9 +624,17 @@ DCB *zombies = NULL; * Remove the thread from the bitmask of running * polling threads. */ + if (thread_data) + { + thread_data[thread_id].state = THREAD_STOPPED; + } bitmask_clear(&poll_mask, thread_id); return; } + if (thread_data) + { + thread_data[thread_id].state = THREAD_IDLE; + } } /*< while(1) */ } @@ -600,6 +689,114 @@ int i; { dcb_printf(dcb, "\t%2d\t\t\t%d\n", i + 1, pollStats.n_fds[i]); } - dcb_printf(dcb, "\t> %d\t\t\t%d\n", MAXNFDS, + dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS, pollStats.n_fds[MAXNFDS-1]); } + +/** + * Convert an EPOLL event mask into a printable string + * + * @param event The event mask + * @return A string representation, the caller must free the string + */ +static char * +event_to_string(uint32_t event) +{ +char *str; + + str = malloc(22); // 22 is max returned string length + if (str == NULL) + return NULL; + *str = 0; + if (event & EPOLLIN) + { + strcat(str, "IN"); + } + if (event & EPOLLOUT) + { + if (*str) + strcat(str, "|"); + strcat(str, "OUT"); + } + if (event & EPOLLERR) + { + if (*str) + strcat(str, "|"); + strcat(str, "ERR"); + } + if (event & EPOLLHUP) + { + if (*str) + strcat(str, "|"); + strcat(str, "HUP"); + } + if (event & EPOLLRDHUP) + { + if (*str) + strcat(str, "|"); + strcat(str, "RDHUP"); + } + + return str; +} + +/** + * Print the thread status for all the polling threads + * + * @param dcb The DCB to send the thread status data + */ +void +dShowThreads(DCB *dcb) +{ +int i; +char *state; + + + dcb_printf(dcb, "Polling Threads.\n\n"); + dcb_printf(dcb, "Thread Load Average: %.2f.\n", load_average); + if (thread_data == NULL) + return; + dcb_printf(dcb, " ID | State | # fds | Descriptor | Event\n"); + dcb_printf(dcb, "----+------------+--------+------------------+---------------\n"); + for (i = 0; i < n_threads; i++) + { + switch (thread_data[i].state) + { + case THREAD_STOPPED: + state = "Stopped"; + break; + case THREAD_IDLE: + state = "Idle"; + break; + case THREAD_POLLING: + state = "Polling"; + break; + case THREAD_PROCESSING: + state = "Processing"; + break; + case THREAD_ZPROCESSING: + state = "Collecting"; + break; + } + if (thread_data[i].state != THREAD_PROCESSING) + dcb_printf(dcb, + " %2d | %-10s | | |\n", + i, state); + else if (thread_data[i].cur_dcb == NULL) + dcb_printf(dcb, + " %2d | %-10s | %6d | |\n", + i, state, thread_data[i].n_fds); + else + { + char *event_string + = event_to_string(thread_data[i].event); + if (event_string == NULL) + event_string = "??"; + dcb_printf(dcb, + " %2d | %-10s | %6d | %-16p | %s\n", + i, state, thread_data[i].n_fds, + thread_data[i].cur_dcb, event_string); + free(event_string); + } + } +} diff --git a/server/include/dcb.h b/server/include/dcb.h index 559113e72..67fc9ddac 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -272,6 +272,7 @@ int fail_accept_errno; #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) void dcb_pollin(DCB *); +void dcb_pollout(DCB *); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) diff --git a/server/include/poll.h b/server/include/poll.h index e19be9c94..6524f1bbb 100644 --- a/server/include/poll.h +++ b/server/include/poll.h @@ -41,4 +41,5 @@ extern void poll_waitevents(void *); extern void poll_shutdown(); extern GWBITMASK *poll_bitmask(); extern void dprintPollStats(DCB *); +extern void dShowThreads(DCB *dcb); #endif diff --git a/server/modules/routing/debugcmd.c b/server/modules/routing/debugcmd.c index a9a5da12a..0a8fd2897 100644 --- a/server/modules/routing/debugcmd.c +++ b/server/modules/routing/debugcmd.c @@ -160,6 +160,10 @@ struct subcommand showoptions[] = { "Show all active sessions in MaxScale", "Show all active sessions in MaxScale", {0, 0, 0} }, + { "threads", 0, dShowThreads, + "Show the status of the polling threads in MaxScale", + "Show the status of the polling threads in MaxScale", + {0, 0, 0} }, { "users", 0, telnetdShowUsers, "Show statistics and user names for the debug interface", "Show statistics and user names for the debug interface", @@ -208,6 +212,10 @@ struct subcommand listoptions[] = { "List all the active sessions within MaxScale", "List all the active sessions within MaxScale", {0, 0, 0} }, + { "threads", 0, dShowThreads, + "List the status of the polling threads in MaxScale", + "List the status of the polling threads in MaxScale", + {0, 0, 0} }, { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; From 877442c9417db222b1a9d278de63412696904d7b Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 29 Aug 2014 16:53:05 +0100 Subject: [PATCH 08/10] Addition of periodic task execution via the housekeeper thread Addition of 15, 5 and 1 minute load averages in poll --- server/core/Makefile | 5 +- server/core/gateway.c | 7 ++ server/core/housekeeper.c | 195 +++++++++++++++++++++++++++++++++++ server/core/poll.c | 86 ++++++++++++++- server/include/housekeeper.h | 50 +++++++++ 5 files changed, 338 insertions(+), 5 deletions(-) create mode 100644 server/core/housekeeper.c create mode 100644 server/include/housekeeper.h diff --git a/server/core/Makefile b/server/core/Makefile index ec1ebff93..939dc070e 100644 --- a/server/core/Makefile +++ b/server/core/Makefile @@ -34,6 +34,7 @@ # gateway needs mysql client lib, not qc. # 24/07/13 Mark Ridoch Addition of encryption routines # 30/05/14 Mark Ridoch Filter API added +# 29/08/14 Mark Riddoch Added housekeeper include ../../build_gateway.inc @@ -63,7 +64,7 @@ include ../../makefile.inc SRCS= atomic.c buffer.c spinlock.c gateway.c \ gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \ poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \ - monitor.c adminusers.c secrets.c filter.c modutil.c + monitor.c adminusers.c secrets.c filter.c modutil.c housekeeper.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gw.h ../modules/include/mysql_client_server_protocol.h \ @@ -71,7 +72,7 @@ HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/modules.h ../include/poll.h ../include/config.h \ ../include/users.h ../include/hashtable.h ../include/gwbitmask.h \ ../include/adminusers.h ../include/version.h ../include/maxscale.h \ - ../include/filter.h modutil.h + ../include/filter.h ../include/modutil.h ../include/housekeeper.h OBJ=$(SRCS:.c=.o) diff --git a/server/core/gateway.c b/server/core/gateway.c index fda19fff8..c0128beca 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -51,6 +51,7 @@ #include #include #include +#include #include #include @@ -1357,6 +1358,12 @@ int main(int argc, char **argv) log_flush_thr = thread_start( log_flush_cb, (void *)&log_flush_timeout_ms); + + /* + * Start the housekeeper thread + */ + hkinit(); + /*< * Start the polling threads, note this is one less than is * configured as the main thread will also poll. diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c new file mode 100644 index 000000000..6180f24a5 --- /dev/null +++ b/server/core/housekeeper.c @@ -0,0 +1,195 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include +#include +#include +#include + +/** + * @file housekeeper.c Provide a mechanism to run periodic tasks + * + * @verbatim + * Revision History + * + * Date Who Description + * 29/08/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +/** + * List of all tasks that need to be run + */ +static HKTASK *tasks = NULL; +/** + * Spinlock to protect the tasks list + */ +static SPINLOCK tasklock = SPINLOCK_INIT; + +static void hkthread(void *); + +/** + * Initialise the housekeeper thread + */ +void +hkinit() +{ + thread_start(hkthread, NULL); +} + +/** + * Add a new task to the housekeepers lists of tasks that should be + * run periodically. + * + * The task will be first run frequency seconds after this call is + * made and will the be executed repeatedly every frequency seconds + * until the task is removed. + * + * Task names must be unique. + * + * @param name The unique name for this housekeeper task + * @param taskfn The function to call for the task + * @param data Data to pass to the task function + * @param frequency How often to run the task, expressed in seconds + * @return Return the tiem in seconds when the task will be first run if the task was added, otherwise 0 + */ +int +hktask_add(char *name, void (*taskfn)(void *), void *data, int frequency) +{ +HKTASK *task, *ptr; + + if ((task = (HKTASK *)malloc(sizeof(HKTASK))) == NULL) + { + return 0; + } + if ((task->name = strdup(name)) == NULL) + { + free(task); + return 0; + } + task->task = taskfn; + task->data = data; + task->frequency = frequency; + task->nextdue = time(0) + frequency; + task->next = NULL; + spinlock_acquire(&tasklock); + ptr = tasks; + while (ptr && ptr->next) + { + if (strcmp(ptr->name, name) == 0) + { + spinlock_release(&tasklock); + free(task->name); + free(task); + return 0; + } + ptr = ptr->next; + } + if (ptr) + ptr->next = task; + else + tasks = task; + spinlock_release(&tasklock); + + return task->nextdue; +} + +/** + * Remove a named task from the housekeepers task list + * + * @param name The task name to remove + * @return Returns 0 if the task could not be removed + */ +int +hktask_remove(char *name) +{ +HKTASK *ptr, *lptr = NULL; + + spinlock_acquire(&tasklock); + ptr = tasks; + while (ptr && strcmp(ptr->name, name) != 0) + { + lptr = ptr; + ptr = ptr->next; + } + if (ptr && lptr) + lptr->next = ptr->next; + else if (ptr) + tasks = ptr->next; + spinlock_release(&tasklock); + + if (ptr) + { + free(ptr->name); + free(ptr); + return 1; + } + else + { + return 0; + } +} + + +/** + * The housekeeper thread implementation. + * + * This function is responsible for executing the housekeeper tasks. + * + * The implementation of the callng of the task functions is such that + * the tasks are called without the tasklock spinlock being held. This + * allows manipulation of the housekeeper task list during execution of + * one of the tasks. The resutl is that upon completion of a task the + * search for tasks to run must restart from the start of the queue. + * It is vital that the task->nextdue tiem is updated before the task + * is run. + * + * @param data Unused, here to satisfy the thread system + */ +void +hkthread(void *data) +{ +HKTASK *ptr; +time_t now; +void (*taskfn)(void *); +void *taskdata; + + for (;;) + { + thread_millisleep(1000); + now = time(0); + spinlock_acquire(&tasklock); + ptr = tasks; + while (ptr) + { + if (ptr->nextdue <= now) + { + ptr->nextdue = now + ptr->frequency; + taskfn = ptr->task; + taskdata = ptr->data; + spinlock_release(&tasklock); + (*taskfn)(taskdata); + spinlock_acquire(&tasklock); + ptr = tasks; + } + else + ptr = ptr->next; + } + spinlock_release(&tasklock); + } +} diff --git a/server/core/poll.c b/server/core/poll.c index 80c424885..2043cca62 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -29,6 +29,7 @@ #include #include #include +#include extern int lm_enabled_logfiles_bitmask; @@ -56,10 +57,15 @@ static int n_waiting = 0; /*< No. of threads in epoll_wait */ /** * Thread load average, this is the average number of descriptors in each - * poll completion, a value of 1 or less is he ideal. + * poll completion, a value of 1 or less is the ideal. */ static double load_average = 0.0; static int load_samples = 0; +static int load_nfds = 0; +static double current_avg = 0.0; +static double *avg_samples = NULL; +static int next_sample = 0; +static int n_avg_samples; /* Thread statistics data */ static int n_threads; /*< No. of threads */ @@ -110,6 +116,15 @@ static struct { n_fds value */ } pollStats; +/** + * How frequently to call the poll_loadav function used to monitor the load + * average of the poll subsystem. + */ +#define POLL_LOAD_FREQ 10 +/** + * Periodic function to collect load data for average calculations + */ +static void poll_loadav(void *); /** * Initialise the polling system we are using for the gateway. @@ -140,6 +155,13 @@ int i; } } simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex"); + + hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ); + n_avg_samples = 15 * 60 / POLL_LOAD_FREQ; + avg_samples = (double *)malloc(sizeof(double *) * n_avg_samples); + for (i = 0; i < n_avg_samples; i++) + avg_samples[i] = 0.0; + } /** @@ -413,6 +435,7 @@ DCB *zombies = NULL; load_average = (load_average * load_samples + nfds) / (load_samples + 1); atomic_add(&load_samples, 1); + atomic_add(&load_nfds, nfds); for (i = 0; i < nfds; i++) { @@ -748,12 +771,41 @@ char *str; void dShowThreads(DCB *dcb) { -int i; +int i, j, n; char *state; +double avg1 = 0.0, avg5 = 0.0, avg15 = 0.0; dcb_printf(dcb, "Polling Threads.\n\n"); - dcb_printf(dcb, "Thread Load Average: %.2f.\n", load_average); + dcb_printf(dcb, "Historic Thread Load Average: %.2f.\n", load_average); + dcb_printf(dcb, "Current Thread Load Average: %.2f.\n", current_avg); + + /* Average all the samples to get the 15 minute average */ + for (i = 0; i < n_avg_samples; i++) + avg15 += avg_samples[i]; + avg15 = avg15 / n_avg_samples; + + /* Average the last third of the samples to get the 5 minute average */ + n = 5 * 60 / POLL_LOAD_FREQ; + i = next_sample - (n + 1); + if (i < 0) + i += n_avg_samples; + for (j = i; j < i + n; j++) + avg5 += avg_samples[j % n_avg_samples]; + avg5 = (3 * avg5) / (n_avg_samples); + + /* Average the last 15th of the samples to get the 1 minute average */ + n = 60 / POLL_LOAD_FREQ; + i = next_sample - (n + 1); + if (i < 0) + i += n_avg_samples; + for (j = i; j < i + n; j++) + avg1 += avg_samples[j % n_avg_samples]; + avg1 = (15 * avg1) / (n_avg_samples); + + dcb_printf(dcb, "15 Minute Average: %.2f, 5 Minute Average: %.2f, " + "1 Minute Average: %.2f\n\n", avg15, avg5, avg1); + if (thread_data == NULL) return; dcb_printf(dcb, " ID | State | # fds | Descriptor | Event\n"); @@ -800,3 +852,31 @@ char *state; } } } + +/** + * The function used to calculate time based load data. This is called by the + * housekeeper every POLL_LOAD_FREQ seconds. + * + * @param data Argument required by the housekeeper but not used here + */ +static void +poll_loadav(void *data) +{ +static int last_samples = 0, last_nfds = 0; +int new_samples, new_nfds; + + new_samples = load_samples - last_samples; + new_nfds = load_nfds - last_nfds; + last_samples = load_samples; + last_nfds = load_nfds; + + /* POLL_LOAD_FREQ average is... */ + if (new_samples) + current_avg = new_nfds / new_samples; + else + current_avg = 0.0; + avg_samples[next_sample] = current_avg; + next_sample++; + if (next_sample >= n_avg_samples) + next_sample = 0; +} diff --git a/server/include/housekeeper.h b/server/include/housekeeper.h new file mode 100644 index 000000000..597f19a91 --- /dev/null +++ b/server/include/housekeeper.h @@ -0,0 +1,50 @@ +#ifndef _HOUSEKEEPER_H +#define _HOUSEKEEPER_H +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2014 + */ +#include + +/** + * @file housekeeper.h A mechanism to have task run periodically + * + * @verbatim + * Revision History + * + * Date Who Description + * 29/08/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +/** + * The housekeeper task list + */ +typedef struct hktask { + char *name; /*< A simple task name */ + void (*task)(void *data); /*< The task to call */ + void *data; /*< Data to pass the task */ + int frequency; /*< How often to call the tasks (seconds) */ + time_t nextdue; /*< When the task should be next run */ + struct hktask + *next; /*< Next task in the list */ +} HKTASK; + +extern void hkinit(); +extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency); +extern int hktask_remove(char *name); +#endif From c273988e517de0acc17b994385c7eb85b759f7ac Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 5 Sep 2014 17:29:17 +0100 Subject: [PATCH 09/10] Fixed for EPOLLHUP events Possible double free in maxscaled on close binlog router debugging/tracing --- server/core/buffer.c | 4 + server/core/dcb.c | 52 ++++++------ server/core/poll.c | 8 +- server/include/dcb.h | 4 +- server/modules/include/blr.h | 1 + server/modules/protocol/maxscaled.c | 5 +- server/modules/routing/Makefile | 2 + server/modules/routing/binlog/blr.c | 9 ++- server/modules/routing/binlog/blr_file.c | 41 +++++++++- server/modules/routing/binlog/blr_master.c | 93 +++++++++++++++------- server/modules/routing/binlog/blr_slave.c | 4 +- 11 files changed, 160 insertions(+), 63 deletions(-) diff --git a/server/core/buffer.c b/server/core/buffer.c index 8487aaf2e..db49517f1 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -308,6 +308,8 @@ int rval = 0; /** * Trim bytes form the end of a GWBUF structure * + * This routine assumes the buffer is not part of a chain + * * @param buf The buffer to trim * @param nbytes The number of bytes to trim off * @return The buffer chain @@ -315,6 +317,8 @@ int rval = 0; GWBUF * gwbuf_trim(GWBUF *buf, unsigned int n_bytes) { + ss_dassert(buf->next == NULL); + if (GWBUF_LENGTH(buf) <= n_bytes) { gwbuf_consume(buf, GWBUF_LENGTH(buf)); diff --git a/server/core/dcb.c b/server/core/dcb.c index 402c34dc5..ff17116fb 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -388,11 +388,6 @@ DCB_CALLBACK *cb; } spinlock_release(&dcb->cb_lock); - if (dcb->dcb_readqueue) - { - GWBUF* queue = dcb->dcb_readqueue; - while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); - } bitmask_free(&dcb->memdata.bitmask); simple_mutex_done(&dcb->dcb_read_lock); simple_mutex_done(&dcb->dcb_write_lock); @@ -411,7 +406,7 @@ DCB_CALLBACK *cb; * * @param threadid The thread ID of the caller */ -DCB* +DCB * dcb_process_zombies(int threadid) { DCB *ptr, *lptr; @@ -1255,6 +1250,12 @@ void dprintAllDCBs(DCB *pdcb) DCB *dcb; spinlock_acquire(&dcbspin); +#if SPINLOCK_PROFILE + dcb_printf(pdcb, "DCB List Spinlock Statistics:\n"); + spinlock_stats(&dcbspin, spin_reporter, pdcb); + dcb_printf(pdcb, "Zombie Queue Lock Statistics:\n"); + spinlock_stats(&zombiespin, spin_reporter, pdcb); +#endif dcb = allDCBs; while (dcb) { @@ -1280,7 +1281,7 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); + dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); @@ -1304,21 +1305,20 @@ DCB *dcb; spinlock_acquire(&dcbspin); dcb = allDCBs; dcb_printf(pdcb, "Descriptor Control Blocks\n"); - dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); - dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n", + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); + dcb_printf(pdcb, " %-16s | %-26s | %-18s | %s\n", "DCB", "State", "Service", "Remote"); - dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); while (dcb) { - dcb_printf(pdcb, " %-14p | %-26s | %-20s | %s\n", + dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", dcb, gw_dcb_state2string(dcb->state), - (dcb->session->service ? dcb->session->service->name : ""), - (dcb->session->service ? - dcb->session->service->name : ""), + + ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), (dcb->remote ? dcb->remote : "")); dcb = dcb->next; } - dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n\n"); + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n\n"); spinlock_release(&dcbspin); } @@ -1335,16 +1335,16 @@ DCB *dcb; spinlock_acquire(&dcbspin); dcb = allDCBs; dcb_printf(pdcb, "Client Connections\n"); - dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n"); - dcb_printf(pdcb, " %-15s | %-10s | %-20s | %s\n", + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); + dcb_printf(pdcb, " %-15s | %-16s | %-20s | %s\n", "Client", "DCB", "Service", "Session"); - dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n"); + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); while (dcb) { if (dcb_isclient(dcb) && dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) { - dcb_printf(pdcb, " %-15s | %10p | %-20s | %10p\n", + dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", (dcb->remote ? dcb->remote : ""), dcb, (dcb->session->service ? dcb->session->service->name : ""), @@ -1352,7 +1352,7 @@ DCB *dcb; } dcb = dcb->next; } - dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n\n"); + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n\n"); spinlock_release(&dcbspin); } @@ -1392,7 +1392,7 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_accepts); dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_readrechecks); - dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); + dcb_printf(pdcb, "\t\tNo. of busy write polls: %d\n", dcb->stats.n_busywrpolls); dcb_printf(pdcb, "\t\tNo. of write rechecks: %d\n", dcb->stats.n_writerechecks); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); @@ -1929,7 +1929,7 @@ int rval = 0; * @param dcb The DCB that has data available */ void -dcb_pollin(DCB *dcb) +dcb_pollin(DCB *dcb, int thread_id) { spinlock_acquire(&dcb->pollinlock); @@ -1938,7 +1938,10 @@ dcb_pollin(DCB *dcb) dcb->pollinbusy = 1; do { if (dcb->readcheck) + { dcb->stats.n_readrechecks++; + dcb_process_zombies(thread_id); + } dcb->readcheck = 0; spinlock_release(&dcb->pollinlock); dcb->func.read(dcb); @@ -1970,7 +1973,7 @@ dcb_pollin(DCB *dcb) * @param dcb The DCB thats available for writes */ void -dcb_pollout(DCB *dcb) +dcb_pollout(DCB *dcb, int thread_id) { spinlock_acquire(&dcb->polloutlock); @@ -1979,7 +1982,10 @@ dcb_pollout(DCB *dcb) dcb->polloutbusy = 1; do { if (dcb->writecheck) + { + dcb_process_zombies(thread_id); dcb->stats.n_writerechecks++; + } dcb->writecheck = 0; spinlock_release(&dcb->polloutlock); dcb->func.write_ready(dcb); diff --git a/server/core/poll.c b/server/core/poll.c index 2043cca62..3d11b6421 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -181,7 +181,7 @@ poll_add_dcb(DCB *dcb) CHK_DCB(dcb); - ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; + ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; ev.data.ptr = dcb; /*< @@ -501,7 +501,7 @@ DCB *zombies = NULL; #else atomic_add(&pollStats.n_write, 1); - dcb_pollout(dcb); + dcb_pollout(dcb, thread_id); #endif } else { LOGIF(LD, (skygw_log_write( @@ -551,7 +551,7 @@ DCB *zombies = NULL; #if MUTEX_BLOCK dcb->func.read(dcb); #else - dcb_pollin(dcb); + dcb_pollin(dcb, thread_id); #endif } #if MUTEX_BLOCK @@ -706,7 +706,7 @@ int i; dcb_printf(dcb, "Number of times no threads polling: %d\n", pollStats.n_nothreads); - dcb_printf(dcb, "No of poll completions with dscriptors\n"); + dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); for (i = 0; i < MAXNFDS - 1; i++) { diff --git a/server/include/dcb.h b/server/include/dcb.h index 67fc9ddac..5597b860b 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -271,8 +271,8 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -void dcb_pollin(DCB *); -void dcb_pollout(DCB *); +void dcb_pollin(DCB *, int); +void dcb_pollout(DCB *, int); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 0327a34ed..9aaf450af 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -216,6 +216,7 @@ typedef struct router_instance { unsigned int high_water; /*< High water mark for client DCB */ BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ + SPINLOCK alock; int active_logs; int reconnect_pending; GWBUF *queue; diff --git a/server/modules/protocol/maxscaled.c b/server/modules/protocol/maxscaled.c index 738c78111..f580764f8 100644 --- a/server/modules/protocol/maxscaled.c +++ b/server/modules/protocol/maxscaled.c @@ -235,6 +235,7 @@ maxscaled_error(DCB *dcb) static int maxscaled_hangup(DCB *dcb) { + dcb_close(dcb); return 0; } @@ -313,9 +314,11 @@ maxscaled_close(DCB *dcb) MAXSCALED *maxscaled = dcb->protocol; if (maxscaled && maxscaled->username) + { free(maxscaled->username); + maxscaled->username = NULL; + } - dcb_close(dcb); return 0; } diff --git a/server/modules/routing/Makefile b/server/modules/routing/Makefile index 6dbb9cd4f..b18428bf5 100644 --- a/server/modules/routing/Makefile +++ b/server/modules/routing/Makefile @@ -75,10 +75,12 @@ libreadwritesplit.so: clean: rm -f $(OBJ) $(MODULES) (cd readwritesplit; touch depend.mk; make clean) + (cd binlog; touch depend.mk; make clean) tags: ctags $(SRCS) $(HDRS) (cd readwritesplit; make tags) + (cd binlog; make tags) depend: @rm -f depend.mk diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 1633c374e..94bcbc6cd 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -275,8 +275,13 @@ int i; instances = inst; spinlock_release(&instlock); + spinlock_init(&inst->alock); inst->active_logs = 0; inst->reconnect_pending = 0; + inst->queue = NULL; + inst->residual = NULL; + inst->slaves = NULL; + inst->next = NULL; /* * Initialise the binlog file and position @@ -347,7 +352,7 @@ ROUTER_SLAVE *slave; slave->overrun = 0; spinlock_init(&slave->catch_lock); slave->dcb = session->client; - slave->router = instance; + slave->router = inst; /** * Add this session to the list of active sessions. @@ -606,6 +611,8 @@ struct tm tm; spinlock_stats(&instlock, spin_reporter, dcb); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); spinlock_stats(&router_inst->lock, spin_reporter, dcb); + dcb_printf(dcb, "\tSpinlock statistics (active log lock):\n"); + spinlock_stats(&router_inst->alock, spin_reporter, dcb); #endif if (router_inst->slaves) diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index a17ea5e92..4f7232e64 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -160,6 +160,11 @@ unsigned char magic[] = BINLOG_MAGIC; { write(fd, magic, 4); } + else + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to create binlog file %s\n", path))); + } fsync(fd); close(router->binlog_fd); strcpy(router->binlog_name, file); @@ -190,7 +195,13 @@ int fd; strcat(path, "/"); strcat(path, file); - fd = open(path, O_RDWR|O_APPEND, 0666); + if ((fd = open(path, O_RDWR|O_APPEND, 0666)) == -1) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to open binlog file %s for append.\n", + path))); + return; + } fsync(fd); close(router->binlog_fd); strcpy(router->binlog_name, file); @@ -227,6 +238,7 @@ int blr_open_binlog(ROUTER_INSTANCE *router, char *binlog) { char *ptr, path[1024]; +int rval; strcpy(path, "/usr/local/skysql/MaxScale"); if ((ptr = getenv("MAXSCALE_HOME")) != NULL) @@ -238,7 +250,13 @@ char *ptr, path[1024]; strcat(path, "/"); strcat(path, binlog); - return open(path, O_RDONLY, 0666); + if ((rval = open(path, O_RDONLY, 0666)) == -1) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to open binlog file %s\n", path))); + } + + return rval; } /** @@ -255,6 +273,7 @@ blr_read_binlog(int fd, unsigned int pos, REP_HEADER *hdr) uint8_t hdbuf[19]; GWBUF *result; unsigned char *data; +int n; if (lseek(fd, pos, SEEK_SET) != pos) { @@ -265,11 +284,16 @@ unsigned char *data; } /* Read the header information from the file */ - if (read(fd, hdbuf, 19) != 19) + if ((n = read(fd, hdbuf, 19)) != 19) { LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Failed to read header for binlog entry, " "at %d (%s).\n", pos, strerror(errno)))); + if (n> 0 && n < 19) + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Short read when reading the header. " + "Expected 19 bytes got %d bytes.\n", + n))); return NULL; } hdr->timestamp = extract_field(hdbuf, 32); @@ -288,7 +312,16 @@ unsigned char *data; } data = GWBUF_DATA(result); memcpy(data, hdbuf, 19); // Copy the header in - read(fd, &data[19], hdr->event_size - 19); // Read the balance + if ((n = read(fd, &data[19], hdr->event_size - 19)) + != hdr->event_size - 19) // Read the balance + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Short read when reading the event at %d. " + "Expected %d bytes got %d bytes.\n", + pos, n))); + gwbuf_consume(result, hdr->event_size); + return NULL; + } return result; } diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index ef32f5ccf..d9bc40f48 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -153,10 +153,10 @@ GWBUF *ptr; } router->queue = NULL; /* Now it is safe to unleash other threads on this router instance */ - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); router->reconnect_pending = 0; router->active_logs = 0; - spinlock_release(&router->lock); + spinlock_release(&router->alock); blr_start_master(router); } @@ -175,7 +175,7 @@ blr_master_reconnect(ROUTER_INSTANCE *router) { int do_reconnect = 0; - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); if (router->active_logs) { /* Currently processing a response, set a flag @@ -190,13 +190,13 @@ int do_reconnect = 0; router->active_logs = 1; do_reconnect = 1; } - spinlock_release(&router->lock); + spinlock_release(&router->alock); if (do_reconnect) { blr_restart_master(router); - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); router->active_logs = 0; - spinlock_release(&router->lock); + spinlock_release(&router->alock); } } @@ -231,10 +231,9 @@ char query[128]; * manipulate the queue or the flag the router spinlock must * be held. */ - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); if (router->active_logs) { - int length; /* * Thread already processing a packet and has not got * to the point that it will not look at new packets @@ -242,21 +241,32 @@ char query[128]; */ router->stats.n_queueadd++; router->queue = gwbuf_append(router->queue, buf); - length = gwbuf_length(router->queue); - spinlock_release(&router->lock); + spinlock_release(&router->alock); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Queued data due to active log " "handling. %s @ %d, queue length %d\n", router->binlog_name, router->binlog_position, - length))); + gwbuf_length(router->queue)))); return; } else { router->active_logs = 1; + if (router->queue) + { + GWBUF *tmp; + + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Found an unexpected queue item" + " prepending queue of length %d.\n", + gwbuf_length(router->queue)))); + tmp = gwbuf_append(router->queue, buf); + buf = tmp; + router->queue = NULL; + } } - spinlock_release(&router->lock); + spinlock_release(&router->alock); if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { @@ -264,15 +274,16 @@ char query[128]; LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); if (router->reconnect_pending) { - spinlock_release(&router->lock); + router->active_logs = 0; + spinlock_release(&router->alock); blr_restart_master(router); return; } router->active_logs = 0; - spinlock_release(&router->lock); + spinlock_release(&router->alock); return; } @@ -284,15 +295,15 @@ char query[128]; MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); router->active_logs = 0; if (router->reconnect_pending) { - spinlock_release(&router->lock); + spinlock_release(&router->alock); blr_restart_master(router); return; } - spinlock_release(&router->lock); + spinlock_release(&router->alock); return; } do { @@ -399,7 +410,7 @@ char query[128]; /* * Check for messages queued by other threads. */ - spinlock_acquire(&router->lock); + spinlock_acquire(&router->alock); if ((buf = router->queue) != NULL) { router->queue = NULL; @@ -408,16 +419,14 @@ char query[128]; { if (router->reconnect_pending) { - spinlock_release(&router->lock); blr_restart_master(router); - spinlock_acquire(&router->lock); } else { router->active_logs = 0; } } - spinlock_release(&router->lock); + spinlock_release(&router->alock); } while (buf != NULL); } @@ -543,8 +552,12 @@ uint8_t *msg = NULL, *ptr, *pdata; REP_HEADER hdr; int len, reslen; int no_residual = 1; +int preslen = -1; +int prev_length = -1; +int n_bufs = -1, pn_bufs = -1; - /* Prepend any residual buffer to the buffer chain we have + /* + * Prepend any residual buffer to the buffer chain we have * been called with. */ if (router->residual) @@ -606,6 +619,7 @@ int no_residual = 1; break; } + n_bufs = 0; ptr = msg; while (p && remainder > 0) { @@ -616,6 +630,7 @@ int no_residual = 1; ptr += n; if (remainder > 0) p = p->next; + n_bufs++; } if (remainder) { @@ -626,6 +641,8 @@ int no_residual = 1; "message as expected. %s @ %d\n", router->binlog_name, router->binlog_position))); + free(msg); + msg = NULL; break; } @@ -653,6 +670,7 @@ int no_residual = 1; * The message is fully contained in the current buffer */ ptr = pdata; + n_bufs = 1; } blr_extract_header(ptr, &hdr); @@ -662,11 +680,27 @@ int no_residual = 1; LOGIF(LE,(skygw_log_write( LOGFILE_ERROR, "Packet length is %d, but event size is %d, " - "binlog file %s position %d", + "binlog file %s position %d" + "reslen is %d and preslen is %d, " + "length of previous event %d. %s", len, hdr.event_size, router->binlog_name, - router->binlog_position))); + router->binlog_position, + reslen, preslen, prev_length, + (prev_length == -1 ? + (no_residual ? "No residual data from previous call" : "Residual data from previous call") : "") + ))); blr_log_packet(LOGFILE_ERROR, "Packet:", ptr, len); + LOGIF(LE,(skygw_log_write( + LOGFILE_ERROR, + "This event (0x%x) was contained in %d GWBUFs, " + "the previous events was contained in %d GWBUFs", + router->lastEventReceived, n_bufs, pn_bufs))); + if (msg) + { + free(msg); + msg = NULL; + } break; } if (hdr.ok == 0) @@ -768,6 +802,7 @@ int no_residual = 1; free(msg); msg = NULL; } + prev_length = len; while (len > 0) { int n, plen; @@ -776,6 +811,8 @@ int no_residual = 1; pkt = gwbuf_consume(pkt, n); len -= n; } + preslen = reslen; + pn_bufs = n_bufs; } /* @@ -846,7 +883,9 @@ char file[BINLOG_FNAMELEN+1]; ptr += 19; // Skip event header len = hdr->event_size - 19; // Event size minus header - pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); + pos = extract_field(ptr+4, 32); + pos <<= 32; + pos |= extract_field(ptr, 32); slen = len - 8; if (slen > BINLOG_FNAMELEN) slen = BINLOG_FNAMELEN; @@ -905,7 +944,7 @@ MYSQL_session *auth_info; static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { -GWBUF *pkt, *distq; +GWBUF *pkt; uint8_t *buf; ROUTER_SLAVE *slave; int action; diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 30aebfe65..176efbe4c 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -883,6 +883,7 @@ if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_posit LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n"))); goto doitagain; } + return rval; } /** @@ -936,7 +937,8 @@ void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr) { ptr += 19; // Skip header - slave->binlog_pos = extract_field(ptr, 32) + (extract_field(ptr+4, 32) << 32); + slave->binlog_pos = extract_field(ptr, 32); + slave->binlog_pos += (extract_field(ptr+4, 32) << 32); memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN); slave->binlogfile[BINLOG_FNAMELEN] = 0; } From f9aece511306ff03b081f00cde322a0179ef16a6 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 10 Sep 2014 15:51:53 +0100 Subject: [PATCH 10/10] Remove queuing that is no longer required Fixed bug in mysql_backend.c that lost up to 4 bytes of data in rare circumstances. --- server/modules/include/blr.h | 4 +- server/modules/protocol/mysql_backend.c | 2 +- server/modules/routing/binlog/blr.c | 23 +- server/modules/routing/binlog/blr_master.c | 350 +++++++++------------ 4 files changed, 155 insertions(+), 224 deletions(-) diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 9aaf450af..f493ec715 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -126,7 +126,6 @@ typedef struct { int n_registered; /*< Number of registered slaves */ int n_masterstarts; /*< Number of times connection restarted */ int n_delayedreconnects; - int n_queueadd; /*< Number of times incoming data was added to processign queue */ int n_residuals; /*< Number of times residual data was buffered */ int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; @@ -216,10 +215,9 @@ typedef struct router_instance { unsigned int high_water; /*< High water mark for client DCB */ BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ - SPINLOCK alock; int active_logs; int reconnect_pending; - GWBUF *queue; + int handling_threads; struct router_instance *next; } ROUTER_INSTANCE; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 5d8088d4e..44b92b1b9 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -497,7 +497,7 @@ static int gw_read_backend_event(DCB *dcb) { { if (nbytes_read < 5) { - gwbuf_append(dcb->dcb_readqueue, read_buffer); + dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer); rc = 0; goto return_rc; } diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 94bcbc6cd..dec20f8b4 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -75,9 +75,10 @@ static void clientReply( static void errorReply( ROUTER *instance, void *router_session, - char *message, + GWBUF *message, DCB *backend_dcb, - int action); + error_action_t action, + bool *succp); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -275,10 +276,9 @@ int i; instances = inst; spinlock_release(&instlock); - spinlock_init(&inst->alock); inst->active_logs = 0; inst->reconnect_pending = 0; - inst->queue = NULL; + inst->handling_threads = 0; inst->residual = NULL; inst->slaves = NULL; inst->next = NULL; @@ -582,10 +582,6 @@ struct tm tm; router_inst->stats.n_heartbeats); dcb_printf(dcb, "\tNumber of packets received: %u\n", router_inst->stats.n_reads); - dcb_printf(dcb, "\tNumber of packets queued: %u\n", - router_inst->stats.n_queueadd); - dcb_printf(dcb, "\tCurrent length of incoming queue: %d\n", - gwbuf_length(router_inst->queue)); dcb_printf(dcb, "\tNumber of residual data packets: %u\n", router_inst->stats.n_residuals); dcb_printf(dcb, "\tAverage events per packet %.1f\n", @@ -611,8 +607,6 @@ struct tm tm; spinlock_stats(&instlock, spin_reporter, dcb); dcb_printf(dcb, "\tSpinlock statistics (instance lock):\n"); spinlock_stats(&router_inst->lock, spin_reporter, dcb); - dcb_printf(dcb, "\tSpinlock statistics (active log lock):\n"); - spinlock_stats(&router_inst->alock, spin_reporter, dcb); #endif if (router_inst->slaves) @@ -710,18 +704,15 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; * @param message The error message to reply * @param backend_dcb The backend DCB * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * @param succp Result of action * */ static void -errorReply( - ROUTER *instance, - void *router_session, - char *message, - DCB *backend_dcb, - int action) +errorReply(ROUTER *instance, void *router_session, GWBUF *message, DCB *backend_dcb, error_action_t action, bool *succp) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Erorr Reply '%s'", message))); + *succp = false; } /** to be inline'd */ diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index d9bc40f48..412276e48 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -145,18 +145,11 @@ GWBUF *ptr; } router->residual = NULL; - /* Discard the queued data */ - ptr = router->queue; - while (ptr) - { - ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); - } - router->queue = NULL; /* Now it is safe to unleash other threads on this router instance */ - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->reconnect_pending = 0; router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); blr_start_master(router); } @@ -175,7 +168,7 @@ blr_master_reconnect(ROUTER_INSTANCE *router) { int do_reconnect = 0; - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); if (router->active_logs) { /* Currently processing a response, set a flag @@ -190,13 +183,13 @@ int do_reconnect = 0; router->active_logs = 1; do_reconnect = 1; } - spinlock_release(&router->alock); + spinlock_release(&router->lock); if (do_reconnect) { blr_restart_master(router); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); } } @@ -214,76 +207,29 @@ blr_master_response(ROUTER_INSTANCE *router, GWBUF *buf) { char query[128]; - /* - * We need to make sure that incoming packets (gwbufs) are - * strictly processed in order and that we do not have packets - * from the same master being processed on multiple threads. - * To do this we create a queue of the GWBUF structures and have - * a flag that indicates if this routine is processing a packet - * on another thread. Items will be added to the queue if the - * routine is running in another thread. That thread will read - * the queue before returning. - * - * The action of adding items to the queue is protected by a - * spinlock and a flag that inidicates if the routine running - * in the other thread has reached the point at which it will - * no longer check the queue before returning. In order to - * manipulate the queue or the flag the router spinlock must - * be held. - */ - spinlock_acquire(&router->alock); - if (router->active_logs) - { - /* - * Thread already processing a packet and has not got - * to the point that it will not look at new packets - * added to the queue. - */ - router->stats.n_queueadd++; - router->queue = gwbuf_append(router->queue, buf); - spinlock_release(&router->alock); - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, "Queued data due to active log " - "handling. %s @ %d, queue length %d\n", - router->binlog_name, - router->binlog_position, - gwbuf_length(router->queue)))); - return; - } - else - { - router->active_logs = 1; - if (router->queue) - { - GWBUF *tmp; - - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, "Found an unexpected queue item" - " prepending queue of length %d.\n", - gwbuf_length(router->queue)))); - tmp = gwbuf_append(router->queue, buf); - buf = tmp; - router->queue = NULL; - } - } - spinlock_release(&router->alock); - + atomic_add(&router->handling_threads, 1); + ss_dassert(router->handling_threads == 1); + spinlock_acquire(&router->lock); + router->active_logs = 1; + spinlock_release(&router->lock); if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); if (router->reconnect_pending) { router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); blr_restart_master(router); return; } router->active_logs = 0; - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); return; } @@ -295,139 +241,125 @@ char query[128]; MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] ))); gwbuf_consume(buf, gwbuf_length(buf)); - spinlock_acquire(&router->alock); + spinlock_acquire(&router->lock); router->active_logs = 0; if (router->reconnect_pending) { - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); blr_restart_master(router); return; } - spinlock_release(&router->alock); + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); return; } - do { - switch (router->master_state) - { - case BLRM_TIMESTAMP: - // Response to a timestamp message, no need to save this. - gwbuf_consume(buf, GWBUF_LENGTH(buf)); - buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); - router->master_state = BLRM_SERVERID; - router->master->func.write(router->master, buf); - break; - case BLRM_SERVERID: - // Response to fetch of master's server-id - router->saved_master.server_id = buf; - // TODO: Extract the value of server-id and place in router->master_id - buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); - router->master_state = BLRM_HBPERIOD; - router->master->func.write(router->master, buf); - break; - case BLRM_HBPERIOD: - // Response to set the heartbeat period - router->saved_master.heartbeat = buf; - buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); - router->master_state = BLRM_CHKSUM1; - router->master->func.write(router->master, buf); - break; - case BLRM_CHKSUM1: - // Response to set the master binlog checksum - router->saved_master.chksum1 = buf; - buf = blr_make_query("SELECT @master_binlog_checksum"); - router->master_state = BLRM_CHKSUM2; - router->master->func.write(router->master, buf); - break; - case BLRM_CHKSUM2: - // Response to the master_binlog_checksum, should be stored - router->saved_master.chksum2 = buf; - buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); - router->master_state = BLRM_GTIDMODE; - router->master->func.write(router->master, buf); - break; - case BLRM_GTIDMODE: - // Response to the GTID_MODE, should be stored - router->saved_master.gtid_mode = buf; - buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); - router->master_state = BLRM_MUUID; - router->master->func.write(router->master, buf); - break; - case BLRM_MUUID: - // Response to the SERVER_UUID, should be stored - router->saved_master.uuid = buf; - sprintf(query, "SET @slave_uuid='%s'", router->uuid); - buf = blr_make_query(query); - router->master_state = BLRM_SUUID; - router->master->func.write(router->master, buf); - break; - case BLRM_SUUID: - // Response to the SET @server_uuid, should be stored - router->saved_master.setslaveuuid = buf; - buf = blr_make_query("SET NAMES latin1"); - router->master_state = BLRM_LATIN1; - router->master->func.write(router->master, buf); - break; - case BLRM_LATIN1: - // Response to the SET NAMES latin1, should be stored - router->saved_master.setnames = buf; - buf = blr_make_query("SET NAMES utf8"); - router->master_state = BLRM_UTF8; - router->master->func.write(router->master, buf); - break; - case BLRM_UTF8: - // Response to the SET NAMES utf8, should be stored - router->saved_master.utf8 = buf; - buf = blr_make_query("SELECT 1"); - router->master_state = BLRM_SELECT1; - router->master->func.write(router->master, buf); - break; - case BLRM_SELECT1: - // Response to the SELECT 1, should be stored - router->saved_master.select1 = buf; - buf = blr_make_query("SELECT VERSION();"); - router->master_state = BLRM_SELECTVER; - router->master->func.write(router->master, buf); - break; - case BLRM_SELECTVER: - // Response to SELECT VERSION should be stored - router->saved_master.selectver = buf; - buf = blr_make_registration(router); - router->master_state = BLRM_REGISTER; - router->master->func.write(router->master, buf); - break; - case BLRM_REGISTER: - // Request a dump of the binlog file - buf = blr_make_binlog_dump(router); - router->master_state = BLRM_BINLOGDUMP; - router->master->func.write(router->master, buf); - break; - case BLRM_BINLOGDUMP: - // Main body, we have received a binlog record from the master - blr_handle_binlog_record(router, buf); - break; - } + switch (router->master_state) + { + case BLRM_TIMESTAMP: + // Response to a timestamp message, no need to save this. + gwbuf_consume(buf, GWBUF_LENGTH(buf)); + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_ID'"); + router->master_state = BLRM_SERVERID; + router->master->func.write(router->master, buf); + break; + case BLRM_SERVERID: + // Response to fetch of master's server-id + router->saved_master.server_id = buf; + // TODO: Extract the value of server-id and place in router->master_id + buf = blr_make_query("SET @master_heartbeat_period = 1799999979520"); + router->master_state = BLRM_HBPERIOD; + router->master->func.write(router->master, buf); + break; + case BLRM_HBPERIOD: + // Response to set the heartbeat period + router->saved_master.heartbeat = buf; + buf = blr_make_query("SET @master_binlog_checksum = @@global.binlog_checksum"); + router->master_state = BLRM_CHKSUM1; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM1: + // Response to set the master binlog checksum + router->saved_master.chksum1 = buf; + buf = blr_make_query("SELECT @master_binlog_checksum"); + router->master_state = BLRM_CHKSUM2; + router->master->func.write(router->master, buf); + break; + case BLRM_CHKSUM2: + // Response to the master_binlog_checksum, should be stored + router->saved_master.chksum2 = buf; + buf = blr_make_query("SELECT @@GLOBAL.GTID_MODE"); + router->master_state = BLRM_GTIDMODE; + router->master->func.write(router->master, buf); + break; + case BLRM_GTIDMODE: + // Response to the GTID_MODE, should be stored + router->saved_master.gtid_mode = buf; + buf = blr_make_query("SHOW VARIABLES LIKE 'SERVER_UUID'"); + router->master_state = BLRM_MUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_MUUID: + // Response to the SERVER_UUID, should be stored + router->saved_master.uuid = buf; + sprintf(query, "SET @slave_uuid='%s'", router->uuid); + buf = blr_make_query(query); + router->master_state = BLRM_SUUID; + router->master->func.write(router->master, buf); + break; + case BLRM_SUUID: + // Response to the SET @server_uuid, should be stored + router->saved_master.setslaveuuid = buf; + buf = blr_make_query("SET NAMES latin1"); + router->master_state = BLRM_LATIN1; + router->master->func.write(router->master, buf); + break; + case BLRM_LATIN1: + // Response to the SET NAMES latin1, should be stored + router->saved_master.setnames = buf; + buf = blr_make_query("SET NAMES utf8"); + router->master_state = BLRM_UTF8; + router->master->func.write(router->master, buf); + break; + case BLRM_UTF8: + // Response to the SET NAMES utf8, should be stored + router->saved_master.utf8 = buf; + buf = blr_make_query("SELECT 1"); + router->master_state = BLRM_SELECT1; + router->master->func.write(router->master, buf); + break; + case BLRM_SELECT1: + // Response to the SELECT 1, should be stored + router->saved_master.select1 = buf; + buf = blr_make_query("SELECT VERSION();"); + router->master_state = BLRM_SELECTVER; + router->master->func.write(router->master, buf); + break; + case BLRM_SELECTVER: + // Response to SELECT VERSION should be stored + router->saved_master.selectver = buf; + buf = blr_make_registration(router); + router->master_state = BLRM_REGISTER; + router->master->func.write(router->master, buf); + break; + case BLRM_REGISTER: + // Request a dump of the binlog file + buf = blr_make_binlog_dump(router); + router->master_state = BLRM_BINLOGDUMP; + router->master->func.write(router->master, buf); + break; + case BLRM_BINLOGDUMP: + // Main body, we have received a binlog record from the master + blr_handle_binlog_record(router, buf); + break; + } - /* - * Check for messages queued by other threads. - */ - spinlock_acquire(&router->alock); - if ((buf = router->queue) != NULL) - { - router->queue = NULL; - } - else - { - if (router->reconnect_pending) - { - blr_restart_master(router); - } - else - { - router->active_logs = 0; - } - } - spinlock_release(&router->alock); - } while (buf != NULL); + if (router->reconnect_pending) + blr_restart_master(router); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + atomic_add(&router->handling_threads, -1); } /** @@ -548,13 +480,15 @@ encode_value(unsigned char *data, unsigned int value, int len) static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { -uint8_t *msg = NULL, *ptr, *pdata; -REP_HEADER hdr; -int len, reslen; -int no_residual = 1; -int preslen = -1; -int prev_length = -1; -int n_bufs = -1, pn_bufs = -1; +uint8_t *msg = NULL, *ptr, *pdata; +REP_HEADER hdr; +unsigned int len, reslen; +unsigned int pkt_length; +int no_residual = 1; +int preslen = -1; +int prev_length = -1; +int n_bufs = -1, pn_bufs = -1; +static REP_HEADER phdr; /* * Prepend any residual buffer to the buffer chain we have @@ -567,7 +501,8 @@ int n_bufs = -1, pn_bufs = -1; no_residual = 0; } - while (pkt && gwbuf_length(pkt) > 24) + pkt_length = gwbuf_length(pkt); + while (pkt && pkt_length > 24) { reslen = GWBUF_LENGTH(pkt); pdata = GWBUF_DATA(pkt); @@ -595,7 +530,7 @@ int n_bufs = -1, pn_bufs = -1; len = extract_field(pdata, 24) + 4; } - if (reslen < len && gwbuf_length(pkt) >= len) + if (reslen < len && pkt_length >= len) { /* * The message is contained in more than the current @@ -703,6 +638,7 @@ int n_bufs = -1, pn_bufs = -1; } break; } + phdr = hdr; if (hdr.ok == 0) { router->stats.n_binlogs++; @@ -810,6 +746,7 @@ int n_bufs = -1, pn_bufs = -1; n = (plen < len ? plen : len); pkt = gwbuf_consume(pkt, n); len -= n; + pkt_length -= n; } preslen = reslen; pn_bufs = n_bufs; @@ -822,6 +759,11 @@ int n_bufs = -1, pn_bufs = -1; if (pkt) { router->residual = pkt; + ss_dassert(pkt_length != 0); + } + else + { + ss_dassert(pkt_length == 0); } blr_file_flush(router); } @@ -892,7 +834,7 @@ char file[BINLOG_FNAMELEN+1]; memcpy(file, ptr + 8, slen); file[slen] = 0; -#ifdef VEBOSE_ROTATE +#ifdef VERBOSE_ROTATE printf("binlog rotate: "); while (len--) printf("0x%02x ", *ptr++);