merge from branch 'cenh'

merge from branch ‘cenh’
This commit is contained in:
MassimilianoPinto
2014-05-26 12:15:34 +02:00
21 changed files with 778 additions and 124 deletions

View File

@ -29,7 +29,8 @@
* 06/02/14 Massimiliano Pinto Added support for enable/disable root user in services
* 14/02/14 Massimiliano Pinto Added enable_root_user in the service_params list
* 11/03/14 Massimiliano Pinto Added Unix socket support
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 11/05/14 Massimiliano Pinto Added version_string support to service
* 19/05/14 Mark Riddoch Added unique names from section headers
*
* @endverbatim
*/
@ -129,7 +130,6 @@ int rval;
if (ptr) {
*ptr = '\0';
}
}
mysql_close(conn);
}
@ -226,7 +226,7 @@ int error_count = 0;
config_get_value(obj->parameters, "passwd");
char *enable_root_user =
config_get_value(obj->parameters, "enable_root_user");
char *version_string = config_get_value(obj->parameters, "version_string");
if (obj->element == NULL) /*< if module load failed */
@ -241,7 +241,7 @@ int error_count = 0;
obj = obj->next;
continue; /*< process next obj */
}
if (version_string) {
((SERVICE *)(obj->element))->version_string = strdup(version_string);
} else {
@ -333,6 +333,7 @@ int error_count = 0;
obj->element = server_alloc(address,
protocol,
atoi(port));
server_set_unique_name(obj->element, obj->object);
}
else
{

View File

@ -183,6 +183,8 @@ getUsers(SERVICE *service, struct users *users)
}
serviceGetUser(service, &service_user, &service_passwd);
if (service_user == NULL || service_passwd == NULL)
return -1;
/** multi-thread environment requires that thread init succeeds. */
if (mysql_thread_init()) {

View File

@ -47,6 +47,7 @@
* error and 0 bytes to read.
* This fixes a bug with many reads from
* backend
* 07/05/2014 Mark Riddoch Addition of callback mechanism
*
* @endverbatim
*/
@ -80,6 +81,7 @@ static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
DCB* dcb_get_zombies(void)
{
@ -94,8 +96,8 @@ DCB* dcb_get_zombies(void)
*
* @return A newly allocated DCB or NULL if non could be allocated.
*/
DCB * dcb_alloc(
dcb_role_t role)
DCB *
dcb_alloc(dcb_role_t role)
{
DCB *rval;
@ -118,11 +120,16 @@ DCB *rval;
spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock);
spinlock_init(&rval->authlock);
spinlock_init(&rval->cb_lock);
rval->fd = -1;
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
rval->state = DCB_STATE_ALLOC;
bitmask_init(&rval->memdata.bitmask);
rval->writeqlen = 0;
rval->high_water = 0;
rval->low_water = 0;
rval->next = NULL;
rval->callbacks = NULL;
spinlock_acquire(&dcbspin);
if (allDCBs == NULL)
@ -248,6 +255,8 @@ dcb_add_to_zombieslist(DCB *dcb)
static void
dcb_final_free(DCB *dcb)
{
DCB_CALLBACK *cb;
CHK_DCB(dcb);
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED,
"dcb not in DCB_STATE_DISCONNECTED state.");
@ -307,6 +316,19 @@ dcb_final_free(DCB *dcb)
GWBUF *queue = dcb->delayq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
if (dcb->dcb_readqueue)
{
GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL);
}
spinlock_acquire(&dcb->cb_lock);
while ((cb = dcb->callbacks) != NULL)
{
dcb->callbacks = cb->next;
free(cb);
}
spinlock_release(&dcb->cb_lock);
if (dcb->dcb_readqueue)
{
@ -688,9 +710,11 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w;
int saved_errno = 0;
int w, qlen;
int saved_errno = 0;
int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
ss_dassert(queue != NULL);
/**
@ -734,6 +758,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
@ -846,6 +872,8 @@ dcb_write(DCB *dcb, GWBUF *queue)
* for suspended write.
*/
dcb->writeq = queue;
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
@ -869,6 +897,13 @@ dcb_write(DCB *dcb, GWBUF *queue)
return 0;
}
spinlock_release(&dcb->writeqlock);
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
{
atomic_add(&dcb->stats.n_high_water, 1);
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
}
return 1;
}
@ -883,9 +918,12 @@ dcb_write(DCB *dcb, GWBUF *queue)
int
dcb_drain_writeq(DCB *dcb)
{
int n = 0;
int w;
int saved_errno = 0;
int n = 0;
int w;
int saved_errno = 0;
int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
@ -946,6 +984,17 @@ int saved_errno = 0;
}
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -988,6 +1037,8 @@ dcb_close(DCB *dcb)
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -1030,6 +1081,8 @@ printDCB(DCB *dcb)
printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
printf("\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
printf("\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1076,6 +1129,8 @@ DCB *dcb;
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
dcb = dcb->next;
}
spinlock_release(&dcbspin);
@ -1101,6 +1156,8 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
}
/**
@ -1437,4 +1494,163 @@ int gw_write(
return w;
}
/**
* Add a callback
*
* Duplicate registrations are not allowed, therefore an error will be returned if
* the specific function, reason and userdata triple are already registered.
* An error will also be returned if the is insufficient memeory available to
* create the registration.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL)
{
return 0;
}
ptr->reason = reason;
ptr->cb = callback;
ptr->userdata = userdata;
ptr->next = NULL;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
dcb->callbacks = ptr;
spinlock_release(&dcb->cb_lock);
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback &&
cb->userdata == userdata)
{
free(ptr);
spinlock_release(&dcb->cb_lock);
return 0;
}
if (cb->next == NULL)
cb->next = ptr;
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
return rval;
}
/**
* Remove a callback from the callback list for the DCB
*
* Searches down the linked list to find he callback with a matching reason, function
* and userdata.
*
* @param dcb The DCB to add the callback to
* @param reason The callback reason
* @param cb The callback function to call
* @param userdata User data to send in the call
* @return Non-zero (true) if the callback was removed
*/
int
dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON), void *userdata)
{
DCB_CALLBACK *cb, *pcb = NULL;
int rval = 0;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
if (cb == NULL)
{
rval = 0;
}
else
{
while (cb)
{
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
spinlock_release(&dcb->cb_lock);
free(cb);
rval = 1;
break;
}
pcb = cb;
cb = cb->next;
}
}
if (!rval)
spinlock_release(&dcb->cb_lock);
return rval;
}
/**
* Call the set of callbacks registered for a particular reason.
*
* @param dcb The DCB to call the callbacks regarding
* @param reason The reason that has triggered the call
*/
static void
dcb_call_callback(DCB *dcb, DCB_REASON reason)
{
DCB_CALLBACK *cb, *nextcb;
spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks;
while (cb)
{
if (cb->reason == reason)
{
nextcb = cb->next;
spinlock_release(&dcb->cb_lock);
cb->cb(dcb, reason, cb->userdata);
spinlock_acquire(&dcb->cb_lock);
cb = nextcb;
}
else
cb = cb->next;
}
spinlock_release(&dcb->cb_lock);
}
/**
* Check the passed DCB to ensure it is in the list of allDCBS
*
* @param DCB The DCB to check
* @return 1 if the DCB is in the list, otherwise 0
*/
int
dcb_isvalid(DCB *dcb)
{
DCB *ptr;
int rval = 0;
spinlock_acquire(&dcbspin);
ptr = allDCBs;
while (ptr)
{
if (ptr == dcb)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&dcbspin);
return rval;
}

View File

@ -106,6 +106,7 @@ MODULES *mod;
return NULL;
}
}
if ((dlhandle = dlopen(fname, RTLD_NOW|RTLD_LOCAL)) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
@ -156,9 +157,10 @@ MODULES *mod;
LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE,
"Loaded module %s: %s.",
"Loaded module %s: %s from %s",
module,
version)));
version,
fname)));
register_module(module, type, dlhandle, version, modobj);
}
else

View File

@ -197,3 +197,26 @@ MONITOR *ptr;
}
spinlock_release(&monLock);
}
/**
* Find a monitor by name
*
* @param name The name of the monitor
* @return Pointer to the monitor or NULL
*/
MONITOR *
monitor_find(char *name)
{
MONITOR *ptr;
spinlock_acquire(&monLock);
ptr = allMonitors;
while (ptr)
{
if (!strcmp(ptr->name, name))
break;
ptr = ptr->next;
}
spinlock_release(&monLock);
return ptr;
}

View File

@ -67,6 +67,7 @@ SERVER *server;
server->nextdb = NULL;
server->monuser = NULL;
server->monpw = NULL;
server->unique_name = NULL;
spinlock_acquire(&server_spin);
server->next = allServers;
@ -109,10 +110,49 @@ SERVER *ptr;
/* Clean up session and free the memory */
free(server->name);
free(server->protocol);
if (server->unique_name)
free(server->unique_name);
free(server);
return 1;
}
/**
* Set a unique name for the server
*
* @param server The server to ste the name on
* @param name The unique name for the server
*/
void
server_set_unique_name(SERVER *server, char *name)
{
server->unique_name = strdup(name);
}
/**
* Find an existing server using the unique section name in
* configuration file
*
* @param servname The Server name or address
* @param port The server port
* @return The server or NULL if not found
*/
SERVER *
server_find_by_unique_name(char *name)
{
SERVER *server;
spinlock_acquire(&server_spin);
server = allServers;
while (server)
{
if (strcmp(server->unique_name, name) == 0)
break;
server = server->next;
}
spinlock_release(&server_spin);
return server;
}
/**
* Find an existing server
*
@ -190,7 +230,7 @@ char *stat;
ptr = allServers;
while (ptr)
{
dcb_printf(dcb, "Server %p\n", ptr);
dcb_printf(dcb, "Server %p (%s)\n", ptr, ptr->unique_name);
dcb_printf(dcb, "\tServer: %s\n", ptr->name);
stat = server_status(ptr);
dcb_printf(dcb, "\tStatus: %s\n", stat);
@ -215,7 +255,7 @@ dprintServer(DCB *dcb, SERVER *server)
{
char *stat;
dcb_printf(dcb, "Server %p\n", server);
dcb_printf(dcb, "Server %p (%s)\n", server, server->unique_name);
dcb_printf(dcb, "\tServer: %s\n", server->name);
stat = server_status(server);
dcb_printf(dcb, "\tStatus: %s\n", stat);

View File

@ -29,6 +29,7 @@
* 25/02/14 Massimiliano Pinto Added: service refresh limit feature
* 28/02/14 Massimiliano Pinto users_alloc moved from service_alloc to serviceStartPort (generic hashable for services)
* 07/05/14 Massimiliano Pinto Added: version_string initialized to NULL
* 23/05/14 Mark Riddoch Addition of service validation call
*
* @endverbatim
*/
@ -121,6 +122,33 @@ SERVICE *service;
return service;
}
/**
* Check to see if a service pointer is valid
*
* @param service The poitner to check
* @return 1 if the service is in the list of all services
*/
int
service_isvalid(SERVICE *service)
{
SERVICE *ptr;
int rval = 0;
spinlock_acquire(&service_spin);
ptr = allServices;
while (ptr)
{
if (ptr == service)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&service_spin);
return rval;
}
/**
* Start an individual port/protocol pair
*
@ -184,7 +212,7 @@ GWPROTOCOL *funcs;
if (port->listener->func.listen(port->listener, config_bind)) {
port->listener->session = session_alloc(service, port->listener);
if (port->listener->session != NULL) {
port->listener->session->state = SESSION_STATE_LISTENER;
listeners += 1;
@ -650,7 +678,7 @@ SERVICE *ptr;
/**
* Print all services to a DCB
*
* Designed to be called within a debugger session in order
* Designed to be called within a CLI command in order
* to display all active services within the gateway
*/
void
@ -662,30 +690,42 @@ SERVICE *ptr;
ptr = allServices;
while (ptr)
{
SERVER *server = ptr->databases;
dcb_printf(dcb, "Service %p\n", ptr);
dcb_printf(dcb, "\tService: %s\n", ptr->name);
dcb_printf(dcb, "\tRouter: %s (%p)\n", ptr->routerModule,
ptr->router);
if (ptr->router)
ptr->router->diagnostics(ptr->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&ptr->stats.started)));
dcb_printf(dcb, "\tBackend databases\n");
while (server)
{
dcb_printf(dcb, "\t\t%s:%d Protocol: %s\n", server->name, server->port,
server->protocol);
server = server->nextdb;
}
dcb_printf(dcb, "\tUsers data: %p\n", ptr->users);
dcb_printf(dcb, "\tTotal connections: %d\n", ptr->stats.n_sessions);
dcb_printf(dcb, "\tCurrently connected: %d\n", ptr->stats.n_current);
dprintService(dcb, ptr);
ptr = ptr->next;
}
spinlock_release(&service_spin);
}
/**
* Print details of a single service.
*
* @param dcb DCB to print data to
* @param service The service to print
*/
dprintService(DCB *dcb, SERVICE *service)
{
SERVER *server = service->databases;
dcb_printf(dcb, "Service %p\n", service);
dcb_printf(dcb, "\tService: %s\n", service->name);
dcb_printf(dcb, "\tRouter: %s (%p)\n", service->routerModule,
service->router);
if (service->router)
service->router->diagnostics(service->router_instance, dcb);
dcb_printf(dcb, "\tStarted: %s",
asctime(localtime(&service->stats.started)));
dcb_printf(dcb, "\tBackend databases\n");
while (server)
{
dcb_printf(dcb, "\t\t%s:%d Protocol: %s\n", server->name, server->port,
server->protocol);
server = server->nextdb;
}
dcb_printf(dcb, "\tUsers data: %p\n", service->users);
dcb_printf(dcb, "\tTotal connections: %d\n", service->stats.n_sessions);
dcb_printf(dcb, "\tCurrently connected: %d\n", service->stats.n_current);
}
/**
* Update the definition of a service
*

View File

@ -114,7 +114,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
/*
* Only create a router session if we are not the listening
* DCB. Creating a router session may create a connection to a
* DCB or an internal DCB. Creating a router session may create a connection to a
* backend server, depending upon the router module implementation
* and should be avoided for the listener session
*
@ -122,7 +122,7 @@ session_alloc(SERVICE *service, DCB *client_dcb)
* session, therefore it is important that the session lock is
* relinquished beforethe router call.
*/
if (client_dcb->state != DCB_STATE_LISTENING)
if (client_dcb->state != DCB_STATE_LISTENING && client_dcb->dcb_role != DCB_ROLE_INTERNAL)
{
session->router_session =
service->router->newSession(service->router_instance,
@ -273,6 +273,34 @@ return_succp :
return succp;
}
/**
* Check to see if a session is valid, i.e. in the list of all sessions
*
* @param session Session to check
* @return 1 if the session is valid otherwise 0
*/
int
session_isvalid(SESSION *session)
{
SESSION *ptr;
int rval = 0;
spinlock_acquire(&session_spin);
ptr = allSessions;
while (ptr)
{
if (ptr == session)
{
rval = 1;
break;
}
ptr = ptr->next;
}
spinlock_release(&session_spin);
return rval;
}
/**
* Print details of an individual session
*