diff --git a/modules/protocol/mysql_backend.c b/modules/protocol/mysql_backend.c index a61bc9d91..50c65522e 100644 --- a/modules/protocol/mysql_backend.c +++ b/modules/protocol/mysql_backend.c @@ -36,7 +36,10 @@ * 03/07/2013 Massimiliano Pinto Added delayq for incoming data before mysql connection * 04/07/2013 Massimiliano Pinto Added asyncrhronous MySQL protocol connection to backend * 05/07/2013 Massimiliano Pinto Added closeSession if backend auth fails - * 12/07/2013 Massimiliano Pinto Addesd Mysql Change User via dcb->func.auth() + * 12/07/2013 Massimiliano Pinto Added Mysql Change User via dcb->func.auth() + * 15/07/2013 Massimiliano Pinto Added Mysql session change via dcb->func.session() + * 17/07/2013 Massimiliano Pinto Added dcb->command update from gwbuf->command for proper routing + server replies to client via router->clientReply */ static char *version_str = "V2.0.0"; @@ -127,7 +130,7 @@ static int gw_read_backend_event(DCB *dcb) { backend_protocol = (MySQLProtocol *) dcb->protocol; current_session = (MYSQL_session *)dcb->session->data; - //fprintf(stderr, ">>> backend EPOLLIN from %i, protocol state [%s]\n", dcb->fd, gw_mysql_protocol_state2string(backend_protocol->state)); + //fprintf(stderr, ">>> backend EPOLLIN from %i, command %i, protocol state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string(backend_protocol->state)); /* backend is connected: * @@ -140,6 +143,7 @@ static int gw_read_backend_event(DCB *dcb) { gw_read_backend_handshake(backend_protocol); gw_send_authentication_to_backend(current_session->db, current_session->user, current_session->client_sha1, backend_protocol); + return 1; } @@ -201,16 +205,16 @@ static int gw_read_backend_event(DCB *dcb) { } } - /* Check for a pending session change */ + /* reading MySQL command output from backend and writing to the client */ - if (backend_protocol->state == MYSQL_SESSION_CHANGE) { - ROUTER_OBJECT *router = NULL; - ROUTER *router_instance = NULL; - void *rsession = NULL; - SESSION *session = dcb->session; - GWBUF *head = NULL; + if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { + GWBUF *head = NULL; + ROUTER_OBJECT *router = NULL; + ROUTER *router_instance = NULL; + void *rsession = NULL; + SESSION *session = dcb->session; - /* read the available backend data */ + /* read available backend data */ dcb_read(dcb, &head); if (session) { @@ -219,27 +223,14 @@ static int gw_read_backend_event(DCB *dcb) { rsession = session->router_session; } - /* The configured router will send this packet to the client */ - /* With multiple backends only one reply will be sent */ + /* Note the gwbuf doesn't have here a valid queue->command descriptions as it is a fresh new one! + * We only have the copied value in dcb->command from previuos func.write() + * and this will be used by the router->clientReply + */ + + /* and pass now the gwbuf to the router */ router->clientReply(router_instance, rsession, head, dcb); - /* Protocol status is now IDLE */ - backend_protocol->state = MYSQL_IDLE; - - } - - - /* reading MySQL command output from backend and writing to the client */ - - if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { - GWBUF *head = NULL; - - /* read available backend data */ - dcb_read(dcb, &head); - - /* and write the gwbuffer to client */ - dcb->session->client->func.write(dcb->session->client, head); - return 1; } @@ -286,17 +277,24 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) spinlock_acquire(&dcb->authlock); + fprintf(stderr, ">>>> Backend %i: command %i, queue command %i\n", dcb->fd, dcb->command, queue->command); + /** * Now put the incoming data to the delay queue unless backend is connected with auth ok */ - if ( (backend_protocol->state != MYSQL_IDLE) && (backend_protocol->state != MYSQL_SESSION_CHANGE) ) { - //fprintf(stderr, ">>> Writing in the backend %i delay queue\n", dcb->fd); + if (backend_protocol->state != MYSQL_IDLE) { + fprintf(stderr, ">>> Writing in the backend %i delay queue: last dcb command %i, queue command %i, protocol state [%s]\n", dcb->fd, dcb->command, queue->command, gw_mysql_protocol_state2string(dcb->state)); backend_set_delayqueue(dcb, queue); spinlock_release(&dcb->authlock); return 1; } + /** + * Now we set the last command received, from the current queue + */ + memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); + spinlock_release(&dcb->authlock); return dcb_write(dcb, queue); @@ -446,6 +444,12 @@ static int backend_write_delayqueue(DCB *dcb) localq = dcb->delayq; dcb->delayq = NULL; + /** + * Now we set the last command received, from the delayed queue + */ + + memcpy(&dcb->command, &localq->command, sizeof(dcb->command)); + spinlock_release(&dcb->delayqlock); return dcb_write(dcb, localq); @@ -471,7 +475,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB backend_protocol = backend->protocol; client_protocol = in_session->client->protocol; - backend_protocol->state = MYSQL_SESSION_CHANGE; + queue->command = ROUTER_CHANGE_SESSION; // now get the user, after 4 bytes header and 1 byte command client_auth_packet += 5; @@ -510,7 +514,16 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB //fprintf(stderr, "<<<< Backend session data is [%s],[%s],[%s]\n", current_session->user, current_session->client_sha1, current_session->db); rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol); - // Now copy new data into user session + /** + * The current queue was not handled by func.write() in gw_send_change_user_to_backend() + * We wrote a new gwbuf + * Set backend command here! + */ + memcpy(&backend->command, &queue->command, sizeof(backend->command)); + + /** + * Now copy new data into user session + */ strcpy(current_session->user, username); strcpy(current_session->db, database); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); @@ -519,7 +532,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB } // consume all the data received from client - len = GWBUF_LENGTH(queue); + len = gwbuf_length(queue); queue = gwbuf_consume(queue, len); return rv; @@ -528,7 +541,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB /** * Session Change wrapper for func.write * The reply packet will be back routed to the right server - * in the gw_read_backend_event checking the MYSQL_SESSION_CHANGE state + * in the gw_read_backend_event checking the ROUTER_CHANGE_SESSION command in dcb->command * * @param * @return @@ -541,7 +554,7 @@ static int gw_session(DCB *backend_dcb, void *data) { backend_protocol = backend_dcb->protocol; queue = (GWBUF *) data; - backend_protocol->state = MYSQL_SESSION_CHANGE; + queue->command = ROUTER_CHANGE_SESSION; backend_dcb->func.write(backend_dcb, queue); diff --git a/modules/protocol/mysql_common.c b/modules/protocol/mysql_common.c index a6c3c6a22..0d48221a2 100644 --- a/modules/protocol/mysql_common.c +++ b/modules/protocol/mysql_common.c @@ -420,11 +420,13 @@ int gw_send_authentication_to_backend(char *dbname, char *user, uint8_t *passwd, // put here the paylod size: bytes to write - 4 bytes packet header gw_mysql_set_byte3(payload_start, (bytes-4)); - // write to backend dcb + // write to backend dcb // ToDO: handle the EAGAIN | EWOULDBLOCK rv = write(dcb->fd, GWBUF_DATA(buffer), bytes); + gwbuf_consume(buffer, bytes); + /* Set the new state, next would be MYSQL_IDLE or MYSQL_AUTH_FAILED */ conn->state = MYSQL_AUTH_RECV; if (rv < 0) @@ -769,10 +771,7 @@ int gw_send_change_user_to_backend(char *dbname, char *user, uint8_t *passwd, My // put here the paylod size: bytes to write - 4 bytes packet header gw_mysql_set_byte3(payload_start, (bytes-4)); - // write to backend dcb - // ToDO: handle the EAGAIN | EWOULDBLOCK - rv = write(dcb->fd, GWBUF_DATA(buffer), bytes); - gwbuf_consume(buffer, bytes); + rv = dcb->func.write(dcb, buffer); if (rv < 0) return rv; @@ -780,6 +779,10 @@ int gw_send_change_user_to_backend(char *dbname, char *user, uint8_t *passwd, My return 0; } +/** + * Check authentication token received against stage1_hash and scramble + * + */ int gw_check_mysql_scramble_data(DCB *dcb, uint8_t *token, unsigned int token_len, uint8_t *scramble, unsigned int scramble_len, char *username, uint8_t *stage1_hash) { uint8_t step1[GW_MYSQL_SCRAMBLE_SIZE]=""; uint8_t step2[GW_MYSQL_SCRAMBLE_SIZE +1]=""; diff --git a/modules/routing/readconnroute.c b/modules/routing/readconnroute.c index 861921a58..29eb78bb9 100644 --- a/modules/routing/readconnroute.c +++ b/modules/routing/readconnroute.c @@ -42,14 +42,16 @@ * Revision History * * Date Who Description - * 14/06/13 Mark Riddoch Initial implementation - * 25/06/13 Mark Riddoch Addition of checks for current server state - * 26/06/13 Mark Riddoch Use server with least connections since - * startup if the number of current - * connections is the same for two servers - * Addition of master and slave options - * 27/06/13 Vilho Raatikka Added skygw_log_write command as an example - * and necessary headers. + * 14/06/2013 Mark Riddoch Initial implementation + * 25/06/2013 Mark Riddoch Addition of checks for current server state + * 26/06/2013 Mark Riddoch Use server with least connections since + * startup if the number of current + * connections is the same for two servers + * Addition of master and slave options + * 27/06/2013 Vilho Raatikka Added skygw_log_write command as an example + * and necessary headers. + * 17/07/2013 Massimiliano Pinto Added clientReply routine: + called by backend server to send data to client * * @endverbatim */ @@ -77,9 +79,10 @@ static void *newSession(ROUTER *instance, SESSION *session); static void closeSession(ROUTER *instance, void *router_session); static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue); static void diagnostics(ROUTER *instance, DCB *dcb); +static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); /** The module object definition */ -static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, NULL }; +static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, clientReply }; static SPINLOCK instlock; static INSTANCE *instances; @@ -369,9 +372,18 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue) { INSTANCE *inst = (INSTANCE *)instance; CLIENT_SESSION *session = (CLIENT_SESSION *)router_session; +char *paylod = GWBUF_DATA(queue); +int mysql_command = -1; + + mysql_command = paylod[4]; inst->stats.n_queries++; - return session->dcb->func.write(session->dcb, queue); + + if (mysql_command == 0x11) { + return session->dcb->func.auth(session->dcb, NULL, session->dcb->session, queue); + } else { + return session->dcb->func.write(session->dcb, queue); + } } /** @@ -400,3 +412,29 @@ int i = 0; dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i); dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", inst->stats.n_queries); } + +/** + * Client Reply routine + * + * The routine will reply to client data from backend server + * + * @param instance The router instance + * @param router_session The router session + * @param backend_dcb The backend DCB + * @param queue The GWBUF with reply data + */ +static void +clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb) +{ + INSTANCE* inst = NULL; + DCB *client = NULL; + CLIENT_SESSION* session = NULL; + + inst = (INSTANCE *)instance; + session = (CLIENT_SESSION *)router_session; + + client = backend_dcb->session->client; + + client->func.write(client, queue); +} +/// diff --git a/modules/routing/readwritesplit/readwritesplit.c b/modules/routing/readwritesplit/readwritesplit.c index f90716c3c..c4df97a62 100644 --- a/modules/routing/readwritesplit/readwritesplit.c +++ b/modules/routing/readwritesplit/readwritesplit.c @@ -23,8 +23,10 @@ #include #include +#if defined(SS_DEBUG) #include #include +#endif #include #include #include @@ -420,28 +422,35 @@ static int routeQuery( case COM_DAEMON: /**< 1d ? */ break; } + + #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "String\t\"%s\"", querystr); skygw_log_write(NULL, LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)); - + #endif + switch (qtype) { case QUERY_TYPE_WRITE: + #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to Master.", STRQTYPE(qtype)); + #endif ret = session->masterconn->func.write(session->masterconn, queue); atomic_add(&inst->stats.n_master, 1); goto return_ret; break; case QUERY_TYPE_READ: + #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to Slave.", STRQTYPE(qtype)); + #endif ret = session->slaveconn->func.write(session->slaveconn, queue); atomic_add(&inst->stats.n_slave, 1); goto return_ret; @@ -449,10 +458,12 @@ static int routeQuery( case QUERY_TYPE_SESSION_WRITE: + #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to All servers.", STRQTYPE(qtype)); + #endif /** * TODO! Connection to all servers must be established, and * the command must be executed in them. @@ -473,10 +484,12 @@ static int routeQuery( break; default: + #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to Master by default.", STRQTYPE(qtype)); + #endif /** Is this really ok? */ ret = session->masterconn->func.write(session->masterconn, queue); atomic_add(&inst->stats.n_master, 1); @@ -537,19 +550,25 @@ clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_d { INSTANCE* inst = NULL; DCB *master = NULL; + DCB *client = NULL; CLIENT_SESSION* session = NULL; - int len = 0; inst = (INSTANCE *)instance; session = (CLIENT_SESSION *)router_session; master = session->masterconn; + client = backend_dcb->session->client; - /* if backend_dcb is the master reply to the client */ - if (backend_dcb == master) { - master->session->client->func.write(master->session->client, queue); + if (backend_dcb->command == ROUTER_CHANGE_SESSION) { + /* if backend_dcb is the master we can reply to the client */ + if (backend_dcb == master) { + master->session->client->func.write(master->session->client, queue); + } else { + /* just consume the gwbuf without writing to the client */ + gwbuf_consume(queue, gwbuf_length(queue)); + } } else { - /* just consume the gwbuf without writing to the client */ - gwbuf_consume(queue, gwbuf_length(queue)); + /* normal flow */ + client->func.write(client, queue); } } ///