diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 024a8bd88..ab17b2c35 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,15 +31,14 @@ #include -/** - * Internal structure used to define the set of backend servers we are routing - * connections to. This provides the storage for routing module specific data - * that is required for each of the backend servers. - */ -typedef struct backend { - SERVER* backend_server; /*< The server itself */ - int backend_conn_count; /*< Number of connections to the server */ -} BACKEND; + +typedef enum backend_type_t { + BE_UNDEFINED=-1, + BE_MASTER, + BE_JOINED = BE_MASTER, + BE_SLAVE, + BE_COUNT +} backend_type_t; typedef struct rses_property_st rses_property_t; typedef struct router_client_session ROUTER_CLIENT_SES; @@ -52,14 +51,6 @@ typedef enum rses_property_type_t { RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 } rses_property_type_t; -typedef enum backend_type_t { - BE_UNDEFINED=-1, - BE_MASTER, - BE_JOINED = BE_MASTER, - BE_SLAVE, - BE_COUNT -} backend_type_t; - /** * Session variable command */ @@ -98,13 +89,38 @@ struct rses_property_st { }; typedef struct sescmd_cursor_st { +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_top; +#endif ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */ rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ bool scmd_cur_active; /*< true if command is being executed */ - backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */ +#if defined(SS_DEBUG) + skygw_chk_t scmd_cur_chk_tail; +#endif } sescmd_cursor_t; +/** + * Internal structure used to define the set of backend servers we are routing + * connections to. This provides the storage for routing module specific data + * that is required for each of the backend servers. 
+ */ +typedef struct backend { +#if defined(SS_DEBUG) + skygw_chk_t be_chk_top; +#endif + SERVER* backend_server; /*< The server itself */ + int backend_conn_count; /*< Number of connections to the server */ + bool be_valid; /*< valid when belongs to the router's configuration */ + DCB* be_dcb; + /*< cursor is pointer and status variable to current session command */ + sescmd_cursor_t be_sescmd_cursor; +#if defined(SS_DEBUG) + skygw_chk_t be_chk_tail; +#endif +} BACKEND; + /** * The client session structure used within this router. */ @@ -113,14 +129,13 @@ struct router_client_session { skygw_chk_t rses_chk_top; #endif SPINLOCK rses_lock; /*< protects rses_deleted */ - int rses_versno; /*< even = no active update, else odd */ + int rses_versno; /*< even = no active update, else odd. not used 4/14 */ bool rses_closed; /*< true when closeSession is called */ /** Properties listed by their type */ rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT]; - BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */ - DCB* rses_dcb[BE_COUNT]; - /*< cursor is pointer and status variable to current session command */ - sescmd_cursor_t rses_cursor[BE_COUNT]; + BACKEND* rses_master; /*< Pointer to master */ + BACKEND** rses_backend; /*< All backends used by client session */ + int rses_nbackends; int rses_capabilities; /*< input type, for example */ struct router_client_session* next; #if defined(SS_DEBUG) diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 5bd20f4f9..1e394d7d3 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -682,7 +682,7 @@ int gw_read_client_event(DCB* dcb) { dcb, 1, 0, - "Query routing failed. Connection to " + "Can't route query. 
Connection to " "backend lost"); protocol->state = MYSQL_IDLE; } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 7b2285078..c177d6d62 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,11 +70,23 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); +static int backend_cmp( + const void* be_1, + const void* be_2); + +static bool select_connect_backend_servers( + BACKEND** p_master, + BACKEND** b, + int router_nservers, + int max_nslaves, + SESSION* session, + ROUTER_INSTANCE* router); + +static bool get_dcb( + DCB** dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype); -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, - ROUTER_INSTANCE* router); static ROUTER_OBJECT MyObject = { createInstance, @@ -118,13 +130,8 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); + BACKEND* backend); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -151,11 +158,13 @@ static bool cont_exec_sescmd_in_backend( ROUTER_CLIENT_SES* rses, backend_type_t be_type); +#if !defined(MAX95) static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, DCB* dcb, GWBUF* buf); +#endif static bool route_session_write( ROUTER_CLIENT_SES* router_client_ses, @@ -220,7 +229,7 @@ static ROUTER* createInstance( { ROUTER_INSTANCE* router; SERVER* server; - int n; + int nservers; int i; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { @@ -231,11 +240,14 @@ static ROUTER* createInstance( /** Calculate number of servers */ server = service->databases; + 
nservers = 0; - for (n=0; server != NULL; server=server->nextdb) { - n++; + while (server != NULL) + { + nservers++; + server=server->nextdb; } - router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *)); if (router->servers == NULL) { @@ -258,23 +270,31 @@ static ROUTER* createInstance( * backend server. */ server = service->databases; - n = 0; + nservers= 0; + while (server != NULL) { - if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL) + if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL) { - for (i = 0; i < n; i++) { + /** clean up */ + for (i = 0; i < nservers; i++) { free(router->servers[i]); } free(router->servers); free(router); return NULL; } - router->servers[n]->backend_server = server; - router->servers[n]->backend_conn_count = 0; - n += 1; + router->servers[nservers]->backend_server = server; + router->servers[nservers]->backend_conn_count = 0; + router->servers[nservers]->be_valid = false; + router->servers[nservers]->be_dcb = NULL; +#if defined(SS_DEBUG) + router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; + router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; +#endif + nservers += 1; server = server->nextdb; } - router->servers[n] = NULL; + router->servers[nservers] = NULL; /** * vraa : is this necessary for readwritesplit ? 
@@ -330,97 +350,92 @@ static ROUTER* createInstance( * @param session The session itself * @return Session specific data for this session */ + const int conf_max_nslaves = 2; /*< replaces configuration parameter until it is developed */ + static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND* local_backend[BE_COUNT]; - ROUTER_CLIENT_SES* client_rses; - ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; + BACKEND** pp_backend; + BACKEND** b; + BACKEND* master = NULL; /*< pointer to selected master */ + ROUTER_CLIENT_SES* client_rses = NULL; + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; - - client_rses = - (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + int router_nservers = 0; /*< # of servers in total */ + int max_nslaves; /*< max # of slaves used in this session */ + + b = router->servers; + + /** count servers */ + while (*(b++) != NULL) router_nservers++; + + /** Master + Slave is minimum requirement */ + if (router_nservers < 2) + { + /** log */ + goto return_rses; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) { ss_dassert(false); - return NULL; + goto return_rses; } - memset(local_backend, 0, BE_COUNT*sizeof(void*)); + pp_backend = (BACKEND **)calloc(router_nservers + 1, sizeof(BACKEND *)); /*< one extra zeroed slot is the NULL terminator; NOTE(review): result is still not checked for NULL */ + + /** + * Copy backend pointer array from global router instance to private use. 
+ */ + memcpy(pp_backend, router->servers, router_nservers*sizeof(BACKEND *)); + ss_dassert(pp_backend[router_nservers] == NULL); + spinlock_init(&client_rses->rses_lock); #if defined(SS_DEBUG) client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif - /** store pointers to sescmd list to both cursors */ - client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; - client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER; - - client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; + b = pp_backend; + /** Set up ses cmd objects for each backend */ + while (*b != NULL) + { + /** store pointers to sescmd list to both cursors */ + (*b)->be_sescmd_cursor.scmd_cur_rses = client_rses; + (*b)->be_sescmd_cursor.scmd_cur_active = false; + (*b)->be_sescmd_cursor.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + (*b)->be_sescmd_cursor.scmd_cur_cmd = NULL; +#if defined(SS_DEBUG) + (*b)->be_sescmd_cursor.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + (*b)->be_sescmd_cursor.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + b++; + } /** - * Find a backend server to connect to. This is the extent of the - * load balancing algorithm we need to implement for this simple - * connection router. + * Find a backend servers to connect to. 
*/ - succp = search_backend_servers(&local_backend[BE_MASTER], - &local_backend[BE_SLAVE], - router); + succp = select_connect_backend_servers(&master, + pp_backend, + router_nservers, + max_nslaves, + session, + router); - /** Both Master and Slave must be found */ + /** Both Master and at least 1 slave must be found */ if (!succp) { free(client_rses); - return NULL; - } - /** - * Open the slave connection. - */ - client_rses->rses_dcb[BE_SLAVE] = dcb_connect( - local_backend[BE_SLAVE]->backend_server, - session, - local_backend[BE_SLAVE]->backend_server->protocol); - - if (client_rses->rses_dcb[BE_SLAVE] == NULL) { - ss_dassert(session->refcount == 1); - free(client_rses); - return NULL; - } - /** - * Open the master connection. - */ - client_rses->rses_dcb[BE_MASTER] = dcb_connect( - local_backend[BE_MASTER]->backend_server, - session, - local_backend[BE_MASTER]->backend_server->protocol); - if (client_rses->rses_dcb[BE_MASTER] == NULL) - { - /** Close slave connection first. */ - client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]); - free(client_rses); - return NULL; - } - /** - * We now have a master and a slave server with the least connections. - * Bump the connection counts for these servers. - */ - atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1); - atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1); - - client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE]; - client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; - router->stats.n_sessions += 1; - + client_rses = NULL; + goto return_rses; + } + /** Copy backend pointers to router session. */ + client_rses->rses_master = master; + client_rses->rses_backend = pp_backend; + client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; + router->stats.n_sessions += 1; /** * Version is bigger than zero once initialized. 
*/ @@ -430,15 +445,18 @@ static void* newSession( * Add this session to end of the list of active sessions in router. */ spinlock_acquire(&router->lock); - client_rses->next = router->connections; + client_rses->next = router->connections; router->connections = client_rses; spinlock_release(&router->lock); CHK_CLIENT_RSES(client_rses); - + +return_rses: return (void *)client_rses; } + + /** * Close a session with the router, this is the mechanism * by which a router may cleanup data structure etc. @@ -451,44 +469,48 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - DCB* slave_dcb; - DCB* master_dcb; - + BACKEND** b; + router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - CHK_CLIENT_RSES(router_cli_ses); + CHK_CLIENT_RSES(router_cli_ses); + + b = router_cli_ses->rses_backend; /** * Lock router client session for secure read and update. */ - if (rses_begin_locked_router_action(router_cli_ses)) + if (!router_cli_ses->rses_closed && + rses_begin_locked_router_action(router_cli_ses)) { - /** decrease server current connection counters */ - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1); - - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - router_cli_ses->rses_dcb[BE_SLAVE] = NULL; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - router_cli_ses->rses_dcb[BE_MASTER] = NULL; - - router_cli_ses->rses_closed = true; - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - - /** - * Close the backend server connections + DCB* dcbs[router_cli_ses->rses_nbackends]; + int i = 0; + + /** + * This sets router closed. Nobody is allowed to use router + * whithout checking this first. 
*/ - if (slave_dcb != NULL) { - CHK_DCB(slave_dcb); - slave_dcb->func.close(slave_dcb); - } + router_cli_ses->rses_closed = true; - if (master_dcb != NULL) { - master_dcb->func.close(master_dcb); - CHK_DCB(master_dcb); + while (*b != NULL) + { + /** decrease server current connection counters */ + atomic_add(&(*b)->backend_server->stats.n_current, -1); + + /** Close those which had been connected */ + if ((*b)->be_dcb != NULL) + { + CHK_DCB((*b)->be_dcb); + dcbs[i] = (*b)->be_dcb; + (*b)->be_dcb = NULL; /*< prevent new uses of DCB */ + dcbs[i]->func.close(dcbs[i]); + } + b++; } + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); } } + static void freeSession( ROUTER* router_instance, void* router_client_session) @@ -545,6 +567,55 @@ static void freeSession( return; } +static bool get_dcb( /*< returns true if a connected backend of the requested type was found */ + DCB** p_dcb, /*< out: DCB of the chosen backend; must point to NULL on entry */ + ROUTER_CLIENT_SES* rses, /*< in: router session whose backend array is searched */ + backend_type_t btype) /*< in: BE_SLAVE, BE_MASTER or BE_JOINED */ +{ + BACKEND** b; + int smallest_nconn = -1; /*< -1 means no slave candidate chosen yet */ + bool succp = false; + + CHK_CLIENT_RSES(rses); + ss_dassert(*(p_dcb) == NULL); + b = rses->rses_backend; + + if (btype == BE_SLAVE) /*< pick the connected slave with the fewest connections */ + { + while (*b != NULL) + { + if ((*b)->be_dcb != NULL && + SERVER_IS_SLAVE((*b)->backend_server) && + (smallest_nconn == -1 || + (*b)->backend_conn_count < smallest_nconn)) + { + *p_dcb = (*b)->be_dcb; + smallest_nconn = (*b)->backend_conn_count; + succp = true; + } + b++; + } + ss_dassert(succp); + } + else if (btype == BE_MASTER || btype == BE_JOINED) /*< compare btype itself, not the bare enumerator; first connected master or joined server wins */ + { + while (*b != NULL) + { + if ((*b)->be_dcb != NULL && + (SERVER_IS_MASTER((*b)->backend_server) || + SERVER_IS_JOINED((*b)->backend_server))) + { + *p_dcb = (*b)->be_dcb; + succp = true; + goto return_succp; + } + b++; + } + } +return_succp: + return succp; +} + /** * The main routing entry, this is called with every packet that is * received and has to be forwarded to the backend database. 
@@ -578,8 +649,7 @@ static int routeQuery( DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - bool rses_is_closed; - rses_property_t* prop; + bool rses_is_closed = false; size_t len; /** if false everything goes to master and session commands to slave too */ static bool autocommit_enabled = true; @@ -593,40 +663,29 @@ static int routeQuery( { rses_is_closed = true; } - else - { - /*< Lock router client session for secure read of DCBs */ - rses_is_closed = - !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - packet = GWBUF_DATA(querybuf); packet_type = packet[4]; - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + if (rses_is_closed) { - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, + (skygw_log_write_flush( LOGFILE_ERROR, "Error: Failed to route %s:%s:\"%s\" to " "backend server. %s.", STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? 
"Router was closed" : + "Router has no backend servers where to " + "route to")))); goto return_ret; } inst->stats.n_queries++; startpos = (char *)&packet[5]; + + master_dcb = router_cli_ses->rses_master->be_dcb; + CHK_DCB(master_dcb); switch(packet_type) { case COM_QUIT: /**< 1 QUIT will close all sessions */ @@ -653,10 +712,11 @@ static int routeQuery( memset(&querystr[len], 0, 1); // querystr = (char *)GWBUF_DATA(plainsqlbuf); /* - querystr = master_dcb->func.getquerystr( - (void *) gwbuf_clone(querybuf), - &querystr_is_copy); - */ + * querystr = master_dcb->func.getquerystr( + * (void *) gwbuf_clone(querybuf), + * &querystr_is_copy); + */ + qtype = skygw_query_classifier_get_type(querystr, 0); break; @@ -719,56 +779,79 @@ static int routeQuery( */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) { - if (route_session_write( - router_cli_ses, - querybuf, - inst, - packet_type, - qtype)) + bool succp = route_session_write( + router_cli_ses, + querybuf, + inst, + packet_type, + qtype); + + if (succp) { ret = 1; } - else - { - ret = 0; - } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - else if (transaction_active) /*< all to master */ + else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !transaction_active) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Transaction is active, routing to Master."))); - - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + bool succp; - goto return_ret; - } - else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)) - { LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Read-only query, routing to Slave."))); - ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); + succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE); + + if (succp) + { + if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_slave, 1); + } + ss_dassert(ret == 1); + } + ss_dassert(succp); 
goto return_ret; } - else + else { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "Begin transaction, write or unspecified type, " - "routing to Master."))); - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + bool succp = true; + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + if (transaction_active) /*< all to master */ + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Transaction is active, routing to Master."))); + } + else + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Begin transaction, write or unspecified type, " + "routing to Master."))); + } + } + + if (master_dcb == NULL) + { + succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); + } + if (succp) + { + if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_master, 1); + } + } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - return_ret: if (plainsqlbuf != NULL) { @@ -905,12 +988,11 @@ static void clientReply( GWBUF* writebuf, DCB* backend_dcb) { - DCB* master_dcb; - DCB* slave_dcb; DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type = BE_UNDEFINED; + backend_type_t be_type; + BACKEND** be; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -926,16 +1008,12 @@ static void clientReply( GWBUF_LENGTH(writebuf))) != NULL); goto lock_failed; } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** Holding lock ensures that router session remains open */ ss_dassert(backend_dcb->session != NULL); client_dcb = backend_dcb->session->client; /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - + rses_end_locked_router_action(router_cli_ses); /** * 1. Check if backend received reply to sescmd. * 2. 
Check sescmd's state whether OK_PACKET has been @@ -951,29 +1029,32 @@ static void clientReply( writebuf, GWBUF_LENGTH(writebuf))) != NULL); /** Log that client was closed before reply */ - return; + goto lock_failed; } - - if (backend_dcb == master_dcb) - { - be_type = BE_MASTER; - } - else if (backend_dcb == slave_dcb) - { - be_type = BE_SLAVE; - } - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "reply_by_statement", - backend_dcb, - gwbuf_clone(writebuf))); + be = router_cli_ses->rses_backend; + + while (be !=NULL) + { + if ((*be)->be_dcb == backend_dcb) + { + be_type = (SERVER_IS_MASTER((*be)->backend_server) ? BE_MASTER : + (SERVER_IS_SLAVE((*be)->backend_server) ? BE_SLAVE : + (SERVER_IS_JOINED((*be)->backend_server) ? BE_JOINED : BE_UNDEFINED))); + break; + } + be++; + } +// LOGIF(LT, tracelog_routed_query(router_cli_ses, +// "reply_by_statement", +// backend_dcb, +// gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } - - scur = rses_get_sescmd_cursor(router_cli_ses, be_type); + scur = &(*be)->be_sescmd_cursor; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -983,8 +1064,7 @@ static void clientReply( { writebuf = sescmd_cursor_process_replies(client_dcb, writebuf, - scur); - + scur); } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1007,126 +1087,148 @@ lock_failed: return; } + +int backend_cmp( + const void* be_1, + const void* be_2) +{ + BACKEND* b1 = *(BACKEND **)be_1; + BACKEND* b2 = *(BACKEND **)be_2; + + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : + ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); +} + /** - * @node Search suitable backend server from those of router instance. + * @node Search suitable backend servers from those of router instance. 
* * Parameters: * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. - * If NULL, then master is not searched. + * Pointer to location where master's address is to be stored. + * NULL is not allowed. * - * @param p_slave - in, use, out - * Pointer to location where slave's address is to be stored. - * if NULL, then slave is not searched. + * @param b - in, use, out + * Pointer to location where all backend server pointers are stored. + * NULL is not allowed. * - * @param inst - in, use - * Pointer to router instance + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. + * + * @param max_nslaves - in, use + * Upper limit for the number of slaves. Configuration parameter or default. * - * @return true, if all what what requested found, false if the request - * was not satisfied or was partially satisfied. + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. * * * @details It is assumed that there is only one master among servers of - * a router instance. As a result, thr first master is always chosen. + * a router instance. As a result, the first master found is chosen. */ -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, - ROUTER_INSTANCE* router) -{ - BACKEND* local_backend[BE_COUNT] = {NULL,NULL}; - int i; - bool succp = true; - - /* - * Loop over all the servers and find any that have fewer connections - * than current candidate server. - * - * If a server has less connections than the current candidate it is - * chosen to a new candidate. - * - * If a server has the same number of connections currently as the - * candidate and has had less connections over time than the candidate - * it will also become the new candidate. 
This has the effect of - * spreading the connections over different servers during periods of - * very low load. - * - * If master is searched for, the first master found is chosen. - */ - for (i = 0; router->servers[i] != NULL; i++) { - BACKEND* be = router->servers[i]; - - if (be != NULL) { - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [search_backend_servers] Examine server " - "%s:%d with %d connections. Status is %d, " - "router->bitvalue is %d", - pthread_self(), - be->backend_server->name, - be->backend_server->port, - be->backend_conn_count, - be->backend_server->status, - router->bitmask))); - } - - if (be != NULL && - SERVER_IS_RUNNING(be->backend_server) && - (be->backend_server->status & router->bitmask) == - router->bitvalue) - { - if (SERVER_IS_SLAVE(be->backend_server) && - p_slave != NULL) - { - /** - * If no candidate set, set first running - * server as an initial candidate server. - */ - if (local_backend[BE_SLAVE] == NULL) - { - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count < - local_backend[BE_SLAVE]->backend_conn_count) - { - /** - * This running server has fewer - * connections, set it as a new - * candidate. - */ - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count == - local_backend[BE_SLAVE]->backend_conn_count && - be->backend_server->stats.n_connections < - local_backend[BE_SLAVE]->backend_server->stats.n_connections) - { - /** - * This running server has the same - * number of connections currently - * as the candidate but has had - * fewer connections over time - * than candidate, set this server - * to candidate. 
- */ - local_backend[BE_SLAVE] = be; - } - } - else if (p_master != NULL && - local_backend[BE_MASTER] == NULL && - SERVER_IS_MASTER(be->backend_server)) - { - local_backend[BE_MASTER] = be; - } - else if (p_master != NULL && - local_backend[BE_JOINED] == NULL && - SERVER_IS_JOINED(be->backend_server)) - { - local_backend[BE_JOINED] = be; - } - } - } +static bool select_connect_backend_servers( + BACKEND** p_master, + BACKEND** b, + int router_nservers, + int max_nslaves, + SESSION* session, + ROUTER_INSTANCE* router) +{ + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; + /** + * Sort the pointer list to servers according to connection counts. As + * a consequence those backends having least connections are in the + * beginning of the list. + */ + qsort((void *)b, (size_t)router_nservers, sizeof(void*), backend_cmp); + + /** + * Choose at least 1+1 (master and slave) and at most 1+max_nslaves + * servers from the sorted list. First master found is selected. + */ + while (*b != NULL && (slaves_connected < max_nslaves || !master_connected)) + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [select_backend_servers] Examine server " + "%s:%d with %d connections. 
Status is %d, " + "router->bitvalue is %d", + pthread_self(), + (*b)->backend_server->name, + (*b)->backend_server->port, + (*b)->backend_conn_count, + (*b)->backend_server->status, + router->bitmask))); + + if (SERVER_IS_RUNNING((*b)->backend_server) && + ((*b)->backend_server->status & router->bitmask == + router->bitvalue)) + { + if (slaves_found < max_nslaves && + SERVER_IS_SLAVE((*b)->backend_server)) + { + slaves_found += 1; + + (*b)->be_dcb = dcb_connect( + (*b)->backend_server, + session, + (*b)->backend_server->protocol); + + if ((*b)->be_dcb != NULL) + { + slaves_connected += 1; + /** Increase backend connection counter */ + atomic_add(&(*b)->backend_conn_count, 1); + } + else + { + /* handle connect error */ + } + } + else if (!master_connected && + (SERVER_IS_MASTER((*b)->backend_server) || + SERVER_IS_JOINED((*b)->backend_server))) + { + master_found = true; + + (*b)->be_dcb = dcb_connect( + (*b)->backend_server, + session, + (*b)->backend_server->protocol); + + if ((*b)->be_dcb != NULL) + { + master_connected = true; + *p_master = *b; + /** Increase backend connection counter */ + atomic_add(&(*b)->backend_conn_count, 1); + } + else + { + /* handle connect error */ + } + } + } + b++; + } /*< while */ + + if (master_connected && slaves_connected > 0 && slaves_connected <= max_nslaves) + { + succp = true; + } + else + { + /** disconnect and clean up */ + } +#if 0 if (router->bitvalue != 0 && p_master != NULL && local_backend[BE_JOINED] == NULL) @@ -1143,49 +1245,51 @@ static bool search_backend_servers( if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { succp = false; LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); + LOGFILE_ERROR, + "Error : Couldn't find suitable Slave from %d " + "candidates.", + i))); } if (p_master != NULL && local_backend[BE_MASTER] == NULL) { succp = false; LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable 
Master from %d " - "candidates.", - i))); + LOGFILE_ERROR, + "Error : Couldn't find suitable Master from %d " + "candidates.", + i))); } - + if (local_backend[BE_SLAVE] != NULL) { *p_slave = local_backend[BE_SLAVE]; LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); + LOGFILE_TRACE, + "%lu [readwritesplit:search_backend_servers] Selected " + "Slave %s:%d from %d candidates.", + pthread_self(), + local_backend[BE_SLAVE]->backend_server->name, + local_backend[BE_SLAVE]->backend_server->port, + i))); } if (local_backend[BE_MASTER] != NULL) { *p_master = local_backend[BE_MASTER]; LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); + LOGFILE_TRACE, + "%lu [readwritesplit:search_backend_servers] Selected " + "Master %s:%d " + "from %d candidates.", + pthread_self(), + local_backend[BE_MASTER]->backend_server->name, + local_backend[BE_MASTER]->backend_server->port, + i))); } +#endif return_succp: return succp; } + /** * Create a generic router session property strcture. 
*/ @@ -1397,7 +1501,7 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - +/* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] cmd %p " @@ -1407,11 +1511,13 @@ static GWBUF* sescmd_cursor_process_replies( scmd, packetlen+headerlen, STRBETYPE(scur->scmd_cur_be_type)))); + */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; + /* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] Marked " @@ -1420,6 +1526,7 @@ static GWBUF* sescmd_cursor_process_replies( pthread_self(), scmd, STRBETYPE(scur->scmd_cur_be_type)))); + */ } if (sescmd_cursor_next(scur)) @@ -1459,17 +1566,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command( return scmd; } -/** router must be locked */ -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) -{ - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - return &rses->rses_cursor[be_type]; -} - /** router must be locked */ static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor) @@ -1519,24 +1615,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) + BACKEND* backend) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - dcb = rses->rses_dcb[be_type]; - + if (backend->be_dcb == NULL) + { + goto return_succp; + } + dcb = backend->be_dcb; + CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + CHK_BACKEND(backend); /** * Get cursor pointer and copy of command buffer to cursor. 
*/ - scur = rses_get_sescmd_cursor(rses, be_type); + scur = &backend->be_sescmd_cursor; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1550,11 +1648,12 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } + /* LOGIF(LT, tracelog_routed_query(rses, "execute_sescmd_in_backend", dcb, sescmd_cursor_clone_querybuf(scur))); - + */ switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { case COM_CHANGE_USER: rc = dcb->func.auth( @@ -1588,6 +1687,7 @@ return_succp: return succp; } + /** * Moves cursor to next property and copied address of its sescmd to cursor. * Current propery must be non-null. @@ -1665,7 +1765,7 @@ static rses_property_t* mysql_sescmd_get_property( return scmd->my_sescmd_prop; } - +#if 0 static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, @@ -1726,7 +1826,7 @@ static void tracelog_routed_query( } gwbuf_free(buf); } - +#endif /** * Return rc, rc < 0 if router session is closed. rc == 0 if there are no * capabilities specified, rc > 0 when there are capabilities. @@ -1774,17 +1874,17 @@ static bool route_session_write( DCB* master_dcb; DCB* slave_dcb; rses_property_t* prop; + BACKEND** b; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - CHK_DCB(master_dcb); - CHK_DCB(slave_dcb); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " "routing to all servers.", STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + + b = router_cli_ses->rses_backend; + /** * COM_QUIT is one-way message. Server doesn't respond to that. 
* Therefore reply processing is unnecessary and session @@ -1796,13 +1896,24 @@ static bool route_session_write( int rc; int rc2; - rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - rc2 = slave_dcb->func.write(slave_dcb, querybuf); - - if (rc == 1 && rc == rc2) + succp = true; + + while (*b != NULL) { - succp = true; + DCB* dcb = (*b)->be_dcb; + + if (dcb != NULL) + { + rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); + + if (rc != 1) + { + succp = false; + } + } + b++; } + gwbuf_free(querybuf); goto return_succp; } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); @@ -1823,23 +1934,18 @@ static bool route_session_write( /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - /** Execute session command in master */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); - - if (!succp) + while (*b != NULL) { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; + succp = execute_sescmd_in_backend((*b)); + + if (!succp) + { + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + goto return_succp; + } + b++; } - /** Execute session command in slave */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); - if (!succp) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; - } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1847,4 +1953,4 @@ static bool route_session_write( return_succp: return succp; -} \ No newline at end of file +} diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 8c11d9782..faaf268d4 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -119,7 +119,9 @@ typedef enum skygw_chk_t { CHK_NUM_SESSION, CHK_NUM_ROUTER_SES, CHK_NUM_MY_SESCMD, - CHK_NUM_ROUTER_PROPERTY + CHK_NUM_ROUTER_PROPERTY, + CHK_NUM_SESCMD_CUR, + CHK_NUM_BACKEND } skygw_chk_t; # define STRBOOL(b) ((b) ? 
"true" : "false") @@ -446,6 +448,17 @@ typedef enum skygw_chk_t { "Session command has invalid check fields"); \ } +#define CHK_SESCMD_CUR(c) { \ + ss_info_dassert((c)->scmd_cur_chk_top == CHK_NUM_SESCMD_CUR && \ + (c)->scmd_cur_chk_tail == CHK_NUM_SESCMD_CUR, \ + "Session command cursor has invalid check fields"); \ + } + +#define CHK_BACKEND(b) { \ + ss_info_dassert((b)->be_chk_top == CHK_NUM_BACKEND && \ + (b)->be_chk_tail == CHK_NUM_BACKEND, \ + "BACKEND has invalid check fields"); \ +} #if defined(SS_DEBUG) bool conn_open[10240];