Shutdown fix in housekeeper

In memory logging in blr_master
This commit is contained in:
Mark Riddoch 2014-09-26 12:36:59 +01:00
parent 06596a0bc3
commit 3430fc99d2
7 changed files with 100 additions and 16 deletions

View File

@ -1646,6 +1646,7 @@ void
shutdown_server()
{
poll_shutdown();
hkshutdown();
log_flush_shutdown();
}

View File

@ -42,6 +42,8 @@ static HKTASK *tasks = NULL;
*/
static SPINLOCK tasklock = SPINLOCK_INIT;
static int do_shutdown = 0;
static void hkthread(void *);
/**
@ -172,6 +174,8 @@ void *taskdata;
for (;;)
{
if (do_shutdown)
return;
thread_millisleep(1000);
now = time(0);
spinlock_acquire(&tasklock);
@ -194,3 +198,13 @@ void *taskdata;
spinlock_release(&tasklock);
}
}
/**
* Called to shutdown the housekeeper
*
*/
void
hkshutdown()
{
do_shutdown = 1;
}

View File

@ -391,7 +391,7 @@ DCB *zombies = NULL;
while (1)
{
/* Process of the queue of waiting requests */
while (process_pollq(thread_id))
while (do_shutdown == 0 && process_pollq(thread_id))
{
if (thread_data)
thread_data[thread_id].state = THREAD_ZPROCESSING;
@ -885,6 +885,20 @@ poll_bitmask()
return &poll_mask;
}
/**
* Display an entry from the spinlock statistics data
*
* @param dcb The DCB to print to
* @param desc Description of the statistic
* @param value The statistic value
*/
static void
spin_reporter(void *dcb, char *desc, int value)
{
dcb_printf((DCB *)dcb, "\t%-40s %d\n", desc, value);
}
/**
* Debug routine to print the polling statistics
*
@ -922,6 +936,11 @@ int i;
}
dcb_printf(dcb, "\t>= %d\t\t\t%d\n", MAXNFDS,
pollStats.n_fds[MAXNFDS-1]);
#if SPINLOCK_PROFILE
dcb_printf(dcb, "Event queue lock statistics:\n");
spinlock_stats(&pollqlock, spin_reporter, dcb);
#endif
}
/**

View File

@ -47,4 +47,5 @@ typedef struct hktask {
extern void hkinit();
extern int hktask_add(char *name, void (*task)(void *), void *data, int frequency);
extern int hktask_remove(char *name);
extern void hkshutdown();
#endif

View File

@ -31,7 +31,7 @@
#include <thread.h>
#include <stdbool.h>
#define SPINLOCK_PROFILE 0
#define SPINLOCK_PROFILE 1
/**
* The spinlock structure.

View File

@ -43,8 +43,8 @@
* High and Low water marks for the slave dcb. These values can be overriden
* by the router options highwater and lowwater.
*/
#define DEF_LOW_WATER 20000
#define DEF_HIGH_WATER 300000
#define DEF_LOW_WATER 2000
#define DEF_HIGH_WATER 30000
/**
* Some useful macros for examining the MySQL Response packets

View File

@ -55,21 +55,57 @@
#include <skygw_utils.h>
#include <log_manager.h>
#include <rdtsc.h>
/* Temporary requirement for auth data */
#include <mysql_client_server_protocol.h>
#define SAMPLE_COUNT 10000
CYCLES samples[10][SAMPLE_COUNT];
int sample_index[10] = { 0, 0, 0 };
#define LOGD_SLAVE_CATCHUP1 0
#define LOGD_SLAVE_CATCHUP2 1
#define LOGD_DISTRIBUTE 2
#define LOGD_FILE_FLUSH 3
SPINLOCK logspin = SPINLOCK_INIT;
void
log_duration(int sample, CYCLES duration)
{
char fname[100];
int i;
FILE *fp;
spinlock_acquire(&logspin);
samples[sample][sample_index[sample]++] = duration;
if (sample_index[sample] == SAMPLE_COUNT)
{
sprintf(fname, "binlog_profile.%d", sample);
if ((fp = fopen(fname, "a")) != NULL)
{
for (i = 0; i < SAMPLE_COUNT; i++)
fprintf(fp, "%ld\n", samples[sample][i]);
fclose(fp);
}
sample_index[sample] = 0;
}
spinlock_release(&logspin);
}
extern int lm_enabled_logfiles_bitmask;
static GWBUF *blr_make_query(char *statement);
static GWBUF *blr_make_registration(ROUTER_INSTANCE *router);
static GWBUF *blr_make_binlog_dump(ROUTER_INSTANCE *router);
static void encode_value(unsigned char *data, unsigned int value, int len);
static void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
void encode_value(unsigned char *data, unsigned int value, int len);
void blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt);
static void blr_rotate_event(ROUTER_INSTANCE *router, uint8_t *pkt, REP_HEADER *hdr);
static void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
void blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
static void *CreateMySQLAuthData(char *username, char *password, char *database);
static void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
static uint32_t extract_field(uint8_t *src, int bits);
void blr_extract_header(uint8_t *pkt, REP_HEADER *hdr);
inline uint32_t extract_field(uint8_t *src, int bits);
static void blr_log_packet(logfile_id_t file, char *msg, uint8_t *ptr, int len);
static int keepalive = 1;
@ -460,7 +496,7 @@ int len = 0x1b;
* @param value The value to pack
* @param len Number of bits to encode value into
*/
static void
void
encode_value(unsigned char *data, unsigned int value, int len)
{
while (len > 0)
@ -478,7 +514,7 @@ encode_value(unsigned char *data, unsigned int value, int len)
* @param router The router instance
* @param pkt The binlog records
*/
static void
void
blr_handle_binlog_record(ROUTER_INSTANCE *router, GWBUF *pkt)
{
uint8_t *msg = NULL, *ptr, *pdata;
@ -766,7 +802,9 @@ static REP_HEADER phdr;
{
ss_dassert(pkt_length == 0);
}
{ CYCLES start = rdtsc();
blr_file_flush(router);
log_duration(LOGD_FILE_FLUSH, rdtsc() - start); }
}
/**
@ -775,7 +813,7 @@ static REP_HEADER phdr;
* @param pkt The incoming packet in a GWBUF chain
* @param hdr The packet header to populate
*/
static void
void
blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
{
@ -796,10 +834,10 @@ blr_extract_header(uint8_t *ptr, REP_HEADER *hdr)
* @param src The raw packet source
* @param birs The number of bits to extract (multiple of 8)
*/
static uint32_t
extract_field(uint8_t *src, int bits)
inline uint32_t
extract_field(register uint8_t *src, int bits)
{
uint32_t rval = 0, shift = 0;
register uint32_t rval = 0, shift = 0;
while (bits > 0)
{
@ -884,14 +922,16 @@ MYSQL_session *auth_info;
* @param hdr The replication event header
* @param ptr The raw replication event data
*/
static void
void
blr_distribute_binlog_record(ROUTER_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
{
GWBUF *pkt;
uint8_t *buf;
ROUTER_SLAVE *slave;
int action;
CYCLES entry;
entry = rdtsc();
spinlock_acquire(&router->lock);
slave = router->slaves;
while (slave)
@ -945,12 +985,16 @@ int action;
spinlock_acquire(&slave->catch_lock);
if (slave->overrun)
{
CYCLES cycle_start, cycles;
slave->stats.n_overrun++;
slave->overrun = 0;
spinlock_release(&router->lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP2, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -983,6 +1027,7 @@ int action;
*/
if (slave->cstate & CS_UPTODATE)
{
CYCLES cycle_start, cycles;
spinlock_release(&router->lock);
LOGIF(LD, (skygw_log_write_flush(LOGFILE_DEBUG,
"Force slave %d into catchup mode %s@%d\n",
@ -991,7 +1036,10 @@ int action;
spinlock_acquire(&slave->catch_lock);
slave->cstate &= ~(CS_UPTODATE|CS_DIST);
spinlock_release(&slave->catch_lock);
cycle_start = rdtsc();
blr_slave_catchup(router, slave);
cycles = rdtsc() - cycle_start;
log_duration(LOGD_SLAVE_CATCHUP1, cycles);
spinlock_acquire(&router->lock);
slave = router->slaves;
if (slave)
@ -1005,6 +1053,7 @@ int action;
slave = slave->next;
}
spinlock_release(&router->lock);
log_duration(LOGD_DISTRIBUTE, rdtsc() - entry);
}
static void