diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index e4977f55e..a203a9865 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -70,12 +70,46 @@ static void clientReply( DCB* backend_dcb); static uint8_t getCapabilities (ROUTER* inst, void* router_session); +int bref_cmp_global_conn( + const void* bref1, + const void* bref2); -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, +int bref_cmp_router_conn( + const void* bref1, + const void* bref2); + +int bref_cmp_behind_master( + const void* bref1, + const void* bref2); + +int (*criteria_cmpfun[LAST_CRITERIA])(const void*, const void*)= +{ + NULL, + bref_cmp_global_conn, + bref_cmp_router_conn, + bref_cmp_behind_master +}; + +static bool select_connect_backend_servers( + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, + int router_nservers, + int max_nslaves, + select_criteria_t select_criteria, + SESSION* session, ROUTER_INSTANCE* router); +static bool get_dcb( + DCB** dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype); + +static void rwsplit_process_options( + ROUTER_INSTANCE* router, + char** options); + + + static ROUTER_OBJECT MyObject = { createInstance, newSession, @@ -118,13 +152,8 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); + backend_ref_t* backend_ref); static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, @@ -147,14 +176,10 @@ static GWBUF* sescmd_cursor_process_replies( GWBUF* replybuf, sescmd_cursor_t* scur); -static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); - static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf); static bool route_session_write( @@ -164,6 +189,10 @@ static bool route_session_write( unsigned char packet_type, skygw_query_type_t qtype); +static void refreshInstance( + ROUTER_INSTANCE* router, + CONFIG_PARAMETER* param); + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -205,6 +234,35 @@ ROUTER_OBJECT* GetModuleObject() return &MyObject; } +static void refreshInstance( + ROUTER_INSTANCE* router, + CONFIG_PARAMETER* param) +{ + config_param_type_t paramtype; + + paramtype = config_get_paramtype(param); + + if (paramtype == COUNT_TYPE) + { + if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) + { + router->rwsplit_config.rw_max_slave_conn_percent = 0; + router->rwsplit_config.rw_max_slave_conn_count = + config_get_valint(param, NULL, paramtype); + } + } + else if (paramtype == PERCENT_TYPE) + { + if (strncmp(param->name, "max_slave_connections", MAX_PARAM_LEN) == 0) + { + router->rwsplit_config.rw_max_slave_conn_count = 0; + router->rwsplit_config.rw_max_slave_conn_percent = + config_get_valint(param, NULL, paramtype); + } + } +} + + /** * Create an instance of read/write statemtn router within the MaxScale. * @@ -220,8 +278,9 @@ static ROUTER* createInstance( { ROUTER_INSTANCE* router; SERVER* server; - int n; + int nservers; int i; + CONFIG_PARAMETER* param; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -231,49 +290,50 @@ static ROUTER* createInstance( /** Calculate number of servers */ server = service->databases; + nservers = 0; - for (n=0; server != NULL; server=server->nextdb) { - n++; + while (server != NULL) + { + nservers++; + server=server->nextdb; } - router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + router->servers = (BACKEND **)calloc(nservers + 1, sizeof(BACKEND *)); if (router->servers == NULL) { free(router); return NULL; } - - if (options != NULL) - { - LOGIF(LM, (skygw_log_write_flush( - LOGFILE_MESSAGE, - "Router options supplied to read/write statement router " - "module but none are supported. The options will be " - "ignored."))); - } /** * Create an array of the backend servers in the router structure to * maintain a count of the number of connections to each * backend server. */ server = service->databases; - n = 0; + nservers= 0; + while (server != NULL) { - if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL) + if ((router->servers[nservers] = malloc(sizeof(BACKEND))) == NULL) { - for (i = 0; i < n; i++) { + /** clean up */ + for (i = 0; i < nservers; i++) { free(router->servers[i]); } free(router->servers); free(router); return NULL; } - router->servers[n]->backend_server = server; - router->servers[n]->backend_conn_count = 0; - n += 1; + router->servers[nservers]->backend_server = server; + router->servers[nservers]->backend_conn_count = 0; + router->servers[nservers]->be_valid = false; +#if defined(SS_DEBUG) + router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; + router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; +#endif + nservers += 1; server = server->nextdb; } - router->servers[n] = NULL; + router->servers[nservers] = NULL; /** * vraa : is this necessary for readwritesplit ? @@ -288,25 +348,32 @@ static ROUTER* createInstance( router->bitvalue = 0; if (options) { - for (i = 0; options[i]; i++) - { - if (!strcasecmp(options[i], "synced")) - { - router->bitmask |= (SERVER_JOINED); - router->bitvalue |= SERVER_JOINED; - } - else - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Warning : Unsupported " - "router option \"%s\" " - "for readwritesplit router.", - options[i]))); - } - } + rwsplit_process_options(router, options); } + /** + * Set default value for max_slave_connections and for slave selection + * criteria. If parameter is set in config file max_slave_connections + * will be overwritten. + */ + router->rwsplit_config.rw_max_slave_conn_count = CONFIG_MAX_SLAVE_CONN; + + if (router->rwsplit_config.rw_slave_select_criteria == UNDEFINED_CRITERIA) + { + router->rwsplit_config.rw_slave_select_criteria = DEFAULT_CRITERIA; + } + /** + * Copy all config parameters from service to router instance. + * Finally, copy version number to indicate that configs match. + */ + param = config_get_param(service->svc_config_param, "max_slave_connections"); + + if (param != NULL) + { + refreshInstance(router, param); + router->rwsplit_version = service->svc_config_version; + } + /** * We have completed the creation of the router data, so now * insert this router into the linked list of routers * that have been created with this module. @@ -333,93 +400,192 @@ static void* newSession( ROUTER* router_inst, SESSION* session) { - BACKEND* local_backend[BE_COUNT]; - ROUTER_CLIENT_SES* client_rses; + backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ + BACKEND** b; + ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; + int router_nservers = 0; /*< # of servers in total */ + int max_nslaves; /*< max # of slaves used in this session */ + int conf_max_nslaves; /*< value from configuration file */ + int i; + const int min_nservers = 1; /*< hard-coded for now */ + static uint64_t router_client_ses_seq; /*< ID for client session */ - client_rses = - (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); + client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); if (client_rses == NULL) { ss_dassert(false); - return NULL; + goto return_rses; } - memset(local_backend, 0, BE_COUNT*sizeof(void*)); - spinlock_init(&client_rses->rses_lock); #if defined(SS_DEBUG) client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; -#endif - /** store pointers to sescmd list to both cursors */ - client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; - client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER; - - client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = - &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; - +#endif /** - * Find a backend server to connect to. This is the extent of the - * load balancing algorithm we need to implement for this simple - * connection router. + * If service config has been changed, reload config from service to + * router instance first. */ - succp = search_backend_servers(&local_backend[BE_MASTER], - &local_backend[BE_SLAVE], - router); + spinlock_acquire(&router->lock); + if (router->service->svc_config_version > router->rwsplit_version) + { + CONFIG_PARAMETER* param = router->service->svc_config_param; + + while (param != NULL) + { + refreshInstance(router, param); + param = param->next; + } + router->rwsplit_version = router->service->svc_config_version; + /** Read options */ + rwsplit_process_options(router, router->service->routerOptions); + } + /** Copy config struct from router instance */ + client_rses->rses_config = router->rwsplit_config; + /** Create ID for the new client (router_client_ses) session */ + client_rses->rses_id = router_client_ses_seq += 1; + + spinlock_release(&router->lock); + /** + * Set defaults to session variables. + */ + client_rses->rses_autocommit_enabled = true; + client_rses->rses_transaction_active = false; + + /** count servers */ + b = router->servers; + while (*(b++) != NULL) router_nservers++; + + /** With too few servers session is not created */ + if (router_nservers < min_nservers || + MAX(client_rses->rses_config.rw_max_slave_conn_count, + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100) + < min_nservers) + { + if (router_nservers < min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers available. Found %d " + "when %d is required.", + router->service->name, + router_nservers, + min_nservers))); + } + else + { + double pct = client_rses->rses_config.rw_max_slave_conn_percent/100; + double nservers = (double)router_nservers*pct; - /** Both Master and Slave must be found */ - if (!succp) { + if (client_rses->rses_config.rw_max_slave_conn_count < + min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d when %d is required.", + router->service->name, + client_rses->rses_config.rw_max_slave_conn_count, + min_nservers))); + } + if (nservers < min_nservers) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to start %s service. There are " + "too few backend servers configured in " + "MaxScale.cnf. Found %d%% when at least %.0f%% " + "would be required.", + router->service->name, + client_rses->rses_config.rw_max_slave_conn_percent, + min_nservers/(((double)router_nservers)/100)))); + } + } free(client_rses); - return NULL; + client_rses = NULL; + goto return_rses; } /** - * Open the slave connection. + * Create backend reference objects for this session. */ - client_rses->rses_dcb[BE_SLAVE] = dcb_connect( - local_backend[BE_SLAVE]->backend_server, - session, - local_backend[BE_SLAVE]->backend_server->protocol); + backend_ref = (backend_ref_t *)calloc (1, router_nservers*sizeof(backend_ref_t)); - if (client_rses->rses_dcb[BE_SLAVE] == NULL) { - ss_dassert(session->refcount == 1); + if (backend_ref == NULL) + { + /** log this */ free(client_rses); - return NULL; + free(backend_ref); + client_rses = NULL; + goto return_rses; } /** - * Open the master connection. + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. */ - client_rses->rses_dcb[BE_MASTER] = dcb_connect( - local_backend[BE_MASTER]->backend_server, - session, - local_backend[BE_MASTER]->backend_server->protocol); - if (client_rses->rses_dcb[BE_MASTER] == NULL) + for (i=0; i< router_nservers; i++) { - /** Close slave connection first. */ - client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]); - free(client_rses); - return NULL; - } - /** - * We now have a master and a slave server with the least connections. - * Bump the connection counts for these servers. +#if defined(SS_DEBUG) + backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; + backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; +#endif + backend_ref[i].bref_backend = router->servers[i]; + /** store pointers to sescmd list to both cursors */ + backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; + backend_ref[i].bref_sescmd_cur.scmd_cur_active = false; + backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = + &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; + } + /** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. */ - atomic_add(&local_backend[BE_SLAVE]->backend_conn_count, 1); - atomic_add(&local_backend[BE_MASTER]->backend_conn_count, 1); + if (client_rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + spinlock_init(&client_rses->rses_lock); + client_rses->rses_backend_ref = backend_ref; - client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE]; - client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; + /** + * Find a backend servers to connect to. + */ + succp = select_connect_backend_servers(&master_ref, + backend_ref, + router_nservers, + max_nslaves, + client_rses->rses_config.rw_slave_select_criteria, + session, + router); + + /** Both Master and at least 1 slave must be found */ + if (!succp) { + free(client_rses->rses_backend_ref); + free(client_rses); + client_rses = NULL; + goto return_rses; + } + /** Copy backend pointers to router session. */ + client_rses->rses_master_ref = master_ref; + client_rses->rses_backend_ref = backend_ref; + client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ + client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; - client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; /** * Version is bigger than zero once initialized. */ @@ -433,8 +599,13 @@ static void* newSession( router->connections = client_rses; spinlock_release(&router->lock); +return_rses: +#if defined(SS_DEBUG) + if (client_rses != NULL) + { CHK_CLIENT_RSES(client_rses); - + } +#endif return (void *)client_rses; } @@ -452,41 +623,53 @@ static void closeSession( void* router_session) { ROUTER_CLIENT_SES* router_cli_ses; - DCB* slave_dcb; - DCB* master_dcb; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); + + backend_ref = router_cli_ses->rses_backend_ref; /** * Lock router client session for secure read and update. */ - if (rses_begin_locked_router_action(router_cli_ses)) + if (!router_cli_ses->rses_closed && + rses_begin_locked_router_action(router_cli_ses)) { - /** decrease server current connection counters */ - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1); - - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - router_cli_ses->rses_dcb[BE_SLAVE] = NULL; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - router_cli_ses->rses_dcb[BE_MASTER] = NULL; + int i = 0; + /** + * session must be moved to SESSION_STATE_STOPPING state before + * router session is closed. + */ +#if defined(SS_DEBUG) + SESSION* ses = get_session_by_router_ses((void*)router_cli_ses); - router_cli_ses->rses_closed = true; - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); + ss_dassert(ses != NULL); + ss_dassert(ses->state == SESSION_STATE_STOPPING); +#endif /** - * Close the backend server connections + * This sets router closed. Nobody is allowed to use router + * whithout checking this first. */ - if (slave_dcb != NULL) { - CHK_DCB(slave_dcb); - slave_dcb->func.close(slave_dcb); + router_cli_ses->rses_closed = true; + + for (i=0; irses_nbackends; i++) + { + DCB* dcb = backend_ref[i].bref_dcb; + + /** Close those which had been connected */ + if (dcb != NULL) + { + CHK_DCB(dcb); + backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */ + dcb->func.close(dcb); + /** decrease server current connection counters */ + atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } - - if (master_dcb != NULL) { - master_dcb->func.close(master_dcb); - CHK_DCB(master_dcb); } + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); } } @@ -497,13 +680,21 @@ static void freeSession( ROUTER_CLIENT_SES* router_cli_ses; ROUTER_INSTANCE* router; int i; + backend_ref_t* backend_ref; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; router = (ROUTER_INSTANCE *)router_instance; + backend_ref = router_cli_ses->rses_backend_ref; - atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1); - atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1); - + for (i=0; irses_nbackends; i++) + { + if (backend_ref[i].bref_dcb == NULL) + { + continue; + } + ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } spinlock_acquire(&router->lock); if (router->connections == router_cli_ses) { @@ -542,17 +733,106 @@ static void freeSession( * all the memory and other resources associated * to the client session. */ + free(router_cli_ses->rses_backend_ref); free(router_cli_ses); return; } + +static bool get_dcb( + DCB** p_dcb, + ROUTER_CLIENT_SES* rses, + backend_type_t btype) +{ + backend_ref_t* backend_ref; + int smallest_nconn = -1; + int i; + bool succp = false; + + CHK_CLIENT_RSES(rses); + ss_dassert(p_dcb != NULL && *(p_dcb) == NULL); + + if (p_dcb == NULL) + { + goto return_succp; + } + backend_ref = rses->rses_backend_ref; + + if (btype == BE_SLAVE) + { + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + SERVER_IS_SLAVE(b->backend_server) && + (smallest_nconn == -1 || + b->backend_conn_count < smallest_nconn)) + { + *p_dcb = backend_ref[i].bref_dcb; + smallest_nconn = b->backend_conn_count; + succp = true; + } + } + + if (!succp) + { + backend_ref = rses->rses_master_ref; + + if (backend_ref->bref_dcb != NULL) + { + *p_dcb = backend_ref->bref_dcb; + succp = true; + + ss_dassert( + SERVER_IS_MASTER(backend_ref->bref_backend->backend_server) && + smallest_nconn == -1); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Warning : No slaves connected nor " + "available. Choosing master %s:%d " + "instead.", + backend_ref->bref_backend->backend_server->name, + backend_ref->bref_backend->backend_server->port))); + } + } + ss_dassert(succp); + } + else if (btype == BE_MASTER || BE_JOINED) + { + for (i=0; irses_nbackends; i++) + { + BACKEND* b = backend_ref[i].bref_backend; + + if (backend_ref[i].bref_dcb != NULL && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) + { + *p_dcb = backend_ref[i].bref_dcb; + succp = true; + goto return_succp; + } + } + } +return_succp: + return succp; +} + /** - * It is assumed that whole query comes in a single gwbuf instead of linked list. - * + * The main routing entry, this is called with every packet that is + * received and has to be forwarded to the backend database. * - * @param instance The router instance - * @param router_session The session associated with the client - * @param querybuf Gateway buffer queue with the packets received + * The routeQuery will make the routing decision based on the contents + * of the instance, session and the query itself in the queue. The + * data in the queue may not represent a complete query, it represents + * the data that has been received. The query router itself is responsible + * for buffering the partial query, a later call to the query router will + * contain the remainder, or part thereof of the query. + * + * @param instance The query router instance + * @param session The session associated with the client + * @param queue Gateway buffer queue with the packets received * * @return The number of queries forwarded */ @@ -572,7 +852,7 @@ static int routeQuery( DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; - bool rses_is_closed; + bool rses_is_closed = false; size_t len; CHK_CLIENT_RSES(router_cli_ses); @@ -582,27 +862,13 @@ static int routeQuery( { rses_is_closed = true; } - else - { - /*< Lock router client session for secure read of DCBs */ - rses_is_closed = - !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - packet = GWBUF_DATA(querybuf); packet_type = packet[4]; - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + if (rses_is_closed) { - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, + (skygw_log_write_flush( LOGFILE_ERROR, "Error: Failed to route %s:%s:\"%s\" to " "backend server. %s.", @@ -617,6 +883,9 @@ static int routeQuery( inst->stats.n_queries++; startpos = (char *)&packet[5]; + master_dcb = router_cli_ses->rses_master_ref->bref_dcb; + CHK_DCB(master_dcb); + switch(packet_type) { case COM_QUIT: /**< 1 QUIT will close all sessions */ case COM_INIT_DB: /**< 2 DDL must go to the master */ @@ -642,10 +911,11 @@ static int routeQuery( memset(&querystr[len], 0, 1); // querystr = (char *)GWBUF_DATA(plainsqlbuf); /* - querystr = master_dcb->func.getquerystr( - (void *) gwbuf_clone(querybuf), - &querystr_is_copy); + * querystr = master_dcb->func.getquerystr( + * (void *) gwbuf_clone(querybuf), + * &querystr_is_copy); */ + qtype = skygw_query_classifier_get_type(querystr, 0); break; @@ -660,13 +930,23 @@ static int routeQuery( default: break; } /**< switch by packet type */ - +#if 0 LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr == NULL ? "(empty)" : querystr))); LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)))); +#endif +#if defined(AUTOCOMMIT_OPT) + if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && + !router_cli_ses->rses_autocommit_enabled) || + (QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) && + router_cli_ses->rses_autocommit_enabled)) + { + /** reply directly to client */ + } +#endif /** * If autocommit is disabled or transaction is explicitly started * transaction becomes active and master gets all statements until @@ -725,20 +1005,46 @@ static int routeQuery( } else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && !router_cli_ses->rses_transaction_active) - - { + { + bool succp; + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "Read-only query, routing to Slave."))); + "[%s.%d]\tRead-only query, routing to Slave.", + inst->service->name, + router_cli_ses->rses_id))); ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); + succp = get_dcb(&slave_dcb, router_cli_ses, BE_SLAVE); + if (succp) + { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + + if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_slave, 1); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing query \"%s\" failed.", + querystr))); + } + rses_end_locked_router_action(router_cli_ses); + } + ss_dassert(succp); goto return_ret; } else - { + { + bool succp = true; + if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */ @@ -755,12 +1061,29 @@ static int routeQuery( "routing to Master."))); } } - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); + if (master_dcb == NULL) + { + succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER); + } + if (succp) + { + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + + if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) + { + atomic_add(&inst->stats.n_master, 1); + } + rses_end_locked_router_action(router_cli_ses); + } + ss_dassert(succp); + ss_dassert(ret == 1); goto return_ret; } - return_ret: if (plainsqlbuf != NULL) { @@ -897,12 +1220,11 @@ static void clientReply( GWBUF* writebuf, DCB* backend_dcb) { - DCB* master_dcb; - DCB* slave_dcb; DCB* client_dcb; ROUTER_CLIENT_SES* router_cli_ses; sescmd_cursor_t* scur = NULL; - backend_type_t be_type = BE_UNDEFINED; + backend_ref_t* backend_ref; + int i; router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -918,9 +1240,6 @@ static void clientReply( GWBUF_LENGTH(writebuf))) != NULL); goto lock_failed; } - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** Holding lock ensures that router session remains open */ ss_dassert(backend_dcb->session != NULL); client_dcb = backend_dcb->session->client; @@ -942,29 +1261,31 @@ static void clientReply( writebuf, GWBUF_LENGTH(writebuf))) != NULL); /** Log that client was closed before reply */ - return; + goto lock_failed; } - - if (backend_dcb == master_dcb) - { - be_type = BE_MASTER; - } - else if (backend_dcb == slave_dcb) - { - be_type = BE_SLAVE; - } - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "reply_by_statement", - backend_dcb, - gwbuf_clone(writebuf))); /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { /** Log to debug that router was closed */ goto lock_failed; } + backend_ref = router_cli_ses->rses_backend_ref; - scur = rses_get_sescmd_cursor(router_cli_ses, be_type); + /** find backend_dcb's corresponding BACKEND */ + i = 0; + while (irses_nbackends && + backend_ref[i].bref_dcb != backend_dcb) + { + i++; + } + ss_dassert(backend_ref[i].bref_dcb == backend_dcb); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + &backend_ref[i], + gwbuf_clone(writebuf))); + + scur = &backend_ref[i].bref_sescmd_cur; /** * Active cursor means that reply is from session command * execution. Majority of the time there are no session commands @@ -997,181 +1318,420 @@ lock_failed: return; } + +int bref_cmp_router_conn( + const void* bref1, + const void* bref2) +{ + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : + ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); +} + +int bref_cmp_global_conn( + const void* bref1, + const void* bref2) +{ + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; + + return ((b1->backend_server->stats.n_current < b2->backend_server->stats.n_current) ? -1 : + ((b1->backend_server->stats.n_current > b2->backend_server->stats.n_current) ? 1 : 0)); +} + + +int bref_cmp_behind_master( + const void* bref1, + const void* bref2) +{ + return 1; +} + /** - * @node Search suitable backend server from those of router instance. + * @node Search suitable backend servers from those of router instance. * * Parameters: - * @param p_master - in, use, out - * Pointer to location where master's address is to be stored. - * If NULL, then master is not searched. + * @param p_master_ref - in, use, out + * Pointer to location where master's backend reference is to be stored. + * NULL is not allowed. * - * @param p_slave - in, use, out - * Pointer to location where slave's address is to be stored. - * if NULL, then slave is not searched. + * @param backend_ref - in, use, out + * Pointer to backend server reference object array. + * NULL is not allowed. * - * @param inst - in, use - * Pointer to router instance + * @param router_nservers - in, use + * Number of backend server pointers pointed to by b. * - * @return true, if all what what requested found, false if the request - * was not satisfied or was partially satisfied. + * @param max_nslaves - in, use + * Upper limit for the number of slaves. Configuration parameter or default. + * + * @param session - in, use + * MaxScale session pointer used when connection to backend is established. + * + * @param router - in, use + * Pointer to router instance. Used when server states are qualified. + * + * @return true, if at least one master and one slave was found. * * * @details It is assumed that there is only one master among servers of - * a router instance. As a result, thr first master is always chosen. + * a router instance. As a result, the first master found is chosen. */ -static bool search_backend_servers( - BACKEND** p_master, - BACKEND** p_slave, +static bool select_connect_backend_servers( + backend_ref_t** p_master_ref, + backend_ref_t* backend_ref, + int router_nservers, + int max_nslaves, + select_criteria_t select_criteria, + SESSION* session, ROUTER_INSTANCE* router) { - BACKEND* local_backend[BE_COUNT] = {NULL,NULL}; + bool succp = true; + bool master_found = false; + bool master_connected = false; + int slaves_found = 0; + int slaves_connected = 0; int i; - bool succp = true; - - /* - * Loop over all the servers and find any that have fewer connections - * than current candidate server. - * - * If a server has less connections than the current candidate it is - * chosen to a new candidate. - * - * If a server has the same number of connections currently as the - * candidate and has had less connections over time than the candidate - * it will also become the new candidate. This has the effect of - * spreading the connections over different servers during periods of - * very low load. - * - * If master is searched for, the first master found is chosen. + const int min_nslaves = 0; /*< not configurable at the time */ + bool is_synced_master; + int (*p)(const void *, const void *); + + if (p_master_ref == NULL || backend_ref == NULL) + { + ss_dassert(FALSE); + succp = false; + goto return_succp; + } + /** Check slave selection criteria and set compare function */ + p = criteria_cmpfun[select_criteria]; + + if (p == NULL) + { + succp = false; + goto return_succp; + } + + if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ + { + is_synced_master = true; + } + else + { + is_synced_master = false; + } + +#if 0 + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_conn_count))); + } +#endif + /** + * Sort the pointer list to servers according to connection counts. As + * a consequence those backends having least connections are in the + * beginning of the list. */ - for (i = 0; router->servers[i] != NULL; i++) { - BACKEND* be = router->servers[i]; + qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p); + + if (LOG_IS_ENABLED(LOGFILE_TRACE)) + { + if (select_criteria == LEAST_GLOBAL_CONNECTIONS || + select_criteria == LEAST_ROUTER_CONNECTIONS) + { + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Servers and %s connection counts:", + select_criteria == LEAST_GLOBAL_CONNECTIONS ? + "all MaxScale" : "router"))); + + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_server->stats.n_current))); + break; + + case LEAST_ROUTER_CONNECTIONS: + LOGIF(LT, (skygw_log_write_flush(LOGFILE_TRACE, + "%s %d:%d", + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count))); + break; + + default: + break; + } + } + } + } + /** + * Choose at least 1+1 (master and slave) and at most 1+max_nslaves + * servers from the sorted list. First master found is selected. + */ + for (i=0; + ibitvalue is %d", pthread_self(), - be->backend_server->name, - be->backend_server->port, - be->backend_conn_count, - be->backend_server->status, + b->backend_server->name, + b->backend_server->port, + b->backend_conn_count, + b->backend_server->status, router->bitmask))); - } - if (be != NULL && - SERVER_IS_RUNNING(be->backend_server) && - (be->backend_server->status & router->bitmask) == - router->bitvalue) + if (SERVER_IS_RUNNING(b->backend_server) && + ((b->backend_server->status & router->bitmask) == + router->bitvalue)) + { + if (slaves_found < max_nslaves && + SERVER_IS_SLAVE(b->backend_server)) { - if (SERVER_IS_SLAVE(be->backend_server) && - p_slave != NULL) + slaves_found += 1; + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) { - /** - * If no candidate set, set first running - * server as an initial candidate server. - */ - if (local_backend[BE_SLAVE] == NULL) - { - local_backend[BE_SLAVE] = be; - } - else if (be->backend_conn_count < - local_backend[BE_SLAVE]->backend_conn_count) - { - /** - * This running server has fewer - * connections, set it as a new - * candidate. + slaves_connected += 1; + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. */ - local_backend[BE_SLAVE] = be; + atomic_add(&b->backend_conn_count, 1); } - else if (be->backend_conn_count == - local_backend[BE_SLAVE]->backend_conn_count && - be->backend_server->stats.n_connections < - local_backend[BE_SLAVE]->backend_server->stats.n_connections) + else { - /** - * This running server has the same - * number of connections currently - * as the candidate but has had - * fewer connections over time - * than candidate, set this server - * to candidate. - */ - local_backend[BE_SLAVE] = be; - } - } - else if (p_master != NULL && - local_backend[BE_MASTER] == NULL && - SERVER_IS_MASTER(be->backend_server)) - { - local_backend[BE_MASTER] = be; - } - else if (p_master != NULL && - local_backend[BE_JOINED] == NULL && - SERVER_IS_JOINED(be->backend_server)) - { - local_backend[BE_JOINED] = be; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with slave %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ } + } + else if (!master_connected && + (SERVER_IS_MASTER(b->backend_server) || + SERVER_IS_JOINED(b->backend_server))) + { + master_found = true; + + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) + { + master_connected = true; + *p_master_ref = &backend_ref[i]; + /** Increase backend connection counter */ + /** Increase backend connection counter */ + atomic_add(&b->backend_server->stats.n_current, 1); + atomic_add(&b->backend_server->stats.n_connections, 1); + atomic_add(&b->backend_conn_count, 1); + } + else + { + succp = false; + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with master %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ + } + } } - } + } /*< for */ - if (router->bitvalue != 0 && - p_master != NULL && - local_backend[BE_JOINED] == NULL) + /** + * Successful cases + */ + if (master_connected && + slaves_connected >= min_nslaves && + slaves_connected <= max_nslaves) + { + succp = true; + + if (slaves_connected == 0 && slaves_found > 0) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( + LOGIF(LE, (skygw_log_write( LOGFILE_ERROR, - "Error : Couldn't find a Joined Galera node from %d " - "candidates.", - i))); - goto return_succp; - } + "Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); - if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Slave from %d " - "candidates.", - i))); - } - - if (p_master != NULL && local_backend[BE_MASTER] == NULL) { - succp = false; - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Couldn't find suitable Master from %d " - "candidates.", - i))); + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't connect to any of the %d " + "slaves. Routing to %s only.", + slaves_found, + (is_synced_master ? "Galera nodes" : "Master")))); } - - if (local_backend[BE_SLAVE] != NULL) { - *p_slave = local_backend[BE_SLAVE]; + else if (slaves_found == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Warning : Couldn't find any slaves from existing " + "%d servers. Routing to %s only.", + router_nservers, + (is_synced_master ? "Galera nodes" : "Master")))); + } + else if (slaves_connected < max_nslaves) + { + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "Note : Couldn't connect to maximum number of " + "slaves. Connected successfully to %d slaves " + "of %d of them.", + slaves_connected, + slaves_found))); + } + + if (LOG_IS_ENABLED(LT)) + { + for (i=0; ibackend_server->name, + b->backend_server->port))); + } + } /* for */ + } + } + /** + * Failure cases + */ + else + { + if (!master_found) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Slave %s:%d from %d candidates.", - pthread_self(), - local_backend[BE_SLAVE]->backend_server->name, - local_backend[BE_SLAVE]->backend_server->port, - i))); + "Error : Couldn't find suitable %s from %d " + "candidates.", + (is_synced_master ? "Galera node" : "Master"), + router_nservers))); } - if (local_backend[BE_MASTER] != NULL) { - *p_master = local_backend[BE_MASTER]; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [readwritesplit:search_backend_servers] Selected " - "Master %s:%d " - "from %d candidates.", - pthread_self(), - local_backend[BE_MASTER]->backend_server->name, - local_backend[BE_MASTER]->backend_server->port, - i))); + else if (!master_connected) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "* Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Error : Couldn't connect to any %s although " + "there exists at least one %s node in the " + "cluster.", + (is_synced_master ? "Galera node" : "Master"), + (is_synced_master ? "Galera node" : "Master")))); + } + + if (slaves_connected < min_nslaves) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error : Couldn't establish required amount of " + "slave connections for router session."))); + + LOGIF(LM, (skygw_log_write( + LOGFILE_MESSAGE, + "*Error : Couldn't establish required amount of " + "slave connections for router session."))); + } + + /** Clean up connections */ + for (i=0; ibackend_conn_count > 0); + /** disconnect opened connections */ + backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); + } + } + master_connected = false; + slaves_connected = 0; } return_succp: + return succp; } @@ -1387,7 +1947,7 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); - +/* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] cmd %p " @@ -1397,11 +1957,13 @@ static GWBUF* sescmd_cursor_process_replies( scmd, packetlen+headerlen, STRBETYPE(scur->scmd_cur_be_type)))); + */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; + /* LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [sescmd_cursor_process_replies] Marked " @@ -1410,6 +1972,7 @@ static GWBUF* sescmd_cursor_process_replies( pthread_self(), scmd, STRBETYPE(scur->scmd_cur_be_type)))); + */ } if (sescmd_cursor_next(scur)) @@ -1449,17 +2012,6 @@ static mysql_sescmd_t* sescmd_cursor_get_command( return scmd; } -/** router must be locked */ -static sescmd_cursor_t* rses_get_sescmd_cursor( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) -{ - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - return &rses->rses_cursor[be_type]; -} - /** router must be locked */ static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor) @@ -1509,24 +2061,26 @@ static GWBUF* sescmd_cursor_clone_querybuf( * Router session must be locked. */ static bool execute_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) + backend_ref_t* backend_ref) { DCB* dcb; bool succp = true; int rc = 0; sescmd_cursor_t* scur; - dcb = rses->rses_dcb[be_type]; + if (backend_ref->bref_dcb == NULL) + { + goto return_succp; + } + dcb = backend_ref->bref_dcb; CHK_DCB(dcb); - CHK_CLIENT_RSES(rses); - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + CHK_BACKEND_REF(backend_ref); /** * Get cursor pointer and copy of command buffer to cursor. */ - scur = rses_get_sescmd_cursor(rses, be_type); + scur = &backend_ref->bref_sescmd_cur; /** Return if there are no pending ses commands */ if (sescmd_cursor_get_command(scur) == NULL) @@ -1540,9 +2094,10 @@ static bool execute_sescmd_in_backend( /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); } - LOGIF(LT, tracelog_routed_query(rses, + + LOGIF(LT, tracelog_routed_query(scur->scmd_cur_rses, "execute_sescmd_in_backend", - dcb, + backend_ref, sescmd_cursor_clone_querybuf(scur))); switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { @@ -1554,7 +2109,6 @@ static bool execute_sescmd_in_backend( sescmd_cursor_clone_querybuf(scur)); break; - case COM_QUIT: case COM_QUERY: case COM_INIT_DB: default: @@ -1659,7 +2213,7 @@ static rses_property_t* mysql_sescmd_get_property( static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, char* funcname, - DCB* dcb, + backend_ref_t* bref, GWBUF* buf) { uint8_t* packet = GWBUF_DATA(buf); @@ -1668,20 +2222,18 @@ static void tracelog_routed_query( size_t buflen = GWBUF_LENGTH(buf); char* querystr; char* startpos = (char *)&packet[5]; + BACKEND* b; backend_type_t be_type; + DCB* dcb; + + CHK_BACKEND_REF(bref); + b = bref->bref_backend; + CHK_BACKEND(b); + dcb = bref->bref_dcb; + CHK_DCB(dcb); + + be_type = BACKEND_TYPE(b); - if (rses->rses_dcb[BE_MASTER] == dcb) - { - be_type = BE_MASTER; - } - else if (rses->rses_dcb[BE_SLAVE] == dcb) - { - be_type = BE_SLAVE; - } - else - { - be_type = BE_UNDEFINED; - } if (GWBUF_TYPE(buf) == GWBUF_TYPE_MYSQL) { len = packet[0]; @@ -1700,16 +2252,8 @@ static void tracelog_routed_query( funcname, buflen, querystr, - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->name : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->name : - "Target DCB is neither of the backends. This is error")), - (be_type == BE_MASTER ? - rses->rses_backend[BE_MASTER]->backend_server->port : - (be_type == BE_SLAVE ? - rses->rses_backend[BE_SLAVE]->backend_server->port : - -1)), + b->backend_server->name, + b->backend_server->port, STRBETYPE(be_type), dcb))); free(querystr); @@ -1763,20 +2307,19 @@ static bool route_session_write( skygw_query_type_t qtype) { bool succp; - DCB* master_dcb; - DCB* slave_dcb; rses_property_t* prop; + backend_ref_t* backend_ref; + int i; - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - CHK_DCB(master_dcb); - CHK_DCB(slave_dcb); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "Session write, query type\t%s, packet type %s, " "routing to all servers.", STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + + backend_ref = router_cli_ses->rses_backend_ref; + /** * COM_QUIT is one-way message. Server doesn't respond to that. * Therefore reply processing is unnecessary and session @@ -1786,15 +2329,32 @@ static bool route_session_write( if (packet_type == COM_QUIT) { int rc; - int rc2; - rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); - rc2 = slave_dcb->func.write(slave_dcb, querybuf); - - if (rc == 1 && rc == rc2) - { - succp = true; + succp = true; + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + succp = false; + goto return_succp; } + + for (i=0; irses_nbackends; i++) + { + DCB* dcb = backend_ref[i].bref_dcb; + + if (dcb != NULL) + { + rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); + + if (rc != 1) + { + succp = false; + } + } + } + rses_end_locked_router_action(router_cli_ses); + gwbuf_free(querybuf); goto return_succp; } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); @@ -1815,8 +2375,9 @@ static bool route_session_write( /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - /** Execute session command in master */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); + for (i=0; irses_nbackends; i++) + { + succp = execute_sescmd_in_backend(&backend_ref[i]); if (!succp) { @@ -1824,13 +2385,6 @@ static bool route_session_write( rses_end_locked_router_action(router_cli_ses); goto return_succp; } - /** Execute session command in slave */ - succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); - if (!succp) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; } /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1841,3 +2395,52 @@ return_succp: return succp; } +static void rwsplit_process_options( + ROUTER_INSTANCE* router, + char** options) +{ + int i; + char* value; + select_criteria_t c; + + for (i = 0; options[i]; i++) + { + if ((value = strchr(options[i], '=')) == NULL) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unsupported " + "router option \"%s\" for " + "readwritesplit router.", + options[i]))); + } + else + { + *value = 0; + value++; + if (strcmp(options[i], "slave_selection_criteria") == 0) + { + c = GET_SELECT_CRITERIA(value); + ss_dassert( + c == LEAST_GLOBAL_CONNECTIONS || + c == LEAST_ROUTER_CONNECTIONS || + c == LEAST_BEHIND_MASTER || + c == UNDEFINED_CRITERIA); + + if (c == UNDEFINED_CRITERIA) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, "Warning : Unknown " + "slave selection criteria \"%s\". " + "Allowed values are \"LEAST_GLOBAL_CONNECTIONS\", " + "LEAST_ROUTER_CONNECTIONS, " + "and \"LEAST_ROUTER_CONNECTIONS\".", + STRCRITERIA(router->rwsplit_config.rw_slave_select_criteria)))); + } + else + { + router->rwsplit_config.rw_slave_select_criteria = c; + } + } + } + } /*< for */ +} \ No newline at end of file