Updates to slave catchup mode to use fake events

Addition of fake EPOLLOUT event mechanism

New memlog feature for debugging purposes
This commit is contained in:
Mark Riddoch 2014-09-30 13:25:45 +01:00
parent 3430fc99d2
commit 0ef87e3cc1
13 changed files with 951 additions and 253 deletions

View File

@ -5,7 +5,8 @@ target_link_libraries(fullcore log_manager utils pthread ${EMBEDDED_LIB} ssl aio
add_executable(maxscale atomic.c buffer.c spinlock.c gateway.c
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c
monitor.c adminusers.c secrets.c filter.c modutil.c hint.c housekeeper.c)
monitor.c adminusers.c secrets.c filter.c modutil.c hint.c
housekeeper.c memlog.c)
target_link_libraries(maxscale ${EMBEDDED_LIB} log_manager utils ssl aio pthread crypt dl crypto inih z rt m stdc++)
install(TARGETS maxscale DESTINATION bin)
@ -19,4 +20,4 @@ install(TARGETS maxpasswd DESTINATION bin)
if(BUILD_TESTS)
add_subdirectory(test)
endif()
endif()

View File

@ -65,7 +65,8 @@ include ../../makefile.inc
SRCS= atomic.c buffer.c spinlock.c gateway.c \
gw_utils.c utils.c dcb.c load_utils.c session.c service.c server.c \
poll.c config.c users.c hashtable.c dbusers.c thread.c gwbitmask.c \
monitor.c adminusers.c secrets.c filter.c modutil.c hint.c housekeeper.c
monitor.c adminusers.c secrets.c filter.c modutil.c hint.c \
housekeeper.c memlog.c
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/gw.h ../modules/include/mysql_client_server_protocol.h \
@ -73,7 +74,8 @@ HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
../include/modules.h ../include/poll.h ../include/config.h \
../include/users.h ../include/hashtable.h ../include/gwbitmask.h \
../include/adminusers.h ../include/version.h ../include/maxscale.h \
../include/filter.h ../include/modutil.h ../hint.h ../include/housekeeper.h
../include/filter.h ../include/modutil.h ../hint.h \
../include/housekeeper.h ../include/memlog.h
OBJ=$(SRCS:.c=.o)

View File

@ -53,6 +53,7 @@
#include <config.h>
#include <poll.h>
#include <housekeeper.h>
#include <memlog.h>
#include <stdlib.h>
#include <unistd.h>
@ -1643,10 +1644,11 @@ return_main:
* Shutdown MaxScale server
*/
void
shutdown_server()
shutdown_server()
{
poll_shutdown();
hkshutdown();
memlog_flush_all();
log_flush_shutdown();
}

209
server/core/memlog.c Normal file
View File

@ -0,0 +1,209 @@
/*
* This file is distributed as part of the MariaDB MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Ab 2014
*/
/**
* @file memlog.c - Implementation of memory logging mechanism for debug purposes
*
* @verbatim
* Revision History
*
* Date Who Description
* 26/09/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <memlog.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/** Head of the singly linked list of every memory log created by memlog_create */
static MEMLOG	*memlogs = NULL;
/**
 * Lock protecting the memlogs list.
 *
 * This must be a SPINLOCK object rather than a pointer to one: every user
 * passes &memlock to spinlock_acquire/spinlock_release.
 */
static SPINLOCK	memlock = SPINLOCK_INIT;

static void memlog_flush(MEMLOG *);
/**
 * Create a new instance of a memory logger.
 *
 * The new log is linked onto the head of the global list of memory logs so
 * that memlog_flush_all can reach it.
 *
 * @param name	The name of the memory log, also used as the name of the
 *		file the entries are appended to when the log is flushed
 * @param type	The type of item being logged
 * @param size	The number of items to store in memory before flushing to disk
 *
 * @return MEMLOG*	A memory log handle, or NULL if the name is NULL,
 *			the size is not positive, the type is unknown or
 *			memory could not be allocated
 */
MEMLOG *
memlog_create(char *name, MEMLOGTYPE type, int size)
{
	MEMLOG	*log;
	size_t	itemsize;

	/* A buffer with no slots could never be written to safely */
	if (name == NULL || size <= 0)
		return NULL;

	switch (type)
	{
	case ML_INT:
		itemsize = sizeof(int);
		break;
	case ML_LONG:
		itemsize = sizeof(long);
		break;
	case ML_LONGLONG:
		itemsize = sizeof(long long);
		break;
	case ML_STRING:
		itemsize = sizeof(char *);
		break;
	default:
		/* Unknown type: refuse rather than leave values uninitialised */
		return NULL;
	}

	if ((log = (MEMLOG *)malloc(sizeof(MEMLOG))) == NULL)
		return NULL;

	if ((log->name = strdup(name)) == NULL)
	{
		free(log);
		return NULL;
	}

	if ((log->values = malloc(itemsize * (size_t)size)) == NULL)
	{
		/* Release the name as well, it was leaked previously */
		free(log->name);
		free(log);
		return NULL;
	}

	spinlock_init(&log->lock);
	log->type = type;
	log->offset = 0;
	log->size = size;

	/* Publish the fully initialised log on the global list */
	spinlock_acquire(&memlock);
	log->next = memlogs;
	memlogs = log;
	spinlock_release(&memlock);

	return log;
}
/**
 * Destroy a memory logger, any unwritten data will be flushed to disk
 *
 * The log is first removed from the global list, while holding the list
 * lock, so that memlog_flush_all can no longer reach it. Only then is it
 * flushed (holding the log's own lock, as memlog_flush requires) and freed.
 *
 * @param log	The memory log to destroy, a NULL pointer is ignored
 */
void
memlog_destroy(MEMLOG *log)
{
	MEMLOG	*ptr;

	if (log == NULL)
		return;

	/* Unlink from the list of all memory logs */
	spinlock_acquire(&memlock);
	if (memlogs == log)
	{
		memlogs = log->next;
	}
	else
	{
		ptr = memlogs;
		while (ptr && ptr->next != log)
			ptr = ptr->next;
		if (ptr)
			ptr->next = log->next;
	}
	spinlock_release(&memlock);

	/* Write out whatever is still buffered */
	spinlock_acquire(&log->lock);
	memlog_flush(log);
	spinlock_release(&log->lock);

	free(log->values);
	free(log->name);
	free(log);
}
/**
 * Log a data item to the memory logger
 *
 * For the integer log types the value to log is carried in the pointer
 * argument itself. For ML_STRING the pointer is stored, the string is not
 * copied, so it must remain valid until the log has been flushed.
 *
 * When the in-memory buffer becomes full it is flushed to disk and the
 * buffer is reused from the start.
 *
 * @param log	The memory logger
 * @param value	The value to log
 */
void
memlog_log(MEMLOG *log, void *value)
{
	spinlock_acquire(&log->lock);
	switch (log->type)
	{
	case ML_INT:
		/* Go via intptr_t to avoid a pointer to narrower integer cast */
		((int *)(log->values))[log->offset] = (int)(intptr_t)value;
		break;
	case ML_LONG:
		((long *)(log->values))[log->offset] = (long)(intptr_t)value;
		break;
	case ML_LONGLONG:
		((long long *)(log->values))[log->offset] = (long long)(intptr_t)value;
		break;
	case ML_STRING:
		((char **)(log->values))[log->offset] = (char *)value;
		break;
	}
	log->offset++;
	/* >= rather than == so the offset can never run past the buffer */
	if (log->offset >= log->size)
	{
		memlog_flush(log);
		log->offset = 0;
	}
	spinlock_release(&log->lock);
}
/**
* Flush all memlogs to disk, called during shutdown
*
*/
void
memlog_flush_all()
{
MEMLOG *log;
spinlock_acquire(&memlock);
log = memlogs;
while (log)
{
spinlock_acquire(&log->lock);
memlog_flush(log);
spinlock_release(&log->lock);
log = log->next;
}
spinlock_release(&memlock);
}
/**
 * Flush a memory log to disk
 *
 * The buffered entries are appended, one per line, to the file named
 * after the log. Once they have been written the buffer is marked empty
 * so that a subsequent flush does not write the same entries again. If the
 * file cannot be opened nothing is written and the buffer is left as it is.
 *
 * Assumes that the log->lock has been acquired by the caller
 *
 * @param log	The memory log to flush
 */
static void
memlog_flush(MEMLOG *log)
{
	FILE	*fp;
	int	i;

	if ((fp = fopen(log->name, "a")) == NULL)
		return;
	for (i = 0; i < log->offset; i++)
	{
		switch (log->type)
		{
		case ML_INT:
			fprintf(fp, "%d\n", ((int *)(log->values))[i]);
			break;
		case ML_LONG:
			fprintf(fp, "%ld\n", ((long *)(log->values))[i]);
			break;
		case ML_LONGLONG:
			fprintf(fp, "%lld\n", ((long long *)(log->values))[i]);
			break;
		case ML_STRING:
			{
				char *str = ((char **)(log->values))[i];

				/* Passing NULL for %s is undefined behaviour */
				fprintf(fp, "%s\n", str ? str : "(null)");
			}
			break;
		}
	}
	fclose(fp);
	/* Everything buffered is now on disk, do not write it twice */
	log->offset = 0;
}

View File

@ -1109,3 +1109,47 @@ int new_samples, new_nfds;
if (next_sample >= n_avg_samples)
next_sample = 0;
}
/**
 * Queue a fake write completion (EPOLLOUT) event for a DCB.
 *
 * This allows the event processing routine of one DCB to trigger
 * transmission activity on another DCB.
 *
 * If the DCB is already busy in the polling system the event is merged
 * into its pending events. Otherwise the DCB is linked in just before the
 * head of the event queue and the queue length statistics are updated.
 * All of this is done while holding the poll queue lock.
 *
 * @param dcb	DCB to emulate an EPOLLOUT event for
 */
void
poll_fake_write_event(DCB *dcb)
{
	const uint32_t	fake_event = EPOLLOUT;

	spinlock_acquire(&pollqlock);
	if (DCB_POLL_BUSY(dcb))
	{
		/* Already known to the poll system, just add the event */
		dcb->evq.pending_events |= fake_event;
	}
	else
	{
		dcb->evq.pending_events = fake_event;
		if (eventq == NULL)
		{
			/* Empty queue, the DCB becomes the only entry */
			eventq = dcb;
			dcb->evq.prev = dcb;
			dcb->evq.next = dcb;
		}
		else
		{
			DCB	*last = eventq->evq.prev;

			/* Link the DCB in between the last entry and the head */
			dcb->evq.prev = last;
			last->evq.next = dcb;
			eventq->evq.prev = dcb;
			dcb->evq.next = eventq;
		}
		pollStats.evq_length++;
		if (pollStats.evq_max < pollStats.evq_length)
		{
			pollStats.evq_max = pollStats.evq_length;
		}
	}
	spinlock_release(&pollqlock);
}

View File

@ -2,12 +2,15 @@ add_executable(test_hash testhash.c)
add_executable(test_spinlock testspinlock.c)
add_executable(test_filter testfilter.c)
add_executable(test_adminusers testadminusers.c)
add_executable(testmemlog testmemlog.c)
target_link_libraries(test_hash fullcore)
target_link_libraries(test_spinlock fullcore)
target_link_libraries(test_filter fullcore)
target_link_libraries(test_adminusers fullcore)
target_link_libraries(testmemlog fullcore)
add_test(TestHash test_hash)
add_test(TestSpinlock test_spinlock)
add_test(TestFilter test_filter)
add_test(TestAdminUsers test_adminusers)
add_test(TestMemlog testmemlog)

View File

@ -0,0 +1,404 @@
/*
* This file is distributed as part of MaxScale from MariaDB. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Corporation 2014
*/
/**
*
* @verbatim
* Revision History
*
* Date Who Description
* 30/09/2014 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <memlog.h>
/**
 * Count the number of lines in a file
 *
 * Lines are counted by their terminating newline, so a line of any length
 * counts exactly once. A final line that is not newline terminated is
 * counted as well.
 *
 * @param file	The name of the file
 * @return	-1 if the file could not be opened or the number of lines
 */
int
linecount(char *file)
{
	FILE	*fp;
	int	ch;
	int	last = '\n';	/* So that an empty file has no lines */
	int	lines = 0;

	if ((fp = fopen(file, "r")) == NULL)
		return -1;
	while ((ch = fgetc(fp)) != EOF)
	{
		if (ch == '\n')
			lines++;
		last = ch;
	}
	fclose(fp);
	/* Account for a trailing line with no newline */
	if (last != '\n')
		lines++;
	return lines;
}
/* Some strings to log */
char *strings[] = {
"First log entry",
"Second entry",
"Third",
"The fourth thing to log",
"Add a final 5th item"
};
int
main()
{
MEMLOG *log, *log2;
int i;
long j;
long long k;
int failures = 0;
unlink("memlog1");
if ((log = memlog_create("memlog1", ML_INT, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if (access("memlog1",R_OK) == 0)
{
printf("File existance 1: Failed\n");
failures++;
}
else
printf("File existance 1: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, (void *)i);
if (access("memlog1",R_OK) == 0)
{
printf("File existance 2: Failed\n");
failures++;
}
else
printf("File existance 2: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, (void *)i);
if (access("memlog1",R_OK) != 0)
{
printf("File existance 3: Failed\n");
failures++;
}
else
printf("File existance 3: Passed\n");
if (linecount("memlog1") != 100)
{
printf("Incorrect entry count: Failed\n");
failures++;
}
else
printf("Incorrect entry count: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, (void *)i);
if (linecount("memlog1") != 100)
{
printf("Premature Flushing: Failed\n");
failures++;
}
else
printf("Premature Flushing: Passed\n");
memlog_destroy(log);
if (linecount("memlog1") != 150)
{
printf("Flush on destroy: Failed\n");
failures++;
}
else
printf("Flush on destroy: Passed\n");
}
unlink("memlog2");
if ((log = memlog_create("memlog2", ML_LONG, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if (access("memlog2",R_OK) == 0)
{
printf("File existance 1: Failed\n");
failures++;
}
else
printf("File existance 1: Passed\n");
for (j = 0; j < 50; j++)
memlog_log(log, (void *)j);
if (access("memlog2",R_OK) == 0)
{
printf("File existance 2: Failed\n");
failures++;
}
else
printf("File existance 2: Passed\n");
for (j = 0; j < 50; j++)
memlog_log(log, (void *)j);
if (access("memlog2",R_OK) != 0)
{
printf("File existance 3: Failed\n");
failures++;
}
else
printf("File existance 3: Passed\n");
if (linecount("memlog2") != 100)
{
printf("Incorrect entry count: Failed\n");
failures++;
}
else
printf("Incorrect entry count: Passed\n");
for (j = 0; j < 50; j++)
memlog_log(log, (void *)j);
if (linecount("memlog2") != 100)
{
printf("Premature Flushing: Failed\n");
failures++;
}
else
printf("Premature Flushing: Passed\n");
memlog_destroy(log);
if (linecount("memlog2") != 150)
{
printf("Flush on destroy: Failed\n");
failures++;
}
else
printf("Flush on destroy: Passed\n");
}
unlink("memlog3");
if ((log = memlog_create("memlog3", ML_LONGLONG, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if (access("memlog3",R_OK) == 0)
{
printf("File existance 1: Failed\n");
failures++;
}
else
printf("File existance 1: Passed\n");
for (k = 0; k < 50; k++)
memlog_log(log, (void *)k);
if (access("memlog3",R_OK) == 0)
{
printf("File existance 2: Failed\n");
failures++;
}
else
printf("File existance 2: Passed\n");
for (k = 0; k < 50; k++)
memlog_log(log, (void *)k);
if (access("memlog3",R_OK) != 0)
{
printf("File existance 3: Failed\n");
failures++;
}
else
printf("File existance 3: Passed\n");
if (linecount("memlog3") != 100)
{
printf("Incorrect entry count: Failed\n");
failures++;
}
else
printf("Incorrect entry count: Passed\n");
for (k = 0; k < 50; k++)
memlog_log(log, (void *)k);
if (linecount("memlog3") != 100)
{
printf("Premature Flushing: Failed\n");
failures++;
}
else
printf("Premature Flushing: Passed\n");
memlog_destroy(log);
if (linecount("memlog3") != 150)
{
printf("Flush on destroy: Failed\n");
failures++;
}
else
printf("Flush on destroy: Passed\n");
}
unlink("memlog4");
if ((log = memlog_create("memlog4", ML_STRING, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if (access("memlog4",R_OK) == 0)
{
printf("File existance 1: Failed\n");
failures++;
}
else
printf("File existance 1: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, strings[i%5]);
if (access("memlog4",R_OK) == 0)
{
printf("File existance 2: Failed\n");
failures++;
}
else
printf("File existance 2: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, strings[i%5]);
if (access("memlog4",R_OK) != 0)
{
printf("File existance 3: Failed\n");
failures++;
}
else
printf("File existance 3: Passed\n");
if (linecount("memlog4") != 100)
{
printf("Incorrect entry count: Failed\n");
failures++;
}
else
printf("Incorrect entry count: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, strings[i%5]);
if (linecount("memlog4") != 100)
{
printf("Premature Flushing: Failed\n");
failures++;
}
else
printf("Premature Flushing: Passed\n");
memlog_destroy(log);
if (linecount("memlog4") != 150)
{
printf("Flush on destroy: Failed\n");
failures++;
}
else
printf("Flush on destroy: Passed\n");
}
unlink("memlog5");
unlink("memlog6");
if ((log = memlog_create("memlog5", ML_INT, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if ((log2 = memlog_create("memlog6", ML_INT, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
for (i = 0; i < 40; i++)
memlog_log(log, (void *)i);
for (i = 0; i < 30; i++)
memlog_log(log2, (void *)i);
memlog_flush_all();
if (linecount("memlog5") != 40 ||
linecount("memlog6") != 30)
{
printf(
"Memlog flush all: Failed\n");
failures++;
}
else
printf(
"Memlog flush all: Passed\n");
}
}
unlink("memlog7");
if ((log = memlog_create("memlog7", ML_INT, 100)) == NULL)
{
printf("Memlog Creation: Failed\n");
failures++;
}
else
{
printf("Memlog Creation: Passed\n");
if (access("memlog7",R_OK) == 0)
{
printf("File existance 1: Failed\n");
failures++;
}
else
printf("File existance 1: Passed\n");
for (i = 0; i < 5050; i++)
memlog_log(log, (void *)i);
if (access("memlog7",R_OK) != 0)
{
printf("File existance 3: Failed\n");
failures++;
}
else
printf("File existance 3: Passed\n");
if (linecount("memlog7") != 5000)
{
printf("Incorrect entry count: Failed\n");
failures++;
}
else
printf("Incorrect entry count: Passed\n");
for (i = 0; i < 50; i++)
memlog_log(log, (void *)i);
if (linecount("memlog7") != 5100)
{
printf("Residual flushing: Failed\n");
failures++;
}
else
printf("Premature Flushing: Passed\n");
for (i = 0; i < 10120; i++)
memlog_log(log, (void *)i);
memlog_destroy(log);
if (linecount("memlog7") != 15220)
{
printf("Flush on destroy: Failed\n");
failures++;
}
else
printf("Flush on destroy: Passed\n");
}
exit(failures);
}

51
server/include/memlog.h Normal file
View File

@ -0,0 +1,51 @@
#ifndef _MEMLOG_H
#define _MEMLOG_H
/*
* This file is distributed as part of MariaDB MaxScale. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright MariaDB Ab 2014
*/
/**
* @file memlog.h The memory logging mechanism
*
* @verbatim
* Revision History
*
* Date Who Description
* 26/09/14 Mark Riddoch Initial implementation
*
* @endverbatim
*/
#include <spinlock.h>
/**
 * The type of the items held in a memory log. It selects the element type
 * of the values array and the format used when the log is written to disk.
 */
typedef enum { ML_INT, ML_LONG, ML_LONGLONG, ML_STRING } MEMLOGTYPE;
/**
 * A memory log: a fixed size buffer of values that is appended to a file
 * when the buffer fills, when the log is destroyed or when all logs are
 * flushed.
 */
typedef struct memlog {
char *name;		/*< Name of the log, also the file the log is flushed to */
SPINLOCK lock;		/*< Lock taken while logging to or flushing the log */
void *values;		/*< Array of size items, element type given by type */
int offset;		/*< Index of the next free entry in values */
int size;		/*< Number of entries values can hold */
MEMLOGTYPE type;	/*< The type of item stored in values */
struct memlog *next;	/*< Next memory log in the list of all memory logs */
} MEMLOG;
/* Create a memory log of the given name, item type and number of entries */
extern MEMLOG *memlog_create(char *, MEMLOGTYPE, int);
/* Flush any buffered entries and free the memory log */
extern void memlog_destroy(MEMLOG *);
/* Add one value to the memory log, flushing to disk when the buffer fills */
extern void memlog_log(MEMLOG *, void *);
/* Flush every memory log to disk, called during shutdown */
extern void memlog_flush_all();
#endif

View File

@ -31,7 +31,7 @@
#include <thread.h>
#include <stdbool.h>
#define SPINLOCK_PROFILE 1
#define SPINLOCK_PROFILE 0
/**
* The spinlock structure.

View File

@ -39,12 +39,22 @@
#define BINLOG_NAMEFMT "%s.%06d"
#define BINLOG_NAME_ROOT "mysql-bin"
/* How often to call the binlog status function (seconds) */
#define BLR_STATS_FREQ 60
#define BLR_NSTATS_MINUTES 30
/**
* High and Low water marks for the slave dcb. These values can be overridden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 2000
#define DEF_HIGH_WATER 30000
#define DEF_LOW_WATER 1000
#define DEF_HIGH_WATER 10000
/**
* Default burst sizes for slave catchup
*/
#define DEF_SHORT_BURST 15
#define DEF_LONG_BURST 2000
/**
* Some useful macros for examining the MySQL Response packets
@ -60,20 +70,23 @@
* Slave statistics
*/
typedef struct {
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_catchupnr; /*< No. of times catchup resulted in not entering loop */
int n_alreadyupd;
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
int n_events; /*< Number of events sent */
int n_bursts; /*< Number of bursts sent */
int n_requests; /*< Number of requests received */
int n_flows; /*< Number of flow control restarts */
int n_catchupnr; /*< No. of times catchup resulted in not entering loop */
int n_alreadyupd;
int n_upd;
int n_cb;
int n_cbna;
int n_dcb;
int n_above;
int n_failed_read;
int n_overrun;
int n_actions[3];
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} SLAVE_STATS;
/**
@ -132,6 +145,9 @@ typedef struct {
uint64_t n_fakeevents; /*< Fake events not written to disk */
uint64_t n_artificial; /*< Artificial events not written to disk */
uint64_t events[0x24]; /*< Per event counters */
uint64_t lastsample;
int minno;
int minavgs[BLR_NSTATS_MINUTES];
} ROUTER_STATS;
/**
@ -214,6 +230,8 @@ typedef struct router_instance {
*/
unsigned int low_water; /*< Low water mark for client DCB */
unsigned int high_water; /*< High water mark for client DCB */
unsigned int short_burst; /*< Short burst for slave catchup */
unsigned int long_burst; /*< Long burst for slave catchup */
BLCACHE *cache[2];
ROUTER_STATS stats; /*< Statistics for this router */
int active_logs;
@ -279,12 +297,13 @@ static char *blrs_states[] = { "Created", "Unregistered", "Registered",
/**
* Slave catch-up status
*/
#define CS_READING 0x0001
#define CS_INNERLOOP 0x0002
#define CS_UPTODATE 0x0004
#define CS_EXPECTCB 0x0008
#define CS_DIST 0x0010
#define CS_DISTLATCH 0x0020
#define CS_THRDWAIT 0x0040
#define CS_BUSY 0x0100
#define CS_HOLD 0x0200
/**
* MySQL protocol OpCodes needed for replication
@ -356,7 +375,7 @@ extern void blr_master_reconnect(ROUTER_INSTANCE *);
extern int blr_slave_request(ROUTER_INSTANCE *, ROUTER_SLAVE *, GWBUF *);
extern void blr_slave_rotate(ROUTER_SLAVE *slave, uint8_t *ptr);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
extern int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
extern void blr_init_cache(ROUTER_INSTANCE *);
extern void blr_file_init(ROUTER_INSTANCE *);

View File

@ -48,6 +48,7 @@
#include <blr.h>
#include <dcb.h>
#include <spinlock.h>
#include <housekeeper.h>
#include <time.h>
#include <skygw_types.h>
@ -95,6 +96,8 @@ static ROUTER_OBJECT MyObject = {
getCapabilities
};
static void stats_func(void *);
static bool rses_begin_locked_router_action(ROUTER_SLAVE *);
static void rses_end_locked_router_action(ROUTER_SLAVE *);
@ -173,6 +176,8 @@ int i;
inst->low_water = DEF_LOW_WATER;
inst->high_water = DEF_HIGH_WATER;
inst->initbinlog = 0;
inst->short_burst = DEF_SHORT_BURST;
inst->long_burst = DEF_LONG_BURST;
/*
* We only support one server behind this router, since the server is
@ -249,6 +254,10 @@ int i;
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "file") == 0)
{
inst->initbinlog = atoi(value);
}
else if (strcmp(options[i], "lowwater") == 0)
{
inst->low_water = atoi(value);
@ -257,6 +266,14 @@ int i;
{
inst->high_water = atoi(value);
}
else if (strcmp(options[i], "shortburst") == 0)
{
inst->short_burst = atoi(value);
}
else if (strcmp(options[i], "longburst") == 0)
{
inst->long_burst = atoi(value);
}
else
{
LOGIF(LE, (skygw_log_write(
@ -302,6 +319,8 @@ int i;
*/
blr_init_cache(inst);
hktask_add("Binlog Router", stats_func, inst, BLR_STATS_FREQ);
/*
* Now start the replication from the master to MaxScale
*/
@ -541,7 +560,9 @@ diagnostics(ROUTER *router, DCB *dcb)
{
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)router;
ROUTER_SLAVE *session;
int i = 0;
int i = 0, j;
int minno = 0;
double min5, min10, min15, min30;
char buf[40];
struct tm tm;
@ -554,6 +575,30 @@ struct tm tm;
}
spinlock_release(&router_inst->lock);
minno = router_inst->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += router_inst->stats.minavgs[minno];
if (j < 15)
min15 += router_inst->stats.minavgs[minno];
if (j < 10)
min10 += router_inst->stats.minavgs[minno];
if (j < 5)
min5 += router_inst->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\tMaster connection DCB: %p\n",
router_inst->master);
dcb_printf(dcb, "\tMaster connection state: %s\n",
@ -574,6 +619,13 @@ struct tm tm;
router_inst->stats.n_slaves);
dcb_printf(dcb, "\tNumber of binlog events received: %u\n",
router_inst->stats.n_binlogs);
/*
 * Index of the most recently written sample. minavgs[] has
 * BLR_NSTATS_MINUTES entries, so wrap to the last valid index.
 */
minno = router_inst->stats.minno - 1;
if (minno == -1)
	minno = BLR_NSTATS_MINUTES - 1;
dcb_printf(dcb, "\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t %6d %8.1f %8.1f %8.1f %8.1f\n",
router_inst->stats.minavgs[minno], min5, min10, min15, min30);
dcb_printf(dcb, "\tNumber of fake binlog events: %u\n",
router_inst->stats.n_fakeevents);
dcb_printf(dcb, "\tNumber of artificial binlog events: %u\n",
@ -582,10 +634,6 @@ struct tm tm;
router_inst->stats.n_binlog_errors);
dcb_printf(dcb, "\tNumber of binlog rotate events: %u\n",
router_inst->stats.n_rotates);
dcb_printf(dcb, "\tNumber of binlog cache hits: %u\n",
router_inst->stats.n_cachehits);
dcb_printf(dcb, "\tNumber of binlog cache misses: %u\n",
router_inst->stats.n_cachemisses);
dcb_printf(dcb, "\tNumber of heartbeat events: %u\n",
router_inst->stats.n_heartbeats);
dcb_printf(dcb, "\tNumber of packets received: %u\n",
@ -624,6 +672,29 @@ struct tm tm;
session = router_inst->slaves;
while (session)
{
minno = session->stats.minno;
min30 = 0.0;
min15 = 0.0;
min10 = 0.0;
min5 = 0.0;
for (j = 0; j < 30; j++)
{
minno--;
if (minno < 0)
minno += 30;
min30 += session->stats.minavgs[minno];
if (j < 15)
min15 += session->stats.minavgs[minno];
if (j < 10)
min10 += session->stats.minavgs[minno];
if (j < 5)
min5 += session->stats.minavgs[minno];
}
min30 /= 30.0;
min15 /= 15.0;
min10 /= 10.0;
min5 /= 5.0;
dcb_printf(dcb, "\t\tServer-id: %d\n", session->serverid);
if (session->hostname)
dcb_printf(dcb, "\t\tHostname: %s\n", session->hostname);
@ -637,14 +708,20 @@ struct tm tm;
dcb_printf(dcb, "\t\tNo. requests: %u\n", session->stats.n_requests);
dcb_printf(dcb, "\t\tNo. events sent: %u\n", session->stats.n_events);
dcb_printf(dcb, "\t\tNo. bursts sent: %u\n", session->stats.n_bursts);
/*
 * Index of the most recently written sample. minavgs[] has
 * BLR_NSTATS_MINUTES entries, so wrap to the last valid index.
 */
minno = session->stats.minno - 1;
if (minno == -1)
	minno = BLR_NSTATS_MINUTES - 1;
dcb_printf(dcb, "\t\tNumber of binlog events per minute\n");
dcb_printf(dcb, "\t\tCurrent 5 10 15 30 Min Avg\n");
dcb_printf(dcb, "\t\t %6d %8.1f %8.1f %8.1f %8.1f\n",
session->stats.minavgs[minno], min5, min10,
min15, min30);
dcb_printf(dcb, "\t\tNo. flow control: %u\n", session->stats.n_flows);
dcb_printf(dcb, "\t\tNo. catchup NRs: %u\n", session->stats.n_catchupnr);
dcb_printf(dcb, "\t\tNo. already up to date: %u\n", session->stats.n_alreadyupd);
dcb_printf(dcb, "\t\tNo. up to date: %u\n", session->stats.n_upd);
dcb_printf(dcb, "\t\tNo. of low water cbs %u\n", session->stats.n_cb);
dcb_printf(dcb, "\t\tNo. of drained cbs %u\n", session->stats.n_dcb);
dcb_printf(dcb, "\t\tNo. of low water cbs N/A %u\n", session->stats.n_cbna);
dcb_printf(dcb, "\t\tNo. of events > high water %u\n", session->stats.n_above);
dcb_printf(dcb, "\t\tNo. of failed reads %u\n", session->stats.n_failed_read);
dcb_printf(dcb, "\t\tNo. of nested distribute events %u\n", session->stats.n_overrun);
dcb_printf(dcb, "\t\tNo. of distribute action 1 %u\n", session->stats.n_actions[0]);
@ -776,3 +853,35 @@ static uint8_t getCapabilities(ROUTER *inst, void *router_session)
{
return 0;
}
/**
 * The statistics sampling function. It is registered with the housekeeper
 * so that timed averages of the binlog records shipped can be reported.
 *
 * Each call stores the number of events seen since the previous call in
 * the next slot of a circular buffer of BLR_NSTATS_MINUTES samples, once
 * for the router itself and once for every slave of the router.
 *
 * @param inst	The router instance
 */
static void
stats_func(void *inst)
{
	ROUTER_INSTANCE	*router = (ROUTER_INSTANCE *)inst;
	ROUTER_SLAVE	*slave;

	/* Sample the events received by the router */
	router->stats.minavgs[router->stats.minno]
		= router->stats.n_binlogs - router->stats.lastsample;
	router->stats.minno++;
	router->stats.lastsample = router->stats.n_binlogs;
	if (router->stats.minno == BLR_NSTATS_MINUTES)
		router->stats.minno = 0;

	/* Sample the events sent to each slave, with the slave list locked */
	spinlock_acquire(&router->lock);
	for (slave = router->slaves; slave; slave = slave->next)
	{
		slave->stats.minavgs[slave->stats.minno]
			= slave->stats.n_events - slave->stats.lastsample;
		slave->stats.minno++;
		slave->stats.lastsample = slave->stats.n_events;
		if (slave->stats.minno == BLR_NSTATS_MINUTES)
			slave->stats.minno = 0;
	}
	spinlock_release(&router->lock);
}

View File

@ -60,39 +60,6 @@
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
#define SAMPLE_COUNT 10000
CYCLES samples[10][SAMPLE_COUNT];
int sample_index[10] = { 0, 0, 0 };
#define LOGD_SLAVE_CATCHUP1 0
#define LOGD_SLAVE_CATCHUP2 1
#define LOGD_DISTRIBUTE 2
#define LOGD_FILE_FLUSH 3
SPINLOCK logspin = SPINLOCK_INIT;
void
log_duration(int sample, CYCLES duration)
{
char fname[100];
int i;
FILE *fp;
spinlock_acquire(&logspin);
samples[sample][sample_index[sample]++] = duration;
if (sample_index[sample] == SAMPLE_COUNT)
{
sprintf(fname, "binlog_profile.%d", sample);
if ((fp = fopen(fname, "a")) != NULL)
{
for (i = 0; i < SAMPLE_COUNT; i++)
fprintf(fp, "%ld\n", samples[sample][i]);
fclose(fp);
}
sample_index[sample] = 0;
}
spinlock_release(&logspin);
}
extern int lm_enabled_logfiles_bitmask;
@ -802,9 +769,7 @@ static REP_HEADER phdr;
{
ss_dassert(pkt_length == 0);
}
{ CYCLES start = rdtsc();
blr_file_flush(router);
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
}
/**
@ -929,9 +894,7 @@ GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
int action;
CYCLES entry;
entry = rdtsc();
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
@ -985,16 +948,12 @@ CYCLES entry;
spinlock_acquire(&slave->catch_lock);
if (slave->overrun)
{
CYCLES cycle_start, cycles;
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
blr_slave_catchup(router, slave, false);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -1027,7 +986,6 @@ log_duration(LOGD_SLAVE_CATCHUP2, cycles);
*/
if (slave->cstate & CS_UPTODATE)
{
CYCLES cycle_start, cycles;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
@ -1036,10 +994,7 @@ CYCLES cycle_start, cycles;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
blr_slave_catchup(router, slave, false);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -1053,7 +1008,6 @@ log_duration(LOGD_SLAVE_CATCHUP1, cycles);
slave = slave->next;
}
spinlock_release(&router->lock);
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
}
static void

View File

@ -52,7 +52,6 @@
#include <skygw_utils.h>
#include <log_manager.h>
static uint32_t extract_field(uint8_t *src, int bits);
static void encode_value(unsigned char *data, unsigned int value, int len);
static int blr_slave_query(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
@ -61,7 +60,7 @@ static void blr_slave_send_error(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, c
static int blr_slave_send_timestamp(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
static int blr_slave_register(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
static int blr_slave_binlog_dump(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, GWBUF *queue);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave);
int blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large);
static uint8_t *blr_build_header(GWBUF *pkt, REP_HEADER *hdr);
static int blr_slave_callback(DCB *dcb, DCB_REASON reason, void *data);
@ -567,7 +566,7 @@ uint32_t chksum;
slave->dcb->low_water = router->low_water;
slave->dcb->high_water = router->high_water;
dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
// dcb_add_callback(slave->dcb, DCB_REASON_LOW_WATER, blr_slave_callback, slave);
dcb_add_callback(slave->dcb, DCB_REASON_DRAINED, blr_slave_callback, slave);
if (slave->binlog_pos != router->binlog_position ||
@ -576,7 +575,7 @@ uint32_t chksum;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_UPTODATE;
spinlock_release(&slave->catch_lock);
rval = blr_slave_catchup(router, slave);
rval = blr_slave_catchup(router, slave, true);
}
return rval;
@ -660,187 +659,107 @@ uint8_t *ptr;
* We have a registered slave that is behind the current leading edge of the
* binlog. We must replay the log entries to bring this node up to speed.
*
* There may be a large numebr of records to send to the slave, the process
* There may be a large number of records to send to the slave, the process
* is triggered by the slave COM_BINLOG_DUMP message and all the events must
* be sent without receiving any new event. This means there is no trigger into
* MaxScale other than this initial message. However, if we simply send all the
* events we end up with an extremely long write queue on the DCB and risk running
* the server out of resources.
* events we end up with an extremely long write queue on the DCB and risk
* running the server out of resources.
*
* To resolve this the concept of high and low water marks within the DCB has been
* added, with the ability for the DCB code to call user defined callbacks when the
* write queue is completely drained, when it crosses above the high water mark and
* when it crosses below the low water mark.
*
* The blr_slave_catchup routine will send binlog events to the slave until the high
* water mark is reached, at which point it will return. Later, when a low water mark
* callback is generated by the code that drains the DCB of data the blr_slave_catchup
* routine will again be called to write more events. The process is repeated until
* the slave has caught up with the master.
* The slave catchup routine will send a burst of replication events per single
* call. The parameter "large" controls the number of events in the burst. The
* short burst is intended to be used when the master receives an event and
* needs to put the slave into catchup mode. This prevents the slave taking
* too much time away from the thread that is processing the master events.
*
* Note: an additional check that the DCB is still above the low water mark is done
* prior to the return from this function to allow for any delays due to the call to
* the close system call, since this may cause thread rescheduling.
* At the end of the burst a fake EPOLLOUT event is added to the poll event
* queue. This ensures that the slave callback for processing DCB write drain
* will be called and future catchup requests will be handled on another thread.
*
* @param router The binlog router
* @param slave The slave that is behind
* @param large Send a long or short burst of events
* @return The number of bytes written
*/
int
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave)
blr_slave_catchup(ROUTER_INSTANCE *router, ROUTER_SLAVE *slave, bool large)
{
GWBUF *head, *record;
REP_HEADER hdr;
int written, fd, rval = 1, burst = 0;
int written, fd, rval = 1, burst;
uint8_t *ptr;
struct timespec req;
if (large)
burst = router->long_burst;
else
burst = router->short_burst;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_EXPECTCB;
spinlock_release(&slave->catch_lock);
doitagain:
/*
* We have a slightly complex syncronisation mechansim here,
* we need to make sure that we do not have multiple threads
* running the catchup loop, but we need to be very careful
* that we do not loose a call that is coming via a callback
* call as this will stall the binlog catchup process.
*
* We don't want to simply use a traditional mutex here for
* the loop, since this would block a MaxScale thread for
* an unacceptable length of time.
*
* We have two status bits, the CS_READING that says we are
* in the outer loop and the CS_INNERLOOP, to say we are in
* the inner loop.
*
* If just CS_READING is set the other thread may be about to
* enter the inner loop or may be about to exit the function
* completely. Therefore we have to wait to see if CS_READING
* is cleared or CS_INNERLOOP is set.
*
* If CS_READING gets cleared then this thread should proceed
* into the loop.
*
* If CS_INNERLOOP get's set then this thread does not need to
* proceed.
*
* If CS_READING is not set then this thread simply enters the
* loop.
*/
req.tv_sec = 0;
req.tv_nsec = 1000;
spinlock_acquire(&slave->catch_lock);
if (slave->cstate & CS_UPTODATE)
while ((slave->cstate & (CS_HOLD|CS_BUSY)) == (CS_HOLD|CS_BUSY))
{
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"blr_slave_catchup called with up to date slave %d at "
"%s@%d. Reading position %s@%d\n",
slave->serverid, slave->binlogfile,
slave->binlog_pos, router->binlog_name,
router->binlog_position)));
slave->stats.n_alreadyupd++;
spinlock_release(&slave->catch_lock);
return 1;
}
while (slave->cstate & CS_READING)
{
// Wait until we know what the other thread is doing
while ((slave->cstate & (CS_READING|CS_INNERLOOP)) == CS_READING)
{
spinlock_release(&slave->catch_lock);
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
// Other thread is in the innerloop
if ((slave->cstate & (CS_READING|CS_INNERLOOP)) == (CS_READING|CS_INNERLOOP))
{
spinlock_release(&slave->catch_lock);
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"blr_slave_catchup thread returning due to "
"lock being held by another thread. %s@%d\n",
slave->binlogfile,
slave->binlog_pos)));
slave->stats.n_catchupnr++;
return 1; // We cheat here and return 1 because otherwise
// an error would be sent and we do not want that
}
/* Release the lock for a short time to allow the other
* thread to exit the outer reading loop.
*/
spinlock_release(&slave->catch_lock);
req.tv_sec = 0;
req.tv_nsec = 100;
nanosleep(&req, NULL);
spinlock_acquire(&slave->catch_lock);
}
if (slave->pthread)
LOGIF(LD, (skygw_log_write(LOGFILE_DEBUG, "Multiple threads sending to same thread.\n")));
slave->pthread = pthread_self();
slave->cstate |= CS_READING;
slave->cstate |= (CS_HOLD|CS_BUSY);
spinlock_release(&slave->catch_lock);
if (DCB_ABOVE_HIGH_WATER(slave->dcb))
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "blr_slave_catchup above high water on entry.\n")));
do {
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
return 0;
}
slave->stats.n_bursts++;
while (burst-- &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_HOLD;
spinlock_release(&slave->catch_lock);
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
return 0;
break;
}
}
slave->stats.n_bursts++;
spinlock_acquire(&slave->catch_lock);
slave->cstate |= CS_INNERLOOP;
slave->cstate |= CS_HOLD;
spinlock_release(&slave->catch_lock);
while ((!DCB_ABOVE_HIGH_WATER(slave->dcb)) &&
(record = blr_read_binlog(fd, slave->binlog_pos, &hdr)) != NULL)
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
head = gwbuf_alloc(5);
ptr = GWBUF_DATA(head);
encode_value(ptr, hdr.event_size + 1, 24);
ptr += 3;
*ptr++ = slave->seqno++;
*ptr++ = 0; // OK
head = gwbuf_append(head, record);
if (hdr.event_type == ROTATE_EVENT)
{
close(fd);
blr_slave_rotate(slave, GWBUF_DATA(record));
if ((fd = blr_open_binlog(router, slave->binlogfile)) == -1)
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"blr_slave_catchup failed to open binlog file %s\n",
slave->binlogfile)));
break;
}
}
written = slave->dcb->func.write(slave->dcb, head);
if (written && hdr.event_type != ROTATE_EVENT)
{
slave->binlog_pos = hdr.next_pos;
}
rval = written;
slave->stats.n_events++;
burst++;
slave->binlog_pos = hdr.next_pos;
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_INNERLOOP;
spinlock_release(&slave->catch_lock);
rval = written;
slave->stats.n_events++;
}
if (record == NULL)
slave->stats.n_failed_read++;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~CS_BUSY;
spinlock_release(&slave->catch_lock);
close(fd);
} while (record && DCB_BELOW_LOW_WATER(slave->dcb));
close(fd);
poll_fake_write_event(slave->dcb);
if (record)
{
slave->stats.n_flows++;
@ -864,25 +783,6 @@ if (hdr.event_size > DEF_HIGH_WATER) slave->stats.n_above++;
"blr_slave_catchup slave is up to date %s, %u\n",
slave->binlogfile, slave->binlog_pos)));
}
spinlock_acquire(&slave->catch_lock);
#if 0
if (slave->pthread != pthread_self())
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Multple threads in catchup for same slave: %x and %x\n", slave->pthread, pthread_self())));
abort();
}
#endif
slave->pthread = 0;
#if 0
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position) abort();
#endif
slave->cstate &= ~CS_READING;
spinlock_release(&slave->catch_lock);
if (DCB_BELOW_LOW_WATER(slave->dcb) && slave->binlog_pos != router->binlog_position)
{
LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, "Expected to be above low water\n")));
goto doitagain;
}
return rval;
}
@ -908,7 +808,7 @@ ROUTER_INSTANCE *router = slave->router;
slave->binlog_pos != router->binlog_position)
{
slave->stats.n_dcb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
}
@ -917,7 +817,7 @@ ROUTER_INSTANCE *router = slave->router;
if (slave->state == BLRS_DUMPING)
{
slave->stats.n_cb++;
blr_slave_catchup(router, slave);
blr_slave_catchup(router, slave, true);
}
else
{