-------
Removed DCB states DCB_STATE_IDLE, and DCB_STATE_PROCESSING.
Added DCB_STATE_UNDEFINED for initial content for state variable which doesn't have any specific value set, and DCB_STATE_NOPOLLING to indicate that dcb has been removed from poll set.

Added following dcb roles: DCB_ROLE_SERVICE_LISTENER for listeners of services, and DCB_ROLE_REQUEST_HANDLER for client/backend dcbs. Listeners may have state DCB_STATE_LISTENING, but not DCB_STATE_POLLING. Request handlers may have DCB_STATE_POLLING but not DCB_STATE_LISTENING. Role is passed as an argument to dcb.c:dcb_alloc.

From now on, struct check numbers of DCB are included and checked in DEBUG build only.

Added dcb_role_t dcb_role-member to DCB as well as SPINLOCK dcb_initlock, which protects state changes.

Removed extern keyword from function declarations because functions are by default externally visible if they are declared in header.

dcb.b
------
Function dcb_set_state, and dcb_set_state_nomutex provide functions for changing dcb states. Latter implements a state machine for dcb.
Function dcb_add_to_zombieslist replaces dcb_free. It adds in atomic step dcb to zombieslist and changes state to DCB_STATE_ZOMBIE.
Function dcb_final_free removes dcb from allDCBs list, terminates router and client sessions, and frees dcb and related memory.
Function dcb_process_zombies removes executing thread from dcb's bitmask, and it there are no further thread bits, moves dcb to a victim list, and finally, for each dcb on victim list, closes fd and sets state to DCB_STATE_DISCONNECTED.
Function dcb_close sets dcb state to DCB_STATE_NOPOLLIN, removes dcb from poll set and sets bit to bitmask for each server thread in an atomic step.  

poll.c
------
Function poll_add_dcb sets either DCB_STATE_LISTENING or DCB_STATE_POLLING state for newly created dcb, depending whether the role of dcb is DCB_ROLE_SERVICE_LISTENER, or DCB_ROLE_REQUEST_HANDLER, respectively. Then dcb is set to poll set.

poll_waitevents : commented out code which skipped event if dcb was added to zombieslist or if fd was closed. Added state checks.

service.c : Minor changes.
httpd.c : Removed dcb state changes. They are done in core.
mysql_backend.c : Added checks, removed dcb state changes.
mysql_client.c : Removed dcb state changes. Added checks.
mysql_common.c : Minor changes
telnetd.c : Removed state changes. Replaced some typecasts and pointer references with local variable reads.
skygw_debug.h : Removed two states, and added two to state printing macro.
This commit is contained in:
vraatikka
2013-09-05 22:00:02 +03:00
parent 17ec98fa3d
commit 66e9be814b
10 changed files with 576 additions and 384 deletions

View File

@ -68,6 +68,11 @@ static SPINLOCK dcbspin = SPINLOCK_INIT;
static SPINLOCK zombiespin = SPINLOCK_INIT; static SPINLOCK zombiespin = SPINLOCK_INIT;
static void dcb_final_free(DCB *dcb); static void dcb_final_free(DCB *dcb);
static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state);
/** /**
* Allocate a new DCB. * Allocate a new DCB.
* *
@ -76,8 +81,8 @@ static void dcb_final_free(DCB *dcb);
* *
* @return A newly allocated DCB or NULL if non could be allocated. * @return A newly allocated DCB or NULL if non could be allocated.
*/ */
DCB * DCB * dcb_alloc(
dcb_alloc() dcb_role_t role)
{ {
DCB *rval; DCB *rval;
@ -85,12 +90,17 @@ DCB *rval;
{ {
return NULL; return NULL;
} }
#if defined(SS_DEBUG)
rval->dcb_chk_top = CHK_NUM_DCB; rval->dcb_chk_top = CHK_NUM_DCB;
rval->dcb_chk_tail = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB;
#endif
rval->dcb_role = role;
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex"); simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex"); simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
rval->dcb_write_active = FALSE; rval->dcb_write_active = false;
rval->dcb_read_active = FALSE; rval->dcb_read_active = false;
spinlock_init(&rval->dcb_initlock);
spinlock_init(&rval->writeqlock); spinlock_init(&rval->writeqlock);
spinlock_init(&rval->delayqlock); spinlock_init(&rval->delayqlock);
spinlock_init(&rval->authlock); spinlock_init(&rval->authlock);
@ -121,59 +131,79 @@ DCB *rval;
return rval; return rval;
} }
/** /**
* Free a DCB, this only marks the DCB as a zombie and adds it * @node DCB is added to the end of zombies list.
* to the zombie list. The real working of removing it occurs once *
* all the threads signal they no longer have access to the DCB * Parameters:
* @param dcb - <usage>
* <description>
*
* @return
*
*
* @details Adding to list occurs once per DCB. This is ensured by changing the
* state of DCB to DCB_STATE_ZOMBIE after addition. Prior insertion, DCB state
* is checked and operation proceeds only if state differs from DCB_STATE_ZOMBIE.
* *
* @param dcb The DCB to free
*/ */
void void
dcb_free(DCB *dcb) dcb_add_to_zombieslist(DCB *dcb)
{ {
if (dcb->state == DCB_STATE_ZOMBIE) bool succp = false;
{ dcb_state_t prev_state = DCB_STATE_UNDEFINED;
skygw_log_write(LOGFILE_ERROR,
"Call to free a DCB that is already a zombie.\n"); CHK_DCB(dcb);
return; /**
} * Serialize zombies list access.
*/
/* Set the bitmask of running pollng threads */
bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
/* Add the DCB to the Zombie list */
spinlock_acquire(&zombiespin); spinlock_acquire(&zombiespin);
if (zombies == NULL)
if (dcb->state == DCB_STATE_ZOMBIE)
{
ss_dassert(zombies != NULL);
return;
}
if (zombies == NULL) {
zombies = dcb; zombies = dcb;
else } else {
{
DCB *ptr = zombies; DCB *ptr = zombies;
while (ptr->memdata.next) while (ptr->memdata.next)
{ {
ss_info_dassert(
ptr->memdata.next->state == DCB_STATE_ZOMBIE,
"Next zombie is not in DCB_STATE_ZOMBIE state");
ss_info_dassert(
ptr != dcb,
"Attempt to add DCB to zombies list although it "
"is already there.");
if (ptr == dcb) if (ptr == dcb)
{ {
skygw_log_write( skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"Attempt to add DCB to zombie queue " "Attempt to add DCB to zombies list "
"when it is already in the queue"); "when it is already in the list");
break; break;
} }
ptr = ptr->memdata.next; ptr = ptr->memdata.next;
} }
if (ptr != dcb) if (ptr != dcb) {
ptr->memdata.next = dcb; ptr->memdata.next = dcb;
}
} }
/**
* Set state which indicates that it has been added to zombies
* list.
*/
succp = dcb_set_state(dcb, DCB_STATE_ZOMBIE, &prev_state);
ss_info_dassert(succp, "Failed to set DCB_STATE_ZOMBIE");
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [dcb_free] Set dcb %p for fd %d DCB_STATE_ZOMBIE",
pthread_self(),
(unsigned long)dcb,
dcb->fd);
dcb->state = DCB_STATE_ZOMBIE;
} }
/** /**
* Free a DCB and remove it from the chain of all DCBs * Free a DCB and remove it from the chain of all DCBs
* *
@ -185,7 +215,11 @@ dcb_free(DCB *dcb)
static void static void
dcb_final_free(DCB *dcb) dcb_final_free(DCB *dcb)
{ {
dcb->state = DCB_STATE_FREED; SERVICE *service;
CHK_DCB(dcb);
ss_info_dassert(dcb->state == DCB_STATE_DISCONNECTED,
"dcb not in DCB_STATE_DISCONNECTED state.");
/* First remove this DCB from the chain */ /* First remove this DCB from the chain */
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
@ -200,7 +234,7 @@ dcb_final_free(DCB *dcb)
else else
{ {
/* /*
* We find the DCB that pont to the one we are removing and then * We find the DCB that point to the one we are removing and then
* set the next pointer of that DCB to the next pointer of the * set the next pointer of that DCB to the next pointer of the
* DCB we are removing. * DCB we are removing.
*/ */
@ -212,10 +246,41 @@ dcb_final_free(DCB *dcb)
} }
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
if (dcb->session) { /**
SESSION *local_session = dcb->session; * Terminate router session.
if (dcb_isclient(dcb)) */
dcb->session->client = NULL; service = dcb->session->service;
if (service != NULL &&
service->router != NULL &&
dcb->session->router_session != NULL)
{
void* rsession = NULL;
/**
* Protect call of closeSession.
*/
spinlock_acquire(&dcb->session->ses_lock);
rsession = dcb->session->router_session;
dcb->session->router_session = NULL;
spinlock_release(&dcb->session->ses_lock);
if (rsession != NULL) {
service->router->closeSession(
service->router_instance,
rsession);
} else {
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [dcb_final_free] rsession was NULL in "
"dcb_close.",
pthread_self());
}
}
/**
* Terminate client session.
*/
if (dcb->session) {
SESSION *local_session = dcb->session;
dcb->session = NULL; dcb->session = NULL;
session_free(local_session); session_free(local_session);
skygw_log_write_flush( skygw_log_write_flush(
@ -251,6 +316,9 @@ void
dcb_process_zombies(int threadid) dcb_process_zombies(int threadid)
{ {
DCB *ptr, *lptr; DCB *ptr, *lptr;
DCB* dcb_list = NULL;
DCB* dcb = NULL;
bool succp = false;
spinlock_acquire(&zombiespin); spinlock_acquire(&zombiespin);
ptr = zombies; ptr = zombies;
@ -278,13 +346,25 @@ DCB *ptr, *lptr;
lptr->memdata.next = tptr; lptr->memdata.next = tptr;
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [dcb_process_zombies] Free dcb %p in state " "%lu [dcb_process_zombies] Remove dcb %p fd %d "
"%s for fd %d", "in state %s from zombies list.",
pthread_self(), pthread_self(),
ptr, ptr,
STRDCBSTATE(ptr->state), ptr->fd,
ptr->fd); STRDCBSTATE(ptr->state));
dcb_final_free(ptr); ss_info_dassert(ptr->state == DCB_STATE_ZOMBIE,
"dcb not in DCB_STATE_ZOMBIE state.");
/**
* Move dcb to linked list of victim dcbs.
*/
if (dcb_list == NULL) {
dcb_list = ptr;
dcb = dcb_list;
} else {
dcb->memdata.next = ptr;
dcb = dcb->memdata.next;
}
dcb->memdata.next = NULL;
ptr = tptr; ptr = tptr;
} }
else else
@ -294,6 +374,20 @@ DCB *ptr, *lptr;
} }
} }
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
dcb = dcb_list;
while (dcb != NULL) {
/**
* Close file descriptor and move to clean-up phase.
*/
close(dcb->fd);
ss_debug(dcb->fd = 0;)
succp = dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
ss_dassert(succp);
dcb_final_free(dcb);
dcb = dcb->memdata.next;
}
} }
/** /**
@ -313,36 +407,43 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
{ {
DCB *dcb; DCB *dcb;
GWPROTOCOL *funcs; GWPROTOCOL *funcs;
int val;
if ((dcb = dcb_alloc()) == NULL) if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL)
{ {
return NULL; return NULL;
} }
if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL) if ((funcs = (GWPROTOCOL *)load_module(protocol, MODULE_PROTOCOL)) == NULL)
{ {
dcb_final_free(dcb); dcb_final_free(dcb);
skygw_log_write( LOGFILE_ERROR, skygw_log_write(
"Failed to load protocol module for %s, free dcb %p\n", protocol, dcb); LOGFILE_ERROR,
"Failed to load protocol module for %s, free dcb %p\n",
protocol, dcb);
return NULL; return NULL;
} }
memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL)); memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL));
if (!session_link_dcb(session, dcb)) if (!session_link_dcb(session, dcb))
{ {
skygw_log_write(LOGFILE_TRACE, skygw_log_write(
"dcb_connect: failed to link to session, the session has been removed."); LOGFILE_TRACE,
"dcb_connect: failed to link to session, the session "
"has been removed.");
dcb_final_free(dcb); dcb_final_free(dcb);
return NULL; return NULL;
} }
if ((dcb->fd = dcb->func.connect(dcb, server, session)) == -1) if ((dcb->fd = dcb->func.connect(dcb, server, session)) == -1)
{ {
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb); dcb_final_free(dcb);
skygw_log_write_flush(LOGFILE_ERROR, skygw_log_write_flush(
"Failed to connect to server %s:%d, free dcb %p\n", LOGFILE_ERROR,
server->name, "Failed to connect to server %s:%d, free dcb %p\n",
server->port, server->name,
dcb); server->port,
dcb);
return NULL; return NULL;
} }
@ -585,8 +686,8 @@ int saved_errno = 0;
{ {
skygw_log_write( skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [dcb_drain_writeq] Write to fd %d failed, " "%lu [dcb_drain_writeq] Write to fd %d "
"errno %d", "failed due errno %d",
pthread_self(), pthread_self(),
dcb->fd, dcb->fd,
saved_errno); saved_errno);
@ -624,56 +725,43 @@ int saved_errno = 0;
void void
dcb_close(DCB *dcb) dcb_close(DCB *dcb)
{ {
/** protect state check and set */ dcb_state_t prev_state;
spinlock_acquire(&dcb->writeqlock); bool succp;
if (dcb->state == DCB_STATE_DISCONNECTED ||
dcb->state == DCB_STATE_FREED ||
dcb->state == DCB_STATE_ZOMBIE)
{
spinlock_release(&dcb->writeqlock);
return;
}
poll_remove_dcb(dcb);
close(dcb->fd);
dcb->state = DCB_STATE_DISCONNECTED;
spinlock_release(&dcb->writeqlock);
if (dcb_isclient(dcb)) CHK_DCB(dcb);
{
/*
* If the DCB we are closing is a client side DCB then shutdown
* the router session. This will close any backend connections.
*/
SERVICE *service = dcb->session->service;
if (service != NULL && /**
service->router != NULL && * Only the first call to dcb_close removes dcb from poll set.
dcb->session->router_session != NULL) */
{ spinlock_acquire(&dcb->dcb_initlock);
void* rsession = NULL; succp = dcb_set_state_nomutex(dcb, DCB_STATE_NOPOLLING, &prev_state);
/**
* Protect call of closeSession.
*/
spinlock_acquire(&dcb->session->ses_lock);
rsession = dcb->session->router_session;
dcb->session->router_session = NULL;
spinlock_release(&dcb->session->ses_lock);
if (rsession != NULL) { if (succp) {
service->router->closeSession( poll_remove_dcb(dcb);
service->router_instance, /* Set the bitmask of running polling threads */
rsession); bitmask_copy(&dcb->memdata.bitmask, poll_bitmask());
} else { } else {
skygw_log_write_flush( ss_info_dassert(!dcb_isclient(dcb) ||
LOGFILE_TRACE, prev_state == DCB_STATE_NOPOLLING ||
"%lu [dcb_close] rsession was NULL in " prev_state == DCB_STATE_ZOMBIE,
"dcb_close.", "Invalid state transition.");
pthread_self()); }
}
} spinlock_release(&dcb->dcb_initlock);
}
dcb_free(dcb); if (succp) {
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_close] Removed dcb %p in state %s from "
"poll set.",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state));
}
if (dcb->state == DCB_STATE_NOPOLLING) {
dcb_add_to_zombieslist(dcb);
}
} }
/** /**
@ -779,12 +867,8 @@ gw_dcb_state2string (int state) {
switch(state) { switch(state) {
case DCB_STATE_ALLOC: case DCB_STATE_ALLOC:
return "DCB Allocated"; return "DCB Allocated";
case DCB_STATE_IDLE:
return "DCB not yet in polling";
case DCB_STATE_POLLING: case DCB_STATE_POLLING:
return "DCB in the polling loop"; return "DCB in the polling loop";
case DCB_STATE_PROCESSING:
return "DCB processing event";
case DCB_STATE_LISTENING: case DCB_STATE_LISTENING:
return "DCB for listening socket"; return "DCB for listening socket";
case DCB_STATE_DISCONNECTED: case DCB_STATE_DISCONNECTED:
@ -869,3 +953,149 @@ void dcb_hashtable_stats(
dcb_printf(dcb, "\tLongest chain length: %d\n", longest); dcb_printf(dcb, "\tLongest chain length: %d\n", longest);
} }
bool dcb_set_state(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state)
{
bool succp;
dcb_state_t state;
CHK_DCB(dcb);
spinlock_acquire(&dcb->dcb_initlock);
succp = dcb_set_state_nomutex(dcb, new_state, &state);
ss_info_dassert(succp, "Failed to set new state for dcb");
spinlock_release(&dcb->dcb_initlock);
if (old_state != NULL) {
*old_state = state;
}
return succp;
}
static bool dcb_set_state_nomutex(
DCB* dcb,
const dcb_state_t new_state,
dcb_state_t* old_state)
{
bool succp;
dcb_state_t state = DCB_STATE_UNDEFINED;
CHK_DCB(dcb);
state = dcb->state;
if (old_state != NULL) {
*old_state = state;
}
switch (state) {
case DCB_STATE_UNDEFINED:
dcb->state = new_state;
succp = true;
break;
case DCB_STATE_ALLOC:
switch (new_state) {
case DCB_STATE_POLLING: /**< for client requests */
case DCB_STATE_LISTENING: /**< for connect listeners */
case DCB_STATE_DISCONNECTED: /**< for failed connections */
dcb->state = new_state;
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_POLLING:
switch(new_state) {
case DCB_STATE_NOPOLLING:
case DCB_STATE_LISTENING:
dcb->state = new_state;
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_LISTENING:
switch(new_state) {
case DCB_STATE_POLLING:
dcb->state = new_state;
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_NOPOLLING:
switch (new_state) {
case DCB_STATE_ZOMBIE:
dcb->state = new_state;
case DCB_STATE_POLLING: /**< ok to try but state can't change */
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_ZOMBIE:
switch (new_state) {
case DCB_STATE_DISCONNECTED:
dcb->state = new_state;
case DCB_STATE_POLLING: /**< ok to try but state can't change */
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_DISCONNECTED:
switch (new_state) {
case DCB_STATE_FREED:
dcb->state = new_state;
succp = true;
break;
default:
ss_dassert(old_state != NULL);
break;
}
break;
case DCB_STATE_FREED:
ss_dassert(old_state != NULL);
break;
default:
skygw_log_write(
LOGFILE_ERROR,
"%lu [dcb_set_state_nomutex] Unknown dcb state %d",
pthread_self(),
dcb->state);
ss_dassert(false);
break;
} /* switch (dcb->state) */
if (succp) {
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s",
pthread_self(),
dcb,
dcb->fd,
STRDCBSTATE(state),
STRDCBSTATE(dcb->state));
}
return succp;
}

View File

@ -90,12 +90,28 @@ poll_init()
int int
poll_add_dcb(DCB *dcb) poll_add_dcb(DCB *dcb)
{ {
int rc;
dcb_state_t old_state = DCB_STATE_UNDEFINED;
struct epoll_event ev; struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
ev.data.ptr = dcb; ev.data.ptr = dcb;
return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev); /**
* Service listeners have different state than
* DCBs serving client requests.
*/
if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) {
dcb_set_state(dcb, DCB_STATE_LISTENING, &old_state);
} else if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) {
dcb_set_state(dcb, DCB_STATE_POLLING, &old_state);
}
rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);
if (rc != 0) {
dcb_set_state(dcb, old_state, NULL);
}
return rc;
} }
@ -203,15 +219,19 @@ poll_waitevents(void *arg)
__uint32_t ev = events[i].events; __uint32_t ev = events[i].events;
CHK_DCB(dcb); CHK_DCB(dcb);
ss_dassert(dcb->state != DCB_STATE_IDLE &&
dcb->state != DCB_STATE_ALLOC); ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [poll_waitevents] event %d", "%lu [poll_waitevents] event %d",
pthread_self(), pthread_self(),
ev); ev);
#if 0
if (DCB_ISZOMBIE(dcb)) if (DCB_ISZOMBIE(dcb))
{ {
skygw_log_write( skygw_log_write(
@ -233,7 +253,7 @@ poll_waitevents(void *arg)
STRDCBSTATE(dcb->state)); STRDCBSTATE(dcb->state));
continue; continue;
} }
#endif
if (ev & EPOLLERR) if (ev & EPOLLERR)
{ {
@ -255,23 +275,21 @@ poll_waitevents(void *arg)
} }
if (ev & EPOLLOUT) if (ev & EPOLLOUT)
{ {
simple_mutex_lock(&dcb->dcb_write_lock, true); simple_mutex_lock(&dcb->dcb_write_lock,
true);
ss_info_dassert(!dcb->dcb_write_active, ss_info_dassert(!dcb->dcb_write_active,
"Write already active"); "Write already active");
dcb->dcb_write_active = TRUE; dcb->dcb_write_active = TRUE;
skygw_log_write(
skygw_log_write(LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [poll_waitevents] " "%lu [poll_waitevents] "
"Write in fd %d", "Write in fd %d",
pthread_self(), pthread_self(),
dcb->fd); dcb->fd);
atomic_add(&pollStats.n_write, 1); atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb); dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE; dcb->dcb_write_active = FALSE;
simple_mutex_unlock( simple_mutex_unlock(&dcb->dcb_write_lock);
&dcb->dcb_write_lock);
} }
if (ev & EPOLLIN) if (ev & EPOLLIN)
{ {

View File

@ -101,18 +101,23 @@ serviceStartPort(SERVICE *service, SERV_PROTOCOL *port)
int listeners = 0; int listeners = 0;
char config_bind[40]; char config_bind[40];
GWPROTOCOL *funcs; GWPROTOCOL *funcs;
int rc;
if ((port->listener = dcb_alloc()) == NULL) port->listener = dcb_alloc(DCB_ROLE_SERVICE_LISTENER);
if (port->listener == NULL)
{ {
return 0; return 0;
} }
if (strcmp(port->protocol, "MySQLClient") == 0) { if (strcmp(port->protocol, "MySQLClient") == 0) {
int loaded = -1; int loaded = -1;
loaded = load_mysql_users(service); loaded = load_mysql_users(service);
skygw_log_write( LOGFILE_MESSAGE, "MySQL Users loaded: %i\n", loaded); skygw_log_write(
LOGFILE_MESSAGE,
"MySQL Users loaded: %i\n",
loaded);
} }
if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL) if ((funcs = (GWPROTOCOL *)load_module(port->protocol, MODULE_PROTOCOL)) == NULL)

View File

@ -124,16 +124,20 @@ typedef struct {
/* DCB states */ /* DCB states */
typedef enum { typedef enum {
DCB_STATE_UNDEFINED, /**< State variable with no state */
DCB_STATE_ALLOC, /**< Memory allocated but not populated */ DCB_STATE_ALLOC, /**< Memory allocated but not populated */
DCB_STATE_IDLE, /**< Not yet in the poll mask */
DCB_STATE_POLLING, /**< Waiting in the poll loop */ DCB_STATE_POLLING, /**< Waiting in the poll loop */
DCB_STATE_PROCESSING, /**< Processing an event */
DCB_STATE_LISTENING, /**< The DCB is for a listening socket */ DCB_STATE_LISTENING, /**< The DCB is for a listening socket */
DCB_STATE_DISCONNECTED, /**< The socket is now closed */ DCB_STATE_DISCONNECTED, /**< The socket is now closed */
DCB_STATE_FREED, /**< Memory freed */ DCB_STATE_FREED, /**< Memory freed */
DCB_STATE_NOPOLLING, /**< Removed from poll mask */
DCB_STATE_ZOMBIE /**< DCB is no longer active, waiting to free it */ DCB_STATE_ZOMBIE /**< DCB is no longer active, waiting to free it */
} dcb_state_t; } dcb_state_t;
typedef enum {
DCB_ROLE_SERVICE_LISTENER,
DCB_ROLE_REQUEST_HANDLER
} dcb_role_t;
/** /**
* Descriptor Control Block * Descriptor Control Block
@ -147,7 +151,11 @@ typedef enum {
* gateway may be selected to execute the required actions when a network event occurs. * gateway may be selected to execute the required actions when a network event occurs.
*/ */
typedef struct dcb { typedef struct dcb {
#if defined(SS_DEBUG)
skygw_chk_t dcb_chk_top; skygw_chk_t dcb_chk_top;
#endif
dcb_role_t dcb_role;
SPINLOCK dcb_initlock;
simple_mutex_t dcb_read_lock; simple_mutex_t dcb_read_lock;
simple_mutex_t dcb_write_lock; simple_mutex_t dcb_write_lock;
int fd; /**< The descriptor */ int fd; /**< The descriptor */
@ -172,7 +180,9 @@ typedef struct dcb {
void *data; /**< Specific client data */ void *data; /**< Specific client data */
DCBMM memdata; /**< The data related to DCB memory management */ DCBMM memdata; /**< The data related to DCB memory management */
int command; /**< Specific client command type */ int command; /**< Specific client command type */
#if defined(SS_DEBUG)
skygw_chk_t dcb_chk_tail; skygw_chk_t dcb_chk_tail;
#endif
} DCB; } DCB;
@ -181,21 +191,26 @@ typedef struct dcb {
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol) #define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) #define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
extern DCB *dcb_alloc(); /* Allocate a DCB */ DCB *dcb_alloc(dcb_role_t);
extern void dcb_free(DCB *); /* Free a DCB */ void dcb_free(DCB *); /* Free a DCB */
extern DCB *dcb_connect(struct server *, struct session *, const char *); /* prepare Backend connection */ DCB *dcb_connect(struct server *, struct session *, const char *); /* prepare Backend connection */
extern int dcb_read(DCB *, GWBUF **); /* Generic read routine */ int dcb_read(DCB *, GWBUF **); /* Generic read routine */
extern int dcb_write(DCB *, GWBUF *); /* Generic write routine */ int dcb_write(DCB *, GWBUF *); /* Generic write routine */
extern int dcb_drain_writeq(DCB *); /* Generic write routine */ int dcb_drain_writeq(DCB *); /* Generic write routine */
extern void dcb_close(DCB *); /* Generic close functionality */ void dcb_close(DCB *); /* Generic close functionality */
extern void dcb_process_zombies(int); /* Process Zombies */ void dcb_process_zombies(int); /* Process Zombies */
extern void printAllDCBs(); /* Debug to print all DCB in the system */ void printAllDCBs(); /* Debug to print all DCB in the system */
extern void printDCB(DCB *); /* Debug print routine */ void printDCB(DCB *); /* Debug print routine */
extern void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */ void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
extern void dprintDCB(DCB *, DCB *); /* Debug to print a DCB in the system */ void dprintDCB(DCB *, DCB *); /* Debug to print a DCB in the system */
extern const char *gw_dcb_state2string(int); /* DCB state to string */ const char *gw_dcb_state2string(int); /* DCB state to string */
extern void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */
extern int dcb_isclient(DCB *); /* the DCB is the client of the session */ int dcb_isclient(DCB *); /* the DCB is the client of the session */
extern void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */
void dcb_add_to_zombieslist(DCB* dcb);
#endif bool dcb_set_state(
DCB* dcb,
dcb_state_t new_state,
dcb_state_t* old_state);
#endif /* _DCB_H */

View File

@ -130,8 +130,6 @@ size_t i, j;
int headers_read = 0; int headers_read = 0;
HTTPD_session *client_data = NULL; HTTPD_session *client_data = NULL;
dcb->state = DCB_STATE_PROCESSING;
client_data = dcb->data; client_data = dcb->data;
/** /**
@ -320,14 +318,13 @@ int n_connect = 0;
else else
{ {
atomic_add(&dcb->stats.n_accepts, 1); atomic_add(&dcb->stats.n_accepts, 1);
client = dcb_alloc(); client = dcb_alloc(DCB_ROLE_SERVICE_LISTENER);
client->fd = so; client->fd = so;
client->remote = strdup(inet_ntoa(addr.sin_addr)); client->remote = strdup(inet_ntoa(addr.sin_addr));
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL));
client->session = session_alloc(dcb->session->service, client); client->session = session_alloc(dcb->session->service, client);
ss_dassert(client->session->state != SESSION_STATE_ALLOC); ss_dassert(
client->state = DCB_STATE_IDLE; client->session->state != SESSION_STATE_ALLOC);
/* create the session data for HTTPD */ /* create the session data for HTTPD */
client_data = (HTTPD_session *)calloc(1, sizeof(HTTPD_session)); client_data = (HTTPD_session *)calloc(1, sizeof(HTTPD_session));
client->data = client_data; client->data = client_data;
@ -337,8 +334,6 @@ int n_connect = 0;
return n_connect; return n_connect;
} }
n_connect++; n_connect++;
client->state = DCB_STATE_POLLING;
} }
} }
return n_connect; return n_connect;
@ -391,7 +386,11 @@ short pnum;
} }
/* socket options */ /* socket options */
setsockopt(listener->fd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); setsockopt(listener->fd,
SOL_SOCKET,
SO_REUSEADDR,
(char *)&one,
sizeof(one));
/* set NONBLOCKING mode */ /* set NONBLOCKING mode */
setnonblocking(listener->fd); setnonblocking(listener->fd);
@ -401,8 +400,6 @@ short pnum;
{ {
return 0; return 0;
} }
listener->state = DCB_STATE_LISTENING;
listen(listener->fd, SOMAXCONN); listen(listener->fd, SOMAXCONN);
if (poll_add_dcb(listener) == -1) if (poll_add_dcb(listener) == -1)
@ -461,4 +458,3 @@ static void httpd_send_headers(DCB *dcb, int final)
dcb_printf(dcb, "\r\n"); dcb_printf(dcb, "\r\n");
} }
} }
//

View File

@ -146,37 +146,21 @@ static int gw_read_backend_event(DCB *dcb) {
MYSQL_session *current_session = NULL; MYSQL_session *current_session = NULL;
int rc = 0; int rc = 0;
dcb->state = DCB_STATE_PROCESSING; CHK_DCB(dcb);
CHK_SESSION(dcb->session);
ss_info_dassert(dcb->session != NULL,
"Backend dcb doesn't have session");
ss_info_dassert(dcb->session->client != NULL,
"Session's client dcb pointer is NULL");
if(dcb->session) { client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol);
CHK_SESSION(dcb->session); backend_protocol = (MySQLProtocol *) dcb->protocol;
if (dcb->session->client == NULL) {
dcb->state = DCB_STATE_DISCONNECTED;
skygw_log_write(
LOGFILE_ERROR,
"%lu [gw_read_backend_event] client dcb is NULL for backend dcb %d.",
pthread_self(),
dcb->fd);
dcb->state = DCB_STATE_DISCONNECTED;
return 1;
}
client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol);
} else {
skygw_log_write(
LOGFILE_ERROR,
"%lu [gw_read_backend_event] dcb->session is NULL for backend dcb %d.",
pthread_self(),
dcb->fd);
dcb->state = DCB_STATE_DISCONNECTED;
return 1;
}
backend_protocol = (MySQLProtocol *) dcb->protocol;
/** return only with complete session */ /** return only with complete session */
current_session = gw_get_shared_session_auth_info(dcb); current_session = gw_get_shared_session_auth_info(dcb);
ss_dassert(current_session != NULL); ss_dassert(current_session != NULL);
ss_dassert(dcb->session->state != SESSION_STATE_ALLOC);
/* fprintf(stderr, ">>> backend EPOLLIN from %i, command %i,protocol /* fprintf(stderr, ">>> backend EPOLLIN from %i, command %i,protocol
* state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string * state [%s]\n", dcb->fd, dcb->command, gw_mysql_protocol_state2string
* (backend_protocol->state)); * (backend_protocol->state));
@ -202,17 +186,18 @@ static int gw_read_backend_event(DCB *dcb) {
/* ready to check the authentication reply from backend */ /* ready to check the authentication reply from backend */
if (backend_protocol->state == MYSQL_AUTH_RECV) { if (backend_protocol->state == MYSQL_AUTH_RECV) {
ROUTER_OBJECT *router = NULL; ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL; ROUTER *router_instance = NULL;
void *rsession = NULL; void *rsession = NULL;
int rv = -1; int rv = -1;
SESSION *session = dcb->session; SESSION *session = dcb->session;
if (session) { CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance; router = session->service->router;
rsession = session->router_session; router_instance = session->service->router_instance;
} rsession = session->router_session;
/* read backed auth reply */ /* read backed auth reply */
rv = gw_receive_backend_auth(backend_protocol); rv = gw_receive_backend_auth(backend_protocol);
@ -222,12 +207,17 @@ static int gw_read_backend_event(DCB *dcb) {
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [gw_read_backend_event] caught " "%lu [gw_read_backend_event] caught "
"MYSQL_FAILED_AUTHENTICATION from " "MYSQL_FAILED_AUTHENTICATION from "
"gw_receive_backend_auth. Fd %d, user %s.", "gw_receive_backend_auth. Fd %d, "
"user %s.",
pthread_self(), pthread_self(),
dcb->fd, dcb->fd,
current_session->user); current_session->user);
backend_protocol->state = MYSQL_AUTH_FAILED; backend_protocol->state = MYSQL_AUTH_FAILED;
#if 0
ss_dassert(backend_protocol->state !=
MYSQL_AUTH_FAILED);
#endif
/* send an error to the client */ /* send an error to the client */
mysql_send_custom_error( mysql_send_custom_error(
dcb->session->client, dcb->session->client,
@ -245,17 +235,19 @@ static int gw_read_backend_event(DCB *dcb) {
if (rsession != NULL) { if (rsession != NULL) {
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_read_backend_event] Call " "%lu [gw_read_backend_event] "
"closeSession for backend session.", "Call closeSession for backend "
"session.",
pthread_self()); pthread_self());
/* close the active session */ /* close the active session */
router->closeSession(router_instance, rsession); router->closeSession(router_instance,
rsession);
} else { } else {
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_read_backend_event] " "%lu [gw_read_backend_event] "
"closeSession already called for " "closeSession already called "
"backend session.", "for backend session.",
pthread_self()); pthread_self());
} }
rc = 1; rc = 1;
@ -266,7 +258,8 @@ static int gw_read_backend_event(DCB *dcb) {
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_read_backend_event] caught " "%lu [gw_read_backend_event] caught "
"MYSQL_SUCCESFUL_AUTHENTICATION from " "MYSQL_SUCCESFUL_AUTHENTICATION from "
"gw_receive_backend_auth. Fd %d, user %s.", "gw_receive_backend_auth. Fd %d, "
"user %s.",
pthread_self(), pthread_self(),
dcb->fd, dcb->fd,
current_session->user); current_session->user);
@ -276,7 +269,6 @@ static int gw_read_backend_event(DCB *dcb) {
/* check the delay queue and flush the data */ /* check the delay queue and flush the data */
if(dcb->delayq) { if(dcb->delayq) {
backend_write_delayqueue(dcb); backend_write_delayqueue(dcb);
dcb->state = DCB_STATE_POLLING;
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
rc = 1; rc = 1;
goto return_rc; goto return_rc;
@ -304,6 +296,7 @@ static int gw_read_backend_event(DCB *dcb) {
void *rsession = NULL; void *rsession = NULL;
SESSION *session = dcb->session; SESSION *session = dcb->session;
CHK_SESSION(session);
/* read available backend data */ /* read available backend data */
rc = dcb_read(dcb, &head); rc = dcb_read(dcb, &head);
@ -312,26 +305,22 @@ static int gw_read_backend_event(DCB *dcb) {
goto return_rc; goto return_rc;
} }
if (session != NULL) { router = session->service->router;
router = session->service->router; router_instance = session->service->router_instance;
router_instance = session->service->router_instance; rsession = session->router_session;
rsession = session->router_session;
}
/* Note the gwbuf doesn't have here a valid queue->command /* Note the gwbuf doesn't have here a valid queue->command
* descriptions as it is a fresh new one! * descriptions as it is a fresh new one!
* We only have the copied value in dcb->command from previuos func.write() * We only have the copied value in dcb->command from
* and this will be used by the router->clientReply * previuos func.write() and this will be used by the
* router->clientReply
* and pass now the gwbuf to the router
*/ */
/* and pass now the gwbuf to the router */
router->clientReply(router_instance, rsession, head, dcb); router->clientReply(router_instance, rsession, head, dcb);
rc = 1; rc = 1;
goto return_rc; goto return_rc;
} }
rc = 0; rc = 0;
return_rc: return_rc:
dcb->state = DCB_STATE_POLLING;
return rc; return rc;
} }
@ -347,18 +336,16 @@ static int gw_write_backend_event(DCB *dcb) {
//fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state)); //fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state));
// spinlock_acquire(&dcb->connectlock); // spinlock_acquire(&dcb->connectlock);
dcb->state = DCB_STATE_PROCESSING; /**
* vraa: what is the logic in this?
*/
if (backend_protocol->state == MYSQL_PENDING_CONNECT) { if (backend_protocol->state == MYSQL_PENDING_CONNECT) {
backend_protocol->state = MYSQL_CONNECTED; backend_protocol->state = MYSQL_CONNECTED;
// spinlock_release(&dcb->connectlock); // spinlock_release(&dcb->connectlock);
dcb->state = DCB_STATE_POLLING;
return 1; return 1;
} }
// spinlock_release(&dcb->connectlock); // spinlock_release(&dcb->connectlock);
dcb_drain_writeq(dcb); dcb_drain_writeq(dcb);
dcb->state = DCB_STATE_POLLING;
return 1; return 1;
} }
@ -456,7 +443,8 @@ static int gw_create_backend_connection(
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [gw_create_backend_connection] Established " "%lu [gw_create_backend_connection] Established "
"connection to %s:%i, backend fd %d client fd %d.", "connection to %s:%i, backend fd %d client "
"fd %d.",
pthread_self(), pthread_self(),
server->name, server->name,
server->port, server->port,
@ -492,8 +480,6 @@ static int gw_create_backend_connection(
session->client->fd); session->client->fd);
break; break;
} /**< switch */ } /**< switch */
backend_dcb->state = DCB_STATE_POLLING;
return_fd: return_fd:
ss_dassert(backend_dcb->fd == fd); ss_dassert(backend_dcb->fd == fd);
ss_dassert(backend_dcb->fd == protocol->fd); ss_dassert(backend_dcb->fd == protocol->fd);
@ -547,7 +533,6 @@ static void backend_set_delayqueue(DCB *dcb, GWBUF *queue) {
dcb->delayq = queue; dcb->delayq = queue;
} }
} }
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
} }
@ -681,4 +666,3 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 0; return 0;
} }
/////

View File

@ -510,22 +510,9 @@ int gw_read_client_event(DCB* dcb) {
MySQLProtocol *protocol = NULL; MySQLProtocol *protocol = NULL;
int b = -1; int b = -1;
int rc = 0; int rc = 0;
#if 0
dcb->state = dcb_begin_action(dcb, DCB_ACTION_READ);
#else
CHK_DCB(dcb); CHK_DCB(dcb);
if (dcb->state == DCB_STATE_DISCONNECTED ||
dcb->state == DCB_STATE_FREED ||
dcb->state == DCB_STATE_ZOMBIE ||
dcb->state == DCB_STATE_PROCESSING)
{
rc = 1;
goto return_rc;
}
ss_dassert(dcb->state == DCB_STATE_POLLING);
dcb->state = DCB_STATE_PROCESSING;
#endif
protocol = DCB_PROTOCOL(dcb, MySQLProtocol); protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
CHK_PROTOCOL(protocol); CHK_PROTOCOL(protocol);
/** /**
@ -647,9 +634,9 @@ int gw_read_client_event(DCB* dcb) {
if (ptr_buff) { if (ptr_buff) {
mysql_command = ptr_buff[4]; mysql_command = ptr_buff[4];
} }
if (mysql_command == '\x03') { if (mysql_command == '\x03') {
/// this is a standard MySQL query !!!! /* this is a standard MySQL query !!!! */
} }
/** /**
* Routing Client input to Backend * Routing Client input to Backend
@ -657,24 +644,20 @@ int gw_read_client_event(DCB* dcb) {
/* Do not route the query without session! */ /* Do not route the query without session! */
if(rsession == NULL) { if(rsession == NULL) {
if (mysql_command == '\x01') { if (mysql_command == '\x01') {
/* COM_QUIT handling */ /**
/* fprintf(stderr, "COM_QUIT received with * COM_QUIT handling
* no connected backends from %i\n", dcb->fd); */ *
* fprintf(stderr, "COM_QUIT received with
* no connected backends from %i\n", dcb->fd);
*/
(dcb->func).close(dcb); (dcb->func).close(dcb);
} else { } else {
/* Send a custom error as MySQL command reply */ /* Send a custom error as MySQL command reply */
if (dcb) { mysql_send_custom_error(
mysql_send_custom_error( dcb,
dcb, 1,
1, 0,
0, "Connection to backend lost");
"Connection to backend lost");
} else {
skygw_log_write(
LOGFILE_ERROR,
"%lu [mysql_send_custom_error] client dcb is NULL.",
pthread_self());
}
protocol->state = MYSQL_IDLE; protocol->state = MYSQL_IDLE;
} }
rc = 1; rc = 1;
@ -683,19 +666,19 @@ int gw_read_client_event(DCB* dcb) {
/* We can route the query */ /* We can route the query */
/* COM_QUIT handling */ /* COM_QUIT handling */
if (mysql_command == '\x01') { if (mysql_command == '\x01') {
/* fprintf(stderr, "COM_QUIT received from %i and /**
* passed to backed\n", dcb->fd); */ * fprintf(stderr, "COM_QUIT received from %i and
/* this will propagate COM_QUIT to backend(s) */ * passed to backed\n", dcb->fd);
//fprintf(stderr, "<<< Routing the COM_QUIT ...\n"); * this will propagate COM_QUIT to backend(s)
router->routeQuery(router_instance, * fprintf(stderr, "<<< Routing the COM_QUIT ...\n");
rsession, */
queue); router->routeQuery(router_instance, rsession, queue);
/* close client connection */ /* close client connection */
(dcb->func).close(dcb); (dcb->func).close(dcb);
rc = 1; rc = 1;
return rc; goto return_rc;
} }
/* MySQL Command Routing */ /* MySQL Command Routing */
protocol->state = MYSQL_ROUTING; protocol->state = MYSQL_ROUTING;
@ -715,65 +698,66 @@ int gw_read_client_event(DCB* dcb) {
rc = 0; rc = 0;
return_rc: return_rc:
dcb->state = DCB_STATE_POLLING;
return rc; return rc;
} }
/////////////////////////////////////////////// ///////////////////////////////////////////////
// client write event to Client triggered by EPOLLOUT // client write event to Client triggered by EPOLLOUT
////////////////////////////////////////////// //////////////////////////////////////////////
int gw_write_client_event(DCB *dcb) { int gw_write_client_event(DCB *dcb)
{
MySQLProtocol *protocol = NULL; MySQLProtocol *protocol = NULL;
CHK_DCB(dcb);
if (dcb == NULL) { if (dcb == NULL) {
fprintf(stderr, "DCB is NULL, return\n"); fprintf(stderr, "DCB is NULL, return\n");
return 1; return 1;
} }
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
if (dcb->state == DCB_STATE_DISCONNECTED) { if (dcb->state == DCB_STATE_DISCONNECTED) {
return 1; return 1;
} }
dcb->state = DCB_STATE_PROCESSING;
if (dcb->protocol) { if (dcb->protocol) {
protocol = DCB_PROTOCOL(dcb, MySQLProtocol); protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
} else { } else {
fprintf(stderr, "DCB protocol is NULL, return\n"); goto return_1;
dcb->state = DCB_STATE_POLLING;
return 1;
} }
if (protocol->state == MYSQL_IDLE || if (protocol->state == MYSQL_IDLE ||
protocol->state == MYSQL_WAITING_RESULT) protocol->state == MYSQL_WAITING_RESULT)
{ {
dcb_drain_writeq(dcb); dcb_drain_writeq(dcb);
dcb->state = DCB_STATE_POLLING; goto return_1;
return 1;
} }
dcb->state = DCB_STATE_POLLING;
return 1; return_1:
return 1;
} }
/// /**
// set listener for mysql protocol, retur 1 on success and 0 in failure * set listener for mysql protocol, retur 1 on success and 0 in failure
/// */
int gw_MySQLListener(DCB *listener, char *config_bind) { int gw_MySQLListener(
DCB *listen_dcb,
char *config_bind)
{
int l_so; int l_so;
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
char *bind_address_and_port = NULL; char *bind_address_and_port = NULL;
char *p; char *p;
char address[1024] = ""; char address[1024] = "";
int port=0; int port = 0;
int one = 1; int one = 1;
// this gateway, as default, will bind on port 4404 for localhost only /* this gateway, as default, will bind on port 4404 for localhost only */
if (config_bind != NULL) { if (config_bind != NULL) {
bind_address_and_port = config_bind; bind_address_and_port = config_bind;
} else { } else {
bind_address_and_port = "127.0.0.1:4406"; bind_address_and_port = "127.0.0.1:4406";
} }
listener->fd = -1; listen_dcb->fd = -1;
memset(&serv_addr, 0, sizeof serv_addr); memset(&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
p = strchr(bind_address_and_port, ':'); p = strchr(bind_address_and_port, ':');
@ -829,12 +813,11 @@ int gw_MySQLListener(DCB *listener, char *config_bind) {
fprintf(stderr, fprintf(stderr,
">> GATEWAY listen backlog queue is %i\n", ">> GATEWAY listen backlog queue is %i\n",
10 * SOMAXCONN); 10 * SOMAXCONN);
listener->state = DCB_STATE_IDLE;
// assign l_so to dcb // assign l_so to dcb
listener->fd = l_so; listen_dcb->fd = l_so;
// add listening socket to poll structure // add listening socket to poll structure
if (poll_add_dcb(listener) == -1) { if (poll_add_dcb(listen_dcb) == -1) {
fprintf(stderr, fprintf(stderr,
">>> poll_add_dcb: can't add the listen_sock! Errno " ">>> poll_add_dcb: can't add the listen_sock! Errno "
"%i, %s\n", "%i, %s\n",
@ -842,15 +825,14 @@ int gw_MySQLListener(DCB *listener, char *config_bind) {
strerror(errno)); strerror(errno));
return 0; return 0;
} }
listener->func.accept = gw_MySQLAccept; listen_dcb->func.accept = gw_MySQLAccept;
listener->state = DCB_STATE_LISTENING;
return 1; return 1;
} }
int gw_MySQLAccept(DCB *listener) { int gw_MySQLAccept(DCB *listener)
{
fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
while (1) { while (1) {
@ -863,7 +845,9 @@ int gw_MySQLAccept(DCB *listener) {
socklen_t optlen = sizeof(sendbuf); socklen_t optlen = sizeof(sendbuf);
// new connection from client // new connection from client
c_sock = accept(listener->fd, (struct sockaddr *) &local, &addrlen); c_sock = accept(listener->fd,
(struct sockaddr *) &local,
&addrlen);
if (c_sock == -1) { if (c_sock == -1) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
@ -892,7 +876,7 @@ int gw_MySQLAccept(DCB *listener) {
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
setnonblocking(c_sock); setnonblocking(c_sock);
client_dcb = dcb_alloc(); client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_dcb->service = listener->session->service; client_dcb->service = listener->session->service;
client_dcb->fd = c_sock; client_dcb->fd = c_sock;
client_dcb->remote = strdup(inet_ntoa(local.sin_addr)); client_dcb->remote = strdup(inet_ntoa(local.sin_addr));
@ -909,9 +893,6 @@ int gw_MySQLAccept(DCB *listener) {
} }
// assign function poiters to "func" field // assign function poiters to "func" field
memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));
client_dcb->state = DCB_STATE_IDLE;
//send handshake to the client_dcb //send handshake to the client_dcb
MySQLSendHandshake(client_dcb); MySQLSendHandshake(client_dcb);
@ -919,21 +900,17 @@ int gw_MySQLAccept(DCB *listener) {
protocol->state = MYSQL_AUTH_SENT; protocol->state = MYSQL_AUTH_SENT;
/** /**
* Set new descriptor to event set. Before that * Set new descriptor to event set. At the same time,
* change state to DCB_STATE_POLLING so that * change state to DCB_STATE_POLLING so that
* thread which wakes up sees correct state. * thread which wakes up sees correct state.
*
*/ */
client_dcb->state = DCB_STATE_POLLING;
if (poll_add_dcb(client_dcb) == -1) if (poll_add_dcb(client_dcb) == -1)
{ {
/** Return to previous state. */ /** Previous state is recovered in poll_add_dcb. */
client_dcb->state = DCB_STATE_IDLE;
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [gw_MySQLAccept] Failed to add dcb %p for fd " "%lu [gw_MySQLAccept] Failed to add dcb %p for "
"%d to epoll set.", "fd %d to epoll set.",
pthread_self(), pthread_self(),
client_dcb, client_dcb,
client_dcb->fd); client_dcb->fd);
@ -957,7 +934,7 @@ int gw_MySQLAccept(DCB *listener) {
*/ */
static int gw_error_client_event(DCB *dcb) { static int gw_error_client_event(DCB *dcb) {
//fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_dcb_state2string(dcb->state)); //fprintf(stderr, "#### Handle error function gw_error_client_event, for [%i] is [%s]\n", dcb->fd, gw_dcb_state2string(dcb->state));
//dcb_close(dcb); dcb_close(dcb);
return 1; return 1;
} }

View File

@ -71,44 +71,6 @@ return_p:
return p; return p;
} }
#if 0
/**
* gw_mysql_init
*
* Initialize mysql protocol struct
*
* @param data The MySQLProtocol pointer, usually NULL
* @return The new MySQLProtocol allocated
*
*/
MySQLProtocol *gw_mysql_init(MySQLProtocol *data) {
MySQLProtocol *input = NULL;
// structure allocation
input = calloc(1, sizeof(MySQLProtocol));
if (input == NULL) {
int eno = errno;
errno = 0;
skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [gw_mysql_init] failed to allocate memory for MySQL "
"protocol object. Errno %d, %s.",
pthread_self(),
eno,
strerror(eno));
return NULL;
}
input->protocol_chk_top = CHK_NUM_PROTOCOL;
input->protocol_chk_tail = CHK_NUM_PROTOCOL;
#ifdef MYSQL_CONN_DEBUG
fprintf(stderr, "gw_mysql_init() called\n");
#endif
return input;
}
#endif
/** /**
* gw_mysql_close * gw_mysql_close
@ -502,7 +464,10 @@ int gw_do_connect_to_backend(
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
int rv; int rv;
int so = 0; int so = 0;
DCB* dcb = conn->descriptor;
CHK_DCB(dcb);
memset(&serv_addr, 0, sizeof serv_addr); memset(&serv_addr, 0, sizeof serv_addr);
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
so = socket(AF_INET,SOCK_STREAM,0); so = socket(AF_INET,SOCK_STREAM,0);
@ -513,8 +478,9 @@ int gw_do_connect_to_backend(
errno = 0; errno = 0;
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [gw_do_connect_to_backend] Establishing connection to " "%lu [gw_do_connect_to_backend] Establishing connection "
"back-end server failed. Socket creation failed due %d, %s.", "to back-end server failed. Socket creation failed due "
"%d, %s.",
pthread_self(), pthread_self(),
eno, eno,
strerror(eno)); strerror(eno));
@ -522,7 +488,7 @@ int gw_do_connect_to_backend(
goto return_rv; goto return_rv;
} }
/* Assign so to the caller dcb, conn->descriptor */ /* Assign so to the caller dcb, conn->descriptor */
conn->descriptor->fd = so; dcb->fd = so;
/* prepare for connect */ /* prepare for connect */
setipaddress(&serv_addr.sin_addr, host); setipaddress(&serv_addr.sin_addr, host);
serv_addr.sin_port = htons(port); serv_addr.sin_port = htons(port);
@ -552,7 +518,7 @@ int gw_do_connect_to_backend(
/** /**
* Add the dcb in the poll set * Add the dcb in the poll set
*/ */
poll_add_dcb(conn->descriptor); poll_add_dcb(dcb);
return_rv: return_rv:
return rv; return rv;
} }
@ -1007,7 +973,6 @@ int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password,
if (strlen(user_password)) if (strlen(user_password))
gw_hex2bin(gateway_password, user_password, SHA_DIGEST_LENGTH * 2); gw_hex2bin(gateway_password, user_password, SHA_DIGEST_LENGTH * 2);
return 0; return 0;
} }
@ -1089,4 +1054,3 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const
return sizeof(mysql_packet_header) + mysql_payload_size; return sizeof(mysql_packet_header) + mysql_payload_size;
} }
///

View File

@ -144,7 +144,6 @@ char *password, *t;
if ((n = dcb_read(dcb, &head)) != -1) if ((n = dcb_read(dcb, &head)) != -1)
{ {
dcb->state = DCB_STATE_PROCESSING;
if (head) if (head)
{ {
unsigned char *ptr = GWBUF_DATA(head); unsigned char *ptr = GWBUF_DATA(head);
@ -198,8 +197,6 @@ char *password, *t;
} }
} }
} }
dcb->state = DCB_STATE_POLLING;
return n; return n;
} }
@ -269,40 +266,48 @@ int n_connect = 0;
int so; int so;
struct sockaddr_in addr; struct sockaddr_in addr;
socklen_t addrlen = sizeof(struct sockaddr); socklen_t addrlen = sizeof(struct sockaddr);
DCB *client; DCB *client_dcb;
TELNETD* telnetd_pr = NULL;
dcb_state_t old_state = DCB_STATE_UNDEFINED;
bool succp = FALSE;
if ((so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen)) == -1) so = accept(dcb->fd, (struct sockaddr *)&addr, &addrlen);
if (so == -1)
return n_connect; return n_connect;
else else
{ {
atomic_add(&dcb->stats.n_accepts, 1); atomic_add(&dcb->stats.n_accepts, 1);
if ((client = dcb_alloc()) == NULL) client_dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
if (client_dcb == NULL)
{ {
return n_connect; return n_connect;
} }
client->fd = so; client_dcb->fd = so;
client->remote = strdup(inet_ntoa(addr.sin_addr)); client_dcb->remote = strdup(inet_ntoa(addr.sin_addr));
memcpy(&client->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&client_dcb->func, &MyObject, sizeof(GWPROTOCOL));
client->session = session_alloc(dcb->session->service, client); client_dcb->session =
session_alloc(dcb->session->service, client_dcb);
telnetd_pr = (TELNETD *)malloc(sizeof(TELNETD));
client_dcb->protocol = (void *)telnetd_pr;
client->state = DCB_STATE_IDLE; if (telnetd_pr == NULL)
if ((client->protocol = malloc(sizeof(TELNETD))) == NULL) {
{ dcb_add_to_zombieslist(client_dcb);
dcb_free(client);
return n_connect; return n_connect;
} }
if (poll_add_dcb(client) == -1) if (poll_add_dcb(client_dcb) == -1)
{ {
dcb_free(client); dcb_add_to_zombieslist(dcb);
return n_connect; return n_connect;
} }
n_connect++; n_connect++;
telnetd_pr->state = TELNETD_STATE_LOGIN;
((TELNETD *)(client->protocol))->state = TELNETD_STATE_LOGIN; telnetd_pr->username = NULL;
((TELNETD *)(client->protocol))->username = NULL; dcb_printf(client_dcb, "MaxScale login: ");
dcb_printf(client, "MaxScale login: ");
client->state = DCB_STATE_POLLING;
} }
} }
return n_connect; return n_connect;
@ -368,8 +373,6 @@ short pnum;
{ {
return 0; return 0;
} }
listener->state = DCB_STATE_LISTENING;
listen(listener->fd, SOMAXCONN); listen(listener->fd, SOMAXCONN);
if (poll_add_dcb(listener) == -1) if (poll_add_dcb(listener) == -1)

View File

@ -143,19 +143,19 @@ typedef enum skygw_chk_t {
((p) == COM_DAEMON ? "COM_DAEMON" : "UNKNOWN MYSQL PACKET TYPE"))))))))))))))) ((p) == COM_DAEMON ? "COM_DAEMON" : "UNKNOWN MYSQL PACKET TYPE")))))))))))))))
#define STRDCBSTATE(s) ((s) == DCB_STATE_ALLOC ? "DCB_STATE_ALLOC" : \ #define STRDCBSTATE(s) ((s) == DCB_STATE_ALLOC ? "DCB_STATE_ALLOC" : \
((s) == DCB_STATE_IDLE ? "DCB_STATE_IDLE" : \ ((s) == DCB_STATE_POLLING ? "DCB_STATE_POLLING" : \
((s) == DCB_STATE_POLLING ? "DCB_STATE_POLLING" : \ ((s) == DCB_STATE_LISTENING ? "DCB_STATE_LISTENING" : \
((s) == DCB_STATE_PROCESSING ? "DCB_STATE_PROCESSING" : \ ((s) == DCB_STATE_DISCONNECTED ? "DCB_STATE_DISCONNECTED" : \
((s) == DCB_STATE_LISTENING ? "DCB_STATE_LISTENING" : \ ((s) == DCB_STATE_NOPOLLING ? "DCB_STATE_NOPOLLING" : \
((s) == DCB_STATE_DISCONNECTED ? "DCB_STATE_DISCONNECTED" : \ ((s) == DCB_STATE_FREED ? "DCB_STATE_FREED" : \
((s) == DCB_STATE_FREED ? "DCB_STATE_FREED" : \ ((s) == DCB_STATE_ZOMBIE ? "DCB_STATE_ZOMBIE" : \
((s) == DCB_STATE_ZOMBIE ? "DCB_STATE_ZOMBIE" : "DCB_STATE_UNKNOWN")))))))) ((s) == DCB_STATE_UNDEFINED ? "DCB_STATE_UNDEFINED" : "DCB_STATE_UNKNOWN"))))))))
#define STRSESSIONSTATE(s) ((s) == SESSION_STATE_ALLOC ? "SESSION_STATE_ALLOC" : \ #define STRSESSIONSTATE(s) ((s) == SESSION_STATE_ALLOC ? "SESSION_STATE_ALLOC" : \
((s) == SESSION_STATE_READY ? "SESSION_STATE_READY" : \ ((s) == SESSION_STATE_READY ? "SESSION_STATE_READY" : \
((s) == SESSION_STATE_LISTENER ? "SESSION_STATE_LISTENER" : \ ((s) == SESSION_STATE_LISTENER ? "SESSION_STATE_LISTENER" : \
((s) == SESSION_STATE_LISTENER_STOPPED ? "SESSION_STATE_LISTENER_STOPPED" : \ ((s) == SESSION_STATE_LISTENER_STOPPED ? "SESSION_STATE_LISTENER_STOPPED" : \
"SESSION_STATE_UNKNOWN")))) "SESSION_STATE_UNKNOWN"))))
#define CHK_MLIST(l) { \ #define CHK_MLIST(l) { \