From 037055d33ed9a9a1732d2aedc8d99f81ec19cc41 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 12 Jun 2013 08:07:50 +0100 Subject: [PATCH 1/4] Added clean rule --- core/Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/Makefile b/core/Makefile index 2ba75ed5e..d6483b832 100644 --- a/core/Makefile +++ b/core/Makefile @@ -25,3 +25,6 @@ gateway: $(OBJ) .c.o: $(CC) $(CFLAGS) $< -o $@ + +clean: + rm -f $(OBJ) gateway From 329a70eccd7b216b69139a937778795c6ef29662 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 12 Jun 2013 10:01:02 +0100 Subject: [PATCH 2/4] Addition of example buffer handling --- core/Makefile | 3 +- core/gw_utils.c | 18 ---- core/utils.c | 219 ++++++++++++++++++++++-------------------------- include/dcb.h | 23 ++--- 4 files changed, 115 insertions(+), 148 deletions(-) diff --git a/core/Makefile b/core/Makefile index d6483b832..5cb944814 100644 --- a/core/Makefile +++ b/core/Makefile @@ -16,7 +16,8 @@ CC=cc CFLAGS=-c -I/usr/include -I../include -SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c utils.c +SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \ + utils.c dcb.c OBJ=$(SRCS:.c=.o) LIBS=-lssl diff --git a/core/gw_utils.c b/core/gw_utils.c index 40415dba0..705d5c9d7 100644 --- a/core/gw_utils.c +++ b/core/gw_utils.c @@ -191,21 +191,3 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) { return n; } -const char *gw_dcb_state2string (int state) { - switch(state) { - case DCB_STATE_ALLOC: - return "DCB Allocated"; - case DCB_STATE_IDLE: - return "DCB not yet in polling"; - case DCB_STATE_POLLING: - return "DCB in the EPOLL"; - case DCB_STATE_PROCESSING: - return "DCB processing event"; - case DCB_STATE_LISTENING: - return "DCB for listening socket"; - case DCB_STATE_DISCONNECTED: - return "DCB socket closed"; - default: - return "DCB (unknown)"; - } -} diff --git a/core/utils.c b/core/utils.c index 1ae63e9fe..d8e6d4a66 100644 --- a/core/utils.c +++ b/core/utils.c @@ -63,10 +63,10 @@ int gw_read_backend_event(DCB *dcb, int epfd) { int w; int count_reads = 0; int count_writes = 0; - uint8_t buffer[MAX_BUFFER_SIZE]; int b = -1; int tot_b = -1; uint8_t *ptr_buffer; + GWBUF *buffer, *head; if (ioctl(dcb->fd, FIONREAD, &b)) { fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno)); @@ -74,90 +74,85 @@ int gw_read_backend_event(DCB *dcb, int epfd) { fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b); } - // detect pending data in the buffer for client write - if (dcb->session->client->buff_bytes > 0) { - - fprintf(stderr, "*********** Read backend says there are pending writes for %i bytes\n", dcb->session->client->buff_bytes); - // read data from socket, assume no error here (quick) and put it into the DCB second buffer - GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++); - - memcpy(&dcb->session->client->second_buff_bytes, &n, sizeof(int)); - fprintf(stderr, "#### second buff_bytes set to %i\n", dcb->session->client->second_buff_bytes); - - memcpy(dcb->session->client->second_buffer, buffer, n); - fprintf(stderr, "#### filled memory second buffer!\n"); - dcb->session->client->second_buffer_ptr = dcb->session->client->second_buffer; - - return 1; - } - - // else, no pending data - tot_b = b; - - // read all the data, without using multiple buffers, only one - while (b > 0) { - - GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++); - - fprintf (stderr, "Read %i/%i bytes done, n. reads = %i, %i. Next could be write\n", n, b, count_reads, errno); - - if (n < 0) { - if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) - return 0; - else - return 1; - } else { - b = b - n; + /* + * Read all the data that is available into a chain of buffers + */ + head = NULL; + while (b > 0) + { + int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { + /* Bad news, we have run out of memory */ + return 0; } + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); count_reads++); + head = gwbuf_append(head, buffer); } - ptr_buffer = buffer; - - if(n >2) - fprintf(stderr, ">>> The READ BUFFER last 2 byte [%i][%i]\n", buffer[n-2], buffer[n-1]); + /* + ** This next bit really belongs in the write function for the client DCB + ** It is here now just to illustrate the use of the buffers + */ + { + DCB *client = dcb->session->client; + int saved_errno = 0; + spinlock_acquire(&client->writeqlock); + if (client->writeq) + { + /* + * We have some queued data, so add our data to + * the write queue and return. + * The assumption is that there will be an EPOLLOUT + * event to drain what is already queued. We are protected + * by the spinlock, which will also be acquired by the + * the routine that drains the queue data, so we should + * not have a race condition on the event. + */ + client->writeq = gwbuf_append(client->writeq, head); + } + else + { + int len; - // write all the data - // if write fails for EAGAIN or EWOULDBLOCK, copy the data into the dcb buffer - while (n >0) { - GW_NOINTR_CALL(w = write(dcb->session->client->fd, ptr_buffer, n); count_writes++); - - fprintf (stderr, "Write Cycle %i, %i of %i bytes done, errno %i\n", count_writes, w, n, errno); - if (w > 2) - fprintf(stderr, "<<< writed BUFFER last 2 byte [%i][%i]\n", ptr_buffer[w-2], ptr_buffer[w-1]); - - if (w < 0) { - if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) { + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (head != NULL) + { + len = GWBUF_LENGTH(head); + GW_NOINTR_CALL(w = write(client->fd, GWBUF_DATA(head), len); count_writes++); + saved_errno = errno; + if (w < 0) + { break; - } else { - fprintf(stderr, "<<<< Write to client not completed for %i bytes!\n", n); - if (dcb->session->client) { - fprintf(stderr, "<<<< Try to set buff_bytes!\n"); - memcpy(&dcb->session->client->buff_bytes, &n, sizeof(int)); - fprintf(stderr, "<<<< buff_bytes set to %i\n", dcb->session->client->buff_bytes); - - fprintf(stderr, "<<<< Try to fill memory buffer!\n"); - memcpy(dcb->session->client->buffer, ptr_buffer, n); - dcb->session->client->buffer_ptr = dcb->session->client->buffer; - - fprintf(stderr, "<<<< Buffer Write to client has %i bytes\n", dcb->session->client->buff_bytes); - if (n > 1) { - fprintf(stderr, "<<<< Buffer bytes last 2 [%i]\n", ptr_buffer[0]); - fprintf(stderr, "<<<< Buffer bytes last [%i]\n", ptr_buffer[1]); - } - } else { - fprintf(stderr, "<<< do data in memory for Buffer Write to client\n"); - } - return 0; } - } else { - n = n - w; - ptr_buffer = ptr_buffer + w; - fprintf(stderr, "<<< Write: pointer to buffer %i bytes shifted\n", w); - memset(&dcb->session->client->buff_bytes, '\0', sizeof(int)); + /* + * Pull the number of bytes we have written from + * queue with have. + */ + head = gwbuf_consume(head, w); + if (w < len) + { + /* We didn't write all the data */ + } } + /* Buffer the balance of any data */ + client->writeq = head; + } + spinlock_release(&client->writeqlock); + + if (head && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) + { + /* We had a real write failure that we must deal with */ + } + } return 1; } @@ -322,7 +317,6 @@ int gw_handle_write_event(DCB *dcb, int epfd) { MySQLProtocol *protocol = NULL; int n; struct epoll_event new_event; - n = dcb->buff_bytes; if (dcb == NULL) { @@ -360,8 +354,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { } if (dcb->session->backends) { - fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, dcb->buff_bytes, dcb->session->backends->fd, dcb->fd); - fprintf(stderr, "CLIENT WRITE READY, SECOND bytes left to write %i from back %i to client %i\n", dcb->second_buff_bytes, dcb->session->backends->fd, dcb->fd); + fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd); } //#ifdef GW_DEBUG_WRITE_EVENT @@ -392,50 +385,40 @@ int gw_handle_write_event(DCB *dcb, int epfd) { if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) { int w; int m; + int saved_errno = 0; - if (dcb->buff_bytes > 0) { - fprintf(stderr, "<<< Writing unsent data for state [%i], bytes %i\n", protocol->state, dcb->buff_bytes); + spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) + { + int len; - if (dcb->buff_bytes > 2) - fprintf(stderr, "READ BUFFER last 2 byte [%i%i]\n", dcb->buffer[dcb->buff_bytes-2], dcb->buffer[dcb->buff_bytes-1]); + /* + * Loop over the buffer chain in the pendign writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len);); + saved_errno = errno; + if (w < 0) + { + break; + } - w = write(dcb->fd, dcb->buffer_ptr, dcb->buff_bytes); - - if (w < 0) { - if ((w != EAGAIN) || (w!= EWOULDBLOCK)) { - return 1; - } else - return 0; + /* + * Pull the number of bytes we have written from + * queue with have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + if (w < len) + { + /* We didn't write all the data */ + } } - fprintf(stderr, "<<<<< Written %i bytes, left %i\n", w, dcb->buff_bytes - w); - n = n-w; - memcpy(&dcb->buff_bytes, &n, sizeof(int)); - - dcb->buffer_ptr = dcb->buffer_ptr + w; - - fprintf(stderr, "<<<<< Next time write %i bytes, buffer_ptr is %i bytes shifted\n", n, w); - } else { - fprintf(stderr, "<<<< Nothing to do with FIRST buffer left bytes\n"); } - m = dcb->second_buff_bytes; - - if (dcb->second_buff_bytes) { - fprintf(stderr, "<<<< Now use the SECOND buffer left %i bytes\n", m); - w = write(dcb->fd, dcb->second_buffer_ptr, dcb->second_buff_bytes); - if (w < 0) { - if ((w != EAGAIN) || (w!= EWOULDBLOCK)) { - return 1; - } else - return 0; - } - fprintf(stderr, "<<<<< second Written %i bytes, left %i\n", w, dcb->second_buff_bytes - w); - m = m-w; - memcpy(&dcb->second_buff_bytes, &m, sizeof(int)); - - dcb->second_buffer_ptr = dcb->second_buffer_ptr + w; - } - - fprintf(stderr, "<<<< Nothing to do with SECOND buffer left bytes\n"); + spinlock_release(&dcb->writeqlock); return 1; } @@ -708,7 +691,7 @@ static char gw_randomchar() { int gw_generate_random_str(char *output, int len) { int i; - srand(time()); + srand(time(0L)); for ( i = 0; i < len; ++i ) { output[i] = gw_randomchar(); diff --git a/include/dcb.h b/include/dcb.h index 4e4257e77..82da9e257 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -17,6 +17,8 @@ * * Copyright SkySQL Ab 2013 */ +#include +#include struct session; @@ -32,7 +34,6 @@ struct session; * entry points * */ - */ struct dcb; @@ -68,16 +69,12 @@ typedef struct dcb { struct session *session; /* The owning session */ GWPROTOCOL func; /* The functions for this descrioptor */ - /* queue buffer for write - is now a two buffer implementation - Only used in client write - */ - uint8_t buffer[MAX_BUFFER_SIZE]; /* network buffer */ - int buff_bytes; /* bytes in buffer */ - uint8_t *buffer_ptr; /* buffer pointer */ - uint8_t second_buffer[MAX_BUFFER_SIZE]; /* 2nd network buffer */ - int second_buff_bytes; /* 2nd bytes in buffer */ - uint8_t *second_buffer_ptr; /* 2nd buffer pointer */ + SPINLOCK writeqlock; /* Write Queue spinlock */ + GWBUF *writeq; /* Write Data Queue */ + + + + struct dcb *next; /* Next DCB in the chain of allocated DCB's */ } DCB; /* DCB states */ @@ -93,4 +90,8 @@ typedef struct dcb { #define DCB_SESSION(x) (x)->session #define DCB_PROTOCOL(x, type) (type *)((x)->protocol) +extern DCB *alloc_dcb(); /* Allocate a DCB */ +extern void printDCB(DCB *); /* Debug print routine */ +extern const char *gw_dcb_state2string(int); /* DCB state to string */ + #endif From c7f533abaf7bea1ab17e6fadf0ec43b642e10eea Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 12 Jun 2013 12:57:09 +0100 Subject: [PATCH 3/4] Split the backend read function as an example, this still needs to be reorganised into protocols Addition of DCB diagnostics, and free routine --- core/dcb.c | 168 +++++++++++++++++++++++++++++++++++++++++++++++++ core/gateway.c | 2 +- core/utils.c | 133 +++++++++++++++++++++------------------ include/dcb.h | 8 ++- 4 files changed, 247 insertions(+), 64 deletions(-) create mode 100644 core/dcb.c diff --git a/core/dcb.c b/core/dcb.c new file mode 100644 index 000000000..e52540e33 --- /dev/null +++ b/core/dcb.c @@ -0,0 +1,168 @@ +/* + * This file is distributed as part of the SkySQL Gateway. It is free + * software: you can redistribute it and/or modify it under the terms of the + * GNU General Public License as published by the Free Software Foundation, + * version 2. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more + * details. + * + * You should have received a copy of the GNU General Public License along with + * this program; if not, write to the Free Software Foundation, Inc., 51 + * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Copyright SkySQL Ab 2013 + */ + +/* + * dcb.c - Descriptor Control Block generic functions + * + * Revision History + * + * Date Who Description + * 12/06/13 Mark Riddoch Initial implementation + * + */ +#include +#include +#include +#include + +static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */ +static SPINLOCK *dcbspin = NULL; + +/* + * Allocate a new DCB. + * + * This routine performs the generic initialisation on the DCB before returning + * the newly allocated DCB. + * + * @return A newly allocated DCB or NULL if non could be allocated. + */ +DCB * +alloc_dcb() +{ +DCB *rval; + + if (dcbspin == NULL) + { + if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL) + return NULL; + spinlock_init(dcbspin); + } + + if ((rval = malloc(sizeof(DCB))) == NULL) + { + return NULL; + } + spinlock_init(&rval->writeqlock); + rval->writeq = NULL; + rval->state = DCB_STATE_ALLOC; + + spinlock_acquire(dcbspin); + if (allDCBs == NULL) + allDCBs = rval; + else + { + DCB *ptr = allDCBs; + while (ptr->next) + ptr = ptr->next; + ptr->next = rval; + } + spinlock_release(dcbspin); + return rval; +} + +/* + * Free a DCB and remove it from the chain of all DCBs + * + * @param dcb THe DCB to free + */ +void +free_dcb(DCB *dcb) +{ + dcb->state = DCB_STATE_FREED; + + /* First remove this DCB from the chain */ + spinlock_acquire(dcbspin); + if (allDCBs == dcb) + allDCBs = dcb->next; + else + { + DCB *ptr = allDCBs; + while (ptr && ptr->next != dcb) + ptr = ptr->next; + if (ptr) + ptr->next = dcb->next; + } + spinlock_release(dcbspin); + + free(dcb); +} + +/* + * Diagnostic to print a DCB + * + * @param dcb The DCB to print + * + */ +void +printDCB(DCB *dcb) +{ + (void)printf("DCB: 0x%x\n", (void *)dcb); + (void)printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); + (void)printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); +} + +/* + * Diagnostic to print all DCB allocated in the system + * + */ +void printAllDCBs() +{ +DCB *dcb; + + if (dcbspin == NULL) + { + if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL) + return; + spinlock_init(dcbspin); + } + spinlock_acquire(dcbspin); + dcb = allDCBs; + while (dcb) + { + printDCB(dcb); + dcb = dcb->next; + } + spinlock_release(dcbspin); +} + +/* + * Return a string representation of a DCB state. + * + * @param state The DCB state + * @return String representation of the state + * + */ +const char * +gw_dcb_state2string (int state) { + switch(state) { + case DCB_STATE_ALLOC: + return "DCB Allocated"; + case DCB_STATE_IDLE: + return "DCB not yet in polling"; + case DCB_STATE_POLLING: + return "DCB in the EPOLL"; + case DCB_STATE_PROCESSING: + return "DCB processing event"; + case DCB_STATE_LISTENING: + return "DCB for listening socket"; + case DCB_STATE_DISCONNECTED: + return "DCB socket closed"; + default: + return "DCB (unknown)"; + } +} diff --git a/core/gateway.c b/core/gateway.c index d19f6e05f..23a570b85 100644 --- a/core/gateway.c +++ b/core/gateway.c @@ -257,7 +257,7 @@ int main(int argc, char **argv) { if (events[n].events & EPOLLOUT) { if (dcb->state != DCB_STATE_LISTENING) { fprintf(stderr, "CALL the WRITE pointer\n"); - (dcb->func).write(dcb, epollfd); + (dcb->func).write_ready(dcb, epollfd); fprintf(stderr, ">>> CALLED the WRITE pointer\n"); } } diff --git a/core/utils.c b/core/utils.c index d8e6d4a66..65e0c1805 100644 --- a/core/utils.c +++ b/core/utils.c @@ -95,65 +95,8 @@ int gw_read_backend_event(DCB *dcb, int epfd) { ** This next bit really belongs in the write function for the client DCB ** It is here now just to illustrate the use of the buffers */ - { - DCB *client = dcb->session->client; - int saved_errno = 0; + dcb->session->client->func.write(dcb->session->client, head); - spinlock_acquire(&client->writeqlock); - if (client->writeq) - { - /* - * We have some queued data, so add our data to - * the write queue and return. - * The assumption is that there will be an EPOLLOUT - * event to drain what is already queued. We are protected - * by the spinlock, which will also be acquired by the - * the routine that drains the queue data, so we should - * not have a race condition on the event. - */ - client->writeq = gwbuf_append(client->writeq, head); - } - else - { - int len; - - /* - * Loop over the buffer chain that has been passed to us - * from the reading side. - * Send as much of the data in that chain as possible and - * add any balance to the write queue. - */ - while (head != NULL) - { - len = GWBUF_LENGTH(head); - GW_NOINTR_CALL(w = write(client->fd, GWBUF_DATA(head), len); count_writes++); - saved_errno = errno; - if (w < 0) - { - break; - } - - /* - * Pull the number of bytes we have written from - * queue with have. - */ - head = gwbuf_consume(head, w); - if (w < len) - { - /* We didn't write all the data */ - } - } - /* Buffer the balance of any data */ - client->writeq = head; - } - spinlock_release(&client->writeqlock); - - if (head && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) - { - /* We had a real write failure that we must deal with */ - } - - } return 1; } #ifdef GW_DEBUG_READ_EVENT @@ -163,6 +106,75 @@ int gw_read_backend_event(DCB *dcb, int epfd) { return 1; } +/* + * Write function for client DCB + * + * @param dcb The DCB of the client + * @param queue Queue of buffers to write + */ +int +MySQLWrite(DCB *dcb, GWBUF *queue) +{ +int w, count_writes = 0, saved_errno = 0; + + spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) + { + /* + * We have some queued data, so add our data to + * the write queue and return. + * The assumption is that there will be an EPOLLOUT + * event to drain what is already queued. We are protected + * by the spinlock, which will also be acquired by the + * the routine that drains the queue data, so we should + * not have a race condition on the event. + */ + dcb->writeq = gwbuf_append(dcb->writeq, queue); + } + else + { + int len; + + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (queue != NULL) + { + len = GWBUF_LENGTH(queue); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); count_writes++); + saved_errno = errno; + if (w < 0) + { + break; + } + + /* + * Pull the number of bytes we have written from + * queue with have. + */ + queue = gwbuf_consume(queue, w); + if (w < len) + { + /* We didn't write all the data */ + } + } + /* Buffer the balance of any data */ + dcb->writeq = queue; + } + spinlock_release(&dcb->writeqlock); + + if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) + { + /* We had a real write failure that we must deal with */ + return 0; + } + + return 1; +} + ////////////////////////////////////////// //backend write event triggered by EPOLLOUT ////////////////////////////////////////// @@ -612,7 +624,7 @@ int MySQLAccept(DCB *listener, int efd) { backend->state = DCB_STATE_POLLING; backend->session = session; (backend->func).read = gw_read_backend_event; - (backend->func).write = gw_write_backend_event; + (backend->func).write_ready = gw_write_backend_event; (backend->func).error = handle_event_errors_backend; // assume here one backend only. @@ -626,7 +638,8 @@ int MySQLAccept(DCB *listener, int efd) { // assign function poiters to "func" field (client->func).error = handle_event_errors; (client->func).read = gw_route_read_event; - (client->func).write = gw_handle_write_event; + (client->func).write = MySQLWrite; + (client->func).write_ready = gw_handle_write_event; // edge triggering flag added ee.events = EPOLLIN | EPOLLOUT | EPOLLET; diff --git a/include/dcb.h b/include/dcb.h index 82da9e257..222aff49a 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -51,8 +51,8 @@ typedef struct gw_protocol { * close Gateway close entry point for the socket */ int (*read)(struct dcb *, int); - int (*write)(struct dcb *, int); - int (*write_ready)(struct dcb *); + int (*write)(struct dcb *, GWBUF *); + int (*write_ready)(struct dcb *, int); int (*error)(struct dcb *, int); int (*hangup)(struct dcb *, int); int (*accept)(struct dcb *, int); @@ -84,13 +84,15 @@ typedef struct dcb { #define DCB_STATE_PROCESSING 4 /* Processing an event */ #define DCB_STATE_LISTENING 5 /* The DCB is for a listening socket */ #define DCB_STATE_DISCONNECTED 6 /* The socket is now closed */ -#define DCB_STATE_FREED 7 /* Memory freed */ +#define DCB_STATE_FREED 7 /* Memory freed */ /* A few useful macros */ #define DCB_SESSION(x) (x)->session #define DCB_PROTOCOL(x, type) (type *)((x)->protocol) extern DCB *alloc_dcb(); /* Allocate a DCB */ +extern void free_dcb(DCB *); /* Free a DCB */ +extern void printAllDCBs(); /* Debug to print all DCB in the system */ extern void printDCB(DCB *); /* Debug print routine */ extern const char *gw_dcb_state2string(int); /* DCB state to string */ From 08549ad5aec215526a93f534f103ae72baaa65d7 Mon Sep 17 00:00:00 2001 From: Mark Riddoch Date: Wed, 12 Jun 2013 13:08:33 +0100 Subject: [PATCH 4/4] Addition of some basic statistics on the DCB --- core/dcb.c | 6 ++++++ core/utils.c | 13 +++++-------- include/dcb.h | 7 ++++++- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/core/dcb.c b/core/dcb.c index e52540e33..071604fac 100644 --- a/core/dcb.c +++ b/core/dcb.c @@ -27,6 +27,7 @@ */ #include #include +#include #include #include @@ -60,6 +61,7 @@ DCB *rval; spinlock_init(&rval->writeqlock); rval->writeq = NULL; rval->state = DCB_STATE_ALLOC; + memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics spinlock_acquire(dcbspin); if (allDCBs == NULL) @@ -114,6 +116,10 @@ printDCB(DCB *dcb) (void)printf("DCB: 0x%x\n", (void *)dcb); (void)printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); (void)printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); + (void)printf("\tStatistics:\n"); + (void)printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads); + (void)printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes); + (void)printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); } /* diff --git a/core/utils.c b/core/utils.c index 65e0c1805..af7735adf 100644 --- a/core/utils.c +++ b/core/utils.c @@ -61,8 +61,6 @@ int gw_read_backend_event(DCB *dcb, int epfd) { if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) { struct epoll_event new_event; int w; - int count_reads = 0; - int count_writes = 0; int b = -1; int tot_b = -1; uint8_t *ptr_buffer; @@ -86,7 +84,7 @@ int gw_read_backend_event(DCB *dcb, int epfd) { /* Bad news, we have run out of memory */ return 0; } - GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); count_reads++); + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); head = gwbuf_append(head, buffer); } @@ -115,7 +113,7 @@ int gw_read_backend_event(DCB *dcb, int epfd) { int MySQLWrite(DCB *dcb, GWBUF *queue) { -int w, count_writes = 0, saved_errno = 0; +int w, saved_errno = 0; spinlock_acquire(&dcb->writeqlock); if (dcb->writeq) @@ -144,7 +142,7 @@ int w, count_writes = 0, saved_errno = 0; while (queue != NULL) { len = GWBUF_LENGTH(queue); - GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); count_writes++); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++); saved_errno = errno; if (w < 0) { @@ -536,7 +534,6 @@ void MySQLListener(int epfd, char *config_bind) { int MySQLAccept(DCB *listener, int efd) { - int accept_counter = 0; fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); @@ -569,9 +566,9 @@ int MySQLAccept(DCB *listener, int efd) { } } - accept_counter++; + listener->stats.n_accepts++; - fprintf(stderr, "Processing %i connection fd %i for listener %i\n", accept_counter, c_sock, listener->fd); + fprintf(stderr, "Processing %i connection fd %i for listener %i\n", listener->stats.n_accepts, c_sock, listener->fd); // set nonblocking setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); diff --git a/include/dcb.h b/include/dcb.h index 222aff49a..18dcf19e9 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -59,6 +59,11 @@ typedef struct gw_protocol { int (*close)(struct dcb *, int); } GWPROTOCOL; +typedef struct dcbstats { + int n_reads; /* Number of reads on this descriptor */ + int n_writes; /* Number of writes on this descriptor */ + int n_accepts; /* Number of accepts on this descriptor */ +} DCBSTATS; /* * Descriptor Control Block */ @@ -72,7 +77,7 @@ typedef struct dcb { SPINLOCK writeqlock; /* Write Queue spinlock */ GWBUF *writeq; /* Write Data Queue */ - + DCBSTATS stats; /* DCB related statistics */ struct dcb *next; /* Next DCB in the chain of allocated DCB's */ } DCB;