@ -568,6 +568,9 @@ int gw_read_client_event(
|
|||||||
GWBUF *read_buffer = NULL;
|
GWBUF *read_buffer = NULL;
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
int nbytes_read = 0;
|
int nbytes_read = 0;
|
||||||
|
uint8_t cap = 0;
|
||||||
|
bool stmt_input; /*< router input type */
|
||||||
|
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
||||||
CHK_PROTOCOL(protocol);
|
CHK_PROTOCOL(protocol);
|
||||||
@ -583,54 +586,133 @@ int gw_read_client_event(
|
|||||||
{
|
{
|
||||||
goto return_rc;
|
goto return_rc;
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* if read queue existed appent read to it.
|
|
||||||
* if length of read buffer is less than 3 or less than mysql packet
|
|
||||||
* then return.
|
|
||||||
* else copy mysql packets to separate buffers from read buffer and
|
|
||||||
* continue.
|
|
||||||
* else
|
|
||||||
* if read queue didn't exist, length of read is less than 3 or less
|
|
||||||
* than mysql packet then
|
|
||||||
* create read queue and append to it and return.
|
|
||||||
* if length read is less than mysql packet length append to read queue
|
|
||||||
* append to it and return.
|
|
||||||
* else (complete packet was read) continue.
|
|
||||||
*/
|
|
||||||
if (dcb->dcb_readqueue)
|
|
||||||
{
|
|
||||||
uint8_t* data;
|
|
||||||
|
|
||||||
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
|
|
||||||
nbytes_read = gwbuf_length(dcb->dcb_readqueue);
|
|
||||||
data = (uint8_t *)GWBUF_DATA(dcb->dcb_readqueue);
|
|
||||||
|
|
||||||
if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data))
|
|
||||||
{
|
|
||||||
rc = 0;
|
|
||||||
goto return_rc;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* There is at least one complete mysql packet in
|
|
||||||
* read_buffer.
|
|
||||||
*/
|
|
||||||
read_buffer = dcb->dcb_readqueue;
|
|
||||||
dcb->dcb_readqueue = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer);
|
|
||||||
|
|
||||||
if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4)
|
session = dcb->session;
|
||||||
{
|
ss_dassert(session!= NULL);
|
||||||
|
|
||||||
|
if (protocol->protocol_auth_state == MYSQL_IDLE && session != NULL)
|
||||||
|
{
|
||||||
|
CHK_SESSION(session);
|
||||||
|
router = session->service->router;
|
||||||
|
router_instance = session->service->router_instance;
|
||||||
|
rsession = session->router_session;
|
||||||
|
ss_dassert(rsession != NULL);
|
||||||
|
|
||||||
|
if (router_instance != NULL && rsession != NULL) {
|
||||||
|
|
||||||
|
/** Ask what type of input the router expects */
|
||||||
|
cap = router->getCapabilities(router_instance, rsession);
|
||||||
|
|
||||||
|
if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT))
|
||||||
|
{
|
||||||
|
stmt_input = false;
|
||||||
|
}
|
||||||
|
else if (cap == RCAP_TYPE_STMT_INPUT)
|
||||||
|
{
|
||||||
|
stmt_input = true;
|
||||||
|
/** Mark buffer to as MySQL type */
|
||||||
|
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If router doesn't implement getCapabilities correctly we end
|
||||||
|
* up here.
|
||||||
|
*/
|
||||||
|
else
|
||||||
|
{
|
||||||
|
GWBUF* errbuf;
|
||||||
|
bool succp;
|
||||||
|
|
||||||
|
LOGIF(LD, (skygw_log_write_flush(
|
||||||
|
LOGFILE_DEBUG,
|
||||||
|
"%lu [gw_read_client_event] Reading router "
|
||||||
|
"capabilities failed.",
|
||||||
|
pthread_self())));
|
||||||
|
|
||||||
|
errbuf = mysql_create_custom_error(
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
"Read invalid router capabilities. Routing failed. "
|
||||||
|
"Session will be closed.");
|
||||||
|
|
||||||
|
router->handleError(
|
||||||
|
router_instance,
|
||||||
|
rsession,
|
||||||
|
errbuf,
|
||||||
|
dcb,
|
||||||
|
ERRACT_REPLY_CLIENT,
|
||||||
|
&succp);
|
||||||
|
gwbuf_free(errbuf);
|
||||||
|
/**
|
||||||
|
* If there are not enough backends close
|
||||||
|
* session
|
||||||
|
*/
|
||||||
|
if (!succp)
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Routing the query failed. "
|
||||||
|
"Session will be closed.")));
|
||||||
|
dcb_close(dcb);
|
||||||
|
}
|
||||||
|
rc = 1;
|
||||||
|
goto return_rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stmt_input) {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* if read queue existed appent read to it.
|
||||||
|
* if length of read buffer is less than 3 or less than mysql packet
|
||||||
|
* then return.
|
||||||
|
* else copy mysql packets to separate buffers from read buffer and
|
||||||
|
* continue.
|
||||||
|
* else
|
||||||
|
* if read queue didn't exist, length of read is less than 3 or less
|
||||||
|
* than mysql packet then
|
||||||
|
* create read queue and append to it and return.
|
||||||
|
* if length read is less than mysql packet length append to read queue
|
||||||
|
* append to it and return.
|
||||||
|
* else (complete packet was read) continue.
|
||||||
|
*/
|
||||||
|
if (dcb->dcb_readqueue)
|
||||||
|
{
|
||||||
|
uint8_t* data;
|
||||||
|
|
||||||
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
|
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
|
||||||
rc = 0;
|
nbytes_read = gwbuf_length(dcb->dcb_readqueue);
|
||||||
goto return_rc;
|
data = (uint8_t *)GWBUF_DATA(dcb->dcb_readqueue);
|
||||||
}
|
|
||||||
}
|
if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data))
|
||||||
|
{
|
||||||
|
rc = 0;
|
||||||
|
goto return_rc;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* There is at least one complete mysql packet in
|
||||||
|
* read_buffer.
|
||||||
|
*/
|
||||||
|
read_buffer = dcb->dcb_readqueue;
|
||||||
|
dcb->dcb_readqueue = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
uint8_t* data = (uint8_t *)GWBUF_DATA(read_buffer);
|
||||||
|
|
||||||
|
if (nbytes_read < 3 || nbytes_read < MYSQL_GET_PACKET_LEN(data)+4)
|
||||||
|
{
|
||||||
|
dcb->dcb_readqueue = gwbuf_append(dcb->dcb_readqueue, read_buffer);
|
||||||
|
rc = 0;
|
||||||
|
goto return_rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Now there should be at least one complete mysql packet in read_buffer.
|
* Now there should be at least one complete mysql packet in read_buffer.
|
||||||
@ -738,9 +820,7 @@ int gw_read_client_event(
|
|||||||
|
|
||||||
case MYSQL_IDLE:
|
case MYSQL_IDLE:
|
||||||
{
|
{
|
||||||
uint8_t cap = 0;
|
|
||||||
uint8_t* payload = NULL;
|
uint8_t* payload = NULL;
|
||||||
bool stmt_input; /*< router input type */
|
|
||||||
|
|
||||||
ss_dassert(nbytes_read >= 5);
|
ss_dassert(nbytes_read >= 5);
|
||||||
|
|
||||||
@ -750,74 +830,11 @@ int gw_read_client_event(
|
|||||||
if (session != NULL)
|
if (session != NULL)
|
||||||
{
|
{
|
||||||
CHK_SESSION(session);
|
CHK_SESSION(session);
|
||||||
router = session->service->router;
|
|
||||||
router_instance =
|
|
||||||
session->service->router_instance;
|
|
||||||
rsession = session->router_session;
|
|
||||||
ss_dassert(rsession != NULL);
|
|
||||||
}
|
}
|
||||||
/* Now, we are assuming in the first buffer there is
|
/* Now, we are assuming in the first buffer there is
|
||||||
* the information form mysql command */
|
* the information form mysql command */
|
||||||
payload = GWBUF_DATA(read_buffer);
|
payload = GWBUF_DATA(read_buffer);
|
||||||
|
|
||||||
/** Ask what type of input the router expects */
|
|
||||||
cap = router->getCapabilities(router_instance, rsession);
|
|
||||||
|
|
||||||
if (cap == 0 || (cap == RCAP_TYPE_PACKET_INPUT))
|
|
||||||
{
|
|
||||||
stmt_input = false;
|
|
||||||
}
|
|
||||||
else if (cap == RCAP_TYPE_STMT_INPUT)
|
|
||||||
{
|
|
||||||
stmt_input = true;
|
|
||||||
/** Mark buffer to as MySQL type */
|
|
||||||
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* If router doesn't implement getCapabilities correctly we end
|
|
||||||
* up here.
|
|
||||||
*/
|
|
||||||
else
|
|
||||||
{
|
|
||||||
GWBUF* errbuf;
|
|
||||||
bool succp;
|
|
||||||
|
|
||||||
LOGIF(LD, (skygw_log_write_flush(
|
|
||||||
LOGFILE_DEBUG,
|
|
||||||
"%lu [gw_read_client_event] Reading router "
|
|
||||||
"capabilities failed.",
|
|
||||||
pthread_self())));
|
|
||||||
|
|
||||||
errbuf = mysql_create_custom_error(
|
|
||||||
1,
|
|
||||||
0,
|
|
||||||
"Read invalid router capabilities. Routing failed. "
|
|
||||||
"Session will be closed.");
|
|
||||||
|
|
||||||
router->handleError(
|
|
||||||
router_instance,
|
|
||||||
rsession,
|
|
||||||
errbuf,
|
|
||||||
dcb,
|
|
||||||
ERRACT_REPLY_CLIENT,
|
|
||||||
&succp);
|
|
||||||
gwbuf_free(errbuf);
|
|
||||||
/**
|
|
||||||
* If there are not enough backends close
|
|
||||||
* session
|
|
||||||
*/
|
|
||||||
if (!succp)
|
|
||||||
{
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error : Routing the query failed. "
|
|
||||||
"Session will be closed.")));
|
|
||||||
dcb_close(dcb);
|
|
||||||
}
|
|
||||||
rc = 1;
|
|
||||||
goto return_rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Route COM_QUIT to backend */
|
/** Route COM_QUIT to backend */
|
||||||
if (MYSQL_IS_COM_QUIT(payload))
|
if (MYSQL_IS_COM_QUIT(payload))
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user