New funcion for reading and store data in gwbuff, and its use in reading from client
This commit is contained in:
@ -22,11 +22,15 @@
|
|||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 03-06-2013 Massimiliano Pinto gateway utils
|
* 03-06-2013 Massimiliano Pinto gateway utils
|
||||||
|
* 12-06-2013 Massimiliano Pinto gw_read_gwbuff
|
||||||
|
* with error detection
|
||||||
|
* and its handling
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <gw.h>
|
#include <gw.h>
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
|
#include <session.h>
|
||||||
|
|
||||||
///
|
///
|
||||||
// set ip address in sockect struct
|
// set ip address in sockect struct
|
||||||
@ -190,4 +194,61 @@ int do_read_buffer(DCB *dcb, uint8_t *buffer) {
|
|||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
/////////////////////////////////////////////////
|
||||||
|
// Read data from dcb and store it in the gwbuf
|
||||||
|
/////////////////////////////////////////////////
|
||||||
|
int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b) {
|
||||||
|
GWBUF *buffer = NULL;
|
||||||
|
int n = -1;
|
||||||
|
|
||||||
|
if (b <= 0)
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
while (b > 0) {
|
||||||
|
int bufsize = b < MAX_BUFFER_SIZE ? b : MAX_BUFFER_SIZE;
|
||||||
|
if ((buffer = gwbuf_alloc(bufsize)) == NULL) {
|
||||||
|
/* Bad news, we have run out of memory */
|
||||||
|
/* Error handling */
|
||||||
|
if (dcb->session->backends) {
|
||||||
|
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||||
|
}
|
||||||
|
(dcb->func).error(dcb, -1);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
|
||||||
|
|
||||||
|
if (n < 0) {
|
||||||
|
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
|
||||||
|
fprintf(stderr, "Client connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno));
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));;
|
||||||
|
if (dcb->session->backends) {
|
||||||
|
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||||
|
}
|
||||||
|
(dcb->func).error(dcb, -1);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (n == 0) {
|
||||||
|
// socket closed
|
||||||
|
fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno));
|
||||||
|
if (dcb->session->backends) {
|
||||||
|
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||||
|
}
|
||||||
|
(dcb->func).error(dcb, -1);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// append read data to the gwbuf
|
||||||
|
*head = gwbuf_append(*head, buffer);
|
||||||
|
|
||||||
|
// how many bytes left
|
||||||
|
b -= n;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
201
core/utils.c
201
core/utils.c
@ -91,20 +91,18 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
|
|||||||
// do the right task, not just break
|
// do the right task, not just break
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
head = gwbuf_append(head, buffer);
|
head = gwbuf_append(head, buffer);
|
||||||
|
|
||||||
// how many bytes left
|
// how many bytes left
|
||||||
b -= n;
|
b -= n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// write the gwbuffer to client
|
||||||
dcb->session->client->func.write(dcb->session->client, head);
|
dcb->session->client->func.write(dcb->session->client, head);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
|
||||||
fprintf(stderr, "The backend says that Client Protocol state is %i\n", client_protocol->state);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -198,10 +196,16 @@ int gw_write_backend_event(DCB *dcb, int epfd) {
|
|||||||
//client read event triggered by EPOLLIN
|
//client read event triggered by EPOLLIN
|
||||||
//////////////////////////////////////////
|
//////////////////////////////////////////
|
||||||
int gw_route_read_event(DCB* dcb, int epfd) {
|
int gw_route_read_event(DCB* dcb, int epfd) {
|
||||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
MySQLProtocol *protocol = NULL;
|
||||||
uint8_t buffer[MAX_BUFFER_SIZE] = "";
|
uint8_t buffer[MAX_BUFFER_SIZE] = "";
|
||||||
int n;
|
int n = 0;
|
||||||
int b = -1;
|
int b = -1;
|
||||||
|
GWBUF *head = NULL;
|
||||||
|
GWBUF *gw_buffer = NULL;
|
||||||
|
|
||||||
|
if (dcb) {
|
||||||
|
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
if (ioctl(dcb->fd, FIONREAD, &b)) {
|
if (ioctl(dcb->fd, FIONREAD, &b)) {
|
||||||
@ -210,56 +214,45 @@ int gw_route_read_event(DCB* dcb, int epfd) {
|
|||||||
fprintf(stderr, "Client IOCTL FIONREAD bytes to read = %i\n", b);
|
fprintf(stderr, "Client IOCTL FIONREAD bytes to read = %i\n", b);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
|
||||||
fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for socket %i, scramble [%s]\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, protocol->scramble);
|
|
||||||
#endif
|
|
||||||
switch (protocol->state) {
|
switch (protocol->state) {
|
||||||
case MYSQL_AUTH_SENT:
|
case MYSQL_AUTH_SENT:
|
||||||
// read client auth
|
|
||||||
n = read(dcb->fd, buffer, MAX_BUFFER_SIZE);
|
|
||||||
|
|
||||||
fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for socket %i, bytes read %i\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, n);
|
|
||||||
|
|
||||||
if (n < 0) {
|
|
||||||
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
|
|
||||||
fprintf(stderr, "Client connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno));
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
|
|
||||||
fprintf(stderr, "Client connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));;
|
|
||||||
|
|
||||||
if (dcb->session->backends) {
|
|
||||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
|
||||||
}
|
|
||||||
(dcb->func).error(dcb, -1);
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n == 0) {
|
|
||||||
// EOF
|
|
||||||
fprintf(stderr, "Client connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno));
|
|
||||||
|
|
||||||
if (dcb->session->backends) {
|
|
||||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
|
||||||
}
|
|
||||||
(dcb->func).error(dcb, -1);
|
|
||||||
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// handle possible errors:
|
/*
|
||||||
// 0 connection close
|
* Read all the data that is available into a chain of buffers
|
||||||
// -1, error: not EAGAIN or EWOULDBLOCK
|
*/
|
||||||
|
{
|
||||||
|
int len = -1;
|
||||||
|
int ret = -1;
|
||||||
|
GWBUF *queue = NULL;
|
||||||
|
GWBUF *gw_buffer = NULL;
|
||||||
|
//////////////////////////////////////////////////////
|
||||||
|
// read and handle errors & close, or return if busyA
|
||||||
|
// note: if b == 0 error handling is not triggered, just return
|
||||||
|
// without closing
|
||||||
|
//////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
if ((ret = gw_read_gwbuff(dcb, &gw_buffer, b)) != 0)
|
||||||
|
return ret;
|
||||||
|
|
||||||
|
// example with consume, assuming one buffer only ...
|
||||||
|
queue = gw_buffer;
|
||||||
|
len = GWBUF_LENGTH(queue);
|
||||||
|
|
||||||
|
fprintf(stderr, "<<< Reading from Client %i bytes: [%s]\n", len, GWBUF_DATA(queue));
|
||||||
|
|
||||||
|
// gw_do_local_authentication(dcb, GWBUF_DATA(queue));
|
||||||
|
|
||||||
|
// Data printed on stderr or handled withot the dcb->func.write
|
||||||
|
// so consume it now
|
||||||
|
queue = gwbuf_consume(queue, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToDo
|
||||||
|
// now we have the data in the buffer, do the authentication
|
||||||
|
// but don't consume the gwbuf as in the above example !!!
|
||||||
|
|
||||||
protocol->state = MYSQL_AUTH_RECV;
|
protocol->state = MYSQL_AUTH_RECV;
|
||||||
|
|
||||||
|
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
|
||||||
fprintf(stderr, "DCB [%i], EPOLLIN Protocol next state MYSQL_AUTH_RECV [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, dcb->proto_state, 1, dcb->fd, protocol->scramble);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// check authentication
|
// check authentication
|
||||||
// if OK return mysql_ok
|
// if OK return mysql_ok
|
||||||
// else return error
|
// else return error
|
||||||
@ -269,57 +262,65 @@ int gw_route_read_event(DCB* dcb, int epfd) {
|
|||||||
|
|
||||||
case MYSQL_IDLE:
|
case MYSQL_IDLE:
|
||||||
case MYSQL_WAITING_RESULT:
|
case MYSQL_WAITING_RESULT:
|
||||||
n = read(dcb->fd, buffer, MAX_BUFFER_SIZE);
|
/*
|
||||||
if (n < 0) {
|
* Read all the data that is available into a chain of buffers
|
||||||
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
|
*/
|
||||||
fprintf(stderr, "WAITING RESULT connection %i: continue for %i, %s\n", dcb->fd, errno, strerror(errno));
|
{
|
||||||
break;
|
int len;
|
||||||
} else {
|
GWBUF *queue = NULL;
|
||||||
|
GWBUF *gw_buffer = NULL;
|
||||||
fprintf(stderr, "connection %i error: %i, %s\n", dcb->fd, errno, strerror(errno));;
|
uint8_t *ptr_buff = NULL;
|
||||||
|
int mysql_command = -1;
|
||||||
|
int ret = -1;
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////
|
||||||
|
// read and handle errors & close, or return if busy
|
||||||
|
//////////////////////////////////////////////////////
|
||||||
|
if ((ret = gw_read_gwbuff(dcb, &gw_buffer, b)) != 0)
|
||||||
|
return ret;
|
||||||
|
|
||||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
// Now assuming in the first buffer there is the information form mysql command
|
||||||
(dcb->func).error(dcb, -1);
|
|
||||||
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (n == 0) {
|
// following code is only for debug now
|
||||||
fprintf(stderr, "connection %i closed: %i, %s\n", dcb->fd, errno, strerror(errno));
|
queue = gw_buffer;
|
||||||
if (dcb->session->backends) {
|
len = GWBUF_LENGTH(queue);
|
||||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
|
||||||
}
|
ptr_buff = GWBUF_DATA(queue);
|
||||||
(dcb->func).error(dcb, -1);
|
|
||||||
|
|
||||||
return 1;
|
// get mysql commang
|
||||||
}
|
if (ptr_buff)
|
||||||
|
mysql_command = ptr_buff[4];
|
||||||
|
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
if (mysql_command == '\x03')
|
||||||
fprintf(stderr, "Client DCB [%s], EPOLLIN Protocol state [%i] for fd %i has read %i bytes\n", gw_dcb_state2string(dcb->state), protocol->state, dcb->fd, n);
|
/// this is a query !!!!
|
||||||
#endif
|
fprintf(stderr, "<<< MySQL Query from Client %i bytes: [%s]\n", len, ptr_buff+5);
|
||||||
|
else
|
||||||
if (buffer[4] == '\x01') {
|
fprintf(stderr, "<<< Reading from Client %i bytes: [%s]\n", len, ptr_buff);
|
||||||
|
|
||||||
|
///////////////////////////
|
||||||
|
// Handling the COM_QUIT
|
||||||
|
//////////////////////////
|
||||||
|
if (mysql_command == '\x01') {
|
||||||
fprintf(stderr, "COM_QUIT received\n");
|
fprintf(stderr, "COM_QUIT received\n");
|
||||||
if (dcb->session->backends) {
|
if (dcb->session->backends) {
|
||||||
write(dcb->session->backends->fd, buffer, n);
|
dcb->session->backends->func.write(dcb, queue);
|
||||||
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
(dcb->session->backends->func).error(dcb->session->backends, -1);
|
||||||
}
|
}
|
||||||
(dcb->func).error(dcb, -1);
|
(dcb->func).error(dcb, -1);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
protocol->state = MYSQL_ROUTING;
|
||||||
|
|
||||||
|
///////////////////////////////////////
|
||||||
|
// writing in the backend buffer queue
|
||||||
|
///////////////////////////////////////
|
||||||
|
dcb->session->backends->func.write(dcb->session->backends, queue);
|
||||||
|
|
||||||
|
protocol->state = MYSQL_WAITING_RESULT;
|
||||||
|
|
||||||
}
|
}
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
|
||||||
fprintf(stderr, "DCB [%i], EPOLLIN Protocol next state MYSQL_ROUTING [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, 1, dcb->fd, protocol->scramble);
|
|
||||||
#endif
|
|
||||||
protocol->state = MYSQL_ROUTING;
|
|
||||||
|
|
||||||
write(dcb->session->backends->fd, buffer, n);
|
|
||||||
#ifdef GW_DEBUG_READ_EVENT
|
|
||||||
fprintf(stderr, "Client %i, has written to backend %i, btytes %i [%s]\n", dcb->fd, dcb->session->backends->fd, n, buffer);
|
|
||||||
#endif
|
|
||||||
protocol->state = MYSQL_WAITING_RESULT;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -344,16 +345,11 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(stderr, "DCB is ok, continue state [%i] is [%s]\n", dcb->state, gw_dcb_state2string(dcb->state));
|
|
||||||
|
|
||||||
if (dcb->state == DCB_STATE_DISCONNECTED) {
|
if (dcb->state == DCB_STATE_DISCONNECTED) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(stderr, "DCB is connected, continue\n");
|
|
||||||
|
|
||||||
if (dcb->protocol) {
|
if (dcb->protocol) {
|
||||||
fprintf(stderr, "DCB protocol is OK, continue\n");
|
|
||||||
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "DCB protocol is NULL, return\n");
|
fprintf(stderr, "DCB protocol is NULL, return\n");
|
||||||
@ -361,26 +357,16 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (dcb->session) {
|
if (dcb->session) {
|
||||||
fprintf(stderr, "DCB session is OK, continue\n");
|
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "DCB session is NULL, return\n");
|
fprintf(stderr, "DCB session is NULL, return\n");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dcb->session->backends) {
|
if (dcb->session->backends) {
|
||||||
fprintf(stderr, "DCB backend is OK, continue\n");
|
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "DCB backend is NULL, continue\n");
|
fprintf(stderr, "DCB backend is NULL, continue\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dcb->session->backends) {
|
|
||||||
fprintf(stderr, "CLIENT WRITE READY State [%i], FIRST bytes left to write %i from back %i to client %i\n", dcb->state, gwbuf_length(dcb->writeq), dcb->session->backends->fd, dcb->fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
//#ifdef GW_DEBUG_WRITE_EVENT
|
|
||||||
fprintf(stderr, "$$$$$ DCB [%i], EPOLLOUT Protocol state is [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, 1, dcb->fd, protocol->scramble);
|
|
||||||
//#endif
|
|
||||||
|
|
||||||
if(protocol->state == MYSQL_AUTH_RECV) {
|
if(protocol->state == MYSQL_AUTH_RECV) {
|
||||||
|
|
||||||
//write to client mysql AUTH_OK packet, packet n. is 2
|
//write to client mysql AUTH_OK packet, packet n. is 2
|
||||||
@ -413,7 +399,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
int len;
|
int len;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Loop over the buffer chain in the pending writeq
|
* Loop over the buffer chain in the pendign writeq
|
||||||
* Send as much of the data in that chain as possible and
|
* Send as much of the data in that chain as possible and
|
||||||
* leave any balance on the write queue.
|
* leave any balance on the write queue.
|
||||||
*/
|
*/
|
||||||
@ -468,7 +454,7 @@ void MySQLListener(int epfd, char *config_bind) {
|
|||||||
struct epoll_event ev;
|
struct epoll_event ev;
|
||||||
|
|
||||||
// this gateway, as default, will bind on port 4404 for localhost only
|
// this gateway, as default, will bind on port 4404 for localhost only
|
||||||
(config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4404");
|
(config_bind != NULL) ? (bind_address_and_port = config_bind) : (bind_address_and_port = "127.0.0.1:4406");
|
||||||
|
|
||||||
listener = (DCB *) calloc(1, sizeof(DCB));
|
listener = (DCB *) calloc(1, sizeof(DCB));
|
||||||
|
|
||||||
@ -631,6 +617,7 @@ int MySQLAccept(DCB *listener, int efd) {
|
|||||||
backend->state = DCB_STATE_POLLING;
|
backend->state = DCB_STATE_POLLING;
|
||||||
backend->session = session;
|
backend->session = session;
|
||||||
(backend->func).read = gw_read_backend_event;
|
(backend->func).read = gw_read_backend_event;
|
||||||
|
(backend->func).write = MySQLWrite;
|
||||||
(backend->func).write_ready = gw_write_backend_event;
|
(backend->func).write_ready = gw_write_backend_event;
|
||||||
(backend->func).error = handle_event_errors_backend;
|
(backend->func).error = handle_event_errors_backend;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user