MXS-2263: Process unsigned integer columns correctly
The unsigned integers that would previously be interpreted as negative values are now correctly converted into their corresponding avro values. Due to a limitation in the Avro file format, 64-bit unsigned integers cannot be represented in their unsigned form. Since both the signed and unsigned versions of a 32-bit integer cannot fit into a single Avro int, the type for these was changed to long. This is a backwards incompatible change which means files generated with older versions will not convert unsigned values correctly.
This commit is contained in:
@ -100,9 +100,8 @@ static const char* column_type_to_avro_type(uint8_t type)
|
|||||||
{
|
{
|
||||||
case TABLE_COL_TYPE_TINY:
|
case TABLE_COL_TYPE_TINY:
|
||||||
case TABLE_COL_TYPE_SHORT:
|
case TABLE_COL_TYPE_SHORT:
|
||||||
case TABLE_COL_TYPE_LONG:
|
|
||||||
case TABLE_COL_TYPE_INT24:
|
|
||||||
case TABLE_COL_TYPE_BIT:
|
case TABLE_COL_TYPE_BIT:
|
||||||
|
case TABLE_COL_TYPE_INT24:
|
||||||
return "int";
|
return "int";
|
||||||
|
|
||||||
case TABLE_COL_TYPE_FLOAT:
|
case TABLE_COL_TYPE_FLOAT:
|
||||||
@ -115,6 +114,7 @@ static const char* column_type_to_avro_type(uint8_t type)
|
|||||||
case TABLE_COL_TYPE_NULL:
|
case TABLE_COL_TYPE_NULL:
|
||||||
return "null";
|
return "null";
|
||||||
|
|
||||||
|
case TABLE_COL_TYPE_LONG:
|
||||||
case TABLE_COL_TYPE_LONGLONG:
|
case TABLE_COL_TYPE_LONGLONG:
|
||||||
return "long";
|
return "long";
|
||||||
|
|
||||||
@ -414,43 +414,52 @@ bool AvroConverter::commit(const gtid_pos_t& gtid)
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, int32_t value)
|
void AvroConverter::column_int(int i, int32_t value)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_int(&m_field, value);
|
avro_value_set_int(&m_field, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, int64_t value)
|
void AvroConverter::column_long(int i, int64_t value)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_long(&m_field, value);
|
|
||||||
|
if (avro_value_get_type(&m_field) == AVRO_INT32)
|
||||||
|
{
|
||||||
|
// Pre-2.4.3 versions use int for 32-bit integers whereas 2.4.3 and newer use long
|
||||||
|
avro_value_set_int(&m_field, value);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
avro_value_set_long(&m_field, value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, float value)
|
void AvroConverter::column_float(int i, float value)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_float(&m_field, value);
|
avro_value_set_float(&m_field, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, double value)
|
void AvroConverter::column_double(int i, double value)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_double(&m_field, value);
|
avro_value_set_double(&m_field, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, std::string value)
|
void AvroConverter::column_string(int i, const std::string& value)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_string(&m_field, value.c_str());
|
avro_value_set_string(&m_field, value.c_str());
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i, uint8_t* value, int len)
|
void AvroConverter::column_bytes(int i, uint8_t* value, int len)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_bytes(&m_field, value, len);
|
avro_value_set_bytes(&m_field, value, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
void AvroConverter::column(int i)
|
void AvroConverter::column_null(int i)
|
||||||
{
|
{
|
||||||
set_active(i);
|
set_active(i);
|
||||||
avro_value_set_branch(&m_union_value, 0, &m_field);
|
avro_value_set_branch(&m_union_value, 0, &m_field);
|
||||||
|
@ -53,13 +53,13 @@ public:
|
|||||||
void flush_tables();
|
void flush_tables();
|
||||||
void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type);
|
void prepare_row(const gtid_pos_t& gtid, const REP_HEADER& hdr, int event_type);
|
||||||
bool commit(const gtid_pos_t& gtid);
|
bool commit(const gtid_pos_t& gtid);
|
||||||
void column(int i, int32_t value);
|
void column_int(int i, int32_t value);
|
||||||
void column(int i, int64_t value);
|
void column_long(int i, int64_t value);
|
||||||
void column(int i, float value);
|
void column_float(int i, float value);
|
||||||
void column(int i, double value);
|
void column_double(int i, double value);
|
||||||
void column(int i, std::string value);
|
void column_string(int i, const std::string& value);
|
||||||
void column(int i, uint8_t* value, int len);
|
void column_bytes(int i, uint8_t* value, int len);
|
||||||
void column(int i);
|
void column_null(int i);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
avro_value_iface_t* m_writer_iface;
|
avro_value_iface_t* m_writer_iface;
|
||||||
|
@ -76,56 +76,78 @@ void set_numeric_field_value(SRowEventHandler& conv,
|
|||||||
int idx,
|
int idx,
|
||||||
uint8_t type,
|
uint8_t type,
|
||||||
uint8_t* metadata,
|
uint8_t* metadata,
|
||||||
uint8_t* value)
|
uint8_t* value,
|
||||||
|
bool is_unsigned)
|
||||||
{
|
{
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
case TABLE_COL_TYPE_TINY:
|
case TABLE_COL_TYPE_TINY:
|
||||||
|
if (is_unsigned)
|
||||||
{
|
{
|
||||||
char c = *value;
|
uint8_t c = *value;
|
||||||
conv->column(idx, c);
|
conv->column_int(idx, c);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int8_t c = *value;
|
||||||
|
conv->column_int(idx, c);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_SHORT:
|
case TABLE_COL_TYPE_SHORT:
|
||||||
|
if (is_unsigned)
|
||||||
{
|
{
|
||||||
short s = gw_mysql_get_byte2(value);
|
uint16_t s = gw_mysql_get_byte2(value);
|
||||||
conv->column(idx, s);
|
conv->column_int(idx, s);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int16_t s = gw_mysql_get_byte2(value);
|
||||||
|
conv->column_int(idx, s);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_INT24:
|
case TABLE_COL_TYPE_INT24:
|
||||||
|
if (is_unsigned)
|
||||||
{
|
{
|
||||||
int x = gw_mysql_get_byte3(value);
|
uint32_t x = gw_mysql_get_byte3(value);
|
||||||
|
conv->column_int(idx, x);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int32_t x = gw_mysql_get_byte3(value);
|
||||||
|
|
||||||
if (x & 0x800000)
|
if (x & 0x800000)
|
||||||
{
|
{
|
||||||
x = -((0xffffff & (~x)) + 1);
|
x = -((0xffffff & (~x)) + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
conv->column(idx, x);
|
conv->column_int(idx, (int64_t)x);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_LONG:
|
case TABLE_COL_TYPE_LONG:
|
||||||
|
if (is_unsigned)
|
||||||
{
|
{
|
||||||
int x = gw_mysql_get_byte4(value);
|
uint32_t x = gw_mysql_get_byte4(value);
|
||||||
conv->column(idx, x);
|
conv->column_long(idx, x);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
int32_t x = gw_mysql_get_byte4(value);
|
||||||
|
conv->column_long(idx, x);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_LONGLONG:
|
case TABLE_COL_TYPE_LONGLONG:
|
||||||
{
|
conv->column_long(idx, gw_mysql_get_byte8(value));
|
||||||
long l = gw_mysql_get_byte8(value);
|
break;
|
||||||
conv->column(idx, l);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TABLE_COL_TYPE_FLOAT:
|
case TABLE_COL_TYPE_FLOAT:
|
||||||
{
|
{
|
||||||
float f = 0;
|
float f = 0;
|
||||||
memcpy(&f, value, 4);
|
memcpy(&f, value, 4);
|
||||||
conv->column(idx, f);
|
conv->column_float(idx, f);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,7 +155,7 @@ void set_numeric_field_value(SRowEventHandler& conv,
|
|||||||
{
|
{
|
||||||
double d = 0;
|
double d = 0;
|
||||||
memcpy(&d, value, 8);
|
memcpy(&d, value, 8);
|
||||||
conv->column(idx, d);
|
conv->column_double(idx, d);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,7 +292,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
if (bit_is_set(null_bitmap, ncolumns, i))
|
if (bit_is_set(null_bitmap, ncolumns, i))
|
||||||
{
|
{
|
||||||
sprintf(trace[i], "[%ld] NULL", i);
|
sprintf(trace[i], "[%ld] NULL", i);
|
||||||
conv->column(i);
|
conv->column_null(i);
|
||||||
}
|
}
|
||||||
else if (column_is_fixed_string(map->column_types[i]))
|
else if (column_is_fixed_string(map->column_types[i]))
|
||||||
{
|
{
|
||||||
@ -282,7 +304,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
|
uint64_t bytes = unpack_enum(ptr, &metadata[metadata_offset], val);
|
||||||
char strval[bytes * 2 + 1];
|
char strval[bytes * 2 + 1];
|
||||||
gw_bin2hex(strval, val, bytes);
|
gw_bin2hex(strval, val, bytes);
|
||||||
conv->column(i, strval);
|
conv->column_string(i, strval);
|
||||||
sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
|
sprintf(trace[i], "[%ld] ENUM: %lu bytes", i, bytes);
|
||||||
ptr += bytes;
|
ptr += bytes;
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
@ -319,7 +341,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
char str[bytes + 1];
|
char str[bytes + 1];
|
||||||
memcpy(str, ptr, bytes);
|
memcpy(str, ptr, bytes);
|
||||||
str[bytes] = '\0';
|
str[bytes] = '\0';
|
||||||
conv->column(i, str);
|
conv->column_string(i, str);
|
||||||
ptr += bytes;
|
ptr += bytes;
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
@ -336,7 +358,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
warn_bit = true;
|
warn_bit = true;
|
||||||
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
|
MXS_WARNING("BIT is not currently supported, values are stored as 0.");
|
||||||
}
|
}
|
||||||
conv->column(i, 0);
|
conv->column_int(i, 0);
|
||||||
sprintf(trace[i], "[%ld] BIT", i);
|
sprintf(trace[i], "[%ld] BIT", i);
|
||||||
ptr += bytes;
|
ptr += bytes;
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
@ -345,7 +367,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
{
|
{
|
||||||
double f_value = 0.0;
|
double f_value = 0.0;
|
||||||
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
ptr += unpack_decimal_field(ptr, metadata + metadata_offset, &f_value);
|
||||||
conv->column(i, f_value);
|
conv->column_double(i, f_value);
|
||||||
sprintf(trace[i], "[%ld] DECIMAL", i);
|
sprintf(trace[i], "[%ld] DECIMAL", i);
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
@ -369,7 +391,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
memcpy(buf, ptr, sz);
|
memcpy(buf, ptr, sz);
|
||||||
buf[sz] = '\0';
|
buf[sz] = '\0';
|
||||||
ptr += sz;
|
ptr += sz;
|
||||||
conv->column(i, buf);
|
conv->column_string(i, buf);
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
else if (column_is_blob(map->column_types[i]))
|
else if (column_is_blob(map->column_types[i]))
|
||||||
@ -381,13 +403,13 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
|
sprintf(trace[i], "[%ld] BLOB: field: %d bytes, data: %lu bytes", i, bytes, len);
|
||||||
if (len)
|
if (len)
|
||||||
{
|
{
|
||||||
conv->column(i, ptr, len);
|
conv->column_bytes(i, ptr, len);
|
||||||
ptr += len;
|
ptr += len;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
uint8_t nullvalue = 0;
|
uint8_t nullvalue = 0;
|
||||||
conv->column(i, &nullvalue, 1);
|
conv->column_bytes(i, &nullvalue, 1);
|
||||||
}
|
}
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
@ -401,7 +423,7 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
create->columns[i].length,
|
create->columns[i].length,
|
||||||
&tm);
|
&tm);
|
||||||
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
|
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
|
||||||
conv->column(i, buf);
|
conv->column_string(i, buf);
|
||||||
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
|
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
@ -414,7 +436,8 @@ uint8_t* process_row_event_data(STableMapEvent map,
|
|||||||
map->column_types[i],
|
map->column_types[i],
|
||||||
&metadata[metadata_offset],
|
&metadata[metadata_offset],
|
||||||
lval);
|
lval);
|
||||||
set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval);
|
set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval,
|
||||||
|
create->columns[i].is_unsigned);
|
||||||
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
|
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
|
||||||
check_overflow(ptr <= end);
|
check_overflow(ptr <= end);
|
||||||
}
|
}
|
||||||
|
@ -202,26 +202,26 @@ public:
|
|||||||
// Called once all columns are processed
|
// Called once all columns are processed
|
||||||
virtual bool commit(const gtid_pos_t& gtid) = 0;
|
virtual bool commit(const gtid_pos_t& gtid) = 0;
|
||||||
|
|
||||||
// 32-bit integer handler
|
// Integer handler for short types (less than 32 bits)
|
||||||
virtual void column(int i, int32_t value) = 0;
|
virtual void column_int(int i, int32_t value) = 0;
|
||||||
|
|
||||||
// 64-bit integer handler
|
// Integer handler for long integer types
|
||||||
virtual void column(int i, int64_t value) = 0;
|
virtual void column_long(int i, int64_t value) = 0;
|
||||||
|
|
||||||
// Float handler
|
// Float handler
|
||||||
virtual void column(int i, float value) = 0;
|
virtual void column_float(int i, float value) = 0;
|
||||||
|
|
||||||
// Double handler
|
// Double handler
|
||||||
virtual void column(int i, double value) = 0;
|
virtual void column_double(int i, double value) = 0;
|
||||||
|
|
||||||
// String handler
|
// String handler
|
||||||
virtual void column(int i, std::string value) = 0;
|
virtual void column_string(int i, const std::string& value) = 0;
|
||||||
|
|
||||||
// Bytes handler
|
// Bytes handler
|
||||||
virtual void column(int i, uint8_t* value, int len) = 0;
|
virtual void column_bytes(int i, uint8_t* value, int len) = 0;
|
||||||
|
|
||||||
// Empty (NULL) value type handler
|
// Empty (NULL) value type handler
|
||||||
virtual void column(int i) = 0;
|
virtual void column_null(int i) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
|
typedef std::auto_ptr<RowEventHandler> SRowEventHandler;
|
||||||
|
Reference in New Issue
Block a user