diff --git a/server/modules/filter/binlogfilter/binlogfilter.cc b/server/modules/filter/binlogfilter/binlogfilter.cc index 126909783..9a83952af 100644 --- a/server/modules/filter/binlogfilter/binlogfilter.cc +++ b/server/modules/filter/binlogfilter/binlogfilter.cc @@ -34,8 +34,10 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE() NULL, NULL, { - {"match", MXS_MODULE_PARAM_REGEX }, - {"exclude", MXS_MODULE_PARAM_REGEX }, + {"match", MXS_MODULE_PARAM_REGEX }, + {"exclude", MXS_MODULE_PARAM_REGEX }, + {REWRITE_SRC, MXS_MODULE_PARAM_STRING }, + {REWRITE_DEST, MXS_MODULE_PARAM_STRING }, {MXS_END_MODULE_PARAMS} } }; @@ -58,7 +60,24 @@ BinlogFilter::~BinlogFilter() BinlogFilter* BinlogFilter::create(const char* zName, MXS_CONFIG_PARAMETER* pParams) { - return new BinlogFilter(pParams); + BinlogFilter* rval = nullptr; + auto src = pParams->get_string(REWRITE_SRC); + auto dest = pParams->get_string(REWRITE_DEST); + + if (src.empty() != dest.empty()) + { + MXS_ERROR("Both '%s' and '%s' must be defined", REWRITE_SRC, REWRITE_DEST); + } + else if (src.length() != dest.length()) + { + MXS_ERROR("Both '%s' and '%s' must have the same length", REWRITE_SRC, REWRITE_DEST); + } + else + { + rval = new BinlogFilter(pParams); + } + + return rval; } // BinlogFilterSession create routine diff --git a/server/modules/filter/binlogfilter/binlogfilter.hh b/server/modules/filter/binlogfilter/binlogfilter.hh index c8e7aa5f0..23fa9c616 100644 --- a/server/modules/filter/binlogfilter/binlogfilter.hh +++ b/server/modules/filter/binlogfilter/binlogfilter.hh @@ -18,6 +18,9 @@ #include #include "binlogfiltersession.hh" +static constexpr const char REWRITE_SRC[] = "rewrite_src"; +static constexpr const char REWRITE_DEST[] = "rewrite_dest"; + // Binlog Filter configuration struct BinlogConfig { @@ -26,6 +29,8 @@ struct BinlogConfig , md_match(match ? pcre2_match_data_create_from_pattern(match, nullptr) : nullptr) , exclude(pParams->get_compiled_regex("exclude", 0, nullptr).release()) , md_exclude(exclude ? pcre2_match_data_create_from_pattern(exclude, nullptr) : nullptr) + , rewrite_src(pParams->get_string(REWRITE_SRC)) + , rewrite_dest(pParams->get_string(REWRITE_DEST)) { } @@ -33,6 +38,8 @@ struct BinlogConfig pcre2_match_data* md_match; pcre2_code* exclude; pcre2_match_data* md_exclude; + std::string rewrite_src; + std::string rewrite_dest; }; class BinlogFilter : public maxscale::Filter diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.cc b/server/modules/filter/binlogfilter/binlogfiltersession.cc index 6e13dd355..7e5756129 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.cc +++ b/server/modules/filter/binlogfilter/binlogfiltersession.cc @@ -305,11 +305,9 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer, case QUERY_EVENT: // Handle the SQL statement: DDL, DML, BEGIN, COMMIT - // If statement is COMMIT, then continue with next case. - if (checkStatement(body, body_size)) - { - break; - } + checkStatement(body, body_size); + fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size, hdr); + break; case XID_EVENT: /** Note: This case is reached when event_type is @@ -792,7 +790,7 @@ void BinlogFilterSession::handleEventData(uint32_t len) * @param event_size The binlog event size * @return False for COMMIT, true otherwise */ -bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t event_size) +bool BinlogFilterSession::checkStatement(uint8_t* event, const uint32_t event_size) { int db_name_len = event[4 + 4]; int var_block_len_offset = 4 + 4 + 1 + 2; @@ -803,9 +801,29 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t ev std::string db((char*)event + static_size + var_block_len, db_name_len); std::string sql((char*)event + static_size + var_block_len + db_name_len + 1, statement_len); - m_skip = should_skip_query(m_filter.getConfig(), sql, db); + const auto& config = m_filter.getConfig(); + m_skip = should_skip_query(config, sql, db); MXS_INFO("[%s] (%s) %s", m_skip ? "SKIP" : " ", db.c_str(), sql.c_str()); + if (!m_skip && !config.rewrite_src.empty()) + { + if (db == config.rewrite_src) + { + db = config.rewrite_dest; + } + + while (char* p = strstr(&sql[0], config.rewrite_src.c_str())) + { + memcpy(p, config.rewrite_dest.c_str(), config.rewrite_dest.length()); + } + + mxb_assert(db.length() == (size_t)db_name_len); + mxb_assert(sql.length() == (size_t)statement_len); + memcpy(event + static_size + var_block_len, db.c_str(), db_name_len); + memcpy(event + static_size + var_block_len + db_name_len + 1, sql.c_str(), statement_len); + MXS_INFO("Rename: (%s) %s", db.c_str(), sql.c_str()); + } + return true; } diff --git a/server/modules/filter/binlogfilter/binlogfiltersession.hh b/server/modules/filter/binlogfilter/binlogfiltersession.hh index 626039541..7735cdcad 100644 --- a/server/modules/filter/binlogfilter/binlogfiltersession.hh +++ b/server/modules/filter/binlogfilter/binlogfiltersession.hh @@ -100,8 +100,7 @@ private: void handleEventData(uint32_t len); // Check SQL statement in QUERY_EVENT - bool checkStatement(const uint8_t* event, - const uint32_t event_size); + bool checkStatement(uint8_t* event, const uint32_t event_size); // Check DB.TABLE in ANNOTATE_ROWS event void checkAnnotate(const uint8_t* event,