Merge branch '2.3' into develop

This commit is contained in:
Markus Mäkelä 2019-03-21 18:23:27 +02:00
commit bf2d6673bc
No known key found for this signature in database
GPG Key ID: 72D48FCE664F7B19
5 changed files with 182 additions and 31 deletions

View File

@ -12,28 +12,47 @@
*/
// All log messages from this module are prefixed with this
#define MXS_MODULE_NAME "examplefilter"
#define MXS_MODULE_NAME "examplecppfilter"
/*
* To use the filter in a configuration, add the following section to the config file:
* [ExampleFilter]
* type=filter
* module=examplecppfilter
* global_counts=true
*
* Then add the filter to a service:
* [Read-Write-Service]
* .
* .
* filters=ExampleFilter
*/
#include "examplefilter.hh"
static const char CN_COUNT_GLOBALS[] = "global_counts";
// This declares a module in MaxScale
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static const char DESC[] = "An example filter that counts the number of queries and replies "
"it has routed";
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_IN_DEVELOPMENT,
MXS_FILTER_VERSION,
"An example filter that does nothing",
DESC,
"V1.0.0",
RCAP_TYPE_NONE,
RCAP_TYPE_STMT_INPUT, // See getCapabilities() below
&ExampleFilter::s_object, // This is defined in the MaxScale filter template
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{"an_example_parameter", MXS_MODULE_PARAM_STRING,"a-default-value"},
{"an_example_parameter", MXS_MODULE_PARAM_STRING, "a-default-value"},
{CN_COUNT_GLOBALS, MXS_MODULE_PARAM_BOOL, "true"},
{MXS_END_MODULE_PARAMS}
}
};
@ -41,8 +60,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
return &info;
}
ExampleFilter::ExampleFilter()
ExampleFilter::ExampleFilter(const MXS_CONFIG_PARAMETER* pParams)
{
m_collect_global_counts = config_get_bool(pParams, CN_COUNT_GLOBALS);
}
ExampleFilter::~ExampleFilter()
@ -52,27 +72,58 @@ ExampleFilter::~ExampleFilter()
// static
ExampleFilter* ExampleFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams)
{
return new ExampleFilter();
return new ExampleFilter(pParams);
}
ExampleFilterSession* ExampleFilter::newSession(MXS_SESSION* pSession)
{
return ExampleFilterSession::create(pSession, this);
return ExampleFilterSession::create(pSession, *this);
}
// static
void ExampleFilter::diagnostics(DCB* pDcb) const
{
int queries = m_total_queries.load(std::memory_order_relaxed);
int replies = m_total_replies.load(std::memory_order_relaxed);
dcb_printf(pDcb, "\t\tTotal queries %i\n", queries);
dcb_printf(pDcb, "\t\tTotal replies %i\n", replies);
}
// static
json_t* ExampleFilter::diagnostics_json() const
{
return NULL;
json_t* rval = json_object();
int queries = m_total_queries.load(std::memory_order_relaxed);
int replies = m_total_replies.load(std::memory_order_relaxed);
json_object_set_new(rval, "total_queries", json_integer(queries));
json_object_set_new(rval, "total_replies", json_integer(replies));
return rval;
}
// static
uint64_t ExampleFilter::getCapabilities()
{
return RCAP_TYPE_NONE;
// Tells the protocol that the filter expects complete queries from client, that is, a query cannot be
// sent in parts.
return RCAP_TYPE_STMT_INPUT;
// Try the following to also expect replies to be complete. This can cause problems if the server sends
// a really big (e.g. 1 GB) resultset.
// return RCAP_TYPE_STMT_INPUT | RCAP_TYPE_RESULTSET_OUTPUT;
}
void ExampleFilter::query_seen()
{
if (m_collect_global_counts)
{
m_total_queries.fetch_add(1, std::memory_order_relaxed);
}
}
void ExampleFilter::reply_seen()
{
if (m_collect_global_counts)
{
m_total_replies.fetch_add(1, std::memory_order_relaxed);
}
}

View File

@ -13,9 +13,16 @@
#pragma once
#include <maxscale/ccdefs.hh>
#include <atomic>
#include <maxscale/filter.hh>
#include "examplefiltersession.hh"
/**
* Defines general data for the filter. This object is generated when MaxScale starts and deleted at
* shutdown. When MaxScale is routing queries, this object may be accessed from multiple threads
* concurrently. This should be considered if the object contains fields that are unsafe to
* access/modify concurrently.
*/
class ExampleFilter : public maxscale::Filter<ExampleFilter, ExampleFilterSession>
{
// Prevent copy-constructor and assignment operator usage
@ -25,22 +32,63 @@ class ExampleFilter : public maxscale::Filter<ExampleFilter, ExampleFilterSessio
public:
~ExampleFilter();
// Creates a new filter instance
/**
* Creates a new filter instance. A separate function from the ctor is used so that NULL can be
* returned on failure.
*
* @param zName The name given to the filter in the configuration file. Can be stored if required for
* e.g. log messages.
* @param ppParams Configuration parameters parsed from the configuration file
* @return The object on success, NULL on failure. Failure is typically caused by an invalid
* configuration parameter.
*/
static ExampleFilter* create(const char* zName, MXS_CONFIG_PARAMETER* ppParams);
// Creates a new session for this filter
/*
* Creates a new session for this filter. This is called when a new client connects.
*
* @param pSession The generic MaxScale session object.
* @return The new session, or NULL on failure.
*/
ExampleFilterSession* newSession(MXS_SESSION* pSession);
// Print diagnostics to a DCB
/*
* Print diagnostics to a DCB. This is called when the admin tool MaxAdmin asks for the status of this
* filter. Run MaxAdmin with "./maxadmin show filters" in the MaxScale binary directory.
*
* @param pDcb The connection descriptor to print diagnostic to
*/
void diagnostics(DCB* pDcb) const;
// Returns JSON form diagnostic data
/*
* Returns JSON form diagnostic data. This is called when the admin tool MaxCtrl asks for the status
* of this filter. Run MaxCtrl with "./maxctrl show filters" in the MaxScale binary directory.
*
* @return Json object
*/
json_t* diagnostics_json() const;
// Get filter capabilities
/*
* Get filter capabilities. This is used by protocol code to find out what kind of data the filter
* expects.
*
* @return Capabilities as a bitfield
*/
uint64_t getCapabilities();
// Specific to ExampleFilter. Called by a session when it sees a query.
void query_seen();
// Specific to ExampleFilter. Called by a session when it sees a reply.
void reply_seen();
private:
// Used in the create function
ExampleFilter();
// Used by the create function
ExampleFilter(const MXS_CONFIG_PARAMETER* pParams);
// The fields are specific to ExampleFilter.
std::atomic<int> m_total_queries {0}; /**< How many queries has this filter seen */
std::atomic<int> m_total_replies {0}; /**< How many replies has this filter seen */
bool m_collect_global_counts {false}; /**< Should sessions manipulate the total counts */
};

View File

@ -17,8 +17,10 @@
#include "examplefiltersession.hh"
#include "examplefilter.hh"
ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession)
ExampleFilterSession::ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter)
: mxs::FilterSession(pSession)
, m_filter(filter)
, m_session_id(pSession->ses_id)
{
}
@ -27,21 +29,31 @@ ExampleFilterSession::~ExampleFilterSession()
}
// static
ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, const ExampleFilter* pFilter)
ExampleFilterSession* ExampleFilterSession::create(MXS_SESSION* pSession, ExampleFilter& filter)
{
return new ExampleFilterSession(pSession);
return new ExampleFilterSession(pSession, filter);
}
void ExampleFilterSession::close()
{
// When the session is closed, report the numbers to the log.
MXS_NOTICE("Session %lu routed %i queries and %i replies.", m_session_id, m_queries, m_replies);
}
int ExampleFilterSession::routeQuery(GWBUF* pPacket)
{
m_queries++;
m_filter.query_seen();
// Pass the query forward.
return mxs::FilterSession::routeQuery(pPacket);
}
int ExampleFilterSession::clientReply(GWBUF* pPacket)
{
m_replies++;
m_filter.reply_seen();
// Pass the reply forward.
return mxs::FilterSession::clientReply(pPacket);
}

View File

@ -17,6 +17,11 @@
class ExampleFilter;
/*
* Defines session-specific data for this filter. An object of this class is created when a client connects
* and deleted on disconnect. The object is only accessed from one thread because sessions are locked to
* a thread when created.
*/
class ExampleFilterSession : public maxscale::FilterSession
{
// Prevent copy-constructor and assignment operator usage
@ -26,19 +31,48 @@ class ExampleFilterSession : public maxscale::FilterSession
public:
~ExampleFilterSession();
// Called when a client session has been closed
// Called when a client session has been closed. Destructor will be called right after.
void close();
// Create a new filter session
static ExampleFilterSession* create(MXS_SESSION* pSession, const ExampleFilter* pFilter);
/**
* Called by ExampleFilter::newSession() to create the session.
*
* @param pSession pSession The generic MaxScale session object
* @param pFilter The shared filter object
* @return A new session or NULL on failure
*/
static ExampleFilterSession* create(MXS_SESSION* pSession, ExampleFilter& pFilter);
// Handle a query from the client
/**
* Handle a query from the client. This is called when the client sends a query and the query has not
* been blocked by any previous component in the query processing chain. The filter should do its own
* processing and then send the query to the next component. If the query comes in multiple packets,
* this is called for each packet.
*
* @param pPacket Packet containing the query, or at least a part of it
* @return 0 on success. This typically depends on the later stages of the query processing chain.
*/
int routeQuery(GWBUF* pPacket);
// Handle a reply from server
/**
* Handle a reply from server. The reply typically contains a resultset or a response to a command.
* The filter should do its own processing and then send the query to the next component.
* If the reply comes in multiple packets, this is called for each packet. The processing chain for
* replies is the same as for queries, just walked in the opposite direction.
*
* @param pPacket Packet containing results
* @return 0 on success. This typically depends on the later stages of the reply processing chain.
*/
int clientReply(GWBUF* pPacket);
private:
// Used in the create function
ExampleFilterSession(MXS_SESSION* pSession);
ExampleFilterSession(MXS_SESSION* pSession, ExampleFilter& filter);
ExampleFilter& m_filter; /**< Shared filter data */
uint64_t m_session_id {0}; /**< Session id */
int m_queries {0}; /**< How many queries has this session seen */
int m_replies {0}; /**< How many replies has this session seen */
};

View File

@ -255,21 +255,27 @@ int sqlite3GetToken(const unsigned char *z, int *tokenType){
return 1;
}
#ifdef MAXSCALE
if ( z[2] == '!' ){
if ( z[2]=='!' || (z[2]=='M' && z[3]=='!')){
int j = (z[2]=='M' ? 4 : 3);
// MySQL-specific code
for (i=3, c=z[2]; (c!='*' || z[i]!='/') && (c=z[i])!=0; i++){}
for (i=j, c=z[j-1]; (c!='*' || z[i]!='/') && (c=z[i])!=0; i++){}
if (c=='*' && z[i]=='/'){
if (sqlite3Isdigit(z[3])) {
// A version specific executable comment, e.g. "/*!99999 ..." => never parsed.
if (sqlite3Isdigit(z[j])) {
// A version specific executable comment,
// e.g. "/*!99999 ..." or "/*M!99999 ..." => never parsed.
extern void maxscaleSetStatusCap(int);
maxscaleSetStatusCap(2); // QC_QUERY_PARTIALLY_PARSED, see query_classifier.h:qc_parse_result
++i; // Next after the trailing '/'
}
else {
// A non-version specific executable comment, e.g. "/*! select 1 */ => always parsed.
// A non-version specific executable comment,
// e.g."/*! select 1 */ or "/*M! select 1 */ => always parsed.
char* znc = (char*) z;
znc[0]=znc[1]=znc[2]=znc[i-1]=znc[i]=' '; // Remove comment chars, i.e. "/*!" and "*/".
for (i=3; sqlite3Isspace(z[i]); ++i){} // Jump over any space.
if (j==4){
znc[3]=0; // It wasn't "/*!" but "/*M!".
}
for (i=j; sqlite3Isspace(z[i]); ++i){} // Jump over any space.
}
}
} else {