diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index 0d1cecda4..940b662f2 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -1255,9 +1255,6 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data) */ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bref, GWBUF** buffer) { - unsigned char* ptr; - SERVER* target = bref->backend()->server; - GWBUF* buf; bool duplicate_found = false; enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE; @@ -1268,7 +1265,8 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre /** TODO: Don't make the buffer contiguous but process it as a buffer chain */ *buffer = gwbuf_make_contiguous(*buffer); - buf = modutil_get_complete_packets(buffer); + MXS_ABORT_IF_NULL(*buffer); + GWBUF* buf = modutil_get_complete_packets(buffer); if (buf == NULL) { @@ -1276,11 +1274,11 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre } int n_eof = 0; - ptr = (unsigned char*) buf->start; + uint8_t* ptr = (uint8_t*) buf->start; if (PTR_IS_ERR(ptr)) { - MXS_INFO("SHOW DATABASES returned an error."); + MXS_INFO("Mapping query returned an error."); gwbuf_free(buf); return SHOWDB_FATAL_ERROR; } @@ -1288,14 +1286,14 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre if (n_eof == 0) { /** Skip column definitions */ - while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) + while (ptr < (uint8_t*) buf->end && !PTR_IS_EOF(ptr)) { ptr += gw_mysql_get_byte3(ptr) + 4; } - if (ptr >= (unsigned char*) buf->end) + if (ptr >= (uint8_t*) buf->end) { - MXS_INFO("Malformed packet for SHOW DATABASES."); + MXS_INFO("Malformed packet for mapping query."); *buffer = gwbuf_append(buf, *buffer); return SHOWDB_FATAL_ERROR; } @@ -1305,11 +1303,12 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre ptr += gw_mysql_get_byte3(ptr) + 4; } - while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) + while (ptr < (uint8_t*) buf->end && !PTR_IS_EOF(ptr)) { int payloadlen = gw_mysql_get_byte3(ptr); int packetlen = payloadlen + 4; char* data = get_lenenc_str(ptr + 4); + SERVER* target = bref->backend()->server; if (data) { @@ -1368,112 +1367,6 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre return rval; } -enum showdb_response SchemaRouterSession::parse_table_mapping_response(SSRBackend& bref, GWBUF** buffer) -{ - unsigned char* ptr; - SERVER* target = bref->backend()->server; - GWBUF* buf; - bool duplicate_found = false; - enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE; - if (buffer == NULL || *buffer == NULL) - { - return SHOWDB_FATAL_ERROR; - } - - /** TODO: Don't make the buffer contiguous but process it as a buffer chain */ - *buffer = gwbuf_make_contiguous(*buffer); - buf = modutil_get_complete_packets(buffer); - - if (buf == NULL) - { - return SHOWDB_PARTIAL_RESPONSE; - } - int n_eof = 0; - - ptr = (unsigned char*) buf->start; - - if (PTR_IS_ERR(ptr)) - { - MXS_INFO("Selecting tables returned an error."); - gwbuf_free(buf); - return SHOWDB_FATAL_ERROR; - } - - if (n_eof == 0) - { - /** Skip column definitions */ - while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) - { - ptr += gw_mysql_get_byte3(ptr) + 4; - } - - if (ptr >= (unsigned char*) buf->end) - { - MXS_INFO("Malformed packet for select tables query."); - *buffer = gwbuf_append(buf, *buffer); - return SHOWDB_FATAL_ERROR; - } - - n_eof++; - /** Skip first EOF packet */ - ptr += gw_mysql_get_byte3(ptr) + 4; - } - - while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr)) - { - int payloadlen = gw_mysql_get_byte3(ptr); - int packetlen = payloadlen + 4; - char* data = get_lenenc_str(ptr + 4); - MXS_INFO("<%s, %s>", target->name, data); - - if (data) - { - if (m_shard.add_table_location(data, target)) - { - MXS_INFO("<%s, %s>", target->name, data); - } - else - { - duplicate_found = true; - SERVER *duplicate = m_shard.get_table_location(data); - - MXS_ERROR("Table '%s' found on servers '%s' and '%s' for user %s@%s.", - data, target->name, duplicate->name, - m_client->user, m_client->remote); - - } - MXS_FREE(data); - } - ptr += packetlen; - } - - if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && n_eof == 1) - { - n_eof++; - MXS_INFO("Select tables query fully received from %s.", - bref->backend()->server->name); - } - else - { - MXS_INFO("Select tables query partially received from %s.", - bref->backend()->server->name); - } - - gwbuf_free(buf); - - if (duplicate_found) - { - rval = SHOWDB_DUPLICATE_DATABASES; - } - else if (n_eof == 2) - { - rval = SHOWDB_FULL_RESPONSE; - } - - return rval; -} - - /** * Initiate the generation of the database hash table by sending a * SHOW DATABASES query to each valid backend server. This sets the session @@ -1494,7 +1387,8 @@ void SchemaRouterSession::query_databases() m_state |= INIT_MAPPING; m_state &= ~INIT_UNINT; - GWBUF *buffer = modutil_create_query("SHOW DATABASES"); + GWBUF *buffer = modutil_create_query + ("SELECT LOWER(CONCAT(table_schema, '.', table_name)) FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql');"); gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT); for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) @@ -1507,40 +1401,7 @@ void SchemaRouterSession::query_databases() if (!(*it)->write(clone)) { - MXS_ERROR("Failed to write SHOW DATABASES to '%s'", - (*it)->backend()->server->name); - } - } - } - gwbuf_free(buffer); -} - -void SchemaRouterSession::query_tables() -{ - - for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) - { - (*it)->set_mapped(false); - } - - m_state |= INIT_MAPPING; - m_state &= ~INIT_UNINT; - - GWBUF *buffer = modutil_create_query - ("SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql');"); - gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT); - - for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++) - { - if ((*it)->in_use() && !(*it)->is_closed() & - SERVER_IS_RUNNING((*it)->backend()->server)) - { - GWBUF* clone = gwbuf_clone(buffer); - MXS_ABORT_IF_NULL(clone); - - if (!(*it)->write(clone)) - { - MXS_ERROR("Failed to select tables from '%s'", + MXS_ERROR("Failed to write mapping query to '%s'", (*it)->backend()->server->name); } } @@ -1562,7 +1423,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY) { - bool uses_current_database = false; int n_tables = 0; char** tables = qc_get_table_names(buffer, &n_tables, true); @@ -1570,54 +1430,50 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) { if (strchr(tables[i], '.') == NULL) { - uses_current_database = true; + rval = m_shard.get_location(m_current_db); + break; } - - MXS_FREE(tables[i]); - } - - MXS_FREE(tables); - - if (uses_current_database) - { - MXS_INFO("Query uses current database"); - return m_shard.get_location(m_current_db); } int n_databases = 0; char** databases = qc_get_database_names(buffer, &n_databases); - for (int i = 0; i < n_databases; i++) { - if (strcasecmp(databases[i], "information_schema") == 0 && rval == NULL) + for (int j = 0; j < n_tables; j++) { - has_dbs = false; - } - else - { - SERVER* target = m_shard.get_location(databases[i]); - + SERVER* target = m_shard.get_location(tables[j]); if (target) { + if (rval && target != rval) { - MXS_ERROR("Query targets databases on servers '%s' and '%s'. " - "Cross database queries across servers are not supported.", + MXS_ERROR("Query targets tables on servers '%s' and '%s'. " + "Cross server queries are not supported.", rval->name, target->name); } else if (rval == NULL) { rval = target; has_dbs = true; - MXS_INFO("Query targets database '%s' on server '%s'", - databases[i], rval->name); + MXS_INFO("Query targets table '%s' on server '%s'", + tables[j], rval->name); } } } + if (strcasecmp(databases[i], "information_schema") == 0 && rval == NULL) + { + has_dbs = false; + } + MXS_FREE(databases[i]); } + for (int i = 0; i < n_tables; i++) + { + MXS_FREE(tables[i]); + } + MXS_FREE(tables); MXS_FREE(databases); } @@ -1635,30 +1491,10 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) if (tok) { - rval = m_shard.get_location(tok); - - if (rval) - { - MXS_INFO("SHOW TABLES with specific database '%s' on server '%s'", tok, tmp); - } + send_tables(tok); } } MXS_FREE(query); - - if (rval == NULL) - { - rval = m_shard.get_location(m_current_db); - - if (rval) - { - MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'", - m_current_db.c_str(), rval->name); - } - } - else - { - has_dbs = true; - } } else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { @@ -1680,7 +1516,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) * If the target name has not been found and the session has an * active database, set is as the target */ - rval = m_shard.get_location(m_current_db); if (rval) @@ -1689,7 +1524,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) m_current_db.c_str(), rval->name); } } - return rval; } @@ -1781,11 +1615,11 @@ enum route_target get_shard_route_target(uint32_t qtype) RESULT_ROW *result_set_cb(struct resultset * rset, void *data) { RESULT_ROW *row = resultset_make_row(rset); - ServerMap* arr = (ServerMap*) data; + std::list* arr = (std::list*) data; if (row) { - if (arr->size() > 0 && resultset_row_set(row, 0, arr->begin()->first.c_str())) + if (arr->size() > 0 && resultset_row_set(row, 0, arr->begin()->c_str())) { arr->erase(arr->begin()); } @@ -1810,11 +1644,18 @@ RESULT_ROW *result_set_cb(struct resultset * rset, void *data) bool SchemaRouterSession::send_databases() { bool rval = false; - ServerMap dblist; + std::list list; m_shard.get_content(dblist); - - RESULTSET* resultset = resultset_create(result_set_cb, &dblist); + for (ServerMap::iterator it = dblist.begin(); it != dblist.end(); it++) + { + std::string db = it->first.substr(0, it->first.find(".")); + if (std::find(list.begin(), list.end(), db) == list.end()) + { + list.push_back(db); + } + } + RESULTSET* resultset = resultset_create(result_set_cb, &list); if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR)) @@ -1827,4 +1668,33 @@ bool SchemaRouterSession::send_databases() return rval; } +bool SchemaRouterSession::send_tables(std::string database) +{ + bool rval = false; + ServerMap tablelist; + std::list list; + m_shard.get_content(tablelist); + + for (ServerMap::iterator it = tablelist.begin(); it != tablelist.end(); it++) + { + std::string db = it->first.substr(0, it->first.find(".")); + if (db.compare(database) == 0) + { + std::string table = it->first.substr(it->first.find(".")+1); + list.push_back(table); + } + } + RESULTSET* resultset = resultset_create(result_set_cb, &list); + + if (resultset_add_column(resultset, "Table", MYSQL_DATABASE_MAXLEN, + COL_TYPE_VARCHAR)) + { + resultset_stream_mysql(resultset, m_client); + rval = true; + } + resultset_free(resultset); + + return rval; +} + } \ No newline at end of file diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index b63b13033..deb9f2057 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -140,6 +140,7 @@ private: /** Shard mapping functions */ bool send_databases(); bool send_shards(); + bool send_tables(std::string db); void query_databases(); int inspect_mapping_states(SSRBackend& bref, GWBUF** wbuf); enum showdb_response parse_mapping_response(SSRBackend& bref, GWBUF** buffer); @@ -147,13 +148,6 @@ private: void synchronize_shards(); void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket); - /** Table mapping functions */ - void query_tables(); - bool send_tables(); - enum showdb_response parse_table_mapping_response(SSRBackend& bref, GWBUF** buffer); - - - /** Member variables */ bool m_closed; /**< True if session closed */ DCB* m_client; /**< The client DCB */ diff --git a/server/modules/routing/schemarouter/shard_map.cc b/server/modules/routing/schemarouter/shard_map.cc index a9fd68212..4ef8561a9 100644 --- a/server/modules/routing/schemarouter/shard_map.cc +++ b/server/modules/routing/schemarouter/shard_map.cc @@ -32,50 +32,49 @@ bool Shard::add_location(std::string db, SERVER* target) return m_map.insert(std::make_pair(db, target)).second; } -bool Shard::add_table_location(std::string table, SERVER* target) -{ - std::transform(table.begin(), table.end(), table.begin(), ::tolower); - return m_table_map.insert(std::make_pair(table, target)).second; -} - void Shard::replace_location(std::string db, SERVER* target) { std::transform(db.begin(), db.end(), db.begin(), ::tolower); m_map[db] = target; } -SERVER* Shard::get_location(std::string db) -{ - SERVER* rval = NULL; - std::transform(db.begin(), db.end(), db.begin(), ::tolower); - ServerMap::iterator iter = m_map.find(db); - - if (iter != m_map.end()) - { - rval = iter->second; - } - - return rval; -} - -SERVER* Shard::get_table_location(std::string table) +SERVER* Shard::get_location(std::string table) { SERVER* rval = NULL; std::transform(table.begin(), table.end(), table.begin(), ::tolower); - std::size_t pos = table.find("."); - if(pos != std::string::npos) + + if (table.find(".") == std::string::npos) { - table = table.substr(pos + 1); + for (ServerMap::iterator it = m_map.begin(); it != m_map.end(); it++) + { + std::string db = it->first.substr(0, it->first.find(".")); + if (db.compare(table) == 0) + { + if ((rval && rval != it->second)) + { + MXS_DEBUG("There are 2 databases with same name on a different servers: '%s' and '%s'. Connecting to '%s'" + , rval->name,it->second->name, rval->name); + break; + } + else + { + rval = it->second; + } + } + } } - ServerMap::iterator iter = m_table_map.find(table); - if (iter != m_table_map.end()) + else { - rval = iter->second; + ServerMap::iterator iter = m_map.find(table); + + if (iter != m_map.end()) + { + rval = iter->second; + } } return rval; } - bool Shard::stale(double max_interval) const { time_t now = time(NULL); @@ -88,11 +87,6 @@ bool Shard::empty() const return m_map.size() == 0; } -bool Shard::tables_empty() const -{ - return m_table_map.size() == 0; -} - void Shard::get_content(ServerMap& dest) { for (ServerMap::iterator it = m_map.begin(); it != m_map.end(); it++) diff --git a/server/modules/routing/schemarouter/shard_map.hh b/server/modules/routing/schemarouter/shard_map.hh index 15af662d2..be30744f4 100644 --- a/server/modules/routing/schemarouter/shard_map.hh +++ b/server/modules/routing/schemarouter/shard_map.hh @@ -42,16 +42,6 @@ public: */ bool add_location(std::string db, SERVER* target); - /** - * @brief Add a table location - * - * @param table table to add - * @param target Target where database is located - * - * @return True if location was added - */ - bool add_table_location(std::string table, SERVER* target); - /** * @brief Retrieve the location of a database * @@ -61,16 +51,6 @@ public: */ SERVER* get_location(std::string db); - /** - * @brief Retrieve the location of a table - * - * @param table table to locate - * - * @return The server or NULL if no server contains the database - */ - SERVER* get_table_location(std::string table); - - /** * @brief Change the location of a database * @@ -95,8 +75,6 @@ public: */ bool empty() const; - bool tables_empty() const; - /** * @brief Retrieve all database to server mappings * @@ -115,7 +93,6 @@ public: private: ServerMap m_map; - ServerMap m_table_map; time_t m_last_updated; };