/*
 * Copyright (c) 2018 MariaDB Corporation Ab
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 *
 * Change Date: 2025-02-16
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2 or later of the General
 * Public License.
 */

/*
 * Declarations for the readwritesplit router's per-client session object
 * (RWSplitSession) and a few related helpers.
 */

#pragma once

#include "readwritesplit.hh"
#include "trx.hh"

// NOTE(review): the system/library header names below (and the template
// arguments of several typedefs/members in this file, e.g. `std::list>`)
// appear to have been stripped from this copy -- restore them from upstream.
#include
#include
#include
#include
#include
#include

// Shorthands for the QueryClassifier route-target predicates.
#define TARGET_IS_MASTER(t)       maxscale::QueryClassifier::target_is_master(t)
#define TARGET_IS_SLAVE(t)        maxscale::QueryClassifier::target_is_slave(t)
#define TARGET_IS_NAMED_SERVER(t) maxscale::QueryClassifier::target_is_named_server(t)
#define TARGET_IS_ALL(t)          maxscale::QueryClassifier::target_is_all(t)
#define TARGET_IS_RLAG_MAX(t)     maxscale::QueryClassifier::target_is_rlag_max(t)
#define TARGET_IS_LAST_USED(t)    maxscale::QueryClassifier::target_is_last_used(t)

/** External ID to internal ID */
typedef std::map ClientHandleMap;

typedef std::unordered_set TableSet;

/** Responses to session commands, see RWSplitSession::m_sescmd_responses */
typedef std::map ResponseMap;

/** List of slave responses that arrived before the master */
typedef std::list> SlaveResponseList;

/** Map of COM_STMT_EXECUTE targets by internal ID */
typedef std::unordered_map ExecMap;

/**
 * The client session of a RWSplit instance
 *
 * Non-copyable. Instances are created through RWSplitSession::create(); the
 * constructor itself is private.
 */
class RWSplitSession : public mxs::RouterSession, private mxs::QueryClassifier::Handler
{
    RWSplitSession(const RWSplitSession&) = delete;
    RWSplitSession& operator=(const RWSplitSession&) = delete;

public:
    // Local aliases of the QueryClassifier route-target values, usable as
    // RWSplitSession::TARGET_*.
    enum
    {
        TARGET_UNDEFINED    = maxscale::QueryClassifier::TARGET_UNDEFINED,
        TARGET_MASTER       = maxscale::QueryClassifier::TARGET_MASTER,
        TARGET_SLAVE        = maxscale::QueryClassifier::TARGET_SLAVE,
        TARGET_NAMED_SERVER = maxscale::QueryClassifier::TARGET_NAMED_SERVER,
        TARGET_ALL          = maxscale::QueryClassifier::TARGET_ALL,
        TARGET_RLAG_MAX     = maxscale::QueryClassifier::TARGET_RLAG_MAX,
        TARGET_LAST_USED    = maxscale::QueryClassifier::TARGET_LAST_USED,
    };

    // State of an optimistic transaction (one started on a slave), stored in
    // m_otrx_state.
    enum otrx_state
    {
        OTRX_INACTIVE,  // No open transactions
        OTRX_STARTING,  // Transaction starting on slave
        OTRX_ACTIVE,    // Transaction open on a slave server
        OTRX_ROLLBACK   // Transaction being rolled back on the slave server
    };

    // State of the MASTER_GTID_WAIT (causal read) reply handling, stored in
    // m_wait_gtid.
    enum wait_gtid_state
    {
        NONE,
        WAITING_FOR_HEADER,
        RETRYING_ON_MASTER,
        UPDATING_PACKETS
    };

    virtual ~RWSplitSession()
    {
    }

    /**
     * Create a new router session
     *
     * @param router  Router instance
     * @param session The session object
     *
     * @return New router session
     */
    static RWSplitSession* create(RWSplit* router, MXS_SESSION* session);

    /**
     * Called when a client session has been closed.
     */
    void close();

    /**
     * Called when a packet is routed to the backend. The router should
     * forward the packet to the appropriate server(s).
     *
     * @param pPacket A client packet.
     *
     * @return NOTE(review): presumably non-zero on success and zero on
     *         failure -- confirm against the implementation.
     */
    int32_t routeQuery(GWBUF* pPacket);

    /**
     * Called when a packet is routed to the client. The router should
     * forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`.
     *
     * @param pPacket  The reply packet received from the backend.
     * @param pBackend The backend the packet is coming from.
     */
    void clientReply(GWBUF* pPacket, DCB* pBackend);

    /**
     * Handle an error reported on one of this session's DCBs.
     *
     * @param pMessage The error message.
     * @param pProblem The DCB on which the error occurred.
     * @param action   The context.
     * @param pSuccess On output, if false, the session will be terminated.
     */
    void handleError(GWBUF* pMessage, DCB* pProblem, mxs_error_action_t action, bool* pSuccess);

    // Access to this session's query classifier (m_qc).
    mxs::QueryClassifier& qc()
    {
        return m_qc;
    }

private:
    // The private helpers below are only declared here; their definitions are
    // not part of this header.
    RWSplitSession(RWSplit* instance, MXS_SESSION* session, mxs::SRWBackends backends);

    bool open_connections();
    void process_sescmd_response(mxs::RWBackend* backend, GWBUF** ppPacket);
    void compress_history(mxs::SSessionCommand& sescmd);
    void prune_to_position(uint64_t pos);
    bool route_session_write(GWBUF* querybuf, uint8_t command, uint32_t type);
    void continue_large_session_write(GWBUF* querybuf, uint32_t type);
    bool route_single_stmt(GWBUF* querybuf);
    bool route_stored_query();
    void close_stale_connections();
    int64_t get_current_rank();

    mxs::RWBackend* get_hinted_backend(const char* name);
    mxs::RWBackend* get_slave_backend(int max_rlag);
    mxs::RWBackend* get_master_backend();
    mxs::RWBackend* get_last_used_backend();
    mxs::RWBackend* get_target_backend(backend_type_t btype, const char* name, int max_rlag);

    bool handle_target_is_all(route_target_t route_target, GWBUF* querybuf, int packet_type, uint32_t qtype);
    mxs::RWBackend* handle_hinted_target(GWBUF* querybuf, route_target_t route_target);
    mxs::RWBackend* handle_slave_is_target(uint8_t cmd, uint32_t stmt_id);
    bool handle_master_is_target(mxs::RWBackend** dest);
    bool handle_got_target(GWBUF* querybuf, mxs::RWBackend* target, bool store);
    void handle_connection_keepalive(mxs::RWBackend* target);
    bool prepare_target(mxs::RWBackend* target, route_target_t route_target);
    bool prepare_connection(mxs::RWBackend* target);
    bool create_one_connection();
    void retry_query(GWBUF* querybuf, int delay = 1);

    bool trx_is_starting();
    bool should_replace_master(mxs::RWBackend* target);
    void replace_master(mxs::RWBackend* target);
    bool should_migrate_trx(mxs::RWBackend* target);
    bool start_trx_migration(mxs::RWBackend* target, GWBUF* querybuf);
    void log_master_routing_failure(bool found, mxs::RWBackend* old_master, mxs::RWBackend* curr_master);

    // Send unknown prepared statement ID error to client
    bool send_unknown_ps_error(uint32_t stmt_id);

    GWBUF* handle_causal_read_reply(GWBUF* writebuf, mxs::RWBackend* backend);
    GWBUF* add_prefix_wait_gtid(SERVER* server, GWBUF* origin);
    void correct_packet_sequence(GWBUF* buffer);
    GWBUF* discard_master_wait_gtid_result(GWBUF* buffer);

    int get_max_replication_lag();
    mxs::RWBackend* get_backend_from_dcb(DCB* dcb);

    bool retry_master_query(mxs::RWBackend* backend);
    void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg);
    bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg);
    void manage_transactions(mxs::RWBackend* backend, GWBUF* writebuf);

    void trx_replay_next_stmt();

    // Do we have at least one open slave connection
    bool have_connected_slaves() const;

    /**
     * Start the replaying of the latest transaction
     *
     * @return True if the session can continue. False if the session must be closed.
     */
    bool start_trx_replay();

    /**
     * See if the transaction could be done on a slave
     *
     * @param route_target Target where the query is routed
     *
     * @return True if the query can be attempted on a slave
     */
    bool should_try_trx_on_slave(route_target_t route_target) const;

    /**
     * Track optimistic transaction status
     *
     * Tracks the progress of the optimistic transaction and starts the rollback
     * procedure if the transaction turns out to be one that modifies data.
     *
     * @param buffer Current query
     *
     * @return Whether the current statement should be stored for the duration of the query
     */
    bool track_optimistic_trx(GWBUF** buffer);

private:
    // QueryClassifier::Handler
    bool lock_to_master();
    bool is_locked_to_master() const;
    bool supports_hint(HINT_TYPE hint_type) const;

    bool handle_ignorable_error(mxs::RWBackend* backend);

    // True if delayed retrying is enabled, the retry time budget
    // (delayed_retry_timeout) is not yet used up and no transaction is open.
    inline bool can_retry_query() const
    {
        /** Individual queries can only be retried if we are not inside
         * a transaction. If a query in a transaction needs to be retried,
         * the whole transaction must be replayed before the retrying is done.
         *
         * @see handle_trx_replay
         */
        return m_config.delayed_retry
               && m_retry_duration < m_config.delayed_retry_timeout
               && !session_trx_is_active(m_client->session);
    }

    // Whether a transaction replay can remain active
    inline bool can_continue_trx_replay() const
    {
        return m_is_replay_active && m_retry_duration < m_config.delayed_retry_timeout;
    }

    // True if the session command history is enabled, or if no session
    // command has completed yet (m_recv_sescmd == 0). NOTE(review):
    // presumably this means a (re)connected server can be brought to the
    // same session state -- confirm.
    inline bool can_recover_servers() const
    {
        return !m_config.disable_sescmd_history || m_recv_sescmd == 0;
    }

    // True if at least one backend of this session is still in use.
    inline bool can_continue_session() const
    {
        return std::any_of(m_raw_backends.begin(), m_raw_backends.end(), [](mxs::RWBackend* b) {
                               return b->in_use();
                           });
    }

    // Concatenation of every backend's verbose status, each one preceded by
    // a newline.
    std::string get_verbose_status()
    {
        std::string status;

        for (const auto& a : m_backends)
        {
            status += "\n";
            status += a->get_verbose_status();
        }

        return status;
    }

    // True if `buf` is a single packet of the maximum possible size, i.e. its
    // payload is exactly GW_MYSQL_MAX_PACKET_LEN bytes. In the MySQL protocol
    // such a packet means the statement continues in the next packet.
    inline bool is_large_query(GWBUF* buf)
    {
        uint32_t buflen = gwbuf_length(buf);

        // The buffer should contain at most (2^24 - 1) + 4 bytes ...
        mxb_assert(buflen <= MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN);
        // ... and the payload should be buflen - 4 bytes
        mxb_assert(MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buf)) == buflen - MYSQL_HEADER_LEN);

        return buflen == MYSQL_HEADER_LEN + GW_MYSQL_MAX_PACKET_LEN;
    }

    // A new query may be routed when no responses are pending, or while a
    // LOAD DATA LOCAL INFILE transfer or a large multi-packet query is in
    // progress (those stream more client data before any reply arrives).
    inline bool can_route_queries() const
    {
        return m_expected_responses == 0
               || m_qc.load_data_state() == mxs::QueryClassifier::LOAD_DATA_ACTIVE
               || m_qc.large_query();
    }

    // Classify the currently locked target node (m_target_node) for the
    // query classifier: undefined if none, master if it is the current
    // master, slave otherwise.
    inline mxs::QueryClassifier::current_target_t get_current_target() const
    {
        mxs::QueryClassifier::current_target_t current_target;

        if (m_target_node == NULL)
        {
            current_target = mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED;
        }
        else if (m_target_node == m_current_master)
        {
            current_target = mxs::QueryClassifier::CURRENT_TARGET_MASTER;
        }
        else
        {
            current_target = mxs::QueryClassifier::CURRENT_TARGET_SLAVE;
        }

        return current_target;
    }

    // When the session's transaction is ending, bump the router-wide counter
    // of read-only or read-write transactions (relaxed atomic increment).
    void update_trx_statistics()
    {
        if (session_trx_is_ending(m_client->session))
        {
            mxb::atomic::add(m_qc.is_trx_still_read_only() ?
                             &m_router->stats().n_ro_trx :
                             &m_router->stats().n_rw_trx,
                             1,
                             mxb::atomic::RELAXED);
        }
    }

    // NOTE: declaration order below determines member initialization order.
    mxs::SRWBackends m_backends;            /**< Mem. management, not for use outside RWSplitSession */
    mxs::PRWBackends m_raw_backends;        /**< Backend pointers for use in interfaces. */
    mxs::RWBackend*  m_current_master;      /**< Current master server */
    mxs::RWBackend*  m_target_node;         /**< The currently locked target node */
    mxs::RWBackend*  m_prev_target;         /**< The previous target where a query was sent */
    Config           m_config;              /**< Configuration for this session */
    int              m_last_keepalive_check;/**< When the last ping was done */
    int              m_nbackends;           /**< Number of backend servers (obsolete) */
    DCB*             m_client;              /**< The client DCB */
    uint64_t         m_sescmd_count;        /**< Number of executed session commands (starts from 1) */
    int              m_expected_responses;  /**< Number of expected responses to the current query */
    std::deque       m_query_queue;         /**< Queued commands waiting to be executed */
    RWSplit*         m_router;              /**< The router instance */

    mxs::SessionCommandList m_sescmd_list;      /**< List of executed session commands */
    ResponseMap             m_sescmd_responses; /**< Response to each session command */
    SlaveResponseList       m_slave_responses;  /**< Slaves that replied before the master */
    uint64_t                m_sent_sescmd;      /**< ID of the last sent session command */
    uint64_t                m_recv_sescmd;      /**< ID of the most recently completed session command */

    ExecMap              m_exec_map;       /**< Map of COM_STMT_EXECUTE statement IDs to Backends */
    std::string          m_gtid_pos;       /**< Gtid position for causal read */
    wait_gtid_state      m_wait_gtid;      /**< State of MASTER_GTID_WAIT reply */
    uint32_t             m_next_seq;       /**< Next packet's sequence number */
    mxs::QueryClassifier m_qc;             /**< The query classifier. */
    uint64_t             m_retry_duration; /**< Total time spent retrying queries */
    mxs::Buffer          m_current_query;  /**< Current query being executed */

    Trx         m_trx;               /**< Current transaction */
    bool        m_is_replay_active;  /**< Whether we are actively replaying a transaction */
    bool        m_can_replay_trx;    /**< Whether the transaction can be replayed */
    Trx         m_replayed_trx;      /**< The transaction we are replaying */
    mxs::Buffer m_interrupted_query; /**< Query that was interrupted mid-transaction. */
    Trx         m_orig_trx;          /**< The backup of the transaction we're replaying */
    mxs::Buffer m_orig_stmt;         /**< The backup of the statement that was interrupted */
    int64_t     m_num_trx_replays = 0;           /**< How many times trx replay has been attempted */
    otrx_state  m_otrx_state = OTRX_INACTIVE;    /**< Optimistic trx state */

    SrvStatMap& m_server_stats;  /**< The server stats local to this thread, cached in the session object.
                                  * This avoids the lookup involved in getting the worker-local value from
                                  * the worker's container.*/
};

/**
 * @brief Get the internal ID for the given binary prepared statement
 *
 * @param rses   Router client session
 * @param buffer Buffer containing a binary protocol statement other than COM_STMT_PREPARE
 *
 * @return The internal ID of the prepared statement that the buffer contents refer to
 */
uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer);

/**
 * Human-readable name of a route target, for logging.
 *
 * The predicates are tested in the order below and the name of the first one
 * that matches is returned. If none matches, debug builds assert and release
 * builds return "Unknown target value".
 *
 * @param target Route target to describe
 *
 * @return Static string naming the target
 */
static inline const char* route_target_to_string(route_target_t target)
{
    if (TARGET_IS_MASTER(target))
    {
        return "TARGET_MASTER";
    }
    else if (TARGET_IS_SLAVE(target))
    {
        return "TARGET_SLAVE";
    }
    else if (TARGET_IS_NAMED_SERVER(target))
    {
        return "TARGET_NAMED_SERVER";
    }
    else if (TARGET_IS_ALL(target))
    {
        return "TARGET_ALL";
    }
    else if (TARGET_IS_RLAG_MAX(target))
    {
        return "TARGET_RLAG_MAX";
    }
    else if (TARGET_IS_LAST_USED(target))
    {
        return "TARGET_LAST_USED";
    }
    else
    {
        mxb_assert(!true);
        return "Unknown target value";
    }
}