From 2dc6718d47e2e12a505f3f66a2f7c3236d83b7f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 27 Nov 2018 12:35:39 +0200 Subject: [PATCH] MXS-2188: Update target table in prepare_table Passing the target table and create to the prepare_table function allows the converter to update the internal variables. --- server/modules/routing/avrorouter/avro_converter.cc | 6 ++++-- server/modules/routing/avrorouter/avro_converter.hh | 2 +- server/modules/routing/avrorouter/avro_rbr.cc | 13 ++++++++++--- server/modules/routing/avrorouter/rpl.hh | 4 ++-- 4 files changed, 17 insertions(+), 8 deletions(-) diff --git a/server/modules/routing/avrorouter/avro_converter.cc b/server/modules/routing/avrorouter/avro_converter.cc index a0c9bd4cc..9d196feae 100644 --- a/server/modules/routing/avrorouter/avro_converter.cc +++ b/server/modules/routing/avrorouter/avro_converter.cc @@ -353,15 +353,17 @@ bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEven return rval; } -bool AvroConverter::prepare_table(std::string database, std::string table) +bool AvroConverter::prepare_table(const STableMapEvent& map, const STableCreateEvent& create) { bool rval = false; - auto it = m_open_tables.find(database + "." + table); + auto it = m_open_tables.find(map->database + "." + map->table); if (it != m_open_tables.end()) { m_writer_iface = it->second->avro_writer_iface; m_avro_file = &it->second->avro_file; + m_map = map; + m_create = create; rval = true; } diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh index fdbab4241..3dbd6e83f 100644 --- a/server/modules/routing/avrorouter/avro_converter.hh +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -49,7 +49,7 @@ public: AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec); bool open_table(const STableMapEvent& map, const STableCreateEvent& create); - bool prepare_table(std::string database, std::string table); + bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create); void flush_tables(); void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type); bool commit(const gtid_pos_t& gtid); diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 0b720aea6..ac3629e00 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -243,6 +243,7 @@ uint8_t* process_row_event_data(STableMapEvent map, uint8_t* columns_present, uint8_t* end) { + mxb_assert(create->database == map->database && create->table == map->table); int npresent = 0; long ncolumns = map->columns(); uint8_t* metadata = &map->column_metadata[0]; @@ -624,11 +625,17 @@ bool Rpl::handle_row_event(REP_HEADER* hdr, uint8_t* ptr) return true; } - bool ok = m_handler->prepare_table(map->database, map->table); auto create = m_created_tables.find(table_ident); + bool ok = false; - if (ok && create != m_created_tables.end() - && ncolumns == map->columns() && create->second->columns.size() == map->columns()) + if (create != m_created_tables.end() && ncolumns == map->columns() + && create->second->columns.size() == map->columns() + && m_handler->prepare_table(map, create->second)) + { + ok = true; + } + + if (ok) { /** Each event has one or more rows in it. The number of rows is not known * beforehand so we must continue processing them until we reach the end diff --git a/server/modules/routing/avrorouter/rpl.hh b/server/modules/routing/avrorouter/rpl.hh index 7eaebb5fc..3728af8a6 100644 --- a/server/modules/routing/avrorouter/rpl.hh +++ b/server/modules/routing/avrorouter/rpl.hh @@ -183,8 +183,8 @@ public: return true; } - // Prepare a new row for processing - virtual bool prepare_table(std::string database, std::string table) + // Prepare a table for row processing + virtual bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create) { return true; }