MXS-2785: Add rewrite_src and rewrite_dest parameters
The parameters allow rudimentary database rewriting in the replication stream. This is still very limited as the replacement must have the same length as the original. In theory it could be shorter without causing problems but making it longer is not easy.
This commit is contained in:
		@ -34,8 +34,10 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
 | 
				
			|||||||
        NULL,
 | 
					        NULL,
 | 
				
			||||||
        NULL,
 | 
					        NULL,
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            {"match",             MXS_MODULE_PARAM_REGEX },
 | 
					            {"match",             MXS_MODULE_PARAM_REGEX  },
 | 
				
			||||||
            {"exclude",           MXS_MODULE_PARAM_REGEX },
 | 
					            {"exclude",           MXS_MODULE_PARAM_REGEX  },
 | 
				
			||||||
 | 
					            {REWRITE_SRC,         MXS_MODULE_PARAM_STRING },
 | 
				
			||||||
 | 
					            {REWRITE_DEST,        MXS_MODULE_PARAM_STRING },
 | 
				
			||||||
            {MXS_END_MODULE_PARAMS}
 | 
					            {MXS_END_MODULE_PARAMS}
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    };
 | 
					    };
 | 
				
			||||||
@ -58,7 +60,24 @@ BinlogFilter::~BinlogFilter()
 | 
				
			|||||||
BinlogFilter* BinlogFilter::create(const char* zName,
 | 
					BinlogFilter* BinlogFilter::create(const char* zName,
 | 
				
			||||||
                                   MXS_CONFIG_PARAMETER* pParams)
 | 
					                                   MXS_CONFIG_PARAMETER* pParams)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    return new BinlogFilter(pParams);
 | 
					    BinlogFilter* rval = nullptr;
 | 
				
			||||||
 | 
					    auto src = pParams->get_string(REWRITE_SRC);
 | 
				
			||||||
 | 
					    auto dest = pParams->get_string(REWRITE_DEST);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (src.empty() != dest.empty())
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        MXS_ERROR("Both '%s' and '%s' must be defined", REWRITE_SRC, REWRITE_DEST);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    else if (src.length() != dest.length())
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        MXS_ERROR("Both '%s' and '%s' must have the same length", REWRITE_SRC, REWRITE_DEST);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    else
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        rval = new BinlogFilter(pParams);
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return rval;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BinlogFilterSession create routine
 | 
					// BinlogFilterSession create routine
 | 
				
			||||||
 | 
				
			|||||||
@ -18,6 +18,9 @@
 | 
				
			|||||||
#include <maxscale/pcre2.hh>
 | 
					#include <maxscale/pcre2.hh>
 | 
				
			||||||
#include "binlogfiltersession.hh"
 | 
					#include "binlogfiltersession.hh"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static constexpr const char REWRITE_SRC[] = "rewrite_src";
 | 
				
			||||||
 | 
					static constexpr const char REWRITE_DEST[] = "rewrite_dest";
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Binlog Filter configuration
 | 
					// Binlog Filter configuration
 | 
				
			||||||
struct BinlogConfig
 | 
					struct BinlogConfig
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
@ -26,6 +29,8 @@ struct BinlogConfig
 | 
				
			|||||||
        , md_match(match ? pcre2_match_data_create_from_pattern(match, nullptr) : nullptr)
 | 
					        , md_match(match ? pcre2_match_data_create_from_pattern(match, nullptr) : nullptr)
 | 
				
			||||||
        , exclude(pParams->get_compiled_regex("exclude", 0, nullptr).release())
 | 
					        , exclude(pParams->get_compiled_regex("exclude", 0, nullptr).release())
 | 
				
			||||||
        , md_exclude(exclude ? pcre2_match_data_create_from_pattern(exclude, nullptr) : nullptr)
 | 
					        , md_exclude(exclude ? pcre2_match_data_create_from_pattern(exclude, nullptr) : nullptr)
 | 
				
			||||||
 | 
					        , rewrite_src(pParams->get_string(REWRITE_SRC))
 | 
				
			||||||
 | 
					        , rewrite_dest(pParams->get_string(REWRITE_DEST))
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -33,6 +38,8 @@ struct BinlogConfig
 | 
				
			|||||||
    pcre2_match_data* md_match;
 | 
					    pcre2_match_data* md_match;
 | 
				
			||||||
    pcre2_code*       exclude;
 | 
					    pcre2_code*       exclude;
 | 
				
			||||||
    pcre2_match_data* md_exclude;
 | 
					    pcre2_match_data* md_exclude;
 | 
				
			||||||
 | 
					    std::string       rewrite_src;
 | 
				
			||||||
 | 
					    std::string       rewrite_dest;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class BinlogFilter : public maxscale::Filter<BinlogFilter, BinlogFilterSession>
 | 
					class BinlogFilter : public maxscale::Filter<BinlogFilter, BinlogFilterSession>
 | 
				
			||||||
 | 
				
			|||||||
@ -305,11 +305,9 @@ bool BinlogFilterSession::checkEvent(GWBUF* buffer,
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        case QUERY_EVENT:
 | 
					        case QUERY_EVENT:
 | 
				
			||||||
            // Handle the SQL statement: DDL, DML, BEGIN, COMMIT
 | 
					            // Handle the SQL statement: DDL, DML, BEGIN, COMMIT
 | 
				
			||||||
            // If statement is COMMIT, then continue with next case.
 | 
					            checkStatement(body, body_size);
 | 
				
			||||||
            if (checkStatement(body, body_size))
 | 
					            fixEvent(event + MYSQL_HEADER_LEN + 1, hdr.event_size, hdr);
 | 
				
			||||||
            {
 | 
					            break;
 | 
				
			||||||
                break;
 | 
					 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        case XID_EVENT:
 | 
					        case XID_EVENT:
 | 
				
			||||||
            /** Note: This case is reached when event_type is
 | 
					            /** Note: This case is reached when event_type is
 | 
				
			||||||
@ -792,7 +790,7 @@ void BinlogFilterSession::handleEventData(uint32_t len)
 | 
				
			|||||||
 * @param event_size    The binlog event size
 | 
					 * @param event_size    The binlog event size
 | 
				
			||||||
 * @return              False for COMMIT, true otherwise
 | 
					 * @return              False for COMMIT, true otherwise
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t event_size)
 | 
					bool BinlogFilterSession::checkStatement(uint8_t* event, const uint32_t event_size)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    int db_name_len = event[4 + 4];
 | 
					    int db_name_len = event[4 + 4];
 | 
				
			||||||
    int var_block_len_offset = 4 + 4 + 1 + 2;
 | 
					    int var_block_len_offset = 4 + 4 + 1 + 2;
 | 
				
			||||||
@ -803,9 +801,29 @@ bool BinlogFilterSession::checkStatement(const uint8_t* event, const uint32_t ev
 | 
				
			|||||||
    std::string db((char*)event + static_size + var_block_len, db_name_len);
 | 
					    std::string db((char*)event + static_size + var_block_len, db_name_len);
 | 
				
			||||||
    std::string sql((char*)event + static_size + var_block_len + db_name_len + 1, statement_len);
 | 
					    std::string sql((char*)event + static_size + var_block_len + db_name_len + 1, statement_len);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    m_skip = should_skip_query(m_filter.getConfig(), sql, db);
 | 
					    const auto& config = m_filter.getConfig();
 | 
				
			||||||
 | 
					    m_skip = should_skip_query(config, sql, db);
 | 
				
			||||||
    MXS_INFO("[%s] (%s) %s", m_skip ? "SKIP" : "    ", db.c_str(), sql.c_str());
 | 
					    MXS_INFO("[%s] (%s) %s", m_skip ? "SKIP" : "    ", db.c_str(), sql.c_str());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if (!m_skip && !config.rewrite_src.empty())
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					        if (db == config.rewrite_src)
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            db = config.rewrite_dest;
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        while (char* p = strstr(&sql[0], config.rewrite_src.c_str()))
 | 
				
			||||||
 | 
					        {
 | 
				
			||||||
 | 
					            memcpy(p, config.rewrite_dest.c_str(), config.rewrite_dest.length());
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        mxb_assert(db.length() == (size_t)db_name_len);
 | 
				
			||||||
 | 
					        mxb_assert(sql.length() == (size_t)statement_len);
 | 
				
			||||||
 | 
					        memcpy(event + static_size + var_block_len, db.c_str(), db_name_len);
 | 
				
			||||||
 | 
					        memcpy(event + static_size + var_block_len + db_name_len + 1, sql.c_str(), statement_len);
 | 
				
			||||||
 | 
					        MXS_INFO("Rename: (%s) %s", db.c_str(), sql.c_str());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return true;
 | 
					    return true;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -100,8 +100,7 @@ private:
 | 
				
			|||||||
    void handleEventData(uint32_t len);
 | 
					    void handleEventData(uint32_t len);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Check SQL statement in QUERY_EVENT
 | 
					    // Check SQL statement in QUERY_EVENT
 | 
				
			||||||
    bool checkStatement(const uint8_t* event,
 | 
					    bool checkStatement(uint8_t* event, const uint32_t event_size);
 | 
				
			||||||
                        const uint32_t event_size);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Check DB.TABLE in ANNOTATE_ROWS event
 | 
					    // Check DB.TABLE in ANNOTATE_ROWS event
 | 
				
			||||||
    void checkAnnotate(const uint8_t* event,
 | 
					    void checkAnnotate(const uint8_t* event,
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user