MaxScale/server/core/service.cc
Markus Mäkelä a50fce0c65
MXS-1929: Allow startup with no services
MaxScale can now be started with an empty configuration file and services
can be created at runtime. Filters cannot yet be created at runtime so
complete runtime creation of configurations is not yet possible.
2018-07-31 09:41:09 +03:00

2809 lines
76 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2022-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
/**
* @file service.cc - A representation of a service within MaxScale
*/
#include <maxscale/cppdefs.hh>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <math.h>
#include <fcntl.h>
#include <map>
#include <string>
#include <set>
#include <vector>
#include <unordered_set>
#include <maxscale/service.h>
#include <maxscale/alloc.h>
#include <maxscale/dcb.h>
#include <maxscale/paths.h>
#include <maxscale/housekeeper.h>
#include <maxscale/listener.h>
#include <maxscale/log_manager.h>
#include <maxscale/poll.h>
#include <maxscale/protocol.h>
#include <maxscale/resultset.h>
#include <maxscale/router.h>
#include <maxscale/server.h>
#include <maxscale/session.h>
#include <maxscale/spinlock.h>
#include <maxscale/users.h>
#include <maxscale/utils.h>
#include <maxscale/utils.hh>
#include <maxscale/version.h>
#include <maxscale/jansson.h>
#include <maxscale/json_api.h>
#include <maxscale/routingworker.h>
#include "internal/config.h"
#include "internal/filter.h"
#include "internal/modules.h"
#include "internal/service.h"
#include "internal/routingworker.hh"
/** This define is needed in CentOS 6 systems */
#if !defined(UINT64_MAX)
#define UINT64_MAX (18446744073709551615UL)
#endif
using std::string;
using std::set;
/** Base value for server weights; every new server reference starts with this weight */
#define SERVICE_BASE_SERVER_WEIGHT 1000
/** Lock held while the global service list is traversed or modified */
static SPINLOCK service_spin = SPINLOCK_INIT;
/** Head of the singly linked list of all allocated services (linked via SERVICE::next) */
static SERVICE *allServices = NULL;
/** Housekeeper task callback used to retry a failed service start (definition is outside this section) */
static bool service_internal_restart(void *data);
/** Called from serviceInitialize() before the listeners are started (definition is outside this section) */
static void service_calculate_weights(SERVICE *service);
/**
* Allocate a new service, configure it from the given parameters and create
* its router instance.
*
* On success the service is linked to the head of the global service list.
*
* @param name   Name of the service; a copy of the string is stored
* @param router Name of the router module to load; a copy of the string is stored
* @param params Configuration parameters the service settings are read from
*
* @return The new service, or NULL if the router module could not be loaded,
*         a memory allocation failed or the router instance could not be created
*/
SERVICE* service_alloc(const char *name, const char *router, MXS_CONFIG_PARAMETER* params)
{
MXS_ROUTER_OBJECT* router_api = (MXS_ROUTER_OBJECT*)load_module(router, MODULE_ROUTER);
if (router_api == NULL)
{
MXS_ERROR("Unable to load router module '%s'", router);
return NULL;
}
char *my_name = MXS_STRDUP(name);
char *my_router = MXS_STRDUP(router);
SERVICE *service = (SERVICE *)MXS_CALLOC(1, sizeof(*service));
// One refresh rate limit slot per worker thread
SERVICE_REFRESH_RATE* rate_limits = (SERVICE_REFRESH_RATE*)MXS_CALLOC(config_threadcount(),
sizeof(*rate_limits));
// All four allocations are checked together; whatever succeeded is released
if (!my_name || !my_router || !service || !rate_limits)
{
MXS_FREE(my_name);
MXS_FREE(my_router);
MXS_FREE(service);
MXS_FREE(rate_limits);
return NULL;
}
const MXS_MODULE* module = get_module(my_router, MODULE_ROUTER);
ss_dassert(module);
service->router = router_api;
service->capabilities = module->module_capabilities;
service->client_count = 0;
service->n_dbref = 0;
service->name = my_name;
service->routerModule = my_router;
service->svc_config_param = NULL;
service->rate_limits = rate_limits;
service->stats.started = time(0);
service->stats.n_failed_starts = 0;
service->state = SERVICE_STATE_ALLOC;
service->active = true;
spinlock_init(&service->spin);
// Copy the individual settings out of the configuration parameters
service->max_retry_interval = config_get_integer(params, CN_MAX_RETRY_INTERVAL);
service->users_from_all = config_get_bool(params, CN_AUTH_ALL_SERVERS);
service->localhost_match_wildcard_host = config_get_bool(params, CN_LOCALHOST_MATCH_WILDCARD_HOST);
service->retry_start = config_get_bool(params, CN_RETRY_ON_FAILURE);
service->enable_root = config_get_bool(params, CN_ENABLE_ROOT_USER);
service->conn_idle_timeout = config_get_integer(params, CN_CONNECTION_TIMEOUT);
service->max_connections = config_get_integer(params, CN_MAX_CONNECTIONS);
service->log_auth_warnings = config_get_bool(params, CN_LOG_AUTH_WARNINGS);
service->strip_db_esc = config_get_bool(params, CN_STRIP_DB_ESC);
service->session_track_trx_state = config_get_bool(params, CN_SESSION_TRACK_TRX_STATE);
serviceWeightBy(service, config_get_string(params, CN_WEIGHTBY));
serviceSetUser(service, config_get_string(params, CN_USER),
config_get_string(params, CN_PASSWORD));
std::string version_string = config_get_string(params, CN_VERSION_STRING);
if (!version_string.empty())
{
/** Add the 5.5.5- string to the start of the version string unless the
* version string already starts with the character '5'.
* This mimics MariaDB 10.0 replication which adds 5.5.5- for backwards compatibility. */
if (version_string[0] != '5')
{
version_string = "5.5.5-" + version_string;
}
serviceSetVersionString(service, version_string.c_str());
}
// No service-specific version string: fall back to the global one, if set
else if (config_get_global_options()->version_string)
{
serviceSetVersionString(service, config_get_global_options()->version_string);
}
if (service->conn_idle_timeout)
{
dcb_enable_session_timeouts();
}
// Store parameters in the service
service_add_parameters(service, params);
service->router_instance = router_api->createInstance(service, params);
if (service->router_instance == NULL)
{
MXS_ERROR("%s: Failed to create router instance. Service not started.", service->name);
// service_free() asserts that the service is inactive
service->active = false;
service_free(service);
return NULL;
}
// Combine the static module capabilities with those reported by the instance
if (router_api->getCapabilities)
{
service->capabilities |= router_api->getCapabilities(service->router_instance);
}
// Publish the fully constructed service at the head of the global list
spinlock_acquire(&service_spin);
service->next = allServices;
allServices = service;
spinlock_release(&service_spin);
return service;
}
/**
 * Release a service and everything it owns.
 *
 * The service is unlinked from the global service list, after which its
 * listeners, router instance, server references, stored parameters and
 * the service structure itself are freed.
 *
 * @param service Service to free; debug builds assert that it is inactive
 *                and has no client connections
 */
void service_free(SERVICE* service)
{
    ss_dassert(atomic_load_int(&service->client_count) == 0);
    ss_dassert(!service->active);

    // Unlink the service by walking the links instead of the nodes; a service
    // that is not in the list leaves the list untouched.
    spinlock_acquire(&service_spin);

    for (SERVICE** link = &allServices; *link; link = &(*link)->next)
    {
        if (*link == service)
        {
            *link = service->next;
            break;
        }
    }

    spinlock_release(&service_spin);

    // Pop and free the listeners one at a time
    for (auto listener = service->ports; listener; listener = service->ports)
    {
        service->ports = listener->next;
        ss_dassert(!listener->active);
        listener_free(listener);
    }

    if (service->router && service->router_instance)
    {
        service->router->destroyInstance(service->router_instance);
    }

    // Pop and free the server references one at a time
    for (SERVER_REF* ref = service->dbref; ref; ref = service->dbref)
    {
        ss_dassert(!ref->active);
        service->dbref = ref->next;
        MXS_FREE(ref);
    }

    config_parameter_free(service->svc_config_param);

    MXS_FREE(service->name);
    MXS_FREE(service->routerModule);
    MXS_FREE(service->filters);
    MXS_FREE(service->rate_limits);
    MXS_FREE(service);
}
/**
 * Destroy a service.
 *
 * The service is marked inactive and its persisted configuration file
 * ("<persistdir>/<name>.cnf") is removed. If no clients are connected the
 * service is freed right away; otherwise it is left allocated.
 *
 * @param service Service to destroy; debug builds assert that it is active
 *                and that the call is made on the main worker
 */
void service_destroy(SERVICE* service)
{
#ifdef SS_DEBUG
    ss_info_dassert(mxs::RoutingWorker::get_current() == mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN),
                    "Destruction of service must be done on the main worker");
#endif
    ss_dassert(service->active);
    service->active = false;

    char persisted_path[PATH_MAX + 1];
    snprintf(persisted_path, sizeof(persisted_path), "%s/%s.cnf",
             get_config_persistdir(), service->name);

    if (unlink(persisted_path) == -1)
    {
        // A missing file is fine: the service was never persisted
        if (errno != ENOENT)
        {
            MXS_ERROR("Failed to remove persisted service configuration at '%s': %d, %s",
                      persisted_path, errno, mxs_strerror(errno));
        }
    }

    bool has_no_clients = atomic_load_int(&service->client_count) == 0;

    if (has_no_clients)
    {
        // Nothing is using the service any more, release it immediately
        service_free(service);
    }
}
/**
 * Check whether a pointer refers to a known service
 *
 * The global service list is searched for the exact pointer value while
 * holding the service list lock.
 *
 * @param service The pointer to look for
 * @return 1 if the pointer is in the list of all services, 0 otherwise
 */
int
service_isvalid(SERVICE *service)
{
    int found = 0;

    spinlock_acquire(&service_spin);

    for (SERVICE *candidate = allServices; candidate; candidate = candidate->next)
    {
        if (candidate == service)
        {
            found = 1;
            break;
        }
    }

    spinlock_release(&service_spin);

    return found;
}
/**
 * Clean up a listener whose startup failed
 *
 * A service that is still in the allocated state is moved to the failed
 * state, and the listener DCB, if one exists, is closed and forgotten.
 *
 * @param port Listener to close
 */
static inline void close_port(SERV_LISTENER *port)
{
    SERVICE *owner = port->service;

    if (owner->state == SERVICE_STATE_ALLOC)
    {
        /** The service never got past allocation, so this is a failed start */
        owner->state = SERVICE_STATE_FAILED;
    }

    if (port->listener != NULL)
    {
        dcb_close(port->listener);
        port->listener = NULL;
    }
}
/**
* Start an individual port/protocol pair
*
* Allocates the listener DCB, loads the protocol and authenticator modules,
* loads the authentication users, resets the per-thread user refresh rate
* limits of the service and finally starts listening. Every failure path
* calls close_port() on the listener.
*
* @param service The service
* @param port The port to start
* @return The number of listeners started: 1 on success, 0 on failure
*/
static int
serviceStartPort(SERVICE *service, SERV_LISTENER *port)
{
const size_t ANY_IPV4_ADDRESS_LEN = 7; // strlen("0:0:0:0");
int listeners = 0;
// Room for the address (or the placeholder length), the '|' separator and the port;
// UINTLEN presumably yields the number of digits in the port -- TODO confirm
size_t config_bind_len =
(port->address ? strlen(port->address) : ANY_IPV4_ADDRESS_LEN) + 1 + UINTLEN(port->port);
char config_bind[config_bind_len + 1]; // +1 for NULL
MXS_PROTOCOL *funcs;
if (service == NULL || service->router == NULL || service->router_instance == NULL)
{
/* Should never happen, this guarantees it can't */
MXS_ERROR("Attempt to start port with null or incomplete service");
close_port(port);
ss_dassert(false);
return 0;
}
port->listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER, port);
if (port->listener == NULL)
{
MXS_ERROR("Failed to create listener for service %s.", service->name);
close_port(port);
return 0;
}
port->listener->service = service;
if ((funcs = (MXS_PROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL)
{
MXS_ERROR("Unable to load protocol module %s. Listener for service %s not started.",
port->protocol, service->name);
close_port(port);
return 0;
}
// The DCB gets its own copy of the protocol entry points
memcpy(&(port->listener->func), funcs, sizeof(MXS_PROTOCOL));
// Authenticator precedence: explicitly configured, then the protocol default,
// then "NullAuthDeny"
const char *authenticator_name = "NullAuthDeny";
if (port->authenticator)
{
authenticator_name = port->authenticator;
}
else if (port->listener->func.auth_default)
{
authenticator_name = port->listener->func.auth_default();
}
MXS_AUTHENTICATOR *authfuncs = (MXS_AUTHENTICATOR *)load_module(authenticator_name, MODULE_AUTHENTICATOR);
if (authfuncs == NULL)
{
MXS_ERROR("Failed to load authenticator module '%s' for listener '%s'",
authenticator_name, port->name);
close_port(port);
return 0;
}
// Add protocol and authenticator capabilities from the listener
const MXS_MODULE* proto_mod = get_module(port->protocol, MODULE_PROTOCOL);
const MXS_MODULE* auth_mod = get_module(authenticator_name, MODULE_AUTHENTICATOR);
ss_dassert(proto_mod && auth_mod);
service->capabilities |= proto_mod->module_capabilities | auth_mod->module_capabilities;
memcpy(&port->listener->authfunc, authfuncs, sizeof(MXS_AUTHENTICATOR));
/**
* Normally, we'd allocate the DCB specific authentication data. As the
* listeners aren't normal DCBs, we can skip that.
*/
// Build the "address|port" bind string; "::" is used when no address is configured
if (port->address)
{
sprintf(config_bind, "%s|%d", port->address, port->port);
}
else
{
sprintf(config_bind, "::|%d", port->port);
}
/** Load the authentication users before starting the listener */
if (port->listener->authfunc.loadusers)
{
switch (port->listener->authfunc.loadusers(port))
{
// A fatal error aborts the start of this listener
case MXS_AUTH_LOADUSERS_FATAL:
MXS_ERROR("[%s] Fatal error when loading users for listener '%s', "
"service is not started.", service->name, port->name);
close_port(port);
return 0;
// A non-fatal error is only logged and the start continues
case MXS_AUTH_LOADUSERS_ERROR:
MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
" might not work.", service->name, port->name);
break;
default:
break;
}
}
MXS_CONFIG* config = config_get_global_options();
time_t last;
bool warned;
/**
* At service start last update is set to config->users_refresh_time seconds earlier.
* This way MaxScale could try reloading users just after startup. But only if user
* refreshing has not been turned off.
*/
if (config->users_refresh_time == INT32_MAX)
{
last = time(NULL);
warned = true; // So that there will not be a refresh rate warning.
}
else
{
last = time(NULL) - config->users_refresh_time;
warned = false;
}
// Apply the same starting point to the rate limit slot of every thread
int nthreads = config_threadcount();
for (int i = 0; i < nthreads; ++i)
{
service->rate_limits[i].last = last;
service->rate_limits[i].warned = warned;
}
if (port->listener->func.listen(port->listener, config_bind))
{
port->listener->session = session_alloc(service, port->listener);
if (port->listener->session != NULL)
{
port->listener->session->state = SESSION_STATE_LISTENER;
listeners += 1;
}
else
{
MXS_ERROR("[%s] Failed to create listener session.", service->name);
close_port(port);
}
}
else
{
MXS_ERROR("[%s] Failed to listen on %s", service->name, config_bind);
close_port(port);
}
return listeners;
}
/**
 * Start all ports for a service.
 *
 * Every listener of the service is started in turn unless the service is
 * shutting down. If none of them start and the service has retry_start
 * enabled, a housekeeper task is scheduled to retry the start later.
 *
 * @param service Service to start
 * @return Number of started listeners. The value is 1 when the service has
 *         no listeners at all or when a retry was scheduled, and 0 when the
 *         service is in the failed state.
 */
int serviceStartAllPorts(SERVICE* service)
{
    if (service->ports == NULL)
    {
        MXS_WARNING("Service '%s' has no listeners defined.", service->name);
        /** Report one listener to suppress errors */
        return 1;
    }

    int started = 0;

    for (SERV_LISTENER *port = service->ports;
         port && !service->svc_do_shutdown; port = port->next)
    {
        started += serviceStartPort(service, port);
    }

    if (service->state == SERVICE_STATE_FAILED)
    {
        started = 0;
    }
    else if (started)
    {
        service->state = SERVICE_STATE_STARTED;
        service->stats.started = time(0);
    }
    else if (service->retry_start)
    {
        /** None of the ports could be started, schedule another attempt. */
        service->stats.n_failed_starts++;

        char taskname[strlen(service->name) + strlen("_start_retry_") +
                      (int) ceil(log10(INT_MAX)) + 1];
        snprintf(taskname, sizeof(taskname), "%s_start_retry_%d",
                 service->name, service->stats.n_failed_starts);

        // The delay grows by ten seconds per failure up to the configured maximum
        int retry_after = MXS_MIN(service->stats.n_failed_starts * 10, service->max_retry_interval);
        hktask_add(taskname, service_internal_restart, service, retry_after);

        MXS_NOTICE("Failed to start service %s, retrying in %d seconds.",
                   service->name, retry_after);

        /** This will prevent MaxScale from shutting down if service start is retried later */
        started = 1;
    }

    return started;
}
/**
 * Initialize and start a service
 *
 * The server weights of the service are calculated first. Unless MaxScale
 * is only checking the configuration, all listeners of the service are
 * then started.
 *
 * @param service The Service that should be started
 *
 * @return The number of listeners created, or 1 when only the
 *         configuration is being checked
 */
int serviceInitialize(SERVICE *service)
{
    /** Server weights are needed regardless of whether ports are started */
    service_calculate_weights(service);

    if (config_get_global_options()->config_check)
    {
        /** Configuration check only: report success without listening */
        return 1;
    }

    return serviceStartAllPorts(service);
}
/**
 * Start a single listener of a service
 *
 * The listener is started while holding the service lock. If it fails to
 * start, it is removed from the service by name.
 *
 * @param service Service that owns the listener; debug builds assert that
 *                it is not in the failed state
 * @param port    Listener to start
 *
 * @return True if the listener was started
 */
bool serviceLaunchListener(SERVICE *service, SERV_LISTENER *port)
{
    ss_dassert(service->state != SERVICE_STATE_FAILED);

    spinlock_acquire(&service->spin);

    const bool started = serviceStartPort(service, port) != 0;

    if (!started)
    {
        /** The listener could not be started, take it out of use */
        service_remove_listener(service, port->name);
    }

    spinlock_release(&service->spin);

    return started;
}
/**
 * Stop a named listener of a service
 *
 * The first active listener with a matching name is removed from the poll
 * set and its session is marked as stopped.
 *
 * @param service Service that owns the listener
 * @param name    Name of the listener to stop
 *
 * @return True if the listener DCB was removed from the poll set. False if
 *         no active listener has that name, the listener has no DCB (it
 *         was never started or failed to start) or the removal failed.
 */
bool serviceStopListener(SERVICE *service, const char *name)
{
    bool rval = false;
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (listener_is_active(listener) && strcmp(listener->name, name) == 0)
        {
            // close_port() resets the DCB pointer to NULL when a listener fails
            // to start, so it must be checked like the other start/stop functions do.
            if (listener->listener && poll_remove_dcb(listener->listener) == 0)
            {
                listener->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
                rval = true;
            }
            break;
        }
    }

    return rval;
}
/**
 * Resume a stopped listener of a service
 *
 * The first active listener with a matching name is added back to the poll
 * set, provided that it has a DCB whose session is in the stopped state.
 *
 * @param service Service that owns the listener
 * @param name    Name of the listener to start
 *
 * @return True if the listener was added back to the poll set
 */
bool serviceStartListener(SERVICE *service, const char *name)
{
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (!listener_is_active(listener) || strcmp(listener->name, name) != 0)
        {
            continue;
        }

        // Only the first matching listener is considered
        if (listener->listener &&
            listener->listener->session->state == SESSION_STATE_LISTENER_STOPPED &&
            poll_add_dcb(listener->listener) == 0)
        {
            listener->listener->session->state = SESSION_STATE_LISTENER;
            return true;
        }

        return false;
    }

    return false;
}
int service_launch_all()
{
SERVICE *ptr;
int n = 0, i;
bool error = false;
int num_svc = 0;
for (ptr = allServices; ptr; ptr = ptr->next)
{
num_svc++;
}
MXS_NOTICE("Starting a total of %d services...", num_svc);
int curr_svc = 1;
ptr = allServices;
while (ptr && !ptr->svc_do_shutdown)
{
n += (i = serviceInitialize(ptr));
MXS_NOTICE("Service '%s' started (%d/%d)", ptr->name, curr_svc++, num_svc);
if (i == 0)
{
MXS_ERROR("Failed to start service '%s'.", ptr->name);
error = true;
}
ptr = ptr->next;
}
return error ? -1 : n;
}
/**
 * Stop a service
 *
 * Every active listener that is currently listening is removed from the
 * poll set and marked as stopped. The service itself is then put into the
 * stopped state.
 *
 * @param service The Service that should be stopped, may be NULL
 * @return True if at least one listener was stopped
 */
bool serviceStop(SERVICE *service)
{
    if (service == NULL)
    {
        return false;
    }

    int stopped = 0;
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (!listener_is_active(listener) || !listener->listener ||
            listener->listener->session->state != SESSION_STATE_LISTENER)
        {
            continue;
        }

        if (poll_remove_dcb(listener->listener) == 0)
        {
            listener->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
            stopped++;
        }
    }

    service->state = SERVICE_STATE_STOPPED;

    return stopped > 0;
}
/**
 * Restart a stopped service
 *
 * Every active listener that is in the stopped state is added back to the
 * poll set and marked as listening. The service itself is then put into
 * the started state.
 *
 * @param service The Service that should be restarted, may be NULL
 * @return True if at least one listener was restarted
 */
bool serviceStart(SERVICE *service)
{
    if (service == NULL)
    {
        return false;
    }

    int restarted = 0;
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (!listener_is_active(listener) || !listener->listener ||
            listener->listener->session->state != SESSION_STATE_LISTENER_STOPPED)
        {
            continue;
        }

        if (poll_add_dcb(listener->listener) == 0)
        {
            listener->listener->session->state = SESSION_STATE_LISTENER;
            restarted++;
        }
    }

    service->state = SERVICE_STATE_STARTED;

    return restarted > 0;
}
/**
* Add a listener to a service
*
* The listener is pushed to the head of the service's listener list with a
* compare-and-swap retry loop instead of taking the service lock.
*
* @param service Service where listener is added
* @param proto Listener to add; its next pointer is overwritten
*/
static void service_add_listener(SERVICE* service, SERV_LISTENER* proto)
{
do
{
/** Read the current value of the list's head. This will be our expected
* value for the following compare-and-swap operation. */
proto->next = (SERV_LISTENER*)atomic_load_ptr((void**)&service->ports);
}
/** Compare the current value to our expected value and if they match, replace
* the current value with our new value. */
// The loop repeats for as long as atomic_cas_ptr() reports failure
while (!atomic_cas_ptr((void**)&service->ports, (void**)&proto->next, proto));
}
/**
 * Remove a named listener from a service
 *
 * The first active listener with a matching name is marked inactive. If it
 * has a listener DCB, the DCB is removed from the poll set and its session
 * is marked as stopped. The listener stays in the service's list.
 *
 * @param service Service that owns the listener
 * @param target  Name of the listener to remove
 *
 * @return True if the listener DCB was removed from the poll set. False if
 *         no active listener has that name, the listener has no DCB (it
 *         was never started or failed to start) or the removal failed.
 */
bool service_remove_listener(SERVICE *service, const char* target)
{
    bool rval = false;
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (listener_is_active(listener) && strcmp(listener->name, target) == 0)
        {
            listener_set_active(listener, false);

            // close_port() resets the DCB pointer to NULL when a listener fails
            // to start, and serviceLaunchListener() calls this function right
            // after such a failure. Without a DCB there is nothing to stop polling.
            if (listener->listener && poll_remove_dcb(listener->listener) == 0)
            {
                listener->listener->session->state = SESSION_STATE_LISTENER_STOPPED;
                rval = true;
            }
            break;
        }
    }

    return rval;
}
/**
 * Create a listener and attach it to a service
 *
 * @param service       The service
 * @param name          Name of the listener
 * @param protocol      The name of the protocol module
 * @param address       The address to listen with
 * @param port          The port to listen on
 * @param authenticator Name of the authenticator to be used
 * @param options       Options passed on to listener_alloc()
 * @param ssl           SSL configuration
 *
 * @return Created listener or NULL if the listener could not be allocated
 */
SERV_LISTENER* serviceCreateListener(SERVICE *service, const char *name, const char *protocol,
                                     const char *address, unsigned short port, const char *authenticator,
                                     const char *options, SSL_LISTENER *ssl)
{
    SERV_LISTENER *created = listener_alloc(service, name, protocol, address,
                                            port, authenticator, options, ssl);

    if (created == NULL)
    {
        return NULL;
    }

    service_add_listener(service, created);
    return created;
}
/**
 * Find an active listener of a service by network address or socket path
 *
 * A listener matches if it listens on the given non-zero port with the
 * same address (both addresses NULL also counts as the same), or if its
 * address equals the given socket path.
 *
 * @param service Service whose listeners are searched
 * @param socket  Socket path to look for, may be NULL
 * @param address Network address to look for, may be NULL
 * @param port    Network port to look for, zero disables port matching
 *
 * @return The first matching active listener or NULL if there is none
 */
SERV_LISTENER* service_find_listener(SERVICE* service,
                                     const char* socket,
                                     const char* address, unsigned short port)
{
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *candidate = listener_iterator_init(service, &iter);
         candidate; candidate = listener_iterator_next(&iter))
    {
        if (!listener_is_active(candidate))
        {
            continue;
        }

        if (port && port == candidate->port)
        {
            bool same_address = address ?
                (candidate->address && strcmp(candidate->address, address) == 0) :
                candidate->address == NULL;

            if (same_address)
            {
                return candidate;
            }
        }

        // Not the same port: the listener may still be bound to the given socket
        if (socket && candidate->address && strcmp(candidate->address, socket) == 0)
        {
            return candidate;
        }
    }

    return NULL;
}
/**
 * Check if a listener is already part of the service
 *
 * An active listener counts as a match if it has the same name, or if it
 * uses the same protocol, port and address (both addresses NULL also
 * counts as the same).
 *
 * @param service The service
 * @param name Name of the listener
 * @param protocol The name of the protocol module
 * @param address The address to listen on
 * @param port The port to listen on
 * @return True if a matching listener is already part of the service
 */
bool serviceHasListener(SERVICE* service, const char* name, const char* protocol,
                        const char* address, unsigned short port)
{
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *candidate = listener_iterator_init(service, &iter);
         candidate; candidate = listener_iterator_next(&iter))
    {
        if (!listener_is_active(candidate))
        {
            continue;
        }

        // Listener with same name exists
        if (strcmp(candidate->name, name) == 0)
        {
            return true;
        }

        if (strcmp(candidate->protocol, protocol) != 0 || candidate->port != port)
        {
            continue;
        }

        // Listener listening on the same interface and port exists
        bool same_address = address ?
            (candidate->address && strcmp(candidate->address, address) == 0) :
            candidate->address == NULL;

        if (same_address)
        {
            return true;
        }
    }

    return false;
}
/**
 * Check if a service has an active listener with the given name
 *
 * @param service Service whose listeners are searched
 * @param name    Name of the listener to look for
 *
 * @return True if an active listener with that name exists
 */
bool service_has_named_listener(SERVICE *service, const char *name)
{
    bool found = false;
    LISTENER_ITERATOR iter;
    SERV_LISTENER *candidate = listener_iterator_init(service, &iter);

    while (candidate && !found)
    {
        found = listener_is_active(candidate) && strcmp(candidate->name, name) == 0;
        candidate = listener_iterator_next(&iter);
    }

    return found;
}
/**
 * Check if a service can be destroyed
 *
 * A service can be destroyed only when it has no active listeners and no
 * active server references.
 *
 * @param service Service to check
 *
 * @return True if nothing active is attached to the service
 */
bool service_can_be_destroyed(SERVICE *service)
{
    LISTENER_ITERATOR iter;

    for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
         listener; listener = listener_iterator_next(&iter))
    {
        if (listener_is_active(listener))
        {
            return false;
        }
    }

    // No active listeners, the servers decide the outcome
    for (auto ref = service->dbref; ref; ref = ref->next)
    {
        if (ref->active)
        {
            return false;
        }
    }

    return true;
}
/**
 * Allocate a new server reference
 *
 * The reference starts out active, unlinked, with no connections and with
 * the base server weight.
 *
 * @param server Server to refer to
 * @return Server reference or NULL if memory allocation failed
 */
static SERVER_REF* server_ref_create(SERVER *server)
{
    SERVER_REF *created = (SERVER_REF*)MXS_MALLOC(sizeof(SERVER_REF));

    if (created == NULL)
    {
        return NULL;
    }

    created->server = server;
    created->next = NULL;
    created->active = true;
    created->connections = 0;
    created->weight = SERVICE_BASE_SERVER_WEIGHT;

    return created;
}
/**
 * Add a backend database server to a service
 *
 * If the service already has an inactive reference to the server, that
 * reference is reactivated. Otherwise a new reference is appended to the
 * end of the service's reference list.
 *
 * @param service The service to add the server to
 * @param server The server to add
 *
 * @return True if the server was added, false if the service already has
 *         it as an active backend or the reference could not be allocated
 */
bool serviceAddBackend(SERVICE *service, SERVER *server)
{
    if (serviceHasBackend(service, server))
    {
        return false;
    }

    SERVER_REF *new_ref = server_ref_create(server);

    if (new_ref == NULL)
    {
        return false;
    }

    spinlock_acquire(&service->spin);
    service->n_dbref++;

    // Walk the links so that the empty list and the list tail are handled
    // the same way: the loop ends either at an existing reference to this
    // server or at the NULL link the new reference belongs in.
    SERVER_REF **link = &service->dbref;

    while (*link && (*link)->server != server)
    {
        link = &(*link)->next;
    }

    if (*link)
    {
        /** The service has used this server before, reuse the old reference */
        (*link)->active = true;
        MXS_FREE(new_ref);
    }
    else
    {
        /** A new server that hasn't been used by this service */
        atomic_synchronize();
        *link = new_ref;
    }

    spinlock_release(&service->spin);

    return true;
}
/**
 * @brief Remove a server from a service
 *
 * The first active reference to the server is switched to the inactive
 * state and the service's reference count is decremented. The reference
 * stays in the list and no memory is freed.
 *
 * @param service Service to modify
 * @param server Server to remove
 */
void serviceRemoveBackend(SERVICE *service, const SERVER *server)
{
    spinlock_acquire(&service->spin);

    SERVER_REF *ref = service->dbref;

    while (ref && !(ref->server == server && ref->active))
    {
        ref = ref->next;
    }

    if (ref)
    {
        ref->active = false;
        service->n_dbref--;
    }

    spinlock_release(&service->spin);
}
/**
 * Test if a server is part of a service
 *
 * Only active server references are considered.
 *
 * @param service The service to inspect
 * @param server The server to look for
 * @return True if the service has an active reference to the server
 */
bool
serviceHasBackend(SERVICE *service, SERVER *server)
{
    bool found = false;

    spinlock_acquire(&service->spin);

    for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
    {
        if (ref->server == server && ref->active)
        {
            found = true;
            break;
        }
    }

    spinlock_release(&service->spin);

    return found;
}
/**
 * Set the service user that is used to log in to the backend servers
 * associated with this service.
 *
 * The values are copied into the service and truncated if they do not fit.
 * Nothing is modified if either argument is NULL.
 *
 * @param service The service we are setting the data for
 * @param user The user name to use for connections
 * @param auth The authentication data we need, e.g. MySQL SHA1 password
 * @return 1 on success, 0 if user or auth is NULL
 */
int
serviceSetUser(SERVICE *service, const char *user, const char *auth)
{
    // Passing a NULL pointer for a %s conversion is undefined behaviour, so
    // both arguments are validated before anything is written.
    if (user == NULL || auth == NULL)
    {
        return 0;
    }

    // Skip the copy when the caller hands back the service's own buffer:
    // snprintf must not be given overlapping source and destination.
    if (service->credentials.name != user)
    {
        snprintf(service->credentials.name,
                 sizeof(service->credentials.name), "%s", user);
    }

    if (service->credentials.authdata != auth)
    {
        snprintf(service->credentials.authdata,
                 sizeof(service->credentials.authdata), "%s", auth);
    }

    return 1;
}
/**
 * Get the service user that is used to log in to the backend servers
 * associated with this service.
 *
 * The output pointers refer to the credentials stored inside the service;
 * nothing is copied.
 *
 * @param service The service we are reading the data from
 * @param user Output for the user name used for connections
 * @param auth Output for the authentication data
 * @return 1 on success, 0 if either stored credential is NULL
 */
int
serviceGetUser(SERVICE *service, char **user, char **auth)
{
    bool has_credentials = !(service->credentials.name == NULL ||
                             service->credentials.authdata == NULL);

    if (has_credentials)
    {
        *user = service->credentials.name;
        *auth = service->credentials.authdata;
        return 1;
    }

    return 0;
}
/**
 * Enable or disable root user access for this service
 *
 * @param service The service we are setting the data for
 * @param action 1 to enable root access, 0 to disable it
 * @return 1 on success, 0 if action is neither 0 nor 1
 */
int
serviceEnableRootUser(SERVICE *service, int action)
{
    if (action == 0 || action == 1)
    {
        service->enable_root = action;
        return 1;
    }

    return 0;
}
/**
 * Choose whether user data is loaded from one server or from all of them
 *
 * @param service The service we are setting the data for
 * @param action 1 for all servers, 0 for single server
 * @return 1 on success, 0 if action is neither 0 nor 1
 */
int
serviceAuthAllServers(SERVICE *service, int action)
{
    if (action == 0 || action == 1)
    {
        service->users_from_all = action;
        return 1;
    }

    return 0;
}
/**
 * Choose whether escape characters are stripped from the name of the
 * database the client is connecting to.
 *
 * @param service Service to configure
 * @param action 0 for disabled, 1 for enabled
 * @return 1 if successful, 0 if action is neither 0 nor 1
 */
int serviceStripDbEsc(SERVICE* service, int action)
{
    if (action == 0 || action == 1)
    {
        service->strip_db_esc = action;
        return 1;
    }

    return 0;
}
/**
 * Sets the session idle timeout for the service.
 *
 * A non-zero timeout also turns on the session timeout checks.
 *
 * @param service Service to configure
 * @param val Timeout in seconds
 * @return 1 on success, 0 when the value is negative
 */
int
serviceSetTimeout(SERVICE *service, int val)
{
    if (val < 0)
    {
        return 0;
    }

    service->conn_idle_timeout = val;

    /** The timeout checks are only needed once some service actually has
     * an idle timeout configured. */
    if (service->conn_idle_timeout)
    {
        dcb_enable_session_timeouts();
    }

    return 1;
}
/**
 * Set the version string of the service
 *
 * The value is copied into the service and truncated if it does not fit.
 *
 * @param service Service to configure
 * @param value   New version string
 */
void serviceSetVersionString(SERVICE *service, const char* value)
{
    if (service->version_string == value)
    {
        // The caller passed the service's own buffer, there is nothing to copy
        return;
    }

    snprintf(service->version_string, sizeof(service->version_string), "%s", value);
}
/**
 * Sets the connection limits, if any, for the service.
 *
 * Only the maximum connection count is stored. Queued connections are not
 * implemented; debug builds assert that queued and timeout are zero.
 *
 * @param service Service to configure
 * @param max The maximum number of client connections at any one time
 * @param queued The maximum number of connections to queue up when
 * max_connections clients are already connected
 * @param timeout Maximum amount of time to wait for a connection to
 * become available.
 * @return 1 on success, 0 when max or queued is negative
 */
int
serviceSetConnectionLimits(SERVICE *service, int max, int queued, int timeout)
{
    const bool valid = max >= 0 && queued >= 0;

    if (!valid)
    {
        return 0;
    }

    service->max_connections = max;

    ss_info_dassert(queued == 0, "Queued connections not implemented.");
    ss_info_dassert(timeout == 0, "Queued connections not implemented.");

    return 1;
}
/**
 * Enable or disable the restarting of the service on failure.
 *
 * @param service Service to configure
 * @param value A string representation of a boolean value; a NULL value
 *              leaves the setting unchanged
 */
void serviceSetRetryOnFailure(SERVICE *service, const char* value)
{
    if (value == NULL)
    {
        return;
    }

    service->retry_start = config_truth_value(value);
}
/**
 * Set the maximum interval between service start retries
 *
 * @param service Service to configure
 * @param value   New maximum retry interval; debug builds assert that it
 *                is positive
 */
void service_set_retry_interval(SERVICE *service, int value)
{
    ss_dassert(value > 0);

    service->max_retry_interval = value;
}
/**
 * Set the filters used by the service
 *
 * Each filter name is normalized, looked up and loaded. The service is only
 * modified when every filter was processed successfully: it then receives
 * a new NULL-terminated filter array, the filter count and the combined
 * filter capabilities.
 *
 * @param service The service itself
 * @param filters The filters to use separated by the pipe character |
 *
 * @return True if loading and creating all filters was successful. False if a
 * filter module was not found, the instance creation failed or the memory
 * for the filter array could not be allocated.
 */
bool service_set_filters(SERVICE* service, const char* filters)
{
    bool rval = true;
    std::vector<MXS_FILTER_DEF*> flist;
    uint64_t capabilities = 0;

    for (auto&& f: mxs::strtok(filters, "|"))
    {
        fix_object_name(&f[0]);

        if (MXS_FILTER_DEF* def = filter_def_find(f.c_str()))
        {
            if (filter_load(def))
            {
                flist.push_back(def);

                // Static module capabilities plus those reported by the instance
                const MXS_MODULE* module = get_module(def->module, MODULE_FILTER);
                ss_dassert(module);
                capabilities |= module->module_capabilities;

                if (def->obj->getCapabilities)
                {
                    capabilities |= def->obj->getCapabilities(def->filter);
                }
            }
            else
            {
                MXS_ERROR("Failed to load filter '%s' for service '%s'.", f.c_str(), service->name);
                rval = false;
            }
        }
        else
        {
            MXS_ERROR("Unable to find filter '%s' for service '%s'", f.c_str(), service->name);
            rval = false;
        }
    }

    if (rval)
    {
        MXS_FILTER_DEF** new_filters =
            (MXS_FILTER_DEF**)MXS_MALLOC((flist.size() + 1) * sizeof(MXS_FILTER_DEF*));

        if (new_filters)
        {
            // The array is completed before it is attached to the service
            std::copy(flist.begin(), flist.end(), new_filters);
            new_filters[flist.size()] = NULL;

            service->filters = new_filters;
            service->n_filters = flist.size();
            service->capabilities |= capabilities;
        }
        else
        {
            // Allocation failed: leave the service untouched and report the error
            rval = false;
        }
    }

    return rval;
}
/**
 * Return a named service
 *
 * The global service list is searched while holding the service list lock.
 * Inactive services are never returned.
 *
 * @param servname The name of the service to find
 * @return The service or NULL if not found
 */
SERVICE *
service_find(const char *servname)
{
    SERVICE *found = NULL;

    spinlock_acquire(&service_spin);

    for (SERVICE *candidate = allServices; candidate; candidate = candidate->next)
    {
        if (strcmp(candidate->name, servname) == 0 && candidate->active)
        {
            found = candidate;
            break;
        }
    }

    spinlock_release(&service_spin);

    return found;
}
/**
 * Print details of an individual service to standard output
 *
 * Prints the name, router, start time, every server reference in the
 * service's list, the filter chain (if any) and the session counters.
 *
 * @param service Service to print
 */
void
printService(SERVICE *service)
{
    struct tm started_tm;
    char started_str[30];

    printf("\tService: %s\n", service->name);
    printf("\tRouter: %s\n", service->routerModule);
    printf("\tStarted: %s",
           asctime_r(localtime_r(&service->stats.started, &started_tm), started_str));
    printf("\tBackend databases\n");

    for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
    {
        printf("\t\t[%s]:%d Protocol: %s\n", ref->server->address, ref->server->port, ref->server->protocol);
    }

    if (service->n_filters)
    {
        printf("\tFilter chain: ");

        for (int idx = 0; idx < service->n_filters; idx++)
        {
            // Every filter except the last one is followed by a pipe
            const char *separator = idx + 1 < service->n_filters ? "|" : "";
            printf("%s %s ", service->filters[idx]->name, separator);
        }

        printf("\n");
    }

    printf("\tTotal connections: %d\n", service->stats.n_sessions);
    printf("\tCurrently connected: %d\n", service->stats.n_current);
}
/**
* Print all services
*
* Designed to be called within a debugger session in order
* to display all active services within the gateway
*/
void
printAllServices()
{
SERVICE *ptr;
spinlock_acquire(&service_spin);
ptr = allServices;
while (ptr)
{
printService(ptr);
ptr = ptr->next;
}
spinlock_release(&service_spin);
}
/**
 * Print all services to a DCB
 *
 * Designed to be called within a CLI command in order
 * to display all services within the gateway. The service list lock
 * is held while printing.
 *
 * @param dcb DCB to print the services to
 */
void
dprintAllServices(DCB *dcb)
{
    spinlock_acquire(&service_spin);

    for (SERVICE *current = allServices; current; current = current->next)
    {
        dprintService(dcb, current);
    }

    spinlock_release(&service_spin);
}
/**
 * Print details of a single service.
 *
 * Prints the name, router, state, router diagnostics, start time, root
 * access setting, filter chain, active backend servers, weighting
 * parameter and session counters, in that order.
 *
 * @param dcb DCB to print data to
 * @param service The service to print
 */
void dprintService(DCB *dcb, SERVICE *service)
{
    dcb_printf(dcb, "\tService: %s\n", service->name);
    dcb_printf(dcb, "\tRouter: %s\n", service->routerModule);

    // Nothing is printed for a state that is not listed here
    const char *state_line = NULL;

    switch (service->state)
    {
    case SERVICE_STATE_STARTED:
        state_line = "\tState: Started\n";
        break;

    case SERVICE_STATE_STOPPED:
        state_line = "\tState: Stopped\n";
        break;

    case SERVICE_STATE_FAILED:
        state_line = "\tState: Failed\n";
        break;

    case SERVICE_STATE_ALLOC:
        state_line = "\tState: Allocated\n";
        break;

    default:
        break;
    }

    if (state_line)
    {
        dcb_printf(dcb, "%s", state_line);
    }

    if (service->router && service->router_instance)
    {
        service->router->diagnostics(service->router_instance, dcb);
    }

    struct tm started_tm;
    char started_str[30];
    dcb_printf(dcb, "\tStarted: %s",
               asctime_r(localtime_r(&service->stats.started, &started_tm), started_str));

    const char *root_access = service->enable_root ? "Enabled" : "Disabled";
    dcb_printf(dcb, "\tRoot user access: %s\n", root_access);

    if (service->n_filters)
    {
        dcb_printf(dcb, "\tFilter chain: ");

        for (int idx = 0; idx < service->n_filters; idx++)
        {
            // Every filter except the last one is followed by a pipe
            const char *separator = idx + 1 < service->n_filters ? "|" : "";
            dcb_printf(dcb, "%s %s ", service->filters[idx]->name, separator);
        }

        dcb_printf(dcb, "\n");
    }

    dcb_printf(dcb, "\tBackend databases:\n");

    for (SERVER_REF *ref = service->dbref; ref; ref = ref->next)
    {
        if (SERVER_REF_IS_ACTIVE(ref))
        {
            dcb_printf(dcb, "\t\t[%s]:%d Protocol: %s Name: %s\n",
                       ref->server->address, ref->server->port,
                       ref->server->protocol, ref->server->name);
        }
    }

    if (*service->weightby)
    {
        dcb_printf(dcb, "\tRouting weight parameter: %s\n",
                   service->weightby);
    }

    dcb_printf(dcb, "\tTotal connections: %d\n",
               service->stats.n_sessions);
    dcb_printf(dcb, "\tCurrently connected: %d\n",
               service->stats.n_current);
}
/**
 * Print a table of all services to a DCB.
 *
 * Each row holds the service name, router module, current and total
 * session counts and a comma separated list of the active backend servers.
 * Nothing is printed when there are no services.
 *
 * @param dcb DCB to print the service list to.
 */
void dListServices(DCB *dcb)
{
    const char HORIZ_SEPARATOR[] = "--------------------------+-------------------"
                                   "+--------+----------------+-------------------\n";

    spinlock_acquire(&service_spin);

    if (allServices)
    {
        dcb_printf(dcb, "Services.\n");
        dcb_printf(dcb, "%s", HORIZ_SEPARATOR);
        dcb_printf(dcb, "%-25s | %-17s | #Users | Total Sessions | Backend databases\n",
                   "Service Name", "Router Module");
        dcb_printf(dcb, "%s", HORIZ_SEPARATOR);
    }

    for (SERVICE *svc = allServices; svc; svc = svc->next)
    {
        ss_dassert(svc->stats.n_current >= 0);
        dcb_printf(dcb, "%-25s | %-17s | %6d | %14d | ",
                   svc->name, svc->routerModule,
                   svc->stats.n_current, svc->stats.n_sessions);

        bool need_comma = false;
        for (SERVER_REF *ref = svc->dbref; ref; ref = ref->next)
        {
            if (!SERVER_REF_IS_ACTIVE(ref))
            {
                continue;
            }

            // Every active server after the first is preceded by ", "
            if (need_comma)
            {
                dcb_printf(dcb, ", %s", ref->server->name);
            }
            else
            {
                dcb_printf(dcb, "%s", ref->server->name);
                need_comma = true;
            }
        }
        dcb_printf(dcb, "\n");
    }

    if (allServices)
    {
        dcb_printf(dcb, "%s\n", HORIZ_SEPARATOR);
    }

    spinlock_release(&service_spin);
}
/**
 * List the defined listeners in a tabular format.
 *
 * One row is printed for every listener for which listener_is_active()
 * is true, across all services. A listener without an address is shown
 * with "*" in the address column. Nothing is printed when there are no
 * services.
 *
 * @param dcb DCB to print the listener list to.
 */
void
dListListeners(DCB *dcb)
{
    const char HORIZ_SEPARATOR[] = "---------------------+---------------------+"
                                   "--------------------+-----------------+-------+--------\n";

    spinlock_acquire(&service_spin);

    if (allServices)
    {
        dcb_printf(dcb, "Listeners.\n");
        dcb_printf(dcb, "%s", HORIZ_SEPARATOR);
        dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | Port | State\n",
                   "Name", "Service Name", "Protocol Module", "Address");
        dcb_printf(dcb, "%s", HORIZ_SEPARATOR);
    }

    for (SERVICE *service = allServices; service; service = service->next)
    {
        LISTENER_ITERATOR iter;

        for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
             listener; listener = listener_iterator_next(&iter))
        {
            if (listener_is_active(listener))
            {
                dcb_printf(dcb, "%-20s | %-19s | %-18s | %-15s | %5d | %s\n",
                           listener->name, service->name, listener->protocol,
                           listener->address ? listener->address : "*",
                           listener->port,
                           listener_state_to_string(listener));
            }
        }
    }

    if (allServices)
    {
        // The table is terminated by the separator and one empty line
        dcb_printf(dcb, "%s\n", HORIZ_SEPARATOR);
    }

    spinlock_release(&service_spin);
}
/**
 * Refresh the database users for the service
 *
 * Calls the loadusers entry point of the authenticator of every active
 * listener of the service. Reloads are rate limited: when the previous
 * reload happened less than users_refresh_time seconds ago, nothing is
 * loaded and the call fails. A warning is logged only for the first
 * rejected attempt after a successful reload.
 *
 * Services without the ACAP_TYPE_ASYNC capability hold the service lock for
 * the duration of the call and share the rate limit slot 0; other services
 * use the slot of the calling routing worker.
 *
 * @param service Service to reload
 *
 * @return 0 on success and 1 on error
 */
int service_refresh_users(SERVICE *service)
{
    ss_dassert(service);
    int rc = 1;
    int slot = mxs_rworker_get_current_id();
    ss_dassert(slot >= 0);
    time_t now = time(NULL);

    if ((service->capabilities & ACAP_TYPE_ASYNC) == 0)
    {
        spinlock_acquire(&service->spin);
        // Use only one rate limitation for synchronous authenticators to keep
        // rate limitations synchronous as well
        slot = 0;
    }

    MXS_CONFIG* cnf = config_get_global_options();

    /* Check if refresh rate limit has been exceeded */
    const bool rate_limited = now < service->rate_limits[slot].last + cnf->users_refresh_time;

    if (!rate_limited)
    {
        service->rate_limits[slot].last = now;
        service->rate_limits[slot].warned = false;
        rc = 0;

        LISTENER_ITERATOR iter;
        SERV_LISTENER *listener = listener_iterator_init(service, &iter);

        while (listener)
        {
            /** Load the authentication users before starting the listener */
            if (listener_is_active(listener) && listener->listener &&
                listener->listener->authfunc.loadusers)
            {
                switch (listener->listener->authfunc.loadusers(listener))
                {
                case MXS_AUTH_LOADUSERS_FATAL:
                    MXS_ERROR("[%s] Fatal error when loading users for listener '%s',"
                              " authentication will not work.", service->name, listener->name);
                    rc = 1;
                    break;

                case MXS_AUTH_LOADUSERS_ERROR:
                    MXS_WARNING("[%s] Failed to load users for listener '%s', authentication"
                                " might not work.", service->name, listener->name);
                    rc = 1;
                    break;

                default:
                    break;
                }
            }

            listener = listener_iterator_next(&iter);
        }
    }
    else if (!service->rate_limits[slot].warned)
    {
        MXS_WARNING("[%s] Refresh rate limit (once every %ld seconds) exceeded for "
                    "load of users' table.",
                    service->name, cnf->users_refresh_time);
        service->rate_limits[slot].warned = true;
    }

    if ((service->capabilities & ACAP_TYPE_ASYNC) == 0)
    {
        spinlock_release(&service->spin);
    }

    return rc;
}
/**
 * Add a list of parameters to a service
 *
 * Every parameter in the list is cloned with config_clone_param() and the
 * clone is pushed to the front of the service's parameter list. The input
 * list itself is not modified.
 *
 * @param service Service to add the parameters to
 * @param param   Head of the parameter list to copy, may be NULL
 */
void service_add_parameters(SERVICE *service, const MXS_CONFIG_PARAMETER *param)
{
    for (; param; param = param->next)
    {
        MXS_CONFIG_PARAMETER *new_param = config_clone_param(param);

        // NOTE(review): config_clone_param() presumably returns NULL when
        // memory allocation fails - confirm. A failed clone is skipped so
        // that the remaining parameters are still added.
        if (new_param == NULL)
        {
            MXS_ERROR("Failed to copy parameter '%s' for service '%s'.",
                      param->name, service->name);
            continue;
        }

        new_param->next = service->svc_config_param;
        service->svc_config_param = new_param;
    }
}
/**
 * Add a single key-value parameter to a service
 *
 * A temporary one-element parameter list is built on the stack and handed
 * to service_add_parameters(), which stores a clone of it.
 *
 * @param service Service to add the parameter to
 * @param key     Parameter name
 * @param value   Parameter value
 */
void service_add_parameter(SERVICE *service, const char* key, const char* value)
{
    // The list takes non-const pointers; it is passed on as a const list
    MXS_CONFIG_PARAMETER single = {const_cast<char*>(key), const_cast<char*>(value), nullptr};
    service_add_parameters(service, &single);
}
/**
 * Remove a parameter from a service
 *
 * The first parameter whose name matches @c key (case-insensitively) is
 * unlinked from the service's parameter list and freed. Nothing happens if
 * no parameter matches.
 *
 * @param service Service to remove the parameter from
 * @param key     Name of the parameter to remove
 */
void service_remove_parameter(SERVICE *service, const char* key)
{
    // 'link' always points at the pointer that refers to the current node,
    // so the head and the interior nodes are unlinked the same way
    for (MXS_CONFIG_PARAMETER** link = &service->svc_config_param; *link; link = &(*link)->next)
    {
        MXS_CONFIG_PARAMETER* victim = *link;

        if (strcasecmp(victim->name, key) == 0)
        {
            *link = victim->next;
            // Set next pointer to null to prevent freeing of other parameters
            victim->next = NULL;
            config_parameter_free(victim);
            return;
        }
    }
}
/**
 * Replace the value of a service parameter
 *
 * If a parameter named @c key (compared case-insensitively) exists, its
 * value is freed and replaced with a copy of @c value. Otherwise the
 * parameter is added as a new one.
 *
 * @param service Service to modify
 * @param key     Parameter name
 * @param value   New parameter value
 */
void service_replace_parameter(SERVICE *service, const char* key, const char* value)
{
    MXS_CONFIG_PARAMETER* existing = service->svc_config_param;

    while (existing && strcasecmp(existing->name, key) != 0)
    {
        existing = existing->next;
    }

    if (existing)
    {
        MXS_FREE(existing->value);
        existing->value = MXS_STRDUP_A(value);
    }
    else
    {
        service_add_parameter(service, key, value);
    }
}
/**
 * Set the weighting parameter for the service
 *
 * The name is copied into the service's own buffer and truncated if it
 * does not fit. Passing the service's own buffer is a no-op.
 *
 * @param service  The service pointer
 * @param weightby The parameter name to weight the routing by
 */
void serviceWeightBy(SERVICE *service, const char *weightby)
{
    if (service->weightby == weightby)
    {
        // Source and destination are the same buffer, nothing to copy
        return;
    }

    snprintf(service->weightby, sizeof(service->weightby), "%s", weightby);
}
/**
 * Return the parameter the service should use to weight connections by
 *
 * @param service The service pointer
 *
 * @return The service's weighting parameter buffer; an empty string means
 *         that other code in this file treats weighting as not configured
 */
const char* serviceGetWeightingParameter(SERVICE *service)
{
    const char* weighting_param = service->weightby;
    return weighting_param;
}
/**
 * Enable/Disable localhost authentication match criteria
 * associated with this service.
 *
 * @param service The service we are setting the data for
 * @param action  1 for enable, 0 for disable access
 *
 * @return 1 if the setting was stored, 0 if @c action was neither 0 nor 1
 */
int serviceEnableLocalhostMatchWildcardHost(SERVICE *service, int action)
{
    const bool valid = (action == 0 || action == 1);

    if (valid)
    {
        service->localhost_match_wildcard_host = action;
    }

    return valid ? 1 : 0;
}
void service_shutdown()
{
SERVICE* svc;
spinlock_acquire(&service_spin);
svc = allServices;
while (svc != NULL)
{
svc->svc_do_shutdown = true;
svc = svc->next;
}
spinlock_release(&service_spin);
}
/**
 * Destroy a listener
 *
 * Closes the listening DCB of the listener, if it has one. Other code in
 * this file checks @c listener->listener for NULL before using it, so a
 * listener without a DCB is skipped here as well.
 *
 * @param sl The listener to destroy.
 *
 * @return The next listener or NULL if there is not one.
 */
static SERV_LISTENER* service_destroy_listener(SERV_LISTENER* sl)
{
    SERV_LISTENER* next = sl->next;

    if (sl->listener)
    {
        dcb_close(sl->listener);
    }

    // TODO: What else should be closed and freed here?

    return next;
}
typedef std::map<MXS_FILTER*, void(*)(MXS_FILTER*)> DestructorsByFilter;

/**
 * Destroy one service instance
 *
 * Destroys all listeners of the service and calls the destroyInstance hook
 * of the router, if the service has a router with such a hook and a router
 * instance. Filter instances are not destroyed here: each filter instance
 * that has a destroyInstance hook is recorded in @c filters_to_delete so
 * that the caller can destroy it once, even when several services use it.
 *
 * @param svc               The service to destroy.
 * @param filters_to_delete Output map from filter instance to the function
 *                          that destroys it
 */
static void service_destroy_instance(SERVICE* svc, DestructorsByFilter* filters_to_delete)
{
    SERV_LISTENER* sl = svc->ports;

    while (sl)
    {
        sl = service_destroy_listener(sl);
    }

    /* Call destroyInstance hook for routers. The router pointer is checked
     * as well, like it is elsewhere in this file before the router is used. */
    if (svc->router && svc->router->destroyInstance && svc->router_instance)
    {
        svc->router->destroyInstance(svc->router_instance);
    }

    for (int i = 0; i < svc->n_filters; i++)
    {
        MXS_FILTER_DEF* def = svc->filters[i];

        if (def->obj->destroyInstance && def->filter)
        {
            // std::map::insert keeps an already existing entry untouched,
            // so a filter instance shared by many services is stored once
            filters_to_delete->insert(std::make_pair(def->filter, def->obj->destroyInstance));
        }
    }
}
void service_destroy_instances(void)
{
spinlock_acquire(&service_spin);
DestructorsByFilter filters_to_delete;
SERVICE* svc = allServices;
while (svc != NULL)
{
ss_dassert(svc->svc_do_shutdown);
service_destroy_instance(svc, &filters_to_delete);
svc = svc->next;
}
for (auto i = filters_to_delete.begin(); i != filters_to_delete.end(); ++i)
{
i->second(i->first);
}
spinlock_release(&service_spin);
}
/**
* Return the count of all sessions active for all services
*
* @return Count of all active sessions
*/
int
serviceSessionCountAll()
{
SERVICE *service;
int rval = 0;
spinlock_acquire(&service_spin);
service = allServices;
while (service)
{
rval += service->stats.n_current;
service = service->next;
}
spinlock_release(&service_spin);
return rval;
}
/**
 * Provide a row to the result set that defines the set of service
 * listeners
 *
 * The listeners of all services are numbered consecutively in list order
 * and the one whose number equals the stored row index is returned.
 * Services without listeners are skipped, regardless of their position in
 * the service list. When the index is past the last listener, the row
 * index storage is freed and NULL is returned.
 *
 * TODO: Replace these
 *
 * @param set  The result set
 * @param data The index of the row to send
 *
 * @return The next row or NULL
 */
static RESULT_ROW *
serviceListenerRowCallback(RESULTSET *set, void *data)
{
    int *rowno = (int *)data;
    int to_skip = *rowno;
    SERVICE *service = NULL;
    SERV_LISTENER *lptr = NULL;

    spinlock_acquire(&service_spin);

    for (service = allServices; service; service = service->next)
    {
        for (lptr = service->ports; lptr; lptr = lptr->next)
        {
            if (to_skip <= 0)
            {
                break;
            }
            to_skip--;
        }

        if (lptr)
        {
            // Found the requested listener; 'service' is its owner
            break;
        }
    }

    if (lptr == NULL)
    {
        spinlock_release(&service_spin);
        MXS_FREE(data);
        return NULL;
    }

    (*rowno)++;

    char buf[20];
    RESULT_ROW *row = resultset_make_row(set);
    resultset_row_set(row, 0, service->name);
    resultset_row_set(row, 1, lptr->protocol);
    resultset_row_set(row, 2, lptr->address ? lptr->address : "*");
    snprintf(buf, sizeof(buf), "%d", lptr->port);
    resultset_row_set(row, 3, buf);
    resultset_row_set(row, 4, listener_state_to_string(lptr));

    spinlock_release(&service_spin);

    return row;
}
/**
 * Return a result set that has the current set of service listeners in it
 *
 * The rows are produced by serviceListenerRowCallback(), which is given a
 * heap allocated row counter that starts at zero.
 *
 * @return A result set or NULL if memory allocation fails
 */
RESULTSET *serviceGetListenerList()
{
    int *row_index = (int *)MXS_MALLOC(sizeof(int));

    if (row_index == NULL)
    {
        return NULL;
    }

    *row_index = 0;

    RESULTSET *set = resultset_create(serviceListenerRowCallback, row_index);

    if (set == NULL)
    {
        MXS_FREE(row_index);
        return NULL;
    }

    resultset_add_column(set, "Service Name", 25, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Protocol Module", 20, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Address", 15, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Port", 5, COL_TYPE_VARCHAR);
    resultset_add_column(set, "State", 8, COL_TYPE_VARCHAR);

    return set;
}
/**
 * Provide a row to the result set that defines the set of services
 *
 * The stored row index selects the service by its position in the global
 * service list. When the index is past the end of the list, the row index
 * storage is freed and NULL is returned.
 *
 * @param set  The result set
 * @param data The index of the row to send
 *
 * @return The next row or NULL
 */
static RESULT_ROW *
serviceRowCallback(RESULTSET *set, void *data)
{
    int *rowno = (int *)data;

    spinlock_acquire(&service_spin);

    // Walk forward to the service at position *rowno
    SERVICE *svc = allServices;
    for (int pos = 0; pos < *rowno && svc; pos++)
    {
        svc = svc->next;
    }

    if (svc == NULL)
    {
        spinlock_release(&service_spin);
        MXS_FREE(data);
        return NULL;
    }

    (*rowno)++;

    char number[20];
    RESULT_ROW *row = resultset_make_row(set);
    resultset_row_set(row, 0, svc->name);
    resultset_row_set(row, 1, svc->routerModule);
    snprintf(number, sizeof(number), "%d", svc->stats.n_current);
    resultset_row_set(row, 2, number);
    snprintf(number, sizeof(number), "%d", svc->stats.n_sessions);
    resultset_row_set(row, 3, number);

    spinlock_release(&service_spin);

    return row;
}
/**
 * Return a result set that has the current set of services in it
 *
 * The rows are produced by serviceRowCallback(), which is given a heap
 * allocated row counter that starts at zero.
 *
 * @return A result set or NULL if memory allocation fails
 */
RESULTSET *serviceGetList()
{
    int *row_index = (int *)MXS_MALLOC(sizeof(int));

    if (row_index == NULL)
    {
        return NULL;
    }

    *row_index = 0;

    RESULTSET *set = resultset_create(serviceRowCallback, row_index);

    if (set == NULL)
    {
        MXS_FREE(row_index);
        return NULL;
    }

    resultset_add_column(set, "Service Name", 25, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Router Module", 20, COL_TYPE_VARCHAR);
    resultset_add_column(set, "No. Sessions", 10, COL_TYPE_VARCHAR);
    resultset_add_column(set, "Total Sessions", 10, COL_TYPE_VARCHAR);

    return set;
}
/**
* Function called by the housekeeper thread to retry starting of a service
* @param data Service to restart
*/
static bool service_internal_restart(void *data)
{
SERVICE* service = (SERVICE*)data;
serviceStartAllPorts(service);
return false;
}
/**
* Check that all services have listeners
* @return True if all services have listeners
*/
bool service_all_services_have_listeners()
{
bool rval = true;
spinlock_acquire(&service_spin);
SERVICE* service = allServices;
while (service)
{
LISTENER_ITERATOR iter;
SERV_LISTENER *listener = listener_iterator_init(service, &iter);
if (listener == NULL)
{
MXS_ERROR("Service '%s' has no listeners.", service->name);
rval = false;
}
service = service->next;
}
spinlock_release(&service_spin);
return rval;
}
/**
 * Calculate the relative weights of the servers of a service
 *
 * Does nothing if the service has no weighting parameter or no servers.
 * Otherwise every server first gets the base weight. If the sum of the
 * weighting parameter values is usable, each server that defines the
 * parameter gets the weight (value * SERVICE_BASE_SERVER_WEIGHT) / sum.
 *
 * All intermediate arithmetic is done with 64-bit integers and the results
 * are range checked, instead of relying on the sign of an overflowed int.
 *
 * @param service Service whose server weights are recalculated
 */
static void service_calculate_weights(SERVICE *service)
{
    const char *weightby = serviceGetWeightingParameter(service);

    if (!*weightby || !service->dbref)
    {
        return;
    }

    char buf[50]; // Enough to hold most numbers

    /** Service has a weighting parameter and at least one server */
    long long total = 0;

    /** Calculate total weight */
    for (SERVER_REF *server = service->dbref; server; server = server->next)
    {
        server->weight = SERVICE_BASE_SERVER_WEIGHT;

        if (server_get_parameter(server->server, weightby, buf, sizeof(buf)))
        {
            total += atoi(buf);
        }
    }

    if (total == 0)
    {
        MXS_WARNING("Weighting Parameter for service '%s' will be ignored as "
                    "no servers have values for the parameter '%s'.",
                    service->name, weightby);
        return;
    }

    // A negative sum is handled the same way as a too large one
    if (total < 0 || total > INT_MAX)
    {
        MXS_ERROR("Sum of weighting parameter '%s' for service '%s' exceeds "
                  "maximum value of %d. Weighting will be ignored.",
                  weightby, service->name, INT_MAX);
        return;
    }

    /** Calculate the relative weight of the servers */
    for (SERVER_REF *server = service->dbref; server; server = server->next)
    {
        if (!server_get_parameter(server->server, weightby, buf, sizeof(buf)))
        {
            MXS_WARNING("Server '%s' has no parameter '%s' used for weighting"
                        " for service '%s'.", server->server->name,
                        weightby, service->name);
            continue;
        }

        int wght = atoi(buf);
        long long perc = (static_cast<long long>(wght) * SERVICE_BASE_SERVER_WEIGHT) / total;

        if (perc == 0)
        {
            MXS_WARNING("Weighting parameter '%s' with a value of %d for"
                        " server '%s' rounds down to zero with total weight"
                        " of %d for service '%s'. No queries will be "
                        "routed to this server as long as a server with"
                        " positive weight is available.",
                        weightby, wght, server->server->name,
                        static_cast<int>(total), service->name);
        }
        else if (perc < 0 || perc > INT_MAX)
        {
            // Negative value, or a result that does not fit into an int
            MXS_ERROR("Weighting parameter '%s' for server '%s' is too large, "
                      "maximum value is %d. No weighting will be used for this "
                      "server.", weightby, server->server->name,
                      INT_MAX / SERVICE_BASE_SERVER_WEIGHT);
            perc = SERVICE_BASE_SERVER_WEIGHT;
        }

        server->weight = static_cast<int>(perc);
    }
}
void service_update_weights()
{
spinlock_acquire(&service_spin);
for (SERVICE *service = allServices; service; service = service->next)
{
service_calculate_weights(service);
}
spinlock_release(&service_spin);
}
bool service_server_in_use(const SERVER *server)
{
bool rval = false;
spinlock_acquire(&service_spin);
for (SERVICE *service = allServices; service && !rval; service = service->next)
{
spinlock_acquire(&service->spin);
for (SERVER_REF *ref = service->dbref; ref && !rval; ref = ref->next)
{
if (ref->active && ref->server == server)
{
rval = true;
}
}
spinlock_release(&service->spin);
}
spinlock_release(&service_spin);
return rval;
}
/**
 * Creates a service configuration at the location pointed by @c filename
 *
 * The file must not exist beforehand (it is opened with O_EXCL). The
 * common service parameters are written from the fields of the service;
 * after them, every stored configuration parameter that was not already
 * written explicitly is dumped as-is.
 *
 * @param service  Service to serialize into a configuration
 * @param filename Filename where configuration is written
 *
 * @return True on success, false if the file could not be created
 */
static bool create_service_config(const SERVICE *service, const char *filename)
{
    int file = open(filename, O_EXCL | O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);

    if (file == -1)
    {
        MXS_ERROR("Failed to open file '%s' when serializing service '%s': %d, %s",
                  filename, service->name, errno, mxs_strerror(errno));
        return false;
    }

    /**
     * TODO: Check for return values on all of the dprintf calls
     */
    dprintf(file, "[%s]\n", service->name);
    dprintf(file, "%s=service\n", CN_TYPE);
    dprintf(file, "%s=%s\n", CN_ROUTER, service->routerModule);
    dprintf(file, "%s=%s\n", CN_USER, service->credentials.name);
    dprintf(file, "%s=%s\n", CN_PASSWORD, service->credentials.authdata);
    dprintf(file, "%s=%s\n", CN_ENABLE_ROOT_USER, service->enable_root ? "true" : "false");
    dprintf(file, "%s=%d\n", CN_MAX_RETRY_INTERVAL, service->max_retry_interval);
    dprintf(file, "%s=%d\n", CN_MAX_CONNECTIONS, service->max_connections);
    dprintf(file, "%s=%ld\n", CN_CONNECTION_TIMEOUT, service->conn_idle_timeout);
    dprintf(file, "%s=%s\n", CN_AUTH_ALL_SERVERS, service->users_from_all ? "true" : "false");
    dprintf(file, "%s=%s\n", CN_STRIP_DB_ESC, service->strip_db_esc ? "true" : "false");
    dprintf(file, "%s=%s\n", CN_LOCALHOST_MATCH_WILDCARD_HOST,
            service->localhost_match_wildcard_host ? "true" : "false");
    dprintf(file, "%s=%s\n", CN_LOG_AUTH_WARNINGS, service->log_auth_warnings ? "true" : "false");
    dprintf(file, "%s=%s\n", CN_RETRY_ON_FAILURE, service->retry_start ? "true" : "false");

    if (*service->version_string)
    {
        dprintf(file, "%s=%s\n", CN_VERSION_STRING, service->version_string);
    }

    if (*service->weightby)
    {
        dprintf(file, "%s=%s\n", CN_WEIGHTBY, service->weightby);
    }

    if (service->dbref)
    {
        dprintf(file, "%s=", CN_SERVERS);
        const char *sep = "";

        for (SERVER_REF *db = service->dbref; db; db = db->next)
        {
            if (SERVER_REF_IS_ACTIVE(db))
            {
                dprintf(file, "%s%s", sep, db->server->name);
                sep = ",";
            }
        }

        dprintf(file, "\n");
    }

    // Parameters that were already written above. CN_ROUTER belongs here
    // too: it is written explicitly and must not be dumped a second time
    // if it is also present in the stored parameter list.
    std::unordered_set<std::string> common_params
    {
        CN_TYPE,
        CN_ROUTER,
        CN_USER,
        CN_PASSWORD,
        CN_ENABLE_ROOT_USER,
        CN_MAX_RETRY_INTERVAL,
        CN_MAX_CONNECTIONS,
        CN_CONNECTION_TIMEOUT,
        CN_AUTH_ALL_SERVERS,
        CN_STRIP_DB_ESC,
        CN_LOCALHOST_MATCH_WILDCARD_HOST,
        CN_LOG_AUTH_WARNINGS,
        CN_RETRY_ON_FAILURE,
        CN_VERSION_STRING,
        CN_WEIGHTBY,
        CN_SERVERS
    };

    // Dump router specific parameters
    for (auto p = service->svc_config_param; p; p = p->next)
    {
        if (common_params.count(p->name) == 0)
        {
            dprintf(file, "%s=%s\n", p->name, p->value);
        }
    }

    close(file);

    return true;
}
/**
 * Serialize a service into the persisted configuration directory
 *
 * The configuration is first written to "<persistdir>/<name>.cnf.tmp" and
 * then renamed to "<persistdir>/<name>.cnf". A leftover temporary file
 * from an earlier attempt is removed first.
 *
 * @param service Service to serialize
 *
 * @return True if the configuration was written and renamed successfully
 */
bool service_serialize(const SERVICE *service)
{
    char filename[PATH_MAX];
    int len = snprintf(filename, sizeof(filename), "%s/%s.cnf.tmp", get_config_persistdir(),
                       service->name);

    // A truncated path would not end in ".tmp" and the suffix removal
    // below would cut the name at the wrong place
    if (len < 0 || (size_t)len >= sizeof(filename))
    {
        MXS_ERROR("Path of the temporary configuration for service '%s' is too long.",
                  service->name);
        return false;
    }

    if (unlink(filename) == -1 && errno != ENOENT)
    {
        MXS_ERROR("Failed to remove temporary service configuration at '%s': %d, %s",
                  filename, errno, mxs_strerror(errno));
        return false;
    }

    if (!create_service_config(service, filename))
    {
        return false;
    }

    // The final name is the temporary name without the trailing ".tmp"
    char final_filename[PATH_MAX];
    strcpy(final_filename, filename);

    char *dot = strrchr(final_filename, '.');
    ss_dassert(dot);
    *dot = '\0';

    if (rename(filename, final_filename) != 0)
    {
        MXS_ERROR("Failed to rename temporary service configuration at '%s': %d, %s",
                  filename, errno, mxs_strerror(errno));
        return false;
    }

    return true;
}
bool service_serialize_servers(const SERVICE *service)
{
bool rval = false;
char filename[PATH_MAX];
snprintf(filename, sizeof(filename), "%s/%s.cnf.tmp", get_config_persistdir(),
service->name);
if (unlink(filename) == -1 && errno != ENOENT)
{
MXS_ERROR("Failed to remove temporary service configuration at '%s': %d, %s",
filename, errno, mxs_strerror(errno));
}
else if (create_service_config(service, filename))
{
char final_filename[PATH_MAX];
strcpy(final_filename, filename);
char *dot = strrchr(final_filename, '.');
ss_dassert(dot);
*dot = '\0';
if (rename(filename, final_filename) == 0)
{
rval = true;
}
else
{
MXS_ERROR("Failed to rename temporary service configuration at '%s': %d, %s",
filename, errno, mxs_strerror(errno));
}
}
return rval;
}
/**
 * Print the users of a service to a DCB
 *
 * For every active listener whose authenticator provides a diagnostic
 * entry point, a "User names" line is printed with the output of that
 * entry point.
 *
 * @param dcb     DCB to print to
 * @param service Service whose users are printed
 */
void service_print_users(DCB *dcb, const SERVICE *service)
{
    LISTENER_ITERATOR iter;
    SERV_LISTENER *listener = listener_iterator_init(service, &iter);

    while (listener)
    {
        const bool can_print = listener_is_active(listener) && listener->listener &&
                               listener->listener->authfunc.diagnostic;

        if (can_print)
        {
            dcb_printf(dcb, "User names (%s): ", listener->name);
            listener->listener->authfunc.diagnostic(dcb, listener);
            dcb_printf(dcb, "\n");
        }

        listener = listener_iterator_next(&iter);
    }
}
bool service_port_is_used(unsigned short port)
{
bool rval = false;
spinlock_acquire(&service_spin);
for (SERVICE *service = allServices; service && !rval; service = service->next)
{
LISTENER_ITERATOR iter;
for (SERV_LISTENER *listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (listener_is_active(listener) && listener->port == port)
{
rval = true;
break;
}
}
}
spinlock_release(&service_spin);
return rval;
}
/**
 * Convert a service state to a human-readable string
 *
 * @param state One of the SERVICE_STATE_* values
 *
 * @return Name of the state; "Unknown" for any other value, which also
 *         triggers a debug assertion
 */
static const char* service_state_to_string(int state)
{
    const char* text = "Unknown";

    switch (state)
    {
    case SERVICE_STATE_STARTED:
        text = "Started";
        break;

    case SERVICE_STATE_STOPPED:
        text = "Stopped";
        break;

    case SERVICE_STATE_FAILED:
        text = "Failed";
        break;

    case SERVICE_STATE_ALLOC:
        text = "Allocated";
        break;

    default:
        ss_dassert(false);
        break;
    }

    return text;
}
/**
 * Convert the parameters of a service into a JSON object
 *
 * The common service parameters are taken from the fields of the service.
 * The weighting parameter is included only when it is set. The parameters
 * of the router module are appended with config_add_module_params_json().
 *
 * @param service Service whose parameters are converted
 *
 * @return New JSON object that the caller owns
 */
json_t* service_parameters_to_json(const SERVICE* service)
{
    json_t* params = json_object();

    const string router_opts = config_get_string(service->svc_config_param, "router_options");
    json_object_set_new(params, CN_ROUTER_OPTIONS, json_string(router_opts.c_str()));

    json_object_set_new(params, CN_USER, json_string(service->credentials.name));
    json_object_set_new(params, CN_PASSWORD, json_string(service->credentials.authdata));
    json_object_set_new(params, CN_ENABLE_ROOT_USER, json_boolean(service->enable_root));
    json_object_set_new(params, CN_MAX_RETRY_INTERVAL, json_integer(service->max_retry_interval));
    json_object_set_new(params, CN_MAX_CONNECTIONS, json_integer(service->max_connections));
    json_object_set_new(params, CN_CONNECTION_TIMEOUT, json_integer(service->conn_idle_timeout));
    json_object_set_new(params, CN_AUTH_ALL_SERVERS, json_boolean(service->users_from_all));
    json_object_set_new(params, CN_STRIP_DB_ESC, json_boolean(service->strip_db_esc));
    json_object_set_new(params, CN_LOCALHOST_MATCH_WILDCARD_HOST,
                        json_boolean(service->localhost_match_wildcard_host));
    json_object_set_new(params, CN_VERSION_STRING, json_string(service->version_string));

    if (*service->weightby)
    {
        json_object_set_new(params, CN_WEIGHTBY, json_string(service->weightby));
    }

    json_object_set_new(params, CN_LOG_AUTH_WARNINGS, json_boolean(service->log_auth_warnings));
    json_object_set_new(params, CN_RETRY_ON_FAILURE, json_boolean(service->retry_start));

    /** Add custom module parameters */
    const MXS_MODULE* router_module = get_module(service->routerModule, MODULE_ROUTER);
    config_add_module_params_json(router_module, service->svc_config_param,
                                  config_service_params, params);

    return params;
}
/**
 * Check whether a service has at least one active server reference
 *
 * @param service Service to check
 *
 * @return True if any server reference of the service is active
 */
static inline bool have_active_servers(const SERVICE* service)
{
    SERVER_REF* ref = service->dbref;

    // Skip over the inactive references at the start of the list
    while (ref && !SERVER_REF_IS_ACTIVE(ref))
    {
        ref = ref->next;
    }

    return ref != NULL;
}
/**
 * Build a JSON array of all active listeners of a service
 *
 * @param service Service whose listeners are converted
 *
 * @return New JSON array with one element per active listener; the array
 *         is empty if the service has no active listeners
 */
static json_t* service_all_listeners_json_data(const SERVICE* service)
{
    json_t* listeners = json_array();
    LISTENER_ITERATOR iter;
    SERV_LISTENER *current = listener_iterator_init(service, &iter);

    while (current)
    {
        if (listener_is_active(current))
        {
            json_array_append_new(listeners, listener_to_json(current));
        }

        current = listener_iterator_next(&iter);
    }

    return listeners;
}
/**
 * Build the JSON representation of one listener of a service
 *
 * @param service Service that owns the listener
 * @param name    Name of the listener (compared case-sensitively)
 *
 * @return JSON representation of the first active listener with the given
 *         name, or NULL if there is no such listener
 */
static json_t* service_listener_json_data(const SERVICE* service, const char* name)
{
    LISTENER_ITERATOR iter;
    SERV_LISTENER *current = listener_iterator_init(service, &iter);

    while (current)
    {
        if (listener_is_active(current) && strcmp(current->name, name) == 0)
        {
            return listener_to_json(current);
        }

        current = listener_iterator_next(&iter);
    }

    return NULL;
}
/**
 * Build the JSON attributes object of a service
 *
 * Contains the router module name, the state, the router diagnostics (when
 * the router provides diagnostics_json and it returns a value), the start
 * time, the connection counters, the parameters and the listeners.
 *
 * @param service Service to describe
 *
 * @return New JSON object that the caller owns
 */
json_t* service_attributes(const SERVICE* service)
{
    json_t* attributes = json_object();

    json_object_set_new(attributes, CN_ROUTER, json_string(service->routerModule));
    json_object_set_new(attributes, CN_STATE, json_string(service_state_to_string(service->state)));

    const bool has_diagnostics = service->router && service->router_instance &&
                                 service->router->diagnostics_json;

    if (has_diagnostics)
    {
        json_t* diagnostics = service->router->diagnostics_json(service->router_instance);

        if (diagnostics)
        {
            json_object_set_new(attributes, CN_ROUTER_DIAGNOSTICS, diagnostics);
        }
    }

    struct tm started_tm;
    char started_str[30];
    asctime_r(localtime_r(&service->stats.started, &started_tm), started_str);
    trim(started_str);

    json_object_set_new(attributes, "started", json_string(started_str));
    json_object_set_new(attributes, "total_connections", json_integer(service->stats.n_sessions));
    json_object_set_new(attributes, "connections", json_integer(service->stats.n_current));

    /** Add service parameters and listeners */
    json_object_set_new(attributes, CN_PARAMETERS, service_parameters_to_json(service));
    json_object_set_new(attributes, CN_LISTENERS, service_all_listeners_json_data(service));

    return attributes;
}
/**
 * Build the JSON relationships object of a service
 *
 * A filters relationship is added when the service has filters, and a
 * servers relationship when it has at least one active server reference.
 *
 * @param service Service to describe
 * @param host    Hostname passed on to the relationship link builders
 *
 * @return New JSON object that the caller owns; it may be empty
 */
json_t* service_relationships(const SERVICE* service, const char* host)
{
    /** Store relationships to other objects */
    json_t* relationships = json_object();

    if (service->n_filters)
    {
        json_t* filter_rel = mxs_json_relationship(host, MXS_JSON_API_FILTERS);

        for (int idx = 0; idx < service->n_filters; idx++)
        {
            mxs_json_add_relation(filter_rel, service->filters[idx]->name, CN_FILTERS);
        }

        json_object_set_new(relationships, CN_FILTERS, filter_rel);
    }

    if (have_active_servers(service))
    {
        json_t* server_rel = mxs_json_relationship(host, MXS_JSON_API_SERVERS);
        SERVER_REF* ref = service->dbref;

        while (ref)
        {
            if (SERVER_REF_IS_ACTIVE(ref))
            {
                mxs_json_add_relation(server_rel, ref->server->name, CN_SERVERS);
            }

            ref = ref->next;
        }

        json_object_set_new(relationships, CN_SERVERS, server_rel);
    }

    return relationships;
}
/**
 * Build the JSON resource object of a service
 *
 * The object holds the id, type, attributes, relationships and self link
 * of the service. The service's own lock is held while it is built.
 *
 * @param service Service to convert
 * @param host    Hostname used in the generated links
 *
 * @return New JSON object that the caller owns
 */
json_t* service_json_data(const SERVICE* service, const char* host)
{
    json_t* resource = json_object();

    spinlock_acquire(&service->spin);

    json_t* id = json_string(service->name);
    json_object_set_new(resource, CN_ID, id);

    json_t* type = json_string(CN_SERVICES);
    json_object_set_new(resource, CN_TYPE, type);

    json_t* attributes = service_attributes(service);
    json_object_set_new(resource, CN_ATTRIBUTES, attributes);

    json_t* relationships = service_relationships(service, host);
    json_object_set_new(resource, CN_RELATIONSHIPS, relationships);

    json_t* links = mxs_json_self_link(host, CN_SERVICES, service->name);
    json_object_set_new(resource, CN_LINKS, links);

    spinlock_release(&service->spin);

    return resource;
}
/**
 * Convert a service into a complete JSON resource document
 *
 * @param service Service to convert
 * @param host    Hostname used in the generated links
 *
 * @return JSON document whose self link is the services collection path
 *         followed by the name of the service
 */
json_t* service_to_json(const SERVICE* service, const char* host)
{
    const string self_path = string(MXS_JSON_API_SERVICES) + service->name;
    json_t* data = service_json_data(service, host);

    return mxs_json_resource(host, self_path.c_str(), data);
}
/**
 * Convert the active listeners of a service into a JSON resource document
 *
 * The self link is built here because the listeners are treated as
 * sub-resources of the service.
 *
 * @param service Service whose listeners are converted
 * @param host    Hostname used in the generated links
 *
 * @return JSON document with an array of listeners as its data
 */
json_t* service_listener_list_to_json(const SERVICE* service, const char* host)
{
    const string self_path = string(MXS_JSON_API_SERVICES) + service->name + "/listeners";
    json_t* data = service_all_listeners_json_data(service);

    return mxs_json_resource(host, self_path.c_str(), data);
}
/**
 * Convert one listener of a service into a JSON resource document
 *
 * The self link is built here because the listeners are treated as
 * sub-resources of the service.
 *
 * @param service Service that owns the listener
 * @param name    Name of the listener
 * @param host    Hostname used in the generated links
 *
 * @return JSON document; its data is whatever
 *         service_listener_json_data() returns for the name, which is NULL
 *         when no active listener has that name
 */
json_t* service_listener_to_json(const SERVICE* service, const char* name, const char* host)
{
    const string self_path = string(MXS_JSON_API_SERVICES) + service->name + "/listeners/" + name;
    json_t* data = service_listener_json_data(service, name);

    return mxs_json_resource(host, self_path.c_str(), data);
}
/**
 * Convert all services into a JSON resource document
 *
 * The service_spin lock is held while the services are converted.
 *
 * @param host Hostname used in the generated links
 *
 * @return JSON document with an array of services as its data
 */
json_t* service_list_to_json(const char* host)
{
    json_t* services = json_array();

    spinlock_acquire(&service_spin);

    SERVICE *current = allServices;

    while (current)
    {
        json_t* data = service_json_data(current, host);

        if (data)
        {
            json_array_append_new(services, data);
        }

        current = current->next;
    }

    spinlock_release(&service_spin);

    return mxs_json_resource(host, MXS_JSON_API_SERVICES, services);
}
/**
 * Build the JSON relationship from a filter to the services that use it
 *
 * A relation is added for every occurrence of the filter in the filter
 * list of a service. Each service's own lock is held while its filters are
 * examined.
 *
 * @param filter Filter to look for
 * @param host   Hostname used in the generated links
 *
 * @return Relationship object; it has no relations if no service uses the
 *         filter
 */
json_t* service_relations_to_filter(const MXS_FILTER_DEF* filter, const char* host)
{
    json_t* relationship = mxs_json_relationship(host, MXS_JSON_API_SERVICES);

    spinlock_acquire(&service_spin);

    SERVICE *svc = allServices;

    while (svc)
    {
        spinlock_acquire(&svc->spin);

        for (int idx = 0; idx < svc->n_filters; idx++)
        {
            if (svc->filters[idx] != filter)
            {
                continue;
            }

            mxs_json_add_relation(relationship, svc->name, CN_SERVICES);
        }

        spinlock_release(&svc->spin);
        svc = svc->next;
    }

    spinlock_release(&service_spin);

    return relationship;
}
/**
 * Build the JSON relationship from a server to the services that use it
 *
 * The names of the services with an active reference to the server are
 * collected under the locks first; the JSON is built after the locks have
 * been released.
 *
 * @param server Server to look for
 * @param host   Hostname used in the generated links
 *
 * @return Relationship object or NULL if no service uses the server
 */
json_t* service_relations_to_server(const SERVER* server, const char* host)
{
    std::vector<std::string> service_names;

    spinlock_acquire(&service_spin);

    SERVICE *svc = allServices;

    while (svc)
    {
        spinlock_acquire(&svc->spin);

        for (SERVER_REF *ref = svc->dbref; ref; ref = ref->next)
        {
            if (ref->server == server && SERVER_REF_IS_ACTIVE(ref))
            {
                service_names.push_back(svc->name);
            }
        }

        spinlock_release(&svc->spin);
        svc = svc->next;
    }

    spinlock_release(&service_spin);

    if (service_names.empty())
    {
        return NULL;
    }

    json_t* relationship = mxs_json_relationship(host, MXS_JSON_API_SERVICES);

    for (const std::string& service_name : service_names)
    {
        mxs_json_add_relation(relationship, service_name.c_str(), CN_SERVICES);
    }

    return relationship;
}
uint64_t service_get_version(const SERVICE *service, service_version_which_t which)
{
uint64_t version = 0;
if (which == SERVICE_VERSION_ANY)
{
SERVER_REF* sref = service->dbref;
while (sref && !sref->active)
{
sref = sref->next;
}
if (sref)
{
version = server_get_version(sref->server);
}
}
else
{
size_t n = 0;
uint64_t v;
if (which == SERVICE_VERSION_MIN)
{
v = UINT64_MAX;
}
else
{
ss_dassert(which == SERVICE_VERSION_MAX);
v = 0;
}
SERVER_REF* sref = service->dbref;
while (sref)
{
if (sref->active)
{
++n;
SERVER* s = sref->server;
uint64_t server_version = server_get_version(s);
if (which == SERVICE_VERSION_MIN)
{
if (server_version < v)
{
v = server_version;
}
}
else
{
ss_dassert(which == SERVICE_VERSION_MAX);
if (server_version > v)
{
v = server_version;
}
}
}
sref = sref->next;
}
if (n == 0)
{
v = 0;
}
version = v;
}
return version;
}
bool service_thread_init()
{
spinlock_acquire(&service_spin);
for (SERVICE* service = allServices; service; service = service->next)
{
if (service->capabilities & ACAP_TYPE_ASYNC)
{
service_refresh_users(service);
}
}
spinlock_release(&service_spin);
return true;
}