diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 9f80b2799..0fa80858a 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -302,8 +302,6 @@ static int find_last_seqno(strpart_t* parts, int seqno, int seqnoidx); void flushall_logfiles(bool flush); bool thr_flushall_check(); -#include "../core/atomic.c" - const char* get_suffix_default(void) { return ".log"; diff --git a/server/core/config.c b/server/core/config.c index c0d55447c..bc4c42210 100644 --- a/server/core/config.c +++ b/server/core/config.c @@ -1175,7 +1175,7 @@ config_get_value(CONFIG_PARAMETER *params, const char *name) } /** - * Get the value of a config parameter + * Get the value of a config parameter as a string * * @param params The linked list of config parameters * @param name The parameter to return diff --git a/server/core/dcb.c b/server/core/dcb.c index 766044604..98436f72d 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -49,6 +49,7 @@ * backend * 07/05/2014 Mark Riddoch Addition of callback mechanism * 20/06/2014 Mark Riddoch Addition of dcb_clone + * 29/05/2015 Markus Makela Addition of dcb_write_SSL * 11/06/2015 Martin Brampton Persistent connnections and tidy up * * @endverbatim @@ -159,12 +160,12 @@ DCB *newdcb; { return NULL; } - newdcb->dcb_chk_top = CHK_NUM_DCB; - newdcb->dcb_chk_tail = CHK_NUM_DCB; + newdcb->dcb_chk_top = CHK_NUM_DCB; + newdcb->dcb_chk_tail = CHK_NUM_DCB; newdcb->dcb_errhandle_called = false; - newdcb->dcb_role = role; - spinlock_init(&newdcb->dcb_initlock); + newdcb->dcb_role = role; + spinlock_init(&newdcb->dcb_initlock); spinlock_init(&newdcb->writeqlock); spinlock_init(&newdcb->delayqlock); spinlock_init(&newdcb->authlock); @@ -175,7 +176,7 @@ DCB *newdcb; newdcb->readcheck = 0; newdcb->polloutbusy = 0; newdcb->writecheck = 0; - newdcb->fd = DCBFD_CLOSED; + newdcb->fd = DCBFD_CLOSED; newdcb->evq.next = NULL; newdcb->evq.prev = NULL; @@ -189,12 +190,12 @@ DCB *newdcb; newdcb->writeqlen = 0; newdcb->high_water = 0; newdcb->low_water = 0; - newdcb->session = NULL; - newdcb->server = NULL; - newdcb->service = NULL; + newdcb->session = NULL; + newdcb->server = NULL; + newdcb->service = NULL; newdcb->next = NULL; - newdcb->nextpersistent = NULL; - newdcb->persistentstart = 0; + newdcb->nextpersistent = NULL; + newdcb->persistentstart = 0; newdcb->callbacks = NULL; newdcb->data = NULL; @@ -296,22 +297,22 @@ DCB *clonedcb; if ((clonedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER))) { - clonedcb->fd = DCBFD_CLOSED; - clonedcb->flags |= DCBF_CLONE; - clonedcb->state = orig->state; - clonedcb->data = orig->data; - if (orig->remote) - clonedcb->remote = strdup(orig->remote); - if (orig->user) - clonedcb->user = strdup(orig->user); - clonedcb->protocol = orig->protocol; + clonedcb->fd = DCBFD_CLOSED; + clonedcb->flags |= DCBF_CLONE; + clonedcb->state = orig->state; + clonedcb->data = orig->data; + if (orig->remote) + clonedcb->remote = strdup(orig->remote); + if (orig->user) + clonedcb->user = strdup(orig->user); + clonedcb->protocol = orig->protocol; - clonedcb->func.write = dcb_null_write; - /** - * Close triggers closing of router session as well which is needed. - */ - clonedcb->func.close = orig->func.close; - clonedcb->func.auth = dcb_null_auth; + clonedcb->func.write = dcb_null_write; + /** + * Close triggers closing of router session as well which is needed. + */ + clonedcb->func.close = orig->func.close; + clonedcb->func.auth = dcb_null_auth; } return clonedcb; } @@ -418,7 +419,8 @@ DCB_CALLBACK *cb; free(cb); } spinlock_release(&dcb->cb_lock); - + if(dcb->ssl) + SSL_free(dcb->ssl); bitmask_free(&dcb->memdata.bitmask); free(dcb); } @@ -916,6 +918,364 @@ return_n: return n; } + +/** + * General purpose read routine to read data from a socket in the + * Descriptor Control Block and append it to a linked list of buffers. + * This function will read at most nbytes of data. + * + * The list may be empty, in which case *head == NULL. This + * + * @param dcb The DCB to read from + * @param head Pointer to linked list to append data to + * @param nbytes Maximum number of bytes read + * @return -1 on error, otherwise the number of read bytes on the last + * iteration of while loop. 0 is returned if no data available. + */ +int dcb_read_n( + DCB *dcb, + GWBUF **head, + int nbytes) +{ + GWBUF *buffer = NULL; + int b; + int rc; + int n; + int nread = 0; + + CHK_DCB(dcb); + + if (dcb->fd <= 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Read failed, dcb is %s.", + dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable"))); + n = 0; + goto return_n; + } + + int bufsize; + + rc = ioctl(dcb->fd, FIONREAD, &b); + + if (rc == -1) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : ioctl FIONREAD for dcb %p in " + "state %s fd %d failed due error %d, %s.", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + errno, + strerror(errno)))); + n = -1; + goto return_n; + } + + if (b == 0 && nread == 0) + { + /** Handle closed client socket */ + if (dcb_isclient(dcb)) + { + char c; + int l_errno = 0; + int r = -1; + + /* try to read 1 byte, without consuming the socket buffer */ + r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK); + l_errno = errno; + + if (r <= 0 && + l_errno != EAGAIN && + l_errno != EWOULDBLOCK && + l_errno != 0) + { + n = -1; + goto return_n; + } + } + n = 0; + goto return_n; + } + else if (b == 0) + { + n = 0; + goto return_n; + } + + dcb->last_read = hkheartbeat; + + bufsize = MIN(b, nbytes); + + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { + /*< + * This is a fatal error which should cause shutdown. + * Todo shutdown if memory allocation fails. + */ + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to allocate read buffer " + "for dcb %p fd %d, due %d, %s.", + dcb, + dcb->fd, + errno, + strerror(errno)))); + + n = -1; + goto return_n; + } + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); + dcb->stats.n_reads++); + + if (n <= 0) + { + if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Read failed, dcb %p in state " + "%s fd %d, due %d, %s.", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + errno, + strerror(errno)))); + } + gwbuf_free(buffer); + goto return_n; + } + nread += n; + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_read] Read %d bytes from dcb %p in state %s " + "fd %d.", + pthread_self(), + n, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + /*< Append read data to the gwbuf */ + *head = gwbuf_append(*head, buffer); + +return_n: + return n; +} + + +/** + * General purpose read routine to read data from a socket through the SSL + * structure lined with this DCB and append it to a linked list of buffers. + * The list may be empty, in which case *head == NULL. The SSL structure should + * be initialized and the SSL handshake should be done. + * + * @param dcb The DCB to read from + * @param head Pointer to linked list to append data to + * @return -1 on error, otherwise the number of read bytes on the last + * iteration of while loop. 0 is returned if no data available. + */ +int dcb_read_SSL( + DCB *dcb, + GWBUF **head) +{ + GWBUF *buffer = NULL; + int b,pending; + int rc; + int n; + int nread = 0; + int ssl_errno = 0; + CHK_DCB(dcb); + + if (dcb->fd <= 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Read failed, dcb is %s.", + dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable"))); + n = 0; + goto return_n; + } + + while (true) + { + int bufsize; + ssl_errno = 0; + rc = ioctl(dcb->fd, FIONREAD, &b); + pending = SSL_pending(dcb->ssl); + if (rc == -1) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : ioctl FIONREAD for dcb %p in " + "state %s fd %d failed due error %d, %s.", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + errno, + strerror(errno)))); + n = -1; + goto return_n; + } + + if (b == 0 && pending == 0 && nread == 0) + { + /** Handle closed client socket */ + if (dcb_isclient(dcb)) + { + char c = 0; + int r = -1; + + /* try to read 1 byte, without consuming the socket buffer */ + r = SSL_peek(dcb->ssl, &c, sizeof(char)); + if (r <= 0) + { + ssl_errno = SSL_get_error(dcb->ssl,r); + if(ssl_errno != SSL_ERROR_WANT_READ && + ssl_errno != SSL_ERROR_WANT_WRITE && + ssl_errno != SSL_ERROR_NONE) + n = -1; + else + n = 0; + goto return_n; + } + } + n = 0; + goto return_n; + } + else if (b == 0 && pending == 0) + { + n = 0; + goto return_n; + } +#ifdef SS_DEBUG + else + { + skygw_log_write_flush(LD,"Total: %d Socket: %d Pending: %d", + nread,b,pending); + } +#endif + + dcb->last_read = hkheartbeat; + + bufsize = MIN(b, MAX_BUFFER_SIZE); + + if ((buffer = gwbuf_alloc(bufsize)) == NULL) + { + /*< + * This is a fatal error which should cause shutdown. + * Todo shutdown if memory allocation fails. + */ + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Failed to allocate read buffer " + "for dcb %p fd %d, due %d, %s.", + dcb, + dcb->fd, + errno, + strerror(errno)))); + + n = -1; + goto return_n; + } + + n = SSL_read(dcb->ssl, GWBUF_DATA(buffer), bufsize); + dcb->stats.n_reads++; + + if (n < 0) + { + char errbuf[200]; + ssl_errno = SSL_get_error(dcb->ssl,n); +#ifdef SS_DEBUG + if(ssl_errno == SSL_ERROR_SSL || + ssl_errno == SSL_ERROR_SYSCALL) + { + int eno; + while((eno = ERR_get_error()) != 0) + { + ERR_error_string(eno,errbuf); + skygw_log_write(LE, + "%s", + errbuf); + } + } +#endif + if(ssl_errno == SSL_ERROR_WANT_READ || + ssl_errno == SSL_ERROR_WANT_WRITE || + ssl_errno == SSL_ERROR_NONE) + { + n = 0; + } + else + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Read failed, dcb %p in state " + "%s fd %d, SSL error %d: %s.", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + ssl_errno, + strerror(errno)))); + + if(ssl_errno == SSL_ERROR_SSL || + ssl_errno == SSL_ERROR_SYSCALL) + { + while((ssl_errno = ERR_get_error()) != 0) + { + ERR_error_string(ssl_errno,errbuf); + skygw_log_write(LE, + "%s", + errbuf); + } + } + } + n = -1; + gwbuf_free(buffer); + goto return_n; + } + else if(n == 0) + { + gwbuf_free(buffer); + goto return_n; + } + + gwbuf_rtrim(buffer,bufsize - n); +#ifdef SS_DEBUG + skygw_log_write(LD,"%lu SSL: Truncated buffer from %d to %d bytes. " + "Read %d bytes, %d bytes waiting.\n",pthread_self(), + bufsize,GWBUF_LENGTH(buffer),n,b); + + if(GWBUF_LENGTH(buffer) != n){ + skygw_log_sync_all(); + } + + ss_info_dassert((buffer->start <= buffer->end),"Buffer start has passed end."); + ss_info_dassert(GWBUF_LENGTH(buffer) == n,"Buffer size not equal to read bytes."); +#endif + nread += n; + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_read_SSL] Read %d bytes from dcb %p in state %s " + "fd %d.", + pthread_self(), + n, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + + /*< Append read data to the gwbuf */ + *head = gwbuf_append(*head, buffer); + rc = ioctl(dcb->fd, FIONREAD, &b); + pending = SSL_pending(dcb->ssl); + + } /*< while (true) */ +return_n: + return n; +} /** * General purpose routine to write to a DCB * @@ -941,11 +1301,11 @@ int below_water; return 0; } /** - * SESSION_STATE_STOPPING means that one of the backends is closing - * the router session. Some backends may have not completed + * SESSION_STATE_STOPPING means that one of the backends is closing + * the router session. Some backends may have not completed * authentication yet and thus they have no information about router * being closed. Session state is changed to SESSION_STATE_STOPPING - * before router's closeSession is called and that tells that DCB may + * before router's closeSession is called and that tells that DCB may * still be writable. */ if (queue == NULL || @@ -968,9 +1328,9 @@ int below_water; //ss_dassert(false); return 0; } - + spinlock_acquire(&dcb->writeqlock); - + if (dcb->writeq != NULL) { /* @@ -985,7 +1345,7 @@ int below_water; if (queue) { int qlen; - + qlen = gwbuf_length(queue); atomic_add(&dcb->writeqlen, qlen); dcb->writeq = gwbuf_append(dcb->writeq, queue); @@ -1034,7 +1394,7 @@ int below_water; w = gw_write(dcb, GWBUF_DATA(queue), qlen); dcb->stats.n_writes++; ); - + if (w < 0) { saved_errno = errno; @@ -1042,7 +1402,7 @@ int below_water; if (LOG_IS_ENABLED(LOGFILE_DEBUG)) { - if (saved_errno == EPIPE) + if (saved_errno == EPIPE) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -1055,9 +1415,9 @@ int below_water; dcb->fd, saved_errno, strerror(saved_errno)))); - } + } } - + if (LOG_IS_ENABLED(LOGFILE_ERROR)) { if (saved_errno != EPIPE && @@ -1102,7 +1462,7 @@ int below_water; if (queue) { int qlen; - + qlen = gwbuf_length(queue); atomic_add(&dcb->writeqlen, qlen); dcb->stats.n_buffered++; @@ -1122,7 +1482,7 @@ int below_water; if (GWBUF_IS_TYPE_MYSQL(queue)) { uint8_t* data = GWBUF_DATA(queue); - + if (data[4] == 0x01) { dolog = false; @@ -1152,6 +1512,272 @@ int below_water; return 1; } +/** + * General purpose routine to write to an SSL enabled DCB + * + * @param dcb The DCB of the client + * @param ssl The SSL structure for this DCB + * @param queue Queue of buffers to write + * @return 0 on failure, 1 on success + */ +int +dcb_write_SSL(DCB *dcb, GWBUF *queue) +{ + int w; + int saved_errno = 0; + int below_water; + + below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; + ss_dassert(queue != NULL); + + if (dcb->fd <= 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write failed, dcb is %s.", + dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not writable"))); + return 0; + } + + /** + * SESSION_STATE_STOPPING means that one of the backends is closing + * the router session. Some backends may have not completed + * authentication yet and thus they have no information about router + * being closed. Session state is changed to SESSION_STATE_STOPPING + * before router's closeSession is called and that tells that DCB may + * still be writable. + */ + if (queue == NULL || + (dcb->state != DCB_STATE_ALLOC && + dcb->state != DCB_STATE_POLLING && + dcb->state != DCB_STATE_LISTENING && + dcb->state != DCB_STATE_NOPOLLING && + (dcb->session == NULL || + dcb->session->state != SESSION_STATE_STOPPING))) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write aborted to dcb %p because " + "it is in state %s", + pthread_self(), + dcb->stats.n_buffered, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + //ss_dassert(false); + return 0; + } + + spinlock_acquire(&dcb->writeqlock); + + if (dcb->writeq != NULL) + { + /* + * We have some queued data, so add our data to + * the write queue and return. + * The assumption is that there will be an EPOLLOUT + * event to drain what is already queued. We are protected + * by the spinlock, which will also be acquired by the + * the routine that drains the queue data, so we should + * not have a race condition on the event. + */ + if (queue) + { + int qlen; + + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); + dcb->writeq = gwbuf_append(dcb->writeq, queue); + dcb->stats.n_buffered++; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Append to writequeue. %d writes " + "buffered for dcb %p in state %s fd %d", + pthread_self(), + dcb->stats.n_buffered, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + } + } + else + { + /* + * Loop over the buffer chain that has been passed to us + * from the reading side. + * Send as much of the data in that chain as possible and + * add any balance to the write queue. + */ + while (queue != NULL) + { + int qlen; +#if defined(FAKE_CODE) + if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && + dcb->session != NULL) + { + if (dcb_isclient(dcb) && fail_next_client_fd) { + dcb_fake_write_errno[dcb->fd] = 32; + dcb_fake_write_ev[dcb->fd] = 29; + fail_next_client_fd = false; + } else if (!dcb_isclient(dcb) && + fail_next_backend_fd) + { + dcb_fake_write_errno[dcb->fd] = 32; + dcb_fake_write_ev[dcb->fd] = 29; + fail_next_backend_fd = false; + } + } +#endif /* FAKE_CODE */ + qlen = GWBUF_LENGTH(queue); + + w = gw_write_SSL(dcb->ssl, GWBUF_DATA(queue), qlen); + dcb->stats.n_writes++; + + if (w < 0) + { + int ssl_errno = SSL_get_error(dcb->ssl,w); + + if (LOG_IS_ENABLED(LOGFILE_DEBUG)) + { + switch(ssl_errno) + { + case SSL_ERROR_WANT_READ: + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error SSL_ERROR_WANT_READ", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + break; + case SSL_ERROR_WANT_WRITE: + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error SSL_ERROR_WANT_WRITE", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + break; + default: + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due error %d", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd,ssl_errno))); + if(ssl_errno == SSL_ERROR_SSL || + ssl_errno == SSL_ERROR_SYSCALL) + { + while((ssl_errno = ERR_get_error()) != 0) + { + char errbuf[140]; + ERR_error_string(ssl_errno,errbuf); + skygw_log_write(LE,"%s",errbuf); + } + } + break; + } + } + + if (LOG_IS_ENABLED(LOGFILE_ERROR)) + { + if (ssl_errno != 0) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write to dcb %p in " + "state %s fd %d failed due " + "SSL error %d", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + ssl_errno))); + } + } + break; + } + /* + * Pull the number of bytes we have written from + * queue with have. + */ + queue = gwbuf_consume(queue, w); + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Wrote %d Bytes to dcb %p in " + "state %s fd %d", + pthread_self(), + w, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); + } /*< while (queue != NULL) */ + /*< + * What wasn't successfully written is stored to write queue + * for suspended write. + */ + dcb->writeq = queue; + + if (queue) + { + int qlen; + + qlen = gwbuf_length(queue); + atomic_add(&dcb->writeqlen, qlen); + dcb->stats.n_buffered++; + } + } /* if (dcb->writeq) */ + + if (saved_errno != 0 && + queue != NULL && + saved_errno != EAGAIN && + saved_errno != EWOULDBLOCK) + { + bool dolog = true; + + /** + * Do not log if writing COM_QUIT to backend failed. + */ + if (GWBUF_IS_TYPE_MYSQL(queue)) + { + uint8_t* data = GWBUF_DATA(queue); + + if (data[4] == 0x01) + { + dolog = false; + } + } + if (dolog) + { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Writing to %s socket failed due %d, %s.", + pthread_self(), + dcb_isclient(dcb) ? "client" : "backend server", + saved_errno, + strerror(saved_errno)))); + } + spinlock_release(&dcb->writeqlock); + return 0; + } + spinlock_release(&dcb->writeqlock); + + if (dcb->high_water && dcb->writeqlen > dcb->high_water && below_water) + { + atomic_add(&dcb->stats.n_high_water, 1); + dcb_call_callback(dcb, DCB_REASON_HIGH_WATER); + } + + return 1; +} + /** * Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling * of a socket and will try to send any buffered data from the write queue @@ -1244,6 +1870,85 @@ int above_water; return n; } +/** + * Drain the write queue of a DCB. This is called as part of the EPOLLOUT handling + * of a socket and will try to send any buffered data from the write queue + * up until the point the write would block. This function uses SSL encryption + * and the SSL handshake should have been completed prior to calling this function. + * + * @param dcb DCB to drain the write queue of + * @return The number of bytes written + */ +int +dcb_drain_writeq_SSL(DCB *dcb) +{ + int n = 0; + int w; + int saved_errno = 0; + int above_water; + + above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; + + spinlock_acquire(&dcb->writeqlock); + + if (dcb->writeq) + { + int len; + /* + * Loop over the buffer chain in the pending writeq + * Send as much of the data in that chain as possible and + * leave any balance on the write queue. + */ + while (dcb->writeq != NULL) + { + len = GWBUF_LENGTH(dcb->writeq); + w = gw_write_SSL(dcb->ssl, GWBUF_DATA(dcb->writeq), len); + + if (w < 0) + { + int ssl_errno = ERR_get_error(); + + if(ssl_errno == SSL_ERROR_WANT_WRITE || + ssl_errno == SSL_ERROR_WANT_ACCEPT || + ssl_errno == SSL_ERROR_WANT_READ) + { + break; + } + + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write to dcb %p " + "in state %s fd %d failed: %s", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + ERR_error_string(ssl_errno,NULL)); + break; + } + /* + * Pull the number of bytes we have written from + * queue with have. + */ + dcb->writeq = gwbuf_consume(dcb->writeq, w); + n += w; + } + } + spinlock_release(&dcb->writeqlock); + atomic_add(&dcb->writeqlen, -n); + + /* The write queue has drained, potentially need to call a callback function */ + if (dcb->writeq == NULL) + dcb_call_callback(dcb, DCB_REASON_DRAINED); + + if (above_water && dcb->writeqlen < dcb->low_water) + { + atomic_add(&dcb->stats.n_low_water, 1); + dcb_call_callback(dcb, DCB_REASON_LOW_WATER); + } + + return n; +} + /** * Removes dcb from poll set, and adds it to zombies list. As a consequence, * dcb first moves to DCB_STATE_NOPOLLING, and then to DCB_STATE_ZOMBIE state. @@ -1951,6 +2656,30 @@ dcb_set_state_nolock( return succp; } +/** + * Write data to a socket through an SSL structure. The SSL structure is linked to a DCB's socket + * and all communication is encrypted and done via the SSL structure. + * + * @param ssl The SSL structure to use for writing + * @param buf Buffer to write + * @param nbytes Number of bytes to write + * @return Number of written bytes + */ +int +gw_write_SSL(SSL* ssl, const void *buf, size_t nbytes) +{ + int w = 0; + int fd = SSL_get_fd(ssl); + + if (fd > 0) + { + w = SSL_write(ssl, buf, nbytes); + } + return w; +} + + + /** * Write data to a DCB * @@ -2443,6 +3172,205 @@ DCB *ptr; return rval; } +/** + * Create the SSL structure for this DCB. + * This function creates the SSL structure for the given SSL context. This context + * should be the service's context + * @param dcb + * @param context + * @return + */ +int dcb_create_SSL(DCB* dcb) +{ + + if(serviceInitSSL(dcb->service) != 0) + { + return -1; + } + + if((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL) + { + skygw_log_write(LE,"Error: Failed to initialize SSL for connection."); + return -1; + } + + if(SSL_set_fd(dcb->ssl,dcb->fd) == 0) + { + skygw_log_write(LE,"Error: Failed to set file descriptor for SSL connection."); + return -1; + } + + return 0; +} + +/** + * Accept a SSL connection and do the SSL authentication handshake. + * This function accepts a client connection to a DCB. It assumes that the SSL + * structure has the underlying method of communication set and this method is ready + * for usage. It then proceeds with the SSL handshake and stops only if an error + * occurs or the client has not yet written enough data to complete the handshake. + * @param dcb DCB which should accept the SSL connection + * @return 1 if the handshake was successfully completed, 0 if the handshake is + * still ongoing and another call to dcb_SSL_accept should be made or -1 if an + * error occurred during the handshake and the connection should be terminated. + */ +int dcb_accept_SSL(DCB* dcb) +{ + int rval = 0,ssl_rval,errnum = 0,fd,b = 0,pending; + char errbuf[140]; + fd = dcb->fd; + + do + { + ssl_rval = SSL_accept(dcb->ssl); + errnum = SSL_get_error(dcb->ssl,ssl_rval); + LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept %d, error %d", + ssl_rval,errnum))); + switch(ssl_rval) + { + case 0: + errnum = SSL_get_error(dcb->ssl,ssl_rval); + skygw_log_write(LE,"Error: SSL authentication failed (SSL error %d):", + dcb, + dcb->remote, + errnum); + + if(errnum == SSL_ERROR_SSL || + errnum == SSL_ERROR_SYSCALL) + { + while((errnum = ERR_get_error()) != 0) + { + ERR_error_string(errnum,errbuf); + skygw_log_write(LE,"%s",errbuf); + } + } + rval = -1; + break; + case 1: + rval = 1; + LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept done for %s", + dcb->remote))); + return rval; + + case -1: + + errnum = SSL_get_error(dcb->ssl,ssl_rval); + + if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE) + { + /** Not all of the data has been read. Go back to the poll + queue and wait for more.*/ + rval = 0; + LOGIF(LD,(skygw_log_write_flush(LD,"[dcb_accept_SSL] SSL_accept ongoing for %s", + dcb->remote))); + return rval; + } + else + { + rval = -1; + skygw_log_write(LE, + "Error: Fatal error in SSL_accept for %s: (SSL version: %s SSL error code: %d)", + dcb->remote, + SSL_get_version(dcb->ssl), + errnum); + if(errnum == SSL_ERROR_SSL || + errnum == SSL_ERROR_SYSCALL) + { + while((errnum = ERR_get_error()) != 0) + { + ERR_error_string(errnum,errbuf); + skygw_log_write(LE, + "%s", + errbuf); + } + } + } + break; + + default: + skygw_log_write_flush(LE, + "Error: Fatal library error in SSL_accept, returned value was %d.", + ssl_rval); + rval = -1; + break; + } + ioctl(fd,FIONREAD,&b); + pending = SSL_pending(dcb->ssl); +#ifdef SS_DEBUG + skygw_log_write_flush(LD,"[dcb_accept_SSL] fd %d: %d bytes, %d pending",fd,b,pending); +#endif + }while((b > 0 || pending > 0) && rval != -1); + + return rval; +} + +/** + * Initiate an SSL client connection to a server + * + * This functions starts an SSL client connection to a server which is expecting + * an SSL handshake. The DCB should already have a TCP connection to the server and + * this connection should be in a state that expects an SSL handshake. + * @param dcb DCB to connect + * @return 1 on success, -1 on error and 0 if the SSL handshake is still ongoing + */ +int dcb_connect_SSL(DCB* dcb) +{ + int rval,errnum; + char errbuf[140]; + rval = SSL_connect(dcb->ssl); + + switch(rval) + { + case 0: + errnum = SSL_get_error(dcb->ssl,rval); + LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect shutdown for %s@%s", + dcb->user, + dcb->remote))); + return -1; + break; + case 1: + rval = 1; + LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect done for %s@%s", + dcb->user, + dcb->remote))); + return rval; + + case -1: + errnum = SSL_get_error(dcb->ssl,rval); + + if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE) + { + /** Not all of the data has been read. Go back to the poll + queue and wait for more.*/ + + rval = 0; + LOGIF(LD,(skygw_log_write_flush(LD,"SSL_connect ongoing for %s@%s", + dcb->user, + dcb->remote))); + } + else + { + rval = -1; + ERR_error_string(errnum,errbuf); + skygw_log_write_flush(LE, + "Error: Fatal error in SSL_accept for %s@%s: (SSL error code: %d) %s", + dcb->user, + dcb->remote, + errnum, + errbuf); + } + break; + + default: + skygw_log_write_flush(LE, + "Error: Fatal error in SSL_connect, returned value was %d.", + rval); + break; + } + + return rval; +} + /** * Convert a DCB role to a string, the returned * string has been malloc'd and must be free'd by the caller diff --git a/server/core/server.c b/server/core/server.c index 29eee33e0..8c16f3fe9 100644 --- a/server/core/server.c +++ b/server/core/server.c @@ -33,7 +33,7 @@ * 30/08/14 Massimiliano Pinto Addition of new service status description * 30/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS description * 01/06/15 Massimiliano Pinto Addition of server_update_address/port - * + * 19/06/15 Martin Brampton Extra code for persistent connections * @endverbatim */ #include diff --git a/server/core/service.c b/server/core/service.c index b1bba0957..3de32aa25 100644 --- a/server/core/service.c +++ b/server/core/service.c @@ -36,7 +36,8 @@ * 06/02/15 Mark Riddoch Added caching of authentication data * 18/02/15 Mark Riddoch Added result set management * 03/03/15 Massimiliano Pinto Added config_enable_feedback_task() call in serviceStartAll - * + * 19/06/15 Martin Brampton More meaningful names for temp variables + * @endverbatim */ #include diff --git a/server/include/dcb.h b/server/include/dcb.h index 8af20f026..fb0bc33c6 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -58,15 +58,16 @@ struct service; * 08/05/2014 Mark Riddoch Addition of writeq high and low watermarks * 27/08/2014 Mark Riddoch Addition of write event queuing * 23/09/2014 Mark Riddoch New poll processing queue + * 19/06/2015 Martin Brampton Provision of persistent connections * * @endverbatim */ struct dcb; - /** + /** * @verbatim - * The operations that can be performed on the descriptor + * The operations that can be performed on the descriptor * * read EPOLLIN handler for the socket * write MaxScale data write entry point @@ -80,7 +81,7 @@ struct dcb; * close MaxScale close entry point for the socket * listen Create a listener for the protocol * auth Authentication entry point - * session Session handling entry point + * session Session handling entry point * @endverbatim * * This forms the "module object" for protocol modules within the gateway. @@ -224,10 +225,10 @@ typedef struct dcb_callback { * gateway may be selected to execute the required actions when a network event occurs. */ typedef struct dcb { - skygw_chk_t dcb_chk_top; + skygw_chk_t dcb_chk_top; bool dcb_errhandle_called; /*< this can be called only once */ dcb_role_t dcb_role; - SPINLOCK dcb_initlock; + SPINLOCK dcb_initlock; DCBEVENTQ evq; /**< The event queue for this DCB */ int fd; /**< The descriptor */ dcb_state_t state; /**< Current descriptor state */ @@ -235,7 +236,7 @@ typedef struct dcb { char *remote; /**< Address of remote end */ char *user; /**< User name for connection */ struct sockaddr_in ipv4; /**< remote end IPv4 address */ - char *protoname; /**< Name of the protocol */ + char *protoname; /**< Name of the protocol */ void *protocol; /**< The protocol specific state */ struct session *session; /**< The owning session */ GWPROTOCOL func; /**< The functions for this descriptor */ @@ -249,10 +250,10 @@ typedef struct dcb { SPINLOCK authlock; /**< Generic Authorization spinlock */ DCBSTATS stats; /**< DCB related statistics */ - unsigned int dcb_server_status; /*< the server role indicator from SERVER */ + unsigned int dcb_server_status; /*< the server role indicator from SERVER */ struct dcb *next; /**< Next DCB in the chain of allocated DCB's */ - struct dcb *nextpersistent; /**< Next DCB in the persistent pool for SERVER */ - time_t persistentstart; /**< Time when DCB placed in persistent pool */ + struct dcb *nextpersistent; /**< Next DCB in the persistent pool for SERVER */ + time_t persistentstart; /**< Time when DCB placed in persistent pool */ struct service *service; /**< The related service */ void *data; /**< Specific client data */ DCBMM memdata; /**< The data related to DCB memory management */ @@ -265,13 +266,13 @@ typedef struct dcb { SPINLOCK polloutlock; int polloutbusy; int writecheck; - unsigned long last_read; /*< Last time the DCB received data */ + unsigned long last_read; /*< Last time the DCB received data */ unsigned int high_water; /**< High water mark */ unsigned int low_water; /**< Low water mark */ struct server *server; /**< The associated backend server */ - SSL* ssl; /*< SSL struct for connection */ - int dcb_port; /**< port of target server */ - skygw_chk_t dcb_chk_tail; + SSL* ssl; /*< SSL struct for connection */ + int dcb_port; /**< port of target server */ + skygw_chk_t dcb_chk_tail; } DCB; /** @@ -362,4 +363,4 @@ int dcb_drain_writeq_SSL(DCB *dcb); #define DCB_IS_CLONE(d) ((d)->flags & DCBF_CLONE) #define DCB_REPLIED(d) ((d)->flags & DCBF_REPLIED) -#endif /* _DCB_H * +#endif /* _DCB_H */ diff --git a/server/include/server.h b/server/include/server.h index 6c3244f50..986796a9f 100644 --- a/server/include/server.h +++ b/server/include/server.h @@ -44,7 +44,8 @@ * 27/10/14 Massimiliano Pinto Addition of SERVER_MASTER_STICKINESS * 19/02/15 Mark Riddoch Addition of serverGetList * 01/06/15 Massimiliano Pinto Addition of server_update_address/port - * + * 19/06/15 Martin Brampton Extra fields for persistent connections, CHK_SERVER + * @endverbatim */ diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index a4e065a81..818824def 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -72,7 +72,7 @@ static void backend_set_delayqueue(DCB *dcb, GWBUF *queue); static int gw_change_user(DCB *backend_dcb, SERVER *server, SESSION *in_session, GWBUF *queue); static GWBUF* process_response_data (DCB* dcb, GWBUF* readbuf, int nbytes_to_process); extern char* create_auth_failed_msg( GWBUF* readbuf, char* hostaddr, uint8_t* sha1); -extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db); +extern char* create_auth_fail_str(char *username, char *hostaddr, char *sha1, char *db, int errcode); static bool sescmd_response_complete(DCB* dcb); @@ -1446,7 +1446,8 @@ static int gw_change_user( message = create_auth_fail_str(username, backend->session->client->remote, password_set, - ""); + "", + auth_ret); if (message == NULL) { LOGIF(LE, (skygw_log_write_flush(