diff --git a/server/core/dcb.c b/server/core/dcb.c index 713cf4b3c..5dc11a374 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -84,7 +84,13 @@ static bool dcb_set_state_nomutex( dcb_state_t* old_state); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); -DCB* dcb_get_zombies(void) +/** + * Return the pointer to the lsit of zombie DCB's + * + * @return Zombies DCB list + */ +DCB * +dcb_get_zombies(void) { return zombies; } @@ -122,6 +128,9 @@ DCB *rval; spinlock_init(&rval->delayqlock); spinlock_init(&rval->authlock); spinlock_init(&rval->cb_lock); + spinlock_init(&rval->pollinlock); + rval->pollinbusy = 0; + rval->readcheck = 0; rval->fd = -1; memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; @@ -1147,12 +1156,14 @@ DCB *dcb; dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); dcb_printf(pdcb, "\tStatistics:\n"); - dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); - dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); - dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); - dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); - dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); + dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); + dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); + dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1211,6 +1222,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of busy polls: %d\n", dcb->stats.n_busypolls); + dcb_printf(pdcb, "\t\tNo. of read rechecks: %d\n", dcb->stats.n_busypolls); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", @@ -1711,3 +1724,36 @@ int rval = 0; return rval; } + +/** + * Called by the EPOLLIN event. Take care of calling the protocol + * read entry point and managing multiple threads copeting for the DCB + * without blocking those threads. + * + * @param dcb The DCB that has data available + */ +void +dcb_pollin(DCB *dcb) +{ + + spinlock_acquire(&dcb->pollinlock); + if (dcb->pollinbusy == 0) + { + dcb->pollinbusy = 1; + do { + if (dcb->readcheck) + dcb->stats.n_readrechecks++; + dcb->readcheck = 0; + spinlock_release(&dcb->pollinlock); + dcb->func.read(dcb); + spinlock_acquire(&dcb->pollinlock); + } while (dcb->readcheck); + dcb->pollinbusy = 0; + } + else + { + dcb->stats.n_busypolls++; + dcb->readcheck = 1; + } + spinlock_release(&dcb->pollinlock); +} diff --git a/server/core/poll.c b/server/core/poll.c index e86d656ad..f6100df4d 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -393,11 +393,13 @@ poll_waitevents(void *arg) } if (ev & EPOLLIN) { +#if MUTEX_BLOCK simple_mutex_lock(&dcb->dcb_read_lock, true); ss_info_dassert(!dcb->dcb_read_active, "Read already active"); dcb->dcb_read_active = TRUE; +#endif if (dcb->state == DCB_STATE_LISTENING) { @@ -421,11 +423,17 @@ poll_waitevents(void *arg) dcb, dcb->fd))); atomic_add(&pollStats.n_read, 1); +#if MUTEX_BLOCK dcb->func.read(dcb); +#else + dcb_pollin(dcb); +#endif } +#if MUTEX_BLOCK dcb->dcb_read_active = FALSE; simple_mutex_unlock( &dcb->dcb_read_lock); +#endif } if (ev & EPOLLERR) { diff --git a/server/include/dcb.h b/server/include/dcb.h index e7d2ec716..68b1f349c 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -105,12 +105,14 @@ typedef struct gw_protocol { * The statitics gathered on a descriptor control block */ typedef struct dcbstats { - int n_reads; /*< Number of reads on this descriptor */ - int n_writes; /*< Number of writes on this descriptor */ - int n_accepts; /*< Number of accepts on this descriptor */ - int n_buffered; /*< Number of buffered writes */ - int n_high_water; /*< Number of crosses of high water mark */ - int n_low_water; /*< Number of crosses of low water mark */ + int n_reads; /*< Number of reads on this descriptor */ + int n_writes; /*< Number of writes on this descriptor */ + int n_accepts; /*< Number of accepts on this descriptor */ + int n_buffered; /*< Number of buffered writes */ + int n_high_water; /*< Number of crosses of high water mark */ + int n_low_water; /*< Number of crosses of low water mark */ + int n_busypolls; /*< Number of read polls whiel reading */ + int n_readrechecks; /*< Number of rechecks for reads */ } DCBSTATS; /** @@ -226,6 +228,9 @@ typedef struct dcb { int command; /**< Specific client command type */ SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ + SPINLOCK pollinlock; + int pollinbusy; + int readcheck; unsigned int high_water; /**< High water mark */ unsigned int low_water; /**< Low water mark */ @@ -253,6 +258,7 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) +void dcb_pollin(DCB *); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) @@ -281,7 +287,7 @@ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_add_to_zombieslist(DCB* dcb); int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *); -int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), +int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), void *); int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 8e0939737..f4ec0f314 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -59,10 +59,10 @@ * Slave statistics */ typedef struct { - unsigned int n_events; /*< Number of events sent */ - unsigned int n_bursts; /*< Number of bursts sent */ - unsigned int n_requests; /*< Number of requests received */ - unsigned int n_flows; /*< Number of flow control restarts */ + int n_events; /*< Number of events sent */ + int n_bursts; /*< Number of bursts sent */ + int n_requests; /*< Number of requests received */ + int n_flows; /*< Number of flow control restarts */ } SLAVE_STATS; /** @@ -102,18 +102,19 @@ typedef struct router_slave { * The statistics for this router instance */ typedef struct { - unsigned int n_slaves; /*< Number slave sessions created */ - unsigned int n_reads; /*< Number of record reads */ + int n_slaves; /*< Number slave sessions created */ + int n_reads; /*< Number of record reads */ uint64_t n_binlogs; /*< Number of binlog records from master */ uint64_t n_binlog_errors;/*< Number of binlog records from master */ uint64_t n_rotates; /*< Number of binlog rotate events */ uint64_t n_cachehits; /*< Number of hits on the binlog cache */ uint64_t n_cachemisses; /*< Number of misses on the binlog cache */ - unsigned int n_registered; /*< Number of registered slaves */ + int n_registered; /*< Number of registered slaves */ int n_masterstarts; /*< Numebr of times connection restarted */ + int n_delayedreconnects; int n_queueadd; /*< Numebr of times incoming data was added to processign queue */ int n_residuals; /*< Number of times residual data was buffered */ - unsigned int n_heartbeats; /*< Number of heartbeat messages */ + int n_heartbeats; /*< Number of heartbeat messages */ time_t lastReply; uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t n_artificial; /*< Artificial events not written to disk */ @@ -199,6 +200,7 @@ typedef struct router_instance { BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ int active_logs; + int reconnect_pending; GWBUF *queue; struct router_instance *next; @@ -326,13 +328,17 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", */ extern void blr_start_master(ROUTER_INSTANCE *); extern void blr_master_response(ROUTER_INSTANCE *, GWBUF *); +extern void blr_master_reconnect(ROUTER_INSTANCE *); extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); - +extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr); +extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); extern void blr_init_cache(ROUTER_INSTANCE *); extern void blr_file_init(ROUTER_INSTANCE *); +extern int blr_open_binlog(ROUTER_INSTANCE *, char *); extern void blr_write_binlog_record(ROUTER_INSTANCE *, REP_HEADER *,uint8_t *); extern void blr_file_rotate(ROUTER_INSTANCE *, char *, uint64_t); +extern void blr_file_flush(ROUTER_INSTANCE *); extern GWBUF *blr_read_binlog(int, unsigned int, REP_HEADER *); #endif diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 762493788..e25f688ff 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -276,6 +276,7 @@ int i; spinlock_release(&instlock); inst->active_logs = 0; + inst->reconnect_pending = 0; /* * Initialise the binlog file and position @@ -443,11 +444,7 @@ ROUTER_SLAVE *slave = (ROUTER_SLAVE *)router_session; */ LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Binlog router close session with master"))); - router->master_state = BLRM_UNCONNECTED; - dcb_close(router->master); - dcb_free(router->master); - dcb_free(router->client); - blr_start_master(router); + blr_master_reconnect(router); return; } CHK_CLIENT_RSES(slave); @@ -538,6 +535,8 @@ struct tm tm; dcb_printf(dcb, "\tNumber of master connects: %d\n", router_inst->stats.n_masterstarts); + dcb_printf(dcb, "\tNumber of delayed reconnects: %d\n", + router_inst->stats.n_delayedreconnects); dcb_printf(dcb, "\tCurrent binlog file: %s\n", router_inst->binlog_name); dcb_printf(dcb, "\tCurrent binlog position: %u\n", @@ -578,6 +577,8 @@ struct tm tm; router_inst->lastEventReceived); if (router_inst->active_logs) dcb_printf(dcb, "\tRouter processing binlog records\n"); + if (router_inst->reconnect_pending) + dcb_printf(dcb, "\tRouter pending reconnect to master\n"); dcb_printf(dcb, "\tEvents received:\n"); for (i = 0; i < 0x24; i++) { diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index acb7b01cf..800535e65 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -120,6 +120,86 @@ perror("setsockopt"); router->stats.n_masterstarts++; } +/** + * Reconnect to the master server. + * + * IMPORTANT - must be called with router->active_logs set by the + * thread that set active_logs. + * + * @param router The router instance + */ +static void +blr_restart_master(ROUTER_INSTANCE *router) +{ +GWBUF *ptr; + + dcb_close(router->master); + dcb_free(router->master); + dcb_free(router->client); + + /* Discard the queued residual data */ + ptr = router->residual; + while (ptr) + { + ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); + } + router->residual = NULL; + + /* Discard the queued data */ + ptr = router->queue; + while (ptr) + { + ptr = gwbuf_consume(ptr, GWBUF_LENGTH(ptr)); + } + router->queue = NULL; + /* Now it is safe to unleash other threads on this router instance */ + spinlock_acquire(&router->lock); + router->reconnect_pending = 0; + router->active_logs = 0; + spinlock_release(&router->lock); + blr_start_master(router); +} + +/** + * Request a reconnect to the master. + * + * If another thread is active processing messages from the master + * then merely set a flag for that thread to do the restart. If no + * threads are active then directly call the restart routine to + * reconnect to the master. + * + * @param router The router instance + */ +void +blr_master_reconnect(ROUTER_INSTANCE *router) +{ +int do_reconnect = 0; + + spinlock_acquire(&router->lock); + if (router->active_logs) + { + /* Currently processing a response, set a flag + * and get the thread that is process a response + * to deal with the reconnect. + */ + router->reconnect_pending = 1; + router->stats.n_delayedreconnects++; + } + else + { + router->active_logs = 1; + do_reconnect = 1; + } + spinlock_release(&router->lock); + if (do_reconnect) + { + blr_restart_master(router); + spinlock_acquire(&router->lock); + router->active_logs = 0; + spinlock_release(&router->lock); + } +} + /** * Binlog router master side state machine event handler. * @@ -154,6 +234,7 @@ char query[128]; spinlock_acquire(&router->lock); if (router->active_logs) { + int length; /* * Thread already processing a packet and has not got * to the point that it will not look at new packets @@ -161,7 +242,14 @@ char query[128]; */ router->stats.n_queueadd++; router->queue = gwbuf_append(router->queue, buf); + length = gwbuf_length(router->queue); spinlock_release(&router->lock); + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, "Queued data due to active log " + "handling. %s @ %d, queue length %d\n", + router->binlog_name, + router->binlog_position, + length))); return; } else @@ -172,11 +260,17 @@ char query[128]; if (router->master_state < 0 || router->master_state > BLRM_MAXSTATE) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Invalid master state machine state (%d) for binlog router.\n", router->master_state))); gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + return; + } router->active_logs = 0; spinlock_release(&router->lock); return; @@ -184,7 +278,7 @@ char query[128]; if (router->master_state != BLRM_BINLOGDUMP && MYSQL_RESPONSE_ERR(buf)) { - LOGIF(LM, (skygw_log_write( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, "Received error: %d, %s from master during %s phase of the master state machine.\n", MYSQL_ERROR_CODE(buf), MYSQL_ERROR_MSG(buf), blrm_states[router->master_state] @@ -192,6 +286,12 @@ char query[128]; gwbuf_consume(buf, gwbuf_length(buf)); spinlock_acquire(&router->lock); router->active_logs = 0; + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + return; + } spinlock_release(&router->lock); return; } @@ -281,12 +381,20 @@ char query[128]; spinlock_acquire(&router->lock); if ((buf = router->queue) != NULL) { - router->queue = buf->next; - buf->next = NULL; + router->queue = NULL; } else { - router->active_logs = 0; + if (router->reconnect_pending) + { + spinlock_release(&router->lock); + blr_restart_master(router); + spinlock_acquire(&router->lock); + } + else + { + router->active_logs = 0; + } } spinlock_release(&router->lock); } while (buf != NULL); @@ -415,7 +523,9 @@ REP_HEADER hdr; int len, reslen; int no_residual = 1; - /* Prepend any residual buffer to the buffer chain we have been called with. */ + /* Prepend any residual buffer to the buffer chain we have + * been called with. + */ if (router->residual) { pkt = gwbuf_append(router->residual, pkt); @@ -833,14 +943,16 @@ ROUTER_SLAVE *slave; static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len) { +char buf[400], *bufp; int i; - skygw_log_write(file, "%s length = %d: ", msg, len); + bufp = buf; + bufp += sprintf(bufp, "%s length = %d: ", msg, len); for (i = 0; i < len && i < 40; i++) - skygw_log_write(file, "0x%02x ", ptr[i]); + bufp += sprintf(bufp, "0x%02x ", ptr[i]); if (i < len) - skygw_log_write_flush(file, "...\n"); + skygw_log_write_flush(file, "%s...\n", buf); else - skygw_log_write_flush(file, "\n"); + skygw_log_write_flush(file, "%s\n", buf); } diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index da7d7f8a2..a9afc2a50 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -153,7 +153,7 @@ char *word, *brkb; int query_len; qtext = GWBUF_DATA(queue); - query_len = extract_field(qtext, 24) - 1; + query_len = extract_field((uint8_t *)qtext, 24) - 1; qtext += 5; // Skip header and first byte of the payload query_text = strndup(qtext, query_len); @@ -292,7 +292,7 @@ unsigned char *data; int len; if ((pkt = gwbuf_alloc(strlen(msg) + 13)) == NULL) - return NULL; + return; data = GWBUF_DATA(pkt); len = strlen(msg) + 1; encode_value(&data[0], len, 24); // Payload length @@ -301,7 +301,7 @@ int len; data[4] = 0xff; // Error indicator data[5] = 0; // Error Code data[6] = 0; // Error Code - strncpy(&data[7], "#00000", 6); + strncpy((char *)&data[7], "#00000", 6); memcpy(&data[13], msg, strlen(msg)); // Error Message slave->dcb->func.write(slave->dcb, pkt); } @@ -346,15 +346,15 @@ int len, ts_len; if ((pkt = gwbuf_alloc(len)) == NULL) return 0; ptr = GWBUF_DATA(pkt); - memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble + memcpy(ptr, timestamp_def, sizeof(timestamp_def)); // Fixed preamble ptr += sizeof(timestamp_def); - encode_value(ptr, ts_len + 1, 24); // Add length of data packet + encode_value(ptr, ts_len + 1, 24); // Add length of data packet ptr += 3; - *ptr++ = 0x04; // Sequence number in response - *ptr++ = ts_len; // Length of result string - strncpy(ptr, timestamp, ts_len); // Result string + *ptr++ = 0x04; // Sequence number in response + *ptr++ = ts_len; // Length of result string + strncpy((char *)ptr, timestamp, ts_len); // Result string ptr += ts_len; - memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result + memcpy(ptr, timestamp_eof, sizeof(timestamp_eof)); // EOF packet to terminate result return slave->dcb->func.write(slave->dcb, pkt); } @@ -386,7 +386,7 @@ int len, slen; slen = *ptr++; if (slen != 0) { - slave->hostname = strndup(ptr, slen); + slave->hostname = strndup((char *)ptr, slen); ptr += slen; } else @@ -395,14 +395,14 @@ int len, slen; if (slen != 0) { ptr += slen; - slave->user = strndup(ptr, slen); + slave->user = strndup((char *)ptr, slen); } else slave->user = NULL; slen = *ptr++; if (slen != 0) { - slave->passwd = strndup(ptr, slen); + slave->passwd = strndup((char *)ptr, slen); ptr += slen; } else @@ -468,7 +468,7 @@ uint32_t chksum; ptr += 2; serverid = extract_field(ptr, 32); ptr += 4; - strncpy(slave->binlogfile, ptr, BINLOG_FNAMELEN); + strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN); slave->state = BLRS_DUMPING; slave->seqno = 1;