From e829cae8b1bc171017db0fe2ebc465b7b9edb606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 3 Dec 2019 13:58:04 +0200 Subject: [PATCH] MXS-2785: Add rewrite_src and rewrite_dest parameters The parameters allow rudimentary database rewriting in the replication stream. This is still very limited as the replacement must have the same length as the original. In theory it could be shorter without causing problems but making it longer is not easy. --- .../filter/binlogfilter/binlogfilter.cc | 25 +++++++++++++-- .../filter/binlogfilter/binlogfilter.hh | 7 ++++ .../binlogfilter/binlogfiltersession.cc | 32 +++++++++++++++---- .../binlogfilter/binlogfiltersession.hh | 3 +- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/server/modules/filter/binlogfilter/binlogfilter.cc b/server/modules/filter/binlogfilter/binlogfilter.cc index 126909783..9a83952af 100644 --- a/server/modules/filter/binlogfilter/binlogfilter.cc +++ b/server/modules/filter/binlogfilter/binlogfilter.cc @@ -34,8 +34,10 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() NULL, NULL, { - {"match", MXS_MODULE_PARAM_REGEX }, - {"exclude", MXS_MODULE_PARAM_REGEX }, + {"match", MXS_MODULE_PARAM_REGEX }, + {"exclude", MXS_MODULE_PARAM_REGEX }, + {REWRITE_SRC, MXS_MODULE_PARAM_STRING }, + {REWRITE_DEST, MXS_MODULE_PARAM_STRING }, {MXS_END_MODULE_PARAMS} } }; @@ -58,7 +60,24 @@ BinlogFilter::~BinlogFilter() BinlogFilter* BinlogFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams) { - return new BinlogFilter(pParams); + BinlogFilter* rval = nullptr; + auto src = pParams->get_string(REWRITE_SRC); + auto dest = pParams->get_string(REWRITE_DEST); + + if (src.empty() != dest.empty()) + { + MXS_ERROR("Both '%s' and '%s' must be defined", REWRITE_SRC, REWRITE_DEST); + } + else if (src.length() != dest.length()) + { + MXS_ERROR("Both '%s' and '%s' must have the same length", REWRITE_SRC, REWRITE_DEST); + } + else + { + rval = new BinlogFilter(pParams); + } + + return rval; } // BinlogFilterSession create routine diff --git a/server/modules/filter/binlogfilter/binlogfilter.hh b/server/modules/filter/binlogfilter/binlogfilter.hh index c8e7aa5f0..23fa9c616 100644 --- a/server/modules/filter/binlogfilter/binlogfilter.hh +++ b/server/modules/filter/binlogfilter/binlogfilter.hh @@ -18,6 +18,9 @@ #include #include "binlogfiltersession.hh" +static constexpr const char REWRITE_SRC[] = "rewrite_src"; +static constexpr const char REWRITE_DEST[] = "rewrite_dest"; + // Binlog Filter configuration struct BinlogConfig { @@ -26,6 +29,8 @@ struct BinlogConfig , md_match(match ? pcre2_match_data_create_from_pattern(match, nullptr) : nullptr) , exclude(pParams->get_compiled_regex("exclude", 0, nullptr).release()) , md_exclude(exclude ? pcre2_match_data_create_from_pattern(exclude, nullptr) : nullptr) + , rewrite_src(pParams->get_string(REWRITE_SRC)) + , rewrite_dest(pParams->get_string(REWRITE_DEST)) { } @@ -33,6 +38,8 @@ struct BinlogConfig pcre2_match_data* md_match; pcre2_code* exclude; pcre2_match_data* md_exclude; + std::string rewrite_src; + std::string rewrite_dest; }; class BinlogFilter : public maxscale::Filter diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.cc b/server/modules/filter/binlogfilter/binlogfiltersession.cc index 6e13dd355..7e5756129 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.cc +++ b/server/modules/filter/binlogfilter/binlogfiltersession.cc @@ -305,11 +305,9 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer, case QUERY_EVENT: // Handle the SQL statement: DDL, DML, BEGIN, COMMIT - // If statement is COMMIT, then continue with next case. - if (checkStatement(body, body_size)) - { - break; - } + checkStatement(body, body_size); + fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size, hdr); + break; case XID_EVENT: /** Note: This case is reached when event_type is @@ -792,7 +790,7 @@ void BinlogFilterSession::handleEventData(uint32_t len) * @param event_size The binlog event size * @return False for COMMIT, true otherwise */ -bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t event_size) +bool BinlogFilterSession::checkStatement(uint8_t* event, const uint32_t event_size) { int db_name_len = event[4 + 4]; int var_block_len_offset = 4 + 4 + 1 + 2; @@ -803,9 +801,29 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t ev std::string db((char*)event + static_size + var_block_len, db_name_len); std::string sql((char*)event + static_size + var_block_len + db_name_len + 1, statement_len); - m_skip = should_skip_query(m_filter.getConfig(), sql, db); + const auto& config = m_filter.getConfig(); + m_skip = should_skip_query(config, sql, db); MXS_INFO("[%s] (%s) %s", m_skip ? "SKIP" : " ", db.c_str(), sql.c_str()); + if (!m_skip && !config.rewrite_src.empty()) + { + if (db == config.rewrite_src) + { + db = config.rewrite_dest; + } + + while (char* p = strstr(&sql[0], config.rewrite_src.c_str())) + { + memcpy(p, config.rewrite_dest.c_str(), config.rewrite_dest.length()); + } + + mxb_assert(db.length() == (size_t)db_name_len); + mxb_assert(sql.length() == (size_t)statement_len); + memcpy(event + static_size + var_block_len, db.c_str(), db_name_len); + memcpy(event + static_size + var_block_len + db_name_len + 1, sql.c_str(), statement_len); + MXS_INFO("Rename: (%s) %s", db.c_str(), sql.c_str()); + } + return true; } diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.hh b/server/modules/filter/binlogfilter/binlogfiltersession.hh index 626039541..7735cdcad 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.hh +++ b/server/modules/filter/binlogfilter/binlogfiltersession.hh @@ -100,8 +100,7 @@ private: void handleEventData(uint32_t len); // Check SQL statement in QUERY_EVENT - bool checkStatement(const uint8_t* event, - const uint32_t event_size); + bool checkStatement(uint8_t* event, const uint32_t event_size); // Check DB.TABLE in ANNOTATE_ROWS event void checkAnnotate(const uint8_t* event,