Add global to ccrfilter

The `global` parameter causes the time window defined by the `time`
parameter to be applied at the instance level instead of the session
level. This means that a write from one connection will cause all other
connections to use the master for a certain period of time.

Using a configurable time window for consistency is not good as it is not
absolute and cannot adjust to how servers behave.

One example that demonstrates this is when a slave is normally lagging
behind by less than a second but some event causes the lag to spike up to
several seconds. In this case the configured time window would no longer
guarantee consistency.

Another reason to avoid a "static" time window is the fact taht it
prevents load balancing in the cases where slaves catch up to the master
within time window. This happens when time is configured to a higher value
to avoid inconsistencies at all costs.

Added a test case that verified the feature works.
This commit is contained in:
Markus Mäkelä
2019-10-29 16:35:51 +02:00
parent dcd3e60630
commit a1e8287265
5 changed files with 115 additions and 3 deletions

View File

@ -83,6 +83,16 @@ ignore=.*UPDATE.*
options=case,extended options=case,extended
``` ```
### `global`
`global` is a boolean parameter that when enabled causes writes from one
connection to propagate to all other connections. This can be used to work
around cases where one connection writes data and another reads it, expecting
the write done by the other connection to be visible.
This parameter only works with the `time` parameter. The use of `global` and
`count` at the same time is not allowed and will be treated as an error.
## Example Configuration ## Example Configuration
Here is a minimal filter configuration for the CCRFilter which should solve most Here is a minimal filter configuration for the CCRFilter which should solve most

View File

@ -172,6 +172,7 @@ add_test_executable(bulk_insert.cpp bulk_insert bulk_insert LABELS MySQLProtocol
# Tests for the CCRFilter module # Tests for the CCRFilter module
add_test_executable(ccrfilter.cpp ccrfilter_test ccrfilter LABELS ccrfilter LIGHT REPL_BACKEND) add_test_executable(ccrfilter.cpp ccrfilter_test ccrfilter LABELS ccrfilter LIGHT REPL_BACKEND)
add_test_executable(ccrfilter_global.cpp ccrfilter_global_test ccrfilter_global LABELS ccrfilter LIGHT REPL_BACKEND)
# Tries to reconfigure replication setup to use another node as a Master # Tries to reconfigure replication setup to use another node as a Master
add_test_executable(change_master_during_session.cpp change_master_during_session replication LABELS readwritesplit mysqlmon REPL_BACKEND) add_test_executable(change_master_during_session.cpp change_master_during_session replication LABELS readwritesplit mysqlmon REPL_BACKEND)

View File

@ -0,0 +1,45 @@
/**
* Test global mode for the CCRFilter
*/
#include "testconnections.h"
int main(int argc, char* argv[])
{
TestConnections test(argc, argv);
auto conn = test.maxscales->rwsplit();
conn.connect();
test.expect(conn.query("CREATE OR REPLACE TABLE test.t1 (a LONGTEXT)"),
"Table creation should work: %s", conn.error());
conn.disconnect();
std::string data(1000000, 'a');
auto secondary = test.maxscales->rwsplit();
secondary.connect();
for (int i = 0; i < 50; i++)
{
conn.connect();
test.expect(conn.query("INSERT INTO test.t1 VALUES ('" + data + "')"),
"INSERT should work: %s", conn.error());
conn.disconnect();
// New connections should see the inserted rows
conn.connect();
auto count = std::stoi(conn.field("SELECT COUNT(*) FROM test.t1"));
test.expect(count == i + 1, "Missing `%d` rows.", (i + 1) - count);
conn.disconnect();
// Existing connections should also see the inserted rows
auto second_count = std::stoi(secondary.field("SELECT COUNT(*) FROM test.t1"));
test.expect(second_count == i + 1, "Missing `%d` rows from open connection.", (i + 1) - count);
}
conn.connect();
test.expect(conn.query("DROP TABLE test.t1"),
"Table creation should work: %s", conn.error());
conn.disconnect();
return test.global_result;
}

View File

@ -0,0 +1,37 @@
[maxscale]
threads=###threads###
log_info=1
###server###
[MySQL-Monitor]
type=monitor
module=mysqlmon
servers=###server_line###
user=maxskysql
password=skysql
monitor_interval=1000
# CCRFilter
[ccrfilter]
type=filter
module=ccrfilter
time=5
global=true
# RWSplit
[RW-Split-Router]
type=service
router=readwritesplit
servers=###server_line###
user=maxskysql
password=skysql
filters=ccrfilter
[RW-Split-Listener]
type=listener
service=RW-Split-Router
protocol=MySQLClient
port=4006

View File

@ -32,6 +32,8 @@ namespace
const char PARAM_MATCH[] = "match"; const char PARAM_MATCH[] = "match";
const char PARAM_IGNORE[] = "ignore"; const char PARAM_IGNORE[] = "ignore";
const char PARAM_GLOBAL[] = "global";
const MXS_ENUM_VALUE option_values[] = const MXS_ENUM_VALUE option_values[] =
{ {
{"ignorecase", PCRE2_CASELESS}, {"ignorecase", PCRE2_CASELESS},
@ -85,6 +87,12 @@ public:
static CCRFilter* create(const char* name, MXS_CONFIG_PARAMETER* params) static CCRFilter* create(const char* name, MXS_CONFIG_PARAMETER* params)
{ {
if (params->get_integer("count") && params->get_bool(PARAM_GLOBAL))
{
MXS_ERROR("'count' and 'global' cannot be used at the same time.");
return nullptr;
}
CCRFilter* new_instance = new(std::nothrow) CCRFilter; CCRFilter* new_instance = new(std::nothrow) CCRFilter;
if (new_instance) if (new_instance)
{ {
@ -92,6 +100,7 @@ public:
new_instance->m_time = params->get_duration<std::chrono::seconds>("time").count(); new_instance->m_time = params->get_duration<std::chrono::seconds>("time").count();
new_instance->m_match = params->get_string(PARAM_MATCH); new_instance->m_match = params->get_string(PARAM_MATCH);
new_instance->m_nomatch = params->get_string(PARAM_IGNORE); new_instance->m_nomatch = params->get_string(PARAM_IGNORE);
new_instance->m_global = params->get_bool(PARAM_GLOBAL);
int cflags = params->get_enum("options", option_values); int cflags = params->get_enum("options", option_values);
bool compile_error = false; bool compile_error = false;
@ -181,6 +190,9 @@ private:
* a data modification operation is done. */ * a data modification operation is done. */
int m_count = 0; /* Number of hints to add after each operation that modifies data. */ int m_count = 0; /* Number of hints to add after each operation that modifies data. */
bool m_global;
std::atomic<time_t> m_last_modification {0}; /* Time of the last data modifying operation */
LagStats m_stats; LagStats m_stats;
pcre2_code* re = nullptr; /* Compiled regex text of match */ pcre2_code* re = nullptr; /* Compiled regex text of match */
pcre2_code* nore = nullptr; /* Compiled regex text of ignore */ pcre2_code* nore = nullptr; /* Compiled regex text of ignore */
@ -251,6 +263,11 @@ int CCRSession::routeQuery(GWBUF* queue)
m_last_modification = now; m_last_modification = now;
MXS_INFO("Write operation detected, queries routed to master for %d seconds", MXS_INFO("Write operation detected, queries routed to master for %d seconds",
filter->m_time); filter->m_time);
if (filter->m_global)
{
filter->m_last_modification.store(now, std::memory_order_relaxed);
}
} }
filter->m_stats.n_modified++; filter->m_stats.n_modified++;
@ -266,7 +283,8 @@ int CCRSession::routeQuery(GWBUF* queue)
} }
else if (filter->m_time) else if (filter->m_time)
{ {
double dt = difftime(now, m_last_modification); double dt = std::min(difftime(now, m_last_modification),
difftime(now, filter->m_last_modification.load(std::memory_order_relaxed)));
if (dt < filter->m_time) if (dt < filter->m_time)
{ {
@ -347,8 +365,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
NULL, NULL,
NULL, NULL,
{ {
{"count", MXS_MODULE_PARAM_COUNT, "0" }, {"count", MXS_MODULE_PARAM_COUNT, "0" },
{"time", MXS_MODULE_PARAM_DURATION, "60s"}, {"time", MXS_MODULE_PARAM_DURATION, "60s" },
{PARAM_GLOBAL, MXS_MODULE_PARAM_BOOL, "false"},
{PARAM_MATCH, MXS_MODULE_PARAM_REGEX}, {PARAM_MATCH, MXS_MODULE_PARAM_REGEX},
{PARAM_IGNORE, MXS_MODULE_PARAM_REGEX}, {PARAM_IGNORE, MXS_MODULE_PARAM_REGEX},
{ {