 733dc491d9
			
		
	
	733dc491d9
	
	
	
		
			
			Took the Replicator into use in avrorouter as an alternative to the binlogrouter based setup. This also allows the avrorouter to automatically handle master failovers and to start replication from GTID coordinates.
		
			
				
	
	
		
			1010 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1010 lines
		
	
	
		
			31 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2022-01-01
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #include "avrorouter.hh"
 | |
| 
 | |
| #include <maxscale/mysql_utils.hh>
 | |
| #include <jansson.h>
 | |
| #include <maxscale/alloc.h>
 | |
| #include <strings.h>
 | |
| #include <signal.h>
 | |
| #include <maxscale/utils.h>
 | |
| 
 | |
| #define WRITE_EVENT        0
 | |
| #define UPDATE_EVENT       1
 | |
| #define UPDATE_EVENT_AFTER 2
 | |
| #define DELETE_EVENT       3
 | |
| 
 | |
| static bool warn_decimal = false;       /**< Remove when support for DECIMAL is added */
 | |
| static bool warn_bit = false;           /**< Remove when support for BIT is added */
 | |
| static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
 | |
|                                          * larger than 255 is added */
 | |
| 
 | |
| /**
 | |
|  * @brief Get row event name
 | |
|  * @param event Event type
 | |
|  * @return String representation of the event
 | |
|  */
 | |
| static int get_event_type(uint8_t event)
 | |
| {
 | |
|     switch (event)
 | |
|     {
 | |
| 
 | |
|     case WRITE_ROWS_EVENTv0:
 | |
|     case WRITE_ROWS_EVENTv1:
 | |
|     case WRITE_ROWS_EVENTv2:
 | |
|         return WRITE_EVENT;
 | |
| 
 | |
|     case UPDATE_ROWS_EVENTv0:
 | |
|     case UPDATE_ROWS_EVENTv1:
 | |
|     case UPDATE_ROWS_EVENTv2:
 | |
|         return UPDATE_EVENT;
 | |
| 
 | |
|     case DELETE_ROWS_EVENTv0:
 | |
|     case DELETE_ROWS_EVENTv1:
 | |
|     case DELETE_ROWS_EVENTv2:
 | |
|         return DELETE_EVENT;
 | |
| 
 | |
|     default:
 | |
|         MXS_ERROR("Unexpected event type: %d (%0x)", event, event);
 | |
|         return -1;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Unpack numeric types
 | |
|  *
 | |
|  * Convert the raw binary data into actual numeric types.
 | |
|  *
 | |
|  * @param conv     Event converter to use
 | |
|  * @param idx      Position of this column in the row
 | |
|  * @param type     Event type
 | |
|  * @param metadata Field metadata
 | |
|  * @param value    Pointer to the start of the in-memory representation of the data
 | |
|  */
 | |
| void set_numeric_field_value(SRowEventHandler& conv,
 | |
|                              int idx,
 | |
|                              uint8_t type,
 | |
|                              uint8_t* metadata,
 | |
|                              uint8_t* value)
 | |
| {
 | |
|     switch (type)
 | |
|     {
 | |
|     case TABLE_COL_TYPE_TINY:
 | |
|         {
 | |
|             char c = *value;
 | |
|             conv->column(idx, c);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_SHORT:
 | |
|         {
 | |
|             short s = gw_mysql_get_byte2(value);
 | |
|             conv->column(idx, s);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_INT24:
 | |
|         {
 | |
|             int x = gw_mysql_get_byte3(value);
 | |
| 
 | |
|             if (x & 0x800000)
 | |
|             {
 | |
|                 x = -((0xffffff & (~x)) + 1);
 | |
|             }
 | |
| 
 | |
|             conv->column(idx, x);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_LONG:
 | |
|         {
 | |
|             int x = gw_mysql_get_byte4(value);
 | |
|             conv->column(idx, x);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_LONGLONG:
 | |
|         {
 | |
|             long l = gw_mysql_get_byte8(value);
 | |
|             conv->column(idx, l);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_FLOAT:
 | |
|         {
 | |
|             float f = 0;
 | |
|             memcpy(&f, value, 4);
 | |
|             conv->column(idx, f);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_DOUBLE:
 | |
|         {
 | |
|             double d = 0;
 | |
|             memcpy(&d, value, 8);
 | |
|             conv->column(idx, d);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     default:
 | |
|         break;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Check if a bit is set
 | |
|  *
 | |
|  * @param ptr Pointer to start of bitfield
 | |
|  * @param columns Number of columns (bits)
 | |
|  * @param current_column Zero indexed column number
 | |
|  * @return True if the bit is set
 | |
|  */
 | |
| static bool bit_is_set(uint8_t* ptr, int columns, int current_column)
 | |
| {
 | |
|     if (current_column >= 8)
 | |
|     {
 | |
|         ptr += current_column / 8;
 | |
|         current_column = current_column % 8;
 | |
|     }
 | |
| 
 | |
|     return (*ptr) & (1 << current_column);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Get the length of the metadata for a particular field
 | |
|  *
 | |
|  * @param type Type of the field
 | |
|  * @return Length of the metadata for this field
 | |
|  */
 | |
| int get_metadata_len(uint8_t type)
 | |
| {
 | |
|     switch (type)
 | |
|     {
 | |
|     case TABLE_COL_TYPE_STRING:
 | |
|     case TABLE_COL_TYPE_VAR_STRING:
 | |
|     case TABLE_COL_TYPE_VARCHAR:
 | |
|     case TABLE_COL_TYPE_DECIMAL:
 | |
|     case TABLE_COL_TYPE_NEWDECIMAL:
 | |
|     case TABLE_COL_TYPE_ENUM:
 | |
|     case TABLE_COL_TYPE_SET:
 | |
|     case TABLE_COL_TYPE_BIT:
 | |
|         return 2;
 | |
| 
 | |
|     case TABLE_COL_TYPE_BLOB:
 | |
|     case TABLE_COL_TYPE_FLOAT:
 | |
|     case TABLE_COL_TYPE_DOUBLE:
 | |
|     case TABLE_COL_TYPE_DATETIME2:
 | |
|     case TABLE_COL_TYPE_TIMESTAMP2:
 | |
|     case TABLE_COL_TYPE_TIME2:
 | |
|         return 1;
 | |
| 
 | |
|     default:
 | |
|         return 0;
 | |
|     }
 | |
| }
 | |
| 
 | |
| // Make sure that both `i` and `trace` are defined before using this macro
 | |
| #define check_overflow(t) \
 | |
|     do \
 | |
|     { \
 | |
|         if (!(t)) \
 | |
|         { \
 | |
|             for (long x = 0; x < i; x++) \
 | |
|             { \
 | |
|                 MXS_ALERT("%s", trace[x]); \
 | |
|             } \
 | |
|             raise(SIGABRT); \
 | |
|         } \
 | |
|     } while (false)
 | |
| 
 | |
| // Debug function for checking whether a row event consists of only NULL values
 | |
| static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
 | |
| {
 | |
|     bool rval = true;
 | |
| 
 | |
|     for (long i = 0; i < ncolumns; i++)
 | |
|     {
 | |
|         if (!bit_is_set(null_bitmap, ncolumns, i))
 | |
|         {
 | |
|             rval = false;
 | |
|             break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Extract the values from a single row  in a row event
 | |
|  *
 | |
|  * @param map Table map event associated with this row
 | |
|  * @param create Table creation associated with this row
 | |
|  * @param record Avro record used for storing this row
 | |
|  * @param ptr Pointer to the start of the row data, should be after the row event header
 | |
|  * @param columns_present The bitfield holding the columns that are present for
 | |
|  * this row event. Currently this should be a bitfield which has all bits set.
 | |
|  * @return Pointer to the first byte after the current row event
 | |
|  */
 | |
| uint8_t* process_row_event_data(STableMapEvent map,
 | |
|                                 STableCreateEvent create,
 | |
|                                 SRowEventHandler& conv,
 | |
|                                 uint8_t* ptr,
 | |
|                                 uint8_t* columns_present,
 | |
|                                 uint8_t* end)
 | |
| {
 | |
|     mxb_assert(create->database == map->database && create->table == map->table);
 | |
|     int npresent = 0;
 | |
|     long ncolumns = map->columns();
 | |
|     uint8_t* metadata = &map->column_metadata[0];
 | |
|     size_t metadata_offset = 0;
 | |
| 
 | |
|     /** BIT type values use the extra bits in the row event header */
 | |
|     int extra_bits = (((ncolumns + 7) / 8) * 8) - ncolumns;
 | |
|     mxb_assert(ptr < end);
 | |
| 
 | |
|     /** Store the null value bitmap */
 | |
|     uint8_t* null_bitmap = ptr;
 | |
|     ptr += (ncolumns + 7) / 8;
 | |
|     mxb_assert(ptr < end || (bit_is_set(null_bitmap, ncolumns, 0)));
 | |
| 
 | |
|     char trace[ncolumns][768];
 | |
|     memset(trace, 0, sizeof(trace));
 | |
| 
 | |
|     for (long i = 0; i < ncolumns && npresent < ncolumns; i++)
 | |
|     {
 | |
|         if (bit_is_set(columns_present, ncolumns, i))
 | |
|         {
 | |
|             npresent++;
 | |
| 
 | |
|             if (bit_is_set(null_bitmap, ncolumns, i))
 | |
|             {
 | |
|                 sprintf(trace[i], "[%ld] NULL", i);
 | |
|                 conv->column(i);
 | |
|             }
 | |
|             else if (column_is_fixed_string(map->column_types[i]))
 | |
|             {
 | |
|                 /** ENUM and SET are stored as STRING types with the type stored
 | |
|                  * in the metadata. */
 | |
|                 if (fixed_string_is_enum(metadata[metadata_offset]))
 | |
|                 {
 | |
|                     uint8_t val[metadata[metadata_offset + 1]];
 | |
|                     uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
 | |
|                     char strval[bytes * 2 + 1];
 | |
|                     gw_bin2hex(strval, val, bytes);
 | |
|                     conv->column(i, strval);
 | |
|                     sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
 | |
|                     ptr += bytes;
 | |
|                     check_overflow(ptr <= end);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     /**
 | |
|                      * The first byte in the metadata stores the real type of
 | |
|                      * the string (ENUM and SET types are also stored as fixed
 | |
|                      * length strings).
 | |
|                      *
 | |
|                      * The first two bits of the second byte contain the XOR'ed
 | |
|                      * field length but as that information is not relevant for
 | |
|                      * us, we just use this information to know whether to read
 | |
|                      * one or two bytes for string length.
 | |
|                      */
 | |
| 
 | |
|                     uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
 | |
|                     int bytes = 0;
 | |
|                     uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
 | |
|                     uint16_t field_length = (meta & 0xff) + extra_length;
 | |
| 
 | |
|                     if (field_length > 255)
 | |
|                     {
 | |
|                         bytes = ptr[0] + (ptr[1] << 8);
 | |
|                         ptr += 2;
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         bytes = *ptr++;
 | |
|                     }
 | |
| 
 | |
|                     sprintf(trace[i], "[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
 | |
|                     char str[bytes + 1];
 | |
|                     memcpy(str, ptr, bytes);
 | |
|                     str[bytes] = '\0';
 | |
|                     conv->column(i, str);
 | |
|                     ptr += bytes;
 | |
|                     check_overflow(ptr <= end);
 | |
|                 }
 | |
|             }
 | |
|             else if (column_is_bit(map->column_types[i]))
 | |
|             {
 | |
|                 uint8_t len = metadata[metadata_offset + 1];
 | |
|                 uint8_t bit_len = metadata[metadata_offset] > 0 ? 1 : 0;
 | |
|                 size_t bytes = len + bit_len;
 | |
| 
 | |
|                 // TODO: extract the bytes
 | |
|                 if (!warn_bit)
 | |
|                 {
 | |
|                     warn_bit = true;
 | |
|                     MXS_WARNING("BIT is not currently supported, values are stored as 0.");
 | |
|                 }
 | |
|                 conv->column(i, 0);
 | |
|                 sprintf(trace[i], "[%ld] BIT", i);
 | |
|                 ptr += bytes;
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_decimal(map->column_types[i]))
 | |
|             {
 | |
|                 double f_value = 0.0;
 | |
|                 ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
 | |
|                 conv->column(i, f_value);
 | |
|                 sprintf(trace[i], "[%ld] DECIMAL", i);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_variable_string(map->column_types[i]))
 | |
|             {
 | |
|                 size_t sz;
 | |
|                 int bytes = metadata[metadata_offset] | metadata[metadata_offset + 1] << 8;
 | |
|                 if (bytes > 255)
 | |
|                 {
 | |
|                     sz = gw_mysql_get_byte2(ptr);
 | |
|                     ptr += 2;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     sz = *ptr;
 | |
|                     ptr++;
 | |
|                 }
 | |
| 
 | |
|                 sprintf(trace[i], "[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
 | |
|                 char buf[sz + 1];
 | |
|                 memcpy(buf, ptr, sz);
 | |
|                 buf[sz] = '\0';
 | |
|                 ptr += sz;
 | |
|                 conv->column(i, buf);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_blob(map->column_types[i]))
 | |
|             {
 | |
|                 uint8_t bytes = metadata[metadata_offset];
 | |
|                 uint64_t len = 0;
 | |
|                 memcpy(&len, ptr, bytes);
 | |
|                 ptr += bytes;
 | |
|                 sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
 | |
|                 if (len)
 | |
|                 {
 | |
|                     conv->column(i, ptr, len);
 | |
|                     ptr += len;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     uint8_t nullvalue = 0;
 | |
|                     conv->column(i, &nullvalue, 1);
 | |
|                 }
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_temporal(map->column_types[i]))
 | |
|             {
 | |
|                 char buf[80];
 | |
|                 struct tm tm;
 | |
|                 ptr += unpack_temporal_value(map->column_types[i],
 | |
|                                              ptr,
 | |
|                                              &metadata[metadata_offset],
 | |
|                                              create->columns[i].length,
 | |
|                                              &tm);
 | |
|                 format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
 | |
|                 conv->column(i, buf);
 | |
|                 sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             /** All numeric types (INT, LONG, FLOAT etc.) */
 | |
|             else
 | |
|             {
 | |
|                 uint8_t lval[16];
 | |
|                 memset(lval, 0, sizeof(lval));
 | |
|                 ptr += unpack_numeric_field(ptr,
 | |
|                                             map->column_types[i],
 | |
|                                             &metadata[metadata_offset],
 | |
|                                             lval);
 | |
|                 set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval);
 | |
|                 sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             mxb_assert(metadata_offset <= map->column_metadata.size());
 | |
|             metadata_offset += get_metadata_len(map->column_types[i]);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             sprintf(trace[i], "[%ld] %s: Not present", i, column_type_to_string(map->column_types[i]));
 | |
|         }
 | |
| 
 | |
|         MXS_INFO("%s", trace[i]);
 | |
|     }
 | |
| 
 | |
|     return ptr;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Read the fully qualified name of the table
 | |
|  *
 | |
|  * @param ptr Pointer to the start of the event payload
 | |
|  * @param post_header_len Length of the event specific header, 8 or 6 bytes
 | |
|  * @param dest Destination where the string is stored
 | |
|  * @param len Size of destination
 | |
|  */
 | |
| void read_table_info(uint8_t* ptr, uint8_t post_header_len, uint64_t* tbl_id, char* dest, size_t len)
 | |
| {
 | |
|     uint64_t table_id = 0;
 | |
|     size_t id_size = post_header_len == 6 ? 4 : 6;
 | |
|     memcpy(&table_id, ptr, id_size);
 | |
|     ptr += id_size;
 | |
| 
 | |
|     uint16_t flags = 0;
 | |
|     memcpy(&flags, ptr, 2);
 | |
|     ptr += 2;
 | |
| 
 | |
|     uint8_t schema_name_len = *ptr++;
 | |
|     char schema_name[schema_name_len + 2];
 | |
| 
 | |
|     /** Copy the NULL byte after the schema name */
 | |
|     memcpy(schema_name, ptr, schema_name_len + 1);
 | |
|     ptr += schema_name_len + 1;
 | |
| 
 | |
|     uint8_t table_name_len = *ptr++;
 | |
|     char table_name[table_name_len + 2];
 | |
| 
 | |
|     /** Copy the NULL byte after the table name */
 | |
|     memcpy(table_name, ptr, table_name_len + 1);
 | |
| 
 | |
|     snprintf(dest, len, "%s.%s", schema_name, table_name);
 | |
|     *tbl_id = table_id;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Handle a table map event
 | |
|  *
 | |
|  * This converts a table map events into table meta data that will be used when
 | |
|  * converting binlogs to Avro format.
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param ptr Pointer to event payload
 | |
|  */
 | |
| bool Rpl::handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     bool rval = false;
 | |
|     uint64_t id;
 | |
|     char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|     int ev_len = m_event_type_hdr_lens[hdr->event_type];
 | |
| 
 | |
|     read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
 | |
| 
 | |
|     if (!table_matches(table_ident))
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     auto create = m_created_tables.find(table_ident);
 | |
| 
 | |
|     if (create != m_created_tables.end())
 | |
|     {
 | |
|         mxb_assert(create->second->columns.size() > 0);
 | |
|         auto it = m_table_maps.find(table_ident);
 | |
|         STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
 | |
| 
 | |
|         if (it != m_table_maps.end())
 | |
|         {
 | |
|             auto old = it->second;
 | |
| 
 | |
|             if (old->id == map->id && old->version == map->version
 | |
|                 && old->table == map->table && old->database == map->database)
 | |
|             {
 | |
|                 // We can reuse the table map object
 | |
|                 return true;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         if (m_handler->open_table(map, create->second))
 | |
|         {
 | |
|             create->second->was_used = true;
 | |
| 
 | |
|             auto old = m_table_maps.find(table_ident);
 | |
|             bool notify = old != m_table_maps.end();
 | |
| 
 | |
|             if (notify)
 | |
|             {
 | |
|                 m_active_maps.erase(old->second->id);
 | |
|             }
 | |
| 
 | |
|             m_table_maps[table_ident] = map;
 | |
|             m_active_maps[map->id] = map;
 | |
|             mxb_assert(m_active_maps[id] == map);
 | |
|             MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
 | |
|             rval = true;
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_WARNING("Table map event for table '%s' read before the DDL statement "
 | |
|                     "for that table  was read. Data will not be processed for this "
 | |
|                     "table until a DDL statement for it is read.",
 | |
|                     table_ident);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Handle a single RBR row event
 | |
|  *
 | |
|  * These events contain the changes in the data. This function assumes that full
 | |
|  * row image is sent in every row event.
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param ptr Pointer to the start of the event
 | |
|  * @return True on succcess, false on error
 | |
|  */
 | |
| bool Rpl::handle_row_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     bool rval = false;
 | |
|     uint8_t* end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
 | |
|     uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
 | |
|     uint64_t table_id = 0;
 | |
| 
 | |
|     /** The first value is the ID where the table was mapped. This should be
 | |
|      * the same as the ID in the table map even which was processed before this
 | |
|      * row event. */
 | |
|     memcpy(&table_id, ptr, table_id_size);
 | |
|     ptr += table_id_size;
 | |
| 
 | |
|     /** Replication flags, currently ignored for the most part. */
 | |
|     uint16_t flags = 0;
 | |
|     memcpy(&flags, ptr, 2);
 | |
|     ptr += 2;
 | |
| 
 | |
|     if (table_id == TABLE_DUMMY_ID && flags & ROW_EVENT_END_STATEMENT)
 | |
|     {
 | |
|         /** This is an dummy event which should release all table maps. Right
 | |
|          * now we just return without processing the rows. */
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     /** Newer replication events have extra data stored in the header. MariaDB
 | |
|      * 10.1 does not use these and instead use the v1 events */
 | |
|     if (hdr->event_type > DELETE_ROWS_EVENTv1)
 | |
|     {
 | |
|         /** Version 2 row event, skip extra data */
 | |
|         uint16_t extra_len = 0;
 | |
|         memcpy(&extra_len, ptr, 2);
 | |
|         ptr += 2 + extra_len;
 | |
|     }
 | |
| 
 | |
|     /** Number of columns in the table */
 | |
|     uint64_t ncolumns = mxs_leint_consume(&ptr);
 | |
| 
 | |
|     /** If full row image is used, all columns are present. Currently only full
 | |
|      * row image is supported and thus the bitfield should be all ones. In
 | |
|      * the future partial row images could be used if the bitfield containing
 | |
|      * the columns that are present in this event is used. */
 | |
|     const int coldata_size = (ncolumns + 7) / 8;
 | |
|     uint8_t col_present[coldata_size];
 | |
|     memcpy(&col_present, ptr, coldata_size);
 | |
|     ptr += coldata_size;
 | |
| 
 | |
|     /** Update events have the before and after images of the row. This can be
 | |
|      * used to calculate a "delta" of sorts if necessary. Currently we store
 | |
|      * both the before and the after images. */
 | |
|     uint8_t col_update[coldata_size];
 | |
|     if (hdr->event_type == UPDATE_ROWS_EVENTv1
 | |
|         || hdr->event_type == UPDATE_ROWS_EVENTv2)
 | |
|     {
 | |
|         memcpy(&col_update, ptr, coldata_size);
 | |
|         ptr += coldata_size;
 | |
|     }
 | |
| 
 | |
|     // There should always be a table map event prior to a row event.
 | |
| 
 | |
|     auto it = m_active_maps.find(table_id);
 | |
| 
 | |
|     if (it != m_active_maps.end())
 | |
|     {
 | |
|         STableMapEvent map = it->second;
 | |
|         char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|         snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
 | |
| 
 | |
|         if (!table_matches(table_ident))
 | |
|         {
 | |
|             return true;
 | |
|         }
 | |
| 
 | |
|         auto create = m_created_tables.find(table_ident);
 | |
|         bool ok = false;
 | |
| 
 | |
|         if (create != m_created_tables.end() && ncolumns == map->columns()
 | |
|             && create->second->columns.size() == map->columns()
 | |
|             && m_handler->prepare_table(map, create->second))
 | |
|         {
 | |
|             ok = true;
 | |
|         }
 | |
| 
 | |
|         if (ok)
 | |
|         {
 | |
|             /** Each event has one or more rows in it. The number of rows is not known
 | |
|              * beforehand so we must continue processing them until we reach the end
 | |
|              * of the event. */
 | |
|             int rows = 0;
 | |
|             MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size);
 | |
| 
 | |
|             while (ptr < end)
 | |
|             {
 | |
|                 int event_type = get_event_type(hdr->event_type);
 | |
| 
 | |
|                 // Increment the event count for this transaction
 | |
|                 m_gtid.event_num++;
 | |
| 
 | |
|                 m_handler->prepare_row(m_gtid, *hdr, event_type);
 | |
|                 ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
 | |
|                 m_handler->commit(m_gtid);
 | |
| 
 | |
|                 /** Update rows events have the before and after images of the
 | |
|                  * affected rows so we'll process them as another record with
 | |
|                  * a different type */
 | |
|                 if (event_type == UPDATE_EVENT)
 | |
|                 {
 | |
|                     m_gtid.event_num++;
 | |
|                     m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER);
 | |
|                     ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
 | |
|                     m_handler->commit(m_gtid);
 | |
|                 }
 | |
| 
 | |
|                 rows++;
 | |
|             }
 | |
| 
 | |
|             rval = true;
 | |
|         }
 | |
|         else if (!ok)
 | |
|         {
 | |
|             MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
 | |
|                       " errors for more details.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else if (create == m_created_tables.end())
 | |
|         {
 | |
|             MXS_ERROR("Create table statement for %s.%s was not found from the "
 | |
|                       "binary logs or the stored schema was not correct.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
 | |
|         {
 | |
|             MXS_ERROR("Table map event has a different column count for table "
 | |
|                       "%s.%s than the CREATE TABLE statement. Possible "
 | |
|                       "unsupported DDL detected.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Row event and table map event have different column "
 | |
|                       "counts for table %s.%s, only full row image is currently "
 | |
|                       "supported.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_INFO("Row event for unknown table mapped to ID %lu. Data will not "
 | |
|                  "be processed.",
 | |
|                  table_id);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Detection of table creation statements
 | |
|  * @param router Avro router instance
 | |
|  * @param ptr Pointer to statement
 | |
|  * @param len Statement length
 | |
|  * @return True if the statement creates a new table
 | |
|  */
 | |
| bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
 | |
| {
 | |
|     int rc = 0;
 | |
|     pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
 | |
| 
 | |
|     if (mdata)
 | |
|     {
 | |
|         rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
 | |
|         pcre2_match_data_free(mdata);
 | |
|     }
 | |
| 
 | |
|     return rc > 0;
 | |
| }
 | |
| 
 | |
| bool is_create_like_statement(const char* ptr, size_t len)
 | |
| {
 | |
|     char sql[len + 1];
 | |
|     memcpy(sql, ptr, len);
 | |
|     sql[len] = '\0';
 | |
| 
 | |
|     // This is not pretty but it should work
 | |
|     return strcasestr(sql, " like ") || strcasestr(sql, "(like ");
 | |
| }
 | |
| 
 | |
| bool is_create_as_statement(const char* ptr, size_t len)
 | |
| {
 | |
|     int err = 0;
 | |
|     char sql[len + 1];
 | |
|     memcpy(sql, ptr, len);
 | |
|     sql[len] = '\0';
 | |
|     const char* pattern
 | |
|         =   // Case-insensitive mode
 | |
|             "(?i)"
 | |
|             // Main CREATE TABLE part (the \s is for any whitespace)
 | |
|             "create\\stable\\s"
 | |
|             // Optional IF NOT EXISTS
 | |
|             "(if\\snot\\sexists\\s)?"
 | |
|             // The table name with optional database name, both enclosed in optional backticks
 | |
|             "(`?\\S+`?.)`?\\S+`?\\s"
 | |
|             // And finally the AS keyword
 | |
|             "as";
 | |
| 
 | |
|     return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Detection of table alteration statements
 | |
|  * @param router Avro router instance
 | |
|  * @param ptr Pointer to statement
 | |
|  * @param len Statement length
 | |
|  * @return True if the statement alters a table
 | |
|  */
 | |
| bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
 | |
| {
 | |
|     int rc = 0;
 | |
|     pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
 | |
| 
 | |
|     if (mdata)
 | |
|     {
 | |
|         rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
 | |
|         pcre2_match_data_free(mdata);
 | |
|     }
 | |
| 
 | |
|     return rc > 0;
 | |
| }
 | |
| 
 | |
| /** Database name offset */
 | |
| #define DBNM_OFF 8
 | |
| 
 | |
| /** Varblock offset */
 | |
| #define VBLK_OFF 4 + 4 + 1 + 2
 | |
| 
 | |
| /** Post-header offset */
 | |
| #define PHDR_OFF 4 + 4 + 1 + 2 + 2
 | |
| 
 | |
| /**
 | |
|  * Save the CREATE TABLE statement to disk and replace older versions of the table
 | |
|  * in the router's hashtable.
 | |
|  * @param router Avro router instance
 | |
|  * @param created Created table
 | |
|  * @return False if an error occurred and true if successful
 | |
|  */
 | |
| bool Rpl::save_and_replace_table_create(STableCreateEvent created)
 | |
| {
 | |
|     std::string table_ident = created->id();
 | |
|     auto it = m_created_tables.find(table_ident);
 | |
| 
 | |
|     if (it != m_created_tables.end())
 | |
|     {
 | |
|         auto tm_it = m_table_maps.find(table_ident);
 | |
| 
 | |
|         if (tm_it != m_table_maps.end())
 | |
|         {
 | |
|             m_active_maps.erase(tm_it->second->id);
 | |
|             m_table_maps.erase(tm_it);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     m_created_tables[table_ident] = created;
 | |
|     mxb_assert(created->columns.size() > 0);
 | |
|     return m_handler->create_table(created);
 | |
| }
 | |
| 
 | |
| void unify_whitespace(char* sql, int len)
 | |
| {
 | |
|     for (int i = 0; i < len; i++)
 | |
|     {
 | |
|         if (isspace(sql[i]) && sql[i] != ' ')
 | |
|         {
 | |
|             sql[i] = ' ';
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * A very simple function for stripping auto-generated executable comments
 | |
|  *
 | |
|  * Note that the string will not strip the trailing part of the comment, making
 | |
|  * the SQL invalid.
 | |
|  *
 | |
|  * @param sql String to modify
 | |
|  * @param len Pointer to current length of string, updated to new length if
 | |
|  *            @c sql is modified
 | |
|  */
 | |
| static void strip_executable_comments(char* sql, int* len)
 | |
| {
 | |
|     if (strncmp(sql, "/*!", 3) == 0 || strncmp(sql, "/*M!", 4) == 0)
 | |
|     {
 | |
|         // Executable comment, remove it
 | |
|         char* p = sql + 3;
 | |
|         if (*p == '!')
 | |
|         {
 | |
|             p++;
 | |
|         }
 | |
| 
 | |
|         // Skip the versioning part
 | |
|         while (*p && isdigit(*p))
 | |
|         {
 | |
|             p++;
 | |
|         }
 | |
| 
 | |
|         int n_extra = p - sql;
 | |
|         int new_len = *len - n_extra;
 | |
|         memmove(sql, sql + n_extra, new_len);
 | |
|         *len = new_len;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Handling of query events
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param pending_transaction Pointer where status of pending transaction is stored
 | |
|  * @param ptr Pointer to the start of the event payload
 | |
|  */
 | |
| void Rpl::handle_query_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     int dblen = ptr[DBNM_OFF];
 | |
|     int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
 | |
|     int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
 | |
|     char* sql = (char*) ptr + PHDR_OFF + vblklen + 1 + dblen;
 | |
|     char db[dblen + 1];
 | |
|     memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
 | |
|     db[dblen] = 0;
 | |
| 
 | |
|     size_t sqlsz = len, tmpsz = len;
 | |
|     char* tmp = static_cast<char*>(MXS_MALLOC(len + 1));
 | |
|     MXS_ABORT_IF_NULL(tmp);
 | |
|     remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
 | |
|     sql = tmp;
 | |
|     len = tmpsz;
 | |
|     unify_whitespace(sql, len);
 | |
|     strip_executable_comments(sql, &len);
 | |
|     sql[len] = '\0';
 | |
| 
 | |
|     if (*sql == '\0')
 | |
|     {
 | |
|         MXS_FREE(tmp);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     static bool warn_not_row_format = true;
 | |
| 
 | |
|     if (warn_not_row_format)
 | |
|     {
 | |
|         GWBUF* buffer = gwbuf_alloc(len + 5);
 | |
|         gw_mysql_set_byte3(GWBUF_DATA(buffer), len + 1);
 | |
|         GWBUF_DATA(buffer)[4] = 0x03;
 | |
|         memcpy(GWBUF_DATA(buffer) + 5, sql, len);
 | |
|         qc_query_op_t op = qc_get_operation(buffer);
 | |
|         gwbuf_free(buffer);
 | |
| 
 | |
|         if (op == QUERY_OP_UPDATE || op == QUERY_OP_INSERT || op == QUERY_OP_DELETE)
 | |
|         {
 | |
|             MXS_WARNING("Possible STATEMENT or MIXED format binary log. Check that "
 | |
|                         "'binlog_format' is set to ROW on the master.");
 | |
|             warn_not_row_format = false;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|     read_table_identifier(db, sql, sql + len, ident, sizeof(ident));
 | |
| 
 | |
|     if (is_create_table_statement(m_create_table_re, sql, len))
 | |
|     {
 | |
|         STableCreateEvent created;
 | |
| 
 | |
|         if (is_create_like_statement(sql, len))
 | |
|         {
 | |
|             created = table_create_copy(sql, len, db);
 | |
|         }
 | |
|         else if (is_create_as_statement(sql, len))
 | |
|         {
 | |
|             static bool warn_create_as = true;
 | |
|             if (warn_create_as)
 | |
|             {
 | |
|                 MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s",
 | |
|                             len,
 | |
|                             sql);
 | |
|                 warn_create_as = false;
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             created = table_create_alloc(ident, sql, len);
 | |
|         }
 | |
| 
 | |
|         if (created && !save_and_replace_table_create(created))
 | |
|         {
 | |
|             MXS_ERROR("Failed to save statement to disk: %.*s", len, sql);
 | |
|         }
 | |
|     }
 | |
|     else if (is_alter_table_statement(m_alter_table_re, sql, len))
 | |
|     {
 | |
|         auto it = m_created_tables.find(ident);
 | |
| 
 | |
|         if (it != m_created_tables.end())
 | |
|         {
 | |
|             table_create_alter(it->second, sql, sql + len);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
 | |
|         }
 | |
|     }
 | |
|     // TODO: Add COMMIT handling for non-transactional tables
 | |
| 
 | |
|     MXS_FREE(tmp);
 | |
| }
 | |
| 
 | |
| void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
 | |
| {
 | |
|     if (m_binlog_checksum)
 | |
|     {
 | |
|         // We don't care about the checksum at this point so we ignore it
 | |
|         hdr.event_size -= 4;
 | |
|     }
 | |
| 
 | |
|     // The following events are related to the actual data
 | |
|     if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
 | |
|     {
 | |
|         const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
 | |
|         const int FDE_EXTRA_BYTES = 5;
 | |
|         int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
 | |
|         int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
 | |
|         uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
 | |
|         m_event_type_hdr_lens.assign(ptr, ptr + n_events);
 | |
|         m_event_types = n_events;
 | |
|         m_binlog_checksum = checksum[0];
 | |
|     }
 | |
|     else if (hdr.event_type == TABLE_MAP_EVENT)
 | |
|     {
 | |
|         handle_table_map_event(&hdr, ptr);
 | |
|     }
 | |
|     else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1)
 | |
|              || (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
 | |
|     {
 | |
|         handle_row_event(&hdr, ptr);
 | |
|     }
 | |
|     else if (hdr.event_type == GTID_EVENT)
 | |
|     {
 | |
|         m_gtid.extract(hdr, ptr);
 | |
|     }
 | |
|     else if (hdr.event_type == QUERY_EVENT)
 | |
|     {
 | |
|         handle_query_event(&hdr, ptr);
 | |
|     }
 | |
| }
 |