402 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			402 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 * Copyright (c) 2018 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2023-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */
#pragma once

#include "readwritesplit.hh"
#include "trx.hh"

#include <algorithm>
#include <deque>
#include <list>
#include <map>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include <maxscale/buffer.hh>
#include <maxscale/modutil.hh>
#include <maxscale/queryclassifier.hh>
#include <maxscale/protocol/rwbackend.hh>

// Shorthands for the route-target predicates provided by maxscale::QueryClassifier.
#define TARGET_IS_MASTER(t)       maxscale::QueryClassifier::target_is_master(t)
#define TARGET_IS_SLAVE(t)        maxscale::QueryClassifier::target_is_slave(t)
#define TARGET_IS_NAMED_SERVER(t) maxscale::QueryClassifier::target_is_named_server(t)
#define TARGET_IS_ALL(t)          maxscale::QueryClassifier::target_is_all(t)
#define TARGET_IS_RLAG_MAX(t)     maxscale::QueryClassifier::target_is_rlag_max(t)
#define TARGET_IS_LAST_USED(t)    maxscale::QueryClassifier::target_is_last_used(t)

typedef std::map<uint32_t, uint32_t> ClientHandleMap;   /** External ID to internal ID */
 | 
						|
 | 
						|
typedef std::unordered_set<std::string> TableSet;
 | 
						|
typedef std::map<uint64_t, uint8_t>     ResponseMap;
 | 
						|
 | 
						|
/** List of slave responses that arrived before the master */
 | 
						|
typedef std::list<std::pair<mxs::RWBackend*, uint8_t>> SlaveResponseList;
 | 
						|
 | 
						|
/** Map of COM_STMT_EXECUTE targets by internal ID */
 | 
						|
typedef std::unordered_map<uint32_t, mxs::RWBackend*> ExecMap;
 | 
						|
 | 
						|
/**
 | 
						|
 * The client session of a RWSplit instance
 | 
						|
 */
 | 
						|
class RWSplitSession : public mxs::RouterSession
 | 
						|
                     , private mxs::QueryClassifier::Handler
 | 
						|
{
 | 
						|
    RWSplitSession(const RWSplitSession&) = delete;
 | 
						|
    RWSplitSession& operator=(const RWSplitSession&) = delete;
 | 
						|
 | 
						|
public:
 | 
						|
    enum
 | 
						|
    {
 | 
						|
        TARGET_UNDEFINED    = maxscale::QueryClassifier::TARGET_UNDEFINED,
 | 
						|
        TARGET_MASTER       = maxscale::QueryClassifier::TARGET_MASTER,
 | 
						|
        TARGET_SLAVE        = maxscale::QueryClassifier::TARGET_SLAVE,
 | 
						|
        TARGET_NAMED_SERVER = maxscale::QueryClassifier::TARGET_NAMED_SERVER,
 | 
						|
        TARGET_ALL          = maxscale::QueryClassifier::TARGET_ALL,
 | 
						|
        TARGET_RLAG_MAX     = maxscale::QueryClassifier::TARGET_RLAG_MAX,
 | 
						|
        TARGET_LAST_USED    = maxscale::QueryClassifier::TARGET_LAST_USED,
 | 
						|
    };
 | 
						|
 | 
						|
    enum otrx_state
 | 
						|
    {
 | 
						|
        OTRX_INACTIVE,  // No open transactions
 | 
						|
        OTRX_STARTING,  // Transaction starting on slave
 | 
						|
        OTRX_ACTIVE,    // Transaction open on a slave server
 | 
						|
        OTRX_ROLLBACK   // Transaction being rolled back on the slave server
 | 
						|
    };
 | 
						|
 | 
						|
    enum wait_gtid_state
 | 
						|
    {
 | 
						|
        NONE,
 | 
						|
        WAITING_FOR_HEADER,
 | 
						|
        RETRYING_ON_MASTER,
 | 
						|
        UPDATING_PACKETS
 | 
						|
    };
 | 
						|
 | 
						|
    virtual ~RWSplitSession()
 | 
						|
    {
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * Create a new router session
 | 
						|
     *
 | 
						|
     * @param instance Router instance
 | 
						|
     * @param session  The session object
 | 
						|
     *
 | 
						|
     * @return New router session
 | 
						|
     */
 | 
						|
    static RWSplitSession* create(RWSplit* router, MXS_SESSION* session);
 | 
						|
 | 
						|
    /**
 | 
						|
     * Called when a client session has been closed.
 | 
						|
     */
 | 
						|
    void close();
 | 
						|
 | 
						|
    /**
 | 
						|
     * Called when a packet being is routed to the backend. The router should
 | 
						|
     * forward the packet to the appropriate server(s).
 | 
						|
     *
 | 
						|
     * @param pPacket A client packet.
 | 
						|
     */
 | 
						|
    int32_t routeQuery(GWBUF* pPacket);
 | 
						|
 | 
						|
    /**
 | 
						|
     * Called when a packet is routed to the client. The router should
 | 
						|
     * forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`.
 | 
						|
     *
 | 
						|
     * @param pPacket  A client packet.
 | 
						|
     * @param pBackend The backend the packet is coming from.
 | 
						|
     */
 | 
						|
    void clientReply(GWBUF* pPacket, DCB* pBackend);
 | 
						|
 | 
						|
    /**
 | 
						|
     *
 | 
						|
     * @param pMessage  The error message.
 | 
						|
     * @param pProblem  The DCB on which the error occurred.
 | 
						|
     * @param action    The context.
 | 
						|
     * @param pSuccess  On output, if false, the session will be terminated.
 | 
						|
     */
 | 
						|
    void handleError(GWBUF* pMessage,
 | 
						|
                     DCB* pProblem,
 | 
						|
                     mxs_error_action_t action,
 | 
						|
                     bool* pSuccess);
 | 
						|
 | 
						|
    mxs::QueryClassifier& qc()
 | 
						|
    {
 | 
						|
        return m_qc;
 | 
						|
    }
 | 
						|
 | 
						|
private:
 | 
						|
    RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends);
 | 
						|
 | 
						|
    bool open_connections();
 | 
						|
    void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket);
 | 
						|
    void compress_history(mxs::SSessionCommand& sescmd);
 | 
						|
 | 
						|
    void prune_to_position(uint64_t pos);
 | 
						|
    bool route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type);
 | 
						|
    void continue_large_session_write(GWBUF* querybuf, uint32_t type);
 | 
						|
    bool route_single_stmt(GWBUF* querybuf);
 | 
						|
    bool route_stored_query();
 | 
						|
    void close_stale_connections();
 | 
						|
 | 
						|
    int64_t         get_current_rank();
 | 
						|
    mxs::RWBackend* get_hinted_backend(const char* name);
 | 
						|
    mxs::RWBackend* get_slave_backend(int max_rlag);
 | 
						|
    mxs::RWBackend* get_master_backend();
 | 
						|
    mxs::RWBackend* get_last_used_backend();
 | 
						|
    mxs::RWBackend* get_target_backend(backend_type_t btype, const char* name, int max_rlag);
 | 
						|
 | 
						|
    bool handle_target_is_all(route_target_t route_target,
 | 
						|
                              GWBUF* querybuf,
 | 
						|
                              int packet_type,
 | 
						|
                              uint32_t qtype);
 | 
						|
    mxs::RWBackend* handle_hinted_target(GWBUF* querybuf, route_target_t route_target);
 | 
						|
    mxs::RWBackend* handle_slave_is_target(uint8_t cmd, uint32_t stmt_id);
 | 
						|
    bool            handle_master_is_target(mxs::RWBackend** dest);
 | 
						|
    bool            handle_got_target(GWBUF* querybuf, mxs::RWBackend* target, bool store);
 | 
						|
    void            handle_connection_keepalive(mxs::RWBackend* target);
 | 
						|
    bool            prepare_target(mxs::RWBackend* target, route_target_t route_target);
 | 
						|
    bool            prepare_connection(mxs::RWBackend* target);
 | 
						|
    bool            create_one_connection();
 | 
						|
    void            retry_query(GWBUF* querybuf, int delay = 1);
 | 
						|
 | 
						|
    bool trx_is_starting();
 | 
						|
    bool should_replace_master(mxs::RWBackend* target);
 | 
						|
    void replace_master(mxs::RWBackend* target);
 | 
						|
    bool should_migrate_trx(mxs::RWBackend* target);
 | 
						|
    bool start_trx_migration(mxs::RWBackend* target, GWBUF* querybuf);
 | 
						|
    void log_master_routing_failure(bool found,
 | 
						|
                                    mxs::RWBackend* old_master,
 | 
						|
                                    mxs::RWBackend* curr_master);
 | 
						|
 | 
						|
    // Send unknown prepared statement ID error to client
 | 
						|
    bool send_unknown_ps_error(uint32_t stmt_id);
 | 
						|
 | 
						|
    GWBUF* handle_causal_read_reply(GWBUF* writebuf, mxs::RWBackend* backend);
 | 
						|
    GWBUF* add_prefix_wait_gtid(SERVER* server, GWBUF* origin);
 | 
						|
    void   correct_packet_sequence(GWBUF* buffer);
 | 
						|
    GWBUF* discard_master_wait_gtid_result(GWBUF* buffer);
 | 
						|
 | 
						|
    int             get_max_replication_lag();
 | 
						|
    mxs::RWBackend* get_backend_from_dcb(DCB* dcb);
 | 
						|
 | 
						|
    bool retry_master_query(mxs::RWBackend* backend);
 | 
						|
    void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg);
 | 
						|
    bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg);
 | 
						|
    void manage_transactions(mxs::RWBackend* backend, GWBUF* writebuf);
 | 
						|
 | 
						|
    void trx_replay_next_stmt();
 | 
						|
 | 
						|
    // Do we have at least one open slave connection
 | 
						|
    bool have_connected_slaves() const;
 | 
						|
 | 
						|
    /**
 | 
						|
     * Start the replaying of the latest transaction
 | 
						|
     *
 | 
						|
     * @return True if the session can continue. False if the session must be closed.
 | 
						|
     */
 | 
						|
    bool start_trx_replay();
 | 
						|
 | 
						|
    /**
 | 
						|
     * See if the transaction could be done on a slave
 | 
						|
     *
 | 
						|
     * @param route_target Target where the query is routed
 | 
						|
     *
 | 
						|
     * @return True if the query can be attempted on a slave
 | 
						|
     */
 | 
						|
    bool should_try_trx_on_slave(route_target_t route_target) const;
 | 
						|
 | 
						|
    /**
 | 
						|
     * Track optimistic transaction status
 | 
						|
     *
 | 
						|
     * Tracks the progress of the optimistic transaction and starts the rollback
 | 
						|
     * procedure if the transaction turns out to be one that modifies data.
 | 
						|
     *
 | 
						|
     * @param buffer     Current query
 | 
						|
     *
 | 
						|
     * @return Whether the current statement should be stored for the duration of the query
 | 
						|
     */
 | 
						|
    bool track_optimistic_trx(GWBUF** buffer);
 | 
						|
 | 
						|
private:
 | 
						|
    // QueryClassifier::Handler
 | 
						|
    bool lock_to_master();
 | 
						|
    bool is_locked_to_master() const;
 | 
						|
    bool supports_hint(HINT_TYPE hint_type) const;
 | 
						|
 | 
						|
    inline bool can_retry_query() const
 | 
						|
    {
 | 
						|
        /** Individual queries can only be retried if we are not inside
 | 
						|
         * a transaction. If a query in a transaction needs to be retried,
 | 
						|
         * the whole transaction must be replayed before the retrying is done.
 | 
						|
         *
 | 
						|
         * @see handle_trx_replay
 | 
						|
         */
 | 
						|
        return m_config.delayed_retry
 | 
						|
               && m_retry_duration < m_config.delayed_retry_timeout
 | 
						|
               && !session_trx_is_active(m_client->session);
 | 
						|
    }
 | 
						|
 | 
						|
    // Whether a transaction replay can remain active
 | 
						|
    inline bool can_continue_trx_replay() const
 | 
						|
    {
 | 
						|
        return m_is_replay_active && m_retry_duration < m_config.delayed_retry_timeout;
 | 
						|
    }
 | 
						|
 | 
						|
    inline bool can_recover_servers() const
 | 
						|
    {
 | 
						|
        return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
 | 
						|
    }
 | 
						|
 | 
						|
    inline bool can_continue_session() const
 | 
						|
    {
 | 
						|
        return std::any_of(m_raw_backends.begin(), m_raw_backends.end(), [](mxs::RWBackend* b) {
 | 
						|
                               return b->in_use();
 | 
						|
                           });
 | 
						|
    }
 | 
						|
 | 
						|
    inline bool is_large_query(GWBUF* buf)
 | 
						|
    {
 | 
						|
        uint32_t buflen = gwbuf_length(buf);
 | 
						|
 | 
						|
        // The buffer should contain at most (2^24 - 1) + 4 bytes ...
 | 
						|
        mxb_assert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN);
 | 
						|
        // ... and the payload should be buflen - 4 bytes
 | 
						|
        mxb_assert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN);
 | 
						|
 | 
						|
        return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
 | 
						|
    }
 | 
						|
 | 
						|
    inline bool can_route_queries() const
 | 
						|
    {
 | 
						|
        return m_expected_responses == 0
 | 
						|
               || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE
 | 
						|
               || m_qc.large_query();
 | 
						|
    }
 | 
						|
 | 
						|
    inline mxs::QueryClassifier::current_target_t get_current_target() const
 | 
						|
    {
 | 
						|
        mxs::QueryClassifier::current_target_t current_target;
 | 
						|
 | 
						|
        if (m_target_node == NULL)
 | 
						|
        {
 | 
						|
            current_target = mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED;
 | 
						|
        }
 | 
						|
        else if (m_target_node == m_current_master)
 | 
						|
        {
 | 
						|
            current_target = mxs::QueryClassifier::CURRENT_TARGET_MASTER;
 | 
						|
        }
 | 
						|
        else
 | 
						|
        {
 | 
						|
            current_target = mxs::QueryClassifier::CURRENT_TARGET_SLAVE;
 | 
						|
        }
 | 
						|
 | 
						|
        return current_target;
 | 
						|
    }
 | 
						|
 | 
						|
    void update_trx_statistics()
 | 
						|
    {
 | 
						|
        if (session_trx_is_ending(m_client->session))
 | 
						|
        {
 | 
						|
            mxb::atomic::add(m_qc.is_trx_still_read_only() ?
 | 
						|
                             &m_router->stats().n_ro_trx :
 | 
						|
                             &m_router->stats().n_rw_trx,
 | 
						|
                             1,
 | 
						|
                             mxb::atomic::RELAXED);
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    mxs::SRWBackends m_backends;                /**< Mem. management, not for use outside RWSplitSession */
 | 
						|
    mxs::PRWBackends m_raw_backends;            /**< Backend pointers for use in interfaces . */
 | 
						|
    mxs::RWBackend*  m_current_master;          /**< Current master server */
 | 
						|
    mxs::RWBackend*  m_target_node;             /**< The currently locked target node */
 | 
						|
    mxs::RWBackend*  m_prev_target;             /**< The previous target where a query was sent */
 | 
						|
    Config           m_config;                  /**< Configuration for this session */
 | 
						|
    int              m_last_keepalive_check;    /**< When the last ping was done */
 | 
						|
    int              m_nbackends;               /**< Number of backend servers (obsolete) */
 | 
						|
    DCB*             m_client;                  /**< The client DCB */
 | 
						|
    uint64_t         m_sescmd_count;            /**< Number of executed session commands (starts from 1) */
 | 
						|
    int              m_expected_responses;      /**< Number of expected responses to the current
 | 
						|
                                                 * query */
 | 
						|
    std::deque<mxs::Buffer> m_query_queue;      /**< Queued commands waiting to be executed */
 | 
						|
    RWSplit*                m_router;           /**< The router instance */
 | 
						|
    mxs::SessionCommandList m_sescmd_list;      /**< List of executed session commands */
 | 
						|
    ResponseMap             m_sescmd_responses; /**< Response to each session command */
 | 
						|
    SlaveResponseList       m_slave_responses;  /**< Slaves that replied before the master */
 | 
						|
    uint64_t                m_sent_sescmd;      /**< ID of the last sent session command*/
 | 
						|
    uint64_t                m_recv_sescmd;      /**< ID of the most recently completed session
 | 
						|
                                                 * command */
 | 
						|
    ExecMap m_exec_map;                         /**< Map of COM_STMT_EXECUTE statement IDs to
 | 
						|
                                                 * Backends */
 | 
						|
    std::string          m_gtid_pos;            /**< Gtid position for causal read */
 | 
						|
    wait_gtid_state      m_wait_gtid;           /**< State of MASTER_GTID_WAIT reply */
 | 
						|
    uint32_t             m_next_seq;            /**< Next packet's sequence number */
 | 
						|
    mxs::QueryClassifier m_qc;                  /**< The query classifier. */
 | 
						|
    uint64_t             m_retry_duration;      /**< Total time spent retrying queries */
 | 
						|
    mxs::Buffer          m_current_query;       /**< Current query being executed */
 | 
						|
    Trx                  m_trx;                 /**< Current transaction */
 | 
						|
    bool                 m_is_replay_active;    /**< Whether we are actively replaying a
 | 
						|
                                                 * transaction */
 | 
						|
    bool        m_can_replay_trx;               /**< Whether the transaction can be replayed */
 | 
						|
    Trx         m_replayed_trx;                 /**< The transaction we are replaying */
 | 
						|
    mxs::Buffer m_interrupted_query;            /**< Query that was interrupted mid-transaction. */
 | 
						|
    Trx         m_orig_trx;                     /**< The backup of the transaction we're replaying */
 | 
						|
    mxs::Buffer m_orig_stmt;                    /**< The backup of the statement that was interrupted */
 | 
						|
    int64_t     m_num_trx_replays = 0;          /**< How many times trx replay has been attempted */
 | 
						|
 | 
						|
    otrx_state m_otrx_state = OTRX_INACTIVE;    /**< Optimistic trx state*/
 | 
						|
 | 
						|
    SrvStatMap& m_server_stats;     /**< The server stats local to this thread, cached in the session object.
 | 
						|
                                     * This avoids the lookup involved in getting the worker-local value from
 | 
						|
                                     * the worker's container.*/
 | 
						|
};
 | 
						|
 | 
						|
/**
 | 
						|
 * @brief Get the internal ID for the given binary prepared statement
 | 
						|
 *
 | 
						|
 * @param rses   Router client session
 | 
						|
 * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE
 | 
						|
 *
 | 
						|
 * @return The internal ID of the prepared statement that the buffer contents refer to
 | 
						|
 */
 | 
						|
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);
 | 
						|
 | 
						|
/**
 * Convert a route target into a readable name, for logging.
 *
 * The checks run in a fixed order (master, slave, named server, all, rlag max,
 * last used), and the name of the first predicate that matches is returned.
 *
 * @param target The route target to describe
 *
 * @return Static string naming the target; "Unknown target value" if no
 *         predicate matches (that case also trips mxb_assert)
 */
static inline const char* route_target_to_string(route_target_t target)
{
    if (TARGET_IS_MASTER(target))
    {
        return "TARGET_MASTER";
    }
    else if (TARGET_IS_SLAVE(target))
    {
        return "TARGET_SLAVE";
    }
    else if (TARGET_IS_NAMED_SERVER(target))
    {
        return "TARGET_NAMED_SERVER";
    }
    else if (TARGET_IS_ALL(target))
    {
        return "TARGET_ALL";
    }
    else if (TARGET_IS_RLAG_MAX(target))
    {
        return "TARGET_RLAG_MAX";
    }
    else if (TARGET_IS_LAST_USED(target))
    {
        return "TARGET_LAST_USED";
    }
    else
    {
        // A string literal is non-null, so this assertion always fails, but now with a readable message.
        mxb_assert(!"Unknown target value");
        return "Unknown target value";
    }
}