Split the backend read function as an example, this still needs to be reorganised into protocols

Addition of DCB diagnostics, and free routine
This commit is contained in:
Mark Riddoch 2013-06-12 12:57:09 +01:00
parent 329a70eccd
commit c7f533abaf
4 changed files with 247 additions and 64 deletions

168
core/dcb.c Normal file
View File

@ -0,0 +1,168 @@
/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
/*
* dcb.c - Descriptor Control Block generic functions
*
* Revision History
*
* Date Who Description
* 12/06/13 Mark Riddoch Initial implementation
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <dcb.h>
#include <spinlock.h>
static DCB *allDCBs = NULL; /* Diagnotics need a list of DCBs */
static SPINLOCK *dcbspin = NULL;
/*
* Allocate a new DCB.
*
* This routine performs the generic initialisation on the DCB before returning
* the newly allocated DCB.
*
* @return A newly allocated DCB or NULL if non could be allocated.
*/
DCB *
alloc_dcb()
{
DCB *rval;
if (dcbspin == NULL)
{
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
return NULL;
spinlock_init(dcbspin);
}
if ((rval = malloc(sizeof(DCB))) == NULL)
{
return NULL;
}
spinlock_init(&rval->writeqlock);
rval->writeq = NULL;
rval->state = DCB_STATE_ALLOC;
spinlock_acquire(dcbspin);
if (allDCBs == NULL)
allDCBs = rval;
else
{
DCB *ptr = allDCBs;
while (ptr->next)
ptr = ptr->next;
ptr->next = rval;
}
spinlock_release(dcbspin);
return rval;
}
/*
* Free a DCB and remove it from the chain of all DCBs
*
* @param dcb THe DCB to free
*/
void
free_dcb(DCB *dcb)
{
dcb->state = DCB_STATE_FREED;
/* First remove this DCB from the chain */
spinlock_acquire(dcbspin);
if (allDCBs == dcb)
allDCBs = dcb->next;
else
{
DCB *ptr = allDCBs;
while (ptr && ptr->next != dcb)
ptr = ptr->next;
if (ptr)
ptr->next = dcb->next;
}
spinlock_release(dcbspin);
free(dcb);
}
/*
* Diagnostic to print a DCB
*
* @param dcb The DCB to print
*
*/
void
printDCB(DCB *dcb)
{
(void)printf("DCB: 0x%x\n", (void *)dcb);
(void)printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
(void)printf("\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
}
/*
* Diagnostic to print all DCB allocated in the system
*
*/
void printAllDCBs()
{
DCB *dcb;
if (dcbspin == NULL)
{
if ((dcbspin = malloc(sizeof(SPINLOCK))) == NULL)
return;
spinlock_init(dcbspin);
}
spinlock_acquire(dcbspin);
dcb = allDCBs;
while (dcb)
{
printDCB(dcb);
dcb = dcb->next;
}
spinlock_release(dcbspin);
}
/*
* Return a string representation of a DCB state.
*
* @param state The DCB state
* @return String representation of the state
*
*/
const char *
gw_dcb_state2string (int state) {
switch(state) {
case DCB_STATE_ALLOC:
return "DCB Allocated";
case DCB_STATE_IDLE:
return "DCB not yet in polling";
case DCB_STATE_POLLING:
return "DCB in the EPOLL";
case DCB_STATE_PROCESSING:
return "DCB processing event";
case DCB_STATE_LISTENING:
return "DCB for listening socket";
case DCB_STATE_DISCONNECTED:
return "DCB socket closed";
default:
return "DCB (unknown)";
}
}

View File

@ -257,7 +257,7 @@ int main(int argc, char **argv) {
if (events[n].events & EPOLLOUT) {
if (dcb->state != DCB_STATE_LISTENING) {
fprintf(stderr, "CALL the WRITE pointer\n");
(dcb->func).write(dcb, epollfd);
(dcb->func).write_ready(dcb, epollfd);
fprintf(stderr, ">>> CALLED the WRITE pointer\n");
}
}

View File

@ -95,65 +95,8 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
** This next bit really belongs in the write function for the client DCB
** It is here now just to illustrate the use of the buffers
*/
{
DCB *client = dcb->session->client;
int saved_errno = 0;
dcb->session->client->func.write(dcb->session->client, head);
spinlock_acquire(&client->writeqlock);
if (client->writeq)
{
/*
* We have some queued data, so add our data to
* the write queue and return.
* The assumption is that there will be an EPOLLOUT
* event to drain what is already queued. We are protected
* by the spinlock, which will also be acquired by the
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
client->writeq = gwbuf_append(client->writeq, head);
}
else
{
int len;
/*
* Loop over the buffer chain that has been passed to us
* from the reading side.
* Send as much of the data in that chain as possible and
* add any balance to the write queue.
*/
while (head != NULL)
{
len = GWBUF_LENGTH(head);
GW_NOINTR_CALL(w = write(client->fd, GWBUF_DATA(head), len); count_writes++);
saved_errno = errno;
if (w < 0)
{
break;
}
/*
* Pull the number of bytes we have written from
* queue with have.
*/
head = gwbuf_consume(head, w);
if (w < len)
{
/* We didn't write all the data */
}
}
/* Buffer the balance of any data */
client->writeq = head;
}
spinlock_release(&client->writeqlock);
if (head && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK))
{
/* We had a real write failure that we must deal with */
}
}
return 1;
}
#ifdef GW_DEBUG_READ_EVENT
@ -163,6 +106,75 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
return 1;
}
/*
* Write function for client DCB
*
* @param dcb The DCB of the client
* @param queue Queue of buffers to write
*/
int
MySQLWrite(DCB *dcb, GWBUF *queue)
{
int w, count_writes = 0, saved_errno = 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
/*
* We have some queued data, so add our data to
* the write queue and return.
* The assumption is that there will be an EPOLLOUT
* event to drain what is already queued. We are protected
* by the spinlock, which will also be acquired by the
* the routine that drains the queue data, so we should
* not have a race condition on the event.
*/
dcb->writeq = gwbuf_append(dcb->writeq, queue);
}
else
{
int len;
/*
* Loop over the buffer chain that has been passed to us
* from the reading side.
* Send as much of the data in that chain as possible and
* add any balance to the write queue.
*/
while (queue != NULL)
{
len = GWBUF_LENGTH(queue);
GW_NOINTR_CALL(w = write(dcb->fd, GWBUF_DATA(queue), len); count_writes++);
saved_errno = errno;
if (w < 0)
{
break;
}
/*
* Pull the number of bytes we have written from
* queue with have.
*/
queue = gwbuf_consume(queue, w);
if (w < len)
{
/* We didn't write all the data */
}
}
/* Buffer the balance of any data */
dcb->writeq = queue;
}
spinlock_release(&dcb->writeqlock);
if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK))
{
/* We had a real write failure that we must deal with */
return 0;
}
return 1;
}
//////////////////////////////////////////
//backend write event triggered by EPOLLOUT
//////////////////////////////////////////
@ -612,7 +624,7 @@ int MySQLAccept(DCB *listener, int efd) {
backend->state = DCB_STATE_POLLING;
backend->session = session;
(backend->func).read = gw_read_backend_event;
(backend->func).write = gw_write_backend_event;
(backend->func).write_ready = gw_write_backend_event;
(backend->func).error = handle_event_errors_backend;
// assume here one backend only.
@ -626,7 +638,8 @@ int MySQLAccept(DCB *listener, int efd) {
// assign function poiters to "func" field
(client->func).error = handle_event_errors;
(client->func).read = gw_route_read_event;
(client->func).write = gw_handle_write_event;
(client->func).write = MySQLWrite;
(client->func).write_ready = gw_handle_write_event;
// edge triggering flag added
ee.events = EPOLLIN | EPOLLOUT | EPOLLET;

View File

@ -51,8 +51,8 @@ typedef struct gw_protocol {
* close Gateway close entry point for the socket
*/
int (*read)(struct dcb *, int);
int (*write)(struct dcb *, int);
int (*write_ready)(struct dcb *);
int (*write)(struct dcb *, GWBUF *);
int (*write_ready)(struct dcb *, int);
int (*error)(struct dcb *, int);
int (*hangup)(struct dcb *, int);
int (*accept)(struct dcb *, int);
@ -84,13 +84,15 @@ typedef struct dcb {
#define DCB_STATE_PROCESSING 4 /* Processing an event */
#define DCB_STATE_LISTENING 5 /* The DCB is for a listening socket */
#define DCB_STATE_DISCONNECTED 6 /* The socket is now closed */
#define DCB_STATE_FREED 7 /* Memory freed */
#define DCB_STATE_FREED 7 /* Memory freed */
/* A few useful macros */
#define DCB_SESSION(x) (x)->session
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
extern DCB *alloc_dcb(); /* Allocate a DCB */
extern void free_dcb(DCB *); /* Free a DCB */
extern void printAllDCBs(); /* Debug to print all DCB in the system */
extern void printDCB(DCB *); /* Debug print routine */
extern const char *gw_dcb_state2string(int); /* DCB state to string */