From 2c4228db47ef3bef3a00ab8658f3c8bdcea70faf Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Wed, 20 Mar 2019 17:18:45 +0200 Subject: [PATCH 1/3] MXS-2400 Extend examplefilter Added more comments. Also the filter now demonstrates how to 1) read configuration parameters 2) react to queries and replies 3) handle shared filter data 4) print diagnostics output 5) add log entries As the filter is already built and moved to the library directory, it is immediately usable. This should be helpful with assignments. --- examples/examplefilter.cc | 69 +++++++++++++++++++++++++++----- examples/examplefilter.hh | 62 ++++++++++++++++++++++++---- examples/examplefiltersession.cc | 18 +++++++-- examples/examplefiltersession.hh | 46 ++++++++++++++++++--- 4 files changed, 170 insertions(+), 25 deletions(-) diff --git a/examples/examplefilter.cc b/examples/examplefilter.cc index 6e7d2c282..87993670b 100644 --- a/examples/examplefilter.cc +++ b/examples/examplefilter.cc @@ -12,28 +12,47 @@ */ // All log messages from this module are prefixed with this -#define MXS_MODULE_NAME "examplefilter" +#define MXS_MODULE_NAME "examplecppfilter" + +/* + * To use the filter in a configuration, add the following section to the config file: + * [ExampleFilter] + * type=filter + * module=examplecppfilter + * global_counts=true + * + * Then add the filter to a service: + * [Read-Write-Service] + * . + * . + * filters=ExampleFilter + */ #include "examplefilter.hh" +static const char CN_COUNT_GLOBALS[] = "global_counts"; + // This declares a module in MaxScale extern "C" MXS_MODULE* MXS_CREATE_MODULE() { + static const char DESC[] = "An example filter that counts the number of queries and replies " + "it has routed"; static MXS_MODULE info = { MXS_MODULE_API_FILTER, MXS_MODULE_IN_DEVELOPMENT, MXS_FILTER_VERSION, - "An example filter that does nothing", + DESC, "V1.0.0", - RCAP_TYPE_NONE, + RCAP_TYPE_STMT_INPUT, // See getCapabilities() below &ExampleFilter::s_object, // This is defined in the MaxScale filter template NULL, /* Process init. */ NULL, /* Process finish. */ NULL, /* Thread init. */ NULL, /* Thread finish. */ { - {"an_example_parameter", MXS_MODULE_PARAM_STRING,"a-default-value"}, + {"an_example_parameter", MXS_MODULE_PARAM_STRING, "a-default-value"}, + {CN_COUNT_GLOBALS, MXS_MODULE_PARAM_BOOL, "true"}, {MXS_END_MODULE_PARAMS} } }; @@ -41,8 +60,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() return &info; } -ExampleFilter::ExampleFilter() +ExampleFilter::ExampleFilter(const MXS_CONFIG_PARAMETER* pParams) { + m_collect_global_counts = config_get_bool(pParams, CN_COUNT_GLOBALS); } ExampleFilter::~ExampleFilter() @@ -52,27 +72,58 @@ ExampleFilter::~ExampleFilter() // static ExampleFilter* ExampleFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams) { - return new ExampleFilter(); + return new ExampleFilter(pParams); } ExampleFilterSession* ExampleFilter::newSession(MXS_SESSION* pSession) { - return ExampleFilterSession::create(pSession, this); + return ExampleFilterSession::create(pSession, *this); } // static void ExampleFilter::diagnostics(DCB* pDcb) const { + int queries = m_total_queries.load(std::memory_order_relaxed); + int replies = m_total_replies.load(std::memory_order_relaxed); + dcb_printf(pDcb, "\t\tTotal queries %i\n", queries); + dcb_printf(pDcb, "\t\tTotal replies %i\n", replies); } // static json_t* ExampleFilter::diagnostics_json() const { - return NULL; + json_t* rval = json_object(); + int queries = m_total_queries.load(std::memory_order_relaxed); + int replies = m_total_replies.load(std::memory_order_relaxed); + json_object_set_new(rval, "total_queries", json_integer(queries)); + json_object_set_new(rval, "total_replies", json_integer(replies)); + return rval; } // static uint64_t ExampleFilter::getCapabilities() { - return RCAP_TYPE_NONE; + // Tells the protocol that the filter expects complete queries from client, that is, a query cannot be + // sent in parts. + return RCAP_TYPE_STMT_INPUT; + + // Try the following to also expect replies to be complete. This can cause problems if the server sends + // a really big (e.g. 1 GB) resultset. + // return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT; } + +void ExampleFilter::query_seen() +{ + if (m_collect_global_counts) + { + m_total_queries.fetch_add(1, std::memory_order_relaxed); + } +} + +void ExampleFilter::reply_seen() +{ + if (m_collect_global_counts) + { + m_total_replies.fetch_add(1, std::memory_order_relaxed); + } +} \ No newline at end of file diff --git a/examples/examplefilter.hh b/examples/examplefilter.hh index d441c7908..8faf2651a 100644 --- a/examples/examplefilter.hh +++ b/examples/examplefilter.hh @@ -13,9 +13,16 @@ #pragma once #include +#include #include #include "examplefiltersession.hh" +/** + * Defines general data for the filter. This object is generated when MaxScale starts and deleted at + * shutdown. When MaxScale is routing queries, this object may be accessed from multiple threads + * concurrently. This should be considered if the object contains fields that are unsafe to + * access/modify concurrently. + */ class ExampleFilter : public maxscale::Filter { // Prevent copy-constructor and assignment operator usage @@ -25,22 +32,63 @@ class ExampleFilter : public maxscale::Filter m_total_queries {0}; /**< How many queries has this filter seen */ + std::atomic m_total_replies {0}; /**< How many replies has this filter seen */ + + bool m_collect_global_counts {false}; /**< Should sessions manipulate the total counts */ }; diff --git a/examples/examplefiltersession.cc b/examples/examplefiltersession.cc index 22f2fecd0..2b6e6b949 100644 --- a/examples/examplefiltersession.cc +++ b/examples/examplefiltersession.cc @@ -17,8 +17,10 @@ #include "examplefiltersession.hh" #include "examplefilter.hh" -ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession) +ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter) : mxs::FilterSession(pSession) + , m_filter(filter) + , m_session_id(pSession->ses_id) { } @@ -27,21 +29,31 @@ ExampleFilterSession::~ExampleFilterSession() } // static -ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const ExampleFilter* pFilter) +ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, ExampleFilter& filter) { - return new ExampleFilterSession(pSession); + return new ExampleFilterSession(pSession, filter); } void ExampleFilterSession::close() { + // When the session is closed, report the numbers to the log. + MXS_NOTICE("Session %lu routed %i queries and %i replies.", m_session_id, m_queries, m_replies); } int ExampleFilterSession::routeQuery(GWBUF* pPacket) { + m_queries++; + m_filter.query_seen(); + + // Pass the query forward. return mxs::FilterSession::routeQuery(pPacket); } int ExampleFilterSession::clientReply(GWBUF* pPacket) { + m_replies++; + m_filter.reply_seen(); + + // Pass the reply forward. return mxs::FilterSession::clientReply(pPacket); } diff --git a/examples/examplefiltersession.hh b/examples/examplefiltersession.hh index 2dcb757d0..ced7116fd 100644 --- a/examples/examplefiltersession.hh +++ b/examples/examplefiltersession.hh @@ -17,6 +17,11 @@ class ExampleFilter; +/* + * Defines session-specific data for this filter. An object of this class is created when a client connects + * and deleted on disconnect. The object is only accessed from one thread because sessions are locked to + * a thread when created. + */ class ExampleFilterSession : public maxscale::FilterSession { // Prevent copy-constructor and assignment operator usage @@ -26,19 +31,48 @@ class ExampleFilterSession : public maxscale::FilterSession public: ~ExampleFilterSession(); - // Called when a client session has been closed + // Called when a client session has been closed. Destructor will be called right after. void close(); - // Create a new filter session - static ExampleFilterSession* create(MXS_SESSION* pSession, const ExampleFilter* pFilter); + /** + * Called by ExampleFilter::newSession() to create the session. + * + * @param pSession pSession The generic MaxScale session object + * @param pFilter The shared filter object + * @return A new session or NULL on failure + */ + static ExampleFilterSession* create(MXS_SESSION* pSession, ExampleFilter& pFilter); - // Handle a query from the client + /** + * Handle a query from the client. This is called when the client sends a query and the query has not + * been blocked by any previous component in the query processing chain. The filter should do its own + * processing and then send the query to the next component. If the query comes in multiple packets, + * this is called for each packet. + * + * @param pPacket Packet containing the query, or at least a part of it + * @return 0 on success. This typically depends on the later stages of the query processing chain. + */ int routeQuery(GWBUF* pPacket); - // Handle a reply from server + + /** + * Handle a reply from server. The reply typically contains a resultset or a response to a command. + * The filter should do its own processing and then send the query to the next component. + * If the reply comes in multiple packets, this is called for each packet. The processing chain for + * replies is the same as for queries, just walked in the opposite direction. + * + * @param pPacket Packet containing results + * @return 0 on success. This typically depends on the later stages of the reply processing chain. + */ int clientReply(GWBUF* pPacket); private: // Used in the create function - ExampleFilterSession(MXS_SESSION* pSession); + ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter); + + ExampleFilter& m_filter; /**< Shared filter data */ + + uint64_t m_session_id {0}; /**< Session id */ + int m_queries {0}; /**< How many queries has this session seen */ + int m_replies {0}; /**< How many replies has this session seen */ }; From 11ee74bad327e7fb15e8388d20e7838b9e49cadf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 16 Mar 2019 10:24:04 +0200 Subject: [PATCH 2/3] Free the readwritesplit query queue If the queue isn't empty when the session closes, the queue would leak. --- server/modules/routing/readwritesplit/readwritesplit.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/server/modules/routing/readwritesplit/readwritesplit.cc b/server/modules/routing/readwritesplit/readwritesplit.cc index 82a37c38a..058efaf88 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.cc +++ b/server/modules/routing/readwritesplit/readwritesplit.cc @@ -918,6 +918,7 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio { router_cli_ses->rses_closed = true; close_all_connections(router_cli_ses->backends); + gwbuf_free(router_cli_ses->query_queue); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO) && router_cli_ses->sescmd_list.size()) From 559b786533b0fb6c691ef8d28c8397488ca3b1ae Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Thu, 21 Mar 2019 10:06:15 +0200 Subject: [PATCH 3/3] MXS-2398 Handle MariaDB specific comments Same approach as with regular comments: - /*M! STMT */ are always parsed. - /*M!###### STMT */ are never parsed. --- .../sqlite-src-3110100/src/tokenize.c | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/query_classifier/qc_sqlite/sqlite-src-3110100/src/tokenize.c b/query_classifier/qc_sqlite/sqlite-src-3110100/src/tokenize.c index 11360dc56..84e4a4538 100644 --- a/query_classifier/qc_sqlite/sqlite-src-3110100/src/tokenize.c +++ b/query_classifier/qc_sqlite/sqlite-src-3110100/src/tokenize.c @@ -255,21 +255,27 @@ int sqlite3GetToken(const unsigned char *z, int *tokenType){ return 1; } #ifdef MAXSCALE - if ( z[2] == '!' ){ + if ( z[2]=='!' || (z[2]=='M' && z[3]=='!')){ + int j = (z[2]=='M' ? 4 : 3); // MySQL-specific code - for (i=3, c=z[2]; (c!='*' || z[i]!='/') && (c=z[i])!=0; i++){} + for (i=j, c=z[j-1]; (c!='*' || z[i]!='/') && (c=z[i])!=0; i++){} if (c=='*' && z[i]=='/'){ - if (sqlite3Isdigit(z[3])) { - // A version specific executable comment, e.g. "/*!99999 ..." => never parsed. + if (sqlite3Isdigit(z[j])) { + // A version specific executable comment, + // e.g. "/*!99999 ..." or "/*M!99999 ..." => never parsed. extern void maxscaleSetStatusCap(int); maxscaleSetStatusCap(2); // QC_QUERY_PARTIALLY_PARSED, see query_classifier.h:qc_parse_result ++i; // Next after the trailing '/' } else { - // A non-version specific executable comment, e.g. "/*! select 1 */ => always parsed. + // A non-version specific executable comment, + // e.g."/*! select 1 */ or "/*M! select 1 */ => always parsed. char* znc = (char*) z; znc[0]=znc[1]=znc[2]=znc[i-1]=znc[i]=' '; // Remove comment chars, i.e. "/*!" and "*/". - for (i=3; sqlite3Isspace(z[i]); ++i){} // Jump over any space. + if (j==4){ + znc[3]=0; // It wasn't "/*!" but "/*M!". + } + for (i=j; sqlite3Isspace(z[i]); ++i){} // Jump over any space. } } } else {