diff --git a/server/modules/routing/CMakeLists.txt b/server/modules/routing/CMakeLists.txt index b3bfc3ddc..54f6fa959 100644 --- a/server/modules/routing/CMakeLists.txt +++ b/server/modules/routing/CMakeLists.txt @@ -12,5 +12,6 @@ add_subdirectory(maxinfo) add_subdirectory(readconnroute) add_subdirectory(readwritesplit) add_subdirectory(schemarouter) +add_subdirectory(cat) diff --git a/server/modules/routing/cat/CMakeLists.txt b/server/modules/routing/cat/CMakeLists.txt new file mode 100644 index 000000000..98eb2666b --- /dev/null +++ b/server/modules/routing/cat/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(cat SHARED cat.cc catsession.cc ../readwritesplit/rwbackend.cc) +target_link_libraries(cat maxscale-common mysqlcommon) +set_target_properties(cat PROPERTIES VERSION "1.0.0") +install_module(cat core) diff --git a/server/modules/routing/cat/cat.cc b/server/modules/routing/cat/cat.cc new file mode 100644 index 000000000..450bd25e9 --- /dev/null +++ b/server/modules/routing/cat/cat.cc @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "cat.hh" +#include "catsession.hh" + +using namespace maxscale; + +Cat::Cat(SERVICE* pService): + Router(pService) +{ +} + +Cat::~Cat() +{ +} + +Cat* Cat::create(SERVICE* pService, char** pzOptions) +{ + return new Cat(pService); +} + +CatSession* Cat::newSession(MXS_SESSION* pSession) +{ + auto backends = RWBackend::from_servers(pSession->service->dbref); + + for (auto a = backends.begin(); a != backends.end(); a++) + { + (*a)->connect(pSession); + } + + return new CatSession(pSession, this, backends); +} + +void Cat::diagnostics(DCB* dcb) +{ +} + +json_t* Cat::diagnostics_json() const +{ + return NULL; +} + +const uint64_t caps = RCAP_TYPE_STMT_OUTPUT | RCAP_TYPE_STMT_INPUT; + +uint64_t Cat::getCapabilities() +{ + return caps; +} + +MXS_BEGIN_DECLS + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +MXS_MODULE* MXS_CREATE_MODULE() +{ + static MXS_MODULE info = + { + MXS_MODULE_API_ROUTER, + MXS_MODULE_ALPHA_RELEASE, + MXS_ROUTER_VERSION, + "Resultset concatenation router", + "V1.0.0", + caps, + &Cat::s_object, + NULL, /* Process init. */ + NULL, /* Process finish. */ + NULL, /* Thread init. */ + NULL, /* Thread finish. */ + { + {MXS_END_MODULE_PARAMS} + } + }; + + return &info; +} + +MXS_END_DECLS diff --git a/server/modules/routing/cat/cat.hh b/server/modules/routing/cat/cat.hh new file mode 100644 index 000000000..e0d3f4858 --- /dev/null +++ b/server/modules/routing/cat/cat.hh @@ -0,0 +1,39 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include + +class CatSession; + +/** + * The per instance data for the router. + */ +class Cat: public mxs::Router +{ + Cat(const Cat&) = delete; + Cat& operator =(const Cat&) = delete; +public: + ~Cat(); + static Cat* create(SERVICE* pService, char** pzOptions); + CatSession* newSession(MXS_SESSION* pSession); + void diagnostics(DCB* pDcb); + json_t* diagnostics_json() const; + uint64_t getCapabilities(); + +private: + friend class CatSession; + + /** Internal functions */ + Cat(SERVICE* pService); +}; diff --git a/server/modules/routing/cat/catsession.cc b/server/modules/routing/cat/catsession.cc new file mode 100644 index 000000000..0fe0c8627 --- /dev/null +++ b/server/modules/routing/cat/catsession.cc @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "cat.hh" +#include "catsession.hh" +#include "maxscale/protocol/mysql.h" +#include "maxscale/modutil.h" + +using namespace maxscale; + +CatSession::CatSession(MXS_SESSION* session, Cat* router, SRWBackendList& backends): + RouterSession(session), + m_session(session), + m_backends(backends), + m_completed(0) +{ +} + +CatSession::~CatSession() +{ +} + +void CatSession::close() +{ +} + +void CatSession::skip_unused() +{ + // Skip unused backends + while (m_current != m_backends.end() && !(*m_current)->in_use()) + { + m_current++; + } +} + +int32_t CatSession::routeQuery(GWBUF* pPacket) +{ + int32_t rval = 0; + + m_completed = 0; + m_packet_num = 0; + m_query = pPacket; + m_current = m_backends.begin(); + + // If the first backend is not in use, find one that is + skip_unused(); + + if (m_current != m_backends.end()) + { + // We have a backend, write the query only to this one. It will be + // propagated onwards in clientReply. + rval = (*m_current)->write(gwbuf_clone(pPacket)); + (*m_current)->set_reply_state(REPLY_STATE_START); + } + + return rval; +} + +void CatSession::clientReply(GWBUF* pPacket, DCB* pDcb) +{ + auto backend = *m_current; + ss_dassert(backend->dcb() == pDcb); + bool send = false; + bool propagate = true; + + if (m_completed == 0 && backend->get_reply_state() == REPLY_STATE_START && + !mxs_mysql_is_result_set(pPacket)) + { + propagate = false; + } + + if (backend->reply_is_complete(pPacket)) + { + backend->ack_write(); + m_completed++; + m_current++; + skip_unused(); + + if (m_current == m_backends.end()) + { + uint8_t eof_packet[] = {0x5, 0x0, 0x0, 0x0, 0xfe, 0x0, 0x0, 0x2, 0x0}; + gwbuf_free(pPacket); + pPacket = gwbuf_alloc_and_load(sizeof(eof_packet), eof_packet); + send = true; + gwbuf_free(m_query); + m_query = NULL; + } + else if (propagate) + { + (*m_current)->write(gwbuf_clone(m_query)); + } + else + { + send = true; + gwbuf_free(m_query); + m_query = NULL; + } + } + + if (m_completed == 0) + { + send = backend->get_reply_state() != REPLY_STATE_DONE; + } + else if (backend->get_reply_state() == REPLY_STATE_RSET_ROWS && + mxs_mysql_get_command(pPacket) != MYSQL_REPLY_EOF) + { + send = true; + } + + if (send) + { + GWBUF_DATA(pPacket)[3] = m_packet_num++; + MXS_SESSION_ROUTE_REPLY(pDcb->session, pPacket); + } + else + { + gwbuf_free(pPacket); + } +} + +void CatSession::handleError(GWBUF* pMessage, DCB* pProblem, mxs_error_action_t action, bool* pSuccess) +{ + *pSuccess = false; +} diff --git a/server/modules/routing/cat/catsession.hh b/server/modules/routing/cat/catsession.hh new file mode 100644 index 000000000..d14f9741e --- /dev/null +++ b/server/modules/routing/cat/catsession.hh @@ -0,0 +1,82 @@ +#pragma once +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2020-01-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "cat.hh" + +#include "../readwritesplit/rwbackend.hh" + +class Cat; + +/** + * The client session structure used within this router. + */ +class CatSession: public mxs::RouterSession +{ + CatSession(const CatSession&) = delete; + CatSession& operator =(const CatSession&) = delete; +public: + + CatSession(MXS_SESSION* session, Cat* router, mxs::SRWBackendList& backends); + + /** + * The RouterSession instance will be deleted when a client session + * has terminated. Will be called only after @c close() has been called. + */ + ~CatSession(); + + /** + * Called when a client session has been closed. + */ + void close(); + + /** + * Called when a packet being is routed to the backend. The router should + * forward the packet to the appropriate server(s). + * + * @param pPacket A client packet. + */ + int32_t routeQuery(GWBUF* pPacket); + + /** + * Called when a packet is routed to the client. The router should + * forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`. + * + * @param pPacket A client packet. + * @param pBackend The backend the packet is coming from. + */ + void clientReply(GWBUF* pPacket, DCB* pBackend); + + /** + * + * @param pMessage The error message. + * @param pProblem The DCB on which the error occurred. + * @param action The context. + * @param pSuccess On output, if false, the session will be terminated. + */ + void handleError(GWBUF* pMessage, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess); +private: + + MXS_SESSION* m_session; + mxs::SRWBackendList m_backends; + uint64_t m_completed; + uint8_t m_packet_num; + mxs::SRWBackendList::iterator m_current; + GWBUF* m_query; + + // Skip over unused backend servers + void skip_unused(); +}; \ No newline at end of file