MXS-1113 Add support for prepared statements in schemarouter

Implement handling of the text protocol part of the prepared statements
in schemarouter.
This commit is contained in:
Marko 2018-07-29 14:21:55 +03:00
parent bfe5bcd7a7
commit adbc3a6749
4 changed files with 80 additions and 3 deletions

View File

@ -1408,9 +1408,11 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
{
SERVER *rval = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/
qc_query_op_t op = QUERY_OP_UNDEFINED;
if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY)
{
op = qc_get_operation(buffer);
int n_tables = 0;
char** tables = qc_get_table_names(buffer, &n_tables, true);
@ -1465,6 +1467,55 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
MXS_FREE(databases);
}
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
{
GWBUF* pStmt = qc_get_preparable_stmt(buffer);
int n_tables = 0;
char** tables = qc_get_table_names(pStmt, &n_tables, true);
char* stmt = qc_get_prepare_name(buffer);
for (int i = 0; i < n_tables; i++)
{
SERVER* target = m_shard.get_location(tables[i]);
if (target)
{
if (rval && target != rval)
{
MXS_ERROR("Statement targets tables on servers '%s' and '%s'. "
"Cross server queries are not supported.",
rval->name, target->name);
}
else if (rval == NULL)
{
rval = target;
}
}
MXS_FREE(tables[i]);
}
if (rval)
{
m_shard.add_statement(stmt, rval);
}
MXS_FREE(tables);
MXS_FREE(stmt);
}
else if (op == QUERY_OP_EXECUTE)
{
char* stmt = qc_get_prepare_name(buffer);
rval = m_shard.get_statement(stmt);
MXS_FREE(stmt);
}
else if (qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE))
{
char* stmt = qc_get_prepare_name(buffer);
if ((rval = m_shard.get_statement(stmt)))
{
m_shard.remove_statement(stmt);
}
MXS_FREE(stmt);
}
if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
for (SSRBackendList::iterator it = m_backends.begin(); it != m_backends.end(); it++)
@ -1557,8 +1608,6 @@ enum route_target get_shard_route_target(uint32_t qtype)
if (qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{

View File

@ -109,7 +109,7 @@ public:
/**
*
* @param pMessage The rror message.
* @param pMessage The error message.
* @param pProblem The DCB on which the error occurred.
* @param action The context.
* @param pSuccess On output, if false, the session will be terminated.

View File

@ -31,6 +31,11 @@ bool Shard::add_location(std::string db, SERVER* target)
return m_map.insert(std::make_pair(db, target)).second;
}
void Shard::add_statement(std::string stmt, SERVER* target)
{
stmt_map[stmt] = target;
}
void Shard::replace_location(std::string db, SERVER* target)
{
m_map[db] = target;
@ -78,6 +83,22 @@ SERVER* Shard::get_location(std::string table)
return rval;
}
SERVER* Shard::get_statement(std::string stmt)
{
SERVER* rval = NULL;
ServerMap::iterator iter = stmt_map.find(stmt);
if(iter != stmt_map.end())
{
rval = iter->second;
}
return rval;
}
bool Shard::remove_statement(std::string stmt)
{
return stmt_map.erase(stmt);
}
bool Shard::stale(double max_interval) const
{
time_t now = time(NULL);

View File

@ -51,6 +51,12 @@ public:
*/
SERVER* get_location(std::string db);
void add_statement(std::string stmt, SERVER* target);
SERVER* get_statement(std::string stmt);
bool remove_statement(std::string stmt);
/**
* @brief Change the location of a database
*
@ -93,6 +99,7 @@ public:
private:
ServerMap m_map;
ServerMap stmt_map;
time_t m_last_updated;
};