diff --git a/Documentation/Routers/Avrorouter.md b/Documentation/Routers/Avrorouter.md index f41dc7216..dcd86c648 100644 --- a/Documentation/Routers/Avrorouter.md +++ b/Documentation/Routers/Avrorouter.md @@ -162,6 +162,11 @@ data block. The default value is 1 transaction. Controls the number of row events that are grouped into a single Avro data block. The default value is 1000 row events. +#### `block_size` + +The Avro data block size in bytes. The default is 16 kilobytes. Increase this +value if individual events in the binary logs are very large. + ## Module commands Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands. diff --git a/server/core/mysql_binlog.c b/server/core/mysql_binlog.c index 8eeaaa02e..4c5c38c20 100644 --- a/server/core/mysql_binlog.c +++ b/server/core/mysql_binlog.c @@ -97,6 +97,7 @@ const char* column_type_to_string(uint8_t type) case TABLE_COL_TYPE_GEOMETRY: return "GEOMETRY"; default: + ss_dassert(false); break; } return "UNKNOWN"; @@ -215,7 +216,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) dest->tm_year = *ptr; } -#ifdef USE_OLD_DATETIME /** * @brief Unpack a DATETIME * @@ -224,8 +224,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest) * @param val Value read from the binary log * @param dest Pointer where the unpacked value is stored */ -static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) +static void unpack_datetime(uint8_t *ptr, struct tm *dest) { + uint64_t val = 0; + memcpy(&val, ptr, sizeof(val)); uint32_t second = val - ((val / 100) * 100); val /= 100; uint32_t minute = val - ((val / 100) * 100); @@ -240,13 +242,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest) memset(dest, 0, sizeof(struct tm)); dest->tm_year = year - 1900; - dest->tm_mon = month; + dest->tm_mon = month - 1; dest->tm_mday = day; dest->tm_hour = hour; dest->tm_min = minute; dest->tm_sec = second; } -#endif /** * Unpack a 5 byte reverse byte order value @@ -412,6 +413,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals) return 3 + ((decimals + 1) / 2); case TABLE_COL_TYPE_DATETIME: + return 8; + case TABLE_COL_TYPE_TIMESTAMP: return 4; @@ -447,8 +450,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru break; case TABLE_COL_TYPE_DATETIME: - // This is not used with MariaDB RBR - //unpack_datetime(ptr, *metadata, tm); + unpack_datetime(ptr, tm); break; case TABLE_COL_TYPE_DATETIME2: @@ -467,6 +469,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru case TABLE_COL_TYPE_TIMESTAMP2: unpack_timestamp(ptr, *metadata, tm); break; + + default: + ss_dassert(false); + break; } return temporal_field_size(type, *metadata); } @@ -560,42 +566,46 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes) switch (bytes) { - case 1: - val = ptr[0]; - break; - case 2: - val = ptr[1] | ((uint64_t)(ptr[0]) << 8); - break; - case 3: - val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | - ((uint64_t)ptr[0] << 16); - break; - case 4: - val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | - ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); - break; - case 5: - val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | - ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | - ((uint64_t)ptr[0] << 32); - break; - case 6: - val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | - ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | - ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); - break; - case 7: - val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | - ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | - ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | - ((uint64_t)ptr[0] << 48); - break; - case 8: - val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | - ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | - ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | - ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); - break; + case 1: + val = ptr[0]; + break; + case 2: + val = ptr[1] | ((uint64_t)(ptr[0]) << 8); + break; + case 3: + val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) | + ((uint64_t)ptr[0] << 16); + break; + case 4: + val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) | + ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24); + break; + case 5: + val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) | + ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) | + ((uint64_t)ptr[0] << 32); + break; + case 6: + val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) | + ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) | + ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40); + break; + case 7: + val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) | + ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) | + ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) | + ((uint64_t)ptr[0] << 48); + break; + case 8: + val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) | + ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) | + ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) | + ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56); + break; + + default: + ss_dassert(false); + break; } return val; diff --git a/server/modules/routing/avrorouter/avro.c b/server/modules/routing/avrorouter/avro.c index 55e23b295..f9ca49f74 100644 --- a/server/modules/routing/avrorouter/avro.c +++ b/server/modules/routing/avrorouter/avro.c @@ -184,6 +184,7 @@ MXS_MODULE* MXS_CREATE_MODULE() {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, {"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, {"start_index", MXS_MODULE_PARAM_COUNT, "1"}, + {"block_size", MXS_MODULE_PARAM_COUNT, "0"}, {MXS_END_MODULE_PARAMS} } }; @@ -405,6 +406,7 @@ createInstance(SERVICE *service, char **options) inst->row_target = config_get_integer(params, "group_rows"); inst->trx_target = config_get_integer(params, "group_trx"); int first_file = config_get_integer(params, "start_index"); + inst->block_size = config_get_integer(params, "block_size"); MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); bool err = false; @@ -479,6 +481,10 @@ createInstance(SERVICE *service, char **options) { first_file = MXS_MAX(1, atoi(value)); } + else if (strcmp(options[i], "block_size") == 0) + { + inst->block_size = atoi(value); + } else { MXS_WARNING("Unknown router option: '%s'", options[i]); @@ -1054,14 +1060,20 @@ void converter_func(void* data) while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK) { uint64_t start_pos = router->current_pos; + char binlog_name[BINLOG_FNAMELEN + 1]; + strcpy(binlog_name, router->binlog_name); + if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) { binlog_end = avro_read_all_events(router); - if (router->current_pos != start_pos) + if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0) { /** We processed some data, reset the conversion task delay */ router->task_delay = 1; + + /** Update the GTID index */ + avro_update_index(router); } avro_close_binlog(router->binlog_fd); diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index 12dfd7836..833be696f 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -106,7 +106,7 @@ void avro_close_binlog(int fd) * @param filepath Path to the created file * @param json_schema The schema of the table in JSON format */ -AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) +AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size) { AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE)); if (table) @@ -127,7 +127,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) } else { - rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file); + rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size); } if (rc) @@ -883,12 +883,6 @@ void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush) } hashtable_iterator_free(iter); } - - /** Update the GTID index */ - if (flush == AVROROUTER_FLUSH) - { - avro_update_index(router); - } } /** diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index c1753ed62..0b086cefb 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -105,7 +105,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr /** Close the file and open a new one */ hashtable_delete(router->open_tables, table_ident); - AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema); + AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size); if (avro_table) { @@ -289,14 +289,19 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) * beforehand so we must continue processing them until we reach the end * of the event. */ int rows = 0; + while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN) { /** Add the current GTID and timestamp */ - uint8_t *end = ptr + hdr->event_size; + uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN; int event_type = get_event_type(hdr->event_type); prepare_record(router, hdr, event_type, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } /** Update rows events have the before and after images of the * affected rows so we'll process them as another record with @@ -305,7 +310,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr) { prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record); ptr = process_row_event_data(map, create, &record, ptr, col_present, end); - avro_file_writer_append_value(table->avro_file, &record); + if (avro_file_writer_append_value(table->avro_file, &record)) + { + MXS_ERROR("Failed to write value at position %ld: %s", + router->current_pos, avro_strerror()); + } } rows++; @@ -501,14 +510,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value for (long i = 0; i < map->columns && npresent < ncolumns; i++) { ss_dassert(create->columns == map->columns); - avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL); + ss_dassert(rc == 0); if (bit_is_set(columns_present, ncolumns, i)) { npresent++; if (bit_is_set(null_bitmap, ncolumns, i)) { - avro_value_set_null(&field); + if (column_is_blob(map->column_types[i])) + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); + } + else + { + avro_value_set_null(&field); + } } else if (column_is_fixed_string(map->column_types[i])) { @@ -597,8 +615,16 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value uint64_t len = 0; memcpy(&len, ptr, bytes); ptr += bytes; - avro_value_set_bytes(&field, ptr, len); - ptr += len; + if (len) + { + avro_value_set_bytes(&field, ptr, len); + ptr += len; + } + else + { + uint8_t nullvalue = 0; + avro_value_set_bytes(&field, &nullvalue, 1); + } ss_dassert(ptr < end); } else if (column_is_temporal(map->column_types[i])) diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index 6ee02a74e..aa33922a9 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -274,6 +274,7 @@ typedef struct avro_instance uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ + uint64_t block_size; /**< Avro datablock size */ struct avro_instance *next; } AVRO_INSTANCE; @@ -291,7 +292,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8 extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); -extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); +extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size); extern void avro_table_free(AVRO_TABLE *table); extern char* json_new_schema_from_table(TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);