diff --git a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc index 087540241..ea982cdc2 100644 --- a/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc +++ b/server/modules/routing/readwritesplit/rwsplit_route_stmt.cc @@ -322,23 +322,7 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf) if (!succp && should_migrate_trx(target)) { - MXS_INFO("Starting transaction migration from '%s' to '%s'", - m_current_master->name(), - target->name()); - - /** - * Stash the current query so that the transaction replay treats - * it as if the query was interrupted. - */ - m_current_query.copy_from(querybuf); - - /** - * After the transaction replay has been started, the rest of - * the query processing needs to be skipped. This is done to avoid - * the error logging done when no valid target is found for a query - * as well as to prevent retrying of queries in the wrong order. - */ - return start_trx_replay(); + return start_trx_migration(target, querybuf); } } @@ -362,15 +346,6 @@ bool RWSplitSession::route_single_stmt(GWBUF* querybuf) { // Target server was found and is in the correct state succp = handle_got_target(querybuf, target, store_stmt); - - if (succp && command == MXS_COM_STMT_EXECUTE && !is_locked_to_master()) - { - /** Track the targets of the COM_STMT_EXECUTE statements. This - * information is used to route all COM_STMT_FETCH commands - * to the same server where the COM_STMT_EXECUTE was done. */ - m_exec_map[stmt_id] = target; - MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri()); - } } } else if (can_retry_query() || can_continue_trx_replay()) @@ -995,6 +970,25 @@ bool RWSplitSession::should_migrate_trx(RWBackend* target) m_can_replay_trx; } +bool RWSplitSession::start_trx_migration(RWBackend* target, GWBUF* querybuf) +{ + MXS_INFO("Starting transaction migration from '%s' to '%s'", m_current_master->name(), target->name()); + + /** + * Stash the current query so that the transaction replay treats + * it as if the query was interrupted. + */ + m_current_query.copy_from(querybuf); + + /** + * After the transaction replay has been started, the rest of + * the query processing needs to be skipped. This is done to avoid + * the error logging done when no valid target is found for a query + * as well as to prevent retrying of queries in the wrong order. + */ + return start_trx_replay(); +} + /** * @brief Handle master is the target * @@ -1125,30 +1119,31 @@ GWBUF* RWSplitSession::add_prefix_wait_gtid(SERVER* server, GWBUF* origin) */ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool store) { - mxb_assert(target->in_use()); + mxb_assert_message(target->in_use(), "Target must be in use before routing to it"); + mxb_assert_message(!target->has_session_commands(), "The session command cursor must not be active"); + /** - * If the transaction is READ ONLY set forced_node to this backend. - * This SLAVE backend will be used until the COMMIT is seen. + * TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace + * with proper, per server tracking of which responses need to be sent to the client. This would + * also solve MXS-2009 by speeding up session commands. */ + mxb_assert_message(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query(), + "Node must be idle when routing queries to it"); + + MXS_INFO("Route query to %s: %s \t%s <", target->is_master() ? "master" : "slave", + target->name(), target->uri()); + if (!m_target_node && session_trx_is_read_only(m_client->session)) { + // Lock the session to this node until the read-only transaction ends m_target_node = target; } - MXS_INFO("Route query to %s: %s \t%s <", - target->is_master() ? "master" : "slave", - target->name(), - target->uri()); - - /** The session command cursor must not be active */ - mxb_assert(!target->has_session_commands()); - mxs::Backend::response_type response = mxs::Backend::NO_RESPONSE; uint8_t cmd = mxs_mysql_get_command(querybuf); GWBUF* send_buf = gwbuf_clone(querybuf); - if (m_config.causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty() - && target->is_slave()) + if (m_config.causal_reads && cmd == COM_QUERY && !m_gtid_pos.empty() && target->is_slave()) { // Perform the causal read only when the query is routed to a slave send_buf = add_prefix_wait_gtid(target->server(), send_buf); @@ -1165,16 +1160,6 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool } bool large_query = is_large_query(querybuf); - - /** - * We should not be routing a query to a server that is busy processing a result. - * - * TODO: This effectively disables pipelining of queries, very bad for batch insert performance. Replace - * with proper, per server tracking of which responses need to be sent to the client. This would - * also solve MXS-2009 by speeding up session commands. - */ - mxb_assert(target->get_reply_state() == REPLY_STATE_DONE || m_qc.large_query()); - uint32_t orig_id = 0; if (!is_locked_to_master() && mxs_mysql_is_ps_command(cmd) && !m_qc.large_query()) @@ -1226,13 +1211,11 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool // Store the current target m_prev_target = target; - /** - * If a READ ONLY transaction is ending set forced_node to NULL - */ if (m_target_node && session_trx_is_read_only(m_client->session) && session_trx_is_ending(m_client->session)) { + // Read-only transaction is over, stop routing queries to a specific node m_target_node = nullptr; } } @@ -1247,5 +1230,14 @@ bool RWSplitSession::handle_got_target(GWBUF* querybuf, RWBackend* target, bool MXS_ERROR("Routing query failed."); } + if (success && cmd == MXS_COM_STMT_EXECUTE && !is_locked_to_master()) + { + /** Track the targets of the COM_STMT_EXECUTE statements. This + * information is used to route all COM_STMT_FETCH commands + * to the same server where the COM_STMT_EXECUTE was done. */ + m_exec_map[m_qc.current_route_info().stmt_id()] = target; + MXS_INFO("COM_STMT_EXECUTE on %s: %s", target->name(), target->uri()); + } + return success; } diff --git a/server/modules/routing/readwritesplit/rwsplitsession.cc b/server/modules/routing/readwritesplit/rwsplitsession.cc index d4b78761d..625655541 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.cc +++ b/server/modules/routing/readwritesplit/rwsplitsession.cc @@ -25,7 +25,7 @@ RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRW : mxs::RouterSession(session) , m_backends(std::move(backends)) , m_raw_backends(sptr_vec_to_ptr_vec(m_backends)) - , m_current_master(master) + , m_current_master(nullptr) , m_target_node(nullptr) , m_prev_target(nullptr) , m_config(instance->config()) @@ -88,10 +88,8 @@ RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session) void close_all_connections(PRWBackends& backends) { - for (PRWBackends::iterator it = backends.begin(); it != backends.end(); it++) + for (auto& backend : backends) { - RWBackend* backend = *it; - if (backend->in_use()) { backend->close(); @@ -133,31 +131,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) return 1; } - if (m_query_queue == NULL - && (m_expected_responses == 0 - || m_qc.load_data_state() == QueryClassifier::LOAD_DATA_ACTIVE - || m_qc.large_query())) + if (can_route_queries()) { /** Gather the information required to make routing decisions */ - - QueryClassifier::current_target_t current_target; - - if (m_target_node == NULL) - { - current_target = QueryClassifier::CURRENT_TARGET_UNDEFINED; - } - else if (m_target_node == m_current_master) - { - current_target = QueryClassifier::CURRENT_TARGET_MASTER; - } - else - { - current_target = QueryClassifier::CURRENT_TARGET_SLAVE; - } - if (!m_qc.large_query()) { - m_qc.update_route_info(current_target, querybuf); + m_qc.update_route_info(get_current_target(), querybuf); } /** No active or pending queries */ @@ -168,15 +147,12 @@ int32_t RWSplitSession::routeQuery(GWBUF* querybuf) } else { - /** - * We are already processing a request from the client. Store the - * new query and wait for the previous one to complete. - */ + // Already busy executing a query, put the query in a queue and route it later mxb_assert(m_expected_responses > 0 || m_query_queue); - MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command", - gwbuf_length(querybuf), - GWBUF_DATA(querybuf)[4], - m_expected_responses); + MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command: %s", + gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4], m_expected_responses, + mxs::extract_sql(querybuf, 1024).c_str()); + m_query_queue = gwbuf_append(m_query_queue, querybuf); querybuf = NULL; rval = 1; diff --git a/server/modules/routing/readwritesplit/rwsplitsession.hh b/server/modules/routing/readwritesplit/rwsplitsession.hh index f673739a4..5b2df4f54 100644 --- a/server/modules/routing/readwritesplit/rwsplitsession.hh +++ b/server/modules/routing/readwritesplit/rwsplitsession.hh @@ -168,6 +168,7 @@ private: bool should_replace_master(mxs::RWBackend* target); void replace_master(mxs::RWBackend* target); bool should_migrate_trx(mxs::RWBackend* target); + bool start_trx_migration(mxs::RWBackend* target, GWBUF* querybuf); void log_master_routing_failure(bool found, mxs::RWBackend* old_master, mxs::RWBackend* curr_master); @@ -266,6 +267,34 @@ private: return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN; } + inline bool can_route_queries() const + { + return m_query_queue == NULL + && (m_expected_responses == 0 + || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE + || m_qc.large_query()); + } + + inline mxs::QueryClassifier::current_target_t get_current_target() const + { + mxs::QueryClassifier::current_target_t current_target; + + if (m_target_node == NULL) + { + current_target = mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED; + } + else if (m_target_node == m_current_master) + { + current_target = mxs::QueryClassifier::CURRENT_TARGET_MASTER; + } + else + { + current_target = mxs::QueryClassifier::CURRENT_TARGET_SLAVE; + } + + return current_target; + } + void update_trx_statistics() { if (session_trx_is_ending(m_client->session))