489 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			489 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/*
 | 
						|
 * Copyright (c) 2019 MariaDB Corporation Ab
 | 
						|
 *
 | 
						|
 * Use of this software is governed by the Business Source License included
 | 
						|
 * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | 
						|
 *
 | 
						|
 * Change Date: 2022-01-01
 | 
						|
 *
 | 
						|
 * On the date above, in accordance with the Business Source License, use
 | 
						|
 * of this software will be governed by version 2 or later of the General
 | 
						|
 * Public License.
 | 
						|
 */
 | 
						|
#include "smartsession.hh"
 | 
						|
#include "smartrouter.hh"
 | 
						|
 | 
						|
#include <maxscale/modutil.hh>
 | 
						|
#include <maxsql/mysql_plus.hh>
 | 
						|
 | 
						|
// TO_REVIEW:
 | 
						|
// This is the base for Smart Router. It will currently route any normal query to all
 | 
						|
// configured routers and only use the first response to forward back to the client.
 | 
						|
// However, it should be reviewed as if it could actually be put in front of
 | 
						|
// several current routers (several readwritesplits and readconnroutes) and
 | 
						|
// succeed for anything but local infile.
 | 
						|
 | 
						|
// TO_REVIEW. There is no need to go through the functionality with a fine-tooth comb at this point,
//            maria-test will be used to do that. The idea is really to look at the router as a
//            whole and at how it will interact with the rest of the system.
 | 
						|
 | 
						|
// TO_REVIEW routeQuery() and clientReply() are the obvious functions to check for correctness.
 | 
						|
 | 
						|
// TO_REVIEW my use of mxs::QueryClassifier might be overly simple. The use should
 | 
						|
//           be as simple as possible, but no simpler.
 | 
						|
 | 
						|
 | 
						|
// TODO, missing error handling. I did not add overly many asserts, which make reading code harder.
 | 
						|
//       But please note any that may be missing.
 | 
						|
 | 
						|
// TODO, for m_qc.target_is_all(), check that responses from all routers match.
 | 
						|
 | 
						|
// TODO Smart Query is not here yet, this is just a stupid router-router.
 | 
						|
 | 
						|
// COPY-PASTED error-extraction functions from rwsplit. TODO move to lib.
 | 
						|
inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState)
 | 
						|
{
 | 
						|
    mxb_assert(MYSQL_IS_ERROR_PACKET(pBuffer));
 | 
						|
 | 
						|
    // The payload starts with a one byte command followed by a two byte error code,
 | 
						|
    // followed by a 1 byte sql state marker and 5 bytes of sql state. In this context
 | 
						|
    // the marker and the state itself are combined.
 | 
						|
    *ppState = pBuffer + MYSQL_HEADER_LEN + 1 + 2;
 | 
						|
    *pnState = 6;
 | 
						|
}
 | 
						|
 | 
						|
inline void extract_error_message(uint8_t* pBuffer, uint8_t** ppMessage, uint16_t* pnMessage)
 | 
						|
{
 | 
						|
    mxb_assert(MYSQL_IS_ERROR_PACKET(pBuffer));
 | 
						|
 | 
						|
    int packet_len = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pBuffer);
 | 
						|
 | 
						|
    // The payload starts with a one byte command followed by a two byte error code,
 | 
						|
    // followed by a 1 byte sql state marker and 5 bytes of sql state, followed by
 | 
						|
    // a message until the end of the packet.
 | 
						|
    *ppMessage = pBuffer + MYSQL_HEADER_LEN + 1 + 2 + 1 + 5;
 | 
						|
    *pnMessage = packet_len - MYSQL_HEADER_LEN - 1 - 2 - 1 - 5;
 | 
						|
}
 | 
						|
 | 
						|
std::string extract_error(GWBUF* buffer)
 | 
						|
{
 | 
						|
    std::string rval;
 | 
						|
 | 
						|
    if (MYSQL_IS_ERROR_PACKET(((uint8_t*)GWBUF_DATA(buffer))))
 | 
						|
    {
 | 
						|
        size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN;
 | 
						|
        uint8_t replybuf[replylen];
 | 
						|
        gwbuf_copy_data(buffer, 0, sizeof(replybuf), replybuf);
 | 
						|
 | 
						|
        uint8_t* pState;
 | 
						|
        uint16_t nState;
 | 
						|
        extract_error_state(replybuf, &pState, &nState);
 | 
						|
 | 
						|
        uint8_t* pMessage;
 | 
						|
        uint16_t nMessage;
 | 
						|
        extract_error_message(replybuf, &pMessage, &nMessage);
 | 
						|
 | 
						|
        std::string err(reinterpret_cast<const char*>(pState), nState);
 | 
						|
        std::string msg(reinterpret_cast<const char*>(pMessage), nMessage);
 | 
						|
 | 
						|
        rval = err + ": " + msg;
 | 
						|
    }
 | 
						|
 | 
						|
    return rval;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Construct a session on top of already connected clusters.
 *
 * The router pointer (first, unnamed parameter) is not used by the constructor.
 *
 * @param pSession  The client session; its client DCB is remembered for later use.
 * @param clusters  The connected clusters. Taken by value and moved into the session.
 */
SmartRouterSession::SmartRouterSession(SmartRouter*, MXS_SESSION* pSession, Clusters clusters)
    : mxs::RouterSession(pSession)
    , m_pClient_dcb(pSession->client_dcb)
    , m_clusters(std::move(clusters))
    , m_qc(this, pSession, TYPE_ALL)
{
    // Nothing to do, all state is set up by the initializer list.
}
 | 
						|
 | 
						|
std::vector<maxbase::Host> SmartRouterSession::hosts() const
 | 
						|
{
 | 
						|
    std::vector<maxbase::Host> ret;
 | 
						|
    for (const auto& c : m_clusters)
 | 
						|
    {
 | 
						|
        ret.push_back(c.host);
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Destructor.
 *
 * clientReply() may park the last packet of a response in m_pDelayed_packet while it waits
 * for the other clusters. If the session goes away before that packet has been forwarded,
 * nobody else owns it, so it is released here.
 */
SmartRouterSession::~SmartRouterSession()
{
    if (m_pDelayed_packet)
    {
        gwbuf_free(m_pDelayed_packet);
        m_pDelayed_packet = nullptr;
    }
}
 | 
						|
 | 
						|
// static
 | 
						|
/**
 * Connect to every active, connectable server of the service and create a session.
 *
 * The cluster whose server is the configured master is moved to the front of the cluster
 * list; write_to_master() relies on that ordering.
 *
 * @param pRouter   The router instance; provides the configuration and the service.
 * @param pSession  The client session the backend connections are made for.
 *
 * @return A new session, or nullptr if no connection to the configured master could be
 *         made. In that case the connections that were made are closed again.
 */
SmartRouterSession* SmartRouterSession::create(SmartRouter* pRouter, MXS_SESSION* pSession)
{
    Clusters clusters;

    SERVER* pMaster = pRouter->config().master();

    int master_pos = -1;

    for (SERVER_REF* ref = pRouter->service()->dbref; ref; ref = ref->next)
    {
        if (!server_ref_is_active(ref) || !ref->server->is_connectable())
        {
            continue;
        }

        mxb_assert(ref->server->is_usable());

        DCB* dcb = dcb_connect(ref->server, pSession, ref->server->protocol().c_str());
        if (dcb)
        {
            bool is_master = (ref->server == pMaster);

            if (is_master)
            {
                // Index the new element is about to get.
                master_pos = static_cast<int>(clusters.size());
            }

            clusters.push_back({ref, dcb, is_master});
        }
    }

    SmartRouterSession* pSess = nullptr;

    if (master_pos != -1)
    {
        if (master_pos > 0)
        {   // put the master first. There must be exactly one master cluster.
            std::swap(clusters[0], clusters[master_pos]);
        }

        pSess = new SmartRouterSession(pRouter, pSession, std::move(clusters));
    }
    else
    {
        MXS_ERROR("No master found for %s, smartrouter session cannot be created.",
                  pRouter->config().name().c_str());

        // No session will take ownership of the connections that were opened above,
        // so close them here instead of leaving them dangling.
        for (auto& cluster : clusters)
        {
            if (cluster.pDcb)
            {
                dcb_close(const_cast<DCB*>(cluster.pDcb));
            }
        }
    }

    return pSess;
}
 | 
						|
 | 
						|
/**
 * Route a packet from the client to the clusters.
 *
 * Three cases are handled:
 *  - a cluster is still expecting request packets: the packet is a continuation and is
 *    passed on with write_split_packets(),
 *  - the session is not idle: the call is unexpected, the packet is dropped and failure
 *    is reported,
 *  - otherwise the query is classified and sent to all clusters or to the master.
 *
 * @param pBuf  The packet to route. It is handed on to a write function or freed here.
 *
 * @return Non-zero if the packet was routed, 0 on failure.
 */
int SmartRouterSession::routeQuery(GWBUF* pBuf)
{
    bool ret = false;

    if (expecting_request_packets())
    {
        // Report the outcome of the write; previously the result was dropped and this
        // branch always signalled failure to the caller.
        ret = write_split_packets(pBuf);
        if (all_clusters_are_idle())
        {
            m_mode = Mode::Idle;
        }
    }
    else if (m_mode != Mode::Idle)
    {
        auto is_busy = !all_clusters_are_idle();
        // TODO add more detail, operator<< to PacketRouter.
        MXS_SERROR("routeQuery() in wrong state. clusters busy = " << std::boolalpha << is_busy);
        mxb_assert(false);
        // The packet is not passed on to anybody in this branch, so release it.
        gwbuf_free(pBuf);
        ret = false;
    }
    else
    {
        auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf);
        if (m_qc.target_is_all(route_info.target()))
        {
            MXS_SDEBUG("Write all");
            ret = write_to_all(pBuf);
        }
        else if (m_qc.target_is_master(route_info.target()) || session_trx_is_active(m_pClient_dcb->session))
        {
            MXS_SDEBUG("Write to master");
            ret = write_to_master(pBuf);
        }
        else
        {
            // TODO: This is where canonical performance data will be used, and measurements initiated
            //       Currently writing to all for clientReply testing purposes.
            ret = write_to_all(pBuf);
        }
    }

    return ret;
}
 | 
						|
 | 
						|
/**
 * Handle a response packet from one of the clusters.
 *
 * The first cluster to respond becomes the one whose response is forwarded to the client;
 * packets from the other clusters are discarded. The last packet of the forwarded response
 * is held back in m_pDelayed_packet until every cluster has finished responding, because
 * the session does not support several client queries in flight at the same time.
 *
 * @param pPacket  The response packet. It is forwarded, parked in m_pDelayed_packet or freed.
 * @param pDcb     The backend DCB the packet arrived on; must belong to one of the clusters.
 */
void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
{
    mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket));   // TODO, do non-contiguous for slightly better speed?

    auto it = std::find_if(begin(m_clusters), end(m_clusters),
                           [pDcb](const Cluster& cluster) {
                               return cluster.pDcb == pDcb;
                           });

    mxb_assert(it != end(m_clusters));

    Cluster& cluster = *it;

    auto tracker_state_before = cluster.tracker.state();

    cluster.tracker.update_response(pPacket);

    // these flags can all be true at the same time
    bool first_response_packet = m_mode != Mode::CollectResults;
    bool last_packet_for_this_cluster = !cluster.tracker.expecting_response_packets();
    bool very_last_response_packet = !expecting_response_packets();     // last from all clusters

    MXS_SDEBUG("Reply from " << std::boolalpha
                             << cluster.host
                             << " is_master=" << cluster.is_master
                             << " first_packet=" << first_response_packet
                             << " last_packet=" << last_packet_for_this_cluster
                             << " very_last_packet=" << very_last_response_packet
                             << " delayed_response=" << (m_pDelayed_packet != nullptr)
                             << " tracker_state: " << tracker_state_before << " => "
                             << cluster.tracker.state());

    // marker1: If a connection is lost down the pipeline, we first get an ErrorPacket, then a call to
    // handleError(). If we only rely on the handleError() the client receiving the ErrorPacket
    // can retry using this connection/session, causing an error (or assert) in routeQuery().
    // This will change once we implement direct function calls to the Clusters (which really
    // are routers).
    if (cluster.tracker.state() == maxsql::PacketTracker::State::ErrorPacket)
    {
        auto err_code = mxs_mysql_get_mysql_errno(pPacket);
        switch (err_code)
        {
        case ER_CONNECTION_KILLED:      // there might be more error codes needing to be caught here
            MXS_SERROR("clientReply(): Lost connection to "
                       << cluster.host << " Error code=" << err_code << " "
                       << extract_error(pPacket));
            // The packet is not forwarded, so it has to be released here.
            gwbuf_free(pPacket);
            poll_fake_hangup_event(m_pClient_dcb);
            return;

        default:
            break;
        }
    }

    if (cluster.tracker.state() == maxsql::PacketTracker::State::Error)
    {
        // TODO add more info
        MXS_SERROR("ProtocolTracker error in state " << tracker_state_before);
        // The packet is not forwarded, so it has to be released here.
        gwbuf_free(pPacket);
        poll_fake_hangup_event(m_pClient_dcb);
        return;
    }

    bool will_reply = false;

    if (first_response_packet)
    {
        MXS_SDEBUG("Host " << cluster.host << " will be responding to the client");
        cluster.is_replying_to_client = true;
        m_mode = Mode::CollectResults;
        will_reply = true;      // tentatively, the packet might have to be delayed
    }

    if (very_last_response_packet)
    {
        will_reply = true;
        m_mode = Mode::Idle;
        mxb_assert(cluster.is_replying_to_client || m_pDelayed_packet);
        if (m_pDelayed_packet)
        {
            // The parked packet is the real end of the response the client has been receiving;
            // this packet comes from a cluster that is not the one replying.
            MXS_SDEBUG("Picking up delayed packet, discarding response from " << cluster.host);
            gwbuf_free(pPacket);
            pPacket = m_pDelayed_packet;
            m_pDelayed_packet = nullptr;
        }
    }
    else if (cluster.is_replying_to_client)
    {
        if (last_packet_for_this_cluster)
        {
            // Delay sending the last packet until all clusters have responded. The code currently
            // does not allow multiple client-queries at the same time (no query buffer)
            MXS_SDEBUG("Delaying last packet");
            mxb_assert(!m_pDelayed_packet);
            m_pDelayed_packet = pPacket;
            will_reply = false;
        }
        else
        {
            will_reply = true;
        }
    }
    else
    {
        MXS_SDEBUG("Discarding response from " << cluster.host);
        gwbuf_free(pPacket);
    }

    if (will_reply)
    {
        MXS_SDEBUG("Forward response to client");
        MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket);
    }
}
 | 
						|
 | 
						|
bool SmartRouterSession::expecting_request_packets() const
 | 
						|
{
 | 
						|
    return std::any_of(begin(m_clusters), end(m_clusters),
 | 
						|
                       [](const Cluster& cluster) {
 | 
						|
                           return cluster.tracker.expecting_request_packets();
 | 
						|
                       });
 | 
						|
}
 | 
						|
 | 
						|
bool SmartRouterSession::expecting_response_packets() const
 | 
						|
{
 | 
						|
    return std::any_of(begin(m_clusters), end(m_clusters),
 | 
						|
                       [](const Cluster& cluster) {
 | 
						|
                           return cluster.tracker.expecting_response_packets();
 | 
						|
                       });
 | 
						|
}
 | 
						|
 | 
						|
bool SmartRouterSession::all_clusters_are_idle() const
 | 
						|
{
 | 
						|
    return std::all_of(begin(m_clusters), end(m_clusters),
 | 
						|
                       [](const Cluster& cluster) {
 | 
						|
                           return !cluster.tracker.expecting_more_packets();
 | 
						|
                       });
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Send a query to the master cluster only.
 *
 * The master is expected to be the first cluster, see create().
 *
 * @param pBuf  The query; handed to the write function of the master's DCB.
 *
 * @return The result of the write converted to bool.
 */
bool SmartRouterSession::write_to_master(GWBUF* pBuf)
{
    mxb_assert(!m_clusters.empty());

    Cluster& master = m_clusters[0];
    mxb_assert(master.is_master);

    // A new query starts: nobody is replying to the client yet and tracking starts over.
    master.is_replying_to_client = false;
    master.tracker = maxsql::PacketTracker(pBuf);

    const bool wants_response = master.tracker.expecting_response_packets();
    if (wants_response)
    {
        m_mode = Mode::Query;
    }

    DCB* pMaster_dcb = master.pDcb;
    return pMaster_dcb->func.write(pMaster_dcb, pBuf);
}
 | 
						|
 | 
						|
/**
 * Send a query to the cluster with the given host.
 *
 * @param host  The host of the cluster to write to.
 * @param pBuf  The query; handed to the write function of the cluster's DCB, or freed if
 *              no cluster has the given host.
 *
 * @return The result of the write converted to bool, false if the host is unknown.
 */
bool SmartRouterSession::write_to_host(const maxbase::Host& host, GWBUF* pBuf)
{
    // Capture by reference, the lambda does not outlive this call.
    auto it = std::find_if(begin(m_clusters), end(m_clusters), [&host](const Cluster& cluster) {
                               return cluster.host == host;
                           });
    mxb_assert(it != end(m_clusters));

    if (it == end(m_clusters))
    {
        // The assert above is compiled out in release builds; never dereference end().
        MXS_SERROR("write_to_host(): no cluster found for host " << host);
        gwbuf_free(pBuf);
        return false;
    }

    auto& cluster = *it;
    cluster.tracker = maxsql::PacketTracker(pBuf);
    if (cluster.tracker.expecting_response_packets())
    {
        m_mode = Mode::Query;
    }

    cluster.is_replying_to_client = false;

    return cluster.pDcb->func.write(cluster.pDcb, pBuf);
}
 | 
						|
 | 
						|
/**
 * Send a query to every cluster.
 *
 * Each cluster gets a fresh tracker for the query. All clusters but the last are given a
 * clone of the buffer, the last one gets the buffer itself.
 *
 * NOTE(review): this assumes the DCB write function always takes ownership of the buffer
 * it is given, also when it fails - confirm against the DCB write contract.
 *
 * @param pBuf  The query to send. It is passed on to the last cluster, or freed if there
 *              are no clusters.
 *
 * @return True if the query was written to every cluster, false otherwise.
 */
bool SmartRouterSession::write_to_all(GWBUF* pBuf)
{
    if (m_clusters.empty())
    {
        // Nobody to hand the buffer to.
        gwbuf_free(pBuf);
        return false;
    }

    bool success = true;

    for (auto it = begin(m_clusters); it != end(m_clusters); ++it)
    {
        auto& cluster = *it;
        cluster.tracker = maxsql::PacketTracker(pBuf);
        cluster.is_replying_to_client = false;

        const bool is_last = (std::next(it) == end(m_clusters));
        GWBUF* pBuf_send = is_last ? pBuf : gwbuf_clone(pBuf);

        // A failed clone must not be written, and a failed write must not go unnoticed.
        if (!pBuf_send || !cluster.pDcb->func.write(cluster.pDcb, pBuf_send))
        {
            success = false;
        }
    }

    if (expecting_response_packets())
    {
        m_mode = Mode::Query;
    }

    return success;
}
 | 
						|
 | 
						|
/**
 * Pass a continuation packet of a request on to every cluster that is still expecting
 * request packets.
 *
 * All receiving clusters but the last are given a clone of the buffer, the last one gets
 * the buffer itself.
 *
 * NOTE(review): the trackers are not updated with the packet here, so whether
 * expecting_request_packets() can ever turn false again is not visible from this function;
 * confirm whether the PacketTracker must be told about request continuation packets.
 *
 * NOTE(review): this assumes the DCB write function always takes ownership of the buffer
 * it is given, also when it fails - confirm against the DCB write contract.
 *
 * @param pBuf  The packet to send. It is passed on to the last receiving cluster, or freed
 *              if no cluster is expecting request packets.
 *
 * @return True if the packet was written to every expecting cluster, false otherwise.
 */
bool SmartRouterSession::write_split_packets(GWBUF* pBuf)
{
    std::vector<Cluster*> active;

    for (auto& cluster : m_clusters)
    {
        if (cluster.tracker.expecting_request_packets())
        {
            active.push_back(&cluster);
        }
    }

    if (active.empty())
    {
        // Nobody to hand the buffer to.
        gwbuf_free(pBuf);
        return false;
    }

    bool success = true;

    for (auto it = begin(active); it != end(active); ++it)
    {
        auto& cluster = **it;

        const bool is_last = (std::next(it) == end(active));
        GWBUF* pBuf_send = is_last ? pBuf : gwbuf_clone(pBuf);

        // A failed clone must not be written, and a failed write must not go unnoticed.
        if (!pBuf_send || !cluster.pDcb->func.write(cluster.pDcb, pBuf_send))
        {
            success = false;
        }
    }

    return success;
}
 | 
						|
 | 
						|
/**
 * Handle an error reported for one of the backend connections.
 *
 * The error is logged, a clone of the error packet is written to the client and failure is
 * reported through pSuccess. The action argument is not used.
 *
 * @param pPacket   The error packet describing the problem. It is cloned, not consumed.
 * @param pProblem  The backend DCB that has the problem; must belong to one of the clusters.
 * @param action    Not used.
 * @param pSuccess  Out: always set to false.
 */
void SmartRouterSession::handleError(GWBUF* pPacket,
                                     DCB* pProblem,
                                     mxs_error_action_t action,
                                     bool* pSuccess)
{
    // One of the clusters closed the connection, in terms of SmartRouter this is a hopeless situation.
    // Close the shop, and let the client retry. Also see marker1.
    auto it = std::find_if(begin(m_clusters), end(m_clusters),
                           [pProblem](const Cluster& cluster) {
                               return cluster.pDcb == pProblem;
                           });

    mxb_assert(it != end(m_clusters));
    Cluster& cluster = *it;
    // TODO: Will the session close gracefully, or is some more checking needed here.

    auto err_code = mxs_mysql_get_mysql_errno(pPacket);
    MXS_SERROR("handleError(): Lost connection to " << cluster.host << " Error code=" << err_code << " "
                                                    << extract_error(pPacket));

    MXS_SESSION* pSession = pProblem->session;

    /* Send error report to client */
    GWBUF* pCopy = gwbuf_clone(pPacket);
    if (pCopy)
    {
        DCB* pClient = pSession->client_dcb;
        pClient->func.write(pClient, pCopy);
    }

    // This will lead to the rest of the connections to be closed.
    *pSuccess = false;
}
 | 
						|
 | 
						|
/**
 * Locking the session to the master is not implemented.
 *
 * @return Always false.
 */
bool SmartRouterSession::lock_to_master()
{
    const bool locked = false;
    return locked;
}
 | 
						|
 | 
						|
bool SmartRouterSession::is_locked_to_master() const
 | 
						|
{
 | 
						|
    return false;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * No hint types are supported.
 *
 * @param hint_type  The hint type asked about; not inspected.
 *
 * @return Always false.
 */
bool SmartRouterSession::supports_hint(HINT_TYPE hint_type) const
{
    const bool supported = false;
    return supported;
}
 | 
						|
 | 
						|
void SmartRouterSession::close()
 | 
						|
{
 | 
						|
    for (auto& cluster : m_clusters)
 | 
						|
    {
 | 
						|
        if (cluster.pDcb)
 | 
						|
        {
 | 
						|
            dcb_close(const_cast<DCB*>(cluster.pDcb));
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 |