From 06596a0bc33a590ab2fe6664965c18c997f9d597 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Thu, 25 Sep 2014 17:35:27 +0100 Subject: [PATCH 1/3] Add binlog router back into Makefiles --- server/modules/routing/Makefile | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/modules/routing/Makefile b/server/modules/routing/Makefile index d27430112..d09ce30ce 100644 --- a/server/modules/routing/Makefile +++ b/server/modules/routing/Makefile @@ -47,11 +47,12 @@ CLIOBJ=$(CLISRCS:.c=.o) SRCS=$(TESTSRCS) $(READCONSRCS) $(DEBUGCLISRCS) cli.c OBJ=$(SRCS:.c=.o) LIBS=$(UTILSPATH)/skygw_utils.o -lssl -llog_manager -MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so +MODULES= libdebugcli.so libreadconnroute.so libtestroute.so libcli.so libbinlogrouter.so all: $(MODULES) (cd readwritesplit; make) + (cd binlog; make) libtestroute.so: $(TESTOBJ) $(CC) $(LDFLAGS) $(TESTOBJ) $(LIBS) -o $@ @@ -68,12 +69,16 @@ libcli.so: $(CLIOBJ) libreadwritesplit.so: (cd readwritesplit; touch depend.mk ; make; cp $@ ..) +libbinlogrouter.so: + (cd binlog; touch depend.mk ; make; cp $@ ..) + .c.o: $(CC) $(CFLAGS) $< -o $@ clean: $(DEL) $(OBJ) $(MODULES) (cd readwritesplit; touch depend.mk; make clean) + (cd binlog; touch depend.mk; make clean) tags: ctags $(SRCS) $(HDRS) @@ -83,6 +88,7 @@ depend: @$(DEL) depend.mk cc -M $(CFLAGS) $(SRCS) > depend.mk (cd readwritesplit; touch depend.mk ; make depend) + (cd binlog; touch depend.mk ; make depend) install: $(MODULES) install -D $(MODULES) $(DEST)/modules From 3430fc99d2496d59de9dbcd6ce3696f3eb517265 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Fri, 26 Sep 2014 12:36:59 +0100 Subject: [PATCH 2/3] Shutdown fix in housekeeper In memory logging in blr_master --- server/core/gateway.c | 1 + server/core/housekeeper.c | 14 +++++ server/core/poll.c | 21 ++++++- server/include/housekeeper.h | 1 + server/include/spinlock.h | 2 +- server/modules/include/blr.h | 4 +- server/modules/routing/binlog/blr_master.c | 73 ++++++++++++++++++---- 7 files changed, 100 insertions(+), 16 deletions(-) diff --git a/server/core/gateway.c b/server/core/gateway.c index 79c2585fe..f2c1eefb2 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -1646,6 +1646,7 @@ void shutdown_server() { poll_shutdown(); + hkshutdown(); log_flush_shutdown(); } diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c index 5aa5c7a29..fe6cb9047 100644 --- a/server/core/housekeeper.c +++ b/server/core/housekeeper.c @@ -42,6 +42,8 @@ static HKTASK *tasks = NULL; */ static SPINLOCK tasklock = SPINLOCK_INIT; +static int do_shutdown = 0; + static void hkthread(void *); /** @@ -172,6 +174,8 @@ void *taskdata; for (;;) { + if (do_shutdown) + return; thread_millisleep(1000); now = time(0); spinlock_acquire(&tasklock); @@ -194,3 +198,13 @@ void *taskdata; spinlock_release(&tasklock); } } + +/** + * Called to shutdown the housekeeper + * + */ +void +hkshutdown() +{ + do_shutdown = 1; +} diff --git a/server/core/poll.c b/server/core/poll.c index 4c1a57a71..12cb1d69d 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -391,7 +391,7 @@ DCB *zombies = NULL; while (1) { /* Process of the queue of waiting requests */ - while (process_pollq(thread_id)) + while (do_shutdown == 0 && process_pollq(thread_id)) { if (thread_data) thread_data[thread_id].state = THREAD_ZPROCESSING; @@ -885,6 +885,20 @@ poll_bitmask() return &poll_mask; } +/** + * Display an entry from the spinlock statistics data + * + * @param dcb The DCB to print to + * @param desc Description of the statistic + * @param value The statistic value + */ +static void +spin_reporter(void *dcb, char *desc, int value) +{ + dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value); +} + + /** * Debug routine to print the polling statistics * @@ -922,6 +936,11 @@ int i; } dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS, pollStats.n_fds[MAXNFDS-1]); + +#if SPINLOCK_PROFILE + dcb_printf(dcb, "Event queue lock statistics:\n"); + spinlock_stats(&pollqlock, spin_reporter, dcb); +#endif } /** diff --git a/server/include/housekeeper.h b/server/include/housekeeper.h index 597f19a91..35e76e80d 100644 --- a/server/include/housekeeper.h +++ b/server/include/housekeeper.h @@ -47,4 +47,5 @@ typedef struct hktask { extern void hkinit(); extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency); extern int hktask_remove(char *name); +extern void hkshutdown(); #endif diff --git a/server/include/spinlock.h b/server/include/spinlock.h index e5f938815..43192da3f 100644 --- a/server/include/spinlock.h +++ b/server/include/spinlock.h @@ -31,7 +31,7 @@ #include #include -#define SPINLOCK_PROFILE 0 +#define SPINLOCK_PROFILE 1 /** * The spinlock structure. diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 22db7b1ba..6e151d923 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -43,8 +43,8 @@ * High and Low water marks for the slave dcb. These values can be overriden * by the router options highwater and lowwater. */ -#define DEF_LOW_WATER 20000 -#define DEF_HIGH_WATER 300000 +#define DEF_LOW_WATER 2000 +#define DEF_HIGH_WATER 30000 /** * Some useful macros for examining the MySQL Response packets diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index ca7ec605b..ee14cec4c 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -55,21 +55,57 @@ #include #include +#include + /* Temporary requirement for auth data */ #include +#define SAMPLE_COUNT 10000 +CYCLES samples[10][SAMPLE_COUNT]; +int sample_index[10] = { 0, 0, 0 }; + +#define LOGD_SLAVE_CATCHUP1 0 +#define LOGD_SLAVE_CATCHUP2 1 +#define LOGD_DISTRIBUTE 2 +#define LOGD_FILE_FLUSH 3 + +SPINLOCK logspin = SPINLOCK_INIT; + +void +log_duration(int sample, CYCLES duration) +{ +char fname[100]; +int i; +FILE *fp; + + spinlock_acquire(&logspin); + samples[sample][sample_index[sample]++] = duration; + if (sample_index[sample] == SAMPLE_COUNT) + { + sprintf(fname, "binlog_profile.%d", sample); + if ((fp = fopen(fname, "a")) != NULL) + { + for (i = 0; i < SAMPLE_COUNT; i++) + fprintf(fp, "%ld\n", samples[sample][i]); + fclose(fp); + } + sample_index[sample] = 0; + } + spinlock_release(&logspin); +} + extern int lm_enabled_logfiles_bitmask; static GWBUF *blr_make_query(char *statement); static GWBUF *blr_make_registration(ROUTER_INSTANCE *router); static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router); -static void encode_value(unsigned char *data, unsigned int value, int len); -static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); +void encode_value(unsigned char *data, unsigned int value, int len); +void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt); static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr); -static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); +void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr); static void *CreateMySQLAuthData(char *username, char *password, char *database); -static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); -static uint32_t extract_field(uint8_t *src, int bits); +void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr); +inline uint32_t extract_field(uint8_t *src, int bits); static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len); static int keepalive = 1; @@ -460,7 +496,7 @@ int len = 0x1b; * @param value The value to pack * @param len Number of bits to encode value into */ -static void +void encode_value(unsigned char *data, unsigned int value, int len) { while (len > 0) @@ -478,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len) * @param router The router instance * @param pkt The binlog records */ -static void +void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt) { uint8_t *msg = NULL, *ptr, *pdata; @@ -766,7 +802,9 @@ static REP_HEADER phdr; { ss_dassert(pkt_length == 0); } +{ CYCLES start = rdtsc(); blr_file_flush(router); +log_duration(LOGD_FILE_FLUSH, rdtsc() - start); } } /** @@ -775,7 +813,7 @@ static REP_HEADER phdr; * @param pkt The incoming packet in a GWBUF chain * @param hdr The packet header to populate */ -static void +void blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) { @@ -796,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr) * @param src The raw packet source * @param birs The number of bits to extract (multiple of 8) */ -static uint32_t -extract_field(uint8_t *src, int bits) +inline uint32_t +extract_field(register uint8_t *src, int bits) { -uint32_t rval = 0, shift = 0; +register uint32_t rval = 0, shift = 0; while (bits > 0) { @@ -884,14 +922,16 @@ MYSQL_session *auth_info; * @param hdr The replication event header * @param ptr The raw replication event data */ -static void +void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { GWBUF *pkt; uint8_t *buf; ROUTER_SLAVE *slave; int action; +CYCLES entry; + entry = rdtsc(); spinlock_acquire(&router->lock); slave = router->slaves; while (slave) @@ -945,12 +985,16 @@ int action; spinlock_acquire(&slave->catch_lock); if (slave->overrun) { +CYCLES cycle_start, cycles; slave->stats.n_overrun++; slave->overrun = 0; spinlock_release(&router->lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); +cycle_start = rdtsc(); blr_slave_catchup(router, slave); +cycles = rdtsc() - cycle_start; +log_duration(LOGD_SLAVE_CATCHUP2, cycles); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -983,6 +1027,7 @@ int action; */ if (slave->cstate & CS_UPTODATE) { +CYCLES cycle_start, cycles; spinlock_release(&router->lock); LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, "Force slave %d into catchup mode %s@%d\n", @@ -991,7 +1036,10 @@ int action; spinlock_acquire(&slave->catch_lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); +cycle_start = rdtsc(); blr_slave_catchup(router, slave); +cycles = rdtsc() - cycle_start; +log_duration(LOGD_SLAVE_CATCHUP1, cycles); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -1005,6 +1053,7 @@ int action; slave = slave->next; } spinlock_release(&router->lock); + log_duration(LOGD_DISTRIBUTE, rdtsc() - entry); } static void From 5ec1a83f3bab6dd555b2991aeb7a56db173e16a6 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 26 Sep 2014 15:40:32 +0300 Subject: [PATCH 3/3] Fix to #478, http://bugs.skysql.com/show_bug.cgi?id=478 Execution of session commands failed because session commands and normal sql statements were executed in a wrong order if backend was executing previous session command while new sql stmt was routed to that backend. There was a window where ordering went wrong. It is possible that one normal sql stmt arrives while previous sescmds are still being executed. Introduced a new member in backend_ref_t structure, bref_pending_cmd where new sql stmt pointer is stored in that case. When sescmds are executed completely, that command is automatically executed next. --- server/core/dcb.c | 1 - server/modules/include/readwritesplit.h | 1 + server/modules/protocol/mysql_common.c | 3 + .../routing/readwritesplit/readwritesplit.c | 57 ++++++++++++++++++- 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index f4d5a560c..0b2b038e7 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -485,7 +485,6 @@ bool succp = false; /*< * Close file descriptor and move to clean-up phase. */ - ss_dassert(excluded != dcb); rc = close(dcb->fd); if (rc < 0) { diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 5b17dbc7d..482ecea27 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -224,6 +224,7 @@ typedef struct backend_ref_st { bref_state_t bref_state; int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; + GWBUF* bref_pending_cmd; /*< For stmt which can't be routed due active sescmd execution */ #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; #endif diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 2f845f268..b1bd5481d 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1642,6 +1642,8 @@ void protocol_archive_srv_command( server_command_t* h1; int len = 0; + CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE) @@ -1692,6 +1694,7 @@ void protocol_archive_srv_command( retblock: spinlock_release(&p->protocol_lock); + CHK_PROTOCOL(p); } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 451b29742..c602d8913 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1957,6 +1957,26 @@ static int routeQuery( if (succp) /*< Have DCB of the target backend */ { + backend_ref_t* bref; + sescmd_cursor_t* scur; + + bref = get_bref_from_dcb(router_cli_ses, target_dcb); + scur = &bref->bref_sescmd_cur; + /** + * Store current stmt if execution of previous session command + * haven't completed yet. Note that according to MySQL protocol + * there can only be one such non-sescmd stmt at the time. + */ + if (sescmd_cursor_is_active(scur)) + { + ss_dassert(bref->bref_pending_cmd == NULL); + bref->bref_pending_cmd = gwbuf_clone(querybuf); + + rses_end_locked_router_action(router_cli_ses); + ret = 1; + goto retblock; + } + if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) { backend_ref_t* bref; @@ -2295,7 +2315,34 @@ static void clientReply ( ss_dassert(succp); } - /** Unlock router session */ + else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ + { + int ret; + + CHK_GWBUF(bref->bref_pending_cmd); + + if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, + gwbuf_clone(bref->bref_pending_cmd))) == 1) + { + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + atomic_add(&inst->stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + bref->bref_pending_cmd))); + } + gwbuf_free(bref->bref_pending_cmd); + bref->bref_pending_cmd = NULL; + } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); lock_failed: @@ -3656,6 +3703,14 @@ static bool route_session_write( { succp = false; } + else if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Wrote to %s:%d", + backend_ref[i].bref_backend->backend_server->name, + backend_ref[i].bref_backend->backend_server->port))); + } } } rses_end_locked_router_action(router_cli_ses);