Converted mysql_send_ok and MySQLSendHandshake to use the new buffer management
Also added the -p option to set listener port. This is a short term measure for testing only until we have the proper configuration in place.
This commit is contained in:
1
.bzrignore
Normal file
1
.bzrignore
Normal file
@ -0,0 +1 @@
|
|||||||
|
core/tags
|
@ -18,6 +18,9 @@ CC=cc
|
|||||||
CFLAGS=-c -I/usr/include -I../include
|
CFLAGS=-c -I/usr/include -I../include
|
||||||
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
|
SRCS= atomic.c buffer.c spinlock.c gateway.c gateway_mysql_protocol.c gw_utils.c \
|
||||||
utils.c dcb.c
|
utils.c dcb.c
|
||||||
|
HDRS= ../include/atomic.h ../include/buffer.h ../include/dcb.h \
|
||||||
|
../include/gateway_mysql.h ../include/gw.h ../include/mysql_protocol.h \
|
||||||
|
../include/session.h ../include/spinlock.h ../include/thread.h
|
||||||
OBJ=$(SRCS:.c=.o)
|
OBJ=$(SRCS:.c=.o)
|
||||||
LIBS=-lssl
|
LIBS=-lssl
|
||||||
|
|
||||||
@ -29,3 +32,6 @@ gateway: $(OBJ)
|
|||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f $(OBJ) gateway
|
rm -f $(OBJ) gateway
|
||||||
|
|
||||||
|
tags:
|
||||||
|
ctags $(SRCS) $(HDRS)
|
||||||
|
@ -119,6 +119,7 @@ printDCB(DCB *dcb)
|
|||||||
(void)printf("\tStatistics:\n");
|
(void)printf("\tStatistics:\n");
|
||||||
(void)printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
|
(void)printf("\t\tNo. of Reads: %d\n", dcb->stats.n_reads);
|
||||||
(void)printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
|
(void)printf("\t\tNo. of Writes: %d\n", dcb->stats.n_writes);
|
||||||
|
(void)printf("\t\tNo. of Buffered Writes: %d\n", dcb->stats.n_buffered);
|
||||||
(void)printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
(void)printf("\t\tNo. of Accepts: %d\n", dcb->stats.n_accepts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
*
|
*
|
||||||
* Date Who Description
|
* Date Who Description
|
||||||
* 23-05-2013 Massimiliano Pinto epoll loop test
|
* 23-05-2013 Massimiliano Pinto epoll loop test
|
||||||
|
* 12-06-2013 Mark Riddoch Add the -p option to set the
|
||||||
|
* listening port
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -164,6 +166,15 @@ int main(int argc, char **argv) {
|
|||||||
struct epoll_event ev;
|
struct epoll_event ev;
|
||||||
int nfds;
|
int nfds;
|
||||||
int n;
|
int n;
|
||||||
|
char *port = NULL;
|
||||||
|
|
||||||
|
for (n = 0; n < argc; n++)
|
||||||
|
{
|
||||||
|
if (strncmp(argv[n], "-p", 2) == 0)
|
||||||
|
{
|
||||||
|
port = &argv[n][2];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fprintf(stderr, "(C) SkySQL Ab 2013\n");
|
fprintf(stderr, "(C) SkySQL Ab 2013\n");
|
||||||
|
|
||||||
@ -213,7 +224,7 @@ int main(int argc, char **argv) {
|
|||||||
5. bind
|
5. bind
|
||||||
6. epoll add event
|
6. epoll add event
|
||||||
*/
|
*/
|
||||||
MySQLListener(epollfd, NULL);
|
MySQLListener(epollfd, port);
|
||||||
|
|
||||||
// event loop for all the descriptors added via epoll_ctl
|
// event loop for all the descriptors added via epoll_ctl
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -25,6 +25,8 @@
|
|||||||
* 1)send handshake in accept
|
* 1)send handshake in accept
|
||||||
* 2) read data
|
* 2) read data
|
||||||
* 3) alway send OK
|
* 3) alway send OK
|
||||||
|
* 12-06-2013 Mark Riddoch Move mysql_send_ok and MySQLSendHandshake
|
||||||
|
* to use the new buffer management scheme
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -32,11 +34,25 @@
|
|||||||
#include <gw.h>
|
#include <gw.h>
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
#include <session.h>
|
#include <session.h>
|
||||||
|
#include <buffer.h>
|
||||||
|
|
||||||
#define MYSQL_CONN_DEBUG
|
#define MYSQL_CONN_DEBUG
|
||||||
//#undef MYSQL_CONN_DEBUG
|
//#undef MYSQL_CONN_DEBUG
|
||||||
|
|
||||||
int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* mysql_message) {
|
/*
|
||||||
|
* mysql_send_ok
|
||||||
|
*
|
||||||
|
* Send a MySQL protocol OK message to the dcb
|
||||||
|
*
|
||||||
|
* @param dcb Descriptor Control Block for the connection to which the OK is sent
|
||||||
|
* @param packet_number
|
||||||
|
* @param in_affected_rows
|
||||||
|
* @param mysql_message
|
||||||
|
* @return
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message) {
|
||||||
int n = 0;
|
int n = 0;
|
||||||
uint8_t *outbuf = NULL;
|
uint8_t *outbuf = NULL;
|
||||||
uint8_t mysql_payload_size = 0;
|
uint8_t mysql_payload_size = 0;
|
||||||
@ -47,17 +63,22 @@ int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* m
|
|||||||
uint8_t insert_id = 0;
|
uint8_t insert_id = 0;
|
||||||
uint8_t mysql_server_status[2];
|
uint8_t mysql_server_status[2];
|
||||||
uint8_t mysql_warning_count[2];
|
uint8_t mysql_warning_count[2];
|
||||||
|
GWBUF *buf;
|
||||||
|
|
||||||
affected_rows = in_affected_rows;
|
affected_rows = in_affected_rows;
|
||||||
|
|
||||||
mysql_payload_size = sizeof(field_count) + sizeof(affected_rows) + sizeof(insert_id) + sizeof(mysql_server_status) + sizeof(mysql_warning_count);
|
mysql_payload_size = sizeof(field_count) + sizeof(affected_rows) + sizeof(insert_id) + sizeof(mysql_server_status) + sizeof(mysql_warning_count);
|
||||||
|
|
||||||
if (mysql_message != NULL) {
|
if (mysql_message != NULL) {
|
||||||
mysql_payload_size += strlen(mysql_message);
|
mysql_payload_size += strlen(mysql_message);
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocate memory for packet header + payload
|
// allocate memory for packet header + payload
|
||||||
outbuf = (uint8_t *) calloc(1, sizeof(mysql_packet_header) + mysql_payload_size);
|
if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
outbuf = GWBUF_DATA(buf);
|
||||||
|
|
||||||
// write packet header with packet number
|
// write packet header with packet number
|
||||||
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
||||||
@ -94,15 +115,21 @@ int mysql_send_ok(int fd, int packet_number, int in_affected_rows, const char* m
|
|||||||
}
|
}
|
||||||
|
|
||||||
// write data
|
// write data
|
||||||
n = write(fd, outbuf, sizeof(mysql_packet_header) + mysql_payload_size);
|
dcb->func.write(dcb, buf);
|
||||||
|
|
||||||
free(outbuf);
|
return sizeof(mysql_packet_header) + mysql_payload_size;
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int MySQLSendHandshake(DCB* dcb) {
|
/*
|
||||||
|
* MySQLSendHandshake
|
||||||
|
*
|
||||||
|
* @param dcb The descriptor control block to use for sendign the handshake request
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
MySQLSendHandshake(DCB* dcb)
|
||||||
|
{
|
||||||
int n = 0;
|
int n = 0;
|
||||||
uint8_t *outbuf = NULL;
|
uint8_t *outbuf = NULL;
|
||||||
uint8_t mysql_payload_size = 0;
|
uint8_t mysql_payload_size = 0;
|
||||||
@ -123,6 +150,7 @@ int MySQLSendHandshake(DCB* dcb) {
|
|||||||
uint8_t mysql_last_byte = 0x00;
|
uint8_t mysql_last_byte = 0x00;
|
||||||
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1]="";
|
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1]="";
|
||||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||||
|
GWBUF *buf;
|
||||||
|
|
||||||
gw_generate_random_str(server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
|
gw_generate_random_str(server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
|
||||||
|
|
||||||
@ -143,7 +171,11 @@ int MySQLSendHandshake(DCB* dcb) {
|
|||||||
mysql_payload_size = sizeof(mysql_protocol_version) + (strlen(GW_MYSQL_VERSION) + 1) + sizeof(mysql_thread_id) + 8 + sizeof(mysql_filler) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) + sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) + sizeof(mysql_filler_ten) + 12 + sizeof(mysql_last_byte) + strlen("mysql_native_password") + sizeof(mysql_last_byte);
|
mysql_payload_size = sizeof(mysql_protocol_version) + (strlen(GW_MYSQL_VERSION) + 1) + sizeof(mysql_thread_id) + 8 + sizeof(mysql_filler) + sizeof(mysql_server_capabilities_one) + sizeof(mysql_server_language) + sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len) + sizeof(mysql_filler_ten) + 12 + sizeof(mysql_last_byte) + strlen("mysql_native_password") + sizeof(mysql_last_byte);
|
||||||
|
|
||||||
// allocate memory for packet header + payload
|
// allocate memory for packet header + payload
|
||||||
outbuf = (uint8_t *) calloc(1, sizeof(mysql_packet_header) + mysql_payload_size);
|
if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
outbuf = GWBUF_DATA(buf);
|
||||||
|
|
||||||
// write packet heder with mysql_payload_size
|
// write packet heder with mysql_payload_size
|
||||||
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
||||||
@ -230,16 +262,15 @@ int MySQLSendHandshake(DCB* dcb) {
|
|||||||
|
|
||||||
mysql_handshake_payload++;
|
mysql_handshake_payload++;
|
||||||
|
|
||||||
|
// write data
|
||||||
// write it to the socket
|
dcb->func.write(dcb, buf);
|
||||||
// this not covers the EAGAIN | EWOULDBLOCK
|
|
||||||
n = write(dcb->fd, outbuf, sizeof(mysql_packet_header) + mysql_payload_size);
|
|
||||||
|
|
||||||
free(outbuf);
|
return sizeof(mysql_packet_header) + mysql_payload_size;
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int gw_mysql_do_authentication(DCB *dcb) {
|
int gw_mysql_do_authentication(DCB *dcb) {
|
||||||
int packet_no;
|
int packet_no;
|
||||||
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
MySQLProtocol *protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||||
@ -253,7 +284,7 @@ int gw_mysql_do_authentication(DCB *dcb) {
|
|||||||
fprintf(stderr, "DoAuth DCB [%i], EPOLLIN Protocol next state MYSQL_AUTH_RECV [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, packet_no, dcb->fd, protocol->scramble);
|
fprintf(stderr, "DoAuth DCB [%i], EPOLLIN Protocol next state MYSQL_AUTH_RECV [%i], Packet #%i for socket %i, scramble [%s]\n", dcb->state, protocol->state, packet_no, dcb->fd, protocol->scramble);
|
||||||
|
|
||||||
//write to client mysql AUTH_OK packet, packet n. is 2
|
//write to client mysql AUTH_OK packet, packet n. is 2
|
||||||
mysql_send_ok(dcb->fd, 2, 0, NULL);
|
mysql_send_ok(dcb, 2, 0, NULL);
|
||||||
|
|
||||||
protocol->state = MYSQL_IDLE;
|
protocol->state = MYSQL_IDLE;
|
||||||
|
|
||||||
@ -326,7 +357,7 @@ int gw_mysql_read_command(DCB *dcb) {
|
|||||||
// could be a mysql_ping() reply
|
// could be a mysql_ping() reply
|
||||||
// writing the result set would come from async read from backends
|
// writing the result set would come from async read from backends
|
||||||
|
|
||||||
mysql_send_ok(dcb->fd, packet_no, 0, NULL);
|
mysql_send_ok(dcb, packet_no, 0, NULL);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
19
core/utils.c
19
core/utils.c
@ -85,16 +85,16 @@ int gw_read_backend_event(DCB *dcb, int epfd) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
|
GW_NOINTR_CALL(n = read(dcb->fd, GWBUF_DATA(buffer), bufsize); dcb->stats.n_reads++);
|
||||||
if (n < 0) {
|
if (n < 0)
|
||||||
|
{
|
||||||
// if eerno == EAGAIN || EWOULDBLOCK is missing
|
// if eerno == EAGAIN || EWOULDBLOCK is missing
|
||||||
|
// do the right task, not just break
|
||||||
// do the rigth task, not just break
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
head = gwbuf_append(head, buffer);
|
head = gwbuf_append(head, buffer);
|
||||||
|
|
||||||
// how many bytes left
|
// how many bytes left
|
||||||
b = b - n;
|
b -= n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -133,6 +133,7 @@ int w, saved_errno = 0;
|
|||||||
* not have a race condition on the event.
|
* not have a race condition on the event.
|
||||||
*/
|
*/
|
||||||
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
dcb->writeq = gwbuf_append(dcb->writeq, queue);
|
||||||
|
dcb->stats.n_buffered++;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -166,6 +167,10 @@ int w, saved_errno = 0;
|
|||||||
}
|
}
|
||||||
/* Buffer the balance of any data */
|
/* Buffer the balance of any data */
|
||||||
dcb->writeq = queue;
|
dcb->writeq = queue;
|
||||||
|
if (queue)
|
||||||
|
{
|
||||||
|
dcb->stats.n_buffered++;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
spinlock_release(&dcb->writeqlock);
|
spinlock_release(&dcb->writeqlock);
|
||||||
|
|
||||||
@ -379,7 +384,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
if(protocol->state == MYSQL_AUTH_RECV) {
|
if(protocol->state == MYSQL_AUTH_RECV) {
|
||||||
|
|
||||||
//write to client mysql AUTH_OK packet, packet n. is 2
|
//write to client mysql AUTH_OK packet, packet n. is 2
|
||||||
mysql_send_ok(dcb->fd, 2, 0, NULL);
|
mysql_send_ok(dcb, 2, 0, NULL);
|
||||||
|
|
||||||
protocol->state = MYSQL_IDLE;
|
protocol->state = MYSQL_IDLE;
|
||||||
|
|
||||||
@ -392,7 +397,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
|
|
||||||
if (protocol->state == MYSQL_AUTH_FAILED) {
|
if (protocol->state == MYSQL_AUTH_FAILED) {
|
||||||
// still to implement
|
// still to implement
|
||||||
mysql_send_ok(dcb->fd, 2, 0, NULL);
|
mysql_send_ok(dcb, 2, 0, NULL);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -408,7 +413,7 @@ int gw_handle_write_event(DCB *dcb, int epfd) {
|
|||||||
int len;
|
int len;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Loop over the buffer chain in the pendign writeq
|
* Loop over the buffer chain in the pending writeq
|
||||||
* Send as much of the data in that chain as possible and
|
* Send as much of the data in that chain as possible and
|
||||||
* leave any balance on the write queue.
|
* leave any balance on the write queue.
|
||||||
*/
|
*/
|
||||||
|
@ -63,6 +63,7 @@ typedef struct dcbstats {
|
|||||||
int n_reads; /* Number of reads on this descriptor */
|
int n_reads; /* Number of reads on this descriptor */
|
||||||
int n_writes; /* Number of writes on this descriptor */
|
int n_writes; /* Number of writes on this descriptor */
|
||||||
int n_accepts; /* Number of accepts on this descriptor */
|
int n_accepts; /* Number of accepts on this descriptor */
|
||||||
|
int n_buffered; /* Number of buffered writes */
|
||||||
} DCBSTATS;
|
} DCBSTATS;
|
||||||
/*
|
/*
|
||||||
* Descriptor Control Block
|
* Descriptor Control Block
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
* 10/06/13 Massimiliano Pinto Initial implementation
|
* 10/06/13 Massimiliano Pinto Initial implementation
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
#include <dcb.h>
|
||||||
|
|
||||||
/* Protocol packing macros. */
|
/* Protocol packing macros. */
|
||||||
#define gw_mysql_set_byte2(__buffer, __int) do { \
|
#define gw_mysql_set_byte2(__buffer, __int) do { \
|
||||||
@ -120,3 +121,6 @@ typedef enum
|
|||||||
#define SMALL_CHUNK 1024
|
#define SMALL_CHUNK 1024
|
||||||
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
|
#define MAX_CHUNK SMALL_CHUNK * 8 * 4
|
||||||
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
|
#define ToHex(Y) (Y>='0'&&Y<='9'?Y-'0':Y-'A'+10)
|
||||||
|
|
||||||
|
extern int mysql_send_ok(DCB *, int, int, const char *);
|
||||||
|
extern int MySQLSendHandshake(DCB *);
|
||||||
|
Reference in New Issue
Block a user