Read write split router can reply the master packet to client in session change.

Mysql backend handles auth and session func.


The MYSQL_SESSION_CHANGE may be not in sync with authentication
This commit is contained in:
Massimiliano Pinto
2013-07-15 18:56:14 +02:00
parent 0d3e798b73
commit e0d9d45f3b
11 changed files with 129 additions and 28 deletions

View File

@ -42,7 +42,8 @@ struct service;
* 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and authlock * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and authlock
* for handling backend asynchronous protocol connection * for handling backend asynchronous protocol connection
* and a generic lock for backend authentication * and a generic lock for backend authentication
* 12/07/2013 Massimiliano Pinto Added auth and generic func pointers * 12/07/2013 Massimiliano Pinto Added auth entry point
* 15/07/2013 Massimiliano Pinto Added session entry point
* *
* @endverbatim * @endverbatim
*/ */
@ -64,8 +65,8 @@ struct dcb;
* for the session pased in * for the session pased in
* close Gateway close entry point for the socket * close Gateway close entry point for the socket
* listen Create a listener for the protocol * listen Create a listener for the protocol
* auth Authentication entry point for backend server * auth Authentication entry point
* generic Geeneric purpose entry point * session Session handling entry point
* @endverbatim * @endverbatim
* *
* This forms the "module object" for protocol modules within the gateway. * This forms the "module object" for protocol modules within the gateway.
@ -83,7 +84,7 @@ typedef struct gw_protocol {
int (*close)(struct dcb *); int (*close)(struct dcb *);
int (*listen)(struct dcb *, char *); int (*listen)(struct dcb *, char *);
int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *);
int (*generic)(struct dcb *, void *); int (*session)(struct dcb *, void *);
} GWPROTOCOL; } GWPROTOCOL;
/** /**

View File

@ -23,10 +23,11 @@
* *
* Revision History * Revision History
* *
* Date Who Description * Date Who Description
* 14/06/13 Mark Riddoch Initial implementation * 14/06/2013 Mark Riddoch Initial implementation
* 26/06/13 Mark Riddoch Addition of router options * 26/06/2013 Mark Riddoch Addition of router options
* and the diagnostic entry point * and the diagnostic entry point
* 15/07/2013 Massimiliano Pinto Added clientReply entry point
* *
*/ */
#include <service.h> #include <service.h>
@ -54,6 +55,8 @@ typedef void *ROUTER;
* routing * routing
* diagnostics Called to force the router to print * diagnostics Called to force the router to print
* diagnostic output * diagnostic output
* clientReply Called to reply to client the data from one or all backends
*
* @endverbatim * @endverbatim
* *
* @see load_module * @see load_module
@ -64,5 +67,6 @@ typedef struct router_object {
void (*closeSession)(ROUTER *instance, void *router_session); void (*closeSession)(ROUTER *instance, void *router_session);
int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue); int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue);
void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*diagnostics)(ROUTER *instance, DCB *dcb);
void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
} ROUTER_OBJECT; } ROUTER_OBJECT;
#endif #endif

View File

@ -68,7 +68,7 @@ static GWPROTOCOL MyObject = {
httpd_close, /**< Close */ httpd_close, /**< Close */
httpd_listen, /**< Create a listener */ httpd_listen, /**< Create a listener */
NULL, /**< Authentication */ NULL, /**< Authentication */
NULL /**< Generic */ NULL /**< Session */
}; };
/** /**

View File

@ -51,6 +51,7 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb); static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static int gw_session(DCB *backend_dcb, void *data);
extern char *gw_strend(register const char *s); extern char *gw_strend(register const char *s);
@ -65,7 +66,7 @@ static GWPROTOCOL MyObject = {
gw_backend_close, /* Close */ gw_backend_close, /* Close */
NULL, /* Listen */ NULL, /* Listen */
gw_change_user, /* Authentication */ gw_change_user, /* Authentication */
NULL /* Generic */ gw_session /* Session */
}; };
/* /*
@ -200,6 +201,34 @@ static int gw_read_backend_event(DCB *dcb) {
} }
} }
/* Check for a pending session change */
if (backend_protocol->state == MYSQL_SESSION_CHANGE) {
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
GWBUF *head = NULL;
/* read the available backend data */
dcb_read(dcb, &head);
if (session) {
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
}
/* The configured router will send this packet to the client */
/* With multiple backends only one reply will be sent */
router->clientReply(router_instance, rsession, head, dcb);
/* Protocol status is now IDLE */
backend_protocol->state = MYSQL_IDLE;
}
/* reading MySQL command output from backend and writing to the client */ /* reading MySQL command output from backend and writing to the client */
if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
@ -260,7 +289,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
/** /**
* Now put the incoming data to the delay queue unless backend is connected with auth ok * Now put the incoming data to the delay queue unless backend is connected with auth ok
*/ */
if (backend_protocol->state != MYSQL_IDLE) { if ( (backend_protocol->state != MYSQL_IDLE) && (backend_protocol->state != MYSQL_SESSION_CHANGE) ) {
//fprintf(stderr, ">>> Writing in the backend %i delay queue\n", dcb->fd); //fprintf(stderr, ">>> Writing in the backend %i delay queue\n", dcb->fd);
backend_set_delayqueue(dcb, queue); backend_set_delayqueue(dcb, queue);
@ -442,6 +471,8 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
backend_protocol = backend->protocol; backend_protocol = backend->protocol;
client_protocol = in_session->client->protocol; client_protocol = in_session->client->protocol;
backend_protocol->state = MYSQL_SESSION_CHANGE;
// now get the user, after 4 bytes header and 1 byte command // now get the user, after 4 bytes header and 1 byte command
client_auth_packet += 5; client_auth_packet += 5;
strcpy(username, (char *)client_auth_packet); strcpy(username, (char *)client_auth_packet);
@ -468,8 +499,10 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
if (auth_ret != 0) { if (auth_ret != 0) {
fprintf(stderr, "<<< CLIENT AUTH FAILED for user [%s], user session will not change!\n", username); fprintf(stderr, "<<< CLIENT AUTH FAILED for user [%s], user session will not change!\n", username);
// send the error packet // send the error packet
mysql_send_auth_error(backend->session->client, 1, 0, "Authorization failed on change_user"); mysql_send_auth_error(backend->session->client, 1, 0, "Authorization failed on change_user");
} else { } else {
// get db name // get db name
strcpy(database, (char *)client_auth_packet); strcpy(database, (char *)client_auth_packet);
@ -482,7 +515,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
strcpy(current_session->db, database); strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));
fprintf(stderr, ">>> The NEW Backend session data is [%s],[%s],[%s]\n", current_session->user, current_session->client_sha1, current_session->db); //fprintf(stderr, ">>> The NEW Backend session data is [%s],[%s],[%s]: protocol state [%i]\n", current_session->user, current_session->client_sha1, current_session->db, backend_protocol->state);
} }
// consume all the data received from client // consume all the data received from client
@ -491,4 +524,27 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
return rv; return rv;
} }
/**
* Session Change wrapper for func.write
* The reply packet will be back routed to the right server
* in the gw_read_backend_event checking the MYSQL_SESSION_CHANGE state
*
* @param
* @return
*/
static int gw_session(DCB *backend_dcb, void *data) {
GWBUF *queue = NULL;
MySQLProtocol *backend_protocol = NULL;
backend_protocol = backend_dcb->protocol;
queue = (GWBUF *) data;
backend_protocol->state = MYSQL_SESSION_CHANGE;
backend_dcb->func.write(backend_dcb, queue);
return 0;
}
///// /////

View File

@ -60,7 +60,7 @@ static GWPROTOCOL MyObject = {
gw_client_close, /* Close */ gw_client_close, /* Close */
gw_MySQLListener, /* Listen */ gw_MySQLListener, /* Listen */
NULL, /* Authentication */ NULL, /* Authentication */
NULL /* Generic */ NULL /* Session */
}; };
/** /**

View File

@ -506,6 +506,8 @@ gw_mysql_protocol_state2string (int state) {
return "MySQL received command has been routed to backend(s)"; return "MySQL received command has been routed to backend(s)";
case MYSQL_WAITING_RESULT: case MYSQL_WAITING_RESULT:
return "MySQL Waiting for result set"; return "MySQL Waiting for result set";
case MYSQL_SESSION_CHANGE:
return "MySQL change session";
default: default:
return "MySQL (unknown protocol state)"; return "MySQL (unknown protocol state)";
} }
@ -772,8 +774,6 @@ int gw_send_change_user_to_backend(char *dbname, char *user, uint8_t *passwd, My
rv = write(dcb->fd, GWBUF_DATA(buffer), bytes); rv = write(dcb->fd, GWBUF_DATA(buffer), bytes);
gwbuf_consume(buffer, bytes); gwbuf_consume(buffer, bytes);
conn->state = MYSQL_IDLE;
if (rv < 0) if (rv < 0)
return rv; return rv;
else else

View File

@ -78,7 +78,7 @@ static GWPROTOCOL MyObject = {
telnetd_close, /**< Close */ telnetd_close, /**< Close */
telnetd_listen, /**< Create a listener */ telnetd_listen, /**< Create a listener */
NULL, /**< Authentication */ NULL, /**< Authentication */
NULL /**< Generic */ NULL /**< Session */
}; };
static void static void

View File

@ -51,7 +51,7 @@ static int execute(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb); static void diagnostics(ROUTER *instance, DCB *dcb);
/** The module object definition */ /** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute, diagnostics }; static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute, diagnostics, NULL };
extern int execute_cmd(CLI_SESSION *cli); extern int execute_cmd(CLI_SESSION *cli);

View File

@ -79,7 +79,7 @@ static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb); static void diagnostics(ROUTER *instance, DCB *dcb);
/** The module object definition */ /** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics }; static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostics, NULL };
static SPINLOCK instlock; static SPINLOCK instlock;
static INSTANCE *instances; static INSTANCE *instances;

View File

@ -30,27 +30,38 @@
#include <spinlock.h> #include <spinlock.h>
/** /**
* @file router.c The entry points for the read/write query splitting * @file readwritesplit.c The entry points for the read/write query splitting
* router module. * router module.
* *
* This file contains the entry points that comprise the API to the read write * This file contains the entry points that comprise the API to the read write
* query splitting router. * query splitting router.
* @verbatim
* Revision History
* *
* Date Who Description
* 01/07/2013 Vilho Raatikka Initial implementation
* 15/07/2013 Massimiliano Pinto Added clientReply
* from master only in case of session change
*
* @endverbatim
*/ */
static char *version_str = "V1.0.0";
static char *version_str = "V1.0.1";
static ROUTER* createInstance(SERVICE *service, char **options); static ROUTER* createInstance(SERVICE *service, char **options);
static void* newSession(ROUTER *instance, SESSION *session); static void* newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session); static void closeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb); static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
static ROUTER_OBJECT MyObject = static ROUTER_OBJECT MyObject =
{ createInstance, { createInstance,
newSession, newSession,
closeSession, closeSession,
routeQuery, routeQuery,
diagnostic }; diagnostic,
clientReply };
static SPINLOCK instlock; static SPINLOCK instlock;
static INSTANCE* instances; static INSTANCE* instances;
@ -377,8 +388,6 @@ static int routeQuery(
len += 255*packet[1]; len += 255*packet[1];
len += 255*255*packet[2]; len += 255*255*packet[2];
fprintf(stderr, "\n\n\n>>> Packet type is [%x]\n\n\n", packet_type);
switch(packet_type) { switch(packet_type) {
case COM_INIT_DB: /**< 2 DDL must go to the master */ case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */ case COM_REFRESH: /**< 7 - I guess this is session but not sure */
@ -452,12 +461,10 @@ static int routeQuery(
if (packet_type != COM_CHANGE_USER) if (packet_type != COM_CHANGE_USER)
{ {
GWBUF *cq = gwbuf_clone(queue); GWBUF *cq = gwbuf_clone(queue);
fprintf(stderr, "\n\n>>>> SESSION WRITE type [%x]\n\n", packet_type); ret = session->masterconn->func.session(session->masterconn, (void *)queue);
ret = session->masterconn->func.write(session->masterconn, queue); session->slaveconn->func.session(session->slaveconn, (void *)cq);
session->slaveconn->func.write(session->slaveconn, cq);
} else { } else {
GWBUF *cq = gwbuf_clone(queue); GWBUF *cq = gwbuf_clone(queue);
fprintf(stderr, "\n\n>>>> COM_CHANGE_USER here\n\n");
session->masterconn->func.auth(session->masterconn, NULL, session->masterconn->session, queue); session->masterconn->func.auth(session->masterconn, NULL, session->masterconn->session, queue);
session->slaveconn->func.auth(session->slaveconn, NULL, session->masterconn->session, cq); session->slaveconn->func.auth(session->slaveconn, NULL, session->masterconn->session, cq);
} }
@ -512,4 +519,37 @@ int i = 0;
dcb_printf(dcb, "\tNumber of queries forwarded to master: %d\n", inst->stats.n_master); dcb_printf(dcb, "\tNumber of queries forwarded to master: %d\n", inst->stats.n_master);
dcb_printf(dcb, "\tNumber of queries forwarded to slave: %d\n", inst->stats.n_slave); dcb_printf(dcb, "\tNumber of queries forwarded to slave: %d\n", inst->stats.n_slave);
dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", inst->stats.n_all); dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", inst->stats.n_all);
} }
/**
* Client Reply routine
*
* The routine will reply to client for session change with master server data
*
* @param instance The router instance
* @param router_session The router session
* @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data
*/
static void
clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb)
{
INSTANCE* inst = NULL;
DCB *master = NULL;
CLIENT_SESSION* session = NULL;
int len = 0;
inst = (INSTANCE *)instance;
session = (CLIENT_SESSION *)router_session;
master = session->masterconn;
/* if backend_dcb is the master reply to the client */
if (backend_dcb == master) {
master->session->client->func.write(master->session->client, queue);
} else {
/* just consume the gwbuf without writing to the client */
gwbuf_consume(queue, gwbuf_length(queue));
}
}
///

View File

@ -26,7 +26,7 @@ static void closeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb); static void diagnostic(ROUTER *instance, DCB *dcb);
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostic }; static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostic, NULL };
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point