Merge branch 'develop' into MAX-65

This commit is contained in:
Mark Riddoch
2014-06-18 17:47:21 +01:00
36 changed files with 2010 additions and 856 deletions

View File

@ -18,10 +18,19 @@
# Date Who Description # Date Who Description
# 13/06/14 Mark Riddoch Initial implementation of MaxScale # 13/06/14 Mark Riddoch Initial implementation of MaxScale
# client program # client program
# 18/06/14 Mark Riddoch Addition of conditional for histedit
ifeq ($(wildcard /usr/include/histedit.h), )
HISTLIB=
HISTFLAG=
else
HISTLIB=-ledit
HISTFLAG=-DHISTORY
endif
CC=cc CC=cc
CFLAGS=-c -Wall -g CFLAGS=-c -Wall -g $(HISTFLAG)
SRCS= maxadmin.c SRCS= maxadmin.c
@ -29,7 +38,7 @@ HDRS=
OBJ=$(SRCS:.c=.o) OBJ=$(SRCS:.c=.o)
LIBS=-ledit LIBS=$(HISTLIB)
all: maxadmin all: maxadmin

View File

@ -24,6 +24,7 @@
* *
* Date Who Description * Date Who Description
* 13/06/14 Mark Riddoch Initial implementation * 13/06/14 Mark Riddoch Initial implementation
* 15/06/14 Mark Riddoch Addition of source command
* *
* @endverbatim * @endverbatim
*/ */
@ -45,13 +46,17 @@
#include <locale.h> #include <locale.h>
#include <errno.h> #include <errno.h>
#ifdef HISTORY
#include <histedit.h> #include <histedit.h>
#endif
static int connectMaxScale(char *hostname, char *port); static int connectMaxScale(char *hostname, char *port);
static int setipaddress(struct in_addr *a, char *p); static int setipaddress(struct in_addr *a, char *p);
static int authMaxScale(int so, char *user, char *password); static int authMaxScale(int so, char *user, char *password);
static int sendCommand(int so, char *cmd); static int sendCommand(int so, char *cmd);
static void DoSource(int so, char *cmd);
#ifdef HISTORY
static char * static char *
prompt(EditLine *el __attribute__((__unused__))) prompt(EditLine *el __attribute__((__unused__)))
{ {
@ -59,17 +64,22 @@ prompt(EditLine *el __attribute__((__unused__)))
return prompt; return prompt;
} }
#endif
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
EditLine *el = NULL;
int i, num, rv, fatal = 0; int i, num, rv, fatal = 0;
#ifdef HISTORY
char *buf; char *buf;
EditLine *el = NULL;
Tokenizer *tok; Tokenizer *tok;
History *hist; History *hist;
HistEvent ev; HistEvent ev;
const LineInfo *li; const LineInfo *li;
#else
char buf[1024];
#endif
char *hostname = "localhost"; char *hostname = "localhost";
char *port = "6603"; char *port = "6603";
char *user = "admin"; char *user = "admin";
@ -194,7 +204,7 @@ char *cmd;
} }
(void) setlocale(LC_CTYPE, ""); (void) setlocale(LC_CTYPE, "");
#ifdef HISTORY
hist = history_init(); /* Init the builtin history */ hist = history_init(); /* Init the builtin history */
/* Remember 100 events */ /* Remember 100 events */
history(hist, &ev, H_SETSIZE, 100); history(hist, &ev, H_SETSIZE, 100);
@ -225,12 +235,19 @@ char *cmd;
while ((buf = el_gets(el, &num)) != NULL && num != 0) while ((buf = el_gets(el, &num)) != NULL && num != 0)
{ {
#else
while (printf("MaxScale> ") && fgets(buf, 1024, stdin) != NULL)
{
num = strlen(buf);
#endif
/* Strip trailing \n\r */ /* Strip trailing \n\r */
for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--) for (i = num - 1; buf[i] == '\r' || buf[i] == '\n'; i--)
buf[i] = 0; buf[i] = 0;
#ifdef HISTORY
li = el_line(el); li = el_line(el);
history(hist, &ev, H_ENTER, buf); history(hist, &ev, H_ENTER, buf);
#endif
if (!strcasecmp(buf, "quit")) if (!strcasecmp(buf, "quit"))
{ {
@ -238,10 +255,18 @@ char *cmd;
} }
else if (!strcasecmp(buf, "history")) else if (!strcasecmp(buf, "history"))
{ {
#ifdef HISTORY
for (rv = history(hist, &ev, H_LAST); rv != -1; for (rv = history(hist, &ev, H_LAST); rv != -1;
rv = history(hist, &ev, H_PREV)) rv = history(hist, &ev, H_PREV))
fprintf(stdout, "%4d %s\n", fprintf(stdout, "%4d %s\n",
ev.num, ev.str); ev.num, ev.str);
#else
fprintf(stderr, "History not supported in this version.\n");
#endif
}
else if (!strncasecmp(buf, "source", 6))
{
DoSource(so, buf);
} }
else if (*buf) else if (*buf)
{ {
@ -249,9 +274,11 @@ char *cmd;
} }
} }
#ifdef HISTORY
el_end(el); el_end(el);
tok_end(tok); tok_end(tok);
history_end(hist); history_end(hist);
#endif
close(so); close(so);
return 0; return 0;
} }
@ -371,3 +398,44 @@ int i;
} }
return 1; return 1;
} }
static void
DoSource(int so, char *buf)
{
char *ptr, *pe;
char line[132];
FILE *fp;
/* Find the filename */
ptr = &buf[strlen("source")];
while (*ptr && isspace(*ptr))
ptr++;
if ((fp = fopen(ptr, "r")) == NULL)
{
fprintf(stderr, "Unable to open command file '%s'.\n",
ptr);
return;
}
while ((ptr = fgets(line, 132, fp)) != NULL)
{
/* Strip tailing newlines */
pe = &ptr[strlen(ptr)-1];
while (pe >= ptr && (*pe == '\r' || *pe == '\n'))
{
*pe = '\0';
pe--;
}
if (*ptr != '#') /* Comment */
{
if (! sendCommand(so, ptr))
{
break;
}
}
}
fclose(fp);
return;
}

View File

@ -1,7 +1,7 @@
%define _topdir %(echo $PWD)/ %define _topdir %(echo $PWD)/
%define name maxscale %define name maxscale
%define release ##RELEASE_TAG## %define release 1
%define version ##VERSION_TAG## %define version 0.7
%define install_path /usr/local/sbin/ %define install_path /usr/local/sbin/
BuildRoot: %{buildroot} BuildRoot: %{buildroot}
@ -14,7 +14,10 @@ Source: %{name}-%{version}-%{release}.tar.gz
Prefix: / Prefix: /
Group: Development/Tools Group: Development/Tools
#Requires: #Requires:
BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio MariaDB-devel MariaDB-server BuildRequires: gcc gcc-c++ ncurses-devel bison glibc-devel cmake libgcc perl make libtool openssl-devel libaio libaio-devel MariaDB-devel MariaDB-server
%if 0%{?rhel} == 6
BuildRequires: libedit-devel
%endif
%description %description
MaxScale MaxScale
@ -24,7 +27,7 @@ MaxScale
%setup -q %setup -q
%build %build
ln -s /lib64/libaio.so.1 /lib64/libaio.so #ln -s /lib64/libaio.so.1 /lib64/libaio.so
make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 clean make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 clean
make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 depend make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 depend
make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2 make ROOT_PATH=`pwd` HOME="" $DEBUG_FLAG1 $DEBUG_FLAG2

View File

@ -116,9 +116,7 @@ skygw_query_type_t skygw_query_classifier_get_type(
query_str = const_cast<char*>(query); query_str = const_cast<char*>(query);
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [skygw_query_classifier_get_type] Query : \"%s\"", "Query : \"%s\"", query_str)));
pthread_self(),
query_str)));
/** Get server handle */ /** Get server handle */
mysql = mysql_init(NULL); mysql = mysql_init(NULL);

View File

@ -308,7 +308,7 @@ bool gwbuf_set_type(
case GWBUF_TYPE_MYSQL: case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL: case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED: case GWBUF_TYPE_UNDEFINED:
buf->gwbuf_type = type; buf->gwbuf_type |= type;
succp = true; succp = true;
break; break;
default: default:

View File

@ -469,7 +469,7 @@ int error_count = 0;
s = strtok(NULL, ","); s = strtok(NULL, ",");
} }
} }
if (filters) if (filters && obj->element)
{ {
serviceSetFilters(obj->element, filters); serviceSetFilters(obj->element, filters);
} }
@ -1090,7 +1090,7 @@ SERVER *server;
s = strtok(NULL, ","); s = strtok(NULL, ",");
} }
} }
if (filters) if (filters && obj->element)
serviceSetFilters(obj->element, filters); serviceSetFilters(obj->element, filters);
} }
else if (!strcmp(type, "listener")) else if (!strcmp(type, "listener"))

View File

@ -213,9 +213,9 @@ getUsers(SERVICE *service, struct users *users)
"Exiting."))); "Exiting.")));
return -1; return -1;
} }
/* /**
* Attempt to connect to each database in the service in turn until * Attempt to connect to one of the databases database or until we run
* we find one that we can connect to or until we run out of databases * out of databases
* to try * to try
*/ */
server = service->databases; server = service->databases;

View File

@ -83,6 +83,7 @@ static bool dcb_set_state_nomutex(
const dcb_state_t new_state, const dcb_state_t new_state,
dcb_state_t* old_state); dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason); static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
static DCB* dcb_get_next (DCB* dcb);
DCB* dcb_get_zombies(void) DCB* dcb_get_zombies(void)
{ {
@ -109,6 +110,7 @@ DCB *rval;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
rval->dcb_chk_top = CHK_NUM_DCB; rval->dcb_chk_top = CHK_NUM_DCB;
rval->dcb_chk_tail = CHK_NUM_DCB; rval->dcb_chk_tail = CHK_NUM_DCB;
rval->dcb_errhandle_called = false;
#endif #endif
rval->dcb_role = role; rval->dcb_role = role;
#if 1 #if 1
@ -132,6 +134,9 @@ DCB *rval;
rval->next = NULL; rval->next = NULL;
rval->callbacks = NULL; rval->callbacks = NULL;
rval->remote = NULL;
rval->user = NULL;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (allDCBs == NULL) if (allDCBs == NULL)
allDCBs = rval; allDCBs = rval;
@ -148,7 +153,7 @@ DCB *rval;
/** /**
* Free a DCB that has not been associated with a decriptor. * Free a DCB that has not been associated with a descriptor.
* *
* @param dcb The DCB to free * @param dcb The DCB to free
*/ */
@ -311,6 +316,8 @@ DCB_CALLBACK *cb;
free(dcb->data); free(dcb->data);
if (dcb->remote) if (dcb->remote)
free(dcb->remote); free(dcb->remote);
if (dcb->user)
free(dcb->user);
/* Clear write and read buffers */ /* Clear write and read buffers */
if (dcb->delayq) { if (dcb->delayq) {
@ -559,6 +566,7 @@ int rc;
dcb->fd = fd; dcb->fd = fd;
/** Copy status field to DCB */ /** Copy status field to DCB */
dcb->dcb_server_status = server->status; dcb->dcb_server_status = server->status;
ss_debug(dcb->dcb_port = server->port;)
/*< /*<
* backend_dcb is connected to backend server, and once backend_dcb * backend_dcb is connected to backend server, and once backend_dcb
@ -594,26 +602,29 @@ int rc;
* *
* @param dcb The DCB to read from * @param dcb The DCB to read from
* @param head Pointer to linked list to append data to * @param head Pointer to linked list to append data to
* @return -1 on error, otherwise the number of read bytes on the last. * @return -1 on error, otherwise the number of read bytes on the last
* 0 is returned if no data available on the last iteration of while loop. * iteration of while loop. 0 is returned if no data available.
*/ */
int int dcb_read(
dcb_read(DCB *dcb, GWBUF **head) DCB *dcb,
GWBUF **head)
{ {
GWBUF *buffer = NULL; GWBUF *buffer = NULL;
int b; int b;
int rc; int rc;
int n = 0; int n ;
int eno = 0; int nread = 0;
int eno = 0;
CHK_DCB(dcb); CHK_DCB(dcb);
while (true) while (true)
{ {
int bufsize; int bufsize;
rc = ioctl(dcb->fd, FIONREAD, &b); rc = ioctl(dcb->fd, FIONREAD, &b);
if (rc == -1) { if (rc == -1)
{
eno = errno; eno = errno;
errno = 0; errno = 0;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -628,19 +639,39 @@ int eno = 0;
n = -1; n = -1;
goto return_n; goto return_n;
} }
/*< Nothing to read - leave */
if (b == 0) { if (b == 0 && nread == 0)
{
/** Handle closed client socket */
if (dcb_isclient(dcb))
{
char c;
int l_errno = 0;
int r = -1;
/* try to read 1 byte, without consuming the socket buffer */
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
l_errno = errno;
if (r <= 0 &&
l_errno != EAGAIN &&
l_errno != EWOULDBLOCK)
{
n = -1;
goto return_n;
}
}
n = 0; n = 0;
goto return_n; goto return_n;
} }
bufsize = MIN(b, MAX_BUFFER_SIZE); bufsize = MIN(b, MAX_BUFFER_SIZE);
if ((buffer = gwbuf_alloc(bufsize)) == NULL) if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{ {
/*< /*<
* This is a fatal error which should cause shutdown. * This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails. * Todo shutdown if memory allocation fails.
*/ */
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Failed to allocate read buffer " "Error : Failed to allocate read buffer "
@ -653,16 +684,17 @@ int eno = 0;
n = -1; n = -1;
ss_dassert(buffer != NULL); ss_dassert(buffer != NULL);
goto return_n; goto return_n;
} }
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++); dcb->stats.n_reads++);
if (n <= 0) if (n <= 0)
{ {
int eno = errno; int eno = errno;
errno = 0; errno = 0;
if (eno != EAGAIN && eno != EWOULDBLOCK) { if (eno != 0 && eno != EAGAIN && eno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Read failed, dcb %p in state " "Error : Read failed, dcb %p in state "
@ -673,18 +705,11 @@ int eno = 0;
eno, eno,
strerror(eno)))); strerror(eno))));
} }
else gwbuf_free(buffer);
{
/*<
* If read would block it means that other thread
* has probably read the data.
*/
n = 0;
}
gwbuf_free(buffer);
goto return_n; goto return_n;
} }
nread += n;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [dcb_read] Read %d bytes from dcb %p in state %s " "%lu [dcb_read] Read %d bytes from dcb %p in state %s "
@ -694,14 +719,13 @@ int eno = 0;
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd))); dcb->fd)));
/*< Append read data to the gwbuf */ /*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer); *head = gwbuf_append(*head, buffer);
} /*< while (true) */ } /*< while (true) */
return_n: return_n:
return n; return n;
} }
/** /**
* General purpose routine to write to a DCB * General purpose routine to write to a DCB
* *
@ -711,7 +735,7 @@ return_n:
int int
dcb_write(DCB *dcb, GWBUF *queue) dcb_write(DCB *dcb, GWBUF *queue)
{ {
int w, qlen; int w;
int saved_errno = 0; int saved_errno = 0;
int below_water; int below_water;
@ -760,26 +784,26 @@ int below_water;
* not have a race condition on the event. * not have a race condition on the event.
*/ */
if (queue) if (queue)
qlen = gwbuf_length(queue); {
else int qlen;
qlen = 0;
atomic_add(&dcb->writeqlen, qlen); qlen = gwbuf_length(queue);
dcb->writeq = gwbuf_append(dcb->writeq, queue); atomic_add(&dcb->writeqlen, qlen);
dcb->stats.n_buffered++; dcb->writeq = gwbuf_append(dcb->writeq, queue);
LOGIF(LD, (skygw_log_write( dcb->stats.n_buffered++;
LOGFILE_DEBUG, LOGIF(LD, (skygw_log_write(
"%lu [dcb_write] Append to writequeue. %d writes " LOGFILE_DEBUG,
"buffered for dcb %p in state %s fd %d", "%lu [dcb_write] Append to writequeue. %d writes "
pthread_self(), "buffered for dcb %p in state %s fd %d",
dcb->stats.n_buffered, pthread_self(),
dcb, dcb->stats.n_buffered,
STRDCBSTATE(dcb->state), dcb,
dcb->fd))); STRDCBSTATE(dcb->state),
dcb->fd)));
}
} }
else else
{ {
int len;
/* /*
* Loop over the buffer chain that has been passed to us * Loop over the buffer chain that has been passed to us
* from the reading side. * from the reading side.
@ -788,6 +812,7 @@ int below_water;
*/ */
while (queue != NULL) while (queue != NULL)
{ {
int qlen;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
dcb->session != NULL) dcb->session != NULL)
@ -805,13 +830,13 @@ int below_water;
} }
} }
#endif /* SS_DEBUG */ #endif /* SS_DEBUG */
len = GWBUF_LENGTH(queue); qlen = GWBUF_LENGTH(queue);
GW_NOINTR_CALL( GW_NOINTR_CALL(
w = gw_write( w = gw_write(
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
dcb, dcb,
#endif #endif
dcb->fd, GWBUF_DATA(queue), len); dcb->fd, GWBUF_DATA(queue), qlen);
dcb->stats.n_writes++; dcb->stats.n_writes++;
); );
@ -822,37 +847,39 @@ int below_water;
if (LOG_IS_ENABLED(LOGFILE_DEBUG)) if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{ {
if (saved_errno == EPIPE) { if (saved_errno == EPIPE)
LOGIF(LD, (skygw_log_write( {
LOGFILE_DEBUG, LOGIF(LD, (skygw_log_write(
"%lu [dcb_write] Write to dcb " LOGFILE_DEBUG,
"%p in state %s fd %d failed " "%lu [dcb_write] Write to dcb "
"due errno %d, %s", "%p in state %s fd %d failed "
pthread_self(), "due errno %d, %s",
dcb, pthread_self(),
STRDCBSTATE(dcb->state), dcb,
dcb->fd, STRDCBSTATE(dcb->state),
saved_errno, dcb->fd,
strerror(saved_errno)))); saved_errno,
strerror(saved_errno))));
} }
} }
if (LOG_IS_ENABLED(LOGFILE_ERROR)) if (LOG_IS_ENABLED(LOGFILE_ERROR))
{ {
if (saved_errno != EPIPE && if (saved_errno != EPIPE &&
saved_errno != EAGAIN && saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK) saved_errno != EWOULDBLOCK)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Write to dcb %p in " "Error : Write to dcb %p in "
"state %s fd %d failed due " "state %s fd %d failed due "
"errno %d, %s", "errno %d, %s",
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd, dcb->fd,
saved_errno, saved_errno,
strerror(saved_errno)))); strerror(saved_errno))));
} }
} }
break; break;
} }
@ -876,20 +903,15 @@ int below_water;
* for suspended write. * for suspended write.
*/ */
dcb->writeq = queue; dcb->writeq = queue;
if (queue)
{
qlen = gwbuf_length(queue);
}
else
{
qlen = 0;
}
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL) if (queue)
{ {
dcb->stats.n_buffered++; int qlen;
}
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->stats.n_buffered++;
}
} /* if (dcb->writeq) */ } /* if (dcb->writeq) */
if (saved_errno != 0 && if (saved_errno != 0 &&
@ -937,10 +959,10 @@ int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock); spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
if (dcb->writeq)
{ {
int len; int len;
/* /*
* Loop over the buffer chain in the pending writeq * Loop over the buffer chain in the pending writeq
* Send as much of the data in that chain as possible and * Send as much of the data in that chain as possible and
@ -996,16 +1018,17 @@ int above_water;
} }
spinlock_release(&dcb->writeqlock); spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n); atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL) if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED); dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
if (above_water && dcb->writeqlen < dcb->low_water)
{ {
atomic_add(&dcb->stats.n_low_water, 1); atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER); dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
} }
return n; return n;
} }
@ -1024,13 +1047,15 @@ void
dcb_close(DCB *dcb) dcb_close(DCB *dcb)
{ {
int rc; int rc;
CHK_DCB(dcb); CHK_DCB(dcb);
/*< /*<
* dcb_close may be called for freshly created dcb, in which case * dcb_close may be called for freshly created dcb, in which case
* it only needs to be freed. * it only needs to be freed.
*/ */
if (dcb->state == DCB_STATE_ALLOC) { if (dcb->state == DCB_STATE_ALLOC)
{
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb); dcb_final_free(dcb);
return; return;
@ -1041,13 +1066,19 @@ dcb_close(DCB *dcb)
dcb->state == DCB_STATE_ZOMBIE); dcb->state == DCB_STATE_ZOMBIE);
/*< /*<
* Stop dcb's listening and modify state accordingly. * Stop dcb's listening and modify state accordingly.
*/ */
rc = poll_remove_dcb(dcb); rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING || ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE); dcb->state == DCB_STATE_ZOMBIE);
/**
* close protocol and router session
*/
if (dcb->func.close != NULL)
{
dcb->func.close(dcb);
}
dcb_call_callback(dcb, DCB_REASON_CLOSE); dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) { if (rc == 0) {
@ -1068,7 +1099,8 @@ dcb_close(DCB *dcb)
STRDCBSTATE(dcb->state)))); STRDCBSTATE(dcb->state))));
} }
if (dcb->state == DCB_STATE_NOPOLLING) { if (dcb->state == DCB_STATE_NOPOLLING)
{
dcb_add_to_zombieslist(dcb); dcb_add_to_zombieslist(dcb);
} }
} }
@ -1086,6 +1118,8 @@ printDCB(DCB *dcb)
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote) if (dcb->remote)
printf("\tConnected to: %s\n", dcb->remote); printf("\tConnected to: %s\n", dcb->remote);
if (dcb->user)
printf("\tUsername to: %s\n", dcb->user);
if (dcb->writeq) if (dcb->writeq)
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
printf("\tStatistics:\n"); printf("\tStatistics:\n");
@ -1143,6 +1177,9 @@ DCB *dcb;
if (dcb->remote) if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n", dcb_printf(pdcb, "\tConnected to: %s\n",
dcb->remote); dcb->remote);
if (dcb->user)
dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user);
if (dcb->writeq) if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n", dcb_printf(pdcb, "\tQueued write data: %d\n",
gwbuf_length(dcb->writeq)); gwbuf_length(dcb->writeq));
@ -1170,6 +1207,8 @@ DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
dcb_printf(pdcb, "Descriptor Control Blocks\n");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n");
dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n", dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n",
"DCB", "State", "Service", "Remote"); "DCB", "State", "Service", "Remote");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n");
@ -1182,7 +1221,7 @@ DCB *dcb;
(dcb->remote ? dcb->remote : "")); (dcb->remote ? dcb->remote : ""));
dcb = dcb->next; dcb = dcb->next;
} }
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n"); dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n\n");
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
} }
@ -1450,7 +1489,7 @@ static bool dcb_set_state_nomutex(
} /*< switch (dcb->state) */ } /*< switch (dcb->state) */
if (succp) { if (succp) {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s", "%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s",
pthread_self(), pthread_self(),
@ -1566,7 +1605,10 @@ int gw_write(
* @return Non-zero (true) if the callback was added * @return Non-zero (true) if the callback was added
*/ */
int int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) dcb_add_callback(
DCB *dcb,
DCB_REASON reason,
int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{ {
DCB_CALLBACK *cb, *ptr; DCB_CALLBACK *cb, *ptr;
int rval = 1; int rval = 1;
@ -1637,7 +1679,7 @@ int rval = 0;
if (cb->reason == reason && cb->cb == callback if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata) && cb->userdata == userdata)
{ {
if (pcb == NULL) if (pcb != NULL)
pcb->next = cb->next; pcb->next = cb->next;
else else
dcb->callbacks = cb->next; dcb->callbacks = cb->next;
@ -1711,3 +1753,72 @@ int rval = 0;
return rval; return rval;
} }
static DCB* dcb_get_next (
DCB* dcb)
{
DCB* p;
spinlock_acquire(&dcbspin);
p = allDCBs;
if (dcb == NULL || p == NULL)
{
dcb = p;
}
else
{
while (p != NULL && dcb != p)
{
p = p->next;
}
if (p != NULL)
{
dcb = p->next;
}
else
{
dcb = NULL;
}
}
spinlock_release(&dcbspin);
return dcb;
}
void dcb_call_foreach (
SERVER* srv,
DCB_REASON reason)
{
switch (reason) {
case DCB_REASON_CLOSE:
case DCB_REASON_DRAINED:
case DCB_REASON_HIGH_WATER:
case DCB_REASON_LOW_WATER:
case DCB_REASON_ERROR:
case DCB_REASON_HUP:
case DCB_REASON_NOT_RESPONDING:
{
DCB* dcb;
dcb = dcb_get_next(NULL);
while (dcb != NULL)
{
if (dcb->state == DCB_STATE_POLLING)
{
dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING);
}
dcb = dcb_get_next(dcb);
}
break;
}
default:
break;
}
return;
}

View File

@ -220,6 +220,8 @@ int i;
ptr = allFilters; ptr = allFilters;
if (ptr) if (ptr)
{ {
dcb_printf(dcb, "Filters\n");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
dcb_printf(dcb, "%-18s | %-15s | Options\n", dcb_printf(dcb, "%-18s | %-15s | Options\n",
"Filter", "Module"); "Filter", "Module");
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n");
@ -234,7 +236,7 @@ int i;
ptr = ptr->next; ptr = ptr->next;
} }
if (allFilters) if (allFilters)
dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n"); dcb_printf(dcb, "--------------------+-----------------+----------------------------------------\n\n");
spinlock_release(&filter_spin); spinlock_release(&filter_spin);
} }
@ -333,7 +335,7 @@ DOWNSTREAM *me;
return NULL; return NULL;
} }
me->instance = filter->filter; me->instance = filter->filter;
me->routeQuery = filter->obj->routeQuery; me->routeQuery = (void *)(filter->obj->routeQuery);
me->session = filter->obj->newSession(me->instance, session); me->session = filter->obj->newSession(me->instance, session);
filter->obj->setDownstream(me->instance, me->session, downstream); filter->obj->setDownstream(me->instance, me->session, downstream);

View File

@ -52,6 +52,7 @@
#include <poll.h> #include <poll.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h>
#include <mysql.h> #include <mysql.h>
#include <monitor.h> #include <monitor.h>
#include <version.h> #include <version.h>

View File

@ -130,6 +130,7 @@ setipaddress(struct in_addr *a, char *p) {
return 1; return 1;
} }
#endif #endif
return 0;
} }
/** /**
@ -157,62 +158,6 @@ void gw_daemonize(void) {
} }
} }
/////////////////////////////////////////////////
// Read data from dcb and store it in the gwbuf
/////////////////////////////////////////////////
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
GWBUF *buffer = NULL;
int n = -1;
if (b <= 0) {
ss_dassert(false);
#if 0
dcb->func.close(dcb);
#endif
return 1;
}
while (b > 0) {
int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
if ((buffer = gwbuf_alloc(bufsize)) == NULL) {
/* Bad news, we have run out of memory */
/* Error handling */
(dcb->func).close(dcb);
return 1;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
gwbuf_free(buffer);
return 1;
} else {
gwbuf_free(buffer);
(dcb->func).close(dcb);
return 1;
}
}
if (n == 0) {
// socket closed
gwbuf_free(buffer);
#if 1
(dcb->func).close(dcb);
#endif
return 1;
}
// append read data to the gwbuf
*head = gwbuf_append(*head, buffer);
// how many bytes left
b -= n;
}
return 0;
}
/** /**
* Parse the bind config data. This is passed in a string as address:port. * Parse the bind config data. This is passed in a string as address:port.
* *

View File

@ -359,6 +359,8 @@ dprintAllModules(DCB *dcb)
{ {
MODULES *ptr = registered; MODULES *ptr = registered;
dcb_printf(dcb, "Modules.\n");
dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n");
dcb_printf(dcb, "%-15s | %-11s | Version | API | Status\n", "Module Name", "Module Type"); dcb_printf(dcb, "%-15s | %-11s | Version | API | Status\n", "Module Name", "Module Type");
dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n"); dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n");
while (ptr) while (ptr)
@ -380,5 +382,5 @@ MODULES *ptr = registered;
dcb_printf(dcb, "\n"); dcb_printf(dcb, "\n");
ptr = ptr->next; ptr = ptr->next;
} }
dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n"); dcb_printf(dcb, "----------------+-------------+---------+-------+-------------------------\n\n");
} }

View File

@ -349,8 +349,8 @@ poll_waitevents(void *arg)
ss_dassert(dcb->state != DCB_STATE_FREED); ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);) ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LT, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p " "%lu [poll_waitevents] event %d dcb %p "
"role %s", "role %s",
pthread_self(), pthread_self(),

View File

@ -312,14 +312,16 @@ char *stat;
ptr = allServers; ptr = allServers;
if (ptr) if (ptr)
{ {
dcb_printf(dcb, "%-18s | %-15s | Port | %-18s | Connections\n", dcb_printf(dcb, "Servers.\n");
dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n");
dcb_printf(dcb, "%-18s | %-15s | Port | %-20s | Connections\n",
"Server", "Address", "Status"); "Server", "Address", "Status");
dcb_printf(dcb, "-------------------+-----------------+-------+--------------------+------------\n"); dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n");
} }
while (ptr) while (ptr)
{ {
stat = server_status(ptr); stat = server_status(ptr);
dcb_printf(dcb, "%-18s | %-15s | %5d | %-18s | %4d\n", dcb_printf(dcb, "%-18s | %-15s | %5d | %-20s | %4d\n",
ptr->unique_name, ptr->name, ptr->unique_name, ptr->name,
ptr->port, stat, ptr->port, stat,
ptr->stats.n_current); ptr->stats.n_current);
@ -327,7 +329,7 @@ char *stat;
ptr = ptr->next; ptr = ptr->next;
} }
if (allServers) if (allServers)
dcb_printf(dcb, "-------------------+-----------------+-------+--------------------+------------\n"); dcb_printf(dcb, "-------------------+-----------------+-------+----------------------+------------\n\n");
spinlock_release(&server_spin); spinlock_release(&server_spin);
} }

View File

@ -650,12 +650,23 @@ FILTER_DEF **flist;
char *ptr, *brkt; char *ptr, *brkt;
int n = 0; int n = 0;
flist = (FILTER_DEF *)malloc(sizeof(FILTER_DEF *)); if ((flist = (FILTER_DEF **)malloc(sizeof(FILTER_DEF *))) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Out of memory adding filters to service.\n")));
return;
}
ptr = strtok_r(filters, "|", &brkt); ptr = strtok_r(filters, "|", &brkt);
while (ptr) while (ptr)
{ {
n++; n++;
flist = (FILTER_DEF *)realloc(flist, (n + 1) * sizeof(FILTER_DEF *)); if ((flist = (FILTER_DEF **)realloc(flist,
(n + 1) * sizeof(FILTER_DEF *))) == NULL)
{
LOGIF(LE, (skygw_log_write_flush(LOGFILE_ERROR,
"Out of memory adding filters to service.\n")));
return;
}
if ((flist[n-1] = filter_find(trim(ptr))) == NULL) if ((flist[n-1] = filter_find(trim(ptr))) == NULL)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -826,6 +837,8 @@ SERVICE *ptr;
ptr = allServices; ptr = allServices;
if (ptr) if (ptr)
{ {
dcb_printf(dcb, "Services.\n");
dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n");
dcb_printf(dcb, "%-25s | %-20s | #Users | Total Sessions\n", dcb_printf(dcb, "%-25s | %-20s | #Users | Total Sessions\n",
"Service Name", "Router Module"); "Service Name", "Router Module");
dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n"); dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n");
@ -838,7 +851,7 @@ SERVICE *ptr;
ptr = ptr->next; ptr = ptr->next;
} }
if (allServices) if (allServices)
dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n"); dcb_printf(dcb, "--------------------------+----------------------+--------+---------------\n\n");
spinlock_release(&service_spin); spinlock_release(&service_spin);
} }
@ -857,9 +870,11 @@ SERV_PROTOCOL *lptr;
ptr = allServices; ptr = allServices;
if (ptr) if (ptr)
{ {
dcb_printf(dcb, "Listeners.\n");
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
dcb_printf(dcb, "%-20s | %-18s | %-15s | Port | State\n", dcb_printf(dcb, "%-20s | %-18s | %-15s | Port | State\n",
"Service Name", "Protocol Module", "Address"); "Service Name", "Protocol Module", "Address");
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+------\n"); dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n");
} }
while (ptr) while (ptr)
{ {
@ -868,7 +883,7 @@ SERV_PROTOCOL *lptr;
{ {
dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n", dcb_printf(dcb, "%-20s | %-18s | %-15s | %5d | %s\n",
ptr->name, lptr->protocol, ptr->name, lptr->protocol,
(lptr != NULL) ? lptr->address : "*", (lptr && lptr->address) ? lptr->address : "*",
lptr->port, lptr->port,
(lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? "Stopped" : "Running" (lptr->listener->session->state == SESSION_STATE_LISTENER_STOPPED) ? "Stopped" : "Running"
); );
@ -878,7 +893,7 @@ SERV_PROTOCOL *lptr;
ptr = ptr->next; ptr = ptr->next;
} }
if (allServices) if (allServices)
dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+------\n"); dcb_printf(dcb, "---------------------+--------------------+-----------------+-------+--------\n\n");
spinlock_release(&service_spin); spinlock_release(&service_spin);
} }
@ -1081,3 +1096,9 @@ static void service_add_qualified_param(
(*p)->next = NULL; (*p)->next = NULL;
spinlock_release(&svc->spin); spinlock_release(&svc->spin);
} }
char* service_get_name(
SERVICE* svc)
{
return svc->name;
}

View File

@ -164,7 +164,8 @@ session_alloc(SERVICE *service, DCB *client_dcb)
*/ */
session->head.instance = service->router_instance; session->head.instance = service->router_instance;
session->head.session = session->router_session; session->head.session = session->router_session;
session->head.routeQuery = service->router->routeQuery;
session->head.routeQuery = (void *)(service->router->routeQuery);
session->tail.instance = session; session->tail.instance = session;
session->tail.session = session; session->tail.session = session;
@ -546,19 +547,23 @@ SESSION *ptr;
ptr = allSessions; ptr = allSessions;
if (ptr) if (ptr)
{ {
dcb_printf(dcb, "Session | Client | State\n"); dcb_printf(dcb, "Sessions.\n");
dcb_printf(dcb, "-----------------+-----------------+----------------\n"); dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
dcb_printf(dcb, "Session | Client | Service | State\n");
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
} }
while (ptr) while (ptr)
{ {
dcb_printf(dcb, "%-16p | %-15s | %s\n", ptr, dcb_printf(dcb, "%-16p | %-15s | %-14s | %s\n", ptr,
((ptr->client && ptr->client->remote) ((ptr->client && ptr->client->remote)
? ptr->client->remote : ""), ? ptr->client->remote : ""),
(ptr->service && ptr->service->name ? ptr->service->name
: ""),
session_state(ptr->state)); session_state(ptr->state));
ptr = ptr->next; ptr = ptr->next;
} }
if (allSessions) if (allSessions)
dcb_printf(dcb, "-----------------+-----------------+----------------\n"); dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n\n");
spinlock_release(&session_spin); spinlock_release(&session_spin);
} }
@ -671,7 +676,6 @@ int i;
return 1; return 1;
} }
/** /**
* Entry point for the final element int he upstream filter, i.e. the writing * Entry point for the final element int he upstream filter, i.e. the writing
* of the data to the client. * of the data to the client.
@ -700,3 +704,30 @@ session_get_remote(SESSION *session)
return session->client->remote; return session->client->remote;
return NULL; return NULL;
} }
bool session_route_query (
SESSION* ses,
GWBUF* buf)
{
bool succp;
if (ses->head.routeQuery == NULL ||
ses->head.instance == NULL ||
ses->head.session == NULL)
{
succp = false;
goto return_succp;
}
if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1)
{
succp = true;
}
else
{
succp = false;
}
return_succp:
return succp;
}

View File

@ -46,11 +46,14 @@
typedef enum typedef enum
{ {
GWBUF_TYPE_UNDEFINED = 0x0, GWBUF_TYPE_UNDEFINED = 0x00,
GWBUF_TYPE_PLAINSQL = 0x1, GWBUF_TYPE_PLAINSQL = 0x01,
GWBUF_TYPE_MYSQL = 0x2 GWBUF_TYPE_MYSQL = 0x02
} gwbuf_type_t; } gwbuf_type_t;
#define GWBUF_IS_TYPE_PLAINSQL(b) (b->gwbuf_type & GWBUF_TYPE_PLAINSQL)
#define GWBUF_IS_TYPE_MYSQL(b) (b->gwbuf_type & GWBUF_TYPE_MYSQL)
/** /**
* A structure to encapsulate the data in a form that the data itself can be * A structure to encapsulate the data in a form that the data itself can be
* shared between multiple GWBUF's without the need to make multiple copies * shared between multiple GWBUF's without the need to make multiple copies

View File

@ -24,6 +24,8 @@
#include <skygw_utils.h> #include <skygw_utils.h>
#include <netinet/in.h> #include <netinet/in.h>
#define ERRHANDLE
struct session; struct session;
struct server; struct server;
struct service; struct service;
@ -163,7 +165,8 @@ typedef enum {
DCB_REASON_HIGH_WATER, /*< Cross high water mark */ DCB_REASON_HIGH_WATER, /*< Cross high water mark */
DCB_REASON_LOW_WATER, /*< Cross low water mark */ DCB_REASON_LOW_WATER, /*< Cross low water mark */
DCB_REASON_ERROR, /*< An error was flagged on the connection */ DCB_REASON_ERROR, /*< An error was flagged on the connection */
DCB_REASON_HUP /*< A hangup was detected */ DCB_REASON_HUP, /*< A hangup was detected */
DCB_REASON_NOT_RESPONDING /*< Server connection was lost */
} DCB_REASON; } DCB_REASON;
/** /**
@ -192,6 +195,7 @@ typedef struct dcb_callback {
typedef struct dcb { typedef struct dcb {
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t dcb_chk_top; skygw_chk_t dcb_chk_top;
bool dcb_errhandle_called;
#endif #endif
dcb_role_t dcb_role; dcb_role_t dcb_role;
SPINLOCK dcb_initlock; SPINLOCK dcb_initlock;
@ -204,12 +208,13 @@ typedef struct dcb {
int fd; /**< The descriptor */ int fd; /**< The descriptor */
dcb_state_t state; /**< Current descriptor state */ dcb_state_t state; /**< Current descriptor state */
char *remote; /**< Address of remote end */ char *remote; /**< Address of remote end */
char *user; /**< User name for connection */
struct sockaddr_in ipv4; /**< remote end IPv4 address */ struct sockaddr_in ipv4; /**< remote end IPv4 address */
void *protocol; /**< The protocol specific state */ void *protocol; /**< The protocol specific state */
struct session *session; /**< The owning session */ struct session *session; /**< The owning session */
GWPROTOCOL func; /**< The functions for this descriptor */ GWPROTOCOL func; /**< The functions for this descriptor */
unsigned int writeqlen; /**< Current number of byes in the write queue */ int writeqlen; /**< Current number of byes in the write queue */
SPINLOCK writeqlock; /**< Write Queue spinlock */ SPINLOCK writeqlock; /**< Write Queue spinlock */
GWBUF *writeq; /**< Write Data Queue */ GWBUF *writeq; /**< Write Data Queue */
SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */ SPINLOCK delayqlock; /**< Delay Backend Write Queue spinlock */
@ -230,6 +235,7 @@ typedef struct dcb {
unsigned int high_water; /**< High water mark */ unsigned int high_water; /**< High water mark */
unsigned int low_water; /**< Low water mark */ unsigned int low_water; /**< Low water mark */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
int dcb_port; /**< port of target server */
skygw_chk_t dcb_chk_tail; skygw_chk_t dcb_chk_tail;
#endif #endif
} DCB; } DCB;

View File

@ -66,6 +66,12 @@ typedef void *ROUTER;
* *
* @see load_module * @see load_module
*/ */
typedef enum error_action {
ERRACT_NEW_CONNECTION = 0x001,
ERRACT_REPLY_CLIENT = 0x002
} error_action_t;
typedef struct router_object { typedef struct router_object {
ROUTER *(*createInstance)(SERVICE *service, char **options); ROUTER *(*createInstance)(SERVICE *service, char **options);
void *(*newSession)(ROUTER *instance, SESSION *session); void *(*newSession)(ROUTER *instance, SESSION *session);
@ -74,7 +80,13 @@ typedef struct router_object {
int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue); int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue);
void (*diagnostics)(ROUTER *instance, DCB *dcb); void (*diagnostics)(ROUTER *instance, DCB *dcb);
void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
void (*errorReply)(ROUTER* instance, void* router_session, char* message, DCB *backend_dcb, int action); void (*handleError)(
ROUTER* instance,
void* router_session,
GWBUF* errmsgbuf,
DCB* backend_dcb,
error_action_t action,
bool* succp);
uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); uint8_t (*getCapabilities)(ROUTER *instance, void* router_session);
} ROUTER_OBJECT; } ROUTER_OBJECT;
@ -91,4 +103,6 @@ typedef enum router_capability_t {
RCAP_TYPE_PACKET_INPUT = (1 << 1) RCAP_TYPE_PACKET_INPUT = (1 << 1)
} router_capability_t; } router_capability_t;
#endif #endif

View File

@ -117,6 +117,11 @@ typedef struct server {
*/ */
#define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT) #define SERVER_IN_MAINT(server) ((server)->status & SERVER_MAINT)
/** server is not master, slave or joined */
#define SERVER_NOT_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) == 0)
#define SERVER_IS_IN_CLUSTER(s) (((s)->status & (SERVER_MASTER|SERVER_SLAVE|SERVER_JOINED)) != 0)
extern SERVER *server_alloc(char *, char *, unsigned short); extern SERVER *server_alloc(char *, char *, unsigned short);
extern int server_free(SERVER *); extern int server_free(SERVER *);
extern SERVER *server_find_by_unique_name(char *); extern SERVER *server_find_by_unique_name(char *);

View File

@ -155,6 +155,7 @@ extern int serviceStop(SERVICE *);
extern int serviceRestart(SERVICE *); extern int serviceRestart(SERVICE *);
extern int serviceSetUser(SERVICE *, char *, char *); extern int serviceSetUser(SERVICE *, char *, char *);
extern int serviceGetUser(SERVICE *, char **, char **); extern int serviceGetUser(SERVICE *, char **, char **);
extern void serviceSetFilters(SERVICE *, char *);
extern int serviceEnableRootUser(SERVICE *, int ); extern int serviceEnableRootUser(SERVICE *, int );
extern void service_update(SERVICE *, char *, char *, char *); extern void service_update(SERVICE *, char *, char *, char *);
extern int service_refresh_users(SERVICE *); extern int service_refresh_users(SERVICE *);
@ -169,4 +170,5 @@ bool service_set_slave_conn_limit (
extern void dprintService(DCB *, SERVICE *); extern void dprintService(DCB *, SERVICE *);
extern void dListServices(DCB *); extern void dListServices(DCB *);
extern void dListListeners(DCB *); extern void dListListeners(DCB *);
char* service_get_name(SERVICE* svc);
#endif #endif

View File

@ -57,7 +57,7 @@ typedef enum {
SESSION_STATE_ALLOC, /*< for all sessions */ SESSION_STATE_ALLOC, /*< for all sessions */
SESSION_STATE_READY, /*< for router session */ SESSION_STATE_READY, /*< for router session */
SESSION_STATE_ROUTER_READY, /*< for router session */ SESSION_STATE_ROUTER_READY, /*< for router session */
SESSION_STATE_STOPPING, /*< router is being closed */ SESSION_STATE_STOPPING, /*< session and router are being closed */
SESSION_STATE_LISTENER, /*< for listener session */ SESSION_STATE_LISTENER, /*< for listener session */
SESSION_STATE_LISTENER_STOPPED, /*< for listener session */ SESSION_STATE_LISTENER_STOPPED, /*< for listener session */
SESSION_STATE_FREE /*< for all sessions */ SESSION_STATE_FREE /*< for all sessions */

View File

@ -90,6 +90,7 @@ typedef struct {
int topN; /* Number of queries to store */ int topN; /* Number of queries to store */
char *filebase; /* Base of fielname to log into */ char *filebase; /* Base of fielname to log into */
char *source; /* The source of the client connection */ char *source; /* The source of the client connection */
char *user; /* A user name to filter on */
char *match; /* Optional text to match against */ char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */ regex_t re; /* Compiled regex text */
char *exclude; /* Optional text to match against for exclusion */ char *exclude; /* Optional text to match against for exclusion */
@ -117,6 +118,7 @@ typedef struct {
UPSTREAM up; UPSTREAM up;
int active; int active;
char *clientHost; char *clientHost;
char *userName;
char *filename; char *filename;
int fd; int fd;
struct timeval start; struct timeval start;
@ -182,6 +184,7 @@ TOPN_INSTANCE *my_instance;
my_instance->match = NULL; my_instance->match = NULL;
my_instance->exclude = NULL; my_instance->exclude = NULL;
my_instance->source = NULL; my_instance->source = NULL;
my_instance->user = NULL;
my_instance->filebase = strdup("top"); my_instance->filebase = strdup("top");
for (i = 0; params && params[i]; i++) for (i = 0; params && params[i]; i++)
{ {
@ -202,6 +205,8 @@ TOPN_INSTANCE *my_instance;
} }
else if (!strcmp(params[i]->name, "source")) else if (!strcmp(params[i]->name, "source"))
my_instance->source = strdup(params[i]->value); my_instance->source = strdup(params[i]->value);
else if (!strcmp(params[i]->name, "user"))
my_instance->user = strdup(params[i]->value);
else if (!filter_standard_parameter(params[i]->name)) else if (!filter_standard_parameter(params[i]->name))
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
@ -226,6 +231,7 @@ TOPN_INSTANCE *my_instance;
my_instance->match))); my_instance->match)));
free(my_instance->match); free(my_instance->match);
free(my_instance->source); free(my_instance->source);
free(my_instance->user);
free(my_instance->filebase); free(my_instance->filebase);
free(my_instance); free(my_instance);
return NULL; return NULL;
@ -241,6 +247,7 @@ TOPN_INSTANCE *my_instance;
regfree(&my_instance->re); regfree(&my_instance->re);
free(my_instance->match); free(my_instance->match);
free(my_instance->source); free(my_instance->source);
free(my_instance->user);
free(my_instance->filebase); free(my_instance->filebase);
free(my_instance); free(my_instance);
return NULL; return NULL;
@ -292,10 +299,17 @@ int i;
my_session->clientHost = strdup(session->client->remote); my_session->clientHost = strdup(session->client->remote);
else else
my_session->clientHost = NULL; my_session->clientHost = NULL;
if (session && session->client && session->client->user)
my_session->userName = strdup(session->client->user);
else
my_session->userName = NULL;
my_session->active = 1; my_session->active = 1;
if (my_instance->source && strcmp(my_session->clientHost, if (my_instance->source && strcmp(my_session->clientHost,
my_instance->source)) my_instance->source))
my_session->active = 0; my_session->active = 0;
if (my_instance->user && strcmp(my_session->userName,
my_instance->user))
my_session->active = 0;
sprintf(my_session->filename, "%s.%d", my_instance->filebase, sprintf(my_session->filename, "%s.%d", my_instance->filebase,
my_instance->sessions); my_instance->sessions);
@ -328,30 +342,39 @@ FILE *fp;
{ {
fprintf(fp, "Top %d longest running queries in session.\n", fprintf(fp, "Top %d longest running queries in session.\n",
my_instance->topN); my_instance->topN);
fprintf(fp, "==========================================\n\n");
fprintf(fp, "Time (sec) | Query\n");
fprintf(fp, "-----------+-----------------------------------------------------------------\n");
for (i = 0; i < my_instance->topN; i++) for (i = 0; i < my_instance->topN; i++)
{ {
if (my_session->top[i]->sql) if (my_session->top[i]->sql)
{ {
fprintf(fp, "%.3f, %s\n", fprintf(fp, "%10.3f | %s\n",
(double)((my_session->top[i]->duration.tv_sec * 1000) (double)((my_session->top[i]->duration.tv_sec * 1000)
+ (my_session->top[i]->duration.tv_usec / 1000)) / 1000, + (my_session->top[i]->duration.tv_usec / 1000)) / 1000,
my_session->top[i]->sql); my_session->top[i]->sql);
} }
} }
fprintf(fp, "\n\nTotal of %d statements executed.\n", fprintf(fp, "-----------+-----------------------------------------------------------------\n");
my_session->n_statements); fprintf(fp, "\n\nSession started %s",
fprintf(fp, "Total statement execution time %d.%d seconds\n", asctime(localtime(&my_session->connect)));
(int)my_session->total.tv_sec,
(int)my_session->total.tv_usec / 1000);
fprintf(fp, "Average statement execution time %.3f.\n",
(double)((my_session->total.tv_sec * 1000)
+ (my_session->total.tv_usec / 1000))
/ (1000 * my_session->n_statements));
fprintf(fp, "Total connection time %d.%d seconds\n",
(int)diff.tv_sec, (int)diff.tv_usec / 1000);
if (my_session->clientHost) if (my_session->clientHost)
fprintf(fp, "Connection from %s\n", fprintf(fp, "Connection from %s\n",
my_session->clientHost); my_session->clientHost);
if (my_session->userName)
fprintf(fp, "Username %s\n",
my_session->userName);
fprintf(fp, "\nTotal of %d statements executed.\n",
my_session->n_statements);
fprintf(fp, "Total statement execution time %5d.%d seconds\n",
(int)my_session->total.tv_sec,
(int)my_session->total.tv_usec / 1000);
fprintf(fp, "Average statement execution time %9.3f seconds\n",
(double)((my_session->total.tv_sec * 1000)
+ (my_session->total.tv_usec / 1000))
/ (1000 * my_session->n_statements));
fprintf(fp, "Total connection time %5d.%d seconds\n",
(int)diff.tv_sec, (int)diff.tv_usec / 1000);
fclose(fp); fclose(fp);
} }
} }

View File

@ -88,7 +88,7 @@
#define SMALL_CHUNK 1024 #define SMALL_CHUNK 1024
#define MAX_CHUNK SMALL_CHUNK * 8 * 4 #define MAX_CHUNK SMALL_CHUNK * 8 * 4
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) #define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
#define COM_QUIT_PACKET_SIZE (4+1)
struct dcb; struct dcb;
typedef enum { typedef enum {
@ -104,7 +104,6 @@ typedef enum {
MYSQL_SESSION_CHANGE MYSQL_SESSION_CHANGE
} mysql_pstate_t; } mysql_pstate_t;
/* /*
* MySQL Protocol specific state data * MySQL Protocol specific state data
*/ */
@ -237,9 +236,10 @@ typedef enum
#define MYSQL_COM_INIT_DB 0x2 #define MYSQL_COM_INIT_DB 0x2
#define MYSQL_COM_QUERY 0x3 #define MYSQL_COM_QUERY 0x3
#define MYSQL_GET_COMMAND(payload) (payload[4]) #define MYSQL_GET_COMMAND(payload) (payload[4])
#define MYSQL_GET_PACKET_NO(payload) (payload[3]) #define MYSQL_GET_PACKET_NO(payload) (payload[3])
#define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload)) #define MYSQL_GET_PACKET_LEN(payload) (gw_mysql_get_byte3(payload))
#define MYSQL_GET_ERRCODE(payload) (gw_mysql_get_byte2(&payload[5]))
#endif #endif
@ -256,12 +256,21 @@ int gw_send_authentication_to_backend(
uint8_t *passwd, uint8_t *passwd,
MySQLProtocol *protocol); MySQLProtocol *protocol);
const char *gw_mysql_protocol_state2string(int state); const char *gw_mysql_protocol_state2string(int state);
int gw_do_connect_to_backend(char *host, int port, int* fd); int gw_do_connect_to_backend(char *host, int port, int* fd);
int mysql_send_com_quit(DCB* dcb, int packet_number, GWBUF* buf);
GWBUF* mysql_create_com_quit(GWBUF* bufparam, int packet_number);
int mysql_send_custom_error ( int mysql_send_custom_error (
DCB *dcb, DCB *dcb,
int packet_number, int packet_number,
int in_affected_rows, int in_affected_rows,
const char* mysql_message); const char* mysql_message);
GWBUF* mysql_create_custom_error(
int packet_number,
int affected_rows,
const char* msg);
int gw_send_change_user_to_backend( int gw_send_change_user_to_backend(
char *dbname, char *dbname,
char *user, char *user,
@ -297,12 +306,12 @@ void gw_str_xor(
const uint8_t *input1, const uint8_t *input1,
const uint8_t *input2, const uint8_t *input2,
unsigned int len); unsigned int len);
char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len);
int gw_hex2bin(uint8_t *out, const char *in, unsigned int len); char *gw_bin2hex(char *out, const uint8_t *in, unsigned int len);
int gw_generate_random_str(char *output, int len); int gw_hex2bin(uint8_t *out, const char *in, unsigned int len);
char *gw_strend(register const char *s); int gw_generate_random_str(char *output, int len);
int setnonblocking(int fd); char *gw_strend(register const char *s);
int setipaddress(struct in_addr *a, char *p); int setnonblocking(int fd);
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); int setipaddress(struct in_addr *a, char *p);
GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf); GWBUF* gw_MySQL_get_next_packet(GWBUF** p_readbuf);

View File

@ -31,6 +31,18 @@
#include <dcb.h> #include <dcb.h>
typedef enum bref_state {
BREF_NOT_USED = 0x00,
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for anything that responds */
BREF_CLOSED = 0x04
} bref_state_t;
#define BREF_IS_NOT_USED(s) (s->bref_state & BREF_NOT_USED)
#define BREF_IS_IN_USE(s) (s->bref_state & BREF_IN_USE)
#define BREF_IS_WAITING_RESULT(s) (s->bref_state & BREF_WAITING_RESULT)
#define BREF_IS_CLOSED(s) (s->bref_state & BREF_CLOSED)
typedef enum backend_type_t { typedef enum backend_type_t {
BE_UNDEFINED=-1, BE_UNDEFINED=-1,
BE_MASTER, BE_MASTER,
@ -43,8 +55,8 @@ typedef struct rses_property_st rses_property_t;
typedef struct router_client_session ROUTER_CLIENT_SES; typedef struct router_client_session ROUTER_CLIENT_SES;
typedef enum rses_property_type_t { typedef enum rses_property_type_t {
RSES_PROP_TYPE_UNDEFINED=0, RSES_PROP_TYPE_UNDEFINED=-1,
RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_SESCMD=0,
RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD,
RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1
@ -159,6 +171,7 @@ typedef struct backend_ref_st {
#endif #endif
BACKEND* bref_backend; BACKEND* bref_backend;
DCB* bref_dcb; DCB* bref_dcb;
bref_state_t bref_state;
sescmd_cursor_t bref_sescmd_cur; sescmd_cursor_t bref_sescmd_cur;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail; skygw_chk_t bref_chk_tail;

View File

@ -73,6 +73,8 @@ static void diagnostics(DCB *, void *);
static void setInterval(void *, unsigned long); static void setInterval(void *, unsigned long);
static void defaultId(void *, unsigned long); static void defaultId(void *, unsigned long);
static void replicationHeartbeat(void *, int); static void replicationHeartbeat(void *, int);
static bool mon_status_changed(MONITOR_SERVERS* mon_srv);
static bool mon_print_fail_status(MONITOR_SERVERS* mon_srv);
static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat }; static MONITOR_OBJECT MyObject = { startMonitor, stopMonitor, registerServer, unregisterServer, defaultUser, diagnostics, setInterval, defaultId, replicationHeartbeat };
@ -142,6 +144,7 @@ MYSQL_MONITOR *handle;
handle->defaultPasswd = NULL; handle->defaultPasswd = NULL;
handle->id = MONITOR_DEFAULT_ID; handle->id = MONITOR_DEFAULT_ID;
handle->interval = MONITOR_INTERVAL; handle->interval = MONITOR_INTERVAL;
handle->replicationHeartbeat = 0;
spinlock_init(&handle->lock); spinlock_init(&handle->lock);
} }
handle->tid = (THREAD)thread_start(monitorMain, handle); handle->tid = (THREAD)thread_start(monitorMain, handle);
@ -180,7 +183,10 @@ MONITOR_SERVERS *ptr, *db;
db->server = server; db->server = server;
db->con = NULL; db->con = NULL;
db->next = NULL; db->next = NULL;
db->mon_err_count = 0;
db->mon_prev_status = 0;
spinlock_acquire(&handle->lock); spinlock_acquire(&handle->lock);
if (handle->databases == NULL) if (handle->databases == NULL)
handle->databases = db; handle->databases = db;
else else
@ -307,21 +313,25 @@ char *sep;
static void static void
monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database) monitorDatabase(MYSQL_MONITOR *handle, MONITOR_SERVERS *database)
{ {
MYSQL_ROW row; MYSQL_ROW row;
MYSQL_RES *result; MYSQL_RES *result;
int num_fields; int num_fields;
int ismaster = 0, isslave = 0; int ismaster = 0;
char *uname = handle->defaultUser, *passwd = handle->defaultPasswd; int isslave = 0;
unsigned long int server_version = 0; char *uname = handle->defaultUser;
char *server_string; char *passwd = handle->defaultPasswd;
unsigned long id = handle->id; unsigned long int server_version = 0;
int replication_heartbeat = handle->replicationHeartbeat; char *server_string;
unsigned long id = handle->id;
int replication_heartbeat = handle->replicationHeartbeat;
static int conn_err_count;
if (database->server->monuser != NULL) if (database->server->monuser != NULL)
{ {
uname = database->server->monuser; uname = database->server->monuser;
passwd = database->server->monpw; passwd = database->server->monpw;
} }
if (uname == NULL) if (uname == NULL)
return; return;
@ -329,12 +339,17 @@ int replication_heartbeat = handle->replicationHeartbeat;
if (SERVER_IN_MAINT(database->server)) if (SERVER_IN_MAINT(database->server))
return; return;
/** Store prevous status */
database->mon_prev_status = database->server->status;
if (database->con == NULL || mysql_ping(database->con) != 0) if (database->con == NULL || mysql_ping(database->con) != 0)
{ {
char *dpwd = decryptPassword(passwd); char *dpwd = decryptPassword(passwd);
int rc; int rc;
int read_timeout = 1; int read_timeout = 1;
database->con = mysql_init(NULL);
database->con = mysql_init(NULL);
rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout); rc = mysql_options(database->con, MYSQL_OPT_READ_TIMEOUT, (void *)&read_timeout);
if (mysql_real_connect(database->con, if (mysql_real_connect(database->con,
@ -346,23 +361,27 @@ int replication_heartbeat = handle->replicationHeartbeat;
NULL, NULL,
0) == NULL) 0) == NULL)
{ {
LOGIF(LE, (skygw_log_write_flush( free(dpwd);
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
free(dpwd); if (mon_print_fail_status(database))
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Monitor was unable to connect to "
"server %s:%d : \"%s\"",
database->server->name,
database->server->port,
mysql_error(database->con))));
}
/** Store current status */
server_clear_status(database->server, SERVER_RUNNING); server_clear_status(database->server, SERVER_RUNNING);
return; return;
} }
free(dpwd); free(dpwd);
} }
/** Store current status */
/* If we get this far then we have a working connection */ server_set_status(database->server, SERVER_RUNNING);
server_set_status(database->server, SERVER_RUNNING);
/* get server version from current server */ /* get server version from current server */
server_version = mysql_get_server_version(database->con); server_version = mysql_get_server_version(database->con);
@ -529,7 +548,7 @@ int replication_heartbeat = handle->replicationHeartbeat;
} }
mysql_free_result(result); mysql_free_result(result);
if (isslave == i) if (isslave > 0 && isslave == i)
isslave = 1; isslave = 1;
else else
isslave = 0; isslave = 0;
@ -622,7 +641,7 @@ int replication_heartbeat = handle->replicationHeartbeat;
} }
} }
} }
/** Store current status */
if (ismaster) if (ismaster)
{ {
server_set_status(database->server, SERVER_MASTER); server_set_status(database->server, SERVER_MASTER);
@ -672,12 +691,15 @@ MONITOR_SERVERS *ptr;
ptr = handle->databases; ptr = handle->databases;
while (ptr) while (ptr)
{ {
unsigned int prev_status = ptr->server->status;
monitorDatabase(handle, ptr); monitorDatabase(handle, ptr);
if (ptr->server->status != prev_status || if (mon_status_changed(ptr))
SERVER_IS_DOWN(ptr->server)) {
dcb_call_foreach(ptr->server, DCB_REASON_NOT_RESPONDING);
}
if (mon_status_changed(ptr) ||
mon_print_fail_status(ptr))
{ {
LOGIF(LM, (skygw_log_write_flush( LOGIF(LM, (skygw_log_write_flush(
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
@ -686,7 +708,16 @@ MONITOR_SERVERS *ptr;
ptr->server->port, ptr->server->port,
STRSRVSTATUS(ptr->server)))); STRSRVSTATUS(ptr->server))));
} }
if (SERVER_IS_DOWN(ptr->server))
{
/** Increase this server'e error count */
ptr->mon_err_count += 1;
}
else
{
/** Reset this server's error count */
ptr->mon_err_count = 0;
}
ptr = ptr->next; ptr = ptr->next;
} }
thread_millisleep(handle->interval); thread_millisleep(handle->interval);
@ -731,3 +762,39 @@ replicationHeartbeat(void *arg, int replicationHeartbeat)
MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg; MYSQL_MONITOR *handle = (MYSQL_MONITOR *)arg;
memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int)); memcpy(&handle->replicationHeartbeat, &replicationHeartbeat, sizeof(int));
} }
static bool mon_status_changed(
MONITOR_SERVERS* mon_srv)
{
bool succp;
if (mon_srv->mon_prev_status != mon_srv->server->status)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}
static bool mon_print_fail_status(
MONITOR_SERVERS* mon_srv)
{
bool succp;
int errcount = mon_srv->mon_err_count;
uint8_t modval;
modval = 1<<(MIN(errcount/10, 7));
if (SERVER_IS_DOWN(mon_srv->server) && errcount%modval == 0)
{
succp = true;
}
else
{
succp = false;
}
return succp;
}

View File

@ -42,6 +42,8 @@
typedef struct monitor_servers { typedef struct monitor_servers {
SERVER *server; /**< The server being monitored */ SERVER *server; /**< The server being monitored */
MYSQL *con; /**< The MySQL connection */ MYSQL *con; /**< The MySQL connection */
int mon_err_count;
unsigned int mon_prev_status;
struct monitor_servers struct monitor_servers
*next; /**< The next server in the list */ *next; /**< The next server in the list */
} MONITOR_SERVERS; } MONITOR_SERVERS;

View File

@ -245,7 +245,7 @@ HTTPD_session *client_data = NULL;
} }
/* force the client connecton close */ /* force the client connecton close */
dcb->func.close(dcb); dcb_close(dcb);
return 0; return 0;
} }
@ -359,7 +359,6 @@ int n_connect = 0;
static int static int
httpd_close(DCB *dcb) httpd_close(DCB *dcb)
{ {
dcb_close(dcb);
return 0; return 0;
} }

View File

@ -65,7 +65,9 @@ static int gw_backend_hangup(DCB *dcb);
static int backend_write_delayqueue(DCB *dcb); static int backend_write_delayqueue(DCB *dcb);
static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
static int gw_session(DCB *backend_dcb, void *data); #if defined(NOT_USED)
static int gw_session(DCB *backend_dcb, void *data);
#endif
static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb); static MYSQL_session* gw_get_shared_session_auth_info(DCB* dcb);
static GWPROTOCOL MyObject = { static GWPROTOCOL MyObject = {
@ -79,7 +81,7 @@ static GWPROTOCOL MyObject = {
gw_backend_close, /* Close */ gw_backend_close, /* Close */
NULL, /* Listen */ NULL, /* Listen */
gw_change_user, /* Authentication */ gw_change_user, /* Authentication */
gw_session /* Session */ NULL /* Session */
}; };
/* /*
@ -195,6 +197,14 @@ static int gw_read_backend_event(DCB *dcb) {
if (gw_read_backend_handshake(backend_protocol) != 0) { if (gw_read_backend_handshake(backend_protocol) != 0) {
backend_protocol->state = MYSQL_AUTH_FAILED; backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_read_backend_handshake, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else { } else {
/* handshake decoded, send the auth credentials */ /* handshake decoded, send the auth credentials */
if (gw_send_authentication_to_backend( if (gw_send_authentication_to_backend(
@ -204,6 +214,13 @@ static int gw_read_backend_event(DCB *dcb) {
backend_protocol) != 0) backend_protocol) != 0)
{ {
backend_protocol->state = MYSQL_AUTH_FAILED; backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_send_authentication_to_backend "
"fd %d, state = MYSQL_AUTH_FAILED.",
pthread_self(),
backend_protocol->owner_dcb->fd)));
} else { } else {
backend_protocol->state = MYSQL_AUTH_RECV; backend_protocol->state = MYSQL_AUTH_RECV;
} }
@ -240,6 +257,7 @@ static int gw_read_backend_event(DCB *dcb) {
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session;
if (backend_protocol->state == MYSQL_AUTH_RECV) { if (backend_protocol->state == MYSQL_AUTH_RECV) {
/*< /*<
@ -251,6 +269,14 @@ static int gw_read_backend_event(DCB *dcb) {
switch (receive_rc) { switch (receive_rc) {
case -1: case -1:
backend_protocol->state = MYSQL_AUTH_FAILED; backend_protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] after "
"gw_receive_backend_authentication "
"fd %d, state = MYSQL_AUTH_FAILED.",
backend_protocol->owner_dcb->fd,
pthread_self())));
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
@ -298,72 +324,50 @@ static int gw_read_backend_event(DCB *dcb) {
*/ */
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
spinlock_acquire(&dcb->delayqlock); spinlock_acquire(&dcb->delayqlock);
/*<
* vraa : errorHandle if (dcb->delayq != NULL)
* check the delayq before the reply {
*/ while ((dcb->delayq = gwbuf_consume(
if (dcb->delayq != NULL) {
/* send an error to the client */
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Connection to backend lost.");
// consume all the delay queue
while ((dcb->delayq = gwbuf_consume(
dcb->delayq, dcb->delayq,
GWBUF_LENGTH(dcb->delayq))) != NULL); GWBUF_LENGTH(dcb->delayq))) != NULL);
} }
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
/* try reload users' table for next connection */
service_refresh_users(dcb->session->service);
while (session->state != SESSION_STATE_ROUTER_READY &&
session->state != SESSION_STATE_STOPPING)
{ {
ss_dassert( GWBUF* errbuf;
session->state == SESSION_STATE_READY || bool succp;
session->state ==
SESSION_STATE_ROUTER_READY || /* try reload users' table for next connection */
session->state == SESSION_STATE_STOPPING); service_refresh_users(dcb->session->service);
/** #if defined(SS_DEBUG)
* Session shouldn't be NULL at this point LOGIF(LE, (skygw_log_write_flush(
* anymore. Just checking.. LOGFILE_ERROR,
*/ "Backend read error handling.")));
if (session->client->session == NULL) #endif
errbuf = mysql_create_custom_error(
1,
0,
"Authentication with backend failed. "
"Session will be closed.");
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_REPLY_CLIENT,
&succp);
ss_dassert(!succp);
if (session != NULL)
{ {
rc = 1; spinlock_acquire(&session->ses_lock);
goto return_rc; session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
} }
usleep(1); dcb_close(dcb);
} }
if (session->state == SESSION_STATE_STOPPING)
{
goto return_rc;
}
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
/**
* rsession shouldn't be NULL since session
* state indicates that it was initialized
* successfully.
*/
rsession = session->router_session;
ss_dassert(rsession != NULL);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_backend_event] "
"Call closeSession for backend's "
"router client session.",
pthread_self())));
/* close router_session */
router->closeSession(router_instance, rsession);
rc = 1; rc = 1;
goto return_rc; goto return_rc;
} }
@ -401,17 +405,56 @@ static int gw_read_backend_event(DCB *dcb) {
SESSION *session = dcb->session; SESSION *session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
/* read available backend data */ router = session->service->router;
rc = dcb_read(dcb, &writebuf); router_instance = session->service->router_instance;
rsession = session->router_session;
if (rc < 0) { /* read available backend data */
rc = dcb_read(dcb, &writebuf);
if (rc < 0)
{
/*< vraa : errorHandle */ /*< vraa : errorHandle */
/*< /*<
* Backend generated EPOLLIN event and if backend has * Backend generated EPOLLIN event and if backend has
* failed, connection must be closed to avoid backend * failed, connection must be closed to avoid backend
* dcb from getting hanged. * dcb from getting hanged.
*/ */
(dcb->func).close(dcb); GWBUF* errbuf;
bool succp;
/**
* - send error for client
* - mark failed backend BREF_NOT_USED
* - go through all servers and select one according to
* the criteria that user specified in the beginning.
*/
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend read error handling #2.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
"Read from backend failed");
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
if (!succp)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb);
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }
@ -420,18 +463,6 @@ static int gw_read_backend_event(DCB *dcb) {
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
/* Note the gwbuf doesn't have here a valid queue->command
* descriptions as it is a fresh new one!
* We only have the copied value in dcb->command from
* previuos func.write() and this will be used by the
* router->clientReply
* and pass now the gwbuf to the router
*/
/*< /*<
* If dcb->session->client is freed already it may be NULL. * If dcb->session->client is freed already it may be NULL.
*/ */
@ -443,7 +474,8 @@ static int gw_read_backend_event(DCB *dcb) {
if (client_protocol->state == MYSQL_IDLE) if (client_protocol->state == MYSQL_IDLE)
{ {
router->clientReply(router_instance, gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance,
rsession, rsession,
writebuf, writebuf,
dcb); dcb);
@ -451,6 +483,7 @@ static int gw_read_backend_event(DCB *dcb) {
} }
goto return_rc; goto return_rc;
} else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) { } else if (dcb->session->client->dcb_role == DCB_ROLE_INTERNAL) {
gwbuf_set_type(writebuf, GWBUF_TYPE_MYSQL);
router->clientReply(router_instance, rsession, writebuf, dcb); router->clientReply(router_instance, rsession, writebuf, dcb);
rc = 1; rc = 1;
} }
@ -550,29 +583,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
MySQLProtocol *backend_protocol = dcb->protocol; MySQLProtocol *backend_protocol = dcb->protocol;
int rc = 0; int rc = 0;
/*<
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
spinlock_acquire(&dcb->dcb_initlock);
if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */
/*< Free buffer memory */
gwbuf_consume(queue, GWBUF_LENGTH(queue));
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_MySQLWrite_backend] Write to backend failed. "
"Backend dcb %p fd %d is %s.",
pthread_self(),
dcb,
dcb->fd,
STRDCBSTATE(dcb->state))));
spinlock_release(&dcb->dcb_initlock);
rc = 0;
goto return_rc;
}
spinlock_release(&dcb->dcb_initlock);
spinlock_acquire(&dcb->authlock); spinlock_acquire(&dcb->authlock);
/** /**
* Pick action according to state of protocol. * Pick action according to state of protocol.
@ -600,11 +610,11 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
queue, queue,
GWBUF_LENGTH(queue))) != NULL); GWBUF_LENGTH(queue))) != NULL);
free(str); free(str);
}
rc = 0; rc = 0;
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
goto return_rc; goto return_rc;
break; break;
}
case MYSQL_IDLE: case MYSQL_IDLE:
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
@ -616,6 +626,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
dcb->fd, dcb->fd,
STRPROTOCOLSTATE(backend_protocol->state)))); STRPROTOCOLSTATE(backend_protocol->state))));
spinlock_release(&dcb->authlock); spinlock_release(&dcb->authlock);
rc = dcb_write(dcb, queue); rc = dcb_write(dcb, queue);
goto return_rc; goto return_rc;
break; break;
@ -644,73 +655,57 @@ return_rc:
} }
/** /**
* Backend Error Handling for EPOLLER * Error event handler.
* * Create error message, pass it to router's error handler and if error
* handler fails in providing enough backend servers, mark session being
* closed and call DCB close function which triggers closing router session
* and related backends (if any exists.
*/ */
static int gw_error_backend_event(DCB *dcb) { static int gw_error_backend_event(DCB *dcb)
SESSION *session; {
void *rsession; SESSION* session;
ROUTER_OBJECT *router; void* rsession;
ROUTER *router_instance; ROUTER_OBJECT* router;
int rc = 0; ROUTER* router_instance;
int rc = 0;
GWBUF* errbuf;
bool succp;
CHK_DCB(dcb); CHK_DCB(dcb);
session = dcb->session; session = dcb->session;
CHK_SESSION(session); CHK_SESSION(session);
rsession = session->router_session;
router = session->service->router;
router_instance = session->service->router_instance;
router = session->service->router; #if defined(SS_DEBUG)
router_instance = session->service->router_instance; LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend error event handling.")));
#endif
if (dcb->state != DCB_STATE_POLLING) {
/*< vraa : errorHandle */
/*<
* if client is not available it needs to be handled in send
* function. Session != NULL, that is known.
*/
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Writing to backend failed.");
rc = 0; errbuf = mysql_create_custom_error(
} else { 1,
/*< vraa : errorHandle */ 0,
mysql_send_custom_error( "Lost connection to backend server.");
dcb->session->client,
1,
0,
"Closed backend connection.");
rc = 1;
}
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_error_backend_event] Some error occurred in backend. "
"rc = %d",
pthread_self(),
rc)));
if (session->state == SESSION_STATE_ROUTER_READY) router->handleError(router_instance,
{ rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
/** There are not required backends available, close session. */
if (!succp) {
spinlock_acquire(&session->ses_lock); spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING; session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock); spinlock_release(&session->ses_lock);
rsession = session->router_session;
/*<
* rsession should never be NULL here.
*/
ss_dassert(rsession != NULL);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_error_backend_event] "
"Call closeSession for backend "
"session.",
pthread_self())));
router->closeSession(router_instance, rsession);
} }
return rc; dcb_close(dcb);
return 1;
} }
/* /*
@ -811,7 +806,11 @@ return_fd:
/** /**
* Hangup routine the backend dcb: it does nothing * Error event handler.
* Create error message, pass it to router's error handler and if error
* handler fails in providing enough backend servers, mark session being
* closed and call DCB close function which triggers closing router session
* and related backends (if any exists.
* *
* @param dcb The current Backend DCB * @param dcb The current Backend DCB
* @return 1 always * @return 1 always
@ -819,21 +818,90 @@ return_fd:
static int static int
gw_backend_hangup(DCB *dcb) gw_backend_hangup(DCB *dcb)
{ {
/*< vraa : errorHandle */ SESSION* session;
void* rsession;
ROUTER_OBJECT* router;
ROUTER* router_instance;
int rc = 0;
bool succp;
GWBUF* errbuf;
CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
rsession = session->router_session;
router = session->service->router;
router_instance = session->service->router_instance;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup error handling.")));
#endif
errbuf = mysql_create_custom_error(
1,
0,
"Lost connection to backend server.");
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
/** There are not required backends available, close session. */
if (!succp) {
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Backend hangup -> closing session.")));
#endif
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb);
return 1; return 1;
} }
/** /**
* Close the backend dcb * Send COM_QUIT to backend so that it can be closed.
*
* @param dcb The current Backend DCB * @param dcb The current Backend DCB
* @return 1 always * @return 1 always
*/ */
static int static int
gw_backend_close(DCB *dcb) gw_backend_close(DCB *dcb)
{ {
/*< vraa : errorHandle */ DCB* client_dcb;
dcb_close(dcb); SESSION* session;
GWBUF* quitbuf;
bool succp;
CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
quitbuf = mysql_create_com_quit(NULL, 0);
/** Send COM_QUIT to the backend being closed */
mysql_send_com_quit(dcb, 0, quitbuf);
if (session != NULL && session->state == SESSION_STATE_STOPPING)
{
client_dcb = session->client;
if (client_dcb != NULL &&
client_dcb->state == DCB_STATE_POLLING)
{
/** Close client DCB */
dcb_close(client_dcb);
}
}
return 1; return 1;
} }
@ -883,27 +951,56 @@ static int backend_write_delayqueue(DCB *dcb)
} }
else else
{ {
localq = dcb->delayq; localq = dcb->delayq;
dcb->delayq = NULL; dcb->delayq = NULL;
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq); rc = dcb_write(dcb, localq);
} }
if (rc == 0) { if (rc == 0)
{
GWBUF* errbuf;
bool succp;
ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL;
void *rsession = NULL;
SESSION *session = dcb->session;
int receive_rc = 0;
CHK_SESSION(session);
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : failed to write buffered data to back-end " "Backend write delayqueue error handling.")));
"server. Buffer was empty of back-end was disconnected " #endif
"during operation."))); errbuf = mysql_create_custom_error(
mysql_send_custom_error(
dcb->session->client,
1, 1,
0, 0,
"Failed to write buffered data to back-end server. " "Failed to write buffered data to back-end server. "
"Buffer was empty or back-end was disconnected during " "Buffer was empty or back-end was disconnected during "
"operation."); "operation. Session will be closed.");
dcb_close(dcb);
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_NEW_CONNECTION,
&succp);
if (!succp)
{
if (session != NULL)
{
spinlock_acquire(&session->ses_lock);
session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock);
}
dcb_close(dcb);
}
} }
return rc; return rc;
@ -911,7 +1008,12 @@ static int backend_write_delayqueue(DCB *dcb)
static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWBUF *queue) { static int gw_change_user(
DCB *backend,
SERVER *server,
SESSION *in_session,
GWBUF *queue)
{
MYSQL_session *current_session = NULL; MYSQL_session *current_session = NULL;
MySQLProtocol *backend_protocol = NULL; MySQLProtocol *backend_protocol = NULL;
MySQLProtocol *client_protocol = NULL; MySQLProtocol *client_protocol = NULL;
@ -997,6 +1099,7 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
* @param * @param
* @return always 1 * @return always 1
*/ */
/*
static int gw_session(DCB *backend_dcb, void *data) { static int gw_session(DCB *backend_dcb, void *data) {
GWBUF *queue = NULL; GWBUF *queue = NULL;
@ -1006,3 +1109,4 @@ static int gw_session(DCB *backend_dcb, void *data) {
return 1; return 1;
} }
*/

View File

@ -483,6 +483,11 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
if (auth_token) if (auth_token)
free(auth_token); free(auth_token);
if (auth_ret == 0)
{
dcb->user = strdup(client_data->user);
}
return auth_ret; return auth_ret;
} }
@ -504,75 +509,32 @@ gw_MySQLWrite_client(DCB *dcb, GWBUF *queue)
* @param dcb Descriptor control block * @param dcb Descriptor control block
* @return 0 if succeed, 1 otherwise * @return 0 if succeed, 1 otherwise
*/ */
int gw_read_client_event(DCB* dcb) { int gw_read_client_event(
DCB* dcb)
{
SESSION *session = NULL; SESSION *session = NULL;
ROUTER_OBJECT *router = NULL; ROUTER_OBJECT *router = NULL;
ROUTER *router_instance = NULL; ROUTER *router_instance = NULL;
void *rsession = NULL; void *rsession = NULL;
MySQLProtocol *protocol = NULL; MySQLProtocol *protocol = NULL;
GWBUF *read_buffer = NULL; GWBUF *read_buffer = NULL;
int b = -1;
int rc = 0; int rc = 0;
int nbytes_read = 0; int nbytes_read = 0;
CHK_DCB(dcb); CHK_DCB(dcb);
protocol = DCB_PROTOCOL(dcb, MySQLProtocol); protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
CHK_PROTOCOL(protocol); CHK_PROTOCOL(protocol);
/** rc = dcb_read(dcb, &read_buffer);
* Check how many bytes are readable in dcb->fd.
*/
if (ioctl(dcb->fd, FIONREAD, &b) != 0) {
int eno = errno;
errno = 0;
LOGIF(LE, (skygw_log_write( if (rc < 0)
LOGFILE_ERROR, {
"%lu [gw_read_client_event] ioctl FIONREAD for fd " dcb_close(dcb);
"%d failed. errno %d, %s. dcb->state = %d",
pthread_self(),
dcb->fd,
eno,
strerror(eno),
dcb->state)));
rc = 1;
goto return_rc;
} }
/*
* Handle the closed client socket.
*/
if (b == 0) {
char c;
int l_errno = 0;
int r = -1;
rc = 0;
/* try to read 1 byte, without consuming the socket buffer */
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
l_errno = errno;
if (r <= 0) {
if ( (l_errno == EAGAIN) || (l_errno == EWOULDBLOCK)) {
goto return_rc;
}
// close client socket and the session too
dcb->func.close(dcb);
} else {
// do nothing if reading 1 byte
}
goto return_rc;
}
rc = gw_read_gwbuff(dcb, &read_buffer, b);
if (rc != 0) {
goto return_rc;
}
nbytes_read = gwbuf_length(read_buffer); nbytes_read = gwbuf_length(read_buffer);
ss_dassert(nbytes_read > 0);
if (nbytes_read == 0)
{
goto return_rc;
}
/** /**
* if read queue existed appent read to it. * if read queue existed appent read to it.
* if length of read buffer is less than 3 or less than mysql packet * if length of read buffer is less than 3 or less than mysql packet
@ -602,7 +564,8 @@ int gw_read_client_event(DCB* dcb) {
else else
{ {
/** /**
* There is at least one complete mysql packet read * There is at least one complete mysql packet in
* read_buffer.
*/ */
read_buffer = dcb->dcb_readqueue; read_buffer = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL; dcb->dcb_readqueue = NULL;
@ -627,58 +590,80 @@ int gw_read_client_event(DCB* dcb) {
switch (protocol->state) { switch (protocol->state) {
case MYSQL_AUTH_SENT: case MYSQL_AUTH_SENT:
/*
* Read all the data that is available into a chain of buffers
*/
{ {
int auth_val = -1; int auth_val = -1;
auth_val = gw_mysql_do_authentication(dcb, read_buffer); auth_val = gw_mysql_do_authentication(dcb, read_buffer);
// Data handled withot the dcb->func.write
// so consume it now
// be sure to consume it all
read_buffer = gwbuf_consume(read_buffer, nbytes_read); read_buffer = gwbuf_consume(read_buffer, nbytes_read);
ss_dassert(read_buffer == NULL || GWBUF_EMPTY(read_buffer));
if (auth_val == 0) if (auth_val == 0)
{ {
SESSION *session = NULL; SESSION *session = NULL;
protocol->state = MYSQL_AUTH_RECV; protocol->state = MYSQL_AUTH_RECV;
//write to client mysql AUTH_OK packet, packet n. is 2 /**
// start a new session, and connect to backends * Create session, and a router session for it.
* If successful, there will be backend connection(s)
* after this point.
*/
session = session_alloc(dcb->service, dcb); session = session_alloc(dcb->service, dcb);
if (session != NULL) { if (session != NULL)
{
CHK_SESSION(session); CHK_SESSION(session);
ss_dassert(session->state != SESSION_STATE_ALLOC); ss_dassert(session->state != SESSION_STATE_ALLOC);
protocol->state = MYSQL_IDLE; protocol->state = MYSQL_IDLE;
/**
* Send an AUTH_OK packet to the client,
* packet sequence is # 2
*/
mysql_send_ok(dcb, 2, 0, NULL); mysql_send_ok(dcb, 2, 0, NULL);
} else { }
else
{
protocol->state = MYSQL_AUTH_FAILED; protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] session "
"creation failed. fd %d, "
"state = MYSQL_AUTH_FAILED.",
protocol->owner_dcb->fd,
pthread_self())));
/** Send ERR 1045 to client */
mysql_send_auth_error( mysql_send_auth_error(
dcb, dcb,
2, 2,
0, 0,
"failed to create new session"); "failed to create new session");
dcb->func.close(dcb);
dcb_close(dcb);
} }
} }
else else
{ {
protocol->state = MYSQL_AUTH_FAILED; protocol->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] after "
"gw_mysql_do_authentication, fd %d, "
"state = MYSQL_AUTH_FAILED.",
protocol->owner_dcb->fd,
pthread_self())));
/** Send ERR 1045 to client */
mysql_send_auth_error( mysql_send_auth_error(
dcb, dcb,
2, 2,
0, 0,
"Authorization failed"); "Authorization failed");
dcb->func.close(dcb);
dcb_close(dcb);
} }
} }
break; break;
case MYSQL_IDLE: case MYSQL_IDLE:
/*
* Read all the data that is available into a chain of buffers
*/
{ {
uint8_t cap = 0; uint8_t cap = 0;
uint8_t *ptr_buff = NULL; uint8_t *ptr_buff = NULL;
@ -686,14 +671,16 @@ int gw_read_client_event(DCB* dcb) {
bool stmt_input; /*< router input type */ bool stmt_input; /*< router input type */
session = dcb->session; session = dcb->session;
ss_dassert( session!= NULL);
// get the backend session, if available if (session != NULL)
if (session != NULL) { {
CHK_SESSION(session); CHK_SESSION(session);
router = session->service->router; router = session->service->router;
router_instance = router_instance =
session->service->router_instance; session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
ss_dassert(rsession != NULL);
} }
/* Now, we are assuming in the first buffer there is /* Now, we are assuming in the first buffer there is
@ -710,9 +697,11 @@ int gw_read_client_event(DCB* dcb) {
* COM_QUIT : close client dcb * COM_QUIT : close client dcb
* else : write custom error to client dcb. * else : write custom error to client dcb.
*/ */
if(rsession == NULL) { if(rsession == NULL)
{
/** COM_QUIT */ /** COM_QUIT */
if (mysql_command == '\x01') { if (mysql_command == '\x01')
{
LOGIF(LD, (skygw_log_write_flush( LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_read_client_event] Client read " "%lu [gw_read_client_event] Client read "
@ -720,8 +709,20 @@ int gw_read_client_event(DCB* dcb) {
"client dcb %p.", "client dcb %p.",
pthread_self(), pthread_self(),
dcb))); dcb)));
(dcb->func).close(dcb); /**
} else { * close router session and that closes
* backends
*/
dcb_close(dcb);
}
else
{
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client read error handling.")));
#endif
/* Send a custom error as MySQL command reply */ /* Send a custom error as MySQL command reply */
mysql_send_custom_error( mysql_send_custom_error(
dcb, dcb,
@ -729,13 +730,13 @@ int gw_read_client_event(DCB* dcb) {
0, 0,
"Can't route query. Connection to " "Can't route query. Connection to "
"backend lost"); "backend lost");
protocol->state = MYSQL_IDLE;
} }
rc = 1; rc = 1;
/** Free buffer */ /** Free buffer */
read_buffer = gwbuf_consume(read_buffer, nbytes_read); read_buffer = gwbuf_consume(read_buffer, nbytes_read);
goto return_rc; goto return_rc;
} }
/** Ask what type of input the router expects */ /** Ask what type of input the router expects */
cap = router->getCapabilities(router_instance, rsession); cap = router->getCapabilities(router_instance, rsession);
@ -756,7 +757,6 @@ int gw_read_client_event(DCB* dcb) {
"%lu [gw_read_client_event] Reading router " "%lu [gw_read_client_event] Reading router "
"capabilities failed.", "capabilities failed.",
pthread_self()))); pthread_self())));
mysql_send_custom_error(dcb, mysql_send_custom_error(dcb,
1, 1,
0, 0,
@ -766,18 +766,19 @@ int gw_read_client_event(DCB* dcb) {
goto return_rc; goto return_rc;
} }
/** Route COM_QUIT to backend */ /** Route COM_QUIT to backend */
if (mysql_command == '\x01') { if (mysql_command == '\x01')
{
/**
* Sends COM_QUIT packets since buffer is already
* created. A BREF_CLOSED flag is set so dcb_close won't
* send redundant COM_QUIT.
*/
SESSION_ROUTE_QUERY(session, read_buffer); SESSION_ROUTE_QUERY(session, read_buffer);
LOGIF(LD, (skygw_log_write_flush( /**
LOGFILE_DEBUG, * Close router session which causes closing of backends.
"%lu [gw_read_client_event] Routed COM_QUIT to " */
"backend. Close client dcb %p", dcb_close(dcb);
pthread_self(),
dcb)));
/** close client connection, closes router session too */
rc = dcb->func.close(dcb);
} }
else else
{ {
@ -788,6 +789,7 @@ int gw_read_client_event(DCB* dcb) {
* to router. * to router.
*/ */
rc = route_by_statement(session, read_buffer); rc = route_by_statement(session, read_buffer);
if (read_buffer != NULL) if (read_buffer != NULL)
{ {
/** add incomplete mysql packet to read queue */ /** add incomplete mysql packet to read queue */
@ -804,13 +806,32 @@ int gw_read_client_event(DCB* dcb) {
if (rc == 1) { if (rc == 1) {
rc = 0; /**< here '0' means success */ rc = 0; /**< here '0' means success */
} else { } else {
mysql_send_custom_error(dcb, GWBUF* errbuf;
1, bool succp;
0,
"Query routing failed. " errbuf = mysql_create_custom_error(
"Connection to backend " 1,
"lost."); 0,
protocol->state = MYSQL_IDLE; "Write to backend failed. Session closed.");
#if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client routing error handling.")));
#endif
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Routing the query failed. "
"Session will be closed.")));
router->handleError(router_instance,
rsession,
errbuf,
dcb,
ERRACT_REPLY_CLIENT,
&succp);
ss_dassert(!succp);
dcb_close(dcb);
} }
} }
goto return_rc; goto return_rc;
@ -1170,23 +1191,31 @@ int gw_MySQLAccept(DCB *listener)
client_dcb->fd = c_sock; client_dcb->fd = c_sock;
// get client address // get client address
if ( client_conn.sa_family == AF_UNIX) { if ( client_conn.sa_family == AF_UNIX)
{
// client address // client address
client_dcb->remote = strdup("localhost_from_socket"); client_dcb->remote = strdup("localhost_from_socket");
// set localhost IP for user authentication // set localhost IP for user authentication
(client_dcb->ipv4).sin_addr.s_addr = 0x0100007F; (client_dcb->ipv4).sin_addr.s_addr = 0x0100007F;
} else { }
else
{
/* client IPv4 in raw data*/ /* client IPv4 in raw data*/
memcpy(&client_dcb->ipv4, (struct sockaddr_in *)&client_conn, sizeof(struct sockaddr_in)); memcpy(&client_dcb->ipv4,
(struct sockaddr_in *)&client_conn,
sizeof(struct sockaddr_in));
/* client IPv4 in string representation */ /* client IPv4 in string representation */
client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char)); client_dcb->remote = (char *)calloc(INET_ADDRSTRLEN+1, sizeof(char));
if (client_dcb->remote != NULL) {
inet_ntop(AF_INET, &(client_dcb->ipv4).sin_addr, client_dcb->remote, INET_ADDRSTRLEN); if (client_dcb->remote != NULL)
{
inet_ntop(AF_INET,
&(client_dcb->ipv4).sin_addr,
client_dcb->remote,
INET_ADDRSTRLEN);
} }
} }
protocol = mysql_protocol_init(client_dcb, c_sock); protocol = mysql_protocol_init(client_dcb, c_sock);
ss_dassert(protocol != NULL); ss_dassert(protocol != NULL);
if (protocol == NULL) { if (protocol == NULL) {
@ -1223,7 +1252,7 @@ int gw_MySQLAccept(DCB *listener)
0, 0,
"MaxScale internal error."); "MaxScale internal error.");
/** delete client_dcb */ /** close client_dcb */
dcb_close(client_dcb); dcb_close(client_dcb);
/** Previous state is recovered in poll_add_dcb. */ /** Previous state is recovered in poll_add_dcb. */
@ -1260,14 +1289,21 @@ return_rc:
static int gw_error_client_event( static int gw_error_client_event(
DCB* dcb) DCB* dcb)
{ {
int rc; int rc;
SESSION* session;
CHK_DCB(dcb); CHK_DCB(dcb);
session = dcb->session;
CHK_SESSION(session);
rc = dcb->func.close(dcb); #if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
return rc; LOGFILE_ERROR,
"Client error event handling.")));
#endif
dcb_close(dcb);
return 1;
} }
static int static int
@ -1301,11 +1337,9 @@ gw_client_close(DCB *dcb)
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
/** Close router session and all its connections */
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
} }
dcb_close(dcb);
return 1; return 1;
} }
@ -1320,12 +1354,20 @@ gw_client_close(DCB *dcb)
static int static int
gw_client_hangup_event(DCB *dcb) gw_client_hangup_event(DCB *dcb)
{ {
int rc; int rc;
SESSION* session;
CHK_DCB(dcb); CHK_DCB(dcb);
rc = dcb->func.close(dcb); session = dcb->session;
CHK_SESSION(session);
return rc; #if defined(SS_DEBUG)
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Client hangup error handling.")));
#endif
dcb_close(dcb);
return 1;
} }

View File

@ -126,7 +126,9 @@ void gw_mysql_close(MySQLProtocol **ptr) {
* @param conn MySQL protocol structure * @param conn MySQL protocol structure
* @return 0 on success, 1 on failure * @return 0 on success, 1 on failure
*/ */
int gw_read_backend_handshake(MySQLProtocol *conn) { int gw_read_backend_handshake(
MySQLProtocol *conn)
{
GWBUF *head = NULL; GWBUF *head = NULL;
DCB *dcb = conn->owner_dcb; DCB *dcb = conn->owner_dcb;
int n = -1; int n = -1;
@ -135,12 +137,14 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
int success = 0; int success = 0;
int packet_len = 0; int packet_len = 0;
if ((n = dcb_read(dcb, &head)) != -1) { if ((n = dcb_read(dcb, &head)) != -1)
if (head) { {
if (head)
{
payload = GWBUF_DATA(head); payload = GWBUF_DATA(head);
h_len = gwbuf_length(head); h_len = gwbuf_length(head);
/* /**
* The mysql packets content starts at byte fifth * The mysql packets content starts at byte fifth
* just return with less bytes * just return with less bytes
*/ */
@ -148,10 +152,45 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
if (h_len <= 4) { if (h_len <= 4) {
/* log error this exit point */ /* log error this exit point */
conn->state = MYSQL_AUTH_FAILED; conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"dcb_read, fd %d, "
"state = MYSQL_AUTH_FAILED.",
dcb->fd,
pthread_self())));
return 1; return 1;
} }
//get mysql packet size, 3 bytes if (payload[4] == 0xff)
{
size_t len = MYSQL_GET_PACKET_LEN(payload);
uint16_t errcode = MYSQL_GET_ERRCODE(payload);
char* bufstr = strndup(&((char *)payload)[7], len-3);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_receive_backend_auth] Invalid "
"authentication message from backend dcb %p "
"fd %d, ptr[4] = %p, error code %d, msg %s.",
pthread_self(),
dcb,
dcb->fd,
payload[4],
errcode,
bufstr)));
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Invalid authentication message "
"from backend. Error code: %d, Msg : %s",
errcode,
bufstr)));
free(bufstr);
}
//get mysql packet size, 3 bytes
packet_len = gw_mysql_get_byte3(payload); packet_len = gw_mysql_get_byte3(payload);
if (h_len < (packet_len + 4)) { if (h_len < (packet_len + 4)) {
@ -160,6 +199,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* packet. Log error this exit point * packet. Log error this exit point
*/ */
conn->state = MYSQL_AUTH_FAILED; conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"gw_mysql_get_byte3, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
dcb->fd,
pthread_self())));
return 1; return 1;
} }
@ -176,6 +224,15 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* log error this exit point * log error this exit point
*/ */
conn->state = MYSQL_AUTH_FAILED; conn->state = MYSQL_AUTH_FAILED;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [gw_read_backend_handshake] after "
"gw_decode_mysql_server_handshake, fd %d, "
"state = MYSQL_AUTH_FAILED.",
pthread_self(),
conn->owner_dcb->fd,
pthread_self())));
return 1; return 1;
} }
@ -202,7 +259,10 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
* @return 0 on success, < 0 on failure * @return 0 on success, < 0 on failure
* *
*/ */
int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) { int gw_decode_mysql_server_handshake(
MySQLProtocol *conn,
uint8_t *payload)
{
uint8_t *server_version_end = NULL; uint8_t *server_version_end = NULL;
uint16_t mysql_server_capabilities_one = 0; uint16_t mysql_server_capabilities_one = 0;
uint16_t mysql_server_capabilities_two = 0; uint16_t mysql_server_capabilities_two = 0;
@ -216,8 +276,8 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) {
protocol_version = payload[0]; protocol_version = payload[0];
if (protocol_version != GW_MYSQL_PROTOCOL_VERSION) { if (protocol_version != GW_MYSQL_PROTOCOL_VERSION)
/* log error for this */ {
return -1; return -1;
} }
@ -257,19 +317,23 @@ int gw_decode_mysql_server_handshake(MySQLProtocol *conn, uint8_t *payload) {
payload+=2; payload+=2;
// get scramble len // get scramble len
if (payload[0] > 0) { if (payload[0] > 0)
{
scramble_len = payload[0] -1; scramble_len = payload[0] -1;
ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323); ss_dassert(scramble_len > GW_SCRAMBLE_LENGTH_323);
ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE); ss_dassert(scramble_len <= GW_MYSQL_SCRAMBLE_SIZE);
if ( (scramble_len < GW_SCRAMBLE_LENGTH_323) || scramble_len > GW_MYSQL_SCRAMBLE_SIZE) { if ((scramble_len < GW_SCRAMBLE_LENGTH_323) ||
scramble_len > GW_MYSQL_SCRAMBLE_SIZE)
{
/* log this */ /* log this */
return -2; return -2;
} }
} else { }
else
{
scramble_len = GW_MYSQL_SCRAMBLE_SIZE; scramble_len = GW_MYSQL_SCRAMBLE_SIZE;
} }
// skip 10 zero bytes // skip 10 zero bytes
payload += 11; payload += 11;
@ -321,26 +385,27 @@ int gw_receive_backend_auth(
} }
else if (ptr[4] == 0xff) else if (ptr[4] == 0xff)
{ {
size_t packetlen = MYSQL_GET_PACKET_LEN(ptr)+4; size_t len = MYSQL_GET_PACKET_LEN(ptr);
char* bufstr = (char *)calloc(1, packetlen-3); char* err = strndup(&((char *)ptr)[8], 5);
char* bufstr = strndup(&((char *)ptr)[13], len-4-5);
snprintf(bufstr, packetlen-6, "%s", &ptr[7]);
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [gw_receive_backend_auth] Invalid " "%lu [gw_receive_backend_auth] Invalid "
"authentication message from backend dcb %p " "authentication message from backend dcb %p "
"fd %d, ptr[4] = %p, msg %s.", "fd %d, ptr[4] = %p, error %s, msg %s.",
pthread_self(), pthread_self(),
dcb, dcb,
dcb->fd, dcb->fd,
ptr[4], ptr[4],
err,
bufstr))); bufstr)));
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Invalid authentication message " "Error : Invalid authentication message "
"from backend. Msg : %s", "from backend. Error : %s, Msg : %s",
err,
bufstr))); bufstr)));
free(bufstr); free(bufstr);
@ -367,7 +432,7 @@ int gw_receive_backend_auth(
/*< /*<
* Remove data from buffer. * Remove data from buffer.
*/ */
head = gwbuf_consume(head, GWBUF_LENGTH(head)); while ((head = gwbuf_consume(head, GWBUF_LENGTH(head))) != NULL);
} }
else if (n == 0) else if (n == 0)
{ {
@ -634,8 +699,8 @@ int gw_do_connect_to_backend(
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error: Establishing connection to backend server " "Error: Establishing connection to backend server "
"%s:%d failed.\n\t\t Socket creation failed due " "%s:%d failed.\n\t\t Socket creation failed "
"%d, %s.", "due %d, %s.",
host, host,
port, port,
eno, eno,
@ -736,6 +801,145 @@ gw_mysql_protocol_state2string (int state) {
} }
} }
GWBUF* mysql_create_com_quit(
GWBUF* bufparam,
int packet_number)
{
uint8_t* data;
GWBUF* buf;
if (bufparam == NULL)
{
buf = gwbuf_alloc(COM_QUIT_PACKET_SIZE);
}
else
{
buf = bufparam;
}
if (buf == NULL)
{
return 0;
}
ss_dassert(GWBUF_LENGTH(buf) == COM_QUIT_PACKET_SIZE);
data = GWBUF_DATA(buf);
*data++ = 0x1;
*data++ = 0x0;
*data++ = 0x0;
*data++ = packet_number;
*data = 0x1;
return buf;
}
int mysql_send_com_quit(
DCB* dcb,
int packet_number,
GWBUF* bufparam)
{
GWBUF *buf;
int nbytes = 0;
CHK_DCB(dcb);
ss_dassert(packet_number <= 255);
if (dcb == NULL || dcb->state == DCB_STATE_ZOMBIE)
{
return 0;
}
if (bufparam == NULL)
{
buf = mysql_create_com_quit(NULL, packet_number);
}
else
{
buf = bufparam;
}
if (buf == NULL)
{
return 0;
}
nbytes = dcb->func.write(dcb, buf);
return nbytes;
}
GWBUF* mysql_create_custom_error(
int packet_number,
int affected_rows,
const char* msg)
{
uint8_t* outbuf = NULL;
uint8_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4];
uint8_t* mysql_payload = NULL;
uint8_t field_count = 0;
uint8_t mysql_err[2];
uint8_t mysql_statemsg[6];
unsigned int mysql_errno = 0;
const char* mysql_error_msg = NULL;
const char* mysql_state = NULL;
GWBUF* errbuf = NULL;
mysql_errno = 2003;
mysql_error_msg = "An errorr occurred ...";
mysql_state = "HY000";
field_count = 0xff;
gw_mysql_set_byte2(mysql_err, mysql_errno);
mysql_statemsg[0]='#';
memcpy(mysql_statemsg+1, mysql_state, 5);
if (msg != NULL) {
mysql_error_msg = msg;
}
mysql_payload_size = sizeof(field_count) +
sizeof(mysql_err) +
sizeof(mysql_statemsg) +
strlen(mysql_error_msg);
/** allocate memory for packet header + payload */
errbuf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size);
ss_dassert(errbuf != NULL);
if (errbuf == NULL)
{
return 0;
}
outbuf = GWBUF_DATA(errbuf);
/** write packet header and packet number */
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
mysql_packet_header[3] = packet_number;
/** write header */
memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
mysql_payload = outbuf + sizeof(mysql_packet_header);
/** write field */
memcpy(mysql_payload, &field_count, sizeof(field_count));
mysql_payload = mysql_payload + sizeof(field_count);
/** write errno */
memcpy(mysql_payload, mysql_err, sizeof(mysql_err));
mysql_payload = mysql_payload + sizeof(mysql_err);
/** write sqlstate */
memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg));
mysql_payload = mysql_payload + sizeof(mysql_statemsg);
/** write error message */
memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg));
return errbuf;
}
/** /**
* mysql_send_custom_error * mysql_send_custom_error
* *
@ -749,79 +953,21 @@ gw_mysql_protocol_state2string (int state) {
* @return packet length * @return packet length
* *
*/ */
int int mysql_send_custom_error (
mysql_send_custom_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { DCB *dcb,
uint8_t *outbuf = NULL; int packet_number,
uint8_t mysql_payload_size = 0; int in_affected_rows,
uint8_t mysql_packet_header[4]; const char *mysql_message)
uint8_t *mysql_payload = NULL; {
uint8_t field_count = 0; GWBUF* buf;
uint8_t mysql_err[2]; int nbytes;
uint8_t mysql_statemsg[6];
unsigned int mysql_errno = 0;
const char *mysql_error_msg = NULL;
const char *mysql_state = NULL;
GWBUF *buf = NULL; buf = mysql_create_custom_error(dcb, in_affected_rows, mysql_message);
if (dcb == NULL || nbytes = GWBUF_LENGTH(buf);
dcb->state != DCB_STATE_POLLING)
{
return 0;
}
mysql_errno = 2003;
mysql_error_msg = "An errorr occurred ...";
mysql_state = "HY000";
field_count = 0xff;
gw_mysql_set_byte2(mysql_err, mysql_errno);
mysql_statemsg[0]='#';
memcpy(mysql_statemsg+1, mysql_state, 5);
if (mysql_message != NULL) {
mysql_error_msg = mysql_message;
}
mysql_payload_size = sizeof(field_count) + sizeof(mysql_err) + sizeof(mysql_statemsg) + strlen(mysql_error_msg);
// allocate memory for packet header + payload
buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size);
ss_dassert(buf != NULL);
if (buf == NULL)
{
return 0;
}
outbuf = GWBUF_DATA(buf);
// write packet header with packet number
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
mysql_packet_header[3] = packet_number;
// write header
memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
mysql_payload = outbuf + sizeof(mysql_packet_header);
// write field
memcpy(mysql_payload, &field_count, sizeof(field_count));
mysql_payload = mysql_payload + sizeof(field_count);
// write errno
memcpy(mysql_payload, mysql_err, sizeof(mysql_err));
mysql_payload = mysql_payload + sizeof(mysql_err);
// write sqlstate
memcpy(mysql_payload, mysql_statemsg, sizeof(mysql_statemsg));
mysql_payload = mysql_payload + sizeof(mysql_statemsg);
// write err messg
memcpy(mysql_payload, mysql_error_msg, strlen(mysql_error_msg));
// writing data in the Client buffer queue
dcb->func.write(dcb, buf); dcb->func.write(dcb, buf);
return sizeof(mysql_packet_header) + mysql_payload_size; return GWBUF_LENGTH(buf);
} }
/** /**
@ -1229,7 +1375,12 @@ int gw_find_mysql_user_password_sha1(char *username, uint8_t *gateway_password,
* *
*/ */
int int
mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { mysql_send_auth_error (
DCB *dcb,
int packet_number,
int in_affected_rows,
const char *mysql_message)
{
uint8_t *outbuf = NULL; uint8_t *outbuf = NULL;
uint8_t mysql_payload_size = 0; uint8_t mysql_payload_size = 0;
uint8_t mysql_packet_header[4]; uint8_t mysql_packet_header[4];

View File

@ -343,8 +343,7 @@ TELNETD *telnetd = dcb->protocol;
if (telnetd && telnetd->username) if (telnetd && telnetd->username)
free(telnetd->username); free(telnetd->username);
dcb_close(dcb); return 0;
return 0;
} }
/** /**

View File

@ -58,12 +58,14 @@ static void GHACloseSession(ROUTER *instance, void *router_session);
static void GHAFreeSession(ROUTER *instance, void *router_session); static void GHAFreeSession(ROUTER *instance, void *router_session);
static int GHARouteQuery(ROUTER *instance, void *router_session, GWBUF *queue); static int GHARouteQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void GHADiagnostics(ROUTER *instance, DCB *dcb); static void GHADiagnostics(ROUTER *instance, DCB *dcb);
static void GHAClientReply( static void GHAClientReply(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
GWBUF *queue, GWBUF *queue,
DCB *backend_dcb); DCB *backend_dcb);
static void GHAErrorReply(
static void GHAHandleError(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
char *message, char *message,
@ -79,7 +81,7 @@ static ROUTER_OBJECT MyObject = {
GHARouteQuery, GHARouteQuery,
GHADiagnostics, GHADiagnostics,
GHAClientReply, GHAClientReply,
GHAErrorReply GHAHandleError
}; };
static bool rses_begin_router_action( static bool rses_begin_router_action(
@ -489,7 +491,7 @@ DCB* backend_dcb;
*/ */
if (backend_dcb != NULL) { if (backend_dcb != NULL) {
CHK_DCB(backend_dcb); CHK_DCB(backend_dcb);
backend_dcb->func.close(backend_dcb); dcb_close(backend_dcb);
} }
} }
} }
@ -630,10 +632,9 @@ GHAClientReply(
} }
/** /**
* Error Reply routine * Error handling routine
* *
* The routine will reply to client errors and/or closing the session * The routine will handle error occurred in backend.
* or try to open a new backend connection.
* *
* @param instance The router instance * @param instance The router instance
* @param router_session The router session * @param router_session The router session
@ -643,7 +644,7 @@ GHAClientReply(
* *
*/ */
static void static void
GHAErrorReply( GHAHandleError(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
char *message, char *message,

View File

@ -295,7 +295,7 @@ CLI_SESSION *session = (CLI_SESSION *)router_session;
if (execute_cmd(session)) if (execute_cmd(session))
dcb_printf(session->session->client, "MaxScale> "); dcb_printf(session->session->client, "MaxScale> ");
else else
session->session->client->func.close(session->session->client); dcb_close(session->session->client);
} }
return 1; return 1;
} }

View File

@ -110,12 +110,13 @@ static void clientReply(
void *router_session, void *router_session,
GWBUF *queue, GWBUF *queue,
DCB *backend_dcb); DCB *backend_dcb);
static void errorReply( static void handleError(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
char *message, char *message,
DCB *backend_dcb, DCB *backend_dcb,
int action); int action,
bool *succp);
static uint8_t getCapabilities (ROUTER* inst, void* router_session); static uint8_t getCapabilities (ROUTER* inst, void* router_session);
@ -128,7 +129,7 @@ static ROUTER_OBJECT MyObject = {
routeQuery, routeQuery,
diagnostics, diagnostics,
clientReply, clientReply,
errorReply, handleError,
getCapabilities getCapabilities
}; };
@ -551,7 +552,7 @@ DCB* backend_dcb;
*/ */
if (backend_dcb != NULL) { if (backend_dcb != NULL) {
CHK_DCB(backend_dcb); CHK_DCB(backend_dcb);
backend_dcb->func.close(backend_dcb); dcb_close(backend_dcb);
} }
} }
} }
@ -692,10 +693,9 @@ clientReply(
} }
/** /**
* Error Reply routine * Error Handler routine
* *
* The routine will reply to client errors and/or closing the session * The routine will handle errors that occurred in backend writes.
* or try to open a new backend connection.
* *
* @param instance The router instance * @param instance The router instance
* @param router_session The router session * @param router_session The router session
@ -705,12 +705,13 @@ clientReply(
* *
*/ */
static void static void
errorReply( handleError(
ROUTER *instance, ROUTER *instance,
void *router_session, void *router_session,
char *message, char *message,
DCB *backend_dcb, DCB *backend_dcb,
int action) int action,
bool *succp)
{ {
DCB *client = NULL; DCB *client = NULL;
SESSION *session = backend_dcb->session; SESSION *session = backend_dcb->session;

File diff suppressed because it is too large Load Diff