Merge branch 'develop' of github.com:skysql/MaxScale into develop

Conflicts:
	server/core/session.c
	server/modules/protocol/mysql_backend.c
	server/modules/routing/readwritesplit/readwritesplit.c

Resolved.

Addition of user in topfilter report and general tidyup
This commit is contained in:
Mark Riddoch
2014-06-18 17:45:57 +01:00
35 changed files with 2005 additions and 854 deletions

View File

@ -83,6 +83,7 @@ static bool dcb_set_state_nomutex(
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
static DCB* dcb_get_next (DCB* dcb);
DCB* dcb_get_zombies(void)
{
@ -109,6 +110,7 @@ DCB *rval;
#if defined(SS_DEBUG)
rval->dcb_chk_top = CHK_NUM_DCB;
rval->dcb_chk_tail = CHK_NUM_DCB;
rval->dcb_errhandle_called = false;
#endif
rval->dcb_role = role;
#if 1
@ -132,6 +134,9 @@ DCB *rval;
rval->next = NULL;
rval->callbacks = NULL;
rval->remote = NULL;
rval->user = NULL;
spinlock_acquire(&dcbspin);
if (allDCBs == NULL)
allDCBs = rval;
@ -148,7 +153,7 @@ DCB *rval;
/**
* Free a DCB that has not been associated with a decriptor.
* Free a DCB that has not been associated with a descriptor.
*
* @param dcb The DCB to free
*/
@ -311,6 +316,8 @@ DCB_CALLBACK *cb;
free(dcb->data);
if (dcb->remote)
free(dcb->remote);
if (dcb->user)
free(dcb->user);
/* Clear write and read buffers */
if (dcb->delayq) {
@ -559,7 +566,8 @@ int rc;
dcb->fd = fd;
/** Copy status field to DCB */
dcb->dcb_server_status = server->status;
ss_debug(dcb->dcb_port = server->port;)
/*<
* backend_dcb is connected to backend server, and once backend_dcb
* is added to poll set, authentication takes place as part of
@ -594,26 +602,29 @@ int rc;
*
* @param dcb The DCB to read from
* @param head Pointer to linked list to append data to
* @return -1 on error, otherwise the number of read bytes on the last.
* 0 is returned if no data available on the last iteration of while loop.
* @return -1 on error, otherwise the number of read bytes on the last
* iteration of while loop. 0 is returned if no data available.
*/
int
dcb_read(DCB *dcb, GWBUF **head)
int dcb_read(
DCB *dcb,
GWBUF **head)
{
GWBUF *buffer = NULL;
int b;
int rc;
int n = 0;
int eno = 0;
GWBUF *buffer = NULL;
int b;
int rc;
int n ;
int nread = 0;
int eno = 0;
CHK_DCB(dcb);
while (true)
{
int bufsize;
{
int bufsize;
rc = ioctl(dcb->fd, FIONREAD, &b);
if (rc == -1) {
if (rc == -1)
{
eno = errno;
errno = 0;
LOGIF(LE, (skygw_log_write_flush(
@ -628,19 +639,39 @@ int eno = 0;
n = -1;
goto return_n;
}
/*< Nothing to read - leave */
if (b == 0) {
if (b == 0 && nread == 0)
{
/** Handle closed client socket */
if (dcb_isclient(dcb))
{
char c;
int l_errno = 0;
int r = -1;
/* try to read 1 byte, without consuming the socket buffer */
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
l_errno = errno;
if (r <= 0 &&
l_errno != EAGAIN &&
l_errno != EWOULDBLOCK)
{
n = -1;
goto return_n;
}
}
n = 0;
goto return_n;
}
bufsize = MIN(b, MAX_BUFFER_SIZE);
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
/*<
* This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails.
*/
* This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails.
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to allocate read buffer "
@ -653,16 +684,17 @@ int eno = 0;
n = -1;
ss_dassert(buffer != NULL);
goto return_n;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++);
if (n <= 0)
{
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++);
if (n <= 0)
{
int eno = errno;
errno = 0;
if (eno != EAGAIN && eno != EWOULDBLOCK) {
if (eno != 0 && eno != EAGAIN && eno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Read failed, dcb %p in state "
@ -673,18 +705,11 @@ int eno = 0;
eno,
strerror(eno))));
}
else
{
/*<
* If read would block it means that other thread
* has probably read the data.
*/
n = 0;
}
gwbuf_free(buffer);
gwbuf_free(buffer);
goto return_n;
}
nread += n;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_read] Read %d bytes from dcb %p in state %s "
@ -694,14 +719,13 @@ int eno = 0;
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
/*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer);
} /*< while (true) */
/*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer);
} /*< while (true) */
return_n:
return n;
return n;
}
/**
* General purpose routine to write to a DCB
*
@ -711,7 +735,7 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w, qlen;
int w;
int saved_errno = 0;
int below_water;
@ -760,26 +784,26 @@ int below_water;
* not have a race condition on the event.
*/
if (queue)
qlen = gwbuf_length(queue);
else
qlen = 0;
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Append to writequeue. %d writes "
"buffered for dcb %p in state %s fd %d",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
{
int qlen;
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Append to writequeue. %d writes "
"buffered for dcb %p in state %s fd %d",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
}
}
else
{
int len;
/*
* Loop over the buffer chain that has been passed to us
* from the reading side.
@ -788,6 +812,7 @@ int below_water;
*/
while (queue != NULL)
{
int qlen;
#if defined(SS_DEBUG)
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
dcb->session != NULL)
@ -805,13 +830,13 @@ int below_water;
}
}
#endif /* SS_DEBUG */
len = GWBUF_LENGTH(queue);
qlen = GWBUF_LENGTH(queue);
GW_NOINTR_CALL(
w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd, GWBUF_DATA(queue), len);
dcb->fd, GWBUF_DATA(queue), qlen);
dcb->stats.n_writes++;
);
@ -822,37 +847,39 @@ int below_water;
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{
if (saved_errno == EPIPE) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write to dcb "
"%p in state %s fd %d failed "
"due errno %d, %s",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
if (saved_errno == EPIPE)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write to dcb "
"%p in state %s fd %d failed "
"due errno %d, %s",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
}
if (LOG_IS_ENABLED(LOGFILE_ERROR))
{
if (saved_errno != EPIPE &&
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Write to dcb %p in "
"state %s fd %d failed due "
"errno %d, %s",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Write to dcb %p in "
"state %s fd %d failed due "
"errno %d, %s",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
}
break;
}
@ -876,20 +903,15 @@ int below_water;
* for suspended write.
*/
dcb->writeq = queue;
if (queue)
if (queue)
{
int qlen;
qlen = gwbuf_length(queue);
}
else
{
qlen = 0;
}
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
dcb->stats.n_buffered++;
}
atomic_add(&dcb->writeqlen, qlen);
dcb->stats.n_buffered++;
}
} /* if (dcb->writeq) */
if (saved_errno != 0 &&
@ -937,10 +959,10 @@ int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
if (dcb->writeq)
{
int len;
/*
* Loop over the buffer chain in the pending writeq
* Send as much of the data in that chain as possible and
@ -996,16 +1018,17 @@ int above_water;
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -1024,13 +1047,15 @@ void
dcb_close(DCB *dcb)
{
int rc;
CHK_DCB(dcb);
/*<
* dcb_close may be called for freshly created dcb, in which case
* it only needs to be freed.
*/
if (dcb->state == DCB_STATE_ALLOC) {
if (dcb->state == DCB_STATE_ALLOC)
{
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb);
return;
@ -1041,13 +1066,19 @@ dcb_close(DCB *dcb)
dcb->state == DCB_STATE_ZOMBIE);
/*<
* Stop dcb's listening and modify state accordingly.
*/
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
/**
* close protocol and router session
*/
if (dcb->func.close != NULL)
{
dcb->func.close(dcb);
}
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
@ -1068,7 +1099,8 @@ dcb_close(DCB *dcb)
STRDCBSTATE(dcb->state))));
}
if (dcb->state == DCB_STATE_NOPOLLING) {
if (dcb->state == DCB_STATE_NOPOLLING)
{
dcb_add_to_zombieslist(dcb);
}
}
@ -1086,6 +1118,8 @@ printDCB(DCB *dcb)
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote)
printf("\tConnected to: %s\n", dcb->remote);
if (dcb->user)
printf("\tUsername to: %s\n", dcb->user);
if (dcb->writeq)
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
printf("\tStatistics:\n");
@ -1143,6 +1177,9 @@ DCB *dcb;
if (dcb->remote)
dcb_printf(pdcb, "\tConnected to: %s\n",
dcb->remote);
if (dcb->user)
dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user);
if (dcb->writeq)
dcb_printf(pdcb, "\tQueued write data: %d\n",
gwbuf_length(dcb->writeq));
@ -1170,6 +1207,8 @@ DCB *dcb;
spinlock_acquire(&dcbspin);
dcb = allDCBs;
dcb_printf(pdcb, "Descriptor Control Blocks\n");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n");
dcb_printf(pdcb, " %-10s | %-26s | %-20s | %s\n",
"DCB", "State", "Service", "Remote");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n");
@ -1182,7 +1221,7 @@ DCB *dcb;
(dcb->remote ? dcb->remote : ""));
dcb = dcb->next;
}
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n");
dcb_printf(pdcb, "------------+----------------------------+----------------------+----------\n\n");
spinlock_release(&dcbspin);
}
@ -1450,7 +1489,7 @@ static bool dcb_set_state_nomutex(
} /*< switch (dcb->state) */
if (succp) {
LOGIF(LD, (skygw_log_write(
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s",
pthread_self(),
@ -1566,7 +1605,10 @@ int gw_write(
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
dcb_add_callback(
DCB *dcb,
DCB_REASON reason,
int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
@ -1637,7 +1679,7 @@ int rval = 0;
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
if (pcb != NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
@ -1711,3 +1753,72 @@ int rval = 0;
return rval;
}
static DCB* dcb_get_next (
DCB* dcb)
{
DCB* p;
spinlock_acquire(&dcbspin);
p = allDCBs;
if (dcb == NULL || p == NULL)
{
dcb = p;
}
else
{
while (p != NULL && dcb != p)
{
p = p->next;
}
if (p != NULL)
{
dcb = p->next;
}
else
{
dcb = NULL;
}
}
spinlock_release(&dcbspin);
return dcb;
}
void dcb_call_foreach (
SERVER* srv,
DCB_REASON reason)
{
switch (reason) {
case DCB_REASON_CLOSE:
case DCB_REASON_DRAINED:
case DCB_REASON_HIGH_WATER:
case DCB_REASON_LOW_WATER:
case DCB_REASON_ERROR:
case DCB_REASON_HUP:
case DCB_REASON_NOT_RESPONDING:
{
DCB* dcb;
dcb = dcb_get_next(NULL);
while (dcb != NULL)
{
if (dcb->state == DCB_STATE_POLLING)
{
dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING);
}
dcb = dcb_get_next(dcb);
}
break;
}
default:
break;
}
return;
}