diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 5e0539679..ddf5825c0 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -98,6 +98,7 @@ const char* column_type_to_string(uint8_t type) case TABLE_COL_TYPE_GEOMETRY: return "GEOMETRY"; default: + ss_dassert(false); break; } return "UNKNOWN"; @@ -216,7 +217,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } -#ifdef USE_OLD_DATETIME /** * @brief Unpack a DATETIME * @@ -225,8 +225,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) * @param val Value read from the binary log * @param dest Pointer where the unpacked value is stored */ -static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) +static void unpack_datetime(uint8_t *ptr, struct tm *dest) { + uint64_t val = 0; + memcpy(&val, ptr, sizeof(val)); uint32_t second = val - ((val / 100) * 100); val /= 100; uint32_t minute = val - ((val / 100) * 100); @@ -241,13 +243,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) memset(dest, 0, sizeof(struct tm)); dest->tm_year = year - 1900; - dest->tm_mon = month; + dest->tm_mon = month - 1; dest->tm_mday = day; dest->tm_hour = hour; dest->tm_min = minute; dest->tm_sec = second; } -#endif /** * Unpack a 5 byte reverse byte order value @@ -413,6 +414,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) return 3 + ((decimals + 1) / 2); case TABLE_COL_TYPE_DATETIME: + return 8; + case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -448,8 +451,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru break; case TABLE_COL_TYPE_DATETIME: - // This is not used with MariaDB RBR - //unpack_datetime(ptr, *metadata, tm); + unpack_datetime(ptr, tm); break; case TABLE_COL_TYPE_DATETIME2: @@ -468,6 +470,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru case TABLE_COL_TYPE_TIMESTAMP2: unpack_timestamp(ptr, *metadata, tm); break; + + default: + ss_dassert(false); + break; } return temporal_field_size(type, *metadata); } @@ -597,6 +603,10 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes) ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); break; + + default: + ss_dassert(false); + break; } return val; diff --git a/server/modules/routing/avro/avro_rbr.c b/server/modules/routing/avro/avro_rbr.c index 3ac3cf8d5..7390a6dc0 100644 --- a/server/modules/routing/avro/avro_rbr.c +++ b/server/modules/routing/avro/avro_rbr.c @@ -271,7 +271,6 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) /** There should always be a table map event prior to a row event. * TODO: Make the active_maps dynamic */ TABLE_MAP *map = router->active_maps[table_id % sizeof(router->active_maps)]; - ss_dassert(map); if (map) { @@ -289,10 +288,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { /** Add the current GTID and timestamp */ - uint8_t *end = ptr + hdr->event_size; + uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); @@ -597,8 +597,15 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; - avro_value_set_bytes(&field, ptr, len); - ptr += len; + if (len) + { + avro_value_set_bytes(&field, ptr, len); + ptr += len; + } + else + { + avro_value_set_null(&field); + } ss_dassert(ptr < end); } else if (column_is_temporal(map->column_types[i]))