Add table filtering to avrorouter
The filtering is implemented with PCRE2 regular expressions and as such is not the most user-friendly interface.
This commit is contained in:
@ -44,6 +44,7 @@
|
||||
#include <maxscale/service.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
#include <maxscale/utils.h>
|
||||
#include <maxscale/pcre2.h>
|
||||
#include <binlog_common.h>
|
||||
|
||||
#ifndef BINLOG_NAMEFMT
|
||||
@ -272,6 +273,8 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
||||
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
|
||||
{"block_size", MXS_MODULE_PARAM_SIZE, "0"},
|
||||
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
|
||||
{"match", MXS_MODULE_PARAM_REGEX},
|
||||
{"exclude", MXS_MODULE_PARAM_REGEX},
|
||||
{MXS_END_MODULE_PARAMS}
|
||||
}
|
||||
};
|
||||
@ -481,6 +484,18 @@ createInstance(SERVICE *service, char **options)
|
||||
AVRO_INSTANCE *inst;
|
||||
int i;
|
||||
|
||||
MXS_CONFIG_PARAMETER *params = service->svc_config_param;
|
||||
pcre2_code* match = config_get_compiled_regex(params, "match", 0, NULL);
|
||||
pcre2_code* exclude = config_get_compiled_regex(params, "exclude", 0, NULL);
|
||||
pcre2_match_data* md_match = NULL;
|
||||
pcre2_match_data* md_exclude = NULL;
|
||||
|
||||
if ((match && (md_match = pcre2_match_data_create_from_pattern(match, NULL)) == NULL) ||
|
||||
(exclude && (md_exclude = pcre2_match_data_create_from_pattern(exclude, NULL)) == NULL))
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if ((inst = MXS_CALLOC(1, sizeof(AVRO_INSTANCE))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
@ -502,8 +517,6 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->trx_count = 0;
|
||||
inst->binlogdir = NULL;
|
||||
|
||||
MXS_CONFIG_PARAMETER *params = service->svc_config_param;
|
||||
|
||||
inst->avrodir = MXS_STRDUP_A(config_get_string(params, "avrodir"));
|
||||
inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem"));
|
||||
inst->row_target = config_get_integer(params, "group_rows");
|
||||
@ -511,6 +524,10 @@ createInstance(SERVICE *service, char **options)
|
||||
inst->codec = config_get_enum(params, "codec", codec_values);
|
||||
int first_file = config_get_integer(params, "start_index");
|
||||
inst->block_size = config_get_size(params, "block_size");
|
||||
inst->match = match;
|
||||
inst->exclude = exclude;
|
||||
inst->md_match = md_match;
|
||||
inst->md_exclude = md_exclude;
|
||||
|
||||
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
|
||||
inst->gtid.domain = 0;
|
||||
@ -1284,3 +1301,20 @@ static bool ensure_dir_ok(const char* path, int mode)
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool table_matches(AVRO_INSTANCE* inst, const char* ident)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (!inst->match || pcre2_match(inst->match, (PCRE2_SPTR)ident, PCRE2_ZERO_TERMINATED,
|
||||
0, 0, inst->md_match, NULL) > 0)
|
||||
{
|
||||
if (!inst->exclude || pcre2_match(inst->exclude, (PCRE2_SPTR)ident, PCRE2_ZERO_TERMINATED,
|
||||
0, 0, inst->md_exclude, NULL) == PCRE2_ERROR_NOMATCH)
|
||||
{
|
||||
rval = true;
|
||||
}
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
@ -100,6 +100,12 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
|
||||
int ev_len = router->event_type_hdr_lens[hdr->event_type];
|
||||
|
||||
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
|
||||
|
||||
if (!table_matches(router, table_ident))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
TABLE_CREATE* create = hashtable_fetch(router->created_tables, table_ident);
|
||||
|
||||
if (create)
|
||||
@ -292,6 +298,12 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
{
|
||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database, map->table);
|
||||
|
||||
if (!table_matches(router, table_ident))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
AVRO_TABLE* table = hashtable_fetch(router->open_tables, table_ident);
|
||||
TABLE_CREATE* create = map->table_create;
|
||||
ss_dassert(hashtable_fetch(router->created_tables, table_ident) == create);
|
||||
@ -369,8 +381,8 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_ERROR("Row event for unknown table mapped to ID %lu. Data will not "
|
||||
"be processed.", table_id);
|
||||
MXS_INFO("Row event for unknown table mapped to ID %lu. Data will not "
|
||||
"be processed.", table_id);
|
||||
}
|
||||
|
||||
return rval;
|
||||
|
@ -303,6 +303,12 @@ typedef struct avro_instance
|
||||
* a flush of all tables */
|
||||
uint64_t block_size; /**< Avro datablock size */
|
||||
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
|
||||
|
||||
/** Match and exclude patterns for tables */
|
||||
pcre2_code* match;
|
||||
pcre2_code* exclude;
|
||||
pcre2_match_data* md_match;
|
||||
pcre2_match_data* md_exclude;
|
||||
struct avro_instance *next;
|
||||
} AVRO_INSTANCE;
|
||||
|
||||
@ -355,6 +361,8 @@ extern void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op
|
||||
#define AVRO_CS_BUSY 0x0001
|
||||
#define AVRO_WAIT_DATA 0x0002
|
||||
|
||||
bool table_matches(AVRO_INSTANCE* inst, const char* ident);
|
||||
|
||||
MXS_END_DECLS
|
||||
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user