MXS-1881: Delegate event processing to the Rpl class

The actual processing of the replicated events is now delegated to the Rpl
class. This class only deals with the raw binary format log events which
allows it to be used for both binlogs stored on disk as well as binlogs
that have just been replicated.
This commit is contained in:
Markus Mäkelä
2018-06-10 12:26:18 +03:00
parent 4def9382f2
commit e74591cfe5
8 changed files with 225 additions and 189 deletions

View File

@ -145,28 +145,13 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow
avrodir(config_get_string(params, "avrodir")), avrodir(config_get_string(params, "avrodir")),
current_pos(4), current_pos(4),
binlog_fd(-1), binlog_fd(-1),
event_types(0),
event_type_hdr_lens{0},
binlog_checksum(0),
trx_count(0), trx_count(0),
trx_target(config_get_integer(params, "group_trx")), trx_target(config_get_integer(params, "group_trx")),
row_count(0), row_count(0),
row_target(config_get_integer(params, "group_rows")), row_target(config_get_integer(params, "group_rows")),
task_handle(0), task_handle(0),
event_handler(handler) handler(service, handler)
{ {
/** For detection of CREATE/ALTER TABLE statements */
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
static const char* alter_table_regex = "(?i)alter[[:space:]]+table";
int pcreerr;
size_t erroff;
create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
ss_dassert(create_table_re); // This should never fail
alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
ss_dassert(alter_table_re); // This should never fail
if (source) if (source)
{ {
read_source_service_options(source); read_source_service_options(source);

View File

@ -12,8 +12,8 @@
* Public License. * Public License.
*/ */
#include "rpl_events.hh"
#include "avrorouter.hh" #include "avrorouter.hh"
#include "rpl.hh"
#include <avro.h> #include <avro.h>

View File

@ -39,7 +39,6 @@
static const char *statefile_section = "avro-conversion"; static const char *statefile_section = "avro-conversion";
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
/** /**
* Open a binlog file for reading * Open a binlog file for reading
@ -111,10 +110,11 @@ bool avro_save_conversion_state(Avro *router)
return false; return false;
} }
gtid_pos_t gtid = router->handler.get_gtid();
fprintf(config_file, "[%s]\n", statefile_section); fprintf(config_file, "[%s]\n", statefile_section);
fprintf(config_file, "position=%lu\n", router->current_pos); fprintf(config_file, "position=%lu\n", router->current_pos);
fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", router->gtid.domain, fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", gtid.domain,
router->gtid.server_id, router->gtid.seq, router->gtid.event_num); gtid.server_id, gtid.seq, gtid.event_num);
fprintf(config_file, "file=%s\n", router->binlog_name.c_str()); fprintf(config_file, "file=%s\n", router->binlog_name.c_str());
fclose(config_file); fclose(config_file);
@ -159,10 +159,12 @@ static int conv_state_handler(void* data, const char* section, const char* key,
if (domain && serv_id && seq && subseq) if (domain && serv_id && seq && subseq)
{ {
router->gtid.domain = strtol(domain, NULL, 10); gtid_pos_t gtid;
router->gtid.server_id = strtol(serv_id, NULL, 10); gtid.domain = strtol(domain, NULL, 10);
router->gtid.seq = strtol(seq, NULL, 10); gtid.server_id = strtol(serv_id, NULL, 10);
router->gtid.event_num = strtol(subseq, NULL, 10); gtid.seq = strtol(seq, NULL, 10);
gtid.event_num = strtol(subseq, NULL, 10);
router->handler.set_gtid(gtid);
} }
} }
else if (strcmp(key, "position") == 0) else if (strcmp(key, "position") == 0)
@ -217,10 +219,13 @@ bool avro_load_conversion_state(Avro *router)
switch (rc) switch (rc)
{ {
case 0: case 0:
rval = true; {
MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]", rval = true;
router->binlog_name.c_str(), router->current_pos, router->gtid.domain, gtid_pos_t gtid = router->handler.get_gtid();
router->gtid.server_id, router->gtid.seq, router->gtid.event_num); MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]",
router->binlog_name.c_str(), router->current_pos, gtid.domain,
gtid.server_id, gtid.seq, gtid.event_num);
}
break; break;
case -1: case -1:
@ -356,19 +361,34 @@ bool notify_cb(DCB* dcb, void* data)
return true; return true;
} }
void notify_all_clients(Avro *router) void notify_all_clients(SERVICE* service)
{ {
dcb_foreach(notify_cb, router->service); dcb_foreach(notify_cb, service);
} }
void do_checkpoint(Avro *router) void do_checkpoint(Avro *router)
{ {
router->event_handler->flush_tables(); router->handler.flush();
avro_save_conversion_state(router); avro_save_conversion_state(router);
notify_all_clients(router); notify_all_clients(router->service);
router->row_count = router->trx_count = 0; router->row_count = router->trx_count = 0;
} }
void Rpl::flush()
{
m_handler->flush_tables();
}
void Rpl::add_create(STableCreateEvent create)
{
auto it = m_created_tables.find(create->id());
if (it == m_created_tables.end() || create->version > it->second->version)
{
m_created_tables[create->id()] = create;
}
}
REP_HEADER construct_header(uint8_t* ptr) REP_HEADER construct_header(uint8_t* ptr)
{ {
REP_HEADER hdr; REP_HEADER hdr;
@ -465,8 +485,14 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
return rval; return rval;
} }
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr) void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
{ {
if (m_binlog_checksum)
{
// We don't care about the checksum at this point so we ignore it
hdr.event_size -= 4;
}
// The following events are related to the actual data // The following events are related to the actual data
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT) if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
{ {
@ -475,45 +501,26 @@ void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr)
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1]; int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES; int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES; uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
m_event_type_hdr_lens.assign(ptr, ptr + n_events);
// Precaution to prevent writing too much in case new events are added m_event_types = n_events;
int real_len = MXS_MIN(n_events, (int)sizeof(router->event_type_hdr_lens)); m_binlog_checksum = checksum[0];
memcpy(router->event_type_hdr_lens, ptr + BLRM_FDE_EVENT_TYPES_OFFSET, real_len);
router->event_types = n_events;
router->binlog_checksum = checksum[0];
} }
else if (hdr.event_type == TABLE_MAP_EVENT) else if (hdr.event_type == TABLE_MAP_EVENT)
{ {
handle_table_map_event(router, &hdr, ptr); handle_table_map_event(&hdr, ptr);
} }
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) || else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2)) (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{ {
router->row_count++; handle_row_event(&hdr, ptr);
handle_row_event(router, &hdr, ptr);
} }
else if (hdr.event_type == MARIADB10_GTID_EVENT) else if (hdr.event_type == MARIADB10_GTID_EVENT)
{ {
router->gtid.domain = extract_field(ptr + 8, 32); m_gtid.extract(hdr, ptr);
router->gtid.server_id = hdr.serverid;
router->gtid.seq = extract_field(ptr, 64);
router->gtid.event_num = 0;
router->gtid.timestamp = hdr.timestamp;
} }
else if (hdr.event_type == QUERY_EVENT) else if (hdr.event_type == QUERY_EVENT)
{ {
handle_query_event(router, &hdr, ptr); handle_query_event(&hdr, ptr);
}
else if (hdr.event_type == XID_EVENT)
{
router->trx_count++;
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
do_checkpoint(router);
}
} }
} }
@ -573,14 +580,14 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
// These events are only related to binary log files // These events are only related to binary log files
if (hdr.event_type == ROTATE_EVENT) if (hdr.event_type == ROTATE_EVENT)
{ {
int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->binlog_checksum ? 4 : 0); int len = hdr.event_size - BINLOG_EVENT_HDR_LEN - 8 - (router->handler.have_checksums() ? 4 : 0);
next_binlog.assign((char*)ptr + 8, len); next_binlog.assign((char*)ptr + 8, len);
rotate_seen = true; rotate_seen = true;
} }
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{ {
// This appears to need special handling // This appears to need special handling
int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->binlog_checksum ? 4 : 0); int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->handler.have_checksums() ? 4 : 0);
MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr); MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr);
pos += hdr.event_size; pos += hdr.event_size;
router->current_pos = pos; router->current_pos = pos;
@ -589,21 +596,27 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
} }
else else
{ {
uint32_t orig_size = hdr.event_size; if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
if (router->binlog_checksum)
{ {
// We don't care about the checksum at this point so we ignore it router->row_count++;
hdr.event_size -= 4; }
else if (hdr.event_type == XID_EVENT)
{
router->trx_count++;
} }
handle_one_event(router, ptr, hdr); router->handler.handle_event(hdr, ptr);
hdr.event_size = orig_size;
} }
gwbuf_free(result); gwbuf_free(result);
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
{
do_checkpoint(router);
}
if (pos_is_ok(router, hdr, pos)) if (pos_is_ok(router, hdr, pos))
{ {
pos = hdr.next_pos; pos = hdr.next_pos;
@ -662,14 +675,9 @@ void avro_load_metadata_from_schemas(Avro *router)
if (versionend == suffix) if (versionend == suffix)
{ {
snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table); snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table);
auto it = router->created_tables.find(table_ident); STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
db, table, version));
if (it == router->created_tables.end() || version > it->second->version) router->handler.add_create(created);
{
STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
db, table, version));
router->created_tables[table_ident] = created;
}
} }
else else
{ {
@ -688,14 +696,14 @@ void avro_load_metadata_from_schemas(Avro *router)
* @param len Statement length * @param len Statement length
* @return True if the statement creates a new table * @return True if the statement creates a new table
*/ */
bool is_create_table_statement(Avro *router, char* ptr, size_t len) bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
{ {
int rc = 0; int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->create_table_re, NULL); pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
if (mdata) if (mdata)
{ {
rc = pcre2_match(router->create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
pcre2_match_data_free(mdata); pcre2_match_data_free(mdata);
} }
@ -740,14 +748,14 @@ bool is_create_as_statement(const char* ptr, size_t len)
* @param len Statement length * @param len Statement length
* @return True if the statement alters a table * @return True if the statement alters a table
*/ */
bool is_alter_table_statement(Avro *router, char* ptr, size_t len) bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
{ {
int rc = 0; int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->alter_table_re, NULL); pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
if (mdata) if (mdata)
{ {
rc = pcre2_match(router->alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL); rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
pcre2_match_data_free(mdata); pcre2_match_data_free(mdata);
} }
@ -770,23 +778,23 @@ bool is_alter_table_statement(Avro *router, char* ptr, size_t len)
* @param created Created table * @param created Created table
* @return False if an error occurred and true if successful * @return False if an error occurred and true if successful
*/ */
bool save_and_replace_table_create(Avro *router, TableCreateEvent *created) bool Rpl::save_and_replace_table_create(STableCreateEvent created)
{ {
std::string table_ident = created->database + "." + created->table; std::string table_ident = created->database + "." + created->table;
auto it = router->created_tables.find(table_ident); auto it = m_created_tables.find(table_ident);
if (it != router->created_tables.end()) if (it != m_created_tables.end())
{ {
auto tm_it = router->table_maps.find(table_ident); auto tm_it = m_table_maps.find(table_ident);
if (tm_it != router->table_maps.end()) if (tm_it != m_table_maps.end())
{ {
router->active_maps.erase(tm_it->second->id); m_active_maps.erase(tm_it->second->id);
router->table_maps.erase(tm_it); m_table_maps.erase(tm_it);
} }
} }
router->created_tables[table_ident] = STableCreateEvent(created); m_created_tables[table_ident] = created;
ss_dassert(created->columns.size() > 0); ss_dassert(created->columns.size() > 0);
return true; return true;
} }
@ -844,7 +852,7 @@ static void strip_executable_comments(char *sql, int* len)
* @param pending_transaction Pointer where status of pending transaction is stored * @param pending_transaction Pointer where status of pending transaction is stored
* @param ptr Pointer to the start of the event payload * @param ptr Pointer to the start of the event payload
*/ */
void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr)
{ {
int dblen = ptr[DBNM_OFF]; int dblen = ptr[DBNM_OFF];
int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF); int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
@ -892,13 +900,13 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
read_table_identifier(db, sql, sql + len, ident, sizeof(ident)); read_table_identifier(db, sql, sql + len, ident, sizeof(ident));
if (is_create_table_statement(router, sql, len)) if (is_create_table_statement(m_create_table_re, sql, len))
{ {
TableCreateEvent *created = NULL; STableCreateEvent created;
if (is_create_like_statement(sql, len)) if (is_create_like_statement(sql, len))
{ {
created = table_create_copy(router, sql, len, db); created = table_create_copy(sql, len, db);
} }
else if (is_create_as_statement(sql, len)) else if (is_create_as_statement(sql, len))
{ {
@ -914,16 +922,16 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
created = table_create_alloc(ident, sql, len); created = table_create_alloc(ident, sql, len);
} }
if (created && !save_and_replace_table_create(router, created)) if (created && !save_and_replace_table_create(created))
{ {
MXS_ERROR("Failed to save statement to disk: %.*s", len, sql); MXS_ERROR("Failed to save statement to disk: %.*s", len, sql);
} }
} }
else if (is_alter_table_statement(router, sql, len)) else if (is_alter_table_statement(m_alter_table_re, sql, len))
{ {
auto it = router->created_tables.find(ident); auto it = m_created_tables.find(ident);
if (it != router->created_tables.end()) if (it != m_created_tables.end())
{ {
table_create_alter(it->second.get(), sql, sql + len); table_create_alter(it->second.get(), sql, sql + len);
} }
@ -932,11 +940,7 @@ void handle_query_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident); MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
} }
} }
/* Commit received for non transactional tables, i.e. MyISAM */ // TODO: Add COMMIT handling for non-transactional tables
else if (strncmp(sql, "COMMIT", 6) == 0)
{
router->trx_count++;
}
MXS_FREE(tmp); MXS_FREE(tmp);
} }

View File

@ -62,7 +62,8 @@ static bool conversion_task_ctl(Avro *inst, bool start);
MXS_ROUTER* createInstance(SERVICE *service, char **options) MXS_ROUTER* createInstance(SERVICE *service, char **options)
{ {
uint64_t block_size = config_get_size(service->svc_config_param, "block_size"); uint64_t block_size = config_get_size(service->svc_config_param, "block_size");
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(service->svc_config_param, "codec", codec_values)); mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(service->svc_config_param,
"codec", codec_values));
std::string avrodir = config_get_string(service->svc_config_param, "avrodir"); std::string avrodir = config_get_string(service->svc_config_param, "avrodir");
SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec)); SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec));
@ -153,6 +154,7 @@ static void
diagnostics(MXS_ROUTER *router, DCB *dcb) diagnostics(MXS_ROUTER *router, DCB *dcb)
{ {
Avro *router_inst = (Avro *) router; Avro *router_inst = (Avro *) router;
gtid_pos_t gtid = router_inst->handler.get_gtid();
dcb_printf(dcb, "\tAVRO files directory: %s\n", dcb_printf(dcb, "\tAVRO files directory: %s\n",
router_inst->avrodir.c_str()); router_inst->avrodir.c_str());
@ -164,12 +166,9 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
dcb_printf(dcb, "\tCurrent binlog position: %lu\n", dcb_printf(dcb, "\tCurrent binlog position: %lu\n",
router_inst->current_pos); router_inst->current_pos);
dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n", dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n",
router_inst->gtid.domain, router_inst->gtid.server_id, gtid.domain, gtid.server_id, gtid.seq);
router_inst->gtid.seq); dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", gtid.timestamp);
dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", gtid.event_num);
router_inst->gtid.timestamp);
dcb_printf(dcb, "\tCurrent GTID #events: %lu\n",
router_inst->gtid.event_num);
} }
/** /**
@ -192,11 +191,11 @@ static json_t* diagnostics_json(const MXS_ROUTER *router)
json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name.c_str())); json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name.c_str()));
json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos)); json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos));
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain, gtid_pos_t gtid = router_inst->handler.get_gtid();
router_inst->gtid.server_id, router_inst->gtid.seq); snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", gtid.domain, gtid.server_id, gtid.seq);
json_object_set_new(rval, "gtid", json_string(pathbuf)); json_object_set_new(rval, "gtid", json_string(pathbuf));
json_object_set_new(rval, "gtid_timestamp", json_integer(router_inst->gtid.timestamp)); json_object_set_new(rval, "gtid_timestamp", json_integer(gtid.timestamp));
json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num)); json_object_set_new(rval, "gtid_event_number", json_integer(gtid.event_num));
return rval; return rval;
} }
@ -281,7 +280,7 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
/** We reached end of file, flush unwritten records to disk */ /** We reached end of file, flush unwritten records to disk */
if (progress) if (progress)
{ {
router->event_handler->flush_tables(); router->handler.flush();
avro_save_conversion_state(router); avro_save_conversion_state(router);
logged = false; logged = false;
} }

View File

@ -74,23 +74,23 @@ static int get_event_type(uint8_t event)
* @param hdr Replication header * @param hdr Replication header
* @param ptr Pointer to event payload * @param ptr Pointer to event payload
*/ */
bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
{ {
bool rval = false; bool rval = false;
uint64_t id; uint64_t id;
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
int ev_len = router->event_type_hdr_lens[hdr->event_type]; int ev_len = m_event_type_hdr_lens[hdr->event_type];
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident)); read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
auto create = router->created_tables.find(table_ident); auto create = m_created_tables.find(table_ident);
if (create != router->created_tables.end()) if (create != m_created_tables.end())
{ {
ss_dassert(create->second->columns.size() > 0); ss_dassert(create->second->columns.size() > 0);
auto it = router->table_maps.find(table_ident); auto it = m_table_maps.find(table_ident);
STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get())); STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
if (it != router->table_maps.end()) if (it != m_table_maps.end())
{ {
auto old = it->second; auto old = it->second;
@ -102,28 +102,23 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
} }
} }
if (router->event_handler->open_table(map, create->second)) if (m_handler->open_table(map, create->second))
{ {
create->second->was_used = true; create->second->was_used = true;
auto old = router->table_maps.find(table_ident); auto old = m_table_maps.find(table_ident);
bool notify = old != router->table_maps.end(); bool notify = old != m_table_maps.end();
if (notify) if (notify)
{ {
router->active_maps.erase(old->second->id); m_active_maps.erase(old->second->id);
} }
router->table_maps[table_ident] = map; m_table_maps[table_ident] = map;
router->active_maps[map->id] = map; m_active_maps[map->id] = map;
ss_dassert(router->active_maps[id] == map); ss_dassert(m_active_maps[id] == map);
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
rval = true; rval = true;
if (notify)
{
notify_all_clients(router);
}
} }
} }
else else
@ -133,11 +128,6 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
"table until a DDL statement for it is read.", table_ident); "table until a DDL statement for it is read.", table_ident);
} }
if (rval)
{
MXS_INFO("Table Map for '%s' at %lu", table_ident, router->current_pos);
}
return rval; return rval;
} }
@ -152,12 +142,11 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
* @param ptr Pointer to the start of the event * @param ptr Pointer to the start of the event
* @return True on succcess, false on error * @return True on succcess, false on error
*/ */
bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
{ {
bool rval = false; bool rval = false;
uint8_t *start = ptr;
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
uint8_t table_id_size = router->event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6; uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
uint64_t table_id = 0; uint64_t table_id = 0;
/** The first value is the ID where the table was mapped. This should be /** The first value is the ID where the table was mapped. This should be
@ -213,46 +202,45 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
// There should always be a table map event prior to a row event. // There should always be a table map event prior to a row event.
auto it = router->active_maps.find(table_id); auto it = m_active_maps.find(table_id);
if (it != router->active_maps.end()) if (it != m_active_maps.end())
{ {
STableMapEvent map = it->second; STableMapEvent map = it->second;
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2]; char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str()); snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
bool ok = router->event_handler->prepare_table(map->database, map->table); bool ok = m_handler->prepare_table(map->database, map->table);
auto create = router->created_tables.find(table_ident); auto create = m_created_tables.find(table_ident);
if (ok && create != router->created_tables.end() && if (ok && create != m_created_tables.end() &&
ncolumns == map->columns() && create->second->columns.size() == map->columns()) ncolumns == map->columns() && create->second->columns.size() == map->columns())
{ {
/** Each event has one or more rows in it. The number of rows is not known /** Each event has one or more rows in it. The number of rows is not known
* beforehand so we must continue processing them until we reach the end * beforehand so we must continue processing them until we reach the end
* of the event. */ * of the event. */
int rows = 0; int rows = 0;
MXS_INFO("Row Event for '%s' at %lu", table_ident, router->current_pos); MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size);
while (ptr < end) while (ptr < end)
{ {
static uint64_t total_row_count = 1;
int event_type = get_event_type(hdr->event_type); int event_type = get_event_type(hdr->event_type);
// Increment the event count for this transaction // Increment the event count for this transaction
router->gtid.event_num++; m_gtid.event_num++;
router->event_handler->prepare_row(router->gtid, *hdr, event_type); m_handler->prepare_row(m_gtid, *hdr, event_type);
ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
router->event_handler->commit(router->gtid); m_handler->commit(m_gtid);
/** Update rows events have the before and after images of the /** Update rows events have the before and after images of the
* affected rows so we'll process them as another record with * affected rows so we'll process them as another record with
* a different type */ * a different type */
if (event_type == UPDATE_EVENT) if (event_type == UPDATE_EVENT)
{ {
router->event_handler->prepare_row(router->gtid, *hdr, UPDATE_EVENT_AFTER); m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER);
ptr = process_row_event_data(map, create->second, router->event_handler, ptr, col_present, end); ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
router->event_handler->commit(router->gtid); m_handler->commit(m_gtid);
} }
rows++; rows++;
@ -265,7 +253,7 @@ bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier" MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
" errors for more details.", map->database.c_str(), map->table.c_str()); " errors for more details.", map->database.c_str(), map->table.c_str());
} }
else if (create == router->created_tables.end()) else if (create == m_created_tables.end())
{ {
MXS_ERROR("Create table statement for %s.%s was not found from the " MXS_ERROR("Create table statement for %s.%s was not found from the "
"binary logs or the stored schema was not correct.", "binary logs or the stored schema was not correct.",

View File

@ -514,7 +514,7 @@ int resolve_table_version(const char* db, const char* table)
* *
* @return New CREATE_TABLE object or NULL if an error occurred * @return New CREATE_TABLE object or NULL if an error occurred
*/ */
TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len) STableCreateEvent table_create_alloc(char* ident, const char* sql, int len)
{ {
/** Extract the table definition so we can get the column names from it */ /** Extract the table definition so we can get the column names from it */
int stmt_len = 0; int stmt_len = 0;
@ -533,12 +533,12 @@ TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len)
std::vector<Column> columns; std::vector<Column> columns;
process_column_definition(statement_sql, columns); process_column_definition(statement_sql, columns);
TableCreateEvent *rval = NULL; STableCreateEvent rval;
if (!columns.empty()) if (!columns.empty())
{ {
int version = resolve_table_version(database, table); int version = resolve_table_version(database, table);
rval = new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns)); rval.reset(new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns)));
} }
else else
{ {
@ -730,9 +730,9 @@ static bool extract_create_like_identifier(const char* sql, size_t len, char* ta
/** /**
* Create a table from another table * Create a table from another table
*/ */
TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db) STableCreateEvent Rpl::table_create_copy(const char* sql, size_t len, const char* db)
{ {
TableCreateEvent* rval = NULL; STableCreateEvent rval;
char target[MYSQL_TABLE_MAXLEN + 1] = ""; char target[MYSQL_TABLE_MAXLEN + 1] = "";
char source[MYSQL_TABLE_MAXLEN + 1] = ""; char source[MYSQL_TABLE_MAXLEN + 1] = "";
@ -748,11 +748,11 @@ TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, c
strcat(table_ident, source); strcat(table_ident, source);
auto it = router->created_tables.find(table_ident); auto it = m_created_tables.find(table_ident);
if (it != router->created_tables.end()) if (it != m_created_tables.end())
{ {
rval = new (std::nothrow) TableCreateEvent(*it->second); rval.reset(new (std::nothrow) TableCreateEvent(*it->second));
char* table = strchr(target, '.'); char* table = strchr(target, '.');
table = table ? table + 1 : target; table = table ? table + 1 : target;
rval->table = table; rval->table = table;

View File

@ -26,13 +26,12 @@
#include <maxscale/mysql_binlog.h> #include <maxscale/mysql_binlog.h>
#include <maxscale/users.h> #include <maxscale/users.h>
#include <cdc.h> #include <cdc.h>
#include <maxscale/pcre2.h>
#include <maxavro.h> #include <maxavro.h>
#include <binlog_common.h> #include <binlog_common.h>
#include <maxscale/protocol/mysql.h> #include <maxscale/protocol/mysql.h>
#include <blr_constants.h> #include <blr_constants.h>
#include "rpl_events.hh" #include "rpl.hh"
MXS_BEGIN_DECLS MXS_BEGIN_DECLS
@ -113,9 +112,6 @@ static const MXS_ENUM_VALUE codec_values[] =
{NULL} {NULL}
}; };
typedef std::tr1::unordered_map<std::string, STableCreateEvent> CreatedTables;
typedef std::tr1::unordered_map<std::string, STableMapEvent> MappedTables;
typedef std::tr1::unordered_map<uint64_t, STableMapEvent> ActiveMaps;
class Avro: public MXS_ROUTER class Avro: public MXS_ROUTER
{ {
@ -132,23 +128,12 @@ public:
std::string binlog_name; /*< Name of the current binlog file */ std::string binlog_name; /*< Name of the current binlog file */
uint64_t current_pos; /*< Current binlog position */ uint64_t current_pos; /*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog file being read */ int binlog_fd; /*< File descriptor of the binlog file being read */
pcre2_code* create_table_re; uint64_t trx_count; /*< Transactions processed */
pcre2_code* alter_table_re; uint64_t trx_target; /*< Number of transactions that trigger a flush */
uint8_t event_types; uint64_t row_count; /*< Row events processed */
uint8_t event_type_hdr_lens[MAX_EVENT_TYPE_END]; uint64_t row_target; /*< Number of row events that trigger a flush */
uint8_t binlog_checksum;
gtid_pos_t gtid;
ActiveMaps active_maps;
MappedTables table_maps;
CreatedTables created_tables;
uint64_t trx_count; /*< Transactions processed */
uint64_t trx_target; /*< Minimum about of transactions that will trigger
* a flush of all tables */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */
uint32_t task_handle; /**< Delayed task handle */ uint32_t task_handle; /**< Delayed task handle */
SRowEventHandler event_handler; Rpl handler;
private: private:
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler); Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler);
@ -208,8 +193,7 @@ private:
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, char* dest, size_t len); void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, char* dest, size_t len);
TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create); TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create);
TableCreateEvent* table_create_alloc(char* ident, const char* sql, int len); STableCreateEvent table_create_alloc(char* ident, const char* sql, int len);
TableCreateEvent* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
bool table_create_save(TableCreateEvent *create, const char *filename); bool table_create_save(TableCreateEvent *create, const char *filename);
bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end); bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end);
TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table, TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table,
@ -221,7 +205,6 @@ bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
void avro_close_binlog(int fd); void avro_close_binlog(int fd);
avro_binlog_end_t avro_read_all_events(Avro *router); avro_binlog_end_t avro_read_all_events(Avro *router);
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create); char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr); bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos); void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr); REP_HEADER construct_header(uint8_t* ptr);

View File

@ -18,6 +18,8 @@
#include <tr1/memory> #include <tr1/memory>
#include <tr1/unordered_map> #include <tr1/unordered_map>
#include <maxscale/pcre2.h>
#include <maxscale/service.h>
#include <binlog_common.h> #include <binlog_common.h>
typedef std::vector<uint8_t> Bytes; typedef std::vector<uint8_t> Bytes;
@ -42,6 +44,15 @@ struct gtid_pos_t
* is an internal representation of the position of * is an internal representation of the position of
* an event inside a GTID event and it is used to * an event inside a GTID event and it is used to
* rebuild GTID events in the correct order. */ * rebuild GTID events in the correct order. */
void extract(const REP_HEADER& hdr, uint8_t* ptr)
{
domain = extract_field(ptr + 8, 32);
server_id = hdr.serverid;
seq = extract_field(ptr, 64);
event_num = 0;
timestamp = hdr.timestamp;
}
}; };
/** A single column in a CREATE TABLE statement */ /** A single column in a CREATE TABLE statement */
@ -71,6 +82,11 @@ struct TableCreateEvent
{ {
} }
std::string id() const
{
return database + '.' + table;
}
std::vector<Column> columns; std::vector<Column> columns;
std::string table; std::string table;
std::string database; std::string database;
@ -113,6 +129,11 @@ struct TableMapEvent
typedef std::tr1::shared_ptr<TableCreateEvent> STableCreateEvent; typedef std::tr1::shared_ptr<TableCreateEvent> STableCreateEvent;
typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent; typedef std::tr1::shared_ptr<TableMapEvent> STableMapEvent;
// Containers for the replication events
typedef std::tr1::unordered_map<std::string, STableCreateEvent> CreatedTables;
typedef std::tr1::unordered_map<std::string, STableMapEvent> MappedTables;
typedef std::tr1::unordered_map<uint64_t, STableMapEvent> ActiveMaps;
// Handler class for row based replication events // Handler class for row based replication events
class RowEventHandler class RowEventHandler
{ {
@ -167,3 +188,59 @@ public:
}; };
typedef std::auto_ptr<RowEventHandler> SRowEventHandler; typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
class Rpl
{
public:
Rpl(const Rpl&) = delete;
Rpl& operator=(const Rpl&) = delete;
// Construct a new replication stream transformer
Rpl(SERVICE* service, SRowEventHandler event_handler);
// Add a stored TableCreateEvent
void add_create(STableCreateEvent create);
// Handle a replicated binary log event
void handle_event(REP_HEADER hdr, uint8_t* ptr);
// Called when processed events need to be persisted to disk
void flush();
// Check if binlog checksums are enabled
bool have_checksums() const
{
return m_binlog_checksum;
}
// Set current GTID
void set_gtid(gtid_pos_t gtid)
{
m_gtid = gtid;
}
// Get current GTID
const gtid_pos_t& get_gtid() const
{
return m_gtid;
}
private:
SRowEventHandler m_handler;
SERVICE* m_service;
pcre2_code* m_create_table_re;
pcre2_code* m_alter_table_re;
uint8_t m_binlog_checksum;
uint8_t m_event_types;
Bytes m_event_type_hdr_lens;
gtid_pos_t m_gtid;
ActiveMaps m_active_maps;
MappedTables m_table_maps;
CreatedTables m_created_tables;
void handle_query_event(REP_HEADER *hdr, uint8_t *ptr);
bool handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr);
bool handle_row_event(REP_HEADER *hdr, uint8_t *ptr);
STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db);
bool save_and_replace_table_create(STableCreateEvent created);
};