From 72d9e41ccb93b87598a5841e74d16e1f1075f6f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Wed, 25 Sep 2019 13:38:53 +0300 Subject: [PATCH] MXS-2263: Process unsigned integer columns correctly The unsigned integers that would previously be interpreted as negative values are now correctly converted into their corresponding avro values. Due to a limitation in the Avro file format, 64-bit unsigned integers cannot be represented in their unsigned form. Since both the signed and unsigned versions of a 32-bit integer cannot fit into a single Avro int, the type for these was changed to long. This is a backwards incompatible change which means files generated with older versions will not convert unsigned values correctly. --- .../routing/avrorouter/avro_converter.cc | 29 ++++--- .../routing/avrorouter/avro_converter.hh | 14 ++-- server/modules/routing/avrorouter/avro_rbr.cc | 83 ++++++++++++------- server/modules/routing/avrorouter/rpl.hh | 18 ++-- 4 files changed, 88 insertions(+), 56 deletions(-) diff --git a/server/modules/routing/avrorouter/avro_converter.cc b/server/modules/routing/avrorouter/avro_converter.cc index ce924aa33..1f9aeaa7d 100644 --- a/server/modules/routing/avrorouter/avro_converter.cc +++ b/server/modules/routing/avrorouter/avro_converter.cc @@ -100,9 +100,8 @@ static const char* column_type_to_avro_type(uint8_t type) { case TABLE_COL_TYPE_TINY: case TABLE_COL_TYPE_SHORT: - case TABLE_COL_TYPE_LONG: - case TABLE_COL_TYPE_INT24: case TABLE_COL_TYPE_BIT: + case TABLE_COL_TYPE_INT24: return "int"; case TABLE_COL_TYPE_FLOAT: @@ -115,6 +114,7 @@ static const char* column_type_to_avro_type(uint8_t type) case TABLE_COL_TYPE_NULL: return "null"; + case TABLE_COL_TYPE_LONG: case TABLE_COL_TYPE_LONGLONG: return "long"; @@ -414,43 +414,52 @@ bool AvroConverter::commit(const gtid_pos_t& gtid) return rval; } -void AvroConverter::column(int i, int32_t value) +void AvroConverter::column_int(int i, int32_t value) { set_active(i); avro_value_set_int(&m_field, value); } -void AvroConverter::column(int i, int64_t value) +void AvroConverter::column_long(int i, int64_t value) { set_active(i); - avro_value_set_long(&m_field, value); + + if (avro_value_get_type(&m_field) == AVRO_INT32) + { + // Pre-2.4.3 versions use int for 32-bit integers whereas 2.4.3 and newer use long + avro_value_set_int(&m_field, value); + } + else + { + avro_value_set_long(&m_field, value); + } } -void AvroConverter::column(int i, float value) +void AvroConverter::column_float(int i, float value) { set_active(i); avro_value_set_float(&m_field, value); } -void AvroConverter::column(int i, double value) +void AvroConverter::column_double(int i, double value) { set_active(i); avro_value_set_double(&m_field, value); } -void AvroConverter::column(int i, std::string value) +void AvroConverter::column_string(int i, const std::string& value) { set_active(i); avro_value_set_string(&m_field, value.c_str()); } -void AvroConverter::column(int i, uint8_t* value, int len) +void AvroConverter::column_bytes(int i, uint8_t* value, int len) { set_active(i); avro_value_set_bytes(&m_field, value, len); } -void AvroConverter::column(int i) +void AvroConverter::column_null(int i) { set_active(i); avro_value_set_branch(&m_union_value, 0, &m_field); diff --git a/server/modules/routing/avrorouter/avro_converter.hh b/server/modules/routing/avrorouter/avro_converter.hh index f5c0f73f5..683bcdf22 100644 --- a/server/modules/routing/avrorouter/avro_converter.hh +++ b/server/modules/routing/avrorouter/avro_converter.hh @@ -53,13 +53,13 @@ public: void flush_tables(); void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type); bool commit(const gtid_pos_t& gtid); - void column(int i, int32_t value); - void column(int i, int64_t value); - void column(int i, float value); - void column(int i, double value); - void column(int i, std::string value); - void column(int i, uint8_t* value, int len); - void column(int i); + void column_int(int i, int32_t value); + void column_long(int i, int64_t value); + void column_float(int i, float value); + void column_double(int i, double value); + void column_string(int i, const std::string& value); + void column_bytes(int i, uint8_t* value, int len); + void column_null(int i); private: avro_value_iface_t* m_writer_iface; diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index 1fac9bbb4..b207d3ed5 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -76,56 +76,78 @@ void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type, uint8_t* metadata, - uint8_t* value) + uint8_t* value, + bool is_unsigned) { switch (type) { case TABLE_COL_TYPE_TINY: + if (is_unsigned) { - char c = *value; - conv->column(idx, c); - break; + uint8_t c = *value; + conv->column_int(idx, c); } + else + { + int8_t c = *value; + conv->column_int(idx, c); + } + break; case TABLE_COL_TYPE_SHORT: + if (is_unsigned) { - short s = gw_mysql_get_byte2(value); - conv->column(idx, s); - break; + uint16_t s = gw_mysql_get_byte2(value); + conv->column_int(idx, s); } + else + { + int16_t s = gw_mysql_get_byte2(value); + conv->column_int(idx, s); + } + break; case TABLE_COL_TYPE_INT24: + if (is_unsigned) { - int x = gw_mysql_get_byte3(value); + uint32_t x = gw_mysql_get_byte3(value); + conv->column_int(idx, x); + } + else + { + int32_t x = gw_mysql_get_byte3(value); if (x & 0x800000) { x = -((0xffffff & (~x)) + 1); } - conv->column(idx, x); - break; + conv->column_int(idx, (int64_t)x); } + break; case TABLE_COL_TYPE_LONG: + if (is_unsigned) { - int x = gw_mysql_get_byte4(value); - conv->column(idx, x); - break; + uint32_t x = gw_mysql_get_byte4(value); + conv->column_long(idx, x); } + else + { + int32_t x = gw_mysql_get_byte4(value); + conv->column_long(idx, x); + } + break; case TABLE_COL_TYPE_LONGLONG: - { - long l = gw_mysql_get_byte8(value); - conv->column(idx, l); - break; - } + conv->column_long(idx, gw_mysql_get_byte8(value)); + break; case TABLE_COL_TYPE_FLOAT: { float f = 0; memcpy(&f, value, 4); - conv->column(idx, f); + conv->column_float(idx, f); break; } @@ -133,7 +155,7 @@ void set_numeric_field_value(SRowEventHandler& conv, { double d = 0; memcpy(&d, value, 8); - conv->column(idx, d); + conv->column_double(idx, d); break; } @@ -270,7 +292,7 @@ uint8_t* process_row_event_data(STableMapEvent map, if (bit_is_set(null_bitmap, ncolumns, i)) { sprintf(trace[i], "[%ld] NULL", i); - conv->column(i); + conv->column_null(i); } else if (column_is_fixed_string(map->column_types[i])) { @@ -282,7 +304,7 @@ uint8_t* process_row_event_data(STableMapEvent map, uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val); char strval[bytes * 2 + 1]; gw_bin2hex(strval, val, bytes); - conv->column(i, strval); + conv->column_string(i, strval); sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes); ptr += bytes; check_overflow(ptr <= end); @@ -319,7 +341,7 @@ uint8_t* process_row_event_data(STableMapEvent map, char str[bytes + 1]; memcpy(str, ptr, bytes); str[bytes] = '\0'; - conv->column(i, str); + conv->column_string(i, str); ptr += bytes; check_overflow(ptr <= end); } @@ -336,7 +358,7 @@ uint8_t* process_row_event_data(STableMapEvent map, warn_bit = true; MXS_WARNING("BIT is not currently supported, values are stored as 0."); } - conv->column(i, 0); + conv->column_int(i, 0); sprintf(trace[i], "[%ld] BIT", i); ptr += bytes; check_overflow(ptr <= end); @@ -345,7 +367,7 @@ uint8_t* process_row_event_data(STableMapEvent map, { double f_value = 0.0; ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); - conv->column(i, f_value); + conv->column_double(i, f_value); sprintf(trace[i], "[%ld] DECIMAL", i); check_overflow(ptr <= end); } @@ -369,7 +391,7 @@ uint8_t* process_row_event_data(STableMapEvent map, memcpy(buf, ptr, sz); buf[sz] = '\0'; ptr += sz; - conv->column(i, buf); + conv->column_string(i, buf); check_overflow(ptr <= end); } else if (column_is_blob(map->column_types[i])) @@ -381,13 +403,13 @@ uint8_t* process_row_event_data(STableMapEvent map, sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len); if (len) { - conv->column(i, ptr, len); + conv->column_bytes(i, ptr, len); ptr += len; } else { uint8_t nullvalue = 0; - conv->column(i, &nullvalue, 1); + conv->column_bytes(i, &nullvalue, 1); } check_overflow(ptr <= end); } @@ -401,7 +423,7 @@ uint8_t* process_row_event_data(STableMapEvent map, create->columns[i].length, &tm); format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm); - conv->column(i, buf); + conv->column_string(i, buf); sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf); check_overflow(ptr <= end); } @@ -414,7 +436,8 @@ uint8_t* process_row_event_data(STableMapEvent map, map->column_types[i], &metadata[metadata_offset], lval); - set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval); + set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval, + create->columns[i].is_unsigned); sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i])); check_overflow(ptr <= end); } diff --git a/server/modules/routing/avrorouter/rpl.hh b/server/modules/routing/avrorouter/rpl.hh index 0516d67bd..4ecf61653 100644 --- a/server/modules/routing/avrorouter/rpl.hh +++ b/server/modules/routing/avrorouter/rpl.hh @@ -202,26 +202,26 @@ public: // Called once all columns are processed virtual bool commit(const gtid_pos_t& gtid) = 0; - // 32-bit integer handler - virtual void column(int i, int32_t value) = 0; + // Integer handler for short types (less than 32 bits) + virtual void column_int(int i, int32_t value) = 0; - // 64-bit integer handler - virtual void column(int i, int64_t value) = 0; + // Integer handler for long integer types + virtual void column_long(int i, int64_t value) = 0; // Float handler - virtual void column(int i, float value) = 0; + virtual void column_float(int i, float value) = 0; // Double handler - virtual void column(int i, double value) = 0; + virtual void column_double(int i, double value) = 0; // String handler - virtual void column(int i, std::string value) = 0; + virtual void column_string(int i, const std::string& value) = 0; // Bytes handler - virtual void column(int i, uint8_t* value, int len) = 0; + virtual void column_bytes(int i, uint8_t* value, int len) = 0; // Empty (NULL) value type handler - virtual void column(int i) = 0; + virtual void column_null(int i) = 0; }; typedef std::auto_ptr SRowEventHandler;