Merge branch 'release-1.0beta-refresh' into filter_harness

This commit is contained in:
Markus Makela
2014-09-24 09:26:20 +03:00
18 changed files with 446 additions and 255 deletions

View File

@ -33,7 +33,7 @@ endif
CC=cc CC=cc
CFLAGS=-c -Wall -g $(HISTFLAG) CFLAGS=-c -Wall -g $(HISTFLAG) -I ../server/include
SRCS= maxadmin.c SRCS= maxadmin.c

View File

@ -47,6 +47,9 @@
#include <dirent.h> #include <dirent.h>
#include <locale.h> #include <locale.h>
#include <errno.h> #include <errno.h>
#include <getopt.h>
#include <version.h>
#ifdef HISTORY #ifdef HISTORY
#include <histedit.h> #include <histedit.h>
@ -59,6 +62,7 @@ static int sendCommand(int so, char *cmd);
static void DoSource(int so, char *cmd); static void DoSource(int so, char *cmd);
static void DoUsage(); static void DoUsage();
static int isquit(char *buf); static int isquit(char *buf);
static void PrintVersion(const char *progname);
#ifdef HISTORY #ifdef HISTORY
static char * static char *
@ -70,6 +74,16 @@ prompt(EditLine *el __attribute__((__unused__)))
} }
#endif #endif
static struct option long_options[] = {
{"host", required_argument, 0, 'h'},
{"user", required_argument, 0, 'u'},
{"password", required_argument, 0, 'p'},
{"port", required_argument, 0, 'P'},
{"version", no_argument, 0, 'v'},
{"help", no_argument, 0, '?'},
{0, 0, 0, 0}
};
/** /**
* The main for the maxadmin client * The main for the maxadmin client
* *
@ -79,7 +93,7 @@ prompt(EditLine *el __attribute__((__unused__)))
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
int i, num, rv, fatal = 0; int i, num, rv;
#ifdef HISTORY #ifdef HISTORY
char *buf; char *buf;
EditLine *el = NULL; EditLine *el = NULL;
@ -94,109 +108,36 @@ char *hostname = "localhost";
char *port = "6603"; char *port = "6603";
char *user = "admin"; char *user = "admin";
char *passwd = NULL; char *passwd = NULL;
int so, cmdlen; int so;
char *cmd; int option_index = 0;
int argno = 0; char c;
cmd = malloc(1); while ((c = getopt_long(argc, argv, "h:p:P:u:v?",
*cmd = 0; long_options, &option_index))
cmdlen = 1; >= 0)
{
for (i = 1; i < argc; i++) switch (c) {
{ case 'h':
if (argv[i][0] == '-') hostname = strdup(optarg);
{ break;
switch (argv[i][1]) case 'p':
{ passwd = strdup(optarg);
case 'u': /* User */ break;
if (argv[i][2]) case 'P':
user = &(argv[i][2]); port = strdup(optarg);
else if (i + 1 < argc) break;
user = argv[++i]; case 'u':
else user = strdup(optarg);
{ break;
fprintf(stderr, "Missing username" case 'v':
"in -u option.\n"); PrintVersion(*argv);
fatal = 1; exit(EXIT_SUCCESS);
} case '?':
break; DoUsage(*argv);
case 'p': /* Password */ exit(optopt ? EXIT_FAILURE : EXIT_SUCCESS);
if (argv[i][2]) }
passwd = &(argv[i][2]);
else if (i + 1 < argc)
passwd = argv[++i];
else
{
fprintf(stderr, "Missing password "
"in -p option.\n");
fatal = 1;
}
break;
case 'h': /* hostname */
if (argv[i][2])
hostname = &(argv[i][2]);
else if (i + 1 < argc)
hostname = argv[++i];
else
{
fprintf(stderr, "Missing hostname value "
"in -h option.\n");
fatal = 1;
}
break;
case 'P': /* Port */
if (argv[i][2])
port = &(argv[i][2]);
else if (i + 1 < argc)
port = argv[++i];
else
{
fprintf(stderr, "Missing Port value "
"in -P option.\n");
fatal = 1;
}
break;
case '-':
{
char *word;
word = &argv[i][2];
if (strcmp(word, "help") == 0)
{
DoUsage();
exit(0);
}
break;
}
}
}
else
{
/* Arguments after the second argument are quoted
* to allow for quoted names on the command line
* to be passed on in quotes.
*/
if (argno++ > 1)
{
cmdlen += strlen(argv[i]) + 3;
cmd = realloc(cmd, cmdlen);
strcat(cmd, "\"");
strcat(cmd, argv[i]);
strcat(cmd, "\" ");
}
else
{
cmdlen += strlen(argv[i]) + 1;
cmd = realloc(cmd, cmdlen);
strcat(cmd, argv[i]);
strcat(cmd, " ");
}
}
} }
if (fatal)
exit(1);
if (passwd == NULL) if (passwd == NULL)
{ {
struct termios tty_attr; struct termios tty_attr;
@ -234,14 +175,28 @@ int argno = 0;
exit(1); exit(1);
} }
if (cmdlen > 1) if (optind < argc) {
{ int i, len = 0;
cmd[cmdlen - 2] = '\0'; /* Remove trailing space */ char *cmd;
if (access(cmd, R_OK) == 0)
DoSource(so, cmd); for (i = optind; i < argc; i++) {
else len += strlen(argv[i]) + 1;
sendCommand(so, cmd); }
exit(0);
cmd = malloc(len);
strcpy(cmd, argv[optind]);
for (i = optind +1; i < argc; i++) {
strcat(cmd, " ");
strcat(cmd, argv[i]);
}
if (access(cmd, R_OK) == 0)
DoSource(so, cmd);
else
sendCommand(so, cmd);
free(cmd);
exit(0);
} }
(void) setlocale(LC_CTYPE, ""); (void) setlocale(LC_CTYPE, "");
@ -445,7 +400,7 @@ char buf[20];
* Send a comamnd using the MaxScaled protocol, display the return data * Send a comamnd using the MaxScaled protocol, display the return data
* on standard output. * on standard output.
* *
* Input terminates with a lien containing jsut the text OK * Input terminates with a lien containing just the text OK
* *
* @param so The socket connect to MaxScale * @param so The socket connect to MaxScale
* @param cmd The command to send * @param cmd The command to send
@ -455,7 +410,7 @@ static int
sendCommand(int so, char *cmd) sendCommand(int so, char *cmd)
{ {
char buf[80]; char buf[80];
int i, j, newline = 0; int i, j, newline = 1;
write(so, cmd, strlen(cmd)); write(so, cmd, strlen(cmd));
while (1) while (1)
@ -533,23 +488,34 @@ FILE *fp;
return; return;
} }
/**
* Print version information
*/
static void
PrintVersion(const char *progname)
{
printf("%s Version %s\n", progname, MAXSCALE_VERSION);
}
/** /**
* Display the --help text. * Display the --help text.
*/ */
static void static void
DoUsage() DoUsage(const char *progname)
{ {
printf("maxadmin: The MaxScale administrative and monitor client.\n\n"); PrintVersion(progname);
printf("Usage: maxadmin [-u user] [-p password] [-h hostname] [-P port] [<command file> | <command>]\n\n"); printf("The MaxScale administrative and monitor client.\n\n");
printf(" -u user The user name to use for the connection, default\n"); printf("Usage: %s [-u user] [-p password] [-h hostname] [-P port] [<command file> | <command>]\n\n", progname);
printf(" -u|--user=... The user name to use for the connection, default\n");
printf(" is admin.\n"); printf(" is admin.\n");
printf(" -p password The user password, if not given the password will\n"); printf(" -p|--password=... The user password, if not given the password will\n");
printf(" be prompted for interactively\n"); printf(" be prompted for interactively\n");
printf(" -h hostname The maxscale host to connecto to. The default is\n"); printf(" -h|--hostname=... The maxscale host to connecto to. The default is\n");
printf(" localhost\n"); printf(" localhost\n");
printf(" -P port The port to use for the connection, the default\n"); printf(" -P|--port=... The port to use for the connection, the default\n");
printf(" port is 6603.\n"); printf(" port is 6603.\n");
printf(" --help Print this help text.\n"); printf(" -v|--version print version information and exit\n");
printf(" -?|--help Print this help text.\n");
printf("Any remaining arguments are treated as MaxScale commands or a file\n"); printf("Any remaining arguments are treated as MaxScale commands or a file\n");
printf("containing commands to execute.\n"); printf("containing commands to execute.\n");
} }

View File

@ -255,7 +255,7 @@ static int logmanager_write_log(
bool use_valist, bool use_valist,
bool spread_down, bool spread_down,
size_t len, size_t len,
char* str, const char* str,
va_list valist); va_list valist);
static blockbuf_t* blockbuf_init(logfile_id_t id); static blockbuf_t* blockbuf_init(logfile_id_t id);
@ -609,7 +609,7 @@ static int logmanager_write_log(
bool use_valist, bool use_valist,
bool spread_down, bool spread_down,
size_t str_len, size_t str_len,
char* str, const char* str,
va_list valist) va_list valist)
{ {
logfile_t* lf; logfile_t* lf;
@ -623,7 +623,7 @@ static int logmanager_write_log(
CHK_LOGMANAGER(lm); CHK_LOGMANAGER(lm);
if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { if (id < LOGFILE_FIRST || id > LOGFILE_LAST) {
char* errstr = "Invalid logfile id argument."; const char* errstr = "Invalid logfile id argument.";
/** /**
* invalid id, since we don't have logfile yet. * invalid id, since we don't have logfile yet.
*/ */
@ -1181,7 +1181,7 @@ static bool logfile_set_enabled(
CHK_LOGMANAGER(lm); CHK_LOGMANAGER(lm);
if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { if (id < LOGFILE_FIRST || id > LOGFILE_LAST) {
char* errstr = "Invalid logfile id argument."; const char* errstr = "Invalid logfile id argument.";
/** /**
* invalid id, since we don't have logfile yet. * invalid id, since we don't have logfile yet.
*/ */
@ -1235,7 +1235,7 @@ return_succp:
int skygw_log_write_flush( int skygw_log_write_flush(
logfile_id_t id, logfile_id_t id,
char* str, const char* str,
...) ...)
{ {
int err = 0; int err = 0;
@ -1291,7 +1291,7 @@ return_err:
int skygw_log_write( int skygw_log_write(
logfile_id_t id, logfile_id_t id,
char* str, const char* str,
...) ...)
{ {
int err = 0; int err = 0;

View File

@ -72,9 +72,9 @@ void skygw_logmanager_exit(void);
* free private write buffer list * free private write buffer list
*/ */
void skygw_log_done(void); void skygw_log_done(void);
int skygw_log_write(logfile_id_t id, char* format, ...); int skygw_log_write(logfile_id_t id, const char* format, ...);
int skygw_log_flush(logfile_id_t id); int skygw_log_flush(logfile_id_t id);
int skygw_log_write_flush(logfile_id_t id, char* format, ...); int skygw_log_write_flush(logfile_id_t id, const char* format, ...);
int skygw_log_enable(logfile_id_t id); int skygw_log_enable(logfile_id_t id);
int skygw_log_disable(logfile_id_t id); int skygw_log_disable(logfile_id_t id);

View File

@ -1575,6 +1575,7 @@ static char *service_params[] =
"use_sql_variables_in", /*< rwsplit only */ "use_sql_variables_in", /*< rwsplit only */
"version_string", "version_string",
"filters", "filters",
"weightby",
NULL NULL
}; };

View File

@ -404,17 +404,22 @@ DCB_CALLBACK *cb;
* the memdata.bitmask then the DCB is no longer able to be * the memdata.bitmask then the DCB is no longer able to be
* referenced and it can be finally removed. * referenced and it can be finally removed.
* *
* The excluded DCB allows a thread to exclude a DCB from zombie processing.
* It is used when a thread calls dcb_process_zombies when there is
* a DCB that the caller knows it will continue processing with.
*
* @param threadid The thread ID of the caller * @param threadid The thread ID of the caller
* @param excluded The DCB the thread currently uses, NULL or valid DCB.
*/ */
DCB * DCB *
dcb_process_zombies(int threadid) dcb_process_zombies(int threadid, DCB *excluded)
{ {
DCB *ptr, *lptr; DCB *ptr, *lptr;
DCB* dcb_list = NULL; DCB* dcb_list = NULL;
DCB* dcb = NULL; DCB* dcb = NULL;
bool succp = false; bool succp = false;
/*< /**
* Perform a dirty read to see if there is anything in the queue. * Perform a dirty read to see if there is anything in the queue.
* This avoids threads hitting the queue spinlock when the queue * This avoids threads hitting the queue spinlock when the queue
* is empty. This will really help when the only entry is being * is empty. This will really help when the only entry is being
@ -424,69 +429,100 @@ bool succp = false;
if (!zombies) if (!zombies)
return NULL; return NULL;
/*
* Process the zombie queue and create a list of DCB's that can be
* finally freed. This processing is down under a spinlock that
* will prevent new entries being added to the zombie queue. Therefore
* we do not want to do any expensive operations under this spinlock
* as it will block other threads. The expensive operations will be
* performed on the victim queue within holding the zombie queue
* spinlock.
*/
spinlock_acquire(&zombiespin); spinlock_acquire(&zombiespin);
ptr = zombies; ptr = zombies;
lptr = NULL; lptr = NULL;
while (ptr) while (ptr)
{ {
bitmask_clear(&ptr->memdata.bitmask, threadid); CHK_DCB(ptr);
if (bitmask_isallclear(&ptr->memdata.bitmask))
{ /*
/*< * Skip processing of the excluded DCB
* Remove the DCB from the zombie queue */
* and call the final free routine for the if (ptr == excluded)
* DCB
*
* ptr is the DCB we are processing
* lptr is the previous DCB on the zombie queue
* or NULL if the DCB is at the head of the queue
* tptr is the DCB after the one we are processing
* on the zombie queue
*/
DCB *tptr = ptr->memdata.next;
if (lptr == NULL)
zombies = tptr;
else
lptr->memdata.next = tptr;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_process_zombies] Remove dcb %p fd %d "
"in state %s from zombies list.",
pthread_self(),
ptr,
ptr->fd,
STRDCBSTATE(ptr->state))));
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
"dcb not in DCB_STATE_ZOMBIE state.");
/*<
* Move dcb to linked list of victim dcbs.
*/
if (dcb_list == NULL) {
dcb_list = ptr;
dcb = dcb_list;
} else {
dcb->memdata.next = ptr;
dcb = dcb->memdata.next;
}
dcb->memdata.next = NULL;
ptr = tptr;
}
else
{ {
lptr = ptr; lptr = ptr;
ptr = ptr->memdata.next; ptr = ptr->memdata.next;
} }
else
{
bitmask_clear(&ptr->memdata.bitmask, threadid);
if (bitmask_isallclear(&ptr->memdata.bitmask))
{
/**
* Remove the DCB from the zombie queue
* and call the final free routine for the
* DCB
*
* ptr is the DCB we are processing
* lptr is the previous DCB on the zombie queue
* or NULL if the DCB is at the head of the
* queue tptr is the DCB after the one we are
* processing on the zombie queue
*/
DCB *tptr = ptr->memdata.next;
if (lptr == NULL)
zombies = tptr;
else
lptr->memdata.next = tptr;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_process_zombies] Remove dcb "
"%p fd %d " "in state %s from the "
"list of zombies.",
pthread_self(),
ptr,
ptr->fd,
STRDCBSTATE(ptr->state))));
ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
"dcb not in DCB_STATE_ZOMBIE state.");
/*<
* Move dcb to linked list of victim dcbs.
*/
if (dcb_list == NULL) {
dcb_list = ptr;
dcb = dcb_list;
} else {
dcb->memdata.next = ptr;
dcb = dcb->memdata.next;
}
dcb->memdata.next = NULL;
ptr = tptr;
}
else
{
lptr = ptr;
ptr = ptr->memdata.next;
}
}
} }
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
/*
* Process the victim queue. These are DCBs that are not in
* use by any thread.
* The corresponding file descriptor is closed, the DCB marked
* as disconnected and the DCB itself is fianlly freed.
*/
dcb = dcb_list; dcb = dcb_list;
/*< Close, and set DISCONNECTED victims */
while (dcb != NULL) { while (dcb != NULL) {
DCB* dcb_next = NULL; DCB* dcb_next = NULL;
int rc = 0; int rc = 0;
/*< /*<
* Close file descriptor and move to clean-up phase. * Close file descriptor and move to clean-up phase.
*/ */
ss_dassert(excluded != dcb);
rc = close(dcb->fd); rc = close(dcb->fd);
if (rc < 0) { if (rc < 0) {
@ -1108,8 +1144,8 @@ int above_water;
/** /**
* Removes dcb from poll set, and adds it to zombies list. As a consequense, * Removes dcb from poll set, and adds it to zombies list. As a consequense,
* dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state. * dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state.
* At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock * At the end of the function state may not be DCB_STATE_ZOMBIE because once
* is released parallel threads may change the state. * dcb_initlock is released parallel threads may change the state.
* *
* Parameters: * Parameters:
* @param dcb The DCB to close * @param dcb The DCB to close
@ -1928,13 +1964,15 @@ int rval = 0;
* and instead implements a queuing mechanism in which nested events are * and instead implements a queuing mechanism in which nested events are
* queued on the DCB such that when the thread processing the first event * queued on the DCB such that when the thread processing the first event
* returns it will read the queued event and process it. This allows the * returns it will read the queued event and process it. This allows the
* thread that woudl otherwise have to wait to process the nested event * thread that would otherwise have to wait to process the nested event
* to return immediately and and process other events. * to return immediately and and process other events.
* *
* @param dcb The DCB that has data available * @param dcb The DCB that has data available
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/ */
void void
dcb_pollin(DCB *dcb, int thread_id) dcb_pollin(DCB *dcb, int thread_id, int nozombies)
{ {
spinlock_acquire(&dcb->pollinlock); spinlock_acquire(&dcb->pollinlock);
@ -1945,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id)
if (dcb->readcheck) if (dcb->readcheck)
{ {
dcb->stats.n_readrechecks++; dcb->stats.n_readrechecks++;
dcb_process_zombies(thread_id); if (!nozombies)
dcb_process_zombies(thread_id, dcb);
} }
dcb->readcheck = 0; dcb->readcheck = 0;
spinlock_release(&dcb->pollinlock); spinlock_release(&dcb->pollinlock);
@ -1976,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id)
* to return immediately and and process other events. * to return immediately and and process other events.
* *
* @param dcb The DCB thats available for writes * @param dcb The DCB thats available for writes
* @param thread_id The ID of the calling thread
* @param nozombies If non-zero then do not do zombie processing
*/ */
void void
dcb_pollout(DCB *dcb, int thread_id) dcb_pollout(DCB *dcb, int thread_id, int nozombies)
{ {
spinlock_acquire(&dcb->polloutlock); spinlock_acquire(&dcb->polloutlock);
@ -1988,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id)
do { do {
if (dcb->writecheck) if (dcb->writecheck)
{ {
dcb_process_zombies(thread_id); if (!nozombies)
dcb_process_zombies(thread_id, dcb);
dcb->stats.n_writerechecks++; dcb->stats.n_writerechecks++;
} }
dcb->writecheck = 0; dcb->writecheck = 0;

View File

@ -44,6 +44,7 @@
#include <string.h> #include <string.h>
#include <gw.h> #include <gw.h>
#include <unistd.h> #include <unistd.h>
#include <getopt.h>
#include <service.h> #include <service.h>
#include <server.h> #include <server.h>
#include <dcb.h> #include <dcb.h>
@ -131,6 +132,17 @@ static bool libmysqld_started = FALSE;
*/ */
static bool daemon_mode = true; static bool daemon_mode = true;
const char *progname = NULL;
static struct option long_options[] = {
{"homedir", required_argument, 0, 'c'},
{"config", required_argument, 0, 'f'},
{"nodeamon", required_argument, 0, 'd'},
{"log", required_argument, 0, 'l'},
{"version", no_argument, 0, 'v'},
{"help", no_argument, 0, '?'},
{0, 0, 0, 0}
};
static void log_flush_shutdown(void); static void log_flush_shutdown(void);
static void log_flush_cb(void* arg); static void log_flush_cb(void* arg);
static int write_pid_file(char *); /* write MaxScale pidfile */ static int write_pid_file(char *); /* write MaxScale pidfile */
@ -878,15 +890,19 @@ return_cnf_file_buf:
return cnf_file_buf; return cnf_file_buf;
} }
static void usage(void) static void usage(void)
{ {
fprintf(stderr, fprintf(stderr,
"*\n* Usage : maxscale [-h] | [-d] [-c <home " "\nUsage : %s [-h] | [-d] [-c <home directory>] [-f <config file name>]\n\n"
"directory>] [-f <config file name>]\n* where:\n* " " -d|--nodaemon enable running in terminal process (default:disabled)\n"
"-h help\n* -d enable running in terminal process (default:disabled)\n* " " -c|--homedir=... relative|absolute MaxScale home directory\n"
"-c relative|absolute MaxScale home directory\n* " " -f|--config=... relative|absolute pathname of MaxScale configuration file\n"
"-f relative|absolute pathname of MaxScale configuration file (default:MAXSCALE_HOME/etc/MaxScale.cnf)\n*\n"); " (default: $MAXSCALE_HOME/etc/MaxScale.cnf)\n"
" -l|--log=... log to file or shared memory\n"
" -lfile or -lshm - defaults to shared memory\n"
" -v|--version print version info and exit\n"
" -?|--help show this help\n"
, progname);
} }
/** /**
@ -943,6 +959,8 @@ int main(int argc, char **argv)
char* cnf_file_path = NULL; /*< conf file, to be freed */ char* cnf_file_path = NULL; /*< conf file, to be freed */
char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */ char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */
void* log_flush_thr = NULL; void* log_flush_thr = NULL;
int option_index;
int logtofile = 0; /* Use shared memory or file */
ssize_t log_flush_timeout_ms = 0; ssize_t log_flush_timeout_ms = 0;
sigset_t sigset; sigset_t sigset;
sigset_t sigpipe_mask; sigset_t sigpipe_mask;
@ -954,6 +972,8 @@ int main(int argc, char **argv)
sigemptyset(&sigpipe_mask); sigemptyset(&sigpipe_mask);
sigaddset(&sigpipe_mask, SIGPIPE); sigaddset(&sigpipe_mask, SIGPIPE);
progname = *argv;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
memset(conn_open, 0, sizeof(bool)*10240); memset(conn_open, 0, sizeof(bool)*10240);
memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*10240); memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*10240);
@ -980,7 +1000,8 @@ int main(int argc, char **argv)
goto return_main; goto return_main;
} }
} }
while ((opt = getopt(argc, argv, "dc:f:h")) != -1) while ((opt = getopt_long(argc, argv, "dc:f:l:v?",
long_options, &option_index)) != -1)
{ {
bool succp = true; bool succp = true;
@ -1062,8 +1083,35 @@ int main(int argc, char **argv)
} }
break; break;
case 'v':
rc = EXIT_SUCCESS;
goto return_main;
case 'l':
if (strncasecmp(optarg, "file") == 0)
logtofile = 1;
else if (strncasecmp(optarg, "shm") == 0)
logtofile = 0;
else
{
char* logerr = "Configuration file argument "
"identifier \'-l\' was specified but "
"the argument didn't specify\n a valid "
"configuration file or the argument "
"was missing.";
print_log_n_stderr(true, true, logerr, logerr, 0);
usage();
succp = false;
}
break;
case '?':
usage();
rc = EXIT_SUCCESS;
goto return_main;
default: default:
usage(); usage();
succp = false; succp = false;
break; break;
} }
@ -1345,12 +1393,23 @@ int main(int argc, char **argv)
argv[0] = "MaxScale"; argv[0] = "MaxScale";
argv[1] = "-j"; argv[1] = "-j";
argv[2] = buf; argv[2] = buf;
argv[3] = "-s"; /*< store to shared memory */ if (logtofile)
argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< ..these logs to shm */ {
argv[5] = "-l"; /*< write to syslog */ argv[3] = "-l"; /*< write to syslog */
argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< ..these logs to syslog */ argv[4] = "LOGFILE_MESSAGE,LOGFILE_ERROR"
argv[7] = NULL; "LOGFILE_DEBUG,LOGFILE_TRACE";
skygw_logmanager_init(7, argv); argv[5] = NULL;
skygw_logmanager_init(5, argv);
}
else
{
argv[3] = "-s"; /*< store to shared memory */
argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< ..these logs to shm */
argv[5] = "-l"; /*< write to syslog */
argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< ..these logs to syslog */
argv[7] = NULL;
skygw_logmanager_init(7, argv);
}
} }
/*< /*<
@ -1629,8 +1688,13 @@ static void log_flush_cb(
static void unlink_pidfile(void) static void unlink_pidfile(void)
{ {
if (strlen(pidfile)) { if (strlen(pidfile)) {
if (unlink(pidfile)) { if (unlink(pidfile))
fprintf(stderr, "MaxScale failed to remove pidfile %s: error %d, %s\n", pidfile, errno, strerror(errno)); {
fprintf(stderr,
"MaxScale failed to remove pidfile %s: error %d, %s\n",
pidfile,
errno,
strerror(errno));
} }
} }
} }

View File

@ -17,6 +17,7 @@
*/ */
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <hint.h> #include <hint.h>
/** /**

View File

@ -16,6 +16,7 @@
* Copyright SkySQL Ab 2014 * Copyright SkySQL Ab 2014
*/ */
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include <housekeeper.h> #include <housekeeper.h>
#include <thread.h> #include <thread.h>
#include <spinlock.h> #include <spinlock.h>

View File

@ -125,7 +125,7 @@ unsigned char *ptr;
/** /**
* Replace the contents of a GWBUF with the new SQL statement passed as a text string. * Replace the contents of a GWBUF with the new SQL statement passed as a text string.
* The routine takes care of the modification needed to the MySQL packet, * The routine takes care of the modification needed to the MySQL packet,
* returning a GWBUF chian that cna be used to send the data to a MySQL server * returning a GWBUF chain that can be used to send the data to a MySQL server
* *
* @param orig The original request in a GWBUF * @param orig The original request in a GWBUF
* @param sql The SQL text to replace in the packet * @param sql The SQL text to replace in the packet

View File

@ -60,8 +60,9 @@ MONITOR *mon;
{ {
return NULL; return NULL;
} }
mon->state = MONITOR_STATE_ALLOC;
mon->name = strdup(name); mon->name = strdup(name);
if ((mon->module = load_module(module, MODULE_MONITOR)) == NULL) if ((mon->module = load_module(module, MODULE_MONITOR)) == NULL)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -73,7 +74,8 @@ MONITOR *mon;
return NULL; return NULL;
} }
mon->handle = (*mon->module->startMonitor)(NULL); mon->handle = (*mon->module->startMonitor)(NULL);
mon->state |= MONITOR_STATE_RUNNING; mon->state = MONITOR_STATE_RUNNING;
spinlock_acquire(&monLock); spinlock_acquire(&monLock);
mon->next = allMonitors; mon->next = allMonitors;
allMonitors = mon; allMonitors = mon;
@ -94,7 +96,7 @@ monitor_free(MONITOR *mon)
MONITOR *ptr; MONITOR *ptr;
mon->module->stopMonitor(mon->handle); mon->module->stopMonitor(mon->handle);
mon->state &= ~MONITOR_STATE_RUNNING; mon->state = MONITOR_STATE_FREED;
spinlock_acquire(&monLock); spinlock_acquire(&monLock);
if (allMonitors == mon) if (allMonitors == mon)
allMonitors = mon->next; allMonitors = mon->next;
@ -121,7 +123,7 @@ void
monitorStart(MONITOR *monitor) monitorStart(MONITOR *monitor)
{ {
monitor->handle = (*monitor->module->startMonitor)(monitor->handle); monitor->handle = (*monitor->module->startMonitor)(monitor->handle);
monitor->state |= MONITOR_STATE_RUNNING; monitor->state = MONITOR_STATE_RUNNING;
} }
/** /**
@ -132,8 +134,9 @@ monitorStart(MONITOR *monitor)
void void
monitorStop(MONITOR *monitor) monitorStop(MONITOR *monitor)
{ {
monitor->state = MONITOR_STATE_STOPPING;
monitor->module->stopMonitor(monitor->handle); monitor->module->stopMonitor(monitor->handle);
monitor->state &= ~MONITOR_STATE_RUNNING; monitor->state = MONITOR_STATE_STOPPED;
} }
/** /**

View File

@ -45,14 +45,25 @@ extern int lm_enabled_logfiles_bitmask;
* zombie management * zombie management
* 29/08/14 Mark Riddoch Addition of thread status data, load average * 29/08/14 Mark Riddoch Addition of thread status data, load average
* etc. * etc.
* 23/09/14 Mark Riddoch Make use of RDHUP conditional to allow CentOS 5
* builds.
* *
* @endverbatim * @endverbatim
*/ */
/**
* Control the use of mutexes for the epoll_wait call. Setting to 1 will
* cause the epoll_wait calls to be moved under a mutex. This may be useful
* for debuggign purposes but should be avoided in general use.
*/
#define MUTEX_EPOLL 0
static int epoll_fd = -1; /*< The epoll file descriptor */ static int epoll_fd = -1; /*< The epoll file descriptor */
static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */
static GWBITMASK poll_mask; static GWBITMASK poll_mask;
#if MUTEX_EPOLL
static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */
#endif
static int n_waiting = 0; /*< No. of threads in epoll_wait */ static int n_waiting = 0; /*< No. of threads in epoll_wait */
/** /**
@ -154,7 +165,9 @@ int i;
thread_data[i].state = THREAD_STOPPED; thread_data[i].state = THREAD_STOPPED;
} }
} }
#if MUTEX_EPOLL
simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex"); simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
#endif
hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ); hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ);
n_avg_samples = 15 * 60 / POLL_LOAD_FREQ; n_avg_samples = 15 * 60 / POLL_LOAD_FREQ;
@ -181,7 +194,11 @@ poll_add_dcb(DCB *dcb)
CHK_DCB(dcb); CHK_DCB(dcb);
#ifdef EPOLLRDHUP
ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET;
#else
ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
#endif
ev.data.ptr = dcb; ev.data.ptr = dcb;
/*< /*<
@ -359,7 +376,7 @@ DCB *zombies = NULL;
thread_id))); thread_id)));
no_op = TRUE; no_op = TRUE;
} }
#if 0 #if MUTEX_EPOLL
simple_mutex_lock(&epoll_wait_mutex, TRUE); simple_mutex_lock(&epoll_wait_mutex, TRUE);
#endif #endif
if (thread_data) if (thread_data)
@ -385,7 +402,7 @@ DCB *zombies = NULL;
{ {
atomic_add(&n_waiting, -1); atomic_add(&n_waiting, -1);
if (process_zombies_only) { if (process_zombies_only) {
#if 0 #if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex); simple_mutex_unlock(&epoll_wait_mutex);
#endif #endif
goto process_zombies; goto process_zombies;
@ -413,7 +430,7 @@ DCB *zombies = NULL;
if (n_waiting == 0) if (n_waiting == 0)
atomic_add(&pollStats.n_nothreads, 1); atomic_add(&pollStats.n_nothreads, 1);
#if 0 #if MUTEX_EPOLL
simple_mutex_unlock(&epoll_wait_mutex); simple_mutex_unlock(&epoll_wait_mutex);
#endif #endif
#endif /* BLOCKINGPOLL */ #endif /* BLOCKINGPOLL */
@ -442,7 +459,7 @@ DCB *zombies = NULL;
for (i = 0; i < nfds; i++) for (i = 0; i < nfds; i++)
{ {
DCB *dcb = (DCB *)events[i].data.ptr; DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events; __uint32_t ev = events[i].events;
CHK_DCB(dcb); CHK_DCB(dcb);
@ -504,7 +521,7 @@ DCB *zombies = NULL;
#else #else
atomic_add(&pollStats.n_write, atomic_add(&pollStats.n_write,
1); 1);
dcb_pollout(dcb, thread_id); dcb_pollout(dcb, thread_id, nfds);
#endif #endif
} else { } else {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
@ -554,7 +571,7 @@ DCB *zombies = NULL;
#if MUTEX_BLOCK #if MUTEX_BLOCK
dcb->func.read(dcb); dcb->func.read(dcb);
#else #else
dcb_pollin(dcb, thread_id); dcb_pollin(dcb, thread_id, nfds);
#endif #endif
} }
#if MUTEX_BLOCK #if MUTEX_BLOCK
@ -609,9 +626,18 @@ DCB *zombies = NULL;
eno, eno,
strerror(eno)))); strerror(eno))));
atomic_add(&pollStats.n_hup, 1); atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb); spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
} }
#ifdef EPOLLRDHUP
if (ev & EPOLLRDHUP) if (ev & EPOLLRDHUP)
{ {
int eno = 0; int eno = 0;
@ -628,17 +654,26 @@ DCB *zombies = NULL;
eno, eno,
strerror(eno)))); strerror(eno))));
atomic_add(&pollStats.n_hup, 1); atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb); spinlock_acquire(&dcb->dcb_initlock);
if ((dcb->flags & DCBF_HUNG) == 0)
{
dcb->flags |= DCBF_HUNG;
spinlock_release(&dcb->dcb_initlock);
dcb->func.hangup(dcb);
}
else
spinlock_release(&dcb->dcb_initlock);
} }
#endif
} /*< for */ } /*< for */
no_op = FALSE; no_op = FALSE;
} } /*< if (nfds > 0) */
process_zombies: process_zombies:
if (thread_data) if (thread_data)
{ {
thread_data[thread_id].state = THREAD_ZPROCESSING; thread_data[thread_id].state = THREAD_ZPROCESSING;
} }
zombies = dcb_process_zombies(thread_id); zombies = dcb_process_zombies(thread_id, NULL);
if (zombies == NULL) { if (zombies == NULL) {
process_zombies_only = false; process_zombies_only = false;
@ -655,6 +690,8 @@ DCB *zombies = NULL;
thread_data[thread_id].state = THREAD_STOPPED; thread_data[thread_id].state = THREAD_STOPPED;
} }
bitmask_clear(&poll_mask, thread_id); bitmask_clear(&poll_mask, thread_id);
/** Release mysql thread context */
mysql_thread_end();
return; return;
} }
if (thread_data) if (thread_data)
@ -662,8 +699,6 @@ DCB *zombies = NULL;
thread_data[thread_id].state = THREAD_IDLE; thread_data[thread_id].state = THREAD_IDLE;
} }
} /*< while(1) */ } /*< while(1) */
/** Release mysql thread context */
mysql_thread_end();
} }
/** /**
@ -758,12 +793,14 @@ char *str;
strcat(str, "|"); strcat(str, "|");
strcat(str, "HUP"); strcat(str, "HUP");
} }
#ifdef EPOLLRDHUP
if (event & EPOLLRDHUP) if (event & EPOLLRDHUP)
{ {
if (*str) if (*str)
strcat(str, "|"); strcat(str, "|");
strcat(str, "RDHUP"); strcat(str, "RDHUP");
} }
#endif
return str; return str;
} }

View File

@ -271,8 +271,8 @@ int fail_accept_errno;
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
void dcb_pollin(DCB *, int); void dcb_pollin(DCB *, int, int);
void dcb_pollout(DCB *, int); void dcb_pollout(DCB *, int, int);
DCB *dcb_get_zombies(void); DCB *dcb_get_zombies(void);
int gw_write( int gw_write(
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **); int dcb_read(DCB *, GWBUF **);
int dcb_drain_writeq(DCB *); int dcb_drain_writeq(DCB *);
void dcb_close(DCB *); void dcb_close(DCB *);
DCB *dcb_process_zombies(int); /* Process Zombies */ DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */
void printAllDCBs(); /* Debug to print all DCB in the system */ void printAllDCBs(); /* Debug to print all DCB in the system */
void printDCB(DCB *); /* Debug print routine */ void printDCB(DCB *); /* Debug print routine */
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
@ -317,6 +317,9 @@ void dcb_call_foreach (DCB_REASON reason);
void dcb_call_foreach ( void dcb_call_foreach (
DCB_REASON reason); DCB_REASON reason);
/* DCB flags values */ /**
#define DCBF_CLONE 0x0001 /* DCB is a clone */ * DCB flags values
*/
#define DCBF_CLONE 0x0001 /*< DCB is a clone */
#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */
#endif /* _DCB_H */ #endif /* _DCB_H */

View File

@ -69,7 +69,7 @@ typedef struct {
void (*unregisterServer)(void *, SERVER *); void (*unregisterServer)(void *, SERVER *);
void (*defaultUser)(void *, char *, char *); void (*defaultUser)(void *, char *, char *);
void (*diagnostics)(DCB *, void *); void (*diagnostics)(DCB *, void *);
void (*setInterval)(void *, unsigned long); void (*setInterval)(void *, size_t);
void (*defaultId)(void *, unsigned long); void (*defaultId)(void *, unsigned long);
void (*replicationHeartbeat)(void *, int); void (*replicationHeartbeat)(void *, int);
void (*detectStaleMaster)(void *, int); void (*detectStaleMaster)(void *, int);
@ -81,21 +81,30 @@ typedef struct {
*/ */
#define MONITOR_VERSION {1, 0, 0} #define MONITOR_VERSION {1, 0, 0}
/** Monitor's poll frequency */
#define MON_BASE_INTERVAL_MS 100
/** /**
* Monitor state bit mask values * Monitor state bit mask values
*/ */
#define MONITOR_STATE_RUNNING 0x0001 typedef enum
{
MONITOR_STATE_ALLOC = 0x00,
MONITOR_STATE_RUNNING = 0x01,
MONITOR_STATE_STOPPING = 0x02,
MONITOR_STATE_STOPPED = 0x04,
MONITOR_STATE_FREED = 0x08
} monitor_state_t;
/** /**
* Representation of the running monitor. * Representation of the running monitor.
*/ */
typedef struct monitor { typedef struct monitor {
char *name; /**< The name of the monitor module */ char *name; /**< The name of the monitor module */
unsigned int state; /**< The monitor status */ monitor_state_t state; /**< The state of the monitor */
MONITOR_OBJECT *module; /**< The "monitor object" */ MONITOR_OBJECT *module; /**< The "monitor object" */
void *handle; /**< Handle returned from startMonitor */ void *handle; /**< Handle returned from startMonitor */
int interval; /**< The monitor interval */ size_t interval; /**< The monitor interval */
struct monitor *next; /**< Next monitor in the linked list */ struct monitor *next; /**< Next monitor in the linked list */
} MONITOR; } MONITOR;

View File

@ -67,9 +67,20 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *); static void unregisterServer(void *, SERVER *);
static void defaultUsers(void *, char *, char *); static void defaultUsers(void *, char *, char *);
static void diagnostics(DCB *, void *); static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long); static void setInterval(void *, size_t);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL, NULL }; static MONITOR_OBJECT MyObject = {
startMonitor,
stopMonitor,
registerServer,
unregisterServer,
defaultUsers,
diagnostics,
setInterval,
NULL,
NULL,
NULL,
};
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
@ -413,6 +424,7 @@ monitorMain(void *arg)
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr; MONITOR_SERVERS *ptr;
long master_id; long master_id;
size_t nrounds = 0;
if (mysql_thread_init()) if (mysql_thread_init())
{ {
@ -423,10 +435,9 @@ long master_id;
return; return;
} }
handle->status = MONITOR_RUNNING; handle->status = MONITOR_RUNNING;
while (1) while (1)
{ {
master_id = -1;
if (handle->shutdown) if (handle->shutdown)
{ {
handle->status = MONITOR_STOPPING; handle->status = MONITOR_STOPPING;
@ -434,7 +445,16 @@ long master_id;
handle->status = MONITOR_STOPPED; handle->status = MONITOR_STOPPED;
return; return;
} }
/** Wait base interval */
thread_millisleep(MON_BASE_INTERVAL_MS);
nrounds += 1;
/** If monitor interval time isn't consumed skip checks */
if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0)
{
continue;
}
master_id = -1;
ptr = handle->databases; ptr = handle->databases;
while (ptr) while (ptr)
@ -491,7 +511,6 @@ long master_id;
ptr = ptr->next; ptr = ptr->next;
} }
thread_millisleep(handle->interval);
} }
} }
@ -502,7 +521,7 @@ long master_id;
* @param interval The interval to set in monitor struct, in milliseconds * @param interval The interval to set in monitor struct, in milliseconds
*/ */
static void static void
setInterval(void *arg, unsigned long interval) setInterval(void *arg, size_t interval)
{ {
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long)); memcpy(&handle->interval, &interval, sizeof(unsigned long));

View File

@ -80,7 +80,7 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *); static void unregisterServer(void *, SERVER *);
static void defaultUser(void *, char *, char *); static void defaultUser(void *, char *, char *);
static void diagnostics(DCB *, void *); static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long); static void setInterval(void *, size_t);
static void defaultId(void *, unsigned long); static void defaultId(void *, unsigned long);
static void replicationHeartbeat(void *, int); static void replicationHeartbeat(void *, int);
static void detectStaleMaster(void *, int); static void detectStaleMaster(void *, int);
@ -95,7 +95,18 @@ static int add_slave_to_master(long *, int, long);
static void monitor_set_pending_status(MONITOR_SERVERS *, int); static void monitor_set_pending_status(MONITOR_SERVERS *, int);
static void monitor_clear_pending_status(MONITOR_SERVERS *, int); static void monitor_clear_pending_status(MONITOR_SERVERS *, int);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat, detectStaleMaster }; static MONITOR_OBJECT MyObject = {
startMonitor,
stopMonitor,
registerServer,
unregisterServer,
defaultUser,
diagnostics,
setInterval,
defaultId,
replicationHeartbeat,
detectStaleMaster
};
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
@ -577,6 +588,7 @@ int replication_heartbeat = handle->replicationHeartbeat;
int detect_stale_master = handle->detectStaleMaster; int detect_stale_master = handle->detectStaleMaster;
int num_servers=0; int num_servers=0;
MONITOR_SERVERS *root_master; MONITOR_SERVERS *root_master;
size_t nrounds = 0;
if (mysql_thread_init()) if (mysql_thread_init())
{ {
@ -586,8 +598,8 @@ MONITOR_SERVERS *root_master;
"module. Exiting.\n"))); "module. Exiting.\n")));
return; return;
} }
handle->status = MONITOR_RUNNING; handle->status = MONITOR_RUNNING;
while (1) while (1)
{ {
if (handle->shutdown) if (handle->shutdown)
@ -597,6 +609,15 @@ MONITOR_SERVERS *root_master;
handle->status = MONITOR_STOPPED; handle->status = MONITOR_STOPPED;
return; return;
} }
/** Wait base interval */
thread_millisleep(MON_BASE_INTERVAL_MS);
nrounds += 1;
/** If monitor interval time isn't consumed skip checks */
if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0)
{
continue;
}
/* reset num_servers */ /* reset num_servers */
num_servers = 0; num_servers = 0;
@ -686,10 +707,7 @@ MONITOR_SERVERS *root_master;
ptr = ptr->next; ptr = ptr->next;
} }
} }
} /*< while (1) */
/* wait for the configured interval */
thread_millisleep(handle->interval);
}
} }
/** /**
@ -712,7 +730,7 @@ MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
* @param interval The interval to set in monitor struct, in milliseconds * @param interval The interval to set in monitor struct, in milliseconds
*/ */
static void static void
setInterval(void *arg, unsigned long interval) setInterval(void *arg, size_t interval)
{ {
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long)); memcpy(&handle->interval, &interval, sizeof(unsigned long));

View File

@ -61,9 +61,20 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *); static void unregisterServer(void *, SERVER *);
static void defaultUsers(void *, char *, char *); static void defaultUsers(void *, char *, char *);
static void diagnostics(DCB *, void *); static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long); static void setInterval(void *, size_t);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL, NULL }; static MONITOR_OBJECT MyObject = {
startMonitor,
stopMonitor,
registerServer,
unregisterServer,
defaultUsers,
diagnostics,
setInterval,
NULL,
NULL,
NULL
};
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
@ -410,6 +421,7 @@ monitorMain(void *arg)
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr; MONITOR_SERVERS *ptr;
long master_id; long master_id;
size_t nrounds = 0;
if (mysql_thread_init()) if (mysql_thread_init())
{ {
@ -420,10 +432,9 @@ long master_id;
return; return;
} }
handle->status = MONITOR_RUNNING; handle->status = MONITOR_RUNNING;
while (1) while (1)
{ {
master_id = -1;
if (handle->shutdown) if (handle->shutdown)
{ {
handle->status = MONITOR_STOPPING; handle->status = MONITOR_STOPPING;
@ -432,6 +443,16 @@ long master_id;
return; return;
} }
/** Wait base interval */
thread_millisleep(MON_BASE_INTERVAL_MS);
nrounds += 1;
/** If monitor interval time isn't consumed skip checks */
if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0)
{
continue;
}
master_id = -1;
ptr = handle->databases; ptr = handle->databases;
while (ptr) while (ptr)
@ -452,8 +473,6 @@ long master_id;
ptr = ptr->next; ptr = ptr->next;
} }
thread_millisleep(handle->interval);
} }
} }
@ -464,7 +483,7 @@ long master_id;
* @param interval The interval to set in monitor struct, in milliseconds * @param interval The interval to set in monitor struct, in milliseconds
*/ */
static void static void
setInterval(void *arg, unsigned long interval) setInterval(void *arg, size_t interval)
{ {
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long)); memcpy(&handle->interval, &interval, sizeof(unsigned long));

View File

@ -1674,7 +1674,6 @@ static int routeQuery(
{ {
rses_is_closed = true; rses_is_closed = true;
} }
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
packet = GWBUF_DATA(querybuf); packet = GWBUF_DATA(querybuf);
@ -1702,7 +1701,6 @@ static int routeQuery(
(rses_is_closed ? "Router was closed" : (rses_is_closed ? "Router was closed" :
"Router has no backend servers where to " "Router has no backend servers where to "
"route to")))); "route to"))));
free(querybuf);
} }
goto retblock; goto retblock;
} }
@ -2196,6 +2194,16 @@ static void clientReply (
} }
bref = get_bref_from_dcb(router_cli_ses, backend_dcb); bref = get_bref_from_dcb(router_cli_ses, backend_dcb);
#if !defined(FOR_BUG548_FIX_ONLY)
/** This makes the issue becoming visible in poll.c */
if (bref == NULL)
{
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
goto lock_failed;
}
#endif
CHK_BACKEND_REF(bref); CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur; scur = &bref->bref_sescmd_cur;
/** /**