Merge branch 'develop' into MAX-167

Conflicts:
	server/modules/monitor/mysql_mon.c
	server/modules/routing/readwritesplit/readwritesplit.c
This commit is contained in:
VilhoRaatikka
2014-06-30 14:17:00 +03:00
36 changed files with 1601 additions and 340 deletions

View File

@ -230,6 +230,8 @@ int error_count = 0;
config_get_value(obj->parameters, "passwd");
char *enable_root_user =
config_get_value(obj->parameters, "enable_root_user");
char *weightby =
config_get_value(obj->parameters, "weightby");
char *version_string = config_get_value(obj->parameters, "version_string");
@ -259,6 +261,8 @@ int error_count = 0;
if (enable_root_user)
serviceEnableRootUser(obj->element, config_truth_value(enable_root_user));
if (weightby)
serviceWeightBy(obj->element, weightby);
if (!auth)
auth = config_get_value(obj->parameters, "auth");
@ -362,6 +366,30 @@ int error_count = 0;
"defined but no corresponding password.",
obj->object)));
}
if (obj->element)
{
CONFIG_PARAMETER *params = obj->parameters;
while (params)
{
if (strcmp(params->name, "address")
&& strcmp(params->name, "port")
&& strcmp(params->name,
"protocol")
&& strcmp(params->name,
"monitoruser")
&& strcmp(params->name,
"monitorpw")
&& strcmp(params->name,
"type")
)
{
serverAddParameter(obj->element,
params->name,
params->value);
}
params = params->next;
}
}
}
else if (!strcmp(type, "filter"))
{
@ -1238,8 +1266,6 @@ int i;
{
if (!strcmp(type, "service"))
param_set = service_params;
else if (!strcmp(type, "server"))
param_set = server_params;
else if (!strcmp(type, "listener"))
param_set = listener_params;
else if (!strcmp(type, "monitor"))

View File

@ -1270,6 +1270,41 @@ DCB *dcb;
spinlock_release(&dcbspin);
}
/**
* Diagnotic routine to print client DCB data in a tabular form.
*
* @param pdcb DCB to print results to
*/
void
dListClients(DCB *pdcb)
{
DCB *dcb;
spinlock_acquire(&dcbspin);
dcb = allDCBs;
dcb_printf(pdcb, "Client Connections\n");
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n");
dcb_printf(pdcb, " %-15s | %-10s | %-20s | %s\n",
"Client", "DCB", "Service", "Session");
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n");
while (dcb)
{
if (dcb_isclient(dcb)
&& dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{
dcb_printf(pdcb, " %-15s | %10p | %-20s | %10p\n",
(dcb->remote ? dcb->remote : ""),
dcb, (dcb->session->service ?
dcb->session->service->name : ""),
dcb->session);
}
dcb = dcb->next;
}
dcb_printf(pdcb, "-----------------+------------+----------------------+------------\n\n");
spinlock_release(&dcbspin);
}
/**
* Diagnostic to print a DCB to another DCB
*
@ -1281,8 +1316,14 @@ dprintDCB(DCB *pdcb, DCB *dcb)
{
dcb_printf(pdcb, "DCB: %p\n", (void *)dcb);
dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->session && dcb->session->service)
dcb_printf(pdcb, "\tService: %s\n",
dcb->session->service->name);
if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote);
if (dcb->user)
dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user);
dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session);
if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));

View File

@ -31,10 +31,11 @@
* 19/06/13 Mark Riddoch Extract the epoll functionality
* 21/06/13 Mark Riddoch Added initial config support
* 27/06/13
* 28/06/13 Vilho Raatikka Added necessary headers, example functions and
* calls to log manager and to query classifier.
* Put example code behind SS_DEBUG macros.
* 28/06/13 Vilho Raatikka Added necessary headers, example functions and
* calls to log manager and to query classifier.
* Put example code behind SS_DEBUG macros.
* 05/02/14 Mark Riddoch Addition of version string
* 29/06/14 Massimiliano Pinto Addition of pidfile
*
* @endverbatim
*/
@ -109,6 +110,9 @@ static char* server_groups[] = {
/* The data directory we created for this gateway instance */
static char datadir[PATH_MAX+1] = "";
/* The data directory we created for this gateway instance */
static char pidfile[PATH_MAX+1] = "";
/**
* exit flag for log flusher.
*/
@ -126,6 +130,8 @@ static bool daemon_mode = true;
static void log_flush_shutdown(void);
static void log_flush_cb(void* arg);
static int write_pid_file(char *); /* write MaxScale pidfile */
static void unlink_pidfile(void); /* remove pidfile */
static void libmysqld_done(void);
static bool file_write_header(FILE* outfile);
static bool file_write_footer(FILE* outfile);
@ -1018,6 +1024,7 @@ int main(int argc, char **argv)
goto return_main;
}
}
if (!daemon_mode)
{
fprintf(stderr,
@ -1137,6 +1144,8 @@ int main(int argc, char **argv)
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
/* register exit function for embedded MySQL library */
l = atexit(libmysqld_done);
if (l != 0) {
@ -1148,6 +1157,7 @@ int main(int argc, char **argv)
rc = MAXSCALE_INTERNALERROR;
goto return_main;
}
/*<
* If MaxScale home directory wasn't set by command-line argument.
* Next, resolve it from environment variable and further on,
@ -1199,7 +1209,7 @@ int main(int argc, char **argv)
rc = MAXSCALE_BADCONFIG;
goto return_main;
}
/*<
* Set a data directory for the mysqld library, we use
* a unique directory name to avoid clauses if multiple
@ -1322,7 +1332,11 @@ int main(int argc, char **argv)
LOGFILE_MESSAGE,
"MaxScale is running in process %i",
getpid())));
/* Write process pid into MaxScale pidfile */
write_pid_file(home_dir);
/* Init MaxScale poll system */
poll_init();
/*<
@ -1363,6 +1377,7 @@ int main(int argc, char **argv)
* Serve clients.
*/
poll_waitevents((void *)0);
/*<
* Wait server threads' completion.
*/
@ -1389,6 +1404,9 @@ int main(int argc, char **argv)
LOGFILE_MESSAGE,
"MaxScale shutdown completed.")));
/* Remove Pidfile */
unlink_pidfile();
return_main:
return rc;
} /*< End of main */
@ -1435,3 +1453,59 @@ static void log_flush_cb(
LOGIF(LM, (skygw_log_write(LOGFILE_MESSAGE,
"Finished MaxScale log flusher.")));
}
/**
* Unlink pid file, called at program exit
*/
static void unlink_pidfile(void)
{
if (strlen(pidfile)) {
if (unlink(pidfile)) {
fprintf(stderr, "MaxScale failed to remove pidfile %s: error %d, %s\n", pidfile, errno, strerror(errno));
}
}
}
/**
* Write process pid into pidfile anc close it
* Parameters:
* @param home_dir The MaxScale home dir
* @return 0 on success, 1 on failure
*
*/
static int write_pid_file(char *home_dir) {
int fd = -1;
snprintf(pidfile, PATH_MAX, "%s/log/maxscale.pid", home_dir);
fd = open(pidfile, O_WRONLY | O_CREAT | O_TRUNC, 0777);
if (fd == -1) {
fprintf(stderr, "MaxScale failed to open pidFile %s: error %d, %s\n", pidfile, errno, strerror(errno));
return 1;
} else {
char pidstr[50]="";
/* truncate pidfile content */
if (ftruncate(fd, 0) == -1) {
fprintf(stderr, "MaxScale failed to truncate pidfile %s: error %d, %s\n", pidfile, errno, strerror(errno));
}
snprintf(pidstr, sizeof(pidstr)-1, "%d", getpid());
if (pwrite(fd, pidstr, strlen(pidstr), 0) != (ssize_t)strlen(pidstr)) {
fprintf(stderr, "MaxScale failed to write into pidfile %s: error %d, %s\n", pidfile, errno, strerror(errno));
/* close file and return */
close(fd);
return 1;
}
/* close file */
close(fd);
fprintf(stderr, "MaxScale PID %s in pidfile %s\n", pidstr, pidfile);
}
/* success */
return 0;
}

View File

@ -379,7 +379,9 @@ MODULES *ptr = registered;
: (ptr->info->status == MODULE_BETA_RELEASE
? "Beta"
: (ptr->info->status == MODULE_GA
? "GA" : "Unknown"))));
? "GA"
: (ptr->info->status == MODULE_EXPERIMENTAL
? "Experimental" : "Unknown")))));
dcb_printf(dcb, "\n");
ptr = ptr->next;
}

View File

@ -73,6 +73,7 @@ MONITOR *mon;
return NULL;
}
mon->handle = (*mon->module->startMonitor)(NULL);
mon->state |= MONITOR_STATE_RUNNING;
spinlock_acquire(&monLock);
mon->next = allMonitors;
allMonitors = mon;
@ -93,6 +94,7 @@ monitor_free(MONITOR *mon)
MONITOR *ptr;
mon->module->stopMonitor(mon->handle);
mon->state &= ~MONITOR_STATE_RUNNING;
spinlock_acquire(&monLock);
if (allMonitors == mon)
allMonitors = mon->next;
@ -119,6 +121,7 @@ void
monitorStart(MONITOR *monitor)
{
monitor->handle = (*monitor->module->startMonitor)(monitor->handle);
monitor->state |= MONITOR_STATE_RUNNING;
}
/**
@ -130,6 +133,7 @@ void
monitorStop(MONITOR *monitor)
{
monitor->module->stopMonitor(monitor->handle);
monitor->state &= ~MONITOR_STATE_RUNNING;
}
/**
@ -200,6 +204,47 @@ MONITOR *ptr;
spinlock_release(&monLock);
}
/**
* Show a single monitor
*
* @param dcb DCB for printing output
*/
void
monitorShow(DCB *dcb, MONITOR *monitor)
{
dcb_printf(dcb, "Monitor: %p\n", monitor);
dcb_printf(dcb, "\tName: %s\n", monitor->name);
if (monitor->module->diagnostics)
monitor->module->diagnostics(dcb, monitor->handle);
}
/**
* List all the monitors
*
* @param dcb DCB for printing output
*/
void
monitorList(DCB *dcb)
{
MONITOR *ptr;
spinlock_acquire(&monLock);
ptr = allMonitors;
dcb_printf(dcb, "+----------------------+---------------------\n");
dcb_printf(dcb, "| %-20s | Status\n", "Monitor");
dcb_printf(dcb, "+----------------------+---------------------\n");
while (ptr)
{
dcb_printf(dcb, "| %-20s | %s\n", ptr->name,
ptr->state & MONITOR_STATE_RUNNING
? "Running" : "Stopped");
ptr = ptr->next;
}
dcb_printf(dcb, "+----------------------+---------------------\n");
spinlock_release(&monLock);
}
/**
* Find a monitor by name
*
@ -249,6 +294,7 @@ void
monitorSetInterval (MONITOR *mon, unsigned long interval)
{
if (mon->module->setInterval != NULL) {
mon->interval = interval;
mon->module->setInterval(mon->handle, interval);
}
}

View File

@ -28,6 +28,8 @@
* 20/05/14 Massimiliano Pinto Addition of server_string
* 21/05/14 Massimiliano Pinto Addition of node_id
* 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields
* 20/06/14 Massimiliano Pinto Addition of master_id, depth, slaves fields
* 26/06/14 Mark Riddoch Addition of server parameters
*
* @endverbatim
*/
@ -76,6 +78,10 @@ SERVER *server;
server->node_id = -1;
server->rlag = -1;
server->node_ts = 0;
server->parameters = NULL;
server->master_id = -1;
server->depth = -1;
server->slaves = NULL;
spinlock_acquire(&server_spin);
server->next = allServers;
@ -250,7 +256,21 @@ char *stat;
if (ptr->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id);
if (SERVER_IS_SLAVE(ptr)) {
dcb_printf(dcb, "\tMaster Id: %d\n", ptr->master_id);
if (ptr->slaves) {
int i;
dcb_printf(dcb, "\tSlave Ids: ");
for (i = 0; ptr->slaves[i]; i++)
{
if (i == 0)
dcb_printf(dcb, "%li", ptr->slaves[i]);
else
dcb_printf(dcb, ", %li ", ptr->slaves[i]);
}
dcb_printf(dcb, "\n");
}
dcb_printf(dcb, "\tRepl Depth: %d\n", ptr->depth);
if (SERVER_IS_SLAVE(ptr) || SERVER_IS_RELAY_SERVER(ptr)) {
if (ptr->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag);
}
@ -275,7 +295,8 @@ char *stat;
void
dprintServer(DCB *dcb, SERVER *server)
{
char *stat;
char *stat;
SERVER_PARAM *param;
dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name);
dcb_printf(dcb, "\tServer: %s\n", server->name);
@ -287,13 +308,38 @@ char *stat;
if (server->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", server->node_id);
if (SERVER_IS_SLAVE(server)) {
dcb_printf(dcb, "\tMaster Id: %d\n", server->master_id);
if (server->slaves) {
int i;
dcb_printf(dcb, "\tSlave Ids: ");
for (i = 0; server->slaves[i]; i++)
{
if (i == 0)
dcb_printf(dcb, "%li", server->slaves[i]);
else
dcb_printf(dcb, ", %li ", server->slaves[i]);
}
dcb_printf(dcb, "\n");
}
dcb_printf(dcb, "\tRepl Depth: %d\n", server->depth);
if (SERVER_IS_SLAVE(server) || SERVER_IS_RELAY_SERVER(server)) {
if (server->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag);
}
}
if (server->node_ts > 0) {
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts);
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%s",
asctime(localtime(&server->node_ts)));
}
if ((param = server->parameters) != NULL)
{
dcb_printf(dcb, "\tServer Parameters:\n");
while (param)
{
dcb_printf(dcb, "\t\t%-20s %s\n", param->name,
param->value);
param = param->next;
}
}
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
@ -446,3 +492,59 @@ server_update(SERVER *server, char *protocol, char *user, char *passwd)
}
}
/**
* Add a server parameter to a server.
*
* Server parameters may be used by routing to weight the load
* balancing they apply to the server.
*
* @param server The server we are adding the parameter to
* @param name The parameter name
* @param value The parameter value
*/
void
serverAddParameter(SERVER *server, char *name, char *value)
{
SERVER_PARAM *param;
if ((param = (SERVER_PARAM *)malloc(sizeof(SERVER_PARAM))) == NULL)
{
return;
}
if ((param->name = strdup(name)) == NULL)
{
free(param);
return;
}
if ((param->value = strdup(value)) == NULL)
{
free(param->value);
free(param);
return;
}
param->next = server->parameters;
server->parameters = param;
}
/**
* Retreive a parameter value from a server
*
* @param server The server we are looking for a parameter of
* @param name The name of the parameter we require
* @return The parameter value or NULL if not found
*/
char *
serverGetParameter(SERVER *server, char *name)
{
SERVER_PARAM *param = server->parameters;
while (param)
{
if (strcmp(param->name, name) == 0)
return param->value;
param = param->next;
}
return NULL;
}

View File

@ -114,6 +114,7 @@ SERVICE *service;
service->svc_config_version = 0;
service->filters = NULL;
service->n_filters = 0;
service->weightby = 0;
spinlock_init(&service->spin);
spinlock_init(&service->users_table_spin);
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
@ -716,8 +717,8 @@ SERVER *ptr = service->databases;
int i;
printf("Service %p\n", service);
printf("\tService: %s\n", service->name);
printf("\tRouter: %s (%p)\n", service->routerModule, service->router);
printf("\tService: %s\n", service->name);
printf("\tRouter: %s (%p)\n", service->routerModule, service->router);
printf("\tStarted: %s", asctime(localtime(&service->stats.started)));
printf("\tBackend databases\n");
while (ptr)
@ -794,13 +795,16 @@ SERVER *server = service->databases;
int i;
dcb_printf(dcb, "Service %p\n", service);
dcb_printf(dcb, "\tService: %s\n", service->name);
dcb_printf(dcb, "\tRouter: %s (%p)\n", service->routerModule,
service->router);
dcb_printf(dcb, "\tService: %s\n",
service->name);
dcb_printf(dcb, "\tRouter: %s (%p)\n",
service->routerModule, service->router);
if (service->router)
service->router->diagnostics(service->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&service->stats.started)));
dcb_printf(dcb, "\tRoot user access: %s\n",
service->enable_root ? "Enabled" : "Disabled");
if (service->n_filters)
{
dcb_printf(dcb, "\tFilter chain: ");
@ -818,9 +822,15 @@ int i;
server->protocol);
server = server->nextdb;
}
dcb_printf(dcb, "\tUsers data: %p\n", service->users);
dcb_printf(dcb, "\tTotal connections: %d\n", service->stats.n_sessions);
dcb_printf(dcb, "\tCurrently connected: %d\n", service->stats.n_current);
if (service->weightby)
dcb_printf(dcb, "\tRouting weight parameter: %s\n",
service->weightby);
dcb_printf(dcb, "\tUsers data: %p\n",
service->users);
dcb_printf(dcb, "\tTotal connections: %d\n",
service->stats.n_sessions);
dcb_printf(dcb, "\tCurrently connected: %d\n",
service->stats.n_current);
}
/**
@ -1097,8 +1107,38 @@ static void service_add_qualified_param(
spinlock_release(&svc->spin);
}
char* service_get_name(
SERVICE* svc)
/**
* Return the name of the service
*
* @param svc The service
*/
char *
service_get_name(SERVICE *svc)
{
return svc->name;
}
/**
* Set the weighting parameter for the service
*
* @param service The service pointer
* @param weightby The parameter name to weight the routing by
*/
void
serviceWeightBy(SERVICE *service, char *weightby)
{
if (service->weightby)
free(service->weightby);
service->weightby = strdup(weightby);
}
/**
* Return the parameter the wervice shoudl use to weight connections
* by
* @param service The Service pointer
*/
char *
serviceGetWeightingParameter(SERVICE *service)
{
return service->weightby;
}