Move transaction state management to the right place

The transaction state must be updated after a buffer has been split
into buffer containing individual packets.

NOTE: The actual updating of the transaction state and the autocommit
      mode is currently wrong, but will be updated in a subsequent change.
This commit is contained in:
Johan Wikman
2016-10-25 10:58:49 +03:00
parent 7e822aed4d
commit 041df39819
2 changed files with 82 additions and 42 deletions

View File

@ -40,5 +40,18 @@ typedef enum routing_capability
#define RCAP_TYPE_NONE 0
/**
* Determines whether a particular capability type is required.
*
* @param capabilites The capability bits to be tested.
* @param type A particular capability type or a bitmask of types.
*
* @return True, if @c type is present in @c capabilities.
*/
static inline bool rcap_type_required(uint64_t capabilities, uint64_t type)
{
return (capabilities & type) == type;
}
MXS_END_DECLS

View File

@ -86,7 +86,7 @@ static int gw_client_hangup_event(DCB *dcb);
static char *gw_default_auth();
static int gw_connection_limit(DCB *dcb, int limit);
static int MySQLSendHandshake(DCB* dcb);
static int route_by_statement(SESSION *, GWBUF **);
static int route_by_statement(SESSION *, uint64_t, GWBUF **);
static void mysql_client_auth_error_handling(DCB *dcb, int auth_val, int packet_number);
static int gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
@ -848,7 +848,7 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
/** If the router requires statement input or we are still authenticating
* we need to make sure that a complete SQL packet is read before continuing */
if (capabilities & RCAP_TYPE_STMT_INPUT)
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
{
uint8_t* data;
int packet_size;
@ -886,50 +886,14 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
/** Reset error handler when routing of the new query begins */
dcb->dcb_errhandle_called = false;
if (capabilities & RCAP_TYPE_STMT_INPUT)
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
{
SESSION *ses = dcb->session;
ss_dassert(ses);
uint32_t type = qc_get_type(read_buffer);
if (type & QUERY_TYPE_BEGIN_TRX)
{
session_trx_state_t trx_state;
if (type & QUERY_TYPE_WRITE)
{
trx_state = SESSION_TRX_READ_WRITE;
}
else if (type & QUERY_TYPE_READ)
{
trx_state = SESSION_TRX_READ_ONLY;
}
else
{
trx_state = SESSION_TRX_ACTIVE;
}
session_set_trx_state(ses, trx_state);
}
else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK))
{
session_set_trx_state(ses, SESSION_TRX_INACTIVE);
}
else if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT)
{
session_set_autocommit(ses, true);
}
else if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT)
{
session_set_autocommit(ses, false);
}
/**
* Feed each statement completely and separately
* to router. The routing functions return 1 for
* success or 0 for failure.
*/
return_code = route_by_statement(session, &read_buffer) ? 0 : 1;
return_code = route_by_statement(session, capabilities, &read_buffer) ? 0 : 1;
if (read_buffer != NULL)
{
@ -940,7 +904,7 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
spinlock_release(&dcb->authlock);
}
}
else if (NULL != session->router_session || (capabilities & RCAP_TYPE_NO_RSESSION))
else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))
{
/** Feed whole packet to router, which will free it
* and return 1 for success, 0 for failure
@ -1400,11 +1364,12 @@ retblock:
* leave incomplete packet to readbuf.
*
* @param session Session pointer
* @param capabilities The capabilities of the service.
* @param p_readbuf Pointer to the address of GWBUF including the query
*
* @return 1 if succeed,
*/
static int route_by_statement(SESSION* session, GWBUF** p_readbuf)
static int route_by_statement(SESSION* session, uint64_t capabilities, GWBUF** p_readbuf)
{
int rc;
GWBUF* packetbuf;
@ -1426,6 +1391,7 @@ static int route_by_statement(SESSION* session, GWBUF** p_readbuf)
* Collect incoming bytes to a buffer until complete packet has
* arrived and then return the buffer.
*/
// TODO: This should be replaced with modutil_get_next_MySQL_packet.
packetbuf = gw_MySQL_get_next_packet(p_readbuf);
if (packetbuf != NULL)
@ -1445,6 +1411,67 @@ static int route_by_statement(SESSION* session, GWBUF** p_readbuf)
* sure it is set to each (MySQL) packet.
*/
gwbuf_set_type(packetbuf, GWBUF_TYPE_SINGLE_STMT);
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_INPUT))
{
if (!GWBUF_IS_CONTIGUOUS(packetbuf))
{
// TODO: As long as gw_MySQL_get_next_packet is used above, the buffer
// TODO: will be contiguous. That function should be replaced with
// TODO: modutil_get_next_MySQL_packet.
GWBUF* tmp = gwbuf_make_contiguous(packetbuf);
if (tmp)
{
packetbuf = tmp;
}
else
{
// TODO: A memory allocation failure. We should close the dcb
// TODO: and terminate the session.
rc = 0;
goto return_rc;
}
}
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING))
{
uint32_t type = qc_get_type(packetbuf);
if (type & QUERY_TYPE_BEGIN_TRX)
{
session_trx_state_t trx_state;
if (type & QUERY_TYPE_WRITE)
{
trx_state = SESSION_TRX_READ_WRITE;
}
else if (type & QUERY_TYPE_READ)
{
trx_state = SESSION_TRX_READ_ONLY;
}
else
{
trx_state = SESSION_TRX_ACTIVE;
}
session_set_trx_state(session, trx_state);
if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT)
{
session_set_autocommit(session, false);
}
}
else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK))
{
session_set_trx_state(session, SESSION_TRX_INACTIVE);
if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT)
{
session_set_autocommit(session, true);
}
}
}
}
/** Route query */
rc = SESSION_ROUTE_QUERY(session, packetbuf);
}