diff --git a/Documentation/Debug And Diagnostic Support.pdf b/Documentation/Debug And Diagnostic Support.pdf index 2b426b8e3..2b848d77b 100644 Binary files a/Documentation/Debug And Diagnostic Support.pdf and b/Documentation/Debug And Diagnostic Support.pdf differ diff --git a/Documentation/MaxScale 0.6 Release Notes.pdf b/Documentation/MaxScale 0.6 Release Notes.pdf new file mode 100644 index 000000000..0be8ddf0a Binary files /dev/null and b/Documentation/MaxScale 0.6 Release Notes.pdf differ diff --git a/server/core/config.c b/server/core/config.c index 4da5135e7..be38314ab 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -29,7 +29,8 @@ * 06/02/14 Massimiliano Pinto Added support for enable/disable root user in services * 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list * 11/03/14 Massimiliano Pinto Added Unix socket support - * 11/05/14 Massimiliano Pinto Added version_string support to service + * 11/05/14 Massimiliano Pinto Added version_string support to service + * 19/05/14 Mark Riddoch Added unique names from section headers * * @endverbatim */ @@ -58,7 +59,7 @@ static void check_config_objects(CONFIG_CONTEXT *context); static char *config_file = NULL; static GATEWAY_CONF gateway; -char *version_string = NULL; +char *version_string = NULL; /** * Config item handler for the ini file reader @@ -129,7 +130,6 @@ int rval; if (ptr) { *ptr = '\0'; } - } mysql_close(conn); } @@ -165,7 +165,6 @@ int rval; if (!config_file) return 0; - if (gateway.version_string) free(gateway.version_string); @@ -218,6 +217,8 @@ int error_count = 0; "router"); if (router) { + char* max_slave_conn_str; + obj->element = service_alloc(obj->object, router); char *user = config_get_value(obj->parameters, "user"); @@ -228,13 +229,6 @@ int error_count = 0; char *version_string = config_get_value(obj->parameters, "version_string"); - if (version_string) { - ((SERVICE *)(obj->element))->version_string = strdup(version_string); - } else { - if (gateway.version_string) - ((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string); - } - if (obj->element == NULL) /*< if module load failed */ { LOGIF(LE, (skygw_log_write_flush( @@ -247,6 +241,17 @@ int error_count = 0; obj = obj->next; continue; /*< process next obj */ } + + if (version_string) { + ((SERVICE *)(obj->element))->version_string = strdup(version_string); + } else { + if (gateway.version_string) + ((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string); + } + + max_slave_conn_str = + config_get_value(obj->parameters, + "max_slave_connections"); if (enable_root_user) serviceEnableRootUser(obj->element, atoi(enable_root_user)); @@ -267,6 +272,35 @@ int error_count = 0; "corresponding password.", obj->object))); } + if (max_slave_conn_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param(obj->parameters, + "max_slave_connections"); + + succp = service_set_slave_conn_limit( + obj->element, + param, + max_slave_conn_str, + COUNT_ATMOST); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is either for slave connection " + "count or\n\t%% for specifying the " + "maximum percentage of available the " + "slaves that will be connected.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } } else { @@ -299,6 +333,7 @@ int error_count = 0; obj->element = server_alloc(address, protocol, atoi(port)); + server_set_unique_name(obj->element, obj->object); } else { @@ -560,6 +595,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name) return NULL; } + +CONFIG_PARAMETER* config_get_param( + CONFIG_PARAMETER* params, + const char* name) +{ + while (params) + { + if (!strcmp(params->name, name)) + return params; + params = params->next; + } + return NULL; +} + +config_param_type_t config_get_paramtype( + CONFIG_PARAMETER* param) +{ + return param->qfd_param_type; +} + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype) +{ + int val = -1; /*< -1 indicates failure */ + + while (param) + { + if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN)) + { + switch (ptype) { + case COUNT_TYPE: + val = param->qfd.valcount; + goto return_val; + + case PERCENT_TYPE: + val = param->qfd.valpercent; + goto return_val; + + case BOOL_TYPE: + val = param->qfd.valbool; + goto return_val; + + default: + goto return_val; + } + } + else if (name == NULL) + { + goto return_val; + } + param = param->next; + } +return_val: + return val; +} + + +CONFIG_PARAMETER* config_clone_param( + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER* p2; + + p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER)); + + if (p2 == NULL) + { + goto return_p2; + } + memcpy(p2, param, sizeof(CONFIG_PARAMETER)); + p2->name = strndup(param->name, MAX_PARAM_LEN); + p2->value = strndup(param->value, MAX_PARAM_LEN); + + if (param->qfd_param_type == STRING_TYPE) + { + p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN); + } + +return_p2: + return p2; +} + /** * Free a config tree * @@ -671,6 +789,7 @@ SERVER *server; char *user; char *auth; char *enable_root_user; + char* max_slave_conn_str; char *version_string; enable_root_user = config_get_value(obj->parameters, "enable_root_user"); @@ -683,8 +802,9 @@ SERVER *server; version_string = config_get_value(obj->parameters, "version_string"); if (version_string) { - if (service->version_string) + if (service->version_string) { free(service->version_string); + } service->version_string = strdup(version_string); } @@ -694,6 +814,42 @@ SERVER *server; auth); if (enable_root_user) serviceEnableRootUser(service, atoi(enable_root_user)); + max_slave_conn_str = + config_get_value( + obj->parameters, + "max_slave_connections"); + + if (max_slave_conn_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param(obj->parameters, + "max_slave_connections"); + + succp = service_set_slave_conn_limit( + service, + param, + max_slave_conn_str, + COUNT_ATMOST); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is either for slave connection " + "count or\n\t%% for specifying the " + "maximum percentage of available the " + "slaves that will be connected.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } + + } obj->element = service; @@ -704,7 +860,9 @@ SERVER *server; char *auth; char *enable_root_user; - enable_root_user = config_get_value(obj->parameters, "enable_root_user"); + enable_root_user = + config_get_value(obj->parameters, + "enable_root_user"); user = config_get_value(obj->parameters, "user"); @@ -920,6 +1078,7 @@ static char *service_params[] = "user", "passwd", "enable_root_user", + "max_slave_connections", "version_string", NULL }; @@ -1010,3 +1169,47 @@ int i; obj = obj->next; } } + +/** + * Set qualified parameter value to CONFIG_PARAMETER struct. + */ +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type) +{ + bool succp; + + switch (type) { + case STRING_TYPE: + param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN); + succp = true; + break; + + case COUNT_TYPE: + param->qfd.valcount = *(int *)val; + succp = true; + break; + + case PERCENT_TYPE: + param->qfd.valpercent = *(int *)val; + succp = true; + break; + + case BOOL_TYPE: + param->qfd.valbool = *(bool *)val; + succp = true; + break; + + default: + succp = false; + break; + } + + if (succp) + { + param->qfd_param_type = type; + } + return succp; +} + diff --git a/server/core/dbusers.c b/server/core/dbusers.c index 6bd314c79..41dda5a92 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -184,6 +184,8 @@ getUsers(SERVICE *service, struct users *users) } serviceGetUser(service, &service_user, &service_passwd); + if (service_user == NULL || service_passwd == NULL) + return -1; /** multi-thread environment requires that thread init succeeds. */ if (mysql_thread_init()) { @@ -218,15 +220,26 @@ getUsers(SERVICE *service, struct users *users) */ server = service->databases; dpwd = decryptPassword(service_passwd); - while (server != NULL && mysql_real_connect(con, + while (server != NULL && (mysql_real_connect(con, server->name, service_user, dpwd, NULL, server->port, NULL, - 0) == NULL) + 0) == NULL)) { + if (server == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to connect to %s:%d, \"%s\"", + server->name, + server->port, + mysql_error(con)))); + mysql_close(con); + return -1; + } server = server->nextdb; } free(dpwd); diff --git a/server/core/dcb.c b/server/core/dcb.c index 8a3186aec..c499e067b 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -47,6 +47,7 @@ * error and 0 bytes to read. * This fixes a bug with many reads from * backend + * 07/05/2014 Mark Riddoch Addition of callback mechanism * * @endverbatim */ @@ -81,6 +82,7 @@ static bool dcb_set_state_nomutex( DCB* dcb, const dcb_state_t new_state, dcb_state_t* old_state); +static void dcb_call_callback(DCB *dcb, DCB_REASON reason); DCB* dcb_get_zombies(void) { @@ -95,8 +97,8 @@ DCB* dcb_get_zombies(void) * * @return A newly allocated DCB or NULL if non could be allocated. */ -DCB * dcb_alloc( - dcb_role_t role) +DCB * +dcb_alloc(dcb_role_t role) { DCB *rval; @@ -118,13 +120,17 @@ DCB *rval; spinlock_init(&rval->dcb_initlock); spinlock_init(&rval->writeqlock); spinlock_init(&rval->delayqlock); - spinlock_init(&rval->dcb_readqlock); spinlock_init(&rval->authlock); + spinlock_init(&rval->cb_lock); rval->fd = -1; memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics rval->state = DCB_STATE_ALLOC; bitmask_init(&rval->memdata.bitmask); + rval->writeqlen = 0; + rval->high_water = 0; + rval->low_water = 0; rval->next = NULL; + rval->callbacks = NULL; spinlock_acquire(&dcbspin); if (allDCBs == NULL) @@ -250,6 +256,8 @@ dcb_add_to_zombieslist(DCB *dcb) static void dcb_final_free(DCB *dcb) { +DCB_CALLBACK *cb; + CHK_DCB(dcb); ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED, "dcb not in DCB_STATE_DISCONNECTED state."); @@ -309,6 +317,19 @@ dcb_final_free(DCB *dcb) GWBUF *queue = dcb->delayq; while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); } + if (dcb->dcb_readqueue) + { + GWBUF* queue = dcb->dcb_readqueue; + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + } + + spinlock_acquire(&dcb->cb_lock); + while ((cb = dcb->callbacks) != NULL) + { + dcb->callbacks = cb->next; + free(cb); + } + spinlock_release(&dcb->cb_lock); if (dcb->dcb_readqueue) { @@ -690,16 +711,27 @@ return_n: int dcb_write(DCB *dcb, GWBUF *queue) { - int w; - int saved_errno = 0; +int w, qlen; +int saved_errno = 0; +int below_water; + below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; ss_dassert(queue != NULL); + /** + * SESSION_STATE_STOPPING means that one of the backends is closing + * the router session. Some backends may have not completed + * authentication yet and thus they have no information about router + * being closed. Session state is changed to SESSION_STATE_STOPPING + * before router's closeSession is called and that tells that DCB may + * still be writable. + */ if (queue == NULL || (dcb->state != DCB_STATE_ALLOC && dcb->state != DCB_STATE_POLLING && dcb->state != DCB_STATE_LISTENING && - dcb->state != DCB_STATE_NOPOLLING)) + dcb->state != DCB_STATE_NOPOLLING && + dcb->session->state != SESSION_STATE_STOPPING)) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -710,6 +742,7 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb, STRDCBSTATE(dcb->state), dcb->fd))); + ss_dassert(false); return 0; } @@ -726,6 +759,8 @@ dcb_write(DCB *dcb, GWBUF *queue) * the routine that drains the queue data, so we should * not have a race condition on the event. */ + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); dcb->writeq = gwbuf_append(dcb->writeq, queue); dcb->stats.n_buffered++; LOGIF(LD, (skygw_log_write( @@ -838,6 +873,8 @@ dcb_write(DCB *dcb, GWBUF *queue) * for suspended write. */ dcb->writeq = queue; + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); if (queue != NULL) { @@ -861,6 +898,13 @@ dcb_write(DCB *dcb, GWBUF *queue) return 0; } spinlock_release(&dcb->writeqlock); + + if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water) + { + atomic_add(&dcb->stats.n_high_water, 1); + dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); + } + return 1; } @@ -875,9 +919,12 @@ dcb_write(DCB *dcb, GWBUF *queue) int dcb_drain_writeq(DCB *dcb) { -int n = 0; -int w; -int saved_errno = 0; +int n = 0; +int w; +int saved_errno = 0; +int above_water; + + above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; spinlock_acquire(&dcb->writeqlock); if (dcb->writeq) @@ -938,6 +985,17 @@ int saved_errno = 0; } } spinlock_release(&dcb->writeqlock); + atomic_add(&dcb->writeqlen, -n); + /* The write queue has drained, potentially need to call a callback function */ + if (dcb->writeq == NULL) + dcb_call_callback(dcb, DCB_REASON_DRAINED); + if (above_water && dcb->writeqlen < dcb->low_water) + { + atomic_add(&dcb->stats.n_low_water, 1); + dcb_call_callback(dcb, DCB_REASON_LOW_WATER); + } + + return n; } @@ -980,6 +1038,8 @@ dcb_close(DCB *dcb) ss_dassert(dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_ZOMBIE); + dcb_call_callback(dcb, DCB_REASON_CLOSE); + if (rc == 0) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -1022,6 +1082,8 @@ printDCB(DCB *dcb) printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes); printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); } /** @@ -1068,6 +1130,8 @@ DCB *dcb; dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb = dcb->next; } spinlock_release(&dcbspin); @@ -1093,6 +1157,8 @@ dprintDCB(DCB *pdcb, DCB *dcb) dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); } /** @@ -1429,4 +1495,163 @@ int gw_write( return w; } +/** + * Add a callback + * + * Duplicate registrations are not allowed, therefore an error will be returned if + * the specific function, reason and userdata triple are already registered. + * An error will also be returned if the is insufficient memeory available to + * create the registration. + * + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param cb The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was added + */ +int +dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) +{ +DCB_CALLBACK *cb, *ptr; +int rval = 1; + if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL) + { + return 0; + } + ptr->reason = reason; + ptr->cb = callback; + ptr->userdata = userdata; + ptr->next = NULL; + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + dcb->callbacks = ptr; + spinlock_release(&dcb->cb_lock); + } + else + { + while (cb) + { + if (cb->reason == reason && cb->cb == callback && + cb->userdata == userdata) + { + free(ptr); + spinlock_release(&dcb->cb_lock); + return 0; + } + if (cb->next == NULL) + cb->next = ptr; + cb = cb->next; + } + spinlock_release(&dcb->cb_lock); + } + return rval; +} + +/** + * Remove a callback from the callback list for the DCB + * + * Searches down the linked list to find he callback with a matching reason, function + * and userdata. + * + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param cb The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was removed + */ +int +dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata) +{ +DCB_CALLBACK *cb, *pcb = NULL; +int rval = 0; + + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + rval = 0; + } + else + { + while (cb) + { + if (cb->reason == reason && cb->cb == callback + && cb->userdata == userdata) + { + if (pcb == NULL) + pcb->next = cb->next; + else + dcb->callbacks = cb->next; + spinlock_release(&dcb->cb_lock); + free(cb); + rval = 1; + break; + } + pcb = cb; + cb = cb->next; + } + } + if (!rval) + spinlock_release(&dcb->cb_lock); + return rval; +} + +/** + * Call the set of callbacks registered for a particular reason. + * + * @param dcb The DCB to call the callbacks regarding + * @param reason The reason that has triggered the call + */ +static void +dcb_call_callback(DCB *dcb, DCB_REASON reason) +{ +DCB_CALLBACK *cb, *nextcb; + + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + while (cb) + { + if (cb->reason == reason) + { + nextcb = cb->next; + spinlock_release(&dcb->cb_lock); + cb->cb(dcb, reason, cb->userdata); + spinlock_acquire(&dcb->cb_lock); + cb = nextcb; + } + else + cb = cb->next; + } + spinlock_release(&dcb->cb_lock); +} + +/** + * Check the passed DCB to ensure it is in the list of allDCBS + * + * @param DCB The DCB to check + * @return 1 if the DCB is in the list, otherwise 0 + */ +int +dcb_isvalid(DCB *dcb) +{ +DCB *ptr; +int rval = 0; + + spinlock_acquire(&dcbspin); + ptr = allDCBs; + while (ptr) + { + if (ptr == dcb) + { + rval = 1; + break; + } + ptr = ptr->next; + } + spinlock_release(&dcbspin); + + return rval; +} diff --git a/server/core/gateway.c b/server/core/gateway.c index bd773c7f9..2bd592fe7 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -136,7 +136,7 @@ static void usage(void); static char* get_expanded_pathname( char** abs_path, char* input_path, - char* fname); + const char* fname); static void print_log_n_stderr( bool do_log, bool do_stderr, @@ -725,7 +725,7 @@ static bool file_is_writable( static char* get_expanded_pathname( char** output_path, char* relative_path, - char* fname) + const char* fname) { char* cnf_file_buf = NULL; char* expanded_path; @@ -1228,7 +1228,7 @@ int main(int argc, char **argv) datadir))); LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, - "Configuration file : %s", + "Configuration file : %s", cnf_file_path))); /*< Update the server options */ diff --git a/server/core/monitor.c b/server/core/monitor.c index 6c028cad6..cee2f2d9e 100644 --- a/server/core/monitor.c +++ b/server/core/monitor.c @@ -197,3 +197,26 @@ MONITOR *ptr; } spinlock_release(&monLock); } + +/** + * Find a monitor by name + * + * @param name The name of the monitor + * @return Pointer to the monitor or NULL + */ +MONITOR * +monitor_find(char *name) +{ +MONITOR *ptr; + + spinlock_acquire(&monLock); + ptr = allMonitors; + while (ptr) + { + if (!strcmp(ptr->name, name)) + break; + ptr = ptr->next; + } + spinlock_release(&monLock); + return ptr; +} diff --git a/server/core/server.c b/server/core/server.c index bf5b3174f..058db7460 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -22,8 +22,10 @@ * @verbatim * Revision History * - * Date Who Description - * 18/06/13 Mark Riddoch Initial implementation + * Date Who Description + * 18/06/13 Mark Riddoch Initial implementation + * 17/05/14 Mark Riddoch Addition of unique_name + * 20/05/14 Massimiliano Pinto Addition of server_string * * @endverbatim */ @@ -67,6 +69,8 @@ SERVER *server; server->nextdb = NULL; server->monuser = NULL; server->monpw = NULL; + server->unique_name = NULL; + server->server_string = NULL; spinlock_acquire(&server_spin); server->next = allServers; @@ -109,10 +113,51 @@ SERVER *ptr; /* Clean up session and free the memory */ free(server->name); free(server->protocol); + if (server->unique_name) + free(server->unique_name); + if (server->server_string) + free(server->server_string); free(server); return 1; } +/** + * Set a unique name for the server + * + * @param server The server to ste the name on + * @param name The unique name for the server + */ +void +server_set_unique_name(SERVER *server, char *name) +{ + server->unique_name = strdup(name); +} + +/** + * Find an existing server using the unique section name in + * configuration file + * + * @param servname The Server name or address + * @param port The server port + * @return The server or NULL if not found + */ +SERVER * +server_find_by_unique_name(char *name) +{ +SERVER *server; + + spinlock_acquire(&server_spin); + server = allServers; + while (server) + { + if (strcmp(server->unique_name, name) == 0) + break; + server = server->next; + } + spinlock_release(&server_spin); + return server; +} + /** * Find an existing server * @@ -190,13 +235,15 @@ char *stat; ptr = allServers; while (ptr) { - dcb_printf(dcb, "Server %p\n", ptr); + dcb_printf(dcb, "Server %p (%s)\n", ptr, ptr->unique_name); dcb_printf(dcb, "\tServer: %s\n", ptr->name); stat = server_status(ptr); dcb_printf(dcb, "\tStatus: %s\n", stat); free(stat); dcb_printf(dcb, "\tProtocol: %s\n", ptr->protocol); dcb_printf(dcb, "\tPort: %d\n", ptr->port); + if (ptr->server_string) + dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string); dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of connections: %d\n", ptr->stats.n_current); ptr = ptr->next; @@ -215,13 +262,15 @@ dprintServer(DCB *dcb, SERVER *server) { char *stat; - dcb_printf(dcb, "Server %p\n", server); + dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name); dcb_printf(dcb, "\tServer: %s\n", server->name); stat = server_status(server); dcb_printf(dcb, "\tStatus: %s\n", stat); free(stat); dcb_printf(dcb, "\tProtocol: %s\n", server->protocol); dcb_printf(dcb, "\tPort: %d\n", server->port); + if (server->server_string) + dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string); dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tCurrent No. of connections: %d\n", server->stats.n_current); } diff --git a/server/core/service.c b/server/core/service.c index 2e7632bb8..efc441173 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -29,6 +29,7 @@ * 25/02/14 Massimiliano Pinto Added: service refresh limit feature * 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services) * 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL + * 23/05/14 Mark Riddoch Addition of service validation call * * @endverbatim */ @@ -55,6 +56,11 @@ extern int lm_enabled_logfiles_bitmask; static SPINLOCK service_spin = SPINLOCK_INIT; static SERVICE *allServices = NULL; +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param); + + /** * Allocate a new service for the gateway to support * @@ -102,6 +108,8 @@ SERVICE *service; service->enable_root = 0; service->routerOptions = NULL; service->databases = NULL; + service->svc_config_param = NULL; + service->svc_config_version = 0; spinlock_init(&service->spin); spinlock_init(&service->users_table_spin); memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE)); @@ -114,6 +122,33 @@ SERVICE *service; return service; } +/** + * Check to see if a service pointer is valid + * + * @param service The poitner to check + * @return 1 if the service is in the list of all services + */ +int +service_isvalid(SERVICE *service) +{ +SERVICE *ptr; +int rval = 0; + + spinlock_acquire(&service_spin); + ptr = allServices; + while (ptr) + { + if (ptr == service) + { + rval = 1; + break; + } + ptr = ptr->next; + } + spinlock_release(&service_spin); + return rval; +} + /** * Start an individual port/protocol pair * @@ -782,3 +817,111 @@ int service_refresh_users(SERVICE *service) { else return 1; } + +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec) +{ + char* p; + int valint; + bool percent = false; + bool succp; + + /** + * Find out whether the value is numeric and ends with '%' or '\0' + */ + p = valstr; + + while(isdigit(*p)) p++; + + errno = 0; + + if (p == valstr || (*p != '%' && *p != '\0')) + { + succp = false; + } + else if (*p == '%') + { + if (*(p+1) == '\0') + { + *p = '\0'; + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE); + } + } + else + { + succp = false; + } + } + else if (*p == '\0') + { + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, COUNT_TYPE); + } + } + + if (succp) + { + service_add_qualified_param(service, param); /*< add param to svc */ + } + return succp; +} + +/** + * Add qualified config parameter to SERVICE struct. + */ +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER** p; + + spinlock_acquire(&svc->spin); + + p = &svc->svc_config_param; + + if ((*p) != NULL) + { + do + { + /** If duplicate is found, latter remains */ + if (strncasecmp(param->name, + (*p)->name, + strlen(param->name)) == 0) + { + *p = config_clone_param(param); + break; + } + } + while ((*p)->next != NULL); + + (*p)->next = config_clone_param(param); + } + else + { + (*p) = config_clone_param(param); + } + /** Increment service's configuration version */ + atomic_add(&svc->svc_config_version, 1); + (*p)->next = NULL; + spinlock_release(&svc->spin); +} diff --git a/server/core/session.c b/server/core/session.c index 4df4cef07..bd8188bcc 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -114,7 +114,7 @@ session_alloc(SERVICE *service, DCB *client_dcb) /* * Only create a router session if we are not the listening - * DCB. Creating a router session may create a connection to a + * DCB or an internal DCB. Creating a router session may create a connection to a * backend server, depending upon the router module implementation * and should be avoided for the listener session * @@ -122,13 +122,17 @@ session_alloc(SERVICE *service, DCB *client_dcb) * session, therefore it is important that the session lock is * relinquished beforethe router call. */ - if (client_dcb->state != DCB_STATE_LISTENING) + if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL) { session->router_session = service->router->newSession(service->router_instance, session); if (session->router_session == NULL) { + /** + * Inform other threads that session is closing. + */ + session->state == SESSION_STATE_STOPPING; /*< * Decrease refcount, set dcb's session pointer NULL * and set session pointer to NULL. @@ -138,8 +142,8 @@ session_alloc(SERVICE *service, DCB *client_dcb) session = NULL; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Error : Failed to create router " - "client session. Freeing allocated resources."))); + "Error : Failed to create %s session.", + service->name))); goto return_session; } @@ -269,6 +273,34 @@ return_succp : return succp; } +/** + * Check to see if a session is valid, i.e. in the list of all sessions + * + * @param session Session to check + * @return 1 if the session is valid otherwise 0 + */ +int +session_isvalid(SESSION *session) +{ +SESSION *ptr; +int rval = 0; + + spinlock_acquire(&session_spin); + ptr = allSessions; + while (ptr) + { + if (ptr == session) + { + rval = 1; + break; + } + ptr = ptr->next; + } + spinlock_release(&session_spin); + + return rval; +} + /** * Print details of an individual session * @@ -441,3 +473,18 @@ session_state(int state) return "Invalid State"; } } + +SESSION* get_session_by_router_ses( + void* rses) +{ + SESSION* ses = allSessions; + + while (ses->router_session != rses && ses->next != NULL) + ses = ses->next; + + if (ses->router_session != rses) + { + ses = NULL; + } + return ses; +} diff --git a/server/include/config.h b/server/include/config.h index 03da9b029..9331739f6 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -17,6 +17,7 @@ * * Copyright SkySQL Ab 2013 */ +#include /** * @file config.h The configuration handling elements @@ -31,12 +32,32 @@ * @endverbatim */ +/** + * Maximum length for configuration parameter value. + */ +enum {MAX_PARAM_LEN=256}; + +typedef enum { + UNDEFINED_TYPE=0, + STRING_TYPE, + COUNT_TYPE, + PERCENT_TYPE, + BOOL_TYPE +} config_param_type_t; + /** * The config parameter */ typedef struct config_parameter { char *name; /**< The name of the parameter */ - char *value; /**< The value of the parameter */ + char *value; /**< The value of the parameter */ + union { /*< qualified parameter value by type */ + char* valstr; /*< terminated char* array */ + int valcount; /*< int */ + int valpercent; /*< int */ + bool valbool; /*< bool */ + } qfd; + config_param_type_t qfd_param_type; struct config_parameter *next; /**< Next pointer in the linked list */ } CONFIG_PARAMETER; @@ -59,7 +80,22 @@ typedef struct { char *version_string; /**< The version string of embedded database library */ } GATEWAY_CONF; -extern int config_load(char *); -extern int config_reload(); -extern int config_threadcount(); +extern int config_load(char *); +extern int config_reload(); +extern int config_threadcount(); +CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); +config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param); +CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); + +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type); + + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype); + #endif diff --git a/server/include/dcb.h b/server/include/dcb.h index f0c4cff31..e90a64856 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -48,6 +48,8 @@ struct service; * 15/07/2013 Massimiliano Pinto Added session entry point * 16/07/2013 Massimiliano Pinto Added command type for dcb * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb + * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks * * @endverbatim */ @@ -99,6 +101,8 @@ typedef struct dcbstats { int n_writes; /*< Number of writes on this descriptor */ int n_accepts; /*< Number of accepts on this descriptor */ int n_buffered; /*< Number of buffered writes */ + int n_high_water; /*< Number of crosses of high water mark */ + int n_low_water; /*< Number of crosses of low water mark */ } DCBSTATS; /** @@ -137,10 +141,35 @@ typedef enum { } dcb_state_t; typedef enum { - DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ - DCB_ROLE_REQUEST_HANDLER /*< Serves dedicated client */ + DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ + DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */ + DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ } dcb_role_t; +/** + * Callback reasons for the DCB callback mechanism. + */ +typedef enum { + DCB_REASON_CLOSE, /*< The DCB is closing */ + DCB_REASON_DRAINED, /*< The write delay queue has drained */ + DCB_REASON_HIGH_WATER, /*< Cross high water mark */ + DCB_REASON_LOW_WATER, /*< Cross low water mark */ + DCB_REASON_ERROR, /*< An error was flagged on the connection */ + DCB_REASON_HUP /*< A hangup was detected */ +} DCB_REASON; + +/** + * Callback structure - used to track callbacks registered on a DCB + */ +typedef struct dcb_callback { + DCB_REASON reason; /*< The reason for the callback */ + int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata); + void *userdata; /*< User data to be sent in the callback */ + struct dcb_callback + *next; /*< Next callback for this DCB */ +} DCB_CALLBACK; + + /** * Descriptor Control Block * @@ -172,12 +201,12 @@ typedef struct dcb { struct session *session; /**< The owning session */ GWPROTOCOL func; /**< The functions for this descriptor */ + unsigned int writeqlen; /**< Current number of byes in the write queue */ SPINLOCK writeqlock; /**< Write Queue spinlock */ GWBUF *writeq; /**< Write Data Queue */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ GWBUF *delayq; /**< Delay Backend Write Data Queue */ GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */ - SPINLOCK dcb_readqlock; /**< read/write access protection to read queue */ SPINLOCK authlock; /**< Generic Authorization spinlock */ DCBSTATS stats; /**< DCB related statistics */ @@ -187,6 +216,11 @@ typedef struct dcb { void *data; /**< Specific client data */ DCBMM memdata; /**< The data related to DCB memory management */ int command; /**< Specific client command type */ + SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ + DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ + + unsigned int high_water; /**< High water mark */ + unsigned int low_water; /**< Low water mark */ #if defined(SS_DEBUG) skygw_chk_t dcb_chk_tail; #endif @@ -205,6 +239,11 @@ int fail_accept_errno; #define DCB_SESSION(x) (x)->session #define DCB_PROTOCOL(x, type) (type *)((x)->protocol) #define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) +#define DCB_WRITEQLEN(x) (x)->writeqlen +#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo); +#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi); +#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) +#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) DCB *dcb_get_zombies(void); int gw_write( @@ -231,6 +270,11 @@ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ int dcb_isclient(DCB *); /* the DCB is the client of the session */ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_add_to_zombieslist(DCB* dcb); +int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), + void *); +int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON), + void *); +int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ bool dcb_set_state( DCB* dcb, diff --git a/server/include/monitor.h b/server/include/monitor.h index 6aa0f2430..6444fecd5 100644 --- a/server/include/monitor.h +++ b/server/include/monitor.h @@ -29,6 +29,7 @@ * Date Who Description * 07/07/13 Mark Riddoch Initial implementation * 25/07/13 Mark Riddoch Addition of diagnotics + * 23/05/14 Mark Riddoch Addition of routine to find monitors by name * * @endverbatim */ @@ -79,6 +80,7 @@ typedef struct monitor { extern MONITOR *monitor_alloc(char *, char *); extern void monitor_free(MONITOR *); +extern MONITOR *monitor_find(char *); extern void monitorAddServer(MONITOR *, SERVER *); extern void monitorAddUser(MONITOR *, char *, char *); extern void monitorStop(MONITOR *); diff --git a/server/include/server.h b/server/include/server.h index e355e8c49..19ca798ae 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -27,10 +27,12 @@ * @verbatim * Revision History * - * Date Who Description - * 14/06/13 Mark Riddoch Initial implementation - * 21/06/13 Mark Riddoch Addition of server status flags - * 22/07/13 Mark Riddoch Addition of JOINED status for Galera + * Date Who Description + * 14/06/13 Mark Riddoch Initial implementation + * 21/06/13 Mark Riddoch Addition of server status flags + * 22/07/13 Mark Riddoch Addition of JOINED status for Galera + * 18/05/14 Mark Riddoch Addition of unique_name field + * 20/05/14 Massimiliano Pinto Addition of server_string field * * @endverbatim */ @@ -51,6 +53,7 @@ typedef struct { * between the gateway and the server. */ typedef struct server { + char *unique_name; /**< Unique name for the server */ char *name; /**< Server name/IP address*/ unsigned short port; /**< Port to listen on */ char *protocol; /**< Protocol module to use */ @@ -60,6 +63,7 @@ typedef struct server { SERVER_STATS stats; /**< The server statistics */ struct server *next; /**< Next server */ struct server *nextdb; /**< Next server in list attached to a service */ + char *server_string; /**< Server version string, i.e. MySQL server version */ } SERVER; /** @@ -103,6 +107,7 @@ typedef struct server { extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); +extern SERVER *server_find_by_unique_name(char *); extern SERVER *server_find(char *, unsigned short); extern void printServer(SERVER *); extern void printAllServers(); diff --git a/server/include/service.h b/server/include/service.h index 85fe3972d..9369712f0 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -22,6 +22,7 @@ #include #include #include +#include "config.h" /** * @file service.h @@ -116,6 +117,8 @@ typedef struct service { SERVICE_STATS stats; /**< The service statistics */ struct users *users; /**< The user data for this service */ int enable_root; /**< Allow root user access */ + CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */ + int svc_config_version; /*< Version number of configuration */ SPINLOCK users_table_spin; /**< The spinlock for users data refresh */ SERVICE_REFRESH_RATE @@ -123,12 +126,15 @@ typedef struct service { struct service *next; /**< The next service in the linked list */ } SERVICE; +typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t; + #define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */ #define SERVICE_STATE_STARTED 2 /**< The service has been started */ extern SERVICE *service_alloc(char *, char *); extern int service_free(SERVICE *); extern SERVICE *service_find(char *); +extern int service_isvalid(SERVICE *); extern int serviceAddProtocol(SERVICE *, char *, char *, unsigned short); extern int serviceHasProtocol(SERVICE *, char *, unsigned short); extern void serviceAddBackend(SERVICE *, SERVER *); @@ -148,5 +154,10 @@ extern int service_refresh_users(SERVICE *); extern void printService(SERVICE *); extern void printAllServices(); extern void dprintAllServices(DCB *); +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec); extern void dprintService(DCB *, SERVICE *); #endif diff --git a/server/include/session.h b/server/include/session.h index 6cafa160d..790922d25 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -53,6 +53,7 @@ typedef enum { SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */ + SESSION_STATE_STOPPING, /*< router is being closed */ SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_FREE /*< for all sessions */ @@ -87,10 +88,12 @@ typedef struct session { SESSION *session_alloc(struct service *, struct dcb *); bool session_free(SESSION *); +int session_isvalid(SESSION *); void printAllSessions(); void printSession(SESSION *); void dprintAllSessions(struct dcb *); void dprintSession(struct dcb *, SESSION *); char *session_state(int); bool session_link_dcb(SESSION *, struct dcb *); +SESSION* get_session_by_router_ses(void* rses); #endif diff --git a/server/modules/include/debugcli.h b/server/modules/include/debugcli.h index 0e7afcfe2..b373cae6b 100644 --- a/server/modules/include/debugcli.h +++ b/server/modules/include/debugcli.h @@ -41,6 +41,7 @@ struct cli_session; typedef struct cli_instance { SPINLOCK lock; /*< The instance spinlock */ SERVICE *service; /*< The debug cli service */ + int mode; /*< CLI interface mode */ struct cli_session *sessions; /*< Linked list of sessions within this instance */ struct cli_instance @@ -53,8 +54,13 @@ typedef struct cli_instance { */ typedef struct cli_session { char cmdbuf[80]; /*< The command buffer used to build up user commands */ + int mode; /*< The CLI Mode for this session */ SESSION *session; /*< The gateway session */ struct cli_session *next; /*< The next pointer for the list of sessions */ } CLI_SESSION; + +/* Command line interface modes */ +#define CLIM_USER 1 +#define CLIM_DEVELOPER 2 #endif diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 17697f101..a4eecf4d5 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,15 +31,13 @@ #include -/** - * Internal structure used to define the set of backend servers we are routing - * connections to. This provides the storage for routing module specific data - * that is required for each of the backend servers. - */ -typedef struct backend { - SERVER* backend_server; /*< The server itself */ - int backend_conn_count; /*< Number of connections to the server */ -} BACKEND; +typedef enum backend_type_t { + BE_UNDEFINED=-1, + BE_MASTER, + BE_JOINED = BE_MASTER, + BE_SLAVE, + BE_COUNT +} backend_type_t; typedef struct rses_property_st rses_property_t; typedef struct router_client_session ROUTER_CLIENT_SES; @@ -52,14 +50,34 @@ typedef enum rses_property_type_t { RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 } rses_property_type_t; -typedef enum backend_type_t { - BE_UNDEFINED=-1, - BE_MASTER, - BE_JOINED = BE_MASTER, - BE_SLAVE, - BE_COUNT -} backend_type_t; + +/** + * This criteria is used when backends are chosen for a router session's use. + * Backend servers are sorted to ascending order according to the criteria + * and top N are chosen. + */ +typedef enum select_criteria { + UNDEFINED_CRITERIA=0, + LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */ + DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS, + LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */ + LEAST_BEHIND_MASTER, + LAST_CRITERIA /*< not used except for an index */ +} select_criteria_t; + + +/** default values for rwsplit configuration parameters */ +#define CONFIG_MAX_SLAVE_CONN 1 + +#define GET_SELECT_CRITERIA(s) \ + (strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \ + LEAST_GLOBAL_CONNECTIONS : ( \ + strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \ + LEAST_BEHIND_MASTER : ( \ + strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \ + LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA))) + /** * Session variable command */ @@ -98,13 +116,63 @@ struct rses_property_st { }; typedef struct sescmd_cursor_st { +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_top; +#endif ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */ rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ bool scmd_cur_active; /*< true if command is being executed */ - backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */ +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_tail; +#endif } sescmd_cursor_t; +/** + * Internal structure used to define the set of backend servers we are routing + * connections to. This provides the storage for routing module specific data + * that is required for each of the backend servers. + * + * Owned by router_instance, referenced by each routing session. + */ +typedef struct backend_st { +#if defined(SS_DEBUG) + skygw_chk_t be_chk_top; +#endif + SERVER* backend_server; /*< The server itself */ + int backend_conn_count; /*< Number of connections to the server */ + bool be_valid; /*< valid when belongs to the router's configuration */ +#if defined(SS_DEBUG) + skygw_chk_t be_chk_tail; +#endif +} BACKEND; + + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +typedef struct backend_ref_st { +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_top; +#endif + BACKEND* bref_backend; + DCB* bref_dcb; + sescmd_cursor_t bref_sescmd_cur; +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_tail; +#endif +} backend_ref_t; + + +typedef struct rwsplit_config_st { + int rw_max_slave_conn_percent; + int rw_max_slave_conn_count; + select_criteria_t rw_slave_select_criteria; +} rwsplit_config_t; + + /** * The client session structure used within this router. */ @@ -113,17 +181,18 @@ struct router_client_session { skygw_chk_t rses_chk_top; #endif SPINLOCK rses_lock; /*< protects rses_deleted */ - int rses_versno; /*< even = no active update, else odd */ + int rses_versno; /*< even = no active update, else odd. not used 4/14 */ bool rses_closed; /*< true when closeSession is called */ /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; - BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */ - DCB* rses_dcb[BE_COUNT]; - /*< cursor is pointer and status variable to current session command */ - sescmd_cursor_t rses_cursor[BE_COUNT]; + backend_ref_t* rses_master_ref; + backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ + int rses_nbackends; int rses_capabilities; /*< input type, for example */ bool rses_autocommit_enabled; bool rses_transaction_active; + uint64_t rses_id; /*< ID for router client session */ struct router_client_session* next; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; @@ -151,6 +220,8 @@ typedef struct router_instance { SPINLOCK lock; /*< Lock for the instance data */ BACKEND** servers; /*< Backend servers */ BACKEND* master; /*< NULL or pointer */ + rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */ + int rwsplit_version;/*< version number for router's config */ unsigned int bitmask; /*< Bitmask to apply to server->status */ unsigned int bitvalue; /*< Required value of server->status */ ROUTER_STATS stats; /*< Statistics for this router */ diff --git a/server/modules/monitor/galera_mon.c b/server/modules/monitor/galera_mon.c index 1f277dcbc..ecdfa44a3 100644 --- a/server/modules/monitor/galera_mon.c +++ b/server/modules/monitor/galera_mon.c @@ -360,6 +360,6 @@ MONITOR_SERVERS *ptr; monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd); ptr = ptr->next; } - thread_millisleep(10000); + thread_millisleep(MONITOR_INTERVAL); } } diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index a13cd7d73..554f3810f 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -22,12 +22,14 @@ * @verbatim * Revision History * - * Date Who Description - * 08/07/13 Mark Riddoch Initial implementation - * 11/07/13 Mark Riddoch Addition of code to check replication - * status - * 25/07/13 Mark Riddoch Addition of decrypt for passwords and - * diagnostic interface + * Date Who Description + * 08/07/13 Mark Riddoch Initial implementation + * 11/07/13 Mark Riddoch Addition of code to check replication + * status + * 25/07/13 Mark Riddoch Addition of decrypt for passwords and + * diagnostic interface + * 20/05/14 Massimiliano Pinto Addition of support for MariadDB multimaster replication setup. + * New server field version_string is updated. * * @endverbatim */ @@ -49,7 +51,7 @@ extern int lm_enabled_logfiles_bitmask; static void monitorMain(void *); -static char *version_str = "V1.0.0"; +static char *version_str = "V1.1.0"; static void *startMonitor(void *); static void stopMonitor(void *); @@ -290,6 +292,8 @@ MYSQL_RES *result; int num_fields; int ismaster = 0, isslave = 0; char *uname = defaultUser, *passwd = defaultPasswd; +unsigned long int server_version = 0; +char *server_string; if (database->server->monuser != NULL) { @@ -321,6 +325,15 @@ char *uname = defaultUser, *passwd = defaultPasswd; /* If we get this far then we have a working connection */ server_set_status(database->server, SERVER_RUNNING); + /* get server version from current server */ + server_version = mysql_get_server_version(database->con); + + /* get server version string */ + server_string = (char *)mysql_get_server_info(database->con); + if (server_string) { + database->server->server_string = strdup(server_string); + } + /* Check SHOW SLAVE HOSTS - if we get rows then we are a master */ if (mysql_query(database->con, "SHOW SLAVE HOSTS")) { @@ -342,17 +355,43 @@ char *uname = defaultUser, *passwd = defaultPasswd; /* Check if the Slave_SQL_Running and Slave_IO_Running status is * set to Yes */ - if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0 - && (result = mysql_store_result(database->con)) != NULL) - { - num_fields = mysql_num_fields(result); - while ((row = mysql_fetch_row(result))) + + /* Check first for MariaDB 10.x.x and get status for multimaster replication */ + if (server_version >= 100000) { + + if (mysql_query(database->con, "SHOW ALL SLAVES STATUS") == 0 + && (result = mysql_store_result(database->con)) != NULL) { - if (strncmp(row[10], "Yes", 3) == 0 - && strncmp(row[11], "Yes", 3) == 0) + int i = 0; + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + if (strncmp(row[12], "Yes", 3) == 0 + && strncmp(row[13], "Yes", 3) == 0) { + isslave += 1; + } + i++; + } + mysql_free_result(result); + + if (isslave == i) isslave = 1; + else + isslave = 0; + } + } else { + if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0 + && (result = mysql_store_result(database->con)) != NULL) + { + num_fields = mysql_num_fields(result); + while ((row = mysql_fetch_row(result))) + { + if (strncmp(row[10], "Yes", 3) == 0 + && strncmp(row[11], "Yes", 3) == 0) + isslave = 1; + } + mysql_free_result(result); } - mysql_free_result(result); } if (ismaster) diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index c2aacd21e..a2c2e364c 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -27,8 +27,9 @@ * @verbatim * Revision History * - * Date Who Description - * 08/07/13 Mark Riddoch Initial implementation + * Date Who Description + * 08/07/13 Mark Riddoch Initial implementation + * 26/05/14 Massimiliano Pinto Default values for MONITOR_INTERVAL * * @endverbatim */ @@ -61,4 +62,6 @@ typedef struct { #define MONITOR_STOPPING 2 #define MONITOR_STOPPED 3 +#define MONITOR_INTERVAL 10000 // in milliseconds + #endif diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 902305c6b..e4b461dad 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -311,12 +311,14 @@ static int gw_read_backend_event(DCB *dcb) { /* try reload users' table for next connection */ service_refresh_users(dcb->session->client->service); - while (session->state != SESSION_STATE_ROUTER_READY) + while (session->state != SESSION_STATE_ROUTER_READY && + session->state != SESSION_STATE_STOPPING) { ss_dassert( session->state == SESSION_STATE_READY || session->state == - SESSION_STATE_ROUTER_READY); + SESSION_STATE_ROUTER_READY || + session->state == SESSION_STATE_STOPPING); /** * Session shouldn't be NULL at this point * anymore. Just checking.. @@ -328,6 +330,15 @@ static int gw_read_backend_event(DCB *dcb) { } usleep(1); } + + if (session->state == SESSION_STATE_STOPPING) + { + goto return_with_lock; + } + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + /** * rsession shouldn't be NULL since session * state indicates that it was initialized @@ -362,8 +373,7 @@ static int gw_read_backend_event(DCB *dcb) { /* check the delay queue and flush the data */ if (dcb->delayq) { - backend_write_delayqueue(dcb); - rc = 1; + rc = backend_write_delayqueue(dcb); goto return_with_lock; } } @@ -419,21 +429,23 @@ static int gw_read_backend_event(DCB *dcb) { if (dcb->session->client != NULL) { client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol); - } - - if (client_protocol != NULL) { - CHK_PROTOCOL(client_protocol); + if (client_protocol != NULL) { + CHK_PROTOCOL(client_protocol); - if (client_protocol->state == MYSQL_IDLE) - { - router->clientReply(router_instance, + if (client_protocol->state == MYSQL_IDLE) + { + router->clientReply(router_instance, rsession, writebuf, dcb); - rc = 1; - } - goto return_rc; - } + rc = 1; + } + goto return_rc; + } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { + router->clientReply(router_instance, rsession, writebuf, dcb); + rc = 1; + } + } } return_rc: @@ -572,9 +584,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) snprintf(str, len+1, "%s", startpoint); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Error : Routing query \"%s\" failed due to " - "authentication failure.", - str))); + "Error : Authentication to backend failed."))); /** Consume query buffer */ while ((queue = gwbuf_consume( queue, @@ -672,6 +682,10 @@ static int gw_error_backend_event(DCB *dcb) { if (session->state == SESSION_STATE_ROUTER_READY) { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + rsession = session->router_session; /*< * rsession should never be NULL here. @@ -852,34 +866,36 @@ static int backend_write_delayqueue(DCB *dcb) spinlock_acquire(&dcb->delayqlock); + if (dcb->delayq == NULL) + { + spinlock_release(&dcb->delayqlock); + rc = 1; + } + else + { localq = dcb->delayq; dcb->delayq = NULL; - spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); + } if (rc == 0) { - /*< vraa : errorHandle */ - /** - * This error can be muted because it is often due - * unexpected dcb state which means that concurrent thread - * already wrote the queue and closed dcb. - */ -#if 0 LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "%lu [backend_write_delayqueue] Some error occurred in " - "backend.", - pthread_self()))); -#endif + "Error : failed to write buffered data to back-end " + "server. Buffer was empty of back-end was disconnected " + "during operation."))); + mysql_send_custom_error( dcb->session->client, 1, 0, - "Unable to write to backend server. Connection was " - "closed."); + "Failed to write buffered data to back-end server. " + "Buffer was empty or back-end was disconnected during " + "operation."); dcb_close(dcb); } + return rc; } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 16735e817..bcd94e423 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -552,7 +552,7 @@ int gw_read_client_event(DCB* dcb) { goto return_rc; } - // close client socket and the sessioA too + // close client socket and the session too dcb->func.close(dcb); } else { // do nothing if reading 1 byte @@ -723,7 +723,7 @@ int gw_read_client_event(DCB* dcb) { dcb, 1, 0, - "Query routing failed. Connection to " + "Can't route query. Connection to " "backend lost"); protocol->state = MYSQL_IDLE; } @@ -772,12 +772,8 @@ int gw_read_client_event(DCB* dcb) { "backend. Close client dcb %p", pthread_self(), dcb))); - - /** close client connection */ - (dcb->func).close(dcb); - /** close backends connection */ - router->closeSession(router_instance, rsession); - rc = 1; + /** close client connection, closes router session too */ + rc = dcb->func.close(dcb); } else { @@ -1263,37 +1259,16 @@ return_rc: return rc; } -static int gw_error_client_event(DCB *dcb) { - SESSION* session; - ROUTER_OBJECT* router; - void* router_instance; - void* rsession; - -#if defined(SS_DEBUG) - MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; - if (dcb->state == DCB_STATE_POLLING || - dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE) +static int gw_error_client_event( + DCB* dcb) { - CHK_PROTOCOL(protocol); - } -#endif + int rc; - session = dcb->session; + CHK_DCB(dcb); - /** - * session may be NULL if session_alloc failed. - * In that case router session was not created. - */ - if (session != NULL) { - CHK_SESSION(session); - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; - router->closeSession(router_instance, rsession); - } - dcb_close(dcb); - return 1; + rc = dcb->func.close(dcb); + + return rc; } static int @@ -1320,6 +1295,10 @@ gw_client_close(DCB *dcb) */ if (session != NULL) { CHK_SESSION(session); + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; @@ -1342,42 +1321,11 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - SESSION* session; - ROUTER_OBJECT* router; - void* router_instance; - void* rsession; - int rc = 1; + int rc; - #if defined(SS_DEBUG) - MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; - if (dcb->state == DCB_STATE_POLLING || - dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE) - { - CHK_PROTOCOL(protocol); - } -#endif CHK_DCB(dcb); + rc = dcb->func.close(dcb); - if (dcb->state != DCB_STATE_POLLING) { - goto return_rc; - } - - session = dcb->session; - /** - * session may be NULL if session_alloc failed. - * In that case router session was not created. - */ - if (session != NULL) { - CHK_SESSION(session); - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; - router->closeSession(router_instance, rsession); - } - - dcb_close(dcb); -return_rc: return rc; } diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index 66c02c98b..d6becdfda 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -45,7 +45,7 @@ extern int lm_enabled_logfiles_bitmask; -static char *version_str = "V1.0.1"; +static char *version_str = "V1.1.0"; /* The router entry points */ static ROUTER *createInstance(SERVICE *service, char **options); @@ -127,6 +127,7 @@ static ROUTER * createInstance(SERVICE *service, char **options) { CLI_INSTANCE *inst; +int i; if ((inst = malloc(sizeof(CLI_INSTANCE))) == NULL) return NULL; @@ -134,7 +135,29 @@ CLI_INSTANCE *inst; inst->service = service; spinlock_init(&inst->lock); inst->sessions = NULL; + inst->mode = CLIM_USER; + if (options) + { + for (i = 0; options[i]; i++) + { + if (!strcasecmp(options[i], "developer")) + { + inst->mode = CLIM_DEVELOPER; + } + else if (!strcasecmp(options[i], "user")) + { + inst->mode = CLIM_USER; + } + else + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Unknown option for CLI '%s'\n", + options[i]))); + } + } + } /* * We have completed the creation of the instance data, so now @@ -176,11 +199,15 @@ CLI_SESSION *client; spinlock_release(&inst->lock); session->state = SESSION_STATE_READY; + client->mode = inst->mode; dcb_printf(session->client, "Welcome the SkySQL MaxScale Debug Interface (%s).\n", version_str); - dcb_printf(session->client, "WARNING: This interface is meant for developer usage,\n"); - dcb_printf(session->client, "passing incorrect addresses to commands can endanger your MaxScale server.\n\n"); + if (client->mode == CLIM_DEVELOPER) + { + dcb_printf(session->client, "WARNING: This interface is meant for developer usage,\n"); + dcb_printf(session->client, "passing incorrect addresses to commands can endanger your MaxScale server.\n\n"); + } dcb_printf(session->client, "Type help for a list of available commands.\n\n"); return (void *)client; @@ -281,4 +308,4 @@ static uint8_t getCapabilities( void* router_session) { return 0; -} \ No newline at end of file +} diff --git a/server/modules/routing/debugcmd.c b/server/modules/routing/debugcmd.c index bf3aa888b..bab8c2f55 100644 --- a/server/modules/routing/debugcmd.c +++ b/server/modules/routing/debugcmd.c @@ -36,7 +36,10 @@ * Date Who Description * 20/06/13 Mark Riddoch Initial implementation * 17/07/13 Mark Riddoch Additional commands - * 09/08/2013 Massimiliano Pinto Addes enable/disable commands (now only for log) + * 09/08/2013 Massimiliano Pinto Added enable/disable commands (now only for log) + * 20/05/14 Mark Riddoch Added ability to give server and service names rather + * than simply addresses + * 23/05/14 Mark Riddoch Added support for developer and user modes * * @endverbatim */ @@ -69,6 +72,12 @@ #define ARG_TYPE_ADDRESS 1 #define ARG_TYPE_STRING 2 #define ARG_TYPE_SERVICE 3 +#define ARG_TYPE_SERVER 4 +#define ARG_TYPE_DBUSERS 5 +#define ARG_TYPE_SESSION 6 +#define ARG_TYPE_DCB 7 +#define ARG_TYPE_MONITOR 8 + /** * The subcommand structure * @@ -79,6 +88,7 @@ struct subcommand { int n_args; void (*fn)(); char *help; + char *devhelp; int arg_types[3]; }; @@ -87,33 +97,59 @@ static void telnetdShowUsers(DCB *); * The subcommands of the show command */ struct subcommand showoptions[] = { - { "dcbs", 0, dprintAllDCBs, "Show all descriptor control blocks (network connections)", + { "dcbs", 0, dprintAllDCBs, + "Show all descriptor control blocks (network connections)", + "Show all descriptor control blocks (network connections)", {0, 0, 0} }, - { "dcb", 1, dprintDCB, "Show a single descriptor control block e.g. show dcb 0x493340", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "dbusers", 1, dcb_usersPrint, "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers ", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "epoll", 0, dprintPollStats, "Show the poll statistics", + { "dcb", 1, dprintDCB, + "Show a single descriptor control block e.g. show dcb 0x493340", + "Show a single descriptor control block e.g. show dcb 0x493340", + {ARG_TYPE_DCB, 0, 0} }, + { "dbusers", 1, dcb_usersPrint, + "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers ", + "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers |", + {ARG_TYPE_DBUSERS, 0, 0} }, + { "epoll", 0, dprintPollStats, + "Show the poll statistics", + "Show the poll statistics", {0, 0, 0} }, - { "modules", 0, dprintAllModules, "Show all currently loaded modules", + { "modules", 0, dprintAllModules, + "Show all currently loaded modules", + "Show all currently loaded modules", {0, 0, 0} }, - { "monitors", 0, monitorShowAll, "Show the monitors that are configured", + { "monitors", 0, monitorShowAll, + "Show the monitors that are configured", + "Show the monitors that are configured", {0, 0, 0} }, - { "server", 1, dprintServer, "Show details for a server, e.g. show server 0x485390", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "servers", 0, dprintAllServers, "Show all configured servers", + { "server", 1, dprintServer, + "Show details for a named server, e.g. show server dbnode1", + "Show details for a server, e.g. show server 0x485390. The address may also be repalced with the server name from the configuration file", + {ARG_TYPE_SERVER, 0, 0} }, + { "servers", 0, dprintAllServers, + "Show all configured servers", + "Show all configured servers", {0, 0, 0} }, - { "services", 0, dprintAllServices, "Show all configured services in MaxScale", + { "services", 0, dprintAllServices, + "Show all configured services in MaxScale", + "Show all configured services in MaxScale", {0, 0, 0} }, - { "service", 1, dprintService, "Show single service in MaxScale", + { "service", 1, dprintService, + "Show a single service in MaxScale, may be passed a service name", + "Show a single service in MaxScale, may be passed a service name or address of a service object", {ARG_TYPE_SERVICE, 0, 0} }, - { "session", 1, dprintSession, "Show a single session in MaxScale, e.g. show session 0x284830", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "sessions", 0, dprintAllSessions, "Show all active sessions in MaxScale", + { "session", 1, dprintSession, + "Show a single session in MaxScale, e.g. show session 0x284830", + "Show a single session in MaxScale, e.g. show session 0x284830", + {ARG_TYPE_SESSION, 0, 0} }, + { "sessions", 0, dprintAllSessions, + "Show all active sessions in MaxScale", + "Show all active sessions in MaxScale", {0, 0, 0} }, - { "users", 0, telnetdShowUsers, "Show statistics and user names for the debug interface", - {ARG_TYPE_ADDRESS, 0, 0} }, - { NULL, 0, NULL, NULL, + { "users", 0, telnetdShowUsers, + "Show statistics and user names for the debug interface", + "Show statistics and user names for the debug interface", + {0, 0, 0} }, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -129,7 +165,7 @@ struct subcommand shutdownoptions[] = { 0, shutdown_server, "Shutdown MaxScale", - + "Shutdown MaxScale", {0, 0, 0} }, { @@ -137,13 +173,15 @@ struct subcommand shutdownoptions[] = { 1, shutdown_monitor, "Shutdown a monitor, e.g. shutdown monitor 0x48381e0", - {ARG_TYPE_ADDRESS, 0, 0} + "Shutdown a monitor, e.g. shutdown monitor 0x48381e0", + {ARG_TYPE_MONITOR, 0, 0} }, { "service", 1, shutdown_service, - "Shutdown a service, e.g. shutdown service 0x4838320", + "Shutdown a service, e.g. shutdown service \"Sales Database\"", + "Shutdown a service, e.g. shutdown service 0x4838320 or shutdown service \"Sales Database\"", {ARG_TYPE_SERVICE, 0, 0} }, { @@ -151,6 +189,7 @@ struct subcommand shutdownoptions[] = { 0, NULL, NULL, + NULL, {0, 0, 0} } }; @@ -162,11 +201,15 @@ static void restart_monitor(DCB *dcb, MONITOR *monitor); * The subcommands of the restart command */ struct subcommand restartoptions[] = { - { "monitor", 1, restart_monitor, "Restart a monitor, e.g. restart monitor 0x48181e0", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "service", 1, restart_service, "Restart a service, e.g. restart service name", + { "monitor", 1, restart_monitor, + "Restart a monitor, e.g. restart monitor 0x48181e0", + "Restart a monitor, e.g. restart monitor 0x48181e0", + {ARG_TYPE_MONITOR, 0, 0} }, + { "service", 1, restart_service, + "Restart a service, e.g. restart service \"Test Service\"", + "Restart a service, e.g. restart service 0x4838320", {ARG_TYPE_SERVICE, 0, 0} }, - { NULL, 0, NULL, NULL, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -175,9 +218,11 @@ static void set_server(DCB *dcb, SERVER *server, char *bit); * The subcommands of the set command */ struct subcommand setoptions[] = { - { "server", 2, set_server, "Set the status of a server. E.g. set server 0x4838320 master", - {ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} }, - { NULL, 0, NULL, NULL, + { "server", 2, set_server, + "Set the status of a server. E.g. set server dbnode4 master", + "Set the status of a server. E.g. set server 0x4838320 master", + {ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} }, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -186,9 +231,11 @@ static void clear_server(DCB *dcb, SERVER *server, char *bit); * The subcommands of the clear command */ struct subcommand clearoptions[] = { - { "server", 2, clear_server, "Clear the status of a server. E.g. clear server 0x4838320 master", - {ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} }, - { NULL, 0, NULL, NULL, + { "server", 2, clear_server, + "Clear the status of a server. E.g. clear server dbnode2 master", + "Clear the status of a server. E.g. clear server 0x4838320 master", + {ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} }, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -199,11 +246,15 @@ static void reload_config(DCB *dcb); * The subcommands of the reload command */ struct subcommand reloadoptions[] = { - { "config", 0, reload_config, "Reload the configuration data for MaxScale.", - {ARG_TYPE_ADDRESS, 0, 0} }, - { "dbusers", 1, reload_dbusers, "Reload the dbuser data for a service. E.g. reload dbusers 0x849420", - {ARG_TYPE_ADDRESS, 0, 0} }, - { NULL, 0, NULL, NULL, + { "config", 0, reload_config, + "Reload the configuration data for MaxScale.", + "Reload the configuration data for MaxScale.", + {0, 0, 0} }, + { "dbusers", 1, reload_dbusers, + "Reload the dbuser data for a service. E.g. reload dbusers \"splitter service\"", + "Reload the dbuser data for a service. E.g. reload dbusers 0x849420", + {ARG_TYPE_DBUSERS, 0, 0} }, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -220,6 +271,8 @@ struct subcommand enableoptions[] = { enable_log_action, "Enable Log options for MaxScale, options trace | error | " "message E.g. enable log message.", + "Enable Log options for MaxScale, options trace | error | " + "message E.g. enable log message.", {ARG_TYPE_STRING, 0, 0} }, { @@ -227,6 +280,7 @@ struct subcommand enableoptions[] = { 0, NULL, NULL, + NULL, {0, 0, 0} } }; @@ -242,6 +296,8 @@ struct subcommand disableoptions[] = { disable_log_action, "Disable Log for MaxScale, Options: debug | trace | error | message " "E.g. disable log debug", + "Disable Log for MaxScale, Options: debug | trace | error | message " + "E.g. disable log debug", {ARG_TYPE_STRING, 0, 0} }, { @@ -249,6 +305,7 @@ struct subcommand disableoptions[] = { 0, NULL, NULL, + NULL, {0, 0, 0} } }; @@ -267,6 +324,7 @@ struct subcommand failoptions[] = { 0, fail_backendfd, "Fail backend socket for next operation.", + "Fail backend socket for next operation.", {ARG_TYPE_STRING, 0, 0} }, { @@ -274,6 +332,7 @@ struct subcommand failoptions[] = { 0, fail_clientfd, "Fail client socket for next operation.", + "Fail client socket for next operation.", {ARG_TYPE_STRING, 0, 0} }, { @@ -281,6 +340,7 @@ struct subcommand failoptions[] = { 2, fail_accept, "Fail to accept next client connection.", + "Fail to accept next client connection.", {ARG_TYPE_STRING, ARG_TYPE_STRING, 0} }, { @@ -288,6 +348,7 @@ struct subcommand failoptions[] = { 0, NULL, NULL, + NULL, {0, 0, 0} } }; @@ -298,9 +359,11 @@ static void telnetdAddUser(DCB *, char *, char *); * The subcommands of the add command */ struct subcommand addoptions[] = { - { "user", 2, telnetdAddUser, "Add a new user for the debug interface. E.g. add user john today", + { "user", 2, telnetdAddUser, + "Add a new user for the debug interface. E.g. add user john today", + "Add a new user for the debug interface. E.g. add user john today", {ARG_TYPE_STRING, ARG_TYPE_STRING, 0} }, - { NULL, 0, NULL, NULL, + { NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -315,10 +378,11 @@ struct subcommand removeoptions[] = { 2, telnetdRemoveUser, "Remove existing maxscale user. Example : remove user john johnpwd", + "Remove existing maxscale user. Example : remove user john johnpwd", {ARG_TYPE_STRING, ARG_TYPE_STRING, 0} }, { - NULL, 0, NULL, NULL, {0, 0, 0} + NULL, 0, NULL, NULL, NULL, {0, 0, 0} } }; @@ -351,25 +415,55 @@ static struct { * Convert a string argument to a numeric, observing prefixes * for number bases, e.g. 0x for hex, 0 for octal * + * @param mode The CLI mode * @param arg The string representation of the argument * @param arg_type The target type for the argument * @return The argument as a long integer */ static unsigned long -convert_arg(char *arg, int arg_type) +convert_arg(int mode, char *arg, int arg_type) { unsigned long rval; +SERVICE *service; switch (arg_type) { - case ARG_TYPE_SERVICE: - if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0) - rval = (unsigned long)service_find(arg); - return rval; case ARG_TYPE_ADDRESS: return (unsigned long)strtol(arg, NULL, 0); case ARG_TYPE_STRING: return (unsigned long)arg; + case ARG_TYPE_SERVICE: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + rval = (unsigned long)service_find(arg); + return rval; + case ARG_TYPE_SERVER: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + rval = (unsigned long)server_find_by_unique_name(arg); + return rval; + case ARG_TYPE_DBUSERS: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + { + service = service_find(arg); + if (service) + return (unsigned long)(service->users); + else + return 0; + } + return rval; + case ARG_TYPE_DCB: + rval = (unsigned long)strtol(arg, NULL, 0); + if (mode == CLIM_USER && dcb_isvalid((DCB *)rval) == 0) + rval = 0; + return rval; + case ARG_TYPE_SESSION: + rval = (unsigned long)strtol(arg, NULL, 0); + if (mode == CLIM_USER && session_isvalid((SESSION *)rval) == 0) + rval = 0; + return rval; + case ARG_TYPE_MONITOR: + if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0) + rval = (unsigned long)monitor_find(arg); + return rval; } return 0; } @@ -396,33 +490,94 @@ int argc, i, j, found = 0; char *args[MAXARGS]; char *saveptr, *delim = " \t\r\n"; unsigned long arg1, arg2, arg3; +int in_quotes = 0, escape_next = 0; +char *ptr, *lptr; - /* Tokenize the input string */ - args[0] = strtok_r(cli->cmdbuf, delim, &saveptr); + args[0] = cli->cmdbuf; + ptr = args[0]; + lptr = ptr; i = 0; - do { - i++; - args[i] = strtok_r(NULL, delim, &saveptr); - } while (args[i] != NULL && i < MAXARGS); + /* + * Break the command line into a number of words. Whitespace is used + * to delimit words and may be escaped by use of the \ character or + * the use of double quotes. + * The array args contains the broken down words, one per index. + */ + while (*ptr) + { + if (escape_next) + { + *lptr++ = *ptr++; + escape_next = 0; + } + else if (*ptr == '\\') + { + escape_next = 1; + ptr++; + } + else if (in_quotes == 0 && (*ptr == ' ' || *ptr == '\t' || *ptr == '\r' || *ptr == '\n')) + { + *lptr = 0; + if (args[i] == ptr) + args[i] = ptr + 1; + else + { + i++; + if (i >= MAXARGS) + break; + args[i] = ptr + 1; + } + ptr++; + lptr++; + } + else if (*ptr == '\"' && in_quotes == 0) + { + in_quotes = 1; + ptr++; + } + else if (*ptr == '\"' && in_quotes == 1) + { + in_quotes = 0; + ptr++; + } + else + { + *lptr++ = *ptr++; + } + } + *lptr = 0; + args[i+1] = NULL; - if (args[0] == NULL) + if (args[0] == NULL || *args[0] == 0) return 1; argc = i - 2; /* The number of extra arguments to commands */ if (!strcasecmp(args[0], "help")) { - if (args[1] == NULL) + if (args[1] == NULL || *args[1] == 0) { found = 1; dcb_printf(dcb, "Available commands:\n"); for (i = 0; cmds[i].cmd; i++) { - for (j = 0; cmds[i].options[j].arg1; j++) + if (cmds[i].options[1].arg1 == NULL) + dcb_printf(dcb, " %s %s\n", cmds[i].cmd, cmds[i].options[0].arg1); + else { - dcb_printf(dcb, " %s %s\n", cmds[i].cmd, cmds[i].options[j].arg1); + dcb_printf(dcb, " %s [", cmds[i].cmd); + for (j = 0; cmds[i].options[j].arg1; j++) + { + dcb_printf(dcb, "%s%s", cmds[i].options[j].arg1, + cmds[i].options[j+1].arg1 ? "|" : ""); + } + dcb_printf(dcb, "]\n"); } } + dcb_printf(dcb, "\nType help command to see details of each command.\n"); + dcb_printf(dcb, "Where commands require names as arguments and these names contain\n"); + dcb_printf(dcb, "whitespace either the \\ character may be used to escape the whitespace\n"); + dcb_printf(dcb, "or the name may be enclosed in double quotes \".\n\n"); } else { @@ -458,9 +613,9 @@ unsigned long arg1, arg2, arg3; { for (j = 0; cmds[i].options[j].arg1; j++) { - found = 1; /**< command and sub-command match */ if (strcasecmp(args[1], cmds[i].options[j].arg1) == 0) { + found = 1; /**< command and sub-command match */ if (argc != cmds[i].options[j].n_args) { dcb_printf(dcb, "Incorrect number of arguments: %s %s expects %d arguments\n", @@ -476,7 +631,7 @@ unsigned long arg1, arg2, arg3; cmds[i].options[j].fn(dcb); break; case 1: - arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]); + arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]); if (arg1) cmds[i].options[j].fn(dcb, arg1); else @@ -484,8 +639,8 @@ unsigned long arg1, arg2, arg3; args[2]); break; case 2: - arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]); - arg2 = convert_arg(args[3],cmds[i].options[j].arg_types[1]); + arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]); + arg2 = convert_arg(cli->mode, args[3],cmds[i].options[j].arg_types[1]); if (arg1 && arg2) cmds[i].options[j].fn(dcb, arg1, arg2); else if (arg1 == 0) @@ -496,9 +651,9 @@ unsigned long arg1, arg2, arg3; args[3]); break; case 3: - arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]); - arg2 = convert_arg(args[3],cmds[i].options[j].arg_types[1]); - arg3 = convert_arg(args[4],cmds[i].options[j].arg_types[2]); + arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]); + arg2 = convert_arg(cli->mode, args[3],cmds[i].options[j].arg_types[1]); + arg3 = convert_arg(cli->mode, args[4],cmds[i].options[j].arg_types[2]); if (arg1 && arg2 && arg3) cmds[i].options[j].fn(dcb, arg1, arg2, arg3); else if (arg1 == 0) diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index f587b2c63..2bfba7efc 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -252,10 +252,12 @@ int i, n; } else { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "Warning : Unsupported router " - "option %s for readconnroute.", + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced]", options[i]))); } } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 614c5c3e8..a203a9865 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,12 +70,46 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); +int bref_cmp_global_conn( + const void* bref1, + const void* bref2); -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, +int bref_cmp_router_conn( + const void* bref1, + const void* bref2); + +int bref_cmp_behind_master( + const void* bref1, + const void* bref2); + +int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= +{ + NULL, + bref_cmp_global_conn, + bref_cmp_router_conn, + bref_cmp_behind_master +}; + +static bool select_connect_backend_servers( + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, + int router_nservers, + int max_nslaves, + select_criteria_t select_criteria, + SESSION* session, ROUTER_INSTANCE* router); +static bool get_dcb( + DCB** dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype); + +static void rwsplit_process_options( + ROUTER_INSTANCE* router, + char** options); + + + static ROUTER_OBJECT MyObject = { createInstance, newSession, @@ -118,13 +152,8 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); + backend_ref_t* backend_ref); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -147,14 +176,10 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf); static bool route_session_write( @@ -164,6 +189,10 @@ static bool route_session_write( unsigned char packet_type, skygw_query_type_t qtype); +static void refreshInstance( + ROUTER_INSTANCE* router, + CONFIG_PARAMETER* param); + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -205,6 +234,35 @@ ROUTER_OBJECT* GetModuleObject() return &MyObject; } +static void refreshInstance( + ROUTER_INSTANCE* router, + CONFIG_PARAMETER* param) +{ + config_param_type_t paramtype; + + paramtype = config_get_paramtype(param); + + if (paramtype == COUNT_TYPE) + { + if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) + { + router->rwsplit_config.rw_max_slave_conn_percent = 0; + router->rwsplit_config.rw_max_slave_conn_count = + config_get_valint(param, NULL, paramtype); + } + } + else if (paramtype == PERCENT_TYPE) + { + if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) + { + router->rwsplit_config.rw_max_slave_conn_count = 0; + router->rwsplit_config.rw_max_slave_conn_percent = + config_get_valint(param, NULL, paramtype); + } + } +} + + /** * Create an instance of read/write statemtn router within the MaxScale. * @@ -220,8 +278,9 @@ static ROUTER* createInstance( { ROUTER_INSTANCE* router; SERVER* server; - int n; + int nservers; int i; + CONFIG_PARAMETER* param; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -231,49 +290,50 @@ static ROUTER* createInstance( /** Calculate number of servers */ server = service->databases; + nservers = 0; - for (n=0; server != NULL; server=server->nextdb) { - n++; + while (server != NULL) + { + nservers++; + server=server->nextdb; } - router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *)); if (router->servers == NULL) { free(router); return NULL; } - - if (options != NULL) - { - LOGIF(LM, (skygw_log_write_flush( - LOGFILE_MESSAGE, - "Router options supplied to read/write statement router " - "module but none are supported. The options will be " - "ignored."))); - } /** * Create an array of the backend servers in the router structure to * maintain a count of the number of connections to each * backend server. */ server = service->databases; - n = 0; + nservers= 0; + while (server != NULL) { - if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL) + if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL) { - for (i = 0; i < n; i++) { + /** clean up */ + for (i = 0; i < nservers; i++) { free(router->servers[i]); } free(router->servers); free(router); return NULL; } - router->servers[n]->backend_server = server; - router->servers[n]->backend_conn_count = 0; - n += 1; + router->servers[nservers]->backend_server = server; + router->servers[nservers]->backend_conn_count = 0; + router->servers[nservers]->be_valid = false; +#if defined(SS_DEBUG) + router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; + router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; +#endif + nservers += 1; server = server->nextdb; } - router->servers[n] = NULL; + router->servers[nservers] = NULL; /** * vraa : is this necessary for readwritesplit ? @@ -288,25 +348,32 @@ static ROUTER* createInstance( router->bitvalue = 0; if (options) { - for (i = 0; options[i]; i++) - { - if (!strcasecmp(options[i], "synced")) - { - router->bitmask |= (SERVER_JOINED); - router->bitvalue |= SERVER_JOINED; - } - else - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : Unsupported " - "router option \"%s\" " - "for readwritesplit router.", - options[i]))); - } - } + rwsplit_process_options(router, options); } + /** + * Set default value for max_slave_connections and for slave selection + * criteria. If parameter is set in config file max_slave_connections + * will be overwritten. + */ + router->rwsplit_config.rw_max_slave_conn_count = CONFIG_MAX_SLAVE_CONN; + + if (router->rwsplit_config.rw_slave_select_criteria == UNDEFINED_CRITERIA) + { + router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA; + } + /** + * Copy all config parameters from service to router instance. + * Finally, copy version number to indicate that configs match. + */ + param = config_get_param(service->svc_config_param, "max_slave_connections"); + + if (param != NULL) + { + refreshInstance(router, param); + router->rwsplit_version = service->svc_config_version; + } + /** * We have completed the creation of the router data, so now * insert this router into the linked list of routers * that have been created with this module. @@ -333,93 +400,192 @@ static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND* local_backend[BE_COUNT]; - ROUTER_CLIENT_SES* client_rses; + backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ + BACKEND** b; + ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; + int router_nservers = 0; /*< # of servers in total */ + int max_nslaves; /*< max # of slaves used in this session */ + int conf_max_nslaves; /*< value from configuration file */ + int i; + const int min_nservers = 1; /*< hard-coded for now */ + static uint64_t router_client_ses_seq; /*< ID for client session */ - client_rses = - (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) { ss_dassert(false); - return NULL; + goto return_rses; } - memset(local_backend, 0, BE_COUNT*sizeof(void*)); - spinlock_init(&client_rses->rses_lock); #if defined(SS_DEBUG) client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; -#endif - /** store pointers to sescmd list to both cursors */ - client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; - client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER; - - client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; - +#endif /** - * Find a backend server to connect to. This is the extent of the - * load balancing algorithm we need to implement for this simple - * connection router. + * If service config has been changed, reload config from service to + * router instance first. */ - succp = search_backend_servers(&local_backend[BE_MASTER], - &local_backend[BE_SLAVE], - router); + spinlock_acquire(&router->lock); + if (router->service->svc_config_version > router->rwsplit_version) + { + CONFIG_PARAMETER* param = router->service->svc_config_param; + + while (param != NULL) + { + refreshInstance(router, param); + param = param->next; + } + router->rwsplit_version = router->service->svc_config_version; + /** Read options */ + rwsplit_process_options(router, router->service->routerOptions); + } + /** Copy config struct from router instance */ + client_rses->rses_config = router->rwsplit_config; + /** Create ID for the new client (router_client_ses) session */ + client_rses->rses_id = router_client_ses_seq += 1; + + spinlock_release(&router->lock); + /** + * Set defaults to session variables. + */ + client_rses->rses_autocommit_enabled = true; + client_rses->rses_transaction_active = false; + + /** count servers */ + b = router->servers; + while (*(b++) != NULL) router_nservers++; + + /** With too few servers session is not created */ + if (router_nservers < min_nservers || + MAX(client_rses->rses_config.rw_max_slave_conn_count, + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100) + < min_nservers) + { + if (router_nservers < min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers available. Found %d " + "when %d is required.", + router->service->name, + router_nservers, + min_nservers))); + } + else + { + double pct = client_rses->rses_config.rw_max_slave_conn_percent/100; + double nservers = (double)router_nservers*pct; - /** Both Master and Slave must be found */ - if (!succp) { + if (client_rses->rses_config.rw_max_slave_conn_count < + min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d when %d is required.", + router->service->name, + client_rses->rses_config.rw_max_slave_conn_count, + min_nservers))); + } + if (nservers < min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d%% when at least %.0f%% " + "would be required.", + router->service->name, + client_rses->rses_config.rw_max_slave_conn_percent, + min_nservers/(((double)router_nservers)/100)))); + } + } free(client_rses); - return NULL; + client_rses = NULL; + goto return_rses; } /** - * Open the slave connection. + * Create backend reference objects for this session. */ - client_rses->rses_dcb[BE_SLAVE] = dcb_connect( - local_backend[BE_SLAVE]->backend_server, - session, - local_backend[BE_SLAVE]->backend_server->protocol); + backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); - if (client_rses->rses_dcb[BE_SLAVE] == NULL) { - ss_dassert(session->refcount == 1); + if (backend_ref == NULL) + { + /** log this */ free(client_rses); - return NULL; + free(backend_ref); + client_rses = NULL; + goto return_rses; } /** - * Open the master connection. + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. */ - client_rses->rses_dcb[BE_MASTER] = dcb_connect( - local_backend[BE_MASTER]->backend_server, - session, - local_backend[BE_MASTER]->backend_server->protocol); - if (client_rses->rses_dcb[BE_MASTER] == NULL) + for (i=0; i< router_nservers; i++) { - /** Close slave connection first. */ - client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]); - free(client_rses); - return NULL; - } - /** - * We now have a master and a slave server with the least connections. - * Bump the connection counts for these servers. +#if defined(SS_DEBUG) + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + backend_ref[i].bref_backend = router->servers[i]; + /** store pointers to sescmd list to both cursors */ + backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; + backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; + backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + } + /** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. */ - atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1); - atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1); + if (client_rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + spinlock_init(&client_rses->rses_lock); + client_rses->rses_backend_ref = backend_ref; - client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE]; - client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; + /** + * Find a backend servers to connect to. + */ + succp = select_connect_backend_servers(&master_ref, + backend_ref, + router_nservers, + max_nslaves, + client_rses->rses_config.rw_slave_select_criteria, + session, + router); + + /** Both Master and at least 1 slave must be found */ + if (!succp) { + free(client_rses->rses_backend_ref); + free(client_rses); + client_rses = NULL; + goto return_rses; + } + /** Copy backend pointers to router session. */ + client_rses->rses_master_ref = master_ref; + client_rses->rses_backend_ref = backend_ref; + client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ + client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; - client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; /** * Version is bigger than zero once initialized. */ @@ -433,8 +599,13 @@ static void* newSession( router->connections = client_rses; spinlock_release(&router->lock); +return_rses: +#if defined(SS_DEBUG) + if (client_rses != NULL) + { CHK_CLIENT_RSES(client_rses); - + } +#endif return (void *)client_rses; } @@ -452,41 +623,53 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - DCB* slave_dcb; - DCB* master_dcb; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); + + backend_ref = router_cli_ses->rses_backend_ref; /** * Lock router client session for secure read and update. */ - if (rses_begin_locked_router_action(router_cli_ses)) + if (!router_cli_ses->rses_closed && + rses_begin_locked_router_action(router_cli_ses)) { - /** decrease server current connection counters */ - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1); - - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - router_cli_ses->rses_dcb[BE_SLAVE] = NULL; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - router_cli_ses->rses_dcb[BE_MASTER] = NULL; + int i = 0; + /** + * session must be moved to SESSION_STATE_STOPPING state before + * router session is closed. + */ +#if defined(SS_DEBUG) + SESSION* ses = get_session_by_router_ses((void*)router_cli_ses); - router_cli_ses->rses_closed = true; - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); + ss_dassert(ses != NULL); + ss_dassert(ses->state == SESSION_STATE_STOPPING); +#endif /** - * Close the backend server connections + * This sets router closed. Nobody is allowed to use router + * whithout checking this first. */ - if (slave_dcb != NULL) { - CHK_DCB(slave_dcb); - slave_dcb->func.close(slave_dcb); + router_cli_ses->rses_closed = true; + + for (i=0; irses_nbackends; i++) + { + DCB* dcb = backend_ref[i].bref_dcb; + + /** Close those which had been connected */ + if (dcb != NULL) + { + CHK_DCB(dcb); + backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */ + dcb->func.close(dcb); + /** decrease server current connection counters */ + atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } - - if (master_dcb != NULL) { - master_dcb->func.close(master_dcb); - CHK_DCB(master_dcb); } + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); } } @@ -497,13 +680,21 @@ static void freeSession( ROUTER_CLIENT_SES* router_cli_ses; ROUTER_INSTANCE* router; int i; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; router = (ROUTER_INSTANCE *)router_instance; + backend_ref = router_cli_ses->rses_backend_ref; - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1); - + for (i=0; irses_nbackends; i++) + { + if (backend_ref[i].bref_dcb == NULL) + { + continue; + } + ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } spinlock_acquire(&router->lock); if (router->connections == router_cli_ses) { @@ -542,17 +733,106 @@ static void freeSession( * all the memory and other resources associated * to the client session. */ + free(router_cli_ses->rses_backend_ref); free(router_cli_ses); return; } + +static bool get_dcb( + DCB** p_dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype) +{ + backend_ref_t* backend_ref; + int smallest_nconn = -1; + int i; + bool succp = false; + + CHK_CLIENT_RSES(rses); + ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); + + if (p_dcb == NULL) + { + goto return_succp; + } + backend_ref = rses->rses_backend_ref; + + if (btype == BE_SLAVE) + { + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + SERVER_IS_SLAVE(b->backend_server) && + (smallest_nconn == -1 || + b->backend_conn_count < smallest_nconn)) + { + *p_dcb = backend_ref[i].bref_dcb; + smallest_nconn = b->backend_conn_count; + succp = true; + } + } + + if (!succp) + { + backend_ref = rses->rses_master_ref; + + if (backend_ref->bref_dcb != NULL) + { + *p_dcb = backend_ref->bref_dcb; + succp = true; + + ss_dassert( + SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) && + smallest_nconn == -1); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Warning : No slaves connected nor " + "available. Choosing master %s:%d " + "instead.", + backend_ref->bref_backend->backend_server->name, + backend_ref->bref_backend->backend_server->port))); + } + } + ss_dassert(succp); + } + else if (btype == BE_MASTER || BE_JOINED) + { + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) + { + *p_dcb = backend_ref[i].bref_dcb; + succp = true; + goto return_succp; + } + } + } +return_succp: + return succp; +} + /** - * It is assumed that whole query comes in a single gwbuf instead of linked list. - * + * The main routing entry, this is called with every packet that is + * received and has to be forwarded to the backend database. * - * @param instance The router instance - * @param router_session The session associated with the client - * @param querybuf Gateway buffer queue with the packets received + * The routeQuery will make the routing decision based on the contents + * of the instance, session and the query itself in the queue. The + * data in the queue may not represent a complete query, it represents + * the data that has been received. The query router itself is responsible + * for buffering the partial query, a later call to the query router will + * contain the remainder, or part thereof of the query. + * + * @param instance The query router instance + * @param session The session associated with the client + * @param queue Gateway buffer queue with the packets received * * @return The number of queries forwarded */ @@ -572,7 +852,7 @@ static int routeQuery( DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - bool rses_is_closed; + bool rses_is_closed = false; size_t len; CHK_CLIENT_RSES(router_cli_ses); @@ -582,27 +862,13 @@ static int routeQuery( { rses_is_closed = true; } - else - { - /*< Lock router client session for secure read of DCBs */ - rses_is_closed = - !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - packet = GWBUF_DATA(querybuf); packet_type = packet[4]; - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + if (rses_is_closed) { - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, + (skygw_log_write_flush( LOGFILE_ERROR, "Error: Failed to route %s:%s:\"%s\" to " "backend server. %s.", @@ -617,6 +883,9 @@ static int routeQuery( inst->stats.n_queries++; startpos = (char *)&packet[5]; + master_dcb = router_cli_ses->rses_master_ref->bref_dcb; + CHK_DCB(master_dcb); + switch(packet_type) { case COM_QUIT: /**< 1 QUIT will close all sessions */ case COM_INIT_DB: /**< 2 DDL must go to the master */ @@ -642,10 +911,11 @@ static int routeQuery( memset(&querystr[len], 0, 1); // querystr = (char *)GWBUF_DATA(plainsqlbuf); /* - querystr = master_dcb->func.getquerystr( - (void *) gwbuf_clone(querybuf), - &querystr_is_copy); + * querystr = master_dcb->func.getquerystr( + * (void *) gwbuf_clone(querybuf), + * &querystr_is_copy); */ + qtype = skygw_query_classifier_get_type(querystr, 0); break; @@ -660,13 +930,23 @@ static int routeQuery( default: break; } /**< switch by packet type */ - +#if 0 LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr == NULL ? "(empty)" : querystr))); LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)))); +#endif +#if defined(AUTOCOMMIT_OPT) + if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && + !router_cli_ses->rses_autocommit_enabled) || + (QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) && + router_cli_ses->rses_autocommit_enabled)) + { + /** reply directly to client */ + } +#endif /** * If autocommit is disabled or transaction is explicitly started * transaction becomes active and master gets all statements until @@ -725,19 +1005,46 @@ static int routeQuery( } else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !router_cli_ses->rses_transaction_active) - { + { + bool succp; + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "Read-only query, routing to Slave."))); + "[%s.%d]\tRead-only query, routing to Slave.", + inst->service->name, + router_cli_ses->rses_id))); ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); + succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE); + if (succp) + { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + + if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_slave, 1); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + querystr))); + } + rses_end_locked_router_action(router_cli_ses); + } + ss_dassert(succp); goto return_ret; } else - { + { + bool succp = true; + if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */ @@ -754,12 +1061,29 @@ static int routeQuery( "routing to Master."))); } } - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + if (master_dcb == NULL) + { + succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); + } + if (succp) + { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + + if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_master, 1); + } + rses_end_locked_router_action(router_cli_ses); + } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - return_ret: if (plainsqlbuf != NULL) { @@ -896,12 +1220,11 @@ static void clientReply( GWBUF* writebuf, DCB* backend_dcb) { - DCB* master_dcb; - DCB* slave_dcb; DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type = BE_UNDEFINED; + backend_ref_t* backend_ref; + int i; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -917,9 +1240,6 @@ static void clientReply( GWBUF_LENGTH(writebuf))) != NULL); goto lock_failed; } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** Holding lock ensures that router session remains open */ ss_dassert(backend_dcb->session != NULL); client_dcb = backend_dcb->session->client; @@ -941,29 +1261,31 @@ static void clientReply( writebuf, GWBUF_LENGTH(writebuf))) != NULL); /** Log that client was closed before reply */ - return; + goto lock_failed; } - - if (backend_dcb == master_dcb) - { - be_type = BE_MASTER; - } - else if (backend_dcb == slave_dcb) - { - be_type = BE_SLAVE; - } - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "reply_by_statement", - backend_dcb, - gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } + backend_ref = router_cli_ses->rses_backend_ref; - scur = rses_get_sescmd_cursor(router_cli_ses, be_type); + /** find backend_dcb's corresponding BACKEND */ + i = 0; + while (irses_nbackends && + backend_ref[i].bref_dcb != backend_dcb) + { + i++; + } + ss_dassert(backend_ref[i].bref_dcb == backend_dcb); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + &backend_ref[i], + gwbuf_clone(writebuf))); + + scur = &backend_ref[i].bref_sescmd_cur; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -996,181 +1318,420 @@ lock_failed: return; } + +int bref_cmp_router_conn( + const void* bref1, + const void* bref2) +{ + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : + ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); +} + +int bref_cmp_global_conn( + const void* bref1, + const void* bref2) +{ + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((b1->backend_server->stats.n_current < b2->backend_server->stats.n_current) ? -1 : + ((b1->backend_server->stats.n_current > b2->backend_server->stats.n_current) ? 1 : 0)); +} + + +int bref_cmp_behind_master( + const void* bref1, + const void* bref2) +{ + return 1; +} + /** - * @node Search suitable backend server from those of router instance. + * @node Search suitable backend servers from those of router instance. * * Parameters: - * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. - * If NULL, then master is not searched. + * @param p_master_ref - in, use, out + * Pointer to location where master's backend reference is to be stored. + * NULL is not allowed. * - * @param p_slave - in, use, out - * Pointer to location where slave's address is to be stored. - * if NULL, then slave is not searched. + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. + * NULL is not allowed. * - * @param inst - in, use - * Pointer to router instance + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. * - * @return true, if all what what requested found, false if the request - * was not satisfied or was partially satisfied. + * @param max_nslaves - in, use + * Upper limit for the number of slaves. Configuration parameter or default. + * + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. * * * @details It is assumed that there is only one master among servers of - * a router instance. As a result, thr first master is always chosen. + * a router instance. As a result, the first master found is chosen. */ -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, +static bool select_connect_backend_servers( + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, + int router_nservers, + int max_nslaves, + select_criteria_t select_criteria, + SESSION* session, ROUTER_INSTANCE* router) { - BACKEND* local_backend[BE_COUNT] = {NULL,NULL}; + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; int i; - bool succp = true; - - /* - * Loop over all the servers and find any that have fewer connections - * than current candidate server. - * - * If a server has less connections than the current candidate it is - * chosen to a new candidate. - * - * If a server has the same number of connections currently as the - * candidate and has had less connections over time than the candidate - * it will also become the new candidate. This has the effect of - * spreading the connections over different servers during periods of - * very low load. - * - * If master is searched for, the first master found is chosen. + const int min_nslaves = 0; /*< not configurable at the time */ + bool is_synced_master; + int (*p)(const void *, const void *); + + if (p_master_ref == NULL || backend_ref == NULL) + { + ss_dassert(FALSE); + succp = false; + goto return_succp; + } + /** Check slave selection criteria and set compare function */ + p = criteria_cmpfun[select_criteria]; + + if (p == NULL) + { + succp = false; + goto return_succp; + } + + if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ + { + is_synced_master = true; + } + else + { + is_synced_master = false; + } + +#if 0 + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_conn_count))); + } +#endif + /** + * Sort the pointer list to servers according to connection counts. As + * a consequence those backends having least connections are in the + * beginning of the list. */ - for (i = 0; router->servers[i] != NULL; i++) { - BACKEND* be = router->servers[i]; + qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p); + + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + if (select_criteria == LEAST_GLOBAL_CONNECTIONS || + select_criteria == LEAST_ROUTER_CONNECTIONS) + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Servers and %s connection counts:", + select_criteria == LEAST_GLOBAL_CONNECTIONS ? + "all MaxScale" : "router"))); + + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_server->stats.n_current))); + break; + + case LEAST_ROUTER_CONNECTIONS: + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "%s %d:%d", + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count))); + break; + + default: + break; + } + } + } + } + /** + * Choose at least 1+1 (master and slave) and at most 1+max_nslaves + * servers from the sorted list. First master found is selected. + */ + for (i=0; + ibitvalue is %d", pthread_self(), - be->backend_server->name, - be->backend_server->port, - be->backend_conn_count, - be->backend_server->status, + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count, + b->backend_server->status, router->bitmask))); - } - if (be != NULL && - SERVER_IS_RUNNING(be->backend_server) && - (be->backend_server->status & router->bitmask) == - router->bitvalue) + if (SERVER_IS_RUNNING(b->backend_server) && + ((b->backend_server->status & router->bitmask) == + router->bitvalue)) + { + if (slaves_found < max_nslaves && + SERVER_IS_SLAVE(b->backend_server)) { - if (SERVER_IS_SLAVE(be->backend_server) && - p_slave != NULL) + slaves_found += 1; + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) { - /** - * If no candidate set, set first running - * server as an initial candidate server. - */ - if (local_backend[BE_SLAVE] == NULL) - { - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count < - local_backend[BE_SLAVE]->backend_conn_count) - { - /** - * This running server has fewer - * connections, set it as a new - * candidate. + slaves_connected += 1; + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. */ - local_backend[BE_SLAVE] = be; + atomic_add(&b->backend_conn_count, 1); } - else if (be->backend_conn_count == - local_backend[BE_SLAVE]->backend_conn_count && - be->backend_server->stats.n_connections < - local_backend[BE_SLAVE]->backend_server->stats.n_connections) + else { - /** - * This running server has the same - * number of connections currently - * as the candidate but has had - * fewer connections over time - * than candidate, set this server - * to candidate. - */ - local_backend[BE_SLAVE] = be; - } - } - else if (p_master != NULL && - local_backend[BE_MASTER] == NULL && - SERVER_IS_MASTER(be->backend_server)) - { - local_backend[BE_MASTER] = be; - } - else if (p_master != NULL && - local_backend[BE_JOINED] == NULL && - SERVER_IS_JOINED(be->backend_server)) - { - local_backend[BE_JOINED] = be; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with slave %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ } + } + else if (!master_connected && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) + { + master_found = true; + + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) + { + master_connected = true; + *p_master_ref = &backend_ref[i]; + /** Increase backend connection counter */ + /** Increase backend connection counter */ + atomic_add(&b->backend_server->stats.n_current, 1); + atomic_add(&b->backend_server->stats.n_connections, 1); + atomic_add(&b->backend_conn_count, 1); + } + else + { + succp = false; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with master %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ + } + } } - } + } /*< for */ - if (router->bitvalue != 0 && - p_master != NULL && - local_backend[BE_JOINED] == NULL) + /** + * Successful cases + */ + if (master_connected && + slaves_connected >= min_nslaves && + slaves_connected <= max_nslaves) + { + succp = true; + + if (slaves_connected == 0 && slaves_found > 0) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Error : Couldn't find a Joined Galera node from %d " - "candidates.", - i))); - goto return_succp; - } + "Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); - if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); - } - - if (p_master != NULL && local_backend[BE_MASTER] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Master from %d " - "candidates.", - i))); + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); } - - if (local_backend[BE_SLAVE] != NULL) { - *p_slave = local_backend[BE_SLAVE]; + else if (slaves_found == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_connected < max_nslaves) + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Note : Couldn't connect to maximum number of " + "slaves. Connected successfully to %d slaves " + "of %d of them.", + slaves_connected, + slaves_found))); + } + + if (LOG_IS_ENABLED(LT)) + { + for (i=0; ibackend_server->name, + b->backend_server->port))); + } + } /* for */ + } + } + /** + * Failure cases + */ + else + { + if (!master_found) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); } - if (local_backend[BE_MASTER] != NULL) { - *p_master = local_backend[BE_MASTER]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); + else if (!master_connected) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + } + + if (slaves_connected < min_nslaves) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't establish required amount of " + "slave connections for router session."))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "*Error : Couldn't establish required amount of " + "slave connections for router session."))); + } + + /** Clean up connections */ + for (i=0; ibackend_conn_count > 0); + /** disconnect opened connections */ + backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } + } + master_connected = false; + slaves_connected = 0; } return_succp: + return succp; } @@ -1386,7 +1947,7 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - +/* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] cmd %p " @@ -1396,11 +1957,13 @@ static GWBUF* sescmd_cursor_process_replies( scmd, packetlen+headerlen, STRBETYPE(scur->scmd_cur_be_type)))); + */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; + /* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] Marked " @@ -1409,6 +1972,7 @@ static GWBUF* sescmd_cursor_process_replies( pthread_self(), scmd, STRBETYPE(scur->scmd_cur_be_type)))); + */ } if (sescmd_cursor_next(scur)) @@ -1448,17 +2012,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command( return scmd; } -/** router must be locked */ -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) -{ - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - return &rses->rses_cursor[be_type]; -} - /** router must be locked */ static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor) @@ -1508,24 +2061,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) + backend_ref_t* backend_ref) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - dcb = rses->rses_dcb[be_type]; + if (backend_ref->bref_dcb == NULL) + { + goto return_succp; + } + dcb = backend_ref->bref_dcb; CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + CHK_BACKEND_REF(backend_ref); /** * Get cursor pointer and copy of command buffer to cursor. */ - scur = rses_get_sescmd_cursor(rses, be_type); + scur = &backend_ref->bref_sescmd_cur; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1539,9 +2094,10 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } - LOGIF(LT, tracelog_routed_query(rses, + + LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses, "execute_sescmd_in_backend", - dcb, + backend_ref, sescmd_cursor_clone_querybuf(scur))); switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { @@ -1553,7 +2109,6 @@ static bool execute_sescmd_in_backend( sescmd_cursor_clone_querybuf(scur)); break; - case COM_QUIT: case COM_QUERY: case COM_INIT_DB: default: @@ -1658,7 +2213,7 @@ static rses_property_t* mysql_sescmd_get_property( static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf) { uint8_t* packet = GWBUF_DATA(buf); @@ -1667,20 +2222,18 @@ static void tracelog_routed_query( size_t buflen = GWBUF_LENGTH(buf); char* querystr; char* startpos = (char *)&packet[5]; + BACKEND* b; backend_type_t be_type; + DCB* dcb; + + CHK_BACKEND_REF(bref); + b = bref->bref_backend; + CHK_BACKEND(b); + dcb = bref->bref_dcb; + CHK_DCB(dcb); + + be_type = BACKEND_TYPE(b); - if (rses->rses_dcb[BE_MASTER] == dcb) - { - be_type = BE_MASTER; - } - else if (rses->rses_dcb[BE_SLAVE] == dcb) - { - be_type = BE_SLAVE; - } - else - { - be_type = BE_UNDEFINED; - } if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL) { len = packet[0]; @@ -1699,16 +2252,8 @@ static void tracelog_routed_query( funcname, buflen, querystr, - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->name : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->name : - "Target DCB is neither of the backends. This is error")), - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->port : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->port : - -1)), + b->backend_server->name, + b->backend_server->port, STRBETYPE(be_type), dcb))); free(querystr); @@ -1762,20 +2307,19 @@ static bool route_session_write( skygw_query_type_t qtype) { bool succp; - DCB* master_dcb; - DCB* slave_dcb; rses_property_t* prop; + backend_ref_t* backend_ref; + int i; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - CHK_DCB(master_dcb); - CHK_DCB(slave_dcb); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " "routing to all servers.", STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + + backend_ref = router_cli_ses->rses_backend_ref; + /** * COM_QUIT is one-way message. Server doesn't respond to that. * Therefore reply processing is unnecessary and session @@ -1785,15 +2329,32 @@ static bool route_session_write( if (packet_type == COM_QUIT) { int rc; - int rc2; - rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - rc2 = slave_dcb->func.write(slave_dcb, querybuf); - - if (rc == 1 && rc == rc2) - { - succp = true; + succp = true; + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + succp = false; + goto return_succp; } + + for (i=0; irses_nbackends; i++) + { + DCB* dcb = backend_ref[i].bref_dcb; + + if (dcb != NULL) + { + rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); + + if (rc != 1) + { + succp = false; + } + } + } + rses_end_locked_router_action(router_cli_ses); + gwbuf_free(querybuf); goto return_succp; } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); @@ -1814,8 +2375,9 @@ static bool route_session_write( /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - /** Execute session command in master */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); + for (i=0; irses_nbackends; i++) + { + succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) { @@ -1823,13 +2385,6 @@ static bool route_session_write( rses_end_locked_router_action(router_cli_ses); goto return_succp; } - /** Execute session command in slave */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); - if (!succp) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1840,3 +2395,52 @@ return_succp: return succp; } +static void rwsplit_process_options( + ROUTER_INSTANCE* router, + char** options) +{ + int i; + char* value; + select_criteria_t c; + + for (i = 0; options[i]; i++) + { + if ((value = strchr(options[i], '=')) == NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unsupported " + "router option \"%s\" for " + "readwritesplit router.", + options[i]))); + } + else + { + *value = 0; + value++; + if (strcmp(options[i], "slave_selection_criteria") == 0) + { + c = GET_SELECT_CRITERIA(value); + ss_dassert( + c == LEAST_GLOBAL_CONNECTIONS || + c == LEAST_ROUTER_CONNECTIONS || + c == LEAST_BEHIND_MASTER || + c == UNDEFINED_CRITERIA); + + if (c == UNDEFINED_CRITERIA) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unknown " + "slave selection criteria \"%s\". " + "Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " + "LEAST_ROUTER_CONNECTIONS, " + "and \"LEAST_ROUTER_CONNECTIONS\".", + STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); + } + else + { + router->rwsplit_config.rw_slave_select_criteria = c; + } + } + } + } /*< for */ +} \ No newline at end of file diff --git a/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql new file mode 100644 index 000000000..04be4024e --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql @@ -0,0 +1,9 @@ +use test; +drop table if exists t1; +create table t1 (id integer); +set autocommit=0; -- open transaction +begin; +insert into t1 values(1); -- write to master +commit; +select count(*) from t1; -- read from master since autocommit is disabled +drop table t1; diff --git a/server/modules/routing/test/makefile b/server/modules/routing/test/makefile new file mode 100644 index 000000000..039f2910e --- /dev/null +++ b/server/modules/routing/test/makefile @@ -0,0 +1,41 @@ +# cleantests - clean local and subdirectories' tests +# buildtests - build all local and subdirectories' tests +# runtests - run all local tests +# testall - clean, build and run local and subdirectories' tests + +include ../../../../build_gateway.inc +include $(ROOT_PATH)/makefile.inc +include $(ROOT_PATH)/test.inc + +CC=cc +TESTLOG := $(shell pwd)/testrouting.log +RET := -1 + +cleantests: + - $(DEL) *.o + - $(DEL) *~ + + +testall: + -$(MAKE) cleantests + -$(MAKE) DEBUG=Y buildtests + -$(MAKE) runtests + @echo "" >> $(TESTLOG) + @echo "-------------------------------" >> $(TESTLOG) + @echo $(shell date) >> $(TESTLOG) + @echo "Test Read/Write Split Router" >> $(TESTLOG) + $(MAKE) -C $(ROOT_PATH)/server/modules/routing/readwritesplit testall + + +buildtests: + $(MAKE) -C $(ROOT_PATH)/server/modules/routing/readwritesplit buildtests + + +runtests: + @echo "" > $(TESTLOG) + @echo "-------------------------------" >> $(TESTLOG) + @echo $(shell date) >> $(TESTLOG) + @echo "Test routing" >> $(TESTLOG) + @echo "-------------------------------" >> $(TESTLOG) + @echo "Nothing to run here so far" >> $(TESTLOG) + @cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG) diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 8c11d9782..7277cb2e5 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -119,7 +119,10 @@ typedef enum skygw_chk_t { CHK_NUM_SESSION, CHK_NUM_ROUTER_SES, CHK_NUM_MY_SESCMD, - CHK_NUM_ROUTER_PROPERTY + CHK_NUM_ROUTER_PROPERTY, + CHK_NUM_SESCMD_CUR, + CHK_NUM_BACKEND, + CHK_NUM_BACKEND_REF } skygw_chk_t; # define STRBOOL(b) ((b) ? "true" : "false") @@ -221,6 +224,11 @@ typedef enum skygw_chk_t { ((t) == BE_UNDEFINED ? "BE_UNDEFINED" : \ "Unknown backend tpe"))) +#define STRCRITERIA(c) ((c) == UNDEFINED_CRITERIA ? "UNDEFINED_CRITERIA" : \ + ((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \ + ((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \ + ((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria")))) + #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \ l->mlist_chk_tail == CHK_NUM_MLIST), \ @@ -446,7 +454,25 @@ typedef enum skygw_chk_t { "Session command has invalid check fields"); \ } +#define CHK_SESCMD_CUR(c) { \ + ss_info_dassert((c)->scmd_cur_chk_top == CHK_NUM_SESCMD_CUR && \ + (c)->scmd_cur_chk_tail == CHK_NUM_SESCMD_CUR, \ + "Session command cursor has invalid check fields"); \ + } +#define CHK_BACKEND(b) { \ + ss_info_dassert((b)->be_chk_top == CHK_NUM_BACKEND && \ + (b)->be_chk_tail == CHK_NUM_BACKEND, \ + "BACKEND has invalid check fields"); \ +} + +#define CHK_BACKEND_REF(r) { \ + ss_info_dassert((r)->bref_chk_top == CHK_NUM_BACKEND_REF && \ + (r)->bref_chk_tail == CHK_NUM_BACKEND_REF, \ + "Backend reference has invalid check fields"); \ +} + + #if defined(SS_DEBUG) bool conn_open[10240]; #endif