MXS-2188: Update target table in prepare_table
Passing the target table and create to the prepare_table function allows the converter to update the internal variables.
This commit is contained in:
@ -353,15 +353,17 @@ bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEven
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool AvroConverter::prepare_table(std::string database, std::string table)
|
bool AvroConverter::prepare_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
auto it = m_open_tables.find(database + "." + table);
|
auto it = m_open_tables.find(map->database + "." + map->table);
|
||||||
|
|
||||||
if (it != m_open_tables.end())
|
if (it != m_open_tables.end())
|
||||||
{
|
{
|
||||||
m_writer_iface = it->second->avro_writer_iface;
|
m_writer_iface = it->second->avro_writer_iface;
|
||||||
m_avro_file = &it->second->avro_file;
|
m_avro_file = &it->second->avro_file;
|
||||||
|
m_map = map;
|
||||||
|
m_create = create;
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ public:
|
|||||||
|
|
||||||
AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec);
|
AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec);
|
||||||
bool open_table(const STableMapEvent& map, const STableCreateEvent& create);
|
bool open_table(const STableMapEvent& map, const STableCreateEvent& create);
|
||||||
bool prepare_table(std::string database, std::string table);
|
bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create);
|
||||||
void flush_tables();
|
void flush_tables();
|
||||||
void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type);
|
void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type);
|
||||||
bool commit(const gtid_pos_t& gtid);
|
bool commit(const gtid_pos_t& gtid);
|
||||||
|
@ -243,6 +243,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
uint8_t* columns_present,
|
uint8_t* columns_present,
|
||||||
uint8_t* end)
|
uint8_t* end)
|
||||||
{
|
{
|
||||||
|
mxb_assert(create->database == map->database && create->table == map->table);
|
||||||
int npresent = 0;
|
int npresent = 0;
|
||||||
long ncolumns = map->columns();
|
long ncolumns = map->columns();
|
||||||
uint8_t* metadata = &map->column_metadata[0];
|
uint8_t* metadata = &map->column_metadata[0];
|
||||||
@ -624,11 +625,17 @@ bool Rpl::handle_row_event(REP_HEADER* hdr, uint8_t* ptr)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ok = m_handler->prepare_table(map->database, map->table);
|
|
||||||
auto create = m_created_tables.find(table_ident);
|
auto create = m_created_tables.find(table_ident);
|
||||||
|
bool ok = false;
|
||||||
|
|
||||||
if (ok && create != m_created_tables.end()
|
if (create != m_created_tables.end() && ncolumns == map->columns()
|
||||||
&& ncolumns == map->columns() && create->second->columns.size() == map->columns())
|
&& create->second->columns.size() == map->columns()
|
||||||
|
&& m_handler->prepare_table(map, create->second))
|
||||||
|
{
|
||||||
|
ok = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ok)
|
||||||
{
|
{
|
||||||
/** Each event has one or more rows in it. The number of rows is not known
|
/** Each event has one or more rows in it. The number of rows is not known
|
||||||
* beforehand so we must continue processing them until we reach the end
|
* beforehand so we must continue processing them until we reach the end
|
||||||
|
@ -183,8 +183,8 @@ public:
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare a new row for processing
|
// Prepare a table for row processing
|
||||||
virtual bool prepare_table(std::string database, std::string table)
|
virtual bool prepare_table(const STableMapEvent& map, const STableCreateEvent& create)
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user