From 2afe60dd0ec09c46c7bea515c149fa2658931021 Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Mon, 30 Nov 2015 12:25:59 +0200 Subject: [PATCH] Reindent server/core/dcb.c --- server/core/dcb.c | 2768 ++++++++++++++++++++++-------------------- server/include/dcb.h | 399 +++--- 2 files changed, 1648 insertions(+), 1519 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index e96931056..a4d63d7e9 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -28,29 +28,29 @@ * @verbatim * Revision History * - * Date Who Description - * 12/06/13 Mark Riddoch Initial implementation - * 21/06/13 Massimiliano Pinto free_dcb is used - * 25/06/13 Massimiliano Pinto Added checks to session and router_session - * 28/06/13 Mark Riddoch Changed the free mechanism to - * introduce a zombie state for the - * dcb - * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and + * Date Who Description + * 12/06/13 Mark Riddoch Initial implementation + * 21/06/13 Massimiliano Pinto free_dcb is used + * 25/06/13 Massimiliano Pinto Added checks to session and router_session + * 28/06/13 Mark Riddoch Changed the free mechanism to + * introduce a zombie state for the + * dcb + * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and * authlock for handling backend * asynchronous protocol connection - * and a generic lock for backend + * and a generic lock for backend * authentication - * 16/07/2013 Massimiliano Pinto Added command type for dcb - * 23/07/2013 Mark Riddoch Tidy up logging - * 02/09/2013 Massimiliano Pinto Added session refcount - * 27/09/2013 Massimiliano Pinto dcb_read returns 0 if ioctl returns no + * 16/07/2013 Massimiliano Pinto Added command type for dcb + * 23/07/2013 Mark Riddoch Tidy up logging + * 02/09/2013 Massimiliano Pinto Added session refcount + * 27/09/2013 Massimiliano Pinto dcb_read returns 0 if ioctl returns no * error and 0 bytes to read. - * This fixes a bug with many reads from + * This fixes a bug with many reads from * backend - * 07/05/2014 Mark Riddoch Addition of callback mechanism - * 20/06/2014 Mark Riddoch Addition of dcb_clone - * 29/05/2015 Markus Makela Addition of dcb_write_SSL - * 11/06/2015 Martin Brampton Persistent connnections and tidy up + * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 20/06/2014 Mark Riddoch Addition of dcb_clone + * 29/05/2015 Markus Makela Addition of dcb_write_SSL + * 11/06/2015 Martin Brampton Persistent connnections and tidy up * 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close, * fixes for various error situations, * remove dcb_set_state etc, simplifications. @@ -86,14 +86,14 @@ #define SSL_ERRBUF_LEN 140 -static DCB *allDCBs = NULL; /* Diagnostics need a list of DCBs */ +static DCB *allDCBs = NULL; /* Diagnostics need a list of DCBs */ static int nDCBs = 0; static int maxDCBs = 0; -static DCB *zombies = NULL; +static DCB *zombies = NULL; static int nzombies = 0; static int maxzombies = 0; -static SPINLOCK dcbspin = SPINLOCK_INIT; -static SPINLOCK zombiespin = SPINLOCK_INIT; +static SPINLOCK dcbspin = SPINLOCK_INIT; +static SPINLOCK zombiespin = SPINLOCK_INIT; static void dcb_final_free(DCB *dcb); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); @@ -118,7 +118,7 @@ int dcb_bytes_readable_SSL (DCB *dcb, int nread); void dcb_log_ssl_read_error(DCB *dcb, int ssl_errno, int rc); size_t dcb_get_session_id( - DCB *dcb) + DCB *dcb) { return (dcb && dcb->session) ? dcb->session->ses_id : 0; } @@ -126,9 +126,9 @@ size_t dcb_get_session_id( /** * Read log info from session through DCB and store values to memory locations * passed as parameters. - * - * @param dcb DCB - * @param sesid location where session id is to be copied + * + * @param dcb DCB + * @param sesid location where session id is to be copied * @param enabled_log_prioritiess bit field indicating which log types are enabled for the * session * @@ -157,22 +157,22 @@ bool dcb_get_ses_log_info( DCB * dcb_get_zombies(void) { - return zombies; + return zombies; } /** - * Allocate a new DCB. + * Allocate a new DCB. * * This routine performs the generic initialisation on the DCB before returning * the newly allocated DCB. * - * @param dcb_role_t The role for the new DCB + * @param dcb_role_t The role for the new DCB * @return A newly allocated DCB or NULL if non could be allocated. */ DCB * dcb_alloc(dcb_role_t role) { -DCB *newdcb; + DCB *newdcb; if ((newdcb = calloc(1, sizeof(DCB))) == NULL) { @@ -180,7 +180,7 @@ DCB *newdcb; } newdcb->dcb_chk_top = CHK_NUM_DCB; newdcb->dcb_chk_tail = CHK_NUM_DCB; - + newdcb->dcb_errhandle_called = false; newdcb->dcb_role = role; spinlock_init(&newdcb->dcb_initlock); @@ -202,7 +202,7 @@ DCB *newdcb; newdcb->evq.processing = 0; spinlock_init(&newdcb->evq.eventqlock); - memset(&newdcb->stats, 0, sizeof(DCBSTATS)); // Zero the statistics + memset(&newdcb->stats, 0, sizeof(DCBSTATS)); // Zero the statistics newdcb->state = DCB_STATE_ALLOC; bitmask_init(&newdcb->memdata.bitmask); newdcb->writeqlen = 0; @@ -223,7 +223,9 @@ DCB *newdcb; spinlock_acquire(&dcbspin); if (allDCBs == NULL) + { allDCBs = newdcb; + } else { DCB *ptr = allDCBs; @@ -232,7 +234,10 @@ DCB *newdcb; ptr->next = newdcb; } nDCBs++; - if (nDCBs > maxDCBs) maxDCBs = nDCBs; + if (nDCBs > maxDCBs) + { + maxDCBs = nDCBs; + } spinlock_release(&dcbspin); return newdcb; } @@ -242,7 +247,7 @@ DCB *newdcb; * Provided only for consistency, simply calls dcb_close to guarantee * safe disposal of a DCB * - * @param dcb The DCB to free + * @param dcb The DCB to free */ void dcb_free(DCB *dcb) @@ -254,32 +259,36 @@ dcb_free(DCB *dcb) * Clone a DCB for internal use, mostly used for specialist filters * to create dummy clients based on real clients. * - * @param orig The DCB to clone - * @return A DCB that can be used as a client + * @param orig The DCB to clone + * @return A DCB that can be used as a client */ DCB * dcb_clone(DCB *orig) { -DCB *clonedcb; + DCB *clonedcb; if ((clonedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER))) { - clonedcb->fd = DCBFD_CLOSED; - clonedcb->flags |= DCBF_CLONE; - clonedcb->state = orig->state; - clonedcb->data = orig->data; - if (orig->remote) - clonedcb->remote = strdup(orig->remote); - if (orig->user) - clonedcb->user = strdup(orig->user); - clonedcb->protocol = orig->protocol; + clonedcb->fd = DCBFD_CLOSED; + clonedcb->flags |= DCBF_CLONE; + clonedcb->state = orig->state; + clonedcb->data = orig->data; + if (orig->remote) + { + clonedcb->remote = strdup(orig->remote); + } + if (orig->user) + { + clonedcb->user = strdup(orig->user); + } + clonedcb->protocol = orig->protocol; - clonedcb->func.write = dcb_null_write; - /** - * Close triggers closing of router session as well which is needed. - */ - clonedcb->func.close = orig->func.close; - clonedcb->func.auth = dcb_null_auth; + clonedcb->func.write = dcb_null_write; + /** + * Close triggers closing of router session as well which is needed. + */ + clonedcb->func.close = orig->func.close; + clonedcb->func.auth = dcb_null_auth; } return clonedcb; } @@ -295,43 +304,47 @@ DCB *clonedcb; static void dcb_final_free(DCB *dcb) { - DCB_CALLBACK *cb; + DCB_CALLBACK *cb; CHK_DCB(dcb); - ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED || - dcb->state == DCB_STATE_ALLOC, - "dcb not in DCB_STATE_DISCONNECTED not in DCB_STATE_ALLOC state."); + ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED || + dcb->state == DCB_STATE_ALLOC, + "dcb not in DCB_STATE_DISCONNECTED not in DCB_STATE_ALLOC state."); - if (DCB_POLL_BUSY(dcb)) - { - /* Check if DCB has outstanding poll events */ - MXS_ERROR("dcb_final_free: DCB %p has outstanding events.", dcb); - } + if (DCB_POLL_BUSY(dcb)) + { + /* Check if DCB has outstanding poll events */ + MXS_ERROR("dcb_final_free: DCB %p has outstanding events.", dcb); + } - /*< First remove this DCB from the chain */ - spinlock_acquire(&dcbspin); - if (allDCBs == dcb) - { - /*< - * Deal with the special case of removing the DCB at the head of - * the chain. - */ - allDCBs = dcb->next; - } - else - { - /*< - * We find the DCB that point to the one we are removing and then - * set the next pointer of that DCB to the next pointer of the - * DCB we are removing. - */ - DCB *ptr = allDCBs; - while (ptr && ptr->next != dcb) - ptr = ptr->next; - if (ptr) - ptr->next = dcb->next; - } - nDCBs--; + /*< First remove this DCB from the chain */ + spinlock_acquire(&dcbspin); + if (allDCBs == dcb) + { + /*< + * Deal with the special case of removing the DCB at the head of + * the chain. + */ + allDCBs = dcb->next; + } + else + { + /*< + * We find the DCB that point to the one we are removing and then + * set the next pointer of that DCB to the next pointer of the + * DCB we are removing. + */ + DCB *ptr = allDCBs; + while (ptr && ptr->next != dcb) + { + ptr = ptr->next; + } + if (ptr) + { + ptr->next = dcb->next; + } + } + nDCBs--; spinlock_release(&dcbspin); if (dcb->session) { @@ -341,7 +354,7 @@ dcb_final_free(DCB *dcb) SESSION *local_session = dcb->session; dcb->session = NULL; CHK_SESSION(local_session); - /** + /** * Set session's client pointer NULL so that other threads * won't try to call dcb_close for client DCB * after this call. @@ -356,46 +369,67 @@ dcb_final_free(DCB *dcb) { session_free(local_session); } - } + } - if (dcb->protocol && (!DCB_IS_CLONE(dcb))) - free(dcb->protocol); + if (dcb->protocol && (!DCB_IS_CLONE(dcb))) + { + free(dcb->protocol); + } if (dcb->protoname) + { free(dcb->protoname); - if (dcb->remote) - free(dcb->remote); - if (dcb->user) - free(dcb->user); + } + if (dcb->remote) + { + free(dcb->remote); + } + if (dcb->user) + { + free(dcb->user); + } - /* Clear write and read buffers */ - if (dcb->delayq) { + /* Clear write and read buffers */ + if (dcb->delayq) + { GWBUF *queue = dcb->delayq; - while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL) + { + ; + } dcb->delayq = NULL; } - if (dcb->writeq) { + if (dcb->writeq) + { GWBUF *queue = dcb->writeq; - while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL) + { + ; + } dcb->writeq = NULL; } if (dcb->dcb_readqueue) { GWBUF* queue = dcb->dcb_readqueue; - while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); + while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL) + { + ; + } dcb->dcb_readqueue = NULL; } - spinlock_acquire(&dcb->cb_lock); - while ((cb = dcb->callbacks) != NULL) - { - dcb->callbacks = cb->next; - free(cb); - } - spinlock_release(&dcb->cb_lock); - if(dcb->ssl) - SSL_free(dcb->ssl); - bitmask_free(&dcb->memdata.bitmask); - free(dcb); + spinlock_acquire(&dcb->cb_lock); + while ((cb = dcb->callbacks) != NULL) + { + dcb->callbacks = cb->next; + free(cb); + } + spinlock_release(&dcb->cb_lock); + if (dcb->ssl) + { + SSL_free(dcb->ssl); + } + bitmask_free(&dcb->memdata.bitmask); + free(dcb); } /** @@ -405,118 +439,118 @@ dcb_final_free(DCB *dcb) * the thread id of the polling thread. It must clear the bit in * the memdata bitmask for the polling thread that calls it. If the * operation of clearing this bit means that no bits are set in - * the memdata.bitmask then the DCB is no longer able to be + * the memdata.bitmask then the DCB is no longer able to be * referenced and it can be finally removed. * - * @param threadid The thread ID of the caller + * @param threadid The thread ID of the caller */ DCB * dcb_process_zombies(int threadid) { -DCB *zombiedcb; -DCB *previousdcb = NULL, *nextdcb; -DCB *listofdcb = NULL; + DCB *zombiedcb; + DCB *previousdcb = NULL, *nextdcb; + DCB *listofdcb = NULL; - /** - * Perform a dirty read to see if there is anything in the queue. - * This avoids threads hitting the queue spinlock when the queue - * is empty. This will really help when the only entry is being - * freed, since the queue is updated before the expensive call to - * dcb_final_free. - */ - if (!zombies) + /** + * Perform a dirty read to see if there is anything in the queue. + * This avoids threads hitting the queue spinlock when the queue + * is empty. This will really help when the only entry is being + * freed, since the queue is updated before the expensive call to + * dcb_final_free. + */ + if (!zombies) { - return NULL; + return NULL; } - /* - * Process the zombie queue and create a list of DCB's that can be - * finally freed. This processing is down under a spinlock that - * will prevent new entries being added to the zombie queue. Therefore - * we do not want to do any expensive operations under this spinlock - * as it will block other threads. The expensive operations will be - * performed on the victim queue within holding the zombie queue - * spinlock. - */ - spinlock_acquire(&zombiespin); - zombiedcb = zombies; - while (zombiedcb) - { - CHK_DCB(zombiedcb); + /* + * Process the zombie queue and create a list of DCB's that can be + * finally freed. This processing is down under a spinlock that + * will prevent new entries being added to the zombie queue. Therefore + * we do not want to do any expensive operations under this spinlock + * as it will block other threads. The expensive operations will be + * performed on the victim queue within holding the zombie queue + * spinlock. + */ + spinlock_acquire(&zombiespin); + zombiedcb = zombies; + while (zombiedcb) + { + CHK_DCB(zombiedcb); nextdcb = zombiedcb->memdata.next; - /* - * Skip processing of DCB's that are - * in the event queue waiting to be processed. - */ - if (zombiedcb->evq.next || zombiedcb->evq.prev) - { - previousdcb = zombiedcb; - } - else - { + /* + * Skip processing of DCB's that are + * in the event queue waiting to be processed. + */ + if (zombiedcb->evq.next || zombiedcb->evq.prev) + { + previousdcb = zombiedcb; + } + else + { - bitmask_clear(&zombiedcb->memdata.bitmask, threadid); - - if (bitmask_isallclear(&zombiedcb->memdata.bitmask)) - { - /** - * Remove the DCB from the zombie queue - * and call the final free routine for the - * DCB - * - * zombiedcb is the DCB we are processing - * previousdcb is the previous DCB on the zombie - * queue or NULL if the DCB is at the head of the - * queue. Remove zombiedcb from the zombies list. - */ - if (NULL == previousdcb) - { - zombies = zombiedcb->memdata.next; - } - else - { - previousdcb->memdata.next = zombiedcb->memdata.next; - } - - MXS_DEBUG("%lu [%s] Remove dcb " - "%p fd %d in state %s from the " - "list of zombies.", - pthread_self(), - __func__, - zombiedcb, - zombiedcb->fd, - STRDCBSTATE(zombiedcb->state)); - /*< - * Move zombie dcb to linked list of victim dcbs. - * The variable dcb is used to hold the last DCB - * to have been added to the linked list, or NULL - * if none has yet been added. If the list - * (listofdcb) is not NULL, then it follows that - * dcb will also not be null. - */ + bitmask_clear(&zombiedcb->memdata.bitmask, threadid); + + if (bitmask_isallclear(&zombiedcb->memdata.bitmask)) + { + /** + * Remove the DCB from the zombie queue + * and call the final free routine for the + * DCB + * + * zombiedcb is the DCB we are processing + * previousdcb is the previous DCB on the zombie + * queue or NULL if the DCB is at the head of the + * queue. Remove zombiedcb from the zombies list. + */ + if (NULL == previousdcb) + { + zombies = zombiedcb->memdata.next; + } + else + { + previousdcb->memdata.next = zombiedcb->memdata.next; + } + + MXS_DEBUG("%lu [%s] Remove dcb " + "%p fd %d in state %s from the " + "list of zombies.", + pthread_self(), + __func__, + zombiedcb, + zombiedcb->fd, + STRDCBSTATE(zombiedcb->state)); + /*< + * Move zombie dcb to linked list of victim dcbs. + * The variable dcb is used to hold the last DCB + * to have been added to the linked list, or NULL + * if none has yet been added. If the list + * (listofdcb) is not NULL, then it follows that + * dcb will also not be null. + */ nzombies--; zombiedcb->memdata.next = listofdcb; listofdcb = zombiedcb; - } - else - { - /* Since we didn't remove this dcb from the zombies - list, we need to advance the previous pointer */ - previousdcb = zombiedcb; - } - } - zombiedcb = nextdcb; - } - spinlock_release(&zombiespin); - - if (listofdcb) - { - dcb_process_victim_queue(listofdcb); + } + else + { + /* Since we didn't remove this dcb from the zombies + list, we need to advance the previous pointer */ + previousdcb = zombiedcb; + } } - - return zombies; -} - + zombiedcb = nextdcb; + } + spinlock_release(&zombiespin); + + if (listofdcb) + { + dcb_process_victim_queue(listofdcb); + } + + return zombies; +} + /** * Process the victim queue, selected from the list of zombies * @@ -524,14 +558,14 @@ DCB *listofdcb = NULL; * file descriptor is closed, the DCB marked as disconnected and the DCB * itself is finally freed. * - * @param listofdcb The first victim DCB + * @param listofdcb The first victim DCB */ static inline void dcb_process_victim_queue(DCB *listofdcb) { DCB *dcb = listofdcb; - while (dcb != NULL) + while (dcb != NULL) { DCB *nextdcb; /*< @@ -550,7 +584,8 @@ dcb_process_victim_queue(DCB *listofdcb) dcb, STRDCBSTATE(dcb->state)); } - else { + else + { /* Must be DCB_STATE_POLLING */ spinlock_release(&dcb->dcb_initlock); if (0 == dcb->persistentstart && dcb_maybe_add_persistent(dcb)) @@ -569,7 +604,10 @@ dcb_process_victim_queue(DCB *listofdcb) dcb->memdata.next = zombies; zombies = dcb; nzombies++; - if (nzombies > maxzombies) maxzombies = nzombies; + if (nzombies > maxzombies) + { + maxzombies = nzombies; + } spinlock_release(&zombiespin); dcb = nextdcb; continue; @@ -577,7 +615,7 @@ dcb_process_victim_queue(DCB *listofdcb) } } /* - * Into the final close logic, so if DCB is for backend server, we + * Into the final close logic, so if DCB is for backend server, we * must decrement the number of current connections. */ if (dcb->server && 0 == dcb->persistentstart) @@ -590,7 +628,7 @@ dcb_process_victim_queue(DCB *listofdcb) /*< * Close file descriptor and move to clean-up phase. */ - if (close(dcb->fd) < 0) + if (close(dcb->fd) < 0) { int eno = errno; errno = 0; @@ -602,11 +640,11 @@ dcb_process_victim_queue(DCB *listofdcb) dcb, eno, strerror_r(eno, errbuf, sizeof(errbuf))); - } - else + } + else { dcb->fd = DCBFD_CLOSED; - + MXS_DEBUG("%lu [dcb_process_victim_queue] Closed socket " "%d on dcb %p.", pthread_self(), @@ -635,10 +673,10 @@ dcb_process_victim_queue(DCB *listofdcb) /** * Remove a DCB from the poll list and trigger shutdown mechanisms. * - * @param dcb The DCB to be processed + * @param dcb The DCB to be processed */ static void -dcb_stop_polling_and_shutdown (DCB *dcb) +dcb_stop_polling_and_shutdown(DCB *dcb) { poll_remove_dcb(dcb); /** @@ -654,172 +692,175 @@ dcb_stop_polling_and_shutdown (DCB *dcb) /** * Connect to a server - * + * * This routine will create a server connection * If successful the new dcb will be put in * epoll set by dcb->func.connect * - * @param server The server to connect to - * @param session The session this connection is being made for - * @param protocol The protocol module to use - * @return The new allocated dcb or NULL if the DCB was not connected + * @param server The server to connect to + * @param session The session this connection is being made for + * @param protocol The protocol module to use + * @return The new allocated dcb or NULL if the DCB was not connected */ DCB * dcb_connect(SERVER *server, SESSION *session, const char *protocol) { - DCB *dcb; - GWPROTOCOL *funcs; + DCB *dcb; + GWPROTOCOL *funcs; int fd; int rc; char *user; - user = session_getUser(session); - if (user && strlen(user)) + user = session_getUser(session); + if (user && strlen(user)) + { + MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB " + "user %s protocol %s\n", pthread_self(), user, protocol); + dcb = server_get_persistent(server, user, protocol); + if (dcb) { - MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB " - "user %s protocol %s\n", pthread_self(), user, protocol); - dcb = server_get_persistent(server, user, protocol); - if (dcb) + /** + * Link dcb to session. Unlink is called in dcb_final_free + */ + if (!session_link_dcb(session, dcb)) { - /** - * Link dcb to session. Unlink is called in dcb_final_free - */ - if (!session_link_dcb(session, dcb)) - { - MXS_DEBUG("%lu [dcb_connect] Failed to link to session, the " - "session has been removed.\n", - pthread_self()); - dcb_close(dcb); - return NULL; - } - MXS_DEBUG("%lu [dcb_connect] Reusing a persistent connection, dcb %p\n", - pthread_self(), dcb); - dcb->persistentstart = 0; - return dcb; - } - else - { - MXS_DEBUG("%lu [dcb_connect] Failed to find a reusable persistent connection.\n", + MXS_DEBUG("%lu [dcb_connect] Failed to link to session, the " + "session has been removed.\n", pthread_self()); - } - } - - if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) - { - return NULL; - } - - if ((funcs = (GWPROTOCOL *)load_module(protocol, - MODULE_PROTOCOL)) == NULL) - { - dcb->state = DCB_STATE_DISCONNECTED; - dcb_final_free(dcb); - MXS_ERROR("Failed to load protocol module for %s, free dcb %p\n", - protocol, - dcb); - return NULL; - } - memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL)); - dcb->protoname = strdup(protocol); - - /** - * Link dcb to session. Unlink is called in dcb_final_free - */ - if (!session_link_dcb(session, dcb)) - { - MXS_DEBUG("%lu [dcb_connect] Failed to link to session, the " - "session has been removed.", - pthread_self()); - dcb_final_free(dcb); - return NULL; - } - fd = dcb->func.connect(dcb, server, session); - - if (fd == DCBFD_CLOSED) { - MXS_DEBUG("%lu [dcb_connect] Failed to connect to server %s:%d, " - "from backend dcb %p, client dcp %p fd %d.", - pthread_self(), - server->name, - server->port, - dcb, - session->client, - session->client->fd); - dcb->state = DCB_STATE_DISCONNECTED; - dcb_final_free(dcb); - return NULL; - } else { - MXS_DEBUG("%lu [dcb_connect] Connected to server %s:%d, " - "from backend dcb %p, client dcp %p fd %d.", - pthread_self(), - server->name, - server->port, - dcb, - session->client, - session->client->fd); - } - /** - * Successfully connected to backend. Assign file descriptor to dcb - */ - dcb->fd = fd; - - /** - * Add server pointer to dcb - */ - dcb->server = server; - - /** Copy status field to DCB */ - dcb->dcb_server_status = server->status; - dcb->dcb_port = server->port; - - /** - * backend_dcb is connected to backend server, and once backend_dcb - * is added to poll set, authentication takes place as part of - * EPOLLOUT event that will be received once the connection - * is established. - */ - - /** - * Add the dcb in the poll set - */ - rc = poll_add_dcb(dcb); - - if (rc) { - dcb->state = DCB_STATE_DISCONNECTED; - dcb_final_free(dcb); + dcb_close(dcb); return NULL; + } + MXS_DEBUG("%lu [dcb_connect] Reusing a persistent connection, dcb %p\n", + pthread_self(), dcb); + dcb->persistentstart = 0; + return dcb; } - /** - * The dcb will be addded into poll set by dcb->func.connect - */ - atomic_add(&server->stats.n_connections, 1); - atomic_add(&server->stats.n_current, 1); + else + { + MXS_DEBUG("%lu [dcb_connect] Failed to find a reusable persistent connection.\n", + pthread_self()); + } + } - return dcb; + if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) + { + return NULL; + } + + if ((funcs = (GWPROTOCOL *)load_module(protocol, + MODULE_PROTOCOL)) == NULL) + { + dcb->state = DCB_STATE_DISCONNECTED; + dcb_final_free(dcb); + MXS_ERROR("Failed to load protocol module for %s, free dcb %p\n", + protocol, + dcb); + return NULL; + } + memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL)); + dcb->protoname = strdup(protocol); + + /** + * Link dcb to session. Unlink is called in dcb_final_free + */ + if (!session_link_dcb(session, dcb)) + { + MXS_DEBUG("%lu [dcb_connect] Failed to link to session, the " + "session has been removed.", + pthread_self()); + dcb_final_free(dcb); + return NULL; + } + fd = dcb->func.connect(dcb, server, session); + + if (fd == DCBFD_CLOSED) + { + MXS_DEBUG("%lu [dcb_connect] Failed to connect to server %s:%d, " + "from backend dcb %p, client dcp %p fd %d.", + pthread_self(), + server->name, + server->port, + dcb, + session->client, + session->client->fd); + dcb->state = DCB_STATE_DISCONNECTED; + dcb_final_free(dcb); + return NULL; + } + else + { + MXS_DEBUG("%lu [dcb_connect] Connected to server %s:%d, " + "from backend dcb %p, client dcp %p fd %d.", + pthread_self(), + server->name, + server->port, + dcb, + session->client, + session->client->fd); + } + /** + * Successfully connected to backend. Assign file descriptor to dcb + */ + dcb->fd = fd; + + /** + * Add server pointer to dcb + */ + dcb->server = server; + + /** Copy status field to DCB */ + dcb->dcb_server_status = server->status; + dcb->dcb_port = server->port; + + /** + * backend_dcb is connected to backend server, and once backend_dcb + * is added to poll set, authentication takes place as part of + * EPOLLOUT event that will be received once the connection + * is established. + */ + + /** + * Add the dcb in the poll set + */ + rc = poll_add_dcb(dcb); + + if (rc) + { + dcb->state = DCB_STATE_DISCONNECTED; + dcb_final_free(dcb); + return NULL; + } + /** + * The dcb will be addded into poll set by dcb->func.connect + */ + atomic_add(&server->stats.n_connections, 1); + atomic_add(&server->stats.n_current, 1); + + return dcb; } /** * General purpose read routine to read data from a socket in the * Descriptor Control Block and append it to a linked list of buffers. - * The list may be empty, in which case *head == NULL. The third + * The list may be empty, in which case *head == NULL. The third * parameter indicates the maximum number of bytes to be read (needed * for SSL processing) with 0 meaning no limit. * - * @param dcb The DCB to read from - * @param head Pointer to linked list to append data to + * @param dcb The DCB to read from + * @param head Pointer to linked list to append data to * @param maxbytes Maximum bytes to read (0 = no limit) - * @return -1 on error, otherwise the number of read bytes on + * @return -1 on error, otherwise the number of read bytes on * the last iteration of while loop. 0 is returned if no data available. */ -int dcb_read( - DCB *dcb, - GWBUF **head, - int maxbytes) +int dcb_read(DCB *dcb, + GWBUF **head, + int maxbytes) { GWBUF *buffer = NULL; int bytesavailable; int nsingleread = 0; int nreadtotal = 0; - + CHK_DCB(dcb); if (dcb->fd <= 0) @@ -835,8 +876,8 @@ int dcb_read( while (0 == maxbytes || nreadtotal < maxbytes) { int bufsize; - - if (-1 == ioctl(dcb->fd, FIONREAD, &bytesavailable)) + + if (-1 == ioctl(dcb->fd, FIONREAD, &bytesavailable)) { char errbuf[STRERROR_BUFLEN]; /* */ @@ -853,22 +894,22 @@ int dcb_read( } if (bytesavailable == 0) - { + { /** Handle closed client socket */ - if (nreadtotal == 0 && dcb_isclient(dcb)) + if (nreadtotal == 0 && dcb_isclient(dcb)) { char c; int l_errno = 0; int r = -1; - + /* try to read 1 byte, without consuming the socket buffer */ r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); l_errno = errno; - - if (r <= 0 && - l_errno != EAGAIN && + + if (r <= 0 && + l_errno != EAGAIN && l_errno != EWOULDBLOCK && - l_errno != 0) + l_errno != 0) { return -1; } @@ -879,8 +920,11 @@ int dcb_read( dcb->last_read = hkheartbeat; bufsize = MIN(bytesavailable, MAX_BUFFER_SIZE); - if (maxbytes) bufsize = MIN(bufsize, maxbytes-nreadtotal); - + if (maxbytes) + { + bufsize = MIN(bufsize, maxbytes-nreadtotal); + } + if ((buffer = gwbuf_alloc(bufsize)) == NULL) { /*< @@ -896,15 +940,15 @@ int dcb_read( dcb->fd, errno, strerror_r(errno, errbuf, sizeof(errbuf))); - /* */ + /* */ return -1; } GW_NOINTR_CALL(nsingleread = read(dcb->fd, GWBUF_DATA(buffer), bufsize); - dcb->stats.n_reads++); - + dcb->stats.n_reads++); + if (nsingleread <= 0) - { - if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK) + { + if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK) { char errbuf[STRERROR_BUFLEN]; /* */ @@ -944,13 +988,12 @@ int dcb_read( * The list may be empty, in which case *head == NULL. The SSL structure should * be initialized and the SSL handshake should be done. * - * @param dcb The DCB to read from - * @param head Pointer to linked list to append data to - * @return -1 on error, otherwise the number of read bytes on the last + * @param dcb The DCB to read from + * @param head Pointer to linked list to append data to + * @return -1 on error, otherwise the number of read bytes on the last * iteration of while loop. 0 is returned if no data available. */ -int dcb_read_SSL(DCB *dcb, - GWBUF **head) +int dcb_read_SSL(DCB *dcb, GWBUF **head) { GWBUF *buffer = NULL; int b, n, nread = 0; @@ -1049,21 +1092,24 @@ int dcb_read_SSL(DCB *dcb, /** * General purpose routine to write to a DCB * - * @param dcb The DCB of the client - * @param queue Queue of buffers to write + * @param dcb The DCB of the client + * @param queue Queue of buffers to write */ int dcb_write(DCB *dcb, GWBUF *queue) { -int written; -int below_water; + int written; + int below_water; below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; // The following guarantees that queue is not NULL - if (!dcb_write_parameter_check(dcb, queue)) return 0; - + if (!dcb_write_parameter_check(dcb, queue)) + { + return 0; + } + spinlock_acquire(&dcb->writeqlock); - if (dcb->writeq) + if (dcb->writeq) { dcb_write_when_already_queued(dcb, queue); } @@ -1083,11 +1129,11 @@ int below_water; GW_NOINTR_CALL( written = gw_write(dcb, GWBUF_DATA(queue), GWBUF_LENGTH(queue)); dcb->stats.n_writes++; - ); + ); if (written < 0) { - int saved_errno = errno; + int saved_errno = errno; dcb_log_write_failure(dcb, queue, saved_errno); /*< @@ -1099,12 +1145,12 @@ int below_water; dcb->stats.n_buffered++; spinlock_release(&dcb->writeqlock); - /** Return 1 if the write failure was due to EWOULDBLOCK or EAGAIN. - The rest of the buffer will be written once an EPOLL_OUT event - arrives.*/ - return saved_errno == 0 || - saved_errno == EAGAIN || - saved_errno == EWOULDBLOCK; + /** Return 1 if the write failure was due to EWOULDBLOCK or EAGAIN. + The rest of the buffer will be written once an EPOLL_OUT event + arrives.*/ + return saved_errno == 0 || + saved_errno == EAGAIN || + saved_errno == EWOULDBLOCK; } /* * Pull the number of bytes we have written from @@ -1119,7 +1165,6 @@ int below_water; STRDCBSTATE(dcb->state), dcb->fd); } /*< while (queue != NULL) */ - } /* if (dcb->writeq) */ dcb_write_tidy_up(dcb, below_water); @@ -1137,17 +1182,15 @@ int below_water; static inline void dcb_write_fake_code(DCB *dcb) { - if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && - dcb->session != NULL) + if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && dcb->session != NULL) { if (dcb_isclient(dcb) && fail_next_client_fd) { dcb_fake_write_errno[dcb->fd] = 32; dcb_fake_write_ev[dcb->fd] = 29; fail_next_client_fd = false; - } - else if (!dcb_isclient(dcb) && - fail_next_backend_fd) + } + else if (!dcb_isclient(dcb) && fail_next_backend_fd) { dcb_fake_write_errno[dcb->fd] = 32; dcb_fake_write_ev[dcb->fd] = 29; @@ -1167,8 +1210,11 @@ dcb_write_fake_code(DCB *dcb) static inline bool dcb_write_parameter_check(DCB *dcb, GWBUF *queue) { - if (queue == NULL) return false; - + if (queue == NULL) + { + return false; + } + if (dcb->fd <= 0) { MXS_ERROR("Write failed, dcb is %s.", @@ -1176,7 +1222,7 @@ dcb_write_parameter_check(DCB *dcb, GWBUF *queue) gwbuf_free(queue); return false; } - + if (dcb->session == NULL || dcb->session->state != SESSION_STATE_STOPPING) { /** @@ -1188,12 +1234,9 @@ dcb_write_parameter_check(DCB *dcb, GWBUF *queue) * still be writable. */ if (dcb->state != DCB_STATE_ALLOC && - dcb->state != DCB_STATE_POLLING && dcb->state != DCB_STATE_LISTENING && dcb->state != DCB_STATE_NOPOLLING) - - { MXS_DEBUG("%lu [dcb_write] Write aborted to dcb %p because " "it is in state %s", @@ -1242,7 +1285,7 @@ dcb_write_when_already_queued(DCB *dcb, GWBUF *queue) * * @param dcb The DCB of the client * @param queue Queue of buffers to write - * @param eno Error number for logging + * @param eno Error number for logging */ static void dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno) @@ -1283,7 +1326,7 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno) } } - + bool dolog = true; if (eno != 0 && @@ -1321,7 +1364,7 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno) * @param below_water A boolean */ static inline void -dcb_write_tidy_up (DCB *dcb, bool below_water) +dcb_write_tidy_up(DCB *dcb, bool below_water) { spinlock_release(&dcb->writeqlock); @@ -1330,26 +1373,29 @@ dcb_write_tidy_up (DCB *dcb, bool below_water) atomic_add(&dcb->stats.n_high_water, 1); dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); } -} +} /** * General purpose routine to write to an SSL enabled DCB * - * @param dcb The DCB of the client + * @param dcb The DCB of the client * @param ssl The SSL structure for this DCB - * @param queue Queue of buffers to write + * @param queue Queue of buffers to write * @return 0 on failure, 1 on success */ int dcb_write_SSL(DCB *dcb, GWBUF *queue) { - int w; - int saved_errno = 0; - bool below_water; + int w; + int saved_errno = 0; + bool below_water; below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; // The following guarantees that queue is not NULL - if (!dcb_write_parameter_check(dcb, queue)) return 0; + if (!dcb_write_parameter_check(dcb, queue)) + { + return 0; + } spinlock_acquire(&dcb->writeqlock); @@ -1359,14 +1405,14 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue) } else { - /* - * Loop over the buffer chain that has been passed to us - * from the reading side. - * Send as much of the data in that chain as possible and - * add any balance to the write queue. - */ - while (queue != NULL) - { + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (queue != NULL) + { #if defined(FAKE_CODE) dcb_write_fake_code(dcb); #endif /* FAKE_CODE */ @@ -1393,7 +1439,8 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue) } #endif } - } while(w <= 0); + } + while (w <= 0); /** Remove written bytes from the queue */ queue = gwbuf_consume(queue, w); @@ -1426,7 +1473,7 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue) * @param ssl_errno The SSL error code */ static void -dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) +dcb_write_SSL_error_report(DCB *dcb, int ret, int ssl_errno) { char errbuf[STRERROR_BUFLEN]; @@ -1434,33 +1481,33 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) { switch(ssl_errno) { - case SSL_ERROR_WANT_READ: - MXS_DEBUG("%lu [dcb_write] Write to dcb " - "%p in state %s fd %d failed " - "due error SSL_ERROR_WANT_READ", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - dcb->fd); - break; - case SSL_ERROR_WANT_WRITE: - MXS_DEBUG("%lu [dcb_write] Write to dcb " - "%p in state %s fd %d failed " - "due error SSL_ERROR_WANT_WRITE", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - dcb->fd); - break; - default: - MXS_DEBUG("%lu [dcb_write] Write to dcb " - "%p in state %s fd %d failed " - "due error %d", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - dcb->fd,ssl_errno); - break; + case SSL_ERROR_WANT_READ: + MXS_DEBUG("%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error SSL_ERROR_WANT_READ", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd); + break; + case SSL_ERROR_WANT_WRITE: + MXS_DEBUG("%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error SSL_ERROR_WANT_WRITE", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd); + break; + default: + MXS_DEBUG("%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error %d", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd,ssl_errno); + break; } } @@ -1475,9 +1522,9 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) STRDCBSTATE(dcb->state), dcb->fd, ssl_errno); - if(ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL) + if (ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL) { - if(ssl_errno == SSL_ERROR_SYSCALL) + if (ssl_errno == SSL_ERROR_SYSCALL) { MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); } @@ -1486,17 +1533,19 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) char errbuf[SSL_ERRBUF_LEN]; ERR_error_string_n(ssl_errno,errbuf, sizeof(errbuf)); MXS_ERROR("%d:%s", ssl_errno,errbuf); - } while((ssl_errno = ERR_get_error()) != 0); + } + while ((ssl_errno = ERR_get_error()) != 0); } } - else if(ret == 0) + else if (ret == 0) { do { char errbuf[SSL_ERRBUF_LEN]; ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf)); MXS_ERROR("%d:%s", ssl_errno,errbuf); - } while((ssl_errno = ERR_get_error()) != 0); + } + while ((ssl_errno = ERR_get_error()) != 0); } } } @@ -1506,88 +1555,90 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) * of a socket and will try to send any buffered data from the write queue * up until the point the write would block. * - * @param dcb DCB to drain the write queue of + * @param dcb DCB to drain the write queue of * @return The number of bytes written */ int dcb_drain_writeq(DCB *dcb) { -int n = 0; -int w; -int saved_errno = 0; -int above_water; + int n = 0; + int w; + int saved_errno = 0; + int above_water; - above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; + above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; - spinlock_acquire(&dcb->writeqlock); + spinlock_acquire(&dcb->writeqlock); - if (dcb->writeq) - { - int len; - /* - * Loop over the buffer chain in the pending writeq - * Send as much of the data in that chain as possible and - * leave any balance on the write queue. - */ - while (dcb->writeq != NULL) - { - len = GWBUF_LENGTH(dcb->writeq); - GW_NOINTR_CALL(w = gw_write(dcb, GWBUF_DATA(dcb->writeq), len);); - saved_errno = errno; - errno = 0; - - if (w < 0) - { + if (dcb->writeq) + { + int len; + /* + * Loop over the buffer chain in the pending writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + GW_NOINTR_CALL(w = gw_write(dcb, GWBUF_DATA(dcb->writeq), len);); + saved_errno = errno; + errno = 0; + + if (w < 0) + { #if defined(SS_DEBUG) - if (saved_errno == EAGAIN || - saved_errno == EWOULDBLOCK) + if (saved_errno == EAGAIN || + saved_errno == EWOULDBLOCK) #else - if (saved_errno == EAGAIN || - saved_errno == EWOULDBLOCK || - saved_errno == EPIPE) + if (saved_errno == EAGAIN || + saved_errno == EWOULDBLOCK || + saved_errno == EPIPE) #endif - { - break; - } - char errbuf[STRERROR_BUFLEN]; - MXS_ERROR("Write to dcb %p " - "in state %s fd %d failed due errno %d, %s", - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror_r(saved_errno, errbuf, sizeof(errbuf))); - break; - } - /* - * Pull the number of bytes we have written from - * queue with have. - */ - dcb->writeq = gwbuf_consume(dcb->writeq, w); - MXS_DEBUG("%lu [dcb_drain_writeq] Wrote %d Bytes to dcb %p " - "in state %s fd %d", - pthread_self(), - w, - dcb, - STRDCBSTATE(dcb->state), - dcb->fd); - n += w; - } - } - spinlock_release(&dcb->writeqlock); - atomic_add(&dcb->writeqlen, -n); - - /* The write queue has drained, potentially need to call a callback function */ - if (dcb->writeq == NULL) - dcb_call_callback(dcb, DCB_REASON_DRAINED); + { + break; + } + char errbuf[STRERROR_BUFLEN]; + MXS_ERROR("Write to dcb %p " + "in state %s fd %d failed due errno %d, %s", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror_r(saved_errno, errbuf, sizeof(errbuf))); + break; + } + /* + * Pull the number of bytes we have written from + * queue with have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + MXS_DEBUG("%lu [dcb_drain_writeq] Wrote %d Bytes to dcb %p " + "in state %s fd %d", + pthread_self(), + w, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd); + n += w; + } + } + spinlock_release(&dcb->writeqlock); + atomic_add(&dcb->writeqlen, -n); - if (above_water && dcb->writeqlen < dcb->low_water) - { - atomic_add(&dcb->stats.n_low_water, 1); - dcb_call_callback(dcb, DCB_REASON_LOW_WATER); - } + /* The write queue has drained, potentially need to call a callback function */ + if (dcb->writeq == NULL) + { + dcb_call_callback(dcb, DCB_REASON_DRAINED); + } - return n; + if (above_water && dcb->writeqlen < dcb->low_water) + { + atomic_add(&dcb->stats.n_low_water, 1); + dcb_call_callback(dcb, DCB_REASON_LOW_WATER); + } + + return n; } /** @@ -1596,16 +1647,16 @@ int above_water; * up until the point the write would block. This function uses SSL encryption * and the SSL handshake should have been completed prior to calling this function. * - * @param dcb DCB to drain the write queue of + * @param dcb DCB to drain the write queue of * @return The number of bytes written */ int dcb_drain_writeq_SSL(DCB *dcb) { - int n = 0; - int w; - int saved_errno = 0; - int above_water; + int n = 0; + int w; + int saved_errno = 0; + int above_water; above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; @@ -1613,73 +1664,73 @@ dcb_drain_writeq_SSL(DCB *dcb) if (dcb->writeq) { - int len; - /* - * Loop over the buffer chain in the pending writeq - * Send as much of the data in that chain as possible and - * leave any balance on the write queue. - */ - while (dcb->writeq != NULL) - { - len = GWBUF_LENGTH(dcb->writeq); - w = gw_write_SSL(dcb->ssl, GWBUF_DATA(dcb->writeq), len); + int len; + /* + * Loop over the buffer chain in the pending writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + w = gw_write_SSL(dcb->ssl, GWBUF_DATA(dcb->writeq), len); - if (w < 0) - { - int ssl_errno = SSL_get_error(dcb->ssl,w); + if (w < 0) + { + int ssl_errno = SSL_get_error(dcb->ssl,w); - if(ssl_errno == SSL_ERROR_WANT_WRITE || ssl_errno == SSL_ERROR_WANT_READ) - { - break; - } - MXS_ERROR("Write to dcb failed due to SSL error %d:", ssl_errno); - switch(ssl_errno) - { - case SSL_ERROR_SSL: - case SSL_ERROR_SYSCALL: - while((ssl_errno = ERR_get_error()) != 0) - { - char errbuf[SSL_ERRBUF_LEN]; - ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf)); - MXS_ERROR("%s", errbuf); - } - if(errno != 0) + if (ssl_errno == SSL_ERROR_WANT_WRITE || ssl_errno == SSL_ERROR_WANT_READ) + { + break; + } + MXS_ERROR("Write to dcb failed due to SSL error %d:", ssl_errno); + switch(ssl_errno) + { + case SSL_ERROR_SSL: + case SSL_ERROR_SYSCALL: + while ((ssl_errno = ERR_get_error()) != 0) + { + char errbuf[SSL_ERRBUF_LEN]; + ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf)); + MXS_ERROR("%s", errbuf); + } + if (errno != 0) { char errbuf[STRERROR_BUFLEN]; - MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); + MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); } - break; - case SSL_ERROR_ZERO_RETURN: - MXS_ERROR("Socket is closed."); - break; + break; + case SSL_ERROR_ZERO_RETURN: + MXS_ERROR("Socket is closed."); + break; - default: - MXS_ERROR("Unexpected error."); - break; - } - break; - - - } - /* - * Pull the number of bytes we have written from - * queue with have. - */ - dcb->writeq = gwbuf_consume(dcb->writeq, w); - n += w; - } + default: + MXS_ERROR("Unexpected error."); + break; + } + break; + } + /* + * Pull the number of bytes we have written from + * queue with have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + n += w; + } } spinlock_release(&dcb->writeqlock); atomic_add(&dcb->writeqlen, -n); /* The write queue has drained, potentially need to call a callback function */ if (dcb->writeq == NULL) - dcb_call_callback(dcb, DCB_REASON_DRAINED); + { + dcb_call_callback(dcb, DCB_REASON_DRAINED); + } if (above_water && dcb->writeqlen < dcb->low_water) { - atomic_add(&dcb->stats.n_low_water, 1); - dcb_call_callback(dcb, DCB_REASON_LOW_WATER); + atomic_add(&dcb->stats.n_low_water, 1); + dcb_call_callback(dcb, DCB_REASON_LOW_WATER); } return n; @@ -1701,7 +1752,7 @@ dcb_close(DCB *dcb) { CHK_DCB(dcb); - if (DCB_STATE_UNDEFINED == dcb->state + if (DCB_STATE_UNDEFINED == dcb->state || DCB_STATE_DISCONNECTED == dcb->state) { MXS_ERROR("%lu [dcb_close] Error : Removing DCB %p but was in state %s " @@ -1709,9 +1760,9 @@ dcb_close(DCB *dcb) pthread_self(), dcb, STRDCBSTATE(dcb->state)); - raise(SIGABRT); + raise(SIGABRT); } - + /** * dcb_close may be called for freshly created dcb, in which case * it only needs to be freed. @@ -1721,7 +1772,7 @@ dcb_close(DCB *dcb) dcb_final_free(dcb); return; } - + /* * If DCB is in persistent pool, mark it as an error and exit */ @@ -1730,7 +1781,7 @@ dcb_close(DCB *dcb) dcb->dcb_errhandle_called = true; return; } - + spinlock_acquire(&zombiespin); if (!dcb->dcb_is_zombie) { @@ -1753,8 +1804,8 @@ dcb_close(DCB *dcb) nzombies++; if (nzombies > maxzombies) maxzombies = nzombies; /*< Set bit for each maxscale thread. This should be done before - * the state is changed, so as to protect the DCB from premature - * destruction. */ + * the state is changed, so as to protect the DCB from premature + * destruction. */ if (dcb->server) { bitmask_copy(&dcb->memdata.bitmask, poll_bitmask()); @@ -1766,7 +1817,7 @@ dcb_close(DCB *dcb) /** * Add DCB to persistent pool if it qualifies, close otherwise * - * @param dcb The DCB to go to persistent pool or be closed + * @param dcb The DCB to go to persistent pool or be closed * @return bool - whether the DCB was added to the pool * */ @@ -1776,8 +1827,8 @@ dcb_maybe_add_persistent(DCB *dcb) int poolcount = -1; if (dcb->user != NULL && strlen(dcb->user) - && dcb->server - && dcb->server->persistpoolmax + && dcb->server + && dcb->server->persistpoolmax && (dcb->server->status & SERVER_RUNNING) && !dcb->dcb_errhandle_called && !(dcb->flags & DCBF_HUNG) @@ -1790,9 +1841,9 @@ dcb_maybe_add_persistent(DCB *dcb) dcb->dcb_is_zombie = false; dcb->persistentstart = time(NULL); if (dcb->session) - /*< - * Terminate client session. - */ + /*< + * Terminate client session. + */ { SESSION *local_session = dcb->session; session_set_dummy(dcb); @@ -1802,13 +1853,13 @@ dcb_maybe_add_persistent(DCB *dcb) session_free(local_session); } } - spinlock_acquire(&dcb->cb_lock); - while ((loopcallback = dcb->callbacks) != NULL) - { - dcb->callbacks = loopcallback->next; - free(loopcallback); - } - spinlock_release(&dcb->cb_lock); + spinlock_acquire(&dcb->cb_lock); + while ((loopcallback = dcb->callbacks) != NULL) + { + dcb->callbacks = loopcallback->next; + free(loopcallback); + } + spinlock_release(&dcb->cb_lock); spinlock_acquire(&dcb->server->persistlock); dcb->nextpersistent = dcb->server->persistent; dcb->server->persistent = dcb; @@ -1837,47 +1888,55 @@ dcb_maybe_add_persistent(DCB *dcb) /** * Diagnostic to print a DCB * - * @param dcb The DCB to print + * @param dcb The DCB to print * */ void printDCB(DCB *dcb) { - printf("DCB: %p\n", (void *)dcb); - printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); - if (dcb->remote) - printf("\tConnected to: %s\n", dcb->remote); - if (dcb->user) - printf("\tUsername: %s\n", dcb->user); - if (dcb->protoname) - printf("\tProtocol: %s\n", dcb->protoname); - if (dcb->writeq) - printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); - char *statusname = server_status(dcb->server); - if (statusname) - { - printf("\tServer status: %s\n", statusname); - free(statusname); - } - char *rolename = dcb_role_name(dcb); - if (rolename) - { - printf("\tRole: %s\n", rolename); - free(rolename); - } - printf("\tStatistics:\n"); - printf("\t\tNo. of Reads: %d\n", - dcb->stats.n_reads); - printf("\t\tNo. of Writes: %d\n", - dcb->stats.n_writes); - printf("\t\tNo. of Buffered Writes: %d\n", - dcb->stats.n_buffered); - printf("\t\tNo. of Accepts: %d\n", - dcb->stats.n_accepts); - printf("\t\tNo. of High Water Events: %d\n", - dcb->stats.n_high_water); - printf("\t\tNo. of Low Water Events: %d\n", - dcb->stats.n_low_water); + printf("DCB: %p\n", (void *)dcb); + printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); + if (dcb->remote) + { + printf("\tConnected to: %s\n", dcb->remote); + } + if (dcb->user) + { + printf("\tUsername: %s\n", dcb->user); + } + if (dcb->protoname) + { + printf("\tProtocol: %s\n", dcb->protoname); + } + if (dcb->writeq) + { + printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); + } + char *statusname = server_status(dcb->server); + if (statusname) + { + printf("\tServer status: %s\n", statusname); + free(statusname); + } + char *rolename = dcb_role_name(dcb); + if (rolename) + { + printf("\tRole: %s\n", rolename); + free(rolename); + } + printf("\tStatistics:\n"); + printf("\t\tNo. of Reads: %d\n", + dcb->stats.n_reads); + printf("\t\tNo. of Writes: %d\n", + dcb->stats.n_writes); + printf("\t\tNo. of Buffered Writes: %d\n", + dcb->stats.n_buffered); + printf("\t\tNo. of Accepts: %d\n", + dcb->stats.n_accepts); + printf("\t\tNo. of High Water Events: %d\n", + dcb->stats.n_high_water); + printf("\t\tNo. of Low Water Events: %d\n", + dcb->stats.n_low_water); } /** * Display an entry from the spinlock statistics data @@ -1889,7 +1948,7 @@ printDCB(DCB *dcb) static void spin_reporter(void *dcb, char *desc, int value) { - dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value); + dcb_printf((DCB *)dcb, "\t\t%-40s %d\n", desc, value); } @@ -1899,16 +1958,16 @@ spin_reporter(void *dcb, char *desc, int value) */ void printAllDCBs() { -DCB *dcb; + DCB *dcb; - spinlock_acquire(&dcbspin); - dcb = allDCBs; - while (dcb) - { - printDCB(dcb); - dcb = dcb->next; - } - spinlock_release(&dcbspin); + spinlock_acquire(&dcbspin); + dcb = allDCBs; + while (dcb) + { + printDCB(dcb); + dcb = dcb->next; + } + spinlock_release(&dcbspin); } /** @@ -1917,302 +1976,331 @@ DCB *dcb; * @param pdcb DCB to print results to * @param dcb DCB to be printed */ -void +void dprintOneDCB(DCB *pdcb, DCB *dcb) { - dcb_printf(pdcb, "DCB: %p\n", (void *)dcb); - dcb_printf(pdcb, "\tDCB state: %s\n", - gw_dcb_state2string(dcb->state)); - if (dcb->session && dcb->session->service) - dcb_printf(pdcb, "\tService: %s\n", - dcb->session->service->name); - if (dcb->remote) - dcb_printf(pdcb, "\tConnected to: %s\n", - dcb->remote); - if (dcb->server) + dcb_printf(pdcb, "DCB: %p\n", (void *)dcb); + dcb_printf(pdcb, "\tDCB state: %s\n", + gw_dcb_state2string(dcb->state)); + if (dcb->session && dcb->session->service) + { + dcb_printf(pdcb, "\tService: %s\n", + dcb->session->service->name); + } + if (dcb->remote) + { + dcb_printf(pdcb, "\tConnected to: %s\n", + dcb->remote); + } + if (dcb->server) + { + if (dcb->server->name) { - if (dcb->server->name) - dcb_printf(pdcb, "\tServer name/IP: %s\n", - dcb->server->name); - if (dcb->server->port) - dcb_printf(pdcb, "\tPort number: %d\n", - dcb->server->port); + dcb_printf(pdcb, "\tServer name/IP: %s\n", + dcb->server->name); } - if (dcb->user) - dcb_printf(pdcb, "\tUsername: %s\n", - dcb->user); - if (dcb->protoname) - dcb_printf(pdcb, "\tProtocol: %s\n", - dcb->protoname); - if (dcb->writeq) - dcb_printf(pdcb, "\tQueued write data: %d\n", - gwbuf_length(dcb->writeq)); - char *statusname = server_status(dcb->server); - if (statusname) - { - dcb_printf(pdcb, "\tServer status: %s\n", statusname); - free(statusname); - } - char *rolename = dcb_role_name(dcb); - if (rolename) - { - dcb_printf(pdcb, "\tRole: %s\n", rolename); - free(rolename); - } - if (!bitmask_isallclear(&dcb->memdata.bitmask)) + if (dcb->server->port) { - char *bitmasktext = bitmask_render_readable(&dcb->memdata.bitmask); - if (bitmasktext) - { - dcb_printf(pdcb, "\tBitMask: %s\n", bitmasktext); - free(bitmasktext); - } + dcb_printf(pdcb, "\tPort number: %d\n", + dcb->server->port); } - dcb_printf(pdcb, "\tStatistics:\n"); - dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); - dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); - dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); - dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); - dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); - if (dcb->flags & DCBF_CLONE) - dcb_printf(pdcb, "\t\tDCB is a clone.\n"); - if (dcb->persistentstart) + } + if (dcb->user) + { + dcb_printf(pdcb, "\tUsername: %s\n", + dcb->user); + } + if (dcb->protoname) + { + dcb_printf(pdcb, "\tProtocol: %s\n", + dcb->protoname); + } + if (dcb->writeq) + { + dcb_printf(pdcb, "\tQueued write data: %d\n", + gwbuf_length(dcb->writeq)); + } + char *statusname = server_status(dcb->server); + if (statusname) + { + dcb_printf(pdcb, "\tServer status: %s\n", statusname); + free(statusname); + } + char *rolename = dcb_role_name(dcb); + if (rolename) + { + dcb_printf(pdcb, "\tRole: %s\n", rolename); + free(rolename); + } + if (!bitmask_isallclear(&dcb->memdata.bitmask)) + { + char *bitmasktext = bitmask_render_readable(&dcb->memdata.bitmask); + if (bitmasktext) { - char buff[20]; - struct tm timeinfo; - localtime_r(&dcb->persistentstart, &timeinfo); - strftime(buff, sizeof(buff), "%b %d %H:%M:%S", &timeinfo); - dcb_printf(pdcb, "\t\tAdded to persistent pool: %s\n", buff); + dcb_printf(pdcb, "\tBitMask: %s\n", bitmasktext); + free(bitmasktext); } + } + dcb_printf(pdcb, "\tStatistics:\n"); + dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", dcb->stats.n_reads); + dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", dcb->stats.n_writes); + dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); + dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); + if (dcb->flags & DCBF_CLONE) + { + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); + } + if (dcb->persistentstart) + { + char buff[20]; + struct tm timeinfo; + localtime_r(&dcb->persistentstart, &timeinfo); + strftime(buff, sizeof(buff), "%b %d %H:%M:%S", &timeinfo); + dcb_printf(pdcb, "\t\tAdded to persistent pool: %s\n", buff); + } } /** * Diagnostic to print all DCB allocated in the system * * @param pdcb DCB to print results to */ -void +void dprintAllDCBs(DCB *pdcb) { -DCB *dcb; + DCB *dcb; - spinlock_acquire(&dcbspin); + spinlock_acquire(&dcbspin); #if SPINLOCK_PROFILE - dcb_printf(pdcb, "DCB List Spinlock Statistics:\n"); - spinlock_stats(&dcbspin, spin_reporter, pdcb); - dcb_printf(pdcb, "Zombie Queue Lock Statistics:\n"); - spinlock_stats(&zombiespin, spin_reporter, pdcb); + dcb_printf(pdcb, "DCB List Spinlock Statistics:\n"); + spinlock_stats(&dcbspin, spin_reporter, pdcb); + dcb_printf(pdcb, "Zombie Queue Lock Statistics:\n"); + spinlock_stats(&zombiespin, spin_reporter, pdcb); #endif - dcb = allDCBs; - while (dcb) - { - dprintOneDCB(pdcb, dcb); - dcb = dcb->next; - } - spinlock_release(&dcbspin); + dcb = allDCBs; + while (dcb) + { + dprintOneDCB(pdcb, dcb); + dcb = dcb->next; + } + spinlock_release(&dcbspin); } -/** +/** * Diagnostic routine to print DCB data in a tabular form. - * + * * @param pdcb DCB to print results to */ void dListDCBs(DCB *pdcb) { -DCB *dcb; + DCB *dcb; - spinlock_acquire(&dcbspin); - dcb = allDCBs; - dcb_printf(pdcb, "Descriptor Control Blocks\n"); - dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); - dcb_printf(pdcb, " %-16s | %-26s | %-18s | %s\n", - "DCB", "State", "Service", "Remote"); - dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); - while (dcb) - { - dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", - dcb, gw_dcb_state2string(dcb->state), - - ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), - (dcb->remote ? dcb->remote : "")); - dcb = dcb->next; - } - dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n\n"); - spinlock_release(&dcbspin); + spinlock_acquire(&dcbspin); + dcb = allDCBs; + dcb_printf(pdcb, "Descriptor Control Blocks\n"); + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); + dcb_printf(pdcb, " %-16s | %-26s | %-18s | %s\n", + "DCB", "State", "Service", "Remote"); + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n"); + while (dcb) + { + dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", + dcb, gw_dcb_state2string(dcb->state), + ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), + (dcb->remote ? dcb->remote : "")); + dcb = dcb->next; + } + dcb_printf(pdcb, "------------------+----------------------------+--------------------+----------\n\n"); + spinlock_release(&dcbspin); } -/** +/** * Diagnostic routine to print client DCB data in a tabular form. - * + * * @param pdcb DCB to print results to */ void dListClients(DCB *pdcb) { -DCB *dcb; + DCB *dcb; - spinlock_acquire(&dcbspin); - dcb = allDCBs; - dcb_printf(pdcb, "Client Connections\n"); - dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); - dcb_printf(pdcb, " %-15s | %-16s | %-20s | %s\n", - "Client", "DCB", "Service", "Session"); - dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); - while (dcb) - { - if (dcb_isclient(dcb) - && dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) - { - dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", - (dcb->remote ? dcb->remote : ""), - dcb, (dcb->session->service ? - dcb->session->service->name : ""), - dcb->session); - } - dcb = dcb->next; - } - dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n\n"); - spinlock_release(&dcbspin); + spinlock_acquire(&dcbspin); + dcb = allDCBs; + dcb_printf(pdcb, "Client Connections\n"); + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); + dcb_printf(pdcb, " %-15s | %-16s | %-20s | %s\n", + "Client", "DCB", "Service", "Session"); + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); + while (dcb) + { + if (dcb_isclient(dcb) && dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) + { + dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", + (dcb->remote ? dcb->remote : ""), + dcb, (dcb->session->service ? + dcb->session->service->name : ""), + dcb->session); + } + dcb = dcb->next; + } + dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n\n"); + spinlock_release(&dcbspin); } /** * Diagnostic to print a DCB to another DCB * - * @param pdcb The DCB to which send the output - * @param dcb The DCB to print + * @param pdcb The DCB to which send the output + * @param dcb The DCB to print */ void dprintDCB(DCB *pdcb, DCB *dcb) { - dcb_printf(pdcb, "DCB: %p\n", (void *)dcb); - dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); - if (dcb->session && dcb->session->service) - dcb_printf(pdcb, "\tService: %s\n", - dcb->session->service->name); - if (dcb->remote) - dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); - if (dcb->user) - dcb_printf(pdcb, "\tUsername: %s\n", - dcb->user); - if (dcb->protoname) - dcb_printf(pdcb, "\tProtocol: %s\n", - dcb->protoname); - dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session); - if (dcb->writeq) - dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); - if (dcb->delayq) - dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq)); - char *statusname = server_status(dcb->server); - if (statusname) - { - dcb_printf(pdcb, "\tServer status: %s\n", statusname); - free(statusname); - } - char *rolename = dcb_role_name(dcb); - if (rolename) - { - dcb_printf(pdcb, "\tRole: %s\n", rolename); - free(rolename); - } - dcb_printf(pdcb, "\tStatistics:\n"); - dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", - dcb->stats.n_reads); - dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", - dcb->stats.n_writes); - dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", - dcb->stats.n_buffered); - dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", - dcb->stats.n_accepts); - dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", - dcb->stats.n_high_water); - dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", - dcb->stats.n_low_water); - if (DCB_POLL_BUSY(dcb)) - { - dcb_printf(pdcb, "\t\tPending events in the queue: %x %s\n", - dcb->evq.pending_events, dcb->evq.processing ? "(processing)" : ""); - - } - if (dcb->flags & DCBF_CLONE) - dcb_printf(pdcb, "\t\tDCB is a clone.\n"); + dcb_printf(pdcb, "DCB: %p\n", (void *)dcb); + dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); + if (dcb->session && dcb->session->service) + { + dcb_printf(pdcb, "\tService: %s\n", + dcb->session->service->name); + } + if (dcb->remote) + { + dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); + } + if (dcb->user) + { + dcb_printf(pdcb, "\tUsername: %s\n", + dcb->user); + } + if (dcb->protoname) + { + dcb_printf(pdcb, "\tProtocol: %s\n", + dcb->protoname); + } + dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session); + if (dcb->writeq) + { + dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); + } + if (dcb->delayq) + { + dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq)); + } + char *statusname = server_status(dcb->server); + if (statusname) + { + dcb_printf(pdcb, "\tServer status: %s\n", statusname); + free(statusname); + } + char *rolename = dcb_role_name(dcb); + if (rolename) + { + dcb_printf(pdcb, "\tRole: %s\n", rolename); + free(rolename); + } + dcb_printf(pdcb, "\tStatistics:\n"); + dcb_printf(pdcb, "\t\tNo. of Reads: %d\n", + dcb->stats.n_reads); + dcb_printf(pdcb, "\t\tNo. of Writes: %d\n", + dcb->stats.n_writes); + dcb_printf(pdcb, "\t\tNo. of Buffered Writes: %d\n", + dcb->stats.n_buffered); + dcb_printf(pdcb, "\t\tNo. of Accepts: %d\n", + dcb->stats.n_accepts); + dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", + dcb->stats.n_high_water); + dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", + dcb->stats.n_low_water); + if (DCB_POLL_BUSY(dcb)) + { + dcb_printf(pdcb, "\t\tPending events in the queue: %x %s\n", + dcb->evq.pending_events, dcb->evq.processing ? "(processing)" : ""); + } + if (dcb->flags & DCBF_CLONE) + { + dcb_printf(pdcb, "\t\tDCB is a clone.\n"); + } #if SPINLOCK_PROFILE - dcb_printf(pdcb, "\tInitlock Statistics:\n"); - spinlock_stats(&dcb->dcb_initlock, spin_reporter, pdcb); - dcb_printf(pdcb, "\tWrite Queue Lock Statistics:\n"); - spinlock_stats(&dcb->writeqlock, spin_reporter, pdcb); - dcb_printf(pdcb, "\tDelay Queue Lock Statistics:\n"); - spinlock_stats(&dcb->delayqlock, spin_reporter, pdcb); - dcb_printf(pdcb, "\tPollin Lock Statistics:\n"); - spinlock_stats(&dcb->pollinlock, spin_reporter, pdcb); - dcb_printf(pdcb, "\tPollout Lock Statistics:\n"); - spinlock_stats(&dcb->polloutlock, spin_reporter, pdcb); - dcb_printf(pdcb, "\tCallback Lock Statistics:\n"); - spinlock_stats(&dcb->cb_lock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tInitlock Statistics:\n"); + spinlock_stats(&dcb->dcb_initlock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tWrite Queue Lock Statistics:\n"); + spinlock_stats(&dcb->writeqlock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tDelay Queue Lock Statistics:\n"); + spinlock_stats(&dcb->delayqlock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tPollin Lock Statistics:\n"); + spinlock_stats(&dcb->pollinlock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tPollout Lock Statistics:\n"); + spinlock_stats(&dcb->polloutlock, spin_reporter, pdcb); + dcb_printf(pdcb, "\tCallback Lock Statistics:\n"); + spinlock_stats(&dcb->cb_lock, spin_reporter, pdcb); #endif - if (dcb->persistentstart) - { - char buff[20]; - struct tm timeinfo; - localtime_r(&dcb->persistentstart, &timeinfo); - strftime(buff, sizeof(buff), "%b %d %H:%M:%S", &timeinfo); - dcb_printf(pdcb, "\t\tAdded to persistent pool: %s\n", buff); - } + if (dcb->persistentstart) + { + char buff[20]; + struct tm timeinfo; + localtime_r(&dcb->persistentstart, &timeinfo); + strftime(buff, sizeof(buff), "%b %d %H:%M:%S", &timeinfo); + dcb_printf(pdcb, "\t\tAdded to persistent pool: %s\n", buff); + } } /** * Return a string representation of a DCB state. * - * @param state The DCB state + * @param state The DCB state * @return String representation of the state * */ const char * -gw_dcb_state2string (int state) +gw_dcb_state2string(int state) { - switch(state) { - case DCB_STATE_ALLOC: - return "DCB Allocated"; - case DCB_STATE_POLLING: - return "DCB in the polling loop"; - case DCB_STATE_NOPOLLING: - return "DCB not in polling loop"; - case DCB_STATE_LISTENING: - return "DCB for listening socket"; - case DCB_STATE_DISCONNECTED: - return "DCB socket closed"; - case DCB_STATE_ZOMBIE: - return "DCB Zombie"; - case DCB_STATE_UNDEFINED: - return "DCB undefined state"; - default: - return "DCB (unknown - erroneous)"; - } + switch(state) { + case DCB_STATE_ALLOC: + return "DCB Allocated"; + case DCB_STATE_POLLING: + return "DCB in the polling loop"; + case DCB_STATE_NOPOLLING: + return "DCB not in polling loop"; + case DCB_STATE_LISTENING: + return "DCB for listening socket"; + case DCB_STATE_DISCONNECTED: + return "DCB socket closed"; + case DCB_STATE_ZOMBIE: + return "DCB Zombie"; + case DCB_STATE_UNDEFINED: + return "DCB undefined state"; + default: + return "DCB (unknown - erroneous)"; + } } /** * A DCB based wrapper for printf. Allows formatting printing to * a descriptor control block. * - * @param dcb Descriptor to write to - * @param fmt A printf format string - * @param ... Variable arguments for the print format + * @param dcb Descriptor to write to + * @param fmt A printf format string + * @param ... Variable arguments for the print format */ void dcb_printf(DCB *dcb, const char *fmt, ...) { -GWBUF *buf; -va_list args; + GWBUF *buf; + va_list args; - if ((buf = gwbuf_alloc(10240)) == NULL) - return; - va_start(args, fmt); - vsnprintf(GWBUF_DATA(buf), 10240, fmt, args); - va_end(args); + if ((buf = gwbuf_alloc(10240)) == NULL) + { + return; + } + va_start(args, fmt); + vsnprintf(GWBUF_DATA(buf), 10240, fmt, args); + va_end(args); - buf->end = GWBUF_DATA(buf) + strlen(GWBUF_DATA(buf)); - dcb->func.write(dcb, buf); + buf->end = GWBUF_DATA(buf) + strlen(GWBUF_DATA(buf)); + dcb->func.write(dcb, buf); } /** @@ -2224,67 +2312,67 @@ va_list args; int dcb_isclient(DCB *dcb) { - if (dcb->state != DCB_STATE_LISTENING && dcb->session) - { - if (dcb->session->client) - { - return (dcb->session && dcb == dcb->session->client); - } - } + if (dcb->state != DCB_STATE_LISTENING && dcb->session) + { + if (dcb->session->client) + { + return (dcb->session && dcb == dcb->session->client); + } + } - return 0; + return 0; } /** * Print hash table statistics to a DCB * - * @param dcb The DCB to send the information to - * @param table The hash table + * @param dcb The DCB to send the information to + * @param table The hash table */ void dcb_hashtable_stats( - DCB *dcb, - void *table) + DCB *dcb, + void *table) { - int total; - int longest; - int hashsize; + int total; + int longest; + int hashsize; - total = 0; - longest = 0; + total = 0; + longest = 0; - hashtable_get_stats(table, &hashsize, &total, &longest); + hashtable_get_stats(table, &hashsize, &total, &longest); - dcb_printf(dcb, - "Hashtable: %p, size %d\n", - table, - hashsize); - - dcb_printf(dcb, "\tNo. of entries: %d\n", total); - dcb_printf(dcb, - "\tAverage chain length: %.1f\n", - (hashsize == 0 ? (float)hashsize : (float)total / hashsize)); - dcb_printf(dcb, "\tLongest chain length: %d\n", longest); + dcb_printf(dcb, + "Hashtable: %p, size %d\n", + table, + hashsize); + + dcb_printf(dcb, "\tNo. of entries: %d\n", total); + dcb_printf(dcb, + "\tAverage chain length: %.1f\n", + (hashsize == 0 ? (float)hashsize : (float)total / hashsize)); + dcb_printf(dcb, "\tLongest chain length: %d\n", longest); } /** * Write data to a socket through an SSL structure. The SSL structure is linked to a DCB's socket * and all communication is encrypted and done via the SSL structure. * - * @param ssl The SSL structure to use for writing - * @param buf Buffer to write - * @param nbytes Number of bytes to write + * @param ssl The SSL structure to use for writing + * @param buf Buffer to write + * @param nbytes Number of bytes to write * @return Number of written bytes */ int gw_write_SSL(SSL* ssl, const void *buf, size_t nbytes) { int w = 0; - int fd = SSL_get_fd(ssl); + int fd = SSL_get_fd(ssl); - if (fd > 0) - { - w = SSL_write(ssl, buf, nbytes); - } + if (fd > 0) + { + w = SSL_write(ssl, buf, nbytes); + } return w; } @@ -2293,85 +2381,87 @@ gw_write_SSL(SSL* ssl, const void *buf, size_t nbytes) /** * Write data to a DCB * - * @param dcb The DCB to write buffer - * @param buf Buffer to write - * @param nbytes Number of bytes to write + * @param dcb The DCB to write buffer + * @param buf Buffer to write + * @param nbytes Number of bytes to write * @return Number of written bytes */ int gw_write(DCB *dcb, const void *buf, size_t nbytes) { - int w = 0; - int fd = dcb->fd; -#if defined(FAKE_CODE) - if (fd > 0 && dcb_fake_write_errno[fd] != 0) - { - ss_dassert(dcb_fake_write_ev[fd] != 0); - w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */ + int w = 0; + int fd = dcb->fd; +#if defined(FAKE_CODE) + if (fd > 0 && dcb_fake_write_errno[fd] != 0) + { + ss_dassert(dcb_fake_write_ev[fd] != 0); + w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */ - if (w > 0) { - w = -1; - errno = dcb_fake_write_errno[fd]; - } - } else if (fd > 0) - { - w = write(fd, buf, nbytes); + if (w > 0) + { + w = -1; + errno = dcb_fake_write_errno[fd]; } + } + else if (fd > 0) + { + w = write(fd, buf, nbytes); + } #else - if (fd > 0) - { - w = write(fd, buf, nbytes); - } + if (fd > 0) + { + w = write(fd, buf, nbytes); + } #endif /* FAKE_CODE */ #if defined(SS_DEBUG_MYSQL) + { + size_t len; + uint8_t* packet = (uint8_t *)buf; + char* str; + + /** Print only MySQL packets */ + if (w > 5) { - size_t len; - uint8_t* packet = (uint8_t *)buf; - char* str; - - /** Print only MySQL packets */ - if (w > 5) + str = (char *)&packet[5]; + len = packet[0]; + len += 256*packet[1]; + len += 256*256*packet[2]; + + if (strncmp(str, "insert", 6) == 0 || + strncmp(str, "create", 6) == 0 || + strncmp(str, "drop", 4) == 0) + { + ss_dassert((dcb->dcb_server_status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE))==(SERVER_RUNNING|SERVER_MASTER)); + } + + if (strncmp(str, "set autocommit", 14) == 0 && nbytes > 17) + { + char* s = (char *)calloc(1, nbytes+1); + + if (nbytes-5 > len) { - str = (char *)&packet[5]; - len = packet[0]; - len += 256*packet[1]; - len += 256*256*packet[2]; - - if (strncmp(str, "insert", 6) == 0 || - strncmp(str, "create", 6) == 0 || - strncmp(str, "drop", 4) == 0) - { - ss_dassert((dcb->dcb_server_status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE))==(SERVER_RUNNING|SERVER_MASTER)); - } - - if (strncmp(str, "set autocommit", 14) == 0 && nbytes > 17) - { - char* s = (char *)calloc(1, nbytes+1); - - if (nbytes-5 > len) - { - size_t len2 = packet[4+len]; - len2 += 256*packet[4+len+1]; - len2 += 256*256*packet[4+len+2]; - - char* str2 = (char *)&packet[4+len+5]; - snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2); - } - else - { - snprintf(s, len, "%s", (char *)str); - } - MXS_INFO("%lu [gw_write] Wrote %d bytes : %s ", - pthread_self(), - w, - s); - free(s); - } + size_t len2 = packet[4+len]; + len2 += 256*packet[4+len+1]; + len2 += 256*256*packet[4+len+2]; + + char* str2 = (char *)&packet[4+len+5]; + snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2); } + else + { + snprintf(s, len, "%s", (char *)str); + } + MXS_INFO("%lu [gw_write] Wrote %d bytes : %s ", + pthread_self(), + w, + s); + free(s); + } } + } #endif - return w; + return w; } /** @@ -2383,54 +2473,57 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes) * An error will also be returned if the is insufficient memeory available to * create the registration. * - * @param dcb The DCB to add the callback to - * @param reason The callback reason - * @param callback The callback function to call - * @param userdata User data to send in the call - * @return Non-zero (true) if the callback was added + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param callback The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was added */ int -dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) +dcb_add_callback(DCB *dcb, + DCB_REASON reason, + int (*callback)(struct dcb *, DCB_REASON, void *), + void *userdata) { -DCB_CALLBACK *cb, *ptr; -int rval = 1; + DCB_CALLBACK *cb, *ptr; + int rval = 1; - if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL) - { - return 0; - } - ptr->reason = reason; - ptr->cb = callback; - ptr->userdata = userdata; - ptr->next = NULL; - spinlock_acquire(&dcb->cb_lock); - cb = dcb->callbacks; - if (cb == NULL) - { - dcb->callbacks = ptr; - spinlock_release(&dcb->cb_lock); - } - else - { - while (cb) - { - if (cb->reason == reason && cb->cb == callback && - cb->userdata == userdata) - { - free(ptr); - spinlock_release(&dcb->cb_lock); - return 0; - } - if (cb->next == NULL) - { - cb->next = ptr; - break; - } - cb = cb->next; - } - spinlock_release(&dcb->cb_lock); - } - return rval; + if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL) + { + return 0; + } + ptr->reason = reason; + ptr->cb = callback; + ptr->userdata = userdata; + ptr->next = NULL; + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + dcb->callbacks = ptr; + spinlock_release(&dcb->cb_lock); + } + else + { + while (cb) + { + if (cb->reason == reason && cb->cb == callback && + cb->userdata == userdata) + { + free(ptr); + spinlock_release(&dcb->cb_lock); + return 0; + } + if (cb->next == NULL) + { + cb->next = ptr; + break; + } + cb = cb->next; + } + spinlock_release(&dcb->cb_lock); + } + return rval; } /** @@ -2439,99 +2532,111 @@ int rval = 1; * Searches down the linked list to find the callback with a matching reason, function * and userdata. * - * @param dcb The DCB to add the callback to - * @param reason The callback reason - * @param callback The callback function to call - * @param userdata User data to send in the call - * @return Non-zero (true) if the callback was removed + * @param dcb The DCB to add the callback to + * @param reason The callback reason + * @param callback The callback function to call + * @param userdata User data to send in the call + * @return Non-zero (true) if the callback was removed */ int -dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) +dcb_remove_callback(DCB *dcb, + DCB_REASON reason, + int (*callback)(struct dcb *, DCB_REASON, void *), + void *userdata) { -DCB_CALLBACK *cb, *pcb = NULL; -int rval = 0; + DCB_CALLBACK *cb, *pcb = NULL; + int rval = 0; - spinlock_acquire(&dcb->cb_lock); - cb = dcb->callbacks; - if (cb == NULL) - { - rval = 0; - } - else - { - while (cb) - { - if (cb->reason == reason && cb->cb == callback - && cb->userdata == userdata) - { - if (pcb != NULL) - pcb->next = cb->next; - else - dcb->callbacks = cb->next; - spinlock_release(&dcb->cb_lock); - free(cb); - rval = 1; - break; - } - pcb = cb; - cb = cb->next; - } - } - if (!rval) - spinlock_release(&dcb->cb_lock); - return rval; + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + if (cb == NULL) + { + rval = 0; + } + else + { + while (cb) + { + if (cb->reason == reason && + cb->cb == callback && + cb->userdata == userdata) + { + if (pcb != NULL) + { + pcb->next = cb->next; + } + else + { + dcb->callbacks = cb->next; + } + spinlock_release(&dcb->cb_lock); + free(cb); + rval = 1; + break; + } + pcb = cb; + cb = cb->next; + } + } + if (!rval) + { + spinlock_release(&dcb->cb_lock); + } + return rval; } /** * Call the set of callbacks registered for a particular reason. * - * @param dcb The DCB to call the callbacks regarding - * @param reason The reason that has triggered the call + * @param dcb The DCB to call the callbacks regarding + * @param reason The reason that has triggered the call */ static void dcb_call_callback(DCB *dcb, DCB_REASON reason) { -DCB_CALLBACK *cb, *nextcb; + DCB_CALLBACK *cb, *nextcb; - spinlock_acquire(&dcb->cb_lock); - cb = dcb->callbacks; - while (cb) - { - if (cb->reason == reason) - { - nextcb = cb->next; - spinlock_release(&dcb->cb_lock); - - MXS_DEBUG("%lu [dcb_call_callback] %s", - pthread_self(), - STRDCBREASON(reason)); - - cb->cb(dcb, reason, cb->userdata); - spinlock_acquire(&dcb->cb_lock); - cb = nextcb; - } - else - cb = cb->next; - } - spinlock_release(&dcb->cb_lock); + spinlock_acquire(&dcb->cb_lock); + cb = dcb->callbacks; + while (cb) + { + if (cb->reason == reason) + { + nextcb = cb->next; + spinlock_release(&dcb->cb_lock); + + MXS_DEBUG("%lu [dcb_call_callback] %s", + pthread_self(), + STRDCBREASON(reason)); + + cb->cb(dcb, reason, cb->userdata); + spinlock_acquire(&dcb->cb_lock); + cb = nextcb; + } + else + { + cb = cb->next; + } + } + spinlock_release(&dcb->cb_lock); } /** * Check the passed DCB to ensure it is in the list of allDCBS * - * @param dcb The DCB to check - * @return 1 if the DCB is in the list, otherwise 0 + * @param dcb The DCB to check + * @return 1 if the DCB is in the list, otherwise 0 */ int dcb_isvalid(DCB *dcb) { -int rval = 0; + int rval = 0; if (dcb) { - spinlock_acquire(&dcbspin); + spinlock_acquire(&dcbspin); rval = dcb_isvalid_nolock(dcb); - spinlock_release(&dcbspin); + spinlock_release(&dcbspin); } return rval; @@ -2546,14 +2651,14 @@ int rval = 0; static inline DCB * dcb_find_in_list (DCB *dcb) { - DCB *ptr = NULL; - if (dcb) + DCB *ptr = NULL; + if (dcb) { - ptr = allDCBs; - while (ptr && ptr != dcb) - { - ptr = ptr->next; - } + ptr = allDCBs; + while (ptr && ptr != dcb) + { + ptr = ptr->next; + } } return ptr; } @@ -2562,8 +2667,8 @@ dcb_find_in_list (DCB *dcb) * Check the passed DCB to ensure it is in the list of allDCBS. * Requires that the DCB list is already locked before call. * - * @param dcb The DCB to check - * @return 1 if the DCB is in the list, otherwise 0 + * @param dcb The DCB to check + * @return 1 if the DCB is in the list, otherwise 0 */ static inline int dcb_isvalid_nolock(DCB *dcb) @@ -2575,92 +2680,92 @@ dcb_isvalid_nolock(DCB *dcb) /** * Get the next DCB in the list of all DCB's * - * @param dcb The current DCB - * @return The pointer to the next DCB or NULL if this is the last + * @param dcb The current DCB + * @return The pointer to the next DCB or NULL if this is the last */ static DCB * -dcb_get_next (DCB *dcb) +dcb_get_next(DCB *dcb) { - spinlock_acquire(&dcbspin); - if (dcb) { - dcb = dcb_isvalid_nolock(dcb) ? dcb->next : NULL; - } - else dcb = allDCBs; - spinlock_release(&dcbspin); - - return dcb; -} + spinlock_acquire(&dcbspin); + if (dcb) { + dcb = dcb_isvalid_nolock(dcb) ? dcb->next : NULL; + } + else dcb = allDCBs; + spinlock_release(&dcbspin); -/** - * Call all the callbacks on all DCB's that match the server and the reason given - * - * @param reason The DCB_REASON that triggers the callback - */ -void -dcb_call_foreach(struct server* server, DCB_REASON reason) -{ - MXS_DEBUG("%lu [dcb_call_foreach]", pthread_self()); - - switch (reason) { - case DCB_REASON_CLOSE: - case DCB_REASON_DRAINED: - case DCB_REASON_HIGH_WATER: - case DCB_REASON_LOW_WATER: - case DCB_REASON_ERROR: - case DCB_REASON_HUP: - case DCB_REASON_NOT_RESPONDING: - { - DCB *dcb; - spinlock_acquire(&dcbspin); - dcb = allDCBs; - - while (dcb != NULL) - { - spinlock_acquire(&dcb->dcb_initlock); - if (dcb->state == DCB_STATE_POLLING && dcb->server && - strcmp(dcb->server->unique_name,server->unique_name) == 0) - { - dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING); - } - spinlock_release(&dcb->dcb_initlock); - dcb = dcb->next; - } - spinlock_release(&dcbspin); - break; - } - - default: - break; - } - return; + return dcb; } /** * Call all the callbacks on all DCB's that match the server and the reason given * - * @param reason The DCB_REASON that triggers the callback + * @param reason The DCB_REASON that triggers the callback + */ +void +dcb_call_foreach(struct server* server, DCB_REASON reason) +{ + MXS_DEBUG("%lu [dcb_call_foreach]", pthread_self()); + + switch (reason) { + case DCB_REASON_CLOSE: + case DCB_REASON_DRAINED: + case DCB_REASON_HIGH_WATER: + case DCB_REASON_LOW_WATER: + case DCB_REASON_ERROR: + case DCB_REASON_HUP: + case DCB_REASON_NOT_RESPONDING: + { + DCB *dcb; + spinlock_acquire(&dcbspin); + dcb = allDCBs; + + while (dcb != NULL) + { + spinlock_acquire(&dcb->dcb_initlock); + if (dcb->state == DCB_STATE_POLLING && dcb->server && + strcmp(dcb->server->unique_name,server->unique_name) == 0) + { + dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING); + } + spinlock_release(&dcb->dcb_initlock); + dcb = dcb->next; + } + spinlock_release(&dcbspin); + break; + } + + default: + break; + } + return; +} + +/** + * Call all the callbacks on all DCB's that match the server and the reason given + * + * @param reason The DCB_REASON that triggers the callback */ void dcb_hangup_foreach(struct server* server) { MXS_DEBUG("%lu [dcb_hangup_foreach]", pthread_self()); - - DCB *dcb; - spinlock_acquire(&dcbspin); - dcb = allDCBs; - while (dcb != NULL) - { - spinlock_acquire(&dcb->dcb_initlock); - if (dcb->state == DCB_STATE_POLLING && dcb->server && - strcmp(dcb->server->unique_name,server->unique_name) == 0) - { - poll_fake_hangup_event(dcb); - } - spinlock_release(&dcb->dcb_initlock); - dcb = dcb->next; - } - spinlock_release(&dcbspin); + DCB *dcb; + spinlock_acquire(&dcbspin); + dcb = allDCBs; + + while (dcb != NULL) + { + spinlock_acquire(&dcb->dcb_initlock); + if (dcb->state == DCB_STATE_POLLING && dcb->server && + strcmp(dcb->server->unique_name,server->unique_name) == 0) + { + poll_fake_hangup_event(dcb); + } + spinlock_release(&dcb->dcb_initlock); + dcb = dcb->next; + } + spinlock_release(&dcbspin); } @@ -2668,53 +2773,53 @@ dcb_hangup_foreach(struct server* server) * Null protocol write routine used for cloned dcb's. It merely consumes * buffers written on the cloned DCB and sets the DCB_REPLIED flag. * - * @param dcb The descriptor control block - * @param buf The buffer being written - * @return Always returns a good write operation result + * @param dcb The descriptor control block + * @param buf The buffer being written + * @return Always returns a good write operation result */ static int dcb_null_write(DCB *dcb, GWBUF *buf) { - while (buf) - { - buf = gwbuf_consume(buf, GWBUF_LENGTH(buf)); - } - - dcb->flags |= DCBF_REPLIED; + while (buf) + { + buf = gwbuf_consume(buf, GWBUF_LENGTH(buf)); + } - return 1; + dcb->flags |= DCBF_REPLIED; + + return 1; } /** * Null protocol close operation for use by cloned DCB's. * - * @param dcb The DCB being closed. + * @param dcb The DCB being closed. */ static int dcb_null_close(DCB *dcb) { - return 0; + return 0; } /** * Null protocol auth operation for use by cloned DCB's. * - * @param dcb The DCB being closed. - * @param server The server to auth against - * @param session The user session - * @param buf The buffer with the new auth request + * @param dcb The DCB being closed. + * @param server The server to auth against + * @param session The user session + * @param buf The buffer with the new auth request */ static int dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf) { - return 0; + return 0; } /** * Check persistent pool for expiry or excess size and count * - * @param dcb The DCB being closed. - * @param cleanall Boolean, if true the whole pool is cleared for the + * @param dcb The DCB being closed. + * @param cleanall Boolean, if true the whole pool is cleared for the * server related to the given DCB * @return A count of the DCBs remaining in the pool */ @@ -2728,16 +2833,16 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall) DCB *previousdcb = NULL; DCB *persistentdcb, *nextdcb; DCB *disposals = NULL; - + CHK_SERVER(server); spinlock_acquire(&server->persistlock); persistentdcb = server->persistent; while (persistentdcb) { CHK_DCB(persistentdcb); nextdcb = persistentdcb->nextpersistent; - if (cleanall - || persistentdcb-> dcb_errhandle_called - || count >= server->persistpoolmax + if (cleanall + || persistentdcb-> dcb_errhandle_called + || count >= server->persistpoolmax || persistentdcb->server == NULL || !(persistentdcb->server->status & SERVER_RUNNING) || (time(NULL) - persistentdcb->persistentstart) > server->persistmaxtime) @@ -2755,7 +2860,7 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall) disposals = persistentdcb; atomic_add(&server->stats.n_persistent, -1); } - else + else { count++; previousdcb = persistentdcb; @@ -2783,50 +2888,60 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall) /** * Return DCB counts optionally filtered by usage * - * @param usage The usage of the DCB - * @return A count of DCBs in the desired state + * @param usage The usage of the DCB + * @return A count of DCBs in the desired state */ int dcb_count_by_usage(DCB_USAGE usage) { -int rval = 0; -DCB *ptr; + int rval = 0; + DCB *ptr; - spinlock_acquire(&dcbspin); - ptr = allDCBs; - while (ptr) - { - switch (usage) - { - case DCB_USAGE_CLIENT: - if (dcb_isclient(ptr)) - rval++; - break; - case DCB_USAGE_LISTENER: - if (ptr->state == DCB_STATE_LISTENING) - rval++; - break; - case DCB_USAGE_BACKEND: - if (dcb_isclient(ptr) == 0 - && ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) - rval++; - break; - case DCB_USAGE_INTERNAL: - if (ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) - rval++; - break; - case DCB_USAGE_ZOMBIE: - if (DCB_ISZOMBIE(ptr)) - rval++; - break; - case DCB_USAGE_ALL: - rval++; - break; - } - ptr = ptr->next; - } - spinlock_release(&dcbspin); - return rval; + spinlock_acquire(&dcbspin); + ptr = allDCBs; + while (ptr) + { + switch (usage) + { + case DCB_USAGE_CLIENT: + if (dcb_isclient(ptr)) + { + rval++; + } + break; + case DCB_USAGE_LISTENER: + if (ptr->state == DCB_STATE_LISTENING) + { + rval++; + } + break; + case DCB_USAGE_BACKEND: + if (dcb_isclient(ptr) == 0 + && ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) + { + rval++; + } + break; + case DCB_USAGE_INTERNAL: + if (ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) + { + rval++; + } + break; + case DCB_USAGE_ZOMBIE: + if (DCB_ISZOMBIE(ptr)) + { + rval++; + } + break; + case DCB_USAGE_ALL: + rval++; + break; + } + ptr = ptr->next; + } + spinlock_release(&dcbspin); + return rval; } /** @@ -2835,26 +2950,25 @@ DCB *ptr; * should be the service's context * @param dcb * @param context - * @return + * @return */ int dcb_create_SSL(DCB* dcb) { - - if(serviceInitSSL(dcb->service) != 0) + if (serviceInitSSL(dcb->service) != 0) { - return -1; + return -1; } - if((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL) + if ((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL) { - MXS_ERROR("Failed to initialize SSL for connection."); - return -1; + MXS_ERROR("Failed to initialize SSL for connection."); + return -1; } - if(SSL_set_fd(dcb->ssl,dcb->fd) == 0) + if (SSL_set_fd(dcb->ssl,dcb->fd) == 0) { - MXS_ERROR("Failed to set file descriptor for SSL connection."); - return -1; + MXS_ERROR("Failed to set file descriptor for SSL connection."); + return -1; } return 0; @@ -2880,78 +2994,78 @@ int dcb_accept_SSL(DCB* dcb) do { - ssl_rval = SSL_accept(dcb->ssl); + ssl_rval = SSL_accept(dcb->ssl); - MXS_DEBUG("[dcb_accept_SSL] SSL_accept %d, error %d", ssl_rval,ssl_errnum); - switch(ssl_rval) - { - case 0: - ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); - MXS_ERROR("SSL authentication failed (SSL error %d):", ssl_errnum); + MXS_DEBUG("[dcb_accept_SSL] SSL_accept %d, error %d", ssl_rval,ssl_errnum); + switch(ssl_rval) + { + case 0: + ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); + MXS_ERROR("SSL authentication failed (SSL error %d):", ssl_errnum); - if(ssl_errnum == SSL_ERROR_SSL || - ssl_errnum == SSL_ERROR_SYSCALL) - { - while((err_errnum = ERR_get_error()) != 0) - { - ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); - MXS_ERROR("%s", errbuf); - } - } - rval = -1; - break; - case 1: - rval = 1; - MXS_DEBUG("[dcb_accept_SSL] SSL_accept done for %s", dcb->remote); - return rval; + if (ssl_errnum == SSL_ERROR_SSL || + ssl_errnum == SSL_ERROR_SYSCALL) + { + while ((err_errnum = ERR_get_error()) != 0) + { + ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); + MXS_ERROR("%s", errbuf); + } + } + rval = -1; + break; + case 1: + rval = 1; + MXS_DEBUG("[dcb_accept_SSL] SSL_accept done for %s", dcb->remote); + return rval; - case -1: + case -1: + ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); - ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); - - if(ssl_errnum == SSL_ERROR_WANT_READ || ssl_errnum == SSL_ERROR_WANT_WRITE) - { - /** Not all of the data has been read. Go back to the poll - queue and wait for more.*/ - rval = 0; - MXS_DEBUG("[dcb_accept_SSL] SSL_accept ongoing for %s", dcb->remote); - return rval; - } - else - { - rval = -1; - MXS_ERROR("Fatal error in SSL_accept for %s: (SSL version: %s SSL error code: %d)", + if (ssl_errnum == SSL_ERROR_WANT_READ || ssl_errnum == SSL_ERROR_WANT_WRITE) + { + /** Not all of the data has been read. Go back to the poll + queue and wait for more.*/ + rval = 0; + MXS_DEBUG("[dcb_accept_SSL] SSL_accept ongoing for %s", dcb->remote); + return rval; + } + else + { + rval = -1; + MXS_ERROR("Fatal error in SSL_accept for %s: (SSL version: %s SSL error code: %d)", dcb->remote, SSL_get_version(dcb->ssl), ssl_errnum); - if(ssl_errnum == SSL_ERROR_SSL || - ssl_errnum == SSL_ERROR_SYSCALL) - { - while((err_errnum = ERR_get_error()) != 0) - { - ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); - MXS_ERROR("%s", errbuf); - } - if(errno) - { - MXS_ERROR("SSL authentication failed due to system" - " error %d: %s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); + if (ssl_errnum == SSL_ERROR_SSL || + ssl_errnum == SSL_ERROR_SYSCALL) + { + while ((err_errnum = ERR_get_error()) != 0) + { + ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); + MXS_ERROR("%s", errbuf); + } + if (errno) + { + MXS_ERROR("SSL authentication failed due to system" + " error %d: %s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); + } + } } - } - } - break; + break; - default: - MXS_ERROR("Fatal library error in SSL_accept, returned value was %d.", ssl_rval); - rval = -1; - break; - } - ioctl(fd,FIONREAD,&b); - pending = SSL_pending(dcb->ssl); + default: + MXS_ERROR("Fatal library error in SSL_accept, returned value was %d.", ssl_rval); + rval = -1; + break; + } + ioctl(fd,FIONREAD,&b); + pending = SSL_pending(dcb->ssl); #ifdef SS_DEBUG MXS_DEBUG("[dcb_accept_SSL] fd %d: %d bytes, %d pending", fd, b, pending); #endif - }while((b > 0 || pending > 0) && rval != -1); + } + while ((b > 0 || pending > 0) && rval != -1); return rval; } @@ -2974,47 +3088,47 @@ int dcb_connect_SSL(DCB* dcb) switch(rval) { case 0: - errnum = SSL_get_error(dcb->ssl,rval); - MXS_DEBUG("SSL_connect shutdown for %s@%s", + errnum = SSL_get_error(dcb->ssl,rval); + MXS_DEBUG("SSL_connect shutdown for %s@%s", dcb->user, dcb->remote); - return -1; - break; + return -1; + break; case 1: - rval = 1; + rval = 1; MXS_DEBUG("SSL_connect done for %s@%s", dcb->user, dcb->remote); - return rval; + return rval; case -1: - errnum = SSL_get_error(dcb->ssl,rval); + errnum = SSL_get_error(dcb->ssl,rval); - if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE) - { - /** Not all of the data has been read. Go back to the poll - queue and wait for more.*/ + if (errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE) + { + /** Not all of the data has been read. Go back to the poll + queue and wait for more.*/ - rval = 0; - MXS_DEBUG("SSL_connect ongoing for %s@%s", + rval = 0; + MXS_DEBUG("SSL_connect ongoing for %s@%s", dcb->user, dcb->remote); - } - else - { - rval = -1; - ERR_error_string_n(errnum,errbuf,sizeof(errbuf)); - MXS_ERROR("Fatal error in SSL_accept for %s@%s: (SSL error code: %d) %s", + } + else + { + rval = -1; + ERR_error_string_n(errnum,errbuf,sizeof(errbuf)); + MXS_ERROR("Fatal error in SSL_accept for %s@%s: (SSL error code: %d) %s", dcb->user, dcb->remote, errnum, errbuf); - } - break; + } + break; default: - MXS_ERROR("Fatal error in SSL_connect, returned value was %d.", rval); - break; + MXS_ERROR("Fatal error in SSL_connect, returned value was %d.", rval); + break; } return rval; @@ -3030,21 +3144,29 @@ int dcb_connect_SSL(DCB* dcb) char * dcb_role_name(DCB *dcb) { -char *name = NULL; + char *name = NULL; - if (NULL != (name = (char *)malloc(64))) + if (NULL != (name = (char *)malloc(64))) + { + name[0] = 0; + if (DCB_ROLE_SERVICE_LISTENER == dcb->dcb_role) { - name[0] = 0; - if (DCB_ROLE_SERVICE_LISTENER == dcb->dcb_role) - strcat(name, "Service Listener"); - else if (DCB_ROLE_REQUEST_HANDLER == dcb->dcb_role) - strcat(name, "Request Handler"); - else if (DCB_ROLE_INTERNAL == dcb->dcb_role) - strcat(name, "Internal"); - else - strcat(name, "Unknown"); + strcat(name, "Service Listener"); } - return name; + else if (DCB_ROLE_REQUEST_HANDLER == dcb->dcb_role) + { + strcat(name, "Request Handler"); + } + else if (DCB_ROLE_INTERNAL == dcb->dcb_role) + { + strcat(name, "Internal"); + } + else + { + strcat(name, "Unknown"); + } + } + return name; } /** diff --git a/server/include/dcb.h b/server/include/dcb.h index 22e8d21aa..aca470b73 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -34,7 +34,7 @@ struct server; struct service; /** - * @file dcb.h The Descriptor Control Block + * @file dcb.h The Descriptor Control Block * * The function pointer table used by descriptors to call relevant functions * within the protocol specific code. @@ -42,88 +42,90 @@ struct service; * @verbatim * Revision History * - * Date Who Description - * 01/06/2013 Mark Riddoch Initial implementation - * 11/06/2013 Mark Riddoch Updated GWPROTOCOL structure with new - * entry points - * 18/06/2013 Mark Riddoch Addition of the listener entry point - * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and authlock - * for handling backend asynchronous protocol connection - * and a generic lock for backend authentication - * 12/07/2013 Massimiliano Pinto Added auth entry point - * 15/07/2013 Massimiliano Pinto Added session entry point - * 16/07/2013 Massimiliano Pinto Added command type for dcb - * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb - * 07/05/2014 Mark Riddoch Addition of callback mechanism - * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks - * 27/08/2014 Mark Riddoch Addition of write event queuing - * 23/09/2014 Mark Riddoch New poll processing queue - * 19/06/2015 Martin Brampton Provision of persistent connections + * Date Who Description + * 01/06/2013 Mark Riddoch Initial implementation + * 11/06/2013 Mark Riddoch Updated GWPROTOCOL structure with new + * entry points + * 18/06/2013 Mark Riddoch Addition of the listener entry point + * 02/07/2013 Massimiliano Pinto Addition of delayqlock, delayq and authlock + * for handling backend asynchronous protocol connection + * and a generic lock for backend authentication + * 12/07/2013 Massimiliano Pinto Added auth entry point + * 15/07/2013 Massimiliano Pinto Added session entry point + * 16/07/2013 Massimiliano Pinto Added command type for dcb + * 07/02/2014 Massimiliano Pinto Added ipv4 data struct into for dcb + * 07/05/2014 Mark Riddoch Addition of callback mechanism + * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks + * 27/08/2014 Mark Riddoch Addition of write event queuing + * 23/09/2014 Mark Riddoch New poll processing queue + * 19/06/2015 Martin Brampton Provision of persistent connections * * @endverbatim */ struct dcb; - /** - * @verbatim - * The operations that can be performed on the descriptor - * - * read EPOLLIN handler for the socket - * write MaxScale data write entry point - * write_ready EPOLLOUT handler for the socket, indicates - * that the socket is ready to send more data - * error EPOLLERR handler for the socket - * hangup EPOLLHUP handler for the socket - * accept Accept handler for listener socket only - * connect Create a connection to the specified server - * for the session pased in - * close MaxScale close entry point for the socket - * listen Create a listener for the protocol - * auth Authentication entry point - * session Session handling entry point - * @endverbatim - * - * This forms the "module object" for protocol modules within the gateway. - * - * @see load_module - */ -typedef struct gw_protocol { - int (*read)(struct dcb *); - int (*write)(struct dcb *, GWBUF *); - int (*write_ready)(struct dcb *); - int (*error)(struct dcb *); - int (*hangup)(struct dcb *); - int (*accept)(struct dcb *); - int (*connect)(struct dcb *, struct server *, struct session *); - int (*close)(struct dcb *); - int (*listen)(struct dcb *, char *); - int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); - int (*session)(struct dcb *, void *); +/** + * @verbatim + * The operations that can be performed on the descriptor + * + * read EPOLLIN handler for the socket + * write MaxScale data write entry point + * write_ready EPOLLOUT handler for the socket, indicates + * that the socket is ready to send more data + * error EPOLLERR handler for the socket + * hangup EPOLLHUP handler for the socket + * accept Accept handler for listener socket only + * connect Create a connection to the specified server + * for the session pased in + * close MaxScale close entry point for the socket + * listen Create a listener for the protocol + * auth Authentication entry point + * session Session handling entry point + * @endverbatim + * + * This forms the "module object" for protocol modules within the gateway. + * + * @see load_module + */ +typedef struct gw_protocol +{ + int (*read)(struct dcb *); + int (*write)(struct dcb *, GWBUF *); + int (*write_ready)(struct dcb *); + int (*error)(struct dcb *); + int (*hangup)(struct dcb *); + int (*accept)(struct dcb *); + int (*connect)(struct dcb *, struct server *, struct session *); + int (*close)(struct dcb *); + int (*listen)(struct dcb *, char *); + int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); + int (*session)(struct dcb *, void *); } GWPROTOCOL; /** * The event queue structure used in the polling loop to maintain a queue * of events that need to be processed for the DCB. * - * next The next DCB in the event queue - * prev The previous DCB in the event queue - * pending_events The events that are pending processing - * processing_events The evets currently being processed - * processing Flag to indicate the processing status of the DCB - * eventqlock Spinlock to protect this structure - * inserted Insertion time for logging purposes - * started Time that the processign started + * next The next DCB in the event queue + * prev The previous DCB in the event queue + * pending_events The events that are pending processing + * processing_events The evets currently being processed + * processing Flag to indicate the processing status of the DCB + * eventqlock Spinlock to protect this structure + * inserted Insertion time for logging purposes + * started Time that the processign started */ -typedef struct { - struct dcb *next; - struct dcb *prev; - uint32_t pending_events; - uint32_t processing_events; - int processing; - SPINLOCK eventqlock; - unsigned long inserted; - unsigned long started; +typedef struct +{ + struct dcb *next; + struct dcb *prev; + uint32_t pending_events; + uint32_t processing_events; + int processing; + SPINLOCK eventqlock; + unsigned long inserted; + unsigned long started; } DCBEVENTQ; /** @@ -131,20 +133,21 @@ typedef struct { * the GWPROTOCOL structure is changed. See the rules defined in modinfo.h * that define how these numbers should change. */ -#define GWPROTOCOL_VERSION {1, 0, 0} +#define GWPROTOCOL_VERSION {1, 0, 0} #define DCBFD_CLOSED -1 /** * The statistics gathered on a descriptor control block */ -typedef struct dcbstats { - int n_reads; /*< Number of reads on this descriptor */ - int n_writes; /*< Number of writes on this descriptor */ - int n_accepts; /*< Number of accepts on this descriptor */ - int n_buffered; /*< Number of buffered writes */ - int n_high_water; /*< Number of crosses of high water mark */ - int n_low_water; /*< Number of crosses of low water mark */ +typedef struct dcbstats +{ + int n_reads; /*< Number of reads on this descriptor */ + int n_writes; /*< Number of writes on this descriptor */ + int n_accepts; /*< Number of accepts on this descriptor */ + int n_buffered; /*< Number of buffered writes */ + int n_high_water; /*< Number of crosses of high water mark */ + int n_low_water; /*< Number of crosses of low water mark */ } DCBSTATS; /** @@ -165,50 +168,54 @@ typedef struct dcbstats { * will clear the bit value that corresponds to the calling thread. Once the bitmask * is completely cleared the DCB can finally be freed and removed from the zombie list. */ -typedef struct { - GWBITMASK bitmask; /*< The bitmask of threads */ - struct dcb *next; /*< Next pointer for the zombie list */ +typedef struct +{ + GWBITMASK bitmask; /*< The bitmask of threads */ + struct dcb *next; /*< Next pointer for the zombie list */ } DCBMM; /* DCB states */ -typedef enum { - DCB_STATE_UNDEFINED, /*< State variable with no state */ - DCB_STATE_ALLOC, /*< Memory allocated but not populated */ - DCB_STATE_POLLING, /*< Waiting in the poll loop */ - DCB_STATE_LISTENING, /*< The DCB is for a listening socket */ - DCB_STATE_DISCONNECTED, /*< The socket is now closed */ - DCB_STATE_NOPOLLING, /*< Removed from poll mask */ - DCB_STATE_ZOMBIE, /*< DCB is no longer active, waiting to free it */ +typedef enum +{ + DCB_STATE_UNDEFINED, /*< State variable with no state */ + DCB_STATE_ALLOC, /*< Memory allocated but not populated */ + DCB_STATE_POLLING, /*< Waiting in the poll loop */ + DCB_STATE_LISTENING, /*< The DCB is for a listening socket */ + DCB_STATE_DISCONNECTED, /*< The socket is now closed */ + DCB_STATE_NOPOLLING, /*< Removed from poll mask */ + DCB_STATE_ZOMBIE, /*< DCB is no longer active, waiting to free it */ } dcb_state_t; -typedef enum { - DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ - DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */ - DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ +typedef enum +{ + DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ + DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */ + DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ } dcb_role_t; /** * Callback reasons for the DCB callback mechanism. */ -typedef enum { - DCB_REASON_CLOSE, /*< The DCB is closing */ - DCB_REASON_DRAINED, /*< The write delay queue has drained */ - DCB_REASON_HIGH_WATER, /*< Cross high water mark */ - DCB_REASON_LOW_WATER, /*< Cross low water mark */ - DCB_REASON_ERROR, /*< An error was flagged on the connection */ - DCB_REASON_HUP, /*< A hangup was detected */ - DCB_REASON_NOT_RESPONDING /*< Server connection was lost */ +typedef enum +{ + DCB_REASON_CLOSE, /*< The DCB is closing */ + DCB_REASON_DRAINED, /*< The write delay queue has drained */ + DCB_REASON_HIGH_WATER, /*< Cross high water mark */ + DCB_REASON_LOW_WATER, /*< Cross low water mark */ + DCB_REASON_ERROR, /*< An error was flagged on the connection */ + DCB_REASON_HUP, /*< A hangup was detected */ + DCB_REASON_NOT_RESPONDING /*< Server connection was lost */ } DCB_REASON; /** * Callback structure - used to track callbacks registered on a DCB */ -typedef struct dcb_callback { - DCB_REASON reason; /*< The reason for the callback */ - int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata); - void *userdata; /*< User data to be sent in the callback */ - struct dcb_callback - *next; /*< Next callback for this DCB */ +typedef struct dcb_callback +{ + DCB_REASON reason; /*< The reason for the callback */ + int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata); + void *userdata; /*< User data to be sent in the callback */ + struct dcb_callback *next; /*< Next callback for this DCB */ } DCB_CALLBACK; @@ -223,53 +230,54 @@ typedef struct dcb_callback { * It is important to hold the state information here such that any thread within the * gateway may be selected to execute the required actions when a network event occurs. */ -typedef struct dcb { +typedef struct dcb +{ skygw_chk_t dcb_chk_top; - bool dcb_errhandle_called; /*< this can be called only once */ - bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */ - dcb_role_t dcb_role; + bool dcb_errhandle_called; /*< this can be called only once */ + bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */ + dcb_role_t dcb_role; SPINLOCK dcb_initlock; - DCBEVENTQ evq; /**< The event queue for this DCB */ - int fd; /**< The descriptor */ - dcb_state_t state; /**< Current descriptor state */ - int flags; /**< DCB flags */ - char *remote; /**< Address of remote end */ - char *user; /**< User name for connection */ - struct sockaddr_in ipv4; /**< remote end IPv4 address */ + DCBEVENTQ evq; /**< The event queue for this DCB */ + int fd; /**< The descriptor */ + dcb_state_t state; /**< Current descriptor state */ + int flags; /**< DCB flags */ + char *remote; /**< Address of remote end */ + char *user; /**< User name for connection */ + struct sockaddr_in ipv4; /**< remote end IPv4 address */ char *protoname; /**< Name of the protocol */ - void *protocol; /**< The protocol specific state */ - struct session *session; /**< The owning session */ - GWPROTOCOL func; /**< The functions for this descriptor */ + void *protocol; /**< The protocol specific state */ + struct session *session; /**< The owning session */ + GWPROTOCOL func; /**< The functions for this descriptor */ - int writeqlen; /**< Current number of byes in the write queue */ - SPINLOCK writeqlock; /**< Write Queue spinlock */ - GWBUF *writeq; /**< Write Data Queue */ - SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ - GWBUF *delayq; /**< Delay Backend Write Data Queue */ - GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */ - SPINLOCK authlock; /**< Generic Authorization spinlock */ + int writeqlen; /**< Current number of byes in the write queue */ + SPINLOCK writeqlock; /**< Write Queue spinlock */ + GWBUF *writeq; /**< Write Data Queue */ + SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ + GWBUF *delayq; /**< Delay Backend Write Data Queue */ + GWBUF *dcb_readqueue; /**< read queue for storing incomplete reads */ + SPINLOCK authlock; /**< Generic Authorization spinlock */ - DCBSTATS stats; /**< DCB related statistics */ + DCBSTATS stats; /**< DCB related statistics */ unsigned int dcb_server_status; /*< the server role indicator from SERVER */ - struct dcb *next; /**< Next DCB in the chain of allocated DCB's */ + struct dcb *next; /**< Next DCB in the chain of allocated DCB's */ struct dcb *nextpersistent; /**< Next DCB in the persistent pool for SERVER */ time_t persistentstart; /**< Time when DCB placed in persistent pool */ - struct service *service; /**< The related service */ - void *data; /**< Specific client data */ - DCBMM memdata; /**< The data related to DCB memory management */ - SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ - DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ - SPINLOCK pollinlock; - int pollinbusy; - int readcheck; + struct service *service; /**< The related service */ + void *data; /**< Specific client data */ + DCBMM memdata; /**< The data related to DCB memory management */ + SPINLOCK cb_lock; /**< The lock for the callbacks linked list */ + DCB_CALLBACK *callbacks; /**< The list of callbacks for the DCB */ + SPINLOCK pollinlock; + int pollinbusy; + int readcheck; - SPINLOCK polloutlock; - int polloutbusy; - int writecheck; + SPINLOCK polloutlock; + int polloutbusy; + int writecheck; unsigned long last_read; /*< Last time the DCB received data */ - unsigned int high_water; /**< High water mark */ - unsigned int low_water; /**< Low water mark */ - struct server *server; /**< The associated backend server */ + unsigned int high_water; /**< High water mark */ + unsigned int low_water; /**< Low water mark */ + struct server *server; /**< The associated backend server */ SSL* ssl; /*< SSL struct for connection */ int dcb_port; /**< port of target server */ skygw_chk_t dcb_chk_tail; @@ -278,13 +286,14 @@ typedef struct dcb { /** * The DCB usage filer used for returning DCB's in use for a certain reason */ -typedef enum { - DCB_USAGE_CLIENT, - DCB_USAGE_LISTENER, - DCB_USAGE_BACKEND, - DCB_USAGE_INTERNAL, - DCB_USAGE_ZOMBIE, - DCB_USAGE_ALL +typedef enum +{ + DCB_USAGE_CLIENT, + DCB_USAGE_LISTENER, + DCB_USAGE_BACKEND, + DCB_USAGE_INTERNAL, + DCB_USAGE_ZOMBIE, + DCB_USAGE_ALL } DCB_USAGE; #if defined(FAKE_CODE) @@ -297,52 +306,50 @@ int fail_accept_errno; #endif /* FAKE_CODE */ /* A few useful macros */ -#define DCB_SESSION(x) (x)->session -#define DCB_PROTOCOL(x, type) (type *)((x)->protocol) -#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) -#define DCB_WRITEQLEN(x) (x)->writeqlen -#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo); -#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi); -#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) -#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) +#define DCB_SESSION(x) (x)->session +#define DCB_PROTOCOL(x, type) (type *)((x)->protocol) +#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) +#define DCB_WRITEQLEN(x) (x)->writeqlen +#define DCB_SET_LOW_WATER(x, lo) (x)->low_water = (lo); +#define DCB_SET_HIGH_WATER(x, hi) (x)->low_water = (hi); +#define DCB_BELOW_LOW_WATER(x) ((x)->low_water && (x)->writeqlen < (x)->low_water) +#define DCB_ABOVE_HIGH_WATER(x) ((x)->high_water && (x)->writeqlen > (x)->high_water) -#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL) +#define DCB_POLL_BUSY(x) ((x)->evq.next != NULL) -DCB *dcb_get_zombies(void); -int gw_write(DCB *, const void *, size_t); -int dcb_write(DCB *, GWBUF *); -DCB *dcb_alloc(dcb_role_t); -void dcb_free(DCB *); -DCB *dcb_connect(struct server *, struct session *, const char *); -DCB *dcb_clone(DCB *); -int dcb_read(DCB *, GWBUF **, int); -int dcb_drain_writeq(DCB *); -void dcb_close(DCB *); -DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */ -void printAllDCBs(); /* Debug to print all DCB in the system */ -void printDCB(DCB *); /* Debug print routine */ -void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ -void dprintOneDCB(DCB *, DCB *); /* Debug to print one DCB */ -void dprintDCB(DCB *, DCB *); /* Debug to print a DCB in the system */ -void dListDCBs(DCB *); /* List all DCBs in the system */ -void dListClients(DCB *); /* List al the client DCBs */ -const char *gw_dcb_state2string(int); /* DCB state to string */ -void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ -int dcb_isclient(DCB *); /* the DCB is the client of the session */ -void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ -int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), - void *); -int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), - void *); -int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ -int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ -int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */ +DCB *dcb_get_zombies(void); +int gw_write(DCB *, const void *, size_t); +int dcb_write(DCB *, GWBUF *); +DCB *dcb_alloc(dcb_role_t); +void dcb_free(DCB *); +DCB *dcb_connect(struct server *, struct session *, const char *); +DCB *dcb_clone(DCB *); +int dcb_read(DCB *, GWBUF **, int); +int dcb_drain_writeq(DCB *); +void dcb_close(DCB *); +DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */ +void printAllDCBs(); /* Debug to print all DCB in the system */ +void printDCB(DCB *); /* Debug print routine */ +void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ +void dprintOneDCB(DCB *, DCB *); /* Debug to print one DCB */ +void dprintDCB(DCB *, DCB *); /* Debug to print a DCB in the system */ +void dListDCBs(DCB *); /* List all DCBs in the system */ +void dListClients(DCB *); /* List al the client DCBs */ +const char *gw_dcb_state2string(int); /* DCB state to string */ +void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ +int dcb_isclient(DCB *); /* the DCB is the client of the session */ +void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ +int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *); +int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *); +int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ +int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ +int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */ -void dcb_call_foreach (struct server* server, DCB_REASON reason); -void dcb_hangup_foreach (struct server* server); +void dcb_call_foreach (struct server* server, DCB_REASON reason); +void dcb_hangup_foreach (struct server* server); size_t dcb_get_session_id(DCB* dcb); -bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs); -char *dcb_role_name(DCB *); /* Return the name of a role */ +bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs); +char *dcb_role_name(DCB *); /* Return the name of a role */ int dcb_create_SSL(DCB* dcb); int dcb_accept_SSL(DCB* dcb); int dcb_connect_SSL(DCB* dcb); @@ -355,9 +362,9 @@ int dcb_drain_writeq_SSL(DCB *dcb); /** * DCB flags values */ -#define DCBF_CLONE 0x0001 /*< DCB is a clone */ -#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */ -#define DCBF_REPLIED 0x0004 /*< DCB was written to */ +#define DCBF_CLONE 0x0001 /*< DCB is a clone */ +#define DCBF_HUNG 0x0002 /*< Hangup has been dispatched */ +#define DCBF_REPLIED 0x0004 /*< DCB was written to */ #define DCB_IS_CLONE(d) ((d)->flags & DCBF_CLONE) #define DCB_REPLIED(d) ((d)->flags & DCBF_REPLIED)