When there is no load but there are zombies to be cleaned up, it used to take at least nthreads*timeout time to get socket closed. Now in this case, once the timeout exceeds for the first time, all threads are allowed to call dcb_process_zombies without having to wait the timeout period until there are no zombies anymore.
This commit is contained in:
@ -73,6 +73,11 @@ static bool dcb_set_state_nomutex(
|
|||||||
const dcb_state_t new_state,
|
const dcb_state_t new_state,
|
||||||
dcb_state_t* old_state);
|
dcb_state_t* old_state);
|
||||||
|
|
||||||
|
DCB* dcb_get_zombies(void)
|
||||||
|
{
|
||||||
|
return zombies;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a new DCB.
|
* Allocate a new DCB.
|
||||||
*
|
*
|
||||||
@ -145,15 +150,18 @@ dcb_add_to_zombieslist(DCB *dcb)
|
|||||||
|
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Protect zombies list access.
|
||||||
|
*/
|
||||||
|
spinlock_acquire(&zombiespin);
|
||||||
|
/**
|
||||||
|
* If dcb is already added to zombies list, return.
|
||||||
|
*/
|
||||||
if (dcb->state != DCB_STATE_NOPOLLING) {
|
if (dcb->state != DCB_STATE_NOPOLLING) {
|
||||||
ss_dassert(dcb->state != DCB_STATE_POLLING &&
|
ss_dassert(dcb->state != DCB_STATE_POLLING &&
|
||||||
dcb->state != DCB_STATE_LISTENING);
|
dcb->state != DCB_STATE_LISTENING);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Protect zombies list access.
|
|
||||||
*/
|
|
||||||
spinlock_acquire(&zombiespin);
|
|
||||||
|
|
||||||
if (zombies == NULL) {
|
if (zombies == NULL) {
|
||||||
zombies = dcb;
|
zombies = dcb;
|
||||||
@ -278,7 +286,7 @@ void* rsession = NULL;
|
|||||||
*
|
*
|
||||||
* @param threadid The thread ID of the caller
|
* @param threadid The thread ID of the caller
|
||||||
*/
|
*/
|
||||||
void
|
DCB*
|
||||||
dcb_process_zombies(int threadid)
|
dcb_process_zombies(int threadid)
|
||||||
{
|
{
|
||||||
DCB *ptr, *lptr;
|
DCB *ptr, *lptr;
|
||||||
@ -294,7 +302,7 @@ bool succp = false;
|
|||||||
* dcb_final_free.
|
* dcb_final_free.
|
||||||
*/
|
*/
|
||||||
if (!zombies)
|
if (!zombies)
|
||||||
return;
|
return NULL;
|
||||||
|
|
||||||
spinlock_acquire(&zombiespin);
|
spinlock_acquire(&zombiespin);
|
||||||
ptr = zombies;
|
ptr = zombies;
|
||||||
@ -393,6 +401,7 @@ bool succp = false;
|
|||||||
dcb_final_free(dcb);
|
dcb_final_free(dcb);
|
||||||
dcb = dcb_next;
|
dcb = dcb_next;
|
||||||
}
|
}
|
||||||
|
return zombies;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -238,10 +238,12 @@ return_rc:
|
|||||||
void
|
void
|
||||||
poll_waitevents(void *arg)
|
poll_waitevents(void *arg)
|
||||||
{
|
{
|
||||||
struct epoll_event events[MAX_EVENTS];
|
struct epoll_event events[MAX_EVENTS];
|
||||||
int i, nfds;
|
int i, nfds;
|
||||||
int thread_id = (int)arg;
|
int thread_id = (int)arg;
|
||||||
bool no_op = FALSE;
|
bool no_op = false;
|
||||||
|
static bool process_zombies_only = false; /**< flag for all threads */
|
||||||
|
DCB *zombies = NULL;
|
||||||
|
|
||||||
/* Add this thread to the bitmask of running polling threads */
|
/* Add this thread to the bitmask of running polling threads */
|
||||||
bitmask_set(&poll_mask, thread_id);
|
bitmask_set(&poll_mask, thread_id);
|
||||||
@ -277,10 +279,24 @@ poll_waitevents(void *arg)
|
|||||||
}
|
}
|
||||||
else if (nfds == 0)
|
else if (nfds == 0)
|
||||||
{
|
{
|
||||||
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT);
|
if (process_zombies_only) {
|
||||||
|
simple_mutex_unlock(&epoll_wait_mutex);
|
||||||
if (nfds == -1)
|
goto process_zombies;
|
||||||
{
|
} else {
|
||||||
|
nfds = epoll_wait(epoll_fd,
|
||||||
|
events,
|
||||||
|
MAX_EVENTS,
|
||||||
|
EPOLL_TIMEOUT);
|
||||||
|
/**
|
||||||
|
* When there are zombies to be cleaned up but
|
||||||
|
* no client requests, allow all threads to call
|
||||||
|
* dcb_process_zombies without having to wait
|
||||||
|
* for the timeout.
|
||||||
|
*/
|
||||||
|
if (nfds == 0 && dcb_get_zombies() != NULL)
|
||||||
|
{
|
||||||
|
process_zombies_only = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
simple_mutex_unlock(&epoll_wait_mutex);
|
simple_mutex_unlock(&epoll_wait_mutex);
|
||||||
@ -368,8 +384,11 @@ poll_waitevents(void *arg)
|
|||||||
}
|
}
|
||||||
if (ev & EPOLLOUT)
|
if (ev & EPOLLOUT)
|
||||||
{
|
{
|
||||||
|
int eno = 0;
|
||||||
simple_mutex_lock(&dcb->dcb_write_lock,
|
simple_mutex_lock(&dcb->dcb_write_lock,
|
||||||
true);
|
true);
|
||||||
|
eno = gw_getsockerrno(dcb->fd);
|
||||||
|
ss_dassert(eno == 0);
|
||||||
ss_info_dassert(!dcb->dcb_write_active,
|
ss_info_dassert(!dcb->dcb_write_active,
|
||||||
"Write already active");
|
"Write already active");
|
||||||
dcb->dcb_write_active = TRUE;
|
dcb->dcb_write_active = TRUE;
|
||||||
@ -416,7 +435,12 @@ poll_waitevents(void *arg)
|
|||||||
} /**< for */
|
} /**< for */
|
||||||
no_op = FALSE;
|
no_op = FALSE;
|
||||||
}
|
}
|
||||||
dcb_process_zombies(thread_id);
|
process_zombies:
|
||||||
|
zombies = dcb_process_zombies(thread_id);
|
||||||
|
|
||||||
|
if (zombies == NULL) {
|
||||||
|
process_zombies_only = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (shutdown)
|
if (shutdown)
|
||||||
{
|
{
|
||||||
|
@ -63,15 +63,16 @@ session_alloc(SERVICE *service, DCB *client)
|
|||||||
SESSION *session;
|
SESSION *session;
|
||||||
|
|
||||||
session = (SESSION *)calloc(1, sizeof(SESSION));
|
session = (SESSION *)calloc(1, sizeof(SESSION));
|
||||||
ss_info_dassert(session != NULL, "Allocating memory for session failed.");
|
ss_info_dassert(session != NULL,
|
||||||
|
"Allocating memory for session failed.");
|
||||||
|
|
||||||
if (session == NULL) {
|
if (session == NULL) {
|
||||||
int eno = errno;
|
int eno = errno;
|
||||||
errno = 0;
|
errno = 0;
|
||||||
skygw_log_write_flush(
|
skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
"%lu [session_alloc] FAiled to allocate memory for session "
|
"%lu [session_alloc] Failed to allocate memory for "
|
||||||
"object due error %d, %s.",
|
"session object due error %d, %s.",
|
||||||
pthread_self(),
|
pthread_self(),
|
||||||
eno,
|
eno,
|
||||||
strerror(eno));
|
strerror(eno));
|
||||||
@ -94,14 +95,18 @@ session_alloc(SERVICE *service, DCB *client)
|
|||||||
session->state = SESSION_STATE_ALLOC;
|
session->state = SESSION_STATE_ALLOC;
|
||||||
/**
|
/**
|
||||||
* Associate the session to the client DCB and set the reference count on
|
* Associate the session to the client DCB and set the reference count on
|
||||||
* the session to indicate that there is a single reference to the session.
|
* the session to indicate that there is a single reference to the
|
||||||
* There is no need to protect this or use atomic add as the session has not
|
* session. There is no need to protect this or use atomic add as the
|
||||||
* been made available to the other threads at this point.
|
* session has not been made available to the other threads at this
|
||||||
|
* point.
|
||||||
*/
|
*/
|
||||||
session->data = client->data;
|
session->data = client->data;
|
||||||
client->session = session;
|
client->session = session;
|
||||||
session->refcount = 1;
|
session->refcount = 1;
|
||||||
/** This indicates that session is ready to be shared with backend DCBs. */
|
/**
|
||||||
|
* This indicates that session is ready to be shared with backend
|
||||||
|
* DCBs.
|
||||||
|
*/
|
||||||
session->state = SESSION_STATE_READY;
|
session->state = SESSION_STATE_READY;
|
||||||
|
|
||||||
/** Release session lock */
|
/** Release session lock */
|
||||||
|
@ -199,6 +199,7 @@ int fail_accept_errno;
|
|||||||
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
|
#define DCB_PROTOCOL(x, type) (type *)((x)->protocol)
|
||||||
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
|
#define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE)
|
||||||
|
|
||||||
|
DCB *dcb_get_zombies(void);
|
||||||
int gw_write(int fd, const void* buf, size_t nbytes);
|
int gw_write(int fd, const void* buf, size_t nbytes);
|
||||||
int dcb_write(DCB *, GWBUF *);
|
int dcb_write(DCB *, GWBUF *);
|
||||||
DCB *dcb_alloc(dcb_role_t);
|
DCB *dcb_alloc(dcb_role_t);
|
||||||
@ -207,7 +208,7 @@ DCB *dcb_connect(struct server *, struct session *, const char *);
|
|||||||
int dcb_read(DCB *, GWBUF **);
|
int dcb_read(DCB *, GWBUF **);
|
||||||
int dcb_drain_writeq(DCB *);
|
int dcb_drain_writeq(DCB *);
|
||||||
void dcb_close(DCB *);
|
void dcb_close(DCB *);
|
||||||
void dcb_process_zombies(int); /* Process Zombies */
|
DCB *dcb_process_zombies(int); /* Process Zombies */
|
||||||
void printAllDCBs(); /* Debug to print all DCB in the system */
|
void printAllDCBs(); /* Debug to print all DCB in the system */
|
||||||
void printDCB(DCB *); /* Debug print routine */
|
void printDCB(DCB *); /* Debug print routine */
|
||||||
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
void dprintAllDCBs(DCB *); /* Debug to print all DCB in the system */
|
||||||
|
Reference in New Issue
Block a user