Clean up CCRFilter

Rearranges code, renames fields, uses string instead of const char etc.
This commit is contained in:
Esa Korhonen
2018-11-29 12:20:04 +02:00
parent 7d4f0483a9
commit 3cb3676f3e

View File

@ -28,28 +28,12 @@
using std::string; using std::string;
#define CCR_DEFAULT_TIME "60" namespace
static const char PARAM_MATCH[] = "match";
static const char PARAM_IGNORE[] = "ignore";
typedef struct lagstats
{ {
int n_add_count; /*< No. of statements diverted based on count */
int n_add_time; /*< No. of statements diverted based on time */
int n_modified; /*< No. of statements not diverted */
} LAGSTATS;
typedef enum ccr_hint_value_t const char PARAM_MATCH[] = "match";
{ const char PARAM_IGNORE[] = "ignore";
CCR_HINT_NONE, const MXS_ENUM_VALUE option_values[] =
CCR_HINT_MATCH,
CCR_HINT_IGNORE
} CCR_HINT_VALUE;
static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer);
static const MXS_ENUM_VALUE option_values[] =
{ {
{"ignorecase", PCRE2_CASELESS}, {"ignorecase", PCRE2_CASELESS},
{"case", 0 }, {"case", 0 },
@ -57,116 +41,123 @@ static const MXS_ENUM_VALUE option_values[] =
{NULL} {NULL}
}; };
}
class CCRFilter; class CCRFilter;
class CCRSession : public mxs::FilterSession class CCRSession : public mxs::FilterSession
{ {
public: public:
CCRSession(MXS_SESSION* session, CCRFilter& instance) CCRSession(const CCRSession&) = delete;
: maxscale::FilterSession(session) CCRSession& operator=(const CCRSession&) = delete;
, instance(instance)
{
}
~CCRSession() ~CCRSession()
{ {
pcre2_match_data_free(md); pcre2_match_data_free(m_md);
} }
static CCRSession* create(MXS_SESSION* session, CCRFilter& instance); static CCRSession* create(MXS_SESSION* session, CCRFilter* instance);
int routeQuery(GWBUF* queue); int routeQuery(GWBUF* queue);
private: private:
int hints_left; /*< Number of hints left to add to queries*/ CCRFilter& m_instance;
time_t last_modification;/*< Time of the last data modifying operation */ int m_hints_left = 0; /* Number of hints left to add to queries */
pcre2_match_data* md; /*< PCRE2 match data */ time_t m_last_modification = 0; /* Time of the last data modifying operation */
CCRFilter& instance; pcre2_match_data* m_md = nullptr; /* PCRE2 match data */
};
enum CcrHintValue
{
CCR_HINT_NONE,
CCR_HINT_MATCH,
CCR_HINT_IGNORE
};
CCRSession(MXS_SESSION* session, CCRFilter* instance)
: maxscale::FilterSession(session)
, m_instance(*instance)
{
}
static CcrHintValue search_ccr_hint(GWBUF* buffer);
};
class CCRFilter : public mxs::Filter<CCRFilter, CCRSession> class CCRFilter : public mxs::Filter<CCRFilter, CCRSession>
{ {
public: public:
friend class CCRSession; friend class CCRSession; // Session needs to access & modify data in filter object
static CCRFilter* create(const char* name, MXS_CONFIG_PARAMETER* params) static CCRFilter* create(const char* name, MXS_CONFIG_PARAMETER* params)
{ {
auto my_instance = new (std::nothrow) CCRFilter; CCRFilter* new_instance = new(std::nothrow) CCRFilter;
if (my_instance) if (new_instance)
{ {
my_instance->count = config_get_integer(params, "count"); new_instance->m_count = config_get_integer(params, "count");
my_instance->time = config_get_integer(params, "time"); new_instance->m_time = config_get_integer(params, "time");
my_instance->stats.n_add_count = 0; new_instance->m_match = config_get_string(params, PARAM_MATCH);
my_instance->stats.n_add_time = 0; new_instance->m_nomatch = config_get_string(params, PARAM_IGNORE);
my_instance->stats.n_modified = 0;
my_instance->ovector_size = 0;
my_instance->re = NULL;
my_instance->nore = NULL;
int cflags = config_get_enum(params, "options", option_values); int cflags = config_get_enum(params, "options", option_values);
my_instance->match = config_copy_string(params, PARAM_MATCH);
my_instance->nomatch = config_copy_string(params, PARAM_IGNORE);
const char* keys[] = {PARAM_MATCH, PARAM_IGNORE}; const char* keys[] = {PARAM_MATCH, PARAM_IGNORE};
pcre2_code** code_arr[] = {&my_instance->re, &my_instance->nore}; pcre2_code** code_arr[] = {&new_instance->re, &new_instance->nore};
if (!config_get_compiled_regexes(params, keys, sizeof(keys) / sizeof(char*), if (!config_get_compiled_regexes(params, keys, sizeof(keys) / sizeof(char*),
cflags, &my_instance->ovector_size, cflags, &new_instance->ovector_size,
code_arr)) code_arr))
{ {
MXS_FREE(my_instance->match); delete new_instance;
MXS_FREE(my_instance->nomatch); new_instance = nullptr;
pcre2_code_free(my_instance->re);
pcre2_code_free(my_instance->nore);
delete my_instance;
my_instance = NULL;
} }
} }
return new_instance;
return my_instance;
} }
CCRSession* newSession(MXS_SESSION* session); ~CCRFilter()
{
pcre2_code_free(re);
pcre2_code_free(nore);
}
CCRSession* newSession(MXS_SESSION* session)
{
return CCRSession::create(session, this);
}
void diagnostics(DCB* dcb) const void diagnostics(DCB* dcb) const
{ {
auto my_instance = this; dcb_printf(dcb, "Configuration:\n\tCount: %d\n", m_count);
dcb_printf(dcb, "Configuration:\n\tCount: %d\n", my_instance->count); dcb_printf(dcb, "\tTime: %d seconds\n", m_time);
dcb_printf(dcb, "\tTime: %d seconds\n", my_instance->time);
if (my_instance->match) if (!m_match.empty())
{ {
dcb_printf(dcb, "\tMatch regex: %s\n", my_instance->match); dcb_printf(dcb, "\tMatch regex: %s\n", m_match.c_str());
} }
if (my_instance->nomatch) if (!m_nomatch.empty())
{ {
dcb_printf(dcb, "\tExclude regex: %s\n", my_instance->nomatch); dcb_printf(dcb, "\tExclude regex: %s\n", m_nomatch.c_str());
} }
dcb_printf(dcb, "\nStatistics:\n"); dcb_printf(dcb, "\nStatistics:\n");
dcb_printf(dcb, "\tNo. of data modifications: %d\n", my_instance->stats.n_modified); dcb_printf(dcb, "\tNo. of data modifications: %d\n", m_stats.n_modified);
dcb_printf(dcb, "\tNo. of hints added based on count: %d\n", my_instance->stats.n_add_count); dcb_printf(dcb, "\tNo. of hints added based on count: %d\n", m_stats.n_add_count);
dcb_printf(dcb, "\tNo. of hints added based on time: %d\n", my_instance->stats.n_add_time); dcb_printf(dcb, "\tNo. of hints added based on time: %d\n", m_stats.n_add_time);
} }
json_t* diagnostics_json() const json_t* diagnostics_json() const
{ {
auto my_instance = this;
json_t* rval = json_object(); json_t* rval = json_object();
json_object_set_new(rval, "count", json_integer(my_instance->count)); json_object_set_new(rval, "count", json_integer(m_count));
json_object_set_new(rval, "time", json_integer(my_instance->time)); json_object_set_new(rval, "time", json_integer(m_time));
if (my_instance->match) if (!m_match.empty())
{ {
json_object_set_new(rval, PARAM_MATCH, json_string(my_instance->match)); json_object_set_new(rval, PARAM_MATCH, json_string(m_match.c_str()));
} }
if (my_instance->nomatch) if (!m_nomatch.empty())
{ {
json_object_set_new(rval, "nomatch", json_string(my_instance->nomatch)); json_object_set_new(rval, "nomatch", json_string(m_nomatch.c_str()));
} }
json_object_set_new(rval, "data_modifications", json_integer(my_instance->stats.n_modified)); json_object_set_new(rval, "data_modifications", json_integer(m_stats.n_modified));
json_object_set_new(rval, "hints_added_count", json_integer(my_instance->stats.n_add_count)); json_object_set_new(rval, "hints_added_count", json_integer(m_stats.n_add_count));
json_object_set_new(rval, "hints_added_time", json_integer(my_instance->stats.n_add_time)); json_object_set_new(rval, "hints_added_time", json_integer(m_stats.n_add_time));
return rval; return rval;
} }
@ -176,61 +167,61 @@ public:
} }
private: private:
char* match; /* Regular expression to match */ struct LagStats
char* nomatch; /* Regular expression to ignore */ {
int time; /* The number of seconds to wait before routing queries to slave servers after int n_add_count = 0; /*< No. of statements diverted based on count */
* a data modification operation is done. */ int n_add_time = 0; /*< No. of statements diverted based on time */
int count; /* Number of hints to add after each operation that modifies data. */ int n_modified = 0; /*< No. of statements not diverted */
};
LAGSTATS stats; string m_match; /* Regular expression to match */
pcre2_code* re; /* Compiled regex text of match */ string m_nomatch; /* Regular expression to ignore */
pcre2_code* nore; /* Compiled regex text of ignore */ int m_time = 0; /* The number of seconds to wait before routing queries to slave servers after
uint32_t ovector_size; /* PCRE2 match data ovector size */ * a data modification operation is done. */
int m_count = 0; /* Number of hints to add after each operation that modifies data. */
LagStats m_stats;
pcre2_code* re = nullptr; /* Compiled regex text of match */
pcre2_code* nore = nullptr; /* Compiled regex text of ignore */
uint32_t ovector_size = 0; /* PCRE2 match data ovector size */
}; };
CCRSession* CCRSession::create(MXS_SESSION* session, CCRFilter& instance) CCRSession* CCRSession::create(MXS_SESSION* session, CCRFilter* instance)
{ {
auto my_instance = &instance; CCRSession* new_session = new(std::nothrow) CCRSession(session, instance);
auto my_session = new (std::nothrow) CCRSession(session, instance); if (new_session)
if (my_session)
{ {
bool error = false; auto ovec_size = instance->ovector_size;
my_session->hints_left = 0; if (ovec_size)
my_session->last_modification = 0;
if (my_instance->ovector_size)
{ {
my_session->md = pcre2_match_data_create(my_instance->ovector_size, NULL); new_session->m_md = pcre2_match_data_create(ovec_size, NULL);
if (!my_session->md) if (!new_session->m_md)
{ {
delete my_session; delete new_session;
my_session = NULL; new_session = nullptr;
} }
} }
} }
return my_session; return new_session;
} }
int CCRSession::routeQuery(GWBUF* queue) int CCRSession::routeQuery(GWBUF* queue)
{ {
auto my_instance = &this->instance;
auto my_session = this;
char* sql;
int length;
time_t now = time(NULL);
if (modutil_is_SQL(queue)) if (modutil_is_SQL(queue))
{ {
/** auto filter = &this->m_instance;
* Not a simple SELECT statement, possibly modifies data. If we're processing a statement time_t now = time(NULL);
* with unknown query type, the safest thing to do is to treat it as a data modifying statement. /* Not a simple SELECT statement, possibly modifies data. If we're processing a statement
*/ * with unknown query type, the safest thing to do is to treat it as a data modifying statement. */
if (qc_query_is_type(qc_get_type_mask(queue), QUERY_TYPE_WRITE)) if (qc_query_is_type(qc_get_type_mask(queue), QUERY_TYPE_WRITE))
{ {
char* sql;
int length;
if (modutil_extract_SQL(queue, &sql, &length)) if (modutil_extract_SQL(queue, &sql, &length))
{ {
bool trigger_ccr = true; bool trigger_ccr = true;
bool decided = false; // Set by hints to take precedence. bool decided = false; // Set by hints to take precedence.
CCR_HINT_VALUE ccr_hint_val = search_ccr_hint(queue); CcrHintValue ccr_hint_val = search_ccr_hint(queue);
if (ccr_hint_val == CCR_HINT_IGNORE) if (ccr_hint_val == CCR_HINT_IGNORE)
{ {
trigger_ccr = false; trigger_ccr = false;
@ -242,100 +233,50 @@ int CCRSession::routeQuery(GWBUF* queue)
} }
if (!decided) if (!decided)
{ {
trigger_ccr = trigger_ccr = mxs_pcre2_check_match_exclude(filter->re, filter->nore, m_md,
mxs_pcre2_check_match_exclude(my_instance->re, my_instance->nore, my_session->md, sql, length, MXS_MODULE_NAME);
sql, length, MXS_MODULE_NAME);
} }
if (trigger_ccr) if (trigger_ccr)
{ {
if (my_instance->count) if (filter->m_count)
{ {
my_session->hints_left = my_instance->count; m_hints_left = filter->m_count;
MXS_INFO("Write operation detected, next %d queries routed to master", MXS_INFO("Write operation detected, next %d queries routed to master",
my_instance->count); filter->m_count);
} }
if (my_instance->time) if (filter->m_time)
{ {
my_session->last_modification = now; m_last_modification = now;
MXS_INFO("Write operation detected, queries routed to master for %d seconds", MXS_INFO("Write operation detected, queries routed to master for %d seconds",
my_instance->time); filter->m_time);
} }
my_instance->stats.n_modified++; filter->m_stats.n_modified++;
} }
} }
} }
else if (my_session->hints_left > 0) else if (m_hints_left > 0)
{ {
queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL); queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL);
my_session->hints_left--; m_hints_left--;
my_instance->stats.n_add_count++; filter->m_stats.n_add_count++;
MXS_INFO("%d queries left", my_instance->time); MXS_INFO("%d queries left", filter->m_time);
} }
else if (my_instance->time) else if (filter->m_time)
{ {
double dt = difftime(now, my_session->last_modification); double dt = difftime(now, m_last_modification);
if (dt < my_instance->time) if (dt < filter->m_time)
{ {
queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL); queue->hint = hint_create_route(queue->hint, HINT_ROUTE_TO_MASTER, NULL);
my_instance->stats.n_add_time++; filter->m_stats.n_add_time++;
MXS_INFO("%.0f seconds left", dt); MXS_INFO("%.0f seconds left", dt);
} }
} }
} }
return my_session->m_down.routeQuery(queue); return m_down.routeQuery(queue);
}
CCRSession* CCRFilter::newSession(MXS_SESSION* session)
{
return CCRSession::create(session, *this);
}
// Global module object
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_GA,
MXS_FILTER_VERSION,
"A routing hint filter that send queries to the master after data modification",
"V1.1.0",
RCAP_TYPE_CONTIGUOUS_INPUT,
&CCRFilter::s_object,
NULL, /* Process init.
* */
NULL, /* Process finish.
* */
NULL, /* Thread init. */
NULL, /* Thread finish.
* */
{
{"count",
MXS_MODULE_PARAM_COUNT,
"0"},
{"time",
MXS_MODULE_PARAM_COUNT,
CCR_DEFAULT_TIME},
{PARAM_MATCH,
MXS_MODULE_PARAM_REGEX},
{PARAM_IGNORE,
MXS_MODULE_PARAM_REGEX},
{
"options",
MXS_MODULE_PARAM_ENUM,
"ignorecase",
MXS_MODULE_OPT_NONE,
option_values
},
{MXS_END_MODULE_PARAMS}
}
};
return &info;
} }
/** /**
@ -345,10 +286,10 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
* @param buffer Input buffer * @param buffer Input buffer
* @return The found ccr hint value * @return The found ccr hint value
*/ */
static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer) CCRSession::CcrHintValue CCRSession::search_ccr_hint(GWBUF* buffer)
{ {
const char CCR[] = "ccr"; const char CCR[] = "ccr";
CCR_HINT_VALUE rval = CCR_HINT_NONE; CcrHintValue rval = CCR_HINT_NONE;
bool found_ccr = false; bool found_ccr = false;
HINT** prev_ptr = &buffer->hint; HINT** prev_ptr = &buffer->hint;
HINT* hint = buffer->hint; HINT* hint = buffer->hint;
@ -368,9 +309,7 @@ static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer)
} }
else else
{ {
MXS_ERROR("Unknown value for hint parameter %s: '%s'.", MXS_ERROR("Unknown value for hint parameter %s: '%s'.", CCR, (char*)hint->value);
CCR,
(char*)hint->value);
} }
} }
else else
@ -387,3 +326,33 @@ static CCR_HINT_VALUE search_ccr_hint(GWBUF* buffer)
} }
return rval; return rval;
} }
// Global module object
extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static const char DESCRIPTION[] = "A routing hint filter that sends queries to the master "
"after data modification";
static MXS_MODULE info =
{
MXS_MODULE_API_FILTER,
MXS_MODULE_GA,
MXS_FILTER_VERSION,
DESCRIPTION,
"V1.1.0",
RCAP_TYPE_CONTIGUOUS_INPUT,
&CCRFilter::s_object,
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{"count", MXS_MODULE_PARAM_COUNT, "0" },
{"time", MXS_MODULE_PARAM_COUNT, "60" },
{PARAM_MATCH, MXS_MODULE_PARAM_REGEX},
{PARAM_IGNORE, MXS_MODULE_PARAM_REGEX},
{"options", MXS_MODULE_PARAM_ENUM, "ignorecase", MXS_MODULE_OPT_NONE, option_values},
{MXS_END_MODULE_PARAMS}
}
};
return &info;
}