From d151512d2074d6879147dba7c024ae92f0750a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sun, 26 Mar 2017 23:51:17 +0300 Subject: [PATCH] Take new sharding implementation into use The sharding implementation now uses a class to abstract the details of the shard. This allows for different design where each session makes a copy of the global shard map which is then used for the duration of the session. In addition to making the desing a bit clearer to understand, it also removes lock competition between threads. Due to the change to C++, the main entry points need to be wrapped in the exception-safety macros. The next step in the refactoring will be to use the router template. This will remove the need to manually define them. --- .../routing/schemarouter/schemarouter.cc | 688 +++++++----------- .../routing/schemarouter/schemarouter.h | 14 +- .../modules/routing/schemarouter/shard_map.cc | 141 ++-- .../modules/routing/schemarouter/shard_map.hh | 151 ++-- 4 files changed, 431 insertions(+), 563 deletions(-) diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemarouter.cc index 21e236260..964de6ff8 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemarouter.cc @@ -32,12 +32,10 @@ #include using std::string; +using std::map; #define DEFAULT_REFRESH_INTERVAL "300" -/** Hashtable size for the per user shard maps */ -#define SCHEMAROUTER_USERHASH_SIZE 10 - /** * @file schemarouter.c The entry points for the simple sharding router module. */ @@ -55,10 +53,6 @@ static bool get_shard_dcb(DCB** dcb, char* name); static bool execute_sescmd_in_backend(backend_ref_t* backend_ref); -static void tracelog_routed_query(SCHEMAROUTER_SESSION* rses, - char* funcname, - backend_ref_t* bref, - GWBUF* buf); static bool route_session_write(SCHEMAROUTER_SESSION* router_client_ses, GWBUF* querybuf, SCHEMAROUTER* inst, @@ -75,10 +69,8 @@ static void handle_error_reply_client(MXS_SESSION* ses, SCHEMAROUTER_SESSION* rses, DCB* backend_dcb, GWBUF* errmsg); -bool change_current_db(char* dest, HASHTABLE* dbhash, GWBUF* buf); +bool change_current_db(char* dest, Shard& shard, GWBUF* buf); bool extract_database(GWBUF* buf, char* str); -static SPINLOCK instlock; -static SCHEMAROUTER* instances; bool detect_show_shards(GWBUF* query); int process_show_shards(SCHEMAROUTER_SESSION* rses); @@ -91,20 +83,6 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses); void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses); void synchronize_shard_map(SCHEMAROUTER_SESSION *client); -bool check_server_status(SERVER_REF *servers, char* target) -{ - for (SERVER_REF *ref = servers; ref; ref = ref->next) - { - if (strcmp(ref->server->unique_name, target) == 0 && - SERVER_IS_RUNNING(ref->server)) - { - return true; - } - } - - return false; -} - /** * Convert a length encoded string into a C string. * @param data Pointer to the first byte of the string @@ -177,7 +155,7 @@ char* get_lenenc_str(void* data) showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_t* bref, GWBUF** buffer) { unsigned char* ptr; - char* target = bref->bref_backend->server->unique_name; + SERVER* target = bref->bref_backend->server; GWBUF* buf; bool duplicate_found = false; showdb_response_t rval = SHOWDB_PARTIAL_RESPONSE; @@ -225,7 +203,6 @@ showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_ ptr += gw_mysql_get_byte3(ptr) + 4; } - spinlock_acquire(&rses->shardmap->lock); while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) { int payloadlen = gw_mysql_get_byte3(ptr); @@ -234,22 +211,23 @@ showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_ if (data) { - if (hashtable_add(rses->shardmap->hash, data, target)) + if (rses->shardmap.add_location(data, target)) { - MXS_INFO("<%s, %s>", target, data); + MXS_INFO("<%s, %s>", target->unique_name, data); } else { - if (!(hashtable_fetch(rses->router->ignored_dbs, data) || + if (!(rses->router->ignored_dbs.find(data) != rses->router->ignored_dbs.end() || (rses->router->ignore_regex && pcre2_match(rses->router->ignore_regex, (PCRE2_SPTR)data, PCRE2_ZERO_TERMINATED, 0, 0, rses->router->ignore_match_data, NULL) >= 0))) { duplicate_found = true; + SERVER *duplicate = rses->shardmap.get_location(data); + MXS_ERROR("Database '%s' found on servers '%s' and '%s' for user %s@%s.", - data, target, - (char*)hashtable_fetch(rses->shardmap->hash, data), + data, target->unique_name, duplicate->unique_name, rses->rses_client_dcb->user, rses->rses_client_dcb->remote); } @@ -258,7 +236,6 @@ showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_ } ptr += packetlen; } - spinlock_release(&rses->shardmap->lock); if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && bref->n_mapping_eof == 1) { @@ -347,112 +324,116 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) * @param buffer Query to inspect * @return Name of the backend or NULL if the query contains no known databases. */ -char* get_shard_target_name(SCHEMAROUTER* router, - SCHEMAROUTER_SESSION* client, - GWBUF* buffer, - uint32_t qtype) +SERVER* get_shard_target(SCHEMAROUTER* router, + SCHEMAROUTER_SESSION* client, + GWBUF* buffer, + uint32_t qtype) { - int sz = 0, i, j; - char** dbnms = NULL; - char* rval = NULL, *query, *tmp = NULL; + SERVER *rval = NULL; bool has_dbs = false; /**If the query targets any database other than the current one*/ + const QC_FIELD_INFO* info; + size_t n_info; - dbnms = qc_get_database_names(buffer, &sz); + qc_get_field_info(buffer, &info, &n_info); - HASHTABLE* ht = client->shardmap->hash; - - if (sz > 0) + for (size_t i = 0; i < n_info; i++) { - for (i = 0; i < sz; i++) + if (info[i].database) { - char* name; - if ((name = (char*)hashtable_fetch(ht, dbnms[i]))) + if (strcmp(info[i].database, "information_schema") == 0 && rval == NULL) { - if (strcmp(dbnms[i], "information_schema") == 0 && rval == NULL) + has_dbs = false; + } + else + { + SERVER* target = client->shardmap.get_location(info[i].database); + + if (target) { - has_dbs = false; - } - else - { - /** Warn about improper usage of the router */ - if (rval && strcmp(name, rval) != 0) + if (rval && target != rval) { MXS_ERROR("Query targets databases on servers '%s' and '%s'. " "Cross database queries across servers are not supported.", - rval, name); + rval->unique_name, target->unique_name); } else if (rval == NULL) { - rval = name; + rval = target; has_dbs = true; - MXS_INFO("Query targets database '%s' on server '%s'", dbnms[i], rval); + MXS_INFO("Query targets database '%s' on server '%s'", + info[i].database, rval->unique_name); } } } - MXS_FREE(dbnms[i]); } - MXS_FREE(dbnms); } /* Check if the query is a show tables query with a specific database */ if (qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES)) { - query = modutil_get_SQL(buffer); + char *query = modutil_get_SQL(buffer); + char *tmp; + if ((tmp = strcasestr(query, "from"))) { const char *delim = "` \n\t;"; char *saved, *tok = strtok_r(tmp, delim, &saved); tok = strtok_r(NULL, delim, &saved); - ss_dassert(tok != NULL); - tmp = (char*) hashtable_fetch(ht, tok); - if (tmp) + if (tok) { - MXS_INFO("SHOW TABLES with specific database '%s' on server '%s'", tok, tmp); + rval = client->shardmap.get_location(tok); + + if (rval) + { + MXS_INFO("SHOW TABLES with specific database '%s' on server '%s'", tok, tmp); + } } } MXS_FREE(query); - if (tmp == NULL) + if (rval == NULL) { - rval = (char*) hashtable_fetch(ht, client->current_db); - MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'", - client->current_db, rval); + rval = client->shardmap.get_location(client->current_db); + + if (rval) + { + MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'", + client->current_db, rval->unique_name); + } } else { - rval = tmp; has_dbs = true; } } - else + else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { - if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) + for (int i = 0; i < client->rses_nbackends; i++) { - for (i = 0; i < client->rses_nbackends; i++) - { + char *srvnm = client->rses_backend_ref[i].bref_backend->server->unique_name; - char *srvnm = client->rses_backend_ref[i].bref_backend->server->unique_name; - if (strcmp(srvnm, (char*)buffer->hint->data) == 0) - { - rval = srvnm; - MXS_INFO("Routing hint found (%s)", srvnm); - } + if (strcmp(srvnm, (char*)buffer->hint->data) == 0) + { + rval = client->rses_backend_ref[i].bref_backend->server; + MXS_INFO("Routing hint found (%s)", rval->unique_name); } } - if (rval == NULL && !has_dbs && client->current_db[0] != '\0') + if (rval == NULL && !has_dbs && client->current_db[0]) { /** * If the target name has not been found and the session has an * active database, set is as the target */ - rval = (char*) hashtable_fetch(ht, client->current_db); + rval = client->shardmap.get_location(client->current_db); + if (rval) { - MXS_INFO("Using active database '%s'", client->current_db); + MXS_INFO("Using active database '%s' on '%s'", + client->current_db, rval->unique_name); } } } @@ -554,19 +535,6 @@ static route_target_t get_shard_route_target(uint32_t qtype) return target; } -int cmpfn(const void* a, const void *b) -{ - return strcmp(*(char* const *)a, *(char* const *)b); -} - -/** Internal structure used to stream the list of databases */ -struct string_array -{ - char** array; - int position; - int size; -}; - /** * Callback for the database list streaming. * @param rset Result set which is being processed @@ -576,12 +544,16 @@ struct string_array */ RESULT_ROW *result_set_cb(struct resultset * rset, void *data) { - RESULT_ROW *row = NULL; - struct string_array *strarray = (struct string_array*) data; + RESULT_ROW *row = resultset_make_row(rset); + ServerMap* arr = (ServerMap*) data; - if (strarray->position < strarray->size && (row = resultset_make_row(rset))) + if (row) { - if (resultset_row_set(row, 0, strarray->array[strarray->position++]) == 0) + if (arr->size() > 0 && resultset_row_set(row, 0, arr->begin()->first.c_str())) + { + arr->erase(arr->begin()); + } + else { resultset_free_row(row); row = NULL; @@ -602,44 +574,20 @@ RESULT_ROW *result_set_cb(struct resultset * rset, void *data) bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) { bool rval = false; - spinlock_acquire(&client->shardmap->lock); - if (client->shardmap->state != SHMAP_UNINIT) - { - struct string_array strarray; - const int size = hashtable_size(client->shardmap->hash); - strarray.array = (char**)MXS_MALLOC(size * sizeof(char*)); - MXS_ABORT_IF_NULL(strarray.array); - strarray.position = 0; - HASHITERATOR *iter = hashtable_iterator(client->shardmap->hash); - RESULTSET* resultset = resultset_create(result_set_cb, &strarray); - if (strarray.array && iter && resultset) - { - char *key; - int i = 0; - while ((key = (char*)hashtable_next(iter))) - { - char *value = (char*)hashtable_fetch(client->shardmap->hash, key); - SERVER * server = server_find_by_unique_name(value); - if (SERVER_IS_RUNNING(server)) - { - strarray.array[i++] = key; - } - } - strarray.size = i; - qsort(strarray.array, strarray.size, sizeof(char*), cmpfn); - if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN, - COL_TYPE_VARCHAR)) - { - resultset_stream_mysql(resultset, client->rses_client_dcb); - rval = true; - } - } - resultset_free(resultset); - hashtable_iterator_free(iter); - MXS_FREE(strarray.array); + ServerMap dblist; + client->shardmap.get_content(dblist); + + RESULTSET* resultset = resultset_create(result_set_cb, &dblist); + + if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN, + COL_TYPE_VARCHAR)) + { + resultset_stream_mysql(resultset, client->rses_client_dcb); + rval = true; } - spinlock_release(&client->shardmap->lock); + resultset_free(resultset); + return rval; } @@ -1046,27 +994,18 @@ static void handle_error_reply_client(MXS_SESSION* ses, DCB* backend_dcb, GWBUF* errmsg) { - mxs_session_state_t sesstate; - DCB* client_dcb; - backend_ref_t* bref; + backend_ref_t* bref = get_bref_from_dcb(rses, backend_dcb); - sesstate = ses->state; - client_dcb = ses->client_dcb; - - /** - * If bref exists, mark it closed - */ - if ((bref = get_bref_from_dcb(rses, backend_dcb)) != NULL) + if (bref) { CHK_BACKEND_REF(bref); bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED); } - if (sesstate == SESSION_STATE_ROUTER_READY) + if (ses->state == SESSION_STATE_ROUTER_READY) { - CHK_DCB(client_dcb); - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + ses->client_dcb->func.write(ses->client_dcb, gwbuf_clone(errmsg)); } } @@ -1210,31 +1149,21 @@ bool detect_show_shards(GWBUF* query) return rval; } -struct shard_list -{ - HASHITERATOR* iter; - SCHEMAROUTER_SESSION* rses; - RESULTSET* rset; -}; - /** * Callback for the shard list result set creation */ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) { - char *key, *value; - struct shard_list *sl = (struct shard_list*)data; - RESULT_ROW* rval = NULL; + ServerMap* pContent = (ServerMap*)data; + RESULT_ROW* rval = resultset_make_row(rset); - if ((key = (char*)hashtable_next(sl->iter)) && - (value = (char*)hashtable_fetch(sl->rses->shardmap->hash, key))) + if (rval) { - if ((rval = resultset_make_row(sl->rset))) - { - resultset_row_set(rval, 0, key); - resultset_row_set(rval, 1, value); - } + resultset_row_set(rval, 0, pContent->begin()->first.c_str()); + resultset_row_set(rval, 1, pContent->begin()->second->unique_name); + pContent->erase(pContent->begin()); } + return rval; } @@ -1245,39 +1174,21 @@ RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) */ int process_show_shards(SCHEMAROUTER_SESSION* rses) { - int rval = 0; + int rval = -1; - spinlock_acquire(&rses->shardmap->lock); - if (rses->shardmap->state != SHMAP_UNINIT) + ServerMap pContent; + rses->shardmap.get_content(pContent); + RESULTSET* rset = resultset_create(shard_list_cb, &pContent); + + if (rset) { - HASHITERATOR* iter = hashtable_iterator(rses->shardmap->hash); - struct shard_list sl; - if (iter) - { - sl.iter = iter; - sl.rses = rses; - if ((sl.rset = resultset_create(shard_list_cb, &sl)) == NULL) - { - MXS_ERROR("[%s] Error: Failed to create resultset.", __FUNCTION__); - rval = -1; - } - else - { - resultset_add_column(sl.rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_add_column(sl.rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_stream_mysql(sl.rset, rses->rses_client_dcb); - resultset_free(sl.rset); - hashtable_iterator_free(iter); - } - } - else - { - MXS_ERROR("hashtable_iterator creation failed. " - "This is caused by a memory allocation failure."); - rval = -1; - } + resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_stream_mysql(rset, rses->rses_client_dcb); + resultset_free(rset); + rval = 0; } - spinlock_release(&rses->shardmap->lock); + return rval; } @@ -1312,14 +1223,7 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses) { bool rval = false; - char* target = NULL; - - spinlock_acquire(&router_cli_ses->shardmap->lock); - if (router_cli_ses->shardmap->state != SHMAP_UNINIT) - { - target = (char*)hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->connect_db); - } - spinlock_release(&router_cli_ses->shardmap->lock); + SERVER* target = router_cli_ses->shardmap.get_location(router_cli_ses->connect_db); if (target) { @@ -1339,18 +1243,18 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses) memcpy(data + 5, router_cli_ses->connect_db, qlen); DCB* dcb = NULL; - if (get_shard_dcb(&dcb, router_cli_ses, target)) + if (get_shard_dcb(&dcb, router_cli_ses, target->unique_name)) { dcb->func.write(dcb, buffer); MXS_DEBUG("USE '%s' sent to %s for session %p", router_cli_ses->connect_db, - target, + target->unique_name, router_cli_ses->rses_client_dcb->session); rval = true; } else { - MXS_INFO("Couldn't find target DCB for '%s'.", target); + MXS_INFO("Couldn't find target DCB for '%s'.", target->unique_name); } } else @@ -1505,26 +1409,8 @@ int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses, */ void synchronize_shard_map(SCHEMAROUTER_SESSION *client) { - spinlock_acquire(&client->router->lock); - client->router->stats.shmap_cache_miss++; - - shard_map_t *map = (shard_map_t *)hashtable_fetch(client->router->shard_maps, - client->rses_client_dcb->user); - if (map) - { - map = get_latest_shard_map(map, client->shardmap); - } - else - { - /** No previous map found */ - hashtable_add(client->router->shard_maps, - client->rses_client_dcb->user, - client->shardmap); - ss_dassert(hashtable_fetch(client->router->shard_maps, - client->rses_client_dcb->user) == client->shardmap); - } - spinlock_release(&client->router->lock); + client->router->shard_manager.update_shard(client->shardmap, client->rses_client_dcb->user); } /** @@ -1614,51 +1500,37 @@ void create_error_reply(char* fail_str, DCB* dcb) * @return true if new database is set, false if non-existent database was tried * to be set */ -bool change_current_db(char* dest, - HASHTABLE* dbhash, - GWBUF* buf) +bool change_current_db(char* dest, Shard& shard, GWBUF* buf) { - char* target; - bool succp; + bool succp = false; char db[MYSQL_DATABASE_MAXLEN + 1]; + if (GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) { /** Copy database name from MySQL packet to session */ - if (!extract_database(buf, db)) + if (extract_database(buf, db)) { - succp = false; - goto retblock; - } - MXS_INFO("change_current_db: INIT_DB with database '%s'", db); - /** - * Update the session's active database only if it's in the hashtable. - * If it isn't found, send a custom error packet to the client. - */ + MXS_INFO("change_current_db: INIT_DB with database '%s'", db); + /** + * Update the session's active database only if it's in the hashtable. + * If it isn't found, send a custom error packet to the client. + */ - if ((target = (char*)hashtable_fetch(dbhash, (char*)db)) == NULL) - { - succp = false; - goto retblock; - } - else - { - strcpy(dest, db); - MXS_INFO("change_current_db: database is on server: '%s'.", target); - succp = true; - goto retblock; + SERVER* target = shard.get_location(db); + + if (target) + { + strcpy(dest, db); + MXS_INFO("change_current_db: database is on server: '%s'.", target->unique_name); + succp = true; + } } } else { - /** Create error message */ MXS_ERROR("change_current_db: failed to change database: Query buffer too large"); - MXS_INFO("change_current_db: failed to change database: " - "Query buffer too large [%ld bytes]", GWBUF_LENGTH(buf)); - succp = false; - goto retblock; } -retblock: return succp; } @@ -1673,42 +1545,17 @@ MXS_BEGIN_DECLS * * @return NULL in failure, pointer to router in success. */ -static MXS_ROUTER* createInstance(SERVICE *service, char **options) +static MXS_ROUTER* do_createInstance(SERVICE *service, char **options) { - SCHEMAROUTER* router = NULL; MXS_CONFIG_PARAMETER* conf; MXS_CONFIG_PARAMETER* param; - MXS_EXCEPTION_GUARD(router = new SCHEMAROUTER); - - if (router == NULL) - { - return NULL; - } - - if ((router->ignored_dbs = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun)) == NULL) - { - MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); - MXS_FREE(router); - return NULL; - } - - hashtable_memory_fns(router->ignored_dbs, hashtable_item_strdup, NULL, hashtable_item_free, NULL); - - if ((router->shard_maps = hashtable_alloc(SCHEMAROUTER_USERHASH_SIZE, hashkeyfun, hashcmpfun)) == NULL) - { - MXS_ERROR("Memory allocation failed when allocating schemarouter database ignore list."); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); - return NULL; - } - - hashtable_memory_fns(router->shard_maps, hashtable_item_strdup, NULL, keyfreefun, NULL); + SCHEMAROUTER* router = new SCHEMAROUTER; /** Add default system databases to ignore */ - hashtable_add(router->ignored_dbs, (void*)"mysql", (void*)""); - hashtable_add(router->ignored_dbs, (void*)"information_schema", (void*)""); - hashtable_add(router->ignored_dbs, (void*)"performance_schema", (void*)""); + router->ignored_dbs.insert("mysql"); + router->ignored_dbs.insert("information_schema"); + router->ignored_dbs.insert("performance_schema"); router->service = service; router->schemarouter_config.last_refresh = time(NULL); router->stats.longest_sescmd = 0; @@ -1745,8 +1592,7 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) pcre2_get_error_message(errcode, errbuf, sizeof(errbuf)); MXS_ERROR("Regex compilation failed at %d for regex '%s': %s", (int)erroffset, param->value, errbuf); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); + delete router; return NULL; } @@ -1757,8 +1603,7 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) MXS_ERROR("PCRE2 match data creation failed. This" " is most likely caused by a lack of available memory."); pcre2_code_free(re); - hashtable_free(router->ignored_dbs); - MXS_FREE(router); + delete router; return NULL; } @@ -1777,7 +1622,7 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) while (tok) { - hashtable_add(router->ignored_dbs, tok, (void*)""); + router->ignored_dbs.insert(tok); tok = strtok_r(NULL, sep, &sptr); } } @@ -1835,6 +1680,15 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) return (MXS_ROUTER *)router; } +static MXS_ROUTER* createInstance(SERVICE *service, char **options) +{ + MXS_ROUTER* rval = NULL; + + MXS_EXCEPTION_GUARD((rval = do_createInstance(service, options))); + + return rval; +} + /** * Associate a new session with this instance of the router. * @@ -1845,7 +1699,7 @@ static MXS_ROUTER* createInstance(SERVICE *service, char **options) * @param session The session itself * @return Session specific data for this session */ -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) +static MXS_ROUTER_SESSION* do_newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) { char db[MYSQL_DATABASE_MAXLEN + 1] = ""; MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; @@ -1871,14 +1725,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess MXS_INFO("Client'%s' connecting with empty database.", data->user); } - SCHEMAROUTER_SESSION* client_rses = NULL; - - MXS_EXCEPTION_GUARD(client_rses = new SCHEMAROUTER_SESSION); - - if (client_rses == NULL) - { - return NULL; - } + SCHEMAROUTER_SESSION* client_rses = new SCHEMAROUTER_SESSION; SCHEMAROUTER* router = (SCHEMAROUTER*)router_inst; @@ -1889,37 +1736,10 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess client_rses->router = router; client_rses->rses_mysql_session = (MYSQL_session*)session->client_dcb->data; client_rses->rses_client_dcb = (DCB*)session->client_dcb; + client_rses->queue = NULL; + client_rses->shardmap = router->shard_manager.get_shard(session->client_dcb->user, + router->schemarouter_config.refresh_min_interval); - spinlock_acquire(&router->lock); - - shard_map_t *map = (shard_map_t*)hashtable_fetch(router->shard_maps, session->client_dcb->user); - enum shard_map_state state; - - if (map) - { - state = shard_map_update_state(map, router->schemarouter_config.refresh_min_interval); - } - - spinlock_release(&router->lock); - - if (map == NULL || state != SHMAP_READY) - { - if ((map = shard_map_alloc()) == NULL) - { - MXS_ERROR("Failed to allocate enough memory to create" - "new shard mapping. Session will be closed."); - delete client_rses; - return NULL; - } - client_rses->init = INIT_UNINT; - } - else - { - client_rses->init = INIT_READY; - atomic_add(&router->stats.shmap_cache_hit, 1); - } - - client_rses->shardmap = map; memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t)); client_rses->n_sescmd = 0; client_rses->rses_config.last_refresh = time(NULL); @@ -1944,19 +1764,11 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess int router_nservers = router->service->n_dbref; - backend_ref_t* backend_ref = NULL; - /** * Create backend reference objects for this session. */ - MXS_EXCEPTION_GUARD(backend_ref = new backend_ref_t[router_nservers]); - - if (backend_ref == NULL) - { - delete client_rses; - return NULL; - } + backend_ref_t* backend_ref = new backend_ref_t[router_nservers]; /** * Initialize backend references with BACKEND ptr. @@ -1998,7 +1810,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess if (!succp || client_rses->closed) { - delete client_rses->rses_backend_ref; + delete[] client_rses->rses_backend_ref; delete client_rses; return NULL; } @@ -2014,7 +1826,14 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess return (MXS_ROUTER_SESSION*)client_rses; } +static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) +{ + MXS_ROUTER_SESSION* rval = NULL; + MXS_EXCEPTION_GUARD((rval = do_newSession(router_inst, session))); + + return rval; +} /** * Close a session with the router, this is the mechanism @@ -2023,7 +1842,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess * @param instance The router instance data * @param session The session being closed */ -static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) +static void do_closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) { SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -2088,7 +1907,12 @@ static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio } } -static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) +static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) +{ + MXS_EXCEPTION_GUARD(do_closeSession(instance, router_session)); +} + +static void do_freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) { SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_client_session; @@ -2107,6 +1931,11 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_ return; } +static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) +{ + MXS_EXCEPTION_GUARD(do_freeSession(router_instance, router_client_session)); +} + /** * The main routing entry, this is called with every packet that is * received and has to be forwarded to the backend database. @@ -2128,7 +1957,7 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_ * an error message is sent to the client. * */ -static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* qbuf) +static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* querybuf) { uint32_t qtype = QUERY_TYPE_UNKNOWN; uint8_t packet_type; @@ -2140,12 +1969,10 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, bool change_successful = false; route_target_t route_target = TARGET_UNDEFINED; bool succp = false; - char* tname = NULL; - char* targetserver = NULL; - GWBUF* querybuf = qbuf; char db[MYSQL_DATABASE_MAXLEN + 1]; char errbuf[26 + MYSQL_DATABASE_MAXLEN]; CHK_CLIENT_RSES(router_cli_ses); + SERVER* target = NULL; ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); @@ -2154,11 +1981,10 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, return 0; } - if (router_cli_ses->init & INIT_UNINT) + if (router_cli_ses->shardmap.empty()) { /* Generate database list */ gen_databaselist(inst, router_cli_ses); - } /** @@ -2215,8 +2041,8 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, if (detect_show_shards(querybuf)) { process_show_shards(router_cli_ses); - ret = 1; - goto retblock; + gwbuf_free(querybuf); + return 1; } switch (packet_type) @@ -2286,11 +2112,9 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) { - spinlock_acquire(&router_cli_ses->shardmap->lock); change_successful = change_current_db(router_cli_ses->current_db, - router_cli_ses->shardmap->hash, + router_cli_ses->shardmap, querybuf); - spinlock_release(&router_cli_ses->shardmap->lock); if (!change_successful) { time_t now = time(NULL); @@ -2298,23 +2122,15 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, difftime(now, router_cli_ses->rses_config.last_refresh) > router_cli_ses->rses_config.refresh_min_interval) { - spinlock_acquire(&router_cli_ses->shardmap->lock); - router_cli_ses->shardmap->state = SHMAP_STALE; - spinlock_release(&router_cli_ses->shardmap->lock); router_cli_ses->rses_config.last_refresh = now; router_cli_ses->queue = querybuf; - int rc_refresh = 1; - if ((router_cli_ses->shardmap = shard_map_alloc())) - { - gen_databaselist(inst, router_cli_ses); - } - else - { - rc_refresh = 0; - } - return rc_refresh; + // Reset the shard map by constructing a new Shard + router_cli_ses->shardmap = Shard(); + gen_databaselist(inst, router_cli_ses); + + return 1; } extract_database(querybuf, db); snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db); @@ -2331,8 +2147,8 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, errbuf); MXS_ERROR("Changing database failed."); - ret = 1; - goto retblock; + gwbuf_free(querybuf); + return 1; } } @@ -2343,7 +2159,9 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, { ret = 1; } - goto retblock; + + gwbuf_free(querybuf); + return ret; } route_target = get_shard_route_target(qtype); @@ -2351,44 +2169,37 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) { route_target = TARGET_UNDEFINED; + target = router_cli_ses->shardmap.get_location(router_cli_ses->current_db); - spinlock_acquire(&router_cli_ses->shardmap->lock); - tname = (char*)hashtable_fetch(router_cli_ses->shardmap->hash, router_cli_ses->current_db); - - - if (tname) + if (target) { MXS_INFO("INIT_DB for database '%s' on server '%s'", - router_cli_ses->current_db, tname); + router_cli_ses->current_db, target->unique_name); route_target = TARGET_NAMED_SERVER; - targetserver = MXS_STRDUP_A(tname); } else { MXS_INFO("INIT_DB with unknown database"); } - spinlock_release(&router_cli_ses->shardmap->lock); } else if (route_target != TARGET_ALL) { /** If no database is found in the query and there is no active database - * or hints in the query we need to route the query to the first available + * or hints in the query we route the query to the first available * server. This isn't ideal for monitoring server status but works if * we just want the server to send an error back. */ - spinlock_acquire(&router_cli_ses->shardmap->lock); - if ((tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype)) != NULL) - { - bool shard_ok = check_server_status(inst->service->dbref, tname); + target = get_shard_target(inst, router_cli_ses, querybuf, qtype); - if (shard_ok) + if (target) + { + if (SERVER_IS_RUNNING(target)) { route_target = TARGET_NAMED_SERVER; - targetserver = MXS_STRDUP_A(tname); } else { - MXS_INFO("Backend server '%s' is not in a viable state", tname); + MXS_INFO("Backend server '%s' is not in a viable state", target->unique_name); /** * Shard is not a viable target right now so we check @@ -2397,15 +2208,13 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, */ } } - spinlock_release(&router_cli_ses->shardmap->lock); } if (TARGET_IS_UNDEFINED(route_target)) { - spinlock_acquire(&router_cli_ses->shardmap->lock); - tname = get_shard_target_name(inst, router_cli_ses, querybuf, qtype); + target = get_shard_target(inst, router_cli_ses, querybuf, qtype); - if ((tname == NULL && + if ((target == NULL && packet_type != MYSQL_COM_INIT_DB && router_cli_ses->current_db[0] == '\0') || packet_type == MYSQL_COM_FIELD_LIST || @@ -2422,10 +2231,6 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, } else { - if (tname) - { - targetserver = MXS_STRDUP_A(tname); - } if (!change_successful) { /** @@ -2441,10 +2246,10 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, /** Something else went wrong, terminate connection */ ret = 0; } - spinlock_release(&router_cli_ses->shardmap->lock); - goto retblock; + + gwbuf_free(querybuf); + return ret; } - spinlock_release(&router_cli_ses->shardmap->lock); } if (TARGET_IS_ALL(route_target)) @@ -2455,7 +2260,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, * Router locking is done inside the function. */ succp = route_session_write(router_cli_ses, - gwbuf_clone(querybuf), + querybuf, inst, packet_type, qtype); @@ -2466,7 +2271,9 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, atomic_add(&inst->stats.n_queries, 1); ret = 1; } - goto retblock; + + gwbuf_free(querybuf); + return ret; } if (TARGET_IS_ANY(route_target)) @@ -2477,7 +2284,7 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, if (SERVER_IS_RUNNING(server)) { route_target = TARGET_NAMED_SERVER; - targetserver = MXS_STRDUP_A(server->unique_name); + target = server; break; } } @@ -2486,8 +2293,8 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, { /**No valid backends alive*/ MXS_ERROR("Failed to route query, no backends are available."); - ret = 0; - goto retblock; + gwbuf_free(querybuf); + return 0; } } @@ -2495,20 +2302,20 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, /** * Query is routed to one of the backends */ - if (TARGET_IS_NAMED_SERVER(route_target) && targetserver != NULL) + if (TARGET_IS_NAMED_SERVER(route_target) && target) { /** * Search backend server by name or replication lag. * If it fails, then try to find valid slave or master. */ - succp = get_shard_dcb(&target_dcb, router_cli_ses, targetserver); + succp = get_shard_dcb(&target_dcb, router_cli_ses, target->unique_name); if (!succp) { MXS_INFO("Was supposed to route to named server " "%s but couldn't find the server in a " - "suitable state.", targetserver); + "suitable state.", target->unique_name); } } @@ -2529,10 +2336,8 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, { ss_dassert((bref->bref_pending_cmd == NULL || router_cli_ses->closed)); - bref->bref_pending_cmd = gwbuf_clone(querybuf); - - ret = 1; - goto retblock; + bref->bref_pending_cmd = querybuf; + return 1; } if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) @@ -2554,13 +2359,20 @@ static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, } } -retblock: - MXS_FREE(targetserver); gwbuf_free(querybuf); return ret; } +static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* qbuf) +{ + int rval = 0; + + MXS_EXCEPTION_GUARD((rval = do_routeQuery(instance, router_session, qbuf))); + + return rval; +} + /** * Diagnostics routine * @@ -2569,7 +2381,7 @@ retblock: * @param instance The router instance * @param dcb The DCB for diagnostic output */ -static void diagnostic(MXS_ROUTER *instance, DCB *dcb) +static void do_diagnostic(MXS_ROUTER *instance, DCB *dcb) { SCHEMAROUTER *router = (SCHEMAROUTER *)instance; int i = 0; @@ -2603,6 +2415,11 @@ static void diagnostic(MXS_ROUTER *instance, DCB *dcb) dcb_printf(dcb, "\n"); } +static void diagnostic(MXS_ROUTER *instance, DCB *dcb) +{ + MXS_EXCEPTION_GUARD(do_diagnostic(instance, dcb)); +} + /** * Client Reply routine * @@ -2613,10 +2430,10 @@ static void diagnostic(MXS_ROUTER *instance, DCB *dcb) * @param backend_dcb The backend DCB * @param queue The GWBUF with reply data */ -static void clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* buffer, - DCB* backend_dcb) +static void do_clientReply(MXS_ROUTER* instance, + MXS_ROUTER_SESSION* router_session, + GWBUF* buffer, + DCB* backend_dcb) { backend_ref_t* bref; GWBUF* writebuf = buffer; @@ -2664,12 +2481,6 @@ static void clientReply(MXS_ROUTER* instance, if (rc == 1) { - spinlock_acquire(&router_cli_ses->shardmap->lock); - - router_cli_ses->shardmap->state = SHMAP_READY; - router_cli_ses->shardmap->last_updated = time(NULL); - spinlock_release(&router_cli_ses->shardmap->lock); - synchronize_shard_map(router_cli_ses); /* @@ -2746,12 +2557,14 @@ static void clientReply(MXS_ROUTER* instance, * needs to be sent to client or NULL. */ if (router_cli_ses->replied_sescmd < router_cli_ses->sent_sescmd && - bref->session_commands.begin()->get_position() == router_cli_ses->replied_sescmd + 1) + bref->session_commands.front().get_position() == router_cli_ses->replied_sescmd + 1) { ++router_cli_ses->replied_sescmd; } else { + /** The reply to this session command has already been sent + * to the client. */ gwbuf_free(writebuf); writebuf = NULL; } @@ -2837,8 +2650,14 @@ static void clientReply(MXS_ROUTER* instance, gwbuf_free(bref->bref_pending_cmd); bref->bref_pending_cmd = NULL; } +} - return; +static void clientReply(MXS_ROUTER* instance, + MXS_ROUTER_SESSION* router_session, + GWBUF* buffer, + DCB* backend_dcb) +{ + MXS_EXCEPTION_GUARD(do_clientReply(instance, router_session, buffer, backend_dcb)); } /** @@ -2856,12 +2675,12 @@ static void clientReply(MXS_ROUTER* instance, * Even if succp == true connecting to new slave may have failed. succp is to * tell whether router has enough master/slave connections to continue work. */ -static void handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errmsgbuf, - DCB* problem_dcb, - mxs_error_action_t action, - bool* succp) +static void do_handleError(MXS_ROUTER* instance, + MXS_ROUTER_SESSION* router_session, + GWBUF* errmsgbuf, + DCB* problem_dcb, + mxs_error_action_t action, + bool* succp) { ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); CHK_DCB(problem_dcb); @@ -2892,6 +2711,17 @@ static void handleError(MXS_ROUTER* instance, dcb_close(problem_dcb); } +static void handleError(MXS_ROUTER* instance, + MXS_ROUTER_SESSION* router_session, + GWBUF* errmsgbuf, + DCB* problem_dcb, + mxs_error_action_t action, + bool* succp) +{ + MXS_EXCEPTION_GUARD(do_handleError(instance, router_session, errmsgbuf, + problem_dcb, action, succp)); +} + /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -2903,8 +2733,6 @@ static void handleError(MXS_ROUTER* instance, MXS_MODULE* MXS_CREATE_MODULE() { MXS_NOTICE("Initializing Schema Sharding Router."); - spinlock_init(&instlock); - instances = NULL; static MXS_ROUTER_OBJECT MyObject = { diff --git a/server/modules/routing/schemarouter/schemarouter.h b/server/modules/routing/schemarouter/schemarouter.h index 4d10c8812..19a26bc53 100644 --- a/server/modules/routing/schemarouter/schemarouter.h +++ b/server/modules/routing/schemarouter/schemarouter.h @@ -28,6 +28,10 @@ #define MXS_MODULE_NAME "schemarouter" #include + +#include +#include + #include #include #include @@ -250,7 +254,7 @@ struct schemarouter_session bool rses_transaction_active; /*< Is a transaction active */ struct schemarouter_instance *router; /*< The router instance */ struct schemarouter_session* next; /*< List of router sessions */ - shard_map_t *shardmap; /*< Database hash containing names of the databases + Shard shardmap; /**< Database hash containing names of the databases * mapped to the servers that contain them */ char connect_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Database the user was trying to connect to */ char current_db[MYSQL_DATABASE_MAXLEN + 1]; /*< Current active database */ @@ -275,7 +279,7 @@ struct schemarouter_session */ typedef struct schemarouter_instance { - HASHTABLE* shard_maps; /*< Shard maps hashed by user name */ + ShardManager shard_manager; /*< Shard maps hashed by user name */ SERVICE* service; /*< Pointer to service */ SCHEMAROUTER_SESSION* connections; /*< List of client connections */ SPINLOCK lock; /*< Lock for the instance data */ @@ -286,9 +290,9 @@ typedef struct schemarouter_instance ROUTER_STATS stats; /*< Statistics for this router */ struct schemarouter_instance* next; /*< Next router on the list */ bool available_slaves; /*< The router has some slaves available */ - HASHTABLE* ignored_dbs; /*< List of databases to ignore when the - * database mapping finds multiple servers - * with the same database */ + std::set ignored_dbs; /*< List of databases to ignore when the + * database mapping finds multiple servers + * with the same database */ pcre2_code* ignore_regex; /*< Databases matching this regex will * not cause the session to be terminated * if they are found on more than one server. */ diff --git a/server/modules/routing/schemarouter/shard_map.cc b/server/modules/routing/schemarouter/shard_map.cc index 0f82f2bb2..f6025997d 100644 --- a/server/modules/routing/schemarouter/shard_map.cc +++ b/server/modules/routing/schemarouter/shard_map.cc @@ -15,109 +15,96 @@ #include -int hashkeyfun(const void* key) +Shard::Shard(): + m_last_updated(time(NULL)) { - if (key == NULL) - { - return 0; - } - int hash = 0, c = 0; - const char* ptr = (const char*)key; - while ((c = *ptr++)) - { - hash = c + (hash << 6) + (hash << 16) - hash; - } - - return hash; } -int hashcmpfun(const void* v1, const void* v2) +Shard::~Shard() { - const char* i1 = (const char*) v1; - const char* i2 = (const char*) v2; - - return strcmp(i1, i2); } -void keyfreefun(void* data) +bool Shard::add_location(string db, SERVER* target) { - MXS_FREE(data); + return m_map.insert(make_pair(db, target)).second; } -shard_map_t* shard_map_alloc() +SERVER* Shard::get_location(string db) { - shard_map_t *rval = (shard_map_t*) MXS_MALLOC(sizeof(shard_map_t)); + SERVER* rval = NULL; + ServerMap::iterator iter = m_map.find(db); - if (rval) + if (iter != m_map.end()) { - if ((rval->hash = hashtable_alloc(SCHEMAROUTER_HASHSIZE, hashkeyfun, hashcmpfun))) - { - HASHCOPYFN kcopy = (HASHCOPYFN)strdup; - hashtable_memory_fns(rval->hash, kcopy, kcopy, keyfreefun, keyfreefun); - spinlock_init(&rval->lock); - rval->last_updated = 0; - rval->state = SHMAP_UNINIT; - } - else - { - MXS_FREE(rval); - rval = NULL; - } + rval = iter->second; } + return rval; } -enum shard_map_state shard_map_update_state(shard_map_t *self, double refresh_min_interval) +bool Shard::stale(double max_interval) const { - spinlock_acquire(&self->lock); - double tdiff = difftime(time(NULL), self->last_updated); - if (tdiff > refresh_min_interval) - { - self->state = SHMAP_STALE; - } - enum shard_map_state state = self->state; - spinlock_release(&self->lock); - return state; + time_t now = time(NULL); + + return difftime(now, m_last_updated) > max_interval; } -void replace_shard_map(shard_map_t **target, shard_map_t **source) +bool Shard::empty() const { - shard_map_t *tgt = *target; - shard_map_t *src = *source; - tgt->last_updated = src->last_updated; - tgt->state = src->state; - hashtable_free(tgt->hash); - tgt->hash = src->hash; - MXS_FREE(src); - *source = NULL; + return m_map.size() == 0; } -shard_map_t* get_latest_shard_map(shard_map_t *stored, shard_map_t *current) +void Shard::get_content(ServerMap& dest) { - shard_map_t *map = stored; - - spinlock_acquire(&map->lock); - - if (map->state == SHMAP_STALE) + for (ServerMap::iterator it = m_map.begin(); it != m_map.end(); it++) { - replace_shard_map(&map, ¤t); + dest.insert(*it); } - else if (map->state != SHMAP_READY) +} + +bool Shard::newer_than(const Shard& shard) const +{ + return m_last_updated > shard.m_last_updated; +} + +ShardManager::ShardManager() +{ + spinlock_init(&m_lock); +} + +ShardManager::~ShardManager() +{ +} + +Shard ShardManager::get_shard(string user, double max_interval) +{ + SpinLockGuard guard(m_lock); + + ShardMap::iterator iter = m_maps.find(user); + + if (iter == m_maps.end() || iter->second.stale(max_interval)) { - MXS_WARNING("Shard map state is not ready but" - "it is in use. Replacing it with a newer one."); - replace_shard_map(&map, ¤t); - } - else - { - /** - * Another thread has already updated the shard map for this user - */ - hashtable_free(current->hash); - MXS_FREE(current); + // No previous shard or a stale shard, construct a new one + + if (iter != m_maps.end()) + { + m_maps.erase(iter); + } + + return Shard(); } - spinlock_release(&map->lock); + // Found valid shard + return iter->second; +} - return map; -} \ No newline at end of file +void ShardManager::update_shard(Shard& shard, string user) +{ + SpinLockGuard guard(m_lock); + ShardMap::iterator iter = m_maps.find(user); + + if (iter == m_maps.end() || shard.newer_than(iter->second)) + { + m_maps[user] = shard; + } +} diff --git a/server/modules/routing/schemarouter/shard_map.hh b/server/modules/routing/schemarouter/shard_map.hh index e4fffee91..b4705d7b1 100644 --- a/server/modules/routing/schemarouter/shard_map.hh +++ b/server/modules/routing/schemarouter/shard_map.hh @@ -15,65 +15,114 @@ #include +#include +#include +#include + #include #include #include -enum shard_map_state +using namespace maxscale; +using std::map; +using std::string; + +/** This contains the database to server mapping */ +typedef map ServerMap; + +class Shard { - SHMAP_UNINIT, /*< No databases have been added to this shard map */ - SHMAP_READY, /*< All available databases have been added */ - SHMAP_STALE /*< The shard map has old data or has not been updated recently */ +public: + Shard(); + ~Shard(); + + /** + * @brief Add a database location + * + * @param db Database to add + * @param target Target where database is located + * + * @return True if location was added + */ + bool add_location(string db, SERVER* target); + + /** + * @brief Retrieve the location of a database + * + * @param db Database to locate + * + * @return The database or NULL if no server contains the database + */ + SERVER* get_location(string db); + + /** + * @brief Check if shard contains stale information + * + * @param max_interval The maximum lifetime of the shard + * + * @return True if the shard is stale + */ + bool stale(double max_interval) const; + + /** + * @brief Check if shard is empty + * + * @return True if shard contains no locations + */ + bool empty() const; + + /** + * @brief Retrieve all database to server mappings + * + * @param keys A map where the database to server mappings are added + */ + void get_content(ServerMap& dest); + + /** + * @brief Check if this shard is newer than the other shard + * + * @param shard The other shard to check + * + * @return True if this shard is newer + */ + bool newer_than(const Shard& shard) const; + +private: + ServerMap m_map; + time_t m_last_updated; }; -/** - * A map of the shards tied to a single user. - */ -typedef struct shard_map +typedef map ShardMap; + +class ShardManager { - HASHTABLE *hash; /*< A hashtable of database names and the servers which - * have these databases. */ - SPINLOCK lock; - time_t last_updated; - enum shard_map_state state; /*< State of the shard map */ -} shard_map_t; +public: + ShardManager(); + ~ShardManager(); -/** TODO: Replace these */ -int hashkeyfun(const void* key); -int hashcmpfun(const void *, const void *); -void keyfreefun(void* data); + /** + * @brief Retrieve or create a shard + * + * @param user User whose shard to retrieve + * @param max_lifetime The maximum lifetime of a shard + * + * @return The latest version of the shard or a newly created shard if no + * old version is available + */ + Shard get_shard(string user, double max_lifetime); -/** TODO: Don't use this everywhere */ -/** Size of the hashtable used to store ignored databases */ -#define SCHEMAROUTER_HASHSIZE 100 + /** + * @brief Update the shard information + * + * The shard information is updated if the new shard contains more up to date + * information than the one stored in the shard manager. + * + * @param shard New version of the shard + * @param user The user whose shard this is + */ + void update_shard(Shard& shard, string user); -/** - * Allocate a shard map and initialize it. - * @return Pointer to new shard_map_t or NULL if memory allocation failed - */ -shard_map_t* shard_map_alloc(); - -/** - * Check if the shard map is out of date and update its state if necessary. - * @param router Router instance - * @param map Shard map to update - * @return Current state of the shard map - */ -enum shard_map_state shard_map_update_state(shard_map_t *self, double refresh_min_interval); - -/** - * Replace a shard map with another one. This function copies the contents of - * the source shard map to the target and frees the source memory. - * @param target Target shard map to replace - * @param source Source shard map to use - */ -void replace_shard_map(shard_map_t **target, shard_map_t **source); - -/** - * Return the newer of two shard maps - * - * @param stored The currently stored shard map - * @param current The replacement map the current client is using - * @return The newer of the two shard maps - */ -shard_map_t* get_latest_shard_map(shard_map_t *stored, shard_map_t *current); +private: + SPINLOCK m_lock; + ShardMap m_maps; +};