From 0ef87e3cc133a9d1f4214b50152ff1a7bdbaa8f2 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Tue, 30 Sep 2014 13:25:45 +0100 Subject: [PATCH] Updates to slave catchup mode to use fake events Addition of fake EPOLLOUT event mechanism New memlog feature for debugging purposes --- server/core/CMakeLists.txt | 5 +- server/core/Makefile | 6 +- server/core/gateway.c | 4 +- server/core/memlog.c | 209 +++++++++++ server/core/poll.c | 44 +++ server/core/test/CMakeLists.txt | 3 + server/core/test/testmemlog.c | 404 +++++++++++++++++++++ server/include/memlog.h | 51 +++ server/include/spinlock.h | 2 +- server/modules/include/blr.h | 57 ++- server/modules/routing/binlog/blr.c | 123 ++++++- server/modules/routing/binlog/blr_master.c | 50 +-- server/modules/routing/binlog/blr_slave.c | 246 ++++--------- 13 files changed, 951 insertions(+), 253 deletions(-) create mode 100644 server/core/memlog.c create mode 100644 server/core/test/testmemlog.c create mode 100644 server/include/memlog.h diff --git a/server/core/CMakeLists.txt b/server/core/CMakeLists.txt index 2c7854de3..00c072b1d 100644 --- a/server/core/CMakeLists.txt +++ b/server/core/CMakeLists.txt @@ -5,7 +5,8 @@ target_link_libraries(fullcore log_manager utils pthread ${EMBEDDED_LIB} ssl aio add_executable(maxscale atomic.c buffer.c spinlock.c gateway.c gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c - monitor.c adminusers.c secrets.c filter.c modutil.c hint.c housekeeper.c) + monitor.c adminusers.c secrets.c filter.c modutil.c hint.c + housekeeper.c memlog.c) target_link_libraries(maxscale ${EMBEDDED_LIB} log_manager utils ssl aio pthread crypt dl crypto inih z rt m stdc++) install(TARGETS maxscale DESTINATION bin) @@ -19,4 +20,4 @@ install(TARGETS maxpasswd DESTINATION bin) if(BUILD_TESTS) add_subdirectory(test) -endif() \ No newline at end of file +endif() diff --git a/server/core/Makefile b/server/core/Makefile index 
7ad4e1b01..e82204589 100644 --- a/server/core/Makefile +++ b/server/core/Makefile @@ -65,7 +65,8 @@ include ../../makefile.inc SRCS= atomic.c buffer.c spinlock.c gateway.c \ gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \ poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \ - monitor.c adminusers.c secrets.c filter.c modutil.c hint.c housekeeper.c + monitor.c adminusers.c secrets.c filter.c modutil.c hint.c \ + housekeeper.c memlog.c HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/gw.h ../modules/include/mysql_client_server_protocol.h \ @@ -73,7 +74,8 @@ HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ ../include/modules.h ../include/poll.h ../include/config.h \ ../include/users.h ../include/hashtable.h ../include/gwbitmask.h \ ../include/adminusers.h ../include/version.h ../include/maxscale.h \ - ../include/filter.h ../include/modutil.h ../hint.h ../include/housekeeper.h + ../include/filter.h ../include/modutil.h ../hint.h \ + ../include/housekeeper.h ../include/memlog.h OBJ=$(SRCS:.c=.o) diff --git a/server/core/gateway.c b/server/core/gateway.c index f2c1eefb2..1d67d362f 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -53,6 +53,7 @@ #include #include #include +#include #include #include @@ -1643,10 +1644,11 @@ return_main: * Shutdown MaxScale server */ void - shutdown_server() +shutdown_server() { poll_shutdown(); hkshutdown(); + memlog_flush_all(); log_flush_shutdown(); } diff --git a/server/core/memlog.c b/server/core/memlog.c new file mode 100644 index 000000000..989950828 --- /dev/null +++ b/server/core/memlog.c @@ -0,0 +1,209 @@ +/* + * This file is distributed as part of the MariaDB MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Ab 2014 + */ + +/** + * @file memlog.c - Implementation of memory logging mechanism for debug purposes + * + * @verbatim + * Revision History + * + * Date Who Description + * 26/09/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include +#include + +static MEMLOG *memlogs = NULL; +static SPINLOCK memlock = SPINLOCK_INIT; + +static void memlog_flush(MEMLOG *); + +/** + * Create a new instance of a memory logger and link it into the global list. + * + * @param name The name of the memory log, also used as the file the log is appended to + * @param type The type of item being logged + * @param size The number of items to store in memory before flushing to disk + * + * @return MEMLOG* A memory log handle, or NULL if size is not positive or an allocation fails + */ +MEMLOG * +memlog_create(char *name, MEMLOGTYPE type, int size) +{ +MEMLOG *log; + + if (size <= 0 || (log = (MEMLOG *)calloc(1, sizeof(MEMLOG))) == NULL) + { + return NULL; + } + + spinlock_init(&log->lock); + log->type = type; + log->offset = 0; + log->size = size; + switch (type) + { + case ML_INT: + log->values = malloc(sizeof(int) * size); + break; + case ML_LONG: + log->values = malloc(sizeof(long) * size); + break; + case ML_LONGLONG: + log->values = malloc(sizeof(long long) * size); + break; + case ML_STRING: + log->values = malloc(sizeof(char *) * size); + break; + } + if (log->values == NULL || (log->name = strdup(name)) == NULL) + { + free(log->values); /* no-op when values itself is NULL (calloc zeroed it) */ + free(log); + return NULL; + } + spinlock_acquire(&memlock); + log->next = memlogs; + memlogs = log; + spinlock_release(&memlock); + + return log; +} + +/** + * Destroy a memory logger any unwritten data will be
flushed to disk + * + * @param log The memory log to destroy; it is unlinked from the global list before being flushed and freed + */ +void +memlog_destroy(MEMLOG *log) +{ +MEMLOG *ptr; + + spinlock_acquire(&memlock); + if (memlogs == log) + memlogs = log->next; + else + { + ptr = memlogs; + while (ptr && ptr->next != log) + ptr = ptr->next; + if (ptr) + ptr->next = log->next; + } + spinlock_release(&memlock); + + memlog_flush(log); + free(log->values); + free(log->name); + free(log); +} + +/** + * Log a data item to the memory logger + * + * @param log The memory logger + * @param value The value to log + */ +void +memlog_log(MEMLOG *log, void *value) +{ + spinlock_acquire(&log->lock); + switch (log->type) + { + case ML_INT: + ((int *)(log->values))[log->offset] = (int)value; + break; + case ML_LONG: + ((long *)(log->values))[log->offset] = (long)value; + break; + case ML_LONGLONG: + ((long long *)(log->values))[log->offset] = (long long)value; + break; + case ML_STRING: + ((char **)(log->values))[log->offset] = (char *)value; + break; + } + log->offset++; + if (log->offset == log->size) + { + memlog_flush(log); + log->offset = 0; + } + spinlock_release(&log->lock); +} + +/** + * Flush all memlogs to disk, called during shutdown + * + */ +void +memlog_flush_all() +{ +MEMLOG *log; + + spinlock_acquire(&memlock); + log = memlogs; + while (log) + { + spinlock_acquire(&log->lock); + memlog_flush(log); + spinlock_release(&log->lock); + log = log->next; + } + spinlock_release(&memlock); +} + + +/** + * Flush a memory log to disk + * + * Assumes the log->lock has been acquired by the caller + * + * @param log The memory log to flush + */ +static void +memlog_flush(MEMLOG *log) +{ +FILE *fp; +int i; + + if ((fp = fopen(log->name, "a")) == NULL) + return; + for (i = 0; i < log->offset; i++) + { + switch (log->type) + { + case ML_INT: + fprintf(fp, "%d\n", ((int *)(log->values))[i]); + break; + case ML_LONG: + fprintf(fp, "%ld\n", ((long *)(log->values))[i]); + break; + case ML_LONGLONG: + fprintf(fp, "%lld\n", ((long long
*)(log->values))[i]); + break; + case ML_STRING: + fprintf(fp, "%s\n", ((char **)(log->values))[i]); + break; + } + } + fclose(fp); +} diff --git a/server/core/poll.c b/server/core/poll.c index 12cb1d69d..b245b79c0 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -1109,3 +1109,47 @@ int new_samples, new_nfds; if (next_sample >= n_avg_samples) next_sample = 0; } + +/** + * Insert a fake write completion event for a DCB into the polling + * queue. + * + * This is used to trigger transmission activity on another DCB from + * within the event processing routine of a DCB. + * + * @param dcb DCB to emulate an EPOLLOUT event for + */ +void +poll_fake_write_event(DCB *dcb) +{ +uint32_t ev = EPOLLOUT; + + spinlock_acquire(&pollqlock); + if (DCB_POLL_BUSY(dcb)) + { + dcb->evq.pending_events |= ev; + } + else + { + dcb->evq.pending_events = ev; + if (eventq) + { + dcb->evq.prev = eventq->evq.prev; + eventq->evq.prev->evq.next = dcb; + eventq->evq.prev = dcb; + dcb->evq.next = eventq; + } + else + { + eventq = dcb; + dcb->evq.prev = dcb; + dcb->evq.next = dcb; + } + pollStats.evq_length++; + if (pollStats.evq_length > pollStats.evq_max) + { + pollStats.evq_max = pollStats.evq_length; + } + } + spinlock_release(&pollqlock); +} diff --git a/server/core/test/CMakeLists.txt b/server/core/test/CMakeLists.txt index 914fe277e..a0b6002db 100644 --- a/server/core/test/CMakeLists.txt +++ b/server/core/test/CMakeLists.txt @@ -2,12 +2,15 @@ add_executable(test_hash testhash.c) add_executable(test_spinlock testspinlock.c) add_executable(test_filter testfilter.c) add_executable(test_adminusers testadminusers.c) +add_executable(testmemlog testmemlog.c) target_link_libraries(test_hash fullcore) target_link_libraries(test_spinlock fullcore) target_link_libraries(test_filter fullcore) target_link_libraries(test_adminusers fullcore) +target_link_libraries(testmemlog fullcore) add_test(TestHash test_hash) add_test(TestSpinlock test_spinlock) add_test(TestFilter test_filter) 
add_test(TestAdminUsers test_adminusers) +add_test(TestMemlog testmemlog) diff --git a/server/core/test/testmemlog.c b/server/core/test/testmemlog.c new file mode 100644 index 000000000..1523ec8ec --- /dev/null +++ b/server/core/test/testmemlog.c @@ -0,0 +1,404 @@ +/* + * This file is distributed as part of MaxScale from MariaDB. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright MariaDB Corporation 2014 + */ + +/** + * + * @verbatim + * Revision History + * + * Date Who Description + * 30/09/2014 Mark Riddoch Initial implementation + * + * @endverbatim + */ + +#include +#include +#include +#include +#include + +/** + * Count the number of lines in a file + * + * @param file The name of the file + * @return -1 if the file could not be opened or the numebr of lines + */ +int +linecount(char *file) +{ +FILE *fp; +int i = 0; +char buffer[180]; + + if ((fp = fopen(file, "r")) == NULL) + return -1; + while (fgets(buffer, 180, fp) != NULL) + i++; + fclose(fp); + return i; +} + +/* Some strings to log */ +char *strings[] = { + "First log entry", + "Second entry", + "Third", + "The fourth thing to log", + "Add a final 5th item" +}; + +int +main() +{ +MEMLOG *log, *log2; +int i; +long j; +long long k; +int failures = 0; + + unlink("memlog1"); + if ((log = memlog_create("memlog1", ML_INT, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + 
else + { + printf("Memlog Creation: Passed\n"); + if (access("memlog1",R_OK) == 0) + { + printf("File existance 1: Failed\n"); + failures++; + } + else + printf("File existance 1: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, (void *)i); + if (access("memlog1",R_OK) == 0) + { + printf("File existance 2: Failed\n"); + failures++; + } + else + printf("File existance 2: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, (void *)i); + if (access("memlog1",R_OK) != 0) + { + printf("File existance 3: Failed\n"); + failures++; + } + else + printf("File existance 3: Passed\n"); + if (linecount("memlog1") != 100) + { + printf("Incorrect entry count: Failed\n"); + failures++; + } + else + printf("Incorrect entry count: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, (void *)i); + if (linecount("memlog1") != 100) + { + printf("Premature Flushing: Failed\n"); + failures++; + } + else + printf("Premature Flushing: Passed\n"); + memlog_destroy(log); + if (linecount("memlog1") != 150) + { + printf("Flush on destroy: Failed\n"); + failures++; + } + else + printf("Flush on destroy: Passed\n"); + } + + unlink("memlog2"); + if ((log = memlog_create("memlog2", ML_LONG, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + if (access("memlog2",R_OK) == 0) + { + printf("File existance 1: Failed\n"); + failures++; + } + else + printf("File existance 1: Passed\n"); + for (j = 0; j < 50; j++) + memlog_log(log, (void *)j); + if (access("memlog2",R_OK) == 0) + { + printf("File existance 2: Failed\n"); + failures++; + } + else + printf("File existance 2: Passed\n"); + for (j = 0; j < 50; j++) + memlog_log(log, (void *)j); + if (access("memlog2",R_OK) != 0) + { + printf("File existance 3: Failed\n"); + failures++; + } + else + printf("File existance 3: Passed\n"); + if (linecount("memlog2") != 100) + { + printf("Incorrect entry count: Failed\n"); + failures++; + } + else + printf("Incorrect 
entry count: Passed\n"); + for (j = 0; j < 50; j++) + memlog_log(log, (void *)j); + if (linecount("memlog2") != 100) + { + printf("Premature Flushing: Failed\n"); + failures++; + } + else + printf("Premature Flushing: Passed\n"); + memlog_destroy(log); + if (linecount("memlog2") != 150) + { + printf("Flush on destroy: Failed\n"); + failures++; + } + else + printf("Flush on destroy: Passed\n"); + } + + unlink("memlog3"); + if ((log = memlog_create("memlog3", ML_LONGLONG, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + if (access("memlog3",R_OK) == 0) + { + printf("File existance 1: Failed\n"); + failures++; + } + else + printf("File existance 1: Passed\n"); + for (k = 0; k < 50; k++) + memlog_log(log, (void *)k); + if (access("memlog3",R_OK) == 0) + { + printf("File existance 2: Failed\n"); + failures++; + } + else + printf("File existance 2: Passed\n"); + for (k = 0; k < 50; k++) + memlog_log(log, (void *)k); + if (access("memlog3",R_OK) != 0) + { + printf("File existance 3: Failed\n"); + failures++; + } + else + printf("File existance 3: Passed\n"); + if (linecount("memlog3") != 100) + { + printf("Incorrect entry count: Failed\n"); + failures++; + } + else + printf("Incorrect entry count: Passed\n"); + for (k = 0; k < 50; k++) + memlog_log(log, (void *)k); + if (linecount("memlog3") != 100) + { + printf("Premature Flushing: Failed\n"); + failures++; + } + else + printf("Premature Flushing: Passed\n"); + memlog_destroy(log); + if (linecount("memlog3") != 150) + { + printf("Flush on destroy: Failed\n"); + failures++; + } + else + printf("Flush on destroy: Passed\n"); + } + + unlink("memlog4"); + if ((log = memlog_create("memlog4", ML_STRING, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + if (access("memlog4",R_OK) == 0) + { + printf("File existance 1: Failed\n"); + failures++; + } + else + printf("File 
existance 1: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, strings[i%5]); + if (access("memlog4",R_OK) == 0) + { + printf("File existance 2: Failed\n"); + failures++; + } + else + printf("File existance 2: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, strings[i%5]); + if (access("memlog4",R_OK) != 0) + { + printf("File existance 3: Failed\n"); + failures++; + } + else + printf("File existance 3: Passed\n"); + if (linecount("memlog4") != 100) + { + printf("Incorrect entry count: Failed\n"); + failures++; + } + else + printf("Incorrect entry count: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, strings[i%5]); + if (linecount("memlog4") != 100) + { + printf("Premature Flushing: Failed\n"); + failures++; + } + else + printf("Premature Flushing: Passed\n"); + memlog_destroy(log); + if (linecount("memlog4") != 150) + { + printf("Flush on destroy: Failed\n"); + failures++; + } + else + printf("Flush on destroy: Passed\n"); + } + + unlink("memlog5"); + unlink("memlog6"); + if ((log = memlog_create("memlog5", ML_INT, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + if ((log2 = memlog_create("memlog6", ML_INT, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + for (i = 0; i < 40; i++) + memlog_log(log, (void *)i); + for (i = 0; i < 30; i++) + memlog_log(log2, (void *)i); + memlog_flush_all(); + if (linecount("memlog5") != 40 || + linecount("memlog6") != 30) + { + printf( + "Memlog flush all: Failed\n"); + failures++; + } + else + printf( + "Memlog flush all: Passed\n"); + } + } + + unlink("memlog7"); + if ((log = memlog_create("memlog7", ML_INT, 100)) == NULL) + { + printf("Memlog Creation: Failed\n"); + failures++; + } + else + { + printf("Memlog Creation: Passed\n"); + if (access("memlog7",R_OK) == 0) + { + printf("File existance 1: Failed\n"); + failures++; + } + else + printf("File 
existance 1: Passed\n"); + for (i = 0; i < 5050; i++) + memlog_log(log, (void *)i); + if (access("memlog7",R_OK) != 0) + { + printf("File existance 3: Failed\n"); + failures++; + } + else + printf("File existance 3: Passed\n"); + if (linecount("memlog7") != 5000) + { + printf("Incorrect entry count: Failed\n"); + failures++; + } + else + printf("Incorrect entry count: Passed\n"); + for (i = 0; i < 50; i++) + memlog_log(log, (void *)i); + if (linecount("memlog7") != 5100) + { + printf("Residual flushing: Failed\n"); + failures++; + } + else + printf("Premature Flushing: Passed\n"); + for (i = 0; i < 10120; i++) + memlog_log(log, (void *)i); + memlog_destroy(log); + if (linecount("memlog7") != 15220) + { + printf("Flush on destroy: Failed\n"); + failures++; + } + else + printf("Flush on destroy: Passed\n"); + } + exit(failures); +} diff --git a/server/include/memlog.h b/server/include/memlog.h new file mode 100644 index 000000000..032348e38 --- /dev/null +++ b/server/include/memlog.h @@ -0,0 +1,51 @@ +#ifndef _MEMLOG_H +#define _MEMLOG_H +/* + * This file is distributed as part of MariaDB MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ * + * Copyright MariaDB Ab 2014 + */ + +/** + * @file memlog.h The memory logging mechanism + * + * @verbatim + * Revision History + * + * Date Who Description + * 26/09/14 Mark Riddoch Initial implementation + * + * @endverbatim + */ +#include + +typedef enum { ML_INT, ML_LONG, ML_LONGLONG, ML_STRING } MEMLOGTYPE; + +typedef struct memlog { + char *name; + SPINLOCK lock; + void *values; + int offset; + int size; + MEMLOGTYPE type; + struct memlog *next; +} MEMLOG; + + +extern MEMLOG *memlog_create(char *, MEMLOGTYPE, int); +extern void memlog_destroy(MEMLOG *); +extern void memlog_log(MEMLOG *, void *); +extern void memlog_flush_all(); +#endif diff --git a/server/include/spinlock.h b/server/include/spinlock.h index 43192da3f..e5f938815 100644 --- a/server/include/spinlock.h +++ b/server/include/spinlock.h @@ -31,7 +31,7 @@ #include #include -#define SPINLOCK_PROFILE 1 +#define SPINLOCK_PROFILE 0 /** * The spinlock structure. diff --git a/server/modules/include/blr.h b/server/modules/include/blr.h index 6e151d923..1ba575d49 100644 --- a/server/modules/include/blr.h +++ b/server/modules/include/blr.h @@ -39,12 +39,22 @@ #define BINLOG_NAMEFMT "%s.%06d" #define BINLOG_NAME_ROOT "mysql-bin" +/* How often to call the binlog status function (seconds) */ +#define BLR_STATS_FREQ 60 +#define BLR_NSTATS_MINUTES 30 + /** * High and Low water marks for the slave dcb. These values can be overriden * by the router options highwater and lowwater. 
*/ -#define DEF_LOW_WATER 2000 -#define DEF_HIGH_WATER 30000 +#define DEF_LOW_WATER 1000 +#define DEF_HIGH_WATER 10000 + +/** + * Default burst sizes for slave catchup + */ +#define DEF_SHORT_BURST 15 +#define DEF_LONG_BURST 2000 /** * Some useful macros for examining the MySQL Response packets @@ -60,20 +70,23 @@ * Slave statistics */ typedef struct { - int n_events; /*< Number of events sent */ - int n_bursts; /*< Number of bursts sent */ - int n_requests; /*< Number of requests received */ - int n_flows; /*< Number of flow control restarts */ - int n_catchupnr; /*< No. of times catchup resulted in not entering loop */ - int n_alreadyupd; - int n_upd; - int n_cb; - int n_cbna; - int n_dcb; - int n_above; - int n_failed_read; - int n_overrun; - int n_actions[3]; + int n_events; /*< Number of events sent */ + int n_bursts; /*< Number of bursts sent */ + int n_requests; /*< Number of requests received */ + int n_flows; /*< Number of flow control restarts */ + int n_catchupnr; /*< No. of times catchup resulted in not entering loop */ + int n_alreadyupd; + int n_upd; + int n_cb; + int n_cbna; + int n_dcb; + int n_above; + int n_failed_read; + int n_overrun; + int n_actions[3]; + uint64_t lastsample; + int minno; + int minavgs[BLR_NSTATS_MINUTES]; } SLAVE_STATS; /** @@ -132,6 +145,9 @@ typedef struct { uint64_t n_fakeevents; /*< Fake events not written to disk */ uint64_t n_artificial; /*< Artificial events not written to disk */ uint64_t events[0x24]; /*< Per event counters */ + uint64_t lastsample; + int minno; + int minavgs[BLR_NSTATS_MINUTES]; } ROUTER_STATS; /** @@ -214,6 +230,8 @@ typedef struct router_instance { */ unsigned int low_water; /*< Low water mark for client DCB */ unsigned int high_water; /*< High water mark for client DCB */ + unsigned int short_burst; /*< Short burst for slave catchup */ + unsigned int long_burst; /*< Long burst for slave catchup */ BLCACHE *cache[2]; ROUTER_STATS stats; /*< Statistics for this router */ int active_logs; @@ -279,12 
+297,13 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered", /** * Slave catch-up status */ -#define CS_READING 0x0001 -#define CS_INNERLOOP 0x0002 #define CS_UPTODATE 0x0004 #define CS_EXPECTCB 0x0008 #define CS_DIST 0x0010 #define CS_DISTLATCH 0x0020 +#define CS_THRDWAIT 0x0040 +#define CS_BUSY 0x0100 +#define CS_HOLD 0x0200 /** * MySQL protocol OpCodes needed for replication @@ -356,7 +375,7 @@ extern void blr_master_reconnect(ROUTER_INSTANCE *); extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *); extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr); -extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); extern void blr_init_cache(ROUTER_INSTANCE *); extern void blr_file_init(ROUTER_INSTANCE *); diff --git a/server/modules/routing/binlog/blr.c b/server/modules/routing/binlog/blr.c index d92f03c10..1d4a9d8ff 100644 --- a/server/modules/routing/binlog/blr.c +++ b/server/modules/routing/binlog/blr.c @@ -48,6 +48,7 @@ #include #include #include +#include #include #include @@ -95,6 +96,8 @@ static ROUTER_OBJECT MyObject = { getCapabilities }; +static void stats_func(void *); + static bool rses_begin_locked_router_action(ROUTER_SLAVE *); static void rses_end_locked_router_action(ROUTER_SLAVE *); @@ -173,6 +176,8 @@ int i; inst->low_water = DEF_LOW_WATER; inst->high_water = DEF_HIGH_WATER; inst->initbinlog = 0; + inst->short_burst = DEF_SHORT_BURST; + inst->long_burst = DEF_LONG_BURST; /* * We only support one server behind this router, since the server is @@ -249,6 +254,10 @@ int i; { inst->initbinlog = atoi(value); } + else if (strcmp(options[i], "file") == 0) + { + inst->initbinlog = atoi(value); + } else if (strcmp(options[i], "lowwater") == 0) { inst->low_water = atoi(value); @@ -257,6 +266,14 @@ int i; { inst->high_water = atoi(value); } + else if (strcmp(options[i], "shortburst") == 0) + { + 
inst->short_burst = atoi(value); + } + else if (strcmp(options[i], "longburst") == 0) + { + inst->long_burst = atoi(value); + } else { LOGIF(LE, (skygw_log_write( @@ -302,6 +319,8 @@ int i; */ blr_init_cache(inst); + hktask_add("Binlog Router", stats_func, inst, BLR_STATS_FREQ); + /* * Now start the replication from the master to MaxScale */ @@ -541,7 +560,9 @@ diagnostics(ROUTER *router, DCB *dcb) { ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router; ROUTER_SLAVE *session; -int i = 0; +int i = 0, j; +int minno = 0; +double min5, min10, min15, min30; char buf[40]; struct tm tm; @@ -554,6 +575,30 @@ struct tm tm; } spinlock_release(&router_inst->lock); + minno = router_inst->stats.minno; + min30 = 0.0; + min15 = 0.0; + min10 = 0.0; + min5 = 0.0; + for (j = 0; j < 30; j++) + { + minno--; + if (minno < 0) + minno += 30; + min30 += router_inst->stats.minavgs[minno]; + if (j < 15) + min15 += router_inst->stats.minavgs[minno]; + if (j < 10) + min10 += router_inst->stats.minavgs[minno]; + if (j < 5) + min5 += router_inst->stats.minavgs[minno]; + } + min30 /= 30.0; + min15 /= 15.0; + min10 /= 10.0; + min5 /= 5.0; + + dcb_printf(dcb, "\tMaster connection DCB: %p\n", router_inst->master); dcb_printf(dcb, "\tMaster connection state: %s\n", @@ -574,6 +619,13 @@ struct tm tm; router_inst->stats.n_slaves); dcb_printf(dcb, "\tNumber of binlog events received: %u\n", router_inst->stats.n_binlogs); + minno = router_inst->stats.minno - 1; + if (minno == -1) + minno = BLR_NSTATS_MINUTES - 1; /* wrap to the last valid slot of minavgs[] */ + dcb_printf(dcb, "\tNumber of binlog events per minute\n"); + dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n"); + dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n", + router_inst->stats.minavgs[minno], min5, min10, min15, min30); dcb_printf(dcb, "\tNumber of fake binlog events: %u\n", router_inst->stats.n_fakeevents); dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n", @@ -582,10 +634,6 @@ struct tm tm; router_inst->stats.n_binlog_errors); dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates); - dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n", - router_inst->stats.n_cachehits); - dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n", - router_inst->stats.n_cachemisses); dcb_printf(dcb, "\tNumber of heartbeat events: %u\n", router_inst->stats.n_heartbeats); dcb_printf(dcb, "\tNumber of packets received: %u\n", @@ -624,6 +672,29 @@ struct tm tm; session = router_inst->slaves; while (session) { + + minno = session->stats.minno; + min30 = 0.0; + min15 = 0.0; + min10 = 0.0; + min5 = 0.0; + for (j = 0; j < 30; j++) + { + minno--; + if (minno < 0) + minno += 30; + min30 += session->stats.minavgs[minno]; + if (j < 15) + min15 += session->stats.minavgs[minno]; + if (j < 10) + min10 += session->stats.minavgs[minno]; + if (j < 5) + min5 += session->stats.minavgs[minno]; + } + min30 /= 30.0; + min15 /= 15.0; + min10 /= 10.0; + min5 /= 5.0; dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid); if (session->hostname) dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname); @@ -637,14 +708,20 @@ struct tm tm; dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests); dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events); dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts); + minno = session->stats.minno - 1; + if (minno == -1) + minno = BLR_NSTATS_MINUTES - 1; /* wrap to the last valid slot of minavgs[] */ + dcb_printf(dcb, "\t\tNumber of binlog events per minute\n"); + dcb_printf(dcb, "\t\tCurrent 5 10 15 30 Min Avg\n"); + dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n", + session->stats.minavgs[minno], min5, min10, + min15, min30); dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows); dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr); dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd); dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd); - dcb_printf(dcb, "\t\tNo. of low water cbs %u\n", session->stats.n_cb); dcb_printf(dcb, "\t\tNo.
of drained cbs %u\n", session->stats.n_dcb); dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna); - dcb_printf(dcb, "\t\tNo. of events > high water %u\n", session->stats.n_above); dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read); dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun); dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]); @@ -776,3 +853,35 @@ static uint8_t getCapabilities(ROUTER *inst, void *router_session) { return 0; } + +/** + * The stats gathering function called from the housekeeper so that we + * can get timed averages of binlog records shippped + * + * @param inst The router instance + */ +static void +stats_func(void *inst) +{ +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)inst; +ROUTER_SLAVE *slave; + + router->stats.minavgs[router->stats.minno++] + = router->stats.n_binlogs - router->stats.lastsample; + router->stats.lastsample = router->stats.n_binlogs; + if (router->stats.minno == BLR_NSTATS_MINUTES) + router->stats.minno = 0; + + spinlock_acquire(&router->lock); + slave = router->slaves; + while (slave) + { + slave->stats.minavgs[slave->stats.minno++] + = slave->stats.n_events - slave->stats.lastsample; + slave->stats.lastsample = slave->stats.n_events; + if (slave->stats.minno == BLR_NSTATS_MINUTES) + slave->stats.minno = 0; + slave = slave->next; + } + spinlock_release(&router->lock); +} diff --git a/server/modules/routing/binlog/blr_master.c b/server/modules/routing/binlog/blr_master.c index ee14cec4c..6f92d193d 100644 --- a/server/modules/routing/binlog/blr_master.c +++ b/server/modules/routing/binlog/blr_master.c @@ -60,39 +60,6 @@ /* Temporary requirement for auth data */ #include -#define SAMPLE_COUNT 10000 -CYCLES samples[10][SAMPLE_COUNT]; -int sample_index[10] = { 0, 0, 0 }; - -#define LOGD_SLAVE_CATCHUP1 0 -#define LOGD_SLAVE_CATCHUP2 1 -#define LOGD_DISTRIBUTE 2 -#define LOGD_FILE_FLUSH 3 - -SPINLOCK logspin = 
SPINLOCK_INIT; - -void -log_duration(int sample, CYCLES duration) -{ -char fname[100]; -int i; -FILE *fp; - - spinlock_acquire(&logspin); - samples[sample][sample_index[sample]++] = duration; - if (sample_index[sample] == SAMPLE_COUNT) - { - sprintf(fname, "binlog_profile.%d", sample); - if ((fp = fopen(fname, "a")) != NULL) - { - for (i = 0; i < SAMPLE_COUNT; i++) - fprintf(fp, "%ld\n", samples[sample][i]); - fclose(fp); - } - sample_index[sample] = 0; - } - spinlock_release(&logspin); -} extern int lm_enabled_logfiles_bitmask; @@ -802,9 +769,7 @@ static REP_HEADER phdr; { ss_dassert(pkt_length == 0); } -{ CYCLES start = rdtsc(); blr_file_flush(router); -log_duration(LOGD_FILE_FLUSH, rdtsc() - start); } } /** @@ -929,9 +894,7 @@ GWBUF *pkt; uint8_t *buf; ROUTER_SLAVE *slave; int action; -CYCLES entry; - entry = rdtsc(); spinlock_acquire(&router->lock); slave = router->slaves; while (slave) @@ -985,16 +948,12 @@ CYCLES entry; spinlock_acquire(&slave->catch_lock); if (slave->overrun) { -CYCLES cycle_start, cycles; slave->stats.n_overrun++; slave->overrun = 0; spinlock_release(&router->lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); -cycle_start = rdtsc(); - blr_slave_catchup(router, slave); -cycles = rdtsc() - cycle_start; -log_duration(LOGD_SLAVE_CATCHUP2, cycles); + blr_slave_catchup(router, slave, false); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -1027,7 +986,6 @@ log_duration(LOGD_SLAVE_CATCHUP2, cycles); */ if (slave->cstate & CS_UPTODATE) { -CYCLES cycle_start, cycles; spinlock_release(&router->lock); LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG, "Force slave %d into catchup mode %s@%d\n", @@ -1036,10 +994,7 @@ CYCLES cycle_start, cycles; spinlock_acquire(&slave->catch_lock); slave->cstate &= ~(CS_UPTODATE|CS_DIST); spinlock_release(&slave->catch_lock); -cycle_start = rdtsc(); - blr_slave_catchup(router, slave); -cycles = rdtsc() - cycle_start; -log_duration(LOGD_SLAVE_CATCHUP1, cycles); + 
blr_slave_catchup(router, slave, false); spinlock_acquire(&router->lock); slave = router->slaves; if (slave) @@ -1053,7 +1008,6 @@ log_duration(LOGD_SLAVE_CATCHUP1, cycles); slave = slave->next; } spinlock_release(&router->lock); - log_duration(LOGD_DISTRIBUTE, rdtsc() - entry); } static void diff --git a/server/modules/routing/binlog/blr_slave.c b/server/modules/routing/binlog/blr_slave.c index 5d7a16475..0ed3618c9 100644 --- a/server/modules/routing/binlog/blr_slave.c +++ b/server/modules/routing/binlog/blr_slave.c @@ -52,7 +52,6 @@ #include #include - static uint32_t extract_field(uint8_t *src, int bits); static void encode_value(unsigned char *data, unsigned int value, int len); static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); @@ -61,7 +60,7 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue); -int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave); +int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large); static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr); static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data); @@ -567,7 +566,7 @@ uint32_t chksum; slave->dcb->low_water = router->low_water; slave->dcb->high_water = router->high_water; - dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave); +// dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave); dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave); if (slave->binlog_pos != router->binlog_position || @@ -576,7 +575,7 @@ uint32_t chksum; spinlock_acquire(&slave->catch_lock); slave->cstate &= ~CS_UPTODATE; spinlock_release(&slave->catch_lock); - 
rval = blr_slave_catchup(router, slave); + rval = blr_slave_catchup(router, slave, true); } return rval; @@ -660,187 +659,107 @@ uint8_t *ptr; * We have a registered slave that is behind the current leading edge of the * binlog. We must replay the log entries to bring this node up to speed. * - * There may be a large numebr of records to send to the slave, the process + * There may be a large number of records to send to the slave, the process * is triggered by the slave COM_BINLOG_DUMP message and all the events must * be sent without receiving any new event. This measn there is no trigger into * MaxScale other than this initial message. However, if we simply send all the - * events we end up with an extremely long write queue on the DCB and risk running - * the server out of resources. + * events we end up with an extremely long write queue on the DCB and risk + * running the server out of resources. * - * To resolve this the concept of high and low water marks within the DCB has been - * added, with the ability for the DCB code to call user defined callbacks when the - * write queue is completely drained, when it crosses above the high water mark and - * when it crosses below the low water mark. - * - * The blr_slave_catchup routine will send binlog events to the slave until the high - * water mark is reached, at which point it will return. Later, when a low water mark - * callback is generated by the code that drains the DCB of data the blr_slave_catchup - * routine will again be called to write more events. The process is repeated until - * the slave has caught up with the master. + * The slave catchup routine will send a burst of replication events per single + * call. The parameter "large" controls the number of events in the burst. The + * short burst is intended to be used when the master receives an event and + * needs to put the slave into catchup mode.
This prevents the slave taking + * too much time away from the thread that is processing the master events. * - * Note: an additional check that the DCB is still above the low water mark is done - * prior to the return from this function to allow for any delays due to the call to - * the close system call, since this may cause thread rescheduling. + * At the end of the burst a fake EPOLLOUT event is added to the poll event + * queue. This ensures that the slave callback for processing DCB write drain + * will be called and future catchup requests will be handled on another thread. * * @param router The binlog router * @param slave The slave that is behind + * @param large Send a long or short burst of events * @return The number of bytes written */ int -blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave) +blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large) { GWBUF *head, *record; REP_HEADER hdr; -int written, fd, rval = 1, burst = 0; +int written, fd, rval = 1, burst; uint8_t *ptr; struct timespec req; - + if (large) + burst = router->long_burst; + else + burst = router->short_burst; spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_EXPECTCB; - spinlock_release(&slave->catch_lock); -doitagain: - /* - * We have a slightly complex syncronisation mechansim here, - * we need to make sure that we do not have multiple threads - * running the catchup loop, but we need to be very careful - * that we do not loose a call that is coming via a callback - * call as this will stall the binlog catchup process. - * - * We don't want to simply use a traditional mutex here for - * the loop, since this would block a MaxScale thread for - * an unacceptable length of time. - * - * We have two status bits, the CS_READING that says we are - * in the outer loop and the CS_INNERLOOP, to say we are in - * the inner loop.
- * - * If just CS_READING is set the other thread may be about to - * enter the inner loop or may be about to exit the function - * completely. Therefore we have to wait to see if CS_READING - * is cleared or CS_INNERLOOP is set. - * - * If CS_READING gets cleared then this thread should proceed - * into the loop. - * - * If CS_INNERLOOP get's set then this thread does not need to - * proceed. - * - * If CS_READING is not set then this thread simply enters the - * loop. - */ - req.tv_sec = 0; - req.tv_nsec = 1000; - spinlock_acquire(&slave->catch_lock); - if (slave->cstate & CS_UPTODATE) + while ((slave->cstate & (CS_HOLD|CS_BUSY)) == (CS_HOLD|CS_BUSY)) { - LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE, - "blr_slave_catchup called with up to date slave %d at " - "%s@%d. Reading position %s@%d\n", - slave->serverid, slave->binlogfile, - slave->binlog_pos, router->binlog_name, - router->binlog_position))); - slave->stats.n_alreadyupd++; - spinlock_release(&slave->catch_lock); - return 1; - } - while (slave->cstate & CS_READING) - { - // Wait until we know what the other thread is doing - while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING) - { - spinlock_release(&slave->catch_lock); - nanosleep(&req, NULL); - spinlock_acquire(&slave->catch_lock); - } - // Other thread is in the innerloop - if ((slave->cstate & (CS_READING|CS_INNERLOOP)) == (CS_READING|CS_INNERLOOP)) - { - spinlock_release(&slave->catch_lock); - LOGIF(LM, (skygw_log_write( - LOGFILE_MESSAGE, - "blr_slave_catchup thread returning due to " - "lock being held by another thread. %s@%d\n", - slave->binlogfile, - slave->binlog_pos))); - slave->stats.n_catchupnr++; - return 1; // We cheat here and return 1 because otherwise - // an error would be sent and we do not want that - } - - /* Release the lock for a short time to allow the other - * thread to exit the outer reading loop. 
- */ spinlock_release(&slave->catch_lock); + req.tv_sec = 0; + req.tv_nsec = 100; nanosleep(&req, NULL); spinlock_acquire(&slave->catch_lock); } - if (slave->pthread) - LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, "Multiple threads sending to same thread.\n"))); - slave->pthread = pthread_self(); - slave->cstate |= CS_READING; + slave->cstate |= (CS_HOLD|CS_BUSY); spinlock_release(&slave->catch_lock); - if (DCB_ABOVE_HIGH_WATER(slave->dcb)) - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "blr_slave_catchup above high water on entry.\n"))); - - do { - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_catchup failed to open binlog file %s\n", + slave->binlogfile))); + return 0; + } + slave->stats.n_bursts++; + while (burst-- && + (record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL) + { + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_HOLD; + spinlock_release(&slave->catch_lock); + head = gwbuf_alloc(5); + ptr = GWBUF_DATA(head); + encode_value(ptr, hdr.event_size + 1, 24); + ptr += 3; + *ptr++ = slave->seqno++; + *ptr++ = 0; // OK + head = gwbuf_append(head, record); + if (hdr.event_type == ROTATE_EVENT) { - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_READING; - spinlock_release(&slave->catch_lock); - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", + close(fd); + blr_slave_rotate(slave, GWBUF_DATA(record)); + if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "blr_slave_catchup failed to open binlog file %s\n", slave->binlogfile))); - return 0; + break; + } } - slave->stats.n_bursts++; spinlock_acquire(&slave->catch_lock); - slave->cstate |= CS_INNERLOOP; + slave->cstate |= CS_HOLD; spinlock_release(&slave->catch_lock); - while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) && - (record = 
blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL) + written = slave->dcb->func.write(slave->dcb, head); + if (written && hdr.event_type != ROTATE_EVENT) { -if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++; - head = gwbuf_alloc(5); - ptr = GWBUF_DATA(head); - encode_value(ptr, hdr.event_size + 1, 24); - ptr += 3; - *ptr++ = slave->seqno++; - *ptr++ = 0; // OK - head = gwbuf_append(head, record); - if (hdr.event_type == ROTATE_EVENT) - { - close(fd); - blr_slave_rotate(slave, GWBUF_DATA(record)); - if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1) - { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "blr_slave_catchup failed to open binlog file %s\n", - slave->binlogfile))); - break; - } - } - written = slave->dcb->func.write(slave->dcb, head); - if (written && hdr.event_type != ROTATE_EVENT) - { - slave->binlog_pos = hdr.next_pos; - } - rval = written; - slave->stats.n_events++; - burst++; + slave->binlog_pos = hdr.next_pos; } - if (record == NULL) - slave->stats.n_failed_read++; - spinlock_acquire(&slave->catch_lock); - slave->cstate &= ~CS_INNERLOOP; - spinlock_release(&slave->catch_lock); + rval = written; + slave->stats.n_events++; + } + if (record == NULL) + slave->stats.n_failed_read++; + spinlock_acquire(&slave->catch_lock); + slave->cstate &= ~CS_BUSY; + spinlock_release(&slave->catch_lock); - close(fd); - } while (record && DCB_BELOW_LOW_WATER(slave->dcb)); + close(fd); + poll_fake_write_event(slave->dcb); if (record) { slave->stats.n_flows++; @@ -864,25 +783,6 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++; "blr_slave_catchup slave is up to date %s, %u\n", slave->binlogfile, slave->binlog_pos))); } - spinlock_acquire(&slave->catch_lock); -#if 0 -if (slave->pthread != pthread_self()) -{ - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Multple threads in catchup for same slave: %x and %x\n", slave->pthread, pthread_self()))); -abort(); -} -#endif - slave->pthread = 0; -#if 0 -if (DCB_BELOW_LOW_WATER(slave->dcb) && 
slave->binlog_pos != router->binlog_position) abort(); -#endif - slave->cstate &= ~CS_READING; - spinlock_release(&slave->catch_lock); -if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position) -{ - LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n"))); -goto doitagain; -} return rval; } @@ -908,7 +808,7 @@ ROUTER_INSTANCE *router = slave->router; slave->binlog_pos != router->binlog_position) { slave->stats.n_dcb++; - blr_slave_catchup(router, slave); + blr_slave_catchup(router, slave, true); } } @@ -917,7 +817,7 @@ ROUTER_INSTANCE *router = slave->router; if (slave->state == BLRS_DUMPING) { slave->stats.n_cb++; - blr_slave_catchup(router, slave); + blr_slave_catchup(router, slave, true); } else {