diff --git a/server/core/dcb.c b/server/core/dcb.c index d6d137e13..73d7948ce 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -95,6 +95,7 @@ DCB *rval; rval->data = NULL; rval->protocol = NULL; rval->session = NULL; + simple_mutex_init(&rval->mutex, "dcb mutex"); memset(&rval->stats, 0, sizeof(DCBSTATS)); // Zero the statistics bitmask_init(&rval->memdata.bitmask); rval->memdata.next = NULL; @@ -123,10 +124,11 @@ DCB *rval; */ void dcb_free(DCB *dcb) -{ +{ if (dcb->state == DCB_STATE_ZOMBIE) { - skygw_log_write( LOGFILE_ERROR, "Call to free a DCB that is already a zombie.\n"); + skygw_log_write(LOGFILE_ERROR, + "Call to free a DCB that is already a zombie.\n"); return; } @@ -155,7 +157,12 @@ dcb_free(DCB *dcb) } spinlock_release(&zombiespin); - + skygw_log_write( + LOGFILE_TRACE, + "%lu [dcb_free] Set dcb %p for fd %d DCB_STATE_ZOMBIE", + pthread_self(), + (unsigned long)dcb, + dcb->fd); dcb->state = DCB_STATE_ZOMBIE; } @@ -228,7 +235,7 @@ DCB *ptr, *lptr; ptr = zombies; lptr = NULL; while (ptr) - { + { bitmask_clear(&ptr->memdata.bitmask, threadid); if (bitmask_isallclear(&ptr->memdata.bitmask)) { @@ -248,6 +255,12 @@ DCB *ptr, *lptr; zombies = tptr; else lptr->memdata.next = tptr; + skygw_log_write( + LOGFILE_TRACE, + "%lu [dcb_process_zombies] Free dcb %p for fd %d", + pthread_self(), + (unsigned long)ptr, + ptr->fd); dcb_final_free(ptr); ptr = tptr; } @@ -286,7 +299,7 @@ GWPROTOCOL *funcs; { dcb_final_free(dcb); skygw_log_write( LOGFILE_ERROR, - "Failed to load protocol module for %s, feee dcb %p\n", protocol, dcb); + "Failed to load protocol module for %s, free dcb %p\n", protocol, dcb); return NULL; } memcpy(&(dcb->func), funcs, sizeof(GWPROTOCOL)); @@ -330,7 +343,6 @@ dcb_read(DCB *dcb, GWBUF **head) { GWBUF *buffer = NULL; int b, n = 0; -pthread_t tid = pthread_self(); ioctl(dcb->fd, FIONREAD, &b); while (b > 0) @@ -361,8 +373,8 @@ pthread_t tid = pthread_self(); skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_read] Read %d Bytes from %d", - tid, + "%lu [dcb_read] Read %d Bytes from fd %d", + pthread_self(), n, dcb->fd); // append read data to the gwbuf @@ -385,9 +397,8 @@ int dcb_write(DCB *dcb, GWBUF *queue) { int w, saved_errno = 0; -pthread_t tid = pthread_self(); - spinlock_acquire(&dcb->writeqlock); + spinlock_acquire(&dcb->writeqlock); if (dcb->writeq) { /* @@ -404,7 +415,7 @@ pthread_t tid = pthread_self(); skygw_log_write( LOGFILE_TRACE, "%lu [dcb_write] Append to writequeue. %d writes buffered for %d", - tid, + pthread_self(), dcb->stats.n_buffered, dcb->fd); } @@ -427,14 +438,14 @@ pthread_t tid = pthread_self(); { skygw_log_write( LOGFILE_ERROR, - "%lu [dcb_write] Write to %d failed, errno %d", - tid, + "%lu [dcb_write] Write to fd %d failed, errno %d", + pthread_self(), dcb->fd, saved_errno); skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_write] Write to %d failed, errno %d", - tid, + "%lu [dcb_write] Write to fd %d failed, errno %d", + pthread_self(), dcb->fd, saved_errno); @@ -452,8 +463,8 @@ pthread_t tid = pthread_self(); } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_write] Wrote %d Bytes to %d", - tid, + "%lu [dcb_write] Wrote %d Bytes to fd %d", + pthread_self(), w, dcb->fd); } @@ -489,7 +500,6 @@ dcb_drain_writeq(DCB *dcb) int n = 0; int w; int saved_errno = 0; -pthread_t tid = pthread_self(); spinlock_acquire(&dcb->writeqlock); if (dcb->writeq) @@ -510,14 +520,16 @@ pthread_t tid = pthread_self(); { skygw_log_write( LOGFILE_ERROR, - "%lu [dcb_drain_writeq] Write to %d failed, errno %d", - tid, + "%lu [dcb_drain_writeq] Write to fd %d failed, " + "errno %d", + pthread_self(), dcb->fd, saved_errno); skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_drain_writeq] Write to %d failed, errno %d", - tid, + "%lu [dcb_drain_writeq] Write to df %d failed, " + "errno %d", + pthread_self(), dcb->fd, saved_errno); @@ -535,8 +547,8 @@ pthread_t tid = pthread_self(); } skygw_log_write( LOGFILE_TRACE, - "%lu [dcb_drain_writeq] Wrote %d Bytes to %d", - tid, + "%lu [dcb_drain_writeq] Wrote %d Bytes to fd %d", + pthread_self(), w, dcb->fd); n += w; diff --git a/server/core/gateway.c b/server/core/gateway.c index fc1f01b3e..8c01b5a62 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -234,13 +234,49 @@ char buf[1024]; } -static libmysqld_done(void) +static void libmysqld_done(void) { if (libmysqld_started) { mysql_library_end(); } } +#if 0 +static char* set_home_and_variables( + int argc, + char** argv) +{ + int i; + int n; + char* home = NULL; + bool home_set = FALSE; + + for (i=1; i 0 && access(&argv[n][j], R_OK) == 0) { + home = strdup(&argv[n][j]); + goto return_home; + } + } + } + if ((home = getenv("MAXSCALE_HOME")) != NULL) + { + sprintf(mysql_home, "%s/mysql", home); + setenv("MYSQL_HOME", mysql_home, 1); + } + +return_home: + return home; + +} +#endif /** * The main entry point into the gateway @@ -259,18 +295,15 @@ char mysql_home[1024], buf[1024], *home, *cnf_file = NULL; char ddopt[1024]; void* log_flush_thr = NULL; ssize_t log_flush_timeout_ms = 0; - - int l; +int l; l = atexit(skygw_logmanager_exit); if (l != 0) { fprintf(stderr, "Couldn't register exit function.\n"); } - atexit(datadir_cleanup); - for (n = 0; n < argc; n++) { if (strcmp(argv[n], "-d") == 0) diff --git a/server/core/poll.c b/server/core/poll.c index c351b4c38..353da62a8 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -143,7 +143,6 @@ poll_waitevents(void *arg) struct epoll_event events[MAX_EVENTS]; int i, nfds; int thread_id = (int)arg; -pthread_t tid = pthread_self(); bool no_op = FALSE; /* Add this thread to the bitmask of running polling threads */ @@ -158,7 +157,7 @@ bool no_op = FALSE; if (!no_op) { skygw_log_write(LOGFILE_TRACE, "%lu [poll_waitevents] > epoll_wait <", - tid); + pthread_self()); no_op = TRUE; } @@ -167,15 +166,18 @@ bool no_op = FALSE; int eno = errno; errno = 0; skygw_log_write(LOGFILE_TRACE, - "%lu [poll_waitevents] epoll_wait returned %d, errno %d", - tid, + "%lu [poll_waitevents] epoll_wait returned " + "%d, errno %d", + pthread_self(), nfds, eno); no_op = FALSE; } else if (nfds == 0) { - if ((nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT)) == -1) + nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, EPOLL_TIMEOUT); + + if (nfds == -1) { } } @@ -185,7 +187,7 @@ bool no_op = FALSE; skygw_log_write( LOGFILE_TRACE, "%lu [poll_waitevents] epoll_wait found %d fds", - tid, + pthread_self(), nfds); atomic_add(&pollStats.n_polls, 1); @@ -193,18 +195,22 @@ bool no_op = FALSE; { DCB *dcb = (DCB *)events[i].data.ptr; __uint32_t ev = events[i].events; + simple_mutex_t* mutex = &dcb->mutex; + simple_mutex_lock(mutex, TRUE); + skygw_log_write( LOGFILE_TRACE, "%lu [poll_waitevents] event %d", - tid, + pthread_self(), ev); if (DCB_ISZOMBIE(dcb)) { skygw_log_write( LOGFILE_TRACE, "%lu [poll_waitevents] dcb is zombie", - tid); + pthread_self()); + simple_mutex_unlock(mutex); continue; } @@ -212,21 +218,26 @@ bool no_op = FALSE; { atomic_add(&pollStats.n_error, 1); dcb->func.error(dcb); - if (DCB_ISZOMBIE(dcb)) + if (DCB_ISZOMBIE(dcb)) { + simple_mutex_unlock(mutex); continue; + } } if (ev & EPOLLHUP) { atomic_add(&pollStats.n_hup, 1); dcb->func.hangup(dcb); - if (DCB_ISZOMBIE(dcb)) + if (DCB_ISZOMBIE(dcb)) { + simple_mutex_unlock(mutex); continue; + } } if (ev & EPOLLOUT) { skygw_log_write(LOGFILE_TRACE, - "%lu [poll_waitevents] Write in %d", - tid, + "%lu [poll_waitevents] " + "Write in fd %d", + pthread_self(), dcb->fd); atomic_add(&pollStats.n_write, 1); dcb->func.write_ready(dcb); @@ -237,8 +248,9 @@ bool no_op = FALSE; { skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] Accept in %d", - tid, + "%lu [poll_waitevents] " + "Accept in fd %d", + pthread_self(), dcb->fd); atomic_add(&pollStats.n_accept, 1); dcb->func.accept(dcb); @@ -247,13 +259,15 @@ bool no_op = FALSE; { skygw_log_write( LOGFILE_TRACE, - "%lu [poll_waitevents] Read in %d", - tid, + "%lu [poll_waitevents] " + "Read in fd %d", + pthread_self(), dcb->fd); atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); } } + simple_mutex_unlock(mutex); } /**< for */ no_op = FALSE; } diff --git a/server/include/dcb.h b/server/include/dcb.h index 31ae2e930..332bfbf4b 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -20,6 +20,7 @@ #include #include #include +#include struct session; struct server; @@ -133,6 +134,7 @@ typedef struct { * gateway may be selected to execute the required actions when a network event occurs. */ typedef struct dcb { + simple_mutex_t mutex; /**< Protects dcb processing. Coarse and temporary? */ int fd; /**< The descriptor */ int state; /**< Current descriptor state */ char *remote; /**< Address of remote end */ diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index db44ff8ca..930daeff4 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -28,7 +28,8 @@ * 17/06/2013 Massimiliano Pinto Added Client To Gateway routines * 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable */ - +#include +#include #include static char *version_str = "V1.0.0"; @@ -480,23 +481,40 @@ int w, saved_errno = 0; * Client read event triggered by EPOLLIN * * @param dcb Descriptor control block - * @return TRUE on error + * @return 0 if succeed, 1 otherwise */ int gw_read_client_event(DCB* dcb) { SESSION *session = NULL; ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; - MySQLProtocol *protocol = NULL; - int b = -1; + MySQLProtocol *protocol = NULL; + int b = -1; if (dcb) { protocol = DCB_PROTOCOL(dcb, MySQLProtocol); } if (ioctl(dcb->fd, FIONREAD, &b)) { - fprintf(stderr, "Client Ioctl FIONREAD error for %i: errno %i, %s\n", dcb->fd, errno , strerror(errno)); - return 1; + int eno = errno; + errno = 0; + skygw_log_write( + LOGFILE_ERROR, + "%lu [gw_read_client_event] Setting FIONREAD for %d failed. " + "errno %d, %s", + pthread_self(), + dcb->fd, + eno , + strerror(eno)); + skygw_log_write( + LOGFILE_TRACE, + "%lu [gw_read_client_event] Setting FIONREAD for %d failed. " + "errno %d, %s", + pthread_self(), + dcb->fd, + eno , + strerror(eno)); + return 1; } else { //fprintf(stderr, "Client IOCTL FIONREAD bytes to read = %i\n", b); }