MXS-1849 Add functions for mapping tables to servers
This commit is contained in:
@ -1368,6 +1368,112 @@ enum showdb_response SchemaRouterSession::parse_mapping_response(SSRBackend& bre
|
||||
return rval;
|
||||
}
|
||||
|
||||
enum showdb_response SchemaRouterSession::parse_table_mapping_response(SSRBackend& bref, GWBUF** buffer)
|
||||
{
|
||||
unsigned char* ptr;
|
||||
SERVER* target = bref->backend()->server;
|
||||
GWBUF* buf;
|
||||
bool duplicate_found = false;
|
||||
enum showdb_response rval = SHOWDB_PARTIAL_RESPONSE;
|
||||
if (buffer == NULL || *buffer == NULL)
|
||||
{
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
/** TODO: Don't make the buffer contiguous but process it as a buffer chain */
|
||||
*buffer = gwbuf_make_contiguous(*buffer);
|
||||
buf = modutil_get_complete_packets(buffer);
|
||||
|
||||
if (buf == NULL)
|
||||
{
|
||||
return SHOWDB_PARTIAL_RESPONSE;
|
||||
}
|
||||
int n_eof = 0;
|
||||
|
||||
ptr = (unsigned char*) buf->start;
|
||||
|
||||
if (PTR_IS_ERR(ptr))
|
||||
{
|
||||
MXS_INFO("Selecting tables returned an error.");
|
||||
gwbuf_free(buf);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
if (n_eof == 0)
|
||||
{
|
||||
/** Skip column definitions */
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
if (ptr >= (unsigned char*) buf->end)
|
||||
{
|
||||
MXS_INFO("Malformed packet for select tables query.");
|
||||
*buffer = gwbuf_append(buf, *buffer);
|
||||
return SHOWDB_FATAL_ERROR;
|
||||
}
|
||||
|
||||
n_eof++;
|
||||
/** Skip first EOF packet */
|
||||
ptr += gw_mysql_get_byte3(ptr) + 4;
|
||||
}
|
||||
|
||||
while (ptr < (unsigned char*) buf->end && !PTR_IS_EOF(ptr))
|
||||
{
|
||||
int payloadlen = gw_mysql_get_byte3(ptr);
|
||||
int packetlen = payloadlen + 4;
|
||||
char* data = get_lenenc_str(ptr + 4);
|
||||
MXS_INFO("<%s, %s>", target->name, data);
|
||||
|
||||
if (data)
|
||||
{
|
||||
if (m_shard.add_table_location(data, target))
|
||||
{
|
||||
MXS_INFO("<%s, %s>", target->name, data);
|
||||
}
|
||||
else
|
||||
{
|
||||
duplicate_found = true;
|
||||
SERVER *duplicate = m_shard.get_table_location(data);
|
||||
|
||||
MXS_ERROR("Table '%s' found on servers '%s' and '%s' for user %s@%s.",
|
||||
data, target->name, duplicate->name,
|
||||
m_client->user, m_client->remote);
|
||||
|
||||
}
|
||||
MXS_FREE(data);
|
||||
}
|
||||
ptr += packetlen;
|
||||
}
|
||||
|
||||
if (ptr < (unsigned char*) buf->end && PTR_IS_EOF(ptr) && n_eof == 1)
|
||||
{
|
||||
n_eof++;
|
||||
MXS_INFO("Select tables query fully received from %s.",
|
||||
bref->backend()->server->name);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Select tables query partially received from %s.",
|
||||
bref->backend()->server->name);
|
||||
}
|
||||
|
||||
gwbuf_free(buf);
|
||||
|
||||
if (duplicate_found)
|
||||
{
|
||||
rval = SHOWDB_DUPLICATE_DATABASES;
|
||||
}
|
||||
else if (n_eof == 2)
|
||||
{
|
||||
rval = SHOWDB_FULL_RESPONSE;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Initiate the generation of the database hash table by sending a
|
||||
* SHOW DATABASES query to each valid backend server. This sets the session
|
||||
@ -1409,6 +1515,39 @@ void SchemaRouterSession::query_databases()
|
||||
gwbuf_free(buffer);
|
||||
}
|
||||
|
||||
void SchemaRouterSession::query_tables()
|
||||
{
|
||||
|
||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
{
|
||||
(*it)->set_mapped(false);
|
||||
}
|
||||
|
||||
m_state |= INIT_MAPPING;
|
||||
m_state &= ~INIT_UNINT;
|
||||
|
||||
GWBUF *buffer = modutil_create_query
|
||||
("SELECT table_name FROM information_schema.tables WHERE table_schema NOT IN ('information_schema', 'performance_schema', 'mysql');");
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
|
||||
|
||||
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
|
||||
{
|
||||
if ((*it)->in_use() && !(*it)->is_closed() &
|
||||
SERVER_IS_RUNNING((*it)->backend()->server))
|
||||
{
|
||||
GWBUF* clone = gwbuf_clone(buffer);
|
||||
MXS_ABORT_IF_NULL(clone);
|
||||
|
||||
if (!(*it)->write(clone))
|
||||
{
|
||||
MXS_ERROR("Failed to select tables from '%s'",
|
||||
(*it)->backend()->server->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
gwbuf_free(buffer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the hashtable for the right backend for this query.
|
||||
* @param router Router instance
|
||||
|
@ -147,6 +147,13 @@ private:
|
||||
void synchronize_shards();
|
||||
void handle_mapping_reply(SSRBackend& bref, GWBUF** pPacket);
|
||||
|
||||
/** Table mapping functions */
|
||||
void query_tables();
|
||||
bool send_tables();
|
||||
enum showdb_response parse_table_mapping_response(SSRBackend& bref, GWBUF** buffer);
|
||||
|
||||
|
||||
|
||||
/** Member variables */
|
||||
bool m_closed; /**< True if session closed */
|
||||
DCB* m_client; /**< The client DCB */
|
||||
|
@ -32,6 +32,12 @@ bool Shard::add_location(std::string db, SERVER* target)
|
||||
return m_map.insert(std::make_pair(db, target)).second;
|
||||
}
|
||||
|
||||
bool Shard::add_table_location(std::string table, SERVER* target)
|
||||
{
|
||||
std::transform(table.begin(), table.end(), table.begin(), ::tolower);
|
||||
return m_table_map.insert(std::make_pair(table, target)).second;
|
||||
}
|
||||
|
||||
void Shard::replace_location(std::string db, SERVER* target)
|
||||
{
|
||||
std::transform(db.begin(), db.end(), db.begin(), ::tolower);
|
||||
@ -52,6 +58,24 @@ SERVER* Shard::get_location(std::string db)
|
||||
return rval;
|
||||
}
|
||||
|
||||
SERVER* Shard::get_table_location(std::string table)
|
||||
{
|
||||
SERVER* rval = NULL;
|
||||
std::transform(table.begin(), table.end(), table.begin(), ::tolower);
|
||||
std::size_t pos = table.find(".");
|
||||
if(pos != std::string::npos)
|
||||
{
|
||||
table = table.substr(pos + 1);
|
||||
}
|
||||
ServerMap::iterator iter = m_table_map.find(table);
|
||||
if (iter != m_table_map.end())
|
||||
{
|
||||
rval = iter->second;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
|
||||
bool Shard::stale(double max_interval) const
|
||||
{
|
||||
time_t now = time(NULL);
|
||||
@ -64,6 +88,11 @@ bool Shard::empty() const
|
||||
return m_map.size() == 0;
|
||||
}
|
||||
|
||||
bool Shard::tables_empty() const
|
||||
{
|
||||
return m_table_map.size() == 0;
|
||||
}
|
||||
|
||||
void Shard::get_content(ServerMap& dest)
|
||||
{
|
||||
for (ServerMap::iterator it = m_map.begin(); it != m_map.end(); it++)
|
||||
|
@ -42,6 +42,16 @@ public:
|
||||
*/
|
||||
bool add_location(std::string db, SERVER* target);
|
||||
|
||||
/**
|
||||
* @brief Add a table location
|
||||
*
|
||||
* @param table table to add
|
||||
* @param target Target where database is located
|
||||
*
|
||||
* @return True if location was added
|
||||
*/
|
||||
bool add_table_location(std::string table, SERVER* target);
|
||||
|
||||
/**
|
||||
* @brief Retrieve the location of a database
|
||||
*
|
||||
@ -51,6 +61,16 @@ public:
|
||||
*/
|
||||
SERVER* get_location(std::string db);
|
||||
|
||||
/**
|
||||
* @brief Retrieve the location of a table
|
||||
*
|
||||
* @param table table to locate
|
||||
*
|
||||
* @return The server or NULL if no server contains the database
|
||||
*/
|
||||
SERVER* get_table_location(std::string table);
|
||||
|
||||
|
||||
/**
|
||||
* @brief Change the location of a database
|
||||
*
|
||||
@ -75,6 +95,8 @@ public:
|
||||
*/
|
||||
bool empty() const;
|
||||
|
||||
bool tables_empty() const;
|
||||
|
||||
/**
|
||||
* @brief Retrieve all database to server mappings
|
||||
*
|
||||
@ -93,6 +115,7 @@ public:
|
||||
|
||||
private:
|
||||
ServerMap m_map;
|
||||
ServerMap m_table_map;
|
||||
time_t m_last_updated;
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user