From ae9fce85917cacc6fe8e1fe02da78ba08d2e508d Mon Sep 17 00:00:00 2001 From: vraatikka Date: Wed, 28 Aug 2013 23:09:37 +0300 Subject: [PATCH] Added write and read lock to DCB to protect concurrent threads from executing EPOLLIN and EPOLLOUT events on same descriptor, respectively. Added consistency checks and trace logging. --- server/core/dcb.c | 33 +++++++++++++++++++++++++-------- server/core/poll.c | 39 ++++++++++++++++++++++++++++++++++++--- server/include/dcb.h | 43 +++++++++++++++++++++++++++++++++---------- 3 files changed, 94 insertions(+), 21 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 0c358ad4a..de7267aaf 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -84,6 +84,12 @@ DCB *rval; { return NULL; } + rval->dcb_chk_top = CHK_NUM_DCB; + rval->dcb_chk_tail = CHK_NUM_DCB; + simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex"); + simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex"); + rval->dcb_write_active = FALSE; + rval->dcb_read_active = FALSE; spinlock_init(&rval->writeqlock); spinlock_init(&rval->delayqlock); spinlock_init(&rval->authlock); @@ -158,7 +164,7 @@ dcb_free(DCB *dcb) } spinlock_release(&zombiespin); - skygw_log_write( + skygw_log_write_flush( LOGFILE_TRACE, "%lu [dcb_free] Set dcb %p for fd %d DCB_STATE_ZOMBIE", pthread_self(), @@ -256,11 +262,13 @@ DCB *ptr, *lptr; zombies = tptr; else lptr->memdata.next = tptr; - skygw_log_write( + skygw_log_write_flush( LOGFILE_TRACE, - "%lu [dcb_process_zombies] Free dcb %p for fd %d", + "%lu [dcb_process_zombies] Free dcb %p in state " + "%s for fd %d", pthread_self(), (unsigned long)ptr, + STRDCBSTATE(ptr->state), ptr->fd); dcb_final_free(ptr); ptr = tptr; @@ -309,15 +317,17 @@ GWPROTOCOL *funcs; if ((dcb->fd = dcb->func.connect(dcb, server, session)) == -1) { dcb_final_free(dcb); - skygw_log_write( LOGFILE_ERROR, "Failed to connect to server %s:%d, free dcb %p\n", - server->name, server->port, dcb); + skygw_log_write(LOGFILE_ERROR, + "Failed to connect to 
server %s:%d, free dcb %p\n", + server->name, + server->port, + dcb); return NULL; } /* * The dcb will be addded into poll set by dcb->func.connect */ - atomic_add(&server->stats.n_connections, 1); atomic_add(&server->stats.n_current, 1); @@ -600,7 +610,7 @@ dcb_close(DCB *dcb) } poll_remove_dcb(dcb); close(dcb->fd); - dcb->state = DCB_STATE_DISCONNECTED; + dcb->state = DCB_STATE_DISCONNECTED; spinlock_release(&dcb->writeqlock); if (dcb_isclient(dcb)) @@ -631,11 +641,18 @@ dcb_close(DCB *dcb) } else { skygw_log_write_flush( LOGFILE_TRACE, - "%lu [dcb_close] rsession was NULL in dcb_close.", + "%lu [dcb_close] rsession was NULL in " + "dcb_close.", pthread_self()); } } session_free(dcb->session); + skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [dcb_close] DCB %p freed session %p", + pthread_self(), + dcb, + dcb->session); } dcb_free(dcb); } diff --git a/server/core/poll.c b/server/core/poll.c index a5220b6a8..c7ee6cc55 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -201,7 +201,8 @@ poll_waitevents(void *arg) { DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; - + + CHK_DCB(dcb); skygw_log_write( LOGFILE_TRACE, "%lu [poll_waitevents] event %d", @@ -218,11 +219,24 @@ poll_waitevents(void *arg) continue; } + if (dcb->state == DCB_STATE_DISCONNECTED || + dcb->state == DCB_STATE_PROCESSING) + { + skygw_log_write( + LOGFILE_TRACE, + "%lu [poll_waitevents] dcb state is " + "%s", + pthread_self(), + STRDCBSTATE(dcb->state)); + continue; + } + + if (ev & EPOLLERR) { atomic_add(&pollStats.n_error, 1); dcb->func.error(dcb); - + if (DCB_ISZOMBIE(dcb)) { continue; } @@ -238,16 +252,32 @@ poll_waitevents(void *arg) } if (ev & EPOLLOUT) { + simple_mutex_lock(&dcb->dcb_write_lock, true); + ss_info_dassert(!dcb->dcb_write_active, + "Write already active"); + dcb->dcb_write_active = TRUE; + skygw_log_write(LOGFILE_TRACE, "%lu [poll_waitevents] " "Write in fd %d", pthread_self(), dcb->fd); atomic_add(&pollStats.n_write, 1); - 
dcb->func.write_ready(dcb); + + dcb->func.write_ready(dcb); + + dcb->dcb_write_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_write_lock); } if (ev & EPOLLIN) { + simple_mutex_lock(&dcb->dcb_read_lock, + true); + ss_info_dassert(!dcb->dcb_read_active, + "Read already active"); + dcb->dcb_read_active = TRUE; + if (dcb->state == DCB_STATE_LISTENING) { skygw_log_write( @@ -270,6 +300,9 @@ poll_waitevents(void *arg) atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); } + dcb->dcb_read_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_read_lock); } } /**< for */ no_op = FALSE; diff --git a/server/include/dcb.h b/server/include/dcb.h index f55fdb9d0..cccaca01c 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -122,6 +122,30 @@ typedef struct { struct dcb *next; /**< Next pointer for the zombie list */ } DCBMM; +/* DCB states */ +#if 0 +#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */ +#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */ +#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */ +#define DCB_STATE_PROCESSING 4 /**< Processing an event */ +#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */ +#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */ +#define DCB_STATE_FREED 7 /**< Memory freed */ +#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */ +#else +typedef enum { + DCB_STATE_ALLOC, /**< Memory allocated but not populated */ + DCB_STATE_IDLE, /**< Not yet in the poll mask */ + DCB_STATE_POLLING, /**< Waiting in the poll loop */ + DCB_STATE_PROCESSING, /**< Processing an event */ + DCB_STATE_LISTENING, /**< The DCB is for a listening socket */ + DCB_STATE_DISCONNECTED, /**< The socket is now closed */ + DCB_STATE_FREED, /**< Memory freed */ + DCB_STATE_ZOMBIE /**< DCB is no longer active, waiting to free it */ +} dcb_state_t; +#endif + + /** * Descriptor Control Block * @@ -134,8 +158,15 @@ typedef struct { * gateway may be selected to execute the 
required actions when a network event occurs. */ typedef struct dcb { + skygw_chk_t dcb_chk_top; + simple_mutex_t dcb_read_lock; + simple_mutex_t dcb_write_lock; int fd; /**< The descriptor */ - int state; /**< Current descriptor state */ +#if defined(SS_DEBUG) + bool dcb_read_active; + bool dcb_write_active; +#endif + dcb_state_t state; /**< Current descriptor state */ char *remote; /**< Address of remote end */ void *protocol; /**< The protocol specific state */ struct session *session; /**< The owning session */ @@ -154,17 +185,9 @@ typedef struct dcb { void *data; /**< Specific client data */ DCBMM memdata; /**< The data related to DCB memory management */ int command; /**< Specific client command type */ + skygw_chk_t dcb_chk_tail; } DCB; -/* DCB states */ -#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */ -#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */ -#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */ -#define DCB_STATE_PROCESSING 4 /**< Processing an event */ -#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */ -#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */ -#define DCB_STATE_FREED 7 /**< Memory freed */ -#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */ /* A few useful macros */ #define DCB_SESSION(x) (x)->session