MXS-1849 Combine table and database mapping
Previously schemarouter only mapped databases to the servers they were resided on. Now all the tables are also mapped to allow the router to route queries to the right server based on the tables used in that query.
This commit is contained in:
@ -1255,9 +1255,6 @@ bool SchemaRouterSession::ignore_duplicate_database(const char* data)
|
||||
*/
|
||||
enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bref, GWBUF** buffer)
|
||||
{
|
||||
unsigned char* ptr;
|
||||
SERVER* target = bref->backend()->server;
|
||||
GWBUF* buf;
|
||||
bool duplicate_found = false;
|
||||
enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE;
|
||||
|
||||
@ -1268,7 +1265,8 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
|
||||
/** TODO: Don't make the buffer contiguous but process it as a buffer chain */
|
||||
*buffer = gwbuf_make_contiguous(*buffer);
|
||||
buf = modutil_get_complete_packets(buffer);
|
||||
MXS_ABORT_IF_NULL(*buffer);
|
||||
GWBUF* buf = modutil_get_complete_packets(buffer);
|
||||
|
||||
if (buf == NULL)
|
||||
{
|
||||
@ -1276,11 +1274,11 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
}
|
||||
int n_eof = 0;
|
||||
|
||||
ptr = (unsigned char*) buf->start;
|
||||
uint8_t* ptr = (uint8_t*) buf->start;
|
||||
|
||||
if (PTR_IS_ERR(ptr))
|
||||
{
|
||||
MXS_INFO("SHOW DATABASES returned an error.");
|
||||
MXS_INFO("Mapping query returned an error.");
|
||||
gwbuf_free(buf);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
@ -1288,14 +1286,14 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
if (n_eof == 0)
|
||||
{
|
||||
/** Skip column definitions */
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
while (ptr < (uint8_t*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
if (ptr >= (unsigned char*) buf->end)
|
||||
if (ptr >= (uint8_t*) buf->end)
|
||||
{
|
||||
MXS_INFO("Malformed packet for SHOW DATABASES.");
|
||||
MXS_INFO("Malformed packet for mapping query.");
|
||||
*buffer = gwbuf_append(buf, *buffer);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
@ -1305,11 +1303,12 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
while (ptr < (uint8_t*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||
int packetlen = payloadlen + 4;
|
||||
char* data = get_lenenc_str(ptr + 4);
|
||||
SERVER* target = bref->backend()->server;
|
||||
|
||||
if (data)
|
||||
{
|
||||
@ -1368,112 +1367,6 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
return rval;
|
||||
}
|
||||
|
||||
enum showdb_response SchemaRouterSession::parse_table_mapping_response(SSRBackend& bref, GWBUF** buffer)
|
||||
{
|
||||
unsigned char* ptr;
|
||||
SERVER* target = bref->backend()->server;
|
||||
GWBUF* buf;
|
||||
bool duplicate_found = false;
|
||||
enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE;
|
||||
if (buffer == NULL || *buffer == NULL)
|
||||
{
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
/** TODO: Don't make the buffer contiguous but process it as a buffer chain */
|
||||
*buffer = gwbuf_make_contiguous(*buffer);
|
||||
buf = modutil_get_complete_packets(buffer);
|
||||
|
||||
if (buf == NULL)
|
||||
{
|
||||
return SHOWDB_PARTIAL_RESPONSE;
|
||||
}
|
||||
int n_eof = 0;
|
||||
|
||||
ptr = (unsigned char*) buf->start;
|
||||
|
||||
if (PTR_IS_ERR(ptr))
|
||||
{
|
||||
MXS_INFO("Selecting tables returned an error.");
|
||||
gwbuf_free(buf);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
if (n_eof == 0)
|
||||
{
|
||||
/** Skip column definitions */
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
if (ptr >= (unsigned char*) buf->end)
|
||||
{
|
||||
MXS_INFO("Malformed packet for select tables query.");
|
||||
*buffer = gwbuf_append(buf, *buffer);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
n_eof++;
|
||||
/** Skip first EOF packet */
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||
int packetlen = payloadlen + 4;
|
||||
char* data = get_lenenc_str(ptr + 4);
|
||||
MXS_INFO("<%s, %s>", target->name, data);
|
||||
|
||||
if (data)
|
||||
{
|
||||
if (m_shard.add_table_location(data, target))
|
||||
{
|
||||
MXS_INFO("<%s, %s>", target->name, data);
|
||||
}
|
||||
else
|
||||
{
|
||||
duplicate_found = true;
|
||||
SERVER *duplicate = m_shard.get_table_location(data);
|
||||
|
||||
MXS_ERROR("Table '%s' found on servers '%s' and '%s' for user %s@%s.",
|
||||
data, target->name, duplicate->name,
|
||||
m_client->user, m_client->remote);
|
||||
|
||||
}
|
||||
MXS_FREE(data);
|
||||
}
|
||||
ptr += packetlen;
|
||||
}
|
||||
|
||||
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && n_eof == 1)
|
||||
{
|
||||
n_eof++;
|
||||
MXS_INFO("Select tables query fully received from %s.",
|
||||
bref->backend()->server->name);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Select tables query partially received from %s.",
|
||||
bref->backend()->server->name);
|
||||
}
|
||||
|
||||
gwbuf_free(buf);
|
||||
|
||||
if (duplicate_found)
|
||||
{
|
||||
rval = SHOWDB_DUPLICATE_DATABASES;
|
||||
}
|
||||
else if (n_eof == 2)
|
||||
{
|
||||
rval = SHOWDB_FULL_RESPONSE;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initiate the generation of the database hash table by sending a
|
||||
* SHOW DATABASES query to each valid backend server. This sets the session
|
||||
@ -1494,7 +1387,8 @@ void SchemaRouterSession::query_databases()
|
||||
m_state |= INIT_MAPPING;
|
||||
m_state &= ~INIT_UNINT;
|
||||
|
||||
GWBUF *buffer = modutil_create_query("SHOW DATABASES");
|
||||
GWBUF *buffer = modutil_create_query
|
||||
("SELECT LOWER(CONCAT(table_schema, '.', table_name)) FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql');");
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
|
||||
|
||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
@ -1507,40 +1401,7 @@ void SchemaRouterSession::query_databases()
|
||||
|
||||
if (!(*it)->write(clone))
|
||||
{
|
||||
MXS_ERROR("Failed to write SHOW DATABASES to '%s'",
|
||||
(*it)->backend()->server->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
gwbuf_free(buffer);
|
||||
}
|
||||
|
||||
void SchemaRouterSession::query_tables()
|
||||
{
|
||||
|
||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
{
|
||||
(*it)->set_mapped(false);
|
||||
}
|
||||
|
||||
m_state |= INIT_MAPPING;
|
||||
m_state &= ~INIT_UNINT;
|
||||
|
||||
GWBUF *buffer = modutil_create_query
|
||||
("SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql');");
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
|
||||
|
||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
{
|
||||
if ((*it)->in_use() && !(*it)->is_closed() &
|
||||
SERVER_IS_RUNNING((*it)->backend()->server))
|
||||
{
|
||||
GWBUF* clone = gwbuf_clone(buffer);
|
||||
MXS_ABORT_IF_NULL(clone);
|
||||
|
||||
if (!(*it)->write(clone))
|
||||
{
|
||||
MXS_ERROR("Failed to select tables from '%s'",
|
||||
MXS_ERROR("Failed to write mapping query to '%s'",
|
||||
(*it)->backend()->server->name);
|
||||
}
|
||||
}
|
||||
@ -1562,7 +1423,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
|
||||
if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY)
|
||||
{
|
||||
bool uses_current_database = false;
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(buffer, &n_tables, true);
|
||||
|
||||
@ -1570,54 +1430,50 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
{
|
||||
if (strchr(tables[i], '.') == NULL)
|
||||
{
|
||||
uses_current_database = true;
|
||||
rval = m_shard.get_location(m_current_db);
|
||||
break;
|
||||
}
|
||||
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
|
||||
MXS_FREE(tables);
|
||||
|
||||
if (uses_current_database)
|
||||
{
|
||||
MXS_INFO("Query uses current database");
|
||||
return m_shard.get_location(m_current_db);
|
||||
}
|
||||
|
||||
int n_databases = 0;
|
||||
char** databases = qc_get_database_names(buffer, &n_databases);
|
||||
|
||||
for (int i = 0; i < n_databases; i++)
|
||||
{
|
||||
if (strcasecmp(databases[i], "information_schema") == 0 && rval == NULL)
|
||||
for (int j = 0; j < n_tables; j++)
|
||||
{
|
||||
has_dbs = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
SERVER* target = m_shard.get_location(databases[i]);
|
||||
|
||||
SERVER* target = m_shard.get_location(tables[j]);
|
||||
if (target)
|
||||
{
|
||||
|
||||
if (rval && target != rval)
|
||||
{
|
||||
MXS_ERROR("Query targets databases on servers '%s' and '%s'. "
|
||||
"Cross database queries across servers are not supported.",
|
||||
MXS_ERROR("Query targets tables on servers '%s' and '%s'. "
|
||||
"Cross server queries are not supported.",
|
||||
rval->name, target->name);
|
||||
}
|
||||
else if (rval == NULL)
|
||||
{
|
||||
rval = target;
|
||||
has_dbs = true;
|
||||
MXS_INFO("Query targets database '%s' on server '%s'",
|
||||
databases[i], rval->name);
|
||||
MXS_INFO("Query targets table '%s' on server '%s'",
|
||||
tables[j], rval->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (strcasecmp(databases[i], "information_schema") == 0 && rval == NULL)
|
||||
{
|
||||
has_dbs = false;
|
||||
}
|
||||
|
||||
MXS_FREE(databases[i]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
MXS_FREE(tables);
|
||||
MXS_FREE(databases);
|
||||
}
|
||||
|
||||
@ -1635,30 +1491,10 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
|
||||
if (tok)
|
||||
{
|
||||
rval = m_shard.get_location(tok);
|
||||
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("SHOW TABLES with specific database '%s' on server '%s'", tok, tmp);
|
||||
}
|
||||
send_tables(tok);
|
||||
}
|
||||
}
|
||||
MXS_FREE(query);
|
||||
|
||||
if (rval == NULL)
|
||||
{
|
||||
rval = m_shard.get_location(m_current_db);
|
||||
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'",
|
||||
m_current_db.c_str(), rval->name);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
has_dbs = true;
|
||||
}
|
||||
}
|
||||
else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||
{
|
||||
@ -1680,7 +1516,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
* If the target name has not been found and the session has an
|
||||
* active database, set is as the target
|
||||
*/
|
||||
|
||||
rval = m_shard.get_location(m_current_db);
|
||||
|
||||
if (rval)
|
||||
@ -1689,7 +1524,6 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
m_current_db.c_str(), rval->name);
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
@ -1781,11 +1615,11 @@ enum route_target get_shard_route_target(uint32_t qtype)
|
||||
RESULT_ROW *result_set_cb(struct resultset * rset, void *data)
|
||||
{
|
||||
RESULT_ROW *row = resultset_make_row(rset);
|
||||
ServerMap* arr = (ServerMap*) data;
|
||||
std::list<std::string>* arr = (std::list<std::string>*) data;
|
||||
|
||||
if (row)
|
||||
{
|
||||
if (arr->size() > 0 && resultset_row_set(row, 0, arr->begin()->first.c_str()))
|
||||
if (arr->size() > 0 && resultset_row_set(row, 0, arr->begin()->c_str()))
|
||||
{
|
||||
arr->erase(arr->begin());
|
||||
}
|
||||
@ -1810,11 +1644,18 @@ RESULT_ROW *result_set_cb(struct resultset * rset, void *data)
|
||||
bool SchemaRouterSession::send_databases()
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
ServerMap dblist;
|
||||
std::list<std::string> list;
|
||||
m_shard.get_content(dblist);
|
||||
|
||||
RESULTSET* resultset = resultset_create(result_set_cb, &dblist);
|
||||
for (ServerMap::iterator it = dblist.begin(); it != dblist.end(); it++)
|
||||
{
|
||||
std::string db = it->first.substr(0, it->first.find("."));
|
||||
if (std::find(list.begin(), list.end(), db) == list.end())
|
||||
{
|
||||
list.push_back(db);
|
||||
}
|
||||
}
|
||||
RESULTSET* resultset = resultset_create(result_set_cb, &list);
|
||||
|
||||
if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN,
|
||||
COL_TYPE_VARCHAR))
|
||||
@ -1827,4 +1668,33 @@ bool SchemaRouterSession::send_databases()
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool SchemaRouterSession::send_tables(std::string database)
|
||||
{
|
||||
bool rval = false;
|
||||
ServerMap tablelist;
|
||||
std::list<std::string> list;
|
||||
m_shard.get_content(tablelist);
|
||||
|
||||
for (ServerMap::iterator it = tablelist.begin(); it != tablelist.end(); it++)
|
||||
{
|
||||
std::string db = it->first.substr(0, it->first.find("."));
|
||||
if (db.compare(database) == 0)
|
||||
{
|
||||
std::string table = it->first.substr(it->first.find(".")+1);
|
||||
list.push_back(table);
|
||||
}
|
||||
}
|
||||
RESULTSET* resultset = resultset_create(result_set_cb, &list);
|
||||
|
||||
if (resultset_add_column(resultset, "Table", MYSQL_DATABASE_MAXLEN,
|
||||
COL_TYPE_VARCHAR))
|
||||
{
|
||||
resultset_stream_mysql(resultset, m_client);
|
||||
rval = true;
|
||||
}
|
||||
resultset_free(resultset);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user