Router has now capability value which currently tells whether router session expects stream or individual, complete statements. With read con

nection router stream is used and with read/write split router individual statements are passed to router.
Added new function to ROUTER_OBJECT : uint8_t (*getCapabilities)(ROUTER *instance, void* router_session); which is implemented in every route
r.

Added support for multi-statement packets in rwsplit router. In other words, if network packet includes multiple mysql statements, they are separated and passed to router one by one.

Multi-packet statements (those which exceeds network packet boundaries) are _not_ supported yet.
This commit is contained in:
VilhoRaatikka
2014-03-11 23:12:11 +02:00
parent c28892323a
commit cb6a976555
13 changed files with 701 additions and 422 deletions

View File

@ -48,11 +48,16 @@ static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue);
static int gw_error_client_event(DCB *dcb);
static int gw_client_close(DCB *dcb);
static int gw_client_hangup_event(DCB *dcb);
static void* gw_MySQL_get_next_stmt(void* buffer);
int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message);
int MySQLSendHandshake(DCB* dcb);
static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue);
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* read_buf);
static GWBUF* gw_MySQL_get_next_stmt(GWBUF** buffer);
/*
* The "module object" for the mysqld client protocol module.
@ -68,8 +73,7 @@ static GWPROTOCOL MyObject = {
gw_client_close, /* Close */
gw_MySQLListener, /* Listen */
NULL, /* Authentication */
NULL, /* Session */
gw_MySQL_get_next_stmt /* get single stmt from read buf */
NULL /* Session */
};
/**
@ -609,9 +613,11 @@ int gw_read_client_event(DCB* dcb) {
*/
{
int len = -1;
uint8_t cap = 0;
GWBUF *read_buffer = NULL;
uint8_t *ptr_buff = NULL;
int mysql_command = -1;
bool stmt_input; /*< router input type */
session = dcb->session;
@ -640,7 +646,7 @@ int gw_read_client_event(DCB* dcb) {
/* get mysql commang at fifth byte */
if (ptr_buff) {
mysql_command = ptr_buff[4];
}
}
/**
* Without rsession there is no access to backend.
* COM_QUIT : close client dcb
@ -672,6 +678,32 @@ int gw_read_client_event(DCB* dcb) {
read_buffer = gwbuf_consume(read_buffer, len);
goto return_rc;
}
/** Ask what type of input the router expects */
cap = router->getCapabilities(router_instance, rsession);
if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT))
{
stmt_input = false;
}
else if (cap == RCAP_TYPE_STMT_INPUT)
{
stmt_input = true;
/** Mark buffer to as MySQL type */
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
}
else
{
mysql_send_custom_error(dcb,
1,
0,
"Reading router capabilities "
"failed. Router session is "
"closed.");
rc = 1;
goto return_rc;
}
/** Route COM_QUIT to backend */
if (mysql_command == '\x01') {
router->routeQuery(router_instance, rsession, read_buffer);
@ -690,10 +722,25 @@ int gw_read_client_event(DCB* dcb) {
}
else
{
/** Route other commands to backend */
rc = router->routeQuery(router_instance,
if (stmt_input)
{
/**
* Feed each statement completely and separately
* to router.
*/
rc = route_by_statement(router_instance,
router,
rsession,
read_buffer);
}
else
{
/** Feed whole packet to router */
rc = router->routeQuery(router_instance,
rsession,
read_buffer);
}
/** succeed */
if (rc == 1) {
rc = 0; /**< here '0' means success */
@ -1213,37 +1260,143 @@ return_rc:
* so that it only cover the remaining buffer.
*
*/
static void* gw_MySQL_get_next_stmt(
void* buffer)
static GWBUF* gw_MySQL_get_next_stmt(
GWBUF** p_readbuf)
{
GWBUF* readbuf = (GWBUF *)buffer;
GWBUF* stmtbuf;
unsigned char* packet;
size_t len;
size_t buflen;
size_t strlen;
uint8_t* packet;
CHK_GWBUF(readbuf);
if (GWBUF_EMPTY(readbuf))
if (*p_readbuf == NULL)
{
stmtbuf = NULL;
goto return_stmtbuf;
}
packet = GWBUF_DATA(readbuf);
len = packet[0];
len += 255*packet[1];
len += 255*255*packet[2];
}
CHK_GWBUF(*p_readbuf);
/** vraa :Multi-packet stmt is not supported as of 7.3.14 */
if (len+4 > GWBUF_LENGTH(readbuf))
if (GWBUF_EMPTY(*p_readbuf))
{
stmtbuf = NULL;
goto return_stmtbuf;
}
stmtbuf = gwbuf_clone_portion(readbuf, 0, 4+len);
gwbuf_consume(readbuf, 4+len);
buflen = GWBUF_LENGTH((*p_readbuf));
packet = GWBUF_DATA((*p_readbuf));
strlen = MYSQL_GET_PACKET_LEN(packet);
/** vraa :Multi-packet stmt is not supported as of 7.3.14 */
if (strlen-1 > buflen-5)
{
stmtbuf = NULL;
goto return_stmtbuf;
}
stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4);
*p_readbuf = gwbuf_consume(*p_readbuf, strlen+4);
return_stmtbuf:
return (void *)stmtbuf;
return stmtbuf;
}
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to pendingqueue. Send complete packets one by one
* to router.
*/
static int route_by_statement(
ROUTER* router_instance,
ROUTER_OBJECT* router,
void* rsession,
GWBUF* readbuf)
{
int rc = -1;
DCB* master_dcb;
GWBUF* stmtbuf;
uint8_t* payload;
static size_t len;
#if defined(SS_DEBUG)
uint8_t router_capabilities;
router_capabilities = router->getCapabilities(router_instance, rsession);
ss_dassert(router_capabilities == RCAP_TYPE_STMT_INPUT);
#endif
do
{
stmtbuf = gw_MySQL_get_next_stmt(&readbuf);
ss_dassert(stmtbuf != NULL);
CHK_GWBUF(stmtbuf);
payload = (uint8_t *)GWBUF_DATA(stmtbuf);
len += MYSQL_GET_PACKET_LEN(payload);
/**
* If message is longer than read data, suspend routing and
* add statement buffer to wait queue.
*/
rc = router->routeQuery(router_instance, rsession, stmtbuf);
len = 0; /*< if routed, reset the length indicator */
}
while (readbuf != NULL);
return rc;
}
/**
* Create a character array including the query string.
* GWBUF given as input includes either one complete or partial query.
* Length of buffer is at most the query length+4 (length of packet header).
*/
#if defined(NOT_USED)
static char* gw_get_or_create_querystr (
void* data,
bool* new_allocation)
{
GWBUF* buf = (GWBUF *)data;
size_t buflen; /*< first gw buffer data length */
size_t packetlen; /*< length of mysql packet */
size_t querylen; /*< total buffer length-<length of type indicator> */
size_t nbytes_copied;
char* startpos; /*< first byte of query in gw buffer */
char* str; /*< resulting query string */
CHK_GWBUF(buf);
packetlen = MYSQL_GET_PACKET_LEN((uint8_t *)GWBUF_DATA(buf));
str = (char *)malloc(packetlen); /*< leave space for terminating null */
if (str == NULL)
{
goto return_str;
}
*new_allocation = true;
/**
* First buffer includes 4 bytes header and a type indicator byte.
*/
buflen = GWBUF_LENGTH(buf);
querylen = packetlen-1;
ss_dassert(buflen<=querylen+5); /*< 5 == header+type indicator */
startpos = (char *)GWBUF_DATA(buf)+5;
nbytes_copied = MIN(querylen, buflen-5);
memcpy(str, startpos, nbytes_copied);
memset(&str[querylen-1], 0, 1);
buf = gwbuf_consume(buf, querylen-1);
/**
* In case of multi-packet statement whole buffer consists of query
* string.
*/
while (buf != NULL)
{
buflen = GWBUF_LENGTH(buf);
memcpy(str+nbytes_copied, GWBUF_DATA(buf), buflen);
nbytes_copied += buflen;
buf = gwbuf_consume(buf, buflen);
}
ss_dassert(str[querylen-1] == 0);
return_str:
return str;
}
#endif