MXS-1881: Move Rpl related code out of Avro files
Organized the Rpl and Avro code in a way that they aren't mixed.
This commit is contained in:
@ -362,23 +362,6 @@ void do_checkpoint(Avro *router)
|
|||||||
router->row_count = router->trx_count = 0;
|
router->row_count = router->trx_count = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Rpl::flush()
|
|
||||||
{
|
|
||||||
m_handler->flush_tables();
|
|
||||||
}
|
|
||||||
|
|
||||||
void Rpl::add_create(STableCreateEvent create)
|
|
||||||
{
|
|
||||||
auto it = m_created_tables.find(create->id());
|
|
||||||
|
|
||||||
if (it == m_created_tables.end() || create->version > it->second->version)
|
|
||||||
{
|
|
||||||
m_created_tables[create->id()] = create;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc)
|
bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_binlog_end_t* rc)
|
||||||
{
|
{
|
||||||
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
|
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
|
||||||
@ -461,45 +444,6 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
|
|
||||||
{
|
|
||||||
if (m_binlog_checksum)
|
|
||||||
{
|
|
||||||
// We don't care about the checksum at this point so we ignore it
|
|
||||||
hdr.event_size -= 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The following events are related to the actual data
|
|
||||||
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
|
||||||
{
|
|
||||||
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
|
|
||||||
const int FDE_EXTRA_BYTES = 5;
|
|
||||||
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
|
|
||||||
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
|
|
||||||
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
|
|
||||||
m_event_type_hdr_lens.assign(ptr, ptr + n_events);
|
|
||||||
m_event_types = n_events;
|
|
||||||
m_binlog_checksum = checksum[0];
|
|
||||||
}
|
|
||||||
else if (hdr.event_type == TABLE_MAP_EVENT)
|
|
||||||
{
|
|
||||||
handle_table_map_event(&hdr, ptr);
|
|
||||||
}
|
|
||||||
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
|
|
||||||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
|
|
||||||
{
|
|
||||||
handle_row_event(&hdr, ptr);
|
|
||||||
}
|
|
||||||
else if (hdr.event_type == MARIADB10_GTID_EVENT)
|
|
||||||
{
|
|
||||||
m_gtid.extract(hdr, ptr);
|
|
||||||
}
|
|
||||||
else if (hdr.event_type == QUERY_EVENT)
|
|
||||||
{
|
|
||||||
handle_query_event(&hdr, ptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Read all replication events from a binlog file.
|
* @brief Read all replication events from a binlog file.
|
||||||
*
|
*
|
||||||
@ -664,259 +608,3 @@ void avro_load_metadata_from_schemas(Avro *router)
|
|||||||
|
|
||||||
globfree(&files);
|
globfree(&files);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Detection of table creation statements
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param ptr Pointer to statement
|
|
||||||
* @param len Statement length
|
|
||||||
* @return True if the statement creates a new table
|
|
||||||
*/
|
|
||||||
bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
|
|
||||||
{
|
|
||||||
int rc = 0;
|
|
||||||
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
|
|
||||||
|
|
||||||
if (mdata)
|
|
||||||
{
|
|
||||||
rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
|
|
||||||
pcre2_match_data_free(mdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
return rc > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_create_like_statement(const char* ptr, size_t len)
|
|
||||||
{
|
|
||||||
char sql[len + 1];
|
|
||||||
memcpy(sql, ptr, len);
|
|
||||||
sql[len] = '\0';
|
|
||||||
|
|
||||||
// This is not pretty but it should work
|
|
||||||
return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
|
|
||||||
}
|
|
||||||
|
|
||||||
bool is_create_as_statement(const char* ptr, size_t len)
|
|
||||||
{
|
|
||||||
int err = 0;
|
|
||||||
char sql[len + 1];
|
|
||||||
memcpy(sql, ptr, len);
|
|
||||||
sql[len] = '\0';
|
|
||||||
const char* pattern =
|
|
||||||
// Case-insensitive mode
|
|
||||||
"(?i)"
|
|
||||||
// Main CREATE TABLE part (the \s is for any whitespace)
|
|
||||||
"create\\stable\\s"
|
|
||||||
// Optional IF NOT EXISTS
|
|
||||||
"(if\\snot\\sexists\\s)?"
|
|
||||||
// The table name with optional database name, both enclosed in optional backticks
|
|
||||||
"(`?\\S+`?.)`?\\S+`?\\s"
|
|
||||||
// And finally the AS keyword
|
|
||||||
"as";
|
|
||||||
|
|
||||||
return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Detection of table alteration statements
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param ptr Pointer to statement
|
|
||||||
* @param len Statement length
|
|
||||||
* @return True if the statement alters a table
|
|
||||||
*/
|
|
||||||
bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
|
|
||||||
{
|
|
||||||
int rc = 0;
|
|
||||||
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
|
|
||||||
|
|
||||||
if (mdata)
|
|
||||||
{
|
|
||||||
rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
|
|
||||||
pcre2_match_data_free(mdata);
|
|
||||||
}
|
|
||||||
|
|
||||||
return rc > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Database name offset */
|
|
||||||
#define DBNM_OFF 8
|
|
||||||
|
|
||||||
/** Varblock offset */
|
|
||||||
#define VBLK_OFF 4 + 4 + 1 + 2
|
|
||||||
|
|
||||||
/** Post-header offset */
|
|
||||||
#define PHDR_OFF 4 + 4 + 1 + 2 + 2
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Save the CREATE TABLE statement to disk and replace older versions of the table
|
|
||||||
* in the router's hashtable.
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param created Created table
|
|
||||||
* @return False if an error occurred and true if successful
|
|
||||||
*/
|
|
||||||
bool Rpl::save_and_replace_table_create(STableCreateEvent created)
|
|
||||||
{
|
|
||||||
std::string table_ident = created->id();
|
|
||||||
auto it = m_created_tables.find(table_ident);
|
|
||||||
|
|
||||||
if (it != m_created_tables.end())
|
|
||||||
{
|
|
||||||
auto tm_it = m_table_maps.find(table_ident);
|
|
||||||
|
|
||||||
if (tm_it != m_table_maps.end())
|
|
||||||
{
|
|
||||||
m_active_maps.erase(tm_it->second->id);
|
|
||||||
m_table_maps.erase(tm_it);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
m_created_tables[table_ident] = created;
|
|
||||||
ss_dassert(created->columns.size() > 0);
|
|
||||||
return m_handler->create_table(created);
|
|
||||||
}
|
|
||||||
|
|
||||||
void unify_whitespace(char *sql, int len)
|
|
||||||
{
|
|
||||||
for (int i = 0; i < len; i++)
|
|
||||||
{
|
|
||||||
if (isspace(sql[i]) && sql[i] != ' ')
|
|
||||||
{
|
|
||||||
sql[i] = ' ';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A very simple function for stripping auto-generated executable comments
|
|
||||||
*
|
|
||||||
* Note that the string will not strip the trailing part of the comment, making
|
|
||||||
* the SQL invalid.
|
|
||||||
*
|
|
||||||
* @param sql String to modify
|
|
||||||
* @param len Pointer to current length of string, updated to new length if
|
|
||||||
* @c sql is modified
|
|
||||||
*/
|
|
||||||
static void strip_executable_comments(char *sql, int* len)
|
|
||||||
{
|
|
||||||
if (strncmp(sql, "/*!", 3) == 0 || strncmp(sql, "/*M!", 4) == 0)
|
|
||||||
{
|
|
||||||
// Executable comment, remove it
|
|
||||||
char* p = sql + 3;
|
|
||||||
if (*p == '!')
|
|
||||||
{
|
|
||||||
p++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip the versioning part
|
|
||||||
while (*p && isdigit(*p))
|
|
||||||
{
|
|
||||||
p++;
|
|
||||||
}
|
|
||||||
|
|
||||||
int n_extra = p - sql;
|
|
||||||
int new_len = *len - n_extra;
|
|
||||||
memmove(sql, sql + n_extra, new_len);
|
|
||||||
*len = new_len;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Handling of query events
|
|
||||||
*
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param hdr Replication header
|
|
||||||
* @param pending_transaction Pointer where status of pending transaction is stored
|
|
||||||
* @param ptr Pointer to the start of the event payload
|
|
||||||
*/
|
|
||||||
void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr)
|
|
||||||
{
|
|
||||||
int dblen = ptr[DBNM_OFF];
|
|
||||||
int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
|
|
||||||
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
|
|
||||||
char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen;
|
|
||||||
char db[dblen + 1];
|
|
||||||
memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
|
|
||||||
db[dblen] = 0;
|
|
||||||
|
|
||||||
size_t sqlsz = len, tmpsz = len;
|
|
||||||
char *tmp = static_cast<char*>(MXS_MALLOC(len + 1));
|
|
||||||
MXS_ABORT_IF_NULL(tmp);
|
|
||||||
remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
|
|
||||||
sql = tmp;
|
|
||||||
len = tmpsz;
|
|
||||||
unify_whitespace(sql, len);
|
|
||||||
strip_executable_comments(sql, &len);
|
|
||||||
sql[len] = '\0';
|
|
||||||
|
|
||||||
if (*sql == '\0')
|
|
||||||
{
|
|
||||||
MXS_FREE(tmp);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool warn_not_row_format = true;
|
|
||||||
|
|
||||||
if (warn_not_row_format)
|
|
||||||
{
|
|
||||||
GWBUF* buffer = gwbuf_alloc(len + 5);
|
|
||||||
gw_mysql_set_byte3(GWBUF_DATA(buffer), len + 1);
|
|
||||||
GWBUF_DATA(buffer)[4] = 0x03;
|
|
||||||
memcpy(GWBUF_DATA(buffer) + 5, sql, len);
|
|
||||||
qc_query_op_t op = qc_get_operation(buffer);
|
|
||||||
gwbuf_free(buffer);
|
|
||||||
|
|
||||||
if (op == QUERY_OP_UPDATE || op == QUERY_OP_INSERT || op == QUERY_OP_DELETE)
|
|
||||||
{
|
|
||||||
MXS_WARNING("Possible STATEMENT or MIXED format binary log. Check that "
|
|
||||||
"'binlog_format' is set to ROW on the master.");
|
|
||||||
warn_not_row_format = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
|
||||||
read_table_identifier(db, sql, sql + len, ident, sizeof(ident));
|
|
||||||
|
|
||||||
if (is_create_table_statement(m_create_table_re, sql, len))
|
|
||||||
{
|
|
||||||
STableCreateEvent created;
|
|
||||||
|
|
||||||
if (is_create_like_statement(sql, len))
|
|
||||||
{
|
|
||||||
created = table_create_copy(sql, len, db);
|
|
||||||
}
|
|
||||||
else if (is_create_as_statement(sql, len))
|
|
||||||
{
|
|
||||||
static bool warn_create_as = true;
|
|
||||||
if (warn_create_as)
|
|
||||||
{
|
|
||||||
MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s", len, sql);
|
|
||||||
warn_create_as = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
created = table_create_alloc(ident, sql, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (created && !save_and_replace_table_create(created))
|
|
||||||
{
|
|
||||||
MXS_ERROR("Failed to save statement to disk: %.*s", len, sql);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (is_alter_table_statement(m_alter_table_re, sql, len))
|
|
||||||
{
|
|
||||||
auto it = m_created_tables.find(ident);
|
|
||||||
|
|
||||||
if (it != m_created_tables.end())
|
|
||||||
{
|
|
||||||
table_create_alter(it->second, sql, sql + len);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO: Add COMMIT handling for non-transactional tables
|
|
||||||
|
|
||||||
MXS_FREE(tmp);
|
|
||||||
}
|
|
||||||
|
|||||||
@ -30,10 +30,6 @@ static bool warn_bit = false; /**< Remove when support for BIT is added */
|
|||||||
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
|
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
|
||||||
* larger than 255 is added */
|
* larger than 255 is added */
|
||||||
|
|
||||||
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
|
||||||
SRowEventHandler& conv, uint8_t *ptr,
|
|
||||||
uint8_t *columns_present, uint8_t *end);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get row event name
|
* @brief Get row event name
|
||||||
* @param event Event type
|
* @param event Event type
|
||||||
@ -65,222 +61,6 @@ static int get_event_type(uint8_t event)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Handle a table map event
|
|
||||||
*
|
|
||||||
* This converts a table map events into table meta data that will be used when
|
|
||||||
* converting binlogs to Avro format.
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param hdr Replication header
|
|
||||||
* @param ptr Pointer to event payload
|
|
||||||
*/
|
|
||||||
bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
|
|
||||||
{
|
|
||||||
bool rval = false;
|
|
||||||
uint64_t id;
|
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
|
||||||
int ev_len = m_event_type_hdr_lens[hdr->event_type];
|
|
||||||
|
|
||||||
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
|
|
||||||
auto create = m_created_tables.find(table_ident);
|
|
||||||
|
|
||||||
if (create != m_created_tables.end())
|
|
||||||
{
|
|
||||||
ss_dassert(create->second->columns.size() > 0);
|
|
||||||
auto it = m_table_maps.find(table_ident);
|
|
||||||
STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
|
|
||||||
|
|
||||||
if (it != m_table_maps.end())
|
|
||||||
{
|
|
||||||
auto old = it->second;
|
|
||||||
|
|
||||||
if (old->id == map->id && old->version == map->version &&
|
|
||||||
old->table == map->table && old->database == map->database)
|
|
||||||
{
|
|
||||||
// We can reuse the table map object
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (m_handler->open_table(map, create->second))
|
|
||||||
{
|
|
||||||
create->second->was_used = true;
|
|
||||||
|
|
||||||
auto old = m_table_maps.find(table_ident);
|
|
||||||
bool notify = old != m_table_maps.end();
|
|
||||||
|
|
||||||
if (notify)
|
|
||||||
{
|
|
||||||
m_active_maps.erase(old->second->id);
|
|
||||||
}
|
|
||||||
|
|
||||||
m_table_maps[table_ident] = map;
|
|
||||||
m_active_maps[map->id] = map;
|
|
||||||
ss_dassert(m_active_maps[id] == map);
|
|
||||||
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_WARNING("Table map event for table '%s' read before the DDL statement "
|
|
||||||
"for that table was read. Data will not be processed for this "
|
|
||||||
"table until a DDL statement for it is read.", table_ident);
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Handle a single RBR row event
|
|
||||||
*
|
|
||||||
* These events contain the changes in the data. This function assumes that full
|
|
||||||
* row image is sent in every row event.
|
|
||||||
*
|
|
||||||
* @param router Avro router instance
|
|
||||||
* @param hdr Replication header
|
|
||||||
* @param ptr Pointer to the start of the event
|
|
||||||
* @return True on succcess, false on error
|
|
||||||
*/
|
|
||||||
bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
|
|
||||||
{
|
|
||||||
bool rval = false;
|
|
||||||
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
|
||||||
uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
|
|
||||||
uint64_t table_id = 0;
|
|
||||||
|
|
||||||
/** The first value is the ID where the table was mapped. This should be
|
|
||||||
* the same as the ID in the table map even which was processed before this
|
|
||||||
* row event. */
|
|
||||||
memcpy(&table_id, ptr, table_id_size);
|
|
||||||
ptr += table_id_size;
|
|
||||||
|
|
||||||
/** Replication flags, currently ignored for the most part. */
|
|
||||||
uint16_t flags = 0;
|
|
||||||
memcpy(&flags, ptr, 2);
|
|
||||||
ptr += 2;
|
|
||||||
|
|
||||||
if (table_id == TABLE_DUMMY_ID && flags & ROW_EVENT_END_STATEMENT)
|
|
||||||
{
|
|
||||||
/** This is an dummy event which should release all table maps. Right
|
|
||||||
* now we just return without processing the rows. */
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Newer replication events have extra data stored in the header. MariaDB
|
|
||||||
* 10.1 does not use these and instead use the v1 events */
|
|
||||||
if (hdr->event_type > DELETE_ROWS_EVENTv1)
|
|
||||||
{
|
|
||||||
/** Version 2 row event, skip extra data */
|
|
||||||
uint16_t extra_len = 0;
|
|
||||||
memcpy(&extra_len, ptr, 2);
|
|
||||||
ptr += 2 + extra_len;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Number of columns in the table */
|
|
||||||
uint64_t ncolumns = mxs_leint_consume(&ptr);
|
|
||||||
|
|
||||||
/** If full row image is used, all columns are present. Currently only full
|
|
||||||
* row image is supported and thus the bitfield should be all ones. In
|
|
||||||
* the future partial row images could be used if the bitfield containing
|
|
||||||
* the columns that are present in this event is used. */
|
|
||||||
const int coldata_size = (ncolumns + 7) / 8;
|
|
||||||
uint8_t col_present[coldata_size];
|
|
||||||
memcpy(&col_present, ptr, coldata_size);
|
|
||||||
ptr += coldata_size;
|
|
||||||
|
|
||||||
/** Update events have the before and after images of the row. This can be
|
|
||||||
* used to calculate a "delta" of sorts if necessary. Currently we store
|
|
||||||
* both the before and the after images. */
|
|
||||||
uint8_t col_update[coldata_size];
|
|
||||||
if (hdr->event_type == UPDATE_ROWS_EVENTv1 ||
|
|
||||||
hdr->event_type == UPDATE_ROWS_EVENTv2)
|
|
||||||
{
|
|
||||||
memcpy(&col_update, ptr, coldata_size);
|
|
||||||
ptr += coldata_size;
|
|
||||||
}
|
|
||||||
|
|
||||||
// There should always be a table map event prior to a row event.
|
|
||||||
|
|
||||||
auto it = m_active_maps.find(table_id);
|
|
||||||
|
|
||||||
if (it != m_active_maps.end())
|
|
||||||
{
|
|
||||||
STableMapEvent map = it->second;
|
|
||||||
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
|
||||||
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
|
||||||
|
|
||||||
bool ok = m_handler->prepare_table(map->database, map->table);
|
|
||||||
auto create = m_created_tables.find(table_ident);
|
|
||||||
|
|
||||||
if (ok && create != m_created_tables.end() &&
|
|
||||||
ncolumns == map->columns() && create->second->columns.size() == map->columns())
|
|
||||||
{
|
|
||||||
/** Each event has one or more rows in it. The number of rows is not known
|
|
||||||
* beforehand so we must continue processing them until we reach the end
|
|
||||||
* of the event. */
|
|
||||||
int rows = 0;
|
|
||||||
MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size);
|
|
||||||
|
|
||||||
while (ptr < end)
|
|
||||||
{
|
|
||||||
int event_type = get_event_type(hdr->event_type);
|
|
||||||
|
|
||||||
// Increment the event count for this transaction
|
|
||||||
m_gtid.event_num++;
|
|
||||||
|
|
||||||
m_handler->prepare_row(m_gtid, *hdr, event_type);
|
|
||||||
ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
|
|
||||||
m_handler->commit(m_gtid);
|
|
||||||
|
|
||||||
/** Update rows events have the before and after images of the
|
|
||||||
* affected rows so we'll process them as another record with
|
|
||||||
* a different type */
|
|
||||||
if (event_type == UPDATE_EVENT)
|
|
||||||
{
|
|
||||||
m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER);
|
|
||||||
ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
|
|
||||||
m_handler->commit(m_gtid);
|
|
||||||
}
|
|
||||||
|
|
||||||
rows++;
|
|
||||||
}
|
|
||||||
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
else if (!ok)
|
|
||||||
{
|
|
||||||
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
|
||||||
" errors for more details.", map->database.c_str(), map->table.c_str());
|
|
||||||
}
|
|
||||||
else if (create == m_created_tables.end())
|
|
||||||
{
|
|
||||||
MXS_ERROR("Create table statement for %s.%s was not found from the "
|
|
||||||
"binary logs or the stored schema was not correct.",
|
|
||||||
map->database.c_str(), map->table.c_str());
|
|
||||||
}
|
|
||||||
else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
|
|
||||||
{
|
|
||||||
MXS_ERROR("Table map event has a different column count for table "
|
|
||||||
"%s.%s than the CREATE TABLE statement. Possible "
|
|
||||||
"unsupported DDL detected.", map->database.c_str(), map->table.c_str());
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Row event and table map event have different column "
|
|
||||||
"counts for table %s.%s, only full row image is currently "
|
|
||||||
"supported.", map->database.c_str(), map->table.c_str());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Row event for unknown table mapped to ID %lu. Data will not "
|
|
||||||
"be processed.", table_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Unpack numeric types
|
* @brief Unpack numeric types
|
||||||
*
|
*
|
||||||
@ -638,3 +418,550 @@ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
|
|||||||
|
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Read the fully qualified name of the table
|
||||||
|
*
|
||||||
|
* @param ptr Pointer to the start of the event payload
|
||||||
|
* @param post_header_len Length of the event specific header, 8 or 6 bytes
|
||||||
|
* @param dest Destination where the string is stored
|
||||||
|
* @param len Size of destination
|
||||||
|
*/
|
||||||
|
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, char* dest, size_t len)
|
||||||
|
{
|
||||||
|
uint64_t table_id = 0;
|
||||||
|
size_t id_size = post_header_len == 6 ? 4 : 6;
|
||||||
|
memcpy(&table_id, ptr, id_size);
|
||||||
|
ptr += id_size;
|
||||||
|
|
||||||
|
uint16_t flags = 0;
|
||||||
|
memcpy(&flags, ptr, 2);
|
||||||
|
ptr += 2;
|
||||||
|
|
||||||
|
uint8_t schema_name_len = *ptr++;
|
||||||
|
char schema_name[schema_name_len + 2];
|
||||||
|
|
||||||
|
/** Copy the NULL byte after the schema name */
|
||||||
|
memcpy(schema_name, ptr, schema_name_len + 1);
|
||||||
|
ptr += schema_name_len + 1;
|
||||||
|
|
||||||
|
uint8_t table_name_len = *ptr++;
|
||||||
|
char table_name[table_name_len + 2];
|
||||||
|
|
||||||
|
/** Copy the NULL byte after the table name */
|
||||||
|
memcpy(table_name, ptr, table_name_len + 1);
|
||||||
|
|
||||||
|
snprintf(dest, len, "%s.%s", schema_name, table_name);
|
||||||
|
*tbl_id = table_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Handle a table map event
|
||||||
|
*
|
||||||
|
* This converts a table map events into table meta data that will be used when
|
||||||
|
* converting binlogs to Avro format.
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param hdr Replication header
|
||||||
|
* @param ptr Pointer to event payload
|
||||||
|
*/
|
||||||
|
bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
uint64_t id;
|
||||||
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
|
int ev_len = m_event_type_hdr_lens[hdr->event_type];
|
||||||
|
|
||||||
|
read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
|
||||||
|
auto create = m_created_tables.find(table_ident);
|
||||||
|
|
||||||
|
if (create != m_created_tables.end())
|
||||||
|
{
|
||||||
|
ss_dassert(create->second->columns.size() > 0);
|
||||||
|
auto it = m_table_maps.find(table_ident);
|
||||||
|
STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
|
||||||
|
|
||||||
|
if (it != m_table_maps.end())
|
||||||
|
{
|
||||||
|
auto old = it->second;
|
||||||
|
|
||||||
|
if (old->id == map->id && old->version == map->version &&
|
||||||
|
old->table == map->table && old->database == map->database)
|
||||||
|
{
|
||||||
|
// We can reuse the table map object
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_handler->open_table(map, create->second))
|
||||||
|
{
|
||||||
|
create->second->was_used = true;
|
||||||
|
|
||||||
|
auto old = m_table_maps.find(table_ident);
|
||||||
|
bool notify = old != m_table_maps.end();
|
||||||
|
|
||||||
|
if (notify)
|
||||||
|
{
|
||||||
|
m_active_maps.erase(old->second->id);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_table_maps[table_ident] = map;
|
||||||
|
m_active_maps[map->id] = map;
|
||||||
|
ss_dassert(m_active_maps[id] == map);
|
||||||
|
MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_WARNING("Table map event for table '%s' read before the DDL statement "
|
||||||
|
"for that table was read. Data will not be processed for this "
|
||||||
|
"table until a DDL statement for it is read.", table_ident);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Handle a single RBR row event
|
||||||
|
*
|
||||||
|
* These events contain the changes in the data. This function assumes that full
|
||||||
|
* row image is sent in every row event.
|
||||||
|
*
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param hdr Replication header
|
||||||
|
* @param ptr Pointer to the start of the event
|
||||||
|
* @return True on succcess, false on error
|
||||||
|
*/
|
||||||
|
bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
|
||||||
|
{
|
||||||
|
bool rval = false;
|
||||||
|
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||||
|
uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
|
||||||
|
uint64_t table_id = 0;
|
||||||
|
|
||||||
|
/** The first value is the ID where the table was mapped. This should be
|
||||||
|
* the same as the ID in the table map even which was processed before this
|
||||||
|
* row event. */
|
||||||
|
memcpy(&table_id, ptr, table_id_size);
|
||||||
|
ptr += table_id_size;
|
||||||
|
|
||||||
|
/** Replication flags, currently ignored for the most part. */
|
||||||
|
uint16_t flags = 0;
|
||||||
|
memcpy(&flags, ptr, 2);
|
||||||
|
ptr += 2;
|
||||||
|
|
||||||
|
if (table_id == TABLE_DUMMY_ID && flags & ROW_EVENT_END_STATEMENT)
|
||||||
|
{
|
||||||
|
/** This is an dummy event which should release all table maps. Right
|
||||||
|
* now we just return without processing the rows. */
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Newer replication events have extra data stored in the header. MariaDB
|
||||||
|
* 10.1 does not use these and instead use the v1 events */
|
||||||
|
if (hdr->event_type > DELETE_ROWS_EVENTv1)
|
||||||
|
{
|
||||||
|
/** Version 2 row event, skip extra data */
|
||||||
|
uint16_t extra_len = 0;
|
||||||
|
memcpy(&extra_len, ptr, 2);
|
||||||
|
ptr += 2 + extra_len;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Number of columns in the table */
|
||||||
|
uint64_t ncolumns = mxs_leint_consume(&ptr);
|
||||||
|
|
||||||
|
/** If full row image is used, all columns are present. Currently only full
|
||||||
|
* row image is supported and thus the bitfield should be all ones. In
|
||||||
|
* the future partial row images could be used if the bitfield containing
|
||||||
|
* the columns that are present in this event is used. */
|
||||||
|
const int coldata_size = (ncolumns + 7) / 8;
|
||||||
|
uint8_t col_present[coldata_size];
|
||||||
|
memcpy(&col_present, ptr, coldata_size);
|
||||||
|
ptr += coldata_size;
|
||||||
|
|
||||||
|
/** Update events have the before and after images of the row. This can be
|
||||||
|
* used to calculate a "delta" of sorts if necessary. Currently we store
|
||||||
|
* both the before and the after images. */
|
||||||
|
uint8_t col_update[coldata_size];
|
||||||
|
if (hdr->event_type == UPDATE_ROWS_EVENTv1 ||
|
||||||
|
hdr->event_type == UPDATE_ROWS_EVENTv2)
|
||||||
|
{
|
||||||
|
memcpy(&col_update, ptr, coldata_size);
|
||||||
|
ptr += coldata_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// There should always be a table map event prior to a row event.
|
||||||
|
|
||||||
|
auto it = m_active_maps.find(table_id);
|
||||||
|
|
||||||
|
if (it != m_active_maps.end())
|
||||||
|
{
|
||||||
|
STableMapEvent map = it->second;
|
||||||
|
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
|
snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
|
||||||
|
|
||||||
|
bool ok = m_handler->prepare_table(map->database, map->table);
|
||||||
|
auto create = m_created_tables.find(table_ident);
|
||||||
|
|
||||||
|
if (ok && create != m_created_tables.end() &&
|
||||||
|
ncolumns == map->columns() && create->second->columns.size() == map->columns())
|
||||||
|
{
|
||||||
|
/** Each event has one or more rows in it. The number of rows is not known
|
||||||
|
* beforehand so we must continue processing them until we reach the end
|
||||||
|
* of the event. */
|
||||||
|
int rows = 0;
|
||||||
|
MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size);
|
||||||
|
|
||||||
|
while (ptr < end)
|
||||||
|
{
|
||||||
|
int event_type = get_event_type(hdr->event_type);
|
||||||
|
|
||||||
|
// Increment the event count for this transaction
|
||||||
|
m_gtid.event_num++;
|
||||||
|
|
||||||
|
m_handler->prepare_row(m_gtid, *hdr, event_type);
|
||||||
|
ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
|
||||||
|
m_handler->commit(m_gtid);
|
||||||
|
|
||||||
|
/** Update rows events have the before and after images of the
|
||||||
|
* affected rows so we'll process them as another record with
|
||||||
|
* a different type */
|
||||||
|
if (event_type == UPDATE_EVENT)
|
||||||
|
{
|
||||||
|
m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER);
|
||||||
|
ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
|
||||||
|
m_handler->commit(m_gtid);
|
||||||
|
}
|
||||||
|
|
||||||
|
rows++;
|
||||||
|
}
|
||||||
|
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
else if (!ok)
|
||||||
|
{
|
||||||
|
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
|
||||||
|
" errors for more details.", map->database.c_str(), map->table.c_str());
|
||||||
|
}
|
||||||
|
else if (create == m_created_tables.end())
|
||||||
|
{
|
||||||
|
MXS_ERROR("Create table statement for %s.%s was not found from the "
|
||||||
|
"binary logs or the stored schema was not correct.",
|
||||||
|
map->database.c_str(), map->table.c_str());
|
||||||
|
}
|
||||||
|
else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
|
||||||
|
{
|
||||||
|
MXS_ERROR("Table map event has a different column count for table "
|
||||||
|
"%s.%s than the CREATE TABLE statement. Possible "
|
||||||
|
"unsupported DDL detected.", map->database.c_str(), map->table.c_str());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Row event and table map event have different column "
|
||||||
|
"counts for table %s.%s, only full row image is currently "
|
||||||
|
"supported.", map->database.c_str(), map->table.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Row event for unknown table mapped to ID %lu. Data will not "
|
||||||
|
"be processed.", table_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Detection of table creation statements
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param ptr Pointer to statement
|
||||||
|
* @param len Statement length
|
||||||
|
* @return True if the statement creates a new table
|
||||||
|
*/
|
||||||
|
bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
|
||||||
|
|
||||||
|
if (mdata)
|
||||||
|
{
|
||||||
|
rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
|
||||||
|
pcre2_match_data_free(mdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_create_like_statement(const char* ptr, size_t len)
|
||||||
|
{
|
||||||
|
char sql[len + 1];
|
||||||
|
memcpy(sql, ptr, len);
|
||||||
|
sql[len] = '\0';
|
||||||
|
|
||||||
|
// This is not pretty but it should work
|
||||||
|
return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_create_as_statement(const char* ptr, size_t len)
|
||||||
|
{
|
||||||
|
int err = 0;
|
||||||
|
char sql[len + 1];
|
||||||
|
memcpy(sql, ptr, len);
|
||||||
|
sql[len] = '\0';
|
||||||
|
const char* pattern =
|
||||||
|
// Case-insensitive mode
|
||||||
|
"(?i)"
|
||||||
|
// Main CREATE TABLE part (the \s is for any whitespace)
|
||||||
|
"create\\stable\\s"
|
||||||
|
// Optional IF NOT EXISTS
|
||||||
|
"(if\\snot\\sexists\\s)?"
|
||||||
|
// The table name with optional database name, both enclosed in optional backticks
|
||||||
|
"(`?\\S+`?.)`?\\S+`?\\s"
|
||||||
|
// And finally the AS keyword
|
||||||
|
"as";
|
||||||
|
|
||||||
|
return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Detection of table alteration statements
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param ptr Pointer to statement
|
||||||
|
* @param len Statement length
|
||||||
|
* @return True if the statement alters a table
|
||||||
|
*/
|
||||||
|
bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
|
||||||
|
{
|
||||||
|
int rc = 0;
|
||||||
|
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
|
||||||
|
|
||||||
|
if (mdata)
|
||||||
|
{
|
||||||
|
rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
|
||||||
|
pcre2_match_data_free(mdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Database name offset */
|
||||||
|
#define DBNM_OFF 8
|
||||||
|
|
||||||
|
/** Varblock offset */
|
||||||
|
#define VBLK_OFF 4 + 4 + 1 + 2
|
||||||
|
|
||||||
|
/** Post-header offset */
|
||||||
|
#define PHDR_OFF 4 + 4 + 1 + 2 + 2
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save the CREATE TABLE statement to disk and replace older versions of the table
|
||||||
|
* in the router's hashtable.
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param created Created table
|
||||||
|
* @return False if an error occurred and true if successful
|
||||||
|
*/
|
||||||
|
bool Rpl::save_and_replace_table_create(STableCreateEvent created)
|
||||||
|
{
|
||||||
|
std::string table_ident = created->id();
|
||||||
|
auto it = m_created_tables.find(table_ident);
|
||||||
|
|
||||||
|
if (it != m_created_tables.end())
|
||||||
|
{
|
||||||
|
auto tm_it = m_table_maps.find(table_ident);
|
||||||
|
|
||||||
|
if (tm_it != m_table_maps.end())
|
||||||
|
{
|
||||||
|
m_active_maps.erase(tm_it->second->id);
|
||||||
|
m_table_maps.erase(tm_it);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
m_created_tables[table_ident] = created;
|
||||||
|
ss_dassert(created->columns.size() > 0);
|
||||||
|
return m_handler->create_table(created);
|
||||||
|
}
|
||||||
|
|
||||||
|
void unify_whitespace(char *sql, int len)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < len; i++)
|
||||||
|
{
|
||||||
|
if (isspace(sql[i]) && sql[i] != ' ')
|
||||||
|
{
|
||||||
|
sql[i] = ' ';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A very simple function for stripping auto-generated executable comments
|
||||||
|
*
|
||||||
|
* Note that the string will not strip the trailing part of the comment, making
|
||||||
|
* the SQL invalid.
|
||||||
|
*
|
||||||
|
* @param sql String to modify
|
||||||
|
* @param len Pointer to current length of string, updated to new length if
|
||||||
|
* @c sql is modified
|
||||||
|
*/
|
||||||
|
static void strip_executable_comments(char *sql, int* len)
|
||||||
|
{
|
||||||
|
if (strncmp(sql, "/*!", 3) == 0 || strncmp(sql, "/*M!", 4) == 0)
|
||||||
|
{
|
||||||
|
// Executable comment, remove it
|
||||||
|
char* p = sql + 3;
|
||||||
|
if (*p == '!')
|
||||||
|
{
|
||||||
|
p++;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip the versioning part
|
||||||
|
while (*p && isdigit(*p))
|
||||||
|
{
|
||||||
|
p++;
|
||||||
|
}
|
||||||
|
|
||||||
|
int n_extra = p - sql;
|
||||||
|
int new_len = *len - n_extra;
|
||||||
|
memmove(sql, sql + n_extra, new_len);
|
||||||
|
*len = new_len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Handling of query events
|
||||||
|
*
|
||||||
|
* @param router Avro router instance
|
||||||
|
* @param hdr Replication header
|
||||||
|
* @param pending_transaction Pointer where status of pending transaction is stored
|
||||||
|
* @param ptr Pointer to the start of the event payload
|
||||||
|
*/
|
||||||
|
void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr)
|
||||||
|
{
|
||||||
|
int dblen = ptr[DBNM_OFF];
|
||||||
|
int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
|
||||||
|
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
|
||||||
|
char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen;
|
||||||
|
char db[dblen + 1];
|
||||||
|
memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
|
||||||
|
db[dblen] = 0;
|
||||||
|
|
||||||
|
size_t sqlsz = len, tmpsz = len;
|
||||||
|
char *tmp = static_cast<char*>(MXS_MALLOC(len + 1));
|
||||||
|
MXS_ABORT_IF_NULL(tmp);
|
||||||
|
remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
|
||||||
|
sql = tmp;
|
||||||
|
len = tmpsz;
|
||||||
|
unify_whitespace(sql, len);
|
||||||
|
strip_executable_comments(sql, &len);
|
||||||
|
sql[len] = '\0';
|
||||||
|
|
||||||
|
if (*sql == '\0')
|
||||||
|
{
|
||||||
|
MXS_FREE(tmp);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool warn_not_row_format = true;
|
||||||
|
|
||||||
|
if (warn_not_row_format)
|
||||||
|
{
|
||||||
|
GWBUF* buffer = gwbuf_alloc(len + 5);
|
||||||
|
gw_mysql_set_byte3(GWBUF_DATA(buffer), len + 1);
|
||||||
|
GWBUF_DATA(buffer)[4] = 0x03;
|
||||||
|
memcpy(GWBUF_DATA(buffer) + 5, sql, len);
|
||||||
|
qc_query_op_t op = qc_get_operation(buffer);
|
||||||
|
gwbuf_free(buffer);
|
||||||
|
|
||||||
|
if (op == QUERY_OP_UPDATE || op == QUERY_OP_INSERT || op == QUERY_OP_DELETE)
|
||||||
|
{
|
||||||
|
MXS_WARNING("Possible STATEMENT or MIXED format binary log. Check that "
|
||||||
|
"'binlog_format' is set to ROW on the master.");
|
||||||
|
warn_not_row_format = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
|
||||||
|
read_table_identifier(db, sql, sql + len, ident, sizeof(ident));
|
||||||
|
|
||||||
|
if (is_create_table_statement(m_create_table_re, sql, len))
|
||||||
|
{
|
||||||
|
STableCreateEvent created;
|
||||||
|
|
||||||
|
if (is_create_like_statement(sql, len))
|
||||||
|
{
|
||||||
|
created = table_create_copy(sql, len, db);
|
||||||
|
}
|
||||||
|
else if (is_create_as_statement(sql, len))
|
||||||
|
{
|
||||||
|
static bool warn_create_as = true;
|
||||||
|
if (warn_create_as)
|
||||||
|
{
|
||||||
|
MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s", len, sql);
|
||||||
|
warn_create_as = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
created = table_create_alloc(ident, sql, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (created && !save_and_replace_table_create(created))
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to save statement to disk: %.*s", len, sql);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (is_alter_table_statement(m_alter_table_re, sql, len))
|
||||||
|
{
|
||||||
|
auto it = m_created_tables.find(ident);
|
||||||
|
|
||||||
|
if (it != m_created_tables.end())
|
||||||
|
{
|
||||||
|
table_create_alter(it->second, sql, sql + len);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: Add COMMIT handling for non-transactional tables
|
||||||
|
|
||||||
|
MXS_FREE(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
|
||||||
|
{
|
||||||
|
if (m_binlog_checksum)
|
||||||
|
{
|
||||||
|
// We don't care about the checksum at this point so we ignore it
|
||||||
|
hdr.event_size -= 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following events are related to the actual data
|
||||||
|
if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
|
||||||
|
{
|
||||||
|
const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
|
||||||
|
const int FDE_EXTRA_BYTES = 5;
|
||||||
|
int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
|
||||||
|
int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
|
||||||
|
uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
|
||||||
|
m_event_type_hdr_lens.assign(ptr, ptr + n_events);
|
||||||
|
m_event_types = n_events;
|
||||||
|
m_binlog_checksum = checksum[0];
|
||||||
|
}
|
||||||
|
else if (hdr.event_type == TABLE_MAP_EVENT)
|
||||||
|
{
|
||||||
|
handle_table_map_event(&hdr, ptr);
|
||||||
|
}
|
||||||
|
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
|
||||||
|
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
|
||||||
|
{
|
||||||
|
handle_row_event(&hdr, ptr);
|
||||||
|
}
|
||||||
|
else if (hdr.event_type == MARIADB10_GTID_EVENT)
|
||||||
|
{
|
||||||
|
m_gtid.extract(hdr, ptr);
|
||||||
|
}
|
||||||
|
else if (hdr.event_type == QUERY_EVENT)
|
||||||
|
{
|
||||||
|
handle_query_event(&hdr, ptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -1,3 +1,3 @@
|
|||||||
add_executable(test_alter_parsing test_alter_parsing.cc)
|
add_executable(test_alter_parsing test_alter_parsing.cc)
|
||||||
target_link_libraries(test_alter_parsing maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro sqlite3 lzma)
|
target_link_libraries(test_alter_parsing avro-common maxscale-common ${JANSSON_LIBRARIES} ${AVRO_LIBRARIES} maxavro sqlite3 lzma)
|
||||||
add_test(test_alter_parsing test_alter_parsing)
|
add_test(test_alter_parsing test_alter_parsing)
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
#include "../avro_schema.cc"
|
#include "../avro_schema.cc"
|
||||||
|
#include "../rpl.cc"
|
||||||
|
|
||||||
static struct
|
static struct
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user