diff --git a/server/core/gateway.c b/server/core/gateway.c index 8c01b5a62..df1e44cac 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -129,94 +129,7 @@ static void signal_set (int sig, void (*handler)(int)) { } } -int handle_event_errors(DCB *dcb) { - fprintf(stderr, "#### Handle error function for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state)); - - if (dcb->state == DCB_STATE_DISCONNECTED) { - fprintf(stderr, "#### Handle error function, session is %p\n", dcb->session); - return 1; - } - -#ifdef GW_EVENT_DEBUG - if (event != -1) { - fprintf(stderr, ">>>>>> DCB state %i, Protocol State %i: event %i, %i\n", dcb->state, protocol->state, event & EPOLLERR, event & EPOLLHUP); - if(event & EPOLLHUP) - fprintf(stderr, "EPOLLHUP\n"); - - if(event & EPOLLERR) - fprintf(stderr, "EPOLLERR\n"); - - if(event & EPOLLPRI) - fprintf(stderr, "EPOLLPRI\n"); - } -#endif - - if (dcb->state != DCB_STATE_LISTENING) { - if (poll_remove_dcb(dcb) == -1) { - fprintf(stderr, "poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); - } - -#ifdef GW_EVENT_DEBUG - fprintf(stderr, "closing fd [%i]=[%i], from events\n", dcb->fd, protocol->fd); -#endif - if (dcb->fd) { - //fprintf(stderr, "Client protocol dcb->protocol %p\n", dcb->protocol); - - gw_mysql_close((MySQLProtocol **)&dcb->protocol); - fprintf(stderr, "Client protocol dcb->protocol %p\n", dcb->protocol); - - dcb->state = DCB_STATE_DISCONNECTED; - - } - } - - fprintf(stderr, "Return from error handling, dcb is %p\n", dcb); - //free(dcb->session); - dcb->state = DCB_STATE_FREED; - - fprintf(stderr, "#### Handle error function RETURN for [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state)); - //free(dcb); - - return 1; -} - -int handle_event_errors_backend(DCB *dcb) { - - fprintf(stderr, "#### Handle Backend error function for %i\n", dcb->fd); - -#ifdef GW_EVENT_DEBUG - if (event != -1) { - fprintf(stderr, ">>>>>> Backend DCB state %i, Protocol State %i: event %i, %i\n", dcb->state, dcb->proto_state, event & EPOLLERR, event & EPOLLHUP); - if(event & EPOLLHUP) - fprintf(stderr, "EPOLLHUP\n"); - - if(event & EPOLLERR) - fprintf(stderr, "EPOLLERR\n"); - - if(event & EPOLLPRI) - fprintf(stderr, "EPOLLPRI\n"); - } -#endif - - if (dcb->state != DCB_STATE_LISTENING) { - if (poll_remove_dcb(dcb) == -1) { - fprintf(stderr, "Backend poll_remove_dcb: from events check failed to delete %i, [%i]:[%s]\n", dcb->fd, errno, strerror(errno)); - } - -#ifdef GW_EVENT_DEBUG - fprintf(stderr, "Backend closing fd [%i]=%i, from events check\n", dcb->fd, protocol->fd); -#endif - if (dcb->fd) { - dcb->state = DCB_STATE_DISCONNECTED; - fprintf(stderr, "Freeing backend MySQL conn %p, %p\n", dcb->protocol, &dcb->protocol); - gw_mysql_close((MySQLProtocol **)&dcb->protocol); - fprintf(stderr, "Freeing backend MySQL conn %p, %p\n", dcb->protocol, &dcb->protocol); - } - } - - return 0; -} /** * Cleanup the temporary data directory we created for the gateway diff --git a/server/core/gateway_mysql_protocol.c b/server/core/gateway_mysql_protocol.c index 0dbfbb0b8..65ff4cf2e 100644 --- a/server/core/gateway_mysql_protocol.c +++ b/server/core/gateway_mysql_protocol.c @@ -41,6 +41,10 @@ #include #include +#include +#include +#include + /////////////////////////////////////// // MYSQL_conn structure setup @@ -49,19 +53,28 @@ MySQLProtocol *gw_mysql_init(MySQLProtocol *data) { MySQLProtocol *input = NULL; + // structure allocation + input = calloc(1, sizeof(MySQLProtocol)); + if (input == NULL) { - // structure allocation - input = calloc(1, sizeof(MySQLProtocol)); - - if (input == NULL) - return NULL; - + int eno = errno; + errno = 0; + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_mysql_init] failed to allocate memory for MySQL " + "protocol object. Errno %d, %s.", + pthread_self(), + eno, + strerror(eno)); + goto return_input; } - + input->protocol_chk_top = CHK_NUM_PROTOCOL; + input->protocol_chk_tail = CHK_NUM_PROTOCOL; + simple_mutex_init(&input->protocol_mutex, "MySQL Protocol mutex"); #ifdef MYSQL_CONN_DEBUG fprintf(stderr, "gw_mysql_init() called\n"); #endif - +return_input: return input; } diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index ea8313020..6307b6e72 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -82,17 +82,52 @@ struct dcb; +#if 0 +/* MySQL Protocol States */ +#define MYSQL_ALLOC 0 /* Allocate data */ +#define MYSQL_PENDING_CONNECT 1 /* Backend socket pending connect */ +#define MYSQL_CONNECTED 2 /* Backend socket Connected */ +#define MYSQL_AUTH_SENT 3 /* Authentication handshake has been sent */ +#define MYSQL_AUTH_RECV 4 /* Received user, password, db and capabilities */ +#define MYSQL_AUTH_FAILED 5 /* Auth failed, return error packet */ +#define MYSQL_IDLE 6 /* Auth done. Protocol is idle, waiting for statements */ +#define MYSQL_ROUTING 7 /* The received command has been routed to backend(s) */ +#define MYSQL_WAITING_RESULT 8 /* Waiting for result set */ +#define MYSQL_SESSION_CHANGE 9 /* Pending session change */ +#else +typedef enum { + MYSQL_ALLOC, + MYSQL_PENDING_CONNECT, + MYSQL_CONNECTED, + MYSQL_AUTH_SENT, + MYSQL_AUTH_RECV, + MYSQL_AUTH_FAILED, + MYSQL_IDLE, + MYSQL_ROUTING, + MYSQL_WAITING_RESULT, + MYSQL_SESSION_CHANGE +} mysql_pstate_t; +#endif + /* * MySQL Protocol specific state data */ typedef struct { + skygw_chk_t protocol_chk_top; + simple_mutex_t protocol_mutex; int fd; /* The socket descriptor */ - struct dcb *descriptor; /* The DCB of the socket we are running on */ - int state; /* Current descriptor state */ - uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /* server scramble, created or received */ - uint32_t server_capabilities; /* server capabilities, created or received */ - uint32_t client_capabilities; /* client capabilities, created or received */ - unsigned long tid; /* MySQL Thread ID, in handshake */ + struct dcb *descriptor; /** The DCB of the socket + * we are running on */ + mysql_pstate_t state; /** Current descriptor state */ + uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /** server scramble, + * created or received */ + uint32_t server_capabilities; /** server capabilities, + * created or received */ + uint32_t client_capabilities; /** client capabilities, + * created or received */ + unsigned long tid; /** MySQL Thread ID, in + * handshake */ + skygw_chk_t protocol_chk_tail; } MySQLProtocol; /* @@ -105,17 +140,6 @@ typedef struct mysql_session { char db[MYSQL_DATABASE_MAXLEN]; /* database */ } MYSQL_session; -/* MySQL Protocol States */ -#define MYSQL_ALLOC 0 /* Allocate data */ -#define MYSQL_PENDING_CONNECT 1 /* Backend socket pending connect */ -#define MYSQL_CONNECTED 2 /* Backend socket Connected */ -#define MYSQL_AUTH_SENT 3 /* Authentication handshake has been sent */ -#define MYSQL_AUTH_RECV 4 /* Received user, password, db and capabilities */ -#define MYSQL_AUTH_FAILED 5 /* Auth failed, return error packet */ -#define MYSQL_IDLE 6 /* Auth done. Protocol is idle, waiting for statements */ -#define MYSQL_ROUTING 7 /* The received command has been routed to backend(s) */ -#define MYSQL_WAITING_RESULT 8 /* Waiting for result set */ -#define MYSQL_SESSION_CHANGE 9 /* Pending session change */ /* MySQL states for authentication reply */ #define MYSQL_FAILED_AUTHENTICATION 1 diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index 3e38c5891..4b8423111 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -325,7 +325,7 @@ int n_connect = 0; client->remote = strdup(inet_ntoa(addr.sin_addr)); memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); client->session = session_alloc(dcb->session->service, client); - + ss_dassert(client->session->state != SESSION_STATE_ALLOC); client->state = DCB_STATE_IDLE; /* create the session data for HTTPD */ diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index 5827ca21f..df75b785e 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -89,10 +89,21 @@ static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *router_session); static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); static void diagnostics(ROUTER *instance, DCB *dcb); -static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); +static void clientReply( + ROUTER *instance, + void *router_session, + GWBUF *queue, + DCB *backend_dcb); /** The module object definition */ -static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, clientReply }; +static ROUTER_OBJECT MyObject = { + createInstance, + newSession, + closeSession, + routeQuery, + diagnostics, + clientReply +}; static SPINLOCK instlock; static INSTANCE *instances; @@ -250,7 +261,7 @@ static void * newSession(ROUTER *instance, SESSION *session) { INSTANCE *inst = (INSTANCE *)instance; -CLIENT_SESSION *client; +CLIENT_SESSION *client_ses; BACKEND *candidate = NULL; int i; @@ -263,7 +274,7 @@ int i; inst); - if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) { + if ((client_ses = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) { return NULL; } /* @@ -335,7 +346,7 @@ int i; "%lu [newSession] Couldn't find eligible candidate " "server. Exiting.", pthread_self()); - free(client); + free(client_ses); return NULL; } @@ -345,7 +356,7 @@ int i; */ atomic_add(&candidate->current_connection_count, 1); - client->backend = candidate; + client_ses->backend = candidate; skygw_log_write( LOGFILE_TRACE, @@ -356,10 +367,10 @@ int i; candidate->current_connection_count); /* * Open a backend connection, putting the DCB for this - * connection in the client->dcb + * connection in the client_ses->dcb */ - if ((client->dcb = dcb_connect(candidate->server, session, + if ((client_ses->dcb = dcb_connect(candidate->server, session, candidate->server->protocol)) == NULL) { atomic_add(&candidate->current_connection_count, -1); @@ -369,7 +380,7 @@ int i; "server in port %d. Exiting.", pthread_self(), candidate->server->port); - free(client); + free(client_ses); return NULL; } @@ -377,10 +388,10 @@ int i; /* Add this session to the list of active sessions */ spinlock_acquire(&inst->lock); - client->next = inst->connections; - inst->connections = client; + client_ses->next = inst->connections; + inst->connections = client_ses; spinlock_release(&inst->lock); - return (void *)client; + return (void *)client_ses; } /** @@ -394,36 +405,37 @@ static void closeSession(ROUTER *instance, void *router_session) { INSTANCE *inst = (INSTANCE *)instance; -CLIENT_SESSION *session = (CLIENT_SESSION *)router_session; -bool succp = FALSE; +CLIENT_SESSION *client_ses = (CLIENT_SESSION *)router_session; +bool succp = FALSE; /* * Close the connection to the backend */ skygw_log_write_flush( LOGFILE_TRACE, - "%lu [closeSession] closing session with router_session " + "%lu [closeSession] closing session with " + "router_session " "%p, and inst %p.", pthread_self(), - session, + client_ses, inst); - succp = session->dcb->func.close(session->dcb); + succp = client_ses->dcb->func.close(client_ses->dcb); if (succp) { - session->dcb = NULL; + client_ses->dcb = NULL; } - atomic_add(&session->backend->current_connection_count, -1); - atomic_add(&session->backend->server->stats.n_current, -1); + atomic_add(&client_ses->backend->current_connection_count, -1); + atomic_add(&client_ses->backend->server->stats.n_current, -1); spinlock_acquire(&inst->lock); - if (inst->connections == session) - inst->connections = session->next; + if (inst->connections == client_ses) + inst->connections = client_ses->next; else { CLIENT_SESSION *ptr = inst->connections; - while (ptr && ptr->next != session) + while (ptr && ptr->next != client_ses) ptr = ptr->next; if (ptr) - ptr->next = session->next; + ptr->next = client_ses->next; } spinlock_release(&inst->lock); @@ -432,7 +444,7 @@ bool succp = FALSE; * all the memory and other resources associated * to the client session. */ - free(session); + free(client_ses); } /**