Added write and read lock to DCB to protect concurrent threads fro executing EPOLLIN and EPOLLOUT events on same descriptor, respectively. Added consistency checks and trace logging.
This commit is contained in:
parent
f2f119f692
commit
ae9fce8591
@ -84,6 +84,12 @@ DCB *rval;
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
rval->dcb_chk_top = CHK_NUM_DCB;
|
||||
rval->dcb_chk_tail = CHK_NUM_DCB;
|
||||
simple_mutex_init(&rval->dcb_write_lock, "DCB write mutex");
|
||||
simple_mutex_init(&rval->dcb_read_lock, "DCB read mutex");
|
||||
rval->dcb_write_active = FALSE;
|
||||
rval->dcb_read_active = FALSE;
|
||||
spinlock_init(&rval->writeqlock);
|
||||
spinlock_init(&rval->delayqlock);
|
||||
spinlock_init(&rval->authlock);
|
||||
@ -158,7 +164,7 @@ dcb_free(DCB *dcb)
|
||||
}
|
||||
spinlock_release(&zombiespin);
|
||||
|
||||
skygw_log_write(
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [dcb_free] Set dcb %p for fd %d DCB_STATE_ZOMBIE",
|
||||
pthread_self(),
|
||||
@ -256,11 +262,13 @@ DCB *ptr, *lptr;
|
||||
zombies = tptr;
|
||||
else
|
||||
lptr->memdata.next = tptr;
|
||||
skygw_log_write(
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [dcb_process_zombies] Free dcb %p for fd %d",
|
||||
"%lu [dcb_process_zombies] Free dcb %p in state "
|
||||
"%s for fd %d",
|
||||
pthread_self(),
|
||||
(unsigned long)ptr,
|
||||
STRDCBSTATE(ptr->state),
|
||||
ptr->fd);
|
||||
dcb_final_free(ptr);
|
||||
ptr = tptr;
|
||||
@ -309,15 +317,17 @@ GWPROTOCOL *funcs;
|
||||
if ((dcb->fd = dcb->func.connect(dcb, server, session)) == -1)
|
||||
{
|
||||
dcb_final_free(dcb);
|
||||
skygw_log_write( LOGFILE_ERROR, "Failed to connect to server %s:%d, free dcb %p\n",
|
||||
server->name, server->port, dcb);
|
||||
skygw_log_write(LOGFILE_ERROR,
|
||||
"Failed to connect to server %s:%d, free dcb %p\n",
|
||||
server->name,
|
||||
server->port,
|
||||
dcb);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* The dcb will be addded into poll set by dcb->func.connect
|
||||
*/
|
||||
|
||||
atomic_add(&server->stats.n_connections, 1);
|
||||
atomic_add(&server->stats.n_current, 1);
|
||||
|
||||
@ -600,7 +610,7 @@ dcb_close(DCB *dcb)
|
||||
}
|
||||
poll_remove_dcb(dcb);
|
||||
close(dcb->fd);
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
dcb->state = DCB_STATE_DISCONNECTED;
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
|
||||
if (dcb_isclient(dcb))
|
||||
@ -631,11 +641,18 @@ dcb_close(DCB *dcb)
|
||||
} else {
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [dcb_close] rsession was NULL in dcb_close.",
|
||||
"%lu [dcb_close] rsession was NULL in "
|
||||
"dcb_close.",
|
||||
pthread_self());
|
||||
}
|
||||
}
|
||||
session_free(dcb->session);
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [dcb_close] DCB %p freed session %p",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
dcb->session);
|
||||
}
|
||||
dcb_free(dcb);
|
||||
}
|
||||
|
@ -201,7 +201,8 @@ poll_waitevents(void *arg)
|
||||
{
|
||||
DCB *dcb = (DCB *)events[i].data.ptr;
|
||||
__uint32_t ev = events[i].events;
|
||||
|
||||
|
||||
CHK_DCB(dcb);
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [poll_waitevents] event %d",
|
||||
@ -218,11 +219,24 @@ poll_waitevents(void *arg)
|
||||
continue;
|
||||
}
|
||||
|
||||
if (dcb->state == DCB_STATE_DISCONNECTED ||
|
||||
dcb->state == DCB_STATE_PROCESSING)
|
||||
{
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [poll_waitevents] dcb state is "
|
||||
"%s",
|
||||
pthread_self(),
|
||||
STRDCBSTATE(dcb->state));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (ev & EPOLLERR)
|
||||
{
|
||||
atomic_add(&pollStats.n_error, 1);
|
||||
dcb->func.error(dcb);
|
||||
|
||||
|
||||
if (DCB_ISZOMBIE(dcb)) {
|
||||
continue;
|
||||
}
|
||||
@ -238,16 +252,32 @@ poll_waitevents(void *arg)
|
||||
}
|
||||
if (ev & EPOLLOUT)
|
||||
{
|
||||
simple_mutex_lock(&dcb->dcb_write_lock, true);
|
||||
ss_info_dassert(!dcb->dcb_write_active,
|
||||
"Write already active");
|
||||
dcb->dcb_write_active = TRUE;
|
||||
|
||||
skygw_log_write(LOGFILE_TRACE,
|
||||
"%lu [poll_waitevents] "
|
||||
"Write in fd %d",
|
||||
pthread_self(),
|
||||
dcb->fd);
|
||||
atomic_add(&pollStats.n_write, 1);
|
||||
dcb->func.write_ready(dcb);
|
||||
|
||||
dcb->func.write_ready(dcb);
|
||||
|
||||
dcb->dcb_write_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_write_lock);
|
||||
}
|
||||
if (ev & EPOLLIN)
|
||||
{
|
||||
simple_mutex_lock(&dcb->dcb_read_lock,
|
||||
true);
|
||||
ss_info_dassert(!dcb->dcb_read_active,
|
||||
"Read already active");
|
||||
dcb->dcb_read_active = TRUE;
|
||||
|
||||
if (dcb->state == DCB_STATE_LISTENING)
|
||||
{
|
||||
skygw_log_write(
|
||||
@ -270,6 +300,9 @@ poll_waitevents(void *arg)
|
||||
atomic_add(&pollStats.n_read, 1);
|
||||
dcb->func.read(dcb);
|
||||
}
|
||||
dcb->dcb_read_active = FALSE;
|
||||
simple_mutex_unlock(
|
||||
&dcb->dcb_read_lock);
|
||||
}
|
||||
} /**< for */
|
||||
no_op = FALSE;
|
||||
|
@ -122,6 +122,30 @@ typedef struct {
|
||||
struct dcb *next; /**< Next pointer for the zombie list */
|
||||
} DCBMM;
|
||||
|
||||
/* DCB states */
|
||||
#if 0
|
||||
#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */
|
||||
#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */
|
||||
#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */
|
||||
#define DCB_STATE_PROCESSING 4 /**< Processing an event */
|
||||
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
|
||||
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
|
||||
#define DCB_STATE_FREED 7 /**< Memory freed */
|
||||
#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */
|
||||
#else
|
||||
typedef enum {
|
||||
DCB_STATE_ALLOC, /**< Memory allocated but not populated */
|
||||
DCB_STATE_IDLE, /**< Not yet in the poll mask */
|
||||
DCB_STATE_POLLING, /**< Waiting in the poll loop */
|
||||
DCB_STATE_PROCESSING, /**< Processing an event */
|
||||
DCB_STATE_LISTENING, /**< The DCB is for a listening socket */
|
||||
DCB_STATE_DISCONNECTED, /**< The socket is now closed */
|
||||
DCB_STATE_FREED, /**< Memory freed */
|
||||
DCB_STATE_ZOMBIE /**< DCB is no longer active, waiting to free it */
|
||||
} dcb_state_t;
|
||||
#endif
|
||||
|
||||
|
||||
/**
|
||||
* Descriptor Control Block
|
||||
*
|
||||
@ -134,8 +158,15 @@ typedef struct {
|
||||
* gateway may be selected to execute the required actions when a network event occurs.
|
||||
*/
|
||||
typedef struct dcb {
|
||||
skygw_chk_t dcb_chk_top;
|
||||
simple_mutex_t dcb_read_lock;
|
||||
simple_mutex_t dcb_write_lock;
|
||||
int fd; /**< The descriptor */
|
||||
int state; /**< Current descriptor state */
|
||||
#if defined(SS_DEBUG)
|
||||
bool dcb_read_active;
|
||||
bool dcb_write_active;
|
||||
#endif
|
||||
dcb_state_t state; /**< Current descriptor state */
|
||||
char *remote; /**< Address of remote end */
|
||||
void *protocol; /**< The protocol specific state */
|
||||
struct session *session; /**< The owning session */
|
||||
@ -154,17 +185,9 @@ typedef struct dcb {
|
||||
void *data; /**< Specific client data */
|
||||
DCBMM memdata; /**< The data related to DCB memory management */
|
||||
int command; /**< Specific client command type */
|
||||
skygw_chk_t dcb_chk_tail;
|
||||
} DCB;
|
||||
|
||||
/* DCB states */
|
||||
#define DCB_STATE_ALLOC 0 /**< Memory allocated but not populated */
|
||||
#define DCB_STATE_IDLE 1 /**< Not yet in the poll mask */
|
||||
#define DCB_STATE_POLLING 2 /**< Waiting in the poll loop */
|
||||
#define DCB_STATE_PROCESSING 4 /**< Processing an event */
|
||||
#define DCB_STATE_LISTENING 5 /**< The DCB is for a listening socket */
|
||||
#define DCB_STATE_DISCONNECTED 6 /**< The socket is now closed */
|
||||
#define DCB_STATE_FREED 7 /**< Memory freed */
|
||||
#define DCB_STATE_ZOMBIE 8 /**< DCB is no longer active, waiting to free it */
|
||||
|
||||
/* A few useful macros */
|
||||
#define DCB_SESSION(x) (x)->session
|
||||
|
Loading…
x
Reference in New Issue
Block a user