Clean up get_shard_target in schemarouter
get_shard_target had become little bloated with the recent changes so some routing cases were moved to their own functions. Also removed some code that was not needed.
This commit is contained in:
@ -1409,144 +1409,22 @@ void SchemaRouterSession::query_databases()
|
||||
SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
{
|
||||
SERVER *rval = NULL;
|
||||
bool has_dbs = false; /**If the query targets any database other than the current one*/
|
||||
qc_query_op_t op = QUERY_OP_UNDEFINED;
|
||||
uint8_t command = mxs_mysql_get_command(buffer);
|
||||
|
||||
if (command == MXS_COM_QUERY)
|
||||
{
|
||||
op = qc_get_operation(buffer);
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(buffer, &n_tables, true);
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
if (strchr(tables[i], '.') == NULL)
|
||||
{
|
||||
rval = m_shard.get_location(m_current_db);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int n_databases = 0;
|
||||
char** databases = qc_get_database_names(buffer, &n_databases);
|
||||
for (int i = 0; i < n_databases; i++)
|
||||
{
|
||||
for (int j = 0; j < n_tables; j++)
|
||||
{
|
||||
SERVER* target = m_shard.get_location(tables[j]);
|
||||
if (target)
|
||||
{
|
||||
|
||||
if (rval && target != rval)
|
||||
{
|
||||
MXS_ERROR("Query targets tables on servers '%s' and '%s'. "
|
||||
"Cross server queries are not supported.",
|
||||
rval->name, target->name);
|
||||
}
|
||||
else if (rval == NULL)
|
||||
{
|
||||
rval = target;
|
||||
has_dbs = true;
|
||||
MXS_INFO("Query targets table '%s' on server '%s'",
|
||||
tables[j], rval->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (strcasecmp(databases[i], "information_schema") == 0 && rval == NULL)
|
||||
{
|
||||
has_dbs = false;
|
||||
}
|
||||
|
||||
MXS_FREE(databases[i]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
MXS_FREE(tables);
|
||||
MXS_FREE(databases);
|
||||
rval = get_query_target(buffer);
|
||||
}
|
||||
|
||||
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
||||
if (mxs_mysql_is_ps_command(command) ||
|
||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
|
||||
qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE) ||
|
||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
||||
op == QUERY_OP_EXECUTE)
|
||||
{
|
||||
GWBUF* pStmt = qc_get_preparable_stmt(buffer);
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(pStmt, &n_tables, true);
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
SERVER* target = m_shard.get_location(tables[i]);
|
||||
if (target)
|
||||
{
|
||||
|
||||
if (rval && target != rval)
|
||||
{
|
||||
MXS_ERROR("Statement targets tables on servers '%s' and '%s'. "
|
||||
"Cross server queries are not supported.",
|
||||
rval->name, target->name);
|
||||
}
|
||||
else if (rval == NULL)
|
||||
{
|
||||
rval = target;
|
||||
}
|
||||
}
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("PREPARING NAMED %s ON SERVER %s", stmt, rval->name);
|
||||
m_shard.add_statement(stmt, rval);
|
||||
}
|
||||
MXS_FREE(tables);
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (op == QUERY_OP_EXECUTE)
|
||||
{
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
rval = m_shard.get_statement(stmt);
|
||||
MXS_INFO("EXECUTING NAMED %s ON SERVER %s", stmt, rval->name);
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE))
|
||||
{
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
if ((rval = m_shard.get_statement(stmt)))
|
||||
{
|
||||
MXS_INFO("DEALLOCING NAMED %s ON SERVER %s", stmt, rval->name);
|
||||
m_shard.remove_statement(stmt);
|
||||
}
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
|
||||
{
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(buffer, &n_tables, true);
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
rval = m_shard.get_location(tables[0]);
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
rval ? MXS_INFO("PREPARE STATEMENT ON SERVER %s", rval->name) :
|
||||
MXS_INFO("PREPARE STATEMENT TARGETS NO MAPPED TABLES");
|
||||
|
||||
MXS_FREE(tables);
|
||||
}
|
||||
else if (mxs_mysql_is_ps_command(command))
|
||||
{
|
||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||
uint32_t handle = m_shard.get_ps_handle(id);
|
||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||
gw_mysql_set_byte4(ptr, handle);
|
||||
rval = m_shard.get_statement(id);
|
||||
if (command == MXS_COM_STMT_CLOSE)
|
||||
{
|
||||
MXS_INFO("CLOSING STATEMENT %d ", id);
|
||||
m_shard.remove_statement(id);
|
||||
}
|
||||
rval = get_ps_target(buffer, qtype, op);
|
||||
}
|
||||
|
||||
if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||
@ -1563,7 +1441,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
||||
}
|
||||
}
|
||||
|
||||
if (rval == NULL && !has_dbs && m_current_db.length())
|
||||
if (rval == NULL && m_current_db.length())
|
||||
{
|
||||
/**
|
||||
* If the target name has not been found and the session has an
|
||||
@ -1791,4 +1669,148 @@ bool SchemaRouterSession::handle_statement(GWBUF* querybuf, SSRBackend& bref, ui
|
||||
return succp;
|
||||
}
|
||||
|
||||
SERVER* SchemaRouterSession::get_query_target(GWBUF* buffer)
|
||||
{
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(buffer, &n_tables, true);
|
||||
SERVER* rval = NULL;
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
if (strchr(tables[i], '.') == NULL)
|
||||
{
|
||||
rval = m_shard.get_location(m_current_db);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int n_databases = 0;
|
||||
char** databases = qc_get_database_names(buffer, &n_databases);
|
||||
|
||||
for (int i = 0; i < n_databases; i++)
|
||||
{
|
||||
for (int j = 0; j < n_tables; j++)
|
||||
{
|
||||
SERVER* target = m_shard.get_location(tables[j]);
|
||||
|
||||
if (target)
|
||||
{
|
||||
|
||||
if (rval && target != rval)
|
||||
{
|
||||
MXS_ERROR("Query targets tables on servers '%s' and '%s'. "
|
||||
"Cross server queries are not supported.",
|
||||
rval->name, target->name);
|
||||
}
|
||||
else if (rval == NULL)
|
||||
{
|
||||
rval = target;
|
||||
MXS_INFO("Query targets table '%s' on server '%s'",
|
||||
tables[j], rval->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MXS_FREE(databases[i]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
MXS_FREE(tables);
|
||||
MXS_FREE(databases);
|
||||
return rval;
|
||||
}
|
||||
|
||||
SERVER* SchemaRouterSession::get_ps_target(GWBUF* buffer, uint32_t qtype, qc_query_op_t op)
|
||||
{
|
||||
SERVER* rval = NULL;
|
||||
uint8_t command = mxs_mysql_get_command(buffer);
|
||||
|
||||
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT))
|
||||
{
|
||||
GWBUF* pStmt = qc_get_preparable_stmt(buffer);
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(pStmt, &n_tables, true);
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
SERVER* target = m_shard.get_location(tables[i]);
|
||||
|
||||
if (target)
|
||||
{
|
||||
|
||||
if (rval && target != rval)
|
||||
{
|
||||
MXS_ERROR("Statement targets tables on servers '%s' and '%s'. "
|
||||
"Cross server queries are not supported.",
|
||||
rval->name, target->name);
|
||||
}
|
||||
else if (rval == NULL)
|
||||
{
|
||||
rval = target;
|
||||
}
|
||||
}
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("PREPARING NAMED %s ON SERVER %s", stmt, rval->name);
|
||||
m_shard.add_statement(stmt, rval);
|
||||
}
|
||||
MXS_FREE(tables);
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (op == QUERY_OP_EXECUTE)
|
||||
{
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
rval = m_shard.get_statement(stmt);
|
||||
MXS_INFO("Executing named statement %s on server %s", stmt, rval->name);
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE))
|
||||
{
|
||||
char* stmt = qc_get_prepare_name(buffer);
|
||||
|
||||
if ((rval = m_shard.get_statement(stmt)))
|
||||
{
|
||||
MXS_INFO("Closing named statement %s on server %s", stmt, rval->name);
|
||||
m_shard.remove_statement(stmt);
|
||||
}
|
||||
MXS_FREE(stmt);
|
||||
}
|
||||
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
|
||||
{
|
||||
int n_tables = 0;
|
||||
char** tables = qc_get_table_names(buffer, &n_tables, true);
|
||||
|
||||
for (int i = 0; i < n_tables; i++)
|
||||
{
|
||||
rval = m_shard.get_location(tables[0]);
|
||||
MXS_FREE(tables[i]);
|
||||
}
|
||||
rval ? MXS_INFO("Prepare statement on server %s", rval->name) :
|
||||
MXS_INFO("Prepared statement targets no mapped tables");
|
||||
MXS_FREE(tables);
|
||||
}
|
||||
else if (mxs_mysql_is_ps_command(command))
|
||||
{
|
||||
uint32_t id = mxs_mysql_extract_ps_id(buffer);
|
||||
uint32_t handle = m_shard.get_ps_handle(id);
|
||||
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
|
||||
gw_mysql_set_byte4(ptr, handle);
|
||||
rval = m_shard.get_statement(id);
|
||||
|
||||
if (command == MXS_COM_STMT_CLOSE)
|
||||
{
|
||||
MXS_INFO("Closing prepared statement %d ", id);
|
||||
m_shard.remove_statement(id);
|
||||
}
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
}
|
@ -130,6 +130,8 @@ private:
|
||||
bool have_servers();
|
||||
bool handle_default_db();
|
||||
bool ignore_duplicate_database(const char* data);
|
||||
SERVER* get_query_target(GWBUF* buffer);
|
||||
SERVER* get_ps_target(GWBUF* buffer, uint32_t qtype, qc_query_op_t op);
|
||||
|
||||
/** Routing functions */
|
||||
bool route_session_write(GWBUF* querybuf, uint8_t command);
|
||||
|
Reference in New Issue
Block a user