diff --git a/server/core/server.c b/server/core/server.c index b7c1a0236..629885afe 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -247,19 +247,26 @@ char *stat; while (ptr) { dcb_printf(dcb, "Server %p (%s)\n", ptr, ptr->unique_name); - dcb_printf(dcb, "\tServer: %s\n", ptr->name); + dcb_printf(dcb, "\tServer: %s\n", + ptr->name); stat = server_status(ptr); - dcb_printf(dcb, "\tStatus: %s\n", stat); + dcb_printf(dcb, "\tStatus: %s\n", + stat); free(stat); - dcb_printf(dcb, "\tProtocol: %s\n", ptr->protocol); - dcb_printf(dcb, "\tPort: %d\n", ptr->port); + dcb_printf(dcb, "\tProtocol: %s\n", + ptr->protocol); + dcb_printf(dcb, "\tPort: %d\n", + ptr->port); if (ptr->server_string) - dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string); - dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id); - dcb_printf(dcb, "\tMaster Id: %d\n", ptr->master_id); + dcb_printf(dcb, "\tServer Version:\t\t\t%s\n", + ptr->server_string); + dcb_printf(dcb, "\tNode Id: %d\n", + ptr->node_id); + dcb_printf(dcb, "\tMaster Id: %d\n", + ptr->master_id); if (ptr->slaves) { int i; - dcb_printf(dcb, "\tSlave Ids: "); + dcb_printf(dcb, "\tSlave Ids: "); for (i = 0; ptr->slaves[i]; i++) { if (i == 0) @@ -269,7 +276,8 @@ char *stat; } dcb_printf(dcb, "\n"); } - dcb_printf(dcb, "\tRepl Depth: %d\n", ptr->depth); + dcb_printf(dcb, "\tRepl Depth: %d\n", + ptr->depth); if (SERVER_IS_SLAVE(ptr) || SERVER_IS_RELAY_SERVER(ptr)) { if (ptr->rlag >= 0) { dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag); @@ -278,9 +286,12 @@ char *stat; if (ptr->node_ts > 0) { dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", ptr->node_ts); } - dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections); - dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current); - dcb_printf(dcb, "\tCurrent no. of operations: %d\n", ptr->stats.n_current_ops); + dcb_printf(dcb, "\tNumber of connections: %d\n", + ptr->stats.n_connections); + dcb_printf(dcb, "\tCurrent no. of conns: %d\n", + ptr->stats.n_current); + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", + ptr->stats.n_current_ops); ptr = ptr->next; } spinlock_release(&server_spin); @@ -299,19 +310,19 @@ char *stat; SERVER_PARAM *param; dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name); - dcb_printf(dcb, "\tServer: %s\n", server->name); + dcb_printf(dcb, "\tServer: %s\n", server->name); stat = server_status(server); - dcb_printf(dcb, "\tStatus: %s\n", stat); + dcb_printf(dcb, "\tStatus: %s\n", stat); free(stat); - dcb_printf(dcb, "\tProtocol: %s\n", server->protocol); - dcb_printf(dcb, "\tPort: %d\n", server->port); + dcb_printf(dcb, "\tProtocol: %s\n", server->protocol); + dcb_printf(dcb, "\tPort: %d\n", server->port); if (server->server_string) - dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string); - dcb_printf(dcb, "\tNode Id: %d\n", server->node_id); - dcb_printf(dcb, "\tMaster Id: %d\n", server->master_id); + dcb_printf(dcb, "\tServer Version:\t\t\t%s\n", server->server_string); + dcb_printf(dcb, "\tNode Id: %d\n", server->node_id); + dcb_printf(dcb, "\tMaster Id: %d\n", server->master_id); if (server->slaves) { int i; - dcb_printf(dcb, "\tSlave Ids: "); + dcb_printf(dcb, "\tSlave Ids: "); for (i = 0; server->slaves[i]; i++) { if (i == 0) @@ -321,7 +332,7 @@ SERVER_PARAM *param; } dcb_printf(dcb, "\n"); } - dcb_printf(dcb, "\tRepl Depth: %d\n", server->depth); + dcb_printf(dcb, "\tRepl Depth: %d\n", server->depth); if (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) { if (server->rlag >= 0) { dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag); @@ -336,14 +347,16 @@ SERVER_PARAM *param; dcb_printf(dcb, "\tServer Parameters:\n"); while (param) { - dcb_printf(dcb, "\t\t%-20s %s\n", param->name, + dcb_printf(dcb, "\t\t%-20s\t%s\n", param->name, param->value); param = param->next; } } - dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections); - dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current); - dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops); + dcb_printf(dcb, "\tNumber of connections: %d\n", + server->stats.n_connections); + dcb_printf(dcb, "\tCurrent no. of conns: %d\n", + server->stats.n_current); + dcb_printf(dcb, "\tCurrent no. of operations: %d\n", server->stats.n_current_ops); } /** diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 3481b4412..6fc639005 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -175,9 +175,17 @@ typedef struct backend_st { #if defined(SS_DEBUG) skygw_chk_t be_chk_top; #endif - SERVER* backend_server; /*< The server itself */ - int backend_conn_count; /*< Number of connections to the server */ - bool be_valid; /*< valid when belongs to the router's configuration */ + SERVER* backend_server; /*< The server itself */ + int backend_conn_count; /*< Number of connections to + * the server + */ + bool be_valid; /*< Valid when belongs to the + * router's configuration + */ + int weight; /*< Desired weighting on the + * load. Expressed in .1% + * increments + */ #if defined(SS_DEBUG) skygw_chk_t be_chk_tail; #endif diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index f2d4debc5..c7aef4e60 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -345,15 +345,15 @@ static void refreshInstance( * * @return NULL in failure, pointer to router in success. */ -static ROUTER* createInstance( - SERVICE* service, - char** options) +static ROUTER * +createInstance(SERVICE *service, char **options) { ROUTER_INSTANCE* router; SERVER* server; int nservers; int i; CONFIG_PARAMETER* param; + char *weightby; if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; @@ -399,6 +399,7 @@ static ROUTER* createInstance( router->servers[nservers]->backend_server = server; router->servers[nservers]->backend_conn_count = 0; router->servers[nservers]->be_valid = false; + router->servers[nservers]->weight = 1000; #if defined(SS_DEBUG) router->servers[nservers]->be_chk_top = CHK_NUM_BACKEND; router->servers[nservers]->be_chk_tail = CHK_NUM_BACKEND; @@ -407,6 +408,59 @@ static ROUTER* createInstance( server = server->nextdb; } router->servers[nservers] = NULL; + + /* + * If server weighting has been defined calculate the percentage + * of load that will be sent to each server. This is only used for + * calculating the least connections, either globally or within a + * service, or the numebr of current operations on a server. + */ + if ((weightby = serviceGetWeightingParameter(service)) != NULL) + { + int n, total = 0; + BACKEND *backend; + + for (n = 0; router->servers[n]; n++) + { + backend = router->servers[n]; + total += atoi(serverGetParameter( + backend->backend_server, weightby)); + } + if (total == 0) + { + LOGIF(LE, (skygw_log_write(LOGFILE_ERROR, + "WARNING: Weighting Parameter for service '%s' " + "will be ignored as no servers have values " + "for the parameter '%s'.\n", + service->name, weightby))); + } + else + { + for (n = 0; router->servers[n]; n++) + { + int perc; + backend = router->servers[n]; + perc = (atoi(serverGetParameter( + backend->backend_server, + weightby)) * 1000) / total; + if (perc == 0) + perc = 1; + backend->weight = perc; + if (perc == 0) + { + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Server '%s' has no value " + "for weighting parameter '%s', " + "no queries will be routed to " + "this server.\n", + server->unique_name, + weightby))); + } + + } + } + } /** * vraa : is this necessary for readwritesplit ? @@ -1271,9 +1325,11 @@ static void rses_end_locked_router_action( static void diagnostic(ROUTER *instance, DCB *dcb) { - ROUTER_CLIENT_SES *router_cli_ses; - ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; - int i = 0; +ROUTER_CLIENT_SES *router_cli_ses; +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; +int i = 0; +BACKEND *backend; +char *weightby; spinlock_acquire(&router->lock); router_cli_ses = router->connections; @@ -1302,6 +1358,30 @@ diagnostic(ROUTER *instance, DCB *dcb) dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", router->stats.n_all); + if ((weightby = serviceGetWeightingParameter(router->service)) != NULL) + { + dcb_printf(dcb, + "\tConnection distribution based on %s " + "server parameter.\n", weightby); + dcb_printf(dcb, + "\t\tServer Target %% Connections " + "Operations\n"); + dcb_printf(dcb, + "\t\t Global Router\n"); + for (i = 0; router->servers[i]; i++) + { + backend = router->servers[i]; + dcb_printf(dcb, + "\t\t%-20s %3.1f%% %-6d %-6d %d\n", + backend->backend_server->unique_name, + (float)backend->weight / 10, + backend->backend_server->stats.n_current, + backend->backend_conn_count, + backend->backend_server->stats.n_current_ops); + } + + } + } /** @@ -1481,8 +1561,8 @@ int bref_cmp_router_conn( BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; - return ((b1->backend_conn_count < b2->backend_conn_count) ? -1 : - ((b1->backend_conn_count > b2->backend_conn_count) ? 1 : 0)); + return ((1000 * b1->backend_conn_count) / b1->weight) + - ((1000 * b2->backend_conn_count) / b2->weight); } int bref_cmp_global_conn( @@ -1492,8 +1572,8 @@ int bref_cmp_global_conn( BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; - return ((b1->backend_server->stats.n_current < b2->backend_server->stats.n_current) ? -1 : - ((b1->backend_server->stats.n_current > b2->backend_server->stats.n_current) ? 1 : 0)); + return ((1000 * b1->backend_server->stats.n_current) / b1->weight) + - ((1000 * b2->backend_server->stats.n_current) / b2->weight); } @@ -1510,9 +1590,11 @@ int bref_cmp_current_load( { SERVER* s1 = ((backend_ref_t *)bref1)->bref_backend->backend_server; SERVER* s2 = ((backend_ref_t *)bref2)->bref_backend->backend_server; + BACKEND* b1 = ((backend_ref_t *)bref1)->bref_backend; + BACKEND* b2 = ((backend_ref_t *)bref2)->bref_backend; - return ((s1->stats.n_current_ops < s2->stats.n_current_ops) ? -1 : - ((s1->stats.n_current_ops > s2->stats.n_current_ops) ? 1 : 0)); + return ((1000 * s1->stats.n_current_ops) - b1->weight) + - ((1000 * s2->stats.n_current_ops) - b2->weight); } @@ -3446,7 +3528,7 @@ static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) { for (i = 0; i< router_nservers; i++) { BACKEND* b = NULL; b = servers[i].bref_backend; - if (b && (! SERVER_IS_DOWN(b->backend_server))) { + if (b && (b->backend_server->status & (SERVER_MASTER|SERVER_MAINT)) == SERVER_MASTER) { if (master_host && b->backend_server->depth < master_host->backend_server->depth) { master_host = b; } else { @@ -3456,27 +3538,6 @@ static BACKEND *get_root_master(backend_ref_t *servers, int router_nservers) { } } } - - /* (2) get the status of server(s) with lowest replication level and check it against SERVER_MASTER bitvalue */ - if (master_host) { - int found = 0; - for (i = 0; ibackend_server)) && (b->backend_server->depth == master_host->backend_server->depth)) { - if (b->backend_server->status & SERVER_MASTER) { - master_host = b; - found = 1; - } - } - } - if (!found) - master_host = NULL; - - if (found && SERVER_IN_MAINT(master_host->backend_server)) - master_host = NULL; - } - - return master_host; + return master_host; }