From 04313caf82847ea04c1bde08e19bd7d8a8e1f5d5 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Wed, 23 Apr 2014 14:55:04 +0300 Subject: [PATCH 1/5] Initial implementation of the support for multiple slaves in read/write split router session. Omits configuration changes. --- server/modules/include/readwritesplit.h | 61 +- server/modules/protocol/mysql_client.c | 2 +- .../routing/readwritesplit/readwritesplit.c | 874 ++++++++++-------- utils/skygw_debug.h | 15 +- 4 files changed, 543 insertions(+), 409 deletions(-) diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 024a8bd88..ab17b2c35 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,15 +31,14 @@ #include -/** - * Internal structure used to define the set of backend servers we are routing - * connections to. This provides the storage for routing module specific data - * that is required for each of the backend servers. - */ -typedef struct backend { - SERVER* backend_server; /*< The server itself */ - int backend_conn_count; /*< Number of connections to the server */ -} BACKEND; + +typedef enum backend_type_t { + BE_UNDEFINED=-1, + BE_MASTER, + BE_JOINED = BE_MASTER, + BE_SLAVE, + BE_COUNT +} backend_type_t; typedef struct rses_property_st rses_property_t; typedef struct router_client_session ROUTER_CLIENT_SES; @@ -52,14 +51,6 @@ typedef enum rses_property_type_t { RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 } rses_property_type_t; -typedef enum backend_type_t { - BE_UNDEFINED=-1, - BE_MASTER, - BE_JOINED = BE_MASTER, - BE_SLAVE, - BE_COUNT -} backend_type_t; - /** * Session variable command */ @@ -98,13 +89,38 @@ struct rses_property_st { }; typedef struct sescmd_cursor_st { +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_top; +#endif ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */ rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ bool scmd_cur_active; /*< true if command is being executed */ - backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */ +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_tail; +#endif } sescmd_cursor_t; +/** + * Internal structure used to define the set of backend servers we are routing + * connections to. This provides the storage for routing module specific data + * that is required for each of the backend servers. + */ +typedef struct backend { +#if defined(SS_DEBUG) + skygw_chk_t be_chk_top; +#endif + SERVER* backend_server; /*< The server itself */ + int backend_conn_count; /*< Number of connections to the server */ + bool be_valid; /*< valid when belongs to the router's configuration */ + DCB* be_dcb; + /*< cursor is pointer and status variable to current session command */ + sescmd_cursor_t be_sescmd_cursor; +#if defined(SS_DEBUG) + skygw_chk_t be_chk_tail; +#endif +} BACKEND; + /** * The client session structure used within this router. */ @@ -113,14 +129,13 @@ struct router_client_session { skygw_chk_t rses_chk_top; #endif SPINLOCK rses_lock; /*< protects rses_deleted */ - int rses_versno; /*< even = no active update, else odd */ + int rses_versno; /*< even = no active update, else odd. not used 4/14 */ bool rses_closed; /*< true when closeSession is called */ /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; - BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */ - DCB* rses_dcb[BE_COUNT]; - /*< cursor is pointer and status variable to current session command */ - sescmd_cursor_t rses_cursor[BE_COUNT]; + BACKEND* rses_master; /*< Pointer to master */ + BACKEND** rses_backend; /*< All backends used by client session */ + int rses_nbackends; int rses_capabilities; /*< input type, for example */ struct router_client_session* next; #if defined(SS_DEBUG) diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 5bd20f4f9..1e394d7d3 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -682,7 +682,7 @@ int gw_read_client_event(DCB* dcb) { dcb, 1, 0, - "Query routing failed. Connection to " + "Can't route query. Connection to " "backend lost"); protocol->state = MYSQL_IDLE; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 7b2285078..c177d6d62 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,11 +70,23 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); +static int backend_cmp( + const void* be_1, + const void* be_2); + +static bool select_connect_backend_servers( + BACKEND** p_master, + BACKEND** b, + int router_nservers, + int max_nslaves, + SESSION* session, + ROUTER_INSTANCE* router); + +static bool get_dcb( + DCB** dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype); -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, - ROUTER_INSTANCE* router); static ROUTER_OBJECT MyObject = { createInstance, @@ -118,13 +130,8 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); + BACKEND* backend); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -151,11 +158,13 @@ static bool cont_exec_sescmd_in_backend( ROUTER_CLIENT_SES* rses, backend_type_t be_type); +#if !defined(MAX95) static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, DCB* dcb, GWBUF* buf); +#endif static bool route_session_write( ROUTER_CLIENT_SES* router_client_ses, @@ -220,7 +229,7 @@ static ROUTER* createInstance( { ROUTER_INSTANCE* router; SERVER* server; - int n; + int nservers; int i; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { @@ -231,11 +240,14 @@ static ROUTER* createInstance( /** Calculate number of servers */ server = service->databases; + nservers = 0; - for (n=0; server != NULL; server=server->nextdb) { - n++; + while (server != NULL) + { + nservers++; + server=server->nextdb; } - router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *)); if (router->servers == NULL) { @@ -258,23 +270,31 @@ static ROUTER* createInstance( * backend server. */ server = service->databases; - n = 0; + nservers= 0; + while (server != NULL) { - if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL) + if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL) { - for (i = 0; i < n; i++) { + /** clean up */ + for (i = 0; i < nservers; i++) { free(router->servers[i]); } free(router->servers); free(router); return NULL; } - router->servers[n]->backend_server = server; - router->servers[n]->backend_conn_count = 0; - n += 1; + router->servers[nservers]->backend_server = server; + router->servers[nservers]->backend_conn_count = 0; + router->servers[nservers]->be_valid = false; + router->servers[nservers]->be_dcb = NULL; +#if defined(SS_DEBUG) + router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; + router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; +#endif + nservers += 1; server = server->nextdb; } - router->servers[n] = NULL; + router->servers[nservers] = NULL; /** * vraa : is this necessary for readwritesplit ? @@ -330,97 +350,92 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ + const int conf_max_nslaves = 2; /*< replaces configuration parameter until its developed */ + static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND* local_backend[BE_COUNT]; - ROUTER_CLIENT_SES* client_rses; - ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; + BACKEND** pp_backend; + BACKEND** b; + BACKEND* master = NULL; /*< pointer to selected master */ + ROUTER_CLIENT_SES* client_rses = NULL; + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; - - client_rses = - (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + int router_nservers = 0; /*< # of servers in total */ + int max_nslaves; /*< max # of slaves used in this session */ + + b = router->servers; + + /** count servers */ + while (*(b++) != NULL) router_nservers++; + + /** Master + Slave is minimum requirement */ + if (router_nservers < 2) + { + /** log */ + goto return_rses; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) { ss_dassert(false); - return NULL; + goto return_rses; } - memset(local_backend, 0, BE_COUNT*sizeof(void*)); + pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *)); + + /** + * Copy backend pointer array from global router instance to private use. + */ + memcpy(pp_backend, router->servers, router_nservers*sizeof(BACKEND *)); + ss_dassert(pp_backend[router_nservers] == NULL); + spinlock_init(&client_rses->rses_lock); #if defined(SS_DEBUG) client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif - /** store pointers to sescmd list to both cursors */ - client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; - client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER; - - client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; + b = pp_backend; + /** Set up ses cmd objects for each backend */ + while (*b != NULL) + { + /** store pointers to sescmd list to both cursors */ + (*b)->be_sescmd_cursor.scmd_cur_rses = client_rses; + (*b)->be_sescmd_cursor.scmd_cur_active = false; + (*b)->be_sescmd_cursor.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + (*b)->be_sescmd_cursor.scmd_cur_cmd = NULL; +#if defined(SS_DEBUG) + (*b)->be_sescmd_cursor.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + (*b)->be_sescmd_cursor.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + b++; + } /** - * Find a backend server to connect to. This is the extent of the - * load balancing algorithm we need to implement for this simple - * connection router. + * Find a backend servers to connect to. */ - succp = search_backend_servers(&local_backend[BE_MASTER], - &local_backend[BE_SLAVE], - router); + succp = select_connect_backend_servers(&master, + pp_backend, + router_nservers, + max_nslaves, + session, + router); - /** Both Master and Slave must be found */ + /** Both Master and at least 1 slave must be found */ if (!succp) { free(client_rses); - return NULL; - } - /** - * Open the slave connection. - */ - client_rses->rses_dcb[BE_SLAVE] = dcb_connect( - local_backend[BE_SLAVE]->backend_server, - session, - local_backend[BE_SLAVE]->backend_server->protocol); - - if (client_rses->rses_dcb[BE_SLAVE] == NULL) { - ss_dassert(session->refcount == 1); - free(client_rses); - return NULL; - } - /** - * Open the master connection. - */ - client_rses->rses_dcb[BE_MASTER] = dcb_connect( - local_backend[BE_MASTER]->backend_server, - session, - local_backend[BE_MASTER]->backend_server->protocol); - if (client_rses->rses_dcb[BE_MASTER] == NULL) - { - /** Close slave connection first. */ - client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]); - free(client_rses); - return NULL; - } - /** - * We now have a master and a slave server with the least connections. - * Bump the connection counts for these servers. - */ - atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1); - atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1); - - client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE]; - client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; - router->stats.n_sessions += 1; - + client_rses = NULL; + goto return_rses; + } + /** Copy backend pointers to router session. */ + client_rses->rses_master = master; + client_rses->rses_backend = pp_backend; + client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; + router->stats.n_sessions += 1; /** * Version is bigger than zero once initialized. */ @@ -430,15 +445,18 @@ static void* newSession( * Add this session to end of the list of active sessions in router. */ spinlock_acquire(&router->lock); - client_rses->next = router->connections; + client_rses->next = router->connections; router->connections = client_rses; spinlock_release(&router->lock); CHK_CLIENT_RSES(client_rses); - + +return_rses: return (void *)client_rses; } + + /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. @@ -451,44 +469,48 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - DCB* slave_dcb; - DCB* master_dcb; - + BACKEND** b; + router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - CHK_CLIENT_RSES(router_cli_ses); + CHK_CLIENT_RSES(router_cli_ses); + + b = router_cli_ses->rses_backend; /** * Lock router client session for secure read and update. */ - if (rses_begin_locked_router_action(router_cli_ses)) + if (!router_cli_ses->rses_closed && + rses_begin_locked_router_action(router_cli_ses)) { - /** decrease server current connection counters */ - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1); - - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - router_cli_ses->rses_dcb[BE_SLAVE] = NULL; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - router_cli_ses->rses_dcb[BE_MASTER] = NULL; - - router_cli_ses->rses_closed = true; - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - - /** - * Close the backend server connections + DCB* dcbs[router_cli_ses->rses_nbackends]; + int i = 0; + + /** + * This sets router closed. Nobody is allowed to use router + * whithout checking this first. */ - if (slave_dcb != NULL) { - CHK_DCB(slave_dcb); - slave_dcb->func.close(slave_dcb); - } + router_cli_ses->rses_closed = true; - if (master_dcb != NULL) { - master_dcb->func.close(master_dcb); - CHK_DCB(master_dcb); + while (*b != NULL) + { + /** decrease server current connection counters */ + atomic_add(&(*b)->backend_server->stats.n_current, -1); + + /** Close those which had been connected */ + if ((*b)->be_dcb != NULL) + { + CHK_DCB((*b)->be_dcb); + dcbs[i] = (*b)->be_dcb; + (*b)->be_dcb = NULL; /*< prevent new uses of DCB */ + dcbs[i]->func.close(dcbs[i]); + } + b++; } + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); } } + static void freeSession( ROUTER* router_instance, void* router_client_session) @@ -545,6 +567,55 @@ static void freeSession( return; } +static bool get_dcb( + DCB** p_dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype) +{ + BACKEND** b; + int smallest_nconn = -1; + bool succp = false; + + CHK_CLIENT_RSES(rses); + ss_dassert(*(p_dcb) == NULL); + b = rses->rses_backend; + + if (btype == BE_SLAVE) + { + while (*b != NULL) + { + if ((*b)->be_dcb != NULL && + SERVER_IS_SLAVE((*b)->backend_server) && + (smallest_nconn == -1 || + (*b)->backend_conn_count < smallest_nconn)) + { + *p_dcb = (*b)->be_dcb; + smallest_nconn = (*b)->backend_conn_count; + succp = true; + } + b++; + } + ss_dassert(succp); + } + else if (btype == BE_MASTER || BE_JOINED) + { + while (*b != NULL) + { + if ((*b)->be_dcb != NULL && + (SERVER_IS_MASTER((*b)->backend_server) || + SERVER_IS_JOINED((*b)->backend_server))) + { + *p_dcb = (*b)->be_dcb; + succp = true; + goto return_succp; + } + b++; + } + } +return_succp: + return succp; +} + /** * The main routing entry, this is called with every packet that is * received and has to be forwarded to the backend database. @@ -578,8 +649,7 @@ static int routeQuery( DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - bool rses_is_closed; - rses_property_t* prop; + bool rses_is_closed = false; size_t len; /** if false everything goes to master and session commands to slave too */ static bool autocommit_enabled = true; @@ -593,40 +663,29 @@ static int routeQuery( { rses_is_closed = true; } - else - { - /*< Lock router client session for secure read of DCBs */ - rses_is_closed = - !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - packet = GWBUF_DATA(querybuf); packet_type = packet[4]; - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + if (rses_is_closed) { - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, + (skygw_log_write_flush( LOGFILE_ERROR, "Error: Failed to route %s:%s:\"%s\" to " "backend server. %s.", STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? "Router was closed" : + "Router has no backend servers where to " + "route to")))); goto return_ret; } inst->stats.n_queries++; startpos = (char *)&packet[5]; + + master_dcb = router_cli_ses->rses_master->be_dcb; + CHK_DCB(master_dcb); switch(packet_type) { case COM_QUIT: /**< 1 QUIT will close all sessions */ @@ -653,10 +712,11 @@ static int routeQuery( memset(&querystr[len], 0, 1); // querystr = (char *)GWBUF_DATA(plainsqlbuf); /* - querystr = master_dcb->func.getquerystr( - (void *) gwbuf_clone(querybuf), - &querystr_is_copy); - */ + * querystr = master_dcb->func.getquerystr( + * (void *) gwbuf_clone(querybuf), + * &querystr_is_copy); + */ + qtype = skygw_query_classifier_get_type(querystr, 0); break; @@ -719,56 +779,79 @@ static int routeQuery( */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) { - if (route_session_write( - router_cli_ses, - querybuf, - inst, - packet_type, - qtype)) + bool succp = route_session_write( + router_cli_ses, + querybuf, + inst, + packet_type, + qtype); + + if (succp) { ret = 1; } - else - { - ret = 0; - } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - else if (transaction_active) /*< all to master */ + else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !transaction_active) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Transaction is active, routing to Master."))); - - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + bool succp; - goto return_ret; - } - else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)) - { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Read-only query, routing to Slave."))); - ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); + succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE); + + if (succp) + { + if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_slave, 1); + } + ss_dassert(ret == 1); + } + ss_dassert(succp); goto return_ret; } - else + else { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Begin transaction, write or unspecified type, " - "routing to Master."))); - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + bool succp = true; + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + if (transaction_active) /*< all to master */ + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Transaction is active, routing to Master."))); + } + else + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Begin transaction, write or unspecified type, " + "routing to Master."))); + } + } + + if (master_dcb == NULL) + { + succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); + } + if (succp) + { + if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_master, 1); + } + } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - return_ret: if (plainsqlbuf != NULL) { @@ -905,12 +988,11 @@ static void clientReply( GWBUF* writebuf, DCB* backend_dcb) { - DCB* master_dcb; - DCB* slave_dcb; DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type = BE_UNDEFINED; + backend_type_t be_type; + BACKEND** be; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -926,16 +1008,12 @@ static void clientReply( GWBUF_LENGTH(writebuf))) != NULL); goto lock_failed; } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** Holding lock ensures that router session remains open */ ss_dassert(backend_dcb->session != NULL); client_dcb = backend_dcb->session->client; /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - + rses_end_locked_router_action(router_cli_ses); /** * 1. Check if backend received reply to sescmd. * 2. Check sescmd's state whether OK_PACKET has been @@ -951,29 +1029,32 @@ static void clientReply( writebuf, GWBUF_LENGTH(writebuf))) != NULL); /** Log that client was closed before reply */ - return; + goto lock_failed; } - - if (backend_dcb == master_dcb) - { - be_type = BE_MASTER; - } - else if (backend_dcb == slave_dcb) - { - be_type = BE_SLAVE; - } - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "reply_by_statement", - backend_dcb, - gwbuf_clone(writebuf))); + be = router_cli_ses->rses_backend; + + while (be !=NULL) + { + if ((*be)->be_dcb == backend_dcb) + { + be_type = (SERVER_IS_MASTER((*be)->backend_server) ? BE_MASTER : + (SERVER_IS_SLAVE((*be)->backend_server) ? BE_SLAVE : + (SERVER_IS_JOINED((*be)->backend_server) ? BE_JOINED : BE_UNDEFINED))); + break; + } + be++; + } +// LOGIF(LT, tracelog_routed_query(router_cli_ses, +// "reply_by_statement", +// backend_dcb, +// gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } - - scur = rses_get_sescmd_cursor(router_cli_ses, be_type); + scur = &(*be)->be_sescmd_cursor; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -983,8 +1064,7 @@ static void clientReply( { writebuf = sescmd_cursor_process_replies(client_dcb, writebuf, - scur); - + scur); } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1007,126 +1087,148 @@ lock_failed: return; } + +int backend_cmp( + const void* be_1, + const void* be_2) +{ + BACKEND* b1 = *(BACKEND **)be_1; + BACKEND* b2 = *(BACKEND **)be_2; + + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : + ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); +} + /** - * @node Search suitable backend server from those of router instance. + * @node Search suitable backend servers from those of router instance. * * Parameters: * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. - * If NULL, then master is not searched. + * Pointer to location where master's address is to be stored. + * NULL is not allowed. * - * @param p_slave - in, use, out - * Pointer to location where slave's address is to be stored. - * if NULL, then slave is not searched. + * @param b - in, use, out + * Pointer to location where all backend server pointers are stored. + * NULL is not allowed. * - * @param inst - in, use - * Pointer to router instance + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. + * + * @param max_nslaves - in, use + * Upper limit for the number of slaves. Configuration parameter or default. * - * @return true, if all what what requested found, false if the request - * was not satisfied or was partially satisfied. + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. * * * @details It is assumed that there is only one master among servers of - * a router instance. As a result, thr first master is always chosen. + * a router instance. As a result, the first master found is chosen. */ -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, - ROUTER_INSTANCE* router) -{ - BACKEND* local_backend[BE_COUNT] = {NULL,NULL}; - int i; - bool succp = true; - - /* - * Loop over all the servers and find any that have fewer connections - * than current candidate server. - * - * If a server has less connections than the current candidate it is - * chosen to a new candidate. - * - * If a server has the same number of connections currently as the - * candidate and has had less connections over time than the candidate - * it will also become the new candidate. This has the effect of - * spreading the connections over different servers during periods of - * very low load. - * - * If master is searched for, the first master found is chosen. - */ - for (i = 0; router->servers[i] != NULL; i++) { - BACKEND* be = router->servers[i]; - - if (be != NULL) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [search_backend_servers] Examine server " - "%s:%d with %d connections. Status is %d, " - "router->bitvalue is %d", - pthread_self(), - be->backend_server->name, - be->backend_server->port, - be->backend_conn_count, - be->backend_server->status, - router->bitmask))); - } - - if (be != NULL && - SERVER_IS_RUNNING(be->backend_server) && - (be->backend_server->status & router->bitmask) == - router->bitvalue) - { - if (SERVER_IS_SLAVE(be->backend_server) && - p_slave != NULL) - { - /** - * If no candidate set, set first running - * server as an initial candidate server. - */ - if (local_backend[BE_SLAVE] == NULL) - { - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count < - local_backend[BE_SLAVE]->backend_conn_count) - { - /** - * This running server has fewer - * connections, set it as a new - * candidate. - */ - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count == - local_backend[BE_SLAVE]->backend_conn_count && - be->backend_server->stats.n_connections < - local_backend[BE_SLAVE]->backend_server->stats.n_connections) - { - /** - * This running server has the same - * number of connections currently - * as the candidate but has had - * fewer connections over time - * than candidate, set this server - * to candidate. - */ - local_backend[BE_SLAVE] = be; - } - } - else if (p_master != NULL && - local_backend[BE_MASTER] == NULL && - SERVER_IS_MASTER(be->backend_server)) - { - local_backend[BE_MASTER] = be; - } - else if (p_master != NULL && - local_backend[BE_JOINED] == NULL && - SERVER_IS_JOINED(be->backend_server)) - { - local_backend[BE_JOINED] = be; - } - } - } +static bool select_connect_backend_servers( + BACKEND** p_master, + BACKEND** b, + int router_nservers, + int max_nslaves, + SESSION* session, + ROUTER_INSTANCE* router) +{ + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; + /** + * Sort the pointer list to servers according to connection counts. As + * a consequence those backends having least connections are in the + * beginning of the list. + */ + qsort((void *)b, (size_t)router_nservers, sizeof(void*), backend_cmp); + + /** + * Choose at least 1+1 (master and slave) and at most 1+max_nslaves + * servers from the sorted list. First master found is selected. + */ + while (*b != NULL && (slaves_connected < max_nslaves || !master_connected)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [select_backend_servers] Examine server " + "%s:%d with %d connections. Status is %d, " + "router->bitvalue is %d", + pthread_self(), + (*b)->backend_server->name, + (*b)->backend_server->port, + (*b)->backend_conn_count, + (*b)->backend_server->status, + router->bitmask))); + + if (SERVER_IS_RUNNING((*b)->backend_server) && + ((*b)->backend_server->status & router->bitmask == + router->bitvalue)) + { + if (slaves_found < max_nslaves && + SERVER_IS_SLAVE((*b)->backend_server)) + { + slaves_found += 1; + + (*b)->be_dcb = dcb_connect( + (*b)->backend_server, + session, + (*b)->backend_server->protocol); + + if ((*b)->be_dcb != NULL) + { + slaves_connected += 1; + /** Increase backend connection counter */ + atomic_add(&(*b)->backend_conn_count, 1); + } + else + { + /* handle connect error */ + } + } + else if (!master_connected && + (SERVER_IS_MASTER((*b)->backend_server) || + SERVER_IS_JOINED((*b)->backend_server))) + { + master_found = true; + + (*b)->be_dcb = dcb_connect( + (*b)->backend_server, + session, + (*b)->backend_server->protocol); + + if ((*b)->be_dcb != NULL) + { + master_connected = true; + *p_master = *b; + /** Increase backend connection counter */ + atomic_add(&(*b)->backend_conn_count, 1); + } + else + { + /* handle connect error */ + } + } + } + b++; + } /*< while */ + + if (master_connected && slaves_connected > 0 && slaves_connected <= max_nslaves) + { + succp = true; + } + else + { + /** disconnect and clean up */ + } +#if 0 if (router->bitvalue != 0 && p_master != NULL && local_backend[BE_JOINED] == NULL) @@ -1143,49 +1245,51 @@ static bool search_backend_servers( if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { succp = false; LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); + LOGFILE_ERROR, + "Error : Couldn't find suitable Slave from %d " + "candidates.", + i))); } if (p_master != NULL && local_backend[BE_MASTER] == NULL) { succp = false; LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Master from %d " - "candidates.", - i))); + LOGFILE_ERROR, + "Error : Couldn't find suitable Master from %d " + "candidates.", + i))); } - + if (local_backend[BE_SLAVE] != NULL) { *p_slave = local_backend[BE_SLAVE]; LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); + LOGFILE_TRACE, + "%lu [readwritesplit:search_backend_servers] Selected " + "Slave %s:%d from %d candidates.", + pthread_self(), + local_backend[BE_SLAVE]->backend_server->name, + local_backend[BE_SLAVE]->backend_server->port, + i))); } if (local_backend[BE_MASTER] != NULL) { *p_master = local_backend[BE_MASTER]; LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); + LOGFILE_TRACE, + "%lu [readwritesplit:search_backend_servers] Selected " + "Master %s:%d " + "from %d candidates.", + pthread_self(), + local_backend[BE_MASTER]->backend_server->name, + local_backend[BE_MASTER]->backend_server->port, + i))); } +#endif return_succp: return succp; } + /** * Create a generic router session property strcture. */ @@ -1397,7 +1501,7 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - +/* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] cmd %p " @@ -1407,11 +1511,13 @@ static GWBUF* sescmd_cursor_process_replies( scmd, packetlen+headerlen, STRBETYPE(scur->scmd_cur_be_type)))); + */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; + /* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] Marked " @@ -1420,6 +1526,7 @@ static GWBUF* sescmd_cursor_process_replies( pthread_self(), scmd, STRBETYPE(scur->scmd_cur_be_type)))); + */ } if (sescmd_cursor_next(scur)) @@ -1459,17 +1566,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command( return scmd; } -/** router must be locked */ -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) -{ - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - return &rses->rses_cursor[be_type]; -} - /** router must be locked */ static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor) @@ -1519,24 +1615,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) + BACKEND* backend) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - dcb = rses->rses_dcb[be_type]; - + if (backend->be_dcb == NULL) + { + goto return_succp; + } + dcb = backend->be_dcb; + CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + CHK_BACKEND(backend); /** * Get cursor pointer and copy of command buffer to cursor. */ - scur = rses_get_sescmd_cursor(rses, be_type); + scur = &backend->be_sescmd_cursor; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1550,11 +1648,12 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } + /* LOGIF(LT, tracelog_routed_query(rses, "execute_sescmd_in_backend", dcb, sescmd_cursor_clone_querybuf(scur))); - + */ switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { case COM_CHANGE_USER: rc = dcb->func.auth( @@ -1588,6 +1687,7 @@ return_succp: return succp; } + /** * Moves cursor to next property and copied address of its sescmd to cursor. * Current propery must be non-null. @@ -1665,7 +1765,7 @@ static rses_property_t* mysql_sescmd_get_property( return scmd->my_sescmd_prop; } - +#if 0 static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, @@ -1726,7 +1826,7 @@ static void tracelog_routed_query( } gwbuf_free(buf); } - +#endif /** * Return rc, rc < 0 if router session is closed. rc == 0 if there are no * capabilities specified, rc > 0 when there are capabilities. @@ -1774,17 +1874,17 @@ static bool route_session_write( DCB* master_dcb; DCB* slave_dcb; rses_property_t* prop; + BACKEND** b; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - CHK_DCB(master_dcb); - CHK_DCB(slave_dcb); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " "routing to all servers.", STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + + b = router_cli_ses->rses_backend; + /** * COM_QUIT is one-way message. Server doesn't respond to that. * Therefore reply processing is unnecessary and session @@ -1796,13 +1896,24 @@ static bool route_session_write( int rc; int rc2; - rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - rc2 = slave_dcb->func.write(slave_dcb, querybuf); - - if (rc == 1 && rc == rc2) + succp = true; + + while (*b != NULL) { - succp = true; + DCB* dcb = (*b)->be_dcb; + + if (dcb != NULL) + { + rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); + + if (rc != 1) + { + succp = false; + } + } + b++; } + gwbuf_free(querybuf); goto return_succp; } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); @@ -1823,23 +1934,18 @@ static bool route_session_write( /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - /** Execute session command in master */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); - - if (!succp) + while (*b != NULL) { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; + succp = execute_sescmd_in_backend((*b)); + + if (!succp) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto return_succp; + } + b++; } - /** Execute session command in slave */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); - if (!succp) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; - } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1847,4 +1953,4 @@ static bool route_session_write( return_succp: return succp; -} \ No newline at end of file +} diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 8c11d9782..faaf268d4 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -119,7 +119,9 @@ typedef enum skygw_chk_t { CHK_NUM_SESSION, CHK_NUM_ROUTER_SES, CHK_NUM_MY_SESCMD, - CHK_NUM_ROUTER_PROPERTY + CHK_NUM_ROUTER_PROPERTY, + CHK_NUM_SESCMD_CUR, + CHK_NUM_BACKEND } skygw_chk_t; # define STRBOOL(b) ((b) ? "true" : "false") @@ -446,6 +448,17 @@ typedef enum skygw_chk_t { "Session command has invalid check fields"); \ } +#define CHK_SESCMD_CUR(c) { \ + ss_info_dassert((c)->scmd_cur_chk_top == CHK_NUM_SESCMD_CUR && \ + (c)->scmd_cur_chk_tail == CHK_NUM_SESCMD_CUR, \ + "Session command cursor has invalid check fields"); \ + } + +#define CHK_BACKEND(b) { \ + ss_info_dassert((b)->be_chk_top == CHK_NUM_BACKEND && \ + (b)->be_chk_tail == CHK_NUM_BACKEND, \ + "BACKEND has invalid check fields"); \ +} #if defined(SS_DEBUG) bool conn_open[10240]; From 28bc3509cca1d1097bb0317b817ee830d2c0937c Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Thu, 24 Apr 2014 22:05:26 +0300 Subject: [PATCH 2/5] Added new parameter RW Split Router.max_slave_connections=[|%] which specifies the maximum number of slaves which read/write split router connects in each routing session. Parameter it read from config file to CONFIG_CONTEXT's parameter list. It is qualified in service.c:service_set_slave_conn_limit and if qualified, the qualified integer value and the value type are copied to the CONFIG_PARAMETER structure. This CONFIG_PARAMETER struct is cloned (=copied to different memory area) and linked to RW Split SERVICE struct. When RW Split router_instance is created in readwritesplit.c:createInstance, the value is copied to (new) rwsplit_config_t structure from SERVICE's parameter list. When new routing session is created in readwritesplit.c:newSession, the rwsplit_config_t structure is copied to ROUTER_CLIENT_SES struct and the actual max_nslaves value is calculated from the config value (if percentage is used). Tests and many error handling branches are missing but functionality seems to be working. --- server/core/config.c | 163 ++++++++++++++++++ server/core/service.c | 99 +++++++++++ server/include/config.h | 37 +++- server/include/service.h | 11 ++ server/modules/include/readwritesplit.h | 8 + server/modules/routing/readconnroute.c | 10 +- .../routing/readwritesplit/readwritesplit.c | 68 ++++++-- 7 files changed, 373 insertions(+), 23 deletions(-) diff --git a/server/core/config.c b/server/core/config.c index d964ba5fa..00fd947ce 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -195,6 +195,8 @@ int error_count = 0; "router"); if (router) { + char* max_slave_conn_str; + obj->element = service_alloc(obj->object, router); char *user = config_get_value(obj->parameters, "user"); @@ -203,6 +205,10 @@ int error_count = 0; char *enable_root_user = config_get_value(obj->parameters, "enable_root_user"); + max_slave_conn_str = + config_get_value(obj->parameters, + "max_slave_connections"); + if (enable_root_user) serviceEnableRootUser(obj->element, atoi(enable_root_user)); @@ -222,6 +228,35 @@ int error_count = 0; "corresponding password.", obj->object))); } + if (max_slave_conn_str != NULL) + { + CONFIG_PARAMETER* param; + bool succp; + + param = config_get_param(obj->parameters, + "max_slave_connections"); + + succp = service_set_slave_conn_limit( + obj->element, + param, + max_slave_conn_str, + COUNT_ATMOST); + + if (!succp) + { + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : invalid value type " + "for parameter \'%s.%s = %s\'\n\tExpected " + "type is either for slave connection " + "count or\n\t%% for specifying the " + "maximum percentage of available the " + "slaves that will be connected.", + ((SERVICE*)obj->element)->name, + param->name, + param->value))); + } + } } else { @@ -515,6 +550,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name) return NULL; } + +CONFIG_PARAMETER* config_get_param( + CONFIG_PARAMETER* params, + const char* name) +{ + while (params) + { + if (!strcmp(params->name, name)) + return params; + params = params->next; + } + return NULL; +} + +config_param_type_t config_get_paramtype( + CONFIG_PARAMETER* param) +{ + return param->qfd_param_type; +} + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype) +{ + int val = -1; /*< -1 indicates failure */ + + while (param) + { + if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN)) + { + switch (ptype) { + case COUNT_TYPE: + val = param->qfd.valcount; + goto return_val; + + case PERCENT_TYPE: + val = param->qfd.valpercent; + goto return_val; + + case BOOL_TYPE: + val = param->qfd.valbool; + goto return_val; + + default: + goto return_val; + } + } + else if (name == NULL) + { + goto return_val; + } + param = param->next; + } +return_val: + return val; +} + + +CONFIG_PARAMETER* config_clone_param( + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER* p2; + + p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER)); + + if (p2 == NULL) + { + goto return_p2; + } + memcpy(p2, param, sizeof(CONFIG_PARAMETER)); + p2->name = strndup(param->name, MAX_PARAM_LEN); + p2->value = strndup(param->value, MAX_PARAM_LEN); + + if (param->qfd_param_type == STRING_TYPE) + { + p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN); + } + +return_p2: + return p2; +} + /** * Free a config tree * @@ -861,6 +979,7 @@ static char *service_params[] = "user", "passwd", "enable_root_user", + "max_slave_connections", NULL }; @@ -950,3 +1069,47 @@ int i; obj = obj->next; } } + +/** + * Set qualified parameter value to CONFIG_PARAMETER struct. + */ +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type) +{ + bool succp; + + switch (type) { + case STRING_TYPE: + param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN); + succp = true; + break; + + case COUNT_TYPE: + param->qfd.valcount = *(int *)val; + succp = true; + break; + + case PERCENT_TYPE: + param->qfd.valpercent = *(int *)val; + succp = true; + break; + + case BOOL_TYPE: + param->qfd.valbool = *(bool *)val; + succp = true; + break; + + default: + succp = false; + break; + } + + if (succp) + { + param->qfd_param_type = type; + } + return succp; +} + diff --git a/server/core/service.c b/server/core/service.c index db41f881a..16532a1fa 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include #include @@ -52,6 +54,11 @@ extern int lm_enabled_logfiles_bitmask; static SPINLOCK service_spin = SPINLOCK_INIT; static SERVICE *allServices = NULL; +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param); + + /** * Allocate a new service for the gateway to support * @@ -752,3 +759,95 @@ int service_refresh_users(SERVICE *service) { else return 1; } + +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec) +{ + char* p; + int valint; + bool percent = false; + bool succp; + + /** + * Find out whether the value is numeric and ends with '%' or '\0' + */ + p = valstr; + + while(isdigit(*p)) p++; + + errno = 0; + + if (p == valstr || (*p != '%' && *p != '\0')) + { + succp = false; + } + else if (*p == '%') + { + if (*(p+1) == '\0') + { + *p = '\0'; + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE); + } + } + else + { + succp = false; + } + } + else if (*p == '\0') + { + valint = (int) strtol(valstr, (char **)NULL, 10); + + if (valint == 0 && errno != 0) + { + succp = false; + } + else + { + succp = true; + config_set_qualified_param(param, (void *)&valint, COUNT_TYPE); + } + } + + if (succp) + { + service_add_qualified_param(service, param); /*< add param to svc */ + } + return succp; +} + +/** + * Add qualified config parameter to SERVICE struct. + */ +static void service_add_qualified_param( + SERVICE* svc, + CONFIG_PARAMETER* param) +{ + CONFIG_PARAMETER** p = &svc->svc_config_param; + + spinlock_acquire(&svc->spin); + + if ((*p) != NULL) + { + while ((*p)->next != NULL) *p = (*p)->next; + (*p)->next = config_clone_param(param); + } + else + { + (*p) = config_clone_param(param); + } + (*p)->next = NULL; + spinlock_release(&svc->spin); +} \ No newline at end of file diff --git a/server/include/config.h b/server/include/config.h index 88620f015..af91bbf5a 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -17,6 +17,7 @@ * * Copyright SkySQL Ab 2013 */ +#include /** * @file config.h The configuration handling elements @@ -30,12 +31,32 @@ * @endverbatim */ +/** + * Maximum length for configuration parameter value. + */ +enum {MAX_PARAM_LEN=256}; + +typedef enum { + UNDEFINED_TYPE=0, + STRING_TYPE, + COUNT_TYPE, + PERCENT_TYPE, + BOOL_TYPE +} config_param_type_t; + /** * The config parameter */ typedef struct config_parameter { char *name; /**< The name of the parameter */ - char *value; /**< The value of the parameter */ + char *value; /**< The value of the parameter */ + union { /*< qualified parameter value by type */ + char* valstr; /*< terminated char* array */ + int valcount; /*< int */ + int valpercent; /*< int */ + bool valbool; /*< bool */ + } qfd; + config_param_type_t qfd_param_type; struct config_parameter *next; /**< Next pointer in the linked list */ } CONFIG_PARAMETER; @@ -60,4 +81,18 @@ typedef struct { extern int config_load(char *); extern int config_reload(); extern int config_threadcount(); +CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); + +bool config_set_qualified_param( + CONFIG_PARAMETER* param, + void* val, + config_param_type_t type); + +CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); + +int config_get_valint( + CONFIG_PARAMETER* param, + const char* name, /*< if NULL examine current param only */ + config_param_type_t ptype); + #endif diff --git a/server/include/service.h b/server/include/service.h index d52a7eccf..ea44c7f37 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -22,6 +22,7 @@ #include #include #include +#include "config.h" /** * @file service.h @@ -114,6 +115,7 @@ typedef struct service { SERVICE_STATS stats; /**< The service statistics */ struct users *users; /**< The user data for this service */ int enable_root; /**< Allow root user access */ + CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */ SPINLOCK users_table_spin; /**< The spinlock for users data refresh */ SERVICE_REFRESH_RATE @@ -121,6 +123,8 @@ typedef struct service { struct service *next; /**< The next service in the linked list */ } SERVICE; +typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t; + #define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */ #define SERVICE_STATE_STARTED 2 /**< The service has been started */ @@ -146,4 +150,11 @@ extern int service_refresh_users(SERVICE *); extern void printService(SERVICE *); extern void printAllServices(); extern void dprintAllServices(DCB *); + +bool service_set_slave_conn_limit ( + SERVICE* service, + CONFIG_PARAMETER* param, + char* valstr, + count_spec_t count_spec); + #endif diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index ab17b2c35..9a63ab6f0 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -121,6 +121,12 @@ typedef struct backend { #endif } BACKEND; +typedef struct rwsplit_config_st { + int rw_max_slave_conn_percent; + int rw_max_slave_conn_count; +} rwsplit_config_t; + + /** * The client session structure used within this router. */ @@ -135,6 +141,7 @@ struct router_client_session { rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; BACKEND* rses_master; /*< Pointer to master */ BACKEND** rses_backend; /*< All backends used by client session */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ int rses_nbackends; int rses_capabilities; /*< input type, for example */ struct router_client_session* next; @@ -164,6 +171,7 @@ typedef struct router_instance { SPINLOCK lock; /*< Lock for the instance data */ BACKEND** servers; /*< Backend servers */ BACKEND* master; /*< NULL or pointer */ + rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */ unsigned int bitmask; /*< Bitmask to apply to server->status */ unsigned int bitvalue; /*< Required value of server->status */ ROUTER_STATS stats; /*< Statistics for this router */ diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index f587b2c63..2bfba7efc 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -252,10 +252,12 @@ int i, n; } else { - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "Warning : Unsupported router " - "option %s for readconnroute.", + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Unsupported router " + "option \'%s\' for readconnroute. " + "Expected router options are " + "[slave|master|synced]", options[i]))); } } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index c177d6d62..f056da724 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -154,11 +154,7 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - -#if !defined(MAX95) +#if 0 /*< disabled for now due multiple slaves changes */ static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, @@ -227,10 +223,12 @@ static ROUTER* createInstance( SERVICE* service, char** options) { - ROUTER_INSTANCE* router; - SERVER* server; - int nservers; - int i; + ROUTER_INSTANCE* router; + SERVER* server; + int nservers; + int i; + CONFIG_PARAMETER* param; + config_param_type_t paramtype; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -327,6 +325,27 @@ static ROUTER* createInstance( } } } + /** + * Copy config parameter value from service struct. This becomes the + * default value for every new rwsplit router session. + */ + param = config_get_param(service->svc_config_param, "max_slave_connections"); + + if (param != NULL) + { + paramtype = config_get_paramtype(param); + + if (paramtype == COUNT_TYPE) + { + router->rwsplit_config.rw_max_slave_conn_count = + config_get_valint(param, NULL, paramtype); + } + else if (paramtype == PERCENT_TYPE) + { + router->rwsplit_config.rw_max_slave_conn_percent = + config_get_valint(param, NULL, paramtype); + } + } /** * We have completed the creation of the router data, so now * insert this router into the linked list of routers @@ -350,7 +369,6 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ - const int conf_max_nslaves = 2; /*< replaces configuration parameter until its developed */ static void* newSession( ROUTER* router_inst, @@ -363,7 +381,8 @@ static void* newSession( ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ - int max_nslaves; /*< max # of slaves used in this session */ + int max_nslaves; /*< max # of slaves used in this session */ + int conf_max_nslaves; /*< value from configuration file */ b = router->servers; @@ -376,7 +395,6 @@ static void* newSession( /** log */ goto return_rses; } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) @@ -384,6 +402,23 @@ static void* newSession( ss_dassert(false); goto return_rses; } + /** Copy config struct from router instance */ + client_rses->rses_config = router->rwsplit_config; + + /** + * Either copy direct count of slave connections or calculate the count + * from percentage value. + */ + if (client_rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *)); /** @@ -1033,7 +1068,7 @@ static void clientReply( } be = router_cli_ses->rses_backend; - while (be !=NULL) + while (*be !=NULL) { if ((*be)->be_dcb == backend_dcb) { @@ -1169,7 +1204,7 @@ static bool select_connect_backend_servers( router->bitmask))); if (SERVER_IS_RUNNING((*b)->backend_server) && - ((*b)->backend_server->status & router->bitmask == + (((*b)->backend_server->status & router->bitmask) == router->bitvalue)) { if (slaves_found < max_nslaves && @@ -1871,8 +1906,6 @@ static bool route_session_write( skygw_query_type_t qtype) { bool succp; - DCB* master_dcb; - DCB* slave_dcb; rses_property_t* prop; BACKEND** b; @@ -1894,8 +1927,7 @@ static bool route_session_write( if (packet_type == COM_QUIT) { int rc; - int rc2; - + succp = true; while (*b != NULL) From c927057b5c1a6f6daf0198dd00286c9cfccd95a2 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Mon, 28 Apr 2014 23:33:49 +0300 Subject: [PATCH 3/5] Fixed two bugs of which one was older. 1. in query_classifier.cc autocommit_enabled, and transaction_active variables maintained their values across different sessions. Now those values are stored in each router_client_ses object. 2. As a part of implementation of MAX-95 session variables were added to BACKEND struct which is shared with all sessions using the SERVICE which the particular BACKEND serves. Now each router_client_ses object has a backend reference struct which includes pointer to BACKEND, DCB and to session command cursor. Added test - set_autocommit_disabled.sql, test_after_autocommit_disabled.sql - to check that session variable is discarded when session where it belongs terminates. --- query_classifier/query_classifier.cc | 1 - server/core/gateway.c | 6 +- server/include/config.h | 11 +- server/modules/include/readwritesplit.h | 41 +- .../routing/readwritesplit/readwritesplit.c | 723 ++++++++++-------- .../routing/readwritesplit/test/rwsplit.sh | 13 + .../test/set_autocommit_disabled.sql | 7 + .../test/test_after_autocommit_disabled.sql | 2 + .../test/test_autocommit_disabled3.sql | 9 + utils/skygw_debug.h | 12 +- 10 files changed, 505 insertions(+), 320 deletions(-) create mode 100644 server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql create mode 100644 server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql create mode 100644 server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 7b2ec8b99..82cf85b71 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -764,7 +764,6 @@ static int is_autocommit_stmt( { String str(target, sizeof(target), system_charset_info); String* res = item->val_str(&str); - int rc; if ((rc = find_type(&bool_typelib, res->ptr(), res->length(), false))) { diff --git a/server/core/gateway.c b/server/core/gateway.c index bd773c7f9..2bd592fe7 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -136,7 +136,7 @@ static void usage(void); static char* get_expanded_pathname( char** abs_path, char* input_path, - char* fname); + const char* fname); static void print_log_n_stderr( bool do_log, bool do_stderr, @@ -725,7 +725,7 @@ static bool file_is_writable( static char* get_expanded_pathname( char** output_path, char* relative_path, - char* fname) + const char* fname) { char* cnf_file_buf = NULL; char* expanded_path; @@ -1228,7 +1228,7 @@ int main(int argc, char **argv) datadir))); LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, - "Configuration file : %s", + "Configuration file : %s", cnf_file_path))); /*< Update the server options */ diff --git a/server/include/config.h b/server/include/config.h index af91bbf5a..09f2459e1 100644 --- a/server/include/config.h +++ b/server/include/config.h @@ -78,17 +78,18 @@ typedef struct { int n_threads; /**< Number of polling threads */ } GATEWAY_CONF; -extern int config_load(char *); -extern int config_reload(); -extern int config_threadcount(); -CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); +extern int config_load(char *); +extern int config_reload(); +extern int config_threadcount(); +CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name); +config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param); +CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); bool config_set_qualified_param( CONFIG_PARAMETER* param, void* val, config_param_type_t type); -CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param); int config_get_valint( CONFIG_PARAMETER* param, diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 9a63ab6f0..408743c25 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,7 +31,6 @@ #include - typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, @@ -105,27 +104,45 @@ typedef struct sescmd_cursor_st { * Internal structure used to define the set of backend servers we are routing * connections to. This provides the storage for routing module specific data * that is required for each of the backend servers. + * + * Owned by router_instance, referenced by each routing session. */ -typedef struct backend { +typedef struct backend_st { #if defined(SS_DEBUG) skygw_chk_t be_chk_top; #endif SERVER* backend_server; /*< The server itself */ int backend_conn_count; /*< Number of connections to the server */ bool be_valid; /*< valid when belongs to the router's configuration */ - DCB* be_dcb; - /*< cursor is pointer and status variable to current session command */ - sescmd_cursor_t be_sescmd_cursor; #if defined(SS_DEBUG) skygw_chk_t be_chk_tail; #endif } BACKEND; + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +typedef struct backend_ref_st { +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_top; +#endif + BACKEND* bref_backend; + DCB* bref_dcb; + sescmd_cursor_t bref_sescmd_cur; +#if defined(SS_DEBUG) + skygw_chk_t bref_chk_tail; +#endif +} backend_ref_t; + + typedef struct rwsplit_config_st { int rw_max_slave_conn_percent; int rw_max_slave_conn_count; } rwsplit_config_t; - + /** * The client session structure used within this router. @@ -139,11 +156,13 @@ struct router_client_session { bool rses_closed; /*< true when closeSession is called */ /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; - BACKEND* rses_master; /*< Pointer to master */ - BACKEND** rses_backend; /*< All backends used by client session */ - rwsplit_config_t rses_config; /*< copied config info from router instance */ + backend_ref_t* rses_master_ref; + backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ + rwsplit_config_t rses_config; /*< copied config info from router instance */ int rses_nbackends; int rses_capabilities; /*< input type, for example */ + bool rses_autocommit_enabled; + bool rses_transaction_active; struct router_client_session* next; #if defined(SS_DEBUG) skygw_chk_t rses_chk_tail; @@ -178,4 +197,8 @@ typedef struct router_instance { struct router_instance* next; /*< Next router on the list */ } ROUTER_INSTANCE; +#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \ + (SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \ + (SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED))); + #endif /*< _RWSPLITROUTER_H */ diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index f056da724..de4d803a5 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,13 +70,13 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); -static int backend_cmp( - const void* be_1, - const void* be_2); +int bref_cmp( + const void* bref1, + const void* bref2); static bool select_connect_backend_servers( - BACKEND** p_master, - BACKEND** b, + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, int router_nservers, int max_nslaves, SESSION* session, @@ -131,7 +131,7 @@ static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); static bool execute_sescmd_in_backend( - BACKEND* backend); + backend_ref_t* backend_ref); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -154,13 +154,11 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -#if 0 /*< disabled for now due multiple slaves changes */ static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf); -#endif static bool route_session_write( ROUTER_CLIENT_SES* router_client_ses, @@ -255,13 +253,12 @@ static ROUTER* createInstance( if (options != NULL) { - LOGIF(LM, (skygw_log_write_flush( + LOGIF(LM, (skygw_log_write( LOGFILE_MESSAGE, "Router options supplied to read/write statement router " "module but none are supported. The options will be " "ignored."))); } - /** * Create an array of the backend servers in the router structure to * maintain a count of the number of connections to each @@ -284,14 +281,13 @@ static ROUTER* createInstance( router->servers[nservers]->backend_server = server; router->servers[nservers]->backend_conn_count = 0; router->servers[nservers]->be_valid = false; - router->servers[nservers]->be_dcb = NULL; #if defined(SS_DEBUG) router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; #endif nservers += 1; server = server->nextdb; - } + } router->servers[nservers] = NULL; /** @@ -369,45 +365,87 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ - static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND** pp_backend; + backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ BACKEND** b; - BACKEND* master = NULL; /*< pointer to selected master */ ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ int max_nslaves; /*< max # of slaves used in this session */ int conf_max_nslaves; /*< value from configuration file */ - - b = router->servers; - - /** count servers */ - while (*(b++) != NULL) router_nservers++; - - /** Master + Slave is minimum requirement */ - if (router_nservers < 2) - { - /** log */ - goto return_rses; - } + int i; + static int min_nservers = 1; /*< hard-coded for now */ + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); - + if (client_rses == NULL) { ss_dassert(false); goto return_rses; } +#if defined(SS_DEBUG) + client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; + client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; +#endif /** Copy config struct from router instance */ client_rses->rses_config = router->rwsplit_config; /** - * Either copy direct count of slave connections or calculate the count - * from percentage value. + * Set defaults to session variables. + */ + client_rses->rses_autocommit_enabled = true; + client_rses->rses_transaction_active = false; + + /** count servers */ + b = router->servers; + while (*(b++) != NULL) router_nservers++; + + /** With too few servers session is not created */ + if (router_nservers < min_nservers) + { + /** log this */ + goto return_rses; + } + /** + * Create backend reference objects for this session. + */ + backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); + + if (backend_ref == NULL) + { + /** log this */ + free(client_rses); + goto return_rses; + } + /** + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. + */ + for (i=0; i< router_nservers; i++) + { +#if defined(SS_DEBUG) + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + backend_ref[i].bref_backend = router->servers[i]; + /** store pointers to sescmd list to both cursors */ + backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; + backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; + backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + } + /** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. */ if (client_rses->rses_config.rw_max_slave_conn_count > 0) { @@ -417,48 +455,22 @@ static void* newSession( { conf_max_nslaves = (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; - } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); - pp_backend = (BACKEND **)calloc(1, (router_nservers)*sizeof(BACKEND *)); - - /** - * Copy backend pointer array from global router instance to private use. - */ - memcpy(pp_backend, router->servers, router_nservers*sizeof(BACKEND *)); - ss_dassert(pp_backend[router_nservers] == NULL); - - spinlock_init(&client_rses->rses_lock); -#if defined(SS_DEBUG) - client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; - client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; -#endif - b = pp_backend; - - /** Set up ses cmd objects for each backend */ - while (*b != NULL) - { - /** store pointers to sescmd list to both cursors */ - (*b)->be_sescmd_cursor.scmd_cur_rses = client_rses; - (*b)->be_sescmd_cursor.scmd_cur_active = false; - (*b)->be_sescmd_cursor.scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - (*b)->be_sescmd_cursor.scmd_cur_cmd = NULL; -#if defined(SS_DEBUG) - (*b)->be_sescmd_cursor.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; - (*b)->be_sescmd_cursor.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; -#endif - b++; } - /** + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + spinlock_init(&client_rses->rses_lock); + client_rses->rses_backend_ref = backend_ref; + + /** * Find a backend servers to connect to. */ - succp = select_connect_backend_servers(&master, - pp_backend, - router_nservers, - max_nslaves, - session, - router); - + succp = select_connect_backend_servers(&master_ref, + backend_ref, + router_nservers, + max_nslaves, + session, + router); + /** Both Master and at least 1 slave must be found */ if (!succp) { free(client_rses); @@ -466,11 +478,12 @@ static void* newSession( goto return_rses; } /** Copy backend pointers to router session. */ - client_rses->rses_master = master; - client_rses->rses_backend = pp_backend; + client_rses->rses_master_ref = master_ref; + client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; + /** * Version is bigger than zero once initialized. */ @@ -504,12 +517,12 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - BACKEND** b; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); - b = router_cli_ses->rses_backend; + backend_ref = router_cli_ses->rses_backend_ref; /** * Lock router client session for secure read and update. */ @@ -524,28 +537,27 @@ static void closeSession( * whithout checking this first. */ router_cli_ses->rses_closed = true; - - while (*b != NULL) + + for (i=0; irses_nbackends; i++) { /** decrease server current connection counters */ - atomic_add(&(*b)->backend_server->stats.n_current, -1); + atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); /** Close those which had been connected */ - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { - CHK_DCB((*b)->be_dcb); - dcbs[i] = (*b)->be_dcb; - (*b)->be_dcb = NULL; /*< prevent new uses of DCB */ + CHK_DCB(backend_ref[i].bref_dcb); + dcbs[i] = backend_ref[i].bref_dcb; + backend_ref[i].bref_dcb = + (DCB *)0xdeadbeef; /*< prevent new uses of DCB */ dcbs[i]->func.close(dcbs[i]); } - b++; } /** Unlock */ rses_end_locked_router_action(router_cli_ses); } } - static void freeSession( ROUTER* router_instance, void* router_client_session) @@ -553,13 +565,16 @@ static void freeSession( ROUTER_CLIENT_SES* router_cli_ses; ROUTER_INSTANCE* router; int i; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; - router = (ROUTER_INSTANCE *)router_instance; - - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1); - + router = (ROUTER_INSTANCE *)router_instance; + backend_ref = router_cli_ses->rses_backend_ref; + + for (i=0; irses_nbackends; i++) + { + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } spinlock_acquire(&router->lock); if (router->connections == router_cli_ses) { @@ -602,49 +617,58 @@ static void freeSession( return; } + static bool get_dcb( DCB** p_dcb, ROUTER_CLIENT_SES* rses, backend_type_t btype) { - BACKEND** b; - int smallest_nconn = -1; - bool succp = false; + backend_ref_t* backend_ref; + int smallest_nconn = -1; + int i; + bool succp = false; CHK_CLIENT_RSES(rses); - ss_dassert(*(p_dcb) == NULL); - b = rses->rses_backend; + ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); + + if (p_dcb == NULL) + { + goto return_succp; + } + backend_ref = rses->rses_backend_ref; if (btype == BE_SLAVE) { - while (*b != NULL) - { - if ((*b)->be_dcb != NULL && - SERVER_IS_SLAVE((*b)->backend_server) && + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || - (*b)->backend_conn_count < smallest_nconn)) + b->backend_conn_count < smallest_nconn)) { - *p_dcb = (*b)->be_dcb; - smallest_nconn = (*b)->backend_conn_count; + *p_dcb = backend_ref[i].bref_dcb; + smallest_nconn = b->backend_conn_count; succp = true; } - b++; } ss_dassert(succp); } else if (btype == BE_MASTER || BE_JOINED) { - while (*b != NULL) + for (i=0; irses_nbackends; i++) { - if ((*b)->be_dcb != NULL && - (SERVER_IS_MASTER((*b)->backend_server) || - SERVER_IS_JOINED((*b)->backend_server))) + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) { - *p_dcb = (*b)->be_dcb; + *p_dcb = backend_ref[i].bref_dcb; succp = true; goto return_succp; } - b++; } } return_succp: @@ -673,23 +697,19 @@ static int routeQuery( void* router_session, GWBUF* querybuf) { - skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; - GWBUF* plainsqlbuf = NULL; - char* querystr = NULL; + skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + GWBUF* plainsqlbuf = NULL; + char* querystr = NULL; char* startpos; unsigned char packet_type; uint8_t* packet; int ret = 0; - DCB* master_dcb = NULL; - DCB* slave_dcb = NULL; + DCB* master_dcb = NULL; + DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed = false; size_t len; - /** if false everything goes to master and session commands to slave too */ - static bool autocommit_enabled = true; - /** if true everything goes to master and session commands to slave too */ - static bool transaction_active = false; CHK_CLIENT_RSES(router_cli_ses); @@ -719,7 +739,7 @@ static int routeQuery( inst->stats.n_queries++; startpos = (char *)&packet[5]; - master_dcb = router_cli_ses->rses_master->be_dcb; + master_dcb = router_cli_ses->rses_master_ref->bref_dcb; CHK_DCB(master_dcb); switch(packet_type) { @@ -778,36 +798,36 @@ static int routeQuery( * transaction becomes active and master gets all statements until * transaction is committed and autocommit is enabled again. */ - if (autocommit_enabled && + if (router_cli_ses->rses_autocommit_enabled && QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) { - autocommit_enabled = false; + router_cli_ses->rses_autocommit_enabled = false; - if (!transaction_active) + if (!router_cli_ses->rses_transaction_active) { - transaction_active = true; + router_cli_ses->rses_transaction_active = true; } } - else if (!transaction_active && + else if (!router_cli_ses->rses_transaction_active && QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) { - transaction_active = true; + router_cli_ses->rses_transaction_active = true; } /** * Explicit COMMIT and ROLLBACK, implicit COMMIT. */ - if (autocommit_enabled && - transaction_active && + if (router_cli_ses->rses_autocommit_enabled && + router_cli_ses->rses_transaction_active && (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) || QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK))) { - transaction_active = false; + router_cli_ses->rses_transaction_active = false; } - else if (!autocommit_enabled && + else if (!router_cli_ses->rses_autocommit_enabled && QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) { - autocommit_enabled = true; - transaction_active = false; + router_cli_ses->rses_autocommit_enabled = true; + router_cli_ses->rses_transaction_active = false; } /** * Session update is always routed in the same way. @@ -829,7 +849,8 @@ static int routeQuery( ss_dassert(ret == 1); goto return_ret; } - else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !transaction_active) + else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && + !router_cli_ses->rses_transaction_active) { bool succp; @@ -857,7 +878,7 @@ static int routeQuery( if (LOG_IS_ENABLED(LOGFILE_TRACE)) { - if (transaction_active) /*< all to master */ + if (router_cli_ses->rses_transaction_active) /*< all to master */ { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, @@ -1026,8 +1047,8 @@ static void clientReply( DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type; - BACKEND** be; + backend_ref_t* backend_ref; + int i; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -1066,30 +1087,29 @@ static void clientReply( /** Log that client was closed before reply */ goto lock_failed; } - be = router_cli_ses->rses_backend; - - while (*be !=NULL) - { - if ((*be)->be_dcb == backend_dcb) - { - be_type = (SERVER_IS_MASTER((*be)->backend_server) ? BE_MASTER : - (SERVER_IS_SLAVE((*be)->backend_server) ? BE_SLAVE : - (SERVER_IS_JOINED((*be)->backend_server) ? BE_JOINED : BE_UNDEFINED))); - break; - } - be++; - } -// LOGIF(LT, tracelog_routed_query(router_cli_ses, -// "reply_by_statement", -// backend_dcb, -// gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } - scur = &(*be)->be_sescmd_cursor; + backend_ref = router_cli_ses->rses_backend_ref; + + /** find backend_dcb's corresponding BACKEND */ + i = 0; + while (irses_nbackends && + backend_ref[i].bref_dcb != backend_dcb) + { + i++; + } + ss_dassert(backend_ref[i].bref_dcb == backend_dcb); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + &backend_ref[i], + gwbuf_clone(writebuf))); + + scur = &backend_ref[i].bref_sescmd_cur; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -1123,13 +1143,13 @@ lock_failed: } -int backend_cmp( - const void* be_1, - const void* be_2) +int bref_cmp( + const void* bref1, + const void* bref2) { - BACKEND* b1 = *(BACKEND **)be_1; - BACKEND* b2 = *(BACKEND **)be_2; - + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); } @@ -1138,12 +1158,12 @@ int backend_cmp( * @node Search suitable backend servers from those of router instance. * * Parameters: - * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. + * @param p_master_ref - in, use, out + * Pointer to location where master's backend reference is to be stored. * NULL is not allowed. * - * @param b - in, use, out - * Pointer to location where all backend server pointers are stored. + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. * NULL is not allowed. * * @param router_nservers - in, use @@ -1165,63 +1185,85 @@ int backend_cmp( * a router instance. As a result, the first master found is chosen. */ static bool select_connect_backend_servers( - BACKEND** p_master, - BACKEND** b, + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, int router_nservers, int max_nslaves, SESSION* session, ROUTER_INSTANCE* router) { - bool succp = true; - bool master_found = false; - bool master_connected = false; - int slaves_found = 0; - int slaves_connected = 0; + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; + int i; + const int min_nslaves = 0; /*< not configurable at the time */ + bool is_synced_master; + + if (p_master_ref == NULL || backend_ref == NULL) + { + ss_dassert(FALSE); + succp = false; + goto return_succp; + } + + if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ + { + is_synced_master = true; + } + else + { + is_synced_master = false; + } /** * Sort the pointer list to servers according to connection counts. As * a consequence those backends having least connections are in the * beginning of the list. */ - qsort((void *)b, (size_t)router_nservers, sizeof(void*), backend_cmp); + qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), bref_cmp); /** * Choose at least 1+1 (master and slave) and at most 1+max_nslaves * servers from the sorted list. First master found is selected. */ - while (*b != NULL && (slaves_connected < max_nslaves || !master_connected)) - { + for (i=0; + ibitvalue is %d", pthread_self(), - (*b)->backend_server->name, - (*b)->backend_server->port, - (*b)->backend_conn_count, - (*b)->backend_server->status, - router->bitmask))); + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count, + b->backend_server->status, + router->bitmask))); - if (SERVER_IS_RUNNING((*b)->backend_server) && - (((*b)->backend_server->status & router->bitmask) == + if (SERVER_IS_RUNNING(b->backend_server) && + ((b->backend_server->status & router->bitmask) == router->bitvalue)) { if (slaves_found < max_nslaves && - SERVER_IS_SLAVE((*b)->backend_server)) + SERVER_IS_SLAVE(b->backend_server)) { slaves_found += 1; - - (*b)->be_dcb = dcb_connect( - (*b)->backend_server, + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, session, - (*b)->backend_server->protocol); + b->backend_server->protocol); - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { slaves_connected += 1; /** Increase backend connection counter */ - atomic_add(&(*b)->backend_conn_count, 1); + atomic_add(&b->backend_conn_count, 1); } else { @@ -1229,22 +1271,22 @@ static bool select_connect_backend_servers( } } else if (!master_connected && - (SERVER_IS_MASTER((*b)->backend_server) || - SERVER_IS_JOINED((*b)->backend_server))) + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) { master_found = true; - (*b)->be_dcb = dcb_connect( - (*b)->backend_server, + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, session, - (*b)->backend_server->protocol); + b->backend_server->protocol); - if ((*b)->be_dcb != NULL) + if (backend_ref[i].bref_dcb != NULL) { master_connected = true; - *p_master = *b; + *p_master_ref = &backend_ref[i]; /** Increase backend connection counter */ - atomic_add(&(*b)->backend_conn_count, 1); + atomic_add(&b->backend_conn_count, 1); } else { @@ -1252,79 +1294,170 @@ static bool select_connect_backend_servers( } } } - b++; - } /*< while */ + } /*< for */ - if (master_connected && slaves_connected > 0 && slaves_connected <= max_nslaves) + /** + * Successful cases + */ + if (master_connected && + slaves_connected >= min_nslaves && + slaves_connected <= max_nslaves) { succp = true; + + if (slaves_connected == 0 && slaves_found > 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_found == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_connected < max_nslaves) + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Note : Couldn't connect to maximum number of " + "slaves. Connected successfully to %d slaves " + "of %d of them.", + slaves_connected, + slaves_found))); + } + + if (LOG_IS_ENABLED(LT)) + { + for (i=0; ibackend_server->name, + b->backend_server->port))); + } + } /* for */ + } } + /** + * Failure cases + */ else - { - /** disconnect and clean up */ + { + if (!master_found) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + } + else if (!master_connected) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + } + + if (slaves_connected < min_nslaves) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't establish required amount of " + "slave connections for router session."))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "*Error : Couldn't establish required amount of " + "slave connections for router session."))); + } + + /** Clean up connections */ + for (i=0; ifunc.close(backend_ref[i].bref_dcb); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } + } + master_connected = false; + slaves_connected = 0; } -#if 0 - if (router->bitvalue != 0 && - p_master != NULL && - local_backend[BE_JOINED] == NULL) - { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find a Joined Galera node from %d " - "candidates.", - i))); - goto return_succp; - } - - if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); - } - - if (p_master != NULL && local_backend[BE_MASTER] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Master from %d " - "candidates.", - i))); - } - - if (local_backend[BE_SLAVE] != NULL) { - *p_slave = local_backend[BE_SLAVE]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); - } - if (local_backend[BE_MASTER] != NULL) { - *p_master = local_backend[BE_MASTER]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); - } -#endif return_succp: + return succp; } - /** * Create a generic router session property strcture. */ @@ -1650,26 +1783,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - BACKEND* backend) + backend_ref_t* backend_ref) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - if (backend->be_dcb == NULL) + if (backend_ref->bref_dcb == NULL) { goto return_succp; } - dcb = backend->be_dcb; + dcb = backend_ref->bref_dcb; CHK_DCB(dcb); - CHK_BACKEND(backend); + CHK_BACKEND_REF(backend_ref); /** * Get cursor pointer and copy of command buffer to cursor. */ - scur = &backend->be_sescmd_cursor; + scur = &backend_ref->bref_sescmd_cur; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1683,12 +1816,12 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } - /* - LOGIF(LT, tracelog_routed_query(rses, + + LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses, "execute_sescmd_in_backend", - dcb, + backend_ref, sescmd_cursor_clone_querybuf(scur))); - */ + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { case COM_CHANGE_USER: rc = dcb->func.auth( @@ -1800,11 +1933,10 @@ static rses_property_t* mysql_sescmd_get_property( return scmd->my_sescmd_prop; } -#if 0 static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf) { uint8_t* packet = GWBUF_DATA(buf); @@ -1813,20 +1945,18 @@ static void tracelog_routed_query( size_t buflen = GWBUF_LENGTH(buf); char* querystr; char* startpos = (char *)&packet[5]; + BACKEND* b; backend_type_t be_type; - - if (rses->rses_dcb[BE_MASTER] == dcb) - { - be_type = BE_MASTER; - } - else if (rses->rses_dcb[BE_SLAVE] == dcb) - { - be_type = BE_SLAVE; - } - else - { - be_type = BE_UNDEFINED; - } + DCB* dcb; + + CHK_BACKEND_REF(bref); + b = bref->bref_backend; + CHK_BACKEND(b); + dcb = bref->bref_dcb; + CHK_DCB(dcb); + + be_type = BACKEND_TYPE(b); + if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL) { len = packet[0]; @@ -1845,23 +1975,16 @@ static void tracelog_routed_query( funcname, buflen, querystr, - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->name : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->name : - "Target DCB is neither of the backends. This is error")), - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->port : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->port : - -1)), + b->backend_server->name, + b->backend_server->port, STRBETYPE(be_type), dcb))); } } gwbuf_free(buf); } -#endif + + /** * Return rc, rc < 0 if router session is closed. rc == 0 if there are no * capabilities specified, rc > 0 when there are capabilities. @@ -1907,8 +2030,9 @@ static bool route_session_write( { bool succp; rses_property_t* prop; - BACKEND** b; - + backend_ref_t* backend_ref; + int i; + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " @@ -1916,7 +2040,7 @@ static bool route_session_write( STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); - b = router_cli_ses->rses_backend; + backend_ref = router_cli_ses->rses_backend_ref; /** * COM_QUIT is one-way message. Server doesn't respond to that. @@ -1930,9 +2054,9 @@ static bool route_session_write( succp = true; - while (*b != NULL) + for (i=0; irses_nbackends; i++) { - DCB* dcb = (*b)->be_dcb; + DCB* dcb = backend_ref[i].bref_dcb; if (dcb != NULL) { @@ -1943,7 +2067,6 @@ static bool route_session_write( succp = false; } } - b++; } gwbuf_free(querybuf); goto return_succp; @@ -1965,10 +2088,10 @@ static bool route_session_write( } /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - - while (*b != NULL) + + for (i=0; irses_nbackends; i++) { - succp = execute_sescmd_in_backend((*b)); + succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) { @@ -1976,7 +2099,6 @@ static bool route_session_write( rses_end_locked_router_action(router_cli_ses); goto return_succp; } - b++; } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1986,3 +2108,4 @@ static bool route_session_write( return_succp: return succp; } + diff --git a/server/modules/routing/readwritesplit/test/rwsplit.sh b/server/modules/routing/readwritesplit/test/rwsplit.sh index fc0c56822..d64fcf495 100755 --- a/server/modules/routing/readwritesplit/test/rwsplit.sh +++ b/server/modules/routing/readwritesplit/test/rwsplit.sh @@ -169,6 +169,8 @@ else echo "$TINPUT PASSED">>$TLOG ; fi +# Disable autocommit in the first session and then test in new session that +# it is again enabled. TINPUT=test_autocommit_disabled2.sql TRETVAL=1 a=`$RUNCMD < ./$TINPUT` @@ -178,3 +180,14 @@ else echo "$TINPUT PASSED">>$TLOG ; fi +TINPUT=set_autocommit_disabled.sql +`$RUNCMD < ./$TINPUT` +TINPUT=test_after_autocommit_disabled.sql +TRETVAL=$TMASTER_ID +a=`$RUNCMD < ./$TINPUT` +if [ "$a" == "$TRETVAL" ]; then + echo "$TINPUT FAILED, return value $a when it was not accetable">>$TLOG; +else + echo "$TINPUT PASSED">>$TLOG ; +fi + diff --git a/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql b/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql new file mode 100644 index 000000000..a182b4922 --- /dev/null +++ b/server/modules/routing/readwritesplit/test/set_autocommit_disabled.sql @@ -0,0 +1,7 @@ +use test; +drop table if exists t1; +create table t1 (id integer); +set autocommit=0; -- open transaction +begin; +insert into t1 values(1); -- write to master +commit; diff --git a/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql b/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql new file mode 100644 index 000000000..f10c5eb8c --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_after_autocommit_disabled.sql @@ -0,0 +1,2 @@ +use test; +select @@server_id; \ No newline at end of file diff --git a/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql new file mode 100644 index 000000000..04be4024e --- /dev/null +++ b/server/modules/routing/readwritesplit/test/test_autocommit_disabled3.sql @@ -0,0 +1,9 @@ +use test; +drop table if exists t1; +create table t1 (id integer); +set autocommit=0; -- open transaction +begin; +insert into t1 values(1); -- write to master +commit; +select count(*) from t1; -- read from master since autocommit is disabled +drop table t1; diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index faaf268d4..5d0f9955f 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -121,7 +121,8 @@ typedef enum skygw_chk_t { CHK_NUM_MY_SESCMD, CHK_NUM_ROUTER_PROPERTY, CHK_NUM_SESCMD_CUR, - CHK_NUM_BACKEND + CHK_NUM_BACKEND, + CHK_NUM_BACKEND_REF } skygw_chk_t; # define STRBOOL(b) ((b) ? "true" : "false") @@ -459,7 +460,14 @@ typedef enum skygw_chk_t { (b)->be_chk_tail == CHK_NUM_BACKEND, \ "BACKEND has invalid check fields"); \ } - + +#define CHK_BACKEND_REF(r) { \ + ss_info_dassert((r)->bref_chk_top == CHK_NUM_BACKEND_REF && \ + (r)->bref_chk_tail == CHK_NUM_BACKEND_REF, \ + "Backend reference has invalid check fields"); \ +} + + #if defined(SS_DEBUG) bool conn_open[10240]; #endif From 8a40a44823d3971db0fafc19ca77efba676ea92e Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Tue, 29 Apr 2014 10:10:09 +0300 Subject: [PATCH 4/5] Removed this because it is deprecated. --- server/include/gateway_mysql.h | 126 --------------------------------- 1 file changed, 126 deletions(-) delete mode 100644 server/include/gateway_mysql.h diff --git a/server/include/gateway_mysql.h b/server/include/gateway_mysql.h deleted file mode 100644 index 01c569bba..000000000 --- a/server/include/gateway_mysql.h +++ /dev/null @@ -1,126 +0,0 @@ -/* - * This file is distributed as part of the SkySQL Gateway. It is free - * software: you can redistribute it and/or modify it under the terms of the - * GNU General Public License as published by the Free Software Foundation, - * version 2. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more - * details. - * - * You should have received a copy of the GNU General Public License along with - * this program; if not, write to the Free Software Foundation, Inc., 51 - * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Copyright SkySQL Ab 2013 - * - */ - -/* - * MYSQL mysql protocol header file - * Revision History - * - * Date Who Description - * 10/06/13 Massimiliano Pinto Initial implementation - * - */ -#include - -/* Protocol packing macros. */ -#define gw_mysql_set_byte2(__buffer, __int) do { \ - (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ - (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); } while (0) -#define gw_mysql_set_byte3(__buffer, __int) do { \ - (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ - (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \ - (__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); } while (0) -#define gw_mysql_set_byte4(__buffer, __int) do { \ - (__buffer)[0]= (uint8_t)((__int) & 0xFF); \ - (__buffer)[1]= (uint8_t)(((__int) >> 8) & 0xFF); \ - (__buffer)[2]= (uint8_t)(((__int) >> 16) & 0xFF); \ - (__buffer)[3]= (uint8_t)(((__int) >> 24) & 0xFF); } while (0) - - -/* Protocol unpacking macros. */ -#define gw_mysql_get_byte2(__buffer) \ - (uint16_t)((__buffer)[0] | \ - ((__buffer)[1] << 8)) -#define gw_mysql_get_byte3(__buffer) \ - (uint32_t)((__buffer)[0] | \ - ((__buffer)[1] << 8) | \ - ((__buffer)[2] << 16)) -#define gw_mysql_get_byte4(__buffer) \ - (uint32_t)((__buffer)[0] | \ - ((__buffer)[1] << 8) | \ - ((__buffer)[2] << 16) | \ - ((__buffer)[3] << 24)) -#define gw_mysql_get_byte8(__buffer) \ - ((uint64_t)(__buffer)[0] | \ - ((uint64_t)(__buffer)[1] << 8) | \ - ((uint64_t)(__buffer)[2] << 16) | \ - ((uint64_t)(__buffer)[3] << 24) | \ - ((uint64_t)(__buffer)[4] << 32) | \ - ((uint64_t)(__buffer)[5] << 40) | \ - ((uint64_t)(__buffer)[6] << 48) | \ - ((uint64_t)(__buffer)[7] << 56)) - -typedef enum -{ - GW_MYSQL_CAPABILITIES_NONE= 0, - GW_MYSQL_CAPABILITIES_LONG_PASSWORD= (1 << 0), - GW_MYSQL_CAPABILITIES_FOUND_ROWS= (1 << 1), - GW_MYSQL_CAPABILITIES_LONG_FLAG= (1 << 2), - GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB= (1 << 3), - GW_MYSQL_CAPABILITIES_NO_SCHEMA= (1 << 4), - GW_MYSQL_CAPABILITIES_COMPRESS= (1 << 5), - GW_MYSQL_CAPABILITIES_ODBC= (1 << 6), - GW_MYSQL_CAPABILITIES_LOCAL_FILES= (1 << 7), - GW_MYSQL_CAPABILITIES_IGNORE_SPACE= (1 << 8), - GW_MYSQL_CAPABILITIES_PROTOCOL_41= (1 << 9), - GW_MYSQL_CAPABILITIES_INTERACTIVE= (1 << 10), - GW_MYSQL_CAPABILITIES_SSL= (1 << 11), - GW_MYSQL_CAPABILITIES_IGNORE_SIGPIPE= (1 << 12), - GW_MYSQL_CAPABILITIES_TRANSACTIONS= (1 << 13), - GW_MYSQL_CAPABILITIES_RESERVED= (1 << 14), - GW_MYSQL_CAPABILITIES_SECURE_CONNECTION= (1 << 15), - GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS= (1 << 16), - GW_MYSQL_CAPABILITIES_MULTI_RESULTS= (1 << 17), - GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS= (1 << 18), - GW_MYSQL_CAPABILITIES_PLUGIN_AUTH= (1 << 19), - GW_MYSQL_CAPABILITIES_SSL_VERIFY_SERVER_CERT= (1 << 30), - GW_MYSQL_CAPABILITIES_REMEMBER_OPTIONS= (1 << 31), - GW_MYSQL_CAPABILITIES_CLIENT= (GW_MYSQL_CAPABILITIES_LONG_PASSWORD | - GW_MYSQL_CAPABILITIES_FOUND_ROWS | - GW_MYSQL_CAPABILITIES_LONG_FLAG | - GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB | - GW_MYSQL_CAPABILITIES_LOCAL_FILES | - GW_MYSQL_CAPABILITIES_PLUGIN_AUTH | - GW_MYSQL_CAPABILITIES_TRANSACTIONS | - GW_MYSQL_CAPABILITIES_PROTOCOL_41 | - GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS | - GW_MYSQL_CAPABILITIES_MULTI_RESULTS | - GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS | - GW_MYSQL_CAPABILITIES_SECURE_CONNECTION), - GW_MYSQL_CAPABILITIES_CLIENT_COMPRESS= (GW_MYSQL_CAPABILITIES_LONG_PASSWORD | - GW_MYSQL_CAPABILITIES_FOUND_ROWS | - GW_MYSQL_CAPABILITIES_LONG_FLAG | - GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB | - GW_MYSQL_CAPABILITIES_LOCAL_FILES | - GW_MYSQL_CAPABILITIES_PLUGIN_AUTH | - GW_MYSQL_CAPABILITIES_TRANSACTIONS | - GW_MYSQL_CAPABILITIES_PROTOCOL_41 | - GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS | - GW_MYSQL_CAPABILITIES_MULTI_RESULTS | - GW_MYSQL_CAPABILITIES_PS_MULTI_RESULTS | - GW_MYSQL_CAPABILITIES_COMPRESS - ), -} gw_mysql_capabilities_t; - - -#define SMALL_CHUNK 1024 -#define MAX_CHUNK SMALL_CHUNK * 8 * 4 -#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) - -extern int mysql_send_ok(DCB *, int, int, const char *); -extern int MySQLSendHandshake(DCB *); From b5e9428ff7dcee1ad7e4ddd0dd404fb67b44e511 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Tue, 29 Apr 2014 14:50:09 +0300 Subject: [PATCH 5/5] log_manager.cc fixed memory leak, block buffer mutex names weren't freed. query_classifier.cc use of uninitialized value in skygw_stmt_causes_implicit_commit config.c crashed if module load failed, use of unitialized value load_utils.c pretty-printed error service.c use of uninitialized value in service_add_qualified_param modules.h function prototype readwritesplit.c memory leaks --- log_manager/log_manager.cc | 10 ++++++-- query_classifier/query_classifier.cc | 4 ++++ server/core/config.c | 12 ++++++++++ server/core/load_utils.c | 23 ++++++++++++++----- server/core/service.c | 19 ++++++++++++++- server/include/modules.h | 2 ++ .../routing/readwritesplit/readwritesplit.c | 12 ++++++++++ 7 files changed, 73 insertions(+), 9 deletions(-) diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 022523488..c8f96266e 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -253,6 +253,7 @@ static int logmanager_write_log( va_list valist); static blockbuf_t* blockbuf_init(logfile_id_t id); +static void blockbuf_node_done(void* bb_data); static char* blockbuf_get_writepos( #if 0 int** refcount, @@ -996,8 +997,13 @@ static char* blockbuf_get_writepos( simple_mutex_unlock(&bb->bb_mutex); return pos; } - +static void blockbuf_node_done( + void* bb_data) +{ + blockbuf_t* bb = (blockbuf_t *)bb_data; + simple_mutex_done(&bb->bb_mutex); +} static blockbuf_t* blockbuf_init( @@ -2059,7 +2065,7 @@ static bool logfile_init( if (mlist_init(&logfile->lf_blockbuf_list, NULL, strdup("logfile block buffer list"), - NULL, + blockbuf_node_done, MAXNBLOCKBUFS) == NULL) { ss_dfprintf(stderr, diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 82cf85b71..aafd746ce 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -706,6 +706,10 @@ static bool skygw_stmt_causes_implicit_commit( { succp = true; } + else + { + succp =false; + } break; default: succp = true; diff --git a/server/core/config.c b/server/core/config.c index 00fd947ce..c35d75ddd 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -205,6 +205,18 @@ int error_count = 0; char *enable_root_user = config_get_value(obj->parameters, "enable_root_user"); + if (obj->element == NULL) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Reading configuration " + "for router service '%s' failed. " + "Router %s is not loaded.", + obj->object, + obj->object))); + obj = obj->next; + continue; /*< process next obj */ + } max_slave_conn_str = config_get_value(obj->parameters, "max_slave_connections"); diff --git a/server/core/load_utils.c b/server/core/load_utils.c index 0a88b5348..28c95a3ae 100644 --- a/server/core/load_utils.c +++ b/server/core/load_utils.c @@ -53,6 +53,17 @@ static void register_module(const char *module, void *modobj); static void unregister_module(const char *module); +char* get_maxscale_home(void) +{ + char* home = getenv("MAXSCALE_HOME"); + if (home == NULL) + { + home = "/usr/local/skysql/MaxScale"; + } + return home; +} + + /** * Load the dynamic library related to a gateway module. The routine * will look for library files in the current directory, @@ -82,10 +93,10 @@ MODULES *mod; sprintf(fname, "./lib%s.so", module); if (access(fname, F_OK) == -1) { - if ((home = getenv("MAXSCALE_HOME")) == NULL) - home = "/usr/local/skysql/MaxScale"; + home = get_maxscale_home (); sprintf(fname, "%s/modules/lib%s.so", home, module); - if (access(fname, F_OK) == -1) + + if (access(fname, F_OK) == -1) { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, @@ -100,7 +111,7 @@ MODULES *mod; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Unable to load library for module: " - "%s, %s.", + "%s\n\t\t\t %s.", module, dlerror()))); return NULL; @@ -111,7 +122,7 @@ MODULES *mod; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Version interface not supported by " - "module: %s, %s.", + "module: %s\n\t\t\t %s.", module, dlerror()))); dlclose(dlhandle); @@ -134,7 +145,7 @@ MODULES *mod; LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Expected entry point interface missing " - "from module: %s, %s.", + "from module: %s\n\t\t\t %s.", module, dlerror()))); dlclose(dlhandle); diff --git a/server/core/service.c b/server/core/service.c index 16532a1fa..b951a4c35 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -77,6 +77,20 @@ SERVICE *service; return NULL; if ((service->router = load_module(router, MODULE_ROUTER)) == NULL) { + char* home = get_maxscale_home(); + char* ldpath = getenv("LD_LIBRARY_PATH"); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to load %s module \"%s\".\n\t\t\t" + " Ensure that lib%s.so exists in one of the " + "following directories :\n\t\t\t " + "- %s/modules\n\t\t\t - %s", + MODULE_ROUTER, + router, + router, + home, + ldpath))); free(service); return NULL; } @@ -91,6 +105,7 @@ SERVICE *service; service->enable_root = 0; service->routerOptions = NULL; service->databases = NULL; + service->svc_config_param = NULL; spinlock_init(&service->spin); spinlock_init(&service->users_table_spin); memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE)); @@ -835,10 +850,12 @@ static void service_add_qualified_param( SERVICE* svc, CONFIG_PARAMETER* param) { - CONFIG_PARAMETER** p = &svc->svc_config_param; + CONFIG_PARAMETER** p; spinlock_acquire(&svc->spin); + p = &svc->svc_config_param; + if ((*p) != NULL) { while ((*p)->next != NULL) *p = (*p)->next; diff --git a/server/include/modules.h b/server/include/modules.h index f3ec14f07..c90cf45a1 100644 --- a/server/include/modules.h +++ b/server/include/modules.h @@ -55,4 +55,6 @@ extern void *load_module(const char *module, const char *type); extern void unload_module(const char *module); extern void printModules(); extern void dprintAllModules(DCB *); +char* get_maxscale_home(void); + #endif diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index de4d803a5..7283f98a1 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -473,6 +473,7 @@ static void* newSession( /** Both Master and at least 1 slave must be found */ if (!succp) { + free(client_rses->rses_backend_ref); free(client_rses); client_rses = NULL; goto return_rses; @@ -613,6 +614,7 @@ static void freeSession( * all the memory and other resources associated * to the client session. */ + free(router_cli_ses->rses_backend_ref); free(router_cli_ses); return; } @@ -793,6 +795,15 @@ static int routeQuery( LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)))); +#if defined(AUTOCOMMIT_OPT) + if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && + !router_cli_ses->rses_autocommit_enabled) || + (QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) && + router_cli_ses->rses_autocommit_enabled)) + { + /** reply directly to client */ + } +#endif /** * If autocommit is disabled or transaction is explicitly started * transaction becomes active and master gets all statements until @@ -1979,6 +1990,7 @@ static void tracelog_routed_query( b->backend_server->port, STRBETYPE(be_type), dcb))); + free(querystr); } } gwbuf_free(buf);