Merge branch 'develop' of https://github.com/skysql/MaxScale into develop

This commit is contained in:
VilhoRaatikka
2014-05-30 10:11:00 +03:00
31 changed files with 3096 additions and 692 deletions

Binary file not shown.

View File

@ -284,7 +284,11 @@ unsigned int
gwbuf_length(GWBUF *head)
{
int rval = 0;
CHK_GWBUF(head);
if (head)
{
CHK_GWBUF(head);
}
while (head)
{
rval += GWBUF_LENGTH(head);

View File

@ -29,7 +29,10 @@
* 06/02/14 Massimiliano Pinto Added support for enable/disable root user in services
* 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list
* 11/03/14 Massimiliano Pinto Added Unix socket support
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 19/05/14 Mark Riddoch Added unique names from section headers
* 23/05/14 Massimiliano Pinto Added automatic set of maxscale-id: first listening ipv4_raw + port + pid
* 28/05/14 Massimiliano Pinto Added detect_replication_lag parameter
*
* @endverbatim
*/
@ -58,7 +61,7 @@ static void check_config_objects(CONFIG_CONTEXT *context);
static char *config_file = NULL;
static GATEWAY_CONF gateway;
char *version_string = NULL;
char *version_string = NULL;
/**
* Config item handler for the ini file reader
@ -129,7 +132,6 @@ int rval;
if (ptr) {
*ptr = '\0';
}
}
mysql_close(conn);
}
@ -165,7 +167,6 @@ int rval;
if (!config_file)
return 0;
if (gateway.version_string)
free(gateway.version_string);
@ -218,6 +219,8 @@ int error_count = 0;
"router");
if (router)
{
char* max_slave_conn_str;
obj->element = service_alloc(obj->object, router);
char *user =
config_get_value(obj->parameters, "user");
@ -228,13 +231,6 @@ int error_count = 0;
char *version_string = config_get_value(obj->parameters, "version_string");
if (version_string) {
((SERVICE *)(obj->element))->version_string = strdup(version_string);
} else {
if (gateway.version_string)
((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string);
}
if (obj->element == NULL) /*< if module load failed */
{
LOGIF(LE, (skygw_log_write_flush(
@ -247,6 +243,17 @@ int error_count = 0;
obj = obj->next;
continue; /*< process next obj */
}
if (version_string) {
((SERVICE *)(obj->element))->version_string = strdup(version_string);
} else {
if (gateway.version_string)
((SERVICE *)(obj->element))->version_string = strdup(gateway.version_string);
}
max_slave_conn_str =
config_get_value(obj->parameters,
"max_slave_connections");
if (enable_root_user)
serviceEnableRootUser(obj->element, atoi(enable_root_user));
@ -267,6 +274,35 @@ int error_count = 0;
"corresponding password.",
obj->object)));
}
if (max_slave_conn_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(obj->parameters,
"max_slave_connections");
succp = service_set_slave_conn_limit(
obj->element,
param,
max_slave_conn_str,
COUNT_ATMOST);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is either <int> for slave connection "
"count or\n\t<int>%% for specifying the "
"maximum percentage of available the "
"slaves that will be connected.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
}
else
{
@ -299,6 +335,7 @@ int error_count = 0;
obj->element = server_alloc(address,
protocol,
atoi(port));
server_set_unique_name(obj->element, obj->object);
}
else
{
@ -391,12 +428,19 @@ int error_count = 0;
char *port;
char *protocol;
char *socket;
struct sockaddr_in serv_addr;
service = config_get_value(obj->parameters, "service");
port = config_get_value(obj->parameters, "port");
address = config_get_value(obj->parameters, "address");
protocol = config_get_value(obj->parameters, "protocol");
socket = config_get_value(obj->parameters, "socket");
/* if id is not set, do it now */
if (gateway.id == 0) {
setipaddress(&serv_addr.sin_addr, (address == NULL) ? "0.0.0.0" : address);
gateway.id = (unsigned long) (serv_addr.sin_addr.s_addr + port + getpid());
}
if (service && socket && protocol) {
CONFIG_CONTEXT *ptr = context;
@ -459,18 +503,46 @@ int error_count = 0;
char *servers;
char *user;
char *passwd;
unsigned long interval = 0;
int replication_heartbeat = 0;
module = config_get_value(obj->parameters, "module");
servers = config_get_value(obj->parameters, "servers");
user = config_get_value(obj->parameters, "user");
passwd = config_get_value(obj->parameters, "passwd");
if (config_get_value(obj->parameters, "monitor_interval")) {
interval = strtoul(config_get_value(obj->parameters, "monitor_interval"), NULL, 10);
}
if (config_get_value(obj->parameters, "detect_replication_lag")) {
replication_heartbeat = atoi(config_get_value(obj->parameters, "detect_replication_lag"));
}
if (module)
{
obj->element = monitor_alloc(obj->object, module);
if (servers && obj->element)
{
char *s = strtok(servers, ",");
char *s;
/* if id is not set, compute it now with pid only */
if (gateway.id == 0) {
gateway.id = getpid();
}
/* add the maxscale-id to monitor data */
monitorSetId(obj->element, gateway.id);
/* set monitor interval */
if (interval > 0)
monitorSetInterval(obj->element, interval);
/* set replication heartbeat */
if(replication_heartbeat == 1)
monitorSetReplicationHeartbeat(obj->element, replication_heartbeat);
/* get the servers to monitor */
s = strtok(servers, ",");
while (s)
{
CONFIG_CONTEXT *obj1 = context;
@ -560,6 +632,89 @@ config_get_value(CONFIG_PARAMETER *params, const char *name)
return NULL;
}
CONFIG_PARAMETER* config_get_param(
CONFIG_PARAMETER* params,
const char* name)
{
while (params)
{
if (!strcmp(params->name, name))
return params;
params = params->next;
}
return NULL;
}
config_param_type_t config_get_paramtype(
CONFIG_PARAMETER* param)
{
return param->qfd_param_type;
}
int config_get_valint(
CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */
config_param_type_t ptype)
{
int val = -1; /*< -1 indicates failure */
while (param)
{
if (name == NULL || !strncmp(param->name, name, MAX_PARAM_LEN))
{
switch (ptype) {
case COUNT_TYPE:
val = param->qfd.valcount;
goto return_val;
case PERCENT_TYPE:
val = param->qfd.valpercent;
goto return_val;
case BOOL_TYPE:
val = param->qfd.valbool;
goto return_val;
default:
goto return_val;
}
}
else if (name == NULL)
{
goto return_val;
}
param = param->next;
}
return_val:
return val;
}
CONFIG_PARAMETER* config_clone_param(
CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER* p2;
p2 = (CONFIG_PARAMETER*) malloc(sizeof(CONFIG_PARAMETER));
if (p2 == NULL)
{
goto return_p2;
}
memcpy(p2, param, sizeof(CONFIG_PARAMETER));
p2->name = strndup(param->name, MAX_PARAM_LEN);
p2->value = strndup(param->value, MAX_PARAM_LEN);
if (param->qfd_param_type == STRING_TYPE)
{
p2->qfd.valstr = strndup(param->qfd.valstr, MAX_PARAM_LEN);
}
return_p2:
return p2;
}
/**
* Free a config tree
*
@ -629,6 +784,7 @@ global_defaults()
gateway.version_string = strdup(version_string);
else
gateway.version_string = NULL;
gateway.id=0;
}
/**
@ -671,6 +827,7 @@ SERVER *server;
char *user;
char *auth;
char *enable_root_user;
char* max_slave_conn_str;
char *version_string;
enable_root_user = config_get_value(obj->parameters, "enable_root_user");
@ -683,8 +840,9 @@ SERVER *server;
version_string = config_get_value(obj->parameters, "version_string");
if (version_string) {
if (service->version_string)
if (service->version_string) {
free(service->version_string);
}
service->version_string = strdup(version_string);
}
@ -694,6 +852,42 @@ SERVER *server;
auth);
if (enable_root_user)
serviceEnableRootUser(service, atoi(enable_root_user));
max_slave_conn_str =
config_get_value(
obj->parameters,
"max_slave_connections");
if (max_slave_conn_str != NULL)
{
CONFIG_PARAMETER* param;
bool succp;
param = config_get_param(obj->parameters,
"max_slave_connections");
succp = service_set_slave_conn_limit(
service,
param,
max_slave_conn_str,
COUNT_ATMOST);
if (!succp)
{
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : invalid value type "
"for parameter \'%s.%s = %s\'\n\tExpected "
"type is either <int> for slave connection "
"count or\n\t<int>%% for specifying the "
"maximum percentage of available the "
"slaves that will be connected.",
((SERVICE*)obj->element)->name,
param->name,
param->value)));
}
}
}
obj->element = service;
@ -704,7 +898,9 @@ SERVER *server;
char *auth;
char *enable_root_user;
enable_root_user = config_get_value(obj->parameters, "enable_root_user");
enable_root_user =
config_get_value(obj->parameters,
"enable_root_user");
user = config_get_value(obj->parameters,
"user");
@ -920,6 +1116,7 @@ static char *service_params[] =
"user",
"passwd",
"enable_root_user",
"max_slave_connections",
"version_string",
NULL
};
@ -953,6 +1150,8 @@ static char *monitor_params[] =
"servers",
"user",
"passwd",
"monitor_interval",
"detect_replication_lag",
NULL
};
/**
@ -1010,3 +1209,47 @@ int i;
obj = obj->next;
}
}
/**
* Set qualified parameter value to CONFIG_PARAMETER struct.
*/
bool config_set_qualified_param(
CONFIG_PARAMETER* param,
void* val,
config_param_type_t type)
{
bool succp;
switch (type) {
case STRING_TYPE:
param->qfd.valstr = strndup((const char *)val, MAX_PARAM_LEN);
succp = true;
break;
case COUNT_TYPE:
param->qfd.valcount = *(int *)val;
succp = true;
break;
case PERCENT_TYPE:
param->qfd.valpercent = *(int *)val;
succp = true;
break;
case BOOL_TYPE:
param->qfd.valbool = *(bool *)val;
succp = true;
break;
default:
succp = false;
break;
}
if (succp)
{
param->qfd_param_type = type;
}
return succp;
}

View File

@ -184,6 +184,8 @@ getUsers(SERVICE *service, struct users *users)
}
serviceGetUser(service, &service_user, &service_passwd);
if (service_user == NULL || service_passwd == NULL)
return -1;
/** multi-thread environment requires that thread init succeeds. */
if (mysql_thread_init()) {
@ -218,15 +220,26 @@ getUsers(SERVICE *service, struct users *users)
*/
server = service->databases;
dpwd = decryptPassword(service_passwd);
while (server != NULL && mysql_real_connect(con,
while (server != NULL && (mysql_real_connect(con,
server->name,
service_user,
dpwd,
NULL,
server->port,
NULL,
0) == NULL)
0) == NULL))
{
if (server == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to connect to %s:%d, \"%s\"",
server->name,
server->port,
mysql_error(con))));
mysql_close(con);
return -1;
}
server = server->nextdb;
}
free(dpwd);

View File

@ -47,6 +47,7 @@
* error and 0 bytes to read.
* This fixes a bug with many reads from
* backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism
*
* @endverbatim
*/
@ -81,6 +82,7 @@ static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
DCB* dcb_get_zombies(void)
{
@ -95,8 +97,8 @@ DCB* dcb_get_zombies(void)
*
* @return A newly allocated DCB or NULL if non could be allocated.
*/
DCB * dcb_alloc(
dcb_role_t role)
DCB *
dcb_alloc(dcb_role_t role)
{
DCB *rval;
@ -118,13 +120,17 @@ DCB *rval;
spinlock_init(&rval->dcb_initlock);
spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock);
spinlock_init(&rval->dcb_readqlock);
spinlock_init(&rval->authlock);
spinlock_init(&rval->cb_lock);
rval->fd = -1;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
rval->writeqlen = 0;
rval->high_water = 0;
rval->low_water = 0;
rval->next = NULL;
rval->callbacks = NULL;
spinlock_acquire(&dcbspin);
if (allDCBs == NULL)
@ -250,6 +256,8 @@ dcb_add_to_zombieslist(DCB *dcb)
static void
dcb_final_free(DCB *dcb)
{
DCB_CALLBACK *cb;
CHK_DCB(dcb);
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED,
"dcb not in DCB_STATE_DISCONNECTED state.");
@ -309,6 +317,19 @@ dcb_final_free(DCB *dcb)
GWBUF *queue = dcb->delayq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
spinlock_acquire(&dcb->cb_lock);
while ((cb = dcb->callbacks) != NULL)
{
dcb->callbacks = cb->next;
free(cb);
}
spinlock_release(&dcb->cb_lock);
if (dcb->dcb_readqueue)
{
@ -690,16 +711,27 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w;
int saved_errno = 0;
int w, qlen;
int saved_errno = 0;
int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
ss_dassert(queue != NULL);
/**
* SESSION_STATE_STOPPING means that one of the backends is closing
* the router session. Some backends may have not completed
* authentication yet and thus they have no information about router
* being closed. Session state is changed to SESSION_STATE_STOPPING
* before router's closeSession is called and that tells that DCB may
* still be writable.
*/
if (queue == NULL ||
(dcb->state != DCB_STATE_ALLOC &&
dcb->state != DCB_STATE_POLLING &&
dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING))
dcb->state != DCB_STATE_NOPOLLING &&
dcb->session->state != SESSION_STATE_STOPPING))
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -710,6 +742,7 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
ss_dassert(false);
return 0;
}
@ -726,6 +759,11 @@ dcb_write(DCB *dcb, GWBUF *queue)
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
if (queue)
qlen = gwbuf_length(queue);
else
qlen = 0;
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
@ -838,6 +876,15 @@ dcb_write(DCB *dcb, GWBUF *queue)
* for suspended write.
*/
dcb->writeq = queue;
if (queue)
{
qlen = gwbuf_length(queue);
}
else
{
qlen = 0;
}
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
@ -861,6 +908,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
return 0;
}
spinlock_release(&dcb->writeqlock);
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
{
atomic_add(&dcb->stats.n_high_water, 1);
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
}
return 1;
}
@ -875,9 +929,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
int
dcb_drain_writeq(DCB *dcb)
{
int n = 0;
int w;
int saved_errno = 0;
int n = 0;
int w;
int saved_errno = 0;
int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
@ -938,6 +995,17 @@ int saved_errno = 0;
}
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -980,6 +1048,8 @@ dcb_close(DCB *dcb)
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -1016,12 +1086,15 @@ printDCB(DCB *dcb)
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote)
printf("\tConnected to: %s\n", dcb->remote);
printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
if (dcb->writeq)
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
printf("\tStatistics:\n");
printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1068,6 +1141,8 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
dcb = dcb->next;
}
spinlock_release(&dcbspin);
@ -1087,12 +1162,15 @@ dprintDCB(DCB *pdcb, DCB *dcb)
if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote);
dcb_printf(pdcb, "\tOwning Session: %d\n", dcb->session);
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
dcb_printf(pdcb, "\tStatistics:\n");
dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1429,4 +1507,163 @@ int gw_write(
return w;
}
/**
* Add a callback
*
* Duplicate registrations are not allowed, therefore an error will be returned if
* the specific function, reason and userdata triple are already registered.
* An error will also be returned if the is insufficient memeory available to
* create the registration.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL)
{
return 0;
}
ptr->reason = reason;
ptr->cb = callback;
ptr->userdata = userdata;
ptr->next = NULL;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
dcb->callbacks = ptr;
spinlock_release(&dcb->cb_lock);
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback &&
cb->userdata == userdata)
{
free(ptr);
spinlock_release(&dcb->cb_lock);
return 0;
}
if (cb->next == NULL)
cb->next = ptr;
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
return rval;
}
/**
* Remove a callback from the callback list for the DCB
*
* Searches down the linked list to find the callback with a matching reason, function
* and userdata.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was removed
*/
int
dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata)
{
DCB_CALLBACK *cb, *pcb = NULL;
int rval = 0;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
rval = 0;
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
spinlock_release(&dcb->cb_lock);
free(cb);
rval = 1;
break;
}
pcb = cb;
cb = cb->next;
}
}
if (!rval)
spinlock_release(&dcb->cb_lock);
return rval;
}
/**
* Call the set of callbacks registered for a particular reason.
*
* @param dcb The DCB to call the callbacks regarding
* @param reason The reason that has triggered the call
*/
static void
dcb_call_callback(DCB *dcb, DCB_REASON reason)
{
DCB_CALLBACK *cb, *nextcb;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
while (cb)
{
if (cb->reason == reason)
{
nextcb = cb->next;
spinlock_release(&dcb->cb_lock);
cb->cb(dcb, reason, cb->userdata);
spinlock_acquire(&dcb->cb_lock);
cb = nextcb;
}
else
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
/**
* Check the passed DCB to ensure it is in the list of allDCBS
*
* @param DCB The DCB to check
* @return 1 if the DCB is in the list, otherwise 0
*/
int
dcb_isvalid(DCB *dcb)
{
DCB *ptr;
int rval = 0;
spinlock_acquire(&dcbspin);
ptr = allDCBs;
while (ptr)
{
if (ptr == dcb)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&dcbspin);
return rval;
}

View File

@ -136,7 +136,7 @@ static void usage(void);
static char* get_expanded_pathname(
char** abs_path,
char* input_path,
char* fname);
const char* fname);
static void print_log_n_stderr(
bool do_log,
bool do_stderr,
@ -725,7 +725,7 @@ static bool file_is_writable(
static char* get_expanded_pathname(
char** output_path,
char* relative_path,
char* fname)
const char* fname)
{
char* cnf_file_buf = NULL;
char* expanded_path;
@ -1228,7 +1228,7 @@ int main(int argc, char **argv)
datadir)));
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Configuration file : %s",
"Configuration file : %s",
cnf_file_path)));
/*< Update the server options */

View File

@ -22,8 +22,10 @@
* @verbatim
* Revision History
*
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 23/05/14 Massimiliano Pinto Addition of monitor_interval parameter
* and monitor id
*
* @endverbatim
*/
@ -197,3 +199,70 @@ MONITOR *ptr;
}
spinlock_release(&monLock);
}
/**
* Find a monitor by name
*
* @param name The name of the monitor
* @return Pointer to the monitor or NULL
*/
MONITOR *
monitor_find(char *name)
{
MONITOR *ptr;
spinlock_acquire(&monLock);
ptr = allMonitors;
while (ptr)
{
if (!strcmp(ptr->name, name))
break;
ptr = ptr->next;
}
spinlock_release(&monLock);
return ptr;
}
/**
* Set the id of the monitor.
*
* @param mon The monitor instance
* @param id The id for the monitor
*/
void
monitorSetId(MONITOR *mon, unsigned long id)
{
if (mon->module->defaultId != NULL) {
mon->module->defaultId(mon->handle, id);
}
}
/**
* Set the monitor sampling interval.
*
* @param mon The monitor instance
* @param interval The sampling interval in milliseconds
*/
void
monitorSetInterval (MONITOR *mon, unsigned long interval)
{
if (mon->module->setInterval != NULL) {
mon->module->setInterval(mon->handle, interval);
}
}
/**
* Enable Replication Heartbeat support in monitor.
*
* @param mon The monitor instance
* @param interval The sampling interval in milliseconds
*/
void
monitorSetReplicationHeartbeat(MONITOR *mon, int replication_heartbeat)
{
if (mon->module->replicationHeartbeat != NULL) {
mon->module->replicationHeartbeat(mon->handle, replication_heartbeat);
}
}

View File

@ -22,8 +22,12 @@
* @verbatim
* Revision History
*
* Date Who Description
* 18/06/13 Mark Riddoch Initial implementation
* Date Who Description
* 18/06/13 Mark Riddoch Initial implementation
* 17/05/14 Mark Riddoch Addition of unique_name
* 20/05/14 Massimiliano Pinto Addition of server_string
* 21/05/14 Massimiliano Pinto Addition of node_id
* 28/05/14 Massimiliano Pinto Addition of rlagd and node_ts fields
*
* @endverbatim
*/
@ -67,6 +71,11 @@ SERVER *server;
server->nextdb = NULL;
server->monuser = NULL;
server->monpw = NULL;
server->unique_name = NULL;
server->server_string = NULL;
server->node_id = -1;
server->rlag = -1;
server->node_ts = 0;
spinlock_acquire(&server_spin);
server->next = allServers;
@ -109,10 +118,51 @@ SERVER *ptr;
/* Clean up session and free the memory */
free(server->name);
free(server->protocol);
if (server->unique_name)
free(server->unique_name);
if (server->server_string)
free(server->server_string);
free(server);
return 1;
}
/**
* Set a unique name for the server
*
* @param server The server to ste the name on
* @param name The unique name for the server
*/
void
server_set_unique_name(SERVER *server, char *name)
{
server->unique_name = strdup(name);
}
/**
* Find an existing server using the unique section name in
* configuration file
*
* @param servname The Server name or address
* @param port The server port
* @return The server or NULL if not found
*/
SERVER *
server_find_by_unique_name(char *name)
{
SERVER *server;
spinlock_acquire(&server_spin);
server = allServers;
while (server)
{
if (strcmp(server->unique_name, name) == 0)
break;
server = server->next;
}
spinlock_release(&server_spin);
return server;
}
/**
* Find an existing server
*
@ -190,15 +240,26 @@ char *stat;
ptr = allServers;
while (ptr)
{
dcb_printf(dcb, "Server %p\n", ptr);
dcb_printf(dcb, "Server %p (%s)\n", ptr, ptr->unique_name);
dcb_printf(dcb, "\tServer: %s\n", ptr->name);
stat = server_status(ptr);
dcb_printf(dcb, "\tStatus: %s\n", stat);
free(stat);
dcb_printf(dcb, "\tProtocol: %s\n", ptr->protocol);
dcb_printf(dcb, "\tPort: %d\n", ptr->port);
if (ptr->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", ptr->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", ptr->node_id);
if (SERVER_IS_SLAVE(ptr)) {
if (ptr->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", ptr->rlag);
}
}
if (ptr->node_ts > 0) {
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", ptr->node_ts);
}
dcb_printf(dcb, "\tNumber of connections: %d\n", ptr->stats.n_connections);
dcb_printf(dcb, "\tCurrent no. of connections: %d\n", ptr->stats.n_current);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", ptr->stats.n_current);
ptr = ptr->next;
}
spinlock_release(&server_spin);
@ -215,15 +276,57 @@ dprintServer(DCB *dcb, SERVER *server)
{
char *stat;
dcb_printf(dcb, "Server %p\n", server);
dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name);
dcb_printf(dcb, "\tServer: %s\n", server->name);
stat = server_status(server);
dcb_printf(dcb, "\tStatus: %s\n", stat);
free(stat);
dcb_printf(dcb, "\tProtocol: %s\n", server->protocol);
dcb_printf(dcb, "\tPort: %d\n", server->port);
if (server->server_string)
dcb_printf(dcb, "\tServer Version:\t\t%s\n", server->server_string);
dcb_printf(dcb, "\tNode Id: %d\n", server->node_id);
if (SERVER_IS_SLAVE(server)) {
if (server->rlag >= 0) {
dcb_printf(dcb, "\tSlave delay:\t\t%d\n", server->rlag);
}
}
if (server->node_ts > 0) {
dcb_printf(dcb, "\tLast Repl Heartbeat:\t%lu\n", server->node_ts);
}
dcb_printf(dcb, "\tNumber of connections: %d\n", server->stats.n_connections);
dcb_printf(dcb, "\tCurrent No. of connections: %d\n", server->stats.n_current);
dcb_printf(dcb, "\tCurrent no. of conns: %d\n", server->stats.n_current);
}
/**
* List all servers in a tabular form to a DCB
*
*/
void
dListServers(DCB *dcb)
{
SERVER *ptr;
char *stat;
spinlock_acquire(&server_spin);
ptr = allServers;
if (ptr)
{
dcb_printf(dcb, "%-18s | %-15s | Port | %-18s | Connections\n",
"Server", "Address", "Status");
dcb_printf(dcb, "-------------------------------------------------------------------------------\n");
}
while (ptr)
{
stat = server_status(ptr);
dcb_printf(dcb, "%-18s | %-15s | %5d | %-18s | %4d\n",
ptr->unique_name, ptr->name,
ptr->port, stat,
ptr->stats.n_current);
free(stat);
ptr = ptr->next;
}
spinlock_release(&server_spin);
}
/**

View File

@ -29,6 +29,7 @@
* 25/02/14 Massimiliano Pinto Added: service refresh limit feature
* 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services)
* 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL
* 23/05/14 Mark Riddoch Addition of service validation call
*
* @endverbatim
*/
@ -55,6 +56,11 @@ extern int lm_enabled_logfiles_bitmask;
static SPINLOCK service_spin = SPINLOCK_INIT;
static SERVICE *allServices = NULL;
static void service_add_qualified_param(
SERVICE* svc,
CONFIG_PARAMETER* param);
/**
* Allocate a new service for the gateway to support
*
@ -102,6 +108,8 @@ SERVICE *service;
service->enable_root = 0;
service->routerOptions = NULL;
service->databases = NULL;
service->svc_config_param = NULL;
service->svc_config_version = 0;
spinlock_init(&service->spin);
spinlock_init(&service->users_table_spin);
memset(&service->rate_limit, 0, sizeof(SERVICE_REFRESH_RATE));
@ -114,6 +122,33 @@ SERVICE *service;
return service;
}
/**
* Check to see if a service pointer is valid
*
* @param service The poitner to check
* @return 1 if the service is in the list of all services
*/
int
service_isvalid(SERVICE *service)
{
SERVICE *ptr;
int rval = 0;
spinlock_acquire(&service_spin);
ptr = allServices;
while (ptr)
{
if (ptr == service)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&service_spin);
return rval;
}
/**
* Start an individual port/protocol pair
*
@ -691,6 +726,72 @@ SERVER *server = service->databases;
dcb_printf(dcb, "\tCurrently connected: %d\n", service->stats.n_current);
}
/**
* List the defined services in a tabular format.
*
* @param dcb DCB to print the service list to.
*/
void
dListServices(DCB *dcb)
{
SERVICE *ptr;
spinlock_acquire(&service_spin);
ptr = allServices;
if (ptr)
{
dcb_printf(dcb, "%-25s | %-20s | #Users | Total Sessions\n",
"Service Name", "Router Module");
dcb_printf(dcb, "--------------------------------------------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-25s | %-20s | %6d | %5d\n",
ptr->name, ptr->routerModule,
ptr->stats.n_current, ptr->stats.n_sessions);
ptr = ptr->next;
}
spinlock_release(&service_spin);
}
/**
* List the defined listeners in a tabular format.
*
* @param dcb DCB to print the service list to.
*/
void
dListListeners(DCB *dcb)
{
SERVICE *ptr;
SERV_PROTOCOL *lptr;
spinlock_acquire(&service_spin);
ptr = allServices;
if (ptr)
{
dcb_printf(dcb, "%-20s | %-18s | %-15s | Port | State\n",
"Service Name", "Protocol Module", "Address");
dcb_printf(dcb, "---------------------------------------------------------------------------\n");
}
while (ptr)
{
lptr = ptr->ports;
while (lptr)
{
dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n",
ptr->name, lptr->protocol,
(lptr != NULL) ? lptr->address : "*",
lptr->port,
(lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? "Stopped" : "Running"
);
lptr = lptr->next;
}
ptr = ptr->next;
}
spinlock_release(&service_spin);
}
/**
* Update the definition of a service
*
@ -782,3 +883,111 @@ int service_refresh_users(SERVICE *service) {
else
return 1;
}
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec)
{
char* p;
int valint;
bool percent = false;
bool succp;
/**
* Find out whether the value is numeric and ends with '%' or '\0'
*/
p = valstr;
while(isdigit(*p)) p++;
errno = 0;
if (p == valstr || (*p != '%' && *p != '\0'))
{
succp = false;
}
else if (*p == '%')
{
if (*(p+1) == '\0')
{
*p = '\0';
valint = (int) strtol(valstr, (char **)NULL, 10);
if (valint == 0 && errno != 0)
{
succp = false;
}
else
{
succp = true;
config_set_qualified_param(param, (void *)&valint, PERCENT_TYPE);
}
}
else
{
succp = false;
}
}
else if (*p == '\0')
{
valint = (int) strtol(valstr, (char **)NULL, 10);
if (valint == 0 && errno != 0)
{
succp = false;
}
else
{
succp = true;
config_set_qualified_param(param, (void *)&valint, COUNT_TYPE);
}
}
if (succp)
{
service_add_qualified_param(service, param); /*< add param to svc */
}
return succp;
}
/**
* Add qualified config parameter to SERVICE struct.
*/
static void service_add_qualified_param(
SERVICE* svc,
CONFIG_PARAMETER* param)
{
CONFIG_PARAMETER** p;
spinlock_acquire(&svc->spin);
p = &svc->svc_config_param;
if ((*p) != NULL)
{
do
{
/** If duplicate is found, latter remains */
if (strncasecmp(param->name,
(*p)->name,
strlen(param->name)) == 0)
{
*p = config_clone_param(param);
break;
}
}
while ((*p)->next != NULL);
(*p)->next = config_clone_param(param);
}
else
{
(*p) = config_clone_param(param);
}
/** Increment service's configuration version */
atomic_add(&svc->svc_config_version, 1);
(*p)->next = NULL;
spinlock_release(&svc->spin);
}

View File

@ -114,7 +114,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
/*
* Only create a router session if we are not the listening
* DCB. Creating a router session may create a connection to a
* DCB or an internal DCB. Creating a router session may create a connection to a
* backend server, depending upon the router module implementation
* and should be avoided for the listener session
*
@ -122,13 +122,17 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* session, therefore it is important that the session lock is
* relinquished beforethe router call.
*/
if (client_dcb->state != DCB_STATE_LISTENING)
if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL)
{
session->router_session =
service->router->newSession(service->router_instance,
session);
if (session->router_session == NULL) {
/**
* Inform other threads that session is closing.
*/
session->state == SESSION_STATE_STOPPING;
/*<
* Decrease refcount, set dcb's session pointer NULL
* and set session pointer to NULL.
@ -138,8 +142,8 @@ session_alloc(SERVICE *service, DCB *client_dcb)
session = NULL;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to create router "
"client session. Freeing allocated resources.")));
"Error : Failed to create %s session.",
service->name)));
goto return_session;
}
@ -269,6 +273,34 @@ return_succp :
return succp;
}
/**
* Check to see if a session is valid, i.e. in the list of all sessions
*
* @param session Session to check
* @return 1 if the session is valid otherwise 0
*/
int
session_isvalid(SESSION *session)
{
SESSION *ptr;
int rval = 0;
spinlock_acquire(&session_spin);
ptr = allSessions;
while (ptr)
{
if (ptr == session)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&session_spin);
return rval;
}
/**
* Print details of an individual session
*
@ -366,6 +398,7 @@ int norouter = 0;
if (norouter)
printf("%d Sessions have no router session\n", norouter);
}
/**
* Print all sessions to a DCB
*
@ -416,6 +449,37 @@ dprintSession(DCB *dcb, SESSION *ptr)
dcb_printf(dcb, "\tConnected: %s", asctime(localtime(&ptr->stats.connect)));
}
/**
* List all sessions in tabular form to a DCB
*
* Designed to be called within a debugger session in order
* to display all active sessions within the gateway
*
* @param dcb The DCB to print to
*/
void
dListSessions(DCB *dcb)
{
SESSION *ptr;
spinlock_acquire(&session_spin);
ptr = allSessions;
if (ptr)
{
dcb_printf(dcb, "Session | Client | State\n");
dcb_printf(dcb, "------------------------------------------\n");
}
while (ptr)
{
dcb_printf(dcb, "%-16p | %-15s | %s\n", ptr,
((ptr->client && ptr->client->remote)
? ptr->client->remote : ""),
session_state(ptr->state));
ptr = ptr->next;
}
spinlock_release(&session_spin);
}
/**
* Convert a session state to a string representation
*
@ -441,3 +505,18 @@ session_state(int state)
return "Invalid State";
}
}
SESSION* get_session_by_router_ses(
void* rses)
{
SESSION* ses = allSessions;
while (ses->router_session != rses && ses->next != NULL)
ses = ses->next;
if (ses->router_session != rses)
{
ses = NULL;
}
return ses;
}

View File

@ -17,6 +17,7 @@
*
* Copyright SkySQL Ab 2013
*/
#include <skygw_utils.h>
/**
* @file config.h The configuration handling elements
@ -27,16 +28,37 @@
* Date Who Description
* 21/06/13 Mark Riddoch Initial implementation
* 07/05/14 Massimiliano Pinto Added version_string to global configuration
* 23/05/14 Massimiliano Pinto Added id to global configuration
*
* @endverbatim
*/
/**
* Maximum length for configuration parameter value.
*/
enum {MAX_PARAM_LEN=256};
typedef enum {
UNDEFINED_TYPE=0,
STRING_TYPE,
COUNT_TYPE,
PERCENT_TYPE,
BOOL_TYPE
} config_param_type_t;
/**
* The config parameter
*/
typedef struct config_parameter {
char *name; /**< The name of the parameter */
char *value; /**< The value of the parameter */
char *value; /**< The value of the parameter */
union { /*< qualified parameter value by type */
char* valstr; /*< terminated char* array */
int valcount; /*< int */
int valpercent; /*< int */
bool valbool; /*< bool */
} qfd;
config_param_type_t qfd_param_type;
struct config_parameter *next; /**< Next pointer in the linked list */
} CONFIG_PARAMETER;
@ -57,9 +79,25 @@ typedef struct config_context {
typedef struct {
int n_threads; /**< Number of polling threads */
char *version_string; /**< The version string of embedded database library */
unsigned long id; /**< MaxScale ID */
} GATEWAY_CONF;
extern int config_load(char *);
extern int config_reload();
extern int config_threadcount();
extern int config_load(char *);
extern int config_reload();
extern int config_threadcount();
CONFIG_PARAMETER* config_get_param(CONFIG_PARAMETER* params, const char* name);
config_param_type_t config_get_paramtype(CONFIG_PARAMETER* param);
CONFIG_PARAMETER* config_clone_param(CONFIG_PARAMETER* param);
bool config_set_qualified_param(
CONFIG_PARAMETER* param,
void* val,
config_param_type_t type);
int config_get_valint(
CONFIG_PARAMETER* param,
const char* name, /*< if NULL examine current param only */
config_param_type_t ptype);
#endif

View File

@ -48,6 +48,8 @@ struct service;
* 15/07/2013 Massimiliano Pinto Added session entry point
* 16/07/2013 Massimiliano Pinto Added command type for dcb
* 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb
* 07/05/2014 Mark Riddoch Addition of callback mechanism
* 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks
*
* @endverbatim
*/
@ -99,6 +101,8 @@ typedef struct dcbstats {
int n_writes; /*< Number of writes on this descriptor */
int n_accepts; /*< Number of accepts on this descriptor */
int n_buffered; /*< Number of buffered writes */
int n_high_water; /*< Number of crosses of high water mark */
int n_low_water; /*< Number of crosses of low water mark */
} DCBSTATS;
/**
@ -137,10 +141,35 @@ typedef enum {
} dcb_state_t;
typedef enum {
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_REQUEST_HANDLER /*< Serves dedicated client */
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */
DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */
} dcb_role_t;
/**
* Callback reasons for the DCB callback mechanism.
*/
typedef enum {
DCB_REASON_CLOSE, /*< The DCB is closing */
DCB_REASON_DRAINED, /*< The write delay queue has drained */
DCB_REASON_HIGH_WATER, /*< Cross high water mark */
DCB_REASON_LOW_WATER, /*< Cross low water mark */
DCB_REASON_ERROR, /*< An error was flagged on the connection */
DCB_REASON_HUP /*< A hangup was detected */
} DCB_REASON;
/**
* Callback structure - used to track callbacks registered on a DCB
*/
typedef struct dcb_callback {
DCB_REASON reason; /*< The reason for the callback */
int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata);
void *userdata; /*< User data to be sent in the callback */
struct dcb_callback
*next; /*< Next callback for this DCB */
} DCB_CALLBACK;
/**
* Descriptor Control Block
*
@ -172,12 +201,12 @@ typedef struct dcb {
struct session *session; /**< The owning session */
GWPROTOCOL func; /**< The functions for this descriptor */
unsigned int writeqlen; /**< Current number of byes in the write queue */
SPINLOCK writeqlock; /**< Write Queue spinlock */
GWBUF *writeq; /**< Write Data Queue */
SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */
GWBUF *delayq; /**< Delay Backend Write Data Queue */
GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */
SPINLOCK dcb_readqlock; /**< read/write access protection to read queue */
SPINLOCK authlock; /**< Generic Authorization spinlock */
DCBSTATS stats; /**< DCB related statistics */
@ -187,6 +216,11 @@ typedef struct dcb {
void *data; /**< Specific client data */
DCBMM memdata; /**< The data related to DCB memory management */
int command; /**< Specific client command type */
SPINLOCK cb_lock; /**< The lock for the callbacks linked list */
DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */
unsigned int high_water; /**< High water mark */
unsigned int low_water; /**< Low water mark */
#if defined(SS_DEBUG)
skygw_chk_t dcb_chk_tail;
#endif
@ -205,6 +239,11 @@ int fail_accept_errno;
#define DCB_SESSION(x) (x)->session
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
#define DCB_WRITEQLEN(x) (x)->writeqlen
#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo);
#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi);
#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water)
#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water)
DCB *dcb_get_zombies(void);
int gw_write(
@ -231,6 +270,11 @@ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */
int dcb_isclient(DCB *); /* the DCB is the client of the session */
void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */
void dcb_add_to_zombieslist(DCB* dcb);
int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *),
void *);
int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON),
void *);
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
bool dcb_set_state(
DCB* dcb,

View File

@ -26,9 +26,11 @@
* @verbatim
* Revision History
*
* Date Who Description
* 07/07/13 Mark Riddoch Initial implementation
* 25/07/13 Mark Riddoch Addition of diagnotics
* Date Who Description
* 07/07/13 Mark Riddoch Initial implementation
* 25/07/13 Mark Riddoch Addition of diagnotics
* 23/05/14 Mark Riddoch Addition of routine to find monitors by name
* 23/05/14 Massimiliano Pinto Addition of defaultId and setInterval
*
* @endverbatim
*/
@ -65,6 +67,9 @@ typedef struct {
void (*unregisterServer)(void *, SERVER *);
void (*defaultUser)(void *, char *, char *);
void (*diagnostics)(DCB *, void *);
void (*setInterval)(void *, unsigned long);
void (*defaultId)(void *, unsigned long);
void (*replicationHeartbeat)(void *, int);
} MONITOR_OBJECT;
/**
@ -79,10 +84,14 @@ typedef struct monitor {
extern MONITOR *monitor_alloc(char *, char *);
extern void monitor_free(MONITOR *);
extern MONITOR *monitor_find(char *);
extern void monitorAddServer(MONITOR *, SERVER *);
extern void monitorAddUser(MONITOR *, char *, char *);
extern void monitorStop(MONITOR *);
extern void monitorStart(MONITOR *);
extern void monitorStopAll();
extern void monitorShowAll(DCB *);
extern void monitorSetId(MONITOR *, unsigned long);
extern void monitorSetInterval (MONITOR *, unsigned long);
extern void monitorSetReplicationHeartbeat(MONITOR *, int);
#endif

View File

@ -27,10 +27,14 @@
* @verbatim
* Revision History
*
* Date Who Description
* 14/06/13 Mark Riddoch Initial implementation
* 21/06/13 Mark Riddoch Addition of server status flags
* 22/07/13 Mark Riddoch Addition of JOINED status for Galera
* Date Who Description
* 14/06/13 Mark Riddoch Initial implementation
* 21/06/13 Mark Riddoch Addition of server status flags
* 22/07/13 Mark Riddoch Addition of JOINED status for Galera
* 18/05/14 Mark Riddoch Addition of unique_name field
* 20/05/14 Massimiliano Pinto Addition of server_string field
* 20/05/14 Massimiliano Pinto Addition of node_id field
* 23/05/14 Massimiliano Pinto Addition of rlag and node_ts fields
*
* @endverbatim
*/
@ -51,6 +55,7 @@ typedef struct {
* between the gateway and the server.
*/
typedef struct server {
char *unique_name; /**< Unique name for the server */
char *name; /**< Server name/IP address*/
unsigned short port; /**< Port to listen on */
char *protocol; /**< Protocol module to use */
@ -60,6 +65,10 @@ typedef struct server {
SERVER_STATS stats; /**< The server statistics */
struct server *next; /**< Next server */
struct server *nextdb; /**< Next server in list attached to a service */
char *server_string; /**< Server version string, i.e. MySQL server version */
long node_id; /**< Node id, server_id for M/S or local_index for Galera */
int rlag; /**< Replication Lag for Master / Slave replication */
unsigned long node_ts; /**< Last timestamp set from M/S monitor module */
} SERVER;
/**
@ -99,18 +108,21 @@ typedef struct server {
* Is the server joined Galera node? The server must be running and joined.
*/
#define SERVER_IS_JOINED(server) \
(((server)->status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED))
(((server)->status & (SERVER_RUNNING|SERVER_JOINED)) == (SERVER_RUNNING|SERVER_JOINED))
extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *);
extern SERVER *server_find_by_unique_name(char *);
extern SERVER *server_find(char *, unsigned short);
extern void printServer(SERVER *);
extern void printAllServers();
extern void dprintAllServers(DCB *);
extern void dprintServer(DCB *, SERVER *);
extern void dListServers(DCB *);
extern char *server_status(SERVER *);
extern void server_set_status(SERVER *, int);
extern void server_clear_status(SERVER *, int);
extern void serverAddMonUser(SERVER *, char *, char *);
extern void server_update(SERVER *, char *, char *, char *);
extern void server_set_unique_name(SERVER *, char *);
#endif

View File

@ -22,6 +22,7 @@
#include <spinlock.h>
#include <dcb.h>
#include <server.h>
#include "config.h"
/**
* @file service.h
@ -116,6 +117,8 @@ typedef struct service {
SERVICE_STATS stats; /**< The service statistics */
struct users *users; /**< The user data for this service */
int enable_root; /**< Allow root user access */
CONFIG_PARAMETER* svc_config_param; /*< list of config params and values */
int svc_config_version; /*< Version number of configuration */
SPINLOCK
users_table_spin; /**< The spinlock for users data refresh */
SERVICE_REFRESH_RATE
@ -123,12 +126,15 @@ typedef struct service {
struct service *next; /**< The next service in the linked list */
} SERVICE;
typedef enum count_spec_t {COUNT_ATLEAST=0, COUNT_EXACT, COUNT_ATMOST} count_spec_t;
#define SERVICE_STATE_ALLOC 1 /**< The service has been allocated */
#define SERVICE_STATE_STARTED 2 /**< The service has been started */
extern SERVICE *service_alloc(char *, char *);
extern int service_free(SERVICE *);
extern SERVICE *service_find(char *);
extern int service_isvalid(SERVICE *);
extern int serviceAddProtocol(SERVICE *, char *, char *, unsigned short);
extern int serviceHasProtocol(SERVICE *, char *, unsigned short);
extern void serviceAddBackend(SERVICE *, SERVER *);
@ -148,5 +154,12 @@ extern int service_refresh_users(SERVICE *);
extern void printService(SERVICE *);
extern void printAllServices();
extern void dprintAllServices(DCB *);
bool service_set_slave_conn_limit (
SERVICE* service,
CONFIG_PARAMETER* param,
char* valstr,
count_spec_t count_spec);
extern void dprintService(DCB *, SERVICE *);
extern void dListServices(DCB *);
extern void dListListeners(DCB *);
#endif

View File

@ -53,6 +53,7 @@ typedef enum {
SESSION_STATE_ALLOC, /*< for all sessions */
SESSION_STATE_READY, /*< for router session */
SESSION_STATE_ROUTER_READY, /*< for router session */
SESSION_STATE_STOPPING, /*< router is being closed */
SESSION_STATE_LISTENER, /*< for listener session */
SESSION_STATE_LISTENER_STOPPED, /*< for listener session */
SESSION_STATE_FREE /*< for all sessions */
@ -87,10 +88,13 @@ typedef struct session {
SESSION *session_alloc(struct service *, struct dcb *);
bool session_free(SESSION *);
int session_isvalid(SESSION *);
void printAllSessions();
void printSession(SESSION *);
void dprintAllSessions(struct dcb *);
void dprintSession(struct dcb *, SESSION *);
void dListSessions(struct dcb *);
char *session_state(int);
bool session_link_dcb(SESSION *, struct dcb *);
SESSION* get_session_by_router_ses(void* rses);
#endif

View File

@ -41,6 +41,7 @@ struct cli_session;
typedef struct cli_instance {
SPINLOCK lock; /*< The instance spinlock */
SERVICE *service; /*< The debug cli service */
int mode; /*< CLI interface mode */
struct cli_session
*sessions; /*< Linked list of sessions within this instance */
struct cli_instance
@ -53,8 +54,13 @@ typedef struct cli_instance {
*/
typedef struct cli_session {
char cmdbuf[80]; /*< The command buffer used to build up user commands */
int mode; /*< The CLI Mode for this session */
SESSION *session; /*< The gateway session */
struct cli_session
*next; /*< The next pointer for the list of sessions */
} CLI_SESSION;
/* Command line interface modes */
#define CLIM_USER 1
#define CLIM_DEVELOPER 2
#endif

View File

@ -31,15 +31,13 @@
#include <dcb.h>
/**
* Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers.
*/
typedef struct backend {
SERVER* backend_server; /*< The server itself */
int backend_conn_count; /*< Number of connections to the server */
} BACKEND;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES;
@ -52,14 +50,34 @@ typedef enum rses_property_type_t {
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
} rses_property_type_t;
typedef enum backend_type_t {
BE_UNDEFINED=-1,
BE_MASTER,
BE_JOINED = BE_MASTER,
BE_SLAVE,
BE_COUNT
} backend_type_t;
/**
* This criteria is used when backends are chosen for a router session's use.
* Backend servers are sorted to ascending order according to the criteria
* and top N are chosen.
*/
typedef enum select_criteria {
UNDEFINED_CRITERIA=0,
LEAST_GLOBAL_CONNECTIONS, /*< all connections established by MaxScale */
DEFAULT_CRITERIA=LEAST_GLOBAL_CONNECTIONS,
LEAST_ROUTER_CONNECTIONS, /*< connections established by this router */
LEAST_BEHIND_MASTER,
LAST_CRITERIA /*< not used except for an index */
} select_criteria_t;
/** default values for rwsplit configuration parameters */
#define CONFIG_MAX_SLAVE_CONN 1
#define GET_SELECT_CRITERIA(s) \
(strncmp(s,"LEAST_GLOBAL_CONNECTIONS", strlen("LEAST_GLOBAL_CONNECTIONS")) == 0 ? \
LEAST_GLOBAL_CONNECTIONS : ( \
strncmp(s,"LEAST_BEHIND_MASTER", strlen("LEAST_BEHIND_MASTER")) == 0 ? \
LEAST_BEHIND_MASTER : ( \
strncmp(s,"LEAST_ROUTER_CONNECTIONS", strlen("LEAST_ROUTER_CONNECTIONS")) == 0 ? \
LEAST_ROUTER_CONNECTIONS : UNDEFINED_CRITERIA)))
/**
* Session variable command
*/
@ -98,13 +116,63 @@ struct rses_property_st {
};
typedef struct sescmd_cursor_st {
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_top;
#endif
ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */
rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */
mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */
bool scmd_cur_active; /*< true if command is being executed */
backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */
#if defined(SS_DEBUG)
skygw_chk_t scmd_cur_chk_tail;
#endif
} sescmd_cursor_t;
/**
* Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers.
*
* Owned by router_instance, referenced by each routing session.
*/
typedef struct backend_st {
#if defined(SS_DEBUG)
skygw_chk_t be_chk_top;
#endif
SERVER* backend_server; /*< The server itself */
int backend_conn_count; /*< Number of connections to the server */
bool be_valid; /*< valid when belongs to the router's configuration */
#if defined(SS_DEBUG)
skygw_chk_t be_chk_tail;
#endif
} BACKEND;
/**
* Reference to BACKEND.
*
* Owned by router client session.
*/
typedef struct backend_ref_st {
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_top;
#endif
BACKEND* bref_backend;
DCB* bref_dcb;
sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif
} backend_ref_t;
typedef struct rwsplit_config_st {
int rw_max_slave_conn_percent;
int rw_max_slave_conn_count;
select_criteria_t rw_slave_select_criteria;
} rwsplit_config_t;
/**
* The client session structure used within this router.
*/
@ -113,17 +181,18 @@ struct router_client_session {
skygw_chk_t rses_chk_top;
#endif
SPINLOCK rses_lock; /*< protects rses_deleted */
int rses_versno; /*< even = no active update, else odd */
int rses_versno; /*< even = no active update, else odd. not used 4/14 */
bool rses_closed; /*< true when closeSession is called */
/** Properties listed by their type */
rses_property_t* rses_properties[RSES_PROP_TYPE_COUNT];
BACKEND* rses_backend[BE_COUNT];/*< Backends used by client session */
DCB* rses_dcb[BE_COUNT];
/*< cursor is pointer and status variable to current session command */
sescmd_cursor_t rses_cursor[BE_COUNT];
backend_ref_t* rses_master_ref;
backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */
rwsplit_config_t rses_config; /*< copied config info from router instance */
int rses_nbackends;
int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
uint64_t rses_id; /*< ID for router client session */
struct router_client_session* next;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
@ -151,6 +220,8 @@ typedef struct router_instance {
SPINLOCK lock; /*< Lock for the instance data */
BACKEND** servers; /*< Backend servers */
BACKEND* master; /*< NULL or pointer */
rwsplit_config_t rwsplit_config; /*< expanded config info from SERVICE */
int rwsplit_version;/*< version number for router's config */
unsigned int bitmask; /*< Bitmask to apply to server->status */
unsigned int bitvalue; /*< Required value of server->status */
ROUTER_STATS stats; /*< Statistics for this router */

View File

@ -22,8 +22,12 @@
* @verbatim
* Revision History
*
* Date Who Description
* 22/07/13 Mark Riddoch Initial implementation
* Date Who Description
* 22/07/13 Mark Riddoch Initial implementation
* 21/05/14 Massimiliano Pinto Monitor sets a master server
* that has the lowest value of wsrep_local_index
* 23/05/14 Massimiliano Pinto Added 1 configuration option (setInterval).
* Interval is printed in diagnostics.
*
* @endverbatim
*/
@ -45,7 +49,7 @@ extern int lm_enabled_logfiles_bitmask;
static void monitorMain(void *);
static char *version_str = "V1.0.0";
static char *version_str = "V1.2.0";
static void *startMonitor(void *);
static void stopMonitor(void *);
@ -53,8 +57,9 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *);
static void defaultUsers(void *, char *, char *);
static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics };
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUsers, diagnostics, setInterval, NULL, NULL };
/**
* Implementation of the mandatory version entry point
@ -119,9 +124,11 @@ MYSQL_MONITOR *handle;
handle->shutdown = 0;
handle->defaultUser = NULL;
handle->defaultPasswd = NULL;
handle->id = MONITOR_DEFAULT_ID;
handle->interval = MONITOR_INTERVAL;
spinlock_init(&handle->lock);
}
handle->tid = thread_start(monitorMain, handle);
handle->tid = (THREAD)thread_start(monitorMain, handle);
return handle;
}
@ -136,7 +143,7 @@ stopMonitor(void *arg)
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
handle->shutdown = 1;
thread_wait(handle->tid);
thread_wait((void *)handle->tid);
}
/**
@ -234,7 +241,10 @@ char *sep;
dcb_printf(dcb, "\tMonitor stopped\n");
break;
}
dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval);
dcb_printf(dcb, "\tMonitored servers: ");
db = handle->databases;
sep = "";
while (db)
@ -280,6 +290,8 @@ MYSQL_RES *result;
int num_fields;
int isjoined = 0;
char *uname = defaultUser, *passwd = defaultPasswd;
unsigned long int server_version = 0;
char *server_string;
if (database->server->monuser != NULL)
{
@ -297,6 +309,7 @@ char *uname = defaultUser, *passwd = defaultPasswd;
uname, dpwd, NULL, database->server->port, NULL, 0) == NULL)
{
server_clear_status(database->server, SERVER_RUNNING);
database->server->node_id = -1;
free(dpwd);
return;
}
@ -306,6 +319,15 @@ char *uname = defaultUser, *passwd = defaultPasswd;
/* If we get this far then we have a working connection */
server_set_status(database->server, SERVER_RUNNING);
/* get server version from current server */
server_version = mysql_get_server_version(database->con);
/* get server version string */
server_string = (char *)mysql_get_server_info(database->con);
if (server_string) {
database->server->server_string = strdup(server_string);
}
/* Check if the the Galera FSM shows this node is joined to the cluster */
if (mysql_query(database->con, "SHOW STATUS LIKE 'wsrep_local_state_comment'") == 0
&& (result = mysql_store_result(database->con)) != NULL)
@ -319,6 +341,25 @@ char *uname = defaultUser, *passwd = defaultPasswd;
mysql_free_result(result);
}
/* Check the the Galera node index in the cluster */
if (mysql_query(database->con, "SHOW STATUS LIKE 'wsrep_local_index'") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
long local_index = -1;
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
local_index = strtol(row[1], NULL, 10);
if ((errno == ERANGE && (local_index == LONG_MAX
|| local_index == LONG_MIN)) || (errno != 0 && local_index == 0))
{
local_index = -1;
}
database->server->node_id = local_index;
}
mysql_free_result(result);
}
if (isjoined)
server_set_status(database->server, SERVER_JOINED);
else
@ -335,6 +376,7 @@ monitorMain(void *arg)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
MONITOR_SERVERS *ptr;
long master_id;
if (mysql_thread_init())
{
@ -347,6 +389,8 @@ MONITOR_SERVERS *ptr;
handle->status = MONITOR_RUNNING;
while (1)
{
master_id = -1;
if (handle->shutdown)
{
handle->status = MONITOR_STOPPING;
@ -354,12 +398,63 @@ MONITOR_SERVERS *ptr;
handle->status = MONITOR_STOPPED;
return;
}
ptr = handle->databases;
while (ptr)
{
monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd);
/* set master_id to the lowest value of ptr->server->node_id */
if (ptr->server->node_id >= 0 && SERVER_IS_JOINED(ptr->server)) {
if (ptr->server->node_id < master_id && master_id >= 0) {
master_id = ptr->server->node_id;
} else {
if (master_id < 0) {
master_id = ptr->server->node_id;
}
}
} else {
/* clear M/S status */
server_clear_status(ptr->server, SERVER_SLAVE);
server_clear_status(ptr->server, SERVER_MASTER);
}
ptr = ptr->next;
}
thread_millisleep(10000);
ptr = handle->databases;
/* this server loop sets Master and Slave roles */
while (ptr)
{
if (ptr->server->node_id >= 0 && master_id >= 0) {
/* set the Master role */
if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id == master_id)) {
server_set_status(ptr->server, SERVER_MASTER);
server_clear_status(ptr->server, SERVER_SLAVE);
} else if (SERVER_IS_JOINED(ptr->server) && (ptr->server->node_id > master_id)) {
/* set the Slave role */
server_set_status(ptr->server, SERVER_SLAVE);
server_clear_status(ptr->server, SERVER_MASTER);
}
}
ptr = ptr->next;
}
thread_millisleep(handle->interval);
}
}
/**
* Set the monitor sampling interval.
*
* @param arg The handle allocated by startMonitor
* @param interval The interval to set in monitor struct, in milliseconds
*/
static void
setInterval(void *arg, unsigned long interval)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long));
}

View File

@ -22,12 +22,16 @@
* @verbatim
* Revision History
*
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 11/07/13 Mark Riddoch Addition of code to check replication
* status
* 25/07/13 Mark Riddoch Addition of decrypt for passwords and
* diagnostic interface
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 11/07/13 Mark Riddoch Addition of code to check replication
* status
* 25/07/13 Mark Riddoch Addition of decrypt for passwords and
* diagnostic interface
* 20/05/14 Massimiliano Pinto Addition of support for MariadDB multimaster replication setup.
* New server field version_string is updated.
* 28/05/14 Massimiliano Pinto Added set Id and configuration options (setInverval)
* Parameters are now printed in diagnostics
*
* @endverbatim
*/
@ -49,7 +53,7 @@ extern int lm_enabled_logfiles_bitmask;
static void monitorMain(void *);
static char *version_str = "V1.0.0";
static char *version_str = "V1.2.0";
static void *startMonitor(void *);
static void stopMonitor(void *);
@ -57,8 +61,11 @@ static void registerServer(void *, SERVER *);
static void unregisterServer(void *, SERVER *);
static void defaultUser(void *, char *, char *);
static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long);
static void defaultId(void *, unsigned long);
static void replicationHeartbeat(void *, int);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics };
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat };
/**
* Implementation of the mandatory version entry point
@ -124,6 +131,8 @@ MYSQL_MONITOR *handle;
handle->shutdown = 0;
handle->defaultUser = NULL;
handle->defaultPasswd = NULL;
handle->id = MONITOR_DEFAULT_ID;
handle->interval = MONITOR_INTERVAL;
spinlock_init(&handle->lock);
}
handle->tid = (THREAD)thread_start(monitorMain, handle);
@ -259,7 +268,12 @@ char *sep;
dcb_printf(dcb, "\tMonitor stopped\n");
break;
}
dcb_printf(dcb,"\tSampling interval:\t%lu milliseconds\n", handle->interval);
dcb_printf(dcb,"\tMaxScale MonitorId:\t%lu\n", handle->id);
dcb_printf(dcb,"\tReplication lag:\t%s\n", (handle->replicationHeartbeat == 1) ? "enabled" : "disabled");
dcb_printf(dcb, "\tMonitored servers: ");
db = handle->databases;
sep = "";
while (db)
@ -278,18 +292,21 @@ char *sep;
/**
* Monitor an individual server
*
* @param database The database to probe
* @param defaultUser Default username for the monitor
* @param defaultPasswd Default password for the monitor
* @param handle The MySQL Monitor object
* @param database The database to probe
*/
static void
monitorDatabase(MONITOR_SERVERS *database, char *defaultUser, char *defaultPasswd)
monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database)
{
MYSQL_ROW row;
MYSQL_RES *result;
int num_fields;
int ismaster = 0, isslave = 0;
char *uname = defaultUser, *passwd = defaultPasswd;
char *uname = handle->defaultUser, *passwd = handle->defaultPasswd;
unsigned long int server_version = 0;
char *server_string;
unsigned long id = handle->id;
int replication_heartbeat = handle->replicationHeartbeat;
if (database->server->monuser != NULL)
{
@ -321,6 +338,34 @@ char *uname = defaultUser, *passwd = defaultPasswd;
/* If we get this far then we have a working connection */
server_set_status(database->server, SERVER_RUNNING);
/* get server version from current server */
server_version = mysql_get_server_version(database->con);
/* get server version string */
server_string = (char *)mysql_get_server_info(database->con);
if (server_string) {
database->server->server_string = strdup(server_string);
}
/* get server_id form current node */
if (mysql_query(database->con, "SELECT @@server_id") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
long server_id = -1;
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
server_id = strtol(row[0], NULL, 10);
if ((errno == ERANGE && (server_id == LONG_MAX
|| server_id == LONG_MIN)) || (errno != 0 && server_id == 0))
{
server_id = -1;
}
database->server->node_id = server_id;
}
mysql_free_result(result);
}
/* Check SHOW SLAVE HOSTS - if we get rows then we are a master */
if (mysql_query(database->con, "SHOW SLAVE HOSTS"))
{
@ -328,31 +373,228 @@ char *uname = defaultUser, *passwd = defaultPasswd;
{
/* Log lack of permission */
}
}
else if ((result = mysql_store_result(database->con)) != NULL)
{
database->server->rlag = -1;
} else if ((result = mysql_store_result(database->con)) != NULL) {
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
ismaster = 1;
}
mysql_free_result(result);
if (ismaster && replication_heartbeat == 1) {
time_t heartbeat;
time_t purge_time;
char heartbeat_insert_query[128]="";
char heartbeat_purge_query[128]="";
handle->master_id = database->server->node_id;
/* create the maxscale_schema database */
if (mysql_query(database->con, "CREATE DATABASE IF NOT EXISTS maxscale_schema")) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error creating maxscale_schema database in Master server"
": %s", mysql_error(database->con))));
database->server->rlag = -1;
}
/* create repl_heartbeat table in maxscale_schema database */
if (mysql_query(database->con, "CREATE TABLE IF NOT EXISTS "
"maxscale_schema.replication_heartbeat "
"(maxscale_id INT NOT NULL, "
"master_server_id INT NOT NULL, "
"master_timestamp INT UNSIGNED NOT NULL, "
"PRIMARY KEY ( master_server_id, maxscale_id ) ) "
"ENGINE=MYISAM DEFAULT CHARSET=latin1")) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error creating maxscale_schema.replication_heartbeat table in Master server"
": %s", mysql_error(database->con))));
database->server->rlag = -1;
}
/* auto purge old values after 48 hours*/
purge_time = time(0) - (3600 * 48);
sprintf(heartbeat_purge_query, "DELETE FROM maxscale_schema.replication_heartbeat WHERE master_timestamp < %lu", purge_time);
if (mysql_query(database->con, heartbeat_purge_query)) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error deleting from maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_purge_query,
mysql_error(database->con))));
}
heartbeat = time(0);
/* set node_ts for master as time(0) */
database->server->node_ts = heartbeat;
sprintf(heartbeat_insert_query, "UPDATE maxscale_schema.replication_heartbeat SET master_timestamp = %lu WHERE master_server_id = %i AND maxscale_id = %lu", heartbeat, handle->master_id, id);
/* Try to insert MaxScale timestamp into master */
if (mysql_query(database->con, heartbeat_insert_query)) {
database->server->rlag = -1;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error updating maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_insert_query,
mysql_error(database->con))));
} else {
if (mysql_affected_rows(database->con) == 0) {
heartbeat = time(0);
sprintf(heartbeat_insert_query, "REPLACE INTO maxscale_schema.replication_heartbeat (master_server_id, maxscale_id, master_timestamp ) VALUES ( %i, %lu, %lu)", handle->master_id, id, heartbeat);
if (mysql_query(database->con, heartbeat_insert_query)) {
database->server->rlag = -1;
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: Error inserting into maxscale_schema.replication_heartbeat table: [%s], %s",
heartbeat_insert_query,
mysql_error(database->con))));
} else {
/* Set replication lag to 0 for the master */
database->server->rlag = 0;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: heartbeat table inserted data for %s:%i", database->server->name, database->server->port)));
}
} else {
/* Set replication lag as 0 for the master */
database->server->rlag = 0;
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: heartbeat table updated for %s:%i", database->server->name, database->server->port)));
}
}
}
}
/* Check if the Slave_SQL_Running and Slave_IO_Running status is
* set to Yes
*/
if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
/* Check first for MariaDB 10.x.x and get status for multimaster replication */
if (server_version >= 100000) {
if (mysql_query(database->con, "SHOW ALL SLAVES STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
if (strncmp(row[10], "Yes", 3) == 0
&& strncmp(row[11], "Yes", 3) == 0)
int i = 0;
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
if (strncmp(row[12], "Yes", 3) == 0
&& strncmp(row[13], "Yes", 3) == 0) {
isslave += 1;
}
i++;
}
mysql_free_result(result);
if (isslave == i)
isslave = 1;
else
isslave = 0;
}
} else {
if (mysql_query(database->con, "SHOW SLAVE STATUS") == 0
&& (result = mysql_store_result(database->con)) != NULL)
{
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result)))
{
if (strncmp(row[10], "Yes", 3) == 0
&& strncmp(row[11], "Yes", 3) == 0)
isslave = 1;
}
mysql_free_result(result);
}
}
/* Get the master_timestamp value from maxscale_schema.replication_heartbeat table */
if (isslave && replication_heartbeat == 1) {
time_t heartbeat;
char select_heartbeat_query[256] = "";
sprintf(select_heartbeat_query, "SELECT master_timestamp "
"FROM maxscale_schema.replication_heartbeat "
"WHERE maxscale_id = %lu AND master_server_id = %i",
id, handle->master_id);
/* if there is a master then send the query to the slave with master_id*/
if (handle->master_id >= 0 && (mysql_query(database->con, select_heartbeat_query) == 0
&& (result = mysql_store_result(database->con)) != NULL)) {
num_fields = mysql_num_fields(result);
while ((row = mysql_fetch_row(result))) {
int rlag = -1;
time_t slave_read;
heartbeat = time(0);
slave_read = strtoul(row[0], NULL, 10);
if ((errno == ERANGE && (slave_read == LONG_MAX || slave_read == LONG_MIN)) || (errno != 0 && slave_read == 0)) {
slave_read = 0;
}
if (slave_read) {
/* set the replication lag */
rlag = heartbeat - slave_read;
}
/* set this node_ts as master_timestamp read from replication_heartbeat table */
database->server->node_ts = slave_read;
if (rlag >= 0) {
/* store rlag only if greater than monitor sampling interval */
database->server->rlag = (rlag > (handle->interval / 1000)) ? rlag : 0;
} else {
database->server->rlag = -1;
}
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"[mysql_mon]: replication heartbeat: "
"server %s:%i is %i seconds behind master",
database->server->name,
database->server->port,
database->server->rlag)));
}
mysql_free_result(result);
} else {
database->server->rlag = -1;
database->server->node_ts = 0;
if (handle->master_id < 0) {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: error: replication heartbeat: "
"master_server_id NOT available for %s:%i",
database->server->name,
database->server->port)));
} else {
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"[mysql_mon]: error: replication heartbeat: "
"failed selecting from hearthbeat table of %s:%i : [%s], %s",
database->server->name,
database->server->port,
select_heartbeat_query,
mysql_error(database->con))));
}
}
mysql_free_result(result);
}
if (ismaster)
@ -405,9 +647,48 @@ MONITOR_SERVERS *ptr;
ptr = handle->databases;
while (ptr)
{
monitorDatabase(ptr, handle->defaultUser, handle->defaultPasswd);
monitorDatabase(handle, ptr);
ptr = ptr->next;
}
thread_millisleep(10000);
thread_millisleep(handle->interval);
}
}
/**
* Set the default id to use in the monitor.
*
* @param arg The handle allocated by startMonitor
* @param id The id to set in monitor struct
*/
static void
defaultId(void *arg, unsigned long id)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->id, &id, sizeof(unsigned long));
}
/**
* Set the monitor sampling interval.
*
* @param arg The handle allocated by startMonitor
* @param interval The interval to set in monitor struct, in milliseconds
*/
static void
setInterval(void *arg, unsigned long interval)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->interval, &interval, sizeof(unsigned long));
}
/**
* Enable/Disable the MySQL Replication hearbeat, detecting slave lag behind master.
*
* @param arg The handle allocated by startMonitor
* @param replicationHeartbeat To enable it 1, disable it with 0
*/
static void
replicationHeartbeat(void *arg, int replicationHeartbeat)
{
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int));
}

View File

@ -27,8 +27,10 @@
* @verbatim
* Revision History
*
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* Date Who Description
* 08/07/13 Mark Riddoch Initial implementation
* 26/05/14 Massimiliano Pinto Default values for MONITOR_INTERVAL
* 28/05/14 Massimiliano Pinto Addition of new fields in MYSQL_MONITOR struct
*
* @endverbatim
*/
@ -54,6 +56,10 @@ typedef struct {
int status; /**< Monitor status */
char *defaultUser; /**< Default username for monitoring */
char *defaultPasswd; /**< Default password for monitoring */
unsigned long interval; /**< Monitor sampling interval */
unsigned long id; /**< Monitor ID */
int replicationHeartbeat; /**< Monitor flag for MySQL replication heartbeat */
int master_id; /**< Master server-id for MySQL Master/Slave replication */
MONITOR_SERVERS *databases; /**< Linked list of servers to monitor */
} MYSQL_MONITOR;
@ -61,4 +67,7 @@ typedef struct {
#define MONITOR_STOPPING 2
#define MONITOR_STOPPED 3
#define MONITOR_INTERVAL 10000 // in milliseconds
#define MONITOR_DEFAULT_ID 1UL // unsigned long value
#endif

View File

@ -282,7 +282,8 @@ static int gw_read_backend_event(DCB *dcb) {
} /* switch */
}
if (backend_protocol->state == MYSQL_AUTH_FAILED) {
if (backend_protocol->state == MYSQL_AUTH_FAILED)
{
/**
* protocol state won't change anymore,
* lock can be freed
@ -311,12 +312,14 @@ static int gw_read_backend_event(DCB *dcb) {
/* try reload users' table for next connection */
service_refresh_users(dcb->session->client->service);
while (session->state != SESSION_STATE_ROUTER_READY)
while (session->state != SESSION_STATE_ROUTER_READY &&
session->state != SESSION_STATE_STOPPING)
{
ss_dassert(
session->state == SESSION_STATE_READY ||
session->state ==
SESSION_STATE_ROUTER_READY);
SESSION_STATE_ROUTER_READY ||
session->state == SESSION_STATE_STOPPING);
/**
* Session shouldn't be NULL at this point
* anymore. Just checking..
@ -328,6 +331,15 @@ static int gw_read_backend_event(DCB *dcb) {
}
usleep(1);
}
if (session->state == SESSION_STATE_STOPPING)
{
goto return_rc;
}
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
/**
* rsession shouldn't be NULL since session
* state indicates that it was initialized
@ -362,8 +374,7 @@ static int gw_read_backend_event(DCB *dcb) {
/* check the delay queue and flush the data */
if (dcb->delayq)
{
backend_write_delayqueue(dcb);
rc = 1;
rc = backend_write_delayqueue(dcb);
goto return_with_lock;
}
}
@ -419,21 +430,23 @@ static int gw_read_backend_event(DCB *dcb) {
if (dcb->session->client != NULL) {
client_protocol = SESSION_PROTOCOL(dcb->session,
MySQLProtocol);
}
if (client_protocol != NULL) {
CHK_PROTOCOL(client_protocol);
if (client_protocol != NULL) {
CHK_PROTOCOL(client_protocol);
if (client_protocol->state == MYSQL_IDLE)
{
router->clientReply(router_instance,
if (client_protocol->state == MYSQL_IDLE)
{
router->clientReply(router_instance,
rsession,
writebuf,
dcb);
rc = 1;
}
goto return_rc;
}
rc = 1;
}
goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
router->clientReply(router_instance, rsession, writebuf, dcb);
rc = 1;
}
}
}
return_rc:
@ -572,9 +585,8 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
snprintf(str, len+1, "%s", startpoint);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing query \"%s\" failed due to "
"authentication failure.",
str)));
"Error : Unable to write to backend due to "
"authentication failure.")));
/** Consume query buffer */
while ((queue = gwbuf_consume(
queue,
@ -672,6 +684,10 @@ static int gw_error_backend_event(DCB *dcb) {
if (session->state == SESSION_STATE_ROUTER_READY)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
rsession = session->router_session;
/*<
* rsession should never be NULL here.
@ -852,34 +868,36 @@ static int backend_write_delayqueue(DCB *dcb)
spinlock_acquire(&dcb->delayqlock);
if (dcb->delayq == NULL)
{
spinlock_release(&dcb->delayqlock);
rc = 1;
}
else
{
localq = dcb->delayq;
dcb->delayq = NULL;
spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq);
}
if (rc == 0) {
/*< vraa : errorHandle */
/**
* This error can be muted because it is often due
* unexpected dcb state which means that concurrent thread
* already wrote the queue and closed dcb.
*/
#if 0
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [backend_write_delayqueue] Some error occurred in "
"backend.",
pthread_self())));
#endif
"Error : failed to write buffered data to back-end "
"server. Buffer was empty of back-end was disconnected "
"during operation.")));
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Unable to write to backend server. Connection was "
"closed.");
"Failed to write buffered data to back-end server. "
"Buffer was empty or back-end was disconnected during "
"operation.");
dcb_close(dcb);
}
return rc;
}

View File

@ -552,7 +552,7 @@ int gw_read_client_event(DCB* dcb) {
goto return_rc;
}
// close client socket and the sessioA too
// close client socket and the session too
dcb->func.close(dcb);
} else {
// do nothing if reading 1 byte
@ -723,7 +723,7 @@ int gw_read_client_event(DCB* dcb) {
dcb,
1,
0,
"Query routing failed. Connection to "
"Can't route query. Connection to "
"backend lost");
protocol->state = MYSQL_IDLE;
}
@ -772,12 +772,8 @@ int gw_read_client_event(DCB* dcb) {
"backend. Close client dcb %p",
pthread_self(),
dcb)));
/** close client connection */
(dcb->func).close(dcb);
/** close backends connection */
router->closeSession(router_instance, rsession);
rc = 1;
/** close client connection, closes router session too */
rc = dcb->func.close(dcb);
}
else
{
@ -1263,37 +1259,16 @@ return_rc:
return rc;
}
static int gw_error_client_event(DCB *dcb) {
SESSION* session;
ROUTER_OBJECT* router;
void* router_instance;
void* rsession;
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE)
static int gw_error_client_event(
DCB* dcb)
{
CHK_PROTOCOL(protocol);
}
#endif
int rc;
session = dcb->session;
CHK_DCB(dcb);
/**
* session may be NULL if session_alloc failed.
* In that case router session was not created.
*/
if (session != NULL) {
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
return 1;
rc = dcb->func.close(dcb);
return rc;
}
static int
@ -1320,6 +1295,10 @@ gw_client_close(DCB *dcb)
*/
if (session != NULL) {
CHK_SESSION(session);
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
@ -1342,42 +1321,11 @@ gw_client_close(DCB *dcb)
static int
gw_client_hangup_event(DCB *dcb)
{
SESSION* session;
ROUTER_OBJECT* router;
void* router_instance;
void* rsession;
int rc = 1;
int rc;
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE)
{
CHK_PROTOCOL(protocol);
}
#endif
CHK_DCB(dcb);
rc = dcb->func.close(dcb);
if (dcb->state != DCB_STATE_POLLING) {
goto return_rc;
}
session = dcb->session;
/**
* session may be NULL if session_alloc failed.
* In that case router session was not created.
*/
if (session != NULL) {
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
return_rc:
return rc;
}

View File

@ -45,7 +45,7 @@
extern int lm_enabled_logfiles_bitmask;
static char *version_str = "V1.0.1";
static char *version_str = "V1.1.1";
/* The router entry points */
static ROUTER *createInstance(SERVICE *service, char **options);
@ -127,6 +127,7 @@ static ROUTER *
createInstance(SERVICE *service, char **options)
{
CLI_INSTANCE *inst;
int i;
if ((inst = malloc(sizeof(CLI_INSTANCE))) == NULL)
return NULL;
@ -134,7 +135,29 @@ CLI_INSTANCE *inst;
inst->service = service;
spinlock_init(&inst->lock);
inst->sessions = NULL;
inst->mode = CLIM_USER;
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "developer"))
{
inst->mode = CLIM_DEVELOPER;
}
else if (!strcasecmp(options[i], "user"))
{
inst->mode = CLIM_USER;
}
else
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Unknown option for CLI '%s'\n",
options[i])));
}
}
}
/*
* We have completed the creation of the instance data, so now
@ -176,11 +199,15 @@ CLI_SESSION *client;
spinlock_release(&inst->lock);
session->state = SESSION_STATE_READY;
client->mode = inst->mode;
dcb_printf(session->client, "Welcome the SkySQL MaxScale Debug Interface (%s).\n",
version_str);
dcb_printf(session->client, "WARNING: This interface is meant for developer usage,\n");
dcb_printf(session->client, "passing incorrect addresses to commands can endanger your MaxScale server.\n\n");
if (client->mode == CLIM_DEVELOPER)
{
dcb_printf(session->client, "WARNING: This interface is meant for developer usage,\n");
dcb_printf(session->client, "passing incorrect addresses to commands can endanger your MaxScale server.\n\n");
}
dcb_printf(session->client, "Type help for a list of available commands.\n\n");
return (void *)client;
@ -281,4 +308,4 @@ static uint8_t getCapabilities(
void* router_session)
{
return 0;
}
}

View File

@ -36,7 +36,10 @@
* Date Who Description
* 20/06/13 Mark Riddoch Initial implementation
* 17/07/13 Mark Riddoch Additional commands
* 09/08/2013 Massimiliano Pinto Addes enable/disable commands (now only for log)
* 09/08/2013 Massimiliano Pinto Added enable/disable commands (now only for log)
* 20/05/14 Mark Riddoch Added ability to give server and service names rather
* than simply addresses
* 23/05/14 Mark Riddoch Added support for developer and user modes
*
* @endverbatim
*/
@ -69,6 +72,12 @@
#define ARG_TYPE_ADDRESS 1
#define ARG_TYPE_STRING 2
#define ARG_TYPE_SERVICE 3
#define ARG_TYPE_SERVER 4
#define ARG_TYPE_DBUSERS 5
#define ARG_TYPE_SESSION 6
#define ARG_TYPE_DCB 7
#define ARG_TYPE_MONITOR 8
/**
* The subcommand structure
*
@ -79,6 +88,7 @@ struct subcommand {
int n_args;
void (*fn)();
char *help;
char *devhelp;
int arg_types[3];
};
@ -87,33 +97,87 @@ static void telnetdShowUsers(DCB *);
* The subcommands of the show command
*/
struct subcommand showoptions[] = {
{ "dcbs", 0, dprintAllDCBs, "Show all descriptor control blocks (network connections)",
{ "dcbs", 0, dprintAllDCBs,
"Show all descriptor control blocks (network connections)",
"Show all descriptor control blocks (network connections)",
{0, 0, 0} },
{ "dcb", 1, dprintDCB, "Show a single descriptor control block e.g. show dcb 0x493340",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "dbusers", 1, dcb_usersPrint, "Show statistics and user names for a service's user table.\n\t\tExample : show dbusers <ptr of 'User's data' from services list>",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "epoll", 0, dprintPollStats, "Show the poll statistics",
{ "dcb", 1, dprintDCB,
"Show a single descriptor control block e.g. show dcb 0x493340",
"Show a single descriptor control block e.g. show dcb 0x493340",
{ARG_TYPE_DCB, 0, 0} },
{ "dbusers", 1, dcb_usersPrint,
"Show statistics and user names for a service's user table.\n\t\tExample : show dbusers <service name>",
"Show statistics and user names for a service's user table.\n\t\tExample : show dbusers <ptr of 'User's data' from services list>|<service name>",
{ARG_TYPE_DBUSERS, 0, 0} },
{ "epoll", 0, dprintPollStats,
"Show the poll statistics",
"Show the poll statistics",
{0, 0, 0} },
{ "modules", 0, dprintAllModules, "Show all currently loaded modules",
{ "modules", 0, dprintAllModules,
"Show all currently loaded modules",
"Show all currently loaded modules",
{0, 0, 0} },
{ "monitors", 0, monitorShowAll, "Show the monitors that are configured",
{ "monitors", 0, monitorShowAll,
"Show the monitors that are configured",
"Show the monitors that are configured",
{0, 0, 0} },
{ "server", 1, dprintServer, "Show details for a server, e.g. show server 0x485390",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "servers", 0, dprintAllServers, "Show all configured servers",
{ "server", 1, dprintServer,
"Show details for a named server, e.g. show server dbnode1",
"Show details for a server, e.g. show server 0x485390. The address may also be repalced with the server name from the configuration file",
{ARG_TYPE_SERVER, 0, 0} },
{ "servers", 0, dprintAllServers,
"Show all configured servers",
"Show all configured servers",
{0, 0, 0} },
{ "services", 0, dprintAllServices, "Show all configured services in MaxScale",
{ "services", 0, dprintAllServices,
"Show all configured services in MaxScale",
"Show all configured services in MaxScale",
{0, 0, 0} },
{ "service", 1, dprintService, "Show single service in MaxScale",
{ "service", 1, dprintService,
"Show a single service in MaxScale, may be passed a service name",
"Show a single service in MaxScale, may be passed a service name or address of a service object",
{ARG_TYPE_SERVICE, 0, 0} },
{ "session", 1, dprintSession, "Show a single session in MaxScale, e.g. show session 0x284830",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "sessions", 0, dprintAllSessions, "Show all active sessions in MaxScale",
{ "session", 1, dprintSession,
"Show a single session in MaxScale, e.g. show session 0x284830",
"Show a single session in MaxScale, e.g. show session 0x284830",
{ARG_TYPE_SESSION, 0, 0} },
{ "sessions", 0, dprintAllSessions,
"Show all active sessions in MaxScale",
"Show all active sessions in MaxScale",
{0, 0, 0} },
{ "users", 0, telnetdShowUsers, "Show statistics and user names for the debug interface",
{ARG_TYPE_ADDRESS, 0, 0} },
{ NULL, 0, NULL, NULL,
{ "users", 0, telnetdShowUsers,
"Show statistics and user names for the debug interface",
"Show statistics and user names for the debug interface",
{0, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
/**
* The subcommands of the list command
*/
struct subcommand listoptions[] = {
{ "listeners", 0, dListListeners,
"List all the listeners defined within MaxScale",
"List all the listeners defined within MaxScale",
{0, 0, 0} },
{ "modules", 0, dprintAllModules,
"Show all currently loaded modules",
"Show all currently loaded modules",
{0, 0, 0} },
{ "services", 0, dListServices,
"List all the services defined within MaxScale",
"List all the services defined within MaxScale",
{0, 0, 0} },
{ "servers", 0, dListServers,
"List all the servers defined within MaxScale",
"List all the servers defined within MaxScale",
{0, 0, 0} },
{ "sessions", 0, dListSessions,
"List all the active sessions within MaxScale",
"List all the active sessions within MaxScale",
{0, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -129,7 +193,7 @@ struct subcommand shutdownoptions[] = {
0,
shutdown_server,
"Shutdown MaxScale",
"Shutdown MaxScale",
{0, 0, 0}
},
{
@ -137,13 +201,15 @@ struct subcommand shutdownoptions[] = {
1,
shutdown_monitor,
"Shutdown a monitor, e.g. shutdown monitor 0x48381e0",
{ARG_TYPE_ADDRESS, 0, 0}
"Shutdown a monitor, e.g. shutdown monitor 0x48381e0",
{ARG_TYPE_MONITOR, 0, 0}
},
{
"service",
1,
shutdown_service,
"Shutdown a service, e.g. shutdown service 0x4838320",
"Shutdown a service, e.g. shutdown service \"Sales Database\"",
"Shutdown a service, e.g. shutdown service 0x4838320 or shutdown service \"Sales Database\"",
{ARG_TYPE_SERVICE, 0, 0}
},
{
@ -151,6 +217,7 @@ struct subcommand shutdownoptions[] = {
0,
NULL,
NULL,
NULL,
{0, 0, 0}
}
};
@ -162,11 +229,15 @@ static void restart_monitor(DCB *dcb, MONITOR *monitor);
* The subcommands of the restart command
*/
struct subcommand restartoptions[] = {
{ "monitor", 1, restart_monitor, "Restart a monitor, e.g. restart monitor 0x48181e0",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "service", 1, restart_service, "Restart a service, e.g. restart service name",
{ "monitor", 1, restart_monitor,
"Restart a monitor, e.g. restart monitor 0x48181e0",
"Restart a monitor, e.g. restart monitor 0x48181e0",
{ARG_TYPE_MONITOR, 0, 0} },
{ "service", 1, restart_service,
"Restart a service, e.g. restart service \"Test Service\"",
"Restart a service, e.g. restart service 0x4838320",
{ARG_TYPE_SERVICE, 0, 0} },
{ NULL, 0, NULL, NULL,
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -175,9 +246,11 @@ static void set_server(DCB *dcb, SERVER *server, char *bit);
* The subcommands of the set command
*/
struct subcommand setoptions[] = {
{ "server", 2, set_server, "Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL,
{ "server", 2, set_server,
"Set the status of a server. E.g. set server dbnode4 master",
"Set the status of a server. E.g. set server 0x4838320 master",
{ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -186,9 +259,11 @@ static void clear_server(DCB *dcb, SERVER *server, char *bit);
* The subcommands of the clear command
*/
struct subcommand clearoptions[] = {
{ "server", 2, clear_server, "Clear the status of a server. E.g. clear server 0x4838320 master",
{ARG_TYPE_ADDRESS, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL,
{ "server", 2, clear_server,
"Clear the status of a server. E.g. clear server dbnode2 master",
"Clear the status of a server. E.g. clear server 0x4838320 master",
{ARG_TYPE_SERVER, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -199,11 +274,15 @@ static void reload_config(DCB *dcb);
* The subcommands of the reload command
*/
struct subcommand reloadoptions[] = {
{ "config", 0, reload_config, "Reload the configuration data for MaxScale.",
{ARG_TYPE_ADDRESS, 0, 0} },
{ "dbusers", 1, reload_dbusers, "Reload the dbuser data for a service. E.g. reload dbusers 0x849420",
{ARG_TYPE_ADDRESS, 0, 0} },
{ NULL, 0, NULL, NULL,
{ "config", 0, reload_config,
"Reload the configuration data for MaxScale.",
"Reload the configuration data for MaxScale.",
{0, 0, 0} },
{ "dbusers", 1, reload_dbusers,
"Reload the dbuser data for a service. E.g. reload dbusers \"splitter service\"",
"Reload the dbuser data for a service. E.g. reload dbusers 0x849420",
{ARG_TYPE_DBUSERS, 0, 0} },
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -220,6 +299,8 @@ struct subcommand enableoptions[] = {
enable_log_action,
"Enable Log options for MaxScale, options trace | error | "
"message E.g. enable log message.",
"Enable Log options for MaxScale, options trace | error | "
"message E.g. enable log message.",
{ARG_TYPE_STRING, 0, 0}
},
{
@ -227,6 +308,7 @@ struct subcommand enableoptions[] = {
0,
NULL,
NULL,
NULL,
{0, 0, 0}
}
};
@ -242,6 +324,8 @@ struct subcommand disableoptions[] = {
disable_log_action,
"Disable Log for MaxScale, Options: debug | trace | error | message "
"E.g. disable log debug",
"Disable Log for MaxScale, Options: debug | trace | error | message "
"E.g. disable log debug",
{ARG_TYPE_STRING, 0, 0}
},
{
@ -249,6 +333,7 @@ struct subcommand disableoptions[] = {
0,
NULL,
NULL,
NULL,
{0, 0, 0}
}
};
@ -267,6 +352,7 @@ struct subcommand failoptions[] = {
0,
fail_backendfd,
"Fail backend socket for next operation.",
"Fail backend socket for next operation.",
{ARG_TYPE_STRING, 0, 0}
},
{
@ -274,6 +360,7 @@ struct subcommand failoptions[] = {
0,
fail_clientfd,
"Fail client socket for next operation.",
"Fail client socket for next operation.",
{ARG_TYPE_STRING, 0, 0}
},
{
@ -281,6 +368,7 @@ struct subcommand failoptions[] = {
2,
fail_accept,
"Fail to accept next client connection.",
"Fail to accept next client connection.",
{ARG_TYPE_STRING, ARG_TYPE_STRING, 0}
},
{
@ -288,6 +376,7 @@ struct subcommand failoptions[] = {
0,
NULL,
NULL,
NULL,
{0, 0, 0}
}
};
@ -298,9 +387,11 @@ static void telnetdAddUser(DCB *, char *, char *);
* The subcommands of the add command
*/
struct subcommand addoptions[] = {
{ "user", 2, telnetdAddUser, "Add a new user for the debug interface. E.g. add user john today",
{ "user", 2, telnetdAddUser,
"Add a new user for the debug interface. E.g. add user john today",
"Add a new user for the debug interface. E.g. add user john today",
{ARG_TYPE_STRING, ARG_TYPE_STRING, 0} },
{ NULL, 0, NULL, NULL,
{ NULL, 0, NULL, NULL, NULL,
{0, 0, 0} }
};
@ -315,10 +406,11 @@ struct subcommand removeoptions[] = {
2,
telnetdRemoveUser,
"Remove existing maxscale user. Example : remove user john johnpwd",
"Remove existing maxscale user. Example : remove user john johnpwd",
{ARG_TYPE_STRING, ARG_TYPE_STRING, 0}
},
{
NULL, 0, NULL, NULL, {0, 0, 0}
NULL, 0, NULL, NULL, NULL, {0, 0, 0}
}
};
@ -332,17 +424,18 @@ static struct {
} cmds[] = {
{ "add", addoptions },
{ "clear", clearoptions },
{ "disable", disableoptions },
{ "enable", enableoptions },
#if defined(SS_DEBUG)
{ "fail", failoptions },
#endif
{ "list", listoptions },
{ "reload", reloadoptions },
{ "remove", removeoptions },
{ "restart", restartoptions },
{ "set", setoptions },
{ "show", showoptions },
{ "shutdown", shutdownoptions },
{ "reload", reloadoptions },
{ "enable", enableoptions },
{ "disable", disableoptions },
#if defined(SS_DEBUG)
{ "fail", failoptions },
#endif
{ NULL, NULL }
};
@ -351,25 +444,55 @@ static struct {
* Convert a string argument to a numeric, observing prefixes
* for number bases, e.g. 0x for hex, 0 for octal
*
* @param mode The CLI mode
* @param arg The string representation of the argument
* @param arg_type The target type for the argument
* @return The argument as a long integer
*/
static unsigned long
convert_arg(char *arg, int arg_type)
convert_arg(int mode, char *arg, int arg_type)
{
unsigned long rval;
SERVICE *service;
switch (arg_type)
{
case ARG_TYPE_SERVICE:
if ((rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)service_find(arg);
return rval;
case ARG_TYPE_ADDRESS:
return (unsigned long)strtol(arg, NULL, 0);
case ARG_TYPE_STRING:
return (unsigned long)arg;
case ARG_TYPE_SERVICE:
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)service_find(arg);
return rval;
case ARG_TYPE_SERVER:
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)server_find_by_unique_name(arg);
return rval;
case ARG_TYPE_DBUSERS:
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
{
service = service_find(arg);
if (service)
return (unsigned long)(service->users);
else
return 0;
}
return rval;
case ARG_TYPE_DCB:
rval = (unsigned long)strtol(arg, NULL, 0);
if (mode == CLIM_USER && dcb_isvalid((DCB *)rval) == 0)
rval = 0;
return rval;
case ARG_TYPE_SESSION:
rval = (unsigned long)strtol(arg, NULL, 0);
if (mode == CLIM_USER && session_isvalid((SESSION *)rval) == 0)
rval = 0;
return rval;
case ARG_TYPE_MONITOR:
if (mode == CLIM_USER || (rval = (unsigned long)strtol(arg, NULL, 0)) == 0)
rval = (unsigned long)monitor_find(arg);
return rval;
}
return 0;
}
@ -393,36 +516,96 @@ execute_cmd(CLI_SESSION *cli)
{
DCB *dcb = cli->session->client;
int argc, i, j, found = 0;
char *args[MAXARGS];
char *saveptr, *delim = " \t\r\n";
char *args[MAXARGS + 1];
unsigned long arg1, arg2, arg3;
int in_quotes = 0, escape_next = 0;
char *ptr, *lptr;
/* Tokenize the input string */
args[0] = strtok_r(cli->cmdbuf, delim, &saveptr);
args[0] = cli->cmdbuf;
ptr = args[0];
lptr = ptr;
i = 0;
do {
i++;
args[i] = strtok_r(NULL, delim, &saveptr);
} while (args[i] != NULL && i < MAXARGS);
/*
* Break the command line into a number of words. Whitespace is used
* to delimit words and may be escaped by use of the \ character or
* the use of double quotes.
* The array args contains the broken down words, one per index.
*/
while (*ptr)
{
if (escape_next)
{
*lptr++ = *ptr++;
escape_next = 0;
}
else if (*ptr == '\\')
{
escape_next = 1;
ptr++;
}
else if (in_quotes == 0 && (*ptr == ' ' || *ptr == '\t' || *ptr == '\r' || *ptr == '\n'))
{
*lptr = 0;
if (args[i] == ptr)
args[i] = ptr + 1;
else
{
i++;
if (i >= MAXARGS)
break;
args[i] = ptr + 1;
}
ptr++;
lptr++;
}
else if (*ptr == '\"' && in_quotes == 0)
{
in_quotes = 1;
ptr++;
}
else if (*ptr == '\"' && in_quotes == 1)
{
in_quotes = 0;
ptr++;
}
else
{
*lptr++ = *ptr++;
}
}
*lptr = 0;
args[i+1] = NULL;
if (args[0] == NULL)
if (args[0] == NULL || *args[0] == 0)
return 1;
argc = i - 2; /* The number of extra arguments to commands */
if (!strcasecmp(args[0], "help"))
{
if (args[1] == NULL)
if (args[1] == NULL || *args[1] == 0)
{
found = 1;
dcb_printf(dcb, "Available commands:\n");
for (i = 0; cmds[i].cmd; i++)
{
for (j = 0; cmds[i].options[j].arg1; j++)
if (cmds[i].options[1].arg1 == NULL)
dcb_printf(dcb, " %s %s\n", cmds[i].cmd, cmds[i].options[0].arg1);
else
{
dcb_printf(dcb, " %s %s\n", cmds[i].cmd, cmds[i].options[j].arg1);
dcb_printf(dcb, " %s [", cmds[i].cmd);
for (j = 0; cmds[i].options[j].arg1; j++)
{
dcb_printf(dcb, "%s%s", cmds[i].options[j].arg1,
cmds[i].options[j+1].arg1 ? "|" : "");
}
dcb_printf(dcb, "]\n");
}
}
dcb_printf(dcb, "\nType help command to see details of each command.\n");
dcb_printf(dcb, "Where commands require names as arguments and these names contain\n");
dcb_printf(dcb, "whitespace either the \\ character may be used to escape the whitespace\n");
dcb_printf(dcb, "or the name may be enclosed in double quotes \".\n\n");
}
else
{
@ -458,9 +641,9 @@ unsigned long arg1, arg2, arg3;
{
for (j = 0; cmds[i].options[j].arg1; j++)
{
found = 1; /**< command and sub-command match */
if (strcasecmp(args[1], cmds[i].options[j].arg1) == 0)
{
found = 1; /**< command and sub-command match */
if (argc != cmds[i].options[j].n_args)
{
dcb_printf(dcb, "Incorrect number of arguments: %s %s expects %d arguments\n",
@ -476,7 +659,7 @@ unsigned long arg1, arg2, arg3;
cmds[i].options[j].fn(dcb);
break;
case 1:
arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]);
arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]);
if (arg1)
cmds[i].options[j].fn(dcb, arg1);
else
@ -484,8 +667,8 @@ unsigned long arg1, arg2, arg3;
args[2]);
break;
case 2:
arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]);
arg2 = convert_arg(args[3],cmds[i].options[j].arg_types[1]);
arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]);
arg2 = convert_arg(cli->mode, args[3],cmds[i].options[j].arg_types[1]);
if (arg1 && arg2)
cmds[i].options[j].fn(dcb, arg1, arg2);
else if (arg1 == 0)
@ -496,9 +679,9 @@ unsigned long arg1, arg2, arg3;
args[3]);
break;
case 3:
arg1 = convert_arg(args[2],cmds[i].options[j].arg_types[0]);
arg2 = convert_arg(args[3],cmds[i].options[j].arg_types[1]);
arg3 = convert_arg(args[4],cmds[i].options[j].arg_types[2]);
arg1 = convert_arg(cli->mode, args[2],cmds[i].options[j].arg_types[0]);
arg2 = convert_arg(cli->mode, args[3],cmds[i].options[j].arg_types[1]);
arg3 = convert_arg(cli->mode, args[4],cmds[i].options[j].arg_types[2]);
if (arg1 && arg2 && arg3)
cmds[i].options[j].fn(dcb, arg1, arg2, arg3);
else if (arg1 == 0)

View File

@ -252,10 +252,12 @@ int i, n;
}
else
{
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Warning : Unsupported router "
"option %s for readconnroute.",
LOGIF(LM, (skygw_log_write(
LOGFILE_MESSAGE,
"* Warning : Unsupported router "
"option \'%s\' for readconnroute. "
"Expected router options are "
"[slave|master|synced]",
options[i])));
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,9 @@
use test;
drop table if exists t1;
create table t1 (id integer);
set autocommit=0; -- open transaction
begin;
insert into t1 values(1); -- write to master
commit;
select count(*) from t1; -- read from master since autocommit is disabled
drop table t1;

View File

@ -0,0 +1,41 @@
# cleantests - clean local and subdirectories' tests
# buildtests - build all local and subdirectories' tests
# runtests - run all local tests
# testall - clean, build and run local and subdirectories' tests
include ../../../../build_gateway.inc
include $(ROOT_PATH)/makefile.inc
include $(ROOT_PATH)/test.inc
CC=cc
TESTLOG := $(shell pwd)/testrouting.log
RET := -1
cleantests:
- $(DEL) *.o
- $(DEL) *~
testall:
-$(MAKE) cleantests
-$(MAKE) DEBUG=Y buildtests
-$(MAKE) runtests
@echo "" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo $(shell date) >> $(TESTLOG)
@echo "Test Read/Write Split Router" >> $(TESTLOG)
$(MAKE) -C $(ROOT_PATH)/server/modules/routing/readwritesplit testall
buildtests:
$(MAKE) -C $(ROOT_PATH)/server/modules/routing/readwritesplit buildtests
runtests:
@echo "" > $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo $(shell date) >> $(TESTLOG)
@echo "Test routing" >> $(TESTLOG)
@echo "-------------------------------" >> $(TESTLOG)
@echo "Nothing to run here so far" >> $(TESTLOG)
@cat $(TESTLOG) >> $(TEST_MAXSCALE_LOG)

View File

@ -119,7 +119,10 @@ typedef enum skygw_chk_t {
CHK_NUM_SESSION,
CHK_NUM_ROUTER_SES,
CHK_NUM_MY_SESCMD,
CHK_NUM_ROUTER_PROPERTY
CHK_NUM_ROUTER_PROPERTY,
CHK_NUM_SESCMD_CUR,
CHK_NUM_BACKEND,
CHK_NUM_BACKEND_REF
} skygw_chk_t;
# define STRBOOL(b) ((b) ? "true" : "false")
@ -221,6 +224,11 @@ typedef enum skygw_chk_t {
((t) == BE_UNDEFINED ? "BE_UNDEFINED" : \
"Unknown backend tpe")))
#define STRCRITERIA(c) ((c) == UNDEFINED_CRITERIA ? "UNDEFINED_CRITERIA" : \
((c) == LEAST_GLOBAL_CONNECTIONS ? "LEAST_GLOBAL_CONNECTIONS" : \
((c) == LEAST_ROUTER_CONNECTIONS ? "LEAST_ROUTER_CONNECTIONS" : \
((c) == LEAST_BEHIND_MASTER ? "LEAST_BEHIND_MASTER" : "Unknown criteria"))))
#define CHK_MLIST(l) { \
ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \
l->mlist_chk_tail == CHK_NUM_MLIST), \
@ -446,7 +454,25 @@ typedef enum skygw_chk_t {
"Session command has invalid check fields"); \
}
#define CHK_SESCMD_CUR(c) { \
ss_info_dassert((c)->scmd_cur_chk_top == CHK_NUM_SESCMD_CUR && \
(c)->scmd_cur_chk_tail == CHK_NUM_SESCMD_CUR, \
"Session command cursor has invalid check fields"); \
}
#define CHK_BACKEND(b) { \
ss_info_dassert((b)->be_chk_top == CHK_NUM_BACKEND && \
(b)->be_chk_tail == CHK_NUM_BACKEND, \
"BACKEND has invalid check fields"); \
}
#define CHK_BACKEND_REF(r) { \
ss_info_dassert((r)->bref_chk_top == CHK_NUM_BACKEND_REF && \
(r)->bref_chk_tail == CHK_NUM_BACKEND_REF, \
"Backend reference has invalid check fields"); \
}
#if defined(SS_DEBUG)
bool conn_open[10240];
#endif