From 2d128de85f0fcd58945350214c7afa8fd65d7f21 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Wed, 25 Jun 2014 15:48:55 +0300 Subject: [PATCH] Replaced MySQL command macros with enumerated type. Each command has prefix 'MYSQL_' otherwise they match with those listed in mariadb-5.5 mysql_com.h. Added server command list structure which is included in MySQLProtocol. It holds command and number of response packets the command causes backend server to send as a response. Added set of functions related to protocol command and response packets counting etc. --- .../include/mysql_client_server_protocol.h | 145 +++++++---- server/modules/protocol/mysql_common.c | 235 ++++++++++++++++-- 2 files changed, 319 insertions(+), 61 deletions(-) diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index bb7338a3e..1d3749cfb 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -77,7 +77,7 @@ #define GW_SCRAMBLE_LENGTH_323 8 #ifndef MYSQL_SCRAMBLE_LEN -#define MYSQL_SCRAMBLE_LEN GW_MYSQL_SCRAMBLE_SIZE +# define MYSQL_SCRAMBLE_LEN GW_MYSQL_SCRAMBLE_SIZE #endif #define GW_NOINTR_CALL(A) do { errno = 0; A; } while (errno == EINTR) @@ -92,41 +92,15 @@ struct dcb; typedef enum { - MYSQL_ALLOC, - MYSQL_PENDING_CONNECT, - MYSQL_CONNECTED, - MYSQL_AUTH_SENT, - MYSQL_AUTH_RECV, - MYSQL_AUTH_FAILED, - MYSQL_IDLE, - MYSQL_ROUTING, - MYSQL_WAITING_RESULT, - MYSQL_SESSION_CHANGE -} mysql_pstate_t; + MYSQL_ALLOC, + MYSQL_PENDING_CONNECT, + MYSQL_CONNECTED, + MYSQL_AUTH_SENT, + MYSQL_AUTH_RECV, + MYSQL_AUTH_FAILED, + MYSQL_IDLE +} mysql_auth_state_t; -/* - * MySQL Protocol specific state data - */ -typedef struct { -#if defined(SS_DEBUG) - skygw_chk_t protocol_chk_top; -#endif - int fd; /*< The socket descriptor */ - struct dcb *owner_dcb; /*< The DCB of the socket - * we are running on */ - mysql_pstate_t state; /*< Current protocol state */ - uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, - * created or received */ - uint32_t server_capabilities; /*< server capabilities, - * created or received */ - uint32_t client_capabilities; /*< client capabilities, - * created or received */ - unsigned long tid; /*< MySQL Thread ID, in - * handshake */ -#if defined(SS_DEBUG) - skygw_chk_t protocol_chk_tail; -#endif -} MySQLProtocol; /* * MySQL session specific data @@ -139,7 +113,6 @@ typedef struct mysql_session { } MYSQL_session; - /** Protocol packing macros. */ #define gw_mysql_set_byte2(__buffer, __int) do { \ (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ @@ -230,18 +203,90 @@ typedef enum ), } gw_mysql_capabilities_t; -/** Basic mysql commands */ -#define MYSQL_COM_CHANGE_USER 0x11 -#define MYSQL_COM_QUIT 0x1 -#define MYSQL_COM_INIT_DB 0x2 -#define MYSQL_COM_QUERY 0x3 +/** Copy from enum in mariadb-5.5 mysql_com.h */ +typedef enum mysql_server_cmd { + MYSQL_COM_UNDEFINED = -1, + MYSQL_COM_SLEEP = 0, + MYSQL_COM_QUIT, + MYSQL_COM_INIT_DB, + MYSQL_COM_QUERY, + MYSQL_COM_FIELD_LIST, + MYSQL_COM_CREATE_DB, + MYSQL_COM_DROP_DB, + MYSQL_COM_REFRESH, + MYSQL_COM_SHUTDOWN, + MYSQL_COM_STATISTICS, + MYSQL_COM_PROCESS_INFO, + MYSQL_COM_CONNECT, + MYSQL_COM_PROCESS_KILL, + MYSQL_COM_DEBUG, + MYSQL_COM_PING, + MYSQL_COM_TIME, + MYSQL_COM_DELAYED_INSERT, + MYSQL_COM_CHANGE_USER, + MYSQL_COM_BINLOG_DUMP, + MYSQL_COM_TABLE_DUMP, + MYSQL_COM_CONNECT_OUT, + MYSQL_COM_REGISTER_SLAVE, + MYSQL_COM_STMT_PREPARE, + MYSQL_COM_STMT_EXECUTE, + MYSQL_COM_STMT_SEND_LONG_DATA, + MYSQL_COM_STMT_CLOSE, + MYSQL_COM_STMT_RESET, + MYSQL_COM_SET_OPTION, + MYSQL_COM_STMT_FETCH, + MYSQL_COM_DAEMON +} mysql_server_cmd_t; -#define MYSQL_GET_COMMAND(payload) (payload[4]) -#define MYSQL_GET_PACKET_NO(payload) (payload[3]) -#define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload)) -#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5])) +/** + * List of server commands, and number of response packets are stored here. + * server_command_t is used in MySQLProtocol structure, so for each DCB there is + * one MySQLProtocol and one server command list. + */ +typedef struct server_command_st { + mysql_server_cmd_t cmd; + int nresponse_packets; /** filled when reply arrives */ + struct server_command_st* next; +} server_command_t; + +/* + * MySQL Protocol specific state data + */ +typedef struct { +#if defined(SS_DEBUG) + skygw_chk_t protocol_chk_top; #endif + int fd; /*< The socket descriptor */ + struct dcb *owner_dcb; /*< The DCB of the socket + * we are running on */ + SPINLOCK protocol_lock; + server_command_t protocol_command; /*< list of active commands */ + mysql_auth_state_t protocol_auth_state; /*< Authentication status */ + uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, + * created or received */ + uint32_t server_capabilities; /*< server capabilities, + * created or received */ + uint32_t client_capabilities; /*< client capabilities, + * created or received */ + unsigned long tid; /*< MySQL Thread ID, in + * handshake */ +#if defined(SS_DEBUG) + skygw_chk_t protocol_chk_tail; +#endif +} MySQLProtocol; + + + +#define MYSQL_GET_COMMAND(payload) (payload[4]) +#define MYSQL_GET_PACKET_NO(payload) (payload[3]) +#define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload)) +#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5])) +#define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9])) +#define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11])) +#define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff) + +#endif /** _MYSQL_PROTOCOL_H */ void gw_mysql_close(MySQLProtocol **ptr); MySQLProtocol* mysql_protocol_init(DCB* dcb, int fd); @@ -314,4 +359,14 @@ char *gw_strend(register const char *s); int setnonblocking(int fd); int setipaddress(struct in_addr *a, char *p); GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); +GWBUF* gw_MySQL_get_packets(GWBUF** p_readbuf, int* npackets); +GWBUF* gw_MySQL_discard_packets(GWBUF* buf, int npackets); +void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd); +void protocol_remove_srv_command(MySQLProtocol* p); +bool protocol_waits_response(MySQLProtocol* p); +mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep); +int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); +int protocol_get_nresponse_packets (MySQLProtocol* p); +bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets); + diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index b546a0a88..dd677ec58 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -76,7 +76,8 @@ MySQLProtocol* mysql_protocol_init( strerror(eno)))); goto return_p; } - p->state = MYSQL_ALLOC; + p->protocol_auth_state = MYSQL_ALLOC; + p->protocol_command.cmd = MYSQL_COM_UNDEFINED; #if defined(SS_DEBUG) p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL; @@ -151,7 +152,7 @@ int gw_read_backend_handshake( if (h_len <= 4) { /* log error this exit point */ - conn->state = MYSQL_AUTH_FAILED; + conn->protocol_auth_state = MYSQL_AUTH_FAILED; LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_read_backend_handshake] after " @@ -198,7 +199,7 @@ int gw_read_backend_handshake( * data in buffer less than expected in the * packet. Log error this exit point */ - conn->state = MYSQL_AUTH_FAILED; + conn->protocol_auth_state = MYSQL_AUTH_FAILED; LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_read_backend_handshake] after " @@ -223,7 +224,7 @@ int gw_read_backend_handshake( * we cannot continue * log error this exit point */ - conn->state = MYSQL_AUTH_FAILED; + conn->protocol_auth_state = MYSQL_AUTH_FAILED; LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_read_backend_handshake] after " @@ -236,7 +237,7 @@ int gw_read_backend_handshake( return 1; } - conn->state = MYSQL_AUTH_SENT; + conn->protocol_auth_state = MYSQL_AUTH_SENT; // consume all the data here head = gwbuf_consume(head, GWBUF_LENGTH(head)); @@ -789,13 +790,7 @@ gw_mysql_protocol_state2string (int state) { case MYSQL_AUTH_FAILED: return "MySQL Authentication failed"; case MYSQL_IDLE: - return "MySQL Auth done. Protocol is idle, waiting for statements"; - case MYSQL_ROUTING: - return "MySQL received command has been routed to backend(s)"; - case MYSQL_WAITING_RESULT: - return "MySQL Waiting for result set"; - case MYSQL_SESSION_CHANGE: - return "MySQL change session"; + return "MySQL authentication is succesfully done."; default: return "MySQL (unknown protocol state)"; } @@ -960,11 +955,9 @@ int mysql_send_custom_error ( const char *mysql_message) { GWBUF* buf; - int nbytes; - buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message); + buf = mysql_create_custom_error(packet_number, in_affected_rows, mysql_message); - nbytes = GWBUF_LENGTH(buf); dcb->func.write(dcb, buf); return GWBUF_LENGTH(buf); @@ -1500,7 +1493,7 @@ GWBUF* gw_MySQL_get_next_packet( packetbuf = NULL; goto return_packetbuf; } - + /** there is one complete packet in the buffer */ if (packetlen == buflen) { packetbuf = gwbuf_clone_portion(readbuf, 0, packetlen); @@ -1541,3 +1534,213 @@ return_packetbuf: return packetbuf; } + +/** + * Move from buffer pointed to by <*p_readbuf>. + */ +GWBUF* gw_MySQL_get_packets( + GWBUF** p_srcbuf, + int* npackets) +{ + GWBUF* packetbuf; + GWBUF* targetbuf = NULL; + + while (*npackets > 0 && (packetbuf = gw_MySQL_get_next_packet(p_srcbuf)) != NULL) + { + targetbuf = gwbuf_append(targetbuf, packetbuf); + *npackets -= 1; + } + ss_dassert(*npackets < 128); + ss_dassert(*npackets >= 0); + + return targetbuf; +} + + +/** + * If router expects to get separate, complete statements, add MySQL command + * to MySQLProtocol structure. It is removed when response has arrived. + */ +void protocol_add_srv_command( + MySQLProtocol* p, + mysql_server_cmd_t cmd) +{ + spinlock_acquire(&p->protocol_lock); + + if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED) + { + p->protocol_command.cmd = cmd; + p->protocol_command.nresponse_packets = 0; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Added command %s to fd %d.", + STRPACKETTYPE(cmd), + p->owner_dcb->fd))); + } + else + { + server_command_t* c = + (server_command_t *)malloc(sizeof(server_command_t)); + c->cmd = cmd; + c->nresponse_packets = 0; + c->next = NULL; + + p->protocol_command.next = c; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Added another command %s to fd %d.", + STRPACKETTYPE(cmd), + p->owner_dcb->fd))); +#if defined(SS_DEBUG) + c = &p->protocol_command; + + while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "fd %d : %d %s", + p->owner_dcb->fd, + c->cmd, + STRPACKETTYPE(c->cmd)))); + c = c->next; + } +#endif + } + spinlock_release(&p->protocol_lock); +} + + +/** + * If router processes separate statements, every stmt has corresponding MySQL + * command stored in MySQLProtocol structure. + * + * Remove current (=oldest) command. + */ +void protocol_remove_srv_command( + MySQLProtocol* p) +{ + server_command_t* s; + spinlock_acquire(&p->protocol_lock); + s = &p->protocol_command; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Removed command %s from fd %d.", + STRPACKETTYPE(s->cmd), + p->owner_dcb->fd))); + + if (s->next == NULL) + { + p->protocol_command.cmd = MYSQL_COM_UNDEFINED; + } + else + { + p->protocol_command = *(s->next); + free(s->next); + } + + spinlock_release(&p->protocol_lock); +} + +mysql_server_cmd_t protocol_get_srv_command( + MySQLProtocol* p, + bool removep) +{ + mysql_server_cmd_t cmd; + + cmd = p->protocol_command.cmd; + + if (removep) + { + protocol_remove_srv_command(p); + } + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Read command %s for fd %d.", + STRPACKETTYPE(cmd), + p->owner_dcb->fd))); + return cmd; +} + +/** + * Return how many packets are included in the server's response. + */ +int get_stmt_nresponse_packets( + GWBUF* buf, + mysql_server_cmd_t cmd) +{ + int npackets; + uint8_t* packet; + int nparam; + int nattr; + uint8_t* data; + + switch (cmd) { + case MYSQL_COM_STMT_PREPARE: + data = (uint8_t *)buf->start; + + if (data[4] == 0xff) + { + npackets = 1; /*< error packet */ + } + else + { + packet = (uint8_t *)GWBUF_DATA(buf); + /** ok + nparam + eof + nattr + eof */ + nparam = MYSQL_GET_STMTOK_NPARAM(packet); + nattr = MYSQL_GET_STMTOK_NATTR(packet); + + npackets = 1 + nparam + MIN(1, nparam) + + nattr + MIN(nattr, 1); + ss_dassert(npackets<128); + } + break; + + default: + npackets = 1; + break; + } + ss_dassert(npackets<128); + return npackets; +} + +int protocol_get_nresponse_packets ( + MySQLProtocol* p) +{ + int rval; + + CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); + rval = p->protocol_command.nresponse_packets; + spinlock_release(&p->protocol_lock); + ss_dassert(rval<128); + + return rval; +} + +bool protocol_set_nresponse_packets ( + MySQLProtocol* p, + int nresponse_packets) +{ + bool succp; + + CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); + if (p->protocol_command.nresponse_packets > 0 && + nresponse_packets > p->protocol_command.nresponse_packets) + { + succp = false; + } + else + { + p->protocol_command.nresponse_packets = nresponse_packets; + ss_dassert(nresponse_packets<128); + succp = true; + } + spinlock_release(&p->protocol_lock); + + return succp; +} +