Merged dcb_read and dcb_read_n into a single function (dcb_read with an extra parameter).

This commit is contained in:
counterpoint
2015-07-10 15:12:22 +01:00
parent 015cb890d4
commit 4f5de063eb
7 changed files with 52 additions and 196 deletions

View File

@ -54,6 +54,7 @@
* 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close, * 07/07/2015 Martin Brampton Merged add to zombieslist into dcb_close,
* fixes for various error situations, * fixes for various error situations,
* remove dcb_set_state etc, simplifications. * remove dcb_set_state etc, simplifications.
* 10/07/2015 Martin Brampton Simplify, merge dcb_read and dcb_read_n
* *
* @endverbatim * @endverbatim
*/ */
@ -733,26 +734,28 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
return dcb; return dcb;
} }
/** /**
* General purpose read routine to read data from a socket in the * General purpose read routine to read data from a socket in the
* Descriptor Control Block and append it to a linked list of buffers. * Descriptor Control Block and append it to a linked list of buffers.
* The list may be empty, in which case *head == NULL * The list may be empty, in which case *head == NULL. The third
* parameter indicates the maximum number of bytes to be read (needed
* for SSL processing) with 0 meaning no limit.
* *
* @param dcb The DCB to read from * @param dcb The DCB to read from
* @param head Pointer to linked list to append data to * @param head Pointer to linked list to append data to
* @return -1 on error, otherwise the number of read bytes on the last * @param maxbytes Maximum bytes to read (0 = no limit)
* iteration of while loop. 0 is returned if no data available. * @return -1 on error, otherwise the number of read bytes on
* the last iteration of while loop. 0 is returned if no data available.
*/ */
int dcb_read( int dcb_read(
DCB *dcb, DCB *dcb,
GWBUF **head) GWBUF **head,
int maxbytes)
{ {
GWBUF *buffer = NULL; GWBUF *buffer = NULL;
int b; int bytesavailable;
int rc; int nsingleread = 0;
int n; int nreadtotal = 0;
int nread = 0;
CHK_DCB(dcb); CHK_DCB(dcb);
@ -760,34 +763,32 @@ int dcb_read(
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Read failed, dcb is %s.", "%lu [dcb_read] Error : Read failed, dcb is %s.",
pthread_self(),
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable"))); dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable")));
n = 0; return 0;
goto return_n;
} }
while (true) while (0 == maxbytes || nreadtotal < maxbytes)
{ {
int bufsize; int bufsize;
rc = ioctl(dcb->fd, FIONREAD, &b); if (-1 == ioctl(dcb->fd, FIONREAD, &bytesavailable))
if (rc == -1)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : ioctl FIONREAD for dcb %p in " "%lu [dcb_read] Error : ioctl FIONREAD for dcb %p in "
"state %s fd %d failed due error %d, %s.", "state %s fd %d failed due error %d, %s.",
pthread_self(),
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd, dcb->fd,
errno, errno,
strerror(errno)))); strerror(errno))));
n = -1; return -1;
goto return_n;
} }
if (b == 0 && nread == 0) if (bytesavailable == 0 && nreadtotal == 0)
{ {
/** Handle closed client socket */ /** Handle closed client socket */
if (dcb_isclient(dcb)) if (dcb_isclient(dcb))
@ -805,22 +806,20 @@ int dcb_read(
l_errno != EWOULDBLOCK && l_errno != EWOULDBLOCK &&
l_errno != 0) l_errno != 0)
{ {
n = -1; return -1;
goto return_n;
} }
} }
n = 0; return 0;
goto return_n;
} }
else if (b == 0) else if (bytesavailable == 0)
{ {
n = 0; return 0;
goto return_n;
} }
dcb->last_read = hkheartbeat; dcb->last_read = hkheartbeat;
bufsize = MIN(b, MAX_BUFFER_SIZE); bufsize = MIN(bytesavailable, MAX_BUFFER_SIZE);
if (maxbytes) bufsize = MIN(bufsize, maxbytes);
if ((buffer = gwbuf_alloc(bufsize)) == NULL) if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{ {
@ -830,27 +829,28 @@ int dcb_read(
*/ */
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Failed to allocate read buffer " "%lu [dcb_read] Error : Failed to allocate read buffer "
"for dcb %p fd %d, due %d, %s.", "for dcb %p fd %d, due %d, %s.",
pthread_self(),
dcb, dcb,
dcb->fd, dcb->fd,
errno, errno,
strerror(errno)))); strerror(errno))));
n = -1; return -1;
goto return_n;
} }
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); GW_NOINTR_CALL(nsingleread = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++); dcb->stats.n_reads++);
if (n <= 0) if (nsingleread <= 0)
{ {
if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK) if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Read failed, dcb %p in state " "%lu [dcb_read] Error : Read failed, dcb %p in state "
"%s fd %d, due %d, %s.", "%s fd %d, due %d, %s.",
pthread_self(),
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd, dcb->fd,
@ -858,169 +858,26 @@ int dcb_read(
strerror(errno)))); strerror(errno))));
} }
gwbuf_free(buffer); gwbuf_free(buffer);
goto return_n; return nsingleread;
} }
nread += n; nreadtotal += nsingleread;
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
"%lu [dcb_read] Read %d bytes from dcb %p in state %s " "%lu [dcb_read] Read %d bytes from dcb %p in state %s "
"fd %d.", "fd %d.",
pthread_self(), pthread_self(),
n, nsingleread,
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd))); dcb->fd)));
/*< Append read data to the gwbuf */ /*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer); *head = gwbuf_append(*head, buffer);
} /*< while (true) */ } /*< while (0 == maxbytes || nreadtotal < maxbytes) */
return_n:
return n; return nsingleread;
} }
/**
* General purpose read routine to read data from a socket in the
* Descriptor Control Block and append it to a linked list of buffers.
* This function will read at most nbytes of data.
*
* The list may be empty, in which case *head == NULL. This
*
* @param dcb The DCB to read from
* @param head Pointer to linked list to append data to
* @param nbytes Maximum number of bytes read
* @return -1 on error, otherwise the number of read bytes on the last
* iteration of while loop. 0 is returned if no data available.
*/
int dcb_read_n(
DCB *dcb,
GWBUF **head,
int nbytes)
{
GWBUF *buffer = NULL;
int b;
int rc;
int n;
int nread = 0;
CHK_DCB(dcb);
if (dcb->fd <= 0)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Read failed, dcb is %s.",
dcb->fd == DCBFD_CLOSED ? "closed" : "cloned, not readable")));
n = 0;
goto return_n;
}
int bufsize;
rc = ioctl(dcb->fd, FIONREAD, &b);
if (rc == -1)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : ioctl FIONREAD for dcb %p in "
"state %s fd %d failed due error %d, %s.",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
errno,
strerror(errno))));
n = -1;
goto return_n;
}
if (b == 0)
{
/** Handle closed client socket */
if (nread == 0 && dcb_isclient(dcb))
{
char c;
int l_errno = 0;
int r = -1;
/* try to read 1 byte, without consuming the socket buffer */
r = recv(dcb->fd, &c, sizeof(char), MSG_PEEK);
l_errno = errno;
if (r <= 0 &&
l_errno != EAGAIN &&
l_errno != EWOULDBLOCK &&
l_errno != 0)
{
n = -1;
goto return_n;
}
}
n = 0;
goto return_n;
}
dcb->last_read = hkheartbeat;
bufsize = MIN(b, nbytes);
if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{
/*<
* This is a fatal error which should cause shutdown.
* Todo shutdown if memory allocation fails.
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Failed to allocate read buffer "
"for dcb %p fd %d, due %d, %s.",
dcb,
dcb->fd,
errno,
strerror(errno))));
n = -1;
goto return_n;
}
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize);
dcb->stats.n_reads++);
if (n <= 0)
{
if (errno != 0 && errno != EAGAIN && errno != EWOULDBLOCK)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Read failed, dcb %p in state "
"%s fd %d, due %d, %s.",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
errno,
strerror(errno))));
}
gwbuf_free(buffer);
goto return_n;
}
nread += n;
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [dcb_read] Read %d bytes from dcb %p in state %s "
"fd %d.",
pthread_self(),
n,
dcb,
STRDCBSTATE(dcb->state),
dcb->fd)));
/*< Append read data to the gwbuf */
*head = gwbuf_append(*head, buffer);
return_n:
return n;
}
/** /**
* General purpose read routine to read data from a socket through the SSL * General purpose read routine to read data from a socket through the SSL
* structure lined with this DCB and append it to a linked list of buffers. * structure lined with this DCB and append it to a linked list of buffers.

View File

@ -315,8 +315,7 @@ DCB *dcb_alloc(dcb_role_t);
void dcb_free(DCB *); void dcb_free(DCB *);
DCB *dcb_connect(struct server *, struct session *, const char *); DCB *dcb_connect(struct server *, struct session *, const char *);
DCB *dcb_clone(DCB *); DCB *dcb_clone(DCB *);
int dcb_read(DCB *, GWBUF **); int dcb_read(DCB *, GWBUF **, int);
int dcb_read_n(DCB*,GWBUF **,int);
int dcb_drain_writeq(DCB *); int dcb_drain_writeq(DCB *);
void dcb_close(DCB *); void dcb_close(DCB *);
DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */ DCB *dcb_process_zombies(int); /* Process Zombies except the one behind the pointer */

View File

@ -143,7 +143,7 @@ SESSION *session = dcb->session;
MAXSCALED *maxscaled = (MAXSCALED *)dcb->protocol; MAXSCALED *maxscaled = (MAXSCALED *)dcb->protocol;
char *password; char *password;
if ((n = dcb_read(dcb, &head)) != -1) if ((n = dcb_read(dcb, &head, 0)) != -1)
{ {
if (head) if (head)

View File

@ -452,7 +452,7 @@ static int gw_read_backend_event(DCB *dcb) {
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
/* read available backend data */ /* read available backend data */
rc = dcb_read(dcb, &read_buffer); rc = dcb_read(dcb, &read_buffer, 0);
if (rc < 0) if (rc < 0)
{ {

View File

@ -515,7 +515,7 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF **buf) {
* is not requesting SSL and the rest of the auth packet is still * is not requesting SSL and the rest of the auth packet is still
* waiting in the socket. We need to read the data from the socket * waiting in the socket. We need to read the data from the socket
* to find out the username of the connecting client. */ * to find out the username of the connecting client. */
int bytes = dcb_read(dcb,&queue); int bytes = dcb_read(dcb,&queue, 0);
queue = gwbuf_make_contiguous(queue); queue = gwbuf_make_contiguous(queue);
client_auth_packet = GWBUF_DATA(queue); client_auth_packet = GWBUF_DATA(queue);
client_auth_packet_size = gwbuf_length(queue); client_auth_packet_size = gwbuf_length(queue);
@ -723,12 +723,12 @@ int gw_read_client_event(
* read only enough of the auth packet to know if the client is * read only enough of the auth packet to know if the client is
* requesting SSL. If the client is not requesting SSL the rest of * requesting SSL. If the client is not requesting SSL the rest of
the auth packet will be read later. */ the auth packet will be read later. */
rc = dcb_read_n(dcb, &read_buffer,(4 + 4 + 4 + 1 + 23)); rc = dcb_read(dcb, &read_buffer,(4 + 4 + 4 + 1 + 23));
} }
else else
{ {
/** Normal non-SSL connection */ /** Normal non-SSL connection */
rc = dcb_read(dcb, &read_buffer); rc = dcb_read(dcb, &read_buffer, 0);
} }
if (rc < 0) if (rc < 0)

View File

@ -173,7 +173,7 @@ int gw_read_backend_handshake(
int success = 0; int success = 0;
int packet_len = 0; int packet_len = 0;
if ((n = dcb_read(dcb, &head)) != -1) if ((n = dcb_read(dcb, &head, 0)) != -1)
{ {
dcb->last_read = hkheartbeat; dcb->last_read = hkheartbeat;
@ -426,7 +426,7 @@ int gw_receive_backend_auth(
uint8_t *ptr = NULL; uint8_t *ptr = NULL;
int rc = 0; int rc = 0;
n = dcb_read(dcb, &head); n = dcb_read(dcb, &head, 0);
dcb->last_read = hkheartbeat; dcb->last_read = hkheartbeat;

View File

@ -155,7 +155,7 @@ SESSION *session = dcb->session;
TELNETD *telnetd = (TELNETD *)dcb->protocol; TELNETD *telnetd = (TELNETD *)dcb->protocol;
char *password, *t; char *password, *t;
if ((n = dcb_read(dcb, &head)) != -1) if ((n = dcb_read(dcb, &head, 0)) != -1)
{ {
if (head) if (head)