1062 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1062 lines
		
	
	
		
			33 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2024-06-02
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| #include "avrorouter.hh"
 | |
| 
 | |
| #include <maxscale/mysql_utils.hh>
 | |
| #include <jansson.h>
 | |
| #include <maxbase/alloc.h>
 | |
| #include <strings.h>
 | |
| #include <signal.h>
 | |
| #include <maxscale/utils.h>
 | |
| 
 | |
/** Row event categories; these values are what get_event_type() returns */
#define WRITE_EVENT        0
#define UPDATE_EVENT       1
#define UPDATE_EVENT_AFTER 2
#define DELETE_EVENT       3

/** Remove when support for DECIMAL is added */
static bool warn_decimal = false;

/** Remove when support for BIT is added */
static bool warn_bit = false;

/** Remove when support for ENUM/SET values larger than 255 is added */
static bool warn_large_enumset = false;
 | |
| 
 | |
| /**
 | |
|  * @brief Get row event name
 | |
|  * @param event Event type
 | |
|  * @return String representation of the event
 | |
|  */
 | |
| static int get_event_type(uint8_t event)
 | |
| {
 | |
|     switch (event)
 | |
|     {
 | |
| 
 | |
|     case WRITE_ROWS_EVENTv0:
 | |
|     case WRITE_ROWS_EVENTv1:
 | |
|     case WRITE_ROWS_EVENTv2:
 | |
|         return WRITE_EVENT;
 | |
| 
 | |
|     case UPDATE_ROWS_EVENTv0:
 | |
|     case UPDATE_ROWS_EVENTv1:
 | |
|     case UPDATE_ROWS_EVENTv2:
 | |
|         return UPDATE_EVENT;
 | |
| 
 | |
|     case DELETE_ROWS_EVENTv0:
 | |
|     case DELETE_ROWS_EVENTv1:
 | |
|     case DELETE_ROWS_EVENTv2:
 | |
|         return DELETE_EVENT;
 | |
| 
 | |
|     default:
 | |
|         MXS_ERROR("Unexpected event type: %d (%0x)", event, event);
 | |
|         return -1;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Unpack numeric types
 | |
|  *
 | |
|  * Convert the raw binary data into actual numeric types.
 | |
|  *
 | |
|  * @param conv     Event converter to use
 | |
|  * @param idx      Position of this column in the row
 | |
|  * @param type     Event type
 | |
|  * @param metadata Field metadata
 | |
|  * @param value    Pointer to the start of the in-memory representation of the data
 | |
|  */
 | |
| void set_numeric_field_value(SRowEventHandler& conv,
 | |
|                              int idx,
 | |
|                              uint8_t type,
 | |
|                              uint8_t* metadata,
 | |
|                              uint8_t* value,
 | |
|                              bool is_unsigned)
 | |
| {
 | |
|     switch (type)
 | |
|     {
 | |
|     case TABLE_COL_TYPE_TINY:
 | |
|         if (is_unsigned)
 | |
|         {
 | |
|             uint8_t c = *value;
 | |
|             conv->column_int(idx, c);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             int8_t c = *value;
 | |
|             conv->column_int(idx, c);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case TABLE_COL_TYPE_SHORT:
 | |
|         if (is_unsigned)
 | |
|         {
 | |
|             uint16_t s = gw_mysql_get_byte2(value);
 | |
|             conv->column_int(idx, s);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             int16_t s = gw_mysql_get_byte2(value);
 | |
|             conv->column_int(idx, s);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case TABLE_COL_TYPE_INT24:
 | |
|         if (is_unsigned)
 | |
|         {
 | |
|             uint32_t x = gw_mysql_get_byte3(value);
 | |
|             conv->column_int(idx, x);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             int32_t x = gw_mysql_get_byte3(value);
 | |
| 
 | |
|             if (x & 0x800000)
 | |
|             {
 | |
|                 x = -((0xffffff & (~x)) + 1);
 | |
|             }
 | |
| 
 | |
|             conv->column_int(idx, (int64_t)x);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case TABLE_COL_TYPE_LONG:
 | |
|         if (is_unsigned)
 | |
|         {
 | |
|             uint32_t x = gw_mysql_get_byte4(value);
 | |
|             conv->column_long(idx, x);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             int32_t x = gw_mysql_get_byte4(value);
 | |
|             conv->column_long(idx, x);
 | |
|         }
 | |
|         break;
 | |
| 
 | |
|     case TABLE_COL_TYPE_LONGLONG:
 | |
|         conv->column_long(idx, gw_mysql_get_byte8(value));
 | |
|         break;
 | |
| 
 | |
|     case TABLE_COL_TYPE_FLOAT:
 | |
|         {
 | |
|             float f = 0;
 | |
|             memcpy(&f, value, 4);
 | |
|             conv->column_float(idx, f);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     case TABLE_COL_TYPE_DOUBLE:
 | |
|         {
 | |
|             double d = 0;
 | |
|             memcpy(&d, value, 8);
 | |
|             conv->column_double(idx, d);
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|     default:
 | |
|         break;
 | |
|     }
 | |
| }
 | |
| 
 | |
/**
 * @brief Check if a bit is set
 *
 * Bits are numbered from the least significant bit of the first byte.
 *
 * @param ptr            Pointer to start of bitfield
 * @param columns        Number of columns (bits); not used by the lookup
 * @param current_column Zero indexed column number
 * @return True if the bit is set
 */
static bool bit_is_set(uint8_t* ptr, int columns, int current_column)
{
    const int byte_index = current_column / 8;
    const int bit_index = current_column % 8;

    return (ptr[byte_index] & (1 << bit_index)) != 0;
}
 | |
| 
 | |
| /**
 | |
|  * @brief Get the length of the metadata for a particular field
 | |
|  *
 | |
|  * @param type Type of the field
 | |
|  * @return Length of the metadata for this field
 | |
|  */
 | |
| int get_metadata_len(uint8_t type)
 | |
| {
 | |
|     switch (type)
 | |
|     {
 | |
|     case TABLE_COL_TYPE_STRING:
 | |
|     case TABLE_COL_TYPE_VAR_STRING:
 | |
|     case TABLE_COL_TYPE_VARCHAR:
 | |
|     case TABLE_COL_TYPE_DECIMAL:
 | |
|     case TABLE_COL_TYPE_NEWDECIMAL:
 | |
|     case TABLE_COL_TYPE_ENUM:
 | |
|     case TABLE_COL_TYPE_SET:
 | |
|     case TABLE_COL_TYPE_BIT:
 | |
|         return 2;
 | |
| 
 | |
|     case TABLE_COL_TYPE_BLOB:
 | |
|     case TABLE_COL_TYPE_FLOAT:
 | |
|     case TABLE_COL_TYPE_DOUBLE:
 | |
|     case TABLE_COL_TYPE_DATETIME2:
 | |
|     case TABLE_COL_TYPE_TIMESTAMP2:
 | |
|     case TABLE_COL_TYPE_TIME2:
 | |
|         return 1;
 | |
| 
 | |
|     default:
 | |
|         return 0;
 | |
|     }
 | |
| }
 | |
| 
 | |
// Abort the process when the condition `t` does not hold. Before SIGABRT is
// raised, the trace lines with an index below `i` are logged at alert level.
// Make sure that both `i` and `trace` are defined before using this macro.
#define check_overflow(t) \
    do \
    { \
        if (!(t)) \
        { \
            for (long trace_row = 0; trace_row < i; trace_row++) \
            { \
                MXS_ALERT("%s", trace[trace_row]); \
            } \
            raise(SIGABRT); \
        } \
    } while (false)
 | |
| 
 | |
| // Debug function for checking whether a row event consists of only NULL values
 | |
| static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
 | |
| {
 | |
|     bool rval = true;
 | |
| 
 | |
|     for (long i = 0; i < ncolumns; i++)
 | |
|     {
 | |
|         if (!bit_is_set(null_bitmap, ncolumns, i))
 | |
|         {
 | |
|             rval = false;
 | |
|             break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Extract the values from a single row  in a row event
 | |
|  *
 | |
|  * @param map Table map event associated with this row
 | |
|  * @param create Table creation associated with this row
 | |
|  * @param record Avro record used for storing this row
 | |
|  * @param ptr Pointer to the start of the row data, should be after the row event header
 | |
|  * @param columns_present The bitfield holding the columns that are present for
 | |
|  * this row event. Currently this should be a bitfield which has all bits set.
 | |
|  * @return Pointer to the first byte after the current row event
 | |
|  */
 | |
| uint8_t* process_row_event_data(STableMapEvent map,
 | |
|                                 STableCreateEvent create,
 | |
|                                 SRowEventHandler& conv,
 | |
|                                 uint8_t* ptr,
 | |
|                                 uint8_t* columns_present,
 | |
|                                 uint8_t* end)
 | |
| {
 | |
|     mxb_assert(create->database == map->database && create->table == map->table);
 | |
|     int npresent = 0;
 | |
|     long ncolumns = map->columns();
 | |
|     uint8_t* metadata = &map->column_metadata[0];
 | |
|     size_t metadata_offset = 0;
 | |
| 
 | |
|     /** BIT type values use the extra bits in the row event header */
 | |
|     int extra_bits = (((ncolumns + 7) / 8) * 8) - ncolumns;
 | |
|     mxb_assert(ptr < end);
 | |
| 
 | |
|     /** Store the null value bitmap */
 | |
|     uint8_t* null_bitmap = ptr;
 | |
|     ptr += (ncolumns + 7) / 8;
 | |
|     mxb_assert(ptr < end || (bit_is_set(null_bitmap, ncolumns, 0)));
 | |
| 
 | |
|     char trace[ncolumns][768];
 | |
|     memset(trace, 0, sizeof(trace));
 | |
| 
 | |
|     for (long i = 0; i < ncolumns && npresent < ncolumns; i++)
 | |
|     {
 | |
|         if (bit_is_set(columns_present, ncolumns, i))
 | |
|         {
 | |
|             npresent++;
 | |
| 
 | |
|             if (bit_is_set(null_bitmap, ncolumns, i))
 | |
|             {
 | |
|                 sprintf(trace[i], "[%ld] NULL", i);
 | |
|                 conv->column_null(i);
 | |
|             }
 | |
|             else if (column_is_fixed_string(map->column_types[i]))
 | |
|             {
 | |
|                 /** ENUM and SET are stored as STRING types with the type stored
 | |
|                  * in the metadata. */
 | |
|                 if (fixed_string_is_enum(metadata[metadata_offset]))
 | |
|                 {
 | |
|                     uint8_t val[metadata[metadata_offset + 1]];
 | |
|                     uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
 | |
|                     char strval[bytes * 2 + 1];
 | |
|                     gw_bin2hex(strval, val, bytes);
 | |
|                     conv->column_string(i, strval);
 | |
|                     sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
 | |
|                     ptr += bytes;
 | |
|                     check_overflow(ptr <= end);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     /**
 | |
|                      * The first byte in the metadata stores the real type of
 | |
|                      * the string (ENUM and SET types are also stored as fixed
 | |
|                      * length strings).
 | |
|                      *
 | |
|                      * The first two bits of the second byte contain the XOR'ed
 | |
|                      * field length but as that information is not relevant for
 | |
|                      * us, we just use this information to know whether to read
 | |
|                      * one or two bytes for string length.
 | |
|                      */
 | |
| 
 | |
|                     uint16_t meta = metadata[metadata_offset + 1] + (metadata[metadata_offset] << 8);
 | |
|                     int bytes = 0;
 | |
|                     uint16_t extra_length = (((meta >> 4) & 0x300) ^ 0x300);
 | |
|                     uint16_t field_length = (meta & 0xff) + extra_length;
 | |
| 
 | |
|                     if (field_length > 255)
 | |
|                     {
 | |
|                         bytes = ptr[0] + (ptr[1] << 8);
 | |
|                         ptr += 2;
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         bytes = *ptr++;
 | |
|                     }
 | |
| 
 | |
|                     sprintf(trace[i], "[%ld] CHAR: field: %d bytes, data: %d bytes", i, field_length, bytes);
 | |
|                     char str[bytes + 1];
 | |
|                     memcpy(str, ptr, bytes);
 | |
|                     str[bytes] = '\0';
 | |
|                     conv->column_string(i, str);
 | |
|                     ptr += bytes;
 | |
|                     check_overflow(ptr <= end);
 | |
|                 }
 | |
|             }
 | |
|             else if (column_is_bit(map->column_types[i]))
 | |
|             {
 | |
|                 uint8_t len = metadata[metadata_offset + 1];
 | |
|                 uint8_t bit_len = metadata[metadata_offset] > 0 ? 1 : 0;
 | |
|                 size_t bytes = len + bit_len;
 | |
| 
 | |
|                 // TODO: extract the bytes
 | |
|                 if (!warn_bit)
 | |
|                 {
 | |
|                     warn_bit = true;
 | |
|                     MXS_WARNING("BIT is not currently supported, values are stored as 0.");
 | |
|                 }
 | |
|                 conv->column_int(i, 0);
 | |
|                 sprintf(trace[i], "[%ld] BIT", i);
 | |
|                 ptr += bytes;
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_decimal(map->column_types[i]))
 | |
|             {
 | |
|                 double f_value = 0.0;
 | |
|                 ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
 | |
|                 conv->column_double(i, f_value);
 | |
|                 sprintf(trace[i], "[%ld] DECIMAL", i);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_variable_string(map->column_types[i]))
 | |
|             {
 | |
|                 size_t sz;
 | |
|                 int bytes = metadata[metadata_offset] | metadata[metadata_offset + 1] << 8;
 | |
|                 if (bytes > 255)
 | |
|                 {
 | |
|                     sz = gw_mysql_get_byte2(ptr);
 | |
|                     ptr += 2;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     sz = *ptr;
 | |
|                     ptr++;
 | |
|                 }
 | |
| 
 | |
|                 sprintf(trace[i], "[%ld] VARCHAR: field: %d bytes, data: %lu bytes", i, bytes, sz);
 | |
|                 char buf[sz + 1];
 | |
|                 memcpy(buf, ptr, sz);
 | |
|                 buf[sz] = '\0';
 | |
|                 ptr += sz;
 | |
|                 conv->column_string(i, buf);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_blob(map->column_types[i]))
 | |
|             {
 | |
|                 uint8_t bytes = metadata[metadata_offset];
 | |
|                 uint64_t len = 0;
 | |
|                 memcpy(&len, ptr, bytes);
 | |
|                 ptr += bytes;
 | |
|                 sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
 | |
|                 if (len)
 | |
|                 {
 | |
|                     conv->column_bytes(i, ptr, len);
 | |
|                     ptr += len;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     uint8_t nullvalue = 0;
 | |
|                     conv->column_bytes(i, &nullvalue, 1);
 | |
|                 }
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             else if (column_is_temporal(map->column_types[i]))
 | |
|             {
 | |
|                 char buf[80];
 | |
|                 ptr += unpack_temporal_value(map->column_types[i], ptr,
 | |
|                                              &metadata[metadata_offset],
 | |
|                                              create->columns[i].length,
 | |
|                                              buf, sizeof(buf));
 | |
|                 conv->column_string(i, buf);
 | |
|                 sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             /** All numeric types (INT, LONG, FLOAT etc.) */
 | |
|             else
 | |
|             {
 | |
|                 uint8_t lval[16];
 | |
|                 memset(lval, 0, sizeof(lval));
 | |
|                 ptr += unpack_numeric_field(ptr,
 | |
|                                             map->column_types[i],
 | |
|                                             &metadata[metadata_offset],
 | |
|                                             lval);
 | |
|                 set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval,
 | |
|                                         create->columns[i].is_unsigned);
 | |
|                 sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
 | |
|                 check_overflow(ptr <= end);
 | |
|             }
 | |
|             mxb_assert(metadata_offset <= map->column_metadata.size());
 | |
|             metadata_offset += get_metadata_len(map->column_types[i]);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             sprintf(trace[i], "[%ld] %s: Not present", i, column_type_to_string(map->column_types[i]));
 | |
|         }
 | |
| 
 | |
|         MXS_INFO("%s", trace[i]);
 | |
|     }
 | |
| 
 | |
|     return ptr;
 | |
| }
 | |
| 
 | |
/**
 * @brief Read the table ID and the fully qualified name of the table
 *
 * The names are formatted with an explicit length taken from the length
 * bytes of the event, so the result does not depend on the terminating byte
 * that follows each name in the payload.
 *
 * @param ptr             Pointer to the start of the event payload
 * @param post_header_len Length of the event specific header, 8 or 6 bytes. The
 *                        table ID is read as 4 bytes when this is 6, otherwise as 6 bytes.
 * @param tbl_id          Output: the table ID, copied in host byte order
 * @param dest            Destination where the "schema.table" string is stored
 * @param len             Size of destination; longer names are truncated
 */
void read_table_info(uint8_t* ptr, uint8_t post_header_len, uint64_t* tbl_id, char* dest, size_t len)
{
    uint64_t table_id = 0;
    size_t id_size = post_header_len == 6 ? 4 : 6;
    memcpy(&table_id, ptr, id_size);
    ptr += id_size;

    /** The two flag bytes are not used here */
    ptr += 2;

    uint8_t schema_name_len = *ptr++;
    const char* schema_name = (const char*)ptr;

    /** Skip the name and the byte that follows it */
    ptr += schema_name_len + 1;

    uint8_t table_name_len = *ptr++;
    const char* table_name = (const char*)ptr;

    snprintf(dest, len, "%.*s.%.*s",
             (int)schema_name_len, schema_name,
             (int)table_name_len, table_name);
    *tbl_id = table_id;
}
 | |
| 
 | |
| /**
 | |
|  * @brief Handle a table map event
 | |
|  *
 | |
|  * This converts a table map events into table meta data that will be used when
 | |
|  * converting binlogs to Avro format.
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param ptr Pointer to event payload
 | |
|  */
 | |
| bool Rpl::handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     bool rval = false;
 | |
|     uint64_t id;
 | |
|     char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|     int ev_len = m_event_type_hdr_lens[hdr->event_type];
 | |
| 
 | |
|     read_table_info(ptr, ev_len, &id, table_ident, sizeof(table_ident));
 | |
| 
 | |
|     if (!table_matches(table_ident))
 | |
|     {
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     auto create = m_created_tables.find(table_ident);
 | |
| 
 | |
|     if (create != m_created_tables.end())
 | |
|     {
 | |
|         mxb_assert(create->second->columns.size() > 0);
 | |
|         auto it = m_table_maps.find(table_ident);
 | |
|         STableMapEvent map(table_map_alloc(ptr, ev_len, create->second.get()));
 | |
| 
 | |
|         if (it != m_table_maps.end())
 | |
|         {
 | |
|             auto old = it->second;
 | |
| 
 | |
|             if (old->id == map->id && old->version == map->version
 | |
|                 && old->table == map->table && old->database == map->database)
 | |
|             {
 | |
|                 // We can reuse the table map object
 | |
|                 return true;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         if (m_handler->open_table(map, create->second))
 | |
|         {
 | |
|             create->second->was_used = true;
 | |
| 
 | |
|             auto old = m_table_maps.find(table_ident);
 | |
|             bool notify = old != m_table_maps.end();
 | |
| 
 | |
|             if (notify)
 | |
|             {
 | |
|                 m_active_maps.erase(old->second->id);
 | |
|             }
 | |
| 
 | |
|             m_table_maps[table_ident] = map;
 | |
|             m_active_maps[map->id] = map;
 | |
|             mxb_assert(m_active_maps[id] == map);
 | |
|             MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id);
 | |
|             rval = true;
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_WARNING("Table map event for table '%s' read before the DDL statement "
 | |
|                     "for that table  was read. Data will not be processed for this "
 | |
|                     "table until a DDL statement for it is read.",
 | |
|                     table_ident);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Handle a single RBR row event
 | |
|  *
 | |
|  * These events contain the changes in the data. This function assumes that full
 | |
|  * row image is sent in every row event.
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param ptr Pointer to the start of the event
 | |
|  * @return True on succcess, false on error
 | |
|  */
 | |
| bool Rpl::handle_row_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     bool rval = false;
 | |
|     uint8_t* end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
 | |
|     uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
 | |
|     uint64_t table_id = 0;
 | |
| 
 | |
|     /** The first value is the ID where the table was mapped. This should be
 | |
|      * the same as the ID in the table map even which was processed before this
 | |
|      * row event. */
 | |
|     memcpy(&table_id, ptr, table_id_size);
 | |
|     ptr += table_id_size;
 | |
| 
 | |
|     /** Replication flags, currently ignored for the most part. */
 | |
|     uint16_t flags = 0;
 | |
|     memcpy(&flags, ptr, 2);
 | |
|     ptr += 2;
 | |
| 
 | |
|     if (table_id == TABLE_DUMMY_ID && flags & ROW_EVENT_END_STATEMENT)
 | |
|     {
 | |
|         /** This is an dummy event which should release all table maps. Right
 | |
|          * now we just return without processing the rows. */
 | |
|         return true;
 | |
|     }
 | |
| 
 | |
|     /** Newer replication events have extra data stored in the header. MariaDB
 | |
|      * 10.1 does not use these and instead use the v1 events */
 | |
|     if (hdr->event_type > DELETE_ROWS_EVENTv1)
 | |
|     {
 | |
|         /** Version 2 row event, skip extra data */
 | |
|         uint16_t extra_len = 0;
 | |
|         memcpy(&extra_len, ptr, 2);
 | |
|         ptr += 2 + extra_len;
 | |
|     }
 | |
| 
 | |
|     /** Number of columns in the table */
 | |
|     uint64_t ncolumns = mxq::leint_consume(&ptr);
 | |
| 
 | |
|     /** If full row image is used, all columns are present. Currently only full
 | |
|      * row image is supported and thus the bitfield should be all ones. In
 | |
|      * the future partial row images could be used if the bitfield containing
 | |
|      * the columns that are present in this event is used. */
 | |
|     const int coldata_size = (ncolumns + 7) / 8;
 | |
|     uint8_t col_present[coldata_size];
 | |
|     memcpy(&col_present, ptr, coldata_size);
 | |
|     ptr += coldata_size;
 | |
| 
 | |
|     /** Update events have the before and after images of the row. This can be
 | |
|      * used to calculate a "delta" of sorts if necessary. Currently we store
 | |
|      * both the before and the after images. */
 | |
|     uint8_t col_update[coldata_size];
 | |
|     if (hdr->event_type == UPDATE_ROWS_EVENTv1
 | |
|         || hdr->event_type == UPDATE_ROWS_EVENTv2)
 | |
|     {
 | |
|         memcpy(&col_update, ptr, coldata_size);
 | |
|         ptr += coldata_size;
 | |
|     }
 | |
| 
 | |
|     // There should always be a table map event prior to a row event.
 | |
| 
 | |
|     auto it = m_active_maps.find(table_id);
 | |
| 
 | |
|     if (it != m_active_maps.end())
 | |
|     {
 | |
|         STableMapEvent map = it->second;
 | |
|         char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|         snprintf(table_ident, sizeof(table_ident), "%s.%s", map->database.c_str(), map->table.c_str());
 | |
| 
 | |
|         if (!table_matches(table_ident))
 | |
|         {
 | |
|             return true;
 | |
|         }
 | |
| 
 | |
|         auto create = m_created_tables.find(table_ident);
 | |
|         bool ok = false;
 | |
| 
 | |
|         if (create != m_created_tables.end() && ncolumns == map->columns()
 | |
|             && create->second->columns.size() == map->columns()
 | |
|             && m_handler->prepare_table(map, create->second))
 | |
|         {
 | |
|             ok = true;
 | |
|         }
 | |
| 
 | |
|         if (ok)
 | |
|         {
 | |
|             /** Each event has one or more rows in it. The number of rows is not known
 | |
|              * beforehand so we must continue processing them until we reach the end
 | |
|              * of the event. */
 | |
|             int rows = 0;
 | |
|             MXS_INFO("Row Event for '%s' at %u", table_ident, hdr->next_pos - hdr->event_size);
 | |
| 
 | |
|             while (ptr < end)
 | |
|             {
 | |
|                 int event_type = get_event_type(hdr->event_type);
 | |
| 
 | |
|                 // Increment the event count for this transaction
 | |
|                 m_gtid.event_num++;
 | |
| 
 | |
|                 m_handler->prepare_row(m_gtid, *hdr, event_type);
 | |
|                 ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
 | |
|                 m_handler->commit(m_gtid);
 | |
| 
 | |
|                 /** Update rows events have the before and after images of the
 | |
|                  * affected rows so we'll process them as another record with
 | |
|                  * a different type */
 | |
|                 if (event_type == UPDATE_EVENT)
 | |
|                 {
 | |
|                     m_gtid.event_num++;
 | |
|                     m_handler->prepare_row(m_gtid, *hdr, UPDATE_EVENT_AFTER);
 | |
|                     ptr = process_row_event_data(map, create->second, m_handler, ptr, col_present, end);
 | |
|                     m_handler->commit(m_gtid);
 | |
|                 }
 | |
| 
 | |
|                 rows++;
 | |
|             }
 | |
| 
 | |
|             rval = true;
 | |
|         }
 | |
|         else if (!ok)
 | |
|         {
 | |
|             MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
 | |
|                       " errors for more details.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else if (create == m_created_tables.end())
 | |
|         {
 | |
|             MXS_ERROR("Create table statement for %s.%s was not found from the "
 | |
|                       "binary logs or the stored schema was not correct.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
 | |
|         {
 | |
|             MXS_ERROR("Table map event has a different column count for table "
 | |
|                       "%s.%s than the CREATE TABLE statement. Possible "
 | |
|                       "unsupported DDL detected.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Row event and table map event have different column "
 | |
|                       "counts for table %s.%s, only full row image is currently "
 | |
|                       "supported.",
 | |
|                       map->database.c_str(),
 | |
|                       map->table.c_str());
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_INFO("Row event for unknown table mapped to ID %lu. Data will not "
 | |
|                  "be processed.",
 | |
|                  table_id);
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Detection of table creation statements
 | |
|  * @param router Avro router instance
 | |
|  * @param ptr Pointer to statement
 | |
|  * @param len Statement length
 | |
|  * @return True if the statement creates a new table
 | |
|  */
 | |
| bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
 | |
| {
 | |
|     int rc = 0;
 | |
|     pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
 | |
| 
 | |
|     if (mdata)
 | |
|     {
 | |
|         rc = pcre2_match(create_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
 | |
|         pcre2_match_data_free(mdata);
 | |
|     }
 | |
| 
 | |
|     return rc > 0;
 | |
| }
 | |
| 
 | |
/**
 * @brief Detection of CREATE TABLE ... LIKE statements
 *
 * @param ptr Pointer to statement
 * @param len Statement length
 * @return True if the statement contains " like " or "(like ", compared
 *         without regard to case
 */
bool is_create_like_statement(const char* ptr, size_t len)
{
    // Work on a terminated copy, the search below stops at the first NUL byte
    char stmt[len + 1];
    memcpy(stmt, ptr, len);
    stmt[len] = '\0';

    // This is not pretty but it should work
    if (strcasestr(stmt, " like ") != NULL)
    {
        return true;
    }

    return strcasestr(stmt, "(like ") != NULL;
}
 | |
| 
 | |
| bool is_create_as_statement(const char* ptr, size_t len)
 | |
| {
 | |
|     int err = 0;
 | |
|     char sql[len + 1];
 | |
|     memcpy(sql, ptr, len);
 | |
|     sql[len] = '\0';
 | |
|     const char* pattern
 | |
|         =   // Case-insensitive mode
 | |
|             "(?i)"
 | |
|             // Main CREATE TABLE part (the \s is for any whitespace)
 | |
|             "create\\stable\\s"
 | |
|             // Optional IF NOT EXISTS
 | |
|             "(if\\snot\\sexists\\s)?"
 | |
|             // The table name with optional database name, both enclosed in optional backticks
 | |
|             "(`?\\S+`?.)`?\\S+`?\\s"
 | |
|             // And finally the AS keyword
 | |
|             "as";
 | |
| 
 | |
|     return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * @brief Detection of table alteration statements
 | |
|  * @param router Avro router instance
 | |
|  * @param ptr Pointer to statement
 | |
|  * @param len Statement length
 | |
|  * @return True if the statement alters a table
 | |
|  */
 | |
| bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
 | |
| {
 | |
|     int rc = 0;
 | |
|     pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
 | |
| 
 | |
|     if (mdata)
 | |
|     {
 | |
|         rc = pcre2_match(alter_table_re, (PCRE2_SPTR) ptr, len, 0, 0, mdata, NULL);
 | |
|         pcre2_match_data_free(mdata);
 | |
|     }
 | |
| 
 | |
|     return rc > 0;
 | |
| }
 | |
| 
 | |
| bool is_rename_table_statement(const char* sql)
 | |
| {
 | |
|     int err = 0;
 | |
|     const char* pattern = "(?i)^\\s*rename\\s*table\\s*";
 | |
|     return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
 | |
| }
 | |
| 
 | |
/** Database name offset */
#define DBNM_OFF 8

/** Varblock offset. Parenthesized so that it expands safely inside larger expressions. */
#define VBLK_OFF (4 + 4 + 1 + 2)

/** Post-header offset. Parenthesized so that it expands safely inside larger expressions. */
#define PHDR_OFF (4 + 4 + 1 + 2 + 2)
 | |
| 
 | |
| /**
 | |
|  * Save the CREATE TABLE statement to disk and replace older versions of the table
 | |
|  * in the router's hashtable.
 | |
|  * @param router Avro router instance
 | |
|  * @param created Created table
 | |
|  * @return False if an error occurred and true if successful
 | |
|  */
 | |
| bool Rpl::save_and_replace_table_create(STableCreateEvent created)
 | |
| {
 | |
|     std::string table_ident = created->id();
 | |
|     auto it = m_created_tables.find(table_ident);
 | |
| 
 | |
|     if (it != m_created_tables.end())
 | |
|     {
 | |
|         auto tm_it = m_table_maps.find(table_ident);
 | |
| 
 | |
|         if (tm_it != m_table_maps.end())
 | |
|         {
 | |
|             m_active_maps.erase(tm_it->second->id);
 | |
|             m_table_maps.erase(tm_it);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     created->version = ++m_versions[created->id()];
 | |
|     m_created_tables[table_ident] = created;
 | |
|     mxb_assert(created->columns.size() > 0);
 | |
|     return m_handler->create_table(created);
 | |
| }
 | |
| 
 | |
| bool Rpl::rename_table_create(STableCreateEvent created, const std::string& old_id)
 | |
| {
 | |
|     auto it = m_created_tables.find(old_id);
 | |
| 
 | |
|     if (it != m_created_tables.end())
 | |
|     {
 | |
|         auto tm_it = m_table_maps.find(old_id);
 | |
| 
 | |
|         if (tm_it != m_table_maps.end())
 | |
|         {
 | |
|             m_active_maps.erase(tm_it->second->id);
 | |
|             m_table_maps.erase(tm_it);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     m_created_tables.erase(old_id);
 | |
|     m_created_tables[created->id()] = created;
 | |
|     mxb_assert(created->columns.size() > 0);
 | |
|     return m_handler->create_table(created);
 | |
| }
 | |
| 
 | |
/**
 * @brief Replace every whitespace character other than a plain space with a space
 *
 * The buffer is modified in place and its length does not change.
 *
 * @param sql Buffer to modify, does not need to be null-terminated
 * @param len Number of bytes of @c sql to process
 */
void unify_whitespace(char* sql, int len)
{
    for (int i = 0; i < len; i++)
    {
        // The <ctype.h> functions require a value representable as unsigned char:
        // passing a negative plain char (e.g. a UTF-8 byte) is undefined behavior.
        if (sql[i] != ' ' && isspace((unsigned char)sql[i]))
        {
            sql[i] = ' ';
        }
    }
}
 | |
| 
 | |
/**
 * A very simple function for stripping auto-generated executable comments
 *
 * If the buffer starts with the opening of an executable comment, i.e. the
 * prefix "/" "*!" or "/" "*M!" followed by optional version digits, that
 * opening is removed by moving the rest of the buffer to the front.
 *
 * Note that the function does not strip the trailing part of the comment,
 * making the SQL invalid.
 *
 * Only the first @c *len bytes are ever read, the buffer does not have to be
 * null-terminated.
 *
 * @param sql String to modify
 * @param len Pointer to current length of string, updated to new length if
 *            @c sql is modified
 */
static void strip_executable_comments(char* sql, int* len)
{
    const int total = *len;
    const int is_plain = total >= 3 && strncmp(sql, "/*!", 3) == 0;
    const int is_mariadb = total >= 4 && strncmp(sql, "/*M!", 4) == 0;

    if (!is_plain && !is_mariadb)
    {
        return;
    }

    // Executable comment, remove it. The fourth byte is skipped when it is the
    // '!' of the four byte prefix.
    int pos = 3;

    if (pos < total && sql[pos] == '!')
    {
        pos++;
    }

    // Skip the versioning part without reading past the end of the statement.
    // The cast keeps isdigit() defined for bytes with the high bit set.
    while (pos < total && isdigit((unsigned char)sql[pos]))
    {
        pos++;
    }

    // pos <= total always holds here, so the new length is never negative
    const int new_len = total - pos;
    memmove(sql, sql + pos, new_len);
    *len = new_len;
}
 | |
| 
 | |
| /**
 | |
|  * @brief Handling of query events
 | |
|  *
 | |
|  * @param router Avro router instance
 | |
|  * @param hdr Replication header
 | |
|  * @param pending_transaction Pointer where status of pending transaction is stored
 | |
|  * @param ptr Pointer to the start of the event payload
 | |
|  */
 | |
| void Rpl::handle_query_event(REP_HEADER* hdr, uint8_t* ptr)
 | |
| {
 | |
|     int dblen = ptr[DBNM_OFF];
 | |
|     int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
 | |
|     int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
 | |
|     char* sql = (char*) ptr + PHDR_OFF + vblklen + 1 + dblen;
 | |
|     char db[dblen + 1];
 | |
|     memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
 | |
|     db[dblen] = 0;
 | |
| 
 | |
|     size_t sqlsz = len, tmpsz = len;
 | |
|     char* tmp = static_cast<char*>(MXS_MALLOC(len + 1));
 | |
|     MXS_ABORT_IF_NULL(tmp);
 | |
|     remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
 | |
|     sql = tmp;
 | |
|     len = tmpsz;
 | |
|     unify_whitespace(sql, len);
 | |
|     strip_executable_comments(sql, &len);
 | |
|     sql[len] = '\0';
 | |
| 
 | |
|     if (*sql == '\0')
 | |
|     {
 | |
|         MXS_FREE(tmp);
 | |
|         return;
 | |
|     }
 | |
| 
 | |
|     static bool warn_not_row_format = true;
 | |
| 
 | |
|     if (warn_not_row_format)
 | |
|     {
 | |
|         GWBUF* buffer = gwbuf_alloc(len + 5);
 | |
|         gw_mysql_set_byte3(GWBUF_DATA(buffer), len + 1);
 | |
|         GWBUF_DATA(buffer)[4] = 0x03;
 | |
|         memcpy(GWBUF_DATA(buffer) + 5, sql, len);
 | |
|         qc_query_op_t op = qc_get_operation(buffer);
 | |
|         gwbuf_free(buffer);
 | |
| 
 | |
|         if (op == QUERY_OP_UPDATE || op == QUERY_OP_INSERT || op == QUERY_OP_DELETE)
 | |
|         {
 | |
|             MXS_WARNING("Possible STATEMENT or MIXED format binary log. Check that "
 | |
|                         "'binlog_format' is set to ROW on the master.");
 | |
|             warn_not_row_format = false;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     char ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
 | |
|     read_table_identifier(db, sql, sql + len, ident, sizeof(ident));
 | |
| 
 | |
|     if (is_create_table_statement(m_create_table_re, sql, len))
 | |
|     {
 | |
|         STableCreateEvent created;
 | |
| 
 | |
|         if (is_create_like_statement(sql, len))
 | |
|         {
 | |
|             created = table_create_copy(sql, len, db);
 | |
|         }
 | |
|         else if (is_create_as_statement(sql, len))
 | |
|         {
 | |
|             static bool warn_create_as = true;
 | |
|             if (warn_create_as)
 | |
|             {
 | |
|                 MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s",
 | |
|                             len,
 | |
|                             sql);
 | |
|                 warn_create_as = false;
 | |
|             }
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             created = table_create_alloc(ident, sql, len);
 | |
|         }
 | |
| 
 | |
|         if (created && !save_and_replace_table_create(created))
 | |
|         {
 | |
|             MXS_ERROR("Failed to save statement to disk: %.*s", len, sql);
 | |
|         }
 | |
|     }
 | |
|     else if (is_alter_table_statement(m_alter_table_re, sql, len))
 | |
|     {
 | |
|         auto it = m_created_tables.find(ident);
 | |
| 
 | |
|         if (it != m_created_tables.end())
 | |
|         {
 | |
|             table_create_alter(it->second, sql, sql + len);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Alter statement to table '%s' has no preceding create statement.", ident);
 | |
|         }
 | |
|     }
 | |
|     else if (is_rename_table_statement(sql))
 | |
|     {
 | |
|         table_create_rename(db, sql, sql + len);
 | |
|     }
 | |
| 
 | |
|     MXS_FREE(tmp);
 | |
| }
 | |
| 
 | |
| void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
 | |
| {
 | |
|     if (m_binlog_checksum)
 | |
|     {
 | |
|         // We don't care about the checksum at this point so we ignore it
 | |
|         hdr.event_size -= 4;
 | |
|     }
 | |
| 
 | |
|     // The following events are related to the actual data
 | |
|     if (hdr.event_type == FORMAT_DESCRIPTION_EVENT)
 | |
|     {
 | |
|         const int BLRM_FDE_EVENT_TYPES_OFFSET = 2 + 50 + 4 + 1;
 | |
|         const int FDE_EXTRA_BYTES = 5;
 | |
|         int event_header_length = ptr[BLRM_FDE_EVENT_TYPES_OFFSET - 1];
 | |
|         int n_events = hdr.event_size - event_header_length - BLRM_FDE_EVENT_TYPES_OFFSET - FDE_EXTRA_BYTES;
 | |
|         uint8_t* checksum = ptr + hdr.event_size - event_header_length - FDE_EXTRA_BYTES;
 | |
|         m_event_type_hdr_lens.assign(ptr, ptr + n_events);
 | |
|         m_event_types = n_events;
 | |
|         m_binlog_checksum = checksum[0];
 | |
|     }
 | |
|     else if (hdr.event_type == TABLE_MAP_EVENT)
 | |
|     {
 | |
|         handle_table_map_event(&hdr, ptr);
 | |
|     }
 | |
|     else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1)
 | |
|              || (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
 | |
|     {
 | |
|         handle_row_event(&hdr, ptr);
 | |
|     }
 | |
|     else if (hdr.event_type == GTID_EVENT)
 | |
|     {
 | |
|         m_gtid.extract(hdr, ptr);
 | |
|     }
 | |
|     else if (hdr.event_type == QUERY_EVENT)
 | |
|     {
 | |
|         handle_query_event(&hdr, ptr);
 | |
|     }
 | |
| }
 | 
