diff --git a/client/Makefile b/client/Makefile index 4c1982e5d..216ef23ca 100644 --- a/client/Makefile +++ b/client/Makefile @@ -18,10 +18,19 @@ # Date Who Description # 13/06/14 Mark Riddoch Initial implementation of MaxScale # client program +# 18/06/14 Mark Riddoch Addition of conditional for histedit + +ifeq ($(wildcard /usr/include/histedit.h), ) +HISTLIB= +HISTFLAG= +else +HISTLIB=-ledit +HISTFLAG=-DHISTORY +endif CC=cc -CFLAGS=-c -Wall -g +CFLAGS=-c -Wall -g $(HISTFLAG) SRCS= maxadmin.c @@ -29,7 +38,7 @@ HDRS= OBJ=$(SRCS:.c=.o) -LIBS=-ledit +LIBS=$(HISTLIB) all: maxadmin diff --git a/client/maxadmin.c b/client/maxadmin.c index 4065a971d..23c3eee04 100644 --- a/client/maxadmin.c +++ b/client/maxadmin.c @@ -24,6 +24,7 @@ * * Date Who Description * 13/06/14 Mark Riddoch Initial implementation + * 15/06/14 Mark Riddoch Addition of source command * * @endverbatim */ @@ -45,13 +46,17 @@ #include #include +#ifdef HISTORY #include +#endif static int connectMaxScale(char *hostname, char *port); static int setipaddress(struct in_addr *a, char *p); static int authMaxScale(int so, char *user, char *password); static int sendCommand(int so, char *cmd); +static void DoSource(int so, char *cmd); +#ifdef HISTORY static char * prompt(EditLine *el __attribute__((__unused__))) { @@ -59,17 +64,22 @@ prompt(EditLine *el __attribute__((__unused__))) return prompt; } +#endif int main(int argc, char **argv) { -EditLine *el = NULL; int i, num, rv, fatal = 0; +#ifdef HISTORY char *buf; +EditLine *el = NULL; Tokenizer *tok; History *hist; HistEvent ev; const LineInfo *li; +#else +char buf[1024]; +#endif char *hostname = "localhost"; char *port = "6603"; char *user = "admin"; @@ -194,7 +204,7 @@ char *cmd; } (void) setlocale(LC_CTYPE, ""); - +#ifdef HISTORY hist = history_init(); /* Init the builtin history */ /* Remember 100 events */ history(hist, &ev, H_SETSIZE, 100); @@ -225,12 +235,19 @@ char *cmd; while ((buf = el_gets(el, &num)) != NULL && num != 0) { +#else + while (printf("MaxScale> ") && fgets(buf, 1024, stdin) != NULL) + { + num = strlen(buf); +#endif /* Strip trailing \n\r */ for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--) buf[i] = 0; +#ifdef HISTORY li = el_line(el); history(hist, &ev, H_ENTER, buf); +#endif if (!strcasecmp(buf, "quit")) { @@ -238,10 +255,18 @@ char *cmd; } else if (!strcasecmp(buf, "history")) { +#ifdef HISTORY for (rv = history(hist, &ev, H_LAST); rv != -1; rv = history(hist, &ev, H_PREV)) fprintf(stdout, "%4d %s\n", ev.num, ev.str); +#else + fprintf(stderr, "History not supported in this version.\n"); +#endif + } + else if (!strncasecmp(buf, "source", 6)) + { + DoSource(so, buf); } else if (*buf) { @@ -249,9 +274,11 @@ char *cmd; } } +#ifdef HISTORY el_end(el); tok_end(tok); history_end(hist); +#endif close(so); return 0; } @@ -371,3 +398,44 @@ int i; } return 1; } + +static void +DoSource(int so, char *buf) +{ +char *ptr, *pe; +char line[132]; +FILE *fp; + + /* Find the filename */ + ptr = &buf[strlen("source")]; + while (*ptr && isspace(*ptr)) + ptr++; + + if ((fp = fopen(ptr, "r")) == NULL) + { + fprintf(stderr, "Unable to open command file '%s'.\n", + ptr); + return; + } + + while ((ptr = fgets(line, 132, fp)) != NULL) + { + /* Strip tailing newlines */ + pe = &ptr[strlen(ptr)-1]; + while (pe >= ptr && (*pe == '\r' || *pe == '\n')) + { + *pe = '\0'; + pe--; + } + + if (*ptr != '#') /* Comment */ + { + if (! sendCommand(so, ptr)) + { + break; + } + } + } + fclose(fp); + return; +} diff --git a/maxscale.spec b/maxscale.spec index 9ab55ef41..7d7b8634d 100644 --- a/maxscale.spec +++ b/maxscale.spec @@ -1,7 +1,7 @@ %define _topdir %(echo $PWD)/ %define name maxscale -%define release ##RELEASE_TAG## -%define version ##VERSION_TAG## +%define release 1 +%define version 0.7 %define install_path /usr/local/sbin/ BuildRoot: %{buildroot} @@ -14,7 +14,10 @@ Source: %{name}-%{version}-%{release}.tar.gz Prefix: / Group: Development/Tools #Requires: -BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio MariaDB-devel MariaDB-server +BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel MariaDB-devel MariaDB-server +%if 0%{?rhel} == 6 +BuildRequires: libedit-devel +%endif %description MaxScale @@ -24,7 +27,7 @@ MaxScale %setup -q %build -ln -s /lib64/libaio.so.1 /lib64/libaio.so +#ln -s /lib64/libaio.so.1 /lib64/libaio.so make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 clean make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 depend make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index aafd746ce..2da6d4ad5 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -116,9 +116,7 @@ skygw_query_type_t skygw_query_classifier_get_type( query_str = const_cast(query); LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "%lu [skygw_query_classifier_get_type] Query : \"%s\"", - pthread_self(), - query_str))); + "Query : \"%s\"", query_str))); /** Get server handle */ mysql = mysql_init(NULL); diff --git a/server/core/buffer.c b/server/core/buffer.c index 11fb5b556..db6da8bec 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -308,7 +308,7 @@ bool gwbuf_set_type( case GWBUF_TYPE_MYSQL: case GWBUF_TYPE_PLAINSQL: case GWBUF_TYPE_UNDEFINED: - buf->gwbuf_type = type; + buf->gwbuf_type |= type; succp = true; break; default: diff --git a/server/core/config.c b/server/core/config.c index 988e263fd..8fd53318b 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -469,7 +469,7 @@ int error_count = 0; s = strtok(NULL, ","); } } - if (filters) + if (filters && obj->element) { serviceSetFilters(obj->element, filters); } @@ -1090,7 +1090,7 @@ SERVER *server; s = strtok(NULL, ","); } } - if (filters) + if (filters && obj->element) serviceSetFilters(obj->element, filters); } else if (!strcmp(type, "listener")) diff --git a/server/core/dbusers.c b/server/core/dbusers.c index 1ef765bc2..dd36d683c 100644 --- a/server/core/dbusers.c +++ b/server/core/dbusers.c @@ -213,9 +213,9 @@ getUsers(SERVICE *service, struct users *users) "Exiting."))); return -1; } - /* - * Attempt to connect to each database in the service in turn until - * we find one that we can connect to or until we run out of databases + /** + * Attempt to connect to one of the databases database or until we run + * out of databases * to try */ server = service->databases; diff --git a/server/core/dcb.c b/server/core/dcb.c index 5417abe80..52585e9d7 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -83,6 +83,7 @@ static bool dcb_set_state_nomutex( const dcb_state_t new_state, dcb_state_t* old_state); static void dcb_call_callback(DCB *dcb, DCB_REASON reason); +static DCB* dcb_get_next (DCB* dcb); DCB* dcb_get_zombies(void) { @@ -109,6 +110,7 @@ DCB *rval; #if defined(SS_DEBUG) rval->dcb_chk_top = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB; + rval->dcb_errhandle_called = false; #endif rval->dcb_role = role; #if 1 @@ -132,6 +134,9 @@ DCB *rval; rval->next = NULL; rval->callbacks = NULL; + rval->remote = NULL; + rval->user = NULL; + spinlock_acquire(&dcbspin); if (allDCBs == NULL) allDCBs = rval; @@ -148,7 +153,7 @@ DCB *rval; /** - * Free a DCB that has not been associated with a decriptor. + * Free a DCB that has not been associated with a descriptor. * * @param dcb The DCB to free */ @@ -311,6 +316,8 @@ DCB_CALLBACK *cb; free(dcb->data); if (dcb->remote) free(dcb->remote); + if (dcb->user) + free(dcb->user); /* Clear write and read buffers */ if (dcb->delayq) { @@ -559,7 +566,8 @@ int rc; dcb->fd = fd; /** Copy status field to DCB */ dcb->dcb_server_status = server->status; - + ss_debug(dcb->dcb_port = server->port;) + /*< * backend_dcb is connected to backend server, and once backend_dcb * is added to poll set, authentication takes place as part of @@ -594,26 +602,29 @@ int rc; * * @param dcb The DCB to read from * @param head Pointer to linked list to append data to - * @return -1 on error, otherwise the number of read bytes on the last. - * 0 is returned if no data available on the last iteration of while loop. + * @return -1 on error, otherwise the number of read bytes on the last + * iteration of while loop. 0 is returned if no data available. */ -int -dcb_read(DCB *dcb, GWBUF **head) +int dcb_read( + DCB *dcb, + GWBUF **head) { -GWBUF *buffer = NULL; -int b; -int rc; -int n = 0; -int eno = 0; - + GWBUF *buffer = NULL; + int b; + int rc; + int n ; + int nread = 0; + int eno = 0; + CHK_DCB(dcb); while (true) - { - int bufsize; - + { + int bufsize; + rc = ioctl(dcb->fd, FIONREAD, &b); - - if (rc == -1) { + + if (rc == -1) + { eno = errno; errno = 0; LOGIF(LE, (skygw_log_write_flush( @@ -628,19 +639,39 @@ int eno = 0; n = -1; goto return_n; } - /*< Nothing to read - leave */ - if (b == 0) { + + if (b == 0 && nread == 0) + { + /** Handle closed client socket */ + if (dcb_isclient(dcb)) + { + char c; + int l_errno = 0; + int r = -1; + + /* try to read 1 byte, without consuming the socket buffer */ + r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); + l_errno = errno; + + if (r <= 0 && + l_errno != EAGAIN && + l_errno != EWOULDBLOCK) + { + n = -1; + goto return_n; + } + } n = 0; goto return_n; } bufsize = MIN(b, MAX_BUFFER_SIZE); - - if ((buffer = gwbuf_alloc(bufsize)) == NULL) - { + + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { /*< - * This is a fatal error which should cause shutdown. - * Todo shutdown if memory allocation fails. - */ + * This is a fatal error which should cause shutdown. + * Todo shutdown if memory allocation fails. + */ LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Failed to allocate read buffer " @@ -653,16 +684,17 @@ int eno = 0; n = -1; ss_dassert(buffer != NULL); goto return_n; - } - GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); - dcb->stats.n_reads++); - - if (n <= 0) - { + } + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); + dcb->stats.n_reads++); + + if (n <= 0) + { int eno = errno; errno = 0; - - if (eno != EAGAIN && eno != EWOULDBLOCK) { + + if (eno != 0 && eno != EAGAIN && eno != EWOULDBLOCK) + { LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Read failed, dcb %p in state " @@ -673,18 +705,11 @@ int eno = 0; eno, strerror(eno)))); } - else - { - /*< - * If read would block it means that other thread - * has probably read the data. - */ - n = 0; - } - - gwbuf_free(buffer); + gwbuf_free(buffer); goto return_n; } + nread += n; + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [dcb_read] Read %d bytes from dcb %p in state %s " @@ -694,14 +719,13 @@ int eno = 0; dcb, STRDCBSTATE(dcb->state), dcb->fd))); - /*< Append read data to the gwbuf */ - *head = gwbuf_append(*head, buffer); - } /*< while (true) */ + /*< Append read data to the gwbuf */ + *head = gwbuf_append(*head, buffer); + } /*< while (true) */ return_n: - return n; + return n; } - /** * General purpose routine to write to a DCB * @@ -711,7 +735,7 @@ return_n: int dcb_write(DCB *dcb, GWBUF *queue) { -int w, qlen; +int w; int saved_errno = 0; int below_water; @@ -760,26 +784,26 @@ int below_water; * not have a race condition on the event. */ if (queue) - qlen = gwbuf_length(queue); - else - qlen = 0; - atomic_add(&dcb->writeqlen, qlen); - dcb->writeq = gwbuf_append(dcb->writeq, queue); - dcb->stats.n_buffered++; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [dcb_write] Append to writequeue. %d writes " - "buffered for dcb %p in state %s fd %d", - pthread_self(), - dcb->stats.n_buffered, - dcb, - STRDCBSTATE(dcb->state), - dcb->fd))); + { + int qlen; + + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); + dcb->writeq = gwbuf_append(dcb->writeq, queue); + dcb->stats.n_buffered++; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Append to writequeue. %d writes " + "buffered for dcb %p in state %s fd %d", + pthread_self(), + dcb->stats.n_buffered, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + } } else { - int len; - /* * Loop over the buffer chain that has been passed to us * from the reading side. @@ -788,6 +812,7 @@ int below_water; */ while (queue != NULL) { + int qlen; #if defined(SS_DEBUG) if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && dcb->session != NULL) @@ -805,13 +830,13 @@ int below_water; } } #endif /* SS_DEBUG */ - len = GWBUF_LENGTH(queue); + qlen = GWBUF_LENGTH(queue); GW_NOINTR_CALL( w = gw_write( #if defined(SS_DEBUG) dcb, #endif - dcb->fd, GWBUF_DATA(queue), len); + dcb->fd, GWBUF_DATA(queue), qlen); dcb->stats.n_writes++; ); @@ -822,37 +847,39 @@ int below_water; if (LOG_IS_ENABLED(LOGFILE_DEBUG)) { - if (saved_errno == EPIPE) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [dcb_write] Write to dcb " - "%p in state %s fd %d failed " - "due errno %d, %s", - pthread_self(), - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror(saved_errno)))); + if (saved_errno == EPIPE) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due errno %d, %s", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)))); } } + if (LOG_IS_ENABLED(LOGFILE_ERROR)) { if (saved_errno != EPIPE && saved_errno != EAGAIN && - saved_errno != EWOULDBLOCK) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Write to dcb %p in " - "state %s fd %d failed due " - "errno %d, %s", - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror(saved_errno)))); - } + saved_errno != EWOULDBLOCK) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write to dcb %p in " + "state %s fd %d failed due " + "errno %d, %s", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)))); + } } break; } @@ -876,20 +903,15 @@ int below_water; * for suspended write. */ dcb->writeq = queue; - if (queue) + + if (queue) { + int qlen; + qlen = gwbuf_length(queue); - } - else - { - qlen = 0; - } - atomic_add(&dcb->writeqlen, qlen); - - if (queue != NULL) - { - dcb->stats.n_buffered++; - } + atomic_add(&dcb->writeqlen, qlen); + dcb->stats.n_buffered++; + } } /* if (dcb->writeq) */ if (saved_errno != 0 && @@ -937,10 +959,10 @@ int above_water; above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; spinlock_acquire(&dcb->writeqlock); - if (dcb->writeq) + + if (dcb->writeq) { int len; - /* * Loop over the buffer chain in the pending writeq * Send as much of the data in that chain as possible and @@ -996,16 +1018,17 @@ int above_water; } spinlock_release(&dcb->writeqlock); atomic_add(&dcb->writeqlen, -n); - /* The write queue has drained, potentially need to call a callback function */ + + /* The write queue has drained, potentially need to call a callback function */ if (dcb->writeq == NULL) dcb_call_callback(dcb, DCB_REASON_DRAINED); - if (above_water && dcb->writeqlen < dcb->low_water) + + if (above_water && dcb->writeqlen < dcb->low_water) { atomic_add(&dcb->stats.n_low_water, 1); dcb_call_callback(dcb, DCB_REASON_LOW_WATER); } - return n; } @@ -1024,13 +1047,15 @@ void dcb_close(DCB *dcb) { int rc; + CHK_DCB(dcb); /*< * dcb_close may be called for freshly created dcb, in which case * it only needs to be freed. */ - if (dcb->state == DCB_STATE_ALLOC) { + if (dcb->state == DCB_STATE_ALLOC) + { dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_final_free(dcb); return; @@ -1041,13 +1066,19 @@ dcb_close(DCB *dcb) dcb->state == DCB_STATE_ZOMBIE); /*< - * Stop dcb's listening and modify state accordingly. - */ + * Stop dcb's listening and modify state accordingly. + */ rc = poll_remove_dcb(dcb); - - ss_dassert(dcb->state == DCB_STATE_NOPOLLING || - dcb->state == DCB_STATE_ZOMBIE); + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); + /** + * close protocol and router session + */ + if (dcb->func.close != NULL) + { + dcb->func.close(dcb); + } dcb_call_callback(dcb, DCB_REASON_CLOSE); if (rc == 0) { @@ -1068,7 +1099,8 @@ dcb_close(DCB *dcb) STRDCBSTATE(dcb->state)))); } - if (dcb->state == DCB_STATE_NOPOLLING) { + if (dcb->state == DCB_STATE_NOPOLLING) + { dcb_add_to_zombieslist(dcb); } } @@ -1086,6 +1118,8 @@ printDCB(DCB *dcb) printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); if (dcb->remote) printf("\tConnected to: %s\n", dcb->remote); + if (dcb->user) + printf("\tUsername to: %s\n", dcb->user); if (dcb->writeq) printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); printf("\tStatistics:\n"); @@ -1143,6 +1177,9 @@ DCB *dcb; if (dcb->remote) dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); + if (dcb->user) + dcb_printf(pdcb, "\tUsername: %s\n", + dcb->user); if (dcb->writeq) dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); @@ -1170,6 +1207,8 @@ DCB *dcb; spinlock_acquire(&dcbspin); dcb = allDCBs; + dcb_printf(pdcb, "Descriptor Control Blocks\n"); + dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n", "DCB", "State", "Service", "Remote"); dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); @@ -1182,7 +1221,7 @@ DCB *dcb; (dcb->remote ? dcb->remote : "")); dcb = dcb->next; } - dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); + dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n\n"); spinlock_release(&dcbspin); } @@ -1450,7 +1489,7 @@ static bool dcb_set_state_nomutex( } /*< switch (dcb->state) */ if (succp) { - LOGIF(LD, (skygw_log_write( + LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s", pthread_self(), @@ -1566,7 +1605,10 @@ int gw_write( * @return Non-zero (true) if the callback was added */ int -dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) +dcb_add_callback( + DCB *dcb, + DCB_REASON reason, + int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) { DCB_CALLBACK *cb, *ptr; int rval = 1; @@ -1637,7 +1679,7 @@ int rval = 0; if (cb->reason == reason && cb->cb == callback && cb->userdata == userdata) { - if (pcb == NULL) + if (pcb != NULL) pcb->next = cb->next; else dcb->callbacks = cb->next; @@ -1711,3 +1753,72 @@ int rval = 0; return rval; } + +static DCB* dcb_get_next ( + DCB* dcb) +{ + DCB* p; + + spinlock_acquire(&dcbspin); + + p = allDCBs; + + if (dcb == NULL || p == NULL) + { + dcb = p; + + } + else + { + while (p != NULL && dcb != p) + { + p = p->next; + } + + if (p != NULL) + { + dcb = p->next; + } + else + { + dcb = NULL; + } + } + spinlock_release(&dcbspin); + + return dcb; +} + +void dcb_call_foreach ( + SERVER* srv, + DCB_REASON reason) +{ + switch (reason) { + case DCB_REASON_CLOSE: + case DCB_REASON_DRAINED: + case DCB_REASON_HIGH_WATER: + case DCB_REASON_LOW_WATER: + case DCB_REASON_ERROR: + case DCB_REASON_HUP: + case DCB_REASON_NOT_RESPONDING: + { + DCB* dcb; + dcb = dcb_get_next(NULL); + + while (dcb != NULL) + { + if (dcb->state == DCB_STATE_POLLING) + { + dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING); + } + dcb = dcb_get_next(dcb); + } + break; + } + + default: + break; + } + return; +} + diff --git a/server/core/filter.c b/server/core/filter.c index e077cbccf..8381e782b 100644 --- a/server/core/filter.c +++ b/server/core/filter.c @@ -220,6 +220,8 @@ int i; ptr = allFilters; if (ptr) { + dcb_printf(dcb, "Filters\n"); + dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); dcb_printf(dcb, "%-18s | %-15s | Options\n", "Filter", "Module"); dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); @@ -234,7 +236,7 @@ int i; ptr = ptr->next; } if (allFilters) - dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); + dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n\n"); spinlock_release(&filter_spin); } @@ -333,7 +335,7 @@ DOWNSTREAM *me; return NULL; } me->instance = filter->filter; - me->routeQuery = filter->obj->routeQuery; + me->routeQuery = (void *)(filter->obj->routeQuery); me->session = filter->obj->newSession(me->instance, session); filter->obj->setDownstream(me->instance, me->session, downstream); diff --git a/server/core/gateway.c b/server/core/gateway.c index 2bd592fe7..25c01e9b2 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -52,6 +52,7 @@ #include #include +#include #include #include #include diff --git a/server/core/gw_utils.c b/server/core/gw_utils.c index 0507e3d1c..2662ab41d 100644 --- a/server/core/gw_utils.c +++ b/server/core/gw_utils.c @@ -130,6 +130,7 @@ setipaddress(struct in_addr *a, char *p) { return 1; } #endif + return 0; } /** @@ -157,62 +158,6 @@ void gw_daemonize(void) { } } -///////////////////////////////////////////////// -// Read data from dcb and store it in the gwbuf -///////////////////////////////////////////////// -int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { - GWBUF *buffer = NULL; - int n = -1; - - if (b <= 0) { - ss_dassert(false); -#if 0 - dcb->func.close(dcb); -#endif - return 1; - } - - while (b > 0) { - int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; - if ((buffer = gwbuf_alloc(bufsize)) == NULL) { - /* Bad news, we have run out of memory */ - /* Error handling */ - (dcb->func).close(dcb); - return 1; - } - - GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); - - if (n < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - gwbuf_free(buffer); - return 1; - } else { - gwbuf_free(buffer); - (dcb->func).close(dcb); - return 1; - } - } - - if (n == 0) { - // socket closed - gwbuf_free(buffer); -#if 1 - (dcb->func).close(dcb); -#endif - return 1; - } - - // append read data to the gwbuf - *head = gwbuf_append(*head, buffer); - - // how many bytes left - b -= n; - } - - return 0; -} - /** * Parse the bind config data. This is passed in a string as address:port. * diff --git a/server/core/load_utils.c b/server/core/load_utils.c index 188535375..50efc5c89 100644 --- a/server/core/load_utils.c +++ b/server/core/load_utils.c @@ -359,6 +359,8 @@ dprintAllModules(DCB *dcb) { MODULES *ptr = registered; + dcb_printf(dcb, "Modules.\n"); + dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n"); dcb_printf(dcb, "%-15s | %-11s | Version | API | Status\n", "Module Name", "Module Type"); dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n"); while (ptr) @@ -380,5 +382,5 @@ MODULES *ptr = registered; dcb_printf(dcb, "\n"); ptr = ptr->next; } - dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n"); + dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n\n"); } diff --git a/server/core/poll.c b/server/core/poll.c index 0b23a6b05..87d3640f0 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -349,8 +349,8 @@ poll_waitevents(void *arg) ss_dassert(dcb->state != DCB_STATE_FREED); ss_debug(spinlock_release(&dcb->dcb_initlock);) - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, "%lu [poll_waitevents] event %d dcb %p " "role %s", pthread_self(), diff --git a/server/core/server.c b/server/core/server.c index 0a13b7751..c1bec6189 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -312,14 +312,16 @@ char *stat; ptr = allServers; if (ptr) { - dcb_printf(dcb, "%-18s | %-15s | Port | %-18s | Connections\n", + dcb_printf(dcb, "Servers.\n"); + dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n"); + dcb_printf(dcb, "%-18s | %-15s | Port | %-20s | Connections\n", "Server", "Address", "Status"); - dcb_printf(dcb, "-------------------+-----------------+-------+--------------------+------------\n"); + dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n"); } while (ptr) { stat = server_status(ptr); - dcb_printf(dcb, "%-18s | %-15s | %5d | %-18s | %4d\n", + dcb_printf(dcb, "%-18s | %-15s | %5d | %-20s | %4d\n", ptr->unique_name, ptr->name, ptr->port, stat, ptr->stats.n_current); @@ -327,7 +329,7 @@ char *stat; ptr = ptr->next; } if (allServers) - dcb_printf(dcb, "-------------------+-----------------+-------+--------------------+------------\n"); + dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n\n"); spinlock_release(&server_spin); } diff --git a/server/core/service.c b/server/core/service.c index f7c626a34..d3db48390 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -650,12 +650,23 @@ FILTER_DEF **flist; char *ptr, *brkt; int n = 0; - flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *)); + if ((flist = (FILTER_DEF **)malloc(sizeof(FILTER_DEF *))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Out of memory adding filters to service.\n"))); + return; + } ptr = strtok_r(filters, "|", &brkt); while (ptr) { n++; - flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *)); + if ((flist = (FILTER_DEF **)realloc(flist, + (n + 1) * sizeof(FILTER_DEF *))) == NULL) + { + LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR, + "Out of memory adding filters to service.\n"))); + return; + } if ((flist[n-1] = filter_find(trim(ptr))) == NULL) { LOGIF(LE, (skygw_log_write_flush( @@ -826,6 +837,8 @@ SERVICE *ptr; ptr = allServices; if (ptr) { + dcb_printf(dcb, "Services.\n"); + dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n"); dcb_printf(dcb, "%-25s | %-20s | #Users | Total Sessions\n", "Service Name", "Router Module"); dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n"); @@ -838,7 +851,7 @@ SERVICE *ptr; ptr = ptr->next; } if (allServices) - dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n"); + dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n\n"); spinlock_release(&service_spin); } @@ -857,9 +870,11 @@ SERV_PROTOCOL *lptr; ptr = allServices; if (ptr) { + dcb_printf(dcb, "Listeners.\n"); + dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n"); dcb_printf(dcb, "%-20s | %-18s | %-15s | Port | State\n", "Service Name", "Protocol Module", "Address"); - dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+------\n"); + dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n"); } while (ptr) { @@ -868,7 +883,7 @@ SERV_PROTOCOL *lptr; { dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n", ptr->name, lptr->protocol, - (lptr != NULL) ? lptr->address : "*", + (lptr && lptr->address) ? lptr->address : "*", lptr->port, (lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? "Stopped" : "Running" ); @@ -878,7 +893,7 @@ SERV_PROTOCOL *lptr; ptr = ptr->next; } if (allServices) - dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+------\n"); + dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n\n"); spinlock_release(&service_spin); } @@ -1081,3 +1096,9 @@ static void service_add_qualified_param( (*p)->next = NULL; spinlock_release(&svc->spin); } + +char* service_get_name( + SERVICE* svc) +{ + return svc->name; +} diff --git a/server/core/session.c b/server/core/session.c index 156fd9e51..e983af027 100644 --- a/server/core/session.c +++ b/server/core/session.c @@ -164,7 +164,8 @@ session_alloc(SERVICE *service, DCB *client_dcb) */ session->head.instance = service->router_instance; session->head.session = session->router_session; - session->head.routeQuery = service->router->routeQuery; + + session->head.routeQuery = (void *)(service->router->routeQuery); session->tail.instance = session; session->tail.session = session; @@ -546,19 +547,23 @@ SESSION *ptr; ptr = allSessions; if (ptr) { - dcb_printf(dcb, "Session | Client | State\n"); - dcb_printf(dcb, "-----------------+-----------------+----------------\n"); + dcb_printf(dcb, "Sessions.\n"); + dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n"); + dcb_printf(dcb, "Session | Client | Service | State\n"); + dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n"); } while (ptr) { - dcb_printf(dcb, "%-16p | %-15s | %s\n", ptr, + dcb_printf(dcb, "%-16p | %-15s | %-14s | %s\n", ptr, ((ptr->client && ptr->client->remote) ? ptr->client->remote : ""), + (ptr->service && ptr->service->name ? ptr->service->name + : ""), session_state(ptr->state)); ptr = ptr->next; } if (allSessions) - dcb_printf(dcb, "-----------------+-----------------+----------------\n"); + dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n\n"); spinlock_release(&session_spin); } @@ -671,7 +676,6 @@ int i; return 1; } - /** * Entry point for the final element int he upstream filter, i.e. the writing * of the data to the client. @@ -700,3 +704,30 @@ session_get_remote(SESSION *session) return session->client->remote; return NULL; } + +bool session_route_query ( + SESSION* ses, + GWBUF* buf) +{ + bool succp; + + if (ses->head.routeQuery == NULL || + ses->head.instance == NULL || + ses->head.session == NULL) + { + succp = false; + goto return_succp; + } + + if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1) + { + succp = true; + } + else + { + succp = false; + } +return_succp: + return succp; +} + diff --git a/server/include/buffer.h b/server/include/buffer.h index 9651031b2..66c56322b 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -46,11 +46,14 @@ typedef enum { - GWBUF_TYPE_UNDEFINED = 0x0, - GWBUF_TYPE_PLAINSQL = 0x1, - GWBUF_TYPE_MYSQL = 0x2 + GWBUF_TYPE_UNDEFINED = 0x00, + GWBUF_TYPE_PLAINSQL = 0x01, + GWBUF_TYPE_MYSQL = 0x02 } gwbuf_type_t; +#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL) +#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL) + /** * A structure to encapsulate the data in a form that the data itself can be * shared between multiple GWBUF's without the need to make multiple copies diff --git a/server/include/dcb.h b/server/include/dcb.h index e7d2ec716..aa9b10e51 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -24,6 +24,8 @@ #include #include +#define ERRHANDLE + struct session; struct server; struct service; @@ -163,7 +165,8 @@ typedef enum { DCB_REASON_HIGH_WATER, /*< Cross high water mark */ DCB_REASON_LOW_WATER, /*< Cross low water mark */ DCB_REASON_ERROR, /*< An error was flagged on the connection */ - DCB_REASON_HUP /*< A hangup was detected */ + DCB_REASON_HUP, /*< A hangup was detected */ + DCB_REASON_NOT_RESPONDING /*< Server connection was lost */ } DCB_REASON; /** @@ -192,6 +195,7 @@ typedef struct dcb_callback { typedef struct dcb { #if defined(SS_DEBUG) skygw_chk_t dcb_chk_top; + bool dcb_errhandle_called; #endif dcb_role_t dcb_role; SPINLOCK dcb_initlock; @@ -204,12 +208,13 @@ typedef struct dcb { int fd; /**< The descriptor */ dcb_state_t state; /**< Current descriptor state */ char *remote; /**< Address of remote end */ + char *user; /**< User name for connection */ struct sockaddr_in ipv4; /**< remote end IPv4 address */ void *protocol; /**< The protocol specific state */ struct session *session; /**< The owning session */ GWPROTOCOL func; /**< The functions for this descriptor */ - unsigned int writeqlen; /**< Current number of byes in the write queue */ + int writeqlen; /**< Current number of byes in the write queue */ SPINLOCK writeqlock; /**< Write Queue spinlock */ GWBUF *writeq; /**< Write Data Queue */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ @@ -230,6 +235,7 @@ typedef struct dcb { unsigned int high_water; /**< High water mark */ unsigned int low_water; /**< Low water mark */ #if defined(SS_DEBUG) + int dcb_port; /**< port of target server */ skygw_chk_t dcb_chk_tail; #endif } DCB; diff --git a/server/include/router.h b/server/include/router.h index ce8f547d8..8f0851091 100644 --- a/server/include/router.h +++ b/server/include/router.h @@ -66,6 +66,12 @@ typedef void *ROUTER; * * @see load_module */ +typedef enum error_action { + ERRACT_NEW_CONNECTION = 0x001, + ERRACT_REPLY_CLIENT = 0x002 +} error_action_t; + + typedef struct router_object { ROUTER *(*createInstance)(SERVICE *service, char **options); void *(*newSession)(ROUTER *instance, SESSION *session); @@ -74,7 +80,13 @@ typedef struct router_object { int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue); void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); - void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action); + void (*handleError)( + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); } ROUTER_OBJECT; @@ -91,4 +103,6 @@ typedef enum router_capability_t { RCAP_TYPE_PACKET_INPUT = (1 << 1) } router_capability_t; + + #endif diff --git a/server/include/server.h b/server/include/server.h index b15453c18..d32413bb7 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -117,6 +117,11 @@ typedef struct server { */ #define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) +/** server is not master, slave or joined */ +#define SERVER_NOT_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == 0) + +#define SERVER_IS_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) != 0) + extern SERVER *server_alloc(char *, char *, unsigned short); extern int server_free(SERVER *); extern SERVER *server_find_by_unique_name(char *); diff --git a/server/include/service.h b/server/include/service.h index 40023332b..1c6fd4822 100644 --- a/server/include/service.h +++ b/server/include/service.h @@ -155,6 +155,7 @@ extern int serviceStop(SERVICE *); extern int serviceRestart(SERVICE *); extern int serviceSetUser(SERVICE *, char *, char *); extern int serviceGetUser(SERVICE *, char **, char **); +extern void serviceSetFilters(SERVICE *, char *); extern int serviceEnableRootUser(SERVICE *, int ); extern void service_update(SERVICE *, char *, char *, char *); extern int service_refresh_users(SERVICE *); @@ -169,4 +170,5 @@ bool service_set_slave_conn_limit ( extern void dprintService(DCB *, SERVICE *); extern void dListServices(DCB *); extern void dListListeners(DCB *); +char* service_get_name(SERVICE* svc); #endif diff --git a/server/include/session.h b/server/include/session.h index 3fd1109e8..584f36b70 100644 --- a/server/include/session.h +++ b/server/include/session.h @@ -57,7 +57,7 @@ typedef enum { SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */ - SESSION_STATE_STOPPING, /*< router is being closed */ + SESSION_STATE_STOPPING, /*< session and router are being closed */ SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_FREE /*< for all sessions */ diff --git a/server/modules/filter/topfilter.c b/server/modules/filter/topfilter.c index d7bf12828..63a2299c1 100644 --- a/server/modules/filter/topfilter.c +++ b/server/modules/filter/topfilter.c @@ -90,6 +90,7 @@ typedef struct { int topN; /* Number of queries to store */ char *filebase; /* Base of fielname to log into */ char *source; /* The source of the client connection */ + char *user; /* A user name to filter on */ char *match; /* Optional text to match against */ regex_t re; /* Compiled regex text */ char *exclude; /* Optional text to match against for exclusion */ @@ -117,6 +118,7 @@ typedef struct { UPSTREAM up; int active; char *clientHost; + char *userName; char *filename; int fd; struct timeval start; @@ -182,6 +184,7 @@ TOPN_INSTANCE *my_instance; my_instance->match = NULL; my_instance->exclude = NULL; my_instance->source = NULL; + my_instance->user = NULL; my_instance->filebase = strdup("top"); for (i = 0; params && params[i]; i++) { @@ -202,6 +205,8 @@ TOPN_INSTANCE *my_instance; } else if (!strcmp(params[i]->name, "source")) my_instance->source = strdup(params[i]->value); + else if (!strcmp(params[i]->name, "user")) + my_instance->user = strdup(params[i]->value); else if (!filter_standard_parameter(params[i]->name)) { LOGIF(LE, (skygw_log_write_flush( @@ -226,6 +231,7 @@ TOPN_INSTANCE *my_instance; my_instance->match))); free(my_instance->match); free(my_instance->source); + free(my_instance->user); free(my_instance->filebase); free(my_instance); return NULL; @@ -241,6 +247,7 @@ TOPN_INSTANCE *my_instance; regfree(&my_instance->re); free(my_instance->match); free(my_instance->source); + free(my_instance->user); free(my_instance->filebase); free(my_instance); return NULL; @@ -292,10 +299,17 @@ int i; my_session->clientHost = strdup(session->client->remote); else my_session->clientHost = NULL; + if (session && session->client && session->client->user) + my_session->userName = strdup(session->client->user); + else + my_session->userName = NULL; my_session->active = 1; if (my_instance->source && strcmp(my_session->clientHost, my_instance->source)) my_session->active = 0; + if (my_instance->user && strcmp(my_session->userName, + my_instance->user)) + my_session->active = 0; sprintf(my_session->filename, "%s.%d", my_instance->filebase, my_instance->sessions); @@ -328,30 +342,39 @@ FILE *fp; { fprintf(fp, "Top %d longest running queries in session.\n", my_instance->topN); + fprintf(fp, "==========================================\n\n"); + fprintf(fp, "Time (sec) | Query\n"); + fprintf(fp, "-----------+-----------------------------------------------------------------\n"); for (i = 0; i < my_instance->topN; i++) { if (my_session->top[i]->sql) { - fprintf(fp, "%.3f, %s\n", + fprintf(fp, "%10.3f | %s\n", (double)((my_session->top[i]->duration.tv_sec * 1000) + (my_session->top[i]->duration.tv_usec / 1000)) / 1000, my_session->top[i]->sql); } } - fprintf(fp, "\n\nTotal of %d statements executed.\n", - my_session->n_statements); - fprintf(fp, "Total statement execution time %d.%d seconds\n", - (int)my_session->total.tv_sec, - (int)my_session->total.tv_usec / 1000); - fprintf(fp, "Average statement execution time %.3f.\n", - (double)((my_session->total.tv_sec * 1000) - + (my_session->total.tv_usec / 1000)) - / (1000 * my_session->n_statements)); - fprintf(fp, "Total connection time %d.%d seconds\n", - (int)diff.tv_sec, (int)diff.tv_usec / 1000); + fprintf(fp, "-----------+-----------------------------------------------------------------\n"); + fprintf(fp, "\n\nSession started %s", + asctime(localtime(&my_session->connect))); if (my_session->clientHost) fprintf(fp, "Connection from %s\n", my_session->clientHost); + if (my_session->userName) + fprintf(fp, "Username %s\n", + my_session->userName); + fprintf(fp, "\nTotal of %d statements executed.\n", + my_session->n_statements); + fprintf(fp, "Total statement execution time %5d.%d seconds\n", + (int)my_session->total.tv_sec, + (int)my_session->total.tv_usec / 1000); + fprintf(fp, "Average statement execution time %9.3f seconds\n", + (double)((my_session->total.tv_sec * 1000) + + (my_session->total.tv_usec / 1000)) + / (1000 * my_session->n_statements)); + fprintf(fp, "Total connection time %5d.%d seconds\n", + (int)diff.tv_sec, (int)diff.tv_usec / 1000); fclose(fp); } } diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 41bcf0416..bb7338a3e 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -88,7 +88,7 @@ #define SMALL_CHUNK 1024 #define MAX_CHUNK SMALL_CHUNK * 8 * 4 #define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) - +#define COM_QUIT_PACKET_SIZE (4+1) struct dcb; typedef enum { @@ -104,7 +104,6 @@ typedef enum { MYSQL_SESSION_CHANGE } mysql_pstate_t; - /* * MySQL Protocol specific state data */ @@ -237,9 +236,10 @@ typedef enum #define MYSQL_COM_INIT_DB 0x2 #define MYSQL_COM_QUERY 0x3 -#define MYSQL_GET_COMMAND(payload) (payload[4]) -#define MYSQL_GET_PACKET_NO(payload) (payload[3]) +#define MYSQL_GET_COMMAND(payload) (payload[4]) +#define MYSQL_GET_PACKET_NO(payload) (payload[3]) #define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload)) +#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5])) #endif @@ -256,12 +256,21 @@ int gw_send_authentication_to_backend( uint8_t *passwd, MySQLProtocol *protocol); const char *gw_mysql_protocol_state2string(int state); -int gw_do_connect_to_backend(char *host, int port, int* fd); +int gw_do_connect_to_backend(char *host, int port, int* fd); +int mysql_send_com_quit(DCB* dcb, int packet_number, GWBUF* buf); +GWBUF* mysql_create_com_quit(GWBUF* bufparam, int packet_number); + int mysql_send_custom_error ( DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); + +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg); + int gw_send_change_user_to_backend( char *dbname, char *user, @@ -297,12 +306,12 @@ void gw_str_xor( const uint8_t *input1, const uint8_t *input2, unsigned int len); -char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len); -int gw_hex2bin(uint8_t *out, const char *in, unsigned int len); -int gw_generate_random_str(char *output, int len); -char *gw_strend(register const char *s); -int setnonblocking(int fd); -int setipaddress(struct in_addr *a, char *p); -int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); + +char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len); +int gw_hex2bin(uint8_t *out, const char *in, unsigned int len); +int gw_generate_random_str(char *output, int len); +char *gw_strend(register const char *s); +int setnonblocking(int fd); +int setipaddress(struct in_addr *a, char *p); GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 00857ae1b..b18bd5473 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,6 +31,18 @@ #include +typedef enum bref_state { + BREF_NOT_USED = 0x00, + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /*< for anything that responds */ + BREF_CLOSED = 0x04 +} bref_state_t; + +#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED) +#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT) +#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED) + typedef enum backend_type_t { BE_UNDEFINED=-1, BE_MASTER, @@ -43,8 +55,8 @@ typedef struct rses_property_st rses_property_t; typedef struct router_client_session ROUTER_CLIENT_SES; typedef enum rses_property_type_t { - RSES_PROP_TYPE_UNDEFINED=0, - RSES_PROP_TYPE_SESCMD, + RSES_PROP_TYPE_UNDEFINED=-1, + RSES_PROP_TYPE_SESCMD=0, RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 @@ -159,6 +171,7 @@ typedef struct backend_ref_st { #endif BACKEND* bref_backend; DCB* bref_dcb; + bref_state_t bref_state; sescmd_cursor_t bref_sescmd_cur; #if defined(SS_DEBUG) skygw_chk_t bref_chk_tail; diff --git a/server/modules/monitor/mysql_mon.c b/server/modules/monitor/mysql_mon.c index d643a00b9..38625a6ed 100644 --- a/server/modules/monitor/mysql_mon.c +++ b/server/modules/monitor/mysql_mon.c @@ -73,6 +73,8 @@ static void diagnostics(DCB *, void *); static void setInterval(void *, unsigned long); static void defaultId(void *, unsigned long); static void replicationHeartbeat(void *, int); +static bool mon_status_changed(MONITOR_SERVERS* mon_srv); +static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv); static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; @@ -142,6 +144,7 @@ MYSQL_MONITOR *handle; handle->defaultPasswd = NULL; handle->id = MONITOR_DEFAULT_ID; handle->interval = MONITOR_INTERVAL; + handle->replicationHeartbeat = 0; spinlock_init(&handle->lock); } handle->tid = (THREAD)thread_start(monitorMain, handle); @@ -180,7 +183,10 @@ MONITOR_SERVERS *ptr, *db; db->server = server; db->con = NULL; db->next = NULL; + db->mon_err_count = 0; + db->mon_prev_status = 0; spinlock_acquire(&handle->lock); + if (handle->databases == NULL) handle->databases = db; else @@ -307,21 +313,25 @@ char *sep; static void monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) { -MYSQL_ROW row; -MYSQL_RES *result; -int num_fields; -int ismaster = 0, isslave = 0; -char *uname = handle->defaultUser, *passwd = handle->defaultPasswd; -unsigned long int server_version = 0; -char *server_string; -unsigned long id = handle->id; -int replication_heartbeat = handle->replicationHeartbeat; +MYSQL_ROW row; +MYSQL_RES *result; +int num_fields; +int ismaster = 0; +int isslave = 0; +char *uname = handle->defaultUser; +char *passwd = handle->defaultPasswd; +unsigned long int server_version = 0; +char *server_string; +unsigned long id = handle->id; +int replication_heartbeat = handle->replicationHeartbeat; +static int conn_err_count; - if (database->server->monuser != NULL) + if (database->server->monuser != NULL) { uname = database->server->monuser; passwd = database->server->monpw; } + if (uname == NULL) return; @@ -329,12 +339,17 @@ int replication_heartbeat = handle->replicationHeartbeat; if (SERVER_IN_MAINT(database->server)) return; + /** Store prevous status */ + database->mon_prev_status = database->server->status; + if (database->con == NULL || mysql_ping(database->con) != 0) { char *dpwd = decryptPassword(passwd); int rc; int read_timeout = 1; - database->con = mysql_init(NULL); + + database->con = mysql_init(NULL); + rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); if (mysql_real_connect(database->con, @@ -346,23 +361,27 @@ int replication_heartbeat = handle->replicationHeartbeat; NULL, 0) == NULL) { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Monitor was unable to connect to " - "server %s:%d : \"%s\"", - database->server->name, - database->server->port, - mysql_error(database->con)))); + free(dpwd); - free(dpwd); + if (mon_print_fail_status(database)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Monitor was unable to connect to " + "server %s:%d : \"%s\"", + database->server->name, + database->server->port, + mysql_error(database->con)))); + } + /** Store current status */ server_clear_status(database->server, SERVER_RUNNING); + return; } free(dpwd); - } - - /* If we get this far then we have a working connection */ - server_set_status(database->server, SERVER_RUNNING); + } + /** Store current status */ + server_set_status(database->server, SERVER_RUNNING); /* get server version from current server */ server_version = mysql_get_server_version(database->con); @@ -529,7 +548,7 @@ int replication_heartbeat = handle->replicationHeartbeat; } mysql_free_result(result); - if (isslave == i) + if (isslave > 0 && isslave == i) isslave = 1; else isslave = 0; @@ -622,7 +641,7 @@ int replication_heartbeat = handle->replicationHeartbeat; } } } - + /** Store current status */ if (ismaster) { server_set_status(database->server, SERVER_MASTER); @@ -672,21 +691,33 @@ MONITOR_SERVERS *ptr; ptr = handle->databases; while (ptr) { - unsigned int prev_status = ptr->server->status; - monitorDatabase(handle, ptr); + + if (mon_status_changed(ptr)) + { + dcb_call_foreach(ptr->server, DCB_REASON_NOT_RESPONDING); + } - if (ptr->server->status != prev_status || - SERVER_IS_DOWN(ptr->server)) + if (mon_status_changed(ptr) || + mon_print_fail_status(ptr)) { LOGIF(LM, (skygw_log_write_flush( LOGFILE_MESSAGE, "Backend server %s:%d state : %s", ptr->server->name, ptr->server->port, - STRSRVSTATUS(ptr->server)))); + STRSRVSTATUS(ptr->server)))); + } + if (SERVER_IS_DOWN(ptr->server)) + { + /** Increase this server'e error count */ + ptr->mon_err_count += 1; + } + else + { + /** Reset this server's error count */ + ptr->mon_err_count = 0; } - ptr = ptr->next; } thread_millisleep(handle->interval); @@ -731,3 +762,39 @@ replicationHeartbeat(void *arg, int replicationHeartbeat) MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int)); } + +static bool mon_status_changed( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + + if (mon_srv->mon_prev_status != mon_srv->server->status) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} + +static bool mon_print_fail_status( + MONITOR_SERVERS* mon_srv) +{ + bool succp; + int errcount = mon_srv->mon_err_count; + uint8_t modval; + + modval = 1<<(MIN(errcount/10, 7)); + + if (SERVER_IS_DOWN(mon_srv->server) && errcount%modval == 0) + { + succp = true; + } + else + { + succp = false; + } + return succp; +} diff --git a/server/modules/monitor/mysqlmon.h b/server/modules/monitor/mysqlmon.h index 8f5bcd704..5b5c7d04a 100644 --- a/server/modules/monitor/mysqlmon.h +++ b/server/modules/monitor/mysqlmon.h @@ -42,6 +42,8 @@ typedef struct monitor_servers { SERVER *server; /**< The server being monitored */ MYSQL *con; /**< The MySQL connection */ + int mon_err_count; + unsigned int mon_prev_status; struct monitor_servers *next; /**< The next server in the list */ } MONITOR_SERVERS; diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index 7d06264b9..7db1366ad 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -245,7 +245,7 @@ HTTPD_session *client_data = NULL; } /* force the client connecton close */ - dcb->func.close(dcb); + dcb_close(dcb); return 0; } @@ -359,7 +359,6 @@ int n_connect = 0; static int httpd_close(DCB *dcb) { - dcb_close(dcb); return 0; } diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 08589438e..4ef59bbe1 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -65,7 +65,9 @@ static int gw_backend_hangup(DCB *dcb); static int backend_write_delayqueue(DCB *dcb); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); -static int gw_session(DCB *backend_dcb, void *data); +#if defined(NOT_USED) + static int gw_session(DCB *backend_dcb, void *data); +#endif static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb); static GWPROTOCOL MyObject = { @@ -79,7 +81,7 @@ static GWPROTOCOL MyObject = { gw_backend_close, /* Close */ NULL, /* Listen */ gw_change_user, /* Authentication */ - gw_session /* Session */ + NULL /* Session */ }; /* @@ -195,6 +197,14 @@ static int gw_read_backend_event(DCB *dcb) { if (gw_read_backend_handshake(backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_read_backend_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); + } else { /* handshake decoded, send the auth credentials */ if (gw_send_authentication_to_backend( @@ -204,6 +214,13 @@ static int gw_read_backend_event(DCB *dcb) { backend_protocol) != 0) { backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_send_authentication_to_backend " + "fd %d, state = MYSQL_AUTH_FAILED.", + pthread_self(), + backend_protocol->owner_dcb->fd))); } else { backend_protocol->state = MYSQL_AUTH_RECV; } @@ -240,6 +257,7 @@ static int gw_read_backend_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; + rsession = session->router_session; if (backend_protocol->state == MYSQL_AUTH_RECV) { /*< @@ -251,13 +269,21 @@ static int gw_read_backend_event(DCB *dcb) { switch (receive_rc) { case -1: backend_protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_event] after " + "gw_receive_backend_authentication " + "fd %d, state = MYSQL_AUTH_FAILED.", + backend_protocol->owner_dcb->fd, + pthread_self()))); + LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Backend server didn't " "accept authentication for user " "%s.", - current_session->user))); + current_session->user))); break; case 1: backend_protocol->state = MYSQL_IDLE; @@ -298,72 +324,50 @@ static int gw_read_backend_event(DCB *dcb) { */ spinlock_release(&dcb->authlock); spinlock_acquire(&dcb->delayqlock); - /*< - * vraa : errorHandle - * check the delayq before the reply - */ - if (dcb->delayq != NULL) { - /* send an error to the client */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Connection to backend lost."); - // consume all the delay queue - while ((dcb->delayq = gwbuf_consume( + + if (dcb->delayq != NULL) + { + while ((dcb->delayq = gwbuf_consume( dcb->delayq, GWBUF_LENGTH(dcb->delayq))) != NULL); } spinlock_release(&dcb->delayqlock); - - /* try reload users' table for next connection */ - service_refresh_users(dcb->session->service); - - while (session->state != SESSION_STATE_ROUTER_READY && - session->state != SESSION_STATE_STOPPING) - { - ss_dassert( - session->state == SESSION_STATE_READY || - session->state == - SESSION_STATE_ROUTER_READY || - session->state == SESSION_STATE_STOPPING); - /** - * Session shouldn't be NULL at this point - * anymore. Just checking.. - */ - if (session->client->session == NULL) - { - rc = 1; - goto return_rc; - } - usleep(1); - } - - if (session->state == SESSION_STATE_STOPPING) { - goto return_rc; + GWBUF* errbuf; + bool succp; + + /* try reload users' table for next connection */ + service_refresh_users(dcb->session->service); +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend read error handling."))); +#endif + + errbuf = mysql_create_custom_error( + 1, + 0, + "Authentication with backend failed. " + "Session will be closed."); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_REPLY_CLIENT, + &succp); + + ss_dassert(!succp); + + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + dcb_close(dcb); } - spinlock_acquire(&session->ses_lock); - session->state = SESSION_STATE_STOPPING; - spinlock_release(&session->ses_lock); - - /** - * rsession shouldn't be NULL since session - * state indicates that it was initialized - * successfully. - */ - rsession = session->router_session; - ss_dassert(rsession != NULL); - - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_backend_event] " - "Call closeSession for backend's " - "router client session.", - pthread_self()))); - /* close router_session */ - router->closeSession(router_instance, rsession); rc = 1; goto return_rc; } @@ -401,17 +405,56 @@ static int gw_read_backend_event(DCB *dcb) { SESSION *session = dcb->session; CHK_SESSION(session); - /* read available backend data */ - rc = dcb_read(dcb, &writebuf); + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; + + /* read available backend data */ + rc = dcb_read(dcb, &writebuf); - if (rc < 0) { + if (rc < 0) + { /*< vraa : errorHandle */ /*< * Backend generated EPOLLIN event and if backend has * failed, connection must be closed to avoid backend * dcb from getting hanged. */ - (dcb->func).close(dcb); + GWBUF* errbuf; + bool succp; + /** + * - send error for client + * - mark failed backend BREF_NOT_USED + * - go through all servers and select one according to + * the criteria that user specified in the beginning. + */ + +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend read error handling #2."))); +#endif + + + errbuf = mysql_create_custom_error( + 1, + 0, + "Read from backend failed"); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + if (!succp) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + dcb_close(dcb); rc = 0; goto return_rc; } @@ -420,18 +463,6 @@ static int gw_read_backend_event(DCB *dcb) { rc = 0; goto return_rc; } - router = session->service->router; - router_instance = session->service->router_instance; - rsession = session->router_session; - - /* Note the gwbuf doesn't have here a valid queue->command - * descriptions as it is a fresh new one! - * We only have the copied value in dcb->command from - * previuos func.write() and this will be used by the - * router->clientReply - * and pass now the gwbuf to the router - */ - /*< * If dcb->session->client is freed already it may be NULL. */ @@ -443,7 +474,8 @@ static int gw_read_backend_event(DCB *dcb) { if (client_protocol->state == MYSQL_IDLE) { - router->clientReply(router_instance, + gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL); + router->clientReply(router_instance, rsession, writebuf, dcb); @@ -451,6 +483,7 @@ static int gw_read_backend_event(DCB *dcb) { } goto return_rc; } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { + gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL); router->clientReply(router_instance, rsession, writebuf, dcb); rc = 1; } @@ -550,29 +583,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) MySQLProtocol *backend_protocol = dcb->protocol; int rc = 0; - /*< - * Don't write to backend if backend_dcb is not in poll set anymore. - */ - spinlock_acquire(&dcb->dcb_initlock); - - if (dcb->state != DCB_STATE_POLLING) { - /*< vraa : errorHandle */ - /*< Free buffer memory */ - gwbuf_consume(queue, GWBUF_LENGTH(queue)); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [gw_MySQLWrite_backend] Write to backend failed. " - "Backend dcb %p fd %d is %s.", - pthread_self(), - dcb, - dcb->fd, - STRDCBSTATE(dcb->state)))); - spinlock_release(&dcb->dcb_initlock); - rc = 0; - goto return_rc; - } - spinlock_release(&dcb->dcb_initlock); spinlock_acquire(&dcb->authlock); /** * Pick action according to state of protocol. @@ -600,11 +610,11 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) queue, GWBUF_LENGTH(queue))) != NULL); free(str); - } rc = 0; spinlock_release(&dcb->authlock); goto return_rc; break; + } case MYSQL_IDLE: LOGIF(LD, (skygw_log_write( @@ -616,6 +626,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb->fd, STRPROTOCOLSTATE(backend_protocol->state)))); spinlock_release(&dcb->authlock); + rc = dcb_write(dcb, queue); goto return_rc; break; @@ -644,73 +655,57 @@ return_rc: } /** - * Backend Error Handling for EPOLLER - * + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. */ -static int gw_error_backend_event(DCB *dcb) { - SESSION *session; - void *rsession; - ROUTER_OBJECT *router; - ROUTER *router_instance; - int rc = 0; - +static int gw_error_backend_event(DCB *dcb) +{ + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + int rc = 0; + GWBUF* errbuf; + bool succp; + CHK_DCB(dcb); session = dcb->session; CHK_SESSION(session); + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; - router = session->service->router; - router_instance = session->service->router_instance; +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend error event handling."))); +#endif - if (dcb->state != DCB_STATE_POLLING) { - /*< vraa : errorHandle */ - /*< - * if client is not available it needs to be handled in send - * function. Session != NULL, that is known. - */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Writing to backend failed."); - - rc = 0; - } else { - /*< vraa : errorHandle */ - mysql_send_custom_error( - dcb->session->client, - 1, - 0, - "Closed backend connection."); - rc = 1; - } - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] Some error occurred in backend. " - "rc = %d", - pthread_self(), - rc))); - - if (session->state == SESSION_STATE_ROUTER_READY) - { + + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + /** There are not required backends available, close session. */ + if (!succp) { spinlock_acquire(&session->ses_lock); session->state = SESSION_STATE_STOPPING; spinlock_release(&session->ses_lock); - - rsession = session->router_session; - /*< - * rsession should never be NULL here. - */ - ss_dassert(rsession != NULL); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_error_backend_event] " - "Call closeSession for backend " - "session.", - pthread_self()))); - - router->closeSession(router_instance, rsession); } - return rc; + dcb_close(dcb); + + return 1; } /* @@ -811,7 +806,11 @@ return_fd: /** - * Hangup routine the backend dcb: it does nothing + * Error event handler. + * Create error message, pass it to router's error handler and if error + * handler fails in providing enough backend servers, mark session being + * closed and call DCB close function which triggers closing router session + * and related backends (if any exists. * * @param dcb The current Backend DCB * @return 1 always @@ -819,21 +818,90 @@ return_fd: static int gw_backend_hangup(DCB *dcb) { - /*< vraa : errorHandle */ + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + int rc = 0; + bool succp; + GWBUF* errbuf; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + rsession = session->router_session; + router = session->service->router; + router_instance = session->service->router_instance; + +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend hangup error handling."))); +#endif + + + errbuf = mysql_create_custom_error( + 1, + 0, + "Lost connection to backend server."); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + /** There are not required backends available, close session. */ + if (!succp) { +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Backend hangup -> closing session."))); +#endif + + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + dcb_close(dcb); + return 1; } /** - * Close the backend dcb - * + * Send COM_QUIT to backend so that it can be closed. * @param dcb The current Backend DCB * @return 1 always */ static int gw_backend_close(DCB *dcb) { - /*< vraa : errorHandle */ - dcb_close(dcb); + DCB* client_dcb; + SESSION* session; + GWBUF* quitbuf; + bool succp; + + CHK_DCB(dcb); + session = dcb->session; + CHK_SESSION(session); + + quitbuf = mysql_create_com_quit(NULL, 0); + + /** Send COM_QUIT to the backend being closed */ + mysql_send_com_quit(dcb, 0, quitbuf); + + if (session != NULL && session->state == SESSION_STATE_STOPPING) + { + client_dcb = session->client; + + if (client_dcb != NULL && + client_dcb->state == DCB_STATE_POLLING) + { + /** Close client DCB */ + dcb_close(client_dcb); + } + } return 1; } @@ -883,27 +951,56 @@ static int backend_write_delayqueue(DCB *dcb) } else { - localq = dcb->delayq; - dcb->delayq = NULL; - spinlock_release(&dcb->delayqlock); - rc = dcb_write(dcb, localq); + localq = dcb->delayq; + dcb->delayq = NULL; + spinlock_release(&dcb->delayqlock); + rc = dcb_write(dcb, localq); } - if (rc == 0) { + if (rc == 0) + { + GWBUF* errbuf; + bool succp; + ROUTER_OBJECT *router = NULL; + ROUTER *router_instance = NULL; + void *rsession = NULL; + SESSION *session = dcb->session; + int receive_rc = 0; + + CHK_SESSION(session); + + router = session->service->router; + router_instance = session->service->router_instance; + rsession = session->router_session; +#if defined(SS_DEBUG) LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Error : failed to write buffered data to back-end " - "server. Buffer was empty of back-end was disconnected " - "during operation."))); - - mysql_send_custom_error( - dcb->session->client, - 1, - 0, + "Backend write delayqueue error handling."))); +#endif + errbuf = mysql_create_custom_error( + 1, + 0, "Failed to write buffered data to back-end server. " "Buffer was empty or back-end was disconnected during " - "operation."); - dcb_close(dcb); + "operation. Session will be closed."); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_NEW_CONNECTION, + &succp); + + if (!succp) + { + if (session != NULL) + { + spinlock_acquire(&session->ses_lock); + session->state = SESSION_STATE_STOPPING; + spinlock_release(&session->ses_lock); + } + dcb_close(dcb); + } } return rc; @@ -911,7 +1008,12 @@ static int backend_write_delayqueue(DCB *dcb) -static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) { +static int gw_change_user( + DCB *backend, + SERVER *server, + SESSION *in_session, + GWBUF *queue) +{ MYSQL_session *current_session = NULL; MySQLProtocol *backend_protocol = NULL; MySQLProtocol *client_protocol = NULL; @@ -997,6 +1099,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB * @param * @return always 1 */ +/* static int gw_session(DCB *backend_dcb, void *data) { GWBUF *queue = NULL; @@ -1006,3 +1109,4 @@ static int gw_session(DCB *backend_dcb, void *data) { return 1; } +*/ diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 8134005a4..7f3ebd853 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -483,6 +483,11 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { if (auth_token) free(auth_token); + if (auth_ret == 0) + { + dcb->user = strdup(client_data->user); + } + return auth_ret; } @@ -504,75 +509,32 @@ gw_MySQLWrite_client(DCB *dcb, GWBUF *queue) * @param dcb Descriptor control block * @return 0 if succeed, 1 otherwise */ -int gw_read_client_event(DCB* dcb) { +int gw_read_client_event( + DCB* dcb) +{ SESSION *session = NULL; ROUTER_OBJECT *router = NULL; ROUTER *router_instance = NULL; void *rsession = NULL; MySQLProtocol *protocol = NULL; GWBUF *read_buffer = NULL; - int b = -1; int rc = 0; int nbytes_read = 0; CHK_DCB(dcb); protocol = DCB_PROTOCOL(dcb, MySQLProtocol); CHK_PROTOCOL(protocol); - /** - * Check how many bytes are readable in dcb->fd. - */ - if (ioctl(dcb->fd, FIONREAD, &b) != 0) { - int eno = errno; - errno = 0; - - LOGIF(LE, (skygw_log_write( - LOGFILE_ERROR, - "%lu [gw_read_client_event] ioctl FIONREAD for fd " - "%d failed. errno %d, %s. dcb->state = %d", - pthread_self(), - dcb->fd, - eno, - strerror(eno), - dcb->state))); - rc = 1; - goto return_rc; + rc = dcb_read(dcb, &read_buffer); + + if (rc < 0) + { + dcb_close(dcb); } - - /* - * Handle the closed client socket. - */ - if (b == 0) { - char c; - int l_errno = 0; - int r = -1; - - rc = 0; - - /* try to read 1 byte, without consuming the socket buffer */ - r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); - l_errno = errno; - - if (r <= 0) { - if ( (l_errno == EAGAIN) || (l_errno == EWOULDBLOCK)) { - goto return_rc; - } - - // close client socket and the session too - dcb->func.close(dcb); - } else { - // do nothing if reading 1 byte - } - - goto return_rc; - } - rc = gw_read_gwbuff(dcb, &read_buffer, b); - - if (rc != 0) { - goto return_rc; - } - nbytes_read = gwbuf_length(read_buffer); - ss_dassert(nbytes_read > 0); - + + if (nbytes_read == 0) + { + goto return_rc; + } /** * if read queue existed appent read to it. * if length of read buffer is less than 3 or less than mysql packet @@ -602,7 +564,8 @@ int gw_read_client_event(DCB* dcb) { else { /** - * There is at least one complete mysql packet read + * There is at least one complete mysql packet in + * read_buffer. */ read_buffer = dcb->dcb_readqueue; dcb->dcb_readqueue = NULL; @@ -627,58 +590,80 @@ int gw_read_client_event(DCB* dcb) { switch (protocol->state) { case MYSQL_AUTH_SENT: - /* - * Read all the data that is available into a chain of buffers - */ { int auth_val = -1; auth_val = gw_mysql_do_authentication(dcb, read_buffer); - // Data handled withot the dcb->func.write - // so consume it now - // be sure to consume it all read_buffer = gwbuf_consume(read_buffer, nbytes_read); + ss_dassert(read_buffer == NULL || GWBUF_EMPTY(read_buffer)); if (auth_val == 0) { SESSION *session = NULL; protocol->state = MYSQL_AUTH_RECV; - //write to client mysql AUTH_OK packet, packet n. is 2 - // start a new session, and connect to backends + /** + * Create session, and a router session for it. + * If successful, there will be backend connection(s) + * after this point. + */ session = session_alloc(dcb->service, dcb); - if (session != NULL) { + if (session != NULL) + { CHK_SESSION(session); ss_dassert(session->state != SESSION_STATE_ALLOC); protocol->state = MYSQL_IDLE; + /** + * Send an AUTH_OK packet to the client, + * packet sequence is # 2 + */ mysql_send_ok(dcb, 2, 0, NULL); - } else { + } + else + { protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] session " + "creation failed. fd %d, " + "state = MYSQL_AUTH_FAILED.", + protocol->owner_dcb->fd, + pthread_self()))); + + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, 2, 0, "failed to create new session"); - dcb->func.close(dcb); + + dcb_close(dcb); } } else { protocol->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_client_event] after " + "gw_mysql_do_authentication, fd %d, " + "state = MYSQL_AUTH_FAILED.", + protocol->owner_dcb->fd, + pthread_self()))); + + /** Send ERR 1045 to client */ mysql_send_auth_error( dcb, 2, 0, "Authorization failed"); - dcb->func.close(dcb); + + dcb_close(dcb); } } break; case MYSQL_IDLE: - /* - * Read all the data that is available into a chain of buffers - */ { uint8_t cap = 0; uint8_t *ptr_buff = NULL; @@ -686,14 +671,16 @@ int gw_read_client_event(DCB* dcb) { bool stmt_input; /*< router input type */ session = dcb->session; + ss_dassert( session!= NULL); - // get the backend session, if available - if (session != NULL) { + if (session != NULL) + { CHK_SESSION(session); router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; + ss_dassert(rsession != NULL); } /* Now, we are assuming in the first buffer there is @@ -710,9 +697,11 @@ int gw_read_client_event(DCB* dcb) { * COM_QUIT : close client dcb * else : write custom error to client dcb. */ - if(rsession == NULL) { + if(rsession == NULL) + { /** COM_QUIT */ - if (mysql_command == '\x01') { + if (mysql_command == '\x01') + { LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Client read " @@ -720,8 +709,20 @@ int gw_read_client_event(DCB* dcb) { "client dcb %p.", pthread_self(), dcb))); - (dcb->func).close(dcb); - } else { + /** + * close router session and that closes + * backends + */ + dcb_close(dcb); + } + else + { +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Client read error handling."))); +#endif + /* Send a custom error as MySQL command reply */ mysql_send_custom_error( dcb, @@ -729,16 +730,16 @@ int gw_read_client_event(DCB* dcb) { 0, "Can't route query. Connection to " "backend lost"); - protocol->state = MYSQL_IDLE; } rc = 1; /** Free buffer */ read_buffer = gwbuf_consume(read_buffer, nbytes_read); goto return_rc; } + /** Ask what type of input the router expects */ cap = router->getCapabilities(router_instance, rsession); - + if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT)) { stmt_input = false; @@ -756,7 +757,6 @@ int gw_read_client_event(DCB* dcb) { "%lu [gw_read_client_event] Reading router " "capabilities failed.", pthread_self()))); - mysql_send_custom_error(dcb, 1, 0, @@ -765,19 +765,20 @@ int gw_read_client_event(DCB* dcb) { rc = 1; goto return_rc; } - /** Route COM_QUIT to backend */ - if (mysql_command == '\x01') { + if (mysql_command == '\x01') + { + /** + * Sends COM_QUIT packets since buffer is already + * created. A BREF_CLOSED flag is set so dcb_close won't + * send redundant COM_QUIT. + */ SESSION_ROUTE_QUERY(session, read_buffer); - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [gw_read_client_event] Routed COM_QUIT to " - "backend. Close client dcb %p", - pthread_self(), - dcb))); - /** close client connection, closes router session too */ - rc = dcb->func.close(dcb); + /** + * Close router session which causes closing of backends. + */ + dcb_close(dcb); } else { @@ -788,6 +789,7 @@ int gw_read_client_event(DCB* dcb) { * to router. */ rc = route_by_statement(session, read_buffer); + if (read_buffer != NULL) { /** add incomplete mysql packet to read queue */ @@ -804,13 +806,32 @@ int gw_read_client_event(DCB* dcb) { if (rc == 1) { rc = 0; /**< here '0' means success */ } else { - mysql_send_custom_error(dcb, - 1, - 0, - "Query routing failed. " - "Connection to backend " - "lost."); - protocol->state = MYSQL_IDLE; + GWBUF* errbuf; + bool succp; + + errbuf = mysql_create_custom_error( + 1, + 0, + "Write to backend failed. Session closed."); +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Client routing error handling."))); +#endif + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Routing the query failed. " + "Session will be closed."))); + + router->handleError(router_instance, + rsession, + errbuf, + dcb, + ERRACT_REPLY_CLIENT, + &succp); + ss_dassert(!succp); + + dcb_close(dcb); } } goto return_rc; @@ -1170,23 +1191,31 @@ int gw_MySQLAccept(DCB *listener) client_dcb->fd = c_sock; // get client address - if ( client_conn.sa_family == AF_UNIX) { + if ( client_conn.sa_family == AF_UNIX) + { // client address client_dcb->remote = strdup("localhost_from_socket"); // set localhost IP for user authentication (client_dcb->ipv4).sin_addr.s_addr = 0x0100007F; - } else { + } + else + { /* client IPv4 in raw data*/ - memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in)); + memcpy(&client_dcb->ipv4, + (struct sockaddr_in *)&client_conn, + sizeof(struct sockaddr_in)); /* client IPv4 in string representation */ client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); - if (client_dcb->remote != NULL) { - inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN); + + if (client_dcb->remote != NULL) + { + inet_ntop(AF_INET, + &(client_dcb->ipv4).sin_addr, + client_dcb->remote, + INET_ADDRSTRLEN); } } - protocol = mysql_protocol_init(client_dcb, c_sock); - ss_dassert(protocol != NULL); if (protocol == NULL) { @@ -1223,7 +1252,7 @@ int gw_MySQLAccept(DCB *listener) 0, "MaxScale internal error."); - /** delete client_dcb */ + /** close client_dcb */ dcb_close(client_dcb); /** Previous state is recovered in poll_add_dcb. */ @@ -1260,14 +1289,21 @@ return_rc: static int gw_error_client_event( DCB* dcb) - { +{ int rc; + SESSION* session; CHK_DCB(dcb); - - rc = dcb->func.close(dcb); + session = dcb->session; + CHK_SESSION(session); - return rc; +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Client error event handling."))); +#endif + dcb_close(dcb); + return 1; } static int @@ -1301,11 +1337,9 @@ gw_client_close(DCB *dcb) router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; - + /** Close router session and all its connections */ router->closeSession(router_instance, rsession); } - dcb_close(dcb); - return 1; } @@ -1320,12 +1354,20 @@ gw_client_close(DCB *dcb) static int gw_client_hangup_event(DCB *dcb) { - int rc; + int rc; + SESSION* session; CHK_DCB(dcb); - rc = dcb->func.close(dcb); + session = dcb->session; + CHK_SESSION(session); - return rc; +#if defined(SS_DEBUG) + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Client hangup error handling."))); +#endif + dcb_close(dcb); + return 1; } diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 1d0932d7b..b546a0a88 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -126,7 +126,9 @@ void gw_mysql_close(MySQLProtocol **ptr) { * @param conn MySQL protocol structure * @return 0 on success, 1 on failure */ -int gw_read_backend_handshake(MySQLProtocol *conn) { +int gw_read_backend_handshake( + MySQLProtocol *conn) +{ GWBUF *head = NULL; DCB *dcb = conn->owner_dcb; int n = -1; @@ -135,12 +137,14 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { int success = 0; int packet_len = 0; - if ((n = dcb_read(dcb, &head)) != -1) { - if (head) { + if ((n = dcb_read(dcb, &head)) != -1) + { + if (head) + { payload = GWBUF_DATA(head); h_len = gwbuf_length(head); - - /* + + /** * The mysql packets content starts at byte fifth * just return with less bytes */ @@ -148,10 +152,45 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { if (h_len <= 4) { /* log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "dcb_read, fd %d, " + "state = MYSQL_AUTH_FAILED.", + dcb->fd, + pthread_self()))); + return 1; } - //get mysql packet size, 3 bytes + if (payload[4] == 0xff) + { + size_t len = MYSQL_GET_PACKET_LEN(payload); + uint16_t errcode = MYSQL_GET_ERRCODE(payload); + char* bufstr = strndup(&((char *)payload)[7], len-3); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_receive_backend_auth] Invalid " + "authentication message from backend dcb %p " + "fd %d, ptr[4] = %p, error code %d, msg %s.", + pthread_self(), + dcb, + dcb->fd, + payload[4], + errcode, + bufstr))); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Invalid authentication message " + "from backend. Error code: %d, Msg : %s", + errcode, + bufstr))); + + free(bufstr); + } + //get mysql packet size, 3 bytes packet_len = gw_mysql_get_byte3(payload); if (h_len < (packet_len + 4)) { @@ -160,6 +199,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * packet. Log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_mysql_get_byte3, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + dcb->fd, + pthread_self()))); + return 1; } @@ -176,6 +224,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * log error this exit point */ conn->state = MYSQL_AUTH_FAILED; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [gw_read_backend_handshake] after " + "gw_decode_mysql_server_handshake, fd %d, " + "state = MYSQL_AUTH_FAILED.", + pthread_self(), + conn->owner_dcb->fd, + pthread_self()))); + return 1; } @@ -202,7 +259,10 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { * @return 0 on success, < 0 on failure * */ -int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { +int gw_decode_mysql_server_handshake( + MySQLProtocol *conn, + uint8_t *payload) +{ uint8_t *server_version_end = NULL; uint16_t mysql_server_capabilities_one = 0; uint16_t mysql_server_capabilities_two = 0; @@ -216,8 +276,8 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { protocol_version = payload[0]; - if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) { - /* log error for this */ + if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) + { return -1; } @@ -257,19 +317,23 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { payload+=2; // get scramble len - if (payload[0] > 0) { + if (payload[0] > 0) + { scramble_len = payload[0] -1; ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323); ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE); - if ( (scramble_len < GW_SCRAMBLE_LENGTH_323) || scramble_len > GW_MYSQL_SCRAMBLE_SIZE) { + if ((scramble_len < GW_SCRAMBLE_LENGTH_323) || + scramble_len > GW_MYSQL_SCRAMBLE_SIZE) + { /* log this */ - return -2; + return -2; } - } else { + } + else + { scramble_len = GW_MYSQL_SCRAMBLE_SIZE; } - // skip 10 zero bytes payload += 11; @@ -321,26 +385,27 @@ int gw_receive_backend_auth( } else if (ptr[4] == 0xff) { - size_t packetlen = MYSQL_GET_PACKET_LEN(ptr)+4; - char* bufstr = (char *)calloc(1, packetlen-3); - - snprintf(bufstr, packetlen-6, "%s", &ptr[7]); - + size_t len = MYSQL_GET_PACKET_LEN(ptr); + char* err = strndup(&((char *)ptr)[8], 5); + char* bufstr = strndup(&((char *)ptr)[13], len-4-5); + LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, "%lu [gw_receive_backend_auth] Invalid " "authentication message from backend dcb %p " - "fd %d, ptr[4] = %p, msg %s.", + "fd %d, ptr[4] = %p, error %s, msg %s.", pthread_self(), dcb, dcb->fd, ptr[4], + err, bufstr))); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Invalid authentication message " - "from backend. Msg : %s", + "from backend. Error : %s, Msg : %s", + err, bufstr))); free(bufstr); @@ -367,7 +432,7 @@ int gw_receive_backend_auth( /*< * Remove data from buffer. */ - head = gwbuf_consume(head, GWBUF_LENGTH(head)); + while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL); } else if (n == 0) { @@ -634,8 +699,8 @@ int gw_do_connect_to_backend( LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error: Establishing connection to backend server " - "%s:%d failed.\n\t\t Socket creation failed due " - "%d, %s.", + "%s:%d failed.\n\t\t Socket creation failed " + "due %d, %s.", host, port, eno, @@ -736,6 +801,145 @@ gw_mysql_protocol_state2string (int state) { } } +GWBUF* mysql_create_com_quit( + GWBUF* bufparam, + int packet_number) +{ + uint8_t* data; + GWBUF* buf; + + if (bufparam == NULL) + { + buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE); + } + else + { + buf = bufparam; + } + + if (buf == NULL) + { + return 0; + } + ss_dassert(GWBUF_LENGTH(buf) == COM_QUIT_PACKET_SIZE); + + data = GWBUF_DATA(buf); + + *data++ = 0x1; + *data++ = 0x0; + *data++ = 0x0; + *data++ = packet_number; + *data = 0x1; + + return buf; +} + +int mysql_send_com_quit( + DCB* dcb, + int packet_number, + GWBUF* bufparam) +{ + GWBUF *buf; + int nbytes = 0; + + CHK_DCB(dcb); + ss_dassert(packet_number <= 255); + + if (dcb == NULL || dcb->state == DCB_STATE_ZOMBIE) + { + return 0; + } + if (bufparam == NULL) + { + buf = mysql_create_com_quit(NULL, packet_number); + } + else + { + buf = bufparam; + } + + if (buf == NULL) + { + return 0; + } + nbytes = dcb->func.write(dcb, buf); + + return nbytes; +} + + +GWBUF* mysql_create_custom_error( + int packet_number, + int affected_rows, + const char* msg) +{ + uint8_t* outbuf = NULL; + uint8_t mysql_payload_size = 0; + uint8_t mysql_packet_header[4]; + uint8_t* mysql_payload = NULL; + uint8_t field_count = 0; + uint8_t mysql_err[2]; + uint8_t mysql_statemsg[6]; + unsigned int mysql_errno = 0; + const char* mysql_error_msg = NULL; + const char* mysql_state = NULL; + + GWBUF* errbuf = NULL; + + mysql_errno = 2003; + mysql_error_msg = "An errorr occurred ..."; + mysql_state = "HY000"; + + field_count = 0xff; + gw_mysql_set_byte2(mysql_err, mysql_errno); + mysql_statemsg[0]='#'; + memcpy(mysql_statemsg+1, mysql_state, 5); + + if (msg != NULL) { + mysql_error_msg = msg; + } + + mysql_payload_size = sizeof(field_count) + + sizeof(mysql_err) + + sizeof(mysql_statemsg) + + strlen(mysql_error_msg); + + /** allocate memory for packet header + payload */ + errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); + ss_dassert(errbuf != NULL); + + if (errbuf == NULL) + { + return 0; + } + outbuf = GWBUF_DATA(errbuf); + + /** write packet header and packet number */ + gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); + mysql_packet_header[3] = packet_number; + + /** write header */ + memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); + + mysql_payload = outbuf + sizeof(mysql_packet_header); + + /** write field */ + memcpy(mysql_payload, &field_count, sizeof(field_count)); + mysql_payload = mysql_payload + sizeof(field_count); + + /** write errno */ + memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); + mysql_payload = mysql_payload + sizeof(mysql_err); + + /** write sqlstate */ + memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); + mysql_payload = mysql_payload + sizeof(mysql_statemsg); + + /** write error message */ + memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); + + return errbuf; +} /** * mysql_send_custom_error * @@ -749,79 +953,21 @@ gw_mysql_protocol_state2string (int state) { * @return packet length * */ -int -mysql_send_custom_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { - uint8_t *outbuf = NULL; - uint8_t mysql_payload_size = 0; - uint8_t mysql_packet_header[4]; - uint8_t *mysql_payload = NULL; - uint8_t field_count = 0; - uint8_t mysql_err[2]; - uint8_t mysql_statemsg[6]; - unsigned int mysql_errno = 0; - const char *mysql_error_msg = NULL; - const char *mysql_state = NULL; +int mysql_send_custom_error ( + DCB *dcb, + int packet_number, + int in_affected_rows, + const char *mysql_message) +{ + GWBUF* buf; + int nbytes; - GWBUF *buf = NULL; - - if (dcb == NULL || - dcb->state != DCB_STATE_POLLING) - { - return 0; - } - mysql_errno = 2003; - mysql_error_msg = "An errorr occurred ..."; - mysql_state = "HY000"; - - field_count = 0xff; - gw_mysql_set_byte2(mysql_err, mysql_errno); - mysql_statemsg[0]='#'; - memcpy(mysql_statemsg+1, mysql_state, 5); - - if (mysql_message != NULL) { - mysql_error_msg = mysql_message; - } - - mysql_payload_size = sizeof(field_count) + sizeof(mysql_err) + sizeof(mysql_statemsg) + strlen(mysql_error_msg); - - // allocate memory for packet header + payload - buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size); - ss_dassert(buf != NULL); + buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message); - if (buf == NULL) - { - return 0; - } - outbuf = GWBUF_DATA(buf); - - // write packet header with packet number - gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); - mysql_packet_header[3] = packet_number; - - // write header - memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header)); - - mysql_payload = outbuf + sizeof(mysql_packet_header); - - // write field - memcpy(mysql_payload, &field_count, sizeof(field_count)); - mysql_payload = mysql_payload + sizeof(field_count); - - // write errno - memcpy(mysql_payload, mysql_err, sizeof(mysql_err)); - mysql_payload = mysql_payload + sizeof(mysql_err); - - // write sqlstate - memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg)); - mysql_payload = mysql_payload + sizeof(mysql_statemsg); - - // write err messg - memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg)); - - // writing data in the Client buffer queue + nbytes = GWBUF_LENGTH(buf); dcb->func.write(dcb, buf); - return sizeof(mysql_packet_header) + mysql_payload_size; + return GWBUF_LENGTH(buf); } /** @@ -1229,7 +1375,12 @@ int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password, * */ int -mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { +mysql_send_auth_error ( + DCB *dcb, + int packet_number, + int in_affected_rows, + const char *mysql_message) +{ uint8_t *outbuf = NULL; uint8_t mysql_payload_size = 0; uint8_t mysql_packet_header[4]; diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index 86e98f397..e8e8ec7c0 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -343,8 +343,7 @@ TELNETD *telnetd = dcb->protocol; if (telnetd && telnetd->username) free(telnetd->username); - dcb_close(dcb); - return 0; + return 0; } /** diff --git a/server/modules/routing/GaleraHACRoute.c b/server/modules/routing/GaleraHACRoute.c index cf3e833ed..33a9597cc 100644 --- a/server/modules/routing/GaleraHACRoute.c +++ b/server/modules/routing/GaleraHACRoute.c @@ -58,12 +58,14 @@ static void GHACloseSession(ROUTER *instance, void *router_session); static void GHAFreeSession(ROUTER *instance, void *router_session); static int GHARouteQuery(ROUTER *instance, void *router_session, GWBUF *queue); static void GHADiagnostics(ROUTER *instance, DCB *dcb); + static void GHAClientReply( ROUTER *instance, void *router_session, GWBUF *queue, DCB *backend_dcb); -static void GHAErrorReply( + +static void GHAHandleError( ROUTER *instance, void *router_session, char *message, @@ -79,7 +81,7 @@ static ROUTER_OBJECT MyObject = { GHARouteQuery, GHADiagnostics, GHAClientReply, - GHAErrorReply + GHAHandleError }; static bool rses_begin_router_action( @@ -489,7 +491,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } @@ -630,10 +632,9 @@ GHAClientReply( } /** - * Error Reply routine + * Error handling routine * - * The routine will reply to client errors and/or closing the session - * or try to open a new backend connection. + * The routine will handle error occurred in backend. * * @param instance The router instance * @param router_session The router session @@ -643,7 +644,7 @@ GHAClientReply( * */ static void -GHAErrorReply( +GHAHandleError( ROUTER *instance, void *router_session, char *message, diff --git a/server/modules/routing/debugcli.c b/server/modules/routing/debugcli.c index bf2404c08..e58a25163 100644 --- a/server/modules/routing/debugcli.c +++ b/server/modules/routing/debugcli.c @@ -295,7 +295,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session; if (execute_cmd(session)) dcb_printf(session->session->client, "MaxScale> "); else - session->session->client->func.close(session->session->client); + dcb_close(session->session->client); } return 1; } diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index d01e61599..1afbe902a 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -110,12 +110,13 @@ static void clientReply( void *router_session, GWBUF *queue, DCB *backend_dcb); -static void errorReply( +static void handleError( ROUTER *instance, void *router_session, char *message, DCB *backend_dcb, - int action); + int action, + bool *succp); static uint8_t getCapabilities (ROUTER* inst, void* router_session); @@ -128,7 +129,7 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostics, clientReply, - errorReply, + handleError, getCapabilities }; @@ -551,7 +552,7 @@ DCB* backend_dcb; */ if (backend_dcb != NULL) { CHK_DCB(backend_dcb); - backend_dcb->func.close(backend_dcb); + dcb_close(backend_dcb); } } } @@ -692,10 +693,9 @@ clientReply( } /** - * Error Reply routine + * Error Handler routine * - * The routine will reply to client errors and/or closing the session - * or try to open a new backend connection. + * The routine will handle errors that occurred in backend writes. * * @param instance The router instance * @param router_session The router session @@ -705,12 +705,13 @@ clientReply( * */ static void -errorReply( +handleError( ROUTER *instance, void *router_session, char *message, DCB *backend_dcb, - int action) + int action, + bool *succp) { DCB *client = NULL; SESSION *session = backend_dcb->session; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 67139bf02..8086029aa 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -38,6 +38,9 @@ MODULE_INFO info = { ROUTER_VERSION, "A Read/Write splitting router for enhancement read scalability" }; +#if defined(SS_DEBUG) +# include +#endif extern int lm_enabled_logfiles_bitmask; @@ -72,11 +75,26 @@ static void closeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); + static void clientReply( ROUTER* instance, void* router_session, GWBUF* queue, DCB* backend_dcb); + +static void handleError( + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp); + +static void print_error_packet(ROUTER_CLIENT_SES* rses, GWBUF* buf, DCB* dcb); +static int router_get_servercount(ROUTER_INSTANCE* router); +static int rses_get_max_slavecount(ROUTER_CLIENT_SES* rses, int router_nservers); +static backend_ref_t* get_bref_from_dcb(ROUTER_CLIENT_SES* rses, DCB* dcb); + static uint8_t getCapabilities (ROUTER* inst, void* router_session); int bref_cmp_global_conn( @@ -127,7 +145,7 @@ static ROUTER_OBJECT MyObject = { routeQuery, diagnostic, clientReply, - NULL, + handleError, getCapabilities }; static bool rses_begin_locked_router_action( @@ -161,9 +179,15 @@ static void rses_property_done( static mysql_sescmd_t* rses_property_get_sescmd( rses_property_t* prop); +static bool execute_sescmd_history(backend_ref_t* bref); + static bool execute_sescmd_in_backend( backend_ref_t* backend_ref); +static void sescmd_cursor_reset(sescmd_cursor_t* scur); + +static bool sescmd_cursor_history_empty(sescmd_cursor_t* scur); + static void sescmd_cursor_set_active( sescmd_cursor_t* sescmd_cursor, bool value); @@ -183,7 +207,8 @@ static bool sescmd_cursor_next( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur); + sescmd_cursor_t* scur, + bool* has_query); static void tracelog_routed_query( ROUTER_CLIENT_SES* rses, @@ -202,6 +227,11 @@ static void refreshInstance( ROUTER_INSTANCE* router, CONFIG_PARAMETER* param); +static void bref_clear_state(backend_ref_t* bref, bref_state_t state); +static void bref_set_state(backend_ref_t* bref, bref_state_t state); + +static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data); + static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -410,14 +440,12 @@ static void* newSession( SESSION* session) { backend_ref_t* backend_ref; /*< array of backend references (DCB,BACKEND,cursor) */ - backend_ref_t* master_ref = NULL; /*< pointer to selected master */ - BACKEND** b; + backend_ref_t* master_ref = NULL; /*< pointer to selected master */ ROUTER_CLIENT_SES* client_rses = NULL; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; bool succp; int router_nservers = 0; /*< # of servers in total */ int max_nslaves; /*< max # of slaves used in this session */ - int conf_max_nslaves; /*< value from configuration file */ int i; const int min_nservers = 1; /*< hard-coded for now */ static uint64_t router_client_ses_seq; /*< ID for client session */ @@ -438,6 +466,7 @@ static void* newSession( * router instance first. */ spinlock_acquire(&router->lock); + if (router->service->svc_config_version > router->rwsplit_version) { CONFIG_PARAMETER* param = router->service->svc_config_param; @@ -463,9 +492,7 @@ static void* newSession( client_rses->rses_autocommit_enabled = true; client_rses->rses_transaction_active = false; - /** count servers */ - b = router->servers; - while (*(b++) != NULL) router_nservers++; + router_nservers = router_get_servercount(router); /** With too few servers session is not created */ if (router_nservers < min_nservers || @@ -543,6 +570,8 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_chk_top = CHK_NUM_SESCMD_CUR; backend_ref[i].bref_sescmd_cur.scmd_cur_chk_tail = CHK_NUM_SESCMD_CUR; #endif + backend_ref[i].bref_state = 0; + bref_set_state(&backend_ref[i], BREF_NOT_USED); backend_ref[i].bref_backend = router->servers[i]; /** store pointers to sescmd list to both cursors */ backend_ref[i].bref_sescmd_cur.scmd_cur_rses = client_rses; @@ -550,29 +579,18 @@ static void* newSession( backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property = &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL; - } - /** - * Find out the number of read backend servers. - * Depending on the configuration value type, either copy direct count - * of slave connections or calculate the count from percentage value. - */ - if (client_rses->rses_config.rw_max_slave_conn_count > 0) - { - conf_max_nslaves = client_rses->rses_config.rw_max_slave_conn_count; - } - else - { - conf_max_nslaves = - (router_nservers*client_rses->rses_config.rw_max_slave_conn_percent)/100; - } - max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + } + max_nslaves = rses_get_max_slavecount(client_rses, router_nservers); spinlock_init(&client_rses->rses_lock); client_rses->rses_backend_ref = backend_ref; /** * Find a backend servers to connect to. + * This command requires that rsession's lock is held. */ + rses_begin_locked_router_action(client_rses); + succp = select_connect_backend_servers(&master_ref, backend_ref, router_nservers, @@ -580,6 +598,8 @@ static void* newSession( client_rses->rses_config.rw_slave_select_criteria, session, router); + + rses_end_locked_router_action(client_rses); /** Both Master and at least 1 slave must be found */ if (!succp) { @@ -590,11 +610,12 @@ static void* newSession( } /** Copy backend pointers to router session. */ client_rses->rses_master_ref = master_ref; + ss_dassert(SERVER_IS_MASTER(master_ref->bref_backend->backend_server)); client_rses->rses_backend_ref = backend_ref; client_rses->rses_nbackends = router_nservers; /*< # of backend servers */ client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; router->stats.n_sessions += 1; - + /** * Version is bigger than zero once initialized. */ @@ -633,7 +654,15 @@ static void closeSession( { ROUTER_CLIENT_SES* router_cli_ses; backend_ref_t* backend_ref; - + + /** + * router session can be NULL if newSession failed and it is discarding + * its connections and DCB's. + */ + if (router_session == NULL) + { + return; + } router_cli_ses = (ROUTER_CLIENT_SES *)router_session; CHK_CLIENT_RSES(router_cli_ses); @@ -667,11 +696,15 @@ static void closeSession( DCB* dcb = backend_ref[i].bref_dcb; /** Close those which had been connected */ - if (dcb != NULL) + if (BREF_IS_IN_USE((&backend_ref[i]))) { CHK_DCB(dcb); - backend_ref[i].bref_dcb = NULL; /*< prevent new uses of DCB */ - dcb->func.close(dcb); + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_CLOSED); + /** + * closes protocol and dcb + */ + dcb_close(dcb); /** decrease server current connection counters */ atomic_add(&backend_ref[i].bref_backend->backend_server->stats.n_current, -1); atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); @@ -697,12 +730,10 @@ static void freeSession( for (i=0; irses_nbackends; i++) { - if (backend_ref[i].bref_dcb == NULL) + if (!BREF_IS_IN_USE((&backend_ref[i]))) { continue; } - ss_dassert(backend_ref[i].bref_backend->backend_conn_count > 0); - atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } spinlock_acquire(&router->lock); @@ -747,7 +778,10 @@ static void freeSession( return; } - +/** + * Provide a pointer to a suitable backend dcb. + * Detect failures in server statuses and reselect backends if necessary. + */ static bool get_dcb( DCB** p_dcb, ROUTER_CLIENT_SES* rses, @@ -772,8 +806,7 @@ static bool get_dcb( for (i=0; irses_nbackends; i++) { BACKEND* b = backend_ref[i].bref_backend; - - if (backend_ref[i].bref_dcb != NULL && + if (BREF_IS_IN_USE((&backend_ref[i])) && SERVER_IS_SLAVE(b->backend_server) && (smallest_nconn == -1 || b->backend_conn_count < smallest_nconn)) @@ -787,8 +820,8 @@ static bool get_dcb( if (!succp) { backend_ref = rses->rses_master_ref; - - if (backend_ref->bref_dcb != NULL) + + if (BREF_IS_IN_USE(backend_ref)) { *p_dcb = backend_ref->bref_dcb; succp = true; @@ -814,7 +847,7 @@ static bool get_dcb( { BACKEND* b = backend_ref[i].bref_backend; - if (backend_ref[i].bref_dcb != NULL && + if (BREF_IS_IN_USE((&backend_ref[i])) && (SERVER_IS_MASTER(b->backend_server))) { *p_dcb = backend_ref[i].bref_dcb; @@ -842,7 +875,13 @@ return_succp: * @param session The session associated with the client * @param queue Gateway buffer queue with the packets received * - * @return The number of queries forwarded + * @return if succeed 1, otherwise 0 + * If routeQuery fails, it means that router session has failed. + * In any tolerated failure, handleError is called and if necessary, + * an error message is sent to the client. + * + * For now, routeQuery don't tolerate errors, so any error will close + * the session. vraa 14.6.14 */ static int routeQuery( ROUTER* instance, @@ -875,17 +914,24 @@ static int routeQuery( if (rses_is_closed) { - LOGIF(LE, - (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Failed to route %s:%s:\"%s\" to " - "backend server. %s.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to " - "route to")))); + /** + * COM_QUIT may have sent by client and as a part of backend + * closing procedure. + */ + if (packet_type != COM_QUIT) + { + LOGIF(LE, + (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to route %s:%s:\"%s\" to " + "backend server. %s.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? "Router was closed" : + "Router has no backend servers where to " + "route to")))); + } goto return_ret; } inst->stats.n_queries++; @@ -938,23 +984,7 @@ static int routeQuery( default: break; } /**< switch by packet type */ -#if 0 - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "String\t\"%s\"", - querystr == NULL ? "(empty)" : querystr))); - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Packet type\t%s", - STRPACKETTYPE(packet_type)))); -#endif -#if defined(AUTOCOMMIT_OPT) - if ((QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) && - !router_cli_ses->rses_autocommit_enabled) || - (QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) && - router_cli_ses->rses_autocommit_enabled)) - { - /** reply directly to client */ - } -#endif + /** * If autocommit is disabled or transaction is explicitly started * transaction becomes active and master gets all statements until @@ -996,6 +1026,10 @@ static int routeQuery( */ if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) { + /** + * It is not sure if the session command in question requires + * response. Statement must be examined in route_session_write. + */ bool succp = route_session_write( router_cli_ses, querybuf, @@ -1007,8 +1041,6 @@ static int routeQuery( { ret = 1; } - ss_dassert(succp); - ss_dassert(ret == 1); goto return_ret; } else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) && @@ -1036,6 +1068,12 @@ static int routeQuery( if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_slave, 1); + /** + * This backend_ref waits resultset, flag it. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, + slave_dcb), + BREF_WAITING_RESULT); } else { @@ -1053,7 +1091,7 @@ static int routeQuery( else { bool succp = true; - + if (LOG_IS_ENABLED(LOGFILE_TRACE)) { if (router_cli_ses->rses_transaction_active) /*< all to master */ @@ -1086,6 +1124,12 @@ static int routeQuery( if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1) { atomic_add(&inst->stats.n_master, 1); + /** + * This backend_ref waits reply to write stmt, + * flag it. + */ + bref_set_state(get_bref_from_dcb(router_cli_ses, master_dcb), + BREF_WAITING_RESULT); } } rses_end_locked_router_action(router_cli_ses); @@ -1136,6 +1180,7 @@ static bool rses_begin_locked_router_action( CHK_CLIENT_RSES(rses); if (rses->rses_closed) { + goto return_succp; } spinlock_acquire(&rses->rses_lock); @@ -1146,10 +1191,6 @@ static bool rses_begin_locked_router_action( succp = true; return_succp: - if (!succp) - { - /** log that router session was closed */ - } return succp; } @@ -1250,9 +1291,7 @@ static void clientReply( */ if (!rses_begin_locked_router_action(router_cli_ses)) { - while ((writebuf = gwbuf_consume( - writebuf, - GWBUF_LENGTH(writebuf))) != NULL); + print_error_packet(router_cli_ses, writebuf, backend_dcb); goto lock_failed; } /** Holding lock ensures that router session remains open */ @@ -1308,10 +1347,17 @@ static void clientReply( */ if (sescmd_cursor_is_active(scur)) { + bool has_query; writebuf = sescmd_cursor_process_replies(client_dcb, writebuf, - scur); + scur, + &has_query); + if (has_query) + { + bref_clear_state(backend_ref, BREF_WAITING_RESULT); + } } + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1327,6 +1373,7 @@ static void clientReply( pthread_self(), client_dcb, backend_dcb))); + bref_clear_state(backend_ref, BREF_WAITING_RESULT); } lock_failed: @@ -1364,6 +1411,20 @@ int bref_cmp_behind_master( return 1; } +static void bref_clear_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state &= ~state; +} + +static void bref_set_state( + backend_ref_t* bref, + bref_state_t state) +{ + bref->bref_state |= state; +} + /** * @node Search suitable backend servers from those of router instance. * @@ -1393,6 +1454,8 @@ int bref_cmp_behind_master( * * @details It is assumed that there is only one master among servers of * a router instance. As a result, the first master found is chosen. + * There will possibly be more backend references than connected backends + * because only those in correct state are connected to. */ static bool select_connect_backend_servers( backend_ref_t** p_master_ref, @@ -1402,10 +1465,10 @@ static bool select_connect_backend_servers( select_criteria_t select_criteria, SESSION* session, ROUTER_INSTANCE* router) -{ +{ bool succp = true; - bool master_found = false; - bool master_connected = false; + bool master_found; + bool master_connected; int slaves_found = 0; int slaves_connected = 0; int i; @@ -1419,6 +1482,36 @@ static bool select_connect_backend_servers( succp = false; goto return_succp; } + + /** Master is already chosen and connected. This is slave failure case */ + if (*p_master_ref != NULL && + BREF_IS_IN_USE((*p_master_ref))) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Master %p fd %d found.", + pthread_self(), + (*p_master_ref)->bref_dcb, + (*p_master_ref)->bref_dcb->fd))); + + master_found = true; + master_connected = true; + ss_dassert(SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); + } + /** New session or master failure case */ + else + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [select_connect_backend_servers] Didn't find master ", + "for session %p rses %p.", + pthread_self(), + session, + backend_ref))); + + master_found = false; + master_connected = false; + } /** Check slave selection criteria and set compare function */ p = criteria_cmpfun[select_criteria]; @@ -1431,31 +1524,37 @@ static bool select_connect_backend_servers( if (router->bitvalue != 0) /*< 'synced' is the only bitvalue in rwsplit */ { is_synced_master = true; - } + } else { is_synced_master = false; - } - -#if 0 + } + +#if defined(EXTRA_SS_DEBUG) LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns before ordering:"))); + for (i=0; ibackend_server->name, b->backend_server->port, b->backend_conn_count))); } #endif + ss_dassert(!master_connected || + SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); /** * Sort the pointer list to servers according to connection counts. As * a consequence those backends having least connections are in the * beginning of the list. */ - qsort((void *)backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p); + qsort(backend_ref, (size_t)router_nservers, sizeof(backend_ref_t), p); if (LOG_IS_ENABLED(LOGFILE_TRACE)) { @@ -1495,7 +1594,7 @@ static bool select_connect_backend_servers( } } /** - * Choose at least 1+1 (master and slave) and at most 1+max_nslaves + * Choose at least 1+min_nslaves (master and slave) and at most 1+max_nslaves * servers from the sorted list. First master found is selected. */ for (i=0; @@ -1503,18 +1602,17 @@ static bool select_connect_backend_servers( i++) { BACKEND* b = backend_ref[i].bref_backend; - + LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, - "%lu [select_backend_servers] Examine server " - "%s:%d with %d connections. Status is %d, " + "Examine server " + "%s:%d %s with %d connections. " "router->bitvalue is %d", - pthread_self(), b->backend_server->name, b->backend_server->port, + STRSRVSTATUS(b->backend_server), b->backend_conn_count, - b->backend_server->status, - router->bitmask))); + router->bitmask))); if (SERVER_IS_RUNNING(b->backend_server) && ((b->backend_server->status & router->bitmask) == @@ -1524,37 +1622,72 @@ static bool select_connect_backend_servers( SERVER_IS_SLAVE(b->backend_server)) { slaves_found += 1; - backend_ref[i].bref_dcb = dcb_connect( - b->backend_server, - session, - b->backend_server->protocol); - if (backend_ref[i].bref_dcb != NULL) + /** Slave is already connected */ + if (BREF_IS_IN_USE((&backend_ref[i]))) { slaves_connected += 1; - /** - * Increase backend connection counter. - * Server's stats are _increased_ in - * dcb.c:dcb_alloc ! - * But decreased in the calling function - * of dcb_close. - */ - atomic_add(&b->backend_conn_count, 1); } + /** New slave connection is taking place */ else { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Unable to establish " - "connection with slave %s:%d", - b->backend_server->name, - b->backend_server->port))); - /* handle connect error */ + backend_ref[i].bref_dcb = dcb_connect( + b->backend_server, + session, + b->backend_server->protocol); + + if (backend_ref[i].bref_dcb != NULL) + { + slaves_connected += 1; + /** + * Start executing session command + * history. + */ + execute_sescmd_history(&backend_ref[i]); + /** + * Callback which is called when + * node fails. + */ + dcb_add_callback( + backend_ref[i].bref_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)&backend_ref[i]); + bref_clear_state(&backend_ref[i], + BREF_CLOSED); + bref_clear_state(&backend_ref[i], + BREF_NOT_USED); + bref_set_state(&backend_ref[i], + BREF_IN_USE); + /** + * Increase backend connection counter. + * Server's stats are _increased_ in + * dcb.c:dcb_alloc ! + * But decreased in the calling function + * of dcb_close. + */ + atomic_add(&b->backend_conn_count, 1); + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Unable to establish " + "connection with slave %s:%d", + b->backend_server->name, + b->backend_server->port))); + /* handle connect error */ + } } } - else if (!master_connected && - (SERVER_IS_MASTER(b->backend_server))) + else if (SERVER_IS_MASTER(b->backend_server)) { + *p_master_ref = &backend_ref[i]; + + if (master_connected) + { + continue; + } master_found = true; backend_ref[i].bref_dcb = dcb_connect( @@ -1565,7 +1698,18 @@ static bool select_connect_backend_servers( if (backend_ref[i].bref_dcb != NULL) { master_connected = true; - *p_master_ref = &backend_ref[i]; + + dcb_add_callback( + backend_ref[i].bref_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)&backend_ref[i]); + + bref_clear_state(&backend_ref[i], + BREF_NOT_USED); + bref_set_state(&backend_ref[i], + BREF_IN_USE); + /** Increase backend connection counter */ /** Increase backend connection counter */ atomic_add(&b->backend_server->stats.n_current, 1); @@ -1583,10 +1727,30 @@ static bool select_connect_backend_servers( b->backend_server->port))); /* handle connect error */ } - } + } } } /*< for */ +#if defined(EXTRA_SS_DEBUG) + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, "Servers and conns after ordering:"))); + + for (i=0; ibackend_server->name, + b->backend_server->port, + b->backend_conn_count))); + } + ss_dassert(!master_connected || + SERVER_IS_MASTER((*p_master_ref)->bref_backend->backend_server)); +#endif + /** * Successful cases */ @@ -1644,8 +1808,8 @@ static bool select_connect_backend_servers( for (i=0; ibackend_conn_count > 0); + /** disconnect opened connections */ - backend_ref[i].bref_dcb->func.close(backend_ref[i].bref_dcb); + dcb_close(backend_ref[i].bref_dcb); + bref_clear_state(&backend_ref[i], BREF_IN_USE); + bref_set_state(&backend_ref[i], BREF_NOT_USED); + atomic_add(&backend_ref[i].bref_backend->backend_conn_count, -1); } } @@ -1930,7 +2100,8 @@ static void mysql_sescmd_done( static GWBUF* sescmd_cursor_process_replies( DCB* client_dcb, GWBUF* replybuf, - sescmd_cursor_t* scur) + sescmd_cursor_t* scur, + bool* has_query) { const size_t headerlen = 4; /*< mysql packet header */ uint8_t* packet; @@ -1946,7 +2117,7 @@ static GWBUF* sescmd_cursor_process_replies( /** * Walk through packets in the message and the list of session - *commands. + * commands. */ while (scmd != NULL && replybuf != NULL) { @@ -1960,32 +2131,11 @@ static GWBUF* sescmd_cursor_process_replies( packet = (uint8_t *)GWBUF_DATA(replybuf); packetlen = packet[0]+packet[1]*256+packet[2]*256*256; replybuf = gwbuf_consume(replybuf, packetlen+headerlen); -/* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] cmd %p " - "is already replied. Discarded %d bytes from " - "the %s replybuffer.", - pthread_self(), - scmd, - packetlen+headerlen, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } else { /** Mark the rest session commands as replied */ scmd->my_sescmd_is_replied = true; - /* - LOGIF(LT, (skygw_log_write_flush( - LOGFILE_TRACE, - "%lu [sescmd_cursor_process_replies] Marked " - "cmd %p to as replied. Left message to %s's " - "buffer for reply.", - pthread_self(), - scmd, - STRBETYPE(scur->scmd_cur_be_type)))); - */ } if (sescmd_cursor_next(scur)) @@ -1998,7 +2148,9 @@ static GWBUF* sescmd_cursor_process_replies( /** All session commands are replied */ scur->scmd_cur_active = false; } - } + } + /** vraa:this is set but only because there's not yet way to find out */ + *has_query = false; ss_dassert(replybuf == NULL || *scur->scmd_cur_ptr_property == NULL); return replybuf; @@ -2063,6 +2215,64 @@ static GWBUF* sescmd_cursor_clone_querybuf( return buf; } +static bool sescmd_cursor_history_empty( + sescmd_cursor_t* scur) +{ + bool succp; + + CHK_SESCMD_CUR(scur); + + if (scur->scmd_cur_rses->rses_properties[RSES_PROP_TYPE_SESCMD] == NULL) + { + succp = true; + } + else + { + succp = false; + } + + return succp; +} + + +static void sescmd_cursor_reset( + sescmd_cursor_t* scur) +{ + ROUTER_CLIENT_SES* rses; + CHK_SESCMD_CUR(scur); + CHK_CLIENT_RSES(scur->scmd_cur_rses); + rses = scur->scmd_cur_rses; + + scur->scmd_cur_ptr_property = &rses->rses_properties[RSES_PROP_TYPE_SESCMD]; + + CHK_RSES_PROP((*scur->scmd_cur_ptr_property)); + scur->scmd_cur_active = false; + scur->scmd_cur_cmd = &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; +} + +static bool execute_sescmd_history( + backend_ref_t* bref) +{ + bool succp; + sescmd_cursor_t* scur; + CHK_BACKEND_REF(bref); + + scur = &bref->bref_sescmd_cur; + CHK_SESCMD_CUR(scur); + + if (!sescmd_cursor_history_empty(scur)) + { + sescmd_cursor_reset(scur); + succp = execute_sescmd_in_backend(bref); + } + else + { + succp = true; + } + + return succp; +} + /** * If session command cursor is passive, sends the command to backend for * execution. @@ -2080,8 +2290,8 @@ static bool execute_sescmd_in_backend( bool succp = true; int rc = 0; sescmd_cursor_t* scur; - - if (backend_ref->bref_dcb == NULL) + + if (BREF_IS_CLOSED(backend_ref)) { goto return_succp; } @@ -2135,9 +2345,16 @@ static bool execute_sescmd_in_backend( "%lu [execute_sescmd_in_backend] Routed %s cmd %p.", pthread_self(), STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type), - scur->scmd_cur_cmd))); - - if (rc != 1) + scur->scmd_cur_cmd))); + + if (rc == 1) + { + /** + * All but COM_QUIT cause backend to send reply. flag backend_ref. + */ + bref_set_state(backend_ref, BREF_WAITING_RESULT); + } + else { succp = false; } @@ -2355,8 +2572,8 @@ static bool route_session_write( for (i=0; irses_nbackends; i++) { DCB* dcb = backend_ref[i].bref_dcb; - - if (dcb != NULL) + + if (BREF_IS_IN_USE((&backend_ref[i]))) { rc = dcb->func.write(dcb, gwbuf_clone(querybuf)); @@ -2387,16 +2604,22 @@ static bool route_session_write( } /** Add sescmd property to router client session */ rses_property_add(router_cli_ses, prop); - + for (i=0; irses_nbackends; i++) { - succp = execute_sescmd_in_backend(&backend_ref[i]); + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + succp = execute_sescmd_in_backend(&backend_ref[i]); - if (!succp) - { - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - goto return_succp; + if (!succp) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to execute session " + "command in %s:%d", + backend_ref[i].bref_backend->backend_server->name, + backend_ref[i].bref_backend->backend_server->port))); + } } } /** Unlock router session */ @@ -2457,3 +2680,300 @@ static void rwsplit_process_options( } } /*< for */ } + +/** + * Error Handler routine to resolve backend failures. If it succeeds then there + * are enough operative backends available and connected. Otherwise it fails, + * and session is terminated. + * + * @param instance The router instance + * @param router_session The router session + * @param message The error message to reply + * @param backend_dcb The backend DCB + * @param action The action: REPLY, REPLY_AND_CLOSE, NEW_CONNECTION + * @param succp Result of action. + * + * Even if succp == true connecting to new slave may have failed. succp is to + * tell whether router has enough master/slave connections to continue work. + */ +static void handleError ( + ROUTER* instance, + void* router_session, + GWBUF* errmsgbuf, + DCB* backend_dcb, + error_action_t action, + bool* succp) +{ + DCB* client_dcb; + SESSION* session; + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session; + + CHK_DCB(backend_dcb); +#if defined(SS_DEBUG) + backend_dcb->dcb_errhandle_called = true; +#endif + session = backend_dcb->session; + CHK_SESSION(session); + + switch (action) { + case ERRACT_NEW_CONNECTION: + { + int router_nservers; + int max_nslaves; + backend_ref_t* bref; + + CHK_CLIENT_RSES(rses); + + if (!rses_begin_locked_router_action(rses)) + { + *succp = false; + return; + } + + bref = get_bref_from_dcb(rses, backend_dcb); + + /** failed DCB has already been replaced */ + if (bref == NULL) + { + rses_end_locked_router_action(rses); + *succp = true; + return; + } + /** + * Error handler is already called for this DCB because + * it's not polling anymore. It can be assumed that + * it succeed because rses isn't closed. + */ + if (backend_dcb->state != DCB_STATE_POLLING) + { + rses_end_locked_router_action(rses); + *succp = true; + return; + } + + CHK_BACKEND_REF(bref); + + if (BREF_IS_WAITING_RESULT(bref)) + { + DCB* client_dcb; + client_dcb = session->client; + client_dcb->func.write(client_dcb, errmsgbuf); + bref_clear_state(bref, BREF_WAITING_RESULT); + } + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_NOT_USED); + bref_set_state(bref, BREF_CLOSED); + /** + * Remove callback because this DCB won't be used + * unless it is reconnected later, and then the callback + * is set again. + */ + dcb_remove_callback(backend_dcb, + DCB_REASON_NOT_RESPONDING, + &router_handle_state_switch, + (void *)bref); + + router_nservers = router_get_servercount(inst); + max_nslaves = rses_get_max_slavecount(rses, router_nservers); + /** + * Try to get replacement slave or at least the minimum + * number of slave connections for router session. + */ + *succp = select_connect_backend_servers( + &rses->rses_master_ref, + rses->rses_backend_ref, + router_nservers, + max_nslaves, + rses->rses_config.rw_slave_select_criteria, + session, + inst); + + rses_end_locked_router_action(rses); + break; + } + + case ERRACT_REPLY_CLIENT: + { + session_state_t sesstate; + + spinlock_acquire(&session->ses_lock); + sesstate = session->state; + client_dcb = session->client; + spinlock_release(&session->ses_lock); + + if (sesstate == SESSION_STATE_ROUTER_READY) + { + CHK_DCB(client_dcb); + client_dcb->func.write(client_dcb, errmsgbuf); + } + succp = false; /** false because new servers aren's selected. */ + break; + } + + default: + *succp = false; + break; + } +} + +static void print_error_packet( + ROUTER_CLIENT_SES* rses, + GWBUF* buf, + DCB* dcb) +{ +#if defined(SS_DEBUG) + if (buf->gwbuf_type == GWBUF_TYPE_MYSQL) + { + while (gwbuf_length(buf) > 0) + { + /** + * This works with MySQL protocol only ! + * Protocol specific packet print functions would be nice. + */ + uint8_t* ptr = GWBUF_DATA(buf); + size_t len = MYSQL_GET_PACKET_LEN(ptr); + + if (MYSQL_GET_COMMAND(ptr) == 0xff) + { + SERVER* srv = NULL; + backend_ref_t* bref = rses->rses_backend_ref; + int i; + char* bufstr; + + for (i=0; irses_nbackends; i++) + { + if (bref[i].bref_dcb == dcb) + { + srv = bref[i].bref_backend->backend_server; + } + } + ss_dassert(srv != NULL); + + bufstr = strndup(&ptr[7], len-3); + + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Backend server %s:%d responded with " + "error : %s", + srv->name, + srv->port, + bufstr))); + free(bufstr); + } + buf = gwbuf_consume(buf, len+4); + } + } + else + { + while ((buf = gwbuf_consume(buf, GWBUF_LENGTH(buf))) != NULL); + } +#endif +} + +static int router_get_servercount( + ROUTER_INSTANCE* inst) +{ + int router_nservers = 0; + BACKEND** b = inst->servers; + /** count servers */ + while (*(b++) != NULL) router_nservers++; + + return router_nservers; +} + +/** + * Find out the number of read backend servers. + * Depending on the configuration value type, either copy direct count + * of slave connections or calculate the count from percentage value. + */ +static int rses_get_max_slavecount( + ROUTER_CLIENT_SES* rses, + int router_nservers) +{ + int conf_max_nslaves; + int max_nslaves; + + CHK_CLIENT_RSES(rses); + + if (rses->rses_config.rw_max_slave_conn_count > 0) + { + conf_max_nslaves = rses->rses_config.rw_max_slave_conn_count; + } + else + { + conf_max_nslaves = + (router_nservers*rses->rses_config.rw_max_slave_conn_percent)/100; + } + max_nslaves = MIN(router_nservers-1, MAX(1, conf_max_nslaves)); + + return max_nslaves; +} + +static backend_ref_t* get_bref_from_dcb( + ROUTER_CLIENT_SES* rses, + DCB* dcb) +{ + backend_ref_t* bref; + int i = 0; + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + + bref = rses->rses_backend_ref; + + while (irses_nbackends) + { + if (bref->bref_dcb == dcb) + { + break; + } + bref++; + i += 1; + } + + if (i == rses->rses_nbackends) + { + bref = NULL; + } + return bref; +} + +static int router_handle_state_switch( + DCB* dcb, + DCB_REASON reason, + void* data) +{ + backend_ref_t* bref; + int rc = 1; + ROUTER_CLIENT_SES* rses; + SESSION* ses; + SERVER* srv; + + CHK_DCB(dcb); + bref = (backend_ref_t *)data; + CHK_BACKEND_REF(bref); + + srv = bref->bref_backend->backend_server; + + if (SERVER_IS_RUNNING(srv) && SERVER_IS_IN_CLUSTER(srv)) + { + goto return_rc; + } + ses = dcb->session; + CHK_SESSION(ses); + + rses = (ROUTER_CLIENT_SES *)dcb->session->router_session; + CHK_CLIENT_RSES(rses); + + switch (reason) { + case DCB_REASON_NOT_RESPONDING: + dcb->func.hangup(dcb); + break; + + default: + break; + } + +return_rc: + return rc; +}