Use service capabilities in response processing

The MySQLBackend protocol now only checks for complete packets if the
service requires statement based routing. This should remove unnecessary
processing when data is only streamed from the backend to the client.
This commit is contained in:
Markus Makela
2016-10-31 23:55:05 +02:00
parent abc0681248
commit fe56e65903

View File

@ -673,6 +673,42 @@ gw_reply_on_error(DCB *dcb, mxs_auth_state_t state)
gwbuf_free(errbuf); gwbuf_free(errbuf);
} }
/**
* @brief Check if a reply can be routed to the client
*
* @param Backend DCB
* @return True if session is ready for reply routing
*/
static inline bool session_ok_to_route(DCB *dcb)
{
bool rval = false;
if (dcb->session->state == SESSION_STATE_ROUTER_READY &&
dcb->session->client_dcb != NULL &&
dcb->session->client_dcb->state == DCB_STATE_POLLING &&
(dcb->session->router_session ||
service_get_capabilities(dcb->session->service) & RCAP_TYPE_NO_RSESSION))
{
MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol;
if (client_protocol)
{
CHK_PROTOCOL(client_protocol);
if (client_protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
rval = true;
}
}
else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL)
{
rval = true;
}
}
return rval;
}
/** /**
* @brief With authentication completed, read new data and write to backend * @brief With authentication completed, read new data and write to backend
* *
@ -686,7 +722,7 @@ gw_read_and_write(DCB *dcb)
GWBUF *read_buffer = NULL; GWBUF *read_buffer = NULL;
SESSION *session = dcb->session; SESSION *session = dcb->session;
int nbytes_read; int nbytes_read;
int return_code; int return_code = 0;
CHK_SESSION(session); CHK_SESSION(session);
@ -719,28 +755,24 @@ gw_read_and_write(DCB *dcb)
session->state = SESSION_STATE_STOPPING; session->state = SESSION_STATE_STOPPING;
spinlock_release(&session->ses_lock); spinlock_release(&session->ses_lock);
} }
return_code = 0; return 0;
goto return_rc;
} }
nbytes_read = gwbuf_length(read_buffer); nbytes_read = gwbuf_length(read_buffer);
if (nbytes_read == 0) if (nbytes_read == 0)
{ {
ss_dassert(read_buffer == NULL); ss_dassert(read_buffer == NULL);
goto return_rc; return return_code;
} }
else else
{ {
ss_dassert(read_buffer != NULL); ss_dassert(read_buffer != NULL);
} }
if (nbytes_read < 3) /** Ask what type of input the router/filter chain expects */
{ uint64_t capabilities = service_get_capabilities(session->service);
dcb->dcb_readqueue = read_buffer;
return_code = 0;
goto return_rc;
}
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT))
{ {
GWBUF *tmp = modutil_get_complete_packets(&read_buffer); GWBUF *tmp = modutil_get_complete_packets(&read_buffer);
/* Put any residue into the read queue */ /* Put any residue into the read queue */
@ -750,19 +782,29 @@ gw_read_and_write(DCB *dcb)
if (tmp == NULL) if (tmp == NULL)
{ {
/** No complete packets */ /** No complete packets */
return_code = 0; return 0;
goto return_rc;
} }
else
read_buffer = tmp;
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_INPUT))
{ {
read_buffer = tmp; if ((tmp = gwbuf_make_contiguous(read_buffer)))
{
read_buffer = tmp;
}
else
{
/** Failed to make the buffer contiguous */
gwbuf_free(read_buffer);
poll_fake_hangup_event(dcb);
return 0;
}
} }
} }
MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol; MySQLProtocol *proto = (MySQLProtocol *)dcb->protocol;
spinlock_acquire(&dcb->authlock);
if (proto->ignore_reply) if (proto->ignore_reply)
{ {
@ -773,8 +815,6 @@ gw_read_and_write(DCB *dcb)
proto->ignore_reply = false; proto->ignore_reply = false;
gwbuf_free(read_buffer); gwbuf_free(read_buffer);
spinlock_release(&dcb->authlock);
int rval = 0; int rval = 0;
if (result == MYSQL_REPLY_OK) if (result == MYSQL_REPLY_OK)
@ -792,14 +832,13 @@ gw_read_and_write(DCB *dcb)
return rval; return rval;
} }
spinlock_release(&dcb->authlock);
/** /**
* If protocol has session command set, concatenate whole * If protocol has session command set, concatenate whole
* response into one buffer. * response into one buffer.
*/ */
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED) if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
{ {
ss_dassert(rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT));
read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer)); read_buffer = process_response_data(dcb, read_buffer, gwbuf_length(read_buffer));
/** /**
* Received incomplete response to session command. * Received incomplete response to session command.
@ -807,64 +846,32 @@ gw_read_and_write(DCB *dcb)
*/ */
if (!sescmd_response_complete(dcb)) if (!sescmd_response_complete(dcb))
{ {
return_code = 0; return 0;
goto return_rc;
} }
if (!read_buffer) if (!read_buffer)
{ {
MXS_NOTICE("%lu [gw_read_backend_event] " MXS_ERROR("%lu [gw_read_backend_event] "
"Read buffer unexpectedly null, even though response " "Read buffer unexpectedly null, even though response "
"not marked as complete. User: %s", "not marked as complete. User: %s",
pthread_self(), dcb->session->client_dcb->user); pthread_self(), dcb->session->client_dcb->user);
return_code = 0; return 0;
goto return_rc;
} }
} }
/**
* Check that session is operable, and that client DCB is if (session_ok_to_route(dcb))
* still listening the socket for replies.
*/
if (dcb->session->state == SESSION_STATE_ROUTER_READY &&
dcb->session->client_dcb != NULL &&
dcb->session->client_dcb->state == DCB_STATE_POLLING &&
(session->router_session ||
service_get_capabilities(session->service) & RCAP_TYPE_NO_RSESSION))
{ {
MySQLProtocol *client_protocol = (MySQLProtocol *)dcb->session->client_dcb->protocol; gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
if (client_protocol != NULL) session->service->router->clientReply(session->service->router_instance,
{ session->router_session,
CHK_PROTOCOL(client_protocol); read_buffer, dcb);
return_code = 1;
if (client_protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
{
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(
session->service->router_instance,
session->router_session,
read_buffer,
dcb);
return_code = 1;
}
}
else if (dcb->session->client_dcb->dcb_role == DCB_ROLE_INTERNAL)
{
gwbuf_set_type(read_buffer, GWBUF_TYPE_MYSQL);
session->service->router->clientReply(
session->service->router_instance,
session->router_session,
read_buffer,
dcb);
return_code = 1;
}
} }
else /*< session is closing; replying to client isn't possible */ else /*< session is closing; replying to client isn't possible */
{ {
gwbuf_free(read_buffer); gwbuf_free(read_buffer);
} }
return_rc:
return return_code; return return_code;
} }
@ -943,7 +950,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
int rc = 0; int rc = 0;
CHK_DCB(dcb); CHK_DCB(dcb);
spinlock_acquire(&dcb->authlock);
if (dcb->was_persistent && dcb->state == DCB_STATE_POLLING) if (dcb->was_persistent && dcb->state == DCB_STATE_POLLING)
{ {
@ -963,8 +969,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
backend_protocol->ignore_reply = true; backend_protocol->ignore_reply = true;
backend_protocol->stored_query = queue; backend_protocol->stored_query = queue;
spinlock_release(&dcb->authlock);
GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol); GWBUF *buf = gw_create_change_user_packet(dcb->session->client_dcb->data, dcb->protocol);
return dcb_write(dcb, buf) ? 1 : 0; return dcb_write(dcb, buf) ? 1 : 0;
} }
@ -983,7 +987,6 @@ static int gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*/ */
backend_protocol->stored_query = gwbuf_append(backend_protocol->stored_query, queue); backend_protocol->stored_query = gwbuf_append(backend_protocol->stored_query, queue);
} }
spinlock_release(&dcb->authlock);
return 1; return 1;
} }