diff --git a/core/gw_utils.c b/core/gw_utils.c index 705d5c9d7..c5dbaa587 100644 --- a/core/gw_utils.c +++ b/core/gw_utils.c @@ -22,11 +22,15 @@ * * Date Who Description * 03-06-2013 Massimiliano Pinto gateway utils + * 12-06-2013 Massimiliano Pinto gw_read_gwbuff + * with error detection + * and its handling * */ #include #include +#include /// // set ip address in sockect struct @@ -190,4 +194,61 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) { return n; } +///////////////////////////////////////////////// +// Read data from dcb and store it in the gwbuf +///////////////////////////////////////////////// +int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) { + GWBUF *buffer = NULL; + int n = -1; + + if (b <= 0) + return 1; + + while (b > 0) { + int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE; + if ((buffer = gwbuf_alloc(bufsize)) == NULL) { + /* Bad news, we have run out of memory */ + /* Error handling */ + if (dcb->session->backends) { + (dcb->session->backends->func).error(dcb->session->backends, -1); + } + (dcb->func).error(dcb, -1); + return 1; + } + + GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); + + if (n < 0) { + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + fprintf(stderr, "Client connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno)); + return 1; + } else { + fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));; + if (dcb->session->backends) { + (dcb->session->backends->func).error(dcb->session->backends, -1); + } + (dcb->func).error(dcb, -1); + return 1; + } + } + + if (n == 0) { + // socket closed + fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno)); + if (dcb->session->backends) { + (dcb->session->backends->func).error(dcb->session->backends, -1); + } + (dcb->func).error(dcb, -1); + return 1; + } + + // append read data to the gwbuf + *head = gwbuf_append(*head, buffer); + + // how many bytes left + b -= n; + } + + return 0; +} diff --git a/core/utils.c b/core/utils.c index 75b8df46e..75e3b040a 100644 --- a/core/utils.c +++ b/core/utils.c @@ -91,20 +91,18 @@ int gw_read_backend_event(DCB *dcb, int epfd) { // do the right task, not just break break; } + head = gwbuf_append(head, buffer); - + // how many bytes left b -= n; } - + // write the gwbuffer to client dcb->session->client->func.write(dcb->session->client, head); return 1; } -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "The backend says that Client Protocol state is %i\n", client_protocol->state); -#endif return 1; } @@ -198,10 +196,16 @@ int gw_write_backend_event(DCB *dcb, int epfd) { //client read event triggered by EPOLLIN ////////////////////////////////////////// int gw_route_read_event(DCB* dcb, int epfd) { - MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); + MySQLProtocol *protocol = NULL; uint8_t buffer[MAX_BUFFER_SIZE] = ""; - int n; + int n = 0; int b = -1; + GWBUF *head = NULL; + GWBUF *gw_buffer = NULL; + + if (dcb) { + protocol = DCB_PROTOCOL(dcb, MySQLProtocol); + } if (ioctl(dcb->fd, FIONREAD, &b)) { @@ -210,56 +214,45 @@ int gw_route_read_event(DCB* dcb, int epfd) { fprintf(stderr, "Client IOCTL FIONREAD bytes to read = %i\n", b); } -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for socket %i, scramble [%s]\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, protocol->scramble); -#endif switch (protocol->state) { case MYSQL_AUTH_SENT: - // read client auth - n = read(dcb->fd, buffer, MAX_BUFFER_SIZE); - - fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for socket %i, bytes read %i\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, n); - - if (n < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - fprintf(stderr, "Client connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno)); - break; - } else { - - fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));; - - if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); - } - (dcb->func).error(dcb, -1); - - break; - } - } - - if (n == 0) { - // EOF - fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno)); - - if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); - } - (dcb->func).error(dcb, -1); - - return 1; - } - // handle possible errors: - // 0 connection close - // -1, error: not EAGAIN or EWOULDBLOCK + /* + * Read all the data that is available into a chain of buffers + */ + { + int len = -1; + int ret = -1; + GWBUF *queue = NULL; + GWBUF *gw_buffer = NULL; + ////////////////////////////////////////////////////// + // read and handle errors & close, or return if busyA + // note: if b == 0 error handling is not triggered, just return + // without closing + ////////////////////////////////////////////////////// + + if ((ret = gw_read_gwbuff(dcb, &gw_buffer, b)) != 0) + return ret; + + // example with consume, assuming one buffer only ... + queue = gw_buffer; + len = GWBUF_LENGTH(queue); + + fprintf(stderr, "<<< Reading from Client %i bytes: [%s]\n", len, GWBUF_DATA(queue)); + + // gw_do_local_authentication(dcb, GWBUF_DATA(queue)); + + // Data printed on stderr or handled withot the dcb->func.write + // so consume it now + queue = gwbuf_consume(queue, len); + } + + // ToDo + // now we have the data in the buffer, do the authentication + // but don't consume the gwbuf as in the above example !!! protocol->state = MYSQL_AUTH_RECV; - -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "DCB [%i], EPOLLIN Protocol next state MYSQL_AUTH_RECV [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, dcb->proto_state, 1, dcb->fd, protocol->scramble); -#endif - // check authentication // if OK return mysql_ok // else return error @@ -269,57 +262,65 @@ int gw_route_read_event(DCB* dcb, int epfd) { case MYSQL_IDLE: case MYSQL_WAITING_RESULT: - n = read(dcb->fd, buffer, MAX_BUFFER_SIZE); - if (n < 0) { - if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { - fprintf(stderr, "WAITING RESULT connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno)); - break; - } else { - - fprintf(stderr, "connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));; + /* + * Read all the data that is available into a chain of buffers + */ + { + int len; + GWBUF *queue = NULL; + GWBUF *gw_buffer = NULL; + uint8_t *ptr_buff = NULL; + int mysql_command = -1; + int ret = -1; + + ////////////////////////////////////////////////////// + // read and handle errors & close, or return if busy + ////////////////////////////////////////////////////// + if ((ret = gw_read_gwbuff(dcb, &gw_buffer, b)) != 0) + return ret; - (dcb->session->backends->func).error(dcb->session->backends, -1); - (dcb->func).error(dcb, -1); - - return 1; - } - } + // Now assuming in the first buffer there is the information form mysql command - if (n == 0) { - fprintf(stderr, "connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno)); - if (dcb->session->backends) { - (dcb->session->backends->func).error(dcb->session->backends, -1); - } - (dcb->func).error(dcb, -1); + // following code is only for debug now + queue = gw_buffer; + len = GWBUF_LENGTH(queue); + + ptr_buff = GWBUF_DATA(queue); - return 1; - } + // get mysql commang + if (ptr_buff) + mysql_command = ptr_buff[4]; -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for fd %i has read %i bytes\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, n); -#endif - - if (buffer[4] == '\x01') { + if (mysql_command == '\x03') + /// this is a query !!!! + fprintf(stderr, "<<< MySQL Query from Client %i bytes: [%s]\n", len, ptr_buff+5); + else + fprintf(stderr, "<<< Reading from Client %i bytes: [%s]\n", len, ptr_buff); + + /////////////////////////// + // Handling the COM_QUIT + ////////////////////////// + if (mysql_command == '\x01') { fprintf(stderr, "COM_QUIT received\n"); if (dcb->session->backends) { - write(dcb->session->backends->fd, buffer, n); + dcb->session->backends->func.write(dcb, queue); (dcb->session->backends->func).error(dcb->session->backends, -1); } (dcb->func).error(dcb, -1); return 1; + } + + protocol->state = MYSQL_ROUTING; + + /////////////////////////////////////// + // writing in the backend buffer queue + /////////////////////////////////////// + dcb->session->backends->func.write(dcb->session->backends, queue); + + protocol->state = MYSQL_WAITING_RESULT; + } -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "DCB [%i], EPOLLIN Protocol next state MYSQL_ROUTING [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, 1, dcb->fd, protocol->scramble); -#endif - protocol->state = MYSQL_ROUTING; - - write(dcb->session->backends->fd, buffer, n); -#ifdef GW_DEBUG_READ_EVENT - fprintf(stderr, "Client %i, has written to backend %i, btytes %i [%s]\n", dcb->fd, dcb->session->backends->fd, n, buffer); -#endif - protocol->state = MYSQL_WAITING_RESULT; - break; default: @@ -344,16 +345,11 @@ int gw_handle_write_event(DCB *dcb, int epfd) { return 1; } - fprintf(stderr, "DCB is ok, continue state [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state)); - if (dcb->state == DCB_STATE_DISCONNECTED) { return 1; } - fprintf(stderr, "DCB is connected, continue\n"); - if (dcb->protocol) { - fprintf(stderr, "DCB protocol is OK, continue\n"); protocol = DCB_PROTOCOL(dcb, MySQLProtocol); } else { fprintf(stderr, "DCB protocol is NULL, return\n"); @@ -361,26 +357,16 @@ int gw_handle_write_event(DCB *dcb, int epfd) { } if (dcb->session) { - fprintf(stderr, "DCB session is OK, continue\n"); } else { fprintf(stderr, "DCB session is NULL, return\n"); return 1; } if (dcb->session->backends) { - fprintf(stderr, "DCB backend is OK, continue\n"); } else { fprintf(stderr, "DCB backend is NULL, continue\n"); } - if (dcb->session->backends) { - fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd); - } - -//#ifdef GW_DEBUG_WRITE_EVENT - fprintf(stderr, "$$$$$ DCB [%i], EPOLLOUT Protocol state is [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, 1, dcb->fd, protocol->scramble); -//#endif - if(protocol->state == MYSQL_AUTH_RECV) { //write to client mysql AUTH_OK packet, packet n. is 2 @@ -413,7 +399,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { int len; /* - * Loop over the buffer chain in the pending writeq + * Loop over the buffer chain in the pendign writeq * Send as much of the data in that chain as possible and * leave any balance on the write queue. */ @@ -468,7 +454,7 @@ void MySQLListener(int epfd, char *config_bind) { struct epoll_event ev; // this gateway, as default, will bind on port 4404 for localhost only - (config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4404"); + (config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406"); listener = (DCB *) calloc(1, sizeof(DCB)); @@ -631,6 +617,7 @@ int MySQLAccept(DCB *listener, int efd) { backend->state = DCB_STATE_POLLING; backend->session = session; (backend->func).read = gw_read_backend_event; + (backend->func).write = MySQLWrite; (backend->func).write_ready = gw_write_backend_event; (backend->func).error = handle_event_errors_backend;