From edf9f0c59c18977a63831b3dc2cf533652ada323 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 27 Jun 2014 10:50:56 +0300 Subject: [PATCH 01/11] Listed and briefly described the max_slave_connections and router_options=slave_selection_criteria parameters. --- server/MaxScale_template.cnf | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/MaxScale_template.cnf b/server/MaxScale_template.cnf index 94981afbc..71afbbdbb 100644 --- a/server/MaxScale_template.cnf +++ b/server/MaxScale_template.cnf @@ -35,6 +35,13 @@ passwd=maxpwd # Valid options are: # # router= +# max_slave_connections= +# router_options=,,... +# valid options include:master,slave,synced, +# slave_selection_criteria= +# LEAST_CURRENT_OPERATIONS, +# LEAST_ROUTER_CONNECTIONS, +# LEAST_GLOBAL_CONNECTIONS # servers=,,... # user= # passwd= From 20637ee2240e31e56fc12b62a0444b609f55abd3 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Fri, 27 Jun 2014 10:54:11 +0300 Subject: [PATCH 02/11] Removed unnecessary SERVER pointer from dcb.c:dcb_call_foreach(), changed declaration and call accordingly. 
--- server/core/dcb.c | 1 - server/include/dcb.h | 2 ++ server/modules/monitor/mysql_mon.c | 3 +-- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 5f4c9fa15..6c6a6d152 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -1837,7 +1837,6 @@ static DCB* dcb_get_next ( } void dcb_call_foreach ( - SERVER* srv, DCB_REASON reason) { switch (reason) { diff --git a/server/include/dcb.h b/server/include/dcb.h index 816bc7c5b..406a588f2 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -296,6 +296,8 @@ bool dcb_set_state( DCB* dcb, dcb_state_t new_state, dcb_state_t* old_state); +void dcb_call_foreach (DCB_REASON reason); + /* DCB flags values */ diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 38625a6ed..8dd162253 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -324,7 +324,6 @@ unsigned long int server_version = 0; char *server_string; unsigned long id = handle->id; int replication_heartbeat = handle->replicationHeartbeat; -static int conn_err_count; if (database->server->monuser != NULL) { @@ -695,7 +694,7 @@ MONITOR_SERVERS *ptr; if (mon_status_changed(ptr)) { - dcb_call_foreach(ptr->server, DCB_REASON_NOT_RESPONDING); + dcb_call_foreach(DCB_REASON_NOT_RESPONDING); } if (mon_status_changed(ptr) || From fcf67716fd9daf4b6cb270d6a495d78cade78df9 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sun, 29 Jun 2014 22:21:30 +0300 Subject: [PATCH 03/11] Added mechanism for choosing slave for a query based on the current load in all connected slaves. Counting operations is not correctly done here. Reading values and choosing accordingly is done. Fixed several things in session command reply processing. 
--- server/core/buffer.c | 24 +- server/core/server.c | 4 +- server/include/buffer.h | 24 +- server/include/server.h | 1 + .../include/mysql_client_server_protocol.h | 27 +- server/modules/include/readwritesplit.h | 10 +- server/modules/protocol/mysql_backend.c | 293 +++++++++++------ server/modules/protocol/mysql_client.c | 5 +- server/modules/protocol/mysql_common.c | 306 ++++++++++++------ .../routing/readwritesplit/readwritesplit.c | 196 +++++++---- 10 files changed, 593 insertions(+), 297 deletions(-) diff --git a/server/core/buffer.c b/server/core/buffer.c index 2708d32ce..993f1d2af 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -152,6 +152,7 @@ GWBUF *gwbuf_clone_portion( } atomic_add(&buf->sbuf->refcount, 1); clonebuf->sbuf = buf->sbuf; + clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone info bits too */ clonebuf->start = (void *)((char*)buf->start)+start_offset; clonebuf->end = (void *)((char *)clonebuf->start)+length; clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */ @@ -232,12 +233,12 @@ GWBUF *ptr = head; if (!head) return tail; CHK_GWBUF(head); - CHK_GWBUF(tail); while (ptr->next) { ptr = ptr->next; } ptr->next = tail; + return head; } @@ -316,27 +317,12 @@ gwbuf_trim(GWBUF *buf, unsigned int n_bytes) return buf; } -bool gwbuf_set_type( +void gwbuf_set_type( GWBUF* buf, gwbuf_type_t type) { - bool succp; - CHK_GWBUF(buf); - - switch (type) { - case GWBUF_TYPE_MYSQL: - case GWBUF_TYPE_PLAINSQL: - case GWBUF_TYPE_UNDEFINED: - case GWBUF_TYPE_SINGLE_STMT: /*< buffer contains one stmt */ - buf->gwbuf_type |= type; - succp = true; - break; - default: - succp = false; - break; - } - ss_dassert(succp); - return succp; + CHK_GWBUF(buf); + buf->gwbuf_type |= type; } diff --git a/server/core/server.c b/server/core/server.c index c1bec6189..2abd0a11c 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -260,7 +260,8 @@ char *stat; } dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); 
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current); - ptr = ptr->next; + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", ptr->stats.n_current_ops); + ptr = ptr->next; } spinlock_release(&server_spin); } @@ -296,6 +297,7 @@ char *stat; } dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops); } /** diff --git a/server/include/buffer.h b/server/include/buffer.h index 3c4c022e3..9729c538c 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -46,16 +46,22 @@ typedef enum { - GWBUF_TYPE_UNDEFINED = 0x00, - GWBUF_TYPE_PLAINSQL = 0x01, - GWBUF_TYPE_MYSQL = 0x02, - GWBUF_TYPE_SINGLE_STMT = 0x04 + GWBUF_TYPE_UNDEFINED = 0x00, + GWBUF_TYPE_PLAINSQL = 0x01, + GWBUF_TYPE_MYSQL = 0x02, + GWBUF_TYPE_SINGLE_STMT = 0x04, + GWBUF_TYPE_SESCMD_RESPONSE = 0x08, + GWBUF_TYPE_RESPONSE_END = 0x10, + GWBUF_TYPE_SESCMD = 0x20 } gwbuf_type_t; -#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) -#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) -#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) -#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT) +#define GWBUF_IS_TYPE_UNDEFINED(b) (b->gwbuf_type == 0) +#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) +#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) +#define GWBUF_IS_TYPE_SINGLE_STMT(b) (b->gwbuf_type & GWBUF_TYPE_SINGLE_STMT) +#define GWBUF_IS_TYPE_SESCMD_RESPONSE(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD_RESPONSE) +#define GWBUF_IS_TYPE_RESPONSE_END(b) (b->gwbuf_type & GWBUF_TYPE_RESPONSE_END) +#define GWBUF_IS_TYPE_SESCMD(b) (b->gwbuf_type & GWBUF_TYPE_SESCMD) /** * A structure to encapsulate the data in a form that the data itself can be @@ -114,5 +120,5 @@ extern GWBUF *gwbuf_trim(GWBUF *head, unsigned int 
length); extern unsigned int gwbuf_length(GWBUF *head); extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type); -extern bool gwbuf_set_type(GWBUF *head, gwbuf_type_t type); +extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type); #endif diff --git a/server/include/server.h b/server/include/server.h index d32413bb7..3f95c99f0 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -47,6 +47,7 @@ typedef struct { int n_connections; /**< Number of connections */ int n_current; /**< Current connections */ + int n_current_ops; /**< Current active operations */ } SERVER_STATS; /** diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 1d3749cfb..ffde09871 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -66,6 +66,7 @@ #define GW_MYSQL_LOOP_TIMEOUT 300000000 #define GW_MYSQL_READ 0 #define GW_MYSQL_WRITE 1 +#define MYSQL_HEADER_LEN 4L #define GW_MYSQL_PROTOCOL_VERSION 10 // version is 10 #define GW_MYSQL_HANDSHAKE_FILLER 0x00 @@ -235,7 +236,8 @@ typedef enum mysql_server_cmd { MYSQL_COM_STMT_RESET, MYSQL_COM_SET_OPTION, MYSQL_COM_STMT_FETCH, - MYSQL_COM_DAEMON + MYSQL_COM_DAEMON, + MYSQL_COM_END /*< Must be the last */ } mysql_server_cmd_t; @@ -245,9 +247,10 @@ typedef enum mysql_server_cmd { * one MySQLProtocol and one server command list. 
*/ typedef struct server_command_st { - mysql_server_cmd_t cmd; - int nresponse_packets; /** filled when reply arrives */ - struct server_command_st* next; + mysql_server_cmd_t scom_cmd; + int scom_nresponse_packets; /*< packets in response */ + size_t scom_nbytes_to_read; /*< bytes left to read in current packet */ + struct server_command_st* scom_next; } server_command_t; /* @@ -262,6 +265,7 @@ typedef struct { * we are running on */ SPINLOCK protocol_lock; server_command_t protocol_command; /*< list of active commands */ + server_command_t* protocol_cmd_history; /*< command history list */ mysql_auth_state_t protocol_auth_state; /*< Authentication status */ uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, * created or received */ @@ -285,6 +289,7 @@ typedef struct { #define MYSQL_GET_STMTOK_NPARAM(payload) (gw_mysql_get_byte2(&payload[9])) #define MYSQL_GET_STMTOK_NATTR(payload) (gw_mysql_get_byte2(&payload[11])) #define MYSQL_IS_ERROR_PACKET(payload) (MYSQL_GET_COMMAND(payload)==0xff) +#define MYSQL_GET_NATTR(payload) ((int)payload[4]) #endif /** _MYSQL_PROTOCOL_H */ @@ -365,8 +370,16 @@ void protocol_add_srv_command(MySQLProtocol* p, mysql_server_cmd_t cmd); void protocol_remove_srv_command(MySQLProtocol* p); bool protocol_waits_response(MySQLProtocol* p); mysql_server_cmd_t protocol_get_srv_command(MySQLProtocol* p,bool removep); -int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); -int protocol_get_nresponse_packets (MySQLProtocol* p); -bool protocol_set_nresponse_packets (MySQLProtocol* p, int nresponse_packets); +int get_stmt_nresponse_packets(GWBUF* buf, mysql_server_cmd_t cmd); +bool protocol_get_response_status (MySQLProtocol* p, int* npackets, size_t* nbytes); +void protocol_set_response_status (MySQLProtocol* p, int npackets, size_t nbytes); +void protocol_archive_srv_command(MySQLProtocol* p); + + +void init_response_status ( + GWBUF* buf, + mysql_server_cmd_t cmd, + int* npackets, + size_t* nbytes); diff --git 
a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 4db72ae5e..2a7a46418 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -58,7 +58,7 @@ typedef enum bref_state { #define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE) #define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) +#define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0) #define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) typedef enum backend_type_t { @@ -90,9 +90,10 @@ typedef enum rses_property_type_t { typedef enum select_criteria { UNDEFINED_CRITERIA=0, LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */ - DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS, LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */ LEAST_BEHIND_MASTER, + LEAST_CURRENT_OPERATIONS, + DEFAULT_CRITERIA=LEAST_CURRENT_OPERATIONS, LAST_CRITERIA /*< not used except for an index */ } select_criteria_t; @@ -106,7 +107,9 @@ typedef enum select_criteria { strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \ LEAST_BEHIND_MASTER : ( \ strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \ - LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA))) + LEAST_ROUTER_CONNECTIONS : ( \ + strncmp(s,"LEAST_CURRENT_OPERATIONS", strlen("LEAST_CURRENT_OPERATIONS")) == 0 ? 
\ + LEAST_CURRENT_OPERATIONS : UNDEFINED_CRITERIA)))) /** * Session variable command @@ -190,6 +193,7 @@ typedef struct backend_ref_st { BACKEND* bref_backend; DCB* bref_dcb; bref_state_t bref_state; + int bref_num_result_wait; sescmd_cursor_t bref_sescmd_cur; #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index cdcdf2c05..8a29d12b8 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -65,6 +65,10 @@ static int gw_backend_hangup(DCB *dcb); static int backend_write_delayqueue(DCB *dcb); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); +static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process); + + + #if defined(NOT_USED) static int gw_session(DCB *backend_dcb, void *data); #endif @@ -416,21 +420,20 @@ static int gw_read_backend_event(DCB *dcb) { /* reading MySQL command output from backend and writing to the client */ { - GWBUF *readbuf = NULL; + GWBUF *read_buffer = NULL; ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; SESSION *session = dcb->session; int nbytes_read = 0; - mysql_server_cmd_t srvcmd = MYSQL_COM_UNDEFINED; - + CHK_SESSION(session); router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; /* read available backend data */ - rc = dcb_read(dcb, &readbuf); + rc = dcb_read(dcb, &read_buffer); if (rc < 0) { @@ -463,7 +466,7 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - nbytes_read = gwbuf_length(readbuf); + nbytes_read = gwbuf_length(read_buffer); if (nbytes_read == 0) { @@ -471,74 +474,41 @@ static int gw_read_backend_event(DCB *dcb) { } else { - ss_dassert(readbuf != NULL); + ss_dassert(read_buffer != NULL); } - /** - * ask for next response (1 
or more packets) like in - * gw_MySQL_get_next_packet but gw_MySQL_get_next_response - */ - srvcmd = protocol_get_srv_command((MySQLProtocol *)dcb->protocol, - false); - /** - * If backend DCB is waiting for response to COM_STMT_PREPARE, - * it, then only that must be passed to clientReply. - * - * If response consists of ses cmd response and response to - * COM_STMT_PREPARE, there can't be anything after - * COM_STMT_PREPARE response because whole buffer may be - * discarded since router doesn't know the borderlines of MySQL - * packets. - */ - - /** - * Read all packets from which belong to STMT PREPARE - * response. - * Move packets not belonging to STMT PREPARE response to - * dcb_readqueue. - * When whole response is read, pass forward to - * clientReply. - */ - if (srvcmd == MYSQL_COM_STMT_PREPARE) + /** Packet prefix was read earlier */ + if (dcb->dcb_readqueue) { - MySQLProtocol* p; - int nresponse_packets; - GWBUF* tmpbuf; + read_buffer = gwbuf_append(dcb->dcb_readqueue, read_buffer); + nbytes_read = gwbuf_length(read_buffer); - p = (MySQLProtocol *)dcb->protocol; - nresponse_packets = protocol_get_nresponse_packets(p); - - /** count only once per response */ - if (nresponse_packets == 0) - { - nresponse_packets = get_stmt_nresponse_packets( - readbuf, - srvcmd); - } - tmpbuf = gw_MySQL_get_packets(&readbuf, &nresponse_packets); - gwbuf_append(dcb->dcb_readqueue, readbuf); - readbuf = tmpbuf; - - /** contains incomplete response to STMT PREPARE */ - if (nresponse_packets != 0) + if (nbytes_read < 5) /*< read at least command type */ { rc = 0; - - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Backend fd %d read incomplete response packet. " - "Waiting %d more, cmd %s.", - dcb->fd, - nresponse_packets, - STRPACKETTYPE(srvcmd)))); - /** - * store the number of how many packets the - * reponse consists of to backend's protocol. 
- */ - protocol_set_nresponse_packets(p, nresponse_packets); goto return_rc; } - protocol_remove_srv_command((MySQLProtocol *)dcb->protocol); + /** There is at least length and command type. */ + else + { + read_buffer = dcb->dcb_readqueue; + dcb->dcb_readqueue = NULL; + } + } + else + { + if (nbytes_read < 5) + { + gwbuf_append(dcb->dcb_readqueue, read_buffer); + rc = 0; + goto return_rc; + } + } + /** If protocol has command set it is session command */ + if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != + MYSQL_COM_UNDEFINED) + { + read_buffer = process_response_data(dcb, read_buffer, nbytes_read); } /*< * If dcb->session->client is freed already it may be NULL. @@ -554,20 +524,20 @@ static int gw_read_backend_event(DCB *dcb) { if (client_protocol->protocol_auth_state == MYSQL_IDLE) { - gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); - - router->clientReply(router_instance, - rsession, - readbuf, - dcb); + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + router->clientReply( + router_instance, + rsession, + read_buffer, + dcb); rc = 1; } goto return_rc; } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { - gwbuf_set_type(readbuf, GWBUF_TYPE_MYSQL); - router->clientReply(router_instance, rsession, readbuf, dcb); + gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL); + router->clientReply(router_instance, rsession, read_buffer, dcb); rc = 1; } } @@ -672,7 +642,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) * If auth failed, return value is 0, write and buffered write * return 1. */ - switch(backend_protocol->protocol_auth_state) { + switch (backend_protocol->protocol_auth_state) { case MYSQL_AUTH_FAILED: { size_t len; @@ -712,22 +682,18 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb, dcb->fd, STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); + spinlock_release(&dcb->authlock); /** + * Statement type is used in readwrite split router. + * Command is *not* set for readconn router. 
+ * * Server commands are stored to MySQLProtocol structure - * if buffer always includes a single statement. That - * information is stored in GWBUF type field - * (GWBUF_TYPE_SINGLE_STMT bit). + * if buffer always includes a single statement. */ - if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) + if (GWBUF_IS_TYPE_SINGLE_STMT(queue) && + GWBUF_IS_TYPE_SESCMD(queue)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Write to backend's DCB fd %d " - "cmd %s protocol state %s.", - dcb->fd, - STRPACKETTYPE(cmd), - STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** Record the command to backend's protocol */ protocol_add_srv_command(backend_protocol, cmd); } @@ -751,21 +717,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb->fd, STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** - * Since it is known that buffer contains one complete - * command, store the command to backend's protocol. When - * backend server responses the command determines how - * response needs to be processed. This is mainly due to - * MYSQL_COM_STMT_PREPARE whose response consists of - * arbitrary number of packets. + * In case of session commands, store command to DCB's + * protocol struct. 
*/ - if (GWBUF_IS_TYPE_SINGLE_STMT(queue)) + if (GWBUF_IS_TYPE_SINGLE_STMT(queue) && + GWBUF_IS_TYPE_SESCMD(queue)) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Write to backend's delayqueue fd %d " - "protocol state %s.", - dcb->fd, - STRPROTOCOLSTATE(backend_protocol->protocol_auth_state)))); /** Record the command to backend's protocol */ protocol_add_srv_command(backend_protocol, cmd); } @@ -1010,7 +967,6 @@ gw_backend_close(DCB *dcb) DCB* client_dcb; SESSION* session; GWBUF* quitbuf; - bool succp; CHK_DCB(dcb); session = dcb->session; @@ -1095,7 +1051,6 @@ static int backend_write_delayqueue(DCB *dcb) ROUTER *router_instance = NULL; void *rsession = NULL; SESSION *session = dcb->session; - int receive_rc = 0; CHK_SESSION(session); @@ -1240,3 +1195,141 @@ static int gw_session(DCB *backend_dcb, void *data) { return 1; } */ + + +/** + * Move packets or parts of packets from redbuf to outbuf as the packet headers + * and lengths have been noticed and counted. + * Session commands need to be marked so that they can be handled properly in + * the router's clientReply. + * Return the pointer to outbuf. + */ +static GWBUF* process_response_data ( + DCB* dcb, + GWBUF* readbuf, + int nbytes_to_process) /*< number of new bytes read */ +{ + int npackets_left = 0; /*< response's packet count */ + size_t nbytes_left = 0; /*< nbytes to be read for the packet */ + MySQLProtocol* p; + GWBUF* outbuf = NULL; + + /** Get command which was stored in gw_MySQLWrite_backend */ + p = DCB_PROTOCOL(dcb, MySQLProtocol); + CHK_PROTOCOL(p); + + /** All buffers processed here are sescmd responses */ + gwbuf_set_type(readbuf, GWBUF_TYPE_SESCMD_RESPONSE); + + /** + * Now it is known how many packets there should be and how much + * is read earlier. + */ + while (nbytes_to_process != 0) + { + mysql_server_cmd_t srvcmd; + GWBUF* sparebuf; + bool succp; + + srvcmd = protocol_get_srv_command(p, false); + + /** + * Read values from protocol structure, fails if values are + * uninitialized. 
+ */ + if (npackets_left == 0) + { + succp = protocol_get_response_status(p, &npackets_left, &nbytes_left); + + if (!succp || npackets_left == 0) + { + /** + * Examine command type and the readbuf. Conclude response + * packet count from the command type or from the first + * packet content. Fails if read buffer doesn't include + * enough data to read the packet length. + */ + init_response_status(readbuf, srvcmd, &npackets_left, &nbytes_left); + } + } + /** Only session commands with responses should be processed */ + ss_dassert(npackets_left > 0); + + /** Read incomplete packet. */ + if (nbytes_left > nbytes_to_process) + { + /** Includes length info so it can be processed */ + if (nbytes_to_process >= 5) + { + /** discard source buffer */ + readbuf = gwbuf_consume(readbuf, GWBUF_LENGTH(readbuf)); + nbytes_left -= nbytes_to_process; + } + nbytes_to_process = 0; + } + /** Packet was read. All bytes belonged to the last packet. */ + else if (nbytes_left == nbytes_to_process) + { + nbytes_left = 0; + nbytes_to_process = 0; + ss_dassert(npackets_left > 0); + npackets_left -= 1; + outbuf = gwbuf_append(outbuf, readbuf); + readbuf = NULL; + } + /** + * Packet was read. There should be more since bytes were + * left over. + * Move the next packet to its own buffer and add that next + * to the prev packet's buffer. 
+ */ + else /*< nbytes_left < nbytes_to_process */ + { + size_t len = GWBUF_LENGTH(readbuf); + + nbytes_to_process -= nbytes_left; + + /** Move the prefix of the buffer to outbuf from redbuf */ + outbuf = gwbuf_append(outbuf, gwbuf_clone_portion(readbuf, 0, nbytes_left)); + readbuf = gwbuf_consume(readbuf, nbytes_left); + ss_dassert(npackets_left > 0); + npackets_left -= 1; + nbytes_left = 0; + } + + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + + /** A complete packet was read */ + if (nbytes_left == 0) + { + /** No more packets in this response */ + if (npackets_left == 0) + { + GWBUF* b = outbuf; + + while (b->next != NULL) + { + b = b->next; + } + /** Mark last as end of response */ + gwbuf_set_type(b, GWBUF_TYPE_RESPONSE_END); + + /** Archive the command */ + protocol_archive_srv_command(p); + } + /** Read next packet */ + else + { + uint8_t* data; + + /** Read next packet length */ + data = GWBUF_DATA(readbuf); + nbytes_left = MYSQL_GET_PACKET_LEN(data)+MYSQL_HEADER_LEN; + /** Store new status to protocol structure */ + protocol_set_response_status(p, npackets_left, nbytes_left); + } + } + } + return outbuf; +} diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 08c648d39..037cb0522 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -574,9 +574,8 @@ int gw_read_client_event( else { uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer); - size_t packetlen = MYSQL_GET_PACKET_LEN(data)+4; - if (nbytes_read < 3 || nbytes_read < packetlen) + if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4) { gwbuf_append(dcb->dcb_readqueue, read_buffer); rc = 0; @@ -1296,6 +1295,8 @@ static int gw_error_client_event( CHK_DCB(dcb); + session = dcb->session; + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_error_client_event] Error event handling for DCB %p " diff --git 
a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index dd677ec58..4ce34452b 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -41,6 +41,9 @@ extern int gw_write_backend_event(DCB *dcb); extern int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue); extern int gw_error_backend_event(DCB *dcb); +static server_command_t* server_command_init(server_command_t* srvcmd, + mysql_server_cmd_t cmd); + /** * Creates MySQL protocol structure @@ -77,7 +80,9 @@ MySQLProtocol* mysql_protocol_init( goto return_p; } p->protocol_auth_state = MYSQL_ALLOC; - p->protocol_command.cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_nresponse_packets = 0; + p->protocol_command.scom_nbytes_to_read = 0; #if defined(SS_DEBUG) p->protocol_chk_top = CHK_NUM_PROTOCOL; p->protocol_chk_tail = CHK_NUM_PROTOCOL; @@ -1557,6 +1562,94 @@ GWBUF* gw_MySQL_get_packets( } +static server_command_t* server_command_init( + server_command_t* srvcmd, + mysql_server_cmd_t cmd) +{ + server_command_t* c; + + if (srvcmd != NULL) + { + c = srvcmd; + } + else + { + c = (server_command_t *)malloc(sizeof(server_command_t)); + } + c->scom_cmd = cmd; + c->scom_nresponse_packets = -1; + c->scom_nbytes_to_read = 0; + c->scom_next = NULL; + + return c; +} + +static server_command_t* server_command_copy( + server_command_t* srvcmd) +{ + server_command_t* c = + (server_command_t *)malloc(sizeof(server_command_t)); + *c = *srvcmd; + + return c; +} + +#define MAX_CMD_HISTORY 10 + +void protocol_archive_srv_command( + MySQLProtocol* p) +{ + server_command_t* s1; + server_command_t** s2; + int len; + + spinlock_acquire(&p->protocol_lock); + + s1 = &p->protocol_command; + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Move command %s from fd %d to command history.", + STRPACKETTYPE(s1->scom_cmd), + p->owner_dcb->fd))); + + /** Copy to history list */ + s2 = &p->protocol_cmd_history; + + if (*s2 
!= NULL) + { + len = 0; + + while ((*s2)->scom_next != NULL) + { + *s2 = (*s2)->scom_next; + len += 1; + } + } + *s2 = server_command_copy(s1); + + /** Keep history limits, remove oldest */ + if (len > MAX_CMD_HISTORY) + { + server_command_t* c = p->protocol_cmd_history; + p->protocol_cmd_history = p->protocol_cmd_history->scom_next; + free(c); + } + + /** Remove from command list */ + if (s1->scom_next == NULL) + { + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; + } + else + { + p->protocol_command = *(s1->scom_next); + free(s1->scom_next); + } + spinlock_release(&p->protocol_lock); +} + + /** * If router expects to get separate, complete statements, add MySQL command * to MySQLProtocol structure. It is removed when response has arrived. @@ -1565,49 +1658,42 @@ void protocol_add_srv_command( MySQLProtocol* p, mysql_server_cmd_t cmd) { - spinlock_acquire(&p->protocol_lock); + server_command_t* c; - if (p->protocol_command.cmd == MYSQL_COM_UNDEFINED) + spinlock_acquire(&p->protocol_lock); + + /** this is the only server command in protocol */ + if (p->protocol_command.scom_cmd == MYSQL_COM_UNDEFINED) { - p->protocol_command.cmd = cmd; - p->protocol_command.nresponse_packets = 0; - - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Added command %s to fd %d.", - STRPACKETTYPE(cmd), - p->owner_dcb->fd))); + /** write into structure */ + server_command_init(&p->protocol_command, cmd); } else { - server_command_t* c = - (server_command_t *)malloc(sizeof(server_command_t)); - c->cmd = cmd; - c->nresponse_packets = 0; - c->next = NULL; - - p->protocol_command.next = c; - + /** add to the end of list */ + p->protocol_command.scom_next = server_command_init(NULL, cmd); + } + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Added command %s to fd %d.", + STRPACKETTYPE(cmd), + p->owner_dcb->fd))); + +#if defined(SS_DEBUG) + c = &p->protocol_command; + + while (c != NULL && c->scom_cmd != MYSQL_COM_UNDEFINED) + { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "Added 
another command %s to fd %d.", - STRPACKETTYPE(cmd), - p->owner_dcb->fd))); -#if defined(SS_DEBUG) - c = &p->protocol_command; - - while (c != NULL && c->cmd != MYSQL_COM_UNDEFINED) - { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "fd %d : %d %s", - p->owner_dcb->fd, - c->cmd, - STRPACKETTYPE(c->cmd)))); - c = c->next; - } -#endif + "fd %d : %d %s", + p->owner_dcb->fd, + c->scom_cmd, + STRPACKETTYPE(c->scom_cmd)))); + c = c->scom_next; } +#endif spinlock_release(&p->protocol_lock); } @@ -1628,17 +1714,17 @@ void protocol_remove_srv_command( LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Removed command %s from fd %d.", - STRPACKETTYPE(s->cmd), + STRPACKETTYPE(s->scom_cmd), p->owner_dcb->fd))); - if (s->next == NULL) + if (s->scom_next == NULL) { - p->protocol_command.cmd = MYSQL_COM_UNDEFINED; + p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; } else { - p->protocol_command = *(s->next); - free(s->next); + p->protocol_command = *(s->scom_next); + free(s->scom_next); } spinlock_release(&p->protocol_lock); @@ -1650,97 +1736,133 @@ mysql_server_cmd_t protocol_get_srv_command( { mysql_server_cmd_t cmd; - cmd = p->protocol_command.cmd; + cmd = p->protocol_command.scom_cmd; if (removep) { protocol_remove_srv_command(p); } - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Read command %s for fd %d.", + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [protocol_get_srv_command] Read command %s for fd %d.", + pthread_self(), STRPACKETTYPE(cmd), p->owner_dcb->fd))); return cmd; } -/** - * Return how many packets are included in the server's response. + +/** + * Examine command type and the readbuf. Conclude response + * packet count from the command type or from the first packet + * content. + * Fails if read buffer doesn't include enough data to read the + * packet length. 
*/ -int get_stmt_nresponse_packets( +void init_response_status ( GWBUF* buf, - mysql_server_cmd_t cmd) + mysql_server_cmd_t cmd, + int* npackets, + size_t* nbytes_left) { - int npackets; uint8_t* packet; int nparam; int nattr; uint8_t* data; - switch (cmd) { - case MYSQL_COM_STMT_PREPARE: - data = (uint8_t *)buf->start; - - if (data[4] == 0xff) - { - npackets = 1; /*< error packet */ - } - else - { + ss_dassert(gwbuf_length(buf) >= 3); + + data = (uint8_t *)buf->start; + + if (data[4] == 0xff) /*< error */ + { + *npackets = 1; + } + else + { + switch (cmd) { + case MYSQL_COM_STMT_PREPARE: packet = (uint8_t *)GWBUF_DATA(buf); /** ok + nparam + eof + nattr + eof */ nparam = MYSQL_GET_STMTOK_NPARAM(packet); nattr = MYSQL_GET_STMTOK_NATTR(packet); - npackets = 1 + nparam + MIN(1, nparam) + - nattr + MIN(nattr, 1); - ss_dassert(npackets<128); - } - break; - - default: - npackets = 1; - break; + *npackets = 1 + nparam + MIN(1, nparam) + + nattr + MIN(nattr, 1); + break; + + case MYSQL_COM_QUIT: + case MYSQL_COM_STMT_SEND_LONG_DATA: + case MYSQL_COM_STMT_CLOSE: + *npackets = 0; /*< these don't reply anything */ + break; + + default: + /** + * assume that other session commands respond + * OK or ERR + */ + *npackets = 1; + break; + } } - ss_dassert(npackets<128); - return npackets; + *nbytes_left = MYSQL_GET_PACKET_LEN(data) + MYSQL_HEADER_LEN; + /** + * There is at least one complete packet in the buffer so buffer is bigger + * than packet + */ + ss_dassert(*nbytes_left > 0); + ss_dassert(*npackets > 0); + ss_dassert(*npackets<128); } -int protocol_get_nresponse_packets ( - MySQLProtocol* p) -{ - int rval; - - CHK_PROTOCOL(p); - spinlock_acquire(&p->protocol_lock); - rval = p->protocol_command.nresponse_packets; - spinlock_release(&p->protocol_lock); - ss_dassert(rval<128); - - return rval; -} -bool protocol_set_nresponse_packets ( + +/** + * Read how many packets are left from current response and how many bytes there + * is still to be read from the current packet. 
+ */ +bool protocol_get_response_status ( MySQLProtocol* p, - int nresponse_packets) + int* npackets, + size_t* nbytes) { bool succp; CHK_PROTOCOL(p); + spinlock_acquire(&p->protocol_lock); - if (p->protocol_command.nresponse_packets > 0 && - nresponse_packets > p->protocol_command.nresponse_packets) + *npackets = p->protocol_command.scom_nresponse_packets; + *nbytes = p->protocol_command.scom_nbytes_to_read; + spinlock_release(&p->protocol_lock); + + if (*npackets < 0 && *nbytes == 0) { succp = false; } else { - p->protocol_command.nresponse_packets = nresponse_packets; - ss_dassert(nresponse_packets<128); succp = true; } - spinlock_release(&p->protocol_lock); - + return succp; } +void protocol_set_response_status ( + MySQLProtocol* p, + int npackets_left, + size_t nbytes) +{ + + CHK_PROTOCOL(p); + + spinlock_acquire(&p->protocol_lock); + + p->protocol_command.scom_nbytes_to_read = nbytes; + ss_dassert(p->protocol_command.scom_nbytes_to_read >= 0); + + p->protocol_command.scom_nresponse_packets = npackets_left; + + spinlock_release(&p->protocol_lock); +} + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index d00d8d7df..38bfdba66 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -115,12 +115,18 @@ int bref_cmp_behind_master( const void* bref1, const void* bref2); +int bref_cmp_current_load( + const void* bref1, + const void* bref2); + + int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= { NULL, bref_cmp_global_conn, bref_cmp_router_conn, - bref_cmp_behind_master + bref_cmp_behind_master, + bref_cmp_current_load }; static bool select_connect_backend_servers( @@ -460,7 +466,6 @@ static void* newSession( int max_nslaves; /*< max # of slaves used in this session */ int i; const int min_nservers = 1; /*< hard-coded for now */ - static uint64_t router_client_ses_seq; /*< ID for client session */ 
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); @@ -1105,11 +1110,11 @@ static int routeQuery( { atomic_add(&inst->stats.n_slave, 1); /** - * This backend_ref waits resultset, flag it. + * Add one waiter to backend reference */ bref_set_state(get_bref_from_dcb(router_cli_ses, slave_dcb), - BREF_WAITING_RESULT); + BREF_WAITING_RESULT); } else { @@ -1154,15 +1159,14 @@ static int routeQuery( { succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); } + if (succp) { - if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); /** - * This backend_ref waits reply to write stmt, - * flag it. + * Add one waiter to backend reference. */ bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), BREF_WAITING_RESULT); @@ -1368,8 +1372,7 @@ static void clientReply ( scur = &bref->bref_sescmd_cur; /** * Active cursor means that reply is from session command - * execution. Majority of the time there are no session commands - * being executed. + * execution. */ if (sescmd_cursor_is_active(scur)) { @@ -1380,8 +1383,7 @@ static void clientReply ( (uint8_t *)GWBUF_DATA((scur->scmd_cur_cmd->my_sescmd_buf)); size_t len = MYSQL_GET_PACKET_LEN(buf); char* cmdstr = (char *)malloc(len+1); - ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; - + snprintf(cmdstr, len+1, "%s", &buf[5]); LOGIF(LE, (skygw_log_write_flush( @@ -1393,13 +1395,13 @@ static void clientReply ( free(cmdstr); /** Inform the client */ - handle_error_reply_client(ses,writebuf); + handle_error_reply_client(ses, writebuf); /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); goto lock_failed; } - else + else if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) { /** * Discard all those responses that have already been sent to @@ -1407,8 +1409,12 @@ static void clientReply ( * needs to be sent to client or NULL. 
*/ writebuf = sescmd_cursor_process_replies(client_dcb, - writebuf, - bref); + writebuf, + bref); + } + else + { + ss_dassert(false); } } @@ -1416,7 +1422,9 @@ static void clientReply ( { /** Write reply to client DCB */ SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); +#if 0 bref_clear_state(bref, BREF_WAITING_RESULT); +#endif } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1427,7 +1435,7 @@ static void clientReply ( /** Log to debug that router was closed */ goto lock_failed; } - /** There is one pending session command to be xexecuted. */ + /** There is one pending session command to be executed. */ if (sescmd_cursor_is_active(scur)) { bool succp; @@ -1479,21 +1487,78 @@ int bref_cmp_behind_master( return 1; } +int bref_cmp_current_load( + const void* bref1, + const void* bref2) +{ + SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server; + SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server; + + return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 : + ((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 
1 : 0)); +} + + static void bref_clear_state( backend_ref_t* bref, bref_state_t state) { - bref->bref_state &= ~state; + if (state != BREF_WAITING_RESULT) + { + bref->bref_state &= ~state; + } + else + { + int prev1; + int prev2; + + prev1 = atomic_add(&bref->bref_num_result_wait, -1); + ss_dassert(prev1 >= 0); + prev2 = atomic_add( + &bref->bref_backend->backend_server->stats.n_current_ops, -1); + ss_dassert(prev2 >= 0); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Current waiters %d and ops %d after in %s:%d", + prev1-1, + prev2-1, + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + } } static void bref_set_state( backend_ref_t* bref, bref_state_t state) { - bref->bref_state |= state; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Set state %d for %s:%d fd %d", + if (state != BREF_WAITING_RESULT) + { + bref->bref_state |= state; + } + else + { + int prev1; + int prev2; + + prev1 = atomic_add(&bref->bref_num_result_wait, 1); + ss_dassert(prev1 >= 0); + prev2 = atomic_add( + &bref->bref_backend->backend_server->stats.n_current_ops, 1); + ss_dassert(prev2 >= 0); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Current waiters %d and ops %d before in %s:%d", + prev1, + prev2, + bref->bref_backend->backend_server->name, + bref->bref_backend->backend_server->port))); + } + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [bref_set_state] Set state %d for %s:%d fd %d", + pthread_self(), bref->bref_state, bref->bref_backend->backend_server->name, bref->bref_backend->backend_server->port, @@ -1662,6 +1727,13 @@ static bool select_connect_backend_servers( b->backend_conn_count))); break; + case LEAST_CURRENT_OPERATIONS: + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "%s %d:%d", + b->backend_server->name, + b->backend_server->port, + b->backend_server->stats.n_current_ops))); + break; default: break; } @@ -1681,13 +1753,11 @@ static bool select_connect_backend_servers( LOGIF(LT, (skygw_log_write_flush( 
LOGFILE_TRACE, "Examine server " - "%s:%d %s with %d connections. " - "router->bitvalue is %d", + "%s:%d %s with %d active operations.", b->backend_server->name, b->backend_server->port, STRSRVSTATUS(b->backend_server), - b->backend_conn_count, - router->bitmask))); + b->backend_server->stats.n_current_ops))); if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == @@ -2195,35 +2265,27 @@ static GWBUF* sescmd_cursor_process_replies( /** Faster backend has already responded to client : discard */ if (scmd->my_sescmd_is_replied) { + bool last_packet = false; + CHK_GWBUF(replybuf); packet = (uint8_t *)GWBUF_DATA(replybuf); - /** - * If it is response to MYSQL_COM_STMT_PREPARE, then buffer - * only includes the response. - */ - if (scmd->my_sescmd_prop->rses_prop_data.sescmd.my_sescmd_packet_type == - MYSQL_COM_STMT_PREPARE) + + while (!last_packet) { - while (replybuf != NULL) - { -#if defined(SS_DEBUG) - int buflen; - - buflen = GWBUF_LENGTH(replybuf); - replybuf = gwbuf_consume(replybuf, buflen); -#else - replybuf = gwbuf_consume( - replybuf, - GWBUF_LENGTH(replybuf)); + int buflen; + + buflen = GWBUF_LENGTH(replybuf); + + ss_debug(GWBUF_IS_TYPE_SESCMD_RESPONSE(replybuf) == + (scmd->my_sescmd_packet_type == MYSQL_COM_STMT_PREPARE)); + + last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); + /** discard packet */ + replybuf = gwbuf_consume(replybuf, buflen); + } +#if 0 + bref_clear_state(bref, BREF_WAITING_RESULT); #endif - } - } - /** Only consume the leading packet */ - else - { - packetlen = packet[0]+packet[1]*256+packet[2]*256*256; - replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - } } /** Response is in the buffer and it will be sent to client. */ else if (replybuf != NULL) @@ -2408,7 +2470,7 @@ static bool execute_sescmd_in_backend( goto return_succp; } - + if (!sescmd_cursor_is_active(scur)) { /** Cursor is left active when function returns. 
*/ @@ -2425,9 +2487,11 @@ static bool execute_sescmd_in_backend( uint8_t* ptr = GWBUF_DATA(tmpbuf); unsigned char cmd = MYSQL_GET_COMMAND(ptr); - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Just before write, fd %d : cmd %s.", + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [execute_sescmd_in_backend] Just before write, fd " + "%d : cmd %s.", + pthread_self(), dcb->fd, STRPACKETTYPE(cmd)))); } @@ -2444,6 +2508,11 @@ static bool execute_sescmd_in_backend( case MYSQL_COM_QUERY: case MYSQL_COM_INIT_DB: default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(scur->scmd_cur_cmd->my_sescmd_buf, GWBUF_TYPE_SESCMD); rc = dcb->func.write( dcb, sescmd_cursor_clone_querybuf(scur)); @@ -2723,14 +2792,13 @@ static bool route_session_write( rses_property_done(prop); succp = false; goto return_succp; - } - - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + } /** * Additional reference is created to querybuf to * prevent it from being released before properties * are cleaned up as a part of router sessionclean-up. */ + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); /** Add sescmd property to router client session */ @@ -2745,12 +2813,11 @@ static bool route_session_write( scur = backend_ref_get_sescmd_cursor(&backend_ref[i]); /** - * This backend_ref waits reply, flag it. + * Add one waiter to backend reference. */ bref_set_state(get_bref_from_dcb(router_cli_ses, backend_ref[i].bref_dcb), - BREF_WAITING_RESULT); - + BREF_WAITING_RESULT); /** * Start execution if cursor is not already executing. 
* Otherwise, cursor will execute pending commands @@ -2820,6 +2887,7 @@ static void rwsplit_process_options( c == LEAST_GLOBAL_CONNECTIONS || c == LEAST_ROUTER_CONNECTIONS || c == LEAST_BEHIND_MASTER || + c == LEAST_CURRENT_OPERATIONS || c == UNDEFINED_CRITERIA); if (c == UNDEFINED_CRITERIA) @@ -2829,7 +2897,7 @@ static void rwsplit_process_options( "slave selection criteria \"%s\". " "Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " "LEAST_ROUTER_CONNECTIONS, " - "and \"LEAST_ROUTER_CONNECTIONS\".", + "and \"LEAST_CURRENT_OPERATIONS\".", STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); } else @@ -2957,7 +3025,6 @@ static bool handle_error_new_connection( /** failed DCB has already been replaced */ if (bref == NULL) { - rses_end_locked_router_action(rses); succp = true; goto return_succp; } @@ -2968,7 +3035,6 @@ static bool handle_error_new_connection( */ if (backend_dcb->state != DCB_STATE_POLLING) { - rses_end_locked_router_action(rses); succp = true; goto return_succp; } @@ -2980,7 +3046,9 @@ static bool handle_error_new_connection( DCB* client_dcb; client_dcb = ses->client; client_dcb->func.write(client_dcb, errmsg); - bref_clear_state(bref, BREF_WAITING_RESULT); +#if 0 + bref_clear_state(bref, BREF_WAITING_RESULT); +#endif } bref_clear_state(bref, BREF_IN_USE); bref_set_state(bref, BREF_CLOSED); From 862d504a746dc871bbcb105e826c4d6a7f58cf1b Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 30 Jun 2014 11:00:04 +0300 Subject: [PATCH 04/11] Enabled set and clear the flag BREF_WAITING_RESULT. It is actually a counter which is modified with atomic operations only. Setting and clearing BREF_WAITING_RESULT also includes atomic increment and decrement of corresponding backend server's current operations counter, backend_server->stats.n_current_ops. 
--- .../routing/readwritesplit/readwritesplit.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 38bfdba66..1fac99e2d 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1417,12 +1417,13 @@ static void clientReply ( ss_dassert(false); } } - + if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); -#if 0 +#if 1 + /** Set response status as replied */ bref_clear_state(bref, BREF_WAITING_RESULT); #endif } @@ -1512,8 +1513,11 @@ static void bref_clear_state( int prev1; int prev2; + /** Decrease waiter count */ prev1 = atomic_add(&bref->bref_num_result_wait, -1); ss_dassert(prev1 >= 0); + + /** Decrease global operation count */ prev2 = atomic_add( &bref->bref_backend->backend_server->stats.n_current_ops, -1); ss_dassert(prev2 >= 0); @@ -1541,8 +1545,11 @@ static void bref_set_state( int prev1; int prev2; + /** Increase waiter count */ prev1 = atomic_add(&bref->bref_num_result_wait, 1); ss_dassert(prev1 >= 0); + + /** Increase global operation count */ prev2 = atomic_add( &bref->bref_backend->backend_server->stats.n_current_ops, 1); ss_dassert(prev2 >= 0); @@ -2283,7 +2290,8 @@ static GWBUF* sescmd_cursor_process_replies( /** discard packet */ replybuf = gwbuf_consume(replybuf, buflen); } -#if 0 +#if 1 + /** Set response status received */ bref_clear_state(bref, BREF_WAITING_RESULT); #endif } @@ -3046,7 +3054,7 @@ static bool handle_error_new_connection( DCB* client_dcb; client_dcb = ses->client; client_dcb->func.write(client_dcb, errmsg); -#if 0 +#if 1 bref_clear_state(bref, BREF_WAITING_RESULT); #endif } From 3e3c1af21134761caae21c0b8148e4e0719678b5 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 30 Jun 2014 13:43:25 +0300 Subject: [PATCH 05/11] Clean up 
--- server/modules/protocol/mysql_backend.c | 5 ----- server/modules/protocol/mysql_client.c | 2 -- 2 files changed, 7 deletions(-) diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 8a29d12b8..255b205c1 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -754,7 +754,6 @@ static int gw_error_backend_event(DCB *dcb) void* rsession; ROUTER_OBJECT* router; ROUTER* router_instance; - int rc = 0; GWBUF* errbuf; bool succp; @@ -909,7 +908,6 @@ gw_backend_hangup(DCB *dcb) void* rsession; ROUTER_OBJECT* router; ROUTER* router_instance; - int rc = 0; bool succp; GWBUF* errbuf; @@ -1228,7 +1226,6 @@ static GWBUF* process_response_data ( while (nbytes_to_process != 0) { mysql_server_cmd_t srvcmd; - GWBUF* sparebuf; bool succp; srvcmd = protocol_get_srv_command(p, false); @@ -1285,8 +1282,6 @@ static GWBUF* process_response_data ( */ else /*< nbytes_left < nbytes_to_process */ { - size_t len = GWBUF_LENGTH(readbuf); - nbytes_to_process -= nbytes_left; /** Move the prefix of the buffer to outbuf from redbuf */ diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 037cb0522..7da257b4b 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -1290,7 +1290,6 @@ return_rc: static int gw_error_client_event( DCB* dcb) { - int rc; SESSION* session; CHK_DCB(dcb); @@ -1363,7 +1362,6 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - int rc; SESSION* session; CHK_DCB(dcb); From 6d672cb967941e9602e3a947803c71ee0d2cf8c2 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 30 Jun 2014 13:44:34 +0300 Subject: [PATCH 06/11] Added status bit BREF_QUERY_ACTIVE to indicate if a query, other than session command, sent to backend for execution. The flag is cleared when the first packet belonging to the response arrives. 
The flag is part of the active operation counting, which is utilized in load balancing. The active operation count per backend is used by default as criteria when router chooses to which backend a query should be routed. --- server/modules/include/readwritesplit.h | 6 +- .../routing/readwritesplit/readwritesplit.c | 74 ++++++++++--------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 2a7a46418..9f8e3ce30 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -52,13 +52,15 @@ typedef enum prep_stmt_state { typedef enum bref_state { BREF_IN_USE = 0x01, - BREF_WAITING_RESULT = 0x02, /*< for anything that responds */ - BREF_CLOSED = 0x04 + BREF_WAITING_RESULT = 0x02, /*< for session commands only */ + BREF_QUERY_ACTIVE = 0x04, /*< for other queries */ + BREF_CLOSED = 0x08 } bref_state_t; #define BREF_IS_NOT_USED(s) (s->bref_state & ~BREF_IN_USE) #define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) #define BREF_IS_WAITING_RESULT(s) (s->bref_num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) (s->bref_state & BREF_QUERY_ACTIVE) #define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) typedef enum backend_type_t { diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 1fac99e2d..b25469967 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -216,10 +216,7 @@ static mysql_sescmd_t* sescmd_cursor_get_command( static bool sescmd_cursor_next( sescmd_cursor_t* scur); -static GWBUF* sescmd_cursor_process_replies( - DCB* client_dcb, - GWBUF* replybuf, - backend_ref_t* bref); +static GWBUF* sescmd_cursor_process_replies(GWBUF* replybuf, backend_ref_t* bref); static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, @@ -1108,13 +1105,15 @@ static int routeQuery( { if ((ret 
= slave_dcb->func.write(slave_dcb, querybuf)) == 1) { + backend_ref_t* bref; + atomic_add(&inst->stats.n_slave, 1); /** - * Add one waiter to backend reference + * Add one query response waiter to backend reference */ - bref_set_state(get_bref_from_dcb(router_cli_ses, - slave_dcb), - BREF_WAITING_RESULT); + bref = get_bref_from_dcb(router_cli_ses, slave_dcb); + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); } else { @@ -1164,12 +1163,16 @@ static int routeQuery( { if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { + backend_ref_t* bref; + atomic_add(&inst->stats.n_master, 1); + /** - * Add one waiter to backend reference. + * Add one write response waiter to backend reference */ - bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), - BREF_WAITING_RESULT); + bref = get_bref_from_dcb(router_cli_ses, master_dcb); + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); } } rses_end_locked_router_action(router_cli_ses); @@ -1408,24 +1411,38 @@ static void clientReply ( * the client. Return with buffer including response that * needs to be sent to client or NULL. */ - writebuf = sescmd_cursor_process_replies(client_dcb, - writebuf, - bref); + writebuf = sescmd_cursor_process_replies(writebuf, bref); } else { ss_dassert(false); } + /** + * If response will be sent to client, decrease waiter count. + * This applies to session commands only. Counter decrement + * for other type of queries is done outside this block. + */ + if (writebuf != NULL && client_dcb != NULL) + { + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } } - + /** + * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. + * This applies for queries other than session commands. 
+ */ + else if (BREF_IS_QUERY_ACTIVE(bref)) + { + bref_clear_state(bref, BREF_QUERY_ACTIVE); + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } + if (writebuf != NULL && client_dcb != NULL) { /** Write reply to client DCB */ SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); -#if 1 - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); -#endif } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1449,6 +1466,8 @@ static void clientReply ( bref->bref_backend->backend_server->port))); succp = execute_sescmd_in_backend(bref); + + ss_dassert(succp); } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1515,12 +1534,12 @@ static void bref_clear_state( /** Decrease waiter count */ prev1 = atomic_add(&bref->bref_num_result_wait, -1); - ss_dassert(prev1 >= 0); + ss_dassert(prev1 > 0); /** Decrease global operation count */ prev2 = atomic_add( &bref->bref_backend->backend_server->stats.n_current_ops, -1); - ss_dassert(prev2 >= 0); + ss_dassert(prev2 > 0); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, @@ -2246,13 +2265,9 @@ static void mysql_sescmd_done( * 9. 
s+q+ */ static GWBUF* sescmd_cursor_process_replies( - DCB* client_dcb, GWBUF* replybuf, backend_ref_t* bref) { - const size_t headerlen = 4; /*< mysql packet header */ - size_t packetlen; - uint8_t* packet; mysql_sescmd_t* scmd; sescmd_cursor_t* scur; @@ -2260,7 +2275,6 @@ static GWBUF* sescmd_cursor_process_replies( ss_dassert(SPINLOCK_IS_LOCKED(&(scur->scmd_cur_rses->rses_lock))); scmd = sescmd_cursor_get_command(scur); - CHK_DCB(client_dcb); CHK_GWBUF(replybuf); /** @@ -2275,25 +2289,18 @@ static GWBUF* sescmd_cursor_process_replies( bool last_packet = false; CHK_GWBUF(replybuf); - packet = (uint8_t *)GWBUF_DATA(replybuf); while (!last_packet) { int buflen; buflen = GWBUF_LENGTH(replybuf); - - ss_debug(GWBUF_IS_TYPE_SESCMD_RESPONSE(replybuf) == - (scmd->my_sescmd_packet_type == MYSQL_COM_STMT_PREPARE)); - last_packet = GWBUF_IS_TYPE_RESPONSE_END(replybuf); /** discard packet */ replybuf = gwbuf_consume(replybuf, buflen); } -#if 1 /** Set response status received */ bref_clear_state(bref, BREF_WAITING_RESULT); -#endif } /** Response is in the buffer and it will be sent to client. 
*/ else if (replybuf != NULL) @@ -2941,7 +2948,6 @@ static void handleError ( error_action_t action, bool* succp) { - DCB* client_dcb; SESSION* session; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; From d40711f2192a538a63b13028cddd3feac57ac852 Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Mon, 30 Jun 2014 17:04:26 +0200 Subject: [PATCH 07/11] failure message in start, for any error failure message in start, for any error --- etc/init.d/maxscale | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/etc/init.d/maxscale b/etc/init.d/maxscale index d694fa3ba..42ae02f31 100644 --- a/etc/init.d/maxscale +++ b/etc/init.d/maxscale @@ -70,13 +70,13 @@ start() { [ $CHECK_RET -eq 0 ] && echo -n $my_check && success fi - echo - # Return rigth code if [ $RETVAL -ne 0 ]; then + failure RETVAL=$_RETVAL_NOT_RUNNING fi + echo return $RETVAL } From 6c1960e53b235d4116f7dec9b5fe0ce6ac98f502 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Mon, 30 Jun 2014 16:16:27 +0100 Subject: [PATCH 08/11] Increase accuracy of percentages for connection distribution. 
--- server/modules/routing/readconnroute.c | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 0c4a4e678..71d529d2b 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -238,7 +238,7 @@ char *weightby; } inst->servers[n]->server = server; inst->servers[n]->current_connection_count = 0; - inst->servers[n]->weight = 100; + inst->servers[n]->weight = 1000; n++; } inst->servers[n] = NULL; @@ -267,7 +267,9 @@ char *weightby; int perc; backend = inst->servers[n]; perc = (atoi(serverGetParameter(backend->server, - weightby)) * 100) / total; + weightby)) * 1000) / total; + if (perc == 0) + perc = 1; backend->weight = perc; if (perc == 0) { @@ -444,18 +446,18 @@ BACKEND *master_host = NULL; candidate = inst->servers[i]; } else if ((inst->servers[i]->current_connection_count - * 100) / inst->servers[i]->weight < + * 1000) / inst->servers[i]->weight < (candidate->current_connection_count * - 100) / candidate->weight) + 1000) / candidate->weight) { /* This running server has fewer connections, set it as a new candidate */ candidate = inst->servers[i]; } else if ((inst->servers[i]->current_connection_count - * 100) / inst->servers[i]->weight == + * 1000) / inst->servers[i]->weight == (candidate->current_connection_count * - 100) / candidate->weight && + 1000) / candidate->weight && inst->servers[i]->server->stats.n_connections < candidate->server->stats.n_connections) { @@ -747,9 +749,9 @@ char *weightby; for (i = 0; router_inst->servers[i]; i++) { backend = router_inst->servers[i]; - dcb_printf(dcb, "\t\t%-20s %3d%% %d\n", + dcb_printf(dcb, "\t\t%-20s %3.1f%% %d\n", backend->server->unique_name, - backend->weight, + (float)backend->weight / 10, backend->current_connection_count); } From f5e6f99ba72b046895899eef99b8070ae91cf725 Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Mon, 30 Jun 2014 18:31:09 +0200 
Subject: [PATCH 09/11] Added failure in start() for not running process Added failure in start() for not running process --- etc/init.d/maxscale | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etc/init.d/maxscale b/etc/init.d/maxscale index 42ae02f31..567c8f284 100644 --- a/etc/init.d/maxscale +++ b/etc/init.d/maxscale @@ -67,7 +67,7 @@ start() { sleep 2 my_check=`status -p $MAXSCALE_PIDFILE $MAXSCALE_BIN/maxscale` CHECK_RET=$? - [ $CHECK_RET -eq 0 ] && echo -n $my_check && success + [ $CHECK_RET -eq 0 ] && echo -n $my_check && success || failure fi # Return rigth code From dd54dde4efb0b1a63478b27ec61064e5b66c69ee Mon Sep 17 00:00:00 2001 From: MassimilianoPinto Date: Mon, 30 Jun 2014 19:09:50 +0200 Subject: [PATCH 10/11] fix for root master NULL pointer fix for root master NULL pointer --- server/modules/monitor/mysql_mon.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index a4f34206d..87d3b40b6 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -340,8 +340,6 @@ char *uname = handle->defaultUser; char *passwd = handle->defaultPasswd; unsigned long int server_version = 0; char *server_string; -unsigned long id = handle->id; -int replication_heartbeat = handle->replicationHeartbeat; if (database->server->monuser != NULL) { @@ -1038,11 +1036,19 @@ static MONITOR_SERVERS *get_replication_tree(MYSQL_MONITOR *handle, int num_serv ptr = ptr->next; } - /* If root master is in MAINT, return NULL */ - if (SERVER_IN_MAINT(handle->master->server)) { + /* + * Return the root master + */ + + if (handle->master != NULL) { + /* If the root master is in MAINT, return NULL */ + if (SERVER_IN_MAINT(handle->master->server)) { + return NULL; + } else { + return handle->master; + } + } else { return NULL; - } else { - return handle->master; } } From 439cedc800b3c3c4a1007889aff1d2b90543a271 Mon Sep 17 00:00:00 2001 From:
MassimilianoPinto Date: Mon, 30 Jun 2014 19:33:30 +0200 Subject: [PATCH 11/11] Fixed num servers counter Fixed num servers counter --- server/modules/monitor/mysql_mon.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index 87d3b40b6..b224e5962 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -555,7 +555,12 @@ MONITOR_SERVERS *root_master; handle->status = MONITOR_STOPPED; return; } + /* reset num_servers */ + num_servers = 0; + + /* start from the first server in the list */ ptr = handle->databases; + while (ptr) { /* copy server status into monitor pending_status */