From f9732d7041bc3008cea52c5e118017509d078178 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Sat, 17 Dec 2016 12:15:16 +0200 Subject: [PATCH] Fix DECIMAL handling in Avrorouter The DECIMAL value type is now properly handled in Avrorouter. It is now converted into an Avro double value, whereas previously it was ignored and replaced with a zero integer. Backported to the 2.0 branch. --- server/core/mysql_binlog.c | 88 ++++++++++++++++++++++- server/include/mysql_binlog.h | 9 +-- server/modules/routing/avro/avro_rbr.c | 23 +----- server/modules/routing/avro/avro_schema.c | 2 +- 4 files changed, 94 insertions(+), 28 deletions(-) diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 597c7abed..0fa01d866 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -24,6 +24,7 @@ #include #include #include +#include /** * @brief Convert a table column type to a string @@ -357,7 +358,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest) * @param metadata Pointer to field metadata * @return Length of the processed field in bytes */ -uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) +size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) { memcpy(dest, ptr, metadata[1]); return metadata[1]; @@ -380,7 +381,7 @@ uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) * @param dest Destination where the value is stored * @return Length of the processed field in bytes */ -uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, +size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest) { if (metadata[1]) @@ -438,7 +439,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t
/**
 * @brief Read a big-endian unsigned integer
 *
 * @param ptr   Pointer to the most significant byte of the value
 * @param bytes Number of bytes to read, only 1 to 8 bytes are supported
 * @return The decoded value or 0 if @c bytes is 0 or larger than 8
 */
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
{
    uint64_t val = 0;

    if (bytes <= sizeof(val))
    {
        for (size_t i = 0; i < bytes; i++)
        {
            val = (val << 8) | ptr[i];
        }
    }

    return val;
}

/**
 * @brief Decode one digit group of a packed DECIMAL value
 *
 * The stored bytes are copied before they are decoded so the source buffer
 * is never modified.
 *
 * @param field  Start of the packed DECIMAL field
 * @param offset Offset of the group from the start of the field
 * @param bytes  Size of the group in bytes, at most 4
 * @param mask   0xff if the stored value is negative (all bytes are stored
 *               inverted), 0x00 otherwise
 * @return The decimal digits of the group as an integer
 */
static uint64_t unpack_decimal_group(const uint8_t *field, size_t offset,
                                     size_t bytes, uint8_t mask)
{
    uint8_t buf[4] = {0};

    for (size_t i = 0; i < bytes && i < sizeof(buf); i++)
    {
        buf[i] = (uint8_t)(field[offset + i] ^ mask);
    }

    if (offset == 0)
    {
        /** The highest bit of the first byte of the field is the sign bit,
         * it is stored toggled and is not a part of the value */
        buf[0] ^= 0x80;
    }

    return unpack_bytes(buf, bytes);
}

/**
 * @brief Unpack a DECIMAL field into a double
 *
 * The value is stored as groups of nine decimal digits, each packed into a
 * four byte big-endian integer. The leftover digits of the integer part are
 * stored in front of its full groups and the leftover digits of the
 * fractional part after its full groups, both in 0 to 4 bytes. The highest
 * bit of the first byte is set for non-negative values. For negative values
 * it is cleared and every byte of the field is stored inverted.
 *
 * The conversion to double is lossy for values with more significant digits
 * than a double can represent. The source buffer is not modified.
 *
 * @param ptr       Pointer to the start of the packed field
 * @param metadata  Field metadata: metadata[0] is the precision (total number
 *                  of digits) and metadata[1] the number of decimals
 * @param val_float Destination where the unpacked value is stored
 * @return Length of the processed field in bytes
 */
size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float)
{
    enum { DEC_DIG = 9, DEC_GROUP_BYTES = 4 };
    /** Bytes needed to store 0 to 9 leftover digits */
    static const size_t dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
    /** Powers of ten for scaling the leftover fractional digits */
    static const double tens[] = {1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8};
    const double group_base = 1e9;

    int precision = metadata[0];
    int decimals = metadata[1];
    /** Metadata with more decimals than digits is invalid, treat it as a
     * value with no integer part instead of indexing dig_bytes out of bounds */
    int ipart = precision > decimals ? precision - decimals : 0;
    int int_groups = ipart / DEC_DIG;
    int int_digits = ipart % DEC_DIG;
    int frac_groups = decimals / DEC_DIG;
    int frac_digits = decimals % DEC_DIG;
    size_t ibytes = (size_t)int_groups * DEC_GROUP_BYTES + dig_bytes[int_digits];
    size_t fbytes = (size_t)frac_groups * DEC_GROUP_BYTES + dig_bytes[frac_digits];

    if (ibytes + fbytes == 0)
    {
        /** Nothing is stored for the field, do not read the sign byte */
        *val_float = 0.0;
        return 0;
    }

    bool negative = (ptr[0] & 0x80) == 0;
    uint8_t mask = negative ? 0xff : 0x00;
    size_t pos = 0;

    /** Integer part: leftover digits first, then the full groups */
    double int_val = 0.0;

    if (dig_bytes[int_digits])
    {
        int_val = (double)unpack_decimal_group(ptr, pos, dig_bytes[int_digits], mask);
        pos += dig_bytes[int_digits];
    }

    for (int i = 0; i < int_groups; i++)
    {
        int_val = int_val * group_base +
                  (double)unpack_decimal_group(ptr, pos, DEC_GROUP_BYTES, mask);
        pos += DEC_GROUP_BYTES;
    }

    /** Fractional part: full groups first, then the leftover digits */
    double frac_val = 0.0;
    double scale = 1.0;

    for (int i = 0; i < frac_groups; i++)
    {
        scale *= group_base;
        frac_val += (double)unpack_decimal_group(ptr, pos, DEC_GROUP_BYTES, mask) / scale;
        pos += DEC_GROUP_BYTES;
    }

    if (dig_bytes[frac_digits])
    {
        scale *= tens[frac_digits];
        frac_val += (double)unpack_decimal_group(ptr, pos, dig_bytes[frac_digits], mask) / scale;
        pos += dig_bytes[frac_digits];
    }

    double magnitude = int_val + frac_val;
    *val_float = negative ? -magnitude : magnitude;

    return pos;
}
process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value } else if (column_is_decimal(map->column_types[i])) { - const int dec_dig = 9; - int precision = metadata[metadata_offset]; - int decimals = metadata[metadata_offset + 1]; - int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; - int ipart = precision - decimals; - int ipart1 = ipart / dec_dig; - int fpart1 = decimals / dec_dig; - int ipart2 = ipart - ipart1 * dec_dig; - int fpart2 = decimals - fpart1 * dec_dig; - int ibytes = ipart1 * 4 + dig_bytes[ipart2]; - int fbytes = fpart1 * 4 + dig_bytes[fpart2]; - ptr += ibytes + fbytes; - - // TODO: Add support for DECIMAL - if (!warn_decimal) - { - warn_decimal = true; - MXS_WARNING("DECIMAL is not currently supported, values are stored as 0."); - } - avro_value_set_int(&field, 0); + double f_value = 0.0; + ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); + avro_value_set_double(&field, f_value); } else if (column_is_variable_string(map->column_types[i])) { diff --git a/server/modules/routing/avro/avro_schema.c b/server/modules/routing/avro/avro_schema.c index 9595e83b7..dd75e9d7d 100644 --- a/server/modules/routing/avro/avro_schema.c +++ b/server/modules/routing/avro/avro_schema.c @@ -44,7 +44,6 @@ static const char* column_type_to_avro_type(uint8_t type) { switch (type) { - case TABLE_COL_TYPE_NEWDECIMAL: case TABLE_COL_TYPE_TINY: case TABLE_COL_TYPE_SHORT: case TABLE_COL_TYPE_LONG: @@ -56,6 +55,7 @@ static const char* column_type_to_avro_type(uint8_t type) return "float"; case TABLE_COL_TYPE_DOUBLE: + case TABLE_COL_TYPE_NEWDECIMAL: return "double"; case TABLE_COL_TYPE_NULL: