MXS-1203: Better handling of batch queries

When batched queries are done through readwritesplit, it will now handle
them one by one. This allows batched queries to be used with
readwritesplit but it does impose a performance penalty when compared to
direct execution on the backend.
This commit is contained in:
Markus Mäkelä
2017-03-31 14:00:20 +03:00
parent a8b42d24b7
commit 66cf571412
9 changed files with 266 additions and 108 deletions

View File

@ -430,6 +430,8 @@ bool mxs_mysql_is_ok_packet(GWBUF *buffer);
/** Check for result set */
bool mxs_mysql_is_result_set(GWBUF *buffer);
/** Get current command for a session */
mysql_server_cmd_t mxs_mysql_current_command(MXS_SESSION* session);
/**
* @brief Calculate how many packets a session command will receive
*

View File

@ -756,7 +756,7 @@ gw_read_and_write(DCB *dcb)
* If protocol has session command set, concatenate whole
* response into one buffer.
*/
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, false) != MYSQL_COM_UNDEFINED)
if (protocol_get_srv_command((MySQLProtocol *)dcb->protocol, true) != MYSQL_COM_UNDEFINED)
{
stmt = process_response_data(dcb, &read_buffer, gwbuf_length(read_buffer));
/**

View File

@ -1562,3 +1562,9 @@ bool mxs_mysql_is_result_set(GWBUF *buffer)
return rval;
}
mysql_server_cmd_t mxs_mysql_current_command(MXS_SESSION* session)
{
MySQLProtocol* proto = (MySQLProtocol*)session->client_dcb->protocol;
return proto->current_command;
}

View File

@ -1,4 +1,4 @@
add_library(readwritesplit SHARED readwritesplit.c rwsplit_mysql.c rwsplit_route_stmt.c rwsplit_select_backends.c rwsplit_session_cmd.c rwsplit_tmp_table_multi.c)
target_link_libraries(readwritesplit maxscale-common)
target_link_libraries(readwritesplit maxscale-common MySQLCommon)
set_target_properties(readwritesplit PROPERTIES VERSION "1.0.2")
install_module(readwritesplit core)

View File

@ -334,6 +334,8 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
client_rses->client_dcb = session->client_dcb;
client_rses->have_tmp_tables = false;
client_rses->forced_node = NULL;
client_rses->expected_responses = 0;
client_rses->query_queue = NULL;
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
int router_nservers = router->service->n_dbref;
@ -532,12 +534,44 @@ void close_failed_bref(backend_ref_t *bref, bool fatal)
{
sescmd_cursor_set_active(&bref->bref_sescmd_cur, false);
}
}
if (bref->bref_pending_cmd)
bool route_stored_query(ROUTER_CLIENT_SES *rses)
{
bool rval = true;
if (rses->query_queue)
{
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
query_queue = gwbuf_make_contiguous(query_queue);
/** Store the query queue locally for the duration of the routeQuery call.
* This prevents recursive calls into this function. */
GWBUF *temp_storage = rses->query_queue;
rses->query_queue = NULL;
if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue))
{
rval = false;
char* sql = modutil_get_SQL(query_queue);
if (sql)
{
MXS_ERROR("Routing query \"%s\" failed.", sql);
MXS_FREE(sql);
}
else
{
MXS_ERROR("Failed to route query.");
}
gwbuf_free(query_queue);
}
ss_dassert(rses->query_queue == NULL);
rses->query_queue = temp_storage;
}
return rval;
}
/**
@ -565,10 +599,26 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
{
closed_session_reply(querybuf);
}
else
{
if (rses->expected_responses || rses->query_queue)
{
/** We are already processing a request from the client. Store the
* new query and wait for the previous one to complete. */
rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
querybuf = NULL;
rval = 1;
if (rses->expected_responses == 0 && !route_stored_query(rses))
{
rval = 0;
}
}
else if (route_single_stmt(inst, rses, querybuf))
{
rval = 1;
}
}
if (querybuf != NULL)
{
@ -651,6 +701,57 @@ static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
}
}
/**
* @brief Check if we have received a complete reply from the backend
*
* @param bref Backend reference
* @param buffer Buffer containing the response
*
* @return True if the complete response has been received
*/
bool reply_is_complete(backend_ref_t* bref, GWBUF *buffer)
{
if (bref->reply_state == REPLY_STATE_START &&
!mxs_mysql_is_result_set(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
}
else
{
int more;
int n_eof = bref->reply_state == REPLY_STATE_RSET_ROWS ? 1 : 0;
n_eof += modutil_count_signal_packets(buffer, 0, n_eof, &more);
mysql_server_cmd_t cmd = mxs_mysql_current_command(bref->bref_dcb->session);
if (n_eof == 0)
{
/** Waiting for the EOF packet after the column definitions */
LOG_RS(bref, REPLY_STATE_RSET_COLDEF);
bref->reply_state = REPLY_STATE_RSET_COLDEF;
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(bref, REPLY_STATE_RSET_ROWS);
bref->reply_state = REPLY_STATE_RSET_ROWS;
}
else
{
/** We either have a complete result set or a response to
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
LOG_RS(bref, REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_DONE;
}
}
return bref->reply_state == REPLY_STATE_DONE;
}
/**
* @brief Client Reply routine (API)
*
@ -698,7 +799,15 @@ static void clientReply(MXS_ROUTER *instance,
/** Statement was successfully executed, free the stored statement */
session_clear_stmt(backend_dcb->session);
ss_dassert(bref->reply_state != REPLY_STATE_DONE);
if (reply_is_complete(bref, writebuf))
{
/** Got a complete reply, decrement expected response count */
router_cli_ses->expected_responses--;
ss_dassert(router_cli_ses->expected_responses >= 0);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
}
/**
* Active cursor means that reply is from session command
* execution.
@ -750,62 +859,38 @@ static void clientReply(MXS_ROUTER *instance,
bref_clear_state(bref, BREF_WAITING_RESULT);
}
bool queue_routed = false;
if (router_cli_ses->expected_responses == 0)
{
for (int i = 0; i < router_cli_ses->rses_nbackends; i++)
{
ss_dassert(router_cli_ses->rses_backend_ref[i].reply_state == REPLY_STATE_DONE);
}
queue_routed = router_cli_ses->query_queue != NULL;
route_stored_query(router_cli_ses);
}
else
{
ss_dassert(router_cli_ses->expected_responses > 0);
}
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
}
/** There is one pending session command to be executed. */
if (sescmd_cursor_is_active(scur))
else if (!queue_routed && sescmd_cursor_is_active(scur))
{
bool succp;
MXS_INFO("Backend [%s]:%d processed reply and starts to execute active cursor.",
bref->ref->server->name, bref->ref->server->port);
succp = execute_sescmd_in_backend(bref);
if (!succp)
if (execute_sescmd_in_backend(bref))
{
MXS_INFO("Backend [%s]:%d failed to execute session command.",
bref->ref->server->name, bref->ref->server->port);
router_cli_ses->expected_responses++;
}
}
else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */
{
int ret;
CHK_GWBUF(bref->bref_pending_cmd);
if ((ret = bref->bref_dcb->func.write(bref->bref_dcb,
gwbuf_clone(bref->bref_pending_cmd))) == 1)
{
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
atomic_add_uint64(&inst->stats.n_queries, 1);
/**
* Add one query response waiter to backend reference
*/
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
}
else
{
char* sql = modutil_get_SQL(bref->bref_pending_cmd);
if (sql)
{
MXS_ERROR("Routing query \"%s\" failed.", sql);
MXS_FREE(sql);
}
else
{
MXS_ERROR("Failed to route query.");
}
}
gwbuf_free(bref->bref_pending_cmd);
bref->bref_pending_cmd = NULL;
}
}
@ -820,7 +905,7 @@ static void clientReply(MXS_ROUTER *instance,
*/
static uint64_t getCapabilities(MXS_ROUTER* instance)
{
return RCAP_TYPE_NONE;
return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_STMT_OUTPUT;
}
/*
@ -1374,6 +1459,10 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
rses->expected_responses++;
success = true;
break;
}
@ -1391,6 +1480,10 @@ static bool reroute_stored_statement(ROUTER_CLIENT_SES *rses, backend_ref_t *old
if (bref->bref_dcb->func.write(bref->bref_dcb, stored))
{
MXS_INFO("Retrying failed read at '%s'.", bref->ref->server->unique_name);
LOG_RS(bref, REPLY_STATE_START);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
bref->reply_state = REPLY_STATE_START;
rses->expected_responses++;
success = true;
}
}
@ -1441,14 +1534,14 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
}
CHK_BACKEND_REF(bref);
/**
* If query was sent through the bref and it is waiting for reply from
* the backend server it is necessary to send an error to the client
* because it is waiting for reply.
*/
if (BREF_IS_WAITING_RESULT(bref))
{
GWBUF *stored;
/**
* A query was sent through the backend and it is waiting for a reply.
* Try to reroute the statement to a working server or send an error
* to the client.
*/
GWBUF *stored = NULL;
const SERVER *target;
if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
@ -1457,13 +1550,32 @@ static bool handle_error_new_connection(ROUTER_INSTANCE *inst,
{
/**
* We failed to route the stored statement or no statement was
* stored for this server. Either way we can safely free the buffer.
* stored for this server. Either way we can safely free the buffer
* and decrement the expected response count.
*/
gwbuf_free(stored);
myrses->expected_responses--;
if (!sescmd_cursor_is_active(&bref->bref_sescmd_cur))
{
/**
* The backend was executing a command that requires a reply.
* Send an error to the client to let it know the query has
* failed.
*/
DCB *client_dcb = ses->client_dcb;
client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
}
if (myrses->expected_responses == 0)
{
/**
* The response from this server was the last one, try to
* route any stored queries
*/
route_stored_query(myrses);
}
}
}
RW_CHK_DCB(bref, backend_dcb);
@ -1605,6 +1717,7 @@ static bool create_backends(ROUTER_CLIENT_SES *rses, backend_ref_t** dest, int*
#endif
backend_ref[i].bref_state = 0;
backend_ref[i].ref = sref;
backend_ref[i].reply_state = REPLY_STATE_DONE;
/** store pointers to sescmd list to both cursors */
backend_ref[i].bref_sescmd_cur.scmd_cur_rses = rses;
backend_ref[i].bref_sescmd_cur.scmd_cur_active = false;

View File

@ -234,6 +234,39 @@ typedef struct sescmd_cursor_st
#endif
} sescmd_cursor_t;
/** Enum for tracking client reply state */
typedef enum
{
REPLY_STATE_START, /**< Query sent to backend */
REPLY_STATE_DONE, /**< Complete reply received */
REPLY_STATE_RSET_COLDEF, /**< Resultset response, waiting for column definitions */
REPLY_STATE_RSET_ROWS /**< Resultset response, waiting for rows */
} reply_state_t;
/**
* Helper function to convert reply_state_t to string
*/
static inline const char* rstostr(reply_state_t state)
{
switch (state)
{
case REPLY_STATE_START:
return "REPLY_STATE_START";
case REPLY_STATE_DONE:
return "REPLY_STATE_DONE";
case REPLY_STATE_RSET_COLDEF:
return "REPLY_STATE_RSET_COLDEF";
case REPLY_STATE_RSET_ROWS:
return "REPLY_STATE_RSET_ROWS";
}
ss_dassert(false);
return "UNKNOWN";
}
/**
* Reference to BACKEND.
*
@ -249,9 +282,9 @@ typedef struct backend_ref_st
bref_state_t bref_state;
int bref_num_result_wait;
sescmd_cursor_t bref_sescmd_cur;
GWBUF* bref_pending_cmd; /**< For stmt which can't be routed due active sescmd execution */
unsigned char reply_cmd; /**< The reply the backend server sent to a session command.
* Used to detect slaves that fail to execute session command. */
reply_state_t reply_state; /**< Reply state of the current query */
#if defined(SS_DEBUG)
skygw_chk_t bref_chk_tail;
#endif
@ -322,6 +355,8 @@ struct router_client_session
DCB* client_dcb;
int pos_generator;
backend_ref_t *forced_node; /*< Current server where all queries should be sent */
int expected_responses; /**< Number of expected responses to the current query */
GWBUF* query_queue; /**< Queued commands waiting to be executed */
#if defined(PREP_STMT_CACHING)
HASHTABLE* rses_prep_stmt[2];
#endif
@ -359,6 +394,22 @@ typedef struct router_instance
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : BE_UNDEFINED));
/**
* @brief Route a stored query
*
* When multiple queries are executed in a pipeline fashion, the readwritesplit
* stores the extra queries in a queue. This queue is emptied after reading a
* reply from the backend server.
*
* @param rses Router client session
* @return True if a stored query was routed successfully
*/
bool route_stored_query(ROUTER_CLIENT_SES *rses);
/** Reply state change debug logging */
#define LOG_RS(a, b) MXS_DEBUG("[%s]:%d %s -> %s", (a)->ref->server->name, \
(a)->ref->server->port, rstostr((a)->reply_state), rstostr(b));
MXS_END_DECLS
#endif /*< _RWSPLITROUTER_H */

View File

@ -365,38 +365,29 @@ void check_session_command_reply(GWBUF *writebuf, sescmd_cursor_t *scur, backend
*/
bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
{
DCB *dcb;
bool succp;
int rc = 0;
sescmd_cursor_t *scur;
GWBUF *buf;
if (backend_ref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
ss_dassert(backend_ref);
CHK_BACKEND_REF(backend_ref);
bool succp = false;
if (BREF_IS_CLOSED(backend_ref))
{
succp = false;
goto return_succp;
return succp;
}
dcb = backend_ref->bref_dcb;
DCB *dcb = backend_ref->bref_dcb;
CHK_DCB(dcb);
CHK_BACKEND_REF(backend_ref);
/**
* Get cursor pointer and copy of command buffer to cursor.
*/
scur = &backend_ref->bref_sescmd_cur;
sescmd_cursor_t *scur = &backend_ref->bref_sescmd_cur;
/** Return if there are no pending ses commands */
if (sescmd_cursor_get_command(scur) == NULL)
{
succp = true;
MXS_INFO("Cursor had no pending session commands.");
goto return_succp;
return succp;
}
if (!sescmd_cursor_is_active(scur))
@ -405,6 +396,9 @@ bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
sescmd_cursor_set_active(scur, true);
}
int rc = 0;
GWBUF *buf;
switch (scur->scmd_cur_cmd->my_sescmd_packet_type)
{
case MYSQL_COM_CHANGE_USER:
@ -418,12 +412,14 @@ bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
{
/**
* Record database name and store to session.
*
* TODO: Do this in the client protocol module
*/
GWBUF *tmpbuf;
MYSQL_session *data;
MYSQL_session* data;
unsigned int qlen;
data = dcb->session->client_dcb->data;
data = (MYSQL_session*)dcb->session->client_dcb->data;
*data->db = 0;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PAYLOAD_LEN((unsigned char *) GWBUF_DATA(tmpbuf));
@ -458,12 +454,11 @@ bool execute_sescmd_in_backend(backend_ref_t *backend_ref)
if (rc == 1)
{
succp = true;
ss_dassert(backend_ref->reply_state == REPLY_STATE_DONE);
LOG_RS(backend_ref, REPLY_STATE_START);
backend_ref->reply_state = REPLY_STATE_START;
}
else
{
succp = false;
}
return_succp:
return succp;
}

View File

@ -383,6 +383,7 @@ bool route_session_write(ROUTER_CLIENT_SES *router_cli_ses,
{
if (execute_sescmd_in_backend(&backend_ref[i]))
{
router_cli_ses->expected_responses++;
nsucc += 1;
}
else
@ -1246,9 +1247,9 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
GWBUF *querybuf, DCB *target_dcb, bool store)
{
backend_ref_t *bref;
sescmd_cursor_t *scur;
bref = get_bref_from_dcb(rses, target_dcb);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
/**
* If the transaction is READ ONLY set forced_node to bref
@ -1262,23 +1263,14 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
target_dcb->server->unique_name);
}
scur = &bref->bref_sescmd_cur;
ss_dassert(target_dcb != NULL);
MXS_INFO("Route query to %s \t[%s]:%d <",
(SERVER_IS_MASTER(bref->ref->server) ? "master"
: "slave"), bref->ref->server->name, bref->ref->server->port);
/**
* Store current statement if execution of previous session command is still
* active. Since the master server's response is always used, we can safely
* write session commands to the master even if it is already executing.
*/
if (sescmd_cursor_is_active(scur) && bref != rses->rses_master_ref)
{
bref->bref_pending_cmd = gwbuf_append(bref->bref_pending_cmd, gwbuf_clone(querybuf));
return true;
}
(SERVER_IS_MASTER(bref->ref->server) ? "master" : "slave"),
bref->ref->server->name, bref->ref->server->port);
/** The session command cursor must not be active */
ss_dassert(!sescmd_cursor_is_active(&bref->bref_sescmd_cur));
if (target_dcb->func.write(target_dcb, gwbuf_clone(querybuf)) == 1)
{
@ -1297,6 +1289,10 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
bref_set_state(bref, BREF_QUERY_ACTIVE);
bref_set_state(bref, BREF_WAITING_RESULT);
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
LOG_RS(bref, REPLY_STATE_START);
bref->reply_state = REPLY_STATE_START;
rses->expected_responses++;
/**
* If a READ ONLYtransaction is ending set forced_node to NULL
*/

View File

@ -321,16 +321,11 @@ GWBUF *sescmd_cursor_clone_querybuf(sescmd_cursor_t *scur)
bool execute_sescmd_history(backend_ref_t *bref)
{
bool succp = true;
sescmd_cursor_t *scur;
if (bref == NULL)
{
MXS_ERROR("[%s] Error: NULL parameter.", __FUNCTION__);
return false;
}
ss_dassert(bref);
CHK_BACKEND_REF(bref);
bool succp = true;
scur = &bref->bref_sescmd_cur;
sescmd_cursor_t *scur = &bref->bref_sescmd_cur;
CHK_SESCMD_CUR(scur);
if (!sescmd_cursor_history_empty(scur))