diff --git a/client/Makefile b/client/Makefile index 19f38785f..22220db2d 100644 --- a/client/Makefile +++ b/client/Makefile @@ -33,7 +33,7 @@ endif CC=cc -CFLAGS=-c -Wall -g $(HISTFLAG) +CFLAGS=-c -Wall -g $(HISTFLAG) -I ../server/include SRCS= maxadmin.c diff --git a/client/maxadmin.c b/client/maxadmin.c index 683916e9f..dfb4dcfb5 100644 --- a/client/maxadmin.c +++ b/client/maxadmin.c @@ -47,6 +47,9 @@ #include #include #include +#include + +#include #ifdef HISTORY #include @@ -59,6 +62,7 @@ static int sendCommand(int so, char *cmd); static void DoSource(int so, char *cmd); static void DoUsage(); static int isquit(char *buf); +static void PrintVersion(const char *progname); #ifdef HISTORY static char * @@ -70,6 +74,16 @@ prompt(EditLine *el __attribute__((__unused__))) } #endif +static struct option long_options[] = { + {"host", required_argument, 0, 'h'}, + {"user", required_argument, 0, 'u'}, + {"password", required_argument, 0, 'p'}, + {"port", required_argument, 0, 'P'}, + {"version", no_argument, 0, 'v'}, + {"help", no_argument, 0, '?'}, + {0, 0, 0, 0} +}; + /** * The main for the maxadmin client * @@ -79,7 +93,7 @@ prompt(EditLine *el __attribute__((__unused__))) int main(int argc, char **argv) { -int i, num, rv, fatal = 0; +int i, num, rv; #ifdef HISTORY char *buf; EditLine *el = NULL; @@ -94,109 +108,36 @@ char *hostname = "localhost"; char *port = "6603"; char *user = "admin"; char *passwd = NULL; -int so, cmdlen; -char *cmd; -int argno = 0; +int so; +int option_index = 0; +char c; - cmd = malloc(1); - *cmd = 0; - cmdlen = 1; - - for (i = 1; i < argc; i++) - { - if (argv[i][0] == '-') - { - switch (argv[i][1]) - { - case 'u': /* User */ - if (argv[i][2]) - user = &(argv[i][2]); - else if (i + 1 < argc) - user = argv[++i]; - else - { - fprintf(stderr, "Missing username" - "in -u option.\n"); - fatal = 1; - } - break; - case 'p': /* Password */ - if (argv[i][2]) - passwd = &(argv[i][2]); - else if (i + 1 < argc) - passwd = argv[++i]; - else - { - fprintf(stderr, "Missing password " - "in -p option.\n"); - fatal = 1; - } - break; - case 'h': /* hostname */ - if (argv[i][2]) - hostname = &(argv[i][2]); - else if (i + 1 < argc) - hostname = argv[++i]; - else - { - fprintf(stderr, "Missing hostname value " - "in -h option.\n"); - fatal = 1; - } - break; - case 'P': /* Port */ - if (argv[i][2]) - port = &(argv[i][2]); - else if (i + 1 < argc) - port = argv[++i]; - else - { - fprintf(stderr, "Missing Port value " - "in -P option.\n"); - fatal = 1; - } - break; - case '-': - { - char *word; - - word = &argv[i][2]; - if (strcmp(word, "help") == 0) - { - DoUsage(); - exit(0); - } - break; - } - } - } - else - { - /* Arguments after the second argument are quoted - * to allow for quoted names on the command line - * to be passed on in quotes. - */ - if (argno++ > 1) - { - cmdlen += strlen(argv[i]) + 3; - cmd = realloc(cmd, cmdlen); - strcat(cmd, "\""); - strcat(cmd, argv[i]); - strcat(cmd, "\" "); - } - else - { - cmdlen += strlen(argv[i]) + 1; - cmd = realloc(cmd, cmdlen); - strcat(cmd, argv[i]); - strcat(cmd, " "); - } - } + while ((c = getopt_long(argc, argv, "h:p:P:u:v?", + long_options, &option_index)) + >= 0) + { + switch (c) { + case 'h': + hostname = strdup(optarg); + break; + case 'p': + passwd = strdup(optarg); + break; + case 'P': + port = strdup(optarg); + break; + case 'u': + user = strdup(optarg); + break; + case 'v': + PrintVersion(*argv); + exit(EXIT_SUCCESS); + case '?': + DoUsage(*argv); + exit(optopt ? EXIT_FAILURE : EXIT_SUCCESS); + } } - if (fatal) - exit(1); - if (passwd == NULL) { struct termios tty_attr; @@ -234,14 +175,28 @@ int argno = 0; exit(1); } - if (cmdlen > 1) - { - cmd[cmdlen - 2] = '\0'; /* Remove trailing space */ - if (access(cmd, R_OK) == 0) - DoSource(so, cmd); - else - sendCommand(so, cmd); - exit(0); + if (optind < argc) { + int i, len = 0; + char *cmd; + + for (i = optind; i < argc; i++) { + len += strlen(argv[i]) + 1; + } + + cmd = malloc(len); + strcpy(cmd, argv[optind]); + for (i = optind +1; i < argc; i++) { + strcat(cmd, " "); + strcat(cmd, argv[i]); + } + + if (access(cmd, R_OK) == 0) + DoSource(so, cmd); + else + sendCommand(so, cmd); + + free(cmd); + exit(0); } (void) setlocale(LC_CTYPE, ""); @@ -445,7 +400,7 @@ char buf[20]; * Send a comamnd using the MaxScaled protocol, display the return data * on standard output. * - * Input terminates with a lien containing jsut the text OK + * Input terminates with a lien containing just the text OK * * @param so The socket connect to MaxScale * @param cmd The command to send @@ -455,7 +410,7 @@ static int sendCommand(int so, char *cmd) { char buf[80]; -int i, j, newline = 0; +int i, j, newline = 1; write(so, cmd, strlen(cmd)); while (1) @@ -533,23 +488,34 @@ FILE *fp; return; } +/** + * Print version information + */ +static void +PrintVersion(const char *progname) +{ + printf("%s Version %s\n", progname, MAXSCALE_VERSION); +} + /** * Display the --help text. */ static void -DoUsage() +DoUsage(const char *progname) { - printf("maxadmin: The MaxScale administrative and monitor client.\n\n"); - printf("Usage: maxadmin [-u user] [-p password] [-h hostname] [-P port] [ | ]\n\n"); - printf(" -u user The user name to use for the connection, default\n"); + PrintVersion(progname); + printf("The MaxScale administrative and monitor client.\n\n"); + printf("Usage: %s [-u user] [-p password] [-h hostname] [-P port] [ | ]\n\n", progname); + printf(" -u|--user=... The user name to use for the connection, default\n"); printf(" is admin.\n"); - printf(" -p password The user password, if not given the password will\n"); + printf(" -p|--password=... The user password, if not given the password will\n"); printf(" be prompted for interactively\n"); - printf(" -h hostname The maxscale host to connecto to. The default is\n"); + printf(" -h|--hostname=... The maxscale host to connecto to. The default is\n"); printf(" localhost\n"); - printf(" -P port The port to use for the connection, the default\n"); + printf(" -P|--port=... The port to use for the connection, the default\n"); printf(" port is 6603.\n"); - printf(" --help Print this help text.\n"); + printf(" -v|--version print version information and exit\n"); + printf(" -?|--help Print this help text.\n"); printf("Any remaining arguments are treated as MaxScale commands or a file\n"); printf("containing commands to execute.\n"); } diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 9bfc9dd41..2b274a610 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -255,7 +255,7 @@ static int logmanager_write_log( bool use_valist, bool spread_down, size_t len, - char* str, + const char* str, va_list valist); static blockbuf_t* blockbuf_init(logfile_id_t id); @@ -609,7 +609,7 @@ static int logmanager_write_log( bool use_valist, bool spread_down, size_t str_len, - char* str, + const char* str, va_list valist) { logfile_t* lf; @@ -623,7 +623,7 @@ static int logmanager_write_log( CHK_LOGMANAGER(lm); if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { - char* errstr = "Invalid logfile id argument."; + const char* errstr = "Invalid logfile id argument."; /** * invalid id, since we don't have logfile yet. */ @@ -1181,7 +1181,7 @@ static bool logfile_set_enabled( CHK_LOGMANAGER(lm); if (id < LOGFILE_FIRST || id > LOGFILE_LAST) { - char* errstr = "Invalid logfile id argument."; + const char* errstr = "Invalid logfile id argument."; /** * invalid id, since we don't have logfile yet. */ @@ -1235,7 +1235,7 @@ return_succp: int skygw_log_write_flush( logfile_id_t id, - char* str, + const char* str, ...) { int err = 0; @@ -1291,7 +1291,7 @@ return_err: int skygw_log_write( logfile_id_t id, - char* str, + const char* str, ...) { int err = 0; diff --git a/log_manager/log_manager.h b/log_manager/log_manager.h index d9932d640..6a4c1d6cc 100644 --- a/log_manager/log_manager.h +++ b/log_manager/log_manager.h @@ -72,9 +72,9 @@ void skygw_logmanager_exit(void); * free private write buffer list */ void skygw_log_done(void); -int skygw_log_write(logfile_id_t id, char* format, ...); +int skygw_log_write(logfile_id_t id, const char* format, ...); int skygw_log_flush(logfile_id_t id); -int skygw_log_write_flush(logfile_id_t id, char* format, ...); +int skygw_log_write_flush(logfile_id_t id, const char* format, ...); int skygw_log_enable(logfile_id_t id); int skygw_log_disable(logfile_id_t id); diff --git a/server/core/config.c b/server/core/config.c index 1ba2bcbf4..995e0e0f4 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -1575,6 +1575,7 @@ static char *service_params[] = "use_sql_variables_in", /*< rwsplit only */ "version_string", "filters", + "weightby", NULL }; diff --git a/server/core/dcb.c b/server/core/dcb.c index deccdb0a5..b8f9108de 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -404,17 +404,22 @@ DCB_CALLBACK *cb; * the memdata.bitmask then the DCB is no longer able to be * referenced and it can be finally removed. * + * The excluded DCB allows a thread to exclude a DCB from zombie processing. + * It is used when a thread calls dcb_process_zombies when there is + * a DCB that the caller knows it will continue processing with. + * * @param threadid The thread ID of the caller + * @param excluded The DCB the thread currently uses, NULL or valid DCB. */ DCB * -dcb_process_zombies(int threadid) +dcb_process_zombies(int threadid, DCB *excluded) { DCB *ptr, *lptr; DCB* dcb_list = NULL; DCB* dcb = NULL; bool succp = false; - /*< + /** * Perform a dirty read to see if there is anything in the queue. * This avoids threads hitting the queue spinlock when the queue * is empty. This will really help when the only entry is being @@ -424,69 +429,100 @@ bool succp = false; if (!zombies) return NULL; + /* + * Process the zombie queue and create a list of DCB's that can be + * finally freed. This processing is down under a spinlock that + * will prevent new entries being added to the zombie queue. Therefore + * we do not want to do any expensive operations under this spinlock + * as it will block other threads. The expensive operations will be + * performed on the victim queue within holding the zombie queue + * spinlock. + */ spinlock_acquire(&zombiespin); ptr = zombies; lptr = NULL; while (ptr) - { - bitmask_clear(&ptr->memdata.bitmask, threadid); - if (bitmask_isallclear(&ptr->memdata.bitmask)) - { - /*< - * Remove the DCB from the zombie queue - * and call the final free routine for the - * DCB - * - * ptr is the DCB we are processing - * lptr is the previous DCB on the zombie queue - * or NULL if the DCB is at the head of the queue - * tptr is the DCB after the one we are processing - * on the zombie queue - */ - DCB *tptr = ptr->memdata.next; - if (lptr == NULL) - zombies = tptr; - else - lptr->memdata.next = tptr; - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [dcb_process_zombies] Remove dcb %p fd %d " - "in state %s from zombies list.", - pthread_self(), - ptr, - ptr->fd, - STRDCBSTATE(ptr->state)))); - ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE, - "dcb not in DCB_STATE_ZOMBIE state."); - /*< - * Move dcb to linked list of victim dcbs. - */ - if (dcb_list == NULL) { - dcb_list = ptr; - dcb = dcb_list; - } else { - dcb->memdata.next = ptr; - dcb = dcb->memdata.next; - } - dcb->memdata.next = NULL; - ptr = tptr; - } - else + { + CHK_DCB(ptr); + + /* + * Skip processing of the excluded DCB + */ + if (ptr == excluded) { lptr = ptr; ptr = ptr->memdata.next; } + else + { + + bitmask_clear(&ptr->memdata.bitmask, threadid); + + if (bitmask_isallclear(&ptr->memdata.bitmask)) + { + /** + * Remove the DCB from the zombie queue + * and call the final free routine for the + * DCB + * + * ptr is the DCB we are processing + * lptr is the previous DCB on the zombie queue + * or NULL if the DCB is at the head of the + * queue tptr is the DCB after the one we are + * processing on the zombie queue + */ + DCB *tptr = ptr->memdata.next; + if (lptr == NULL) + zombies = tptr; + else + lptr->memdata.next = tptr; + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [dcb_process_zombies] Remove dcb " + "%p fd %d " "in state %s from the " + "list of zombies.", + pthread_self(), + ptr, + ptr->fd, + STRDCBSTATE(ptr->state)))); + ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE, + "dcb not in DCB_STATE_ZOMBIE state."); + /*< + * Move dcb to linked list of victim dcbs. + */ + if (dcb_list == NULL) { + dcb_list = ptr; + dcb = dcb_list; + } else { + dcb->memdata.next = ptr; + dcb = dcb->memdata.next; + } + dcb->memdata.next = NULL; + ptr = tptr; + } + else + { + lptr = ptr; + ptr = ptr->memdata.next; + } + } } spinlock_release(&zombiespin); + /* + * Process the victim queue. These are DCBs that are not in + * use by any thread. + * The corresponding file descriptor is closed, the DCB marked + * as disconnected and the DCB itself is fianlly freed. + */ dcb = dcb_list; - /*< Close, and set DISCONNECTED victims */ while (dcb != NULL) { DCB* dcb_next = NULL; int rc = 0; /*< * Close file descriptor and move to clean-up phase. */ + ss_dassert(excluded != dcb); rc = close(dcb->fd); if (rc < 0) { @@ -1108,8 +1144,8 @@ int above_water; /** * Removes dcb from poll set, and adds it to zombies list. As a consequense, * dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state. - * At the end of the function state may not be DCB_STATE_ZOMBIE because once dcb_initlock - * is released parallel threads may change the state. + * At the end of the function state may not be DCB_STATE_ZOMBIE because once + * dcb_initlock is released parallel threads may change the state. * * Parameters: * @param dcb The DCB to close @@ -1928,13 +1964,15 @@ int rval = 0; * and instead implements a queuing mechanism in which nested events are * queued on the DCB such that when the thread processing the first event * returns it will read the queued event and process it. This allows the - * thread that woudl otherwise have to wait to process the nested event + * thread that would otherwise have to wait to process the nested event * to return immediately and and process other events. * * @param dcb The DCB that has data available + * @param thread_id The ID of the calling thread + * @param nozombies If non-zero then do not do zombie processing */ void -dcb_pollin(DCB *dcb, int thread_id) +dcb_pollin(DCB *dcb, int thread_id, int nozombies) { spinlock_acquire(&dcb->pollinlock); @@ -1945,7 +1983,8 @@ dcb_pollin(DCB *dcb, int thread_id) if (dcb->readcheck) { dcb->stats.n_readrechecks++; - dcb_process_zombies(thread_id); + if (!nozombies) + dcb_process_zombies(thread_id, dcb); } dcb->readcheck = 0; spinlock_release(&dcb->pollinlock); @@ -1976,9 +2015,11 @@ dcb_pollin(DCB *dcb, int thread_id) * to return immediately and and process other events. * * @param dcb The DCB thats available for writes + * @param thread_id The ID of the calling thread + * @param nozombies If non-zero then do not do zombie processing */ void -dcb_pollout(DCB *dcb, int thread_id) +dcb_pollout(DCB *dcb, int thread_id, int nozombies) { spinlock_acquire(&dcb->polloutlock); @@ -1988,7 +2029,8 @@ dcb_pollout(DCB *dcb, int thread_id) do { if (dcb->writecheck) { - dcb_process_zombies(thread_id); + if (!nozombies) + dcb_process_zombies(thread_id, dcb); dcb->stats.n_writerechecks++; } dcb->writecheck = 0; diff --git a/server/core/gateway.c b/server/core/gateway.c index 7c674ad34..79c2585fe 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -131,6 +132,17 @@ static bool libmysqld_started = FALSE; */ static bool daemon_mode = true; +const char *progname = NULL; +static struct option long_options[] = { + {"homedir", required_argument, 0, 'c'}, + {"config", required_argument, 0, 'f'}, + {"nodeamon", required_argument, 0, 'd'}, + {"log", required_argument, 0, 'l'}, + {"version", no_argument, 0, 'v'}, + {"help", no_argument, 0, '?'}, + {0, 0, 0, 0} +}; + static void log_flush_shutdown(void); static void log_flush_cb(void* arg); static int write_pid_file(char *); /* write MaxScale pidfile */ @@ -878,15 +890,19 @@ return_cnf_file_buf: return cnf_file_buf; } - static void usage(void) { fprintf(stderr, - "*\n* Usage : maxscale [-h] | [-d] [-c ] [-f ]\n* where:\n* " - "-h help\n* -d enable running in terminal process (default:disabled)\n* " - "-c relative|absolute MaxScale home directory\n* " - "-f relative|absolute pathname of MaxScale configuration file (default:MAXSCALE_HOME/etc/MaxScale.cnf)\n*\n"); + "\nUsage : %s [-h] | [-d] [-c ] [-f ]\n\n" + " -d|--nodaemon enable running in terminal process (default:disabled)\n" + " -c|--homedir=... relative|absolute MaxScale home directory\n" + " -f|--config=... relative|absolute pathname of MaxScale configuration file\n" + " (default: $MAXSCALE_HOME/etc/MaxScale.cnf)\n" + " -l|--log=... log to file or shared memory\n" + " -lfile or -lshm - defaults to shared memory\n" + " -v|--version print version info and exit\n" + " -?|--help show this help\n" + , progname); } /** @@ -943,6 +959,8 @@ int main(int argc, char **argv) char* cnf_file_path = NULL; /*< conf file, to be freed */ char* cnf_file_arg = NULL; /*< conf filename from cmd-line arg */ void* log_flush_thr = NULL; + int option_index; + int logtofile = 0; /* Use shared memory or file */ ssize_t log_flush_timeout_ms = 0; sigset_t sigset; sigset_t sigpipe_mask; @@ -954,6 +972,8 @@ int main(int argc, char **argv) sigemptyset(&sigpipe_mask); sigaddset(&sigpipe_mask, SIGPIPE); + progname = *argv; + #if defined(SS_DEBUG) memset(conn_open, 0, sizeof(bool)*10240); memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*10240); @@ -980,7 +1000,8 @@ int main(int argc, char **argv) goto return_main; } } - while ((opt = getopt(argc, argv, "dc:f:h")) != -1) + while ((opt = getopt_long(argc, argv, "dc:f:l:v?", + long_options, &option_index)) != -1) { bool succp = true; @@ -1061,9 +1082,36 @@ int main(int argc, char **argv) succp = false; } break; + + case 'v': + rc = EXIT_SUCCESS; + goto return_main; + + case 'l': + if (strncasecmp(optarg, "file") == 0) + logtofile = 1; + else if (strncasecmp(optarg, "shm") == 0) + logtofile = 0; + else + { + char* logerr = "Configuration file argument " + "identifier \'-l\' was specified but " + "the argument didn't specify\n a valid " + "configuration file or the argument " + "was missing."; + print_log_n_stderr(true, true, logerr, logerr, 0); + usage(); + succp = false; + } + break; + + case '?': + usage(); + rc = EXIT_SUCCESS; + goto return_main; default: - usage(); + usage(); succp = false; break; } @@ -1345,12 +1393,23 @@ int main(int argc, char **argv) argv[0] = "MaxScale"; argv[1] = "-j"; argv[2] = buf; - argv[3] = "-s"; /*< store to shared memory */ - argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< ..these logs to shm */ - argv[5] = "-l"; /*< write to syslog */ - argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< ..these logs to syslog */ - argv[7] = NULL; - skygw_logmanager_init(7, argv); + if (logtofile) + { + argv[3] = "-l"; /*< write to syslog */ + argv[4] = "LOGFILE_MESSAGE,LOGFILE_ERROR" + "LOGFILE_DEBUG,LOGFILE_TRACE"; + argv[5] = NULL; + skygw_logmanager_init(5, argv); + } + else + { + argv[3] = "-s"; /*< store to shared memory */ + argv[4] = "LOGFILE_DEBUG,LOGFILE_TRACE"; /*< ..these logs to shm */ + argv[5] = "-l"; /*< write to syslog */ + argv[6] = "LOGFILE_MESSAGE,LOGFILE_ERROR"; /*< ..these logs to syslog */ + argv[7] = NULL; + skygw_logmanager_init(7, argv); + } } /*< @@ -1629,8 +1688,13 @@ static void log_flush_cb( static void unlink_pidfile(void) { if (strlen(pidfile)) { - if (unlink(pidfile)) { - fprintf(stderr, "MaxScale failed to remove pidfile %s: error %d, %s\n", pidfile, errno, strerror(errno)); + if (unlink(pidfile)) + { + fprintf(stderr, + "MaxScale failed to remove pidfile %s: error %d, %s\n", + pidfile, + errno, + strerror(errno)); } } } diff --git a/server/core/hint.c b/server/core/hint.c index 2a463585e..2d716771a 100644 --- a/server/core/hint.c +++ b/server/core/hint.c @@ -17,6 +17,7 @@ */ #include #include +#include #include /** diff --git a/server/core/housekeeper.c b/server/core/housekeeper.c index 6180f24a5..5aa5c7a29 100644 --- a/server/core/housekeeper.c +++ b/server/core/housekeeper.c @@ -16,6 +16,7 @@ * Copyright SkySQL Ab 2014 */ #include +#include #include #include #include diff --git a/server/core/modutil.c b/server/core/modutil.c index e2800b849..7247f181d 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -125,7 +125,7 @@ unsigned char *ptr; /** * Replace the contents of a GWBUF with the new SQL statement passed as a text string. * The routine takes care of the modification needed to the MySQL packet, - * returning a GWBUF chian that cna be used to send the data to a MySQL server + * returning a GWBUF chain that can be used to send the data to a MySQL server * * @param orig The original request in a GWBUF * @param sql The SQL text to replace in the packet @@ -225,4 +225,4 @@ char* modutil_get_query( } /*< switch */ retblock: return query_str; -} \ No newline at end of file +} diff --git a/server/core/monitor.c b/server/core/monitor.c index 6227afd5d..3504be917 100644 --- a/server/core/monitor.c +++ b/server/core/monitor.c @@ -60,8 +60,9 @@ MONITOR *mon; { return NULL; } - + mon->state = MONITOR_STATE_ALLOC; mon->name = strdup(name); + if ((mon->module = load_module(module, MODULE_MONITOR)) == NULL) { LOGIF(LE, (skygw_log_write_flush( @@ -73,7 +74,8 @@ MONITOR *mon; return NULL; } mon->handle = (*mon->module->startMonitor)(NULL); - mon->state |= MONITOR_STATE_RUNNING; + mon->state = MONITOR_STATE_RUNNING; + spinlock_acquire(&monLock); mon->next = allMonitors; allMonitors = mon; @@ -94,7 +96,7 @@ monitor_free(MONITOR *mon) MONITOR *ptr; mon->module->stopMonitor(mon->handle); - mon->state &= ~MONITOR_STATE_RUNNING; + mon->state = MONITOR_STATE_FREED; spinlock_acquire(&monLock); if (allMonitors == mon) allMonitors = mon->next; @@ -121,7 +123,7 @@ void monitorStart(MONITOR *monitor) { monitor->handle = (*monitor->module->startMonitor)(monitor->handle); - monitor->state |= MONITOR_STATE_RUNNING; + monitor->state = MONITOR_STATE_RUNNING; } /** @@ -132,8 +134,9 @@ monitorStart(MONITOR *monitor) void monitorStop(MONITOR *monitor) { + monitor->state = MONITOR_STATE_STOPPING; monitor->module->stopMonitor(monitor->handle); - monitor->state &= ~MONITOR_STATE_RUNNING; + monitor->state = MONITOR_STATE_STOPPED; } /** diff --git a/server/core/poll.c b/server/core/poll.c index 2df68fd60..15e4acadb 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -45,14 +45,25 @@ extern int lm_enabled_logfiles_bitmask; * zombie management * 29/08/14 Mark Riddoch Addition of thread status data, load average * etc. + * 23/09/14 Mark Riddoch Make use of RDHUP conditional to allow CentOS 5 + * builds. * * @endverbatim */ +/** + * Control the use of mutexes for the epoll_wait call. Setting to 1 will + * cause the epoll_wait calls to be moved under a mutex. This may be useful + * for debuggign purposes but should be avoided in general use. + */ +#define MUTEX_EPOLL 0 + static int epoll_fd = -1; /*< The epoll file descriptor */ static int do_shutdown = 0; /*< Flag the shutdown of the poll subsystem */ static GWBITMASK poll_mask; +#if MUTEX_EPOLL static simple_mutex_t epoll_wait_mutex; /*< serializes calls to epoll_wait */ +#endif static int n_waiting = 0; /*< No. of threads in epoll_wait */ /** @@ -154,7 +165,9 @@ int i; thread_data[i].state = THREAD_STOPPED; } } +#if MUTEX_EPOLL simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex"); +#endif hktask_add("Load Average", poll_loadav, NULL, POLL_LOAD_FREQ); n_avg_samples = 15 * 60 / POLL_LOAD_FREQ; @@ -180,8 +193,12 @@ poll_add_dcb(DCB *dcb) struct epoll_event ev; CHK_DCB(dcb); - + +#ifdef EPOLLRDHUP ev.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLHUP | EPOLLET; +#else + ev.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; +#endif ev.data.ptr = dcb; /*< @@ -359,7 +376,7 @@ DCB *zombies = NULL; thread_id))); no_op = TRUE; } -#if 0 +#if MUTEX_EPOLL simple_mutex_lock(&epoll_wait_mutex, TRUE); #endif if (thread_data) @@ -385,7 +402,7 @@ DCB *zombies = NULL; { atomic_add(&n_waiting, -1); if (process_zombies_only) { -#if 0 +#if MUTEX_EPOLL simple_mutex_unlock(&epoll_wait_mutex); #endif goto process_zombies; @@ -413,7 +430,7 @@ DCB *zombies = NULL; if (n_waiting == 0) atomic_add(&pollStats.n_nothreads, 1); -#if 0 +#if MUTEX_EPOLL simple_mutex_unlock(&epoll_wait_mutex); #endif #endif /* BLOCKINGPOLL */ @@ -442,7 +459,7 @@ DCB *zombies = NULL; for (i = 0; i < nfds; i++) { - DCB *dcb = (DCB *)events[i].data.ptr; + DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; CHK_DCB(dcb); @@ -504,7 +521,7 @@ DCB *zombies = NULL; #else atomic_add(&pollStats.n_write, 1); - dcb_pollout(dcb, thread_id); + dcb_pollout(dcb, thread_id, nfds); #endif } else { LOGIF(LD, (skygw_log_write( @@ -554,7 +571,7 @@ DCB *zombies = NULL; #if MUTEX_BLOCK dcb->func.read(dcb); #else - dcb_pollin(dcb, thread_id); + dcb_pollin(dcb, thread_id, nfds); #endif } #if MUTEX_BLOCK @@ -609,9 +626,18 @@ DCB *zombies = NULL; eno, strerror(eno)))); atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); } +#ifdef EPOLLRDHUP if (ev & EPOLLRDHUP) { int eno = 0; @@ -628,17 +654,26 @@ DCB *zombies = NULL; eno, strerror(eno)))); atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); + spinlock_acquire(&dcb->dcb_initlock); + if ((dcb->flags & DCBF_HUNG) == 0) + { + dcb->flags |= DCBF_HUNG; + spinlock_release(&dcb->dcb_initlock); + dcb->func.hangup(dcb); + } + else + spinlock_release(&dcb->dcb_initlock); } +#endif } /*< for */ no_op = FALSE; - } - process_zombies: + } /*< if (nfds > 0) */ +process_zombies: if (thread_data) { thread_data[thread_id].state = THREAD_ZPROCESSING; } - zombies = dcb_process_zombies(thread_id); + zombies = dcb_process_zombies(thread_id, NULL); if (zombies == NULL) { process_zombies_only = false; @@ -655,6 +690,8 @@ DCB *zombies = NULL; thread_data[thread_id].state = THREAD_STOPPED; } bitmask_clear(&poll_mask, thread_id); + /** Release mysql thread context */ + mysql_thread_end(); return; } if (thread_data) @@ -662,8 +699,6 @@ DCB *zombies = NULL; thread_data[thread_id].state = THREAD_IDLE; } } /*< while(1) */ - /** Release mysql thread context */ - mysql_thread_end(); } /** @@ -758,12 +793,14 @@ char *str; strcat(str, "|"); strcat(str, "HUP"); } +#ifdef EPOLLRDHUP if (event & EPOLLRDHUP) { if (*str) strcat(str, "|"); strcat(str, "RDHUP"); } +#endif return str; } diff --git a/server/include/dcb.h b/server/include/dcb.h index 723acec5d..06687f349 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -271,8 +271,8 @@ int fail_accept_errno; #define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) #define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -void dcb_pollin(DCB *, int); -void dcb_pollout(DCB *, int); +void dcb_pollin(DCB *, int, int); +void dcb_pollout(DCB *, int, int); DCB *dcb_get_zombies(void); int gw_write( #if defined(SS_DEBUG) @@ -289,7 +289,7 @@ DCB *dcb_clone(DCB *); int dcb_read(DCB *, GWBUF **); int dcb_drain_writeq(DCB *); void dcb_close(DCB *); -DCB *dcb_process_zombies(int); /* Process Zombies */ +DCB *dcb_process_zombies(int, DCB*); /* Process Zombies except the one behind the pointer */ void printAllDCBs(); /* Debug to print all DCB in the system */ void printDCB(DCB *); /* Debug print routine */ void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ @@ -317,6 +317,9 @@ void dcb_call_foreach (DCB_REASON reason); void dcb_call_foreach ( DCB_REASON reason); -/* DCB flags values */ -#define DCBF_CLONE 0x0001 /* DCB is a clone */ +/** + * DCB flags values + */ +#define DCBF_CLONE 0x0001 /*< DCB is a clone */ +#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */ #endif /* _DCB_H */ diff --git a/server/include/monitor.h b/server/include/monitor.h index 5d018c16f..04337d761 100644 --- a/server/include/monitor.h +++ b/server/include/monitor.h @@ -69,7 +69,7 @@ typedef struct { void (*unregisterServer)(void *, SERVER *); void (*defaultUser)(void *, char *, char *); void (*diagnostics)(DCB *, void *); - void (*setInterval)(void *, unsigned long); + void (*setInterval)(void *, size_t); void (*defaultId)(void *, unsigned long); void (*replicationHeartbeat)(void *, int); void (*detectStaleMaster)(void *, int); @@ -81,21 +81,30 @@ typedef struct { */ #define MONITOR_VERSION {1, 0, 0} +/** Monitor's poll frequency */ +#define MON_BASE_INTERVAL_MS 100 + /** * Monitor state bit mask values */ -#define MONITOR_STATE_RUNNING 0x0001 - +typedef enum +{ + MONITOR_STATE_ALLOC = 0x00, + MONITOR_STATE_RUNNING = 0x01, + MONITOR_STATE_STOPPING = 0x02, + MONITOR_STATE_STOPPED = 0x04, + MONITOR_STATE_FREED = 0x08 +} monitor_state_t; /** * Representation of the running monitor. */ typedef struct monitor { char *name; /**< The name of the monitor module */ - unsigned int state; /**< The monitor status */ + monitor_state_t state; /**< The state of the monitor */ MONITOR_OBJECT *module; /**< The "monitor object" */ void *handle; /**< Handle returned from startMonitor */ - int interval; /**< The monitor interval */ + size_t interval; /**< The monitor interval */ struct monitor *next; /**< Next monitor in the linked list */ } MONITOR; diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index 211407f86..d2ad38264 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -67,9 +67,20 @@ static void registerServer(void *, SERVER *); static void unregisterServer(void *, SERVER *); static void defaultUsers(void *, char *, char *); static void diagnostics(DCB *, void *); -static void setInterval(void *, unsigned long); +static void setInterval(void *, size_t); -static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL, NULL }; +static MONITOR_OBJECT MyObject = { + startMonitor, + stopMonitor, + registerServer, + unregisterServer, + defaultUsers, + diagnostics, + setInterval, + NULL, + NULL, + NULL, +}; /** * Implementation of the mandatory version entry point @@ -413,6 +424,7 @@ monitorMain(void *arg) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MONITOR_SERVERS *ptr; long master_id; +size_t nrounds = 0; if (mysql_thread_init()) { @@ -423,10 +435,9 @@ long master_id; return; } handle->status = MONITOR_RUNNING; + while (1) { - master_id = -1; - if (handle->shutdown) { handle->status = MONITOR_STOPPING; @@ -434,7 +445,16 @@ long master_id; handle->status = MONITOR_STOPPED; return; } - + /** Wait base interval */ + thread_millisleep(MON_BASE_INTERVAL_MS); + nrounds += 1; + + /** If monitor interval time isn't consumed skip checks */ + if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0) + { + continue; + } + master_id = -1; ptr = handle->databases; while (ptr) @@ -491,7 +511,6 @@ long master_id; ptr = ptr->next; } - thread_millisleep(handle->interval); } } @@ -502,7 +521,7 @@ long master_id; * @param interval The interval to set in monitor struct, in milliseconds */ static void -setInterval(void *arg, unsigned long interval) +setInterval(void *arg, size_t interval) { MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->interval, &interval, sizeof(unsigned long)); diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 976aa315d..d7e0b34bd 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -80,7 +80,7 @@ static void registerServer(void *, SERVER *); static void unregisterServer(void *, SERVER *); static void defaultUser(void *, char *, char *); static void diagnostics(DCB *, void *); -static void setInterval(void *, unsigned long); +static void setInterval(void *, size_t); static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); static void detectStaleMaster(void *, int); @@ -95,7 +95,18 @@ static int add_slave_to_master(long *, int, long); static void monitor_set_pending_status(MONITOR_SERVERS *, int); static void monitor_clear_pending_status(MONITOR_SERVERS *, int); -static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat, detectStaleMaster }; +static MONITOR_OBJECT MyObject = { + startMonitor, + stopMonitor, + registerServer, + unregisterServer, + defaultUser, + diagnostics, + setInterval, + defaultId, + replicationHeartbeat, + detectStaleMaster +}; /** * Implementation of the mandatory version entry point @@ -577,6 +588,7 @@ int replication_heartbeat = handle->replicationHeartbeat; int detect_stale_master = handle->detectStaleMaster; int num_servers=0; MONITOR_SERVERS *root_master; +size_t nrounds = 0; if (mysql_thread_init()) { @@ -586,8 +598,8 @@ MONITOR_SERVERS *root_master; "module. Exiting.\n"))); return; } - handle->status = MONITOR_RUNNING; + while (1) { if (handle->shutdown) @@ -597,6 +609,15 @@ MONITOR_SERVERS *root_master; handle->status = MONITOR_STOPPED; return; } + /** Wait base interval */ + thread_millisleep(MON_BASE_INTERVAL_MS); + nrounds += 1; + + /** If monitor interval time isn't consumed skip checks */ + if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0) + { + continue; + } /* reset num_servers */ num_servers = 0; @@ -686,10 +707,7 @@ MONITOR_SERVERS *root_master; ptr = ptr->next; } } - - /* wait for the configured interval */ - thread_millisleep(handle->interval); - } + } /*< while (1) */ } /** @@ -704,7 +722,7 @@ defaultId(void *arg, unsigned long id) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->id, &id, sizeof(unsigned long)); } - + /** * Set the monitor sampling interval. * @@ -712,7 +730,7 @@ MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; * @param interval The interval to set in monitor struct, in milliseconds */ static void -setInterval(void *arg, unsigned long interval) +setInterval(void *arg, size_t interval) { MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->interval, &interval, sizeof(unsigned long)); diff --git a/server/modules/monitor/ndbcluster_mon.c b/server/modules/monitor/ndbcluster_mon.c index 840e30691..2e009e000 100644 --- a/server/modules/monitor/ndbcluster_mon.c +++ b/server/modules/monitor/ndbcluster_mon.c @@ -61,9 +61,20 @@ static void registerServer(void *, SERVER *); static void unregisterServer(void *, SERVER *); static void defaultUsers(void *, char *, char *); static void diagnostics(DCB *, void *); -static void setInterval(void *, unsigned long); +static void setInterval(void *, size_t); -static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL, NULL }; +static MONITOR_OBJECT MyObject = { + startMonitor, + stopMonitor, + registerServer, + unregisterServer, + defaultUsers, + diagnostics, + setInterval, + NULL, + NULL, + NULL +}; /** * Implementation of the mandatory version entry point @@ -410,6 +421,7 @@ monitorMain(void *arg) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MONITOR_SERVERS *ptr; long master_id; +size_t nrounds = 0; if (mysql_thread_init()) { @@ -420,10 +432,9 @@ long master_id; return; } handle->status = MONITOR_RUNNING; + while (1) { - master_id = -1; - if (handle->shutdown) { handle->status = MONITOR_STOPPING; @@ -432,6 +443,16 @@ long master_id; return; } + /** Wait base interval */ + thread_millisleep(MON_BASE_INTERVAL_MS); + nrounds += 1; + + /** If monitor interval time isn't consumed skip checks */ + if ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval != 0) + { + continue; + } + master_id = -1; ptr = handle->databases; while (ptr) @@ -452,8 +473,6 @@ long master_id; ptr = ptr->next; } - - thread_millisleep(handle->interval); } } @@ -464,7 +483,7 @@ long master_id; * @param interval The interval to set in monitor struct, in milliseconds */ static void -setInterval(void *arg, unsigned long interval) +setInterval(void *arg, size_t interval) { MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->interval, &interval, sizeof(unsigned long)); diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 967f332a1..451b29742 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1674,7 +1674,6 @@ static int routeQuery( { rses_is_closed = true; } - ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); packet = GWBUF_DATA(querybuf); @@ -1702,7 +1701,6 @@ static int routeQuery( (rses_is_closed ? "Router was closed" : "Router has no backend servers where to " "route to")))); - free(querybuf); } goto retblock; } @@ -2195,7 +2193,17 @@ static void clientReply ( goto lock_failed; } bref = get_bref_from_dcb(router_cli_ses, backend_dcb); - + +#if !defined(FOR_BUG548_FIX_ONLY) + /** This makes the issue becoming visible in poll.c */ + if (bref == NULL) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto lock_failed; + } +#endif + CHK_BACKEND_REF(bref); scur = &bref->bref_sescmd_cur; /**