diff --git a/include/maxscale/mysql_binlog.h b/include/maxscale/mysql_binlog.h index ecf2f3068..963d274ad 100644 --- a/include/maxscale/mysql_binlog.h +++ b/include/maxscale/mysql_binlog.h @@ -85,11 +85,12 @@ bool column_is_decimal(uint8_t type); bool fixed_string_is_enum(uint8_t type); /** Value unpacking */ -uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); -uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); -uint64_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); -uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm); +size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest); +size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val); +size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest); +size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float); void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm); diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 0087bf19b..668c54778 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -23,6 +23,7 @@ #include #include #include +#include /** * @brief Convert a table column type to a string @@ -354,7 +355,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest) * @param metadata Pointer to field metadata * @return Length of the processed field in bytes */ -uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) +size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) { memcpy(dest, ptr, metadata[1]); return metadata[1]; @@ -377,7 +378,7 @@ uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest) * @param dest Destination where the value is stored * @return Length of the processed field in bytes */ -uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, +size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count, uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest) { if (metadata[1]) @@ -435,7 +436,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) * @param val Extracted packed value * @param tm Pointer where the unpacked temporal value is stored */ -uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) +size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm) { switch (type) { @@ -550,3 +551,84 @@ size_t unpack_numeric_field(uint8_t *src, uint8_t type, uint8_t *metadata, uint8 memcpy(dest, src, size); return size; } + +static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes) +{ + uint64_t val = 0; + + switch (bytes) + { + case 1: + val = ptr[0]; + break; + case 2: + val = ptr[1] | ((uint64_t)(ptr[0]) << 8); + break; + case 3: + val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | ((uint64_t)ptr[0] << 16); + break; + case 4: + val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); + break; + case 5: + val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | ((uint64_t)ptr[0] << 32); + break; + case 6: + val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); + break; + case 7: + val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | ((uint64_t)ptr[0] << 48); + break; + case 8: + val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); + break; + } + + return val; +} + +size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float) +{ + const int dec_dig = 9; + int precision = metadata[0]; + int decimals = metadata[1]; + int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; + int ipart = precision - decimals; + int ipart1 = ipart / dec_dig; + int fpart1 = decimals / dec_dig; + int ipart2 = ipart - ipart1 * dec_dig; + int fpart2 = decimals - fpart1 * dec_dig; + int ibytes = ipart1 * 4 + dig_bytes[ipart2]; + int fbytes = fpart1 * 4 + dig_bytes[fpart2]; + + /** Remove the sign bit and store it locally */ + bool signed_int = (ptr[0] & 0x80); + + if (!signed_int) + { + ptr[0] |= 0x80; + + for (int i = 0; i < ibytes; i++) + { + ptr[i] = ~ptr[i]; + } + + for (int i = 0; i < fbytes; i++) + { + ptr[i + ibytes] = ~ptr[i + ibytes]; + } + } + + int64_t val_i = unpack_bytes(ptr, ibytes); + int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0; + + if (!signed_int) + { + val_i = -val_i; + val_f = -val_f; + } + + *val_float = (double)val_i + ((double)val_f / (pow(10.0, decimals))); + + return ibytes + fbytes; +} diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index 38dd47655..59af65190 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -539,26 +539,9 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value } else if (column_is_decimal(map->column_types[i])) { - const int dec_dig = 9; - int precision = metadata[metadata_offset]; - int decimals = metadata[metadata_offset + 1]; - int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; - int ipart = precision - decimals; - int ipart1 = ipart / dec_dig; - int fpart1 = decimals / dec_dig; - int ipart2 = ipart - ipart1 * dec_dig; - int fpart2 = decimals - fpart1 * dec_dig; - int ibytes = ipart1 * 4 + dig_bytes[ipart2]; - int fbytes = fpart1 * 4 + dig_bytes[fpart2]; - ptr += ibytes + fbytes; - - // TODO: Add support for DECIMAL - if (!warn_decimal) - { - warn_decimal = true; - MXS_WARNING("DECIMAL is not currently supported, values are stored as 0."); - } - avro_value_set_int(&field, 0); + double f_value = 0.0; + ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value); + avro_value_set_double(&field, f_value); } else if (column_is_variable_string(map->column_types[i])) { diff --git a/server/modules/routing/avrorouter/avro_schema.c b/server/modules/routing/avrorouter/avro_schema.c index 48739e4b0..80db99f9d 100644 --- a/server/modules/routing/avrorouter/avro_schema.c +++ b/server/modules/routing/avrorouter/avro_schema.c @@ -44,7 +44,6 @@ static const char* column_type_to_avro_type(uint8_t type) { switch (type) { - case TABLE_COL_TYPE_NEWDECIMAL: case TABLE_COL_TYPE_TINY: case TABLE_COL_TYPE_SHORT: case TABLE_COL_TYPE_LONG: @@ -56,6 +55,7 @@ static const char* column_type_to_avro_type(uint8_t type) return "float"; case TABLE_COL_TYPE_DOUBLE: + case TABLE_COL_TYPE_NEWDECIMAL: return "double"; case TABLE_COL_TYPE_NULL: