in utils.c see line 88 for modifications
This commit is contained in:
		@ -16,7 +16,8 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
CC=cc
 | 
					CC=cc
 | 
				
			||||||
CFLAGS=-c -I/usr/include -I../include
 | 
					CFLAGS=-c -I/usr/include -I../include
 | 
				
			||||||
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c utils.c
 | 
					SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
 | 
				
			||||||
 | 
						utils.c dcb.c
 | 
				
			||||||
OBJ=$(SRCS:.c=.o)
 | 
					OBJ=$(SRCS:.c=.o)
 | 
				
			||||||
LIBS=-lssl
 | 
					LIBS=-lssl
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -25,3 +26,6 @@ gateway: $(OBJ)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
.c.o:
 | 
					.c.o:
 | 
				
			||||||
	$(CC) $(CFLAGS) $< -o $@
 | 
						$(CC) $(CFLAGS) $< -o $@
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					clean:
 | 
				
			||||||
 | 
						rm -f $(OBJ) gateway
 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										174
									
								
								core/dcb.c
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										174
									
								
								core/dcb.c
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,174 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * This file is distributed as part of the SkySQL Gateway.  It is free
 | 
				
			||||||
 | 
					 * software: you can redistribute it and/or modify it under the terms of the
 | 
				
			||||||
 | 
					 * GNU General Public License as published by the Free Software Foundation,
 | 
				
			||||||
 | 
					 * version 2.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This program is distributed in the hope that it will be useful, but WITHOUT
 | 
				
			||||||
 | 
					 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 | 
				
			||||||
 | 
					 * FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 | 
				
			||||||
 | 
					 * details.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * You should have received a copy of the GNU General Public License along with
 | 
				
			||||||
 | 
					 * this program; if not, write to the Free Software Foundation, Inc., 51
 | 
				
			||||||
 | 
					 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * Copyright SkySQL Ab 2013
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * dcb.c  -  Descriptor Control Block generic functions
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * Revision History
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * Date		Who		Description
 | 
				
			||||||
 | 
					 * 12/06/13	Mark Riddoch	Initial implementation
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					#include <stdio.h>
 | 
				
			||||||
 | 
					#include <stdlib.h>
 | 
				
			||||||
 | 
					#include <string.h>
 | 
				
			||||||
 | 
					#include <dcb.h>
 | 
				
			||||||
 | 
					#include <spinlock.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static	DCB		*allDCBs = NULL;	/* Diagnotics need a list of DCBs */
 | 
				
			||||||
 | 
					static	SPINLOCK	*dcbspin = NULL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Allocate a new DCB. 
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * This routine performs the generic initialisation on the DCB before returning
 | 
				
			||||||
 | 
					 * the newly allocated DCB.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @return A newly allocated DCB or NULL if non could be allocated.
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					DCB *
 | 
				
			||||||
 | 
					alloc_dcb()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					DCB	*rval;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (dcbspin == NULL)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
 | 
				
			||||||
 | 
								return NULL;
 | 
				
			||||||
 | 
							spinlock_init(dcbspin);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if ((rval = malloc(sizeof(DCB))) == NULL)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							return NULL;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_init(&rval->writeqlock);
 | 
				
			||||||
 | 
						rval->writeq = NULL;
 | 
				
			||||||
 | 
						rval->state = DCB_STATE_ALLOC;
 | 
				
			||||||
 | 
						memset(&rval->stats, 0, sizeof(DCBSTATS));	// Zero the statistics
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spinlock_acquire(dcbspin);
 | 
				
			||||||
 | 
						if (allDCBs == NULL)
 | 
				
			||||||
 | 
							allDCBs = rval;
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							DCB *ptr = allDCBs;
 | 
				
			||||||
 | 
							while (ptr->next)
 | 
				
			||||||
 | 
								ptr = ptr->next;
 | 
				
			||||||
 | 
							ptr->next = rval;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_release(dcbspin);
 | 
				
			||||||
 | 
						return rval;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Free a DCB and remove it from the chain of all DCBs
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @param dcb THe DCB to free
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void
 | 
				
			||||||
 | 
					free_dcb(DCB *dcb)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						dcb->state = DCB_STATE_FREED;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						/* First remove this DCB from the chain */
 | 
				
			||||||
 | 
						spinlock_acquire(dcbspin);
 | 
				
			||||||
 | 
						if (allDCBs == dcb)
 | 
				
			||||||
 | 
							allDCBs = dcb->next;
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							DCB *ptr = allDCBs;
 | 
				
			||||||
 | 
							while (ptr && ptr->next != dcb)
 | 
				
			||||||
 | 
								ptr = ptr->next;
 | 
				
			||||||
 | 
							if (ptr)
 | 
				
			||||||
 | 
								ptr->next = dcb->next;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_release(dcbspin);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						free(dcb);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Diagnostic to print a DCB
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @param dcb	The DCB to print
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void
 | 
				
			||||||
 | 
					printDCB(DCB *dcb)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						(void)printf("DCB: 0x%x\n", (void *)dcb);
 | 
				
			||||||
 | 
						(void)printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
 | 
				
			||||||
 | 
						(void)printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
 | 
				
			||||||
 | 
						(void)printf("\tStatistics:\n");
 | 
				
			||||||
 | 
						(void)printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
 | 
				
			||||||
 | 
						(void)printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
 | 
				
			||||||
 | 
						(void)printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Diagnostic to print all DCB allocated in the system
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					void printAllDCBs()
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					DCB	*dcb;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (dcbspin == NULL)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
 | 
				
			||||||
 | 
								return;
 | 
				
			||||||
 | 
							spinlock_init(dcbspin);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_acquire(dcbspin);
 | 
				
			||||||
 | 
						dcb = allDCBs;
 | 
				
			||||||
 | 
						while (dcb)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							printDCB(dcb);
 | 
				
			||||||
 | 
							dcb = dcb->next;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_release(dcbspin);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Return a string representation of a DCB state.
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @param state	The DCB state
 | 
				
			||||||
 | 
					 * @return String representation of the state
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					const char *
 | 
				
			||||||
 | 
					gw_dcb_state2string (int state) {
 | 
				
			||||||
 | 
						switch(state) {
 | 
				
			||||||
 | 
							case DCB_STATE_ALLOC:
 | 
				
			||||||
 | 
								return "DCB Allocated";
 | 
				
			||||||
 | 
							case DCB_STATE_IDLE:
 | 
				
			||||||
 | 
								return "DCB not yet in polling";
 | 
				
			||||||
 | 
							case DCB_STATE_POLLING:
 | 
				
			||||||
 | 
								return "DCB in the EPOLL";
 | 
				
			||||||
 | 
							case DCB_STATE_PROCESSING:
 | 
				
			||||||
 | 
								return "DCB processing event";
 | 
				
			||||||
 | 
							case DCB_STATE_LISTENING:
 | 
				
			||||||
 | 
								return "DCB for listening socket";
 | 
				
			||||||
 | 
							case DCB_STATE_DISCONNECTED:
 | 
				
			||||||
 | 
								return "DCB socket closed";
 | 
				
			||||||
 | 
							default:
 | 
				
			||||||
 | 
								return "DCB (unknown)";
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@ -265,7 +265,7 @@ int main(int argc, char **argv) {
 | 
				
			|||||||
			if (events[n].events & EPOLLOUT) {
 | 
								if (events[n].events & EPOLLOUT) {
 | 
				
			||||||
				if (dcb->state != DCB_STATE_LISTENING) {
 | 
									if (dcb->state != DCB_STATE_LISTENING) {
 | 
				
			||||||
					fprintf(stderr, "CALL the WRITE pointer\n");
 | 
										fprintf(stderr, "CALL the WRITE pointer\n");
 | 
				
			||||||
					(dcb->func).write(dcb, epollfd);
 | 
										(dcb->func).write_ready(dcb, epollfd);
 | 
				
			||||||
					fprintf(stderr, ">>> CALLED the WRITE pointer\n");
 | 
										fprintf(stderr, ">>> CALLED the WRITE pointer\n");
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
				
			|||||||
@ -191,21 +191,3 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) {
 | 
				
			|||||||
	return n;
 | 
						return n;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const char *gw_dcb_state2string (int state) {
 | 
					 | 
				
			||||||
	switch(state) {
 | 
					 | 
				
			||||||
		case DCB_STATE_ALLOC:
 | 
					 | 
				
			||||||
			return "DCB Allocated";
 | 
					 | 
				
			||||||
		case DCB_STATE_IDLE:
 | 
					 | 
				
			||||||
			return "DCB not yet in polling";
 | 
					 | 
				
			||||||
		case DCB_STATE_POLLING:
 | 
					 | 
				
			||||||
			return "DCB in the EPOLL";
 | 
					 | 
				
			||||||
		case DCB_STATE_PROCESSING:
 | 
					 | 
				
			||||||
			return "DCB processing event";
 | 
					 | 
				
			||||||
		case DCB_STATE_LISTENING:
 | 
					 | 
				
			||||||
			return "DCB for listening socket";
 | 
					 | 
				
			||||||
		case DCB_STATE_DISCONNECTED:
 | 
					 | 
				
			||||||
			return "DCB socket closed";
 | 
					 | 
				
			||||||
		default:
 | 
					 | 
				
			||||||
			return "DCB (unknown)";
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
				
			|||||||
							
								
								
									
										260
									
								
								core/utils.c
									
									
									
									
									
								
							
							
						
						
									
										260
									
								
								core/utils.c
									
									
									
									
									
								
							@ -61,12 +61,10 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
	if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
 | 
						if ((client_protocol->state == MYSQL_WAITING_RESULT) || (client_protocol->state == MYSQL_IDLE)) {
 | 
				
			||||||
		struct epoll_event new_event;
 | 
							struct epoll_event new_event;
 | 
				
			||||||
		int w;
 | 
							int w;
 | 
				
			||||||
		int count_reads = 0;
 | 
					 | 
				
			||||||
		int count_writes = 0;
 | 
					 | 
				
			||||||
		uint8_t buffer[MAX_BUFFER_SIZE];
 | 
					 | 
				
			||||||
		int b = -1;
 | 
							int b = -1;
 | 
				
			||||||
		int tot_b = -1;
 | 
							int tot_b = -1;
 | 
				
			||||||
		uint8_t *ptr_buffer;
 | 
							uint8_t *ptr_buffer;
 | 
				
			||||||
 | 
							GWBUF	*buffer, *head;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (ioctl(dcb->fd, FIONREAD, &b)) {
 | 
							if (ioctl(dcb->fd, FIONREAD, &b)) {
 | 
				
			||||||
			fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno));
 | 
								fprintf(stderr, "Backend Ioctl FIONREAD error %i, %s\n", errno , strerror(errno));
 | 
				
			||||||
@ -74,91 +72,34 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
			fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b);
 | 
								fprintf(stderr, "Backend IOCTL FIONREAD bytes to read = %i\n", b);
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// detect pending data in the buffer for client write
 | 
							/*
 | 
				
			||||||
		if (dcb->session->client->buff_bytes > 0) {
 | 
							 * Read all the data that is available into a chain of buffers
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
			fprintf(stderr, "*********** Read backend says there are pending writes for %i bytes\n", dcb->session->client->buff_bytes);
 | 
							head = NULL;
 | 
				
			||||||
			// read data from socket, assume no error here (quick) and put it into the DCB second buffer
 | 
							while (b > 0)
 | 
				
			||||||
			GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++);
 | 
							{
 | 
				
			||||||
 | 
								int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
 | 
				
			||||||
			memcpy(&dcb->session->client->second_buff_bytes, &n, sizeof(int));
 | 
								if ((buffer = gwbuf_alloc(bufsize)) == NULL)
 | 
				
			||||||
			fprintf(stderr, "#### second buff_bytes set to %i\n", dcb->session->client->second_buff_bytes);
 | 
								{
 | 
				
			||||||
 | 
									/* Bad news, we have run out of memory */
 | 
				
			||||||
			memcpy(dcb->session->client->second_buffer, buffer, n);
 | 
					 | 
				
			||||||
			fprintf(stderr, "#### filled memory second buffer!\n");
 | 
					 | 
				
			||||||
			dcb->session->client->second_buffer_ptr = dcb->session->client->second_buffer;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			return 1;
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// else, no pending data
 | 
					 | 
				
			||||||
		tot_b = b;
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
		// read all the data, without using multiple buffers, only one
 | 
					 | 
				
			||||||
		while (b > 0) {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			GW_NOINTR_CALL(n = read(dcb->fd, buffer, b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE); count_reads++);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			fprintf (stderr, "Read %i/%i bytes done, n. reads = %i, %i. Next could be write\n", n, b, count_reads, errno);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if (n < 0) {
 | 
					 | 
				
			||||||
				if ((errno != EAGAIN) || (errno != EWOULDBLOCK))
 | 
					 | 
				
			||||||
				return 0;
 | 
									return 0;
 | 
				
			||||||
				else
 | 
								}
 | 
				
			||||||
					return 1;
 | 
								GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
 | 
				
			||||||
			} else {
 | 
								if (n < 0) {
 | 
				
			||||||
 | 
									// if eerno == EAGAIN || EWOULDBLOCK is missing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// do the rigth task, not just break
 | 
				
			||||||
 | 
									break;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								head = gwbuf_append(head, buffer);
 | 
				
			||||||
 | 
								
 | 
				
			||||||
 | 
								// how many bytes left
 | 
				
			||||||
			b = b - n;
 | 
								b = b - n;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		ptr_buffer = buffer;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if(n >2)
 | 
							dcb->session->client->func.write(dcb->session->client, head);
 | 
				
			||||||
			fprintf(stderr, ">>> The READ BUFFER last 2 byte [%i][%i]\n", buffer[n-2], buffer[n-1]);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
		// write all the data
 | 
					 | 
				
			||||||
		// if write fails for EAGAIN or EWOULDBLOCK, copy the data into the dcb buffer
 | 
					 | 
				
			||||||
		while (n >0) {
 | 
					 | 
				
			||||||
				GW_NOINTR_CALL(w = write(dcb->session->client->fd, ptr_buffer, n); count_writes++);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				fprintf (stderr, "Write Cycle %i,  %i of %i bytes done, errno %i\n", count_writes, w, n, errno);
 | 
					 | 
				
			||||||
				if (w > 2)
 | 
					 | 
				
			||||||
					fprintf(stderr, "<<< writed BUFFER last 2 byte [%i][%i]\n", ptr_buffer[w-2], ptr_buffer[w-1]);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				if (w < 0) {
 | 
					 | 
				
			||||||
					if ((errno != EAGAIN) || (errno != EWOULDBLOCK)) {
 | 
					 | 
				
			||||||
						break;
 | 
					 | 
				
			||||||
					} else {
 | 
					 | 
				
			||||||
						fprintf(stderr, "<<<< Write to client not completed for %i bytes!\n", n);
 | 
					 | 
				
			||||||
						if (dcb->session->client) {
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< Try to set buff_bytes!\n");
 | 
					 | 
				
			||||||
							memcpy(&dcb->session->client->buff_bytes, &n, sizeof(int));
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< buff_bytes set to %i\n", dcb->session->client->buff_bytes);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< Try to fill memory buffer!\n");
 | 
					 | 
				
			||||||
							memcpy(dcb->session->client->buffer, ptr_buffer, n);
 | 
					 | 
				
			||||||
							dcb->session->client->buffer_ptr = dcb->session->client->buffer;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< Buffer Write to client has %i bytes\n", dcb->session->client->buff_bytes);
 | 
					 | 
				
			||||||
							if (n > 1) {
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< Buffer bytes last 2 [%i]\n", ptr_buffer[0]);
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<<< Buffer bytes last [%i]\n", ptr_buffer[1]);
 | 
					 | 
				
			||||||
							}
 | 
					 | 
				
			||||||
						} else {
 | 
					 | 
				
			||||||
							fprintf(stderr, "<<< do data in memory for Buffer Write to client\n");
 | 
					 | 
				
			||||||
						}
 | 
					 | 
				
			||||||
						return 0;
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				} else {
 | 
					 | 
				
			||||||
					n = n - w;
 | 
					 | 
				
			||||||
					ptr_buffer = ptr_buffer + w;
 | 
					 | 
				
			||||||
					fprintf(stderr, "<<< Write: pointer to buffer %i bytes shifted\n", w);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
					memset(&dcb->session->client->buff_bytes, '\0', sizeof(int));
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		return 1;
 | 
							return 1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
#ifdef GW_DEBUG_READ_EVENT
 | 
					#ifdef GW_DEBUG_READ_EVENT
 | 
				
			||||||
@ -168,6 +109,75 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
	return 1;
 | 
						return 1;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/*
 | 
				
			||||||
 | 
					 * Write function for client DCB
 | 
				
			||||||
 | 
					 *
 | 
				
			||||||
 | 
					 * @param dcb	The DCB of the client
 | 
				
			||||||
 | 
					 * @param queue	Queue of buffers to write
 | 
				
			||||||
 | 
					 */
 | 
				
			||||||
 | 
					int
 | 
				
			||||||
 | 
					MySQLWrite(DCB *dcb, GWBUF *queue)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					int	w, saved_errno = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						spinlock_acquire(&dcb->writeqlock);
 | 
				
			||||||
 | 
						if (dcb->writeq)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							/*
 | 
				
			||||||
 | 
							 * We have some queued data, so add our data to
 | 
				
			||||||
 | 
							 * the write queue and return.
 | 
				
			||||||
 | 
							 * The assumption is that there will be an EPOLLOUT
 | 
				
			||||||
 | 
							 * event to drain what is already queued. We are protected
 | 
				
			||||||
 | 
							 * by the spinlock, which will also be acquired by the
 | 
				
			||||||
 | 
							 * the routine that drains the queue data, so we should
 | 
				
			||||||
 | 
							 * not have a race condition on the event.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							dcb->writeq = gwbuf_append(dcb->writeq, queue);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							int	len;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							/*
 | 
				
			||||||
 | 
							 * Loop over the buffer chain that has been passed to us
 | 
				
			||||||
 | 
							 * from the reading side.
 | 
				
			||||||
 | 
							 * Send as much of the data in that chain as possible and
 | 
				
			||||||
 | 
							 * add any balance to the write queue.
 | 
				
			||||||
 | 
							 */
 | 
				
			||||||
 | 
							while (queue != NULL)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								len = GWBUF_LENGTH(queue);
 | 
				
			||||||
 | 
								GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++);
 | 
				
			||||||
 | 
								saved_errno = errno;
 | 
				
			||||||
 | 
								if (w < 0)
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									break;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								/*
 | 
				
			||||||
 | 
								 * Pull the number of bytes we have written from
 | 
				
			||||||
 | 
								 * queue with have.
 | 
				
			||||||
 | 
								 */
 | 
				
			||||||
 | 
								queue = gwbuf_consume(queue, w);
 | 
				
			||||||
 | 
								if (w < len)
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									/* We didn't write all the data */
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							/* Buffer the balance of any data */
 | 
				
			||||||
 | 
							dcb->writeq = queue;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						spinlock_release(&dcb->writeqlock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK))
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							/* We had a real write failure that we must deal with */
 | 
				
			||||||
 | 
							return 0;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return 1;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//////////////////////////////////////////
 | 
					//////////////////////////////////////////
 | 
				
			||||||
//backend write event triggered by EPOLLOUT
 | 
					//backend write event triggered by EPOLLOUT
 | 
				
			||||||
//////////////////////////////////////////
 | 
					//////////////////////////////////////////
 | 
				
			||||||
@ -322,7 +332,6 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
	MySQLProtocol *protocol = NULL;
 | 
						MySQLProtocol *protocol = NULL;
 | 
				
			||||||
	int n;
 | 
						int n;
 | 
				
			||||||
        struct epoll_event new_event;
 | 
					        struct epoll_event new_event;
 | 
				
			||||||
        n = dcb->buff_bytes;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (dcb == NULL) {
 | 
						if (dcb == NULL) {
 | 
				
			||||||
@ -360,8 +369,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (dcb->session->backends) {
 | 
						if (dcb->session->backends) {
 | 
				
			||||||
		fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, dcb->buff_bytes, dcb->session->backends->fd, dcb->fd);
 | 
							fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd);
 | 
				
			||||||
		fprintf(stderr, "CLIENT WRITE READY, SECOND bytes left to write %i from back %i to client %i\n", dcb->second_buff_bytes, dcb->session->backends->fd, dcb->fd);
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//#ifdef GW_DEBUG_WRITE_EVENT
 | 
					//#ifdef GW_DEBUG_WRITE_EVENT
 | 
				
			||||||
@ -392,50 +400,40 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
 | 
				
			|||||||
	if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) {
 | 
						if ((protocol->state == MYSQL_IDLE) || (protocol->state == MYSQL_WAITING_RESULT)) {
 | 
				
			||||||
		int w;
 | 
							int w;
 | 
				
			||||||
		int m;
 | 
							int m;
 | 
				
			||||||
 | 
							int saved_errno = 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if (dcb->buff_bytes > 0) {
 | 
							spinlock_acquire(&dcb->writeqlock);
 | 
				
			||||||
			fprintf(stderr, "<<< Writing unsent data for state [%i], bytes %i\n", protocol->state, dcb->buff_bytes);
 | 
							if (dcb->writeq)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								int	len;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if (dcb->buff_bytes > 2)
 | 
								/*
 | 
				
			||||||
				fprintf(stderr, "READ BUFFER last 2 byte [%i%i]\n", dcb->buffer[dcb->buff_bytes-2], dcb->buffer[dcb->buff_bytes-1]);
 | 
								 * Loop over the buffer chain in the pendign writeq
 | 
				
			||||||
 | 
								 * Send as much of the data in that chain as possible and
 | 
				
			||||||
			w = write(dcb->fd, dcb->buffer_ptr, dcb->buff_bytes);
 | 
								 * leave any balance on the write queue.
 | 
				
			||||||
 | 
								 */
 | 
				
			||||||
			if (w < 0) {
 | 
								while (dcb->writeq != NULL)
 | 
				
			||||||
				if ((w != EAGAIN) || (w!= EWOULDBLOCK)) {
 | 
								{
 | 
				
			||||||
					return 1;
 | 
									len = GWBUF_LENGTH(dcb->writeq);
 | 
				
			||||||
				} else
 | 
									GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(dcb->writeq), len););
 | 
				
			||||||
					return 0;
 | 
									saved_errno = errno;
 | 
				
			||||||
			}
 | 
									if (w < 0)
 | 
				
			||||||
			fprintf(stderr, "<<<<< Written %i bytes, left %i\n", w, dcb->buff_bytes - w);
 | 
									{
 | 
				
			||||||
			n = n-w;
 | 
										break;
 | 
				
			||||||
			memcpy(&dcb->buff_bytes, &n, sizeof(int));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			dcb->buffer_ptr = dcb->buffer_ptr + w;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			fprintf(stderr, "<<<<< Next time write %i bytes, buffer_ptr is %i bytes shifted\n", n, w);
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			fprintf(stderr, "<<<< Nothing to do with FIRST buffer left bytes\n");
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		m = dcb->second_buff_bytes;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		if (dcb->second_buff_bytes) {
 | 
					 | 
				
			||||||
			fprintf(stderr, "<<<< Now use the SECOND buffer left %i bytes\n", m);
 | 
					 | 
				
			||||||
			w = write(dcb->fd, dcb->second_buffer_ptr, dcb->second_buff_bytes);
 | 
					 | 
				
			||||||
			if (w < 0) {
 | 
					 | 
				
			||||||
				if ((w != EAGAIN) || (w!= EWOULDBLOCK)) {
 | 
					 | 
				
			||||||
					return 1;
 | 
					 | 
				
			||||||
				} else
 | 
					 | 
				
			||||||
					return 0;
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			fprintf(stderr, "<<<<< second Written %i bytes, left %i\n", w, dcb->second_buff_bytes - w);
 | 
					 | 
				
			||||||
			m = m-w;
 | 
					 | 
				
			||||||
			memcpy(&dcb->second_buff_bytes, &m, sizeof(int));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			dcb->second_buffer_ptr = dcb->second_buffer_ptr + w;
 | 
					 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		fprintf(stderr, "<<<< Nothing to do with SECOND buffer left bytes\n");
 | 
									/*
 | 
				
			||||||
 | 
									 * Pull the number of bytes we have written from
 | 
				
			||||||
 | 
									 * queue with have.
 | 
				
			||||||
 | 
									 */
 | 
				
			||||||
 | 
									dcb->writeq = gwbuf_consume(dcb->writeq, w);
 | 
				
			||||||
 | 
									if (w < len)
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										/* We didn't write all the data */
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							spinlock_release(&dcb->writeqlock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return 1;
 | 
							return 1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
@ -541,7 +539,6 @@ void MySQLListener(int epfd, char *config_bind) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int MySQLAccept(DCB *listener, int efd) {
 | 
					int MySQLAccept(DCB *listener, int efd) {
 | 
				
			||||||
	int accept_counter = 0;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
 | 
						fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -574,9 +571,9 @@ int MySQLAccept(DCB *listener, int efd) {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		accept_counter++;
 | 
							listener->stats.n_accepts++;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		fprintf(stderr, "Processing %i connection fd %i for listener %i\n", accept_counter, c_sock, listener->fd);
 | 
							fprintf(stderr, "Processing %i connection fd %i for listener %i\n", listener->stats.n_accepts, c_sock, listener->fd);
 | 
				
			||||||
		// set nonblocking 
 | 
							// set nonblocking 
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
 | 
							setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
 | 
				
			||||||
@ -629,7 +626,7 @@ int MySQLAccept(DCB *listener, int efd) {
 | 
				
			|||||||
				backend->state = DCB_STATE_POLLING;
 | 
									backend->state = DCB_STATE_POLLING;
 | 
				
			||||||
				backend->session = session;
 | 
									backend->session = session;
 | 
				
			||||||
				(backend->func).read = gw_read_backend_event;
 | 
									(backend->func).read = gw_read_backend_event;
 | 
				
			||||||
				(backend->func).write = gw_write_backend_event;
 | 
									(backend->func).write_ready = gw_write_backend_event;
 | 
				
			||||||
				(backend->func).error = handle_event_errors_backend;
 | 
									(backend->func).error = handle_event_errors_backend;
 | 
				
			||||||
		
 | 
							
 | 
				
			||||||
				// assume here one backend only.
 | 
									// assume here one backend only.
 | 
				
			||||||
@ -643,7 +640,8 @@ int MySQLAccept(DCB *listener, int efd) {
 | 
				
			|||||||
		// assign function poiters to "func" field
 | 
							// assign function poiters to "func" field
 | 
				
			||||||
		(client->func).error = handle_event_errors;
 | 
							(client->func).error = handle_event_errors;
 | 
				
			||||||
		(client->func).read = gw_route_read_event;
 | 
							(client->func).read = gw_route_read_event;
 | 
				
			||||||
		(client->func).write = gw_handle_write_event;
 | 
							(client->func).write = MySQLWrite;
 | 
				
			||||||
 | 
							(client->func).write_ready = gw_handle_write_event;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// edge triggering flag added
 | 
							// edge triggering flag added
 | 
				
			||||||
		ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
 | 
							ee.events = EPOLLIN | EPOLLOUT | EPOLLET;
 | 
				
			||||||
@ -708,7 +706,7 @@ static char gw_randomchar() {
 | 
				
			|||||||
int gw_generate_random_str(char *output, int len) {
 | 
					int gw_generate_random_str(char *output, int len) {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	int i;
 | 
						int i;
 | 
				
			||||||
	srand(time());
 | 
						srand(time(0L));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for ( i = 0; i < len; ++i ) {
 | 
						for ( i = 0; i < len; ++i ) {
 | 
				
			||||||
		output[i] = gw_randomchar();
 | 
							output[i] = gw_randomchar();
 | 
				
			||||||
 | 
				
			|||||||
@ -17,6 +17,8 @@
 | 
				
			|||||||
 *
 | 
					 *
 | 
				
			||||||
 * Copyright SkySQL Ab 2013
 | 
					 * Copyright SkySQL Ab 2013
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
 | 
					#include <spinlock.h>
 | 
				
			||||||
 | 
					#include <buffer.h>
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct session;
 | 
					struct session;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -32,7 +34,6 @@ struct session;
 | 
				
			|||||||
 *					entry points
 | 
					 *					entry points
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct dcb;
 | 
					struct dcb;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -50,14 +51,19 @@ typedef struct gw_protocol {
 | 
				
			|||||||
	 *	close		Gateway close entry point for the socket
 | 
						 *	close		Gateway close entry point for the socket
 | 
				
			||||||
         */                             
 | 
					         */                             
 | 
				
			||||||
	int		(*read)(struct dcb *, int);
 | 
						int		(*read)(struct dcb *, int);
 | 
				
			||||||
	int		(*write)(struct dcb *, int);
 | 
						int		(*write)(struct dcb *, GWBUF *);
 | 
				
			||||||
	int		(*write_ready)(struct dcb *);
 | 
						int		(*write_ready)(struct dcb *, int);
 | 
				
			||||||
	int		(*error)(struct dcb *, int);
 | 
						int		(*error)(struct dcb *, int);
 | 
				
			||||||
	int		(*hangup)(struct dcb *, int);
 | 
						int		(*hangup)(struct dcb *, int);
 | 
				
			||||||
	int		(*accept)(struct dcb *, int);
 | 
						int		(*accept)(struct dcb *, int);
 | 
				
			||||||
	int		(*close)(struct dcb *, int);
 | 
						int		(*close)(struct dcb *, int);
 | 
				
			||||||
} GWPROTOCOL;
 | 
					} GWPROTOCOL;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					typedef struct dcbstats {
 | 
				
			||||||
 | 
						int		n_reads;	/* Number of reads on this descriptor */
 | 
				
			||||||
 | 
						int		n_writes;	/* Number of writes on this descriptor */
 | 
				
			||||||
 | 
						int		n_accepts;	/* Number of accepts on this descriptor */
 | 
				
			||||||
 | 
					} DCBSTATS;
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
 * Descriptor Control Block
 | 
					 * Descriptor Control Block
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
@ -68,16 +74,12 @@ typedef struct dcb {
 | 
				
			|||||||
	struct session	*session;	/* The owning session */
 | 
						struct session	*session;	/* The owning session */
 | 
				
			||||||
	GWPROTOCOL	func;		/* The functions for this descrioptor */
 | 
						GWPROTOCOL	func;		/* The functions for this descrioptor */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* queue buffer for write
 | 
						SPINLOCK	writeqlock;	/* Write Queue spinlock */
 | 
				
			||||||
	is now a two buffer implementation
 | 
						GWBUF		*writeq;	/* Write Data Queue */
 | 
				
			||||||
	Only used in client write
 | 
					
 | 
				
			||||||
	*/
 | 
						DCBSTATS	stats;		/* DCB related statistics */
 | 
				
			||||||
	uint8_t buffer[MAX_BUFFER_SIZE];	/* network buffer */
 | 
					
 | 
				
			||||||
	int buff_bytes;				/* bytes in buffer */
 | 
						struct dcb	*next;		/* Next DCB in the chain of allocated DCB's */
 | 
				
			||||||
	uint8_t *buffer_ptr;			/* buffer pointer */
 | 
					 | 
				
			||||||
	uint8_t second_buffer[MAX_BUFFER_SIZE];	/* 2nd network buffer */
 | 
					 | 
				
			||||||
	int second_buff_bytes;			/* 2nd bytes in buffer */
 | 
					 | 
				
			||||||
	uint8_t *second_buffer_ptr;		/* 2nd buffer pointer */
 | 
					 | 
				
			||||||
} DCB;
 | 
					} DCB;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* DCB states */
 | 
					/* DCB states */
 | 
				
			||||||
@ -93,4 +95,10 @@ typedef struct dcb {
 | 
				
			|||||||
#define	DCB_SESSION(x)			(x)->session
 | 
					#define	DCB_SESSION(x)			(x)->session
 | 
				
			||||||
#define DCB_PROTOCOL(x, type)		(type *)((x)->protocol)
 | 
					#define DCB_PROTOCOL(x, type)		(type *)((x)->protocol)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					extern DCB		*alloc_dcb();			/* Allocate a DCB */
 | 
				
			||||||
 | 
					extern void		free_dcb(DCB *);		/* Free a DCB */
 | 
				
			||||||
 | 
					extern void		printAllDCBs();			/* Debug to print all DCB in the system */
 | 
				
			||||||
 | 
					extern void		printDCB(DCB *);		/* Debug print routine */
 | 
				
			||||||
 | 
					extern const char 	*gw_dcb_state2string(int);	/* DCB state to string */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user