MXS-1267: Use filter template in tee

The tee filter now uses the MaxScale filter template.
This commit is contained in:
Markus Mäkelä
2017-06-01 13:36:28 +03:00
parent d59e98e238
commit 53a2139bb9
8 changed files with 307 additions and 311 deletions

View File

@ -183,7 +183,7 @@ protected:
* class MyFilter : public maxscale::Filter<MyFilter, MyFilterSession>
* {
* public:
* static MyFilter* create(const char* zName, char** pzOptions, FILTER_PARAMETER** ppParams);
* static MyFilter* create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* ppParams);
*
* MyFilterSession* newSession(MXS_SESSION* pSession);
*

View File

@ -1,4 +1,4 @@
add_library(tee SHARED tee.cc local_client.cc)
add_library(tee SHARED tee.cc teesession.cc local_client.cc)
target_link_libraries(tee maxscale-common MySQLCommon)
set_target_properties(tee PROPERTIES VERSION "1.0.0")
install_module(tee core)

View File

@ -4,7 +4,7 @@
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General

View File

@ -5,7 +5,7 @@
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2019-07-01
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General

View File

@ -19,48 +19,13 @@
#include <maxscale/cppdefs.hh>
#include <regex.h>
#include <set>
#include <string>
#include <maxscale/filter.h>
#include <maxscale/alloc.h>
#include <maxscale/modinfo.h>
#include <maxscale/log_manager.h>
#include <maxscale/service.h>
#include <maxscale/alloc.h>
#include "tee.hh"
#include "local_client.hh"
/**
* The instance structure for the TEE filter - this holds the configuration
* information for the filter.
*/
struct Tee
{
SERVICE *service; /* The service to duplicate requests to */
char *source; /* The source of the client connection */
char *user; /* The user name to filter on */
char *match; /* Optional text to match against */
regex_t re; /* Compiled regex text */
char *nomatch; /* Optional text to match against for exclusion */
regex_t nore; /* Compiled regex nomatch text */
};
/**
* The session structure for this TEE filter.
* This stores the downstream filter information, such that the
* filter is able to pass the query on to the next filter (or router)
* in the chain.
*
* It also holds the file descriptor to which queries are written.
*/
struct TeeSession
{
MXS_DOWNSTREAM down; /**< The downstream filter */
MXS_UPSTREAM up; /**< The upstream filter */
bool passive; /**< Whether to clone queries */
LocalClient* client; /**< The client connection to the local service */
};
#include "teesession.hh"
static const MXS_ENUM_VALUE option_values[] =
{
@ -70,7 +35,26 @@ static const MXS_ENUM_VALUE option_values[] =
{NULL}
};
bool recursive_tee_usage(std::set<std::string>& services, SERVICE* service);
Tee::Tee(SERVICE* service, const char* user, const char* remote,
const char* match, const char* nomatch, int cflags):
m_service(service),
m_user(user),
m_source(remote),
m_match(match),
m_nomatch(nomatch)
{
if (*match)
{
ss_debug(int rc = )regcomp(&m_re, match, cflags);
ss_dassert(rc == 0);
}
if (*nomatch)
{
ss_debug(int rc = )regcomp(&m_nore, nomatch, cflags);
ss_dassert(rc == 0);
}
}
/**
* Create an instance of the filter for a particular service
@ -82,193 +66,44 @@ bool recursive_tee_usage(std::set<std::string>& services, SERVICE* service);
*
* @return The instance data for this new instance
*/
static MXS_FILTER *
createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
Tee* Tee::create(const char *name, char **options, MXS_CONFIG_PARAMETER *params)
{
Tee *my_instance = new (std::nothrow) Tee;
Tee *my_instance = NULL;
if (my_instance)
{
my_instance->service = config_get_service(params, "service");
my_instance->source = config_copy_string(params, "source");
my_instance->user = config_copy_string(params, "user");
my_instance->match = config_copy_string(params, "match");
my_instance->nomatch = config_copy_string(params, "exclude");
SERVICE* service = config_get_service(params, "service");
const char* source = config_get_string(params, "source");
const char* user = config_get_string(params, "user");
const char* match = config_get_string(params, "match");
const char* nomatch = config_get_string(params, "exclude");
int cflags = config_get_enum(params, "options", option_values);
regex_t re;
regex_t nore;
if (my_instance->match && regcomp(&my_instance->re, my_instance->match, cflags))
if (*match && regcomp(&re, match, cflags) != 0)
{
MXS_ERROR("Invalid regular expression '%s' for the match parameter.",
my_instance->match);
MXS_FREE(my_instance->match);
MXS_FREE(my_instance->nomatch);
MXS_FREE(my_instance->source);
MXS_FREE(my_instance->user);
delete my_instance;
return NULL;
MXS_ERROR("Invalid regular expression '%s' for the match parameter.", match);
}
else if (*nomatch && regcomp(&nore, nomatch, cflags) != 0)
{
MXS_ERROR("Invalid regular expression '%s' for the nomatch parameter.", nomatch);
if (my_instance->nomatch && regcomp(&my_instance->nore, my_instance->nomatch, cflags))
if (*match)
{
MXS_ERROR("Invalid regular expression '%s' for the nomatch paramter.",
my_instance->nomatch);
if (my_instance->match)
{
regfree(&my_instance->re);
MXS_FREE(my_instance->match);
regfree(&re);
}
MXS_FREE(my_instance->nomatch);
MXS_FREE(my_instance->source);
MXS_FREE(my_instance->user);
delete my_instance;
return NULL;
}
}
return (MXS_FILTER*) my_instance;
}
/**
* Create a filter new session
*
* @param instance The filter instance data
* @param session The session itself
*
* @return Session specific data for this session
*/
static MXS_FILTER_SESSION* newSession(MXS_FILTER *instance, MXS_SESSION *session)
{
std::set<std::string> services;
if (recursive_tee_usage(services, session->service))
{
MXS_ERROR("%s: Recursive use of tee filter in service.",
session->service->name);
return NULL;
}
TeeSession* my_session = new (std::nothrow) TeeSession;
if (my_session)
{
Tee *my_instance = reinterpret_cast<Tee*>(instance);
const char* remote = session_get_remote(session);
const char* user = session_get_user(session);
if ((my_instance->source && remote && strcmp(remote, my_instance->source) != 0) ||
(my_instance->user && user && strcmp(user, my_instance->user) != 0))
{
my_session->passive = true;
my_session->client = NULL;
}
else
{
my_session->client = LocalClient::create(session, my_instance->service);
my_session->passive = false;
my_instance = new (std::nothrow) Tee(service, source, user, match, nomatch, cflags);
}
if (my_session->client == NULL)
return my_instance;
}
TeeSession* Tee::newSession(MXS_SESSION* pSession)
{
delete my_session;
my_session = NULL;
}
}
}
return reinterpret_cast<MXS_FILTER_SESSION*>(my_session);
}
/**
* Close the filter session
*
* @param instance The filter instance data
* @param session The session being closed
*/
static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
{
}
/**
* Free the memory associated with the session
*
* @param instance The filter instance
* @param session The filter session
*/
static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session)
{
TeeSession *my_session = reinterpret_cast<TeeSession*>(session);
delete my_session->client;
delete my_session;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_DOWNSTREAM *downstream)
{
TeeSession *my_session = reinterpret_cast<TeeSession*>(session);
my_session->down = *downstream;
}
/**
* Set the downstream filter or router to which queries will be
* passed from this filter.
*
* @param instance The filter instance data
* @param session The filter session
* @param downstream The downstream filter or router.
*/
static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream)
{
TeeSession *my_session = reinterpret_cast<TeeSession*>(session);
my_session->up = *upstream;
}
/**
* Route a query
*
* @param instance Filter instance
* @param session Filter session
* @param queue The query itself
*
* @retrn 1 on success, 0 on failure
*/
static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue)
{
TeeSession *my_session = reinterpret_cast<TeeSession*>(session);
int rval = my_session->down.routeQuery(my_session->down.instance,
my_session->down.session,
queue);
my_session->client->queue_query(queue);
return rval;
}
/**
* The clientReply entry point. This is passed the response buffer
* to which the filter should be applied. Once processed the
* query is passed to the upstream component
* (filter or router) in the filter chain.
*
* @param instance The filter instance data
* @param session The filter session
* @param reply The response data
*/
static int
clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply)
{
TeeSession *my_session = reinterpret_cast<TeeSession*>(session);
return my_session->up.clientReply(my_session->up.instance,
my_session->up.session,
reply);
return TeeSession::create(this, pSession);
}
/**
@ -282,32 +117,29 @@ clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply)
* @param fsession Filter session, may be NULL
* @param dcb The DCB for diagnostic output
*/
static void
diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb)
void Tee::diagnostics(DCB *dcb)
{
Tee *my_instance = reinterpret_cast<Tee*>(instance);
if (my_instance->source)
if (m_source.length())
{
dcb_printf(dcb, "\t\tLimit to connections from %s\n",
my_instance->source);
m_source.c_str());
}
dcb_printf(dcb, "\t\tDuplicate statements to service %s\n",
my_instance->service->name);
if (my_instance->user)
m_service->name);
if (m_user.length())
{
dcb_printf(dcb, "\t\tLimit to user %s\n",
my_instance->user);
m_user.c_str());
}
if (my_instance->match)
if (m_match.length())
{
dcb_printf(dcb, "\t\tInclude queries that match %s\n",
my_instance->match);
m_match.c_str());
}
if (my_instance->nomatch)
if (m_nomatch.c_str())
{
dcb_printf(dcb, "\t\tExclude queries that match %s\n",
my_instance->nomatch);
m_nomatch.c_str());
}
}
@ -321,87 +153,35 @@ diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb)
* @param instance The filter instance
* @param fsession Filter session, may be NULL
*/
static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESSION *fsession)
json_t* Tee::diagnostics_json() const
{
const Tee *my_instance = reinterpret_cast<const Tee*>(instance);
json_t* rval = json_object();
if (my_instance->source)
if (m_source.length())
{
json_object_set_new(rval, "source", json_string(my_instance->source));
json_object_set_new(rval, "source", json_string(m_source.c_str()));
}
json_object_set_new(rval, "service", json_string(my_instance->service->name));
json_object_set_new(rval, "service", json_string(m_service->name));
if (my_instance->user)
if (m_user.length())
{
json_object_set_new(rval, "user", json_string(my_instance->user));
json_object_set_new(rval, "user", json_string(m_user.c_str()));
}
if (my_instance->match)
if (m_match.length())
{
json_object_set_new(rval, "match", json_string(my_instance->match));
json_object_set_new(rval, "match", json_string(m_match.c_str()));
}
if (my_instance->nomatch)
if (m_nomatch.length())
{
json_object_set_new(rval, "exclude", json_string(my_instance->nomatch));
json_object_set_new(rval, "exclude", json_string(m_nomatch.c_str()));
}
return rval;
}
/**
* Capability routine.
*
* @return The capabilities of the filter.
*/
static uint64_t getCapabilities(MXS_FILTER* instance)
{
return RCAP_TYPE_CONTIGUOUS_INPUT;
}
/**
* Detect loops in the filter chain.
*/
bool recursive_tee_usage(std::set<std::string>& services, SERVICE* service)
{
if (!services.insert(service->name).second)
{
/** The service name was already in the set */
return true;
}
for (int i = 0; i < service->n_filters; i++)
{
const char* module = filter_def_get_module_name(service->filters[i]);
if (strcmp(module, "tee") == 0)
{
/*
* Found a Tee filter, recurse down its path
* if the service name isn't already in the hashtable.
*/
Tee* inst = (Tee*)filter_def_get_instance(service->filters[i]);
if (inst == NULL)
{
/**
* This tee instance hasn't been initialized yet and full
* resolution of recursion cannot be done now.
*/
}
else if (recursive_tee_usage(services, inst->service))
{
return true;
}
}
}
return false;
}
MXS_BEGIN_DECLS
/**
@ -414,21 +194,6 @@ MXS_BEGIN_DECLS
*/
MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_FILTER_OBJECT MyObject =
{
createInstance,
newSession,
closeSession,
freeSession,
setDownstream,
setUpstream,
routeQuery,
clientReply,
diagnostic,
diagnostic_json,
getCapabilities,
NULL, // No destroyInstance
};
static MXS_MODULE info =
{
@ -436,9 +201,9 @@ MXS_MODULE* MXS_CREATE_MODULE()
MXS_MODULE_GA,
MXS_FILTER_VERSION,
"A tee piece in the filter plumbing",
"V1.0.0",
"V1.1.0",
RCAP_TYPE_CONTIGUOUS_INPUT,
&MyObject,
&Tee::s_object,
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */

View File

@ -0,0 +1,71 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <string>
#include <regex.h>
#include <maxscale/filter.hh>
#include <maxscale/service.h>
#include "teesession.hh"
/**
* The instance structure for the TEE filter - this holds the configuration
* information for the filter.
*/
class Tee: public mxs::Filter<Tee, TeeSession>
{
Tee(const Tee&);
const Tee& operator=(const Tee&);
public:
static Tee* create(const char* zName, char** pzOptions, MXS_CONFIG_PARAMETER* ppParams);
TeeSession* newSession(MXS_SESSION* session);
void diagnostics(DCB* pDcb);
json_t* diagnostics_json() const;
uint64_t getCapabilities()
{
return RCAP_TYPE_CONTIGUOUS_INPUT;
}
bool user_matches(const char* user)const
{
return m_user.length() == 0 || strcmp(user, m_user.c_str()) == 0;
}
bool remote_matches(const char* remote)const
{
return m_source.length() == 0 || strcmp(remote, m_source.c_str()) == 0;
}
SERVICE* get_service() const
{
return m_service;
}
private:
Tee(SERVICE* service, const char* user, const char* remote,
const char* match, const char* nomatch, int cflags);
SERVICE* m_service;
std::string m_user; /* The user name to filter on */
std::string m_source; /* The source of the client connection */
std::string m_match; /* Optional text to match against */
std::string m_nomatch; /* Optional text to match against for exclusion */
regex_t m_re; /* Compiled regex text */
regex_t m_nore; /* Compiled regex nomatch text */
};

View File

@ -0,0 +1,117 @@
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include "teesession.hh"
#include "tee.hh"
#include <set>
#include <string>
/**
* Detect loops in the filter chain.
*/
bool recursive_tee_usage(std::set<std::string>& services, SERVICE* service)
{
if (!services.insert(service->name).second)
{
/** The service name was already in the set */
return true;
}
for (int i = 0; i < service->n_filters; i++)
{
const char* module = filter_def_get_module_name(service->filters[i]);
if (strcmp(module, "tee") == 0)
{
/*
* Found a Tee filter, recurse down its path
* if the service name isn't already in the hashtable.
*/
Tee* inst = (Tee*)filter_def_get_instance(service->filters[i]);
if (inst == NULL)
{
/**
* This tee instance hasn't been initialized yet and full
* resolution of recursion cannot be done now.
*/
}
else if (recursive_tee_usage(services, inst->get_service()))
{
return true;
}
}
}
return false;
}
TeeSession::TeeSession(MXS_SESSION* session, LocalClient* client):
mxs::FilterSession(session),
m_client(client)
{
}
TeeSession* TeeSession::create(Tee* my_instance, MXS_SESSION* session)
{
std::set<std::string> services;
if (recursive_tee_usage(services, my_instance->get_service()))
{
MXS_ERROR("%s: Recursive use of tee filter in service.",
session->service->name);
return NULL;
}
LocalClient* client = NULL;
if (my_instance->user_matches(session_get_user(session)) &&
my_instance->remote_matches(session_get_remote(session)))
{
if ((client = LocalClient::create(session, my_instance->get_service())) == NULL)
{
return NULL;
}
}
return new (std::nothrow) TeeSession(session, client);
}
TeeSession::~TeeSession()
{
delete m_client;
}
void TeeSession::close()
{
}
int TeeSession::routeQuery(GWBUF* queue)
{
if (m_client)
{
m_client->queue_query(queue);
}
return mxs::FilterSession::routeQuery(queue);
}
void TeeSession::diagnostics(DCB *pDcb)
{
}
json_t* TeeSession::diagnostics_json() const
{
return NULL;
}

View File

@ -0,0 +1,43 @@
#pragma once
/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
#include "local_client.hh"
class Tee;
/**
* A Tee session
*/
class TeeSession: public mxs::FilterSession
{
TeeSession(const TeeSession&);
const TeeSession& operator=(const TeeSession&);
public:
~TeeSession();
static TeeSession* create(Tee* my_instance, MXS_SESSION* session);
void close();
int routeQuery(GWBUF* pPacket);
void diagnostics(DCB *pDcb);
json_t* diagnostics_json() const;
private:
TeeSession(MXS_SESSION* session, LocalClient* client);
LocalClient* m_client; /**< The client connection to the local service */
};