Merge branch 'SESvars' of https://github.com/skysql/MaxScale into SESvars

Conflicts:
	server/core/dcb.c
	server/core/poll.c
	server/modules/include/mysql_client_server_protocol.h
	server/modules/routing/readwritesplit/readwritesplit.c
This commit is contained in:
VilhoRaatikka
2014-03-18 10:28:06 +02:00
19 changed files with 1667 additions and 403 deletions

View File

@ -57,6 +57,11 @@ static int gw_client_hangup_event(DCB *dcb);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* read_buf);
/*
* The "module object" for the mysqld client protocol module.
@ -73,7 +78,7 @@ static GWPROTOCOL MyObject = {
gw_MySQLListener, /* Listen */
NULL, /* Authentication */
NULL /* Session */
};
};
/**
* Implementation of the mandatory version entry point
@ -621,10 +626,11 @@ int gw_read_client_event(DCB* dcb) {
*/
{
int len = -1;
GWBUF *queue = NULL;
GWBUF *gw_buffer = NULL;
uint8_t cap = 0;
GWBUF *read_buffer = NULL;
uint8_t *ptr_buff = NULL;
int mysql_command = -1;
bool stmt_input; /*< router input type */
session = dcb->session;
@ -640,21 +646,20 @@ int gw_read_client_event(DCB* dcb) {
//////////////////////////////////////////////////////
// read and handle errors & close, or return if busy
//////////////////////////////////////////////////////
rc = gw_read_gwbuff(dcb, &gw_buffer, b);
rc = gw_read_gwbuff(dcb, &read_buffer, b);
if (rc != 0) {
goto return_rc;
}
/* Now, we are assuming in the first buffer there is
* the information form mysql command */
queue = gw_buffer;
len = GWBUF_LENGTH(queue);
ptr_buff = GWBUF_DATA(queue);
len = GWBUF_LENGTH(read_buffer);
ptr_buff = GWBUF_DATA(read_buffer);
/* get mysql commang at fifth byte */
if (ptr_buff) {
mysql_command = ptr_buff[4];
}
}
/**
* Without rsession there is no access to backend.
* COM_QUIT : close client dcb
@ -683,12 +688,38 @@ int gw_read_client_event(DCB* dcb) {
}
rc = 1;
/** Free buffer */
queue = gwbuf_consume(queue, len);
read_buffer = gwbuf_consume(read_buffer, len);
goto return_rc;
}
/** Ask what type of input the router expects */
cap = router->getCapabilities(router_instance, rsession);
if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT))
{
stmt_input = false;
}
else if (cap == RCAP_TYPE_STMT_INPUT)
{
stmt_input = true;
/** Mark buffer to as MySQL type */
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
}
else
{
mysql_send_custom_error(dcb,
1,
0,
"Reading router capabilities "
"failed. Router session is "
"closed.");
rc = 1;
goto return_rc;
}
/** Route COM_QUIT to backend */
if (mysql_command == '\x01') {
router->routeQuery(router_instance, rsession, queue);
router->routeQuery(router_instance, rsession, read_buffer);
LOGIF(LD, (skygw_log_write_flush(
LOGFILE_DEBUG,
"%lu [gw_read_client_event] Routed COM_QUIT to "
@ -704,10 +735,25 @@ int gw_read_client_event(DCB* dcb) {
}
else
{
/** Route other commands to backend */
rc = router->routeQuery(router_instance,
if (stmt_input)
{
/**
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(router_instance,
router,
rsession,
read_buffer);
}
else
{
/** Feed whole packet to router */
rc = router->routeQuery(router_instance,
rsession,
queue);
read_buffer);
}
/** succeed */
if (rc == 1) {
rc = 0; /**< here '0' means success */
@ -1193,7 +1239,6 @@ static int gw_error_client_event(DCB *dcb) {
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
dcb_close(dcb);
@ -1251,7 +1296,8 @@ gw_client_hangup_event(DCB *dcb)
void* router_instance;
void* rsession;
int rc = 1;
#if defined(SS_DEBUG)
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
@ -1260,8 +1306,6 @@ gw_client_hangup_event(DCB *dcb)
CHK_PROTOCOL(protocol);
}
#endif
CHK_DCB(dcb);
if (dcb->state != DCB_STATE_POLLING) {
@ -1278,7 +1322,6 @@ gw_client_hangup_event(DCB *dcb)
router = session->service->router;
router_instance = session->service->router_instance;
rsession = session->router_session;
router->closeSession(router_instance, rsession);
}
@ -1286,3 +1329,99 @@ gw_client_hangup_event(DCB *dcb)
return_rc:
return rc;
}
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to pendingqueue. Send complete packets one by one
* to router.
*/
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* readbuf)
{
int rc = -1;
DCB* master_dcb;
GWBUF* stmtbuf;
uint8_t* payload;
static size_t len;
do
{
stmtbuf = gw_MySQL_get_next_stmt(&readbuf);
ss_dassert(stmtbuf != NULL);
CHK_GWBUF(stmtbuf);
payload = (uint8_t *)GWBUF_DATA(stmtbuf);
/**
* If message is longer than read data, suspend routing and
* add statement buffer to wait queue.
*/
rc = router->routeQuery(router_instance, rsession, stmtbuf);
}
while (readbuf != NULL);
return rc;
}
/**
* Create a character array including the query string.
* GWBUF given as input includes either one complete or partial query.
* Length of buffer is at most the query length+4 (length of packet header).
*/
#if defined(NOT_USED)
static char* gw_get_or_create_querystr (
void* data,
bool* new_allocation)
{
GWBUF* buf = (GWBUF *)data;
size_t buflen; /*< first gw buffer data length */
size_t packetlen; /*< length of mysql packet */
size_t querylen; /*< total buffer length-<length of type indicator> */
size_t nbytes_copied;
char* startpos; /*< first byte of query in gw buffer */
char* str; /*< resulting query string */
CHK_GWBUF(buf);
packetlen = MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(buf));
str = (char *)malloc(packetlen); /*< leave space for terminating null */
if (str == NULL)
{
goto return_str;
}
*new_allocation = true;
/**
* First buffer includes 4 bytes header and a type indicator byte.
*/
buflen = GWBUF_LENGTH(buf);
querylen = packetlen-1;
ss_dassert(buflen<=querylen+5); /*< 5 == header+type indicator */
startpos = (char *)GWBUF_DATA(buf)+5;
nbytes_copied = MIN(querylen, buflen-5);
memcpy(str, startpos, nbytes_copied);
memset(&str[querylen-1], 0, 1);
buf = gwbuf_consume(buf, querylen-1);
/**
* In case of multi-packet statement whole buffer consists of query
* string.
*/
while (buf != NULL)
{
buflen = GWBUF_LENGTH(buf);
memcpy(str+nbytes_copied, GWBUF_DATA(buf), buflen);
nbytes_copied += buflen;
buf = gwbuf_consume(buf, buflen);
}
ss_dassert(str[querylen-1] == 0);
return_str:
return str;
}
#endif