ReadConnRoute can handle now COM_CHANGE_USER

mysql_backend calls routeReply with backand results.

routeReply in readwritesplit.c can understand the dcb->command value
This commit is contained in:
Massimiliano Pinto
2013-07-17 10:39:08 +02:00
parent 91f1b4bc6f
commit b6cbe3a34c
4 changed files with 131 additions and 58 deletions

View File

@ -36,7 +36,10 @@
* 03/07/2013 Massimiliano Pinto Added delayq for incoming data before mysql connection
* 04/07/2013 Massimiliano Pinto Added asyncrhronous MySQL protocol connection to backend
* 05/07/2013 Massimiliano Pinto Added closeSession if backend auth fails
* 12/07/2013 Massimiliano Pinto Addesd Mysql Change User via dcb->func.auth()
* 12/07/2013 Massimiliano Pinto Added Mysql Change User via dcb->func.auth()
* 15/07/2013 Massimiliano Pinto Added Mysql session change via dcb->func.session()
* 17/07/2013 Massimiliano Pinto Added dcb->command update from gwbuf->command for proper routing
server replies to client via router->clientReply
*/
static char *version_str = "V2.0.0";
@ -127,7 +130,7 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol = (MySQLProtocol *) dcb->protocol;
current_session = (MYSQL_session *)dcb->session->data;
//fprintf(stderr, ">>> backend EPOLLIN from %i, protocol state [%s]\n", dcb->fd, gw_mysql_protocol_state2string(backend_protocol->state));
//fprintf(stderr, ">>> backend EPOLLIN from %i, command %i, protocol state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string(backend_protocol->state));
/* backend is connected:
*
@ -140,6 +143,7 @@ static int gw_read_backend_event(DCB *dcb) {
gw_read_backend_handshake(backend_protocol);
gw_send_authentication_to_backend(current_session->db, current_session->user, current_session->client_sha1, backend_protocol);
return 1;
}
@ -201,16 +205,16 @@ static int gw_read_backend_event(DCB *dcb) {
}
}
/* Check for a pending session change */
/* reading MySQL command output from backend and writing to the client */
if (backend_protocol->state == MYSQL_SESSION_CHANGE) {
if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
GWBUF *head = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
GWBUF *head = NULL;
/* read the available backend data */
/* read available backend data */
dcb_read(dcb, &head);
if (session) {
@ -219,27 +223,14 @@ static int gw_read_backend_event(DCB *dcb) {
rsession = session->router_session;
}
/* The configured router will send this packet to the client */
/* With multiple backends only one reply will be sent */
/* Note the gwbuf doesn't have here a valid queue->command descriptions as it is a fresh new one!
* We only have the copied value in dcb->command from previuos func.write()
* and this will be used by the router->clientReply
*/
/* and pass now the gwbuf to the router */
router->clientReply(router_instance, rsession, head, dcb);
/* Protocol status is now IDLE */
backend_protocol->state = MYSQL_IDLE;
}
/* reading MySQL command output from backend and writing to the client */
if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
GWBUF *head = NULL;
/* read available backend data */
dcb_read(dcb, &head);
/* and write the gwbuffer to client */
dcb->session->client->func.write(dcb->session->client, head);
return 1;
}
@ -286,17 +277,24 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
spinlock_acquire(&dcb->authlock);
fprintf(stderr, ">>>> Backend %i: command %i, queue command %i\n", dcb->fd, dcb->command, queue->command);
/**
* Now put the incoming data to the delay queue unless backend is connected with auth ok
*/
if ( (backend_protocol->state != MYSQL_IDLE) && (backend_protocol->state != MYSQL_SESSION_CHANGE) ) {
//fprintf(stderr, ">>> Writing in the backend %i delay queue\n", dcb->fd);
if (backend_protocol->state != MYSQL_IDLE) {
fprintf(stderr, ">>> Writing in the backend %i delay queue: last dcb command %i, queue command %i, protocol state [%s]\n", dcb->fd, dcb->command, queue->command, gw_mysql_protocol_state2string(dcb->state));
backend_set_delayqueue(dcb, queue);
spinlock_release(&dcb->authlock);
return 1;
}
/**
* Now we set the last command received, from the current queue
*/
memcpy(&dcb->command, &queue->command, sizeof(dcb->command));
spinlock_release(&dcb->authlock);
return dcb_write(dcb, queue);
@ -446,6 +444,12 @@ static int backend_write_delayqueue(DCB *dcb)
localq = dcb->delayq;
dcb->delayq = NULL;
/**
* Now we set the last command received, from the delayed queue
*/
memcpy(&dcb->command, &localq->command, sizeof(dcb->command));
spinlock_release(&dcb->delayqlock);
return dcb_write(dcb, localq);
@ -471,7 +475,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
backend_protocol = backend->protocol;
client_protocol = in_session->client->protocol;
backend_protocol->state = MYSQL_SESSION_CHANGE;
queue->command = ROUTER_CHANGE_SESSION;
// now get the user, after 4 bytes header and 1 byte command
client_auth_packet += 5;
@ -510,7 +514,16 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
//fprintf(stderr, "<<<< Backend session data is [%s],[%s],[%s]\n", current_session->user, current_session->client_sha1, current_session->db);
rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol);
// Now copy new data into user session
/**
* The current queue was not handled by func.write() in gw_send_change_user_to_backend()
* We wrote a new gwbuf
* Set backend command here!
*/
memcpy(&backend->command, &queue->command, sizeof(backend->command));
/**
* Now copy new data into user session
*/
strcpy(current_session->user, username);
strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));
@ -519,7 +532,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
}
// consume all the data received from client
len = GWBUF_LENGTH(queue);
len = gwbuf_length(queue);
queue = gwbuf_consume(queue, len);
return rv;
@ -528,7 +541,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
/**
* Session Change wrapper for func.write
* The reply packet will be back routed to the right server
* in the gw_read_backend_event checking the MYSQL_SESSION_CHANGE state
* in the gw_read_backend_event checking the ROUTER_CHANGE_SESSION command in dcb->command
*
* @param
* @return
@ -541,7 +554,7 @@ static int gw_session(DCB *backend_dcb, void *data) {
backend_protocol = backend_dcb->protocol;
queue = (GWBUF *) data;
backend_protocol->state = MYSQL_SESSION_CHANGE;
queue->command = ROUTER_CHANGE_SESSION;
backend_dcb->func.write(backend_dcb, queue);

View File

@ -423,8 +423,10 @@ int gw_send_authentication_to_backend(char *dbname, char *user, uint8_t *passwd,
// write to backend dcb
// ToDO: handle the EAGAIN | EWOULDBLOCK
rv = write(dcb->fd, GWBUF_DATA(buffer), bytes);
gwbuf_consume(buffer, bytes);
/* Set the new state, next would be MYSQL_IDLE or MYSQL_AUTH_FAILED */
conn->state = MYSQL_AUTH_RECV;
if (rv < 0)
@ -769,10 +771,7 @@ int gw_send_change_user_to_backend(char *dbname, char *user, uint8_t *passwd, My
// put here the paylod size: bytes to write - 4 bytes packet header
gw_mysql_set_byte3(payload_start, (bytes-4));
// write to backend dcb
// ToDO: handle the EAGAIN | EWOULDBLOCK
rv = write(dcb->fd, GWBUF_DATA(buffer), bytes);
gwbuf_consume(buffer, bytes);
rv = dcb->func.write(dcb, buffer);
if (rv < 0)
return rv;
@ -780,6 +779,10 @@ int gw_send_change_user_to_backend(char *dbname, char *user, uint8_t *passwd, My
return 0;
}
/**
* Check authentication token received against stage1_hash and scramble
*
*/
int gw_check_mysql_scramble_data(DCB *dcb, uint8_t *token, unsigned int token_len, uint8_t *scramble, unsigned int scramble_len, char *username, uint8_t *stage1_hash) {
uint8_t step1[GW_MYSQL_SCRAMBLE_SIZE]="";
uint8_t step2[GW_MYSQL_SCRAMBLE_SIZE +1]="";

View File

@ -42,14 +42,16 @@
* Revision History
*
* Date Who Description
* 14/06/13 Mark Riddoch Initial implementation
* 25/06/13 Mark Riddoch Addition of checks for current server state
* 26/06/13 Mark Riddoch Use server with least connections since
* 14/06/2013 Mark Riddoch Initial implementation
* 25/06/2013 Mark Riddoch Addition of checks for current server state
* 26/06/2013 Mark Riddoch Use server with least connections since
* startup if the number of current
* connections is the same for two servers
* Addition of master and slave options
* 27/06/13 Vilho Raatikka Added skygw_log_write command as an example
* 27/06/2013 Vilho Raatikka Added skygw_log_write command as an example
* and necessary headers.
* 17/07/2013 Massimiliano Pinto Added clientReply routine:
called by backend server to send data to client
*
* @endverbatim
*/
@ -77,9 +79,10 @@ static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
/** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, NULL };
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, clientReply };
static SPINLOCK instlock;
static INSTANCE *instances;
@ -369,9 +372,18 @@ routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
{
INSTANCE *inst = (INSTANCE *)instance;
CLIENT_SESSION *session = (CLIENT_SESSION *)router_session;
char *paylod = GWBUF_DATA(queue);
int mysql_command = -1;
mysql_command = paylod[4];
inst->stats.n_queries++;
if (mysql_command == 0x11) {
return session->dcb->func.auth(session->dcb, NULL, session->dcb->session, queue);
} else {
return session->dcb->func.write(session->dcb, queue);
}
}
/**
@ -400,3 +412,29 @@ int i = 0;
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i);
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", inst->stats.n_queries);
}
/**
* Client Reply routine
*
* The routine will reply to client data from backend server
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void
clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb)
{
INSTANCE* inst = NULL;
DCB *client = NULL;
CLIENT_SESSION* session = NULL;
inst = (INSTANCE *)instance;
session = (CLIENT_SESSION *)router_session;
client = backend_dcb->session->client;
client->func.write(client, queue);
}
///

View File

@ -23,8 +23,10 @@
#include <stdlib.h>
#include <mysql.h>
#if defined(SS_DEBUG)
#include <skygw_utils.h>
#include <log_manager.h>
#endif
#include <query_classifier.h>
#include <dcb.h>
#include <spinlock.h>
@ -420,28 +422,35 @@ static int routeQuery(
case COM_DAEMON: /**< 1d ? */
break;
}
#if defined(SS_DEBUG_)
skygw_log_write(NULL, LOGFILE_TRACE, "String\t\"%s\"", querystr);
skygw_log_write(NULL,
LOGFILE_TRACE,
"Packet type\t%s",
STRPACKETTYPE(packet_type));
#endif
switch (qtype) {
case QUERY_TYPE_WRITE:
#if defined(SS_DEBUG_)
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to Master.",
STRQTYPE(qtype));
#endif
ret = session->masterconn->func.write(session->masterconn, queue);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
break;
case QUERY_TYPE_READ:
#if defined(SS_DEBUG_)
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to Slave.",
STRQTYPE(qtype));
#endif
ret = session->slaveconn->func.write(session->slaveconn, queue);
atomic_add(&inst->stats.n_slave, 1);
goto return_ret;
@ -449,10 +458,12 @@ static int routeQuery(
case QUERY_TYPE_SESSION_WRITE:
#if defined(SS_DEBUG_)
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to All servers.",
STRQTYPE(qtype));
#endif
/**
* TODO! Connection to all servers must be established, and
* the command must be executed in them.
@ -473,10 +484,12 @@ static int routeQuery(
break;
default:
#if defined(SS_DEBUG_)
skygw_log_write(NULL,
LOGFILE_TRACE,
"Query type\t%s, routing to Master by default.",
STRQTYPE(qtype));
#endif
/** Is this really ok? */
ret = session->masterconn->func.write(session->masterconn, queue);
atomic_add(&inst->stats.n_master, 1);
@ -537,19 +550,25 @@ clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_d
{
INSTANCE* inst = NULL;
DCB *master = NULL;
DCB *client = NULL;
CLIENT_SESSION* session = NULL;
int len = 0;
inst = (INSTANCE *)instance;
session = (CLIENT_SESSION *)router_session;
master = session->masterconn;
client = backend_dcb->session->client;
/* if backend_dcb is the master reply to the client */
if (backend_dcb->command == ROUTER_CHANGE_SESSION) {
/* if backend_dcb is the master we can reply to the client */
if (backend_dcb == master) {
master->session->client->func.write(master->session->client, queue);
} else {
/* just consume the gwbuf without writing to the client */
gwbuf_consume(queue, gwbuf_length(queue));
}
} else {
/* normal flow */
client->func.write(client, queue);
}
}
///