From 041df398197321710785f179f31939bf35133bf7 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Tue, 25 Oct 2016 10:58:49 +0300 Subject: [PATCH] Move transaction state management to the right place The transaction state must be updated after a buffer has been split into buffer containing individual packets. NOTE: The actual updating of the transaction state and the autocommit mode is currently wrong, but will be updated in a subsequent change. --- include/maxscale/routing.h | 13 ++ .../protocol/MySQL/MySQLClient/mysql_client.c | 111 +++++++++++------- 2 files changed, 82 insertions(+), 42 deletions(-) diff --git a/include/maxscale/routing.h b/include/maxscale/routing.h index 7d7ac8158..1544414a2 100644 --- a/include/maxscale/routing.h +++ b/include/maxscale/routing.h @@ -40,5 +40,18 @@ typedef enum routing_capability #define RCAP_TYPE_NONE 0 +/** + * Determines whether a particular capability type is required. + * + * @param capabilites The capability bits to be tested. + * @param type A particular capability type or a bitmask of types. + * + * @return True, if @c type is present in @c capabilities. + */ +static inline bool rcap_type_required(uint64_t capabilities, uint64_t type) +{ + return (capabilities & type) == type; +} + MXS_END_DECLS diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c index bad382043..b28191aa7 100644 --- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c +++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c @@ -86,7 +86,7 @@ static int gw_client_hangup_event(DCB *dcb); static char *gw_default_auth(); static int gw_connection_limit(DCB *dcb, int limit); static int MySQLSendHandshake(DCB* dcb); -static int route_by_statement(SESSION *, GWBUF **); +static int route_by_statement(SESSION *, uint64_t, GWBUF **); static void mysql_client_auth_error_handling(DCB *dcb, int auth_val, int packet_number); static int gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read); static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read); @@ -848,7 +848,7 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read) /** If the router requires statement input or we are still authenticating * we need to make sure that a complete SQL packet is read before continuing */ - if (capabilities & RCAP_TYPE_STMT_INPUT) + if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) { uint8_t* data; int packet_size; @@ -886,50 +886,14 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities) /** Reset error handler when routing of the new query begins */ dcb->dcb_errhandle_called = false; - if (capabilities & RCAP_TYPE_STMT_INPUT) + if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)) { - SESSION *ses = dcb->session; - ss_dassert(ses); - - uint32_t type = qc_get_type(read_buffer); - - if (type & QUERY_TYPE_BEGIN_TRX) - { - session_trx_state_t trx_state; - if (type & QUERY_TYPE_WRITE) - { - trx_state = SESSION_TRX_READ_WRITE; - } - else if (type & QUERY_TYPE_READ) - { - trx_state = SESSION_TRX_READ_ONLY; - } - else - { - trx_state = SESSION_TRX_ACTIVE; - } - - session_set_trx_state(ses, trx_state); - } - else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK)) - { - session_set_trx_state(ses, SESSION_TRX_INACTIVE); - } - else if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT) - { - session_set_autocommit(ses, true); - } - else if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT) - { - session_set_autocommit(ses, false); - } - /** * Feed each statement completely and separately * to router. The routing functions return 1 for * success or 0 for failure. */ - return_code = route_by_statement(session, &read_buffer) ? 0 : 1; + return_code = route_by_statement(session, capabilities, &read_buffer) ? 0 : 1; if (read_buffer != NULL) { @@ -940,7 +904,7 @@ gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities) spinlock_release(&dcb->authlock); } } - else if (NULL != session->router_session || (capabilities & RCAP_TYPE_NO_RSESSION)) + else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION))) { /** Feed whole packet to router, which will free it * and return 1 for success, 0 for failure @@ -1400,11 +1364,12 @@ retblock: * leave incomplete packet to readbuf. * * @param session Session pointer + * @param capabilities The capabilities of the service. * @param p_readbuf Pointer to the address of GWBUF including the query * * @return 1 if succeed, */ -static int route_by_statement(SESSION* session, GWBUF** p_readbuf) +static int route_by_statement(SESSION* session, uint64_t capabilities, GWBUF** p_readbuf) { int rc; GWBUF* packetbuf; @@ -1426,6 +1391,7 @@ static int route_by_statement(SESSION* session, GWBUF** p_readbuf) * Collect incoming bytes to a buffer until complete packet has * arrived and then return the buffer. */ + // TODO: This should be replaced with modutil_get_next_MySQL_packet. packetbuf = gw_MySQL_get_next_packet(p_readbuf); if (packetbuf != NULL) @@ -1445,6 +1411,67 @@ static int route_by_statement(SESSION* session, GWBUF** p_readbuf) * sure it is set to each (MySQL) packet. */ gwbuf_set_type(packetbuf, GWBUF_TYPE_SINGLE_STMT); + + if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_INPUT)) + { + if (!GWBUF_IS_CONTIGUOUS(packetbuf)) + { + // TODO: As long as gw_MySQL_get_next_packet is used above, the buffer + // TODO: will be contiguous. That function should be replaced with + // TODO: modutil_get_next_MySQL_packet. + GWBUF* tmp = gwbuf_make_contiguous(packetbuf); + if (tmp) + { + packetbuf = tmp; + } + else + { + // TODO: A memory allocation failure. We should close the dcb + // TODO: and terminate the session. + rc = 0; + goto return_rc; + } + } + + if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING)) + { + uint32_t type = qc_get_type(packetbuf); + + if (type & QUERY_TYPE_BEGIN_TRX) + { + session_trx_state_t trx_state; + if (type & QUERY_TYPE_WRITE) + { + trx_state = SESSION_TRX_READ_WRITE; + } + else if (type & QUERY_TYPE_READ) + { + trx_state = SESSION_TRX_READ_ONLY; + } + else + { + trx_state = SESSION_TRX_ACTIVE; + } + + session_set_trx_state(session, trx_state); + + if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT) + { + session_set_autocommit(session, false); + } + } + else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK)) + { + session_set_trx_state(session, SESSION_TRX_INACTIVE); + + if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT) + { + session_set_autocommit(session, true); + } + } + } + } + /** Route query */ rc = SESSION_ROUTE_QUERY(session, packetbuf); }