Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2018-09-08 18:58:57 +03:00
8 changed files with 141 additions and 22 deletions

View File

@ -42,6 +42,7 @@
#include <maxscale/service.h>
#include <maxscale/spinlock.h>
#include <maxscale/utils.hh>
#include <maxscale/pcre2.h>
#include <maxscale/routingworker.h>
#include <binlog_common.h>
@ -138,7 +139,8 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow
row_count(0),
row_target(config_get_integer(params, "group_rows")),
task_handle(0),
handler(service, handler)
handler(service, handler, config_get_compiled_regex(params, "match", 0, NULL),
config_get_compiled_regex(params, "exclude", 0, NULL))
{
if (source)
{

View File

@ -513,6 +513,8 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
{"block_size", MXS_MODULE_PARAM_SIZE, "0"},
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
{"match", MXS_MODULE_PARAM_REGEX},
{"exclude", MXS_MODULE_PARAM_REGEX},
{MXS_END_MODULE_PARAMS}
}
};

View File

@ -472,6 +472,12 @@ bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
int ev_len = m_event_type_hdr_lens[hdr->event_type];
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
if (!table_matches(table_ident))
{
return true;
}
auto create = m_created_tables.find(table_ident);
if (create != m_created_tables.end())
@ -600,6 +606,11 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
if (!table_matches(table_ident))
{
return true;
}
bool ok = m_handler->prepare_table(map->database, map->table);
auto create = m_created_tables.find(table_ident);
@ -664,8 +675,8 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
}
else
{
MXS_ERROR("Row event for unknown table mapped to ID %lu. Data will not "
"be processed.", table_id);
MXS_INFO("Row event for unknown table mapped to ID %lu. Data will not "
"be processed.", table_id);
}
return rval;

View File

@ -604,12 +604,17 @@ TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent*
std::move(cols), std::move(nulls), std::move(meta));
}
Rpl::Rpl(SERVICE* service, SRowEventHandler handler, gtid_pos_t gtid):
Rpl::Rpl(SERVICE* service, SRowEventHandler handler, pcre2_code* match, pcre2_code* exclude,
gtid_pos_t gtid):
m_handler(handler),
m_service(service),
m_binlog_checksum(0),
m_event_types(0),
m_gtid(gtid)
m_gtid(gtid),
m_match(match),
m_exclude(exclude),
m_md_match(m_match ? pcre2_match_data_create_from_pattern(m_match, NULL) : nullptr),
m_md_exclude(m_exclude ? pcre2_match_data_create_from_pattern(m_exclude, NULL) : nullptr)
{
/** For detection of CREATE/ALTER TABLE statements */
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
@ -1356,3 +1361,20 @@ bool Rpl::table_create_alter(STableCreateEvent create, const char *sql, const ch
return true;
}
bool Rpl::table_matches(const std::string& ident)
{
bool rval = false;
if (!m_match || pcre2_match(m_match, (PCRE2_SPTR)ident.c_str(), PCRE2_ZERO_TERMINATED,
0, 0, m_md_match, NULL) > 0)
{
if (!m_exclude || pcre2_match(m_exclude, (PCRE2_SPTR)ident.c_str(), PCRE2_ZERO_TERMINATED,
0, 0, m_md_exclude, NULL) == PCRE2_ERROR_NOMATCH)
{
rval = true;
}
}
return rval;
}

View File

@ -226,7 +226,8 @@ public:
Rpl& operator=(const Rpl&) = delete;
// Construct a new replication stream transformer
Rpl(SERVICE* service, SRowEventHandler event_handler, gtid_pos_t = {});
Rpl(SERVICE* service, SRowEventHandler event_handler, pcre2_code* match, pcre2_code* exclude,
gtid_pos_t = {});
// Add a stored TableCreateEvent
void add_create(STableCreateEvent create);
@ -256,17 +257,21 @@ public:
}
private:
SRowEventHandler m_handler;
SERVICE* m_service;
pcre2_code* m_create_table_re;
pcre2_code* m_alter_table_re;
uint8_t m_binlog_checksum;
uint8_t m_event_types;
Bytes m_event_type_hdr_lens;
gtid_pos_t m_gtid;
ActiveMaps m_active_maps;
MappedTables m_table_maps;
CreatedTables m_created_tables;
SRowEventHandler m_handler;
SERVICE* m_service;
pcre2_code* m_create_table_re;
pcre2_code* m_alter_table_re;
uint8_t m_binlog_checksum;
uint8_t m_event_types;
Bytes m_event_type_hdr_lens;
gtid_pos_t m_gtid;
ActiveMaps m_active_maps;
MappedTables m_table_maps;
CreatedTables m_created_tables;
pcre2_code* m_match;
pcre2_code* m_exclude;
pcre2_match_data* m_md_match;
pcre2_match_data* m_md_exclude;
void handle_query_event(REP_HEADER *hdr, uint8_t *ptr);
bool handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr);
@ -274,4 +279,5 @@ private:
STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db);
bool save_and_replace_table_create(STableCreateEvent created);
bool table_create_alter(STableCreateEvent create, const char *sql, const char *end);
bool table_matches(const std::string& ident);
};