diff --git a/CMakeLists.txt b/CMakeLists.txt index 23dd646e1..855b60294 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -152,7 +152,7 @@ add_custom_target(buildtests add_custom_target(testall COMMAND ${CMAKE_COMMAND} -DDEPS_OK=Y -DBUILD_TESTS=Y -DBUILD_TYPE=Debug -DINSTALL_DIR=${CMAKE_BINARY_DIR} -DINSTALL_SYSTEM_FILES=N ${CMAKE_SOURCE_DIR} COMMAND make install - COMMAND cp -vn ${CMAKE_SOURCE_DIR}/server/test/MaxScale_test.cnf ${CMAKE_BINARY_DIR}/etc/MaxScale.cnf + COMMAND cp ${CMAKE_SOURCE_DIR}/server/test/MaxScale_test.cnf ${CMAKE_BINARY_DIR}/etc/MaxScale.cnf COMMAND /bin/sh -c "${CMAKE_BINARY_DIR}/bin/maxscale -c ${CMAKE_BINARY_DIR} &>/dev/null" COMMAND /bin/sh -c "make test || echo \"Test results written to: ${CMAKE_BINARY_DIR}/Testing/Temporary/\"" COMMAND killall maxscale diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 792cc934a..75d6614d9 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -423,9 +423,17 @@ static bool logmanager_init_nomutex( return_succp: if (err != 0) { - skygw_message_done(lm->lm_clientmes); - skygw_message_done(lm->lm_logmes); - + if (lm != NULL) + { + if (lm->lm_clientmes != NULL) + { + skygw_message_done(lm->lm_clientmes); + } + if (lm->lm_logmes != NULL) + { + skygw_message_done(lm->lm_logmes); + } + } /** This releases memory of all created objects */ logmanager_done_nomutex(); fprintf(stderr, "*\n* Error : Initializing log manager failed.\n*\n"); @@ -1895,15 +1903,18 @@ static char* form_full_file_name( fprintf(stderr, "Error : Too long file name= %d.\n", (int)fnlen); goto return_filename; } - filename = (char*)calloc(1, fnlen); - snprintf(seqnostr, s+1, "%d", seqno); - + + if (seqnostr != NULL) + { + snprintf(seqnostr, s+1, "%d", seqno); + } + for (i=0, p=parts; p->sp_string != NULL; i++, p=p->sp_next) { - if (i == seqnoidx) + if (seqnostr != NULL && i == seqnoidx) { - strcat(filename, seqnostr); + strcat(filename, seqnostr); /*< add sequence number */ } strcat(filename, 
p->sp_string); diff --git a/log_manager/test/testorder.c b/log_manager/test/testorder.c index 047e57966..f9bf14213 100644 --- a/log_manager/test/testorder.c +++ b/log_manager/test/testorder.c @@ -45,8 +45,9 @@ int main(int argc, char** argv) } block_size = atoi(argv[3]); - if(block_size < 1){ - fprintf(stderr,"Message size too small, must be at least 1 byte long."); + if(block_size < 1 || block_size > 1024){ + fprintf(stderr,"Message size too small or large, must be at least 1 byte long and must not exceed 1024 bytes."); + return 1; } diff --git a/macros.cmake b/macros.cmake index d8f1b7827..812bfc94d 100644 --- a/macros.cmake +++ b/macros.cmake @@ -27,7 +27,7 @@ macro(set_variables) set(TEST_HOST "127.0.0.1" CACHE STRING "hostname or IP address of MaxScale's host") # port of read connection router module - set(TEST_PORT_RW "4008" CACHE STRING "port of read connection router module") + set(TEST_PORT "4008" CACHE STRING "port of read connection router module") # port of read/write split router module set(TEST_PORT_RW "4006" CACHE STRING "port of read/write split router module") @@ -38,6 +38,9 @@ macro(set_variables) # master test server server_id set(TEST_MASTER_ID "3000" CACHE STRING "master test server server_id") + # master test server port + set(MASTER_PORT "3000" CACHE STRING "master test server port") + # username of MaxScale user set(TEST_USER "maxuser" CACHE STRING "username of MaxScale user") @@ -202,6 +205,16 @@ debugmsg("Search returned: ${MYSQL_DIR_LOC}") unset(DEB_FNC) unset(RPM_FNC) + #Find the MySQL client library + find_library(MYSQLCLIENT_LIBRARIES NAMES mysqlclient PATH_SUFFIXES mysql mariadb) + if(${MYSQLCLIENT_LIBRARIES} MATCHES "NOTFOUND") + set(MYSQLCLIENT_FOUND FALSE CACHE INTERNAL "") + message(STATUS "Cannot find MySQL client library: Login tests disabled.") + else() + set(MYSQLCLIENT_FOUND TRUE CACHE INTERNAL "") + message(STATUS "Found MySQL client library: ${MYSQLCLIENT_LIBRARIES}") + endif() + #Check RabbitMQ headers and libraries 
if(BUILD_RABBITMQ) include(CheckCSourceCompiles) diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 833c4e0ea..aece59d79 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -470,11 +470,12 @@ static skygw_query_type_t resolve_query_type( * When force_data_modify_op_replication is TRUE, gateway distributes * all write operations to all nodes. */ +#if defined(NOT_IN_USE) bool force_data_modify_op_replication; - + force_data_modify_op_replication = FALSE; +#endif /* NOT_IN_USE */ ss_info_dassert(thd != NULL, ("thd is NULL\n")); - force_data_modify_op_replication = FALSE; lex = thd->lex; /** SELECT ..INTO variable|OUTFILE|DUMPFILE */ @@ -584,13 +585,15 @@ static skygw_query_type_t resolve_query_type( if (is_log_table_write_query(lex->sql_command) || is_update_query(lex->sql_command)) { +#if defined(NOT_IN_USE) if (thd->variables.sql_log_bin == 0 && force_data_modify_op_replication) { /** Not replicated */ type |= QUERY_TYPE_SESSION_WRITE; } - else + else +#endif /* NOT_IN_USE */ { /** Written to binlog, that is, replicated except tmp tables */ type |= QUERY_TYPE_WRITE; /*< to master */ @@ -1085,30 +1088,31 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames) } } + if(tmp != NULL){ + char *catnm = NULL; - char *catnm = NULL; - - if(fullnames) - { - if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0) - { - catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char)); - strcpy(catnm,tbl->db); - strcat(catnm,"."); - strcat(catnm,tbl->table_name); - } - } + if(fullnames) + { + if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0) + { + catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char)); + strcpy(catnm,tbl->db); + strcat(catnm,"."); + strcat(catnm,tbl->table_name); + } + } - if(catnm) - { - tables[i++] = catnm; - } - else - { - tables[i++] = strdup(tbl->table_name); - } + if(catnm) + { + tables[i++] = catnm; + } + 
else + { + tables[i++] = strdup(tbl->table_name); + } - tbl=tbl->next_local; + tbl=tbl->next_local; + } } lex->current_select = lex->current_select->next_select_in_list(); } diff --git a/query_classifier/test/canonical_tests/canonizer.c b/query_classifier/test/canonical_tests/canonizer.c index 223f91f2a..7983c9041 100644 --- a/query_classifier/test/canonical_tests/canonizer.c +++ b/query_classifier/test/canonical_tests/canonizer.c @@ -57,6 +57,9 @@ int main(int argc, char** argv) { fgets(readbuff,4092,infile); psize = strlen(readbuff); + if(psize < 0 || psize > 4092){ + continue; + } qbuff = gwbuf_alloc(psize + 7); *(qbuff->sbuf->data + 0) = (unsigned char)psize; *(qbuff->sbuf->data + 1) = (unsigned char)(psize>>8); diff --git a/query_classifier/test/classify.c b/query_classifier/test/classify.c index 0555844c9..5ceadea9c 100644 --- a/query_classifier/test/classify.c +++ b/query_classifier/test/classify.c @@ -34,7 +34,7 @@ int main(int argc, char** argv) return 1; } int rd = 0,buffsz = getpagesize(),strsz = 0,ex_val = 0; - char buffer[buffsz], *strbuff = (char*)calloc(buffsz,sizeof(char)); + char buffer[1024], *strbuff = (char*)calloc(buffsz,sizeof(char)); FILE *input,*expected; if(mysql_library_init(num_elements, server_options, server_groups)) @@ -45,25 +45,30 @@ int main(int argc, char** argv) input = fopen(argv[1],"rb"); expected = fopen(argv[2],"rb"); - memset(buffer,0,buffsz); - while((rd = fread(buffer,sizeof(char),buffsz - 1,input))){ + + while((rd = fread(buffer,sizeof(char),1023,input))){ /**Fill the read buffer*/ + if(strsz + rd >= buffsz){ - char* tmp = (char*)calloc((buffsz*2),sizeof(char)); + + char* tmp = realloc(strbuff,(buffsz*2)*sizeof(char)); - if(!tmp){ - fprintf(stderr,"Error: Cannot allocate enough memory."); + if(tmp == NULL){ + free(strbuff); + fclose(input); + fclose(expected); + mysql_library_end(); + fprintf(stderr,"Error: Memory allocation failed."); return 1; } - memcpy(tmp,strbuff,buffsz); - free(strbuff); strbuff = tmp; buffsz *= 2; } 
memcpy(strbuff+strsz,buffer,rd); strsz += rd; + *(strbuff+strsz) = '\0'; char *tok,*nlptr; @@ -167,10 +172,10 @@ int main(int argc, char** argv) gwbuf_free(buff); } - memset(buffer,0,buffsz); } fclose(input); fclose(expected); + mysql_library_end(); free(strbuff); return ex_val; } diff --git a/server/core/config.c b/server/core/config.c index 4ee39fcb2..b0aa25178 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -38,6 +38,8 @@ * 09/09/14 Massimiliano Pinto Added localhost_match_wildcard_host parameter * 12/09/14 Mark Riddoch Addition of checks on servers list and * internal router suppression of messages + * 30/10/14 Massimiliano Pinto Added disable_master_failback parameter + * 07/11/14 Massimiliano Pinto Addition of monitor timeouts for connect/read/write * * @endverbatim */ @@ -58,6 +60,7 @@ extern int lm_enabled_logfiles_bitmask; +extern int setipaddress(struct in_addr *, char *); static int process_config_context(CONFIG_CONTEXT *); static int process_config_update(CONFIG_CONTEXT *); static void free_config_context(CONFIG_CONTEXT *); @@ -772,6 +775,10 @@ int error_count = 0; unsigned long interval = 0; int replication_heartbeat = 0; int detect_stale_master = 0; + int disable_master_failback = 0; + int connect_timeout = 0; + int read_timeout = 0; + int write_timeout = 0; module = config_get_value(obj->parameters, "module"); servers = config_get_value(obj->parameters, "servers"); @@ -789,6 +796,20 @@ int error_count = 0; detect_stale_master = atoi(config_get_value(obj->parameters, "detect_stale_master")); } + if (config_get_value(obj->parameters, "disable_master_failback")) { + disable_master_failback = atoi(config_get_value(obj->parameters, "disable_master_failback")); + } + + if (config_get_value(obj->parameters, "backend_connect_timeout")) { + connect_timeout = atoi(config_get_value(obj->parameters, "backend_connect_timeout")); + } + if (config_get_value(obj->parameters, "backend_read_timeout")) { + read_timeout = 
atoi(config_get_value(obj->parameters, "backend_read_timeout")); + } + if (config_get_value(obj->parameters, "backend_write_timeout")) { + write_timeout = atoi(config_get_value(obj->parameters, "backend_write_timeout")); + } + if (module) { obj->element = monitor_alloc(obj->object, module); @@ -816,6 +837,18 @@ int error_count = 0; if(detect_stale_master == 1) monitorDetectStaleMaster(obj->element, detect_stale_master); + /* disable master failback */ + if(disable_master_failback == 1) + monitorDisableMasterFailback(obj->element, disable_master_failback); + + /* set timeouts */ + if (connect_timeout > 0) + monitorSetNetworkTimeout(obj->element, MONITOR_CONNECT_TIMEOUT, connect_timeout); + if (read_timeout > 0) + monitorSetNetworkTimeout(obj->element, MONITOR_READ_TIMEOUT, read_timeout); + if (write_timeout > 0) + monitorSetNetworkTimeout(obj->element, MONITOR_WRITE_TIMEOUT, write_timeout); + /* get the servers to monitor */ s = strtok(servers, ","); while (s) @@ -1268,7 +1301,7 @@ SERVER *server; (PERCENT_TYPE|COUNT_TYPE)); } - if (!succp) + if (!succp && param != NULL) { LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, @@ -1359,10 +1392,10 @@ SERVER *server; serviceSetUser(obj->element, user, auth); - if (enable_root_user && service) + if (enable_root_user) serviceEnableRootUser(service, atoi(enable_root_user)); - if (allow_localhost_match_wildcard_host) + if (allow_localhost_match_wildcard_host && service) serviceEnableLocalhostMatchWildcardHost( service, atoi(allow_localhost_match_wildcard_host)); @@ -1625,6 +1658,10 @@ static char *monitor_params[] = "monitor_interval", "detect_replication_lag", "detect_stale_master", + "disable_master_failback", + "backend_connect_timeout", + "backend_read_timeout", + "backend_write_timeout", NULL }; /** diff --git a/server/core/dbusers.c b/server/core/dbusers.c index 8b40a4148..662f480a3 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -646,7 +646,13 @@ getUsers(SERVICE *service, USERS *users) users_data = (char 
*)calloc(nusers, (users_data_row_len * sizeof(char)) + 1); - if(users_data == NULL) { + if (users_data == NULL) { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Memory allocation for user data failed due to " + "%d, %s.", + errno, + strerror(errno)))); mysql_free_result(result); mysql_close(con); diff --git a/server/core/filter.c b/server/core/filter.c index a1f210354..b14f23a87 100644 --- a/server/core/filter.c +++ b/server/core/filter.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -328,27 +329,43 @@ DOWNSTREAM *me; if ((filter->obj = load_module(filter->module, MODULE_FILTER)) == NULL) { - return NULL; + me = NULL; + goto retblock; } } + if (filter->filter == NULL) { if ((filter->filter = (filter->obj->createInstance)(filter->options, filter->parameters)) == NULL) { - return NULL; + me = NULL; + goto retblock; } } if ((me = (DOWNSTREAM *)calloc(1, sizeof(DOWNSTREAM))) == NULL) { - return NULL; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Memory allocation for filter session failed " + "due to %d,%s.", + errno, + strerror(errno)))); + + goto retblock; } me->instance = filter->filter; me->routeQuery = (void *)(filter->obj->routeQuery); - me->session = filter->obj->newSession(me->instance, session); - + + if ((me->session=filter->obj->newSession(me->instance, session)) == NULL) + { + free(me); + me = NULL; + goto retblock; + } filter->obj->setDownstream(me->instance, me->session, downstream); - + +retblock: return me; } diff --git a/server/core/gateway.c b/server/core/gateway.c index f6b7a18fa..79cd8c969 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -141,7 +141,7 @@ const char *progname = NULL; static struct option long_options[] = { {"homedir", required_argument, 0, 'c'}, {"config", required_argument, 0, 'f'}, - {"nodeamon", required_argument, 0, 'd'}, + {"nodaemon", no_argument, 0, 'd'}, {"log", required_argument, 0, 'l'}, {"version", no_argument, 0, 'v'}, {"help", 
no_argument, 0, '?'}, @@ -1002,7 +1002,7 @@ int main(int argc, char **argv) int n_services; int eno = 0; /*< local variable for errno */ int opt; - void** threads; /*< thread list */ + void** threads = NULL; /*< thread list */ char mysql_home[PATH_MAX+1]; char datadir_arg[10+PATH_MAX+1]; /*< '--datadir=' + PATH_MAX */ char language_arg[11+PATH_MAX+1]; /*< '--language=' + PATH_MAX */ @@ -1479,7 +1479,14 @@ int main(int argc, char **argv) bool succp; sprintf(buf, "%s/log", home_dir); - mkdir(buf, 0777); + if(mkdir(buf, 0777) != 0){ + + if(errno != EEXIST){ + fprintf(stderr, + "Error: Cannot create log directory: %s\n",buf); + goto return_main; + } + } argv[0] = "MaxScale"; argv[1] = "-j"; argv[2] = buf; @@ -1733,9 +1740,12 @@ int main(int argc, char **argv) unlink_pidfile(); return_main: - free(threads); - free(home_dir); - free(cnf_file_path); + if (threads) + free(threads); + if (home_dir) + free(home_dir); + if (cnf_file_path) + free(cnf_file_path); return rc; } /*< End of main */ diff --git a/server/core/load_utils.c b/server/core/load_utils.c index 9e2b36598..b7edef9e9 100644 --- a/server/core/load_utils.c +++ b/server/core/load_utils.c @@ -80,7 +80,7 @@ void * load_module(const char *module, const char *type) { char *home, *version; -char fname[MAXPATHLEN]; +char fname[MAXPATHLEN+1]; void *dlhandle, *sym; char *(*ver)(); void *(*ep)(), *modobj; @@ -94,11 +94,12 @@ MODULE_INFO *mod_info = NULL; * * Search of the shared object. 
*/ - sprintf(fname, "./lib%s.so", module); + snprintf(fname,MAXPATHLEN+1, "./lib%s.so", module); + if (access(fname, F_OK) == -1) { home = get_maxscale_home (); - sprintf(fname, "%s/modules/lib%s.so", home, module); + snprintf(fname, MAXPATHLEN+1,"%s/modules/lib%s.so", home, module); if (access(fname, F_OK) == -1) { diff --git a/server/core/maxpasswd.c b/server/core/maxpasswd.c index 8d90a7631..0c728d869 100644 --- a/server/core/maxpasswd.c +++ b/server/core/maxpasswd.c @@ -39,7 +39,7 @@ int main(int argc, char **argv) { -char *enc; + char *enc, *pw; if (argc != 2) { @@ -47,9 +47,21 @@ char *enc; exit(1); } - if ((enc = encryptPassword(argv[1])) != NULL) + pw = calloc(81,sizeof(char)); + + if(pw == NULL){ + fprintf(stderr, "Error: cannot allocate enough memory."); + exit(1); + } + + strncpy(pw,argv[1],80); + + if ((enc = encryptPassword(pw)) != NULL){ printf("%s\n", enc); - else + }else{ fprintf(stderr, "Failed to encode the password\n"); + } + + free(pw); return 0; } diff --git a/server/core/modutil.c b/server/core/modutil.c index cd596fb2b..b9a7aab1c 100644 --- a/server/core/modutil.c +++ b/server/core/modutil.c @@ -175,6 +175,51 @@ GWBUF *addition; return orig; } + +/** + * Extract the SQL from a COM_QUERY packet and return in a NULL terminated buffer. + * The buffer should be freed by the caller when it is no longer required. 
+ * + * If the packet is not a COM_QUERY packet then the function will return NULL + * + * @param buf The buffer chain + * @return Null terminated string containing query text or NULL on error + */ +char * +modutil_get_SQL(GWBUF *buf) +{ +unsigned int len, length; +unsigned char *ptr, *dptr, *rval = NULL; + + if (!modutil_is_SQL(buf)) + return rval; + ptr = GWBUF_DATA(buf); + length = *ptr++; + length += (*ptr++ << 8); + length += (*ptr++ << 16); + + if ((rval = (char *)malloc(length + 1)) == NULL) + return NULL; + dptr = rval; + ptr += 2; // Skip sequence id and COM_QUERY byte + len = GWBUF_LENGTH(buf) - 5; + while (buf && length > 0) + { + int clen = length > len ? len : length; + memcpy(dptr, ptr, clen); + dptr += clen; + length -= clen; + buf = buf->next; + if (buf) + { + ptr = GWBUF_DATA(buf); + len = GWBUF_LENGTH(buf); + } + } + *dptr = 0; + return rval; +} + /** * Copy query string from GWBUF buffer to separate memory area. * diff --git a/server/core/monitor.c b/server/core/monitor.c index 3fb1cbb6f..b2b348e5e 100644 --- a/server/core/monitor.c +++ b/server/core/monitor.c @@ -26,6 +26,8 @@ * 08/07/13 Mark Riddoch Initial implementation * 23/05/14 Massimiliano Pinto Addition of monitor_interval parameter * and monitor id + * 30/10/14 Massimiliano Pinto Addition of disable_master_failback parameter + * 07/11/14 Massimiliano Pinto Addition of monitor network timeouts * * @endverbatim */ @@ -329,3 +331,31 @@ monitorDetectStaleMaster(MONITOR *mon, int enable) mon->module->detectStaleMaster(mon->handle, enable); } } + +/** + * Disable Master Failback + * + * @param mon The monitor instance + * @param disable The value 1 disable the failback, 0 keeps it + */ +void +monitorDisableMasterFailback(MONITOR *mon, int disable) +{ + if (mon->module->disableMasterFailback != NULL) { + mon->module->disableMasterFailback(mon->handle, disable); + } +} + +/** + * Set Monitor timeouts for connect/read/write + * + * @param mon The monitor instance + * @param type The timeout 
handling type + * @param value The timeout to set + */ +void +monitorSetNetworkTimeout(MONITOR *mon, int type, int value) { + if (mon->module->setNetworkTimeout != NULL) { + mon->module->setNetworkTimeout(mon->handle, type, value); + } +} diff --git a/server/core/server.c b/server/core/server.c index 3fabd2b34..dd0e8819d 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -30,7 +30,8 @@ * 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields * 20/06/14 Massimiliano Pinto Addition of master_id, depth, slaves fields * 26/06/14 Mark Riddoch Addition of server parameters - * 30/08/14 Massimiliano Pinto Addition of new service status description + * 30/08/14 Massimiliano Pinto Addition of new service status description + * 30/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS description * * @endverbatim */ @@ -424,6 +425,8 @@ char *status = NULL; strcat(status, "Slave of External Server, "); if (server->status & SERVER_STALE_STATUS) strcat(status, "Stale Status, "); + if (server->status & SERVER_MASTER_STICKINESS) + strcat(status, "Master Stickiness, "); if (server->status & SERVER_AUTH_ERROR) strcat(status, "Auth Error, "); if (server->status & SERVER_RUNNING) diff --git a/server/core/service.c b/server/core/service.c index 52796910c..ce5261990 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -122,6 +122,13 @@ SERVICE *service; } service->name = strdup(servname); service->routerModule = strdup(router); + if (service->name == NULL || service->routerModule == NULL) + { + if (service->name) + free(service->name); + free(service); + return NULL; + } service->version_string = NULL; memset(&service->stats, 0, sizeof(SERVICE_STATS)); service->ports = NULL; @@ -196,8 +203,13 @@ GWPROTOCOL *funcs; if (port->listener == NULL) { - return 0; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to create listener for service %s.", + service->name))); + goto retblock; } + if (strcmp(port->protocol, "MySQLClient") 
== 0) { int loaded; @@ -206,12 +218,25 @@ GWPROTOCOL *funcs; * including hosts and db names */ service->users = mysql_users_alloc(); - loaded = load_mysql_users(service); - + + if ((loaded = load_mysql_users(service)) < 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to load users from %s:%d for " + "service %s.", + port->address, + port->port, + service->name))); + hashtable_free(service->users->data); + free(service->users); + dcb_free(port->listener); + port->listener = NULL; + goto retblock; + } /* At service start last update is set to USERS_REFRESH_TIME seconds earlier. * This way MaxScale could try reloading users' just after startup */ - service->rate_limit.last=time(NULL) - USERS_REFRESH_TIME; service->rate_limit.nloads=1; @@ -219,14 +244,21 @@ GWPROTOCOL *funcs; LOGFILE_MESSAGE, "Loaded %d MySQL Users for service [%s].", loaded, service->name))); - } else { + } + else + { /* Generic users table */ service->users = users_alloc(); } - if ((funcs = - (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL) + if ((funcs=(GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) + == NULL) { + if (service->users->data) + { + hashtable_free(service->users->data); + } + free(service->users); dcb_free(port->listener); port->listener = NULL; LOGIF(LE, (skygw_log_write_flush( @@ -235,34 +267,60 @@ GWPROTOCOL *funcs; "for service %s not started.", port->protocol, service->name))); - return 0; + goto retblock; } memcpy(&(port->listener->func), funcs, sizeof(GWPROTOCOL)); port->listener->session = NULL; + if (port->address) sprintf(config_bind, "%s:%d", port->address, port->port); else sprintf(config_bind, "0.0.0.0:%d", port->port); - if (port->listener->func.listen(port->listener, config_bind)) { + if (port->listener->func.listen(port->listener, config_bind)) + { port->listener->session = session_alloc(service, port->listener); - if (port->listener->session != NULL) { + if (port->listener->session != NULL) + { 
port->listener->session->state = SESSION_STATE_LISTENER; listeners += 1; - } else { + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to create session to service %s.", + service->name))); + + if (service->users->data) + { + hashtable_free(service->users->data); + } + free(service->users); dcb_close(port->listener); + port->listener = NULL; + goto retblock; } - } else { - dcb_close(port->listener); - + } + else + { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Unable to start to listen port %d for %s %s.", port->port, port->protocol, service->name))); + if (service->users->data) + { + hashtable_free(service->users->data); + } + free(service->users); + dcb_close(port->listener); + port->listener = NULL; } + +retblock: return listeners; } @@ -1003,8 +1061,8 @@ int service_refresh_users(SERVICE *service) { if ( (time(NULL) < (service->rate_limit.last + USERS_REFRESH_TIME)) || (service->rate_limit.nloads > USERS_REFRESH_MAX_PER_TIME)) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "%lu [service_refresh_users] refresh rate limit exceeded loading new users' table", - pthread_self()))); + "Refresh rate limit exceeded for load of users' table for service '%s'.", + service->name))); spinlock_release(&service->users_table_spin); return 1; diff --git a/server/core/session.c b/server/core/session.c index 645ca7019..d83881532 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -668,9 +668,10 @@ int i; { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Failed to create filter '%s' for service '%s'.\n", - service->filters[i]->name, - service->name))); + "Error : Failed to create filter '%s' for " + "service '%s'.\n", + service->filters[i]->name, + service->name))); return 0; } session->filters[i].filter = service->filters[i]; diff --git a/server/core/test/testadminusers.c b/server/core/test/testadminusers.c index eb2ee8e40..ec2917783 100644 --- a/server/core/test/testadminusers.c +++ 
b/server/core/test/testadminusers.c @@ -269,7 +269,7 @@ int result = 0; char *home, buf[1024]; /* Unlink any existing password file before running this test */ - if ((home = getenv("MAXSCALE_HOME")) == NULL) + if ((home = getenv("MAXSCALE_HOME")) == NULL || strlen(home) >= 1024) home = "/usr/local/skysql"; sprintf(buf, "%s/etc/passwd", home); if (strcmp(buf, "/etc/passwd") != 0) diff --git a/server/core/test/testbuffer.c b/server/core/test/testbuffer.c index ee8507161..73d71dc27 100644 --- a/server/core/test/testbuffer.c +++ b/server/core/test/testbuffer.c @@ -61,7 +61,9 @@ int buflen; ss_info_dassert(0 == GWBUF_EMPTY(buffer), "Buffer should not be empty"); ss_info_dassert(GWBUF_IS_TYPE_UNDEFINED(buffer), "Buffer type should be undefined"); ss_dfprintf(stderr, "\t..done\nSet a hint for the buffer"); - hint = hint_create_parameter(NULL, strdup("name"), "value"); + char* name = strdup("name"); + hint = hint_create_parameter(NULL, name, "value"); + free(name); gwbuf_add_hint(buffer, hint); ss_info_dassert(hint == buffer->hint, "Buffer should point to first and only hint"); ss_dfprintf(stderr, "\t..done\nSet a property for the buffer"); diff --git a/server/core/test/testhash.c b/server/core/test/testhash.c index 9fc2fb9cf..89720da80 100644 --- a/server/core/test/testhash.c +++ b/server/core/test/testhash.c @@ -158,6 +158,7 @@ static bool do_hashtest( hashtable_free(h); return_succp: + free(val_arr); return succp; } diff --git a/server/core/test/testhint.c b/server/core/test/testhint.c index 3ec5d9953..0f46e915b 100644 --- a/server/core/test/testhint.c +++ b/server/core/test/testhint.c @@ -46,7 +46,9 @@ HINT *hint; /* Hint tests */ ss_dfprintf(stderr, "testhint : Add a parameter hint to a null list"); - hint = hint_create_parameter(NULL, strdup("name"), "value"); + char* name = strdup("name"); + hint = hint_create_parameter(NULL, name, "value"); + free(name); ss_info_dassert(NULL != hint, "New hint list should not be null"); ss_info_dassert(0 == strcmp("value", 
hint->value), "Hint value should be correct"); ss_info_dassert(0 != hint_exists(&hint, HINT_PARAMETER), "Hint of parameter type should exist"); diff --git a/server/core/test/testserver.c b/server/core/test/testserver.c index b1065ace0..5d4d130a8 100644 --- a/server/core/test/testserver.c +++ b/server/core/test/testserver.c @@ -68,7 +68,8 @@ char *status; server_set_status(server, SERVER_MASTER); status = server_status(server); ss_info_dassert(0 == strcmp("Master, Running", status), "Should find correct status."); - server_clear_status(server, SERVER_MASTER); + server_clear_status(server, SERVER_MASTER); + free(status); status = server_status(server); ss_info_dassert(0 == strcmp("Running", status), "Status of Server should be Running after master status cleared."); if (NULL != status) free(status); @@ -78,7 +79,6 @@ char *status; ss_dfprintf(stderr, "\t..done\nFreeing Server."); ss_info_dassert(0 != server_free(server), "Free should succeed"); ss_dfprintf(stderr, "\t..done\n"); - return 0; } diff --git a/server/include/modutil.h b/server/include/modutil.h index eccdb9a9b..a0daf60a9 100644 --- a/server/include/modutil.h +++ b/server/include/modutil.h @@ -37,6 +37,7 @@ extern int modutil_is_SQL(GWBUF *); extern int modutil_extract_SQL(GWBUF *, char **, int *); extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *); +extern char *modutil_get_SQL(GWBUF *); extern GWBUF *modutil_replace_SQL(GWBUF *, char *); extern char *modutil_get_query(GWBUF* buf); extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *); diff --git a/server/include/monitor.h b/server/include/monitor.h index 6123cd614..c5a9d314d 100644 --- a/server/include/monitor.h +++ b/server/include/monitor.h @@ -33,6 +33,8 @@ * 23/05/14 Massimiliano Pinto Addition of defaultId and setInterval * 23/06/14 Massimiliano Pinto Addition of replicationHeartbeat * 28/08/14 Massimiliano Pinto Addition of detectStaleMaster + * 30/10/14 Massimiliano Pinto Addition of 
disableMasterFailback + * 07/11/14 Massimiliano Pinto Addition of setNetworkTimeout * * @endverbatim */ @@ -70,9 +72,11 @@ typedef struct { void (*defaultUser)(void *, char *, char *); void (*diagnostics)(DCB *, void *); void (*setInterval)(void *, size_t); + void (*setNetworkTimeout)(void *, int, int); void (*defaultId)(void *, unsigned long); void (*replicationHeartbeat)(void *, int); void (*detectStaleMaster)(void *, int); + void (*disableMasterFailback)(void *, int); } MONITOR_OBJECT; /** @@ -96,6 +100,16 @@ typedef enum MONITOR_STATE_FREED = 0x08 } monitor_state_t; +/** + * Monitor network timeout types + */ +typedef enum +{ + MONITOR_CONNECT_TIMEOUT = 0, + MONITOR_READ_TIMEOUT = 1, + MONITOR_WRITE_TIMEOUT = 2 +} monitor_timeouts_t; + /** * Representation of the running monitor. */ @@ -123,4 +137,6 @@ extern void monitorSetId(MONITOR *, unsigned long); extern void monitorSetInterval (MONITOR *, unsigned long); extern void monitorSetReplicationHeartbeat(MONITOR *, int); extern void monitorDetectStaleMaster(MONITOR *, int); +extern void monitorDisableMasterFailback(MONITOR *, int); +extern void monitorSetNetworkTimeout(MONITOR *, int, int); #endif diff --git a/server/include/server.h b/server/include/server.h index b8ca445f0..1e4aa25ab 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -40,6 +40,7 @@ * 26/06/14 Mark Riddoch Adidtion of server parameters * 30/07/14 Massimiliano Pinto Addition of NDB status for MySQL Cluster * 30/08/14 Massimiliano Pinto Addition of SERVER_STALE_STATUS + * 27/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS * * @endverbatim */ @@ -105,6 +106,7 @@ typedef struct server { #define SERVER_MAINT 0x0020 /**<< Server is in maintenance mode */ #define SERVER_SLAVE_OF_EXTERNAL_MASTER 0x0040 /**<< Server is slave of a Master outside the provided replication topology */ #define SERVER_STALE_STATUS 0x0080 /**<< Server stale status, monitor didn't update it */ +#define SERVER_MASTER_STICKINESS 0x0100 /**<< 
Server Master stickiness */ #define SERVER_AUTH_ERROR 0x1000 /**<< Authentication erorr from monitor */ /** diff --git a/server/modules/filter/qlafilter.c b/server/modules/filter/qlafilter.c index 43d07b613..0f669b060 100644 --- a/server/modules/filter/qlafilter.c +++ b/server/modules/filter/qlafilter.c @@ -202,7 +202,7 @@ int i; free(my_instance->filebase); my_instance->filebase = NULL; } - my_instance->source = strdup(params[i]->value); + my_instance->filebase = strdup(params[i]->value); } else if (!filter_standard_parameter(params[i]->name)) { @@ -408,7 +408,7 @@ struct timeval tv; { queue = gwbuf_make_contiguous(queue); } - if (modutil_extract_SQL(queue, &ptr, &length) != 0) + if ((ptr = modutil_get_SQL(queue)) != NULL) { if ((my_instance->match == NULL || regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && @@ -424,6 +424,7 @@ struct timeval tv; fwrite(ptr, sizeof(char), length, my_session->fp); fwrite("\n", sizeof(char), 1, my_session->fp); } + free(ptr); } } /* Pass the query downstream */ diff --git a/server/modules/filter/regexfilter.c b/server/modules/filter/regexfilter.c index 898aae101..4ca20563e 100644 --- a/server/modules/filter/regexfilter.c +++ b/server/modules/filter/regexfilter.c @@ -60,7 +60,7 @@ static void setDownstream(FILTER *instance, void *fsession, DOWNSTREAM *downstre static int routeQuery(FILTER *instance, void *fsession, GWBUF *queue); static void diagnostic(FILTER *instance, void *fsession, DCB *dcb); -static char *regex_replace(char *sql, int length, regex_t *re, char *replace); +static char *regex_replace(char *sql, regex_t *re, char *replace); static FILTER_OBJECT MyObject = { createInstance, @@ -302,7 +302,6 @@ routeQuery(FILTER *instance, void *session, GWBUF *queue) REGEX_INSTANCE *my_instance = (REGEX_INSTANCE *)instance; REGEX_SESSION *my_session = (REGEX_SESSION *)session; char *sql, *newsql; -int length; if (modutil_is_SQL(queue)) { @@ -310,18 +309,21 @@ int length; { queue = gwbuf_make_contiguous(queue); } - 
modutil_extract_SQL(queue, &sql, &length); - newsql = regex_replace(sql, length, &my_instance->re, - my_instance->replace); - if (newsql) + if ((sql = modutil_get_SQL(queue)) != NULL) { - queue = modutil_replace_SQL(queue, newsql); - queue = gwbuf_make_contiguous(queue); - free(newsql); - my_session->replacements++; + newsql = regex_replace(sql, &my_instance->re, + my_instance->replace); + if (newsql) + { + queue = modutil_replace_SQL(queue, newsql); + queue = gwbuf_make_contiguous(queue); + free(newsql); + my_session->replacements++; + } + else + my_session->no_change++; + free(sql); } - else - my_session->no_change++; } return my_session->down.routeQuery(my_session->down.instance, @@ -368,26 +370,23 @@ REGEX_SESSION *my_session = (REGEX_SESSION *)fsession; * Perform a regular expression match and subsititution on the SQL * * @param sql The original SQL text - * @param length The length of the SQL text * @param re The compiled regular expression * @param replace The replacement text * @return The replaced text or NULL if no replacement was done. 
*/ static char * -regex_replace(char *sql, int length, regex_t *re, char *replace) +regex_replace(char *sql, regex_t *re, char *replace) { char *orig, *result, *ptr; int i, res_size, res_length, rep_length; -int last_match; +int last_match, length; regmatch_t match[10]; - orig = strndup(sql, length); - if (regexec(re, orig, 10, match, 0)) + if (regexec(re, sql, 10, match, 0)) { - free(orig); return NULL; } - free(orig); + length = strlen(sql); res_size = 2 * length; result = (char *)malloc(res_size); diff --git a/server/modules/filter/tee.c b/server/modules/filter/tee.c index 60be14413..94dc29ea0 100644 --- a/server/modules/filter/tee.c +++ b/server/modules/filter/tee.c @@ -403,16 +403,19 @@ GWBUF *clone = NULL; if (my_session->residual < 0) my_session->residual = 0; } - else if (my_session->active && - modutil_MySQL_Query(queue, &ptr, &length, &residual)) + else if (my_session->active && (ptr = modutil_get_SQL(queue) != NULL)) { if ((my_instance->match == NULL || regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && (my_instance->nomatch == NULL || regexec(&my_instance->nore,ptr,0,NULL, 0) != 0)) { + char *dummy; + + modutil_MySQL_Query(queue, &dummy, &length, &residual); clone = gwbuf_clone(queue); my_session->residual = residual; + free(ptr); } } diff --git a/server/modules/filter/test/harness_common.c b/server/modules/filter/test/harness_common.c index a247bc54a..3c42bfdde 100644 --- a/server/modules/filter/test/harness_common.c +++ b/server/modules/filter/test/harness_common.c @@ -137,8 +137,8 @@ FILTER_PARAMETER** read_params(int* paramc) do_read = 0; } } - FILTER_PARAMETER** params; - if((params = malloc(sizeof(FILTER_PARAMETER*)*(pc+1)))!=NULL){ + FILTER_PARAMETER** params = NULL; + if((params = malloc(sizeof(FILTER_PARAMETER*)*(pc+1))) != NULL){ for(i = 0;iactive) { @@ -461,7 +460,7 @@ int length; { queue = gwbuf_make_contiguous(queue); } - if (modutil_extract_SQL(queue, &ptr, &length) != 0) + if ((ptr = modutil_get_SQL(queue)) != NULL) { if 
((my_instance->match == NULL || regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) && @@ -472,7 +471,11 @@ int length; if (my_session->current) free(my_session->current); gettimeofday(&my_session->start, NULL); - my_session->current = strndup(ptr, length); + my_session->current = ptr; + } + else + { + free(ptr); } } } diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index ce2900e4a..e63bf9bfa 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -30,6 +30,7 @@ * Interval is printed in diagnostics. * 03/06/14 Mark Riddoch Add support for maintenance mode * 24/06/14 Massimiliano Pinto Added depth level 0 for each node + * 30/10/14 Massimiliano Pinto Added disableMasterFailback feature * * @endverbatim */ @@ -52,7 +53,7 @@ extern int lm_enabled_logfiles_bitmask; static void monitorMain(void *); -static char *version_str = "V1.2.0"; +static char *version_str = "V1.3.0"; MODULE_INFO info = { MODULE_API_MONITOR, @@ -68,6 +69,9 @@ static void unregisterServer(void *, SERVER *); static void defaultUsers(void *, char *, char *); static void diagnostics(DCB *, void *); static void setInterval(void *, size_t); +static MONITOR_SERVERS *get_candidate_master(MONITOR_SERVERS *); +static MONITOR_SERVERS *set_cluster_master(MONITOR_SERVERS *, MONITOR_SERVERS *, int); +static void disableMasterFailback(void *, int); static MONITOR_OBJECT MyObject = { startMonitor, @@ -76,10 +80,12 @@ static MONITOR_OBJECT MyObject = { unregisterServer, defaultUsers, diagnostics, - setInterval, + setInterval, + NULL, NULL, NULL, NULL, + disableMasterFailback }; /** @@ -147,6 +153,8 @@ MYSQL_MONITOR *handle; handle->defaultPasswd = NULL; handle->id = MONITOR_DEFAULT_ID; handle->interval = MONITOR_INTERVAL; + handle->disableMasterFailback = 0; + handle->master = NULL; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); @@ -264,6 +272,7 @@ char *sep; } dcb_printf(dcb,"\tSampling 
interval:\t%lu milliseconds\n", handle->interval); + dcb_printf(dcb,"\tMaster Failback:\t%s\n", (handle->disableMasterFailback == 1) ? "off" : "on"); dcb_printf(dcb, "\tMonitored servers: "); db = handle->databases; @@ -331,8 +340,10 @@ char *server_string; char *dpwd = decryptPassword(passwd); int rc; int read_timeout = 1; + int connect_timeout = 2; database->con = mysql_init(NULL); + rc = mysql_options(database->con, MYSQL_OPT_CONNECT_TIMEOUT, (void *)&connect_timeout); rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); if (mysql_real_connect(database->con, database->server->name, @@ -345,7 +356,15 @@ char *server_string; database->server->name, database->server->port, mysql_error(database->con)))); + server_clear_status(database->server, SERVER_RUNNING); + + /* Also clear Joined, M/S and Stickiness bits */ + server_clear_status(database->server, SERVER_JOINED); + server_clear_status(database->server, SERVER_SLAVE); + server_clear_status(database->server, SERVER_MASTER); + server_clear_status(database->server, SERVER_MASTER_STICKINESS); + if (mysql_errno(database->con) == ER_ACCESS_DENIED_ERROR) { server_set_status(database->server, SERVER_AUTH_ERROR); @@ -421,10 +440,11 @@ char *server_string; static void monitorMain(void *arg) { -MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; -MONITOR_SERVERS *ptr; -long master_id; -size_t nrounds = 0; +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +MONITOR_SERVERS *ptr; +size_t nrounds = 0; +MONITOR_SERVERS *candidate_master = NULL; +int master_stickiness = handle->disableMasterFailback; if (mysql_thread_init()) { @@ -453,39 +473,35 @@ size_t nrounds = 0; * interval, then skip monitoring checks. Excluding the first * round. 
*/ - if (nrounds != 0 && - ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval) >= - MON_BASE_INTERVAL_MS) + + if (nrounds != 0 && ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval) >= MON_BASE_INTERVAL_MS) { nrounds += 1; continue; } + nrounds += 1; - master_id = -1; ptr = handle->databases; while (ptr) { unsigned int prev_status = ptr->server->status; + monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd); - /* set master_id to the lowest value of ptr->server->node_id */ + /* clear bits for non member nodes */ + if ( ! SERVER_IN_MAINT(ptr->server) && (ptr->server->node_id < 0 || ! SERVER_IS_JOINED(ptr->server))) { + ptr->server->depth = -1; - if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) { - ptr->server->depth = 0; - if (ptr->server->node_id < master_id && master_id >= 0) { - master_id = ptr->server->node_id; - } else { - if (master_id < 0) { - master_id = ptr->server->node_id; - } - } - } else if (!SERVER_IN_MAINT(ptr->server)) { /* clear M/S status */ server_clear_status(ptr->server, SERVER_SLAVE); server_clear_status(ptr->server, SERVER_MASTER); - ptr->server->depth = -1; + + /* clear master sticky status */ + server_clear_status(ptr->server, SERVER_MASTER_STICKINESS); } + + /* Log server status change */ if (ptr->server->status != prev_status || SERVER_IS_DOWN(ptr->server)) { @@ -496,24 +512,50 @@ size_t nrounds = 0; ptr->server->port, STRSRVSTATUS(ptr->server)))); } + ptr = ptr->next; } + /* + * Let's select a master server: + * it could be the candidate master following MIN(node_id) rule or + * the server that was master in the previous monitor polling cycle + * Decision depends on master_stickiness value set in configuration + */ + + /* get the candidate master, followinf MIN(node_id) rule */ + candidate_master = get_candidate_master(handle->databases); + + /* Select the master, based on master_stickiness */ + handle->master = set_cluster_master(handle->master, candidate_master, 
master_stickiness); + ptr = handle->databases; - /* this server loop sets Master and Slave roles */ - while (ptr) - { - if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && master_id >= 0) { - /* set the Master role */ - if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id == master_id)) { - server_set_status(ptr->server, SERVER_MASTER); - server_clear_status(ptr->server, SERVER_SLAVE); - } else if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id > master_id)) { + while (ptr && handle->master) { + if (!SERVER_IS_JOINED(ptr->server) || SERVER_IN_MAINT(ptr->server)) { + ptr = ptr->next; + continue; + } + + if (ptr != handle->master) { /* set the Slave role */ - server_set_status(ptr->server, SERVER_SLAVE); - server_clear_status(ptr->server, SERVER_MASTER); - } + server_set_status(ptr->server, SERVER_SLAVE); + server_clear_status(ptr->server, SERVER_MASTER); + + /* clear master stickyness */ + server_clear_status(ptr->server, SERVER_MASTER_STICKINESS); + } else { + /* set the Master role */ + server_set_status(handle->master->server, SERVER_MASTER); + server_clear_status(handle->master->server, SERVER_SLAVE); + + if (candidate_master && handle->master->server->node_id != candidate_master->server->node_id) { + /* set master stickyness */ + server_set_status(handle->master->server, SERVER_MASTER_STICKINESS); + } else { + /* clear master stickyness */ + server_clear_status(ptr->server, SERVER_MASTER_STICKINESS); + } } ptr = ptr->next; @@ -533,3 +575,90 @@ setInterval(void *arg, size_t interval) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->interval, &interval, sizeof(unsigned long)); } + +/** + * get candidate master from all nodes + * + * The current available rule: get the server with MIN(node_id) + * node_id comes from 'wsrep_local_index' variable + * + * @param servers The monitored servers list + * @return The candidate master on success, NULL on failure + */ +static MONITOR_SERVERS *get_candidate_master(MONITOR_SERVERS 
*servers) { + MONITOR_SERVERS *ptr = servers; + MONITOR_SERVERS *candidate_master = NULL; + long min_id = -1; + + /* set min_id to the lowest value of ptr->server->node_id */ + while(ptr) { + if ((! SERVER_IN_MAINT(ptr->server)) && ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) { + ptr->server->depth = 0; + if ((ptr->server->node_id < min_id) && min_id >= 0) { + min_id = ptr->server->node_id; + candidate_master = ptr; + } else { + if (min_id < 0) { + min_id = ptr->server->node_id; + candidate_master = ptr; + } + } + } + + ptr = ptr->next; + } + + return candidate_master; +} + +/** + * set the master server in the cluster + * + * master could be the last one from previous monitor cycle Iis running) or + * the candidate master. + * The selection is based on the configuration option mapped to master_stickiness + * The candidate master may change over time due to + * 'wsrep_local_index' value change in the Galera Cluster + * Enabling master_stickiness will avoid master change unless a failure is spotted + * + * @param current_master Previous master server + * @param candidate_master The candidate master server accordingly to the selection rule + * @return The master node pointer (could be NULL) + */ +static MONITOR_SERVERS *set_cluster_master(MONITOR_SERVERS *current_master, MONITOR_SERVERS *candidate_master, int master_stickiness) { + /* + * if current master is not set or master_stickiness is not enable + * just return candidate_master. + */ + if (current_master == NULL || master_stickiness == 0) { + return candidate_master; + } else { + /* + * if current_master is still a cluster member use it + * + */ + if (SERVER_IS_JOINED(current_master->server) && (! SERVER_IN_MAINT(current_master->server))) { + return current_master; + } else + return candidate_master; + } +} + +/** + * Disable/Enable the Master failback in a Galera Cluster. 
+ * + * A restarted / rejoined node may get back the previous 'wsrep_local_index' + * from Cluster: if the value is the lowest in the cluster it will be selected as Master + * This will cause a Master change even if there is no failure. + * The option if set to 1 will avoid this situation, keeping the current Master (if running) available + * + * @param arg The handle allocated by startMonitor + * @param disable To disable it use 1, 0 keeps failback + */ +static void +disableMasterFailback(void *arg, int disable) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->disableMasterFailback, &disable, sizeof(int)); +} + diff --git a/server/modules/monitor/mm_mon.c b/server/modules/monitor/mm_mon.c new file mode 100644 index 000000000..b4f62e8b0 --- /dev/null +++ b/server/modules/monitor/mm_mon.c @@ -0,0 +1,824 @@ +/* + * This file is distributed as part of the MariaDB Corporation MaxScale. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 
+ * + * Copyright MariaDB Corporation Ab 2013-2014 + */ + +/** + * @file mysql_mon.c - A MySQL Multi Muster cluster monitor + * + * @verbatim + * Revision History + * + * Date Who Description + * 08/09/14 Massimiliano Pinto Initial implementation + * + * @endverbatim + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +extern int lm_enabled_logfiles_bitmask; + +static void monitorMain(void *); + +static char *version_str = "V1.0.1"; + +MODULE_INFO info = { + MODULE_API_MONITOR, + MODULE_BETA_RELEASE, + MONITOR_VERSION, + "A MySQL Multi Master monitor" +}; + +static void *startMonitor(void *); +static void stopMonitor(void *); +static void registerServer(void *, SERVER *); +static void unregisterServer(void *, SERVER *); +static void defaultUser(void *, char *, char *); +static void diagnostics(DCB *, void *); +static void setInterval(void *, size_t); +static void detectStaleMaster(void *, int); +static bool mon_status_changed(MONITOR_SERVERS* mon_srv); +static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); +static MONITOR_SERVERS *get_current_master(MYSQL_MONITOR *); +static void monitor_set_pending_status(MONITOR_SERVERS *, int); +static void monitor_clear_pending_status(MONITOR_SERVERS *, int); + +static MONITOR_OBJECT MyObject = { + startMonitor, + stopMonitor, + registerServer, + unregisterServer, + defaultUser, + diagnostics, + setInterval, + NULL, + NULL, + NULL, + detectStaleMaster, + NULL +}; + +/** + * Implementation of the mandatory version entry point + * + * @return version string of the module + */ +char * +version() +{ + return version_str; +} + +/** + * The module initialisation routine, called when the module + * is first loaded. + */ +void +ModuleInit() +{ + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "Initialise the MySQL Monitor module %s.", + version_str))); +} + +/** + * The module entry point routine. 
It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +MONITOR_OBJECT * +GetModuleObject() +{ + return &MyObject; +} + +/** + * Start the instance of the monitor, returning a handle on the monitor. + * + * This function creates a thread to execute the actual monitoring. + * + * @param arg The current handle - NULL if first start + * @return A handle to use when interacting with the monitor + */ +static void * +startMonitor(void *arg) +{ +MYSQL_MONITOR *handle; + + if (arg) + { + handle = arg; /* Must be a restart */ + handle->shutdown = 0; + } + else + { + if ((handle = (MYSQL_MONITOR *)malloc(sizeof(MYSQL_MONITOR))) == NULL) + return NULL; + handle->databases = NULL; + handle->shutdown = 0; + handle->defaultUser = NULL; + handle->defaultPasswd = NULL; + handle->id = MONITOR_DEFAULT_ID; + handle->interval = MONITOR_INTERVAL; + handle->replicationHeartbeat = 0; + handle->detectStaleMaster = 0; + handle->master = NULL; + spinlock_init(&handle->lock); + } + handle->tid = (THREAD)thread_start(monitorMain, handle); + return handle; +} + +/** + * Stop a running monitor + * + * @param arg Handle on thr running monior + */ +static void +stopMonitor(void *arg) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + + handle->shutdown = 1; + thread_wait((void *)handle->tid); +} + +/** + * Register a server that must be added to the monitored servers for + * a monitoring module. 
+ * + * @param arg A handle on the running monitor module + * @param server The server to add + */ +static void +registerServer(void *arg, SERVER *server) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +MONITOR_SERVERS *ptr, *db; + + if ((db = (MONITOR_SERVERS *)malloc(sizeof(MONITOR_SERVERS))) == NULL) + return; + db->server = server; + db->con = NULL; + db->next = NULL; + db->mon_err_count = 0; + db->mon_prev_status = 0; + /* pending status is updated by monitorMain */ + db->pending_status = 0; + + spinlock_acquire(&handle->lock); + + if (handle->databases == NULL) + handle->databases = db; + else + { + ptr = handle->databases; + while (ptr->next != NULL) + ptr = ptr->next; + ptr->next = db; + } + spinlock_release(&handle->lock); +} + +/** + * Remove a server from those being monitored by a monitoring module + * + * @param arg A handle on the running monitor module + * @param server The server to remove + */ +static void +unregisterServer(void *arg, SERVER *server) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +MONITOR_SERVERS *ptr, *lptr; + + spinlock_acquire(&handle->lock); + if (handle->databases == NULL) + { + spinlock_release(&handle->lock); + return; + } + if (handle->databases->server == server) + { + ptr = handle->databases; + handle->databases = handle->databases->next; + free(ptr); + } + else + { + ptr = handle->databases; + while (ptr->next != NULL && ptr->next->server != server) + ptr = ptr->next; + if (ptr->next) + { + lptr = ptr->next; + ptr->next = ptr->next->next; + free(lptr); + } + } + spinlock_release(&handle->lock); +} + +/** + * Set the default username and password to use to monitor if the server does not + * override this. 
+ * + * @param arg The handle allocated by startMonitor + * @param uname The default user name + * @param passwd The default password + */ +static void +defaultUser(void *arg, char *uname, char *passwd) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + + if (handle->defaultUser) + free(handle->defaultUser); + if (handle->defaultPasswd) + free(handle->defaultPasswd); + handle->defaultUser = strdup(uname); + handle->defaultPasswd = strdup(passwd); +} + +/** + * Daignostic interface + * + * @param dcb DCB to print diagnostics + * @param arg The monitor handle + */ +static void diagnostics(DCB *dcb, void *arg) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +MONITOR_SERVERS *db; +char *sep; + + switch (handle->status) + { + case MONITOR_RUNNING: + dcb_printf(dcb, "\tMonitor running\n"); + break; + case MONITOR_STOPPING: + dcb_printf(dcb, "\tMonitor stopping\n"); + break; + case MONITOR_STOPPED: + dcb_printf(dcb, "\tMonitor stopped\n"); + break; + } + + dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval); + dcb_printf(dcb,"\tDetect Stale Master:\t%s\n", (handle->detectStaleMaster == 1) ? 
"enabled" : "disabled"); + dcb_printf(dcb, "\tMonitored servers: "); + + db = handle->databases; + sep = ""; + while (db) + { + dcb_printf(dcb, + "%s%s:%d", + sep, + db->server->name, + db->server->port); + sep = ", "; + db = db->next; + } + dcb_printf(dcb, "\n"); +} + +/** + * Monitor an individual server + * + * @param handle The MySQL Monitor object + * @param database The database to probe + */ +static void +monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) +{ +MYSQL_ROW row; +MYSQL_RES *result; +int num_fields; +int isslave = 0; +int ismaster = 0; +char *uname = handle->defaultUser; +char *passwd = handle->defaultPasswd; +unsigned long int server_version = 0; +char *server_string; + + if (database->server->monuser != NULL) + { + uname = database->server->monuser; + passwd = database->server->monpw; + } + + if (uname == NULL) + return; + + /* Don't probe servers in maintenance mode */ + if (SERVER_IN_MAINT(database->server)) + return; + + /** Store prevous status */ + database->mon_prev_status = database->server->status; + + if (database->con == NULL || mysql_ping(database->con) != 0) + { + char *dpwd = decryptPassword(passwd); + int rc; + int read_timeout = 1; + + database->con = mysql_init(NULL); + + rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); + + if (mysql_real_connect(database->con, + database->server->name, + uname, + dpwd, + NULL, + database->server->port, + NULL, + 0) == NULL) + { + free(dpwd); + + if (mon_print_fail_status(database)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Monitor was unable to connect to " + "server %s:%d : \"%s\"", + database->server->name, + database->server->port, + mysql_error(database->con)))); + } + + /* The current server is not running + * + * Store server NOT running in server and monitor server pending struct + * + */ + if (mysql_errno(database->con) == ER_ACCESS_DENIED_ERROR) + { + server_set_status(database->server, SERVER_AUTH_ERROR); + 
monitor_set_pending_status(database, SERVER_AUTH_ERROR); + } + server_clear_status(database->server, SERVER_RUNNING); + monitor_clear_pending_status(database, SERVER_RUNNING); + + /* Also clear M/S state in both server and monitor server pending struct */ + server_clear_status(database->server, SERVER_SLAVE); + server_clear_status(database->server, SERVER_MASTER); + monitor_clear_pending_status(database, SERVER_SLAVE); + monitor_clear_pending_status(database, SERVER_MASTER); + + /* Clean addition status too */ + server_clear_status(database->server, SERVER_STALE_STATUS); + monitor_clear_pending_status(database, SERVER_STALE_STATUS); + + return; + } else { + server_clear_status(database->server, SERVER_AUTH_ERROR); + monitor_clear_pending_status(database, SERVER_AUTH_ERROR); + } + free(dpwd); + } + /* Store current status in both server and monitor server pending struct */ + server_set_status(database->server, SERVER_RUNNING); + monitor_set_pending_status(database, SERVER_RUNNING); + + /* get server version from current server */ + server_version = mysql_get_server_version(database->con); + + /* get server version string */ + server_string = (char *)mysql_get_server_info(database->con); + if (server_string) { + database->server->server_string = realloc(database->server->server_string, strlen(server_string)+1); + if (database->server->server_string) + strcpy(database->server->server_string, server_string); + } + + /* get server_id form current node */ + if (mysql_query(database->con, "SELECT @@server_id") == 0 + && (result = mysql_store_result(database->con)) != NULL) + { + long server_id = -1; + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + server_id = strtol(row[0], NULL, 10); + if ((errno == ERANGE && (server_id == LONG_MAX + || server_id == LONG_MIN)) || (errno != 0 && server_id == 0)) + { + server_id = -1; + } + database->server->node_id = server_id; + } + mysql_free_result(result); + } + + /* Check if the 
Slave_SQL_Running and Slave_IO_Running status is + * set to Yes + */ + + /* Check first for MariaDB 10.x.x and get status for multimaster replication */ + if (server_version >= 100000) { + + if (mysql_query(database->con, "SHOW ALL SLAVES STATUS") == 0 + && (result = mysql_store_result(database->con)) != NULL) + { + int i = 0; + long master_id = -1; + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + /* get Slave_IO_Running and Slave_SQL_Running values*/ + if (strncmp(row[12], "Yes", 3) == 0 + && strncmp(row[13], "Yes", 3) == 0) { + isslave += 1; + } + + /* If Slave_IO_Running = Yes, assign the master_id to current server: this allows building + * the replication tree, slaves ids will be added to master(s) and we will have at least the + * root master server. + * Please note, there could be no slaves at all if Slave_SQL_Running == 'No' + */ + if (strncmp(row[12], "Yes", 3) == 0) { + /* get Master_Server_Id values */ + master_id = atol(row[41]); + if (master_id == 0) + master_id = -1; + } + + i++; + } + /* store master_id of current node */ + memcpy(&database->server->master_id, &master_id, sizeof(long)); + + mysql_free_result(result); + + /* If all configured slaves are running set this node as slave */ + if (isslave > 0 && isslave == i) + isslave = 1; + else + isslave = 0; + } + } else { + if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0 + && (result = mysql_store_result(database->con)) != NULL) + { + long master_id = -1; + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + /* get Slave_IO_Running and Slave_SQL_Running values*/ + if (strncmp(row[10], "Yes", 3) == 0 + && strncmp(row[11], "Yes", 3) == 0) { + isslave = 1; + } + + /* If Slave_IO_Running = Yes, assign the master_id to current server: this allows building + * the replication tree, slaves ids will be added to master(s) and we will have at least the + * root master server. 
+ * Please note, there could be no slaves at all if Slave_SQL_Running == 'No' + */ + if (strncmp(row[10], "Yes", 3) == 0) { + /* get Master_Server_Id values */ + master_id = atol(row[39]); + if (master_id == 0) + master_id = -1; + } + } + /* store master_id of current node */ + memcpy(&database->server->master_id, &master_id, sizeof(long)); + + mysql_free_result(result); + } + } + + /* get variable 'read_only' set by an external component */ + if (mysql_query(database->con, "SHOW GLOBAL VARIABLES LIKE 'read_only'") == 0 + && (result = mysql_store_result(database->con)) != NULL) + { + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + if (strncasecmp(row[1], "OFF", 3) == 0) { + ismaster = 1; + } + } + mysql_free_result(result); + } + + /* Remove addition info */ + monitor_clear_pending_status(database, SERVER_STALE_STATUS); + + /* Set the Slave Role */ + if (isslave) + { + monitor_set_pending_status(database, SERVER_SLAVE); + /* Avoid any possible stale Master state */ + monitor_clear_pending_status(database, SERVER_MASTER); + + /* Set replication depth to 1 */ + database->server->depth = 1; + } else { + /* Avoid any possible Master/Slave stale state */ + monitor_clear_pending_status(database, SERVER_SLAVE); + monitor_clear_pending_status(database, SERVER_MASTER); + } + + /* Set the Master role */ + if (isslave && ismaster) + { + monitor_clear_pending_status(database, SERVER_SLAVE); + monitor_set_pending_status(database, SERVER_MASTER); + + /* Set replication depth to 0 */ + database->server->depth = 0; + } + +} + +/** + * The entry point for the monitoring module thread + * + * @param arg The handle of the monitor + */ +static void +monitorMain(void *arg) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +MONITOR_SERVERS *ptr; +int detect_stale_master = handle->detectStaleMaster; +MONITOR_SERVERS *root_master; +size_t nrounds = 0; + + if (mysql_thread_init()) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Fatal : 
mysql_thread_init failed in monitor " + "module. Exiting.\n"))); + return; + } + + handle->status = MONITOR_RUNNING; + while (1) + { + if (handle->shutdown) + { + handle->status = MONITOR_STOPPING; + mysql_thread_end(); + handle->status = MONITOR_STOPPED; + return; + } + + /** Wait base interval */ + thread_millisleep(MON_BASE_INTERVAL_MS); + /** + * Calculate how far away the monitor interval is from its full + * cycle and if monitor interval time further than the base + * interval, then skip monitoring checks. Excluding the first + * round. + */ + if (nrounds != 0 && + ((nrounds*MON_BASE_INTERVAL_MS)%handle->interval) >= + MON_BASE_INTERVAL_MS) + { + nrounds += 1; + continue; + } + nrounds += 1; + + /* start from the first server in the list */ + ptr = handle->databases; + + while (ptr) + { + /* copy server status into monitor pending_status */ + ptr->pending_status = ptr->server->status; + + /* monitor current node */ + monitorDatabase(handle, ptr); + + if (mon_status_changed(ptr)) + { + dcb_call_foreach(DCB_REASON_NOT_RESPONDING); + } + + if (mon_status_changed(ptr) || + mon_print_fail_status(ptr)) + { + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, + "Backend server %s:%d state : %s", + ptr->server->name, + ptr->server->port, + STRSRVSTATUS(ptr->server)))); + } + if (SERVER_IS_DOWN(ptr->server)) + { + /** Increase this server'e error count */ + ptr->mon_err_count += 1; + } + else + { + /** Reset this server's error count */ + ptr->mon_err_count = 0; + } + + ptr = ptr->next; + } + + /* Get Master server pointer */ + root_master = get_current_master(handle); + + /* Update server status from monitor pending status on that server*/ + + ptr = handle->databases; + while (ptr) + { + if (! 
SERVER_IN_MAINT(ptr->server)) { + /* If "detect_stale_master" option is On, let's use the previus master */ + if (detect_stale_master && root_master && (!strcmp(ptr->server->name, root_master->server->name) && ptr->server->port == root_master->server->port) && (ptr->server->status & SERVER_MASTER) && !(ptr->pending_status & SERVER_MASTER)) { + /* in this case server->status will not be updated from pending_status */ + LOGIF(LM, (skygw_log_write_flush( + LOGFILE_MESSAGE, "[mysql_mon]: root server [%s:%i] is no longer Master, let's use it again even if it could be a stale master, you have been warned!", ptr->server->name, ptr->server->port))); + /* Set the STALE bit for this server in server struct */ + server_set_status(ptr->server, SERVER_STALE_STATUS); + } else { + ptr->server->status = ptr->pending_status; + } + } + ptr = ptr->next; + } + } +} + +/** + * Set the monitor sampling interval. + * + * @param arg The handle allocated by startMonitor + * @param interval The interval to set in monitor struct, in milliseconds + */ +static void +setInterval(void *arg, size_t interval) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->interval, &interval, sizeof(unsigned long)); +} + +/** + * Enable/Disable the MySQL Replication Stale Master dectection, allowing a previouvsly detected master to still act as a Master. + * This option must be enabled in order to keep the Master when the replication is stopped or removed from slaves. + * If the replication is still stopped when MaxSclale is restarted no Master will be available. 
+ * + * @param arg The handle allocated by startMonitor + * @param enable To enable it 1, disable it with 0 + */ +static void +detectStaleMaster(void *arg, int enable) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; + memcpy(&handle->detectStaleMaster, &enable, sizeof(int)); +} + +static bool mon_status_changed( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + + if (mon_srv->mon_prev_status != mon_srv->server->status) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} + +static bool mon_print_fail_status( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + int errcount = mon_srv->mon_err_count; + uint8_t modval; + + modval = 1<<(MIN(errcount/10, 7)); + + if (SERVER_IS_DOWN(mon_srv->server) && errcount%modval == 0) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} + +/** + * Set a pending status bit in the monior server + * + * @param server The server to update + * @param bit The bit to clear for the server + */ +static void +monitor_set_pending_status(MONITOR_SERVERS *ptr, int bit) +{ + ptr->pending_status |= bit; +} + +/** + * Clear a pending status bit in the monior server + * + * @param server The server to update + * @param bit The bit to clear for the server + */ +static void +monitor_clear_pending_status(MONITOR_SERVERS *ptr, int bit) +{ + ptr->pending_status &= ~bit; +} + +/******* + * This function returns the master server + * from a set of MySQL Multi Master monitored servers + * and returns the root server (that has SERVER_MASTER bit) + * The server is returned even for servers in 'maintenance' mode. 
+ * + * @param handle The monitor handle + * @return The server at root level with SERVER_MASTER bit + */ + +static MONITOR_SERVERS *get_current_master(MYSQL_MONITOR *handle) { +MONITOR_SERVERS *ptr; + + ptr = handle->databases; + + while (ptr) + { + /* The server could be in SERVER_IN_MAINT + * that means SERVER_IS_RUNNING returns 0 + * Let's check only for SERVER_IS_DOWN: server is not running + */ + if (SERVER_IS_DOWN(ptr->server)) { + ptr = ptr->next; + continue; + } + + if (ptr->server->depth == 0) { + handle->master = ptr; + } + + ptr = ptr->next; + } + + + /* + * Return the root master + */ + + if (handle->master != NULL) { + /* If the root master is in MAINT, return NULL */ + if (SERVER_IN_MAINT(handle->master->server)) { + return NULL; + } else { + return handle->master; + } + } else { + return NULL; + } +} + diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 5050f3e85..a89d46d84 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -84,6 +84,7 @@ static void setInterval(void *, size_t); static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); static void detectStaleMaster(void *, int); +static void setNetworkTimeout(void *, int, int); static bool mon_status_changed(MONITOR_SERVERS* mon_srv); static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); static MONITOR_SERVERS *getServerByNodeId(MONITOR_SERVERS *, long); @@ -103,9 +104,11 @@ static MONITOR_OBJECT MyObject = { defaultUser, diagnostics, setInterval, + NULL, defaultId, replicationHeartbeat, - detectStaleMaster + detectStaleMaster, + NULL }; /** @@ -1203,3 +1206,15 @@ monitor_clear_pending_status(MONITOR_SERVERS *ptr, int bit) { ptr->pending_status &= ~bit; } + +/** + * Set the default id to use in the monitor. 
+ * + * @param arg The handle allocated by startMonitor + * @param id The id to set in monitor struct + */ +static void +setNetworkTimeout(void *arg, int type, int value) +{ +MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; +} diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index 018d695ff..54658325a 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -65,6 +65,7 @@ typedef struct { unsigned long id; /**< Monitor ID */ int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */ int detectStaleMaster; /**< Monitor flag for MySQL replication Stale Master detection */ + int disableMasterFailback; /**< Monitor flag for Galera Cluster Master failback */ MONITOR_SERVERS *master; /**< Master server for MySQL Master/Slave replication */ MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */ } MYSQL_MONITOR; diff --git a/server/modules/monitor/ndbcluster_mon.c b/server/modules/monitor/ndbcluster_mon.c index 660179ef1..d273fe32f 100644 --- a/server/modules/monitor/ndbcluster_mon.c +++ b/server/modules/monitor/ndbcluster_mon.c @@ -73,6 +73,8 @@ static MONITOR_OBJECT MyObject = { setInterval, NULL, NULL, + NULL, + NULL, NULL }; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 868647cfb..b61b20c48 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -394,7 +394,8 @@ static int gw_read_backend_event(DCB *dcb) { session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); } - dcb_close(dcb); + ss_dassert(dcb->dcb_errhandle_called); + dcb_close(dcb); } rc = 1; goto return_rc; @@ -453,12 +454,13 @@ static int gw_read_backend_event(DCB *dcb) { 0, "Read from backend failed"); - router->handleError(router_instance, - session->router_session, - errbuf, - dcb, - ERRACT_NEW_CONNECTION, - &succp); + router->handleError( + router_instance, + session->router_session, + 
errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); gwbuf_free(errbuf); if (!succp) @@ -467,6 +469,7 @@ static int gw_read_backend_event(DCB *dcb) { session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); } + ss_dassert(dcb->dcb_errhandle_called); dcb_close(dcb); rc = 0; goto return_rc; @@ -857,6 +860,7 @@ static int gw_error_backend_event(DCB *dcb) session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); } + ss_dassert(dcb->dcb_errhandle_called); dcb_close(dcb); retblock: @@ -1046,6 +1050,7 @@ gw_backend_hangup(DCB *dcb) session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); } + ss_dassert(dcb->dcb_errhandle_called); dcb_close(dcb); retblock: @@ -1170,7 +1175,7 @@ static int backend_write_delayqueue(DCB *dcb) 0, "Failed to write buffered data to back-end server. " "Buffer was empty or back-end was disconnected during " - "operation. Session will be closed."); + "operation. Attempting to find a new backend."); router->handleError(router_instance, rsession, @@ -1188,6 +1193,7 @@ static int backend_write_delayqueue(DCB *dcb) session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); } + ss_dassert(dcb->dcb_errhandle_called); dcb_close(dcb); } } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 364aec3f9..d25039f4c 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -838,29 +838,18 @@ int gw_read_client_event( { GWBUF* errbuf; bool succp; - - errbuf = mysql_create_custom_error( - 1, - 0, - "Write to backend failed. Session closed."); -#if defined(SS_DEBUG) - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Client routing error handling."))); -#endif - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Routing the query failed. 
" - "Session will be closed."))); - - router->handleError(router_instance, - rsession, - errbuf, - dcb, - ERRACT_REPLY_CLIENT, - &succp); - gwbuf_free(errbuf); - ss_dassert(!succp); + + modutil_send_mysql_err_packet(dcb, + 1, + 0, + 2003, + "HY000", + "Write to backend failed. Session closed."); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing the query failed. " + "Session will be closed."))); + dcb_close(dcb); } diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 9d2f25f76..8208c470b 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -1,3 +1,7 @@ +if(BUILD_TESTS) + add_subdirectory(test) +endif() + add_library(testroute SHARED testroute.c) target_link_libraries(testroute log_manager utils) install(TARGETS testroute DESTINATION modules) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 18766a893..d7f68b5c3 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1406,7 +1406,7 @@ void check_drop_tmp_table( { int tsize = 0, klen = 0,i; - char** tbl; + char** tbl = NULL; char *hkey,*dbname; MYSQL_session* data; @@ -1425,29 +1425,31 @@ void check_drop_tmp_table( if (is_drop_table_query(querybuf)) { tbl = skygw_get_table_names(querybuf,&tsize,false); - - for(i = 0; irses_prop_data.temp_tables) - { - if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables, - (void *)hkey)) - { - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Temporary table dropped: %s",hkey))); - } - } - free(tbl[i]); - free(hkey); - } - free(tbl); + if (rses_prop_tmp && + rses_prop_tmp->rses_prop_data.temp_tables) + { + if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables, + (void *)hkey)) + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Temporary table dropped: %s",hkey))); + } + } + free(tbl[i]); + free(hkey); + } 
+ + free(tbl); + } } } @@ -1468,7 +1470,7 @@ skygw_query_type_t is_read_tmp_table( bool target_tmp_table = false; int tsize = 0, klen = 0,i; - char** tbl; + char** tbl = NULL; char *hkey,*dbname; MYSQL_session* data; @@ -1493,7 +1495,7 @@ skygw_query_type_t is_read_tmp_table( { tbl = skygw_get_table_names(querybuf,&tsize,false); - if (tsize > 0) + if (tbl != NULL && tsize > 0) { /** Query targets at least one table */ for(i = 0; i 0) + for(i = 0; irses_master_ref->bref_dcb) == NULL) + { + char* query_str = modutil_get_query(querybuf); + CHK_DCB(master_dcb); + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Can't route %s:%s:\"%s\" to " + "backend server. Session doesn't have a Master " + "node", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (query_str == NULL ? "(empty)" : query_str)))); + free(query_str); + ret = 0; + goto retblock; + } + /** If buffer is not contiguous, make it such */ if (querybuf->next != NULL) { querybuf = gwbuf_make_contiguous(querybuf); } - - /** Read stored master DCB pointer */ - master_dcb = router_cli_ses->rses_master_ref->bref_dcb; - CHK_DCB(master_dcb); - + switch(packet_type) { case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ @@ -1898,6 +1916,10 @@ static int routeQuery( /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Route query aborted! Routing session is closed <"))); + ret = 0; goto retblock; } /** @@ -2030,13 +2052,13 @@ static int routeQuery( NULL, MAX_RLAG_UNDEFINED); - if (succp && (master_dcb == NULL || master_dcb == curr_master_dcb)) + if (succp && master_dcb == curr_master_dcb) { atomic_add(&inst->stats.n_master, 1); target_dcb = master_dcb; } else - { + { if (succp && master_dcb != curr_master_dcb) { LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, @@ -2055,8 +2077,15 @@ static int routeQuery( "suitable state " "failed."))); } + /** + * Master has changed. 
Set the dcb pointer NULL and + * return with error indicator. + */ + router_cli_ses->rses_master_ref->bref_dcb = NULL; + rses_end_locked_router_action(router_cli_ses); succp = false; ret = 0; + goto retblock; } } @@ -4029,7 +4058,7 @@ static void rwsplit_process_router_options( } /** - * Error Handler routine to resolve backend failures. If it succeeds then there + * Error Handler routine to resolve _backend_ failures. If it succeeds then there * are enough operative backends available and connected. Otherwise it fails, * and session is terminated. * @@ -4058,6 +4087,7 @@ static void handleError ( CHK_DCB(backend_dcb); #if defined(SS_DEBUG) + ss_dassert(!backend_dcb->dcb_errhandle_called); backend_dcb->dcb_errhandle_called = true; #endif session = backend_dcb->session; @@ -4077,6 +4107,22 @@ static void handleError ( return; } + if (rses->rses_master_ref->bref_dcb == backend_dcb) + { + /** Master failed, can't recover */ + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Master node have failed. " + "Session will be closed."))); + + *succp = false; + rses_end_locked_router_action(rses); + return; + } + /** + * This is called in hope of getting replacement for + * failed slave(s). + */ *succp = handle_error_new_connection(inst, rses, backend_dcb, @@ -4119,7 +4165,18 @@ static void handle_error_reply_client( } /** - * This must be called with router lock + * Check if there is backend reference pointing at failed DCB, and reset its + * flags. Then clear DCB's callback and finally : try to find replacement(s) + * for failed slave(s). + * + * This must be called with router lock. 
+ * + * @param inst router instance + * @param rses router client session + * @param dcb failed DCB + * @param errmsg error message which is sent to client if it is waiting + * + * @return true if there are enough backend connections to continue, false if not */ static bool handle_error_new_connection( ROUTER_INSTANCE* inst, @@ -4139,25 +4196,14 @@ static bool handle_error_new_connection( ses = backend_dcb->session; CHK_SESSION(ses); - bref = get_bref_from_dcb(rses, backend_dcb); - - /** failed DCB has already been replaced */ - if (bref == NULL) - { - succp = true; - goto return_succp; - } - /** - * Error handler is already called for this DCB because - * it's not polling anymore. It can be assumed that - * it succeed because rses isn't closed. + /** + * If bref == NULL it has been replaced already with another one. */ - if (backend_dcb->state != DCB_STATE_POLLING) + if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL) { succp = true; goto return_succp; } - CHK_BACKEND_REF(bref); if (BREF_IS_WAITING_RESULT(bref)) @@ -4169,6 +4215,17 @@ static bool handle_error_new_connection( } bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED); + + /** + * Error handler is already called for this DCB because + * it's not polling anymore. It can be assumed that + * it succeed because rses isn't closed. + */ + if (backend_dcb->state != DCB_STATE_POLLING) + { + succp = true; + goto return_succp; + } /** * Remove callback because this DCB won't be used * unless it is reconnected later, and then the callback @@ -4293,8 +4350,8 @@ static bool have_enough_servers( } else { - double pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100; - double nservers = (double)router_nsrv*pct; + int pct = (*p_rses)->rses_config.rw_max_slave_conn_percent/100; + int nservers = router_nsrv*pct; if ((*p_rses)->rses_config.rw_max_slave_conn_count < min_nsrv) { @@ -4313,11 +4370,11 @@ static bool have_enough_servers( LOGFILE_ERROR, "Error : Unable to start %s service. 
There are " "too few backend servers configured in " - "MaxScale.cnf. Found %d%% when at least %.0f%% " + "MaxScale.cnf. Found %d%% when at least %d%% " "would be required.", router->service->name, (*p_rses)->rses_config.rw_max_slave_conn_percent, - min_nsrv/(((double)router_nsrv)/100)))); + min_nsrv/(router_nsrv/100)))); } } free(*p_rses); @@ -4380,7 +4437,14 @@ static int rses_get_max_replication_lag( return conf_max_rlag; } - +/** + * Finds out if there is a backend reference pointing at the DCB given as + * parameter. + * @param rses router client session + * @param dcb DCB + * + * @return backend reference pointer if succeed or NULL + */ static backend_ref_t* get_bref_from_dcb( ROUTER_CLIENT_SES* rses, DCB* dcb) diff --git a/server/modules/routing/readwritesplit/test/CMakeLists.txt b/server/modules/routing/readwritesplit/test/CMakeLists.txt index 1cd40be63..d2af1e9b4 100644 --- a/server/modules/routing/readwritesplit/test/CMakeLists.txt +++ b/server/modules/routing/readwritesplit/test/CMakeLists.txt @@ -1,2 +1,7 @@ add_test(NAME ReadWriteSplitTest COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/rwsplit.sh testrwsplit.log ${TEST_HOST} ${TEST_PORT_RW} ${TEST_MASTER_ID} ${TEST_USER} ${TEST_PASSWORD} ${CMAKE_CURRENT_SOURCE_DIR}) + +if(MYSQLCLIENT_FOUND) + add_test(NAME ReadWriteSplitLoginTest COMMAND $ 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT_RW} 1.10) +endif() + add_subdirectory(test_hints) \ No newline at end of file diff --git a/server/modules/routing/test/CMakeLists.txt b/server/modules/routing/test/CMakeLists.txt new file mode 100644 index 000000000..550985243 --- /dev/null +++ b/server/modules/routing/test/CMakeLists.txt @@ -0,0 +1,6 @@ +if(MYSQLCLIENT_FOUND) + add_executable(testconnect testconnect.c) + message(STATUS "Linking against: ${MYSQLCLIENT_LIBRARIES}") + target_link_libraries(testconnect ${MYSQLCLIENT_LIBRARIES} ssl crypto dl z m) + add_test(NAME ReadConnRouterLoginTest COMMAND $ 10000 ${TEST_HOST} ${MASTER_PORT} ${TEST_HOST} ${TEST_PORT} 
1.10) +endif() \ No newline at end of file diff --git a/server/modules/routing/test/testconnect.c b/server/modules/routing/test/testconnect.c new file mode 100644 index 000000000..46a293e51 --- /dev/null +++ b/server/modules/routing/test/testconnect.c @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include +#include + +int main(int argc, char** argv) +{ + + MYSQL* server; + char *host; + unsigned int port; + int rval, iterations,i; + clock_t begin,end; + double baseline,test, ratio, result, minimum; + + if(argc < 7){ + fprintf(stderr,"Usage: %s \n",argv[0]); + fprintf(stderr,"The ratio is measured as:\ntest CPU time / baseline CPU time\n"); + fprintf(stderr,"The test fails if this ratio is exceeded.\n"); + return 1; + } + + iterations = atoi(argv[1]); + host = strdup(argv[2]); + port = atoi(argv[3]); + ratio = atof(argv[6]); + rval = 0; + + if(ratio <= 0.0){ + return 1; + } + + /**Testing direct connection to master*/ + + printf("Connecting to MySQL server through %s:%d.\n",host,port); + begin = clock(); + + for(i = 0;i ratio){ + printf("Test failed: CPU time ratio was %f which exceeded the limit of %f.\n", result, ratio); + rval = 1; + }else{ + printf("Test passed: CPU time ratio was %f.\n",result); + } + + free(host); + return rval; +} diff --git a/utils/skygw_utils.cc b/utils/skygw_utils.cc index 138689c4f..52ab2b732 100644 --- a/utils/skygw_utils.cc +++ b/utils/skygw_utils.cc @@ -204,16 +204,33 @@ int skygw_rwlock_unlock( int skygw_rwlock_destroy( skygw_rwlock_t* rwlock) { - int err = pthread_rwlock_destroy(rwlock->srw_rwlock); - - if (err == 0) { - rwlock->srw_rwlock_thr = 0; - rwlock->srw_rwlock = NULL; - } else { - ss_dfprintf(stderr, - "* pthread_rwlock_destroy : %s\n", - strerror(err)); + int err; + /** Lock */ + if ((err = pthread_rwlock_wrlock(rwlock->srw_rwlock)) != 0) + { + fprintf(stderr, + "* Error : pthread_rwlock_wrlock failed due to %d, %s.\n", + err, + strerror(err)); + goto retblock; + } + /** Clean the struct */ + 
rwlock->srw_rwlock_thr = 0; + /** Unlock */ + pthread_rwlock_unlock(rwlock->srw_rwlock); + /** Destroy */ + if ((err = pthread_rwlock_destroy(rwlock->srw_rwlock)) != 0) + { + fprintf(stderr, + "* Error : pthread_rwlock_destroy failed due to %d,%s\n", + err, + strerror(err)); } + else + { + rwlock->srw_rwlock = NULL; + } +retblock: return err; } @@ -659,7 +676,7 @@ int get_timestamp_len(void) * Write position in memory. Must be filled with at least * zeroes * - * @return Length of string written. Length includes terminating '\0'. + * @return Length of string written to p_ts. Length includes terminating '\0'. * * * @details (write detailed description here) @@ -671,9 +688,11 @@ int snprint_timestamp( { time_t t; struct tm tm; + int rval; if (p_ts == NULL) { - goto return_p_ts; + rval = 0; + goto retblock; } /** Generate timestamp */ @@ -689,8 +708,9 @@ int snprint_timestamp( tm.tm_min, tm.tm_sec); -return_p_ts: - return (strlen(p_ts)); + rval = strlen(p_ts); +retblock: + return rval; } @@ -950,13 +970,11 @@ void slcursor_add_data( CHK_SLIST_CURSOR(c); list = c->slcursor_list; CHK_SLIST(list); - pos = c->slcursor_pos; - - if (pos != NULL) { - CHK_SLIST_NODE(pos); - pos = list->slist_tail->slnode_next; + if (c->slcursor_pos != NULL) + { + CHK_SLIST_NODE(c->slcursor_pos); } - ss_dassert(pos == NULL); + ss_dassert(list->slist_tail->slnode_next == NULL); pos = slist_node_init(data, c); slist_add_node(list, pos); CHK_SLIST(list); @@ -1277,7 +1295,7 @@ simple_mutex_t* simple_mutex_init( /** Write zeroes if flat, free otherwise. 
*/ if (sm->sm_flat) { - memset(sm, 0, sizeof(sm)); + memset(sm, 0, sizeof(*sm)); } else { simple_mutex_free_memory(sm); sm = NULL; @@ -1491,7 +1509,7 @@ skygw_mes_rc_t skygw_message_send( if (err != 0) { fprintf(stderr, "* Locking pthread mutex failed, " - "due error %d, %s\n", + "due to error %d, %s\n", err, strerror(errno)); goto return_mes_rc; @@ -1499,25 +1517,28 @@ skygw_mes_rc_t skygw_message_send( mes->mes_sent = true; err = pthread_cond_signal(&(mes->mes_cond)); - if (err != 0) { + if (err == 0) + { + rc = MES_RC_SUCCESS; + } + else + { fprintf(stderr, "* Signaling pthread cond var failed, " - "due error %d, %s\n", + "due to error %d, %s\n", err, strerror(errno)); - goto return_mes_rc; } err = pthread_mutex_unlock(&(mes->mes_mutex)); - if (err != 0) { + if (err != 0) + { fprintf(stderr, "* Unlocking pthread mutex failed, " - "due error %d, %s\n", + "due to error %d, %s\n", err, strerror(errno)); - goto return_mes_rc; } - rc = MES_RC_SUCCESS; return_mes_rc: return rc; @@ -1744,7 +1765,7 @@ bool skygw_file_write( #endif CHK_FILE(file); -#if (LAPTOP_TEST) +#if defined(LAPTOP_TEST) usleep(DISKWRITE_LATENCY); #else nwritten = fwrite(data, nbytes, 1, file->sf_file); @@ -1760,7 +1781,8 @@ bool skygw_file_write( } writecount += 1; - if (flush || writecount == FSYNCLIMIT) { + if (flush || writecount == FSYNCLIMIT) + { fd = fileno(file->sf_file); err = fflush(file->sf_file); err = fsync(fd); @@ -1779,21 +1801,21 @@ skygw_file_t* skygw_file_init( { skygw_file_t* file; - file = (skygw_file_t *)calloc(1, sizeof(skygw_file_t)); - - if (file == NULL) { + if ((file = (skygw_file_t *)calloc(1, sizeof(skygw_file_t))) == NULL) + { fprintf(stderr, - "* Memory allocation for skygw file failed.\n"); + "* Error : Memory allocation for file %s failed.\n", + fname); perror("SkyGW file allocation\n"); + goto return_file; } ss_dassert(file != NULL); file->sf_chk_top = CHK_NUM_FILE; file->sf_chk_tail = CHK_NUM_FILE; file->sf_fname = strdup(fname); - file->sf_file = 
fopen(file->sf_fname, "a"); - - if (file->sf_file == NULL) { + if ((file->sf_file = fopen(file->sf_fname, "a")) == NULL) + { int eno = errno; errno = 0; fprintf(stderr, @@ -1807,7 +1829,8 @@ skygw_file_t* skygw_file_init( } setvbuf(file->sf_file, NULL, _IONBF, 0); - if (!file_write_header(file)) { + if (!file_write_header(file)) + { int eno = errno; errno = 0; fprintf(stderr, @@ -1878,12 +1901,14 @@ void skygw_file_done( fprintf(stderr, "* Closing file %s failed : %s.\n", file->sf_fname, - strerror(err)); + strerror(errno)); } - ss_dassert(err == 0); - ss_dfprintf(stderr, "Closed %s\n", file->sf_fname); - free(file->sf_fname); - free(file); + else + { + ss_dfprintf(stderr, "Closed %s\n", file->sf_fname); + free(file->sf_fname); + free(file); + } } }