From 673631084afded858136237612bd00b09c0de237 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 3 Apr 2017 16:12:04 +0300 Subject: [PATCH] MXS-1203: Fix response tracking of LOAD DATA LOCAL INFILE When responses are being tracked, the execution of a LOAD DATA LOCAL INFILE requires special handling. The readwritesplit now has a simple state machine for the handling of the LOAD DATA LOCAL INFILE command. This should also make the code a bit more readable. --- .../routing/readwritesplit/readwritesplit.c | 17 +++++++--- .../routing/readwritesplit/readwritesplit.h | 11 ++++++- .../routing/readwritesplit/rwsplit_mysql.c | 2 +- .../readwritesplit/rwsplit_route_stmt.c | 31 ++++++++++++++----- 4 files changed, 46 insertions(+), 15 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index fa10970c5..a0f018c4d 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -336,6 +336,7 @@ static MXS_ROUTER_SESSION *newSession(MXS_ROUTER *router_inst, MXS_SESSION *sess client_rses->forced_node = NULL; client_rses->expected_responses = 0; client_rses->query_queue = NULL; + client_rses->load_data_state = LOAD_DATA_INACTIVE; memcpy(&client_rses->rses_config, &router->rwsplit_config, sizeof(client_rses->rses_config)); int router_nservers = router->service->n_dbref; @@ -601,8 +602,18 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, } else { - if (rses->expected_responses || rses->query_queue) + if ((rses->expected_responses == 0 && rses->query_queue == NULL) || + rses->load_data_state == LOAD_DATA_ACTIVE) { + /** No active or pending queries */ + if (route_single_stmt(inst, rses, querybuf)) + { + rval = 1; + } + } + else + { + ss_dassert(rses->expected_responses || rses->query_queue); /** We are already processing a request from the client. Store the * new query and wait for the previous one to complete. */ MXS_DEBUG("Storing query, expecting %d replies", rses->expected_responses); @@ -615,10 +626,6 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, rval = 0; } } - else if (route_single_stmt(inst, rses, querybuf)) - { - rval = 1; - } } if (querybuf != NULL) diff --git a/server/modules/routing/readwritesplit/readwritesplit.h b/server/modules/routing/readwritesplit/readwritesplit.h index bfe227952..5bfc3cc67 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.h +++ b/server/modules/routing/readwritesplit/readwritesplit.h @@ -334,6 +334,15 @@ typedef struct prep_stmt_st #endif /*< PREP_STMT_CACHING */ +/** States of a LOAD DATA LOCAL INFILE */ +enum ld_state +{ + LOAD_DATA_INACTIVE, /**< Not active */ + LOAD_DATA_START, /**< Current query starts a load */ + LOAD_DATA_ACTIVE, /**< Load is active */ + LOAD_DATA_END /**< Current query contains an empty packet that ends the load */ +}; + /** * The client session structure used within this router. */ @@ -349,7 +358,7 @@ struct router_client_session rwsplit_config_t rses_config; /*< copied config info from router instance */ int rses_nbackends; int rses_nsescmd; /*< Number of executed session commands */ - bool rses_load_active; /*< If LOAD DATA LOCAL INFILE is being currently executed */ + enum ld_state load_data_state; /*< Current load data state */ bool have_tmp_tables; uint64_t rses_load_data_sent; /*< How much data has been sent */ DCB* client_dcb; diff --git a/server/modules/routing/readwritesplit/rwsplit_mysql.c b/server/modules/routing/readwritesplit/rwsplit_mysql.c index 0a0e30863..7961cd720 100644 --- a/server/modules/routing/readwritesplit/rwsplit_mysql.c +++ b/server/modules/routing/readwritesplit/rwsplit_mysql.c @@ -160,7 +160,7 @@ is_packet_a_one_way_message(int packet_type) void log_transaction_status(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_type_t qtype) { - if (!rses->rses_load_active) + if (rses->load_data_state == LOAD_DATA_INACTIVE) { unsigned char command = MYSQL_GET_COMMAND(GWBUF_DATA(querybuf)); char *qtypestr = qc_typemask_to_string(qtype); diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.c b/server/modules/routing/readwritesplit/rwsplit_route_stmt.c index 91ba4f90c..ee67970ba 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.c +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.c @@ -130,12 +130,13 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } else { - route_target = TARGET_MASTER; /** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/ - rses->rses_load_active = false; + route_target = TARGET_MASTER; + rses->load_data_state = LOAD_DATA_END; MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.", rses->rses_load_data_sent + gwbuf_length(querybuf)); } + if (TARGET_IS_ALL(route_target)) { succp = handle_target_is_all(route_target, inst, rses, querybuf, packet_type, qtype); @@ -719,7 +720,7 @@ route_target_t get_route_target(ROUTER_CLIENT_SES *rses, qc_query_type_t qtype, HINT *hint) { bool trx_active = session_trx_is_active(rses->client_dcb->session); - bool load_active = rses->rses_load_active; + bool load_active = rses->load_data_state != LOAD_DATA_INACTIVE; mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in; route_target_t target = TARGET_UNDEFINED; @@ -988,7 +989,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, * Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries * to the master until the last, empty packet arrives. */ - if (rses->rses_load_active) + if (rses->load_data_state == LOAD_DATA_ACTIVE) { rses->rses_load_data_sent += gwbuf_length(querybuf); } @@ -997,7 +998,7 @@ handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, qc_query_op_t queryop = qc_get_operation(querybuf); if (queryop == QUERY_OP_LOAD) { - rses->rses_load_active = true; + rses->load_data_state = LOAD_DATA_START; rses->rses_load_data_sent = 0; } } @@ -1233,8 +1234,8 @@ bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, static inline bool query_creates_reply(mysql_server_cmd_t cmd) { return cmd != MYSQL_COM_QUIT && - cmd != MYSQL_COM_STMT_SEND_LONG_DATA && - cmd != MYSQL_COM_STMT_CLOSE; + cmd != MYSQL_COM_STMT_SEND_LONG_DATA && + cmd != MYSQL_COM_STMT_CLOSE; } /** @@ -1290,7 +1291,7 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, mysql_server_cmd_t cmd = mxs_mysql_current_command(rses->client_dcb->session); - if (query_creates_reply(cmd)) + if (rses->load_data_state != LOAD_DATA_ACTIVE && query_creates_reply(cmd)) { /** The server will reply to this command */ ss_dassert(bref->reply_state == REPLY_STATE_DONE); @@ -1302,6 +1303,20 @@ handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, LOG_RS(bref, REPLY_STATE_START); bref->reply_state = REPLY_STATE_START; rses->expected_responses++; + + if (rses->load_data_state == LOAD_DATA_START) + { + /** The first packet contains the actual query and the server + * will respond to it */ + rses->load_data_state = LOAD_DATA_ACTIVE; + } + else if (rses->load_data_state == LOAD_DATA_END) + { + /** The final packet in a LOAD DATA LOCAL INFILE is an empty packet + * to which the server responds with an OK or an ERR packet */ + ss_dassert(gwbuf_length(querybuf) == 4); + rses->load_data_state = LOAD_DATA_INACTIVE; + } } /**