diff --git a/examples/examplefilter.cc b/examples/examplefilter.cc index 6e7d2c282..87993670b 100644 --- a/examples/examplefilter.cc +++ b/examples/examplefilter.cc @@ -12,28 +12,47 @@ */ // All log messages from this module are prefixed with this -#define MXS_MODULE_NAME "examplefilter" +#define MXS_MODULE_NAME "examplecppfilter" + +/* + * To use the filter in a configuration, add the following section to the config file: + * [ExampleFilter] + * type=filter + * module=examplecppfilter + * global_counts=true + * + * Then add the filter to a service: + * [Read-Write-Service] + * . + * . + * filters=ExampleFilter + */ #include "examplefilter.hh" +static const char CN_COUNT_GLOBALS[] = "global_counts"; + // This declares a module in MaxScale extern "C" MXS_MODULE* MXS_CREATE_MODULE() { + static const char DESC[] = "An example filter that counts the number of queries and replies " + "it has routed"; static MXS_MODULE info = { MXS_MODULE_API_FILTER, MXS_MODULE_IN_DEVELOPMENT, MXS_FILTER_VERSION, - "An example filter that does nothing", + DESC, "V1.0.0", - RCAP_TYPE_NONE, + RCAP_TYPE_STMT_INPUT, // See getCapabilities() below &ExampleFilter::s_object, // This is defined in the MaxScale filter template NULL, /* Process init. */ NULL, /* Process finish. */ NULL, /* Thread init. */ NULL, /* Thread finish. */ { - {"an_example_parameter", MXS_MODULE_PARAM_STRING,"a-default-value"}, + {"an_example_parameter", MXS_MODULE_PARAM_STRING, "a-default-value"}, + {CN_COUNT_GLOBALS, MXS_MODULE_PARAM_BOOL, "true"}, {MXS_END_MODULE_PARAMS} } }; @@ -41,8 +60,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() return &info; } -ExampleFilter::ExampleFilter() +ExampleFilter::ExampleFilter(const MXS_CONFIG_PARAMETER* pParams) { + m_collect_global_counts = config_get_bool(pParams, CN_COUNT_GLOBALS); } ExampleFilter::~ExampleFilter() @@ -52,27 +72,58 @@ ExampleFilter::~ExampleFilter() // static ExampleFilter* ExampleFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams) { - return new ExampleFilter(); + return new ExampleFilter(pParams); } ExampleFilterSession* ExampleFilter::newSession(MXS_SESSION* pSession) { - return ExampleFilterSession::create(pSession, this); + return ExampleFilterSession::create(pSession, *this); } // static void ExampleFilter::diagnostics(DCB* pDcb) const { + int queries = m_total_queries.load(std::memory_order_relaxed); + int replies = m_total_replies.load(std::memory_order_relaxed); + dcb_printf(pDcb, "\t\tTotal queries %i\n", queries); + dcb_printf(pDcb, "\t\tTotal replies %i\n", replies); } // static json_t* ExampleFilter::diagnostics_json() const { - return NULL; + json_t* rval = json_object(); + int queries = m_total_queries.load(std::memory_order_relaxed); + int replies = m_total_replies.load(std::memory_order_relaxed); + json_object_set_new(rval, "total_queries", json_integer(queries)); + json_object_set_new(rval, "total_replies", json_integer(replies)); + return rval; } // static uint64_t ExampleFilter::getCapabilities() { - return RCAP_TYPE_NONE; + // Tells the protocol that the filter expects complete queries from client, that is, a query cannot be + // sent in parts. + return RCAP_TYPE_STMT_INPUT; + + // Try the following to also expect replies to be complete. This can cause problems if the server sends + // a really big (e.g. 1 GB) resultset. + // return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT; } + +void ExampleFilter::query_seen() +{ + if (m_collect_global_counts) + { + m_total_queries.fetch_add(1, std::memory_order_relaxed); + } +} + +void ExampleFilter::reply_seen() +{ + if (m_collect_global_counts) + { + m_total_replies.fetch_add(1, std::memory_order_relaxed); + } +} \ No newline at end of file diff --git a/examples/examplefilter.hh b/examples/examplefilter.hh index d441c7908..8faf2651a 100644 --- a/examples/examplefilter.hh +++ b/examples/examplefilter.hh @@ -13,9 +13,16 @@ #pragma once #include +#include #include #include "examplefiltersession.hh" +/** + * Defines general data for the filter. This object is generated when MaxScale starts and deleted at + * shutdown. When MaxScale is routing queries, this object may be accessed from multiple threads + * concurrently. This should be considered if the object contains fields that are unsafe to + * access/modify concurrently. + */ class ExampleFilter : public maxscale::Filter { // Prevent copy-constructor and assignment operator usage @@ -25,22 +32,63 @@ class ExampleFilter : public maxscale::Filter m_total_queries {0}; /**< How many queries has this filter seen */ + std::atomic m_total_replies {0}; /**< How many replies has this filter seen */ + + bool m_collect_global_counts {false}; /**< Should sessions manipulate the total counts */ }; diff --git a/examples/examplefiltersession.cc b/examples/examplefiltersession.cc index 22f2fecd0..2b6e6b949 100644 --- a/examples/examplefiltersession.cc +++ b/examples/examplefiltersession.cc @@ -17,8 +17,10 @@ #include "examplefiltersession.hh" #include "examplefilter.hh" -ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession) +ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter) : mxs::FilterSession(pSession) + , m_filter(filter) + , m_session_id(pSession->ses_id) { } @@ -27,21 +29,31 @@ ExampleFilterSession::~ExampleFilterSession() } // static -ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const ExampleFilter* pFilter) +ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, ExampleFilter& filter) { - return new ExampleFilterSession(pSession); + return new ExampleFilterSession(pSession, filter); } void ExampleFilterSession::close() { + // When the session is closed, report the numbers to the log. + MXS_NOTICE("Session %lu routed %i queries and %i replies.", m_session_id, m_queries, m_replies); } int ExampleFilterSession::routeQuery(GWBUF* pPacket) { + m_queries++; + m_filter.query_seen(); + + // Pass the query forward. return mxs::FilterSession::routeQuery(pPacket); } int ExampleFilterSession::clientReply(GWBUF* pPacket) { + m_replies++; + m_filter.reply_seen(); + + // Pass the reply forward. return mxs::FilterSession::clientReply(pPacket); } diff --git a/examples/examplefiltersession.hh b/examples/examplefiltersession.hh index 2dcb757d0..ced7116fd 100644 --- a/examples/examplefiltersession.hh +++ b/examples/examplefiltersession.hh @@ -17,6 +17,11 @@ class ExampleFilter; +/* + * Defines session-specific data for this filter. An object of this class is created when a client connects + * and deleted on disconnect. The object is only accessed from one thread because sessions are locked to + * a thread when created. + */ class ExampleFilterSession : public maxscale::FilterSession { // Prevent copy-constructor and assignment operator usage @@ -26,19 +31,48 @@ class ExampleFilterSession : public maxscale::FilterSession public: ~ExampleFilterSession(); - // Called when a client session has been closed + // Called when a client session has been closed. Destructor will be called right after. void close(); - // Create a new filter session - static ExampleFilterSession* create(MXS_SESSION* pSession, const ExampleFilter* pFilter); + /** + * Called by ExampleFilter::newSession() to create the session. + * + * @param pSession pSession The generic MaxScale session object + * @param pFilter The shared filter object + * @return A new session or NULL on failure + */ + static ExampleFilterSession* create(MXS_SESSION* pSession, ExampleFilter& pFilter); - // Handle a query from the client + /** + * Handle a query from the client. This is called when the client sends a query and the query has not + * been blocked by any previous component in the query processing chain. The filter should do its own + * processing and then send the query to the next component. If the query comes in multiple packets, + * this is called for each packet. + * + * @param pPacket Packet containing the query, or at least a part of it + * @return 0 on success. This typically depends on the later stages of the query processing chain. + */ int routeQuery(GWBUF* pPacket); - // Handle a reply from server + + /** + * Handle a reply from server. The reply typically contains a resultset or a response to a command. + * The filter should do its own processing and then send the query to the next component. + * If the reply comes in multiple packets, this is called for each packet. The processing chain for + * replies is the same as for queries, just walked in the opposite direction. + * + * @param pPacket Packet containing results + * @return 0 on success. This typically depends on the later stages of the reply processing chain. + */ int clientReply(GWBUF* pPacket); private: // Used in the create function - ExampleFilterSession(MXS_SESSION* pSession); + ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter); + + ExampleFilter& m_filter; /**< Shared filter data */ + + uint64_t m_session_id {0}; /**< Session id */ + int m_queries {0}; /**< How many queries has this session seen */ + int m_replies {0}; /**< How many replies has this session seen */ };