1946 lines
		
	
	
		
			62 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1946 lines
		
	
	
		
			62 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  *
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2020-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #define MXS_MODULE_NAME "mariadbclient"
 | |
| 
 | |
| #include <maxscale/cppdefs.hh>
 | |
| 
 | |
| #include <inttypes.h>
 | |
| #include <limits.h>
 | |
| #include <netinet/tcp.h>
 | |
| #include <sys/stat.h>
 | |
| #include <string>
 | |
| 
 | |
| #include <maxscale/alloc.h>
 | |
| #include <maxscale/authenticator.h>
 | |
| #include <maxscale/log_manager.h>
 | |
| #include <maxscale/modinfo.h>
 | |
| #include <maxscale/modutil.h>
 | |
| #include <maxscale/poll.h>
 | |
| #include <maxscale/protocol.h>
 | |
| #include <maxscale/protocol/mysql.h>
 | |
| #include <maxscale/query_classifier.h>
 | |
| #include <maxscale/router.h>
 | |
| #include <maxscale/session.h>
 | |
| #include <maxscale/ssl.h>
 | |
| #include <maxscale/utils.h>
 | |
| #include <maxscale/worker.h>
 | |
| 
 | |
| #include "setsqlmodeparser.hh"
 | |
| 
 | |
/**
 * Outcome of process_special_commands().
 */
typedef enum spec_com_res_t
{
    RES_CONTINUE, /**< No special command detected, proceed as normal. */
    RES_END,      /**< Query handling completed, do not send to filters/router. */
    RES_MORE_DATA /**< Possible special command, but not enough data to be sure.
                   *   Must wait for more data. */
} spec_com_res_t;
 | |
| 
 | |
| const char WORD_KILL[] = "KILL";
 | |
| 
 | |
| static int process_init(void);
 | |
| static void process_finish(void);
 | |
| static int thread_init(void);
 | |
| static void thread_finish(void);
 | |
| 
 | |
| static int gw_MySQLAccept(DCB *listener);
 | |
| static int gw_MySQLListener(DCB *listener, char *config_bind);
 | |
| static int gw_read_client_event(DCB* dcb);
 | |
| static int gw_write_client_event(DCB *dcb);
 | |
| static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue);
 | |
| static int gw_error_client_event(DCB *dcb);
 | |
| static int gw_client_close(DCB *dcb);
 | |
| static int gw_client_hangup_event(DCB *dcb);
 | |
| static char *gw_default_auth();
 | |
| static int gw_connection_limit(DCB *dcb, int limit);
 | |
| static int MySQLSendHandshake(DCB* dcb);
 | |
| static int route_by_statement(MXS_SESSION *, uint64_t, GWBUF **);
 | |
| static void mysql_client_auth_error_handling(DCB *dcb, int auth_val, int packet_number);
 | |
| static int gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
 | |
| static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
 | |
| static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities);
 | |
| static void gw_process_one_new_client(DCB *client_dcb);
 | |
| static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buffer, int nbytes_read);
 | |
| static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
 | |
|                                         bool is_complete, unsigned int packet_len);
 | |
| static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user);
 | |
| static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data);
 | |
| /**
 | |
|  * The module entry point routine. It is this routine that
 | |
|  * must populate the structure that is referred to as the
 | |
|  * "module object", this is a structure with the set of
 | |
|  * external entry points for this module.
 | |
|  *
 | |
|  * @return The module object
 | |
|  */
 | |
| 
 | |
| extern "C"
 | |
| {
 | |
| 
 | |
|     MXS_MODULE* MXS_CREATE_MODULE()
 | |
|     {
 | |
|         static MXS_PROTOCOL MyObject =
 | |
|         {
 | |
|             gw_read_client_event,                   /* Read - EPOLLIN handler        */
 | |
|             gw_MySQLWrite_client,                   /* Write - data from gateway     */
 | |
|             gw_write_client_event,                  /* WriteReady - EPOLLOUT handler */
 | |
|             gw_error_client_event,                  /* Error - EPOLLERR handler      */
 | |
|             gw_client_hangup_event,                 /* HangUp - EPOLLHUP handler     */
 | |
|             gw_MySQLAccept,                         /* Accept                        */
 | |
|             NULL,                                   /* Connect                       */
 | |
|             gw_client_close,                        /* Close                         */
 | |
|             gw_MySQLListener,                       /* Listen                        */
 | |
|             NULL,                                   /* Authentication                */
 | |
|             NULL,                                   /* Session                       */
 | |
|             gw_default_auth,                        /* Default authenticator         */
 | |
|             gw_connection_limit,                    /* Send error connection limit   */
 | |
|             NULL
 | |
|         };
 | |
| 
 | |
|         static MXS_MODULE info =
 | |
|         {
 | |
|             MXS_MODULE_API_PROTOCOL,
 | |
|             MXS_MODULE_GA,
 | |
|             MXS_PROTOCOL_VERSION,
 | |
|             "The client to MaxScale MySQL protocol implementation",
 | |
|             "V1.1.0",
 | |
|             MXS_NO_MODULE_CAPABILITIES,
 | |
|             &MyObject,
 | |
|             process_init,
 | |
|             process_finish,
 | |
|             thread_init,
 | |
|             thread_finish,
 | |
|             {
 | |
|                 {MXS_END_MODULE_PARAMS}
 | |
|             }
 | |
|         };
 | |
| 
 | |
|         return &info;
 | |
|     }
 | |
| 
 | |
| }
 | |
| /*lint +e14 */
 | |
| 
 | |
| /**
 | |
|  * Performs process wide initialization.
 | |
|  *
 | |
|  * @return 0 if successful, non-zero otherwise.
 | |
|  */
 | |
| static int process_init(void)
 | |
| {
 | |
|     int rv = mysql_library_init(0, NULL, NULL);
 | |
| 
 | |
|     if (rv != 0)
 | |
|     {
 | |
|         MXS_ERROR("MySQL initialization failed, MariaDB MaxScale will exit. "
 | |
|                   "MySQL Error: %d, %s.", mysql_errno(NULL), mysql_error(NULL));
 | |
|     }
 | |
| 
 | |
|     return rv;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Performs process wide finalization.
 | |
|  */
 | |
| static void process_finish(void)
 | |
| {
 | |
|     mysql_library_end();
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Performs thread-specific initialization.
 | |
|  *
 | |
|  * @return 0 if successful, non-zero otherwise.
 | |
|  */
 | |
| static int thread_init(void)
 | |
| {
 | |
|     int rv = mysql_thread_init();
 | |
| 
 | |
|     if (rv != 0)
 | |
|     {
 | |
|         MXS_ERROR("MySQL thread initialization failed, the thread will exit.");
 | |
|     }
 | |
| 
 | |
|     return rv;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Performs thread specific finalization.
 | |
|  */
 | |
| static void thread_finish(void)
 | |
| {
 | |
|     mysql_thread_end();
 | |
| }
 | |
| 
 | |
/**
 * The default authenticator name for this protocol
 *
 * The returned string has static storage duration and must not be modified
 * or freed by the caller.
 *
 * @return name of authenticator
 */
static char *gw_default_auth()
{
    static const char default_authenticator[] = "MySQLAuth";

    /* The interface returns a non-const pointer; the data is read-only. */
    return (char*)default_authenticator;
}
 | |
| 
 | |
| /**
 | |
|  * MySQLSendHandshake
 | |
|  *
 | |
|  * @param dcb The descriptor control block to use for sending the handshake request
 | |
|  * @return      The packet length sent
 | |
|  */
 | |
| int MySQLSendHandshake(DCB* dcb)
 | |
| {
 | |
|     uint8_t *outbuf = NULL;
 | |
|     uint32_t mysql_payload_size = 0;
 | |
|     uint8_t mysql_packet_header[4];
 | |
|     uint8_t mysql_packet_id = 0;
 | |
|     /* uint8_t mysql_filler = GW_MYSQL_HANDSHAKE_FILLER; not needed*/
 | |
|     uint8_t mysql_protocol_version = GW_MYSQL_PROTOCOL_VERSION;
 | |
|     uint8_t *mysql_handshake_payload = NULL;
 | |
|     uint8_t mysql_thread_id_num[4];
 | |
|     uint8_t mysql_scramble_buf[9] = "";
 | |
|     uint8_t mysql_plugin_data[13] = "";
 | |
|     uint8_t mysql_server_capabilities_one[2];
 | |
|     uint8_t mysql_server_capabilities_two[2];
 | |
|     uint8_t mysql_server_language = 8;
 | |
|     uint8_t mysql_server_status[2];
 | |
|     uint8_t mysql_scramble_len = 21;
 | |
|     uint8_t mysql_filler_ten[10] = {};
 | |
|     /* uint8_t mysql_last_byte = 0x00; not needed */
 | |
|     char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = "";
 | |
|     char *version_string;
 | |
|     int len_version_string = 0;
 | |
| 
 | |
|     bool is_maria = false;
 | |
| 
 | |
|     if (dcb->service->dbref)
 | |
|     {
 | |
|         mysql_server_language = dcb->service->dbref->server->charset;
 | |
| 
 | |
|         if (strstr(dcb->service->dbref->server->version_string, "10.2."))
 | |
|         {
 | |
|             /** The backend servers support the extended capabilities */
 | |
|             is_maria = true;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
 | |
|     GWBUF *buf;
 | |
| 
 | |
|     /* get the version string from service property if available*/
 | |
|     if (dcb->service->version_string[0])
 | |
|     {
 | |
|         version_string = dcb->service->version_string;
 | |
|         len_version_string = strlen(version_string);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         version_string = (char*)GW_MYSQL_VERSION;
 | |
|         len_version_string = strlen(GW_MYSQL_VERSION);
 | |
|     }
 | |
| 
 | |
|     gw_generate_random_str(server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
 | |
| 
 | |
|     // copy back to the caller
 | |
|     memcpy(protocol->scramble, server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
 | |
| 
 | |
|     if (is_maria)
 | |
|     {
 | |
|         /**
 | |
|          * The new 10.2 capability flags are stored in the last 4 bytes of the
 | |
|          * 10 byte filler block.
 | |
|          */
 | |
|         uint32_t new_flags = MXS_MARIA_CAP_STMT_BULK_OPERATIONS;
 | |
|         memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags));
 | |
|     }
 | |
| 
 | |
|     // Get the equivalent of the server thread id.
 | |
|     protocol->thread_id = session_get_next_id();
 | |
|     // Send only the low 32bits in the handshake.
 | |
|     gw_mysql_set_byte4(mysql_thread_id_num, (uint32_t)(protocol->thread_id));
 | |
|     memcpy(mysql_scramble_buf, server_scramble, 8);
 | |
| 
 | |
|     memcpy(mysql_plugin_data, server_scramble + 8, 12);
 | |
| 
 | |
|     /**
 | |
|      * Use the default authentication plugin name in the initial handshake. If the
 | |
|      * authenticator needs to change the authentication method, it should send
 | |
|      * an AuthSwitchRequest packet to the client.
 | |
|      */
 | |
|     const char* plugin_name = DEFAULT_MYSQL_AUTH_PLUGIN;
 | |
|     int plugin_name_len = strlen(plugin_name);
 | |
| 
 | |
|     mysql_payload_size =
 | |
|         sizeof(mysql_protocol_version) + (len_version_string + 1) + sizeof(mysql_thread_id_num) + 8 +
 | |
|         sizeof(/* mysql_filler */ uint8_t) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) +
 | |
|         sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) +
 | |
|         sizeof(mysql_filler_ten) + 12 + sizeof(/* mysql_last_byte */ uint8_t) + plugin_name_len +
 | |
|         sizeof(/* mysql_last_byte */ uint8_t);
 | |
| 
 | |
|     // allocate memory for packet header + payload
 | |
|     if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL)
 | |
|     {
 | |
|         ss_dassert(buf != NULL);
 | |
|         return 0;
 | |
|     }
 | |
|     outbuf = GWBUF_DATA(buf);
 | |
| 
 | |
|     // write packet header with mysql_payload_size
 | |
|     gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
 | |
| 
 | |
|     // write packet number, now is 0
 | |
|     mysql_packet_header[3] = mysql_packet_id;
 | |
|     memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
 | |
| 
 | |
|     // current buffer pointer
 | |
|     mysql_handshake_payload = outbuf + sizeof(mysql_packet_header);
 | |
| 
 | |
|     // write protocol version
 | |
|     memcpy(mysql_handshake_payload, &mysql_protocol_version, sizeof(mysql_protocol_version));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_protocol_version);
 | |
| 
 | |
|     // write server version plus 0 filler
 | |
|     strcpy((char *)mysql_handshake_payload, version_string);
 | |
|     mysql_handshake_payload = mysql_handshake_payload + len_version_string;
 | |
| 
 | |
|     *mysql_handshake_payload = 0x00;
 | |
| 
 | |
|     mysql_handshake_payload++;
 | |
| 
 | |
|     // write thread id
 | |
|     memcpy(mysql_handshake_payload, mysql_thread_id_num, sizeof(mysql_thread_id_num));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_thread_id_num);
 | |
| 
 | |
|     // write scramble buf
 | |
|     memcpy(mysql_handshake_payload, mysql_scramble_buf, 8);
 | |
|     mysql_handshake_payload = mysql_handshake_payload + 8;
 | |
|     *mysql_handshake_payload = GW_MYSQL_HANDSHAKE_FILLER;
 | |
|     mysql_handshake_payload++;
 | |
| 
 | |
|     // write server capabilities part one
 | |
|     mysql_server_capabilities_one[0] = (uint8_t)GW_MYSQL_CAPABILITIES_SERVER;
 | |
|     mysql_server_capabilities_one[1] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 8);
 | |
| 
 | |
|     // Check that we match the old values
 | |
|     ss_dassert(mysql_server_capabilities_one[0] = 0xff);
 | |
|     ss_dassert(mysql_server_capabilities_one[1] = 0xf7);
 | |
| 
 | |
|     if (is_maria)
 | |
|     {
 | |
|         /** A MariaDB 10.2 server doesn't send the CLIENT_MYSQL capability
 | |
|          * to signal that it supports extended capabilities */
 | |
|         mysql_server_capabilities_one[0] &= ~(uint8_t)GW_MYSQL_CAPABILITIES_CLIENT_MYSQL;
 | |
|     }
 | |
| 
 | |
|     if (ssl_required_by_dcb(dcb))
 | |
|     {
 | |
|         mysql_server_capabilities_one[1] |= (int)GW_MYSQL_CAPABILITIES_SSL >> 8;
 | |
|     }
 | |
| 
 | |
|     memcpy(mysql_handshake_payload, mysql_server_capabilities_one, sizeof(mysql_server_capabilities_one));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_capabilities_one);
 | |
| 
 | |
|     // write server language
 | |
|     memcpy(mysql_handshake_payload, &mysql_server_language, sizeof(mysql_server_language));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_language);
 | |
| 
 | |
|     //write server status
 | |
|     mysql_server_status[0] = 2;
 | |
|     mysql_server_status[1] = 0;
 | |
|     memcpy(mysql_handshake_payload, mysql_server_status, sizeof(mysql_server_status));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_status);
 | |
| 
 | |
|     //write server capabilities part two
 | |
|     mysql_server_capabilities_two[0] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 16);
 | |
|     mysql_server_capabilities_two[1] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 24);
 | |
| 
 | |
|     // Check that we match the old values
 | |
|     ss_dassert(mysql_server_capabilities_two[0] == 15);
 | |
|     /** NOTE: pre-2.1 versions sent the fourth byte of the capabilities as
 | |
|      the value 128 even though there's no such capability. */
 | |
| 
 | |
|     memcpy(mysql_handshake_payload, mysql_server_capabilities_two, sizeof(mysql_server_capabilities_two));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_capabilities_two);
 | |
| 
 | |
|     // write scramble_len
 | |
|     memcpy(mysql_handshake_payload, &mysql_scramble_len, sizeof(mysql_scramble_len));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_scramble_len);
 | |
| 
 | |
|     //write 10 filler
 | |
|     memcpy(mysql_handshake_payload, mysql_filler_ten, sizeof(mysql_filler_ten));
 | |
|     mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_filler_ten);
 | |
| 
 | |
|     // write plugin data
 | |
|     memcpy(mysql_handshake_payload, mysql_plugin_data, 12);
 | |
|     mysql_handshake_payload = mysql_handshake_payload + 12;
 | |
| 
 | |
|     //write last byte, 0
 | |
|     *mysql_handshake_payload = 0x00;
 | |
|     mysql_handshake_payload++;
 | |
| 
 | |
|     // to be understanded ????
 | |
|     memcpy(mysql_handshake_payload, plugin_name, plugin_name_len);
 | |
|     mysql_handshake_payload = mysql_handshake_payload + plugin_name_len;
 | |
| 
 | |
|     //write last byte, 0
 | |
|     *mysql_handshake_payload = 0x00;
 | |
| 
 | |
|     // writing data in the Client buffer queue
 | |
|     dcb->func.write(dcb, buf);
 | |
| 
 | |
|     return sizeof(mysql_packet_header) + mysql_payload_size;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Write function for client DCB: writes data from MaxScale to Client
 | |
|  *
 | |
|  * @param dcb   The DCB of the client
 | |
|  * @param queue Queue of buffers to write
 | |
|  */
 | |
| int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
 | |
| {
 | |
|     if (GWBUF_IS_REPLY_OK(queue) && dcb->service->session_track_trx_state)
 | |
|     {
 | |
|         parse_and_set_trx_state(dcb->session, queue);
 | |
|     }
 | |
|     return dcb_write(dcb, queue);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Client read event triggered by EPOLLIN
 | |
|  *
 | |
|  * @param dcb   Descriptor control block
 | |
|  * @return 0 if succeed, 1 otherwise
 | |
|  */
 | |
| int gw_read_client_event(DCB* dcb)
 | |
| {
 | |
|     MySQLProtocol *protocol;
 | |
|     GWBUF *read_buffer = NULL;
 | |
|     int return_code = 0;
 | |
|     uint32_t nbytes_read = 0;
 | |
|     uint32_t max_bytes = 0;
 | |
| 
 | |
|     CHK_DCB(dcb);
 | |
|     if (dcb->dcb_role != DCB_ROLE_CLIENT_HANDLER)
 | |
|     {
 | |
|         MXS_ERROR("DCB must be a client handler for MySQL client protocol.");
 | |
|         return 1;
 | |
|     }
 | |
| 
 | |
|     protocol = (MySQLProtocol *)dcb->protocol;
 | |
|     CHK_PROTOCOL(protocol);
 | |
| 
 | |
|     MXS_DEBUG("Protocol state: %s", gw_mysql_protocol_state2string(protocol->protocol_auth_state));
 | |
| 
 | |
|     /**
 | |
|      * The use of max_bytes seems like a hack, but no better option is available
 | |
|      * at the time of writing. When a MySQL server receives a new connection
 | |
|      * request, it sends an Initial Handshake Packet. Where the client wants to
 | |
|      * use SSL, it responds with an SSL Request Packet (in place of a Handshake
 | |
|      * Response Packet). The SSL Request Packet contains only the basic header,
 | |
|      * and not the user credentials. It is 36 bytes long.  The server then
 | |
|      * initiates the SSL handshake (via calls to OpenSSL).
 | |
|      *
 | |
|      * In many cases, this is what happens. But occasionally, the client seems
 | |
|      * to send a packet much larger than 36 bytes (in tests it was 333 bytes).
 | |
|      * If the whole of the packet is read, it is then lost to the SSL handshake
 | |
|      * process. Why this happens is presently unknown. Reading just 36 bytes
 | |
|      * when the server requires SSL and SSL has not yet been negotiated seems
 | |
|      * to solve the problem.
 | |
|      *
 | |
|      * If a neater solution can be found, so much the better.
 | |
|      */
 | |
|     if (ssl_required_but_not_negotiated(dcb))
 | |
|     {
 | |
|         max_bytes = 36;
 | |
|     }
 | |
|     return_code = dcb_read(dcb, &read_buffer, max_bytes);
 | |
|     if (return_code < 0)
 | |
|     {
 | |
|         dcb_close(dcb);
 | |
|     }
 | |
|     if (0 == (nbytes_read = gwbuf_length(read_buffer)))
 | |
|     {
 | |
|         return return_code;
 | |
|     }
 | |
| 
 | |
|     return_code = 0;
 | |
| 
 | |
|     switch (protocol->protocol_auth_state)
 | |
|     {
 | |
|     /**
 | |
|      *
 | |
|      * When a listener receives a new connection request, it creates a
 | |
|      * request handler DCB to for the client connection. The listener also
 | |
|      * sends the initial authentication request to the client. The first
 | |
|      * time this function is called from the poll loop, the client reply
 | |
|      * to the authentication request should be available.
 | |
|      *
 | |
|      * If the authentication is successful the protocol authentication state
 | |
|      * will be changed to MYSQL_IDLE (see below).
 | |
|      *
 | |
|      */
 | |
|     case MXS_AUTH_STATE_MESSAGE_READ:
 | |
|         if (nbytes_read < 3 ||
 | |
|             (0 == max_bytes && nbytes_read < MYSQL_GET_PACKET_LEN(read_buffer)) ||
 | |
|             (0 != max_bytes && nbytes_read < max_bytes))
 | |
|         {
 | |
|             dcb_readq_append(dcb, read_buffer);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             if (nbytes_read > MYSQL_GET_PACKET_LEN(read_buffer))
 | |
|             {
 | |
|                 // We read more data than was needed
 | |
|                 dcb_readq_append(dcb, read_buffer);
 | |
|                 read_buffer = modutil_get_next_MySQL_packet(&dcb->readq);
 | |
|             }
 | |
| 
 | |
|             return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     /**
 | |
|      *
 | |
|      * Once a client connection is authenticated, the protocol authentication
 | |
|      * state will be MYSQL_IDLE and so every event of data received will
 | |
|      * result in a call that comes to this section of code.
 | |
|      *
 | |
|      */
 | |
|     case MXS_AUTH_STATE_COMPLETE:
 | |
|         /* After this call read_buffer will point to freed data */
 | |
|         return_code = gw_read_normal_data(dcb, read_buffer, nbytes_read);
 | |
|         break;
 | |
| 
 | |
|     case MXS_AUTH_STATE_FAILED:
 | |
|         gwbuf_free(read_buffer);
 | |
|         return_code = 1;
 | |
|         break;
 | |
| 
 | |
|     default:
 | |
|         MXS_ERROR("In mysql_client.c unexpected protocol authentication state");
 | |
|         break;
 | |
|     }
 | |
| 
 | |
|     return return_code;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Store client connection information into the DCB
 | |
|  * @param dcb Client DCB
 | |
|  * @param buffer Buffer containing the handshake response packet
 | |
|  */
 | |
| static void store_client_information(DCB *dcb, GWBUF *buffer)
 | |
| {
 | |
|     size_t len = gwbuf_length(buffer);
 | |
|     uint8_t data[len];
 | |
|     MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
|     MYSQL_session *ses = (MYSQL_session*)dcb->data;
 | |
| 
 | |
|     gwbuf_copy_data(buffer, 0, len, data);
 | |
|     ss_dassert(MYSQL_GET_PAYLOAD_LEN(data) + MYSQL_HEADER_LEN == len ||
 | |
|                len == MYSQL_AUTH_PACKET_BASE_SIZE); // For SSL request packet
 | |
| 
 | |
|     proto->client_capabilities = gw_mysql_get_byte4(data + MYSQL_CLIENT_CAP_OFFSET);
 | |
|     proto->charset = data[MYSQL_CHARSET_OFFSET];
 | |
| 
 | |
|     /** MariaDB 10.2 compatible clients don't set the first bit to signal that
 | |
|      * there are extra capabilities stored in the last 4 bytes of the 23 byte filler. */
 | |
|     if ((proto->client_capabilities & GW_MYSQL_CAPABILITIES_CLIENT_MYSQL) == 0)
 | |
|     {
 | |
|         proto->extra_capabilities = gw_mysql_get_byte4(data + MARIADB_CAP_OFFSET);
 | |
|     }
 | |
| 
 | |
|     if (len > MYSQL_AUTH_PACKET_BASE_SIZE)
 | |
|     {
 | |
|         strcpy(ses->user, (char*)data + MYSQL_AUTH_PACKET_BASE_SIZE);
 | |
| 
 | |
|         if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB)
 | |
|         {
 | |
|             /** Client supports default database on connect */
 | |
|             size_t userlen = strlen(ses->user) + 1;
 | |
| 
 | |
|             /** Skip the authentication token, it is handled by the authenticators */
 | |
|             uint8_t authlen = data[MYSQL_AUTH_PACKET_BASE_SIZE + userlen];
 | |
| 
 | |
|             size_t dboffset = MYSQL_AUTH_PACKET_BASE_SIZE + userlen + authlen + 1;
 | |
| 
 | |
|             if (data[dboffset])
 | |
|             {
 | |
|                 /** Client is connecting with a default database */
 | |
|                 strcpy(ses->db, (char*)data + dboffset);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Debug check function for authentication packets
 | |
|  *
 | |
|  * Check that the packet is consistent with how the protocol works and that no
 | |
|  * unexpected data is processed.
 | |
|  *
 | |
|  * @param dcb Client DCB
 | |
|  * @param buf Buffer containing packet
 | |
|  * @param bytes Number of bytes available
 | |
|  */
 | |
| static void check_packet(DCB *dcb, GWBUF *buf, int bytes)
 | |
| {
 | |
|     uint8_t hdr[MYSQL_HEADER_LEN];
 | |
|     ss_dassert(gwbuf_copy_data(buf, 0, MYSQL_HEADER_LEN, hdr) == MYSQL_HEADER_LEN);
 | |
| 
 | |
|     int buflen = gwbuf_length(buf);
 | |
|     int pktlen = MYSQL_GET_PAYLOAD_LEN(hdr) + MYSQL_HEADER_LEN;
 | |
| 
 | |
|     if (bytes == MYSQL_AUTH_PACKET_BASE_SIZE)
 | |
|     {
 | |
|         /** This is an SSL request packet */
 | |
|         ss_dassert(dcb->listener->ssl);
 | |
|         ss_dassert(buflen == bytes && pktlen >= buflen);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /** Normal packet */
 | |
|         ss_dassert(buflen == pktlen);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Client read event, process when client not yet authenticated
 | |
|  *
 | |
|  * @param dcb           Descriptor control block
 | |
|  * @param read_buffer   A buffer containing the data read from client
 | |
|  * @param nbytes_read   The number of bytes of data read
 | |
|  * @return 0 if succeed, 1 otherwise
 | |
|  */
 | |
| static int
 | |
| gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
 | |
| {
 | |
|     ss_debug(check_packet(dcb, read_buffer, nbytes_read));
 | |
| 
 | |
|     /** Allocate the shared session structure */
 | |
|     if (dcb->data == NULL && (dcb->data = mysql_session_alloc()) == NULL)
 | |
|     {
 | |
|         dcb_close(dcb);
 | |
|         return 1;
 | |
|     }
 | |
| 
 | |
|     /** Read the client's packet sequence and increment that by one */
 | |
|     uint8_t next_sequence;
 | |
|     gwbuf_copy_data(read_buffer, MYSQL_SEQ_OFFSET, 1, &next_sequence);
 | |
| 
 | |
|     if (next_sequence == 1 || (ssl_required_by_dcb(dcb) && next_sequence == 2))
 | |
|     {
 | |
|         /** This is the first response from the client, read the connection
 | |
|          * information and store them in the shared structure. For SSL connections,
 | |
|          * this will be packet number two since the first packet will be the
 | |
|          * Protocol::SSLRequest packet.
 | |
|          *
 | |
|          * @see https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::SSLRequest
 | |
|          */
 | |
|         store_client_information(dcb, read_buffer);
 | |
|     }
 | |
| 
 | |
|     next_sequence++;
 | |
| 
 | |
|     /**
 | |
|      * The first step in the authentication process is to extract the
 | |
|      * relevant information from the buffer supplied and place it
 | |
|      * into a data structure pointed to by the DCB.  The "success"
 | |
|      * result is not final, it implies only that the process is so
 | |
|      * far successful, not that authentication has completed.  If the
 | |
|      * data extraction succeeds, then a call is made to the actual
 | |
|      * authenticate function to carry out the user checks.
 | |
|      */
 | |
|     int auth_val = MXS_AUTH_FAILED;
 | |
|     if (dcb->authfunc.extract(dcb, read_buffer))
 | |
|     {
 | |
|         auth_val = dcb->authfunc.authenticate(dcb);
 | |
|     }
 | |
| 
 | |
|     MySQLProtocol *protocol = (MySQLProtocol *)dcb->protocol;
 | |
| 
 | |
|     /**
 | |
|      * At this point, if the auth_val return code indicates success
 | |
|      * the user authentication has been successfully completed.
 | |
|      * But in order to have a working connection, a session has to
 | |
|      * be created.  Provided that is also successful (indicated by a
 | |
|      * non-null session) then the whole process has succeeded. In all
 | |
|      * other cases an error return is made.
 | |
|      */
 | |
|     if (MXS_AUTH_SUCCEEDED == auth_val)
 | |
|     {
 | |
|         if (dcb->user == NULL)
 | |
|         {
 | |
|             /** User authentication complete, copy the username to the DCB */
 | |
|             MYSQL_session *ses = (MYSQL_session*)dcb->data;
 | |
|             if ((dcb->user = MXS_STRDUP(ses->user)) == NULL)
 | |
|             {
 | |
|                 dcb_close(dcb);
 | |
|                 gwbuf_free(read_buffer);
 | |
|                 return 0;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         protocol->protocol_auth_state = MXS_AUTH_STATE_RESPONSE_SENT;
 | |
|         /**
 | |
|          * Create session, and a router session for it.
 | |
|          * If successful, there will be backend connection(s)
 | |
|          * after this point. The protocol authentication state
 | |
|          * is changed so that future data will go through the
 | |
|          * normal data handling function instead of this one.
 | |
|          */
 | |
|         MXS_SESSION *session =
 | |
|             session_alloc_with_id(dcb->service, dcb, protocol->thread_id);
 | |
| 
 | |
|         if (session != NULL)
 | |
|         {
 | |
|             CHK_SESSION(session);
 | |
|             ss_dassert(session->state != SESSION_STATE_ALLOC &&
 | |
|                        session->state != SESSION_STATE_DUMMY);
 | |
|             // For the time being only the sql_mode is stored in MXS_SESSION::client_protocol_data.
 | |
|             session->client_protocol_data = QC_SQL_MODE_DEFAULT;
 | |
|             protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE;
 | |
|             ss_debug(bool check = ) mxs_worker_register_session(session);
 | |
|             ss_dassert(check);
 | |
|             mxs_mysql_send_ok(dcb, next_sequence, 0, NULL);
 | |
| 
 | |
|             if (dcb->readq)
 | |
|             {
 | |
|                 // The user has already send more data, process it
 | |
|                 poll_fake_read_event(dcb);
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             auth_val = MXS_AUTH_NO_SESSION;
 | |
|         }
 | |
|     }
 | |
|     /**
 | |
|      * If we did not get success throughout or authentication is not yet complete,
 | |
|      * then the protocol state is updated, the client is notified of the failure
 | |
|      * and the DCB is closed.
 | |
|      */
 | |
|     if (MXS_AUTH_SUCCEEDED != auth_val &&
 | |
|         MXS_AUTH_INCOMPLETE != auth_val &&
 | |
|         MXS_AUTH_SSL_INCOMPLETE != auth_val)
 | |
|     {
 | |
|         protocol->protocol_auth_state = MXS_AUTH_STATE_FAILED;
 | |
|         mysql_client_auth_error_handling(dcb, auth_val, next_sequence);
 | |
|         /**
 | |
|          * Close DCB and which will release MYSQL_session
 | |
|          */
 | |
|         dcb_close(dcb);
 | |
|     }
 | |
|     /* One way or another, the buffer is now fully processed */
 | |
|     gwbuf_free(read_buffer);
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Helper function to split and store the buffer
 | |
|  * @param client_dcb Client DCB
 | |
|  * @param queue Buffer to split
 | |
|  * @param offset Offset where the split is made
 | |
|  * @return The first part of the buffer
 | |
|  */
 | |
| static GWBUF* split_and_store(DCB *client_dcb, GWBUF* queue, int offset)
 | |
| {
 | |
|     GWBUF* newbuf = gwbuf_split(&queue, offset);
 | |
|     dcb_readq_append(client_dcb, queue);
 | |
|     return newbuf;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Check if the DCB is idle from the protocol's point of view
 | |
|  *
 | |
|  * This checks if all expected data from the DCB has been read. The values
 | |
|  * prefixed with @c protocol_ should be manipulated by the protocol modules.
 | |
|  *
 | |
|  * @param dcb DCB to check
 | |
|  * @return True if the DCB protocol is not expecting any data
 | |
|  */
 | |
| static bool protocol_is_idle(DCB *dcb)
 | |
| {
 | |
|     return dcb->protocol_bytes_processed == dcb->protocol_packet_length;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Process the commands the client is executing
 | |
|  *
 | |
|  * The data read from the network is not guaranteed to contain a complete MySQL
 | |
|  * packet. This means that it is possible that a command sent by the client is
 | |
|  * split across multiple network packets and those packets need to be processed
 | |
|  * individually.
 | |
|  *
 | |
|  * The forwarding of the data to the routers starts once the length and command
 | |
|  * bytes have been read. The @c current_command field of the protocol
 | |
|  * structure is guaranteed to always represent the current command being executed
 | |
|  * by the client.
 | |
|  *
 | |
|  * Currently the gathered information is used by the readconnroute module to
 | |
|  * detect COM_CHANGE_USER packets.
 | |
|  *
 | |
|  * @param dcb Client MySQL protocol struct
 | |
|  * @param bytes_available Number of bytes available
 | |
|  * @param queue Data written by the client
 | |
|  * @return True if routing can proceed, false if processing should be attempted
 | |
|  * later when more data is available
 | |
|  */
 | |
| static bool process_client_commands(DCB* dcb, int bytes_available, GWBUF** buffer)
 | |
| {
 | |
|     GWBUF* queue = *buffer;
 | |
| 
 | |
|     /** Make sure we have enough data if the client is sending a new command */
 | |
|     if (protocol_is_idle(dcb) && bytes_available < MYSQL_HEADER_LEN)
 | |
|     {
 | |
|         dcb_readq_append(dcb, queue);
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     int offset = 0;
 | |
| 
 | |
|     while (bytes_available)
 | |
|     {
 | |
|         if (protocol_is_idle(dcb))
 | |
|         {
 | |
|             int pktlen;
 | |
|             uint8_t cmd = (uint8_t)MXS_COM_QUERY; // Treat empty packets as COM_QUERY
 | |
| 
 | |
|             uint8_t packet_header[MYSQL_HEADER_LEN];
 | |
| 
 | |
|             if (gwbuf_copy_data(queue, offset, MYSQL_HEADER_LEN, packet_header) != MYSQL_HEADER_LEN)
 | |
|             {
 | |
|                 ss_dassert(offset > 0);
 | |
|                 queue = split_and_store(dcb, queue, offset);
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             pktlen = gw_mysql_get_byte3(packet_header);
 | |
| 
 | |
|             /**
 | |
|              * Check if the packet is empty, and if not, if we have the command byte.
 | |
|              * If we an empty packet or have at least 5 bytes of data, we can start
 | |
|              * sending the data to the router.
 | |
|              */
 | |
|             if (pktlen && gwbuf_copy_data(queue, offset + MYSQL_HEADER_LEN, 1, &cmd) != 1)
 | |
|             {
 | |
|                 if ((queue = split_and_store(dcb, queue, offset)) == NULL)
 | |
|                 {
 | |
|                     ss_dassert(bytes_available - offset == MYSQL_HEADER_LEN);
 | |
|                     return false;
 | |
|                 }
 | |
|                 ss_dassert(offset > 0);
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
|             if (dcb->protocol_packet_length - MYSQL_HEADER_LEN != GW_MYSQL_MAX_PACKET_LEN)
 | |
|             {
 | |
|                 /** We're processing the first packet of a command */
 | |
|                 proto->current_command = (mxs_mysql_cmd_t)cmd;
 | |
|             }
 | |
| 
 | |
|             dcb->protocol_packet_length = pktlen + MYSQL_HEADER_LEN;
 | |
|             dcb->protocol_bytes_processed = 0;
 | |
|         }
 | |
| 
 | |
|         int bytes_needed = dcb->protocol_packet_length - dcb->protocol_bytes_processed;
 | |
|         int packet_bytes = bytes_needed <= bytes_available ? bytes_needed : bytes_available;
 | |
| 
 | |
|         bytes_available -= packet_bytes;
 | |
|         dcb->protocol_bytes_processed += packet_bytes;
 | |
|         offset += packet_bytes;
 | |
|         ss_dassert(dcb->protocol_bytes_processed <= dcb->protocol_packet_length);
 | |
|     }
 | |
| 
 | |
|     ss_dassert(bytes_available >= 0);
 | |
|     ss_dassert(queue);
 | |
|     *buffer = queue;
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Sets the query classifier mode.
 | |
|  *
 | |
|  * @param session      The session for which the query classifier mode is adjusted.
 | |
|  * @param read_buffer  Pointer to a buffer, assumed to contain a statement.
 | |
|  *                     May be reallocated if not contiguous.
 | |
|  */
 | |
| void set_qc_mode(MXS_SESSION* session, GWBUF** read_buffer)
 | |
| {
 | |
|     SetSqlModeParser parser;
 | |
|     SetSqlModeParser::sql_mode_t sql_mode;
 | |
| 
 | |
|     switch (parser.get_sql_mode(read_buffer, &sql_mode))
 | |
|     {
 | |
|     case SetSqlModeParser::ERROR:
 | |
|         // In practice only OOM.
 | |
|         break;
 | |
| 
 | |
|     case SetSqlModeParser::IS_SET_SQL_MODE:
 | |
|         switch (sql_mode)
 | |
|         {
 | |
|         case SetSqlModeParser::ORACLE:
 | |
|             session_set_autocommit(session, false);
 | |
|             session->client_protocol_data = QC_SQL_MODE_ORACLE;
 | |
|             break;
 | |
| 
 | |
|         case SetSqlModeParser::DEFAULT:
 | |
|             session_set_autocommit(session, true);
 | |
|             session->client_protocol_data = QC_SQL_MODE_DEFAULT;
 | |
|             break;
 | |
| 
 | |
|         case SetSqlModeParser::SOMETHING:
 | |
|             break;
 | |
| 
 | |
|         default:
 | |
|             ss_dassert(!true);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case SetSqlModeParser::NOT_SET_SQL_MODE:
 | |
|         break;
 | |
| 
 | |
|     default:
 | |
|         ss_dassert(!true);
 | |
|     }
 | |
| 
 | |
|     qc_set_sql_mode(static_cast<qc_sql_mode_t>(session->client_protocol_data));
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Client read event, process data, client already authenticated
 | |
|  *
 | |
|  * First do some checks and get the router capabilities.  If the router
 | |
|  * wants to process each individual statement, then the data must be split
 | |
|  * into individual SQL statements. Any data that is left over is held in the
 | |
|  * DCB read queue.
 | |
|  *
 | |
|  * Finally, the general client data processing function is called.
 | |
|  *
 | |
|  * @param dcb           Descriptor control block
 | |
|  * @param read_buffer   A buffer containing the data read from client
 | |
|  * @param nbytes_read   The number of bytes of data read
 | |
|  * @return 0 if succeed, 1 otherwise
 | |
|  */
 | |
| static int
 | |
| gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
 | |
| {
 | |
|     MXS_SESSION *session;
 | |
|     mxs_session_state_t session_state_value;
 | |
|     uint64_t capabilities = 0;
 | |
| 
 | |
|     session = dcb->session;
 | |
|     CHK_SESSION(session);
 | |
|     session_state_value = session->state;
 | |
|     if (session_state_value != SESSION_STATE_ROUTER_READY)
 | |
|     {
 | |
|         if (session_state_value != SESSION_STATE_STOPPING)
 | |
|         {
 | |
|             MXS_ERROR("Session received a query in incorrect state %s",
 | |
|                       STRSESSIONSTATE(session_state_value));
 | |
|         }
 | |
|         gwbuf_free(read_buffer);
 | |
|         dcb_close(dcb);
 | |
|         return 1;
 | |
|     }
 | |
| 
 | |
|     /** Ask what type of input the router/filter chain expects */
 | |
|     capabilities = service_get_capabilities(session->service);
 | |
| 
 | |
|     /** If the router requires statement input we need to make sure that
 | |
|      * a complete SQL packet is read before continuing. The current command
 | |
|      * that is tracked by the protocol module is updated in route_by_statement() */
 | |
|     if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
 | |
|     {
 | |
|         uint8_t pktlen[MYSQL_HEADER_LEN];
 | |
|         size_t n_copied = gwbuf_copy_data(read_buffer, 0, MYSQL_HEADER_LEN, pktlen);
 | |
| 
 | |
|         if (n_copied != sizeof(pktlen) ||
 | |
|             (uint32_t)nbytes_read < MYSQL_GET_PAYLOAD_LEN(pktlen) + MYSQL_HEADER_LEN)
 | |
|         {
 | |
|             dcb_readq_append(dcb, read_buffer);
 | |
|             return 0;
 | |
|         }
 | |
| 
 | |
|         set_qc_mode(session, &read_buffer);
 | |
|     }
 | |
|     /** Update the current protocol command being executed */
 | |
|     else if (!process_client_commands(dcb, nbytes_read, &read_buffer))
 | |
|     {
 | |
|         return 0;
 | |
|     }
 | |
| 
 | |
|     /** The query classifier classifies according to the service's server that has
 | |
|      * the smallest version number. */
 | |
|     qc_set_server_version(service_get_version(session->service, SERVICE_VERSION_MIN));
 | |
| 
 | |
|     spec_com_res_t res = process_special_commands(dcb, read_buffer, nbytes_read);
 | |
|     int rval = 1;
 | |
|     switch (res)
 | |
|     {
 | |
|     case RES_MORE_DATA:
 | |
|         dcb_readq_set(dcb, read_buffer);
 | |
|         rval = 0;
 | |
|         break;
 | |
| 
 | |
|     case RES_END:
 | |
|         // Do not send this packet for routing
 | |
|         gwbuf_free(read_buffer);
 | |
|         rval = 0;
 | |
|         break;
 | |
| 
 | |
|     case RES_CONTINUE:
 | |
|         rval = gw_read_finish_processing(dcb, read_buffer, capabilities);
 | |
|         break;
 | |
| 
 | |
|     default:
 | |
|         ss_dassert(!true);
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check if a connection qualifies to be added into the persistent connection pool
 | |
|  *
 | |
|  * @param dcb The client DCB to check
 | |
|  */
 | |
| void check_pool_candidate(DCB* dcb)
 | |
| {
 | |
|     MXS_SESSION *session = dcb->session;
 | |
|     MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
| 
 | |
|     if (proto->current_command == MXS_COM_QUIT)
 | |
|     {
 | |
|         /** The client is closing the connection. We know that this will be the
 | |
|          * last command the client sends so the backend connections are very likely
 | |
|          * to be in an idle state.
 | |
|          *
 | |
|          * If the client is pipelining the queries (i.e. sending N request as
 | |
|          * a batch and then expecting N responses) then it is possible that
 | |
|          * the backend connections are not idle when the COM_QUIT is received.
 | |
|          * In most cases we can assume that the connections are idle. */
 | |
|         session_qualify_for_pool(session);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Client read event, common processing after single statement handling
 | |
|  *
 | |
|  * @param dcb           Descriptor control block
 | |
|  * @param read_buffer   A buffer containing the data read from client
 | |
|  * @param capabilities  The router capabilities flags
 | |
|  * @return 0 if succeed, 1 otherwise
 | |
|  */
 | |
| static int
 | |
| gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities)
 | |
| {
 | |
|     MXS_SESSION *session = dcb->session;
 | |
|     uint8_t *payload = GWBUF_DATA(read_buffer);
 | |
|     MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
|     CHK_PROTOCOL(proto);
 | |
|     int return_code = 0;
 | |
| 
 | |
|     /** Reset error handler when routing of the new query begins */
 | |
|     dcb->dcb_errhandle_called = false;
 | |
| 
 | |
|     if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
 | |
|     {
 | |
|         /**
 | |
|          * Feed each statement completely and separately to router.
 | |
|          */
 | |
|         return_code = route_by_statement(session, capabilities, &read_buffer) ? 0 : 1;
 | |
| 
 | |
|         if (read_buffer != NULL)
 | |
|         {
 | |
|             /* Must have been data left over */
 | |
|             /* Add incomplete mysql packet to read queue */
 | |
| 
 | |
|             dcb_readq_append(dcb, read_buffer);
 | |
| 
 | |
|         }
 | |
|     }
 | |
|     else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))
 | |
|     {
 | |
|         /** Check if this connection qualifies for the connection pool */
 | |
|         check_pool_candidate(dcb);
 | |
| 
 | |
|         /** Feed the whole buffer to the router */
 | |
|         return_code = MXS_SESSION_ROUTE_QUERY(session, read_buffer) ? 0 : 1;
 | |
|     }
 | |
|     /* else return_code is still 0 from when it was originally set */
 | |
|     /* Note that read_buffer has been freed or transferred by this point */
 | |
| 
 | |
|     if (return_code != 0)
 | |
|     {
 | |
|         /** Routing failed, close the client connection */
 | |
|         dcb_close(dcb);
 | |
|         MXS_ERROR("Routing the query failed. Session will be closed.");
 | |
|     }
 | |
|     else if (proto->current_command == MXS_COM_QUIT)
 | |
|     {
 | |
|         /** Close router session which causes closing of backends */
 | |
|         dcb_close(dcb);
 | |
|     }
 | |
| 
 | |
|     return return_code;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Analyse authentication errors and write appropriate log messages
 | |
|  *
 | |
|  * @param dcb Request handler DCB connected to the client
 | |
|  * @param auth_val The type of authentication failure
 | |
|  * @note Authentication status codes are defined in maxscale/protocol/mysql.h
 | |
|  */
 | |
| static void
 | |
| mysql_client_auth_error_handling(DCB *dcb, int auth_val, int packet_number)
 | |
| {
 | |
|     int message_len;
 | |
|     char *fail_str = NULL;
 | |
|     MYSQL_session *session = (MYSQL_session*)dcb->data;
 | |
| 
 | |
|     switch (auth_val)
 | |
|     {
 | |
|     case MXS_AUTH_NO_SESSION:
 | |
|         MXS_DEBUG("session creation failed. fd %d, state = MYSQL_AUTH_NO_SESSION.", dcb->fd);
 | |
| 
 | |
|         /** Send ERR 1045 to client */
 | |
|         mysql_send_auth_error(dcb, packet_number, 0, "failed to create new session");
 | |
|         break;
 | |
| 
 | |
|     case MXS_AUTH_FAILED_DB:
 | |
|         MXS_DEBUG("database specified was not valid. fd %d, state = MYSQL_FAILED_AUTH_DB.", dcb->fd);
 | |
|         /** Send error 1049 to client */
 | |
|         message_len = 25 + MYSQL_DATABASE_MAXLEN;
 | |
| 
 | |
|         fail_str = (char*)MXS_CALLOC(1, message_len + 1);
 | |
|         MXS_ABORT_IF_NULL(fail_str);
 | |
|         snprintf(fail_str, message_len, "Unknown database '%s'", session->db);
 | |
| 
 | |
|         modutil_send_mysql_err_packet(dcb, packet_number, 0, 1049, "42000", fail_str);
 | |
|         break;
 | |
| 
 | |
|     case MXS_AUTH_FAILED_SSL:
 | |
|         MXS_DEBUG("client is "
 | |
|                   "not SSL capable for SSL listener. fd %d, "
 | |
|                   "state = MYSQL_FAILED_AUTH_SSL.", dcb->fd);
 | |
| 
 | |
|         /** Send ERR 1045 to client */
 | |
|         mysql_send_auth_error(dcb, packet_number, 0, "Access without SSL denied");
 | |
|         break;
 | |
| 
 | |
|     case MXS_AUTH_SSL_INCOMPLETE:
 | |
|         MXS_DEBUG("unable to complete SSL authentication. fd %d, "
 | |
|                   "state = MYSQL_AUTH_SSL_INCOMPLETE.", dcb->fd);
 | |
| 
 | |
|         /** Send ERR 1045 to client */
 | |
|         mysql_send_auth_error(dcb, packet_number, 0,
 | |
|                               "failed to complete SSL authentication");
 | |
|         break;
 | |
| 
 | |
|     case MXS_AUTH_FAILED:
 | |
|         MXS_DEBUG("authentication failed. fd %d, state = MYSQL_FAILED_AUTH.", dcb->fd);
 | |
|         /** Send error 1045 to client */
 | |
|         fail_str = create_auth_fail_str(session->user, dcb->remote,
 | |
|                                         session->auth_token_len > 0,
 | |
|                                         session->db, auth_val);
 | |
|         modutil_send_mysql_err_packet(dcb, packet_number, 0, 1045, "28000", fail_str);
 | |
|         break;
 | |
| 
 | |
|     default:
 | |
|         MXS_DEBUG("authentication failed. fd %d, state unrecognized.", dcb->fd);
 | |
|         /** Send error 1045 to client */
 | |
|         fail_str = create_auth_fail_str(session->user, dcb->remote,
 | |
|                                         session->auth_token_len > 0,
 | |
|                                         session->db, auth_val);
 | |
|         modutil_send_mysql_err_packet(dcb, packet_number, 0, 1045, "28000", fail_str);
 | |
|     }
 | |
|     MXS_FREE(fail_str);
 | |
| }
 | |
| 
 | |
| static int
 | |
| gw_connection_limit(DCB *dcb, int limit)
 | |
| {
 | |
|     return mysql_send_standard_error(dcb, 0, 1040, "Too many connections");
 | |
| }
 | |
| ///////////////////////////////////////////////
 | |
| // client write event to Client triggered by EPOLLOUT
 | |
| //////////////////////////////////////////////
 | |
| /**
 | |
|  * @node Client's fd became writable, and EPOLLOUT event
 | |
|  * arrived. As a consequence, client input buffer (writeq) is flushed.
 | |
|  *
 | |
|  * Parameters:
 | |
|  * @param dcb - in, use
 | |
|  *          client dcb
 | |
|  *
 | |
|  * @return constantly 1
 | |
|  *
 | |
|  *
 | |
|  * @details (write detailed description here)
 | |
|  *
 | |
|  */
 | |
| int gw_write_client_event(DCB *dcb)
 | |
| {
 | |
|     MySQLProtocol *protocol = NULL;
 | |
| 
 | |
|     CHK_DCB(dcb);
 | |
| 
 | |
|     ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
 | |
| 
 | |
|     if (dcb == NULL)
 | |
|     {
 | |
|         goto return_1;
 | |
|     }
 | |
| 
 | |
|     if (dcb->state == DCB_STATE_DISCONNECTED)
 | |
|     {
 | |
|         goto return_1;
 | |
|     }
 | |
| 
 | |
|     if (dcb->protocol == NULL)
 | |
|     {
 | |
|         goto return_1;
 | |
|     }
 | |
|     protocol = (MySQLProtocol *)dcb->protocol;
 | |
|     CHK_PROTOCOL(protocol);
 | |
| 
 | |
|     if (protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
 | |
|     {
 | |
|         dcb_drain_writeq(dcb);
 | |
|         goto return_1;
 | |
|     }
 | |
| 
 | |
| return_1:
 | |
| #if defined(SS_DEBUG)
 | |
|     if (dcb->state == DCB_STATE_POLLING ||
 | |
|         dcb->state == DCB_STATE_NOPOLLING)
 | |
|     {
 | |
|         CHK_PROTOCOL(protocol);
 | |
|     }
 | |
| #endif
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Bind the DCB to a network port or a UNIX Domain Socket.
 | |
|  * @param listen_dcb Listener DCB
 | |
|  * @param config_bind Bind address in either IP:PORT format for network sockets or PATH
 | |
|  *                    for UNIX Domain Sockets
 | |
|  * @return 1 on success, 0 on error
 | |
|  */
 | |
| int gw_MySQLListener(DCB *listen_dcb, char *config_bind)
 | |
| {
 | |
|     if (dcb_listen(listen_dcb, config_bind, "MySQL") < 0)
 | |
|     {
 | |
|         return 0;
 | |
|     }
 | |
|     listen_dcb->func.accept = gw_MySQLAccept;
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| 
 | |
| /**
 | |
|  * @node Accept a new connection, using the DCB code for the basic work
 | |
|  *
 | |
|  * For as long as dcb_accept can return new client DCBs for new connections,
 | |
|  * continue to loop. The code will always give a failure return, since it
 | |
|  * continues to try to create new connections until a failure occurs.
 | |
|  *
 | |
|  * @param listener - The Listener DCB that picks up new connection requests
 | |
|  * @return 0 in success, 1 in failure
 | |
|  *
 | |
|  */
 | |
| int gw_MySQLAccept(DCB *listener)
 | |
| {
 | |
|     DCB *client_dcb;
 | |
| 
 | |
|     CHK_DCB(listener);
 | |
| 
 | |
|     if (DCB_STATE_WAITING == listener->state)
 | |
|     {
 | |
|         gw_process_one_new_client(listener);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         while ((client_dcb = dcb_accept(listener)) != NULL)
 | |
|         {
 | |
|             gw_process_one_new_client(client_dcb);
 | |
|         } /**< while client_dcb != NULL */
 | |
|     }
 | |
| 
 | |
|     /* Must have broken out of while loop or received NULL client_dcb */
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| static void gw_process_one_new_client(DCB *client_dcb)
 | |
| {
 | |
|     MySQLProtocol *protocol;
 | |
| 
 | |
|     CHK_DCB(client_dcb);
 | |
|     protocol = mysql_protocol_init(client_dcb, client_dcb->fd);
 | |
| 
 | |
|     if (protocol == NULL)
 | |
|     {
 | |
|         /** delete client_dcb */
 | |
|         dcb_close(client_dcb);
 | |
|         MXS_ERROR("Failed to create protocol object for client connection.");
 | |
|         return;
 | |
|     }
 | |
|     CHK_PROTOCOL(protocol);
 | |
|     client_dcb->protocol = protocol;
 | |
|     if (DCB_STATE_WAITING == client_dcb->state)
 | |
|     {
 | |
|         client_dcb->state = DCB_STATE_ALLOC;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         atomic_add(&client_dcb->service->client_count, 1);
 | |
|     }
 | |
|     //send handshake to the client_dcb
 | |
|     MySQLSendHandshake(client_dcb);
 | |
| 
 | |
|     // client protocol state change
 | |
|     protocol->protocol_auth_state = MXS_AUTH_STATE_MESSAGE_READ;
 | |
| 
 | |
|     /**
 | |
|      * Set new descriptor to event set. At the same time,
 | |
|      * change state to DCB_STATE_POLLING so that
 | |
|      * thread which wakes up sees correct state.
 | |
|      */
 | |
|     if (poll_add_dcb(client_dcb) == -1)
 | |
|     {
 | |
|         /* Send a custom error as MySQL command reply */
 | |
|         mysql_send_custom_error(client_dcb,
 | |
|                                 1,
 | |
|                                 0,
 | |
|                                 "MaxScale encountered system limit while "
 | |
|                                 "attempting to register on an epoll instance.");
 | |
| 
 | |
|         /** close client_dcb */
 | |
|         dcb_close(client_dcb);
 | |
| 
 | |
|         /** Previous state is recovered in poll_add_dcb. */
 | |
|         MXS_ERROR("Failed to add dcb %p for fd %d to epoll set.",
 | |
|                   client_dcb, client_dcb->fd);
 | |
|         return;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_DEBUG("Added dcb %p for fd %d to epoll set.",
 | |
|                   client_dcb, client_dcb->fd);
 | |
|     }
 | |
|     return;
 | |
| }
 | |
| 
 | |
| static int gw_error_client_event(DCB* dcb)
 | |
| {
 | |
|     MXS_SESSION* session;
 | |
| 
 | |
|     CHK_DCB(dcb);
 | |
| 
 | |
|     session = dcb->session;
 | |
| 
 | |
|     if (session != NULL && session->state == SESSION_STATE_STOPPING)
 | |
|     {
 | |
|         goto retblock;
 | |
|     }
 | |
| 
 | |
| #if defined(SS_DEBUG)
 | |
|     MXS_DEBUG("Client error event handling.");
 | |
| #endif
 | |
|     dcb_close(dcb);
 | |
| 
 | |
| retblock:
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| static int gw_client_close(DCB *dcb)
 | |
| {
 | |
|     CHK_DCB(dcb);
 | |
|     ss_dassert(dcb->protocol);
 | |
| 
 | |
|     if (mysql_protocol_done(dcb))
 | |
|     {
 | |
|         MXS_SESSION* target = dcb->session;
 | |
| 
 | |
|         if (target->state != SESSION_STATE_TO_BE_FREED &&
 | |
|             target->state != SESSION_STATE_DUMMY)
 | |
|         {
 | |
|             ss_dassert(target->state == SESSION_STATE_ROUTER_READY ||
 | |
|                        target->state == SESSION_STATE_STOPPING);
 | |
|             ss_debug(bool removed = ) mxs_worker_deregister_session(target->ses_id);
 | |
|             ss_dassert(removed);
 | |
|             session_close(target);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Handle a hangup event on the client side descriptor.
 | |
|  *
 | |
|  * We simply close the DCB, this will propagate the closure to any
 | |
|  * backend descriptors and perform the session cleanup.
 | |
|  *
 | |
|  * @param dcb           The DCB of the connection
 | |
|  */
 | |
| static int gw_client_hangup_event(DCB *dcb)
 | |
| {
 | |
|     CHK_DCB(dcb);
 | |
|     MXS_SESSION* session = dcb->session;
 | |
| 
 | |
|     if (session)
 | |
|     {
 | |
|         CHK_SESSION(session);
 | |
|         if (session->state != SESSION_STATE_DUMMY && !session_valid_for_pool(session))
 | |
|         {
 | |
|             // The client did not send a COM_QUIT packet
 | |
|             modutil_send_mysql_err_packet(dcb, 0, 0, 1927, "08S01", "Connection killed by MaxScale");
 | |
|         }
 | |
|         dcb_close(dcb);
 | |
|     }
 | |
| 
 | |
|     return 1;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Update protocol tracking information for an individual statement
 | |
|  *
 | |
|  * @param dcb    Client DCB
 | |
|  * @param buffer Buffer containing a single packet
 | |
|  */
 | |
| void update_current_command(DCB* dcb, GWBUF* buffer)
 | |
| {
 | |
|     MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
|     uint8_t cmd = (uint8_t)MXS_COM_QUERY;
 | |
| 
 | |
|     /**
 | |
|      * As we are routing individual packets, we can extract the command byte here.
 | |
|      * Empty packets are treated as COM_QUERY packets by default.
 | |
|      */
 | |
|     gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &cmd);
 | |
|     proto->current_command = (mxs_mysql_cmd_t)cmd;
 | |
| 
 | |
|     /**
 | |
|      * Now that we have the current command, we can check if this connection
 | |
|      * can be a candidate for the connection pool.
 | |
|      */
 | |
|     check_pool_candidate(dcb);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Detect if buffer includes partial mysql packet or multiple packets.
 | |
|  * Store partial packet to dcb_readqueue. Send complete packets one by one
 | |
|  * to router.
 | |
|  *
 | |
|  * It is assumed readbuf includes at least one complete packet.
 | |
|  * Return 1 in success. If the last packet is incomplete return success but
 | |
|  * leave incomplete packet to readbuf.
 | |
|  *
 | |
|  * @param session       Session pointer
 | |
|  * @param capabilities  The capabilities of the service.
 | |
|  * @param p_readbuf     Pointer to the address of GWBUF including the query
 | |
|  *
 | |
|  * @return 1 if succeed,
 | |
|  */
 | |
| static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF** p_readbuf)
 | |
| {
 | |
|     int rc;
 | |
|     GWBUF* packetbuf;
 | |
|     do
 | |
|     {
 | |
|         /**
 | |
|          * Collect incoming bytes to a buffer until complete packet has
 | |
|          * arrived and then return the buffer.
 | |
|          */
 | |
|         // TODO: This should be replaced with modutil_get_next_MySQL_packet.
 | |
|         packetbuf = gw_MySQL_get_next_packet(p_readbuf);
 | |
| 
 | |
|         if (packetbuf != NULL)
 | |
|         {
 | |
|             CHK_GWBUF(packetbuf);
 | |
| 
 | |
|             /**
 | |
|              * Update the currently command being executed.
 | |
|              */
 | |
|             update_current_command(session->client_dcb, packetbuf);
 | |
| 
 | |
|             if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_INPUT))
 | |
|             {
 | |
|                 if (!GWBUF_IS_CONTIGUOUS(packetbuf))
 | |
|                 {
 | |
|                     // TODO: As long as gw_MySQL_get_next_packet is used above, the buffer
 | |
|                     // TODO: will be contiguous. That function should be replaced with
 | |
|                     // TODO: modutil_get_next_MySQL_packet.
 | |
|                     GWBUF* tmp = gwbuf_make_contiguous(packetbuf);
 | |
|                     if (tmp)
 | |
|                     {
 | |
|                         packetbuf = tmp;
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         // TODO: A memory allocation failure. We should close the dcb
 | |
|                         // TODO: and terminate the session.
 | |
|                         rc = 0;
 | |
|                         goto return_rc;
 | |
|                     }
 | |
|                 }
 | |
|                 SERVICE *service = session->client_dcb->service;
 | |
|                 if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING) && !service->session_track_trx_state)
 | |
|                 {
 | |
|                     if (session_trx_is_ending(session))
 | |
|                     {
 | |
|                         session_set_trx_state(session, SESSION_TRX_INACTIVE);
 | |
|                     }
 | |
| 
 | |
|                     if (mxs_mysql_get_command(packetbuf) == MXS_COM_QUERY)
 | |
|                     {
 | |
|                         uint32_t type = qc_get_trx_type_mask(packetbuf);
 | |
| 
 | |
|                         if (type & QUERY_TYPE_BEGIN_TRX)
 | |
|                         {
 | |
|                             if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT)
 | |
|                             {
 | |
|                                 session_set_autocommit(session, false);
 | |
|                                 session_set_trx_state(session, SESSION_TRX_INACTIVE);
 | |
|                             }
 | |
|                             else
 | |
|                             {
 | |
|                                 mxs_session_trx_state_t trx_state;
 | |
|                                 if (type & QUERY_TYPE_WRITE)
 | |
|                                 {
 | |
|                                     trx_state = SESSION_TRX_READ_WRITE;
 | |
|                                 }
 | |
|                                 else if (type & QUERY_TYPE_READ)
 | |
|                                 {
 | |
|                                     trx_state = SESSION_TRX_READ_ONLY;
 | |
|                                 }
 | |
|                                 else
 | |
|                                 {
 | |
|                                     trx_state = SESSION_TRX_ACTIVE;
 | |
|                                 }
 | |
| 
 | |
|                                 session_set_trx_state(session, trx_state);
 | |
|                             }
 | |
|                         }
 | |
|                         else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK))
 | |
|                         {
 | |
|                             uint32_t trx_state = session_get_trx_state(session);
 | |
|                             trx_state |= SESSION_TRX_ENDING_BIT;
 | |
|                             session_set_trx_state(session, (mxs_session_trx_state_t)trx_state);
 | |
| 
 | |
|                             if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT)
 | |
|                             {
 | |
|                                 session_set_autocommit(session, true);
 | |
|                             }
 | |
|                         }
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             /** Route query */
 | |
|             rc = MXS_SESSION_ROUTE_QUERY(session, packetbuf);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             rc = 1;
 | |
|             goto return_rc;
 | |
|         }
 | |
|     }
 | |
|     while (rc == 1 && *p_readbuf != NULL);
 | |
| 
 | |
| return_rc:
 | |
|     return rc;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Some SQL commands/queries need to be detected and handled by the protocol
 | |
|  * and MaxScale instead of being routed forward as is.
 | |
|  *
 | |
|  * @param dcb Client dcb
 | |
|  * @param read_buffer the current read buffer
 | |
|  * @param nbytes_read How many bytes were read
 | |
|  * @return see @c spec_com_res_t
 | |
|  */
 | |
| static spec_com_res_t process_special_commands(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
 | |
| {
 | |
|     spec_com_res_t rval = RES_CONTINUE;
 | |
|     bool is_complete = false;
 | |
|     unsigned int packet_len =
 | |
|         MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
 | |
|     if (gwbuf_length(read_buffer) == packet_len)
 | |
|     {
 | |
|         is_complete = true;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
 | |
|      *
 | |
|      * The option is stored as a two byte integer with the values 0 for enabling
 | |
|      * multi-statements and 1 for disabling it.
 | |
|      */
 | |
|     MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
 | |
|     uint8_t opt;
 | |
| 
 | |
|     if (proto->current_command == MXS_COM_SET_OPTION &&
 | |
|         gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
 | |
|     {
 | |
|         if (opt)
 | |
|         {
 | |
|             proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
 | |
|         }
 | |
|     }
 | |
|     /**
 | |
|      * Handle COM_PROCESS_KILL
 | |
|      */
 | |
|     else if (proto->current_command == MXS_COM_PROCESS_KILL)
 | |
|     {
 | |
|         /* Make sure we have a complete SQL packet before trying to read the
 | |
|          * process id. If not, try again next time. */
 | |
|         if (!is_complete)
 | |
|         {
 | |
|             rval = RES_MORE_DATA;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             uint8_t bytes[4];
 | |
|             if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), bytes)
 | |
|                 == sizeof(bytes))
 | |
|             {
 | |
|                 uint64_t process_id = gw_mysql_get_byte4(bytes);
 | |
|                 mxs_mysql_execute_kill(dcb->session, process_id, KT_CONNECTION);
 | |
|                 rval = RES_END;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     else if (proto->current_command == MXS_COM_QUERY)
 | |
|     {
 | |
|         /* Limits on the length of the queries in which "KILL" is searched for. Reducing
 | |
|          * LONGEST_KILL will reduce overhead but also limit the range of accepted queries. */
 | |
|         const int SHORTEST_KILL = sizeof("KILL 1") - 1;
 | |
|         const int LONGEST_KILL = sizeof("KILL CONNECTION 12345678901234567890 ;");
 | |
|         /* Is length within limits for a kill-type query? */
 | |
|         if (packet_len >= (MYSQL_HEADER_LEN + 1 + SHORTEST_KILL) &&
 | |
|             packet_len <= (MYSQL_HEADER_LEN + 1 + LONGEST_KILL))
 | |
|         {
 | |
|             rval = handle_query_kill(dcb, read_buffer, rval, is_complete, packet_len);
 | |
|         }
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Handle text version of KILL [CONNECTION | QUERY] <process_id>. Only detects
 | |
|  * commands in the beginning of the packet and with no comments.
 | |
|  * Increased parsing would slow down the handling of every single query.
 | |
|  *
 | |
|  * @param dcb Client dcb
 | |
|  * @param read_buffer Input buffer
 | |
|  * @param current Latest value of rval in calling function
 | |
|  * @param is_complete Is read_buffer a complete sql packet
 | |
|  * @param packet_len Read from sql header
 | |
|  * @return Updated (or old) value of rval
 | |
|  */
 | |
| spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
 | |
|                                  bool is_complete, unsigned int packet_len)
 | |
| {
 | |
|     spec_com_res_t rval = current;
 | |
|     /* First, we need to detect the text "KILL" (ignorecase) in the start
 | |
|      * of the packet. Copy just enough characters. */
 | |
|     const size_t KILL_BEGIN_LEN = sizeof(WORD_KILL) - 1;
 | |
|     char startbuf[KILL_BEGIN_LEN]; // Not 0-terminated, careful...
 | |
|     size_t copied_len = gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1,
 | |
|                                         KILL_BEGIN_LEN, (uint8_t*)startbuf);
 | |
|     if (is_complete)
 | |
|     {
 | |
|         if (strncasecmp(WORD_KILL, startbuf, KILL_BEGIN_LEN) == 0)
 | |
|         {
 | |
|             /* Good chance that the query is a KILL-query. Copy the entire
 | |
|              * buffer and process. */
 | |
|             size_t buffer_len = packet_len - (MYSQL_HEADER_LEN + 1);
 | |
|             char querybuf[buffer_len + 1]; // 0-terminated
 | |
|             copied_len = gwbuf_copy_data(read_buffer,
 | |
|                                          MYSQL_HEADER_LEN + 1,
 | |
|                                          buffer_len,
 | |
|                                          (uint8_t*)querybuf);
 | |
|             querybuf[copied_len] = '\0';
 | |
|             kill_type_t kt = KT_CONNECTION;
 | |
|             uint64_t thread_id = 0;
 | |
|             rval = RES_END;
 | |
|             std::string user;
 | |
| 
 | |
|             if (parse_kill_query(querybuf, &thread_id, &kt, &user))
 | |
|             {
 | |
|                 if (thread_id > 0)
 | |
|                 {
 | |
|                     mxs_mysql_execute_kill(dcb->session, thread_id, kt);
 | |
|                 }
 | |
|                 else if (!user.empty())
 | |
|                 {
 | |
|                     mxs_mysql_execute_kill_user(dcb->session, user.c_str(), kt);
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         /* Look at the start of the query and see if it might contain "KILL" */
 | |
|         if (strncasecmp(WORD_KILL, startbuf, copied_len) == 0)
 | |
|         {
 | |
|             rval = RES_MORE_DATA;
 | |
|         }
 | |
|     }
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
/**
 * Store the user name held in a token.
 *
 * The name is everything in the token up to, but not including, the first
 * ';'. If the token has no ';', the whole token is the name.
 *
 * @param token 0-terminated token to read from
 * @param user  Output string, overwritten with the extracted name
 */
static void extract_user(char* token, std::string* user)
{
    const char* semicolon = strchr(token, ';');
    size_t name_len = semicolon ? (size_t)(semicolon - token) : strlen(token);

    user->assign(token, name_len);
}
 | |
| 
 | |
| /**
 | |
|  * Parse a "KILL [CONNECTION | QUERY] [ <process_id> | USER <username> ]" query.
 | |
|  * Will modify the argument string even if unsuccessful.
 | |
|  *
 | |
|  * @param query Query string to parse
 | |
|  * @paran thread_id_out Thread id output
 | |
|  * @param kt_out Kill command type output
 | |
|  * @return true on success, false on error
 | |
|  */
 | |
| static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user)
 | |
| {
 | |
|     const char WORD_CONNECTION[] = "CONNECTION";
 | |
|     const char WORD_QUERY[] = "QUERY";
 | |
|     const char WORD_HARD[] = "HARD";
 | |
|     const char WORD_SOFT[] = "SOFT";
 | |
|     const char WORD_USER[] = "USER";
 | |
|     const char DELIM[] = " \n\t";
 | |
| 
 | |
|     int kill_type = KT_CONNECTION;
 | |
|     unsigned long long int thread_id = 0;
 | |
|     std::string tmpuser;
 | |
| 
 | |
|     enum kill_parse_state_t
 | |
|     {
 | |
|         KILL,
 | |
|         CONN_QUERY,
 | |
|         ID,
 | |
|         USER,
 | |
|         SEMICOLON,
 | |
|         DONE
 | |
|     } state = KILL;
 | |
|     char *saveptr = NULL;
 | |
|     bool error = false;
 | |
| 
 | |
|     char *token = strtok_r(query, DELIM, &saveptr);
 | |
| 
 | |
|     while (token && !error)
 | |
|     {
 | |
|         bool get_next = false;
 | |
|         switch (state)
 | |
|         {
 | |
|         case KILL:
 | |
|             if (strncasecmp(token, WORD_KILL, sizeof(WORD_KILL) - 1) == 0)
 | |
|             {
 | |
|                 state = CONN_QUERY;
 | |
|                 get_next = true;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 error = true;
 | |
|             }
 | |
|             break;
 | |
| 
 | |
|         case CONN_QUERY:
 | |
|             if (strncasecmp(token, WORD_QUERY, sizeof(WORD_QUERY) - 1) == 0)
 | |
|             {
 | |
|                 kill_type &= ~KT_CONNECTION;
 | |
|                 kill_type |= KT_QUERY;
 | |
|                 get_next = true;
 | |
|             }
 | |
|             else if (strncasecmp(token, WORD_CONNECTION, sizeof(WORD_CONNECTION) - 1) == 0)
 | |
|             {
 | |
|                 get_next = true;
 | |
|             }
 | |
| 
 | |
|             if (strncasecmp(token, WORD_HARD, sizeof(WORD_HARD) - 1) == 0)
 | |
|             {
 | |
|                 kill_type |= KT_HARD;
 | |
|                 get_next = true;
 | |
|             }
 | |
|             else if (strncasecmp(token, WORD_SOFT, sizeof(WORD_SOFT) - 1) == 0)
 | |
|             {
 | |
|                 kill_type |= KT_SOFT;
 | |
|                 get_next = true;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 /* Move to next state regardless of comparison result. The current
 | |
|                  * part is optional and the process id may already be in the token. */
 | |
|                 state = ID;
 | |
|             }
 | |
|             break;
 | |
| 
 | |
|         case ID:
 | |
|             if (strncasecmp(token, WORD_USER, sizeof(WORD_USER) - 1) == 0)
 | |
|             {
 | |
|                 state = USER;
 | |
|                 get_next = true;
 | |
|                 break;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 char *endptr_id = NULL;
 | |
| 
 | |
|                 long long int l  = strtoll(token, &endptr_id, 0);
 | |
| 
 | |
|                 if ((l == LLONG_MAX && errno == ERANGE) ||
 | |
|                     (*endptr_id != '\0' && *endptr_id != ';') ||
 | |
|                     l <= 0 || endptr_id == token)
 | |
|                 {
 | |
|                     // Not a positive 32-bit integer
 | |
|                     error = true;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     ss_dassert(*endptr_id == '\0' || *endptr_id == ';');
 | |
|                     state = SEMICOLON; // In case we have space before ;
 | |
|                     get_next = true;
 | |
|                     thread_id = l;
 | |
|                 }
 | |
|             }
 | |
|             break;
 | |
| 
 | |
|         case USER:
 | |
|             extract_user(token, &tmpuser);
 | |
|             state = SEMICOLON;
 | |
|             get_next = true;
 | |
|             break;
 | |
| 
 | |
|         case SEMICOLON:
 | |
|             if (strncmp(token, ";", 1) == 0)
 | |
|             {
 | |
|                 state = DONE;
 | |
|                 get_next = true;
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 error = true;
 | |
|             }
 | |
|             break;
 | |
| 
 | |
|         default:
 | |
|             error = true;
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|         if (get_next)
 | |
|         {
 | |
|             token = strtok_r(NULL, DELIM, &saveptr);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     if (error || (state != DONE && state != SEMICOLON))
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         *thread_id_out = thread_id;
 | |
|         *kt_out = (kill_type_t)kill_type;
 | |
|         *user = tmpuser;
 | |
|         return true;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /*
 | |
| * Mapping three session tracker's info to mxs_session_trx_state_t
 | |
| * SESSION_TRACK_STATE_CHANGE:
 | |
| *   Get lasted autocommit value;
 | |
| *   https://dev.mysql.com/worklog/task/?id=6885
 | |
| * SESSION_TRACK_TRANSACTION_TYPE:
 | |
| *   Get transaction boundaries
 | |
| *   TX_EMPTY                  => SESSION_TRX_INACTIVE
 | |
| *   TX_EXPLICIT | TX_IMPLICIT => SESSION_TRX_ACTIVE
 | |
| *   https://dev.mysql.com/worklog/task/?id=6885
 | |
| * SESSION_TRACK_TRANSACTION_CHARACTERISTICS
 | |
| *   Get trx characteristics such as read only, read write, snapshot ...
 | |
| *
 | |
| */
 | |
| static void parse_and_set_trx_state(MXS_SESSION *ses, GWBUF *data)
 | |
| {
 | |
|     char *autocommit = gwbuf_get_property(data, (char *)"autocommit");
 | |
| 
 | |
|     if (autocommit)
 | |
|     {
 | |
|         MXS_DEBUG("autocommit:%s", autocommit);
 | |
|         if (strncasecmp(autocommit, "ON", 2) == 0)
 | |
|         {
 | |
|             session_set_autocommit(ses, true);
 | |
|         }
 | |
|         if (strncasecmp(autocommit, "OFF", 3) == 0)
 | |
|         {
 | |
|             session_set_autocommit(ses, false);
 | |
|         }
 | |
|     }
 | |
|     char *trx_state = gwbuf_get_property(data, (char *)"trx_state");
 | |
|     if (trx_state)
 | |
|     {
 | |
|         mysql_tx_state_t s = parse_trx_state(trx_state);
 | |
| 
 | |
|         if (s == TX_EMPTY)
 | |
|         {
 | |
|             session_set_trx_state(ses, SESSION_TRX_INACTIVE);
 | |
|         }
 | |
|         else if ((s & TX_EXPLICIT) ||  (s & TX_IMPLICIT))
 | |
|         {
 | |
|             session_set_trx_state(ses, SESSION_TRX_ACTIVE);
 | |
|         }
 | |
|     }
 | |
|     char *trx_characteristics = gwbuf_get_property(data, (char *)"trx_characteristics");
 | |
|     if (trx_characteristics)
 | |
|     {
 | |
|         if (strncmp(trx_characteristics, "START TRANSACTION READ ONLY;", 28) == 0)
 | |
|         {
 | |
|             session_set_trx_state(ses, SESSION_TRX_READ_ONLY);
 | |
|         }
 | |
| 
 | |
|         if (strncmp(trx_characteristics, "START TRANSACTION READ WRITE;", 29) == 0)
 | |
|         {
 | |
|             session_set_trx_state(ses, SESSION_TRX_READ_WRITE);
 | |
|         }
 | |
|     }
 | |
|     MXS_DEBUG("trx state:%s", session_trx_state_to_string(ses->trx_state));
 | |
|     MXS_DEBUG("autcommit:%s", session_is_autocommit(ses) ? "ON" : "OFF");
 | |
| }
 | 
