Files
MaxScale/server/core/poll.c
vraatikka 66e9be814b dcb.h
-------
Removed DCB states DCB_STATE_IDLE, and DCB_STATE_PROCESSING.
Added DCB_STATE_UNDEFINED as the initial value for a state variable that has not yet been given any specific state, and DCB_STATE_NOPOLLING to indicate that a dcb has been removed from the poll set.

Added following dcb roles: DCB_ROLE_SERVICE_LISTENER for listeners of services, and DCB_ROLE_REQUEST_HANDLER for client/backend dcbs. Listeners may have state DCB_STATE_LISTENING, but not DCB_STATE_POLLING. Request handlers may have DCB_STATE_POLLING but not DCB_STATE_LISTENING. Role is passed as an argument to dcb.c:dcb_alloc.

From now on, struct check numbers of DCB are included and checked in DEBUG build only.

Added dcb_role_t dcb_role-member to DCB as well as SPINLOCK dcb_initlock, which protects state changes.

Removed extern keyword from function declarations because functions are by default externally visible if they are declared in header.

dcb.c
------
Function dcb_set_state, and dcb_set_state_nomutex provide functions for changing dcb states. Latter implements a state machine for dcb.
Function dcb_add_to_zombieslist replaces dcb_free. In one atomic step it adds the dcb to the zombieslist and changes its state to DCB_STATE_ZOMBIE.
Function dcb_final_free removes dcb from allDCBs list, terminates router and client sessions, and frees dcb and related memory.
Function dcb_process_zombies removes the executing thread from the dcb's bitmask and, if there are no further thread bits, moves the dcb to a victim list. Finally, for each dcb on the victim list, it closes the fd and sets the state to DCB_STATE_DISCONNECTED.
Function dcb_close sets the dcb state to DCB_STATE_NOPOLLING, removes the dcb from the poll set, and sets a bit in the bitmask for each server thread in an atomic step.

poll.c
------
Function poll_add_dcb sets either DCB_STATE_LISTENING or DCB_STATE_POLLING state for newly created dcb, depending whether the role of dcb is DCB_ROLE_SERVICE_LISTENER, or DCB_ROLE_REQUEST_HANDLER, respectively. Then dcb is set to poll set.

poll_waitevents : commented out code which skipped event if dcb was added to zombieslist or if fd was closed. Added state checks.

service.c : Minor changes.
httpd.c : Removed dcb state changes. They are done in core.
mysql_backend.c : Added checks, removed dcb state changes.
mysql_client.c : Removed dcb state changes. Added checks.
mysql_common.c : Minor changes
telnetd.c : Removed state changes. Replaced some typecasts and pointer references with local variable reads.
skygw_debug.h : Removed two states, and added two to state printing macro.
2013-09-05 22:00:02 +03:00

380 lines
13 KiB
C

/*
* This file is distributed as part of the SkySQL Gateway. It is free
* software: you can redistribute it and/or modify it under the terms of the
* GNU General Public License as published by the Free Software Foundation,
* version 2.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
* details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc., 51
* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Copyright SkySQL Ab 2013
*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <errno.h>
#include <poll.h>
#include <dcb.h>
#include <atomic.h>
#include <gwbitmask.h>
#include <skygw_utils.h>
#include <log_manager.h>
/**
* @file poll.c - Abstraction of the epoll functionality
*
* @verbatim
* Revision History
*
* Date Who Description
* 19/06/13 Mark Riddoch Initial implementation
* 28/06/13 Mark Riddoch Added poll mask support and DCB
* zombie management
*
* @endverbatim
*/
static int epoll_fd = -1; /**< The epoll file descriptor; stays -1 until poll_init() has created it */
static int shutdown = 0; /**< Flag the shutdown of the poll subsystem: set by poll_shutdown(), checked by poll_waitevents() once per loop iteration.
* NOTE(review): no synchronization around this flag is visible in this file. */
/** Bitmask of running polling threads: poll_waitevents() sets the bit for its
* thread id on entry and clears it just before returning. */
static GWBITMASK poll_mask;
static simple_mutex_t epoll_wait_mutex; /**< serializes calls to epoll_wait in the non-blocking poll path */
/**
* The polling statistics
*
* Zeroed by poll_init(), incremented with atomic_add() in poll_waitevents()
* and reported by dprintPollStats().
*/
static struct {
int n_read; /**< Number of read events (EPOLLIN on a DCB that is not listening) */
int n_write; /**< Number of write events (EPOLLOUT) */
int n_error; /**< Number of error events (EPOLLERR) */
int n_hup; /**< Number of hangup events (EPOLLHUP) */
int n_accept; /**< Number of accept events (EPOLLIN on a DCB in DCB_STATE_LISTENING) */
int n_polls; /**< Number of poll cycles in which epoll_wait returned at least one event */
} pollStats;
/**
 * Set up the polling subsystem used by the gateway.
 *
 * The implementation is built on the Linux epoll mechanism. The call is a
 * no-op if the epoll descriptor has already been created. If the epoll
 * instance cannot be created the error is reported with perror() and the
 * process exits.
 */
void
poll_init()
{
    int fd;

    /* Already initialised: nothing to do */
    if (epoll_fd != -1)
    {
        return;
    }

    fd = epoll_create(MAX_EVENTS);
    epoll_fd = fd;
    if (fd == -1)
    {
        perror("epoll_create");
        exit(-1);
    }

    /* Reset the statistics and the per-thread bookkeeping */
    memset(&pollStats, 0, sizeof(pollStats));
    bitmask_init(&poll_mask);
    simple_mutex_init(&epoll_wait_mutex, "epoll_wait_mutex");
}
/**
 * Add a DCB to the set of descriptors within the polling
 * environment.
 *
 * Before the descriptor is registered with epoll the DCB state is moved to
 * DCB_STATE_LISTENING (service listeners) or DCB_STATE_POLLING (request
 * handlers). If the registration fails, the previous state is restored.
 * A DCB with any other role is registered without a state change, and in
 * that case no state is restored on failure either, so its current state is
 * never overwritten with a stale value.
 *
 * The descriptor is registered edge-triggered for both input and output.
 *
 * @param dcb   The descriptor to add to the poll
 * @return      -1 on error or 0 on success
 */
int
poll_add_dcb(DCB *dcb)
{
    int                rc;
    int                state_changed = 1;
    dcb_state_t        old_state = DCB_STATE_UNDEFINED;
    struct epoll_event ev;

    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    ev.data.ptr = dcb;

    /**
     * Service listeners have different state than
     * DCBs serving client requests.
     */
    if (dcb->dcb_role == DCB_ROLE_SERVICE_LISTENER) {
        dcb_set_state(dcb, DCB_STATE_LISTENING, &old_state);
    } else if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER) {
        dcb_set_state(dcb, DCB_STATE_POLLING, &old_state);
    } else {
        /* Unknown role: the state was left untouched, nothing to undo */
        state_changed = 0;
    }

    rc = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, dcb->fd, &ev);

    /* Roll back only a state change that was actually made above */
    if (rc != 0 && state_changed) {
        dcb_set_state(dcb, old_state, NULL);
    }
    return rc;
}
/**
 * Take a descriptor out of the set of descriptors monitored by the
 * polling environment.
 *
 * @param dcb   The descriptor to remove
 * @return      -1 on error or 0 on success
 */
int
poll_remove_dcb(DCB *dcb)
{
    /*
     * EPOLL_CTL_DEL ignores the event argument, but kernels before 2.6.9
     * require it to be non-NULL, hence the otherwise unused structure.
     */
    struct epoll_event unused_ev;
    int                rc;

    rc = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, dcb->fd, &unused_ev);
    return rc;
}
/*
 * Set BLOCKINGPOLL to 1 when running with a single thread to make debugging
 * easier: poll_waitevents() then blocks in epoll_wait() with no timeout
 * instead of using the zero-timeout/timed polling scheme.
 */
#define BLOCKINGPOLL 0
/**
* The main polling loop
*
* This routine does the polling and despatches of IO events
* to the DCB's. It may be called either directly or as the entry point
* of a polling thread within the gateway.
*
* The routine will loop as long as the variable "shutdown" is set to zero,
* setting this to a non-zero value will cause the polling loop to return.
*
* There are two options for the polling, a debug option that is only useful if
* you have a single thread. This blocks in epoll_wait until an event occurs.
*
* The non-debug option does an epoll with a time out. This allows the checking of
* shutdown value to be checked in all threads. The algorithm for polling in this
* mode is to do a poll with no-wait, if no events are detected then the poll is
* repeated with a time out. This allows for a quick check before making the call
* with timeout. The call with the timeout differs in that the Linux scheduler may
* deschedule a process if a timeout is included, but will not do this if a 0 timeout
* value is given. this improves performance when the gateway is under heavy load.
*
* @param arg The thread ID passed as a void * to satisfy the threading package
*/
void
poll_waitevents(void *arg)
{
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
bool no_op = FALSE;
/* Add this thread to the bitmask of running polling threads */
bitmask_set(&poll_mask, thread_id);
while (1)
{
#if BLOCKINGPOLL
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1)) == -1)
{
}
#else
if (!no_op) {
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] > epoll_wait <",
pthread_self());
no_op = TRUE;
}
simple_mutex_lock(&epoll_wait_mutex, TRUE);
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, 0)) == -1)
{
int eno = errno;
errno = 0;
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
no_op = FALSE;
}
else if (nfds == 0)
{
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT);
if (nfds == -1)
{
}
}
simple_mutex_unlock(&epoll_wait_mutex);
#endif
if (nfds > 0)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait found %d fds",
pthread_self(),
nfds);
atomic_add(&pollStats.n_polls, 1);
for (i = 0; i < nfds; i++)
{
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
CHK_DCB(dcb);
ss_debug(spinlock_acquire(&dcb->dcb_initlock);)
ss_dassert(dcb->state != DCB_STATE_ALLOC);
ss_dassert(dcb->state != DCB_STATE_DISCONNECTED);
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] event %d",
pthread_self(),
ev);
#if 0
if (DCB_ISZOMBIE(dcb))
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] dcb is "
"zombie",
pthread_self());
continue;
}
if (dcb->state == DCB_STATE_DISCONNECTED ||
dcb->state == DCB_STATE_PROCESSING)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] dcb state is "
"%s",
pthread_self(),
STRDCBSTATE(dcb->state));
continue;
}
#endif
if (ev & EPOLLERR)
{
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
if (DCB_ISZOMBIE(dcb)) {
continue;
}
}
if (ev & EPOLLHUP)
{
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
if (DCB_ISZOMBIE(dcb)) {
continue;
}
}
if (ev & EPOLLOUT)
{
simple_mutex_lock(&dcb->dcb_write_lock,
true);
ss_info_dassert(!dcb->dcb_write_active,
"Write already active");
dcb->dcb_write_active = TRUE;
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Write in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE;
simple_mutex_unlock(&dcb->dcb_write_lock);
}
if (ev & EPOLLIN)
{
simple_mutex_lock(&dcb->dcb_read_lock,
true);
ss_info_dassert(!dcb->dcb_read_active,
"Read already active");
dcb->dcb_read_active = TRUE;
if (dcb->state == DCB_STATE_LISTENING)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_accept, 1);
dcb->func.accept(dcb);
}
else
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Read in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
dcb->dcb_read_active = FALSE;
simple_mutex_unlock(
&dcb->dcb_read_lock);
}
} /**< for */
no_op = FALSE;
}
dcb_process_zombies(thread_id);
if (shutdown)
{
/**
* Remove this thread from the bitmask of running
* polling threads
*/
bitmask_clear(&poll_mask, thread_id);
return;
}
} /**< while(1) */
}
/**
 * Request the polling loop to stop.
 *
 * Raises the shutdown flag; each thread running poll_waitevents() checks
 * the flag at the end of a loop iteration and returns once it is non-zero.
 */
void
poll_shutdown()
{
    shutdown = 1;
}
/**
 * Give access to the bitmask of polling threads.
 *
 * The returned pointer refers to the file-scope bitmask itself, not to a
 * copy of it.
 *
 * @return The bitmask of the running polling threads
 */
GWBITMASK *
poll_bitmask()
{
    GWBITMASK *running_threads = &poll_mask;

    return running_threads;
}
/**
 * Debug routine that writes the polling statistics, one counter per line,
 * to the given DCB.
 *
 * @param dcb   DCB to print to
 */
void
dprintPollStats(DCB *dcb)
{
    /* Poll cycles first, then the per-event-type counters */
    dcb_printf(dcb, "Number of epoll cycles: %d\n",
               pollStats.n_polls);
    dcb_printf(dcb, "Number of read events: %d\n",
               pollStats.n_read);
    dcb_printf(dcb, "Number of write events: %d\n",
               pollStats.n_write);
    dcb_printf(dcb, "Number of error events: %d\n",
               pollStats.n_error);
    dcb_printf(dcb, "Number of hangup events: %d\n",
               pollStats.n_hup);
    dcb_printf(dcb, "Number of accept events: %d\n",
               pollStats.n_accept);
}