From 9186d3fa90c5cac029d325316d40072280555361 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 1 Oct 2014 18:30:13 +0100 Subject: [PATCH] Fixes for blr_salve performanc issues, linking of embedded library, housekeeper heartbeat addition and unloading of modules to allow profiling of modules using LD_PROFILE environment variable and sprof --- server/core/gateway.c | 1 + server/core/housekeeper.c | 12 +++-- server/core/load_utils.c | 15 ++++++ server/core/memlog.c | 2 + server/core/poll.c | 54 ++++++++++++++++++-- server/core/test/makefile | 9 +++- server/include/dcb.h | 12 +++++ server/include/modules.h | 3 ++ server/modules/include/blr.h | 22 +++++++- server/modules/routing/binlog/CMakeLists.txt | 2 +- server/modules/routing/binlog/Makefile | 8 ++- server/modules/routing/binlog/blr.c | 4 +- server/modules/routing/binlog/blr_file.c | 19 +++++-- server/modules/routing/binlog/blr_master.c | 45 ++++++++++------ server/modules/routing/binlog/blr_slave.c | 24 ++++++--- 15 files changed, 193 insertions(+), 39 deletions(-) diff --git a/server/core/gateway.c b/server/core/gateway.c index 1d67d362f..c8651fe2b 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -1633,6 +1633,7 @@ int main(int argc, char **argv) LOGFILE_MESSAGE, "MaxScale shutdown completed."))); + unload_all_modules(); /* Remove Pidfile */ unlink_pidfile(); diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c index fe6cb9047..318ca8fde 100644 --- a/server/core/housekeeper.c +++ b/server/core/housekeeper.c @@ -43,6 +43,7 @@ static HKTASK *tasks = NULL; static SPINLOCK tasklock = SPINLOCK_INIT; static int do_shutdown = 0; +unsigned long hkheartbeat = 0; static void hkthread(void *); @@ -171,12 +172,17 @@ HKTASK *ptr; time_t now; void (*taskfn)(void *); void *taskdata; +int i; for (;;) { - if (do_shutdown) - return; - thread_millisleep(1000); + for (i = 0; i < 10; i++) + { + if (do_shutdown) + return; + thread_millisleep(100); + hkheartbeat++; + } now = time(0); spinlock_acquire(&tasklock); ptr = tasks; diff --git a/server/core/load_utils.c b/server/core/load_utils.c index cba0c6533..a17a04225 100644 --- a/server/core/load_utils.c +++ b/server/core/load_utils.c @@ -332,6 +332,21 @@ MODULES *ptr; free(mod); } +/** + * Unload all modules + * + * Remove all the modules from the system, called during shutdown + * to allow termination hooks to be called. + */ +void +unload_all_modules() +{ + while (registered) + { + unregister_module(registered->module); + } +} + /** * Print Modules * diff --git a/server/core/memlog.c b/server/core/memlog.c index 989950828..73f0af387 100644 --- a/server/core/memlog.c +++ b/server/core/memlog.c @@ -125,6 +125,8 @@ MEMLOG *ptr; void memlog_log(MEMLOG *log, void *value) { + if (!log) + return; spinlock_acquire(&log->lock); switch (log->type) { diff --git a/server/core/poll.c b/server/core/poll.c index b245b79c0..03ced1bfa 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -32,10 +32,14 @@ #include #include -#define PROFILE_POLL 1 +#define PROFILE_POLL 0 #if PROFILE_POLL #include +#include + +extern unsigned long hkheartbeat; +MEMLOG *plog; #endif extern int lm_enabled_logfiles_bitmask; @@ -143,7 +147,9 @@ static struct { int n_fds[MAXNFDS]; /*< Number of wakeups with particular n_fds value */ int evq_length; /*< Event queue length */ + int evq_pending; /*< Number of pending descriptors in event queue */ int evq_max; /*< Maximum event queue length */ + int wake_evqpending;/*< Woken from epoll_wait with pending events in queue */ } pollStats; /** @@ -194,6 +200,9 @@ int i; for (i = 0; i < n_avg_samples; i++) avg_samples[i] = 0.0; +#if PROFILE_POLL + plog = memlog_create("EventQueueWaitTime", ML_LONG, 10000); +#endif } /** @@ -396,6 +405,8 @@ DCB *zombies = NULL; if (thread_data) thread_data[thread_id].state = THREAD_ZPROCESSING; zombies = dcb_process_zombies(thread_id); + if (thread_data) + thread_data[thread_id].state = THREAD_IDLE; } atomic_add(&n_waiting, 1); @@ -423,6 +434,7 @@ DCB *zombies = NULL; pthread_self(), nfds, eno))); + atomic_add(&n_waiting, -1); } /* * If there are no new descriptors from the non-blocking call @@ -431,11 +443,12 @@ DCB *zombies = NULL; */ else if (nfds == 0 && process_pollq(thread_id) == 0) { - atomic_add(&n_waiting, 1); nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT); + if (nfds == 0 && pollStats.evq_pending) + atomic_add(&pollStats.wake_evqpending, 1); } else { @@ -474,7 +487,7 @@ DCB *zombies = NULL; /* * Process every DCB that has a new event and add * it to the poll queue. - * If the DCB is currently beign processed then we + * If the DCB is currently being processed then we * or in the new eent bits to the pending event bits * and leave it in the queue. * If the DCB was not already in the queue then it was @@ -489,6 +502,13 @@ DCB *zombies = NULL; spinlock_acquire(&pollqlock); if (DCB_POLL_BUSY(dcb)) { + if (dcb->evq.pending_events == 0) + { + pollStats.evq_pending++; +#if PROFILE_POLL + dcb->evq.inserted = hkheartbeat; +#endif + } dcb->evq.pending_events |= ev; } else @@ -508,6 +528,10 @@ DCB *zombies = NULL; dcb->evq.next = dcb; } pollStats.evq_length++; + pollStats.evq_pending++; +#if PROFILE_POLL + dcb->evq.inserted = hkheartbeat; +#endif if (pollStats.evq_length > pollStats.evq_max) { pollStats.evq_max = pollStats.evq_length; @@ -527,6 +551,10 @@ DCB *zombies = NULL; thread_data[thread_id].state = THREAD_ZPROCESSING; } zombies = dcb_process_zombies(thread_id); + if (thread_data) + { + thread_data[thread_id].state = THREAD_IDLE; + } } if (do_shutdown) @@ -608,12 +636,17 @@ uint32_t ev; { ev = dcb->evq.pending_events; dcb->evq.pending_events = 0; + pollStats.evq_pending--; } spinlock_release(&pollqlock); if (found == 0) return 0; +#if PROFILE_POLL + memlog_log(plog, hkheartbeat - dcb->evq.inserted); +#endif + CHK_DCB(dcb); if (thread_data) @@ -927,6 +960,10 @@ int i; pollStats.evq_length); dcb_printf(dcb, "Maximum event queue length: %d\n", pollStats.evq_max); + dcb_printf(dcb, "Number of DCBs with pending events: %d\n", + pollStats.evq_pending); + dcb_printf(dcb, "Number of wakeups with pending queue: %d\n", + pollStats.wake_evqpending); dcb_printf(dcb, "No of poll completions with descriptors\n"); dcb_printf(dcb, "\tNo. of descriptors\tNo. of poll completions.\n"); @@ -1127,6 +1164,13 @@ uint32_t ev = EPOLLOUT; spinlock_acquire(&pollqlock); if (DCB_POLL_BUSY(dcb)) { + if (dcb->evq.pending_events == 0) + { + pollStats.evq_pending++; +#if PROFILE_POLL + dcb->evq.inserted = hkheartbeat; +#endif + } dcb->evq.pending_events |= ev; } else @@ -1146,6 +1190,10 @@ uint32_t ev = EPOLLOUT; dcb->evq.next = dcb; } pollStats.evq_length++; + pollStats.evq_pending++; +#if PROFILE_POLL + dcb->evq.inserted = hkheartbeat; +#endif if (pollStats.evq_length > pollStats.evq_max) { pollStats.evq_max = pollStats.evq_length; diff --git a/server/core/test/makefile b/server/core/test/makefile index 14f2828f2..651254476 100644 --- a/server/core/test/makefile +++ b/server/core/test/makefile @@ -21,7 +21,7 @@ LDFLAGS=-rdynamic -L$(LOGPATH) -L$(EMBEDDED_LIB) \ LIBS= -lz -lm -lcrypt -lcrypto -ldl -laio -lrt -pthread -llog_manager \ -L../../inih/extra -linih -lssl -lstdc++ -lmysqld -TESTS=testhash testspinlock testfilter testadminusers +TESTS=testhash testspinlock testfilter testadminusers testmemlog cleantests: - $(DEL) *.o @@ -59,6 +59,13 @@ testadminusers: testadminusers.c libcore.a -I$(ROOT_PATH)/utils \ testadminusers.c libcore.a $(UTILSPATH)/skygw_utils.o $(LIBS) -o testadminusers +testmemlog: testmemlog.c libcore.a + $(CC) $(CFLAGS) $(LDFLAGS) \ + -I$(ROOT_PATH)/server/include \ + -I$(ROOT_PATH)/utils \ + testmemlog.c libcore.a $(UTILSPATH)/skygw_utils.o $(LIBS) -o testmemlog + + libcore.a: ../*.o ar rv libcore.a ../*.o diff --git a/server/include/dcb.h b/server/include/dcb.h index 72686b7b7..11061bb8f 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -98,12 +98,24 @@ typedef struct gw_protocol { int (*session)(struct dcb *, void *); } GWPROTOCOL; +/** + * The event queue structure used in the polling loop to maintain a queue + * of events that need to be processed for the DCB. + * + * next The next DCB in the event queue + * prev The previous DCB in the event queue + * pending_events The events that are pending processing + * processing Flag to indicate the processing status of the DCB + * eventqlock Spinlock to protect this structure + * inserted Insertion time for logging purposes + */ typedef struct { struct dcb *next; struct dcb *prev; uint32_t pending_events; int processing; SPINLOCK eventqlock; + unsigned long inserted; } DCBEVENTQ; /** diff --git a/server/include/modules.h b/server/include/modules.h index 199e3a24b..3dff0e2e5 100644 --- a/server/include/modules.h +++ b/server/include/modules.h @@ -32,6 +32,8 @@ * 13/06/13 Mark Riddoch Initial implementation * 08/07/13 Mark Riddoch Addition of monitor modules * 29/05/14 Mark Riddoch Addition of filter modules + * 01/10/14 Mark Riddoch Addition of call to unload all modules on + * shutdown * @endverbatim */ @@ -58,6 +60,7 @@ typedef struct modules { extern void *load_module(const char *module, const char *type); extern void unload_module(const char *module); +extern void unload_all_modules(); extern void printModules(); extern void dprintAllModules(DCB *); char* get_maxscale_home(void); diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 1ba575d49..a6973593e 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -54,7 +54,7 @@ * Default burst sizes for slave catchup */ #define DEF_SHORT_BURST 15 -#define DEF_LONG_BURST 2000 +#define DEF_LONG_BURST 500 /** * Some useful macros for examining the MySQL Response packets @@ -366,6 +366,26 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", #define LOG_EVENT_NO_FILTER_F 0x0100 #define LOG_EVENT_MTS_ISOLATE_F 0x0200 +/** + * Macros to extract common fields + */ +#define INLINE_EXTRACT 1 /* Set to 0 for debug purposes */ + +#if INLINE_EXTRACT +#define EXTRACT16(x) (*(uint8_t *)(x) | (*((uint8_t *)(x) + 1) << 8)) +#define EXTRACT24(x) (*(uint8_t *)(x) | \ + (*((uint8_t *)(x) + 1) << 8) | \ + (*((uint8_t *)(x) + 2) << 16)) +#define EXTRACT32(x) (*(uint8_t *)(x) | \ + (*((uint8_t *)(x) + 1) << 8) | \ + (*((uint8_t *)(x) + 2) << 16) | \ + (*((uint8_t *)(x) + 3) << 24)) +#else +#define EXTRACT16(x) extract_field((x), 16) +#define EXTRACT24(x) extract_field((x), 24) +#define EXTRACT32(x) extract_field((x), 32) +#endif + /* * Externals within the router */ diff --git a/server/modules/routing/binlog/CMakeLists.txt b/server/modules/routing/binlog/CMakeLists.txt index 071901b70..4de2a35b4 100644 --- a/server/modules/routing/binlog/CMakeLists.txt +++ b/server/modules/routing/binlog/CMakeLists.txt @@ -1,4 +1,4 @@ add_library(binlogrouter SHARED blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c) set_target_properties(binlogrouter PROPERTIES INSTALL_RPATH ${CMAKE_INSTALL_RPATH}:${CMAKE_INSTALL_PREFIX}/lib) -target_link_libraries(binlogrouter ssl pthread log_manager ${EMBEDDED_LIB}) +target_link_libraries(binlogrouter ssl pthread log_manager) install(TARGETS binlogrouter DESTINATION modules) diff --git a/server/modules/routing/binlog/Makefile b/server/modules/routing/binlog/Makefile index 6e9282ea1..0d665d264 100644 --- a/server/modules/routing/binlog/Makefile +++ b/server/modules/routing/binlog/Makefile @@ -31,10 +31,14 @@ CFLAGS=-c -fPIC -I/usr/include -I../../include -I../../../include \ include ../../../../makefile.inc +#LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \ +# -Wl,-rpath,$(DEST)/lib \ +# -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \ +# -Wl,-rpath,$(EMBEDDED_LIB) LDFLAGS=-shared -L$(LOGPATH) -L$(QCLASSPATH) -L$(EMBEDDED_LIB) \ -Wl,-rpath,$(DEST)/lib \ - -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) -Wl,-rpath,$(QCLASSPATH) \ - -Wl,-rpath,$(EMBEDDED_LIB) + -Wl,-rpath,$(LOGPATH) -Wl,-rpath,$(UTILSPATH) + SRCS=blr.c blr_master.c blr_cache.c blr_slave.c blr_file.c OBJ=$(SRCS:.c=.o) diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index 1d4a9d8ff..f41462468 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -173,6 +173,8 @@ int i; inst->service = service; spinlock_init(&inst->lock); + inst->binlog_fd = -1; + inst->low_water = DEF_LOW_WATER; inst->high_water = DEF_HIGH_WATER; inst->initbinlog = 0; @@ -749,7 +751,7 @@ struct tm tm; dcb_printf(dcb, "\tSpinlock statistics (rses_lock):\n"); spinlock_stats(&session->rses_lock, spin_reporter, dcb); #endif - + dcb_printf(dcb, "\n"); session = session->next; } spinlock_release(&router_inst->lock); diff --git a/server/modules/routing/binlog/blr_file.c b/server/modules/routing/binlog/blr_file.c index 7a44033fe..aad5fea45 100644 --- a/server/modules/routing/binlog/blr_file.c +++ b/server/modules/routing/binlog/blr_file.c @@ -290,14 +290,25 @@ int n; /* Read the header information from the file */ if ((n = read(fd, hdbuf, 19)) != 19) { - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, - "Failed to read header for binlog entry, " - "at %d (%s).\n", pos, strerror(errno)))); - if (n> 0 && n < 19) + switch (n) + { + case 0: + LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, + "Reached end of binlog file at %d.\n", + pos))); + break; + case -1: + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "Failed to read binlog file at position %d" + " (%s).\n", pos, strerror(errno)))); + break; + default: LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Short read when reading the header. " "Expected 19 bytes got %d bytes.\n", n))); + break; + } return NULL; } hdr->timestamp = extract_field(hdbuf, 32); diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index 6f92d193d..ad030359d 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -515,23 +515,23 @@ static REP_HEADER phdr; /* Get the length of the packet from the residual and new packet */ if (reslen >= 3) { - len = extract_field(pdata, 24); + len = EXTRACT24(pdata); } else if (reslen == 2) { - len = extract_field(pdata, 16); - len |= (extract_field(GWBUF_DATA(pkt->next), 8) << 16); + len = EXTRACT16(pdata); + len |= (*(uint8_t *)GWBUF_DATA(pkt->next) << 16); } else if (reslen == 1) { - len = extract_field(pdata, 8); - len |= (extract_field(GWBUF_DATA(pkt->next), 16) << 8); + len = *pdata; + len |= (EXTRACT16(GWBUF_DATA(pkt->next)) << 8); } len += 4; // Allow space for the header } else { - len = extract_field(pdata, 24) + 4; + len = EXTRACT24(pdata) + 4; } if (reslen < len && pkt_length >= len) @@ -779,18 +779,18 @@ static REP_HEADER phdr; * @param hdr The packet header to populate */ void -blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) +blr_extract_header(register uint8_t *ptr, register REP_HEADER *hdr) { - hdr->payload_len = extract_field(ptr, 24); + hdr->payload_len = EXTRACT24(ptr); hdr->seqno = ptr[3]; hdr->ok = ptr[4]; - hdr->timestamp = extract_field(&ptr[5], 32); + hdr->timestamp = EXTRACT32(&ptr[5]); hdr->event_type = ptr[9]; - hdr->serverid = extract_field(&ptr[10], 32); - hdr->event_size = extract_field(&ptr[14], 32); - hdr->next_pos = extract_field(&ptr[18], 32); - hdr->flags = extract_field(&ptr[22], 16); + hdr->serverid = EXTRACT32(&ptr[10]); + hdr->event_size = EXTRACT32(&ptr[14]); + hdr->next_pos = EXTRACT32(&ptr[18]); + hdr->flags = EXTRACT16(&ptr[22]); } /** @@ -892,13 +892,18 @@ blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t * { GWBUF *pkt; uint8_t *buf; -ROUTER_SLAVE *slave; +ROUTER_SLAVE *slave, *nextslave; int action; spinlock_acquire(&router->lock); slave = router->slaves; while (slave) { + if (slave->state != BLRS_DUMPING) + { + slave = slave->next; + continue; + } spinlock_acquire(&slave->catch_lock); if ((slave->cstate & (CS_UPTODATE|CS_DIST)) == CS_UPTODATE) { @@ -986,6 +991,7 @@ int action; */ if (slave->cstate & CS_UPTODATE) { + nextslave = slave->next; spinlock_release(&router->lock); LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, "Force slave %d into catchup mode %s@%d\n", @@ -998,9 +1004,18 @@ int action; spinlock_acquire(&router->lock); slave = router->slaves; if (slave) - continue; + { + while (slave && slave != nextslave) + slave = slave->next; + if (slave) + continue; + else + break; + } else + { break; + } } } } diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 0ed3618c9..2afcda7cf 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -473,12 +473,13 @@ blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue { GWBUF *resp; uint8_t *ptr; -int len, flags, serverid, rval; +int len, flags, serverid, rval, binlognamelen; REP_HEADER hdr; uint32_t chksum; ptr = GWBUF_DATA(queue); len = extract_field(ptr, 24); + binlognamelen = len - 11; ptr += 4; // Skip length and sequence number if (*ptr++ != COM_BINLOG_DUMP) { @@ -495,15 +496,16 @@ uint32_t chksum; ptr += 2; serverid = extract_field(ptr, 32); ptr += 4; - strncpy(slave->binlogfile, (char *)ptr, BINLOG_FNAMELEN); + strncpy(slave->binlogfile, (char *)ptr, binlognamelen); + slave->binlogfile[binlognamelen] = 0; slave->state = BLRS_DUMPING; slave->seqno = 1; if (slave->nocrc) - len = 0x2b; + len = 19 + 8 + binlognamelen; else - len = 0x2f; + len = 19 + 8 + 4 + binlognamelen; // Build a fake rotate event resp = gwbuf_alloc(len + 5); @@ -758,7 +760,8 @@ struct timespec req; slave->cstate &= ~CS_BUSY; spinlock_release(&slave->catch_lock); - close(fd); + if (fd != -1) + close(fd); poll_fake_write_event(slave->dcb); if (record) { @@ -831,14 +834,19 @@ ROUTER_INSTANCE *router = slave->router; * Rotate the slave to the new binlog file * * @param slave The slave instance - * @param ptr The rotate event (minux header and OK byte) + * @param ptr The rotate event (minus header and OK byte) */ void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr) { +int len = EXTRACT24(ptr + 9); // Extract the event length + + len = len - (19 + 8 + 4); // Remove length of header, checksum and position + if (len > BINLOG_FNAMELEN) + len = BINLOG_FNAMELEN; ptr += 19; // Skip header slave->binlog_pos = extract_field(ptr, 32); slave->binlog_pos += (extract_field(ptr+4, 32) << 32); - memcpy(slave->binlogfile, ptr + 8, BINLOG_FNAMELEN); - slave->binlogfile[BINLOG_FNAMELEN] = 0; + memcpy(slave->binlogfile, ptr + 8, len); + slave->binlogfile[len] = 0; }