Addition of example buffer handling

This commit is contained in:
Mark Riddoch
2013-06-12 10:01:02 +01:00
parent 037055d33e
commit 329a70eccd
4 changed files with 115 additions and 148 deletions

View File

@ -16,7 +16,8 @@
CC=cc CC=cc
CFLAGS=-c -I/usr/include -I../include CFLAGS=-c -I/usr/include -I../include
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c utils.c SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
utils.c dcb.c
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=-lssl LIBS=-lssl

View File

@ -191,21 +191,3 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) {
return n; return n;
} }
const char *gw_dcb_state2string (int state) {
switch(state) {
case DCB_STATE_ALLOC:
return "DCB Allocated";
case DCB_STATE_IDLE:
return "DCB not yet in polling";
case DCB_STATE_POLLING:
return "DCB in the EPOLL";
case DCB_STATE_PROCESSING:
return "DCB processing event";
case DCB_STATE_LISTENING:
return "DCB for listening socket";
case DCB_STATE_DISCONNECTED:
return "DCB socket closed";
default:
return "DCB (unknown)";
}
}

View File

@ -63,10 +63,10 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
int w; int w;
int count_reads = 0; int count_reads = 0;
int count_writes = 0; int count_writes = 0;
uint8_t buffer[MAX_BUFFER_SIZE];
int b = -1; int b = -1;
int tot_b = -1; int tot_b = -1;
uint8_t *ptr_buffer; uint8_t *ptr_buffer;
GWBUF *buffer, *head;
if (ioctl(dcb->fd, FIONREAD, &b)) { if (ioctl(dcb->fd, FIONREAD, &b)) {
fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno)); fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno));
@ -74,90 +74,85 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b); fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b);
} }
// detect pending data in the buffer for client write /*
if (dcb->session->client->buff_bytes > 0) { * Read all the data that is available into a chain of buffers
*/
fprintf(stderr, "*********** Read backend says there are pending writes for %i bytes\n", dcb->session->client->buff_bytes); head = NULL;
// read data from socket, assume no error here (quick) and put it into the DCB second buffer while (b > 0)
GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++); {
int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
memcpy(&dcb->session->client->second_buff_bytes, &n, sizeof(int)); if ((buffer = gwbuf_alloc(bufsize)) == NULL)
fprintf(stderr, "#### second buff_bytes set to %i\n", dcb->session->client->second_buff_bytes); {
/* Bad news, we have run out of memory */
memcpy(dcb->session->client->second_buffer, buffer, n); return 0;
fprintf(stderr, "#### filled memory second buffer!\n");
dcb->session->client->second_buffer_ptr = dcb->session->client->second_buffer;
return 1;
}
// else, no pending data
tot_b = b;
// read all the data, without using multiple buffers, only one
while (b > 0) {
GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++);
fprintf (stderr, "Read %i/%i bytes done, n. reads = %i, %i. Next could be write\n", n, b, count_reads, errno);
if (n < 0) {
if ((errno != EAGAIN) || (errno != EWOULDBLOCK))
return 0;
else
return 1;
} else {
b = b - n;
} }
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); count_reads++);
head = gwbuf_append(head, buffer);
} }
ptr_buffer = buffer;
if(n >2) /*
fprintf(stderr, ">>> The READ BUFFER last 2 byte [%i][%i]\n", buffer[n-2], buffer[n-1]); ** This next bit really belongs in the write function for the client DCB
** It is here now just to illustrate the use of the buffers
*/
{
DCB *client = dcb->session->client;
int saved_errno = 0;
spinlock_acquire(&client->writeqlock);
if (client->writeq)
{
/*
* We have some queued data, so add our data to
* the write queue and return.
* The assumption is that there will be an EPOLLOUT
* event to drain what is already queued. We are protected
* by the spinlock, which will also be acquired by the
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
client->writeq = gwbuf_append(client->writeq, head);
}
else
{
int len;
// write all the data /*
// if write fails for EAGAIN or EWOULDBLOCK, copy the data into the dcb buffer * Loop over the buffer chain that has been passed to us
while (n >0) { * from the reading side.
GW_NOINTR_CALL(w = write(dcb->session->client->fd, ptr_buffer, n); count_writes++); * Send as much of the data in that chain as possible and
* add any balance to the write queue.
fprintf (stderr, "Write Cycle %i, %i of %i bytes done, errno %i\n", count_writes, w, n, errno); */
if (w > 2) while (head != NULL)
fprintf(stderr, "<<< writed BUFFER last 2 byte [%i][%i]\n", ptr_buffer[w-2], ptr_buffer[w-1]); {
len = GWBUF_LENGTH(head);
if (w < 0) { GW_NOINTR_CALL(w = write(client->fd, GWBUF_DATA(head), len); count_writes++);
if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) { saved_errno = errno;
if (w < 0)
{
break; break;
} else {
fprintf(stderr, "<<<< Write to client not completed for %i bytes!\n", n);
if (dcb->session->client) {
fprintf(stderr, "<<<< Try to set buff_bytes!\n");
memcpy(&dcb->session->client->buff_bytes, &n, sizeof(int));
fprintf(stderr, "<<<< buff_bytes set to %i\n", dcb->session->client->buff_bytes);
fprintf(stderr, "<<<< Try to fill memory buffer!\n");
memcpy(dcb->session->client->buffer, ptr_buffer, n);
dcb->session->client->buffer_ptr = dcb->session->client->buffer;
fprintf(stderr, "<<<< Buffer Write to client has %i bytes\n", dcb->session->client->buff_bytes);
if (n > 1) {
fprintf(stderr, "<<<< Buffer bytes last 2 [%i]\n", ptr_buffer[0]);
fprintf(stderr, "<<<< Buffer bytes last [%i]\n", ptr_buffer[1]);
}
} else {
fprintf(stderr, "<<< do data in memory for Buffer Write to client\n");
}
return 0;
} }
} else {
n = n - w;
ptr_buffer = ptr_buffer + w;
fprintf(stderr, "<<< Write: pointer to buffer %i bytes shifted\n", w);
memset(&dcb->session->client->buff_bytes, '\0', sizeof(int)); /*
* Pull the number of bytes we have written from
* queue with have.
*/
head = gwbuf_consume(head, w);
if (w < len)
{
/* We didn't write all the data */
}
} }
/* Buffer the balance of any data */
client->writeq = head;
}
spinlock_release(&client->writeqlock);
if (head && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK))
{
/* We had a real write failure that we must deal with */
}
} }
return 1; return 1;
} }
@ -322,7 +317,6 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
MySQLProtocol *protocol = NULL; MySQLProtocol *protocol = NULL;
int n; int n;
struct epoll_event new_event; struct epoll_event new_event;
n = dcb->buff_bytes;
if (dcb == NULL) { if (dcb == NULL) {
@ -360,8 +354,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
} }
if (dcb->session->backends) { if (dcb->session->backends) {
fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, dcb->buff_bytes, dcb->session->backends->fd, dcb->fd); fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd);
fprintf(stderr, "CLIENT WRITE READY, SECOND bytes left to write %i from back %i to client %i\n", dcb->second_buff_bytes, dcb->session->backends->fd, dcb->fd);
} }
//#ifdef GW_DEBUG_WRITE_EVENT //#ifdef GW_DEBUG_WRITE_EVENT
@ -392,50 +385,40 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) { if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) {
int w; int w;
int m; int m;
int saved_errno = 0;
if (dcb->buff_bytes > 0) { spinlock_acquire(&dcb->writeqlock);
fprintf(stderr, "<<< Writing unsent data for state [%i], bytes %i\n", protocol->state, dcb->buff_bytes); if (dcb->writeq)
{
int len;
if (dcb->buff_bytes > 2) /*
fprintf(stderr, "READ BUFFER last 2 byte [%i%i]\n", dcb->buffer[dcb->buff_bytes-2], dcb->buffer[dcb->buff_bytes-1]); * Loop over the buffer chain in the pendign writeq
* Send as much of the data in that chain as possible and
* leave any balance on the write queue.
*/
while (dcb->writeq != NULL)
{
len = GWBUF_LENGTH(dcb->writeq);
GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len););
saved_errno = errno;
if (w < 0)
{
break;
}
w = write(dcb->fd, dcb->buffer_ptr, dcb->buff_bytes); /*
* Pull the number of bytes we have written from
if (w < 0) { * queue with have.
if ((w != EAGAIN) || (w!= EWOULDBLOCK)) { */
return 1; dcb->writeq = gwbuf_consume(dcb->writeq, w);
} else if (w < len)
return 0; {
/* We didn't write all the data */
}
} }
fprintf(stderr, "<<<<< Written %i bytes, left %i\n", w, dcb->buff_bytes - w);
n = n-w;
memcpy(&dcb->buff_bytes, &n, sizeof(int));
dcb->buffer_ptr = dcb->buffer_ptr + w;
fprintf(stderr, "<<<<< Next time write %i bytes, buffer_ptr is %i bytes shifted\n", n, w);
} else {
fprintf(stderr, "<<<< Nothing to do with FIRST buffer left bytes\n");
} }
m = dcb->second_buff_bytes; spinlock_release(&dcb->writeqlock);
if (dcb->second_buff_bytes) {
fprintf(stderr, "<<<< Now use the SECOND buffer left %i bytes\n", m);
w = write(dcb->fd, dcb->second_buffer_ptr, dcb->second_buff_bytes);
if (w < 0) {
if ((w != EAGAIN) || (w!= EWOULDBLOCK)) {
return 1;
} else
return 0;
}
fprintf(stderr, "<<<<< second Written %i bytes, left %i\n", w, dcb->second_buff_bytes - w);
m = m-w;
memcpy(&dcb->second_buff_bytes, &m, sizeof(int));
dcb->second_buffer_ptr = dcb->second_buffer_ptr + w;
}
fprintf(stderr, "<<<< Nothing to do with SECOND buffer left bytes\n");
return 1; return 1;
} }
@ -708,7 +691,7 @@ static char gw_randomchar() {
int gw_generate_random_str(char *output, int len) { int gw_generate_random_str(char *output, int len) {
int i; int i;
srand(time()); srand(time(0L));
for ( i = 0; i < len; ++i ) { for ( i = 0; i < len; ++i ) {
output[i] = gw_randomchar(); output[i] = gw_randomchar();

View File

@ -17,6 +17,8 @@
* *
* Copyright SkySQL Ab 2013 * Copyright SkySQL Ab 2013
*/ */
#include <spinlock.h>
#include <buffer.h>
struct session; struct session;
@ -32,7 +34,6 @@ struct session;
* entry points * entry points
* *
*/ */
*/
struct dcb; struct dcb;
@ -68,16 +69,12 @@ typedef struct dcb {
struct session *session; /* The owning session */ struct session *session; /* The owning session */
GWPROTOCOL func; /* The functions for this descrioptor */ GWPROTOCOL func; /* The functions for this descrioptor */
/* queue buffer for write SPINLOCK writeqlock; /* Write Queue spinlock */
is now a two buffer implementation GWBUF *writeq; /* Write Data Queue */
Only used in client write
*/
uint8_t buffer[MAX_BUFFER_SIZE]; /* network buffer */
int buff_bytes; /* bytes in buffer */ struct dcb *next; /* Next DCB in the chain of allocated DCB's */
uint8_t *buffer_ptr; /* buffer pointer */
uint8_t second_buffer[MAX_BUFFER_SIZE]; /* 2nd network buffer */
int second_buff_bytes; /* 2nd bytes in buffer */
uint8_t *second_buffer_ptr; /* 2nd buffer pointer */
} DCB; } DCB;
/* DCB states */ /* DCB states */
@ -93,4 +90,8 @@ typedef struct dcb {
#define DCB_SESSION(x) (x)->session #define DCB_SESSION(x) (x)->session
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol) #define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
extern DCB *alloc_dcb(); /* Allocate a DCB */
extern void printDCB(DCB *); /* Debug print routine */
extern const char *gw_dcb_state2string(int); /* DCB state to string */
#endif #endif