Complete merge of latest develop branch
This commit is contained in:
@ -302,8 +302,6 @@ static int find_last_seqno(strpart_t* parts, int seqno, int seqnoidx);
|
||||
void flushall_logfiles(bool flush);
|
||||
bool thr_flushall_check();
|
||||
|
||||
#include "../core/atomic.c"
|
||||
|
||||
const char* get_suffix_default(void)
|
||||
{
|
||||
return ".log";
|
||||
|
||||
@ -1175,7 +1175,7 @@ config_get_value(CONFIG_PARAMETER *params, const char *name)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value of a config parameter
|
||||
* Get the value of a config parameter as a string
|
||||
*
|
||||
* @param params The linked list of config parameters
|
||||
* @param name The parameter to return
|
||||
|
||||
@ -49,6 +49,7 @@
|
||||
* backend
|
||||
* 07/05/2014 Mark Riddoch Addition of callback mechanism
|
||||
* 20/06/2014 Mark Riddoch Addition of dcb_clone
|
||||
* 29/05/2015 Markus Makela Addition of dcb_write_SSL
|
||||
* 11/06/2015 Martin Brampton Persistent connnections and tidy up
|
||||
*
|
||||
* @endverbatim
|
||||
@ -418,7 +419,8 @@ DCB_CALLBACK *cb;
|
||||
free(cb);
|
||||
}
|
||||
spinlock_release(&dcb->cb_lock);
|
||||
|
||||
if(dcb->ssl)
|
||||
SSL_free(dcb->ssl);
|
||||
bitmask_free(&dcb->memdata.bitmask);
|
||||
free(dcb);
|
||||
}
|
||||
@ -916,6 +918,364 @@ return_n:
|
||||
return n;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* General purpose read routine to read data from a socket in the
|
||||
* Descriptor Control Block and append it to a linked list of buffers.
|
||||
* This function will read at most nbytes of data.
|
||||
*
|
||||
* The list may be empty, in which case *head == NULL. This
|
||||
*
|
||||
* @param dcb The DCB to read from
|
||||
* @param head Pointer to linked list to append data to
|
||||
* @param nbytes Maximum number of bytes read
|
||||
* @return -1 on error, otherwise the number of read bytes on the last
|
||||
* iteration of while loop. 0 is returned if no data available.
|
||||
*/
|
||||
int dcb_read_n(
|
||||
DCB *dcb,
|
||||
GWBUF **head,
|
||||
int nbytes)
|
||||
{
|
||||
GWBUF *buffer = NULL;
|
||||
int b;
|
||||
int rc;
|
||||
int n;
|
||||
int nread = 0;
|
||||
|
||||
CHK_DCB(dcb);
|
||||
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Read failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable")));
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
int bufsize;
|
||||
|
||||
rc = ioctl(dcb->fd, FIONREAD, &b);
|
||||
|
||||
if (rc == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : ioctl FIONREAD for dcb %p in "
|
||||
"state %s fd %d failed due error %d, %s.",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
n = -1;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
if (b == 0 && nread == 0)
|
||||
{
|
||||
/** Handle closed client socket */
|
||||
if (dcb_isclient(dcb))
|
||||
{
|
||||
char c;
|
||||
int l_errno = 0;
|
||||
int r = -1;
|
||||
|
||||
/* try to read 1 byte, without consuming the socket buffer */
|
||||
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
|
||||
l_errno = errno;
|
||||
|
||||
if (r <= 0 &&
|
||||
l_errno != EAGAIN &&
|
||||
l_errno != EWOULDBLOCK &&
|
||||
l_errno != 0)
|
||||
{
|
||||
n = -1;
|
||||
goto return_n;
|
||||
}
|
||||
}
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
else if (b == 0)
|
||||
{
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
dcb->last_read = hkheartbeat;
|
||||
|
||||
bufsize = MIN(b, nbytes);
|
||||
|
||||
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
|
||||
{
|
||||
/*<
|
||||
* This is a fatal error which should cause shutdown.
|
||||
* Todo shutdown if memory allocation fails.
|
||||
*/
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Failed to allocate read buffer "
|
||||
"for dcb %p fd %d, due %d, %s.",
|
||||
dcb,
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
|
||||
n = -1;
|
||||
goto return_n;
|
||||
}
|
||||
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
|
||||
dcb->stats.n_reads++);
|
||||
|
||||
if (n <= 0)
|
||||
{
|
||||
if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Read failed, dcb %p in state "
|
||||
"%s fd %d, due %d, %s.",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
}
|
||||
gwbuf_free(buffer);
|
||||
goto return_n;
|
||||
}
|
||||
nread += n;
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_read] Read %d bytes from dcb %p in state %s "
|
||||
"fd %d.",
|
||||
pthread_self(),
|
||||
n,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
/*< Append read data to the gwbuf */
|
||||
*head = gwbuf_append(*head, buffer);
|
||||
|
||||
return_n:
|
||||
return n;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* General purpose read routine to read data from a socket through the SSL
|
||||
* structure lined with this DCB and append it to a linked list of buffers.
|
||||
* The list may be empty, in which case *head == NULL. The SSL structure should
|
||||
* be initialized and the SSL handshake should be done.
|
||||
*
|
||||
* @param dcb The DCB to read from
|
||||
* @param head Pointer to linked list to append data to
|
||||
* @return -1 on error, otherwise the number of read bytes on the last
|
||||
* iteration of while loop. 0 is returned if no data available.
|
||||
*/
|
||||
int dcb_read_SSL(
|
||||
DCB *dcb,
|
||||
GWBUF **head)
|
||||
{
|
||||
GWBUF *buffer = NULL;
|
||||
int b,pending;
|
||||
int rc;
|
||||
int n;
|
||||
int nread = 0;
|
||||
int ssl_errno = 0;
|
||||
CHK_DCB(dcb);
|
||||
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Read failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable")));
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
while (true)
|
||||
{
|
||||
int bufsize;
|
||||
ssl_errno = 0;
|
||||
rc = ioctl(dcb->fd, FIONREAD, &b);
|
||||
pending = SSL_pending(dcb->ssl);
|
||||
if (rc == -1)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : ioctl FIONREAD for dcb %p in "
|
||||
"state %s fd %d failed due error %d, %s.",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
n = -1;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
if (b == 0 && pending == 0 && nread == 0)
|
||||
{
|
||||
/** Handle closed client socket */
|
||||
if (dcb_isclient(dcb))
|
||||
{
|
||||
char c = 0;
|
||||
int r = -1;
|
||||
|
||||
/* try to read 1 byte, without consuming the socket buffer */
|
||||
r = SSL_peek(dcb->ssl, &c, sizeof(char));
|
||||
if (r <= 0)
|
||||
{
|
||||
ssl_errno = SSL_get_error(dcb->ssl,r);
|
||||
if(ssl_errno != SSL_ERROR_WANT_READ &&
|
||||
ssl_errno != SSL_ERROR_WANT_WRITE &&
|
||||
ssl_errno != SSL_ERROR_NONE)
|
||||
n = -1;
|
||||
else
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
}
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
else if (b == 0 && pending == 0)
|
||||
{
|
||||
n = 0;
|
||||
goto return_n;
|
||||
}
|
||||
#ifdef SS_DEBUG
|
||||
else
|
||||
{
|
||||
skygw_log_write_flush(LD,"Total: %d Socket: %d Pending: %d",
|
||||
nread,b,pending);
|
||||
}
|
||||
#endif
|
||||
|
||||
dcb->last_read = hkheartbeat;
|
||||
|
||||
bufsize = MIN(b, MAX_BUFFER_SIZE);
|
||||
|
||||
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
|
||||
{
|
||||
/*<
|
||||
* This is a fatal error which should cause shutdown.
|
||||
* Todo shutdown if memory allocation fails.
|
||||
*/
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Failed to allocate read buffer "
|
||||
"for dcb %p fd %d, due %d, %s.",
|
||||
dcb,
|
||||
dcb->fd,
|
||||
errno,
|
||||
strerror(errno))));
|
||||
|
||||
n = -1;
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
n = SSL_read(dcb->ssl, GWBUF_DATA(buffer), bufsize);
|
||||
dcb->stats.n_reads++;
|
||||
|
||||
if (n < 0)
|
||||
{
|
||||
char errbuf[200];
|
||||
ssl_errno = SSL_get_error(dcb->ssl,n);
|
||||
#ifdef SS_DEBUG
|
||||
if(ssl_errno == SSL_ERROR_SSL ||
|
||||
ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
int eno;
|
||||
while((eno = ERR_get_error()) != 0)
|
||||
{
|
||||
ERR_error_string(eno,errbuf);
|
||||
skygw_log_write(LE,
|
||||
"%s",
|
||||
errbuf);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if(ssl_errno == SSL_ERROR_WANT_READ ||
|
||||
ssl_errno == SSL_ERROR_WANT_WRITE ||
|
||||
ssl_errno == SSL_ERROR_NONE)
|
||||
{
|
||||
n = 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Read failed, dcb %p in state "
|
||||
"%s fd %d, SSL error %d: %s.",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
ssl_errno,
|
||||
strerror(errno))));
|
||||
|
||||
if(ssl_errno == SSL_ERROR_SSL ||
|
||||
ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
while((ssl_errno = ERR_get_error()) != 0)
|
||||
{
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,
|
||||
"%s",
|
||||
errbuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
n = -1;
|
||||
gwbuf_free(buffer);
|
||||
goto return_n;
|
||||
}
|
||||
else if(n == 0)
|
||||
{
|
||||
gwbuf_free(buffer);
|
||||
goto return_n;
|
||||
}
|
||||
|
||||
gwbuf_rtrim(buffer,bufsize - n);
|
||||
#ifdef SS_DEBUG
|
||||
skygw_log_write(LD,"%lu SSL: Truncated buffer from %d to %d bytes. "
|
||||
"Read %d bytes, %d bytes waiting.\n",pthread_self(),
|
||||
bufsize,GWBUF_LENGTH(buffer),n,b);
|
||||
|
||||
if(GWBUF_LENGTH(buffer) != n){
|
||||
skygw_log_sync_all();
|
||||
}
|
||||
|
||||
ss_info_dassert((buffer->start <= buffer->end),"Buffer start has passed end.");
|
||||
ss_info_dassert(GWBUF_LENGTH(buffer) == n,"Buffer size not equal to read bytes.");
|
||||
#endif
|
||||
nread += n;
|
||||
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_read_SSL] Read %d bytes from dcb %p in state %s "
|
||||
"fd %d.",
|
||||
pthread_self(),
|
||||
n,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
|
||||
/*< Append read data to the gwbuf */
|
||||
*head = gwbuf_append(*head, buffer);
|
||||
rc = ioctl(dcb->fd, FIONREAD, &b);
|
||||
pending = SSL_pending(dcb->ssl);
|
||||
|
||||
} /*< while (true) */
|
||||
return_n:
|
||||
return n;
|
||||
}
|
||||
/**
|
||||
* General purpose routine to write to a DCB
|
||||
*
|
||||
@ -1152,6 +1512,272 @@ int below_water;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* General purpose routine to write to an SSL enabled DCB
|
||||
*
|
||||
* @param dcb The DCB of the client
|
||||
* @param ssl The SSL structure for this DCB
|
||||
* @param queue Queue of buffers to write
|
||||
* @return 0 on failure, 1 on success
|
||||
*/
|
||||
int
|
||||
dcb_write_SSL(DCB *dcb, GWBUF *queue)
|
||||
{
|
||||
int w;
|
||||
int saved_errno = 0;
|
||||
int below_water;
|
||||
|
||||
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
|
||||
ss_dassert(queue != NULL);
|
||||
|
||||
if (dcb->fd <= 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write failed, dcb is %s.",
|
||||
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable")));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* SESSION_STATE_STOPPING means that one of the backends is closing
|
||||
* the router session. Some backends may have not completed
|
||||
* authentication yet and thus they have no information about router
|
||||
* being closed. Session state is changed to SESSION_STATE_STOPPING
|
||||
* before router's closeSession is called and that tells that DCB may
|
||||
* still be writable.
|
||||
*/
|
||||
if (queue == NULL ||
|
||||
(dcb->state != DCB_STATE_ALLOC &&
|
||||
dcb->state != DCB_STATE_POLLING &&
|
||||
dcb->state != DCB_STATE_LISTENING &&
|
||||
dcb->state != DCB_STATE_NOPOLLING &&
|
||||
(dcb->session == NULL ||
|
||||
dcb->session->state != SESSION_STATE_STOPPING)))
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write aborted to dcb %p because "
|
||||
"it is in state %s",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
//ss_dassert(false);
|
||||
return 0;
|
||||
}
|
||||
|
||||
spinlock_acquire(&dcb->writeqlock);
|
||||
|
||||
if (dcb->writeq != NULL)
|
||||
{
|
||||
/*
|
||||
* We have some queued data, so add our data to
|
||||
* the write queue and return.
|
||||
* The assumption is that there will be an EPOLLOUT
|
||||
* event to drain what is already queued. We are protected
|
||||
* by the spinlock, which will also be acquired by the
|
||||
* the routine that drains the queue data, so we should
|
||||
* not have a race condition on the event.
|
||||
*/
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
||||
dcb->stats.n_buffered++;
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Append to writequeue. %d writes "
|
||||
"buffered for dcb %p in state %s fd %d",
|
||||
pthread_self(),
|
||||
dcb->stats.n_buffered,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Loop over the buffer chain that has been passed to us
|
||||
* from the reading side.
|
||||
* Send as much of the data in that chain as possible and
|
||||
* add any balance to the write queue.
|
||||
*/
|
||||
while (queue != NULL)
|
||||
{
|
||||
int qlen;
|
||||
#if defined(FAKE_CODE)
|
||||
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER &&
|
||||
dcb->session != NULL)
|
||||
{
|
||||
if (dcb_isclient(dcb) && fail_next_client_fd) {
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_client_fd = false;
|
||||
} else if (!dcb_isclient(dcb) &&
|
||||
fail_next_backend_fd)
|
||||
{
|
||||
dcb_fake_write_errno[dcb->fd] = 32;
|
||||
dcb_fake_write_ev[dcb->fd] = 29;
|
||||
fail_next_backend_fd = false;
|
||||
}
|
||||
}
|
||||
#endif /* FAKE_CODE */
|
||||
qlen = GWBUF_LENGTH(queue);
|
||||
|
||||
w = gw_write_SSL(dcb->ssl, GWBUF_DATA(queue), qlen);
|
||||
dcb->stats.n_writes++;
|
||||
|
||||
if (w < 0)
|
||||
{
|
||||
int ssl_errno = SSL_get_error(dcb->ssl,w);
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_DEBUG))
|
||||
{
|
||||
switch(ssl_errno)
|
||||
{
|
||||
case SSL_ERROR_WANT_READ:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_READ",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error SSL_ERROR_WANT_WRITE",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
break;
|
||||
default:
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Write to dcb "
|
||||
"%p in state %s fd %d failed "
|
||||
"due error %d",
|
||||
pthread_self(),
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,ssl_errno)));
|
||||
if(ssl_errno == SSL_ERROR_SSL ||
|
||||
ssl_errno == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
while((ssl_errno = ERR_get_error()) != 0)
|
||||
{
|
||||
char errbuf[140];
|
||||
ERR_error_string(ssl_errno,errbuf);
|
||||
skygw_log_write(LE,"%s",errbuf);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG_IS_ENABLED(LOGFILE_ERROR))
|
||||
{
|
||||
if (ssl_errno != 0)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p in "
|
||||
"state %s fd %d failed due "
|
||||
"SSL error %d",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
ssl_errno)));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
/*
|
||||
* Pull the number of bytes we have written from
|
||||
* queue with have.
|
||||
*/
|
||||
queue = gwbuf_consume(queue, w);
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Wrote %d Bytes to dcb %p in "
|
||||
"state %s fd %d",
|
||||
pthread_self(),
|
||||
w,
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd)));
|
||||
} /*< while (queue != NULL) */
|
||||
/*<
|
||||
* What wasn't successfully written is stored to write queue
|
||||
* for suspended write.
|
||||
*/
|
||||
dcb->writeq = queue;
|
||||
|
||||
if (queue)
|
||||
{
|
||||
int qlen;
|
||||
|
||||
qlen = gwbuf_length(queue);
|
||||
atomic_add(&dcb->writeqlen, qlen);
|
||||
dcb->stats.n_buffered++;
|
||||
}
|
||||
} /* if (dcb->writeq) */
|
||||
|
||||
if (saved_errno != 0 &&
|
||||
queue != NULL &&
|
||||
saved_errno != EAGAIN &&
|
||||
saved_errno != EWOULDBLOCK)
|
||||
{
|
||||
bool dolog = true;
|
||||
|
||||
/**
|
||||
* Do not log if writing COM_QUIT to backend failed.
|
||||
*/
|
||||
if (GWBUF_IS_TYPE_MYSQL(queue))
|
||||
{
|
||||
uint8_t* data = GWBUF_DATA(queue);
|
||||
|
||||
if (data[4] == 0x01)
|
||||
{
|
||||
dolog = false;
|
||||
}
|
||||
}
|
||||
if (dolog)
|
||||
{
|
||||
LOGIF(LD, (skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [dcb_write] Writing to %s socket failed due %d, %s.",
|
||||
pthread_self(),
|
||||
dcb_isclient(dcb) ? "client" : "backend server",
|
||||
saved_errno,
|
||||
strerror(saved_errno))));
|
||||
}
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
return 0;
|
||||
}
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
|
||||
if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water)
|
||||
{
|
||||
atomic_add(&dcb->stats.n_high_water, 1);
|
||||
dcb_call_callback(dcb, DCB_REASON_HIGH_WATER);
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling
|
||||
* of a socket and will try to send any buffered data from the write queue
|
||||
@ -1244,6 +1870,85 @@ int above_water;
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling
|
||||
* of a socket and will try to send any buffered data from the write queue
|
||||
* up until the point the write would block. This function uses SSL encryption
|
||||
* and the SSL handshake should have been completed prior to calling this function.
|
||||
*
|
||||
* @param dcb DCB to drain the write queue of
|
||||
* @return The number of bytes written
|
||||
*/
|
||||
int
|
||||
dcb_drain_writeq_SSL(DCB *dcb)
|
||||
{
|
||||
int n = 0;
|
||||
int w;
|
||||
int saved_errno = 0;
|
||||
int above_water;
|
||||
|
||||
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
|
||||
|
||||
spinlock_acquire(&dcb->writeqlock);
|
||||
|
||||
if (dcb->writeq)
|
||||
{
|
||||
int len;
|
||||
/*
|
||||
* Loop over the buffer chain in the pending writeq
|
||||
* Send as much of the data in that chain as possible and
|
||||
* leave any balance on the write queue.
|
||||
*/
|
||||
while (dcb->writeq != NULL)
|
||||
{
|
||||
len = GWBUF_LENGTH(dcb->writeq);
|
||||
w = gw_write_SSL(dcb->ssl, GWBUF_DATA(dcb->writeq), len);
|
||||
|
||||
if (w < 0)
|
||||
{
|
||||
int ssl_errno = ERR_get_error();
|
||||
|
||||
if(ssl_errno == SSL_ERROR_WANT_WRITE ||
|
||||
ssl_errno == SSL_ERROR_WANT_ACCEPT ||
|
||||
ssl_errno == SSL_ERROR_WANT_READ)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Write to dcb %p "
|
||||
"in state %s fd %d failed: %s",
|
||||
dcb,
|
||||
STRDCBSTATE(dcb->state),
|
||||
dcb->fd,
|
||||
ERR_error_string(ssl_errno,NULL));
|
||||
break;
|
||||
}
|
||||
/*
|
||||
* Pull the number of bytes we have written from
|
||||
* queue with have.
|
||||
*/
|
||||
dcb->writeq = gwbuf_consume(dcb->writeq, w);
|
||||
n += w;
|
||||
}
|
||||
}
|
||||
spinlock_release(&dcb->writeqlock);
|
||||
atomic_add(&dcb->writeqlen, -n);
|
||||
|
||||
/* The write queue has drained, potentially need to call a callback function */
|
||||
if (dcb->writeq == NULL)
|
||||
dcb_call_callback(dcb, DCB_REASON_DRAINED);
|
||||
|
||||
if (above_water && dcb->writeqlen < dcb->low_water)
|
||||
{
|
||||
atomic_add(&dcb->stats.n_low_water, 1);
|
||||
dcb_call_callback(dcb, DCB_REASON_LOW_WATER);
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes dcb from poll set, and adds it to zombies list. As a consequence,
|
||||
* dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state.
|
||||
@ -1951,6 +2656,30 @@ dcb_set_state_nolock(
|
||||
return succp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to a socket through an SSL structure. The SSL structure is linked to a DCB's socket
|
||||
* and all communication is encrypted and done via the SSL structure.
|
||||
*
|
||||
* @param ssl The SSL structure to use for writing
|
||||
* @param buf Buffer to write
|
||||
* @param nbytes Number of bytes to write
|
||||
* @return Number of written bytes
|
||||
*/
|
||||
int
|
||||
gw_write_SSL(SSL* ssl, const void *buf, size_t nbytes)
|
||||
{
|
||||
int w = 0;
|
||||
int fd = SSL_get_fd(ssl);
|
||||
|
||||
if (fd > 0)
|
||||
{
|
||||
w = SSL_write(ssl, buf, nbytes);
|
||||
}
|
||||
return w;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Write data to a DCB
|
||||
*
|
||||
@ -2443,6 +3172,205 @@ DCB *ptr;
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the SSL structure for this DCB.
|
||||
* This function creates the SSL structure for the given SSL context. This context
|
||||
* should be the service's context
|
||||
* @param dcb
|
||||
* @param context
|
||||
* @return
|
||||
*/
|
||||
int dcb_create_SSL(DCB* dcb)
|
||||
{
|
||||
|
||||
if(serviceInitSSL(dcb->service) != 0)
|
||||
{
|
||||
return -1;
|
||||
}
|
||||
|
||||
if((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Failed to initialize SSL for connection.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(SSL_set_fd(dcb->ssl,dcb->fd) == 0)
|
||||
{
|
||||
skygw_log_write(LE,"Error: Failed to set file descriptor for SSL connection.");
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept a SSL connection and do the SSL authentication handshake.
|
||||
* This function accepts a client connection to a DCB. It assumes that the SSL
|
||||
* structure has the underlying method of communication set and this method is ready
|
||||
* for usage. It then proceeds with the SSL handshake and stops only if an error
|
||||
* occurs or the client has not yet written enough data to complete the handshake.
|
||||
* @param dcb DCB which should accept the SSL connection
|
||||
* @return 1 if the handshake was successfully completed, 0 if the handshake is
|
||||
* still ongoing and another call to dcb_SSL_accept should be made or -1 if an
|
||||
* error occurred during the handshake and the connection should be terminated.
|
||||
*/
|
||||
int dcb_accept_SSL(DCB* dcb)
|
||||
{
|
||||
int rval = 0,ssl_rval,errnum = 0,fd,b = 0,pending;
|
||||
char errbuf[140];
|
||||
fd = dcb->fd;
|
||||
|
||||
do
|
||||
{
|
||||
ssl_rval = SSL_accept(dcb->ssl);
|
||||
errnum = SSL_get_error(dcb->ssl,ssl_rval);
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept %d, error %d",
|
||||
ssl_rval,errnum)));
|
||||
switch(ssl_rval)
|
||||
{
|
||||
case 0:
|
||||
errnum = SSL_get_error(dcb->ssl,ssl_rval);
|
||||
skygw_log_write(LE,"Error: SSL authentication failed (SSL error %d):",
|
||||
dcb,
|
||||
dcb->remote,
|
||||
errnum);
|
||||
|
||||
if(errnum == SSL_ERROR_SSL ||
|
||||
errnum == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
while((errnum = ERR_get_error()) != 0)
|
||||
{
|
||||
ERR_error_string(errnum,errbuf);
|
||||
skygw_log_write(LE,"%s",errbuf);
|
||||
}
|
||||
}
|
||||
rval = -1;
|
||||
break;
|
||||
case 1:
|
||||
rval = 1;
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept done for %s",
|
||||
dcb->remote)));
|
||||
return rval;
|
||||
|
||||
case -1:
|
||||
|
||||
errnum = SSL_get_error(dcb->ssl,ssl_rval);
|
||||
|
||||
if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE)
|
||||
{
|
||||
/** Not all of the data has been read. Go back to the poll
|
||||
queue and wait for more.*/
|
||||
rval = 0;
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept ongoing for %s",
|
||||
dcb->remote)));
|
||||
return rval;
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = -1;
|
||||
skygw_log_write(LE,
|
||||
"Error: Fatal error in SSL_accept for %s: (SSL version: %s SSL error code: %d)",
|
||||
dcb->remote,
|
||||
SSL_get_version(dcb->ssl),
|
||||
errnum);
|
||||
if(errnum == SSL_ERROR_SSL ||
|
||||
errnum == SSL_ERROR_SYSCALL)
|
||||
{
|
||||
while((errnum = ERR_get_error()) != 0)
|
||||
{
|
||||
ERR_error_string(errnum,errbuf);
|
||||
skygw_log_write(LE,
|
||||
"%s",
|
||||
errbuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
skygw_log_write_flush(LE,
|
||||
"Error: Fatal library error in SSL_accept, returned value was %d.",
|
||||
ssl_rval);
|
||||
rval = -1;
|
||||
break;
|
||||
}
|
||||
ioctl(fd,FIONREAD,&b);
|
||||
pending = SSL_pending(dcb->ssl);
|
||||
#ifdef SS_DEBUG
|
||||
skygw_log_write_flush(LD,"[dcb_accept_SSL] fd %d: %d bytes, %d pending",fd,b,pending);
|
||||
#endif
|
||||
}while((b > 0 || pending > 0) && rval != -1);
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiate an SSL client connection to a server
|
||||
*
|
||||
* This functions starts an SSL client connection to a server which is expecting
|
||||
* an SSL handshake. The DCB should already have a TCP connection to the server and
|
||||
* this connection should be in a state that expects an SSL handshake.
|
||||
* @param dcb DCB to connect
|
||||
* @return 1 on success, -1 on error and 0 if the SSL handshake is still ongoing
|
||||
*/
|
||||
int dcb_connect_SSL(DCB* dcb)
|
||||
{
|
||||
int rval,errnum;
|
||||
char errbuf[140];
|
||||
rval = SSL_connect(dcb->ssl);
|
||||
|
||||
switch(rval)
|
||||
{
|
||||
case 0:
|
||||
errnum = SSL_get_error(dcb->ssl,rval);
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect shutdown for %s@%s",
|
||||
dcb->user,
|
||||
dcb->remote)));
|
||||
return -1;
|
||||
break;
|
||||
case 1:
|
||||
rval = 1;
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect done for %s@%s",
|
||||
dcb->user,
|
||||
dcb->remote)));
|
||||
return rval;
|
||||
|
||||
case -1:
|
||||
errnum = SSL_get_error(dcb->ssl,rval);
|
||||
|
||||
if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE)
|
||||
{
|
||||
/** Not all of the data has been read. Go back to the poll
|
||||
queue and wait for more.*/
|
||||
|
||||
rval = 0;
|
||||
LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect ongoing for %s@%s",
|
||||
dcb->user,
|
||||
dcb->remote)));
|
||||
}
|
||||
else
|
||||
{
|
||||
rval = -1;
|
||||
ERR_error_string(errnum,errbuf);
|
||||
skygw_log_write_flush(LE,
|
||||
"Error: Fatal error in SSL_accept for %s@%s: (SSL error code: %d) %s",
|
||||
dcb->user,
|
||||
dcb->remote,
|
||||
errnum,
|
||||
errbuf);
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
skygw_log_write_flush(LE,
|
||||
"Error: Fatal error in SSL_connect, returned value was %d.",
|
||||
rval);
|
||||
break;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a DCB role to a string, the returned
|
||||
* string has been malloc'd and must be free'd by the caller
|
||||
|
||||
@ -33,7 +33,7 @@
|
||||
* 30/08/14 Massimiliano Pinto Addition of new service status description
|
||||
* 30/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS description
|
||||
* 01/06/15 Massimiliano Pinto Addition of server_update_address/port
|
||||
*
|
||||
* 19/06/15 Martin Brampton Extra code for persistent connections
|
||||
* @endverbatim
|
||||
*/
|
||||
#include <stdio.h>
|
||||
|
||||
@ -36,7 +36,8 @@
|
||||
* 06/02/15 Mark Riddoch Added caching of authentication data
|
||||
* 18/02/15 Mark Riddoch Added result set management
|
||||
* 03/03/15 Massimiliano Pinto Added config_enable_feedback_task() call in serviceStartAll
|
||||
*
|
||||
* 19/06/15 Martin Brampton More meaningful names for temp variables
|
||||
|
||||
* @endverbatim
|
||||
*/
|
||||
#include <stdio.h>
|
||||
|
||||
@ -58,6 +58,7 @@ struct service;
|
||||
* 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks
|
||||
* 27/08/2014 Mark Riddoch Addition of write event queuing
|
||||
* 23/09/2014 Mark Riddoch New poll processing queue
|
||||
* 19/06/2015 Martin Brampton Provision of persistent connections
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
@ -362,4 +363,4 @@ int dcb_drain_writeq_SSL(DCB *dcb);
|
||||
|
||||
#define DCB_IS_CLONE(d) ((d)->flags & DCBF_CLONE)
|
||||
#define DCB_REPLIED(d) ((d)->flags & DCBF_REPLIED)
|
||||
#endif /* _DCB_H *
|
||||
#endif /* _DCB_H */
|
||||
|
||||
@ -44,7 +44,8 @@
|
||||
* 27/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS
|
||||
* 19/02/15 Mark Riddoch Addition of serverGetList
|
||||
* 01/06/15 Massimiliano Pinto Addition of server_update_address/port
|
||||
*
|
||||
* 19/06/15 Martin Brampton Extra fields for persistent connections, CHK_SERVER
|
||||
|
||||
* @endverbatim
|
||||
*/
|
||||
|
||||
|
||||
@ -72,7 +72,7 @@ static void backend_set_delayqueue(DCB *dcb, GWBUF *queue);
|
||||
static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue);
|
||||
static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process);
|
||||
extern char* create_auth_failed_msg( GWBUF* readbuf, char* hostaddr, uint8_t* sha1);
|
||||
extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db);
|
||||
extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db, int errcode);
|
||||
static bool sescmd_response_complete(DCB* dcb);
|
||||
|
||||
|
||||
@ -1446,7 +1446,8 @@ static int gw_change_user(
|
||||
message = create_auth_fail_str(username,
|
||||
backend->session->client->remote,
|
||||
password_set,
|
||||
"");
|
||||
"",
|
||||
auth_ret);
|
||||
if (message == NULL)
|
||||
{
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
|
||||
Reference in New Issue
Block a user