diff --git a/.bzrignore b/.bzrignore new file mode 100644 index 000000000..f8634da7f --- /dev/null +++ b/.bzrignore @@ -0,0 +1 @@ +core/tags diff --git a/core/Makefile b/core/Makefile index 5cb944814..a35b13be9 100644 --- a/core/Makefile +++ b/core/Makefile @@ -18,6 +18,9 @@ CC=cc CFLAGS=-c -I/usr/include -I../include SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \ utils.c dcb.c +HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \ + ../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \ + ../include/session.h ../include/spinlock.h ../include/thread.h OBJ=$(SRCS:.c=.o) LIBS=-lssl @@ -29,3 +32,6 @@ gateway: $(OBJ) clean: rm -f $(OBJ) gateway + +tags: + ctags $(SRCS) $(HDRS) diff --git a/core/dcb.c b/core/dcb.c index 52560db17..d3647026c 100644 --- a/core/dcb.c +++ b/core/dcb.c @@ -119,6 +119,7 @@ printDCB(DCB *dcb) (void)printf("\tStatistics:\n"); (void)printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads); (void)printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes); + (void)printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered); (void)printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts); } diff --git a/core/gateway.c b/core/gateway.c index 83c6417d5..dfd5b0d16 100644 --- a/core/gateway.c +++ b/core/gateway.c @@ -22,6 +22,8 @@ * * Date Who Description * 23-05-2013 Massimiliano Pinto epoll loop test + * 12-06-2013 Mark Riddoch Add the -p option to set the + * listening port * */ @@ -164,6 +166,15 @@ int main(int argc, char **argv) { struct epoll_event ev; int nfds; int n; + char *port = NULL; + + for (n = 0; n < argc; n++) + { + if (strncmp(argv[n], "-p", 2) == 0) + { + port = &argv[n][2]; + } + } fprintf(stderr, "(C) SkySQL Ab 2013\n"); @@ -213,7 +224,7 @@ int main(int argc, char **argv) { 5. bind 6. epoll add event */ - MySQLListener(epollfd, NULL); + MySQLListener(epollfd, port); // event loop for all the descriptors added via epoll_ctl while (1) { diff --git a/core/gateway_mysql_protocol.c b/core/gateway_mysql_protocol.c index 37ae065c7..cdc22311f 100644 --- a/core/gateway_mysql_protocol.c +++ b/core/gateway_mysql_protocol.c @@ -25,6 +25,8 @@ * 1)send handshake in accept * 2) read data * 3) alway send OK + * 12-06-2013 Mark Riddoch Move mysql_send_ok and MySQLSendHandshake + * to use the new buffer management scheme * */ @@ -32,11 +34,25 @@ #include #include #include +#include #define MYSQL_CONN_DEBUG //#undef MYSQL_CONN_DEBUG -int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* mysql_message) { +/* + * mysql_send_ok + * + * Send a MySQL protocol OK message to the dcb + * + * @param dcb Descriptor Control Block for the connection to which the OK is sent + * @param packet_number + * @param in_affected_rows + * @param mysql_message + * @return + * + */ +int +mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) { int n = 0; uint8_t *outbuf = NULL; uint8_t mysql_payload_size = 0; @@ -47,17 +63,22 @@ int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* m uint8_t insert_id = 0; uint8_t mysql_server_status[2]; uint8_t mysql_warning_count[2]; + GWBUF *buf; affected_rows = in_affected_rows; - mysql_payload_size = sizeof(field_count) + sizeof(affected_rows) + sizeof(insert_id) + sizeof(mysql_server_status) + sizeof(mysql_warning_count); + mysql_payload_size = sizeof(field_count) + sizeof(affected_rows) + sizeof(insert_id) + sizeof(mysql_server_status) + sizeof(mysql_warning_count); if (mysql_message != NULL) { mysql_payload_size += strlen(mysql_message); } // allocate memory for packet header + payload - outbuf = (uint8_t *) calloc(1, sizeof(mysql_packet_header) + mysql_payload_size); + if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL) + { + return 0; + } + outbuf = GWBUF_DATA(buf); // write packet header with packet number gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); @@ -94,15 +115,21 @@ int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* m } // write data - n = write(fd, outbuf, sizeof(mysql_packet_header) + mysql_payload_size); + dcb->func.write(dcb, buf); - free(outbuf); - - return n; + return sizeof(mysql_packet_header) + mysql_payload_size; } -int MySQLSendHandshake(DCB* dcb) { +/* + * MySQLSendHandshake + * + * @param dcb The descriptor control block to use for sendign the handshake request + * @return + */ +int +MySQLSendHandshake(DCB* dcb) +{ int n = 0; uint8_t *outbuf = NULL; uint8_t mysql_payload_size = 0; @@ -123,6 +150,7 @@ int MySQLSendHandshake(DCB* dcb) { uint8_t mysql_last_byte = 0x00; char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1]=""; MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); + GWBUF *buf; gw_generate_random_str(server_scramble, GW_MYSQL_SCRAMBLE_SIZE); @@ -143,7 +171,11 @@ int MySQLSendHandshake(DCB* dcb) { mysql_payload_size = sizeof(mysql_protocol_version) + (strlen(GW_MYSQL_VERSION) + 1) + sizeof(mysql_thread_id) + 8 + sizeof(mysql_filler) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) + sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) + sizeof(mysql_filler_ten) + 12 + sizeof(mysql_last_byte) + strlen("mysql_native_password") + sizeof(mysql_last_byte); // allocate memory for packet header + payload - outbuf = (uint8_t *) calloc(1, sizeof(mysql_packet_header) + mysql_payload_size); + if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL) + { + return 0; + } + outbuf = GWBUF_DATA(buf); // write packet heder with mysql_payload_size gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size); @@ -230,16 +262,15 @@ int MySQLSendHandshake(DCB* dcb) { mysql_handshake_payload++; - - // write it to the socket - // this not covers the EAGAIN | EWOULDBLOCK - n = write(dcb->fd, outbuf, sizeof(mysql_packet_header) + mysql_payload_size); + // write data + dcb->func.write(dcb, buf); - free(outbuf); - - return n; + return sizeof(mysql_packet_header) + mysql_payload_size; } + + + int gw_mysql_do_authentication(DCB *dcb) { int packet_no; MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol); @@ -253,7 +284,7 @@ int gw_mysql_do_authentication(DCB *dcb) { fprintf(stderr, "DoAuth DCB [%i], EPOLLIN Protocol next state MYSQL_AUTH_RECV [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, packet_no, dcb->fd, protocol->scramble); //write to client mysql AUTH_OK packet, packet n. is 2 - mysql_send_ok(dcb->fd, 2, 0, NULL); + mysql_send_ok(dcb, 2, 0, NULL); protocol->state = MYSQL_IDLE; @@ -326,7 +357,7 @@ int gw_mysql_read_command(DCB *dcb) { // could be a mysql_ping() reply // writing the result set would come from async read from backends - mysql_send_ok(dcb->fd, packet_no, 0, NULL); + mysql_send_ok(dcb, packet_no, 0, NULL); return 0; } diff --git a/core/utils.c b/core/utils.c index 527c4b37a..75b8df46e 100644 --- a/core/utils.c +++ b/core/utils.c @@ -85,16 +85,16 @@ int gw_read_backend_event(DCB *dcb, int epfd) { return 0; } GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++); - if (n < 0) { + if (n < 0) + { // if eerno == EAGAIN || EWOULDBLOCK is missing - - // do the rigth task, not just break + // do the right task, not just break break; } head = gwbuf_append(head, buffer); // how many bytes left - b = b - n; + b -= n; } @@ -133,6 +133,7 @@ int w, saved_errno = 0; * not have a race condition on the event. */ dcb->writeq = gwbuf_append(dcb->writeq, queue); + dcb->stats.n_buffered++; } else { @@ -166,6 +167,10 @@ int w, saved_errno = 0; } /* Buffer the balance of any data */ dcb->writeq = queue; + if (queue) + { + dcb->stats.n_buffered++; + } } spinlock_release(&dcb->writeqlock); @@ -379,7 +384,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { if(protocol->state == MYSQL_AUTH_RECV) { //write to client mysql AUTH_OK packet, packet n. is 2 - mysql_send_ok(dcb->fd, 2, 0, NULL); + mysql_send_ok(dcb, 2, 0, NULL); protocol->state = MYSQL_IDLE; @@ -392,7 +397,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { if (protocol->state == MYSQL_AUTH_FAILED) { // still to implement - mysql_send_ok(dcb->fd, 2, 0, NULL); + mysql_send_ok(dcb, 2, 0, NULL); return 0; } @@ -408,7 +413,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) { int len; /* - * Loop over the buffer chain in the pendign writeq + * Loop over the buffer chain in the pending writeq * Send as much of the data in that chain as possible and * leave any balance on the write queue. */ diff --git a/include/dcb.h b/include/dcb.h index 18dcf19e9..5c7fc036e 100644 --- a/include/dcb.h +++ b/include/dcb.h @@ -63,6 +63,7 @@ typedef struct dcbstats { int n_reads; /* Number of reads on this descriptor */ int n_writes; /* Number of writes on this descriptor */ int n_accepts; /* Number of accepts on this descriptor */ + int n_buffered; /* Number of buffered writes */ } DCBSTATS; /* * Descriptor Control Block diff --git a/include/gateway_mysql.h b/include/gateway_mysql.h index 7aaeba34b..01c569bba 100644 --- a/include/gateway_mysql.h +++ b/include/gateway_mysql.h @@ -25,6 +25,7 @@ * 10/06/13 Massimiliano Pinto Initial implementation * */ +#include /* Protocol packing macros. */ #define gw_mysql_set_byte2(__buffer, __int) do { \ @@ -120,3 +121,6 @@ typedef enum #define SMALL_CHUNK 1024 #define MAX_CHUNK SMALL_CHUNK * 8 * 4 #define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10) + +extern int mysql_send_ok(DCB *, int, int, const char *); +extern int MySQLSendHandshake(DCB *);