diff --git a/server/core/buffer.c b/server/core/buffer.c index 2708d32ce..993f1d2af 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -152,6 +152,7 @@ GWBUF *gwbuf_clone_portion( } atomic_add(&buf->sbuf->refcount, 1); clonebuf->sbuf = buf->sbuf; + clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone info bits too */ clonebuf->start = (void *)((char*)buf->start)+start_offset; clonebuf->end = (void *)((char *)clonebuf->start)+length; clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */ @@ -232,12 +233,12 @@ GWBUF *ptr = head; if (!head) return tail; CHK_GWBUF(head); - CHK_GWBUF(tail); while (ptr->next) { ptr = ptr->next; } ptr->next = tail; + return head; } @@ -316,27 +317,12 @@ gwbuf_trim(GWBUF *buf, unsigned int n_bytes) return buf; } -bool gwbuf_set_type( +void gwbuf_set_type( GWBUF* buf, gwbuf_type_t type) { - bool succp; - CHK_GWBUF(buf); - - switch (type) { - case GWBUF_TYPE_MYSQL: - case GWBUF_TYPE_PLAINSQL: - case GWBUF_TYPE_UNDEFINED: - case GWBUF_TYPE_SINGLE_STMT: /*< buffer contains one stmt */ - buf->gwbuf_type |= type; - succp = true; - break; - default: - succp = false; - break; - } - ss_dassert(succp); - return succp; + CHK_GWBUF(buf); + buf->gwbuf_type |= type; } diff --git a/server/core/server.c b/server/core/server.c index c1bec6189..2abd0a11c 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -260,7 +260,8 @@ char *stat; } dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current); - ptr = ptr->next; + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", ptr->stats.n_current_ops); + ptr = ptr->next; } spinlock_release(&server_spin); } @@ -296,6 +297,7 @@ char *stat; } dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops); } /** diff --git a/server/include/buffer.h b/server/include/buffer.h index 3c4c022e3..9729c538c 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -46,16 +46,22 @@ typedef enum { - GWBUF_TYPE_UNDEFINED = 0x00, - GWBUF_TYPE_PLAINSQL = 0x01, - GWBUF_TYPE_MYSQL = 0x02, - GWBUF_TYPE_SINGLE_STMT = 0x04 + GWBUF_TYPE_UNDEFINED = 0x00, + GWBUF_TYPE_PLAINSQL = 0x01, + GWBUF_TYPE_MYSQL = 0x02, + GWBUF_TYPE_SINGLE_STMT = 0x04, + GWBUF_TYPE_SESCMD_RESPONSE = 0x08, + GWBUF_TYPE_RESPONSE_END = 0x10, + GWBUF_TYPE_SESCMD = 0x20 } gwbuf_type_t; -#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) -#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) -#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) -#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT) +#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) +#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) +#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) +#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT) +#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE) +#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END) +#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD) /** * A structure to encapsulate the data in a form that the data itself can be @@ -114,5 +120,5 @@ extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int length); extern unsigned int gwbuf_length(GWBUF *head); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); -extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type); +extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type); #endif diff --git a/server/include/server.h b/server/include/server.h index d32413bb7..3f95c99f0 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -47,6 +47,7 @@ typedef struct { int n_connections; /**< Number of connections */ int n_current; /**< Current connections */ + int n_current_ops; /**< Current active operations */ } SERVER_STATS; /** diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 1d3749cfb..ffde09871 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -66,6 +66,7 @@ #define GW_MYSQL_LOOP_TIMEOUT 300000000 #define GW_MYSQL_READ 0 #define GW_MYSQL_WRITE 1 +#define MYSQL_HEADER_LEN 4L #define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10 #define GW_MYSQL_HANDSHAKE_FILLER 0x00 @@ -235,7 +236,8 @@ typedef enum mysql_server_cmd { MYSQL_COM_STMT_RESET, MYSQL_COM_SET_OPTION, MYSQL_COM_STMT_FETCH, - MYSQL_COM_DAEMON + MYSQL_COM_DAEMON, + MYSQL_COM_END /*< Must be the last */ } mysql_server_cmd_t; @@ -245,9 +247,10 @@ typedef enum mysql_server_cmd { * one MySQLProtocol and one server command list. */ typedef struct server_command_st { - mysql_server_cmd_t cmd; - int nresponse_packets; /** filled when reply arrives */ - struct server_command_st* next; + mysql_server_cmd_t scom_cmd; + int scom_nresponse_packets; /*< packets in response */ + size_t scom_nbytes_to_read; /*< bytes left to read in current packet */ + struct server_command_st* scom_next; } server_command_t; /* @@ -262,6 +265,7 @@ typedef struct { * we are running on */ SPINLOCK protocol_lock; server_command_t protocol_command; /*< list of active commands */ + server_command_t* protocol_cmd_history; /*< command history list */ mysql_auth_state_t protocol_auth_state; /*< Authentication status */ uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, * created or received */ @@ -285,6 +289,7 @@ typedef struct { #define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9])) #define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11])) #define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff) +#define MYSQL_GET_NATTR(payload) ((int)payload[4]) #endif /** _MYSQL_PROTOCOL_H */ @@ -365,8 +370,16 @@ void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd); void protocol_remove_srv_command(MySQLProtocol* p); bool protocol_waits_response(MySQLProtocol* p); mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep); -int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); -int protocol_get_nresponse_packets (MySQLProtocol* p); -bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets); +int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); +bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes); +void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes); +void protocol_archive_srv_command(MySQLProtocol* p); + + +void init_response_status ( + GWBUF* buf, + mysql_server_cmd_t cmd, + int* npackets, + size_t* nbytes); diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 4db72ae5e..2a7a46418 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -58,7 +58,7 @@ typedef enum bref_state { #define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE) #define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) +#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0) #define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) typedef enum backend_type_t { @@ -90,9 +90,10 @@ typedef enum rses_property_type_t { typedef enum select_criteria { UNDEFINED_CRITERIA=0, LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */ - DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS, LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */ LEAST_BEHIND_MASTER, + LEAST_CURRENT_OPERATIONS, + DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS, LAST_CRITERIA /*< not used except for an index */ } select_criteria_t; @@ -106,7 +107,9 @@ typedef enum select_criteria { strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \ LEAST_BEHIND_MASTER : ( \ strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \ - LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA))) + LEAST_ROUTER_CONNECTIONS : ( \ + strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? \ + LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA)))) /** * Session variable command @@ -190,6 +193,7 @@ typedef struct backend_ref_st { BACKEND* bref_backend; DCB* bref_dcb; bref_state_t bref_state; + int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index cdcdf2c05..8a29d12b8 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -65,6 +65,10 @@ static int gw_backend_hangup(DCB *dcb); static int backend_write_delayqueue(DCB *dcb); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); +static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process); + + + #if defined(NOT_USED) static int gw_session(DCB *backend_dcb, void *data); #endif @@ -416,21 +420,20 @@ static int gw_read_backend_event(DCB *dcb) { /* reading MySQL command output from backend and writing to the client */ { - GWBUF *readbuf = NULL; + GWBUF *read_buffer = NULL; ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; SESSION *session = dcb->session; int nbytes_read = 0; - mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED; - + CHK_SESSION(session); router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; /* read available backend data */ - rc = dcb_read(dcb, &readbuf); + rc = dcb_read(dcb, &read_buffer); if (rc < 0) { @@ -463,7 +466,7 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - nbytes_read = gwbuf_length(readbuf); + nbytes_read = gwbuf_length(read_buffer); if (nbytes_read == 0) { @@ -471,74 +474,41 @@ static int gw_read_backend_event(DCB *dcb) { } else { - ss_dassert(readbuf != NULL); + ss_dassert(read_buffer != NULL); } - /** - * ask for next response (1 or more packets) like in - * gw_MySQL_get_next_packet but gw_MySQL_get_next_response - */ - srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol, - false); - /** - * If backend DCB is waiting for response to COM_STMT_PREPARE, - * it, then only that must be passed to clientReply. - * - * If response consists of ses cmd response and response to - * COM_STMT_PREPARE, there can't be anything after - * COM_STMT_PREPARE response because whole buffer may be - * discarded since router doesn't know the borderlines of MySQL - * packets. - */ - - /** - * Read all packets from which belong to STMT PREPARE - * response. - * Move packets not belonging to STMT PREPARE response to - * dcb_readqueue. - * When whole response is read, pass forward to - * clientReply. - */ - if (srvcmd == MYSQL_COM_STMT_PREPARE) + /** Packet prefix was read earlier */ + if (dcb->dcb_readqueue) { - MySQLProtocol* p; - int nresponse_packets; - GWBUF* tmpbuf; + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + nbytes_read = gwbuf_length(read_buffer); - p = (MySQLProtocol *)dcb->protocol; - nresponse_packets = protocol_get_nresponse_packets(p); - - /** count only once per response */ - if (nresponse_packets == 0) - { - nresponse_packets = get_stmt_nresponse_packets( - readbuf, - srvcmd); - } - tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets); - gwbuf_append(dcb->dcb_readqueue, readbuf); - readbuf = tmpbuf; - - /** contains incomplete response to STMT PREPARE */ - if (nresponse_packets != 0) + if (nbytes_read < 5) /*< read at least command type */ { rc = 0; - - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Backend fd %d read incomplete response packet. " - "Waiting %d more, cmd %s.", - dcb->fd, - nresponse_packets, - STRPACKETTYPE(srvcmd)))); - /** - * store the number of how many packets the - * reponse consists of to backend's protocol. - */ - protocol_set_nresponse_packets(p, nresponse_packets); goto return_rc; } - protocol_remove_srv_command((MySQLProtocol *)dcb->protocol); + /** There is at least length and command type. */ + else + { + read_buffer = dcb->dcb_readqueue; + dcb->dcb_readqueue = NULL; + } + } + else + { + if (nbytes_read < 5) + { + gwbuf_append(dcb->dcb_readqueue, read_buffer); + rc = 0; + goto return_rc; + } + } + /** If protocol has command set it is session command */ + if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != + MYSQL_COM_UNDEFINED) + { + read_buffer = process_response_data(dcb, read_buffer, nbytes_read); } /*< * If dcb->session->client is freed already it may be NULL. @@ -554,20 +524,20 @@ static int gw_read_backend_event(DCB *dcb) { if (client_protocol->protocol_auth_state == MYSQL_IDLE) { - gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); - - router->clientReply(router_instance, - rsession, - readbuf, - dcb); + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + router->clientReply( + router_instance, + rsession, + read_buffer, + dcb); rc = 1; } goto return_rc; } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { - gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); - router->clientReply(router_instance, rsession, readbuf, dcb); + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + router->clientReply(router_instance, rsession, read_buffer, dcb); rc = 1; } } @@ -672,7 +642,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * If auth failed, return value is 0, write and buffered write * return 1. */ - switch(backend_protocol->protocol_auth_state) { + switch (backend_protocol->protocol_auth_state) { case MYSQL_AUTH_FAILED: { size_t len; @@ -712,22 +682,18 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb, dcb->fd, STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); + spinlock_release(&dcb->authlock); /** + * Statement type is used in readwrite split router. + * Command is *not* set for readconn router. + * * Server commands are stored to MySQLProtocol structure - * if buffer always includes a single statement. That - * information is stored in GWBUF type field - * (GWBUF_TYPE_SINGLE_STMT bit). + * if buffer always includes a single statement. */ - if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) + if (GWBUF_IS_TYPE_SINGLE_STMT(queue) && + GWBUF_IS_TYPE_SESCMD(queue)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Write to backend's DCB fd %d " - "cmd %s protocol state %s.", - dcb->fd, - STRPACKETTYPE(cmd), - STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** Record the command to backend's protocol */ protocol_add_srv_command(backend_protocol, cmd); } @@ -751,21 +717,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb->fd, STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** - * Since it is known that buffer contains one complete - * command, store the command to backend's protocol. When - * backend server responses the command determines how - * response needs to be processed. This is mainly due to - * MYSQL_COM_STMT_PREPARE whose response consists of - * arbitrary number of packets. + * In case of session commands, store command to DCB's + * protocol struct. */ - if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) + if (GWBUF_IS_TYPE_SINGLE_STMT(queue) && + GWBUF_IS_TYPE_SESCMD(queue)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Write to backend's delayqueue fd %d " - "protocol state %s.", - dcb->fd, - STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** Record the command to backend's protocol */ protocol_add_srv_command(backend_protocol, cmd); } @@ -1010,7 +967,6 @@ gw_backend_close(DCB *dcb) DCB* client_dcb; SESSION* session; GWBUF* quitbuf; - bool succp; CHK_DCB(dcb); session = dcb->session; @@ -1095,7 +1051,6 @@ static int backend_write_delayqueue(DCB *dcb) ROUTER *router_instance = NULL; void *rsession = NULL; SESSION *session = dcb->session; - int receive_rc = 0; CHK_SESSION(session); @@ -1240,3 +1195,141 @@ static int gw_session(DCB *backend_dcb, void *data) { return 1; } */ + + +/** + * Move packets or parts of packets from redbuf to outbuf as the packet headers + * and lengths have been noticed and counted. + * Session commands need to be marked so that they can be handled properly in + * the router's clientReply. + * Return the pointer to outbuf. + */ +static GWBUF* process_response_data ( + DCB* dcb, + GWBUF* readbuf, + int nbytes_to_process) /*< number of new bytes read */ +{ + int npackets_left = 0; /*< response's packet count */ + size_t nbytes_left = 0; /*< nbytes to be read for the packet */ + MySQLProtocol* p; + GWBUF* outbuf = NULL; + + /** Get command which was stored in gw_MySQLWrite_backend */ + p = DCB_PROTOCOL(dcb, MySQLProtocol); + CHK_PROTOCOL(p); + + /** All buffers processed here are sescmd responses */ + gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); + + /** + * Now it is known how many packets there should be and how much + * is read earlier. + */ + while (nbytes_to_process != 0) + { + mysql_server_cmd_t srvcmd; + GWBUF* sparebuf; + bool succp; + + srvcmd = protocol_get_srv_command(p, false); + + /** + * Read values from protocol structure, fails if values are + * uninitialized. + */ + if (npackets_left == 0) + { + succp = protocol_get_response_status(p, &npackets_left, &nbytes_left); + + if (!succp || npackets_left == 0) + { + /** + * Examine command type and the readbuf. Conclude response + * packet count from the command type or from the first + * packet content. Fails if read buffer doesn't include + * enough data to read the packet length. + */ + init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); + } + } + /** Only session commands with responses should be processed */ + ss_dassert(npackets_left > 0); + + /** Read incomplete packet. */ + if (nbytes_left > nbytes_to_process) + { + /** Includes length info so it can be processed */ + if (nbytes_to_process >= 5) + { + /** discard source buffer */ + readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); + nbytes_left -= nbytes_to_process; + } + nbytes_to_process = 0; + } + /** Packet was read. All bytes belonged to the last packet. */ + else if (nbytes_left == nbytes_to_process) + { + nbytes_left = 0; + nbytes_to_process = 0; + ss_dassert(npackets_left > 0); + npackets_left -= 1; + outbuf = gwbuf_append(outbuf, readbuf); + readbuf = NULL; + } + /** + * Packet was read. There should be more since bytes were + * left over. + * Move the next packet to its own buffer and add that next + * to the prev packet's buffer. + */ + else /*< nbytes_left < nbytes_to_process */ + { + size_t len = GWBUF_LENGTH(readbuf); + + nbytes_to_process -= nbytes_left; + + /** Move the prefix of the buffer to outbuf from redbuf */ + outbuf = gwbuf_append(outbuf, gwbuf_clone_portion(readbuf, 0, nbytes_left)); + readbuf = gwbuf_consume(readbuf, nbytes_left); + ss_dassert(npackets_left > 0); + npackets_left -= 1; + nbytes_left = 0; + } + + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + + /** A complete packet was read */ + if (nbytes_left == 0) + { + /** No more packets in this response */ + if (npackets_left == 0) + { + GWBUF* b = outbuf; + + while (b->next != NULL) + { + b = b->next; + } + /** Mark last as end of response */ + gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END); + + /** Archive the command */ + protocol_archive_srv_command(p); + } + /** Read next packet */ + else + { + uint8_t* data; + + /** Read next packet length */ + data = GWBUF_DATA(readbuf); + nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN; + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + } + } + } + return outbuf; +} diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 08c648d39..037cb0522 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -574,9 +574,8 @@ int gw_read_client_event( else { uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); - size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4; - if (nbytes_read < 3 || nbytes_read < packetlen) + if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4) { gwbuf_append(dcb->dcb_readqueue, read_buffer); rc = 0; @@ -1296,6 +1295,8 @@ static int gw_error_client_event( CHK_DCB(dcb); + session = dcb->session; + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_error_client_event] Error event handling for DCB %p " diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index dd677ec58..4ce34452b 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -41,6 +41,9 @@ extern int gw_write_backend_event(DCB *dcb); extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); extern int gw_error_backend_event(DCB *dcb); +static server_command_t* server_command_init(server_command_t* srvcmd, + mysql_server_cmd_t cmd); + /** * Creates MySQL protocol structure @@ -77,7 +80,9 @@ MySQLProtocol* mysql_protocol_init( goto return_p; } p->protocol_auth_state = MYSQL_ALLOC; - p->protocol_command.cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_nresponse_packets = 0; + p->protocol_command.scom_nbytes_to_read = 0; #if defined(SS_DEBUG) p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL; @@ -1557,6 +1562,94 @@ GWBUF* gw_MySQL_get_packets( } +static server_command_t* server_command_init( + server_command_t* srvcmd, + mysql_server_cmd_t cmd) +{ + server_command_t* c; + + if (srvcmd != NULL) + { + c = srvcmd; + } + else + { + c = (server_command_t *)malloc(sizeof(server_command_t)); + } + c->scom_cmd = cmd; + c->scom_nresponse_packets = -1; + c->scom_nbytes_to_read = 0; + c->scom_next = NULL; + + return c; +} + +static server_command_t* server_command_copy( + server_command_t* srvcmd) +{ + server_command_t* c = + (server_command_t *)malloc(sizeof(server_command_t)); + *c = *srvcmd; + + return c; +} + +#define MAX_CMD_HISTORY 10 + +void protocol_archive_srv_command( + MySQLProtocol* p) +{ + server_command_t* s1; + server_command_t** s2; + int len; + + spinlock_acquire(&p->protocol_lock); + + s1 = &p->protocol_command; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Move command %s from fd %d to command history.", + STRPACKETTYPE(s1->scom_cmd), + p->owner_dcb->fd))); + + /** Copy to history list */ + s2 = &p->protocol_cmd_history; + + if (*s2 != NULL) + { + len = 0; + + while ((*s2)->scom_next != NULL) + { + *s2 = (*s2)->scom_next; + len += 1; + } + } + *s2 = server_command_copy(s1); + + /** Keep history limits, remove oldest */ + if (len > MAX_CMD_HISTORY) + { + server_command_t* c = p->protocol_cmd_history; + p->protocol_cmd_history = p->protocol_cmd_history->scom_next; + free(c); + } + + /** Remove from command list */ + if (s1->scom_next == NULL) + { + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; + } + else + { + p->protocol_command = *(s1->scom_next); + free(s1->scom_next); + } + spinlock_release(&p->protocol_lock); +} + + /** * If router expects to get separate, complete statements, add MySQL command * to MySQLProtocol structure. It is removed when response has arrived. @@ -1565,49 +1658,42 @@ void protocol_add_srv_command( MySQLProtocol* p, mysql_server_cmd_t cmd) { - spinlock_acquire(&p->protocol_lock); + server_command_t* c; - if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED) + spinlock_acquire(&p->protocol_lock); + + /** this is the only server command in protocol */ + if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED) { - p->protocol_command.cmd = cmd; - p->protocol_command.nresponse_packets = 0; - - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Added command %s to fd %d.", - STRPACKETTYPE(cmd), - p->owner_dcb->fd))); + /** write into structure */ + server_command_init(&p->protocol_command, cmd); } else { - server_command_t* c = - (server_command_t *)malloc(sizeof(server_command_t)); - c->cmd = cmd; - c->nresponse_packets = 0; - c->next = NULL; - - p->protocol_command.next = c; - + /** add to the end of list */ + p->protocol_command.scom_next = server_command_init(NULL, cmd); + } + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Added command %s to fd %d.", + STRPACKETTYPE(cmd), + p->owner_dcb->fd))); + +#if defined(SS_DEBUG) + c = &p->protocol_command; + + while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED) + { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "Added another command %s to fd %d.", - STRPACKETTYPE(cmd), - p->owner_dcb->fd))); -#if defined(SS_DEBUG) - c = &p->protocol_command; - - while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED) - { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "fd %d : %d %s", - p->owner_dcb->fd, - c->cmd, - STRPACKETTYPE(c->cmd)))); - c = c->next; - } -#endif + "fd %d : %d %s", + p->owner_dcb->fd, + c->scom_cmd, + STRPACKETTYPE(c->scom_cmd)))); + c = c->scom_next; } +#endif spinlock_release(&p->protocol_lock); } @@ -1628,17 +1714,17 @@ void protocol_remove_srv_command( LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Removed command %s from fd %d.", - STRPACKETTYPE(s->cmd), + STRPACKETTYPE(s->scom_cmd), p->owner_dcb->fd))); - if (s->next == NULL) + if (s->scom_next == NULL) { - p->protocol_command.cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; } else { - p->protocol_command = *(s->next); - free(s->next); + p->protocol_command = *(s->scom_next); + free(s->scom_next); } spinlock_release(&p->protocol_lock); @@ -1650,97 +1736,133 @@ mysql_server_cmd_t protocol_get_srv_command( { mysql_server_cmd_t cmd; - cmd = p->protocol_command.cmd; + cmd = p->protocol_command.scom_cmd; if (removep) { protocol_remove_srv_command(p); } - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Read command %s for fd %d.", + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [protocol_get_srv_command] Read command %s for fd %d.", + pthread_self(), STRPACKETTYPE(cmd), p->owner_dcb->fd))); return cmd; } -/** - * Return how many packets are included in the server's response. + +/** + * Examine command type and the readbuf. Conclude response + * packet count from the command type or from the first packet + * content. + * Fails if read buffer doesn't include enough data to read the + * packet length. */ -int get_stmt_nresponse_packets( +void init_response_status ( GWBUF* buf, - mysql_server_cmd_t cmd) + mysql_server_cmd_t cmd, + int* npackets, + size_t* nbytes_left) { - int npackets; uint8_t* packet; int nparam; int nattr; uint8_t* data; - switch (cmd) { - case MYSQL_COM_STMT_PREPARE: - data = (uint8_t *)buf->start; - - if (data[4] == 0xff) - { - npackets = 1; /*< error packet */ - } - else - { + ss_dassert(gwbuf_length(buf) >= 3); + + data = (uint8_t *)buf->start; + + if (data[4] == 0xff) /*< error */ + { + *npackets = 1; + } + else + { + switch (cmd) { + case MYSQL_COM_STMT_PREPARE: packet = (uint8_t *)GWBUF_DATA(buf); /** ok + nparam + eof + nattr + eof */ nparam = MYSQL_GET_STMTOK_NPARAM(packet); nattr = MYSQL_GET_STMTOK_NATTR(packet); - npackets = 1 + nparam + MIN(1, nparam) + - nattr + MIN(nattr, 1); - ss_dassert(npackets<128); - } - break; - - default: - npackets = 1; - break; + *npackets = 1 + nparam + MIN(1, nparam) + + nattr + MIN(nattr, 1); + break; + + case MYSQL_COM_QUIT: + case MYSQL_COM_STMT_SEND_LONG_DATA: + case MYSQL_COM_STMT_CLOSE: + *npackets = 0; /*< these don't reply anything */ + break; + + default: + /** + * assume that other session commands respond + * OK or ERR + */ + *npackets = 1; + break; + } } - ss_dassert(npackets<128); - return npackets; + *nbytes_left = MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN; + /** + * There is at least one complete packet in the buffer so buffer is bigger + * than packet + */ + ss_dassert(*nbytes_left > 0); + ss_dassert(*npackets > 0); + ss_dassert(*npackets<128); } -int protocol_get_nresponse_packets ( - MySQLProtocol* p) -{ - int rval; - - CHK_PROTOCOL(p); - spinlock_acquire(&p->protocol_lock); - rval = p->protocol_command.nresponse_packets; - spinlock_release(&p->protocol_lock); - ss_dassert(rval<128); - - return rval; -} -bool protocol_set_nresponse_packets ( + +/** + * Read how many packets are left from current response and how many bytes there + * is still to be read from the current packet. + */ +bool protocol_get_response_status ( MySQLProtocol* p, - int nresponse_packets) + int* npackets, + size_t* nbytes) { bool succp; CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); - if (p->protocol_command.nresponse_packets > 0 && - nresponse_packets > p->protocol_command.nresponse_packets) + *npackets = p->protocol_command.scom_nresponse_packets; + *nbytes = p->protocol_command.scom_nbytes_to_read; + spinlock_release(&p->protocol_lock); + + if (*npackets < 0 && *nbytes == 0) { succp = false; } else { - p->protocol_command.nresponse_packets = nresponse_packets; - ss_dassert(nresponse_packets<128); succp = true; } - spinlock_release(&p->protocol_lock); - + return succp; } +void protocol_set_response_status ( + MySQLProtocol* p, + int npackets_left, + size_t nbytes) +{ + + CHK_PROTOCOL(p); + + spinlock_acquire(&p->protocol_lock); + + p->protocol_command.scom_nbytes_to_read = nbytes; + ss_dassert(p->protocol_command.scom_nbytes_to_read >= 0); + + p->protocol_command.scom_nresponse_packets = npackets_left; + + spinlock_release(&p->protocol_lock); +} + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index d00d8d7df..38bfdba66 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -115,12 +115,18 @@ int bref_cmp_behind_master( const void* bref1, const void* bref2); +int bref_cmp_current_load( + const void* bref1, + const void* bref2); + + int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= { NULL, bref_cmp_global_conn, bref_cmp_router_conn, - bref_cmp_behind_master + bref_cmp_behind_master, + bref_cmp_current_load }; static bool select_connect_backend_servers( @@ -460,7 +466,6 @@ static void* newSession( int max_nslaves; /*< max # of slaves used in this session */ int i; const int min_nservers = 1; /*< hard-coded for now */ - static uint64_t router_client_ses_seq; /*< ID for client session */ client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); @@ -1105,11 +1110,11 @@ static int routeQuery( { atomic_add(&inst->stats.n_slave, 1); /** - * This backend_ref waits resultset, flag it. + * Add one waiter to backend reference */ bref_set_state(get_bref_from_dcb(router_cli_ses, slave_dcb), - BREF_WAITING_RESULT); + BREF_WAITING_RESULT); } else { @@ -1154,15 +1159,14 @@ static int routeQuery( { succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); } + if (succp) { - if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); /** - * This backend_ref waits reply to write stmt, - * flag it. + * Add one waiter to backend reference. */ bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), BREF_WAITING_RESULT); @@ -1368,8 +1372,7 @@ static void clientReply ( scur = &bref->bref_sescmd_cur; /** * Active cursor means that reply is from session command - * execution. Majority of the time there are no session commands - * being executed. + * execution. */ if (sescmd_cursor_is_active(scur)) { @@ -1380,8 +1383,7 @@ static void clientReply ( (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); size_t len = MYSQL_GET_PACKET_LEN(buf); char* cmdstr = (char *)malloc(len+1); - ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; - + snprintf(cmdstr, len+1, "%s", &buf[5]); LOGIF(LE, (skygw_log_write_flush( @@ -1393,13 +1395,13 @@ static void clientReply ( free(cmdstr); /** Inform the client */ - handle_error_reply_client(ses,writebuf); + handle_error_reply_client(ses, writebuf); /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); goto lock_failed; } - else + else if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) { /** * Discard all those responses that have already been sent to @@ -1407,8 +1409,12 @@ static void clientReply ( * needs to be sent to client or NULL. */ writebuf = sescmd_cursor_process_replies(client_dcb, - writebuf, - bref); + writebuf, + bref); + } + else + { + ss_dassert(false); } } @@ -1416,7 +1422,9 @@ static void clientReply ( { /** Write reply to client DCB */ SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); +#if 0 bref_clear_state(bref, BREF_WAITING_RESULT); +#endif } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1427,7 +1435,7 @@ static void clientReply ( /** Log to debug that router was closed */ goto lock_failed; } - /** There is one pending session command to be xexecuted. */ + /** There is one pending session command to be executed. */ if (sescmd_cursor_is_active(scur)) { bool succp; @@ -1479,21 +1487,78 @@ int bref_cmp_behind_master( return 1; } +int bref_cmp_current_load( + const void* bref1, + const void* bref2) +{ + SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server; + SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server; + + return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 : + ((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0)); +} + + static void bref_clear_state( backend_ref_t* bref, bref_state_t state) { - bref->bref_state &= ~state; + if (state != BREF_WAITING_RESULT) + { + bref->bref_state &= ~state; + } + else + { + int prev1; + int prev2; + + prev1 = atomic_add(&bref->bref_num_result_wait, -1); + ss_dassert(prev1 >= 0); + prev2 = atomic_add( + &bref->bref_backend->backend_server->stats.n_current_ops, -1); + ss_dassert(prev2 >= 0); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Current waiters %d and ops %d after in %s:%d", + prev1-1, + prev2-1, + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + } } static void bref_set_state( backend_ref_t* bref, bref_state_t state) { - bref->bref_state |= state; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Set state %d for %s:%d fd %d", + if (state != BREF_WAITING_RESULT) + { + bref->bref_state |= state; + } + else + { + int prev1; + int prev2; + + prev1 = atomic_add(&bref->bref_num_result_wait, 1); + ss_dassert(prev1 >= 0); + prev2 = atomic_add( + &bref->bref_backend->backend_server->stats.n_current_ops, 1); + ss_dassert(prev2 >= 0); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Current waiters %d and ops %d before in %s:%d", + prev1, + prev2, + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + } + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [bref_set_state] Set state %d for %s:%d fd %d", + pthread_self(), bref->bref_state, bref->bref_backend->backend_server->name, bref->bref_backend->backend_server->port, @@ -1662,6 +1727,13 @@ static bool select_connect_backend_servers( b->backend_conn_count))); break; + case LEAST_CURRENT_OPERATIONS: + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "%s %d:%d", + b->backend_server->name, + b->backend_server->port, + b->backend_server->stats.n_current_ops))); + break; default: break; } @@ -1681,13 +1753,11 @@ static bool select_connect_backend_servers( LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "Examine server " - "%s:%d %s with %d connections. " - "router->bitvalue is %d", + "%s:%d %s with %d active operations.", b->backend_server->name, b->backend_server->port, STRSRVSTATUS(b->backend_server), - b->backend_conn_count, - router->bitmask))); + b->backend_server->stats.n_current_ops))); if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == @@ -2195,35 +2265,27 @@ static GWBUF* sescmd_cursor_process_replies( /** Faster backend has already responded to client : discard */ if (scmd->my_sescmd_is_replied) { + bool last_packet = false; + CHK_GWBUF(replybuf); packet = (uint8_t *)GWBUF_DATA(replybuf); - /** - * If it is response to MYSQL_COM_STMT_PREPARE, then buffer - * only includes the response. - */ - if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type == - MYSQL_COM_STMT_PREPARE) + + while (!last_packet) { - while (replybuf != NULL) - { -#if defined(SS_DEBUG) - int buflen; - - buflen = GWBUF_LENGTH(replybuf); - replybuf = gwbuf_consume(replybuf, buflen); -#else - replybuf = gwbuf_consume( - replybuf, - GWBUF_LENGTH(replybuf)); + int buflen; + + buflen = GWBUF_LENGTH(replybuf); + + ss_debug(GWBUF_IS_TYPE_SESCMD_RESPONSE(replybuf) == + (scmd->my_sescmd_packet_type == MYSQL_COM_STMT_PREPARE)); + + last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); + /** discard packet */ + replybuf = gwbuf_consume(replybuf, buflen); + } +#if 0 + bref_clear_state(bref, BREF_WAITING_RESULT); #endif - } - } - /** Only consume the leading packet */ - else - { - packetlen = packet[0]+packet[1]*256+packet[2]*256*256; - replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - } } /** Response is in the buffer and it will be sent to client. */ else if (replybuf != NULL) @@ -2408,7 +2470,7 @@ static bool execute_sescmd_in_backend( goto return_succp; } - + if (!sescmd_cursor_is_active(scur)) { /** Cursor is left active when function returns. */ @@ -2425,9 +2487,11 @@ static bool execute_sescmd_in_backend( uint8_t* ptr = GWBUF_DATA(tmpbuf); unsigned char cmd = MYSQL_GET_COMMAND(ptr); - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Just before write, fd %d : cmd %s.", + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [execute_sescmd_in_backend] Just before write, fd " + "%d : cmd %s.", + pthread_self(), dcb->fd, STRPACKETTYPE(cmd)))); } @@ -2444,6 +2508,11 @@ static bool execute_sescmd_in_backend( case MYSQL_COM_QUERY: case MYSQL_COM_INIT_DB: default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); rc = dcb->func.write( dcb, sescmd_cursor_clone_querybuf(scur)); @@ -2723,14 +2792,13 @@ static bool route_session_write( rses_property_done(prop); succp = false; goto return_succp; - } - - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + } /** * Additional reference is created to querybuf to * prevent it from being released before properties * are cleaned up as a part of router sessionclean-up. */ + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); /** Add sescmd property to router client session */ @@ -2745,12 +2813,11 @@ static bool route_session_write( scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); /** - * This backend_ref waits reply, flag it. + * Add one waiter to backend reference. */ bref_set_state(get_bref_from_dcb(router_cli_ses, backend_ref[i].bref_dcb), - BREF_WAITING_RESULT); - + BREF_WAITING_RESULT); /** * Start execution if cursor is not already executing. * Otherwise, cursor will execute pending commands @@ -2820,6 +2887,7 @@ static void rwsplit_process_options( c == LEAST_GLOBAL_CONNECTIONS || c == LEAST_ROUTER_CONNECTIONS || c == LEAST_BEHIND_MASTER || + c == LEAST_CURRENT_OPERATIONS || c == UNDEFINED_CRITERIA); if (c == UNDEFINED_CRITERIA) @@ -2829,7 +2897,7 @@ static void rwsplit_process_options( "slave selection criteria \"%s\". " "Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " "LEAST_ROUTER_CONNECTIONS, " - "and \"LEAST_ROUTER_CONNECTIONS\".", + "and \"LEAST_CURRENT_OPERATIONS\".", STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); } else @@ -2957,7 +3025,6 @@ static bool handle_error_new_connection( /** failed DCB has already been replaced */ if (bref == NULL) { - rses_end_locked_router_action(rses); succp = true; goto return_succp; } @@ -2968,7 +3035,6 @@ static bool handle_error_new_connection( */ if (backend_dcb->state != DCB_STATE_POLLING) { - rses_end_locked_router_action(rses); succp = true; goto return_succp; } @@ -2980,7 +3046,9 @@ static bool handle_error_new_connection( DCB* client_dcb; client_dcb = ses->client; client_dcb->func.write(client_dcb, errmsg); - bref_clear_state(bref, BREF_WAITING_RESULT); +#if 0 + bref_clear_state(bref, BREF_WAITING_RESULT); +#endif } bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED);