Fix DECIMAL handling in Avrorouter
The DECIMAL value type is now properly handled in Avrorouter. It is processed into an Avro double value when before it was ignored and replaced with a zero integer. Backported to the 2.0 branch.
This commit is contained in:
parent
4bd743d3ce
commit
f9732d7041
@ -24,6 +24,7 @@
|
||||
#include <users.h>
|
||||
#include <dbusers.h>
|
||||
#include <strings.h>
|
||||
#include <math.h>
|
||||
|
||||
/**
|
||||
* @brief Convert a table column type to a string
|
||||
@ -357,7 +358,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest)
|
||||
* @param metadata Pointer to field metadata
|
||||
* @return Length of the processed field in bytes
|
||||
*/
|
||||
uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
{
|
||||
memcpy(dest, ptr, metadata[1]);
|
||||
return metadata[1];
|
||||
@ -380,7 +381,7 @@ uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
* @param dest Destination where the value is stored
|
||||
* @return Length of the processed field in bytes
|
||||
*/
|
||||
uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest)
|
||||
{
|
||||
if (metadata[1])
|
||||
@ -438,7 +439,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
|
||||
* @param val Extracted packed value
|
||||
* @param tm Pointer where the unpacked temporal value is stored
|
||||
*/
|
||||
uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
|
||||
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
@ -553,3 +554,84 @@ size_t unpack_numeric_field(uint8_t *src, uint8_t type, uint8_t *metadata, uint8
|
||||
memcpy(dest, src, size);
|
||||
return size;
|
||||
}
|
||||
|
||||
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
|
||||
{
|
||||
uint64_t val = 0;
|
||||
|
||||
switch (bytes)
|
||||
{
|
||||
case 1:
|
||||
val = ptr[0];
|
||||
break;
|
||||
case 2:
|
||||
val = ptr[1] | ((uint64_t)(ptr[0]) << 8);
|
||||
break;
|
||||
case 3:
|
||||
val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | ((uint64_t)ptr[0] << 16);
|
||||
break;
|
||||
case 4:
|
||||
val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24);
|
||||
break;
|
||||
case 5:
|
||||
val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | ((uint64_t)ptr[0] << 32);
|
||||
break;
|
||||
case 6:
|
||||
val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40);
|
||||
break;
|
||||
case 7:
|
||||
val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | ((uint64_t)ptr[0] << 48);
|
||||
break;
|
||||
case 8:
|
||||
val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
|
||||
break;
|
||||
}
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float)
|
||||
{
|
||||
const int dec_dig = 9;
|
||||
int precision = metadata[0];
|
||||
int decimals = metadata[1];
|
||||
int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
|
||||
int ipart = precision - decimals;
|
||||
int ipart1 = ipart / dec_dig;
|
||||
int fpart1 = decimals / dec_dig;
|
||||
int ipart2 = ipart - ipart1 * dec_dig;
|
||||
int fpart2 = decimals - fpart1 * dec_dig;
|
||||
int ibytes = ipart1 * 4 + dig_bytes[ipart2];
|
||||
int fbytes = fpart1 * 4 + dig_bytes[fpart2];
|
||||
|
||||
/** Remove the sign bit and store it locally */
|
||||
bool signed_int = (ptr[0] & 0x80);
|
||||
|
||||
if (!signed_int)
|
||||
{
|
||||
ptr[0] |= 0x80;
|
||||
|
||||
for (int i = 0; i < ibytes; i++)
|
||||
{
|
||||
ptr[i] = ~ptr[i];
|
||||
}
|
||||
|
||||
for (int i = 0; i < fbytes; i++)
|
||||
{
|
||||
ptr[i + ibytes] = ~ptr[i + ibytes];
|
||||
}
|
||||
}
|
||||
|
||||
int64_t val_i = unpack_bytes(ptr, ibytes);
|
||||
int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0;
|
||||
|
||||
if (!signed_int)
|
||||
{
|
||||
val_i = -val_i;
|
||||
val_f = -val_f;
|
||||
}
|
||||
|
||||
*val_float = (double)val_i + ((double)val_f / (pow(10.0, decimals)));
|
||||
|
||||
return ibytes + fbytes;
|
||||
}
|
||||
|
@ -83,11 +83,12 @@ bool column_is_decimal(uint8_t type);
|
||||
bool fixed_string_is_enum(uint8_t type);
|
||||
|
||||
/** Value unpacking */
|
||||
uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm);
|
||||
uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
|
||||
uint64_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
|
||||
uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t* metadata, struct tm *tm);
|
||||
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest);
|
||||
size_t unpack_numeric_field(uint8_t *ptr, uint8_t type, uint8_t* metadata, uint8_t* val);
|
||||
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest);
|
||||
size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float);
|
||||
|
||||
void format_temporal_value(char *str, size_t size, uint8_t type, struct tm *tm);
|
||||
|
||||
|
@ -558,26 +558,9 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
||||
}
|
||||
else if (column_is_decimal(map->column_types[i]))
|
||||
{
|
||||
const int dec_dig = 9;
|
||||
int precision = metadata[metadata_offset];
|
||||
int decimals = metadata[metadata_offset + 1];
|
||||
int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
|
||||
int ipart = precision - decimals;
|
||||
int ipart1 = ipart / dec_dig;
|
||||
int fpart1 = decimals / dec_dig;
|
||||
int ipart2 = ipart - ipart1 * dec_dig;
|
||||
int fpart2 = decimals - fpart1 * dec_dig;
|
||||
int ibytes = ipart1 * 4 + dig_bytes[ipart2];
|
||||
int fbytes = fpart1 * 4 + dig_bytes[fpart2];
|
||||
ptr += ibytes + fbytes;
|
||||
|
||||
// TODO: Add support for DECIMAL
|
||||
if (!warn_decimal)
|
||||
{
|
||||
warn_decimal = true;
|
||||
MXS_WARNING("DECIMAL is not currently supported, values are stored as 0.");
|
||||
}
|
||||
avro_value_set_int(&field, 0);
|
||||
double f_value = 0.0;
|
||||
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
||||
avro_value_set_double(&field, f_value);
|
||||
}
|
||||
else if (column_is_variable_string(map->column_types[i]))
|
||||
{
|
||||
|
@ -44,7 +44,6 @@ static const char* column_type_to_avro_type(uint8_t type)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
case TABLE_COL_TYPE_NEWDECIMAL:
|
||||
case TABLE_COL_TYPE_TINY:
|
||||
case TABLE_COL_TYPE_SHORT:
|
||||
case TABLE_COL_TYPE_LONG:
|
||||
@ -56,6 +55,7 @@ static const char* column_type_to_avro_type(uint8_t type)
|
||||
return "float";
|
||||
|
||||
case TABLE_COL_TYPE_DOUBLE:
|
||||
case TABLE_COL_TYPE_NEWDECIMAL:
|
||||
return "double";
|
||||
|
||||
case TABLE_COL_TYPE_NULL:
|
||||
|
Loading…
x
Reference in New Issue
Block a user