diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt index 4a04a85f5..38b2f6c68 100644 --- a/server/modules/routing/schemarouter/CMakeLists.txt +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(schemarouter SHARED schemarouter.c sharding_common.c) +add_library(schemarouter SHARED schemarouter.cc) target_link_libraries(schemarouter maxscale-common) add_dependencies(schemarouter pcre2) set_target_properties(schemarouter PROPERTIES VERSION "1.0.0") diff --git a/server/modules/routing/schemarouter/schemarouter.c b/server/modules/routing/schemarouter/schemarouter.cc similarity index 94% rename from server/modules/routing/schemarouter/schemarouter.c rename to server/modules/routing/schemarouter/schemarouter.cc index 9a4e4bd3c..a99b4fade 100644 --- a/server/modules/routing/schemarouter/schemarouter.c +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -13,26 +13,22 @@ #include "schemarouter.h" -#include -#include -#include -#include -#include #include -#include -#include "sharding_common.h" -#include -#include +#include +#include +#include +#include + +#include +#include #include -#include -#include -#include #include #include -#include -#include #include -#include +#include +#include +#include +#include #define DEFAULT_REFRESH_INTERVAL "300" @@ -57,32 +53,11 @@ * @endverbatim */ -static MXS_ROUTER* createInstance(SERVICE *service, char **options); -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *instance, MXS_SESSION *session); -static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session); -static void freeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session); -static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue); -static void diagnostic(MXS_ROUTER *instance, DCB *dcb); - -static void clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* queue, - DCB* backend_dcb); - -static void handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errmsgbuf, - DCB* backend_dcb, - mxs_error_action_t action, - bool* succp); static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION* rses, DCB* dcb); static route_target_t get_shard_route_target(qc_query_type_t qtype, bool trx_active, HINT* hint); - -static uint64_t getCapabilities(MXS_ROUTER* instance); - static bool connect_backend_servers(backend_ref_t* backend_ref, int router_nservers, MXS_SESSION* session, @@ -122,7 +97,7 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_client_ses, GWBUF* querybuf, SCHEMAROUTER* inst, unsigned char packet_type, - qc_query_type_t qtype); + uint32_t qtype); static void bref_clear_state(backend_ref_t* bref, bref_state_t state); static void bref_set_state(backend_ref_t* bref, bref_state_t state); static sescmd_cursor_t* backend_ref_get_sescmd_cursor (backend_ref_t* bref); @@ -135,7 +110,8 @@ static void handle_error_reply_client(MXS_SESSION* ses, SCHEMAROUTER_SESSION* rses, DCB* backend_dcb, GWBUF* errmsg); - +bool change_current_db(char* dest, HASHTABLE* dbhash, GWBUF* buf); +bool extract_database(GWBUF* buf, char* str); static SPINLOCK instlock; static SCHEMAROUTER* instances; @@ -256,7 +232,7 @@ char* get_lenenc_str(void* data) } } - rval = MXS_MALLOC(sizeof(char) * (size + 1)); + rval = (char*)MXS_MALLOC(sizeof(char) * (size + 1)); if (rval) { memcpy(rval, ptr + offset, size); @@ -416,12 +392,13 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) session->init &= ~INIT_UNINT; len = strlen(query) + 1; buffer = gwbuf_alloc(len + 4); - *((unsigned char*)buffer->start) = len; - *((unsigned char*)buffer->start + 1) = len >> 8; - *((unsigned char*)buffer->start + 2) = len >> 16; - *((unsigned char*)buffer->start + 3) = 0x0; - *((unsigned char*)buffer->start + 4) = 0x03; - memcpy(buffer->start + 5, query, strlen(query)); + uint8_t *data = GWBUF_DATA(buffer); + *(data) = len; + *(data + 1) = len >> 8; + *(data + 2) = len >> 16; + *(data + 3) = 0x0; + *(data + 4) = 0x03; + memcpy(data + 5, query, strlen(query)); for (i = 0; i < session->rses_nbackends; i++) { @@ -452,7 +429,7 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) char* get_shard_target_name(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client, GWBUF* buffer, - qc_query_type_t qtype) + uint32_t qtype) { int sz = 0, i, j; char** dbnms = NULL; @@ -536,7 +513,7 @@ char* get_shard_target_name(SCHEMAROUTER* router, { char *srvnm = client->rses_backend_ref[i].bref_backend->server->unique_name; - if (strcmp(srvnm, buffer->hint->data) == 0) + if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { rval = srvnm; MXS_INFO("Routing hint found (%s)", srvnm); @@ -583,240 +560,6 @@ bool check_shard_status(SCHEMAROUTER* router, char* shard) return false; } -/** - * The module entry point routine. It is this routine that - * must populate the structure that is referred to as the - * "module object", this is a structure with the set of - * external entry points for this module. - * - * @return The module object - */ -MXS_MODULE* MXS_CREATE_MODULE() -{ - MXS_NOTICE("Initializing Schema Sharding Router."); - spinlock_init(&instlock); - instances = NULL; - - static MXS_ROUTER_OBJECT MyObject = - { - createInstance, - newSession, - closeSession, - freeSession, - routeQuery, - diagnostic, - clientReply, - handleError, - getCapabilities, - NULL - }; - - static MXS_MODULE info = - { - MXS_MODULE_API_ROUTER, - MXS_MODULE_BETA_RELEASE, - MXS_ROUTER_VERSION, - "A database sharding router for simple sharding", - "V1.0.0", - RCAP_TYPE_CONTIGUOUS_INPUT, - &MyObject, - NULL, /* Process init. */ - NULL, /* Process finish. */ - NULL, /* Thread init. */ - NULL, /* Thread finish. */ - { - {"ignore_databases", MXS_MODULE_PARAM_STRING}, - {"ignore_databases_regex", MXS_MODULE_PARAM_STRING}, - {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"}, - {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false"}, - {"refresh_databases", MXS_MODULE_PARAM_BOOL, "true"}, - {"refresh_interval", MXS_MODULE_PARAM_COUNT, DEFAULT_REFRESH_INTERVAL}, - {"debug", MXS_MODULE_PARAM_BOOL, "false"}, - {MXS_END_MODULE_PARAMS} - } - }; - - return &info; -} - -/** - * Create an instance of schemarouter router within the MaxScale. - * - * - * @param service The service this router is being create for - * @param options The options for this query router - * - * @return NULL in failure, pointer to router in success. - */ -static MXS_ROUTER* createInstance(SERVICE *service, char **options) -{ - SCHEMAROUTER* router; - MXS_CONFIG_PARAMETER* conf; - MXS_CONFIG_PARAMETER* param; - - if ((router = MXS_CALLOC(1, sizeof(SCHEMAROUTER))) == NULL) - { - return NULL; - } - - if ((router->ignored_dbs = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL) - { - MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); - MXS_FREE(router); - return NULL; - } - - hashtable_memory_fns(router->ignored_dbs, hashtable_item_strdup, NULL, hashtable_item_free, NULL); - - if ((router->shard_maps = hashtable_alloc(SCHEMAROUTER_USERHASH_SIZE, hashkeyfun, hashcmpfun)) == NULL) - { - MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); - return NULL; - } - - hashtable_memory_fns(router->shard_maps, hashtable_item_strdup, NULL, keyfreefun, NULL); - - /** Add default system databases to ignore */ - hashtable_add(router->ignored_dbs, "mysql", ""); - hashtable_add(router->ignored_dbs, "information_schema", ""); - hashtable_add(router->ignored_dbs, "performance_schema", ""); - router->service = service; - router->schemarouter_config.max_sescmd_hist = 0; - router->schemarouter_config.last_refresh = time(NULL); - router->stats.longest_sescmd = 0; - router->stats.n_hist_exceeded = 0; - router->stats.n_queries = 0; - router->stats.n_sescmd = 0; - router->stats.ses_longest = 0; - router->stats.ses_shortest = (double)((unsigned long)(~0)); - spinlock_init(&router->lock); - - conf = service->svc_config_param; - - router->schemarouter_config.refresh_databases = config_get_bool(conf, "refresh_databases"); - router->schemarouter_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); - router->schemarouter_config.max_sescmd_hist = config_get_integer(conf, "max_sescmd_history"); - router->schemarouter_config.disable_sescmd_hist = config_get_bool(conf, "disable_sescmd_history"); - router->schemarouter_config.debug = config_get_bool(conf, "debug"); - - if ((config_get_param(conf, "auth_all_servers")) == NULL) - { - MXS_NOTICE("Authentication data is fetched from all servers. To disable this " - "add 'auth_all_servers=0' to the service."); - service->users_from_all = true; - } - - if ((param = config_get_param(conf, "ignore_databases_regex"))) - { - int errcode; - PCRE2_SIZE erroffset; - pcre2_code* re = pcre2_compile((PCRE2_SPTR)param->value, PCRE2_ZERO_TERMINATED, 0, - &errcode, &erroffset, NULL); - - if (re == NULL) - { - PCRE2_UCHAR errbuf[512]; - pcre2_get_error_message(errcode, errbuf, sizeof(errbuf)); - MXS_ERROR("Regex compilation failed at %d for regex '%s': %s", - (int)erroffset, param->value, errbuf); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); - return NULL; - } - - pcre2_match_data* match_data = pcre2_match_data_create_from_pattern(re, NULL); - - if (match_data == NULL) - { - MXS_ERROR("PCRE2 match data creation failed. This" - " is most likely caused by a lack of available memory."); - pcre2_code_free(re); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); - return NULL; - } - - router->ignore_regex = re; - router->ignore_match_data = match_data; - } - - if ((param = config_get_param(conf, "ignore_databases"))) - { - char val[strlen(param->value) + 1]; - strcpy(val, param->value); - - const char *sep = ", \t"; - char *sptr; - char *tok = strtok_r(val, sep, &sptr); - - while (tok) - { - hashtable_add(router->ignored_dbs, tok, ""); - tok = strtok_r(NULL, sep, &sptr); - } - } - - bool failure = false; - - for (int i = 0; options && options[i]; i++) - { - char* value = strchr(options[i], '='); - - if (value == NULL) - { - MXS_ERROR("Unknown router options for %s", options[i]); - failure = true; - break; - } - - *value = '\0'; - value++; - - if (strcmp(options[i], "max_sescmd_history") == 0) - { - router->schemarouter_config.max_sescmd_hist = atoi(value); - } - else if (strcmp(options[i], "disable_sescmd_history") == 0) - { - router->schemarouter_config.disable_sescmd_hist = config_truth_value(value); - } - else if (strcmp(options[i], "refresh_databases") == 0) - { - router->schemarouter_config.refresh_databases = config_truth_value(value); - } - else if (strcmp(options[i], "refresh_interval") == 0) - { - router->schemarouter_config.refresh_min_interval = atof(value); - } - else if (strcmp(options[i], "debug") == 0) - { - router->schemarouter_config.debug = config_truth_value(value); - } - else - { - MXS_ERROR("Unknown router options for %s", options[i]); - failure = true; - break; - } - } - - /** Setting a limit to the history size is not needed if it is disabled.*/ - if (router->schemarouter_config.disable_sescmd_hist && router->schemarouter_config.max_sescmd_hist > 0) - { - router->schemarouter_config.max_sescmd_hist = 0; - } - - if (failure) - { - MXS_FREE(router); - router = NULL; - } - - return (MXS_ROUTER *)router; -} - /** * Check if the shard map is out of date and update its state if necessary. * @param router Router instance @@ -836,294 +579,6 @@ enum shard_map_state shard_map_update_state(shard_map_t *self, SCHEMAROUTER* rou return state; } -/** - * Associate a new session with this instance of the router. - * - * The session is used to store all the data required for a particular - * client connection. - * - * @param instance The router instance data - * @param session The session itself - * @return Session specific data for this session - */ -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) -{ - backend_ref_t* backend_ref; /*< array of backend references (DCB, BACKEND, cursor) */ - SCHEMAROUTER_SESSION* client_rses = NULL; - SCHEMAROUTER* router = (SCHEMAROUTER *)router_inst; - bool succp; - int router_nservers = 0; /*< # of servers in total */ - char db[MYSQL_DATABASE_MAXLEN + 1] = ""; - MySQLProtocol* protocol = session->client_dcb->protocol; - MYSQL_session* data = session->client_dcb->data; - bool using_db = false; - bool have_db = false; - - /* To enable connecting directly to a sharded database we first need - * to disable it for the client DCB's protocol so that we can connect to them*/ - if (protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB && - (have_db = strnlen(data->db, MYSQL_DATABASE_MAXLEN) > 0)) - { - protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB; - strcpy(db, data->db); - *data->db = 0; - using_db = true; - MXS_INFO("Client logging in directly to a database '%s', " - "postponing until databases have been mapped.", db); - } - - if (!have_db) - { - MXS_INFO("Client'%s' connecting with empty database.", data->user); - } - - client_rses = (SCHEMAROUTER_SESSION *)MXS_CALLOC(1, sizeof(SCHEMAROUTER_SESSION)); - - if (client_rses == NULL) - { - return NULL; - } -#if defined(SS_DEBUG) - client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; - client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; -#endif - - client_rses->router = router; - client_rses->rses_mysql_session = (MYSQL_session*)session->client_dcb->data; - client_rses->rses_client_dcb = (DCB*)session->client_dcb; - - spinlock_acquire(&router->lock); - - shard_map_t *map = hashtable_fetch(router->shard_maps, session->client_dcb->user); - enum shard_map_state state; - - if (map) - { - state = shard_map_update_state(map, router); - } - - spinlock_release(&router->lock); - - if (map == NULL || state != SHMAP_READY) - { - if ((map = shard_map_alloc()) == NULL) - { - MXS_ERROR("Failed to allocate enough memory to create" - "new shard mapping. Session will be closed."); - MXS_FREE(client_rses); - return NULL; - } - client_rses->init = INIT_UNINT; - } - else - { - client_rses->init = INIT_READY; - atomic_add(&router->stats.shmap_cache_hit, 1); - } - - client_rses->shardmap = map; - memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t)); - client_rses->n_sescmd = 0; - client_rses->rses_config.last_refresh = time(NULL); - - if (using_db) - { - client_rses->init |= INIT_USE_DB; - } - /** - * Set defaults to session variables. - */ - client_rses->rses_autocommit_enabled = true; - client_rses->rses_transaction_active = false; - - /** - * Instead of calling this, ensure that there is at least one - * responding server. - */ - - router_nservers = router->service->n_dbref; - - /** - * Create backend reference objects for this session. - */ - backend_ref = (backend_ref_t *)MXS_CALLOC(router_nservers, sizeof(backend_ref_t)); - - if (backend_ref == NULL) - { - MXS_FREE(client_rses); - return NULL; - } - /** - * Initialize backend references with BACKEND ptr. - * Initialize session command cursors for each backend reference. - */ - - int i = 0; - - for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next) - { - if (ref->active) - { -#if defined(SS_DEBUG) - backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; - backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; - backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; -#endif - backend_ref[i].bref_state = 0; - backend_ref[i].n_mapping_eof = 0; - backend_ref[i].map_queue = NULL; - backend_ref[i].bref_backend = ref; - /** store pointers to sescmd list to both cursors */ - backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; - backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; - backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; - i++; - } - } - - if (i < router_nservers) - { - router_nservers = i; - } - - client_rses->rses_backend_ref = backend_ref; - client_rses->rses_nbackends = router_nservers; - - /** - * Connect to all backend servers - */ - succp = connect_backend_servers(backend_ref, router_nservers, session, router); - - if (!succp || client_rses->closed) - { - MXS_FREE(client_rses->rses_backend_ref); - MXS_FREE(client_rses); - return NULL; - } - - if (db[0]) - { - /* Store the database the client is connecting to */ - snprintf(client_rses->connect_db, MYSQL_DATABASE_MAXLEN + 1, "%s", db); - } - - atomic_add(&router->stats.sessions, 1); - - return (void *)client_rses; -} - - - -/** - * Close a session with the router, this is the mechanism - * by which a router may cleanup data structure etc. - * - * @param instance The router instance data - * @param session The session being closed - */ -static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) -{ - SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; - CHK_CLIENT_RSES(router_cli_ses); - ss_dassert(!router_cli_ses->closed); - - /** - * Lock router client session for secure read and update. - */ - if (!router_cli_ses->closed) - { - router_cli_ses->closed = true; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - backend_ref_t* bref = &router_cli_ses->rses_backend_ref[i]; - DCB* dcb = bref->bref_dcb; - /** Close those which had been connected */ - if (BREF_IS_IN_USE(bref)) - { - CHK_DCB(dcb); - - /** Clean operation counter in bref and in SERVER */ - while (BREF_IS_WAITING_RESULT(bref)) - { - bref_clear_state(bref, BREF_WAITING_RESULT); - } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - /** - * closes protocol and dcb - */ - dcb_close(dcb); - /** decrease server current connection counters */ - atomic_add(&bref->bref_backend->connections, -1); - } - } - - gwbuf_free(router_cli_ses->queue); - - SCHEMAROUTER *inst = router_cli_ses->router; - - spinlock_acquire(&inst->lock); - if (inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd) - { - inst->stats.longest_sescmd = router_cli_ses->stats.longest_sescmd; - } - double ses_time = difftime(time(NULL), router_cli_ses->rses_client_dcb->session->stats.connect); - if (inst->stats.ses_longest < ses_time) - { - inst->stats.ses_longest = ses_time; - } - if (inst->stats.ses_shortest > ses_time && inst->stats.ses_shortest > 0) - { - inst->stats.ses_shortest = ses_time; - } - - inst->stats.ses_average = - (ses_time + ((inst->stats.sessions - 1) * inst->stats.ses_average)) / - (inst->stats.sessions); - - spinlock_release(&inst->lock); - } -} - -static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) -{ - SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_client_session; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - gwbuf_free(router_cli_ses->rses_backend_ref[i].bref_pending_cmd); - } - - /** - * For each property type, walk through the list, finalize properties - * and free the allocated memory. - */ - for (int i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++) - { - rses_property_t* p = router_cli_ses->rses_properties[i]; - rses_property_t* q = p; - - while (p != NULL) - { - q = p->rses_prop_next; - rses_property_done(p); - p = q; - } - } - /* - * We are no longer in the linked list, free - * all the memory and other resources associated - * to the client session. - */ - MXS_FREE(router_cli_ses->rses_backend_ref); - MXS_FREE(router_cli_ses); - return; -} - /** * Provide the router with a pointer to a suitable backend dcb. * @@ -1191,7 +646,7 @@ return_succp: * @return bitfield including the routing target, or the target server name * if the query would otherwise be routed to slave. */ -static route_target_t get_shard_route_target(qc_query_type_t qtype, +static route_target_t get_shard_route_target(uint32_t qtype, bool trx_active, /*< !!! turha ? */ HINT* hint) /*< !!! turha ? */ { @@ -1253,7 +708,7 @@ void check_drop_tmp_table(MXS_ROUTER* instance, for (i = 0; i < tsize; i++) { klen = strlen(dbname) + strlen(tbl[i]) + 2; - hkey = MXS_CALLOC(klen, sizeof(char)); + hkey = (char*)MXS_CALLOC(klen, sizeof(char)); MXS_ABORT_IF_NULL(hkey); strcpy(hkey, dbname); strcat(hkey, "."); @@ -1316,7 +771,7 @@ qc_query_type_t is_read_tmp_table(MXS_ROUTER* instance, for (i = 0; i < tsize && !target_tmp_table && tbl[i]; i++) { klen = strlen(dbname) + strlen(tbl[i]) + 2; - hkey = MXS_CALLOC(klen, sizeof(char)); + hkey = (char*)MXS_CALLOC(klen, sizeof(char)); MXS_ABORT_IF_NULL(hkey); strcpy(hkey, dbname); strcat(hkey, "."); @@ -1386,7 +841,7 @@ void check_create_tmp_table(MXS_ROUTER* instance, if (tblname && strlen(tblname) > 0) { klen = strlen(dbname) + strlen(tblname) + 2; - hkey = MXS_CALLOC(klen, sizeof(char)); + hkey = (char*)MXS_CALLOC(klen, sizeof(char)); MXS_ABORT_IF_NULL(hkey); strcpy(hkey, dbname); strcat(hkey, "."); @@ -1508,7 +963,7 @@ bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) { struct string_array strarray; const int size = hashtable_size(client->shardmap->hash); - strarray.array = MXS_MALLOC(size * sizeof(char*)); + strarray.array = (char**)MXS_MALLOC(size * sizeof(char*)); MXS_ABORT_IF_NULL(strarray.array); strarray.position = 0; HASHITERATOR *iter = hashtable_iterator(client->shardmap->hash); @@ -1518,9 +973,9 @@ bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) { char *key; int i = 0; - while ((key = hashtable_next(iter))) + while ((key = (char*)hashtable_next(iter))) { - char *value = hashtable_fetch(client->shardmap->hash, key); + char *value = (char*)hashtable_fetch(client->shardmap->hash, key); SERVER * server = server_find_by_unique_name(value); if (SERVER_IS_RUNNING(server)) { @@ -1544,6 +999,2101 @@ bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) return rval; } +/** Compare number of connections from this router in backend servers */ +int bref_cmp_router_conn(const void* bref1, const void* bref2) +{ + SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; + SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((1000 * b1->connections) / b1->weight) + - ((1000 * b2->connections) / b2->weight); +} + +/** Compare number of global connections in backend servers */ +int bref_cmp_global_conn(const void* bref1, const void* bref2) +{ + SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; + SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((1000 * b1->server->stats.n_current) / b1->weight) + - ((1000 * b2->server->stats.n_current) / b2->weight); +} + + +/** Compare replication lag between backend servers */ +int bref_cmp_behind_master(const void* bref1, const void* bref2) +{ + SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; + SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return b1->server->rlag - b2->server->rlag; +} + +/** Compare number of current operations in backend servers */ +int bref_cmp_current_load(const void* bref1, const void* bref2) +{ + SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; + SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((1000 * b1->server->stats.n_current_ops) - b1->weight) + - ((1000 * b2->server->stats.n_current_ops) - b2->weight); +} + +static void bref_clear_state(backend_ref_t* bref, bref_state_t state) +{ + if (bref == NULL) + { + MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); + return; + } + if (state != BREF_WAITING_RESULT) + { + bref->bref_state &= ~state; + } + else + { + int prev1; + int prev2; + + /** Decrease waiter count */ + prev1 = atomic_add(&bref->bref_num_result_wait, -1); + + if (prev1 <= 0) + { + atomic_add(&bref->bref_num_result_wait, 1); + } + else + { + /** Decrease global operation count */ + prev2 = atomic_add(&bref->bref_backend->server->stats.n_current_ops, -1); + ss_dassert(prev2 > 0); + if (prev2 <= 0) + { + MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", + __FUNCTION__, + bref->bref_backend->server->name, + bref->bref_backend->server->port); + } + } + } +} + +static void bref_set_state(backend_ref_t* bref, bref_state_t state) +{ + if (bref == NULL) + { + MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); + return; + } + if (state != BREF_WAITING_RESULT) + { + bref->bref_state |= state; + } + else + { + int prev1; + int prev2; + + /** Increase waiter count */ + prev1 = atomic_add(&bref->bref_num_result_wait, 1); + ss_dassert(prev1 >= 0); + if (prev1 < 0) + { + MXS_ERROR("[%s] Error: negative number of connections waiting " + "for results in backend %s:%u", + __FUNCTION__, + bref->bref_backend->server->name, + bref->bref_backend->server->port); + } + /** Increase global operation count */ + prev2 = atomic_add(&bref->bref_backend->server->stats.n_current_ops, 1); + ss_dassert(prev2 >= 0); + if (prev2 < 0) + { + MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", + __FUNCTION__, + bref->bref_backend->server->name, + bref->bref_backend->server->port); + } + } +} + +/** + * @node Search all RUNNING backend servers and connect + * + * Parameters: + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. + * NULL is not allowed. + * + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. + * + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. + * + * + * @details It is assumed that there is only one available server. + * There will be exactly as many backend references than there are + * connections because all servers are supposed to be operational. It is, + * however, possible that there are less available servers than expected. + */ +static bool connect_backend_servers(backend_ref_t* backend_ref, + int router_nservers, + MXS_SESSION* session, + SCHEMAROUTER* router) +{ + bool succp = false; + int servers_found = 0; + int servers_connected = 0; + int slaves_connected = 0; + int i; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Servers and connection counts:"); + + for (int i = 0; i < router_nservers; i++) + { + SERVER_REF* b = backend_ref[i].bref_backend; + + MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", + b->connections, + b->server->stats.n_current, + b->server->name, + b->server->port, + STRSRVSTATUS(b->server)); + } + } + /** + * Scan server list and connect each of them. None should fail or session + * can't be established. + */ + for (int i = 0; i < router_nservers; i++) + { + SERVER_REF* b = backend_ref[i].bref_backend; + + if (SERVER_IS_RUNNING(b->server)) + { + servers_found += 1; + + /** Server is already connected */ + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + slaves_connected += 1; + } + /** New server connection */ + else + { + backend_ref[i].bref_dcb = dcb_connect(b->server, + session, + b->server->protocol); + + if (backend_ref[i].bref_dcb != NULL) + { + servers_connected += 1; + /** + * Start executing session command + * history. + */ + execute_sescmd_history(&backend_ref[i]); + /** + * When server fails, this callback + * is called. + * !!! Todo, routine which removes + * corresponding entries from the hash + * table. + */ + + backend_ref[i].bref_state = 0; + bref_set_state(&backend_ref[i], BREF_IN_USE); + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. + */ + atomic_add(&b->connections, 1); + } + else + { + succp = false; + MXS_ERROR("Unable to establish " + "connection with slave %s:%d", + b->server->name, + b->server->port); + /* handle connect error */ + break; + } + } + } + } + + if (servers_connected > 0) + { + succp = true; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + for (int i = 0; i < router_nservers; i++) + { + SERVER_REF* b = backend_ref[i].bref_backend; + + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + MXS_INFO("Connected %s in \t%s:%d", + STRSRVSTATUS(b->server), + b->server->name, + b->server->port); + } + } + } + } + + return succp; +} + +/** + * Create a generic router session property strcture. + */ +static rses_property_t* rses_property_init(rses_property_type_t prop_type) +{ + rses_property_t* prop; + + prop = (rses_property_t*)MXS_CALLOC(1, sizeof(rses_property_t)); + if (prop == NULL) + { + goto return_prop; + } + prop->rses_prop_type = prop_type; +#if defined(SS_DEBUG) + prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; + prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY; +#endif + +return_prop: + CHK_RSES_PROP(prop); + return prop; +} + +/** + * Property is freed at the end of router client session. + */ +static void rses_property_done(rses_property_t* prop) +{ + CHK_RSES_PROP(prop); + + switch (prop->rses_prop_type) + { + case RSES_PROP_TYPE_SESCMD: + mysql_sescmd_done(&prop->rses_prop_data.sescmd); + break; + + case RSES_PROP_TYPE_TMPTABLES: + hashtable_free(prop->rses_prop_data.temp_tables); + break; + + default: + MXS_DEBUG("%lu [rses_property_done] Unknown property type %d " + "in property %p", + pthread_self(), + prop->rses_prop_type, + prop); + ss_dassert(false); + break; + } + MXS_FREE(prop); +} + +/** + * Add property to the router_client_ses structure's rses_properties + * array. The slot is determined by the type of property. + * In each slot there is a list of properties of similar type. + * + * Router client session must be locked. + */ +static void rses_property_add(SCHEMAROUTER_SESSION* rses, + rses_property_t* prop) +{ + rses_property_t* p; + + CHK_CLIENT_RSES(rses); + CHK_RSES_PROP(prop); + + prop->rses_prop_rsession = rses; + p = rses->rses_properties[prop->rses_prop_type]; + + if (p == NULL) + { + rses->rses_properties[prop->rses_prop_type] = prop; + } + else + { + while (p->rses_prop_next != NULL) + { + p = p->rses_prop_next; + } + p->rses_prop_next = prop; + } +} + +/** + * Router session must be locked. + * Return session command pointer if succeed, NULL if failed. + */ +static mysql_sescmd_t* rses_property_get_sescmd(rses_property_t* prop) +{ + CHK_RSES_PROP(prop); + mysql_sescmd_t *sescmd = &prop->rses_prop_data.sescmd; + + if (sescmd != NULL) + { + CHK_MYSQL_SESCMD(sescmd); + } + return sescmd; +} + +/** + * Create session command property. + */ +static mysql_sescmd_t* mysql_sescmd_init(rses_property_t* rses_prop, + GWBUF* sescmd_buf, + unsigned char packet_type, + SCHEMAROUTER_SESSION* rses) +{ + mysql_sescmd_t* sescmd; + + CHK_RSES_PROP(rses_prop); + /** Can't call rses_property_get_sescmd with uninitialized sescmd */ + sescmd = &rses_prop->rses_prop_data.sescmd; + sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ +#if defined(SS_DEBUG) + sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; + sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; +#endif + /** Set session command buffer */ + sescmd->my_sescmd_buf = sescmd_buf; + sescmd->my_sescmd_packet_type = packet_type; + sescmd->position = atomic_add(&rses->pos_generator, 1); + return sescmd; +} + +static void mysql_sescmd_done(mysql_sescmd_t* sescmd) +{ + CHK_RSES_PROP(sescmd->my_sescmd_prop); + gwbuf_free(sescmd->my_sescmd_buf); + memset(sescmd, 0, sizeof(mysql_sescmd_t)); +} + +/** + * All cases where backend message starts at least with one response to session + * command are handled here. + * Read session commands from property list. If command is already replied, + * discard packet. Else send reply to client. In both cases move cursor forward + * until all session command replies are handled. + * + * Cases that are expected to happen and which are handled: + * s = response not yet replied to client, S = already replied response, + * q = query + * 1. q+ for example : select * from mysql.user + * 2. s+ for example : set autocommit=1 + * 3. S+ + * 4. sq+ + * 5. Sq+ + * 6. Ss+ + * 7. Ss+q+ + * 8. S+q+ + * 9. s+q+ + */ +static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, + backend_ref_t* bref) +{ + mysql_sescmd_t* scmd; + sescmd_cursor_t* scur; + + scur = &bref->bref_sescmd_cur; + scmd = sescmd_cursor_get_command(scur); + + CHK_GWBUF(replybuf); + + /** + * Walk through packets in the message and the list of session + * commands. + */ + while (scmd != NULL && replybuf != NULL) + { + scur->position = scmd->position; + /** Faster backend has already responded to client : discard */ + if (scmd->my_sescmd_is_replied) + { + bool last_packet = false; + + CHK_GWBUF(replybuf); + + while (!last_packet) + { + int buflen; + + buflen = GWBUF_LENGTH(replybuf); + last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); + /** discard packet */ + replybuf = gwbuf_consume(replybuf, buflen); + } + /** Set response status received */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } + /** Response is in the buffer and it will be sent to client. */ + else if (replybuf != NULL) + { + /** Mark the rest session commands as replied */ + scmd->my_sescmd_is_replied = true; + } + + if (sescmd_cursor_next(scur)) + { + scmd = sescmd_cursor_get_command(scur); + } + else + { + scmd = NULL; + /** All session commands are replied */ + scur->scmd_cur_active = false; + } + } + ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); + + return replybuf; +} + + + +/** + * Get the address of current session command. + * + * Router session must be locked */ +static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur) +{ + mysql_sescmd_t* scmd; + + scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); + + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + + scmd = scur->scmd_cur_cmd; + + return scmd; +} + +/** router must be locked */ +static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor) +{ + bool succp; + + succp = sescmd_cursor->scmd_cur_active; + return succp; +} + +/** router must be locked */ +static void sescmd_cursor_set_active(sescmd_cursor_t* sescmd_cursor, + bool value) +{ + /** avoid calling unnecessarily */ + ss_dassert(sescmd_cursor->scmd_cur_active != value); + sescmd_cursor->scmd_cur_active = value; +} + +/** + * Clone session command's command buffer. + * Router session must be locked + */ +static GWBUF* sescmd_cursor_clone_querybuf(sescmd_cursor_t* scur) +{ + GWBUF* buf; + ss_dassert(scur->scmd_cur_cmd != NULL); + + buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf); + + CHK_GWBUF(buf); + return buf; +} + +static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur) +{ + bool succp; + + CHK_SESCMD_CUR(scur); + + if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) + { + succp = true; + } + else + { + succp = false; + } + + return succp; +} + +static void sescmd_cursor_reset(sescmd_cursor_t* scur) +{ + SCHEMAROUTER_SESSION* rses; + CHK_SESCMD_CUR(scur); + CHK_CLIENT_RSES(scur->scmd_cur_rses); + rses = scur->scmd_cur_rses; + + scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + + CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); + scur->scmd_cur_active = false; + scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; +} + +static bool execute_sescmd_history(backend_ref_t* bref) +{ + bool succp; + sescmd_cursor_t* scur; + CHK_BACKEND_REF(bref); + + scur = &bref->bref_sescmd_cur; + CHK_SESCMD_CUR(scur); + + if (!sescmd_cursor_history_empty(scur)) + { + sescmd_cursor_reset(scur); + succp = execute_sescmd_in_backend(bref); + } + else + { + succp = true; + } + + return succp; +} + +/** + * If session command cursor is passive, sends the command to backend for + * execution. + * + * Returns true if command was sent or added successfully to the queue. + * Returns false if command sending failed or if there are no pending session + * commands. + * + * Router session must be locked. + */ +static bool execute_sescmd_in_backend(backend_ref_t* backend_ref) +{ + DCB* dcb; + bool succp; + int rc = 0; + sescmd_cursor_t* scur; + + if (BREF_IS_CLOSED(backend_ref)) + { + succp = false; + goto return_succp; + } + dcb = backend_ref->bref_dcb; + + CHK_DCB(dcb); + CHK_BACKEND_REF(backend_ref); + + /** + * Get cursor pointer and copy of command buffer to cursor. + */ + scur = &backend_ref->bref_sescmd_cur; + + /** Return if there are no pending ses commands */ + if (sescmd_cursor_get_command(scur) == NULL) + { + succp = false; + MXS_INFO("Cursor had no pending session commands."); + + goto return_succp; + } + + if (!sescmd_cursor_is_active(scur)) + { + /** Cursor is left active when function returns. */ + sescmd_cursor_set_active(scur, true); + } + + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) + { + case MYSQL_COM_CHANGE_USER: + /** This makes it possible to handle replies correctly */ + gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); + rc = dcb->func.auth(dcb, + NULL, + dcb->session, + sescmd_cursor_clone_querybuf(scur)); + break; + + case MYSQL_COM_QUERY: + default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); + rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); + break; + } + + if (rc == 1) + { + succp = true; + } + else + { + succp = false; + } +return_succp: + return succp; +} + + +/** + * Moves cursor to next property and copied address of its sescmd to cursor. + * Current propery must be non-null. + * If current property is the last on the list, *scur->scmd_ptr_property == NULL + * + * Router session must be locked + */ +static bool sescmd_cursor_next(sescmd_cursor_t* scur) +{ + bool succp = false; + rses_property_t* prop_curr; + rses_property_t* prop_next; + + ss_dassert(scur != NULL); + ss_dassert(*(scur->scmd_cur_ptr_property) != NULL); + + /** Illegal situation */ + if (scur == NULL || + *scur->scmd_cur_ptr_property == NULL || + scur->scmd_cur_cmd == NULL) + { + /** Log error */ + goto return_succp; + } + prop_curr = *(scur->scmd_cur_ptr_property); + + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd)); + CHK_RSES_PROP(prop_curr); + + /** Copy address of pointer to next property */ + scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next); + prop_next = *scur->scmd_cur_ptr_property; + ss_dassert(prop_next == *(scur->scmd_cur_ptr_property)); + + /** If there is a next property move forward */ + if (prop_next != NULL) + { + CHK_RSES_PROP(prop_next); + CHK_RSES_PROP((*(scur->scmd_cur_ptr_property))); + + /** Get pointer to next property's sescmd */ + scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next); + + ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop); + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); + } + else + { + /** No more properties, can't proceed. */ + goto return_succp; + } + + if (scur->scmd_cur_cmd != NULL) + { + succp = true; + } + else + { + ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */ + } +return_succp: + return succp; +} + +static rses_property_t* mysql_sescmd_get_property(mysql_sescmd_t* scmd) +{ + CHK_MYSQL_SESCMD(scmd); + return scmd->my_sescmd_prop; +} + +/** + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they can be replayed in backends which are + * started and joined later. + * + * Suppress redundant OK packets sent by backends. + * + * The first OK packet is replied to the client. + * Return true if succeed, false is returned if router session was closed or + * if execute_sescmd_in_backend failed. + */ +static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, + GWBUF* querybuf, + SCHEMAROUTER* inst, + unsigned char packet_type, + uint32_t qtype) +{ + bool succp = false; + rses_property_t* prop; + backend_ref_t* backend_ref; + int i; + + MXS_INFO("Session write, routing to all servers."); + + backend_ref = router_cli_ses->rses_backend_ref; + + /** + * These are one-way messages and server doesn't respond to them. + * Therefore reply processing is unnecessary and session + * command property is not needed. It is just routed to all available + * backends. + */ + if (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA || + packet_type == MYSQL_COM_QUIT || + packet_type == MYSQL_COM_STMT_CLOSE) + { + int rc; + + succp = true; + + /** Lock router session */ + if (router_cli_ses->closed) + { + return false; + } + + for (i = 0; i < router_cli_ses->rses_nbackends; i++) + { + DCB* dcb = backend_ref[i].bref_dcb; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Route query to %s\t%s:%d%s", + (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? + "master" : "slave"), + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port, + (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); + } + + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); + if (rc != 1) + { + succp = false; + } + } + } + + gwbuf_free(querybuf); + return succp; + } + /** Lock router session */ + + if (router_cli_ses->rses_nbackends <= 0) + { + return false; + } + + if (router_cli_ses->rses_config.max_sescmd_hist > 0 && + router_cli_ses->n_sescmd >= router_cli_ses->rses_config.max_sescmd_hist) + { + MXS_ERROR("Router session exceeded session command history limit of %d. " + "Closing router session.", + router_cli_ses->rses_config.max_sescmd_hist); + gwbuf_free(querybuf); + atomic_add(&router_cli_ses->router->stats.n_hist_exceeded, 1); + poll_fake_hangup_event(router_cli_ses->rses_client_dcb); + + return succp; + } + + if (router_cli_ses->rses_config.disable_sescmd_hist) + { + rses_property_t *prop, *tmp; + backend_ref_t* bref; + bool conflict; + + prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; + while (prop) + { + conflict = false; + + for (i = 0; i < router_cli_ses->rses_nbackends; i++) + { + bref = &backend_ref[i]; + if (BREF_IS_IN_USE(bref)) + { + + if (bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position) + { + conflict = true; + break; + } + } + } + + if (conflict) + { + break; + } + + tmp = prop; + router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next; + rses_property_done(tmp); + prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; + } + } + + /** + * + * Additional reference is created to querybuf to + * prevent it from being released before properties + * are cleaned up as a part of router session clean-up. + */ + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); + + /** Add sescmd property to router client session */ + rses_property_add(router_cli_ses, prop); + atomic_add(&router_cli_ses->stats.longest_sescmd, 1); + atomic_add(&router_cli_ses->n_sescmd, 1); + + for (i = 0; i < router_cli_ses->rses_nbackends; i++) + { + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + sescmd_cursor_t* scur; + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Route query to %s\t%s:%d%s", + (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? + "master" : "slave"), + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port, + (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); + } + + scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); + + /** + * Add one waiter to backend reference. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, + backend_ref[i].bref_dcb), + BREF_WAITING_RESULT); + /** + * Start execution if cursor is not already executing. + * Otherwise, cursor will execute pending commands + * when it completes with previous commands. + */ + if (sescmd_cursor_is_active(scur)) + { + succp = true; + + MXS_INFO("Backend %s:%d already executing sescmd.", + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port); + } + else + { + succp = execute_sescmd_in_backend(&backend_ref[i]); + + if (!succp) + { + MXS_ERROR("Failed to execute session " + "command in %s:%d", + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port); + } + } + } + else + { + succp = false; + } + } + + return succp; +} + +static void handle_error_reply_client(MXS_SESSION* ses, + SCHEMAROUTER_SESSION* rses, + DCB* backend_dcb, + GWBUF* errmsg) +{ + mxs_session_state_t sesstate; + DCB* client_dcb; + backend_ref_t* bref; + + sesstate = ses->state; + client_dcb = ses->client_dcb; + + /** + * If bref exists, mark it closed + */ + if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL) + { + CHK_BACKEND_REF(bref); + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + } + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + } +} + +/** + * Check if a router session has servers in use + * @param rses Router client session + * @return True if session has a single backend server in use that is running. + * False if no backends are in use or running. + */ +bool have_servers(SCHEMAROUTER_SESSION* rses) +{ + for (int i = 0; i < rses->rses_nbackends; i++) + { + if (BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && + !BREF_IS_CLOSED(&rses->rses_backend_ref[i])) + { + return true; + } + } + + return false; +} + +/** + * Check if there is backend reference pointing at failed DCB, and reset its + * flags. Then clear DCB's callback and finally try to reconnect. + * + * This must be called with router lock. + * + * @param inst router instance + * @param rses router client session + * @param dcb failed DCB + * @param errmsg error message which is sent to client if it is waiting + * + * @return true if there are enough backend connections to continue, false if not + */ +static bool handle_error_new_connection(SCHEMAROUTER* inst, + SCHEMAROUTER_SESSION* rses, + DCB* backend_dcb, + GWBUF* errmsg) +{ + backend_ref_t* bref; + bool succp; + + MXS_SESSION *ses = backend_dcb->session; + CHK_SESSION(ses); + + /** + * If bref == NULL it has been replaced already with another one. + */ + if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL) + { + succp = false; + goto return_succp; + } + + CHK_BACKEND_REF(bref); + + /** + * If query was sent through the bref and it is waiting for reply from + * the backend server it is necessary to send an error to the client + * because it is waiting for reply. + */ + if (BREF_IS_WAITING_RESULT(bref)) + { + DCB* client_dcb; + client_dcb = ses->client_dcb; + client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + bref_clear_state(bref, BREF_WAITING_RESULT); + } + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + + /** + * Error handler is already called for this DCB because + * it's not polling anymore. It can be assumed that + * it succeed because rses isn't closed. + */ + if (backend_dcb->state != DCB_STATE_POLLING) + { + succp = true; + goto return_succp; + } + + /** + * Try to get replacement slave or at least the minimum + * number of slave connections for router session. + */ + succp = connect_backend_servers(rses->rses_backend_ref, + rses->rses_nbackends, + ses, inst); + + if (!have_servers(rses)) + { + MXS_ERROR("No more valid servers, closing session"); + succp = false; + goto return_succp; + } + +return_succp: + return succp; +} + +/** + * Finds out if there is a backend reference pointing at the DCB given as + * parameter. + * @param rses router client session + * @param dcb DCB + * + * @return backend reference pointer if succeed or NULL + */ +static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses, + DCB *dcb) +{ + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + + for (int i = 0; i < rses->rses_nbackends; i++) + { + if (rses->rses_backend_ref[i].bref_dcb == dcb) + { + return &rses->rses_backend_ref[i]; + } + } + + return NULL; +} + +static sescmd_cursor_t* backend_ref_get_sescmd_cursor(backend_ref_t* bref) +{ + CHK_BACKEND_REF(bref); + CHK_SESCMD_CUR((&bref->bref_sescmd_cur)); + return &bref->bref_sescmd_cur; +} + +/** + * Detect if a query contains a SHOW SHARDS query. + * @param query Query to inspect + * @return true if the query is a SHOW SHARDS query otherwise false + */ +bool detect_show_shards(GWBUF* query) +{ + bool rval = false; + char *querystr, *tok, *sptr; + + if (query == NULL) + { + MXS_ERROR("NULL value passed at %s:%d", __FILE__, __LINE__); + return false; + } + + if (!modutil_is_SQL(query) && !modutil_is_SQL_prepare(query)) + { + return false; + } + + if ((querystr = modutil_get_SQL(query)) == NULL) + { + MXS_ERROR("Failure to parse SQL at %s:%d", __FILE__, __LINE__); + return false; + } + + tok = strtok_r(querystr, " ", &sptr); + if (tok && strcasecmp(tok, "show") == 0) + { + tok = strtok_r(NULL, " ", &sptr); + if (tok && strcasecmp(tok, "shards") == 0) + { + rval = true; + } + } + + MXS_FREE(querystr); + return rval; +} + +struct shard_list +{ + HASHITERATOR* iter; + SCHEMAROUTER_SESSION* rses; + RESULTSET* rset; +}; + +/** + * Callback for the shard list result set creation + */ +RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) +{ + char *key, *value; + struct shard_list *sl = (struct shard_list*)data; + RESULT_ROW* rval = NULL; + + if ((key = (char*)hashtable_next(sl->iter)) && + (value = (char*)hashtable_fetch(sl->rses->shardmap->hash, key))) + { + if ((rval = resultset_make_row(sl->rset))) + { + resultset_row_set(rval, 0, key); + resultset_row_set(rval, 1, value); + } + } + return rval; +} + +/** + * Send a result set of all shards and their locations to the client. + * @param rses Router client session + * @return 0 on success, -1 on error + */ +int process_show_shards(SCHEMAROUTER_SESSION* rses) +{ + int rval = 0; + + spinlock_acquire(&rses->shardmap->lock); + if (rses->shardmap->state != SHMAP_UNINIT) + { + HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash); + struct shard_list sl; + if (iter) + { + sl.iter = iter; + sl.rses = rses; + if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL) + { + MXS_ERROR("[%s] Error: Failed to create resultset.", __FUNCTION__); + rval = -1; + } + else + { + resultset_add_column(sl.rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_add_column(sl.rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_stream_mysql(sl.rset, rses->rses_client_dcb); + resultset_free(sl.rset); + hashtable_iterator_free(iter); + } + } + else + { + MXS_ERROR("hashtable_iterator creation failed. " + "This is caused by a memory allocation failure."); + rval = -1; + } + } + spinlock_release(&rses->shardmap->lock); + return rval; +} + +/** + * + * @param dcb + * @param errnum + * @param mysqlstate + * @param errmsg + */ +void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg) +{ + GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg); + if (errbuff) + { + if (dcb->func.write(dcb, errbuff) != 1) + { + MXS_ERROR("Failed to write error packet to client."); + } + } + else + { + MXS_ERROR("Memory allocation failed when creating error packet."); + } +} + +/** + * + * @param router_cli_ses + * @return + */ +bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses) +{ + bool rval = false; + char* target = NULL; + + spinlock_acquire(&router_cli_ses->shardmap->lock); + if (router_cli_ses->shardmap->state != SHMAP_UNINIT) + { + target = (char*)hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db); + } + spinlock_release(&router_cli_ses->shardmap->lock); + + if (target) + { + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen = strlen(router_cli_ses->connect_db); + GWBUF* buffer = gwbuf_alloc(qlen + 5); + + if (buffer) + { + uint8_t *data = GWBUF_DATA(buffer); + gw_mysql_set_byte3(data, qlen + 1); + gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); + data[3] = 0x0; + data[4] = 0x2; + memcpy(data + 5, router_cli_ses->connect_db, qlen); + DCB* dcb = NULL; + + if (get_shard_dcb(&dcb, router_cli_ses, target)) + { + dcb->func.write(dcb, buffer); + MXS_DEBUG("USE '%s' sent to %s for session %p", + router_cli_ses->connect_db, + target, + router_cli_ses->rses_client_dcb->session); + rval = true; + } + else + { + MXS_INFO("Couldn't find target DCB for '%s'.", target); + } + } + else + { + MXS_ERROR("Buffer allocation failed."); + } + } + else + { + /** Unknown database, hang up on the client*/ + MXS_INFO("Connecting to a non-existent database '%s'", + router_cli_ses->connect_db); + char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; + sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db); + if (router_cli_ses->rses_config.debug) + { + sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", + router_cli_ses->rses_client_dcb->session->ses_id); + } + write_error_to_client(router_cli_ses->rses_client_dcb, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errmsg); + } + + return rval; +} + +void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses) +{ + GWBUF* tmp = router_cli_ses->queue; + router_cli_ses->queue = router_cli_ses->queue->next; + tmp->next = NULL; +#ifdef SS_DEBUG + char* querystr = modutil_get_SQL(tmp); + MXS_DEBUG("Sending queued buffer for session %p: %s", + router_cli_ses->rses_client_dcb->session, + querystr); + MXS_FREE(querystr); +#endif + poll_add_epollin_event_to_dcb(router_cli_ses->rses_client_dcb, tmp); +} + +/** + * + * @param router_cli_ses Router client session + * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error + */ +int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses, + backend_ref_t *bref, + GWBUF** wbuf) +{ + bool mapped = true; + GWBUF* writebuf = *wbuf; + backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; + + for (int i = 0; i < router_cli_ses->rses_nbackends; i++) + { + if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) + { + if (bref->map_queue) + { + writebuf = gwbuf_append(bref->map_queue, writebuf); + bref->map_queue = NULL; + } + showdb_response_t rc = parse_showdb_response(router_cli_ses, + &router_cli_ses->rses_backend_ref[i], + &writebuf); + if (rc == SHOWDB_FULL_RESPONSE) + { + router_cli_ses->rses_backend_ref[i].bref_mapped = true; + MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else if (rc == SHOWDB_PARTIAL_RESPONSE) + { + bref->map_queue = writebuf; + writebuf = NULL; + MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", + router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + else + { + DCB* client_dcb = NULL; + + if ((router_cli_ses->init & INIT_FAILED) == 0) + { + if (rc == SHOWDB_DUPLICATE_DATABASES) + { + MXS_ERROR("Duplicate databases found, closing session."); + } + else + { + MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session."); + } + client_dcb = router_cli_ses->rses_client_dcb; + + /** This is the first response to the database mapping which + * has duplicate database conflict. Set the initialization bitmask + * to INIT_FAILED */ + router_cli_ses->init |= INIT_FAILED; + + /** Send the client an error about duplicate databases + * if there is a queued query from the client. */ + if (router_cli_ses->queue) + { + GWBUF* error = modutil_create_mysql_err_msg(1, 0, + SCHEMA_ERR_DUPLICATEDB, + SCHEMA_ERRSTR_DUPLICATEDB, + "Error: duplicate databases " + "found on two different shards."); + + if (error) + { + client_dcb->func.write(client_dcb, error); + } + else + { + MXS_ERROR("Creating buffer for error message failed."); + } + } + } + *wbuf = writebuf; + return -1; + } + } + + if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) + { + mapped = false; + MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", + bkrf[i].bref_backend->server->unique_name, + router_cli_ses->rses_client_dcb->session); + } + } + *wbuf = writebuf; + return mapped ? 1 : 0; +} + +/** + * Replace a shard map with another one. This function copies the contents of + * the source shard map to the target and frees the source memory. + * @param target Target shard map to replace + * @param source Source shard map to use + */ +void replace_shard_map(shard_map_t **target, shard_map_t **source) +{ + shard_map_t *tgt = *target; + shard_map_t *src = *source; + tgt->last_updated = src->last_updated; + tgt->state = src->state; + hashtable_free(tgt->hash); + tgt->hash = src->hash; + MXS_FREE(src); + *source = NULL; +} + +/** + * Synchronize the router client session shard map with the global shard map for + * this user. + * + * If the router doesn't have a shard map for this user then the current shard map + * of the client session is added to the router. If the shard map in the router is + * out of date, its contents are replaced with the contents of the current client + * session. If the router has a usable shard map, the current shard map of the client + * is discarded and the router's shard map is used. + * @param client Router session + */ +void synchronize_shard_map(SCHEMAROUTER_SESSION *client) +{ + spinlock_acquire(&client->router->lock); + + client->router->stats.shmap_cache_miss++; + + shard_map_t *map = (shard_map_t *)hashtable_fetch(client->router->shard_maps, + client->rses_client_dcb->user); + if (map) + { + spinlock_acquire(&map->lock); + if (map->state == SHMAP_STALE) + { + replace_shard_map(&map, &client->shardmap); + } + else if (map->state != SHMAP_READY) + { + MXS_WARNING("Shard map state is not ready but" + "it is in use. Replacing it with a newer one."); + replace_shard_map(&map, &client->shardmap); + } + else + { + /** + * Another thread has already updated the shard map for this user + */ + hashtable_free(client->shardmap->hash); + MXS_FREE(client->shardmap); + } + spinlock_release(&map->lock); + client->shardmap = map; + } + else + { + hashtable_add(client->router->shard_maps, + client->rses_client_dcb->user, + client->shardmap); + ss_dassert(hashtable_fetch(client->router->shard_maps, + client->rses_client_dcb->user) == client->shardmap); + } + spinlock_release(&client->router->lock); +} + +/** + * Extract the database name from a COM_INIT_DB or literal USE ... query. + * @param buf Buffer with the database change query + * @param str Pointer where the database name is copied + * @return True for success, false for failure + */ +bool extract_database(GWBUF* buf, char* str) +{ + uint8_t* packet; + char *saved, *tok, *query = NULL; + bool succp = true; + unsigned int plen; + + packet = GWBUF_DATA(buf); + plen = gw_mysql_get_byte3(packet) - 1; + + /** Copy database name from MySQL packet to session */ + if (qc_get_operation(buf) == QUERY_OP_CHANGE_DB) + { + const char *delim = "` \n\t;"; + + query = modutil_get_SQL(buf); + tok = strtok_r(query, delim, &saved); + + if (tok == NULL || strcasecmp(tok, "use") != 0) + { + MXS_ERROR("extract_database: Malformed chage database packet."); + succp = false; + goto retblock; + } + + tok = strtok_r(NULL, delim, &saved); + + if (tok == NULL) + { + MXS_ERROR("extract_database: Malformed change database packet."); + succp = false; + goto retblock; + } + + strncpy(str, tok, MYSQL_DATABASE_MAXLEN); + } + else + { + memcpy(str, packet + 5, plen); + memset(str + plen, 0, 1); + } +retblock: + MXS_FREE(query); + return succp; +} + +/** + * Create a fake error message from a DCB. + * @param fail_str Custom error message + * @param dcb DCB to use as the origin of the error + */ +void create_error_reply(char* fail_str, DCB* dcb) +{ + MXS_INFO("change_current_db: failed to change database: %s", fail_str); + GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); + + if (errbuf == NULL) + { + MXS_ERROR("Creating buffer for error message failed."); + return; + } + /** Set flags that help router to identify session commands reply */ + gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); + gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); + gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); + + poll_add_epollin_event_to_dcb(dcb, + errbuf); +} + +/** + * Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY + * packet, check that it exists in the hashtable and copy its name to MYSQL_session. + * + * @param dest Destination where the database name will be written + * @param dbhash Hashtable containing valid databases + * @param buf Buffer containing the database change query + * + * @return true if new database is set, false if non-existent database was tried + * to be set + */ +bool change_current_db(char* dest, + HASHTABLE* dbhash, + GWBUF* buf) +{ + char* target; + bool succp; + char db[MYSQL_DATABASE_MAXLEN + 1]; + if (GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) + { + /** Copy database name from MySQL packet to session */ + if (!extract_database(buf, db)) + { + succp = false; + goto retblock; + } + MXS_INFO("change_current_db: INIT_DB with database '%s'", db); + /** + * Update the session's active database only if it's in the hashtable. + * If it isn't found, send a custom error packet to the client. + */ + + if ((target = (char*)hashtable_fetch(dbhash, (char*)db)) == NULL) + { + succp = false; + goto retblock; + } + else + { + strcpy(dest, db); + MXS_INFO("change_current_db: database is on server: '%s'.", target); + succp = true; + goto retblock; + } + } + else + { + /** Create error message */ + MXS_ERROR("change_current_db: failed to change database: Query buffer too large"); + MXS_INFO("change_current_db: failed to change database: " + "Query buffer too large [%ld bytes]", GWBUF_LENGTH(buf)); + succp = false; + goto retblock; + } + +retblock: + return succp; +} + +MXS_BEGIN_DECLS + +/** + * Create an instance of schemarouter router within the MaxScale. + * + * + * @param service The service this router is being create for + * @param options The options for this query router + * + * @return NULL in failure, pointer to router in success. + */ +static MXS_ROUTER* createInstance(SERVICE *service, char **options) +{ + SCHEMAROUTER* router; + MXS_CONFIG_PARAMETER* conf; + MXS_CONFIG_PARAMETER* param; + + if ((router = (SCHEMAROUTER*)MXS_CALLOC(1, sizeof(SCHEMAROUTER))) == NULL) + { + return NULL; + } + + if ((router->ignored_dbs = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL) + { + MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); + MXS_FREE(router); + return NULL; + } + + hashtable_memory_fns(router->ignored_dbs, hashtable_item_strdup, NULL, hashtable_item_free, NULL); + + if ((router->shard_maps = hashtable_alloc(SCHEMAROUTER_USERHASH_SIZE, hashkeyfun, hashcmpfun)) == NULL) + { + MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); + hashtable_free(router->ignored_dbs); + MXS_FREE(router); + return NULL; + } + + hashtable_memory_fns(router->shard_maps, hashtable_item_strdup, NULL, keyfreefun, NULL); + + /** Add default system databases to ignore */ + hashtable_add(router->ignored_dbs, (void*)"mysql", (void*)""); + hashtable_add(router->ignored_dbs, (void*)"information_schema", (void*)""); + hashtable_add(router->ignored_dbs, (void*)"performance_schema", (void*)""); + router->service = service; + router->schemarouter_config.max_sescmd_hist = 0; + router->schemarouter_config.last_refresh = time(NULL); + router->stats.longest_sescmd = 0; + router->stats.n_hist_exceeded = 0; + router->stats.n_queries = 0; + router->stats.n_sescmd = 0; + router->stats.ses_longest = 0; + router->stats.ses_shortest = (double)((unsigned long)(~0)); + spinlock_init(&router->lock); + + conf = service->svc_config_param; + + router->schemarouter_config.refresh_databases = config_get_bool(conf, "refresh_databases"); + router->schemarouter_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); + router->schemarouter_config.max_sescmd_hist = config_get_integer(conf, "max_sescmd_history"); + router->schemarouter_config.disable_sescmd_hist = config_get_bool(conf, "disable_sescmd_history"); + router->schemarouter_config.debug = config_get_bool(conf, "debug"); + + if ((config_get_param(conf, "auth_all_servers")) == NULL) + { + MXS_NOTICE("Authentication data is fetched from all servers. To disable this " + "add 'auth_all_servers=0' to the service."); + service->users_from_all = true; + } + + if ((param = config_get_param(conf, "ignore_databases_regex"))) + { + int errcode; + PCRE2_SIZE erroffset; + pcre2_code* re = pcre2_compile((PCRE2_SPTR)param->value, PCRE2_ZERO_TERMINATED, 0, + &errcode, &erroffset, NULL); + + if (re == NULL) + { + PCRE2_UCHAR errbuf[512]; + pcre2_get_error_message(errcode, errbuf, sizeof(errbuf)); + MXS_ERROR("Regex compilation failed at %d for regex '%s': %s", + (int)erroffset, param->value, errbuf); + hashtable_free(router->ignored_dbs); + MXS_FREE(router); + return NULL; + } + + pcre2_match_data* match_data = pcre2_match_data_create_from_pattern(re, NULL); + + if (match_data == NULL) + { + MXS_ERROR("PCRE2 match data creation failed. This" + " is most likely caused by a lack of available memory."); + pcre2_code_free(re); + hashtable_free(router->ignored_dbs); + MXS_FREE(router); + return NULL; + } + + router->ignore_regex = re; + router->ignore_match_data = match_data; + } + + if ((param = config_get_param(conf, "ignore_databases"))) + { + char val[strlen(param->value) + 1]; + strcpy(val, param->value); + + const char *sep = ", \t"; + char *sptr; + char *tok = strtok_r(val, sep, &sptr); + + while (tok) + { + hashtable_add(router->ignored_dbs, tok, (void*)""); + tok = strtok_r(NULL, sep, &sptr); + } + } + + bool failure = false; + + for (int i = 0; options && options[i]; i++) + { + char* value = strchr(options[i], '='); + + if (value == NULL) + { + MXS_ERROR("Unknown router options for %s", options[i]); + failure = true; + break; + } + + *value = '\0'; + value++; + + if (strcmp(options[i], "max_sescmd_history") == 0) + { + router->schemarouter_config.max_sescmd_hist = atoi(value); + } + else if (strcmp(options[i], "disable_sescmd_history") == 0) + { + router->schemarouter_config.disable_sescmd_hist = config_truth_value(value); + } + else if (strcmp(options[i], "refresh_databases") == 0) + { + router->schemarouter_config.refresh_databases = config_truth_value(value); + } + else if (strcmp(options[i], "refresh_interval") == 0) + { + router->schemarouter_config.refresh_min_interval = atof(value); + } + else if (strcmp(options[i], "debug") == 0) + { + router->schemarouter_config.debug = config_truth_value(value); + } + else + { + MXS_ERROR("Unknown router options for %s", options[i]); + failure = true; + break; + } + } + + /** Setting a limit to the history size is not needed if it is disabled.*/ + if (router->schemarouter_config.disable_sescmd_hist && router->schemarouter_config.max_sescmd_hist > 0) + { + router->schemarouter_config.max_sescmd_hist = 0; + } + + if (failure) + { + MXS_FREE(router); + router = NULL; + } + + return (MXS_ROUTER *)router; +} + +/** + * Associate a new session with this instance of the router. + * + * The session is used to store all the data required for a particular + * client connection. + * + * @param instance The router instance data + * @param session The session itself + * @return Session specific data for this session + */ +static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) +{ + backend_ref_t* backend_ref; /*< array of backend references (DCB, BACKEND, cursor) */ + SCHEMAROUTER_SESSION* client_rses = NULL; + SCHEMAROUTER* router = (SCHEMAROUTER *)router_inst; + bool succp; + int router_nservers = 0; /*< # of servers in total */ + char db[MYSQL_DATABASE_MAXLEN + 1] = ""; + MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; + MYSQL_session* data = (MYSQL_session*)session->client_dcb->data; + bool using_db = false; + bool have_db = false; + + /* To enable connecting directly to a sharded database we first need + * to disable it for the client DCB's protocol so that we can connect to them*/ + if (protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB && + (have_db = strnlen(data->db, MYSQL_DATABASE_MAXLEN) > 0)) + { + protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB; + strcpy(db, data->db); + *data->db = 0; + using_db = true; + MXS_INFO("Client logging in directly to a database '%s', " + "postponing until databases have been mapped.", db); + } + + if (!have_db) + { + MXS_INFO("Client'%s' connecting with empty database.", data->user); + } + + client_rses = (SCHEMAROUTER_SESSION *)MXS_CALLOC(1, sizeof(SCHEMAROUTER_SESSION)); + + if (client_rses == NULL) + { + return NULL; + } +#if defined(SS_DEBUG) + client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; +#endif + + client_rses->router = router; + client_rses->rses_mysql_session = (MYSQL_session*)session->client_dcb->data; + client_rses->rses_client_dcb = (DCB*)session->client_dcb; + + spinlock_acquire(&router->lock); + + shard_map_t *map = (shard_map_t*)hashtable_fetch(router->shard_maps, session->client_dcb->user); + enum shard_map_state state; + + if (map) + { + state = shard_map_update_state(map, router); + } + + spinlock_release(&router->lock); + + if (map == NULL || state != SHMAP_READY) + { + if ((map = shard_map_alloc()) == NULL) + { + MXS_ERROR("Failed to allocate enough memory to create" + "new shard mapping. Session will be closed."); + MXS_FREE(client_rses); + return NULL; + } + client_rses->init = INIT_UNINT; + } + else + { + client_rses->init = INIT_READY; + atomic_add(&router->stats.shmap_cache_hit, 1); + } + + client_rses->shardmap = map; + memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t)); + client_rses->n_sescmd = 0; + client_rses->rses_config.last_refresh = time(NULL); + + if (using_db) + { + client_rses->init |= INIT_USE_DB; + } + /** + * Set defaults to session variables. + */ + client_rses->rses_autocommit_enabled = true; + client_rses->rses_transaction_active = false; + + /** + * Instead of calling this, ensure that there is at least one + * responding server. + */ + + router_nservers = router->service->n_dbref; + + /** + * Create backend reference objects for this session. + */ + backend_ref = (backend_ref_t *)MXS_CALLOC(router_nservers, sizeof(backend_ref_t)); + + if (backend_ref == NULL) + { + MXS_FREE(client_rses); + return NULL; + } + /** + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. + */ + + int i = 0; + + for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next) + { + if (ref->active) + { +#if defined(SS_DEBUG) + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + backend_ref[i].bref_state = 0; + backend_ref[i].n_mapping_eof = 0; + backend_ref[i].map_queue = NULL; + backend_ref[i].bref_backend = ref; + /** store pointers to sescmd list to both cursors */ + backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; + backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; + backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + i++; + } + } + + if (i < router_nservers) + { + router_nservers = i; + } + + client_rses->rses_backend_ref = backend_ref; + client_rses->rses_nbackends = router_nservers; + + /** + * Connect to all backend servers + */ + succp = connect_backend_servers(backend_ref, router_nservers, session, router); + + if (!succp || client_rses->closed) + { + MXS_FREE(client_rses->rses_backend_ref); + MXS_FREE(client_rses); + return NULL; + } + + if (db[0]) + { + /* Store the database the client is connecting to */ + snprintf(client_rses->connect_db, MYSQL_DATABASE_MAXLEN + 1, "%s", db); + } + + atomic_add(&router->stats.sessions, 1); + + return (MXS_ROUTER_SESSION*)client_rses; +} + + + +/** + * Close a session with the router, this is the mechanism + * by which a router may cleanup data structure etc. + * + * @param instance The router instance data + * @param session The session being closed + */ +static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) +{ + SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; + CHK_CLIENT_RSES(router_cli_ses); + ss_dassert(!router_cli_ses->closed); + + /** + * Lock router client session for secure read and update. + */ + if (!router_cli_ses->closed) + { + router_cli_ses->closed = true; + + for (int i = 0; i < router_cli_ses->rses_nbackends; i++) + { + backend_ref_t* bref = &router_cli_ses->rses_backend_ref[i]; + DCB* dcb = bref->bref_dcb; + /** Close those which had been connected */ + if (BREF_IS_IN_USE(bref)) + { + CHK_DCB(dcb); + + /** Clean operation counter in bref and in SERVER */ + while (BREF_IS_WAITING_RESULT(bref)) + { + bref_clear_state(bref, BREF_WAITING_RESULT); + } + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + /** + * closes protocol and dcb + */ + dcb_close(dcb); + /** decrease server current connection counters */ + atomic_add(&bref->bref_backend->connections, -1); + } + } + + gwbuf_free(router_cli_ses->queue); + + SCHEMAROUTER *inst = router_cli_ses->router; + + spinlock_acquire(&inst->lock); + if (inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd) + { + inst->stats.longest_sescmd = router_cli_ses->stats.longest_sescmd; + } + double ses_time = difftime(time(NULL), router_cli_ses->rses_client_dcb->session->stats.connect); + if (inst->stats.ses_longest < ses_time) + { + inst->stats.ses_longest = ses_time; + } + if (inst->stats.ses_shortest > ses_time && inst->stats.ses_shortest > 0) + { + inst->stats.ses_shortest = ses_time; + } + + inst->stats.ses_average = + (ses_time + ((inst->stats.sessions - 1) * inst->stats.ses_average)) / + (inst->stats.sessions); + + spinlock_release(&inst->lock); + } +} + +static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) +{ + SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_client_session; + + for (int i = 0; i < router_cli_ses->rses_nbackends; i++) + { + gwbuf_free(router_cli_ses->rses_backend_ref[i].bref_pending_cmd); + } + + /** + * For each property type, walk through the list, finalize properties + * and free the allocated memory. + */ + for (int i = RSES_PROP_TYPE_FIRST; i < RSES_PROP_TYPE_COUNT; i++) + { + rses_property_t* p = router_cli_ses->rses_properties[i]; + rses_property_t* q = p; + + while (p != NULL) + { + q = p->rses_prop_next; + rses_property_done(p); + p = q; + } + } + /* + * We are no longer in the linked list, free + * all the memory and other resources associated + * to the client session. + */ + MXS_FREE(router_cli_ses->rses_backend_ref); + MXS_FREE(router_cli_ses); + return; +} + /** * The main routing entry, this is called with every packet that is * received and has to be forwarded to the backend database. @@ -1567,8 +3117,8 @@ bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) */ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* qbuf) { - qc_query_type_t qtype = QUERY_TYPE_UNKNOWN; - mysql_server_cmd_t packet_type; + uint32_t qtype = QUERY_TYPE_UNKNOWN; + uint8_t packet_type; uint8_t* packet; int ret = 0; DCB* target_dcb = NULL; @@ -1648,6 +3198,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, packet = GWBUF_DATA(querybuf); packet_type = packet[4]; + qc_query_op_t op = QUERY_OP_UNDEFINED; if (detect_show_shards(querybuf)) { @@ -1656,8 +3207,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, goto retblock; } - qc_query_op_t op = QUERY_OP_UNDEFINED; - switch (packet_type) { case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ @@ -1794,7 +3343,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, route_target = TARGET_UNDEFINED; spinlock_acquire(&router_cli_ses->shardmap->lock); - tname = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->current_db); + tname = (char*)hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->current_db); if (tname) @@ -2319,993 +3868,6 @@ static void clientReply(MXS_ROUTER* instance, return; } -/** Compare number of connections from this router in backend servers */ -int bref_cmp_router_conn(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->connections) / b1->weight) - - ((1000 * b2->connections) / b2->weight); -} - -/** Compare number of global connections in backend servers */ -int bref_cmp_global_conn(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->server->stats.n_current) / b1->weight) - - ((1000 * b2->server->stats.n_current) / b2->weight); -} - - -/** Compare replication lag between backend servers */ -int bref_cmp_behind_master(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return b1->server->rlag - b2->server->rlag; -} - -/** Compare number of current operations in backend servers */ -int bref_cmp_current_load(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->server->stats.n_current_ops) - b1->weight) - - ((1000 * b2->server->stats.n_current_ops) - b2->weight); -} - -static void bref_clear_state(backend_ref_t* bref, bref_state_t state) -{ - if (bref == NULL) - { - MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); - return; - } - if (state != BREF_WAITING_RESULT) - { - bref->bref_state &= ~state; - } - else - { - int prev1; - int prev2; - - /** Decrease waiter count */ - prev1 = atomic_add(&bref->bref_num_result_wait, -1); - - if (prev1 <= 0) - { - atomic_add(&bref->bref_num_result_wait, 1); - } - else - { - /** Decrease global operation count */ - prev2 = atomic_add(&bref->bref_backend->server->stats.n_current_ops, -1); - ss_dassert(prev2 > 0); - if (prev2 <= 0) - { - MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", - __FUNCTION__, - bref->bref_backend->server->name, - bref->bref_backend->server->port); - } - } - } -} - -static void bref_set_state(backend_ref_t* bref, bref_state_t state) -{ - if (bref == NULL) - { - MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__); - return; - } - if (state != BREF_WAITING_RESULT) - { - bref->bref_state |= state; - } - else - { - int prev1; - int prev2; - - /** Increase waiter count */ - prev1 = atomic_add(&bref->bref_num_result_wait, 1); - ss_dassert(prev1 >= 0); - if (prev1 < 0) - { - MXS_ERROR("[%s] Error: negative number of connections waiting " - "for results in backend %s:%u", - __FUNCTION__, - bref->bref_backend->server->name, - bref->bref_backend->server->port); - } - /** Increase global operation count */ - prev2 = atomic_add(&bref->bref_backend->server->stats.n_current_ops, 1); - ss_dassert(prev2 >= 0); - if (prev2 < 0) - { - MXS_ERROR("[%s] Error: negative current operation count in backend %s:%u", - __FUNCTION__, - bref->bref_backend->server->name, - bref->bref_backend->server->port); - } - } -} - -/** - * @node Search all RUNNING backend servers and connect - * - * Parameters: - * @param backend_ref - in, use, out - * Pointer to backend server reference object array. - * NULL is not allowed. - * - * @param router_nservers - in, use - * Number of backend server pointers pointed to by b. - * - * @param session - in, use - * MaxScale session pointer used when connection to backend is established. - * - * @param router - in, use - * Pointer to router instance. Used when server states are qualified. - * - * @return true, if at least one master and one slave was found. - * - * - * @details It is assumed that there is only one available server. - * There will be exactly as many backend references than there are - * connections because all servers are supposed to be operational. It is, - * however, possible that there are less available servers than expected. - */ -static bool connect_backend_servers(backend_ref_t* backend_ref, - int router_nservers, - MXS_SESSION* session, - SCHEMAROUTER* router) -{ - bool succp = true; - /* - bool is_synced_master; - bool master_connected = true; - */ - int servers_found = 0; - int servers_connected = 0; - int slaves_connected = 0; - int i; - /* - select_criteria_t select_criteria = LEAST_GLOBAL_CONNECTIONS; - */ - -#if defined(EXTRA_SS_DEBUG) - MXS_INFO("Servers and conns before ordering:"); - - for (i = 0; i < router_nservers; i++) - { - BACKEND* b = backend_ref[i].bref_backend; - - MXS_INFO("bref %p %d %s %d:%d", - &backend_ref[i], - backend_ref[i].bref_state, - b->backend_server->name, - b->backend_server->port, - b->backend_conn_count); - } -#endif - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Servers and connection counts:"); - - for (i = 0; i < router_nservers; i++) - { - SERVER_REF* b = backend_ref[i].bref_backend; - - MXS_INFO("MaxScale connections : %d (%d) in \t%s:%d %s", - b->connections, - b->server->stats.n_current, - b->server->name, - b->server->port, - STRSRVSTATUS(b->server)); - } - } /*< log only */ - /** - * Scan server list and connect each of them. None should fail or session - * can't be established. - */ - for (i = 0; i < router_nservers; i++) - { - SERVER_REF* b = backend_ref[i].bref_backend; - - if (SERVER_IS_RUNNING(b->server)) - { - servers_found += 1; - - /** Server is already connected */ - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - slaves_connected += 1; - } - /** New server connection */ - else - { - backend_ref[i].bref_dcb = dcb_connect(b->server, - session, - b->server->protocol); - - if (backend_ref[i].bref_dcb != NULL) - { - servers_connected += 1; - /** - * Start executing session command - * history. - */ - execute_sescmd_history(&backend_ref[i]); - /** - * When server fails, this callback - * is called. - * !!! Todo, routine which removes - * corresponding entries from the hash - * table. - */ - - backend_ref[i].bref_state = 0; - bref_set_state(&backend_ref[i], BREF_IN_USE); - /** - * Increase backend connection counter. - * Server's stats are _increased_ in - * dcb.c:dcb_alloc ! - * But decreased in the calling function - * of dcb_close. - */ - atomic_add(&b->connections, 1); - } - else - { - succp = false; - MXS_ERROR("Unable to establish " - "connection with slave %s:%d", - b->server->name, - b->server->port); - /* handle connect error */ - break; - } - } - } - } /*< for */ - -#if defined(EXTRA_SS_DEBUG) - MXS_INFO("Servers and conns after ordering:"); - - for (i = 0; i < router_nservers; i++) - { - BACKEND* b = backend_ref[i].bref_backend; - - MXS_INFO("bref %p %d %s %d:%d", - &backend_ref[i], - backend_ref[i].bref_state, - b->backend_server->name, - b->backend_server->port, - b->backend_conn_count); - } -#endif - /** - * Successful cases - */ - if (servers_connected == router_nservers) - { - succp = true; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - for (i = 0; i < router_nservers; i++) - { - SERVER_REF* b = backend_ref[i].bref_backend; - - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - MXS_INFO("Connected %s in \t%s:%d", - STRSRVSTATUS(b->server), - b->server->name, - b->server->port); - } - } /* for */ - } - } - - return succp; -} - -/** - * Create a generic router session property strcture. - */ -static rses_property_t* rses_property_init(rses_property_type_t prop_type) -{ - rses_property_t* prop; - - prop = (rses_property_t*)MXS_CALLOC(1, sizeof(rses_property_t)); - if (prop == NULL) - { - goto return_prop; - } - prop->rses_prop_type = prop_type; -#if defined(SS_DEBUG) - prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; - prop->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY; -#endif - -return_prop: - CHK_RSES_PROP(prop); - return prop; -} - -/** - * Property is freed at the end of router client session. - */ -static void rses_property_done(rses_property_t* prop) -{ - CHK_RSES_PROP(prop); - - switch (prop->rses_prop_type) - { - case RSES_PROP_TYPE_SESCMD: - mysql_sescmd_done(&prop->rses_prop_data.sescmd); - break; - - case RSES_PROP_TYPE_TMPTABLES: - hashtable_free(prop->rses_prop_data.temp_tables); - break; - - default: - MXS_DEBUG("%lu [rses_property_done] Unknown property type %d " - "in property %p", - pthread_self(), - prop->rses_prop_type, - prop); - ss_dassert(false); - break; - } - MXS_FREE(prop); -} - -/** - * Add property to the router_client_ses structure's rses_properties - * array. The slot is determined by the type of property. - * In each slot there is a list of properties of similar type. - * - * Router client session must be locked. - */ -static void rses_property_add(SCHEMAROUTER_SESSION* rses, - rses_property_t* prop) -{ - rses_property_t* p; - - CHK_CLIENT_RSES(rses); - CHK_RSES_PROP(prop); - - prop->rses_prop_rsession = rses; - p = rses->rses_properties[prop->rses_prop_type]; - - if (p == NULL) - { - rses->rses_properties[prop->rses_prop_type] = prop; - } - else - { - while (p->rses_prop_next != NULL) - { - p = p->rses_prop_next; - } - p->rses_prop_next = prop; - } -} - -/** - * Router session must be locked. - * Return session command pointer if succeed, NULL if failed. - */ -static mysql_sescmd_t* rses_property_get_sescmd(rses_property_t* prop) -{ - CHK_RSES_PROP(prop); - mysql_sescmd_t *sescmd = &prop->rses_prop_data.sescmd; - - if (sescmd != NULL) - { - CHK_MYSQL_SESCMD(sescmd); - } - return sescmd; -} - -/** - * Create session command property. - */ -static mysql_sescmd_t* mysql_sescmd_init(rses_property_t* rses_prop, - GWBUF* sescmd_buf, - unsigned char packet_type, - SCHEMAROUTER_SESSION* rses) -{ - mysql_sescmd_t* sescmd; - - CHK_RSES_PROP(rses_prop); - /** Can't call rses_property_get_sescmd with uninitialized sescmd */ - sescmd = &rses_prop->rses_prop_data.sescmd; - sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ -#if defined(SS_DEBUG) - sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; - sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; -#endif - /** Set session command buffer */ - sescmd->my_sescmd_buf = sescmd_buf; - sescmd->my_sescmd_packet_type = packet_type; - sescmd->position = atomic_add(&rses->pos_generator, 1); - return sescmd; -} - -static void mysql_sescmd_done(mysql_sescmd_t* sescmd) -{ - CHK_RSES_PROP(sescmd->my_sescmd_prop); - gwbuf_free(sescmd->my_sescmd_buf); - memset(sescmd, 0, sizeof(mysql_sescmd_t)); -} - -/** - * All cases where backend message starts at least with one response to session - * command are handled here. - * Read session commands from property list. If command is already replied, - * discard packet. Else send reply to client. In both cases move cursor forward - * until all session command replies are handled. - * - * Cases that are expected to happen and which are handled: - * s = response not yet replied to client, S = already replied response, - * q = query - * 1. q+ for example : select * from mysql.user - * 2. s+ for example : set autocommit=1 - * 3. S+ - * 4. sq+ - * 5. Sq+ - * 6. Ss+ - * 7. Ss+q+ - * 8. S+q+ - * 9. s+q+ - */ -static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, - backend_ref_t* bref) -{ - mysql_sescmd_t* scmd; - sescmd_cursor_t* scur; - - scur = &bref->bref_sescmd_cur; - scmd = sescmd_cursor_get_command(scur); - - CHK_GWBUF(replybuf); - - /** - * Walk through packets in the message and the list of session - * commands. - */ - while (scmd != NULL && replybuf != NULL) - { - scur->position = scmd->position; - /** Faster backend has already responded to client : discard */ - if (scmd->my_sescmd_is_replied) - { - bool last_packet = false; - - CHK_GWBUF(replybuf); - - while (!last_packet) - { - int buflen; - - buflen = GWBUF_LENGTH(replybuf); - last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); - /** discard packet */ - replybuf = gwbuf_consume(replybuf, buflen); - } - /** Set response status received */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - /** Response is in the buffer and it will be sent to client. */ - else if (replybuf != NULL) - { - /** Mark the rest session commands as replied */ - scmd->my_sescmd_is_replied = true; - } - - if (sescmd_cursor_next(scur)) - { - scmd = sescmd_cursor_get_command(scur); - } - else - { - scmd = NULL; - /** All session commands are replied */ - scur->scmd_cur_active = false; - } - } - ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); - - return replybuf; -} - - - -/** - * Get the address of current session command. - * - * Router session must be locked */ -static mysql_sescmd_t* sescmd_cursor_get_command(sescmd_cursor_t* scur) -{ - mysql_sescmd_t* scmd; - - scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); - - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - - scmd = scur->scmd_cur_cmd; - - return scmd; -} - -/** router must be locked */ -static bool sescmd_cursor_is_active(sescmd_cursor_t* sescmd_cursor) -{ - bool succp; - - succp = sescmd_cursor->scmd_cur_active; - return succp; -} - -/** router must be locked */ -static void sescmd_cursor_set_active(sescmd_cursor_t* sescmd_cursor, - bool value) -{ - /** avoid calling unnecessarily */ - ss_dassert(sescmd_cursor->scmd_cur_active != value); - sescmd_cursor->scmd_cur_active = value; -} - -/** - * Clone session command's command buffer. - * Router session must be locked - */ -static GWBUF* sescmd_cursor_clone_querybuf(sescmd_cursor_t* scur) -{ - GWBUF* buf; - ss_dassert(scur->scmd_cur_cmd != NULL); - - buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf); - - CHK_GWBUF(buf); - return buf; -} - -static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur) -{ - bool succp; - - CHK_SESCMD_CUR(scur); - - if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) - { - succp = true; - } - else - { - succp = false; - } - - return succp; -} - -static void sescmd_cursor_reset(sescmd_cursor_t* scur) -{ - SCHEMAROUTER_SESSION* rses; - CHK_SESCMD_CUR(scur); - CHK_CLIENT_RSES(scur->scmd_cur_rses); - rses = scur->scmd_cur_rses; - - scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - - CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); - scur->scmd_cur_active = false; - scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; -} - -static bool execute_sescmd_history(backend_ref_t* bref) -{ - bool succp; - sescmd_cursor_t* scur; - CHK_BACKEND_REF(bref); - - scur = &bref->bref_sescmd_cur; - CHK_SESCMD_CUR(scur); - - if (!sescmd_cursor_history_empty(scur)) - { - sescmd_cursor_reset(scur); - succp = execute_sescmd_in_backend(bref); - } - else - { - succp = true; - } - - return succp; -} - -/** - * If session command cursor is passive, sends the command to backend for - * execution. - * - * Returns true if command was sent or added successfully to the queue. - * Returns false if command sending failed or if there are no pending session - * commands. - * - * Router session must be locked. - */ -static bool execute_sescmd_in_backend(backend_ref_t* backend_ref) -{ - DCB* dcb; - bool succp; - int rc = 0; - sescmd_cursor_t* scur; - - if (BREF_IS_CLOSED(backend_ref)) - { - succp = false; - goto return_succp; - } - dcb = backend_ref->bref_dcb; - - CHK_DCB(dcb); - CHK_BACKEND_REF(backend_ref); - - /** - * Get cursor pointer and copy of command buffer to cursor. - */ - scur = &backend_ref->bref_sescmd_cur; - - /** Return if there are no pending ses commands */ - if (sescmd_cursor_get_command(scur) == NULL) - { - succp = false; - MXS_INFO("Cursor had no pending session commands."); - - goto return_succp; - } - - if (!sescmd_cursor_is_active(scur)) - { - /** Cursor is left active when function returns. */ - sescmd_cursor_set_active(scur, true); - } - - switch (scur->scmd_cur_cmd->my_sescmd_packet_type) - { - case MYSQL_COM_CHANGE_USER: - /** This makes it possible to handle replies correctly */ - gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); - rc = dcb->func.auth(dcb, - NULL, - dcb->session, - sescmd_cursor_clone_querybuf(scur)); - break; - - case MYSQL_COM_QUERY: - default: - /** - * Mark session command buffer, it triggers writing - * MySQL command to protocol - */ - gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); - rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); - break; - } - - if (rc == 1) - { - succp = true; - } - else - { - succp = false; - } -return_succp: - return succp; -} - - -/** - * Moves cursor to next property and copied address of its sescmd to cursor. - * Current propery must be non-null. - * If current property is the last on the list, *scur->scmd_ptr_property == NULL - * - * Router session must be locked - */ -static bool sescmd_cursor_next(sescmd_cursor_t* scur) -{ - bool succp = false; - rses_property_t* prop_curr; - rses_property_t* prop_next; - - ss_dassert(scur != NULL); - ss_dassert(*(scur->scmd_cur_ptr_property) != NULL); - - /** Illegal situation */ - if (scur == NULL || - *scur->scmd_cur_ptr_property == NULL || - scur->scmd_cur_cmd == NULL) - { - /** Log error */ - goto return_succp; - } - prop_curr = *(scur->scmd_cur_ptr_property); - - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd)); - CHK_RSES_PROP(prop_curr); - - /** Copy address of pointer to next property */ - scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next); - prop_next = *scur->scmd_cur_ptr_property; - ss_dassert(prop_next == *(scur->scmd_cur_ptr_property)); - - /** If there is a next property move forward */ - if (prop_next != NULL) - { - CHK_RSES_PROP(prop_next); - CHK_RSES_PROP((*(scur->scmd_cur_ptr_property))); - - /** Get pointer to next property's sescmd */ - scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next); - - ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop); - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); - } - else - { - /** No more properties, can't proceed. */ - goto return_succp; - } - - if (scur->scmd_cur_cmd != NULL) - { - succp = true; - } - else - { - ss_dassert(false); /*< Log error, sescmd shouldn't be NULL */ - } -return_succp: - return succp; -} - -static rses_property_t* mysql_sescmd_get_property(mysql_sescmd_t* scmd) -{ - CHK_MYSQL_SESCMD(scmd); - return scmd->my_sescmd_prop; -} - -/** - * @brief Get router capabilities - * - * @return Always RCAP_TYPE_CONTIGUOUS_INPUT - */ -static uint64_t getCapabilities(MXS_ROUTER* instance) -{ - return RCAP_TYPE_NONE; -} - -/** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they can be replayed in backends which are - * started and joined later. - * - * Suppress redundant OK packets sent by backends. - * - * The first OK packet is replied to the client. - * Return true if succeed, false is returned if router session was closed or - * if execute_sescmd_in_backend failed. - */ -static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, - GWBUF* querybuf, - SCHEMAROUTER* inst, - unsigned char packet_type, - qc_query_type_t qtype) -{ - bool succp = false; - rses_property_t* prop; - backend_ref_t* backend_ref; - int i; - - MXS_INFO("Session write, routing to all servers."); - - backend_ref = router_cli_ses->rses_backend_ref; - - /** - * These are one-way messages and server doesn't respond to them. - * Therefore reply processing is unnecessary and session - * command property is not needed. It is just routed to all available - * backends. - */ - if (packet_type == MYSQL_COM_STMT_SEND_LONG_DATA || - packet_type == MYSQL_COM_QUIT || - packet_type == MYSQL_COM_STMT_CLOSE) - { - int rc; - - succp = true; - - /** Lock router session */ - if (router_cli_ses->closed) - { - return false; - } - - for (i = 0; i < router_cli_ses->rses_nbackends; i++) - { - DCB* dcb = backend_ref[i].bref_dcb; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Route query to %s\t%s:%d%s", - (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? - "master" : "slave"), - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port, - (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); - } - - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); - if (rc != 1) - { - succp = false; - } - } - } - - gwbuf_free(querybuf); - return succp; - } - /** Lock router session */ - - if (router_cli_ses->rses_nbackends <= 0) - { - return false; - } - - if (router_cli_ses->rses_config.max_sescmd_hist > 0 && - router_cli_ses->n_sescmd >= router_cli_ses->rses_config.max_sescmd_hist) - { - MXS_ERROR("Router session exceeded session command history limit of %d. " - "Closing router session.", - router_cli_ses->rses_config.max_sescmd_hist); - gwbuf_free(querybuf); - atomic_add(&router_cli_ses->router->stats.n_hist_exceeded, 1); - poll_fake_hangup_event(router_cli_ses->rses_client_dcb); - - return succp; - } - - if (router_cli_ses->rses_config.disable_sescmd_hist) - { - rses_property_t *prop, *tmp; - backend_ref_t* bref; - bool conflict; - - prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; - while (prop) - { - conflict = false; - - for (i = 0; i < router_cli_ses->rses_nbackends; i++) - { - bref = &backend_ref[i]; - if (BREF_IS_IN_USE(bref)) - { - - if (bref->bref_sescmd_cur.position <= prop->rses_prop_data.sescmd.position) - { - conflict = true; - break; - } - } - } - - if (conflict) - { - break; - } - - tmp = prop; - router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD] = prop->rses_prop_next; - rses_property_done(tmp); - prop = router_cli_ses->rses_properties[RSES_PROP_TYPE_SESCMD]; - } - } - - /** - * - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router session clean-up. - */ - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); - - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); - atomic_add(&router_cli_ses->stats.longest_sescmd, 1); - atomic_add(&router_cli_ses->n_sescmd, 1); - - for (i = 0; i < router_cli_ses->rses_nbackends; i++) - { - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - sescmd_cursor_t* scur; - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Route query to %s\t%s:%d%s", - (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? - "master" : "slave"), - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port, - (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); - } - - scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); - - /** - * Add one waiter to backend reference. - */ - bref_set_state(get_bref_from_dcb(router_cli_ses, - backend_ref[i].bref_dcb), - BREF_WAITING_RESULT); - /** - * Start execution if cursor is not already executing. - * Otherwise, cursor will execute pending commands - * when it completes with previous commands. - */ - if (sescmd_cursor_is_active(scur)) - { - succp = true; - - MXS_INFO("Backend %s:%d already executing sescmd.", - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port); - } - else - { - succp = execute_sescmd_in_backend(&backend_ref[i]); - - if (!succp) - { - MXS_ERROR("Failed to execute session " - "command in %s:%d", - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port); - } - } - } - else - { - succp = false; - } - } - - return succp; -} - /** * Error Handler routine to resolve _backend_ failures. If it succeeds then there * are enough operative backends available and connected. Otherwise it fails, @@ -3357,558 +3919,60 @@ static void handleError(MXS_ROUTER* instance, dcb_close(problem_dcb); } - -static void handle_error_reply_client(MXS_SESSION* ses, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg) -{ - mxs_session_state_t sesstate; - DCB* client_dcb; - backend_ref_t* bref; - - sesstate = ses->state; - client_dcb = ses->client_dcb; - - /** - * If bref exists, mark it closed - */ - if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL) - { - CHK_BACKEND_REF(bref); - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - } - - if (sesstate == SESSION_STATE_ROUTER_READY) - { - CHK_DCB(client_dcb); - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - } -} - /** - * Check if a router session has servers in use - * @param rses Router client session - * @return True if session has a single backend server in use that is running. - * False if no backends are in use or running. - */ -bool have_servers(SCHEMAROUTER_SESSION* rses) -{ - for (int i = 0; i < rses->rses_nbackends; i++) - { - if (BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && - !BREF_IS_CLOSED(&rses->rses_backend_ref[i])) - { - return true; - } - } - - return false; -} - -/** - * Check if there is backend reference pointing at failed DCB, and reset its - * flags. Then clear DCB's callback and finally try to reconnect. + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. * - * This must be called with router lock. - * - * @param inst router instance - * @param rses router client session - * @param dcb failed DCB - * @param errmsg error message which is sent to client if it is waiting - * - * @return true if there are enough backend connections to continue, false if not + * @return The module object */ -static bool handle_error_new_connection(SCHEMAROUTER* inst, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg) +MXS_MODULE* MXS_CREATE_MODULE() { - backend_ref_t* bref; - bool succp; + MXS_NOTICE("Initializing Schema Sharding Router."); + spinlock_init(&instlock); + instances = NULL; - MXS_SESSION *ses = backend_dcb->session; - CHK_SESSION(ses); - - /** - * If bref == NULL it has been replaced already with another one. - */ - if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL) + static MXS_ROUTER_OBJECT MyObject = { - succp = false; - goto return_succp; - } + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostic, + clientReply, + handleError, + NULL, + NULL + }; - CHK_BACKEND_REF(bref); - - /** - * If query was sent through the bref and it is waiting for reply from - * the backend server it is necessary to send an error to the client - * because it is waiting for reply. - */ - if (BREF_IS_WAITING_RESULT(bref)) + static MXS_MODULE info = { - DCB* client_dcb; - client_dcb = ses->client_dcb; - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - bref_clear_state(bref, BREF_WAITING_RESULT); - } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); + MXS_MODULE_API_ROUTER, + MXS_MODULE_BETA_RELEASE, + MXS_ROUTER_VERSION, + "A database sharding router for simple sharding", + "V1.0.0", + RCAP_TYPE_CONTIGUOUS_INPUT, + &MyObject, + NULL, /* Process init. */ + NULL, /* Process finish. */ + NULL, /* Thread init. */ + NULL, /* Thread finish. */ + { + {"ignore_databases", MXS_MODULE_PARAM_STRING}, + {"ignore_databases_regex", MXS_MODULE_PARAM_STRING}, + {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"}, + {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false"}, + {"refresh_databases", MXS_MODULE_PARAM_BOOL, "true"}, + {"refresh_interval", MXS_MODULE_PARAM_COUNT, DEFAULT_REFRESH_INTERVAL}, + {"debug", MXS_MODULE_PARAM_BOOL, "false"}, + {MXS_END_MODULE_PARAMS} + } + }; - /** - * Error handler is already called for this DCB because - * it's not polling anymore. It can be assumed that - * it succeed because rses isn't closed. - */ - if (backend_dcb->state != DCB_STATE_POLLING) - { - succp = true; - goto return_succp; - } - - /** - * Try to get replacement slave or at least the minimum - * number of slave connections for router session. - */ - succp = connect_backend_servers(rses->rses_backend_ref, - rses->rses_nbackends, - ses, inst); - - if (!have_servers(rses)) - { - MXS_ERROR("No more valid servers, closing session"); - succp = false; - goto return_succp; - } - -return_succp: - return succp; + return &info; } -/** - * Finds out if there is a backend reference pointing at the DCB given as - * parameter. - * @param rses router client session - * @param dcb DCB - * - * @return backend reference pointer if succeed or NULL - */ -static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses, - DCB *dcb) -{ - CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - - for (int i = 0; i < rses->rses_nbackends; i++) - { - if (rses->rses_backend_ref[i].bref_dcb == dcb) - { - return &rses->rses_backend_ref[i]; - } - } - - return NULL; -} - -static sescmd_cursor_t* backend_ref_get_sescmd_cursor(backend_ref_t* bref) -{ - CHK_BACKEND_REF(bref); - CHK_SESCMD_CUR((&bref->bref_sescmd_cur)); - return &bref->bref_sescmd_cur; -} - -/** - * Detect if a query contains a SHOW SHARDS query. - * @param query Query to inspect - * @return true if the query is a SHOW SHARDS query otherwise false - */ -bool detect_show_shards(GWBUF* query) -{ - bool rval = false; - char *querystr, *tok, *sptr; - - if (query == NULL) - { - MXS_ERROR("NULL value passed at %s:%d", __FILE__, __LINE__); - return false; - } - - if (!modutil_is_SQL(query) && !modutil_is_SQL_prepare(query)) - { - return false; - } - - if ((querystr = modutil_get_SQL(query)) == NULL) - { - MXS_ERROR("Failure to parse SQL at %s:%d", __FILE__, __LINE__); - return false; - } - - tok = strtok_r(querystr, " ", &sptr); - if (tok && strcasecmp(tok, "show") == 0) - { - tok = strtok_r(NULL, " ", &sptr); - if (tok && strcasecmp(tok, "shards") == 0) - { - rval = true; - } - } - - MXS_FREE(querystr); - return rval; -} - -struct shard_list -{ - HASHITERATOR* iter; - SCHEMAROUTER_SESSION* rses; - RESULTSET* rset; -}; - -/** - * Callback for the shard list result set creation - */ -RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) -{ - char *key, *value; - struct shard_list *sl = (struct shard_list*)data; - RESULT_ROW* rval = NULL; - - if ((key = hashtable_next(sl->iter)) && - (value = hashtable_fetch(sl->rses->shardmap->hash, key))) - { - if ((rval = resultset_make_row(sl->rset))) - { - resultset_row_set(rval, 0, key); - resultset_row_set(rval, 1, value); - } - } - return rval; -} - -/** - * Send a result set of all shards and their locations to the client. - * @param rses Router client session - * @return 0 on success, -1 on error - */ -int process_show_shards(SCHEMAROUTER_SESSION* rses) -{ - int rval = 0; - - spinlock_acquire(&rses->shardmap->lock); - if (rses->shardmap->state != SHMAP_UNINIT) - { - HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash); - struct shard_list sl; - if (iter) - { - sl.iter = iter; - sl.rses = rses; - if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL) - { - MXS_ERROR("[%s] Error: Failed to create resultset.", __FUNCTION__); - rval = -1; - } - else - { - resultset_add_column(sl.rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_add_column(sl.rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_stream_mysql(sl.rset, rses->rses_client_dcb); - resultset_free(sl.rset); - hashtable_iterator_free(iter); - } - } - else - { - MXS_ERROR("hashtable_iterator creation failed. " - "This is caused by a memory allocation failure."); - rval = -1; - } - } - spinlock_release(&rses->shardmap->lock); - return rval; -} - -/** - * - * @param dcb - * @param errnum - * @param mysqlstate - * @param errmsg - */ -void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg) -{ - GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg); - if (errbuff) - { - if (dcb->func.write(dcb, errbuff) != 1) - { - MXS_ERROR("Failed to write error packet to client."); - } - } - else - { - MXS_ERROR("Memory allocation failed when creating error packet."); - } -} - -/** - * - * @param router_cli_ses - * @return - */ -bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses) -{ - bool rval = false; - char* target = NULL; - - spinlock_acquire(&router_cli_ses->shardmap->lock); - if (router_cli_ses->shardmap->state != SHMAP_UNINIT) - { - target = hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db); - } - spinlock_release(&router_cli_ses->shardmap->lock); - - if (target) - { - /* Send a COM_INIT_DB packet to the server with the right database - * and set it as the client's active database */ - - unsigned int qlen = strlen(router_cli_ses->connect_db); - GWBUF* buffer = gwbuf_alloc(qlen + 5); - - if (buffer) - { - gw_mysql_set_byte3((unsigned char*) buffer->start, qlen + 1); - gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); - *((unsigned char*) buffer->start + 3) = 0x0; - *((unsigned char*) buffer->start + 4) = 0x2; - memcpy(buffer->start + 5, router_cli_ses->connect_db, qlen); - DCB* dcb = NULL; - - if (get_shard_dcb(&dcb, router_cli_ses, target)) - { - dcb->func.write(dcb, buffer); - MXS_DEBUG("USE '%s' sent to %s for session %p", - router_cli_ses->connect_db, - target, - router_cli_ses->rses_client_dcb->session); - rval = true; - } - else - { - MXS_INFO("Couldn't find target DCB for '%s'.", target); - } - } - else - { - MXS_ERROR("Buffer allocation failed."); - } - } - else - { - /** Unknown database, hang up on the client*/ - MXS_INFO("Connecting to a non-existent database '%s'", - router_cli_ses->connect_db); - char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; - sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db); - if (router_cli_ses->rses_config.debug) - { - sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", - router_cli_ses->rses_client_dcb->session->ses_id); - } - write_error_to_client(router_cli_ses->rses_client_dcb, - SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, - errmsg); - } - - return rval; -} - -void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses) -{ - GWBUF* tmp = router_cli_ses->queue; - router_cli_ses->queue = router_cli_ses->queue->next; - tmp->next = NULL; -#ifdef SS_DEBUG - char* querystr = modutil_get_SQL(tmp); - MXS_DEBUG("Sending queued buffer for session %p: %s", - router_cli_ses->rses_client_dcb->session, - querystr); - MXS_FREE(querystr); -#endif - poll_add_epollin_event_to_dcb(router_cli_ses->rses_client_dcb, tmp); -} - -/** - * - * @param router_cli_ses Router client session - * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error - */ -int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses, - backend_ref_t *bref, - GWBUF** wbuf) -{ - bool mapped = true; - GWBUF* writebuf = *wbuf; - backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) - { - if (bref->map_queue) - { - writebuf = gwbuf_append(bref->map_queue, writebuf); - bref->map_queue = NULL; - } - showdb_response_t rc = parse_showdb_response(router_cli_ses, - &router_cli_ses->rses_backend_ref[i], - &writebuf); - if (rc == SHOWDB_FULL_RESPONSE) - { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else if (rc == SHOWDB_PARTIAL_RESPONSE) - { - bref->map_queue = writebuf; - writebuf = NULL; - MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else - { - DCB* client_dcb = NULL; - - if ((router_cli_ses->init & INIT_FAILED) == 0) - { - if (rc == SHOWDB_DUPLICATE_DATABASES) - { - MXS_ERROR("Duplicate databases found, closing session."); - } - else - { - MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session."); - } - client_dcb = router_cli_ses->rses_client_dcb; - - /** This is the first response to the database mapping which - * has duplicate database conflict. Set the initialization bitmask - * to INIT_FAILED */ - router_cli_ses->init |= INIT_FAILED; - - /** Send the client an error about duplicate databases - * if there is a queued query from the client. */ - if (router_cli_ses->queue) - { - GWBUF* error = modutil_create_mysql_err_msg(1, 0, - SCHEMA_ERR_DUPLICATEDB, - SCHEMA_ERRSTR_DUPLICATEDB, - "Error: duplicate databases " - "found on two different shards."); - - if (error) - { - client_dcb->func.write(client_dcb, error); - } - else - { - MXS_ERROR("Creating buffer for error message failed."); - } - } - } - *wbuf = writebuf; - return -1; - } - } - - if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) - { - mapped = false; - MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", - bkrf[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - } - *wbuf = writebuf; - return mapped ? 1 : 0; -} - -/** - * Replace a shard map with another one. This function copies the contents of - * the source shard map to the target and frees the source memory. - * @param target Target shard map to replace - * @param source Source shard map to use - */ -void replace_shard_map(shard_map_t **target, shard_map_t **source) -{ - shard_map_t *tgt = *target; - shard_map_t *src = *source; - tgt->last_updated = src->last_updated; - tgt->state = src->state; - hashtable_free(tgt->hash); - tgt->hash = src->hash; - MXS_FREE(src); - *source = NULL; -} - -/** - * Synchronize the router client session shard map with the global shard map for - * this user. - * - * If the router doesn't have a shard map for this user then the current shard map - * of the client session is added to the router. If the shard map in the router is - * out of date, its contents are replaced with the contents of the current client - * session. If the router has a usable shard map, the current shard map of the client - * is discarded and the router's shard map is used. - * @param client Router session - */ -void synchronize_shard_map(SCHEMAROUTER_SESSION *client) -{ - spinlock_acquire(&client->router->lock); - - client->router->stats.shmap_cache_miss++; - - shard_map_t *map = hashtable_fetch(client->router->shard_maps, - client->rses_client_dcb->user); - if (map) - { - spinlock_acquire(&map->lock); - if (map->state == SHMAP_STALE) - { - replace_shard_map(&map, &client->shardmap); - } - else if (map->state != SHMAP_READY) - { - MXS_WARNING("Shard map state is not ready but" - "it is in use. Replacing it with a newer one."); - replace_shard_map(&map, &client->shardmap); - } - else - { - /** - * Another thread has already updated the shard map for this user - */ - hashtable_free(client->shardmap->hash); - MXS_FREE(client->shardmap); - } - spinlock_release(&map->lock); - client->shardmap = map; - } - else - { - hashtable_add(client->router->shard_maps, - client->rses_client_dcb->user, - client->shardmap); - ss_dassert(hashtable_fetch(client->router->shard_maps, - client->rses_client_dcb->user) == client->shardmap); - } - spinlock_release(&client->router->lock); -} +MXS_END_DECLS diff --git a/server/modules/routing/schemarouter/schemarouter.h b/server/modules/routing/schemarouter/schemarouter.h index 43e755047..38cf8c537 100644 --- a/server/modules/routing/schemarouter/schemarouter.h +++ b/server/modules/routing/schemarouter/schemarouter.h @@ -265,7 +265,7 @@ typedef struct backend_ref_st GWBUF* map_queue; SERVER_REF* bref_backend; /*< Backend server */ DCB* bref_dcb; /*< Backend DCB */ - bref_state_t bref_state; /*< State of the backend */ + int bref_state; /*< State of the backend */ bool bref_mapped; /*< Whether the backend has been mapped */ bool last_sescmd_replied; int bref_num_result_wait; /*< Number of not yet received results */ @@ -335,7 +335,7 @@ struct schemarouter_session * mapped to the servers that contain them */ char connect_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Database the user was trying to connect to */ char current_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Current active database */ - init_mask_t init; /*< Initialization state bitmask */ + int init; /*< Initialization state bitmask */ GWBUF* queue; /*< Query that was received before the session was ready */ DCB* dcb_route; /*< Internal DCB used to trigger re-routing of buffers */ DCB* dcb_reply; /*< Internal DCB used to send replies to the client */ diff --git a/server/modules/routing/schemarouter/sharding_common.c b/server/modules/routing/schemarouter/sharding_common.c deleted file mode 100644 index 28a0bc10a..000000000 --- a/server/modules/routing/schemarouter/sharding_common.c +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright (c) 2016 MariaDB Corporation Ab - * - * Use of this software is governed by the Business Source License included - * in the LICENSE.TXT file and at www.mariadb.com/bsl11. - * - * Change Date: 2019-07-01 - * - * On the date above, in accordance with the Business Source License, use - * of this software will be governed by version 2 or later of the General - * Public License. - */ - -#include "sharding_common.h" -#include -#include - -/** - * Extract the database name from a COM_INIT_DB or literal USE ... query. - * @param buf Buffer with the database change query - * @param str Pointer where the database name is copied - * @return True for success, false for failure - */ -bool extract_database(GWBUF* buf, char* str) -{ - uint8_t* packet; - char *saved, *tok, *query = NULL; - bool succp = true; - unsigned int plen; - - packet = GWBUF_DATA(buf); - plen = gw_mysql_get_byte3(packet) - 1; - - /** Copy database name from MySQL packet to session */ - if (qc_get_operation(buf) == QUERY_OP_CHANGE_DB) - { - const char *delim = "` \n\t;"; - - query = modutil_get_SQL(buf); - tok = strtok_r(query, delim, &saved); - - if (tok == NULL || strcasecmp(tok, "use") != 0) - { - MXS_ERROR("extract_database: Malformed chage database packet."); - succp = false; - goto retblock; - } - - tok = strtok_r(NULL, delim, &saved); - - if (tok == NULL) - { - MXS_ERROR("extract_database: Malformed change database packet."); - succp = false; - goto retblock; - } - - strncpy(str, tok, MYSQL_DATABASE_MAXLEN); - } - else - { - memcpy(str, packet + 5, plen); - memset(str + plen, 0, 1); - } -retblock: - MXS_FREE(query); - return succp; -} - -/** - * Create a fake error message from a DCB. - * @param fail_str Custom error message - * @param dcb DCB to use as the origin of the error - */ -void create_error_reply(char* fail_str, DCB* dcb) -{ - MXS_INFO("change_current_db: failed to change database: %s", fail_str); - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - - if (errbuf == NULL) - { - MXS_ERROR("Creating buffer for error message failed."); - return; - } - /** Set flags that help router to identify session commands reply */ - gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); - gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); - gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); - - poll_add_epollin_event_to_dcb(dcb, - errbuf); -} - -/** - * Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY - * packet, check that it exists in the hashtable and copy its name to MYSQL_session. - * - * @param dest Destination where the database name will be written - * @param dbhash Hashtable containing valid databases - * @param buf Buffer containing the database change query - * - * @return true if new database is set, false if non-existent database was tried - * to be set - */ -bool change_current_db(char* dest, - HASHTABLE* dbhash, - GWBUF* buf) -{ - char* target; - bool succp; - char db[MYSQL_DATABASE_MAXLEN + 1]; - if (GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) - { - /** Copy database name from MySQL packet to session */ - if (!extract_database(buf, db)) - { - succp = false; - goto retblock; - } - MXS_INFO("change_current_db: INIT_DB with database '%s'", db); - /** - * Update the session's active database only if it's in the hashtable. - * If it isn't found, send a custom error packet to the client. - */ - - if ((target = (char*)hashtable_fetch(dbhash, (char*)db)) == NULL) - { - succp = false; - goto retblock; - } - else - { - strcpy(dest, db); - MXS_INFO("change_current_db: database is on server: '%s'.", target); - succp = true; - goto retblock; - } - } - else - { - /** Create error message */ - MXS_ERROR("change_current_db: failed to change database: Query buffer too large"); - MXS_INFO("change_current_db: failed to change database: " - "Query buffer too large [%ld bytes]", GWBUF_LENGTH(buf)); - succp = false; - goto retblock; - } - -retblock: - return succp; -} diff --git a/server/modules/routing/schemarouter/sharding_common.h b/server/modules/routing/schemarouter/sharding_common.h deleted file mode 100644 index 5ad404c86..000000000 --- a/server/modules/routing/schemarouter/sharding_common.h +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once -#ifndef _SHARDING_COMMON_HG -#define _SHARDING_COMMON_HG -/* - * Copyright (c) 2016 MariaDB Corporation Ab - * - * Use of this software is governed by the Business Source License included - * in the LICENSE.TXT file and at www.mariadb.com/bsl11. - * - * Change Date: 2019-07-01 - * - * On the date above, in accordance with the Business Source License, use - * of this software will be governed by version 2 or later of the General - * Public License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -MXS_BEGIN_DECLS - -bool extract_database(GWBUF* buf, char* str); -void create_error_reply(char* fail_str, DCB* dcb); -bool change_current_db(char* dest, HASHTABLE* dbhash, GWBUF* buf); - -MXS_END_DECLS - -#endif