Added simple_mutex_t dcb->mutex to DCB struct to protect processing of single dcb in poll.c:poll_waitevents. This is a coarse locking but seems to work with multiple threads at least with very simple load.

Added trace logging to many locations.
This commit is contained in:
vraatikka 2013-08-16 23:52:10 +03:00
parent 7c0ed171a0
commit 97b3da1d79
5 changed files with 130 additions and 51 deletions

View File

@ -95,6 +95,7 @@ DCB *rval;
rval->data = NULL;
rval->protocol = NULL;
rval->session = NULL;
simple_mutex_init(&rval->mutex, "dcb mutex");
memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics
bitmask_init(&rval->memdata.bitmask);
rval->memdata.next = NULL;
@ -123,10 +124,11 @@ DCB *rval;
*/
void
dcb_free(DCB *dcb)
{
{
if (dcb->state == DCB_STATE_ZOMBIE)
{
skygw_log_write( LOGFILE_ERROR, "Call to free a DCB that is already a zombie.\n");
skygw_log_write(LOGFILE_ERROR,
"Call to free a DCB that is already a zombie.\n");
return;
}
@ -155,7 +157,12 @@ dcb_free(DCB *dcb)
}
spinlock_release(&zombiespin);
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_free] Set dcb %p for fd %d DCB_STATE_ZOMBIE",
pthread_self(),
(unsigned long)dcb,
dcb->fd);
dcb->state = DCB_STATE_ZOMBIE;
}
@ -228,7 +235,7 @@ DCB *ptr, *lptr;
ptr = zombies;
lptr = NULL;
while (ptr)
{
{
bitmask_clear(&ptr->memdata.bitmask, threadid);
if (bitmask_isallclear(&ptr->memdata.bitmask))
{
@ -248,6 +255,12 @@ DCB *ptr, *lptr;
zombies = tptr;
else
lptr->memdata.next = tptr;
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_process_zombies] Free dcb %p for fd %d",
pthread_self(),
(unsigned long)ptr,
ptr->fd);
dcb_final_free(ptr);
ptr = tptr;
}
@ -286,7 +299,7 @@ GWPROTOCOL *funcs;
{
dcb_final_free(dcb);
skygw_log_write( LOGFILE_ERROR,
"Failed to load protocol module for %s, feee dcb %p\n", protocol, dcb);
"Failed to load protocol module for %s, free dcb %p\n", protocol, dcb);
return NULL;
}
memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL));
@ -330,7 +343,6 @@ dcb_read(DCB *dcb, GWBUF **head)
{
GWBUF *buffer = NULL;
int b, n = 0;
pthread_t tid = pthread_self();
ioctl(dcb->fd, FIONREAD, &b);
while (b > 0)
@ -361,8 +373,8 @@ pthread_t tid = pthread_self();
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_read] Read %d Bytes from %d",
tid,
"%lu [dcb_read] Read %d Bytes from fd %d",
pthread_self(),
n,
dcb->fd);
// append read data to the gwbuf
@ -385,9 +397,8 @@ int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w, saved_errno = 0;
pthread_t tid = pthread_self();
spinlock_acquire(&dcb->writeqlock);
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
{
/*
@ -404,7 +415,7 @@ pthread_t tid = pthread_self();
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_write] Append to writequeue. %d writes buffered for %d",
tid,
pthread_self(),
dcb->stats.n_buffered,
dcb->fd);
}
@ -427,14 +438,14 @@ pthread_t tid = pthread_self();
{
skygw_log_write(
LOGFILE_ERROR,
"%lu [dcb_write] Write to %d failed, errno %d",
tid,
"%lu [dcb_write] Write to fd %d failed, errno %d",
pthread_self(),
dcb->fd,
saved_errno);
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_write] Write to %d failed, errno %d",
tid,
"%lu [dcb_write] Write to fd %d failed, errno %d",
pthread_self(),
dcb->fd,
saved_errno);
@ -452,8 +463,8 @@ pthread_t tid = pthread_self();
}
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_write] Wrote %d Bytes to %d",
tid,
"%lu [dcb_write] Wrote %d Bytes to fd %d",
pthread_self(),
w,
dcb->fd);
}
@ -489,7 +500,6 @@ dcb_drain_writeq(DCB *dcb)
int n = 0;
int w;
int saved_errno = 0;
pthread_t tid = pthread_self();
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
@ -510,14 +520,16 @@ pthread_t tid = pthread_self();
{
skygw_log_write(
LOGFILE_ERROR,
"%lu [dcb_drain_writeq] Write to %d failed, errno %d",
tid,
"%lu [dcb_drain_writeq] Write to fd %d failed, "
"errno %d",
pthread_self(),
dcb->fd,
saved_errno);
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_drain_writeq] Write to %d failed, errno %d",
tid,
"%lu [dcb_drain_writeq] Write to df %d failed, "
"errno %d",
pthread_self(),
dcb->fd,
saved_errno);
@ -535,8 +547,8 @@ pthread_t tid = pthread_self();
}
skygw_log_write(
LOGFILE_TRACE,
"%lu [dcb_drain_writeq] Wrote %d Bytes to %d",
tid,
"%lu [dcb_drain_writeq] Wrote %d Bytes to fd %d",
pthread_self(),
w,
dcb->fd);
n += w;

View File

@ -234,13 +234,49 @@ char buf[1024];
}
static libmysqld_done(void)
static void libmysqld_done(void)
{
if (libmysqld_started) {
mysql_library_end();
}
}
#if 0
static char* set_home_and_variables(
int argc,
char** argv)
{
int i;
int n;
char* home = NULL;
bool home_set = FALSE;
for (i=1; i<argc; i++) {
if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--path") == 0){
int j = 0;
while (argv[n][j] == 0 && j<10) j++;
if (strnlen(&argv[n][j], 1) == 0) {
if (strnlen(&argv[n][j], 1) > 0 && access(&argv[n][j], R_OK) == 0) {
home = strdup(&argv[n][j]);
goto return_home;
}
}
}
if ((home = getenv("MAXSCALE_HOME")) != NULL)
{
sprintf(mysql_home, "%s/mysql", home);
setenv("MYSQL_HOME", mysql_home, 1);
}
return_home:
return home;
}
#endif
/**
* The main entry point into the gateway
@ -259,18 +295,15 @@ char mysql_home[1024], buf[1024], *home, *cnf_file = NULL;
char ddopt[1024];
void* log_flush_thr = NULL;
ssize_t log_flush_timeout_ms = 0;
int l;
int l;
l = atexit(skygw_logmanager_exit);
if (l != 0) {
fprintf(stderr, "Couldn't register exit function.\n");
}
atexit(datadir_cleanup);
for (n = 0; n < argc; n++)
{
if (strcmp(argv[n], "-d") == 0)

View File

@ -143,7 +143,6 @@ poll_waitevents(void *arg)
struct epoll_event events[MAX_EVENTS];
int i, nfds;
int thread_id = (int)arg;
pthread_t tid = pthread_self();
bool no_op = FALSE;
/* Add this thread to the bitmask of running polling threads */
@ -158,7 +157,7 @@ bool no_op = FALSE;
if (!no_op) {
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] > epoll_wait <",
tid);
pthread_self());
no_op = TRUE;
}
@ -167,15 +166,18 @@ bool no_op = FALSE;
int eno = errno;
errno = 0;
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait returned %d, errno %d",
tid,
"%lu [poll_waitevents] epoll_wait returned "
"%d, errno %d",
pthread_self(),
nfds,
eno);
no_op = FALSE;
}
else if (nfds == 0)
{
if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT)) == -1)
nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT);
if (nfds == -1)
{
}
}
@ -185,7 +187,7 @@ bool no_op = FALSE;
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] epoll_wait found %d fds",
tid,
pthread_self(),
nfds);
atomic_add(&pollStats.n_polls, 1);
@ -193,18 +195,22 @@ bool no_op = FALSE;
{
DCB *dcb = (DCB *)events[i].data.ptr;
__uint32_t ev = events[i].events;
simple_mutex_t* mutex = &dcb->mutex;
simple_mutex_lock(mutex, TRUE);
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] event %d",
tid,
pthread_self(),
ev);
if (DCB_ISZOMBIE(dcb))
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] dcb is zombie",
tid);
pthread_self());
simple_mutex_unlock(mutex);
continue;
}
@ -212,21 +218,26 @@ bool no_op = FALSE;
{
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
if (DCB_ISZOMBIE(dcb))
if (DCB_ISZOMBIE(dcb)) {
simple_mutex_unlock(mutex);
continue;
}
}
if (ev & EPOLLHUP)
{
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
if (DCB_ISZOMBIE(dcb))
if (DCB_ISZOMBIE(dcb)) {
simple_mutex_unlock(mutex);
continue;
}
}
if (ev & EPOLLOUT)
{
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] Write in %d",
tid,
"%lu [poll_waitevents] "
"Write in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb);
@ -237,8 +248,9 @@ bool no_op = FALSE;
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] Accept in %d",
tid,
"%lu [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_accept, 1);
dcb->func.accept(dcb);
@ -247,13 +259,15 @@ bool no_op = FALSE;
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] Read in %d",
tid,
"%lu [poll_waitevents] "
"Read in fd %d",
pthread_self(),
dcb->fd);
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);
}
}
simple_mutex_unlock(mutex);
} /**< for */
no_op = FALSE;
}

View File

@ -20,6 +20,7 @@
#include <spinlock.h>
#include <buffer.h>
#include <gwbitmask.h>
#include <skygw_utils.h>
struct session;
struct server;
@ -133,6 +134,7 @@ typedef struct {
* gateway may be selected to execute the required actions when a network event occurs.
*/
typedef struct dcb {
simple_mutex_t mutex; /**< Protects dcb processing. Coarse and temporary? */
int fd; /**< The descriptor */
int state; /**< Current descriptor state */
char *remote; /**< Address of remote end */

View File

@ -28,7 +28,8 @@
* 17/06/2013 Massimiliano Pinto Added Client To Gateway routines
* 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable
*/
#include <skygw_utils.h>
#include <log_manager.h>
#include <mysql_client_server_protocol.h>
static char *version_str = "V1.0.0";
@ -480,23 +481,40 @@ int w, saved_errno = 0;
* Client read event triggered by EPOLLIN
*
* @param dcb Descriptor control block
* @return TRUE on error
* @return 0 if succeed, 1 otherwise
*/
int gw_read_client_event(DCB* dcb) {
SESSION *session = NULL;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
MySQLProtocol *protocol = NULL;
int b = -1;
MySQLProtocol *protocol = NULL;
int b = -1;
if (dcb) {
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
}
if (ioctl(dcb->fd, FIONREAD, &b)) {
fprintf(stderr, "Client Ioctl FIONREAD error for %i: errno %i, %s\n", dcb->fd, errno , strerror(errno));
return 1;
int eno = errno;
errno = 0;
skygw_log_write(
LOGFILE_ERROR,
"%lu [gw_read_client_event] Setting FIONREAD for %d failed. "
"errno %d, %s",
pthread_self(),
dcb->fd,
eno ,
strerror(eno));
skygw_log_write(
LOGFILE_TRACE,
"%lu [gw_read_client_event] Setting FIONREAD for %d failed. "
"errno %d, %s",
pthread_self(),
dcb->fd,
eno ,
strerror(eno));
return 1;
} else {
//fprintf(stderr, "Client IOCTL FIONREAD bytes to read = %i\n", b);
}