From 296c1001a2994eaa68950e96e2b3bb20bc2833aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 23 Jun 2017 12:47:33 +0300 Subject: [PATCH] MXS-852: Track COM_STMT_EXECUTE by statement ID The COM_STMT_EXECUTE targets are now tracked per statement ID. This should theoretically allow parallel execution of COM_STMT_EXECUTE commands that use cursors but the current implementation of the reply state processing does not yet allow it. --- .../routing/readwritesplit/readwritesplit.cc | 6 ++-- .../routing/readwritesplit/readwritesplit.hh | 5 +++- .../readwritesplit/rwsplit_internal.hh | 3 +- .../readwritesplit/rwsplit_route_stmt.cc | 30 +++++++++++++------ 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 20d6acf57..3b0bce5b2 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -933,9 +933,9 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, ss_dassert(rses->expected_responses || rses->query_queue); /** We are already processing a request from the client. Store the * new query and wait for the previous one to complete. */ - MXS_DEBUG("Storing query (len: %d cmd: %0x), expecting %d replies", - gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], - rses->expected_responses); + MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command", + gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], + rses->expected_responses); rses->query_queue = gwbuf_append(rses->query_queue, querybuf); querybuf = NULL; rval = 1; diff --git a/server/modules/routing/readwritesplit/readwritesplit.hh b/server/modules/routing/readwritesplit/readwritesplit.hh index 3be98e899..e5c05bc3d 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.hh +++ b/server/modules/routing/readwritesplit/readwritesplit.hh @@ -301,6 +301,9 @@ typedef std::list SRWBackendList; typedef std::tr1::unordered_set TableSet; typedef std::map ResponseMap; +/** Map of COM_STMT_EXECUTE targets by internal ID */ +typedef std::tr1::unordered_map ExecMap; + /** * The client session structure used within this router. */ @@ -311,7 +314,6 @@ struct ROUTER_CLIENT_SES SRWBackendList backends; /**< List of backend servers */ SRWBackend current_master; /**< Current master server */ SRWBackend target_node; /**< The currently locked target node */ - SRWBackend last_exec_target; /**< Node where the latest COM_STMT_EXECUTE was sent */ rwsplit_config_t rses_config; /**< copied config info from router instance */ int rses_nbackends; enum ld_state load_data_state; /**< Current load data state */ @@ -330,6 +332,7 @@ struct ROUTER_CLIENT_SES uint64_t recv_sescmd; /**< ID of the most recently completed session command */ PSManager ps_manager; /**< Prepared statement manager*/ ClientHandleMap ps_handles; /**< Client PS handle to internal ID mapping */ + ExecMap exec_map; /**< Map of COM_STMT_EXECUTE statement IDs to Backends */ skygw_chk_t rses_chk_tail; }; diff --git a/server/modules/routing/readwritesplit/rwsplit_internal.hh b/server/modules/routing/readwritesplit/rwsplit_internal.hh index d5ff4e77f..cc18c0bcc 100644 --- a/server/modules/routing/readwritesplit/rwsplit_internal.hh +++ b/server/modules/routing/readwritesplit/rwsplit_internal.hh @@ -70,7 +70,8 @@ void handle_multi_temp_and_load(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, uint8_t packet_type, uint32_t *qtype); SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, route_target_t route_target); -SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, uint8_t cmd); +SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, + uint8_t cmd, uint32_t id); bool handle_master_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, SRWBackend* dest); bool handle_got_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 0efd020d8..770e63683 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -129,6 +129,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, route_target_t route_target; bool succp = false; bool non_empty_packet; + uint32_t stmt_id = 0; ss_dassert(querybuf->next == NULL); // The buffer must be contiguous. @@ -170,7 +171,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } else if (is_ps_command(command)) { - uint32_t stmt_id = get_stmt_id(rses, querybuf); + stmt_id = get_stmt_id(rses, querybuf); qtype = rses->ps_manager.get_type(stmt_id); replace_stmt_id(querybuf, stmt_id); } @@ -210,7 +211,7 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, } else if (TARGET_IS_SLAVE(route_target)) { - if ((target = handle_slave_is_target(inst, rses, command))) + if ((target = handle_slave_is_target(inst, rses, command, stmt_id))) { succp = true; store_stmt = rses->rses_config.retry_failed_reads; @@ -235,10 +236,11 @@ bool route_single_stmt(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, if (succp && command == MYSQL_COM_STMT_EXECUTE) { - /** Track where the target of the last COM_STMT_EXECUTE. This + /** Track the targets of the COM_STMT_EXECUTE statements. This * information is used to route all COM_STMT_FETCH commands * to the same server where the COM_STMT_EXECUTE was done. */ - rses->last_exec_target = target; + ss_dassert(stmt_id > 0); + rses->exec_map[stmt_id] = target; MXS_INFO("COM_STMT_EXECUTE on %s", target->uri()); } } @@ -929,19 +931,29 @@ SRWBackend handle_hinted_target(ROUTER_CLIENT_SES *rses, GWBUF *querybuf, * @return bool - true if succeeded, false otherwise */ SRWBackend handle_slave_is_target(ROUTER_INSTANCE *inst, ROUTER_CLIENT_SES *rses, - uint8_t cmd) + uint8_t cmd, uint32_t stmt_id) { int rlag_max = rses_get_max_replication_lag(rses); SRWBackend target; - if (cmd == MYSQL_COM_STMT_FETCH && rses->last_exec_target) + if (cmd == MYSQL_COM_STMT_FETCH) { /** The COM_STMT_FETCH must be executed on the same server as the * COM_STMT_EXECUTE was executed on */ - target = rses->last_exec_target; - MXS_INFO("COM_STMT_FETCH on %s", target->uri()); + ExecMap::iterator it = rses->exec_map.find(stmt_id); + + if (it != rses->exec_map.end()) + { + target = it->second; + MXS_INFO("COM_STMT_FETCH on %s", target->uri()); + } + else + { + MXS_WARNING("Unknown statement ID %u used in COM_STMT_FETCH", stmt_id); + } } - else + + if (!target) { target = get_target_backend(rses, BE_SLAVE, NULL, rlag_max); }