diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 6058a9c1c..a7171422b 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -681,6 +681,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, rses_closed(false), backends(backends), current_master(master), + large_query(false), rses_config(instance->config()), rses_nbackends(instance->service()->n_dbref), load_data_state(LOAD_DATA_INACTIVE), @@ -892,7 +893,8 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, if (rses->query_queue == NULL && (rses->expected_responses == 0 || info.command == MYSQL_COM_STMT_FETCH || - rses->load_data_state == LOAD_DATA_ACTIVE)) + rses->load_data_state == LOAD_DATA_ACTIVE || + rses->large_query)) { /** No active or pending queries */ if (route_single_stmt(inst, rses, querybuf, info)) diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 19e46aeb4..c6e9e66ed 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -181,9 +181,10 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con uint8_t command = info.command; uint32_t qtype = info.type; route_target_t route_target = info.target; - bool not_locked_to_master = !rses->target_node || rses->target_node != rses->current_master; + bool not_locked_to_master = !rses->prev_target && + (!rses->target_node || rses->target_node != rses->current_master); - if (is_ps_command(command) && not_locked_to_master) + if (not_locked_to_master && is_ps_command(command)) { /** Replace the client statement ID with our internal one only if the * target node is not the current master */ @@ -194,19 +195,28 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con if (TARGET_IS_ALL(route_target)) { + // TODO: Handle payloads larger than (2^24 - 1) bytes that are routed to all servers succp = handle_target_is_all(route_target, inst, rses, querybuf, command, qtype); } else { bool store_stmt = false; - /** - * There is a hint which either names the target backend or - * hint which sets maximum allowed replication lag for the - * backend. - */ - if (TARGET_IS_NAMED_SERVER(route_target) || - TARGET_IS_RLAG_MAX(route_target)) + + if (rses->large_query) { + /** We're processing a large query that's split across multiple packets. + * Route it to the same backend where we routed the previous packet. */ + ss_dassert(rses->prev_target); + target = rses->prev_target; + succp = true; + } + else if (TARGET_IS_NAMED_SERVER(route_target) || TARGET_IS_RLAG_MAX(route_target)) + { + /** + * There is a hint which either names the target backend or + * hint which sets maximum allowed replication lag for the + * backend. + */ if ((target = handle_hinted_target(rses, querybuf, route_target))) { succp = true; @@ -1077,6 +1087,13 @@ static inline bool query_creates_reply(uint8_t cmd) cmd != MYSQL_COM_STMT_FETCH; // Fetch is done mid-result } +static inline bool is_large_query(GWBUF* buf) +{ + uint32_t buflen = gwbuf_length(buf); + ss_dassert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN); + return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; +} + /** * @brief Handle writing to a target server * @@ -1112,6 +1129,8 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses, response = mxs::Backend::EXPECT_RESPONSE; } + bool large_query = is_large_query(querybuf); + if (target->write(gwbuf_clone(querybuf), response)) { if (store && !session_store_stmt(rses->client_dcb->session, querybuf, target->server())) @@ -1121,7 +1140,7 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses, atomic_add_uint64(&inst->stats().n_queries, 1); - if (response == mxs::Backend::EXPECT_RESPONSE) + if (!rses->large_query && response == mxs::Backend::EXPECT_RESPONSE) { /** The server will reply to this command */ ss_dassert(target->get_reply_state() == REPLY_STATE_DONE); @@ -1145,6 +1164,12 @@ bool handle_got_target(RWSplit *inst, RWSplitSession *rses, } } + /** Store the previous target if we're processing a multi-packet query */ + if ((rses->large_query = large_query)) + { + rses->prev_target = target; + } + /** * If a READ ONLY transaction is ending set forced_node to NULL */ diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index 6d2c7e3d9..5768bf6bd 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -90,6 +90,8 @@ public: SRWBackendList backends; /**< List of backend servers */ SRWBackend current_master; /**< Current master server */ SRWBackend target_node; /**< The currently locked target node */ + SRWBackend prev_target; /**< The previous target where a query was sent */ + bool large_query; /**< Set to true when processing payloads >= 2^24 bytes */ Config rses_config; /**< copied config info from router instance */ int rses_nbackends; enum ld_state load_data_state; /**< Current load data state */