diff --git a/core/Makefile b/core/Makefile index d6483b832..5cb944814 100644 --- a/core/Makefile +++ b/core/Makefile @@ -16,7 +16,8 @@ CC=cc CFLAGS=-c -I/usr/include -I../include -SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c utils.c +SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \ + utils.c dcb.c OBJ=$(SRCS:.c=.o) LIBS=-lssl diff --git a/core/gw_utils.c b/core/gw_utils.c index 40415dba0..705d5c9d7 100644 --- a/core/gw_utils.c +++ b/core/gw_utils.c @@ -191,21 +191,3 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) { return n; } -const char *gw_dcb_state2string (int state) { - switch(state) { - case DCB_STATE_ALLOC: - return "DCB Allocated"; - case DCB_STATE_IDLE: - return "DCB not yet in polling"; - case DCB_STATE_POLLING: - return "DCB in the EPOLL"; - case DCB_STATE_PROCESSING: - return "DCB processing event"; - case DCB_STATE_LISTENING: - return "DCB for listening socket"; - case DCB_STATE_DISCONNECTED: - return "DCB socket closed"; - default: - return "DCB (unknown)"; - } -} diff --git a/core/utils.c b/core/utils.c index 1ae63e9fe..d8e6d4a66 100644 --- a/core/utils.c +++ b/core/utils.c @@ -63,10 +63,10 @@ int gw_read_backend_event(DCB *dcb, int epfd) { int w; int count_reads = 0; int count_writes = 0; - uint8_t buffer[MAX_BUFFER_SIZE]; int b = -1; int tot_b = -1; uint8_t *ptr_buffer; + GWBUF *buffer, *head; if (ioctl(dcb->fd, FIONREAD, &b)) { fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno)); @@ -74,90 +74,85 @@ int gw_read_backend_event(DCB *dcb, int epfd) { fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b); } - // detect pending data in the buffer for client write - if (dcb->session->client->buff_bytes > 0) { - - fprintf(stderr, "*********** Read backend says there are pending writes for %i bytes\n", dcb->session->client->buff_bytes); - // read data from socket, assume no error here (quick) and put it into the DCB second buffer - 
GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++); - - memcpy(&dcb->session->client->second_buff_bytes, &n, sizeof(int)); - fprintf(stderr, "#### second buff_bytes set to %i\n", dcb->session->client->second_buff_bytes); - - memcpy(dcb->session->client->second_buffer, buffer, n); - fprintf(stderr, "#### filled memory second buffer!\n"); - dcb->session->client->second_buffer_ptr = dcb->session->client->second_buffer; - - return 1; - } - - // else, no pending data - tot_b = b; - - // read all the data, without using multiple buffers, only one - while (b > 0) { - - GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++); - - fprintf (stderr, "Read %i/%i bytes done, n. reads = %i, %i. Next could be write\n", n, b, count_reads, errno); - - if (n < 0) { - if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) - return 0; - else - return 1; - } else { - b = b - n; + /* + * Read all the data that is available into a chain of buffers. NOTE(review): nothing in this loop reduces b or checks the value read() returns, so confirm how it is meant to terminate + */ + head = NULL; + while (b > 0) + { + int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { + /* Bad news, we have run out of memory. NOTE(review): buffers already chained on head are not released before this return; confirm who frees them */ + return 0; } + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); count_reads++); + head = gwbuf_append(head, buffer); } - ptr_buffer = buffer; - - if(n >2) - fprintf(stderr, ">>> The READ BUFFER last 2 byte [%i][%i]\n", buffer[n-2], buffer[n-1]); + /* + ** This next bit really belongs in the write function for the client DCB + ** It is here now just to illustrate the use of the buffers + */ + { + DCB *client = dcb->session->client; + int saved_errno = 0; + spinlock_acquire(&client->writeqlock); + if (client->writeq) + { + /* + * We have some queued data, so add our data to + * the write queue and return. + * The assumption is that there will be an EPOLLOUT + * event to drain what is already queued. 
We are protected + * by the spinlock, which will also be acquired by the + * routine that drains the queue data, so we should + * not have a race condition on the event. + */ + client->writeq = gwbuf_append(client->writeq, head); + } + else + { + int len; - // write all the data - // if write fails for EAGAIN or EWOULDBLOCK, copy the data into the dcb buffer - while (n >0) { - GW_NOINTR_CALL(w = write(dcb->session->client->fd, ptr_buffer, n); count_writes++); - - fprintf (stderr, "Write Cycle %i, %i of %i bytes done, errno %i\n", count_writes, w, n, errno); - if (w > 2) - fprintf(stderr, "<<< writed BUFFER last 2 byte [%i][%i]\n", ptr_buffer[w-2], ptr_buffer[w-1]); - - if (w < 0) { - if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) { + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (head != NULL) + { + len = GWBUF_LENGTH(head); + GW_NOINTR_CALL(w = write(client->fd, GWBUF_DATA(head), len); count_writes++); + saved_errno = errno; + if (w < 0) + { break; - } else { - fprintf(stderr, "<<<< Write to client not completed for %i bytes!\n", n); - if (dcb->session->client) { - fprintf(stderr, "<<<< Try to set buff_bytes!\n"); - memcpy(&dcb->session->client->buff_bytes, &n, sizeof(int)); - fprintf(stderr, "<<<< buff_bytes set to %i\n", dcb->session->client->buff_bytes); - - fprintf(stderr, "<<<< Try to fill memory buffer!\n"); - memcpy(dcb->session->client->buffer, ptr_buffer, n); - dcb->session->client->buffer_ptr = dcb->session->client->buffer; - - fprintf(stderr, "<<<< Buffer Write to client has %i bytes\n", dcb->session->client->buff_bytes); - if (n > 1) { - fprintf(stderr, "<<<< Buffer bytes last 2 [%i]\n", ptr_buffer[0]); - fprintf(stderr, "<<<< Buffer bytes last [%i]\n", ptr_buffer[1]); - } - } else { - fprintf(stderr, "<<< do data in memory for Buffer Write to client\n"); - } - return 0; } - } else { - n = n - 
w; - ptr_buffer = ptr_buffer + w; - fprintf(stderr, "<<< Write: pointer to buffer %i bytes shifted\n", w); - memset(&dcb->session->client->buff_bytes, '\0', sizeof(int)); + /* + * Pull the number of bytes we have written from + * the queue we have. + */ + head = gwbuf_consume(head, w); + if (w < len) + { + /* We didn't write all the data */ + } } + /* Buffer the balance of any data */ + client->writeq = head; + } + spinlock_release(&client->writeqlock); + + if (head && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) + { + /* We had a real write failure that we must deal with. NOTE(review): wherever EAGAIN and EWOULDBLOCK are distinct values the || above makes the errno test always true; && looks intended */ + } + } return 1; } @@ -322,7 +317,6 @@ int gw_handle_write_event(DCB *dcb, int epfd) { MySQLProtocol *protocol = NULL; int n; struct epoll_event new_event; - n = dcb->buff_bytes; if (dcb == NULL) { @@ -360,8 +354,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { } if (dcb->session->backends) { - fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, dcb->buff_bytes, dcb->session->backends->fd, dcb->fd); - fprintf(stderr, "CLIENT WRITE READY, SECOND bytes left to write %i from back %i to client %i\n", dcb->second_buff_bytes, dcb->session->backends->fd, dcb->fd); + fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd); } //#ifdef GW_DEBUG_WRITE_EVENT @@ -392,50 +385,40 @@ int gw_handle_write_event(DCB *dcb, int epfd) { if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) { int w; int m; + int saved_errno = 0; - if (dcb->buff_bytes > 0) { - fprintf(stderr, "<<< Writing unsent data for state [%i], bytes %i\n", protocol->state, dcb->buff_bytes); + spinlock_acquire(&dcb->writeqlock); + if (dcb->writeq) + { + int len; - if (dcb->buff_bytes > 2) - fprintf(stderr, "READ BUFFER last 2 byte [%i%i]\n", dcb->buffer[dcb->buff_bytes-2], dcb->buffer[dcb->buff_bytes-1]); + /* + * 
Loop over the buffer chain in the pending writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len);); + saved_errno = errno; + if (w < 0) + { + break; + } - w = write(dcb->fd, dcb->buffer_ptr, dcb->buff_bytes); - - if (w < 0) { - if ((w != EAGAIN) || (w!= EWOULDBLOCK)) { - return 1; - } else - return 0; + /* + * Pull the number of bytes we have written from + * the queue we have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + if (w < len) + { + /* We didn't write all the data */ + } } - fprintf(stderr, "<<<<< Written %i bytes, left %i\n", w, dcb->buff_bytes - w); - n = n-w; - memcpy(&dcb->buff_bytes, &n, sizeof(int)); - - dcb->buffer_ptr = dcb->buffer_ptr + w; - - fprintf(stderr, "<<<<< Next time write %i bytes, buffer_ptr is %i bytes shifted\n", n, w); - } else { - fprintf(stderr, "<<<< Nothing to do with FIRST buffer left bytes\n"); } - m = dcb->second_buff_bytes; - - if (dcb->second_buff_bytes) { - fprintf(stderr, "<<<< Now use the SECOND buffer left %i bytes\n", m); - w = write(dcb->fd, dcb->second_buffer_ptr, dcb->second_buff_bytes); - if (w < 0) { - if ((w != EAGAIN) || (w!= EWOULDBLOCK)) { - return 1; - } else - return 0; - } - fprintf(stderr, "<<<<< second Written %i bytes, left %i\n", w, dcb->second_buff_bytes - w); - m = m-w; - memcpy(&dcb->second_buff_bytes, &m, sizeof(int)); - - dcb->second_buffer_ptr = dcb->second_buffer_ptr + w; - } - - fprintf(stderr, "<<<< Nothing to do with SECOND buffer left bytes\n"); + spinlock_release(&dcb->writeqlock); return 1; } @@ -708,7 +691,7 @@ static char gw_randomchar() { int gw_generate_random_str(char *output, int len) { int i; - srand(time()); + srand(time(0L)); for ( i = 0; i < len; ++i ) { output[i] = gw_randomchar(); diff --git a/include/dcb.h b/include/dcb.h index 4e4257e77..82da9e257 100644 --- a/include/dcb.h +++ 
b/include/dcb.h @@ -17,6 +17,8 @@ * * Copyright SkySQL Ab 2013 */ +#include <spinlock.h> +#include <buffer.h> struct session; @@ -32,7 +34,6 @@ struct session; * entry points * */ - */ struct dcb; @@ -68,16 +69,12 @@ typedef struct dcb { struct session *session; /* The owning session */ GWPROTOCOL func; /* The functions for this descrioptor */ - /* queue buffer for write - is now a two buffer implementation - Only used in client write - */ - uint8_t buffer[MAX_BUFFER_SIZE]; /* network buffer */ - int buff_bytes; /* bytes in buffer */ - uint8_t *buffer_ptr; /* buffer pointer */ - uint8_t second_buffer[MAX_BUFFER_SIZE]; /* 2nd network buffer */ - int second_buff_bytes; /* 2nd bytes in buffer */ - uint8_t *second_buffer_ptr; /* 2nd buffer pointer */ + SPINLOCK writeqlock; /* Write Queue spinlock */ + GWBUF *writeq; /* Write Data Queue */ + + + + struct dcb *next; /* Next DCB in the chain of allocated DCB's */ } DCB; /* DCB states */ @@ -93,4 +90,8 @@ typedef struct dcb { #define DCB_SESSION(x) (x)->session #define DCB_PROTOCOL(x, type) (type *)((x)->protocol) +extern DCB *alloc_dcb(); /* Allocate a DCB */ +extern void printDCB(DCB *); /* Debug print routine */ +extern const char *gw_dcb_state2string(int); /* DCB state to string */ + #endif