MXS-2400 Extend examplefilter

Added more comments. Also the filter now demonstrates how to
1) read configuration parameters
2) react to queries and replies
3) handle shared filter data
4) print diagnostics output
5) add log entries

As the filter is already built and moved to the library directory,
it is immediately usable. This should be helpful with assignments.
This commit is contained in:
Esa Korhonen 2019-03-20 17:18:45 +02:00
parent 65b4ac7c1b
commit 2c4228db47
4 changed files with 170 additions and 25 deletions

View File

@ -12,28 +12,47 @@
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "examplefilter"
#define MXS_MODULE_NAME "examplecppfilter"
/*
* To use the filter in a configuration, add the following section to the config file:
* [ExampleFilter]
* type=filter
* module=examplecppfilter
* global_counts=true
*
* Then add the filter to a service:
* [Read-Write-Service]
* .
* .
* filters=ExampleFilter
*/
#include "examplefilter.hh"
static const char CN_COUNT_GLOBALS[] = "global_counts";
// This declares a module in MaxScale
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static const char DESC[] = "An example filter that counts the number of queries and replies "
"it has routed";
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_IN_DEVELOPMENT,
MXS_FILTER_VERSION,
"An example filter that does nothing",
DESC,
"V1.0.0",
RCAP_TYPE_NONE,
RCAP_TYPE_STMT_INPUT, // See getCapabilities() below
&ExampleFilter::s_object, // This is defined in the MaxScale filter template
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{"an_example_parameter", MXS_MODULE_PARAM_STRING,"a-default-value"},
{"an_example_parameter", MXS_MODULE_PARAM_STRING, "a-default-value"},
{CN_COUNT_GLOBALS, MXS_MODULE_PARAM_BOOL, "true"},
{MXS_END_MODULE_PARAMS}
}
};
@ -41,8 +60,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
return &info;
}
ExampleFilter::ExampleFilter()
ExampleFilter::ExampleFilter(const MXS_CONFIG_PARAMETER* pParams)
{
m_collect_global_counts = config_get_bool(pParams, CN_COUNT_GLOBALS);
}
ExampleFilter::~ExampleFilter()
@ -52,27 +72,58 @@ ExampleFilter::~ExampleFilter()
// static
ExampleFilter* ExampleFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams)
{
return new ExampleFilter();
return new ExampleFilter(pParams);
}
ExampleFilterSession* ExampleFilter::newSession(MXS_SESSION* pSession)
{
return ExampleFilterSession::create(pSession, this);
return ExampleFilterSession::create(pSession, *this);
}
// static
void ExampleFilter::diagnostics(DCB* pDcb) const
{
int queries = m_total_queries.load(std::memory_order_relaxed);
int replies = m_total_replies.load(std::memory_order_relaxed);
dcb_printf(pDcb, "\t\tTotal queries %i\n", queries);
dcb_printf(pDcb, "\t\tTotal replies %i\n", replies);
}
// static
json_t* ExampleFilter::diagnostics_json() const
{
return NULL;
json_t* rval = json_object();
int queries = m_total_queries.load(std::memory_order_relaxed);
int replies = m_total_replies.load(std::memory_order_relaxed);
json_object_set_new(rval, "total_queries", json_integer(queries));
json_object_set_new(rval, "total_replies", json_integer(replies));
return rval;
}
// static
uint64_t ExampleFilter::getCapabilities()
{
return RCAP_TYPE_NONE;
// Tells the protocol that the filter expects complete queries from client, that is, a query cannot be
// sent in parts.
return RCAP_TYPE_STMT_INPUT;
// Try the following to also expect replies to be complete. This can cause problems if the server sends
// a really big (e.g. 1 GB) resultset.
// return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT;
}
void ExampleFilter::query_seen()
{
if (m_collect_global_counts)
{
m_total_queries.fetch_add(1, std::memory_order_relaxed);
}
}
void ExampleFilter::reply_seen()
{
if (m_collect_global_counts)
{
m_total_replies.fetch_add(1, std::memory_order_relaxed);
}
}

View File

@ -13,9 +13,16 @@
#pragma once
#include <maxscale/ccdefs.hh>
#include <atomic>
#include <maxscale/filter.hh>
#include "examplefiltersession.hh"
/**
* Defines general data for the filter. This object is generated when MaxScale starts and deleted at
* shutdown. When MaxScale is routing queries, this object may be accessed from multiple threads
* concurrently. This should be considered if the object contains fields that are unsafe to
* access/modify concurrently.
*/
class ExampleFilter : public maxscale::Filter<ExampleFilter, ExampleFilterSession>
{
// Prevent copy-constructor and assignment operator usage
@ -25,22 +32,63 @@ class ExampleFilter : public maxscale::Filter<ExampleFilter, ExampleFilterSessio
public:
~ExampleFilter();
// Creates a new filter instance
/**
* Creates a new filter instance. A separate function from the ctor is used so that NULL can be
* returned on failure.
*
* @param zName The name given to the filter in the configuration file. Can be stored if required for
* e.g. log messages.
* @param ppParams Configuration parameters parsed from the configuration file
* @return The object on success, NULL on failure. Failure is typically caused by an invalid
* configuration parameter.
*/
static ExampleFilter* create(const char* zName, MXS_CONFIG_PARAMETER* ppParams);
// Creates a new session for this filter
/*
* Creates a new session for this filter. This is called when a new client connects.
*
* @param pSession The generic MaxScale session object.
* @return The new session, or NULL on failure.
*/
ExampleFilterSession* newSession(MXS_SESSION* pSession);
// Print diagnostics to a DCB
/*
* Print diagnostics to a DCB. This is called when the admin tool MaxAdmin asks for the status of this
* filter. Run MaxAdmin with "./maxadmin show filters" in the MaxScale binary directory.
*
* @param pDcb The connection descriptor to print diagnostic to
*/
void diagnostics(DCB* pDcb) const;
// Returns JSON form diagnostic data
/*
* Returns JSON form diagnostic data. This is called when the admin tool MaxCtrl asks for the status
* of this filter. Run MaxCtrl with "./maxctrl show filters" in the MaxScale binary directory.
*
* @return Json object
*/
json_t* diagnostics_json() const;
// Get filter capabilities
/*
* Get filter capabilities. This is used by protocol code to find out what kind of data the filter
* expects.
*
* @return Capabilities as a bitfield
*/
uint64_t getCapabilities();
// Specific to ExampleFilter. Called by a session when it sees a query.
void query_seen();
// Specific to ExampleFilter. Called by a session when it sees a reply.
void reply_seen();
private:
// Used in the create function
ExampleFilter();
// Used by the create function
ExampleFilter(const MXS_CONFIG_PARAMETER* pParams);
// The fields are specific to ExampleFilter.
std::atomic<int> m_total_queries {0}; /**< How many queries has this filter seen */
std::atomic<int> m_total_replies {0}; /**< How many replies has this filter seen */
bool m_collect_global_counts {false}; /**< Should sessions manipulate the total counts */
};

View File

@ -17,8 +17,10 @@
#include "examplefiltersession.hh"
#include "examplefilter.hh"
ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession)
ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter)
: mxs::FilterSession(pSession)
, m_filter(filter)
, m_session_id(pSession->ses_id)
{
}
@ -27,21 +29,31 @@ ExampleFilterSession::~ExampleFilterSession()
}
// static
ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const ExampleFilter* pFilter)
ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, ExampleFilter& filter)
{
return new ExampleFilterSession(pSession);
return new ExampleFilterSession(pSession, filter);
}
void ExampleFilterSession::close()
{
// When the session is closed, report the numbers to the log.
MXS_NOTICE("Session %lu routed %i queries and %i replies.", m_session_id, m_queries, m_replies);
}
int ExampleFilterSession::routeQuery(GWBUF* pPacket)
{
m_queries++;
m_filter.query_seen();
// Pass the query forward.
return mxs::FilterSession::routeQuery(pPacket);
}
int ExampleFilterSession::clientReply(GWBUF* pPacket)
{
m_replies++;
m_filter.reply_seen();
// Pass the reply forward.
return mxs::FilterSession::clientReply(pPacket);
}

View File

@ -17,6 +17,11 @@
class ExampleFilter;
/*
* Defines session-specific data for this filter. An object of this class is created when a client connects
* and deleted on disconnect. The object is only accessed from one thread because sessions are locked to
* a thread when created.
*/
class ExampleFilterSession : public maxscale::FilterSession
{
// Prevent copy-constructor and assignment operator usage
@ -26,19 +31,48 @@ class ExampleFilterSession : public maxscale::FilterSession
public:
~ExampleFilterSession();
// Called when a client session has been closed
// Called when a client session has been closed. Destructor will be called right after.
void close();
// Create a new filter session
static ExampleFilterSession* create(MXS_SESSION* pSession, const ExampleFilter* pFilter);
/**
* Called by ExampleFilter::newSession() to create the session.
*
* @param pSession pSession The generic MaxScale session object
* @param pFilter The shared filter object
* @return A new session or NULL on failure
*/
static ExampleFilterSession* create(MXS_SESSION* pSession, ExampleFilter& pFilter);
// Handle a query from the client
/**
* Handle a query from the client. This is called when the client sends a query and the query has not
* been blocked by any previous component in the query processing chain. The filter should do its own
* processing and then send the query to the next component. If the query comes in multiple packets,
* this is called for each packet.
*
* @param pPacket Packet containing the query, or at least a part of it
* @return 0 on success. This typically depends on the later stages of the query processing chain.
*/
int routeQuery(GWBUF* pPacket);
// Handle a reply from server
/**
* Handle a reply from server. The reply typically contains a resultset or a response to a command.
* The filter should do its own processing and then send the query to the next component.
* If the reply comes in multiple packets, this is called for each packet. The processing chain for
* replies is the same as for queries, just walked in the opposite direction.
*
* @param pPacket Packet containing results
* @return 0 on success. This typically depends on the later stages of the reply processing chain.
*/
int clientReply(GWBUF* pPacket);
private:
// Used in the create function
ExampleFilterSession(MXS_SESSION* pSession);
ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter);
ExampleFilter& m_filter; /**< Shared filter data */
uint64_t m_session_id {0}; /**< Session id */
int m_queries {0}; /**< How many queries has this session seen */
int m_replies {0}; /**< How many replies has this session seen */
};