Fix DECIMAL handling in Avrorouter
The DECIMAL value type is now properly handled in Avrorouter. It is processed into an Avro double value when before it was ignored and replaced with a zero integer.
This commit is contained in:
@ -23,6 +23,7 @@
|
||||
#include <maxscale/debug.h>
|
||||
#include <maxscale/users.h>
|
||||
#include <strings.h>
|
||||
#include <math.h>
|
||||
|
||||
/**
|
||||
* @brief Convert a table column type to a string
|
||||
@ -354,7 +355,7 @@ static void unpack_date(uint8_t *ptr, struct tm *dest)
|
||||
* @param metadata Pointer to field metadata
|
||||
* @return Length of the processed field in bytes
|
||||
*/
|
||||
uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
size_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
{
|
||||
memcpy(dest, ptr, metadata[1]);
|
||||
return metadata[1];
|
||||
@ -377,7 +378,7 @@ uint64_t unpack_enum(uint8_t *ptr, uint8_t *metadata, uint8_t *dest)
|
||||
* @param dest Destination where the value is stored
|
||||
* @return Length of the processed field in bytes
|
||||
*/
|
||||
uint64_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
size_t unpack_bit(uint8_t *ptr, uint8_t *null_mask, uint32_t col_count,
|
||||
uint32_t curr_col_index, uint8_t *metadata, uint64_t *dest)
|
||||
{
|
||||
if (metadata[1])
|
||||
@ -435,7 +436,7 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
|
||||
* @param val Extracted packed value
|
||||
* @param tm Pointer where the unpacked temporal value is stored
|
||||
*/
|
||||
uint64_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
|
||||
size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, struct tm *tm)
|
||||
{
|
||||
switch (type)
|
||||
{
|
||||
@ -550,3 +551,84 @@ size_t unpack_numeric_field(uint8_t *src, uint8_t type, uint8_t *metadata, uint8
|
||||
memcpy(dest, src, size);
|
||||
return size;
|
||||
}
|
||||
|
||||
static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
|
||||
{
|
||||
uint64_t val = 0;
|
||||
|
||||
switch (bytes)
|
||||
{
|
||||
case 1:
|
||||
val = ptr[0];
|
||||
break;
|
||||
case 2:
|
||||
val = ptr[1] | ((uint64_t)(ptr[0]) << 8);
|
||||
break;
|
||||
case 3:
|
||||
val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | ((uint64_t)ptr[0] << 16);
|
||||
break;
|
||||
case 4:
|
||||
val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24);
|
||||
break;
|
||||
case 5:
|
||||
val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | ((uint64_t)ptr[0] << 32);
|
||||
break;
|
||||
case 6:
|
||||
val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40);
|
||||
break;
|
||||
case 7:
|
||||
val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | ((uint64_t)ptr[0] << 48);
|
||||
break;
|
||||
case 8:
|
||||
val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
|
||||
break;
|
||||
}
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
size_t unpack_decimal_field(uint8_t *ptr, uint8_t *metadata, double *val_float)
|
||||
{
|
||||
const int dec_dig = 9;
|
||||
int precision = metadata[0];
|
||||
int decimals = metadata[1];
|
||||
int dig_bytes[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
|
||||
int ipart = precision - decimals;
|
||||
int ipart1 = ipart / dec_dig;
|
||||
int fpart1 = decimals / dec_dig;
|
||||
int ipart2 = ipart - ipart1 * dec_dig;
|
||||
int fpart2 = decimals - fpart1 * dec_dig;
|
||||
int ibytes = ipart1 * 4 + dig_bytes[ipart2];
|
||||
int fbytes = fpart1 * 4 + dig_bytes[fpart2];
|
||||
|
||||
/** Remove the sign bit and store it locally */
|
||||
bool signed_int = (ptr[0] & 0x80);
|
||||
|
||||
if (!signed_int)
|
||||
{
|
||||
ptr[0] |= 0x80;
|
||||
|
||||
for (int i = 0; i < ibytes; i++)
|
||||
{
|
||||
ptr[i] = ~ptr[i];
|
||||
}
|
||||
|
||||
for (int i = 0; i < fbytes; i++)
|
||||
{
|
||||
ptr[i + ibytes] = ~ptr[i + ibytes];
|
||||
}
|
||||
}
|
||||
|
||||
int64_t val_i = unpack_bytes(ptr, ibytes);
|
||||
int64_t val_f = fbytes ? unpack_bytes(ptr + ibytes, fbytes) : 0;
|
||||
|
||||
if (!signed_int)
|
||||
{
|
||||
val_i = -val_i;
|
||||
val_f = -val_f;
|
||||
}
|
||||
|
||||
*val_float = (double)val_i + ((double)val_f / (pow(10.0, decimals)));
|
||||
|
||||
return ibytes + fbytes;
|
||||
}
|
||||
|
Reference in New Issue
Block a user