From a1e82872655a0b9821ee4e7f998caf8d4c009972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 29 Oct 2019 16:35:51 +0200 Subject: [PATCH] Add `global` to ccrfilter The `global` parameter causes the time window defined by the `time` parameter to be applied at the instance level instead of the session level. This means that a write from one connection will cause all other connections to use the master for a certain period of time. Using a configurable time window for consistency is not good as it is not absolute and cannot adjust to how servers behave. One example that demonstrates this is when a slave is normally lagging behind by less than a second but some event causes the lag to spike up to several seconds. In this case the configured time window would no longer guarantee consistency. Another reason to avoid a "static" time window is the fact taht it prevents load balancing in the cases where slaves catch up to the master within time window. This happens when time is configured to a higher value to avoid inconsistencies at all costs. Added a test case that verified the feature works. --- Documentation/Filters/CCRFilter.md | 10 +++++ maxscale-system-test/CMakeLists.txt | 1 + maxscale-system-test/ccrfilter_global.cpp | 45 +++++++++++++++++++ .../maxscale.cnf.template.ccrfilter_global | 37 +++++++++++++++ server/modules/filter/ccrfilter/ccrfilter.cc | 25 +++++++++-- 5 files changed, 115 insertions(+), 3 deletions(-) create mode 100644 maxscale-system-test/ccrfilter_global.cpp create mode 100644 maxscale-system-test/cnf/maxscale.cnf.template.ccrfilter_global diff --git a/Documentation/Filters/CCRFilter.md b/Documentation/Filters/CCRFilter.md index 83d9f19db..2586c55c5 100644 --- a/Documentation/Filters/CCRFilter.md +++ b/Documentation/Filters/CCRFilter.md @@ -83,6 +83,16 @@ ignore=.*UPDATE.* options=case,extended ``` +### `global` + +`global` is a boolean parameter that when enabled causes writes from one +connection to propagate to all other connections. This can be used to work +around cases where one connection writes data and another reads it, expecting +the write done by the other connection to be visible. + +This parameter only works with the `time` parameter. The use of `global` and +`count` at the same time is not allowed and will be treated as an error. + ## Example Configuration Here is a minimal filter configuration for the CCRFilter which should solve most diff --git a/maxscale-system-test/CMakeLists.txt b/maxscale-system-test/CMakeLists.txt index a1fc28aee..8a18cde7c 100644 --- a/maxscale-system-test/CMakeLists.txt +++ b/maxscale-system-test/CMakeLists.txt @@ -172,6 +172,7 @@ add_test_executable(bulk_insert.cpp bulk_insert bulk_insert LABELS MySQLProtocol # Tests for the CCRFilter module add_test_executable(ccrfilter.cpp ccrfilter_test ccrfilter LABELS ccrfilter LIGHT REPL_BACKEND) +add_test_executable(ccrfilter_global.cpp ccrfilter_global_test ccrfilter_global LABELS ccrfilter LIGHT REPL_BACKEND) # Tries to reconfigure replication setup to use another node as a Master add_test_executable(change_master_during_session.cpp change_master_during_session replication LABELS readwritesplit mysqlmon REPL_BACKEND) diff --git a/maxscale-system-test/ccrfilter_global.cpp b/maxscale-system-test/ccrfilter_global.cpp new file mode 100644 index 000000000..709c19a98 --- /dev/null +++ b/maxscale-system-test/ccrfilter_global.cpp @@ -0,0 +1,45 @@ +/** + * Test global mode for the CCRFilter + */ + +#include "testconnections.h" + +int main(int argc, char* argv[]) +{ + TestConnections test(argc, argv); + + auto conn = test.maxscales->rwsplit(); + conn.connect(); + test.expect(conn.query("CREATE OR REPLACE TABLE test.t1 (a LONGTEXT)"), + "Table creation should work: %s", conn.error()); + conn.disconnect(); + + std::string data(1000000, 'a'); + auto secondary = test.maxscales->rwsplit(); + secondary.connect(); + + for (int i = 0; i < 50; i++) + { + conn.connect(); + test.expect(conn.query("INSERT INTO test.t1 VALUES ('" + data + "')"), + "INSERT should work: %s", conn.error()); + conn.disconnect(); + + // New connections should see the inserted rows + conn.connect(); + auto count = std::stoi(conn.field("SELECT COUNT(*) FROM test.t1")); + test.expect(count == i + 1, "Missing `%d` rows.", (i + 1) - count); + conn.disconnect(); + + // Existing connections should also see the inserted rows + auto second_count = std::stoi(secondary.field("SELECT COUNT(*) FROM test.t1")); + test.expect(second_count == i + 1, "Missing `%d` rows from open connection.", (i + 1) - count); + } + + conn.connect(); + test.expect(conn.query("DROP TABLE test.t1"), + "Table creation should work: %s", conn.error()); + conn.disconnect(); + + return test.global_result; +} diff --git a/maxscale-system-test/cnf/maxscale.cnf.template.ccrfilter_global b/maxscale-system-test/cnf/maxscale.cnf.template.ccrfilter_global new file mode 100644 index 000000000..c360e6b35 --- /dev/null +++ b/maxscale-system-test/cnf/maxscale.cnf.template.ccrfilter_global @@ -0,0 +1,37 @@ +[maxscale] +threads=###threads### +log_info=1 + +###server### + +[MySQL-Monitor] +type=monitor +module=mysqlmon +servers=###server_line### +user=maxskysql +password=skysql +monitor_interval=1000 + +# CCRFilter + +[ccrfilter] +type=filter +module=ccrfilter +time=5 +global=true + +# RWSplit + +[RW-Split-Router] +type=service +router=readwritesplit +servers=###server_line### +user=maxskysql +password=skysql +filters=ccrfilter + +[RW-Split-Listener] +type=listener +service=RW-Split-Router +protocol=MySQLClient +port=4006 diff --git a/server/modules/filter/ccrfilter/ccrfilter.cc b/server/modules/filter/ccrfilter/ccrfilter.cc index ca2417df5..46cdb6107 100644 --- a/server/modules/filter/ccrfilter/ccrfilter.cc +++ b/server/modules/filter/ccrfilter/ccrfilter.cc @@ -32,6 +32,8 @@ namespace const char PARAM_MATCH[] = "match"; const char PARAM_IGNORE[] = "ignore"; +const char PARAM_GLOBAL[] = "global"; + const MXS_ENUM_VALUE option_values[] = { {"ignorecase", PCRE2_CASELESS}, @@ -85,6 +87,12 @@ public: static CCRFilter* create(const char* name, MXS_CONFIG_PARAMETER* params) { + if (params->get_integer("count") && params->get_bool(PARAM_GLOBAL)) + { + MXS_ERROR("'count' and 'global' cannot be used at the same time."); + return nullptr; + } + CCRFilter* new_instance = new(std::nothrow) CCRFilter; if (new_instance) { @@ -92,6 +100,7 @@ public: new_instance->m_time = params->get_duration("time").count(); new_instance->m_match = params->get_string(PARAM_MATCH); new_instance->m_nomatch = params->get_string(PARAM_IGNORE); + new_instance->m_global = params->get_bool(PARAM_GLOBAL); int cflags = params->get_enum("options", option_values); bool compile_error = false; @@ -181,6 +190,9 @@ private: * a data modification operation is done. */ int m_count = 0; /* Number of hints to add after each operation that modifies data. */ + bool m_global; + std::atomic m_last_modification {0}; /* Time of the last data modifying operation */ + LagStats m_stats; pcre2_code* re = nullptr; /* Compiled regex text of match */ pcre2_code* nore = nullptr; /* Compiled regex text of ignore */ @@ -251,6 +263,11 @@ int CCRSession::routeQuery(GWBUF* queue) m_last_modification = now; MXS_INFO("Write operation detected, queries routed to master for %d seconds", filter->m_time); + + if (filter->m_global) + { + filter->m_last_modification.store(now, std::memory_order_relaxed); + } } filter->m_stats.n_modified++; @@ -266,7 +283,8 @@ int CCRSession::routeQuery(GWBUF* queue) } else if (filter->m_time) { - double dt = difftime(now, m_last_modification); + double dt = std::min(difftime(now, m_last_modification), + difftime(now, filter->m_last_modification.load(std::memory_order_relaxed))); if (dt < filter->m_time) { @@ -347,8 +365,9 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() NULL, NULL, { - {"count", MXS_MODULE_PARAM_COUNT, "0" }, - {"time", MXS_MODULE_PARAM_DURATION, "60s"}, + {"count", MXS_MODULE_PARAM_COUNT, "0" }, + {"time", MXS_MODULE_PARAM_DURATION, "60s" }, + {PARAM_GLOBAL, MXS_MODULE_PARAM_BOOL, "false"}, {PARAM_MATCH, MXS_MODULE_PARAM_REGEX}, {PARAM_IGNORE, MXS_MODULE_PARAM_REGEX}, {