merge from develop

merge from develop
This commit is contained in:
MassimilianoPinto
2014-06-18 11:51:47 +02:00
29 changed files with 1822 additions and 826 deletions

View File

@ -308,7 +308,7 @@ bool gwbuf_set_type(
case GWBUF_TYPE_MYSQL:
case GWBUF_TYPE_PLAINSQL:
case GWBUF_TYPE_UNDEFINED:
buf->gwbuf_type = type;
buf->gwbuf_type |= type;
succp = true;
break;
default:

View File

@ -469,7 +469,7 @@ int error_count = 0;
s = strtok(NULL, ",");
}
}
if (filters)
if (filters && obj->element)
{
serviceSetFilters(obj->element, filters);
}

View File

@ -213,9 +213,9 @@ getUsers(SERVICE *service, struct users *users)
"Exiting.")));
return -1;
}
/*
* Attempt to connect to each database in the service in turn until
* we find one that we can connect to or until we run out of databases
/**
* Attempt to connect to one of the databases database or until we run
* out of databases
* to try
*/
server = service->databases;

View File

@ -83,6 +83,7 @@ static bool dcb_set_state_nomutex(
const dcb_state_t new_state,
dcb_state_t* old_state);
static void dcb_call_callback(DCB *dcb, DCB_REASON reason);
static DCB* dcb_get_next (DCB* dcb);
DCB* dcb_get_zombies(void)
{
@ -109,6 +110,7 @@ DCB *rval;
#if defined(SS_DEBUG)
rval->dcb_chk_top = CHK_NUM_DCB;
rval->dcb_chk_tail = CHK_NUM_DCB;
rval->dcb_errhandle_called = false;
#endif
rval->dcb_role = role;
#if 1
@ -148,7 +150,7 @@ DCB *rval;
/**
* Free a DCB that has not been associated with a decriptor.
* Free a DCB that has not been associated with a descriptor.
*
* @param dcb The DCB to free
*/
@ -559,7 +561,8 @@ int rc;
dcb->fd = fd;
/** Copy status field to DCB */
dcb->dcb_server_status = server->status;
ss_debug(dcb->dcb_port = server->port;)
/*<
* backend_dcb is connected to backend server, and once backend_dcb
* is added to poll set, authentication takes place as part of
@ -594,26 +597,29 @@ int rc;
*
* @param dcb The DCB to read from
* @param head Pointer to linked list to append data to
* @return -1 on error, otherwise the number of read bytes on the last.
* 0 is returned if no data available on the last iteration of while loop.
* @return -1 on error, otherwise the number of read bytes on the last
* iteration of while loop. 0 is returned if no data available.
*/
int
dcb_read(DCB *dcb, GWBUF **head)
int dcb_read(
DCB *dcb,
GWBUF **head)
{
GWBUF *buffer = NULL;
int b;
int rc;
int n = 0;
int eno = 0;
GWBUF *buffer = NULL;
int b;
int rc;
int n ;
int nread = 0;
int eno = 0;
CHK_DCB(dcb);
while (true)
{
int bufsize;
{
int bufsize;
rc = ioctl(dcb->fd, FIONREAD, &b);
if (rc == -1) {
if (rc == -1)
{
eno = errno;
errno = 0;
LOGIF(LE, (skygw_log_write_flush(
@ -628,19 +634,39 @@ int eno = 0;
n = -1;
goto return_n;
}
/*< Nothing to read - leave */
if (b == 0) {
if (b == 0 && nread == 0)
{
/** Handle closed client socket */
if (dcb_isclient(dcb))
{
char c;
int l_errno = 0;
int r = -1;
/* try to read 1 byte, without consuming the socket buffer */
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
l_errno = errno;
if (r <= 0 &&
l_errno != EAGAIN &&
l_errno != EWOULDBLOCK)
{
n = -1;
goto return_n;
}
}
n = 0;
goto return_n;
}
bufsize = MIN(b, MAX_BUFFER_SIZE);
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
/*<
* This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails.
*/
* This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails.
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to allocate read buffer "
@ -653,16 +679,17 @@ int eno = 0;
n = -1;
ss_dassert(buffer != NULL);
goto return_n;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++);
if (n <= 0)
{
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++);
if (n <= 0)
{
int eno = errno;
errno = 0;
if (eno != EAGAIN && eno != EWOULDBLOCK) {
if (eno != 0 && eno != EAGAIN && eno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Read failed, dcb %p in state "
@ -673,18 +700,11 @@ int eno = 0;
eno,
strerror(eno))));
}
else
{
/*<
* If read would block it means that other thread
* has probably read the data.
*/
n = 0;
}
gwbuf_free(buffer);
gwbuf_free(buffer);
goto return_n;
}
nread += n;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_read] Read %d bytes from dcb %p in state %s "
@ -694,14 +714,13 @@ int eno = 0;
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
/*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer);
} /*< while (true) */
/*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer);
} /*< while (true) */
return_n:
return n;
return n;
}
/**
* General purpose routine to write to a DCB
*
@ -711,7 +730,7 @@ return_n:
int
dcb_write(DCB *dcb, GWBUF *queue)
{
int w, qlen;
int w;
int saved_errno = 0;
int below_water;
@ -760,26 +779,26 @@ int below_water;
* not have a race condition on the event.
*/
if (queue)
qlen = gwbuf_length(queue);
else
qlen = 0;
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Append to writequeue. %d writes "
"buffered for dcb %p in state %s fd %d",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
{
int qlen;
qlen = gwbuf_length(queue);
atomic_add(&dcb->writeqlen, qlen);
dcb->writeq = gwbuf_append(dcb->writeq, queue);
dcb->stats.n_buffered++;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Append to writequeue. %d writes "
"buffered for dcb %p in state %s fd %d",
pthread_self(),
dcb->stats.n_buffered,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
}
}
else
{
int len;
/*
* Loop over the buffer chain that has been passed to us
* from the reading side.
@ -788,6 +807,7 @@ int below_water;
*/
while (queue != NULL)
{
int qlen;
#if defined(SS_DEBUG)
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
dcb->session != NULL)
@ -805,13 +825,13 @@ int below_water;
}
}
#endif /* SS_DEBUG */
len = GWBUF_LENGTH(queue);
qlen = GWBUF_LENGTH(queue);
GW_NOINTR_CALL(
w = gw_write(
#if defined(SS_DEBUG)
dcb,
#endif
dcb->fd, GWBUF_DATA(queue), len);
dcb->fd, GWBUF_DATA(queue), qlen);
dcb->stats.n_writes++;
);
@ -822,37 +842,39 @@ int below_water;
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
{
if (saved_errno == EPIPE) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write to dcb "
"%p in state %s fd %d failed "
"due errno %d, %s",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
if (saved_errno == EPIPE)
{
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_write] Write to dcb "
"%p in state %s fd %d failed "
"due errno %d, %s",
pthread_self(),
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
}
if (LOG_IS_ENABLED(LOGFILE_ERROR))
{
if (saved_errno != EPIPE &&
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Write to dcb %p in "
"state %s fd %d failed due "
"errno %d, %s",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
saved_errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Write to dcb %p in "
"state %s fd %d failed due "
"errno %d, %s",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno))));
}
}
break;
}
@ -876,20 +898,15 @@ int below_water;
* for suspended write.
*/
dcb->writeq = queue;
if (queue)
if (queue)
{
int qlen;
qlen = gwbuf_length(queue);
}
else
{
qlen = 0;
}
atomic_add(&dcb->writeqlen, qlen);
if (queue != NULL)
{
dcb->stats.n_buffered++;
}
atomic_add(&dcb->writeqlen, qlen);
dcb->stats.n_buffered++;
}
} /* if (dcb->writeq) */
if (saved_errno != 0 &&
@ -937,10 +954,10 @@ int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq)
if (dcb->writeq)
{
int len;
/*
* Loop over the buffer chain in the pending writeq
* Send as much of the data in that chain as possible and
@ -996,16 +1013,17 @@ int above_water;
}
spinlock_release(&dcb->writeqlock);
atomic_add(&dcb->writeqlen, -n);
/* The write queue has drained, potentially need to call a callback function */
/* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL)
dcb_call_callback(dcb, DCB_REASON_DRAINED);
if (above_water && dcb->writeqlen < dcb->low_water)
if (above_water && dcb->writeqlen < dcb->low_water)
{
atomic_add(&dcb->stats.n_low_water, 1);
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
}
return n;
}
@ -1024,13 +1042,15 @@ void
dcb_close(DCB *dcb)
{
int rc;
CHK_DCB(dcb);
/*<
* dcb_close may be called for freshly created dcb, in which case
* it only needs to be freed.
*/
if (dcb->state == DCB_STATE_ALLOC) {
if (dcb->state == DCB_STATE_ALLOC)
{
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb);
return;
@ -1041,13 +1061,19 @@ dcb_close(DCB *dcb)
dcb->state == DCB_STATE_ZOMBIE);
/*<
* Stop dcb's listening and modify state accordingly.
*/
* Stop dcb's listening and modify state accordingly.
*/
rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
/**
* close protocol and router session
*/
if (dcb->func.close != NULL)
{
dcb->func.close(dcb);
}
dcb_call_callback(dcb, DCB_REASON_CLOSE);
if (rc == 0) {
@ -1068,7 +1094,8 @@ dcb_close(DCB *dcb)
STRDCBSTATE(dcb->state))));
}
if (dcb->state == DCB_STATE_NOPOLLING) {
if (dcb->state == DCB_STATE_NOPOLLING)
{
dcb_add_to_zombieslist(dcb);
}
}
@ -1452,7 +1479,7 @@ static bool dcb_set_state_nomutex(
} /*< switch (dcb->state) */
if (succp) {
LOGIF(LD, (skygw_log_write(
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [dcb_set_state_nomutex] dcb %p fd %d %s -> %s",
pthread_self(),
@ -1568,7 +1595,10 @@ int gw_write(
* @return Non-zero (true) if the callback was added
*/
int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
dcb_add_callback(
DCB *dcb,
DCB_REASON reason,
int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata)
{
DCB_CALLBACK *cb, *ptr;
int rval = 1;
@ -1639,7 +1669,7 @@ int rval = 0;
if (cb->reason == reason && cb->cb == callback
&& cb->userdata == userdata)
{
if (pcb == NULL)
if (pcb != NULL)
pcb->next = cb->next;
else
dcb->callbacks = cb->next;
@ -1713,3 +1743,72 @@ int rval = 0;
return rval;
}
static DCB* dcb_get_next (
DCB* dcb)
{
DCB* p;
spinlock_acquire(&dcbspin);
p = allDCBs;
if (dcb == NULL || p == NULL)
{
dcb = p;
}
else
{
while (p != NULL && dcb != p)
{
p = p->next;
}
if (p != NULL)
{
dcb = p->next;
}
else
{
dcb = NULL;
}
}
spinlock_release(&dcbspin);
return dcb;
}
void dcb_call_foreach (
SERVER* srv,
DCB_REASON reason)
{
switch (reason) {
case DCB_REASON_CLOSE:
case DCB_REASON_DRAINED:
case DCB_REASON_HIGH_WATER:
case DCB_REASON_LOW_WATER:
case DCB_REASON_ERROR:
case DCB_REASON_HUP:
case DCB_REASON_NOT_RESPONDING:
{
DCB* dcb;
dcb = dcb_get_next(NULL);
while (dcb != NULL)
{
if (dcb->state == DCB_STATE_POLLING)
{
dcb_call_callback(dcb, DCB_REASON_NOT_RESPONDING);
}
dcb = dcb_get_next(dcb);
}
break;
}
default:
break;
}
return;
}

View File

@ -158,62 +158,6 @@ void gw_daemonize(void) {
}
}
/////////////////////////////////////////////////
// Read data from dcb and store it in the gwbuf
/////////////////////////////////////////////////
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
GWBUF *buffer = NULL;
int n = -1;
if (b <= 0) {
ss_dassert(false);
#if 0
dcb->func.close(dcb);
#endif
return 1;
}
while (b > 0) {
int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
if ((buffer = gwbuf_alloc(bufsize)) == NULL) {
/* Bad news, we have run out of memory */
/* Error handling */
(dcb->func).close(dcb);
return 1;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
if (n < 0) {
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
gwbuf_free(buffer);
return 1;
} else {
gwbuf_free(buffer);
(dcb->func).close(dcb);
return 1;
}
}
if (n == 0) {
// socket closed
gwbuf_free(buffer);
#if 1
(dcb->func).close(dcb);
#endif
return 1;
}
// append read data to the gwbuf
*head = gwbuf_append(*head, buffer);
// how many bytes left
b -= n;
}
return 0;
}
/**
* Parse the bind config data. This is passed in a string as address:port.
*

View File

@ -349,8 +349,8 @@ poll_waitevents(void *arg)
ss_dassert(dcb->state != DCB_STATE_FREED);
ss_debug(spinlock_release(&dcb->dcb_initlock);)
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] event %d dcb %p "
"role %s",
pthread_self(),

View File

@ -1096,3 +1096,9 @@ static void service_add_qualified_param(
(*p)->next = NULL;
spinlock_release(&svc->spin);
}
char* service_get_name(
SERVICE* svc)
{
return svc->name;
}

View File

@ -648,3 +648,30 @@ int i;
return 1;
}
bool session_route_query (
SESSION* ses,
GWBUF* buf)
{
bool succp;
if (ses->head.routeQuery == NULL ||
ses->head.instance == NULL ||
ses->head.session == NULL)
{
succp = false;
goto return_succp;
}
if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1)
{
succp = true;
}
else
{
succp = false;
}
return_succp:
return succp;
}