The resultset processing for MySQL requires some extra work as it lacks the proper SERVER_MORE_RESULTS_EXIST flag in the last EOF packet. Instead, the first EOF packet has the SERVER_PS_OUT_PARAMS flag which needs to be interpreted as a SERVER_MORE_RESULTS_EXIST flag for the second EOF packet. Also corrected the EOF packet handling to do the flag checks in the code that deals with the EOF packets. As the modutil_state parameter is now used for more than large packet tracking, the correct solution is to store this state object in the readwritesplit session instead of interpreting it to a boolean value.
		
			
				
	
	
		
			1470 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1470 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2016 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | 
						|
 *
 | 
						|
 * Change Date: 2020-01-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
 | 
						|
#include "readwritesplit.hh"
 | 
						|
 | 
						|
#include <inttypes.h>
 | 
						|
#include <stdint.h>
 | 
						|
#include <stdio.h>
 | 
						|
#include <stdlib.h>
 | 
						|
#include <string.h>
 | 
						|
#include <strings.h>
 | 
						|
#include <cmath>
 | 
						|
#include <new>
 | 
						|
 | 
						|
#include <maxscale/alloc.h>
 | 
						|
#include <maxscale/dcb.h>
 | 
						|
#include <maxscale/log_manager.h>
 | 
						|
#include <maxscale/modinfo.h>
 | 
						|
#include <maxscale/modutil.h>
 | 
						|
#include <maxscale/query_classifier.h>
 | 
						|
#include <maxscale/router.h>
 | 
						|
#include <maxscale/spinlock.h>
 | 
						|
#include <maxscale/mysql_utils.h>
 | 
						|
 | 
						|
#include "rwsplit_internal.hh"
 | 
						|
#include "rwsplitsession.hh"
 | 
						|
 | 
						|
/**
 | 
						|
 * The entry points for the read/write query splitting router module.
 | 
						|
 *
 | 
						|
 * This file contains the entry points that comprise the API to the read
 | 
						|
 * write query splitting router. It also contains functions that are
 | 
						|
 * directly called by the entry point functions. Some of these are used by
 | 
						|
 * functions in other modules of the read write split router, others are
 | 
						|
 * used only within this module.
 | 
						|
 */
 | 
						|
 | 
						|
/** Maximum number of slaves */
 | 
						|
#define MAX_SLAVE_COUNT "255"
 | 
						|
 | 
						|
/*
 | 
						|
 * The functions that implement the router module API
 | 
						|
 */
 | 
						|
 | 
						|
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *session, GWBUF *queue);
 | 
						|
 | 
						|
static bool rwsplit_process_router_options(RWSplit *router,
 | 
						|
                                           char **options);
 | 
						|
static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
 | 
						|
                                      DCB *backend_dcb, GWBUF *errmsg);
 | 
						|
static bool handle_error_new_connection(RWSplit *inst,
 | 
						|
                                        RWSplitSession **rses,
 | 
						|
                                        DCB *backend_dcb, GWBUF *errmsg);
 | 
						|
static bool route_stored_query(RWSplitSession *rses);
 | 
						|
 | 
						|
/**
 | 
						|
 * Internal functions
 | 
						|
 */
 | 
						|
 | 
						|
/*
 | 
						|
 * @brief Get the maximum replication lag for this router
 | 
						|
 *
 | 
						|
 * @param   rses    Router client session
 | 
						|
 * @return  Replication lag from configuration or very large number
 | 
						|
 */
 | 
						|
int rses_get_max_replication_lag(RWSplitSession *rses)
 | 
						|
{
 | 
						|
    int conf_max_rlag;
 | 
						|
 | 
						|
    CHK_CLIENT_RSES(rses);
 | 
						|
 | 
						|
    /** if there is no configured value, then longest possible int is used */
 | 
						|
    if (rses->rses_config.max_slave_replication_lag > 0)
 | 
						|
    {
 | 
						|
        conf_max_rlag = rses->rses_config.max_slave_replication_lag;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        conf_max_rlag = ~(1 << 31);
 | 
						|
    }
 | 
						|
 | 
						|
    return conf_max_rlag;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Find a back end reference that matches the given DCB
 | 
						|
 *
 | 
						|
 * Finds out if there is a backend reference pointing at the DCB given as
 | 
						|
 * parameter.
 | 
						|
 *
 | 
						|
 * @param rses  router client session
 | 
						|
 * @param dcb   DCB
 | 
						|
 *
 | 
						|
 * @return backend reference pointer if succeed or NULL
 | 
						|
 */
 | 
						|
 | 
						|
static inline SRWBackend& get_backend_from_dcb(RWSplitSession *rses, DCB *dcb)
 | 
						|
{
 | 
						|
    ss_dassert(dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
 | 
						|
    CHK_DCB(dcb);
 | 
						|
    CHK_CLIENT_RSES(rses);
 | 
						|
 | 
						|
    for (SRWBackendList::iterator it = rses->backends.begin();
 | 
						|
         it != rses->backends.end(); it++)
 | 
						|
    {
 | 
						|
        SRWBackend& backend = *it;
 | 
						|
 | 
						|
        if (backend->dcb() == dcb)
 | 
						|
        {
 | 
						|
            return backend;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /** We should always have a valid backend reference and in case we don't,
 | 
						|
     * something is terribly wrong. */
 | 
						|
    MXS_ALERT("No reference to DCB %p found, aborting.", dcb);
 | 
						|
    raise(SIGABRT);
 | 
						|
 | 
						|
    // To make the compiler happy, we return a reference to a static value.
 | 
						|
    static SRWBackend this_should_not_happen;
 | 
						|
    return this_should_not_happen;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Process router options
 | 
						|
 *
 | 
						|
 * @param router Router instance
 | 
						|
 * @param options Router options
 | 
						|
 * @return True on success, false if a configuration error was found
 | 
						|
 */
 | 
						|
static bool rwsplit_process_router_options(Config& config,
 | 
						|
                                           char **options)
 | 
						|
{
 | 
						|
    int i;
 | 
						|
    char *value;
 | 
						|
    select_criteria_t c;
 | 
						|
 | 
						|
    if (options == NULL)
 | 
						|
    {
 | 
						|
        return true;
 | 
						|
    }
 | 
						|
 | 
						|
    MXS_WARNING("Router options for readwritesplit are deprecated.");
 | 
						|
 | 
						|
    bool success = true;
 | 
						|
 | 
						|
    for (i = 0; options[i]; i++)
 | 
						|
    {
 | 
						|
        if ((value = strchr(options[i], '=')) == NULL)
 | 
						|
        {
 | 
						|
            MXS_ERROR("Unsupported router option \"%s\" for readwritesplit router.", options[i]);
 | 
						|
            success = false;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            *value = 0;
 | 
						|
            value++;
 | 
						|
            if (strcmp(options[i], "slave_selection_criteria") == 0)
 | 
						|
            {
 | 
						|
                c = GET_SELECT_CRITERIA(value);
 | 
						|
                ss_dassert(c == LEAST_GLOBAL_CONNECTIONS ||
 | 
						|
                           c == LEAST_ROUTER_CONNECTIONS || c == LEAST_BEHIND_MASTER ||
 | 
						|
                           c == LEAST_CURRENT_OPERATIONS || c == UNDEFINED_CRITERIA);
 | 
						|
 | 
						|
                if (c == UNDEFINED_CRITERIA)
 | 
						|
                {
 | 
						|
                    MXS_ERROR("Unknown slave selection criteria \"%s\". "
 | 
						|
                              "Allowed values are LEAST_GLOBAL_CONNECTIONS, "
 | 
						|
                              "LEAST_ROUTER_CONNECTIONS, LEAST_BEHIND_MASTER,"
 | 
						|
                              "and LEAST_CURRENT_OPERATIONS.",
 | 
						|
                              STRCRITERIA(config.slave_selection_criteria));
 | 
						|
                    success = false;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    config.slave_selection_criteria = c;
 | 
						|
                }
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "max_sescmd_history") == 0)
 | 
						|
            {
 | 
						|
                config.max_sescmd_history = atoi(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "disable_sescmd_history") == 0)
 | 
						|
            {
 | 
						|
                config.disable_sescmd_history = config_truth_value(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "master_accept_reads") == 0)
 | 
						|
            {
 | 
						|
                config.master_accept_reads = config_truth_value(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "strict_multi_stmt") == 0)
 | 
						|
            {
 | 
						|
                config.strict_multi_stmt = config_truth_value(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "strict_sp_calls") == 0)
 | 
						|
            {
 | 
						|
                config.strict_sp_calls = config_truth_value(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "retry_failed_reads") == 0)
 | 
						|
            {
 | 
						|
                config.retry_failed_reads = config_truth_value(value);
 | 
						|
            }
 | 
						|
            else if (strcmp(options[i], "master_failure_mode") == 0)
 | 
						|
            {
 | 
						|
                if (strcasecmp(value, "fail_instantly") == 0)
 | 
						|
                {
 | 
						|
                    config.master_failure_mode = RW_FAIL_INSTANTLY;
 | 
						|
                }
 | 
						|
                else if (strcasecmp(value, "fail_on_write") == 0)
 | 
						|
                {
 | 
						|
                    config.master_failure_mode = RW_FAIL_ON_WRITE;
 | 
						|
                }
 | 
						|
                else if (strcasecmp(value, "error_on_write") == 0)
 | 
						|
                {
 | 
						|
                    config.master_failure_mode = RW_ERROR_ON_WRITE;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    MXS_ERROR("Unknown value for 'master_failure_mode': %s", value);
 | 
						|
                    success = false;
 | 
						|
                }
 | 
						|
            }
 | 
						|
            else
 | 
						|
            {
 | 
						|
                MXS_ERROR("Unknown router option \"%s=%s\" for readwritesplit router.",
 | 
						|
                          options[i], value);
 | 
						|
                success = false;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    } /*< for */
 | 
						|
 | 
						|
    return success;
 | 
						|
}
 | 
						|
 | 
						|
// TODO: Don't process parameters in readwritesplit
 | 
						|
static bool handle_max_slaves(Config& config, const char *str)
 | 
						|
{
 | 
						|
    bool rval = true;
 | 
						|
    char *endptr;
 | 
						|
    int val = strtol(str, &endptr, 10);
 | 
						|
 | 
						|
    if (*endptr == '%' && *(endptr + 1) == '\0')
 | 
						|
    {
 | 
						|
        config.rw_max_slave_conn_percent = val;
 | 
						|
        config.max_slave_connections = 0;
 | 
						|
    }
 | 
						|
    else if (*endptr == '\0')
 | 
						|
    {
 | 
						|
        config.max_slave_connections = val;
 | 
						|
        config.rw_max_slave_conn_percent = 0;
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        MXS_ERROR("Invalid value for 'max_slave_connections': %s", str);
 | 
						|
        rval = false;
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Handle an error reply for a client
 | 
						|
 *
 | 
						|
 * @param ses           Session
 | 
						|
 * @param rses          Router session
 | 
						|
 * @param backend_dcb   DCB for the backend server that has failed
 | 
						|
 * @param errmsg        GWBUF containing the error message
 | 
						|
 */
 | 
						|
static void handle_error_reply_client(MXS_SESSION *ses, RWSplitSession *rses,
 | 
						|
                                      DCB *backend_dcb, GWBUF *errmsg)
 | 
						|
{
 | 
						|
 | 
						|
    mxs_session_state_t sesstate = ses->state;
 | 
						|
    DCB *client_dcb = ses->client_dcb;
 | 
						|
 | 
						|
    SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb);
 | 
						|
 | 
						|
    backend->close();
 | 
						|
 | 
						|
    if (sesstate == SESSION_STATE_ROUTER_READY)
 | 
						|
    {
 | 
						|
        CHK_DCB(client_dcb);
 | 
						|
        client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/**
 * @brief Retry a stored read on another backend
 *
 * Nothing is done while a transaction is active. Otherwise the statement is
 * written to the first backend that is in use, is not @c old, is a slave and
 * is not the master. If no such backend accepted the write, the current
 * master is tried, provided that it exists and is in use.
 *
 * @param rses   Router client session
 * @param old    Backend that the statement was originally routed to
 * @param stored The stored statement
 * @return True if the statement was written to a backend
 */
static bool reroute_stored_statement(RWSplitSession *rses, const SRWBackend& old, GWBUF *stored)
{
    if (session_trx_is_active(rses->client_dcb->session))
    {
        /** Reads are only retried when autocommit is enabled and no
         * transaction is open */
        return false;
    }

    for (SRWBackendList::iterator pos = rses->backends.begin();
         pos != rses->backends.end(); pos++)
    {
        SRWBackend& candidate = *pos;

        /** A valid candidate is a non-master slave that's in use */
        bool usable = candidate->in_use() && candidate != old &&
                      !candidate->is_master() &&
                      candidate->is_slave();

        if (!usable || !candidate->write(stored))
        {
            continue;
        }

        MXS_INFO("Retrying failed read at '%s'.", candidate->name());
        ss_dassert(candidate->get_reply_state() == REPLY_STATE_DONE);
        LOG_RS(candidate, REPLY_STATE_START);
        candidate->set_reply_state(REPLY_STATE_START);
        rses->expected_responses++;
        return true;
    }

    /** No slave took the statement: either none qualified or the writes
     * failed. The master is the last resort. */
    if (rses->current_master && rses->current_master->in_use() &&
        rses->current_master->write(stored))
    {
        MXS_INFO("Retrying failed read at '%s'.", rses->current_master->name());
        LOG_RS(rses->current_master, REPLY_STATE_START);
        ss_dassert(rses->current_master->get_reply_state() == REPLY_STATE_DONE);
        rses->current_master->set_reply_state(REPLY_STATE_START);
        rses->expected_responses++;
        return true;
    }

    return false;
}
 | 
						|
 | 
						|
/**
 | 
						|
 * Check if there is backend reference pointing at failed DCB, and reset its
 | 
						|
 * flags. Then clear DCB's callback and finally : try to find replacement(s)
 | 
						|
 * for failed slave(s).
 | 
						|
 *
 | 
						|
 * This must be called with router lock.
 | 
						|
 *
 | 
						|
 * @param inst      router instance
 | 
						|
 * @param rses      router client session
 | 
						|
 * @param dcb       failed DCB
 | 
						|
 * @param errmsg    error message which is sent to client if it is waiting
 | 
						|
 *
 | 
						|
 * @return true if there are enough backend connections to continue, false if
 | 
						|
 * not
 | 
						|
 */
 | 
						|
static bool handle_error_new_connection(RWSplit *inst,
 | 
						|
                                        RWSplitSession **rses,
 | 
						|
                                        DCB *backend_dcb, GWBUF *errmsg)
 | 
						|
{
 | 
						|
    RWSplitSession *myrses = *rses;
 | 
						|
    SRWBackend& backend = get_backend_from_dcb(myrses, backend_dcb);
 | 
						|
 | 
						|
    MXS_SESSION* ses = backend_dcb->session;
 | 
						|
    bool route_stored = false;
 | 
						|
    CHK_SESSION(ses);
 | 
						|
 | 
						|
    if (backend->is_waiting_result())
 | 
						|
    {
 | 
						|
        ss_dassert(myrses->expected_responses > 0);
 | 
						|
        myrses->expected_responses--;
 | 
						|
 | 
						|
        /**
 | 
						|
         * A query was sent through the backend and it is waiting for a reply.
 | 
						|
         * Try to reroute the statement to a working server or send an error
 | 
						|
         * to the client.
 | 
						|
         */
 | 
						|
        GWBUF *stored = NULL;
 | 
						|
        const SERVER *target = NULL;
 | 
						|
        if (!session_take_stmt(backend_dcb->session, &stored, &target) ||
 | 
						|
            target != backend->backend()->server ||
 | 
						|
            !reroute_stored_statement(*rses, backend, stored))
 | 
						|
        {
 | 
						|
            /**
 | 
						|
             * We failed to route the stored statement or no statement was
 | 
						|
             * stored for this server. Either way we can safely free the buffer
 | 
						|
             * and decrement the expected response count.
 | 
						|
             */
 | 
						|
            gwbuf_free(stored);
 | 
						|
 | 
						|
            if (backend->session_command_count() == 0)
 | 
						|
            {
 | 
						|
                /**
 | 
						|
                 * The backend was executing a command that requires a reply.
 | 
						|
                 * Send an error to the client to let it know the query has
 | 
						|
                 * failed.
 | 
						|
                 */
 | 
						|
                DCB *client_dcb = ses->client_dcb;
 | 
						|
                client_dcb->func.write(client_dcb, gwbuf_clone(errmsg));
 | 
						|
            }
 | 
						|
 | 
						|
            if (myrses->expected_responses == 0)
 | 
						|
            {
 | 
						|
                /** The response from this server was the last one, try to
 | 
						|
                 * route all queued queries */
 | 
						|
                route_stored = true;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /** Close the current connection. This needs to be done before routing any
 | 
						|
     * of the stored queries. If we route a stored query before the connection
 | 
						|
     * is closed, it's possible that the routing logic will pick the failed
 | 
						|
     * server as the target. */
 | 
						|
    backend->close();
 | 
						|
 | 
						|
    if (route_stored)
 | 
						|
    {
 | 
						|
        route_stored_query(myrses);
 | 
						|
    }
 | 
						|
 | 
						|
    int max_nslaves = inst->max_slave_count();
 | 
						|
    bool succp;
 | 
						|
    /**
 | 
						|
     * Try to get replacement slave or at least the minimum
 | 
						|
     * number of slave connections for router session.
 | 
						|
     */
 | 
						|
    if (inst->config().disable_sescmd_history)
 | 
						|
    {
 | 
						|
        succp = inst->have_enough_servers();
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        succp = select_connect_backend_servers(myrses->rses_nbackends, max_nslaves,
 | 
						|
                                               ses, inst->config(), myrses->backends,
 | 
						|
                                               myrses->current_master, &myrses->sescmd_list,
 | 
						|
                                               &myrses->expected_responses,
 | 
						|
                                               connection_type::SLAVE);
 | 
						|
    }
 | 
						|
 | 
						|
    return succp;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Route a stored query
 | 
						|
 *
 | 
						|
 * When multiple queries are executed in a pipeline fashion, the readwritesplit
 | 
						|
 * stores the extra queries in a queue. This queue is emptied after reading a
 | 
						|
 * reply from the backend server.
 | 
						|
 *
 | 
						|
 * @param rses Router client session
 | 
						|
 * @return True if a stored query was routed successfully
 | 
						|
 */
 | 
						|
static bool route_stored_query(RWSplitSession *rses)
 | 
						|
{
 | 
						|
    bool rval = true;
 | 
						|
 | 
						|
    /** Loop over the stored statements as long as the routeQuery call doesn't
 | 
						|
     * append more data to the queue. If it appends data to the queue, we need
 | 
						|
     * to wait for a response before attempting another reroute */
 | 
						|
    while (rses->query_queue)
 | 
						|
    {
 | 
						|
        GWBUF* query_queue = modutil_get_next_MySQL_packet(&rses->query_queue);
 | 
						|
        query_queue = gwbuf_make_contiguous(query_queue);
 | 
						|
 | 
						|
        /** Store the query queue locally for the duration of the routeQuery call.
 | 
						|
         * This prevents recursive calls into this function. */
 | 
						|
        GWBUF *temp_storage = rses->query_queue;
 | 
						|
        rses->query_queue = NULL;
 | 
						|
 | 
						|
        // TODO: Move the handling of queued queries to the client protocol
 | 
						|
        // TODO: module where the command tracking is done automatically.
 | 
						|
        uint8_t cmd = mxs_mysql_get_command(query_queue);
 | 
						|
        mysql_protocol_set_current_command(rses->client_dcb, (mxs_mysql_cmd_t)cmd);
 | 
						|
 | 
						|
        if (!routeQuery((MXS_ROUTER*)rses->router, (MXS_ROUTER_SESSION*)rses, query_queue))
 | 
						|
        {
 | 
						|
            rval = false;
 | 
						|
            MXS_ERROR("Failed to route queued query.");
 | 
						|
        }
 | 
						|
 | 
						|
        if (rses->query_queue == NULL)
 | 
						|
        {
 | 
						|
            /** Query successfully routed and no responses are expected */
 | 
						|
            rses->query_queue = temp_storage;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            /** Routing was stopped, we need to wait for a response before retrying */
 | 
						|
            rses->query_queue = gwbuf_append(temp_storage, rses->query_queue);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * @brief Check if the buffer holds data after its first packet
 *
 * @param buffer Buffer that starts with a MySQL packet header
 * @return True if the buffer is longer than the header plus the payload
 *         length read from that header
 */
static inline bool have_next_packet(GWBUF* buffer)
{
    uint32_t first_packet_len = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;

    if (gwbuf_length(buffer) > first_packet_len)
    {
        return true;
    }

    return false;
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Check if we have received a complete reply from the backend
 | 
						|
 *
 | 
						|
 * @param backend Backend reference
 | 
						|
 * @param buffer  Buffer containing the response
 | 
						|
 *
 | 
						|
 * @return True if the complete response has been received
 | 
						|
 */
 | 
						|
bool reply_is_complete(SRWBackend& backend, GWBUF *buffer)
 | 
						|
{
 | 
						|
    if (backend->current_command() == MXS_COM_STMT_FETCH)
 | 
						|
    {
 | 
						|
        bool more = false;
 | 
						|
        modutil_state state = backend->get_modutil_state();
 | 
						|
        int n_eof = modutil_count_signal_packets(buffer, 0, &more, &state);
 | 
						|
        backend->set_modutil_state(state);
 | 
						|
 | 
						|
        // If the server responded with an error, n_eof > 0
 | 
						|
        if (n_eof > 0 || backend->consume_fetched_rows(buffer))
 | 
						|
        {
 | 
						|
            LOG_RS(backend, REPLY_STATE_DONE);
 | 
						|
            backend->set_reply_state(REPLY_STATE_DONE);
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else if (backend->current_command() == MXS_COM_STATISTICS)
 | 
						|
    {
 | 
						|
        // COM_STATISTICS returns a single string and thus requires special handling
 | 
						|
        LOG_RS(backend, REPLY_STATE_DONE);
 | 
						|
        backend->set_reply_state(REPLY_STATE_DONE);
 | 
						|
    }
 | 
						|
    else if (backend->get_reply_state() == REPLY_STATE_START &&
 | 
						|
        (!mxs_mysql_is_result_set(buffer) || GWBUF_IS_COLLECTED_RESULT(buffer)))
 | 
						|
    {
 | 
						|
        if (GWBUF_IS_COLLECTED_RESULT(buffer) ||
 | 
						|
            backend->current_command() == MXS_COM_STMT_PREPARE ||
 | 
						|
            !mxs_mysql_is_ok_packet(buffer) ||
 | 
						|
            !mxs_mysql_more_results_after_ok(buffer))
 | 
						|
        {
 | 
						|
            /** Not a result set, we have the complete response */
 | 
						|
            LOG_RS(backend, REPLY_STATE_DONE);
 | 
						|
            backend->set_reply_state(REPLY_STATE_DONE);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            // This is an OK packet and more results will follow
 | 
						|
            ss_dassert(mxs_mysql_is_ok_packet(buffer) &&
 | 
						|
                mxs_mysql_more_results_after_ok(buffer));
 | 
						|
 | 
						|
            if (have_next_packet(buffer))
 | 
						|
            {
 | 
						|
                LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
 | 
						|
                backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
 | 
						|
                return reply_is_complete(backend, buffer);
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        bool more = false;
 | 
						|
        modutil_state state = backend->get_modutil_state();
 | 
						|
        int n_old_eof = backend->get_reply_state() == REPLY_STATE_RSET_ROWS ? 1 : 0;
 | 
						|
        int n_eof = modutil_count_signal_packets(buffer, n_old_eof, &more, &state);
 | 
						|
        backend->set_modutil_state(state);
 | 
						|
 | 
						|
        if (n_eof > 2)
 | 
						|
        {
 | 
						|
            /**
 | 
						|
             * We have multiple results in the buffer, we only care about
 | 
						|
             * the state of the last one. Skip the complete result sets and act
 | 
						|
             * like we're processing a single result set.
 | 
						|
             */
 | 
						|
            n_eof = n_eof % 2 ? 1 : 2;
 | 
						|
        }
 | 
						|
 | 
						|
        if (n_eof == 0)
 | 
						|
        {
 | 
						|
            /** Waiting for the EOF packet after the column definitions */
 | 
						|
            LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
 | 
						|
            backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
 | 
						|
        }
 | 
						|
        else if (n_eof == 1 && backend->current_command() != MXS_COM_FIELD_LIST)
 | 
						|
        {
 | 
						|
            /** Waiting for the EOF packet after the rows */
 | 
						|
            LOG_RS(backend, REPLY_STATE_RSET_ROWS);
 | 
						|
            backend->set_reply_state(REPLY_STATE_RSET_ROWS);
 | 
						|
 | 
						|
            if (backend->is_opening_cursor())
 | 
						|
            {
 | 
						|
                backend->set_cursor_opened();
 | 
						|
                MXS_INFO("Cursor successfully opened");
 | 
						|
                LOG_RS(backend, REPLY_STATE_DONE);
 | 
						|
                backend->set_reply_state(REPLY_STATE_DONE);
 | 
						|
            }
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            /** We either have a complete result set or a response to
 | 
						|
             * a COM_FIELD_LIST command */
 | 
						|
            ss_dassert(n_eof == 2 || (n_eof == 1 && backend->current_command() == MXS_COM_FIELD_LIST));
 | 
						|
            LOG_RS(backend, REPLY_STATE_DONE);
 | 
						|
            backend->set_reply_state(REPLY_STATE_DONE);
 | 
						|
 | 
						|
            if (more)
 | 
						|
            {
 | 
						|
                /** The server will send more resultsets */
 | 
						|
                LOG_RS(backend, REPLY_STATE_START);
 | 
						|
                backend->set_reply_state(REPLY_STATE_START);
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    return backend->get_reply_state() == REPLY_STATE_DONE;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * @brief Close every backend that is in use
 *
 * @param backends List of backends to go through
 */
void close_all_connections(SRWBackendList& backends)
{
    SRWBackendList::iterator pos = backends.begin();

    while (pos != backends.end())
    {
        SRWBackend& current = *pos;

        if (current->in_use())
        {
            current->close();
        }

        pos++;
    }
}
 | 
						|
 | 
						|
void check_and_log_backend_state(const SRWBackend& backend, DCB* problem_dcb)
 | 
						|
{
 | 
						|
    if (backend)
 | 
						|
    {
 | 
						|
        /** This is a valid DCB for a backend ref */
 | 
						|
        if (backend->in_use() && backend->dcb() == problem_dcb)
 | 
						|
        {
 | 
						|
            ss_dassert(false);
 | 
						|
            MXS_ERROR("Backend '%s' is still in use and points to the problem DCB.",
 | 
						|
                      backend->name());
 | 
						|
        }
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        const char *remote = problem_dcb->state == DCB_STATE_POLLING &&
 | 
						|
                             problem_dcb->server ? problem_dcb->server->unique_name : "CLOSED";
 | 
						|
 | 
						|
        MXS_ERROR("DCB connected to '%s' is not in use by the router "
 | 
						|
                  "session, not closing it. DCB is in state '%s'",
 | 
						|
                  remote, STRDCBSTATE(problem_dcb->state));
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
RWSplit::RWSplit(SERVICE* service, const Config& config):
 | 
						|
    m_service(service),
 | 
						|
    m_config(config)
 | 
						|
{
 | 
						|
}
 | 
						|
 | 
						|
RWSplit::~RWSplit()
 | 
						|
{
 | 
						|
}
 | 
						|
 | 
						|
SERVICE* RWSplit::service() const
 | 
						|
{
 | 
						|
    return m_service;
 | 
						|
}
 | 
						|
 | 
						|
const Config& RWSplit::config() const
 | 
						|
{
 | 
						|
    return m_config;
 | 
						|
}
 | 
						|
 | 
						|
Stats& RWSplit::stats()
 | 
						|
{
 | 
						|
    return m_stats;
 | 
						|
}
 | 
						|
 | 
						|
int RWSplit::max_slave_count() const
 | 
						|
{
 | 
						|
    int router_nservers = m_service->n_dbref;
 | 
						|
    int conf_max_nslaves = m_config.max_slave_connections > 0 ?
 | 
						|
                           m_config.max_slave_connections :
 | 
						|
                           (router_nservers * m_config.rw_max_slave_conn_percent) / 100;
 | 
						|
    return MXS_MIN(router_nservers - 1, MXS_MAX(1, conf_max_nslaves));
 | 
						|
}
 | 
						|
 | 
						|
bool RWSplit::have_enough_servers() const
 | 
						|
{
 | 
						|
    bool succp = true;
 | 
						|
    const int min_nsrv = 1;
 | 
						|
    const int router_nsrv = m_service->n_dbref;
 | 
						|
 | 
						|
    int n_serv = MXS_MAX(m_config.max_slave_connections,
 | 
						|
                         (router_nsrv * m_config.rw_max_slave_conn_percent) / 100);
 | 
						|
 | 
						|
    /** With too few servers session is not created */
 | 
						|
    if (router_nsrv < min_nsrv || n_serv < min_nsrv)
 | 
						|
    {
 | 
						|
        if (router_nsrv < min_nsrv)
 | 
						|
        {
 | 
						|
            MXS_ERROR("Unable to start %s service. There are "
 | 
						|
                      "too few backend servers available. Found %d "
 | 
						|
                      "when %d is required.", m_service->name, router_nsrv, min_nsrv);
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            int pct = m_config.rw_max_slave_conn_percent / 100;
 | 
						|
            int nservers = router_nsrv * pct;
 | 
						|
 | 
						|
            if (m_config.max_slave_connections < min_nsrv)
 | 
						|
            {
 | 
						|
                MXS_ERROR("Unable to start %s service. There are "
 | 
						|
                          "too few backend servers configured in "
 | 
						|
                          "MaxScale.cnf. Found %d when %d is required.",
 | 
						|
                          m_service->name, m_config.max_slave_connections, min_nsrv);
 | 
						|
            }
 | 
						|
            if (nservers < min_nsrv)
 | 
						|
            {
 | 
						|
                double dbgpct = ((double)min_nsrv / (double)router_nsrv) * 100.0;
 | 
						|
                MXS_ERROR("Unable to start %s service. There are "
 | 
						|
                          "too few backend servers configured in "
 | 
						|
                          "MaxScale.cnf. Found %d%% when at least %.0f%% "
 | 
						|
                          "would be required.", m_service->name,
 | 
						|
                          m_config.rw_max_slave_conn_percent, dbgpct);
 | 
						|
            }
 | 
						|
        }
 | 
						|
        succp = false;
 | 
						|
    }
 | 
						|
 | 
						|
    return succp;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Construct a router session.
 *
 * If rw_max_slave_conn_percent is non-zero in the session's configuration,
 * max_slave_connections in rses_config is overwritten with that percentage
 * of the backend count, rounded down but never less than one.
 *
 * @param instance Router instance that owns the session
 * @param session  The MaxScale session; its client DCB is stored
 * @param backends List of backends for this session
 * @param master   The backend used as the current master
 */
RWSplitSession::RWSplitSession(RWSplit* instance, MXS_SESSION* session,
                               const SRWBackendList& backends,
                               const SRWBackend& master)
    : rses_chk_top(CHK_NUM_ROUTER_SES)
    , rses_closed(false)
    , backends(backends)
    , current_master(master)
    , large_query(false)
    , rses_config(instance->config())
    , rses_nbackends(instance->service()->n_dbref)
    , load_data_state(LOAD_DATA_INACTIVE)
    , have_tmp_tables(false)
    , rses_load_data_sent(0)
    , client_dcb(session->client_dcb)
    , sescmd_count(1) // Needs to be a positive number to work
    , expected_responses(0)
    , query_queue(NULL)
    , router(instance)
    , sent_sescmd(0)
    , recv_sescmd(0)
    , rses_chk_tail(CHK_NUM_ROUTER_SES)
{
    if (rses_config.rw_max_slave_conn_percent)
    {
        const double fraction = (double)rses_config.rw_max_slave_conn_percent / 100.0;
        const double scaled = floor((double)rses_nbackends * fraction);
        const int n_conn = MXS_MAX(scaled, 1);

        rses_config.max_slave_connections = n_conn;
    }
}
 | 
						|
 | 
						|
/**
 * Create a new router session.
 *
 * A backend object is created for every active server reference of the
 * service, after which connections are opened with
 * select_connect_backend_servers(). The session object is created only if
 * that succeeds.
 *
 * @param router  The router instance
 * @param session The MaxScale session
 *
 * @return New router session or NULL if the session could not be created
 */
RWSplitSession* RWSplitSession::create(RWSplit* router, MXS_SESSION* session)
{
    if (!router->have_enough_servers())
    {
        return NULL;
    }

    SRWBackendList backends;

    for (SERVER_REF *ref = router->service()->dbref; ref; ref = ref->next)
    {
        if (!ref->active)
        {
            continue;
        }

        backends.push_back(SRWBackend(new RWBackend(ref)));
    }

    /**
     * At least the master must be found if the router is in the strict mode.
     * If sessions without master are allowed, only a slave must be found.
     */
    SRWBackend master;

    if (!select_connect_backend_servers(router->service()->n_dbref, router->max_slave_count(),
                                        session, router->config(), backends, master,
                                        NULL, NULL, connection_type::ALL))
    {
        return NULL;
    }

    RWSplitSession* rses = new RWSplitSession(router, session, backends, master);

    if (rses)
    {
        router->stats().n_sessions += 1;
    }

    return rses;
}
 | 
						|
 | 
						|
/**
 | 
						|
 * API function definitions
 | 
						|
 */
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Create a new readwritesplit router instance
 | 
						|
 *
 | 
						|
 * An instance of the router is required for each service that uses this router.
 | 
						|
 * One instance of the router will handle multiple router sessions.
 | 
						|
 *
 | 
						|
 * @param service The service this router is being create for
 | 
						|
 * @param options The options for this query router
 | 
						|
 *
 | 
						|
 * @return New router instance or NULL on error
 | 
						|
 */
 | 
						|
static MXS_ROUTER* createInstance(SERVICE *service, char **options)
 | 
						|
{
 | 
						|
 | 
						|
    MXS_CONFIG_PARAMETER* params = service->svc_config_param;
 | 
						|
    Config config(params);
 | 
						|
 | 
						|
    if (!handle_max_slaves(config, config_get_string(params, "max_slave_connections")) ||
 | 
						|
        (options && !rwsplit_process_router_options(config, options)))
 | 
						|
    {
 | 
						|
        return NULL;
 | 
						|
    }
 | 
						|
 | 
						|
    /** These options cancel each other out */
 | 
						|
    if (config.disable_sescmd_history && config.max_sescmd_history > 0)
 | 
						|
    {
 | 
						|
        config.max_sescmd_history = 0;
 | 
						|
    }
 | 
						|
 | 
						|
    return (MXS_ROUTER*)new (std::nothrow) RWSplit(service, config);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Create a new session for this router instance
 | 
						|
 *
 | 
						|
 * The session is used to store all the data required by the router for a
 | 
						|
 * particular client connection. The instance of the router that relates to a
 | 
						|
 * particular service is passed as the first parameter. The second parameter is
 | 
						|
 * the session that has been created in response to the request from a client
 | 
						|
 * for a connection. The passed session contains generic information; this
 | 
						|
 * function creates the session structure that holds router specific data.
 | 
						|
 * There is often a one to one relationship between sessions and router
 | 
						|
 * sessions, although it is possible to create configurations where a
 | 
						|
 * connection is handled by multiple routers, one after another.
 | 
						|
 *
 | 
						|
 * @param instance The router instance data
 | 
						|
 * @param session  The MaxScale session (generic connection data)
 | 
						|
 *
 | 
						|
 * @return New router session or NULL on error
 | 
						|
 */
 | 
						|
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER *router_inst, MXS_SESSION *session)
 | 
						|
{
 | 
						|
    RWSplit* router = reinterpret_cast<RWSplit*>(router_inst);
 | 
						|
    RWSplitSession* rses = NULL;
 | 
						|
    MXS_EXCEPTION_GUARD(rses = RWSplitSession::create(router, session));
 | 
						|
    return reinterpret_cast<MXS_ROUTER_SESSION*>(rses);
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Close a router session
 | 
						|
 *
 | 
						|
 * Close a session with the router, this is the mechanism by which a router
 | 
						|
 * may perform cleanup. The instance of the router that relates to
 | 
						|
 * the relevant service is passed, along with the router session that is to
 | 
						|
 * be closed. The freeSession will be called once the session has been closed.
 | 
						|
 *
 | 
						|
 * @param instance The router instance data
 | 
						|
 * @param session  The router session being closed
 | 
						|
 */
 | 
						|
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
 | 
						|
{
 | 
						|
    RWSplitSession *router_cli_ses = (RWSplitSession *)router_session;
 | 
						|
    CHK_CLIENT_RSES(router_cli_ses);
 | 
						|
 | 
						|
    if (!router_cli_ses->rses_closed)
 | 
						|
    {
 | 
						|
        router_cli_ses->rses_closed = true;
 | 
						|
        close_all_connections(router_cli_ses->backends);
 | 
						|
 | 
						|
        if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) &&
 | 
						|
            router_cli_ses->sescmd_list.size())
 | 
						|
        {
 | 
						|
            std::string sescmdstr;
 | 
						|
 | 
						|
            for (mxs::SessionCommandList::iterator it = router_cli_ses->sescmd_list.begin();
 | 
						|
                 it != router_cli_ses->sescmd_list.end(); it++)
 | 
						|
            {
 | 
						|
                mxs::SSessionCommand& scmd = *it;
 | 
						|
                sescmdstr += scmd->to_string();
 | 
						|
                sescmdstr += "\n";
 | 
						|
            }
 | 
						|
 | 
						|
            MXS_INFO("Executed session commands:\n%s", sescmdstr.c_str());
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Free a router session
 | 
						|
 *
 | 
						|
 * When a router session has been closed, freeSession can be called to free
 | 
						|
 * allocated resources.
 | 
						|
 *
 | 
						|
 * @param instance The router instance
 | 
						|
 * @param session  The router session
 | 
						|
 *
 | 
						|
 */
 | 
						|
static void freeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* session)
 | 
						|
{
 | 
						|
    RWSplitSession* rses = reinterpret_cast<RWSplitSession*>(session);
 | 
						|
    delete rses;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief The main routing entry point
 | 
						|
 *
 | 
						|
 * The routeQuery function will make the routing decision based on the contents
 | 
						|
 * of the instance, session and the query itself. The query always represents
 | 
						|
 * a complete MariaDB/MySQL packet because we define the RCAP_TYPE_STMT_INPUT in
 | 
						|
 * getCapabilities().
 | 
						|
 *
 | 
						|
 * @param instance       Router instance
 | 
						|
 * @param router_session Router session associated with the client
 | 
						|
 * @param querybuf       Buffer containing the query
 | 
						|
 *
 | 
						|
 * @return 1 on success, 0 on error
 | 
						|
 */
 | 
						|
static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *querybuf)
 | 
						|
{
 | 
						|
    RWSplit *inst = (RWSplit *) instance;
 | 
						|
    RWSplitSession *rses = (RWSplitSession *) router_session;
 | 
						|
    int rval = 0;
 | 
						|
 | 
						|
    CHK_CLIENT_RSES(rses);
 | 
						|
 | 
						|
    if (rses->rses_closed)
 | 
						|
    {
 | 
						|
        closed_session_reply(querybuf);
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        if (rses->query_queue == NULL &&
 | 
						|
            (rses->expected_responses == 0 ||
 | 
						|
             rses->load_data_state == LOAD_DATA_ACTIVE ||
 | 
						|
             rses->large_query))
 | 
						|
        {
 | 
						|
            /** Gather the information required to make routing decisions */
 | 
						|
            RouteInfo info(rses, querybuf);
 | 
						|
 | 
						|
            /** No active or pending queries */
 | 
						|
            if (route_single_stmt(inst, rses, querybuf, info))
 | 
						|
            {
 | 
						|
                rval = 1;
 | 
						|
            }
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            /**
 | 
						|
             * We are already processing a request from the client. Store the
 | 
						|
             * new query and wait for the previous one to complete.
 | 
						|
             */
 | 
						|
            ss_dassert(rses->expected_responses || rses->query_queue);
 | 
						|
            MXS_INFO("Storing query (len: %d cmd: %0x), expecting %d replies to current command",
 | 
						|
                     gwbuf_length(querybuf), GWBUF_DATA(querybuf)[4],
 | 
						|
                     rses->expected_responses);
 | 
						|
            rses->query_queue = gwbuf_append(rses->query_queue, querybuf);
 | 
						|
            querybuf = NULL;
 | 
						|
            rval = 1;
 | 
						|
            ss_dassert(rses->expected_responses > 0);
 | 
						|
 | 
						|
            if (rses->expected_responses == 0 && !route_stored_query(rses))
 | 
						|
            {
 | 
						|
                rval = 0;
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    if (querybuf != NULL)
 | 
						|
    {
 | 
						|
        gwbuf_free(querybuf);
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Diagnostics routine
 | 
						|
 *
 | 
						|
 * Print query router statistics to the DCB passed in
 | 
						|
 *
 | 
						|
 * @param instance The router instance
 | 
						|
 * @param dcb      The DCB for diagnostic output
 | 
						|
 */
 | 
						|
static void diagnostics(MXS_ROUTER *instance, DCB *dcb)
 | 
						|
{
 | 
						|
    RWSplit *router = (RWSplit *)instance;
 | 
						|
    const char *weightby = serviceGetWeightingParameter(router->service());
 | 
						|
    double master_pct = 0.0, slave_pct = 0.0, all_pct = 0.0;
 | 
						|
 | 
						|
    dcb_printf(dcb, "\n");
 | 
						|
    dcb_printf(dcb, "\tuse_sql_variables_in:      %s\n",
 | 
						|
               mxs_target_to_str(router->config().use_sql_variables_in));
 | 
						|
    dcb_printf(dcb, "\tslave_selection_criteria:  %s\n",
 | 
						|
               select_criteria_to_str(router->config().slave_selection_criteria));
 | 
						|
    dcb_printf(dcb, "\tmaster_failure_mode:       %s\n",
 | 
						|
               failure_mode_to_str(router->config().master_failure_mode));
 | 
						|
    dcb_printf(dcb, "\tmax_slave_replication_lag: %d\n",
 | 
						|
               router->config().max_slave_replication_lag);
 | 
						|
    dcb_printf(dcb, "\tretry_failed_reads:        %s\n",
 | 
						|
               router->config().retry_failed_reads ? "true" : "false");
 | 
						|
    dcb_printf(dcb, "\tstrict_multi_stmt:         %s\n",
 | 
						|
               router->config().strict_multi_stmt ? "true" : "false");
 | 
						|
    dcb_printf(dcb, "\tstrict_sp_calls:           %s\n",
 | 
						|
               router->config().strict_sp_calls ? "true" : "false");
 | 
						|
    dcb_printf(dcb, "\tdisable_sescmd_history:    %s\n",
 | 
						|
               router->config().disable_sescmd_history ? "true" : "false");
 | 
						|
    dcb_printf(dcb, "\tmax_sescmd_history:        %lu\n",
 | 
						|
               router->config().max_sescmd_history);
 | 
						|
    dcb_printf(dcb, "\tmaster_accept_reads:       %s\n",
 | 
						|
               router->config().master_accept_reads ? "true" : "false");
 | 
						|
    dcb_printf(dcb, "\n");
 | 
						|
 | 
						|
    if (router->stats().n_queries > 0)
 | 
						|
    {
 | 
						|
        master_pct = ((double)router->stats().n_master / (double)router->stats().n_queries) * 100.0;
 | 
						|
        slave_pct = ((double)router->stats().n_slave / (double)router->stats().n_queries) * 100.0;
 | 
						|
        all_pct = ((double)router->stats().n_all / (double)router->stats().n_queries) * 100.0;
 | 
						|
    }
 | 
						|
 | 
						|
    dcb_printf(dcb, "\tNumber of router sessions:           	%" PRIu64 "\n",
 | 
						|
               router->stats().n_sessions);
 | 
						|
    dcb_printf(dcb, "\tCurrent no. of router sessions:      	%d\n",
 | 
						|
               router->service()->stats.n_current);
 | 
						|
    dcb_printf(dcb, "\tNumber of queries forwarded:          	%" PRIu64 "\n",
 | 
						|
               router->stats().n_queries);
 | 
						|
    dcb_printf(dcb, "\tNumber of queries forwarded to master:	%" PRIu64 " (%.2f%%)\n",
 | 
						|
               router->stats().n_master, master_pct);
 | 
						|
    dcb_printf(dcb, "\tNumber of queries forwarded to slave: 	%" PRIu64 " (%.2f%%)\n",
 | 
						|
               router->stats().n_slave, slave_pct);
 | 
						|
    dcb_printf(dcb, "\tNumber of queries forwarded to all:   	%" PRIu64 " (%.2f%%)\n",
 | 
						|
               router->stats().n_all, all_pct);
 | 
						|
 | 
						|
    if (*weightby)
 | 
						|
    {
 | 
						|
        dcb_printf(dcb, "\tConnection distribution based on %s "
 | 
						|
                   "server parameter.\n",
 | 
						|
                   weightby);
 | 
						|
        dcb_printf(dcb, "\t\tServer               Target %%    Connections  "
 | 
						|
                   "Operations\n");
 | 
						|
        dcb_printf(dcb, "\t\t                               Global  Router\n");
 | 
						|
        for (SERVER_REF *ref = router->service()->dbref; ref; ref = ref->next)
 | 
						|
        {
 | 
						|
            dcb_printf(dcb, "\t\t%-20s %3.1f%%     %-6d  %-6d  %d\n",
 | 
						|
                       ref->server->unique_name, (float)ref->weight / 10,
 | 
						|
                       ref->server->stats.n_current, ref->connections,
 | 
						|
                       ref->server->stats.n_current_ops);
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief JSON diagnostics routine
 | 
						|
 *
 | 
						|
 * @param instance The router instance
 | 
						|
 * @param dcb      The DCB for diagnostic output
 | 
						|
 */
 | 
						|
static json_t* diagnostics_json(const MXS_ROUTER *instance)
 | 
						|
{
 | 
						|
    RWSplit *router = (RWSplit *)instance;
 | 
						|
 | 
						|
    json_t* rval = json_object();
 | 
						|
 | 
						|
    json_object_set_new(rval, "use_sql_variables_in",
 | 
						|
                        json_string(mxs_target_to_str(router->config().use_sql_variables_in)));
 | 
						|
    json_object_set_new(rval, "slave_selection_criteria",
 | 
						|
                        json_string(select_criteria_to_str(router->config().slave_selection_criteria)));
 | 
						|
    json_object_set_new(rval, "master_failure_mode",
 | 
						|
                        json_string(failure_mode_to_str(router->config().master_failure_mode)));
 | 
						|
    json_object_set_new(rval, "max_slave_replication_lag",
 | 
						|
                        json_integer(router->config().max_slave_replication_lag));
 | 
						|
    json_object_set_new(rval, "retry_failed_reads",
 | 
						|
                        json_boolean(router->config().retry_failed_reads));
 | 
						|
    json_object_set_new(rval, "strict_multi_stmt",
 | 
						|
                        json_boolean(router->config().strict_multi_stmt));
 | 
						|
    json_object_set_new(rval, "strict_sp_calls",
 | 
						|
                        json_boolean(router->config().strict_sp_calls));
 | 
						|
    json_object_set_new(rval, "disable_sescmd_history",
 | 
						|
                        json_boolean(router->config().disable_sescmd_history));
 | 
						|
    json_object_set_new(rval, "max_sescmd_history",
 | 
						|
                        json_integer(router->config().max_sescmd_history));
 | 
						|
    json_object_set_new(rval, "master_accept_reads",
 | 
						|
                        json_boolean(router->config().master_accept_reads));
 | 
						|
 | 
						|
 | 
						|
    json_object_set_new(rval, "connections", json_integer(router->stats().n_sessions));
 | 
						|
    json_object_set_new(rval, "current_connections", json_integer(router->service()->stats.n_current));
 | 
						|
    json_object_set_new(rval, "queries", json_integer(router->stats().n_queries));
 | 
						|
    json_object_set_new(rval, "route_master", json_integer(router->stats().n_master));
 | 
						|
    json_object_set_new(rval, "route_slave", json_integer(router->stats().n_slave));
 | 
						|
    json_object_set_new(rval, "route_all", json_integer(router->stats().n_all));
 | 
						|
 | 
						|
    const char *weightby = serviceGetWeightingParameter(router->service());
 | 
						|
 | 
						|
    if (*weightby)
 | 
						|
    {
 | 
						|
        json_object_set_new(rval, "weightby", json_string(weightby));
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * @brief Log a response that arrived when no response was expected
 *
 * An error packet is logged either as a killed connection (info level) or as
 * an unexpected error (warning level). Any other kind of response is logged
 * as an internal error and triggers a debug assertion.
 *
 * @param dcb    The backend DCB the response was read from
 * @param buffer Buffer containing the response
 */
static void log_unexpected_response(DCB* dcb, GWBUF* buffer)
{
    if (!mxs_mysql_is_err_packet(buffer))
    {
        MXS_ERROR("Unexpected internal state: received response 0x%02hhx from "
                  "server '%s' when no response was expected",
                  mxs_mysql_get_command(buffer), dcb->server->unique_name);
        ss_dassert(false);
        return;
    }

    /** This should be the only valid case where the server sends a response
     * without the client sending one first. MaxScale does not yet advertise
     * the progress reporting flag so we don't need to handle it. */
    uint8_t* data = GWBUF_DATA(buffer);
    size_t len = MYSQL_GET_PAYLOAD_LEN(data);
    uint16_t errcode = MYSQL_GET_ERRCODE(data);

    if (errcode == ER_CONNECTION_KILLED)
    {
        MXS_INFO("Connection from '%s'@'%s' to '%s' was killed",
                 dcb->session->client_dcb->user,
                 dcb->session->client_dcb->remote,
                 dcb->server->unique_name);
    }
    else
    {
        /** The text starts at offset 7 and the original code treats the
         * first three payload bytes as not being part of it. A payload
         * shorter than that would make the length negative, so it is
         * clamped to an empty message instead. */
        size_t msglen = len > 3 ? len - 3 : 0;
        std::string errstr((char*)data + 7, msglen);

        MXS_WARNING("Server '%s' sent an unexpected error: %hu, %s",
                    dcb->server->unique_name, errcode, errstr.c_str());
    }
}
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Client Reply routine
 | 
						|
 *
 | 
						|
 * @param   instance       The router instance
 | 
						|
 * @param   router_session The router session
 | 
						|
 * @param   backend_dcb    The backend DCB
 | 
						|
 * @param   queue          The Buffer containing the reply
 | 
						|
 */
 | 
						|
static void clientReply(MXS_ROUTER *instance,
 | 
						|
                        MXS_ROUTER_SESSION *router_session,
 | 
						|
                        GWBUF *writebuf,
 | 
						|
                        DCB *backend_dcb)
 | 
						|
{
 | 
						|
    RWSplitSession *rses = (RWSplitSession *)router_session;
 | 
						|
    DCB *client_dcb = backend_dcb->session->client_dcb;
 | 
						|
    CHK_CLIENT_RSES(rses);
 | 
						|
    ss_dassert(!rses->rses_closed);
 | 
						|
 | 
						|
    SRWBackend& backend = get_backend_from_dcb(rses, backend_dcb);
 | 
						|
 | 
						|
    if (rses->load_data_state == LOAD_DATA_ACTIVE && mxs_mysql_is_err_packet(writebuf))
 | 
						|
    {
 | 
						|
        // Server responded with an error to the LOAD DATA LOCAL INFILE
 | 
						|
        rses->load_data_state = LOAD_DATA_INACTIVE;
 | 
						|
    }
 | 
						|
 | 
						|
    if (backend->get_reply_state() == REPLY_STATE_DONE)
 | 
						|
    {
 | 
						|
        /** If we receive an unexpected response from the server, the internal
 | 
						|
         * logic cannot handle this situation. Routing the reply straight to
 | 
						|
         * the client should be the safest thing to do at this point. */
 | 
						|
        log_unexpected_response(backend_dcb, writebuf);
 | 
						|
        MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    if (session_have_stmt(backend_dcb->session))
 | 
						|
    {
 | 
						|
        /** Statement was successfully executed, free the stored statement */
 | 
						|
        session_clear_stmt(backend_dcb->session);
 | 
						|
    }
 | 
						|
 | 
						|
    if (reply_is_complete(backend, writebuf))
 | 
						|
    {
 | 
						|
        /** Got a complete reply, acknowledge the write and decrement expected response count */
 | 
						|
        backend->ack_write();
 | 
						|
        rses->expected_responses--;
 | 
						|
        ss_dassert(rses->expected_responses >= 0);
 | 
						|
        ss_dassert(backend->get_reply_state() == REPLY_STATE_DONE);
 | 
						|
        MXS_INFO("Reply complete, last reply from %s", backend->name());
 | 
						|
    }
 | 
						|
    else
 | 
						|
    {
 | 
						|
        MXS_INFO("Reply not yet complete. Waiting for %d replies, got one from %s",
 | 
						|
                 rses->expected_responses, backend->name());
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * Active cursor means that reply is from session command
 | 
						|
     * execution.
 | 
						|
     */
 | 
						|
    if (backend->session_command_count())
 | 
						|
    {
 | 
						|
        /** This discards all responses that have already been sent to the client */
 | 
						|
        bool rconn = false;
 | 
						|
        process_sescmd_response(rses, backend, &writebuf, &rconn);
 | 
						|
 | 
						|
        if (rconn && !rses->router->config().disable_sescmd_history)
 | 
						|
        {
 | 
						|
            select_connect_backend_servers(
 | 
						|
                rses->rses_nbackends,
 | 
						|
                rses->rses_config.max_slave_connections,
 | 
						|
                rses->client_dcb->session,
 | 
						|
                rses->router->config(), rses->backends, rses->current_master,
 | 
						|
                &rses->sescmd_list, &rses->expected_responses,
 | 
						|
                connection_type::SLAVE);
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    bool queue_routed = false;
 | 
						|
 | 
						|
    if (rses->expected_responses == 0 && rses->query_queue)
 | 
						|
    {
 | 
						|
        queue_routed = true;
 | 
						|
        route_stored_query(rses);
 | 
						|
    }
 | 
						|
 | 
						|
    if (writebuf)
 | 
						|
    {
 | 
						|
        ss_dassert(client_dcb);
 | 
						|
        /** Write reply to client DCB */
 | 
						|
        MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
 | 
						|
    }
 | 
						|
    /** Check pending session commands */
 | 
						|
    else if (!queue_routed && backend->session_command_count())
 | 
						|
    {
 | 
						|
        MXS_DEBUG("Backend %s processed reply and starts to execute active cursor.",
 | 
						|
                  backend->uri());
 | 
						|
 | 
						|
        if (backend->execute_session_command())
 | 
						|
        {
 | 
						|
            rses->expected_responses++;
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Get router capabilities
 | 
						|
 */
 | 
						|
static uint64_t getCapabilities(MXS_ROUTER* instance)
 | 
						|
{
 | 
						|
    return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * @brief Router error handling routine
 *
 * Error Handler routine to resolve backend failures. If it succeeds then
 * there are enough operative backends available and connected. Otherwise it
 * fails, and session is terminated.
 *
 * @param instance       The router instance
 * @param router_session The router session
 * @param errmsgbuf      The error message to reply
 * @param problem_dcb    The backend DCB on which the error occurred
 * @param action         The action: ERRACT_NEW_CONNECTION or
 *                       ERRACT_REPLY_CLIENT
 * @param succp          Result of action: true if router can continue
 */
 | 
						|
static void handleError(MXS_ROUTER *instance,
 | 
						|
                        MXS_ROUTER_SESSION *router_session,
 | 
						|
                        GWBUF *errmsgbuf,
 | 
						|
                        DCB *problem_dcb,
 | 
						|
                        mxs_error_action_t action,
 | 
						|
                        bool *succp)
 | 
						|
{
 | 
						|
    ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER);
 | 
						|
    RWSplit *inst = (RWSplit *)instance;
 | 
						|
    RWSplitSession *rses = (RWSplitSession *)router_session;
 | 
						|
    CHK_CLIENT_RSES(rses);
 | 
						|
    CHK_DCB(problem_dcb);
 | 
						|
 | 
						|
    if (rses->rses_closed)
 | 
						|
    {
 | 
						|
        *succp = false;
 | 
						|
        return;
 | 
						|
    }
 | 
						|
 | 
						|
    MXS_SESSION *session = problem_dcb->session;
 | 
						|
    ss_dassert(session);
 | 
						|
 | 
						|
    SRWBackend& backend = get_backend_from_dcb(rses, problem_dcb);
 | 
						|
 | 
						|
    switch (action)
 | 
						|
    {
 | 
						|
    case ERRACT_NEW_CONNECTION:
 | 
						|
        {
 | 
						|
            if (rses->current_master && rses->current_master->in_use() &&
 | 
						|
                rses->current_master->dcb() == problem_dcb)
 | 
						|
            {
 | 
						|
                /** The connection to the master has failed */
 | 
						|
                SERVER *srv = rses->current_master->server();
 | 
						|
                bool can_continue = false;
 | 
						|
 | 
						|
                if (rses->rses_config.master_failure_mode != RW_FAIL_INSTANTLY &&
 | 
						|
                    (!backend || !backend->is_waiting_result()))
 | 
						|
                {
 | 
						|
                    /** The failure of a master is not considered a critical
 | 
						|
                     * failure as partial functionality still remains. Reads
 | 
						|
                     * are allowed as long as slave servers are available
 | 
						|
                     * and writes will cause an error to be returned.
 | 
						|
                     *
 | 
						|
                     * If we were waiting for a response from the master, we
 | 
						|
                     * can't be sure whether it was executed or not. In this
 | 
						|
                     * case the safest thing to do is to close the client
 | 
						|
                     * connection. */
 | 
						|
                    can_continue = true;
 | 
						|
                }
 | 
						|
                else if (!SERVER_IS_MASTER(srv) && !srv->master_err_is_logged)
 | 
						|
                {
 | 
						|
                    MXS_ERROR("Server %s:%d lost the master status. Readwritesplit "
 | 
						|
                              "service can't locate the master. Client sessions "
 | 
						|
                              "will be closed.", srv->name, srv->port);
 | 
						|
                    srv->master_err_is_logged = true;
 | 
						|
                }
 | 
						|
 | 
						|
                *succp = can_continue;
 | 
						|
 | 
						|
                if (backend)
 | 
						|
                {
 | 
						|
                    backend->close(mxs::Backend::CLOSE_FATAL);
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    MXS_ERROR("Server %s:%d lost the master status but could not locate the "
 | 
						|
                              "corresponding backend ref.", srv->name, srv->port);
 | 
						|
                }
 | 
						|
            }
 | 
						|
            else if (backend)
 | 
						|
            {
 | 
						|
                if (rses->target_node &&
 | 
						|
                    (rses->target_node->dcb() == problem_dcb &&
 | 
						|
                     session_trx_is_read_only(problem_dcb->session)))
 | 
						|
                {
 | 
						|
                    /** The problem DCB is the current target of a READ ONLY transaction.
 | 
						|
                     * Reset the target and close the session. */
 | 
						|
                    rses->target_node.reset();
 | 
						|
                    *succp = false;
 | 
						|
                }
 | 
						|
                else
 | 
						|
                {
 | 
						|
                    /** Try to replace the failed connection with a new one */
 | 
						|
                    *succp = handle_error_new_connection(inst, &rses, problem_dcb, errmsgbuf);
 | 
						|
                }
 | 
						|
            }
 | 
						|
 | 
						|
            check_and_log_backend_state(backend, problem_dcb);
 | 
						|
            break;
 | 
						|
        }
 | 
						|
 | 
						|
    case ERRACT_REPLY_CLIENT:
 | 
						|
        {
 | 
						|
            handle_error_reply_client(session, rses, problem_dcb, errmsgbuf);
 | 
						|
            *succp = false; /*< no new backend servers were made available */
 | 
						|
            break;
 | 
						|
        }
 | 
						|
 | 
						|
    default:
 | 
						|
        ss_dassert(!true);
 | 
						|
        *succp = false;
 | 
						|
        break;
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
MXS_BEGIN_DECLS
 | 
						|
 | 
						|
/**
 | 
						|
 * The module entry point routine. It is this routine that must return
 | 
						|
 * the structure that is referred to as the "module object". This is a
 | 
						|
 * structure with the set of external entry points for this module.
 | 
						|
 */
 | 
						|
MXS_MODULE *MXS_CREATE_MODULE()
 | 
						|
{
 | 
						|
    static MXS_ROUTER_OBJECT MyObject =
 | 
						|
    {
 | 
						|
        createInstance,
 | 
						|
        newSession,
 | 
						|
        closeSession,
 | 
						|
        freeSession,
 | 
						|
        routeQuery,
 | 
						|
        diagnostics,
 | 
						|
        diagnostics_json,
 | 
						|
        clientReply,
 | 
						|
        handleError,
 | 
						|
        getCapabilities,
 | 
						|
        NULL
 | 
						|
    };
 | 
						|
 | 
						|
    static MXS_MODULE info =
 | 
						|
    {
 | 
						|
        MXS_MODULE_API_ROUTER, MXS_MODULE_GA, MXS_ROUTER_VERSION,
 | 
						|
        "A Read/Write splitting router for enhancement read scalability",
 | 
						|
        "V1.1.0",
 | 
						|
        RCAP_TYPE_STMT_INPUT | RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_PACKET_OUTPUT,
 | 
						|
        &MyObject,
 | 
						|
        NULL, /* Process init. */
 | 
						|
        NULL, /* Process finish. */
 | 
						|
        NULL, /* Thread init. */
 | 
						|
        NULL, /* Thread finish. */
 | 
						|
        {
 | 
						|
            {
 | 
						|
                "use_sql_variables_in",
 | 
						|
                MXS_MODULE_PARAM_ENUM,
 | 
						|
                "all",
 | 
						|
                MXS_MODULE_OPT_NONE,
 | 
						|
                use_sql_variables_in_values
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "slave_selection_criteria",
 | 
						|
                MXS_MODULE_PARAM_ENUM,
 | 
						|
                "LEAST_CURRENT_OPERATIONS",
 | 
						|
                MXS_MODULE_OPT_NONE,
 | 
						|
                slave_selection_criteria_values
 | 
						|
            },
 | 
						|
            {
 | 
						|
                "master_failure_mode",
 | 
						|
                MXS_MODULE_PARAM_ENUM,
 | 
						|
                "fail_instantly",
 | 
						|
                MXS_MODULE_OPT_NONE,
 | 
						|
                master_failure_mode_values
 | 
						|
            },
 | 
						|
            {"max_slave_replication_lag", MXS_MODULE_PARAM_INT, "-1"},
 | 
						|
            {"max_slave_connections", MXS_MODULE_PARAM_STRING, MAX_SLAVE_COUNT},
 | 
						|
            {"retry_failed_reads", MXS_MODULE_PARAM_BOOL, "true"},
 | 
						|
            {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "true"},
 | 
						|
            {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"},
 | 
						|
            {"strict_multi_stmt",  MXS_MODULE_PARAM_BOOL, "false"},
 | 
						|
            {"strict_sp_calls",  MXS_MODULE_PARAM_BOOL, "false"},
 | 
						|
            {"master_accept_reads", MXS_MODULE_PARAM_BOOL, "false"},
 | 
						|
            {"connection_keepalive", MXS_MODULE_PARAM_COUNT, "0"},
 | 
						|
            {MXS_END_MODULE_PARAMS}
 | 
						|
        }
 | 
						|
    };
 | 
						|
 | 
						|
    MXS_NOTICE("Initializing statement-based read/write split router module.");
 | 
						|
    return &info;
 | 
						|
}
 | 
						|
 | 
						|
MXS_END_DECLS
 |