Fix DATETIME and BLOB processing

The old DATETIME format wasn't processed properly which caused a
corruption of following events.

A BLOB type value could be non-NULL but still have no data. In this case,
the value should be stored as a null Avro value.
This commit is contained in:
Markus Mäkelä
2017-03-04 00:26:51 +02:00
parent 075ca42482
commit dadc0d6a9d
2 changed files with 27 additions and 10 deletions

View File

@ -98,6 +98,7 @@ const char* column_type_to_string(uint8_t type)
case TABLE_COL_TYPE_GEOMETRY: case TABLE_COL_TYPE_GEOMETRY:
return "GEOMETRY"; return "GEOMETRY";
default: default:
ss_dassert(false);
break; break;
} }
return "UNKNOWN"; return "UNKNOWN";
@ -216,7 +217,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
dest->tm_year = *ptr; dest->tm_year = *ptr;
} }
#ifdef USE_OLD_DATETIME
/** /**
* @brief Unpack a DATETIME * @brief Unpack a DATETIME
* *
@ -225,8 +225,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
* @param val Value read from the binary log * @param val Value read from the binary log
* @param dest Pointer where the unpacked value is stored * @param dest Pointer where the unpacked value is stored
*/ */
static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) static void unpack_datetime(uint8_t *ptr, struct tm *dest)
{ {
uint64_t val = 0;
memcpy(&val, ptr, sizeof(val));
uint32_t second = val - ((val / 100) * 100); uint32_t second = val - ((val / 100) * 100);
val /= 100; val /= 100;
uint32_t minute = val - ((val / 100) * 100); uint32_t minute = val - ((val / 100) * 100);
@ -241,13 +243,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
memset(dest, 0, sizeof(struct tm)); memset(dest, 0, sizeof(struct tm));
dest->tm_year = year - 1900; dest->tm_year = year - 1900;
dest->tm_mon = month; dest->tm_mon = month - 1;
dest->tm_mday = day; dest->tm_mday = day;
dest->tm_hour = hour; dest->tm_hour = hour;
dest->tm_min = minute; dest->tm_min = minute;
dest->tm_sec = second; dest->tm_sec = second;
} }
#endif
/** /**
* Unpack a 5 byte reverse byte order value * Unpack a 5 byte reverse byte order value
@ -413,6 +414,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
return 3 + ((decimals + 1) / 2); return 3 + ((decimals + 1) / 2);
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
return 8;
case TABLE_COL_TYPE_TIMESTAMP: case TABLE_COL_TYPE_TIMESTAMP:
return 4; return 4;
@ -448,8 +451,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
break; break;
case TABLE_COL_TYPE_DATETIME: case TABLE_COL_TYPE_DATETIME:
// This is not used with MariaDB RBR unpack_datetime(ptr, tm);
//unpack_datetime(ptr, *metadata, tm);
break; break;
case TABLE_COL_TYPE_DATETIME2: case TABLE_COL_TYPE_DATETIME2:
@ -468,6 +470,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
case TABLE_COL_TYPE_TIMESTAMP2: case TABLE_COL_TYPE_TIMESTAMP2:
unpack_timestamp(ptr, *metadata, tm); unpack_timestamp(ptr, *metadata, tm);
break; break;
default:
ss_dassert(false);
break;
} }
return temporal_field_size(type, *metadata); return temporal_field_size(type, *metadata);
} }
@ -597,6 +603,10 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
break; break;
default:
ss_dassert(false);
break;
} }
return val; return val;

View File

@ -271,7 +271,6 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
/** There should always be a table map event prior to a row event. /** There should always be a table map event prior to a row event.
* TODO: Make the active_maps dynamic */ * TODO: Make the active_maps dynamic */
TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)]; TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)];
ss_dassert(map);
if (map) if (map)
{ {
@ -289,10 +288,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
* beforehand so we must continue processing them until we reach the end * beforehand so we must continue processing them until we reach the end
* of the event. */ * of the event. */
int rows = 0; int rows = 0;
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
{ {
/** Add the current GTID and timestamp */ /** Add the current GTID and timestamp */
uint8_t *end = ptr + hdr->event_size; uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
int event_type = get_event_type(hdr->event_type); int event_type = get_event_type(hdr->event_type);
prepare_record(router, hdr, event_type, &record); prepare_record(router, hdr, event_type, &record);
ptr = process_row_event_data(map, create, &record, ptr, col_present, end); ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
@ -597,8 +597,15 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
uint64_t len = 0; uint64_t len = 0;
memcpy(&len, ptr, bytes); memcpy(&len, ptr, bytes);
ptr += bytes; ptr += bytes;
if (len)
{
avro_value_set_bytes(&field, ptr, len); avro_value_set_bytes(&field, ptr, len);
ptr += len; ptr += len;
}
else
{
avro_value_set_null(&field);
}
ss_dassert(ptr < end); ss_dassert(ptr < end);
} }
else if (column_is_temporal(map->column_types[i])) else if (column_is_temporal(map->column_types[i]))