diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index 2131c51b6..420298746 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -12,5 +12,4 @@ add_subdirectory(maxinfo) add_subdirectory(readconnroute) add_subdirectory(readwritesplit) add_subdirectory(schemarouter) - - +add_subdirectory(smartrouter) diff --git a/server/modules/routing/smartrouter/CMakeLists.txt b/server/modules/routing/smartrouter/CMakeLists.txt new file mode 100644 index 000000000..38b36df35 --- /dev/null +++ b/server/modules/routing/smartrouter/CMakeLists.txt @@ -0,0 +1,8 @@ +add_library(smartrouter SHARED +smartrouter.cc +smartsession.cc +) + +target_link_libraries(smartrouter maxscale-common mysqlcommon) +set_target_properties(smartrouter PROPERTIES VERSION "1.0.0" LINK_FLAGS -Wl,-z,defs) +install_module(smartrouter core) diff --git a/server/modules/routing/smartrouter/smartrouter.cc b/server/modules/routing/smartrouter/smartrouter.cc new file mode 100644 index 000000000..4e9885eb7 --- /dev/null +++ b/server/modules/routing/smartrouter/smartrouter.cc @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "smartrouter.hh" +#include "smartsession.hh" + +#include + +/** + * The module entry point. + * + * @return The module object + */ +extern "C" MXS_MODULE* MXS_CREATE_MODULE() +{ + MXS_NOTICE("Initialise smartrouter module."); + + static MXS_MODULE info = + { + MXS_MODULE_API_ROUTER, + MXS_MODULE_GA, + MXS_ROUTER_VERSION, + "Provides routing for the Smart Query feature", + "V1.0.0", + RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_CONTIGUOUS_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT, + &SmartRouter::s_object, + nullptr, /* Process init. */ + nullptr, /* Process finish. */ + nullptr, /* Thread init. */ + nullptr, /* Thread finish. */ + { + {MXS_END_MODULE_PARAMS} + } + }; + + return &info; +} + +bool SmartRouter::configure(MXS_CONFIG_PARAMETER* pParams) +{ + // TODO ensure Servers are internal ones. Later TODO call routers directly. + return true; +} + +SERVICE* SmartRouter::service() const +{ + return m_pService; +} + +SmartRouter::SmartRouter(SERVICE* service) + : mxs::Router(service) +{ +} + +SmartRouterSession* SmartRouter::newSession(MXS_SESSION* pSession) +{ + SmartRouterSession* pRouter = nullptr; + MXS_EXCEPTION_GUARD(pRouter = SmartRouterSession::create(this, pSession)); + return pRouter; +} + +// static +SmartRouter* SmartRouter::create(SERVICE* pService, MXS_CONFIG_PARAMETER* pParams) +{ + SmartRouter* pRouter = new(std::nothrow) SmartRouter(pService); + + if (pRouter && !pRouter->configure(pParams)) + { + delete pRouter; + pRouter = nullptr; + } + + return pRouter; +} + +void SmartRouter::diagnostics(DCB* pDcb) +{ +} + +json_t* SmartRouter::diagnostics_json() const +{ + json_t* pJson = json_object(); + + return pJson; +} + +uint64_t SmartRouter::getCapabilities() +{ + return RCAP_TYPE_TRANSACTION_TRACKING | RCAP_TYPE_CONTIGUOUS_INPUT | RCAP_TYPE_CONTIGUOUS_OUTPUT; +} diff --git a/server/modules/routing/smartrouter/smartrouter.hh b/server/modules/routing/smartrouter/smartrouter.hh new file mode 100644 index 000000000..526a5a9ed --- /dev/null +++ b/server/modules/routing/smartrouter/smartrouter.hh @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#define MXS_MODULE_NAME "smartrouter" + +/** + * @file Smart Router. Routes queries to the best router for the type of query. + */ + +#include +#include + +class SmartRouterSession; + +/** class Smartrouter. Only defines the mxs::Router<> functions needed for all routers. + */ +class SmartRouter : public mxs::Router +{ +public: + static SmartRouter* create(SERVICE* pService, MXS_CONFIG_PARAMETER* pParams); + + SmartRouterSession* newSession(MXS_SESSION* pSession); + + void diagnostics(DCB* pDcb); + json_t* diagnostics_json() const; + uint64_t getCapabilities(); + bool configure(MXS_CONFIG_PARAMETER* pParams); + + SERVICE* service() const; +private: + SmartRouter(SERVICE* service); +}; diff --git a/server/modules/routing/smartrouter/smartsession.cc b/server/modules/routing/smartrouter/smartsession.cc new file mode 100644 index 000000000..4518814b9 --- /dev/null +++ b/server/modules/routing/smartrouter/smartsession.cc @@ -0,0 +1,468 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#include "smartsession.hh" +#include "smartrouter.hh" + +#include +#include + +// TO_REVIEW: +// This is the base for Smart Router. It will currently route any normal query to all +// configured routers and only use the first response to forward back to the client. +// However, it should be reviewed as if it could actually be put in front of +// several current routers (several readwritesplits and readconnroutes) and +// succeed for anything but local infile. + +// TO_REVIEW. There is no need to go through the functionality with a very, very fine-comb at this point, +// maria-test will be used to do that. The idea is really to look for the totality of the +// router and how it will interact with the rest of the system. + +// TO_REVIEW routeQuery() and clientReply() are the obvious functions to check for correctness. + +// TO_REVIEW my use of mxs::QueryClassifier might be overly simple. The use should +// be as simple as possible, but no simpler. + + +// TODO, missing error handling. I did not add overly many asserts, which make reading code harder. +// But please note any that may be missing. + +// TODO, for m_qc.target_is_all(), check that responses from all routers match. + +// TODO Smart Query is not here yet, this is just a stupid router-router. + +// COPY-PASTED error-extraction functions from rwsplit. TODO move to lib. +inline void extract_error_state(uint8_t* pBuffer, uint8_t** ppState, uint16_t* pnState) +{ + mxb_assert(MYSQL_IS_ERROR_PACKET(pBuffer)); + + // The payload starts with a one byte command followed by a two byte error code, + // followed by a 1 byte sql state marker and 5 bytes of sql state. In this context + // the marker and the state itself are combined. + *ppState = pBuffer + MYSQL_HEADER_LEN + 1 + 2; + *pnState = 6; +} + +inline void extract_error_message(uint8_t* pBuffer, uint8_t** ppMessage, uint16_t* pnMessage) +{ + mxb_assert(MYSQL_IS_ERROR_PACKET(pBuffer)); + + int packet_len = MYSQL_HEADER_LEN + MYSQL_GET_PAYLOAD_LEN(pBuffer); + + // The payload starts with a one byte command followed by a two byte error code, + // followed by a 1 byte sql state marker and 5 bytes of sql state, followed by + // a message until the end of the packet. + *ppMessage = pBuffer + MYSQL_HEADER_LEN + 1 + 2 + 1 + 5; + *pnMessage = packet_len - MYSQL_HEADER_LEN - 1 - 2 - 1 - 5; +} + +std::string extract_error(GWBUF* buffer) +{ + std::string rval; + + if (MYSQL_IS_ERROR_PACKET(((uint8_t*)GWBUF_DATA(buffer)))) + { + size_t replylen = MYSQL_GET_PAYLOAD_LEN(GWBUF_DATA(buffer)) + MYSQL_HEADER_LEN; + uint8_t replybuf[replylen]; + gwbuf_copy_data(buffer, 0, sizeof(replybuf), replybuf); + + uint8_t* pState; + uint16_t nState; + extract_error_state(replybuf, &pState, &nState); + + uint8_t* pMessage; + uint16_t nMessage; + extract_error_message(replybuf, &pMessage, &nMessage); + + std::string err(reinterpret_cast(pState), nState); + std::string msg(reinterpret_cast(pMessage), nMessage); + + rval = err + ": " + msg; + } + + return rval; +} + +SmartRouterSession::SmartRouterSession(SmartRouter*, + MXS_SESSION* pSession, + Clusters clusters) + : mxs::RouterSession(pSession) + , m_pClient_dcb(pSession->client_dcb) + , m_clusters(std::move(clusters)) + , m_qc(this, pSession, TYPE_ALL) +{ +} + +std::vector SmartRouterSession::hosts() const +{ + std::vector ret; + for (const auto& c : m_clusters) + { + ret.push_back(c.host); + } + return ret; +} + +SmartRouterSession::~SmartRouterSession() +{ +} + +// static +SmartRouterSession* SmartRouterSession::create(SmartRouter* pRouter, MXS_SESSION* pSession) +{ + Clusters clusters; + + bool is_master = true; // TODO this will be read from config + int master_pos = 0; // and this will be initialized to the position of the master + + for (SERVER_REF* ref = pRouter->service()->dbref; ref; ref = ref->next) + { + if (!server_ref_is_active(ref) || !ref->server->is_connectable()) + { + continue; + } + + mxb_assert(ref->server->is_usable()); + + DCB* dcb = dcb_connect(ref->server, pSession, ref->server->protocol().c_str()); + if (dcb) + { + clusters.push_back({ref, dcb, is_master}); + is_master = false; // TODO, will come from config, there must be exactly one! + } + } + + if (master_pos) + { // put the master first. There must be exactly one master cluster. + std::swap(clusters[0], clusters[master_pos]); + } + + SmartRouterSession* pSess = new SmartRouterSession(pRouter, pSession, std::move(clusters)); + + return pSess; +} + +int SmartRouterSession::routeQuery(GWBUF* pBuf) +{ + bool ret = false; + + if (expecting_request_packets()) + { + write_split_packets(pBuf); + if (all_clusters_are_idle()) + { + m_mode = Mode::Idle; + } + } + else if (m_mode != Mode::Idle) + { + auto is_busy = !all_clusters_are_idle(); + // TODO add more detail, operator<< to PacketRouter. + MXS_SERROR("routeQuery() in wrong state. clusters busy = " << std::boolalpha << is_busy); + mxb_assert(false); + ret = false; + } + else + { + auto route_info = m_qc.update_route_info(mxs::QueryClassifier::CURRENT_TARGET_UNDEFINED, pBuf); + if (m_qc.target_is_all(route_info.target())) + { + MXS_SDEBUG("Write all"); + ret = write_to_all(pBuf); + } + else if (m_qc.target_is_master(route_info.target()) || session_trx_is_active(m_pClient_dcb->session)) + { + MXS_SDEBUG("Write to master"); + ret = write_to_master(pBuf); + } + else + { + // TODO: This is where canonical performance data will be used, and measurements initiated + // Currently writing to all for clientReply testing purposes. + ret = write_to_all(pBuf); + } + } + + return ret; +} + +void SmartRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) +{ + mxb_assert(GWBUF_IS_CONTIGUOUS(pPacket)); // TODO, do non-contiguous for slightly better speed? + + auto it = std::find_if(begin(m_clusters), end(m_clusters), + [pDcb](const Cluster& cluster) { + return cluster.pDcb == pDcb; + }); + + mxb_assert(it != end(m_clusters)); + + Cluster& cluster = *it; + + auto tracker_state_before = cluster.tracker.state(); + + cluster.tracker.update_response(pPacket); + + // these flags can all be true at the same time + bool first_response_packet = m_mode != Mode::CollectResults; + bool last_packet_for_this_cluster = !cluster.tracker.expecting_response_packets(); + bool very_last_response_packet = !expecting_response_packets(); // last from all clusters + + MXS_SDEBUG("Reply from " << std::boolalpha + << cluster.host + << " is_master=" << cluster.is_master + << " first_packet=" << first_response_packet + << " last_packet=" << last_packet_for_this_cluster + << " very_last_packet=" << very_last_response_packet + << " delayed_response=" << (m_pDelayed_packet != nullptr) + << " tracker_state: " << tracker_state_before << " => " + << cluster.tracker.state()); + + // marker1: If a connection is lost down the pipeline, we first get an ErrorPacket, then a call to + // handleError(). If we only rely on the handleError() the client receiving the ErrorPacket + // can retry using this connection/session, causing a an error (or assert) in routeQuery(). + // This will change once we implement direct function calls to the Clusters (which really + // are routers). + if (cluster.tracker.state() == maxsql::PacketTracker::State::ErrorPacket) + { + auto err_code = mxs_mysql_get_mysql_errno(pPacket); + switch (err_code) + { + case ER_CONNECTION_KILLED: // there might be more error codes needing to be caught here + MXS_SERROR("clientReply(): Lost connection to " + << cluster.host << " Error code=" << err_code << " " + << extract_error(pPacket)); + poll_fake_hangup_event(m_pClient_dcb); + return; + } + } + + if (cluster.tracker.state() == maxsql::PacketTracker::State::Error) + { + // TODO add more info + MXS_SERROR("ProtocolTracker error in state " << tracker_state_before); + poll_fake_hangup_event(m_pClient_dcb); + return; + } + + bool will_reply = false; + + if (first_response_packet) + { + MXS_SDEBUG("Host " << cluster.host << " will be responding to the client"); + cluster.is_replying_to_client = true; + m_mode = Mode::CollectResults; + will_reply = true; // tentatively, the packet might have to be delayed + } + + if (very_last_response_packet) + { + will_reply = true; + m_mode = Mode::Idle; + mxb_assert(cluster.is_replying_to_client || m_pDelayed_packet); + if (m_pDelayed_packet) + { + MXS_SDEBUG("Picking up delayed packet, discarding response from" << cluster.host); + gwbuf_free(pPacket); + pPacket = m_pDelayed_packet; + m_pDelayed_packet = nullptr; + } + } + else if (cluster.is_replying_to_client) + { + if (last_packet_for_this_cluster) + { + // Delay sending the last packet until all clusters have responded. The code currently + // does not allow multiple client-queries at the same time (no query buffer) + MXS_SDEBUG("Delaying last packet"); + mxb_assert(!m_pDelayed_packet); + m_pDelayed_packet = pPacket; + will_reply = false; + } + else + { + will_reply = true; + } + } + else + { + MXS_SDEBUG("Discarding response from " << cluster.host); + gwbuf_free(pPacket); + } + + if (will_reply) + { + MXS_SDEBUG("Forward response to client"); + MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); + } +} + +bool SmartRouterSession::expecting_request_packets() const +{ + return std::any_of(begin(m_clusters), end(m_clusters), + [](const Cluster& cluster) { + return cluster.tracker.expecting_request_packets(); + }); +} + +bool SmartRouterSession::expecting_response_packets() const +{ + return std::any_of(begin(m_clusters), end(m_clusters), + [](const Cluster& cluster) { + return cluster.tracker.expecting_response_packets(); + }); +} + +bool SmartRouterSession::all_clusters_are_idle() const +{ + return std::all_of(begin(m_clusters), end(m_clusters), + [](const Cluster& cluster) { + return !cluster.tracker.expecting_more_packets(); + }); +} + +bool SmartRouterSession::write_to_master(GWBUF* pBuf) +{ + mxb_assert(!m_clusters.empty()); + auto& cluster = m_clusters[0]; + mxb_assert(cluster.is_master); + cluster.tracker = maxsql::PacketTracker(pBuf); + cluster.is_replying_to_client = false; + + if (cluster.tracker.expecting_response_packets()) + { + m_mode = Mode::Query; + } + + return cluster.pDcb->func.write(cluster.pDcb, pBuf); +} + +bool SmartRouterSession::write_to_host(const maxbase::Host& host, GWBUF* pBuf) +{ + auto it = std::find_if(begin(m_clusters), end(m_clusters), [host](const Cluster& cluster) { + return cluster.host == host; + }); + mxb_assert(it != end(m_clusters)); + auto& cluster = *it; + cluster.tracker = maxsql::PacketTracker(pBuf); + if (cluster.tracker.expecting_response_packets()) + { + m_mode = Mode::Query; + } + + cluster.is_replying_to_client = false; + + return cluster.pDcb->func.write(cluster.pDcb, pBuf); +} + +bool SmartRouterSession::write_to_all(GWBUF* pBuf) +{ + for (auto it = begin(m_clusters); it != end(m_clusters); ++it) + { + auto& cluster = *it; + cluster.tracker = maxsql::PacketTracker(pBuf); + cluster.is_replying_to_client = false; + auto pBuf_send = (std::next(it) == end(m_clusters)) ? pBuf : gwbuf_clone(pBuf); + cluster.pDcb->func.write(cluster.pDcb, pBuf_send); + } + + if (expecting_response_packets()) + { + m_mode = Mode::Query; + } + + return true; // TODO. What could possibly go wrong? +} + +bool SmartRouterSession::write_split_packets(GWBUF* pBuf) +{ + std::vector active; + + for (auto it = begin(m_clusters); it != end(m_clusters); ++it) + { + if (it->tracker.expecting_request_packets()) + { + active.push_back(&*it); + } + } + + for (auto it = begin(active); it != end(active); ++it) + { + auto& cluster = **it; + auto pBuf_send = (std::next(it) == end(active)) ? pBuf : gwbuf_clone(pBuf); + cluster.pDcb->func.write(cluster.pDcb, pBuf_send); + } + + return true; // TODO. What could possibly go wrong? +} + +void SmartRouterSession::handleError(GWBUF* pPacket, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess) +{ + // One of the clusters closed the connection, in terms of SmartRouter this is a hopeless situation. + // Close the shop, and let the client retry. Also see marker1. + auto it = std::find_if(begin(m_clusters), end(m_clusters), + [pProblem](const Cluster& cluster) { + return cluster.pDcb == pProblem; + }); + + mxb_assert(it != end(m_clusters)); + Cluster& cluster = *it; + // TODO: Will the session close gracefully, or is some more checking needed here. + + auto err_code = mxs_mysql_get_mysql_errno(pPacket); + MXS_SERROR("handleError(): Lost connection to " << cluster.host << " Error code=" << err_code << " " + << extract_error(pPacket)); + + MXS_SESSION* pSession = pProblem->session; + mxs_session_state_t sesstate = pSession->state; + + /* Send error report to client */ + GWBUF* pCopy = gwbuf_clone(pPacket); + if (pCopy) + { + DCB* pClient = pSession->client_dcb; + pClient->func.write(pClient, pCopy); + } + + + // This will lead to the rest of the connections to be closed. + *pSuccess = false; +} + +bool SmartRouterSession::lock_to_master() +{ + return false; +} + +bool SmartRouterSession::is_locked_to_master() const +{ + return false; +} + +bool SmartRouterSession::supports_hint(HINT_TYPE hint_type) const +{ + return false; +} + +void SmartRouterSession::close() +{ + for (auto& cluster : m_clusters) + { + if (cluster.pDcb) + { + dcb_close(const_cast(cluster.pDcb)); + } + } +} diff --git a/server/modules/routing/smartrouter/smartsession.hh b/server/modules/routing/smartrouter/smartsession.hh new file mode 100644 index 000000000..c74c37a25 --- /dev/null +++ b/server/modules/routing/smartrouter/smartsession.hh @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2022-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ +#pragma once + +#include "smartrouter.hh" + +#include +#include +#include + +#include + +class SmartRouter; + +/** Currently SmartRouter is configured like this (star means many): + * SmartRouter -*> ServerAsService -> MaxscaleRouter -*> Server + * For the time being the limitation is that the tail router must be RWSplit. + * This will change once we implement it so that SmartRouter can call the tail router directly. + * Although the assumption is one RowServer and one ColumnServer, the code does not assume that, + * it simply forces you to state which Cluster is the master. + * Currently SmartRouter fails if any Cluster fails. That need not be the case, Clusters could + * be marked "is_critical" meaning non-crititcal non-masters, could be allowed to fail. + */ + +class SmartRouterSession : public mxs::RouterSession, private mxs::QueryClassifier::Handler +{ +public: + static SmartRouterSession* create(SmartRouter* pRouter, MXS_SESSION* pSession); + + virtual ~SmartRouterSession(); + SmartRouterSession(const SmartRouterSession&) = delete; + SmartRouterSession& operator=(const SmartRouterSession&) = delete; + + int routeQuery(GWBUF* pBuf); + void close(); + void clientReply(GWBUF* pPacket, DCB* pDcb); + void handleError(GWBUF* pPacket, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess); + +private: + enum class Mode {Idle, Query, CollectResults}; // MeasureQuery + + /** struct Cluster represents a cluster of mariadb servers as a Maxscale internal Server. + * TODO In the next iteration a directly callable "Thing" should be implemented (Router, Backend + * Server - the terms are overused and confusing, maybe a new thing called MariaDB). + */ + struct Cluster + { + Cluster(SERVER_REF* b, DCB* pDcb, bool is_master) + : host {b->server->address, b->server->port} + , pBackend(b) + , pDcb(pDcb) + , is_master(is_master) + { + } + + Cluster(const Cluster&) = delete; + Cluster& operator=(const Cluster&) = delete; + + Cluster(Cluster&&) = default; + Cluster& operator=(Cluster&&) = default; + + maxbase::Host host; + SERVER_REF* pBackend; + DCB* pDcb; + bool is_master; + bool is_replying_to_client; + + maxsql::PacketTracker tracker; + }; + + using Clusters = std::vector; + + SmartRouterSession(SmartRouter*, MXS_SESSION* pSession, Clusters clusters); + + std::vector hosts() const; + + // The write functions initialize Cluster flags and Cluster::ProtocolTracker. + bool write_to_host(const maxbase::Host& host, GWBUF* pBuf); + bool write_to_master(GWBUF* pBuf); + bool write_to_all(GWBUF* pBuf); + bool write_split_packets(GWBUF* pBuf); + + bool expecting_request_packets() const; + bool expecting_response_packets() const; + bool all_clusters_are_idle() const; // no clusters expect packets + + // QueryClassifier::Handler overrides, not used. + bool lock_to_master() override; + bool is_locked_to_master() const override; + bool supports_hint(HINT_TYPE hint_type) const override; + + Mode m_mode = Mode::Idle; + GWBUF* m_pDelayed_packet = nullptr; + + DCB* m_pClient_dcb; + Clusters m_clusters; + mxs::QueryClassifier m_qc; +};