MXS-1203: Fix response tracking of LOAD DATA LOCAL INFILE
When responses are being tracked, the execution of a LOAD DATA LOCAL INFILE requires special handling. The readwritesplit now has a simple state machine for the handling of the LOAD DATA LOCAL INFILE command. This should also make the code a bit more readable.
This commit is contained in:
parent
ea38d511e3
commit
673631084a
@ -336,6 +336,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess
|
||||
client_rses->forced_node = NULL;
|
||||
client_rses->expected_responses = 0;
|
||||
client_rses->query_queue = NULL;
|
||||
client_rses->load_data_state = LOAD_DATA_INACTIVE;
|
||||
memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config));
|
||||
|
||||
int router_nservers = router->service->n_dbref;
|
||||
@ -601,8 +602,18 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
}
|
||||
else
|
||||
{
|
||||
if (rses->expected_responses || rses->query_queue)
|
||||
if ((rses->expected_responses == 0 && rses->query_queue == NULL) ||
|
||||
rses->load_data_state == LOAD_DATA_ACTIVE)
|
||||
{
|
||||
/** No active or pending queries */
|
||||
if (route_single_stmt(inst, rses, querybuf))
|
||||
{
|
||||
rval = 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ss_dassert(rses->expected_responses || rses->query_queue);
|
||||
/** We are already processing a request from the client. Store the
|
||||
* new query and wait for the previous one to complete. */
|
||||
MXS_DEBUG("Storing query, expecting %d replies", rses->expected_responses);
|
||||
@ -615,10 +626,6 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
rval = 0;
|
||||
}
|
||||
}
|
||||
else if (route_single_stmt(inst, rses, querybuf))
|
||||
{
|
||||
rval = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (querybuf != NULL)
|
||||
|
@ -334,6 +334,15 @@ typedef struct prep_stmt_st
|
||||
|
||||
#endif /*< PREP_STMT_CACHING */
|
||||
|
||||
/** States of a LOAD DATA LOCAL INFILE */
|
||||
enum ld_state
|
||||
{
|
||||
LOAD_DATA_INACTIVE, /**< Not active */
|
||||
LOAD_DATA_START, /**< Current query starts a load */
|
||||
LOAD_DATA_ACTIVE, /**< Load is active */
|
||||
LOAD_DATA_END /**< Current query contains an empty packet that ends the load */
|
||||
};
|
||||
|
||||
/**
|
||||
* The client session structure used within this router.
|
||||
*/
|
||||
@ -349,7 +358,7 @@ struct router_client_session
|
||||
rwsplit_config_t rses_config; /*< copied config info from router instance */
|
||||
int rses_nbackends;
|
||||
int rses_nsescmd; /*< Number of executed session commands */
|
||||
bool rses_load_active; /*< If LOAD DATA LOCAL INFILE is being currently executed */
|
||||
enum ld_state load_data_state; /*< Current load data state */
|
||||
bool have_tmp_tables;
|
||||
uint64_t rses_load_data_sent; /*< How much data has been sent */
|
||||
DCB* client_dcb;
|
||||
|
@ -160,7 +160,7 @@ is_packet_a_one_way_message(int packet_type)
|
||||
void
|
||||
log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype)
|
||||
{
|
||||
if (!rses->rses_load_active)
|
||||
if (rses->load_data_state == LOAD_DATA_INACTIVE)
|
||||
{
|
||||
unsigned char command = MYSQL_GET_COMMAND(GWBUF_DATA(querybuf));
|
||||
char *qtypestr = qc_typemask_to_string(qtype);
|
||||
|
@ -130,12 +130,13 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
}
|
||||
else
|
||||
{
|
||||
route_target = TARGET_MASTER;
|
||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||
rses->rses_load_active = false;
|
||||
route_target = TARGET_MASTER;
|
||||
rses->load_data_state = LOAD_DATA_END;
|
||||
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
||||
rses->rses_load_data_sent + gwbuf_length(querybuf));
|
||||
}
|
||||
|
||||
if (TARGET_IS_ALL(route_target))
|
||||
{
|
||||
succp = handle_target_is_all(route_target, inst, rses, querybuf, packet_type, qtype);
|
||||
@ -719,7 +720,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses,
|
||||
qc_query_type_t qtype, HINT *hint)
|
||||
{
|
||||
bool trx_active = session_trx_is_active(rses->client_dcb->session);
|
||||
bool load_active = rses->rses_load_active;
|
||||
bool load_active = rses->load_data_state != LOAD_DATA_INACTIVE;
|
||||
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
|
||||
route_target_t target = TARGET_UNDEFINED;
|
||||
|
||||
@ -988,7 +989,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
||||
* to the master until the last, empty packet arrives.
|
||||
*/
|
||||
if (rses->rses_load_active)
|
||||
if (rses->load_data_state == LOAD_DATA_ACTIVE)
|
||||
{
|
||||
rses->rses_load_data_sent += gwbuf_length(querybuf);
|
||||
}
|
||||
@ -997,7 +998,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf,
|
||||
qc_query_op_t queryop = qc_get_operation(querybuf);
|
||||
if (queryop == QUERY_OP_LOAD)
|
||||
{
|
||||
rses->rses_load_active = true;
|
||||
rses->load_data_state = LOAD_DATA_START;
|
||||
rses->rses_load_data_sent = 0;
|
||||
}
|
||||
}
|
||||
@ -1233,8 +1234,8 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
static inline bool query_creates_reply(mysql_server_cmd_t cmd)
|
||||
{
|
||||
return cmd != MYSQL_COM_QUIT &&
|
||||
cmd != MYSQL_COM_STMT_SEND_LONG_DATA &&
|
||||
cmd != MYSQL_COM_STMT_CLOSE;
|
||||
cmd != MYSQL_COM_STMT_SEND_LONG_DATA &&
|
||||
cmd != MYSQL_COM_STMT_CLOSE;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1290,7 +1291,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
|
||||
mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session);
|
||||
|
||||
if (query_creates_reply(cmd))
|
||||
if (rses->load_data_state != LOAD_DATA_ACTIVE && query_creates_reply(cmd))
|
||||
{
|
||||
/** The server will reply to this command */
|
||||
ss_dassert(bref->reply_state == REPLY_STATE_DONE);
|
||||
@ -1302,6 +1303,20 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses,
|
||||
LOG_RS(bref, REPLY_STATE_START);
|
||||
bref->reply_state = REPLY_STATE_START;
|
||||
rses->expected_responses++;
|
||||
|
||||
if (rses->load_data_state == LOAD_DATA_START)
|
||||
{
|
||||
/** The first packet contains the actual query and the server
|
||||
* will respond to it */
|
||||
rses->load_data_state = LOAD_DATA_ACTIVE;
|
||||
}
|
||||
else if (rses->load_data_state == LOAD_DATA_END)
|
||||
{
|
||||
/** The final packet in a LOAD DATA LOCAL INFILE is an empty packet
|
||||
* to which the server responds with an OK or an ERR packet */
|
||||
ss_dassert(gwbuf_length(querybuf) == 4);
|
||||
rses->load_data_state = LOAD_DATA_INACTIVE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
Loading…
x
Reference in New Issue
Block a user