MXS-1113 Add support for prepared statements in schemarouter

Add support for binary protocol prepared statements for schemarouter.
This implementation doesn't yet attempt to handle all the edge cases.

Prepared statements are routed to the server that contains the affected
tables, the internal id from the server is then mapped to the session
command id that is inceremented for each prepared statement. This unique
session command id is returned to the client because internal id given
by server might be same around different servers and this way it is
possible to keep track of them and route them to the right servers when
executed.
This commit is contained in:
Marko
2018-07-29 14:48:30 +03:00
parent adbc3a6749
commit 11d57a264c
4 changed files with 160 additions and 10 deletions

View File

@ -143,9 +143,9 @@ static void inspect_query(GWBUF* pPacket, uint32_t* type, qc_query_op_t* op, uin
case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MXS_COM_PING: /*< 0e all servers are pinged */
case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MXS_COM_STMT_CLOSE: /*< free prepared statement */
case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
//case MXS_COM_STMT_CLOSE: /*< free prepared statement */
//case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
//case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
*type = QUERY_TYPE_SESSION_WRITE;
break;
@ -447,6 +447,15 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
pPacket = NULL;
ret = 1;
}
else if (qc_query_is_type(type, QUERY_TYPE_PREPARE_STMT))
{
if (handle_statement(pPacket, bref, command, type))
{
atomic_add(&m_router->m_stats.n_sescmd, 1);
atomic_add(&m_router->m_stats.n_queries, 1);
ret = 1;
}
}
else if (bref->write(pPacket))
{
/** Add one query response waiter to backend reference */
@ -502,10 +511,22 @@ void SchemaRouterSession::process_sescmd_response(SSRBackend& bref, GWBUF** ppPa
if (bref->has_session_commands())
{
ss_dassert(GWBUF_IS_COLLECTED_RESULT(*ppPacket));
uint8_t command = bref->next_session_command()->get_command();
uint64_t id = bref->complete_session_command();
MXS_PS_RESPONSE resp = {};
if (m_replied_sescmd < m_sent_sescmd && id == m_replied_sescmd + 1)
{
if (command == MXS_COM_STMT_PREPARE)
{
mxs_mysql_extract_ps_response(*ppPacket, &resp);
MXS_INFO("ID: %lu HANDLE: %lu", (unsigned long)id, (unsigned long)resp.id);
m_shard.add_ps_handle(id, resp.id);
MXS_INFO("STMT SERVER: %s", bref->backend()->server->name);
m_shard.add_statement(id, bref->backend()->server);
uint8_t* ptr = GWBUF_DATA(*ppPacket) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, id);
}
/** First reply to this session command, route it to the client */
++m_replied_sescmd;
}
@ -564,9 +585,11 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
process_sescmd_response(bref, &pPacket);
ss_dassert(bref->is_waiting_result());
/** Set response status as replied */
bref->ack_write();
if (bref->is_waiting_result())
{
/** Set response status as replied */
bref->ack_write();
}
if (pPacket)
{
@ -1409,8 +1432,9 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
SERVER *rval = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/
qc_query_op_t op = QUERY_OP_UNDEFINED;
uint8_t command = mxs_mysql_get_command(buffer);
if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY)
if (command == MXS_COM_QUERY)
{
op = qc_get_operation(buffer);
int n_tables = 0;
@ -1495,6 +1519,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
if (rval)
{
MXS_INFO("PREPARING NAMED %s ON SERVER %s", stmt, rval->name);
m_shard.add_statement(stmt, rval);
}
MXS_FREE(tables);
@ -1504,6 +1529,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
{
char* stmt = qc_get_prepare_name(buffer);
rval = m_shard.get_statement(stmt);
MXS_INFO("EXECUTING NAMED %s ON SERVER %s", stmt, rval->name);
MXS_FREE(stmt);
}
else if (qc_query_is_type(qtype, QUERY_TYPE_DEALLOC_PREPARE))
@ -1511,10 +1537,38 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
char* stmt = qc_get_prepare_name(buffer);
if ((rval = m_shard.get_statement(stmt)))
{
MXS_INFO("DEALLOCING NAMED %s ON SERVER %s", stmt, rval->name);
m_shard.remove_statement(stmt);
}
MXS_FREE(stmt);
}
else if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT))
{
int n_tables = 0;
char** tables = qc_get_table_names(buffer, &n_tables, true);
for (int i = 0; i < n_tables; i++)
{
rval = m_shard.get_location(tables[0]);
MXS_FREE(tables[i]);
}
rval ? MXS_INFO("PREPARE STATEMENT ON SERVER %s", rval->name) :
MXS_INFO("PREPARE STATEMENT TARGETS NO MAPPED TABLES");
MXS_FREE(tables);
}
else if (mxs_mysql_is_ps_command(command))
{
uint32_t id = mxs_mysql_extract_ps_id(buffer);
uint32_t handle = m_shard.get_ps_handle(id);
uint8_t* ptr = GWBUF_DATA(buffer) + MYSQL_PS_ID_OFFSET;
gw_mysql_set_byte4(ptr, handle);
rval = m_shard.get_statement(id);
if (command == MXS_COM_STMT_CLOSE)
{
MXS_INFO("CLOSING STATEMENT %d ", id);
m_shard.remove_statement(id);
}
}
if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
@ -1741,4 +1795,48 @@ bool SchemaRouterSession::send_tables(GWBUF* pPacket)
return rval;
}
bool SchemaRouterSession::handle_statement(GWBUF* querybuf, SSRBackend& bref, uint8_t command, uint32_t type)
{
bool succp = false;
atomic_add(&m_stats.longest_sescmd, 1);
/** Increment the session command count */
++m_sent_sescmd;
if (bref->in_use())
{
GWBUF *buffer = gwbuf_clone(querybuf);
bref->append_session_command(buffer, m_sent_sescmd);
if (bref->session_command_count() == 1)
{
if (bref->execute_session_command())
{
succp = true;
atomic_add_uint64(&bref->server()->stats.packets, 1);
}
else
{
MXS_ERROR("Failed to execute session "
"command in %s:%d",
bref->backend()->server->address,
bref->backend()->server->port);
}
}
else
{
ss_dassert(bref->session_command_count() > 1);
/** The server is already executing a session command */
MXS_INFO("Backend %s:%d already executing sescmd.",
bref->backend()->server->address,
bref->backend()->server->port);
succp = true;
}
}
gwbuf_free(querybuf);
return succp;
}
}